Skip to main content

citadel_sync/
hlc.rs

1use std::sync::atomic::{AtomicI64, Ordering};
2use std::time::{SystemTime, UNIX_EPOCH};
3
/// A point in hybrid logical time: a wall-clock reading paired with a logical
/// counter that separates events sharing the same reading.
///
/// # Wire layout (12 bytes, big-endian)
///
/// | bytes   | field       | type  | meaning                            |
/// |---------|-------------|-------|------------------------------------|
/// | `0..8`  | `wall_time` | `i64` | nanoseconds since the Unix epoch   |
/// | `8..12` | `logical`   | `i32` | counter within one `wall_time`     |
///
/// # Ordering
///
/// Timestamps form a total order: `wall_time` is compared first and `logical`
/// breaks ties. Comparing the serialized bytes lexicographically agrees with
/// that order only while both fields are non-negative.
///
/// # Range
///
/// - `wall_time`: an `i64` of nanoseconds reaches roughly year 2262
/// - `logical`: at most `i32::MAX` (2,147,483,647) events per `wall_time`
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct HlcTimestamp {
    wall_time: i64,
    logical: i32,
}

/// Number of bytes produced by [`HlcTimestamp::to_bytes`] and consumed by
/// [`HlcTimestamp::from_bytes`].
pub const HLC_TIMESTAMP_SIZE: usize = 12;

impl HlcTimestamp {
    /// The zero/uninitialized timestamp (both fields are `0`).
    pub const ZERO: Self = Self {
        wall_time: 0,
        logical: 0,
    };

    /// Builds a timestamp from a wall-clock reading in nanoseconds and a
    /// logical counter value. No validation is performed on either field.
    #[inline]
    pub fn new(wall_time_ns: i64, logical: i32) -> Self {
        let wall_time = wall_time_ns;
        Self { wall_time, logical }
    }

    /// Wall-clock component, in nanoseconds since the Unix epoch.
    #[inline]
    pub fn wall_time(&self) -> i64 {
        self.wall_time
    }

    /// Logical counter component.
    #[inline]
    pub fn logical(&self) -> i32 {
        self.logical
    }

    /// Encodes the timestamp as 12 big-endian bytes: `wall_time` followed by
    /// `logical`. Byte-wise comparison matches timestamp order for
    /// non-negative values.
    #[inline]
    pub fn to_bytes(&self) -> [u8; HLC_TIMESTAMP_SIZE] {
        let [w0, w1, w2, w3, w4, w5, w6, w7] = self.wall_time.to_be_bytes();
        let [l0, l1, l2, l3] = self.logical.to_be_bytes();
        [w0, w1, w2, w3, w4, w5, w6, w7, l0, l1, l2, l3]
    }

    /// Decodes a timestamp from the 12-byte big-endian form written by
    /// [`to_bytes`](Self::to_bytes).
    #[inline]
    pub fn from_bytes(b: &[u8; HLC_TIMESTAMP_SIZE]) -> Self {
        // Destructuring the fixed-size array keeps the split infallible.
        let [w0, w1, w2, w3, w4, w5, w6, w7, l0, l1, l2, l3] = *b;
        Self {
            wall_time: i64::from_be_bytes([w0, w1, w2, w3, w4, w5, w6, w7]),
            logical: i32::from_be_bytes([l0, l1, l2, l3]),
        }
    }

    /// Returns `true` when the timestamp equals [`HlcTimestamp::ZERO`].
    #[inline]
    pub fn is_zero(&self) -> bool {
        *self == Self::ZERO
    }
}

impl PartialOrd for HlcTimestamp {
    /// Always `Some`: the order is total and delegates to [`Ord::cmp`].
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for HlcTimestamp {
    /// Lexicographic order over `(wall_time, logical)`.
    #[inline]
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let lhs = (self.wall_time, self.logical);
        let rhs = (other.wall_time, other.logical);
        lhs.cmp(&rhs)
    }
}

impl std::fmt::Debug for HlcTimestamp {
    /// Diagnostic form: `HLC(<wall_time>ns:<logical>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { wall_time, logical } = self;
        write!(f, "HLC({}ns:{})", wall_time, logical)
    }
}

