yeti_types/hlc.rs
1//! Hybrid Logical Clock (HLC) — replication timestamp primitive.
2//!
3//! HLC is the foundational ordering primitive for yeti's multi-master
4//! CRDT data plane. Every log entry is stamped with an HLC and applied
5//! last-to-arrive. The richer mode (planned) stores per-record HLCs in
6//! an `__hlc_meta` column family and switches the receive path to
7//! HLC-LWW for plain fields plus CRDT merge for `@crdt`-marked fields.
8//!
9//! ## Why HLC instead of physical or logical alone
10//!
11//! - **Physical timestamps** (wall clock) drift between nodes and can
12//! regress if the clock is adjusted. Two writes with the same wall-
13//! clock millisecond can't be ordered.
14//! - **Logical clocks** (Lamport) preserve causal order but lose any
15//! relationship to real time, so you can't say "the most recent
16//! write" in a way humans understand.
17//! - **HLC** (Kulkarni et al. 2014) combines both: a physical
18//! component bounded by wall-clock that advances monotonically, plus
19//! a logical counter that breaks ties when physical time stalls or
20//! regresses.
21//!
22//! ## Wire shape
23//!
24//! An `HlcTimestamp` is a `u64` packed:
25//! - High 48 bits: physical time, milliseconds since UNIX epoch
26//! (sufficient for ~8900 years past 1970)
//! - Low 16 bits: logical counter (65,536 distinct values per
//!   physical millisecond; in normal operation the wall clock advances
//!   long before the counter is exhausted)
30//!
31//! Total ordering on `HlcTimestamp` is the natural u64 ordering.
32//! Two timestamps from different nodes that happen to collide are
33//! broken by appending `node_id` at the call site (the HLC itself is
34//! node-agnostic; the node-id tie-break is part of the replication
35//! envelope, not the timestamp).
36//!
37//! ## Skew tolerance
38//!
39//! HLC-LWW assumes bounded clock skew between nodes. A peer whose
40//! wall clock is far ahead of ours could write timestamps that look
//! arbitrarily far in the future from our perspective; if we accepted
//! them into our HLC state, every subsequent local write would be
//! stamped at (or near) that future physical time until our wall clock
//! catches up. The skew tolerance caps how far in the future we accept
//! incoming timestamps.
45//!
46//! Default 250ms (CockroachDB precedent). Configurable per-cluster
47//! via `replication.hlc_skew_tolerance_ms` in `yeti-config.yaml`.
48
49use std::sync::Mutex;
50use std::time::{SystemTime, UNIX_EPOCH};
51
52use serde::{Deserialize, Serialize};
53
/// Default bound, in milliseconds, on how far a peer's timestamp may run
/// ahead of our local wall clock before [`HybridLogicalClock::update`]
/// rejects it.
///
/// Clusters can override it in `yeti-config.yaml` under
/// `replication.hlc_skew_tolerance_ms`. The 250 ms figure follows
/// `CockroachDB`: small enough to bound the damage a badly skewed peer can
/// do to our HLC state, large enough to tolerate ordinary NTP drift.
pub const DEFAULT_SKEW_TOLERANCE_MS: u64 = 250;
62
/// Width, in bits, of the physical (milliseconds-since-epoch) component.
const PHYSICAL_BITS: u32 = 48;
/// Width, in bits, of the logical counter component.
const LOGICAL_BITS: u32 = 16;
/// Selects the logical counter (low `LOGICAL_BITS` bits) of a packed value.
const LOGICAL_MASK: u64 = (1 << LOGICAL_BITS) - 1;
/// Selects the physical component (high `PHYSICAL_BITS` bits) of a packed value.
const PHYSICAL_MASK: u64 = !LOGICAL_MASK;

