use core::cmp;
use crate::NodeId;
/// A timestamp made of a physical time component, a logical counter and the
/// id of the issuing node.
///
/// Equality and hashing are derived over all three fields. Ordering is
/// provided by the hand-written `Ord` impl in this file, which compares
/// `physical`, then `logical`, then `node_id`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HybridTimestamp {
/// Physical time component. `HybridClock` fills it from its time source
/// (milliseconds since the Unix epoch for the built-in `std` source; the
/// unit of a caller-supplied source is whatever that function returns).
pub physical: u64,
/// Logical counter. `HybridClock` resets it to 0 when physical time moves
/// forward and increments it otherwise.
pub logical: u16,
/// Id of the node that issued the timestamp; used as the final tie-breaker
/// when comparing.
pub node_id: u16,
}
impl HybridTimestamp {
    /// Returns the timestamp whose physical, logical and node-id components
    /// are all zero. `HybridClock` uses it as its initial "last" value.
    pub fn zero() -> Self {
        let (physical, logical, node_id) = (0, 0, 0);
        Self {
            physical,
            logical,
            node_id,
        }
    }

    /// Packs the timestamp into a single `u128`.
    ///
    /// Layout, from most to least significant bits:
    /// - bits 64..128: `physical`
    /// - bits 48..64: `logical`
    /// - bits 32..48: `node_id`
    /// - bits 0..32: always zero
    ///
    /// Because the fields occupy disjoint bit ranges in order of
    /// significance, comparing the packed integers gives the same result as
    /// comparing the timestamps themselves.
    pub fn to_u128(&self) -> u128 {
        let physical_bits = u128::from(self.physical) << 64;
        let logical_bits = u128::from(self.logical) << 48;
        let node_bits = u128::from(self.node_id) << 32;
        physical_bits | logical_bits | node_bits
    }
}
/// Total order over timestamps: physical time first, then the logical
/// counter, with the node id as the final tie-breaker.
impl Ord for HybridTimestamp {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        // Tuples compare lexicographically, which is exactly the
        // physical -> logical -> node_id precedence we want.
        let lhs = (self.physical, self.logical, self.node_id);
        let rhs = (other.physical, other.logical, other.node_id);
        lhs.cmp(&rhs)
    }
}
/// Partial order that always agrees with the total order from `Ord`, so any
/// two timestamps are comparable.
impl PartialOrd for HybridTimestamp {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
/// Clock that issues `HybridTimestamp`s on behalf of a single node.
///
/// It remembers the last timestamp it handed out and reads physical time
/// through a plain function pointer, so the time source can be swapped
/// (see `with_time_source`).
#[derive(Debug, Clone)]
pub struct HybridClock {
/// Node id stamped on every timestamp this clock issues.
node_id: u16,
/// Most recent timestamp produced by `now` or `receive`; starts as
/// `HybridTimestamp::zero()`.
last: HybridTimestamp,
/// Source of physical time, called once per `now` / `receive`.
physical_time_fn: fn() -> u64,
}
/// Default time source when the `std` feature is enabled: milliseconds
/// elapsed since the Unix epoch according to the system clock.
///
/// If the system clock reads earlier than the epoch, the elapsed duration
/// falls back to zero, so this returns 0 instead of failing.
#[cfg(feature = "std")]
fn system_time_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // `as_millis` yields a u128; the cast keeps the low 64 bits.
    since_epoch.as_millis() as u64
}
/// Time source used when the `std` feature is disabled: no wall clock is
/// consulted, so it always reports 0.
#[cfg(not(feature = "std"))]
fn fallback_time_ms() -> u64 {
    const NO_WALL_CLOCK_MS: u64 = 0;
    NO_WALL_CLOCK_MS
}
impl HybridClock {
    /// Creates a clock for `node_id` using the default time source: the
    /// system clock when the `std` feature is enabled, otherwise a source
    /// that always reports 0.
    ///
    /// The node id is narrowed to `u16` with `as`; if `NodeId` is wider than
    /// 16 bits, the upper bits are silently dropped.
    pub fn new(node_id: NodeId) -> Self {
        #[cfg(feature = "std")]
        let time_fn: fn() -> u64 = system_time_ms;
        #[cfg(not(feature = "std"))]
        let time_fn: fn() -> u64 = fallback_time_ms;
        Self::with_time_source(node_id, time_fn)
    }

