Skip to main content

lnc_core/
hlc.rs

//! Hybrid Logical Clock (HLC) implementation for LANCE.
//!
//! HLC provides causally consistent timestamps that remain close to physical wall-clock
//! time while handling clock drift, NTP kinks, and distributed ordering.
//!
//! # Structure
//!
//! An HLC timestamp is a 64-bit value packed as:
//! - **44 bits**: Physical time in milliseconds since Unix epoch (~557 years)
//! - **20 bits**: Logical counter (1M events per millisecond per node)
//!
//! # Guarantees
//!
//! - **Monotonicity**: Local timestamps always increase
//! - **Causality**: If A happened-before B, then HLC(A) < HLC(B)
//! - **Physical closeness**: HLC stays within bounded drift of wall-clock time
17
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::time::{SystemTime, UNIX_EPOCH};
20use zerocopy::{FromBytes, IntoBytes};
21
22/// Hybrid Logical Clock timestamp (64-bit packed).
23///
24/// Layout: `[physical_ms: 44 bits][logical: 20 bits]`
25///
26/// - 44 bits physical = ~557 years from epoch (sufficient until 2527)
27/// - 20 bits logical = 1M events per millisecond per node
28#[derive(
29    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, IntoBytes, FromBytes,
30)]
31#[repr(transparent)]
32pub struct HlcTimestamp(u64);
33
34impl HlcTimestamp {
35    /// Number of bits allocated to the logical counter.
36    const LOGICAL_BITS: u32 = 20;
37
38    /// Mask for extracting the logical counter.
39    const LOGICAL_MASK: u64 = (1 << Self::LOGICAL_BITS) - 1;
40
41    /// Maximum value for the logical counter.
42    pub const MAX_LOGICAL: u32 = (1 << Self::LOGICAL_BITS) - 1;
43
44    /// Create a new HLC timestamp from physical time (ms) and logical counter.
45    ///
46    /// # Panics
47    ///
48    /// Debug builds will panic if `logical >= 2^20`.
49    #[inline]
50    #[must_use]
51    pub const fn new(physical_ms: u64, logical: u32) -> Self {
52        debug_assert!(logical < (1 << Self::LOGICAL_BITS));
53        Self((physical_ms << Self::LOGICAL_BITS) | (logical as u64))
54    }
55
56    /// Create an HLC timestamp from a raw u64 value.
57    #[inline]
58    #[must_use]
59    pub const fn from_raw(raw: u64) -> Self {
60        Self(raw)
61    }
62
63    /// Create a zero (minimum) HLC timestamp.
64    #[inline]
65    #[must_use]
66    pub const fn zero() -> Self {
67        Self(0)
68    }
69
70    /// Create a maximum HLC timestamp.
71    #[inline]
72    #[must_use]
73    pub const fn max() -> Self {
74        Self(u64::MAX)
75    }
76
77    /// Extract the physical time component in milliseconds.
78    #[inline]
79    #[must_use]
80    pub const fn physical_ms(&self) -> u64 {
81        self.0 >> Self::LOGICAL_BITS
82    }
83
84    /// Extract the logical counter component.
85    #[inline]
86    #[must_use]
87    pub const fn logical(&self) -> u32 {
88        (self.0 & Self::LOGICAL_MASK) as u32
89    }
90
91    /// Get the raw u64 representation.
92    #[inline]
93    #[must_use]
94    pub const fn as_u64(&self) -> u64 {
95        self.0
96    }
97
98    /// Convert to nanoseconds (approximate, for display/logging only).
99    ///
100    /// Note: This loses precision since HLC uses milliseconds internally.
101    #[inline]
102    #[must_use]
103    pub const fn to_nanos_approx(&self) -> u64 {
104        self.physical_ms() * 1_000_000
105    }
106}
107
108impl std::fmt::Display for HlcTimestamp {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        write!(f, "HLC({}.{})", self.physical_ms(), self.logical())
111    }
112}
113
/// Generator of causally consistent [`HlcTimestamp`] values.
///
/// The entire clock state is one atomic 64-bit word, so a clock can be used
/// through a shared reference from several threads. Intended use is one
/// instance per node.
///
/// # Example
///
/// ```
/// use lnc_core::HybridLogicalClock;
///
/// let hlc = HybridLogicalClock::new(1);
/// let ts1 = hlc.now();
/// let ts2 = hlc.now();
/// assert!(ts1 < ts2);
/// ```
pub struct HybridLogicalClock {
    /// Packed value of the most recently issued timestamp.
    last: AtomicU64,
    /// Identifier of the owning node. It is not encoded into timestamps and
    /// is only kept so callers can read it back.
    node_id: u16,
}
134
135impl HybridLogicalClock {
136    /// Maximum drift allowed before warning (milliseconds).
137    pub const MAX_DRIFT_WARNING_MS: u64 = 100;
138
139    /// Maximum drift allowed before critical alert (milliseconds).
140    pub const MAX_DRIFT_CRITICAL_MS: u64 = 1000;
141
142    /// Create a new HLC instance for a given node.
143    #[must_use]
144    pub fn new(node_id: u16) -> Self {
145        Self {
146            last: AtomicU64::new(0),
147            node_id,
148        }
149    }
150
151    /// Get the node ID associated with this clock.
152    #[inline]
153    #[must_use]
154    pub const fn node_id(&self) -> u16 {
155        self.node_id
156    }
157
158    /// Generate a new HLC timestamp for a local event.
159    ///
160    /// This method is lock-free and thread-safe via CAS operations.
161    ///
162    /// # Algorithm
163    ///
164    /// 1. If wall clock > last physical: use wall clock, reset logical to 0
165    /// 2. Otherwise: keep physical, increment logical
166    /// 3. CAS to ensure monotonicity under concurrent access
167    pub fn now(&self) -> HlcTimestamp {
168        let wall_ms = Self::wall_clock_ms();
169
170        loop {
171            let last = HlcTimestamp(self.last.load(Ordering::Acquire));
172            let last_physical = last.physical_ms();
173            let last_logical = last.logical();
174
175            let (new_physical, new_logical) = if wall_ms > last_physical {
176                // Wall clock advanced: reset logical counter
177                (wall_ms, 0)
178            } else {
179                // Wall clock unchanged or behind: increment logical
180                if last_logical >= HlcTimestamp::MAX_LOGICAL {
181                    // Logical counter exhausted: spin until wall clock advances
182                    // This should be extremely rare (1M events/ms)
183                    std::hint::spin_loop();
184                    continue;
185                }
186                (last_physical, last_logical + 1)
187            };
188
189            let new_ts = HlcTimestamp::new(new_physical, new_logical);
190
191            // CAS to ensure monotonicity under concurrency
192            if self
193                .last
194                .compare_exchange_weak(last.0, new_ts.0, Ordering::AcqRel, Ordering::Acquire)
195                .is_ok()
196            {
197                return new_ts;
198            }
199            // CAS failed: another thread updated; retry
200        }
201    }
202
203    /// Update HLC upon receiving a message with a remote timestamp.
204    ///
205    /// Returns a new timestamp that is greater than both the local clock
206    /// and the remote timestamp, ensuring causal ordering.
207    ///
208    /// # Algorithm
209    ///
210    /// 1. If wall clock > max(local, remote): use wall clock, reset logical
211    /// 2. If local > remote: use local physical, increment local logical
212    /// 3. If remote > local: use remote physical, increment remote logical
213    /// 4. If equal physical: use that physical, increment max logical
214    pub fn receive(&self, remote_ts: HlcTimestamp) -> HlcTimestamp {
215        let wall_ms = Self::wall_clock_ms();
216
217        loop {
218            let last = HlcTimestamp(self.last.load(Ordering::Acquire));
219
220            let (new_physical, new_logical) =
221                if wall_ms > last.physical_ms() && wall_ms > remote_ts.physical_ms() {
222                    // Wall clock is ahead of both: use wall clock, reset logical
223                    (wall_ms, 0)
224                } else if last.physical_ms() > remote_ts.physical_ms() {
225                    // Local is ahead: increment local logical
226                    (last.physical_ms(), last.logical().saturating_add(1))
227                } else if remote_ts.physical_ms() > last.physical_ms() {
228                    // Remote is ahead: adopt remote physical, increment remote logical
229                    (
230                        remote_ts.physical_ms(),
231                        remote_ts.logical().saturating_add(1),
232                    )
233                } else {
234                    // Same physical time: use max logical + 1
235                    let max_logical = last.logical().max(remote_ts.logical());
236                    (last.physical_ms(), max_logical.saturating_add(1))
237                };
238
239            // Clamp logical to max value (shouldn't happen in practice)
240            let new_logical = new_logical.min(HlcTimestamp::MAX_LOGICAL);
241            let new_ts = HlcTimestamp::new(new_physical, new_logical);
242
243            if self
244                .last
245                .compare_exchange_weak(last.0, new_ts.0, Ordering::AcqRel, Ordering::Acquire)
246                .is_ok()
247            {
248                return new_ts;
249            }
250        }
251    }
252
253    /// Get the current HLC timestamp without advancing it.
254    ///
255    /// Useful for reading the last known time without generating a new event.
256    #[inline]
257    #[must_use]
258    pub fn current(&self) -> HlcTimestamp {
259        HlcTimestamp(self.last.load(Ordering::Acquire))
260    }
261
262    /// Calculate the drift between HLC physical time and wall clock.
263    ///
264    /// Returns positive value if HLC is ahead of wall clock.
265    #[must_use]
266    pub fn drift_ms(&self) -> i64 {
267        let wall_ms = Self::wall_clock_ms();
268        let hlc_physical = self.current().physical_ms();
269        #[allow(clippy::cast_possible_wrap)]
270        let result = hlc_physical as i64 - wall_ms as i64;
271        result
272    }
273
274    /// Check if the clock drift is within acceptable bounds.
275    #[must_use]
276    pub fn health(&self) -> ClockHealth {
277        let drift = self.drift_ms().unsigned_abs();
278
279        if drift > Self::MAX_DRIFT_CRITICAL_MS {
280            ClockHealth::Critical { drift_ms: drift }
281        } else if drift > Self::MAX_DRIFT_WARNING_MS {
282            ClockHealth::Degraded { drift_ms: drift }
283        } else {
284            ClockHealth::Healthy
285        }
286    }
287
288    /// Get current wall clock time in milliseconds since Unix epoch.
289    #[inline]
290    fn wall_clock_ms() -> u64 {
291        SystemTime::now()
292            .duration_since(UNIX_EPOCH)
293            .map(|d| {
294                #[allow(clippy::cast_possible_truncation)]
295                let ms = d.as_millis() as u64;
296                ms
297            })
298            .unwrap_or(0)
299    }
300}
301
302impl std::fmt::Debug for HybridLogicalClock {
303    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304        f.debug_struct("HybridLogicalClock")
305            .field("node_id", &self.node_id)
306            .field("current", &self.current())
307            .field("drift_ms", &self.drift_ms())
308            .finish_non_exhaustive()
309    }
310}
311
/// Result of comparing the HLC physical time against the wall clock.
///
/// Produced by [`HybridLogicalClock::health`]. The `drift_ms` payload is the
/// absolute drift in milliseconds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ClockHealth {
    /// Drift is within normal bounds (at most 100 ms).
    Healthy,
    /// Drift is elevated but tolerated (more than 100 ms, at most 1 s).
    Degraded { drift_ms: u64 },
    /// Drift is critical (more than 1 s); may indicate NTP failure or a partition.
    Critical { drift_ms: u64 },
}
322
323impl ClockHealth {
324    /// Check if the clock is in a healthy state.
325    #[inline]
326    #[must_use]
327    pub const fn is_healthy(&self) -> bool {
328        matches!(self, Self::Healthy)
329    }
330
331    /// Check if the clock is in a critical state.
332    #[inline]
333    #[must_use]
334    pub const fn is_critical(&self) -> bool {
335        matches!(self, Self::Critical { .. })
336    }
337}
338
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::unreadable_literal)]
mod tests {
    use super::*;

