exocore_core/time/
clock.rs1use std::{
2 sync::{
3 atomic::{AtomicUsize, Ordering},
4 Arc,
5 },
6 time::Duration,
7};
8
9use chrono::{DateTime, TimeZone, Utc};
10
11use super::{ConsistentTimestamp, Instant, SystemTime};
12use crate::cell::Node;
13
14const CONSISTENT_COUNTER_MAX: usize = 999;
17
18#[derive(Clone)]
19pub struct Clock {
20 source: Source,
21 consistent_counter: Arc<AtomicUsize>,
22}
23
24impl Clock {
25 pub fn new() -> Clock {
26 Clock {
27 source: Source::System,
28 consistent_counter: Arc::new(AtomicUsize::new(0)),
29 }
30 }
31
32 #[cfg(any(test, feature = "tests-utils"))]
33 pub fn new_mocked() -> Clock {
34 Clock {
35 source: Source::Mocked(std::sync::Arc::new(std::sync::RwLock::new(None))),
36 consistent_counter: Arc::new(AtomicUsize::new(0)),
37 }
38 }
39
40 #[cfg(any(test, feature = "tests-utils"))]
41 pub fn new_fixed_mocked(instant: Instant) -> Clock {
42 let clock = Self::new_mocked();
43 clock.set_fixed_instant(instant);
44 clock
45 }
46
47 #[inline]
48 pub fn instant(&self) -> Instant {
49 match &self.source {
50 Source::System => Instant::now(),
51 #[cfg(any(test, feature = "tests-utils"))]
52 Source::Mocked(time) => {
53 let mocked_instant = time.read().expect("Couldn't acquire read lock");
54 if let Some((fixed_instant, _fixed_unix_elaps)) = *mocked_instant {
55 fixed_instant
56 } else {
57 Instant::now()
58 }
59 }
60 }
61 }
62
63 pub fn now_chrono(&self) -> DateTime<Utc> {
64 let unix_elapsed = self.unix_elapsed();
65 Utc.timestamp_nanos(unix_elapsed.as_nanos() as i64)
66 }
67
68 pub fn consistent_time(&self, node: &Node) -> ConsistentTimestamp {
69 let counter = loop {
70 let counter = self.consistent_counter.fetch_add(1, Ordering::SeqCst);
71 if counter < CONSISTENT_COUNTER_MAX {
72 break counter;
73 }
74
75 Self::sleep_next_millisecond();
79
80 if self
85 .consistent_counter
86 .compare_exchange(counter + 1, 1, Ordering::SeqCst, Ordering::Relaxed)
87 .is_ok()
88 {
89 break 0; }
91 };
92
93 let unix_elapsed = self.unix_elapsed();
94 ConsistentTimestamp::from_context(unix_elapsed, counter as u64, node.consistent_clock_id())
95 }
96
97 pub fn unix_elapsed(&self) -> Duration {
98 match &self.source {
99 Source::System => {
100 let now_system = SystemTime::now();
101 now_system.duration_since(wasm_timer::UNIX_EPOCH).unwrap()
102 }
103 #[cfg(any(test, feature = "tests-utils"))]
104 Source::Mocked(time) => {
105 let mocked_instant = time.read().expect("Couldn't acquire read lock");
106
107 if let Some((_fixed_instant, fixed_unix_elaps)) = *mocked_instant {
108 fixed_unix_elaps
109 } else {
110 SystemTime::now()
111 .duration_since(wasm_timer::UNIX_EPOCH)
112 .unwrap()
113 }
114 }
115 }
116 }
117
118 #[cfg(any(test, feature = "tests-utils"))]
119 pub fn set_fixed_instant(&self, fixed_instant: Instant) {
120 if let Source::Mocked(mocked_instant) = &self.source {
121 let mut mocked_instant = mocked_instant.write().expect("Couldn't acquire write lock");
122
123 let now_system = SystemTime::now();
124 let unix_elapsed = now_system.duration_since(wasm_timer::UNIX_EPOCH).unwrap();
125 let now_instant = Instant::now();
126
127 let fixed_unix_elaps = if now_instant > fixed_instant {
128 unix_elapsed - (now_instant - fixed_instant)
129 } else {
130 unix_elapsed + (fixed_instant - now_instant)
131 };
132
133 *mocked_instant = Some((fixed_instant, fixed_unix_elaps));
134 } else {
135 panic!("Called set_time, but clock source is system");
136 }
137 }
138
139 #[cfg(any(test, feature = "tests-utils"))]
140 pub fn add_fixed_instant_duration(&self, duration: super::Duration) {
141 if let Source::Mocked(mocked_instant) = &self.source {
142 let mut mocked_instant = mocked_instant.write().expect("Couldn't acquire write lock");
143 if let Some((current_instant, unix_elapsed)) = *mocked_instant {
144 *mocked_instant = Some((current_instant + duration, unix_elapsed + duration))
145 }
146 } else {
147 panic!("Called set_time, but clock source is system");
148 }
149 }
150
151 #[cfg(any(test, feature = "tests-utils"))]
152 pub fn reset_fixed_instant(&self) {
153 if let Source::Mocked(mocked_instant) = &self.source {
154 let mut mocked_instant = mocked_instant.write().expect("Couldn't acquire write lock");
155 *mocked_instant = None;
156 } else {
157 panic!("Called set_time, but clock source is system");
158 }
159 }
160
161 fn sleep_next_millisecond() {
163 #[cfg(not(target_arch = "wasm32"))]
164 {
165 std::thread::sleep(std::time::Duration::from_millis(1));
166 }
167
168 #[cfg(target_arch = "wasm32")]
169 {
170 let before = Instant::now();
171 while before.elapsed() < Duration::from_millis(1) {}
172 }
173 }
174}
175
176impl Default for Clock {
177 fn default() -> Self {
178 Clock::new()
179 }
180}
181
182#[derive(Clone)]
183enum Source {
184 System,
185 #[cfg(any(test, feature = "tests-utils"))]
186 Mocked(std::sync::Arc<std::sync::RwLock<Option<(Instant, super::Duration)>>>),
187}
188
189#[cfg(test)]
190mod tests {
191 use std::{sync::Arc, thread};
192
193 use super::{super::Duration, *};
194 use crate::cell::LocalNode;
195
196 #[test]
197 fn non_mocked_clock() {
198 let now = Instant::now();
199
200 let clock = Clock::new();
201 let instant1 = clock.instant();
202 assert!(instant1 > now);
203
204 let instant2 = clock.instant();
205 assert!(instant2 > instant1);
206 }
207
208 #[test]
209 fn fixed_mocked_clock() {
210 let mocked_clock = Clock::new_fixed_mocked(Instant::now());
211 assert_eq!(mocked_clock.instant(), mocked_clock.instant());
212
213 let new_instant = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
214 mocked_clock.set_fixed_instant(new_instant);
215
216 assert_eq!(mocked_clock.instant(), new_instant);
217
218 let dur_2secs = Duration::from_secs(2);
219 mocked_clock.add_fixed_instant_duration(dur_2secs);
220 assert_eq!(mocked_clock.instant(), new_instant + dur_2secs);
221 }
222
223 #[test]
224 fn fixed_consistent_time() {
225 let mocked_clock = Clock::new_fixed_mocked(Instant::now());
226 let local_node = LocalNode::generate();
227
228 let time1 = mocked_clock.consistent_time(local_node.node());
229 std::thread::sleep(Duration::from_millis(10));
230 let time2 = mocked_clock.consistent_time(local_node.node());
231 assert_eq!(time1 + ConsistentTimestamp::from(1), time2); mocked_clock.reset_fixed_instant();
234 let time3 = mocked_clock.consistent_time(local_node.node());
235 std::thread::sleep(Duration::from_millis(10));
236 let time4 = mocked_clock.consistent_time(local_node.node());
237
238 let elaps = Duration::from_millis(10);
239 assert!((time4 - time3).unwrap() > elaps);
240 }
241
242 #[test]
243 fn consistent_time_collision() {
244 let mocked_clock = Clock::new_fixed_mocked(Instant::now());
245 let local_node = LocalNode::generate();
246
247 let mut last_time = ConsistentTimestamp::from(0);
248 for i in 0..10000 {
249 let current_time = mocked_clock.consistent_time(local_node.node());
250 assert_ne!(last_time, current_time, "at iteration {i}");
251 last_time = current_time;
252 }
253 }
254
255 #[test]
256 fn fixed_future_consistent_time() {
257 let mocked_clock = Clock::new_fixed_mocked(Instant::now() + Duration::from_secs(10));
258 let local_node = LocalNode::generate();
259
260 let time1 = mocked_clock.consistent_time(local_node.node());
261 std::thread::sleep(Duration::from_millis(10));
262 let time2 = mocked_clock.consistent_time(local_node.node());
263 assert_eq!(time1 + ConsistentTimestamp::from(1), time2); mocked_clock.reset_fixed_instant();
266 let time3 = mocked_clock.consistent_time(local_node.node());
267 std::thread::sleep(Duration::from_millis(10));
268 let time4 = mocked_clock.consistent_time(local_node.node());
269
270 let elaps = Duration::from_millis(10);
271 assert!((time4 - time3).unwrap() > elaps);
272 }
273
274 #[test]
275 fn unfixed_mocked_clock() {
276 let mocked_clock = Clock::new_mocked();
277 assert_ne!(mocked_clock.instant(), mocked_clock.instant());
278
279 let inst = Instant::now();
280 mocked_clock.set_fixed_instant(inst);
281
282 assert_eq!(mocked_clock.instant(), inst);
283
284 {
285 mocked_clock.reset_fixed_instant();
287
288 let t1 = mocked_clock.instant();
289
290 std::thread::sleep(Duration::from_millis(1));
293
294 let t2 = mocked_clock.instant();
295
296 assert_ne!(t1, t2);
297 }
298 }
299
300 #[test]
301 fn thread_safety() {
302 let now = Instant::now();
303
304 let mocked_clock = Arc::new(Clock::new_mocked());
305
306 let thread_clock = Arc::clone(&mocked_clock);
307 thread::spawn(move || {
308 thread_clock.set_fixed_instant(now);
309 })
310 .join()
311 .unwrap();
312
313 assert_eq!(mocked_clock.instant(), now);
314 }
315
316 #[test]
317 fn chrono_datetime() {
318 let clock = Clock::new();
319
320 let now1 = clock.now_chrono();
321 thread::sleep(Duration::from_nanos(1));
322 let now2 = clock.now_chrono();
323 assert!(now2 > now1);
324
325 let fixed_clock = Clock::new_fixed_mocked(Instant::now());
326 let now3 = fixed_clock.now_chrono();
327 assert!(now3 > now2);
328
329 let now4 = fixed_clock.now_chrono();
330 assert_eq!(now3, now4);
331 }
332}