// differential_dataflow/trace/implementations/ord_neu.rs

1//! Trace and batch implementations based on sorted ranges.
2//!
3//! The types and type aliases in this module start with either
4//!
5//! * `OrdVal`: Collections whose data have the form `(key, val)` where `key` is ordered.
6//! * `OrdKey`: Collections whose data have the form `key` where `key` is ordered.
7//!
8//! Although `OrdVal` is more general than `OrdKey`, the latter has a simpler representation
9//! and should consume fewer resources (computation and memory) when it applies.
10
11use std::rc::Rc;
12
13use crate::containers::TimelyStack;
14use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker};
15use crate::trace::implementations::spine_fueled::Spine;
16use crate::trace::implementations::merge_batcher::MergeBatcher;
17use crate::trace::implementations::merge_batcher::container::{VecInternalMerger, ColInternalMerger};
18use crate::trace::rc_blanket_impls::RcBuilder;
19
20use super::{Layout, Vector, TStack};
21
22pub use self::val_batch::{OrdValBatch, OrdValBuilder};
23pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
24
25/// A trace implementation using a spine of ordered lists.
26pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
27/// A batcher using ordered lists.
28pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ContainerChunker<Vec<((K,V),T,R)>>, VecInternalMerger<(K, V), T, R>>;
29/// A builder using ordered lists.
30pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
31
32// /// A trace implementation for empty values using a spine of ordered lists.
33// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
34
35/// A trace implementation backed by columnar storage.
36pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
37/// A batcher for columnar storage.
38pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColInternalMerger<(K,V),T,R>>;
39/// A builder for columnar storage.
40pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;
41
42/// A trace implementation using a spine of ordered lists.
43pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
44/// A batcher for ordered lists.
45pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ContainerChunker<Vec<((K,()),T,R)>>, VecInternalMerger<(K, ()), T, R>>;
46/// A builder for ordered lists.
47pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
48
49// /// A trace implementation for empty values using a spine of ordered lists.
50// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
51
52/// A trace implementation backed by columnar storage.
53pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
54/// A batcher for columnar storage
55pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColInternalMerger<(K,()),T,R>>;
56/// A builder for columnar storage
57pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;
58
59// /// A trace implementation backed by columnar storage.
60// pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
61
pub use layers::{Vals, Upds};
/// Layers are containers of lists of some type.
///
/// The intent is that they "attach" to an outer layer which has as many values
/// as the layer has lists, thereby associating a list with each outer value.
/// A sequence of layers, each matching the number of values in its predecessor,
/// forms a layered trie: a tree with values of some type on nodes at each depth.
///
/// We will form tries here by layering `[Keys, Vals, Upds]` or `[Keys, Upds]`.
pub mod layers {

    use serde::{Deserialize, Serialize};
    use crate::trace::implementations::BatchContainer;

