// differential_dataflow/trace/implementations/rhh.rs
//! Batch implementation based on Robin Hood Hashing.
//!
//! Items are ordered by `(hash(Key), Key)` rather than `Key`, which means
//! that these implementations should only be used with each other, under
//! the same `hash` function, or for types that also order by `(hash(X), X)`,
//! for example wrapped types that implement `Ord` that way.
7
8use std::rc::Rc;
9use std::cmp::Ordering;
10
11use serde::{Deserialize, Serialize};
12
13use crate::Hashable;
14use crate::containers::TimelyStack;
15use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker};
16use crate::trace::implementations::merge_batcher::MergeBatcher;
17use crate::trace::implementations::merge_batcher::container::{VecInternalMerger, ColInternalMerger};
18use crate::trace::implementations::spine_fueled::Spine;
19use crate::trace::rc_blanket_impls::RcBuilder;
20
21use super::{Layout, Vector, TStack};
22
23use self::val_batch::{RhhValBatch, RhhValBuilder};
24
/// A trace implementation using a spine of ordered lists.
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher for ordered lists, chunking within `Vec` containers.
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ContainerChunker<Vec<((K,V),T,R)>>, VecInternalMerger<(K, V), T, R>>;
/// A builder for ordered lists, producing `Rc`-wrapped RHH batches.
pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

// /// A trace implementation for empty values using a spine of ordered lists.
// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;

/// A trace implementation backed by columnar storage.
pub type ColSpine<K, V, T, R> = Spine<Rc<RhhValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage, chunking into `TimelyStack` regions.
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColInternalMerger<(K,V),T,R>>;
/// A builder for columnar storage, producing `Rc`-wrapped RHH batches.
pub type ColBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

// /// A trace implementation backed by columnar storage.
// pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
/// A carrier trait indicating that the type's `Ord` and `PartialOrd` implementations are by `Hashable::hashed()`.
pub trait HashOrdered: Hashable { }

// A reference to a hash-ordered type is itself hash-ordered.
impl<'a, T: std::hash::Hash + HashOrdered> HashOrdered for &'a T { }
49
/// A hash-ordered wrapper that modifies `Ord` and `PartialOrd`.
///
/// Comparison is by `(inner.hashed(), inner)` rather than by `inner` alone,
/// so that sorted collections of wrapped values are ordered by hash first.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct HashWrapper<T: std::hash::Hash + Hashable> {
    /// The inner value, freely modifiable.
    pub inner: T
}
56
57impl<T: PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> PartialOrd for HashWrapper<T> {
58    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
59        let this_hash = self.inner.hashed();
60        let that_hash = other.inner.hashed();
61        (this_hash, &self.inner).partial_cmp(&(that_hash, &other.inner))
62    }
63}
64
65impl<T: Ord + PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> Ord for HashWrapper<T> {
66    fn cmp(&self, other: &Self) -> Ordering {
67        self.partial_cmp(other).unwrap()
68    }
69}
70
// `HashWrapper` orders by `(hashed, inner)`, so it qualifies as hash-ordered.
impl<T: std::hash::Hash + Hashable> HashOrdered for HashWrapper<T> { }

// Hashing delegates to the wrapped value.
impl<T: std::hash::Hash + Hashable> Hashable for HashWrapper<T> {
    type Output = T::Output;
    fn hashed(&self) -> Self::Output { self.inner.hashed() }
}

// References to wrappers are likewise hash-ordered and hashable by delegation.
impl<T: std::hash::Hash + Hashable> HashOrdered for &HashWrapper<T> { }

