Skip to main content

palimpsest_dataflow/trace/implementations/
ord_neu.rs

1//! Trace and batch implementations based on sorted ranges.
2//!
//! The types and type aliases in this module describe one of two shapes of collection:
//!
//! * `OrdVal` / `ColVal`: Collections whose data have the form `(key, val)` where `key` is ordered.
//! * `OrdKey` / `ColKey`: Collections whose data have the form `key` where `key` is ordered.
//!
//! The `Ord`-prefixed aliases use `Vector` layouts, while the `Col`-prefixed aliases use
//! columnar `TStack` layouts.
//!
//! Although `OrdVal` is more general than `OrdKey`, the latter has a simpler representation
//! and should consume fewer resources (computation and memory) when it applies.
10
11use std::rc::Rc;
12
13use crate::containers::TimelyStack;
14use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
15use crate::trace::implementations::merge_batcher::{ColMerger, MergeBatcher, VecMerger};
16use crate::trace::implementations::spine_fueled::Spine;
17use crate::trace::rc_blanket_impls::RcBuilder;
18
19use super::{Layout, TStack, Vector};
20
21pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
22pub use self::val_batch::{OrdValBatch, OrdValBuilder};
23
24/// A trace implementation using a spine of ordered lists.
25pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K, V), T, R)>>>>;
26/// A batcher using ordered lists.
27pub type OrdValBatcher<K, V, T, R> =
28    MergeBatcher<Vec<((K, V), T, R)>, VecChunker<((K, V), T, R)>, VecMerger<(K, V), T, R>>;
29/// A builder using ordered lists.
30pub type RcOrdValBuilder<K, V, T, R> =
31    RcBuilder<OrdValBuilder<Vector<((K, V), T, R)>, Vec<((K, V), T, R)>>>;
32
33// /// A trace implementation for empty values using a spine of ordered lists.
34// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
35
36/// A trace implementation backed by columnar storage.
37pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K, V), T, R)>>>>;
38/// A batcher for columnar storage.
39pub type ColValBatcher<K, V, T, R> =
40    MergeBatcher<Vec<((K, V), T, R)>, ColumnationChunker<((K, V), T, R)>, ColMerger<(K, V), T, R>>;
41/// A builder for columnar storage.
42pub type ColValBuilder<K, V, T, R> =
43    RcBuilder<OrdValBuilder<TStack<((K, V), T, R)>, TimelyStack<((K, V), T, R)>>>;
44
45/// A trace implementation using a spine of ordered lists.
46pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K, ()), T, R)>>>>;
47/// A batcher for ordered lists.
48pub type OrdKeyBatcher<K, T, R> =
49    MergeBatcher<Vec<((K, ()), T, R)>, VecChunker<((K, ()), T, R)>, VecMerger<(K, ()), T, R>>;
50/// A builder for ordered lists.
51pub type RcOrdKeyBuilder<K, T, R> =
52    RcBuilder<OrdKeyBuilder<Vector<((K, ()), T, R)>, Vec<((K, ()), T, R)>>>;
53
54// /// A trace implementation for empty values using a spine of ordered lists.
55// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
56
57/// A trace implementation backed by columnar storage.
58pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K, ()), T, R)>>>>;
59/// A batcher for columnar storage
60pub type ColKeyBatcher<K, T, R> = MergeBatcher<
61    Vec<((K, ()), T, R)>,
62    ColumnationChunker<((K, ()), T, R)>,
63    ColMerger<(K, ()), T, R>,
64>;
65/// A builder for columnar storage
66pub type ColKeyBuilder<K, T, R> =
67    RcBuilder<OrdKeyBuilder<TStack<((K, ()), T, R)>, TimelyStack<((K, ()), T, R)>>>;
68
69// /// A trace implementation backed by columnar storage.
70// pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
71
72pub use layers::{Upds, Vals};
73/// Layers are containers of lists of some type.
74///
75/// The intent is that they "attach" to an outer layer which has as many values
76/// as the layer has lists, thereby associating a list with each outer value.
77/// A sequence of layers, each matching the number of values in its predecessor,
78/// forms a layered trie: a tree with values of some type on nodes at each depth.
79///
80/// We will form tries here by layering `[Keys, Vals, Upds]` or `[Keys, Upds]`.
81pub mod layers {
82
83    use crate::trace::implementations::BatchContainer;
84    use serde::{Deserialize, Serialize};
85
86    /// A container for non-empty lists of values.
87    #[derive(Debug, Serialize, Deserialize)]
88    pub struct Vals<O, V> {
89        /// Offsets used to provide indexes from keys to values.
90        ///
91        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
92        pub offs: O,
93        /// Concatenated ordered lists of values, bracketed by offsets in `offs`.
94        pub vals: V,
95    }
96
97    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Default for Vals<O, V> {
98        fn default() -> Self {
99            Self::with_capacity(0, 0)
100        }
101    }
102
103    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Vals<O, V> {
104        /// Lower and upper bounds in `self.vals` of the indexed list.
105        pub fn bounds(&self, index: usize) -> (usize, usize) {
106            (self.offs.index(index), self.offs.index(index + 1))
107        }
108        /// Retrieves a value using relative indexes.
109        ///
110        /// The first index identifies a list, and the second an item within the list.
111        /// The method adds the list's lower bound to the item index, and then calls
112        /// `get_abs`. Using absolute indexes within the list's bounds can be more
113        /// efficient than using relative indexing.
114        pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> V::ReadItem<'_> {
115            self.get_abs(self.bounds(list_idx).0 + item_idx)
116        }
117
118        /// Number of lists in the container.
119        pub fn len(&self) -> usize {
120            self.offs.len() - 1
121        }
122        /// Retrieves a value using an absolute rather than relative index.
123        pub fn get_abs(&self, index: usize) -> V::ReadItem<'_> {
124            self.vals.index(index)
125        }
126        /// Allocates with capacities for a number of lists and values.
127        pub fn with_capacity(o_size: usize, v_size: usize) -> Self {
128            let mut offs = <O as BatchContainer>::with_capacity(o_size);
129            offs.push_ref(0);
130            Self {
131                offs,
132                vals: <V as BatchContainer>::with_capacity(v_size),
133            }
134        }
135        /// Allocates with enough capacity to contain two inputs.
136        pub fn merge_capacity(this: &Self, that: &Self) -> Self {
137            let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
138            offs.push_ref(0);
139            Self {
140                offs,
141                vals: <V as BatchContainer>::merge_capacity(&this.vals, &that.vals),
142            }
143        }
144    }
145
146    /// A container for non-empty lists of updates.
147    ///
148    /// This container uses the special representiation of an empty slice to stand in for
149    /// "the previous single element". An empty slice is an otherwise invalid representation.
150    #[derive(Debug, Serialize, Deserialize)]
151    pub struct Upds<O, T, D> {
152        /// Offsets used to provide indexes from values to updates.
153        pub offs: O,
154        /// Concatenated ordered lists of update times, bracketed by offsets in `offs`.
155        pub times: T,
156        /// Concatenated ordered lists of update diffs, bracketed by offsets in `offs`.
157        pub diffs: D,
158    }
159
160    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer>
161        Default for Upds<O, T, D>
162    {
163        fn default() -> Self {
164            Self::with_capacity(0, 0)
165        }
166    }
167    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer>
168        Upds<O, T, D>
169    {
170        /// Lower and upper bounds in `self.times` and `self.diffs` of the indexed list.
171        pub fn bounds(&self, index: usize) -> (usize, usize) {
172            let mut lower = self.offs.index(index);
173            let upper = self.offs.index(index + 1);
174            // We use equal lower and upper to encode "singleton update; just before here".
175            // It should only apply when there is a prior element, so `lower` should be greater than zero.
176            if lower == upper {
177                assert!(lower > 0);
178                lower -= 1;
179            }
180            (lower, upper)
181        }
182        /// Retrieves a value using relative indexes.
183        ///
184        /// The first index identifies a list, and the second an item within the list.
185        /// The method adds the list's lower bound to the item index, and then calls
186        /// `get_abs`. Using absolute indexes within the list's bounds can be more
187        /// efficient than using relative indexing.
188        pub fn get_rel(
189            &self,
190            list_idx: usize,
191            item_idx: usize,
192        ) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
193            self.get_abs(self.bounds(list_idx).0 + item_idx)
194        }
195
196        /// Number of lists in the container.
197        pub fn len(&self) -> usize {
198            self.offs.len() - 1
199        }
200        /// Retrieves a value using an absolute rather than relative index.
201        pub fn get_abs(&self, index: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
202            (self.times.index(index), self.diffs.index(index))
203        }
204        /// Allocates with capacities for a number of lists and values.
205        pub fn with_capacity(o_size: usize, u_size: usize) -> Self {
206            let mut offs = <O as BatchContainer>::with_capacity(o_size);
207            offs.push_ref(0);
208            Self {
209                offs,
210                times: <T as BatchContainer>::with_capacity(u_size),
211                diffs: <D as BatchContainer>::with_capacity(u_size),
212            }
213        }
214        /// Allocates with enough capacity to contain two inputs.
215        pub fn merge_capacity(this: &Self, that: &Self) -> Self {
216            let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
217            offs.push_ref(0);
218            Self {
219                offs,
220                times: <T as BatchContainer>::merge_capacity(&this.times, &that.times),
221                diffs: <D as BatchContainer>::merge_capacity(&this.diffs, &that.diffs),
222            }
223        }
224    }
225
226    /// Helper type for constructing `Upds` containers.
227    pub struct UpdsBuilder<T: BatchContainer, D: BatchContainer> {
228        /// Local stash of updates, to use for consolidation.
229        ///
230        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
231        /// A `ChangeBatch` itself needs an `i64` diff type, which we have not.
232        stash: Vec<(T::Owned, D::Owned)>,
233        /// Total number of consolidated updates.
234        ///
235        /// Tracked independently to account for duplicate compression.
236        total: usize,
237
238        /// Time container to stage singleton times for evaluation.
239        time_con: T,
240        /// Diff container to stage singleton times for evaluation.
241        diff_con: D,
242    }
243
244    impl<T: BatchContainer, D: BatchContainer> Default for UpdsBuilder<T, D> {
245        fn default() -> Self {
246            Self {
247                stash: Vec::default(),
248                total: 0,
249                time_con: BatchContainer::with_capacity(1),
250                diff_con: BatchContainer::with_capacity(1),
251            }
252        }
253    }
254
255    impl<T, D> UpdsBuilder<T, D>
256    where
257        T: BatchContainer<Owned: Ord>,
258        D: BatchContainer<Owned: crate::difference::Semigroup>,
259    {
260        /// Stages one update, but does not seal the set of updates.
261        pub fn push(&mut self, time: T::Owned, diff: D::Owned) {
262            self.stash.push((time, diff));
263        }
264
265        /// Consolidate and insert (if non-empty) the stashed updates.
266        ///
267        /// The return indicates whether the results were indeed non-empty.
268        pub fn seal<O: for<'a> BatchContainer<ReadItem<'a> = usize>>(
269            &mut self,
270            upds: &mut Upds<O, T, D>,
271        ) -> bool {
272            use crate::consolidation;
273            consolidation::consolidate(&mut self.stash);
274            // If everything consolidates away, return false.
275            if self.stash.is_empty() {
276                return false;
277            }
278            // If there is a singleton, we may be able to optimize.
279            if self.stash.len() == 1 {
280                let (time, diff) = self.stash.last().unwrap();
281                self.time_con.clear();
282                self.time_con.push_own(time);
283                self.diff_con.clear();
284                self.diff_con.push_own(diff);
285                if upds.times.last() == self.time_con.get(0)
286                    && upds.diffs.last() == self.diff_con.get(0)
287                {
288                    self.total += 1;
289                    self.stash.clear();
290                    upds.offs.push_ref(upds.times.len());
291                    return true;
292                }
293            }
294            // Conventional; move `stash` into `updates`.
295            self.total += self.stash.len();
296            for (time, diff) in self.stash.drain(..) {
297                upds.times.push_own(&time);
298                upds.diffs.push_own(&diff);
299            }
300            upds.offs.push_ref(upds.times.len());
301            true
302        }
303
304        /// Completes the building and returns the total updates sealed.
305        pub fn total(&self) -> usize {
306            self.total
307        }
308    }
309}
310
311/// Types related to forming batches with values.
312pub mod val_batch {
313
314    use serde::{Deserialize, Serialize};
315    use std::marker::PhantomData;
316    use timely::container::PushInto;
317    use timely::progress::{frontier::AntichainRef, Antichain};
318
319    use crate::trace::implementations::layout;
320    use crate::trace::implementations::{BatchContainer, BuilderInput};
321    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
322
323    use super::{layers::UpdsBuilder, Layout, Upds, Vals};
324
    /// The layered storage of an `OrdValBatch`: keys, per-key value lists, and per-value
    /// update lists.
    ///
    /// The three fields form a trie: `vals` has one list per entry of `keys`, and `upds` has
    /// one list per entry of `vals.vals`.
    #[derive(Debug, Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValStorage<L: Layout> {
        /// An ordered list of keys.
        pub keys: L::KeyContainer,
        /// For each key in `keys`, a list of values.
        pub vals: Vals<L::OffsetContainer, L::ValContainer>,
        /// For each val in `vals`, a list of (time, diff) updates.
        pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
    }
