Skip to main content

harn_clock/
lib.rs

1//! Unified `Clock` trait for the Harn runtime.
2//!
3//! Production code that needs the current time, monotonic measurement, or a
4//! cancellable sleep takes an `Arc<dyn Clock>` and reads through it. Real
5//! deployments wire [`RealClock`]; tests substitute [`PausedClock`] to drive
6//! virtual time deterministically; recording layers wrap any inner clock with
7//! [`RecordedClock`] to capture every observation for replay.
8//!
9//! See `docs/src/dev/testing.md` for usage patterns.
10
11use std::fmt;
12use std::sync::Arc;
13use std::time::Duration;
14
15use async_trait::async_trait;
16use parking_lot::Mutex;
17use serde::{Deserialize, Serialize};
18use time::OffsetDateTime;
19use tokio::sync::Notify;
20
/// Unified clock abstraction.
///
/// All time observations and sleeps in production-facing Harn code should
/// route through a `Clock`. Cron, the trigger dispatcher, the stdlib
/// `now_ms` / `sleep_ms` builtins, and the `OrchestratorHarness` all accept
/// `Arc<dyn Clock>` so test harnesses can swap in [`PausedClock`].
///
/// Implementations provided by this module are [`RealClock`] (system time and
/// tokio sleeps), [`PausedClock`] (virtual time driven by the test), and
/// [`RecordedClock`] (a logging wrapper around any other clock).
#[async_trait]
pub trait Clock: Send + Sync + fmt::Debug {
    /// Current wall-clock UTC time.
    fn now_utc(&self) -> OffsetDateTime;

    /// Monotonic milliseconds since an implementation-defined origin.
    ///
    /// Required to be non-decreasing across calls on the same `Clock`
    /// instance. Used by the stdlib `monotonic_ms` / `elapsed` builtins and
    /// any code measuring durations across operations.
    ///
    /// The value is only meaningful relative to other readings from the same
    /// instance: [`RealClock`] counts from its construction and
    /// [`PausedClock`] counts from zero.
    fn monotonic_ms(&self) -> i64;

    /// Sleep for `duration`. No-op when `duration` is zero.
    async fn sleep(&self, duration: Duration);

