Skip to main content

differential_dataflow/trace/implementations/
ord_neu.rs

1//! Trace and batch implementations based on sorted ranges.
2//!
3//! The types and type aliases in this module start with either
4//!
5//! * `OrdVal`: Collections whose data have the form `(key, val)` where `key` is ordered.
6//! * `OrdKey`: Collections whose data have the form `key` where `key` is ordered.
7//!
8//! Although `OrdVal` is more general than `OrdKey`, the latter has a simpler representation
9//! and should consume fewer resources (computation and memory) when it applies.
10
11use std::rc::Rc;
12
13use crate::trace::implementations::chunker::ContainerChunker;
14use crate::trace::implementations::spine_fueled::Spine;
15use crate::trace::implementations::merge_batcher::MergeBatcher;
16use crate::trace::implementations::merge_batcher::container::VecInternalMerger;
17use crate::trace::rc_blanket_impls::RcBuilder;
18
19use super::{Layout, Vector};
20
21pub use self::val_batch::{OrdValBatch, OrdValBuilder};
22pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
23
24/// A trace implementation using a spine of ordered lists.
25pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
26/// A batcher using ordered lists.
27pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ContainerChunker<Vec<((K,V),T,R)>>, VecInternalMerger<(K, V), T, R>>;
28/// A builder using ordered lists.
29pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
30
31// /// A trace implementation for empty values using a spine of ordered lists.
32// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
33
34/// A trace implementation using a spine of ordered lists.
35pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
36/// A batcher for ordered lists.
37pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ContainerChunker<Vec<((K,()),T,R)>>, VecInternalMerger<(K, ()), T, R>>;
38/// A builder for ordered lists.
39pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
40
41// /// A trace implementation for empty values using a spine of ordered lists.
42// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
43
44pub use layers::{Vals, Upds};
45/// Layers are containers of lists of some type.
46///
47/// The intent is that they "attach" to an outer layer which has as many values
48/// as the layer has lists, thereby associating a list with each outer value.
49/// A sequence of layers, each matching the number of values in its predecessor,
50/// forms a layered trie: a tree with values of some type on nodes at each depth.
51///
52/// We will form tries here by layering `[Keys, Vals, Upds]` or `[Keys, Upds]`.
53pub mod layers {
54
55    use serde::{Deserialize, Serialize};
56    use crate::trace::implementations::BatchContainer;
57
    /// A container for non-empty lists of values.
    ///
    /// List `i` occupies positions `offs[i] .. offs[i+1]` of `vals`.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Vals<O, V> {
        /// Offsets used to provide indexes from keys to values.
        ///
        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
        pub offs: O,
        /// Concatenated ordered lists of values, bracketed by offsets in `offs`.
        pub vals: V,
    }
68
69    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Default for Vals<O, V> {
70        fn default() -> Self { Self::with_capacity(0, 0) }
71    }
72
73    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Vals<O, V> {
74        /// Lower and upper bounds in `self.vals` of the indexed list.
75        #[inline(always)] pub fn bounds(&self, index: usize) -> (usize, usize) {
76            (self.offs.index(index), self.offs.index(index+1))
77        }
78        /// Retrieves a value using relative indexes.
79        ///
80        /// The first index identifies a list, and the second an item within the list.
81        /// The method adds the list's lower bound to the item index, and then calls
82        /// `get_abs`. Using absolute indexes within the list's bounds can be more
83        /// efficient than using relative indexing.
84        pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> V::ReadItem<'_> {
85            self.get_abs(self.bounds(list_idx).0 + item_idx)
86        }
87
88        /// Number of lists in the container.
89        pub fn len(&self) -> usize { self.offs.len() - 1 }
90        /// Retrieves a value using an absolute rather than relative index.
91        pub fn get_abs(&self, index: usize) -> V::ReadItem<'_> {
92            self.vals.index(index)
93        }
94        /// Allocates with capacities for a number of lists and values.
95        pub fn with_capacity(o_size: usize, v_size: usize) -> Self {
96            let mut offs = <O as BatchContainer>::with_capacity(o_size);
97            offs.push_ref(0);
98            Self {
99                offs,
100                vals: <V as BatchContainer>::with_capacity(v_size),
101            }
102        }
103        /// Allocates with enough capacity to contain two inputs.
104        pub fn merge_capacity(this: &Self, that: &Self) -> Self {
105            let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
106            offs.push_ref(0);
107            Self {
108                offs,
109                vals: <V as BatchContainer>::merge_capacity(&this.vals, &that.vals),
110            }
111        }
112    }
113
    /// A container for non-empty lists of updates.
    ///
    /// This container uses the special representation of an empty slice to stand in for
    /// "the previous single element". An empty slice is an otherwise invalid representation.
    ///
    /// Entries of `times` and `diffs` are paired by position: the update at absolute
    /// index `i` is `(times[i], diffs[i])`.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Upds<O, T, D> {
        /// Offsets used to provide indexes from values to updates.
        pub offs: O,
        /// Concatenated ordered lists of update times, bracketed by offsets in `offs`.
        pub times: T,
        /// Concatenated ordered lists of update diffs, bracketed by offsets in `offs`.
        pub diffs: D,
    }
