1use std::marker::PhantomData;
14
15use timely::progress::frontier::AntichainRef;
16use timely::progress::{frontier::Antichain, Timestamp};
17use timely::container::{ContainerBuilder, PushInto};
18
19use crate::logging::{BatcherEvent, Logger};
20use crate::trace::{Batcher, Builder, Description};
21
/// Batches updates by keeping a stack of chains of chunks and merging chains together.
///
/// Pushed input goes through `chunker`; every chunk it yields is inserted as a single-chunk
/// chain. `seal` merges all chains into one and asks `merger` to split it at the sealed upper
/// frontier: one part becomes the output batch, the other is kept as a new chain.
pub struct MergeBatcher<Input, C, M: Merger> {
    /// Turns pushed input containers into chunks.
    chunker: C,
    /// Stack of chains, each a `Vec` of chunks. `insert_chain` merges the two most recent chains
    /// until the newest has fewer than half as many chunks as the one beneath it.
    chains: Vec<Vec<M::Chunk>>,
    /// Spare chunks lent to `merger` for reuse; cleared at the end of every `seal`.
    stash: Vec<M::Chunk>,
    /// Performs the chain merges and the split at `seal` time.
    merger: M,
    /// Lower bound of the next sealed batch: starts at `{minimum}`, becomes `upper` after each `seal`.
    lower: Antichain<M::Time>,
    /// Frontier of the held-back times, as filled by `Merger::extract` during the most recent
    /// `seal` (empty before the first seal).
    frontier: Antichain<M::Time>,
    /// Optional logger that receives `BatcherEvent` size accounting.
    logger: Option<Logger>,
    /// Operator identifier reported in logged `BatcherEvent`s.
    operator_id: usize,
    /// Marks the `Input` type parameter, which no field stores.
    _marker: PhantomData<Input>,
}
48
49impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
50where
51 C: ContainerBuilder<Container=M::Chunk> + for<'a> PushInto<&'a mut Input>,
52 M: Merger<Time: Timestamp>,
53{
54 type Input = Input;
55 type Time = M::Time;
56 type Output = M::Chunk;
57
58 fn new(logger: Option<Logger>, operator_id: usize) -> Self {
59 Self {
60 logger,
61 operator_id,
62 chunker: C::default(),
63 merger: M::default(),
64 chains: Vec::new(),
65 stash: Vec::new(),
66 frontier: Antichain::new(),
67 lower: Antichain::from_elem(M::Time::minimum()),
68 _marker: PhantomData,
69 }
70 }
71
72 fn push_container(&mut self, container: &mut Input) {
75 self.chunker.push_into(container);
76 while let Some(chunk) = self.chunker.extract() {
77 let chunk = std::mem::take(chunk);
78 self.insert_chain(vec![chunk]);
79 }
80 }
81
82 fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
87 while let Some(chunk) = self.chunker.finish() {
89 let chunk = std::mem::take(chunk);
90 self.insert_chain(vec![chunk]);
91 }
92
93 while self.chains.len() > 1 {
95 let list1 = self.chain_pop().unwrap();
96 let list2 = self.chain_pop().unwrap();
97 let merged = self.merge_by(list1, list2);
98 self.chain_push(merged);
99 }
100 let merged = self.chain_pop().unwrap_or_default();
101
102 let mut kept = Vec::new();
104 let mut readied = Vec::new();
105 self.frontier.clear();
106
107 self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
108
109 if !kept.is_empty() {
110 self.chain_push(kept);
111 }
112
113 self.stash.clear();
114
115 let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
116 let seal = B::seal(&mut readied, description);
117 self.lower = upper;
118 seal
119 }
120
121 #[inline]
123 fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
124 self.frontier.borrow()
125 }
126}
127
128impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
129 fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
132 if !chain.is_empty() {
133 self.chain_push(chain);
134 while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
135 let list1 = self.chain_pop().unwrap();
136 let list2 = self.chain_pop().unwrap();
137 let merged = self.merge_by(list1, list2);
138 self.chain_push(merged);
139 }
140 }
141 }
142
143 fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
145 let mut output = Vec::with_capacity(list1.len() + list2.len());
147 self.merger.merge(list1, list2, &mut output, &mut self.stash);
148
149 output
150 }
151
152 #[inline]
154 fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
155 let chain = self.chains.pop();
156 self.account(chain.iter().flatten().map(M::account), -1);
157 chain
158 }
159
160 #[inline]
162 fn chain_push(&mut self, chain: Vec<M::Chunk>) {
163 self.account(chain.iter().map(M::account), 1);
164 self.chains.push(chain);
165 }
166
167 #[inline]
172 fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
173 if let Some(logger) = &self.logger {
174 let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
175 for (records_, size_, capacity_, allocations_) in items {
176 records = records.saturating_add_unsigned(records_);
177 size = size.saturating_add_unsigned(size_);
178 capacity = capacity.saturating_add_unsigned(capacity_);
179 allocations = allocations.saturating_add_unsigned(allocations_);
180 }
181 logger.log(BatcherEvent {
182 operator: self.operator_id,
183 records_diff: records * diff,
184 size_diff: size * diff,
185 capacity_diff: capacity * diff,
186 allocations_diff: allocations * diff,
187 })
188 }
189 }
190}
191
192impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
193 fn drop(&mut self) {
194 while self.chain_pop().is_some() {}
196 }
197}
198
/// Merges and splits chains of chunks on behalf of [`MergeBatcher`].
///
/// A chain is a `Vec` of chunks. The implementations in this module expect each chain to be
/// sorted by `(data, time)` across its chunks.
pub trait Merger: Default {
    /// The chunk type that chains are made of.
    type Chunk: Default;
    /// The timestamp type of the updates held in chunks.
    type Time;
    /// Merges chains `list1` and `list2` into `output`; `stash` holds spare chunks for reuse.
    fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
    /// Splits the chain `merged` at `upper`.
    ///
    /// `MergeBatcher::seal` builds its batch from `readied` and keeps `kept` as a new chain. The
    /// implementations in this module move an update to `kept` (inserting its time into
    /// `frontier`) when `upper.less_equal(time)`, and to `readied` otherwise.
    fn extract(
        &mut self,
        merged: Vec<Self::Chunk>,
        upper: AntichainRef<Self::Time>,
        frontier: &mut Antichain<Self::Time>,
        readied: &mut Vec<Self::Chunk>,
        kept: &mut Vec<Self::Chunk>,
        stash: &mut Vec<Self::Chunk>,
    );

