1use std::marker::PhantomData;
14
15use timely::progress::frontier::AntichainRef;
16use timely::progress::{frontier::Antichain, Timestamp};
17use timely::container::{ContainerBuilder, PushInto};
18
19use crate::logging::{BatcherEvent, Logger};
20use crate::trace::{Batcher, Builder, Description};
21
/// A batcher that organizes updates into sorted chains of chunks and merges
/// them on demand.
///
/// Inputs are chunked by `C`; chunks form chains that are merged by `M` as
/// chains accumulate and at `seal` time, when all updates with times not
/// beyond a supplied frontier are extracted into a batch.
pub struct MergeBatcher<Input, C, M: Merger> {
    /// Transforms input containers into chunks of type `M::Chunk`.
    chunker: C,
    /// Chains of chunks pending merging; `insert_chain` keeps chain lengths
    /// roughly geometrically decreasing from front to back.
    chains: Vec<Vec<M::Chunk>>,
    /// Spare chunk allocations available for reuse by the merger.
    stash: Vec<M::Chunk>,
    /// The merge logic applied to chains of chunks.
    merger: M,
    /// Lower bound on times in future batches; advanced to `upper` by `seal`.
    lower: Antichain<M::Time>,
    /// Frontier of times retained (not shipped) by the most recent `seal`.
    frontier: Antichain<M::Time>,
    /// Optional logger for `BatcherEvent` accounting.
    logger: Option<Logger>,
    /// Operator identifier attached to logged events.
    operator_id: usize,
    /// Records the `Input` type parameter without storing a value of it.
    _marker: PhantomData<Input>,
}
48
impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
where
    C: ContainerBuilder<Container=M::Chunk> + for<'a> PushInto<&'a mut Input>,
    M: Merger<Time: Timestamp>,
{
    type Input = Input;
    type Time = M::Time;
    type Output = M::Chunk;

    /// Creates an empty batcher with an optional logger and operator id.
    fn new(logger: Option<Logger>, operator_id: usize) -> Self {
        Self {
            logger,
            operator_id,
            chunker: C::default(),
            merger: M::default(),
            chains: Vec::new(),
            stash: Vec::new(),
            frontier: Antichain::new(),
            // Every future batch is bounded below by the minimum timestamp.
            lower: Antichain::from_elem(M::Time::minimum()),
            _marker: PhantomData,
        }
    }

    /// Buffers `container`, inserting each chunk the chunker completes as a
    /// singleton chain (which may trigger chain merging).
    fn push_container(&mut self, container: &mut Input) {
        self.chunker.push_into(container);
        while let Some(chunk) = self.chunker.extract() {
            let chunk = std::mem::take(chunk);
            self.insert_chain(vec![chunk]);
        }
    }

    /// Seals and returns a batch of all updates with times not beyond `upper`.
    ///
    /// Flushes the chunker, merges all chains into one, then splits the result
    /// into shipped updates (before `upper`) and retained updates (at or beyond
    /// `upper`). Refreshes `self.frontier` with the retained times and advances
    /// `self.lower` to `upper`.
    fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
        // Flush any partial chunk still buffered in the chunker.
        while let Some(chunk) = self.chunker.finish() {
            let chunk = std::mem::take(chunk);
            self.insert_chain(vec![chunk]);
        }

        // Merge chains pairwise until at most one remains.
        while self.chains.len() > 1 {
            let list1 = self.chain_pop().unwrap();
            let list2 = self.chain_pop().unwrap();
            let merged = self.merge_by(list1, list2);
            self.chain_push(merged);
        }
        let merged = self.chain_pop().unwrap_or_default();

        // Partition `merged` by `upper`; `frontier` is rebuilt from the times
        // of retained updates, so clear it first.
        let mut kept = Vec::new();
        let mut readied = Vec::new();
        self.frontier.clear();

        self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);

        if !kept.is_empty() {
            self.chain_push(kept);
        }

        // Drop spare allocations rather than hold them across seals.
        self.stash.clear();

        // Batch bounds are [lower, upper); the third frontier is the minimum
        // time (presumably the `since`/compaction frontier — i.e. none).
        let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
        let seal = B::seal(&mut readied, description);
        self.lower = upper;
        seal
    }

    /// Reports the frontier of retained times, as of the most recent `seal`.
    #[inline]
    fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
        self.frontier.borrow()
    }
}
127
128impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
129 fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
132 if !chain.is_empty() {
133 self.chain_push(chain);
134 while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
135 let list1 = self.chain_pop().unwrap();
136 let list2 = self.chain_pop().unwrap();
137 let merged = self.merge_by(list1, list2);
138 self.chain_push(merged);
139 }
140 }
141 }
142
143 fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
145 let mut output = Vec::with_capacity(list1.len() + list2.len());
147 self.merger.merge(list1, list2, &mut output, &mut self.stash);
148
149 output
150 }
151
152 #[inline]
154 fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
155 let chain = self.chains.pop();
156 self.account(chain.iter().flatten().map(M::account), -1);
157 chain
158 }
159
160 #[inline]
162 fn chain_push(&mut self, chain: Vec<M::Chunk>) {
163 self.account(chain.iter().map(M::account), 1);
164 self.chains.push(chain);
165 }
166
167 #[inline]
172 fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
173 if let Some(logger) = &self.logger {
174 let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
175 for (records_, size_, capacity_, allocations_) in items {
176 records = records.saturating_add_unsigned(records_);
177 size = size.saturating_add_unsigned(size_);
178 capacity = capacity.saturating_add_unsigned(capacity_);
179 allocations = allocations.saturating_add_unsigned(allocations_);
180 }
181 logger.log(BatcherEvent {
182 operator: self.operator_id,
183 records_diff: records * diff,
184 size_diff: size * diff,
185 capacity_diff: capacity * diff,
186 allocations_diff: allocations * diff,
187 })
188 }
189 }
190}
191
192impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
193 fn drop(&mut self) {
194 while self.chain_pop().is_some() {}
196 }
197}
198
/// Merge behavior over chains of chunks, parameterized by chunk and time type.
pub trait Merger: Default {
    /// The unit of storage; a chain is a `Vec` of chunks.
    type Chunk: Default;
    /// The timestamp type carried by updates.
    type Time;
    /// Merges two chains into `output`, exchanging spare allocations via `stash`.
    fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
    /// Partitions `merged` by `upper`: updates with times not beyond `upper`
    /// go to `readied`, the rest to `kept`. Times of kept updates are folded
    /// into `frontier`; spare allocations may be exchanged through `stash`.
    fn extract(
        &mut self,
        merged: Vec<Self::Chunk>,
        upper: AntichainRef<Self::Time>,
        frontier: &mut Antichain<Self::Time>,
        readied: &mut Vec<Self::Chunk>,
        kept: &mut Vec<Self::Chunk>,
        stash: &mut Vec<Self::Chunk>,
    );

