Skip to main content

differential_dataflow/trace/implementations/
ord_neu.rs

1//! Trace and batch implementations based on sorted ranges.
2//!
3//! The types and type aliases in this module start with either
4//!
5//! * `OrdVal`: Collections whose data have the form `(key, val)` where `key` is ordered.
6//! * `OrdKey`: Collections whose data have the form `key` where `key` is ordered.
7//!
8//! Although `OrdVal` is more general than `OrdKey`, the latter has a simpler representation
9//! and should consume fewer resources (computation and memory) when it applies.
10
11use std::rc::Rc;
12
13use crate::trace::implementations::spine_fueled::Spine;
14use crate::trace::implementations::merge_batcher::MergeBatcher;
15use crate::trace::implementations::merge_batcher::vec::VecMerger;
16use crate::trace::rc_blanket_impls::RcBuilder;
17
18use super::{Layout, Vector};
19
20pub use self::val_batch::{OrdValBatch, OrdValBuilder};
21pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
22
/// A trace of `(key, val)` updates: a `Spine` whose batches are `Rc`-shared `OrdValBatch`es
/// laid out with the `Vector` layout over `((K, V), T, R)` tuples.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher for `(key, val)` updates: a `MergeBatcher` driven by a `VecMerger`.
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<VecMerger<(K, V), T, R>>;
/// A builder for `(key, val)` batches: an `RcBuilder` around an `OrdValBuilder` whose
/// input chunks are `Vec<((K, V), T, R)>`.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
29
/// A trace of key-only updates: a `Spine` whose batches are `Rc`-shared `OrdKeyBatch`es
/// laid out with the `Vector` layout over `((K, ()), T, R)` tuples.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A batcher for key-only updates: a `MergeBatcher` driven by a `VecMerger` with unit values.
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<VecMerger<(K, ()), T, R>>;
/// A builder for key-only batches: an `RcBuilder` around an `OrdKeyBuilder` whose
/// input chunks are `Vec<((K, ()), T, R)>`.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
36
37pub use layers::{Vals, Upds};
38/// Layers are containers of lists of some type.
39///
40/// The intent is that they "attach" to an outer layer which has as many values
41/// as the layer has lists, thereby associating a list with each outer value.
42/// A sequence of layers, each matching the number of values in its predecessor,
43/// forms a layered trie: a tree with values of some type on nodes at each depth.
44///
45/// We will form tries here by layering `[Keys, Vals, Upds]` or `[Keys, Upds]`.
46pub mod layers {
47
48    use serde::{Deserialize, Serialize};
49    use crate::trace::implementations::BatchContainer;
50
    /// A container for non-empty lists of values.
    ///
    /// All lists are stored back to back in `vals`; the list at position `i`
    /// occupies the half-open range from `offs[i]` to `offs[i + 1]`.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Vals<O, V> {
        /// Offsets used to provide indexes from keys to values.
        ///
        /// The constructors seed this with a leading `0`, so it holds one more entry than
        /// there are lists; `bounds` can then read positions `index` and `index + 1`
        /// without any special-case bounds logic.
        pub offs: O,
        /// Concatenated ordered lists of values, bracketed by offsets in `offs`.
        pub vals: V,
    }
61
62    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Default for Vals<O, V> {
63        fn default() -> Self { Self::with_capacity(0, 0) }
64    }
65
66    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Vals<O, V> {
67        /// Lower and upper bounds in `self.vals` of the indexed list.
68        #[inline(always)] pub fn bounds(&self, index: usize) -> (usize, usize) {
69            (self.offs.index(index), self.offs.index(index+1))
70        }
71        /// Retrieves a value using relative indexes.
72        ///
73        /// The first index identifies a list, and the second an item within the list.
74        /// The method adds the list's lower bound to the item index, and then calls
75        /// `get_abs`. Using absolute indexes within the list's bounds can be more
76        /// efficient than using relative indexing.
77        pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> V::ReadItem<'_> {
78            self.get_abs(self.bounds(list_idx).0 + item_idx)
79        }
80
81        /// Number of lists in the container.
82        pub fn len(&self) -> usize { self.offs.len() - 1 }
83        /// Retrieves a value using an absolute rather than relative index.
84        pub fn get_abs(&self, index: usize) -> V::ReadItem<'_> {
85            self.vals.index(index)
86        }
87        /// Allocates with capacities for a number of lists and values.
88        pub fn with_capacity(o_size: usize, v_size: usize) -> Self {
89            let mut offs = <O as BatchContainer>::with_capacity(o_size);
90            offs.push_ref(0);
91            Self {
92                offs,
93                vals: <V as BatchContainer>::with_capacity(v_size),
94            }
95        }
96        /// Allocates with enough capacity to contain two inputs.
97        pub fn merge_capacity(this: &Self, that: &Self) -> Self {
98            let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
99            offs.push_ref(0);
100            Self {
101                offs,
102                vals: <V as BatchContainer>::merge_capacity(&this.vals, &that.vals),
103            }
104        }
105    }
106
    /// A container for non-empty lists of updates.
    ///
    /// This container uses the special representation of an empty slice to stand in for
    /// "the previous single element". An empty slice is an otherwise invalid representation,
    /// because every list is non-empty; `bounds` decodes it by stepping the lower bound back by one.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Upds<O, T, D> {
        /// Offsets used to provide indexes from values to updates.
        ///
        /// The constructors seed this with a leading `0`, so it holds one more entry than
        /// there are lists.
        pub offs: O,
        /// Concatenated ordered lists of update times, bracketed by offsets in `offs`.
        pub times: T,
        /// Concatenated ordered lists of update diffs, bracketed by offsets in `offs`.
        ///
        /// Indexed in lockstep with `times`: position `i` of each forms one update.
        pub diffs: D,
    }