    /// Reports `(records, size, capacity, allocations)` for `chunk`; used for `BatcherEvent` logging.
    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
}
221
222pub use container::InternalMerger;
223
224pub mod container {
225
226 use std::marker::PhantomData;
234 use timely::container::SizableContainer;
235 use timely::progress::frontier::{Antichain, AntichainRef};
236 use timely::{Accountable, PartialOrder};
237 use crate::trace::implementations::merge_batcher::Merger;
238
    /// A container that can merge from, and split into, other instances of itself.
    ///
    /// `InternalMerger` delegates its chunk-level work to these methods.
    pub trait InternalMerge: Accountable + SizableContainer + Default {
        /// Owned timestamp type, used for the `upper` bound and the recorded `frontier`.
        type TimeOwned;

        /// Number of updates held.
        fn len(&self) -> usize;

        /// Removes all updates; `InternalMerger::recycle` calls this before stashing a chunk.
        fn clear(&mut self);

        /// Reports `(records, size, capacity, allocations)` for logging.
        ///
        /// The default reports `record_count()` records and zero for everything else. It panics
        /// if `record_count()` does not convert to `usize` (e.g. when it is negative).
        fn account(&self) -> (usize, usize, usize, usize) {
            let (size, capacity, allocations) = (0, 0, 0);
            (usize::try_from(self.record_count()).unwrap(), size, capacity, allocations)
        }

        /// Merges updates from `others`, starting at the matching `positions`, into `self`, and
        /// advances `positions` past what was consumed.
        ///
        /// The implementations in this module support one or two `others`. The two-way case
        /// stops when either input is exhausted or `self` reaches capacity.
        fn merge_from(
            &mut self,
            others: &mut [Self],
            positions: &mut [usize],
        );

        /// Moves updates from `self`, starting at `*position`, into `keep` or `ship`, advancing
        /// `*position`.
        ///
        /// The implementations in this module send an update to `keep` (inserting its time into
        /// `frontier`) when `upper.less_equal(time)`, and to `ship` otherwise. They stop when
        /// `self` is exhausted or either destination is at capacity.
        fn extract(
            &mut self,
            position: &mut usize,
            upper: AntichainRef<Self::TimeOwned>,
            frontier: &mut Antichain<Self::TimeOwned>,
            keep: &mut Self,
            ship: &mut Self,
        );
    }
294
    /// Merger for `Vec<(D, T, R)>` chunks. Despite its name, this aliases `VecMerger`, not
    /// `InternalMerger`.
    pub type VecInternalMerger<D, T, R> = VecMerger<D, T, R>;
    /// `InternalMerger` over columnar `TimelyStack<(D, T, R)>` chunks.
    pub type ColInternalMerger<D, T, R> = InternalMerger<crate::containers::TimelyStack<(D, T, R)>>;
299
300 pub struct VecMerger<D, T, R> {
302 _marker: PhantomData<(D, T, R)>,
303 }
304
305 impl<D, T, R> Default for VecMerger<D, T, R> {
306 fn default() -> Self { Self { _marker: PhantomData } }
307 }
308
309 impl<D, T, R> VecMerger<D, T, R> {
310 fn target_capacity() -> usize {
315 timely::container::buffer::default_capacity::<(D, T, R)>().next_power_of_two()
316 }
317 fn empty(&self, stash: &mut Vec<Vec<(D, T, R)>>) -> Vec<(D, T, R)> {
319 let target = Self::target_capacity();
320 let mut container = stash.pop().unwrap_or_default();
321 container.clear();
322 if container.capacity() != target {
324 container = Vec::with_capacity(target);
325 }
326 container
327 }
328 fn refill(queue: &mut std::collections::VecDeque<(D, T, R)>, iter: &mut impl Iterator<Item = Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) {
330 if queue.is_empty() {
331 let target = Self::target_capacity();
332 if stash.len() < 2 {
333 let mut recycled = Vec::from(std::mem::take(queue));
334 recycled.clear();
335 if recycled.capacity() == target {
336 stash.push(recycled);
337 }
338 }
339 if let Some(chunk) = iter.next() {
340 *queue = std::collections::VecDeque::from(chunk);
341 }
342 }
343 }
344 }
345
346 impl<D, T, R> Merger for VecMerger<D, T, R>
347 where
348 D: Ord + Clone + 'static,
349 T: Ord + Clone + PartialOrder + 'static,
350 R: crate::difference::Semigroup + Clone + 'static,
351 {
352 type Chunk = Vec<(D, T, R)>;
353 type Time = T;
354
355 fn merge(
356 &mut self,
357 list1: Vec<Vec<(D, T, R)>>,
358 list2: Vec<Vec<(D, T, R)>>,
359 output: &mut Vec<Vec<(D, T, R)>>,
360 stash: &mut Vec<Vec<(D, T, R)>>,
361 ) {
362 use std::cmp::Ordering;
363 use std::collections::VecDeque;
364
365 let mut iter1 = list1.into_iter();
366 let mut iter2 = list2.into_iter();
367 let mut q1 = VecDeque::<(D,T,R)>::from(iter1.next().unwrap_or_default());
368 let mut q2 = VecDeque::<(D,T,R)>::from(iter2.next().unwrap_or_default());
369
370 let mut result = self.empty(stash);
371
372 while let (Some((d1, t1, _)), Some((d2, t2, _))) = (q1.front(), q2.front()) {
374 match (d1, t1).cmp(&(d2, t2)) {
375 Ordering::Less => {
376 result.push(q1.pop_front().unwrap());
377 }
378 Ordering::Greater => {
379 result.push(q2.pop_front().unwrap());
380 }
381 Ordering::Equal => {
382 let (d, t, mut r1) = q1.pop_front().unwrap();
383 let (_, _, r2) = q2.pop_front().unwrap();
384 r1.plus_equals(&r2);
385 if !r1.is_zero() {
386 result.push((d, t, r1));
387 }
388 }
389 }
390
391 if result.at_capacity() {
392 output.push(std::mem::take(&mut result));
393 result = self.empty(stash);
394 }
395
396 if q1.is_empty() { Self::refill(&mut q1, &mut iter1, stash); }
398 if q2.is_empty() { Self::refill(&mut q2, &mut iter2, stash); }
399 }
400
401 if !result.is_empty() { output.push(result); }
403 for q in [q1, q2] {
404 if !q.is_empty() { output.push(Vec::from(q)); }
405 }
406 output.extend(iter1);
407 output.extend(iter2);
408 }
409
410 fn extract(
411 &mut self,
412 merged: Vec<Vec<(D, T, R)>>,
413 upper: AntichainRef<T>,
414 frontier: &mut Antichain<T>,
415 ship: &mut Vec<Vec<(D, T, R)>>,
416 kept: &mut Vec<Vec<(D, T, R)>>,
417 stash: &mut Vec<Vec<(D, T, R)>>,
418 ) {
419 let mut keep = self.empty(stash);
420 let mut ready = self.empty(stash);
421
422 for mut chunk in merged {
423 for (data, time, diff) in chunk.drain(..) {
425 if upper.less_equal(&time) {
426 frontier.insert_with(&time, |time| time.clone());
427 keep.push((data, time, diff));
428 } else {
429 ready.push((data, time, diff));
430 }
431 if keep.at_capacity() {
432 kept.push(std::mem::take(&mut keep));
433 keep = self.empty(stash);
434 }
435 if ready.at_capacity() {
436 ship.push(std::mem::take(&mut ready));
437 ready = self.empty(stash);
438 }
439 }
440 if chunk.capacity() == Self::target_capacity() {
442 stash.push(chunk);
443 }
444 }
445 if !keep.is_empty() { kept.push(keep); }
446 if !ready.is_empty() { ship.push(ready); }
447 }
448
449 fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) {
450 (chunk.len(), 0, 0, 0)
451 }
452 }
453
454 pub struct InternalMerger<MC> {
456 _marker: PhantomData<MC>,
457 }
458
459 impl<MC> Default for InternalMerger<MC> {
460 fn default() -> Self { Self { _marker: PhantomData } }
461 }
462
463 impl<MC> InternalMerger<MC> where MC: InternalMerge {
464 #[inline]
465 fn empty(&self, stash: &mut Vec<MC>) -> MC {
466 stash.pop().unwrap_or_else(|| {
467 let mut container = MC::default();
468 container.ensure_capacity(&mut None);
469 container
470 })
471 }
472 #[inline]
473 fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
474 chunk.clear();
475 stash.push(chunk);
476 }
477 fn drain_side(
482 &self,
483 head: &mut MC,
484 pos: &mut usize,
485 list: &mut std::vec::IntoIter<MC>,
486 result: &mut MC,
487 output: &mut Vec<MC>,
488 stash: &mut Vec<MC>,
489 ) {
490 if *pos < head.len() {
492 result.merge_from(
493 std::slice::from_mut(head),
494 std::slice::from_mut(pos),
495 );
496 }
497 if !result.is_empty() {
499 output.push(std::mem::take(result));
500 *result = self.empty(stash);
501 }
502 output.extend(list);
504 }
505 }
506
507 impl<MC> Merger for InternalMerger<MC>
508 where
509 MC: InternalMerge<TimeOwned: Ord + PartialOrder + Clone + 'static> + 'static,
510 {
511 type Time = MC::TimeOwned;
512 type Chunk = MC;
513
514 fn merge(&mut self, list1: Vec<MC>, list2: Vec<MC>, output: &mut Vec<MC>, stash: &mut Vec<MC>) {
515 let mut list1 = list1.into_iter();
516 let mut list2 = list2.into_iter();
517
518 let mut heads = [list1.next().unwrap_or_default(), list2.next().unwrap_or_default()];
519 let mut positions = [0usize, 0usize];
520
521 let mut result = self.empty(stash);
522
523 while positions[0] < heads[0].len() && positions[1] < heads[1].len() {
525 result.merge_from(&mut heads, &mut positions);
526
527 if positions[0] >= heads[0].len() {
528 let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
529 self.recycle(old, stash);
530 positions[0] = 0;
531 }
532 if positions[1] >= heads[1].len() {
533 let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
534 self.recycle(old, stash);
535 positions[1] = 0;
536 }
537 if result.at_capacity() {
538 output.push(std::mem::take(&mut result));
539 result = self.empty(stash);
540 }
541 }
542
543 self.drain_side(&mut heads[0], &mut positions[0], &mut list1, &mut result, output, stash);
545 self.drain_side(&mut heads[1], &mut positions[1], &mut list2, &mut result, output, stash);
546 if !result.is_empty() {
547 output.push(result);
548 }
549 }
550
551 fn extract(
552 &mut self,
553 merged: Vec<Self::Chunk>,
554 upper: AntichainRef<Self::Time>,
555 frontier: &mut Antichain<Self::Time>,
556 ship: &mut Vec<Self::Chunk>,
557 kept: &mut Vec<Self::Chunk>,
558 stash: &mut Vec<Self::Chunk>,
559 ) {
560 let mut keep = self.empty(stash);
561 let mut ready = self.empty(stash);
562
563 for mut buffer in merged {
564 let mut position = 0;
565 let len = buffer.len();
566 while position < len {
567 buffer.extract(&mut position, upper, frontier, &mut keep, &mut ready);
568 if keep.at_capacity() {
569 kept.push(std::mem::take(&mut keep));
570 keep = self.empty(stash);
571 }
572 if ready.at_capacity() {
573 ship.push(std::mem::take(&mut ready));
574 ready = self.empty(stash);
575 }
576 }
577 self.recycle(buffer, stash);
578 }
579 if !keep.is_empty() {
580 kept.push(keep);
581 }
582 if !ready.is_empty() {
583 ship.push(ready);
584 }
585 }
586
587 fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
588 chunk.account()
589 }
590 }
591
592 pub mod vec_internal {
598 use std::cmp::Ordering;
599 use timely::PartialOrder;
600 use timely::container::SizableContainer;
601 use timely::progress::frontier::{Antichain, AntichainRef};
602 use crate::difference::Semigroup;
603 use super::InternalMerge;
604
605 impl<D: Ord + Clone + 'static, T: Ord + Clone + PartialOrder + 'static, R: Semigroup + Clone + 'static> InternalMerge for Vec<(D, T, R)> {
606 type TimeOwned = T;
607
608 fn len(&self) -> usize { Vec::len(self) }
609 fn clear(&mut self) { Vec::clear(self) }
610
611 fn merge_from(
612 &mut self,
613 others: &mut [Self],
614 positions: &mut [usize],
615 ) {
616 match others.len() {
617 0 => {},
618 1 => {
619 let other = &mut others[0];
620 let pos = &mut positions[0];
621 if self.is_empty() && *pos == 0 {
622 std::mem::swap(self, other);
623 return;
624 }
625 self.extend_from_slice(&other[*pos ..]);
626 *pos = other.len();
627 },
628 2 => {
629 let (left, right) = others.split_at_mut(1);
630 let other1 = &mut left[0];
631 let other2 = &mut right[0];
632
633 while positions[0] < other1.len() && positions[1] < other2.len() && !self.at_capacity() {
634 let (d1, t1, _) = &other1[positions[0]];
635 let (d2, t2, _) = &other2[positions[1]];
636 match (d1, t1).cmp(&(d2, t2)) {
638 Ordering::Less => {
639 self.push(other1[positions[0]].clone());
640 positions[0] += 1;
641 }
642 Ordering::Greater => {
643 self.push(other2[positions[1]].clone());
644 positions[1] += 1;
645 }
646 Ordering::Equal => {
647 let (d, t, mut r1) = other1[positions[0]].clone();
648 let (_, _, ref r2) = other2[positions[1]];
649 r1.plus_equals(r2);
650 if !r1.is_zero() {
651 self.push((d, t, r1));
652 }
653 positions[0] += 1;
654 positions[1] += 1;
655 }
656 }
657 }
658 },
659 n => unimplemented!("{n}-way merge not yet supported"),
660 }
661 }
662
663 fn extract(
664 &mut self,
665 position: &mut usize,
666 upper: AntichainRef<T>,
667 frontier: &mut Antichain<T>,
668 keep: &mut Self,
669 ship: &mut Self,
670 ) {
671 let len = self.len();
672 while *position < len && !keep.at_capacity() && !ship.at_capacity() {
673 let (data, time, diff) = self[*position].clone();
674 if upper.less_equal(&time) {
675 frontier.insert_with(&time, |time| time.clone());
676 keep.push((data, time, diff));
677 } else {
678 ship.push((data, time, diff));
679 }
680 *position += 1;
681 }
682 }
683 }
684 }
685
686 pub mod columnation_internal {
688 use std::cmp::Ordering;
689 use columnation::Columnation;
690 use timely::PartialOrder;
691 use timely::container::SizableContainer;
692 use timely::progress::frontier::{Antichain, AntichainRef};
693 use crate::containers::TimelyStack;
694 use crate::difference::Semigroup;
695 use super::InternalMerge;
696
697 impl<D, T, R> InternalMerge for TimelyStack<(D, T, R)>
698 where
699 D: Ord + Columnation + Clone + 'static,
700 T: Ord + Columnation + Clone + PartialOrder + 'static,
701 R: Default + Semigroup + Columnation + Clone + 'static,
702 {
703 type TimeOwned = T;
704
705 fn len(&self) -> usize { self[..].len() }
706 fn clear(&mut self) { TimelyStack::clear(self) }
707
708 fn account(&self) -> (usize, usize, usize, usize) {
709 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
710 let cb = |siz, cap| {
711 size += siz;
712 capacity += cap;
713 allocations += 1;
714 };
715 self.heap_size(cb);
716 (self.len(), size, capacity, allocations)
717 }
718
719 fn merge_from(
720 &mut self,
721 others: &mut [Self],
722 positions: &mut [usize],
723 ) {
724 match others.len() {
725 0 => {},
726 1 => {
727 let other = &mut others[0];
728 let pos = &mut positions[0];
729 if self[..].is_empty() && *pos == 0 {
730 std::mem::swap(self, other);
731 return;
732 }
733 for i in *pos .. other[..].len() {
734 self.copy(&other[i]);
735 }
736 *pos = other[..].len();
737 },
738 2 => {
739 let (left, right) = others.split_at_mut(1);
740 let other1 = &left[0];
741 let other2 = &right[0];
742
743 let mut stash = R::default();
744
745 while positions[0] < other1[..].len() && positions[1] < other2[..].len() && !self.at_capacity() {
746 let (d1, t1, _) = &other1[positions[0]];
747 let (d2, t2, _) = &other2[positions[1]];
748 match (d1, t1).cmp(&(d2, t2)) {
749 Ordering::Less => {
750 self.copy(&other1[positions[0]]);
751 positions[0] += 1;
752 }
753 Ordering::Greater => {
754 self.copy(&other2[positions[1]]);
755 positions[1] += 1;
756 }
757 Ordering::Equal => {
758 let (_, _, r1) = &other1[positions[0]];
759 let (_, _, r2) = &other2[positions[1]];
760 stash.clone_from(r1);
761 stash.plus_equals(r2);
762 if !stash.is_zero() {
763 let (d, t, _) = &other1[positions[0]];
764 self.copy_destructured(d, t, &stash);
765 }
766 positions[0] += 1;
767 positions[1] += 1;
768 }
769 }
770 }
771 },
772 n => unimplemented!("{n}-way merge not yet supported"),
773 }
774 }
775
776 fn extract(
777 &mut self,
778 position: &mut usize,
779 upper: AntichainRef<T>,
780 frontier: &mut Antichain<T>,
781 keep: &mut Self,
782 ship: &mut Self,
783 ) {
784 let len = self[..].len();
785 while *position < len && !keep.at_capacity() && !ship.at_capacity() {
786 let (data, time, diff) = &self[*position];
787 if upper.less_equal(time) {
788 frontier.insert_with(time, |time| time.clone());
789 keep.copy_destructured(data, time, diff);
790 } else {
791 ship.copy_destructured(data, time, diff);
792 }
793 *position += 1;
794 }
795 }
796 }
797 }
798}