    /// A container for non-empty lists of values.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Vals<O, V> {
        /// Offsets used to provide indexes from keys to values.
        ///
        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
        pub offs: O,
        /// Concatenated ordered lists of values, bracketed by offsets in `offs`.
        pub vals: V,
    }

    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Default for Vals<O, V> {
        fn default() -> Self { Self::with_capacity(0, 0) }
    }

    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Vals<O, V> {
        /// Lower and upper bounds in `self.vals` of the indexed list.
        ///
        /// Reads adjacent offsets; relies on `offs` having one more entry than there are lists.
        #[inline(always)] pub fn bounds(&self, index: usize) -> (usize, usize) {
            (self.offs.index(index), self.offs.index(index+1))
        }
        /// Retrieves a value using relative indexes.
        ///
        /// The first index identifies a list, and the second an item within the list.
        /// The method adds the list's lower bound to the item index, and then calls
        /// `get_abs`. Using absolute indexes within the list's bounds can be more
        /// efficient than using relative indexing.
        pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> V::ReadItem<'_> {
            self.get_abs(self.bounds(list_idx).0 + item_idx)
        }

        /// Number of lists in the container.
        ///
        /// One fewer than the number of offsets, due to the leading sentinel offset.
        pub fn len(&self) -> usize { self.offs.len() - 1 }
        /// Retrieves a value using an absolute rather than relative index.
        pub fn get_abs(&self, index: usize) -> V::ReadItem<'_> {
            self.vals.index(index)
        }
        /// Allocates with capacities for a number of lists and values.
        pub fn with_capacity(o_size: usize, v_size: usize) -> Self {
            let mut offs = <O as BatchContainer>::with_capacity(o_size);
            // Seed the sentinel offset so `bounds(0)` is well-defined.
            offs.push_ref(0);
            Self {
                offs,
                vals: <V as BatchContainer>::with_capacity(v_size),
            }
        }
        /// Allocates with enough capacity to contain two inputs.
        pub fn merge_capacity(this: &Self, that: &Self) -> Self {
            let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
            // Seed the sentinel offset so `bounds(0)` is well-defined.
            offs.push_ref(0);
            Self {
                offs,
                vals: <V as BatchContainer>::merge_capacity(&this.vals, &that.vals),
            }
        }
    }

    /// A container for non-empty lists of updates.
    ///
    /// This container uses the special representation of an empty slice to stand in for
    /// "the previous single element". An empty slice is an otherwise invalid representation.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Upds<O, T, D> {
        /// Offsets used to provide indexes from values to updates.
        pub offs: O,
        /// Concatenated ordered lists of update times, bracketed by offsets in `offs`.
        pub times: T,
        /// Concatenated ordered lists of update diffs, bracketed by offsets in `offs`.
        pub diffs: D,
    }

    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Default for Upds<O, T, D> {
        fn default() -> Self { Self::with_capacity(0, 0) }
    }
    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Upds<O, T, D> {
        /// Lower and upper bounds in `self.times` and `self.diffs` of the indexed list.
        ///
        /// Decodes the singleton compression: an empty interval means "repeat the
        /// single update just before this position" (see `UpdsBuilder::seal`).
        pub fn bounds(&self, index: usize) -> (usize, usize) {
            let mut lower = self.offs.index(index);
            let upper = self.offs.index(index+1);
            // We use equal lower and upper to encode "singleton update; just before here".
            // It should only apply when there is a prior element, so `lower` should be greater than zero.
            if lower == upper {
                assert!(lower > 0);
                lower -= 1;
            }
            (lower, upper)
        }
        /// Retrieves a value using relative indexes.
        ///
        /// The first index identifies a list, and the second an item within the list.
        /// The method adds the list's lower bound to the item index, and then calls
        /// `get_abs`. Using absolute indexes within the list's bounds can be more
        /// efficient than using relative indexing.
        pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
            self.get_abs(self.bounds(list_idx).0 + item_idx)
        }

        /// Number of lists in the container.
        ///
        /// One fewer than the number of offsets, due to the leading sentinel offset.
        pub fn len(&self) -> usize { self.offs.len() - 1 }
        /// Retrieves a value using an absolute rather than relative index.
        pub fn get_abs(&self, index: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
            (self.times.index(index), self.diffs.index(index))
        }
        /// Allocates with capacities for a number of lists and values.
        pub fn with_capacity(o_size: usize, u_size: usize) -> Self {
            let mut offs = <O as BatchContainer>::with_capacity(o_size);
            // Seed the sentinel offset so `bounds(0)` is well-defined.
            offs.push_ref(0);
            Self {
                offs,
                times: <T as BatchContainer>::with_capacity(u_size),
                diffs: <D as BatchContainer>::with_capacity(u_size),
            }
        }
        /// Allocates with enough capacity to contain two inputs.
        pub fn merge_capacity(this: &Self, that: &Self) -> Self {
            let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
            // Seed the sentinel offset so `bounds(0)` is well-defined.
            offs.push_ref(0);
            Self {
                offs,
                times: <T as BatchContainer>::merge_capacity(&this.times, &that.times),
                diffs: <D as BatchContainer>::merge_capacity(&this.diffs, &that.diffs),
            }
        }
    }

    /// Helper type for constructing `Upds` containers.
    pub struct UpdsBuilder<T: BatchContainer, D: BatchContainer> {
        /// Local stash of updates, to use for consolidation.
        ///
        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
        /// A `ChangeBatch` itself needs an `i64` diff type, which we have not.
        stash: Vec<(T::Owned, D::Owned)>,
        /// Total number of consolidated updates.
        ///
        /// Tracked independently to account for duplicate compression.
        total: usize,

        /// Time container to stage singleton times for evaluation.
        time_con: T,
        /// Diff container to stage singleton times for evaluation.
        diff_con: D,
    }

    impl<T: BatchContainer, D: BatchContainer> Default for UpdsBuilder<T, D> {
        fn default() -> Self { Self { stash: Vec::default(), total: 0, time_con: BatchContainer::with_capacity(1), diff_con: BatchContainer::with_capacity(1) } }
    }


    impl<T, D> UpdsBuilder<T, D>
    where
        T: BatchContainer<Owned: Ord>,
        D: BatchContainer<Owned: crate::difference::Semigroup>,
    {
        /// Stages one update, but does not seal the set of updates.
        pub fn push(&mut self, time: T::Owned, diff: D::Owned) {
            self.stash.push((time, diff));
        }

        /// Consolidate and insert (if non-empty) the stashed updates.
        ///
        /// The return indicates whether the results were indeed non-empty.
        pub fn seal<O: for<'a> BatchContainer<ReadItem<'a> = usize>>(&mut self, upds: &mut Upds<O, T, D>) -> bool {
            use crate::consolidation;
            consolidation::consolidate(&mut self.stash);
            // If everything consolidates away, return false.
            if self.stash.is_empty() { return false; }
            // If there is a singleton, we may be able to optimize.
            // A singleton that repeats the most recently pushed (time, diff) is
            // encoded by repeating the prior offset (an empty interval) instead
            // of storing the update again; `Upds::bounds` decodes this.
            if self.stash.len() == 1 {
                let (time, diff) = self.stash.last().unwrap();
                // Stage the owned singleton in containers so we can compare by `ReadItem`.
                self.time_con.clear(); self.time_con.push_own(time);
                self.diff_con.clear(); self.diff_con.push_own(diff);
                if upds.times.last() == self.time_con.get(0) && upds.diffs.last() == self.diff_con.get(0) {
                    self.total += 1;
                    self.stash.clear();
                    upds.offs.push_ref(upds.times.len());
                    return true;
                }
            }
            // Conventional; move `stash` into `updates`.
            self.total += self.stash.len();
            for (time, diff) in self.stash.drain(..) {
                upds.times.push_own(&time);
                upds.diffs.push_own(&diff);
            }
            upds.offs.push_ref(upds.times.len());
            true
        }

        /// Completes the building and returns the total updates sealed.
        pub fn total(&self) -> usize { self.total }
    }
}
267
268/// Types related to forming batches with values.
269pub mod val_batch {
270
271    use std::marker::PhantomData;
272    use serde::{Deserialize, Serialize};
273    use timely::container::PushInto;
274    use timely::progress::{Antichain, frontier::AntichainRef};
275
276    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
277    use crate::trace::implementations::{BatchContainer, BuilderInput};
278    use crate::trace::implementations::layout;
279
280    use super::{Layout, Vals, Upds, layers::UpdsBuilder};
281
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The three fields form a trie `[keys, vals, upds]`: each key indexes a list of
    /// values, and each value indexes a list of `(time, diff)` updates.
    #[derive(Debug, Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValStorage<L: Layout> {
        /// An ordered list of keys.
        pub keys: L::KeyContainer,
        /// For each key in `keys`, a list of values.
        pub vals: Vals<L::OffsetContainer, L::ValContainer>,
        /// For each val in `vals`, a list of (time, diff) updates.
        pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
    }
