1use std::sync::atomic::{AtomicI64, Ordering};
2use std::time::{SystemTime, UNIX_EPOCH};
3
/// A hybrid logical clock timestamp: a physical wall-clock reading in
/// nanoseconds since the UNIX epoch, paired with a logical counter that
/// orders events sharing the same wall-clock reading.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct HlcTimestamp {
    wall_time: i64,
    logical: i32,
}

/// Size in bytes of the encoding produced by [`HlcTimestamp::to_bytes`]:
/// 8 bytes of wall time followed by 4 bytes of logical counter.
pub const HLC_TIMESTAMP_SIZE: usize = 12;

impl HlcTimestamp {
    /// The smallest possible timestamp (epoch, logical counter 0).
    pub const ZERO: Self = Self {
        wall_time: 0,
        logical: 0,
    };

    /// Builds a timestamp from its two components.
    #[inline]
    pub fn new(wall_time_ns: i64, logical: i32) -> Self {
        Self {
            wall_time: wall_time_ns,
            logical,
        }
    }

    /// Physical component, in nanoseconds since the UNIX epoch.
    #[inline]
    pub fn wall_time(&self) -> i64 {
        self.wall_time
    }

    /// Logical counter component.
    #[inline]
    pub fn logical(&self) -> i32 {
        self.logical
    }

    /// Serializes to a fixed-size big-endian byte array: wall time first,
    /// then the logical counter.
    #[inline]
    pub fn to_bytes(&self) -> [u8; HLC_TIMESTAMP_SIZE] {
        let wall = self.wall_time.to_be_bytes();
        let logical = self.logical.to_be_bytes();
        let mut out = [0u8; HLC_TIMESTAMP_SIZE];
        out[..8].copy_from_slice(&wall);
        out[8..].copy_from_slice(&logical);
        out
    }

    /// Inverse of [`Self::to_bytes`].
    #[inline]
    pub fn from_bytes(b: &[u8; HLC_TIMESTAMP_SIZE]) -> Self {
        let (wall, logical) = b.split_at(8);
        Self {
            wall_time: i64::from_be_bytes(wall.try_into().unwrap()),
            logical: i32::from_be_bytes(logical.try_into().unwrap()),
        }
    }

    /// True for the all-zero timestamp ([`Self::ZERO`]).
    #[inline]
    pub fn is_zero(&self) -> bool {
        *self == Self::ZERO
    }
}

impl PartialOrd for HlcTimestamp {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for HlcTimestamp {
    /// Wall time dominates; the logical counter breaks ties.
    #[inline]
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        (self.wall_time, self.logical).cmp(&(other.wall_time, other.logical))
    }
}

impl std::fmt::Debug for HlcTimestamp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "HLC({}ns:{})", self.wall_time, self.logical)
    }
}

impl std::fmt::Display for HlcTimestamp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}:{}", self.wall_time, self.logical)
    }
}
98
/// Errors produced by [`HlcClock`] operations.
#[derive(Debug, thiserror::Error)]
pub enum ClockError {
    /// A wall time (typically from a remote node) ran ahead of the local
    /// physical clock by more than the configured maximum drift.
    #[error(
        "clock drift exceeded: remote_wall_time={remote_ns}ns, \
        physical_now={physical_ns}ns, max_drift={max_drift_ns}ns"
    )]
    ClockDriftExceeded {
        // Offending wall time, ns since UNIX epoch.
        remote_ns: i64,
        // Local physical clock reading at the time of the check, ns.
        physical_ns: i64,
        // Configured drift bound that was exceeded, ns.
        max_drift_ns: i64,
    },

    /// The logical counter would exceed `i32::MAX` — more than 2^31-1
    /// events were issued without physical time advancing.
    #[error("HLC counter overflow (>2^31-1 events in same nanosecond)")]
    CounterOverflow,
}
114
/// Source of physical time for an [`HlcClock`]. `Send` is required so a
/// clock can be moved across threads.
pub trait PhysicalClock: Send {
    /// Current physical time, in nanoseconds since the UNIX epoch.
    fn now_ns(&self) -> i64;
}

/// [`PhysicalClock`] backed by the operating-system clock.
pub struct SystemClock;

impl PhysicalClock for SystemClock {
    fn now_ns(&self) -> i64 {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock before UNIX epoch");
        // Truncating u128 -> i64 cast; i64 nanoseconds cover dates far
        // beyond any realistic system-clock reading.
        since_epoch.as_nanos() as i64
    }
}

/// Deterministic [`PhysicalClock`] for tests: time moves only when told to.
pub struct ManualClock {
    time_ns: AtomicI64,
}

