Skip to main content

citadeldb_sync/
hlc.rs

1use std::sync::atomic::{AtomicI64, Ordering};
2use std::time::{SystemTime, UNIX_EPOCH};
3
/// Hybrid Logical Clock timestamp.
///
/// Wire layout (12 bytes, see [`HlcTimestamp::to_bytes`]):
/// - `wall_time`: `i64` — nanoseconds since Unix epoch (true nanosecond precision)
/// - `logical`: `i32` — counter for events within the same nanosecond
///
/// Comparison: wall_time first, then logical (total order).
/// Big-endian byte serialization preserves comparison order for non-negative values.
///
/// Range:
/// - wall_time: covers ~292 years from epoch (until year 2262)
/// - logical: up to 2,147,483,647 events per nanosecond
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct HlcTimestamp {
    /// Nanoseconds since the Unix epoch.
    wall_time: i64,
    /// Tie-breaking counter for timestamps sharing the same `wall_time`.
    logical: i32,
}

/// Wire size of an HlcTimestamp in bytes.
pub const HLC_TIMESTAMP_SIZE: usize = 12;

impl HlcTimestamp {
    /// The zero timestamp (`wall_time == 0`, `logical == 0`).
    pub const ZERO: Self = Self {
        wall_time: 0,
        logical: 0,
    };

    /// Create from wall-clock time in nanoseconds and logical counter.
    ///
    /// No validation is performed; negative values are stored as given.
    #[inline]
    pub const fn new(wall_time_ns: i64, logical: i32) -> Self {
        Self {
            wall_time: wall_time_ns,
            logical,
        }
    }

    /// Wall-clock time in nanoseconds since Unix epoch.
    #[inline]
    pub const fn wall_time(&self) -> i64 {
        self.wall_time
    }

    /// Logical counter.
    #[inline]
    pub const fn logical(&self) -> i32 {
        self.logical
    }

    /// Serialize to 12 bytes big-endian (preserves comparison order for non-negative values).
    ///
    /// Bytes `0..8` hold `wall_time`, bytes `8..12` hold `logical`, both as
    /// big-endian two's complement. Because the sign bit is the most
    /// significant bit, negative components sort *after* non-negative ones
    /// when the raw bytes are compared.
    #[inline]
    pub const fn to_bytes(&self) -> [u8; HLC_TIMESTAMP_SIZE] {
        // Destructuring fixed-size arrays keeps this usable in `const`
        // context and avoids any slice indexing.
        let [w0, w1, w2, w3, w4, w5, w6, w7] = self.wall_time.to_be_bytes();
        let [l0, l1, l2, l3] = self.logical.to_be_bytes();
        [w0, w1, w2, w3, w4, w5, w6, w7, l0, l1, l2, l3]
    }

    /// Reconstruct from 12 bytes big-endian.
    ///
    /// Exact inverse of [`HlcTimestamp::to_bytes`]. The input length is
    /// enforced by the array type, so this cannot fail or panic.
    #[inline]
    pub const fn from_bytes(b: &[u8; HLC_TIMESTAMP_SIZE]) -> Self {
        let [w0, w1, w2, w3, w4, w5, w6, w7, l0, l1, l2, l3] = *b;
        Self {
            wall_time: i64::from_be_bytes([w0, w1, w2, w3, w4, w5, w6, w7]),
            logical: i32::from_be_bytes([l0, l1, l2, l3]),
        }
    }

