Skip to main content

palimpsest_dataflow/trace/implementations/
rhh.rs

1//! Batch implementation based on Robin Hood Hashing.
2//!
3//! Items are ordered by `(hash(Key), Key)` rather than `Key`, which means
4//! that these implementations should only be used with each other, under
5//! the same `hash` function, or for types that also order by `(hash(X), X)`,
6//! for example wrapped types that implement `Ord` that way.
7
8use std::cmp::Ordering;
9use std::rc::Rc;
10
11use serde::{Deserialize, Serialize};
12
13use crate::containers::TimelyStack;
14use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
15use crate::trace::implementations::merge_batcher::{ColMerger, MergeBatcher, VecMerger};
16use crate::trace::implementations::spine_fueled::Spine;
17use crate::trace::rc_blanket_impls::RcBuilder;
18use crate::Hashable;
19
20use super::{Layout, TStack, Vector};
21
22use self::val_batch::{RhhValBatch, RhhValBuilder};
23
/// A trace implementation using a spine of Robin Hood hashed batches.
///
/// Each batch is a reference-counted `RhhValBatch` using the `Vector` layout over
/// `((K, V), T, R)` update tuples.
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K, V), T, R)>>>>;
/// A batcher for `((K, V), T, R)` updates held in `Vec`s.
///
/// This is a `MergeBatcher` that chunks its input with `VecChunker` and merges with `VecMerger`.
pub type VecBatcher<K, V, T, R> =
    MergeBatcher<Vec<((K, V), T, R)>, VecChunker<((K, V), T, R)>, VecMerger<(K, V), T, R>>;
/// A builder of reference-counted Robin Hood hashed batches using the `Vector` layout.
///
/// The builder is an `RhhValBuilder` wrapped in `RcBuilder`, taking `Vec<((K, V), T, R)>` as input.
pub type VecBuilder<K, V, T, R> =
    RcBuilder<RhhValBuilder<Vector<((K, V), T, R)>, Vec<((K, V), T, R)>>>;
32
33// /// A trace implementation for empty values using a spine of ordered lists.
34// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
35
/// A trace implementation backed by columnar storage.
///
/// Each batch is a reference-counted `RhhValBatch` using the `TStack` layout over
/// `((K, V), T, R)` update tuples.
pub type ColSpine<K, V, T, R> = Spine<Rc<RhhValBatch<TStack<((K, V), T, R)>>>>;
/// A batcher for columnar storage.
///
/// This is a `MergeBatcher` over `Vec<((K, V), T, R)>` input that chunks with
/// `ColumnationChunker` and merges with `ColMerger`.
pub type ColBatcher<K, V, T, R> =
    MergeBatcher<Vec<((K, V), T, R)>, ColumnationChunker<((K, V), T, R)>, ColMerger<(K, V), T, R>>;
/// A builder for columnar storage.
///
/// The builder is an `RhhValBuilder` wrapped in `RcBuilder`, using the `TStack` layout and
/// taking `TimelyStack<((K, V), T, R)>` as input.
pub type ColBuilder<K, V, T, R> =
    RcBuilder<RhhValBuilder<TStack<((K, V), T, R)>, TimelyStack<((K, V), T, R)>>>;