120
121    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Default for Upds<O, T, D> {
122        fn default() -> Self { Self::with_capacity(0, 0) }
123    }
124    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Upds<O, T, D> {
125        /// Lower and upper bounds in `self.times` and `self.diffs` of the indexed list.
126        pub fn bounds(&self, index: usize) -> (usize, usize) {
127            let mut lower = self.offs.index(index);
128            let upper = self.offs.index(index+1);
129            // We use equal lower and upper to encode "singleton update; just before here".
130            // It should only apply when there is a prior element, so `lower` should be greater than zero.
131            if lower == upper {
132                assert!(lower > 0);
133                lower -= 1;
134            }
135            (lower, upper)
136        }
137        /// Retrieves a value using relative indexes.
138        ///
139        /// The first index identifies a list, and the second an item within the list.
140        /// The method adds the list's lower bound to the item index, and then calls
141        /// `get_abs`. Using absolute indexes within the list's bounds can be more
142        /// efficient than using relative indexing.
143        pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
144            self.get_abs(self.bounds(list_idx).0 + item_idx)
145        }
146
147        /// Number of lists in the container.
148        pub fn len(&self) -> usize { self.offs.len() - 1 }
149        /// Retrieves a value using an absolute rather than relative index.
150        pub fn get_abs(&self, index: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
151            (self.times.index(index), self.diffs.index(index))
152        }
153        /// Allocates with capacities for a number of lists and values.
154        pub fn with_capacity(o_size: usize, u_size: usize) -> Self {
155            let mut offs = <O as BatchContainer>::with_capacity(o_size);
156            offs.push_ref(0);
157            Self {
158                offs,
159                times: <T as BatchContainer>::with_capacity(u_size),
160                diffs: <D as BatchContainer>::with_capacity(u_size),
161            }
162        }
163        /// Allocates with enough capacity to contain two inputs.
164        pub fn merge_capacity(this: &Self, that: &Self) -> Self {
165            let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
166            offs.push_ref(0);
167            Self {
168                offs,
169                times: <T as BatchContainer>::merge_capacity(&this.times, &that.times),
170                diffs: <D as BatchContainer>::merge_capacity(&this.diffs, &that.diffs),
171            }
172        }
173    }
174
    /// Helper type for constructing `Upds` containers.
    ///
    /// Updates for one list are staged with `push`, then consolidated and written
    /// into an `Upds` by `seal`.
    pub struct UpdsBuilder<T: BatchContainer, D: BatchContainer> {
        /// Local stash of updates, to use for consolidation.
        ///
        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
        /// A `ChangeBatch` itself needs an `i64` diff type, which we have not.
        stash: Vec<(T::Owned, D::Owned)>,
        /// Total number of consolidated updates.
        ///
        /// Tracked independently to account for duplicate compression: a sealed singleton
        /// that repeats the previous update is counted here but not stored again.
        total: usize,

        /// Time container to stage singleton times for evaluation.
        time_con: T,
        /// Diff container to stage singleton diffs for evaluation.
        diff_con: D,
    }
