use super::timestamp::Timestamp;
pub const MAX_SKEW_MS: u64 = 60_000;
pub struct Hlc {
state: Timestamp,
}
impl Hlc {
pub fn new(seed: u64) -> Self {
Self {
state: Timestamp::from_raw(seed),
}
}
pub fn tick(&mut self, local_wall_ms: u64) -> Timestamp {
let old_wall = self.state.millis();
let old_counter = self.state.counter();
let mut new_wall = local_wall_ms.max(old_wall);
let new_counter = if new_wall == old_wall {
let c = old_counter.saturating_add(1);
if c == old_counter {
new_wall += 1;
0
} else {
c
}
} else {
0
};
self.state = Timestamp::from_parts(new_wall, new_counter);
self.state
}
pub fn observe(&mut self, received: Timestamp, local_wall_ms: u64) -> Result<(), SkewError> {
if !Self::is_within_skew(received, local_wall_ms) {
return Err(SkewError {
received,
local_wall_ms,
});
}
if received > self.state {
self.state = received;
}
Ok(())
}
pub fn state(&self) -> Timestamp {
self.state
}
pub fn is_within_skew(received: Timestamp, local_wall_ms: u64) -> bool {
let remote_wall = received.millis();
remote_wall <= local_wall_ms.saturating_add(MAX_SKEW_MS)
}
}
pub fn wall_ms() -> u64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SkewError {
pub received: Timestamp,
pub local_wall_ms: u64,
}
impl std::fmt::Display for SkewError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"remote timestamp {} ms ahead of local wall clock (max skew {} ms)",
self.received.millis().saturating_sub(self.local_wall_ms),
MAX_SKEW_MS
)
}
}
impl std::error::Error for SkewError {}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn tick_is_monotone() {
let mut hlc = Hlc::new(0);
let t1 = hlc.tick(wall_ms());
let t2 = hlc.tick(wall_ms());
let t3 = hlc.tick(wall_ms());
assert!(t2 > t1);
assert!(t3 > t2);
}
#[test]
fn tick_after_observe_beats_observed() {
let mut hlc = Hlc::new(0);
let local = wall_ms();
let remote = Timestamp::from_parts(local + MAX_SKEW_MS, 999);
hlc.observe(remote, local).unwrap();
let t = hlc.tick(wall_ms());
assert!(t > remote);
}
#[test]
fn counter_saturation_advances_wall() {
let far_future = (1u64 << 48) - 2;
let saturated = Timestamp::from_parts(far_future, u16::MAX);
let mut hlc = Hlc::new(saturated.raw());
let t = hlc.tick(wall_ms());
assert!(t > saturated, "must advance past saturated counter");
assert_eq!(t.millis(), far_future + 1);
assert_eq!(t.counter(), 0);
}
#[test]
fn observe_stale_is_noop() {
let mut hlc = Hlc::new(1000);
hlc.observe(Timestamp::from_raw(500), wall_ms()).unwrap();
assert_eq!(hlc.state(), Timestamp::from_raw(1000));
}
#[test]
fn observe_rejects_beyond_skew_without_advancing() {
let local = wall_ms();
let too_far = Timestamp::from_parts(local + MAX_SKEW_MS + 1, 0);
let mut hlc = Hlc::new(0);
let err = hlc.observe(too_far, local).unwrap_err();
assert_eq!(err.received, too_far);
assert_eq!(hlc.state(), Timestamp::from_raw(0));
}
#[test]
fn skew_accepts_past_timestamps() {
let local = wall_ms();
let past = Timestamp::from_parts(local.saturating_sub(10 * 365 * 86_400_000), 0); assert!(Hlc::is_within_skew(past, local));
}
#[test]
fn skew_accepts_within_window() {
let local = wall_ms();
assert!(Hlc::is_within_skew(Timestamp::from_parts(local, 0), local));
let edge = Timestamp::from_parts(local + MAX_SKEW_MS, 0);
assert!(Hlc::is_within_skew(edge, local));
}
#[test]
fn skew_rejects_beyond_window() {
let local = wall_ms();
let too_far = Timestamp::from_parts(local + MAX_SKEW_MS + 1, 0);
assert!(!Hlc::is_within_skew(too_far, local));
let far_future = Timestamp::from_parts(local + (365 * 86_400_000), 0); assert!(!Hlc::is_within_skew(far_future, local));
}
}