    /// Returns `true` if this is the zero/uninitialized timestamp.
    #[inline]
    pub const fn is_zero(&self) -> bool {
        self.wall_time == 0 && self.logical == 0
    }
}
76
77impl PartialOrd for HlcTimestamp {
78    #[inline]
79    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
80        Some(self.cmp(other))
81    }
82}
83
84impl Ord for HlcTimestamp {
85    #[inline]
86    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
87        self.wall_time
88            .cmp(&other.wall_time)
89            .then(self.logical.cmp(&other.logical))
90    }
91}
92
93impl std::fmt::Debug for HlcTimestamp {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        write!(f, "HLC({}ns:{})", self.wall_time, self.logical)
96    }
97}
98
99impl std::fmt::Display for HlcTimestamp {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        write!(f, "{}:{}", self.wall_time, self.logical)
102    }
103}
104
/// Errors from HLC clock operations.
///
/// Returned by [`HlcClock::now`] and [`HlcClock::update`].
#[derive(Debug, thiserror::Error)]
pub enum ClockError {
    /// A wall time is further ahead of the local physical clock than the
    /// configured maximum drift allows.
    ///
    /// Produced by `update()` for a remote timestamp, and by `now()` when the
    /// clock's own stored wall time is too far ahead of physical time.
    #[error(
        "clock drift exceeded: remote_wall_time={remote_ns}ns, \
         physical_now={physical_ns}ns, max_drift={max_drift_ns}ns"
    )]
    ClockDriftExceeded {
        /// The wall time that was checked, in nanoseconds. Despite the name,
        /// `now()` reports the locally stored wall time here.
        remote_ns: i64,
        /// The physical clock reading it was compared against, in nanoseconds.
        physical_ns: i64,
        /// The maximum drift in effect when the check failed, in nanoseconds.
        max_drift_ns: i64,
    },

    /// The logical counter is already `i32::MAX` and cannot be incremented
    /// while the wall time stays the same.
    #[error("HLC counter overflow (>2^31-1 events in same nanosecond)")]
    CounterOverflow,
}
121
/// Source of physical time for the HLC.
///
/// Abstracted as a trait to allow deterministic testing with [`ManualClock`].
pub trait PhysicalClock: Send {
    /// Current time in nanoseconds since Unix epoch.
    fn now_ns(&self) -> i64;
}

/// Physical clock backed by the system clock.
///
/// On Linux: provides true nanosecond precision via `clock_gettime(CLOCK_REALTIME)`.
/// On macOS: microsecond precision for wall clock.
/// On Windows: ~100ns precision (system timer resolution).
pub struct SystemClock;

impl PhysicalClock for SystemClock {
    /// Reads `SystemTime::now()` as nanoseconds since the Unix epoch.
    ///
    /// Readings that do not fit in an `i64` (past year 2262) saturate at
    /// `i64::MAX` instead of wrapping around to a negative value.
    ///
    /// # Panics
    ///
    /// Panics if the system clock is set before the Unix epoch.
    fn now_ns(&self) -> i64 {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock before UNIX epoch");
        saturating_nanos_to_i64(since_epoch.as_nanos())
    }
}

/// Converts a `u128` nanosecond count to `i64`, clamping at `i64::MAX`.
///
/// A plain `as i64` cast would silently keep only the low 64 bits, which can
/// turn a far-future reading into a negative wall time.
fn saturating_nanos_to_i64(nanos: u128) -> i64 {
    // Lossless widening: every non-negative i64 fits in a u128.
    const LIMIT: u128 = i64::MAX as u128;
    if nanos > LIMIT {
        i64::MAX
    } else {
        // In range by the check above, so the cast cannot truncate.
        nanos as i64
    }
}
145
146/// Manually controlled clock for deterministic testing.
147pub struct ManualClock {
148    time_ns: AtomicI64,
149}
150
151impl ManualClock {
152    /// Create a manual clock starting at `initial_ns` nanoseconds.
153    pub fn new(initial_ns: i64) -> Self {
154        Self {
155            time_ns: AtomicI64::new(initial_ns),
156        }
157    }
158
159    /// Set the clock to an absolute time in nanoseconds.
160    pub fn set(&self, time_ns: i64) {
161        self.time_ns.store(time_ns, Ordering::SeqCst);
162    }
163
164    /// Advance the clock by `delta_ns` nanoseconds.
165    pub fn advance(&self, delta_ns: i64) {
166        self.time_ns.fetch_add(delta_ns, Ordering::SeqCst);
167    }
168}
169
170impl PhysicalClock for ManualClock {
171    fn now_ns(&self) -> i64 {
172        self.time_ns.load(Ordering::SeqCst)
173    }
174}
175
/// One second in nanoseconds.
const SECOND_NS: i64 = 1_000_000_000;