192
193    impl<T: BatchContainer, D: BatchContainer> Default for UpdsBuilder<T, D> {
194        fn default() -> Self { Self { stash: Vec::default(), total: 0, time_con: BatchContainer::with_capacity(1), diff_con: BatchContainer::with_capacity(1) } }
195    }
196
197
198    impl<T, D> UpdsBuilder<T, D>
199    where
200        T: BatchContainer<Owned: Ord>,
201        D: BatchContainer<Owned: crate::difference::Semigroup>,
202    {
203        /// Stages one update, but does not seal the set of updates.
204        pub fn push(&mut self, time: T::Owned, diff: D::Owned) {
205            self.stash.push((time, diff));
206        }
207
208        /// Consolidate and insert (if non-empty) the stashed updates.
209        ///
210        /// The return indicates whether the results were indeed non-empty.
211        pub fn seal<O: for<'a> BatchContainer<ReadItem<'a> = usize>>(&mut self, upds: &mut Upds<O, T, D>) -> bool {
212            use crate::consolidation;
213            consolidation::consolidate(&mut self.stash);
214            // If everything consolidates away, return false.
215            if self.stash.is_empty() { return false; }
216            // If there is a singleton, we may be able to optimize.
217            if self.stash.len() == 1 {
218                let (time, diff) = self.stash.last().unwrap();
219                self.time_con.clear(); self.time_con.push_own(time);
220                self.diff_con.clear(); self.diff_con.push_own(diff);
221                if upds.times.last() == self.time_con.get(0) && upds.diffs.last() == self.diff_con.get(0) {
222                    self.total += 1;
223                    self.stash.clear();
224                    upds.offs.push_ref(upds.times.len());
225                    return true;
226                }
227            }
228            // Conventional; move `stash` into `updates`.
229            self.total += self.stash.len();
230            for (time, diff) in self.stash.drain(..) {
231                upds.times.push_own(&time);
232                upds.diffs.push_own(&diff);
233            }
234            upds.offs.push_ref(upds.times.len());
235            true
236        }
237
238        /// Completes the building and returns the total updates sealed.
239        pub fn total(&self) -> usize { self.total }
240    }
241}
242
243/// Types related to forming batches with values.
244pub mod val_batch {
245
246    use std::marker::PhantomData;
247    use serde::{Deserialize, Serialize};
248    use timely::container::PushInto;
249    use timely::progress::{Antichain, frontier::AntichainRef};
250
251    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
252    use crate::trace::implementations::{BatchContainer, BuilderInput};
253    use crate::trace::implementations::layout;
254
255    use super::{Layout, Vals, Upds, layers::UpdsBuilder};
256
    /// The layered storage behind an `OrdValBatch`: keys, then values, then updates.
    ///
    /// Each key owns a list in `vals`, and each value owns a list in `upds`.
    #[derive(Debug, Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValStorage<L: Layout> {
        /// An ordered list of keys.
        pub keys: L::KeyContainer,
        /// For each key in `keys`, a list of values.
        pub vals: Vals<L::OffsetContainer, L::ValContainer>,
        /// For each val in `vals`, a list of (time, diff) updates.
        pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
    }
274
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValBatch<L: Layout> {
        /// The updates themselves.
        pub storage: OrdValStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<layout::Time<L>>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because due to the singleton optimization,
        /// we may have many more updates than are physically stored in `storage.upds`. It
        /// should equal the number of stored updates, plus the number of singleton
        /// optimizations employed.
        pub updates: usize,
    }
299
    // The batch's layout is exactly its `L` parameter.
    impl<L: Layout> WithLayout for OrdValBatch<L> {
        type Layout = L;
    }
303
304    impl<L: Layout> BatchReader for OrdValBatch<L> {
305
306        type Cursor = OrdValCursor<L>;
307        fn cursor(&self) -> Self::Cursor {
308            OrdValCursor {
309                key_cursor: 0,
310                val_cursor: 0,
311                phantom: PhantomData,
312            }
313        }
314        fn len(&self) -> usize {
315            // Normally this would be `self.updates.len()`, but we have a clever compact encoding.
316            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
317            self.updates
318        }
319        fn description(&self) -> &Description<layout::Time<L>> { &self.description }
320    }
321
322    impl<L: Layout> Batch for OrdValBatch<L> {
323        type Merger = OrdValMerger<L>;
324
325        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
326            OrdValMerger::new(self, other, compaction_frontier)
327        }
328
329        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
330            use timely::progress::Timestamp;
331            Self {
332                storage: OrdValStorage {
333                    keys: L::KeyContainer::with_capacity(0),
334                    vals: Default::default(),
335                    upds: Default::default(),
336                },
337                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
338                updates: 0,
339            }
340        }
341    }
342
    /// State for an in-progress merge.
    ///
    /// The merge advances in fuel-limited steps; the two cursors record how far each
    /// input batch has been consumed between calls to `work`.
    pub struct OrdValMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// The merged storage assembled so far.
        result: OrdValStorage<L>,
        /// Description of the merged batch; its `since` is the frontier times are advanced to.
        description: Description<layout::Time<L>>,
        /// Staging area to consolidate owned times and diffs, before sealing.
        ///
        /// Its running total also supplies the merged batch's update count.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
    }