127
128    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Default for Upds<O, T, D> {
129        fn default() -> Self { Self::with_capacity(0, 0) }
130    }
131    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Upds<O, T, D> {
132        /// Lower and upper bounds in `self.times` and `self.diffs` of the indexed list.
133        pub fn bounds(&self, index: usize) -> (usize, usize) {
134            let mut lower = self.offs.index(index);
135            let upper = self.offs.index(index+1);
136            // We use equal lower and upper to encode "singleton update; just before here".
137            // It should only apply when there is a prior element, so `lower` should be greater than zero.
138            if lower == upper {
139                assert!(lower > 0);
140                lower -= 1;
141            }
142            (lower, upper)
143        }
144        /// Retrieves a value using relative indexes.
145        ///
146        /// The first index identifies a list, and the second an item within the list.
147        /// The method adds the list's lower bound to the item index, and then calls
148        /// `get_abs`. Using absolute indexes within the list's bounds can be more
149        /// efficient than using relative indexing.
150        pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
151            self.get_abs(self.bounds(list_idx).0 + item_idx)
152        }
153
154        /// Number of lists in the container.
155        pub fn len(&self) -> usize { self.offs.len() - 1 }
156        /// Retrieves a value using an absolute rather than relative index.
157        pub fn get_abs(&self, index: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
158            (self.times.index(index), self.diffs.index(index))
159        }
160        /// Allocates with capacities for a number of lists and values.
161        pub fn with_capacity(o_size: usize, u_size: usize) -> Self {
162            let mut offs = <O as BatchContainer>::with_capacity(o_size);
163            offs.push_ref(0);
164            Self {
165                offs,
166                times: <T as BatchContainer>::with_capacity(u_size),
167                diffs: <D as BatchContainer>::with_capacity(u_size),
168            }
169        }
170        /// Allocates with enough capacity to contain two inputs.
171        pub fn merge_capacity(this: &Self, that: &Self) -> Self {
172            let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
173            offs.push_ref(0);
174            Self {
175                offs,
176                times: <T as BatchContainer>::merge_capacity(&this.times, &that.times),
177                diffs: <D as BatchContainer>::merge_capacity(&this.diffs, &that.diffs),
178            }
179        }
180    }
181
    /// Helper type for constructing `Upds` containers.
    ///
    /// Updates are staged with `push` and moved into an `Upds` with `seal`, which
    /// consolidates them and may apply the singleton-update compression.
    pub struct UpdsBuilder<T: BatchContainer, D: BatchContainer> {
        /// Local stash of updates, to use for consolidation.
        ///
        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
        /// A `ChangeBatch` itself needs an `i64` diff type, which we do not have.
        stash: Vec<(T::Owned, D::Owned)>,
        /// Total number of consolidated updates.
        ///
        /// Tracked independently to account for duplicate compression.
        total: usize,

        /// Time container to stage singleton times for evaluation.
        time_con: T,
        /// Diff container to stage singleton times for evaluation.
        diff_con: D,
    }