    /// Sleep until the wall-clock UTC `deadline`. No-op if `deadline` is in
    /// the past.
    async fn sleep_until_utc(&self, deadline: OffsetDateTime);
}
46
47/// Convenience: current wall-clock millis since UNIX_EPOCH.
48pub fn now_wall_ms(clock: &dyn Clock) -> i64 {
49    let ts = clock.now_utc();
50    let nanos = ts.unix_timestamp_nanos();
51    (nanos / 1_000_000) as i64
52}
53
54// ── Real clock ─────────────────────────────────────────────────────────────────
55
/// Production clock. Reads wall time from `OffsetDateTime::now_utc()` and
/// sleeps via `tokio::time::sleep`. Honors `tokio::time::pause()` for
/// sleep-driven scheduling but `now_utc` always returns true wall time.
pub struct RealClock {
    /// Tokio instant captured at construction; `monotonic_ms` reports the
    /// time elapsed since this point.
    monotonic_origin: tokio::time::Instant,
}
62
// `RealClock::default()` is the same as `RealClock::new()`.
impl Default for RealClock {
    fn default() -> Self {
        // Delegate so the monotonic origin is captured in a single place.
        Self::new()
    }
}
68
69impl fmt::Debug for RealClock {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        f.debug_struct("RealClock").finish()
72    }
73}
74
75impl RealClock {
76    pub fn new() -> Self {
77        Self {
78            monotonic_origin: tokio::time::Instant::now(),
79        }
80    }
81
82    /// Convenience: wrap in an `Arc` for handing to consumers.
83    pub fn arc() -> Arc<dyn Clock> {
84        Arc::new(Self::new())
85    }
86}
87
88#[async_trait]
89impl Clock for RealClock {
90    fn now_utc(&self) -> OffsetDateTime {
91        OffsetDateTime::now_utc()
92    }
93
94    fn monotonic_ms(&self) -> i64 {
95        let elapsed = tokio::time::Instant::now().saturating_duration_since(self.monotonic_origin);
96        elapsed.as_millis() as i64
97    }
98
99    async fn sleep(&self, duration: Duration) {
100        if duration.is_zero() {
101            return;
102        }
103        tokio::time::sleep(duration).await;
104    }
105
106    async fn sleep_until_utc(&self, deadline: OffsetDateTime) {
107        let now = self.now_utc();
108        if deadline <= now {
109            return;
110        }
111        let delta = deadline - now;
112        let Ok(duration) = Duration::try_from(delta) else {
113            return;
114        };
115        tokio::time::sleep(duration).await;
116    }
117}
118
119// ── Paused clock ───────────────────────────────────────────────────────────────
120
/// Fully-virtual clock for tests.
///
/// Stores its own wall-clock cursor and a monotonic counter. Sleeps suspend on
/// an internal `Notify` and wake when [`PausedClock::advance`] or
/// [`PausedClock::set`] crosses the deadline. No tokio-runtime cooperation is
/// required: `PausedClock` works inside `current_thread`, `multi_thread`, and
/// `start_paused` runtimes equally well.
///
/// Pairs with `tokio::time::pause()` for tests that mix this clock with code
/// that uses `tokio::time::sleep` directly (e.g. `tokio::time::timeout`).
///
/// Time never moves on its own: the wall cursor and monotonic counter change
/// only through `advance`, `advance_time`, `advance_ticks`, or `set`.
pub struct PausedClock {
    /// Virtual wall time and monotonic counter, guarded by one mutex so both
    /// are updated together.
    state: Mutex<PausedState>,
    /// Signalled with `notify_waiters` after every time change; pending
    /// sleepers wait on it and then re-check their deadline.
    notify: Notify,
}
135
/// Mutable time state of a [`PausedClock`], kept behind its mutex.
struct PausedState {
    /// Current virtual wall-clock time; may be moved in either direction by
    /// `PausedClock::set`.
    wall: OffsetDateTime,
    /// Virtual time elapsed since construction; only ever increased (with
    /// saturating addition).
    monotonic: Duration,
}
140
141impl fmt::Debug for PausedClock {
142    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143        let state = self.state.lock();
144        f.debug_struct("PausedClock")
145            .field("wall", &state.wall)
146            .field("monotonic_ms", &state.monotonic.as_millis())
147            .finish()
148    }
149}
150
151impl PausedClock {
152    /// Build a paused clock pinned at `origin`.
153    pub fn new(origin: OffsetDateTime) -> Arc<Self> {
154        Arc::new(Self {
155            state: Mutex::new(PausedState {
156                wall: origin,
157                monotonic: Duration::ZERO,
158            }),
159            notify: Notify::new(),
160        })
161    }
162
163    /// Advance wall + monotonic by `duration` and wake any sleepers whose
164    /// deadlines are now in the past.
165    pub fn advance(&self, duration: Duration) {
166        let delta = match time::Duration::try_from(duration) {
167            Ok(value) => value,
168            Err(_) => return,
169        };
170        {
171            let mut state = self.state.lock();
172            state.wall += delta;
173            state.monotonic = state.monotonic.saturating_add(duration);
174        }
175        self.notify.notify_waiters();
176    }
177
178    /// Advance using a `time::Duration`. Negative durations are clamped to zero.
179    pub fn advance_time(&self, duration: time::Duration) {
180        let Ok(positive) = Duration::try_from(duration) else {
181            return;
182        };
183        self.advance(positive);
184    }
185
186    /// Step `ticks` times by `tick`, notifying sleepers between every step so
187    /// tasks observing intermediate wake-ups see each notification.
188    pub fn advance_ticks(&self, ticks: u32, tick: Duration) {
189        for _ in 0..ticks {
190            self.advance(tick);
191        }
192    }
193
194    /// Pin the wall clock to `wall`. Monotonic counter advances by the
195    /// (signed) delta, never moving backwards.
196    pub fn set(&self, wall: OffsetDateTime) {
197        {
198            let mut state = self.state.lock();
199            let delta = wall - state.wall;
200            state.wall = wall;
201            if delta.is_positive() {
202                if let Ok(positive) = Duration::try_from(delta) {
203                    state.monotonic = state.monotonic.saturating_add(positive);
204                }
205            }
206        }
207        self.notify.notify_waiters();
208    }
209}
210
211#[async_trait]
212impl Clock for PausedClock {
213    fn now_utc(&self) -> OffsetDateTime {
214        self.state.lock().wall
215    }
216
217    fn monotonic_ms(&self) -> i64 {
218        self.state.lock().monotonic.as_millis() as i64
219    }
220
221    async fn sleep(&self, duration: Duration) {
222        if duration.is_zero() {
223            return;
224        }
225        let deadline = self.state.lock().wall
226            + time::Duration::try_from(duration).unwrap_or(time::Duration::ZERO);
227        self.sleep_until_utc(deadline).await;
228    }
229
230    async fn sleep_until_utc(&self, deadline: OffsetDateTime) {
231        loop {
232            let notified = self.notify.notified();
233            if self.state.lock().wall >= deadline {
234                return;
235            }
236            notified.await;
237        }
238    }
239}
240
241// ── Recorded clock ─────────────────────────────────────────────────────────────
242
/// Single observation captured by [`RecordedClock`].
///
/// Serialized with an internal `kind` tag holding the snake_case variant name
/// (for example `"now_utc"` or `"sleep_until"`), per the `serde` attributes
/// below.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ClockEvent {
    /// A `now_utc` read; `wall_ns` is the returned time as UNIX nanoseconds.
    NowUtc { wall_ns: i128 },
    /// A `monotonic_ms` read; `value` is the reading that was returned.
    MonotonicMs { value: i64 },
    /// A `sleep` request; `duration_ms` is the requested duration in whole
    /// milliseconds.
    Sleep { duration_ms: u64 },
    /// A `sleep_until_utc` request; `wall_ns` is the requested deadline as
    /// UNIX nanoseconds.
    SleepUntil { wall_ns: i128 },
}
252
/// In-memory append-only log of clock observations.
///
/// All methods take `&self`; the event list sits behind a mutex so the log
/// can be shared (e.g. in an `Arc`) between a recorder and its reader.
#[derive(Debug, Default)]
pub struct ClockEventLog {
    /// Recorded events in the order they were pushed.
    events: Mutex<Vec<ClockEvent>>,
}
258
259impl ClockEventLog {
260    pub fn new() -> Self {
261        Self::default()
262    }
263
264    /// Append an event. Called from `RecordedClock`; tests can also seed
265    /// expected events when building a replay oracle.
266    pub fn push(&self, event: ClockEvent) {
267        self.events.lock().push(event);
268    }
269
270    /// Snapshot of events recorded so far.
271    pub fn snapshot(&self) -> Vec<ClockEvent> {
272        self.events.lock().clone()
273    }
274
275    /// Number of events recorded.
276    pub fn len(&self) -> usize {
277        self.events.lock().len()
278    }
279
280    pub fn is_empty(&self) -> bool {
281        self.len() == 0
282    }
283}
284
/// Wraps an inner [`Clock`] and records every observation to a
/// [`ClockEventLog`].
///
/// The recording is the substrate the testbench replay/recording feature
/// (#1441) builds on. It is deliberately scoped to the clock surface; other
/// I/O substrates record separately.
///
/// Every trait call is forwarded to the inner clock unchanged; the wrapper
/// only appends one [`ClockEvent`] per call.
pub struct RecordedClock {
    /// Clock that actually answers reads and performs sleeps.
    inner: Arc<dyn Clock>,
    /// Shared log that receives one event per trait call.
    log: Arc<ClockEventLog>,
}
295
296impl fmt::Debug for RecordedClock {
297    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298        f.debug_struct("RecordedClock")
299            .field("inner", &self.inner)
300            .field("events", &self.log.len())
301            .finish()
302    }
303}
304
305impl RecordedClock {
306    pub fn new(inner: Arc<dyn Clock>, log: Arc<ClockEventLog>) -> Self {
307        Self { inner, log }
308    }
309
310    pub fn log(&self) -> Arc<ClockEventLog> {
311        self.log.clone()
312    }
313}
314
315#[async_trait]
316impl Clock for RecordedClock {
317    fn now_utc(&self) -> OffsetDateTime {
318        let value = self.inner.now_utc();
319        self.log.push(ClockEvent::NowUtc {
320            wall_ns: value.unix_timestamp_nanos(),
321        });
322        value
323    }
324
325    fn monotonic_ms(&self) -> i64 {
326        let value = self.inner.monotonic_ms();
327        self.log.push(ClockEvent::MonotonicMs { value });
328        value
329    }
330
331    async fn sleep(&self, duration: Duration) {
332        self.log.push(ClockEvent::Sleep {
333            duration_ms: duration.as_millis() as u64,
334        });
335        self.inner.sleep(duration).await;
336    }
337
338    async fn sleep_until_utc(&self, deadline: OffsetDateTime) {
339        self.log.push(ClockEvent::SleepUntil {
340            wall_ns: deadline.unix_timestamp_nanos(),
341        });
342        self.inner.sleep_until_utc(deadline).await;
343    }
344}
345
346// ── Tests ──────────────────────────────────────────────────────────────────────
347
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed wall-clock origin shared by every test.
    fn epoch() -> OffsetDateTime {
        OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap()
    }

    /// Two readings of a real clock must never go backwards.
    #[tokio::test]
    async fn real_clock_returns_increasing_monotonic() {
        let clock = RealClock::new();
        let before = clock.monotonic_ms();
        tokio::task::yield_now().await;
        let after = clock.monotonic_ms();
        assert!(after >= before, "monotonic must not move backwards");
    }

    /// A paused clock holds still until explicitly advanced, and then moves
    /// wall and monotonic time by the same amount.
    #[tokio::test]
    async fn paused_clock_pins_wall_and_monotonic_until_advanced() {
        let origin = epoch();
        let clock = PausedClock::new(origin);
        assert_eq!(clock.now_utc(), origin);
        assert_eq!(clock.monotonic_ms(), 0);

        clock.advance(Duration::from_millis(250));
        assert_eq!(clock.monotonic_ms(), 250);
        assert_eq!(clock.now_utc(), origin + time::Duration::milliseconds(250));
    }

    /// A sleeper stays pending until the clock is advanced past its deadline.
    #[tokio::test]
    async fn paused_clock_sleep_resumes_after_advance() {
        let clock = PausedClock::new(epoch());
        let sleeper = {
            let clock = Arc::clone(&clock);
            tokio::spawn(async move {
                clock.sleep(Duration::from_secs(5)).await;
            })
        };
        // Give the spawned task a chance to start sleeping.
        tokio::task::yield_now().await;
        assert!(!sleeper.is_finished(), "sleep should still be pending");

        clock.advance(Duration::from_secs(10));
        sleeper.await.expect("sleep task panicked");
    }

    /// A deadline already in the past must not block.
    #[tokio::test]
    async fn paused_clock_sleep_until_returns_immediately_for_past_deadline() {
        let clock = PausedClock::new(epoch());
        clock.advance(Duration::from_secs(60));
        clock.sleep_until_utc(epoch()).await;
    }

    /// Each trait call through `RecordedClock` produces exactly one event, in
    /// call order.
    #[tokio::test]
    async fn recorded_clock_appends_one_event_per_call() {
        let log = Arc::new(ClockEventLog::new());
        let recorder = RecordedClock::new(PausedClock::new(epoch()), Arc::clone(&log));

        let _ = recorder.now_utc();
        let _ = recorder.monotonic_ms();
        recorder.sleep(Duration::ZERO).await;
        recorder.sleep_until_utc(epoch()).await;

        let recorded = log.snapshot();
        assert_eq!(recorded.len(), 4);
        assert!(matches!(recorded[0], ClockEvent::NowUtc { .. }));
        assert!(matches!(recorded[1], ClockEvent::MonotonicMs { .. }));
        assert!(matches!(recorded[2], ClockEvent::Sleep { duration_ms: 0 }));
        assert!(matches!(recorded[3], ClockEvent::SleepUntil { .. }));
    }

    /// `now_wall_ms` agrees with converting the clock's own reading by hand.
    #[tokio::test]
    async fn now_wall_ms_helper_matches_clock_observation() {
        let clock = PausedClock::new(epoch());
        let expected = (epoch().unix_timestamp_nanos() / 1_000_000) as i64;
        let observed = now_wall_ms(clock.as_ref());
        assert_eq!(observed, expected);
    }
}