1use std::marker::PhantomData;
14
15use timely::progress::frontier::AntichainRef;
16use timely::progress::{frontier::Antichain, Timestamp};
17use timely::container::{ContainerBuilder, PushInto};
18
19use crate::logging::{BatcherEvent, Logger};
20use crate::trace::{Batcher, Builder, Description};
21
22pub struct MergeBatcher<Input, C, M: Merger> {
27 chunker: C,
29 chains: Vec<Vec<M::Chunk>>,
33 stash: Vec<M::Chunk>,
35 merger: M,
37 lower: Antichain<M::Time>,
39 frontier: Antichain<M::Time>,
41 logger: Option<Logger>,
43 operator_id: usize,
45 _marker: PhantomData<Input>,
47}
48
49impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
50where
51 C: ContainerBuilder<Container=M::Chunk> + for<'a> PushInto<&'a mut Input>,
52 M: Merger<Time: Timestamp>,
53{
54 type Input = Input;
55 type Time = M::Time;
56 type Output = M::Chunk;
57
58 fn new(logger: Option<Logger>, operator_id: usize) -> Self {
59 Self {
60 logger,
61 operator_id,
62 chunker: C::default(),
63 merger: M::default(),
64 chains: Vec::new(),
65 stash: Vec::new(),
66 frontier: Antichain::new(),
67 lower: Antichain::from_elem(M::Time::minimum()),
68 _marker: PhantomData,
69 }
70 }
71
72 fn push_container(&mut self, container: &mut Input) {
75 self.chunker.push_into(container);
76 while let Some(chunk) = self.chunker.extract() {
77 let chunk = std::mem::take(chunk);
78 self.insert_chain(vec![chunk]);
79 }
80 }
81
82 fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
87 while let Some(chunk) = self.chunker.finish() {
89 let chunk = std::mem::take(chunk);
90 self.insert_chain(vec![chunk]);
91 }
92
93 while self.chains.len() > 1 {
95 let list1 = self.chain_pop().unwrap();
96 let list2 = self.chain_pop().unwrap();
97 let merged = self.merge_by(list1, list2);
98 self.chain_push(merged);
99 }
100 let merged = self.chain_pop().unwrap_or_default();
101
102 let mut kept = Vec::new();
104 let mut readied = Vec::new();
105 self.frontier.clear();
106
107 self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
108
109 if !kept.is_empty() {
110 self.chain_push(kept);
111 }
112
113 self.stash.clear();
114
115 let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
116 let seal = B::seal(&mut readied, description);
117 self.lower = upper;
118 seal
119 }
120
121 #[inline]
123 fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
124 self.frontier.borrow()
125 }
126}
127
128impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
129 fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
132 if !chain.is_empty() {
133 self.chain_push(chain);
134 while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
135 let list1 = self.chain_pop().unwrap();
136 let list2 = self.chain_pop().unwrap();
137 let merged = self.merge_by(list1, list2);
138 self.chain_push(merged);
139 }
140 }
141 }
142
143 fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
145 let mut output = Vec::with_capacity(list1.len() + list2.len());
147 self.merger.merge(list1, list2, &mut output, &mut self.stash);
148
149 output
150 }
151
152 #[inline]
154 fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
155 let chain = self.chains.pop();
156 self.account(chain.iter().flatten().map(M::account), -1);
157 chain
158 }
159
160 #[inline]
162 fn chain_push(&mut self, chain: Vec<M::Chunk>) {
163 self.account(chain.iter().map(M::account), 1);
164 self.chains.push(chain);
165 }
166
167 #[inline]
172 fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
173 if let Some(logger) = &self.logger {
174 let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
175 for (records_, size_, capacity_, allocations_) in items {
176 records = records.saturating_add_unsigned(records_);
177 size = size.saturating_add_unsigned(size_);
178 capacity = capacity.saturating_add_unsigned(capacity_);
179 allocations = allocations.saturating_add_unsigned(allocations_);
180 }
181 logger.log(BatcherEvent {
182 operator: self.operator_id,
183 records_diff: records * diff,
184 size_diff: size * diff,
185 capacity_diff: capacity * diff,
186 allocations_diff: allocations * diff,
187 })
188 }
189 }
190}
191
192impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
193 fn drop(&mut self) {
194 while self.chain_pop().is_some() {}
196 }
197}
198
199pub trait Merger: Default {
201 type Chunk: Default;
203 type Time;
205 fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
207 fn extract(
209 &mut self,
210 merged: Vec<Self::Chunk>,
211 upper: AntichainRef<Self::Time>,
212 frontier: &mut Antichain<Self::Time>,
213 readied: &mut Vec<Self::Chunk>,
214 kept: &mut Vec<Self::Chunk>,
215 stash: &mut Vec<Self::Chunk>,
216 );
217
218 fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
220}
221
222pub use container::InternalMerger;
223
224pub mod container {
225
226 use std::marker::PhantomData;
234 use timely::container::SizableContainer;
235 use timely::progress::frontier::{Antichain, AntichainRef};
236 use timely::{Accountable, PartialOrder};
237 use crate::trace::implementations::merge_batcher::Merger;
238
239 pub trait InternalMerge: Accountable + SizableContainer + Default {
242 type TimeOwned;
244
245 fn len(&self) -> usize;
247
248 fn clear(&mut self);
250
251 fn account(&self) -> (usize, usize, usize, usize) {
253 let (size, capacity, allocations) = (0, 0, 0);
254 (usize::try_from(self.record_count()).unwrap(), size, capacity, allocations)
255 }
256
257 fn merge_from(
265 &mut self,
266 others: &mut [Self],
267 positions: &mut [usize],
268 );
269
270 fn extract(
286 &mut self,
287 position: &mut usize,
288 upper: AntichainRef<Self::TimeOwned>,
289 frontier: &mut Antichain<Self::TimeOwned>,
290 keep: &mut Self,
291 ship: &mut Self,
292 );
293 }
294
/// Alias for [`VecMerger`] under its other public name.
pub type VecInternalMerger<D, T, R> = VecMerger<D, T, R>;

/// A stateless merger for chunks of type `Vec<(D, T, R)>`.
pub struct VecMerger<D, T, R> {
    /// Records the update type; no data is stored.
    _marker: PhantomData<(D, T, R)>,
}

// Implemented by hand so that `D`, `T` and `R` need not be `Default` themselves.
impl<D, T, R> Default for VecMerger<D, T, R> {
    fn default() -> Self {
        VecMerger { _marker: PhantomData }
    }
}
305
306 impl<D, T, R> VecMerger<D, T, R> {
307 fn target_capacity() -> usize {
312 timely::container::buffer::default_capacity::<(D, T, R)>().next_power_of_two()
313 }
314 fn empty(&self, stash: &mut Vec<Vec<(D, T, R)>>) -> Vec<(D, T, R)> {
316 let target = Self::target_capacity();
317 let mut container = stash.pop().unwrap_or_default();
318 container.clear();
319 if container.capacity() != target {
321 container = Vec::with_capacity(target);
322 }
323 container
324 }
325 fn refill(queue: &mut std::collections::VecDeque<(D, T, R)>, iter: &mut impl Iterator<Item = Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) {
327 if queue.is_empty() {
328 let target = Self::target_capacity();
329 if stash.len() < 2 {
330 let mut recycled = Vec::from(std::mem::take(queue));
331 recycled.clear();
332 if recycled.capacity() == target {
333 stash.push(recycled);
334 }
335 }
336 if let Some(chunk) = iter.next() {
337 *queue = std::collections::VecDeque::from(chunk);
338 }
339 }
340 }
341 }
342
343 impl<D, T, R> Merger for VecMerger<D, T, R>
344 where
345 D: Ord + Clone + 'static,
346 T: Ord + Clone + PartialOrder + 'static,
347 R: crate::difference::Semigroup + Clone + 'static,
348 {
349 type Chunk = Vec<(D, T, R)>;
350 type Time = T;
351
352 fn merge(
353 &mut self,
354 list1: Vec<Vec<(D, T, R)>>,
355 list2: Vec<Vec<(D, T, R)>>,
356 output: &mut Vec<Vec<(D, T, R)>>,
357 stash: &mut Vec<Vec<(D, T, R)>>,
358 ) {
359 use std::cmp::Ordering;
360 use std::collections::VecDeque;
361
362 let mut iter1 = list1.into_iter();
363 let mut iter2 = list2.into_iter();
364 let mut q1 = VecDeque::<(D,T,R)>::from(iter1.next().unwrap_or_default());
365 let mut q2 = VecDeque::<(D,T,R)>::from(iter2.next().unwrap_or_default());
366
367 let mut result = self.empty(stash);
368
369 while let (Some((d1, t1, _)), Some((d2, t2, _))) = (q1.front(), q2.front()) {
371 match (d1, t1).cmp(&(d2, t2)) {
372 Ordering::Less => {
373 result.push(q1.pop_front().unwrap());
374 }
375 Ordering::Greater => {
376 result.push(q2.pop_front().unwrap());
377 }
378 Ordering::Equal => {
379 let (d, t, mut r1) = q1.pop_front().unwrap();
380 let (_, _, r2) = q2.pop_front().unwrap();
381 r1.plus_equals(&r2);
382 if !r1.is_zero() {
383 result.push((d, t, r1));
384 }
385 }
386 }
387
388 if result.at_capacity() {
389 output.push(std::mem::take(&mut result));
390 result = self.empty(stash);
391 }
392
393 if q1.is_empty() { Self::refill(&mut q1, &mut iter1, stash); }
395 if q2.is_empty() { Self::refill(&mut q2, &mut iter2, stash); }
396 }
397
398 if !result.is_empty() { output.push(result); }
400 for q in [q1, q2] {
401 if !q.is_empty() { output.push(Vec::from(q)); }
402 }
403 output.extend(iter1);
404 output.extend(iter2);
405 }
406
407 fn extract(
408 &mut self,
409 merged: Vec<Vec<(D, T, R)>>,
410 upper: AntichainRef<T>,
411 frontier: &mut Antichain<T>,
412 ship: &mut Vec<Vec<(D, T, R)>>,
413 kept: &mut Vec<Vec<(D, T, R)>>,
414 stash: &mut Vec<Vec<(D, T, R)>>,
415 ) {
416 let mut keep = self.empty(stash);
417 let mut ready = self.empty(stash);
418
419 for mut chunk in merged {
420 for (data, time, diff) in chunk.drain(..) {
422 if upper.less_equal(&time) {
423 frontier.insert_with(&time, |time| time.clone());
424 keep.push((data, time, diff));
425 } else {
426 ready.push((data, time, diff));
427 }
428 if keep.at_capacity() {
429 kept.push(std::mem::take(&mut keep));
430 keep = self.empty(stash);
431 }
432 if ready.at_capacity() {
433 ship.push(std::mem::take(&mut ready));
434 ready = self.empty(stash);
435 }
436 }
437 if chunk.capacity() == Self::target_capacity() {
439 stash.push(chunk);
440 }
441 }
442 if !keep.is_empty() { kept.push(keep); }
443 if !ready.is_empty() { ship.push(ready); }
444 }
445
446 fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) {
447 (chunk.len(), 0, 0, 0)
448 }
449 }
450
/// A stateless merger for chunk types that implement `InternalMerge`.
pub struct InternalMerger<MC> {
    /// Records the chunk type; no data is stored.
    _marker: PhantomData<MC>,
}