impl<T: std::hash::Hash + Hashable> Hashable for &HashWrapper<T> {
    type Output = T::Output;
    fn hashed(&self) -> Self::Output { self.inner.hashed() }
}
84
85mod val_batch {
86
87    use std::convert::TryInto;
88    use std::marker::PhantomData;
89    use serde::{Deserialize, Serialize};
90    use timely::container::PushInto;
91    use timely::progress::{Antichain, frontier::AntichainRef};
92
93    use crate::hashable::Hashable;
94    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
95    use crate::trace::implementations::{BatchContainer, BuilderInput};
96    use crate::trace::implementations::layout;
97
98    use super::{Layout, HashOrdered};
99
    /// Update tuples organized as a Robin Hood Hash map, ordered by `(hash(Key), Key, Val, Time)`.
    ///
    /// Specifically, this means that we attempt to place any `Key` at `alloc_len * (hash(Key) / 2^64)`,
    /// and spill onward if the slot is occupied. The cleverness of RHH is that you may instead evict
    /// someone else, in order to maintain the ordering up above. In fact, that is basically the rule:
    /// when there is a conflict, evict the greater of the two and attempt to place it in the next slot.
    ///
    /// This RHH implementation uses a repeated `keys_offs` offset to indicate an absent element, as all
    /// keys for valid updates must have some associated values with updates. This is the same type of
    /// optimization made for repeated updates, and it rules out (here) using that trick for repeated values.
    ///
    /// We will use the `Hashable` trait here, but any consistent hash function should work out ok.
    /// We specifically want to use the highest bits of the result (we will) because the low bits have
    /// likely been spent shuffling the data between workers (by key), and are likely low entropy.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct RhhValStorage<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {

        /// The requested capacity for `keys`. We use this when determining where a key with a certain hash
        /// would most like to end up. The `BatchContainer` trait does not provide a `capacity()` method,
        /// otherwise we would just use that.
        pub key_capacity: usize,
        /// A number large enough that when it divides any `u64` the result is at most `self.key_capacity`.
        /// When that capacity is zero or one, this is set to zero instead.
        pub divisor: u64,
        /// The number of present (live) keys, distinct from `keys.len()`, which also counts the
        /// default dummy keys inserted to pad hash gaps.
        pub key_count: usize,

        /// An ordered list of keys, corresponding to entries in `keys_offs`.
        pub keys: L::KeyContainer,
        /// Offsets used to provide indexes from keys to values.
        ///
        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
        pub keys_offs: L::OffsetContainer,
        /// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`.
        pub vals: L::ValContainer,
        /// Offsets used to provide indexes from values to updates.
        ///
        /// This list has a special representation that any empty range indicates the singleton
        /// element just before the range, as if the start were decremented by one. The empty
        /// range is otherwise an invalid representation, and we borrow it to compactly encode
        /// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
        ///
        /// The length of this list is one longer than `vals`, so that we can avoid bounds logic.
        pub vals_offs: L::OffsetContainer,
        /// Concatenated ordered lists of update times, bracketed by offsets in `vals_offs`.
        pub times: L::TimeContainer,
        /// Concatenated ordered lists of update diffs, bracketed by offsets in `vals_offs`.
        pub diffs: L::DiffContainer,
    }
