Skip to main content

harn_clock/
lib.rs

1//! Unified `Clock` trait for the Harn runtime.
2//!
3//! Production code that needs the current time, monotonic measurement, or a
4//! cancellable sleep takes an `Arc<dyn Clock>` and reads through it. Real
5//! deployments wire [`RealClock`]; tests substitute [`PausedClock`] to drive
6//! virtual time deterministically; recording layers wrap any inner clock with
7//! [`RecordedClock`] to capture every observation for replay.
8//!
9//! See `docs/src/dev/testing.md` for usage patterns.
10
11use std::fmt;
12use std::sync::Arc;
13use std::time::Duration;
14
15use async_trait::async_trait;
16use parking_lot::Mutex;
17use serde::{Deserialize, Serialize};
18use time::OffsetDateTime;
19use tokio::sync::Notify;
20
/// Unified clock abstraction.
///
/// All time observations and sleeps in production-facing Harn code should
/// route through a `Clock`. Cron, the trigger dispatcher, the stdlib
/// `now_ms` / `sleep_ms` builtins, and the `OrchestratorHarness` all accept
/// `Arc<dyn Clock>` so test harnesses can swap in [`PausedClock`].
///
/// Implementations must be `Send + Sync` (they are shared behind an `Arc`)
/// and `Debug`.
#[async_trait]
pub trait Clock: Send + Sync + fmt::Debug {
    /// Current wall-clock UTC time.
    ///
    /// Wall time carries no ordering guarantee (for example
    /// [`PausedClock::set`] can move it backwards); use
    /// [`Clock::monotonic_ms`] to measure elapsed time.
    fn now_utc(&self) -> OffsetDateTime;

    /// Monotonic milliseconds since an implementation-defined origin.
    ///
    /// Required to be non-decreasing across calls on the same `Clock`
    /// instance. Used by the stdlib `monotonic_ms` / `elapsed` builtins and
    /// any code measuring durations across operations.
    fn monotonic_ms(&self) -> i64;

    /// Sleep for `duration`. No-op when `duration` is zero.
    async fn sleep(&self, duration: Duration);

