1use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
25pub struct LogicalTime(pub u64);
26
27impl LogicalTime {
28 pub const ZERO: LogicalTime = LogicalTime(0);
30
31 pub const fn from_nanos(nanos: u64) -> Self {
33 LogicalTime(nanos)
34 }
35
36 pub const fn from_millis(millis: u64) -> Self {
38 LogicalTime(millis.saturating_mul(1_000_000))
39 }
40
41 pub const fn as_nanos(self) -> u64 {
43 self.0
44 }
45}
46
47pub trait Clock: Send + Sync {
52 fn now(&self) -> LogicalTime;
54}
55
56impl<C: Clock + ?Sized> Clock for Arc<C> {
57 fn now(&self) -> LogicalTime {
58 (**self).now()
59 }
60}
61
62#[derive(Debug, Clone, Copy, Default)]
64pub struct SystemClock;
65
66impl Clock for SystemClock {
67 fn now(&self) -> LogicalTime {
68 let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos() as u64).unwrap_or(0);
69 LogicalTime(nanos)
70 }
71}
72
73#[derive(Debug, Clone, Default)]
80pub struct ManualClock {
81 watermark: Arc<AtomicU64>,
82}
83
84impl ManualClock {
85 pub fn new() -> Self {
87 Self { watermark: Arc::new(AtomicU64::new(0)) }
88 }
89
90 pub fn at(start: LogicalTime) -> Self {
92 Self { watermark: Arc::new(AtomicU64::new(start.0)) }
93 }
94
95 pub fn advance_to(&self, target: LogicalTime) {
98 self.watermark.fetch_max(target.0, Ordering::SeqCst);
99 }
100
101 pub fn advance_by(&self, delta: LogicalTime) {
103 self.watermark.fetch_add(delta.0, Ordering::SeqCst);
104 }
105
106 pub fn watermark(&self) -> LogicalTime {
108 LogicalTime(self.watermark.load(Ordering::SeqCst))
109 }
110}
111
112impl Clock for ManualClock {
113 fn now(&self) -> LogicalTime {
114 self.watermark()
115 }
116}
117
118#[cfg(test)]
119mod tests {
120 use super::*;
121
122 #[test]
123 fn manual_clock_is_monotonic() {
124 let c = ManualClock::new();
125 assert_eq!(c.watermark(), LogicalTime::ZERO);
126 c.advance_to(LogicalTime::from_millis(100));
127 assert_eq!(c.now(), LogicalTime::from_millis(100));
128 c.advance_to(LogicalTime::from_millis(50));
130 assert_eq!(c.now(), LogicalTime::from_millis(100));
131 c.advance_by(LogicalTime::from_millis(25));
132 assert_eq!(c.now(), LogicalTime::from_millis(125));
133 }
134
135 #[test]
136 fn clones_share_watermark() {
137 let a = ManualClock::new();
138 let b = a.clone();
139 a.advance_to(LogicalTime::from_nanos(42));
140 assert_eq!(b.now(), LogicalTime::from_nanos(42));
141 }
142
143 #[test]
144 fn system_clock_advances() {
145 let c = SystemClock;
146 let t1 = c.now();
147 assert!(t1.as_nanos() > 0);
148 }
149
150 #[test]
151 fn arc_dyn_clock_dispatches() {
152 let c: Arc<dyn Clock> = Arc::new(ManualClock::at(LogicalTime::from_nanos(7)));
153 assert_eq!(c.now(), LogicalTime::from_nanos(7));
154 }
155}