/// Default maximum clock drift: 5 seconds.
///
/// Initial `max_drift_ns` of every `HlcClock`; change it per clock with
/// `HlcClock::set_max_drift_ns`.
const DEFAULT_MAX_DRIFT_NS: i64 = 5 * SECOND_NS;
181
/// Hybrid Logical Clock state machine.
///
/// Two core operations:
/// - [`now()`](HlcClock::now) — generate a timestamp for a local event
/// - [`update()`](HlcClock::update) — merge a remote timestamp into local state
///
/// `now()` always returns a strictly increasing timestamp.
/// `update()` advances the internal clock state without generating a new timestamp.
/// Timestamps are only generated by local events; received timestamps only advance
/// the clock.
///
/// Both operations take `&mut self`; the type does no internal locking.
pub struct HlcClock<C: PhysicalClock = SystemClock> {
    /// Last generated, merged, or seeded timestamp; starts at `HlcTimestamp::ZERO`.
    last: HlcTimestamp,
    /// Maximum distance, in nanoseconds, a wall time may be ahead of the physical clock.
    max_drift_ns: i64,
    /// Physical time source consulted on every `now()` / `update()` call.
    clock: C,
}
197
198impl HlcClock<SystemClock> {
199    /// Create an HLC clock with the system clock and default max drift (5s).
200    pub fn new() -> Self {
201        Self {
202            last: HlcTimestamp::ZERO,
203            max_drift_ns: DEFAULT_MAX_DRIFT_NS,
204            clock: SystemClock,
205        }
206    }
207}
208
209impl<C: PhysicalClock> HlcClock<C> {
210    /// Create an HLC clock with a custom physical clock source.
211    pub fn with_clock(clock: C) -> Self {
212        Self {
213            last: HlcTimestamp::ZERO,
214            max_drift_ns: DEFAULT_MAX_DRIFT_NS,
215            clock,
216        }
217    }
218
219    /// Set the maximum allowed drift in nanoseconds.
220    pub fn set_max_drift_ns(&mut self, max_drift_ns: i64) {
221        self.max_drift_ns = max_drift_ns;
222    }
223
224    /// Seed the clock with a previously persisted timestamp.
225    ///
226    /// Call on startup to restore monotonicity after a restart.
227    /// The next `now()` will return a timestamp strictly greater than `ts`.
228    pub fn set_last(&mut self, ts: HlcTimestamp) {
229        self.last = ts;
230    }
231
232    /// Generate a timestamp for a local event.
233    ///
234    /// Guarantees:
235    /// - Monotonically increasing (never returns a value <= previous)
236    /// - Wall time tracks physical clock within `max_drift`
237    /// - Returns `ClockError::CounterOverflow` if counter saturates
238    /// - Returns `ClockError::ClockDriftExceeded` if wall time is too far
239    ///   ahead of physical time
240    pub fn now(&mut self) -> Result<HlcTimestamp, ClockError> {
241        let pt = self.clock.now_ns();
242
243        let ts = if self.last.wall_time >= pt {
244            // Physical clock hasn't advanced past stored wall time.
245            // Increment logical counter.
246            let new_logical = self
247                .last
248                .logical
249                .checked_add(1)
250                .ok_or(ClockError::CounterOverflow)?;
251
252            // Check drift: stored wall_time vs physical
253            self.check_drift(self.last.wall_time, pt)?;
254
255            HlcTimestamp::new(self.last.wall_time, new_logical)
256        } else {
257            // Physical clock advanced — use it, reset logical to 0.
258            HlcTimestamp::new(pt, 0)
259        };
260
261        self.last = ts;
262        Ok(ts)
263    }
264
265    /// Merge a remote timestamp into local clock state.
266    ///
267    /// Advances the internal state so that subsequent `now()` calls
268    /// will produce timestamps strictly greater than the remote.
269    /// Does NOT generate a new timestamp.
270    ///
271    /// Returns `ClockError::ClockDriftExceeded` if the remote wall time
272    /// is more than `max_drift` ahead of the local physical clock.
273    pub fn update(&mut self, remote: HlcTimestamp) -> Result<(), ClockError> {
274        let pt = self.clock.now_ns();
275
276        // Reject remote timestamps that are too far ahead
277        if remote.wall_time.saturating_sub(pt) > self.max_drift_ns {
278            return Err(ClockError::ClockDriftExceeded {
279                remote_ns: remote.wall_time,
280                physical_ns: pt,
281                max_drift_ns: self.max_drift_ns,
282            });
283        }
284
285        if remote.wall_time > self.last.wall_time {
286            // Remote is ahead: adopt its wall time and logical
287            self.last = remote;
288        } else if remote.wall_time == self.last.wall_time {
289            // Same wall time: take the max logical
290            if remote.logical > self.last.logical {
291                self.last = HlcTimestamp::new(self.last.wall_time, remote.logical);
292            }
293        }
294        // If remote.wall_time < self.last.wall_time: do nothing
295
296        Ok(())
297    }
298
299    /// Returns the last generated or merged timestamp.
300    pub fn last_timestamp(&self) -> HlcTimestamp {
301        self.last
302    }
303
304    /// Returns a reference to the underlying physical clock.
305    pub fn physical_clock(&self) -> &C {
306        &self.clock
307    }
308
309    fn check_drift(&self, wall_time_ns: i64, physical_ns: i64) -> Result<(), ClockError> {
310        if wall_time_ns.saturating_sub(physical_ns) > self.max_drift_ns {
311            return Err(ClockError::ClockDriftExceeded {
312                remote_ns: wall_time_ns,
313                physical_ns,
314                max_drift_ns: self.max_drift_ns,
315            });
316        }
317        Ok(())
318    }
319}
320
#[cfg(test)]
mod tests {
    // Unit tests for the HLC module. All clock-behavior tests drive
    // `HlcClock` through a `ManualClock`, so they are deterministic; only the
    // two tests that mention the system clock read real time.
    use super::*;