152
153    impl<L: Layout> RhhValStorage<L>
154    where
155        layout::Key<L>: Default + HashOrdered,
156        for<'a> layout::KeyRef<'a, L>: HashOrdered,
157    {
158        /// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
159        fn values_for_key(&self, index: usize) -> (usize, usize) {
160            let lower = self.keys_offs.index(index);
161            let upper = self.keys_offs.index(index+1);
162            // Looking up values for an invalid key indicates something is wrong.
163            assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index);
164            (lower, upper)
165        }
166        /// Lower and upper bounds in `self.updates` corresponding to the value at `index`.
167        fn updates_for_value(&self, index: usize) -> (usize, usize) {
168            let mut lower = self.vals_offs.index(index);
169            let upper = self.vals_offs.index(index+1);
170            // We use equal lower and upper to encode "singleton update; just before here".
171            // It should only apply when there is a prior element, so `lower` should be greater than zero.
172            if lower == upper {
173                assert!(lower > 0);
174                lower -= 1;
175            }
176            (lower, upper)
177        }
178
179        /// Inserts the key at its desired location, or nearby.
180        ///
181        /// Because there may be collisions, they key may be placed just after its desired location.
182        /// If necessary, this method will introduce default keys and copy the offsets to create space
183        /// after which to insert the key. These will be indicated by `None` entries in the `hash` vector.
184        ///
185        /// If `offset` is specified, we will insert it at the appropriate location. If it is not specified,
186        /// we leave `keys_offs` ready to receive it as the next `push`. This is so that builders that may
187        /// not know the final offset at the moment of key insertion can prepare for receiving the offset.
188        fn insert_key(&mut self, key: layout::KeyRef<'_, L>, offset: Option<usize>) {
189            let desired = self.desired_location(&key);
190            // Were we to push the key now, it would be at `self.keys.len()`, so while that is wrong,
191            // push additional blank entries in.
192            while self.keys.len() < desired {
193                // We insert a default (dummy) key and repeat the offset to indicate this.
194                let current_offset = self.keys_offs.index(self.keys.len());
195                self.keys.push_own(&<layout::Key<L> as Default>::default());
196                self.keys_offs.push_ref(current_offset);
197            }
198
199            // Now we insert the key. Even if it is no longer the desired location because of contention.
200            // If an offset has been supplied we insert it, and otherwise leave it for future determination.
201            self.keys.push_ref(key);
202            if let Some(offset) = offset {
203                self.keys_offs.push_ref(offset);
204            }
205            self.key_count += 1;
206        }
207
208        /// Inserts a reference to an owned key, inefficiently. Should be removed.
209        fn insert_key_own(&mut self, key: &layout::Key<L>, offset: Option<usize>) {
210            let mut key_con = L::KeyContainer::with_capacity(1);
211            key_con.push_own(&key);
212            self.insert_key(key_con.index(0), offset)
213        }
214
215        /// Indicates both the desired location and the hash signature of the key.
216        fn desired_location<K: Hashable>(&self, key: &K) -> usize {
217            if self.divisor == 0 { 0 }
218            else {
219                (key.hashed().into() / self.divisor).try_into().expect("divisor not large enough to force u64 into uisze")
220            }
221        }
222
223        /// Returns true if one should advance one's index in the search for `key`.
224        fn advance_key(&self, index: usize, key: layout::KeyRef<'_, L>) -> bool {
225            // Ideally this short-circuits, as `self.keys[index]` is bogus data.
226            !self.live_key(index) || self.keys.index(index).lt(&<L::KeyContainer as BatchContainer>::reborrow(key))
227        }
228
229        /// Indicates that a key is valid, rather than dead space, by looking for a valid offset range.
230        fn live_key(&self, index: usize) -> bool {
231            self.keys_offs.index(index) != self.keys_offs.index(index+1)
232        }
233
234        /// Advances `index` until it references a live key, or is `keys.len()`.
235        fn advance_to_live_key(&self, index: &mut usize) {
236            while *index < self.keys.len() && !self.live_key(*index) {
237                *index += 1;
238            }
239        }
240
241        /// A value large enough that any `u64` divided by it is less than `capacity`.
242        ///
243        /// This is `2^64 / capacity`, except in the cases where `capacity` is zero or one.
244        /// In those cases, we'll return `0` to communicate the exception, for which we should
245        /// just return `0` when announcing a target location (and a zero capacity that we insert
246        /// into becomes a bug).
247        fn divisor_for_capacity(capacity: usize) -> u64 {
248            let capacity: u64 = capacity.try_into().expect("usize exceeds u64");
249            if capacity == 0 || capacity == 1 { 0 }
250            else {
251                ((1 << 63) / capacity) << 1
252            }
253        }
254    }
255
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct RhhValBatch<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// The updates themselves.
        pub storage: RhhValStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<layout::Time<L>>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because due to the singleton optimization,
        /// we may have many more updates than `storage.times.len()`. It should equal that
        /// length, plus the number of singleton optimizations employed.
        pub updates: usize,
    }
283
    // Ties the batch to its layout `L`, giving access to the associated key/val/time/diff types.
    impl<L: Layout> WithLayout for RhhValBatch<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Layout = L;
    }
291
    impl<L: Layout> BatchReader for RhhValBatch<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Cursor = RhhValCursor<L>;
        /// Allocates a cursor over this batch, starting at the beginning.
        fn cursor(&self) -> Self::Cursor {
            let mut cursor = RhhValCursor {
                key_cursor: 0,
                val_cursor: 0,
                phantom: std::marker::PhantomData,
            };
            // Step once before returning, presumably so the cursor does not start on a
            // dead (hash-gap) key. NOTE(review): `step_key` is defined on the `Cursor`
            // impl outside this view — confirm its initial-positioning semantics.
            cursor.step_key(self);
            cursor
        }
        /// The number of updates in the batch, including singleton-optimized ones.
        fn len(&self) -> usize {
            // Normally this would be the length of the stored updates, but the singleton
            // optimization encodes some updates without a stored entry; `self.updates`
            // tracks the true count (stored length plus singletons).
            self.updates
        }
        fn description(&self) -> &Description<layout::Time<L>> { &self.description }
    }