    /// Reports a chunk's footprint as `(records, size, capacity, allocations)`.
    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
}
221
222pub use container::InternalMerger;
223
224pub mod container {
225
226 use std::marker::PhantomData;
234 use timely::container::SizableContainer;
235 use timely::progress::frontier::{Antichain, AntichainRef};
236 use timely::{Accountable, PartialOrder};
237 use crate::trace::implementations::merge_batcher::Merger;
238
    /// Container behavior required by [`InternalMerger`]: pairwise merging of
    /// sorted runs of `(data, time, diff)` updates and frontier-based
    /// extraction.
    pub trait InternalMerge: Accountable + SizableContainer + Default {
        /// The owned timestamp type of contained updates.
        type TimeOwned;

        /// The number of updates in the container.
        fn len(&self) -> usize;

        /// Removes all updates; implementations may retain allocations.
        fn clear(&mut self);

        /// Reports `(records, size, capacity, allocations)`; by default only
        /// the record count is populated.
        fn account(&self) -> (usize, usize, usize, usize) {
            let (size, capacity, allocations) = (0, 0, 0);
            (usize::try_from(self.record_count()).unwrap(), size, capacity, allocations)
        }

        /// Merges updates from `others` into `self`, starting at the given
        /// `positions` and advancing them past consumed updates. Existing
        /// implementations support one other (append/swap the remainder) and
        /// two others (two-way merge, bounded by `self`'s capacity).
        fn merge_from(
            &mut self,
            others: &mut [Self],
            positions: &mut [usize],
        );

        /// Routes each update by `upper`: times at or beyond `upper` go to
        /// `keep` (recorded in `frontier`), others to `ship`. Implementations
        /// may drain `self` or leave clearing to the caller.
        fn extract(
            &mut self,
            upper: AntichainRef<Self::TimeOwned>,
            frontier: &mut Antichain<Self::TimeOwned>,
            keep: &mut Self,
            ship: &mut Self,
        );
    }
280
    /// Merger for chunks of type `Vec<(D, T, R)>`.
    pub type VecInternalMerger<D, T, R> = VecMerger<D, T, R>;
    /// Merger for columnar `TimelyStack<(D, T, R)>` chunks.
    pub type ColInternalMerger<D, T, R> = InternalMerger<crate::containers::TimelyStack<(D, T, R)>>;
285
    /// A stateless [`Merger`] for chunks of type `Vec<(D, T, R)>`.
    pub struct VecMerger<D, T, R> {
        /// Records the element type parameters without storing them.
        _marker: PhantomData<(D, T, R)>,
    }

    impl<D, T, R> Default for VecMerger<D, T, R> {
        fn default() -> Self { Self { _marker: PhantomData } }
    }
294
295 impl<D, T, R> VecMerger<D, T, R> {
296 fn target_capacity() -> usize {
301 timely::container::buffer::default_capacity::<(D, T, R)>().next_power_of_two()
302 }
303 fn empty(&self, stash: &mut Vec<Vec<(D, T, R)>>) -> Vec<(D, T, R)> {
305 let target = Self::target_capacity();
306 let mut container = stash.pop().unwrap_or_default();
307 container.clear();
308 if container.capacity() != target {
310 container = Vec::with_capacity(target);
311 }
312 container
313 }
314 fn refill(queue: &mut std::collections::VecDeque<(D, T, R)>, iter: &mut impl Iterator<Item = Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) {
316 if queue.is_empty() {
317 let target = Self::target_capacity();
318 if stash.len() < 2 {
319 let mut recycled = Vec::from(std::mem::take(queue));
320 recycled.clear();
321 if recycled.capacity() == target {
322 stash.push(recycled);
323 }
324 }
325 if let Some(chunk) = iter.next() {
326 *queue = std::collections::VecDeque::from(chunk);
327 }
328 }
329 }
330 }
331
    impl<D, T, R> Merger for VecMerger<D, T, R>
    where
        D: Ord + Clone + 'static,
        T: Ord + Clone + PartialOrder + 'static,
        R: crate::difference::Semigroup + Clone + 'static,
    {
        type Chunk = Vec<(D, T, R)>;
        type Time = T;

        /// Merges two chains of sorted chunks into `output`.
        ///
        /// Streams both chains through a pair of queues, ordering updates by
        /// `(data, time)`, summing diffs of equal keys and dropping zero-diff
        /// results. Full result chunks are shipped to `output`; exhausted
        /// chunk allocations are recycled via `stash` where possible.
        fn merge(
            &mut self,
            list1: Vec<Vec<(D, T, R)>>,
            list2: Vec<Vec<(D, T, R)>>,
            output: &mut Vec<Vec<(D, T, R)>>,
            stash: &mut Vec<Vec<(D, T, R)>>,
        ) {
            use std::cmp::Ordering;
            use std::collections::VecDeque;

            let mut iter1 = list1.into_iter();
            let mut iter2 = list2.into_iter();
            // Heads of each chain, as queues we can pop from the front.
            let mut q1 = VecDeque::<(D,T,R)>::from(iter1.next().unwrap_or_default());
            let mut q2 = VecDeque::<(D,T,R)>::from(iter2.next().unwrap_or_default());

            let mut result = self.empty(stash);

            // While both queues have a front element, merge by (data, time).
            while let (Some((d1, t1, _)), Some((d2, t2, _))) = (q1.front(), q2.front()) {
                match (d1, t1).cmp(&(d2, t2)) {
                    Ordering::Less => {
                        result.push(q1.pop_front().unwrap());
                    }
                    Ordering::Greater => {
                        result.push(q2.pop_front().unwrap());
                    }
                    Ordering::Equal => {
                        // Equal keys: consolidate diffs; keep only non-zero.
                        let (d, t, mut r1) = q1.pop_front().unwrap();
                        let (_, _, r2) = q2.pop_front().unwrap();
                        r1.plus_equals(&r2);
                        if !r1.is_zero() {
                            result.push((d, t, r1));
                        }
                    }
                }

                // Ship full result chunks and continue with a fresh buffer.
                if result.at_capacity() {
                    output.push(std::mem::take(&mut result));
                    result = self.empty(stash);
                }

                // Replenish whichever queue just ran dry.
                if q1.is_empty() { Self::refill(&mut q1, &mut iter1, stash); }
                if q2.is_empty() { Self::refill(&mut q2, &mut iter2, stash); }
            }

            // At most one side still has data; flush the partial result, then
            // the surviving queue and any untouched chunks, in order.
            if !result.is_empty() { output.push(result); }
            for q in [q1, q2] {
                if !q.is_empty() { output.push(Vec::from(q)); }
            }
            output.extend(iter1);
            output.extend(iter2);
        }

        /// Splits `merged` by `upper`: updates with times at or beyond `upper`
        /// go to `kept` (their times recorded in `frontier`), the rest to
        /// `ship`. Full chunks are emitted as they fill.
        fn extract(
            &mut self,
            merged: Vec<Vec<(D, T, R)>>,
            upper: AntichainRef<T>,
            frontier: &mut Antichain<T>,
            ship: &mut Vec<Vec<(D, T, R)>>,
            kept: &mut Vec<Vec<(D, T, R)>>,
            stash: &mut Vec<Vec<(D, T, R)>>,
        ) {
            let mut keep = self.empty(stash);
            let mut ready = self.empty(stash);

            for chunk in merged {
                for (data, time, diff) in chunk {
                    if upper.less_equal(&time) {
                        frontier.insert_with(&time, |time| time.clone());
                        keep.push((data, time, diff));
                    } else {
                        ready.push((data, time, diff));
                    }
                }
                if keep.at_capacity() {
                    kept.push(std::mem::take(&mut keep));
                    keep = self.empty(stash);
                }
                if ready.at_capacity() {
                    ship.push(std::mem::take(&mut ready));
                    ready = self.empty(stash);
                }
            }
            if !keep.is_empty() { kept.push(keep); }
            if !ready.is_empty() { ship.push(ready); }
        }

        /// Accounts only the record count; `Vec` heap usage is not reported.
        fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) {
            (chunk.len(), 0, 0, 0)
        }
    }
