differential_dataflow/trace/implementations/
ord_neu.rs

1//! Trace and batch implementations based on sorted ranges.
2//!
3//! The types and type aliases in this module start with either
4//!
5//! * `OrdVal`: Collections whose data have the form `(key, val)` where `key` is ordered.
6//! * `OrdKey`: Collections whose data have the form `key` where `key` is ordered.
7//!
8//! Although `OrdVal` is more general than `OrdKey`, the latter has a simpler representation
9//! and should consume fewer resources (computation and memory) when it applies.
10
11use std::rc::Rc;
12
13use crate::containers::TimelyStack;
14use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
15use crate::trace::implementations::spine_fueled::Spine;
16use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
17use crate::trace::rc_blanket_impls::RcBuilder;
18
19use super::{Update, Layout, Vector, TStack, Preferred};
20
21pub use self::val_batch::{OrdValBatch, OrdValBuilder};
22pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
23
/// A trace implementation using a spine of ordered lists.
///
/// Batches are reference-counted `OrdValBatch`es using the `Vector` layout over `((K, V), T, R)` updates.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher using ordered lists.
///
/// Updates are staged in `Vec<((K, V), T, R)>`, chunked by `VecChunker`, and merged by `VecMerger`.
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// A builder using ordered lists.
///
/// An `OrdValBuilder` over the `Vector` layout that takes `Vec<((K, V), T, R)>` input, wrapped in `RcBuilder`.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
30
31// /// A trace implementation for empty values using a spine of ordered lists.
32// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
33
/// A trace implementation backed by columnar storage.
///
/// Batches are reference-counted `OrdValBatch`es using the `TStack` layout over `((K, V), T, R)` updates.
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage.
///
/// Updates are staged in `Vec<((K, V), T, R)>`, chunked by `ColumnationChunker`, and merged by `ColMerger`.
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// A builder for columnar storage.
///
/// An `OrdValBuilder` over the `TStack` layout that takes `TimelyStack<((K, V), T, R)>` input, wrapped in `RcBuilder`.
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;
40
/// A trace implementation for key-only data (unit values) using a spine of ordered lists.
///
/// Batches are reference-counted `OrdKeyBatch`es using the `Vector` layout over `((K, ()), T, R)` updates.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A batcher for ordered lists of key-only data (unit values).
///
/// Updates are staged in `Vec<((K, ()), T, R)>`, chunked by `VecChunker`, and merged by `VecMerger`.
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>;
/// A builder for ordered lists of key-only data (unit values).
///
/// An `OrdKeyBuilder` over the `Vector` layout that takes `Vec<((K, ()), T, R)>` input, wrapped in `RcBuilder`.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
47
48// /// A trace implementation for empty values using a spine of ordered lists.
49// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
50
/// A trace implementation for key-only data (unit values) backed by columnar storage.
///
/// Batches are reference-counted `OrdKeyBatch`es using the `TStack` layout over `((K, ()), T, R)` updates.
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
/// A batcher for columnar storage of key-only data (unit values).
///
/// Updates are staged in `Vec<((K, ()), T, R)>`, chunked by `ColumnationChunker`, and merged by `ColMerger`.
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
/// A builder for columnar storage of key-only data (unit values).
///
/// An `OrdKeyBuilder` over the `TStack` layout that takes `TimelyStack<((K, ()), T, R)>` input, wrapped in `RcBuilder`.
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;
57
/// A trace implementation backed by columnar storage.
///
/// Batches are reference-counted `OrdValBatch`es using the `Preferred<K, V, T, R>` layout.
pub type PreferredSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Preferred<K,V,T,R>>>>;
/// A batcher for columnar storage.
///
/// Staged updates hold the owned forms of the key and value (`<K as ToOwned>::Owned`,
/// `<V as ToOwned>::Owned`); they are chunked by `ColumnationChunker` and merged by `ColMerger`.
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColMerger<(<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R>>;
/// A builder for columnar storage.
///
/// An `OrdValBuilder` over the `Preferred` layout that takes a `TimelyStack` of owned-form updates
/// as input, wrapped in `RcBuilder`.
pub type PreferredBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Preferred<K,V,T,R>, TimelyStack<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>>;
64
65// /// A trace implementation backed by columnar storage.
66// pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
67
68
69mod val_batch {
70
71    use std::marker::PhantomData;
72    use serde::{Deserialize, Serialize};
73    use timely::container::PushInto;
74    use timely::progress::{Antichain, frontier::AntichainRef};
75
76    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
77    use crate::trace::implementations::{BatchContainer, BuilderInput};
78    use crate::IntoOwned;
79
80    use super::{Layout, Update};
81
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The data are held in three layers (keys, values, and updates), each in its own container.
    /// The offset lists `keys_offs` and `vals_offs` record which range of the next layer belongs
    /// to each key and each value, respectively.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct OrdValStorage<L: Layout> {
        /// An ordered list of keys, corresponding to entries in `keys_offs`.
        pub keys: L::KeyContainer,
        /// Offsets used to provide indexes from keys to values.
        ///
        /// The values for the key at position `i` are found in `vals` at positions
        /// `keys_offs[i] .. keys_offs[i+1]`.
        ///
        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
        pub keys_offs: L::OffsetContainer,
        /// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`.
        pub vals: L::ValContainer,
        /// Offsets used to provide indexes from values to updates.
        ///
        /// This list has a special representation: any empty range indicates the singleton
        /// element just before the range, as if the start were decremented by one. The empty
        /// range is otherwise an invalid representation, and we borrow it to compactly encode
        /// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
        ///
        /// The length of this list is one longer than `vals`, so that we can avoid bounds logic.
        pub vals_offs: L::OffsetContainer,
        /// Concatenated ordered lists of update times, bracketed by offsets in `vals_offs`.
        ///
        /// Entry `i` of `times` pairs with entry `i` of `diffs` to form one update.
        pub times: L::TimeContainer,
        /// Concatenated ordered lists of update diffs, bracketed by offsets in `vals_offs`.
        pub diffs: L::DiffContainer,
    }
107
108    impl<L: Layout> OrdValStorage<L> {
109        /// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
110        fn values_for_key(&self, index: usize) -> (usize, usize) {
111            (self.keys_offs.index(index), self.keys_offs.index(index+1))
112        }
113        /// Lower and upper bounds in `self.updates` corresponding to the value at `index`.
114        fn updates_for_value(&self, index: usize) -> (usize, usize) {
115            let mut lower = self.vals_offs.index(index);
116            let upper = self.vals_offs.index(index+1);
117            // We use equal lower and upper to encode "singleton update; just before here".
118            // It should only apply when there is a prior element, so `lower` should be greater than zero.
119            if lower == upper {
120                assert!(lower > 0);
121                lower -= 1;
122            }
123            (lower, upper)
124        }
125    }
126
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValBatch<L: Layout> {
        /// The updates themselves.
        pub storage: OrdValStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<<L::Target as Update>::Time>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because, due to the singleton optimization,
        /// we may have many more updates than `storage.times.len()`. It should equal that
        /// length, plus the number of singleton optimizations employed.
        pub updates: usize,
    }
151
152    impl<L: Layout> BatchReader for OrdValBatch<L> {
153        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
154        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
155        type Time = <L::Target as Update>::Time;
156        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
157        type Diff = <L::Target as Update>::Diff;
158        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
159
160        type Cursor = OrdValCursor<L>;
161        fn cursor(&self) -> Self::Cursor { 
162            OrdValCursor {
163                key_cursor: 0,
164                val_cursor: 0,
165                phantom: PhantomData,
166            }
167        }
168        fn len(&self) -> usize { 
169            // Normally this would be `self.updates.len()`, but we have a clever compact encoding.
170            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
171            self.updates
172        }
173        fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
174    }
175
176    impl<L: Layout> Batch for OrdValBatch<L> {
177        type Merger = OrdValMerger<L>;
178
179        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
180            OrdValMerger::new(self, other, compaction_frontier)
181        }
182
183        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
184            use timely::progress::Timestamp;
185            Self {
186                storage: OrdValStorage {
187                    keys: L::KeyContainer::with_capacity(0),
188                    keys_offs: L::OffsetContainer::with_capacity(0),
189                    vals: L::ValContainer::with_capacity(0),
190                    vals_offs: L::OffsetContainer::with_capacity(0),
191                    times: L::TimeContainer::with_capacity(0),
192                    diffs: L::DiffContainer::with_capacity(0),
193                },
194                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
195                updates: 0,
196            }
197        }
198    }
199
    /// State for an in-progress merge of two `OrdValBatch`es.
    ///
    /// The merge proceeds key by key; the two cursors record how far into each source
    /// batch's keys the merge has advanced.
    pub struct OrdValMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// The result that we are currently assembling.
        result: OrdValStorage<L>,
        /// The description of the merged batch: the lower bound of the first batch, the upper
        /// bound of the second, and the `since` frontier that update times are advanced by.
        description: Description<<L::Target as Update>::Time>,

        /// Local stash of updates, to use for consolidation.
        ///
        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
        /// A `ChangeBatch` itself needs an `i64` diff type, which we have not.
        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Counts the number of singleton-optimized entries, so that we may correctly count the updates.
        singletons: usize,
    }
