Skip to main content

nodedb_array/sync/
hlc.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Hybrid Logical Clock (HLC) for array CRDT sync.
4//!
5//! Each array op carries an [`Hlc`] that encodes a wall-clock millisecond
6//! timestamp (`physical_ms`), a per-millisecond monotonic counter (`logical`),
7//! and the originating [`ReplicaId`]. The total order
8//! `(physical_ms, logical, replica_id)` provides deterministic tiebreaks under
9//! any clock skew without requiring synchronised clocks.
10//!
11//! `physical_ms` is stored as `u64` but only values up to
12//! [`MAX_PHYSICAL_MS`] (2^48 − 1 ≈ year 10 889) are valid. This matches the
13//! spec's u48 intent while keeping byte alignment simple.
14
15use std::sync::Mutex;
16use std::time::{SystemTime, UNIX_EPOCH};
17
18use serde::{Deserialize, Serialize};
19
20use crate::error::{ArrayError, ArrayResult};
21use crate::sync::replica_id::ReplicaId;
22
/// Largest `physical_ms` value an [`Hlc`] may carry: 2^48 − 1.
///
/// The field is stored in a `u64`, but only the low 48 bits are treated as
/// valid; [`Hlc::new`] and [`HlcGenerator::next`] reject anything larger.
/// Counted in milliseconds from the Unix epoch this lands around the year
/// 10 889 CE, so the cap is not a practical limit.
pub const MAX_PHYSICAL_MS: u64 = u64::MAX >> 16;
29
/// A Hybrid Logical Clock timestamp.
///
/// Total order: `(physical_ms, logical, replica_id.0)` lexicographic.
/// This order is also the byte order produced by [`Hlc::to_bytes`].
///
/// All fields are public, so a value built with a struct literal or decoded
/// with [`Hlc::from_bytes`] bypasses the `physical_ms` range check that
/// [`Hlc::new`] applies.
// NOTE(review): the derived serde / MessagePack decoders presumably do not
// enforce the `physical_ms` range either — confirm before trusting decoded
// input.
// Field declaration order is kept as-is: derived (de)serializers commonly
// depend on it — TODO confirm for `zerompk`.
#[derive(
    Copy,
    Clone,
    Debug,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub struct Hlc {
    /// Wall-clock milliseconds since Unix epoch at the originating replica.
    /// Valid range: `0..=MAX_PHYSICAL_MS`. Most significant component of the
    /// total order.
    pub physical_ms: u64,
    /// Monotonic counter within a single millisecond on one replica.
    /// Second component of the total order.
    pub logical: u16,
    /// Stable identity of the replica that generated this timestamp.
    /// Final tiebreak of the total order (compared via its inner `.0` value).
    pub replica_id: ReplicaId,
}
54
55impl Hlc {
56    /// The bottom of the total order; useful as a "no ops seen yet" sentinel.
57    pub const ZERO: Self = Self {
58        physical_ms: 0,
59        logical: 0,
60        replica_id: ReplicaId(0),
61    };
62
63    /// Construct a new [`Hlc`], returning an error if `physical_ms` exceeds
64    /// [`MAX_PHYSICAL_MS`].
65    pub fn new(physical_ms: u64, logical: u16, replica_id: ReplicaId) -> ArrayResult<Self> {
66        if physical_ms > MAX_PHYSICAL_MS {
67            return Err(ArrayError::InvalidHlc {
68                detail: format!(
69                    "physical_ms {physical_ms} exceeds MAX_PHYSICAL_MS {MAX_PHYSICAL_MS}"
70                ),
71            });
72        }
73        Ok(Self {
74            physical_ms,
75            logical,
76            replica_id,
77        })
78    }
79
80    /// Encode as 18 bytes in big-endian order for stable byte-comparable sort.
81    ///
82    /// Layout: `physical_ms` (8 bytes) | `logical` (2 bytes) | `replica_id` (8 bytes).
83    pub fn to_bytes(&self) -> [u8; 18] {
84        let mut out = [0u8; 18];
85        out[0..8].copy_from_slice(&self.physical_ms.to_be_bytes());
86        out[8..10].copy_from_slice(&self.logical.to_be_bytes());
87        out[10..18].copy_from_slice(&self.replica_id.0.to_be_bytes());
88        out
89    }
90
91    /// Decode from a 18-byte big-endian slice produced by [`Hlc::to_bytes`].
92    ///
93    /// No range validation is performed; the caller should use [`Hlc::new`]
94    /// if validation is required.
95    pub fn from_bytes(b: &[u8; 18]) -> Self {
96        let physical_ms = u64::from_be_bytes(
97            b[0..8]
98                .try_into()
99                .expect("invariant: b is [u8; 18], slice [0..8] is exactly 8 bytes"),
100        );
101        let logical = u16::from_be_bytes(
102            b[8..10]
103                .try_into()
104                .expect("invariant: b is [u8; 18], slice [8..10] is exactly 2 bytes"),
105        );
106        let replica_id =
107            ReplicaId(u64::from_be_bytes(b[10..18].try_into().expect(
108                "invariant: b is [u8; 18], slice [10..18] is exactly 8 bytes",
109            )));
110        Self {
111            physical_ms,
112            logical,
113            replica_id,
114        }
115    }
116}
117
118impl PartialOrd for Hlc {
119    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
120        Some(self.cmp(other))
121    }
122}
123
124impl Ord for Hlc {
125    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
126        (self.physical_ms, self.logical, self.replica_id.0).cmp(&(
127            other.physical_ms,
128            other.logical,
129            other.replica_id.0,
130        ))
131    }
132}
133
134/// Monotonic HLC generator for a single replica.
135///
136/// Thread-safe via an internal `Mutex`. Lock contention at HLC-generation
137/// rates (typically hundreds per second per replica) is negligible.
138pub struct HlcGenerator {
139    replica_id: ReplicaId,
140    /// Guarded state: `(last_physical_ms, last_logical)`.
141    state: Mutex<(u64, u16)>,
142}
143
144impl HlcGenerator {
145    /// Create a new generator for the given replica.
146    pub fn new(replica_id: ReplicaId) -> Self {
147        Self {
148            replica_id,
149            state: Mutex::new((0, 0)),
150        }
151    }
152
153    /// Return the current wall-clock milliseconds since Unix epoch.
154    fn now_ms() -> ArrayResult<u64> {
155        SystemTime::now()
156            .duration_since(UNIX_EPOCH)
157            .map(|d| d.as_millis() as u64)
158            .map_err(|e| ArrayError::InvalidHlc {
159                detail: format!("system clock before Unix epoch: {e}"),
160            })
161    }
162
163    /// Generate the next [`Hlc`], guaranteeing strict monotonicity.
164    ///
165    /// Implements the standard HLC advancement algorithm:
166    /// - `new_physical = max(now_ms, last_physical)`
167    /// - if `new_physical == last_physical`: `new_logical = last_logical + 1`
168    /// - else: `new_logical = 0`
169    pub fn next(&self) -> ArrayResult<Hlc> {
170        let now_ms = Self::now_ms()?;
171        if now_ms > MAX_PHYSICAL_MS {
172            return Err(ArrayError::InvalidHlc {
173                detail: format!("system clock {now_ms} exceeds MAX_PHYSICAL_MS"),
174            });
175        }
176
177        let mut guard = self.state.lock().map_err(|_| ArrayError::HlcLockPoisoned)?;
178        let (last_physical, last_logical) = *guard;
179
180        let new_physical = now_ms.max(last_physical);
181        let new_logical = if new_physical == last_physical {
182            last_logical
183                .checked_add(1)
184                .ok_or_else(|| ArrayError::InvalidHlc {
185                    detail: "logical counter overflow within one millisecond".into(),
186                })?
187        } else {
188            0
189        };
190
191        *guard = (new_physical, new_logical);
192        drop(guard);
193
194        Hlc::new(new_physical, new_logical, self.replica_id)
195    }
196
197    /// Advance local state after observing a remote [`Hlc`].
198    ///
199    /// Implements the standard HLC merge rule so that subsequent calls to
200    /// [`next`](HlcGenerator::next) return timestamps strictly greater than
201    /// any observed remote timestamp.
202    pub fn observe(&self, remote: Hlc) -> ArrayResult<()> {
203        let now_ms = Self::now_ms()?.min(MAX_PHYSICAL_MS);
204
205        let mut guard = self.state.lock().map_err(|_| ArrayError::HlcLockPoisoned)?;
206        let (last_physical, last_logical) = *guard;
207
208        let new_physical = now_ms.max(last_physical).max(remote.physical_ms);
209        let new_logical = if new_physical == last_physical && new_physical == remote.physical_ms {
210            // All three agree on physical; advance logical past both.
211            last_logical
212                .max(remote.logical)
213                .checked_add(1)
214                .ok_or_else(|| ArrayError::InvalidHlc {
215                    detail: "logical counter overflow during observe".into(),
216                })?
217        } else if new_physical == last_physical {
218            last_logical
219                .checked_add(1)
220                .ok_or_else(|| ArrayError::InvalidHlc {
221                    detail: "logical counter overflow during observe (local wins)".into(),
222                })?
223        } else if new_physical == remote.physical_ms {
224            remote
225                .logical
226                .checked_add(1)
227                .ok_or_else(|| ArrayError::InvalidHlc {
228                    detail: "logical counter overflow during observe (remote wins)".into(),
229                })?
230        } else {
231            // now_ms is strictly larger than both; reset logical.
232            0
233        };
234
235        *guard = (new_physical, new_logical);
236        Ok(())
237    }
238}
239
#[cfg(test)]
mod tests {
    use super::*;