434
    /// A stateless [`Merger`] that delegates update-level merging to a
    /// container implementing [`InternalMerge`].
    pub struct InternalMerger<MC> {
        /// Records the container type parameter without storing it.
        _marker: PhantomData<MC>,
    }

    impl<MC> Default for InternalMerger<MC> {
        fn default() -> Self { Self { _marker: PhantomData } }
    }
443
    impl<MC> InternalMerger<MC> where MC: InternalMerge {
        /// Produces an empty container, reusing a stashed one when available
        /// and otherwise allocating with default capacity ensured.
        #[inline]
        fn empty(&self, stash: &mut Vec<MC>) -> MC {
            stash.pop().unwrap_or_else(|| {
                let mut container = MC::default();
                container.ensure_capacity(&mut None);
                container
            })
        }
        /// Clears `chunk` and returns its allocation to `stash` for reuse.
        #[inline]
        fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
            chunk.clear();
            stash.push(chunk);
        }
        /// Flushes one side of a merge once the other side is exhausted:
        /// copies the unread remainder of `head` into `result` (a one-other
        /// `merge_from`, which ignores capacity), ships `result` if non-empty,
        /// and moves the side's untouched containers into `output`.
        fn drain_side(
            &self,
            head: &mut MC,
            pos: &mut usize,
            list: &mut std::vec::IntoIter<MC>,
            result: &mut MC,
            output: &mut Vec<MC>,
            stash: &mut Vec<MC>,
        ) {
            if *pos < head.len() {
                result.merge_from(
                    std::slice::from_mut(head),
                    std::slice::from_mut(pos),
                );
            }
            if !result.is_empty() {
                output.push(std::mem::take(result));
                // Leave the caller with a fresh (empty) result container.
                *result = self.empty(stash);
            }
            output.extend(list);
        }
    }