199
200    impl<T: BatchContainer, D: BatchContainer> Default for UpdsBuilder<T, D> {
201        fn default() -> Self { Self { stash: Vec::default(), total: 0, time_con: BatchContainer::with_capacity(1), diff_con: BatchContainer::with_capacity(1) } }
202    }
203
204
    impl<T, D> UpdsBuilder<T, D>
    where
        T: BatchContainer<Owned: Ord>,
        D: BatchContainer<Owned: crate::difference::Semigroup>,
    {
        /// Stages one update, but does not seal the set of updates.
        pub fn push(&mut self, time: T::Owned, diff: D::Owned) {
            self.stash.push((time, diff));
        }

        /// Consolidate and insert (if non-empty) the stashed updates.
        ///
        /// The return indicates whether the results were indeed non-empty.
        /// In all cases, the stash is left empty on return.
        pub fn seal<O: for<'a> BatchContainer<ReadItem<'a> = usize>>(&mut self, upds: &mut Upds<O, T, D>) -> bool {
            use crate::consolidation;
            consolidation::consolidate(&mut self.stash);
            // If everything consolidates away, return false.
            if self.stash.is_empty() { return false; }
            // If there is a singleton, we may be able to optimize.
            if self.stash.len() == 1 {
                let (time, diff) = self.stash.last().unwrap();
                // Stage the owned time and diff in containers so we can compare them
                // against the last written update through the container's read items.
                self.time_con.clear(); self.time_con.push_own(time);
                self.diff_con.clear(); self.diff_con.push_own(diff);
                if upds.times.last() == self.time_con.get(0) && upds.diffs.last() == self.diff_con.get(0) {
                    // The singleton repeats the previous update: record an empty interval
                    // (equal adjacent offsets), which `Upds::bounds` decodes as "reuse the
                    // update just before here", writing nothing to `times` or `diffs`.
                    self.total += 1;
                    self.stash.clear();
                    upds.offs.push_ref(upds.times.len());
                    return true;
                }
            }
            // Conventional; move `stash` into `updates`.
            self.total += self.stash.len();
            for (time, diff) in self.stash.drain(..) {
                upds.times.push_own(&time);
                upds.diffs.push_own(&diff);
            }
            upds.offs.push_ref(upds.times.len());
            true
        }

        /// The total number of updates sealed so far, including those elided by the
        /// singleton optimization.
        pub fn total(&self) -> usize { self.total }
    }
248}
249
250/// Types related to forming batches with values.
251pub mod val_batch {
252
253    use std::marker::PhantomData;
254    use serde::{Deserialize, Serialize};
255    use timely::container::PushInto;
256    use timely::progress::{Antichain, frontier::AntichainRef};
257
258    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
259    use crate::trace::implementations::{BatchContainer, BuilderInput};
260    use crate::trace::implementations::layout;
261
262    use super::{Layout, Vals, Upds, layers::UpdsBuilder};
263
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The three fields form a layered trie: each key in `keys` owns a list in `vals`,
    /// and each value in `vals` owns a list of `(time, diff)` updates in `upds`.
    #[derive(Debug, Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValStorage<L: Layout> {
        /// An ordered list of keys.
        pub keys: L::KeyContainer,
        /// For each key in `keys`, a list of values.
        pub vals: Vals<L::OffsetContainer, L::ValContainer>,
        /// For each val in `vals`, a list of (time, diff) updates.
        pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
    }
281
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValBatch<L: Layout> {
        /// The updates themselves.
        pub storage: OrdValStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<layout::Time<L>>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because due to the singleton optimization,
        /// we may have many more updates than `storage.upds` physically stores. It should equal
        /// that length, plus the number of singleton optimizations employed.
        pub updates: usize,
    }
306
    // Ties the batch to its layout `L`, providing the associated key/val/time/diff types.
    impl<L: Layout> WithLayout for OrdValBatch<L> {
        type Layout = L;
    }
310
311    impl<L: Layout> BatchReader for OrdValBatch<L> {
312
313        type Cursor = OrdValCursor<L>;
314        fn cursor(&self) -> Self::Cursor {
315            OrdValCursor {
316                key_cursor: 0,
317                val_cursor: 0,
318                phantom: PhantomData,
319            }
320        }
321        fn len(&self) -> usize {
322            // Normally this would be `self.updates.len()`, but we have a clever compact encoding.
323            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
324            self.updates
325        }
326        fn description(&self) -> &Description<layout::Time<L>> { &self.description }
327    }
328
329    impl<L: Layout> Batch for OrdValBatch<L> {
330        type Merger = OrdValMerger<L>;
331
332        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
333            OrdValMerger::new(self, other, compaction_frontier)
334        }
335
336        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
337            use timely::progress::Timestamp;
338            Self {
339                storage: OrdValStorage {
340                    keys: L::KeyContainer::with_capacity(0),
341                    vals: Default::default(),
342                    upds: Default::default(),
343                },
344                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
345                updates: 0,
346            }
347        }
348    }
349
    /// State for an in-progress merge.
    pub struct OrdValMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// Result that we are currently assembling.
        result: OrdValStorage<L>,
        /// Description of the output batch: the input intervals joined, with `since` advanced.
        description: Description<layout::Time<L>>,
        /// Staging area to consolidate owned times and diffs, before sealing.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
    }
