Skip to main content

atomr_core/
time.rs

1//! Logical-clock abstraction shared across the substrate.
2//!
3//! Time-based behaviour in atomr (stream throttling, replay gating, rate
4//! limiting) historically reached for `tokio::time` directly, which couples
5//! correctness to the wall clock. That is fatal for deterministic replay and
6//! point-in-time backtests, where consumers must not observe data "ahead" of a
7//! simulation watermark regardless of async latency.
8//!
9//! This module introduces a pluggable [`Clock`] so production code runs on
10//! [`SystemClock`] while replay / backtest harnesses drive a [`ManualClock`]
11//! they advance explicitly. It is the foundation for the clock-gated stream
12//! source (FR-2), the token-bucket rate limiter (FR-3), and record-and-replay
13//! determinism (FR-13).
14
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18
/// A monotonic logical instant, stored as a nanosecond count from an
/// arbitrary epoch.
///
/// [`SystemClock`] counts from the Unix epoch, whereas a [`ManualClock`]
/// counts from whatever origin its harness chooses. Comparing or subtracting
/// values is therefore only meaningful between clocks of the same kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct LogicalTime(pub u64);

impl LogicalTime {
    /// Number of nanoseconds in one millisecond.
    const NANOS_PER_MILLI: u64 = 1_000_000;

    /// The instant at nanosecond zero.
    pub const ZERO: Self = Self(0);

    /// Wraps a raw nanosecond count.
    pub const fn from_nanos(nanos: u64) -> Self {
        Self(nanos)
    }

    /// Converts a millisecond count to nanoseconds, saturating at `u64::MAX`
    /// instead of overflowing.
    pub const fn from_millis(millis: u64) -> Self {
        Self::from_nanos(millis.saturating_mul(Self::NANOS_PER_MILLI))
    }

    /// Returns the underlying nanosecond count.
    pub const fn as_nanos(self) -> u64 {
        let Self(nanos) = self;
        nanos
    }
}
46
47/// A source of [`LogicalTime`].
48///
49/// Implementations must be cheap to call and thread-safe; emission/gating logic
50/// may poll `now()` frequently.
51pub trait Clock: Send + Sync {
52    /// The current logical instant.
53    fn now(&self) -> LogicalTime;
54}
55
56impl<C: Clock + ?Sized> Clock for Arc<C> {
57    fn now(&self) -> LogicalTime {
58        (**self).now()
59    }
60}
61
62/// Wall-clock [`Clock`] backed by `SystemTime`.
63#[derive(Debug, Clone, Copy, Default)]
64pub struct SystemClock;
65
66impl Clock for SystemClock {
67    fn now(&self) -> LogicalTime {
68        let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos() as u64).unwrap_or(0);
69        LogicalTime(nanos)
70    }
71}
72
73/// An explicitly-advanced logical clock for deterministic replay and backtests.
74///
75/// The watermark only ever moves forward: [`advance_to`](Self::advance_to)
76/// ignores regressions, guaranteeing monotonicity even under concurrent
77/// advances. Clones share the same underlying watermark, so a harness can hold
78/// one handle while a stream operator holds another.
79#[derive(Debug, Clone, Default)]
80pub struct ManualClock {
81    watermark: Arc<AtomicU64>,
82}
83
84impl ManualClock {
85    /// A new clock at [`LogicalTime::ZERO`].
86    pub fn new() -> Self {
87        Self { watermark: Arc::new(AtomicU64::new(0)) }
88    }
89
90    /// A new clock starting at `start`.
91    pub fn at(start: LogicalTime) -> Self {
92        Self { watermark: Arc::new(AtomicU64::new(start.0)) }
93    }
94
95    /// Advance the watermark to `target`. Monotonic — a `target` at or below the
96    /// current watermark is a no-op.
97    pub fn advance_to(&self, target: LogicalTime) {
98        self.watermark.fetch_max(target.0, Ordering::SeqCst);
99    }
100
101    /// Advance the watermark by `delta`.
102    pub fn advance_by(&self, delta: LogicalTime) {
103        self.watermark.fetch_add(delta.0, Ordering::SeqCst);
104    }
105
106    /// The current watermark.
107    pub fn watermark(&self) -> LogicalTime {
108        LogicalTime(self.watermark.load(Ordering::SeqCst))
109    }
110}
111
112impl Clock for ManualClock {
113    fn now(&self) -> LogicalTime {
114        self.watermark()
115    }
116}
117
#[cfg(test)]
mod tests {
    use super::*;

    /// Forward advances move the watermark; a backwards target is ignored.
    #[test]
    fn manual_clock_is_monotonic() {
        let clock = ManualClock::new();
        assert_eq!(clock.watermark(), LogicalTime::ZERO);

        let hundred_ms = LogicalTime::from_millis(100);
        clock.advance_to(hundred_ms);
        assert_eq!(clock.now(), hundred_ms);

        // A target behind the watermark must leave it untouched.
        clock.advance_to(LogicalTime::from_millis(50));
        assert_eq!(clock.now(), hundred_ms);

        clock.advance_by(LogicalTime::from_millis(25));
        assert_eq!(clock.now(), LogicalTime::from_millis(125));
    }

    /// An advance made through one handle is visible through its clone.
    #[test]
    fn clones_share_watermark() {
        let original = ManualClock::new();
        let copy = original.clone();
        let target = LogicalTime::from_nanos(42);

        original.advance_to(target);

        assert_eq!(copy.now(), target);
    }

    /// The wall clock reports a non-zero reading.
    #[test]
    fn system_clock_advances() {
        let reading = SystemClock.now();
        assert!(reading.as_nanos() > 0);
    }

    /// A type-erased `Arc<dyn Clock>` forwards to the clock it wraps.
    #[test]
    fn arc_dyn_clock_dispatches() {
        let start = LogicalTime::from_nanos(7);
        let erased: Arc<dyn Clock> = Arc::new(ManualClock::at(start));
        assert_eq!(erased.now(), start);
    }
}