342
343    /// An immutable collection of update tuples, from a contiguous interval of logical times.
344    ///
345    /// The `L` parameter captures how the updates should be laid out, and `C` determines which
346    /// merge batcher to select.
347    #[derive(Serialize, Deserialize)]
348    #[serde(bound = "
349        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
350        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
351        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
352        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
353        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
354    ")]
355    pub struct OrdValBatch<L: Layout> {
356        /// The updates themselves.
357        pub storage: OrdValStorage<L>,
358        /// Description of the update times this layer represents.
359        pub description: Description<layout::Time<L>>,
360        /// The number of updates reflected in the batch.
361        ///
362        /// We track this separately from `storage` because due to the singleton optimization,
363        /// we may have many more updates than `storage.updates.len()`. It should equal that
364        /// length, plus the number of singleton optimizations employed.
365        pub updates: usize,
366    }
367
    // Associates `OrdValBatch<L>` with its layout parameter `L`.
    impl<L: Layout> WithLayout for OrdValBatch<L> {
        type Layout = L;
    }
371
372    impl<L: Layout> BatchReader for OrdValBatch<L> {
373        type Cursor = OrdValCursor<L>;
374        fn cursor(&self) -> Self::Cursor {
375            OrdValCursor {
376                key_cursor: 0,
377                val_cursor: 0,
378                phantom: PhantomData,
379            }
380        }
381        fn len(&self) -> usize {
382            // Normally this would be `self.updates.len()`, but we have a clever compact encoding.
383            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
384            self.updates
385        }
386        fn description(&self) -> &Description<layout::Time<L>> {
387            &self.description
388        }
389    }
390
391    impl<L: Layout> Batch for OrdValBatch<L> {
392        type Merger = OrdValMerger<L>;
393
394        fn begin_merge(
395            &self,
396            other: &Self,
397            compaction_frontier: AntichainRef<layout::Time<L>>,
398        ) -> Self::Merger {
399            OrdValMerger::new(self, other, compaction_frontier)
400        }
401
402        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
403            use timely::progress::Timestamp;
404            Self {
405                storage: OrdValStorage {
406                    keys: L::KeyContainer::with_capacity(0),
407                    vals: Default::default(),
408                    upds: Default::default(),
409                },
410                description: Description::new(
411                    lower,
412                    upper,
413                    Antichain::from_elem(Self::Time::minimum()),
414                ),
415                updates: 0,
416            }
417        }
418    }
419
    /// State for an in-progress merge.
    ///
    /// Holds a key cursor into each input, the output storage being assembled, and a staging
    /// area where each value's updates are consolidated before being sealed into the output.
    pub struct OrdValMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// Result that we are currently assembling.
        result: OrdValStorage<L>,
        /// Description (lower, upper, since) of the merged batch; its `since` drives compaction.
        description: Description<layout::Time<L>>,
        /// Staging area to consolidate owned times and diffs, before sealing.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
    }
