1use bytemuck::{Pod, Zeroable};
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::{SystemTime, UNIX_EPOCH};
27use zerocopy::{AsBytes, FromBytes, FromZeroes};
28
29use crate::error::{Result, RingKernelError};
30
31pub const MAX_CLOCK_SKEW_MS: u64 = 60_000; #[derive(
43 Debug, Clone, Copy, PartialEq, Eq, Hash, AsBytes, FromBytes, FromZeroes, Pod, Zeroable,
44)]
45#[repr(C, align(8))]
46pub struct HlcTimestamp {
47 pub physical: u64,
49 pub logical: u64,
51 pub node_id: u64,
53}
54
55impl HlcTimestamp {
56 pub const fn new(physical: u64, logical: u64, node_id: u64) -> Self {
58 Self {
59 physical,
60 logical,
61 node_id,
62 }
63 }
64
65 pub const fn zero() -> Self {
67 Self {
68 physical: 0,
69 logical: 0,
70 node_id: 0,
71 }
72 }
73
74 #[inline]
76 pub fn now(node_id: u64) -> Self {
77 let physical = SystemTime::now()
78 .duration_since(UNIX_EPOCH)
79 .expect("Time went backwards")
80 .as_micros() as u64;
81
82 Self {
83 physical,
84 logical: 0,
85 node_id,
86 }
87 }
88
89 pub const fn is_zero(&self) -> bool {
91 self.physical == 0 && self.logical == 0
92 }
93
94 pub const fn as_micros(&self) -> u64 {
96 self.physical
97 }
98
99 pub const fn as_millis(&self) -> u64 {
101 self.physical / 1000
102 }
103
104 pub const fn pack(&self) -> u128 {
107 ((self.physical as u128) << 64)
108 | ((self.logical as u128) << 16)
109 | (self.node_id as u128 & 0xFFFF)
110 }
111
112 pub const fn unpack(packed: u128) -> Self {
114 Self {
115 physical: (packed >> 64) as u64,
116 logical: ((packed >> 16) & 0xFFFF_FFFF_FFFF) as u64,
117 node_id: (packed & 0xFFFF) as u64,
118 }
119 }
120}
121
122impl Default for HlcTimestamp {
123 fn default() -> Self {
124 Self::zero()
125 }
126}
127
128impl Ord for HlcTimestamp {
129 #[inline]
130 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
131 match self.physical.cmp(&other.physical) {
133 std::cmp::Ordering::Equal => {}
134 ord => return ord,
135 }
136 match self.logical.cmp(&other.logical) {
138 std::cmp::Ordering::Equal => {}
139 ord => return ord,
140 }
141 self.node_id.cmp(&other.node_id)
143 }
144}
145
146impl PartialOrd for HlcTimestamp {
147 #[inline]
148 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
149 Some(self.cmp(other))
150 }
151}
152
153impl std::fmt::Display for HlcTimestamp {
154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155 write!(
156 f,
157 "HLC({}.{}.{})",
158 self.physical, self.logical, self.node_id
159 )
160 }
161}
162
/// A thread-safe hybrid logical clock for a single node.
pub struct HlcClock {
    // Last observed physical time, microseconds since the Unix epoch.
    physical: AtomicU64,
    // Logical counter disambiguating events within the same microsecond.
    logical: AtomicU64,
    // Node identifier stamped into every timestamp this clock produces.
    node_id: u64,
    // Maximum tolerated skew for received timestamps, in microseconds.
    max_drift_us: u64,
}
176
177impl HlcClock {
178 pub fn new(node_id: u64) -> Self {
180 let now = SystemTime::now()
181 .duration_since(UNIX_EPOCH)
182 .expect("Time went backwards")
183 .as_micros() as u64;
184
185 Self {
186 physical: AtomicU64::new(now),
187 logical: AtomicU64::new(0),
188 node_id,
189 max_drift_us: MAX_CLOCK_SKEW_MS * 1000,
190 }
191 }
192
193 pub fn with_max_drift(node_id: u64, max_drift_ms: u64) -> Self {
195 let now = SystemTime::now()
196 .duration_since(UNIX_EPOCH)
197 .expect("Time went backwards")
198 .as_micros() as u64;
199
200 Self {
201 physical: AtomicU64::new(now),
202 logical: AtomicU64::new(0),
203 node_id,
204 max_drift_us: max_drift_ms * 1000,
205 }
206 }
207
208 pub fn node_id(&self) -> u64 {
210 self.node_id
211 }
212
213 pub fn now(&self) -> HlcTimestamp {
215 let wall = Self::wall_time();
216 let physical = self.physical.load(Ordering::Acquire);
217 let logical = self.logical.load(Ordering::Acquire);
218
219 let new_physical = physical.max(wall);
221
222 HlcTimestamp {
223 physical: new_physical,
224 logical,
225 node_id: self.node_id,
226 }
227 }
228
229 #[inline]
231 pub fn tick(&self) -> HlcTimestamp {
232 let wall = Self::wall_time();
233
234 loop {
235 let old_physical = self.physical.load(Ordering::Acquire);
236 let old_logical = self.logical.load(Ordering::Acquire);
237
238 let (new_physical, new_logical) = if wall > old_physical {
239 (wall, 0)
241 } else {
242 (old_physical, old_logical + 1)
244 };
245
246 if self
248 .physical
249 .compare_exchange(
250 old_physical,
251 new_physical,
252 Ordering::Release,
253 Ordering::Relaxed,
254 )
255 .is_ok()
256 {
257 self.logical.store(new_logical, Ordering::Release);
258 return HlcTimestamp {
259 physical: new_physical,
260 logical: new_logical,
261 node_id: self.node_id,
262 };
263 }
264 }
266 }
267
268 #[inline]
272 pub fn update(&self, received: &HlcTimestamp) -> Result<HlcTimestamp> {
273 let wall = Self::wall_time();
274
275 if received.physical > wall + self.max_drift_us {
277 return Err(RingKernelError::ClockSkew {
278 skew_ms: (received.physical - wall) / 1000,
279 max_ms: self.max_drift_us / 1000,
280 });
281 }
282
283 loop {
284 let old_physical = self.physical.load(Ordering::Acquire);
285 let old_logical = self.logical.load(Ordering::Acquire);
286
287 let max_physical = wall.max(old_physical).max(received.physical);
289
290 let new_logical = if max_physical == old_physical && max_physical == received.physical {
291 old_logical.max(received.logical) + 1
293 } else if max_physical == old_physical {
294 old_logical + 1
296 } else if max_physical == received.physical {
297 received.logical + 1
299 } else {
300 0
302 };
303
304 if self
306 .physical
307 .compare_exchange(
308 old_physical,
309 max_physical,
310 Ordering::Release,
311 Ordering::Relaxed,
312 )
313 .is_ok()
314 {
315 self.logical.store(new_logical, Ordering::Release);
316 return Ok(HlcTimestamp {
317 physical: max_physical,
318 logical: new_logical,
319 node_id: self.node_id,
320 });
321 }
322 }
324 }
325
326 #[inline]
328 fn wall_time() -> u64 {
329 SystemTime::now()
330 .duration_since(UNIX_EPOCH)
331 .expect("Time went backwards")
332 .as_micros() as u64
333 }
334}
335
336impl std::fmt::Debug for HlcClock {
337 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
338 f.debug_struct("HlcClock")
339 .field("physical", &self.physical.load(Ordering::Relaxed))
340 .field("logical", &self.logical.load(Ordering::Relaxed))
341 .field("node_id", &self.node_id)
342 .finish()
343 }
344}
345
/// Node-independent HLC state: just the physical and logical components,
/// laid out for zero-copy transfer via the derived traits.
#[derive(Debug, Clone, Copy, Default, AsBytes, FromBytes, FromZeroes, Pod, Zeroable)]
#[repr(C, align(16))]
pub struct HlcState {
    /// Wall-clock component, in microseconds since the Unix epoch.
    pub physical: u64,
    /// Logical counter component.
    pub logical: u64,
}
355
356impl HlcState {
357 pub const fn new(physical: u64, logical: u64) -> Self {
359 Self { physical, logical }
360 }
361
362 pub const fn to_timestamp(&self, node_id: u64) -> HlcTimestamp {
364 HlcTimestamp {
365 physical: self.physical,
366 logical: self.logical,
367 node_id,
368 }
369 }
370
371 pub const fn from_timestamp(ts: &HlcTimestamp) -> Self {
373 Self {
374 physical: ts.physical,
375 logical: ts.logical,
376 }
377 }
378}
379
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_timestamp_ordering() {
        let ts1 = HlcTimestamp::new(100, 0, 1);
        let ts2 = HlcTimestamp::new(100, 1, 1);
        let ts3 = HlcTimestamp::new(101, 0, 1);

        assert!(ts1 < ts2);
        assert!(ts2 < ts3);
        assert!(ts1 < ts3);
    }

    #[test]
    fn test_timestamp_node_id_tiebreak() {
        let ts1 = HlcTimestamp::new(100, 5, 1);
        let ts2 = HlcTimestamp::new(100, 5, 2);

        assert!(ts1 < ts2);
    }

    #[test]
    fn test_clock_tick() {
        let clock = HlcClock::new(1);

        let ts1 = clock.tick();
        let ts2 = clock.tick();
        let ts3 = clock.tick();

        assert!(ts1 < ts2);
        assert!(ts2 < ts3);
    }

    #[test]
    fn test_clock_update() {
        let clock1 = HlcClock::new(1);
        let clock2 = HlcClock::new(2);

        let ts1 = clock1.tick();
        let ts2 = clock2.update(&ts1).unwrap();

        assert!(ts1 < ts2);
    }

    #[test]
    fn test_pack_unpack() {
        let original = HlcTimestamp::new(12345678901234, 42, 7);
        let packed = original.pack();
        let unpacked = HlcTimestamp::unpack(packed);

        assert_eq!(original.physical, unpacked.physical);
        assert_eq!(original.logical, unpacked.logical);
        // BUG FIX: the node id round-trip was previously unchecked.
        assert_eq!(original.node_id, unpacked.node_id);
    }

    #[test]
    fn test_clock_skew_detection() {
        // Tolerate 100 ms of drift; the received timestamp is ~200 s ahead.
        let clock = HlcClock::with_max_drift(1, 100);
        let future = HlcTimestamp::new(
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros() as u64
                + 200_000_000,
            0,
            2,
        );

        let result = clock.update(&future);
        assert!(matches!(result, Err(RingKernelError::ClockSkew { .. })));
    }

    #[test]
    fn test_timestamp_display() {
        let ts = HlcTimestamp::new(1234567890, 42, 7);
        let s = format!("{}", ts);
        assert!(s.contains("1234567890"));
        assert!(s.contains("42"));
        assert!(s.contains("7"));
    }
}
466
// Property-based tests: randomized checks of the ordering laws, the
// pack/unpack round-trip (within the 48-bit logical / 16-bit node-id
// packing limits), and clock monotonicity/causality.
#[cfg(test)]
mod proptests {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        // cmp must be reflexive: every timestamp equals itself.
        #[test]
        fn total_ordering_reflexive(p in 0u64..1_000_000, l in 0u64..1000, n in 0u64..100) {
            let ts = HlcTimestamp::new(p, l, n);
            prop_assert_eq!(ts.cmp(&ts), std::cmp::Ordering::Equal);
        }

