1use std::sync::atomic::{AtomicU64, Ordering};
19use std::time::{SystemTime, UNIX_EPOCH};
20use zerocopy::{FromBytes, IntoBytes};
21
/// A hybrid-logical-clock timestamp packed into a single `u64`.
///
/// The low 20 bits hold a logical counter and the remaining high 44 bits hold
/// the physical component in milliseconds. Because the physical part occupies
/// the high-order bits, the derived ordering on the packed value sorts by
/// physical time first and by logical counter second. The derived `Default`
/// is the all-zero timestamp.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, IntoBytes, FromBytes,
)]
#[repr(transparent)]
pub struct HlcTimestamp(u64);
33
34impl HlcTimestamp {
35 const LOGICAL_BITS: u32 = 20;
37
38 const LOGICAL_MASK: u64 = (1 << Self::LOGICAL_BITS) - 1;
40
41 pub const MAX_LOGICAL: u32 = (1 << Self::LOGICAL_BITS) - 1;
43
44 #[inline]
50 #[must_use]
51 pub const fn new(physical_ms: u64, logical: u32) -> Self {
52 debug_assert!(logical < (1 << Self::LOGICAL_BITS));
53 Self((physical_ms << Self::LOGICAL_BITS) | (logical as u64))
54 }
55
56 #[inline]
58 #[must_use]
59 pub const fn from_raw(raw: u64) -> Self {
60 Self(raw)
61 }
62
63 #[inline]
65 #[must_use]
66 pub const fn zero() -> Self {
67 Self(0)
68 }
69
70 #[inline]
72 #[must_use]
73 pub const fn max() -> Self {
74 Self(u64::MAX)
75 }
76
77 #[inline]
79 #[must_use]
80 pub const fn physical_ms(&self) -> u64 {
81 self.0 >> Self::LOGICAL_BITS
82 }
83
84 #[inline]
86 #[must_use]
87 pub const fn logical(&self) -> u32 {
88 (self.0 & Self::LOGICAL_MASK) as u32
89 }
90
91 #[inline]
93 #[must_use]
94 pub const fn as_u64(&self) -> u64 {
95 self.0
96 }
97
98 #[inline]
102 #[must_use]
103 pub const fn to_nanos_approx(&self) -> u64 {
104 self.physical_ms() * 1_000_000
105 }
106}
107
108impl std::fmt::Display for HlcTimestamp {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 write!(f, "HLC({}.{})", self.physical_ms(), self.logical())
111 }
112}
113
/// A hybrid logical clock that issues [`HlcTimestamp`] values.
///
/// The whole clock state is a single `AtomicU64` that is advanced with
/// compare-and-swap loops, which is why the issuing methods take `&self`.
pub struct HybridLogicalClock {
    /// Packed value of the most recently issued timestamp (`0` until one is issued).
    last: AtomicU64,
    /// Identifier supplied at construction and returned by `node_id()`; the
    /// code in this file does not encode it into the timestamps.
    node_id: u16,
}
134
135impl HybridLogicalClock {
136 pub const MAX_DRIFT_WARNING_MS: u64 = 100;
138
139 pub const MAX_DRIFT_CRITICAL_MS: u64 = 1000;
141
142 #[must_use]
144 pub fn new(node_id: u16) -> Self {
145 Self {
146 last: AtomicU64::new(0),
147 node_id,
148 }
149 }
150
151 #[inline]
153 #[must_use]
154 pub const fn node_id(&self) -> u16 {
155 self.node_id
156 }
157
158 pub fn now(&self) -> HlcTimestamp {
168 let wall_ms = Self::wall_clock_ms();
169
170 loop {
171 let last = HlcTimestamp(self.last.load(Ordering::Acquire));
172 let last_physical = last.physical_ms();
173 let last_logical = last.logical();
174
175 let (new_physical, new_logical) = if wall_ms > last_physical {
176 (wall_ms, 0)
178 } else {
179 if last_logical >= HlcTimestamp::MAX_LOGICAL {
181 std::hint::spin_loop();
184 continue;
185 }
186 (last_physical, last_logical + 1)
187 };
188
189 let new_ts = HlcTimestamp::new(new_physical, new_logical);
190
191 if self
193 .last
194 .compare_exchange_weak(last.0, new_ts.0, Ordering::AcqRel, Ordering::Acquire)
195 .is_ok()
196 {
197 return new_ts;
198 }
199 }
201 }
202
203 pub fn receive(&self, remote_ts: HlcTimestamp) -> HlcTimestamp {
215 let wall_ms = Self::wall_clock_ms();
216
217 loop {
218 let last = HlcTimestamp(self.last.load(Ordering::Acquire));
219
220 let (new_physical, new_logical) =
221 if wall_ms > last.physical_ms() && wall_ms > remote_ts.physical_ms() {
222 (wall_ms, 0)
224 } else if last.physical_ms() > remote_ts.physical_ms() {
225 (last.physical_ms(), last.logical().saturating_add(1))
227 } else if remote_ts.physical_ms() > last.physical_ms() {
228 (
230 remote_ts.physical_ms(),
231 remote_ts.logical().saturating_add(1),
232 )
233 } else {
234 let max_logical = last.logical().max(remote_ts.logical());
236 (last.physical_ms(), max_logical.saturating_add(1))
237 };
238
239 let new_logical = new_logical.min(HlcTimestamp::MAX_LOGICAL);
241 let new_ts = HlcTimestamp::new(new_physical, new_logical);
242
243 if self
244 .last
245 .compare_exchange_weak(last.0, new_ts.0, Ordering::AcqRel, Ordering::Acquire)
246 .is_ok()
247 {
248 return new_ts;
249 }
250 }
251 }
252
253 #[inline]
257 #[must_use]
258 pub fn current(&self) -> HlcTimestamp {
259 HlcTimestamp(self.last.load(Ordering::Acquire))
260 }
261
262 #[must_use]
266 pub fn drift_ms(&self) -> i64 {
267 let wall_ms = Self::wall_clock_ms();
268 let hlc_physical = self.current().physical_ms();
269 #[allow(clippy::cast_possible_wrap)]
270 let result = hlc_physical as i64 - wall_ms as i64;
271 result
272 }
273
274 #[must_use]
276 pub fn health(&self) -> ClockHealth {
277 let drift = self.drift_ms().unsigned_abs();
278
279 if drift > Self::MAX_DRIFT_CRITICAL_MS {
280 ClockHealth::Critical { drift_ms: drift }
281 } else if drift > Self::MAX_DRIFT_WARNING_MS {
282 ClockHealth::Degraded { drift_ms: drift }
283 } else {
284 ClockHealth::Healthy
285 }
286 }
287
288 #[inline]
290 fn wall_clock_ms() -> u64 {
291 SystemTime::now()
292 .duration_since(UNIX_EPOCH)
293 .map(|d| {
294 #[allow(clippy::cast_possible_truncation)]
295 let ms = d.as_millis() as u64;
296 ms
297 })
298 .unwrap_or(0)
299 }
300}
301
302impl std::fmt::Debug for HybridLogicalClock {
303 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304 f.debug_struct("HybridLogicalClock")
305 .field("node_id", &self.node_id)
306 .field("current", &self.current())
307 .field("drift_ms", &self.drift_ms())
308 .finish_non_exhaustive()
309 }
310}
311
/// Coarse classification of the drift between a [`HybridLogicalClock`] and the
/// local wall clock, as produced by `HybridLogicalClock::health`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClockHealth {
    /// Absolute drift does not exceed `HybridLogicalClock::MAX_DRIFT_WARNING_MS`.
    Healthy,
    /// Absolute drift exceeds the warning threshold but not the critical one;
    /// `drift_ms` is the absolute drift in milliseconds.
    Degraded { drift_ms: u64 },
    /// Absolute drift exceeds `HybridLogicalClock::MAX_DRIFT_CRITICAL_MS`;
    /// `drift_ms` is the absolute drift in milliseconds.
    Critical { drift_ms: u64 },
}
322
323impl ClockHealth {
324 #[inline]
326 #[must_use]
327 pub const fn is_healthy(&self) -> bool {
328 matches!(self, Self::Healthy)
329 }
330
331 #[inline]
333 #[must_use]
334 pub const fn is_critical(&self) -> bool {
335 matches!(self, Self::Critical { .. })
336 }
337}
338
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::unreadable_literal)]
mod tests {
    use super::*;

