use std::{
ops::Deref,
sync::{
OnceLock,
atomic::{AtomicBool, AtomicU64, Ordering},
},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use crate::{
UnixNanos,
datetime::{NANOSECONDS_IN_MICROSECOND, NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND},
};
/// Global atomic clock in realtime mode (lazily initialized on first access).
pub static ATOMIC_CLOCK_REALTIME: OnceLock<AtomicTime> = OnceLock::new();
/// Global atomic clock in static (manually driven) mode (lazily initialized on first access).
pub static ATOMIC_CLOCK_STATIC: OnceLock<AtomicTime> = OnceLock::new();
pub fn get_atomic_clock_realtime() -> &'static AtomicTime {
ATOMIC_CLOCK_REALTIME.get_or_init(AtomicTime::default)
}
/// Returns the process-wide static-mode [`AtomicTime`] clock, initializing it on first use.
pub fn get_atomic_clock_static() -> &'static AtomicTime {
    // Named initializer instead of an inline closure; seeds a static clock at the epoch.
    fn init_static_clock() -> AtomicTime {
        AtomicTime::new(false, UnixNanos::default())
    }
    ATOMIC_CLOCK_STATIC.get_or_init(init_static_clock)
}
/// Returns the [`Duration`] elapsed since the Unix epoch, read from the wall clock.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
#[inline(always)]
#[must_use]
pub fn duration_since_unix_epoch() -> Duration {
    let now = wall_clock_now();
    now.duration_since(UNIX_EPOCH)
        .expect("Error calling `SystemTime`")
}
/// Returns the current wall-clock time as a [`SystemTime`].
///
/// When built with both the `simulation` feature and the `madsim` cfg, this
/// reads madsim's virtual time handle if one is active on the current
/// runtime, falling back to the host clock otherwise.
#[inline(always)]
#[must_use]
fn wall_clock_now() -> SystemTime {
    #[cfg(not(all(feature = "simulation", madsim)))]
    {
        SystemTime::now()
    }
    #[cfg(all(feature = "simulation", madsim))]
    {
        madsim::time::TimeHandle::try_current()
            .map(|handle| handle.now_time())
            .unwrap_or_else(SystemTime::now)
    }
}
/// Returns the number of nanoseconds elapsed since the Unix epoch, based on
/// the wall clock.
///
/// # Panics
///
/// Panics if the system clock is before the Unix epoch, or if the elapsed
/// nanoseconds exceed `u64::MAX` (circa year 2554).
#[inline(always)]
#[must_use]
pub fn nanos_since_unix_epoch() -> u64 {
    // `as_nanos` yields a `u128`; `try_from` makes the range check explicit,
    // replacing the previous manual `assert!` + lint-suppressed `as` cast
    // (whose `#[expect]` reason also mis-described the assert as "above").
    u64::try_from(duration_since_unix_epoch().as_nanos())
        .expect("System time overflow: value exceeds u64::MAX nanoseconds")
}
/// An atomic clock that operates in either realtime mode (driven by the
/// system wall clock, strictly monotonic) or static mode (advanced manually
/// via `set_time` / `increment_time`).
///
/// `#[repr(C)]` fixes the field layout — presumably for FFI consumers; the
/// field order must not be changed.
#[repr(C)]
#[derive(Debug)]
pub struct AtomicTime {
    /// `true` when the clock follows the system wall clock; `false` when it
    /// is manually driven.
    pub realtime: AtomicBool,
    /// The last timestamp issued by this clock, in Unix nanoseconds.
    pub timestamp_ns: AtomicU64,
}
impl Deref for AtomicTime {
    type Target = AtomicU64;

