1use std::{
2 sync::Mutex,
3 time::{Duration, SystemTime, UNIX_EPOCH},
4};
5
/// Errors produced by the hybrid logical clock.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HlcError {
    /// The system clock reported a time before the Unix epoch (or one that
    /// does not fit in a `u64` nanosecond count).
    SystemClockError,

    /// A received timestamp is further ahead of the local physical clock
    /// than the configured maximum drift allows.
    FutureDrift {
        /// Physical component (ns since epoch) of the rejected timestamp.
        received_physical_ns: u64,
        /// Local physical time (ns since epoch) at the moment of receipt.
        local_physical_ns: u64,
        /// Maximum tolerated forward drift, in nanoseconds.
        max_drift_ns: u64,
    },

    /// The logical counter would exceed `u32::MAX`.
    LogicalOverflow,
}

impl std::fmt::Display for HlcError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::SystemClockError => {
                write!(f, "system clock error: time is before the Unix epoch")
            }
            Self::FutureDrift {
                received_physical_ns,
                local_physical_ns,
                max_drift_ns,
            } => {
                // The fields are public, so callers may construct a
                // `FutureDrift` with `received < local`; `saturating_sub`
                // keeps `Display` panic-free (a plain `-` overflows in
                // debug builds).
                let ahead_ms =
                    received_physical_ns.saturating_sub(*local_physical_ns) as f64 / 1_000_000.0;
                let max_ms = *max_drift_ns as f64 / 1_000_000.0;
                write!(
                    f,
                    "received timestamp is {ahead_ms:.3}ms ahead of local clock \
                    (max allowed drift: {max_ms:.3}ms)"
                )
            }
            Self::LogicalOverflow => write!(f, "logical counter overflow (u32::MAX exceeded)"),
        }
    }
}

impl std::error::Error for HlcError {}
54
/// A hybrid logical clock timestamp: physical nanoseconds since the Unix
/// epoch, tie-broken by a logical counter.
///
/// Ordering is lexicographic on `(physical, logical)`, so `<`/`>` reflect
/// the happened-before relation within a single clock domain.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct HlcTimestamp {
    /// Physical component, in nanoseconds since the Unix epoch.
    pub physical: u64,
    /// Logical counter ordering events within the same physical tick.
    pub logical: u32,
}

impl HlcTimestamp {
    /// Returns `true` if `self` is ordered strictly before `other`.
    #[must_use]
    #[inline]
    pub fn happened_before(self, other: HlcTimestamp) -> bool {
        matches!(self.cmp(&other), std::cmp::Ordering::Less)
    }

    /// Returns `true` if the two timestamps are identical, i.e. neither is
    /// ordered before the other.
    #[must_use]
    #[inline]
    pub fn concurrent_with(self, other: HlcTimestamp) -> bool {
        matches!(self.cmp(&other), std::cmp::Ordering::Equal)
    }

    /// Encodes the timestamp as 12 big-endian bytes; the byte-wise order of
    /// encodings matches the timestamp order.
    #[must_use]
    #[inline]
    pub fn to_bytes(self) -> [u8; 12] {
        let p = self.physical.to_be_bytes();
        let l = self.logical.to_be_bytes();
        [
            p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7], l[0], l[1], l[2], l[3],
        ]
    }

    /// Decodes a timestamp previously produced by [`Self::to_bytes`].
    #[must_use]
    #[inline]
    pub fn from_bytes(bytes: [u8; 12]) -> Self {
        let mut p = [0u8; 8];
        let mut l = [0u8; 4];
        p.copy_from_slice(&bytes[..8]);
        l.copy_from_slice(&bytes[8..]);
        Self {
            physical: u64::from_be_bytes(p),
            logical: u32::from_be_bytes(l),
        }
    }
}

impl PartialOrd for HlcTimestamp {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Delegate to `Ord` so the two orderings can never disagree.
        Some(self.cmp(other))
    }
}