356
357    impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
358    where
359        OrdValBatch<L>: Batch<Time=layout::Time<L>>,
360    {
361        fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {
362
363            assert!(batch1.upper() == batch2.lower());
364            use crate::lattice::Lattice;
365            let mut since = batch1.description().since().join(batch2.description().since());
366            since = since.join(&compaction_frontier.to_owned());
367
368            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
369
370            let batch1 = &batch1.storage;
371            let batch2 = &batch2.storage;
372
373            OrdValMerger {
374                key_cursor1: 0,
375                key_cursor2: 0,
376                result: OrdValStorage {
377                    keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
378                    vals: Vals::merge_capacity(&batch1.vals, &batch2.vals),
379                    upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
380                },
381                description,
382                staging: UpdsBuilder::default(),
383            }
384        }
385        fn done(self) -> OrdValBatch<L> {
386            OrdValBatch {
387                updates: self.staging.total(),
388                storage: self.result,
389                description: self.description,
390            }
391        }
392        fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
393
394            // An (incomplete) indication of the amount of work we've done so far.
395            let starting_updates = self.staging.total();
396            let mut effort = 0isize;
397
398            // While both mergees are still active, perform single-key merges.
399            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
400                self.merge_key(&source1.storage, &source2.storage);
401                // An (incomplete) accounting of the work we've done.
402                effort = (self.staging.total() - starting_updates) as isize;
403            }
404
405            // Merging is complete, and only copying remains.
406            // Key-by-key copying allows effort interruption, and compaction.
407            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
408                self.copy_key(&source1.storage, self.key_cursor1);
409                self.key_cursor1 += 1;
410                effort = (self.staging.total() - starting_updates) as isize;
411            }
412            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
413                self.copy_key(&source2.storage, self.key_cursor2);
414                self.key_cursor2 += 1;
415                effort = (self.staging.total() - starting_updates) as isize;
416            }
417
418            *fuel -= effort;
419        }
420    }
421
422    // Helper methods in support of merging batches.
423    impl<L: Layout> OrdValMerger<L> {
424        /// Copy the next key in `source`.
425        ///
426        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
427        /// If the result does not wholly cancel, they key will be present in `self` with the
428        /// compacted values and updates.
429        ///
430        /// The caller should be certain to update the cursor, as this method does not do this.
431        fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
432            // Capture the initial number of values to determine if the merge was ultimately non-empty.
433            let init_vals = self.result.vals.vals.len();
434            let (mut lower, upper) = source.vals.bounds(cursor);
435            while lower < upper {
436                self.stash_updates_for_val(source, lower);
437                if self.staging.seal(&mut self.result.upds) {
438                    self.result.vals.vals.push_ref(source.vals.get_abs(lower));
439                }
440                lower += 1;
441            }
442
443            // If we have pushed any values, copy the key as well.
444            if self.result.vals.vals.len() > init_vals {
445                self.result.keys.push_ref(source.keys.index(cursor));
446                self.result.vals.offs.push_ref(self.result.vals.vals.len());
447            }
448        }
449        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
450        ///
451        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
452        /// if the updates cancel either directly or after compaction.
453        fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
454            use ::std::cmp::Ordering;
455            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
456                Ordering::Less => {
457                    self.copy_key(source1, self.key_cursor1);
458                    self.key_cursor1 += 1;
459                },
460                Ordering::Equal => {
461                    // Keys are equal; must merge all values from both sources for this one key.
462                    let (lower1, upper1) = source1.vals.bounds(self.key_cursor1);
463                    let (lower2, upper2) = source2.vals.bounds(self.key_cursor2);
464                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
465                        self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
466                        self.result.vals.offs.push_ref(off);
467                    }
468                    // Increment cursors in either case; the keys are merged.
469                    self.key_cursor1 += 1;
470                    self.key_cursor2 += 1;
471                },
472                Ordering::Greater => {
473                    self.copy_key(source2, self.key_cursor2);
474                    self.key_cursor2 += 1;
475                },
476            }
477        }
478        /// Merge two ranges of values into `self`.
479        ///
480        /// If the compacted result contains values with non-empty updates, the function returns
481        /// an offset that should be recorded to indicate the upper extent of the result values.
482        fn merge_vals(
483            &mut self,
484            (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
485            (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
486        ) -> Option<usize> {
487            // Capture the initial number of values to determine if the merge was ultimately non-empty.
488            let init_vals = self.result.vals.vals.len();
489            while lower1 < upper1 && lower2 < upper2 {
490                // We compare values, and fold in updates for the lowest values;
491                // if they are non-empty post-consolidation, we write the value.
492                // We could multi-way merge and it wouldn't be very complicated.
493                use ::std::cmp::Ordering;
494                match source1.vals.get_abs(lower1).cmp(&source2.vals.get_abs(lower2)) {
495                    Ordering::Less => {
496                        // Extend stash by updates, with logical compaction applied.
497                        self.stash_updates_for_val(source1, lower1);
498                        if self.staging.seal(&mut self.result.upds) {
499                            self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
500                        }
501                        lower1 += 1;
502                    },
503                    Ordering::Equal => {
504                        self.stash_updates_for_val(source1, lower1);
505                        self.stash_updates_for_val(source2, lower2);
506                        if self.staging.seal(&mut self.result.upds) {
507                            self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
508                        }
509                        lower1 += 1;
510                        lower2 += 1;
511                    },
512                    Ordering::Greater => {
513                        // Extend stash by updates, with logical compaction applied.
514                        self.stash_updates_for_val(source2, lower2);
515                        if self.staging.seal(&mut self.result.upds) {
516                            self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
517                        }
518                        lower2 += 1;
519                    },
520                }
521            }
522            // Merging is complete, but we may have remaining elements to push.
523            while lower1 < upper1 {
524                self.stash_updates_for_val(source1, lower1);
525                if self.staging.seal(&mut self.result.upds) {
526                    self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
527                }
528                lower1 += 1;
529            }
530            while lower2 < upper2 {
531                self.stash_updates_for_val(source2, lower2);
532                if self.staging.seal(&mut self.result.upds) {
533                    self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
534                }
535                lower2 += 1;
536            }
537
538            // Values being pushed indicate non-emptiness.
539            if self.result.vals.vals.len() > init_vals {
540                Some(self.result.vals.vals.len())
541            } else {
542                None
543            }
544        }
545
546        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
547        fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
548            let (lower, upper) = source.upds.bounds(index);
549            for i in lower .. upper {
550                // NB: Here is where we would need to look back if `lower == upper`.
551                let (time, diff) = source.upds.get_abs(i);
552                use crate::lattice::Lattice;
553                let mut new_time: layout::Time<L> = L::TimeContainer::into_owned(time);
554                new_time.advance_by(self.description.since().borrow());
555                self.staging.push(new_time, L::DiffContainer::into_owned(diff));
556            }
557        }
558    }
559
    /// A cursor for navigating a single layer.
    ///
    /// The cursor holds only positions; the batch it navigates is passed to each method.
    pub struct OrdValCursor<L: Layout> {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// Absolute position of the current value.
        val_cursor: usize,
        /// Phantom marker for Rust happiness: ties the cursor to `L` without storing one.
        phantom: PhantomData<L>,
    }