    /// Dereferences to the underlying [`AtomicU64`] timestamp, so atomic
    /// operations (`load`, `store`, `compare_exchange`, ...) can be called on
    /// the clock directly.
    fn deref(&self) -> &Self::Target {
        &self.timestamp_ns
    }
}
impl Default for AtomicTime {
    /// Creates a new clock in realtime mode, seeded at the Unix epoch.
    fn default() -> Self {
        Self::new(true, UnixNanos::default())
    }
}
impl AtomicTime {
    /// Creates a new [`AtomicTime`] in the given mode with the given seed time.
    ///
    /// In realtime mode the seed is effectively ignored for reads, since
    /// `get_time_ns` consults the wall clock via `time_since_epoch`.
    #[must_use]
    pub fn new(realtime: bool, time: UnixNanos) -> Self {
        Self {
            realtime: AtomicBool::new(realtime),
            timestamp_ns: AtomicU64::new(time.into()),
        }
    }

    /// Returns the current time in Unix nanoseconds.
    ///
    /// Realtime mode delegates to `time_since_epoch` (strictly monotonic and
    /// updates the stored timestamp); static mode returns the stored value
    /// unchanged.
    #[must_use]
    pub fn get_time_ns(&self) -> UnixNanos {
        if self.realtime.load(Ordering::Acquire) {
            self.time_since_epoch()
        } else {
            UnixNanos::from(self.timestamp_ns.load(Ordering::Acquire))
        }
    }

    /// Returns the current time in whole microseconds (truncating division).
    #[must_use]
    pub fn get_time_us(&self) -> u64 {
        self.get_time_ns().as_u64() / NANOSECONDS_IN_MICROSECOND
    }

    /// Returns the current time in whole milliseconds (truncating division).
    #[must_use]
    pub fn get_time_ms(&self) -> u64 {
        self.get_time_ns().as_u64() / NANOSECONDS_IN_MILLISECOND
    }

