dbsp/time.rs
1//! # Logical time
2//!
3//! Every value produced by a DBSP operator is logically labelled by the time
4//! when it was produced. A logical time can be represented as an array of
5//! integers whose length is equal to the nesting depth of the circuit and whose
6//! `i`th element is the local time of the circuit at the `i`th nesting level.
//! A root circuit has a 1-dimensional logical clock, which starts at 0 and
//! increments by 1 at each clock tick; a nested circuit has a 2-dimensional
//! logical clock, and so on.
10//!
11//! The [`Timestamp`] trait captures common functionality of logical time
12//! representation.
13//!
14//! # Lossy times
15//!
16//! In practice, if a logical time needs to be stored explicitly (which is not
17//! always necessary), we use a more compact "lossy" representation of logical
18//! time that stores just enough timing information for operators that compute
19//! on the data.
20//!
21//! ## Example: Untimed data
22//!
23//! We use unit type `()` for untimed data, where we only track values and not
24//! the time when each value was generated.
25//!
26//! The [`integrate_trace`](`crate::circuit::Stream::integrate_trace`) operator
27//! computes the union of all Z-sets in its input stream generated during the
28//! current clock epoch, i.e., since the last
29//! [`clock_start`](`crate::circuit::operator_traits::Operator::clock_start`)
30//! invocation. Its consumers only need to know the current value of the
31//! sum and not when each update was produced. It therefore stores its output
//! in an untimed [`Trace`](`crate::trace::Trace`), i.e., a trace whose
//! timestamp type is `()`.
34//!
35//! # Comparing times
36//!
37//! Timestamps partially order logical time. Multidimensional timestamps, in
38//! particular, are not totally ordered: `(x1,x2) <= (y1,y2)` if and only if `x1
39//! <= y1 && x2 <= y2`, so that, e.g. `(1,2)` and `(2,1)` are not comparable.
40//! The [`PartialOrder`] trait that bounds `Timestamp` allows this logical time
41//! ordering to be separate from the ordinary [`PartialOrd`] used for, e.g.,
42//! sorting.
43
44mod antichain;
45mod product;
46
47use crate::{
48 DBData, Scope,
49 algebra::{Lattice, PartialOrder},
50 trace::Batch,
51};
52use rkyv::{Archive, Deserialize, Serialize};
53use size_of::SizeOf;
54use std::{fmt::Debug, hash::Hash};
55
56pub use antichain::{Antichain, AntichainRef};
57//pub use nested_ts32::NestedTimestamp32;
58pub use product::Product;
59
60/// Logical timestamp.
61///
62/// See [crate documentation](crate::time) for an overview of logical time in
63/// DBSP.
64// TODO: Conversion to/from the most general time representation (`[usize]`).
65// TODO: Model overflow by having `advance` return Option<Self>.
66pub trait Timestamp: DBData + PartialOrder + Lattice {
67 /// Nesting depth of the circuit running this clock.
68 ///
69 /// 0 - for the top-level clock, 1 - first-level nested circuit, etc.
70 const NESTING_DEPTH: usize;
71
72 type Nested: Timestamp;
73
74 /// A version of a batch type `B` with `Self` used as a timestamp.
75 ///
76 /// If `Self = ()`, then `TimedBatch<B>` is `B`.
77 /// Otherwise, `TimedBatch<B> = B::Timed<Self>`.
78 type TimedBatch<B: Batch<Time = ()>>: Batch<Key = B::Key, Val = B::Val, Time = Self, R = B::R>;
79
80 fn minimum() -> Self;
81
82 /// The value of the timestamp when the clock starts ticking.
83 ///
84 /// This is typically but not always equal to `Self::minimum`. For example,
85 /// we use 1-bit timestamp that starts at value 1 (current clock epoch),
86 /// value 0 (previous epochs) can only be obtained by calling
87 /// [`recede`](`Timestamp::recede`).
88 fn clock_start() -> Self {
89 Self::minimum()
90 }
91
92 /// Advance the timestamp by one clock tick.
93 ///
94 /// Advance `self` by one clock tick by incrementing the clock at the
95 /// specified nesting level by one, while resetting all clocks at deeper
96 /// nesting levels to `0`. `scope` identifies the nesting level of the
97 /// circuit whose clock is ticking. `0` refers to the innermost
98 /// circuit. `1` is its parent circuit, etc. Returns the most accurate
99 /// approximation of the new timestamp value supported by this time
100 /// representation.
101 ///
102 /// # Example
103 ///
104 /// Assume a two-dimensional time modeled as (parent time, child time)
105 /// tuple.
106 ///
107 /// ```ignore
108 /// assert_eq!((2, 3).advance(0), (2, 4));
109 /// assert_eq!((2, 3).advance(1), (3, 0));
110 /// ```
111 fn advance(&self, scope: Scope) -> Self;
112
113 /// Push the timestamp back by one clock tick.
114 ///
115 /// Push `self` back by one clock cycle by decrementing the clock at the
116 /// specified nesting level by one. `scope` identifies the nesting
117 /// level of the circuit whose clock is ticking. `0` refers to the
118 /// innermost circuit. `1` is its parent circuit, etc.
119 ///
120 /// # Panics
121 ///
122 /// Panics if the clock at the `scope` nesting level is equal to zero
123 /// (i.e., we are running the first clock cycle) and hence cannot be
124 /// decremented.
125 fn recede(&self, scope: Scope) -> Self {
126 self.checked_recede(scope).unwrap()
127 }
128
129 /// Like `recede`, but returns `None` if the clock at level `scope` is
130 /// equal to zero (i.e., we are running the first clock cycle) and hence
131 /// cannot be decremented.
132 fn checked_recede(&self, scope: Scope) -> Option<Self>;
133
134 /// Returns the first time stamp of the current clock epoch in `scope`.
135 fn epoch_start(&self, scope: Scope) -> Self;
136
137 /// Advance `self` to the end of the current clock epoch in `scope`.
138 fn epoch_end(&self, scope: Scope) -> Self;
139}
140
141/// Zero-dimensional clock that doesn't need to count ticks.
142///
143/// This type is only used to bootstrap the recursive definition of the
144/// `WithClock` trait. You can otherwise use `()` as the type for an empty
145/// timestamp.
146#[derive(
147 Clone, Debug, Hash, PartialOrd, Ord, PartialEq, Eq, SizeOf, Archive, Serialize, Deserialize,
148)]
149#[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd, Hash))]
150#[archive(compare(PartialEq, PartialOrd))]
151#[archive_attr(doc(hidden))]
152pub struct UnitTimestamp;
153
154impl Default for UnitTimestamp {
155 fn default() -> Self {
156 UnitTimestamp
157 }
158}
159
160impl PartialOrder for UnitTimestamp {
161 fn less_equal(&self, _other: &Self) -> bool {
162 true
163 }
164}
165
166impl Lattice for UnitTimestamp {
167 fn join(&self, _other: &Self) -> Self {
168 UnitTimestamp
169 }
170
171 fn meet(&self, _other: &Self) -> Self {
172 UnitTimestamp
173 }
174}
175
176impl Timestamp for UnitTimestamp {
177 const NESTING_DEPTH: usize = 0;
178
179 type Nested = ();
180
181 type TimedBatch<B: Batch<Time = ()>> = B::Timed<Self>;
182
183 fn minimum() -> Self {
184 UnitTimestamp
185 }
186 fn advance(&self, _scope: Scope) -> Self {
187 UnitTimestamp
188 }
189 fn recede(&self, _scope: Scope) -> Self {
190 UnitTimestamp
191 }
192 fn checked_recede(&self, _scope: Scope) -> Option<Self> {
193 None
194 }
195 fn epoch_start(&self, _scope: Scope) -> Self {
196 UnitTimestamp
197 }
198 fn epoch_end(&self, _scope: Scope) -> Self {
199 UnitTimestamp
200 }
201}
202
203impl Timestamp for () {
204 const NESTING_DEPTH: usize = 0;
205
206 type TimedBatch<B: Batch<Time = ()>> = B;
207
208 type Nested = Product<u32, u32>;
209
210 fn minimum() -> Self {}
211 fn advance(&self, _scope: Scope) -> Self {}
212 fn recede(&self, _scope: Scope) -> Self {}
213 fn checked_recede(&self, _scope: Scope) -> Option<Self> {
214 Some(())
215 }
216 fn epoch_start(&self, _scope: Scope) -> Self {}
217 fn epoch_end(&self, _scope: Scope) -> Self {}
218}
219
220impl Timestamp for u32 {
221 const NESTING_DEPTH: usize = 0;
222
223 type Nested = Product<u32, u32>;
224
225 type TimedBatch<B: Batch<Time = ()>> = B::Timed<Self>;
226
227 fn minimum() -> Self {
228 0
229 }
230 fn advance(&self, _scope: Scope) -> Self {
231 self + 1
232 }
233 fn recede(&self, _scope: Scope) -> Self {
234 self - 1
235 }
236 fn checked_recede(&self, _scope: Scope) -> Option<Self> {
237 self.checked_sub(1)
238 }
239 fn epoch_start(&self, _scope: Scope) -> Self {
240 0
241 }
242 fn epoch_end(&self, _scope: Scope) -> Self {
243 Self::MAX
244 }
245}