363
    impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
    where
        OrdValBatch<L>: Batch<Time=layout::Time<L>>,
    {
        /// Creates a merger for two batches covering adjacent time intervals.
        ///
        /// The output description spans `batch1.lower() .. batch2.upper()`, with a `since`
        /// advanced to the join of both inputs' `since` and the compaction frontier.
        fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {

            // The batches must be contiguous in time for the merge to be meaningful.
            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            OrdValMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: OrdValStorage {
                    keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                    vals: Vals::merge_capacity(&batch1.vals, &batch2.vals),
                    upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
                },
                description,
                staging: UpdsBuilder::default(),
            }
        }
        /// Completes the merge, packaging the assembled storage into a batch.
        fn done(self) -> OrdValBatch<L> {
            OrdValBatch {
                updates: self.staging.total(),
                storage: self.result,
                description: self.description,
            }
        }
        /// Performs a bounded amount of merge work, decrementing `fuel` by the work done.
        ///
        /// The merge may be resumed by further calls; it is complete when both key cursors
        /// reach the ends of their respective sources.
        fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {

            // An (incomplete) indication of the amount of work we've done so far.
            let starting_updates = self.staging.total();
            let mut effort = 0isize;

            // While both mergees are still active, perform single-key merges.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                // An (incomplete) accounting of the work we've done.
                effort = (self.staging.total() - starting_updates) as isize;
            }

            // Merging is complete, and only copying remains.
            // Key-by-key copying allows effort interruption, and compaction.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
428
    // Helper methods in support of merging batches.
    impl<L: Layout> OrdValMerger<L> {
        /// Copy the next key in `source`.
        ///
        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
        /// If the result does not wholly cancel, the key will be present in `self` with the
        /// compacted values and updates.
        ///
        /// The caller should be certain to update the cursor, as this method does not do this.
        fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.vals.len();
            let (mut lower, upper) = source.vals.bounds(cursor);
            while lower < upper {
                // Stage each value's updates; only retain the value if they survive compaction.
                self.stash_updates_for_val(source, lower);
                if self.staging.seal(&mut self.result.upds) {
                    self.result.vals.vals.push_ref(source.vals.get_abs(lower));
                }
                lower += 1;
            }

            // If we have pushed any values, copy the key as well.
            if self.result.vals.vals.len() > init_vals {
                self.result.keys.push_ref(source.keys.index(cursor));
                self.result.vals.offs.push_ref(self.result.vals.vals.len());
            }
        }
        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
        ///
        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
        /// if the updates cancel either directly or after compaction.
        fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    // Keys are equal; must merge all values from both sources for this one key.
                    let (lower1, upper1) = source1.vals.bounds(self.key_cursor1);
                    let (lower2, upper2) = source2.vals.bounds(self.key_cursor2);
                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
                        self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
                        self.result.vals.offs.push_ref(off);
                    }
                    // Increment cursors in either case; the keys are merged.
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }
        /// Merge two ranges of values into `self`.
        ///
        /// If the compacted result contains values with non-empty updates, the function returns
        /// an offset that should be recorded to indicate the upper extent of the result values.
        fn merge_vals(
            &mut self,
            (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
            (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
        ) -> Option<usize> {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.vals.len();
            while lower1 < upper1 && lower2 < upper2 {
                // We compare values, and fold in updates for the lowest values;
                // if they are non-empty post-consolidation, we write the value.
                // We could multi-way merge and it wouldn't be very complicated.
                use ::std::cmp::Ordering;
                match source1.vals.get_abs(lower1).cmp(&source2.vals.get_abs(lower2)) {
                    Ordering::Less => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source1, lower1);
                        if self.staging.seal(&mut self.result.upds) {
                            self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
                        }
                        lower1 += 1;
                    },
                    Ordering::Equal => {
                        // Equal values: stash updates from both sources and consolidate together.
                        self.stash_updates_for_val(source1, lower1);
                        self.stash_updates_for_val(source2, lower2);
                        if self.staging.seal(&mut self.result.upds) {
                            self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
                        }
                        lower1 += 1;
                        lower2 += 1;
                    },
                    Ordering::Greater => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source2, lower2);
                        if self.staging.seal(&mut self.result.upds) {
                            self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
                        }
                        lower2 += 1;
                    },
                }
            }
            // Merging is complete, but we may have remaining elements to push.
            while lower1 < upper1 {
                self.stash_updates_for_val(source1, lower1);
                if self.staging.seal(&mut self.result.upds) {
                    self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
                }
                lower1 += 1;
            }
            while lower2 < upper2 {
                self.stash_updates_for_val(source2, lower2);
                if self.staging.seal(&mut self.result.upds) {
                    self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
                }
                lower2 += 1;
            }

            // Values being pushed indicate non-emptiness.
            if self.result.vals.vals.len() > init_vals {
                Some(self.result.vals.vals.len())
            } else {
                None
            }
        }

        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
        ///
        /// Each time is advanced to the output's `since` frontier before staging, so that
        /// updates which become equal after advancement can consolidate.
        fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
            let (lower, upper) = source.upds.bounds(index);
            for i in lower .. upper {
                // NB: Here is where we would need to look back if `lower == upper`.
                let (time, diff) = source.upds.get_abs(i);
                use crate::lattice::Lattice;
                let mut new_time: layout::Time<L> = L::TimeContainer::into_owned(time);
                new_time.advance_by(self.description.since().borrow());
                self.staging.push(new_time, L::DiffContainer::into_owned(diff));
            }
        }
    }
