// ash_time/hlc.rs

use std::{
    sync::{Mutex, MutexGuard},
    time::{Duration, SystemTime, UNIX_EPOCH},
};

// ---------------------------------------------------------------------------
// Error
// ---------------------------------------------------------------------------

/// Errors that can be returned by [`HlcClock`] operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HlcError {
    /// The system clock returned a time before the Unix epoch, or one too far
    /// after it to be represented as `u64` nanoseconds.
    SystemClockError,

    /// A received timestamp's physical component is too far ahead of the local
    /// wall clock, indicating a misconfigured or malicious peer.
    FutureDrift {
        /// Physical component of the rejected timestamp (ns since the epoch).
        received_physical_ns: u64,
        /// Local wall-clock reading when the check was made (ns since the epoch).
        local_physical_ns: u64,
        /// The configured maximum allowed drift (ns).
        max_drift_ns: u64,
    },

    /// Advancing the clock would push the logical counter past `u32::MAX`.
    LogicalOverflow,
}

impl std::fmt::Display for HlcError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::SystemClockError => {
                write!(f, "system clock error: time is before the Unix epoch")
            }
            Self::FutureDrift {
                received_physical_ns,
                local_physical_ns,
                max_drift_ns,
            } => {
                // The variant's fields are public, so a caller can construct a
                // value with `received < local`. Saturate instead of panicking
                // (debug) or wrapping to an enormous value (release).
                let ahead_ns = received_physical_ns.saturating_sub(*local_physical_ns);
                let ahead_ms = ahead_ns as f64 / 1_000_000.0;
                let max_ms = *max_drift_ns as f64 / 1_000_000.0;
                write!(
                    f,
                    "received timestamp is {ahead_ms:.3}ms ahead of local clock \
                     (max allowed drift: {max_ms:.3}ms)"
                )
            }
            Self::LogicalOverflow => write!(f, "logical counter overflow (u32::MAX exceeded)"),
        }
    }
}

impl std::error::Error for HlcError {}

// ---------------------------------------------------------------------------
// HlcTimestamp
// ---------------------------------------------------------------------------

/// A Hybrid Logical Clock timestamp.
///
/// Totally ordered: first by [`physical`](Self::physical) (nanoseconds since
/// Unix epoch), then by [`logical`](Self::logical) to break ties within the
/// same nanosecond.
///
/// The `Ord` / `PartialOrd` implementations are *consistent with* causality.
/// Suppose every message is stamped with [`HlcClock::now`] and every receipt
/// goes through [`HlcClock::recv`]. Then whenever event `e` causally precedes
/// event `f`, `ts(e) < ts(f)`. The converse does not hold: a smaller timestamp
/// may belong to an event that is concurrent with the larger one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct HlcTimestamp {
    /// Wall-clock component: nanoseconds since the Unix epoch.
    pub physical: u64,
    /// Logical counter: orders timestamps that share a physical component.
    pub logical: u32,
}

impl HlcTimestamp {
    /// Returns `true` if `self` is ordered strictly before `other`.
    ///
    /// Equivalent to `self < other`. This is a *necessary* condition for
    /// causal precedence, not a sufficient one:
    ///
    /// - If `self` causally happened before `other`, this returns `true`.
    /// - A `true` result does not by itself prove causality; the two events
    ///   may be concurrent.
    /// - A `false` result does prove that `self` did not causally precede
    ///   `other`.
    #[must_use]
    #[inline]
    pub fn happened_before(self, other: HlcTimestamp) -> bool {
        self < other
    }

    /// Returns `true` if `self` and `other` are identical.
    ///
    /// Equivalent to `self == other`. Causally related events always have
    /// strictly ordered timestamps, so identical timestamps must belong to
    /// concurrent events; a `true` result therefore proves concurrency. The
    /// converse does not hold: HLC cannot detect concurrency in general, and
    /// distinct timestamps may also belong to concurrent events.
    #[must_use]
    #[inline]
    pub fn concurrent_with(self, other: HlcTimestamp) -> bool {
        self == other
    }

    /// Serialise to a 12-byte big-endian representation suitable for
    /// embedding in RPC headers or log entries.
    ///
    /// Layout: bytes `0..8` hold `physical`, bytes `8..12` hold `logical`,
    /// both big-endian. The layout preserves the total order: a lexicographic
    /// comparison of the byte arrays gives the same result as `Ord`.
    #[must_use]
    #[inline]
    pub fn to_bytes(self) -> [u8; 12] {
        let mut buf = [0u8; 12];
        buf[..8].copy_from_slice(&self.physical.to_be_bytes());
        buf[8..].copy_from_slice(&self.logical.to_be_bytes());
        buf
    }