    /// Returns the current time as fractional seconds (`f64`).
    #[must_use]
    #[expect(
        clippy::cast_precision_loss,
        reason = "Precision loss acceptable for time conversion"
    )]
    pub fn get_time(&self) -> f64 {
        self.get_time_ns().as_f64() / (NANOSECONDS_IN_SECOND as f64)
    }

    /// Sets the stored time (static mode only).
    ///
    /// # Panics
    ///
    /// Panics if the clock is in realtime mode. Note the mode check and the
    /// store are two separate atomic operations; the trailing `debug_assert!`
    /// re-verifies in debug builds that no concurrent mode switch occurred.
    pub fn set_time(&self, time: UnixNanos) {
        assert!(
            !self.realtime.load(Ordering::SeqCst),
            "Cannot set time while clock is in realtime mode"
        );
        // Store via `Deref` to the inner `AtomicU64`.
        self.store(time.into(), Ordering::Release);
        debug_assert!(
            !self.realtime.load(Ordering::SeqCst),
            "Invariant: clock must remain in static mode across `set_time`"
        );
    }

    /// Atomically adds `delta` nanoseconds to the stored time and returns the
    /// updated value (static mode only).
    ///
    /// # Errors
    ///
    /// Returns an error if the clock is in realtime mode, or if the addition
    /// would overflow `u64::MAX`.
    pub fn increment_time(&self, delta: u64) -> anyhow::Result<UnixNanos> {
        anyhow::ensure!(
            !self.realtime.load(Ordering::SeqCst),
            "Cannot increment time while clock is in realtime mode"
        );
        // `fetch_update` retries until the CAS succeeds, or fails without
        // updating when the closure returns `None` (overflow).
        let previous =
            match self
                .timestamp_ns
                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                    current.checked_add(delta)
                }) {
                Ok(prev) => prev,
                Err(_) => anyhow::bail!("Cannot increment time beyond u64::MAX"),
            };
        debug_assert!(
            !self.realtime.load(Ordering::SeqCst),
            "Invariant: clock must remain in static mode across `increment_time`"
        );
        // `previous + delta` cannot overflow here: the successful CAS above
        // already proved `previous.checked_add(delta)` was `Some`.
        Ok(UnixNanos::from(previous + delta))
    }

    /// Returns the current wall-clock time while guaranteeing strict
    /// monotonicity across calls: each call installs `max(now, last + 1)` via
    /// a CAS retry loop, so no two calls ever return the same or a decreasing
    /// value, even if the OS clock stalls or steps backwards.
    ///
    /// # Panics
    ///
    /// Panics if the stored timestamp has reached `u64::MAX`.
    pub fn time_since_epoch(&self) -> UnixNanos {
        // Sample the wall clock once; the retry loop only re-reads the atomic.
        let now = nanos_since_unix_epoch();
        loop {
            let last = self.load(Ordering::Acquire);
            let incremented = last
                .checked_add(1)
                .expect("AtomicTime overflow: reached u64::MAX");
            let next = now.max(incremented);
            if self
                .compare_exchange(last, next, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                debug_assert!(
                    next > last,
                    "Invariant: time is strictly monotonic across CAS"
                );
                return UnixNanos::from(next);
            }
        }
    }

    /// Switches the clock to realtime mode.
    ///
    /// If it was previously static, the stored timestamp is reset to the
    /// current wall time (discarding any manually-set value, including ones
    /// in the far future); if already realtime, this is a no-op.
    pub fn make_realtime(&self) {
        if !self.realtime.swap(true, Ordering::SeqCst) {
            self.timestamp_ns
                .store(nanos_since_unix_epoch(), Ordering::Release);
        }
    }

    /// Switches the clock to static mode.
    ///
    /// If it was previously realtime, the current wall time is snapshotted
    /// into the stored timestamp; if already static, the stored value is left
    /// untouched.
    pub fn make_static(&self) {
        if self.realtime.swap(false, Ordering::SeqCst) {
            self.timestamp_ns
                .store(nanos_since_unix_epoch(), Ordering::Release);
        }
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use rstest::*;

    use super::*;

    #[rstest]
    fn test_global_clocks_initialization() {
        let realtime_clock = get_atomic_clock_realtime();
        assert!(realtime_clock.get_time_ns().as_u64() > 0);
        let static_clock = get_atomic_clock_static();
        // The static clock is manually driven: set it and read the value back.
        static_clock.set_time(UnixNanos::from(500_000_000));
        assert_eq!(static_clock.get_time_ns().as_u64(), 500_000_000);
    }

    #[rstest]
    fn test_mode_switching() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let realtime_ns = time.get_time_ns();
        assert!(realtime_ns.as_u64() > 0);
        // Realtime -> static: reads now return the manually-set value.
        time.make_static();
        time.set_time(UnixNanos::from(1_000_000_000));
        let static_ns = time.get_time_ns();
        assert_eq!(static_ns.as_u64(), 1_000_000_000);
        // Static -> realtime: wall clock (far beyond 1s after epoch) takes over.
        time.make_realtime();
        let new_realtime_ns = time.get_time_ns();
        assert!(new_realtime_ns.as_u64() > static_ns.as_u64());
    }

    #[rstest]
    #[should_panic(expected = "Cannot set time while clock is in realtime mode")]
    fn test_set_time_panics_in_realtime_mode() {
        let clock = AtomicTime::new(true, UnixNanos::default());
        clock.set_time(UnixNanos::from(123));
    }

    #[rstest]
    fn test_increment_time_returns_error_in_realtime_mode() {
        let clock = AtomicTime::new(true, UnixNanos::default());
        let result = clock.increment_time(1);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Cannot increment time while clock is in realtime mode")
        );
    }

    #[rstest]
    #[should_panic(expected = "AtomicTime overflow")]
    fn test_time_since_epoch_overflow_panics() {
        use std::sync::atomic::{AtomicBool, AtomicU64};
        // Construct fields directly so the counter starts saturated at u64::MAX;
        // the `checked_add(1)` in `time_since_epoch` must then panic.
        let clock = AtomicTime {
            realtime: AtomicBool::new(true),
            timestamp_ns: AtomicU64::new(u64::MAX),
        };
        let _ = clock.time_since_epoch();
    }

    #[rstest]
    fn test_make_static_snapshots_wall_time() {
        let clock = AtomicTime::new(true, UnixNanos::default());
        clock.make_static();
        let ts = clock.get_time_ns();
        // 1_650_000_000 s ~ April 2022: the snapshot must be wall time,
        // not the zero seed the clock was constructed with.
        assert!(
            ts.as_u64() > 1_650_000_000_000_000_000,
            "Expected wall-clock snapshot, was {ts}"
        );
    }

    #[rstest]
    fn test_make_realtime_resets_future_timestamp() {
        // A static clock parked near u64::MAX must be pulled back to wall time.
        let clock = AtomicTime::new(false, UnixNanos::from(u64::MAX - 1_000));
        clock.make_realtime();
        let ts = clock.get_time_ns();
        let now = nanos_since_unix_epoch();
        // Allow one second of slack between the two wall-clock reads.
        assert!(
            ts.as_u64() <= now + 1_000_000_000,
            "Expected wall-clock time, was {ts} (now={now})"
        );
    }

    #[rstest]
    fn test_make_static_idempotent() {
        // Already static: `make_static` must not overwrite the stored value.
        let clock = AtomicTime::new(false, UnixNanos::from(42));
        clock.make_static();
        assert_eq!(clock.get_time_ns(), UnixNanos::from(42));
    }

    #[rstest]
    fn test_make_realtime_idempotent() {
        // Already realtime: `make_realtime` is a no-op; time keeps advancing.
        let clock = AtomicTime::new(true, UnixNanos::default());
        let ts1 = clock.get_time_ns();
        clock.make_realtime();
        let ts2 = clock.get_time_ns();
        assert!(ts2 >= ts1);
    }

    #[rstest]
    fn test_static_time_is_stable() {
        // Static mode must not advance with wall time.
        let clock = AtomicTime::new(false, UnixNanos::from(42));
        let time1 = clock.get_time_ns();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let time2 = clock.get_time_ns();
        assert_eq!(time1, time2);
    }

    #[rstest]
    fn test_increment_time() {
        let time = AtomicTime::new(false, UnixNanos::from(0));
        let updated_time = time.increment_time(500).unwrap();
        assert_eq!(updated_time.as_u64(), 500);
        let updated_time = time.increment_time(1_000).unwrap();
        assert_eq!(updated_time.as_u64(), 1_500);
    }

    #[rstest]
    fn test_increment_time_overflow_errors() {
        let time = AtomicTime::new(false, UnixNanos::from(u64::MAX - 5));
        let err = time.increment_time(10).unwrap_err();
        assert_eq!(err.to_string(), "Cannot increment time beyond u64::MAX");
    }

    #[rstest]
    #[expect(
        clippy::cast_possible_truncation,
        clippy::cast_possible_wrap,
        reason = "Intentional cast for Python interop"
    )]
    fn test_nanos_since_unix_epoch_vs_system_time() {
        let unix_nanos = nanos_since_unix_epoch();
        let system_ns = duration_since_unix_epoch().as_nanos() as u64;
        // The two independent wall-clock reads must agree to within a second.
        assert!((unix_nanos as i64 - system_ns as i64).abs() < NANOSECONDS_IN_SECOND as i64);
    }

    #[rstest]
    fn test_time_since_epoch_monotonicity() {
        // Single-threaded: every consecutive read must be strictly greater.
        let clock = get_atomic_clock_realtime();
        let mut previous = clock.time_since_epoch();
        for _ in 0..1_000_000 {
            let current = clock.time_since_epoch();
            assert!(current > previous);
            previous = current;
        }
    }

    #[rstest]
    fn test_time_since_epoch_strictly_increasing_concurrent() {
        // Multi-threaded: strict monotonicity must hold per-thread under
        // contention on the shared CAS loop.
        let time = Arc::new(AtomicTime::new(true, UnixNanos::default()));
        let num_threads = 4;
        let iterations = 100_000;
        let mut handles = Vec::with_capacity(num_threads);
        for thread_id in 0..num_threads {
            let time_clone = Arc::clone(&time);
            let handle = std::thread::spawn(move || {
                let mut previous = time_clone.time_since_epoch().as_u64();
                for i in 0..iterations {
                    let current = time_clone.time_since_epoch().as_u64();
                    assert!(
                        current > previous,
                        "Thread {thread_id}: iteration {i}: time did not increase: previous={previous}, current={current}",
                    );
                    previous = current;
                }
            });
            handles.push(handle);
        }
        for handle in handles {
            handle.join().unwrap();
        }
    }

    #[rstest]
    fn test_duration_since_unix_epoch() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let duration = Duration::from_nanos(time.get_time_ns().into());
        let now = SystemTime::now();
        let delta = now
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .checked_sub(duration);
        assert!(delta.unwrap_or_default() < Duration::from_millis(100));
        // ~27.5M minutes ≈ 52 years: the timestamp must be modern (post-2022).
        assert!(duration > Duration::from_mins(27_500_000));
    }

    #[rstest]
    fn test_unix_timestamp_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time();
        let result2 = time.get_time();
        let result3 = time.get_time();
        let result4 = time.get_time();
        let result5 = time.get_time();
        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1 > 1_650_000_000.0);
    }

    #[rstest]
    fn test_unix_timestamp_ms_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time_ms();
        let result2 = time.get_time_ms();
        let result3 = time.get_time_ms();
        let result4 = time.get_time_ms();
        let result5 = time.get_time_ms();
        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1 >= 1_650_000_000_000);
    }

    #[rstest]
    fn test_unix_timestamp_us_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time_us();
        let result2 = time.get_time_us();
        let result3 = time.get_time_us();
        let result4 = time.get_time_us();
        let result5 = time.get_time_us();
        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1 > 1_650_000_000_000_000);
    }

    #[rstest]
    fn test_unix_timestamp_ns_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time_ns();
        let result2 = time.get_time_ns();
        let result3 = time.get_time_ns();
        let result4 = time.get_time_ns();
        let result5 = time.get_time_ns();
        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1.as_u64() > 1_650_000_000_000_000_000);
    }

    #[rstest]
    fn test_acquire_release_contract_static_mode() {
        // Writer publishes aux data (Relaxed) before each `set_time` (Release
        // store); a reader that Acquires a newer timestamp must then observe
        // an aux value at least as new — aux must never appear to go backwards.
        let clock = Arc::new(AtomicTime::new(false, UnixNanos::from(0)));
        let aux_data = Arc::new(AtomicU64::new(0));
        let done = Arc::new(AtomicBool::new(false));
        let writer_clock = Arc::clone(&clock);
        let writer_aux = Arc::clone(&aux_data);
        let writer_done = Arc::clone(&done);
        let writer = std::thread::spawn(move || {
            for i in 1..=1_000u64 {
                writer_aux.store(i, Ordering::Relaxed);
                writer_clock.set_time(UnixNanos::from(i * 1000));
                std::thread::yield_now();
            }
            writer_done.store(true, Ordering::Release);
        });
        let reader_clock = Arc::clone(&clock);
        let reader_aux = Arc::clone(&aux_data);
        let reader_done = Arc::clone(&done);
        let reader = std::thread::spawn(move || {
            let mut last_time = 0u64;
            let mut max_aux_seen = 0u64;
            while !reader_done.load(Ordering::Acquire) {
                let current_time = reader_clock.get_time_ns().as_u64();
                if current_time > last_time {
                    let aux_value = reader_aux.load(Ordering::Relaxed);
                    if aux_value > 0 {
                        assert!(
                            aux_value >= max_aux_seen,
                            "Acquire/Release contract violated: aux went backwards from {max_aux_seen} to {aux_value}"
                        );
                        max_aux_seen = aux_value;
                    }
                    last_time = current_time;
                }
                std::thread::yield_now();
            }
            // Final check after the writer signals completion.
            let final_time = reader_clock.get_time_ns().as_u64();
            if final_time > last_time {
                let final_aux = reader_aux.load(Ordering::Relaxed);
                if final_aux > 0 {
                    assert!(
                        final_aux >= max_aux_seen,
                        "Acquire/Release contract violated: final aux {final_aux} < max {max_aux_seen}"
                    );
                    max_aux_seen = final_aux;
                }
            }
            max_aux_seen
        });
        writer.join().unwrap();
        let max_observed = reader.join().unwrap();
        assert!(max_observed > 0, "Reader must observe writer updates");
    }

    #[rstest]
    fn test_acquire_release_contract_increment_time() {
        // Same publish/observe pattern as above, but the timestamp is advanced
        // via `increment_time` (AcqRel `fetch_update`) instead of `set_time`.
        let clock = Arc::new(AtomicTime::new(false, UnixNanos::from(0)));
        let aux_data = Arc::new(AtomicU64::new(0));
        let done = Arc::new(AtomicBool::new(false));
        let writer_clock = Arc::clone(&clock);
        let writer_aux = Arc::clone(&aux_data);
        let writer_done = Arc::clone(&done);
        let writer = std::thread::spawn(move || {
            for i in 1..=1_000u64 {
                writer_aux.store(i, Ordering::Relaxed);
                let _ = writer_clock.increment_time(1000).unwrap();
                std::thread::yield_now();
            }
            writer_done.store(true, Ordering::Release);
        });
        let reader_clock = Arc::clone(&clock);
        let reader_aux = Arc::clone(&aux_data);
        let reader_done = Arc::clone(&done);
        let reader = std::thread::spawn(move || {
            let mut last_time = 0u64;
            let mut max_aux = 0u64;
            while !reader_done.load(Ordering::Acquire) {
                let current_time = reader_clock.get_time_ns().as_u64();
                if current_time > last_time {
                    let aux_value = reader_aux.load(Ordering::Relaxed);
                    if aux_value > 0 {
                        assert!(
                            aux_value >= max_aux,
                            "AcqRel contract violated: aux regressed from {max_aux} to {aux_value}"
                        );
                        max_aux = aux_value;
                    }
                    last_time = current_time;
                }
                std::thread::yield_now();
            }
            // Final check after the writer signals completion.
            let final_time = reader_clock.get_time_ns().as_u64();
            if final_time > last_time {
                let final_aux = reader_aux.load(Ordering::Relaxed);
                if final_aux > 0 {
                    assert!(
                        final_aux >= max_aux,
                        "AcqRel contract violated: final aux {final_aux} < max {max_aux}"
                    );
                    max_aux = final_aux;
                }
            }
            max_aux
        });
        writer.join().unwrap();
        let max_observed = reader.join().unwrap();
        assert!(max_observed > 0, "Reader must observe writer updates");
    }

    // Only compiled under madsim simulation: the wall clock must track
    // virtual (simulated) time across a virtual sleep.
    #[cfg(all(feature = "simulation", madsim))]
    #[madsim::test]
    async fn test_wall_clock_advances_with_virtual_time() {
        let before = nanos_since_unix_epoch();
        madsim::time::sleep(std::time::Duration::from_secs(60)).await;
        let after = nanos_since_unix_epoch();
        let elapsed_ns = after.saturating_sub(before);
        let sixty_seconds_ns = 60 * NANOSECONDS_IN_SECOND;
        assert!(
            elapsed_ns >= sixty_seconds_ns,
            "wall clock did not advance by full virtual sleep: elapsed={elapsed_ns}ns"
        );
    }
}