566
    /// A cursor for navigating a single layer.
    ///
    /// Both cursors are absolute positions into the batch's storage; the valid range of
    /// `val_cursor` is determined by the key at `key_cursor`.
    pub struct OrdValCursor<L: Layout> {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// Absolute position of the current value.
        val_cursor: usize,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }
576
    use crate::trace::implementations::WithLayout;
    // Ties the cursor to its layout `L`, providing the associated key/val/time/diff types.
    impl<L: Layout> WithLayout for OrdValCursor<L> {
        type Layout = L;
    }
581
    impl<L: Layout> Cursor for OrdValCursor<L> {

        type Storage = OrdValBatch<L>;

        /// The current key, if the key cursor is in range.
        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
        /// The current value, if the value cursor is within the current key's range.
        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }

        fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
        fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.get_abs(self.val_cursor) }
        /// Applies `logic` to each `(time, diff)` update of the current value.
        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
            let (lower, upper) = storage.storage.upds.bounds(self.val_cursor);
            for index in lower .. upper {
                let (time, diff) = storage.storage.upds.get_abs(index);
                logic(time, diff);
            }
        }
        fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
        // The value cursor is valid while it precedes the upper bound of the current key's values.
        fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.vals.bounds(self.key_cursor).1 }
        fn step_key(&mut self, storage: &OrdValBatch<L>){
            self.key_cursor += 1;
            if self.key_valid(storage) {
                // Reposition the value cursor at the start of the new key's values.
                self.rewind_vals(storage);
            }
            else {
                // Clamp to the number of keys, so repeated stepping does not overflow.
                self.key_cursor = storage.storage.keys.len();
            }
        }
        /// Advances the key cursor to the least key not less than `key`.
        fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
        }
        fn step_val(&mut self, storage: &OrdValBatch<L>) {
            self.val_cursor += 1;
            if !self.val_valid(storage) {
                // Park the cursor at the upper bound of the current key's values.
                self.val_cursor = storage.storage.vals.bounds(self.key_cursor).1;
            }
        }
        /// Advances the value cursor, within the current key, to the least value not less than `val`.
        fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
            self.val_cursor += storage.storage.vals.vals.advance(self.val_cursor, storage.storage.vals.bounds(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
        }
        fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
            self.key_cursor = 0;
            if self.key_valid(storage) {
                self.rewind_vals(storage)
            }
        }
        // Moves the value cursor to the lower bound of the current key's values.
        fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
            self.val_cursor = storage.storage.vals.bounds(self.key_cursor).0;
        }
    }
634
    /// A builder for creating layers from unsorted update tuples.
    ///
    /// The `CI` parameter is the input container type the builder consumes.
    pub struct OrdValBuilder<L: Layout, CI> {
        /// The in-progress result.
        ///
        /// This is public to allow container implementors to set and inspect their container.
        pub result: OrdValStorage<L>,
        /// Staging area consolidating each value's updates before they are sealed into `result`.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
        /// Phantom marker for the input container type.
        _marker: PhantomData<CI>,
    }
