use crate::store::StoreError;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::Instant;
pub trait Clock: Send + Sync {
fn now_us(&self) -> i64;
fn now_wall_ns(&self) -> i64;
fn now_mono_ns(&self) -> i64;
fn process_boot_ns(&self) -> u64;
}
fn system_now_us() -> i64 {
let micros = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_micros();
i64::try_from(micros).unwrap_or(i64::MAX)
}
pub(crate) fn wall_ms_from_timestamp_us(timestamp_us: i64) -> Result<u64, StoreError> {
if timestamp_us < 0 {
return Err(StoreError::InvalidClock {
timestamp_us,
reason: "timestamp_us must be >= 0 microseconds since Unix epoch".into(),
});
}
Ok((timestamp_us / 1000).cast_unsigned())
}
fn system_now_wall_ns_saturating() -> i64 {
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
i64::try_from(nanos).unwrap_or(i64::MAX)
}
pub(crate) struct MonotonicAnchor {
anchor_instant: Instant,
anchor_boot_ns: u64,
}
impl MonotonicAnchor {
fn get() -> &'static Self {
static ANCHOR: OnceLock<MonotonicAnchor> = OnceLock::new();
ANCHOR.get_or_init(|| {
let wall_ns = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let anchor_boot_ns = u64::try_from(wall_ns).unwrap_or(u64::MAX);
MonotonicAnchor {
anchor_instant: Instant::now(),
anchor_boot_ns,
}
})
}
fn now_mono_ns(&self) -> i64 {
let elapsed = self.anchor_instant.elapsed().as_nanos();
i64::try_from(elapsed).unwrap_or(i64::MAX)
}
}
#[derive(Clone)]
pub struct SystemClock {
anchor: &'static MonotonicAnchor,
}
impl SystemClock {
pub fn new() -> Self {
Self {
anchor: MonotonicAnchor::get(),
}
}
}
impl Default for SystemClock {
fn default() -> Self {
Self::new()
}
}
impl Clock for SystemClock {
fn now_us(&self) -> i64 {
system_now_us()
}
fn now_wall_ns(&self) -> i64 {
system_now_wall_ns_saturating()
}
fn now_mono_ns(&self) -> i64 {
self.anchor.now_mono_ns()
}
fn process_boot_ns(&self) -> u64 {
self.anchor.anchor_boot_ns
}
}
struct FnClock {
inner: Arc<dyn Fn() -> i64 + Send + Sync>,
anchor: &'static MonotonicAnchor,
}
impl FnClock {
fn new(inner: Arc<dyn Fn() -> i64 + Send + Sync>) -> Self {
Self {
inner,
anchor: MonotonicAnchor::get(),
}
}
}
impl Clock for FnClock {
fn now_us(&self) -> i64 {
(self.inner)()
}
fn now_wall_ns(&self) -> i64 {
self.now_us().saturating_mul(1000)
}
fn now_mono_ns(&self) -> i64 {
self.anchor.now_mono_ns()
}
fn process_boot_ns(&self) -> u64 {
self.anchor.anchor_boot_ns
}
}
pub(crate) fn clock_from_fn(inner: Arc<dyn Fn() -> i64 + Send + Sync>) -> Arc<dyn Clock> {
Arc::new(FnClock::new(inner))
}
#[derive(Clone)]
pub(crate) struct MonotonicClock {
inner: Arc<dyn Clock>,
last: Arc<AtomicI64>,
last_wall_ns: Arc<AtomicI64>,
}
impl MonotonicClock {
pub(crate) fn wrap(inner: Arc<dyn Clock>) -> Self {
Self {
inner,
last: Arc::new(AtomicI64::new(i64::MIN)),
last_wall_ns: Arc::new(AtomicI64::new(i64::MIN)),
}
}
fn clamp_non_decreasing(slot: &AtomicI64, raw: i64, what: &str) -> i64 {
loop {
let prev = slot.load(Ordering::Acquire);
if raw >= prev {
match slot.compare_exchange(prev, raw, Ordering::AcqRel, Ordering::Acquire) {
Ok(_) => return raw,
Err(_) => continue, }
} else {
tracing::error!("user clock regressed ({}): prev={} new={}", what, prev, raw);
return prev;
}
}
}
pub(crate) fn now_us(&self) -> i64 {
Self::clamp_non_decreasing(&self.last, self.inner.now_us(), "us")
}
}
impl Clock for MonotonicClock {
fn now_us(&self) -> i64 {
MonotonicClock::now_us(self)
}
fn now_wall_ns(&self) -> i64 {
Self::clamp_non_decreasing(&self.last_wall_ns, self.inner.now_wall_ns(), "wall_ns")
}
fn now_mono_ns(&self) -> i64 {
self.inner.now_mono_ns()
}
fn process_boot_ns(&self) -> u64 {
self.inner.process_boot_ns()
}
}
#[cfg(test)]
mod tests {
use super::{Clock, FnClock, MonotonicClock, SystemClock};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
struct AdjustableClock {
us: AtomicI64,
wall_ns: AtomicI64,
}
impl AdjustableClock {
fn new(us: i64, wall_ns: i64) -> Arc<Self> {
Arc::new(Self {
us: AtomicI64::new(us),
wall_ns: AtomicI64::new(wall_ns),
})
}
fn set_us(&self, v: i64) {
self.us.store(v, Ordering::SeqCst);
}
fn set_wall_ns(&self, v: i64) {
self.wall_ns.store(v, Ordering::SeqCst);
}
}
impl Clock for AdjustableClock {
fn now_us(&self) -> i64 {
self.us.load(Ordering::SeqCst)
}
fn now_wall_ns(&self) -> i64 {
self.wall_ns.load(Ordering::SeqCst)
}
fn now_mono_ns(&self) -> i64 {
0
}
fn process_boot_ns(&self) -> u64 {
0
}
}
#[test]
fn fn_clock_preserves_negative_wall_values_but_not_monotonic_time() {
let system = SystemClock::new();
let mut floor = system.now_mono_ns();
while floor <= 1 {
floor = system.now_mono_ns();
}
let clock = FnClock::new(Arc::new(|| -7));
assert_eq!(
clock.now_us(),
-7,
"PROPERTY: FnClock must expose malformed caller wall time for validation"
);
assert_eq!(
clock.now_wall_ns(),
-7_000,
"PROPERTY: wall nanoseconds come from the caller wall clock, not the monotonic anchor"
);
let mono = clock.now_mono_ns();
let ceiling = system.now_mono_ns();
assert!(
floor <= mono && mono <= ceiling,
"PROPERTY: process-local monotonic evidence reads the shared process anchor \
(floor {floor} <= mono {mono} <= ceiling {ceiling}), never an echo of the \
negative caller wall clock or a stubbed constant"
);
}
#[test]
fn now_wall_ns_clamps_on_regression() {
let inner = AdjustableClock::new(1_000, 5_000_000_000);
let mono = MonotonicClock::wrap(Arc::clone(&inner) as Arc<dyn Clock>);
assert_eq!(
mono.now_wall_ns(),
5_000_000_000,
"PROPERTY: forward wall clock passes through unchanged"
);
inner.set_wall_ns(1_000_000_000);
assert_eq!(
mono.now_wall_ns(),
5_000_000_000,
"PROPERTY: a regressing wall clock stalls at the highest value seen, never moving backward"
);
inner.set_wall_ns(6_000_000_000);
assert_eq!(
mono.now_wall_ns(),
6_000_000_000,
"PROPERTY: forward progress past the high-water mark resumes"
);
}
#[test]
fn now_us_and_now_wall_ns_clamp_states_are_independent() {
let inner = AdjustableClock::new(1_000, 5_000_000_000);
let mono = MonotonicClock::wrap(Arc::clone(&inner) as Arc<dyn Clock>);
assert_eq!(mono.now_us(), 1_000);
assert_eq!(mono.now_wall_ns(), 5_000_000_000);
inner.set_us(500);
inner.set_wall_ns(9_000_000_000);
assert_eq!(
mono.now_us(),
1_000,
"PROPERTY: regressed us sequence stalls at its own high-water mark"
);
assert_eq!(
mono.now_wall_ns(),
9_000_000_000,
"PROPERTY: wall_ns is not contaminated by the us regression (separate atomic)"
);
inner.set_us(2_000);
inner.set_wall_ns(1_000_000_000);
assert_eq!(
mono.now_us(),
2_000,
"PROPERTY: us is not contaminated by the wall_ns regression (separate atomic)"
);
assert_eq!(
mono.now_wall_ns(),
9_000_000_000,
"PROPERTY: regressed wall_ns sequence stalls at its own high-water mark"
);
}
#[test]
fn system_clock_now_wall_ns_reports_real_wall_time() {
const EARLIEST_PLAUSIBLE_WALL_NS: i64 = 1_600_000_000_000_000_000;
let reading = SystemClock::new().now_wall_ns();
assert!(
reading > EARLIEST_PLAUSIBLE_WALL_NS,
"PROPERTY: SystemClock::now_wall_ns must read the real wall clock \
(>{EARLIEST_PLAUSIBLE_WALL_NS} ns), got {reading}"
);
}
#[test]
fn monotonic_clock_process_boot_ns_delegates_real_anchor() {
const EARLIEST_PLAUSIBLE_BOOT_NS: u64 = 1_600_000_000_000_000_000;
let inner = Arc::new(SystemClock::new());
let inner_boot = inner.process_boot_ns();
let mono = MonotonicClock::wrap(Arc::clone(&inner) as Arc<dyn Clock>);
let wrapped_boot = mono.process_boot_ns();
let mut failures: Vec<String> = Vec::new();
if wrapped_boot != inner_boot {
failures.push(format!(
"PROPERTY: MonotonicClock::process_boot_ns must delegate to the \
wrapped clock unchanged: wrapped={wrapped_boot}, inner={inner_boot}"
));
}
if wrapped_boot <= EARLIEST_PLAUSIBLE_BOOT_NS {
failures.push(format!(
"PROPERTY: the process boot marker is the real anchor wall-ns \
(>{EARLIEST_PLAUSIBLE_BOOT_NS}), never a trivial stub: got {wrapped_boot}"
));
}
assert!(failures.is_empty(), "{failures:?}");
}
}