    /// Packing a physical/logical pair and reading it back must round-trip.
    #[test]
    fn test_hlc_timestamp_packing() {
        let (physical, logical) = (1234567890, 42);
        let packed = HlcTimestamp::new(physical, logical);

        assert_eq!(packed.physical_ms(), physical);
        assert_eq!(packed.logical(), logical);
    }

    /// Timestamps order by physical time first, then by logical counter.
    #[test]
    fn test_hlc_timestamp_ordering() {
        let earliest = HlcTimestamp::new(100, 0);
        let middle = HlcTimestamp::new(100, 1);
        let latest = HlcTimestamp::new(101, 0);

        assert!(earliest < middle);
        assert!(middle < latest);
        assert!(earliest < latest);
    }

    /// Consecutive `now()` calls on one clock must strictly increase.
    #[test]
    fn test_hlc_monotonicity() {
        let clock = HybridLogicalClock::new(1);
        let mut previous = clock.now();

        for current in std::iter::repeat_with(|| clock.now()).take(1000) {
            assert!(current > previous, "HLC must be strictly monotonic");
            previous = current;
        }
    }

    /// Receiving a remote timestamp from the future moves the clock past it.
    #[test]
    fn test_hlc_receive_advances() {
        let clock = HybridLogicalClock::new(1);
        let before = clock.now();

        // Remote node is one second ahead of the local physical time.
        let future_remote = HlcTimestamp::new(before.physical_ms() + 1000, 0);
        let merged = clock.receive(future_remote);

        assert!(merged > before);
        assert!(merged > future_remote);
    }