299
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out, and `C` determines which
    /// merge batcher to select.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValBatch<L: Layout> {
        /// The updates themselves.
        pub storage: OrdValStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<layout::Time<L>>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because due to the singleton optimization,
        /// we may have many more updates than `storage.updates.len()`. It should equal that
        /// length, plus the number of singleton optimizations employed.
        pub updates: usize,
    }
324
    // Ties the batch to its layout `L`, from which associated key, value, time,
    // and diff types are derived by the `WithLayout` machinery.
    impl<L: Layout> WithLayout for OrdValBatch<L> {
        type Layout = L;
    }
328
329    impl<L: Layout> BatchReader for OrdValBatch<L> {
330
331        type Cursor = OrdValCursor<L>;
332        fn cursor(&self) -> Self::Cursor {
333            OrdValCursor {
334                key_cursor: 0,
335                val_cursor: 0,
336                phantom: PhantomData,
337            }
338        }
339        fn len(&self) -> usize {
340            // Normally this would be `self.updates.len()`, but we have a clever compact encoding.
341            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
342            self.updates
343        }
344        fn description(&self) -> &Description<layout::Time<L>> { &self.description }
345    }
346
347    impl<L: Layout> Batch for OrdValBatch<L> {
348        type Merger = OrdValMerger<L>;
349
350        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
351            OrdValMerger::new(self, other, compaction_frontier)
352        }
353
354        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
355            use timely::progress::Timestamp;
356            Self {
357                storage: OrdValStorage {
358                    keys: L::KeyContainer::with_capacity(0),
359                    vals: Default::default(),
360                    upds: Default::default(),
361                },
362                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
363                updates: 0,
364            }
365        }
366    }
367
    /// State for an in-progress merge.
    pub struct OrdValMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// Result that we are currently assembling.
        result: OrdValStorage<L>,
        /// Description (time interval and `since`) of the merged output.
        description: Description<layout::Time<L>>,
        /// Staging area to consolidate owned times and diffs, before sealing.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
    }