314
315    impl<L: Layout> Batch for RhhValBatch<L>
316    where
317        layout::Key<L>: Default + HashOrdered,
318        for<'a> layout::KeyRef<'a, L>: HashOrdered,
319    {
320        type Merger = RhhValMerger<L>;
321
322        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
323            RhhValMerger::new(self, other, compaction_frontier)
324        }
325
326        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
327            use timely::progress::Timestamp;
328            Self {
329                storage: RhhValStorage {
330                    keys: L::KeyContainer::with_capacity(0),
331                    keys_offs: L::OffsetContainer::with_capacity(0),
332                    vals: L::ValContainer::with_capacity(0),
333                    vals_offs: L::OffsetContainer::with_capacity(0),
334                    times: L::TimeContainer::with_capacity(0),
335                    diffs: L::DiffContainer::with_capacity(0),
336                    key_count: 0,
337                    key_capacity: 0,
338                    divisor: 0,
339                },
340                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
341                updates: 0,
342            }
343        }
344    }
345
    /// State for an in-progress merge of two `RhhValBatch`es.
    pub struct RhhValMerger<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// Result storage that we are currently assembling.
        result: RhhValStorage<L>,
        /// Description (bounds and `since`) of the batch under construction.
        description: Description<layout::Time<L>>,

        /// Local stash of updates, to use for consolidation.
        ///
        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
        /// A `ChangeBatch` itself needs an `i64` diff type, which we have not.
        update_stash: Vec<(layout::Time<L>, layout::Diff<L>)>,
        /// Counts the number of singleton-optimized entries, so that we may correctly count the updates.
        singletons: usize,
    }