569
570    use crate::trace::implementations::WithLayout;
    // The cursor's layout is exactly its `L` parameter.
    impl<L: Layout> WithLayout for OrdValCursor<L> {
        type Layout = L;
    }
574
575    impl<L: Layout> Cursor for OrdValCursor<L> {
576
577        type Storage = OrdValBatch<L>;
578
579        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
580        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
581
582        fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
583        fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.get_abs(self.val_cursor) }
584        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
585            let (lower, upper) = storage.storage.upds.bounds(self.val_cursor);
586            for index in lower .. upper {
587                let (time, diff) = storage.storage.upds.get_abs(index);
588                logic(time, diff);
589            }
590        }
591        fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
592        fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.vals.bounds(self.key_cursor).1 }
593        fn step_key(&mut self, storage: &OrdValBatch<L>){
594            self.key_cursor += 1;
595            if self.key_valid(storage) {
596                self.rewind_vals(storage);
597            }
598            else {
599                self.key_cursor = storage.storage.keys.len();
600            }
601        }
602        fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
603            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
604            if self.key_valid(storage) {
605                self.rewind_vals(storage);
606            }
607        }
608        fn step_val(&mut self, storage: &OrdValBatch<L>) {
609            self.val_cursor += 1;
610            if !self.val_valid(storage) {
611                self.val_cursor = storage.storage.vals.bounds(self.key_cursor).1;
612            }
613        }
614        fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
615            self.val_cursor += storage.storage.vals.vals.advance(self.val_cursor, storage.storage.vals.bounds(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
616        }
617        fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
618            self.key_cursor = 0;
619            if self.key_valid(storage) {
620                self.rewind_vals(storage)
621            }
622        }
623        fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
624            self.val_cursor = storage.storage.vals.bounds(self.key_cursor).0;
625        }
626    }
627
    /// A builder that assembles an `OrdValBatch` from chunks of update tuples.
    ///
    /// NOTE(review): the visible part of `push` compares each item only against the most
    /// recently pushed key and value, so input appears to be expected grouped by key and
    /// then by value — confirm against the callers.
    pub struct OrdValBuilder<L: Layout, CI> {
        /// The in-progress result.
        ///
        /// This is public to allow container implementors to set and inspect their container.
        pub result: OrdValStorage<L>,
        /// Staging area for the updates of the value currently being assembled.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
        /// Marker tying the builder to its input chunk type `CI` without storing one.
        _marker: PhantomData<CI>,
    }
