use std::sync::atomic::{AtomicI64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
/// A timestamp made of a wall-clock reading in nanoseconds and a logical
/// counter.
///
/// Equality and hashing are derived and take both components into account.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct HlcTimestamp {
    /// Wall-clock component, in nanoseconds.
    wall_time: i64,
    /// Logical counter component.
    logical: i32,
}

/// Length in bytes of the encoding produced by [`HlcTimestamp::to_bytes`]:
/// 8 bytes of wall time followed by 4 bytes of logical counter.
pub const HLC_TIMESTAMP_SIZE: usize = 12;

impl HlcTimestamp {
    /// The timestamp whose wall time and logical counter are both zero.
    pub const ZERO: Self = Self {
        logical: 0,
        wall_time: 0,
    };

    /// Builds a timestamp from a wall time in nanoseconds and a logical counter.
    #[inline]
    pub fn new(wall_time_ns: i64, logical: i32) -> Self {
        let wall_time = wall_time_ns;
        Self { wall_time, logical }
    }

    /// Returns the wall-clock component in nanoseconds.
    #[inline]
    pub fn wall_time(&self) -> i64 {
        let Self { wall_time, .. } = *self;
        wall_time
    }

    /// Returns the logical counter component.
    #[inline]
    pub fn logical(&self) -> i32 {
        let Self { logical, .. } = *self;
        logical
    }

    /// Encodes the timestamp as big-endian wall time (bytes 0..8) followed by
    /// the big-endian logical counter (bytes 8..12).
    #[inline]
    pub fn to_bytes(&self) -> [u8; HLC_TIMESTAMP_SIZE] {
        let mut encoded = [0u8; HLC_TIMESTAMP_SIZE];
        {
            // Splitting at 8 leaves exactly 8 + 4 bytes, matching the widths
            // of the two big-endian encodings copied below.
            let (wall_part, logical_part) = encoded.split_at_mut(8);
            wall_part.copy_from_slice(&self.wall_time.to_be_bytes());
            logical_part.copy_from_slice(&self.logical.to_be_bytes());
        }
        encoded
    }

    /// Decodes a timestamp from the layout written by [`Self::to_bytes`].
    #[inline]
    pub fn from_bytes(b: &[u8; HLC_TIMESTAMP_SIZE]) -> Self {
        let (wall_part, logical_part) = b.split_at(8);

        let mut wall_raw = [0u8; 8];
        wall_raw.copy_from_slice(wall_part);

        let mut logical_raw = [0u8; 4];
        logical_raw.copy_from_slice(logical_part);

        Self::new(
            i64::from_be_bytes(wall_raw),
            i32::from_be_bytes(logical_raw),
        )
    }

    /// Returns `true` when both components are zero, i.e. the value equals
    /// [`Self::ZERO`].
    #[inline]
    pub fn is_zero(&self) -> bool {
        *self == Self::ZERO
    }
}
/// Partial ordering is always defined and agrees with the total order
/// given by the `Ord` implementation.
impl PartialOrd for HlcTimestamp {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Total order: wall time is compared first; the logical counter only
/// breaks ties between equal wall times.
impl Ord for HlcTimestamp {
    #[inline]
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        match self.wall_time.cmp(&other.wall_time) {
            std::cmp::Ordering::Equal => self.logical.cmp(&other.logical),
            decided => decided,
        }
    }
}
/// Diagnostic rendering of the form `HLC(<wall_time>ns:<logical>)`.
impl std::fmt::Debug for HlcTimestamp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { wall_time, logical } = *self;
        write!(f, "HLC({}ns:{})", wall_time, logical)
    }
}
/// Compact rendering of the form `<wall_time>:<logical>`.
impl std::fmt::Display for HlcTimestamp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { wall_time, logical } = *self;
        write!(f, "{}:{}", wall_time, logical)
    }
}
/// Errors reported by `HlcClock::now` and `HlcClock::update`.
#[derive(Debug, thiserror::Error)]
pub enum ClockError {
/// The wall time being checked is ahead of the physical clock reading by
/// more than the configured maximum drift.
#[error(
"clock drift exceeded: remote_wall_time={remote_ns}ns, \
physical_now={physical_ns}ns, max_drift={max_drift_ns}ns"
)]
ClockDriftExceeded {
/// Wall time (ns) that was compared against the physical clock: the
/// remote timestamp's wall time in `update`, or the clock's own last
/// wall time in `now`.
remote_ns: i64,
/// Physical clock reading (ns) taken for the check.
physical_ns: i64,
/// Maximum allowed lead (ns) in effect when the check failed.
max_drift_ns: i64,
},
/// Incrementing the `i32` logical counter would overflow.
#[error("HLC counter overflow (>2^31-1 events in same nanosecond)")]
CounterOverflow,
}
/// A source of physical time, reported as signed nanoseconds.
///
/// Implementors must be `Send`.
pub trait PhysicalClock: Send {
    /// Returns the current physical time in nanoseconds.
    fn now_ns(&self) -> i64;
}

