use crate::core::error::{StorageError, TemporalError};
use crate::core::temporal::MAX_VALID_TIMESTAMP;
#[cfg(test)]
use std::cell::Cell;
use std::sync::OnceLock;
/// Timestamp that pairs a wallclock reading with a logical counter.
///
/// The derived `PartialOrd`/`Ord` compare fields in declaration order, so
/// timestamps sort by `wallclock` first and by `logical` second.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct HybridTimestamp {
/// Wallclock component. `as_secs` divides it by 1_000_000 and `as_millis`
/// by 1_000, i.e. the value is treated as microseconds.
wallclock: i64,
/// Logical counter; `send`/`receive` increment it when the wallclock does
/// not advance and reset it to 0 when it does.
logical: u32,
}
/// Backward drift tolerated by `evaluate_clock_skew`: 5 minutes, in microseconds.
pub const MAX_BACKWARD_DRIFT_US: i64 = 5 * 60 * 1_000_000;
/// 60 minutes in microseconds. `evaluate_clock_skew` takes its forward bound
/// as a parameter; this constant is the value callers can pass for it.
pub const MAX_FORWARD_JUMP_US: i64 = 60 * 60 * 1_000_000;
#[cfg(test)]
thread_local! {
    // Per-thread, test-only override consulted by `is_clock_skew_self_heal_enabled`.
    // `None` means "no override"; `Some(flag)` forces the reported setting to `flag`.
    static CLOCK_SKEW_AUTO_HEAL_TEST_OVERRIDE: Cell<Option<bool>> = const { Cell::new(None) };
}

/// Scope guard that forces the clock-skew self-heal setting for the current
/// thread and puts the previous override back when dropped.
///
/// Guards nest: each one remembers the override that was active when it was
/// created and restores exactly that value on drop.
#[cfg(test)]
#[derive(Debug)]
// Dropping the guard immediately reverts the override, so an unbound
// `force(..)` call would silently have no effect on the rest of the test.
#[must_use = "the override is reverted as soon as the guard is dropped; bind it to a variable"]
pub(crate) struct ClockSkewAutoHealTestGuard {
    /// Override value that was active before this guard was installed.
    previous: Option<bool>,
}

#[cfg(test)]
impl ClockSkewAutoHealTestGuard {
    /// Installs `Some(enabled)` as the current thread's override and returns a
    /// guard holding the value it replaced.
    pub(crate) fn force(enabled: bool) -> Self {
        let previous = CLOCK_SKEW_AUTO_HEAL_TEST_OVERRIDE.with(|cell| cell.replace(Some(enabled)));
        Self { previous }
    }
}

#[cfg(test)]
impl Drop for ClockSkewAutoHealTestGuard {
    /// Restores the override that was active when the guard was created.
    fn drop(&mut self) {
        CLOCK_SKEW_AUTO_HEAL_TEST_OVERRIDE.with(|cell| cell.set(self.previous));
    }
}
/// Reports whether clock-skew self-healing is switched on.
///
/// In test builds a thread-local override, when set, takes precedence.
/// Otherwise the `ALETHEIADB_AUTO_HEAL_CLOCK_SKEW` environment variable is read
/// once and the parsed result is cached in a `OnceLock` for later calls.
/// The values `1`, `true`, `on`, `yes` and `enabled` (ASCII case-insensitive)
/// enable the feature; any other value, or an unset/unreadable variable,
/// leaves it disabled.
pub fn is_clock_skew_self_heal_enabled() -> bool {
    #[cfg(test)]
    if let Some(forced) = CLOCK_SKEW_AUTO_HEAL_TEST_OVERRIDE.with(|slot| slot.get()) {
        return forced;
    }

    // Spellings accepted as "enabled".
    const TRUTHY: [&str; 5] = ["1", "true", "on", "yes", "enabled"];
    static CLOCK_SKEW_AUTO_HEAL: OnceLock<Option<bool>> = OnceLock::new();

    let cached = CLOCK_SKEW_AUTO_HEAL.get_or_init(|| {
        match std::env::var("ALETHEIADB_AUTO_HEAL_CLOCK_SKEW") {
            Ok(raw) => Some(TRUTHY.iter().any(|truthy| raw.eq_ignore_ascii_case(truthy))),
            Err(_) => None,
        }
    });
    cached.unwrap_or(false)
}
/// Direction in which the current wallclock deviates from the frontier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClockSkewDirection {
    /// The current wallclock is behind the frontier.
    Backward,
    /// The current wallclock is ahead of the frontier.
    Forward,
}

