declarative_dataflow/timestamp/
mod.rs

1//! Various timestamp implementations.
2
3use std::time::Duration;
4
5pub mod altneu;
6pub mod pair;
7
8/// Possible timestamp types.
9///
10/// This enum captures the currently supported timestamp types, and is
11/// the least common denominator for the types of times moved around.
12#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
13pub enum Time {
14    /// Logical transaction time or sequence numbers.
15    TxId(u64),
16    /// Real time.
17    Real(Duration),
18    /// Bitemporal.
19    Bi(Duration, u64),
20}
21
22impl std::convert::From<Time> for u64 {
23    fn from(t: Time) -> u64 {
24        if let Time::TxId(time) = t {
25            time
26        } else {
27            panic!("Time {:?} can't be converted to u64", t);
28        }
29    }
30}
31
32impl std::convert::From<u64> for Time {
33    fn from(t: u64) -> Time {
34        Time::TxId(t)
35    }
36}
37
38impl std::convert::From<Time> for Duration {
39    fn from(t: Time) -> Duration {
40        if let Time::Real(time) = t {
41            time
42        } else {
43            panic!("Time {:?} can't be converted to Duration", t);
44        }
45    }
46}
47
48impl std::convert::From<Duration> for Time {
49    fn from(t: Duration) -> Time {
50        Time::Real(t)
51    }
52}
53
54impl std::convert::From<Time> for pair::Pair<Duration, u64> {
55    fn from(t: Time) -> Self {
56        if let Time::Bi(sys, event) = t {
57            Self::new(sys, event)
58        } else {
59            panic!("Time {:?} can't be converted to Pair", t);
60        }
61    }
62}
63
64impl std::convert::From<pair::Pair<Duration, u64>> for Time {
65    fn from(t: pair::Pair<Duration, u64>) -> Time {
66        Time::Bi(t.first, t.second)
67    }
68}
69
70/// Extension trait for timestamp types that can be safely re-wound to
71/// an earlier time. This is required for automatically advancing
72/// traces according to their configured slack.
73pub trait Rewind: std::convert::From<Time> {
74    /// Returns a new timestamp corresponding to self rewound by the
75    /// specified amount of slack. Calling rewind is always safe, in
76    /// that no invalid times will be returned.
77    ///
78    /// e.g. 0.rewind(10) -> 0
79    /// and Duration(0).rewind(Duration(1)) -> Duration(0)
80    fn rewind(&self, slack: Self) -> Self;
81}
82
83impl Rewind for u64 {
84    fn rewind(&self, slack: Self) -> Self {
85        match self.checked_sub(slack) {
86            None => *self,
87            Some(rewound) => rewound,
88        }
89    }
90}
91
92impl Rewind for Duration {
93    fn rewind(&self, slack: Self) -> Self {
94        match self.checked_sub(slack) {
95            None => *self,
96            Some(rewound) => rewound,
97        }
98    }
99}
100
101impl Rewind for pair::Pair<Duration, u64> {
102    fn rewind(&self, slack: Self) -> Self {
103        let first_rewound = self.first.rewind(slack.first);
104        let second_rewound = self.second.rewind(slack.second);
105
106        Self::new(first_rewound, second_rewound)
107    }
108}
109
/// Timestamps that can be rounded up to the bounds of an interval.
///
/// Rounding up makes timestamps coarser-grained, which in turn
/// delays results.
pub trait Coarsen {
    /// Produces the timestamp obtained by delaying `self` up to the
    /// next multiple of `window_size`.
    fn coarsen(&self, window_size: &Self) -> Self;
}
118
119impl Coarsen for u64 {
120    fn coarsen(&self, window_size: &Self) -> Self {
121        (self / window_size + 1) * window_size
122    }
123}
124
125impl Coarsen for Duration {
126    fn coarsen(&self, window_size: &Self) -> Self {
127        let w_secs = window_size.as_secs();
128        let w_nanos = window_size.subsec_nanos();
129
130        let secs_coarsened = if w_secs == 0 {
131            self.as_secs()
132        } else {
133            (self.as_secs() / w_secs + 1) * w_secs
134        };
135
136        let nanos_coarsened = if w_nanos == 0 {
137            0
138        } else {
139            (self.subsec_nanos() / w_nanos + 1) * w_nanos
140        };
141
142        Duration::new(secs_coarsened, nanos_coarsened)
143    }
144}
145
146impl Coarsen for pair::Pair<Duration, u64> {
147    fn coarsen(&self, window_size: &Self) -> Self {
148        let first_coarsened = self.first.coarsen(&window_size.first);
149        let second_coarsened = self.second.coarsen(&window_size.second);
150
151        Self::new(first_coarsened, second_coarsened)
152    }
153}
154
#[cfg(test)]
mod tests {
    use super::{Coarsen, Rewind};
    use std::time::Duration;

    /// Rewinding never underflows and subtracts the slack otherwise.
    #[test]
    fn test_rewind() {
        // Sequence numbers.
        assert_eq!(0_u64.rewind(1), 0);
        assert_eq!(10_u64.rewind(5), 5);

        // Real time.
        let zero = Duration::from_secs(0);
        assert_eq!(zero.rewind(Duration::from_secs(10)), zero);

        let rewound = Duration::from_millis(12345).rewind(Duration::from_millis(45));
        assert_eq!(rewound, Duration::from_millis(12300));
    }

    /// Coarsening delays a time to the next multiple of the window.
    #[test]
    fn test_coarsen() {
        // Sequence numbers.
        let window: u64 = 10;
        assert_eq!(0_u64.coarsen(&window), 10);
        assert_eq!(6_u64.coarsen(&window), 10);
        assert_eq!(11_u64.coarsen(&window), 20);

        // Real time.
        let ten_secs = Duration::from_secs(10);
        assert_eq!(Duration::from_secs(0).coarsen(&ten_secs), ten_secs);
        assert_eq!(
            Duration::new(10, 500).coarsen(&ten_secs),
            Duration::from_secs(20)
        );
    }
}