381
    impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
    where
        OrdValBatch<L>: Batch<Time=layout::Time<L>>,
    {
        /// Prepares a merger over two batches covering adjacent time intervals.
        fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {

            // The batches must cover adjacent intervals of times.
            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            // The result's `since` is the join of both inputs' `since` frontiers,
            // further advanced by the supplied compaction frontier.
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            OrdValMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: OrdValStorage {
                    keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                    vals: Vals::merge_capacity(&batch1.vals, &batch2.vals),
                    upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
                },
                description,
                staging: UpdsBuilder::default(),
            }
        }
        /// Finishes the merge, packaging the assembled storage as a batch.
        fn done(self) -> OrdValBatch<L> {
            OrdValBatch {
                // `staging.total()` counts all sealed updates, including those
                // compressed away by the singleton optimization.
                updates: self.staging.total(),
                storage: self.result,
                description: self.description,
            }
        }
        /// Performs a bounded amount of merge work, decrementing `fuel` by the work done.
        fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {

            // An (incomplete) indication of the amount of work we've done so far.
            let starting_updates = self.staging.total();
            let mut effort = 0isize;

            // While both mergees are still active, perform single-key merges.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                // An (incomplete) accounting of the work we've done.
                effort = (self.staging.total() - starting_updates) as isize;
            }

            // Merging is complete, and only copying remains.
            // Key-by-key copying allows effort interruption, and compaction.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
446
    // Helper methods in support of merging batches.
    impl<L: Layout> OrdValMerger<L> {
        /// Copy the next key in `source`.
        ///
        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
        /// If the result does not wholly cancel, the key will be present in `self` with the
        /// compacted values and updates.
        ///
        /// The caller should be certain to update the cursor, as this method does not do this.
        fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.vals.len();
            let (mut lower, upper) = source.vals.bounds(cursor);
            while lower < upper {
                // Only values whose updates survive compaction are copied.
                self.stash_updates_for_val(source, lower);
                if self.staging.seal(&mut self.result.upds) {
                    self.result.vals.vals.push_ref(source.vals.get_abs(lower));
                }
                lower += 1;
            }

            // If we have pushed any values, copy the key as well.
            if self.result.vals.vals.len() > init_vals {
                self.result.keys.push_ref(source.keys.index(cursor));
                self.result.vals.offs.push_ref(self.result.vals.vals.len());
            }
        }
        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
        ///
        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
        /// if the updates cancel either directly or after compaction.
        fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    // Keys are equal; must merge all values from both sources for this one key.
                    let (lower1, upper1) = source1.vals.bounds(self.key_cursor1);
                    let (lower2, upper2) = source2.vals.bounds(self.key_cursor2);
                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
                        self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
                        self.result.vals.offs.push_ref(off);
                    }
                    // Increment cursors in either case; the keys are merged.
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }
        /// Merge two ranges of values into `self`.
        ///
        /// If the compacted result contains values with non-empty updates, the function returns
        /// an offset that should be recorded to indicate the upper extent of the result values.
        fn merge_vals(
            &mut self,
            (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
            (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
        ) -> Option<usize> {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.vals.len();
            while lower1 < upper1 && lower2 < upper2 {
                // We compare values, and fold in updates for the lowest values;
                // if they are non-empty post-consolidation, we write the value.
                // We could multi-way merge and it wouldn't be very complicated.
                use ::std::cmp::Ordering;
                match source1.vals.get_abs(lower1).cmp(&source2.vals.get_abs(lower2)) {
                    Ordering::Less => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source1, lower1);
                        if self.staging.seal(&mut self.result.upds) {
                            self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
                        }
                        lower1 += 1;
                    },
                    Ordering::Equal => {
                        // Equal values: stash updates from both sources, then seal once.
                        self.stash_updates_for_val(source1, lower1);
                        self.stash_updates_for_val(source2, lower2);
                        if self.staging.seal(&mut self.result.upds) {
                            self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
                        }
                        lower1 += 1;
                        lower2 += 1;
                    },
                    Ordering::Greater => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source2, lower2);
                        if self.staging.seal(&mut self.result.upds) {
                            self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
                        }
                        lower2 += 1;
                    },
                }
            }
            // Merging is complete, but we may have remaining elements to push.
            while lower1 < upper1 {
                self.stash_updates_for_val(source1, lower1);
                if self.staging.seal(&mut self.result.upds) {
                    self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
                }
                lower1 += 1;
            }
            while lower2 < upper2 {
                self.stash_updates_for_val(source2, lower2);
                if self.staging.seal(&mut self.result.upds) {
                    self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
                }
                lower2 += 1;
            }

            // Values being pushed indicate non-emptiness.
            if self.result.vals.vals.len() > init_vals {
                Some(self.result.vals.vals.len())
            } else {
                None
            }
        }

        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
        fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
            let (lower, upper) = source.upds.bounds(index);
            for i in lower .. upper {
                // NB: Here is where we would need to look back if `lower == upper`.
                let (time, diff) = source.upds.get_abs(i);
                use crate::lattice::Lattice;
                // Advance the time by the output's `since`, applying logical compaction.
                let mut new_time: layout::Time<L> = L::TimeContainer::into_owned(time);
                new_time.advance_by(self.description.since().borrow());
                self.staging.push(new_time, L::DiffContainer::into_owned(diff));
            }
        }
    }