/// [`PhysicalClock`] backed by the operating system clock
/// ([`SystemTime::now`]), measured from the UNIX epoch.
pub struct SystemClock;

impl SystemClock {
    /// Converts `elapsed` to whole nanoseconds, clamping to `i64::MAX` when
    /// the value does not fit (instead of wrapping as an `as` cast would).
    fn saturating_ns(elapsed: std::time::Duration) -> i64 {
        i64::try_from(elapsed.as_nanos()).unwrap_or(i64::MAX)
    }
}

impl PhysicalClock for SystemClock {
    /// Returns the nanoseconds elapsed since the UNIX epoch, saturating at
    /// `i64::MAX`.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the UNIX epoch.
    fn now_ns(&self) -> i64 {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock before UNIX epoch");
        Self::saturating_ns(since_epoch)
    }
}
/// A clock whose reading is held in an atomic integer and changed only by
/// explicit calls to [`ManualClock::set`] and [`ManualClock::advance`].
pub struct ManualClock {
    /// Current reading, in nanoseconds.
    time_ns: AtomicI64,
}

impl ManualClock {
    /// Creates a clock whose reading starts at `initial_ns`.
    pub fn new(initial_ns: i64) -> Self {
        let time_ns = AtomicI64::new(initial_ns);
        Self { time_ns }
    }

    /// Overwrites the reading with `time_ns`.
    pub fn set(&self, time_ns: i64) {
        let Self { time_ns: reading } = self;
        reading.store(time_ns, Ordering::SeqCst);
    }

    /// Adds `delta_ns` (which may be negative) to the reading.
    pub fn advance(&self, delta_ns: i64) {
        // `fetch_add` hands back the previous reading, which is not needed.
        let _previous = self.time_ns.fetch_add(delta_ns, Ordering::SeqCst);
    }
}
/// Reports whatever reading was last stored in the manual clock.
impl PhysicalClock for ManualClock {
    fn now_ns(&self) -> i64 {
        let Self { time_ns: reading } = self;
        reading.load(Ordering::SeqCst)
    }
}
/// Number of nanoseconds in one second.
const SECOND_NS: i64 = 1_000_000_000;
/// Maximum drift installed by the `HlcClock` constructors: five seconds,
/// expressed in nanoseconds.
const DEFAULT_MAX_DRIFT_NS: i64 = 5 * SECOND_NS;
/// Clock state that issues `HlcTimestamp`s from a `PhysicalClock` source.
///
/// The physical clock type defaults to `SystemClock`.
pub struct HlcClock<C: PhysicalClock = SystemClock> {
/// Most recent timestamp issued by `now`, adopted by `update`, or installed
/// with `set_last`.
last: HlcTimestamp,
/// Largest amount (ns) by which a wall time may lead the physical clock
/// before `ClockError::ClockDriftExceeded` is returned.
max_drift_ns: i64,
/// Source of physical time readings.
clock: C,
}
impl Default for HlcClock<SystemClock> {
fn default() -> Self {
Self::new()
}
}
impl HlcClock<SystemClock> {
    /// Creates a clock driven by [`SystemClock`], starting from
    /// [`HlcTimestamp::ZERO`] with the default maximum drift.
    pub fn new() -> Self {
        // `with_clock` installs the same initial timestamp and drift bound.
        Self::with_clock(SystemClock)
    }
}
impl<C: PhysicalClock> HlcClock<C> {
    /// Creates a clock that reads physical time from `clock`, starting from
    /// [`HlcTimestamp::ZERO`] with the default maximum drift.
    pub fn with_clock(clock: C) -> Self {
        Self {
            clock,
            max_drift_ns: DEFAULT_MAX_DRIFT_NS,
            last: HlcTimestamp::ZERO,
        }
    }

