Skip to main content

palimpsest_dataflow/trace/
mod.rs

1//! Traits and datastructures representing a collection trace.
2//!
3//! A collection trace is a set of updates of the form `(key, val, time, diff)`, which determine the contents
4//! of a collection at given times by accumulating updates whose time field is less or equal to the target field.
5//!
6//! The `Trace` trait describes those types and methods that a data structure must implement to be viewed as a
7//! collection trace. This trait allows operator implementations to be generic with respect to the type of trace,
8//! and allows various data structures to be interpretable as multiple different types of trace.
9
10pub mod cursor;
11pub mod description;
12pub mod implementations;
13pub mod wrappers;
14
15use timely::progress::Timestamp;
16use timely::progress::{frontier::AntichainRef, Antichain};
17
18pub use self::cursor::Cursor;
19pub use self::description::Description;
20use crate::logging::Logger;
21
22use crate::trace::implementations::LayoutExt;
23
/// Shared callback deciding how much effort a trace should exert even in the absence of updates.
///
/// The callback is handed a slice of `(usize, usize, usize)` triples and answers with an optional
/// `usize`; see `Trace::set_exert_logic` for how the triples and the result are interpreted.
pub type ExertionLogic =
    std::sync::Arc<dyn Fn(&[(usize, usize, usize)]) -> Option<usize> + Send + Sync>;