644
    impl<L, CI> Builder for OrdValBuilder<L, CI>
    where
        L: for<'a> Layout<
            KeyContainer: PushInto<CI::Key<'a>>,
            ValContainer: PushInto<CI::Val<'a>>,
        >,
        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
    {

        type Input = CI;
        type Time = layout::Time<L>;
        type Output = OrdValBatch<L>;

        /// Allocates a builder sized for `keys` keys, `vals` values, and `upds` updates.
        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
            Self {
                result: OrdValStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    // The `+ 1` anticipates the extra boundary offset: intervals over `n`
                    // items require `n + 1` offsets (see the final `push_ref` in `done`).
                    vals: Vals::with_capacity(keys + 1, vals),
                    upds: Upds::with_capacity(vals + 1, upds),
                },
                staging: UpdsBuilder::default(),
                _marker: PhantomData,
            }
        }

        /// Drains `chunk` into the in-progress batch.
        ///
        /// Relies on equal keys (and equal values within a key) arriving adjacently across
        /// all pushed chunks: only the most recently pushed key and value are compared
        /// against, so non-adjacent duplicates would not be merged.
        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, val, time, diff) = CI::into_parts(item);

                // Pre-load the first update.
                if self.result.keys.is_empty() {
                    self.result.vals.vals.push_into(val);
                    self.result.keys.push_into(key);
                    self.staging.push(time, diff);
                }
                // Perhaps this is a continuation of an already received key.
                else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    // Perhaps this is a continuation of an already received value.
                    if self.result.vals.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
                        self.staging.push(time, diff);
                    } else {
                        // New value; complete representation of prior value.
                        self.staging.seal(&mut self.result.upds);
                        self.staging.push(time, diff);
                        self.result.vals.vals.push_into(val);
                    }
                } else {
                    // New key; complete representation of prior key.
                    self.staging.seal(&mut self.result.upds);
                    self.staging.push(time, diff);
                    // Record where the prior key's values end (and this key's begin).
                    self.result.vals.offs.push_ref(self.result.vals.vals.len());
                    self.result.vals.vals.push_into(val);
                    self.result.keys.push_into(key);
                }
            }
        }

        /// Completes the batch: seals any staged updates and closes the final value interval.
        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
            self.staging.seal(&mut self.result.upds);
            // Close the value interval for the final key.
            self.result.vals.offs.push_ref(self.result.vals.vals.len());
            OrdValBatch {
                updates: self.staging.total(),
                storage: self.result,
                description,
            }
        }

        /// Builds a batch from a chain of input chunks, preallocating from their counts.
        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
724}
725
726/// Types related to forming batches of keys.
727pub mod key_batch {
728
729    use std::marker::PhantomData;
730    use serde::{Deserialize, Serialize};
731    use timely::container::PushInto;
732    use timely::progress::{Antichain, frontier::AntichainRef};
733
734    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
735    use crate::trace::implementations::{BatchContainer, BuilderInput};
736    use crate::trace::implementations::layout;
737
738    use super::{Layout, Upds, layers::UpdsBuilder};
739
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    #[derive(Debug, Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdKeyStorage<L: Layout> {
        /// An ordered list of keys, aligned with the update intervals in `upds`.
        pub keys: L::KeyContainer,
        /// For each key in `keys`, a list of (time, diff) updates.
        pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
    }
754
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdKeyBatch<L: Layout> {
        /// The updates themselves.
        pub storage: OrdKeyStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<layout::Time<L>>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because due to the singleton optimization,
        /// we may have many more updates than are stored explicitly in `storage.upds`. It
        /// should equal that stored count, plus the number of singleton optimizations employed.
        pub updates: usize,

        /// Single value to return if asked.
        ///
        /// Key-only batches have no per-key values; cursors hand out this one entry instead.
        pub value: L::ValContainer,
    }
782
783    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> OrdKeyBatch<L> {
784        /// Creates a container with one value, to slot in to `self.value`.
785        pub fn create_value() -> L::ValContainer {
786            let mut value = L::ValContainer::with_capacity(1);
787            value.push_own(&Default::default());
788            value
789        }
790    }
791
    /// Associates the batch with its layout `L`.
    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> WithLayout for OrdKeyBatch<L> {
        type Layout = L;
    }