    /// Replaces the maximum drift, in nanoseconds.
    pub fn set_max_drift_ns(&mut self, max_drift_ns: i64) {
        self.max_drift_ns = max_drift_ns;
    }

    /// Overwrites the last-seen timestamp with `ts`, without any checks.
    pub fn set_last(&mut self, ts: HlcTimestamp) {
        self.last = ts;
    }

    /// Issues the next timestamp and records it as the last-seen one.
    ///
    /// When the physical clock is strictly ahead of the last wall time, the
    /// result is `(physical, 0)`. Otherwise the last wall time is kept and
    /// the logical counter is incremented by one.
    ///
    /// # Errors
    ///
    /// * [`ClockError::CounterOverflow`] if the counter cannot be incremented.
    /// * [`ClockError::ClockDriftExceeded`] if the last wall time leads the
    ///   physical clock by more than the maximum drift.
    ///
    /// The stored timestamp is left untouched when an error is returned.
    pub fn now(&mut self) -> Result<HlcTimestamp, ClockError> {
        let physical = self.clock.now_ns();
        let previous = self.last;

        let issued = if physical > previous.wall_time {
            HlcTimestamp::new(physical, 0)
        } else {
            // Overflow is reported before drift, so it is checked first.
            let bumped = match previous.logical.checked_add(1) {
                Some(value) => value,
                None => return Err(ClockError::CounterOverflow),
            };
            self.check_drift(previous.wall_time, physical)?;
            HlcTimestamp::new(previous.wall_time, bumped)
        };

        self.last = issued;
        Ok(issued)
    }

    /// Merges a timestamp received from elsewhere into this clock.
    ///
    /// The last-seen timestamp becomes the greater of its current value and
    /// `remote`; no new timestamp is issued.
    ///
    /// # Errors
    ///
    /// Returns [`ClockError::ClockDriftExceeded`], leaving the clock
    /// unchanged, if `remote`'s wall time leads the physical clock by more
    /// than the maximum drift.
    pub fn update(&mut self, remote: HlcTimestamp) -> Result<(), ClockError> {
        let physical = self.clock.now_ns();
        self.check_drift(remote.wall_time(), physical)?;

        // Timestamps order by wall time, then logical counter, so taking the
        // maximum adopts `remote` exactly when it is ahead of `last`.
        self.last = self.last.max(remote);
        Ok(())
    }

    /// Returns the last-seen timestamp.
    pub fn last_timestamp(&self) -> HlcTimestamp {
        self.last
    }

    /// Borrows the underlying physical clock.
    pub fn physical_clock(&self) -> &C {
        &self.clock
    }