impl std::fmt::Display for HlcTimestamp {
    /// Compact form: `<wall_time>:<logical>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}:{}", self.wall_time(), self.logical())
    }
}
104
105/// Errors from HLC clock operations.
106#[derive(Debug, thiserror::Error)]
107pub enum ClockError {
108    #[error(
109        "clock drift exceeded: remote_wall_time={remote_ns}ns, \
110         physical_now={physical_ns}ns, max_drift={max_drift_ns}ns"
111    )]
112    ClockDriftExceeded {
113        remote_ns: i64,
114        physical_ns: i64,
115        max_drift_ns: i64,
116    },
117
118    #[error("HLC counter overflow (>2^31-1 events in same nanosecond)")]
119    CounterOverflow,
120}
121
/// Abstraction over "what time is it right now?" for the HLC.
///
/// Kept as a trait so tests can replace the real system clock with a
/// deterministic implementation such as [`ManualClock`].
pub trait PhysicalClock: Send {
    /// Returns the current physical time as nanoseconds since the Unix epoch.
    fn now_ns(&self) -> i64;
}
129
130/// Physical clock backed by the system clock.
131///
132/// On Linux: provides true nanosecond precision via `clock_gettime(CLOCK_REALTIME)`.
133/// On macOS: microsecond precision for wall clock.
134/// On Windows: ~100ns precision (system timer resolution).
135pub struct SystemClock;
136
137impl PhysicalClock for SystemClock {
138    fn now_ns(&self) -> i64 {
139        SystemTime::now()
140            .duration_since(UNIX_EPOCH)
141            .expect("system clock before UNIX epoch")
142            .as_nanos() as i64
143    }
144}
145
/// Test clock whose reading changes only when explicitly told to.
///
/// The time lives in an [`AtomicI64`], so it can be adjusted through a shared
/// reference.
pub struct ManualClock {
    time_ns: AtomicI64,
}

impl ManualClock {
    /// Builds a clock whose first reading is `initial_ns` nanoseconds.
    pub fn new(initial_ns: i64) -> Self {
        let time_ns = AtomicI64::new(initial_ns);
        Self { time_ns }
    }

    /// Overwrites the current reading with the absolute value `time_ns`
    /// (nanoseconds).
    pub fn set(&self, time_ns: i64) {
        let cell = &self.time_ns;
        cell.store(time_ns, Ordering::SeqCst);
    }

    /// Shifts the current reading by `delta_ns` nanoseconds; a negative delta
    /// moves the clock backwards.
    pub fn advance(&self, delta_ns: i64) {
        let cell = &self.time_ns;
        cell.fetch_add(delta_ns, Ordering::SeqCst);
    }
}
169
170impl PhysicalClock for ManualClock {
171    fn now_ns(&self) -> i64 {
172        self.time_ns.load(Ordering::SeqCst)
173    }
174}
175
/// Number of nanoseconds in one second.
const SECOND_NS: i64 = 1_000_000_000;