    /// Replica identity shared by the generator-focused tests.
    fn test_replica() -> ReplicaId {
        ReplicaId::new(42)
    }

    /// 1 000 back-to-back `next` calls must form a strictly increasing run.
    #[test]
    fn monotonic_under_fast_calls() {
        let g = HlcGenerator::new(test_replica());
        let stamps: Vec<Hlc> = (0..1000).map(|_| g.next().unwrap()).collect();

        for pair in stamps.windows(2) {
            let (prev, curr) = (pair[0], pair[1]);
            assert!(
                curr > prev,
                "HLC must be strictly increasing: {curr:?} <= {prev:?}"
            );
        }
    }

    /// After observing a timestamp from the future, local timestamps must
    /// still sort after it.
    #[test]
    fn survives_clock_skew_injection() {
        let g = HlcGenerator::new(test_replica());

        // Remote timestamp 100 s ahead of the local baseline; still well
        // within MAX_PHYSICAL_MS.
        let local_now_ms = g.next().unwrap().physical_ms;
        let future_hlc = Hlc::new(local_now_ms + 100_000, 50, ReplicaId::new(99)).unwrap();

        g.observe(future_hlc).unwrap();

        let next = g.next().unwrap();
        assert!(
            next > future_hlc,
            "next {next:?} should be > observed future {future_hlc:?}"
        );
    }