44
45// /// A trace implementation backed by columnar storage.
46// pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
47
/// A carrier trait indicating that the type's `Ord` and `PartialOrd` implementations are by `Hashable::hashed()`.
///
/// The trait declares no methods: implementing it is a promise made by the implementor that
/// values order by `(hash(X), X)`, as the module documentation requires. Nothing in the trait
/// itself checks that promise.
pub trait HashOrdered: Hashable {}
50
51impl<'a, T: std::hash::Hash + HashOrdered> HashOrdered for &'a T {}
52
/// A hash-ordered wrapper that modifies `Ord` and `PartialOrd`.
///
/// Comparisons of the wrapper look first at the hash of `inner` (as produced by
/// `Hashable::hashed`) and only then at `inner` itself. Equality is derived, and is
/// therefore the equality of `inner`.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct HashWrapper<T: std::hash::Hash + Hashable> {
    /// The inner value, freely modifiable.
    pub inner: T,
}
59
60impl<T: PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> PartialOrd for HashWrapper<T> {
61    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
62        let this_hash = self.inner.hashed();
63        let that_hash = other.inner.hashed();
64        (this_hash, &self.inner).partial_cmp(&(that_hash, &other.inner))
65    }
66}
67
68impl<T: Ord + PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> Ord for HashWrapper<T> {
69    fn cmp(&self, other: &Self) -> Ordering {
70        self.partial_cmp(other).unwrap()
71    }
72}
73
74impl<T: std::hash::Hash + Hashable> HashOrdered for HashWrapper<T> {}
75
76impl<T: std::hash::Hash + Hashable> Hashable for HashWrapper<T> {
77    type Output = T::Output;
78    fn hashed(&self) -> Self::Output {
79        self.inner.hashed()
80    }
81}
82
83impl<T: std::hash::Hash + Hashable> HashOrdered for &HashWrapper<T> {}
84
85impl<T: std::hash::Hash + Hashable> Hashable for &HashWrapper<T> {
86    type Output = T::Output;
87    fn hashed(&self) -> Self::Output {
88        self.inner.hashed()
89    }
90}
91
92mod val_batch {
93
94    use serde::{Deserialize, Serialize};
95    use std::convert::TryInto;
96    use std::marker::PhantomData;
97    use timely::container::PushInto;
98    use timely::progress::{frontier::AntichainRef, Antichain};
99
100    use crate::hashable::Hashable;
101    use crate::trace::implementations::layout;
102    use crate::trace::implementations::{BatchContainer, BuilderInput};
103    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
104
105    use super::{HashOrdered, Layout};
106
    /// Update tuples organized as a Robin Hood Hash map, ordered by `(hash(Key), Key, Val, Time)`.
    ///
    /// Specifically, this means that we attempt to place any `Key` at `alloc_len * (hash(Key) / 2^64)`,
    /// and spill onward if the slot is occupied. The cleverness of RHH is that you may instead evict
    /// someone else, in order to maintain the ordering up above. In fact, that is basically the rule:
    /// when there is a conflict, evict the greater of the two and attempt to place it in the next slot.
    ///
    /// This RHH implementation uses a repeated `keys_offs` offset to indicate an absent element, as all
    /// keys for valid updates must have some associated values with updates. This is the same type of
    /// optimization made for repeated updates, and it rules out (here) using that trick for repeated values.
    ///
    /// We will use the `Hashable` trait here, but any consistent hash function should work out ok.
    /// We specifically want to use the highest bits of the result (we will) because the low bits have
    /// likely been spent shuffling the data between workers (by key), and are likely low entropy.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct RhhValStorage<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// The requested capacity for `keys`. We use this when determining where a key with a certain hash
        /// would most like to end up. The `BatchContainer` trait does not provide a `capacity()` method,
        /// otherwise we would just use that.
        pub key_capacity: usize,
        /// A number large enough that when it divides any `u64` the result is at most `self.key_capacity`.
        /// When that capacity is zero or one, this is set to zero instead.
        pub divisor: u64,
        /// The number of present (live) keys, distinct from `keys.len()` which also counts the
        /// default-valued placeholder keys inserted as padding in front of a key's desired slot.
        pub key_count: usize,

        /// An ordered list of keys, corresponding to entries in `keys_offs`.
        ///
        /// A slot whose two bracketing offsets in `keys_offs` are equal holds a placeholder
        /// rather than a real key.
        pub keys: L::KeyContainer,
        /// Offsets used to provide indexes from keys to values.
        ///
        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
        pub keys_offs: L::OffsetContainer,
        /// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`.
        pub vals: L::ValContainer,
        /// Offsets used to provide indexes from values to updates.
        ///
        /// This list has a special representation that any empty range indicates the singleton
        /// element just before the range, as if the start were decremented by one. The empty
        /// range is otherwise an invalid representation, and we borrow it to compactly encode
        /// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
        ///
        /// The length of this list is one longer than `vals`, so that we can avoid bounds logic.
        pub vals_offs: L::OffsetContainer,
        /// Concatenated ordered lists of update times, bracketed by offsets in `vals_offs`.
        pub times: L::TimeContainer,
        /// Concatenated ordered lists of update diffs, bracketed by offsets in `vals_offs`.
        pub diffs: L::DiffContainer,
    }