    // One second, in nanoseconds.
    const SECOND: i64 = 1_000_000_000;
    // One millisecond, in nanoseconds.
    const MS: i64 = 1_000_000;

    // ── HlcTimestamp basics ───────────────────────────────────────────

    #[test]
    fn new_and_accessors() {
        let ts = HlcTimestamp::new(1_000_000_000, 42);
        assert_eq!(ts.wall_time(), 1_000_000_000);
        assert_eq!(ts.logical(), 42);
    }

    #[test]
    fn zero_timestamp() {
        let ts = HlcTimestamp::ZERO;
        assert_eq!(ts.wall_time(), 0);
        assert_eq!(ts.logical(), 0);
        assert!(ts.is_zero());
    }

    #[test]
    fn non_zero_is_not_zero() {
        // Either component being non-zero is enough.
        let ts = HlcTimestamp::new(1, 0);
        assert!(!ts.is_zero());
        let ts2 = HlcTimestamp::new(0, 1);
        assert!(!ts2.is_zero());
    }

    #[test]
    fn ordering_wall_time_dominates() {
        // Even the largest logical counter loses to a later wall time.
        let a = HlcTimestamp::new(100, i32::MAX);
        let b = HlcTimestamp::new(101, 0);
        assert!(a < b);
    }

    #[test]
    fn ordering_logical_tiebreaks() {
        let a = HlcTimestamp::new(100, 5);
        let b = HlcTimestamp::new(100, 6);
        assert!(a < b);
    }

    #[test]
    fn ordering_equality() {
        let a = HlcTimestamp::new(100, 5);
        let b = HlcTimestamp::new(100, 5);
        assert_eq!(a, b);
        assert!(a <= b);
        assert!(a >= b);
    }

    #[test]
    fn ordering_negative_wall_time() {
        // In-memory comparison is a signed comparison, so negatives sort first.
        let a = HlcTimestamp::new(-100, 0);
        let b = HlcTimestamp::new(0, 0);
        let c = HlcTimestamp::new(100, 0);
        assert!(a < b);
        assert!(b < c);
    }

    // ── Byte serialization ────────────────────────────────────────────

    #[test]
    fn bytes_roundtrip() {
        let ts = HlcTimestamp::new(123_456_789_000_000, 1000);
        let bytes = ts.to_bytes();
        assert_eq!(bytes.len(), 12);
        let ts2 = HlcTimestamp::from_bytes(&bytes);
        assert_eq!(ts, ts2);
    }

    #[test]
    fn bytes_roundtrip_zero() {
        let ts = HlcTimestamp::ZERO;
        let bytes = ts.to_bytes();
        let ts2 = HlcTimestamp::from_bytes(&bytes);
        assert_eq!(ts, ts2);
    }

    #[test]
    fn bytes_roundtrip_max() {
        let ts = HlcTimestamp::new(i64::MAX, i32::MAX);
        let bytes = ts.to_bytes();
        let ts2 = HlcTimestamp::from_bytes(&bytes);
        assert_eq!(ts, ts2);
    }