impl ClockSkewDirection {
    /// Returns the lowercase label for this direction (`"backward"` / `"forward"`).
    pub const fn as_str(self) -> &'static str {
        match self {
            ClockSkewDirection::Forward => "forward",
            ClockSkewDirection::Backward => "backward",
        }
    }
}
/// Successful outcome of `evaluate_clock_skew`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ClockSkewDecision {
/// `current_wallclock - frontier_wallclock`, computed with saturating subtraction.
pub drift_us: i64,
/// Wallclock to use afterwards: the current wallclock, or the frontier
/// wallclock when a skew was healed.
pub effective_wallclock: i64,
/// Direction of the skew that was healed, or `None` if no limit was exceeded.
pub healed_direction: Option<ClockSkewDirection>,
}
/// Error returned by `evaluate_clock_skew` when a drift limit is exceeded and
/// self-healing is disabled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ClockSkewViolation {
/// Observed drift (`current - frontier`, saturating).
pub drift_us: i64,
/// Limit that was exceeded: `-MAX_BACKWARD_DRIFT_US` for backward skew, the
/// caller-supplied forward bound for forward skew.
pub max_allowed: i64,
/// Which limit was exceeded.
pub direction: ClockSkewDirection,
}
/// Compares the current wallclock with the frontier wallclock and decides
/// whether the difference is acceptable.
///
/// The drift is `current_wallclock - frontier_wallclock` (saturating).
/// A drift below `-MAX_BACKWARD_DRIFT_US` is a backward skew; a drift above
/// `max_forward_jump_us` (when `Some`) is a forward skew. With `None` there is
/// no forward limit.
///
/// # Errors
/// When a limit is exceeded and `self_heal_clock_skew` is `false`, returns a
/// [`ClockSkewViolation`]; the backward limit is checked before the forward one.
///
/// # Returns
/// Otherwise a [`ClockSkewDecision`]. If a limit was exceeded (and healing is
/// on) the effective wallclock is the frontier wallclock and
/// `healed_direction` names the skew; if both limits are exceeded the forward
/// direction is reported.
pub fn evaluate_clock_skew(
    current_wallclock: i64,
    frontier_wallclock: i64,
    max_forward_jump_us: Option<i64>,
    self_heal_clock_skew: bool,
) -> Result<ClockSkewDecision, ClockSkewViolation> {
    let drift_us = current_wallclock.saturating_sub(frontier_wallclock);

    // Classify the drift against both limits up front.
    let backward_exceeded = drift_us < -MAX_BACKWARD_DRIFT_US;
    let exceeded_forward_limit = match max_forward_jump_us {
        Some(limit) if drift_us > limit => Some(limit),
        _ => None,
    };

    if !self_heal_clock_skew {
        if backward_exceeded {
            return Err(ClockSkewViolation {
                drift_us,
                max_allowed: -MAX_BACKWARD_DRIFT_US,
                direction: ClockSkewDirection::Backward,
            });
        }
        if let Some(limit) = exceeded_forward_limit {
            return Err(ClockSkewViolation {
                drift_us,
                max_allowed: limit,
                direction: ClockSkewDirection::Forward,
            });
        }
    }

    // Healing path (or nothing to heal): forward takes precedence over backward.
    let healed_direction = if exceeded_forward_limit.is_some() {
        Some(ClockSkewDirection::Forward)
    } else if backward_exceeded {
        Some(ClockSkewDirection::Backward)
    } else {
        None
    };
    let effective_wallclock = match healed_direction {
        Some(_) => frontier_wallclock,
        None => current_wallclock,
    };

    Ok(ClockSkewDecision {
        drift_us,
        effective_wallclock,
        healed_direction,
    })
}
/// Failure modes of `send_with_overflow_self_heal`, handed to the caller's
/// `map_error` closure.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SendWithSelfHealError {
/// The first `send` failed and no fallback was attempted (either the error
/// was not a logical-counter overflow, or self-healing was disabled).
InitialSend(TemporalError),
/// The fallback wallclock (`wallclock + 1`) could not be computed because
/// the addition overflowed `i64`.
FallbackWallclockOverflow {
/// Wallclock reported by the overflow error.
wallclock: i64,
/// Logical counter reported by the overflow error.
current_logical: u32,
},
/// The second `send`, using the fallback wallclock, failed.
FallbackSend(TemporalError),
}
pub fn send_with_overflow_self_heal<E>(
frontier: &HybridTimestamp,
effective_wallclock: i64,
self_heal_clock_skew: bool,
map_error: impl Fn(SendWithSelfHealError) -> E,
) -> Result<HybridTimestamp, E> {
match frontier.send(effective_wallclock) {
Ok(next) => Ok(next),
Err(TemporalError::LogicalCounterOverflow {
wallclock,
current_logical,
}) if self_heal_clock_skew => {
let fallback_wallclock = wallclock.checked_add(1).ok_or_else(|| {
map_error(SendWithSelfHealError::FallbackWallclockOverflow {
wallclock,
current_logical,
})
})?;
frontier
.send(fallback_wallclock)
.map_err(|error| map_error(SendWithSelfHealError::FallbackSend(error)))
}
Err(error) => Err(map_error(SendWithSelfHealError::InitialSend(error))),
}
}
impl HybridTimestamp {
    /// Builds a timestamp after checking the wallclock against the upper bound.
    ///
    /// # Errors
    /// Returns [`TemporalError::InvalidTimestamp`] when `wallclock` is greater
    /// than `MAX_VALID_TIMESTAMP`. No lower bound is checked.
    #[inline]
    pub fn new(wallclock: i64, logical: u32) -> Result<Self, TemporalError> {
        let candidate = Self { wallclock, logical };
        if wallclock <= MAX_VALID_TIMESTAMP {
            return Ok(candidate);
        }
        Err(TemporalError::InvalidTimestamp {
            timestamp: candidate,
            reason: format!(
                "Wallclock {} exceeds MAX_VALID_TIMESTAMP ({})",
                wallclock, MAX_VALID_TIMESTAMP
            ),
        })
    }