487
    impl<MC> Merger for InternalMerger<MC>
    where
        MC: InternalMerge<TimeOwned: Ord + PartialOrder + Clone + 'static> + 'static,
    {
        type Time = MC::TimeOwned;
        type Chunk = MC;

        /// Merges two chains of sorted containers into `output`.
        ///
        /// Repeatedly two-way merges the head containers of both chains via
        /// `merge_from`, recycling exhausted containers into `stash`, and
        /// finally drains whichever side still has data.
        fn merge(&mut self, list1: Vec<MC>, list2: Vec<MC>, output: &mut Vec<MC>, stash: &mut Vec<MC>) {
            let mut list1 = list1.into_iter();
            let mut list2 = list2.into_iter();

            // Current containers and read positions for each side.
            let mut heads = [list1.next().unwrap_or_default(), list2.next().unwrap_or_default()];
            let mut positions = [0usize, 0usize];

            let mut result = self.empty(stash);

            // While both sides have unread updates, merge into `result`.
            while positions[0] < heads[0].len() && positions[1] < heads[1].len() {
                result.merge_from(&mut heads, &mut positions);

                // Replace a consumed head with the next container on its side.
                if positions[0] >= heads[0].len() {
                    let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
                    self.recycle(old, stash);
                    positions[0] = 0;
                }
                if positions[1] >= heads[1].len() {
                    let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
                    self.recycle(old, stash);
                    positions[1] = 0;
                }
                // Ship full result containers and continue with a fresh one.
                if result.at_capacity() {
                    output.push(std::mem::take(&mut result));
                    result = self.empty(stash);
                }
            }

            // At most one side still has data; flush its head remainder and
            // untouched containers, in order.
            self.drain_side(&mut heads[0], &mut positions[0], &mut list1, &mut result, output, stash);
            self.drain_side(&mut heads[1], &mut positions[1], &mut list2, &mut result, output, stash);
            if !result.is_empty() {
                output.push(result);
            }
        }

        /// Splits `merged` by `upper`: updates at or beyond `upper` go to
        /// `kept` (their times recorded in `frontier`), the rest to `ship`.
        /// Consumed containers are recycled into `stash`.
        fn extract(
            &mut self,
            merged: Vec<Self::Chunk>,
            upper: AntichainRef<Self::Time>,
            frontier: &mut Antichain<Self::Time>,
            ship: &mut Vec<Self::Chunk>,
            kept: &mut Vec<Self::Chunk>,
            stash: &mut Vec<Self::Chunk>,
        ) {
            let mut keep = self.empty(stash);
            let mut ready = self.empty(stash);

            for mut buffer in merged {
                buffer.extract(upper, frontier, &mut keep, &mut ready);
                self.recycle(buffer, stash);
                // Ship full containers and continue with fresh ones.
                if keep.at_capacity() {
                    kept.push(std::mem::take(&mut keep));
                    keep = self.empty(stash);
                }
                if ready.at_capacity() {
                    ship.push(std::mem::take(&mut ready));
                    ready = self.empty(stash);
                }
            }
            if !keep.is_empty() {
                kept.push(keep);
            }
            if !ready.is_empty() {
                ship.push(ready);
            }
        }

        /// Delegates accounting to the container implementation.
        fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
            chunk.account()
        }
    }
