exocore_core/time/
clock.rs

1use std::{
2    sync::{
3        atomic::{AtomicUsize, Ordering},
4        Arc,
5    },
6    time::Duration,
7};
8
9use chrono::{DateTime, TimeZone, Utc};
10
11use super::{ConsistentTimestamp, Instant, SystemTime};
12use crate::cell::Node;
13
14// TODO: To be completed in https://github.com/appaquet/exocore/issues/6
15
16const CONSISTENT_COUNTER_MAX: usize = 999;
17
/// Clock handing out instants, unix-based time and consistent timestamps.
///
/// Time is read from a [`Source`], which is either the system or, in test builds, a mockable
/// source. Cloning a `Clock` shares the same counter, since it is held behind an `Arc`.
#[derive(Clone)]
pub struct Clock {
    /// Where instants and unix time are read from.
    source: Source,
    /// Rolling counter attached to the timestamps produced by `consistent_time`.
    consistent_counter: Arc<AtomicUsize>,
}
23
24impl Clock {
25    pub fn new() -> Clock {
26        Clock {
27            source: Source::System,
28            consistent_counter: Arc::new(AtomicUsize::new(0)),
29        }
30    }
31
32    #[cfg(any(test, feature = "tests-utils"))]
33    pub fn new_mocked() -> Clock {
34        Clock {
35            source: Source::Mocked(std::sync::Arc::new(std::sync::RwLock::new(None))),
36            consistent_counter: Arc::new(AtomicUsize::new(0)),
37        }
38    }
39
40    #[cfg(any(test, feature = "tests-utils"))]
41    pub fn new_fixed_mocked(instant: Instant) -> Clock {
42        let clock = Self::new_mocked();
43        clock.set_fixed_instant(instant);
44        clock
45    }
46
47    #[inline]
48    pub fn instant(&self) -> Instant {
49        match &self.source {
50            Source::System => Instant::now(),
51            #[cfg(any(test, feature = "tests-utils"))]
52            Source::Mocked(time) => {
53                let mocked_instant = time.read().expect("Couldn't acquire read lock");
54                if let Some((fixed_instant, _fixed_unix_elaps)) = *mocked_instant {
55                    fixed_instant
56                } else {
57                    Instant::now()
58                }
59            }
60        }
61    }
62
63    pub fn now_chrono(&self) -> DateTime<Utc> {
64        let unix_elapsed = self.unix_elapsed();
65        Utc.timestamp_nanos(unix_elapsed.as_nanos() as i64)
66    }
67
    /// Builds a timestamp for `node` from the clock's unix time, a rolling counter and the
    /// node's consistent clock id.
    ///
    /// The counter attached to the timestamp is always below `CONSISTENT_COUNTER_MAX`. Once
    /// the shared counter reaches that limit, the call waits a millisecond and then tries to
    /// wrap the counter around, retrying if another caller raced it.
    pub fn consistent_time(&self, node: &Node) -> ConsistentTimestamp {
        let counter = loop {
            // `fetch_add` returns the value held *before* the increment
            let counter = self.consistent_counter.fetch_add(1, Ordering::SeqCst);
            if counter < CONSISTENT_COUNTER_MAX {
                break counter;
            }

            // unfortunately, as soon as we roll over the counter, we need to make sure that
            // we are not in the same millisecond anymore, since reusing low counter values
            // within it would mean timestamps are not monotonic. we sleep for a millisecond.
            Self::sleep_next_millisecond();

            // the counter reached MAX: try to swap it with 1, so that this call can take 0.
            // if it no longer holds the value we left behind (`counter + 1`), another thread
            // swapped / increased it in the meantime, and we need to retry
            if self
                .consistent_counter
                .compare_exchange(counter + 1, 1, Ordering::SeqCst, Ordering::Relaxed)
                .is_ok()
            {
                break 0; // counter now holds 1: this call takes 0, the next `fetch_add` yields 1
            }
        };

        // the time is read after the counter has been reserved
        let unix_elapsed = self.unix_elapsed();
        ConsistentTimestamp::from_context(unix_elapsed, counter as u64, node.consistent_clock_id())
    }
96
97    pub fn unix_elapsed(&self) -> Duration {
98        match &self.source {
99            Source::System => {
100                let now_system = SystemTime::now();
101                now_system.duration_since(wasm_timer::UNIX_EPOCH).unwrap()
102            }
103            #[cfg(any(test, feature = "tests-utils"))]
104            Source::Mocked(time) => {
105                let mocked_instant = time.read().expect("Couldn't acquire read lock");
106
107                if let Some((_fixed_instant, fixed_unix_elaps)) = *mocked_instant {
108                    fixed_unix_elaps
109                } else {
110                    SystemTime::now()
111                        .duration_since(wasm_timer::UNIX_EPOCH)
112                        .unwrap()
113                }
114            }
115        }
116    }
117
118    #[cfg(any(test, feature = "tests-utils"))]
119    pub fn set_fixed_instant(&self, fixed_instant: Instant) {
120        if let Source::Mocked(mocked_instant) = &self.source {
121            let mut mocked_instant = mocked_instant.write().expect("Couldn't acquire write lock");
122
123            let now_system = SystemTime::now();
124            let unix_elapsed = now_system.duration_since(wasm_timer::UNIX_EPOCH).unwrap();
125            let now_instant = Instant::now();
126
127            let fixed_unix_elaps = if now_instant > fixed_instant {
128                unix_elapsed - (now_instant - fixed_instant)
129            } else {
130                unix_elapsed + (fixed_instant - now_instant)
131            };
132
133            *mocked_instant = Some((fixed_instant, fixed_unix_elaps));
134        } else {
135            panic!("Called set_time, but clock source is system");
136        }
137    }
138
139    #[cfg(any(test, feature = "tests-utils"))]
140    pub fn add_fixed_instant_duration(&self, duration: super::Duration) {
141        if let Source::Mocked(mocked_instant) = &self.source {
142            let mut mocked_instant = mocked_instant.write().expect("Couldn't acquire write lock");
143            if let Some((current_instant, unix_elapsed)) = *mocked_instant {
144                *mocked_instant = Some((current_instant + duration, unix_elapsed + duration))
145            }
146        } else {
147            panic!("Called set_time, but clock source is system");
148        }
149    }
150
151    #[cfg(any(test, feature = "tests-utils"))]
152    pub fn reset_fixed_instant(&self) {
153        if let Source::Mocked(mocked_instant) = &self.source {
154            let mut mocked_instant = mocked_instant.write().expect("Couldn't acquire write lock");
155            *mocked_instant = None;
156        } else {
157            panic!("Called set_time, but clock source is system");
158        }
159    }
160
161    /// Sleeps for a millisecond. In wasm, sleep is not implemented, so we spin.
162    fn sleep_next_millisecond() {
163        #[cfg(not(target_arch = "wasm32"))]
164        {
165            std::thread::sleep(std::time::Duration::from_millis(1));
166        }
167
168        #[cfg(target_arch = "wasm32")]
169        {
170            let before = Instant::now();
171            while before.elapsed() < Duration::from_millis(1) {}
172        }
173    }
174}
175
176impl Default for Clock {
177    fn default() -> Self {
178        Clock::new()
179    }
180}
181
/// Backing time source of a [`Clock`].
#[derive(Clone)]
enum Source {
    /// Time is read directly from the system.
    System,
    /// Source available in test builds only. Holds `None` while nothing is fixed (the real
    /// current time is used), or the fixed instant paired with its unix elapsed time.
    /// The state sits behind an `Arc`, so clones share it.
    #[cfg(any(test, feature = "tests-utils"))]
    Mocked(std::sync::Arc<std::sync::RwLock<Option<(Instant, super::Duration)>>>),
}
188
189#[cfg(test)]
190mod tests {
191    use std::{sync::Arc, thread};
192
193    use super::{super::Duration, *};
194    use crate::cell::LocalNode;
195
196    #[test]
197    fn non_mocked_clock() {
198        let now = Instant::now();
199
200        let clock = Clock::new();
201        let instant1 = clock.instant();
202        assert!(instant1 > now);
203
204        let instant2 = clock.instant();
205        assert!(instant2 > instant1);
206    }
207
208    #[test]
209    fn fixed_mocked_clock() {
210        let mocked_clock = Clock::new_fixed_mocked(Instant::now());
211        assert_eq!(mocked_clock.instant(), mocked_clock.instant());
212
213        let new_instant = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
214        mocked_clock.set_fixed_instant(new_instant);
215
216        assert_eq!(mocked_clock.instant(), new_instant);
217
218        let dur_2secs = Duration::from_secs(2);
219        mocked_clock.add_fixed_instant_duration(dur_2secs);
220        assert_eq!(mocked_clock.instant(), new_instant + dur_2secs);
221    }
222
223    #[test]
224    fn fixed_consistent_time() {
225        let mocked_clock = Clock::new_fixed_mocked(Instant::now());
226        let local_node = LocalNode::generate();
227
228        let time1 = mocked_clock.consistent_time(local_node.node());
229        std::thread::sleep(Duration::from_millis(10));
230        let time2 = mocked_clock.consistent_time(local_node.node());
231        assert_eq!(time1 + ConsistentTimestamp::from(1), time2); // time2 is +1 because of counter
232
233        mocked_clock.reset_fixed_instant();
234        let time3 = mocked_clock.consistent_time(local_node.node());
235        std::thread::sleep(Duration::from_millis(10));
236        let time4 = mocked_clock.consistent_time(local_node.node());
237
238        let elaps = Duration::from_millis(10);
239        assert!((time4 - time3).unwrap() > elaps);
240    }
241
242    #[test]
243    fn consistent_time_collision() {
244        let mocked_clock = Clock::new_fixed_mocked(Instant::now());
245        let local_node = LocalNode::generate();
246
247        let mut last_time = ConsistentTimestamp::from(0);
248        for i in 0..10000 {
249            let current_time = mocked_clock.consistent_time(local_node.node());
250            assert_ne!(last_time, current_time, "at iteration {i}");
251            last_time = current_time;
252        }
253    }
254
255    #[test]
256    fn fixed_future_consistent_time() {
257        let mocked_clock = Clock::new_fixed_mocked(Instant::now() + Duration::from_secs(10));
258        let local_node = LocalNode::generate();
259
260        let time1 = mocked_clock.consistent_time(local_node.node());
261        std::thread::sleep(Duration::from_millis(10));
262        let time2 = mocked_clock.consistent_time(local_node.node());
263        assert_eq!(time1 + ConsistentTimestamp::from(1), time2); // time2 is +1 because of counter
264
265        mocked_clock.reset_fixed_instant();
266        let time3 = mocked_clock.consistent_time(local_node.node());
267        std::thread::sleep(Duration::from_millis(10));
268        let time4 = mocked_clock.consistent_time(local_node.node());
269
270        let elaps = Duration::from_millis(10);
271        assert!((time4 - time3).unwrap() > elaps);
272    }
273
274    #[test]
275    fn unfixed_mocked_clock() {
276        let mocked_clock = Clock::new_mocked();
277        assert_ne!(mocked_clock.instant(), mocked_clock.instant());
278
279        let inst = Instant::now();
280        mocked_clock.set_fixed_instant(inst);
281
282        assert_eq!(mocked_clock.instant(), inst);
283
284        {
285            // unfixed clock should now give different instant
286            mocked_clock.reset_fixed_instant();
287
288            let t1 = mocked_clock.instant();
289
290            // ony *nix, two call will give diff instant because of sys calls, but on
291            // windows it may be the same
292            std::thread::sleep(Duration::from_millis(1));
293
294            let t2 = mocked_clock.instant();
295
296            assert_ne!(t1, t2);
297        }
298    }
299
300    #[test]
301    fn thread_safety() {
302        let now = Instant::now();
303
304        let mocked_clock = Arc::new(Clock::new_mocked());
305
306        let thread_clock = Arc::clone(&mocked_clock);
307        thread::spawn(move || {
308            thread_clock.set_fixed_instant(now);
309        })
310        .join()
311        .unwrap();
312
313        assert_eq!(mocked_clock.instant(), now);
314    }
315
316    #[test]
317    fn chrono_datetime() {
318        let clock = Clock::new();
319
320        let now1 = clock.now_chrono();
321        thread::sleep(Duration::from_nanos(1));
322        let now2 = clock.now_chrono();
323        assert!(now2 > now1);
324
325        let fixed_clock = Clock::new_fixed_mocked(Instant::now());
326        let now3 = fixed_clock.now_chrono();
327        assert!(now3 > now2);
328
329        let now4 = fixed_clock.now_chrono();
330        assert_eq!(now3, now4);
331    }
332}