368
    impl<L: Layout> Merger<RhhValBatch<L>> for RhhValMerger<L>
    where
        layout::Key<L>: Default + HashOrdered,
        RhhValBatch<L>: Batch<Time=layout::Time<L>>,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        /// Prepares a merge of `batch1` followed by `batch2`, compacting times to `compaction_frontier`.
        fn new(batch1: &RhhValBatch<L>, batch2: &RhhValBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {

            // The two batches must cover adjacent intervals of times.
            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            // The result's `since` is the join of both inputs' `since` and the compaction frontier.
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            // This is a massive overestimate on the number of keys, but we don't have better information.
            // An over-estimate can be a massive problem as well, with sparse regions being hard to cross.
            let max_cap = batch1.len() + batch2.len();
            let rhh_cap = 2 * max_cap;

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            let mut storage = RhhValStorage {
                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
                vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
                vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
                times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
                diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
                key_count: 0,
                key_capacity: rhh_cap,
                divisor: RhhValStorage::<L>::divisor_for_capacity(rhh_cap),
            };

            // Mark explicit types because type inference fails to resolve it.
            // Both offset lists begin with a zero, so ranges read as `offs[i] .. offs[i+1]`.
            let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
            keys_offs.push_ref(0);
            let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
            vals_offs.push_ref(0);

            RhhValMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: storage,
                description,
                update_stash: Vec::new(),
                singletons: 0,
            }
        }
        /// Finalizes the merge; the update count includes singleton-optimized entries.
        fn done(self) -> RhhValBatch<L> {
            RhhValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description: self.description,
            }
        }
        /// Performs at most `fuel` units of merge work, decrementing `fuel` by the work done.
        fn work(&mut self, source1: &RhhValBatch<L>, source2: &RhhValBatch<L>, fuel: &mut isize) {

            // An (incomplete) indication of the amount of work we've done so far.
            let starting_updates = self.result.times.len();
            let mut effort = 0isize;

            // Skip over any dead (hash-gap) keys before comparing.
            source1.storage.advance_to_live_key(&mut self.key_cursor1);
            source2.storage.advance_to_live_key(&mut self.key_cursor2);

            // While both mergees are still active, perform single-key merges.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                source1.storage.advance_to_live_key(&mut self.key_cursor1);
                source2.storage.advance_to_live_key(&mut self.key_cursor2);
                // An (incomplete) accounting of the work we've done: updates written so far.
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            // Merging is complete, and only copying remains.
            // Key-by-key copying allows effort interruption, and compaction.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                source1.storage.advance_to_live_key(&mut self.key_cursor1);
                effort = (self.result.times.len() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                source2.storage.advance_to_live_key(&mut self.key_cursor2);
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
462
463    // Helper methods in support of merging batches.
464    impl<L: Layout> RhhValMerger<L>
465    where
466        layout::Key<L>: Default + HashOrdered,
467        for<'a> layout::KeyRef<'a, L>: HashOrdered,
468    {
469        /// Copy the next key in `source`.
470        ///
471        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
472        /// If the result does not wholly cancel, they key will be present in `self` with the
473        /// compacted values and updates.
474        ///
475        /// The caller should be certain to update the cursor, as this method does not do this.
476        fn copy_key(&mut self, source: &RhhValStorage<L>, cursor: usize) {
477            // Capture the initial number of values to determine if the merge was ultimately non-empty.
478            let init_vals = self.result.vals.len();
479            let (mut lower, upper) = source.values_for_key(cursor);
480            while lower < upper {
481                self.stash_updates_for_val(source, lower);
482                if let Some(off) = self.consolidate_updates() {
483                    self.result.vals_offs.push_ref(off);
484                    self.result.vals.push_ref(source.vals.index(lower));
485                }
486                lower += 1;
487            }
488
489            // If we have pushed any values, copy the key as well.
490            if self.result.vals.len() > init_vals {
491                self.result.insert_key(source.keys.index(cursor), Some(self.result.vals.len()));
492            }
493        }
494        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
495        ///
496        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
497        /// if the updates cancel either directly or after compaction.
498        fn merge_key(&mut self, source1: &RhhValStorage<L>, source2: &RhhValStorage<L>) {
499
500            use ::std::cmp::Ordering;
501            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
502                Ordering::Less => {
503                    self.copy_key(source1, self.key_cursor1);
504                    self.key_cursor1 += 1;
505                },
506                Ordering::Equal => {
507                    // Keys are equal; must merge all values from both sources for this one key.
508                    let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
509                    let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
510                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
511                        self.result.insert_key(source1.keys.index(self.key_cursor1), Some(off));
512                    }
513                    // Increment cursors in either case; the keys are merged.
514                    self.key_cursor1 += 1;
515                    self.key_cursor2 += 1;
516                },
517                Ordering::Greater => {
518                    self.copy_key(source2, self.key_cursor2);
519                    self.key_cursor2 += 1;
520                },
521            }
522        }
523        /// Merge two ranges of values into `self`.
524        ///
525        /// If the compacted result contains values with non-empty updates, the function returns
526        /// an offset that should be recorded to indicate the upper extent of the result values.
527        fn merge_vals(
528            &mut self,
529            (source1, mut lower1, upper1): (&RhhValStorage<L>, usize, usize),
530            (source2, mut lower2, upper2): (&RhhValStorage<L>, usize, usize),
531        ) -> Option<usize> {
532            // Capture the initial number of values to determine if the merge was ultimately non-empty.
533            let init_vals = self.result.vals.len();
534            while lower1 < upper1 && lower2 < upper2 {
535                // We compare values, and fold in updates for the lowest values;
536                // if they are non-empty post-consolidation, we write the value.
537                // We could multi-way merge and it wouldn't be very complicated.
538                use ::std::cmp::Ordering;
539                match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
540                    Ordering::Less => {
541                        // Extend stash by updates, with logical compaction applied.
542                        self.stash_updates_for_val(source1, lower1);
543                        if let Some(off) = self.consolidate_updates() {
544                            self.result.vals_offs.push_ref(off);
545                            self.result.vals.push_ref(source1.vals.index(lower1));
546                        }
547                        lower1 += 1;
548                    },
549                    Ordering::Equal => {
550                        self.stash_updates_for_val(source1, lower1);
551                        self.stash_updates_for_val(source2, lower2);
552                        if let Some(off) = self.consolidate_updates() {
553                            self.result.vals_offs.push_ref(off);
554                            self.result.vals.push_ref(source1.vals.index(lower1));
555                        }
556                        lower1 += 1;
557                        lower2 += 1;
558                    },
559                    Ordering::Greater => {
560                        // Extend stash by updates, with logical compaction applied.
561                        self.stash_updates_for_val(source2, lower2);
562                        if let Some(off) = self.consolidate_updates() {
563                            self.result.vals_offs.push_ref(off);
564                            self.result.vals.push_ref(source2.vals.index(lower2));
565                        }
566                        lower2 += 1;
567                    },
568                }
569            }
570            // Merging is complete, but we may have remaining elements to push.
571            while lower1 < upper1 {
572                self.stash_updates_for_val(source1, lower1);
573                if let Some(off) = self.consolidate_updates() {
574                    self.result.vals_offs.push_ref(off);
575                    self.result.vals.push_ref(source1.vals.index(lower1));
576                }
577                lower1 += 1;
578            }
579            while lower2 < upper2 {
580                self.stash_updates_for_val(source2, lower2);
581                if let Some(off) = self.consolidate_updates() {
582                    self.result.vals_offs.push_ref(off);
583                    self.result.vals.push_ref(source2.vals.index(lower2));
584                }
585                lower2 += 1;
586            }
587
588            // Values being pushed indicate non-emptiness.
589            if self.result.vals.len() > init_vals {
590                Some(self.result.vals.len())
591            } else {
592                None
593            }
594        }
595
596        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
597        fn stash_updates_for_val(&mut self, source: &RhhValStorage<L>, index: usize) {
598            let (lower, upper) = source.updates_for_value(index);
599            for i in lower .. upper {
600                // NB: Here is where we would need to look back if `lower == upper`.
601                let time = source.times.index(i);
602                let diff = source.diffs.index(i);
603                let mut new_time = L::TimeContainer::into_owned(time);
604                use crate::lattice::Lattice;
605                new_time.advance_by(self.description.since().borrow());
606                self.update_stash.push((new_time, L::DiffContainer::into_owned(diff)));
607            }
608        }
609
        /// Consolidates `self.update_stash` and produces the offset to record, if any.
        ///
        /// Returns `None` when the stash consolidates away entirely (no offset to record).
        /// Otherwise returns `Some(self.result.times.len())`; in the singleton case nothing
        /// is pushed, so the returned offset repeats the previously recorded offset, which
        /// is the encoding used to mean "look back one element".
        fn consolidate_updates(&mut self) -> Option<usize> {
            use crate::consolidation;
            // Coalesce updates with equal times (see `consolidation::consolidate`).
            consolidation::consolidate(&mut self.update_stash);
            if !self.update_stash.is_empty() {
                // If there is a single element, equal to a just-prior recorded update,
                // we push nothing and report an unincremented offset to encode this case.
                let time_diff = self.result.times.last().zip(self.result.diffs.last());
                let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
                    // TODO: The use of `into_owned` is a work-around for not having reference types.
                    *t1 == L::TimeContainer::into_owned(t2) && *d1 == L::DiffContainer::into_owned(d2)
                });
                if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
                    // Just clear out update_stash, as we won't drain it here.
                    self.update_stash.clear();
                    // Count the withheld update so batch sizes remain accurate.
                    self.singletons += 1;
                }
                else {
                    // Conventional; move `update_stash` into `updates`.
                    for (time, diff) in self.update_stash.drain(..) {
                        self.result.times.push_own(&time);
                        self.result.diffs.push_own(&diff);
                    }
                }
                Some(self.result.times.len())
            } else {
                None
            }
        }
