1use bytemuck::{Pod, Zeroable};
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::{SystemTime, UNIX_EPOCH};
27use zerocopy::{AsBytes, FromBytes, FromZeroes};
28
29use crate::error::{Result, RingKernelError};
30
/// Maximum tolerated forward clock skew between nodes, in milliseconds.
/// [`HlcClock::update`] rejects remote timestamps further than this ahead
/// of local wall time (converted to microseconds internally).
pub const MAX_CLOCK_SKEW_MS: u64 = 60_000;

/// A hybrid logical clock timestamp.
///
/// Combines a physical wall-clock reading (microseconds since the Unix
/// epoch, see [`HlcTimestamp::now`]), a logical counter that orders events
/// sharing the same physical value, and the id of the originating node as
/// the final tie-breaker. `repr(C, align(8))` plus the zerocopy/bytemuck
/// derives allow zero-copy (de)serialization of the raw bytes.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Hash,
    AsBytes,
    FromBytes,
    FromZeroes,
    Pod,
    Zeroable,
    rkyv::Archive,
    rkyv::Serialize,
    rkyv::Deserialize,
)]
#[archive(compare(PartialEq))]
#[repr(C, align(8))]
pub struct HlcTimestamp {
    /// Physical component: microseconds since the Unix epoch.
    pub physical: u64,
    /// Logical counter ordering events within the same physical tick.
    pub logical: u64,
    /// Identifier of the node that produced this timestamp.
    pub node_id: u64,
}
68
69impl HlcTimestamp {
70 pub const fn new(physical: u64, logical: u64, node_id: u64) -> Self {
72 Self {
73 physical,
74 logical,
75 node_id,
76 }
77 }
78
79 pub const fn zero() -> Self {
81 Self {
82 physical: 0,
83 logical: 0,
84 node_id: 0,
85 }
86 }
87
88 #[inline]
90 pub fn now(node_id: u64) -> Self {
91 let physical = SystemTime::now()
92 .duration_since(UNIX_EPOCH)
93 .expect("Time went backwards")
94 .as_micros() as u64;
95
96 Self {
97 physical,
98 logical: 0,
99 node_id,
100 }
101 }
102
103 pub const fn is_zero(&self) -> bool {
105 self.physical == 0 && self.logical == 0
106 }
107
108 pub const fn as_micros(&self) -> u64 {
110 self.physical
111 }
112
113 pub const fn as_millis(&self) -> u64 {
115 self.physical / 1000
116 }
117
118 pub const fn pack(&self) -> u128 {
121 ((self.physical as u128) << 64)
122 | ((self.logical as u128) << 16)
123 | (self.node_id as u128 & 0xFFFF)
124 }
125
126 pub const fn unpack(packed: u128) -> Self {
128 Self {
129 physical: (packed >> 64) as u64,
130 logical: ((packed >> 16) & 0xFFFF_FFFF_FFFF) as u64,
131 node_id: (packed & 0xFFFF) as u64,
132 }
133 }
134}
135
136impl Default for HlcTimestamp {
137 fn default() -> Self {
138 Self::zero()
139 }
140}
141
142impl Ord for HlcTimestamp {
143 #[inline]
144 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
145 match self.physical.cmp(&other.physical) {
147 std::cmp::Ordering::Equal => {}
148 ord => return ord,
149 }
150 match self.logical.cmp(&other.logical) {
152 std::cmp::Ordering::Equal => {}
153 ord => return ord,
154 }
155 self.node_id.cmp(&other.node_id)
157 }
158}
159
160impl PartialOrd for HlcTimestamp {
161 #[inline]
162 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
163 Some(self.cmp(other))
164 }
165}
166
167impl std::fmt::Display for HlcTimestamp {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 write!(
170 f,
171 "HLC({}.{}.{})",
172 self.physical, self.logical, self.node_id
173 )
174 }
175}
176
/// A hybrid logical clock owned by a single node.
///
/// State is held in two atomics so the clock can be shared across threads
/// without a lock.
pub struct HlcClock {
    /// Last observed physical component (microseconds since the Unix epoch).
    physical: AtomicU64,
    /// Logical counter ordering events within the current physical tick.
    logical: AtomicU64,
    /// Identifier of the node owning this clock; stamped on every timestamp.
    node_id: u64,
    /// Maximum accepted forward skew for remote timestamps, in microseconds.
    max_drift_us: u64,
}
190
impl HlcClock {
    /// Creates a clock seeded from the current wall time with the default
    /// skew bound [`MAX_CLOCK_SKEW_MS`].
    ///
    /// # Panics
    /// Panics if the system clock reads earlier than the Unix epoch.
    pub fn new(node_id: u64) -> Self {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            .as_micros() as u64;

        Self {
            physical: AtomicU64::new(now),
            logical: AtomicU64::new(0),
            node_id,
            max_drift_us: MAX_CLOCK_SKEW_MS * 1000,
        }
    }