584
    /// A cursor for navigating a single layer.
    pub struct OrdValCursor<L: Layout> {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// Absolute position of the current value.
        val_cursor: usize,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }
594
    use crate::trace::implementations::WithLayout;
    // Equip the cursor with the layout `L`, from which its associated
    // key/value/time/diff types are derived.
    impl<L: Layout> WithLayout for OrdValCursor<L> {
        type Layout = L;
    }
599
600    impl<L: Layout> Cursor for OrdValCursor<L> {
601
602        type Storage = OrdValBatch<L>;
603
604        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
605        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
606
607        fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
608        fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.get_abs(self.val_cursor) }
609        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
610            let (lower, upper) = storage.storage.upds.bounds(self.val_cursor);
611            for index in lower .. upper {
612                let (time, diff) = storage.storage.upds.get_abs(index);
613                logic(time, diff);
614            }
615        }
616        fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
617        fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.vals.bounds(self.key_cursor).1 }
618        fn step_key(&mut self, storage: &OrdValBatch<L>){
619            self.key_cursor += 1;
620            if self.key_valid(storage) {
621                self.rewind_vals(storage);
622            }
623            else {
624                self.key_cursor = storage.storage.keys.len();
625            }
626        }
627        fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
628            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
629            if self.key_valid(storage) {
630                self.rewind_vals(storage);
631            }
632        }
633        fn step_val(&mut self, storage: &OrdValBatch<L>) {
634            self.val_cursor += 1;
635            if !self.val_valid(storage) {
636                self.val_cursor = storage.storage.vals.bounds(self.key_cursor).1;
637            }
638        }
        /// Advances the value cursor past all values strictly less than `val`,
        /// staying within the current key's value range.
        fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
            self.val_cursor += storage.storage.vals.vals.advance(self.val_cursor, storage.storage.vals.bounds(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
        }
642        fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
643            self.key_cursor = 0;
644            if self.key_valid(storage) {
645                self.rewind_vals(storage)
646            }
647        }
        /// Repositions the value cursor at the first value for the current key.
        fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
            self.val_cursor = storage.storage.vals.bounds(self.key_cursor).0;
        }
651    }
652
    /// A builder for creating layers from unsorted update tuples.
    ///
    /// `L` fixes the storage layout, and `CI` the container type of incoming chunks.
    pub struct OrdValBuilder<L: Layout, CI> {
        /// The in-progress result.
        ///
        /// This is public to allow container implementors to set and inspect their container.
        pub result: OrdValStorage<L>,
        /// Updates staged for the current (key, val) pair, sealed into `result.upds`
        /// whenever a new value or key arrives.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
        /// Records the chunk container type `CI` without storing one.
        _marker: PhantomData<CI>,
    }
