Skip to main content

yeti_types/
hlc.rs

1//! Hybrid Logical Clock (HLC) — replication timestamp primitive.
2//!
//! HLC is the foundational ordering primitive for yeti's multi-master
//! CRDT data plane. Every log entry is stamped with an HLC and applied
//! in arrival order (the last entry to arrive wins). The richer mode
//! (planned) stores per-record HLCs in an `__hlc_meta` column family and
//! switches the receive path to HLC-LWW for plain fields plus CRDT merge
//! for `@crdt`-marked fields.
8//!
9//! ## Why HLC instead of physical or logical alone
10//!
11//! - **Physical timestamps** (wall clock) drift between nodes and can
12//!   regress if the clock is adjusted. Two writes with the same wall-
13//!   clock millisecond can't be ordered.
14//! - **Logical clocks** (Lamport) preserve causal order but lose any
15//!   relationship to real time, so you can't say "the most recent
16//!   write" in a way humans understand.
17//! - **HLC** (Kulkarni et al. 2014) combines both: a physical
18//!   component bounded by wall-clock that advances monotonically, plus
19//!   a logical counter that breaks ties when physical time stalls or
20//!   regresses.
21//!
22//! ## Wire shape
23//!
24//! An `HlcTimestamp` is a `u64` packed:
25//! - High 48 bits: physical time, milliseconds since UNIX epoch
26//!   (sufficient for ~8900 years past 1970)
//! - Low 16 bits: logical counter (65,536 distinct values per physical
//!   millisecond; see `HybridLogicalClock::now` for what happens when
//!   the counter is exhausted)
30//!
31//! Total ordering on `HlcTimestamp` is the natural u64 ordering.
//! When timestamps from two different nodes collide, the tie is broken
//! by appending `node_id` at the call site (the HLC itself is
//! node-agnostic; the node-id tie-break is part of the replication
//! envelope, not the timestamp).
36//!
37//! ## Skew tolerance
38//!
39//! HLC-LWW assumes bounded clock skew between nodes. A peer whose
40//! wall clock is far ahead of ours could write timestamps that look
41//! arbitrarily-in-the-future from our perspective; if we accept them
42//! into our HLC state, subsequent local writes will be perpetually
43//! "behind" until our wall clock catches up. The skew tolerance
44//! caps how far in the future we accept incoming timestamps.
45//!
46//! Default 250ms (CockroachDB precedent). Configurable per-cluster
47//! via `replication.hlc_skew_tolerance_ms` in `yeti-config.yaml`.
48
49use std::sync::Mutex;
50use std::time::{SystemTime, UNIX_EPOCH};
51
52use serde::{Deserialize, Serialize};
53
/// Default HLC skew tolerance: how far (in milliseconds) a peer's
/// timestamp may run ahead of our local wall clock before it is rejected.
///
/// Override per cluster with `replication.hlc_skew_tolerance_ms` in
/// `yeti-config.yaml`. 250ms follows the `CockroachDB` precedent — small
/// enough to bound the damage a badly-skewed peer can do, large enough to
/// absorb ordinary NTP-disciplined drift.
pub const DEFAULT_SKEW_TOLERANCE_MS: u64 = 250;