// Implemented by hand so that `MC` need not be `Default` for the merger to be.
impl<MC> Default for InternalMerger<MC> {
    fn default() -> Self {
        InternalMerger { _marker: PhantomData }
    }
}
459
460 impl<MC> InternalMerger<MC> where MC: InternalMerge {
461 #[inline]
462 fn empty(&self, stash: &mut Vec<MC>) -> MC {
463 stash.pop().unwrap_or_else(|| {
464 let mut container = MC::default();
465 container.ensure_capacity(&mut None);
466 container
467 })
468 }
469 #[inline]
470 fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
471 chunk.clear();
472 stash.push(chunk);
473 }
474 fn drain_side(
479 &self,
480 head: &mut MC,
481 pos: &mut usize,
482 list: &mut std::vec::IntoIter<MC>,
483 result: &mut MC,
484 output: &mut Vec<MC>,
485 stash: &mut Vec<MC>,
486 ) {
487 if *pos < head.len() {
489 result.merge_from(
490 std::slice::from_mut(head),
491 std::slice::from_mut(pos),
492 );
493 }
494 if !result.is_empty() {
496 output.push(std::mem::take(result));
497 *result = self.empty(stash);
498 }
499 output.extend(list);
501 }
502 }
503
504 impl<MC> Merger for InternalMerger<MC>
505 where
506 MC: InternalMerge<TimeOwned: Ord + PartialOrder + Clone + 'static> + 'static,
507 {
508 type Time = MC::TimeOwned;
509 type Chunk = MC;
510
511 fn merge(&mut self, list1: Vec<MC>, list2: Vec<MC>, output: &mut Vec<MC>, stash: &mut Vec<MC>) {
512 let mut list1 = list1.into_iter();
513 let mut list2 = list2.into_iter();
514
515 let mut heads = [list1.next().unwrap_or_default(), list2.next().unwrap_or_default()];
516 let mut positions = [0usize, 0usize];
517
518 let mut result = self.empty(stash);
519
520 while positions[0] < heads[0].len() && positions[1] < heads[1].len() {
522 result.merge_from(&mut heads, &mut positions);
523
524 if positions[0] >= heads[0].len() {
525 let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
526 self.recycle(old, stash);
527 positions[0] = 0;
528 }
529 if positions[1] >= heads[1].len() {
530 let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
531 self.recycle(old, stash);
532 positions[1] = 0;
533 }
534 if result.at_capacity() {
535 output.push(std::mem::take(&mut result));
536 result = self.empty(stash);
537 }
538 }
539
540 self.drain_side(&mut heads[0], &mut positions[0], &mut list1, &mut result, output, stash);
542 self.drain_side(&mut heads[1], &mut positions[1], &mut list2, &mut result, output, stash);
543 if !result.is_empty() {
544 output.push(result);
545 }
546 }
547
548 fn extract(
549 &mut self,
550 merged: Vec<Self::Chunk>,
551 upper: AntichainRef<Self::Time>,
552 frontier: &mut Antichain<Self::Time>,
553 ship: &mut Vec<Self::Chunk>,
554 kept: &mut Vec<Self::Chunk>,
555 stash: &mut Vec<Self::Chunk>,
556 ) {
557 let mut keep = self.empty(stash);
558 let mut ready = self.empty(stash);
559
560 for mut buffer in merged {
561 let mut position = 0;
562 let len = buffer.len();
563 while position < len {
564 buffer.extract(&mut position, upper, frontier, &mut keep, &mut ready);
565 if keep.at_capacity() {
566 kept.push(std::mem::take(&mut keep));
567 keep = self.empty(stash);
568 }
569 if ready.at_capacity() {
570 ship.push(std::mem::take(&mut ready));
571 ready = self.empty(stash);
572 }
573 }
574 self.recycle(buffer, stash);
575 }
576 if !keep.is_empty() {
577 kept.push(keep);
578 }
579 if !ready.is_empty() {
580 ship.push(ready);
581 }
582 }
583
584 fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
585 chunk.account()
586 }
587 }
588
589 pub mod vec_internal {
595 use std::cmp::Ordering;
596 use timely::PartialOrder;
597 use timely::container::SizableContainer;
598 use timely::progress::frontier::{Antichain, AntichainRef};
599 use crate::difference::Semigroup;
600 use super::InternalMerge;
601
602 impl<D: Ord + Clone + 'static, T: Ord + Clone + PartialOrder + 'static, R: Semigroup + Clone + 'static> InternalMerge for Vec<(D, T, R)> {
603 type TimeOwned = T;
604
605 fn len(&self) -> usize { Vec::len(self) }
606 fn clear(&mut self) { Vec::clear(self) }
607
608 fn merge_from(
609 &mut self,
610 others: &mut [Self],
611 positions: &mut [usize],
612 ) {
613 match others.len() {
614 0 => {},
615 1 => {
616 let other = &mut others[0];
617 let pos = &mut positions[0];
618 if self.is_empty() && *pos == 0 {
619 std::mem::swap(self, other);
620 return;
621 }
622 self.extend_from_slice(&other[*pos ..]);
623 *pos = other.len();
624 },
625 2 => {
626 let (left, right) = others.split_at_mut(1);
627 let other1 = &mut left[0];
628 let other2 = &mut right[0];
629
630 while positions[0] < other1.len() && positions[1] < other2.len() && !self.at_capacity() {
631 let (d1, t1, _) = &other1[positions[0]];
632 let (d2, t2, _) = &other2[positions[1]];
633 match (d1, t1).cmp(&(d2, t2)) {
635 Ordering::Less => {
636 self.push(other1[positions[0]].clone());
637 positions[0] += 1;
638 }
639 Ordering::Greater => {
640 self.push(other2[positions[1]].clone());
641 positions[1] += 1;
642 }
643 Ordering::Equal => {
644 let (d, t, mut r1) = other1[positions[0]].clone();
645 let (_, _, ref r2) = other2[positions[1]];
646 r1.plus_equals(r2);
647 if !r1.is_zero() {
648 self.push((d, t, r1));
649 }
650 positions[0] += 1;
651 positions[1] += 1;
652 }
653 }
654 }
655 },
656 n => unimplemented!("{n}-way merge not yet supported"),
657 }
658 }
659
660 fn extract(
661 &mut self,
662 position: &mut usize,
663 upper: AntichainRef<T>,
664 frontier: &mut Antichain<T>,
665 keep: &mut Self,
666 ship: &mut Self,
667 ) {
668 let len = self.len();
669 while *position < len && !keep.at_capacity() && !ship.at_capacity() {
670 let (data, time, diff) = self[*position].clone();
671 if upper.less_equal(&time) {
672 frontier.insert_with(&time, |time| time.clone());
673 keep.push((data, time, diff));
674 } else {
675 ship.push((data, time, diff));
676 }
677 *position += 1;
678 }
679 }
680 }
681 }
682
683}