    /// Sleep until the wall-clock UTC `deadline`. No-op if `deadline` is in
    /// the past.
    async fn sleep_until_utc(&self, deadline: OffsetDateTime);
}
46
47/// Convert an `OffsetDateTime` to Unix epoch milliseconds. The division
48/// happens in `i128` so timestamps past April 2262 — where nanoseconds
49/// exceed `i64::MAX` — don't silently corrupt when cast.
50pub fn offset_datetime_to_ms(ts: OffsetDateTime) -> i64 {
51    (ts.unix_timestamp_nanos() / 1_000_000) as i64
52}
53
54/// Convenience: current wall-clock millis since UNIX_EPOCH.
55pub fn now_wall_ms(clock: &dyn Clock) -> i64 {
56    offset_datetime_to_ms(clock.now_utc())
57}
58
59// ── Real clock ─────────────────────────────────────────────────────────────────
60
/// Production clock.
///
/// Wall time is read from `OffsetDateTime::now_utc()`; sleeps are delegated
/// to `tokio::time::sleep`. Honors `tokio::time::pause()` for sleep-driven
/// scheduling but `now_utc` always returns true wall time.
pub struct RealClock {
    /// Instant captured when the clock was constructed; `monotonic_ms`
    /// reports the time elapsed since this point.
    // NOTE(review): this is a `tokio::time::Instant`, so the monotonic reading
    // presumably follows tokio's paused clock as well — confirm.
    monotonic_origin: tokio::time::Instant,
}
67
68impl Default for RealClock {
69    fn default() -> Self {
70        Self::new()
71    }
72}
73
74impl fmt::Debug for RealClock {
75    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76        f.debug_struct("RealClock").finish()
77    }
78}
79
80impl RealClock {
81    pub fn new() -> Self {
82        Self {
83            monotonic_origin: tokio::time::Instant::now(),
84        }
85    }
86
87    /// Convenience: wrap in an `Arc` for handing to consumers.
88    pub fn arc() -> Arc<dyn Clock> {
89        Arc::new(Self::new())
90    }
91}
92
93#[async_trait]
94impl Clock for RealClock {
95    fn now_utc(&self) -> OffsetDateTime {
96        OffsetDateTime::now_utc()
97    }
98
99    fn monotonic_ms(&self) -> i64 {
100        let elapsed = tokio::time::Instant::now().saturating_duration_since(self.monotonic_origin);
101        elapsed.as_millis() as i64
102    }
103
104    async fn sleep(&self, duration: Duration) {
105        if duration.is_zero() {
106            return;
107        }
108        tokio::time::sleep(duration).await;
109    }
110
111    async fn sleep_until_utc(&self, deadline: OffsetDateTime) {
112        let now = self.now_utc();
113        if deadline <= now {
114            return;
115        }
116        let delta = deadline - now;
117        let Ok(duration) = Duration::try_from(delta) else {
118            return;
119        };
120        tokio::time::sleep(duration).await;
121    }
122}
123
124// ── Paused clock ───────────────────────────────────────────────────────────────
125
/// Fully-virtual clock for tests.
///
/// Stores its own wall-clock cursor and a monotonic counter. Sleeps suspend on
/// an internal `Notify` and wake when [`PausedClock::advance`] or
/// [`PausedClock::set`] crosses the deadline. No tokio-runtime cooperation is
/// required: `PausedClock` works inside `current_thread`, `multi_thread`, and
/// `start_paused` runtimes equally well.
///
/// Sleep deadlines are tracked against the wall-clock cursor, so moving the
/// wall clock backwards with [`PausedClock::set`] postpones pending sleeps.
///
/// Pairs with `tokio::time::pause()` for tests that mix this clock with code
/// that uses `tokio::time::sleep` directly (e.g. `tokio::time::timeout`).
pub struct PausedClock {
    /// Virtual wall time and monotonic counter, guarded by one lock so both
    /// are updated together.
    state: Mutex<PausedState>,
    /// Signalled after every time change so sleepers re-check their deadline.
    notify: Notify,
}
140
/// Mutable time state of a [`PausedClock`].
struct PausedState {
    /// Current virtual wall-clock time.
    wall: OffsetDateTime,
    /// Virtual time elapsed since construction; only ever increased.
    monotonic: Duration,
}
145
146impl fmt::Debug for PausedClock {
147    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148        let state = self.state.lock();
149        f.debug_struct("PausedClock")
150            .field("wall", &state.wall)
151            .field("monotonic_ms", &state.monotonic.as_millis())
152            .finish()
153    }
154}
155
156impl PausedClock {
157    /// Build a paused clock pinned at `origin`.
158    pub fn new(origin: OffsetDateTime) -> Arc<Self> {
159        Arc::new(Self {
160            state: Mutex::new(PausedState {
161                wall: origin,
162                monotonic: Duration::ZERO,
163            }),
164            notify: Notify::new(),
165        })
166    }
167
168    /// Advance wall + monotonic by `duration` and wake any sleepers whose
169    /// deadlines are now in the past.
170    pub fn advance(&self, duration: Duration) {
171        let delta = match time::Duration::try_from(duration) {
172            Ok(value) => value,
173            Err(_) => return,
174        };
175        {
176            let mut state = self.state.lock();
177            state.wall += delta;
178            state.monotonic = state.monotonic.saturating_add(duration);
179        }
180        self.notify.notify_waiters();
181    }
182
183    /// Advance using a `time::Duration`. Negative durations are clamped to zero.
184    pub fn advance_time(&self, duration: time::Duration) {
185        let Ok(positive) = Duration::try_from(duration) else {
186            return;
187        };
188        self.advance(positive);
189    }
190
191    /// Step `ticks` times by `tick`, notifying sleepers between every step so
192    /// tasks observing intermediate wake-ups see each notification.
193    pub fn advance_ticks(&self, ticks: u32, tick: Duration) {
194        for _ in 0..ticks {
195            self.advance(tick);
196        }
197    }
198
199    /// Pin the wall clock to `wall`. Monotonic counter advances by the
200    /// (signed) delta, never moving backwards.
201    pub fn set(&self, wall: OffsetDateTime) {
202        {
203            let mut state = self.state.lock();
204            let delta = wall - state.wall;
205            state.wall = wall;
206            if delta.is_positive() {
207                if let Ok(positive) = Duration::try_from(delta) {
208                    state.monotonic = state.monotonic.saturating_add(positive);
209                }
210            }
211        }
212        self.notify.notify_waiters();
213    }
214}
215
216#[async_trait]
217impl Clock for PausedClock {
218    fn now_utc(&self) -> OffsetDateTime {
219        self.state.lock().wall
220    }
221
222    fn monotonic_ms(&self) -> i64 {
223        self.state.lock().monotonic.as_millis() as i64
224    }
225
226    async fn sleep(&self, duration: Duration) {
227        if duration.is_zero() {
228            return;
229        }
230        let deadline = self.state.lock().wall
231            + time::Duration::try_from(duration).unwrap_or(time::Duration::ZERO);
232        self.sleep_until_utc(deadline).await;
233    }
234
235    async fn sleep_until_utc(&self, deadline: OffsetDateTime) {
236        loop {
237            let notified = self.notify.notified();
238            if self.state.lock().wall >= deadline {
239                return;
240            }
241            notified.await;
242        }
243    }
244}
245
246// ── Recorded clock ─────────────────────────────────────────────────────────────
247
/// Single observation captured by [`RecordedClock`].
///
/// Serialized as an internally tagged enum: the variant name is written to a
/// `kind` field in `snake_case` (`now_utc`, `monotonic_ms`, `sleep`,
/// `sleep_until`).
// NOTE(review): `wall_ns` is an `i128` inside an internally tagged enum —
// confirm it round-trips through the serialization format used for replay.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ClockEvent {
    /// A `now_utc` read; `wall_ns` is the returned time in Unix nanoseconds.
    NowUtc { wall_ns: i128 },
    /// A `monotonic_ms` read; `value` is the returned reading.
    MonotonicMs { value: i64 },
    /// A `sleep` request; `duration_ms` is the requested length in whole
    /// milliseconds.
    Sleep { duration_ms: u64 },
    /// A `sleep_until_utc` request; `wall_ns` is the deadline in Unix
    /// nanoseconds.
    SleepUntil { wall_ns: i128 },
}
257
/// In-memory append-only log of clock observations.
///
/// Interior mutability via a mutex lets the log be shared (e.g. behind an
/// `Arc`) and appended to through a shared reference.
#[derive(Debug, Default)]
pub struct ClockEventLog {
    /// Recorded events in the order they were pushed.
    events: Mutex<Vec<ClockEvent>>,
}
263
264impl ClockEventLog {
265    pub fn new() -> Self {
266        Self::default()
267    }
268
269    /// Append an event. Called from `RecordedClock`; tests can also seed
270    /// expected events when building a replay oracle.
271    pub fn push(&self, event: ClockEvent) {
272        self.events.lock().push(event);
273    }
274
275    /// Snapshot of events recorded so far.
276    pub fn snapshot(&self) -> Vec<ClockEvent> {
277        self.events.lock().clone()
278    }
279
280    /// Number of events recorded.
281    pub fn len(&self) -> usize {
282        self.events.lock().len()
283    }
284
285    pub fn is_empty(&self) -> bool {
286        self.len() == 0
287    }
288}
289
/// Wraps an inner [`Clock`] and records every observation to a
/// [`ClockEventLog`].
///
/// The recording is the substrate the testbench replay/recording feature
/// (#1441) builds on. It is deliberately scoped to the clock surface; other
/// I/O substrates record separately.
pub struct RecordedClock {
    /// Clock that actually answers every call.
    inner: Arc<dyn Clock>,
    /// Shared log that receives one event per call made through this wrapper.
    log: Arc<ClockEventLog>,
}
300
301impl fmt::Debug for RecordedClock {
302    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303        f.debug_struct("RecordedClock")
304            .field("inner", &self.inner)
305            .field("events", &self.log.len())
306            .finish()
307    }
308}
309
310impl RecordedClock {
311    pub fn new(inner: Arc<dyn Clock>, log: Arc<ClockEventLog>) -> Self {
312        Self { inner, log }
313    }
314
315    pub fn log(&self) -> Arc<ClockEventLog> {
316        self.log.clone()
317    }
318}
319
320#[async_trait]
321impl Clock for RecordedClock {
322    fn now_utc(&self) -> OffsetDateTime {
323        let value = self.inner.now_utc();
324        self.log.push(ClockEvent::NowUtc {
325            wall_ns: value.unix_timestamp_nanos(),
326        });
327        value
328    }
329
330    fn monotonic_ms(&self) -> i64 {
331        let value = self.inner.monotonic_ms();
332        self.log.push(ClockEvent::MonotonicMs { value });
333        value
334    }
335
336    async fn sleep(&self, duration: Duration) {
337        self.log.push(ClockEvent::Sleep {
338            duration_ms: duration.as_millis() as u64,
339        });
340        self.inner.sleep(duration).await;
341    }
342
343    async fn sleep_until_utc(&self, deadline: OffsetDateTime) {
344        self.log.push(ClockEvent::SleepUntil {
345            wall_ns: deadline.unix_timestamp_nanos(),
346        });
347        self.inner.sleep_until_utc(deadline).await;
348    }
349}
350
351// ── Tests ──────────────────────────────────────────────────────────────────────
352
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed origin shared by the tests (Unix timestamp 1_700_000_000).
    fn epoch() -> OffsetDateTime {
        OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap()
    }

    /// Two consecutive monotonic reads on a `RealClock` never go backwards.
    #[tokio::test]
    async fn real_clock_returns_increasing_monotonic() {
        let clock = RealClock::new();
        let a = clock.monotonic_ms();
        tokio::task::yield_now().await;
        let b = clock.monotonic_ms();
        assert!(b >= a, "monotonic must not move backwards");
    }

    /// A fresh `PausedClock` sits at its origin and moves only on `advance`.
    #[tokio::test]
    async fn paused_clock_pins_wall_and_monotonic_until_advanced() {
        let clock = PausedClock::new(epoch());
        assert_eq!(clock.now_utc(), epoch());
        assert_eq!(clock.monotonic_ms(), 0);
        clock.advance(Duration::from_millis(250));
        assert_eq!(clock.monotonic_ms(), 250);
        assert_eq!(clock.now_utc(), epoch() + time::Duration::milliseconds(250));
    }

    /// A pending sleep stays pending until virtual time passes its deadline.
    #[tokio::test]
    async fn paused_clock_sleep_resumes_after_advance() {
        let clock = PausedClock::new(epoch());
        let clock_for_sleep = clock.clone();
        let task = tokio::spawn(async move {
            clock_for_sleep.sleep(Duration::from_secs(5)).await;
        });
        // Give the spawned task a chance to start sleeping before advancing.
        tokio::task::yield_now().await;
        assert!(!task.is_finished(), "sleep should still be pending");
        clock.advance(Duration::from_secs(10));
        task.await.expect("sleep task panicked");
    }

    /// Sleeping until a deadline already in the past returns without waiting.
    #[tokio::test]
    async fn paused_clock_sleep_until_returns_immediately_for_past_deadline() {
        let clock = PausedClock::new(epoch());
        clock.advance(Duration::from_mins(1));
        clock.sleep_until_utc(epoch()).await;
    }

    /// Each call through `RecordedClock` appends exactly one event, in call
    /// order.
    #[tokio::test]
    async fn recorded_clock_appends_one_event_per_call() {
        let log = Arc::new(ClockEventLog::new());
        let clock = RecordedClock::new(PausedClock::new(epoch()), log.clone());
        let _ = clock.now_utc();
        let _ = clock.monotonic_ms();
        // Zero-length sleep and an already-reached deadline both return at
        // once, so the test never blocks on the paused clock.
        clock.sleep(Duration::ZERO).await;
        clock.sleep_until_utc(epoch()).await;
        let events = log.snapshot();
        assert_eq!(events.len(), 4);
        assert!(matches!(events[0], ClockEvent::NowUtc { .. }));
        assert!(matches!(events[1], ClockEvent::MonotonicMs { .. }));
        assert!(matches!(events[2], ClockEvent::Sleep { duration_ms: 0 }));
        assert!(matches!(events[3], ClockEvent::SleepUntil { .. }));
    }

    /// `now_wall_ms` agrees with converting the clock's wall time by hand.
    #[tokio::test]
    async fn now_wall_ms_helper_matches_clock_observation() {
        let clock = PausedClock::new(epoch());
        let observed = now_wall_ms(clock.as_ref());
        let expected = epoch().unix_timestamp_nanos() / 1_000_000;
        assert_eq!(observed, expected as i64);
    }
}