219
220    impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
221    where
222        OrdValBatch<L>: Batch<Time=<L::Target as Update>::Time>,
223        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
224        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
225    {
226        fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
227
228            assert!(batch1.upper() == batch2.lower());
229            use crate::lattice::Lattice;
230            let mut since = batch1.description().since().join(batch2.description().since());
231            since = since.join(&compaction_frontier.to_owned());
232
233            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
234
235            let batch1 = &batch1.storage;
236            let batch2 = &batch2.storage;
237
238            let mut storage = OrdValStorage {
239                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
240                keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
241                vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
242                vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
243                times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
244                diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
245            };
246
247            // Mark explicit types because type inference fails to resolve it.
248            let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
249            keys_offs.push(0);
250            let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
251            vals_offs.push(0);
252
253            OrdValMerger {
254                key_cursor1: 0,
255                key_cursor2: 0,
256                result: storage,
257                description,
258                update_stash: Vec::new(),
259                singletons: 0,
260            }
261        }
262        fn done(self) -> OrdValBatch<L> {
263            OrdValBatch {
264                updates: self.result.times.len() + self.singletons,
265                storage: self.result,
266                description: self.description,
267            }
268        }
269        fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
270
271            // An (incomplete) indication of the amount of work we've done so far.
272            let starting_updates = self.result.times.len();
273            let mut effort = 0isize;
274
275            // While both mergees are still active, perform single-key merges.
276            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
277                self.merge_key(&source1.storage, &source2.storage);
278                // An (incomplete) accounting of the work we've done.
279                effort = (self.result.times.len() - starting_updates) as isize;
280            }
281
282            // Merging is complete, and only copying remains. 
283            // Key-by-key copying allows effort interruption, and compaction.
284            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
285                self.copy_key(&source1.storage, self.key_cursor1);
286                self.key_cursor1 += 1;
287                effort = (self.result.times.len() - starting_updates) as isize;
288            }
289            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
290                self.copy_key(&source2.storage, self.key_cursor2);
291                self.key_cursor2 += 1;
292                effort = (self.result.times.len() - starting_updates) as isize;
293            }
294
295            *fuel -= effort;
296        }
297    }
298
299    // Helper methods in support of merging batches.
300    impl<L: Layout> OrdValMerger<L> {
301        /// Copy the next key in `source`.
302        ///
303        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
304        /// If the result does not wholly cancel, they key will be present in `self` with the
305        /// compacted values and updates.
306        ///
307        /// The caller should be certain to update the cursor, as this method does not do this.
308        fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
309            // Capture the initial number of values to determine if the merge was ultimately non-empty.
310            let init_vals = self.result.vals.len();
311            let (mut lower, upper) = source.values_for_key(cursor);
312            while lower < upper {
313                self.stash_updates_for_val(source, lower);
314                if let Some(off) = self.consolidate_updates() {
315                    self.result.vals_offs.push(off);
316                    self.result.vals.push(source.vals.index(lower));
317                }
318                lower += 1;
319            }            
320
321            // If we have pushed any values, copy the key as well.
322            if self.result.vals.len() > init_vals {
323                self.result.keys.push(source.keys.index(cursor));
324                self.result.keys_offs.push(self.result.vals.len());
325            }           
326        }
327        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
328        ///
329        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
330        /// if the updates cancel either directly or after compaction.
331        fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
332            use ::std::cmp::Ordering;
333            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
334                Ordering::Less => { 
335                    self.copy_key(source1, self.key_cursor1);
336                    self.key_cursor1 += 1;
337                },
338                Ordering::Equal => {
339                    // Keys are equal; must merge all values from both sources for this one key.
340                    let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
341                    let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
342                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
343                        self.result.keys.push(source1.keys.index(self.key_cursor1));
344                        self.result.keys_offs.push(off);
345                    }
346                    // Increment cursors in either case; the keys are merged.
347                    self.key_cursor1 += 1;
348                    self.key_cursor2 += 1;
349                },
350                Ordering::Greater => {
351                    self.copy_key(source2, self.key_cursor2);
352                    self.key_cursor2 += 1;
353                },
354            }
355        }
356        /// Merge two ranges of values into `self`.
357        ///
358        /// If the compacted result contains values with non-empty updates, the function returns
359        /// an offset that should be recorded to indicate the upper extent of the result values.
360        fn merge_vals(
361            &mut self, 
362            (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize), 
363            (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
364        ) -> Option<usize> {
365            // Capture the initial number of values to determine if the merge was ultimately non-empty.
366            let init_vals = self.result.vals.len();
367            while lower1 < upper1 && lower2 < upper2 {
368                // We compare values, and fold in updates for the lowest values;
369                // if they are non-empty post-consolidation, we write the value.
370                // We could multi-way merge and it wouldn't be very complicated.
371                use ::std::cmp::Ordering;
372                match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
373                    Ordering::Less => { 
374                        // Extend stash by updates, with logical compaction applied.
375                        self.stash_updates_for_val(source1, lower1);
376                        if let Some(off) = self.consolidate_updates() {
377                            self.result.vals_offs.push(off);
378                            self.result.vals.push(source1.vals.index(lower1));
379                        }
380                        lower1 += 1;
381                    },
382                    Ordering::Equal => {
383                        self.stash_updates_for_val(source1, lower1);
384                        self.stash_updates_for_val(source2, lower2);
385                        if let Some(off) = self.consolidate_updates() {
386                            self.result.vals_offs.push(off);
387                            self.result.vals.push(source1.vals.index(lower1));
388                        }
389                        lower1 += 1;
390                        lower2 += 1;
391                    },
392                    Ordering::Greater => { 
393                        // Extend stash by updates, with logical compaction applied.
394                        self.stash_updates_for_val(source2, lower2);
395                        if let Some(off) = self.consolidate_updates() {
396                            self.result.vals_offs.push(off);
397                            self.result.vals.push(source2.vals.index(lower2));
398                        }
399                        lower2 += 1;
400                    },
401                }
402            }
403            // Merging is complete, but we may have remaining elements to push.
404            while lower1 < upper1 {
405                self.stash_updates_for_val(source1, lower1);
406                if let Some(off) = self.consolidate_updates() {
407                    self.result.vals_offs.push(off);
408                    self.result.vals.push(source1.vals.index(lower1));
409                }
410                lower1 += 1;
411            }
412            while lower2 < upper2 {
413                self.stash_updates_for_val(source2, lower2);
414                if let Some(off) = self.consolidate_updates() {
415                    self.result.vals_offs.push(off);
416                    self.result.vals.push(source2.vals.index(lower2));
417                }
418                lower2 += 1;
419            }
420
421            // Values being pushed indicate non-emptiness.
422            if self.result.vals.len() > init_vals {
423                Some(self.result.vals.len())
424            } else {
425                None
426            }
427        }
428
429        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
430        fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
431            let (lower, upper) = source.updates_for_value(index);
432            for i in lower .. upper {
433                // NB: Here is where we would need to look back if `lower == upper`.
434                let time = source.times.index(i);
435                let diff = source.diffs.index(i);
436                use crate::lattice::Lattice;
437                let mut new_time: <L::Target as Update>::Time = time.into_owned();
438                new_time.advance_by(self.description.since().borrow());
439                self.update_stash.push((new_time, diff.into_owned()));
440            }
441        }
442
443        /// Consolidates `self.updates_stash` and produces the offset to record, if any.
444        fn consolidate_updates(&mut self) -> Option<usize> {
445            use crate::consolidation;
446            consolidation::consolidate(&mut self.update_stash);
447            if !self.update_stash.is_empty() {
448                // If there is a single element, equal to a just-prior recorded update,
449                // we push nothing and report an unincremented offset to encode this case.
450                let time_diff = self.result.times.last().zip(self.result.diffs.last());
451                let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
452                    let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
453                    let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
454                    t1.eq(&t2) && d1.eq(&d2)
455                });
456                if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
457                    // Just clear out update_stash, as we won't drain it here.
458                    self.update_stash.clear();
459                    self.singletons += 1;
460                }
461                else {
462                    // Conventional; move `update_stash` into `updates`.
463                    for (time, diff) in self.update_stash.drain(..) {
464                        self.result.times.push(time);
465                        self.result.diffs.push(diff);
466                    }
467                }
468                Some(self.result.times.len())
469            } else {
470                None
471            }
472        }
473    }
474
    /// A cursor for navigating a single layer.
    ///
    /// The cursor holds only positions; the `OrdValBatch` it navigates is passed to each
    /// method as the `storage` argument.
    pub struct OrdValCursor<L: Layout> {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// Absolute position of the current value.
        val_cursor: usize,
        /// Phantom marker tying the cursor to its layout `L`, which it does not otherwise store.
        phantom: PhantomData<L>,
    }
