use serde::{Deserialize, Serialize};
use std::time::SystemTime;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct HybridClock {
pub id: String,
pub physical_ms: u64,
pub logical: u32,
}
fn current_time_ms() -> u64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
impl HybridClock {
pub fn new(id: impl Into<String>) -> Self {
Self {
id: id.into(),
physical_ms: current_time_ms(),
logical: 0,
}
}
pub fn with_values(id: impl Into<String>, physical_ms: u64, logical: u32) -> Self {
Self {
id: id.into(),
physical_ms,
logical,
}
}
pub fn tick(&mut self) -> (u64, u32) {
let wall = current_time_ms();
if wall > self.physical_ms {
self.physical_ms = wall;
self.logical = 0;
} else {
self.logical = self.logical.saturating_add(1);
}
(self.physical_ms, self.logical)
}
pub fn merge(&mut self, remote: &HybridClock) -> (u64, u32) {
let wall = current_time_ms();
let new_physical = self.physical_ms.max(remote.physical_ms).max(wall);
if new_physical > self.physical_ms && new_physical > remote.physical_ms {
self.logical = 0;
} else if new_physical == self.physical_ms && new_physical == remote.physical_ms {
self.logical = self.logical.max(remote.logical).saturating_add(1);
} else if new_physical == self.physical_ms {
self.logical = self.logical.saturating_add(1);
} else {
self.logical = remote.logical.saturating_add(1);
}
self.physical_ms = new_physical;
(self.physical_ms, self.logical)
}
pub fn cmp_order(&self, other: &HybridClock) -> std::cmp::Ordering {
self.physical_ms
.cmp(&other.physical_ms)
.then_with(|| self.logical.cmp(&other.logical))
.then_with(|| other.id.cmp(&self.id)) }
pub fn as_tuple(&self) -> (u64, u32) {
(self.physical_ms, self.logical)
}
}
pub type LamportClock = HybridClock;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn hlc_monotonic() {
let mut clock = HybridClock::new("node-a");
let (p1, l1) = clock.tick();
let (p2, l2) = clock.tick();
assert!(
(p2, l2) >= (p1, l1),
"clock must be monotonic: ({p1},{l1}) -> ({p2},{l2})"
);
}
#[test]
fn hlc_merge_advances() {
let mut local = HybridClock::with_values("node-a", 100, 0);
let remote = HybridClock::with_values("node-b", 200, 5);
let (p, l) = local.merge(&remote);
assert!(p >= 200, "merge should advance physical to at least remote");
assert!(
(p, l) > (200, 5),
"merged clock should be ahead of remote: ({p},{l}) vs (200,5)"
);
}
#[test]
fn hlc_merge_local_ahead() {
let mut local = HybridClock::with_values("node-a", 500, 10);
let remote = HybridClock::with_values("node-b", 100, 0);
let (p, _l) = local.merge(&remote);
assert!(p >= 500, "local was ahead — physical should stay >= 500");
}
#[test]
fn hlc_tiebreak_deterministic() {
let a = HybridClock::with_values("alpha", 100, 5);
let b = HybridClock::with_values("beta", 100, 5);
assert_eq!(a.cmp_order(&b), std::cmp::Ordering::Greater);
assert_eq!(b.cmp_order(&a), std::cmp::Ordering::Less);
}
#[test]
fn hlc_physical_beats_logical() {
let a = HybridClock::with_values("node-a", 200, 0);
let b = HybridClock::with_values("node-b", 100, 999);
assert_eq!(a.cmp_order(&b), std::cmp::Ordering::Greater);
}
#[test]
fn hlc_logical_breaks_physical_tie() {
let a = HybridClock::with_values("node-a", 100, 10);
let b = HybridClock::with_values("node-b", 100, 5);
assert_eq!(a.cmp_order(&b), std::cmp::Ordering::Greater);
}
#[test]
fn hlc_serialization_roundtrip() {
let clock = HybridClock::with_values("node-x", 1711234567890, 42);
let bytes = rmp_serde::to_vec(&clock).unwrap();
let decoded: HybridClock = rmp_serde::from_slice(&bytes).unwrap();
assert_eq!(clock, decoded);
}
#[test]
fn hlc_logical_saturates() {
let mut clock = HybridClock::with_values("node-a", u64::MAX, u32::MAX);
let (p, l) = clock.tick();
assert_eq!(p, u64::MAX);
assert_eq!(l, u32::MAX);
}
#[test]
fn hlc_with_values_constructor() {
let clock = HybridClock::with_values("test", 42, 7);
assert_eq!(clock.id, "test");
assert_eq!(clock.physical_ms, 42);
assert_eq!(clock.logical, 7);
}
#[test]
fn hlc_as_tuple() {
let clock = HybridClock::with_values("test", 100, 5);
assert_eq!(clock.as_tuple(), (100, 5));
}
}