158
159    impl<L: Layout> RhhValStorage<L>
160    where
161        layout::Key<L>: Default + HashOrdered,
162        for<'a> layout::KeyRef<'a, L>: HashOrdered,
163    {
164        /// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
165        fn values_for_key(&self, index: usize) -> (usize, usize) {
166            let lower = self.keys_offs.index(index);
167            let upper = self.keys_offs.index(index + 1);
168            // Looking up values for an invalid key indicates something is wrong.
169            assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index);
170            (lower, upper)
171        }
172        /// Lower and upper bounds in `self.updates` corresponding to the value at `index`.
173        fn updates_for_value(&self, index: usize) -> (usize, usize) {
174            let mut lower = self.vals_offs.index(index);
175            let upper = self.vals_offs.index(index + 1);
176            // We use equal lower and upper to encode "singleton update; just before here".
177            // It should only apply when there is a prior element, so `lower` should be greater than zero.
178            if lower == upper {
179                assert!(lower > 0);
180                lower -= 1;
181            }
182            (lower, upper)
183        }
184
185        /// Inserts the key at its desired location, or nearby.
186        ///
187        /// Because there may be collisions, they key may be placed just after its desired location.
188        /// If necessary, this method will introduce default keys and copy the offsets to create space
189        /// after which to insert the key. These will be indicated by `None` entries in the `hash` vector.
190        ///
191        /// If `offset` is specified, we will insert it at the appropriate location. If it is not specified,
192        /// we leave `keys_offs` ready to receive it as the next `push`. This is so that builders that may
193        /// not know the final offset at the moment of key insertion can prepare for receiving the offset.
194        fn insert_key(&mut self, key: layout::KeyRef<'_, L>, offset: Option<usize>) {
195            let desired = self.desired_location(&key);
196            // Were we to push the key now, it would be at `self.keys.len()`, so while that is wrong,
197            // push additional blank entries in.
198            while self.keys.len() < desired {
199                // We insert a default (dummy) key and repeat the offset to indicate this.
200                let current_offset = self.keys_offs.index(self.keys.len());
201                self.keys.push_own(&<layout::Key<L> as Default>::default());
202                self.keys_offs.push_ref(current_offset);
203            }
204
205            // Now we insert the key. Even if it is no longer the desired location because of contention.
206            // If an offset has been supplied we insert it, and otherwise leave it for future determination.
207            self.keys.push_ref(key);
208            if let Some(offset) = offset {
209                self.keys_offs.push_ref(offset);
210            }
211            self.key_count += 1;
212        }
213
214        /// Inserts a reference to an owned key, inefficiently. Should be removed.
215        fn insert_key_own(&mut self, key: &layout::Key<L>, offset: Option<usize>) {
216            let mut key_con = L::KeyContainer::with_capacity(1);
217            key_con.push_own(&key);
218            self.insert_key(key_con.index(0), offset)
219        }
220
221        /// Indicates both the desired location and the hash signature of the key.
222        fn desired_location<K: Hashable>(&self, key: &K) -> usize {
223            if self.divisor == 0 {
224                0
225            } else {
226                (key.hashed().into() / self.divisor)
227                    .try_into()
228                    .expect("divisor not large enough to force u64 into uisze")
229            }
230        }
231
232        /// Returns true if one should advance one's index in the search for `key`.
233        fn advance_key(&self, index: usize, key: layout::KeyRef<'_, L>) -> bool {
234            // Ideally this short-circuits, as `self.keys[index]` is bogus data.
235            !self.live_key(index)
236                || self
237                    .keys
238                    .index(index)
239                    .lt(&<L::KeyContainer as BatchContainer>::reborrow(key))
240        }
241
242        /// Indicates that a key is valid, rather than dead space, by looking for a valid offset range.
243        fn live_key(&self, index: usize) -> bool {
244            self.keys_offs.index(index) != self.keys_offs.index(index + 1)
245        }
246
247        /// Advances `index` until it references a live key, or is `keys.len()`.
248        fn advance_to_live_key(&self, index: &mut usize) {
249            while *index < self.keys.len() && !self.live_key(*index) {
250                *index += 1;
251            }
252        }
253
254        /// A value large enough that any `u64` divided by it is less than `capacity`.
255        ///
256        /// This is `2^64 / capacity`, except in the cases where `capacity` is zero or one.
257        /// In those cases, we'll return `0` to communicate the exception, for which we should
258        /// just return `0` when announcing a target location (and a zero capacity that we insert
259        /// into becomes a bug).
260        fn divisor_for_capacity(capacity: usize) -> u64 {
261            let capacity: u64 = capacity.try_into().expect("usize exceeds u64");
262            if capacity == 0 || capacity == 1 {
263                0
264            } else {
265                ((1 << 63) / capacity) << 1
266            }
267        }
268    }
269
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct RhhValBatch<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// The updates themselves.
        pub storage: RhhValStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<layout::Time<L>>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because due to the singleton optimization,
        /// we may have many more updates than `storage.times.len()`. It should equal that
        /// length, plus the number of singleton optimizations employed.
        pub updates: usize,
    }
297
    // Names `L` as the layout associated with this batch type.
    // NOTE(review): `WithLayout` is not among the imports visible at the top of this module;
    // confirm it is brought into scope elsewhere in the file.
    impl<L: Layout> WithLayout for RhhValBatch<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Layout = L;
    }