484
485    impl<L: Layout> Cursor for OrdValCursor<L> {
486
487        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
488        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
489        type Time = <L::Target as Update>::Time;
490        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
491        type Diff = <L::Target as Update>::Diff;
492        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
493
494        type Storage = OrdValBatch<L>;
495
496        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
497        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
498
499        fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
500        fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
501        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
502            let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
503            for index in lower .. upper {
504                let time = storage.storage.times.index(index);
505                let diff = storage.storage.diffs.index(index);
506                logic(time, diff);
507            }
508        }
509        fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
510        fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
511        fn step_key(&mut self, storage: &OrdValBatch<L>){
512            self.key_cursor += 1;
513            if self.key_valid(storage) {
514                self.rewind_vals(storage);
515            }
516            else {
517                self.key_cursor = storage.storage.keys.len();
518            }
519        }
520        fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
521            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
522            if self.key_valid(storage) {
523                self.rewind_vals(storage);
524            }
525        }
526        fn step_val(&mut self, storage: &OrdValBatch<L>) {
527            self.val_cursor += 1; 
528            if !self.val_valid(storage) {
529                self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
530            }
531        }
532        fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
533            self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
534        }
535        fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
536            self.key_cursor = 0;
537            if self.key_valid(storage) {
538                self.rewind_vals(storage)
539            }
540        }
541        fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
542            self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
543        }
544    }
545
    /// A builder for creating layers from update tuples supplied in chunks of type `CI`.
    pub struct OrdValBuilder<L: Layout, CI> {
        /// The in-progress result.
        ///
        /// This is public to allow container implementors to set and inspect their container.
        pub result: OrdValStorage<L>,
        /// An update withheld from `result` because it equals the most recently pushed update.
        ///
        /// It is written out by `push_update` if a further update arrives while it is held.
        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Counts the number of singleton optimizations we performed.
        ///
        /// This number allows us to correctly gauge the total number of updates reflected in a batch,
        /// even though `result.times.len()` may be much shorter than this amount.
        singletons: usize,
        /// Marker for the input container type `CI`, which is not otherwise stored.
        _marker: PhantomData<CI>,
    }