impl ManualClock {
    /// Creates a clock frozen at `initial_ns`.
    pub fn new(initial_ns: i64) -> Self {
        Self {
            time_ns: AtomicI64::from(initial_ns),
        }
    }

    /// Jumps the clock to an absolute time.
    pub fn set(&self, time_ns: i64) {
        self.time_ns.store(time_ns, Ordering::SeqCst);
    }

    /// Moves the clock by a (possibly negative) delta.
    pub fn advance(&self, delta_ns: i64) {
        self.time_ns.fetch_add(delta_ns, Ordering::SeqCst);
    }
}

impl PhysicalClock for ManualClock {
    fn now_ns(&self) -> i64 {
        self.time_ns.load(Ordering::SeqCst)
    }
}
164
// Nanoseconds per second.
const SECOND_NS: i64 = 1_000_000_000;
// Default bound on how far a wall time may run ahead of the local physical
// clock before it is rejected as drift (see `HlcClock::check_drift`).
const DEFAULT_MAX_DRIFT_NS: i64 = 5 * SECOND_NS;
167
/// A hybrid logical clock: combines a physical clock `C` with a logical
/// counter so that issued [`HlcTimestamp`]s never move backwards, even if
/// the physical clock stalls or steps back.
pub struct HlcClock<C: PhysicalClock = SystemClock> {
    // Most recent timestamp issued by `now` or merged via `update`.
    last: HlcTimestamp,
    // Maximum tolerated lead of a wall time over the physical clock, in ns.
    max_drift_ns: i64,
    // Underlying physical time source.
    clock: C,
}
183
184impl Default for HlcClock<SystemClock> {
185 fn default() -> Self {
186 Self::new()
187 }
188}
189
190impl HlcClock<SystemClock> {
191 pub fn new() -> Self {
192 Self {
193 last: HlcTimestamp::ZERO,
194 max_drift_ns: DEFAULT_MAX_DRIFT_NS,
195 clock: SystemClock,
196 }
197 }
198}
199
200impl<C: PhysicalClock> HlcClock<C> {
201 pub fn with_clock(clock: C) -> Self {
202 Self {
203 last: HlcTimestamp::ZERO,
204 max_drift_ns: DEFAULT_MAX_DRIFT_NS,
205 clock,
206 }
207 }
208
209 pub fn set_max_drift_ns(&mut self, max_drift_ns: i64) {
210 self.max_drift_ns = max_drift_ns;
211 }
212
213 pub fn set_last(&mut self, ts: HlcTimestamp) {
215 self.last = ts;
216 }
217
218 pub fn now(&mut self) -> Result<HlcTimestamp, ClockError> {
220 let pt = self.clock.now_ns();
221
222 let ts = if self.last.wall_time >= pt {
223 let new_logical = self
226 .last
227 .logical
228 .checked_add(1)
229 .ok_or(ClockError::CounterOverflow)?;
230
231 self.check_drift(self.last.wall_time, pt)?;
232
233 HlcTimestamp::new(self.last.wall_time, new_logical)
234 } else {
235 HlcTimestamp::new(pt, 0)
237 };
238
239 self.last = ts;
240 Ok(ts)
241 }
242
243 pub fn update(&mut self, remote: HlcTimestamp) -> Result<(), ClockError> {
245 let pt = self.clock.now_ns();
246
247 if remote.wall_time.saturating_sub(pt) > self.max_drift_ns {
249 return Err(ClockError::ClockDriftExceeded {
250 remote_ns: remote.wall_time,
251 physical_ns: pt,
252 max_drift_ns: self.max_drift_ns,
253 });
254 }
255
256 if remote.wall_time > self.last.wall_time {
257 self.last = remote;
259 } else if remote.wall_time == self.last.wall_time {
260 if remote.logical > self.last.logical {
262 self.last = HlcTimestamp::new(self.last.wall_time, remote.logical);
263 }
264 }
265 Ok(())
268 }
269
270 pub fn last_timestamp(&self) -> HlcTimestamp {
271 self.last
272 }
273
274 pub fn physical_clock(&self) -> &C {
275 &self.clock
276 }
277
278 fn check_drift(&self, wall_time_ns: i64, physical_ns: i64) -> Result<(), ClockError> {
279 if wall_time_ns.saturating_sub(physical_ns) > self.max_drift_ns {
280 return Err(ClockError::ClockDriftExceeded {
281 remote_ns: wall_time_ns,
282 physical_ns,
283 max_drift_ns: self.max_drift_ns,
284 });
285 }
286 Ok(())
287 }
288}
289
// Unit tests live in the sibling file `hlc_tests.rs`; compiled only under
// `cargo test`.
#[cfg(test)]
#[path = "hlc_tests.rs"]
mod tests;