    /// Fails when `wall_time_ns` leads `physical_ns` by more than the maximum
    /// drift; the difference is computed with saturating subtraction.
    fn check_drift(&self, wall_time_ns: i64, physical_ns: i64) -> Result<(), ClockError> {
        let lead = wall_time_ns.saturating_sub(physical_ns);
        if lead <= self.max_drift_ns {
            return Ok(());
        }
        Err(ClockError::ClockDriftExceeded {
            remote_ns: wall_time_ns,
            physical_ns,
            max_drift_ns: self.max_drift_ns,
        })
    }
}
// Unit tests for HlcTimestamp, the physical clock implementations, and
// HlcClock. HlcClock behaviour is driven deterministically through ManualClock.
#[cfg(test)]
mod tests {
use super::*;
// Nanoseconds per second, used to build readable test times.
const SECOND: i64 = 1_000_000_000;
// Nanoseconds per millisecond.
const MS: i64 = 1_000_000;
// The accessors return the values passed to the constructor.
#[test]
fn new_and_accessors() {
let ts = HlcTimestamp::new(1_000_000_000, 42);
assert_eq!(ts.wall_time(), 1_000_000_000);
assert_eq!(ts.logical(), 42);
}
// ZERO has both components at zero and reports is_zero().
#[test]
fn zero_timestamp() {
let ts = HlcTimestamp::ZERO;
assert_eq!(ts.wall_time(), 0);
assert_eq!(ts.logical(), 0);
assert!(ts.is_zero());
}
// A non-zero value in either component makes is_zero() false.
#[test]
fn non_zero_is_not_zero() {
let ts = HlcTimestamp::new(1, 0);
assert!(!ts.is_zero());
let ts2 = HlcTimestamp::new(0, 1);
assert!(!ts2.is_zero());
}
// A larger wall time wins even against the maximum logical counter.
#[test]
fn ordering_wall_time_dominates() {
let a = HlcTimestamp::new(100, i32::MAX);
let b = HlcTimestamp::new(101, 0);
assert!(a < b);
}
// With equal wall times, the logical counter decides the order.
#[test]
fn ordering_logical_tiebreaks() {
let a = HlcTimestamp::new(100, 5);
let b = HlcTimestamp::new(100, 6);
assert!(a < b);
}
// Identical components compare equal under ==, <= and >=.
#[test]
fn ordering_equality() {
let a = HlcTimestamp::new(100, 5);
let b = HlcTimestamp::new(100, 5);
assert_eq!(a, b);
assert!(a <= b);
assert!(a >= b);
}
// Negative wall times sort before zero, which sorts before positive ones.
#[test]
fn ordering_negative_wall_time() {
let a = HlcTimestamp::new(-100, 0);
let b = HlcTimestamp::new(0, 0);
let c = HlcTimestamp::new(100, 0);
assert!(a < b);
assert!(b < c);
}
// Encoding yields 12 bytes and decoding them restores the timestamp.
#[test]
fn bytes_roundtrip() {
let ts = HlcTimestamp::new(123_456_789_000_000, 1000);
let bytes = ts.to_bytes();
assert_eq!(bytes.len(), 12);
let ts2 = HlcTimestamp::from_bytes(&bytes);
assert_eq!(ts, ts2);
}
// Round trip of the zero timestamp.
#[test]
fn bytes_roundtrip_zero() {
let ts = HlcTimestamp::ZERO;
let bytes = ts.to_bytes();
let ts2 = HlcTimestamp::from_bytes(&bytes);
assert_eq!(ts, ts2);
}
// Round trip with both components at their maximum values.
#[test]
fn bytes_roundtrip_max() {
let ts = HlcTimestamp::new(i64::MAX, i32::MAX);
let bytes = ts.to_bytes();
let ts2 = HlcTimestamp::from_bytes(&bytes);
assert_eq!(ts, ts2);
}
// For these positive inputs, comparing the encoded arrays agrees with
// comparing the timestamps.
#[test]
fn bytes_preserve_order_for_positive_values() {
let a = HlcTimestamp::new(100, 5);
let b = HlcTimestamp::new(100, 6);
let c = HlcTimestamp::new(101, 0);
let ba = a.to_bytes();
let bb = b.to_bytes();
let bc = c.to_bytes();
assert!(ba < bb);
assert!(bb < bc);
}
// Bytes 0..8 hold the wall time, most significant byte first.
#[test]
fn bytes_wall_time_is_big_endian() {
let ts = HlcTimestamp::new(0x0102_0304_0506_0708, 0);
let bytes = ts.to_bytes();
assert_eq!(
&bytes[0..8],
&[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
);
}
// Bytes 8..12 hold the logical counter, most significant byte first.
#[test]
fn bytes_logical_is_big_endian() {
let ts = HlcTimestamp::new(0, 0x01020304);
let bytes = ts.to_bytes();
assert_eq!(&bytes[8..12], &[0x01, 0x02, 0x03, 0x04]);
}
// Display renders "<wall_time>:<logical>".
#[test]
fn display_format() {
let ts = HlcTimestamp::new(1_000_000_000, 5);
assert_eq!(format!("{ts}"), "1000000000:5");
}
// Debug renders "HLC(<wall_time>ns:<logical>)".
#[test]
fn debug_format() {
let ts = HlcTimestamp::new(1_000_000_000, 5);
assert_eq!(format!("{ts:?}"), "HLC(1000000000ns:5)");
}
// ManualClock reports its initial value and reflects advance() and set().
#[test]
fn manual_clock_basic() {
let mc = ManualClock::new(100);
assert_eq!(mc.now_ns(), 100);
mc.advance(50);
assert_eq!(mc.now_ns(), 150);
mc.set(200);
assert_eq!(mc.now_ns(), 200);
}
// The system clock reading is positive and later than the jan_2020_ns
// reference value.
#[test]
fn system_clock_produces_reasonable_values() {
let sc = SystemClock;
let now = sc.now_ns();
let jan_2020_ns: i64 = 1_577_836_800 * SECOND;
assert!(now > jan_2020_ns);
assert!(now > 0);
}
// Successive now() calls are strictly increasing even when physical time
// does not move.
#[test]
fn now_monotonic() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
let t1 = clock.now().unwrap();
let t2 = clock.now().unwrap();
let t3 = clock.now().unwrap();
assert!(t1 < t2);
assert!(t2 < t3);
}
// With a frozen physical clock, the wall time stays put and the logical
// counter goes 0, 1.
#[test]
fn now_same_physical_increments_logical() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
let t1 = clock.now().unwrap();
let t2 = clock.now().unwrap();
assert_eq!(t1.wall_time(), 1000 * SECOND);
assert_eq!(t1.logical(), 0);
assert_eq!(t2.wall_time(), 1000 * SECOND);
assert_eq!(t2.logical(), 1);
}
// Once physical time advances past the last wall time, the logical counter
// restarts at 0.
#[test]
fn now_physical_advance_resets_logical() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
let _t1 = clock.now().unwrap();
let _t2 = clock.now().unwrap();
assert_eq!(_t2.logical(), 1);
clock.physical_clock().advance(1);
let t3 = clock.now().unwrap();
assert_eq!(t3.wall_time(), 1000 * SECOND + 1);
assert_eq!(t3.logical(), 0);
}
// If the physical clock jumps backwards, now() keeps the previous wall time
// and still returns a greater timestamp.
#[test]
fn now_backward_jump_stays_at_high_watermark() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
let t1 = clock.now().unwrap();
assert_eq!(t1.wall_time(), 1000 * SECOND);
clock.physical_clock().set(998 * SECOND);
let t2 = clock.now().unwrap();
assert_eq!(t2.wall_time(), 1000 * SECOND);
assert!(t2 > t1);
}
// The counter may reach i32::MAX; the next now() at the same wall time
// fails with CounterOverflow.
#[test]
fn now_counter_overflow() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
clock.set_last(HlcTimestamp::new(1000 * SECOND, i32::MAX - 1));
let t = clock.now().unwrap();
assert_eq!(t.logical(), i32::MAX);
let err = clock.now().unwrap_err();
assert!(matches!(err, ClockError::CounterOverflow));
}
// After a CounterOverflow, advancing physical time lets now() succeed
// again with logical 0.
#[test]
fn now_counter_overflow_recovery_via_time_advance() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
clock.set_last(HlcTimestamp::new(1000 * SECOND, i32::MAX));
let err = clock.now().unwrap_err();
assert!(matches!(err, ClockError::CounterOverflow));
clock.physical_clock().advance(1);
let t = clock.now().unwrap();
assert_eq!(t.wall_time(), 1000 * SECOND + 1);
assert_eq!(t.logical(), 0);
}
// now() fails with ClockDriftExceeded when the stored wall time leads
// physical time by more than the configured maximum (10s vs 1s here).
#[test]
fn now_drift_protection() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
clock.set_max_drift_ns(SECOND);
clock.set_last(HlcTimestamp::new(1010 * SECOND, 0));
let err = clock.now().unwrap_err();
assert!(matches!(err, ClockError::ClockDriftExceeded { .. }));
}
// A remote timestamp with an older wall time leaves the clock unchanged,
// regardless of its logical counter.
#[test]
fn update_remote_behind() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
let _t1 = clock.now().unwrap();
let remote = HlcTimestamp::new(500 * SECOND, 99);
clock.update(remote).unwrap();
assert_eq!(clock.last_timestamp().wall_time(), 1000 * SECOND);
assert_eq!(clock.last_timestamp().logical(), 0);
}
// A remote timestamp with a newer wall time is adopted as-is; the next
// now() continues from its logical counter.
#[test]
fn update_remote_ahead() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
let _t1 = clock.now().unwrap();
let remote = HlcTimestamp::new(1002 * SECOND, 5);
clock.update(remote).unwrap();
assert_eq!(clock.last_timestamp().wall_time(), 1002 * SECOND);
assert_eq!(clock.last_timestamp().logical(), 5);
let t2 = clock.now().unwrap();
assert!(t2 > remote);
assert_eq!(t2.wall_time(), 1002 * SECOND);
assert_eq!(t2.logical(), 6);
}
// Same wall time, higher remote logical: the remote counter is adopted.
#[test]
fn update_remote_same_wall_time_higher_logical() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
let _t1 = clock.now().unwrap();
let remote = HlcTimestamp::new(1000 * SECOND, 10);
clock.update(remote).unwrap();
assert_eq!(clock.last_timestamp().logical(), 10);
let t2 = clock.now().unwrap();
assert_eq!(t2.logical(), 11);
}
// Same wall time, lower remote logical: the local counter (4 after five
// now() calls) is kept.
#[test]
fn update_remote_same_wall_time_lower_logical() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
for _ in 0..5 {
clock.now().unwrap();
}
let remote = HlcTimestamp::new(1000 * SECOND, 2);
clock.update(remote).unwrap();
assert_eq!(clock.last_timestamp().logical(), 4);
}
// A remote timestamp too far ahead is rejected and the clock state stays
// at its initial ZERO value.
#[test]
fn update_drift_exceeded() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
clock.set_max_drift_ns(SECOND);
let remote = HlcTimestamp::new(1010 * SECOND, 0);
let err = clock.update(remote).unwrap_err();
assert!(matches!(err, ClockError::ClockDriftExceeded { .. }));
assert_eq!(clock.last_timestamp(), HlcTimestamp::ZERO);
}
// A lead exactly equal to the maximum drift is accepted; one nanosecond
// more is rejected.
#[test]
fn update_drift_boundary_exact() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
clock.set_max_drift_ns(SECOND);
let remote = HlcTimestamp::new(1001 * SECOND, 0);
clock.update(remote).unwrap();
assert_eq!(clock.last_timestamp().wall_time(), 1001 * SECOND);
let mc2 = ManualClock::new(1000 * SECOND);
let mut clock2 = HlcClock::with_clock(mc2);
clock2.set_max_drift_ns(SECOND);
let remote2 = HlcTimestamp::new(1001 * SECOND + 1, 0);
let err = clock2.update(remote2).unwrap_err();
assert!(matches!(err, ClockError::ClockDriftExceeded { .. }));
}
// Merging the ZERO timestamp into a clock that has already issued a
// timestamp changes nothing.
#[test]
fn update_zero_timestamp_is_noop() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
let _t1 = clock.now().unwrap();
clock.update(HlcTimestamp::ZERO).unwrap();
assert_eq!(clock.last_timestamp().wall_time(), 1000 * SECOND);
}
// After set_last(), the next now() is strictly greater than the installed
// timestamp.
#[test]
fn set_last_restores_monotonicity() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
let persisted = HlcTimestamp::new(1000 * SECOND, 50);
clock.set_last(persisted);
let t1 = clock.now().unwrap();
assert!(t1 > persisted);
assert_eq!(t1.logical(), 51);
}
// Two clocks 50ms apart: after exchanging timestamps, the slower clock
// issues timestamps at the faster clock's wall time.
#[test]
fn two_clocks_converge() {
let mc_a = ManualClock::new(1000 * SECOND);
let mc_b = ManualClock::new(1000 * SECOND + 50 * MS);
let mut clock_a = HlcClock::with_clock(mc_a);
let mut clock_b = HlcClock::with_clock(mc_b);
let ta1 = clock_a.now().unwrap();
assert_eq!(ta1.wall_time(), 1000 * SECOND);
let tb1 = clock_b.now().unwrap();
assert_eq!(tb1.wall_time(), 1000 * SECOND + 50 * MS);
clock_a.update(tb1).unwrap();
let ta2 = clock_a.now().unwrap();
assert_eq!(ta2.wall_time(), 1000 * SECOND + 50 * MS);
assert_eq!(ta2.logical(), 1);
clock_b.update(ta1).unwrap();
let tb2 = clock_b.now().unwrap();
assert_eq!(tb2.wall_time(), 1000 * SECOND + 50 * MS);
assert!(tb2 > tb1);
}
// A timestamp issued after merging a remote one is greater than the
// remote one.
#[test]
fn causal_ordering_preserved() {
let mc_a = ManualClock::new(1000 * SECOND);
let mc_b = ManualClock::new(1000 * SECOND);
let mut clock_a = HlcClock::with_clock(mc_a);
let mut clock_b = HlcClock::with_clock(mc_b);
let ta1 = clock_a.now().unwrap();
clock_b.update(ta1).unwrap();
let tb1 = clock_b.now().unwrap();
assert!(ta1 < tb1);
}
// When physical time has advanced past a merged remote timestamp, the next
// now() uses the new physical time with logical 0.
#[test]
fn physical_time_advance_during_sync() {
let mc_a = ManualClock::new(1000 * SECOND);
let mc_b = ManualClock::new(1000 * SECOND);
let mut clock_a = HlcClock::with_clock(mc_a);
let mut clock_b = HlcClock::with_clock(mc_b);
let ta = clock_a.now().unwrap(); let tb = clock_b.now().unwrap();
clock_a.physical_clock().advance(100 * MS);
clock_b.physical_clock().advance(100 * MS);
clock_a.update(tb).unwrap();
let ta2 = clock_a.now().unwrap();
assert_eq!(ta2.wall_time(), 1000 * SECOND + 100 * MS);
assert_eq!(ta2.logical(), 0);
assert!(ta2 > ta);
assert!(ta2 > tb);
}
// Passing timestamps a -> b -> c -> a: every timestamp issued after a merge
// exceeds both the merged timestamp and the node's own previous one.
#[test]
fn three_node_ring_sync() {
let mc_a = ManualClock::new(1000 * SECOND);
let mc_b = ManualClock::new(1000 * SECOND + 10 * MS);
let mc_c = ManualClock::new(1000 * SECOND + 20 * MS);
let mut a = HlcClock::with_clock(mc_a);
let mut b = HlcClock::with_clock(mc_b);
let mut c = HlcClock::with_clock(mc_c);
let ta = a.now().unwrap();
let tb = b.now().unwrap();
let tc = c.now().unwrap();
b.update(ta).unwrap();
let tb2 = b.now().unwrap();
assert!(tb2 > tb);
assert!(tb2 > ta);
c.update(tb2).unwrap();
let tc2 = c.now().unwrap();
assert!(tc2 > tc);
assert!(tc2 > tb2);
a.update(tc2).unwrap();
let ta2 = a.now().unwrap();
assert!(ta2 > ta);
assert!(ta2 > tc2);
}
// 1000 calls at a frozen physical time yield logical counters 0..999 in
// order.
#[test]
fn many_events_same_nanosecond() {
let mc = ManualClock::new(1000 * SECOND);
let mut clock = HlcClock::with_clock(mc);
for i in 0i32..1000 {
let t = clock.now().unwrap();
assert_eq!(t.logical(), i);
}
}
// Equal timestamps are found in a HashSet; a different one is not.
#[test]
fn hash_consistency() {
use std::collections::HashSet;
let a = HlcTimestamp::new(100, 5);
let b = HlcTimestamp::new(100, 5);
let c = HlcTimestamp::new(100, 6);
let mut set = HashSet::new();
set.insert(a);
assert!(set.contains(&b));
assert!(!set.contains(&c));
}
// HlcClock::new() (system clock) issues increasing, non-zero timestamps.
#[test]
fn system_clock_hlc_integration() {
let mut clock = HlcClock::new();
let t1 = clock.now().unwrap();
let t2 = clock.now().unwrap();
assert!(t2 > t1);
assert!(!t1.is_zero());
}
// A wall time with non-zero low-order nanosecond digits survives both the
// accessor and the byte round trip.
#[test]
fn nanosecond_precision_preserved() {
let ts = HlcTimestamp::new(1_741_000_000_123_456_789, 0);
assert_eq!(ts.wall_time(), 1_741_000_000_123_456_789);
let bytes = ts.to_bytes();
let ts2 = HlcTimestamp::from_bytes(&bytes);
assert_eq!(ts2.wall_time(), 1_741_000_000_123_456_789);
}
// Wall times that differ by 1000ns or by 1ns are still ordered.
#[test]
fn sub_millisecond_ordering() {
let a = HlcTimestamp::new(1000 * SECOND, 0);
let b = HlcTimestamp::new(1000 * SECOND + 1000, 0); assert!(a < b);
let c = HlcTimestamp::new(1000 * SECOND, 0);
let d = HlcTimestamp::new(1000 * SECOND + 1, 0); assert!(c < d);
}
// The maximum logical counter survives the accessor and the byte round
// trip.
#[test]
fn i32_max_logical_counter() {
let ts = HlcTimestamp::new(1000 * SECOND, i32::MAX);
assert_eq!(ts.logical(), i32::MAX);
let bytes = ts.to_bytes();
let ts2 = HlcTimestamp::from_bytes(&bytes);
assert_eq!(ts2.logical(), i32::MAX);
}
// The encoded size constant equals size_of::<i64>() + size_of::<i32>().
#[test]
fn wire_size_is_12() {
assert_eq!(HLC_TIMESTAMP_SIZE, 12);
assert_eq!(std::mem::size_of::<i64>() + std::mem::size_of::<i32>(), 12);
}
}