    /// Builds a timestamp without any range validation.
    // NOTE(review): `dead_code` is allowed presumably because some build
    // configurations do not call this constructor — confirm.
    #[allow(dead_code)]
    #[inline]
    pub(crate) const fn new_unchecked(wallclock: i64, logical: u32) -> Self {
        Self { wallclock, logical }
    }

    /// Wallclock component of the timestamp.
    #[inline]
    pub const fn wallclock(&self) -> i64 {
        self.wallclock
    }

    /// Logical counter component of the timestamp.
    #[inline]
    pub const fn logical(&self) -> u32 {
        self.logical
    }

    /// Wallclock divided by 1_000_000 (integer division, truncating).
    #[inline]
    pub const fn as_secs(&self) -> i64 {
        const MICROS_PER_SEC: i64 = 1_000_000;
        self.wallclock / MICROS_PER_SEC
    }

    /// Wallclock divided by 1_000 (integer division, truncating).
    #[inline]
    pub const fn as_millis(&self) -> i64 {
        const MICROS_PER_MILLI: i64 = 1_000;
        self.wallclock / MICROS_PER_MILLI
    }

    /// Returns `logical + 1`, or `LogicalCounterOverflow` (carrying the given
    /// `wallclock` and the un-incremented counter) if the counter is at `u32::MAX`.
    #[inline]
    fn increment_logical(logical: u32, wallclock: i64) -> Result<u32, TemporalError> {
        match logical.checked_add(1) {
            Some(bumped) => Ok(bumped),
            None => Err(TemporalError::LogicalCounterOverflow {
                wallclock,
                current_logical: logical,
            }),
        }
    }

    /// Produces the timestamp for a local event / outgoing message.
    ///
    /// If `new_wallclock` is strictly ahead of this timestamp's wallclock the
    /// result is `(new_wallclock, 0)`; otherwise the wallclock is kept and the
    /// logical counter is incremented.
    ///
    /// # Errors
    /// * `InvalidTimestamp` when `new_wallclock` exceeds `MAX_VALID_TIMESTAMP`.
    /// * `LogicalCounterOverflow` when the counter cannot be incremented.
    #[inline]
    pub fn send(&self, new_wallclock: i64) -> Result<Self, TemporalError> {
        if new_wallclock > MAX_VALID_TIMESTAMP {
            let rejected = Self {
                wallclock: new_wallclock,
                logical: 0,
            };
            return Err(TemporalError::InvalidTimestamp {
                timestamp: rejected,
                reason: format!(
                    "Send wallclock {} exceeds MAX_VALID_TIMESTAMP ({})",
                    new_wallclock, MAX_VALID_TIMESTAMP
                ),
            });
        }

        // Physical clock did not move past us: stay on our wallclock and tick the counter.
        if new_wallclock <= self.wallclock {
            let logical = Self::increment_logical(self.logical, self.wallclock)?;
            return Ok(Self {
                wallclock: self.wallclock,
                logical,
            });
        }

        Ok(Self {
            wallclock: new_wallclock,
            logical: 0,
        })
    }

