use std::time::{SystemTime, UNIX_EPOCH};
use parking_lot::Mutex;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Hlc(pub u64);
impl Hlc {
pub const ZERO: Hlc = Hlc(0);
pub const PHYSICAL_BITS: u32 = 48;
pub const LOGICAL_BITS: u32 = 16;
pub const LOGICAL_MASK: u64 = (1 << Self::LOGICAL_BITS) - 1;
pub fn from_packed(packed: u64) -> Self {
Hlc(packed)
}
pub fn pack(physical_ms: u64, logical: u16) -> Self {
let phys = physical_ms & ((1u64 << Self::PHYSICAL_BITS) - 1);
Hlc((phys << Self::LOGICAL_BITS) | (logical as u64))
}
pub fn physical_ms(&self) -> u64 {
self.0 >> Self::LOGICAL_BITS
}
pub fn logical(&self) -> u16 {
(self.0 & Self::LOGICAL_MASK) as u16
}
pub fn to_le_bytes(&self) -> [u8; 8] {
self.0.to_le_bytes()
}
pub fn from_le_bytes(buf: [u8; 8]) -> Self {
Hlc(u64::from_le_bytes(buf))
}
}
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum HlcError {
#[error("clock skew of {0} ms exceeds the {1} ms tolerance")]
SkewExceeded(u64, u64),
#[error("logical counter overflow at physical ms {0}")]
LogicalOverflow(u64),
#[error("system clock returned a time before the UNIX epoch")]
BeforeEpoch,
}
pub const DEFAULT_MAX_SKEW_MS: u64 = 60 * 60 * 1000;
pub struct HlcClock {
state: Mutex<Hlc>,
max_skew_ms: u64,
}
impl HlcClock {
pub fn new() -> Self {
Self::with_max_skew(DEFAULT_MAX_SKEW_MS)
}
pub fn with_max_skew(max_skew_ms: u64) -> Self {
let initial = wall_now_ms().unwrap_or(0);
Self {
state: Mutex::new(Hlc::pack(initial, 0)),
max_skew_ms,
}
}
pub fn now(&self) -> Result<Hlc, HlcError> {
let mut state = self.state.lock();
let wall = wall_now_ms()?;
let prev_phys = state.physical_ms();
let prev_log = state.logical();
let next = if wall > prev_phys {
Hlc::pack(wall, 0)
} else {
let new_log = prev_log
.checked_add(1)
.ok_or(HlcError::LogicalOverflow(prev_phys))?;
Hlc::pack(prev_phys, new_log)
};
*state = next;
Ok(next)
}
pub fn update(&self, remote: Hlc) -> Result<Hlc, HlcError> {
let mut state = self.state.lock();
let wall = wall_now_ms()?;
if remote.physical_ms() > wall + self.max_skew_ms {
return Err(HlcError::SkewExceeded(
remote.physical_ms() - wall,
self.max_skew_ms,
));
}
let prev_phys = state.physical_ms();
let prev_log = state.logical();
let rem_phys = remote.physical_ms();
let rem_log = remote.logical();
let new_phys = wall.max(prev_phys).max(rem_phys);
let new_log = if new_phys == prev_phys && new_phys == rem_phys {
prev_log
.max(rem_log)
.checked_add(1)
.ok_or(HlcError::LogicalOverflow(new_phys))?
} else if new_phys == prev_phys {
prev_log
.checked_add(1)
.ok_or(HlcError::LogicalOverflow(new_phys))?
} else if new_phys == rem_phys {
rem_log
.checked_add(1)
.ok_or(HlcError::LogicalOverflow(new_phys))?
} else {
0
};
let next = Hlc::pack(new_phys, new_log);
*state = next;
Ok(next)
}
pub fn read(&self) -> Hlc {
*self.state.lock()
}
}
impl Default for HlcClock {
fn default() -> Self {
Self::new()
}
}
fn wall_now_ms() -> Result<u64, HlcError> {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.map_err(|_| HlcError::BeforeEpoch)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_hlc_pack_unpack_round_trip() {
let cases = [
(0u64, 0u16),
(1, 1),
(1234567890, 42),
((1u64 << 48) - 1, u16::MAX),
];
for (phys, log) in cases {
let h = Hlc::pack(phys, log);
assert_eq!(h.physical_ms(), phys, "physical mismatch");
assert_eq!(h.logical(), log, "logical mismatch");
assert_eq!(Hlc::from_le_bytes(h.to_le_bytes()), h);
}
}
#[test]
fn test_hlc_now_strictly_monotonic() {
let clock = HlcClock::new();
let mut prev = Hlc::ZERO;
for _ in 0..1000 {
let h = clock.now().unwrap();
assert!(h > prev, "HLC must be strictly monotonic: {h:?} > {prev:?}");
prev = h;
}
}
#[test]
fn test_hlc_update_with_higher_remote() {
let clock = HlcClock::new();
let local_now = clock.now().unwrap();
let remote = Hlc::pack(local_now.physical_ms() + 1000, 5);
let after = clock.update(remote).unwrap();
assert!(after >= remote);
assert!(after.physical_ms() >= remote.physical_ms());
}
#[test]
fn test_hlc_update_with_lower_remote() {
let clock = HlcClock::new();
let local = clock.now().unwrap();
let remote = Hlc::pack(local.physical_ms().saturating_sub(10_000), 0);
let after = clock.update(remote).unwrap();
assert!(
after >= local,
"after ({after:?}) must be >= local ({local:?})"
);
}
#[test]
fn test_hlc_update_with_equal_remote() {
let clock = HlcClock::with_max_skew(60_000);
let local = clock.now().unwrap();
let remote = Hlc::pack(local.physical_ms(), local.logical());
let after = clock.update(remote).unwrap();
assert!(
after > local,
"after ({after:?}) must be > local ({local:?})"
);
assert!(after > remote);
}
#[test]
fn test_hlc_update_skew_exceeded() {
let clock = HlcClock::with_max_skew(1_000); let wall = wall_now_ms().unwrap();
let remote = Hlc::pack(wall + 7_200_000, 0); let err = clock.update(remote).unwrap_err();
match err {
HlcError::SkewExceeded(skew, tol) => {
assert!(skew >= 7_200_000 - 1_000);
assert_eq!(tol, 1_000);
}
other => panic!("expected SkewExceeded, got {other:?}"),
}
}
#[test]
fn test_hlc_logical_overflow_safety() {
let clock = HlcClock::new();
let local = clock.now().unwrap();
let remote = Hlc::pack(local.physical_ms(), u16::MAX);
let err = clock.update(remote).unwrap_err();
assert!(matches!(err, HlcError::LogicalOverflow(_)));
}
#[test]
fn test_hlc_read_does_not_advance() {
let clock = HlcClock::new();
let h1 = clock.now().unwrap();
let r1 = clock.read();
let r2 = clock.read();
assert_eq!(r1, h1);
assert_eq!(r2, h1);
}
#[test]
fn test_hlc_ordering_via_packed() {
let a = Hlc::pack(100, 0);
let b = Hlc::pack(100, 1);
let c = Hlc::pack(101, 0);
assert!(a < b);
assert!(b < c);
assert!(a < c);
}
}