differential_dataflow/trace/implementations/
rhh.rs

1//! Batch implementation based on Robin Hood Hashing.
2//! 
3//! Items are ordered by `(hash(Key), Key)` rather than `Key`, which means
4//! that these implementations should only be used with each other, under 
5//! the same `hash` function, or for types that also order by `(hash(X), X)`,
6//! for example wrapped types that implement `Ord` that way.
7
8use std::rc::Rc;
9
10use crate::Hashable;
11use crate::trace::implementations::spine_fueled::Spine;
12use crate::trace::implementations::merge_batcher::MergeBatcher;
13use crate::trace::implementations::merge_batcher_col::ColumnatedMergeBatcher;
14use crate::trace::rc_blanket_impls::RcBuilder;
15
16use super::{Update, Layout, Vector, TStack};
17
18use self::val_batch::{RhhValBatch, RhhValBuilder};
19
20/// A trace implementation using a spine of ordered lists.
21pub type VecSpine<K, V, T, R> = Spine<
22    Rc<RhhValBatch<Vector<((K,V),T,R)>>>,
23    MergeBatcher<K,V,T,R>,
24    RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>>>,
25>;
26// /// A trace implementation for empty values using a spine of ordered lists.
27// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
28
29/// A trace implementation backed by columnar storage.
30pub type ColSpine<K, V, T, R> = Spine<
31    Rc<RhhValBatch<TStack<((K,V),T,R)>>>,
32    ColumnatedMergeBatcher<K,V,T,R>,
33    RcBuilder<RhhValBuilder<TStack<((K,V),T,R)>>>,
34>;
35// /// A trace implementation backed by columnar storage.
36// pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
37
38/// A carrier trait indicating that the type's `Ord` and `PartialOrd` implementations are by `Hashable::hashed()`.
39pub trait HashOrdered: Hashable { }
40
41impl<'a, T: std::hash::Hash + HashOrdered> HashOrdered for &'a T { }
42
43/// A hash-ordered wrapper that modifies `Ord` and `PartialOrd`.
44#[derive(Copy, Clone, Eq, PartialEq, Debug, Abomonation, Default)]
45pub struct HashWrapper<T: std::hash::Hash + Hashable> {
46    /// The inner value, freely modifiable.
47    pub inner: T
48}
49
50use std::cmp::Ordering;
51use abomonation_derive::Abomonation;
52
53impl<T: PartialOrd + std::hash::Hash + Hashable> PartialOrd for HashWrapper<T>
54where <T as Hashable>::Output: PartialOrd {
55    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
56        let this_hash = self.inner.hashed();
57        let that_hash = other.inner.hashed();
58        (this_hash, &self.inner).partial_cmp(&(that_hash, &other.inner))
59    }
60}
61
62impl<T: Ord + PartialOrd + std::hash::Hash + Hashable> Ord for HashWrapper<T> 
63where <T as Hashable>::Output: PartialOrd { 
64    fn cmp(&self, other: &Self) -> Ordering {
65        self.partial_cmp(other).unwrap()
66    }
67}
68
69impl<T: std::hash::Hash + Hashable> HashOrdered for HashWrapper<T> { }
70
71impl<T: std::hash::Hash + Hashable> Hashable for HashWrapper<T> { 
72    type Output = T::Output;
73    fn hashed(&self) -> Self::Output { self.inner.hashed() }
74}
75
76mod val_batch {
77
78    use std::borrow::Borrow;
79    use std::convert::TryInto;
80    use std::marker::PhantomData;
81    use abomonation_derive::Abomonation;
82    use timely::progress::{Antichain, frontier::AntichainRef};
83
84    use crate::hashable::Hashable;
85
86    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
87    use crate::trace::implementations::{BatchContainer, OffsetList};
88    use crate::trace::cursor::MyTrait;
89
90    use super::{Layout, Update, HashOrdered};
91
92    /// Update tuples organized as a Robin Hood Hash map, ordered by `(hash(Key), Key, Val, Time)`.
93    ///
94    /// Specifically, this means that we attempt to place any `Key` at `alloc_len * (hash(Key) / 2^64)`, 
95    /// and spill onward if the slot is occupied. The cleverness of RHH is that you may instead evict 
96    /// someone else, in order to maintain the ordering up above. In fact, that is basically the rule: 
97    /// when there is a conflict, evict the greater of the two and attempt to place it in the next slot.
98    ///
99    /// This RHH implementation uses a repeated `keys_offs` offset to indicate an absent element, as all
100    /// keys for valid updates must have some associated values with updates. This is the same type of 
101    /// optimization made for repeated updates, and it rules out (here) using that trick for repeated values.
102    ///
103    /// We will use the `Hashable` trait here, but any consistent hash function should work out ok. 
104    /// We specifically want to use the highest bits of the result (we will) because the low bits have
105    /// likely been spent shuffling the data between workers (by key), and are likely low entropy.
106    #[derive(Abomonation)]
107    pub struct RhhValStorage<L: Layout> 
108    where 
109        <L::Target as Update>::Key: Default + HashOrdered,
110    {
111
112        /// The requested capacity for `keys`. We use this when determining where a key with a certain hash
113        /// would most like to end up. The `BatchContainer` trait does not provide a `capacity()` method,
114        /// otherwise we would just use that.
115        pub key_capacity: usize,
116        pub divisor: usize,
117        /// The number of present keys, distinct from `keys.len()` which contains 
118        pub key_count: usize,
119
120        /// An ordered list of keys, corresponding to entries in `keys_offs`.
121        pub keys: L::KeyContainer,
122        /// Offsets used to provide indexes from keys to values.
123        ///
124        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
125        pub keys_offs: OffsetList,
126        /// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`.
127        pub vals: L::ValContainer,
128        /// Offsets used to provide indexes from values to updates.
129        ///
130        /// This list has a special representation that any empty range indicates the singleton
131        /// element just before the range, as if the start were decremented by one. The empty
132        /// range is otherwise an invalid representation, and we borrow it to compactly encode
133        /// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
134        ///
135        /// The length of this list is one longer than `vals`, so that we can avoid bounds logic.
136        pub vals_offs: OffsetList,
137        /// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`.
138        pub updates: L::UpdContainer,
139    }
140
141    impl<L: Layout> RhhValStorage<L> 
142    where 
143        <L::Target as Update>::Key: Default + HashOrdered,
144    {
145        /// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
146        fn values_for_key(&self, index: usize) -> (usize, usize) {
147            let lower = self.keys_offs.index(index);
148            let upper = self.keys_offs.index(index+1);
149            // Looking up values for an invalid key indicates something is wrong.
150            assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index);
151            (lower, upper)
152        }
153        /// Lower and upper bounds in `self.updates` corresponding to the value at `index`.
154        fn updates_for_value(&self, index: usize) -> (usize, usize) {
155            let mut lower = self.vals_offs.index(index);
156            let upper = self.vals_offs.index(index+1);
157            // We use equal lower and upper to encode "singleton update; just before here".
158            // It should only apply when there is a prior element, so `lower` should be greater than zero.
159            if lower == upper {
160                assert!(lower > 0);
161                lower -= 1;
162            }
163            (lower, upper)
164        }
165
166        /// Inserts the key at its desired location, or nearby.
167        ///
168        /// Because there may be collisions, they key may be placed just after its desired location.
169        /// If necessary, this method will introduce default keys and copy the offsets to create space
170        /// after which to insert the key. These will be indicated by `None` entries in the `hash` vector.
171        ///
172        /// If `offset` is specified, we will insert it at the appropriate location. If it is not specified,
173        /// we leave `keys_offs` ready to receive it as the next `push`. This is so that builders that may
174        /// not know the final offset at the moment of key insertion can prepare for receiving the offset.
175        fn insert_key(&mut self, key: &<L::Target as Update>::Key, offset: Option<usize>) {
176            let desired = self.desired_location(key);
177            // Were we to push the key now, it would be at `self.keys.len()`, so while that is wrong, 
178            // push additional blank entries in.
179            while self.keys.len() < desired {
180                // We insert a default (dummy) key and repeat the offset to indicate this.
181                let current_offset = self.keys_offs.index(self.keys.len());
182                self.keys.push(Default::default());
183                self.keys_offs.push(current_offset);
184            }
185
186            // Now we insert the key. Even if it is no longer the desired location because of contention.
187            // If an offset has been supplied we insert it, and otherwise leave it for future determination.
188            self.keys.copy_push(key);
189            if let Some(offset) = offset {
190                self.keys_offs.push(offset);
191            }
192            self.key_count += 1;
193        }
194
195        /// Indicates both the desired location and the hash signature of the key.
196        fn desired_location<K: Hashable>(&self, key: &K) -> usize {
197            let hash: usize = key.hashed().into().try_into().unwrap();
198            hash / self.divisor
199        }
200
201        /// Returns true if one should advance one's index in the search for `key`.
202        fn advance_key(&self, index: usize, key: <L::KeyContainer as BatchContainer>::ReadItem<'_>) -> bool {
203            // Ideally this short-circuits, as `self.keys[index]` is bogus data.
204            !self.live_key(index) || self.keys.index(index).lt(&key)
205        }
206
207        /// Indicates that a key is valid, rather than dead space, by looking for a valid offset range.
208        fn live_key(&self, index: usize) -> bool {
209            self.keys_offs.index(index) != self.keys_offs.index(index+1)
210        }
211
212        /// Advances `index` until it references a live key, or is `keys.len()`.
213        fn advance_to_live_key(&self, index: &mut usize) {
214            while *index < self.keys.len() && !self.live_key(*index) {
215                *index += 1;
216            }
217        }
218
219        // I hope this works out; meant to be 2^64 / self.key_capacity, so that dividing
220        // `signature` by this gives something in `[0, self.key_capacity)`. We could also
221        // do powers of two and just make this really easy.
222        fn divisor_for_capacity(capacity: usize) -> usize {
223            if capacity == 0 { 0 } 
224            else {
225                ((1 << 63) / capacity) << 1
226            }
227        }
228    }
229
230    /// An immutable collection of update tuples, from a contiguous interval of logical times.
231    ///
232    /// The `L` parameter captures how the updates should be laid out, and `C` determines which
233    /// merge batcher to select.
234    #[derive(Abomonation)]
235    pub struct RhhValBatch<L: Layout> 
236    where 
237        <L::Target as Update>::Key: Default + HashOrdered,
238    {
239        /// The updates themselves.
240        pub storage: RhhValStorage<L>,
241        /// Description of the update times this layer represents.
242        pub description: Description<<L::Target as Update>::Time>,
243        /// The number of updates reflected in the batch.
244        ///
245        /// We track this separately from `storage` because due to the singleton optimization,
246        /// we may have many more updates than `storage.updates.len()`. It should equal that 
247        /// length, plus the number of singleton optimizations employed.
248        pub updates: usize,
249    }
250
251    impl<L: Layout> BatchReader for RhhValBatch<L> 
252    where 
253        <L::Target as Update>::Key: Default + HashOrdered,
254        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
255    {
256        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
257        type KeyOwned = <L::Target as Update>::Key;
258        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
259        type ValOwned = <L::Target as Update>::Val;
260        type Time = <L::Target as Update>::Time;
261        type Diff = <L::Target as Update>::Diff;
262
263        type Cursor = RhhValCursor<L>;
264        fn cursor(&self) -> Self::Cursor { 
265            let mut cursor = RhhValCursor {
266                key_cursor: 0,
267                val_cursor: 0,
268                phantom: std::marker::PhantomData,
269            };
270            cursor.step_key(self);
271            cursor
272        }
273        fn len(&self) -> usize { 
274            // Normally this would be `self.updates.len()`, but we have a clever compact encoding.
275            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
276            self.updates
277        }
278        fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
279    }
280
281    impl<L: Layout> Batch for RhhValBatch<L> 
282    where 
283        <L::Target as Update>::Key: Default + HashOrdered,
284        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
285    {
286        type Merger = RhhValMerger<L>;
287
288        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
289            RhhValMerger::new(self, other, compaction_frontier)
290        }
291    }
292
293    /// State for an in-progress merge.
294    pub struct RhhValMerger<L: Layout> 
295    where 
296        <L::Target as Update>::Key: Default + HashOrdered,
297    {
298        /// Key position to merge next in the first batch.
299        key_cursor1: usize,
300        /// Key position to merge next in the second batch.
301        key_cursor2: usize,
302        /// result that we are currently assembling.
303        result: RhhValStorage<L>,
304        /// description
305        description: Description<<L::Target as Update>::Time>,
306
307        /// Owned key for copying into.
308        key_owned: <<L::Target as Update>::Key as ToOwned>::Owned,
309        /// Local stash of updates, to use for consolidation.
310        ///
311        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
312        /// A `ChangeBatch` itself needs an `i64` diff type, which we have not.
313        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
314        /// Counts the number of singleton-optimized entries, that we may correctly count the updates.
315        singletons: usize,
316    }
317
318    impl<L: Layout> Merger<RhhValBatch<L>> for RhhValMerger<L>
319    where
320        <L::Target as Update>::Key: Default + HashOrdered,
321        RhhValBatch<L>: Batch<Time=<L::Target as Update>::Time>,
322    {
323        fn new(batch1: &RhhValBatch<L>, batch2: &RhhValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
324
325            assert!(batch1.upper() == batch2.lower());
326            use crate::lattice::Lattice;
327            let mut since = batch1.description().since().join(batch2.description().since());
328            since = since.join(&compaction_frontier.to_owned());
329
330            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
331
332            // This is a massive overestimate on the number of keys, but we don't have better information.
333            // An over-estimate can be a massive problem as well, with sparse regions being hard to cross.
334            let max_cap = batch1.len() + batch2.len();
335            let rhh_cap = 2 * max_cap;
336
337            let batch1 = &batch1.storage;
338            let batch2 = &batch2.storage;
339
340            let mut storage = RhhValStorage {
341                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
342                keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
343                vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
344                vals_offs: OffsetList::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
345                updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates),
346                key_count: 0,
347                key_capacity: rhh_cap,
348                divisor: RhhValStorage::<L>::divisor_for_capacity(rhh_cap),
349            };
350
351            storage.keys_offs.push(0);
352            storage.vals_offs.push(0);
353
354            RhhValMerger {
355                key_cursor1: 0,
356                key_cursor2: 0,
357                result: storage,
358                description,
359                key_owned: Default::default(),
360                update_stash: Vec::new(),
361                singletons: 0,
362            }
363        }
364        fn done(self) -> RhhValBatch<L> {
365            RhhValBatch {
366                updates: self.result.updates.len() + self.singletons,
367                storage: self.result,
368                description: self.description,
369            }
370        }
371        fn work(&mut self, source1: &RhhValBatch<L>, source2: &RhhValBatch<L>, fuel: &mut isize) {
372
373            // An (incomplete) indication of the amount of work we've done so far.
374            let starting_updates = self.result.updates.len();
375            let mut effort = 0isize;
376
377            source1.storage.advance_to_live_key(&mut self.key_cursor1);
378            source2.storage.advance_to_live_key(&mut self.key_cursor2);
379
380            // While both mergees are still active, perform single-key merges.
381            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
382                self.merge_key(&source1.storage, &source2.storage);
383                source1.storage.advance_to_live_key(&mut self.key_cursor1);
384                source2.storage.advance_to_live_key(&mut self.key_cursor2);
385                    // An (incomplete) accounting of the work we've done.
386                effort = (self.result.updates.len() - starting_updates) as isize;
387            }
388
389            // Merging is complete, and only copying remains. 
390            // Key-by-key copying allows effort interruption, and compaction.
391            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
392                self.copy_key(&source1.storage, self.key_cursor1);
393                self.key_cursor1 += 1;
394                source1.storage.advance_to_live_key(&mut self.key_cursor1);
395                effort = (self.result.updates.len() - starting_updates) as isize;
396            }
397            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
398                self.copy_key(&source2.storage, self.key_cursor2);
399                self.key_cursor2 += 1;
400                source2.storage.advance_to_live_key(&mut self.key_cursor2);
401                effort = (self.result.updates.len() - starting_updates) as isize;
402            }
403
404            *fuel -= effort;
405        }
406    }
407
408    // Helper methods in support of merging batches.
409    impl<L: Layout> RhhValMerger<L> 
410    where 
411        <L::Target as Update>::Key: Default + HashOrdered,
412    {
413        /// Copy the next key in `source`.
414        ///
415        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
416        /// If the result does not wholly cancel, they key will be present in `self` with the
417        /// compacted values and updates. 
418        /// 
419        /// The caller should be certain to update the cursor, as this method does not do this.
420        fn copy_key(&mut self, source: &RhhValStorage<L>, cursor: usize) {
421            // Capture the initial number of values to determine if the merge was ultimately non-empty.
422            let init_vals = self.result.vals.len();
423            let (mut lower, upper) = source.values_for_key(cursor);
424            while lower < upper {
425                self.stash_updates_for_val(source, lower);
426                if let Some(off) = self.consolidate_updates() {
427                    self.result.vals_offs.push(off);
428                    self.result.vals.copy(source.vals.index(lower));
429                }
430                lower += 1;
431            }            
432
433            // If we have pushed any values, copy the key as well.
434            if self.result.vals.len() > init_vals {
435                source.keys.index(cursor).clone_onto(&mut self.key_owned);
436                self.result.insert_key(&self.key_owned, Some(self.result.vals.len()));
437            }           
438        }
439        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
440        ///
441        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
442        /// if the updates cancel either directly or after compaction.
443        fn merge_key(&mut self, source1: &RhhValStorage<L>, source2: &RhhValStorage<L>) {
444
445            use ::std::cmp::Ordering;
446            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
447                Ordering::Less => { 
448                    self.copy_key(source1, self.key_cursor1);
449                    self.key_cursor1 += 1;
450                },
451                Ordering::Equal => {
452                    // Keys are equal; must merge all values from both sources for this one key.
453                    let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
454                    let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
455                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
456                        source1.keys.index(self.key_cursor1).clone_onto(&mut self.key_owned);
457                        self.result.insert_key(&self.key_owned, Some(off));
458                    }
459                    // Increment cursors in either case; the keys are merged.
460                    self.key_cursor1 += 1;
461                    self.key_cursor2 += 1;
462                },
463                Ordering::Greater => {
464                    self.copy_key(source2, self.key_cursor2);
465                    self.key_cursor2 += 1;
466                },
467            }
468        }
469        /// Merge two ranges of values into `self`.
470        ///
471        /// If the compacted result contains values with non-empty updates, the function returns
472        /// an offset that should be recorded to indicate the upper extent of the result values.
473        fn merge_vals(
474            &mut self, 
475            (source1, mut lower1, upper1): (&RhhValStorage<L>, usize, usize), 
476            (source2, mut lower2, upper2): (&RhhValStorage<L>, usize, usize),
477        ) -> Option<usize> {
478            // Capture the initial number of values to determine if the merge was ultimately non-empty.
479            let init_vals = self.result.vals.len();
480            while lower1 < upper1 && lower2 < upper2 {
481                // We compare values, and fold in updates for the lowest values;
482                // if they are non-empty post-consolidation, we write the value.
483                // We could multi-way merge and it wouldn't be very complicated.
484                use ::std::cmp::Ordering;
485                match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
486                    Ordering::Less => { 
487                        // Extend stash by updates, with logical compaction applied.
488                        self.stash_updates_for_val(source1, lower1);
489                        if let Some(off) = self.consolidate_updates() {
490                            self.result.vals_offs.push(off);
491                            self.result.vals.copy(source1.vals.index(lower1));
492                        }
493                        lower1 += 1;
494                    },
495                    Ordering::Equal => {
496                        self.stash_updates_for_val(source1, lower1);
497                        self.stash_updates_for_val(source2, lower2);
498                        if let Some(off) = self.consolidate_updates() {
499                            self.result.vals_offs.push(off);
500                            self.result.vals.copy(source1.vals.index(lower1));
501                        }
502                        lower1 += 1;
503                        lower2 += 1;
504                    },
505                    Ordering::Greater => { 
506                        // Extend stash by updates, with logical compaction applied.
507                        self.stash_updates_for_val(source2, lower2);
508                        if let Some(off) = self.consolidate_updates() {
509                            self.result.vals_offs.push(off);
510                            self.result.vals.copy(source2.vals.index(lower2));
511                        }
512                        lower2 += 1;
513                    },
514                }
515            }
516            // Merging is complete, but we may have remaining elements to push.
517            while lower1 < upper1 {
518                self.stash_updates_for_val(source1, lower1);
519                if let Some(off) = self.consolidate_updates() {
520                    self.result.vals_offs.push(off);
521                    self.result.vals.copy(source1.vals.index(lower1));
522                }
523                lower1 += 1;
524            }
525            while lower2 < upper2 {
526                self.stash_updates_for_val(source2, lower2);
527                if let Some(off) = self.consolidate_updates() {
528                    self.result.vals_offs.push(off);
529                    self.result.vals.copy(source2.vals.index(lower2));
530                }
531                lower2 += 1;
532            }
533
534            // Values being pushed indicate non-emptiness.
535            if self.result.vals.len() > init_vals {
536                Some(self.result.vals.len())
537            } else {
538                None
539            }
540        }
541
542        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
543        fn stash_updates_for_val(&mut self, source: &RhhValStorage<L>, index: usize) {
544            let (lower, upper) = source.updates_for_value(index);
545            for i in lower .. upper {
546                // NB: Here is where we would need to look back if `lower == upper`.
547                let (time, diff) = &source.updates.index(i);
548                let mut new_time = time.clone();
549                use crate::lattice::Lattice;
550                new_time.advance_by(self.description.since().borrow());
551                self.update_stash.push((new_time, diff.clone()));
552            }
553        }
554
555        /// Consolidates `self.updates_stash` and produces the offset to record, if any.
556        fn consolidate_updates(&mut self) -> Option<usize> {
557            use crate::consolidation;
558            consolidation::consolidate(&mut self.update_stash);
559            if !self.update_stash.is_empty() {
560                // If there is a single element, equal to a just-prior recorded update,
561                // we push nothing and report an unincremented offset to encode this case.
562                if self.update_stash.len() == 1 && self.result.updates.last().map(|l| l.equals(self.update_stash.last().unwrap())).unwrap_or(false) {
563                    // Just clear out update_stash, as we won't drain it here.
564                    self.update_stash.clear();
565                    self.singletons += 1;
566                }
567                else {
568                    // Conventional; move `update_stash` into `updates`.
569                    for item in self.update_stash.drain(..) {
570                        self.result.updates.push(item);
571                    }
572                }
573                Some(self.result.updates.len())
574            } else {
575                None
576            }
577        }
578    }
579
580
581    /// A cursor through a Robin Hood Hashed list of keys, vals, and such.
582    ///
583    /// The important detail is that not all of `keys` represent valid keys.
584    /// We must consult `storage.hashed` to see if the associated data is valid.
585    /// Importantly, we should skip over invalid keys, rather than report them as
586    /// invalid through `key_valid`: that method is meant to indicate the end of
587    /// the cursor, rather than internal state.
588    pub struct RhhValCursor<L: Layout> 
589    where 
590        <L::Target as Update>::Key: Default + HashOrdered,
591    {
592        /// Absolute position of the current key.
593        key_cursor: usize,
594        /// Absolute position of the current value.
595        val_cursor: usize,
596        /// Phantom marker for Rust happiness.
597        phantom: PhantomData<L>,
598    }
599
600    impl<L: Layout> Cursor for RhhValCursor<L> 
601    where 
602        <L::Target as Update>::Key: Default + HashOrdered,
603        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
604    {
605        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
606        type KeyOwned = <L::Target as Update>::Key;
607        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
608        type ValOwned = <L::Target as Update>::Val;
609        type Time = <L::Target as Update>::Time;
610        type Diff = <L::Target as Update>::Diff;
611
612        type Storage = RhhValBatch<L>;
613
614        fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Key<'a> { 
615            storage.storage.keys.index(self.key_cursor) 
616        }
617        fn val<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
618        fn map_times<L2: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &RhhValBatch<L>, mut logic: L2) {
619            let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
620            for index in lower .. upper {
621                let (time, diff) = &storage.storage.updates.index(index);
622                logic(time, diff);
623            }
624        }
625        fn key_valid(&self, storage: &RhhValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
626        fn val_valid(&self, storage: &RhhValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
627        fn step_key(&mut self, storage: &RhhValBatch<L>){
628            // We advance the cursor by one for certain, and then as long as we need to find a valid key.
629            self.key_cursor += 1;
630            storage.storage.advance_to_live_key(&mut self.key_cursor);
631
632            if self.key_valid(storage) {
633                self.rewind_vals(storage);
634            }
635            else {
636                self.key_cursor = storage.storage.keys.len();
637            }
638        }
639        fn seek_key(&mut self, storage: &RhhValBatch<L>, key: Self::Key<'_>) {
640            // self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key));
641            let desired = storage.storage.desired_location(&key);
642            // Advance the cursor, if `desired` is ahead of it.
643            if self.key_cursor < desired {
644                self.key_cursor = desired;
645            }
646            // Advance the cursor as long as we have not found a value greater or equal to `key`.
647            // We may have already passed `key`, and confirmed its absence, but our goal is to
648            // find the next key afterwards so that users can, for example, alternately iterate.
649            while self.key_valid(storage) && storage.storage.advance_key(self.key_cursor, key) {
650                // TODO: Based on our encoding, we could skip logarithmically over empy regions by galloping
651                //       through `storage.keys_offs`, which stays put for dead space.
652                self.key_cursor += 1;
653            }
654
655            if self.key_valid(storage) {
656                self.rewind_vals(storage);
657            }
658        }
659        fn step_val(&mut self, storage: &RhhValBatch<L>) {
660            self.val_cursor += 1; 
661            if !self.val_valid(storage) {
662                self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
663            }
664        }
665        fn seek_val(&mut self, storage: &RhhValBatch<L>, val: Self::Val<'_>) {
666            self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(&val));
667        }
668        fn rewind_keys(&mut self, storage: &RhhValBatch<L>) {
669            self.key_cursor = 0;
670            storage.storage.advance_to_live_key(&mut self.key_cursor);
671
672            if self.key_valid(storage) {
673                self.rewind_vals(storage)
674            }
675        }
676        fn rewind_vals(&mut self, storage: &RhhValBatch<L>) {
677            self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
678        }
679    }
680
681    /// A builder for creating layers from unsorted update tuples.
682    pub struct RhhValBuilder<L: Layout> 
683    where 
684        <L::Target as Update>::Key: Default + HashOrdered,
685    {
686        result: RhhValStorage<L>,
687        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
688        /// Counts the number of singleton optimizations we performed.
689        ///
690        /// This number allows us to correctly gauge the total number of updates reflected in a batch,
691        /// even though `updates.len()` may be much shorter than this amount.
692        singletons: usize,
693    }
694
695    impl<L: Layout> RhhValBuilder<L> 
696    where 
697        <L::Target as Update>::Key: Default + HashOrdered,
698    {
699        /// Pushes a single update, which may set `self.singleton` rather than push.
700        ///
701        /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`.
702        /// However, for "clever" reasons it does not do this. Instead, it looks for opportunities
703        /// to encode a singleton update with an "absert" update: repeating the most recent offset.
704        /// This otherwise invalid state encodes "look back one element".
705        ///
706        /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
707        /// previously pushed update exactly. In that case, we do not push the update into `updates`.
708        /// The update tuple is retained in `self.singleton` in case we see another update and need
709        /// to recover the singleton to push it into `updates` to join the second update.
710        fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
711            // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
712            if self.result.updates.last().map(|(t, d)| t == &time && d == &diff) == Some(true) {
713                assert!(self.singleton.is_none());
714                self.singleton = Some((time, diff));
715            }
716            else {
717                // If we have pushed a single element, we need to copy it out to meet this one.
718                if let Some(time_diff) = self.singleton.take() {
719                    self.result.updates.push(time_diff);
720                }
721                self.result.updates.push((time, diff));
722            }
723        }
724    }
725
726    impl<L: Layout> Builder for RhhValBuilder<L>
727    where
728        <L::Target as Update>::Key: Default + HashOrdered,
729        // RhhValBatch<L>: Batch<Key=<L::Target as Update>::Key, Val=<L::Target as Update>::Val, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
730    {
731        type Item = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
732        type Time = <L::Target as Update>::Time;
733        type Output = RhhValBatch<L>;
734
735        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
736
737            // Double the capacity for RHH; probably excessive.
738            let rhh_capacity = 2 * keys;
739            let divisor = RhhValStorage::<L>::divisor_for_capacity(rhh_capacity);                        
740            // We want some additive slop, in case we spill over.
741            // This number magically chosen based on nothing in particular.
742            // Worst case, we will re-alloc and copy if we spill beyond this.
743            let keys = rhh_capacity + 10;
744
745            // We don't introduce zero offsets as they will be introduced by the first `push` call.
746            Self { 
747                result: RhhValStorage {
748                    keys: L::KeyContainer::with_capacity(keys),
749                    keys_offs: OffsetList::with_capacity(keys + 1),
750                    vals: L::ValContainer::with_capacity(vals),
751                    vals_offs: OffsetList::with_capacity(vals + 1),
752                    updates: L::UpdContainer::with_capacity(upds),
753                    key_count: 0,
754                    key_capacity: rhh_capacity,
755                    divisor,
756                },
757                singleton: None,
758                singletons: 0,
759            }
760        }
761
762        #[inline]
763        fn push(&mut self, ((key, val), time, diff): Self::Item) {
764
765            // Perhaps this is a continuation of an already received key.
766            if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
767                // Perhaps this is a continuation of an already received value.
768                if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) {
769                    self.push_update(time, diff);
770                } else {
771                    // New value; complete representation of prior value.
772                    self.result.vals_offs.push(self.result.updates.len());
773                    if self.singleton.take().is_some() { self.singletons += 1; }
774                    self.push_update(time, diff);
775                    self.result.vals.push(val);
776                }
777            } else {
778                // New key; complete representation of prior key.
779                self.result.vals_offs.push(self.result.updates.len());
780                if self.singleton.take().is_some() { self.singletons += 1; }
781                self.result.keys_offs.push(self.result.vals.len());
782                self.push_update(time, diff);
783                self.result.vals.push(val);
784                // Insert the key, but with no specified offset.
785                self.result.insert_key(key.borrow(), None);
786            }
787        }
788
789        #[inline]
790        fn copy(&mut self, ((key, val), time, diff): &Self::Item) {
791
792            // Perhaps this is a continuation of an already received key.
793            if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
794                // Perhaps this is a continuation of an already received value.
795                if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) {
796                    // TODO: here we could look for repetition, and not push the update in that case.
797                    // More logic (and state) would be required to correctly wrangle this.
798                    self.push_update(time.clone(), diff.clone());
799                } else {
800                    // New value; complete representation of prior value.
801                    self.result.vals_offs.push(self.result.updates.len());
802                    // Remove any pending singleton, and if it was set increment our count.
803                    if self.singleton.take().is_some() { self.singletons += 1; }
804                    self.push_update(time.clone(), diff.clone());
805                    self.result.vals.copy_push(val);
806                }
807            } else {
808                // New key; complete representation of prior key.
809                self.result.vals_offs.push(self.result.updates.len());
810                // Remove any pending singleton, and if it was set increment our count.
811                if self.singleton.take().is_some() { self.singletons += 1; }
812                self.result.keys_offs.push(self.result.vals.len());
813                self.push_update(time.clone(), diff.clone());
814                self.result.vals.copy_push(val);
815                // Insert the key, but with no specified offset.
816                self.result.insert_key(key, None);
817            }
818        }
819
820        #[inline(never)]
821        fn done(mut self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> RhhValBatch<L> {
822            // Record the final offsets
823            self.result.vals_offs.push(self.result.updates.len());
824            // Remove any pending singleton, and if it was set increment our count.
825            if self.singleton.take().is_some() { self.singletons += 1; }
826            self.result.keys_offs.push(self.result.vals.len());
827            RhhValBatch {
828                updates: self.result.updates.len() + self.singletons,
829                storage: self.result,
830                description: Description::new(lower, upper, since),
831            }
832        }
833    }
834
835}
836
837mod key_batch {
838
839    // Copy the above, once it works!
840
841}