Skip to main content

citadel_sync/
hlc.rs

1use std::sync::atomic::{AtomicI64, Ordering};
2use std::time::{SystemTime, UNIX_EPOCH};
3
4/// Hybrid Logical Clock timestamp.
5///
6/// Layout (12 bytes):
7/// - `wall_time`: `i64` - nanoseconds since Unix epoch (true nanosecond precision)
8/// - `logical`: `i32` - counter for events within the same nanosecond
9///
10/// Comparison: wall_time first, then logical (total order).
11/// Big-endian byte serialization preserves comparison order for non-negative values.
12///
13/// Range:
14/// - wall_time: covers ~292 years from epoch (until year 2262)
15/// - logical: up to 2,147,483,647 events per nanosecond
16#[derive(Clone, Copy, PartialEq, Eq, Hash)]
17pub struct HlcTimestamp {
18    wall_time: i64,
19    logical: i32,
20}
21
22pub const HLC_TIMESTAMP_SIZE: usize = 12;
23
24impl HlcTimestamp {
25    pub const ZERO: Self = Self {
26        wall_time: 0,
27        logical: 0,
28    };
29
30    #[inline]
31    pub fn new(wall_time_ns: i64, logical: i32) -> Self {
32        Self {
33            wall_time: wall_time_ns,
34            logical,
35        }
36    }
37
38    #[inline]
39    pub fn wall_time(&self) -> i64 {
40        self.wall_time
41    }
42
43    #[inline]
44    pub fn logical(&self) -> i32 {
45        self.logical
46    }
47
48    /// Big-endian serialization preserves comparison order for non-negative values.
49    #[inline]
50    pub fn to_bytes(&self) -> [u8; HLC_TIMESTAMP_SIZE] {
51        let mut buf = [0u8; HLC_TIMESTAMP_SIZE];
52        buf[0..8].copy_from_slice(&self.wall_time.to_be_bytes());
53        buf[8..12].copy_from_slice(&self.logical.to_be_bytes());
54        buf
55    }
56
57    #[inline]
58    pub fn from_bytes(b: &[u8; HLC_TIMESTAMP_SIZE]) -> Self {
59        Self {
60            wall_time: i64::from_be_bytes(b[0..8].try_into().unwrap()),
61            logical: i32::from_be_bytes(b[8..12].try_into().unwrap()),
62        }
63    }
64
65    #[inline]
66    pub fn is_zero(&self) -> bool {
67        self.wall_time == 0 && self.logical == 0
68    }
69}
70
71impl PartialOrd for HlcTimestamp {
72    #[inline]
73    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
74        Some(self.cmp(other))
75    }
76}
77
78impl Ord for HlcTimestamp {
79    #[inline]
80    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
81        self.wall_time
82            .cmp(&other.wall_time)
83            .then(self.logical.cmp(&other.logical))
84    }
85}
86
87impl std::fmt::Debug for HlcTimestamp {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        write!(f, "HLC({}ns:{})", self.wall_time, self.logical)
90    }
91}
92
93impl std::fmt::Display for HlcTimestamp {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        write!(f, "{}:{}", self.wall_time, self.logical)
96    }
97}
98
99#[derive(Debug, thiserror::Error)]
100pub enum ClockError {
101    #[error(
102        "clock drift exceeded: remote_wall_time={remote_ns}ns, \
103         physical_now={physical_ns}ns, max_drift={max_drift_ns}ns"
104    )]
105    ClockDriftExceeded {
106        remote_ns: i64,
107        physical_ns: i64,
108        max_drift_ns: i64,
109    },
110
111    #[error("HLC counter overflow (>2^31-1 events in same nanosecond)")]
112    CounterOverflow,
113}
114
115/// Source of physical time for the HLC.
116///
117/// Abstracted as a trait to allow deterministic testing with [`ManualClock`].
118pub trait PhysicalClock: Send {
119    /// Current time in nanoseconds since Unix epoch.
120    fn now_ns(&self) -> i64;
121}
122
123/// Physical clock backed by the system clock.
124///
125/// On Linux: provides true nanosecond precision via `clock_gettime(CLOCK_REALTIME)`.
126/// On macOS: microsecond precision for wall clock.
127/// On Windows: ~100ns precision (system timer resolution).
128pub struct SystemClock;
129
130impl PhysicalClock for SystemClock {
131    fn now_ns(&self) -> i64 {
132        SystemTime::now()
133            .duration_since(UNIX_EPOCH)
134            .expect("system clock before UNIX epoch")
135            .as_nanos() as i64
136    }
137}
138
139pub struct ManualClock {
140    time_ns: AtomicI64,
141}
142
143impl ManualClock {
144    pub fn new(initial_ns: i64) -> Self {
145        Self {
146            time_ns: AtomicI64::new(initial_ns),
147        }
148    }
149
150    pub fn set(&self, time_ns: i64) {
151        self.time_ns.store(time_ns, Ordering::SeqCst);
152    }
153
154    pub fn advance(&self, delta_ns: i64) {
155        self.time_ns.fetch_add(delta_ns, Ordering::SeqCst);
156    }
157}
158
159impl PhysicalClock for ManualClock {
160    fn now_ns(&self) -> i64 {
161        self.time_ns.load(Ordering::SeqCst)
162    }
163}
164
165const SECOND_NS: i64 = 1_000_000_000;
166const DEFAULT_MAX_DRIFT_NS: i64 = 5 * SECOND_NS;
167
168/// Hybrid Logical Clock state machine.
169///
170/// Two core operations:
171/// - [`now()`](HlcClock::now) - generate a timestamp for a local event
172/// - [`update()`](HlcClock::update) - merge a remote timestamp into local state
173///
174/// `now()` always returns a strictly increasing timestamp.
175/// `update()` advances the internal clock state without generating a new timestamp.
176/// Timestamps are only generated by local events; received timestamps only advance
177/// the clock.
178pub struct HlcClock<C: PhysicalClock = SystemClock> {
179    last: HlcTimestamp,
180    max_drift_ns: i64,
181    clock: C,
182}
183
184impl Default for HlcClock<SystemClock> {
185    fn default() -> Self {
186        Self::new()
187    }
188}
189
190impl HlcClock<SystemClock> {
191    pub fn new() -> Self {
192        Self {
193            last: HlcTimestamp::ZERO,
194            max_drift_ns: DEFAULT_MAX_DRIFT_NS,
195            clock: SystemClock,
196        }
197    }
198}
199
200impl<C: PhysicalClock> HlcClock<C> {
201    pub fn with_clock(clock: C) -> Self {
202        Self {
203            last: HlcTimestamp::ZERO,
204            max_drift_ns: DEFAULT_MAX_DRIFT_NS,
205            clock,
206        }
207    }
208
209    pub fn set_max_drift_ns(&mut self, max_drift_ns: i64) {
210        self.max_drift_ns = max_drift_ns;
211    }
212
213    /// Call on startup to restore monotonicity after a restart.
214    pub fn set_last(&mut self, ts: HlcTimestamp) {
215        self.last = ts;
216    }
217
218    /// Generate a monotonically increasing timestamp for a local event.
219    pub fn now(&mut self) -> Result<HlcTimestamp, ClockError> {
220        let pt = self.clock.now_ns();
221
222        let ts = if self.last.wall_time >= pt {
223            // Physical clock hasn't advanced past stored wall time.
224            // Increment logical counter.
225            let new_logical = self
226                .last
227                .logical
228                .checked_add(1)
229                .ok_or(ClockError::CounterOverflow)?;
230
231            // Check drift: stored wall_time vs physical
232            self.check_drift(self.last.wall_time, pt)?;
233
234            HlcTimestamp::new(self.last.wall_time, new_logical)
235        } else {
236            // Physical clock advanced - use it, reset logical to 0.
237            HlcTimestamp::new(pt, 0)
238        };
239
240        self.last = ts;
241        Ok(ts)
242    }
243
244    /// Merge a remote timestamp into local state (does not generate a new timestamp).
245    pub fn update(&mut self, remote: HlcTimestamp) -> Result<(), ClockError> {
246        let pt = self.clock.now_ns();
247
248        // Reject remote timestamps that are too far ahead
249        if remote.wall_time.saturating_sub(pt) > self.max_drift_ns {
250            return Err(ClockError::ClockDriftExceeded {
251                remote_ns: remote.wall_time,
252                physical_ns: pt,
253                max_drift_ns: self.max_drift_ns,
254            });
255        }
256
257        if remote.wall_time > self.last.wall_time {
258            // Remote is ahead: adopt its wall time and logical
259            self.last = remote;
260        } else if remote.wall_time == self.last.wall_time {
261            // Same wall time: take the max logical
262            if remote.logical > self.last.logical {
263                self.last = HlcTimestamp::new(self.last.wall_time, remote.logical);
264            }
265        }
266        // If remote.wall_time < self.last.wall_time: do nothing
267
268        Ok(())
269    }
270
271    pub fn last_timestamp(&self) -> HlcTimestamp {
272        self.last
273    }
274
275    pub fn physical_clock(&self) -> &C {
276        &self.clock
277    }
278
279    fn check_drift(&self, wall_time_ns: i64, physical_ns: i64) -> Result<(), ClockError> {
280        if wall_time_ns.saturating_sub(physical_ns) > self.max_drift_ns {
281            return Err(ClockError::ClockDriftExceeded {
282                remote_ns: wall_time_ns,
283                physical_ns,
284                max_drift_ns: self.max_drift_ns,
285            });
286        }
287        Ok(())
288    }
289}
290
291#[cfg(test)]
292mod tests {
293    use super::*;
294
295    const SECOND: i64 = 1_000_000_000;
296    const MS: i64 = 1_000_000;
297
298    // ── HlcTimestamp basics ───────────────────────────────────────────
299
300    #[test]
301    fn new_and_accessors() {
302        let ts = HlcTimestamp::new(1_000_000_000, 42);
303        assert_eq!(ts.wall_time(), 1_000_000_000);
304        assert_eq!(ts.logical(), 42);
305    }
306
307    #[test]
308    fn zero_timestamp() {
309        let ts = HlcTimestamp::ZERO;
310        assert_eq!(ts.wall_time(), 0);
311        assert_eq!(ts.logical(), 0);
312        assert!(ts.is_zero());
313    }
314
315    #[test]
316    fn non_zero_is_not_zero() {
317        let ts = HlcTimestamp::new(1, 0);
318        assert!(!ts.is_zero());
319        let ts2 = HlcTimestamp::new(0, 1);
320        assert!(!ts2.is_zero());
321    }
322
323    #[test]
324    fn ordering_wall_time_dominates() {
325        let a = HlcTimestamp::new(100, i32::MAX);
326        let b = HlcTimestamp::new(101, 0);
327        assert!(a < b);
328    }
329
330    #[test]
331    fn ordering_logical_tiebreaks() {
332        let a = HlcTimestamp::new(100, 5);
333        let b = HlcTimestamp::new(100, 6);
334        assert!(a < b);
335    }
336
337    #[test]
338    fn ordering_equality() {
339        let a = HlcTimestamp::new(100, 5);
340        let b = HlcTimestamp::new(100, 5);
341        assert_eq!(a, b);
342        assert!(a <= b);
343        assert!(a >= b);
344    }
345
346    #[test]
347    fn ordering_negative_wall_time() {
348        let a = HlcTimestamp::new(-100, 0);
349        let b = HlcTimestamp::new(0, 0);
350        let c = HlcTimestamp::new(100, 0);
351        assert!(a < b);
352        assert!(b < c);
353    }
354
355    // ── Byte serialization ────────────────────────────────────────────
356
357    #[test]
358    fn bytes_roundtrip() {
359        let ts = HlcTimestamp::new(123_456_789_000_000, 1000);
360        let bytes = ts.to_bytes();
361        assert_eq!(bytes.len(), 12);
362        let ts2 = HlcTimestamp::from_bytes(&bytes);
363        assert_eq!(ts, ts2);
364    }
365
366    #[test]
367    fn bytes_roundtrip_zero() {
368        let ts = HlcTimestamp::ZERO;
369        let bytes = ts.to_bytes();
370        let ts2 = HlcTimestamp::from_bytes(&bytes);
371        assert_eq!(ts, ts2);
372    }
373
374    #[test]
375    fn bytes_roundtrip_max() {
376        let ts = HlcTimestamp::new(i64::MAX, i32::MAX);
377        let bytes = ts.to_bytes();
378        let ts2 = HlcTimestamp::from_bytes(&bytes);
379        assert_eq!(ts, ts2);
380    }
381
382    #[test]
383    fn bytes_preserve_order_for_positive_values() {
384        let a = HlcTimestamp::new(100, 5);
385        let b = HlcTimestamp::new(100, 6);
386        let c = HlcTimestamp::new(101, 0);
387
388        let ba = a.to_bytes();
389        let bb = b.to_bytes();
390        let bc = c.to_bytes();
391
392        assert!(ba < bb);
393        assert!(bb < bc);
394    }
395
396    #[test]
397    fn bytes_wall_time_is_big_endian() {
398        let ts = HlcTimestamp::new(0x0102_0304_0506_0708, 0);
399        let bytes = ts.to_bytes();
400        assert_eq!(
401            &bytes[0..8],
402            &[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
403        );
404    }
405
406    #[test]
407    fn bytes_logical_is_big_endian() {
408        let ts = HlcTimestamp::new(0, 0x01020304);
409        let bytes = ts.to_bytes();
410        assert_eq!(&bytes[8..12], &[0x01, 0x02, 0x03, 0x04]);
411    }
412
413    // ── Display / Debug ───────────────────────────────────────────────
414
415    #[test]
416    fn display_format() {
417        let ts = HlcTimestamp::new(1_000_000_000, 5);
418        assert_eq!(format!("{ts}"), "1000000000:5");
419    }
420
421    #[test]
422    fn debug_format() {
423        let ts = HlcTimestamp::new(1_000_000_000, 5);
424        assert_eq!(format!("{ts:?}"), "HLC(1000000000ns:5)");
425    }
426
427    // ── ManualClock ───────────────────────────────────────────────────
428
429    #[test]
430    fn manual_clock_basic() {
431        let mc = ManualClock::new(100);
432        assert_eq!(mc.now_ns(), 100);
433        mc.advance(50);
434        assert_eq!(mc.now_ns(), 150);
435        mc.set(200);
436        assert_eq!(mc.now_ns(), 200);
437    }
438
439    // ── SystemClock ───────────────────────────────────────────────────
440
441    #[test]
442    fn system_clock_produces_reasonable_values() {
443        let sc = SystemClock;
444        let now = sc.now_ns();
445        // Should be after 2020-01-01 in nanoseconds
446        let jan_2020_ns: i64 = 1_577_836_800 * SECOND;
447        assert!(now > jan_2020_ns);
448        // Should fit in i64 (won't overflow until year 2262)
449        assert!(now > 0);
450    }
451
452    // ── HlcClock::now() ──────────────────────────────────────────────
453
454    #[test]
455    fn now_monotonic() {
456        let mc = ManualClock::new(1000 * SECOND);
457        let mut clock = HlcClock::with_clock(mc);
458
459        let t1 = clock.now().unwrap();
460        let t2 = clock.now().unwrap();
461        let t3 = clock.now().unwrap();
462
463        assert!(t1 < t2);
464        assert!(t2 < t3);
465    }
466
467    #[test]
468    fn now_same_physical_increments_logical() {
469        let mc = ManualClock::new(1000 * SECOND);
470        let mut clock = HlcClock::with_clock(mc);
471
472        let t1 = clock.now().unwrap();
473        let t2 = clock.now().unwrap();
474
475        assert_eq!(t1.wall_time(), 1000 * SECOND);
476        assert_eq!(t1.logical(), 0);
477        assert_eq!(t2.wall_time(), 1000 * SECOND);
478        assert_eq!(t2.logical(), 1);
479    }
480
481    #[test]
482    fn now_physical_advance_resets_logical() {
483        let mc = ManualClock::new(1000 * SECOND);
484        let mut clock = HlcClock::with_clock(mc);
485
486        let _t1 = clock.now().unwrap();
487        let _t2 = clock.now().unwrap();
488        assert_eq!(_t2.logical(), 1);
489
490        // Advance by 1 nanosecond
491        clock.physical_clock().advance(1);
492        let t3 = clock.now().unwrap();
493        assert_eq!(t3.wall_time(), 1000 * SECOND + 1);
494        assert_eq!(t3.logical(), 0);
495    }
496
497    #[test]
498    fn now_backward_jump_stays_at_high_watermark() {
499        let mc = ManualClock::new(1000 * SECOND);
500        let mut clock = HlcClock::with_clock(mc);
501
502        let t1 = clock.now().unwrap();
503        assert_eq!(t1.wall_time(), 1000 * SECOND);
504
505        // Jump backward by 2 seconds (within 5s default max drift)
506        clock.physical_clock().set(998 * SECOND);
507        let t2 = clock.now().unwrap();
508
509        // Should stay at high watermark, logical increments
510        assert_eq!(t2.wall_time(), 1000 * SECOND);
511        assert!(t2 > t1);
512    }
513
514    #[test]
515    fn now_counter_overflow() {
516        let mc = ManualClock::new(1000 * SECOND);
517        let mut clock = HlcClock::with_clock(mc);
518
519        // Seed near max logical
520        clock.set_last(HlcTimestamp::new(1000 * SECOND, i32::MAX - 1));
521
522        // One more should work
523        let t = clock.now().unwrap();
524        assert_eq!(t.logical(), i32::MAX);
525
526        // Next should overflow
527        let err = clock.now().unwrap_err();
528        assert!(matches!(err, ClockError::CounterOverflow));
529    }
530
531    #[test]
532    fn now_counter_overflow_recovery_via_time_advance() {
533        let mc = ManualClock::new(1000 * SECOND);
534        let mut clock = HlcClock::with_clock(mc);
535
536        // Max out counter
537        clock.set_last(HlcTimestamp::new(1000 * SECOND, i32::MAX));
538
539        // Overflow at same time
540        let err = clock.now().unwrap_err();
541        assert!(matches!(err, ClockError::CounterOverflow));
542
543        // Advance physical time - should recover
544        clock.physical_clock().advance(1);
545        let t = clock.now().unwrap();
546        assert_eq!(t.wall_time(), 1000 * SECOND + 1);
547        assert_eq!(t.logical(), 0);
548    }
549
550    #[test]
551    fn now_drift_protection() {
552        let mc = ManualClock::new(1000 * SECOND);
553        let mut clock = HlcClock::with_clock(mc);
554        clock.set_max_drift_ns(SECOND); // 1 second max drift
555
556        // Inject a far-future timestamp
557        clock.set_last(HlcTimestamp::new(1010 * SECOND, 0));
558
559        // now() should fail because wall_time is 10s ahead of physical
560        let err = clock.now().unwrap_err();
561        assert!(matches!(err, ClockError::ClockDriftExceeded { .. }));
562    }
563
564    // ── HlcClock::update() ───────────────────────────────────────────
565
566    #[test]
567    fn update_remote_behind() {
568        let mc = ManualClock::new(1000 * SECOND);
569        let mut clock = HlcClock::with_clock(mc);
570
571        let _t1 = clock.now().unwrap(); // (1000s, 0)
572
573        // Remote is behind
574        let remote = HlcTimestamp::new(500 * SECOND, 99);
575        clock.update(remote).unwrap();
576
577        // State unchanged - local was ahead
578        assert_eq!(clock.last_timestamp().wall_time(), 1000 * SECOND);
579        assert_eq!(clock.last_timestamp().logical(), 0);
580    }
581
582    #[test]
583    fn update_remote_ahead() {
584        let mc = ManualClock::new(1000 * SECOND);
585        let mut clock = HlcClock::with_clock(mc);
586
587        let _t1 = clock.now().unwrap(); // (1000s, 0)
588
589        // Remote is ahead (within drift)
590        let remote = HlcTimestamp::new(1002 * SECOND, 5);
591        clock.update(remote).unwrap();
592
593        // State adopted remote
594        assert_eq!(clock.last_timestamp().wall_time(), 1002 * SECOND);
595        assert_eq!(clock.last_timestamp().logical(), 5);
596
597        // Next now() is strictly after remote
598        let t2 = clock.now().unwrap();
599        assert!(t2 > remote);
600        assert_eq!(t2.wall_time(), 1002 * SECOND);
601        assert_eq!(t2.logical(), 6);
602    }
603
604    #[test]
605    fn update_remote_same_wall_time_higher_logical() {
606        let mc = ManualClock::new(1000 * SECOND);
607        let mut clock = HlcClock::with_clock(mc);
608
609        let _t1 = clock.now().unwrap(); // (1000s, 0)
610
611        // Remote has same wall time but higher logical
612        let remote = HlcTimestamp::new(1000 * SECOND, 10);
613        clock.update(remote).unwrap();
614
615        // State adopted higher logical
616        assert_eq!(clock.last_timestamp().logical(), 10);
617
618        // Next now() increments from merged state
619        let t2 = clock.now().unwrap();
620        assert_eq!(t2.logical(), 11);
621    }
622
623    #[test]
624    fn update_remote_same_wall_time_lower_logical() {
625        let mc = ManualClock::new(1000 * SECOND);
626        let mut clock = HlcClock::with_clock(mc);
627
628        // Generate several events
629        for _ in 0..5 {
630            clock.now().unwrap();
631        }
632        // State: (1000s, 4)
633
634        // Remote has same wall time but lower logical
635        let remote = HlcTimestamp::new(1000 * SECOND, 2);
636        clock.update(remote).unwrap();
637
638        // State unchanged - local logical was higher
639        assert_eq!(clock.last_timestamp().logical(), 4);
640    }
641
642    #[test]
643    fn update_drift_exceeded() {
644        let mc = ManualClock::new(1000 * SECOND);
645        let mut clock = HlcClock::with_clock(mc);
646        clock.set_max_drift_ns(SECOND); // 1 second
647
648        // Remote is 10 seconds ahead
649        let remote = HlcTimestamp::new(1010 * SECOND, 0);
650        let err = clock.update(remote).unwrap_err();
651        assert!(matches!(err, ClockError::ClockDriftExceeded { .. }));
652
653        // State unchanged after rejection
654        assert_eq!(clock.last_timestamp(), HlcTimestamp::ZERO);
655    }
656
657    #[test]
658    fn update_drift_boundary_exact() {
659        let mc = ManualClock::new(1000 * SECOND);
660        let mut clock = HlcClock::with_clock(mc);
661        clock.set_max_drift_ns(SECOND); // 1 second
662
663        // Remote exactly at boundary: should succeed
664        let remote = HlcTimestamp::new(1001 * SECOND, 0);
665        clock.update(remote).unwrap();
666        assert_eq!(clock.last_timestamp().wall_time(), 1001 * SECOND);
667
668        // One nanosecond past boundary: should fail
669        let mc2 = ManualClock::new(1000 * SECOND);
670        let mut clock2 = HlcClock::with_clock(mc2);
671        clock2.set_max_drift_ns(SECOND);
672
673        let remote2 = HlcTimestamp::new(1001 * SECOND + 1, 0);
674        let err = clock2.update(remote2).unwrap_err();
675        assert!(matches!(err, ClockError::ClockDriftExceeded { .. }));
676    }
677
678    #[test]
679    fn update_zero_timestamp_is_noop() {
680        let mc = ManualClock::new(1000 * SECOND);
681        let mut clock = HlcClock::with_clock(mc);
682        let _t1 = clock.now().unwrap();
683
684        clock.update(HlcTimestamp::ZERO).unwrap();
685
686        // State unchanged
687        assert_eq!(clock.last_timestamp().wall_time(), 1000 * SECOND);
688    }
689
690    // ── set_last (persistence) ────────────────────────────────────────
691
692    #[test]
693    fn set_last_restores_monotonicity() {
694        let mc = ManualClock::new(1000 * SECOND);
695        let mut clock = HlcClock::with_clock(mc);
696
697        let persisted = HlcTimestamp::new(1000 * SECOND, 50);
698        clock.set_last(persisted);
699
700        let t1 = clock.now().unwrap();
701        assert!(t1 > persisted);
702        assert_eq!(t1.logical(), 51);
703    }
704
705    // ── Two clocks syncing ────────────────────────────────────────────
706
707    #[test]
708    fn two_clocks_converge() {
709        let mc_a = ManualClock::new(1000 * SECOND);
710        let mc_b = ManualClock::new(1000 * SECOND + 50 * MS);
711
712        let mut clock_a = HlcClock::with_clock(mc_a);
713        let mut clock_b = HlcClock::with_clock(mc_b);
714
715        // A generates event
716        let ta1 = clock_a.now().unwrap();
717        assert_eq!(ta1.wall_time(), 1000 * SECOND);
718
719        // B generates event
720        let tb1 = clock_b.now().unwrap();
721        assert_eq!(tb1.wall_time(), 1000 * SECOND + 50 * MS);
722
723        // A receives B's timestamp via update, then generates its own
724        clock_a.update(tb1).unwrap();
725        let ta2 = clock_a.now().unwrap();
726        // A should advance to B's wall time with incremented logical
727        assert_eq!(ta2.wall_time(), 1000 * SECOND + 50 * MS);
728        assert_eq!(ta2.logical(), 1);
729
730        // B receives A's earlier timestamp - already behind, no effect
731        clock_b.update(ta1).unwrap();
732        let tb2 = clock_b.now().unwrap();
733        assert_eq!(tb2.wall_time(), 1000 * SECOND + 50 * MS);
734        assert!(tb2 > tb1);
735    }
736
737    #[test]
738    fn causal_ordering_preserved() {
739        let mc_a = ManualClock::new(1000 * SECOND);
740        let mc_b = ManualClock::new(1000 * SECOND);
741
742        let mut clock_a = HlcClock::with_clock(mc_a);
743        let mut clock_b = HlcClock::with_clock(mc_b);
744
745        // A: write X
746        let ta1 = clock_a.now().unwrap();
747
748        // B receives A's message (sees write X), then writes Y
749        clock_b.update(ta1).unwrap();
750        let tb1 = clock_b.now().unwrap();
751
752        // Causal chain: ta1 (write X) < tb1 (write Y after seeing X)
753        assert!(ta1 < tb1);
754    }
755
756    #[test]
757    fn physical_time_advance_during_sync() {
758        let mc_a = ManualClock::new(1000 * SECOND);
759        let mc_b = ManualClock::new(1000 * SECOND);
760
761        let mut clock_a = HlcClock::with_clock(mc_a);
762        let mut clock_b = HlcClock::with_clock(mc_b);
763
764        // Both generate local events at same physical time
765        let ta = clock_a.now().unwrap(); // (1000s, 0)
766        let tb = clock_b.now().unwrap(); // (1000s, 0) - same!
767
768        // Advance physical time on both
769        clock_a.physical_clock().advance(100 * MS);
770        clock_b.physical_clock().advance(100 * MS);
771
772        // A receives B's timestamp then generates event
773        clock_a.update(tb).unwrap();
774        let ta2 = clock_a.now().unwrap();
775
776        // Physical time advanced past stored wall_time, so logical resets
777        assert_eq!(ta2.wall_time(), 1000 * SECOND + 100 * MS);
778        assert_eq!(ta2.logical(), 0);
779        assert!(ta2 > ta);
780        assert!(ta2 > tb);
781    }
782
783    // ── Three-node ring sync ──────────────────────────────────────────
784
785    #[test]
786    fn three_node_ring_sync() {
787        let mc_a = ManualClock::new(1000 * SECOND);
788        let mc_b = ManualClock::new(1000 * SECOND + 10 * MS);
789        let mc_c = ManualClock::new(1000 * SECOND + 20 * MS);
790
791        let mut a = HlcClock::with_clock(mc_a);
792        let mut b = HlcClock::with_clock(mc_b);
793        let mut c = HlcClock::with_clock(mc_c);
794
795        // Each generates a local event
796        let ta = a.now().unwrap();
797        let tb = b.now().unwrap();
798        let tc = c.now().unwrap();
799
800        // A -> B: B receives A's timestamp and generates event
801        b.update(ta).unwrap();
802        let tb2 = b.now().unwrap();
803        assert!(tb2 > tb);
804        assert!(tb2 > ta);
805
806        // B -> C: C receives B's latest and generates event
807        c.update(tb2).unwrap();
808        let tc2 = c.now().unwrap();
809        assert!(tc2 > tc);
810        assert!(tc2 > tb2);
811
812        // C -> A: A receives C's latest and generates event
813        a.update(tc2).unwrap();
814        let ta2 = a.now().unwrap();
815        assert!(ta2 > ta);
816        assert!(ta2 > tc2);
817    }
818
819    // ── Many events ───────────────────────────────────────────────────
820
821    #[test]
822    fn many_events_same_nanosecond() {
823        let mc = ManualClock::new(1000 * SECOND);
824        let mut clock = HlcClock::with_clock(mc);
825
826        for i in 0i32..1000 {
827            let t = clock.now().unwrap();
828            assert_eq!(t.logical(), i);
829        }
830    }
831
832    // ── Hash consistency ──────────────────────────────────────────────
833
834    #[test]
835    fn hash_consistency() {
836        use std::collections::HashSet;
837        let a = HlcTimestamp::new(100, 5);
838        let b = HlcTimestamp::new(100, 5);
839        let c = HlcTimestamp::new(100, 6);
840
841        let mut set = HashSet::new();
842        set.insert(a);
843        assert!(set.contains(&b));
844        assert!(!set.contains(&c));
845    }
846
847    // ── System clock integration ──────────────────────────────────────
848
849    #[test]
850    fn system_clock_hlc_integration() {
851        let mut clock = HlcClock::new();
852        let t1 = clock.now().unwrap();
853        let t2 = clock.now().unwrap();
854        assert!(t2 > t1);
855        assert!(!t1.is_zero());
856    }
857
858    // ── Nanosecond precision ──────────────────────────────────────────
859
860    #[test]
861    fn nanosecond_precision_preserved() {
862        let ts = HlcTimestamp::new(1_741_000_000_123_456_789, 0);
863        assert_eq!(ts.wall_time(), 1_741_000_000_123_456_789);
864
865        let bytes = ts.to_bytes();
866        let ts2 = HlcTimestamp::from_bytes(&bytes);
867        assert_eq!(ts2.wall_time(), 1_741_000_000_123_456_789);
868    }
869
870    #[test]
871    fn sub_millisecond_ordering() {
872        // Two events 1 microsecond apart
873        let a = HlcTimestamp::new(1000 * SECOND, 0);
874        let b = HlcTimestamp::new(1000 * SECOND + 1000, 0); // +1μs
875        assert!(a < b);
876
877        // Two events 1 nanosecond apart
878        let c = HlcTimestamp::new(1000 * SECOND, 0);
879        let d = HlcTimestamp::new(1000 * SECOND + 1, 0); // +1ns
880        assert!(c < d);
881    }
882
883    #[test]
884    fn i32_max_logical_counter() {
885        // i32::MAX = 2,147,483,647 - much more than u16's 65,535
886        let ts = HlcTimestamp::new(1000 * SECOND, i32::MAX);
887        assert_eq!(ts.logical(), i32::MAX);
888
889        let bytes = ts.to_bytes();
890        let ts2 = HlcTimestamp::from_bytes(&bytes);
891        assert_eq!(ts2.logical(), i32::MAX);
892    }
893
894    // ── Wire size constant ────────────────────────────────────────────
895
896    #[test]
897    fn wire_size_is_12() {
898        assert_eq!(HLC_TIMESTAMP_SIZE, 12);
899        assert_eq!(std::mem::size_of::<i64>() + std::mem::size_of::<i32>(), 12);
900    }
901}