568
569 pub mod vec_internal {
575 use std::cmp::Ordering;
576 use timely::PartialOrder;
577 use timely::container::SizableContainer;
578 use timely::progress::frontier::{Antichain, AntichainRef};
579 use crate::difference::Semigroup;
580 use super::InternalMerge;
581
        impl<D: Ord + Clone + 'static, T: Ord + Clone + PartialOrder + 'static, R: Semigroup + Clone + 'static> InternalMerge for Vec<(D, T, R)> {
            type TimeOwned = T;

            fn len(&self) -> usize { Vec::len(self) }
            fn clear(&mut self) { Vec::clear(self) }

            /// Merges from one or two other vectors into `self`.
            ///
            /// With one other: appends its remainder (or swaps allocations
            /// when `self` is empty and nothing has been consumed) — this
            /// path ignores capacity. With two others: two-way merge ordered
            /// by `(data, time)`, summing diffs of equal keys and dropping
            /// zero-diff results, until a side or `self`'s capacity runs out.
            fn merge_from(
                &mut self,
                others: &mut [Self],
                positions: &mut [usize],
            ) {
                match others.len() {
                    0 => {},
                    1 => {
                        let other = &mut others[0];
                        let pos = &mut positions[0];
                        // Take the whole allocation when `self` has nothing yet.
                        if self.is_empty() && *pos == 0 {
                            std::mem::swap(self, other);
                            return;
                        }
                        self.extend_from_slice(&other[*pos ..]);
                        *pos = other.len();
                    },
                    2 => {
                        let (left, right) = others.split_at_mut(1);
                        let other1 = &mut left[0];
                        let other2 = &mut right[0];

                        while positions[0] < other1.len() && positions[1] < other2.len() && !self.at_capacity() {
                            let (d1, t1, _) = &other1[positions[0]];
                            let (d2, t2, _) = &other2[positions[1]];
                            match (d1, t1).cmp(&(d2, t2)) {
                                Ordering::Less => {
                                    self.push(other1[positions[0]].clone());
                                    positions[0] += 1;
                                }
                                Ordering::Greater => {
                                    self.push(other2[positions[1]].clone());
                                    positions[1] += 1;
                                }
                                Ordering::Equal => {
                                    // Equal keys: consolidate diffs; keep only
                                    // non-zero results.
                                    let (d, t, mut r1) = other1[positions[0]].clone();
                                    let (_, _, ref r2) = other2[positions[1]];
                                    r1.plus_equals(r2);
                                    if !r1.is_zero() {
                                        self.push((d, t, r1));
                                    }
                                    positions[0] += 1;
                                    positions[1] += 1;
                                }
                            }
                        }
                    },
                    n => unimplemented!("{n}-way merge not yet supported"),
                }
            }

            /// Drains `self`, routing each update by `upper`: times at or
            /// beyond `upper` go to `keep` (recorded in `frontier`), others
            /// to `ship`.
            fn extract(
                &mut self,
                upper: AntichainRef<T>,
                frontier: &mut Antichain<T>,
                keep: &mut Self,
                ship: &mut Self,
            ) {
                for (data, time, diff) in self.drain(..) {
                    if upper.less_equal(&time) {
                        frontier.insert_with(&time, |time| time.clone());
                        keep.push((data, time, diff));
                    } else {
                        ship.push((data, time, diff));
                    }
                }
            }
        }
