differential_dataflow/trace/implementations/rhh.rs

//! Batch implementation based on Robin Hood Hashing.
//!
//! Items are ordered by `(hash(Key), Key)` rather than `Key`, which means
//! that these implementations should only be used with each other, under
//! the same `hash` function, or for types that also order by `(hash(X), X)`,
//! for example wrapped types that implement `Ord` that way.
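//!
//! For intuition, a sketch of the ordering (illustrative only; `HashWrapper` below
//! provides the real implementation): keys compare by hash first, and by value
//! only to break hash ties.
//!
//! ```ignore
//! // Hypothetical helper: order `u64` keys by `(hash(key), key)`.
//! fn hash_then_key(a: u64, b: u64, hash: impl Fn(u64) -> u64) -> std::cmp::Ordering {
//!     (hash(a), a).cmp(&(hash(b), b))
//! }
//! ```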

use std::rc::Rc;
use std::cmp::Ordering;

use serde::{Deserialize, Serialize};

use crate::Hashable;
use crate::containers::TimelyStack;
use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::rc_blanket_impls::RcBuilder;

use super::{Update, Layout, Vector, TStack};

use self::val_batch::{RhhValBatch, RhhValBuilder};

/// A trace implementation using a spine of ordered lists.
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher for ordered lists.
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// A builder for ordered lists.
pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

// /// A trace implementation for empty values using a spine of ordered lists.
// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;

/// A trace implementation backed by columnar storage.
pub type ColSpine<K, V, T, R> = Spine<Rc<RhhValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// A builder for columnar storage.
pub type ColBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;
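
// Illustrative instantiation of the aliases above (a sketch; the key type must
// order by `(hash(K), K)`, e.g. by wrapping it in `HashWrapper` below):
//
// ```ignore
// type Trace = VecSpine<HashWrapper<u64>, String, usize, isize>;
// ```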

// /// A trace implementation backed by columnar storage.
// pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;

/// A carrier trait indicating that the type's `Ord` and `PartialOrd` implementations are by `Hashable::hashed()`.
pub trait HashOrdered: Hashable { }

impl<'a, T: std::hash::Hash + HashOrdered> HashOrdered for &'a T { }

/// A hash-ordered wrapper that modifies `Ord` and `PartialOrd`.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct HashWrapper<T: std::hash::Hash + Hashable> {
    /// The inner value, freely modifiable.
    pub inner: T
}

impl<T: PartialOrd + std::hash::Hash + Hashable> PartialOrd for HashWrapper<T>
where <T as Hashable>::Output: PartialOrd {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let this_hash = self.inner.hashed();
        let that_hash = other.inner.hashed();
        (this_hash, &self.inner).partial_cmp(&(that_hash, &other.inner))
    }
}

impl<T: Ord + PartialOrd + std::hash::Hash + Hashable> Ord for HashWrapper<T>
where <T as Hashable>::Output: PartialOrd {
    fn cmp(&self, other: &Self) -> Ordering {
        self.partial_cmp(other).unwrap()
    }
}

impl<T: std::hash::Hash + Hashable> HashOrdered for HashWrapper<T> { }

impl<T: std::hash::Hash + Hashable> Hashable for HashWrapper<T> {
    type Output = T::Output;
    fn hashed(&self) -> Self::Output { self.inner.hashed() }
}

impl<T: std::hash::Hash + Hashable> HashOrdered for &HashWrapper<T> { }

impl<T: std::hash::Hash + Hashable> Hashable for &HashWrapper<T> {
    type Output = T::Output;
    fn hashed(&self) -> Self::Output { self.inner.hashed() }
}
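
// Illustrative use of `HashWrapper` (a sketch; assumes the crate's blanket
// `Hashable` impl for `u64`, whose `hashed()` output is `u64`):
//
// ```ignore
// let mut items: Vec<HashWrapper<u64>> = (0..4).map(|inner| HashWrapper { inner }).collect();
// items.sort();    // sorted by (inner.hashed(), inner), not by inner alone
// ```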

mod val_batch {

    use std::convert::TryInto;
    use std::marker::PhantomData;
    use serde::{Deserialize, Serialize};
    use timely::container::PushInto;
    use timely::progress::{Antichain, frontier::AntichainRef};

    use crate::hashable::Hashable;
    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
    use crate::trace::implementations::{BatchContainer, BuilderInput};
    use crate::IntoOwned;

    use super::{Layout, Update, HashOrdered};

    /// Update tuples organized as a Robin Hood Hash map, ordered by `(hash(Key), Key, Val, Time)`.
    ///
    /// Specifically, this means that we attempt to place any `Key` at `alloc_len * (hash(Key) / 2^64)`,
    /// and spill onward if the slot is occupied. The cleverness of RHH is that you may instead evict
    /// someone else, in order to maintain the ordering up above. In fact, that is basically the rule:
    /// when there is a conflict, evict the greater of the two and attempt to place it in the next slot.
    ///
    /// This RHH implementation uses a repeated `keys_offs` offset to indicate an absent element, as all
    /// keys for valid updates must have some associated values with updates. This is the same type of
    /// optimization made for repeated updates, and it rules out (here) using that trick for repeated values.
    ///
    /// We will use the `Hashable` trait here, but any consistent hash function should work out ok.
    /// We specifically want to use the highest bits of the result (and we do), because the low bits
    /// have likely been spent shuffling the data between workers (by key), and are likely low entropy.
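    ///
    /// For intuition, a worked placement (an illustrative sketch): with `key_capacity = 8`,
    /// the divisor is `2^64 / 8 = 2^61`; a key whose 64-bit hash is `0xC000_0000_0000_0000`
    /// (three quarters of `2^64`) lands at slot `6 = 8 * 3/4`, and on a collision the greater
    /// of the two keys spills toward slot 7.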
    #[derive(Debug, Serialize, Deserialize)]
    pub struct RhhValStorage<L: Layout>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {

        /// The requested capacity for `keys`. We use this when determining where a key with a certain hash
        /// would most like to end up. The `BatchContainer` trait does not provide a `capacity()` method,
        /// otherwise we would just use that.
        pub key_capacity: usize,
        /// A number large enough that when it divides any `u64` the result is less than `self.key_capacity`.
        /// When that capacity is zero or one, this is set to zero instead.
        pub divisor: u64,
        /// The number of present keys, distinct from `keys.len()`, which also counts dummy (dead) entries.
        pub key_count: usize,

        /// An ordered list of keys, corresponding to entries in `keys_offs`.
        pub keys: L::KeyContainer,
        /// Offsets used to provide indexes from keys to values.
        ///
        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
        pub keys_offs: L::OffsetContainer,
        /// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`.
        pub vals: L::ValContainer,
        /// Offsets used to provide indexes from values to updates.
        ///
        /// This list has a special representation in which any empty range indicates the singleton
        /// element just before the range, as if the start were decremented by one. The empty
        /// range is otherwise an invalid representation, and we borrow it to compactly encode
        /// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
        ///
        /// The length of this list is one longer than `vals`, so that we can avoid bounds logic.
        pub vals_offs: L::OffsetContainer,
        /// Concatenated ordered lists of update times, bracketed by offsets in `vals_offs`.
        pub times: L::TimeContainer,
        /// Concatenated ordered lists of update diffs, bracketed by offsets in `vals_offs`.
        pub diffs: L::DiffContainer,
    }

    impl<L: Layout> RhhValStorage<L>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
    {
        /// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
        fn values_for_key(&self, index: usize) -> (usize, usize) {
            let lower = self.keys_offs.index(index);
            let upper = self.keys_offs.index(index+1);
            // Looking up values for an invalid key indicates something is wrong.
            assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index);
            (lower, upper)
        }
        /// Lower and upper bounds in `self.times` and `self.diffs` corresponding to the value at `index`.
        fn updates_for_value(&self, index: usize) -> (usize, usize) {
            let mut lower = self.vals_offs.index(index);
            let upper = self.vals_offs.index(index+1);
            // We use equal lower and upper to encode "singleton update; just before here".
            // It should only apply when there is a prior element, so `lower` should be greater than zero.
            if lower == upper {
                assert!(lower > 0);
                lower -= 1;
            }
            (lower, upper)
        }
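
        // Worked example of the singleton encoding (illustrative): with
        // `vals_offs = [0, 2, 2]`, value 0 owns updates `0 .. 2`, while value 1's
        // empty range `2 .. 2` decodes to `1 .. 2`: the single update just before
        // the range, shared with value 0's last recorded update.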

        /// Inserts the key at its desired location, or nearby.
        ///
        /// Because there may be collisions, the key may be placed just after its desired location.
        /// If necessary, this method will introduce default keys and copy the offsets to create space
        /// after which to insert the key. These dummy keys are indicated by repeated offsets in `keys_offs`.
        ///
        /// If `offset` is specified, we will insert it at the appropriate location. If it is not specified,
        /// we leave `keys_offs` ready to receive it as the next `push`. This is so that builders that may
        /// not know the final offset at the moment of key insertion can prepare for receiving the offset.
        fn insert_key(&mut self, key: <L::KeyContainer as BatchContainer>::ReadItem<'_>, offset: Option<usize>) {
            let desired = self.desired_location(&key);
            // Were we to push the key now, it would land at `self.keys.len()`; while that
            // is less than `desired`, push additional blank entries in.
            while self.keys.len() < desired {
                // We insert a default (dummy) key and repeat the offset to indicate this.
                let current_offset = self.keys_offs.index(self.keys.len());
                self.keys.push(<<L::Target as Update>::Key as Default>::default());
                self.keys_offs.push(current_offset);
            }

            // Now we insert the key, even if it is no longer at the desired location because of contention.
            // If an offset has been supplied we insert it, and otherwise leave it for future determination.
            self.keys.push(key);
            if let Some(offset) = offset {
                self.keys_offs.push(offset);
            }
            self.key_count += 1;
        }
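
        // For example (an illustrative trace): with an empty map and a key whose
        // `desired_location` is 5, the loop above pushes five dummy keys with
        // repeated offsets into slots 0 .. 5, and then pushes the key itself at slot 5.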

        /// Indicates the desired location of the key, based on its hash.
        fn desired_location<K: Hashable>(&self, key: &K) -> usize {
            if self.divisor == 0 { 0 }
            else {
                (key.hashed().into() / self.divisor).try_into().expect("divisor not large enough to force u64 into usize")
            }
        }

        /// Returns true if one should advance one's index in the search for `key`.
        fn advance_key(&self, index: usize, key: <L::KeyContainer as BatchContainer>::ReadItem<'_>) -> bool {
            // Ideally this short-circuits, as `self.keys[index]` is bogus data.
            !self.live_key(index) || self.keys.index(index).lt(&<L::KeyContainer as BatchContainer>::reborrow(key))
        }

        /// Indicates that a key is valid, rather than dead space, by looking for a valid offset range.
        fn live_key(&self, index: usize) -> bool {
            self.keys_offs.index(index) != self.keys_offs.index(index+1)
        }

        /// Advances `index` until it references a live key, or is `keys.len()`.
        fn advance_to_live_key(&self, index: &mut usize) {
            while *index < self.keys.len() && !self.live_key(*index) {
                *index += 1;
            }
        }

        /// A value large enough that any `u64` divided by it is less than `capacity`.
        ///
        /// This is `2^64 / capacity`, except in the cases where `capacity` is zero or one.
        /// In those cases we return `0` to communicate the exception, and `desired_location`
        /// returns `0` when it sees a zero divisor (a zero capacity that we insert into becomes a bug).
        fn divisor_for_capacity(capacity: usize) -> u64 {
            let capacity: u64 = capacity.try_into().expect("usize exceeds u64");
            if capacity == 0 || capacity == 1 { 0 }
            else {
                ((1 << 63) / capacity) << 1
            }
        }
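
        // A quick check of the divisor arithmetic (illustrative):
        //
        // ```ignore
        // let capacity = 1024u64;
        // let divisor = ((1u64 << 63) / capacity) << 1;   // 2^64 / capacity
        // assert_eq!(u64::MAX / divisor, capacity - 1);   // every u64 maps below capacity
        // ```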
    }

    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct RhhValBatch<L: Layout>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {
        /// The updates themselves.
        pub storage: RhhValStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<<L::Target as Update>::Time>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because due to the singleton optimization,
        /// we may have many more updates than `storage.times.len()`. It should equal that
        /// length, plus the number of singleton optimizations employed.
        pub updates: usize,
    }

    impl<L: Layout> BatchReader for RhhValBatch<L>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
    {
        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
        type Time = <L::Target as Update>::Time;
        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
        type Diff = <L::Target as Update>::Diff;
        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

        type Cursor = RhhValCursor<L>;
        fn cursor(&self) -> Self::Cursor {
            let mut cursor = RhhValCursor {
                key_cursor: 0,
                val_cursor: 0,
                phantom: std::marker::PhantomData,
            };
            cursor.step_key(self);
            cursor
        }
        fn len(&self) -> usize {
            // Normally this would be `self.storage.times.len()`, but the compact singleton
            // encoding means some updates are not materialized there; `self.updates` counts them all.
            self.updates
        }
        fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
    }

    impl<L: Layout> Batch for RhhValBatch<L>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
    {
        type Merger = RhhValMerger<L>;

        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
            RhhValMerger::new(self, other, compaction_frontier)
        }

        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
            use timely::progress::Timestamp;
            Self {
                storage: RhhValStorage {
                    keys: L::KeyContainer::with_capacity(0),
                    keys_offs: L::OffsetContainer::with_capacity(0),
                    vals: L::ValContainer::with_capacity(0),
                    vals_offs: L::OffsetContainer::with_capacity(0),
                    times: L::TimeContainer::with_capacity(0),
                    diffs: L::DiffContainer::with_capacity(0),
                    key_count: 0,
                    key_capacity: 0,
                    divisor: 0,
                },
                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
                updates: 0,
            }
        }
    }

    /// State for an in-progress merge.
    pub struct RhhValMerger<L: Layout>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// Result that we are currently assembling.
        result: RhhValStorage<L>,
        /// Description of the result batch.
        description: Description<<L::Target as Update>::Time>,

        /// Local stash of updates, to use for consolidation.
        ///
        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
        /// A `ChangeBatch` itself needs an `i64` diff type, which we do not have.
        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Counts the number of singleton-optimized entries, so that we may correctly count the updates.
        singletons: usize,
    }

    impl<L: Layout> Merger<RhhValBatch<L>> for RhhValMerger<L>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
        RhhValBatch<L>: Batch<Time=<L::Target as Update>::Time>,
        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
    {
        fn new(batch1: &RhhValBatch<L>, batch2: &RhhValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {

            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            // This is a massive overestimate on the number of keys, but we don't have better information.
            // An overestimate can be a massive problem as well, with sparse regions being hard to cross.
            let max_cap = batch1.len() + batch2.len();
            let rhh_cap = 2 * max_cap;

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            let mut storage = RhhValStorage {
                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
                vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
                vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
                times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
                diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
                key_count: 0,
                key_capacity: rhh_cap,
                divisor: RhhValStorage::<L>::divisor_for_capacity(rhh_cap),
            };

            // Mark explicit types because type inference fails to resolve it.
            let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
            keys_offs.push(0);
            let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
            vals_offs.push(0);

            RhhValMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: storage,
                description,
                update_stash: Vec::new(),
                singletons: 0,
            }
        }
        fn done(self) -> RhhValBatch<L> {
            RhhValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description: self.description,
            }
        }
        fn work(&mut self, source1: &RhhValBatch<L>, source2: &RhhValBatch<L>, fuel: &mut isize) {

            // An (incomplete) indication of the amount of work we've done so far.
            let starting_updates = self.result.times.len();
            let mut effort = 0isize;

            source1.storage.advance_to_live_key(&mut self.key_cursor1);
            source2.storage.advance_to_live_key(&mut self.key_cursor2);

            // While both mergees are still active, perform single-key merges.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                source1.storage.advance_to_live_key(&mut self.key_cursor1);
                source2.storage.advance_to_live_key(&mut self.key_cursor2);
                // An (incomplete) accounting of the work we've done.
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            // Merging is complete, and only copying remains.
            // Key-by-key copying allows effort interruption, and compaction.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                source1.storage.advance_to_live_key(&mut self.key_cursor1);
                effort = (self.result.times.len() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                source2.storage.advance_to_live_key(&mut self.key_cursor2);
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
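
    // Illustrative use of fueled merging (a sketch; in practice the spine in
    // `spine_fueled.rs` drives `work`, and these calls are not made directly):
    //
    // ```ignore
    // let mut merger = RhhValMerger::new(&batch1, &batch2, compaction_frontier);
    // let mut fuel = 1_000_000isize;
    // merger.work(&batch1, &batch2, &mut fuel);
    // if fuel > 0 { let merged = merger.done(); }   // unspent fuel: the merge completed
    // ```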

    // Helper methods in support of merging batches.
    impl<L: Layout> RhhValMerger<L>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
    {
        /// Copy the next key in `source`.
        ///
        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
        /// If the result does not wholly cancel, the key will be present in `self` with the
        /// compacted values and updates.
        ///
        /// The caller should be certain to update the cursor, as this method does not do this.
        fn copy_key(&mut self, source: &RhhValStorage<L>, cursor: usize) {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.len();
            let (mut lower, upper) = source.values_for_key(cursor);
            while lower < upper {
                self.stash_updates_for_val(source, lower);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source.vals.index(lower));
                }
                lower += 1;
            }

            // If we have pushed any values, copy the key as well.
            if self.result.vals.len() > init_vals {
                self.result.insert_key(source.keys.index(cursor), Some(self.result.vals.len()));
            }
        }
        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
        ///
        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
        /// if the updates cancel either directly or after compaction.
        fn merge_key(&mut self, source1: &RhhValStorage<L>, source2: &RhhValStorage<L>) {

            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    // Keys are equal; must merge all values from both sources for this one key.
                    let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
                    let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
                        self.result.insert_key(source1.keys.index(self.key_cursor1), Some(off));
                    }
                    // Increment cursors in either case; the keys are merged.
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }
        /// Merge two ranges of values into `self`.
        ///
        /// If the compacted result contains values with non-empty updates, the function returns
        /// an offset that should be recorded to indicate the upper extent of the result values.
        fn merge_vals(
            &mut self,
            (source1, mut lower1, upper1): (&RhhValStorage<L>, usize, usize),
            (source2, mut lower2, upper2): (&RhhValStorage<L>, usize, usize),
        ) -> Option<usize> {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.len();
            while lower1 < upper1 && lower2 < upper2 {
                // We compare values, and fold in updates for the lowest values;
                // if they are non-empty post-consolidation, we write the value.
                // We could multi-way merge and it wouldn't be very complicated.
                use ::std::cmp::Ordering;
                match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
                    Ordering::Less => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source1, lower1);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push(off);
                            self.result.vals.push(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                    },
                    Ordering::Equal => {
                        self.stash_updates_for_val(source1, lower1);
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push(off);
                            self.result.vals.push(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                        lower2 += 1;
                    },
                    Ordering::Greater => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push(off);
                            self.result.vals.push(source2.vals.index(lower2));
                        }
                        lower2 += 1;
                    },
                }
            }
            // Merging is complete, but we may have remaining elements to push.
            while lower1 < upper1 {
                self.stash_updates_for_val(source1, lower1);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source1.vals.index(lower1));
                }
                lower1 += 1;
            }
            while lower2 < upper2 {
                self.stash_updates_for_val(source2, lower2);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source2.vals.index(lower2));
                }
                lower2 += 1;
            }

            // Values being pushed indicate non-emptiness.
            if self.result.vals.len() > init_vals {
                Some(self.result.vals.len())
            } else {
                None
            }
        }

        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
        fn stash_updates_for_val(&mut self, source: &RhhValStorage<L>, index: usize) {
            let (lower, upper) = source.updates_for_value(index);
            for i in lower .. upper {
                // NB: Here is where we would need to look back if `lower == upper`.
                let time = source.times.index(i);
                let diff = source.diffs.index(i);
                let mut new_time = time.into_owned();
                use crate::lattice::Lattice;
                new_time.advance_by(self.description.since().borrow());
                self.update_stash.push((new_time, diff.into_owned()));
            }
        }

        /// Consolidates `self.update_stash` and produces the offset to record, if any.
        fn consolidate_updates(&mut self) -> Option<usize> {
            use crate::consolidation;
            consolidation::consolidate(&mut self.update_stash);
            if !self.update_stash.is_empty() {
                // If there is a single element, equal to a just-prior recorded update,
                // we push nothing and report an unincremented offset to encode this case.
                let time_diff = self.result.times.last().zip(self.result.diffs.last());
                let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
                    let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
                    let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
                    t1.eq(&t2) && d1.eq(&d2)
                });
                if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
                    // Just clear out update_stash, as we won't drain it here.
                    self.update_stash.clear();
                    self.singletons += 1;
                }
                else {
                    // Conventional; move `update_stash` into `updates`.
                    for (time, diff) in self.update_stash.drain(..) {
                        self.result.times.push(time);
                        self.result.diffs.push(diff);
                    }
                }
                Some(self.result.times.len())
            } else {
                None
            }
        }
    }


    /// A cursor through a Robin Hood Hashed list of keys, vals, and such.
    ///
    /// The important detail is that not all of `keys` represent valid keys.
    /// We must consult the offsets in `keys_offs` (via `live_key`) to see if the
    /// associated data is valid. Importantly, we should skip over invalid keys,
    /// rather than report them as invalid through `key_valid`: that method is
    /// meant to indicate the end of the cursor, rather than internal state.
    pub struct RhhValCursor<L: Layout>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// Absolute position of the current value.
        val_cursor: usize,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }

    impl<L: Layout> Cursor for RhhValCursor<L>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
    {
        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
        type Time = <L::Target as Update>::Time;
        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
        type Diff = <L::Target as Update>::Diff;
        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

        type Storage = RhhValBatch<L>;

        fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Key<'a> {
            storage.storage.keys.index(self.key_cursor)
        }
        fn val<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch<L>, mut logic: L2) {
            let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
            for index in lower .. upper {
                let time = storage.storage.times.index(index);
                let diff = storage.storage.diffs.index(index);
                logic(time, diff);
            }
        }
        fn key_valid(&self, storage: &RhhValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
        fn val_valid(&self, storage: &RhhValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
        fn step_key(&mut self, storage: &RhhValBatch<L>) {
            // We advance the cursor by one for certain, and then as long as we need to find a valid key.
            self.key_cursor += 1;
            storage.storage.advance_to_live_key(&mut self.key_cursor);

            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
            else {
                self.key_cursor = storage.storage.keys.len();
            }
        }
        fn seek_key(&mut self, storage: &RhhValBatch<L>, key: Self::Key<'_>) {
            // self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key));
            let desired = storage.storage.desired_location(&key);
            // Advance the cursor, if `desired` is ahead of it.
            if self.key_cursor < desired {
                self.key_cursor = desired;
            }
            // Advance the cursor as long as we have not found a value greater or equal to `key`.
            // We may have already passed `key`, and confirmed its absence, but our goal is to
            // find the next key afterwards so that users can, for example, alternately iterate.
            while self.key_valid(storage) && storage.storage.advance_key(self.key_cursor, key) {
                // TODO: Based on our encoding, we could skip logarithmically over empty regions by galloping
                //       through `storage.keys_offs`, which stays put for dead space.
                self.key_cursor += 1;
            }

            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
        }
        fn step_val(&mut self, storage: &RhhValBatch<L>) {
            self.val_cursor += 1;
            if !self.val_valid(storage) {
                self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
            }
        }
        fn seek_val(&mut self, storage: &RhhValBatch<L>, val: Self::Val<'_>) {
            self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
        }
        fn rewind_keys(&mut self, storage: &RhhValBatch<L>) {
            self.key_cursor = 0;
            storage.storage.advance_to_live_key(&mut self.key_cursor);

            if self.key_valid(storage) {
                self.rewind_vals(storage)
            }
        }
        fn rewind_vals(&mut self, storage: &RhhValBatch<L>) {
            self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
        }
    }
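
    // Illustrative cursor walk (a sketch of the `Cursor` protocol over a batch):
    //
    // ```ignore
    // let mut cursor = batch.cursor();
    // while cursor.key_valid(&batch) {
    //     while cursor.val_valid(&batch) {
    //         cursor.map_times(&batch, |time, diff| { /* observe each update */ });
    //         cursor.step_val(&batch);
    //     }
    //     cursor.step_key(&batch);
    // }
    // ```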

    /// A builder for creating layers from unsorted update tuples.
    pub struct RhhValBuilder<L: Layout, CI>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {
        result: RhhValStorage<L>,
        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Counts the number of singleton optimizations we performed.
        ///
        /// This number allows us to correctly gauge the total number of updates reflected in a batch,
        /// even though `times.len()` may be much shorter than this amount.
        singletons: usize,
        _marker: PhantomData<CI>,
    }

    impl<L: Layout, CI> RhhValBuilder<L, CI>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {
        /// Pushes a single update, which may set `self.singleton` rather than push.
        ///
        /// This operation is meant to be equivalent to pushing `time` and `diff` onto
        /// `self.result.times` and `self.result.diffs`. However, for "clever" reasons it
        /// does not always do this. Instead, it looks for opportunities to encode a
        /// singleton update with an "absent" update: repeating the most recent offset.
        /// This otherwise invalid state encodes "look back one element".
        ///
        /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
        /// previously pushed update exactly. In that case, we do not push the update into `updates`.
        /// The update tuple is retained in `self.singleton` in case we see another update and need
        /// to recover the singleton to push it into `updates` to join the second update.
        fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
            // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
            let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time);
            let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff);
            if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) {
                assert!(self.singleton.is_none());
                self.singleton = Some((time, diff));
            }
            else {
                // If we have pushed a single element, we need to copy it out to meet this one.
                if let Some((time, diff)) = self.singleton.take() {
                    self.result.times.push(time);
                    self.result.diffs.push(diff);
                }
                self.result.times.push(time);
                self.result.diffs.push(diff);
            }
        }
    }
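
    // Illustrative trace of `push_update` (a sketch): pushing (t, d), (t, d), (t, d')
    // in sequence records [(t, d)], then stashes the repeat in `self.singleton`, and
    // only on the third, different update flushes the stash, leaving
    // [(t, d), (t, d), (t, d')]. Had the run ended after the repeat, `done` would
    // instead count it via `singletons` and encode it with a repeated offset.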

    impl<L: Layout, CI> Builder for RhhValBuilder<L, CI>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Key<'a> = <L::Target as Update>::Key, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
        for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered + IntoOwned<'a, Owned = <L::Target as Update>::Key>,
    {
        type Input = CI;
        type Time = <L::Target as Update>::Time;
        type Output = RhhValBatch<L>;

        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {

            // Double the capacity for RHH; probably excessive.
            let rhh_capacity = 2 * keys;
            let divisor = RhhValStorage::<L>::divisor_for_capacity(rhh_capacity);
            // We want some additive slop, in case we spill over.
            // This number is chosen magically, based on nothing in particular.
            // Worst case, we will re-alloc and copy if we spill beyond this.
            let keys = rhh_capacity + 10;

            // We don't introduce zero offsets as they will be introduced by the first `push` call.
            Self {
                result: RhhValStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
                    vals: L::ValContainer::with_capacity(vals),
                    vals_offs: L::OffsetContainer::with_capacity(vals + 1),
                    times: L::TimeContainer::with_capacity(upds),
                    diffs: L::DiffContainer::with_capacity(upds),
                    key_count: 0,
                    key_capacity: rhh_capacity,
                    divisor,
                },
                singleton: None,
                singletons: 0,
                _marker: PhantomData,
            }
        }

        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, val, time, diff) = CI::into_parts(item);
                // Perhaps this is a continuation of an already received key.
                if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    // Perhaps this is a continuation of an already received value.
                    if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
                        self.push_update(time, diff);
                    } else {
                        // New value; complete representation of prior value.
                        self.result.vals_offs.push(self.result.times.len());
                        if self.singleton.take().is_some() { self.singletons += 1; }
                        self.push_update(time, diff);
                        self.result.vals.push(val);
                    }
                } else {
                    // New key; complete representation of prior key.
                    self.result.vals_offs.push(self.result.times.len());
                    if self.singleton.take().is_some() { self.singletons += 1; }
                    self.result.keys_offs.push(self.result.vals.len());
                    self.push_update(time, diff);
                    self.result.vals.push(val);
                    // Insert the key, but with no specified offset.
                    self.result.insert_key(IntoOwned::borrow_as(&key), None);
                }
            }
        }

        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> RhhValBatch<L> {
            // Record the final offsets.
            self.result.vals_offs.push(self.result.times.len());
            // Remove any pending singleton, and if it was set increment our count.
            if self.singleton.take().is_some() { self.singletons += 1; }
            self.result.keys_offs.push(self.result.vals.len());
            RhhValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description,
            }
        }

        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }

}

mod key_batch {

    // Copy the above, once it works!

}