662
    impl<L, CI> Builder for OrdValBuilder<L, CI>
    where
        L: for<'a> Layout<
            KeyContainer: PushInto<CI::Key<'a>>,
            ValContainer: PushInto<CI::Val<'a>>,
        >,
        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
    {

        type Input = CI;
        type Time = layout::Time<L>;
        type Output = OrdValBatch<L>;

        /// Allocates a builder sized for `keys` keys, `vals` values, and `upds` updates.
        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
            Self {
                result: OrdValStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    // Offset containers hold one more entry than the items they delimit.
                    vals: Vals::with_capacity(keys + 1, vals),
                    upds: Upds::with_capacity(vals + 1, upds),
                },
                staging: UpdsBuilder::default(),
                _marker: PhantomData,
            }
        }

        /// Drains `chunk`, appending its (key, val, time, diff) tuples to the in-progress batch.
        ///
        /// NOTE(review): only the most recently pushed key and value are compared, so
        /// correctness appears to rely on equal keys and values arriving adjacently
        /// (i.e. chunks sorted by key then value) — confirm against batcher contract.
        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, val, time, diff) = CI::into_parts(item);

                // Pre-load the first update.
                if self.result.keys.is_empty() {
                    self.result.vals.vals.push_into(val);
                    self.result.keys.push_into(key);
                    self.staging.push(time, diff);
                }
                // Perhaps this is a continuation of an already received key.
                else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    // Perhaps this is a continuation of an already received value.
                    if self.result.vals.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
                        self.staging.push(time, diff);
                    } else {
                        // New value; complete representation of prior value.
                        self.staging.seal(&mut self.result.upds);
                        self.staging.push(time, diff);
                        self.result.vals.vals.push_into(val);
                    }
                } else {
                    // New key; complete representation of prior key.
                    self.staging.seal(&mut self.result.upds);
                    self.staging.push(time, diff);
                    // Cap the prior key's value range before starting the new key.
                    self.result.vals.offs.push_ref(self.result.vals.vals.len());
                    self.result.vals.vals.push_into(val);
                    self.result.keys.push_into(key);
                }
            }
        }

        /// Seals updates for the final value, caps the final key's value range,
        /// and assembles the finished batch.
        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
            self.staging.seal(&mut self.result.upds);
            self.result.vals.offs.push_ref(self.result.vals.vals.len());
            OrdValBatch {
                updates: self.staging.total(),
                storage: self.result,
                description,
            }
        }

        /// Builds a batch from a chain of input chunks, sizing the builder from the chain's counts.
        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
742}
743
744/// Types related to forming batches of keys.
745pub mod key_batch {
746
747    use std::marker::PhantomData;
748    use serde::{Deserialize, Serialize};
749    use timely::container::PushInto;
750    use timely::progress::{Antichain, frontier::AntichainRef};
751
752    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
753    use crate::trace::implementations::{BatchContainer, BuilderInput};
754    use crate::trace::implementations::layout;
755
756    use super::{Layout, Upds, layers::UpdsBuilder};
757
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    #[derive(Debug, Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdKeyStorage<L: Layout> {
        /// An ordered list of keys.
        pub keys: L::KeyContainer,
        /// For each key in `keys`, a list of (time, diff) updates.
        pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
    }
772
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdKeyBatch<L: Layout> {
        /// The updates themselves.
        pub storage: OrdKeyStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<layout::Time<L>>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because due to the singleton optimization,
        /// we may have many more updates than are physically stored in `storage.upds`. It
        /// should equal that length, plus the number of singleton optimizations employed.
        pub updates: usize,

        /// A container holding a single value, which cursors report for every key.
        pub value: L::ValContainer,
    }
800
801    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> OrdKeyBatch<L> {
802        /// Creates a container with one value, to slot in to `self.value`.
803        pub fn create_value() -> L::ValContainer {
804            let mut value = L::ValContainer::with_capacity(1);
805            value.push_own(&Default::default());
806            value
807        }
808    }
809
    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> WithLayout for OrdKeyBatch<L> {
        // Associates the layout `L` with the batch type.
        type Layout = L;
    }