/// Drift tolerance applied when none is configured explicitly: five seconds,
/// expressed in nanoseconds.
const DEFAULT_MAX_DRIFT_NS: i64 = SECOND_NS * 5;
181
182/// Hybrid Logical Clock state machine.
183///
184/// Two core operations:
185/// - [`now()`](HlcClock::now) — generate a timestamp for a local event
186/// - [`update()`](HlcClock::update) — merge a remote timestamp into local state
187///
188/// `now()` always returns a strictly increasing timestamp.
189/// `update()` advances the internal clock state without generating a new timestamp.
190/// Timestamps are only generated by local events; received timestamps only advance
191/// the clock.
192pub struct HlcClock<C: PhysicalClock = SystemClock> {
193    last: HlcTimestamp,
194    max_drift_ns: i64,
195    clock: C,
196}
197
198impl Default for HlcClock<SystemClock> {
199    fn default() -> Self {
200        Self::new()
201    }
202}
203
204impl HlcClock<SystemClock> {
205    /// Create an HLC clock with the system clock and default max drift (5s).
206    pub fn new() -> Self {
207        Self {
208            last: HlcTimestamp::ZERO,
209            max_drift_ns: DEFAULT_MAX_DRIFT_NS,
210            clock: SystemClock,
211        }
212    }
213}
214
215impl<C: PhysicalClock> HlcClock<C> {
216    /// Create an HLC clock with a custom physical clock source.
217    pub fn with_clock(clock: C) -> Self {
218        Self {
219            last: HlcTimestamp::ZERO,
220            max_drift_ns: DEFAULT_MAX_DRIFT_NS,
221            clock,
222        }
223    }
224
225    /// Set the maximum allowed drift in nanoseconds.
226    pub fn set_max_drift_ns(&mut self, max_drift_ns: i64) {
227        self.max_drift_ns = max_drift_ns;
228    }
229
230    /// Seed the clock with a previously persisted timestamp.
231    ///
232    /// Call on startup to restore monotonicity after a restart.
233    /// The next `now()` will return a timestamp strictly greater than `ts`.
234    pub fn set_last(&mut self, ts: HlcTimestamp) {
235        self.last = ts;
236    }
237
238    /// Generate a timestamp for a local event.
239    ///
240    /// Guarantees:
241    /// - Monotonically increasing (never returns a value <= previous)
242    /// - Wall time tracks physical clock within `max_drift`
243    /// - Returns `ClockError::CounterOverflow` if counter saturates
244    /// - Returns `ClockError::ClockDriftExceeded` if wall time is too far
245    ///   ahead of physical time
246    pub fn now(&mut self) -> Result<HlcTimestamp, ClockError> {
247        let pt = self.clock.now_ns();
248
249        let ts = if self.last.wall_time >= pt {
250            // Physical clock hasn't advanced past stored wall time.
251            // Increment logical counter.
252            let new_logical = self
253                .last
254                .logical
255                .checked_add(1)
256                .ok_or(ClockError::CounterOverflow)?;
257
258            // Check drift: stored wall_time vs physical
259            self.check_drift(self.last.wall_time, pt)?;
260
261            HlcTimestamp::new(self.last.wall_time, new_logical)
262        } else {
263            // Physical clock advanced — use it, reset logical to 0.
264            HlcTimestamp::new(pt, 0)
265        };
266
267        self.last = ts;
268        Ok(ts)
269    }
270
271    /// Merge a remote timestamp into local clock state.
272    ///
273    /// Advances the internal state so that subsequent `now()` calls
274    /// will produce timestamps strictly greater than the remote.
275    /// Does NOT generate a new timestamp.
276    ///
277    /// Returns `ClockError::ClockDriftExceeded` if the remote wall time
278    /// is more than `max_drift` ahead of the local physical clock.
279    pub fn update(&mut self, remote: HlcTimestamp) -> Result<(), ClockError> {
280        let pt = self.clock.now_ns();
281
282        // Reject remote timestamps that are too far ahead
283        if remote.wall_time.saturating_sub(pt) > self.max_drift_ns {
284            return Err(ClockError::ClockDriftExceeded {
285                remote_ns: remote.wall_time,
286                physical_ns: pt,
287                max_drift_ns: self.max_drift_ns,
288            });
289        }
290
291        if remote.wall_time > self.last.wall_time {
292            // Remote is ahead: adopt its wall time and logical
293            self.last = remote;
294        } else if remote.wall_time == self.last.wall_time {
295            // Same wall time: take the max logical
296            if remote.logical > self.last.logical {
297                self.last = HlcTimestamp::new(self.last.wall_time, remote.logical);
298            }
299        }
300        // If remote.wall_time < self.last.wall_time: do nothing
301
302        Ok(())
303    }
304
305    /// Returns the last generated or merged timestamp.
306    pub fn last_timestamp(&self) -> HlcTimestamp {
307        self.last
308    }
309
310    /// Returns a reference to the underlying physical clock.
311    pub fn physical_clock(&self) -> &C {
312        &self.clock
313    }
314
315    fn check_drift(&self, wall_time_ns: i64, physical_ns: i64) -> Result<(), ClockError> {
316        if wall_time_ns.saturating_sub(physical_ns) > self.max_drift_ns {
317            return Err(ClockError::ClockDriftExceeded {
318                remote_ns: wall_time_ns,
319                physical_ns,
320                max_drift_ns: self.max_drift_ns,
321            });
322        }
323        Ok(())
324    }
325}
326
#[cfg(test)]
mod tests {
    use super::*;

    const SECOND: i64 = 1_000_000_000;
    const MS: i64 = 1_000_000;
    /// Physical start time shared by most clock tests: 1000 s after the epoch.
    const T0: i64 = 1000 * SECOND;

    /// Builds an HLC driven by a [`ManualClock`] that starts at `start_ns`.
    fn clock_at(start_ns: i64) -> HlcClock<ManualClock> {
        HlcClock::with_clock(ManualClock::new(start_ns))
    }