560
561    impl<L: Layout, CI> OrdValBuilder<L, CI> {
562        /// Pushes a single update, which may set `self.singleton` rather than push.
563        ///
564        /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`.
565        /// However, for "clever" reasons it does not do this. Instead, it looks for opportunities
566        /// to encode a singleton update with an "absert" update: repeating the most recent offset.
567        /// This otherwise invalid state encodes "look back one element".
568        ///
569        /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
570        /// previously pushed update exactly. In that case, we do not push the update into `updates`.
571        /// The update tuple is retained in `self.singleton` in case we see another update and need
572        /// to recover the singleton to push it into `updates` to join the second update.
573        fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
574            // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
575            if self.result.times.last().map(|t| t == <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time)) == Some(true) &&
576               self.result.diffs.last().map(|d| d == <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff)) == Some(true)
577            {
578                assert!(self.singleton.is_none());
579                self.singleton = Some((time, diff));
580            }
581            else {
582                // If we have pushed a single element, we need to copy it out to meet this one.
583                if let Some((time, diff)) = self.singleton.take() {
584                    self.result.times.push(time);
585                    self.result.diffs.push(diff);
586                }
587                self.result.times.push(time);
588                self.result.diffs.push(diff);
589            }
590        }
591    }
592
593    impl<L, CI> Builder for OrdValBuilder<L, CI>
594    where
595        L: Layout,
596        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
597        for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
598        for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
599        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
600        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
601    {
602
603        type Input = CI;
604        type Time = <L::Target as Update>::Time;
605        type Output = OrdValBatch<L>;
606
607        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
608            // We don't introduce zero offsets as they will be introduced by the first `push` call.
609            Self { 
610                result: OrdValStorage {
611                    keys: L::KeyContainer::with_capacity(keys),
612                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
613                    vals: L::ValContainer::with_capacity(vals),
614                    vals_offs: L::OffsetContainer::with_capacity(vals + 1),
615                    times: L::TimeContainer::with_capacity(upds),
616                    diffs: L::DiffContainer::with_capacity(upds),
617                },
618                singleton: None,
619                singletons: 0,
620                _marker: PhantomData,
621            }
622        }
623
624        #[inline]
625        fn push(&mut self, chunk: &mut Self::Input) {
626            for item in chunk.drain() {
627                let (key, val, time, diff) = CI::into_parts(item);
628                // Perhaps this is a continuation of an already received key.
629                if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
630                    // Perhaps this is a continuation of an already received value.
631                    if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
632                        self.push_update(time, diff);
633                    } else {
634                        // New value; complete representation of prior value.
635                        self.result.vals_offs.push(self.result.times.len());
636                        if self.singleton.take().is_some() { self.singletons += 1; }
637                        self.push_update(time, diff);
638                        self.result.vals.push(val);
639                    }
640                } else {
641                    // New key; complete representation of prior key.
642                    self.result.vals_offs.push(self.result.times.len());
643                    if self.singleton.take().is_some() { self.singletons += 1; }
644                    self.result.keys_offs.push(self.result.vals.len());
645                    self.push_update(time, diff);
646                    self.result.vals.push(val);
647                    self.result.keys.push(key);
648                }
649            }
650        }
651
652        #[inline(never)]
653        fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
654            // Record the final offsets
655            self.result.vals_offs.push(self.result.times.len());
656            // Remove any pending singleton, and if it was set increment our count.
657            if self.singleton.take().is_some() { self.singletons += 1; }
658            self.result.keys_offs.push(self.result.vals.len());
659            OrdValBatch {
660                updates: self.result.times.len() + self.singletons,
661                storage: self.result,
662                description,
663            }
664        }
665
666        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
667            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
668            let mut builder = Self::with_capacity(keys, vals, upds);
669            for mut chunk in chain.drain(..) {
670                builder.push(&mut chunk);
671            }
672    
673            builder.done(description)
674        }
675    }
676}
677
678mod key_batch {
679
680    use std::marker::PhantomData;
681    use serde::{Deserialize, Serialize};
682    use timely::container::PushInto;
683    use timely::progress::{Antichain, frontier::AntichainRef};
684
685    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
686    use crate::trace::implementations::{BatchContainer, BuilderInput};
687    use crate::IntoOwned;
688
689    use super::{Layout, Update};
690
691    /// An immutable collection of update tuples, from a contiguous interval of logical times.
692    #[derive(Debug, Serialize, Deserialize)]
693    pub struct OrdKeyStorage<L: Layout> {
694        /// An ordered list of keys, corresponding to entries in `keys_offs`.
695        pub keys: L::KeyContainer,
696        /// Offsets used to provide indexes from keys to updates.
697        ///
698        /// This list has a special representation that any empty range indicates the singleton
699        /// element just before the range, as if the start were decremented by one. The empty
700        /// range is otherwise an invalid representation, and we borrow it to compactly encode
701        /// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
702        ///
703        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
704        pub keys_offs: L::OffsetContainer,
705        /// Concatenated ordered lists of update times, bracketed by offsets in `vals_offs`.
706        pub times: L::TimeContainer,
707        /// Concatenated ordered lists of update diffs, bracketed by offsets in `vals_offs`.
708        pub diffs: L::DiffContainer,
709    }
710
711    impl<L: Layout> OrdKeyStorage<L> {
712        /// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
713        fn updates_for_key(&self, index: usize) -> (usize, usize) {
714            let mut lower = self.keys_offs.index(index);
715            let upper = self.keys_offs.index(index+1);
716            // We use equal lower and upper to encode "singleton update; just before here".
717            // It should only apply when there is a prior element, so `lower` should be greater than zero.
718            if lower == upper {
719                assert!(lower > 0);
720                lower -= 1;
721            }
722            (lower, upper)
723        }
724    }
725
726    /// An immutable collection of update tuples, from a contiguous interval of logical times.
727    ///
728    /// The `L` parameter captures how the updates should be laid out, and `C` determines which
729    /// merge batcher to select.
730    #[derive(Serialize, Deserialize)]
731    #[serde(bound = "
732        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
733        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
734        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
735        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
736    ")]
737    pub struct OrdKeyBatch<L: Layout> {
738        /// The updates themselves.
739        pub storage: OrdKeyStorage<L>,
740        /// Description of the update times this layer represents.
741        pub description: Description<<L::Target as Update>::Time>,
742        /// The number of updates reflected in the batch.
743        ///
744        /// We track this separately from `storage` because due to the singleton optimization,
745        /// we may have many more updates than `storage.updates.len()`. It should equal that
746        /// length, plus the number of singleton optimizations employed.
747        pub updates: usize,
748    }
749
750    impl<L: Layout> BatchReader for OrdKeyBatch<L> {
751        
752        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
753        type Val<'a> = &'a ();
754        type Time = <L::Target as Update>::Time;
755        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
756        type Diff = <L::Target as Update>::Diff;
757        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
758
759        type Cursor = OrdKeyCursor<L>;
760        fn cursor(&self) -> Self::Cursor {
761            OrdKeyCursor {
762                key_cursor: 0,
763                val_stepped: false,
764                phantom: std::marker::PhantomData,
765            }
766        }
767        fn len(&self) -> usize {
768            // Normally this would be `self.updates.len()`, but we have a clever compact encoding.
769            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
770            self.updates
771        }
772        fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
773    }
774
775    impl<L: Layout> Batch for OrdKeyBatch<L> {
776        type Merger = OrdKeyMerger<L>;
777
778        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
779            OrdKeyMerger::new(self, other, compaction_frontier)
780        }
781
782        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
783            use timely::progress::Timestamp;
784            Self {
785                storage: OrdKeyStorage {
786                    keys: L::KeyContainer::with_capacity(0),
787                    keys_offs: L::OffsetContainer::with_capacity(0),
788                    times: L::TimeContainer::with_capacity(0),
789                    diffs: L::DiffContainer::with_capacity(0),
790                },
791                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
792                updates: 0,
793            }
794        }
795    }
796
797    /// State for an in-progress merge.
798    pub struct OrdKeyMerger<L: Layout> {
799        /// Key position to merge next in the first batch.
800        key_cursor1: usize,
801        /// Key position to merge next in the second batch.
802        key_cursor2: usize,
803        /// result that we are currently assembling.
804        result: OrdKeyStorage<L>,
805        /// description
806        description: Description<<L::Target as Update>::Time>,
807
808        /// Local stash of updates, to use for consolidation.
809        ///
810        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
811        /// A `ChangeBatch` itself needs an `i64` diff type, which we have not.
812        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
813        /// Counts the number of singleton-optimized entries, that we may correctly count the updates.
814        singletons: usize,
815    }
816
817    impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
818    where
819        OrdKeyBatch<L>: Batch<Time=<L::Target as Update>::Time>,
820        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
821        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
822    {
823        fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
824
825            assert!(batch1.upper() == batch2.lower());
826            use crate::lattice::Lattice;
827            let mut since = batch1.description().since().join(batch2.description().since());
828            since = since.join(&compaction_frontier.to_owned());
829
830            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
831
832            let batch1 = &batch1.storage;
833            let batch2 = &batch2.storage;
834
835            let mut storage = OrdKeyStorage {
836                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
837                keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
838                times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
839                diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
840            };
841
842            let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
843            keys_offs.push(0);
844
845            OrdKeyMerger {
846                key_cursor1: 0,
847                key_cursor2: 0,
848                result: storage,
849                description,
850                update_stash: Vec::new(),
851                singletons: 0,
852            }
853        }
854        fn done(self) -> OrdKeyBatch<L> {
855            OrdKeyBatch {
856                updates: self.result.times.len() + self.singletons,
857                storage: self.result,
858                description: self.description,
859            }
860        }
861        fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
862
863            // An (incomplete) indication of the amount of work we've done so far.
864            let starting_updates = self.result.times.len();
865            let mut effort = 0isize;
866
867            // While both mergees are still active, perform single-key merges.
868            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
869                self.merge_key(&source1.storage, &source2.storage);
870                // An (incomplete) accounting of the work we've done.
871                effort = (self.result.times.len() - starting_updates) as isize;
872            }
873
874            // Merging is complete, and only copying remains.
875            // Key-by-key copying allows effort interruption, and compaction.
876            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
877                self.copy_key(&source1.storage, self.key_cursor1);
878                self.key_cursor1 += 1;
879                effort = (self.result.times.len() - starting_updates) as isize;
880            }
881            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
882                self.copy_key(&source2.storage, self.key_cursor2);
883                self.key_cursor2 += 1;
884                effort = (self.result.times.len() - starting_updates) as isize;
885            }
886
887            *fuel -= effort;
888        }
889    }
890
891    // Helper methods in support of merging batches.
892    impl<L: Layout> OrdKeyMerger<L> {
893        /// Copy the next key in `source`.
894        ///
895        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
896        /// If the result does not wholly cancel, they key will be present in `self` with the
897        /// compacted values and updates. 
898        /// 
899        /// The caller should be certain to update the cursor, as this method does not do this.
900        fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
901            self.stash_updates_for_key(source, cursor);
902            if let Some(off) = self.consolidate_updates() {
903                self.result.keys_offs.push(off);
904                self.result.keys.push(source.keys.index(cursor));
905            }
906        }
907        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
908        ///
909        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
910        /// if the updates cancel either directly or after compaction.
911        fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
912            use ::std::cmp::Ordering;
913            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
914                Ordering::Less => { 
915                    self.copy_key(source1, self.key_cursor1);
916                    self.key_cursor1 += 1;
917                },
918                Ordering::Equal => {
919                    // Keys are equal; must merge all updates from both sources for this one key.
920                    self.stash_updates_for_key(source1, self.key_cursor1);
921                    self.stash_updates_for_key(source2, self.key_cursor2);
922                    if let Some(off) = self.consolidate_updates() {
923                        self.result.keys_offs.push(off);
924                        self.result.keys.push(source1.keys.index(self.key_cursor1));
925                    }
926                    // Increment cursors in either case; the keys are merged.
927                    self.key_cursor1 += 1;
928                    self.key_cursor2 += 1;
929                },
930                Ordering::Greater => {
931                    self.copy_key(source2, self.key_cursor2);
932                    self.key_cursor2 += 1;
933                },
934            }
935        }
936
937        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
938        fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
939            let (lower, upper) = source.updates_for_key(index);
940            for i in lower .. upper {
941                // NB: Here is where we would need to look back if `lower == upper`.
942                let time = source.times.index(i);
943                let diff = source.diffs.index(i);
944                use crate::lattice::Lattice;
945                let mut new_time = time.into_owned();
946                new_time.advance_by(self.description.since().borrow());
947                self.update_stash.push((new_time, diff.into_owned()));
948            }
949        }
950
951        /// Consolidates `self.updates_stash` and produces the offset to record, if any.
952        fn consolidate_updates(&mut self) -> Option<usize> {
953            use crate::consolidation;
954            consolidation::consolidate(&mut self.update_stash);
955            if !self.update_stash.is_empty() {
956                // If there is a single element, equal to a just-prior recorded update,
957                // we push nothing and report an unincremented offset to encode this case.
958                let time_diff = self.result.times.last().zip(self.result.diffs.last());
959                let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
960                    let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
961                    let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
962                    t1.eq(&t2) && d1.eq(&d2)
963                });
964                if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
965                    // Just clear out update_stash, as we won't drain it here.
966                    self.update_stash.clear();
967                    self.singletons += 1;
968                }
969                else {
970                    // Conventional; move `update_stash` into `updates`.
971                    for (time, diff) in self.update_stash.drain(..) {
972                        self.result.times.push(time);
973                        self.result.diffs.push(diff);
974                    }
975                }
976                Some(self.result.times.len())
977            } else {
978                None
979            }
980        }
981    }
982
983    /// A cursor for navigating a single layer.
984    pub struct OrdKeyCursor<L: Layout> {
985        /// Absolute position of the current key.
986        key_cursor: usize,
987        /// If the value has been stepped for the key, there are no more values.
988        val_stepped: bool,
989        /// Phantom marker for Rust happiness.
990        phantom: PhantomData<L>,
991    }
992
993    impl<L: Layout> Cursor for OrdKeyCursor<L> {
994        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
995        type Val<'a> = &'a ();
996        type Time = <L::Target as Update>::Time;
997        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
998        type Diff = <L::Target as Update>::Diff;
999        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
1000
1001        type Storage = OrdKeyBatch<L>;
1002
1003        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
1004        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<&'a ()> { if self.val_valid(storage) { Some(&()) } else { None } }
1005
1006        fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
1007        fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
1008        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
1009            let (lower, upper) = storage.storage.updates_for_key(self.key_cursor);
1010            for index in lower .. upper {
1011                let time = storage.storage.times.index(index);
1012                let diff = storage.storage.diffs.index(index);
1013                logic(time, diff);
1014            }
1015        }
1016        fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
1017        fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
1018        fn step_key(&mut self, storage: &Self::Storage){
1019            self.key_cursor += 1;
1020            if self.key_valid(storage) {
1021                self.rewind_vals(storage);
1022            }
1023            else {
1024                self.key_cursor = storage.storage.keys.len();
1025            }
1026        }
1027        fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
1028            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
1029            if self.key_valid(storage) {
1030                self.rewind_vals(storage);
1031            }
1032        }
1033        fn step_val(&mut self, _storage: &Self::Storage) {
1034            self.val_stepped = true;
1035        }
1036        fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
1037        fn rewind_keys(&mut self, storage: &Self::Storage) {
1038            self.key_cursor = 0;
1039            if self.key_valid(storage) {
1040                self.rewind_vals(storage)
1041            }
1042        }
1043        fn rewind_vals(&mut self, _storage: &Self::Storage) {
1044            self.val_stepped = false;
1045        }
1046    }
1047
1048    /// A builder for creating layers from unsorted update tuples.
1049    pub struct OrdKeyBuilder<L: Layout, CI> {
1050        /// The in-progress result.
1051        ///
1052        /// This is public to allow container implementors to set and inspect their container.
1053        pub result: OrdKeyStorage<L>,
1054        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
1055        /// Counts the number of singleton optimizations we performed.
1056        ///
1057        /// This number allows us to correctly gauge the total number of updates reflected in a batch,
1058        /// even though `updates.len()` may be much shorter than this amount.
1059        singletons: usize,
1060        _marker: PhantomData<CI>,
1061    }
1062
1063    impl<L: Layout, CI> OrdKeyBuilder<L, CI> {
1064        /// Pushes a single update, which may set `self.singleton` rather than push.
1065        ///
1066        /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`.
1067        /// However, for "clever" reasons it does not do this. Instead, it looks for opportunities
1068        /// to encode a singleton update with an "absert" update: repeating the most recent offset.
1069        /// This otherwise invalid state encodes "look back one element".
1070        ///
1071        /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
1072        /// previously pushed update exactly. In that case, we do not push the update into `updates`.
1073        /// The update tuple is retained in `self.singleton` in case we see another update and need
1074        /// to recover the singleton to push it into `updates` to join the second update.
1075        fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
1076            // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
1077            let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time);
1078            let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff);
1079            if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) {
1080                assert!(self.singleton.is_none());
1081                self.singleton = Some((time, diff));
1082            }
1083            else {
1084                // If we have pushed a single element, we need to copy it out to meet this one.
1085                if let Some((time, diff)) = self.singleton.take() {
1086                    self.result.times.push(time);
1087                    self.result.diffs.push(diff);
1088                }
1089                self.result.times.push(time);
1090                self.result.diffs.push(diff);
1091            }
1092        }
1093    }
1094
1095    impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
1096    where
1097        L: Layout,
1098        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
1099        for<'a> L::KeyContainer: PushInto<CI::Key<'a>>,
1100        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
1101        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
1102    {
1103
1104        type Input = CI;
1105        type Time = <L::Target as Update>::Time;
1106        type Output = OrdKeyBatch<L>;
1107
1108        fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
1109            // We don't introduce zero offsets as they will be introduced by the first `push` call.
1110            Self { 
1111                result: OrdKeyStorage {
1112                    keys: L::KeyContainer::with_capacity(keys),
1113                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
1114                    times: L::TimeContainer::with_capacity(upds),
1115                    diffs: L::DiffContainer::with_capacity(upds),
1116                },
1117                singleton: None,
1118                singletons: 0,
1119                _marker: PhantomData,
1120            }
1121        }
1122
1123        #[inline]
1124        fn push(&mut self, chunk: &mut Self::Input) {
1125            for item in chunk.drain() {
1126                let (key, _val, time, diff) = CI::into_parts(item);
1127                // Perhaps this is a continuation of an already received key.
1128                if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
1129                    self.push_update(time, diff);
1130                } else {
1131                    // New key; complete representation of prior key.
1132                    self.result.keys_offs.push(self.result.times.len());
1133                    // Remove any pending singleton, and if it was set increment our count.
1134                    if self.singleton.take().is_some() { self.singletons += 1; }
1135                    self.push_update(time, diff);
1136                    self.result.keys.push(key);
1137                }
1138            }
1139        }
1140
1141        #[inline(never)]
1142        fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
1143            // Record the final offsets
1144            self.result.keys_offs.push(self.result.times.len());
1145            // Remove any pending singleton, and if it was set increment our count.
1146            if self.singleton.take().is_some() { self.singletons += 1; }
1147            OrdKeyBatch {
1148                updates: self.result.times.len() + self.singletons,
1149                storage: self.result,
1150                description,
1151            }
1152        }
1153
1154        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
1155            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
1156            let mut builder = Self::with_capacity(keys, vals, upds);
1157            for mut chunk in chain.drain(..) {
1158                builder.push(&mut chunk);
1159            }
1160    
1161            builder.done(description)
1162        }
1163    }
1164
1165}