    /// Like [`new`](Self::new), but with a caller-chosen skew bound in
    /// milliseconds.
    ///
    /// # Panics
    /// Panics if the system clock reads earlier than the Unix epoch.
    pub fn with_max_drift(node_id: u64, max_drift_ms: u64) -> Self {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            .as_micros() as u64;

        Self {
            physical: AtomicU64::new(now),
            logical: AtomicU64::new(0),
            node_id,
            max_drift_us: max_drift_ms * 1000,
        }
    }

    /// Returns this clock's node identifier.
    pub fn node_id(&self) -> u64 {
        self.node_id
    }

    /// Reads the current HLC value without advancing the clock.
    ///
    /// The returned physical component is the max of stored state and wall
    /// time, but neither atomic is written.
    // NOTE(review): the two loads are separate atomic operations, not one
    // snapshot — a concurrent tick()/update() between them can yield a
    // torn (physical, logical) pair. Confirm whether callers tolerate this.
    pub fn now(&self) -> HlcTimestamp {
        let wall = Self::wall_time();
        let physical = self.physical.load(Ordering::Acquire);
        let logical = self.logical.load(Ordering::Acquire);

        let new_physical = physical.max(wall);

        HlcTimestamp {
            physical: new_physical,
            logical,
            node_id: self.node_id,
        }
    }

    /// Advances the clock for a local event and returns the new timestamp.
    ///
    /// If wall time moved past the stored physical component, the physical
    /// part advances and the logical counter resets; otherwise the logical
    /// counter increments.
    #[inline]
    pub fn tick(&self) -> HlcTimestamp {
        let wall = Self::wall_time();

        loop {
            let old_physical = self.physical.load(Ordering::Acquire);
            let old_logical = self.logical.load(Ordering::Acquire);

            let (new_physical, new_logical) = if wall > old_physical {
                // Wall clock advanced: adopt it and reset the counter.
                (wall, 0)
            } else {
                // Same (or regressed) wall tick: bump the logical counter.
                (old_physical, old_logical + 1)
            };

            // NOTE(review): `physical` and `logical` are distinct atomics.
            // When wall time has not advanced, this CAS compares `physical`
            // against an unchanged value and therefore succeeds for several
            // concurrent callers, each storing `old_logical + 1` — so
            // concurrent ticks can emit duplicate timestamps. Confirm
            // whether single-threaded ticking is assumed, or pack both
            // fields into one atomic word.
            if self
                .physical
                .compare_exchange(
                    old_physical,
                    new_physical,
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                self.logical.store(new_logical, Ordering::Release);
                return HlcTimestamp {
                    physical: new_physical,
                    logical: new_logical,
                    node_id: self.node_id,
                };
            }
        }
    }