    #[test]
    fn bytes_preserve_order_for_positive_values() {
        // a < b < c as timestamps; the serialized arrays must compare the same way.
        let a = HlcTimestamp::new(100, 5);
        let b = HlcTimestamp::new(100, 6);
        let c = HlcTimestamp::new(101, 0);

        let ba = a.to_bytes();
        let bb = b.to_bytes();
        let bc = c.to_bytes();

        assert!(ba < bb);
        assert!(bb < bc);
    }

    #[test]
    fn bytes_wall_time_is_big_endian() {
        let ts = HlcTimestamp::new(0x0102_0304_0506_0708, 0);
        let bytes = ts.to_bytes();
        assert_eq!(&bytes[0..8], &[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]);
    }

    #[test]
    fn bytes_logical_is_big_endian() {
        let ts = HlcTimestamp::new(0, 0x01020304);
        let bytes = ts.to_bytes();
        assert_eq!(&bytes[8..12], &[0x01, 0x02, 0x03, 0x04]);
    }

    // ── Display / Debug ───────────────────────────────────────────────

    #[test]
    fn display_format() {
        let ts = HlcTimestamp::new(1_000_000_000, 5);
        assert_eq!(format!("{ts}"), "1000000000:5");
    }

    #[test]
    fn debug_format() {
        let ts = HlcTimestamp::new(1_000_000_000, 5);
        assert_eq!(format!("{ts:?}"), "HLC(1000000000ns:5)");
    }

    // ── ManualClock ───────────────────────────────────────────────────

    #[test]
    fn manual_clock_basic() {
        let mc = ManualClock::new(100);
        assert_eq!(mc.now_ns(), 100);
        mc.advance(50);
        assert_eq!(mc.now_ns(), 150);
        mc.set(200);
        assert_eq!(mc.now_ns(), 200);
    }

    // ── SystemClock ───────────────────────────────────────────────────

    #[test]
    fn system_clock_produces_reasonable_values() {
        let sc = SystemClock;
        let now = sc.now_ns();
        // Should be after 2020-01-01 in nanoseconds
        let jan_2020_ns: i64 = 1_577_836_800 * SECOND;
        assert!(now > jan_2020_ns);
        // Should fit in i64 (won't overflow until year 2262); a wrapped
        // conversion would show up as a non-positive value.
        assert!(now > 0);
    }

    // ── HlcClock::now() ──────────────────────────────────────────────

    #[test]
    fn now_monotonic() {
        // The manual clock never moves, so ordering must come from the logical counter.
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);

        let t1 = clock.now().unwrap();
        let t2 = clock.now().unwrap();
        let t3 = clock.now().unwrap();