    /// Creates a clock for `node_id` that reads physical time from `time_fn`.
    ///
    /// The clock starts from `HybridTimestamp::zero()`. The node id is
    /// narrowed to `u16` with `as` (truncating if `NodeId` is wider).
    pub fn with_time_source(node_id: NodeId, time_fn: fn() -> u64) -> Self {
        Self {
            node_id: node_id as u16,
            last: HybridTimestamp::zero(),
            physical_time_fn: time_fn,
        }
    }

    /// Issues the next local timestamp.
    ///
    /// If the time source has moved past the last issued physical time, the
    /// new timestamp takes that time with a logical counter of 0. Otherwise
    /// the last physical time is kept and the logical counter is advanced
    /// (see `next_tick` for what happens when the counter is exhausted).
    pub fn now(&mut self) -> HybridTimestamp {
        let wall = (self.physical_time_fn)();
        let (physical, logical) = if wall > self.last.physical {
            (wall, 0)
        } else {
            Self::next_tick(self.last.physical, self.last.logical)
        };
        self.last = HybridTimestamp {
            physical,
            logical,
            node_id: self.node_id,
        };
        self.last
    }

    /// Merges a timestamp received from another node and issues a local
    /// timestamp that follows both it and everything issued so far.
    ///
    /// The physical component is the maximum of the time source, the last
    /// issued physical time and `remote.physical`. The logical counter
    /// continues from whichever of the local/remote counters belong to that
    /// maximum, or restarts at 0 when only the time source reached it.
    pub fn receive(&mut self, remote: &HybridTimestamp) -> HybridTimestamp {
        let wall = (self.physical_time_fn)();
        let max_pt = cmp::max(cmp::max(wall, self.last.physical), remote.physical);

        let local_is_max = max_pt == self.last.physical;
        let remote_is_max = max_pt == remote.physical;
        // Logical counter to continue from, if any participant already sits
        // on the winning physical time.
        let base_logical = match (local_is_max, remote_is_max) {
            (true, true) => Some(cmp::max(self.last.logical, remote.logical)),
            (true, false) => Some(self.last.logical),
            (false, true) => Some(remote.logical),
            (false, false) => None,
        };

        let (physical, logical) = match base_logical {
            Some(base) => Self::next_tick(max_pt, base),
            None => (max_pt, 0),
        };
        self.last = HybridTimestamp {
            physical,
            logical,
            node_id: self.node_id,
        };
        self.last
    }

    /// Returns the node id this clock stamps on its timestamps.
    pub fn node_id(&self) -> u16 {
        self.node_id
    }

    /// Returns the most recently issued timestamp without advancing the
    /// clock.
    pub fn last_timestamp(&self) -> HybridTimestamp {
        self.last
    }

