dx_forge/sync/
clock.rs

1use once_cell::sync::Lazy;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::{SystemTime, UNIX_EPOCH};
4
/// Number of low-order bits of a hybrid timestamp reserved for the logical
/// counter; the remaining high-order bits hold the physical milliseconds.
const LOGICAL_BITS: u32 = 16;

/// Bit mask selecting the logical-counter portion (the low `LOGICAL_BITS`
/// bits) of a hybrid timestamp.
const LOGICAL_MASK: u64 = u64::MAX >> (u64::BITS - LOGICAL_BITS);
7
/// Hybrid logical clock (HLC) packed into a single `u64`.
///
/// The high-order bits hold physical wall-clock milliseconds and the low
/// `LOGICAL_BITS` bits hold a logical counter, so timestamps compare
/// correctly as plain integers. The state lives in an [`AtomicU64`], which
/// lets the clock be advanced through a shared reference.
#[derive(Debug)]
pub struct HybridLogicalClock {
    /// Most recently issued (or observed) hybrid timestamp.
    value: AtomicU64,
}
14
15impl HybridLogicalClock {
16    pub fn new() -> Self {
17        let now = current_millis();
18        Self {
19            value: AtomicU64::new(encode(now, 0)),
20        }
21    }
22
23    /// Tick the clock for a local event and return the new hybrid timestamp.
24    pub fn tick(&self) -> u64 {
25        let physical = current_millis();
26        loop {
27            let current = self.value.load(Ordering::Relaxed);
28            let (cur_phys, cur_logical) = decode(current);
29
30            let next = if physical > cur_phys {
31                encode(physical, 0)
32            } else if physical == cur_phys {
33                encode(cur_phys, cur_logical.wrapping_add(1))
34            } else {
35                encode(cur_phys, cur_logical.wrapping_add(1))
36            };
37
38            if self
39                .value
40                .compare_exchange(current, next, Ordering::SeqCst, Ordering::Relaxed)
41                .is_ok()
42            {
43                return next;
44            }
45        }
46    }
47
48    /// Observe a remote timestamp so subsequent local ticks stay ahead.
49    pub fn observe(&self, external: u64) {
50        loop {
51            let current = self.value.load(Ordering::Relaxed);
52            if current >= external {
53                return;
54            }
55            if self
56                .value
57                .compare_exchange(current, external, Ordering::SeqCst, Ordering::Relaxed)
58                .is_ok()
59            {
60                return;
61            }
62        }
63    }
64}
65
/// Returns the wall-clock time as whole milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the result is `0`.
fn current_millis() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.map_or(0, |elapsed| elapsed.as_millis() as u64)
}
72
73fn encode(physical: u64, logical: u64) -> u64 {
74    (physical << LOGICAL_BITS) | (logical & LOGICAL_MASK)
75}
76
77fn decode(value: u64) -> (u64, u64) {
78    (value >> LOGICAL_BITS, value & LOGICAL_MASK)
79}
80
/// Shared [`HybridLogicalClock`] instance, built with
/// `HybridLogicalClock::new` by `Lazy` on first access.
pub static GLOBAL_CLOCK: Lazy<HybridLogicalClock> = Lazy::new(HybridLogicalClock::new);