    /// Merges a received timestamp `msg` with this one and the physical clock.
    ///
    /// The resulting wallclock is the maximum of the local wallclock, the
    /// message wallclock and `physical_wallclock`. The logical counter is:
    /// * `0` when the maximum is strictly ahead of both local and message;
    /// * `max(local, msg) + 1` when local and message both equal the maximum;
    /// * the counter of whichever side equals the maximum, plus one, otherwise.
    ///
    /// # Errors
    /// * `InvalidTimestamp` when the merged wallclock exceeds `MAX_VALID_TIMESTAMP`.
    /// * `LogicalCounterOverflow` when the counter cannot be incremented.
    #[inline]
    pub fn receive(
        &self,
        msg: HybridTimestamp,
        physical_wallclock: i64,
    ) -> Result<Self, TemporalError> {
        let merged_wallclock = physical_wallclock.max(msg.wallclock).max(self.wallclock);
        if merged_wallclock > MAX_VALID_TIMESTAMP {
            let rejected = Self {
                wallclock: merged_wallclock,
                logical: 0,
            };
            return Err(TemporalError::InvalidTimestamp {
                timestamp: rejected,
                reason: format!(
                    "Receive wallclock {} exceeds MAX_VALID_TIMESTAMP ({})",
                    merged_wallclock, MAX_VALID_TIMESTAMP
                ),
            });
        }

        // `merged_wallclock` is a maximum, so "not equal" means "strictly ahead of".
        let local_is_max = merged_wallclock == self.wallclock;
        let msg_is_max = merged_wallclock == msg.wallclock;
        let logical = match (local_is_max, msg_is_max) {
            (false, false) => 0,
            (true, true) => {
                Self::increment_logical(self.logical.max(msg.logical), merged_wallclock)?
            }
            (true, false) => Self::increment_logical(self.logical, merged_wallclock)?,
            (false, true) => Self::increment_logical(msg.logical, merged_wallclock)?,
        };

        Ok(Self {
            wallclock: merged_wallclock,
            logical,
        })
    }

    /// Encodes the timestamp into a fresh 12-byte vector
    /// (see [`HybridTimestamp::serialize_into`] for the layout).
    pub fn serialize(&self) -> Vec<u8> {
        let mut encoded = Vec::with_capacity(12);
        self.serialize_into(&mut encoded);
        encoded
    }

    /// Appends the encoding to `buffer`: 8 bytes of little-endian wallclock
    /// followed by 4 bytes of little-endian logical counter.
    pub fn serialize_into(&self, buffer: &mut Vec<u8>) {
        let Self { wallclock, logical } = *self;
        buffer.extend_from_slice(&wallclock.to_le_bytes());
        buffer.extend_from_slice(&logical.to_le_bytes());
    }