637
638    impl<L, CI> Builder for OrdValBuilder<L, CI>
639    where
640        L: for<'a> Layout<
641            KeyContainer: PushInto<CI::Key<'a>>,
642            ValContainer: PushInto<CI::Val<'a>>,
643        >,
644        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
645    {
646
647        type Input = CI;
648        type Time = layout::Time<L>;
649        type Output = OrdValBatch<L>;
650
651        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
652            Self {
653                result: OrdValStorage {
654                    keys: L::KeyContainer::with_capacity(keys),
655                    vals: Vals::with_capacity(keys + 1, vals),
656                    upds: Upds::with_capacity(vals + 1, upds),
657                },
658                staging: UpdsBuilder::default(),
659                _marker: PhantomData,
660            }
661        }
662
663        #[inline]
664        fn push(&mut self, chunk: &mut Self::Input) {
665            for item in chunk.drain() {
666                let (key, val, time, diff) = CI::into_parts(item);
667
668                // Pre-load the first update.
669                if self.result.keys.is_empty() {
670                    self.result.vals.vals.push_into(val);
671                    self.result.keys.push_into(key);
672                    self.staging.push(time, diff);
673                }
674                // Perhaps this is a continuation of an already received key.
675                else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
676                    // Perhaps this is a continuation of an already received value.
677                    if self.result.vals.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
678                        self.staging.push(time, diff);
679                    } else {
680                        // New value; complete representation of prior value.
681                        self.staging.seal(&mut self.result.upds);
682                        self.staging.push(time, diff);
683                        self.result.vals.vals.push_into(val);
684                    }
685                } else {
686                    // New key; complete representation of prior key.
687                    self.staging.seal(&mut self.result.upds);
688                    self.staging.push(time, diff);
689                    self.result.vals.offs.push_ref(self.result.vals.vals.len());
690                    self.result.vals.vals.push_into(val);
691                    self.result.keys.push_into(key);
692                }
693            }
694        }
695
696        #[inline(never)]
697        fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
698            self.staging.seal(&mut self.result.upds);
699            self.result.vals.offs.push_ref(self.result.vals.vals.len());
700            OrdValBatch {
701                updates: self.staging.total(),
702                storage: self.result,
703                description,
704            }
705        }
706
707        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
708            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
709            let mut builder = Self::with_capacity(keys, vals, upds);
710            for mut chunk in chain.drain(..) {
711                builder.push(&mut chunk);
712            }
713
714            builder.done(description)
715        }
716    }
717}
718
719/// Types related to forming batches of keys.
720pub mod key_batch {
721
722    use std::marker::PhantomData;
723    use serde::{Deserialize, Serialize};
724    use timely::container::PushInto;
725    use timely::progress::{Antichain, frontier::AntichainRef};
726
727    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
728    use crate::trace::implementations::{BatchContainer, BuilderInput};
729    use crate::trace::implementations::layout;
730
731    use super::{Layout, Upds, layers::UpdsBuilder};
732
    /// The key and update containers that back an `OrdKeyBatch`.
    ///
    /// The `L` parameter captures how the keys and updates should be laid out.
    #[derive(Debug, Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdKeyStorage<L: Layout> {
        /// An ordered list of keys; the key at each index owns the list of updates that
        /// `upds.bounds` reports for that same index.
        pub keys: L::KeyContainer,
        /// For each key in `keys`, a list of (time, diff) updates.
        pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
    }
747
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdKeyBatch<L: Layout> {
        /// The updates themselves.
        pub storage: OrdKeyStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<layout::Time<L>>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because, due to the singleton optimization,
        /// we may have many more updates than are physically stored in `storage.upds`. It should
        /// equal the stored count, plus the number of singleton optimizations employed.
        pub updates: usize,

        /// Single value to return if asked.
        ///
        /// A key-only batch stores no values; its cursor hands out the element at index 0
        /// of this container as the value of every key. Builders and mergers fill it via
        /// `create_value`.
        pub value: L::ValContainer,
    }
775
776    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> OrdKeyBatch<L> {
777        /// Creates a container with one value, to slot in to `self.value`.
778        pub fn create_value() -> L::ValContainer {
779            let mut value = L::ValContainer::with_capacity(1);
780            value.push_own(&Default::default());
781            value
782        }
783    }
784
    // Exposes the batch's layout parameter `L` through the `WithLayout` trait.
    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> WithLayout for OrdKeyBatch<L> {
        type Layout = L;
    }
