use std::{
sync::Mutex,
time::{Duration, SystemTime, UNIX_EPOCH},
};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HlcError {
SystemClockError,
FutureDrift {
received_physical_ns: u64,
local_physical_ns: u64,
max_drift_ns: u64,
},
LogicalOverflow,
}
impl std::fmt::Display for HlcError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::SystemClockError => {
write!(f, "system clock error: time is before the Unix epoch")
}
Self::FutureDrift {
received_physical_ns,
local_physical_ns,
max_drift_ns,
} => {
let ahead_ms = (received_physical_ns - local_physical_ns) as f64 / 1_000_000.0;
let max_ms = *max_drift_ns as f64 / 1_000_000.0;
write!(
f,
"received timestamp is {ahead_ms:.3}ms ahead of local clock \
(max allowed drift: {max_ms:.3}ms)"
)
}
Self::LogicalOverflow => write!(f, "logical counter overflow (u32::MAX exceeded)"),
}
}
}
impl std::error::Error for HlcError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct HlcTimestamp {
pub physical: u64,
pub logical: u32,
}
impl HlcTimestamp {
#[must_use]
#[inline]
pub fn happened_before(self, other: HlcTimestamp) -> bool {
self < other
}
#[must_use]
#[inline]
pub fn concurrent_with(self, other: HlcTimestamp) -> bool {
self == other
}
#[must_use]
#[inline]
pub fn to_bytes(self) -> [u8; 12] {
let mut buf = [0u8; 12];
buf[..8].copy_from_slice(&self.physical.to_be_bytes());
buf[8..].copy_from_slice(&self.logical.to_be_bytes());
buf
}
#[must_use]
#[inline]
pub fn from_bytes(bytes: [u8; 12]) -> Self {
let physical = u64::from_be_bytes(bytes[..8].try_into().unwrap());
let logical = u32::from_be_bytes(bytes[8..].try_into().unwrap());
Self { physical, logical }
}
}
impl PartialOrd for HlcTimestamp {
#[inline]
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for HlcTimestamp {
#[inline]
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
(self.physical, self.logical).cmp(&(other.physical, other.logical))
}
}
#[derive(Debug)]
struct HlcState {
physical: u64, logical: u32,
}
#[derive(Debug)]
pub struct HlcClock {
state: Mutex<HlcState>,
max_drift_ns: u64,
}
impl HlcClock {
pub fn new() -> Self {
Self::with_max_drift(Duration::from_millis(500))
}
pub fn with_max_drift(max_drift: Duration) -> Self {
Self {
state: Mutex::new(HlcState {
physical: 0,
logical: 0,
}),
max_drift_ns: u64::try_from(max_drift.as_nanos()).unwrap_or(u64::MAX),
}
}
pub fn now(&self) -> Result<HlcTimestamp, HlcError> {
let pt = physical_now()?;
let mut s = self.state.lock().expect("HLC state lock poisoned");
let new_pt = pt.max(s.physical);
let new_l = if new_pt == s.physical {
s.logical.checked_add(1).ok_or(HlcError::LogicalOverflow)?
} else {
0
};
s.physical = new_pt;
s.logical = new_l;
Ok(HlcTimestamp {
physical: new_pt,
logical: new_l,
})
}
pub fn recv(&self, msg: HlcTimestamp) -> Result<HlcTimestamp, HlcError> {
let pt = physical_now()?;
if msg.physical > pt.saturating_add(self.max_drift_ns) {
return Err(HlcError::FutureDrift {
received_physical_ns: msg.physical,
local_physical_ns: pt,
max_drift_ns: self.max_drift_ns,
});
}
let mut s = self.state.lock().expect("HLC state lock poisoned");
let new_pt = pt.max(s.physical).max(msg.physical);
let new_l = match (new_pt == s.physical, new_pt == msg.physical) {
(true, true) => s
.logical
.max(msg.logical)
.checked_add(1)
.ok_or(HlcError::LogicalOverflow)?,
(true, false) => s.logical.checked_add(1).ok_or(HlcError::LogicalOverflow)?,
(false, true) => msg
.logical
.checked_add(1)
.ok_or(HlcError::LogicalOverflow)?,
(false, false) => 0, };
s.physical = new_pt;
s.logical = new_l;
Ok(HlcTimestamp {
physical: new_pt,
logical: new_l,
})
}
#[must_use]
pub fn last(&self) -> HlcTimestamp {
let s = self.state.lock().expect("HLC state lock poisoned");
HlcTimestamp {
physical: s.physical,
logical: s.logical,
}
}
}
impl Default for HlcClock {
fn default() -> Self {
Self::new()
}
}
pub(crate) fn physical_now() -> Result<u64, HlcError> {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|_| HlcError::SystemClockError)
.and_then(|d| u64::try_from(d.as_nanos()).map_err(|_| HlcError::SystemClockError))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn timestamp_ordering() {
let a = HlcTimestamp {
physical: 100,
logical: 0,
};
let b = HlcTimestamp {
physical: 100,
logical: 1,
};
let c = HlcTimestamp {
physical: 200,
logical: 0,
};
assert!(a < b);
assert!(b < c);
assert!(a < c);
assert!(a.happened_before(b));
assert!(b.happened_before(c));
}
#[test]
fn timestamp_concurrent() {
let a = HlcTimestamp {
physical: 42,
logical: 7,
};
assert!(a.concurrent_with(a));
let b = HlcTimestamp {
physical: 42,
logical: 8,
};
assert!(!a.concurrent_with(b));
}
#[test]
fn timestamp_bytes_roundtrip() {
let ts = HlcTimestamp {
physical: 1_700_000_000_000_000_000,
logical: 12345,
};
let restored = HlcTimestamp::from_bytes(ts.to_bytes());
assert_eq!(ts, restored);
}
#[test]
fn bytes_preserve_order() {
let a = HlcTimestamp {
physical: 100,
logical: 5,
};
let b = HlcTimestamp {
physical: 100,
logical: 6,
};
assert!(a.to_bytes() < b.to_bytes());
let c = HlcTimestamp {
physical: 99,
logical: 999,
};
let d = HlcTimestamp {
physical: 100,
logical: 0,
};
assert!(c.to_bytes() < d.to_bytes());
}
#[test]
fn now_is_monotonic() {
let clock = HlcClock::new();
let mut prev = clock.now().unwrap();
for _ in 0..1000 {
let next = clock.now().unwrap();
assert!(
prev < next,
"now() must be strictly monotonic: {prev:?} >= {next:?}"
);
prev = next;
}
}
#[test]
fn recv_advances_past_message() {
let sender = HlcClock::new();
let receiver = HlcClock::new();
let send_ts = sender.now().unwrap();
let recv_ts = receiver.recv(send_ts).unwrap();
assert!(
send_ts.happened_before(recv_ts),
"receive timestamp must be after the send timestamp"
);
}
#[test]
fn recv_preserves_causality_chain() {
let a = HlcClock::new();
let b = HlcClock::new();
let c = HlcClock::new();
let ts_a = a.now().unwrap();
let ts_b = b.recv(ts_a).unwrap(); let ts_c = c.recv(ts_b).unwrap();
assert!(ts_a.happened_before(ts_b));
assert!(ts_b.happened_before(ts_c));
assert!(ts_a.happened_before(ts_c)); }
#[test]
fn recv_rejects_future_drift() {
let clock = HlcClock::with_max_drift(Duration::from_millis(100));
let far_future = HlcTimestamp {
physical: physical_now().unwrap() + 10_000_000_000, logical: 0,
};
assert!(matches!(
clock.recv(far_future),
Err(HlcError::FutureDrift { .. })
));
}
#[test]
fn last_does_not_advance_clock() {
let clock = HlcClock::new();
let ts1 = clock.now().unwrap();
let last = clock.last();
let ts2 = clock.now().unwrap();
assert_eq!(last, ts1);
assert!(ts1 < ts2);
}
#[test]
fn snapshot_isolation_lower_bound() {
let writer = HlcClock::new();
let reader = HlcClock::new();
let write_ts = writer.now().unwrap();
let snapshot_ts = reader.recv(write_ts).unwrap();
assert!(write_ts.happened_before(snapshot_ts));
}
}