    /// Serializes `ts` and immediately decodes it again.
    fn roundtrip(ts: HlcTimestamp) -> HlcTimestamp {
        HlcTimestamp::from_bytes(&ts.to_bytes())
    }

    // ── HlcTimestamp basics ───────────────────────────────────────────

    #[test]
    fn new_and_accessors() {
        let ts = HlcTimestamp::new(1_000_000_000, 42);
        assert_eq!(ts.wall_time(), 1_000_000_000);
        assert_eq!(ts.logical(), 42);
    }

    #[test]
    fn zero_timestamp() {
        let zero = HlcTimestamp::ZERO;
        assert_eq!(zero.wall_time(), 0);
        assert_eq!(zero.logical(), 0);
        assert!(zero.is_zero());
    }

    #[test]
    fn non_zero_is_not_zero() {
        for ts in [HlcTimestamp::new(1, 0), HlcTimestamp::new(0, 1)] {
            assert!(!ts.is_zero());
        }
    }

    #[test]
    fn ordering_wall_time_dominates() {
        let earlier = HlcTimestamp::new(100, i32::MAX);
        let later = HlcTimestamp::new(101, 0);
        assert!(earlier < later);
    }

    #[test]
    fn ordering_logical_tiebreaks() {
        let lower = HlcTimestamp::new(100, 5);
        let higher = HlcTimestamp::new(100, 6);
        assert!(lower < higher);
    }

    #[test]
    fn ordering_equality() {
        let lhs = HlcTimestamp::new(100, 5);
        let rhs = HlcTimestamp::new(100, 5);
        assert_eq!(lhs, rhs);
        assert!(lhs <= rhs);
        assert!(lhs >= rhs);
    }

    #[test]
    fn ordering_negative_wall_time() {
        let ascending = [
            HlcTimestamp::new(-100, 0),
            HlcTimestamp::new(0, 0),
            HlcTimestamp::new(100, 0),
        ];
        assert!(ascending[0] < ascending[1]);
        assert!(ascending[1] < ascending[2]);
    }

    // ── Byte serialization ────────────────────────────────────────────

    #[test]
    fn bytes_roundtrip() {
        let ts = HlcTimestamp::new(123_456_789_000_000, 1000);
        let encoded = ts.to_bytes();
        assert_eq!(encoded.len(), 12);
        assert_eq!(ts, HlcTimestamp::from_bytes(&encoded));
    }

    #[test]
    fn bytes_roundtrip_zero() {
        assert_eq!(HlcTimestamp::ZERO, roundtrip(HlcTimestamp::ZERO));
    }

    #[test]
    fn bytes_roundtrip_max() {
        let ts = HlcTimestamp::new(i64::MAX, i32::MAX);
        assert_eq!(ts, roundtrip(ts));
    }

    #[test]
    fn bytes_preserve_order_for_positive_values() {
        let encoded = [
            HlcTimestamp::new(100, 5).to_bytes(),
            HlcTimestamp::new(100, 6).to_bytes(),
            HlcTimestamp::new(101, 0).to_bytes(),
        ];

        assert!(encoded[0] < encoded[1]);
        assert!(encoded[1] < encoded[2]);
    }

