harn_vm/triggers/test_util/
clock.rs1use std::cell::RefCell;
2use std::sync::{Arc, Mutex, OnceLock};
3use std::time::{Duration as StdDuration, Instant};
4
5use async_trait::async_trait;
6use time::OffsetDateTime;
7use tokio::sync::Notify;
8
9use crate::connectors::cron::scheduler::Clock;
10
11thread_local! {
12 static MOCK_CLOCK_STACK: RefCell<Vec<Arc<MockClock>>> = const { RefCell::new(Vec::new()) };
13}
14
/// Returns the instant captured the first time this function was called.
///
/// The value is initialised lazily, exactly once per process, and every later
/// call hands back a reference to that same stored `Instant`.
fn process_start() -> &'static Instant {
    static FIRST_CALL: OnceLock<Instant> = OnceLock::new();
    let anchor: &'static Instant = FIRST_CALL.get_or_init(Instant::now);
    anchor
}
19
/// A point on the monotonic timeline, stored as an offset from that
/// timeline's origin.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct ClockInstant(StdDuration);

impl ClockInstant {
    /// Returns how far `self` lies after `earlier`.
    ///
    /// The subtraction saturates: if `earlier` is actually later than `self`,
    /// the result is a zero duration rather than a panic.
    pub fn duration_since(self, earlier: Self) -> StdDuration {
        let Self(later_offset) = self;
        let Self(earlier_offset) = earlier;
        later_offset.saturating_sub(earlier_offset)
    }

    /// Returns the stored offset expressed in whole milliseconds.
    pub fn as_millis(self) -> u128 {
        let Self(offset) = self;
        offset.as_millis()
    }
}
32
33pub struct ClockOverrideGuard;
34
35impl Drop for ClockOverrideGuard {
36 fn drop(&mut self) {
37 MOCK_CLOCK_STACK.with(|slot| {
38 slot.borrow_mut().pop();
39 });
40 }
41}
42
43#[derive(Debug)]
44pub struct MockClock {
45 now: Mutex<OffsetDateTime>,
46 monotonic: Mutex<StdDuration>,
47 notify: Notify,
48}
49
50impl MockClock {
51 pub fn new(now: OffsetDateTime) -> Arc<Self> {
52 Arc::new(Self {
53 now: Mutex::new(now),
54 monotonic: Mutex::new(StdDuration::ZERO),
55 notify: Notify::new(),
56 })
57 }
58
59 pub fn monotonic_now(&self) -> ClockInstant {
60 ClockInstant(
61 *self
62 .monotonic
63 .lock()
64 .expect("mock clock monotonic mutex poisoned"),
65 )
66 }
67
68 pub async fn set(&self, now: OffsetDateTime) {
69 let mut wall = self.now.lock().expect("mock clock mutex poisoned");
70 let previous = *wall;
71 *wall = now;
72 drop(wall);
73
74 if now > previous {
75 let delta = now - previous;
76 if let Ok(delta) = TryInto::<StdDuration>::try_into(delta) {
77 let mut monotonic = self
78 .monotonic
79 .lock()
80 .expect("mock clock monotonic mutex poisoned");
81 *monotonic += delta;
82 }
83 }
84
85 self.notify.notify_waiters();
86 }
87
88 pub async fn advance(&self, duration: time::Duration) {
89 let Ok(delta) = TryInto::<StdDuration>::try_into(duration) else {
90 return;
91 };
92 self.advance_std(delta).await;
93 }
94
95 pub async fn advance_std(&self, duration: StdDuration) {
96 if duration.is_zero() {
97 self.notify.notify_waiters();
98 return;
99 }
100 let delta =
101 time::Duration::try_from(duration).expect("std duration should fit in time::Duration");
102 let next = *self.now.lock().expect("mock clock mutex poisoned") + delta;
103 self.set(next).await;
104 }
105
106 pub async fn advance_ticks(&self, ticks: u32, tick: StdDuration) {
107 for _ in 0..ticks {
108 self.advance_std(tick).await;
109 }
110 }
111}
112
113#[async_trait]
114impl Clock for MockClock {
115 fn now(&self) -> OffsetDateTime {
116 *self.now.lock().expect("mock clock mutex poisoned")
117 }
118
119 async fn sleep_until(&self, deadline: OffsetDateTime) {
120 loop {
121 if *self.now.lock().expect("mock clock mutex poisoned") >= deadline {
122 return;
123 }
124 self.notify.notified().await;
125 }
126 }
127}
128
129pub fn install_override(clock: Arc<MockClock>) -> ClockOverrideGuard {
130 MOCK_CLOCK_STACK.with(|slot| {
131 slot.borrow_mut().push(clock);
132 });
133 ClockOverrideGuard
134}
135
136pub fn active_mock_clock() -> Option<Arc<MockClock>> {
137 MOCK_CLOCK_STACK.with(|slot| slot.borrow().last().cloned())
138}
139
140pub fn now_utc() -> OffsetDateTime {
141 active_mock_clock()
142 .map(|clock| clock.now())
143 .unwrap_or_else(OffsetDateTime::now_utc)
144}
145
146pub fn now_ms() -> i64 {
147 now_utc().unix_timestamp_nanos() as i64 / 1_000_000
148}
149
150pub fn instant_now() -> ClockInstant {
151 active_mock_clock()
152 .map(|clock| clock.monotonic_now())
153 .unwrap_or_else(|| ClockInstant(process_start().elapsed()))
154}