use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use crate::error::{ArrayError, ArrayResult};
use crate::sync::replica_id::ReplicaId;
pub const MAX_PHYSICAL_MS: u64 = (1u64 << 48) - 1;
#[derive(
Copy,
Clone,
Debug,
PartialEq,
Eq,
Serialize,
Deserialize,
zerompk::ToMessagePack,
zerompk::FromMessagePack,
)]
pub struct Hlc {
pub physical_ms: u64,
pub logical: u16,
pub replica_id: ReplicaId,
}
impl Hlc {
pub const ZERO: Self = Self {
physical_ms: 0,
logical: 0,
replica_id: ReplicaId(0),
};
pub fn new(physical_ms: u64, logical: u16, replica_id: ReplicaId) -> ArrayResult<Self> {
if physical_ms > MAX_PHYSICAL_MS {
return Err(ArrayError::InvalidHlc {
detail: format!(
"physical_ms {physical_ms} exceeds MAX_PHYSICAL_MS {MAX_PHYSICAL_MS}"
),
});
}
Ok(Self {
physical_ms,
logical,
replica_id,
})
}
pub fn to_bytes(&self) -> [u8; 18] {
let mut out = [0u8; 18];
out[0..8].copy_from_slice(&self.physical_ms.to_be_bytes());
out[8..10].copy_from_slice(&self.logical.to_be_bytes());
out[10..18].copy_from_slice(&self.replica_id.0.to_be_bytes());
out
}
pub fn from_bytes(b: &[u8; 18]) -> Self {
let physical_ms = u64::from_be_bytes(
b[0..8]
.try_into()
.expect("invariant: b is [u8; 18], slice [0..8] is exactly 8 bytes"),
);
let logical = u16::from_be_bytes(
b[8..10]
.try_into()
.expect("invariant: b is [u8; 18], slice [8..10] is exactly 2 bytes"),
);
let replica_id =
ReplicaId(u64::from_be_bytes(b[10..18].try_into().expect(
"invariant: b is [u8; 18], slice [10..18] is exactly 8 bytes",
)));
Self {
physical_ms,
logical,
replica_id,
}
}
}
impl PartialOrd for Hlc {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Hlc {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
(self.physical_ms, self.logical, self.replica_id.0).cmp(&(
other.physical_ms,
other.logical,
other.replica_id.0,
))
}
}
pub struct HlcGenerator {
replica_id: ReplicaId,
state: Mutex<(u64, u16)>,
}
impl HlcGenerator {
pub fn new(replica_id: ReplicaId) -> Self {
Self {
replica_id,
state: Mutex::new((0, 0)),
}
}
fn now_ms() -> ArrayResult<u64> {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.map_err(|e| ArrayError::InvalidHlc {
detail: format!("system clock before Unix epoch: {e}"),
})
}
pub fn next(&self) -> ArrayResult<Hlc> {
let now_ms = Self::now_ms()?;
if now_ms > MAX_PHYSICAL_MS {
return Err(ArrayError::InvalidHlc {
detail: format!("system clock {now_ms} exceeds MAX_PHYSICAL_MS"),
});
}
let mut guard = self.state.lock().map_err(|_| ArrayError::HlcLockPoisoned)?;
let (last_physical, last_logical) = *guard;
let new_physical = now_ms.max(last_physical);
let new_logical = if new_physical == last_physical {
last_logical
.checked_add(1)
.ok_or_else(|| ArrayError::InvalidHlc {
detail: "logical counter overflow within one millisecond".into(),
})?
} else {
0
};
*guard = (new_physical, new_logical);
drop(guard);
Hlc::new(new_physical, new_logical, self.replica_id)
}
pub fn observe(&self, remote: Hlc) -> ArrayResult<()> {
let now_ms = Self::now_ms()?.min(MAX_PHYSICAL_MS);
let mut guard = self.state.lock().map_err(|_| ArrayError::HlcLockPoisoned)?;
let (last_physical, last_logical) = *guard;
let new_physical = now_ms.max(last_physical).max(remote.physical_ms);
let new_logical = if new_physical == last_physical && new_physical == remote.physical_ms {
last_logical
.max(remote.logical)
.checked_add(1)
.ok_or_else(|| ArrayError::InvalidHlc {
detail: "logical counter overflow during observe".into(),
})?
} else if new_physical == last_physical {
last_logical
.checked_add(1)
.ok_or_else(|| ArrayError::InvalidHlc {
detail: "logical counter overflow during observe (local wins)".into(),
})?
} else if new_physical == remote.physical_ms {
remote
.logical
.checked_add(1)
.ok_or_else(|| ArrayError::InvalidHlc {
detail: "logical counter overflow during observe (remote wins)".into(),
})?
} else {
0
};
*guard = (new_physical, new_logical);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_replica() -> ReplicaId {
ReplicaId::new(42)
}
#[test]
fn monotonic_under_fast_calls() {
let g = HlcGenerator::new(test_replica());
let mut prev = g.next().unwrap();
for _ in 0..999 {
let curr = g.next().unwrap();
assert!(
curr > prev,
"HLC must be strictly increasing: {curr:?} <= {prev:?}"
);
prev = curr;
}
}
#[test]
fn survives_clock_skew_injection() {
let g = HlcGenerator::new(test_replica());
let baseline = g.next().unwrap();
let future_physical = baseline.physical_ms + 100_000; let future_hlc = Hlc::new(future_physical, 50, ReplicaId::new(99)).unwrap();
g.observe(future_hlc).unwrap();
let next = g.next().unwrap();
assert!(
next > future_hlc,
"next {next:?} should be > observed future {future_hlc:?}"
);
}
#[test]
fn to_bytes_roundtrip() {
let hlc = Hlc::new(123_456_789, 7, ReplicaId::new(0xabcd)).unwrap();
let bytes = hlc.to_bytes();
let back = Hlc::from_bytes(&bytes);
assert_eq!(hlc, back);
}
#[test]
fn byte_order_matches_lexicographic() {
use std::collections::BTreeMap;
let replica = ReplicaId::new(1);
let mut hlcs: Vec<Hlc> = (0u64..100)
.map(|i| Hlc {
physical_ms: i / 10,
logical: (i % 10) as u16,
replica_id: replica,
})
.collect();
let mut by_ord = hlcs.clone();
by_ord.sort();
hlcs.sort_by_key(|h| h.to_bytes());
assert_eq!(by_ord, hlcs, "byte sort must match Ord sort");
let mut map: BTreeMap<[u8; 18], Hlc> = BTreeMap::new();
for h in &by_ord {
map.insert(h.to_bytes(), *h);
}
let map_order: Vec<Hlc> = map.into_values().collect();
assert_eq!(by_ord, map_order);
}
#[test]
fn serialize_roundtrip() {
let hlc = Hlc::new(9_999, 3, ReplicaId::new(77)).unwrap();
let bytes = zerompk::to_msgpack_vec(&hlc).expect("serialize");
let back: Hlc = zerompk::from_msgpack(&bytes).expect("deserialize");
assert_eq!(hlc, back);
}
#[test]
fn physical_ms_overflow_errors() {
let result = Hlc::new(MAX_PHYSICAL_MS + 1, 0, ReplicaId::new(1));
assert!(
matches!(result, Err(ArrayError::InvalidHlc { .. })),
"expected InvalidHlc, got: {result:?}"
);
}
#[test]
fn hlc_zero_is_minimum() {
let non_zero = Hlc::new(1, 0, ReplicaId::new(0)).unwrap();
assert!(Hlc::ZERO < non_zero);
}
}