Skip to main content

differential_dataflow/trace/implementations/
merge_batcher.rs

1//! A `Batcher` implementation based on merge sort.
2//!
3//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger".
4//! The chunker receives input batches and consolidates them, producing sorted output
5//! "chunks" that are fully consolidated (no adjacent updates can be accumulated).
6//! The merger implements the [`Merger`] trait, and provides hooks for manipulating
7//! sorted "chains" of chunks as needed by the merge batcher: merging chunks and also
8//! splitting them apart based on time.
9//!
10//! Implementations of `MergeBatcher` can be instantiated through the choice of both
11//! the chunker and the merger, provided their respective output and input types align.
12
13use std::marker::PhantomData;
14
15use timely::progress::frontier::AntichainRef;
16use timely::progress::{frontier::Antichain, Timestamp};
17use timely::container::{ContainerBuilder, PushInto};
18
19use crate::logging::{BatcherEvent, Logger};
20use crate::trace::{Batcher, Builder, Description};
21
22/// Creates batches from containers of unordered tuples.
23///
24/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs,
25/// and must produce outputs of type `M::Chunk`.
26pub struct MergeBatcher<Input, C, M: Merger> {
27    /// Transforms input streams to chunks of sorted, consolidated data.
28    chunker: C,
29    /// A sequence of power-of-two length lists of sorted, consolidated containers.
30    ///
31    /// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]).
32    chains: Vec<Vec<M::Chunk>>,
33    /// Stash of empty chunks, recycled through the merging process.
34    stash: Vec<M::Chunk>,
35    /// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time.
36    merger: M,
37    /// Current lower frontier, we sealed up to here.
38    lower: Antichain<M::Time>,
39    /// The lower-bound frontier of the data, after the last call to seal.
40    frontier: Antichain<M::Time>,
41    /// Logger for size accounting.
42    logger: Option<Logger>,
43    /// Timely operator ID.
44    operator_id: usize,
45    /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
46    _marker: PhantomData<Input>,
47}
48
49impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
50where
51    C: ContainerBuilder<Container=M::Chunk> + for<'a> PushInto<&'a mut Input>,
52    M: Merger<Time: Timestamp>,
53{
54    type Input = Input;
55    type Time = M::Time;
56    type Output = M::Chunk;
57
58    fn new(logger: Option<Logger>, operator_id: usize) -> Self {
59        Self {
60            logger,
61            operator_id,
62            chunker: C::default(),
63            merger: M::default(),
64            chains: Vec::new(),
65            stash: Vec::new(),
66            frontier: Antichain::new(),
67            lower: Antichain::from_elem(M::Time::minimum()),
68            _marker: PhantomData,
69        }
70    }
71
72    /// Push a container of data into this merge batcher. Updates the internal chain structure if
73    /// needed.
74    fn push_container(&mut self, container: &mut Input) {
75        self.chunker.push_into(container);
76        while let Some(chunk) = self.chunker.extract() {
77            let chunk = std::mem::take(chunk);
78            self.insert_chain(vec![chunk]);
79        }
80    }
81
82    // Sealing a batch means finding those updates with times not greater or equal to any time
83    // in `upper`. All updates must have time greater or equal to the previously used `upper`,
84    // which we call `lower`, by assumption that after sealing a batcher we receive no more
85    // updates with times not greater or equal to `upper`.
86    fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
87        // Finish
88        while let Some(chunk) = self.chunker.finish() {
89            let chunk = std::mem::take(chunk);
90            self.insert_chain(vec![chunk]);
91        }
92
93        // Merge all remaining chains into a single chain.
94        while self.chains.len() > 1 {
95            let list1 = self.chain_pop().unwrap();
96            let list2 = self.chain_pop().unwrap();
97            let merged = self.merge_by(list1, list2);
98            self.chain_push(merged);
99        }
100        let merged = self.chain_pop().unwrap_or_default();
101
102        // Extract readied data.
103        let mut kept = Vec::new();
104        let mut readied = Vec::new();
105        self.frontier.clear();
106
107        self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
108
109        if !kept.is_empty() {
110            self.chain_push(kept);
111        }
112
113        self.stash.clear();
114
115        let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
116        let seal = B::seal(&mut readied, description);
117        self.lower = upper;
118        seal
119    }
120
121    /// The frontier of elements remaining after the most recent call to `self.seal`.
122    #[inline]
123    fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
124        self.frontier.borrow()
125    }
126}
127
128impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
129    /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
130    /// by decreasing length.
131    fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
132        if !chain.is_empty() {
133            self.chain_push(chain);
134            while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
135                let list1 = self.chain_pop().unwrap();
136                let list2 = self.chain_pop().unwrap();
137                let merged = self.merge_by(list1, list2);
138                self.chain_push(merged);
139            }
140        }
141    }
142
143    // merges two sorted input lists into one sorted output list.
144    fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
145        // TODO: `list1` and `list2` get dropped; would be better to reuse?
146        let mut output = Vec::with_capacity(list1.len() + list2.len());
147        self.merger.merge(list1, list2, &mut output, &mut self.stash);
148
149        output
150    }
151
152    /// Pop a chain and account size changes.
153    #[inline]
154    fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
155        let chain = self.chains.pop();
156        self.account(chain.iter().flatten().map(M::account), -1);
157        chain
158    }
159
160    /// Push a chain and account size changes.
161    #[inline]
162    fn chain_push(&mut self, chain: Vec<M::Chunk>) {
163        self.account(chain.iter().map(M::account), 1);
164        self.chains.push(chain);
165    }
166
167    /// Account size changes. Only performs work if a logger exists.
168    ///
169    /// Calculate the size based on the iterator passed along, with each attribute
170    /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
171    #[inline]
172    fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
173        if let Some(logger) = &self.logger {
174            let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
175            for (records_, size_, capacity_, allocations_) in items {
176                records = records.saturating_add_unsigned(records_);
177                size = size.saturating_add_unsigned(size_);
178                capacity = capacity.saturating_add_unsigned(capacity_);
179                allocations = allocations.saturating_add_unsigned(allocations_);
180            }
181            logger.log(BatcherEvent {
182                operator: self.operator_id,
183                records_diff: records * diff,
184                size_diff: size * diff,
185                capacity_diff: capacity * diff,
186                allocations_diff: allocations * diff,
187            })
188        }
189    }
190}
191
192impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
193    fn drop(&mut self) {
194        // Cleanup chain to retract accounting information.
195        while self.chain_pop().is_some() {}
196    }
197}
198
199/// A trait to describe interesting moments in a merge batcher.
200pub trait Merger: Default {
201    /// The internal representation of chunks of data.
202    type Chunk: Default;
203    /// The type of time in frontiers to extract updates.
204    type Time;
205    /// Merge chains into an output chain.
206    fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
207    /// Extract ready updates based on the `upper` frontier.
208    fn extract(
209        &mut self,
210        merged: Vec<Self::Chunk>,
211        upper: AntichainRef<Self::Time>,
212        frontier: &mut Antichain<Self::Time>,
213        readied: &mut Vec<Self::Chunk>,
214        kept: &mut Vec<Self::Chunk>,
215        stash: &mut Vec<Self::Chunk>,
216    );
217
218    /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations).
219    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
220}
221
222pub use container::InternalMerger;
223
224pub mod container {
225
226    //! Merger implementations for the merge batcher.
227    //!
228    //! The `InternalMerge` trait allows containers to merge sorted, consolidated
229    //! data using internal iteration. The `InternalMerger` type implements the
230    //! `Merger` trait using `InternalMerge`, and is the standard merger for all
231    //! container types.
232
233    use std::marker::PhantomData;
234    use timely::container::SizableContainer;
235    use timely::progress::frontier::{Antichain, AntichainRef};
236    use timely::{Accountable, PartialOrder};
237    use crate::trace::implementations::merge_batcher::Merger;
238
239    /// A container that supports the operations needed by the merge batcher:
240    /// merging sorted chains and extracting updates by time.
241    pub trait InternalMerge: Accountable + SizableContainer + Default {
242        /// The owned time type, for maintaining antichains.
243        type TimeOwned;
244
245        /// The number of items in this container.
246        fn len(&self) -> usize;
247
248        /// Clear the container for reuse.
249        fn clear(&mut self);
250
251        /// Account the allocations behind the chunk.
252        fn account(&self) -> (usize, usize, usize, usize) {
253            let (size, capacity, allocations) = (0, 0, 0);
254            (usize::try_from(self.record_count()).unwrap(), size, capacity, allocations)
255        }
256
257        /// Merge items from sorted inputs into `self`, advancing positions.
258        /// Merges until `self` is at capacity or all inputs are exhausted.
259        ///
260        /// Dispatches based on the number of inputs:
261        /// - **0**: no-op
262        /// - **1**: bulk copy (may swap the input into `self`)
263        /// - **2**: merge two sorted streams
264        fn merge_from(
265            &mut self,
266            others: &mut [Self],
267            positions: &mut [usize],
268        );
269
270        /// Extract updates from `self` into `ship` (times not beyond `upper`)
271        /// and `keep` (times beyond `upper`), updating `frontier` with kept times.
272        ///
273        /// Iteration starts at `*position` and advances `*position` as updates
274        /// are consumed. The implementation must yield (return early) when
275        /// either `keep.at_capacity()` or `ship.at_capacity()` becomes true,
276        /// so the caller can swap out a full output buffer and resume by
277        /// calling `extract` again. The caller invokes `extract` repeatedly
278        /// until `*position >= self.len()`.
279        ///
280        /// This shape exists because `at_capacity()` for `Vec` and
281        /// `TimelyStack` is `len() == capacity()`, which silently becomes
282        /// false again the moment a push past capacity grows the backing
283        /// allocation. Without per-element yielding, a single `extract` call
284        /// can quietly produce oversized output chunks.
285        fn extract(
286            &mut self,
287            position: &mut usize,
288            upper: AntichainRef<Self::TimeOwned>,
289            frontier: &mut Antichain<Self::TimeOwned>,
290            keep: &mut Self,
291            ship: &mut Self,
292        );
293    }
294
295    /// A `Merger` for `Vec` containers, which contain owned data and need special treatment.
296    pub type VecInternalMerger<D, T, R> = VecMerger<D, T, R>;
297    /// A `Merger` using internal iteration for `TimelyStack` containers.
298    pub type ColInternalMerger<D, T, R> = InternalMerger<crate::containers::TimelyStack<(D, T, R)>>;
299
300    /// A `Merger` implementation for `Vec<(D, T, R)>` that drains owned inputs.
301    pub struct VecMerger<D, T, R> {
302        _marker: PhantomData<(D, T, R)>,
303    }
304
305    impl<D, T, R> Default for VecMerger<D, T, R> {
306        fn default() -> Self { Self { _marker: PhantomData } }
307    }
308
309    impl<D, T, R> VecMerger<D, T, R> {
310        /// The target capacity for output buffers, as a power of two.
311        ///
312        /// This amount is used to size vectors, where vectors not exactly this capacity are dropped.
313        /// If this is mis-set, there is the potential for more memory churn than anticipated.
314        fn target_capacity() -> usize {
315            timely::container::buffer::default_capacity::<(D, T, R)>().next_power_of_two()
316        }
317        /// Acquire a buffer with the target capacity.
318        fn empty(&self, stash: &mut Vec<Vec<(D, T, R)>>) -> Vec<(D, T, R)> {
319            let target = Self::target_capacity();
320            let mut container = stash.pop().unwrap_or_default();
321            container.clear();
322            // Reuse if at target; otherwise allocate fresh.
323            if container.capacity() != target {
324                container = Vec::with_capacity(target);
325            }
326            container
327        }
328        /// Refill `queue` from `iter` if empty. Recycles drained queues into `stash`.
329        fn refill(queue: &mut std::collections::VecDeque<(D, T, R)>, iter: &mut impl Iterator<Item = Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) {
330            if queue.is_empty() {
331                let target = Self::target_capacity();
332                if stash.len() < 2 {
333                    let mut recycled = Vec::from(std::mem::take(queue));
334                    recycled.clear();
335                    if recycled.capacity() == target {
336                        stash.push(recycled);
337                    }
338                }
339                if let Some(chunk) = iter.next() {
340                    *queue = std::collections::VecDeque::from(chunk);
341                }
342            }
343        }
344    }
345
346    impl<D, T, R> Merger for VecMerger<D, T, R>
347    where
348        D: Ord + Clone + 'static,
349        T: Ord + Clone + PartialOrder + 'static,
350        R: crate::difference::Semigroup + Clone + 'static,
351    {
352        type Chunk = Vec<(D, T, R)>;
353        type Time = T;
354
355        fn merge(
356            &mut self,
357            list1: Vec<Vec<(D, T, R)>>,
358            list2: Vec<Vec<(D, T, R)>>,
359            output: &mut Vec<Vec<(D, T, R)>>,
360            stash: &mut Vec<Vec<(D, T, R)>>,
361        ) {
362            use std::cmp::Ordering;
363            use std::collections::VecDeque;
364
365            let mut iter1 = list1.into_iter();
366            let mut iter2 = list2.into_iter();
367            let mut q1 = VecDeque::<(D,T,R)>::from(iter1.next().unwrap_or_default());
368            let mut q2 = VecDeque::<(D,T,R)>::from(iter2.next().unwrap_or_default());
369
370            let mut result = self.empty(stash);
371
372            // Merge while both queues are non-empty.
373            while let (Some((d1, t1, _)), Some((d2, t2, _))) = (q1.front(), q2.front()) {
374                match (d1, t1).cmp(&(d2, t2)) {
375                    Ordering::Less => {
376                        result.push(q1.pop_front().unwrap());
377                    }
378                    Ordering::Greater => {
379                        result.push(q2.pop_front().unwrap());
380                    }
381                    Ordering::Equal => {
382                        let (d, t, mut r1) = q1.pop_front().unwrap();
383                        let (_, _, r2) = q2.pop_front().unwrap();
384                        r1.plus_equals(&r2);
385                        if !r1.is_zero() {
386                            result.push((d, t, r1));
387                        }
388                    }
389                }
390
391                if result.at_capacity() {
392                    output.push(std::mem::take(&mut result));
393                    result = self.empty(stash);
394                }
395
396                // Refill emptied queues from their chains.
397                if q1.is_empty() { Self::refill(&mut q1, &mut iter1, stash); }
398                if q2.is_empty() { Self::refill(&mut q2, &mut iter2, stash); }
399            }
400
401            // Push partial result and remaining data from both sides.
402            if !result.is_empty() { output.push(result); }
403            for q in [q1, q2] {
404                if !q.is_empty() { output.push(Vec::from(q)); }
405            }
406            output.extend(iter1);
407            output.extend(iter2);
408        }
409
410        fn extract(
411            &mut self,
412            merged: Vec<Vec<(D, T, R)>>,
413            upper: AntichainRef<T>,
414            frontier: &mut Antichain<T>,
415            ship: &mut Vec<Vec<(D, T, R)>>,
416            kept: &mut Vec<Vec<(D, T, R)>>,
417            stash: &mut Vec<Vec<(D, T, R)>>,
418        ) {
419            let mut keep = self.empty(stash);
420            let mut ready = self.empty(stash);
421
422            for mut chunk in merged {
423                // Go update-by-update to swap out full containers.
424                for (data, time, diff) in chunk.drain(..) {
425                    if upper.less_equal(&time) {
426                        frontier.insert_with(&time, |time| time.clone());
427                        keep.push((data, time, diff));
428                    } else {
429                        ready.push((data, time, diff));
430                    }
431                    if keep.at_capacity() {
432                        kept.push(std::mem::take(&mut keep));
433                        keep = self.empty(stash);
434                    }
435                    if ready.at_capacity() {
436                        ship.push(std::mem::take(&mut ready));
437                        ready = self.empty(stash);
438                    }
439                }
440                // Recycle the now-empty chunk if it has the right capacity.
441                if chunk.capacity() == Self::target_capacity() {
442                    stash.push(chunk);
443                }
444            }
445            if !keep.is_empty() { kept.push(keep); }
446            if !ready.is_empty() { ship.push(ready); }
447        }
448
449        fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) {
450            (chunk.len(), 0, 0, 0)
451        }
452    }
453
454    /// A merger that uses internal iteration via [`InternalMerge`].
455    pub struct InternalMerger<MC> {
456        _marker: PhantomData<MC>,
457    }
458
459    impl<MC> Default for InternalMerger<MC> {
460        fn default() -> Self { Self { _marker: PhantomData } }
461    }
462
463    impl<MC> InternalMerger<MC> where MC: InternalMerge {
464        #[inline]
465        fn empty(&self, stash: &mut Vec<MC>) -> MC {
466            stash.pop().unwrap_or_else(|| {
467                let mut container = MC::default();
468                container.ensure_capacity(&mut None);
469                container
470            })
471        }
472        #[inline]
473        fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
474            chunk.clear();
475            stash.push(chunk);
476        }
477        /// Drain remaining items from one side into `result`/`output`.
478        ///
479        /// Copies the partially-consumed head into `result`, then appends
480        /// remaining full chunks directly to `output` without copying.
481        fn drain_side(
482            &self,
483            head: &mut MC,
484            pos: &mut usize,
485            list: &mut std::vec::IntoIter<MC>,
486            result: &mut MC,
487            output: &mut Vec<MC>,
488            stash: &mut Vec<MC>,
489        ) {
490            // Copy the partially-consumed head into result.
491            if *pos < head.len() {
492                result.merge_from(
493                    std::slice::from_mut(head),
494                    std::slice::from_mut(pos),
495                );
496            }
497            // Flush result before appending full chunks.
498            if !result.is_empty() {
499                output.push(std::mem::take(result));
500                *result = self.empty(stash);
501            }
502            // Remaining full chunks go directly to output.
503            output.extend(list);
504        }
505    }
506
507    impl<MC> Merger for InternalMerger<MC>
508    where
509        MC: InternalMerge<TimeOwned: Ord + PartialOrder + Clone + 'static> + 'static,
510    {
511        type Time = MC::TimeOwned;
512        type Chunk = MC;
513
514        fn merge(&mut self, list1: Vec<MC>, list2: Vec<MC>, output: &mut Vec<MC>, stash: &mut Vec<MC>) {
515            let mut list1 = list1.into_iter();
516            let mut list2 = list2.into_iter();
517
518            let mut heads = [list1.next().unwrap_or_default(), list2.next().unwrap_or_default()];
519            let mut positions = [0usize, 0usize];
520
521            let mut result = self.empty(stash);
522
523            // Main merge loop: both sides have data.
524            while positions[0] < heads[0].len() && positions[1] < heads[1].len() {
525                result.merge_from(&mut heads, &mut positions);
526
527                if positions[0] >= heads[0].len() {
528                    let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
529                    self.recycle(old, stash);
530                    positions[0] = 0;
531                }
532                if positions[1] >= heads[1].len() {
533                    let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
534                    self.recycle(old, stash);
535                    positions[1] = 0;
536                }
537                if result.at_capacity() {
538                    output.push(std::mem::take(&mut result));
539                    result = self.empty(stash);
540                }
541            }
542
543            // Drain remaining from each side: copy partial head, then append full chunks.
544            self.drain_side(&mut heads[0], &mut positions[0], &mut list1, &mut result, output, stash);
545            self.drain_side(&mut heads[1], &mut positions[1], &mut list2, &mut result, output, stash);
546            if !result.is_empty() {
547                output.push(result);
548            }
549        }
550
551        fn extract(
552            &mut self,
553            merged: Vec<Self::Chunk>,
554            upper: AntichainRef<Self::Time>,
555            frontier: &mut Antichain<Self::Time>,
556            ship: &mut Vec<Self::Chunk>,
557            kept: &mut Vec<Self::Chunk>,
558            stash: &mut Vec<Self::Chunk>,
559        ) {
560            let mut keep = self.empty(stash);
561            let mut ready = self.empty(stash);
562
563            for mut buffer in merged {
564                let mut position = 0;
565                let len = buffer.len();
566                while position < len {
567                    buffer.extract(&mut position, upper, frontier, &mut keep, &mut ready);
568                    if keep.at_capacity() {
569                        kept.push(std::mem::take(&mut keep));
570                        keep = self.empty(stash);
571                    }
572                    if ready.at_capacity() {
573                        ship.push(std::mem::take(&mut ready));
574                        ready = self.empty(stash);
575                    }
576                }
577                self.recycle(buffer, stash);
578            }
579            if !keep.is_empty() {
580                kept.push(keep);
581            }
582            if !ready.is_empty() {
583                ship.push(ready);
584            }
585        }
586
587        fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
588            chunk.account()
589        }
590    }
591
592    /// Implementation of `InternalMerge` for `Vec<(D, T, R)>`.
593    ///
594    /// Note: The `VecMerger` type implements `Merger` directly and avoids
595    /// cloning by draining inputs. This `InternalMerge` impl is retained
596    /// because `reduce` requires `Builder::Input: InternalMerge`.
597    pub mod vec_internal {
598        use std::cmp::Ordering;
599        use timely::PartialOrder;
600        use timely::container::SizableContainer;
601        use timely::progress::frontier::{Antichain, AntichainRef};
602        use crate::difference::Semigroup;
603        use super::InternalMerge;
604
605        impl<D: Ord + Clone + 'static, T: Ord + Clone + PartialOrder + 'static, R: Semigroup + Clone + 'static> InternalMerge for Vec<(D, T, R)> {
606            type TimeOwned = T;
607
608            fn len(&self) -> usize { Vec::len(self) }
609            fn clear(&mut self) { Vec::clear(self) }
610
611            fn merge_from(
612                &mut self,
613                others: &mut [Self],
614                positions: &mut [usize],
615            ) {
616                match others.len() {
617                    0 => {},
618                    1 => {
619                        let other = &mut others[0];
620                        let pos = &mut positions[0];
621                        if self.is_empty() && *pos == 0 {
622                            std::mem::swap(self, other);
623                            return;
624                        }
625                        self.extend_from_slice(&other[*pos ..]);
626                        *pos = other.len();
627                    },
628                    2 => {
629                        let (left, right) = others.split_at_mut(1);
630                        let other1 = &mut left[0];
631                        let other2 = &mut right[0];
632
633                        while positions[0] < other1.len() && positions[1] < other2.len() && !self.at_capacity() {
634                            let (d1, t1, _) = &other1[positions[0]];
635                            let (d2, t2, _) = &other2[positions[1]];
636                            // NOTE: The .clone() calls here are not great, but this dead code to be removed in the next release.
637                            match (d1, t1).cmp(&(d2, t2)) {
638                                Ordering::Less => {
639                                    self.push(other1[positions[0]].clone());
640                                    positions[0] += 1;
641                                }
642                                Ordering::Greater => {
643                                    self.push(other2[positions[1]].clone());
644                                    positions[1] += 1;
645                                }
646                                Ordering::Equal => {
647                                    let (d, t, mut r1) = other1[positions[0]].clone();
648                                    let (_, _, ref r2) = other2[positions[1]];
649                                    r1.plus_equals(r2);
650                                    if !r1.is_zero() {
651                                        self.push((d, t, r1));
652                                    }
653                                    positions[0] += 1;
654                                    positions[1] += 1;
655                                }
656                            }
657                        }
658                    },
659                    n => unimplemented!("{n}-way merge not yet supported"),
660                }
661            }
662
663            fn extract(
664                &mut self,
665                position: &mut usize,
666                upper: AntichainRef<T>,
667                frontier: &mut Antichain<T>,
668                keep: &mut Self,
669                ship: &mut Self,
670            ) {
671                let len = self.len();
672                while *position < len && !keep.at_capacity() && !ship.at_capacity() {
673                    let (data, time, diff) = self[*position].clone();
674                    if upper.less_equal(&time) {
675                        frontier.insert_with(&time, |time| time.clone());
676                        keep.push((data, time, diff));
677                    } else {
678                        ship.push((data, time, diff));
679                    }
680                    *position += 1;
681                }
682            }
683        }
684    }
685
686    /// Implementation of `InternalMerge` for `TimelyStack<(D, T, R)>`.
687    pub mod columnation_internal {
688        use std::cmp::Ordering;
689        use columnation::Columnation;
690        use timely::PartialOrder;
691        use timely::container::SizableContainer;
692        use timely::progress::frontier::{Antichain, AntichainRef};
693        use crate::containers::TimelyStack;
694        use crate::difference::Semigroup;
695        use super::InternalMerge;
696
697        impl<D, T, R> InternalMerge for TimelyStack<(D, T, R)>
698        where
699            D: Ord + Columnation + Clone + 'static,
700            T: Ord + Columnation + Clone + PartialOrder + 'static,
701            R: Default + Semigroup + Columnation + Clone + 'static,
702        {
703            type TimeOwned = T;
704
705            fn len(&self) -> usize { self[..].len() }
706            fn clear(&mut self) { TimelyStack::clear(self) }
707
708            fn account(&self) -> (usize, usize, usize, usize) {
709                let (mut size, mut capacity, mut allocations) = (0, 0, 0);
710                let cb = |siz, cap| {
711                    size += siz;
712                    capacity += cap;
713                    allocations += 1;
714                };
715                self.heap_size(cb);
716                (self.len(), size, capacity, allocations)
717            }
718
719            fn merge_from(
720                &mut self,
721                others: &mut [Self],
722                positions: &mut [usize],
723            ) {
724                match others.len() {
725                    0 => {},
726                    1 => {
727                        let other = &mut others[0];
728                        let pos = &mut positions[0];
729                        if self[..].is_empty() && *pos == 0 {
730                            std::mem::swap(self, other);
731                            return;
732                        }
733                        for i in *pos .. other[..].len() {
734                            self.copy(&other[i]);
735                        }
736                        *pos = other[..].len();
737                    },
738                    2 => {
739                        let (left, right) = others.split_at_mut(1);
740                        let other1 = &left[0];
741                        let other2 = &right[0];
742
743                        let mut stash = R::default();
744
745                        while positions[0] < other1[..].len() && positions[1] < other2[..].len() && !self.at_capacity() {
746                            let (d1, t1, _) = &other1[positions[0]];
747                            let (d2, t2, _) = &other2[positions[1]];
748                            match (d1, t1).cmp(&(d2, t2)) {
749                                Ordering::Less => {
750                                    self.copy(&other1[positions[0]]);
751                                    positions[0] += 1;
752                                }
753                                Ordering::Greater => {
754                                    self.copy(&other2[positions[1]]);
755                                    positions[1] += 1;
756                                }
757                                Ordering::Equal => {
758                                    let (_, _, r1) = &other1[positions[0]];
759                                    let (_, _, r2) = &other2[positions[1]];
760                                    stash.clone_from(r1);
761                                    stash.plus_equals(r2);
762                                    if !stash.is_zero() {
763                                        let (d, t, _) = &other1[positions[0]];
764                                        self.copy_destructured(d, t, &stash);
765                                    }
766                                    positions[0] += 1;
767                                    positions[1] += 1;
768                                }
769                            }
770                        }
771                    },
772                    n => unimplemented!("{n}-way merge not yet supported"),
773                }
774            }
775
776            fn extract(
777                &mut self,
778                position: &mut usize,
779                upper: AntichainRef<T>,
780                frontier: &mut Antichain<T>,
781                keep: &mut Self,
782                ship: &mut Self,
783            ) {
784                let len = self[..].len();
785                while *position < len && !keep.at_capacity() && !ship.at_capacity() {
786                    let (data, time, diff) = &self[*position];
787                    if upper.less_equal(time) {
788                        frontier.insert_with(time, |time| time.clone());
789                        keep.copy_destructured(data, time, diff);
790                    } else {
791                        ship.copy_destructured(data, time, diff);
792                    }
793                    *position += 1;
794                }
795            }
796        }
797    }
798}