    /// Merges a timestamp received from another node (standard HLC receive
    /// rule) and returns the new local timestamp.
    ///
    /// # Errors
    /// Returns [`RingKernelError::ClockSkew`] when the remote physical time
    /// is more than `max_drift_us` ahead of local wall time. Skew behind
    /// local time is tolerated (the max() below absorbs it).
    #[inline]
    pub fn update(&self, received: &HlcTimestamp) -> Result<HlcTimestamp> {
        let wall = Self::wall_time();

        if received.physical > wall + self.max_drift_us {
            return Err(RingKernelError::ClockSkew {
                skew_ms: (received.physical - wall) / 1000,
                max_ms: self.max_drift_us / 1000,
            });
        }

        loop {
            let old_physical = self.physical.load(Ordering::Acquire);
            let old_logical = self.logical.load(Ordering::Acquire);

            // New physical part is the max of wall, local, and remote.
            let max_physical = wall.max(old_physical).max(received.physical);

            // HLC receive rule: increment past whichever logical counters
            // belong to the winning physical value.
            let new_logical = if max_physical == old_physical && max_physical == received.physical {
                old_logical.max(received.logical) + 1
            } else if max_physical == old_physical {
                old_logical + 1
            } else if max_physical == received.physical {
                received.logical + 1
            } else {
                0
            };

            // NOTE(review): same two-atomic publication pattern as tick();
            // the CAS can succeed trivially for concurrent callers when the
            // physical value is unchanged. See the note in tick().
            if self
                .physical
                .compare_exchange(
                    old_physical,
                    max_physical,
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                self.logical.store(new_logical, Ordering::Release);
                return Ok(HlcTimestamp {
                    physical: max_physical,
                    logical: new_logical,
                    node_id: self.node_id,
                });
            }
        }
    }

    /// Current wall time in microseconds since the Unix epoch.
    ///
    /// # Panics
    /// Panics if the system clock reads earlier than the Unix epoch.
    #[inline]
    fn wall_time() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            .as_micros() as u64
    }
}
349
350impl std::fmt::Debug for HlcClock {
351 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
352 f.debug_struct("HlcClock")
353 .field("physical", &self.physical.load(Ordering::Relaxed))
354 .field("logical", &self.logical.load(Ordering::Relaxed))
355 .field("node_id", &self.node_id)
356 .finish()
357 }
358}
359
/// Persistable clock state: the node-independent part of an HLC timestamp.
/// `repr(C, align(16))` plus the zerocopy/bytemuck derives allow zero-copy
/// (de)serialization of the raw bytes.
#[derive(Debug, Clone, Copy, Default, AsBytes, FromBytes, FromZeroes, Pod, Zeroable)]
#[repr(C, align(16))]
pub struct HlcState {
    /// Physical component (microseconds since the Unix epoch).
    pub physical: u64,
    /// Logical counter paired with `physical`.
    pub logical: u64,
}
369
370impl HlcState {
371 pub const fn new(physical: u64, logical: u64) -> Self {
373 Self { physical, logical }
374 }
375
376 pub const fn to_timestamp(&self, node_id: u64) -> HlcTimestamp {
378 HlcTimestamp {
379 physical: self.physical,
380 logical: self.logical,
381 node_id,
382 }
383 }
384
385 pub const fn from_timestamp(ts: &HlcTimestamp) -> Self {
387 Self {
388 physical: ts.physical,
389 logical: ts.logical,
390 }
391 }
392}
393
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_timestamp_ordering() {
        let ts1 = HlcTimestamp::new(100, 0, 1);
        let ts2 = HlcTimestamp::new(100, 1, 1);
        let ts3 = HlcTimestamp::new(101, 0, 1);

        assert!(ts1 < ts2);
        assert!(ts2 < ts3);
        assert!(ts1 < ts3);
    }

    #[test]
    fn test_timestamp_node_id_tiebreak() {
        let ts1 = HlcTimestamp::new(100, 5, 1);
        let ts2 = HlcTimestamp::new(100, 5, 2);

        assert!(ts1 < ts2);
    }

    #[test]
    fn test_clock_tick() {
        let clock = HlcClock::new(1);

        let ts1 = clock.tick();
        let ts2 = clock.tick();
        let ts3 = clock.tick();

        assert!(ts1 < ts2);
        assert!(ts2 < ts3);
    }

    #[test]
    fn test_clock_update() {
        let clock1 = HlcClock::new(1);
        let clock2 = HlcClock::new(2);

        let ts1 = clock1.tick();
        let ts2 = clock2.update(&ts1).unwrap();

        assert!(ts1 < ts2);
    }

    #[test]
    fn test_pack_unpack() {
        let original = HlcTimestamp::new(12345678901234, 42, 7);
        let packed = original.pack();
        let unpacked = HlcTimestamp::unpack(packed);

        assert_eq!(original.physical, unpacked.physical);
        assert_eq!(original.logical, unpacked.logical);
        // node_id fits in the 16-bit packed field, so it must round-trip
        // too; the previous version never checked it.
        assert_eq!(original.node_id, unpacked.node_id);
    }

    #[test]
    fn test_clock_skew_detection() {
        // 100 ms max drift; a timestamp ~200 s in the future must be rejected.
        let clock = HlcClock::with_max_drift(1, 100);
        let future = HlcTimestamp::new(
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros() as u64
                + 200_000_000,
            0,
            2,
        );

        let result = clock.update(&future);
        assert!(matches!(result, Err(RingKernelError::ClockSkew { .. })));
    }

    #[test]
    fn test_timestamp_display() {
        let ts = HlcTimestamp::new(1234567890, 42, 7);
        let s = format!("{}", ts);
        assert!(s.contains("1234567890"));
        assert!(s.contains("42"));
        assert!(s.contains("7"));
    }
}
480
#[cfg(test)]
mod proptests {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        // cmp must be reflexive: every timestamp compares equal to itself.
        #[test]
        fn total_ordering_reflexive(p in 0u64..1_000_000, l in 0u64..1000, n in 0u64..100) {
            let t = HlcTimestamp::new(p, l, n);
            prop_assert_eq!(t.cmp(&t), std::cmp::Ordering::Equal);
        }

