Skip to main content

differential_dataflow/trace/
mod.rs

//! Traits and data structures representing a collection trace.
//!
//! A collection trace is a set of updates of the form `(key, val, time, diff)`, which determine the contents
//! of a collection at a given time by accumulating the updates whose time is less than or equal to that time.
//!
//! The `Trace` trait describes those types and methods that a data structure must implement to be viewed as a
//! collection trace. This trait allows operator implementations to be generic with respect to the type of trace,
//! and allows various data structures to be interpretable as multiple different types of trace.
9
10pub mod cursor;
11pub mod description;
12pub mod implementations;
13pub mod wrappers;
14
15use timely::container::PushInto;
16use timely::progress::{Antichain, frontier::AntichainRef};
17use timely::progress::Timestamp;
18
19use crate::logging::Logger;
20pub use self::cursor::Cursor;
21pub use self::description::Description;
22
23use crate::trace::implementations::LayoutExt;
24
/// Decides how much effort a trace should exert even when no updates arrive.
///
/// The callback is handed a slice of `(usize, usize, usize)` triples (see `Trace::set_exert_logic` for how
/// they are interpreted) and returns `Some(amount)` to request work, or `None` to request none. It lives
/// behind an `Arc` and is required to be `Send + Sync`.
pub type ExertionLogic =
    std::sync::Arc<dyn for<'a> Fn(&'a [(usize, usize, usize)]) -> Option<usize> + Send + Sync>;