    #[test]
    fn bytes_wall_time_is_big_endian() {
        let encoded = HlcTimestamp::new(0x0102_0304_0506_0708, 0).to_bytes();
        assert_eq!(
            &encoded[..8],
            &[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
        );
    }

    #[test]
    fn bytes_logical_is_big_endian() {
        let encoded = HlcTimestamp::new(0, 0x01020304).to_bytes();
        assert_eq!(&encoded[8..], &[0x01, 0x02, 0x03, 0x04]);
    }

    // ── Display / Debug ───────────────────────────────────────────────

    #[test]
    fn display_format() {
        let ts = HlcTimestamp::new(1_000_000_000, 5);
        assert_eq!(format!("{ts}"), "1000000000:5");
    }

    #[test]
    fn debug_format() {
        let ts = HlcTimestamp::new(1_000_000_000, 5);
        assert_eq!(format!("{ts:?}"), "HLC(1000000000ns:5)");
    }

    // ── ManualClock ───────────────────────────────────────────────────

    #[test]
    fn manual_clock_basic() {
        let manual = ManualClock::new(100);
        assert_eq!(manual.now_ns(), 100);

        manual.advance(50);
        assert_eq!(manual.now_ns(), 150);

        manual.set(200);
        assert_eq!(manual.now_ns(), 200);
    }

    // ── SystemClock ───────────────────────────────────────────────────

    #[test]
    fn system_clock_produces_reasonable_values() {
        let now = SystemClock.now_ns();
        // 2020-01-01T00:00:00Z expressed in nanoseconds.
        let jan_2020_ns: i64 = 1_577_836_800 * SECOND;
        assert!(now > jan_2020_ns);
        // A positive reading means the value still fits in i64 (until 2262).
        assert!(now > 0);
    }

    // ── HlcClock::now() ──────────────────────────────────────────────

    #[test]
    fn now_monotonic() {
        let mut clock = clock_at(T0);

        let first = clock.now().unwrap();
        let second = clock.now().unwrap();
        let third = clock.now().unwrap();

        assert!(first < second);
        assert!(second < third);
    }

    #[test]
    fn now_same_physical_increments_logical() {
        let mut clock = clock_at(T0);

        let first = clock.now().unwrap();
        let second = clock.now().unwrap();

        assert_eq!(first, HlcTimestamp::new(T0, 0));
        assert_eq!(second, HlcTimestamp::new(T0, 1));
    }

    #[test]
    fn now_physical_advance_resets_logical() {
        let mut clock = clock_at(T0);

        clock.now().unwrap();
        let second = clock.now().unwrap();
        assert_eq!(second.logical(), 1);

        // A single nanosecond of physical progress is enough to reset.
        clock.physical_clock().advance(1);
        let third = clock.now().unwrap();
        assert_eq!(third, HlcTimestamp::new(T0 + 1, 0));
    }

    #[test]
    fn now_backward_jump_stays_at_high_watermark() {
        let mut clock = clock_at(T0);

        let before = clock.now().unwrap();
        assert_eq!(before.wall_time(), T0);

        // Step back 2 s, which is inside the 5 s default max drift.
        clock.physical_clock().set(998 * SECOND);
        let after = clock.now().unwrap();

        // Wall time holds at the high-water mark; the counter moves on.
        assert_eq!(after.wall_time(), T0);
        assert!(after > before);
    }

    #[test]
    fn now_counter_overflow() {
        let mut clock = clock_at(T0);

        // Leave room for exactly one more increment.
        clock.set_last(HlcTimestamp::new(T0, i32::MAX - 1));

        assert_eq!(clock.now().unwrap().logical(), i32::MAX);

        // The counter is now saturated.
        assert!(matches!(clock.now(), Err(ClockError::CounterOverflow)));
    }

    #[test]
    fn now_counter_overflow_recovery_via_time_advance() {
        let mut clock = clock_at(T0);

        // Start with a saturated counter.
        clock.set_last(HlcTimestamp::new(T0, i32::MAX));

        // No physical progress: overflow.
        assert!(matches!(clock.now(), Err(ClockError::CounterOverflow)));

        // Physical progress clears the condition.
        clock.physical_clock().advance(1);
        let recovered = clock.now().unwrap();
        assert_eq!(recovered, HlcTimestamp::new(T0 + 1, 0));
    }

    #[test]
    fn now_drift_protection() {
        let mut clock = clock_at(T0);
        clock.set_max_drift_ns(SECOND); // tolerate 1 s

        // Stored wall time sits 10 s ahead of the physical clock.
        clock.set_last(HlcTimestamp::new(1010 * SECOND, 0));

        assert!(matches!(
            clock.now(),
            Err(ClockError::ClockDriftExceeded { .. })
        ));
    }

    // ── HlcClock::update() ───────────────────────────────────────────

    #[test]
    fn update_remote_behind() {
        let mut clock = clock_at(T0);
        clock.now().unwrap(); // state: (1000s, 0)

        clock.update(HlcTimestamp::new(500 * SECOND, 99)).unwrap();

        // Local state was already ahead, so nothing changes.
        assert_eq!(clock.last_timestamp().wall_time(), T0);
        assert_eq!(clock.last_timestamp().logical(), 0);
    }

    #[test]
    fn update_remote_ahead() {
        let mut clock = clock_at(T0);
        clock.now().unwrap(); // state: (1000s, 0)

        // Ahead of local, but inside the drift window.
        let remote = HlcTimestamp::new(1002 * SECOND, 5);
        clock.update(remote).unwrap();

        // Remote was adopted wholesale.
        assert_eq!(clock.last_timestamp(), remote);

        // The next local event sorts strictly after the remote one.
        let next = clock.now().unwrap();
        assert!(next > remote);
        assert_eq!(next, HlcTimestamp::new(1002 * SECOND, 6));
    }

    #[test]
    fn update_remote_same_wall_time_higher_logical() {
        let mut clock = clock_at(T0);
        clock.now().unwrap(); // state: (1000s, 0)

        clock.update(HlcTimestamp::new(T0, 10)).unwrap();

        // The larger logical counter wins.
        assert_eq!(clock.last_timestamp().logical(), 10);

        // And the next local event continues from it.
        assert_eq!(clock.now().unwrap().logical(), 11);
    }

    #[test]
    fn update_remote_same_wall_time_lower_logical() {
        let mut clock = clock_at(T0);

        // Five local events leave the state at (1000s, 4).
        (0..5).for_each(|_| {
            clock.now().unwrap();
        });

        clock.update(HlcTimestamp::new(T0, 2)).unwrap();

        // Local counter was higher, so it is kept.
        assert_eq!(clock.last_timestamp().logical(), 4);
    }

    #[test]
    fn update_drift_exceeded() {
        let mut clock = clock_at(T0);
        clock.set_max_drift_ns(SECOND); // tolerate 1 s

        // Remote claims to be 10 s ahead.
        let outcome = clock.update(HlcTimestamp::new(1010 * SECOND, 0));
        assert!(matches!(
            outcome,
            Err(ClockError::ClockDriftExceeded { .. })
        ));

        // A rejected update must not touch the state.
        assert_eq!(clock.last_timestamp(), HlcTimestamp::ZERO);
    }

    #[test]
    fn update_drift_boundary_exact() {
        // Exactly on the limit: accepted.
        let mut at_limit = clock_at(T0);
        at_limit.set_max_drift_ns(SECOND);
        at_limit.update(HlcTimestamp::new(1001 * SECOND, 0)).unwrap();
        assert_eq!(at_limit.last_timestamp().wall_time(), 1001 * SECOND);

        // One nanosecond over the limit: rejected.
        let mut past_limit = clock_at(T0);
        past_limit.set_max_drift_ns(SECOND);
        let outcome = past_limit.update(HlcTimestamp::new(1001 * SECOND + 1, 0));
        assert!(matches!(
            outcome,
            Err(ClockError::ClockDriftExceeded { .. })
        ));
    }

    #[test]
    fn update_zero_timestamp_is_noop() {
        let mut clock = clock_at(T0);
        clock.now().unwrap();

        clock.update(HlcTimestamp::ZERO).unwrap();

        assert_eq!(clock.last_timestamp().wall_time(), T0);
    }

    // ── set_last (persistence) ────────────────────────────────────────

    #[test]
    fn set_last_restores_monotonicity() {
        let mut clock = clock_at(T0);

        let persisted = HlcTimestamp::new(T0, 50);
        clock.set_last(persisted);

        let next = clock.now().unwrap();
        assert!(next > persisted);
        assert_eq!(next.logical(), 51);
    }

    // ── Two clocks syncing ────────────────────────────────────────────

    #[test]
    fn two_clocks_converge() {
        let b_start = T0 + 50 * MS;
        let mut clock_a = clock_at(T0);
        let mut clock_b = clock_at(b_start);

        // Each side stamps one local event.
        let ta1 = clock_a.now().unwrap();
        assert_eq!(ta1.wall_time(), T0);

        let tb1 = clock_b.now().unwrap();
        assert_eq!(tb1.wall_time(), b_start);

        // A merges B's timestamp, then stamps its own event: it jumps to B's
        // wall time with the counter bumped.
        clock_a.update(tb1).unwrap();
        let ta2 = clock_a.now().unwrap();
        assert_eq!(ta2.wall_time(), b_start);
        assert_eq!(ta2.logical(), 1);

        // B merges A's older timestamp, which is behind and has no effect.
        clock_b.update(ta1).unwrap();
        let tb2 = clock_b.now().unwrap();
        assert_eq!(tb2.wall_time(), b_start);
        assert!(tb2 > tb1);
    }

    #[test]
    fn causal_ordering_preserved() {
        let mut clock_a = clock_at(T0);
        let mut clock_b = clock_at(T0);

        // A writes X.
        let write_x = clock_a.now().unwrap();

        // B observes X, then writes Y.
        clock_b.update(write_x).unwrap();
        let write_y = clock_b.now().unwrap();

        // Y causally follows X, so it must sort after it.
        assert!(write_x < write_y);
    }

    #[test]
    fn physical_time_advance_during_sync() {
        let mut clock_a = clock_at(T0);
        let mut clock_b = clock_at(T0);

        // Both stamp a local event at the same physical instant.
        let ta = clock_a.now().unwrap(); // (1000s, 0)
        let tb = clock_b.now().unwrap(); // (1000s, 0), identical

        // Physical time moves on for both.
        clock_a.physical_clock().advance(100 * MS);
        clock_b.physical_clock().advance(100 * MS);

        // A merges B's timestamp, then stamps a new event.
        clock_a.update(tb).unwrap();
        let ta2 = clock_a.now().unwrap();

        // Physical time is ahead of the stored wall time, so the counter
        // restarts at zero.
        assert_eq!(ta2.wall_time(), T0 + 100 * MS);
        assert_eq!(ta2.logical(), 0);
        assert!(ta2 > ta);
        assert!(ta2 > tb);
    }

    // ── Three-node ring sync ──────────────────────────────────────────

    #[test]
    fn three_node_ring_sync() {
        let mut a = clock_at(T0);
        let mut b = clock_at(T0 + 10 * MS);
        let mut c = clock_at(T0 + 20 * MS);

        // One local event per node.
        let ta = a.now().unwrap();
        let tb = b.now().unwrap();
        let tc = c.now().unwrap();

        // A → B
        b.update(ta).unwrap();
        let tb2 = b.now().unwrap();
        assert!(tb2 > tb);
        assert!(tb2 > ta);

        // B → C
        c.update(tb2).unwrap();
        let tc2 = c.now().unwrap();
        assert!(tc2 > tc);
        assert!(tc2 > tb2);

        // C → A
        a.update(tc2).unwrap();
        let ta2 = a.now().unwrap();
        assert!(ta2 > ta);
        assert!(ta2 > tc2);
    }

    // ── Many events ───────────────────────────────────────────────────

    #[test]
    fn many_events_same_nanosecond() {
        let mut clock = clock_at(T0);

        for expected in 0i32..1000 {
            assert_eq!(clock.now().unwrap().logical(), expected);
        }
    }

    // ── Hash consistency ──────────────────────────────────────────────

    #[test]
    fn hash_consistency() {
        use std::collections::HashSet;

        let mut seen = HashSet::new();
        seen.insert(HlcTimestamp::new(100, 5));

        assert!(seen.contains(&HlcTimestamp::new(100, 5)));
        assert!(!seen.contains(&HlcTimestamp::new(100, 6)));
    }

    // ── System clock integration ──────────────────────────────────────

    #[test]
    fn system_clock_hlc_integration() {
        let mut clock = HlcClock::new();
        let first = clock.now().unwrap();
        let second = clock.now().unwrap();
        assert!(second > first);
        assert!(!first.is_zero());
    }

    // ── Nanosecond precision ──────────────────────────────────────────

    #[test]
    fn nanosecond_precision_preserved() {
        let wall_ns = 1_741_000_000_123_456_789;
        let ts = HlcTimestamp::new(wall_ns, 0);
        assert_eq!(ts.wall_time(), wall_ns);
        assert_eq!(roundtrip(ts).wall_time(), wall_ns);
    }

    #[test]
    fn sub_millisecond_ordering() {
        let base = HlcTimestamp::new(T0, 0);

        // One microsecond apart.
        assert!(base < HlcTimestamp::new(T0 + 1000, 0));

        // One nanosecond apart.
        assert!(base < HlcTimestamp::new(T0 + 1, 0));
    }

    #[test]
    fn i32_max_logical_counter() {
        // i32::MAX = 2,147,483,647 — much more than u16's 65,535
        let ts = HlcTimestamp::new(T0, i32::MAX);
        assert_eq!(ts.logical(), i32::MAX);
        assert_eq!(roundtrip(ts).logical(), i32::MAX);
    }

    // ── Wire size constant ────────────────────────────────────────────

    #[test]
    fn wire_size_is_12() {
        use std::mem::size_of;

        assert_eq!(HLC_TIMESTAMP_SIZE, 12);
        assert_eq!(size_of::<i64>() + size_of::<i32>(), 12);
    }
}
937}