    /// Deserialise from bytes produced by [`to_bytes`](Self::to_bytes).
    ///
    /// Every 12-byte input decodes to some timestamp; no validation is done.
    #[must_use]
    #[inline]
    pub fn from_bytes(bytes: [u8; 12]) -> Self {
        let (physical, logical) = bytes.split_at(8);
        // `split_at(8)` on a 12-byte array yields exactly 8 + 4 bytes, so
        // both conversions below are infallible.
        Self {
            physical: u64::from_be_bytes(physical.try_into().expect("8-byte slice")),
            logical: u32::from_be_bytes(logical.try_into().expect("4-byte slice")),
        }
    }
}

impl PartialOrd for HlcTimestamp {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for HlcTimestamp {
    /// Lexicographic on `(physical, logical)`.
    #[inline]
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        (self.physical, self.logical).cmp(&(other.physical, other.logical))
    }
}

// ---------------------------------------------------------------------------
// HlcClock
// ---------------------------------------------------------------------------

139#[derive(Debug)]
140struct HlcState {
141    physical: u64, // max physical ns seen so far
142    logical: u32,
143}
144
145/// A thread-safe Hybrid Logical Clock.
146///
147/// Create one per process and share it across threads via `Arc`. Every call to
148/// [`now`](Self::now) produces a timestamp strictly greater than the last, and
149/// every call to [`recv`](Self::recv) advances the clock past an incoming
150/// remote timestamp — guaranteeing that your local clock always reflects
151/// what it has "seen".
152#[derive(Debug)]
153pub struct HlcClock {
154    state: Mutex<HlcState>,
155    max_drift_ns: u64,
156}
157
158impl HlcClock {
159    /// Create a new clock with the default maximum drift (500 ms).
160    pub fn new() -> Self {
161        Self::with_max_drift(Duration::from_millis(500))
162    }
163
164    /// Create a new clock with a custom maximum drift.
165    ///
166    /// Received timestamps whose physical component is more than `max_drift`
167    /// ahead of the local wall clock will be rejected with
168    /// [`HlcError::FutureDrift`].
169    pub fn with_max_drift(max_drift: Duration) -> Self {
170        Self {
171            state: Mutex::new(HlcState {
172                physical: 0,
173                logical: 0,
174            }),
175            max_drift_ns: u64::try_from(max_drift.as_nanos()).unwrap_or(u64::MAX),
176        }
177    }
178
179    /// Generate a new timestamp for a local event or an outgoing message.
180    ///
181    /// The returned timestamp is strictly greater than every timestamp
182    /// previously produced by this clock instance.
183    pub fn now(&self) -> Result<HlcTimestamp, HlcError> {
184        let pt = physical_now()?;
185        let mut s = self.state.lock().expect("HLC state lock poisoned");
186
187        let new_pt = pt.max(s.physical);
188        let new_l = if new_pt == s.physical {
189            s.logical.checked_add(1).ok_or(HlcError::LogicalOverflow)?
190        } else {
191            0
192        };
193
194        s.physical = new_pt;
195        s.logical = new_l;
196
197        Ok(HlcTimestamp {
198            physical: new_pt,
199            logical: new_l,
200        })
201    }
202
203    /// Advance the clock upon receiving a message stamped with `msg`.
204    ///
205    /// Returns the updated local timestamp. Attach this to any reply, or use
206    /// it as the lower-bound for a consistent snapshot read.
207    ///
208    /// # Errors
209    ///
210    /// Returns [`HlcError::FutureDrift`] if `msg.physical` is more than
211    /// `max_drift` ahead of the local wall clock.
212    pub fn recv(&self, msg: HlcTimestamp) -> Result<HlcTimestamp, HlcError> {
213        let pt = physical_now()?;
214
215        if msg.physical > pt.saturating_add(self.max_drift_ns) {
216            return Err(HlcError::FutureDrift {
217                received_physical_ns: msg.physical,
218                local_physical_ns: pt,
219                max_drift_ns: self.max_drift_ns,
220            });
221        }
222
223        let mut s = self.state.lock().expect("HLC state lock poisoned");
224
225        let new_pt = pt.max(s.physical).max(msg.physical);
226
227        let new_l = match (new_pt == s.physical, new_pt == msg.physical) {
228            (true, true) => s
229                .logical
230                .max(msg.logical)
231                .checked_add(1)
232                .ok_or(HlcError::LogicalOverflow)?,
233            (true, false) => s.logical.checked_add(1).ok_or(HlcError::LogicalOverflow)?,
234            (false, true) => msg
235                .logical
236                .checked_add(1)
237                .ok_or(HlcError::LogicalOverflow)?,
238            (false, false) => 0, // wall clock advanced past both
239        };
240
241        s.physical = new_pt;
242        s.logical = new_l;
243
244        Ok(HlcTimestamp {
245            physical: new_pt,
246            logical: new_l,
247        })
248    }
249
250    /// Return the last recorded timestamp without advancing the clock.
251    #[must_use]
252    pub fn last(&self) -> HlcTimestamp {
253        let s = self.state.lock().expect("HLC state lock poisoned");
254        HlcTimestamp {
255            physical: s.physical,
256            logical: s.logical,
257        }
258    }
259}
260
261impl Default for HlcClock {
262    fn default() -> Self {
263        Self::new()
264    }
265}