433
434    impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
435    where
436        OrdValBatch<L>: Batch<Time = layout::Time<L>>,
437    {
438        fn new(
439            batch1: &OrdValBatch<L>,
440            batch2: &OrdValBatch<L>,
441            compaction_frontier: AntichainRef<layout::Time<L>>,
442        ) -> Self {
443            assert!(batch1.upper() == batch2.lower());
444            use crate::lattice::Lattice;
445            let mut since = batch1
446                .description()
447                .since()
448                .join(batch2.description().since());
449            since = since.join(&compaction_frontier.to_owned());
450
451            let description =
452                Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
453
454            let batch1 = &batch1.storage;
455            let batch2 = &batch2.storage;
456
457            OrdValMerger {
458                key_cursor1: 0,
459                key_cursor2: 0,
460                result: OrdValStorage {
461                    keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
462                    vals: Vals::merge_capacity(&batch1.vals, &batch2.vals),
463                    upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
464                },
465                description,
466                staging: UpdsBuilder::default(),
467            }
468        }
469        fn done(self) -> OrdValBatch<L> {
470            OrdValBatch {
471                updates: self.staging.total(),
472                storage: self.result,
473                description: self.description,
474            }
475        }
476        fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
477            // An (incomplete) indication of the amount of work we've done so far.
478            let starting_updates = self.staging.total();
479            let mut effort = 0isize;
480
481            // While both mergees are still active, perform single-key merges.
482            while self.key_cursor1 < source1.storage.keys.len()
483                && self.key_cursor2 < source2.storage.keys.len()
484                && effort < *fuel
485            {
486                self.merge_key(&source1.storage, &source2.storage);
487                // An (incomplete) accounting of the work we've done.
488                effort = (self.staging.total() - starting_updates) as isize;
489            }
490
491            // Merging is complete, and only copying remains.
492            // Key-by-key copying allows effort interruption, and compaction.
493            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
494                self.copy_key(&source1.storage, self.key_cursor1);
495                self.key_cursor1 += 1;
496                effort = (self.staging.total() - starting_updates) as isize;
497            }
498            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
499                self.copy_key(&source2.storage, self.key_cursor2);
500                self.key_cursor2 += 1;
501                effort = (self.staging.total() - starting_updates) as isize;
502            }
503
504            *fuel -= effort;
505        }
506    }
507
508    // Helper methods in support of merging batches.
509    impl<L: Layout> OrdValMerger<L> {
510        /// Copy the next key in `source`.
511        ///
512        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
513        /// If the result does not wholly cancel, they key will be present in `self` with the
514        /// compacted values and updates.
515        ///
516        /// The caller should be certain to update the cursor, as this method does not do this.
517        fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
518            // Capture the initial number of values to determine if the merge was ultimately non-empty.
519            let init_vals = self.result.vals.vals.len();
520            let (mut lower, upper) = source.vals.bounds(cursor);
521            while lower < upper {
522                self.stash_updates_for_val(source, lower);
523                if self.staging.seal(&mut self.result.upds) {
524                    self.result.vals.vals.push_ref(source.vals.get_abs(lower));
525                }
526                lower += 1;
527            }
528
529            // If we have pushed any values, copy the key as well.
530            if self.result.vals.vals.len() > init_vals {
531                self.result.keys.push_ref(source.keys.index(cursor));
532                self.result.vals.offs.push_ref(self.result.vals.vals.len());
533            }
534        }
535        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
536        ///
537        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
538        /// if the updates cancel either directly or after compaction.
539        fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
540            use ::std::cmp::Ordering;
541            match source1
542                .keys
543                .index(self.key_cursor1)
544                .cmp(&source2.keys.index(self.key_cursor2))
545            {
546                Ordering::Less => {
547                    self.copy_key(source1, self.key_cursor1);
548                    self.key_cursor1 += 1;
549                }
550                Ordering::Equal => {
551                    // Keys are equal; must merge all values from both sources for this one key.
552                    let (lower1, upper1) = source1.vals.bounds(self.key_cursor1);
553                    let (lower2, upper2) = source2.vals.bounds(self.key_cursor2);
554                    if let Some(off) =
555                        self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2))
556                    {
557                        self.result
558                            .keys
559                            .push_ref(source1.keys.index(self.key_cursor1));
560                        self.result.vals.offs.push_ref(off);
561                    }
562                    // Increment cursors in either case; the keys are merged.
563                    self.key_cursor1 += 1;
564                    self.key_cursor2 += 1;
565                }
566                Ordering::Greater => {
567                    self.copy_key(source2, self.key_cursor2);
568                    self.key_cursor2 += 1;
569                }
570            }
571        }
572        /// Merge two ranges of values into `self`.
573        ///
574        /// If the compacted result contains values with non-empty updates, the function returns
575        /// an offset that should be recorded to indicate the upper extent of the result values.
576        fn merge_vals(
577            &mut self,
578            (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
579            (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
580        ) -> Option<usize> {
581            // Capture the initial number of values to determine if the merge was ultimately non-empty.
582            let init_vals = self.result.vals.vals.len();
583            while lower1 < upper1 && lower2 < upper2 {
584                // We compare values, and fold in updates for the lowest values;
585                // if they are non-empty post-consolidation, we write the value.
586                // We could multi-way merge and it wouldn't be very complicated.
587                use ::std::cmp::Ordering;
588                match source1
589                    .vals
590                    .get_abs(lower1)
591                    .cmp(&source2.vals.get_abs(lower2))
592                {
593                    Ordering::Less => {
594                        // Extend stash by updates, with logical compaction applied.
595                        self.stash_updates_for_val(source1, lower1);
596                        if self.staging.seal(&mut self.result.upds) {
597                            self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
598                        }
599                        lower1 += 1;
600                    }
601                    Ordering::Equal => {
602                        self.stash_updates_for_val(source1, lower1);
603                        self.stash_updates_for_val(source2, lower2);
604                        if self.staging.seal(&mut self.result.upds) {
605                            self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
606                        }
607                        lower1 += 1;
608                        lower2 += 1;
609                    }
610                    Ordering::Greater => {
611                        // Extend stash by updates, with logical compaction applied.
612                        self.stash_updates_for_val(source2, lower2);
613                        if self.staging.seal(&mut self.result.upds) {
614                            self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
615                        }
616                        lower2 += 1;
617                    }
618                }
619            }
620            // Merging is complete, but we may have remaining elements to push.
621            while lower1 < upper1 {
622                self.stash_updates_for_val(source1, lower1);
623                if self.staging.seal(&mut self.result.upds) {
624                    self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
625                }
626                lower1 += 1;
627            }
628            while lower2 < upper2 {
629                self.stash_updates_for_val(source2, lower2);
630                if self.staging.seal(&mut self.result.upds) {
631                    self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
632                }
633                lower2 += 1;
634            }
635
636            // Values being pushed indicate non-emptiness.
637            if self.result.vals.vals.len() > init_vals {
638                Some(self.result.vals.vals.len())
639            } else {
640                None
641            }
642        }
643
644        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
645        fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
646            let (lower, upper) = source.upds.bounds(index);
647            for i in lower..upper {
648                // NB: Here is where we would need to look back if `lower == upper`.
649                let (time, diff) = source.upds.get_abs(i);
650                use crate::lattice::Lattice;
651                let mut new_time: layout::Time<L> = L::TimeContainer::into_owned(time);
652                new_time.advance_by(self.description.since().borrow());
653                self.staging
654                    .push(new_time, L::DiffContainer::into_owned(diff));
655            }
656        }
657    }
658
    /// A cursor for navigating the keys and values of an `OrdValBatch`.
    ///
    /// The cursor holds only positions; the batch itself is passed to each `Cursor` method.
    pub struct OrdValCursor<L: Layout> {
        /// Absolute position of the current key in `keys`.
        key_cursor: usize,
        /// Absolute position of the current value in `vals.vals` (not relative to the key).
        val_cursor: usize,
        /// Marker tying the cursor to layout `L` without storing any `L` data.
        phantom: PhantomData<L>,
    }
