use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
const LOGICAL_BITS: u32 = 16;
const LOGICAL_MASK: u64 = (1 << LOGICAL_BITS) - 1; const MAX_LOGICAL: u64 = LOGICAL_MASK;
#[derive(
Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
#[repr(transparent)]
pub struct HlcTimestamp(u64);
impl HlcTimestamp {
#[inline]
#[must_use]
pub const fn new(physical_ms: u64, logical: u16) -> Self {
Self((physical_ms << LOGICAL_BITS) | (logical as u64))
}
#[must_use]
pub fn now() -> Self {
Self::new(wall_clock_ms(), 0)
}
#[inline]
#[must_use]
pub const fn physical_ms(&self) -> u64 {
self.0 >> LOGICAL_BITS
}
#[inline]
#[must_use]
pub const fn logical(&self) -> u16 {
(self.0 & LOGICAL_MASK) as u16
}
#[inline]
#[must_use]
pub const fn as_u64(&self) -> u64 {
self.0
}
#[inline]
#[must_use]
pub const fn from_u64(raw: u64) -> Self {
Self(raw)
}
#[inline]
#[must_use]
pub const fn zero() -> Self {
Self(0)
}
}
impl fmt::Debug for HlcTimestamp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "HlcTimestamp({}.{})", self.physical_ms(), self.logical())
}
}
impl fmt::Display for HlcTimestamp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}.{}", self.physical_ms(), self.logical())
}
}
impl Default for HlcTimestamp {
fn default() -> Self {
Self::zero()
}
}
impl From<u64> for HlcTimestamp {
fn from(raw: u64) -> Self {
Self::from_u64(raw)
}
}
impl From<HlcTimestamp> for u64 {
fn from(ts: HlcTimestamp) -> Self {
ts.as_u64()
}
}
pub struct HlcClock {
last: AtomicU64,
}
impl HlcClock {
#[must_use]
pub fn new() -> Self {
Self {
last: AtomicU64::new(HlcTimestamp::now().as_u64()),
}
}
pub fn now(&self) -> HlcTimestamp {
let pt = wall_clock_ms();
loop {
let last_raw = self.last.load(Ordering::Acquire);
let last = HlcTimestamp::from_u64(last_raw);
let last_pt = last.physical_ms();
let last_lc = last.logical() as u64;
let next = match pt.cmp(&last_pt) {
std::cmp::Ordering::Greater => {
HlcTimestamp::new(pt, 0)
}
std::cmp::Ordering::Equal => {
let lc = last_lc.saturating_add(1).min(MAX_LOGICAL);
HlcTimestamp::new(pt, lc as u16)
}
std::cmp::Ordering::Less => {
let lc = last_lc.saturating_add(1).min(MAX_LOGICAL);
HlcTimestamp::new(last_pt, lc as u16)
}
};
if self
.last
.compare_exchange_weak(last_raw, next.as_u64(), Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
return next;
}
}
}
pub fn update(&self, received: HlcTimestamp) -> HlcTimestamp {
let pt = wall_clock_ms();
loop {
let last_raw = self.last.load(Ordering::Acquire);
let last = HlcTimestamp::from_u64(last_raw);
let last_pt = last.physical_ms();
let recv_pt = received.physical_ms();
let max_pt = pt.max(last_pt).max(recv_pt);
let next = if max_pt == pt && pt > last_pt && pt > recv_pt {
HlcTimestamp::new(pt, 0)
} else if max_pt == last_pt && last_pt == recv_pt {
let lc = last.logical().max(received.logical()) as u64;
let lc = lc.saturating_add(1).min(MAX_LOGICAL);
HlcTimestamp::new(max_pt, lc as u16)
} else if max_pt == last_pt {
let lc = (last.logical() as u64).saturating_add(1).min(MAX_LOGICAL);
HlcTimestamp::new(max_pt, lc as u16)
} else {
let lc = (received.logical() as u64)
.saturating_add(1)
.min(MAX_LOGICAL);
HlcTimestamp::new(max_pt, lc as u16)
};
if self
.last
.compare_exchange_weak(last_raw, next.as_u64(), Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
return next;
}
}
}
#[must_use]
pub fn peek(&self) -> HlcTimestamp {
HlcTimestamp::from_u64(self.last.load(Ordering::Acquire))
}
}
impl Default for HlcClock {
fn default() -> Self {
Self::new()
}
}
impl fmt::Debug for HlcClock {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "HlcClock(last={})", self.peek())
}
}
fn wall_clock_ms() -> u64 {
let duration = SystemTime::now().duration_since(UNIX_EPOCH);
#[allow(clippy::cast_possible_truncation)]
duration.map_or(0, |d| d.as_millis() as u64)
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;
use std::sync::{Arc, Barrier};
use std::thread;
#[test]
fn encoding_roundtrip() {
let ts = HlcTimestamp::new(1_700_000_000_000, 42);
assert_eq!(ts.physical_ms(), 1_700_000_000_000);
assert_eq!(ts.logical(), 42);
assert_eq!(HlcTimestamp::from_u64(ts.as_u64()), ts);
}
#[test]
fn backward_compatible_with_plain_u64() {
let plain_ms: u64 = 1_700_000_000_000;
let ts = HlcTimestamp::from_u64(plain_ms << LOGICAL_BITS);
assert_eq!(ts.physical_ms(), plain_ms);
assert_eq!(ts.logical(), 0);
}
#[test]
fn ordering_physical_major() {
let a = HlcTimestamp::new(100, 5);
let b = HlcTimestamp::new(101, 0);
assert!(a < b, "later physical time should be greater");
}
#[test]
fn ordering_logical_minor() {
let a = HlcTimestamp::new(100, 3);
let b = HlcTimestamp::new(100, 4);
assert!(
a < b,
"higher logical counter at same physical should be greater"
);
}
#[test]
#[cfg_attr(
miri,
ignore = "requires wall clock (SystemTime::now) unavailable under Miri isolation"
)]
fn sequential_now_strictly_increasing() {
let clock = HlcClock::new();
let mut prev = clock.now();
for _ in 0..1000 {
let next = clock.now();
assert!(
next > prev,
"HLC must be strictly increasing: {prev} then {next}"
);
prev = next;
}
}
#[test]
#[cfg_attr(
miri,
ignore = "requires wall clock (SystemTime::now) unavailable under Miri isolation"
)]
fn concurrent_now_all_unique() {
let clock = Arc::new(HlcClock::new());
let num_threads = 8;
let per_thread = 100;
let barrier = Arc::new(Barrier::new(num_threads));
let handles: Vec<_> = (0..num_threads)
.map(|_| {
let clock = Arc::clone(&clock);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
let mut results = Vec::with_capacity(per_thread);
for _ in 0..per_thread {
results.push(clock.now().as_u64());
}
results
})
})
.collect();
let all: Vec<u64> = handles
.into_iter()
.flat_map(|h| h.join().unwrap())
.collect();
let unique: HashSet<u64> = all.iter().copied().collect();
assert_eq!(
unique.len(),
all.len(),
"All concurrent HLC timestamps must be unique"
);
}
#[test]
#[cfg_attr(
miri,
ignore = "requires wall clock (SystemTime::now) unavailable under Miri isolation"
)]
fn update_advances_past_remote() {
let clock = HlcClock::new();
let remote = HlcTimestamp::new(wall_clock_ms() + 10_000, 5);
let merged = clock.update(remote);
assert!(
merged > remote,
"Merged timestamp must be > remote: {merged} vs {remote}"
);
let next = clock.now();
assert!(
next > remote,
"Post-update now() must be > remote: {next} vs {remote}"
);
}
#[test]
fn zero_and_default() {
assert_eq!(HlcTimestamp::zero().as_u64(), 0);
assert_eq!(HlcTimestamp::default(), HlcTimestamp::zero());
}
#[test]
fn display_format() {
let ts = HlcTimestamp::new(1000, 42);
assert_eq!(format!("{ts}"), "1000.42");
}
#[test]
fn debug_format() {
let ts = HlcTimestamp::new(500, 7);
let dbg = format!("{ts:?}");
assert!(dbg.contains("500"), "Debug should contain physical_ms");
assert!(dbg.contains('7'), "Debug should contain logical");
}
#[test]
fn from_u64_roundtrip() {
let raw: u64 = 0xDEAD_BEEF_0042;
let ts: HlcTimestamp = raw.into();
let back: u64 = ts.into();
assert_eq!(raw, back);
}
#[test]
#[cfg_attr(
miri,
ignore = "requires wall clock (SystemTime::now) unavailable under Miri isolation"
)]
fn update_local_wall_clock_is_newest() {
let clock = HlcClock {
last: AtomicU64::new(HlcTimestamp::new(1, 0).as_u64()),
};
let remote = HlcTimestamp::new(2, 3);
let merged = clock.update(remote);
let now_ms = wall_clock_ms();
assert!(
merged.physical_ms() >= now_ms - 1,
"When wall clock is newest, physical should match current time"
);
assert_eq!(
merged.logical(),
0,
"When wall clock advances past both, logical resets to 0"
);
}
#[test]
#[cfg_attr(
miri,
ignore = "requires wall clock (SystemTime::now) unavailable under Miri isolation"
)]
fn update_all_three_equal() {
let pt = wall_clock_ms() + 60_000;
let clock = HlcClock {
last: AtomicU64::new(HlcTimestamp::new(pt, 5).as_u64()),
};
let remote = HlcTimestamp::new(pt, 10);
let merged = clock.update(remote);
assert_eq!(merged.physical_ms(), pt);
assert_eq!(merged.logical(), 11);
}
#[test]
#[cfg_attr(
miri,
ignore = "requires wall clock (SystemTime::now) unavailable under Miri isolation"
)]
fn update_local_hlc_ahead() {
let future_pt = wall_clock_ms() + 100_000;
let clock = HlcClock {
last: AtomicU64::new(HlcTimestamp::new(future_pt, 3).as_u64()),
};
let remote = HlcTimestamp::new(1, 0); let merged = clock.update(remote);
assert_eq!(
merged.physical_ms(),
future_pt,
"Local HLC physical should dominate"
);
assert_eq!(merged.logical(), 4, "Should increment local logical by 1");
}
#[test]
#[cfg_attr(
miri,
ignore = "requires wall clock (SystemTime::now) unavailable under Miri isolation"
)]
fn update_remote_ahead() {
let clock = HlcClock::new();
let future_pt = wall_clock_ms() + 200_000;
let remote = HlcTimestamp::new(future_pt, 7);
let merged = clock.update(remote);
assert_eq!(
merged.physical_ms(),
future_pt,
"Remote physical should dominate"
);
assert_eq!(merged.logical(), 8, "Should increment remote logical by 1");
}
#[test]
#[cfg_attr(
miri,
ignore = "requires wall clock (SystemTime::now) unavailable under Miri isolation"
)]
fn clock_debug_format() {
let clock = HlcClock::new();
let dbg = format!("{clock:?}");
assert!(dbg.starts_with("HlcClock(last="));
}
#[test]
#[cfg_attr(
miri,
ignore = "requires wall clock (SystemTime::now) unavailable under Miri isolation"
)]
fn peek_does_not_advance() {
let clock = HlcClock::new();
let a = clock.peek();
let b = clock.peek();
assert_eq!(a, b, "peek() should not advance the clock");
}
#[test]
#[cfg_attr(
miri,
ignore = "requires wall clock (SystemTime::now) unavailable under Miri isolation"
)]
fn default_clock() {
let _clock = HlcClock::default();
}
}