    /// Decodes a timestamp from the first 12 bytes of `bytes`.
    ///
    /// Returns the timestamp together with the number of bytes consumed (12).
    /// A wallclock of `i64::MAX` is accepted even though it is above
    /// `MAX_VALID_TIMESTAMP`, provided its logical counter is 0.
    ///
    /// # Errors
    /// Returns `StorageError::CorruptedData` when the buffer is shorter than
    /// 12 bytes, when the wallclock exceeds `MAX_VALID_TIMESTAMP` without being
    /// `i64::MAX`, or when the wallclock is `i64::MAX` with a non-zero counter.
    pub fn deserialize(bytes: &[u8]) -> Result<(Self, usize), StorageError> {
        let Some(encoded) = bytes.get(..12) else {
            return Err(StorageError::CorruptedData(format!(
                "Buffer too short for HybridTimestamp: {} bytes (need 12)",
                bytes.len()
            )));
        };

        let mut wallclock_raw = [0u8; 8];
        wallclock_raw.copy_from_slice(&encoded[..8]);
        let mut logical_raw = [0u8; 4];
        logical_raw.copy_from_slice(&encoded[8..]);

        let wallclock = i64::from_le_bytes(wallclock_raw);
        let logical = u32::from_le_bytes(logical_raw);
        let is_max_sentinel = wallclock == i64::MAX;

        if wallclock > MAX_VALID_TIMESTAMP && !is_max_sentinel {
            return Err(StorageError::CorruptedData(format!(
                "Deserialized wallclock {} exceeds MAX_VALID_TIMESTAMP ({})",
                wallclock, MAX_VALID_TIMESTAMP
            )));
        }
        if is_max_sentinel && logical != 0 {
            return Err(StorageError::CorruptedData(format!(
                "Deserialized timestamp has wallclock i64::MAX but non-zero logical counter: {}",
                logical
            )));
        }

        Ok((Self { wallclock, logical }, 12))
    }
}
impl std::fmt::Display for HybridTimestamp {
    /// Formats the timestamp as `<wallclock>.<logical>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { wallclock, logical } = *self;
        write!(f, "{}.{}", wallclock, logical)
    }
}
impl From<i64> for HybridTimestamp {
fn from(wallclock: i64) -> Self {
HybridTimestamp::new_unchecked(wallclock, 0)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a validated timestamp; panics if `HybridTimestamp::new` rejects the input.
    fn stamp(wallclock: i64, logical: u32) -> HybridTimestamp {
        HybridTimestamp::new(wallclock, logical).unwrap()
    }

    /// `new` accepts values up to the bound and rejects the first value past it.
    #[test]
    fn test_new_validation() {
        for accepted in [1000, MAX_VALID_TIMESTAMP] {
            assert!(HybridTimestamp::new(accepted, 0).is_ok());
        }
        let rejected = HybridTimestamp::new(MAX_VALID_TIMESTAMP + 1, 0);
        assert!(rejected.is_err());
        assert!(
            matches!(rejected, Err(TemporalError::InvalidTimestamp { .. })),
            "Expected InvalidTimestamp error"
        );
    }

    /// Equal wallclocks on all three inputs: the larger logical counter wins, plus one.
    #[test]
    fn test_receive_collision_oracle() {
        for (local_logical, msg_logical) in [(10, 20), (20, 10)] {
            let next = stamp(1000, local_logical)
                .receive(stamp(1000, msg_logical), 1000)
                .unwrap();
            assert_eq!(next.wallclock(), 1000);
            assert_eq!(next.logical(), 21);
        }
    }

    /// `send` resets the counter when time advances and increments it otherwise.
    #[test]
    fn test_send_logic() {
        let base = stamp(1000, 5);
        let cases = [(1001, 1001, 0), (1000, 1000, 6), (999, 1000, 6)];
        for (input, want_wallclock, want_logical) in cases {
            let next = base.send(input).unwrap();
            assert_eq!(next.wallclock(), want_wallclock);
            assert_eq!(next.logical(), want_logical);
        }
    }

    /// A saturated counter overflows unless the wallclock advances.
    #[test]
    fn test_send_overflow() {
        let saturated = stamp(1000, u32::MAX);
        assert!(matches!(
            saturated.send(1000),
            Err(TemporalError::LogicalCounterOverflow { .. })
        ));
        let advanced = saturated.send(1001);
        assert!(advanced.is_ok());
        assert_eq!(advanced.unwrap().logical(), 0);
    }

    #[test]
    fn test_evaluate_clock_skew_backward_violation_without_self_heal() {
        let now = 1_000_000;
        let frontier = now + MAX_BACKWARD_DRIFT_US + 1;
        let outcome = evaluate_clock_skew(now, frontier, Some(MAX_FORWARD_JUMP_US), false);
        assert!(matches!(
            outcome,
            Err(ClockSkewViolation {
                direction: ClockSkewDirection::Backward,
                ..
            })
        ));
    }

    #[test]
    fn test_evaluate_clock_skew_allows_unbounded_forward_drift() {
        let now = 10_000_000;
        let frontier = 1_000;
        let decision = evaluate_clock_skew(now, frontier, None, false)
            .expect("forward drift should be ignored when no forward bound is provided");
        assert_eq!(decision.effective_wallclock, now);
        assert!(decision.healed_direction.is_none());
    }

    #[test]
    fn test_send_with_overflow_self_heal_uses_fallback_wallclock() {
        let saturated = stamp(1000, u32::MAX);
        let healed = send_with_overflow_self_heal(&saturated, 1000, true, |error| error)
            .expect("self-heal should bump wallclock and recover from logical overflow");
        assert_eq!(healed.wallclock(), 1001);
        assert_eq!(healed.logical(), 0);
    }

    #[test]
    fn test_send_with_overflow_self_heal_reports_initial_error_when_disabled() {
        let saturated = stamp(1000, u32::MAX);
        let failure = send_with_overflow_self_heal(&saturated, 1000, false, |error| error)
            .expect_err("overflow should fail when self-heal is disabled");
        assert!(matches!(
            failure,
            SendWithSelfHealError::InitialSend(TemporalError::LogicalCounterOverflow { .. })
        ));
    }

    #[test]
    fn test_send_with_overflow_self_heal_reports_fallback_wallclock_overflow() {
        let extreme = HybridTimestamp::new_unchecked(i64::MAX, u32::MAX);
        let failure =
            send_with_overflow_self_heal(&extreme, MAX_VALID_TIMESTAMP, true, |error| error)
                .expect_err("fallback wallclock increment should fail at i64::MAX");
        assert!(matches!(
            failure,
            SendWithSelfHealError::FallbackWallclockOverflow {
                wallclock: i64::MAX,
                current_logical: u32::MAX
            }
        ));
    }

    /// Walks the four wallclock relationships `receive` distinguishes.
    #[test]
    fn test_receive_logic() {
        let local = stamp(1000, 10);
        // (message, physical clock, expected wallclock, expected logical)
        let cases = [
            (stamp(1000, 20), 2000, 2000, 0),
            (stamp(1000, 20), 1000, 1000, 21),
            (stamp(900, 5), 950, 1000, 11),
            (stamp(1500, 5), 1200, 1500, 6),
        ];
        for (msg, physical, want_wallclock, want_logical) in cases {
            let next = local.receive(msg, physical).unwrap();
            assert_eq!(next.wallclock(), want_wallclock);
            assert_eq!(next.logical(), want_logical);
        }
    }

    #[test]
    fn test_receive_overflow() {
        let outcome = stamp(1000, u32::MAX).receive(stamp(1000, 5), 1000);
        assert!(matches!(
            outcome,
            Err(TemporalError::LogicalCounterOverflow { .. })
        ));
    }

    #[test]
    fn test_serialization() {
        let original = stamp(123456789, 42);
        let encoded = original.serialize();
        assert_eq!(encoded.len(), 12);
        let (decoded, consumed) = HybridTimestamp::deserialize(&encoded).unwrap();
        assert_eq!(decoded, original);
        assert_eq!(consumed, 12);
    }

    /// Short buffers and out-of-range wallclocks are rejected; the `i64::MAX` sentinel is not.
    #[test]
    fn test_deserialize_validation() {
        let truncated = [0u8; 11];
        assert!(matches!(
            HybridTimestamp::deserialize(&truncated),
            Err(StorageError::CorruptedData(_))
        ));

        let out_of_range = HybridTimestamp::new_unchecked(MAX_VALID_TIMESTAMP + 1, 0).serialize();
        assert!(matches!(
            HybridTimestamp::deserialize(&out_of_range),
            Err(StorageError::CorruptedData(_))
        ));

        let sentinel = HybridTimestamp::new_unchecked(i64::MAX, 0).serialize();
        assert!(HybridTimestamp::deserialize(&sentinel).is_ok());
    }

    #[test]
    fn test_as_secs_and_millis() {
        let secs = 1234567890;
        let from_secs = stamp(secs * 1_000_000, 0);
        assert_eq!(from_secs.as_secs(), secs);
        assert_eq!(from_secs.as_millis(), secs * 1000);

        let millis = 1234567890123;
        let from_millis = stamp(millis * 1000, 0);
        assert_eq!(from_millis.as_secs(), millis / 1000);
        assert_eq!(from_millis.as_millis(), millis);

        let micros = 1234567890123456;
        let from_micros = stamp(micros, 0);
        assert_eq!(from_micros.as_secs(), micros / 1_000_000);
        assert_eq!(from_micros.as_millis(), micros / 1000);
    }

    #[test]
    fn test_receive_when_physical_equals_local_and_ahead_of_msg() {
        let local = stamp(1000, 5);
        let merged = local.receive(stamp(900, 0), 1000).unwrap();
        assert_eq!(merged.wallclock(), 1000);
        assert_eq!(
            merged.logical(),
            6,
            "Logical counter should increment, not reset"
        );
        assert!(
            merged > local,
            "Result should be strictly greater than local"
        );
    }

    #[test]
    fn test_evaluate_clock_skew_self_heal_backward() {
        let now = 1_000_000;
        let frontier = now + MAX_BACKWARD_DRIFT_US + 1;
        let decision =
            evaluate_clock_skew(now, frontier, None, true).expect("backward drift should be healed");
        assert_eq!(decision.effective_wallclock, frontier);
        assert_eq!(decision.healed_direction, Some(ClockSkewDirection::Backward));
    }

    #[test]
    fn test_evaluate_clock_skew_self_heal_forward() {
        let now = 1_000_000;
        let max_jump = 10_000;
        let frontier = now - max_jump - 1;
        let decision = evaluate_clock_skew(now, frontier, Some(max_jump), true)
            .expect("forward drift should be healed");
        assert_eq!(decision.effective_wallclock, frontier);
        assert_eq!(decision.healed_direction, Some(ClockSkewDirection::Forward));
    }

    #[test]
    fn test_send_with_overflow_self_heal_fallback_send_error() {
        let at_limit = HybridTimestamp::new_unchecked(MAX_VALID_TIMESTAMP, u32::MAX);
        let failure =
            send_with_overflow_self_heal(&at_limit, MAX_VALID_TIMESTAMP, true, |error| error)
                .expect_err("should return FallbackSend error");
        assert!(matches!(
            failure,
            SendWithSelfHealError::FallbackSend(TemporalError::InvalidTimestamp { .. })
        ));
    }

    #[test]
    fn test_receive_ignores_msg_logical_when_wallclock_behind() {
        let merged = stamp(1000, 10).receive(stamp(900, 999), 1000).unwrap();
        assert_eq!(merged.wallclock(), 1000);
        assert_eq!(
            merged.logical(),
            11,
            "Should increment local logical, ignoring msg logical"
        );
    }

    /// Drift exactly equal to a limit is still within bounds.
    #[test]
    fn test_evaluate_clock_skew_exact_boundaries() {
        let now = 1_000_000;
        let max_jump = 100;

        let at_forward_limit = evaluate_clock_skew(now, now - max_jump, Some(max_jump), false);
        assert!(
            at_forward_limit.is_ok(),
            "Exact max forward jump should be allowed"
        );

        let at_backward_limit =
            evaluate_clock_skew(now, now + MAX_BACKWARD_DRIFT_US, None, false);
        assert!(
            at_backward_limit.is_ok(),
            "Exact max backward drift should be allowed"
        );
    }

    #[test]
    fn test_evaluate_clock_skew_overflow_protection() {
        let outcome = evaluate_clock_skew(i64::MAX, i64::MIN, Some(MAX_FORWARD_JUMP_US), false);
        assert!(
            outcome.is_err(),
            "Should reject extreme clock jump that wraps around arithmetic limits. Got: {:?} with drift {}",
            outcome,
            outcome.as_ref().map(|d| d.drift_us).unwrap_or(0)
        );
    }

    /// The guard forces the setting while alive and restores the prior state on drop.
    #[test]
    fn test_is_clock_skew_self_heal_enabled_override() {
        let default = is_clock_skew_self_heal_enabled();
        {
            let _forced_on = ClockSkewAutoHealTestGuard::force(true);
            assert!(is_clock_skew_self_heal_enabled());
        }
        {
            let _forced_off = ClockSkewAutoHealTestGuard::force(false);
            assert!(!is_clock_skew_self_heal_enabled());
        }
        assert_eq!(is_clock_skew_self_heal_enabled(), default);
    }

    #[test]
    fn test_clock_skew_direction_as_str() {
        assert_eq!(ClockSkewDirection::Backward.as_str(), "backward");
        assert_eq!(ClockSkewDirection::Forward.as_str(), "forward");
    }

    #[test]
    fn test_hybrid_timestamp_display() {
        assert_eq!(stamp(123456789, 42).to_string(), "123456789.42");
    }

    #[test]
    fn test_hybrid_timestamp_as_secs_millis_exact() {
        let whole = stamp(5_000_000, 0);
        assert_eq!(whole.as_secs(), 5);
        assert_eq!(whole.as_millis(), 5_000);

        let fractional = stamp(1_234_567, 0);
        assert_eq!(fractional.as_secs(), 1);
        assert_eq!(fractional.as_millis(), 1_234);
    }

    #[test]
    fn test_hybrid_timestamp_receive_exact_wallclock_logic() {
        let merge = |local: (i64, u32), msg: (i64, u32), physical: i64| {
            stamp(local.0, local.1)
                .receive(stamp(msg.0, msg.1), physical)
                .unwrap()
        };

        // Physical clock ahead of both sides.
        let next1 = merge((1000, 10), (1000, 20), 2000);
        assert_eq!(next1.wallclock(), 2000);
        assert_eq!(next1.logical(), 0);

        // All three wallclocks equal.
        let next2 = merge((1000, 10), (1000, 20), 1000);
        assert_eq!(next2.wallclock(), 1000);
        assert_eq!(next2.logical(), 21);

        // Local wallclock is the maximum.
        let next3 = merge((2000, 10), (1000, 20), 1000);
        assert_eq!(next3.wallclock(), 2000);
        assert_eq!(next3.logical(), 11);

        // Message wallclock is the maximum.
        let next4 = merge((1000, 10), (2000, 20), 1000);
        assert_eq!(next4.wallclock(), 2000);
        assert_eq!(next4.logical(), 21);

        assert_eq!(next3.logical(), 11);
    }
}
#[cfg(test)]
mod proptests {
    use super::*;
    use proptest::prelude::*;

