differential_dataflow/trace/implementations/
ord_neu.rs

1//! Trace and batch implementations based on sorted ranges.
2//!
3//! The types and type aliases in this module start with either
4//!
5//! * `OrdVal`: Collections whose data have the form `(key, val)` where `key` is ordered.
6//! * `OrdKey`: Collections whose data have the form `key` where `key` is ordered.
7//!
8//! Although `OrdVal` is more general than `OrdKey`, the latter has a simpler representation
9//! and should consume fewer resources (computation and memory) when it applies.
10
11use std::rc::Rc;
12
13use crate::trace::implementations::spine_fueled::Spine;
14use crate::trace::implementations::merge_batcher::MergeBatcher;
15use crate::trace::implementations::merge_batcher_col::ColumnatedMergeBatcher;
16use crate::trace::rc_blanket_impls::RcBuilder;
17
18use super::{Update, Layout, Vector, TStack, Preferred};
19
20pub use self::val_batch::{OrdValBatch, OrdValBuilder};
21pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
22
23/// A trace implementation using a spine of ordered lists.
24pub type OrdValSpine<K, V, T, R> = Spine<
25    Rc<OrdValBatch<Vector<((K,V),T,R)>>>,
26    MergeBatcher<K,V,T,R>,
27    RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>>>,
28>;
29// /// A trace implementation for empty values using a spine of ordered lists.
30// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
31
32/// A trace implementation backed by columnar storage.
33pub type ColValSpine<K, V, T, R> = Spine<
34    Rc<OrdValBatch<TStack<((K,V),T,R)>>>,
35    ColumnatedMergeBatcher<K,V,T,R>,
36    RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>>>,
37>;
38
39/// A trace implementation using a spine of ordered lists.
40pub type OrdKeySpine<K, T, R> = Spine<
41    Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>,
42    MergeBatcher<K,(),T,R>,
43    RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>>>,
44>;
45// /// A trace implementation for empty values using a spine of ordered lists.
46// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
47
48/// A trace implementation backed by columnar storage.
49pub type ColKeySpine<K, T, R> = Spine<
50    Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>,
51    ColumnatedMergeBatcher<K,(),T,R>,
52    RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>>>,
53>;
54
55/// A trace implementation backed by columnar storage.
56pub type PreferredSpine<K, V, T, R> = Spine<
57    Rc<OrdValBatch<Preferred<K,V,T,R>>>,
58    ColumnatedMergeBatcher<<K as ToOwned>::Owned,<V as ToOwned>::Owned,T,R>,
59    RcBuilder<OrdValBuilder<Preferred<K,V,T,R>>>,
60>;
61
62
63// /// A trace implementation backed by columnar storage.
64// pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
65
66mod val_batch {
67
68    use std::marker::PhantomData;
69    use abomonation_derive::Abomonation;
70    use timely::progress::{Antichain, frontier::AntichainRef};
71
72    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
73    use crate::trace::implementations::{BatchContainer, OffsetList};
74    use crate::trace::cursor::MyTrait;
75
76    use super::{Layout, Update};
77
78    /// An immutable collection of update tuples, from a contiguous interval of logical times.
79    #[derive(Abomonation, Debug)]
80    pub struct OrdValStorage<L: Layout> {
81        /// An ordered list of keys, corresponding to entries in `keys_offs`.
82        pub keys: L::KeyContainer,
83        /// Offsets used to provide indexes from keys to values.
84        ///
85        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
86        pub keys_offs: OffsetList,
87        /// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`.
88        pub vals: L::ValContainer,
89        /// Offsets used to provide indexes from values to updates.
90        ///
91        /// This list has a special representation that any empty range indicates the singleton
92        /// element just before the range, as if the start were decremented by one. The empty
93        /// range is otherwise an invalid representation, and we borrow it to compactly encode
94        /// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
95        ///
96        /// The length of this list is one longer than `vals`, so that we can avoid bounds logic.
97        pub vals_offs: OffsetList,
98        /// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`.
99        pub updates: L::UpdContainer,
100    }
101
102    impl<L: Layout> OrdValStorage<L> {
103        /// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
104        fn values_for_key(&self, index: usize) -> (usize, usize) {
105            (self.keys_offs.index(index), self.keys_offs.index(index+1))
106        }
107        /// Lower and upper bounds in `self.updates` corresponding to the value at `index`.
108        fn updates_for_value(&self, index: usize) -> (usize, usize) {
109            let mut lower = self.vals_offs.index(index);
110            let upper = self.vals_offs.index(index+1);
111            // We use equal lower and upper to encode "singleton update; just before here".
112            // It should only apply when there is a prior element, so `lower` should be greater than zero.
113            if lower == upper {
114                assert!(lower > 0);
115                lower -= 1;
116            }
117            (lower, upper)
118        }
119    }
120
121    /// An immutable collection of update tuples, from a contiguous interval of logical times.
122    ///
123    /// The `L` parameter captures how the updates should be laid out, and `C` determines which
124    /// merge batcher to select.
125    #[derive(Abomonation)]
126    pub struct OrdValBatch<L: Layout> {
127        /// The updates themselves.
128        pub storage: OrdValStorage<L>,
129        /// Description of the update times this layer represents.
130        pub description: Description<<L::Target as Update>::Time>,
131        /// The number of updates reflected in the batch.
132        ///
133        /// We track this separately from `storage` because due to the singleton optimization,
134        /// we may have many more updates than `storage.updates.len()`. It should equal that 
135        /// length, plus the number of singleton optimizations employed.
136        pub updates: usize,
137    }
138
139    impl<L: Layout> BatchReader for OrdValBatch<L> {
140        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
141        type KeyOwned = <L::Target as Update>::Key;
142        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
143        type ValOwned = <L::Target as Update>::Val;
144        type Time = <L::Target as Update>::Time;
145        type Diff = <L::Target as Update>::Diff;
146
147        type Cursor = OrdValCursor<L>;
148        fn cursor(&self) -> Self::Cursor { 
149            OrdValCursor {
150                key_cursor: 0,
151                val_cursor: 0,
152                phantom: std::marker::PhantomData,
153            }
154        }
155        fn len(&self) -> usize { 
156            // Normally this would be `self.updates.len()`, but we have a clever compact encoding.
157            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
158            self.updates
159        }
160        fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
161    }
162
163    impl<L: Layout> Batch for OrdValBatch<L> {
164        type Merger = OrdValMerger<L>;
165
166        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
167            OrdValMerger::new(self, other, compaction_frontier)
168        }
169    }
170
171    /// State for an in-progress merge.
172    pub struct OrdValMerger<L: Layout> {
173        /// Key position to merge next in the first batch.
174        key_cursor1: usize,
175        /// Key position to merge next in the second batch.
176        key_cursor2: usize,
177        /// result that we are currently assembling.
178        result: OrdValStorage<L>,
179        /// description
180        description: Description<<L::Target as Update>::Time>,
181
182        /// Local stash of updates, to use for consolidation.
183        ///
184        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
185        /// A `ChangeBatch` itself needs an `i64` diff type, which we have not.
186        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
187        /// Counts the number of singleton-optimized entries, that we may correctly count the updates.
188        singletons: usize,
189    }
190
191    impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
192    where
193        OrdValBatch<L>: Batch<Time=<L::Target as Update>::Time>
194    {
195        fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
196
197            assert!(batch1.upper() == batch2.lower());
198            use crate::lattice::Lattice;
199            let mut since = batch1.description().since().join(batch2.description().since());
200            since = since.join(&compaction_frontier.to_owned());
201
202            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
203
204            let batch1 = &batch1.storage;
205            let batch2 = &batch2.storage;
206
207            let mut storage = OrdValStorage {
208                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
209                keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
210                vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
211                vals_offs: OffsetList::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
212                updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates),
213            };
214
215            storage.keys_offs.push(0);
216            storage.vals_offs.push(0);
217
218            OrdValMerger {
219                key_cursor1: 0,
220                key_cursor2: 0,
221                result: storage,
222                description,
223                update_stash: Vec::new(),
224                singletons: 0,
225            }
226        }
227        fn done(self) -> OrdValBatch<L> {
228            OrdValBatch {
229                updates: self.result.updates.len() + self.singletons,
230                storage: self.result,
231                description: self.description,
232            }
233        }
234        fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
235
236            // An (incomplete) indication of the amount of work we've done so far.
237            let starting_updates = self.result.updates.len();
238            let mut effort = 0isize;
239
240            // While both mergees are still active, perform single-key merges.
241            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
242                self.merge_key(&source1.storage, &source2.storage);
243                // An (incomplete) accounting of the work we've done.
244                effort = (self.result.updates.len() - starting_updates) as isize;
245            }
246
247            // Merging is complete, and only copying remains. 
248            // Key-by-key copying allows effort interruption, and compaction.
249            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
250                self.copy_key(&source1.storage, self.key_cursor1);
251                self.key_cursor1 += 1;
252                effort = (self.result.updates.len() - starting_updates) as isize;
253            }
254            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
255                self.copy_key(&source2.storage, self.key_cursor2);
256                self.key_cursor2 += 1;
257                effort = (self.result.updates.len() - starting_updates) as isize;
258            }
259
260            *fuel -= effort;
261        }
262    }
263
264    // Helper methods in support of merging batches.
265    impl<L: Layout> OrdValMerger<L> {
266        /// Copy the next key in `source`.
267        ///
268        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
269        /// If the result does not wholly cancel, they key will be present in `self` with the
270        /// compacted values and updates.
271        ///
272        /// The caller should be certain to update the cursor, as this method does not do this.
273        fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
274            // Capture the initial number of values to determine if the merge was ultimately non-empty.
275            let init_vals = self.result.vals.len();
276            let (mut lower, upper) = source.values_for_key(cursor);
277            while lower < upper {
278                self.stash_updates_for_val(source, lower);
279                if let Some(off) = self.consolidate_updates() {
280                    self.result.vals_offs.push(off);
281                    self.result.vals.copy(source.vals.index(lower));
282                }
283                lower += 1;
284            }            
285
286            // If we have pushed any values, copy the key as well.
287            if self.result.vals.len() > init_vals {
288                self.result.keys.copy(source.keys.index(cursor));
289                self.result.keys_offs.push(self.result.vals.len());
290            }           
291        }
292        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
293        ///
294        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
295        /// if the updates cancel either directly or after compaction.
296        fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
297            use ::std::cmp::Ordering;
298            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
299                Ordering::Less => { 
300                    self.copy_key(source1, self.key_cursor1);
301                    self.key_cursor1 += 1;
302                },
303                Ordering::Equal => {
304                    // Keys are equal; must merge all values from both sources for this one key.
305                    let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
306                    let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
307                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
308                        self.result.keys.copy(source1.keys.index(self.key_cursor1));
309                        self.result.keys_offs.push(off);
310                    }
311                    // Increment cursors in either case; the keys are merged.
312                    self.key_cursor1 += 1;
313                    self.key_cursor2 += 1;
314                },
315                Ordering::Greater => {
316                    self.copy_key(source2, self.key_cursor2);
317                    self.key_cursor2 += 1;
318                },
319            }
320        }
321        /// Merge two ranges of values into `self`.
322        ///
323        /// If the compacted result contains values with non-empty updates, the function returns
324        /// an offset that should be recorded to indicate the upper extent of the result values.
325        fn merge_vals(
326            &mut self, 
327            (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize), 
328            (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
329        ) -> Option<usize> {
330            // Capture the initial number of values to determine if the merge was ultimately non-empty.
331            let init_vals = self.result.vals.len();
332            while lower1 < upper1 && lower2 < upper2 {
333                // We compare values, and fold in updates for the lowest values;
334                // if they are non-empty post-consolidation, we write the value.
335                // We could multi-way merge and it wouldn't be very complicated.
336                use ::std::cmp::Ordering;
337                match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
338                    Ordering::Less => { 
339                        // Extend stash by updates, with logical compaction applied.
340                        self.stash_updates_for_val(source1, lower1);
341                        if let Some(off) = self.consolidate_updates() {
342                            self.result.vals_offs.push(off);
343                            self.result.vals.copy(source1.vals.index(lower1));
344                        }
345                        lower1 += 1;
346                    },
347                    Ordering::Equal => {
348                        self.stash_updates_for_val(source1, lower1);
349                        self.stash_updates_for_val(source2, lower2);
350                        if let Some(off) = self.consolidate_updates() {
351                            self.result.vals_offs.push(off);
352                            self.result.vals.copy(source1.vals.index(lower1));
353                        }
354                        lower1 += 1;
355                        lower2 += 1;
356                    },
357                    Ordering::Greater => { 
358                        // Extend stash by updates, with logical compaction applied.
359                        self.stash_updates_for_val(source2, lower2);
360                        if let Some(off) = self.consolidate_updates() {
361                            self.result.vals_offs.push(off);
362                            self.result.vals.copy(source2.vals.index(lower2));
363                        }
364                        lower2 += 1;
365                    },
366                }
367            }
368            // Merging is complete, but we may have remaining elements to push.
369            while lower1 < upper1 {
370                self.stash_updates_for_val(source1, lower1);
371                if let Some(off) = self.consolidate_updates() {
372                    self.result.vals_offs.push(off);
373                    self.result.vals.copy(source1.vals.index(lower1));
374                }
375                lower1 += 1;
376            }
377            while lower2 < upper2 {
378                self.stash_updates_for_val(source2, lower2);
379                if let Some(off) = self.consolidate_updates() {
380                    self.result.vals_offs.push(off);
381                    self.result.vals.copy(source2.vals.index(lower2));
382                }
383                lower2 += 1;
384            }
385
386            // Values being pushed indicate non-emptiness.
387            if self.result.vals.len() > init_vals {
388                Some(self.result.vals.len())
389            } else {
390                None
391            }
392        }
393
394        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
395        fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
396            let (lower, upper) = source.updates_for_value(index);
397            for i in lower .. upper {
398                // NB: Here is where we would need to look back if `lower == upper`.
399                let (time, diff) = &source.updates.index(i).into_owned();
400                use crate::lattice::Lattice;
401                let mut new_time = time.clone();
402                new_time.advance_by(self.description.since().borrow());
403                self.update_stash.push((new_time, diff.clone()));
404            }
405        }
406
407        /// Consolidates `self.updates_stash` and produces the offset to record, if any.
408        fn consolidate_updates(&mut self) -> Option<usize> {
409            use crate::consolidation;
410            consolidation::consolidate(&mut self.update_stash);
411            if !self.update_stash.is_empty() {
412                // If there is a single element, equal to a just-prior recorded update,
413                // we push nothing and report an unincremented offset to encode this case.
414                if self.update_stash.len() == 1 && self.result.updates.last().map(|ud| self.update_stash.last().unwrap().equals(ud)).unwrap_or(false) {
415                        // Just clear out update_stash, as we won't drain it here.
416                    self.update_stash.clear();
417                    self.singletons += 1;
418                }
419                else {
420                    // Conventional; move `update_stash` into `updates`.
421                    for item in self.update_stash.drain(..) {
422                        self.result.updates.push(item);
423                    }
424                }
425                Some(self.result.updates.len())
426            } else {
427                None
428            }
429        }
430    }
431
432    /// A cursor for navigating a single layer.
433    pub struct OrdValCursor<L: Layout> {
434        /// Absolute position of the current key.
435        key_cursor: usize,
436        /// Absolute position of the current value.
437        val_cursor: usize,
438        /// Phantom marker for Rust happiness.
439        phantom: PhantomData<L>,
440    }
441
442    impl<L: Layout> Cursor for OrdValCursor<L> {
443
444        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
445        type KeyOwned = <L::Target as Update>::Key;
446        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
447        type ValOwned = <L::Target as Update>::Val;
448        type Time = <L::Target as Update>::Time;
449        type Diff = <L::Target as Update>::Diff;
450
451        type Storage = OrdValBatch<L>;
452
453        fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
454        fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
455        fn map_times<L2: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
456            let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
457            for index in lower .. upper {
458                let (time, diff) = &storage.storage.updates.index(index);
459                logic(time, diff);
460            }
461        }
462        fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
463        fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
464        fn step_key(&mut self, storage: &OrdValBatch<L>){
465            self.key_cursor += 1;
466            if self.key_valid(storage) {
467                self.rewind_vals(storage);
468            }
469            else {
470                self.key_cursor = storage.storage.keys.len();
471            }
472        }
473        fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
474            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(&key));
475            if self.key_valid(storage) {
476                self.rewind_vals(storage);
477            }
478        }
479        fn step_val(&mut self, storage: &OrdValBatch<L>) {
480            self.val_cursor += 1; 
481            if !self.val_valid(storage) {
482                self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
483            }
484        }
485        fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
486            self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(&val));
487        }
488        fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
489            self.key_cursor = 0;
490            if self.key_valid(storage) {
491                self.rewind_vals(storage)
492            }
493        }
494        fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
495            self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
496        }
497    }
498
499    /// A builder for creating layers from unsorted update tuples.
500    pub struct OrdValBuilder<L: Layout> {
501        result: OrdValStorage<L>,
502        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
503        /// Counts the number of singleton optimizations we performed.
504        ///
505        /// This number allows us to correctly gauge the total number of updates reflected in a batch,
506        /// even though `updates.len()` may be much shorter than this amount.
507        singletons: usize,
508    }
509
510    impl<L: Layout> OrdValBuilder<L> {
511        /// Pushes a single update, which may set `self.singleton` rather than push.
512        ///
513        /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`.
514        /// However, for "clever" reasons it does not do this. Instead, it looks for opportunities
515        /// to encode a singleton update with an "absert" update: repeating the most recent offset.
516        /// This otherwise invalid state encodes "look back one element".
517        ///
518        /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
519        /// previously pushed update exactly. In that case, we do not push the update into `updates`.
520        /// The update tuple is retained in `self.singleton` in case we see another update and need
521        /// to recover the singleton to push it into `updates` to join the second update.
522        fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
523            // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
524            if self.result.updates.last().map(|(t, d)| t == &time && d == &diff) == Some(true) {
525                assert!(self.singleton.is_none());
526                self.singleton = Some((time, diff));
527            }
528            else {
529                // If we have pushed a single element, we need to copy it out to meet this one.
530                if let Some(time_diff) = self.singleton.take() {
531                    self.result.updates.push(time_diff);
532                }
533                self.result.updates.push((time, diff));
534            }
535        }
536    }
537
538    impl<L: Layout> Builder for OrdValBuilder<L> {
539
540        type Item = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
541        type Time = <L::Target as Update>::Time;
542        type Output = OrdValBatch<L>;
543
544        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
545            // We don't introduce zero offsets as they will be introduced by the first `push` call.
546            Self { 
547                result: OrdValStorage {
548                    keys: L::KeyContainer::with_capacity(keys),
549                    keys_offs: OffsetList::with_capacity(keys + 1),
550                    vals: L::ValContainer::with_capacity(vals),
551                    vals_offs: OffsetList::with_capacity(vals + 1),
552                    updates: L::UpdContainer::with_capacity(upds),
553                },
554                singleton: None,
555                singletons: 0,
556            }
557        }
558
559        #[inline]
560        fn push(&mut self, ((key, val), time, diff): Self::Item) {
561
562            // Perhaps this is a continuation of an already received key.
563            if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
564                // Perhaps this is a continuation of an already received value.
565                if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) {
566                    self.push_update(time, diff);
567                } else {
568                    // New value; complete representation of prior value.
569                    self.result.vals_offs.push(self.result.updates.len());
570                    if self.singleton.take().is_some() { self.singletons += 1; }
571                    self.push_update(time, diff);
572                    self.result.vals.push(val);
573                }
574            } else {
575                // New key; complete representation of prior key.
576                self.result.vals_offs.push(self.result.updates.len());
577                if self.singleton.take().is_some() { self.singletons += 1; }
578                self.result.keys_offs.push(self.result.vals.len());
579                self.push_update(time, diff);
580                self.result.vals.push(val);
581                self.result.keys.push(key);
582            }
583        }
584
585        #[inline]
586        fn copy(&mut self, ((key, val), time, diff): &Self::Item) {
587
588            // Perhaps this is a continuation of an already received key.
589            if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
590                // Perhaps this is a continuation of an already received value.
591                if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) {
592                    // TODO: here we could look for repetition, and not push the update in that case.
593                    // More logic (and state) would be required to correctly wrangle this.
594                    self.push_update(time.clone(), diff.clone());
595                } else {
596                    // New value; complete representation of prior value.
597                    self.result.vals_offs.push(self.result.updates.len());
598                    // Remove any pending singleton, and if it was set increment our count.
599                    if self.singleton.take().is_some() { self.singletons += 1; }
600                    self.push_update(time.clone(), diff.clone());
601                    self.result.vals.copy_push(val);
602                }
603            } else {
604                // New key; complete representation of prior key.
605                self.result.vals_offs.push(self.result.updates.len());
606                // Remove any pending singleton, and if it was set increment our count.
607                if self.singleton.take().is_some() { self.singletons += 1; }
608                self.result.keys_offs.push(self.result.vals.len());
609                self.push_update(time.clone(), diff.clone());
610                self.result.vals.copy_push(val);
611                self.result.keys.copy_push(key);
612            }
613        }
614
615        #[inline(never)]
616        fn done(mut self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> OrdValBatch<L> {
617            // Record the final offsets
618            self.result.vals_offs.push(self.result.updates.len());
619            // Remove any pending singleton, and if it was set increment our count.
620            if self.singleton.take().is_some() { self.singletons += 1; }
621            self.result.keys_offs.push(self.result.vals.len());
622            OrdValBatch {
623                updates: self.result.updates.len() + self.singletons,
624                storage: self.result,
625                description: Description::new(lower, upper, since),
626            }
627        }
628    }
629
630}
631
632mod key_batch {
633
634    use std::marker::PhantomData;
635    use abomonation_derive::Abomonation;
636    use timely::progress::{Antichain, frontier::AntichainRef};
637
638    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
639    use crate::trace::implementations::{BatchContainer, OffsetList};
640    use crate::trace::cursor::MyTrait;
641
642    use super::{Layout, Update};
643
644    /// An immutable collection of update tuples, from a contiguous interval of logical times.
645    #[derive(Abomonation, Debug)]
646    pub struct OrdKeyStorage<L: Layout> {
647        /// An ordered list of keys, corresponding to entries in `keys_offs`.
648        pub keys: L::KeyContainer,
649        /// Offsets used to provide indexes from keys to updates.
650        ///
651        /// This list has a special representation that any empty range indicates the singleton
652        /// element just before the range, as if the start were decremented by one. The empty
653        /// range is otherwise an invalid representation, and we borrow it to compactly encode
654        /// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
655        ///
656        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
657        pub keys_offs: OffsetList,
658        /// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`.
659        pub updates: L::UpdContainer,
660    }
661
662    impl<L: Layout> OrdKeyStorage<L> {
663        /// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
664        fn updates_for_key(&self, index: usize) -> (usize, usize) {
665            let mut lower = self.keys_offs.index(index);
666            let upper = self.keys_offs.index(index+1);
667            // We use equal lower and upper to encode "singleton update; just before here".
668            // It should only apply when there is a prior element, so `lower` should be greater than zero.
669            if lower == upper {
670                assert!(lower > 0);
671                lower -= 1;
672            }
673            (lower, upper)
674        }
675    }
676
677    /// An immutable collection of update tuples, from a contiguous interval of logical times.
678    ///
679    /// The `L` parameter captures how the updates should be laid out, and `C` determines which
680    /// merge batcher to select.
681    #[derive(Abomonation)]
682    pub struct OrdKeyBatch<L: Layout> {
683        /// The updates themselves.
684        pub storage: OrdKeyStorage<L>,
685        /// Description of the update times this layer represents.
686        pub description: Description<<L::Target as Update>::Time>,
687        /// The number of updates reflected in the batch.
688        ///
689        /// We track this separately from `storage` because due to the singleton optimization,
690        /// we may have many more updates than `storage.updates.len()`. It should equal that
691        /// length, plus the number of singleton optimizations employed.
692        pub updates: usize,
693    }
694
695    impl<L: Layout> BatchReader for OrdKeyBatch<L> {
696        
697        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
698        type KeyOwned = <L::Target as Update>::Key;
699        type Val<'a> = &'a ();
700        type ValOwned = ();
701        type Time = <L::Target as Update>::Time;
702        type Diff = <L::Target as Update>::Diff;
703
704        type Cursor = OrdKeyCursor<L>;
705        fn cursor(&self) -> Self::Cursor {
706            OrdKeyCursor {
707                key_cursor: 0,
708                val_stepped: false,
709                phantom: std::marker::PhantomData,
710            }
711        }
712        fn len(&self) -> usize {
713            // Normally this would be `self.updates.len()`, but we have a clever compact encoding.
714            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
715            self.updates
716        }
717        fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
718    }
719
720    impl<L: Layout> Batch for OrdKeyBatch<L> {
721        type Merger = OrdKeyMerger<L>;
722
723        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
724            OrdKeyMerger::new(self, other, compaction_frontier)
725        }
726    }
727
728    /// State for an in-progress merge.
729    pub struct OrdKeyMerger<L: Layout> {
730        /// Key position to merge next in the first batch.
731        key_cursor1: usize,
732        /// Key position to merge next in the second batch.
733        key_cursor2: usize,
734        /// result that we are currently assembling.
735        result: OrdKeyStorage<L>,
736        /// description
737        description: Description<<L::Target as Update>::Time>,
738
739        /// Local stash of updates, to use for consolidation.
740        ///
741        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
742        /// A `ChangeBatch` itself needs an `i64` diff type, which we have not.
743        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
744        /// Counts the number of singleton-optimized entries, that we may correctly count the updates.
745        singletons: usize,
746    }
747
748    impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
749    where
750        OrdKeyBatch<L>: Batch<Time=<L::Target as Update>::Time>
751    {
752        fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
753
754            assert!(batch1.upper() == batch2.lower());
755            use crate::lattice::Lattice;
756            let mut since = batch1.description().since().join(batch2.description().since());
757            since = since.join(&compaction_frontier.to_owned());
758
759            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
760
761            let batch1 = &batch1.storage;
762            let batch2 = &batch2.storage;
763
764            let mut storage = OrdKeyStorage {
765                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
766                keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
767                updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates),
768            };
769
770            storage.keys_offs.push(0);
771
772            OrdKeyMerger {
773                key_cursor1: 0,
774                key_cursor2: 0,
775                result: storage,
776                description,
777                update_stash: Vec::new(),
778                singletons: 0,
779            }
780        }
781        fn done(self) -> OrdKeyBatch<L> {
782            OrdKeyBatch {
783                updates: self.result.updates.len() + self.singletons,
784                storage: self.result,
785                description: self.description,
786            }
787        }
788        fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
789
790            // An (incomplete) indication of the amount of work we've done so far.
791            let starting_updates = self.result.updates.len();
792            let mut effort = 0isize;
793
794            // While both mergees are still active, perform single-key merges.
795            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
796                self.merge_key(&source1.storage, &source2.storage);
797                // An (incomplete) accounting of the work we've done.
798                effort = (self.result.updates.len() - starting_updates) as isize;
799            }
800
801            // Merging is complete, and only copying remains.
802            // Key-by-key copying allows effort interruption, and compaction.
803            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
804                self.copy_key(&source1.storage, self.key_cursor1);
805                self.key_cursor1 += 1;
806                effort = (self.result.updates.len() - starting_updates) as isize;
807            }
808            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
809                self.copy_key(&source2.storage, self.key_cursor2);
810                self.key_cursor2 += 1;
811                effort = (self.result.updates.len() - starting_updates) as isize;
812            }
813
814            *fuel -= effort;
815        }
816    }
817
818    // Helper methods in support of merging batches.
819    impl<L: Layout> OrdKeyMerger<L> {
820        /// Copy the next key in `source`.
821        ///
822        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
823        /// If the result does not wholly cancel, they key will be present in `self` with the
824        /// compacted values and updates. 
825        /// 
826        /// The caller should be certain to update the cursor, as this method does not do this.
827        fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
828            self.stash_updates_for_key(source, cursor);
829            if let Some(off) = self.consolidate_updates() {
830                self.result.keys_offs.push(off);
831                self.result.keys.copy(source.keys.index(cursor));
832            }
833        }
834        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
835        ///
836        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
837        /// if the updates cancel either directly or after compaction.
838        fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
839            use ::std::cmp::Ordering;
840            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
841                Ordering::Less => { 
842                    self.copy_key(source1, self.key_cursor1);
843                    self.key_cursor1 += 1;
844                },
845                Ordering::Equal => {
846                    // Keys are equal; must merge all updates from both sources for this one key.
847                    self.stash_updates_for_key(source1, self.key_cursor1);
848                    self.stash_updates_for_key(source2, self.key_cursor2);
849                    if let Some(off) = self.consolidate_updates() {
850                        self.result.keys_offs.push(off);
851                        self.result.keys.copy(source1.keys.index(self.key_cursor1));
852                    }
853                    // Increment cursors in either case; the keys are merged.
854                    self.key_cursor1 += 1;
855                    self.key_cursor2 += 1;
856                },
857                Ordering::Greater => {
858                    self.copy_key(source2, self.key_cursor2);
859                    self.key_cursor2 += 1;
860                },
861            }
862        }
863
864        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
865        fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
866            let (lower, upper) = source.updates_for_key(index);
867            for i in lower .. upper {
868                // NB: Here is where we would need to look back if `lower == upper`.
869                let (time, diff) = &source.updates.index(i);
870                use crate::lattice::Lattice;
871                let mut new_time = time.clone();
872                new_time.advance_by(self.description.since().borrow());
873                self.update_stash.push((new_time, diff.clone()));
874            }
875        }
876
877        /// Consolidates `self.updates_stash` and produces the offset to record, if any.
878        fn consolidate_updates(&mut self) -> Option<usize> {
879            use crate::consolidation;
880            consolidation::consolidate(&mut self.update_stash);
881            if !self.update_stash.is_empty() {
882                // If there is a single element, equal to a just-prior recorded update,
883                // we push nothing and report an unincremented offset to encode this case.
884                if self.update_stash.len() == 1 && self.update_stash.last() == self.result.updates.last() {
885                    // Just clear out update_stash, as we won't drain it here.
886                    self.update_stash.clear();
887                    self.singletons += 1;
888                }
889                else {
890                    // Conventional; move `update_stash` into `updates`.
891                    for item in self.update_stash.drain(..) {
892                        self.result.updates.push(item);
893                    }
894                }
895                Some(self.result.updates.len())
896            } else {
897                None
898            }
899        }
900    }
901
902    /// A cursor for navigating a single layer.
903    pub struct OrdKeyCursor<L: Layout> {
904        /// Absolute position of the current key.
905        key_cursor: usize,
906        /// If the value has been stepped for the key, there are no more values.
907        val_stepped: bool,
908        /// Phantom marker for Rust happiness.
909        phantom: PhantomData<L>,
910    }
911
912    impl<L: Layout> Cursor for OrdKeyCursor<L> {
913
914        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
915        type KeyOwned = <L::Target as Update>::Key;
916        type Val<'a> = &'a ();
917        type ValOwned = ();
918        type Time = <L::Target as Update>::Time;
919        type Diff = <L::Target as Update>::Diff;
920
921        type Storage = OrdKeyBatch<L>;
922
923        fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
924        fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
925        fn map_times<L2: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Self::Storage, mut logic: L2) {
926            let (lower, upper) = storage.storage.updates_for_key(self.key_cursor);
927            for index in lower .. upper {
928                let (time, diff) = &storage.storage.updates.index(index);
929                logic(time, diff);
930            }
931        }
932        fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
933        fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
934        fn step_key(&mut self, storage: &Self::Storage){
935            self.key_cursor += 1;
936            if self.key_valid(storage) {
937                self.rewind_vals(storage);
938            }
939            else {
940                self.key_cursor = storage.storage.keys.len();
941            }
942        }
943        fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
944            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(&key));
945            if self.key_valid(storage) {
946                self.rewind_vals(storage);
947            }
948        }
949        fn step_val(&mut self, _storage: &Self::Storage) {
950            self.val_stepped = true;
951        }
952        fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
953        fn rewind_keys(&mut self, storage: &Self::Storage) {
954            self.key_cursor = 0;
955            if self.key_valid(storage) {
956                self.rewind_vals(storage)
957            }
958        }
959        fn rewind_vals(&mut self, _storage: &Self::Storage) {
960            self.val_stepped = false;
961        }
962    }
963
964    /// A builder for creating layers from unsorted update tuples.
965    pub struct OrdKeyBuilder<L: Layout> {
966        result: OrdKeyStorage<L>,
967        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
968        /// Counts the number of singleton optimizations we performed.
969        ///
970        /// This number allows us to correctly gauge the total number of updates reflected in a batch,
971        /// even though `updates.len()` may be much shorter than this amount.
972        singletons: usize,
973    }
974
975    impl<L: Layout> OrdKeyBuilder<L> {
976        /// Pushes a single update, which may set `self.singleton` rather than push.
977        ///
978        /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`.
979        /// However, for "clever" reasons it does not do this. Instead, it looks for opportunities
980        /// to encode a singleton update with an "absert" update: repeating the most recent offset.
981        /// This otherwise invalid state encodes "look back one element".
982        ///
983        /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
984        /// previously pushed update exactly. In that case, we do not push the update into `updates`.
985        /// The update tuple is retained in `self.singleton` in case we see another update and need
986        /// to recover the singleton to push it into `updates` to join the second update.
987        fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
988            // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
989            if self.result.updates.last().map(|(t, d)| t == &time && d == &diff) == Some(true) {
990                assert!(self.singleton.is_none());
991                self.singleton = Some((time, diff));
992            }
993            else {
994                // If we have pushed a single element, we need to copy it out to meet this one.
995                if let Some(time_diff) = self.singleton.take() {
996                    self.result.updates.push(time_diff);
997                }
998                self.result.updates.push((time, diff));
999            }
1000        }
1001    }
1002
1003    impl<L: Layout> Builder for OrdKeyBuilder<L> {
1004
1005        type Item = ((<L::Target as Update>::Key, ()), <L::Target as Update>::Time, <L::Target as Update>::Diff);
1006        type Time = <L::Target as Update>::Time;
1007        type Output = OrdKeyBatch<L>;
1008
1009        fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
1010            // We don't introduce zero offsets as they will be introduced by the first `push` call.
1011            Self { 
1012                result: OrdKeyStorage {
1013                    keys: L::KeyContainer::with_capacity(keys),
1014                    keys_offs: OffsetList::with_capacity(keys + 1),
1015                    updates: L::UpdContainer::with_capacity(upds),
1016                },
1017                singleton: None,
1018                singletons: 0,
1019            }
1020        }
1021
1022        #[inline]
1023        fn push(&mut self, ((key, ()), time, diff): Self::Item) {
1024
1025            // Perhaps this is a continuation of an already received key.
1026            if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
1027                self.push_update(time, diff);
1028            } else {
1029                // New key; complete representation of prior key.
1030                self.result.keys_offs.push(self.result.updates.len());
1031                // Remove any pending singleton, and if it was set increment our count.
1032                if self.singleton.take().is_some() { self.singletons += 1; }
1033                self.push_update(time, diff);
1034                self.result.keys.push(key);
1035            }
1036        }
1037
1038        #[inline]
1039        fn copy(&mut self, ((key, ()), time, diff): &Self::Item) {
1040
1041            // Perhaps this is a continuation of an already received key.
1042            if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
1043                self.push_update(time.clone(), diff.clone());
1044            } else {
1045                // New key; complete representation of prior key.
1046                self.result.keys_offs.push(self.result.updates.len());
1047                // Remove any pending singleton, and if it was set increment our count.
1048                if self.singleton.take().is_some() { self.singletons += 1; }
1049                self.push_update(time.clone(), diff.clone());
1050                self.result.keys.copy_push(key);
1051            }
1052        }
1053
1054        #[inline(never)]
1055        fn done(mut self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> OrdKeyBatch<L> {
1056            // Record the final offsets
1057            self.result.keys_offs.push(self.result.updates.len());
1058            // Remove any pending singleton, and if it was set increment our count.
1059            if self.singleton.take().is_some() { self.singletons += 1; }
1060            OrdKeyBatch {
1061                updates: self.result.updates.len() + self.singletons,
1062                storage: self.result,
1063                description: Description::new(lower, upper, since),
1064            }
1065        }
1066    }
1067
1068}