        // Antisymmetry: mutual <= implies equality.
        #[test]
        fn total_ordering_antisymmetric(
            p1 in 0u64..1_000_000, l1 in 0u64..1000, n1 in 0u64..100,
            p2 in 0u64..1_000_000, l2 in 0u64..1000, n2 in 0u64..100,
        ) {
            let lhs = HlcTimestamp::new(p1, l1, n1);
            let rhs = HlcTimestamp::new(p2, l2, n2);
            if lhs <= rhs && rhs <= lhs {
                prop_assert_eq!(lhs, rhs);
            }
        }

        // Transitivity across three arbitrary timestamps.
        #[test]
        fn total_ordering_transitive(
            p1 in 0u64..1_000_000, l1 in 0u64..1000, n1 in 0u64..100,
            p2 in 0u64..1_000_000, l2 in 0u64..1000, n2 in 0u64..100,
            p3 in 0u64..1_000_000, l3 in 0u64..1000, n3 in 0u64..100,
        ) {
            let first = HlcTimestamp::new(p1, l1, n1);
            let second = HlcTimestamp::new(p2, l2, n2);
            let third = HlcTimestamp::new(p3, l3, n3);
            if first <= second && second <= third {
                prop_assert!(first <= third);
            }
        }

        // The zero timestamp is strictly below any nonzero-physical one.
        #[test]
        fn zero_is_minimum(p in 1u64..1_000_000, l in 0u64..1000, n in 0u64..100) {
            let floor = HlcTimestamp::zero();
            let other = HlcTimestamp::new(p, l, n);
            prop_assert!(floor < other);
        }

        // pack/unpack round-trips all three fields within packed ranges.
        #[test]
        fn pack_unpack_preserves_physical_and_logical(
            p in 0u64..u64::MAX, l in 0u64..0xFFFF_FFFF_FFFF, n in 0u64..0xFFFF,
        ) {
            let before = HlcTimestamp::new(p, l, n);
            let after = HlcTimestamp::unpack(before.pack());
            prop_assert_eq!(before.physical, after.physical);
            prop_assert_eq!(before.logical, after.logical);
            prop_assert_eq!(before.node_id, after.node_id);
        }

        // Consecutive ticks on one clock are strictly increasing.
        #[test]
        fn tick_strictly_increasing(n in 2usize..=20) {
            let clock = HlcClock::new(42);
            let mut last = clock.tick();
            for _ in 1..n {
                let next = clock.tick();
                prop_assert!(next > last, "tick() must be strictly increasing: {:?} not > {:?}", next, last);
                last = next;
            }
        }

        // Receiving a remote timestamp always produces something later.
        #[test]
        fn update_preserves_causality(node_a in 1u64..100, node_b in 100u64..200) {
            let clock_a = HlcClock::new(node_a);
            let clock_b = HlcClock::new(node_b);

            let ts_a = clock_a.tick();
            let ts_b = clock_b.update(&ts_a).unwrap();

            prop_assert!(ts_b > ts_a);
        }
    }
}