27
// The traces, batches, and cursors want the flexibility to appear as if they manage certain types of keys and
// values, while perhaps using other representations. I'm thinking mostly of wrappers around the keys and vals
// that change the `Ord` implementation, or stash hash codes, or the like.
//
// This complicates what requirements we make so that the trace is still usable by someone who knows only about
// the base key and value types. For example, the complex types should likely dereference to the simpler types,
// so that the user can make sense of the result as if they were given references to the simpler types. At the
// same time, the collection should be formable from base types (perhaps we need an `Into` or `From` constraint)
// and we should, somehow, be able to take a reference to the simple types to compare against the more complex
// types. This second one is also like an `Into` or `From` constraint, except that we start with a reference and
// really don't need anything more complex than a reference, but we can't form an owned copy of the complex type
// without cloning it.
//
// We could just start by cloning things. Worry about wrapping references later on.
42
43/// A trace whose contents may be read.
44///
45/// This is a restricted interface to the more general `Trace` trait, which extends this trait with further methods
46/// to update the contents of the trace. These methods are used to examine the contents, and to update the reader's
47/// capabilities (which may release restrictions on the mutations to the underlying trace and cause work to happen).
48pub trait TraceReader : LayoutExt {
49
50    /// The type of an immutable collection of updates.
51    type Batch:
52        'static +
53        Clone +
54        BatchReader +
55        WithLayout<Layout = Self::Layout> +
56        for<'a> LayoutExt<
57            Key<'a> = Self::Key<'a>,
58            Val<'a> = Self::Val<'a>,
59            ValOwn = Self::ValOwn,
60            Time = Self::Time,
61            TimeGat<'a> = Self::TimeGat<'a>,
62            Diff = Self::Diff,
63            DiffGat<'a> = Self::DiffGat<'a>,
64            KeyContainer = Self::KeyContainer,
65            ValContainer = Self::ValContainer,
66            TimeContainer = Self::TimeContainer,
67            DiffContainer = Self::DiffContainer,
68        >;
69
70
71    /// Storage type for `Self::Cursor`. Likely related to `Self::Batch`.
72    type Storage;
73
74    /// The type used to enumerate the collections contents.
75    type Cursor:
76        Cursor<Storage=Self::Storage> +
77        WithLayout<Layout = Self::Layout> +
78        for<'a> LayoutExt<
79            Key<'a> = Self::Key<'a>,
80            Val<'a> = Self::Val<'a>,
81            ValOwn = Self::ValOwn,
82            Time = Self::Time,
83            TimeGat<'a> = Self::TimeGat<'a>,
84            Diff = Self::Diff,
85            DiffGat<'a> = Self::DiffGat<'a>,
86            KeyContainer = Self::KeyContainer,
87            ValContainer = Self::ValContainer,
88            TimeContainer = Self::TimeContainer,
89            DiffContainer = Self::DiffContainer,
90        >;
91
92
93    /// Provides a cursor over updates contained in the trace.
94    fn cursor(&mut self) -> (Self::Cursor, Self::Storage) {
95        if let Some(cursor) = self.cursor_through(Antichain::new().borrow()) {
96            cursor
97        }
98        else {
99            panic!("unable to acquire complete cursor for trace; is it closed?");
100        }
101    }
102
103    /// Acquires a cursor to the restriction of the collection's contents to updates at times not greater or
104    /// equal to an element of `upper`.
105    ///
106    /// This method is expected to work if called with an `upper` that (i) was an observed bound in batches from
107    /// the trace, and (ii) the trace has not been advanced beyond `upper`. Practically, the implementation should
108    /// be expected to look for a "clean cut" using `upper`, and if it finds such a cut can return a cursor. This
109    /// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses.
110    fn cursor_through(&mut self, upper: AntichainRef<Self::Time>) -> Option<(Self::Cursor, Self::Storage)>;
111
112    /// Advances the frontier that constrains logical compaction.
113    ///
114    /// Logical compaction is the ability of the trace to change the times of the updates it contains.
115    /// Update times may be changed as long as their comparison to all query times beyond the logical compaction
116    /// frontier remains unchanged. Practically, this means that groups of timestamps not beyond the frontier can
117    /// be coalesced into fewer representative times.
118    ///
119    /// Logical compaction is important, as it allows the trace to forget historical distinctions between update
120    /// times, and maintain a compact memory footprint over an unbounded update history.
121    ///
122    /// By advancing the logical compaction frontier, the caller unblocks merging of otherwise equivalent updates,
123    /// but loses the ability to observe historical detail that is not beyond `frontier`.
124    ///
125    /// It is an error to call this method with a frontier not equal to or beyond the most recent arguments to
126    /// this method, or the initial value of `get_logical_compaction()` if this method has not yet been called.
127    fn set_logical_compaction(&mut self, frontier: AntichainRef<Self::Time>);
128
129    /// Reports the logical compaction frontier.
130    ///
131    /// All update times beyond this frontier will be presented with their original times, and all update times
132    /// not beyond this frontier will present as a time that compares identically with all query times beyond
133    /// this frontier. Practically, update times not beyond this frontier should not be taken to be accurate as
134    /// presented, and should be used carefully, only in accumulation to times that are beyond the frontier.
135    fn get_logical_compaction(&mut self) -> AntichainRef<'_, Self::Time>;
136
137    /// Advances the frontier that constrains physical compaction.
138    ///
139    /// Physical compaction is the ability of the trace to merge the batches of updates it maintains. Physical
140    /// compaction does not change the updates or their timestamps, although it is also the moment at which
141    /// logical compaction is most likely to happen.
142    ///
143    /// Physical compaction allows the trace to maintain a logarithmic number of batches of updates, which is
144    /// what allows the trace to provide efficient random access by keys and values.
145    ///
146    /// By advancing the physical compaction frontier, the caller unblocks the merging of batches of updates,
147    /// but loses the ability to create a cursor through any frontier not beyond `frontier`.
148    ///
149    /// It is an error to call this method with a frontier not equal to or beyond the most recent arguments to
150    /// this method, or the initial value of `get_physical_compaction()` if this method has not yet been called.
151    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Self::Time>);
152
153    /// Reports the physical compaction frontier.
154    ///
155    /// All batches containing updates beyond this frontier will not be merged with other batches. This allows
156    /// the caller to create a cursor through any frontier beyond the physical compaction frontier, with the
157    /// `cursor_through()` method. This functionality is primarily of interest to the `join` operator, and any
158    /// other operators who need to take notice of the physical structure of update batches.
159    fn get_physical_compaction(&mut self) -> AntichainRef<'_, Self::Time>;
160
161    /// Maps logic across the non-empty sequence of batches in the trace.
162    ///
163    /// This is currently used only to extract historical data to prime late-starting operators who want to reproduce
164    /// the stream of batches moving past the trace. It could also be a fine basis for a default implementation of the
165    /// cursor methods, as they (by default) just move through batches accumulating cursors into a cursor list.
166    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F);
167
168    /// Reads the upper frontier of committed times.
169    ///
170    ///
171    #[inline]
172    fn read_upper(&mut self, target: &mut Antichain<Self::Time>) {
173        target.clear();
174        target.insert(<Self::Time as timely::progress::Timestamp>::minimum());
175        self.map_batches(|batch| {
176            target.clone_from(batch.upper());
177        });
178    }
179
180    /// Advances `upper` by any empty batches.
181    ///
182    /// An empty batch whose `batch.lower` bound equals the current
183    /// contents of `upper` will advance `upper` to `batch.upper`.
184    /// Taken across all batches, this should advance `upper` across
185    /// empty batch regions.
186    fn advance_upper(&mut self, upper: &mut Antichain<Self::Time>) {
187        self.map_batches(|batch| {
188            if batch.is_empty() && batch.lower() == upper {
189                upper.clone_from(batch.upper());
190            }
191        });
192    }
193
194}
195
196/// An append-only collection of `(key, val, time, diff)` tuples.
197///
198/// The trace must pretend to look like a collection of `(Key, Val, Time, isize)` tuples, but is permitted
199/// to introduce new types `KeyRef`, `ValRef`, and `TimeRef` which can be dereference to the types above.
200///
201/// The trace must be constructable from, and navigable by the `Key`, `Val`, `Time` types, but does not need
202/// to return them.
203pub trait Trace : TraceReader<Batch: Batch> {
204
205    /// Allocates a new empty trace.
206    fn new(
207        info: ::timely::dataflow::operators::generic::OperatorInfo,
208        logging: Option<crate::logging::Logger>,
209        activator: Option<timely::scheduling::activate::Activator>,
210    ) -> Self;
211
212    /// Exert merge effort, even without updates.
213    fn exert(&mut self);
214
215    /// Sets the logic for exertion in the absence of updates.
216    ///
217    /// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`,
218    /// indicating the level, the number of batches, and their total length in updates. It should return a number of
219    /// updates to perform, or `None` if no work is required.
220    fn set_exert_logic(&mut self, logic: ExertionLogic);
221
222    /// Introduces a batch of updates to the trace.
223    ///
224    /// Batches describe the time intervals they contain, and they should be added to the trace in contiguous
225    /// intervals. If a batch arrives with a lower bound that does not equal the upper bound of the most recent
226    /// addition, the trace will add an empty batch. It is an error to then try to populate that region of time.
227    ///
228    /// This restriction could be relaxed, especially if we discover ways in which batch interval order could
229    /// commute. For now, the trace should complain, to the extent that it cares about contiguous intervals.
230    fn insert(&mut self, batch: Self::Batch);
231
232    /// Introduces an empty batch concluding the trace.
233    ///
234    /// This method should be logically equivalent to introducing an empty batch whose lower frontier equals
235    /// the upper frontier of the most recently introduced batch, and whose upper frontier is empty.
236    fn close(&mut self);
237}
238
239use crate::trace::implementations::WithLayout;
240
241/// A batch of updates whose contents may be read.
242///
243/// This is a restricted interface to batches of updates, which support the reading of the batch's contents,
244/// but do not expose ways to construct the batches. This trait is appropriate for views of the batch, and is
245/// especially useful for views derived from other sources in ways that prevent the construction of batches
246/// from the type of data in the view (for example, filtered views, or views with extended time coordinates).
247pub trait BatchReader : LayoutExt + Sized {
248
249    /// The type used to enumerate the batch's contents.
250    type Cursor:
251        Cursor<Storage=Self> +
252        WithLayout<Layout = Self::Layout> +
253        for<'a> LayoutExt<
254            Key<'a> = Self::Key<'a>,
255            Val<'a> = Self::Val<'a>,
256            ValOwn = Self::ValOwn,
257            Time = Self::Time,
258            TimeGat<'a> = Self::TimeGat<'a>,
259            Diff = Self::Diff,
260            DiffGat<'a> = Self::DiffGat<'a>,
261            KeyContainer = Self::KeyContainer,
262            ValContainer = Self::ValContainer,
263            TimeContainer = Self::TimeContainer,
264            DiffContainer = Self::DiffContainer,
265        >;
266
267      /// Acquires a cursor to the batch's contents.
268    fn cursor(&self) -> Self::Cursor;
269    /// The number of updates in the batch.
270    fn len(&self) -> usize;
271    /// True if the batch is empty.
272    fn is_empty(&self) -> bool { self.len() == 0 }
273    /// Describes the times of the updates in the batch.
274    fn description(&self) -> &Description<Self::Time>;
275
276    /// All times in the batch are greater or equal to an element of `lower`.
277    fn lower(&self) -> &Antichain<Self::Time> { self.description().lower() }
278    /// All times in the batch are not greater or equal to any element of `upper`.
279    fn upper(&self) -> &Antichain<Self::Time> { self.description().upper() }
280}
281
282/// An immutable collection of updates.
283pub trait Batch : BatchReader + Sized {
284    /// A type used to progressively merge batches.
285    type Merger: Merger<Self>;
286
287    /// Initiates the merging of consecutive batches.
288    ///
289    /// The result of this method can be exercised to eventually produce the same result
290    /// that a call to `self.merge(other)` would produce, but it can be done in a measured
291    /// fashion. This can help to avoid latency spikes where a large merge needs to happen.
292    fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<Self::Time>) -> Self::Merger {
293        Self::Merger::new(self, other, compaction_frontier)
294    }
295
296    /// Produce an empty batch over the indicated interval.
297    fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self;
298}
299
300/// Functionality for collecting and batching updates.
301///
302/// Accepts containers of type `Output` via [`PushInto`] and produces output batches of the same
303/// type. Callers are responsible for converting raw input data into `Output` containers (e.g.
304/// using a chunker) before pushing into the batcher.
305pub trait Batcher: PushInto<Self::Output> {
306    /// Type produced by the batcher, and also the type it consumes.
307    type Output: Default;
308    /// Times at which batches are formed.
309    type Time: Timestamp;
310    /// Allocates a new empty batcher.
311    fn new(logger: Option<Logger>, operator_id: usize) -> Self;
312    /// Returns all updates not greater or equal to an element of `upper`, as a sorted and
313    /// consolidated chain together with the description that bounds them.
314    ///
315    /// The returned chain is suitable to hand directly to [`Builder::seal`].
316    fn seal(&mut self, upper: Antichain<Self::Time>) -> (Vec<Self::Output>, Description<Self::Time>);
317    /// Returns the lower envelope of contained update times.
318    fn frontier(&mut self) -> AntichainRef<'_, Self::Time>;
319}
320
321/// Functionality for building batches from ordered update sequences.
322pub trait Builder: Sized {
323    /// Input item type.
324    type Input;
325    /// Timestamp type.
326    type Time: Timestamp;
327    /// Output batch type.
328    type Output;
329
330    /// Allocates an empty builder.
331    ///
332    /// Ideally we deprecate this and insist all non-trivial building happens via `with_capacity()`.
333    // #[deprecated]
334    fn new() -> Self { Self::with_capacity(0, 0, 0) }
335    /// Allocates an empty builder with capacity for the specified keys, values, and updates.
336    ///
337    /// They represent respectively the number of distinct `key`, `(key, val)`, and total updates.
338    fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self;
339    /// Adds a chunk of elements to the batch.
340    ///
341    /// Adds all elements from `chunk` to the builder and leaves `chunk` in an undefined state.
342    fn push(&mut self, chunk: &mut Self::Input);
343    /// Completes building and returns the batch.
344    fn done(self, description: Description<Self::Time>) -> Self::Output;
345
346    /// Builds a batch from a chain of updates corresponding to the indicated lower and upper bounds.
347    ///
348    /// This method relies on the chain only containing updates greater or equal to the lower frontier,
349    /// and not greater or equal to the upper frontier, as encoded in the description. Chains must also
350    /// be sorted and consolidated.
351    fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output;
352}
353
354/// Represents a merge in progress.
355pub trait Merger<Output: Batch> {
356    /// Creates a new merger to merge the supplied batches, optionally compacting
357    /// up to the supplied frontier.
358    fn new(source1: &Output, source2: &Output, compaction_frontier: AntichainRef<Output::Time>) -> Self;
359    /// Perform some amount of work, decrementing `fuel`.
360    ///
361    /// If `fuel` is non-zero after the call, the merging is complete and
362    /// one should call `done` to extract the merged results.
363    fn work(&mut self, source1: &Output, source2: &Output, fuel: &mut isize);
364    /// Extracts merged results.
365    ///
366    /// This method should only be called after `work` has been called and
367    /// has not brought `fuel` to zero. Otherwise, the merge is still in
368    /// progress.
369    fn done(self) -> Output;
370}
371
372
373/// Blanket implementations for reference counted batches.
374pub mod rc_blanket_impls {
375
376    use std::rc::Rc;
377
378    use timely::progress::{Antichain, frontier::AntichainRef};
379    use super::{Batch, BatchReader, Builder, Merger, Cursor, Description};
380
381    impl<B: BatchReader> WithLayout for Rc<B> {
382        type Layout = B::Layout;
383    }
384
385    impl<B: BatchReader> BatchReader for Rc<B> {
386
387        /// The type used to enumerate the batch's contents.
388        type Cursor = RcBatchCursor<B::Cursor>;
389        /// Acquires a cursor to the batch's contents.
390        fn cursor(&self) -> Self::Cursor {
391            RcBatchCursor::new((**self).cursor())
392        }
393
394        /// The number of updates in the batch.
395        fn len(&self) -> usize { (**self).len() }
396        /// Describes the times of the updates in the batch.
397        fn description(&self) -> &Description<Self::Time> { (**self).description() }
398    }
399
400    /// Wrapper to provide cursor to nested scope.
401    pub struct RcBatchCursor<C> {
402        cursor: C,
403    }
404
405    use crate::trace::implementations::WithLayout;
406    impl<C: Cursor> WithLayout for RcBatchCursor<C> {
407        type Layout = C::Layout;
408    }
409
410    impl<C> RcBatchCursor<C> {
411        fn new(cursor: C) -> Self {
412            RcBatchCursor {
413                cursor,
414            }
415        }
416    }
417
418    impl<C: Cursor> Cursor for RcBatchCursor<C> {
419
420        type Storage = Rc<C::Storage>;
421
422        #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
423        #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
424
425        #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
426        #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
427
428        #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
429        #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }
430
431        #[inline]
432        fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
433            self.cursor.map_times(storage, logic)
434        }
435
436        #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
437        #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
438
439        #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
440        #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
441
442        #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
443        #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
444    }
445
446    /// An immutable collection of updates.
447    impl<B: Batch> Batch for Rc<B> {
448        type Merger = RcMerger<B>;
449        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
450            Rc::new(B::empty(lower, upper))
451        }
452    }
453
454    /// Wrapper type for building reference counted batches.
455    pub struct RcBuilder<B: Builder> { builder: B }
456
457    /// Functionality for building batches from ordered update sequences.
458    impl<B: Builder> Builder for RcBuilder<B> {
459        type Input = B::Input;
460        type Time = B::Time;
461        type Output = Rc<B::Output>;
462        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } }
463        fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) }
464        fn done(self, description: Description<Self::Time>) -> Rc<B::Output> { Rc::new(self.builder.done(description)) }
465        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
466            Rc::new(B::seal(chain, description))
467        }
468    }
469
470    /// Wrapper type for merging reference counted batches.
471    pub struct RcMerger<B:Batch> { merger: B::Merger }
472
473    /// Represents a merge in progress.
474    impl<B:Batch> Merger<Rc<B>> for RcMerger<B> {
475        fn new(source1: &Rc<B>, source2: &Rc<B>, compaction_frontier: AntichainRef<B::Time>) -> Self { RcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } }
476        fn work(&mut self, source1: &Rc<B>, source2: &Rc<B>, fuel: &mut isize) { self.merger.work(source1, source2, fuel) }
477        fn done(self) -> Rc<B> { Rc::new(self.merger.done()) }
478    }
479}