// ---------------------------------------------------------------------------
// Internal helpers
// ---------------------------------------------------------------------------

271pub(crate) fn physical_now() -> Result<u64, HlcError> {
272    SystemTime::now()
273        .duration_since(UNIX_EPOCH)
274        .map_err(|_| HlcError::SystemClockError)
275        .and_then(|d| u64::try_from(d.as_nanos()).map_err(|_| HlcError::SystemClockError))
276}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    // --- HlcTimestamp -------------------------------------------------------

    #[test]
    fn timestamp_ordering() {
        let a = HlcTimestamp {
            physical: 100,
            logical: 0,
        };
        let b = HlcTimestamp {
            physical: 100,
            logical: 1,
        };
        let c = HlcTimestamp {
            physical: 200,
            logical: 0,
        };

        assert!(a < b);
        assert!(b < c);
        assert!(a < c);
        assert!(a.happened_before(b));
        assert!(b.happened_before(c));
    }

    #[test]
    fn timestamp_concurrent() {
        let a = HlcTimestamp {
            physical: 42,
            logical: 7,
        };
        assert!(a.concurrent_with(a));
        let b = HlcTimestamp {
            physical: 42,
            logical: 8,
        };
        assert!(!a.concurrent_with(b));
    }

    #[test]
    fn timestamp_bytes_roundtrip() {
        let ts = HlcTimestamp {
            physical: 1_700_000_000_000_000_000,
            logical: 12345,
        };
        let restored = HlcTimestamp::from_bytes(ts.to_bytes());
        assert_eq!(ts, restored);
    }

    #[test]
    fn timestamp_bytes_roundtrip_extremes() {
        for ts in [
            HlcTimestamp {
                physical: 0,
                logical: 0,
            },
            HlcTimestamp {
                physical: u64::MAX,
                logical: u32::MAX,
            },
        ] {
            assert_eq!(HlcTimestamp::from_bytes(ts.to_bytes()), ts);
        }
    }

    #[test]
    fn bytes_layout_is_big_endian() {
        let ts = HlcTimestamp {
            physical: 0x0102_0304_0506_0708,
            logical: 0x090A_0B0C,
        };
        assert_eq!(ts.to_bytes(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]);
    }

    #[test]
    fn bytes_preserve_order() {
        let a = HlcTimestamp {
            physical: 100,
            logical: 5,
        };
        let b = HlcTimestamp {
            physical: 100,
            logical: 6,
        };
        assert!(a.to_bytes() < b.to_bytes());

        let c = HlcTimestamp {
            physical: 99,
            logical: 999,
        };
        let d = HlcTimestamp {
            physical: 100,
            logical: 0,
        };
        assert!(c.to_bytes() < d.to_bytes());
    }

    // --- HlcError -----------------------------------------------------------

    #[test]
    fn future_drift_display() {
        let e = HlcError::FutureDrift {
            received_physical_ns: 1_500_000,
            local_physical_ns: 500_000,
            max_drift_ns: 100_000,
        };
        assert_eq!(
            e.to_string(),
            "received timestamp is 1.000ms ahead of local clock (max allowed drift: 0.100ms)"
        );
    }

    // --- HlcClock -----------------------------------------------------------

    #[test]
    fn now_is_monotonic() {
        let clock = HlcClock::new();
        let mut prev = clock.now().unwrap();
        for _ in 0..1000 {
            let next = clock.now().unwrap();
            assert!(
                prev < next,
                "now() must be strictly monotonic: {prev:?} >= {next:?}"
            );
            prev = next;
        }
    }

    #[test]
    fn fresh_clock_last_is_zero() {
        assert_eq!(
            HlcClock::new().last(),
            HlcTimestamp {
                physical: 0,
                logical: 0,
            }
        );
    }

    #[test]
    fn recv_advances_past_message() {
        let sender = HlcClock::new();
        let receiver = HlcClock::new();

        let send_ts = sender.now().unwrap();
        let recv_ts = receiver.recv(send_ts).unwrap();

        assert!(
            send_ts.happened_before(recv_ts),
            "receive timestamp must be after the send timestamp"
        );
    }

    #[test]
    fn recv_from_peer_ahead_adopts_peer_time() {
        // The peer is 5 s ahead, within the 60 s drift bound. Our clock must
        // jump to the peer's physical time and continue its logical counter.
        let clock = HlcClock::with_max_drift(Duration::from_secs(60));
        let msg = HlcTimestamp {
            physical: physical_now().unwrap() + 5_000_000_000,
            logical: 5,
        };

        let r = clock.recv(msg).unwrap();
        assert_eq!(
            r,
            HlcTimestamp {
                physical: msg.physical,
                logical: 6,
            }
        );

        // The local wall clock is still behind the peer, so `now` ticks logically.
        let n = clock.now().unwrap();
        assert_eq!(
            n,
            HlcTimestamp {
                physical: msg.physical,
                logical: 7,
            }
        );

        // Same physical component on both sides: larger counter plus one.
        let r2 = clock
            .recv(HlcTimestamp {
                physical: msg.physical,
                logical: 10,
            })
            .unwrap();
        assert_eq!(
            r2,
            HlcTimestamp {
                physical: msg.physical,
                logical: 11,
            }
        );
        assert_eq!(clock.last(), r2);
    }

    #[test]
    fn recv_from_peer_behind_keeps_local_time() {
        let clock = HlcClock::new();
        let before = clock.now().unwrap();
        let stale = HlcTimestamp {
            physical: 1,
            logical: 99,
        };
        let r = clock.recv(stale).unwrap();
        assert!(before < r);
        assert!(stale < r);
    }

    #[test]
    fn recv_preserves_causality_chain() {
        let a = HlcClock::new();
        let b = HlcClock::new();
        let c = HlcClock::new();

        let ts_a = a.now().unwrap();
        let ts_b = b.recv(ts_a).unwrap(); // b receives from a
        let ts_c = c.recv(ts_b).unwrap(); // c receives from b

        assert!(ts_a.happened_before(ts_b));
        assert!(ts_b.happened_before(ts_c));
        assert!(ts_a.happened_before(ts_c)); // transitivity
    }

    #[test]
    fn recv_rejects_future_drift() {
        let clock = HlcClock::with_max_drift(Duration::from_millis(100));
        let far_future = HlcTimestamp {
            physical: physical_now().unwrap() + 10_000_000_000, // 10 s ahead
            logical: 0,
        };
        assert!(matches!(
            clock.recv(far_future),
            Err(HlcError::FutureDrift { .. })
        ));
        // A rejected message must not move the clock.
        assert_eq!(
            clock.last(),
            HlcTimestamp {
                physical: 0,
                logical: 0,
            }
        );
    }

    #[test]
    fn huge_max_drift_is_clamped_and_disables_check() {
        let clock = HlcClock::with_max_drift(Duration::MAX);
        let msg = HlcTimestamp {
            physical: u64::MAX - 1,
            logical: 0,
        };
        assert_eq!(
            clock.recv(msg).unwrap(),
            HlcTimestamp {
                physical: u64::MAX - 1,
                logical: 1,
            }
        );
    }

    #[test]
    fn last_does_not_advance_clock() {
        let clock = HlcClock::new();
        let ts1 = clock.now().unwrap();
        let last = clock.last();
        let ts2 = clock.now().unwrap();

        assert_eq!(last, ts1);
        assert!(ts1 < ts2);
    }

    #[test]
    fn snapshot_isolation_lower_bound() {
        // Simulate a writer stamping a write. A reader must observe it if the
        // reader's snapshot timestamp is >= the write timestamp.
        let writer = HlcClock::new();
        let reader = HlcClock::new();

        let write_ts = writer.now().unwrap();

        // The reader learns about the write (e.g. via a replication message).
        let snapshot_ts = reader.recv(write_ts).unwrap();

        // Any read at snapshot_ts will see the write because write_ts is
        // ordered before snapshot_ts.
        assert!(write_ts.happened_before(snapshot_ts));
    }
}