305
306    impl<L: Layout> BatchReader for RhhValBatch<L>
307    where
308        layout::Key<L>: Default + HashOrdered,
309        for<'a> layout::KeyRef<'a, L>: HashOrdered,
310    {
311        type Cursor = RhhValCursor<L>;
312        fn cursor(&self) -> Self::Cursor {
313            let mut cursor = RhhValCursor {
314                key_cursor: 0,
315                val_cursor: 0,
316                phantom: std::marker::PhantomData,
317            };
318            cursor.step_key(self);
319            cursor
320        }
321        fn len(&self) -> usize {
322            // Normally this would be `self.updates.len()`, but we have a clever compact encoding.
323            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
324            self.updates
325        }
326        fn description(&self) -> &Description<layout::Time<L>> {
327            &self.description
328        }
329    }
330
331    impl<L: Layout> Batch for RhhValBatch<L>
332    where
333        layout::Key<L>: Default + HashOrdered,
334        for<'a> layout::KeyRef<'a, L>: HashOrdered,
335    {
336        type Merger = RhhValMerger<L>;
337
338        fn begin_merge(
339            &self,
340            other: &Self,
341            compaction_frontier: AntichainRef<layout::Time<L>>,
342        ) -> Self::Merger {
343            RhhValMerger::new(self, other, compaction_frontier)
344        }
345
346        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
347            use timely::progress::Timestamp;
348            Self {
349                storage: RhhValStorage {
350                    keys: L::KeyContainer::with_capacity(0),
351                    keys_offs: L::OffsetContainer::with_capacity(0),
352                    vals: L::ValContainer::with_capacity(0),
353                    vals_offs: L::OffsetContainer::with_capacity(0),
354                    times: L::TimeContainer::with_capacity(0),
355                    diffs: L::DiffContainer::with_capacity(0),
356                    key_count: 0,
357                    key_capacity: 0,
358                    divisor: 0,
359                },
360                description: Description::new(
361                    lower,
362                    upper,
363                    Antichain::from_elem(Self::Time::minimum()),
364                ),
365                updates: 0,
366            }
367        }
368    }
369
    /// State for an in-progress merge.
    ///
    /// The merger tracks one key position per input batch and assembles the merged output
    /// in `result`, to be wrapped up as a batch once merging is done.
    pub struct RhhValMerger<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// The storage of the result that we are currently assembling.
        result: RhhValStorage<L>,
        /// The description of the merged batch: lower bound of the first input, upper bound
        /// of the second, and a `since` joined from both inputs and the compaction frontier.
        description: Description<layout::Time<L>>,

        /// Local stash of updates, to use for consolidation.
        ///
        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
        /// A `ChangeBatch` itself needs an `i64` diff type, which we have not.
        update_stash: Vec<(layout::Time<L>, layout::Diff<L>)>,
        /// Counts the number of singleton-optimized entries, that we may correctly count the updates.
        singletons: usize,
    }
