differential_dataflow/trace/mod.rs
//! Traits and data structures representing a collection trace.
//!
//! A collection trace is a set of updates of the form `(key, val, time, diff)`, which determine the contents
//! of a collection at given times by accumulating updates whose time field is less than or equal to the target time.
//!
//! The `Trace` trait describes those types and methods that a data structure must implement to be viewed as a
//! collection trace. This trait allows operator implementations to be generic with respect to the type of trace,
//! and allows various data structures to be interpretable as multiple different types of trace.
9
10pub mod cursor;
11pub mod description;
12pub mod implementations;
13pub mod wrappers;
14
15use timely::progress::{Antichain, frontier::AntichainRef};
16use timely::progress::Timestamp;
17
18use crate::logging::Logger;
19use crate::difference::Semigroup;
20use crate::IntoOwned;
21use crate::lattice::Lattice;
22pub use self::cursor::Cursor;
23pub use self::description::Description;
24
/// Shared, thread-safe callback deciding how much merge effort a trace should exert when no updates arrive.
///
/// The callback is handed a slice of `(level, count, length)` triples (see `Trace::set_exert_logic`) and
/// returns the number of updates' worth of work to perform, or `None` when no work is required.
pub type ExertionLogic = std::sync::Arc<dyn Fn(&[(usize, usize, usize)]) -> Option<usize> + Send + Sync>;
27
// Traces, batches, and cursors want the flexibility to appear as if they manage certain types of keys and
// values, while perhaps using other representations. I'm thinking mostly of wrappers around the keys
// and vals that change the `Ord` implementation, or stash hash codes, or the like.
//
// This complicates what requirements we make so that the trace is still usable by someone who knows only about
// the base key and value types. For example, the complex types should likely dereference to the simpler types,
// so that the user can make sense of the result as if they were given references to the simpler types. At the
// same time, the collection should be formable from base types (perhaps we need an `Into` or `From` constraint)
// and we should, somehow, be able to take a reference to the simple types to compare against the more complex
// types. This second one is also like an `Into` or `From` constraint, except that we start with a reference and
// really don't need anything more complex than a reference, but we can't form an owned copy of the complex type
// without cloning it.
//
// We could just start by cloning things, and worry about wrapping references later on.
42
43/// A trace whose contents may be read.
44///
45/// This is a restricted interface to the more general `Trace` trait, which extends this trait with further methods
46/// to update the contents of the trace. These methods are used to examine the contents, and to update the reader's
47/// capabilities (which may release restrictions on the mutations to the underlying trace and cause work to happen).
48pub trait TraceReader {
49
50 /// Key by which updates are indexed.
51 type Key<'a>: Copy + Clone + Ord;
52 /// Values associated with keys.
53 type Val<'a>: Copy + Clone;
54 /// Timestamps associated with updates
55 type Time: Timestamp + Lattice + Ord + Clone;
56 /// Borrowed form of timestamp.
57 type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>;
58 /// Owned form of update difference.
59 type Diff: Semigroup + 'static;
60 /// Borrowed form of update difference.
61 type DiffGat<'a> : Copy + IntoOwned<'a, Owned = Self::Diff>;
62
63 /// The type of an immutable collection of updates.
64 type Batch: for<'a> BatchReader<Key<'a> = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>+Clone+'static;
65
66 /// Storage type for `Self::Cursor`. Likely related to `Self::Batch`.
67 type Storage;
68
69 /// The type used to enumerate the collections contents.
70 type Cursor: for<'a> Cursor<Storage=Self::Storage, Key<'a> = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>;
71
72 /// Provides a cursor over updates contained in the trace.
73 fn cursor(&mut self) -> (Self::Cursor, Self::Storage) {
74 if let Some(cursor) = self.cursor_through(Antichain::new().borrow()) {
75 cursor
76 }
77 else {
78 panic!("unable to acquire complete cursor for trace; is it closed?");
79 }
80 }
81
82 /// Acquires a cursor to the restriction of the collection's contents to updates at times not greater or
83 /// equal to an element of `upper`.
84 ///
85 /// This method is expected to work if called with an `upper` that (i) was an observed bound in batches from
86 /// the trace, and (ii) the trace has not been advanced beyond `upper`. Practically, the implementation should
87 /// be expected to look for a "clean cut" using `upper`, and if it finds such a cut can return a cursor. This
88 /// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses.
89 fn cursor_through(&mut self, upper: AntichainRef<Self::Time>) -> Option<(Self::Cursor, Self::Storage)>;
90
91 /// Advances the frontier that constrains logical compaction.
92 ///
93 /// Logical compaction is the ability of the trace to change the times of the updates it contains.
94 /// Update times may be changed as long as their comparison to all query times beyond the logical compaction
95 /// frontier remains unchanged. Practically, this means that groups of timestamps not beyond the frontier can
96 /// be coalesced into fewer representative times.
97 ///
98 /// Logical compaction is important, as it allows the trace to forget historical distinctions between update
99 /// times, and maintain a compact memory footprint over an unbounded update history.
100 ///
101 /// By advancing the logical compaction frontier, the caller unblocks merging of otherwise equivalent updates,
102 /// but loses the ability to observe historical detail that is not beyond `frontier`.
103 ///
104 /// It is an error to call this method with a frontier not equal to or beyond the most recent arguments to
105 /// this method, or the initial value of `get_logical_compaction()` if this method has not yet been called.
106 fn set_logical_compaction(&mut self, frontier: AntichainRef<Self::Time>);
107
108 /// Deprecated form of `set_logical_compaction`.
109 #[deprecated(since = "0.11", note = "please use `set_logical_compaction`")]
110 fn advance_by(&mut self, frontier: AntichainRef<Self::Time>) {
111 self.set_logical_compaction(frontier);
112 }
113
114 /// Reports the logical compaction frontier.
115 ///
116 /// All update times beyond this frontier will be presented with their original times, and all update times
117 /// not beyond this frontier will present as a time that compares identically with all query times beyond
118 /// this frontier. Practically, update times not beyond this frontier should not be taken to be accurate as
119 /// presented, and should be used carefully, only in accumulation to times that are beyond the frontier.
120 fn get_logical_compaction(&mut self) -> AntichainRef<Self::Time>;
121
122 /// Deprecated form of `get_logical_compaction`.
123 #[deprecated(since = "0.11", note = "please use `get_logical_compaction`")]
124 fn advance_frontier(&mut self) -> AntichainRef<Self::Time> {
125 self.get_logical_compaction()
126 }
127
128 /// Advances the frontier that constrains physical compaction.
129 ///
130 /// Physical compaction is the ability of the trace to merge the batches of updates it maintains. Physical
131 /// compaction does not change the updates or their timestamps, although it is also the moment at which
132 /// logical compaction is most likely to happen.
133 ///
134 /// Physical compaction allows the trace to maintain a logarithmic number of batches of updates, which is
135 /// what allows the trace to provide efficient random access by keys and values.
136 ///
137 /// By advancing the physical compaction frontier, the caller unblocks the merging of batches of updates,
138 /// but loses the ability to create a cursor through any frontier not beyond `frontier`.
139 ///
140 /// It is an error to call this method with a frontier not equal to or beyond the most recent arguments to
141 /// this method, or the initial value of `get_physical_compaction()` if this method has not yet been called.
142 fn set_physical_compaction(&mut self, frontier: AntichainRef<Self::Time>);
143
144 /// Deprecated form of `set_physical_compaction`.
145 #[deprecated(since = "0.11", note = "please use `set_physical_compaction`")]
146 fn distinguish_since(&mut self, frontier: AntichainRef<Self::Time>) {
147 self.set_physical_compaction(frontier);
148 }
149
150 /// Reports the physical compaction frontier.
151 ///
152 /// All batches containing updates beyond this frontier will not be merged with other batches. This allows
153 /// the caller to create a cursor through any frontier beyond the physical compaction frontier, with the
154 /// `cursor_through()` method. This functionality is primarily of interest to the `join` operator, and any
155 /// other operators who need to take notice of the physical structure of update batches.
156 fn get_physical_compaction(&mut self) -> AntichainRef<Self::Time>;
157
158 /// Deprecated form of `get_physical_compaction`.
159 #[deprecated(since = "0.11", note = "please use `get_physical_compaction`")]
160 fn distinguish_frontier(&mut self) -> AntichainRef<Self::Time> {
161 self.get_physical_compaction()
162 }
163
164 /// Maps logic across the non-empty sequence of batches in the trace.
165 ///
166 /// This is currently used only to extract historical data to prime late-starting operators who want to reproduce
167 /// the stream of batches moving past the trace. It could also be a fine basis for a default implementation of the
168 /// cursor methods, as they (by default) just move through batches accumulating cursors into a cursor list.
169 fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F);
170
171 /// Reads the upper frontier of committed times.
172 ///
173 ///
174 #[inline]
175 fn read_upper(&mut self, target: &mut Antichain<Self::Time>) {
176 target.clear();
177 target.insert(<Self::Time as timely::progress::Timestamp>::minimum());
178 self.map_batches(|batch| {
179 target.clone_from(batch.upper());
180 });
181 }
182
183 /// Advances `upper` by any empty batches.
184 ///
185 /// An empty batch whose `batch.lower` bound equals the current
186 /// contents of `upper` will advance `upper` to `batch.upper`.
187 /// Taken across all batches, this should advance `upper` across
188 /// empty batch regions.
189 fn advance_upper(&mut self, upper: &mut Antichain<Self::Time>) {
190 self.map_batches(|batch| {
191 if batch.is_empty() && batch.lower() == upper {
192 upper.clone_from(batch.upper());
193 }
194 });
195 }
196
197}
198
199/// An append-only collection of `(key, val, time, diff)` tuples.
200///
201/// The trace must pretend to look like a collection of `(Key, Val, Time, isize)` tuples, but is permitted
202/// to introduce new types `KeyRef`, `ValRef`, and `TimeRef` which can be dereference to the types above.
203///
204/// The trace must be constructable from, and navigable by the `Key`, `Val`, `Time` types, but does not need
205/// to return them.
206pub trait Trace : TraceReader
207where <Self as TraceReader>::Batch: Batch {
208
209 /// Allocates a new empty trace.
210 fn new(
211 info: ::timely::dataflow::operators::generic::OperatorInfo,
212 logging: Option<crate::logging::Logger>,
213 activator: Option<timely::scheduling::activate::Activator>,
214 ) -> Self;
215
216 /// Exert merge effort, even without updates.
217 fn exert(&mut self);
218
219 /// Sets the logic for exertion in the absence of updates.
220 ///
221 /// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`,
222 /// indicating the level, the number of batches, and their total length in updates. It should return a number of
223 /// updates to perform, or `None` if no work is required.
224 fn set_exert_logic(&mut self, logic: ExertionLogic);
225
226 /// Introduces a batch of updates to the trace.
227 ///
228 /// Batches describe the time intervals they contain, and they should be added to the trace in contiguous
229 /// intervals. If a batch arrives with a lower bound that does not equal the upper bound of the most recent
230 /// addition, the trace will add an empty batch. It is an error to then try to populate that region of time.
231 ///
232 /// This restriction could be relaxed, especially if we discover ways in which batch interval order could
233 /// commute. For now, the trace should complain, to the extent that it cares about contiguous intervals.
234 fn insert(&mut self, batch: Self::Batch);
235
236 /// Introduces an empty batch concluding the trace.
237 ///
238 /// This method should be logically equivalent to introducing an empty batch whose lower frontier equals
239 /// the upper frontier of the most recently introduced batch, and whose upper frontier is empty.
240 fn close(&mut self);
241}
242
243/// A batch of updates whose contents may be read.
244///
245/// This is a restricted interface to batches of updates, which support the reading of the batch's contents,
246/// but do not expose ways to construct the batches. This trait is appropriate for views of the batch, and is
247/// especially useful for views derived from other sources in ways that prevent the construction of batches
248/// from the type of data in the view (for example, filtered views, or views with extended time coordinates).
249pub trait BatchReader
250where
251 Self: ::std::marker::Sized,
252{
253 /// Key by which updates are indexed.
254 type Key<'a>: Copy + Clone + Ord;
255 /// Values associated with keys.
256 type Val<'a>: Copy + Clone;
257 /// Timestamps associated with updates
258 type Time: Timestamp + Lattice + Ord + Clone;
259 /// Borrowed form of timestamp.
260 type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>;
261 /// Owned form of update difference.
262 type Diff: Semigroup + 'static;
263 /// Borrowed form of update difference.
264 type DiffGat<'a> : Copy + IntoOwned<'a, Owned = Self::Diff>;
265
266 /// The type used to enumerate the batch's contents.
267 type Cursor: for<'a> Cursor<Storage=Self, Key<'a> = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>;
268 /// Acquires a cursor to the batch's contents.
269 fn cursor(&self) -> Self::Cursor;
270 /// The number of updates in the batch.
271 fn len(&self) -> usize;
272 /// True if the batch is empty.
273 fn is_empty(&self) -> bool { self.len() == 0 }
274 /// Describes the times of the updates in the batch.
275 fn description(&self) -> &Description<Self::Time>;
276
277 /// All times in the batch are greater or equal to an element of `lower`.
278 fn lower(&self) -> &Antichain<Self::Time> { self.description().lower() }
279 /// All times in the batch are not greater or equal to any element of `upper`.
280 fn upper(&self) -> &Antichain<Self::Time> { self.description().upper() }
281}
282
283/// An immutable collection of updates.
284pub trait Batch : BatchReader where Self: ::std::marker::Sized {
285 /// A type used to progressively merge batches.
286 type Merger: Merger<Self>;
287
288 /// Initiates the merging of consecutive batches.
289 ///
290 /// The result of this method can be exercised to eventually produce the same result
291 /// that a call to `self.merge(other)` would produce, but it can be done in a measured
292 /// fashion. This can help to avoid latency spikes where a large merge needs to happen.
293 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<Self::Time>) -> Self::Merger {
294 Self::Merger::new(self, other, compaction_frontier)
295 }
296
297 /// Produce an empty batch over the indicated interval.
298 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self;
299}
300
301/// Functionality for collecting and batching updates.
302pub trait Batcher {
303 /// Type pushed into the batcher.
304 type Input;
305 /// Type produced by the batcher.
306 type Output;
307 /// Times at which batches are formed.
308 type Time: Timestamp;
309 /// Allocates a new empty batcher.
310 fn new(logger: Option<Logger>, operator_id: usize) -> Self;
311 /// Adds an unordered container of elements to the batcher.
312 fn push_container(&mut self, batch: &mut Self::Input);
313 /// Returns all updates not greater or equal to an element of `upper`.
314 fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output;
315 /// Returns the lower envelope of contained update times.
316 fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<Self::Time>;
317}
318
319/// Functionality for building batches from ordered update sequences.
320pub trait Builder: Sized {
321 /// Input item type.
322 type Input;
323 /// Timestamp type.
324 type Time: Timestamp;
325 /// Output batch type.
326 type Output;
327
328 /// Allocates an empty builder.
329 ///
330 /// Ideally we deprecate this and insist all non-trivial building happens via `with_capacity()`.
331 // #[deprecated]
332 fn new() -> Self { Self::with_capacity(0, 0, 0) }
333 /// Allocates an empty builder with capacity for the specified keys, values, and updates.
334 ///
335 /// They represent respectively the number of distinct `key`, `(key, val)`, and total updates.
336 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self;
337 /// Adds a chunk of elements to the batch.
338 ///
339 /// Adds all elements from `chunk` to the builder and leaves `chunk` in an undefined state.
340 fn push(&mut self, chunk: &mut Self::Input);
341 /// Completes building and returns the batch.
342 fn done(self, description: Description<Self::Time>) -> Self::Output;
343
344 /// Builds a batch from a chain of updates corresponding to the indicated lower and upper bounds.
345 ///
346 /// This method relies on the chain only containing updates greater or equal to the lower frontier,
347 /// and not greater or equal to the upper frontier, as encoded in the description. Chains must also
348 /// be sorted and consolidated.
349 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output;
350}
351
352/// Represents a merge in progress.
353pub trait Merger<Output: Batch> {
354 /// Creates a new merger to merge the supplied batches, optionally compacting
355 /// up to the supplied frontier.
356 fn new(source1: &Output, source2: &Output, compaction_frontier: AntichainRef<Output::Time>) -> Self;
357 /// Perform some amount of work, decrementing `fuel`.
358 ///
359 /// If `fuel` is non-zero after the call, the merging is complete and
360 /// one should call `done` to extract the merged results.
361 fn work(&mut self, source1: &Output, source2: &Output, fuel: &mut isize);
362 /// Extracts merged results.
363 ///
364 /// This method should only be called after `work` has been called and
365 /// has not brought `fuel` to zero. Otherwise, the merge is still in
366 /// progress.
367 fn done(self) -> Output;
368}
369
370
371/// Blanket implementations for reference counted batches.
372pub mod rc_blanket_impls {
373
374 use std::rc::Rc;
375
376 use timely::progress::{Antichain, frontier::AntichainRef};
377 use super::{Batch, BatchReader, Builder, Merger, Cursor, Description};
378
379 impl<B: BatchReader> BatchReader for Rc<B> {
380 type Key<'a> = B::Key<'a>;
381 type Val<'a> = B::Val<'a>;
382 type Time = B::Time;
383 type TimeGat<'a> = B::TimeGat<'a>;
384 type Diff = B::Diff;
385 type DiffGat<'a> = B::DiffGat<'a>;
386
387 /// The type used to enumerate the batch's contents.
388 type Cursor = RcBatchCursor<B::Cursor>;
389 /// Acquires a cursor to the batch's contents.
390 fn cursor(&self) -> Self::Cursor {
391 RcBatchCursor::new((**self).cursor())
392 }
393
394 /// The number of updates in the batch.
395 fn len(&self) -> usize { (**self).len() }
396 /// Describes the times of the updates in the batch.
397 fn description(&self) -> &Description<Self::Time> { (**self).description() }
398 }
399
400 /// Wrapper to provide cursor to nested scope.
401 pub struct RcBatchCursor<C> {
402 cursor: C,
403 }
404
405 impl<C> RcBatchCursor<C> {
406 fn new(cursor: C) -> Self {
407 RcBatchCursor {
408 cursor,
409 }
410 }
411 }
412
413 impl<C: Cursor> Cursor for RcBatchCursor<C> {
414
415 type Key<'a> = C::Key<'a>;
416 type Val<'a> = C::Val<'a>;
417 type Time = C::Time;
418 type TimeGat<'a> = C::TimeGat<'a>;
419 type Diff = C::Diff;
420 type DiffGat<'a> = C::DiffGat<'a>;
421
422 type Storage = Rc<C::Storage>;
423
424 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
425 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
426
427 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
428 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
429
430 #[inline]
431 fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
432 self.cursor.map_times(storage, logic)
433 }
434
435 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
436 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
437
438 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
439 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
440
441 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
442 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
443 }
444
445 /// An immutable collection of updates.
446 impl<B: Batch> Batch for Rc<B> {
447 type Merger = RcMerger<B>;
448 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
449 Rc::new(B::empty(lower, upper))
450 }
451 }
452
453 /// Wrapper type for building reference counted batches.
454 pub struct RcBuilder<B: Builder> { builder: B }
455
456 /// Functionality for building batches from ordered update sequences.
457 impl<B: Builder> Builder for RcBuilder<B> {
458 type Input = B::Input;
459 type Time = B::Time;
460 type Output = Rc<B::Output>;
461 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } }
462 fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) }
463 fn done(self, description: Description<Self::Time>) -> Rc<B::Output> { Rc::new(self.builder.done(description)) }
464 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
465 Rc::new(B::seal(chain, description))
466 }
467 }
468
469 /// Wrapper type for merging reference counted batches.
470 pub struct RcMerger<B:Batch> { merger: B::Merger }
471
472 /// Represents a merge in progress.
473 impl<B:Batch> Merger<Rc<B>> for RcMerger<B> {
474 fn new(source1: &Rc<B>, source2: &Rc<B>, compaction_frontier: AntichainRef<B::Time>) -> Self { RcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } }
475 fn work(&mut self, source1: &Rc<B>, source2: &Rc<B>, fuel: &mut isize) { self.merger.work(source1, source2, fuel) }
476 fn done(self) -> Rc<B> { Rc::new(self.merger.done()) }
477 }
478}