795
796    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> BatchReader for OrdKeyBatch<L> {
797
798        type Cursor = OrdKeyCursor<L>;
799        fn cursor(&self) -> Self::Cursor {
800            OrdKeyCursor {
801                key_cursor: 0,
802                val_stepped: false,
803                phantom: std::marker::PhantomData,
804            }
805        }
806        fn len(&self) -> usize {
807            // Normally this would be `self.updates.len()`, but we have a clever compact encoding.
808            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
809            self.updates
810        }
811        fn description(&self) -> &Description<layout::Time<L>> { &self.description }
812    }
813
814    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Batch for OrdKeyBatch<L> {
815        type Merger = OrdKeyMerger<L>;
816
817        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
818            OrdKeyMerger::new(self, other, compaction_frontier)
819        }
820
821        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
822            use timely::progress::Timestamp;
823            Self {
824                storage: OrdKeyStorage {
825                    keys: L::KeyContainer::with_capacity(0),
826                    upds: Upds::default(),
827                },
828                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
829                updates: 0,
830                value: Self::create_value(),
831            }
832        }
833    }
834
    /// State for an in-progress merge.
    pub struct OrdKeyMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// Result that we are currently assembling.
        result: OrdKeyStorage<L>,
        /// Description (time interval and `since`) of the merged batch under construction.
        description: Description<layout::Time<L>>,

        /// Local stash of updates, to use for consolidation.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
    }
849
    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
    where
        OrdKeyBatch<L>: Batch<Time=layout::Time<L>>,
    {
        /// Begins a merge of `batch1` followed by `batch2`, whose time intervals must abut.
        ///
        /// The result's `since` is advanced to cover both inputs' `since` frontiers as well
        /// as the supplied `compaction_frontier`.
        fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {

            // The merge only makes sense for batches over contiguous time intervals.
            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            OrdKeyMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: OrdKeyStorage {
                    // Preallocate for the worst case: no key or update consolidates away.
                    keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                    upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
                },
                description,
                staging: UpdsBuilder::default(),
            }
        }
        /// Finalizes the merge, packaging the accumulated storage as a batch.
        fn done(self) -> OrdKeyBatch<L> {
            OrdKeyBatch {
                updates: self.staging.total(),
                storage: self.result,
                description: self.description,
                value: OrdKeyBatch::<L>::create_value(),
            }
        }
        /// Performs up to roughly `fuel` units of merge work, decrementing `fuel` by the work done.
        ///
        /// Effort is measured by the growth of the staged update count, which is an
        /// incomplete proxy; the key cursors guarantee forward progress regardless.
        fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {

            // An (incomplete) indication of the amount of work we've done so far.
            let starting_updates = self.staging.total();
            let mut effort = 0isize;

            // While both mergees are still active, perform single-key merges.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                // An (incomplete) accounting of the work we've done.
                effort = (self.staging.total() - starting_updates) as isize;
            }

            // Merging is complete, and only copying remains.
            // Key-by-key copying allows effort interruption, and compaction.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
914
    // Helper methods in support of merging batches.
    impl<L: Layout> OrdKeyMerger<L> {
        /// Copy the next key in `source`.
        ///
        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
        /// If the result does not wholly cancel, the key will be present in `self` with the
        /// compacted updates.
        ///
        /// The caller should be certain to update the cursor, as this method does not do this.
        fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
            self.stash_updates_for_key(source, cursor);
            // Record the key only if `seal` indicates updates survived consolidation.
            if self.staging.seal(&mut self.result.upds) {
                self.result.keys.push_ref(source.keys.index(cursor));
            }
        }
        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
        ///
        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
        /// if the updates cancel either directly or after compaction.
        fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    // Keys are equal; must merge all updates from both sources for this one key.
                    self.stash_updates_for_key(source1, self.key_cursor1);
                    self.stash_updates_for_key(source2, self.key_cursor2);
                    if self.staging.seal(&mut self.result.upds) {
                        self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
                    }
                    // Increment cursors in either case; the keys are merged.
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }

        /// Transfer updates for an indexed key in `source` into `self`, with compaction applied.
        fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
            let (lower, upper) = source.upds.bounds(index);
            for i in lower .. upper {
                // NB: Here is where we would need to look back if `lower == upper`.
                let (time, diff) = source.upds.get_abs(i);
                use crate::lattice::Lattice;
                // Advance each time to the merged `since`, enabling updates to consolidate.
                let mut new_time = L::TimeContainer::into_owned(time);
                new_time.advance_by(self.description.since().borrow());
                self.staging.push(new_time, L::DiffContainer::into_owned(diff));
            }
        }
    }