392
393    impl<L: Layout> Merger<RhhValBatch<L>> for RhhValMerger<L>
394    where
395        layout::Key<L>: Default + HashOrdered,
396        RhhValBatch<L>: Batch<Time = layout::Time<L>>,
397        for<'a> layout::KeyRef<'a, L>: HashOrdered,
398    {
399        fn new(
400            batch1: &RhhValBatch<L>,
401            batch2: &RhhValBatch<L>,
402            compaction_frontier: AntichainRef<layout::Time<L>>,
403        ) -> Self {
404            assert!(batch1.upper() == batch2.lower());
405            use crate::lattice::Lattice;
406            let mut since = batch1
407                .description()
408                .since()
409                .join(batch2.description().since());
410            since = since.join(&compaction_frontier.to_owned());
411
412            let description =
413                Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
414
415            // This is a massive overestimate on the number of keys, but we don't have better information.
416            // An over-estimate can be a massive problem as well, with sparse regions being hard to cross.
417            let max_cap = batch1.len() + batch2.len();
418            let rhh_cap = 2 * max_cap;
419
420            let batch1 = &batch1.storage;
421            let batch2 = &batch2.storage;
422
423            let mut storage = RhhValStorage {
424                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
425                keys_offs: L::OffsetContainer::with_capacity(
426                    batch1.keys_offs.len() + batch2.keys_offs.len(),
427                ),
428                vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
429                vals_offs: L::OffsetContainer::with_capacity(
430                    batch1.vals_offs.len() + batch2.vals_offs.len(),
431                ),
432                times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
433                diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
434                key_count: 0,
435                key_capacity: rhh_cap,
436                divisor: RhhValStorage::<L>::divisor_for_capacity(rhh_cap),
437            };
438
439            // Mark explicit types because type inference fails to resolve it.
440            let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
441            keys_offs.push_ref(0);
442            let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
443            vals_offs.push_ref(0);
444
445            RhhValMerger {
446                key_cursor1: 0,
447                key_cursor2: 0,
448                result: storage,
449                description,
450                update_stash: Vec::new(),
451                singletons: 0,
452            }
453        }
454        fn done(self) -> RhhValBatch<L> {
455            RhhValBatch {
456                updates: self.result.times.len() + self.singletons,
457                storage: self.result,
458                description: self.description,
459            }
460        }
461        fn work(&mut self, source1: &RhhValBatch<L>, source2: &RhhValBatch<L>, fuel: &mut isize) {
462            // An (incomplete) indication of the amount of work we've done so far.
463            let starting_updates = self.result.times.len();
464            let mut effort = 0isize;
465
466            source1.storage.advance_to_live_key(&mut self.key_cursor1);
467            source2.storage.advance_to_live_key(&mut self.key_cursor2);
468
469            // While both mergees are still active, perform single-key merges.
470            while self.key_cursor1 < source1.storage.keys.len()
471                && self.key_cursor2 < source2.storage.keys.len()
472                && effort < *fuel
473            {
474                self.merge_key(&source1.storage, &source2.storage);
475                source1.storage.advance_to_live_key(&mut self.key_cursor1);
476                source2.storage.advance_to_live_key(&mut self.key_cursor2);
477                // An (incomplete) accounting of the work we've done.
478                effort = (self.result.times.len() - starting_updates) as isize;
479            }
480
481            // Merging is complete, and only copying remains.
482            // Key-by-key copying allows effort interruption, and compaction.
483            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
484                self.copy_key(&source1.storage, self.key_cursor1);
485                self.key_cursor1 += 1;
486                source1.storage.advance_to_live_key(&mut self.key_cursor1);
487                effort = (self.result.times.len() - starting_updates) as isize;
488            }
489            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
490                self.copy_key(&source2.storage, self.key_cursor2);
491                self.key_cursor2 += 1;
492                source2.storage.advance_to_live_key(&mut self.key_cursor2);
493                effort = (self.result.times.len() - starting_updates) as isize;
494            }
495
496            *fuel -= effort;
497        }
498    }
499
500    // Helper methods in support of merging batches.
501    impl<L: Layout> RhhValMerger<L>
502    where
503        layout::Key<L>: Default + HashOrdered,
504        for<'a> layout::KeyRef<'a, L>: HashOrdered,
505    {
506        /// Copy the next key in `source`.
507        ///
508        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
509        /// If the result does not wholly cancel, they key will be present in `self` with the
510        /// compacted values and updates.
511        ///
512        /// The caller should be certain to update the cursor, as this method does not do this.
513        fn copy_key(&mut self, source: &RhhValStorage<L>, cursor: usize) {
514            // Capture the initial number of values to determine if the merge was ultimately non-empty.
515            let init_vals = self.result.vals.len();
516            let (mut lower, upper) = source.values_for_key(cursor);
517            while lower < upper {
518                self.stash_updates_for_val(source, lower);
519                if let Some(off) = self.consolidate_updates() {
520                    self.result.vals_offs.push_ref(off);
521                    self.result.vals.push_ref(source.vals.index(lower));
522                }
523                lower += 1;
524            }
525
526            // If we have pushed any values, copy the key as well.
527            if self.result.vals.len() > init_vals {
528                self.result
529                    .insert_key(source.keys.index(cursor), Some(self.result.vals.len()));
530            }
531        }
532        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
533        ///
534        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
535        /// if the updates cancel either directly or after compaction.
536        fn merge_key(&mut self, source1: &RhhValStorage<L>, source2: &RhhValStorage<L>) {
537            use ::std::cmp::Ordering;
538            match source1
539                .keys
540                .index(self.key_cursor1)
541                .cmp(&source2.keys.index(self.key_cursor2))
542            {
543                Ordering::Less => {
544                    self.copy_key(source1, self.key_cursor1);
545                    self.key_cursor1 += 1;
546                }
547                Ordering::Equal => {
548                    // Keys are equal; must merge all values from both sources for this one key.
549                    let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
550                    let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
551                    if let Some(off) =
552                        self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2))
553                    {
554                        self.result
555                            .insert_key(source1.keys.index(self.key_cursor1), Some(off));
556                    }
557                    // Increment cursors in either case; the keys are merged.
558                    self.key_cursor1 += 1;
559                    self.key_cursor2 += 1;
560                }
561                Ordering::Greater => {
562                    self.copy_key(source2, self.key_cursor2);
563                    self.key_cursor2 += 1;
564                }
565            }
566        }
567        /// Merge two ranges of values into `self`.
568        ///
569        /// If the compacted result contains values with non-empty updates, the function returns
570        /// an offset that should be recorded to indicate the upper extent of the result values.
571        fn merge_vals(
572            &mut self,
573            (source1, mut lower1, upper1): (&RhhValStorage<L>, usize, usize),
574            (source2, mut lower2, upper2): (&RhhValStorage<L>, usize, usize),
575        ) -> Option<usize> {
576            // Capture the initial number of values to determine if the merge was ultimately non-empty.
577            let init_vals = self.result.vals.len();
578            while lower1 < upper1 && lower2 < upper2 {
579                // We compare values, and fold in updates for the lowest values;
580                // if they are non-empty post-consolidation, we write the value.
581                // We could multi-way merge and it wouldn't be very complicated.
582                use ::std::cmp::Ordering;
583                match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
584                    Ordering::Less => {
585                        // Extend stash by updates, with logical compaction applied.
586                        self.stash_updates_for_val(source1, lower1);
587                        if let Some(off) = self.consolidate_updates() {
588                            self.result.vals_offs.push_ref(off);
589                            self.result.vals.push_ref(source1.vals.index(lower1));
590                        }
591                        lower1 += 1;
592                    }
593                    Ordering::Equal => {
594                        self.stash_updates_for_val(source1, lower1);
595                        self.stash_updates_for_val(source2, lower2);
596                        if let Some(off) = self.consolidate_updates() {
597                            self.result.vals_offs.push_ref(off);
598                            self.result.vals.push_ref(source1.vals.index(lower1));
599                        }
600                        lower1 += 1;
601                        lower2 += 1;
602                    }
603                    Ordering::Greater => {
604                        // Extend stash by updates, with logical compaction applied.
605                        self.stash_updates_for_val(source2, lower2);
606                        if let Some(off) = self.consolidate_updates() {
607                            self.result.vals_offs.push_ref(off);
608                            self.result.vals.push_ref(source2.vals.index(lower2));
609                        }
610                        lower2 += 1;
611                    }
612                }
613            }
614            // Merging is complete, but we may have remaining elements to push.
615            while lower1 < upper1 {
616                self.stash_updates_for_val(source1, lower1);
617                if let Some(off) = self.consolidate_updates() {
618                    self.result.vals_offs.push_ref(off);
619                    self.result.vals.push_ref(source1.vals.index(lower1));
620                }
621                lower1 += 1;
622            }
623            while lower2 < upper2 {
624                self.stash_updates_for_val(source2, lower2);
625                if let Some(off) = self.consolidate_updates() {
626                    self.result.vals_offs.push_ref(off);
627                    self.result.vals.push_ref(source2.vals.index(lower2));
628                }
629                lower2 += 1;
630            }
631
632            // Values being pushed indicate non-emptiness.
633            if self.result.vals.len() > init_vals {
634                Some(self.result.vals.len())
635            } else {
636                None
637            }
638        }
639
640        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
641        fn stash_updates_for_val(&mut self, source: &RhhValStorage<L>, index: usize) {
642            let (lower, upper) = source.updates_for_value(index);
643            for i in lower..upper {
644                // NB: Here is where we would need to look back if `lower == upper`.
645                let time = source.times.index(i);
646                let diff = source.diffs.index(i);
647                let mut new_time = L::TimeContainer::into_owned(time);
648                use crate::lattice::Lattice;
649                new_time.advance_by(self.description.since().borrow());
650                self.update_stash
651                    .push((new_time, L::DiffContainer::into_owned(diff)));
652            }
653        }
654
655        /// Consolidates `self.updates_stash` and produces the offset to record, if any.
656        fn consolidate_updates(&mut self) -> Option<usize> {
657            use crate::consolidation;
658            consolidation::consolidate(&mut self.update_stash);
659            if !self.update_stash.is_empty() {
660                // If there is a single element, equal to a just-prior recorded update,
661                // we push nothing and report an unincremented offset to encode this case.
662                let time_diff = self.result.times.last().zip(self.result.diffs.last());
663                let last_eq =
664                    self.update_stash
665                        .last()
666                        .zip(time_diff)
667                        .map(|((t1, d1), (t2, d2))| {
668                            // TODO: The use of `into_owned` is a work-around for not having reference types.
669                            *t1 == L::TimeContainer::into_owned(t2)
670                                && *d1 == L::DiffContainer::into_owned(d2)
671                        });
672                if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
673                    // Just clear out update_stash, as we won't drain it here.
674                    self.update_stash.clear();
675                    self.singletons += 1;
676                } else {
677                    // Conventional; move `update_stash` into `updates`.
678                    for (time, diff) in self.update_stash.drain(..) {
679                        self.result.times.push_own(&time);
680                        self.result.diffs.push_own(&diff);
681                    }
682                }
683                Some(self.result.times.len())
684            } else {
685                None
686            }
687        }
688    }
689
690    /// A cursor through a Robin Hood Hashed list of keys, vals, and such.
691    ///
692    /// The important detail is that not all of `keys` represent valid keys.
693    /// We must consult `storage.hashed` to see if the associated data is valid.
694    /// Importantly, we should skip over invalid keys, rather than report them as
695    /// invalid through `key_valid`: that method is meant to indicate the end of
696    /// the cursor, rather than internal state.
697    pub struct RhhValCursor<L: Layout>
698    where
699        layout::Key<L>: Default + HashOrdered,
700    {
701        /// Absolute position of the current key.
702        key_cursor: usize,
703        /// Absolute position of the current value.
704        val_cursor: usize,
705        /// Phantom marker for Rust happiness.
706        phantom: PhantomData<L>,
707    }
708
709    use crate::trace::implementations::WithLayout;
710    impl<L: Layout> WithLayout for RhhValCursor<L>
711    where
712        layout::Key<L>: Default + HashOrdered,
713        for<'a> layout::KeyRef<'a, L>: HashOrdered,
714    {
715        type Layout = L;
716    }
717
718    impl<L: Layout> Cursor for RhhValCursor<L>
719    where
720        layout::Key<L>: Default + HashOrdered,
721        for<'a> layout::KeyRef<'a, L>: HashOrdered,
722    {
723        type Storage = RhhValBatch<L>;
724
725        fn get_key<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Key<'a>> {
726            storage.storage.keys.get(self.key_cursor)
727        }
728        fn get_val<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Val<'a>> {
729            if self.val_valid(storage) {
730                storage.storage.vals.get(self.val_cursor)
731            } else {
732                None
733            }
734        }
735        fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Key<'a> {
736            storage.storage.keys.index(self.key_cursor)
737        }
738        fn val<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Val<'a> {
739            storage.storage.vals.index(self.val_cursor)
740        }
741        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(
742            &mut self,
743            storage: &RhhValBatch<L>,
744            mut logic: L2,
745        ) {
746            let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
747            for index in lower..upper {
748                let time = storage.storage.times.index(index);
749                let diff = storage.storage.diffs.index(index);
750                logic(time, diff);
751            }
752        }
753        fn key_valid(&self, storage: &RhhValBatch<L>) -> bool {
754            self.key_cursor < storage.storage.keys.len()
755        }
756        fn val_valid(&self, storage: &RhhValBatch<L>) -> bool {
757            self.val_cursor < storage.storage.values_for_key(self.key_cursor).1
758        }
759        fn step_key(&mut self, storage: &RhhValBatch<L>) {
760            // We advance the cursor by one for certain, and then as long as we need to find a valid key.
761            self.key_cursor += 1;
762            storage.storage.advance_to_live_key(&mut self.key_cursor);
763
764            if self.key_valid(storage) {
765                self.rewind_vals(storage);
766            } else {
767                self.key_cursor = storage.storage.keys.len();
768            }
769        }
770        fn seek_key(&mut self, storage: &RhhValBatch<L>, key: Self::Key<'_>) {
771            // self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key));
772            let desired = storage.storage.desired_location(&key);
773            // Advance the cursor, if `desired` is ahead of it.
774            if self.key_cursor < desired {
775                self.key_cursor = desired;
776            }
777            // Advance the cursor as long as we have not found a value greater or equal to `key`.
778            // We may have already passed `key`, and confirmed its absence, but our goal is to
779            // find the next key afterwards so that users can, for example, alternately iterate.
780            while self.key_valid(storage) && storage.storage.advance_key(self.key_cursor, key) {
781                // TODO: Based on our encoding, we could skip logarithmically over empty regions by galloping
782                //       through `storage.keys_offs`, which stays put for dead space.
783                self.key_cursor += 1;
784            }
785
786            if self.key_valid(storage) {
787                self.rewind_vals(storage);
788            }
789        }
790        fn step_val(&mut self, storage: &RhhValBatch<L>) {
791            self.val_cursor += 1;
792            if !self.val_valid(storage) {
793                self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
794            }
795        }
796        fn seek_val(&mut self, storage: &RhhValBatch<L>, val: Self::Val<'_>) {
797            self.val_cursor += storage.storage.vals.advance(
798                self.val_cursor,
799                storage.storage.values_for_key(self.key_cursor).1,
800                |x| {
801                    <L::ValContainer as BatchContainer>::reborrow(x)
802                        .lt(&<L::ValContainer as BatchContainer>::reborrow(val))
803                },
804            );
805        }
806        fn rewind_keys(&mut self, storage: &RhhValBatch<L>) {
807            self.key_cursor = 0;
808            storage.storage.advance_to_live_key(&mut self.key_cursor);
809
810            if self.key_valid(storage) {
811                self.rewind_vals(storage)
812            }
813        }
814        fn rewind_vals(&mut self, storage: &RhhValBatch<L>) {
815            self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
816        }
817    }
818
819    /// A builder for creating layers from unsorted update tuples.
820    pub struct RhhValBuilder<L: Layout, CI>
821    where
822        layout::Key<L>: Default + HashOrdered,
823    {
824        result: RhhValStorage<L>,
825        singleton: Option<(layout::Time<L>, layout::Diff<L>)>,
826        /// Counts the number of singleton optimizations we performed.
827        ///
828        /// This number allows us to correctly gauge the total number of updates reflected in a batch,
829        /// even though `updates.len()` may be much shorter than this amount.
830        singletons: usize,
831        _marker: PhantomData<CI>,
832    }
833
834    impl<L: Layout, CI> RhhValBuilder<L, CI>
835    where
836        layout::Key<L>: Default + HashOrdered,
837    {
838        /// Pushes a single update, which may set `self.singleton` rather than push.
839        ///
840        /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`.
841        /// However, for "clever" reasons it does not do this. Instead, it looks for opportunities
842        /// to encode a singleton update with an "absert" update: repeating the most recent offset.
843        /// This otherwise invalid state encodes "look back one element".
844        ///
845        /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
846        /// previously pushed update exactly. In that case, we do not push the update into `updates`.
847        /// The update tuple is retained in `self.singleton` in case we see another update and need
848        /// to recover the singleton to push it into `updates` to join the second update.
849        fn push_update(&mut self, time: layout::Time<L>, diff: layout::Diff<L>) {
850            // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
851            // TODO: The use of `into_owned` is a bandage for not having references we can compare.
852            if self
853                .result
854                .times
855                .last()
856                .map(|t| L::TimeContainer::into_owned(t) == time)
857                .unwrap_or(false)
858                && self
859                    .result
860                    .diffs
861                    .last()
862                    .map(|d| L::DiffContainer::into_owned(d) == diff)
863                    .unwrap_or(false)
864            {
865                assert!(self.singleton.is_none());
866                self.singleton = Some((time, diff));
867            } else {
868                // If we have pushed a single element, we need to copy it out to meet this one.
869                if let Some((time, diff)) = self.singleton.take() {
870                    self.result.times.push_own(&time);
871                    self.result.diffs.push_own(&diff);
872                }
873                self.result.times.push_own(&time);
874                self.result.diffs.push_own(&diff);
875            }
876        }
877    }
878
879    impl<L: Layout, CI> Builder for RhhValBuilder<L, CI>
880    where
881        layout::Key<L>: Default + HashOrdered,
882        CI: for<'a> BuilderInput<
883            L::KeyContainer,
884            L::ValContainer,
885            Key<'a> = layout::Key<L>,
886            Time = layout::Time<L>,
887            Diff = layout::Diff<L>,
888        >,
889        for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
890        for<'a> layout::KeyRef<'a, L>: HashOrdered,
891    {
892        type Input = CI;
893        type Time = layout::Time<L>;
894        type Output = RhhValBatch<L>;
895
896        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
897            // Double the capacity for RHH; probably excessive.
898            let rhh_capacity = 2 * keys;
899            let divisor = RhhValStorage::<L>::divisor_for_capacity(rhh_capacity);
900            // We want some additive slop, in case we spill over.
901            // This number magically chosen based on nothing in particular.
902            // Worst case, we will re-alloc and copy if we spill beyond this.
903            let keys = rhh_capacity + 10;
904
905            // We don't introduce zero offsets as they will be introduced by the first `push` call.
906            Self {
907                result: RhhValStorage {
908                    keys: L::KeyContainer::with_capacity(keys),
909                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
910                    vals: L::ValContainer::with_capacity(vals),
911                    vals_offs: L::OffsetContainer::with_capacity(vals + 1),
912                    times: L::TimeContainer::with_capacity(upds),
913                    diffs: L::DiffContainer::with_capacity(upds),
914                    key_count: 0,
915                    key_capacity: rhh_capacity,
916                    divisor,
917                },
918                singleton: None,
919                singletons: 0,
920                _marker: PhantomData,
921            }
922        }
923
924        #[inline]
925        fn push(&mut self, chunk: &mut Self::Input) {
926            for item in chunk.drain() {
927                let (key, val, time, diff) = CI::into_parts(item);
928                // Perhaps this is a continuation of an already received key.
929                if self
930                    .result
931                    .keys
932                    .last()
933                    .map(|k| CI::key_eq(&key, k))
934                    .unwrap_or(false)
935                {
936                    // Perhaps this is a continuation of an already received value.
937                    if self
938                        .result
939                        .vals
940                        .last()
941                        .map(|v| CI::val_eq(&val, v))
942                        .unwrap_or(false)
943                    {
944                        self.push_update(time, diff);
945                    } else {
946                        // New value; complete representation of prior value.
947                        self.result.vals_offs.push_ref(self.result.times.len());
948                        if self.singleton.take().is_some() {
949                            self.singletons += 1;
950                        }
951                        self.push_update(time, diff);
952                        self.result.vals.push_into(val);
953                    }
954                } else {
955                    // New key; complete representation of prior key.
956                    self.result.vals_offs.push_ref(self.result.times.len());
957                    if self.singleton.take().is_some() {
958                        self.singletons += 1;
959                    }
960                    self.result.keys_offs.push_ref(self.result.vals.len());
961                    self.push_update(time, diff);
962                    self.result.vals.push_into(val);
963                    // Insert the key, but with no specified offset.
964                    self.result.insert_key_own(&key, None);
965                }
966            }
967        }
968
969        #[inline(never)]
970        fn done(mut self, description: Description<Self::Time>) -> RhhValBatch<L> {
971            // Record the final offsets
972            self.result.vals_offs.push_ref(self.result.times.len());
973            // Remove any pending singleton, and if it was set increment our count.
974            if self.singleton.take().is_some() {
975                self.singletons += 1;
976            }
977            self.result.keys_offs.push_ref(self.result.vals.len());
978            RhhValBatch {
979                updates: self.result.times.len() + self.singletons,
980                storage: self.result,
981                description,
982            }
983        }
984
985        fn seal(
986            chain: &mut Vec<Self::Input>,
987            description: Description<Self::Time>,
988        ) -> Self::Output {
989            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
990            let mut builder = Self::with_capacity(keys, vals, upds);
991            for mut chunk in chain.drain(..) {
992                builder.push(&mut chunk);
993            }
994
995            builder.done(description)
996        }
997    }
998}
999
1000mod key_batch {
1001
    // TODO: Copy the `val_batch` implementation above into a key-only variant, once it works.
1003}