// citadel_sync/hlc.rs

1use std::sync::atomic::{AtomicI64, Ordering};
2use std::time::{SystemTime, UNIX_EPOCH};
3
/// A Hybrid Logical Clock (HLC) timestamp: a physical wall time plus a logical counter.
///
/// Serialized layout (12 bytes):
/// - `wall_time`: `i64` - nanoseconds since the Unix epoch (true nanosecond precision)
/// - `logical`: `i32` - tie-breaking counter for events that share a wall time
///
/// Timestamps are totally ordered by `wall_time`, then by `logical`. The big-endian
/// byte encoding sorts in that same order only while both fields are non-negative.
///
/// Range:
/// - `wall_time`: roughly 292 years after the epoch (until the year 2262)
/// - `logical`: up to 2,147,483,647 (`i32::MAX`) events per wall-time value
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct HlcTimestamp {
    /// Nanoseconds since the Unix epoch.
    wall_time: i64,
    /// Counter distinguishing events that carry the same `wall_time`.
    logical: i32,
}

/// Size in bytes of a serialized [`HlcTimestamp`]: an 8-byte wall time followed by a
/// 4-byte logical counter (no padding, unlike the in-memory struct).
pub const HLC_TIMESTAMP_SIZE: usize = std::mem::size_of::<i64>() + std::mem::size_of::<i32>();
23
24impl HlcTimestamp {
25    pub const ZERO: Self = Self {
26        wall_time: 0,
27        logical: 0,
28    };
29
30    #[inline]
31    pub fn new(wall_time_ns: i64, logical: i32) -> Self {
32        Self {
33            wall_time: wall_time_ns,
34            logical,
35        }
36    }
37
38    #[inline]
39    pub fn wall_time(&self) -> i64 {
40        self.wall_time
41    }
42
43    #[inline]
44    pub fn logical(&self) -> i32 {
45        self.logical
46    }
47
48    /// Big-endian serialization preserves comparison order for non-negative values.
49    #[inline]
50    pub fn to_bytes(&self) -> [u8; HLC_TIMESTAMP_SIZE] {
51        let mut buf = [0u8; HLC_TIMESTAMP_SIZE];
52        buf[0..8].copy_from_slice(&self.wall_time.to_be_bytes());
53        buf[8..12].copy_from_slice(&self.logical.to_be_bytes());
54        buf
55    }
56
57    #[inline]
58    pub fn from_bytes(b: &[u8; HLC_TIMESTAMP_SIZE]) -> Self {
59        Self {
60            wall_time: i64::from_be_bytes(b[0..8].try_into().unwrap()),
61            logical: i32::from_be_bytes(b[8..12].try_into().unwrap()),
62        }
63    }
64
65    #[inline]
66    pub fn is_zero(&self) -> bool {
67        self.wall_time == 0 && self.logical == 0
68    }
69}
70
71impl PartialOrd for HlcTimestamp {
72    #[inline]
73    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
74        Some(self.cmp(other))
75    }
76}
77
78impl Ord for HlcTimestamp {
79    #[inline]
80    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
81        self.wall_time
82            .cmp(&other.wall_time)
83            .then(self.logical.cmp(&other.logical))
84    }
85}
86
87impl std::fmt::Debug for HlcTimestamp {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        write!(f, "HLC({}ns:{})", self.wall_time, self.logical)
90    }
91}
92
93impl std::fmt::Display for HlcTimestamp {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        write!(f, "{}:{}", self.wall_time, self.logical)
96    }
97}
98
/// Errors produced by [`HlcClock`].
#[derive(Debug, thiserror::Error)]
pub enum ClockError {
    /// A wall time leads the local physical clock by more than the configured maximum
    /// drift.
    ///
    /// `update()` returns this for a remote timestamp that is too far ahead; `now()`
    /// returns it when the clock's own last wall time has drifted too far ahead of
    /// physical time, in which case `remote_ns` carries that local wall time rather
    /// than a remote one.
    #[error(
        "clock drift exceeded: remote_wall_time={remote_ns}ns, \
         physical_now={physical_ns}ns, max_drift={max_drift_ns}ns"
    )]
    ClockDriftExceeded {
        /// The wall time that was checked, in nanoseconds.
        remote_ns: i64,
        /// Physical clock reading taken for the check, in nanoseconds.
        physical_ns: i64,
        /// Drift limit in effect, in nanoseconds.
        max_drift_ns: i64,
    },

    /// The logical counter is already `i32::MAX` for the current wall time, so `now()`
    /// cannot produce a larger timestamp.
    #[error("HLC counter overflow (>2^31-1 events in same nanosecond)")]
    CounterOverflow,
}
114
/// Source of physical time for the HLC.
///
/// Abstracted as a trait to allow deterministic testing with [`ManualClock`].
/// The `Send` supertrait means any `HlcClock` built on an implementation can be moved
/// to another thread.
pub trait PhysicalClock: Send {
    /// Current time in nanoseconds since Unix epoch.
    ///
    /// Readings need not increase between calls: `HlcClock::now` keeps its last wall
    /// time and bumps the logical counter whenever the reading has not moved past it.
    fn now_ns(&self) -> i64;
}
122
123/// Physical clock backed by the system clock.
124///
125/// On Linux: provides true nanosecond precision via `clock_gettime(CLOCK_REALTIME)`.
126/// On macOS: microsecond precision for wall clock.
127/// On Windows: ~100ns precision (system timer resolution).
128pub struct SystemClock;
129
130impl PhysicalClock for SystemClock {
131    fn now_ns(&self) -> i64 {
132        SystemTime::now()
133            .duration_since(UNIX_EPOCH)
134            .expect("system clock before UNIX epoch")
135            .as_nanos() as i64
136    }
137}
138
139pub struct ManualClock {
140    time_ns: AtomicI64,
141}
142
143impl ManualClock {
144    pub fn new(initial_ns: i64) -> Self {
145        Self {
146            time_ns: AtomicI64::new(initial_ns),
147        }
148    }
149
150    pub fn set(&self, time_ns: i64) {
151        self.time_ns.store(time_ns, Ordering::SeqCst);
152    }
153
154    pub fn advance(&self, delta_ns: i64) {
155        self.time_ns.fetch_add(delta_ns, Ordering::SeqCst);
156    }
157}
158
159impl PhysicalClock for ManualClock {
160    fn now_ns(&self) -> i64 {
161        self.time_ns.load(Ordering::SeqCst)
162    }
163}
164
/// Nanoseconds in one second.
const SECOND_NS: i64 = 1_000_000_000;
/// Drift limit (5 seconds) that new `HlcClock`s start with; adjustable via
/// `HlcClock::set_max_drift_ns`.
const DEFAULT_MAX_DRIFT_NS: i64 = 5 * SECOND_NS;
167
168/// Hybrid Logical Clock state machine.
169///
170/// Two core operations:
171/// - [`now()`](HlcClock::now) - generate a timestamp for a local event
172/// - [`update()`](HlcClock::update) - merge a remote timestamp into local state
173///
174/// `now()` always returns a strictly increasing timestamp.
175/// `update()` advances the internal clock state without generating a new timestamp.
176/// Timestamps are only generated by local events; received timestamps only advance
177/// the clock.
178pub struct HlcClock<C: PhysicalClock = SystemClock> {
179    last: HlcTimestamp,
180    max_drift_ns: i64,
181    clock: C,
182}
183
184impl Default for HlcClock<SystemClock> {
185    fn default() -> Self {
186        Self::new()
187    }
188}
189
190impl HlcClock<SystemClock> {
191    pub fn new() -> Self {
192        Self {
193            last: HlcTimestamp::ZERO,
194            max_drift_ns: DEFAULT_MAX_DRIFT_NS,
195            clock: SystemClock,
196        }
197    }
198}
199
200impl<C: PhysicalClock> HlcClock<C> {
201    pub fn with_clock(clock: C) -> Self {
202        Self {
203            last: HlcTimestamp::ZERO,
204            max_drift_ns: DEFAULT_MAX_DRIFT_NS,
205            clock,
206        }
207    }
208
209    pub fn set_max_drift_ns(&mut self, max_drift_ns: i64) {
210        self.max_drift_ns = max_drift_ns;
211    }
212
213    /// Call on startup to restore monotonicity after a restart.
214    pub fn set_last(&mut self, ts: HlcTimestamp) {
215        self.last = ts;
216    }
217
218    /// Generate a monotonically increasing timestamp for a local event.
219    pub fn now(&mut self) -> Result<HlcTimestamp, ClockError> {
220        let pt = self.clock.now_ns();
221
222        let ts = if self.last.wall_time >= pt {
223            // Physical clock hasn't advanced past stored wall time.
224            // Increment logical counter.
225            let new_logical = self
226                .last
227                .logical
228                .checked_add(1)
229                .ok_or(ClockError::CounterOverflow)?;
230
231            self.check_drift(self.last.wall_time, pt)?;
232
233            HlcTimestamp::new(self.last.wall_time, new_logical)
234        } else {
235            // Physical clock advanced - use it, reset logical to 0.
236            HlcTimestamp::new(pt, 0)
237        };
238
239        self.last = ts;
240        Ok(ts)
241    }
242
243    /// Merge a remote timestamp into local state (does not generate a new timestamp).
244    pub fn update(&mut self, remote: HlcTimestamp) -> Result<(), ClockError> {
245        let pt = self.clock.now_ns();
246
247        // Reject remote timestamps that are too far ahead
248        if remote.wall_time.saturating_sub(pt) > self.max_drift_ns {
249            return Err(ClockError::ClockDriftExceeded {
250                remote_ns: remote.wall_time,
251                physical_ns: pt,
252                max_drift_ns: self.max_drift_ns,
253            });
254        }
255
256        if remote.wall_time > self.last.wall_time {
257            // Remote is ahead: adopt its wall time and logical
258            self.last = remote;
259        } else if remote.wall_time == self.last.wall_time {
260            // Same wall time: take the max logical
261            if remote.logical > self.last.logical {
262                self.last = HlcTimestamp::new(self.last.wall_time, remote.logical);
263            }
264        }
265        // If remote.wall_time < self.last.wall_time: do nothing
266
267        Ok(())
268    }
269
270    pub fn last_timestamp(&self) -> HlcTimestamp {
271        self.last
272    }
273
274    pub fn physical_clock(&self) -> &C {
275        &self.clock
276    }
277
278    fn check_drift(&self, wall_time_ns: i64, physical_ns: i64) -> Result<(), ClockError> {
279        if wall_time_ns.saturating_sub(physical_ns) > self.max_drift_ns {
280            return Err(ClockError::ClockDriftExceeded {
281                remote_ns: wall_time_ns,
282                physical_ns,
283                max_drift_ns: self.max_drift_ns,
284            });
285        }
286        Ok(())
287    }
288}
289
// Unit tests are compiled only under `cfg(test)` and are loaded from `hlc_tests.rs`
// via the `#[path]` attribute.
#[cfg(test)]
#[path = "hlc_tests.rs"]
mod tests;