// ringkernel_core/hlc.rs

//! Hybrid Logical Clock (HLC) implementation for causal ordering.
//!
//! HLC combines physical time with logical counters to provide a total ordering
//! of events across distributed GPU kernels while keeping the physical component
//! close to real time.
//!
//! ## Properties
//!
//! - **Total Ordering**: All timestamps can be compared
//! - **Causality**: If event A causes event B, then HLC(A) < HLC(B)
//! - **Bounded Drift**: The physical component stays within a bounded drift of real time
//!
//! ## Usage
//!
//! ```
//! use ringkernel_core::hlc::{HlcTimestamp, HlcClock};
//!
//! let clock = HlcClock::new(1); // Node ID = 1
//! let ts1: HlcTimestamp = clock.tick();
//! let ts2 = clock.tick();
//! assert!(ts1 < ts2); // tick() guarantees strictly increasing timestamps
//! ```
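//!
//! A sketch of the message-receipt path (node IDs chosen arbitrarily):
//!
//! ```
//! use ringkernel_core::hlc::HlcClock;
//!
//! let sender = HlcClock::new(1);
//! let receiver = HlcClock::new(2);
//!
//! // Timestamp attached to an outgoing message.
//! let sent = sender.tick();
//!
//! // On receipt, update() merges the remote timestamp into the local clock,
//! // so the returned timestamp causally follows the sender's.
//! let received = receiver.update(&sent).unwrap();
//! assert!(sent < received);
//! ```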

use bytemuck::{Pod, Zeroable};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use zerocopy::{AsBytes, FromBytes, FromZeroes};

use crate::error::{Result, RingKernelError};

/// Maximum allowed clock skew in milliseconds.
pub const MAX_CLOCK_SKEW_MS: u64 = 60_000; // 1 minute

/// Hybrid Logical Clock timestamp.
///
/// Composed of:
/// - Physical time (wall clock in microseconds since epoch)
/// - Logical counter (for events at the same physical time)
/// - Node ID (for tie-breaking across nodes)
///
/// This struct is 24 bytes and cache-line friendly.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Hash, AsBytes, FromBytes, FromZeroes, Pod, Zeroable,
)]
#[repr(C, align(8))]
pub struct HlcTimestamp {
    /// Physical time component (microseconds since UNIX epoch).
    pub physical: u64,
    /// Logical counter for events at the same physical time.
    pub logical: u64,
    /// Node identifier for tie-breaking.
    pub node_id: u64,
}

impl HlcTimestamp {
    /// Create a new HLC timestamp.
    pub const fn new(physical: u64, logical: u64, node_id: u64) -> Self {
        Self {
            physical,
            logical,
            node_id,
        }
    }

    /// Create a zero timestamp (minimum value).
    pub const fn zero() -> Self {
        Self {
            physical: 0,
            logical: 0,
            node_id: 0,
        }
    }

    /// Create a timestamp from the current wall clock.
    #[inline]
    pub fn now(node_id: u64) -> Self {
        let physical = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            .as_micros() as u64;

        Self {
            physical,
            logical: 0,
            node_id,
        }
    }

    /// Check if this timestamp is zero/uninitialized.
    pub const fn is_zero(&self) -> bool {
        self.physical == 0 && self.logical == 0
    }

    /// Get physical time as microseconds since epoch.
    pub const fn as_micros(&self) -> u64 {
        self.physical
    }

    /// Get physical time as milliseconds since epoch.
    pub const fn as_millis(&self) -> u64 {
        self.physical / 1000
    }

    /// Pack timestamp into a single u128 for atomic comparison.
    /// Format: [physical:64][logical:48][node_id:16]
    ///
    /// The logical counter and node ID are truncated to 48 and 16 bits, respectively.
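    ///
    /// Round-trip sketch (values chosen arbitrarily, within the field widths above):
    ///
    /// ```
    /// use ringkernel_core::hlc::HlcTimestamp;
    ///
    /// let ts = HlcTimestamp::new(1_700_000_000_000_000, 3, 7);
    /// assert_eq!(HlcTimestamp::unpack(ts.pack()), ts);
    /// ```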
    pub const fn pack(&self) -> u128 {
        ((self.physical as u128) << 64)
            | ((self.logical as u128 & 0xFFFF_FFFF_FFFF) << 16)
            | (self.node_id as u128 & 0xFFFF)
    }

    /// Unpack timestamp from u128.
    pub const fn unpack(packed: u128) -> Self {
        Self {
            physical: (packed >> 64) as u64,
            logical: ((packed >> 16) & 0xFFFF_FFFF_FFFF) as u64,
            node_id: (packed & 0xFFFF) as u64,
        }
    }
}

impl Default for HlcTimestamp {
    fn default() -> Self {
        Self::zero()
    }
}

impl Ord for HlcTimestamp {
    #[inline]
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Compare physical time first
        match self.physical.cmp(&other.physical) {
            std::cmp::Ordering::Equal => {}
            ord => return ord,
        }
        // Then logical counter
        match self.logical.cmp(&other.logical) {
            std::cmp::Ordering::Equal => {}
            ord => return ord,
        }
        // Finally node_id for total ordering
        self.node_id.cmp(&other.node_id)
    }
}

impl PartialOrd for HlcTimestamp {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl std::fmt::Display for HlcTimestamp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "HLC({}.{}.{})",
            self.physical, self.logical, self.node_id
        )
    }
}

/// Hybrid Logical Clock for generating causally-ordered timestamps.
///
/// Thread-safe implementation using atomics for the state.
pub struct HlcClock {
    /// Current physical time (atomically updated).
    physical: AtomicU64,
    /// Current logical counter (atomically updated).
    logical: AtomicU64,
    /// Node identifier.
    node_id: u64,
    /// Maximum allowed clock drift in microseconds.
    max_drift_us: u64,
}

impl HlcClock {
    /// Create a new HLC clock with the given node ID, using the default
    /// maximum skew of [`MAX_CLOCK_SKEW_MS`].
    pub fn new(node_id: u64) -> Self {
        Self::with_max_drift(node_id, MAX_CLOCK_SKEW_MS)
    }

    /// Create a new HLC clock with custom max drift.
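    ///
    /// An illustrative sketch (drift bound and node IDs chosen arbitrarily):
    ///
    /// ```
    /// use ringkernel_core::hlc::{HlcClock, HlcTimestamp};
    ///
    /// // Tolerate at most 100 ms of skew from remote timestamps.
    /// let clock = HlcClock::with_max_drift(1, 100);
    ///
    /// // A timestamp far in the future is rejected by update().
    /// let far_future = HlcTimestamp::new(u64::MAX / 2, 0, 2);
    /// assert!(clock.update(&far_future).is_err());
    /// ```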
    pub fn with_max_drift(node_id: u64, max_drift_ms: u64) -> Self {
        Self {
            physical: AtomicU64::new(Self::wall_time()),
            logical: AtomicU64::new(0),
            node_id,
            max_drift_us: max_drift_ms * 1000,
        }
    }

    /// Get the node ID.
    pub fn node_id(&self) -> u64 {
        self.node_id
    }

    /// Get current timestamp without advancing the clock.
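    ///
    /// A small sketch: `now()` only observes the clock, so it never reads
    /// behind the last `tick()`:
    ///
    /// ```
    /// use ringkernel_core::hlc::HlcClock;
    ///
    /// let clock = HlcClock::new(1);
    /// let ticked = clock.tick();
    /// assert!(clock.now() >= ticked);
    /// ```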
    pub fn now(&self) -> HlcTimestamp {
        let wall = Self::wall_time();
        let physical = self.physical.load(Ordering::Acquire);
        let logical = self.logical.load(Ordering::Acquire);

        // Use the max of the wall clock and the stored physical time. If the
        // wall clock has moved past the stored physical time, the logical
        // counter no longer applies and is reported as zero, mirroring tick().
        let (physical, logical) = if wall > physical {
            (wall, 0)
        } else {
            (physical, logical)
        };

        HlcTimestamp {
            physical,
            logical,
            node_id: self.node_id,
        }
    }

    /// Generate a new timestamp, advancing the clock.
    #[inline]
    pub fn tick(&self) -> HlcTimestamp {
        let wall = Self::wall_time();

        loop {
            let old_physical = self.physical.load(Ordering::Acquire);
            let old_logical = self.logical.load(Ordering::Acquire);

            let (new_physical, new_logical) = if wall > old_physical {
                // Wall clock advanced: use wall time, reset logical
                (wall, 0)
            } else {
                // Same or past: increment logical counter
                (old_physical, old_logical + 1)
            };

            // Try to update atomically
            if self
                .physical
                .compare_exchange(
                    old_physical,
                    new_physical,
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                self.logical.store(new_logical, Ordering::Release);
                return HlcTimestamp {
                    physical: new_physical,
                    logical: new_logical,
                    node_id: self.node_id,
                };
            }
            // CAS failed, retry
        }
    }

    /// Update clock on receiving a message with the given timestamp.
    ///
    /// Returns the new local timestamp that causally follows the received timestamp.
    #[inline]
    pub fn update(&self, received: &HlcTimestamp) -> Result<HlcTimestamp> {
        let wall = Self::wall_time();

        // Check for clock skew
        if received.physical > wall + self.max_drift_us {
            return Err(RingKernelError::ClockSkew {
                skew_ms: (received.physical - wall) / 1000,
                max_ms: self.max_drift_us / 1000,
            });
        }

        loop {
            let old_physical = self.physical.load(Ordering::Acquire);
            let old_logical = self.logical.load(Ordering::Acquire);

            // Take max of wall, local, and received physical
            let max_physical = wall.max(old_physical).max(received.physical);

            let new_logical = if max_physical == old_physical && max_physical == received.physical {
                // All three equal: take max logical + 1
                old_logical.max(received.logical) + 1
            } else if max_physical == old_physical {
                // Local physical wins: increment local logical
                old_logical + 1
            } else if max_physical == received.physical {
                // Received physical wins: use received logical + 1
                received.logical + 1
            } else {
                // Wall clock wins: reset logical
                0
            };

            // Try to update atomically
            if self
                .physical
                .compare_exchange(
                    old_physical,
                    max_physical,
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                self.logical.store(new_logical, Ordering::Release);
                return Ok(HlcTimestamp {
                    physical: max_physical,
                    logical: new_logical,
                    node_id: self.node_id,
                });
            }
            // CAS failed, retry
        }
    }

    /// Get current wall clock time in microseconds.
    #[inline]
    fn wall_time() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            .as_micros() as u64
    }
}

impl std::fmt::Debug for HlcClock {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HlcClock")
            .field("physical", &self.physical.load(Ordering::Relaxed))
            .field("logical", &self.logical.load(Ordering::Relaxed))
            .field("node_id", &self.node_id)
            .finish()
    }
}

/// Compact HLC state for GPU-side storage (16 bytes).
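///
/// Round-trip sketch (values chosen arbitrarily):
///
/// ```
/// use ringkernel_core::hlc::{HlcState, HlcTimestamp};
///
/// let ts = HlcTimestamp::new(1_000, 2, 3);
/// let state = HlcState::from_timestamp(&ts);
/// // The node ID is not stored in HlcState, so it must be supplied again.
/// assert_eq!(state.to_timestamp(3), ts);
/// ```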
#[derive(Debug, Clone, Copy, Default, AsBytes, FromBytes, FromZeroes, Pod, Zeroable)]
#[repr(C, align(16))]
pub struct HlcState {
    /// Physical time in microseconds.
    pub physical: u64,
    /// Logical counter.
    pub logical: u64,
}

impl HlcState {
    /// Create new HLC state.
    pub const fn new(physical: u64, logical: u64) -> Self {
        Self { physical, logical }
    }

    /// Convert to full timestamp with node ID.
    pub const fn to_timestamp(&self, node_id: u64) -> HlcTimestamp {
        HlcTimestamp {
            physical: self.physical,
            logical: self.logical,
            node_id,
        }
    }

    /// Create from full timestamp (drops node_id).
    pub const fn from_timestamp(ts: &HlcTimestamp) -> Self {
        Self {
            physical: ts.physical,
            logical: ts.logical,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_timestamp_ordering() {
        let ts1 = HlcTimestamp::new(100, 0, 1);
        let ts2 = HlcTimestamp::new(100, 1, 1);
        let ts3 = HlcTimestamp::new(101, 0, 1);

        assert!(ts1 < ts2);
        assert!(ts2 < ts3);
        assert!(ts1 < ts3);
    }

    #[test]
    fn test_timestamp_node_id_tiebreak() {
        let ts1 = HlcTimestamp::new(100, 5, 1);
        let ts2 = HlcTimestamp::new(100, 5, 2);

        assert!(ts1 < ts2);
    }

    #[test]
    fn test_clock_tick() {
        let clock = HlcClock::new(1);

        let ts1 = clock.tick();
        let ts2 = clock.tick();
        let ts3 = clock.tick();

        assert!(ts1 < ts2);
        assert!(ts2 < ts3);
    }

    #[test]
    fn test_clock_update() {
        let clock1 = HlcClock::new(1);
        let clock2 = HlcClock::new(2);

        let ts1 = clock1.tick();
        let ts2 = clock2.update(&ts1).unwrap();

        // ts2 should causally follow ts1
        assert!(ts1 < ts2);
    }

    #[test]
    fn test_pack_unpack() {
        let original = HlcTimestamp::new(12345678901234, 42, 7);
        let packed = original.pack();
        let unpacked = HlcTimestamp::unpack(packed);

        assert_eq!(original.physical, unpacked.physical);
        assert_eq!(original.logical, unpacked.logical);
        // node_id is truncated to 16 bits in the packed format; 7 fits, so it round-trips.
        assert_eq!(original.node_id, unpacked.node_id);
    }

    #[test]
    fn test_clock_skew_detection() {
        let clock = HlcClock::with_max_drift(1, 100); // 100ms max drift

        // Create a timestamp far in the future
        let future = HlcTimestamp::new(
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros() as u64
                + 200_000_000, // 200 seconds in future
            0,
            2,
        );

        let result = clock.update(&future);
        assert!(matches!(result, Err(RingKernelError::ClockSkew { .. })));
    }

    #[test]
    fn test_timestamp_display() {
        let ts = HlcTimestamp::new(1234567890, 42, 7);
        let s = format!("{}", ts);
        assert!(s.contains("1234567890"));
        assert!(s.contains("42"));
        assert!(s.contains("7"));
    }
}

#[cfg(test)]
mod proptests {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        #[test]
        fn total_ordering_reflexive(p in 0u64..1_000_000, l in 0u64..1000, n in 0u64..100) {
            let ts = HlcTimestamp::new(p, l, n);
            prop_assert_eq!(ts.cmp(&ts), std::cmp::Ordering::Equal);
        }

        #[test]
        fn total_ordering_antisymmetric(
            p1 in 0u64..1_000_000, l1 in 0u64..1000, n1 in 0u64..100,
            p2 in 0u64..1_000_000, l2 in 0u64..1000, n2 in 0u64..100,
        ) {
            let a = HlcTimestamp::new(p1, l1, n1);
            let b = HlcTimestamp::new(p2, l2, n2);
            if a <= b && b <= a {
                prop_assert_eq!(a, b);
            }
        }

        #[test]
        fn total_ordering_transitive(
            p1 in 0u64..1_000_000, l1 in 0u64..1000, n1 in 0u64..100,
            p2 in 0u64..1_000_000, l2 in 0u64..1000, n2 in 0u64..100,
            p3 in 0u64..1_000_000, l3 in 0u64..1000, n3 in 0u64..100,
        ) {
            let a = HlcTimestamp::new(p1, l1, n1);
            let b = HlcTimestamp::new(p2, l2, n2);
            let c = HlcTimestamp::new(p3, l3, n3);
            if a <= b && b <= c {
                prop_assert!(a <= c);
            }
        }

        #[test]
        fn zero_is_minimum(p in 1u64..1_000_000, l in 0u64..1000, n in 0u64..100) {
            let zero = HlcTimestamp::zero();
            let ts = HlcTimestamp::new(p, l, n);
            prop_assert!(zero < ts);
        }

        #[test]
        fn pack_unpack_roundtrip(
            // Bounds match the packed field widths: logical is 48 bits, node_id is 16 bits.
            p in 0u64..u64::MAX, l in 0u64..0xFFFF_FFFF_FFFF, n in 0u64..0xFFFF,
        ) {
            let ts = HlcTimestamp::new(p, l, n);
            let unpacked = HlcTimestamp::unpack(ts.pack());
            prop_assert_eq!(ts.physical, unpacked.physical);
            prop_assert_eq!(ts.logical, unpacked.logical);
            prop_assert_eq!(ts.node_id, unpacked.node_id);
        }

        #[test]
        fn tick_strictly_increasing(n in 2usize..=20) {
            let clock = HlcClock::new(42);
            let mut prev = clock.tick();
            for _ in 1..n {
                let curr = clock.tick();
                prop_assert!(curr > prev, "tick() must be strictly increasing: {:?} not > {:?}", curr, prev);
                prev = curr;
            }
        }

        #[test]
        fn update_preserves_causality(node_a in 1u64..100, node_b in 100u64..200) {
            let clock_a = HlcClock::new(node_a);
            let clock_b = HlcClock::new(node_b);

            let ts_a = clock_a.tick();
            let ts_b = clock_b.update(&ts_a).unwrap();

            // Received message causality: ts_b must follow ts_a
            prop_assert!(ts_b > ts_a);
        }
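
        // A sketch of an additional property: HlcState drops only the node ID,
        // so converting to state and back with the same node ID must round-trip.
        #[test]
        fn hlc_state_roundtrip(p in 0u64..u64::MAX, l in 0u64..u64::MAX, n in 0u64..u64::MAX) {
            let ts = HlcTimestamp::new(p, l, n);
            let state = HlcState::from_timestamp(&ts);
            prop_assert_eq!(state.to_timestamp(n), ts);
        }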
    }
}