impl Ord for HlcTimestamp {
    #[inline]
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.physical
            .cmp(&other.physical)
            .then_with(|| self.logical.cmp(&other.logical))
    }
}
134
/// Mutex-guarded interior state: the last timestamp this clock issued.
#[derive(Debug)]
struct HlcState {
    /// Physical component of the most recently issued timestamp (ns).
    physical: u64,
    /// Logical counter of the most recently issued timestamp.
    logical: u32,
}
144
/// A thread-safe hybrid logical clock.
///
/// Issues strictly monotonic timestamps via `now`, merges remote
/// timestamps via `recv`, and rejects remote timestamps that are too far
/// ahead of the local physical clock.
#[derive(Debug)]
pub struct HlcClock {
    // Last timestamp handed out; the mutex makes now/recv/last safe to
    // call concurrently from multiple threads.
    state: Mutex<HlcState>,
    // Maximum tolerated forward drift of a received timestamp, in ns.
    max_drift_ns: u64,
}
157
158impl HlcClock {
159 pub fn new() -> Self {
161 Self::with_max_drift(Duration::from_millis(500))
162 }
163
164 pub fn with_max_drift(max_drift: Duration) -> Self {
170 Self {
171 state: Mutex::new(HlcState {
172 physical: 0,
173 logical: 0,
174 }),
175 max_drift_ns: u64::try_from(max_drift.as_nanos()).unwrap_or(u64::MAX),
176 }
177 }
178
179 pub fn now(&self) -> Result<HlcTimestamp, HlcError> {
184 let pt = physical_now()?;
185 let mut s = self.state.lock().expect("HLC state lock poisoned");
186
187 let new_pt = pt.max(s.physical);
188 let new_l = if new_pt == s.physical {
189 s.logical.checked_add(1).ok_or(HlcError::LogicalOverflow)?
190 } else {
191 0
192 };
193
194 s.physical = new_pt;
195 s.logical = new_l;
196
197 Ok(HlcTimestamp {
198 physical: new_pt,
199 logical: new_l,
200 })
201 }
202
203 pub fn recv(&self, msg: HlcTimestamp) -> Result<HlcTimestamp, HlcError> {
213 let pt = physical_now()?;
214
215 if msg.physical > pt.saturating_add(self.max_drift_ns) {
216 return Err(HlcError::FutureDrift {
217 received_physical_ns: msg.physical,
218 local_physical_ns: pt,
219 max_drift_ns: self.max_drift_ns,
220 });
221 }
222
223 let mut s = self.state.lock().expect("HLC state lock poisoned");
224
225 let new_pt = pt.max(s.physical).max(msg.physical);
226
227 let new_l = match (new_pt == s.physical, new_pt == msg.physical) {
228 (true, true) => s
229 .logical
230 .max(msg.logical)
231 .checked_add(1)
232 .ok_or(HlcError::LogicalOverflow)?,
233 (true, false) => s.logical.checked_add(1).ok_or(HlcError::LogicalOverflow)?,
234 (false, true) => msg
235 .logical
236 .checked_add(1)
237 .ok_or(HlcError::LogicalOverflow)?,
238 (false, false) => 0, };
240
241 s.physical = new_pt;
242 s.logical = new_l;
243
244 Ok(HlcTimestamp {
245 physical: new_pt,
246 logical: new_l,
247 })
248 }
249
250 #[must_use]
252 pub fn last(&self) -> HlcTimestamp {
253 let s = self.state.lock().expect("HLC state lock poisoned");
254 HlcTimestamp {
255 physical: s.physical,
256 logical: s.logical,
257 }
258 }
259}
260
impl Default for HlcClock {
    /// Equivalent to `HlcClock::new()`: 500 ms maximum drift.
    fn default() -> Self {
        Self::new()
    }
}
266
267pub(crate) fn physical_now() -> Result<u64, HlcError> {
272 SystemTime::now()
273 .duration_since(UNIX_EPOCH)
274 .map_err(|_| HlcError::SystemClockError)
275 .and_then(|d| u64::try_from(d.as_nanos()).map_err(|_| HlcError::SystemClockError))
276}
277
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand fixture constructor.
    fn ts(physical: u64, logical: u32) -> HlcTimestamp {
        HlcTimestamp { physical, logical }
    }

    #[test]
    fn timestamp_ordering() {
        let (a, b, c) = (ts(100, 0), ts(100, 1), ts(200, 0));
        assert!(a < b);
        assert!(b < c);
        assert!(a < c);
        assert!(a.happened_before(b));
        assert!(b.happened_before(c));
    }

    #[test]
    fn timestamp_concurrent() {
        let a = ts(42, 7);
        assert!(a.concurrent_with(a));
        assert!(!a.concurrent_with(ts(42, 8)));
    }

    #[test]
    fn timestamp_bytes_roundtrip() {
        let original = ts(1_700_000_000_000_000_000, 12345);
        assert_eq!(HlcTimestamp::from_bytes(original.to_bytes()), original);
    }

    #[test]
    fn bytes_preserve_order() {
        // Same physical tick: the logical counter decides.
        assert!(ts(100, 5).to_bytes() < ts(100, 6).to_bytes());
        // Physical dominates even against a large logical counter.
        assert!(ts(99, 999).to_bytes() < ts(100, 0).to_bytes());
    }

    #[test]
    fn now_is_monotonic() {
        let clock = HlcClock::new();
        let mut prev = clock.now().unwrap();
        for _ in 0..1000 {
            let next = clock.now().unwrap();
            assert!(
                prev < next,
                "now() must be strictly monotonic: {prev:?} >= {next:?}"
            );
            prev = next;
        }
    }

    #[test]
    fn recv_advances_past_message() {
        let sender = HlcClock::new();
        let receiver = HlcClock::new();

        let send_ts = sender.now().unwrap();
        let recv_ts = receiver.recv(send_ts).unwrap();

        assert!(
            send_ts.happened_before(recv_ts),
            "receive timestamp must be after the send timestamp"
        );
    }

    #[test]
    fn recv_preserves_causality_chain() {
        let (a, b, c) = (HlcClock::new(), HlcClock::new(), HlcClock::new());

        let ts_a = a.now().unwrap();
        let ts_b = b.recv(ts_a).unwrap();
        let ts_c = c.recv(ts_b).unwrap();

        assert!(ts_a.happened_before(ts_b));
        assert!(ts_b.happened_before(ts_c));
        assert!(ts_a.happened_before(ts_c));
    }

    #[test]
    fn recv_rejects_future_drift() {
        let clock = HlcClock::with_max_drift(Duration::from_millis(100));
        // 10 s ahead of the wall clock, far past the 100 ms budget.
        let far_future = ts(physical_now().unwrap() + 10_000_000_000, 0);
        assert!(matches!(
            clock.recv(far_future),
            Err(HlcError::FutureDrift { .. })
        ));
    }

    #[test]
    fn last_does_not_advance_clock() {
        let clock = HlcClock::new();
        let ts1 = clock.now().unwrap();
        let last = clock.last();
        let ts2 = clock.now().unwrap();

        assert_eq!(last, ts1);
        assert!(ts1 < ts2);
    }

    #[test]
    fn snapshot_isolation_lower_bound() {
        let writer = HlcClock::new();
        let reader = HlcClock::new();

        let write_ts = writer.now().unwrap();
        // The reader folds in the writer's timestamp; its snapshot must
        // land strictly after the write.
        let snapshot_ts = reader.recv(write_ts).unwrap();

        assert!(write_ts.happened_before(snapshot_ts));
    }
}