    /// Both components survive a pack/unpack round trip.
    #[test]
    fn test_hlc_timestamp_packing() {
        let packed = HlcTimestamp::new(1234567890, 42);

        assert_eq!(packed.physical_ms(), 1234567890);
        assert_eq!(packed.logical(), 42);
    }

    /// Ordering is by physical time first, then by logical counter.
    #[test]
    fn test_hlc_timestamp_ordering() {
        let ascending = [
            HlcTimestamp::new(100, 0),
            HlcTimestamp::new(100, 1),
            HlcTimestamp::new(101, 0),
        ];

        assert!(ascending[0] < ascending[1]);
        assert!(ascending[1] < ascending[2]);
        assert!(ascending[0] < ascending[2]);
    }

    /// Consecutive `now()` calls on one thread never repeat or go backwards.
    #[test]
    fn test_hlc_monotonicity() {
        let clock = HybridLogicalClock::new(1);

        // One initial sample plus 1000 follow-ups.
        let samples: Vec<HlcTimestamp> = (0..=1000).map(|_| clock.now()).collect();

        for pair in samples.windows(2) {
            assert!(pair[1] > pair[0], "HLC must be strictly monotonic");
        }
    }

    /// A remote timestamp from the future pulls the local clock past it.
    #[test]
    fn test_hlc_receive_advances() {
        let clock = HybridLogicalClock::new(1);
        let before = clock.now();

        // Simulate receiving a message stamped one second ahead of us.
        let incoming = HlcTimestamp::new(before.physical_ms() + 1000, 0);
        let merged = clock.receive(incoming);

        assert!(merged > before);
        assert!(merged > incoming);
    }