27
//  Traces, batches, and cursors want the flexibility to appear as if they manage certain types of keys and
//  values, while perhaps using other representations. I'm thinking mostly of wrappers around the keys and
//  vals that change the `Ord` implementation, or stash hash codes, or the like.
//
//  This complicates what requirements we make so that the trace is still usable by someone who knows only about
//  the base key and value types. For example, the complex types should likely dereference to the simpler types,
//  so that the user can make sense of the result as if they were given references to the simpler types. At the
//  same time, the collection should be formable from base types (perhaps we need an `Into` or `From` constraint),
//  and we should, somehow, be able to take a reference to the simple types to compare against the more complex
//  types. This second requirement is also like an `Into` or `From` constraint, except that we start with a
//  reference and really don't need anything more complex than a reference, but we can't form an owned copy of
//  the complex type without cloning it.
//
//  We could just start by cloning things, and worry about wrapping references later on.
42
43/// A trace whose contents may be read.
44///
45/// This is a restricted interface to the more general `Trace` trait, which extends this trait with further methods
46/// to update the contents of the trace. These methods are used to examine the contents, and to update the reader's
47/// capabilities (which may release restrictions on the mutations to the underlying trace and cause work to happen).
48pub trait TraceReader: LayoutExt {
49    /// The type of an immutable collection of updates.
50    type Batch: 'static
51        + Clone
52        + BatchReader
53        + WithLayout<Layout = Self::Layout>
54        + for<'a> LayoutExt<
55            Key<'a> = Self::Key<'a>,
56            KeyOwn = Self::KeyOwn,
57            Val<'a> = Self::Val<'a>,
58            ValOwn = Self::ValOwn,
59            Time = Self::Time,
60            TimeGat<'a> = Self::TimeGat<'a>,
61            Diff = Self::Diff,
62            DiffGat<'a> = Self::DiffGat<'a>,
63            KeyContainer = Self::KeyContainer,
64            ValContainer = Self::ValContainer,
65            TimeContainer = Self::TimeContainer,
66            DiffContainer = Self::DiffContainer,
67        >;
68
69    /// Storage type for `Self::Cursor`. Likely related to `Self::Batch`.
70    type Storage;
71
72    /// The type used to enumerate the collections contents.
73    type Cursor: Cursor<Storage = Self::Storage>
74        + WithLayout<Layout = Self::Layout>
75        + for<'a> LayoutExt<
76            Key<'a> = Self::Key<'a>,
77            KeyOwn = Self::KeyOwn,
78            Val<'a> = Self::Val<'a>,
79            ValOwn = Self::ValOwn,
80            Time = Self::Time,
81            TimeGat<'a> = Self::TimeGat<'a>,
82            Diff = Self::Diff,
83            DiffGat<'a> = Self::DiffGat<'a>,
84            KeyContainer = Self::KeyContainer,
85            ValContainer = Self::ValContainer,
86            TimeContainer = Self::TimeContainer,
87            DiffContainer = Self::DiffContainer,
88        >;
89
90    /// Provides a cursor over updates contained in the trace.
91    fn cursor(&mut self) -> (Self::Cursor, Self::Storage) {
92        if let Some(cursor) = self.cursor_through(Antichain::new().borrow()) {
93            cursor
94        } else {
95            panic!("unable to acquire complete cursor for trace; is it closed?");
96        }
97    }
98
99    /// Acquires a cursor to the restriction of the collection's contents to updates at times not greater or
100    /// equal to an element of `upper`.
101    ///
102    /// This method is expected to work if called with an `upper` that (i) was an observed bound in batches from
103    /// the trace, and (ii) the trace has not been advanced beyond `upper`. Practically, the implementation should
104    /// be expected to look for a "clean cut" using `upper`, and if it finds such a cut can return a cursor. This
105    /// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses.
106    fn cursor_through(
107        &mut self,
108        upper: AntichainRef<Self::Time>,
109    ) -> Option<(Self::Cursor, Self::Storage)>;
110
111    /// Advances the frontier that constrains logical compaction.
112    ///
113    /// Logical compaction is the ability of the trace to change the times of the updates it contains.
114    /// Update times may be changed as long as their comparison to all query times beyond the logical compaction
115    /// frontier remains unchanged. Practically, this means that groups of timestamps not beyond the frontier can
116    /// be coalesced into fewer representative times.
117    ///
118    /// Logical compaction is important, as it allows the trace to forget historical distinctions between update
119    /// times, and maintain a compact memory footprint over an unbounded update history.
120    ///
121    /// By advancing the logical compaction frontier, the caller unblocks merging of otherwise equivalent updates,
122    /// but loses the ability to observe historical detail that is not beyond `frontier`.
123    ///
124    /// It is an error to call this method with a frontier not equal to or beyond the most recent arguments to
125    /// this method, or the initial value of `get_logical_compaction()` if this method has not yet been called.
126    fn set_logical_compaction(&mut self, frontier: AntichainRef<Self::Time>);
127
128    /// Reports the logical compaction frontier.
129    ///
130    /// All update times beyond this frontier will be presented with their original times, and all update times
131    /// not beyond this frontier will present as a time that compares identically with all query times beyond
132    /// this frontier. Practically, update times not beyond this frontier should not be taken to be accurate as
133    /// presented, and should be used carefully, only in accumulation to times that are beyond the frontier.
134    fn get_logical_compaction(&mut self) -> AntichainRef<'_, Self::Time>;
135
136    /// Advances the frontier that constrains physical compaction.
137    ///
138    /// Physical compaction is the ability of the trace to merge the batches of updates it maintains. Physical
139    /// compaction does not change the updates or their timestamps, although it is also the moment at which
140    /// logical compaction is most likely to happen.
141    ///
142    /// Physical compaction allows the trace to maintain a logarithmic number of batches of updates, which is
143    /// what allows the trace to provide efficient random access by keys and values.
144    ///
145    /// By advancing the physical compaction frontier, the caller unblocks the merging of batches of updates,
146    /// but loses the ability to create a cursor through any frontier not beyond `frontier`.
147    ///
148    /// It is an error to call this method with a frontier not equal to or beyond the most recent arguments to
149    /// this method, or the initial value of `get_physical_compaction()` if this method has not yet been called.
150    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Self::Time>);
151
152    /// Reports the physical compaction frontier.
153    ///
154    /// All batches containing updates beyond this frontier will not be merged with other batches. This allows
155    /// the caller to create a cursor through any frontier beyond the physical compaction frontier, with the
156    /// `cursor_through()` method. This functionality is primarily of interest to the `join` operator, and any
157    /// other operators who need to take notice of the physical structure of update batches.
158    fn get_physical_compaction(&mut self) -> AntichainRef<'_, Self::Time>;
159
160    /// Maps logic across the non-empty sequence of batches in the trace.
161    ///
162    /// This is currently used only to extract historical data to prime late-starting operators who want to reproduce
163    /// the stream of batches moving past the trace. It could also be a fine basis for a default implementation of the
164    /// cursor methods, as they (by default) just move through batches accumulating cursors into a cursor list.
165    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F);
166
167    /// Reads the upper frontier of committed times.
168    ///
169    ///
170    #[inline]
171    fn read_upper(&mut self, target: &mut Antichain<Self::Time>) {
172        target.clear();
173        target.insert(<Self::Time as timely::progress::Timestamp>::minimum());
174        self.map_batches(|batch| {
175            target.clone_from(batch.upper());
176        });
177    }
178
179    /// Advances `upper` by any empty batches.
180    ///
181    /// An empty batch whose `batch.lower` bound equals the current
182    /// contents of `upper` will advance `upper` to `batch.upper`.
183    /// Taken across all batches, this should advance `upper` across
184    /// empty batch regions.
185    fn advance_upper(&mut self, upper: &mut Antichain<Self::Time>) {
186        self.map_batches(|batch| {
187            if batch.is_empty() && batch.lower() == upper {
188                upper.clone_from(batch.upper());
189            }
190        });
191    }
192}
193
194/// An append-only collection of `(key, val, time, diff)` tuples.
195///
196/// The trace must pretend to look like a collection of `(Key, Val, Time, isize)` tuples, but is permitted
197/// to introduce new types `KeyRef`, `ValRef`, and `TimeRef` which can be dereference to the types above.
198///
199/// The trace must be constructable from, and navigable by the `Key`, `Val`, `Time` types, but does not need
200/// to return them.
201pub trait Trace: TraceReader<Batch: Batch> {
202    /// Allocates a new empty trace.
203    fn new(
204        info: ::timely::dataflow::operators::generic::OperatorInfo,
205        logging: Option<crate::logging::Logger>,
206        activator: Option<timely::scheduling::activate::Activator>,
207    ) -> Self;
208
209    /// Exert merge effort, even without updates.
210    fn exert(&mut self);
211
212    /// Sets the logic for exertion in the absence of updates.
213    ///
214    /// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`,
215    /// indicating the level, the number of batches, and their total length in updates. It should return a number of
216    /// updates to perform, or `None` if no work is required.
217    fn set_exert_logic(&mut self, logic: ExertionLogic);
218
219    /// Introduces a batch of updates to the trace.
220    ///
221    /// Batches describe the time intervals they contain, and they should be added to the trace in contiguous
222    /// intervals. If a batch arrives with a lower bound that does not equal the upper bound of the most recent
223    /// addition, the trace will add an empty batch. It is an error to then try to populate that region of time.
224    ///
225    /// This restriction could be relaxed, especially if we discover ways in which batch interval order could
226    /// commute. For now, the trace should complain, to the extent that it cares about contiguous intervals.
227    fn insert(&mut self, batch: Self::Batch);
228
229    /// Introduces an empty batch concluding the trace.
230    ///
231    /// This method should be logically equivalent to introducing an empty batch whose lower frontier equals
232    /// the upper frontier of the most recently introduced batch, and whose upper frontier is empty.
233    fn close(&mut self);
234}
235
236use crate::trace::implementations::WithLayout;
237
238/// A batch of updates whose contents may be read.
239///
240/// This is a restricted interface to batches of updates, which support the reading of the batch's contents,
241/// but do not expose ways to construct the batches. This trait is appropriate for views of the batch, and is
242/// especially useful for views derived from other sources in ways that prevent the construction of batches
243/// from the type of data in the view (for example, filtered views, or views with extended time coordinates).
244pub trait BatchReader: LayoutExt + Sized {
245    /// The type used to enumerate the batch's contents.
246    type Cursor: Cursor<Storage = Self>
247        + WithLayout<Layout = Self::Layout>
248        + for<'a> LayoutExt<
249            Key<'a> = Self::Key<'a>,
250            KeyOwn = Self::KeyOwn,
251            Val<'a> = Self::Val<'a>,
252            ValOwn = Self::ValOwn,
253            Time = Self::Time,
254            TimeGat<'a> = Self::TimeGat<'a>,
255            Diff = Self::Diff,
256            DiffGat<'a> = Self::DiffGat<'a>,
257            KeyContainer = Self::KeyContainer,
258            ValContainer = Self::ValContainer,
259            TimeContainer = Self::TimeContainer,
260            DiffContainer = Self::DiffContainer,
261        >;
262
263    /// Acquires a cursor to the batch's contents.
264    fn cursor(&self) -> Self::Cursor;
265    /// The number of updates in the batch.
266    fn len(&self) -> usize;
267    /// True if the batch is empty.
268    fn is_empty(&self) -> bool {
269        self.len() == 0
270    }
271    /// Describes the times of the updates in the batch.
272    fn description(&self) -> &Description<Self::Time>;
273
274    /// All times in the batch are greater or equal to an element of `lower`.
275    fn lower(&self) -> &Antichain<Self::Time> {
276        self.description().lower()
277    }
278    /// All times in the batch are not greater or equal to any element of `upper`.
279    fn upper(&self) -> &Antichain<Self::Time> {
280        self.description().upper()
281    }
282}
283
284/// An immutable collection of updates.
285pub trait Batch: BatchReader + Sized {
286    /// A type used to progressively merge batches.
287    type Merger: Merger<Self>;
288
289    /// Initiates the merging of consecutive batches.
290    ///
291    /// The result of this method can be exercised to eventually produce the same result
292    /// that a call to `self.merge(other)` would produce, but it can be done in a measured
293    /// fashion. This can help to avoid latency spikes where a large merge needs to happen.
294    fn begin_merge(
295        &self,
296        other: &Self,
297        compaction_frontier: AntichainRef<Self::Time>,
298    ) -> Self::Merger {
299        Self::Merger::new(self, other, compaction_frontier)
300    }
301
302    /// Produce an empty batch over the indicated interval.
303    fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self;
304}
305
306/// Functionality for collecting and batching updates.
307pub trait Batcher {
308    /// Type pushed into the batcher.
309    type Input;
310    /// Type produced by the batcher.
311    type Output;
312    /// Times at which batches are formed.
313    type Time: Timestamp;
314    /// Allocates a new empty batcher.
315    fn new(logger: Option<Logger>, operator_id: usize) -> Self;
316    /// Adds an unordered container of elements to the batcher.
317    fn push_container(&mut self, batch: &mut Self::Input);
318    /// Returns all updates not greater or equal to an element of `upper`.
319    fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
320        &mut self,
321        upper: Antichain<Self::Time>,
322    ) -> B::Output;
323    /// Returns the lower envelope of contained update times.
324    fn frontier(&mut self) -> AntichainRef<'_, Self::Time>;
325}
326
327/// Functionality for building batches from ordered update sequences.
328pub trait Builder: Sized {
329    /// Input item type.
330    type Input;
331    /// Timestamp type.
332    type Time: Timestamp;
333    /// Output batch type.
334    type Output;
335
336    /// Allocates an empty builder.
337    ///
338    /// Ideally we deprecate this and insist all non-trivial building happens via `with_capacity()`.
339    // #[deprecated]
340    fn new() -> Self {
341        Self::with_capacity(0, 0, 0)
342    }
343    /// Allocates an empty builder with capacity for the specified keys, values, and updates.
344    ///
345    /// They represent respectively the number of distinct `key`, `(key, val)`, and total updates.
346    fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self;
347    /// Adds a chunk of elements to the batch.
348    ///
349    /// Adds all elements from `chunk` to the builder and leaves `chunk` in an undefined state.
350    fn push(&mut self, chunk: &mut Self::Input);
351    /// Completes building and returns the batch.
352    fn done(self, description: Description<Self::Time>) -> Self::Output;
353
354    /// Builds a batch from a chain of updates corresponding to the indicated lower and upper bounds.
355    ///
356    /// This method relies on the chain only containing updates greater or equal to the lower frontier,
357    /// and not greater or equal to the upper frontier, as encoded in the description. Chains must also
358    /// be sorted and consolidated.
359    fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output;
360}
361
362/// Represents a merge in progress.
363pub trait Merger<Output: Batch> {
364    /// Creates a new merger to merge the supplied batches, optionally compacting
365    /// up to the supplied frontier.
366    fn new(
367        source1: &Output,
368        source2: &Output,
369        compaction_frontier: AntichainRef<Output::Time>,
370    ) -> Self;
371    /// Perform some amount of work, decrementing `fuel`.
372    ///
373    /// If `fuel` is non-zero after the call, the merging is complete and
374    /// one should call `done` to extract the merged results.
375    fn work(&mut self, source1: &Output, source2: &Output, fuel: &mut isize);
376    /// Extracts merged results.
377    ///
378    /// This method should only be called after `work` has been called and
379    /// has not brought `fuel` to zero. Otherwise, the merge is still in
380    /// progress.
381    fn done(self) -> Output;
382}
383
384/// Blanket implementations for reference counted batches.
385pub mod rc_blanket_impls {
386
387    use std::rc::Rc;
388
389    use super::{Batch, BatchReader, Builder, Cursor, Description, Merger};
390    use timely::progress::{frontier::AntichainRef, Antichain};
391
392    impl<B: BatchReader> WithLayout for Rc<B> {
393        type Layout = B::Layout;
394    }
395
396    impl<B: BatchReader> BatchReader for Rc<B> {
397        /// The type used to enumerate the batch's contents.
398        type Cursor = RcBatchCursor<B::Cursor>;
399        /// Acquires a cursor to the batch's contents.
400        fn cursor(&self) -> Self::Cursor {
401            RcBatchCursor::new((**self).cursor())
402        }
403
404        /// The number of updates in the batch.
405        fn len(&self) -> usize {
406            (**self).len()
407        }
408        /// Describes the times of the updates in the batch.
409        fn description(&self) -> &Description<Self::Time> {
410            (**self).description()
411        }
412    }
413
414    /// Wrapper to provide cursor to nested scope.
415    pub struct RcBatchCursor<C> {
416        cursor: C,
417    }
418
419    use crate::trace::implementations::WithLayout;
420    impl<C: Cursor> WithLayout for RcBatchCursor<C> {
421        type Layout = C::Layout;
422    }
423
424    impl<C> RcBatchCursor<C> {
425        fn new(cursor: C) -> Self {
426            RcBatchCursor { cursor }
427        }
428    }
429
430    impl<C: Cursor> Cursor for RcBatchCursor<C> {
431        type Storage = Rc<C::Storage>;
432
433        #[inline]
434        fn key_valid(&self, storage: &Self::Storage) -> bool {
435            self.cursor.key_valid(storage)
436        }
437        #[inline]
438        fn val_valid(&self, storage: &Self::Storage) -> bool {
439            self.cursor.val_valid(storage)
440        }
441
442        #[inline]
443        fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
444            self.cursor.key(storage)
445        }
446        #[inline]
447        fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
448            self.cursor.val(storage)
449        }
450
451        #[inline]
452        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
453            self.cursor.get_key(storage)
454        }
455        #[inline]
456        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
457            self.cursor.get_val(storage)
458        }
459
460        #[inline]
461        fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(
462            &mut self,
463            storage: &Self::Storage,
464            logic: L,
465        ) {
466            self.cursor.map_times(storage, logic)
467        }
468
469        #[inline]
470        fn step_key(&mut self, storage: &Self::Storage) {
471            self.cursor.step_key(storage)
472        }
473        #[inline]
474        fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
475            self.cursor.seek_key(storage, key)
476        }
477
478        #[inline]
479        fn step_val(&mut self, storage: &Self::Storage) {
480            self.cursor.step_val(storage)
481        }
482        #[inline]
483        fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
484            self.cursor.seek_val(storage, val)
485        }
486
487        #[inline]
488        fn rewind_keys(&mut self, storage: &Self::Storage) {
489            self.cursor.rewind_keys(storage)
490        }
491        #[inline]
492        fn rewind_vals(&mut self, storage: &Self::Storage) {
493            self.cursor.rewind_vals(storage)
494        }
495    }
496
497    /// An immutable collection of updates.
498    impl<B: Batch> Batch for Rc<B> {
499        type Merger = RcMerger<B>;
500        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
501            Rc::new(B::empty(lower, upper))
502        }
503    }
504
505    /// Wrapper type for building reference counted batches.
506    pub struct RcBuilder<B: Builder> {
507        builder: B,
508    }
509
510    /// Functionality for building batches from ordered update sequences.
511    impl<B: Builder> Builder for RcBuilder<B> {
512        type Input = B::Input;
513        type Time = B::Time;
514        type Output = Rc<B::Output>;
515        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
516            RcBuilder {
517                builder: B::with_capacity(keys, vals, upds),
518            }
519        }
520        fn push(&mut self, input: &mut Self::Input) {
521            self.builder.push(input)
522        }
523        fn done(self, description: Description<Self::Time>) -> Rc<B::Output> {
524            Rc::new(self.builder.done(description))
525        }
526        fn seal(
527            chain: &mut Vec<Self::Input>,
528            description: Description<Self::Time>,
529        ) -> Self::Output {
530            Rc::new(B::seal(chain, description))
531        }
532    }
533
534    /// Wrapper type for merging reference counted batches.
535    pub struct RcMerger<B: Batch> {
536        merger: B::Merger,
537    }
538
539    /// Represents a merge in progress.
540    impl<B: Batch> Merger<Rc<B>> for RcMerger<B> {
541        fn new(
542            source1: &Rc<B>,
543            source2: &Rc<B>,
544            compaction_frontier: AntichainRef<B::Time>,
545        ) -> Self {
546            RcMerger {
547                merger: B::begin_merge(source1, source2, compaction_frontier),
548            }
549        }
550        fn work(&mut self, source1: &Rc<B>, source2: &Rc<B>, fuel: &mut isize) {
551            self.merger.work(source1, source2, fuel)
552        }
553        fn done(self) -> Rc<B> {
554            Rc::new(self.merger.done())
555        }
556    }
557}