1use std::fmt;
12use std::sync::Arc;
13use std::time::Duration;
14
15use async_trait::async_trait;
16use parking_lot::Mutex;
17use serde::{Deserialize, Serialize};
18use time::OffsetDateTime;
19use tokio::sync::Notify;
20
21#[async_trait]
28pub trait Clock: Send + Sync + fmt::Debug {
29 fn now_utc(&self) -> OffsetDateTime;
31
32 fn monotonic_ms(&self) -> i64;
38
39 async fn sleep(&self, duration: Duration);
41
42 async fn sleep_until_utc(&self, deadline: OffsetDateTime);
45}
46
47pub fn now_wall_ms(clock: &dyn Clock) -> i64 {
49 let ts = clock.now_utc();
50 let nanos = ts.unix_timestamp_nanos();
51 (nanos / 1_000_000) as i64
52}
53
54pub struct RealClock {
60 monotonic_origin: tokio::time::Instant,
61}
62
63impl Default for RealClock {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69impl fmt::Debug for RealClock {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 f.debug_struct("RealClock").finish()
72 }
73}
74
75impl RealClock {
76 pub fn new() -> Self {
77 Self {
78 monotonic_origin: tokio::time::Instant::now(),
79 }
80 }
81
82 pub fn arc() -> Arc<dyn Clock> {
84 Arc::new(Self::new())
85 }
86}
87
88#[async_trait]
89impl Clock for RealClock {
90 fn now_utc(&self) -> OffsetDateTime {
91 OffsetDateTime::now_utc()
92 }
93
94 fn monotonic_ms(&self) -> i64 {
95 let elapsed = tokio::time::Instant::now().saturating_duration_since(self.monotonic_origin);
96 elapsed.as_millis() as i64
97 }
98
99 async fn sleep(&self, duration: Duration) {
100 if duration.is_zero() {
101 return;
102 }
103 tokio::time::sleep(duration).await;
104 }
105
106 async fn sleep_until_utc(&self, deadline: OffsetDateTime) {
107 let now = self.now_utc();
108 if deadline <= now {
109 return;
110 }
111 let delta = deadline - now;
112 let Ok(duration) = Duration::try_from(delta) else {
113 return;
114 };
115 tokio::time::sleep(duration).await;
116 }
117}
118
119pub struct PausedClock {
132 state: Mutex<PausedState>,
133 notify: Notify,
134}
135
136struct PausedState {
137 wall: OffsetDateTime,
138 monotonic: Duration,
139}
140
141impl fmt::Debug for PausedClock {
142 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143 let state = self.state.lock();
144 f.debug_struct("PausedClock")
145 .field("wall", &state.wall)
146 .field("monotonic_ms", &state.monotonic.as_millis())
147 .finish()
148 }
149}
150
151impl PausedClock {
152 pub fn new(origin: OffsetDateTime) -> Arc<Self> {
154 Arc::new(Self {
155 state: Mutex::new(PausedState {
156 wall: origin,
157 monotonic: Duration::ZERO,
158 }),
159 notify: Notify::new(),
160 })
161 }
162
163 pub fn advance(&self, duration: Duration) {
166 let delta = match time::Duration::try_from(duration) {
167 Ok(value) => value,
168 Err(_) => return,
169 };
170 {
171 let mut state = self.state.lock();
172 state.wall += delta;
173 state.monotonic = state.monotonic.saturating_add(duration);
174 }
175 self.notify.notify_waiters();
176 }
177
178 pub fn advance_time(&self, duration: time::Duration) {
180 let Ok(positive) = Duration::try_from(duration) else {
181 return;
182 };
183 self.advance(positive);
184 }
185
186 pub fn advance_ticks(&self, ticks: u32, tick: Duration) {
189 for _ in 0..ticks {
190 self.advance(tick);
191 }
192 }
193
194 pub fn set(&self, wall: OffsetDateTime) {
197 {
198 let mut state = self.state.lock();
199 let delta = wall - state.wall;
200 state.wall = wall;
201 if delta.is_positive() {
202 if let Ok(positive) = Duration::try_from(delta) {
203 state.monotonic = state.monotonic.saturating_add(positive);
204 }
205 }
206 }
207 self.notify.notify_waiters();
208 }
209}
210
211#[async_trait]
212impl Clock for PausedClock {
213 fn now_utc(&self) -> OffsetDateTime {
214 self.state.lock().wall
215 }
216
217 fn monotonic_ms(&self) -> i64 {
218 self.state.lock().monotonic.as_millis() as i64
219 }
220
221 async fn sleep(&self, duration: Duration) {
222 if duration.is_zero() {
223 return;
224 }
225 let deadline = self.state.lock().wall
226 + time::Duration::try_from(duration).unwrap_or(time::Duration::ZERO);
227 self.sleep_until_utc(deadline).await;
228 }
229
230 async fn sleep_until_utc(&self, deadline: OffsetDateTime) {
231 loop {
232 let notified = self.notify.notified();
233 if self.state.lock().wall >= deadline {
234 return;
235 }
236 notified.await;
237 }
238 }
239}
240
241#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
245#[serde(tag = "kind", rename_all = "snake_case")]
246pub enum ClockEvent {
247 NowUtc { wall_ns: i128 },
248 MonotonicMs { value: i64 },
249 Sleep { duration_ms: u64 },
250 SleepUntil { wall_ns: i128 },
251}
252
253#[derive(Debug, Default)]
255pub struct ClockEventLog {
256 events: Mutex<Vec<ClockEvent>>,
257}
258
259impl ClockEventLog {
260 pub fn new() -> Self {
261 Self::default()
262 }
263
264 pub fn push(&self, event: ClockEvent) {
267 self.events.lock().push(event);
268 }
269
270 pub fn snapshot(&self) -> Vec<ClockEvent> {
272 self.events.lock().clone()
273 }
274
275 pub fn len(&self) -> usize {
277 self.events.lock().len()
278 }
279
280 pub fn is_empty(&self) -> bool {
281 self.len() == 0
282 }
283}
284
285pub struct RecordedClock {
292 inner: Arc<dyn Clock>,
293 log: Arc<ClockEventLog>,
294}
295
296impl fmt::Debug for RecordedClock {
297 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298 f.debug_struct("RecordedClock")
299 .field("inner", &self.inner)
300 .field("events", &self.log.len())
301 .finish()
302 }
303}
304
305impl RecordedClock {
306 pub fn new(inner: Arc<dyn Clock>, log: Arc<ClockEventLog>) -> Self {
307 Self { inner, log }
308 }
309
310 pub fn log(&self) -> Arc<ClockEventLog> {
311 self.log.clone()
312 }
313}
314
315#[async_trait]
316impl Clock for RecordedClock {
317 fn now_utc(&self) -> OffsetDateTime {
318 let value = self.inner.now_utc();
319 self.log.push(ClockEvent::NowUtc {
320 wall_ns: value.unix_timestamp_nanos(),
321 });
322 value
323 }
324
325 fn monotonic_ms(&self) -> i64 {
326 let value = self.inner.monotonic_ms();
327 self.log.push(ClockEvent::MonotonicMs { value });
328 value
329 }
330
331 async fn sleep(&self, duration: Duration) {
332 self.log.push(ClockEvent::Sleep {
333 duration_ms: duration.as_millis() as u64,
334 });
335 self.inner.sleep(duration).await;
336 }
337
338 async fn sleep_until_utc(&self, deadline: OffsetDateTime) {
339 self.log.push(ClockEvent::SleepUntil {
340 wall_ns: deadline.unix_timestamp_nanos(),
341 });
342 self.inner.sleep_until_utc(deadline).await;
343 }
344}
345
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed reference instant shared by the tests.
    fn epoch() -> OffsetDateTime {
        OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap()
    }

    #[tokio::test]
    async fn real_clock_returns_increasing_monotonic() {
        let clock = RealClock::new();
        let before = clock.monotonic_ms();
        tokio::task::yield_now().await;
        let after = clock.monotonic_ms();
        assert!(after >= before, "monotonic must not move backwards");
    }

    #[tokio::test]
    async fn paused_clock_pins_wall_and_monotonic_until_advanced() {
        let start = epoch();
        let clock = PausedClock::new(start);

        // Nothing moves until the clock is advanced explicitly.
        assert_eq!(clock.now_utc(), start);
        assert_eq!(clock.monotonic_ms(), 0);

        clock.advance(Duration::from_millis(250));

        assert_eq!(clock.monotonic_ms(), 250);
        assert_eq!(clock.now_utc(), start + time::Duration::milliseconds(250));
    }

    #[tokio::test]
    async fn paused_clock_sleep_resumes_after_advance() {
        let clock = PausedClock::new(epoch());
        let sleeper = Arc::clone(&clock);
        let task = tokio::spawn(async move {
            sleeper.sleep(Duration::from_secs(5)).await;
        });

        // Give the spawned task a chance to start waiting.
        tokio::task::yield_now().await;
        assert!(!task.is_finished(), "sleep should still be pending");

        clock.advance(Duration::from_secs(10));
        task.await.expect("sleep task panicked");
    }

    #[tokio::test]
    async fn paused_clock_sleep_until_returns_immediately_for_past_deadline() {
        let clock = PausedClock::new(epoch());
        clock.advance(Duration::from_secs(60));
        // The deadline is already behind the clock, so this must not hang.
        clock.sleep_until_utc(epoch()).await;
    }

    #[tokio::test]
    async fn recorded_clock_appends_one_event_per_call() {
        let log = Arc::new(ClockEventLog::new());
        let clock = RecordedClock::new(PausedClock::new(epoch()), Arc::clone(&log));

        let _ = clock.now_utc();
        let _ = clock.monotonic_ms();
        clock.sleep(Duration::ZERO).await;
        clock.sleep_until_utc(epoch()).await;

        let events = log.snapshot();
        assert_eq!(events.len(), 4);
        assert!(matches!(
            events.as_slice(),
            [
                ClockEvent::NowUtc { .. },
                ClockEvent::MonotonicMs { .. },
                ClockEvent::Sleep { duration_ms: 0 },
                ClockEvent::SleepUntil { .. },
            ]
        ));
    }

    #[tokio::test]
    async fn now_wall_ms_helper_matches_clock_observation() {
        let clock = PausedClock::new(epoch());
        let expected = (epoch().unix_timestamp_nanos() / 1_000_000) as i64;
        assert_eq!(now_wall_ms(&*clock), expected);
    }
}