differential_dataflow/trace/implementations/ord_neu.rs

//! Trace and batch implementations based on sorted ranges.
//!
//! The types and type aliases in this module start with either
//!
//! * `OrdVal`: Collections whose data have the form `(key, val)` where `key` is ordered.
//! * `OrdKey`: Collections whose data have the form `key` where `key` is ordered.
//!
//! Although `OrdVal` is more general than `OrdKey`, the latter has a simpler representation
//! and should consume fewer resources (computation and memory) when it applies.
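//!
//! As a hedged illustration (the concrete type parameters below are assumptions chosen only to
//! show the shape of the aliases, not code from elsewhere in this crate):
//!
//! ```ignore
//! // Keys with values, e.g. node identifiers mapped to labels.
//! type Labels = OrdValSpine<u32, String, usize, isize>;
//! // Keys only, e.g. a set of node identifiers.
//! type Nodes = OrdKeySpine<u32, usize, isize>;
//! ```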

use std::rc::Rc;

use crate::containers::TimelyStack;
use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
use crate::trace::rc_blanket_impls::RcBuilder;

use super::{Update, Layout, Vector, TStack, Preferred};

pub use self::val_batch::{OrdValBatch, OrdValBuilder};
pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};

/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher using ordered lists.
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// A builder using ordered lists.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

// /// A trace implementation for empty values using a spine of ordered lists.
// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;

/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// A builder for columnar storage.
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

/// A trace implementation using a spine of ordered lists.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A batcher for ordered lists.
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>;
/// A builder for ordered lists.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
/// A trace implementation backed by columnar storage.
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
/// A builder for columnar storage.
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;

/// A trace implementation backed by columnar storage.
pub type PreferredSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Preferred<K,V,T,R>>>>;
/// A batcher for columnar storage.
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColMerger<(<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R>>;
/// A builder for columnar storage.
pub type PreferredBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Preferred<K,V,T,R>, TimelyStack<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>>;
mod val_batch {

    use std::marker::PhantomData;
    use serde::{Deserialize, Serialize};
    use timely::container::PushInto;
    use timely::progress::{Antichain, frontier::AntichainRef};

    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
    use crate::trace::implementations::{BatchContainer, BuilderInput};
    use crate::IntoOwned;

    use super::{Layout, Update};

    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct OrdValStorage<L: Layout> {
        /// An ordered list of keys, corresponding to entries in `keys_offs`.
        pub keys: L::KeyContainer,
        /// Offsets used to provide indexes from keys to values.
        ///
        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
        pub keys_offs: L::OffsetContainer,
        /// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`.
        pub vals: L::ValContainer,
        /// Offsets used to provide indexes from values to updates.
        ///
        /// This list has a special representation in which any empty range indicates the singleton
        /// element just before the range, as if the start were decremented by one. The empty
        /// range is otherwise an invalid representation, and we borrow it to compactly encode
        /// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
        ///
        /// The length of this list is one longer than `vals`, so that we can avoid bounds logic.
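        ///
        /// As a hedged, illustrative example (the numbers are assumptions, not data from any real
        /// batch): with `vals_offs = [0, 2, 2, 5]`, value 0 owns updates `0 .. 2`, value 1 is a
        /// singleton that re-uses update `1` (the empty range `2 .. 2` means "look back one"),
        /// and value 2 owns updates `2 .. 5`. See `updates_for_value` below for the decoding.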
        pub vals_offs: L::OffsetContainer,
        /// Concatenated ordered lists of update times, bracketed by offsets in `vals_offs`.
        pub times: L::TimeContainer,
        /// Concatenated ordered lists of update diffs, bracketed by offsets in `vals_offs`.
        pub diffs: L::DiffContainer,
    }

    impl<L: Layout> OrdValStorage<L> {
        /// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
        fn values_for_key(&self, index: usize) -> (usize, usize) {
            (self.keys_offs.index(index), self.keys_offs.index(index+1))
        }
        /// Lower and upper bounds in the updates (`self.times` and `self.diffs`) corresponding to the value at `index`.
        fn updates_for_value(&self, index: usize) -> (usize, usize) {
            let mut lower = self.vals_offs.index(index);
            let upper = self.vals_offs.index(index+1);
            // We use equal lower and upper to encode "singleton update; just before here".
            // It should only apply when there is a prior element, so `lower` should be greater than zero.
            if lower == upper {
                assert!(lower > 0);
                lower -= 1;
            }
            (lower, upper)
        }
    }

    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValBatch<L: Layout> {
        /// The updates themselves.
        pub storage: OrdValStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<<L::Target as Update>::Time>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because, due to the singleton optimization,
        /// we may have many more updates than `storage.times.len()`. It should equal that
        /// length, plus the number of singleton optimizations employed.
        pub updates: usize,
    }

    impl<L: Layout> BatchReader for OrdValBatch<L> {
        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
        type Time = <L::Target as Update>::Time;
        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
        type Diff = <L::Target as Update>::Diff;
        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

        type Cursor = OrdValCursor<L>;
        fn cursor(&self) -> Self::Cursor {
            OrdValCursor {
                key_cursor: 0,
                val_cursor: 0,
                phantom: PhantomData,
            }
        }
        fn len(&self) -> usize {
            // Normally this would be the length of a stored update list, but the compact singleton
            // encoding means `storage.times.len()` undercounts; `self.updates` tracks the correct total.
            self.updates
        }
        fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
    }

    impl<L: Layout> Batch for OrdValBatch<L> {
        type Merger = OrdValMerger<L>;

        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
            OrdValMerger::new(self, other, compaction_frontier)
        }

        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
            use timely::progress::Timestamp;
            Self {
                storage: OrdValStorage {
                    keys: L::KeyContainer::with_capacity(0),
                    keys_offs: L::OffsetContainer::with_capacity(0),
                    vals: L::ValContainer::with_capacity(0),
                    vals_offs: L::OffsetContainer::with_capacity(0),
                    times: L::TimeContainer::with_capacity(0),
                    diffs: L::DiffContainer::with_capacity(0),
                },
                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
                updates: 0,
            }
        }
    }

    /// State for an in-progress merge.
    pub struct OrdValMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// Result that we are currently assembling.
        result: OrdValStorage<L>,
        /// Description of the result batch.
        description: Description<<L::Target as Update>::Time>,

        /// Local stash of updates, to use for consolidation.
        ///
        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
        /// A `ChangeBatch` itself needs an `i64` diff type, which we do not have.
        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Counts the number of singleton-optimized entries, so that we can correctly count the updates.
        singletons: usize,
    }

    impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
    where
        OrdValBatch<L>: Batch<Time=<L::Target as Update>::Time>,
        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
    {
        fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {

            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            let mut storage = OrdValStorage {
                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
                vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
                vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
                times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
                diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
            };

            // Mark explicit types because type inference fails to resolve it.
            let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
            keys_offs.push(0);
            let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
            vals_offs.push(0);

            OrdValMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: storage,
                description,
                update_stash: Vec::new(),
                singletons: 0,
            }
        }
        fn done(self) -> OrdValBatch<L> {
            OrdValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description: self.description,
            }
        }
        fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {

            // An (incomplete) indication of the amount of work we've done so far.
            let starting_updates = self.result.times.len();
            let mut effort = 0isize;

            // While both mergees are still active, perform single-key merges.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                // An (incomplete) accounting of the work we've done.
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            // Merging is complete, and only copying remains.
            // Key-by-key copying allows effort interruption, and compaction.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                effort = (self.result.times.len() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
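
    // A hedged usage sketch, not code from this crate: a caller (such as the spine) would drive
    // a merge roughly as follows, with the fuel budget below being an assumption for illustration.
    //
    //     let mut merger = OrdValMerger::new(&batch1, &batch2, compaction_frontier);
    //     let mut fuel = 1_000_000isize;
    //     merger.work(&batch1, &batch2, &mut fuel); // repeat until the merge completes
    //     let merged = merger.done();
    //
    // Each `work` call deducts from `fuel` roughly the number of updates it produced, so a large
    // merge can be interrupted and resumed across calls.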

    // Helper methods in support of merging batches.
    impl<L: Layout> OrdValMerger<L> {
        /// Copy the next key in `source`.
        ///
        /// The method extracts the key in `source` at `cursor`, and merges it into `self`.
        /// If the result does not wholly cancel, the key will be present in `self` with the
        /// compacted values and updates.
        ///
        /// The caller should be certain to update the cursor, as this method does not do this.
        fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.len();
            let (mut lower, upper) = source.values_for_key(cursor);
            while lower < upper {
                self.stash_updates_for_val(source, lower);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source.vals.index(lower));
                }
                lower += 1;
            }

            // If we have pushed any values, copy the key as well.
            if self.result.vals.len() > init_vals {
                self.result.keys.push(source.keys.index(cursor));
                self.result.keys_offs.push(self.result.vals.len());
            }
        }
        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
        ///
        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
        /// if the updates cancel either directly or after compaction.
        fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    // Keys are equal; must merge all values from both sources for this one key.
                    let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
                    let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
                        self.result.keys.push(source1.keys.index(self.key_cursor1));
                        self.result.keys_offs.push(off);
                    }
                    // Increment cursors in either case; the keys are merged.
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }
        /// Merge two ranges of values into `self`.
        ///
        /// If the compacted result contains values with non-empty updates, the function returns
        /// an offset that should be recorded to indicate the upper extent of the result values.
        fn merge_vals(
            &mut self,
            (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
            (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
        ) -> Option<usize> {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.len();
            while lower1 < upper1 && lower2 < upper2 {
                // We compare values, and fold in updates for the lowest values;
                // if they are non-empty post-consolidation, we write the value.
                // We could multi-way merge and it wouldn't be very complicated.
                use ::std::cmp::Ordering;
                match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
                    Ordering::Less => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source1, lower1);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push(off);
                            self.result.vals.push(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                    },
                    Ordering::Equal => {
                        self.stash_updates_for_val(source1, lower1);
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push(off);
                            self.result.vals.push(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                        lower2 += 1;
                    },
                    Ordering::Greater => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push(off);
                            self.result.vals.push(source2.vals.index(lower2));
                        }
                        lower2 += 1;
                    },
                }
            }
            // Merging is complete, but we may have remaining elements to push.
            while lower1 < upper1 {
                self.stash_updates_for_val(source1, lower1);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source1.vals.index(lower1));
                }
                lower1 += 1;
            }
            while lower2 < upper2 {
                self.stash_updates_for_val(source2, lower2);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source2.vals.index(lower2));
                }
                lower2 += 1;
            }

            // Values being pushed indicate non-emptiness.
            if self.result.vals.len() > init_vals {
                Some(self.result.vals.len())
            } else {
                None
            }
        }

        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
        fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
            let (lower, upper) = source.updates_for_value(index);
            for i in lower .. upper {
                // NB: Here is where we would need to look back if `lower == upper`.
                let time = source.times.index(i);
                let diff = source.diffs.index(i);
                use crate::lattice::Lattice;
                let mut new_time: <L::Target as Update>::Time = time.into_owned();
                new_time.advance_by(self.description.since().borrow());
                self.update_stash.push((new_time, diff.into_owned()));
            }
        }

        /// Consolidates `self.update_stash` and produces the offset to record, if any.
        fn consolidate_updates(&mut self) -> Option<usize> {
            use crate::consolidation;
            consolidation::consolidate(&mut self.update_stash);
            if !self.update_stash.is_empty() {
                // If there is a single element, equal to a just-prior recorded update,
                // we push nothing and report an unincremented offset to encode this case.
                let time_diff = self.result.times.last().zip(self.result.diffs.last());
                let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
                    let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
                    let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
                    t1.eq(&t2) && d1.eq(&d2)
                });
                if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
                    // Just clear out update_stash, as we won't drain it here.
                    self.update_stash.clear();
                    self.singletons += 1;
                }
                else {
                    // Conventional; move `update_stash` into the stored times and diffs.
                    for (time, diff) in self.update_stash.drain(..) {
                        self.result.times.push(time);
                        self.result.diffs.push(diff);
                    }
                }
                Some(self.result.times.len())
            } else {
                None
            }
        }
    }

    /// A cursor for navigating a single layer.
    pub struct OrdValCursor<L: Layout> {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// Absolute position of the current value.
        val_cursor: usize,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }
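
    // A hedged navigation sketch, not code from this crate: the loop below is an assumed caller,
    // shown only to illustrate how the `Cursor` methods implemented next fit together.
    //
    //     let mut cursor = batch.cursor();
    //     while cursor.key_valid(&batch) {
    //         while cursor.val_valid(&batch) {
    //             cursor.map_times(&batch, |time, diff| { /* inspect each update */ });
    //             cursor.step_val(&batch);
    //         }
    //         cursor.step_key(&batch);
    //     }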

    impl<L: Layout> Cursor for OrdValCursor<L> {

        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
        type Time = <L::Target as Update>::Time;
        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
        type Diff = <L::Target as Update>::Diff;
        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

        type Storage = OrdValBatch<L>;

        fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
        fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
            let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
            for index in lower .. upper {
                let time = storage.storage.times.index(index);
                let diff = storage.storage.diffs.index(index);
                logic(time, diff);
            }
        }
        fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
        fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
        fn step_key(&mut self, storage: &OrdValBatch<L>) {
            self.key_cursor += 1;
            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
            else {
                self.key_cursor = storage.storage.keys.len();
            }
        }
        fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
        }
        fn step_val(&mut self, storage: &OrdValBatch<L>) {
            self.val_cursor += 1;
            if !self.val_valid(storage) {
                self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
            }
        }
        fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
            self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
        }
        fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
            self.key_cursor = 0;
            if self.key_valid(storage) {
                self.rewind_vals(storage)
            }
        }
        fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
            self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
        }
    }

    /// A builder for creating layers from unsorted update tuples.
    pub struct OrdValBuilder<L: Layout, CI> {
        result: OrdValStorage<L>,
        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Counts the number of singleton optimizations we performed.
        ///
        /// This number allows us to correctly gauge the total number of updates reflected in a batch,
        /// even though the number of stored updates may be much smaller than that total.
        singletons: usize,
        _marker: PhantomData<CI>,
    }

    impl<L: Layout, CI> OrdValBuilder<L, CI> {
        /// Pushes a single update, which may set `self.singleton` rather than push.
        ///
        /// This operation is meant to be equivalent to pushing `(time, diff)` onto the stored
        /// times and diffs. However, for "clever" reasons it does not always do this. Instead,
        /// it looks for opportunities to encode a singleton update with an "absent" update:
        /// repeating the most recent offset. This otherwise invalid state encodes "look back
        /// one element".
        ///
        /// When `self.singleton` is `Some`, it means that we have seen one update and it matched
        /// the previously pushed update exactly. In that case, we do not push the update into the
        /// stored times and diffs. The update tuple is retained in `self.singleton` in case we see
        /// another update and need to recover the singleton and push it ahead of the second update.
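        ///
        /// As a hedged illustration (the concrete numbers are assumptions): if the most recently
        /// recorded update is `(4, 1)`, pushing `(4, 1)` again stashes the pair in `self.singleton`
        /// rather than recording it; a subsequent `(5, 1)` first flushes the stashed `(4, 1)` and
        /// then records `(5, 1)` as usual.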
        fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
            // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
            if self.result.times.last().map(|t| t == <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time)) == Some(true) &&
               self.result.diffs.last().map(|d| d == <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff)) == Some(true)
            {
                assert!(self.singleton.is_none());
                self.singleton = Some((time, diff));
            }
            else {
                // If we have pushed a single element, we need to copy it out to meet this one.
                if let Some((time, diff)) = self.singleton.take() {
                    self.result.times.push(time);
                    self.result.diffs.push(diff);
                }
                self.result.times.push(time);
                self.result.diffs.push(diff);
            }
        }
    }

    impl<L, CI> Builder for OrdValBuilder<L, CI>
    where
        L: Layout,
        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
        for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
        for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
    {

        type Input = CI;
        type Time = <L::Target as Update>::Time;
        type Output = OrdValBatch<L>;

        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
            // We don't introduce zero offsets as they will be introduced by the first `push` call.
            Self {
                result: OrdValStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
                    vals: L::ValContainer::with_capacity(vals),
                    vals_offs: L::OffsetContainer::with_capacity(vals + 1),
                    times: L::TimeContainer::with_capacity(upds),
                    diffs: L::DiffContainer::with_capacity(upds),
                },
                singleton: None,
                singletons: 0,
                _marker: PhantomData,
            }
        }

        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, val, time, diff) = CI::into_parts(item);
                // Perhaps this is a continuation of an already received key.
                if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    // Perhaps this is a continuation of an already received value.
                    if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
                        self.push_update(time, diff);
                    } else {
                        // New value; complete representation of prior value.
                        self.result.vals_offs.push(self.result.times.len());
                        if self.singleton.take().is_some() { self.singletons += 1; }
                        self.push_update(time, diff);
                        self.result.vals.push(val);
                    }
                } else {
                    // New key; complete representation of prior key.
                    self.result.vals_offs.push(self.result.times.len());
                    if self.singleton.take().is_some() { self.singletons += 1; }
                    self.result.keys_offs.push(self.result.vals.len());
                    self.push_update(time, diff);
                    self.result.vals.push(val);
                    self.result.keys.push(key);
                }
            }
        }

        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
            // Record the final offsets.
            self.result.vals_offs.push(self.result.times.len());
            // Remove any pending singleton, and if it was set increment our count.
            if self.singleton.take().is_some() { self.singletons += 1; }
            self.result.keys_offs.push(self.result.vals.len());
            OrdValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description,
            }
        }

        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
}

mod key_batch {

    use std::marker::PhantomData;
    use serde::{Deserialize, Serialize};
    use timely::container::PushInto;
    use timely::progress::{Antichain, frontier::AntichainRef};

    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
    use crate::trace::implementations::{BatchContainer, BuilderInput};
    use crate::IntoOwned;

    use super::{Layout, Update};

    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct OrdKeyStorage<L: Layout> {
        /// An ordered list of keys, corresponding to entries in `keys_offs`.
        pub keys: L::KeyContainer,
        /// Offsets used to provide indexes from keys to updates.
        ///
        /// This list has a special representation in which any empty range indicates the singleton
        /// element just before the range, as if the start were decremented by one. The empty
        /// range is otherwise an invalid representation, and we borrow it to compactly encode
        /// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
        ///
        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
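        ///
        /// As a hedged, illustrative example (the numbers are assumptions): with
        /// `keys_offs = [0, 3, 3]`, key 0 owns updates `0 .. 3`, and key 1 is a singleton that
        /// re-uses update `2` (the empty range `3 .. 3` means "look back one"). See
        /// `updates_for_key` below for the decoding.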
        pub keys_offs: L::OffsetContainer,
        /// Concatenated ordered lists of update times, bracketed by offsets in `keys_offs`.
        pub times: L::TimeContainer,
        /// Concatenated ordered lists of update diffs, bracketed by offsets in `keys_offs`.
        pub diffs: L::DiffContainer,
    }

    impl<L: Layout> OrdKeyStorage<L> {
        /// Lower and upper bounds in the updates (`self.times` and `self.diffs`) corresponding to the key at `index`.
        fn updates_for_key(&self, index: usize) -> (usize, usize) {
            let mut lower = self.keys_offs.index(index);
            let upper = self.keys_offs.index(index+1);
            // We use equal lower and upper to encode "singleton update; just before here".
            // It should only apply when there is a prior element, so `lower` should be greater than zero.
            if lower == upper {
                assert!(lower > 0);
                lower -= 1;
            }
            (lower, upper)
        }
    }

    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdKeyBatch<L: Layout> {
        /// The updates themselves.
        pub storage: OrdKeyStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<<L::Target as Update>::Time>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because, due to the singleton optimization,
        /// we may have many more updates than `storage.times.len()`. It should equal that
        /// length, plus the number of singleton optimizations employed.
        pub updates: usize,
    }

    impl<L: Layout> BatchReader for OrdKeyBatch<L> {

        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
        type Val<'a> = &'a ();
        type Time = <L::Target as Update>::Time;
        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
        type Diff = <L::Target as Update>::Diff;
        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

        type Cursor = OrdKeyCursor<L>;
        fn cursor(&self) -> Self::Cursor {
            OrdKeyCursor {
                key_cursor: 0,
                val_stepped: false,
                phantom: std::marker::PhantomData,
            }
        }
        fn len(&self) -> usize {
            // Normally this would be the length of a stored update list, but the compact singleton
            // encoding means `storage.times.len()` undercounts; `self.updates` tracks the correct total.
            self.updates
        }
        fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
    }

    impl<L: Layout> Batch for OrdKeyBatch<L> {
        type Merger = OrdKeyMerger<L>;

        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
            OrdKeyMerger::new(self, other, compaction_frontier)
        }

        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
            use timely::progress::Timestamp;
            Self {
                storage: OrdKeyStorage {
                    keys: L::KeyContainer::with_capacity(0),
                    keys_offs: L::OffsetContainer::with_capacity(0),
                    times: L::TimeContainer::with_capacity(0),
                    diffs: L::DiffContainer::with_capacity(0),
                },
                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
                updates: 0,
            }
        }
    }

    /// State for an in-progress merge.
    pub struct OrdKeyMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// Result that we are currently assembling.
        result: OrdKeyStorage<L>,
        /// Description of the result batch.
        description: Description<<L::Target as Update>::Time>,

        /// Local stash of updates, to use for consolidation.
        ///
        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
        /// A `ChangeBatch` itself needs an `i64` diff type, which we do not have.
        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Counts the number of singleton-optimized entries, so that we can correctly count the updates.
        singletons: usize,
    }

    impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
    where
        OrdKeyBatch<L>: Batch<Time=<L::Target as Update>::Time>,
        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
    {
        fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {

            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            let mut storage = OrdKeyStorage {
                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
                times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
                diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
            };

            let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
            keys_offs.push(0);

            OrdKeyMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: storage,
                description,
                update_stash: Vec::new(),
                singletons: 0,
            }
        }
        fn done(self) -> OrdKeyBatch<L> {
            OrdKeyBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description: self.description,
            }
        }
        fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {

            // An (incomplete) indication of the amount of work we've done so far.
            let starting_updates = self.result.times.len();
            let mut effort = 0isize;

            // While both mergees are still active, perform single-key merges.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                // An (incomplete) accounting of the work we've done.
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            // Merging is complete, and only copying remains.
            // Key-by-key copying allows effort interruption, and compaction.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                effort = (self.result.times.len() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }

    // Helper methods in support of merging batches.
    impl<L: Layout> OrdKeyMerger<L> {
        /// Copy the next key in `source`.
        ///
        /// The method extracts the key in `source` at `cursor`, and merges it into `self`.
        /// If the result does not wholly cancel, the key will be present in `self` with the
        /// compacted values and updates.
        ///
        /// The caller should be certain to update the cursor, as this method does not do this.
        fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
            self.stash_updates_for_key(source, cursor);
            if let Some(off) = self.consolidate_updates() {
                self.result.keys_offs.push(off);
                self.result.keys.push(source.keys.index(cursor));
            }
        }
        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
        ///
        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
        /// if the updates cancel either directly or after compaction.
        fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    // Keys are equal; must merge all updates from both sources for this one key.
                    self.stash_updates_for_key(source1, self.key_cursor1);
                    self.stash_updates_for_key(source2, self.key_cursor2);
                    if let Some(off) = self.consolidate_updates() {
                        self.result.keys_offs.push(off);
                        self.result.keys.push(source1.keys.index(self.key_cursor1));
                    }
                    // Increment cursors in either case; the keys are merged.
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }

        /// Transfer updates for an indexed key in `source` into `self`, with compaction applied.
        fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
            let (lower, upper) = source.updates_for_key(index);
            for i in lower .. upper {
                // NB: Here is where we would need to look back if `lower == upper`.
                let time = source.times.index(i);
                let diff = source.diffs.index(i);
                use crate::lattice::Lattice;
                let mut new_time = time.into_owned();
                new_time.advance_by(self.description.since().borrow());
                self.update_stash.push((new_time, diff.into_owned()));
            }
        }

        /// Consolidates `self.update_stash` and produces the offset to record, if any.
        fn consolidate_updates(&mut self) -> Option<usize> {
            use crate::consolidation;
            consolidation::consolidate(&mut self.update_stash);
            if !self.update_stash.is_empty() {
                // If there is a single element, equal to a just-prior recorded update,
                // we push nothing and report an unincremented offset to encode this case.
                let time_diff = self.result.times.last().zip(self.result.diffs.last());
                let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
                    let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
                    let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
                    t1.eq(&t2) && d1.eq(&d2)
                });
                if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
                    // Just clear out update_stash, as we won't drain it here.
                    self.update_stash.clear();
                    self.singletons += 1;
                }
                else {
                    // Conventional; move `update_stash` into the stored times and diffs.
                    for (time, diff) in self.update_stash.drain(..) {
                        self.result.times.push(time);
                        self.result.diffs.push(diff);
                    }
                }
                Some(self.result.times.len())
            } else {
                None
            }
        }
    }

    /// A cursor for navigating a single layer.
    pub struct OrdKeyCursor<L: Layout> {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// If the value has been stepped for the key, there are no more values.
        val_stepped: bool,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }
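
    // A hedged navigation sketch, not code from this crate: each key has a single implicit `()`
    // value, so an assumed caller visits a key's updates and then steps the key.
    //
    //     let mut cursor = batch.cursor();
    //     while cursor.key_valid(&batch) {
    //         cursor.map_times(&batch, |time, diff| { /* inspect each update */ });
    //         cursor.step_key(&batch);
    //     }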

    impl<L: Layout> Cursor for OrdKeyCursor<L> {
        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
        type Val<'a> = &'a ();
        type Time = <L::Target as Update>::Time;
        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
        type Diff = <L::Target as Update>::Diff;
        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

        type Storage = OrdKeyBatch<L>;

        fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
        fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
            let (lower, upper) = storage.storage.updates_for_key(self.key_cursor);
            for index in lower .. upper {
                let time = storage.storage.times.index(index);
                let diff = storage.storage.diffs.index(index);
                logic(time, diff);
            }
        }
        fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
        fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
        fn step_key(&mut self, storage: &Self::Storage) {
            self.key_cursor += 1;
            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
            else {
                self.key_cursor = storage.storage.keys.len();
            }
        }
        fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
        }
        fn step_val(&mut self, _storage: &Self::Storage) {
            self.val_stepped = true;
        }
        fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
        fn rewind_keys(&mut self, storage: &Self::Storage) {
            self.key_cursor = 0;
            if self.key_valid(storage) {
                self.rewind_vals(storage)
            }
        }
        fn rewind_vals(&mut self, _storage: &Self::Storage) {
            self.val_stepped = false;
        }
    }

    /// A builder for creating layers from unsorted update tuples.
    pub struct OrdKeyBuilder<L: Layout, CI> {
        result: OrdKeyStorage<L>,
        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Counts the number of singleton optimizations we performed.
        ///
        /// This number allows us to correctly gauge the total number of updates reflected in a batch,
        /// even though the number of stored updates may be much smaller than that total.
        singletons: usize,
        _marker: PhantomData<CI>,
    }

    impl<L: Layout, CI> OrdKeyBuilder<L, CI> {
        /// Pushes a single update, which may set `self.singleton` rather than push.
        ///
        /// This operation is meant to be equivalent to pushing `(time, diff)` onto the stored
        /// times and diffs. However, for "clever" reasons it does not always do this. Instead,
        /// it looks for opportunities to encode a singleton update with an "absent" update:
        /// repeating the most recent offset. This otherwise invalid state encodes "look back
        /// one element".
        ///
        /// When `self.singleton` is `Some`, it means that we have seen one update and it matched
        /// the previously pushed update exactly. In that case, we do not push the update into the
        /// stored times and diffs. The update tuple is retained in `self.singleton` in case we see
        /// another update and need to recover the singleton and push it ahead of the second update.
        fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
            // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
            let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time);
            let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff);
            if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) {
                assert!(self.singleton.is_none());
                self.singleton = Some((time, diff));
            }
            else {
                // If we have pushed a single element, we need to copy it out to meet this one.
                if let Some((time, diff)) = self.singleton.take() {
                    self.result.times.push(time);
                    self.result.diffs.push(diff);
                }
                self.result.times.push(time);
                self.result.diffs.push(diff);
            }
        }
    }

    impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
    where
        L: Layout,
        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
        for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
    {

        type Input = CI;
        type Time = <L::Target as Update>::Time;
        type Output = OrdKeyBatch<L>;

        fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
            // We don't introduce zero offsets as they will be introduced by the first `push` call.
            Self {
                result: OrdKeyStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
                    times: L::TimeContainer::with_capacity(upds),
                    diffs: L::DiffContainer::with_capacity(upds),
                },
                singleton: None,
                singletons: 0,
                _marker: PhantomData,
            }
        }

        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, _val, time, diff) = CI::into_parts(item);
                // Perhaps this is a continuation of an already received key.
                if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    self.push_update(time, diff);
                } else {
                    // New key; complete representation of prior key.
                    self.result.keys_offs.push(self.result.times.len());
                    // Remove any pending singleton, and if it was set increment our count.
                    if self.singleton.take().is_some() { self.singletons += 1; }
                    self.push_update(time, diff);
                    self.result.keys.push(key);
                }
            }
        }

        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
            // Record the final offsets.
            self.result.keys_offs.push(self.result.times.len());
            // Remove any pending singleton, and if it was set increment our count.
            if self.singleton.take().is_some() { self.singletons += 1; }
            OrdKeyBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description,
            }
        }

        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }

}