668
669    use crate::trace::implementations::WithLayout;
    // Associates `OrdValCursor<L>` with its layout parameter `L`.
    impl<L: Layout> WithLayout for OrdValCursor<L> {
        type Layout = L;
    }
673
674    impl<L: Layout> Cursor for OrdValCursor<L> {
675        type Storage = OrdValBatch<L>;
676
677        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
678            storage.storage.keys.get(self.key_cursor)
679        }
680        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
681            if self.val_valid(storage) {
682                Some(self.val(storage))
683            } else {
684                None
685            }
686        }
687
688        fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> {
689            storage.storage.keys.index(self.key_cursor)
690        }
691        fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> {
692            storage.storage.vals.get_abs(self.val_cursor)
693        }
694        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(
695            &mut self,
696            storage: &OrdValBatch<L>,
697            mut logic: L2,
698        ) {
699            let (lower, upper) = storage.storage.upds.bounds(self.val_cursor);
700            for index in lower..upper {
701                let (time, diff) = storage.storage.upds.get_abs(index);
702                logic(time, diff);
703            }
704        }
705        fn key_valid(&self, storage: &OrdValBatch<L>) -> bool {
706            self.key_cursor < storage.storage.keys.len()
707        }
708        fn val_valid(&self, storage: &OrdValBatch<L>) -> bool {
709            self.val_cursor < storage.storage.vals.bounds(self.key_cursor).1
710        }
711        fn step_key(&mut self, storage: &OrdValBatch<L>) {
712            self.key_cursor += 1;
713            if self.key_valid(storage) {
714                self.rewind_vals(storage);
715            } else {
716                self.key_cursor = storage.storage.keys.len();
717            }
718        }
719        fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
720            self.key_cursor +=
721                storage
722                    .storage
723                    .keys
724                    .advance(self.key_cursor, storage.storage.keys.len(), |x| {
725                        <L::KeyContainer as BatchContainer>::reborrow(x)
726                            .lt(&<L::KeyContainer as BatchContainer>::reborrow(key))
727                    });
728            if self.key_valid(storage) {
729                self.rewind_vals(storage);
730            }
731        }
732        fn step_val(&mut self, storage: &OrdValBatch<L>) {
733            self.val_cursor += 1;
734            if !self.val_valid(storage) {
735                self.val_cursor = storage.storage.vals.bounds(self.key_cursor).1;
736            }
737        }
738        fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
739            self.val_cursor += storage.storage.vals.vals.advance(
740                self.val_cursor,
741                storage.storage.vals.bounds(self.key_cursor).1,
742                |x| {
743                    <L::ValContainer as BatchContainer>::reborrow(x)
744                        .lt(&<L::ValContainer as BatchContainer>::reborrow(val))
745                },
746            );
747        }
748        fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
749            self.key_cursor = 0;
750            if self.key_valid(storage) {
751                self.rewind_vals(storage)
752            }
753        }
754        fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
755            self.val_cursor = storage.storage.vals.bounds(self.key_cursor).0;
756        }
757    }
758
759    /// A builder for creating layers from unsorted update tuples.
760    pub struct OrdValBuilder<L: Layout, CI> {
761        /// The in-progress result.
762        ///
763        /// This is public to allow container implementors to set and inspect their container.
764        pub result: OrdValStorage<L>,
765        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
766        _marker: PhantomData<CI>,
767    }
768
769    impl<L, CI> Builder for OrdValBuilder<L, CI>
770    where
771        L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>, ValContainer: PushInto<CI::Val<'a>>>,
772        CI: for<'a> BuilderInput<
773            L::KeyContainer,
774            L::ValContainer,
775            Time = layout::Time<L>,
776            Diff = layout::Diff<L>,
777        >,
778    {
779        type Input = CI;
780        type Time = layout::Time<L>;
781        type Output = OrdValBatch<L>;
782
783        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
784            Self {
785                result: OrdValStorage {
786                    keys: L::KeyContainer::with_capacity(keys),
787                    vals: Vals::with_capacity(keys + 1, vals),
788                    upds: Upds::with_capacity(vals + 1, upds),
789                },
790                staging: UpdsBuilder::default(),
791                _marker: PhantomData,
792            }
793        }
794
795        #[inline]
796        fn push(&mut self, chunk: &mut Self::Input) {
797            for item in chunk.drain() {
798                let (key, val, time, diff) = CI::into_parts(item);
799
800                // Pre-load the first update.
801                if self.result.keys.is_empty() {
802                    self.result.vals.vals.push_into(val);
803                    self.result.keys.push_into(key);
804                    self.staging.push(time, diff);
805                }
806                // Perhaps this is a continuation of an already received key.
807                else if self
808                    .result
809                    .keys
810                    .last()
811                    .map(|k| CI::key_eq(&key, k))
812                    .unwrap_or(false)
813                {
814                    // Perhaps this is a continuation of an already received value.
815                    if self
816                        .result
817                        .vals
818                        .vals
819                        .last()
820                        .map(|v| CI::val_eq(&val, v))
821                        .unwrap_or(false)
822                    {
823                        self.staging.push(time, diff);
824                    } else {
825                        // New value; complete representation of prior value.
826                        self.staging.seal(&mut self.result.upds);
827                        self.staging.push(time, diff);
828                        self.result.vals.vals.push_into(val);
829                    }
830                } else {
831                    // New key; complete representation of prior key.
832                    self.staging.seal(&mut self.result.upds);
833                    self.staging.push(time, diff);
834                    self.result.vals.offs.push_ref(self.result.vals.vals.len());
835                    self.result.vals.vals.push_into(val);
836                    self.result.keys.push_into(key);
837                }
838            }
839        }
840
841        #[inline(never)]
842        fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
843            self.staging.seal(&mut self.result.upds);
844            self.result.vals.offs.push_ref(self.result.vals.vals.len());
845            OrdValBatch {
846                updates: self.staging.total(),
847                storage: self.result,
848                description,
849            }
850        }
851
852        fn seal(
853            chain: &mut Vec<Self::Input>,
854            description: Description<Self::Time>,
855        ) -> Self::Output {
856            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
857            let mut builder = Self::with_capacity(keys, vals, upds);
858            for mut chunk in chain.drain(..) {
859                builder.push(&mut chunk);
860            }
861
862            builder.done(description)
863        }
864    }
865}
866
867/// Types related to forming batches of keys.
868pub mod key_batch {
869
870    use serde::{Deserialize, Serialize};
871    use std::marker::PhantomData;
872    use timely::container::PushInto;
873    use timely::progress::{frontier::AntichainRef, Antichain};
874
875    use crate::trace::implementations::layout;
876    use crate::trace::implementations::{BatchContainer, BuilderInput};
877    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
878
879    use super::{layers::UpdsBuilder, Layout, Upds};
880
881    /// An immutable collection of update tuples, from a contiguous interval of logical times.
882    #[derive(Debug, Serialize, Deserialize)]
883    #[serde(bound = "
884        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
885        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
886        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
887        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
888    ")]
889    pub struct OrdKeyStorage<L: Layout> {
890        /// An ordered list of keys, corresponding to entries in `keys_offs`.
891        pub keys: L::KeyContainer,
892        /// For each key in `keys`, a list of (time, diff) updates.
893        pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
894    }
895
896    /// An immutable collection of update tuples, from a contiguous interval of logical times.
897    ///
898    /// The `L` parameter captures how the updates should be laid out, and `C` determines which
899    /// merge batcher to select.
900    #[derive(Serialize, Deserialize)]
901    #[serde(bound = "
902        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
903        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
904        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
905        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
906        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
907    ")]
908    pub struct OrdKeyBatch<L: Layout> {
909        /// The updates themselves.
910        pub storage: OrdKeyStorage<L>,
911        /// Description of the update times this layer represents.
912        pub description: Description<layout::Time<L>>,
913        /// The number of updates reflected in the batch.
914        ///
915        /// We track this separately from `storage` because due to the singleton optimization,
916        /// we may have many more updates than `storage.updates.len()`. It should equal that
917        /// length, plus the number of singleton optimizations employed.
918        pub updates: usize,
919
920        /// Single value to return if asked.
921        pub value: L::ValContainer,
922    }
923
924    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> OrdKeyBatch<L> {
925        /// Creates a container with one value, to slot in to `self.value`.
926        pub fn create_value() -> L::ValContainer {
927            let mut value = L::ValContainer::with_capacity(1);
928            value.push_own(&Default::default());
929            value
930        }
931    }
932
933    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> WithLayout for OrdKeyBatch<L> {
934        type Layout = L;
935    }
936
937    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> BatchReader for OrdKeyBatch<L> {
938        type Cursor = OrdKeyCursor<L>;
939        fn cursor(&self) -> Self::Cursor {
940            OrdKeyCursor {
941                key_cursor: 0,
942                val_stepped: false,
943                phantom: std::marker::PhantomData,
944            }
945        }
946        fn len(&self) -> usize {
947            // Normally this would be `self.updates.len()`, but we have a clever compact encoding.
948            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
949            self.updates
950        }
951        fn description(&self) -> &Description<layout::Time<L>> {
952            &self.description
953        }
954    }
955
956    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Batch for OrdKeyBatch<L> {
957        type Merger = OrdKeyMerger<L>;
958
959        fn begin_merge(
960            &self,
961            other: &Self,
962            compaction_frontier: AntichainRef<layout::Time<L>>,
963        ) -> Self::Merger {
964            OrdKeyMerger::new(self, other, compaction_frontier)
965        }
966
967        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
968            use timely::progress::Timestamp;
969            Self {
970                storage: OrdKeyStorage {
971                    keys: L::KeyContainer::with_capacity(0),
972                    upds: Upds::default(),
973                },
974                description: Description::new(
975                    lower,
976                    upper,
977                    Antichain::from_elem(Self::Time::minimum()),
978                ),
979                updates: 0,
980                value: Self::create_value(),
981            }
982        }
983    }
984
985    /// State for an in-progress merge.
986    pub struct OrdKeyMerger<L: Layout> {
987        /// Key position to merge next in the first batch.
988        key_cursor1: usize,
989        /// Key position to merge next in the second batch.
990        key_cursor2: usize,
991        /// result that we are currently assembling.
992        result: OrdKeyStorage<L>,
993        /// description
994        description: Description<layout::Time<L>>,
995
996        /// Local stash of updates, to use for consolidation.
997        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
998    }
999
1000    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Merger<OrdKeyBatch<L>>
1001        for OrdKeyMerger<L>
1002    where
1003        OrdKeyBatch<L>: Batch<Time = layout::Time<L>>,
1004    {
1005        fn new(
1006            batch1: &OrdKeyBatch<L>,
1007            batch2: &OrdKeyBatch<L>,
1008            compaction_frontier: AntichainRef<layout::Time<L>>,
1009        ) -> Self {
1010            assert!(batch1.upper() == batch2.lower());
1011            use crate::lattice::Lattice;
1012            let mut since = batch1
1013                .description()
1014                .since()
1015                .join(batch2.description().since());
1016            since = since.join(&compaction_frontier.to_owned());
1017
1018            let description =
1019                Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
1020
1021            let batch1 = &batch1.storage;
1022            let batch2 = &batch2.storage;
1023
1024            OrdKeyMerger {
1025                key_cursor1: 0,
1026                key_cursor2: 0,
1027                result: OrdKeyStorage {
1028                    keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
1029                    upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
1030                },
1031                description,
1032                staging: UpdsBuilder::default(),
1033            }
1034        }
1035        fn done(self) -> OrdKeyBatch<L> {
1036            OrdKeyBatch {
1037                updates: self.staging.total(),
1038                storage: self.result,
1039                description: self.description,
1040                value: OrdKeyBatch::<L>::create_value(),
1041            }
1042        }
1043        fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
1044            // An (incomplete) indication of the amount of work we've done so far.
1045            let starting_updates = self.staging.total();
1046            let mut effort = 0isize;
1047
1048            // While both mergees are still active, perform single-key merges.
1049            while self.key_cursor1 < source1.storage.keys.len()
1050                && self.key_cursor2 < source2.storage.keys.len()
1051                && effort < *fuel
1052            {
1053                self.merge_key(&source1.storage, &source2.storage);
1054                // An (incomplete) accounting of the work we've done.
1055                effort = (self.staging.total() - starting_updates) as isize;
1056            }
1057
1058            // Merging is complete, and only copying remains.
1059            // Key-by-key copying allows effort interruption, and compaction.
1060            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
1061                self.copy_key(&source1.storage, self.key_cursor1);
1062                self.key_cursor1 += 1;
1063                effort = (self.staging.total() - starting_updates) as isize;
1064            }
1065            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
1066                self.copy_key(&source2.storage, self.key_cursor2);
1067                self.key_cursor2 += 1;
1068                effort = (self.staging.total() - starting_updates) as isize;
1069            }
1070
1071            *fuel -= effort;
1072        }
1073    }
1074
1075    // Helper methods in support of merging batches.
1076    impl<L: Layout> OrdKeyMerger<L> {
1077        /// Copy the next key in `source`.
1078        ///
1079        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
1080        /// If the result does not wholly cancel, they key will be present in `self` with the
1081        /// compacted values and updates.
1082        ///
1083        /// The caller should be certain to update the cursor, as this method does not do this.
1084        fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
1085            self.stash_updates_for_key(source, cursor);
1086            if self.staging.seal(&mut self.result.upds) {
1087                self.result.keys.push_ref(source.keys.index(cursor));
1088            }
1089        }
1090        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
1091        ///
1092        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
1093        /// if the updates cancel either directly or after compaction.
1094        fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
1095            use ::std::cmp::Ordering;
1096            match source1
1097                .keys
1098                .index(self.key_cursor1)
1099                .cmp(&source2.keys.index(self.key_cursor2))
1100            {
1101                Ordering::Less => {
1102                    self.copy_key(source1, self.key_cursor1);
1103                    self.key_cursor1 += 1;
1104                }
1105                Ordering::Equal => {
1106                    // Keys are equal; must merge all updates from both sources for this one key.
1107                    self.stash_updates_for_key(source1, self.key_cursor1);
1108                    self.stash_updates_for_key(source2, self.key_cursor2);
1109                    if self.staging.seal(&mut self.result.upds) {
1110                        self.result
1111                            .keys
1112                            .push_ref(source1.keys.index(self.key_cursor1));
1113                    }
1114                    // Increment cursors in either case; the keys are merged.
1115                    self.key_cursor1 += 1;
1116                    self.key_cursor2 += 1;
1117                }
1118                Ordering::Greater => {
1119                    self.copy_key(source2, self.key_cursor2);
1120                    self.key_cursor2 += 1;
1121                }
1122            }
1123        }
1124
1125        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
1126        fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
1127            let (lower, upper) = source.upds.bounds(index);
1128            for i in lower..upper {
1129                // NB: Here is where we would need to look back if `lower == upper`.
1130                let (time, diff) = source.upds.get_abs(i);
1131                use crate::lattice::Lattice;
1132                let mut new_time = L::TimeContainer::into_owned(time);
1133                new_time.advance_by(self.description.since().borrow());
1134                self.staging
1135                    .push(new_time, L::DiffContainer::into_owned(diff));
1136            }
1137        }
1138    }
1139
1140    /// A cursor for navigating a single layer.
1141    pub struct OrdKeyCursor<L: Layout> {
1142        /// Absolute position of the current key.
1143        key_cursor: usize,
1144        /// If the value has been stepped for the key, there are no more values.
1145        val_stepped: bool,
1146        /// Phantom marker for Rust happiness.
1147        phantom: PhantomData<L>,
1148    }
1149
1150    use crate::trace::implementations::WithLayout;
1151    impl<L: Layout<ValContainer: BatchContainer>> WithLayout for OrdKeyCursor<L> {
1152        type Layout = L;
1153    }
1154
1155    impl<L: for<'a> Layout<ValContainer: BatchContainer<Owned: Default>>> Cursor for OrdKeyCursor<L> {
1156        type Storage = OrdKeyBatch<L>;
1157
1158        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
1159            storage.storage.keys.get(self.key_cursor)
1160        }
1161        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
1162            if self.val_valid(storage) {
1163                Some(self.val(storage))
1164            } else {
1165                None
1166            }
1167        }
1168
1169        fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
1170            storage.storage.keys.index(self.key_cursor)
1171        }
1172        fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
1173            storage.value.index(0)
1174        }
1175        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(
1176            &mut self,
1177            storage: &Self::Storage,
1178            mut logic: L2,
1179        ) {
1180            let (lower, upper) = storage.storage.upds.bounds(self.key_cursor);
1181            for index in lower..upper {
1182                let (time, diff) = storage.storage.upds.get_abs(index);
1183                logic(time, diff);
1184            }
1185        }
1186        fn key_valid(&self, storage: &Self::Storage) -> bool {
1187            self.key_cursor < storage.storage.keys.len()
1188        }
1189        fn val_valid(&self, _storage: &Self::Storage) -> bool {
1190            !self.val_stepped
1191        }
1192        fn step_key(&mut self, storage: &Self::Storage) {
1193            self.key_cursor += 1;
1194            if self.key_valid(storage) {
1195                self.rewind_vals(storage);
1196            } else {
1197                self.key_cursor = storage.storage.keys.len();
1198            }
1199        }
1200        fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
1201            self.key_cursor +=
1202                storage
1203                    .storage
1204                    .keys
1205                    .advance(self.key_cursor, storage.storage.keys.len(), |x| {
1206                        <L::KeyContainer as BatchContainer>::reborrow(x)
1207                            .lt(&<L::KeyContainer as BatchContainer>::reborrow(key))
1208                    });
1209            if self.key_valid(storage) {
1210                self.rewind_vals(storage);
1211            }
1212        }
1213        fn step_val(&mut self, _storage: &Self::Storage) {
1214            self.val_stepped = true;
1215        }
1216        fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) {}
1217        fn rewind_keys(&mut self, storage: &Self::Storage) {
1218            self.key_cursor = 0;
1219            if self.key_valid(storage) {
1220                self.rewind_vals(storage)
1221            }
1222        }
1223        fn rewind_vals(&mut self, _storage: &Self::Storage) {
1224            self.val_stepped = false;
1225        }
1226    }
1227
1228    /// A builder for creating layers from unsorted update tuples.
1229    pub struct OrdKeyBuilder<L: Layout, CI> {
1230        /// The in-progress result.
1231        ///
1232        /// This is public to allow container implementors to set and inspect their container.
1233        pub result: OrdKeyStorage<L>,
1234        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
1235        _marker: PhantomData<CI>,
1236    }
1237
1238    impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
1239    where
1240        L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>>,
1241        L: Layout<ValContainer: BatchContainer<Owned: Default>>,
1242        CI: BuilderInput<
1243            L::KeyContainer,
1244            L::ValContainer,
1245            Time = layout::Time<L>,
1246            Diff = layout::Diff<L>,
1247        >,
1248    {
1249        type Input = CI;
1250        type Time = layout::Time<L>;
1251        type Output = OrdKeyBatch<L>;
1252
1253        fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
1254            Self {
1255                result: OrdKeyStorage {
1256                    keys: L::KeyContainer::with_capacity(keys),
1257                    upds: Upds::with_capacity(keys + 1, upds),
1258                },
1259                staging: UpdsBuilder::default(),
1260                _marker: PhantomData,
1261            }
1262        }
1263
1264        #[inline]
1265        fn push(&mut self, chunk: &mut Self::Input) {
1266            for item in chunk.drain() {
1267                let (key, _val, time, diff) = CI::into_parts(item);
1268                if self.result.keys.is_empty() {
1269                    self.result.keys.push_into(key);
1270                    self.staging.push(time, diff);
1271                }
1272                // Perhaps this is a continuation of an already received key.
1273                else if self
1274                    .result
1275                    .keys
1276                    .last()
1277                    .map(|k| CI::key_eq(&key, k))
1278                    .unwrap_or(false)
1279                {
1280                    self.staging.push(time, diff);
1281                } else {
1282                    self.staging.seal(&mut self.result.upds);
1283                    self.staging.push(time, diff);
1284                    self.result.keys.push_into(key);
1285                }
1286            }
1287        }
1288
1289        #[inline(never)]
1290        fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
1291            self.staging.seal(&mut self.result.upds);
1292            OrdKeyBatch {
1293                updates: self.staging.total(),
1294                storage: self.result,
1295                description,
1296                value: OrdKeyBatch::<L>::create_value(),
1297            }
1298        }
1299
1300        fn seal(
1301            chain: &mut Vec<Self::Input>,
1302            description: Description<Self::Time>,
1303        ) -> Self::Output {
1304            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
1305            let mut builder = Self::with_capacity(keys, vals, upds);
1306            for mut chunk in chain.drain(..) {
1307                builder.push(&mut chunk);
1308            }
1309
1310            builder.done(description)
1311        }
1312    }
1313}