    /// Advances `(physical, logical)` by one logical step without ever
    /// overflowing.
    ///
    /// When the 16-bit logical counter is exhausted, the overflow is carried
    /// into the physical component (`physical + 1`, logical 0) so the result
    /// still sorts after the input. A plain `logical + 1` would panic in
    /// debug builds and wrap to 0 in release builds, producing a timestamp
    /// that sorts *before* the previous one. If both components are already
    /// at their maximum there is nothing left to advance to, so the input is
    /// returned unchanged.
    fn next_tick(physical: u64, logical: u16) -> (u64, u16) {
        if let Some(next_logical) = logical.checked_add(1) {
            (physical, next_logical)
        } else if let Some(next_physical) = physical.checked_add(1) {
            (next_physical, 0)
        } else {
            (physical, logical)
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use core::sync::atomic::{AtomicU64, Ordering};

    // The test harness runs tests on parallel threads. Every test therefore
    // owns its time source: sharing one mutable global clock would let one
    // test change the time another test is in the middle of asserting on.

    /// Time source frozen at 1000.
    fn fixed_1000() -> u64 {
        1000
    }

    /// Time source frozen at 5000.
    fn fixed_5000() -> u64 {
        5000
    }

    /// Repeated `now` calls within one physical tick stay strictly
    /// increasing by bumping the logical counter.
    #[test]
    fn monotonic_within_same_ms() {
        let mut clock = HybridClock::with_time_source(1, fixed_5000);
        let ts1 = clock.now();
        let ts2 = clock.now();
        let ts3 = clock.now();
        assert!(ts1 < ts2);
        assert!(ts2 < ts3);
        assert_eq!(ts1.physical, 5000);
        assert_eq!(ts1.logical, 0);
        assert_eq!(ts2.logical, 1);
        assert_eq!(ts3.logical, 2);
    }

    /// When physical time moves forward the logical counter restarts at 0.
    #[test]
    fn physical_time_advance_resets_logical() {
        // Adjustable time source private to this test.
        static TIME: AtomicU64 = AtomicU64::new(1000);
        fn adjustable_time() -> u64 {
            TIME.load(Ordering::SeqCst)
        }

        TIME.store(1000, Ordering::SeqCst);
        let mut clock = HybridClock::with_time_source(1, adjustable_time);
        let ts1 = clock.now();
        assert_eq!(ts1.logical, 0);
        let _ts2 = clock.now();
        TIME.store(2000, Ordering::SeqCst);
        let ts3 = clock.now();
        assert_eq!(ts3.physical, 2000);
        assert_eq!(ts3.logical, 0);
    }

    /// A remote timestamp ahead of the local clock pulls the clock forward.
    #[test]
    fn receive_advances_clock() {
        let mut clock = HybridClock::with_time_source(1, fixed_1000);
        let remote = HybridTimestamp {
            physical: 5000,
            logical: 3,
            node_id: 2,
        };
        let ts = clock.receive(&remote);
        assert!(ts > remote);
        assert_eq!(ts.physical, 5000);
        // Continues from the remote logical counter.
        assert_eq!(ts.logical, 4);
    }

    /// With equal physical times the larger logical counter wins, plus one.
    #[test]
    fn receive_same_physical_time() {
        let mut clock = HybridClock::with_time_source(1, fixed_5000);
        let _local = clock.now();
        let remote = HybridTimestamp {
            physical: 5000,
            logical: 5,
            node_id: 2,
        };
        let ts = clock.receive(&remote);
        assert!(ts > remote);
        assert_eq!(ts.physical, 5000);
        // max(local 0, remote 5) + 1
        assert_eq!(ts.logical, 6);
    }

    /// Ordering falls through physical, then logical, then node id.
    #[test]
    fn ordering_is_total() {
        let a = HybridTimestamp {
            physical: 1000,
            logical: 0,
            node_id: 1,
        };
        let b = HybridTimestamp {
            physical: 1000,
            logical: 0,
            node_id: 2,
        };
        let c = HybridTimestamp {
            physical: 1000,
            logical: 1,
            node_id: 1,
        };
        assert!(a < b); // node id breaks the tie
        assert!(a < c); // logical counter decides
        assert!(b < c); // logical outranks node id
    }

    /// The packed `u128` form sorts the same way as the timestamps.
    #[test]
    fn to_u128_preserves_ordering() {
        let a = HybridTimestamp {
            physical: 1000,
            logical: 5,
            node_id: 1,
        };
        let b = HybridTimestamp {
            physical: 1000,
            logical: 6,
            node_id: 1,
        };
        let c = HybridTimestamp {
            physical: 1001,
            logical: 0,
            node_id: 1,
        };
        assert!(a.to_u128() < b.to_u128());
        assert!(b.to_u128() < c.to_u128());
    }
}