        assert!(t1 < t2);
        assert!(t2 < t3);
    }

    #[test]
    fn now_same_physical_increments_logical() {
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);

        let t1 = clock.now().unwrap();
        let t2 = clock.now().unwrap();

        assert_eq!(t1.wall_time(), 1000 * SECOND);
        assert_eq!(t1.logical(), 0);
        assert_eq!(t2.wall_time(), 1000 * SECOND);
        assert_eq!(t2.logical(), 1);
    }

    #[test]
    fn now_physical_advance_resets_logical() {
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);

        let _t1 = clock.now().unwrap();
        let _t2 = clock.now().unwrap();
        assert_eq!(_t2.logical(), 1);

        // Advance by 1 nanosecond
        clock.physical_clock().advance(1);
        let t3 = clock.now().unwrap();
        assert_eq!(t3.wall_time(), 1000 * SECOND + 1);
        assert_eq!(t3.logical(), 0);
    }

    #[test]
    fn now_backward_jump_stays_at_high_watermark() {
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);

        let t1 = clock.now().unwrap();
        assert_eq!(t1.wall_time(), 1000 * SECOND);

        // Jump backward by 2 seconds (within 5s default max drift)
        clock.physical_clock().set(998 * SECOND);
        let t2 = clock.now().unwrap();

        // Should stay at high watermark, logical increments
        assert_eq!(t2.wall_time(), 1000 * SECOND);
        assert!(t2 > t1);
    }

    #[test]
    fn now_counter_overflow() {
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);

        // Seed near max logical
        clock.set_last(HlcTimestamp::new(1000 * SECOND, i32::MAX - 1));

        // One more should work
        let t = clock.now().unwrap();
        assert_eq!(t.logical(), i32::MAX);

        // Next should overflow
        let err = clock.now().unwrap_err();
        assert!(matches!(err, ClockError::CounterOverflow));
    }

    #[test]
    fn now_counter_overflow_recovery_via_time_advance() {
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);

        // Max out counter
        clock.set_last(HlcTimestamp::new(1000 * SECOND, i32::MAX));

        // Overflow at same time
        let err = clock.now().unwrap_err();
        assert!(matches!(err, ClockError::CounterOverflow));

        // Advance physical time — should recover
        clock.physical_clock().advance(1);
        let t = clock.now().unwrap();
        assert_eq!(t.wall_time(), 1000 * SECOND + 1);
        assert_eq!(t.logical(), 0);
    }

    #[test]
    fn now_drift_protection() {
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);
        clock.set_max_drift_ns(SECOND); // 1 second max drift

        // Inject a far-future timestamp
        clock.set_last(HlcTimestamp::new(1010 * SECOND, 0));

        // now() should fail because wall_time is 10s ahead of physical
        let err = clock.now().unwrap_err();
        assert!(matches!(err, ClockError::ClockDriftExceeded { .. }));
    }

    // ── HlcClock::update() ───────────────────────────────────────────

    #[test]
    fn update_remote_behind() {
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);

        let _t1 = clock.now().unwrap(); // (1000s, 0)

        // Remote is behind
        let remote = HlcTimestamp::new(500 * SECOND, 99);
        clock.update(remote).unwrap();

        // State unchanged — local was ahead
        assert_eq!(clock.last_timestamp().wall_time(), 1000 * SECOND);
        assert_eq!(clock.last_timestamp().logical(), 0);
    }

    #[test]
    fn update_remote_ahead() {
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);

        let _t1 = clock.now().unwrap(); // (1000s, 0)

        // Remote is ahead (within drift)
        let remote = HlcTimestamp::new(1002 * SECOND, 5);
        clock.update(remote).unwrap();

        // State adopted remote
        assert_eq!(clock.last_timestamp().wall_time(), 1002 * SECOND);
        assert_eq!(clock.last_timestamp().logical(), 5);

        // Next now() is strictly after remote
        let t2 = clock.now().unwrap();
        assert!(t2 > remote);
        assert_eq!(t2.wall_time(), 1002 * SECOND);
        assert_eq!(t2.logical(), 6);
    }

    #[test]
    fn update_remote_same_wall_time_higher_logical() {
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);

        let _t1 = clock.now().unwrap(); // (1000s, 0)

        // Remote has same wall time but higher logical
        let remote = HlcTimestamp::new(1000 * SECOND, 10);
        clock.update(remote).unwrap();

        // State adopted higher logical
        assert_eq!(clock.last_timestamp().logical(), 10);

        // Next now() increments from merged state
        let t2 = clock.now().unwrap();
        assert_eq!(t2.logical(), 11);
    }

    #[test]
    fn update_remote_same_wall_time_lower_logical() {
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);

        // Generate several events
        for _ in 0..5 {
            clock.now().unwrap();
        }
        // State: (1000s, 4)

        // Remote has same wall time but lower logical
        let remote = HlcTimestamp::new(1000 * SECOND, 2);
        clock.update(remote).unwrap();

        // State unchanged — local logical was higher
        assert_eq!(clock.last_timestamp().logical(), 4);
    }

    #[test]
    fn update_drift_exceeded() {
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);
        clock.set_max_drift_ns(SECOND); // 1 second

        // Remote is 10 seconds ahead
        let remote = HlcTimestamp::new(1010 * SECOND, 0);
        let err = clock.update(remote).unwrap_err();
        assert!(matches!(err, ClockError::ClockDriftExceeded { .. }));

        // State unchanged after rejection (now() was never called, so it is still ZERO)
        assert_eq!(clock.last_timestamp(), HlcTimestamp::ZERO);
    }

    #[test]
    fn update_drift_boundary_exact() {
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);
        clock.set_max_drift_ns(SECOND); // 1 second

        // Remote exactly at boundary: should succeed
        let remote = HlcTimestamp::new(1001 * SECOND, 0);
        clock.update(remote).unwrap();
        assert_eq!(clock.last_timestamp().wall_time(), 1001 * SECOND);

        // One nanosecond past boundary: should fail
        // (fresh clock so the first update cannot influence the result)
        let mc2 = ManualClock::new(1000 * SECOND);
        let mut clock2 = HlcClock::with_clock(mc2);
        clock2.set_max_drift_ns(SECOND);

        let remote2 = HlcTimestamp::new(1001 * SECOND + 1, 0);
        let err = clock2.update(remote2).unwrap_err();
        assert!(matches!(err, ClockError::ClockDriftExceeded { .. }));
    }

    #[test]
    fn update_zero_timestamp_is_noop() {
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);
        let _t1 = clock.now().unwrap();

        clock.update(HlcTimestamp::ZERO).unwrap();

        // State unchanged
        assert_eq!(clock.last_timestamp().wall_time(), 1000 * SECOND);
    }

    // ── set_last (persistence) ────────────────────────────────────────

    #[test]
    fn set_last_restores_monotonicity() {
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);

        // Physical time equals the persisted wall time, so now() must bump the counter.
        let persisted = HlcTimestamp::new(1000 * SECOND, 50);
        clock.set_last(persisted);

        let t1 = clock.now().unwrap();
        assert!(t1 > persisted);
        assert_eq!(t1.logical(), 51);
    }

    // ── Two clocks syncing ────────────────────────────────────────────

    #[test]
    fn two_clocks_converge() {
        // B's physical clock runs 50ms ahead of A's.
        let mc_a = ManualClock::new(1000 * SECOND);
        let mc_b = ManualClock::new(1000 * SECOND + 50 * MS);

        let mut clock_a = HlcClock::with_clock(mc_a);
        let mut clock_b = HlcClock::with_clock(mc_b);

        // A generates event
        let ta1 = clock_a.now().unwrap();
        assert_eq!(ta1.wall_time(), 1000 * SECOND);

        // B generates event
        let tb1 = clock_b.now().unwrap();
        assert_eq!(tb1.wall_time(), 1000 * SECOND + 50 * MS);

        // A receives B's timestamp via update, then generates its own
        clock_a.update(tb1).unwrap();
        let ta2 = clock_a.now().unwrap();
        // A should advance to B's wall time with incremented logical
        assert_eq!(ta2.wall_time(), 1000 * SECOND + 50 * MS);
        assert_eq!(ta2.logical(), 1);

        // B receives A's earlier timestamp — already behind, no effect
        clock_b.update(ta1).unwrap();
        let tb2 = clock_b.now().unwrap();
        assert_eq!(tb2.wall_time(), 1000 * SECOND + 50 * MS);
        assert!(tb2 > tb1);
    }

    #[test]
    fn causal_ordering_preserved() {
        let mc_a = ManualClock::new(1000 * SECOND);
        let mc_b = ManualClock::new(1000 * SECOND);

        let mut clock_a = HlcClock::with_clock(mc_a);
        let mut clock_b = HlcClock::with_clock(mc_b);

        // A: write X
        let ta1 = clock_a.now().unwrap();

        // B receives A's message (sees write X), then writes Y
        clock_b.update(ta1).unwrap();
        let tb1 = clock_b.now().unwrap();

        // Causal chain: ta1 (write X) < tb1 (write Y after seeing X)
        assert!(ta1 < tb1);
    }

    #[test]
    fn physical_time_advance_during_sync() {
        let mc_a = ManualClock::new(1000 * SECOND);
        let mc_b = ManualClock::new(1000 * SECOND);

        let mut clock_a = HlcClock::with_clock(mc_a);
        let mut clock_b = HlcClock::with_clock(mc_b);

        // Both generate local events at same physical time
        let ta = clock_a.now().unwrap(); // (1000s, 0)
        let tb = clock_b.now().unwrap(); // (1000s, 0) — same!

        // Advance physical time on both
        clock_a.physical_clock().advance(100 * MS);
        clock_b.physical_clock().advance(100 * MS);

        // A receives B's timestamp then generates event
        clock_a.update(tb).unwrap();
        let ta2 = clock_a.now().unwrap();

        // Physical time advanced past stored wall_time, so logical resets
        assert_eq!(ta2.wall_time(), 1000 * SECOND + 100 * MS);
        assert_eq!(ta2.logical(), 0);
        assert!(ta2 > ta);
        assert!(ta2 > tb);
    }

    // ── Three-node ring sync ──────────────────────────────────────────

    #[test]
    fn three_node_ring_sync() {
        // Physical clocks are skewed: B is 10ms and C is 20ms ahead of A.
        let mc_a = ManualClock::new(1000 * SECOND);
        let mc_b = ManualClock::new(1000 * SECOND + 10 * MS);
        let mc_c = ManualClock::new(1000 * SECOND + 20 * MS);

        let mut a = HlcClock::with_clock(mc_a);
        let mut b = HlcClock::with_clock(mc_b);
        let mut c = HlcClock::with_clock(mc_c);

        // Each generates a local event
        let ta = a.now().unwrap();
        let tb = b.now().unwrap();
        let tc = c.now().unwrap();

        // A → B: B receives A's timestamp and generates event
        b.update(ta).unwrap();
        let tb2 = b.now().unwrap();
        assert!(tb2 > tb);
        assert!(tb2 > ta);

        // B → C: C receives B's latest and generates event
        c.update(tb2).unwrap();
        let tc2 = c.now().unwrap();
        assert!(tc2 > tc);
        assert!(tc2 > tb2);

        // C → A: A receives C's latest and generates event
        a.update(tc2).unwrap();
        let ta2 = a.now().unwrap();
        assert!(ta2 > ta);
        assert!(ta2 > tc2);
    }

    // ── Many events ───────────────────────────────────────────────────

    #[test]
    fn many_events_same_nanosecond() {
        let mc = ManualClock::new(1000 * SECOND);
        let mut clock = HlcClock::with_clock(mc);

        // With a frozen physical clock the i-th call must carry logical == i.
        for i in 0i32..1000 {
            let t = clock.now().unwrap();
            assert_eq!(t.logical(), i);
        }
    }

    // ── Hash consistency ──────────────────────────────────────────────

    #[test]
    fn hash_consistency() {
        use std::collections::HashSet;
        let a = HlcTimestamp::new(100, 5);
        let b = HlcTimestamp::new(100, 5);
        let c = HlcTimestamp::new(100, 6);

        // Equal timestamps must hash alike; a differing logical must not be found.
        let mut set = HashSet::new();
        set.insert(a);
        assert!(set.contains(&b));
        assert!(!set.contains(&c));
    }

    // ── System clock integration ──────────────────────────────────────

    #[test]
    fn system_clock_hlc_integration() {
        let mut clock = HlcClock::new();
        let t1 = clock.now().unwrap();
        let t2 = clock.now().unwrap();
        assert!(t2 > t1);
        assert!(!t1.is_zero());
    }

    // ── Nanosecond precision ──────────────────────────────────────────

    #[test]
    fn nanosecond_precision_preserved() {
        let ts = HlcTimestamp::new(1_741_000_000_123_456_789, 0);
        assert_eq!(ts.wall_time(), 1_741_000_000_123_456_789);

        let bytes = ts.to_bytes();
        let ts2 = HlcTimestamp::from_bytes(&bytes);
        assert_eq!(ts2.wall_time(), 1_741_000_000_123_456_789);
    }

    #[test]
    fn sub_millisecond_ordering() {
        // Two events 1 microsecond apart
        let a = HlcTimestamp::new(1000 * SECOND, 0);
        let b = HlcTimestamp::new(1000 * SECOND + 1000, 0); // +1μs
        assert!(a < b);

        // Two events 1 nanosecond apart
        let c = HlcTimestamp::new(1000 * SECOND, 0);
        let d = HlcTimestamp::new(1000 * SECOND + 1, 0); // +1ns
        assert!(c < d);
    }

    #[test]
    fn i32_max_logical_counter() {
        // i32::MAX = 2,147,483,647 — much more than u16's 65,535
        let ts = HlcTimestamp::new(1000 * SECOND, i32::MAX);
        assert_eq!(ts.logical(), i32::MAX);

        let bytes = ts.to_bytes();
        let ts2 = HlcTimestamp::from_bytes(&bytes);
        assert_eq!(ts2.logical(), i32::MAX);
    }

    // ── Wire size constant ────────────────────────────────────────────

    #[test]
    fn wire_size_is_12() {
        // The wire size is the sum of the two field widths (8 + 4).
        assert_eq!(HLC_TIMESTAMP_SIZE, 12);
        assert_eq!(std::mem::size_of::<i64>() + std::mem::size_of::<i32>(), 12);
    }
}
928}