Skip to main content

dbsp/
time.rs

1//! # Logical time
2//!
3//! Every value produced by a DBSP operator is logically labelled by the time
4//! when it was produced.  A logical time can be represented as an array of
5//! integers whose length is equal to the nesting depth of the circuit and whose
6//! `i`th element is the local time of the circuit at the `i`th nesting level.
7//! A root circuit has a 1-dimensional logical clock, which starts at 0 and
8//! increments by 1 at each clock tick, a nested circuit has a 2-dimensional
9//! logical clock, and so on.
10//!
11//! The [`Timestamp`] trait captures common functionality of logical time
12//! representation.
13//!
14//! # Lossy times
15//!
16//! In practice, if a logical time needs to be stored explicitly (which is not
17//! always necessary), we use a more compact "lossy" representation of logical
18//! time that stores just enough timing information for operators that compute
19//! on the data.
20//!
21//! ## Example: Untimed data
22//!
23//! We use unit type `()` for untimed data, where we only track values and not
24//! the time when each value was generated.
25//!
26//! The [`integrate_trace`](`crate::circuit::Stream::integrate_trace`) operator
27//! computes the union of all Z-sets in its input stream generated during the
28//! current clock epoch, i.e., since the last
29//! [`clock_start`](`crate::circuit::operator_traits::Operator::clock_start`)
30//! invocation.  Its consumers only need to know the current value of the
31//! sum and not when each update was produced.  It therefore stores its output
//! in an untimed [`Trace`](crate::trace::Trace), i.e., a trace whose
33//! timestamp type is `()`.
34//!
35//! # Comparing times
36//!
//! Timestamps partially order logical time.  Multidimensional timestamps, in
//! particular, are not totally ordered: `(x1,x2) <= (y1,y2)` if and only if
//! `x1 <= y1 && x2 <= y2`, so, e.g., `(1,2)` and `(2,1)` are not comparable.
40//! The [`PartialOrder`] trait that bounds `Timestamp` allows this logical time
41//! ordering to be separate from the ordinary [`PartialOrd`] used for, e.g.,
42//! sorting.
43
44mod antichain;
45mod product;
46
47use crate::{
48    DBData, Scope,
49    algebra::{Lattice, PartialOrder},
50    trace::Batch,
51};
52use rkyv::{Archive, Deserialize, Serialize};
53use size_of::SizeOf;
54use std::{fmt::Debug, hash::Hash};
55
56pub use antichain::{Antichain, AntichainRef};
57//pub use nested_ts32::NestedTimestamp32;
58pub use product::Product;
59
60/// Logical timestamp.
61///
62/// See [crate documentation](crate::time) for an overview of logical time in
63/// DBSP.
64// TODO: Conversion to/from the most general time representation (`[usize]`).
65// TODO: Model overflow by having `advance` return Option<Self>.
66pub trait Timestamp: DBData + PartialOrder + Lattice {
67    /// Nesting depth of the circuit running this clock.
68    ///
69    /// 0 - for the top-level clock, 1 - first-level nested circuit, etc.
70    const NESTING_DEPTH: usize;
71
72    type Nested: Timestamp;
73
74    /// A version of a batch type `B` with `Self` used as a timestamp.
75    ///
76    /// If `Self = ()`, then `TimedBatch<B>` is `B`.
77    /// Otherwise, `TimedBatch<B> = B::Timed<Self>`.
78    type TimedBatch<B: Batch<Time = ()>>: Batch<Key = B::Key, Val = B::Val, Time = Self, R = B::R>;
79
80    fn minimum() -> Self;
81
82    /// The value of the timestamp when the clock starts ticking.
83    ///
84    /// This is typically but not always equal to `Self::minimum`.  For example,
85    /// we use 1-bit timestamp that starts at value 1 (current clock epoch),
86    /// value 0 (previous epochs) can only be obtained by calling
87    /// [`recede`](`Timestamp::recede`).
88    fn clock_start() -> Self {
89        Self::minimum()
90    }
91
92    /// Advance the timestamp by one clock tick.
93    ///
94    /// Advance `self` by one clock tick by incrementing the clock at the
95    /// specified nesting level by one, while resetting all clocks at deeper
96    /// nesting levels to `0`. `scope` identifies the nesting level of the
97    /// circuit whose clock is ticking.  `0` refers to the innermost
98    /// circuit.  `1` is its parent circuit, etc. Returns the most accurate
99    /// approximation of the new timestamp value supported by this time
100    /// representation.
101    ///
102    /// # Example
103    ///
104    /// Assume a two-dimensional time modeled as (parent time, child time)
105    /// tuple.
106    ///
107    /// ```ignore
108    /// assert_eq!((2, 3).advance(0), (2, 4));
109    /// assert_eq!((2, 3).advance(1), (3, 0));
110    /// ```
111    fn advance(&self, scope: Scope) -> Self;
112
113    /// Push the timestamp back by one clock tick.
114    ///
115    /// Push `self` back by one clock cycle by decrementing the clock at the
116    /// specified nesting level by one.  `scope` identifies the nesting
117    /// level of the circuit whose clock is ticking. `0` refers to the
118    /// innermost circuit.  `1` is its parent circuit, etc.
119    ///
120    /// # Panics
121    ///
122    /// Panics if the clock at the `scope` nesting level is equal to zero
123    /// (i.e., we are running the first clock cycle) and hence cannot be
124    /// decremented.
125    fn recede(&self, scope: Scope) -> Self {
126        self.checked_recede(scope).unwrap()
127    }
128
129    /// Like `recede`, but returns `None` if the clock at level `scope` is
130    /// equal to zero (i.e., we are running the first clock cycle) and hence
131    /// cannot be decremented.
132    fn checked_recede(&self, scope: Scope) -> Option<Self>;
133
134    /// Returns the first time stamp of the current clock epoch in `scope`.
135    fn epoch_start(&self, scope: Scope) -> Self;
136
137    /// Advance `self` to the end of the current clock epoch in `scope`.
138    fn epoch_end(&self, scope: Scope) -> Self;
139}
140
141/// Zero-dimensional clock that doesn't need to count ticks.
142///
143/// This type is only used to bootstrap the recursive definition of the
144/// `WithClock` trait.  You can otherwise use `()` as the type for an empty
145/// timestamp.
146#[derive(
147    Clone, Debug, Hash, PartialOrd, Ord, PartialEq, Eq, SizeOf, Archive, Serialize, Deserialize,
148)]
149#[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd, Hash))]
150#[archive(compare(PartialEq, PartialOrd))]
151#[archive_attr(doc(hidden))]
152pub struct UnitTimestamp;
153
154impl Default for UnitTimestamp {
155    fn default() -> Self {
156        UnitTimestamp
157    }
158}
159
160impl PartialOrder for UnitTimestamp {
161    fn less_equal(&self, _other: &Self) -> bool {
162        true
163    }
164}
165
166impl Lattice for UnitTimestamp {
167    fn join(&self, _other: &Self) -> Self {
168        UnitTimestamp
169    }
170
171    fn meet(&self, _other: &Self) -> Self {
172        UnitTimestamp
173    }
174}
175
176impl Timestamp for UnitTimestamp {
177    const NESTING_DEPTH: usize = 0;
178
179    type Nested = ();
180
181    type TimedBatch<B: Batch<Time = ()>> = B::Timed<Self>;
182
183    fn minimum() -> Self {
184        UnitTimestamp
185    }
186    fn advance(&self, _scope: Scope) -> Self {
187        UnitTimestamp
188    }
189    fn recede(&self, _scope: Scope) -> Self {
190        UnitTimestamp
191    }
192    fn checked_recede(&self, _scope: Scope) -> Option<Self> {
193        None
194    }
195    fn epoch_start(&self, _scope: Scope) -> Self {
196        UnitTimestamp
197    }
198    fn epoch_end(&self, _scope: Scope) -> Self {
199        UnitTimestamp
200    }
201}
202
203impl Timestamp for () {
204    const NESTING_DEPTH: usize = 0;
205
206    type TimedBatch<B: Batch<Time = ()>> = B;
207
208    type Nested = Product<u32, u32>;
209
210    fn minimum() -> Self {}
211    fn advance(&self, _scope: Scope) -> Self {}
212    fn recede(&self, _scope: Scope) -> Self {}
213    fn checked_recede(&self, _scope: Scope) -> Option<Self> {
214        Some(())
215    }
216    fn epoch_start(&self, _scope: Scope) -> Self {}
217    fn epoch_end(&self, _scope: Scope) -> Self {}
218}
219
220impl Timestamp for u32 {
221    const NESTING_DEPTH: usize = 0;
222
223    type Nested = Product<u32, u32>;
224
225    type TimedBatch<B: Batch<Time = ()>> = B::Timed<Self>;
226
227    fn minimum() -> Self {
228        0
229    }
230    fn advance(&self, _scope: Scope) -> Self {
231        self + 1
232    }
233    fn recede(&self, _scope: Scope) -> Self {
234        self - 1
235    }
236    fn checked_recede(&self, _scope: Scope) -> Option<Self> {
237        self.checked_sub(1)
238    }
239    fn epoch_start(&self, _scope: Scope) -> Self {
240        0
241    }
242    fn epoch_end(&self, _scope: Scope) -> Self {
243        Self::MAX
244    }
245}