    /// Wallclocks in `0..=MAX_VALID_TIMESTAMP`.
    fn valid_wallclock() -> impl Strategy<Value = i64> {
        0..=MAX_VALID_TIMESTAMP
    }

    /// Timestamps with a valid wallclock and an arbitrary logical counter.
    fn valid_timestamp() -> impl Strategy<Value = HybridTimestamp> {
        (valid_wallclock(), any::<u32>())
            .prop_map(|(wallclock, logical)| HybridTimestamp::new(wallclock, logical).unwrap())
    }

    proptest! {
        /// Ordering matches the lexicographic order of `(wallclock, logical)`.
        #[test]
        fn prop_ordering_lexicographic(
            lhs_wallclock in valid_wallclock(), lhs_logical in any::<u32>(),
            rhs_wallclock in valid_wallclock(), rhs_logical in any::<u32>()
        ) {
            let lhs = HybridTimestamp::new(lhs_wallclock, lhs_logical).unwrap();
            let rhs = HybridTimestamp::new(rhs_wallclock, rhs_logical).unwrap();
            prop_assert_eq!(
                lhs.cmp(&rhs),
                (lhs_wallclock, lhs_logical).cmp(&(rhs_wallclock, rhs_logical))
            );
        }

        /// A successful `send` is strictly greater than its predecessor.
        #[test]
        fn prop_send_monotonicity(
            current in valid_timestamp(),
            new_wallclock in valid_wallclock()
        ) {
            let next = match current.send(new_wallclock) {
                Ok(next) => next,
                Err(e) => {
                    prop_assert!(
                        matches!(e, TemporalError::LogicalCounterOverflow { .. }),
                        "Unexpected error: {:?}",
                        e
                    );
                    return Ok(());
                }
            };
            prop_assert!(next > current);
            prop_assert!(next.wallclock() >= new_wallclock);
            prop_assert!(next.wallclock() >= current.wallclock());
        }

        /// A successful `receive` dominates both inputs and the physical clock.
        #[test]
        fn prop_receive_causality(
            local in valid_timestamp(),
            msg in valid_timestamp(),
            physical in valid_wallclock()
        ) {
            let next = match local.receive(msg, physical) {
                Ok(next) => next,
                Err(e) => {
                    prop_assert!(
                        matches!(e, TemporalError::LogicalCounterOverflow { .. }),
                        "Unexpected error: {:?}",
                        e
                    );
                    return Ok(());
                }
            };
            prop_assert!(next > local, "next > local");
            prop_assert!(next > msg, "next > msg");
            prop_assert!(next.wallclock() >= local.wallclock());
            prop_assert!(next.wallclock() >= msg.wallclock());
            prop_assert!(next.wallclock() >= physical);
        }

        #[test]
        fn prop_serialization_roundtrip(ts in valid_timestamp()) {
            let encoded = ts.serialize();
            let (decoded, consumed) = HybridTimestamp::deserialize(&encoded).unwrap();
            prop_assert_eq!(ts, decoded);
            prop_assert_eq!(consumed, 12);
        }

        #[test]
        fn prop_send_rejects_invalid(
            ts in valid_timestamp(),
            invalid_wc in (MAX_VALID_TIMESTAMP + 1)..i64::MAX
        ) {
            // Evaluate the pattern outside `prop_assert!` so the `{ .. }` of the
            // pattern is not stringified into the macro's failure message.
            let outcome = ts.send(invalid_wc);
            let is_invalid = matches!(outcome, Err(TemporalError::InvalidTimestamp { .. }));
            prop_assert!(is_invalid);
        }

        #[test]
        fn prop_receive_rejects_invalid_physical(
            local in valid_timestamp(),
            msg in valid_timestamp(),
            invalid_phy in (MAX_VALID_TIMESTAMP + 1)..i64::MAX
        ) {
            // Same reason as above for the temporary boolean.
            let outcome = local.receive(msg, invalid_phy);
            let is_invalid = matches!(outcome, Err(TemporalError::InvalidTimestamp { .. }));
            prop_assert!(is_invalid);
        }

        /// All three wallclocks equal: the result is `max(logical) + 1` or an overflow error.
        #[test]
        fn prop_receive_causality_collision(
            wallclock in valid_wallclock(),
            local_logical in any::<u32>(),
            msg_logical in any::<u32>(),
        ) {
            let local = HybridTimestamp::new(wallclock, local_logical).unwrap();
            let msg = HybridTimestamp::new(wallclock, msg_logical).unwrap();
            match local.receive(msg, wallclock) {
                Ok(next) => {
                    prop_assert!(next > local, "next > local (collision)");
                    prop_assert!(next > msg, "next > msg (collision)");
                    prop_assert_eq!(next.wallclock(), wallclock);
                    match local_logical.max(msg_logical).checked_add(1) {
                        Some(expected) => {
                            prop_assert_eq!(next.logical(), expected);
                        }
                        None => {
                            prop_assert!(false, "Expected overflow error, but got Ok");
                        }
                    }
                }
                Err(e) => {
                    prop_assert!(
                        matches!(e, TemporalError::LogicalCounterOverflow { .. }),
                        "Unexpected error: {:?}",
                        e
                    );
                }
            }
        }
    }
}
#[cfg(test)]
mod sentinel_evaluate_clock_skew_tests {
    use super::*;

    /// One microsecond past either limit is rejected with the matching direction.
    #[test]
    fn test_evaluate_clock_skew_exact_boundaries_strict() {
        let now = 1_000_000;
        let max_jump = 100;

        let past_forward_limit =
            evaluate_clock_skew(now, now - max_jump - 1, Some(max_jump), false);
        assert!(
            past_forward_limit.is_err(),
            "Exceeding max forward jump by 1 should be rejected"
        );

        let past_backward_limit =
            evaluate_clock_skew(now, now + MAX_BACKWARD_DRIFT_US + 1, None, false);
        assert!(
            past_backward_limit.is_err(),
            "Exceeding max backward drift by 1 should be rejected"
        );

        assert_eq!(
            past_backward_limit.unwrap_err().direction,
            ClockSkewDirection::Backward
        );
        assert_eq!(
            past_forward_limit.unwrap_err().direction,
            ClockSkewDirection::Forward
        );
    }
}