    /// With equal physical time, the result exceeds the larger logical counter.
    #[test]
    fn test_hlc_receive_same_physical() {
        let clock = HybridLogicalClock::new(1);

        // Put the clock into a known state.
        let baseline = clock.now();

        // Remote shares our physical time but has a higher logical counter.
        let incoming = HlcTimestamp::new(baseline.physical_ms(), baseline.logical() + 10);
        let merged = clock.receive(incoming);

        assert!(merged > incoming);
        assert!(merged > baseline);
    }

    /// Timestamps issued concurrently from several threads are all distinct.
    #[test]
    fn test_hlc_concurrent_access() {
        use std::collections::HashSet;
        use std::sync::Arc;
        use std::thread;

        const THREADS: usize = 4;
        const PER_THREAD: usize = 1000;

        let clock = Arc::new(HybridLogicalClock::new(1));

        // Spawn every worker before joining any so they actually overlap.
        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                let clock = Arc::clone(&clock);
                thread::spawn(move || {
                    (0..PER_THREAD)
                        .map(|_| clock.now())
                        .collect::<Vec<HlcTimestamp>>()
                })
            })
            .collect();

        let produced: Vec<HlcTimestamp> = workers
            .into_iter()
            .flat_map(|worker| worker.join().expect("thread panicked"))
            .collect();

        // All timestamps should be unique (no duplicates).
        let distinct: HashSet<HlcTimestamp> = produced.iter().copied().collect();
        assert_eq!(
            distinct.len(),
            produced.len(),
            "HLC produced duplicate timestamps"
        );
    }

    /// The packed timestamp is exactly eight bytes wide.
    #[test]
    fn test_hlc_size() {
        let width = std::mem::size_of::<HlcTimestamp>();
        assert_eq!(width, 8);
    }

    /// Right after issuing a timestamp the clock reports itself healthy.
    #[test]
    fn test_clock_health() {
        let clock = HybridLogicalClock::new(1);

        // Issue one timestamp so the clock is in sync with the wall clock.
        let _ = clock.now();

        let status = clock.health();
        assert!(status.is_healthy());
    }
}