788
789    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> BatchReader for OrdKeyBatch<L> {
790
791        type Cursor = OrdKeyCursor<L>;
792        fn cursor(&self) -> Self::Cursor {
793            OrdKeyCursor {
794                key_cursor: 0,
795                val_stepped: false,
796                phantom: std::marker::PhantomData,
797            }
798        }
799        fn len(&self) -> usize {
800            // Normally this would be `self.updates.len()`, but we have a clever compact encoding.
801            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
802            self.updates
803        }
804        fn description(&self) -> &Description<layout::Time<L>> { &self.description }
805    }
806
807    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Batch for OrdKeyBatch<L> {
808        type Merger = OrdKeyMerger<L>;
809
810        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
811            OrdKeyMerger::new(self, other, compaction_frontier)
812        }
813
814        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
815            use timely::progress::Timestamp;
816            Self {
817                storage: OrdKeyStorage {
818                    keys: L::KeyContainer::with_capacity(0),
819                    upds: Upds::default(),
820                },
821                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
822                updates: 0,
823                value: Self::create_value(),
824            }
825        }
826    }
827
    /// State for an in-progress merge.
    ///
    /// The two source batches are not stored here; they are passed to each call of
    /// `work`, and the cursors below record how far into each the merge has progressed.
    pub struct OrdKeyMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// The merged storage assembled so far.
        result: OrdKeyStorage<L>,
        /// Description of the batch being produced; its `since` frontier is also the
        /// frontier that update times are advanced by while merging.
        description: Description<layout::Time<L>>,

        /// Local stash of updates, to use for consolidation.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
    }
842
843    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
844    where
845        OrdKeyBatch<L>: Batch<Time=layout::Time<L>>,
846    {
847        fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {
848
849            assert!(batch1.upper() == batch2.lower());
850            use crate::lattice::Lattice;
851            let mut since = batch1.description().since().join(batch2.description().since());
852            since = since.join(&compaction_frontier.to_owned());
853
854            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
855
856            let batch1 = &batch1.storage;
857            let batch2 = &batch2.storage;
858
859            OrdKeyMerger {
860                key_cursor1: 0,
861                key_cursor2: 0,
862                result: OrdKeyStorage {
863                    keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
864                    upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
865                },
866                description,
867                staging: UpdsBuilder::default(),
868            }
869        }
870        fn done(self) -> OrdKeyBatch<L> {
871            OrdKeyBatch {
872                updates: self.staging.total(),
873                storage: self.result,
874                description: self.description,
875                value: OrdKeyBatch::<L>::create_value(),
876            }
877        }
878        fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
879
880            // An (incomplete) indication of the amount of work we've done so far.
881            let starting_updates = self.staging.total();
882            let mut effort = 0isize;
883
884            // While both mergees are still active, perform single-key merges.
885            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
886                self.merge_key(&source1.storage, &source2.storage);
887                // An (incomplete) accounting of the work we've done.
888                effort = (self.staging.total() - starting_updates) as isize;
889            }
890
891            // Merging is complete, and only copying remains.
892            // Key-by-key copying allows effort interruption, and compaction.
893            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
894                self.copy_key(&source1.storage, self.key_cursor1);
895                self.key_cursor1 += 1;
896                effort = (self.staging.total() - starting_updates) as isize;
897            }
898            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
899                self.copy_key(&source2.storage, self.key_cursor2);
900                self.key_cursor2 += 1;
901                effort = (self.staging.total() - starting_updates) as isize;
902            }
903
904            *fuel -= effort;
905        }
906    }
907
908    // Helper methods in support of merging batches.
909    impl<L: Layout> OrdKeyMerger<L> {
910        /// Copy the next key in `source`.
911        ///
912        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
913        /// If the result does not wholly cancel, they key will be present in `self` with the
914        /// compacted values and updates.
915        ///
916        /// The caller should be certain to update the cursor, as this method does not do this.
917        fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
918            self.stash_updates_for_key(source, cursor);
919            if self.staging.seal(&mut self.result.upds) {
920                self.result.keys.push_ref(source.keys.index(cursor));
921            }
922        }
923        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
924        ///
925        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
926        /// if the updates cancel either directly or after compaction.
927        fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
928            use ::std::cmp::Ordering;
929            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
930                Ordering::Less => {
931                    self.copy_key(source1, self.key_cursor1);
932                    self.key_cursor1 += 1;
933                },
934                Ordering::Equal => {
935                    // Keys are equal; must merge all updates from both sources for this one key.
936                    self.stash_updates_for_key(source1, self.key_cursor1);
937                    self.stash_updates_for_key(source2, self.key_cursor2);
938                    if self.staging.seal(&mut self.result.upds) {
939                        self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
940                    }
941                    // Increment cursors in either case; the keys are merged.
942                    self.key_cursor1 += 1;
943                    self.key_cursor2 += 1;
944                },
945                Ordering::Greater => {
946                    self.copy_key(source2, self.key_cursor2);
947                    self.key_cursor2 += 1;
948                },
949            }
950        }
951
952        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
953        fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
954            let (lower, upper) = source.upds.bounds(index);
955            for i in lower .. upper {
956                // NB: Here is where we would need to look back if `lower == upper`.
957                let (time, diff) = source.upds.get_abs(i);
958                use crate::lattice::Lattice;
959                let mut new_time = L::TimeContainer::into_owned(time);
960                new_time.advance_by(self.description.since().borrow());
961                self.staging.push(new_time, L::DiffContainer::into_owned(diff));
962            }
963        }
964    }
965
    /// A cursor for navigating a single layer.
    ///
    /// The cursor holds only positions; the `OrdKeyBatch` it navigates is passed to each
    /// method as `storage`.
    pub struct OrdKeyCursor<L: Layout> {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// If the value has been stepped for the key, there are no more values.
        ///
        /// Each key exposes exactly one value, so a single `step_val` exhausts it;
        /// `rewind_vals` clears the flag again.
        val_stepped: bool,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }
