Skip to main content

ringkernel_core/
hlc.rs

1//! Hybrid Logical Clock (HLC) implementation for causal ordering.
2//!
3//! HLC combines physical time with logical counters to provide total ordering
4//! of events across distributed GPU kernels while maintaining close relationship
5//! with real time.
6//!
7//! ## Properties
8//!
9//! - **Total Ordering**: All timestamps can be compared
10//! - **Causality**: If event A causes event B, then HLC(A) < HLC(B)
11//! - **Bounded Drift**: Physical component stays within bounded drift of real time
12//!
13//! ## Usage
14//!
15//! ```
16//! use ringkernel_core::hlc::{HlcTimestamp, HlcClock};
17//!
18//! let clock = HlcClock::new(1); // Node ID = 1
19//! let ts1 = clock.tick();
20//! let ts2 = clock.tick();
21//! assert!(ts1 < ts2); // tick() guarantees strictly increasing timestamps
22//! ```
23
24use bytemuck::{Pod, Zeroable};
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::{SystemTime, UNIX_EPOCH};
27use zerocopy::{AsBytes, FromBytes, FromZeroes};
28
29use crate::error::{Result, RingKernelError};
30
31/// Maximum allowed clock skew in milliseconds.
32pub const MAX_CLOCK_SKEW_MS: u64 = 60_000; // 1 minute
33
34/// Hybrid Logical Clock timestamp.
35///
36/// Composed of:
37/// - Physical time (wall clock in microseconds since epoch)
38/// - Logical counter (for events at same physical time)
39/// - Node ID (for tie-breaking across nodes)
40///
41/// This struct is 24 bytes and cache-line friendly.
42#[derive(
43    Debug,
44    Clone,
45    Copy,
46    PartialEq,
47    Eq,
48    Hash,
49    AsBytes,
50    FromBytes,
51    FromZeroes,
52    Pod,
53    Zeroable,
54    rkyv::Archive,
55    rkyv::Serialize,
56    rkyv::Deserialize,
57)]
58#[archive(compare(PartialEq))]
59#[repr(C, align(8))]
60pub struct HlcTimestamp {
61    /// Physical time component (microseconds since UNIX epoch).
62    pub physical: u64,
63    /// Logical counter for events at the same physical time.
64    pub logical: u64,
65    /// Node identifier for tie-breaking.
66    pub node_id: u64,
67}
68
69impl HlcTimestamp {
70    /// Create a new HLC timestamp.
71    pub const fn new(physical: u64, logical: u64, node_id: u64) -> Self {
72        Self {
73            physical,
74            logical,
75            node_id,
76        }
77    }
78
79    /// Create a zero timestamp (minimum value).
80    pub const fn zero() -> Self {
81        Self {
82            physical: 0,
83            logical: 0,
84            node_id: 0,
85        }
86    }
87
88    /// Create a timestamp from the current wall clock.
89    #[inline]
90    pub fn now(node_id: u64) -> Self {
91        let physical = SystemTime::now()
92            .duration_since(UNIX_EPOCH)
93            .expect("Time went backwards")
94            .as_micros() as u64;
95
96        Self {
97            physical,
98            logical: 0,
99            node_id,
100        }
101    }
102
103    /// Check if this timestamp is zero/uninitialized.
104    pub const fn is_zero(&self) -> bool {
105        self.physical == 0 && self.logical == 0
106    }
107
108    /// Get physical time as microseconds since epoch.
109    pub const fn as_micros(&self) -> u64 {
110        self.physical
111    }
112
113    /// Get physical time as milliseconds since epoch.
114    pub const fn as_millis(&self) -> u64 {
115        self.physical / 1000
116    }
117
118    /// Pack timestamp into a single u128 for atomic comparison.
119    /// Format: [physical:64][logical:48][node_id:16]
120    pub const fn pack(&self) -> u128 {
121        ((self.physical as u128) << 64)
122            | ((self.logical as u128) << 16)
123            | (self.node_id as u128 & 0xFFFF)
124    }
125
126    /// Unpack timestamp from u128.
127    pub const fn unpack(packed: u128) -> Self {
128        Self {
129            physical: (packed >> 64) as u64,
130            logical: ((packed >> 16) & 0xFFFF_FFFF_FFFF) as u64,
131            node_id: (packed & 0xFFFF) as u64,
132        }
133    }
134}
135
136impl Default for HlcTimestamp {
137    fn default() -> Self {
138        Self::zero()
139    }
140}
141
142impl Ord for HlcTimestamp {
143    #[inline]
144    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
145        // Compare physical time first
146        match self.physical.cmp(&other.physical) {
147            std::cmp::Ordering::Equal => {}
148            ord => return ord,
149        }
150        // Then logical counter
151        match self.logical.cmp(&other.logical) {
152            std::cmp::Ordering::Equal => {}
153            ord => return ord,
154        }
155        // Finally node_id for total ordering
156        self.node_id.cmp(&other.node_id)
157    }
158}
159
160impl PartialOrd for HlcTimestamp {
161    #[inline]
162    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
163        Some(self.cmp(other))
164    }
165}
166
167impl std::fmt::Display for HlcTimestamp {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        write!(
170            f,
171            "HLC({}.{}.{})",
172            self.physical, self.logical, self.node_id
173        )
174    }
175}
176
177/// Hybrid Logical Clock for generating causally-ordered timestamps.
178///
179/// Thread-safe implementation using atomics for the state.
180pub struct HlcClock {
181    /// Current physical time (atomically updated).
182    physical: AtomicU64,
183    /// Current logical counter (atomically updated).
184    logical: AtomicU64,
185    /// Node identifier.
186    node_id: u64,
187    /// Maximum allowed clock drift in microseconds.
188    max_drift_us: u64,
189}
190
191impl HlcClock {
192    /// Create a new HLC clock with the given node ID.
193    pub fn new(node_id: u64) -> Self {
194        let now = SystemTime::now()
195            .duration_since(UNIX_EPOCH)
196            .expect("Time went backwards")
197            .as_micros() as u64;
198
199        Self {
200            physical: AtomicU64::new(now),
201            logical: AtomicU64::new(0),
202            node_id,
203            max_drift_us: MAX_CLOCK_SKEW_MS * 1000,
204        }
205    }
206
207    /// Create a new HLC clock with custom max drift.
208    pub fn with_max_drift(node_id: u64, max_drift_ms: u64) -> Self {
209        let now = SystemTime::now()
210            .duration_since(UNIX_EPOCH)
211            .expect("Time went backwards")
212            .as_micros() as u64;
213
214        Self {
215            physical: AtomicU64::new(now),
216            logical: AtomicU64::new(0),
217            node_id,
218            max_drift_us: max_drift_ms * 1000,
219        }
220    }
221
222    /// Get the node ID.
223    pub fn node_id(&self) -> u64 {
224        self.node_id
225    }
226
227    /// Get current timestamp without advancing the clock.
228    pub fn now(&self) -> HlcTimestamp {
229        let wall = Self::wall_time();
230        let physical = self.physical.load(Ordering::Acquire);
231        let logical = self.logical.load(Ordering::Acquire);
232
233        // Use max of wall clock and stored physical
234        let new_physical = physical.max(wall);
235
236        HlcTimestamp {
237            physical: new_physical,
238            logical,
239            node_id: self.node_id,
240        }
241    }
242
243    /// Generate a new timestamp, advancing the clock.
244    #[inline]
245    pub fn tick(&self) -> HlcTimestamp {
246        let wall = Self::wall_time();
247
248        loop {
249            let old_physical = self.physical.load(Ordering::Acquire);
250            let old_logical = self.logical.load(Ordering::Acquire);
251
252            let (new_physical, new_logical) = if wall > old_physical {
253                // Wall clock advanced: use wall time, reset logical
254                (wall, 0)
255            } else {
256                // Same or past: increment logical counter
257                (old_physical, old_logical + 1)
258            };
259
260            // Try to update atomically
261            if self
262                .physical
263                .compare_exchange(
264                    old_physical,
265                    new_physical,
266                    Ordering::Release,
267                    Ordering::Relaxed,
268                )
269                .is_ok()
270            {
271                self.logical.store(new_logical, Ordering::Release);
272                return HlcTimestamp {
273                    physical: new_physical,
274                    logical: new_logical,
275                    node_id: self.node_id,
276                };
277            }
278            // CAS failed, retry
279        }
280    }
281
282    /// Update clock on receiving a message with the given timestamp.
283    ///
284    /// Returns the new local timestamp that causally follows the received timestamp.
285    #[inline]
286    pub fn update(&self, received: &HlcTimestamp) -> Result<HlcTimestamp> {
287        let wall = Self::wall_time();
288
289        // Check for clock skew
290        if received.physical > wall + self.max_drift_us {
291            return Err(RingKernelError::ClockSkew {
292                skew_ms: (received.physical - wall) / 1000,
293                max_ms: self.max_drift_us / 1000,
294            });
295        }
296
297        loop {
298            let old_physical = self.physical.load(Ordering::Acquire);
299            let old_logical = self.logical.load(Ordering::Acquire);
300
301            // Take max of wall, local, and received physical
302            let max_physical = wall.max(old_physical).max(received.physical);
303
304            let new_logical = if max_physical == old_physical && max_physical == received.physical {
305                // All three equal: take max logical + 1
306                old_logical.max(received.logical) + 1
307            } else if max_physical == old_physical {
308                // Local physical wins: increment local logical
309                old_logical + 1
310            } else if max_physical == received.physical {
311                // Received physical wins: use received logical + 1
312                received.logical + 1
313            } else {
314                // Wall clock wins: reset logical
315                0
316            };
317
318            // Try to update atomically
319            if self
320                .physical
321                .compare_exchange(
322                    old_physical,
323                    max_physical,
324                    Ordering::Release,
325                    Ordering::Relaxed,
326                )
327                .is_ok()
328            {
329                self.logical.store(new_logical, Ordering::Release);
330                return Ok(HlcTimestamp {
331                    physical: max_physical,
332                    logical: new_logical,
333                    node_id: self.node_id,
334                });
335            }
336            // CAS failed, retry
337        }
338    }
339
340    /// Get current wall clock time in microseconds.
341    #[inline]
342    fn wall_time() -> u64 {
343        SystemTime::now()
344            .duration_since(UNIX_EPOCH)
345            .expect("Time went backwards")
346            .as_micros() as u64
347    }
348}
349
350impl std::fmt::Debug for HlcClock {
351    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
352        f.debug_struct("HlcClock")
353            .field("physical", &self.physical.load(Ordering::Relaxed))
354            .field("logical", &self.logical.load(Ordering::Relaxed))
355            .field("node_id", &self.node_id)
356            .finish()
357    }
358}
359
360/// Compact HLC state for GPU-side storage (16 bytes).
361#[derive(Debug, Clone, Copy, Default, AsBytes, FromBytes, FromZeroes, Pod, Zeroable)]
362#[repr(C, align(16))]
363pub struct HlcState {
364    /// Physical time in microseconds.
365    pub physical: u64,
366    /// Logical counter.
367    pub logical: u64,
368}
369
370impl HlcState {
371    /// Create new HLC state.
372    pub const fn new(physical: u64, logical: u64) -> Self {
373        Self { physical, logical }
374    }
375
376    /// Convert to full timestamp with node ID.
377    pub const fn to_timestamp(&self, node_id: u64) -> HlcTimestamp {
378        HlcTimestamp {
379            physical: self.physical,
380            logical: self.logical,
381            node_id,
382        }
383    }
384
385    /// Create from full timestamp (drops node_id).
386    pub const fn from_timestamp(ts: &HlcTimestamp) -> Self {
387        Self {
388            physical: ts.physical,
389            logical: ts.logical,
390        }
391    }
392}
393
394#[cfg(test)]
395mod tests {
396    use super::*;
397
398    #[test]
399    fn test_timestamp_ordering() {
400        let ts1 = HlcTimestamp::new(100, 0, 1);
401        let ts2 = HlcTimestamp::new(100, 1, 1);
402        let ts3 = HlcTimestamp::new(101, 0, 1);
403
404        assert!(ts1 < ts2);
405        assert!(ts2 < ts3);
406        assert!(ts1 < ts3);
407    }
408
409    #[test]
410    fn test_timestamp_node_id_tiebreak() {
411        let ts1 = HlcTimestamp::new(100, 5, 1);
412        let ts2 = HlcTimestamp::new(100, 5, 2);
413
414        assert!(ts1 < ts2);
415    }
416
417    #[test]
418    fn test_clock_tick() {
419        let clock = HlcClock::new(1);
420
421        let ts1 = clock.tick();
422        let ts2 = clock.tick();
423        let ts3 = clock.tick();
424
425        assert!(ts1 < ts2);
426        assert!(ts2 < ts3);
427    }
428
429    #[test]
430    fn test_clock_update() {
431        let clock1 = HlcClock::new(1);
432        let clock2 = HlcClock::new(2);
433
434        let ts1 = clock1.tick();
435        let ts2 = clock2.update(&ts1).unwrap();
436
437        // ts2 should causally follow ts1
438        assert!(ts1 < ts2);
439    }
440
441    #[test]
442    fn test_pack_unpack() {
443        let original = HlcTimestamp::new(12345678901234, 42, 7);
444        let packed = original.pack();
445        let unpacked = HlcTimestamp::unpack(packed);
446
447        assert_eq!(original.physical, unpacked.physical);
448        // Note: node_id is truncated to 16 bits in pack format
449        assert_eq!(original.logical, unpacked.logical);
450    }
451
452    #[test]
453    fn test_clock_skew_detection() {
454        let clock = HlcClock::with_max_drift(1, 100); // 100ms max drift
455
456        // Create a timestamp far in the future
457        let future = HlcTimestamp::new(
458            SystemTime::now()
459                .duration_since(UNIX_EPOCH)
460                .unwrap()
461                .as_micros() as u64
462                + 200_000_000, // 200 seconds in future
463            0,
464            2,
465        );
466
467        let result = clock.update(&future);
468        assert!(matches!(result, Err(RingKernelError::ClockSkew { .. })));
469    }
470
471    #[test]
472    fn test_timestamp_display() {
473        let ts = HlcTimestamp::new(1234567890, 42, 7);
474        let s = format!("{}", ts);
475        assert!(s.contains("1234567890"));
476        assert!(s.contains("42"));
477        assert!(s.contains("7"));
478    }
479}
480
481#[cfg(test)]
482mod proptests {
483    use super::*;
484    use proptest::prelude::*;
485
486    proptest! {
487        #[test]
488        fn total_ordering_reflexive(p in 0u64..1_000_000, l in 0u64..1000, n in 0u64..100) {
489            let ts = HlcTimestamp::new(p, l, n);
490            prop_assert_eq!(ts.cmp(&ts), std::cmp::Ordering::Equal);
491        }
492
493        #[test]
494        fn total_ordering_antisymmetric(
495            p1 in 0u64..1_000_000, l1 in 0u64..1000, n1 in 0u64..100,
496            p2 in 0u64..1_000_000, l2 in 0u64..1000, n2 in 0u64..100,
497        ) {
498            let a = HlcTimestamp::new(p1, l1, n1);
499            let b = HlcTimestamp::new(p2, l2, n2);
500            if a <= b && b <= a {
501                prop_assert_eq!(a, b);
502            }
503        }
504
505        #[test]
506        fn total_ordering_transitive(
507            p1 in 0u64..1_000_000, l1 in 0u64..1000, n1 in 0u64..100,
508            p2 in 0u64..1_000_000, l2 in 0u64..1000, n2 in 0u64..100,
509            p3 in 0u64..1_000_000, l3 in 0u64..1000, n3 in 0u64..100,
510        ) {
511            let a = HlcTimestamp::new(p1, l1, n1);
512            let b = HlcTimestamp::new(p2, l2, n2);
513            let c = HlcTimestamp::new(p3, l3, n3);
514            if a <= b && b <= c {
515                prop_assert!(a <= c);
516            }
517        }
518
519        #[test]
520        fn zero_is_minimum(p in 1u64..1_000_000, l in 0u64..1000, n in 0u64..100) {
521            let zero = HlcTimestamp::zero();
522            let ts = HlcTimestamp::new(p, l, n);
523            prop_assert!(zero < ts);
524        }
525
526        #[test]
527        fn pack_unpack_preserves_physical_and_logical(
528            p in 0u64..u64::MAX, l in 0u64..0xFFFF_FFFF_FFFF, n in 0u64..0xFFFF,
529        ) {
530            let ts = HlcTimestamp::new(p, l, n);
531            let unpacked = HlcTimestamp::unpack(ts.pack());
532            prop_assert_eq!(ts.physical, unpacked.physical);
533            prop_assert_eq!(ts.logical, unpacked.logical);
534            prop_assert_eq!(ts.node_id, unpacked.node_id);
535        }
536
537        #[test]
538        fn tick_strictly_increasing(n in 2usize..=20) {
539            let clock = HlcClock::new(42);
540            let mut prev = clock.tick();
541            for _ in 1..n {
542                let curr = clock.tick();
543                prop_assert!(curr > prev, "tick() must be strictly increasing: {:?} not > {:?}", curr, prev);
544                prev = curr;
545            }
546        }
547
548        #[test]
549        fn update_preserves_causality(node_a in 1u64..100, node_b in 100u64..200) {
550            let clock_a = HlcClock::new(node_a);
551            let clock_b = HlcClock::new(node_b);
552
553            let ts_a = clock_a.tick();
554            let ts_b = clock_b.update(&ts_a).unwrap();
555
556            // Received message causality: ts_b must follow ts_a
557            prop_assert!(ts_b > ts_a);
558        }
559    }
560}