declarative_dataflow/timestamp/
mod.rs1use std::time::Duration;
4
5pub mod altneu;
6pub mod pair;
7
8#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
13pub enum Time {
14 TxId(u64),
16 Real(Duration),
18 Bi(Duration, u64),
20}
21
22impl std::convert::From<Time> for u64 {
23 fn from(t: Time) -> u64 {
24 if let Time::TxId(time) = t {
25 time
26 } else {
27 panic!("Time {:?} can't be converted to u64", t);
28 }
29 }
30}
31
32impl std::convert::From<u64> for Time {
33 fn from(t: u64) -> Time {
34 Time::TxId(t)
35 }
36}
37
38impl std::convert::From<Time> for Duration {
39 fn from(t: Time) -> Duration {
40 if let Time::Real(time) = t {
41 time
42 } else {
43 panic!("Time {:?} can't be converted to Duration", t);
44 }
45 }
46}
47
48impl std::convert::From<Duration> for Time {
49 fn from(t: Duration) -> Time {
50 Time::Real(t)
51 }
52}
53
54impl std::convert::From<Time> for pair::Pair<Duration, u64> {
55 fn from(t: Time) -> Self {
56 if let Time::Bi(sys, event) = t {
57 Self::new(sys, event)
58 } else {
59 panic!("Time {:?} can't be converted to Pair", t);
60 }
61 }
62}
63
64impl std::convert::From<pair::Pair<Duration, u64>> for Time {
65 fn from(t: pair::Pair<Duration, u64>) -> Time {
66 Time::Bi(t.first, t.second)
67 }
68}
69
70pub trait Rewind: std::convert::From<Time> {
74 fn rewind(&self, slack: Self) -> Self;
81}
82
83impl Rewind for u64 {
84 fn rewind(&self, slack: Self) -> Self {
85 match self.checked_sub(slack) {
86 None => *self,
87 Some(rewound) => rewound,
88 }
89 }
90}
91
92impl Rewind for Duration {
93 fn rewind(&self, slack: Self) -> Self {
94 match self.checked_sub(slack) {
95 None => *self,
96 Some(rewound) => rewound,
97 }
98 }
99}
100
101impl Rewind for pair::Pair<Duration, u64> {
102 fn rewind(&self, slack: Self) -> Self {
103 let first_rewound = self.first.rewind(slack.first);
104 let second_rewound = self.second.rewind(slack.second);
105
106 Self::new(first_rewound, second_rewound)
107 }
108}
109
/// Extension trait for timestamp types that can be rounded up to window
/// boundaries, coarsening the granularity of timestamps.
pub trait Coarsen {
    /// Returns the smallest multiple of `window_size` that is strictly
    /// greater than `self`.
    ///
    /// A zero `window_size` means "no coarsening": `self` is returned
    /// unchanged.
    fn coarsen(&self, window_size: &Self) -> Self;
}

impl Coarsen for u64 {
    /// # Panics
    ///
    /// In builds with overflow checks, panics if the next window boundary
    /// does not fit in a `u64`.
    fn coarsen(&self, window_size: &Self) -> Self {
        // Guard the division: a zero window has no boundaries to round to.
        if *window_size == 0 {
            return *self;
        }

        (self / window_size + 1) * window_size
    }
}

impl Coarsen for Duration {
    /// # Panics
    ///
    /// Panics if the next window boundary exceeds the range of `Duration`.
    fn coarsen(&self, window_size: &Self) -> Self {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        let window = window_size.as_nanos();
        if window == 0 {
            return *self;
        }

        // Work on the total nanosecond count. Rounding the seconds and the
        // sub-second nanoseconds separately only lands on a real boundary
        // when the window is whole seconds or evenly divides one second.
        // The sum is bounded by `self + window_size`, which fits in a u128.
        let boundary = (self.as_nanos() / window + 1) * window;

        let secs = <u64 as std::convert::TryFrom<u128>>::try_from(boundary / NANOS_PER_SEC)
            .expect("coarsened Duration overflows");
        // The remainder is always below 10^9, so it fits in a u32.
        let nanos = (boundary % NANOS_PER_SEC) as u32;

        Duration::new(secs, nanos)
    }
}
145
146impl Coarsen for pair::Pair<Duration, u64> {
147 fn coarsen(&self, window_size: &Self) -> Self {
148 let first_coarsened = self.first.coarsen(&window_size.first);
149 let second_coarsened = self.second.coarsen(&window_size.second);
150
151 Self::new(first_coarsened, second_coarsened)
152 }
153}
154
#[cfg(test)]
mod tests {
    use super::{Coarsen, Rewind};
    use std::time::Duration;

    /// Rewinding subtracts the slack and never goes below the origin.
    #[test]
    fn test_rewind() {
        assert_eq!(0_u64.rewind(1), 0);
        assert_eq!(10_u64.rewind(5), 5);

        let origin = Duration::from_secs(0);
        assert_eq!(origin.rewind(Duration::from_secs(10)), origin);

        let rewound = Duration::from_millis(12345).rewind(Duration::from_millis(45));
        assert_eq!(rewound, Duration::from_millis(12300));
    }

    /// Coarsening rounds up to the next window boundary.
    #[test]
    fn test_coarsen() {
        assert_eq!(0_u64.coarsen(&10), 10);
        assert_eq!(6_u64.coarsen(&10), 10);
        assert_eq!(11_u64.coarsen(&10), 20);

        let window = Duration::from_secs(10);
        assert_eq!(Duration::from_secs(0).coarsen(&window), Duration::from_secs(10));
        assert_eq!(Duration::new(10, 500).coarsen(&window), Duration::from_secs(20));
    }
}
190}