Skip to main content

harn_vm/triggers/test_util/
clock.rs

1use std::cell::RefCell;
2use std::sync::{Arc, Mutex, OnceLock};
3use std::time::{Duration as StdDuration, Instant};
4
5use async_trait::async_trait;
6use time::OffsetDateTime;
7use tokio::sync::Notify;
8
9use crate::connectors::cron::scheduler::Clock;
10
11thread_local! {
12    static MOCK_CLOCK_STACK: RefCell<Vec<Arc<MockClock>>> = const { RefCell::new(Vec::new()) };
13}
14
/// Returns a fixed reference instant shared by the whole process.
///
/// The instant is captured on the first call and the same value is handed
/// back on every later call, so it serves as a stable origin for elapsed-time
/// measurements.
fn process_start() -> &'static Instant {
    static FIRST_CALL: OnceLock<Instant> = OnceLock::new();
    let anchor: &'static Instant = FIRST_CALL.get_or_init(Instant::now);
    anchor
}
19
/// A point on a monotonic timeline, stored as an offset from that timeline's
/// origin.
///
/// Instants compare and order by their offset.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct ClockInstant(StdDuration);

impl ClockInstant {
    /// Returns the time elapsed from `earlier` to `self`.
    ///
    /// Yields a zero duration when `earlier` is actually later than `self`
    /// instead of panicking.
    pub fn duration_since(self, earlier: Self) -> StdDuration {
        let Self(later) = self;
        let Self(base) = earlier;
        later.checked_sub(base).unwrap_or(StdDuration::ZERO)
    }

    /// Returns the offset from the timeline origin in whole milliseconds.
    pub fn as_millis(self) -> u128 {
        let Self(offset) = self;
        offset.as_millis()
    }
}
32
33pub struct ClockOverrideGuard;
34
35impl Drop for ClockOverrideGuard {
36    fn drop(&mut self) {
37        MOCK_CLOCK_STACK.with(|slot| {
38            slot.borrow_mut().pop();
39        });
40    }
41}
42
43#[derive(Debug)]
44pub struct MockClock {
45    now: Mutex<OffsetDateTime>,
46    monotonic: Mutex<StdDuration>,
47    notify: Notify,
48}
49
50impl MockClock {
51    pub fn new(now: OffsetDateTime) -> Arc<Self> {
52        Arc::new(Self {
53            now: Mutex::new(now),
54            monotonic: Mutex::new(StdDuration::ZERO),
55            notify: Notify::new(),
56        })
57    }
58
59    pub fn monotonic_now(&self) -> ClockInstant {
60        ClockInstant(
61            *self
62                .monotonic
63                .lock()
64                .expect("mock clock monotonic mutex poisoned"),
65        )
66    }
67
68    pub async fn set(&self, now: OffsetDateTime) {
69        let mut wall = self.now.lock().expect("mock clock mutex poisoned");
70        let previous = *wall;
71        *wall = now;
72        drop(wall);
73
74        if now > previous {
75            let delta = now - previous;
76            if let Ok(delta) = TryInto::<StdDuration>::try_into(delta) {
77                let mut monotonic = self
78                    .monotonic
79                    .lock()
80                    .expect("mock clock monotonic mutex poisoned");
81                *monotonic += delta;
82            }
83        }
84
85        self.notify.notify_waiters();
86    }
87
88    pub async fn advance(&self, duration: time::Duration) {
89        let Ok(delta) = TryInto::<StdDuration>::try_into(duration) else {
90            return;
91        };
92        self.advance_std(delta).await;
93    }
94
95    pub async fn advance_std(&self, duration: StdDuration) {
96        if duration.is_zero() {
97            self.notify.notify_waiters();
98            return;
99        }
100        let delta =
101            time::Duration::try_from(duration).expect("std duration should fit in time::Duration");
102        let next = *self.now.lock().expect("mock clock mutex poisoned") + delta;
103        self.set(next).await;
104    }
105
106    pub async fn advance_ticks(&self, ticks: u32, tick: StdDuration) {
107        for _ in 0..ticks {
108            self.advance_std(tick).await;
109        }
110    }
111}
112
113#[async_trait]
114impl Clock for MockClock {
115    fn now(&self) -> OffsetDateTime {
116        *self.now.lock().expect("mock clock mutex poisoned")
117    }
118
119    async fn sleep_until(&self, deadline: OffsetDateTime) {
120        loop {
121            if *self.now.lock().expect("mock clock mutex poisoned") >= deadline {
122                return;
123            }
124            self.notify.notified().await;
125        }
126    }
127}
128
129pub fn install_override(clock: Arc<MockClock>) -> ClockOverrideGuard {
130    MOCK_CLOCK_STACK.with(|slot| {
131        slot.borrow_mut().push(clock);
132    });
133    ClockOverrideGuard
134}
135
136pub fn active_mock_clock() -> Option<Arc<MockClock>> {
137    MOCK_CLOCK_STACK.with(|slot| slot.borrow().last().cloned())
138}
139
140pub fn now_utc() -> OffsetDateTime {
141    active_mock_clock()
142        .map(|clock| clock.now())
143        .unwrap_or_else(OffsetDateTime::now_utc)
144}
145
146pub fn now_ms() -> i64 {
147    now_utc().unix_timestamp_nanos() as i64 / 1_000_000
148}
149
150pub fn instant_now() -> ClockInstant {
151    active_mock_clock()
152        .map(|clock| clock.monotonic_now())
153        .unwrap_or_else(|| ClockInstant(process_start().elapsed()))
154}