// The two components must tile the packed `u64` exactly. A gap or overlap
// would break the "natural u64 ordering == HLC ordering" property.
const _: () = assert!(PHYSICAL_BITS + LOGICAL_BITS == u64::BITS);
// The logical counter is exposed as a `u16`; its width must match.
const _: () = assert!(LOGICAL_BITS == u16::BITS);
67
68/// A 64-bit hybrid logical clock timestamp.
69///
70/// Layout: `[physical_ms : 48][logical : 16]`. Total ordering is the
71/// natural `u64` ordering on the packed value. Tie-breaking across
72/// nodes (when two HLC values are equal) is the caller's concern —
73/// the replication envelope appends `node_id` as a secondary key.
74#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
75#[serde(transparent)]
76pub struct HlcTimestamp(u64);
77
78impl HlcTimestamp {
79 /// Zero / sentinel value. Less than any real timestamp; useful
80 /// for "no last-applied yet" cases.
81 pub const ZERO: Self = Self(0);
82
83 /// The maximum representable timestamp.
84 pub const MAX: Self = Self(u64::MAX);
85
86 /// Compose from physical milliseconds and logical counter.
87 ///
88 /// Panics if `physical_ms` overflows 48 bits (year ~10880) or
89 /// `logical` overflows 16 bits.
90 #[must_use]
91 pub const fn new(physical_ms: u64, logical: u16) -> Self {
92 assert!(
93 physical_ms < (1 << PHYSICAL_BITS),
94 "HLC physical component exceeds 48 bits"
95 );
96 Self((physical_ms << LOGICAL_BITS) | (logical as u64))
97 }
98
99 /// Physical milliseconds since UNIX epoch.
100 #[must_use]
101 pub const fn physical_ms(&self) -> u64 {
102 (self.0 & PHYSICAL_MASK) >> LOGICAL_BITS
103 }
104
105 /// Logical counter component.
106 #[must_use]
107 pub const fn logical(&self) -> u16 {
108 (self.0 & LOGICAL_MASK) as u16
109 }
110
111 /// Underlying packed `u64`. Useful for storage / wire formats
112 /// that want a single integer.
113 #[must_use]
114 pub const fn as_u64(&self) -> u64 {
115 self.0
116 }
117
118 /// Reconstruct from the packed `u64`.
119 #[must_use]
120 pub const fn from_u64(packed: u64) -> Self {
121 Self(packed)
122 }
123}
124
125impl std::fmt::Display for HlcTimestamp {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 write!(f, "{}.{}", self.physical_ms(), self.logical())
128 }
129}
130
131/// Error from [`HybridLogicalClock::update`] when an incoming
132/// timestamp exceeds the configured skew tolerance.
133#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
134#[error(
135 "HLC skew tolerance exceeded: remote {remote_physical_ms}ms > local {local_physical_ms}ms + {tolerance_ms}ms tolerance"
136)]
137pub struct SkewExceeded {
138 /// The remote timestamp's physical component (ms since UNIX epoch).
139 pub remote_physical_ms: u64,
140 /// The local wall clock's reading at the time of the rejection.
141 pub local_physical_ms: u64,
142 /// The skew-tolerance bound that was exceeded.
143 pub tolerance_ms: u64,
144}
145
146/// A node-local hybrid logical clock.
147///
148/// One instance per node. Thread-safe (internal `Mutex` on the
149/// last-emitted timestamp). All HLC reads / updates go through this
150/// type so the invariant "every emitted timestamp is strictly greater
151/// than every previously-emitted timestamp on this node" is
152/// preserved.
153///
154/// ## Usage
155///
156/// - [`now`](Self::now) — call before writing a record locally. The
157/// returned timestamp is guaranteed strictly greater than any
158/// previously-emitted timestamp by this clock.
159/// - [`update`](Self::update) — call when receiving a record from a
160/// peer. The returned timestamp is greater than both the local
161/// last-emitted and the incoming remote timestamp. Errors with
162/// [`SkewExceeded`] if the remote's physical component is more
163/// than `skew_tolerance_ms` ahead of our wall clock.
164/// - [`last_emitted`](Self::last_emitted) — current high-water mark;
165/// used by replication for catch-up bookmarks.
166pub struct HybridLogicalClock {
167 /// The last timestamp this clock emitted. Strictly monotonic.
168 last: Mutex<HlcTimestamp>,
169 /// Wall-clock source. Injected for testability; production uses
170 /// [`SystemTime::now`].
171 wall_clock: Box<dyn Fn() -> u64 + Send + Sync>,
172 /// Maximum allowed `(remote_physical - local_physical)` before we
173 /// reject the incoming timestamp.
174 skew_tolerance_ms: u64,
175}
176
177impl std::fmt::Debug for HybridLogicalClock {
178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179 let last = self.last.lock().map_or(HlcTimestamp::ZERO, |g| *g);
180 f.debug_struct("HybridLogicalClock")
181 .field("last_emitted", &last)
182 .field("skew_tolerance_ms", &self.skew_tolerance_ms)
183 .field("wall_clock", &"<dyn Fn() -> u64>")
184 .finish()
185 }
186}
187
188impl HybridLogicalClock {
189 /// Construct with default wall clock and default skew tolerance.
190 #[must_use]
191 pub fn new() -> Self {
192 Self::with_skew_tolerance_ms(DEFAULT_SKEW_TOLERANCE_MS)
193 }
194
195 /// Construct with a custom skew-tolerance bound.
196 #[must_use]
197 pub fn with_skew_tolerance_ms(skew_tolerance_ms: u64) -> Self {
198 Self {
199 last: Mutex::new(HlcTimestamp::ZERO),
200 wall_clock: Box::new(default_wall_clock_ms),
201 skew_tolerance_ms,
202 }
203 }
204
205 /// Construct with an injected wall clock (for testing). The clock
206 /// must return milliseconds since some fixed epoch.
207 #[must_use]
208 pub fn with_wall_clock<F>(wall_clock: F, skew_tolerance_ms: u64) -> Self
209 where
210 F: Fn() -> u64 + Send + Sync + 'static,
211 {
212 Self {
213 last: Mutex::new(HlcTimestamp::ZERO),
214 wall_clock: Box::new(wall_clock),
215 skew_tolerance_ms,
216 }
217 }
218
219 /// Emit a fresh timestamp. Strictly greater than every previously-
220 /// emitted timestamp by this clock.
221 ///
222 /// Algorithm:
223 /// 1. Read wall clock `wall`.
224 /// 2. If `wall > last.physical`, return `(wall, 0)` and update.
225 /// 3. Else, return `(last.physical, last.logical + 1)` and update
226 /// (logical-counter advance because wall clock didn't move
227 /// past the last emit).
228 ///
229 /// Logical overflow (65,536 events at the same wall-clock
230 /// millisecond) panics — in practice this requires emitting
231 /// 65,536 timestamps within ~1ms, which would mean a writer is
232 /// generating > 65M events/sec on a single clock; a real
233 /// scenario at that rate needs a re-architecture, not a wider
234 /// counter.
235 pub fn now(&self) -> HlcTimestamp {
236 let wall = (self.wall_clock)();
237 // Recover the inner value on poison: HLC monotonicity is preserved
238 // even if a panicking writer held the lock — the last-emitted
239 // timestamp is still the high-water mark and remains valid.
240 let mut last = self
241 .last
242 .lock()
243 .unwrap_or_else(std::sync::PoisonError::into_inner);
244 let new = if wall > last.physical_ms() {
245 HlcTimestamp::new(wall, 0)
246 } else {
247 // Wall didn't advance — bump logical. Overflow at >65k events
248 // in one millisecond is a documented unrecoverable state
249 // (single writer generating >65M events/sec); the panic
250 // message is the intended failure mode.
251 #[allow(clippy::expect_used)]
252 let next_logical = last
253 .logical()
254 .checked_add(1)
255 .expect("HLC logical counter overflow within a single ms");
256 HlcTimestamp::new(last.physical_ms(), next_logical)
257 };
258 *last = new;
259 new
260 }
261
262 /// Integrate a remote timestamp and emit a fresh local timestamp
263 /// strictly greater than both the local state and the remote.
264 ///
265 /// Algorithm:
266 /// 1. Read wall clock `wall`.
267 /// 2. Reject if `remote.physical > wall + skew_tolerance`. This
268 /// prevents a misbehaving peer from poisoning our HLC state
269 /// with a far-future timestamp.
270 /// 3. Compute `new_physical = max(local.physical, remote.physical, wall)`.
271 /// 4. Compute `new_logical`:
272 /// - both `local.physical` and `remote.physical` == `new_physical`:
273 /// `max(local.logical, remote.logical) + 1`
274 /// - only `local.physical` == `new_physical`: `local.logical + 1`
275 /// - only `remote.physical` == `new_physical`: `remote.logical + 1`
276 /// - neither (wall is new winner): `0`
277 /// 5. Update local and return.
278 ///
279 /// # Errors
280 ///
281 /// Returns [`SkewExceeded`] when the remote's physical component
282 /// is more than `skew_tolerance_ms` ahead of our wall clock.
283 pub fn update(&self, remote: HlcTimestamp) -> Result<HlcTimestamp, SkewExceeded> {
284 let wall = (self.wall_clock)();
285 if remote.physical_ms() > wall.saturating_add(self.skew_tolerance_ms) {
286 return Err(SkewExceeded {
287 remote_physical_ms: remote.physical_ms(),
288 local_physical_ms: wall,
289 tolerance_ms: self.skew_tolerance_ms,
290 });
291 }
292
293 let mut last = self
294 .last
295 .lock()
296 .unwrap_or_else(std::sync::PoisonError::into_inner);
297 let local_phys = last.physical_ms();
298 let remote_phys = remote.physical_ms();
299 let new_phys = local_phys.max(remote_phys).max(wall);
300
301 // Logical overflow paths are documented unrecoverable invariants
302 // (see `now()` for rationale).
303 #[allow(clippy::expect_used)]
304 let new_logical = if new_phys == local_phys && new_phys == remote_phys {
305 last.logical()
306 .max(remote.logical())
307 .checked_add(1)
308 .expect("HLC logical overflow on update (local + remote same ms)")
309 } else if new_phys == local_phys {
310 last.logical()
311 .checked_add(1)
312 .expect("HLC logical overflow on update (local advanced)")
313 } else if new_phys == remote_phys {
314 remote
315 .logical()
316 .checked_add(1)
317 .expect("HLC logical overflow on update (remote advanced)")
318 } else {
319 0
320 };
321
322 let new = HlcTimestamp::new(new_phys, new_logical);
323 *last = new;
324 drop(last);
325 Ok(new)
326 }
327
328 /// Read the current high-water mark without emitting a new
329 /// timestamp. Used by replication to record per-peer `last_applied`
330 /// bookmarks.
331 #[must_use]
332 pub fn last_emitted(&self) -> HlcTimestamp {
333 *self
334 .last
335 .lock()
336 .unwrap_or_else(std::sync::PoisonError::into_inner)
337 }
338
339 /// The skew tolerance this clock was configured with.
340 #[must_use]
341 pub const fn skew_tolerance_ms(&self) -> u64 {
342 self.skew_tolerance_ms
343 }
344}
345
346impl Default for HybridLogicalClock {
347 fn default() -> Self {
348 Self::new()
349 }
350}
351
/// Production wall clock: milliseconds since the UNIX epoch.
fn default_wall_clock_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` is a u128. Anything that doesn't fit in u64 (hundreds
        // of millions of years out) is clamped to u64::MAX.
        Ok(elapsed) => u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
        // The system clock is set before 1970. Report 0 so callers still
        // get a value; the HLC's logical counter then does the ordering.
        Err(_) => 0,
    }
}
363
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Build an HLC with a controllable wall clock and a custom skew
    /// tolerance. Store into the returned `AtomicU64` to move time.
    fn new_test_clock_with_tolerance(
        initial_wall_ms: u64,
        skew_tolerance_ms: u64,
    ) -> (HybridLogicalClock, Arc<AtomicU64>) {
        let wall = Arc::new(AtomicU64::new(initial_wall_ms));
        let wall_for_clock = Arc::clone(&wall);
        let clock = HybridLogicalClock::with_wall_clock(
            move || wall_for_clock.load(Ordering::SeqCst),
            skew_tolerance_ms,
        );
        (clock, wall)
    }

    /// Build an HLC with a controllable wall clock and the default skew
    /// tolerance.
    fn new_test_clock(initial_wall_ms: u64) -> (HybridLogicalClock, Arc<AtomicU64>) {
        new_test_clock_with_tolerance(initial_wall_ms, DEFAULT_SKEW_TOLERANCE_MS)
    }

    #[test]
    fn timestamp_packing_round_trip() {
        let ts = HlcTimestamp::new(1_700_000_000_000, 42);
        assert_eq!(ts.physical_ms(), 1_700_000_000_000);
        assert_eq!(ts.logical(), 42);
        assert_eq!(HlcTimestamp::from_u64(ts.as_u64()), ts);
    }

    #[test]
    fn max_unpacks_to_component_maxima() {
        assert_eq!(HlcTimestamp::MAX.physical_ms(), (1 << 48) - 1);
        assert_eq!(HlcTimestamp::MAX.logical(), u16::MAX);
    }

    #[test]
    #[should_panic(expected = "HLC physical component exceeds 48 bits")]
    fn new_rejects_physical_beyond_48_bits() {
        let _ = HlcTimestamp::new(1 << 48, 0);
    }

    #[test]
    fn natural_ordering_is_physical_then_logical() {
        let a = HlcTimestamp::new(100, 5);
        let b = HlcTimestamp::new(100, 6);
        let c = HlcTimestamp::new(101, 0);
        assert!(a < b);
        assert!(b < c);
        assert!(a < c);
    }

    #[test]
    fn now_advances_monotonically_when_wall_stalls() {
        let (clock, _wall) = new_test_clock(1000);
        let t1 = clock.now();
        let t2 = clock.now();
        let t3 = clock.now();
        assert_eq!(t1.physical_ms(), 1000);
        assert_eq!(t1.logical(), 0);
        assert_eq!(t2.physical_ms(), 1000);
        assert_eq!(t2.logical(), 1);
        assert_eq!(t3.physical_ms(), 1000);
        assert_eq!(t3.logical(), 2);
        assert!(t1 < t2);
        assert!(t2 < t3);
    }

    #[test]
    fn now_takes_wall_when_wall_advances() {
        let (clock, wall) = new_test_clock(1000);
        let t1 = clock.now();
        wall.store(2000, Ordering::SeqCst);
        let t2 = clock.now();
        assert_eq!(t2.physical_ms(), 2000);
        assert_eq!(t2.logical(), 0);
        assert!(t1 < t2);
    }

    #[test]
    fn now_holds_at_last_when_wall_regresses() {
        // Operator adjusts the wall clock backwards (NTP step). The HLC
        // must NOT regress — the physical component holds at the last-
        // emitted value and the logical counter advances.
        let (clock, wall) = new_test_clock(5000);
        let t1 = clock.now();
        wall.store(3000, Ordering::SeqCst);
        let t2 = clock.now();
        assert_eq!(t1.physical_ms(), 5000);
        assert_eq!(
            t2.physical_ms(),
            5000,
            "HLC must not regress on wall stepback"
        );
        assert_eq!(t2.logical(), 1);
        assert!(t1 < t2);
    }

    #[test]
    fn update_with_strictly_greater_remote() {
        // Wall starts at 1000; we advance it to 2000 so a remote at
        // (2000, 5) is inside the skew tolerance window.
        let (clock, wall) = new_test_clock(1000);
        let _ = clock.now(); // local at (1000, 0)
        wall.store(2000, Ordering::SeqCst);
        let remote = HlcTimestamp::new(2000, 5);
        let merged = clock.update(remote).unwrap();
        assert_eq!(merged.physical_ms(), 2000);
        assert_eq!(
            merged.logical(),
            6,
            "remote.logical + 1 when remote phys wins"
        );
    }

    #[test]
    fn update_with_equal_physical_takes_max_logical_plus_one() {
        let (clock, _wall) = new_test_clock(1000);
        let _ = clock.now();
        let _ = clock.now(); // local at (1000, 1)
        let remote = HlcTimestamp::new(1000, 10);
        let merged = clock.update(remote).unwrap();
        assert_eq!(merged.physical_ms(), 1000);
        assert_eq!(merged.logical(), 11);
    }

    #[test]
    fn update_with_lesser_remote_advances_local_logical() {
        let (clock, _wall) = new_test_clock(5000);
        let _ = clock.now(); // local (5000, 0)
        let remote = HlcTimestamp::new(3000, 0);
        let merged = clock.update(remote).unwrap();
        // local.physical > remote.physical, so new_phys = local.physical;
        // new_logical = local.logical + 1.
        assert_eq!(merged.physical_ms(), 5000);
        assert_eq!(merged.logical(), 1);
    }

    #[test]
    fn update_with_wall_winning_resets_logical() {
        let (clock, wall) = new_test_clock(1000);
        let _ = clock.now(); // local (1000, 0)
        wall.store(10_000, Ordering::SeqCst);
        let remote = HlcTimestamp::new(2000, 99);
        let merged = clock.update(remote).unwrap();
        // wall(10000) > local(1000), wall > remote(2000) → new_phys = wall, logical = 0.
        assert_eq!(merged.physical_ms(), 10_000);
        assert_eq!(merged.logical(), 0);
    }

    #[test]
    fn update_result_exceeds_local_and_remote() {
        // Covers every branch of the merge rule (wall wins, equal physical,
        // remote wins); the result must dominate both inputs each time.
        let (clock, _wall) = new_test_clock(1000);
        for remote in [
            HlcTimestamp::new(900, 7),
            HlcTimestamp::new(1000, 0),
            HlcTimestamp::new(1000, 3),
            HlcTimestamp::new(1200, 9),
        ] {
            let before = clock.last_emitted();
            let merged = clock.update(remote).unwrap();
            assert!(merged > before, "{merged:?} must exceed local {before:?}");
            assert!(merged > remote, "{merged:?} must exceed remote {remote:?}");
        }
    }

    #[test]
    fn update_rejects_remote_beyond_skew_tolerance() {
        let (clock, _wall) = new_test_clock(1000);
        let far_future = HlcTimestamp::new(1000 + DEFAULT_SKEW_TOLERANCE_MS + 1, 0);
        let err = clock.update(far_future).unwrap_err();
        assert_eq!(err.remote_physical_ms, 1000 + DEFAULT_SKEW_TOLERANCE_MS + 1);
        assert_eq!(err.local_physical_ms, 1000);
        assert_eq!(err.tolerance_ms, DEFAULT_SKEW_TOLERANCE_MS);
    }

    #[test]
    fn rejected_update_leaves_state_untouched() {
        let (clock, _wall) = new_test_clock(1000);
        let before = clock.now();
        assert!(clock.update(HlcTimestamp::new(5000, 0)).is_err());
        assert_eq!(clock.last_emitted(), before);
    }

    #[test]
    fn update_accepts_remote_at_skew_tolerance_boundary() {
        let (clock, _wall) = new_test_clock(1000);
        // Exactly at the boundary: remote_phys == wall + tolerance.
        let at_boundary = HlcTimestamp::new(1000 + DEFAULT_SKEW_TOLERANCE_MS, 0);
        let merged = clock.update(at_boundary).unwrap();
        assert_eq!(merged.physical_ms(), 1000 + DEFAULT_SKEW_TOLERANCE_MS);
    }

    #[test]
    fn skew_tolerance_is_configurable() {
        // A tight cluster (PTP-disciplined) can tighten the bound.
        let (strict, _wall) = new_test_clock_with_tolerance(1000, 10);
        assert_eq!(strict.skew_tolerance_ms(), 10);

        let too_far = HlcTimestamp::new(1100, 0);
        assert!(strict.update(too_far).is_err());

        let just_inside = HlcTimestamp::new(1010, 0);
        assert!(strict.update(just_inside).is_ok());
    }

    #[test]
    fn now_and_update_interleave_preserve_monotonicity() {
        // Realistic scenario: local writes interleaved with incoming
        // remote replications; every emitted timestamp must be > every
        // prior emitted timestamp on this clock.
        let (clock, wall) = new_test_clock(1000);
        let t1 = clock.now();
        let t2 = clock.update(HlcTimestamp::new(1100, 0)).unwrap();
        let t3 = clock.now();
        wall.store(1200, Ordering::SeqCst);
        let t4 = clock.now();
        let t5 = clock.update(HlcTimestamp::new(1150, 50)).unwrap();
        let t6 = clock.now();
        let series = [t1, t2, t3, t4, t5, t6];
        for w in series.windows(2) {
            assert!(
                w[0] < w[1],
                "monotonicity violated: {:?} >= {:?}",
                w[0],
                w[1]
            );
        }
    }

    #[test]
    fn last_emitted_reflects_latest_without_advancing() {
        let (clock, _wall) = new_test_clock(1000);
        let t1 = clock.now();
        assert_eq!(clock.last_emitted(), t1);
        let t2 = clock.now();
        assert_eq!(clock.last_emitted(), t2);
        // Reading last_emitted does not advance state.
        assert_eq!(clock.last_emitted(), t2);
    }

    #[test]
    fn debug_shows_last_emitted_and_tolerance() {
        let (clock, _wall) = new_test_clock(1000);
        let _ = clock.now();
        let rendered = format!("{clock:?}");
        assert!(rendered.contains("last_emitted"), "{rendered}");
        assert!(rendered.contains("skew_tolerance_ms: 250"), "{rendered}");
    }

    #[test]
    fn display_format() {
        let ts = HlcTimestamp::new(1_700_000_000_000, 42);
        assert_eq!(format!("{ts}"), "1700000000000.42");
    }

    #[test]
    fn zero_is_less_than_anything_real() {
        let ts = HlcTimestamp::new(1, 0);
        assert!(HlcTimestamp::ZERO < ts);
    }
}