    /// `from_bytes` inverts `to_bytes`.
    #[test]
    fn to_bytes_roundtrip() {
        let hlc = Hlc::new(123_456_789, 7, ReplicaId::new(0xabcd)).unwrap();
        let back = Hlc::from_bytes(&hlc.to_bytes());
        assert_eq!(hlc, back);
    }

    /// Sorting by encoded bytes gives the same sequence as sorting by `Ord`.
    #[test]
    fn byte_order_matches_lexicographic() {
        use std::collections::BTreeMap;

        // 100 timestamps covering 10 physical values x 10 logical values on
        // a single replica.
        let replica = ReplicaId::new(1);
        let generated: Vec<Hlc> = (0u64..100)
            .map(|i| Hlc {
                physical_ms: i / 10,
                logical: (i % 10) as u16,
                replica_id: replica,
            })
            .collect();

        let by_ord = {
            let mut sorted = generated.clone();
            sorted.sort();
            sorted
        };
        let hlcs = {
            let mut sorted = generated;
            sorted.sort_by_key(|h| h.to_bytes());
            sorted
        };
        assert_eq!(by_ord, hlcs, "byte sort must match Ord sort");

        // A BTreeMap keyed by the encoded bytes must iterate in the same order.
        let map: BTreeMap<[u8; 18], Hlc> = by_ord.iter().map(|h| (h.to_bytes(), *h)).collect();
        let map_order: Vec<Hlc> = map.into_values().collect();
        assert_eq!(by_ord, map_order);
    }

    /// MessagePack encoding round-trips through `zerompk`.
    #[test]
    fn serialize_roundtrip() {
        let hlc = Hlc::new(9_999, 3, ReplicaId::new(77)).unwrap();
        let encoded = zerompk::to_msgpack_vec(&hlc).expect("serialize");
        let back: Hlc = zerompk::from_msgpack(&encoded).expect("deserialize");
        assert_eq!(hlc, back);
    }

    /// One past the cap is rejected by `Hlc::new`.
    #[test]
    fn physical_ms_overflow_errors() {
        let result = Hlc::new(MAX_PHYSICAL_MS + 1, 0, ReplicaId::new(1));
        let rejected = matches!(result, Err(ArrayError::InvalidHlc { .. }));
        assert!(rejected, "expected InvalidHlc, got: {result:?}");
    }

    /// `Hlc::ZERO` sorts below a timestamp with a non-zero physical part.
    #[test]
    fn hlc_zero_is_minimum() {
        let non_zero = Hlc::new(1, 0, ReplicaId::new(0)).unwrap();
        assert!(Hlc::ZERO < non_zero);
    }
}