Skip to main content

citadel_sync/
hlc.rs

1use std::sync::atomic::{AtomicI64, Ordering};
2use std::time::{SystemTime, UNIX_EPOCH};
3
4/// Hybrid Logical Clock timestamp.
5///
6/// Layout (12 bytes):
7/// - `wall_time`: `i64` - nanoseconds since Unix epoch (true nanosecond precision)
8/// - `logical`: `i32` - counter for events within the same nanosecond
9///
10/// Comparison: wall_time first, then logical (total order).
11/// Big-endian byte serialization preserves comparison order for non-negative values.
12///
13/// Range:
14/// - wall_time: covers ~292 years from epoch (until year 2262)
15/// - logical: up to 2,147,483,647 events per nanosecond
16#[derive(Clone, Copy, PartialEq, Eq, Hash)]
17pub struct HlcTimestamp {
18    wall_time: i64,
19    logical: i32,
20}
21
22pub const HLC_TIMESTAMP_SIZE: usize = 12;
23
24impl HlcTimestamp {
25    pub const ZERO: Self = Self {
26        wall_time: 0,
27        logical: 0,
28    };
29
30    #[inline]
31    pub fn new(wall_time_ns: i64, logical: i32) -> Self {
32        Self {
33            wall_time: wall_time_ns,
34            logical,
35        }
36    }
37
38    #[inline]
39    pub fn wall_time(&self) -> i64 {
40        self.wall_time
41    }
42
43    #[inline]
44    pub fn logical(&self) -> i32 {
45        self.logical
46    }
47
48    /// Big-endian serialization preserves comparison order for non-negative values.
49    #[inline]
50    pub fn to_bytes(&self) -> [u8; HLC_TIMESTAMP_SIZE] {
51        let mut buf = [0u8; HLC_TIMESTAMP_SIZE];
52        buf[0..8].copy_from_slice(&self.wall_time.to_be_bytes());
53        buf[8..12].copy_from_slice(&self.logical.to_be_bytes());
54        buf
55    }
56
57    #[inline]
58    pub fn from_bytes(b: &[u8; HLC_TIMESTAMP_SIZE]) -> Self {
59        Self {
60            wall_time: i64::from_be_bytes(b[0..8].try_into().unwrap()),
61            logical: i32::from_be_bytes(b[8..12].try_into().unwrap()),
62        }
63    }
64
65    #[inline]
66    pub fn is_zero(&self) -> bool {
67        self.wall_time == 0 && self.logical == 0
68    }
69}
70
71impl PartialOrd for HlcTimestamp {
72    #[inline]
73    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
74        Some(self.cmp(other))
75    }
76}
77
78impl Ord for HlcTimestamp {
79    #[inline]
80    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
81        self.wall_time
82            .cmp(&other.wall_time)
83            .then(self.logical.cmp(&other.logical))
84    }
85}
86
87impl std::fmt::Debug for HlcTimestamp {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        write!(f, "HLC({}ns:{})", self.wall_time, self.logical)
90    }
91}
92
93impl std::fmt::Display for HlcTimestamp {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        write!(f, "{}:{}", self.wall_time, self.logical)
96    }
97}
98
99#[derive(Debug, thiserror::Error)]
100pub enum ClockError {
101    #[error(
102        "clock drift exceeded: remote_wall_time={remote_ns}ns, \
103         physical_now={physical_ns}ns, max_drift={max_drift_ns}ns"
104    )]
105    ClockDriftExceeded {
106        remote_ns: i64,
107        physical_ns: i64,
108        max_drift_ns: i64,
109    },
110
111    #[error("HLC counter overflow (>2^31-1 events in same nanosecond)")]
112    CounterOverflow,
113}
114
115/// Source of physical time for the HLC.
116///
117/// Abstracted as a trait to allow deterministic testing with [`ManualClock`].
118pub trait PhysicalClock: Send {
119    /// Current time in nanoseconds since Unix epoch.
120    fn now_ns(&self) -> i64;
121}
122
123/// Physical clock backed by the system clock.
124///
125/// On Linux: provides true nanosecond precision via `clock_gettime(CLOCK_REALTIME)`.
126/// On macOS: microsecond precision for wall clock.
127/// On Windows: ~100ns precision (system timer resolution).
128pub struct SystemClock;
129
130impl PhysicalClock for SystemClock {
131    fn now_ns(&self) -> i64 {
132        SystemTime::now()
133            .duration_since(UNIX_EPOCH)
134            .expect("system clock before UNIX epoch")
135            .as_nanos() as i64
136    }
137}
138
139pub struct ManualClock {
140    time_ns: AtomicI64,
141}
142
143impl ManualClock {
144    pub fn new(initial_ns: i64) -> Self {
145        Self {
146            time_ns: AtomicI64::new(initial_ns),
147        }
148    }
149
150    pub fn set(&self, time_ns: i64) {
151        self.time_ns.store(time_ns, Ordering::SeqCst);
152    }
153
154    pub fn advance(&self, delta_ns: i64) {
155        self.time_ns.fetch_add(delta_ns, Ordering::SeqCst);
156    }
157}
158
159impl PhysicalClock for ManualClock {
160    fn now_ns(&self) -> i64 {
161        self.time_ns.load(Ordering::SeqCst)
162    }
163}
164
165const SECOND_NS: i64 = 1_000_000_000;
166const DEFAULT_MAX_DRIFT_NS: i64 = 5 * SECOND_NS;
167
168/// Hybrid Logical Clock state machine.
169///
170/// Two core operations:
171/// - [`now()`](HlcClock::now) - generate a timestamp for a local event
172/// - [`update()`](HlcClock::update) - merge a remote timestamp into local state
173///
174/// `now()` always returns a strictly increasing timestamp.
175/// `update()` advances the internal clock state without generating a new timestamp.
176/// Timestamps are only generated by local events; received timestamps only advance
177/// the clock.
178pub struct HlcClock<C: PhysicalClock = SystemClock> {
179    last: HlcTimestamp,
180    max_drift_ns: i64,
181    clock: C,
182}
183
184impl Default for HlcClock<SystemClock> {
185    fn default() -> Self {
186        Self::new()
187    }
188}
189
190impl HlcClock<SystemClock> {
191    pub fn new() -> Self {
192        Self {
193            last: HlcTimestamp::ZERO,
194            max_drift_ns: DEFAULT_MAX_DRIFT_NS,
195            clock: SystemClock,
196        }
197    }
198}
199
200impl<C: PhysicalClock> HlcClock<C> {
201    pub fn with_clock(clock: C) -> Self {
202        Self {
203            last: HlcTimestamp::ZERO,
204            max_drift_ns: DEFAULT_MAX_DRIFT_NS,
205            clock,
206        }
207    }
208
209    pub fn set_max_drift_ns(&mut self, max_drift_ns: i64) {
210        self.max_drift_ns = max_drift_ns;
211    }
212
213    /// Call on startup to restore monotonicity after a restart.
214    pub fn set_last(&mut self, ts: HlcTimestamp) {
215        self.last = ts;
216    }
217
218    /// Generate a monotonically increasing timestamp for a local event.
219    pub fn now(&mut self) -> Result<HlcTimestamp, ClockError> {
220        let pt = self.clock.now_ns();
221
222        let ts = if self.last.wall_time >= pt {
223            // Physical clock hasn't advanced past stored wall time.
224            // Increment logical counter.
225            let new_logical = self
226                .last
227                .logical
228                .checked_add(1)
229                .ok_or(ClockError::CounterOverflow)?;
230
231            self.check_drift(self.last.wall_time, pt)?;
232
233            HlcTimestamp::new(self.last.wall_time, new_logical)
234        } else {
235            // Physical clock advanced - use it, reset logical to 0.
236            HlcTimestamp::new(pt, 0)
237        };
238
239        self.last = ts;
240        Ok(ts)
241    }
242
243    /// Merge a remote timestamp into local state (does not generate a new timestamp).
244    pub fn update(&mut self, remote: HlcTimestamp) -> Result<(), ClockError> {
245        let pt = self.clock.now_ns();
246
247        // Reject remote timestamps that are too far ahead
248        if remote.wall_time.saturating_sub(pt) > self.max_drift_ns {
249            return Err(ClockError::ClockDriftExceeded {
250                remote_ns: remote.wall_time,
251                physical_ns: pt,
252                max_drift_ns: self.max_drift_ns,
253            });
254        }
255
256        if remote.wall_time > self.last.wall_time {
257            // Remote is ahead: adopt its wall time and logical
258            self.last = remote;
259        } else if remote.wall_time == self.last.wall_time {
260            // Same wall time: take the max logical
261            if remote.logical > self.last.logical {
262                self.last = HlcTimestamp::new(self.last.wall_time, remote.logical);
263            }
264        }
265        // If remote.wall_time < self.last.wall_time: do nothing
266
267        Ok(())
268    }
269
270    pub fn last_timestamp(&self) -> HlcTimestamp {
271        self.last
272    }
273
274    pub fn physical_clock(&self) -> &C {
275        &self.clock
276    }
277
278    fn check_drift(&self, wall_time_ns: i64, physical_ns: i64) -> Result<(), ClockError> {
279        if wall_time_ns.saturating_sub(physical_ns) > self.max_drift_ns {
280            return Err(ClockError::ClockDriftExceeded {
281                remote_ns: wall_time_ns,
282                physical_ns,
283                max_drift_ns: self.max_drift_ns,
284            });
285        }
286        Ok(())
287    }
288}
289
290#[cfg(test)]
291mod tests {
292    use super::*;
293
294    const SECOND: i64 = 1_000_000_000;
295    const MS: i64 = 1_000_000;
296
297    #[test]
298    fn new_and_accessors() {
299        let ts = HlcTimestamp::new(1_000_000_000, 42);
300        assert_eq!(ts.wall_time(), 1_000_000_000);
301        assert_eq!(ts.logical(), 42);
302    }
303
304    #[test]
305    fn zero_timestamp() {
306        let ts = HlcTimestamp::ZERO;
307        assert_eq!(ts.wall_time(), 0);
308        assert_eq!(ts.logical(), 0);
309        assert!(ts.is_zero());
310    }
311
312    #[test]
313    fn non_zero_is_not_zero() {
314        let ts = HlcTimestamp::new(1, 0);
315        assert!(!ts.is_zero());
316        let ts2 = HlcTimestamp::new(0, 1);
317        assert!(!ts2.is_zero());
318    }
319
320    #[test]
321    fn ordering_wall_time_dominates() {
322        let a = HlcTimestamp::new(100, i32::MAX);
323        let b = HlcTimestamp::new(101, 0);
324        assert!(a < b);
325    }
326
327    #[test]
328    fn ordering_logical_tiebreaks() {
329        let a = HlcTimestamp::new(100, 5);
330        let b = HlcTimestamp::new(100, 6);
331        assert!(a < b);
332    }
333
334    #[test]
335    fn ordering_equality() {
336        let a = HlcTimestamp::new(100, 5);
337        let b = HlcTimestamp::new(100, 5);
338        assert_eq!(a, b);
339        assert!(a <= b);
340        assert!(a >= b);
341    }
342
343    #[test]
344    fn ordering_negative_wall_time() {
345        let a = HlcTimestamp::new(-100, 0);
346        let b = HlcTimestamp::new(0, 0);
347        let c = HlcTimestamp::new(100, 0);
348        assert!(a < b);
349        assert!(b < c);
350    }
351
352    #[test]
353    fn bytes_roundtrip() {
354        let ts = HlcTimestamp::new(123_456_789_000_000, 1000);
355        let bytes = ts.to_bytes();
356        assert_eq!(bytes.len(), 12);
357        let ts2 = HlcTimestamp::from_bytes(&bytes);
358        assert_eq!(ts, ts2);
359    }
360
361    #[test]
362    fn bytes_roundtrip_zero() {
363        let ts = HlcTimestamp::ZERO;
364        let bytes = ts.to_bytes();
365        let ts2 = HlcTimestamp::from_bytes(&bytes);
366        assert_eq!(ts, ts2);
367    }
368
369    #[test]
370    fn bytes_roundtrip_max() {
371        let ts = HlcTimestamp::new(i64::MAX, i32::MAX);
372        let bytes = ts.to_bytes();
373        let ts2 = HlcTimestamp::from_bytes(&bytes);
374        assert_eq!(ts, ts2);
375    }
376
377    #[test]
378    fn bytes_preserve_order_for_positive_values() {
379        let a = HlcTimestamp::new(100, 5);
380        let b = HlcTimestamp::new(100, 6);
381        let c = HlcTimestamp::new(101, 0);
382
383        let ba = a.to_bytes();
384        let bb = b.to_bytes();
385        let bc = c.to_bytes();
386
387        assert!(ba < bb);
388        assert!(bb < bc);
389    }
390
391    #[test]
392    fn bytes_wall_time_is_big_endian() {
393        let ts = HlcTimestamp::new(0x0102_0304_0506_0708, 0);
394        let bytes = ts.to_bytes();
395        assert_eq!(
396            &bytes[0..8],
397            &[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
398        );
399    }
400
401    #[test]
402    fn bytes_logical_is_big_endian() {
403        let ts = HlcTimestamp::new(0, 0x01020304);
404        let bytes = ts.to_bytes();
405        assert_eq!(&bytes[8..12], &[0x01, 0x02, 0x03, 0x04]);
406    }
407
408    #[test]
409    fn display_format() {
410        let ts = HlcTimestamp::new(1_000_000_000, 5);
411        assert_eq!(format!("{ts}"), "1000000000:5");
412    }
413
414    #[test]
415    fn debug_format() {
416        let ts = HlcTimestamp::new(1_000_000_000, 5);
417        assert_eq!(format!("{ts:?}"), "HLC(1000000000ns:5)");
418    }
419
420    #[test]
421    fn manual_clock_basic() {
422        let mc = ManualClock::new(100);
423        assert_eq!(mc.now_ns(), 100);
424        mc.advance(50);
425        assert_eq!(mc.now_ns(), 150);
426        mc.set(200);
427        assert_eq!(mc.now_ns(), 200);
428    }
429
430    #[test]
431    fn system_clock_produces_reasonable_values() {
432        let sc = SystemClock;
433        let now = sc.now_ns();
434        let jan_2020_ns: i64 = 1_577_836_800 * SECOND;
435        assert!(now > jan_2020_ns);
436        assert!(now > 0);
437    }
438
439    #[test]
440    fn now_monotonic() {
441        let mc = ManualClock::new(1000 * SECOND);
442        let mut clock = HlcClock::with_clock(mc);
443
444        let t1 = clock.now().unwrap();
445        let t2 = clock.now().unwrap();
446        let t3 = clock.now().unwrap();
447
448        assert!(t1 < t2);
449        assert!(t2 < t3);
450    }
451
452    #[test]
453    fn now_same_physical_increments_logical() {
454        let mc = ManualClock::new(1000 * SECOND);
455        let mut clock = HlcClock::with_clock(mc);
456
457        let t1 = clock.now().unwrap();
458        let t2 = clock.now().unwrap();
459
460        assert_eq!(t1.wall_time(), 1000 * SECOND);
461        assert_eq!(t1.logical(), 0);
462        assert_eq!(t2.wall_time(), 1000 * SECOND);
463        assert_eq!(t2.logical(), 1);
464    }
465
466    #[test]
467    fn now_physical_advance_resets_logical() {
468        let mc = ManualClock::new(1000 * SECOND);
469        let mut clock = HlcClock::with_clock(mc);
470
471        let _t1 = clock.now().unwrap();
472        let _t2 = clock.now().unwrap();
473        assert_eq!(_t2.logical(), 1);
474
475        clock.physical_clock().advance(1);
476        let t3 = clock.now().unwrap();
477        assert_eq!(t3.wall_time(), 1000 * SECOND + 1);
478        assert_eq!(t3.logical(), 0);
479    }
480
481    #[test]
482    fn now_backward_jump_stays_at_high_watermark() {
483        let mc = ManualClock::new(1000 * SECOND);
484        let mut clock = HlcClock::with_clock(mc);
485
486        let t1 = clock.now().unwrap();
487        assert_eq!(t1.wall_time(), 1000 * SECOND);
488
489        // Jump backward by 2 seconds (within 5s default max drift)
490        clock.physical_clock().set(998 * SECOND);
491        let t2 = clock.now().unwrap();
492
493        // Should stay at high watermark, logical increments
494        assert_eq!(t2.wall_time(), 1000 * SECOND);
495        assert!(t2 > t1);
496    }
497
498    #[test]
499    fn now_counter_overflow() {
500        let mc = ManualClock::new(1000 * SECOND);
501        let mut clock = HlcClock::with_clock(mc);
502
503        clock.set_last(HlcTimestamp::new(1000 * SECOND, i32::MAX - 1));
504
505        let t = clock.now().unwrap();
506        assert_eq!(t.logical(), i32::MAX);
507
508        let err = clock.now().unwrap_err();
509        assert!(matches!(err, ClockError::CounterOverflow));
510    }
511
512    #[test]
513    fn now_counter_overflow_recovery_via_time_advance() {
514        let mc = ManualClock::new(1000 * SECOND);
515        let mut clock = HlcClock::with_clock(mc);
516
517        clock.set_last(HlcTimestamp::new(1000 * SECOND, i32::MAX));
518
519        let err = clock.now().unwrap_err();
520        assert!(matches!(err, ClockError::CounterOverflow));
521
522        clock.physical_clock().advance(1);
523        let t = clock.now().unwrap();
524        assert_eq!(t.wall_time(), 1000 * SECOND + 1);
525        assert_eq!(t.logical(), 0);
526    }
527
528    #[test]
529    fn now_drift_protection() {
530        let mc = ManualClock::new(1000 * SECOND);
531        let mut clock = HlcClock::with_clock(mc);
532        clock.set_max_drift_ns(SECOND); // 1 second max drift
533
534        clock.set_last(HlcTimestamp::new(1010 * SECOND, 0));
535
536        let err = clock.now().unwrap_err();
537        assert!(matches!(err, ClockError::ClockDriftExceeded { .. }));
538    }
539
540    #[test]
541    fn update_remote_behind() {
542        let mc = ManualClock::new(1000 * SECOND);
543        let mut clock = HlcClock::with_clock(mc);
544
545        let _t1 = clock.now().unwrap(); // (1000s, 0)
546
547        let remote = HlcTimestamp::new(500 * SECOND, 99);
548        clock.update(remote).unwrap();
549
550        assert_eq!(clock.last_timestamp().wall_time(), 1000 * SECOND);
551        assert_eq!(clock.last_timestamp().logical(), 0);
552    }
553
554    #[test]
555    fn update_remote_ahead() {
556        let mc = ManualClock::new(1000 * SECOND);
557        let mut clock = HlcClock::with_clock(mc);
558
559        let _t1 = clock.now().unwrap(); // (1000s, 0)
560
561        let remote = HlcTimestamp::new(1002 * SECOND, 5);
562        clock.update(remote).unwrap();
563
564        assert_eq!(clock.last_timestamp().wall_time(), 1002 * SECOND);
565        assert_eq!(clock.last_timestamp().logical(), 5);
566
567        let t2 = clock.now().unwrap();
568        assert!(t2 > remote);
569        assert_eq!(t2.wall_time(), 1002 * SECOND);
570        assert_eq!(t2.logical(), 6);
571    }
572
573    #[test]
574    fn update_remote_same_wall_time_higher_logical() {
575        let mc = ManualClock::new(1000 * SECOND);
576        let mut clock = HlcClock::with_clock(mc);
577
578        let _t1 = clock.now().unwrap(); // (1000s, 0)
579
580        let remote = HlcTimestamp::new(1000 * SECOND, 10);
581        clock.update(remote).unwrap();
582
583        assert_eq!(clock.last_timestamp().logical(), 10);
584
585        let t2 = clock.now().unwrap();
586        assert_eq!(t2.logical(), 11);
587    }
588
589    #[test]
590    fn update_remote_same_wall_time_lower_logical() {
591        let mc = ManualClock::new(1000 * SECOND);
592        let mut clock = HlcClock::with_clock(mc);
593
594        for _ in 0..5 {
595            clock.now().unwrap();
596        }
597
598        let remote = HlcTimestamp::new(1000 * SECOND, 2);
599        clock.update(remote).unwrap();
600
601        assert_eq!(clock.last_timestamp().logical(), 4);
602    }
603
604    #[test]
605    fn update_drift_exceeded() {
606        let mc = ManualClock::new(1000 * SECOND);
607        let mut clock = HlcClock::with_clock(mc);
608        clock.set_max_drift_ns(SECOND); // 1 second
609
610        let remote = HlcTimestamp::new(1010 * SECOND, 0);
611        let err = clock.update(remote).unwrap_err();
612        assert!(matches!(err, ClockError::ClockDriftExceeded { .. }));
613
614        assert_eq!(clock.last_timestamp(), HlcTimestamp::ZERO);
615    }
616
617    #[test]
618    fn update_drift_boundary_exact() {
619        let mc = ManualClock::new(1000 * SECOND);
620        let mut clock = HlcClock::with_clock(mc);
621        clock.set_max_drift_ns(SECOND); // 1 second
622
623        let remote = HlcTimestamp::new(1001 * SECOND, 0);
624        clock.update(remote).unwrap();
625        assert_eq!(clock.last_timestamp().wall_time(), 1001 * SECOND);
626
627        let mc2 = ManualClock::new(1000 * SECOND);
628        let mut clock2 = HlcClock::with_clock(mc2);
629        clock2.set_max_drift_ns(SECOND);
630
631        let remote2 = HlcTimestamp::new(1001 * SECOND + 1, 0);
632        let err = clock2.update(remote2).unwrap_err();
633        assert!(matches!(err, ClockError::ClockDriftExceeded { .. }));
634    }
635
636    #[test]
637    fn update_zero_timestamp_is_noop() {
638        let mc = ManualClock::new(1000 * SECOND);
639        let mut clock = HlcClock::with_clock(mc);
640        let _t1 = clock.now().unwrap();
641
642        clock.update(HlcTimestamp::ZERO).unwrap();
643
644        assert_eq!(clock.last_timestamp().wall_time(), 1000 * SECOND);
645    }
646
647    #[test]
648    fn set_last_restores_monotonicity() {
649        let mc = ManualClock::new(1000 * SECOND);
650        let mut clock = HlcClock::with_clock(mc);
651
652        let persisted = HlcTimestamp::new(1000 * SECOND, 50);
653        clock.set_last(persisted);
654
655        let t1 = clock.now().unwrap();
656        assert!(t1 > persisted);
657        assert_eq!(t1.logical(), 51);
658    }
659
660    #[test]
661    fn two_clocks_converge() {
662        let mc_a = ManualClock::new(1000 * SECOND);
663        let mc_b = ManualClock::new(1000 * SECOND + 50 * MS);
664
665        let mut clock_a = HlcClock::with_clock(mc_a);
666        let mut clock_b = HlcClock::with_clock(mc_b);
667
668        // A generates event
669        let ta1 = clock_a.now().unwrap();
670        assert_eq!(ta1.wall_time(), 1000 * SECOND);
671
672        // B generates event
673        let tb1 = clock_b.now().unwrap();
674        assert_eq!(tb1.wall_time(), 1000 * SECOND + 50 * MS);
675
676        // A receives B's timestamp via update, then generates its own
677        clock_a.update(tb1).unwrap();
678        let ta2 = clock_a.now().unwrap();
679        // A should advance to B's wall time with incremented logical
680        assert_eq!(ta2.wall_time(), 1000 * SECOND + 50 * MS);
681        assert_eq!(ta2.logical(), 1);
682
683        // B receives A's earlier timestamp - already behind, no effect
684        clock_b.update(ta1).unwrap();
685        let tb2 = clock_b.now().unwrap();
686        assert_eq!(tb2.wall_time(), 1000 * SECOND + 50 * MS);
687        assert!(tb2 > tb1);
688    }
689
690    #[test]
691    fn causal_ordering_preserved() {
692        let mc_a = ManualClock::new(1000 * SECOND);
693        let mc_b = ManualClock::new(1000 * SECOND);
694
695        let mut clock_a = HlcClock::with_clock(mc_a);
696        let mut clock_b = HlcClock::with_clock(mc_b);
697
698        // A: write X
699        let ta1 = clock_a.now().unwrap();
700
701        // B receives A's message (sees write X), then writes Y
702        clock_b.update(ta1).unwrap();
703        let tb1 = clock_b.now().unwrap();
704
705        // Causal chain: ta1 (write X) < tb1 (write Y after seeing X)
706        assert!(ta1 < tb1);
707    }
708
709    #[test]
710    fn physical_time_advance_during_sync() {
711        let mc_a = ManualClock::new(1000 * SECOND);
712        let mc_b = ManualClock::new(1000 * SECOND);
713
714        let mut clock_a = HlcClock::with_clock(mc_a);
715        let mut clock_b = HlcClock::with_clock(mc_b);
716
717        // Both generate local events at same physical time
718        let ta = clock_a.now().unwrap(); // (1000s, 0)
719        let tb = clock_b.now().unwrap(); // (1000s, 0) - same!
720
721        // Advance physical time on both
722        clock_a.physical_clock().advance(100 * MS);
723        clock_b.physical_clock().advance(100 * MS);
724
725        // A receives B's timestamp then generates event
726        clock_a.update(tb).unwrap();
727        let ta2 = clock_a.now().unwrap();
728
729        // Physical time advanced past stored wall_time, so logical resets
730        assert_eq!(ta2.wall_time(), 1000 * SECOND + 100 * MS);
731        assert_eq!(ta2.logical(), 0);
732        assert!(ta2 > ta);
733        assert!(ta2 > tb);
734    }
735
736    #[test]
737    fn three_node_ring_sync() {
738        let mc_a = ManualClock::new(1000 * SECOND);
739        let mc_b = ManualClock::new(1000 * SECOND + 10 * MS);
740        let mc_c = ManualClock::new(1000 * SECOND + 20 * MS);
741
742        let mut a = HlcClock::with_clock(mc_a);
743        let mut b = HlcClock::with_clock(mc_b);
744        let mut c = HlcClock::with_clock(mc_c);
745
746        // Each generates a local event
747        let ta = a.now().unwrap();
748        let tb = b.now().unwrap();
749        let tc = c.now().unwrap();
750
751        // A -> B: B receives A's timestamp and generates event
752        b.update(ta).unwrap();
753        let tb2 = b.now().unwrap();
754        assert!(tb2 > tb);
755        assert!(tb2 > ta);
756
757        // B -> C: C receives B's latest and generates event
758        c.update(tb2).unwrap();
759        let tc2 = c.now().unwrap();
760        assert!(tc2 > tc);
761        assert!(tc2 > tb2);
762
763        // C -> A: A receives C's latest and generates event
764        a.update(tc2).unwrap();
765        let ta2 = a.now().unwrap();
766        assert!(ta2 > ta);
767        assert!(ta2 > tc2);
768    }
769
770    #[test]
771    fn many_events_same_nanosecond() {
772        let mc = ManualClock::new(1000 * SECOND);
773        let mut clock = HlcClock::with_clock(mc);
774
775        for i in 0i32..1000 {
776            let t = clock.now().unwrap();
777            assert_eq!(t.logical(), i);
778        }
779    }
780
781    #[test]
782    fn hash_consistency() {
783        use std::collections::HashSet;
784        let a = HlcTimestamp::new(100, 5);
785        let b = HlcTimestamp::new(100, 5);
786        let c = HlcTimestamp::new(100, 6);
787
788        let mut set = HashSet::new();
789        set.insert(a);
790        assert!(set.contains(&b));
791        assert!(!set.contains(&c));
792    }
793
794    #[test]
795    fn system_clock_hlc_integration() {
796        let mut clock = HlcClock::new();
797        let t1 = clock.now().unwrap();
798        let t2 = clock.now().unwrap();
799        assert!(t2 > t1);
800        assert!(!t1.is_zero());
801    }
802
803    #[test]
804    fn nanosecond_precision_preserved() {
805        let ts = HlcTimestamp::new(1_741_000_000_123_456_789, 0);
806        assert_eq!(ts.wall_time(), 1_741_000_000_123_456_789);
807
808        let bytes = ts.to_bytes();
809        let ts2 = HlcTimestamp::from_bytes(&bytes);
810        assert_eq!(ts2.wall_time(), 1_741_000_000_123_456_789);
811    }
812
813    #[test]
814    fn sub_millisecond_ordering() {
815        // Two events 1 microsecond apart
816        let a = HlcTimestamp::new(1000 * SECOND, 0);
817        let b = HlcTimestamp::new(1000 * SECOND + 1000, 0); // +1μs
818        assert!(a < b);
819
820        // Two events 1 nanosecond apart
821        let c = HlcTimestamp::new(1000 * SECOND, 0);
822        let d = HlcTimestamp::new(1000 * SECOND + 1, 0); // +1ns
823        assert!(c < d);
824    }
825
826    #[test]
827    fn i32_max_logical_counter() {
828        // i32::MAX = 2,147,483,647 - much more than u16's 65,535
829        let ts = HlcTimestamp::new(1000 * SECOND, i32::MAX);
830        assert_eq!(ts.logical(), i32::MAX);
831
832        let bytes = ts.to_bytes();
833        let ts2 = HlcTimestamp::from_bytes(&bytes);
834        assert_eq!(ts2.logical(), i32::MAX);
835    }
836
837    #[test]
838    fn wire_size_is_12() {
839        assert_eq!(HLC_TIMESTAMP_SIZE, 12);
840        assert_eq!(std::mem::size_of::<i64>() + std::mem::size_of::<i32>(), 12);
841    }
842}