639    }
640
641
    /// A cursor through a Robin Hood Hashed list of keys, vals, and such.
    ///
    /// The important detail is that not all of `keys` represent valid keys.
    /// We must consult `storage.hashed` to see if the associated data is valid.
    /// Importantly, we should skip over invalid keys, rather than report them as
    /// invalid through `key_valid`: that method is meant to indicate the end of
    /// the cursor, rather than internal state.
    pub struct RhhValCursor<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// Absolute position of the current key.
        ///
        /// Maintained to rest on a live (valid) key slot, or at `keys.len()` when exhausted.
        key_cursor: usize,
        /// Absolute position of the current value.
        val_cursor: usize,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }
660
    use crate::trace::implementations::WithLayout;
    // Ties the cursor to its layout `L`, from which the `Cursor` implementation's
    // associated types (keys, values, times, diffs) are derived.
    impl<L: Layout> WithLayout for RhhValCursor<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Layout = L;
    }
669
    impl<L: Layout> Cursor for RhhValCursor<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Storage = RhhValBatch<L>;

        /// The key at the cursor, if in range; relies on the cursor resting on a live key
        /// slot (maintained by `step_key`/`seek_key`/`rewind_keys`) or at the end of `keys`.
        fn get_key<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
        /// The value at the cursor, or `None` once values for the current key are exhausted.
        fn get_val<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Val<'a>> { if self.val_valid(storage) { storage.storage.vals.get(self.val_cursor) } else { None } }
        /// Unchecked access to the current key; callers must ensure `key_valid`.
        fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
        /// Unchecked access to the current value; callers must ensure `val_valid`.
        fn val<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
        /// Applies `logic` to each `(time, diff)` update associated with the current value.
        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch<L>, mut logic: L2) {
            let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
            for index in lower .. upper {
                let time = storage.storage.times.index(index);
                let diff = storage.storage.diffs.index(index);
                logic(time, diff);
            }
        }
        /// Indicates exhaustion of the cursor, not validity of the current slot.
        fn key_valid(&self, storage: &RhhValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
        fn val_valid(&self, storage: &RhhValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
        fn step_key(&mut self, storage: &RhhValBatch<L>){
            // We advance the cursor by one for certain, and then as long as we need to find a valid key.
            self.key_cursor += 1;
            storage.storage.advance_to_live_key(&mut self.key_cursor);

            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
            else {
                // Clamp to the end so that `key_valid` reports exhaustion.
                self.key_cursor = storage.storage.keys.len();
            }
        }
        /// Seeks to the first live key greater or equal to `key`, using the hash-derived
        /// slot as a starting point and probing linearly from there.
        fn seek_key(&mut self, storage: &RhhValBatch<L>, key: Self::Key<'_>) {
            // self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key));
            let desired = storage.storage.desired_location(&key);
            // Advance the cursor, if `desired` is ahead of it.
            if self.key_cursor < desired {
                self.key_cursor = desired;
            }
            // Advance the cursor as long as we have not found a value greater or equal to `key`.
            // We may have already passed `key`, and confirmed its absence, but our goal is to
            // find the next key afterwards so that users can, for example, alternately iterate.
            while self.key_valid(storage) && storage.storage.advance_key(self.key_cursor, key) {
                // TODO: Based on our encoding, we could skip logarithmically over empty regions by galloping
                //       through `storage.keys_offs`, which stays put for dead space.
                self.key_cursor += 1;
            }

            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
        }
        fn step_val(&mut self, storage: &RhhValBatch<L>) {
            self.val_cursor += 1;
            if !self.val_valid(storage) {
                // Saturate at the upper bound of the current key's values.
                self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
            }
        }
        /// Advances the value cursor to the first value not less than `val`, within the current key.
        fn seek_val(&mut self, storage: &RhhValBatch<L>, val: Self::Val<'_>) {
            self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
        }
        fn rewind_keys(&mut self, storage: &RhhValBatch<L>) {
            // Start from slot zero and skip any leading dead slots.
            self.key_cursor = 0;
            storage.storage.advance_to_live_key(&mut self.key_cursor);

            if self.key_valid(storage) {
                self.rewind_vals(storage)
            }
        }
        fn rewind_vals(&mut self, storage: &RhhValBatch<L>) {
            self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
        }
    }
744
    /// A builder for creating layers from unsorted update tuples.
    pub struct RhhValBuilder<L: Layout, CI>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// The in-progress batch storage, extended by `push` and finalized by `done`.
        result: RhhValStorage<L>,
        /// A withheld update equal to the most recently recorded one; see `push_update`.
        singleton: Option<(layout::Time<L>, layout::Diff<L>)>,
        /// Counts the number of singleton optimizations we performed.
        ///
        /// This number allows us to correctly gauge the total number of updates reflected in a batch,
        /// even though `updates.len()` may be much shorter than this amount.
        singletons: usize,
        /// Carries the input container type `CI` without storing one.
        _marker: PhantomData<CI>,
    }