813
814    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> BatchReader for OrdKeyBatch<L> {
815
816        type Cursor = OrdKeyCursor<L>;
817        fn cursor(&self) -> Self::Cursor {
818            OrdKeyCursor {
819                key_cursor: 0,
820                val_stepped: false,
821                phantom: std::marker::PhantomData,
822            }
823        }
824        fn len(&self) -> usize {
825            // Normally this would be `self.updates.len()`, but we have a clever compact encoding.
826            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
827            self.updates
828        }
829        fn description(&self) -> &Description<layout::Time<L>> { &self.description }
830    }
831
    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Batch for OrdKeyBatch<L> {
        type Merger = OrdKeyMerger<L>;

        /// Begins a merge of `self` and `other`, compacting times to `compaction_frontier`.
        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
            OrdKeyMerger::new(self, other, compaction_frontier)
        }

        /// An empty batch spanning `lower .. upper`, with `since` at the minimum time.
        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
            use timely::progress::Timestamp;
            Self {
                storage: OrdKeyStorage {
                    keys: L::KeyContainer::with_capacity(0),
                    upds: Upds::default(),
                },
                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
                updates: 0,
                value: Self::create_value(),
            }
        }
    }
852
    /// State for an in-progress merge.
    pub struct OrdKeyMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// Result that we are currently assembling.
        result: OrdKeyStorage<L>,
        /// Description (bounds and `since`) of the merged output batch.
        description: Description<layout::Time<L>>,

        /// Local stash of updates, to use for consolidation.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
    }
867
    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
    where
        OrdKeyBatch<L>: Batch<Time=layout::Time<L>>,
    {
        /// Prepares a merge of two batches whose intervals abut (`batch1.upper() == batch2.lower()`).
        fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {

            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            // The result's `since` is the join of both inputs' sinces and the compaction frontier.
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            // The merged batch spans the union of the two input intervals.
            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            OrdKeyMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: OrdKeyStorage {
                    keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                    upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
                },
                description,
                staging: UpdsBuilder::default(),
            }
        }
        /// Assembles the merged batch from the accumulated state.
        fn done(self) -> OrdKeyBatch<L> {
            OrdKeyBatch {
                updates: self.staging.total(),
                storage: self.result,
                description: self.description,
                value: OrdKeyBatch::<L>::create_value(),
            }
        }
        /// Performs a bounded amount of merge work, decrementing `fuel` by the effort spent.
        ///
        /// Effort is measured (incompletely) by the number of staged updates produced,
        /// so a call may stop mid-merge once `fuel` is exhausted and be resumed later.
        fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {

            // An (incomplete) indication of the amount of work we've done so far.
            let starting_updates = self.staging.total();
            let mut effort = 0isize;

            // While both mergees are still active, perform single-key merges.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                // An (incomplete) accounting of the work we've done.
                effort = (self.staging.total() - starting_updates) as isize;
            }

            // Merging is complete, and only copying remains.
            // Key-by-key copying allows effort interruption, and compaction.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
932
    // Helper methods in support of merging batches.
    impl<L: Layout> OrdKeyMerger<L> {
        /// Copy the next key in `source`.
        ///
        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
        /// If the result does not wholly cancel, the key will be present in `self` with the
        /// compacted values and updates.
        ///
        /// The caller should be certain to update the cursor, as this method does not do this.
        fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
            self.stash_updates_for_key(source, cursor);
            // `seal` reports whether any updates survived consolidation; keys whose
            // updates fully cancel are omitted from the output entirely.
            if self.staging.seal(&mut self.result.upds) {
                self.result.keys.push_ref(source.keys.index(cursor));
            }
        }
        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
        ///
        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
        /// if the updates cancel either directly or after compaction.
        fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    // Keys are equal; must merge all updates from both sources for this one key.
                    self.stash_updates_for_key(source1, self.key_cursor1);
                    self.stash_updates_for_key(source2, self.key_cursor2);
                    if self.staging.seal(&mut self.result.upds) {
                        self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
                    }
                    // Increment cursors in either case; the keys are merged.
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }

        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
        fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
            let (lower, upper) = source.upds.bounds(index);
            for i in lower .. upper {
                // NB: Here is where we would need to look back if `lower == upper`.
                let (time, diff) = source.upds.get_abs(i);
                use crate::lattice::Lattice;
                // Advance each time to the output's `since` frontier, enabling consolidation.
                let mut new_time = L::TimeContainer::into_owned(time);
                new_time.advance_by(self.description.since().borrow());
                self.staging.push(new_time, L::DiffContainer::into_owned(diff));
            }
        }
    }