/// Width of the physical (wall-clock milliseconds) field, stored in the
/// high bits of the packed word.
const PHYSICAL_BITS: u32 = 48;
/// Width of the logical counter field: whatever the physical field leaves
/// of the 64-bit word (16 bits).
const LOGICAL_BITS: u32 = u64::BITS - PHYSICAL_BITS;
/// Selects the logical counter (the low `LOGICAL_BITS` bits).
const LOGICAL_MASK: u64 = u64::MAX >> PHYSICAL_BITS;
/// Selects the physical component (the high `PHYSICAL_BITS` bits).
const PHYSICAL_MASK: u64 = u64::MAX << LOGICAL_BITS;
67
68/// A 64-bit hybrid logical clock timestamp.
69///
70/// Layout: `[physical_ms : 48][logical : 16]`. Total ordering is the
71/// natural `u64` ordering on the packed value. Tie-breaking across
72/// nodes (when two HLC values are equal) is the caller's concern —
73/// the replication envelope appends `node_id` as a secondary key.
74#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
75#[serde(transparent)]
76pub struct HlcTimestamp(u64);
77
78impl HlcTimestamp {
79    /// Zero / sentinel value. Less than any real timestamp; useful
80    /// for "no last-applied yet" cases.
81    pub const ZERO: Self = Self(0);
82
83    /// The maximum representable timestamp.
84    pub const MAX: Self = Self(u64::MAX);
85
86    /// Compose from physical milliseconds and logical counter.
87    ///
88    /// Panics if `physical_ms` overflows 48 bits (year ~10880) or
89    /// `logical` overflows 16 bits.
90    #[must_use]
91    pub const fn new(physical_ms: u64, logical: u16) -> Self {
92        assert!(
93            physical_ms < (1 << PHYSICAL_BITS),
94            "HLC physical component exceeds 48 bits"
95        );
96        Self((physical_ms << LOGICAL_BITS) | (logical as u64))
97    }
98
99    /// Physical milliseconds since UNIX epoch.
100    #[must_use]
101    pub const fn physical_ms(&self) -> u64 {
102        (self.0 & PHYSICAL_MASK) >> LOGICAL_BITS
103    }
104
105    /// Logical counter component.
106    #[must_use]
107    pub const fn logical(&self) -> u16 {
108        (self.0 & LOGICAL_MASK) as u16
109    }
110
111    /// Underlying packed `u64`. Useful for storage / wire formats
112    /// that want a single integer.
113    #[must_use]
114    pub const fn as_u64(&self) -> u64 {
115        self.0
116    }
117
118    /// Reconstruct from the packed `u64`.
119    #[must_use]
120    pub const fn from_u64(packed: u64) -> Self {
121        Self(packed)
122    }
123}
124
125impl std::fmt::Display for HlcTimestamp {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        write!(f, "{}.{}", self.physical_ms(), self.logical())
128    }
129}
130
131/// Error from [`HybridLogicalClock::update`] when an incoming
132/// timestamp exceeds the configured skew tolerance.
133#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
134#[error(
135    "HLC skew tolerance exceeded: remote {remote_physical_ms}ms > local {local_physical_ms}ms + {tolerance_ms}ms tolerance"
136)]
137pub struct SkewExceeded {
138    /// The remote timestamp's physical component (ms since UNIX epoch).
139    pub remote_physical_ms: u64,
140    /// The local wall clock's reading at the time of the rejection.
141    pub local_physical_ms: u64,
142    /// The skew-tolerance bound that was exceeded.
143    pub tolerance_ms: u64,
144}
145
146/// A node-local hybrid logical clock.
147///
148/// One instance per node. Thread-safe (internal `Mutex` on the
149/// last-emitted timestamp). All HLC reads / updates go through this
150/// type so the invariant "every emitted timestamp is strictly greater
151/// than every previously-emitted timestamp on this node" is
152/// preserved.
153///
154/// ## Usage
155///
156/// - [`now`](Self::now) — call before writing a record locally. The
157///   returned timestamp is guaranteed strictly greater than any
158///   previously-emitted timestamp by this clock.
159/// - [`update`](Self::update) — call when receiving a record from a
160///   peer. The returned timestamp is greater than both the local
161///   last-emitted and the incoming remote timestamp. Errors with
162///   [`SkewExceeded`] if the remote's physical component is more
163///   than `skew_tolerance_ms` ahead of our wall clock.
164/// - [`last_emitted`](Self::last_emitted) — current high-water mark;
165///   used by replication for catch-up bookmarks.
166pub struct HybridLogicalClock {
167    /// The last timestamp this clock emitted. Strictly monotonic.
168    last: Mutex<HlcTimestamp>,
169    /// Wall-clock source. Injected for testability; production uses
170    /// [`SystemTime::now`].
171    wall_clock: Box<dyn Fn() -> u64 + Send + Sync>,
172    /// Maximum allowed `(remote_physical - local_physical)` before we
173    /// reject the incoming timestamp.
174    skew_tolerance_ms: u64,
175}
176
177impl std::fmt::Debug for HybridLogicalClock {
178    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179        let last = self.last.lock().map_or(HlcTimestamp::ZERO, |g| *g);
180        f.debug_struct("HybridLogicalClock")
181            .field("last_emitted", &last)
182            .field("skew_tolerance_ms", &self.skew_tolerance_ms)
183            .field("wall_clock", &"<dyn Fn() -> u64>")
184            .finish()
185    }
186}
187
188impl HybridLogicalClock {
189    /// Construct with default wall clock and default skew tolerance.
190    #[must_use]
191    pub fn new() -> Self {
192        Self::with_skew_tolerance_ms(DEFAULT_SKEW_TOLERANCE_MS)
193    }
194
195    /// Construct with a custom skew-tolerance bound.
196    #[must_use]
197    pub fn with_skew_tolerance_ms(skew_tolerance_ms: u64) -> Self {
198        Self {
199            last: Mutex::new(HlcTimestamp::ZERO),
200            wall_clock: Box::new(default_wall_clock_ms),
201            skew_tolerance_ms,
202        }
203    }
204
205    /// Construct with an injected wall clock (for testing). The clock
206    /// must return milliseconds since some fixed epoch.
207    #[must_use]
208    pub fn with_wall_clock<F>(wall_clock: F, skew_tolerance_ms: u64) -> Self
209    where
210        F: Fn() -> u64 + Send + Sync + 'static,
211    {
212        Self {
213            last: Mutex::new(HlcTimestamp::ZERO),
214            wall_clock: Box::new(wall_clock),
215            skew_tolerance_ms,
216        }
217    }
218
219    /// Emit a fresh timestamp. Strictly greater than every previously-
220    /// emitted timestamp by this clock.
221    ///
222    /// Algorithm:
223    /// 1. Read wall clock `wall`.
224    /// 2. If `wall > last.physical`, return `(wall, 0)` and update.
225    /// 3. Else, return `(last.physical, last.logical + 1)` and update
226    ///    (logical-counter advance because wall clock didn't move
227    ///    past the last emit).
228    ///
229    /// Logical overflow (65,536 events at the same wall-clock
230    /// millisecond) panics — in practice this requires emitting
231    /// 65,536 timestamps within ~1ms, which would mean a writer is
232    /// generating > 65M events/sec on a single clock; a real
233    /// scenario at that rate needs a re-architecture, not a wider
234    /// counter.
235    pub fn now(&self) -> HlcTimestamp {
236        let wall = (self.wall_clock)();
237        // Recover the inner value on poison: HLC monotonicity is preserved
238        // even if a panicking writer held the lock — the last-emitted
239        // timestamp is still the high-water mark and remains valid.
240        let mut last = self
241            .last
242            .lock()
243            .unwrap_or_else(std::sync::PoisonError::into_inner);
244        let new = if wall > last.physical_ms() {
245            HlcTimestamp::new(wall, 0)
246        } else {
247            // Wall didn't advance — bump logical. Overflow at >65k events
248            // in one millisecond is a documented unrecoverable state
249            // (single writer generating >65M events/sec); the panic
250            // message is the intended failure mode.
251            #[allow(clippy::expect_used)]
252            let next_logical = last
253                .logical()
254                .checked_add(1)
255                .expect("HLC logical counter overflow within a single ms");
256            HlcTimestamp::new(last.physical_ms(), next_logical)
257        };
258        *last = new;
259        new
260    }
261
262    /// Integrate a remote timestamp and emit a fresh local timestamp
263    /// strictly greater than both the local state and the remote.
264    ///
265    /// Algorithm:
266    /// 1. Read wall clock `wall`.
267    /// 2. Reject if `remote.physical > wall + skew_tolerance`. This
268    ///    prevents a misbehaving peer from poisoning our HLC state
269    ///    with a far-future timestamp.
270    /// 3. Compute `new_physical = max(local.physical, remote.physical, wall)`.
271    /// 4. Compute `new_logical`:
272    ///    - both `local.physical` and `remote.physical` == `new_physical`:
273    ///      `max(local.logical, remote.logical) + 1`
274    ///    - only `local.physical` == `new_physical`: `local.logical + 1`
275    ///    - only `remote.physical` == `new_physical`: `remote.logical + 1`
276    ///    - neither (wall is new winner): `0`
277    /// 5. Update local and return.
278    ///
279    /// # Errors
280    ///
281    /// Returns [`SkewExceeded`] when the remote's physical component
282    /// is more than `skew_tolerance_ms` ahead of our wall clock.
283    pub fn update(&self, remote: HlcTimestamp) -> Result<HlcTimestamp, SkewExceeded> {
284        let wall = (self.wall_clock)();
285        if remote.physical_ms() > wall.saturating_add(self.skew_tolerance_ms) {
286            return Err(SkewExceeded {
287                remote_physical_ms: remote.physical_ms(),
288                local_physical_ms: wall,
289                tolerance_ms: self.skew_tolerance_ms,
290            });
291        }
292
293        let mut last = self
294            .last
295            .lock()
296            .unwrap_or_else(std::sync::PoisonError::into_inner);
297        let local_phys = last.physical_ms();
298        let remote_phys = remote.physical_ms();
299        let new_phys = local_phys.max(remote_phys).max(wall);
300
301        // Logical overflow paths are documented unrecoverable invariants
302        // (see `now()` for rationale).
303        #[allow(clippy::expect_used)]
304        let new_logical = if new_phys == local_phys && new_phys == remote_phys {
305            last.logical()
306                .max(remote.logical())
307                .checked_add(1)
308                .expect("HLC logical overflow on update (local + remote same ms)")
309        } else if new_phys == local_phys {
310            last.logical()
311                .checked_add(1)
312                .expect("HLC logical overflow on update (local advanced)")
313        } else if new_phys == remote_phys {
314            remote
315                .logical()
316                .checked_add(1)
317                .expect("HLC logical overflow on update (remote advanced)")
318        } else {
319            0
320        };
321
322        let new = HlcTimestamp::new(new_phys, new_logical);
323        *last = new;
324        drop(last);
325        Ok(new)
326    }
327
328    /// Read the current high-water mark without emitting a new
329    /// timestamp. Used by replication to record per-peer `last_applied`
330    /// bookmarks.
331    #[must_use]
332    pub fn last_emitted(&self) -> HlcTimestamp {
333        *self
334            .last
335            .lock()
336            .unwrap_or_else(std::sync::PoisonError::into_inner)
337    }
338
339    /// The skew tolerance this clock was configured with.
340    #[must_use]
341    pub const fn skew_tolerance_ms(&self) -> u64 {
342        self.skew_tolerance_ms
343    }
344}
345
346impl Default for HybridLogicalClock {
347    fn default() -> Self {
348        Self::new()
349    }
350}
351
/// Production wall clock: milliseconds since the UNIX epoch.
///
/// If the system clock reads earlier than the epoch, this reports `0` so
/// the HLC can still emit something sensible (its logical counter then
/// dominates). The `u128` → `u64` millisecond conversion saturates at
/// `u64::MAX`, which is ~584 million years after the epoch and therefore
/// unreachable in practice.
fn default_wall_clock_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
363
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Build an HLC with a controllable wall clock and the default skew
    /// tolerance. The returned `Arc<AtomicU64>` *is* the wall clock: store a
    /// new value into it to move time forwards or backwards.
    fn new_test_clock(initial_wall_ms: u64) -> (HybridLogicalClock, Arc<AtomicU64>) {
        let wall = Arc::new(AtomicU64::new(initial_wall_ms));
        let wall_for_clock = Arc::clone(&wall);
        let clock = HybridLogicalClock::with_wall_clock(
            move || wall_for_clock.load(Ordering::SeqCst),
            DEFAULT_SKEW_TOLERANCE_MS,
        );
        (clock, wall)
    }

    #[test]
    fn timestamp_packing_round_trip() {
        let ts = HlcTimestamp::new(1_700_000_000_000, 42);
        assert_eq!(ts.physical_ms(), 1_700_000_000_000);
        assert_eq!(ts.logical(), 42);
        assert_eq!(HlcTimestamp::from_u64(ts.as_u64()), ts);
    }

    #[test]
    fn max_occupies_every_bit() {
        assert_eq!(HlcTimestamp::MAX.physical_ms(), (1 << 48) - 1);
        assert_eq!(HlcTimestamp::MAX.logical(), u16::MAX);
        assert_eq!(HlcTimestamp::MAX.as_u64(), u64::MAX);
    }

    #[test]
    fn natural_ordering_is_physical_then_logical() {
        let a = HlcTimestamp::new(100, 5);
        let b = HlcTimestamp::new(100, 6);
        let c = HlcTimestamp::new(101, 0);
        assert!(a < b);
        assert!(b < c);
        assert!(a < c);
    }

    #[test]
    fn now_advances_monotonically_when_wall_stalls() {
        let (clock, _wall) = new_test_clock(1000);
        let t1 = clock.now();
        let t2 = clock.now();
        let t3 = clock.now();
        assert_eq!(t1.physical_ms(), 1000);
        assert_eq!(t1.logical(), 0);
        assert_eq!(t2.physical_ms(), 1000);
        assert_eq!(t2.logical(), 1);
        assert_eq!(t3.physical_ms(), 1000);
        assert_eq!(t3.logical(), 2);
        assert!(t1 < t2);
        assert!(t2 < t3);
    }

    #[test]
    fn now_takes_wall_when_wall_advances() {
        let (clock, wall) = new_test_clock(1000);
        let t1 = clock.now();
        wall.store(2000, Ordering::SeqCst);
        let t2 = clock.now();
        assert_eq!(t2.physical_ms(), 2000);
        assert_eq!(t2.logical(), 0);
        assert!(t1 < t2);
    }

    #[test]
    fn now_holds_at_last_when_wall_regresses() {
        // Operator adjusts the wall clock backwards (NTP step). The HLC
        // must NOT regress — the physical component holds at the last-
        // emitted value and the logical counter advances.
        let (clock, wall) = new_test_clock(5000);
        let t1 = clock.now();
        wall.store(3000, Ordering::SeqCst);
        let t2 = clock.now();
        assert_eq!(t1.physical_ms(), 5000);
        assert_eq!(
            t2.physical_ms(),
            5000,
            "HLC must not regress on wall stepback"
        );
        assert_eq!(t2.logical(), 1);
        assert!(t1 < t2);
    }

    #[test]
    fn update_with_strictly_greater_remote() {
        // Wall starts at 1000; we advance it to 2000 so a remote at
        // (2000, 5) is inside the skew tolerance window.
        let (clock, wall) = new_test_clock(1000);
        let _ = clock.now(); // local at (1000, 0)
        wall.store(2000, Ordering::SeqCst);
        let remote = HlcTimestamp::new(2000, 5);
        let merged = clock.update(remote).unwrap();
        assert_eq!(merged.physical_ms(), 2000);
        assert_eq!(
            merged.logical(),
            6,
            "remote.logical + 1 when remote phys wins"
        );
    }

    #[test]
    fn update_with_equal_physical_takes_max_logical_plus_one() {
        let (clock, _wall) = new_test_clock(1000);
        let _ = clock.now();
        let _ = clock.now(); // local at (1000, 1)
        let remote = HlcTimestamp::new(1000, 10);
        let merged = clock.update(remote).unwrap();
        assert_eq!(merged.physical_ms(), 1000);
        assert_eq!(merged.logical(), 11);
    }

    #[test]
    fn update_with_lesser_remote_advances_local_logical() {
        let (clock, _wall) = new_test_clock(5000);
        let _ = clock.now(); // local (5000, 0)
        let remote = HlcTimestamp::new(3000, 0);
        let merged = clock.update(remote).unwrap();
        // local.physical > remote.physical, so new_phys = local.physical;
        // new_logical = local.logical + 1.
        assert_eq!(merged.physical_ms(), 5000);
        assert_eq!(merged.logical(), 1);
    }

    #[test]
    fn update_with_wall_winning_resets_logical() {
        let (clock, wall) = new_test_clock(1000);
        let _ = clock.now(); // local (1000, 0)
        wall.store(10_000, Ordering::SeqCst);
        let remote = HlcTimestamp::new(2000, 99);
        let merged = clock.update(remote).unwrap();
        // wall(10000) > local(1000), wall > remote(2000) → new_phys = wall, logical = 0.
        assert_eq!(merged.physical_ms(), 10_000);
        assert_eq!(merged.logical(), 0);
    }

    #[test]
    fn update_rejects_remote_beyond_skew_tolerance() {
        let (clock, _wall) = new_test_clock(1000);
        let far_future = HlcTimestamp::new(1000 + DEFAULT_SKEW_TOLERANCE_MS + 1, 0);
        let err = clock.update(far_future).unwrap_err();
        assert_eq!(err.remote_physical_ms, 1000 + DEFAULT_SKEW_TOLERANCE_MS + 1);
        assert_eq!(err.local_physical_ms, 1000);
        assert_eq!(err.tolerance_ms, DEFAULT_SKEW_TOLERANCE_MS);
    }

    #[test]
    fn rejected_update_leaves_state_untouched() {
        // The skew check runs before the clock state is touched, so a
        // poisoned remote must not move the high-water mark.
        let (clock, _wall) = new_test_clock(1000);
        let before = clock.now();
        assert!(clock.update(HlcTimestamp::new(5000, 0)).is_err());
        assert_eq!(clock.last_emitted(), before);
        assert_eq!(clock.now(), HlcTimestamp::new(1000, 1));
    }

    #[test]
    fn skew_error_message_names_all_components() {
        let err = SkewExceeded {
            remote_physical_ms: 2000,
            local_physical_ms: 1000,
            tolerance_ms: 250,
        };
        assert_eq!(
            err.to_string(),
            "HLC skew tolerance exceeded: remote 2000ms > local 1000ms + 250ms tolerance"
        );
    }

    #[test]
    fn update_accepts_remote_at_skew_tolerance_boundary() {
        let (clock, _wall) = new_test_clock(1000);
        // Exactly at the boundary: remote_phys == wall + tolerance.
        let at_boundary = HlcTimestamp::new(1000 + DEFAULT_SKEW_TOLERANCE_MS, 0);
        let merged = clock.update(at_boundary).unwrap();
        assert_eq!(merged.physical_ms(), 1000 + DEFAULT_SKEW_TOLERANCE_MS);
    }

    #[test]
    fn skew_tolerance_is_configurable() {
        // A tight cluster (PTP-disciplined) can tighten the bound. The wall
        // clock never moves in this test, so a constant closure suffices.
        let strict = HybridLogicalClock::with_wall_clock(|| 1000, 10);
        assert_eq!(strict.skew_tolerance_ms(), 10);

        let too_far = HlcTimestamp::new(1100, 0);
        assert!(strict.update(too_far).is_err());

        let just_past = HlcTimestamp::new(1011, 0);
        assert!(strict.update(just_past).is_err());

        let just_inside = HlcTimestamp::new(1010, 0);
        assert!(strict.update(just_inside).is_ok());
    }

    #[test]
    fn constructors_report_configured_tolerance() {
        assert_eq!(
            HybridLogicalClock::new().skew_tolerance_ms(),
            DEFAULT_SKEW_TOLERANCE_MS
        );
        assert_eq!(
            HybridLogicalClock::default().skew_tolerance_ms(),
            DEFAULT_SKEW_TOLERANCE_MS
        );
        assert_eq!(
            HybridLogicalClock::with_skew_tolerance_ms(10).skew_tolerance_ms(),
            10
        );
        assert_eq!(HybridLogicalClock::new().last_emitted(), HlcTimestamp::ZERO);
    }

    #[test]
    fn now_and_update_interleave_preserve_monotonicity() {
        // Realistic scenario: local writes interleaved with incoming
        // remote replications; every emitted timestamp must be > every
        // prior emitted timestamp on this clock.
        let (clock, wall) = new_test_clock(1000);
        let t1 = clock.now();
        let t2 = clock.update(HlcTimestamp::new(1100, 0)).unwrap();
        let t3 = clock.now();
        wall.store(1200, Ordering::SeqCst);
        let t4 = clock.now();
        let t5 = clock.update(HlcTimestamp::new(1150, 50)).unwrap();
        let t6 = clock.now();
        let series = [t1, t2, t3, t4, t5, t6];
        for w in series.windows(2) {
            assert!(
                w[0] < w[1],
                "monotonicity violated: {:?} >= {:?}",
                w[0],
                w[1]
            );
        }
    }

    #[test]
    fn last_emitted_reflects_latest_without_advancing() {
        let (clock, _wall) = new_test_clock(1000);
        let t1 = clock.now();
        assert_eq!(clock.last_emitted(), t1);
        let t2 = clock.now();
        assert_eq!(clock.last_emitted(), t2);
        // Reading last_emitted does not advance state.
        assert_eq!(clock.last_emitted(), t2);
    }

    #[test]
    fn display_format() {
        let ts = HlcTimestamp::new(1_700_000_000_000, 42);
        assert_eq!(format!("{ts}"), "1700000000000.42");
    }

    #[test]
    fn zero_is_less_than_anything_real() {
        let ts = HlcTimestamp::new(1, 0);
        assert!(HlcTimestamp::ZERO < ts);
    }
}