759
760    impl<L: Layout, CI> RhhValBuilder<L, CI>
761    where
762        layout::Key<L>: Default + HashOrdered,
763    {
764        /// Pushes a single update, which may set `self.singleton` rather than push.
765        ///
766        /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`.
767        /// However, for "clever" reasons it does not do this. Instead, it looks for opportunities
768        /// to encode a singleton update with an "absert" update: repeating the most recent offset.
769        /// This otherwise invalid state encodes "look back one element".
770        ///
771        /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
772        /// previously pushed update exactly. In that case, we do not push the update into `updates`.
773        /// The update tuple is retained in `self.singleton` in case we see another update and need
774        /// to recover the singleton to push it into `updates` to join the second update.
775        fn push_update(&mut self, time: layout::Time<L>, diff: layout::Diff<L>) {
776            // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
777            // TODO: The use of `into_owned` is a bandage for not having references we can compare.
778            if self.result.times.last().map(|t| L::TimeContainer::into_owned(t) == time).unwrap_or(false) && self.result.diffs.last().map(|d| L::DiffContainer::into_owned(d) == diff).unwrap_or(false) {
779                assert!(self.singleton.is_none());
780                self.singleton = Some((time, diff));
781            }
782            else {
783                // If we have pushed a single element, we need to copy it out to meet this one.
784                if let Some((time, diff)) = self.singleton.take() {
785                    self.result.times.push_own(&time);
786                    self.result.diffs.push_own(&diff);
787                }
788                self.result.times.push_own(&time);
789                self.result.diffs.push_own(&diff);
790            }
791        }
792    }
793
    impl<L: Layout, CI> Builder for RhhValBuilder<L, CI>
    where
        layout::Key<L>: Default + HashOrdered,
        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Key<'a> = layout::Key<L>, Time=layout::Time<L>, Diff=layout::Diff<L>>,
        for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Input = CI;
        type Time = layout::Time<L>;
        type Output = RhhValBatch<L>;

        /// Allocates storage for `keys` keys (inflated for Robin Hood probing), `vals` values,
        /// and `upds` updates.
        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {

            // Double the capacity for RHH; probably excessive.
            let rhh_capacity = 2 * keys;
            let divisor = RhhValStorage::<L>::divisor_for_capacity(rhh_capacity);
            // We want some additive slop, in case we spill over.
            // This number magically chosen based on nothing in particular.
            // Worst case, we will re-alloc and copy if we spill beyond this.
            let keys = rhh_capacity + 10;

            // We don't introduce zero offsets as they will be introduced by the first `push` call.
            Self {
                result: RhhValStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
                    vals: L::ValContainer::with_capacity(vals),
                    vals_offs: L::OffsetContainer::with_capacity(vals + 1),
                    times: L::TimeContainer::with_capacity(upds),
                    diffs: L::DiffContainer::with_capacity(upds),
                    key_count: 0,
                    key_capacity: rhh_capacity,
                    divisor,
                },
                singleton: None,
                singletons: 0,
                _marker: PhantomData,
            }
        }

        /// Absorbs a chunk of (key, val, time, diff) updates, which must arrive in order:
        /// repeated keys and values extend the current run rather than starting a new one.
        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, val, time, diff) = CI::into_parts(item);
                // Perhaps this is a continuation of an already received key.
                if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    // Perhaps this is a continuation of an already received value.
                    if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
                        self.push_update(time, diff);
                    } else {
                        // New value; complete representation of prior value.
                        // The offset is pushed before taking the singleton, so a withheld
                        // singleton leaves the offset "unincremented" — its encoding — and
                        // we only need to count it.
                        self.result.vals_offs.push_ref(self.result.times.len());
                        if self.singleton.take().is_some() { self.singletons += 1; }
                        self.push_update(time, diff);
                        self.result.vals.push_into(val);
                    }
                } else {
                    // New key; complete representation of prior key.
                    self.result.vals_offs.push_ref(self.result.times.len());
                    if self.singleton.take().is_some() { self.singletons += 1; }
                    self.result.keys_offs.push_ref(self.result.vals.len());
                    self.push_update(time, diff);
                    self.result.vals.push_into(val);
                    // Insert the key, but with no specified offset.
                    self.result.insert_key_own(&key, None);
                }
            }
        }

        /// Completes the in-progress batch, sealing trailing offsets and accounting for
        /// any withheld singleton update.
        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> RhhValBatch<L> {
            // Record the final offsets
            self.result.vals_offs.push_ref(self.result.times.len());
            // Remove any pending singleton, and if it was set increment our count.
            if self.singleton.take().is_some() { self.singletons += 1; }
            self.result.keys_offs.push_ref(self.result.vals.len());
            RhhValBatch {
                // `times.len()` undercounts by the number of singleton optimizations.
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description,
            }
        }

        /// Builds a batch from a chain of input chunks, pre-sizing from their counts.
        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
887
888}
889
/// Placeholder for a key-only (no distinct values) RHH batch implementation.
mod key_batch {

    // Copy the above, once it works!

}
894}