Skip to main content

nodedb_types/
hlc.rs

// SPDX-License-Identifier: Apache-2.0

//! Hybrid Logical Clock.
//!
//! Canonical timestamp for replicated metadata entries, descriptor
//! modification times, and descriptor lease expiry. Monotonic across
//! wall-clock skew: `update` folds a remote HLC into the local clock so
//! causally later events always receive a strictly greater timestamp.
//!
//! Metadata DDL is not a hot path (hundreds per second at most), so the
//! clock is backed by a short-lived mutex rather than atomics.
12
13use std::cmp::Ordering;
14use std::sync::Mutex;
15use std::time::{SystemTime, UNIX_EPOCH};
16
17use serde::{Deserialize, Serialize};
18
/// Hybrid Logical Clock timestamp.
///
/// A timestamp is the pair (`wall_ns`, `logical`). The hand-written `Ord`
/// implementation in this file compares `wall_ns` first and consults
/// `logical` only when the wall components are equal. The derived
/// `Default` yields both fields as zero, i.e. the same value as
/// `Hlc::ZERO`.
///
/// `#[non_exhaustive]` — an `epoch_id` discriminant for multi-cluster
/// logical epochs may be added in a future release.
///
/// # Wire format note
///
/// `logical` is a 64-bit counter. MessagePack integers decode by value,
/// so a small counter value still deserialises correctly into this type
/// regardless of how compactly it was written.
#[non_exhaustive]
#[derive(
    Debug,
    Clone,
    Copy,
    Default,
    PartialEq,
    Eq,
    Hash,
    Serialize,
    Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub struct Hlc {
    /// Wall-clock component: nanoseconds since the Unix epoch.
    pub wall_ns: u64,
    /// Logical counter incremented when two events share a wall-clock tick.
    pub logical: u64,
}
49
50impl Hlc {
51    pub const ZERO: Hlc = Hlc {
52        wall_ns: 0,
53        logical: 0,
54    };
55
56    pub const fn new(wall_ns: u64, logical: u64) -> Self {
57        Self { wall_ns, logical }
58    }
59}
60
61impl PartialOrd for Hlc {
62    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
63        Some(self.cmp(other))
64    }
65}
66
67impl Ord for Hlc {
68    fn cmp(&self, other: &Self) -> Ordering {
69        match self.wall_ns.cmp(&other.wall_ns) {
70            Ordering::Equal => self.logical.cmp(&other.logical),
71            other => other,
72        }
73    }
74}
75
/// Thread-safe HLC source. One instance per node.
///
/// The clock's only state is the last timestamp it produced, guarded by a
/// [`Mutex`]. The derived `Default` starts from `Hlc::default()` (both
/// fields zero), which is the same starting point as `HlcClock::new()`.
#[derive(Debug, Default)]
pub struct HlcClock {
    /// Most recent timestamp issued by `now` or produced by `update`.
    state: Mutex<Hlc>,
}
81
82impl HlcClock {
83    pub fn new() -> Self {
84        Self {
85            state: Mutex::new(Hlc::ZERO),
86        }
87    }
88
89    /// Return a new HLC strictly greater than any previously returned value
90    /// and ≥ the current wall clock.
91    pub fn now(&self) -> Hlc {
92        let wall = wall_now_ns();
93        let mut st = self.state.lock().unwrap_or_else(|p| p.into_inner());
94        // Clamp wall to at least st.wall_ns so the HLC never regresses on
95        // clock skew or NTP adjustments.
96        let wall = wall.max(st.wall_ns);
97        let next = if wall > st.wall_ns {
98            Hlc::new(wall, 0)
99        } else {
100            Hlc::new(st.wall_ns, st.logical + 1)
101        };
102        *st = next;
103        next
104    }
105
106    /// Fold a remote HLC into the local clock and return a strictly greater HLC
107    /// than both the prior local state and the remote observation.
108    pub fn update(&self, remote: Hlc) -> Hlc {
109        let wall = wall_now_ns();
110        let mut st = self.state.lock().unwrap_or_else(|p| p.into_inner());
111        let prev = *st;
112        let max_wall = wall.max(prev.wall_ns).max(remote.wall_ns);
113        let next_logical = if max_wall == prev.wall_ns && max_wall == remote.wall_ns {
114            prev.logical.max(remote.logical) + 1
115        } else if max_wall == prev.wall_ns {
116            prev.logical + 1
117        } else if max_wall == remote.wall_ns {
118            remote.logical + 1
119        } else {
120            0
121        };
122        let next = Hlc::new(max_wall, next_logical);
123        *st = next;
124        next
125    }
126
127    /// Read the last observed HLC without advancing.
128    pub fn peek(&self) -> Hlc {
129        *self.state.lock().unwrap_or_else(|p| p.into_inner())
130    }
131}
132
133fn wall_now_ns() -> u64 {
134    SystemTime::now()
135        .duration_since(UNIX_EPOCH)
136        .map(|d| d.as_nanos() as u64)
137        .unwrap_or_else(|_| {
138            use std::sync::atomic::{AtomicBool, Ordering};
139            static LOGGED: AtomicBool = AtomicBool::new(false);
140            if !LOGGED.swap(true, Ordering::Relaxed) {
141                tracing::error!(
142                    module = module_path!(),
143                    "system clock is before UNIX_EPOCH; using 0 (epoch) \
144                     — check NTP/RTC configuration"
145                );
146            }
147            0
148        })
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// Distance (one hour) by which a pinned state is placed ahead of the
    /// real wall clock, so `now()` cannot escape the pinned tick while a
    /// test is running.
    const PIN_AHEAD_NS: u64 = 3_600 * 1_000_000_000;

    /// Overwrite the clock state with a timestamp whose wall component lies
    /// `PIN_AHEAD_NS` in the future and whose logical counter is `logical`.
    /// Returns the pinned value.
    fn pin_ahead(clock: &HlcClock, logical: u64) -> Hlc {
        let pinned = Hlc::new(clock.now().wall_ns + PIN_AHEAD_NS, logical);
        *clock.state.lock().unwrap_or_else(|p| p.into_inner()) = pinned;
        pinned
    }

    /// With the state pinned ahead of the wall clock every `now()` call
    /// lands in the same tick, so only the logical counter can provide
    /// monotonicity.
    #[test]
    fn monotonic_within_tick() {
        let clock = HlcClock::new();
        let pinned = pin_ahead(&clock, 0);
        let mut prev = pinned;
        for _ in 0..1_000 {
            let next = clock.now();
            assert!(next > prev);
            assert_eq!(next.wall_ns, pinned.wall_ns, "left the pinned tick");
            prev = next;
        }
    }

    #[test]
    fn update_produces_strictly_greater() {
        let clock = HlcClock::new();
        let local = clock.now();
        let remote = Hlc::new(local.wall_ns + 1_000_000, 7);
        let merged = clock.update(remote);
        assert!(merged > remote);
        assert!(merged > local);
    }

    #[test]
    fn ordering_is_total() {
        let a = Hlc::new(10, 0);
        let b = Hlc::new(10, 1);
        let c = Hlc::new(11, 0);
        assert!(a < b);
        assert!(b < c);
        assert!(a < c);
    }

    /// 100K rapid-fire `now()` calls against the real wall clock must each
    /// be strictly greater than the previous.
    #[test]
    fn burst_strictly_monotonic() {
        let clock = HlcClock::new();
        let mut prev = clock.now();
        for _ in 0..100_000 {
            let next = clock.now();
            assert!(
                next > prev,
                "monotonicity violated: prev={prev:?} next={next:?}"
            );
            prev = next;
        }
    }

    /// Set the clock's logical counter to `u32::MAX` (the old saturation
    /// ceiling) and assert that the next `now()` returns exactly the
    /// successor. Under the old `u32::saturating_add(1)` this would have
    /// pinned and returned an equal value.
    ///
    /// The state is pinned ahead of the wall clock; otherwise the real
    /// clock advances between calls and `now()` resets the logical counter
    /// to 0 without ever performing the increment under test.
    #[test]
    fn saturating_add_regression() {
        let clock = HlcClock::new();
        let pinned = pin_ahead(&clock, u32::MAX as u64);
        let next = clock.now();
        assert_eq!(
            next,
            Hlc::new(pinned.wall_ns, u32::MAX as u64 + 1),
            "logical counter pinned at u32::MAX: next={next:?}"
        );
    }

    /// 100K `update(remote)` calls with a fixed remote must each produce
    /// a strictly greater HLC than the prior call.
    #[test]
    fn update_burst_strictly_monotonic() {
        let clock = HlcClock::new();
        let remote = clock.now();
        let mut prev = clock.update(remote);
        for _ in 0..100_000 {
            let next = clock.update(remote);
            assert!(
                next > prev,
                "update monotonicity violated: prev={prev:?} next={next:?}"
            );
            prev = next;
        }
    }

    /// An `Hlc` with `logical` above `u32::MAX` must survive a zerompk
    /// encode/decode roundtrip with the value preserved exactly.
    #[test]
    fn roundtrip_logical_gt_u32_max() {
        let logical_val = (u32::MAX as u64) + 1;
        let original = Hlc::new(1_700_000_000_000_000_000_u64, logical_val);
        let bytes = zerompk::to_msgpack_vec(&original).expect("encode");
        let decoded: Hlc = zerompk::from_msgpack(&bytes).expect("decode");
        assert_eq!(decoded, original);
        assert_eq!(decoded.logical, logical_val);
    }
}