1use std::fmt;
12use std::sync::Arc;
13use std::time::Duration;
14
15use async_trait::async_trait;
16use parking_lot::Mutex;
17use serde::{Deserialize, Serialize};
18use time::OffsetDateTime;
19use tokio::sync::Notify;
20
21#[async_trait]
28pub trait Clock: Send + Sync + fmt::Debug {
29 fn now_utc(&self) -> OffsetDateTime;
31
32 fn monotonic_ms(&self) -> i64;
38
39 async fn sleep(&self, duration: Duration);
41
42 async fn sleep_until_utc(&self, deadline: OffsetDateTime);
45}
46
47pub fn offset_datetime_to_ms(ts: OffsetDateTime) -> i64 {
51 (ts.unix_timestamp_nanos() / 1_000_000) as i64
52}
53
54pub fn now_wall_ms(clock: &dyn Clock) -> i64 {
56 offset_datetime_to_ms(clock.now_utc())
57}
58
59pub struct RealClock {
65 monotonic_origin: tokio::time::Instant,
66}
67
68impl Default for RealClock {
69 fn default() -> Self {
70 Self::new()
71 }
72}
73
74impl fmt::Debug for RealClock {
75 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76 f.debug_struct("RealClock").finish()
77 }
78}
79
80impl RealClock {
81 pub fn new() -> Self {
82 Self {
83 monotonic_origin: tokio::time::Instant::now(),
84 }
85 }
86
87 pub fn arc() -> Arc<dyn Clock> {
89 Arc::new(Self::new())
90 }
91}
92
93#[async_trait]
94impl Clock for RealClock {
95 fn now_utc(&self) -> OffsetDateTime {
96 OffsetDateTime::now_utc()
97 }
98
99 fn monotonic_ms(&self) -> i64 {
100 let elapsed = tokio::time::Instant::now().saturating_duration_since(self.monotonic_origin);
101 elapsed.as_millis() as i64
102 }
103
104 async fn sleep(&self, duration: Duration) {
105 if duration.is_zero() {
106 return;
107 }
108 tokio::time::sleep(duration).await;
109 }
110
111 async fn sleep_until_utc(&self, deadline: OffsetDateTime) {
112 let now = self.now_utc();
113 if deadline <= now {
114 return;
115 }
116 let delta = deadline - now;
117 let Ok(duration) = Duration::try_from(delta) else {
118 return;
119 };
120 tokio::time::sleep(duration).await;
121 }
122}
123
124pub struct PausedClock {
137 state: Mutex<PausedState>,
138 notify: Notify,
139}
140
141struct PausedState {
142 wall: OffsetDateTime,
143 monotonic: Duration,
144}
145
146impl fmt::Debug for PausedClock {
147 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148 let state = self.state.lock();
149 f.debug_struct("PausedClock")
150 .field("wall", &state.wall)
151 .field("monotonic_ms", &state.monotonic.as_millis())
152 .finish()
153 }
154}
155
156impl PausedClock {
157 pub fn new(origin: OffsetDateTime) -> Arc<Self> {
159 Arc::new(Self {
160 state: Mutex::new(PausedState {
161 wall: origin,
162 monotonic: Duration::ZERO,
163 }),
164 notify: Notify::new(),
165 })
166 }
167
168 pub fn advance(&self, duration: Duration) {
171 let delta = match time::Duration::try_from(duration) {
172 Ok(value) => value,
173 Err(_) => return,
174 };
175 {
176 let mut state = self.state.lock();
177 state.wall += delta;
178 state.monotonic = state.monotonic.saturating_add(duration);
179 }
180 self.notify.notify_waiters();
181 }
182
183 pub fn advance_time(&self, duration: time::Duration) {
185 let Ok(positive) = Duration::try_from(duration) else {
186 return;
187 };
188 self.advance(positive);
189 }
190
191 pub fn advance_ticks(&self, ticks: u32, tick: Duration) {
194 for _ in 0..ticks {
195 self.advance(tick);
196 }
197 }
198
199 pub fn set(&self, wall: OffsetDateTime) {
202 {
203 let mut state = self.state.lock();
204 let delta = wall - state.wall;
205 state.wall = wall;
206 if delta.is_positive() {
207 if let Ok(positive) = Duration::try_from(delta) {
208 state.monotonic = state.monotonic.saturating_add(positive);
209 }
210 }
211 }
212 self.notify.notify_waiters();
213 }
214}
215
216#[async_trait]
217impl Clock for PausedClock {
218 fn now_utc(&self) -> OffsetDateTime {
219 self.state.lock().wall
220 }
221
222 fn monotonic_ms(&self) -> i64 {
223 self.state.lock().monotonic.as_millis() as i64
224 }
225
226 async fn sleep(&self, duration: Duration) {
227 if duration.is_zero() {
228 return;
229 }
230 let deadline = self.state.lock().wall
231 + time::Duration::try_from(duration).unwrap_or(time::Duration::ZERO);
232 self.sleep_until_utc(deadline).await;
233 }
234
235 async fn sleep_until_utc(&self, deadline: OffsetDateTime) {
236 loop {
237 let notified = self.notify.notified();
238 if self.state.lock().wall >= deadline {
239 return;
240 }
241 notified.await;
242 }
243 }
244}
245
246#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
250#[serde(tag = "kind", rename_all = "snake_case")]
251pub enum ClockEvent {
252 NowUtc { wall_ns: i128 },
253 MonotonicMs { value: i64 },
254 Sleep { duration_ms: u64 },
255 SleepUntil { wall_ns: i128 },
256}
257
258#[derive(Debug, Default)]
260pub struct ClockEventLog {
261 events: Mutex<Vec<ClockEvent>>,
262}
263
264impl ClockEventLog {
265 pub fn new() -> Self {
266 Self::default()
267 }
268
269 pub fn push(&self, event: ClockEvent) {
272 self.events.lock().push(event);
273 }
274
275 pub fn snapshot(&self) -> Vec<ClockEvent> {
277 self.events.lock().clone()
278 }
279
280 pub fn len(&self) -> usize {
282 self.events.lock().len()
283 }
284
285 pub fn is_empty(&self) -> bool {
286 self.len() == 0
287 }
288}
289
290pub struct RecordedClock {
297 inner: Arc<dyn Clock>,
298 log: Arc<ClockEventLog>,
299}
300
301impl fmt::Debug for RecordedClock {
302 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303 f.debug_struct("RecordedClock")
304 .field("inner", &self.inner)
305 .field("events", &self.log.len())
306 .finish()
307 }
308}
309
310impl RecordedClock {
311 pub fn new(inner: Arc<dyn Clock>, log: Arc<ClockEventLog>) -> Self {
312 Self { inner, log }
313 }
314
315 pub fn log(&self) -> Arc<ClockEventLog> {
316 self.log.clone()
317 }
318}
319
320#[async_trait]
321impl Clock for RecordedClock {
322 fn now_utc(&self) -> OffsetDateTime {
323 let value = self.inner.now_utc();
324 self.log.push(ClockEvent::NowUtc {
325 wall_ns: value.unix_timestamp_nanos(),
326 });
327 value
328 }
329
330 fn monotonic_ms(&self) -> i64 {
331 let value = self.inner.monotonic_ms();
332 self.log.push(ClockEvent::MonotonicMs { value });
333 value
334 }
335
336 async fn sleep(&self, duration: Duration) {
337 self.log.push(ClockEvent::Sleep {
338 duration_ms: duration.as_millis() as u64,
339 });
340 self.inner.sleep(duration).await;
341 }
342
343 async fn sleep_until_utc(&self, deadline: OffsetDateTime) {
344 self.log.push(ClockEvent::SleepUntil {
345 wall_ns: deadline.unix_timestamp_nanos(),
346 });
347 self.inner.sleep_until_utc(deadline).await;
348 }
349}
350
351#[cfg(test)]
354mod tests {
355 use super::*;
356
357 fn epoch() -> OffsetDateTime {
358 OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap()
359 }
360
361 #[tokio::test]
362 async fn real_clock_returns_increasing_monotonic() {
363 let clock = RealClock::new();
364 let a = clock.monotonic_ms();
365 tokio::task::yield_now().await;
366 let b = clock.monotonic_ms();
367 assert!(b >= a, "monotonic must not move backwards");
368 }
369
370 #[tokio::test]
371 async fn paused_clock_pins_wall_and_monotonic_until_advanced() {
372 let clock = PausedClock::new(epoch());
373 assert_eq!(clock.now_utc(), epoch());
374 assert_eq!(clock.monotonic_ms(), 0);
375 clock.advance(Duration::from_millis(250));
376 assert_eq!(clock.monotonic_ms(), 250);
377 assert_eq!(clock.now_utc(), epoch() + time::Duration::milliseconds(250));
378 }
379
380 #[tokio::test]
381 async fn paused_clock_sleep_resumes_after_advance() {
382 let clock = PausedClock::new(epoch());
383 let clock_for_sleep = clock.clone();
384 let task = tokio::spawn(async move {
385 clock_for_sleep.sleep(Duration::from_secs(5)).await;
386 });
387 tokio::task::yield_now().await;
388 assert!(!task.is_finished(), "sleep should still be pending");
389 clock.advance(Duration::from_secs(10));
390 task.await.expect("sleep task panicked");
391 }
392
393 #[tokio::test]
394 async fn paused_clock_sleep_until_returns_immediately_for_past_deadline() {
395 let clock = PausedClock::new(epoch());
396 clock.advance(Duration::from_mins(1));
397 clock.sleep_until_utc(epoch()).await;
398 }
399
400 #[tokio::test]
401 async fn recorded_clock_appends_one_event_per_call() {
402 let log = Arc::new(ClockEventLog::new());
403 let clock = RecordedClock::new(PausedClock::new(epoch()), log.clone());
404 let _ = clock.now_utc();
405 let _ = clock.monotonic_ms();
406 clock.sleep(Duration::ZERO).await;
407 clock.sleep_until_utc(epoch()).await;
408 let events = log.snapshot();
409 assert_eq!(events.len(), 4);
410 assert!(matches!(events[0], ClockEvent::NowUtc { .. }));
411 assert!(matches!(events[1], ClockEvent::MonotonicMs { .. }));
412 assert!(matches!(events[2], ClockEvent::Sleep { duration_ms: 0 }));
413 assert!(matches!(events[3], ClockEvent::SleepUntil { .. }));
414 }
415
416 #[tokio::test]
417 async fn now_wall_ms_helper_matches_clock_observation() {
418 let clock = PausedClock::new(epoch());
419 let observed = now_wall_ms(clock.as_ref());
420 let expected = epoch().unix_timestamp_nanos() / 1_000_000;
421 assert_eq!(observed, expected as i64);
422 }
423}