656 }
657
658 pub mod columnation_internal {
660 use std::cmp::Ordering;
661 use columnation::Columnation;
662 use timely::PartialOrder;
663 use timely::container::SizableContainer;
664 use timely::progress::frontier::{Antichain, AntichainRef};
665 use crate::containers::TimelyStack;
666 use crate::difference::Semigroup;
667 use super::InternalMerge;
668
        impl<D, T, R> InternalMerge for TimelyStack<(D, T, R)>
        where
            D: Ord + Columnation + Clone + 'static,
            T: Ord + Columnation + Clone + PartialOrder + 'static,
            R: Default + Semigroup + Columnation + Clone + 'static,
        {
            type TimeOwned = T;

            fn len(&self) -> usize { self[..].len() }
            fn clear(&mut self) { TimelyStack::clear(self) }

            /// Accounts the record count plus heap size, capacity, and
            /// allocation count as reported by `heap_size`.
            fn account(&self) -> (usize, usize, usize, usize) {
                let (mut size, mut capacity, mut allocations) = (0, 0, 0);
                let cb = |siz, cap| {
                    size += siz;
                    capacity += cap;
                    allocations += 1;
                };
                self.heap_size(cb);
                (self.len(), size, capacity, allocations)
            }

            /// Merges from one or two other stacks into `self` by copying.
            ///
            /// With one other: copies its remainder (or swaps when `self` is
            /// empty and nothing consumed). With two others: two-way merge
            /// ordered by `(data, time)`, consolidating diffs of equal keys
            /// into a scratch `R` and copying only non-zero results, until a
            /// side or `self`'s capacity runs out.
            fn merge_from(
                &mut self,
                others: &mut [Self],
                positions: &mut [usize],
            ) {
                match others.len() {
                    0 => {},
                    1 => {
                        let other = &mut others[0];
                        let pos = &mut positions[0];
                        // Take the whole allocation when `self` has nothing yet.
                        if self[..].is_empty() && *pos == 0 {
                            std::mem::swap(self, other);
                            return;
                        }
                        for i in *pos .. other[..].len() {
                            self.copy(&other[i]);
                        }
                        *pos = other[..].len();
                    },
                    2 => {
                        let (left, right) = others.split_at_mut(1);
                        let other1 = &left[0];
                        let other2 = &right[0];

                        // Scratch diff, reused to avoid per-update allocation.
                        let mut stash = R::default();

                        while positions[0] < other1[..].len() && positions[1] < other2[..].len() && !self.at_capacity() {
                            let (d1, t1, _) = &other1[positions[0]];
                            let (d2, t2, _) = &other2[positions[1]];
                            match (d1, t1).cmp(&(d2, t2)) {
                                Ordering::Less => {
                                    self.copy(&other1[positions[0]]);
                                    positions[0] += 1;
                                }
                                Ordering::Greater => {
                                    self.copy(&other2[positions[1]]);
                                    positions[1] += 1;
                                }
                                Ordering::Equal => {
                                    let (_, _, r1) = &other1[positions[0]];
                                    let (_, _, r2) = &other2[positions[1]];
                                    stash.clone_from(r1);
                                    stash.plus_equals(r2);
                                    if !stash.is_zero() {
                                        let (d, t, _) = &other1[positions[0]];
                                        self.copy_destructured(d, t, &stash);
                                    }
                                    positions[0] += 1;
                                    positions[1] += 1;
                                }
                            }
                        }
                    },
                    n => unimplemented!("{n}-way merge not yet supported"),
                }
            }

            /// Copies each update into `keep` or `ship` by comparing its time
            /// to `upper`; kept times are recorded in `frontier`. `self` is
            /// left unchanged here — the caller clears and recycles it.
            fn extract(
                &mut self,
                upper: AntichainRef<T>,
                frontier: &mut Antichain<T>,
                keep: &mut Self,
                ship: &mut Self,
            ) {
                for (data, time, diff) in self.iter() {
                    if upper.less_equal(time) {
                        frontier.insert_with(time, |time| time.clone());
                        keep.copy_destructured(data, time, diff);
                    } else {
                        ship.copy_destructured(data, time, diff);
                    }
                }
            }
        }
765 }
766}