990
    /// A cursor for navigating a single layer.
    pub struct OrdKeyCursor<L: Layout> {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// Whether the single synthetic value has been stepped past for the current key.
        ///
        /// Key-only batches expose exactly one value per key, so one step exhausts the values.
        val_stepped: bool,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }
1000
1001    use crate::trace::implementations::WithLayout;
    impl<L: Layout<ValContainer: BatchContainer>> WithLayout for OrdKeyCursor<L> {
        // Associates the layout `L` with the cursor type.
        type Layout = L;
    }
1005
1006    impl<L: for<'a> Layout<ValContainer: BatchContainer<Owned: Default>>> Cursor for OrdKeyCursor<L> {
1007
1008        type Storage = OrdKeyBatch<L>;
1009
1010        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
1011        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
1012
1013        fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
1014        fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { storage.value.index(0) }
1015        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
1016            let (lower, upper) = storage.storage.upds.bounds(self.key_cursor);
1017            for index in lower .. upper {
1018                let (time, diff) = storage.storage.upds.get_abs(index);
1019                logic(time, diff);
1020            }
1021        }
1022        fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
1023        fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
1024        fn step_key(&mut self, storage: &Self::Storage){
1025            self.key_cursor += 1;
1026            if self.key_valid(storage) {
1027                self.rewind_vals(storage);
1028            }
1029            else {
1030                self.key_cursor = storage.storage.keys.len();
1031            }
1032        }
1033        fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
1034            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
1035            if self.key_valid(storage) {
1036                self.rewind_vals(storage);
1037            }
1038        }
1039        fn step_val(&mut self, _storage: &Self::Storage) {
1040            self.val_stepped = true;
1041        }
1042        fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
1043        fn rewind_keys(&mut self, storage: &Self::Storage) {
1044            self.key_cursor = 0;
1045            if self.key_valid(storage) {
1046                self.rewind_vals(storage)
1047            }
1048        }
1049        fn rewind_vals(&mut self, _storage: &Self::Storage) {
1050            self.val_stepped = false;
1051        }
1052    }
1053
    /// A builder for creating layers from unsorted update tuples.
    pub struct OrdKeyBuilder<L: Layout, CI> {
        /// The in-progress result.
        ///
        /// This is public to allow container implementors to set and inspect their container.
        pub result: OrdKeyStorage<L>,
        /// Updates staged for the current key, sealed into `result.upds` when a new key arrives.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
        /// Records the chunk container type `CI` without storing one.
        _marker: PhantomData<CI>,
    }
1063
    impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
    where
        L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>>,
        L: Layout<ValContainer: BatchContainer<Owned: Default>>,
        CI: BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
    {

        type Input = CI;
        type Time = layout::Time<L>;
        type Output = OrdKeyBatch<L>;

        /// Allocates a builder sized for `keys` keys and `upds` updates; values are not stored.
        fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
            Self {
                result: OrdKeyStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    // Offset containers hold one more entry than the items they delimit.
                    upds: Upds::with_capacity(keys+1, upds),
                },
                staging: UpdsBuilder::default(),
                _marker: PhantomData,
            }
        }

        /// Drains `chunk`, appending its keys and updates to the in-progress batch; values are ignored.
        ///
        /// NOTE(review): only the most recently pushed key is compared, so correctness
        /// appears to rely on equal keys arriving adjacently (i.e. sorted chunks) —
        /// confirm against batcher contract.
        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, _val, time, diff) = CI::into_parts(item);
                // The first key requires no sealing of prior state.
                if self.result.keys.is_empty() {
                    self.result.keys.push_into(key);
                    self.staging.push(time, diff);
                }
                // Perhaps this is a continuation of an already received key.
                else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    self.staging.push(time, diff);
                } else {
                    // New key; complete representation of prior key.
                    self.staging.seal(&mut self.result.upds);
                    self.staging.push(time, diff);
                    self.result.keys.push_into(key);
                }
            }
        }

        /// Seals updates for the final key and assembles the finished batch.
        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
            self.staging.seal(&mut self.result.upds);
            OrdKeyBatch {
                updates: self.staging.total(),
                storage: self.result,
                description,
                value: OrdKeyBatch::<L>::create_value(),
            }
        }

        /// Builds a batch from a chain of input chunks, sizing the builder from the chain's counts.
        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
1126
1127}