        // Antisymmetry: a <= b and b <= a implies a == b.
        #[test]
        fn total_ordering_antisymmetric(
            p1 in 0u64..1_000_000, l1 in 0u64..1000, n1 in 0u64..100,
            p2 in 0u64..1_000_000, l2 in 0u64..1000, n2 in 0u64..100,
        ) {
            let a = HlcTimestamp::new(p1, l1, n1);
            let b = HlcTimestamp::new(p2, l2, n2);
            if a <= b && b <= a {
                prop_assert_eq!(a, b);
            }
        }

        // Transitivity: a <= b and b <= c implies a <= c.
        #[test]
        fn total_ordering_transitive(
            p1 in 0u64..1_000_000, l1 in 0u64..1000, n1 in 0u64..100,
            p2 in 0u64..1_000_000, l2 in 0u64..1000, n2 in 0u64..100,
            p3 in 0u64..1_000_000, l3 in 0u64..1000, n3 in 0u64..100,
        ) {
            let a = HlcTimestamp::new(p1, l1, n1);
            let b = HlcTimestamp::new(p2, l2, n2);
            let c = HlcTimestamp::new(p3, l3, n3);
            if a <= b && b <= c {
                prop_assert!(a <= c);
            }
        }

        // zero() is strictly below any timestamp with nonzero physical time
        // (p starts at 1, so the zero/zero/zero corner case is excluded).
        #[test]
        fn zero_is_minimum(p in 1u64..1_000_000, l in 0u64..1000, n in 0u64..100) {
            let zero = HlcTimestamp::zero();
            let ts = HlcTimestamp::new(p, l, n);
            prop_assert!(zero < ts);
        }