972
    /// A cursor for navigating a single layer.
    pub struct OrdKeyCursor<L: Layout> {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// Whether the single synthetic value has been stepped past for the current key;
        /// once stepped, there are no more values.
        val_stepped: bool,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }
982
    use crate::trace::implementations::WithLayout;
    /// Associates the cursor with its layout `L`.
    impl<L: Layout<ValContainer: BatchContainer>> WithLayout for OrdKeyCursor<L> {
        type Layout = L;
    }
987
988    impl<L: for<'a> Layout<ValContainer: BatchContainer<Owned: Default>>> Cursor for OrdKeyCursor<L> {
989
990        type Storage = OrdKeyBatch<L>;
991
992        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
993        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
994
995        fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
996        fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { storage.value.index(0) }
997        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
998            let (lower, upper) = storage.storage.upds.bounds(self.key_cursor);
999            for index in lower .. upper {
1000                let (time, diff) = storage.storage.upds.get_abs(index);
1001                logic(time, diff);
1002            }
1003        }
1004        fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
1005        fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
1006        fn step_key(&mut self, storage: &Self::Storage){
1007            self.key_cursor += 1;
1008            if self.key_valid(storage) {
1009                self.rewind_vals(storage);
1010            }
1011            else {
1012                self.key_cursor = storage.storage.keys.len();
1013            }
1014        }
1015        fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
1016            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
1017            if self.key_valid(storage) {
1018                self.rewind_vals(storage);
1019            }
1020        }
1021        fn step_val(&mut self, _storage: &Self::Storage) {
1022            self.val_stepped = true;
1023        }
1024        fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
1025        fn rewind_keys(&mut self, storage: &Self::Storage) {
1026            self.key_cursor = 0;
1027            if self.key_valid(storage) {
1028                self.rewind_vals(storage)
1029            }
1030        }
1031        fn rewind_vals(&mut self, _storage: &Self::Storage) {
1032            self.val_stepped = false;
1033        }
1034    }
1035
    /// A builder for creating layers from unsorted update tuples.
    pub struct OrdKeyBuilder<L: Layout, CI> {
        /// The in-progress result.
        ///
        /// This is public to allow container implementors to set and inspect their container.
        pub result: OrdKeyStorage<L>,
        /// Staged `(time, diff)` updates for the key currently being assembled; sealed into
        /// `result.upds` whenever the key changes.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
        /// Marker tying the builder to its input type `CI` without storing any of its data.
        _marker: PhantomData<CI>,
    }
1045
1046    impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
1047    where
1048        L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>>,
1049        L: Layout<ValContainer: BatchContainer<Owned: Default>>,
1050        CI: BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
1051    {
1052
1053        type Input = CI;
1054        type Time = layout::Time<L>;
1055        type Output = OrdKeyBatch<L>;
1056
1057        fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
1058            Self {
1059                result: OrdKeyStorage {
1060                    keys: L::KeyContainer::with_capacity(keys),
1061                    upds: Upds::with_capacity(keys+1, upds),
1062                },
1063                staging: UpdsBuilder::default(),
1064                _marker: PhantomData,
1065            }
1066        }
1067
1068        #[inline]
1069        fn push(&mut self, chunk: &mut Self::Input) {
1070            for item in chunk.drain() {
1071                let (key, _val, time, diff) = CI::into_parts(item);
1072                if self.result.keys.is_empty() {
1073                    self.result.keys.push_into(key);
1074                    self.staging.push(time, diff);
1075                }
1076                // Perhaps this is a continuation of an already received key.
1077                else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
1078                    self.staging.push(time, diff);
1079                } else {
1080                    self.staging.seal(&mut self.result.upds);
1081                    self.staging.push(time, diff);
1082                    self.result.keys.push_into(key);
1083                }
1084            }
1085        }
1086
1087        #[inline(never)]
1088        fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
1089            self.staging.seal(&mut self.result.upds);
1090            OrdKeyBatch {
1091                updates: self.staging.total(),
1092                storage: self.result,
1093                description,
1094                value: OrdKeyBatch::<L>::create_value(),
1095            }
1096        }
1097
1098        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
1099            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
1100            let mut builder = Self::with_capacity(keys, vals, upds);
1101            for mut chunk in chain.drain(..) {
1102                builder.push(&mut chunk);
1103            }
1104
1105            builder.done(description)
1106        }
1107    }
1108
1109}