975
976    use crate::trace::implementations::WithLayout;
    // Exposes the cursor's layout parameter `L` through the `WithLayout` trait.
    impl<L: Layout<ValContainer: BatchContainer>> WithLayout for OrdKeyCursor<L> {
        type Layout = L;
    }
980
981    impl<L: for<'a> Layout<ValContainer: BatchContainer<Owned: Default>>> Cursor for OrdKeyCursor<L> {
982
983        type Storage = OrdKeyBatch<L>;
984
985        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
986        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
987
988        fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
989        fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { storage.value.index(0) }
990        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
991            let (lower, upper) = storage.storage.upds.bounds(self.key_cursor);
992            for index in lower .. upper {
993                let (time, diff) = storage.storage.upds.get_abs(index);
994                logic(time, diff);
995            }
996        }
997        fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
998        fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
999        fn step_key(&mut self, storage: &Self::Storage){
1000            self.key_cursor += 1;
1001            if self.key_valid(storage) {
1002                self.rewind_vals(storage);
1003            }
1004            else {
1005                self.key_cursor = storage.storage.keys.len();
1006            }
1007        }
1008        fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
1009            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
1010            if self.key_valid(storage) {
1011                self.rewind_vals(storage);
1012            }
1013        }
1014        fn step_val(&mut self, _storage: &Self::Storage) {
1015            self.val_stepped = true;
1016        }
1017        fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
1018        fn rewind_keys(&mut self, storage: &Self::Storage) {
1019            self.key_cursor = 0;
1020            if self.key_valid(storage) {
1021                self.rewind_vals(storage)
1022            }
1023        }
1024        fn rewind_vals(&mut self, _storage: &Self::Storage) {
1025            self.val_stepped = false;
1026        }
1027    }
1028
    /// A builder for creating layers from unsorted update tuples.
    ///
    /// `L` is the layout of the batch being built and `CI` is the type of the input
    /// chunks handed to `push`.
    ///
    /// NOTE(review): `push` only compares each incoming update against the most recently
    /// pushed key, so despite "unsorted" above the input presumably needs to arrive
    /// grouped by key -- confirm against callers.
    pub struct OrdKeyBuilder<L: Layout, CI> {
        /// The in-progress result.
        ///
        /// This is public to allow container implementors to set and inspect their container.
        pub result: OrdKeyStorage<L>,
        /// Updates staged for the key currently being assembled; sealed into `result.upds`
        /// whenever the key changes, and once more in `done`.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
        /// Records the input chunk type `CI` without storing a value of it.
        _marker: PhantomData<CI>,
    }
1038
1039    impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
1040    where
1041        L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>>,
1042        L: Layout<ValContainer: BatchContainer<Owned: Default>>,
1043        CI: BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
1044    {
1045
1046        type Input = CI;
1047        type Time = layout::Time<L>;
1048        type Output = OrdKeyBatch<L>;
1049
1050        fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
1051            Self {
1052                result: OrdKeyStorage {
1053                    keys: L::KeyContainer::with_capacity(keys),
1054                    upds: Upds::with_capacity(keys+1, upds),
1055                },
1056                staging: UpdsBuilder::default(),
1057                _marker: PhantomData,
1058            }
1059        }
1060
1061        #[inline]
1062        fn push(&mut self, chunk: &mut Self::Input) {
1063            for item in chunk.drain() {
1064                let (key, _val, time, diff) = CI::into_parts(item);
1065                if self.result.keys.is_empty() {
1066                    self.result.keys.push_into(key);
1067                    self.staging.push(time, diff);
1068                }
1069                // Perhaps this is a continuation of an already received key.
1070                else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
1071                    self.staging.push(time, diff);
1072                } else {
1073                    self.staging.seal(&mut self.result.upds);
1074                    self.staging.push(time, diff);
1075                    self.result.keys.push_into(key);
1076                }
1077            }
1078        }
1079
1080        #[inline(never)]
1081        fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
1082            self.staging.seal(&mut self.result.upds);
1083            OrdKeyBatch {
1084                updates: self.staging.total(),
1085                storage: self.result,
1086                description,
1087                value: OrdKeyBatch::<L>::create_value(),
1088            }
1089        }
1090
1091        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
1092            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
1093            let mut builder = Self::with_capacity(keys, vals, upds);
1094            for mut chunk in chain.drain(..) {
1095                builder.push(&mut chunk);
1096            }
1097
1098            builder.done(description)
1099        }
1100    }
1101
1102}