    /// With equal physical time, the merged logical counter exceeds both inputs.
    #[test]
    fn test_hlc_receive_same_physical() {
        let clock = HybridLogicalClock::new(1);
        let local = clock.now();

        // Same millisecond as the local timestamp, but a higher logical counter.
        let remote = HlcTimestamp::new(local.physical_ms(), local.logical() + 10);
        let merged = clock.receive(remote);

        assert!(merged > remote);
        assert!(merged > local);
    }

    /// Timestamps issued concurrently from several threads must all be distinct.
    #[test]
    fn test_hlc_concurrent_access() {
        use std::sync::Arc;
        use std::thread;

        let shared = Arc::new(HybridLogicalClock::new(1));

        // Start every worker before joining any of them.
        let workers: Vec<_> = (0..4)
            .map(|_| {
                let clock = Arc::clone(&shared);
                thread::spawn(move || (0..1000).map(|_| clock.now()).collect::<Vec<_>>())
            })
            .collect();

        let mut observed: Vec<HlcTimestamp> = Vec::new();
        for worker in workers {
            observed.extend(worker.join().expect("thread panicked"));
        }

        // Any duplicate would be removed by `dedup` after sorting and shrink the vector.
        let total = observed.len();
        observed.sort();
        observed.dedup();
        assert_eq!(observed.len(), total, "HLC produced duplicate timestamps");
    }

    /// The timestamp must stay exactly one machine word (8 bytes) wide.
    #[test]
    fn test_hlc_size() {
        let width = std::mem::size_of::<HlcTimestamp>();
        assert_eq!(width, 8);
    }

    /// A clock that has just issued a timestamp reports healthy drift.
    #[test]
    fn test_clock_health() {
        let clock = HybridLogicalClock::new(1);
        // Issue one timestamp so the clock's physical time matches the wall clock.
        let _ = clock.now();

        assert!(clock.health().is_healthy());
    }
}