Skip to main content

differential_dataflow/trace/implementations/
merge_batcher.rs

1//! A `Batcher` implementation based on merge sort.
2//!
3//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger".
4//! The chunker receives input batches and consolidates them, producing sorted output
5//! "chunks" that are fully consolidated (no adjacent updates can be accumulated).
6//! The merger implements the [`Merger`] trait, and provides hooks for manipulating
7//! sorted "chains" of chunks as needed by the merge batcher: merging chunks and also
8//! splitting them apart based on time.
9//!
10//! Implementations of `MergeBatcher` can be instantiated through the choice of both
11//! the chunker and the merger, provided their respective output and input types align.
12
13use std::marker::PhantomData;
14
15use timely::progress::frontier::AntichainRef;
16use timely::progress::{frontier::Antichain, Timestamp};
17use timely::container::{ContainerBuilder, PushInto};
18
19use crate::logging::{BatcherEvent, Logger};
20use crate::trace::{Batcher, Builder, Description};
21
22/// Creates batches from containers of unordered tuples.
23///
24/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs,
25/// and must produce outputs of type `M::Chunk`.
26pub struct MergeBatcher<Input, C, M: Merger> {
27    /// Transforms input streams to chunks of sorted, consolidated data.
28    chunker: C,
29    /// A sequence of power-of-two length lists of sorted, consolidated containers.
30    ///
31    /// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]).
32    chains: Vec<Vec<M::Chunk>>,
33    /// Stash of empty chunks, recycled through the merging process.
34    stash: Vec<M::Chunk>,
35    /// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time.
36    merger: M,
37    /// Current lower frontier, we sealed up to here.
38    lower: Antichain<M::Time>,
39    /// The lower-bound frontier of the data, after the last call to seal.
40    frontier: Antichain<M::Time>,
41    /// Logger for size accounting.
42    logger: Option<Logger>,
43    /// Timely operator ID.
44    operator_id: usize,
45    /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
46    _marker: PhantomData<Input>,
47}
48
49impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
50where
51    C: ContainerBuilder<Container=M::Chunk> + for<'a> PushInto<&'a mut Input>,
52    M: Merger<Time: Timestamp>,
53{
54    type Input = Input;
55    type Time = M::Time;
56    type Output = M::Chunk;
57
58    fn new(logger: Option<Logger>, operator_id: usize) -> Self {
59        Self {
60            logger,
61            operator_id,
62            chunker: C::default(),
63            merger: M::default(),
64            chains: Vec::new(),
65            stash: Vec::new(),
66            frontier: Antichain::new(),
67            lower: Antichain::from_elem(M::Time::minimum()),
68            _marker: PhantomData,
69        }
70    }
71
72    /// Push a container of data into this merge batcher. Updates the internal chain structure if
73    /// needed.
74    fn push_container(&mut self, container: &mut Input) {
75        self.chunker.push_into(container);
76        while let Some(chunk) = self.chunker.extract() {
77            let chunk = std::mem::take(chunk);
78            self.insert_chain(vec![chunk]);
79        }
80    }
81
82    // Sealing a batch means finding those updates with times not greater or equal to any time
83    // in `upper`. All updates must have time greater or equal to the previously used `upper`,
84    // which we call `lower`, by assumption that after sealing a batcher we receive no more
85    // updates with times not greater or equal to `upper`.
86    fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
87        // Finish
88        while let Some(chunk) = self.chunker.finish() {
89            let chunk = std::mem::take(chunk);
90            self.insert_chain(vec![chunk]);
91        }
92
93        // Merge all remaining chains into a single chain.
94        while self.chains.len() > 1 {
95            let list1 = self.chain_pop().unwrap();
96            let list2 = self.chain_pop().unwrap();
97            let merged = self.merge_by(list1, list2);
98            self.chain_push(merged);
99        }
100        let merged = self.chain_pop().unwrap_or_default();
101
102        // Extract readied data.
103        let mut kept = Vec::new();
104        let mut readied = Vec::new();
105        self.frontier.clear();
106
107        self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
108
109        if !kept.is_empty() {
110            self.chain_push(kept);
111        }
112
113        self.stash.clear();
114
115        let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
116        let seal = B::seal(&mut readied, description);
117        self.lower = upper;
118        seal
119    }
120
121    /// The frontier of elements remaining after the most recent call to `self.seal`.
122    #[inline]
123    fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
124        self.frontier.borrow()
125    }
126}
127
128impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
129    /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
130    /// by decreasing length.
131    fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
132        if !chain.is_empty() {
133            self.chain_push(chain);
134            while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
135                let list1 = self.chain_pop().unwrap();
136                let list2 = self.chain_pop().unwrap();
137                let merged = self.merge_by(list1, list2);
138                self.chain_push(merged);
139            }
140        }
141    }
142
143    // merges two sorted input lists into one sorted output list.
144    fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
145        // TODO: `list1` and `list2` get dropped; would be better to reuse?
146        let mut output = Vec::with_capacity(list1.len() + list2.len());
147        self.merger.merge(list1, list2, &mut output, &mut self.stash);
148
149        output
150    }
151
152    /// Pop a chain and account size changes.
153    #[inline]
154    fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
155        let chain = self.chains.pop();
156        self.account(chain.iter().flatten().map(M::account), -1);
157        chain
158    }
159
160    /// Push a chain and account size changes.
161    #[inline]
162    fn chain_push(&mut self, chain: Vec<M::Chunk>) {
163        self.account(chain.iter().map(M::account), 1);
164        self.chains.push(chain);
165    }
166
167    /// Account size changes. Only performs work if a logger exists.
168    ///
169    /// Calculate the size based on the iterator passed along, with each attribute
170    /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
171    #[inline]
172    fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
173        if let Some(logger) = &self.logger {
174            let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
175            for (records_, size_, capacity_, allocations_) in items {
176                records = records.saturating_add_unsigned(records_);
177                size = size.saturating_add_unsigned(size_);
178                capacity = capacity.saturating_add_unsigned(capacity_);
179                allocations = allocations.saturating_add_unsigned(allocations_);
180            }
181            logger.log(BatcherEvent {
182                operator: self.operator_id,
183                records_diff: records * diff,
184                size_diff: size * diff,
185                capacity_diff: capacity * diff,
186                allocations_diff: allocations * diff,
187            })
188        }
189    }
190}
191
192impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
193    fn drop(&mut self) {
194        // Cleanup chain to retract accounting information.
195        while self.chain_pop().is_some() {}
196    }
197}
198
199/// A trait to describe interesting moments in a merge batcher.
200pub trait Merger: Default {
201    /// The internal representation of chunks of data.
202    type Chunk: Default;
203    /// The type of time in frontiers to extract updates.
204    type Time;
205    /// Merge chains into an output chain.
206    fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
207    /// Extract ready updates based on the `upper` frontier.
208    fn extract(
209        &mut self,
210        merged: Vec<Self::Chunk>,
211        upper: AntichainRef<Self::Time>,
212        frontier: &mut Antichain<Self::Time>,
213        readied: &mut Vec<Self::Chunk>,
214        kept: &mut Vec<Self::Chunk>,
215        stash: &mut Vec<Self::Chunk>,
216    );
217
218    /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations).
219    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
220}
221
222pub use container::InternalMerger;
223
224pub mod container {
225
226    //! Merger implementations for the merge batcher.
227    //!
228    //! The `InternalMerge` trait allows containers to merge sorted, consolidated
229    //! data using internal iteration. The `InternalMerger` type implements the
230    //! `Merger` trait using `InternalMerge`, and is the standard merger for all
231    //! container types.
232
233    use std::marker::PhantomData;
234    use timely::container::SizableContainer;
235    use timely::progress::frontier::{Antichain, AntichainRef};
236    use timely::{Accountable, PartialOrder};
237    use crate::trace::implementations::merge_batcher::Merger;
238
239    /// A container that supports the operations needed by the merge batcher:
240    /// merging sorted chains and extracting updates by time.
241    pub trait InternalMerge: Accountable + SizableContainer + Default {
242        /// The owned time type, for maintaining antichains.
243        type TimeOwned;
244
245        /// The number of items in this container.
246        fn len(&self) -> usize;
247
248        /// Clear the container for reuse.
249        fn clear(&mut self);
250
251        /// Account the allocations behind the chunk.
252        fn account(&self) -> (usize, usize, usize, usize) {
253            let (size, capacity, allocations) = (0, 0, 0);
254            (usize::try_from(self.record_count()).unwrap(), size, capacity, allocations)
255        }
256
257        /// Merge items from sorted inputs into `self`, advancing positions.
258        /// Merges until `self` is at capacity or all inputs are exhausted.
259        ///
260        /// Dispatches based on the number of inputs:
261        /// - **0**: no-op
262        /// - **1**: bulk copy (may swap the input into `self`)
263        /// - **2**: merge two sorted streams
264        fn merge_from(
265            &mut self,
266            others: &mut [Self],
267            positions: &mut [usize],
268        );
269
270        /// Extract updates from `self` into `ship` (times not beyond `upper`)
271        /// and `keep` (times beyond `upper`), updating `frontier` with kept times.
272        fn extract(
273            &mut self,
274            upper: AntichainRef<Self::TimeOwned>,
275            frontier: &mut Antichain<Self::TimeOwned>,
276            keep: &mut Self,
277            ship: &mut Self,
278        );
279    }
280
281    /// A `Merger` for `Vec` containers, which contain owned data and need special treatment.
282    pub type VecInternalMerger<D, T, R> = VecMerger<D, T, R>;
283    /// A `Merger` using internal iteration for `TimelyStack` containers.
284    pub type ColInternalMerger<D, T, R> = InternalMerger<crate::containers::TimelyStack<(D, T, R)>>;
285
286    /// A `Merger` implementation for `Vec<(D, T, R)>` that drains owned inputs.
287    pub struct VecMerger<D, T, R> {
288        _marker: PhantomData<(D, T, R)>,
289    }
290
291    impl<D, T, R> Default for VecMerger<D, T, R> {
292        fn default() -> Self { Self { _marker: PhantomData } }
293    }
294
295    impl<D, T, R> VecMerger<D, T, R> {
296        /// The target capacity for output buffers, as a power of two.
297        ///
298        /// This amount is used to size vectors, where vectors not exactly this capacity are dropped.
299        /// If this is mis-set, there is the potential for more memory churn than anticipated.
300        fn target_capacity() -> usize {
301            timely::container::buffer::default_capacity::<(D, T, R)>().next_power_of_two()
302        }
303        /// Acquire a buffer with the target capacity.
304        fn empty(&self, stash: &mut Vec<Vec<(D, T, R)>>) -> Vec<(D, T, R)> {
305            let target = Self::target_capacity();
306            let mut container = stash.pop().unwrap_or_default();
307            container.clear();
308            // Reuse if at target; otherwise allocate fresh.
309            if container.capacity() != target {
310                container = Vec::with_capacity(target);
311            }
312            container
313        }
314        /// Refill `queue` from `iter` if empty. Recycles drained queues into `stash`.
315        fn refill(queue: &mut std::collections::VecDeque<(D, T, R)>, iter: &mut impl Iterator<Item = Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) {
316            if queue.is_empty() {
317                let target = Self::target_capacity();
318                if stash.len() < 2 {
319                    let mut recycled = Vec::from(std::mem::take(queue));
320                    recycled.clear();
321                    if recycled.capacity() == target {
322                        stash.push(recycled);
323                    }
324                }
325                if let Some(chunk) = iter.next() {
326                    *queue = std::collections::VecDeque::from(chunk);
327                }
328            }
329        }
330    }
331
332    impl<D, T, R> Merger for VecMerger<D, T, R>
333    where
334        D: Ord + Clone + 'static,
335        T: Ord + Clone + PartialOrder + 'static,
336        R: crate::difference::Semigroup + Clone + 'static,
337    {
338        type Chunk = Vec<(D, T, R)>;
339        type Time = T;
340
341        fn merge(
342            &mut self,
343            list1: Vec<Vec<(D, T, R)>>,
344            list2: Vec<Vec<(D, T, R)>>,
345            output: &mut Vec<Vec<(D, T, R)>>,
346            stash: &mut Vec<Vec<(D, T, R)>>,
347        ) {
348            use std::cmp::Ordering;
349            use std::collections::VecDeque;
350
351            let mut iter1 = list1.into_iter();
352            let mut iter2 = list2.into_iter();
353            let mut q1 = VecDeque::<(D,T,R)>::from(iter1.next().unwrap_or_default());
354            let mut q2 = VecDeque::<(D,T,R)>::from(iter2.next().unwrap_or_default());
355
356            let mut result = self.empty(stash);
357
358            // Merge while both queues are non-empty.
359            while let (Some((d1, t1, _)), Some((d2, t2, _))) = (q1.front(), q2.front()) {
360                match (d1, t1).cmp(&(d2, t2)) {
361                    Ordering::Less => {
362                        result.push(q1.pop_front().unwrap());
363                    }
364                    Ordering::Greater => {
365                        result.push(q2.pop_front().unwrap());
366                    }
367                    Ordering::Equal => {
368                        let (d, t, mut r1) = q1.pop_front().unwrap();
369                        let (_, _, r2) = q2.pop_front().unwrap();
370                        r1.plus_equals(&r2);
371                        if !r1.is_zero() {
372                            result.push((d, t, r1));
373                        }
374                    }
375                }
376
377                if result.at_capacity() {
378                    output.push(std::mem::take(&mut result));
379                    result = self.empty(stash);
380                }
381
382                // Refill emptied queues from their chains.
383                if q1.is_empty() { Self::refill(&mut q1, &mut iter1, stash); }
384                if q2.is_empty() { Self::refill(&mut q2, &mut iter2, stash); }
385            }
386
387            // Push partial result and remaining data from both sides.
388            if !result.is_empty() { output.push(result); }
389            for q in [q1, q2] {
390                if !q.is_empty() { output.push(Vec::from(q)); }
391            }
392            output.extend(iter1);
393            output.extend(iter2);
394        }
395
396        fn extract(
397            &mut self,
398            merged: Vec<Vec<(D, T, R)>>,
399            upper: AntichainRef<T>,
400            frontier: &mut Antichain<T>,
401            ship: &mut Vec<Vec<(D, T, R)>>,
402            kept: &mut Vec<Vec<(D, T, R)>>,
403            stash: &mut Vec<Vec<(D, T, R)>>,
404        ) {
405            let mut keep = self.empty(stash);
406            let mut ready = self.empty(stash);
407
408            for chunk in merged {
409                for (data, time, diff) in chunk {
410                    if upper.less_equal(&time) {
411                        frontier.insert_with(&time, |time| time.clone());
412                        keep.push((data, time, diff));
413                    } else {
414                        ready.push((data, time, diff));
415                    }
416                }
417                if keep.at_capacity() {
418                    kept.push(std::mem::take(&mut keep));
419                    keep = self.empty(stash);
420                }
421                if ready.at_capacity() {
422                    ship.push(std::mem::take(&mut ready));
423                    ready = self.empty(stash);
424                }
425            }
426            if !keep.is_empty() { kept.push(keep); }
427            if !ready.is_empty() { ship.push(ready); }
428        }
429
430        fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) {
431            (chunk.len(), 0, 0, 0)
432        }
433    }
434
435    /// A merger that uses internal iteration via [`InternalMerge`].
436    pub struct InternalMerger<MC> {
437        _marker: PhantomData<MC>,
438    }
439
440    impl<MC> Default for InternalMerger<MC> {
441        fn default() -> Self { Self { _marker: PhantomData } }
442    }
443
444    impl<MC> InternalMerger<MC> where MC: InternalMerge {
445        #[inline]
446        fn empty(&self, stash: &mut Vec<MC>) -> MC {
447            stash.pop().unwrap_or_else(|| {
448                let mut container = MC::default();
449                container.ensure_capacity(&mut None);
450                container
451            })
452        }
453        #[inline]
454        fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
455            chunk.clear();
456            stash.push(chunk);
457        }
458        /// Drain remaining items from one side into `result`/`output`.
459        ///
460        /// Copies the partially-consumed head into `result`, then appends
461        /// remaining full chunks directly to `output` without copying.
462        fn drain_side(
463            &self,
464            head: &mut MC,
465            pos: &mut usize,
466            list: &mut std::vec::IntoIter<MC>,
467            result: &mut MC,
468            output: &mut Vec<MC>,
469            stash: &mut Vec<MC>,
470        ) {
471            // Copy the partially-consumed head into result.
472            if *pos < head.len() {
473                result.merge_from(
474                    std::slice::from_mut(head),
475                    std::slice::from_mut(pos),
476                );
477            }
478            // Flush result before appending full chunks.
479            if !result.is_empty() {
480                output.push(std::mem::take(result));
481                *result = self.empty(stash);
482            }
483            // Remaining full chunks go directly to output.
484            output.extend(list);
485        }
486    }
487
488    impl<MC> Merger for InternalMerger<MC>
489    where
490        MC: InternalMerge<TimeOwned: Ord + PartialOrder + Clone + 'static> + 'static,
491    {
492        type Time = MC::TimeOwned;
493        type Chunk = MC;
494
495        fn merge(&mut self, list1: Vec<MC>, list2: Vec<MC>, output: &mut Vec<MC>, stash: &mut Vec<MC>) {
496            let mut list1 = list1.into_iter();
497            let mut list2 = list2.into_iter();
498
499            let mut heads = [list1.next().unwrap_or_default(), list2.next().unwrap_or_default()];
500            let mut positions = [0usize, 0usize];
501
502            let mut result = self.empty(stash);
503
504            // Main merge loop: both sides have data.
505            while positions[0] < heads[0].len() && positions[1] < heads[1].len() {
506                result.merge_from(&mut heads, &mut positions);
507
508                if positions[0] >= heads[0].len() {
509                    let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
510                    self.recycle(old, stash);
511                    positions[0] = 0;
512                }
513                if positions[1] >= heads[1].len() {
514                    let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
515                    self.recycle(old, stash);
516                    positions[1] = 0;
517                }
518                if result.at_capacity() {
519                    output.push(std::mem::take(&mut result));
520                    result = self.empty(stash);
521                }
522            }
523
524            // Drain remaining from each side: copy partial head, then append full chunks.
525            self.drain_side(&mut heads[0], &mut positions[0], &mut list1, &mut result, output, stash);
526            self.drain_side(&mut heads[1], &mut positions[1], &mut list2, &mut result, output, stash);
527            if !result.is_empty() {
528                output.push(result);
529            }
530        }
531
532        fn extract(
533            &mut self,
534            merged: Vec<Self::Chunk>,
535            upper: AntichainRef<Self::Time>,
536            frontier: &mut Antichain<Self::Time>,
537            ship: &mut Vec<Self::Chunk>,
538            kept: &mut Vec<Self::Chunk>,
539            stash: &mut Vec<Self::Chunk>,
540        ) {
541            let mut keep = self.empty(stash);
542            let mut ready = self.empty(stash);
543
544            for mut buffer in merged {
545                buffer.extract(upper, frontier, &mut keep, &mut ready);
546                self.recycle(buffer, stash);
547                if keep.at_capacity() {
548                    kept.push(std::mem::take(&mut keep));
549                    keep = self.empty(stash);
550                }
551                if ready.at_capacity() {
552                    ship.push(std::mem::take(&mut ready));
553                    ready = self.empty(stash);
554                }
555            }
556            if !keep.is_empty() {
557                kept.push(keep);
558            }
559            if !ready.is_empty() {
560                ship.push(ready);
561            }
562        }
563
564        fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
565            chunk.account()
566        }
567    }
568
569    /// Implementation of `InternalMerge` for `Vec<(D, T, R)>`.
570    ///
571    /// Note: The `VecMerger` type implements `Merger` directly and avoids
572    /// cloning by draining inputs. This `InternalMerge` impl is retained
573    /// because `reduce` requires `Builder::Input: InternalMerge`.
574    pub mod vec_internal {
575        use std::cmp::Ordering;
576        use timely::PartialOrder;
577        use timely::container::SizableContainer;
578        use timely::progress::frontier::{Antichain, AntichainRef};
579        use crate::difference::Semigroup;
580        use super::InternalMerge;
581
582        impl<D: Ord + Clone + 'static, T: Ord + Clone + PartialOrder + 'static, R: Semigroup + Clone + 'static> InternalMerge for Vec<(D, T, R)> {
583            type TimeOwned = T;
584
585            fn len(&self) -> usize { Vec::len(self) }
586            fn clear(&mut self) { Vec::clear(self) }
587
588            fn merge_from(
589                &mut self,
590                others: &mut [Self],
591                positions: &mut [usize],
592            ) {
593                match others.len() {
594                    0 => {},
595                    1 => {
596                        let other = &mut others[0];
597                        let pos = &mut positions[0];
598                        if self.is_empty() && *pos == 0 {
599                            std::mem::swap(self, other);
600                            return;
601                        }
602                        self.extend_from_slice(&other[*pos ..]);
603                        *pos = other.len();
604                    },
605                    2 => {
606                        let (left, right) = others.split_at_mut(1);
607                        let other1 = &mut left[0];
608                        let other2 = &mut right[0];
609
610                        while positions[0] < other1.len() && positions[1] < other2.len() && !self.at_capacity() {
611                            let (d1, t1, _) = &other1[positions[0]];
612                            let (d2, t2, _) = &other2[positions[1]];
613                            match (d1, t1).cmp(&(d2, t2)) {
614                                Ordering::Less => {
615                                    self.push(other1[positions[0]].clone());
616                                    positions[0] += 1;
617                                }
618                                Ordering::Greater => {
619                                    self.push(other2[positions[1]].clone());
620                                    positions[1] += 1;
621                                }
622                                Ordering::Equal => {
623                                    let (d, t, mut r1) = other1[positions[0]].clone();
624                                    let (_, _, ref r2) = other2[positions[1]];
625                                    r1.plus_equals(r2);
626                                    if !r1.is_zero() {
627                                        self.push((d, t, r1));
628                                    }
629                                    positions[0] += 1;
630                                    positions[1] += 1;
631                                }
632                            }
633                        }
634                    },
635                    n => unimplemented!("{n}-way merge not yet supported"),
636                }
637            }
638
639            fn extract(
640                &mut self,
641                upper: AntichainRef<T>,
642                frontier: &mut Antichain<T>,
643                keep: &mut Self,
644                ship: &mut Self,
645            ) {
646                for (data, time, diff) in self.drain(..) {
647                    if upper.less_equal(&time) {
648                        frontier.insert_with(&time, |time| time.clone());
649                        keep.push((data, time, diff));
650                    } else {
651                        ship.push((data, time, diff));
652                    }
653                }
654            }
655        }
656    }
657
658    /// Implementation of `InternalMerge` for `TimelyStack<(D, T, R)>`.
659    pub mod columnation_internal {
660        use std::cmp::Ordering;
661        use columnation::Columnation;
662        use timely::PartialOrder;
663        use timely::container::SizableContainer;
664        use timely::progress::frontier::{Antichain, AntichainRef};
665        use crate::containers::TimelyStack;
666        use crate::difference::Semigroup;
667        use super::InternalMerge;
668
669        impl<D, T, R> InternalMerge for TimelyStack<(D, T, R)>
670        where
671            D: Ord + Columnation + Clone + 'static,
672            T: Ord + Columnation + Clone + PartialOrder + 'static,
673            R: Default + Semigroup + Columnation + Clone + 'static,
674        {
675            type TimeOwned = T;
676
677            fn len(&self) -> usize { self[..].len() }
678            fn clear(&mut self) { TimelyStack::clear(self) }
679
680            fn account(&self) -> (usize, usize, usize, usize) {
681                let (mut size, mut capacity, mut allocations) = (0, 0, 0);
682                let cb = |siz, cap| {
683                    size += siz;
684                    capacity += cap;
685                    allocations += 1;
686                };
687                self.heap_size(cb);
688                (self.len(), size, capacity, allocations)
689            }
690
691            fn merge_from(
692                &mut self,
693                others: &mut [Self],
694                positions: &mut [usize],
695            ) {
696                match others.len() {
697                    0 => {},
698                    1 => {
699                        let other = &mut others[0];
700                        let pos = &mut positions[0];
701                        if self[..].is_empty() && *pos == 0 {
702                            std::mem::swap(self, other);
703                            return;
704                        }
705                        for i in *pos .. other[..].len() {
706                            self.copy(&other[i]);
707                        }
708                        *pos = other[..].len();
709                    },
710                    2 => {
711                        let (left, right) = others.split_at_mut(1);
712                        let other1 = &left[0];
713                        let other2 = &right[0];
714
715                        let mut stash = R::default();
716
717                        while positions[0] < other1[..].len() && positions[1] < other2[..].len() && !self.at_capacity() {
718                            let (d1, t1, _) = &other1[positions[0]];
719                            let (d2, t2, _) = &other2[positions[1]];
720                            match (d1, t1).cmp(&(d2, t2)) {
721                                Ordering::Less => {
722                                    self.copy(&other1[positions[0]]);
723                                    positions[0] += 1;
724                                }
725                                Ordering::Greater => {
726                                    self.copy(&other2[positions[1]]);
727                                    positions[1] += 1;
728                                }
729                                Ordering::Equal => {
730                                    let (_, _, r1) = &other1[positions[0]];
731                                    let (_, _, r2) = &other2[positions[1]];
732                                    stash.clone_from(r1);
733                                    stash.plus_equals(r2);
734                                    if !stash.is_zero() {
735                                        let (d, t, _) = &other1[positions[0]];
736                                        self.copy_destructured(d, t, &stash);
737                                    }
738                                    positions[0] += 1;
739                                    positions[1] += 1;
740                                }
741                            }
742                        }
743                    },
744                    n => unimplemented!("{n}-way merge not yet supported"),
745                }
746            }
747
748            fn extract(
749                &mut self,
750                upper: AntichainRef<T>,
751                frontier: &mut Antichain<T>,
752                keep: &mut Self,
753                ship: &mut Self,
754            ) {
755                for (data, time, diff) in self.iter() {
756                    if upper.less_equal(time) {
757                        frontier.insert_with(time, |time| time.clone());
758                        keep.copy_destructured(data, time, diff);
759                    } else {
760                        ship.copy_destructured(data, time, diff);
761                    }
762                }
763            }
764        }
765    }
766}