        // Round-trip holds when logical fits in 48 bits and node_id in 16
        // bits — the packing format's capacity (ranges below are exclusive).
        #[test]
        fn pack_unpack_preserves_physical_and_logical(
            p in 0u64..u64::MAX, l in 0u64..0xFFFF_FFFF_FFFF, n in 0u64..0xFFFF,
        ) {
            let ts = HlcTimestamp::new(p, l, n);
            let unpacked = HlcTimestamp::unpack(ts.pack());
            prop_assert_eq!(ts.physical, unpacked.physical);
            prop_assert_eq!(ts.logical, unpacked.logical);
            prop_assert_eq!(ts.node_id, unpacked.node_id);
        }

        // Single-threaded tick() must be strictly increasing.
        // NOTE(review): this does not exercise concurrent tick() calls, so
        // it cannot catch races between the clock's two atomics.
        #[test]
        fn tick_strictly_increasing(n in 2usize..=20) {
            let clock = HlcClock::new(42);
            let mut prev = clock.tick();
            for _ in 1..n {
                let curr = clock.tick();
                prop_assert!(curr > prev, "tick() must be strictly increasing: {:?} not > {:?}", curr, prev);
                prev = curr;
            }
        }

        // Receiving a timestamp must advance the receiver strictly past it
        // (HLC causality). Node ranges are disjoint so ids never collide.
        #[test]
        fn update_preserves_causality(node_a in 1u64..100, node_b in 100u64..200) {
            let clock_a = HlcClock::new(node_a);
            let clock_b = HlcClock::new(node_b);

            let ts_a = clock_a.tick();
            let ts_b = clock_b.update(&ts_a).unwrap();

            prop_assert!(ts_b > ts_a);
        }
    }
}