use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tokio::sync::Notify;
/// Source of time used by the rest of the code, so that real and simulated
/// clocks can be swapped behind one interface.
///
/// Implementations must be `Send + Sync` and `Debug` so they can be shared
/// as `Arc<dyn Clock>` and embedded in debuggable types.
#[async_trait]
pub trait Clock: Send + Sync + fmt::Debug {
/// Returns the current wall-clock time in UTC.
fn now_utc(&self) -> OffsetDateTime;
/// Returns a millisecond counter. The implementations in this file report
/// time elapsed since the clock was created.
fn monotonic_ms(&self) -> i64;
/// Suspends the caller for `duration` as measured by this clock.
async fn sleep(&self, duration: Duration);
/// Suspends the caller until this clock's wall time reaches `deadline`.
async fn sleep_until_utc(&self, deadline: OffsetDateTime);
}
/// Reads `clock` once and returns its wall time as whole milliseconds since
/// the Unix epoch.
///
/// The nanosecond timestamp is reduced with integer division (truncating
/// toward zero) and then narrowed to `i64` with an `as` cast.
pub fn now_wall_ms(clock: &dyn Clock) -> i64 {
    const NANOS_PER_MILLI: i128 = 1_000_000;
    let wall_ns: i128 = clock.now_utc().unix_timestamp_nanos();
    let wall_ms = wall_ns / NANOS_PER_MILLI;
    wall_ms as i64
}
/// [`Clock`] backed by the system wall clock and tokio's timer.
pub struct RealClock {
// Instant captured when the clock is constructed; `monotonic_ms` reports
// the time elapsed since this point.
monotonic_origin: tokio::time::Instant,
}
impl Default for RealClock {
fn default() -> Self {
Self::new()
}
}
impl fmt::Debug for RealClock {
    /// Prints only the type name; the monotonic origin is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("RealClock");
        builder.finish()
    }
}
impl RealClock {
pub fn new() -> Self {
Self {
monotonic_origin: tokio::time::Instant::now(),
}
}
pub fn arc() -> Arc<dyn Clock> {
Arc::new(Self::new())
}
}
#[async_trait]
impl Clock for RealClock {
    /// Current system time in UTC.
    fn now_utc(&self) -> OffsetDateTime {
        OffsetDateTime::now_utc()
    }

    /// Milliseconds elapsed since this clock was constructed (never negative,
    /// because the subtraction saturates at zero).
    fn monotonic_ms(&self) -> i64 {
        let now = tokio::time::Instant::now();
        now.saturating_duration_since(self.monotonic_origin)
            .as_millis() as i64
    }

    /// Sleeps on the tokio timer; a zero duration returns without touching it.
    async fn sleep(&self, duration: Duration) {
        if !duration.is_zero() {
            tokio::time::sleep(duration).await;
        }
    }

    /// Sleeps for the gap between the current wall time and `deadline`.
    ///
    /// Returns immediately when the deadline is not in the future or when the
    /// gap cannot be expressed as a `std::time::Duration`. The gap is computed
    /// once, up front; it is not re-evaluated while sleeping.
    async fn sleep_until_utc(&self, deadline: OffsetDateTime) {
        let now = self.now_utc();
        if deadline > now {
            if let Ok(remaining) = Duration::try_from(deadline - now) {
                tokio::time::sleep(remaining).await;
            }
        }
    }
}
/// [`Clock`] whose time only moves when it is explicitly advanced or set.
pub struct PausedClock {
// Current simulated wall and monotonic time, guarded by a mutex.
state: Mutex<PausedState>,
// Signalled after every time change so pending sleepers re-check their
// deadline.
notify: Notify,
}
/// Mutable time state of a [`PausedClock`].
struct PausedState {
// Simulated wall-clock time returned by `now_utc`.
wall: OffsetDateTime,
// Simulated elapsed time since creation; starts at zero and is only ever
// increased.
monotonic: Duration,
}
impl fmt::Debug for PausedClock {
    /// Prints the simulated wall time and the monotonic counter in milliseconds.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Copy both values out under a single lock so they are consistent.
        let (wall, monotonic_ms) = {
            let guard = self.state.lock();
            (guard.wall, guard.monotonic.as_millis())
        };
        let mut out = f.debug_struct("PausedClock");
        out.field("wall", &wall);
        out.field("monotonic_ms", &monotonic_ms);
        out.finish()
    }
}
impl PausedClock {
    /// Creates a paused clock whose wall time is `origin` and whose monotonic
    /// counter is zero.
    pub fn new(origin: OffsetDateTime) -> Arc<Self> {
        Arc::new(Self {
            state: Mutex::new(PausedState {
                wall: origin,
                monotonic: Duration::ZERO,
            }),
            notify: Notify::new(),
        })
    }

    /// Moves both wall and monotonic time forward by `duration` and wakes
    /// every pending sleeper so it can re-check its deadline.
    ///
    /// The call is a no-op when `duration` cannot be represented as a
    /// `time::Duration`, or when adding it would push the wall time outside
    /// the range `OffsetDateTime` can represent.
    pub fn advance(&self, duration: Duration) {
        let Ok(delta) = time::Duration::try_from(duration) else {
            return;
        };
        {
            let mut state = self.state.lock();
            // `wall += delta` panics on overflow; treat an unrepresentable
            // result like an unrepresentable duration and ignore the call,
            // leaving wall and monotonic time in step.
            let Some(wall) = state.wall.checked_add(delta) else {
                return;
            };
            state.wall = wall;
            state.monotonic = state.monotonic.saturating_add(duration);
        }
        // Notify after releasing the lock so woken sleepers can take it.
        self.notify.notify_waiters();
    }

    /// Same as [`PausedClock::advance`] but takes a `time::Duration`.
    ///
    /// Durations that do not convert to a `std::time::Duration` (negative
    /// ones in particular) are ignored.
    pub fn advance_time(&self, duration: time::Duration) {
        let Ok(positive) = Duration::try_from(duration) else {
            return;
        };
        self.advance(positive);
    }

    /// Calls [`PausedClock::advance`] `ticks` times with a step of `tick`, so
    /// sleepers are woken after every individual step.
    pub fn advance_ticks(&self, ticks: u32, tick: Duration) {
        for _ in 0..ticks {
            self.advance(tick);
        }
    }

    /// Sets the wall time to `wall` and wakes every pending sleeper.
    ///
    /// When the new wall time is later than the old one, the monotonic
    /// counter moves forward by the same amount. When it is earlier, the
    /// monotonic counter is left untouched so it never goes backwards.
    pub fn set(&self, wall: OffsetDateTime) {
        {
            let mut state = self.state.lock();
            let delta = wall - state.wall;
            state.wall = wall;
            // The conversion fails for negative deltas, which is exactly the
            // "moved backwards" case that must not touch the monotonic counter.
            if let Ok(forward) = Duration::try_from(delta) {
                state.monotonic = state.monotonic.saturating_add(forward);
            }
        }
        self.notify.notify_waiters();
    }
}
#[async_trait]
impl Clock for PausedClock {
    /// Returns the simulated wall time.
    fn now_utc(&self) -> OffsetDateTime {
        self.state.lock().wall
    }

    /// Returns the simulated monotonic counter in milliseconds, saturating at
    /// `i64::MAX` instead of wrapping when the value does not fit.
    fn monotonic_ms(&self) -> i64 {
        let millis = self.state.lock().monotonic.as_millis();
        i64::try_from(millis).unwrap_or(i64::MAX)
    }

    /// Waits until the simulated monotonic counter has moved forward by
    /// `duration`; a zero duration returns immediately.
    ///
    /// The wait is measured on the monotonic counter rather than on wall
    /// time, so setting the wall clock to an earlier time does not stretch a
    /// relative sleep. An oversized `duration` waits (saturating) rather than
    /// returning immediately.
    async fn sleep(&self, duration: Duration) {
        if duration.is_zero() {
            return;
        }
        let wake_at = self.state.lock().monotonic.saturating_add(duration);
        loop {
            // Create the notification future before checking the counter so
            // an advance between the check and the await is not missed.
            let notified = self.notify.notified();
            let reached = self.state.lock().monotonic >= wake_at;
            if reached {
                return;
            }
            notified.await;
        }
    }

    /// Waits until the simulated wall time is at or past `deadline`; returns
    /// immediately when it already is.
    async fn sleep_until_utc(&self, deadline: OffsetDateTime) {
        loop {
            // Create the notification future before checking the wall time so
            // a change between the check and the await is not missed.
            let notified = self.notify.notified();
            let reached = self.state.lock().wall >= deadline;
            if reached {
                return;
            }
            notified.await;
        }
    }
}
/// One recorded interaction with a clock, as logged by `RecordedClock`.
///
/// Serialized with a `kind` tag field and snake_case variant names.
// NOTE(review): confirm that the `i128` fields round-trip through
// deserialization with an internally tagged enum in the serde version and
// data format in use.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ClockEvent {
/// A `now_utc` call and the value it returned, as Unix nanoseconds.
NowUtc { wall_ns: i128 },
/// A `monotonic_ms` call and the value it returned.
MonotonicMs { value: i64 },
/// A `sleep` call and the requested duration in milliseconds.
Sleep { duration_ms: u64 },
/// A `sleep_until_utc` call and the requested deadline, as Unix nanoseconds.
SleepUntil { wall_ns: i128 },
}
/// Append-only list of [`ClockEvent`]s behind a mutex, so it can be written
/// through a shared reference.
#[derive(Debug, Default)]
pub struct ClockEventLog {
// Events in the order they were pushed.
events: Mutex<Vec<ClockEvent>>,
}
impl ClockEventLog {
pub fn new() -> Self {
Self::default()
}
pub fn push(&self, event: ClockEvent) {
self.events.lock().push(event);
}
pub fn snapshot(&self) -> Vec<ClockEvent> {
self.events.lock().clone()
}
pub fn len(&self) -> usize {
self.events.lock().len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
/// [`Clock`] wrapper that forwards every call to an inner clock and records
/// it in a [`ClockEventLog`].
pub struct RecordedClock {
// Clock that actually answers the calls.
inner: Arc<dyn Clock>,
// Shared log that receives one event per call.
log: Arc<ClockEventLog>,
}
impl fmt::Debug for RecordedClock {
    /// Prints the inner clock and the number of events recorded so far
    /// (not the events themselves).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let event_count = self.log.len();
        let mut out = f.debug_struct("RecordedClock");
        out.field("inner", &self.inner);
        out.field("events", &event_count);
        out.finish()
    }
}
impl RecordedClock {
    /// Wraps `inner` so that every clock call is also appended to `log`.
    pub fn new(inner: Arc<dyn Clock>, log: Arc<ClockEventLog>) -> Self {
        Self { inner, log }
    }

    /// Returns another handle to the shared event log.
    pub fn log(&self) -> Arc<ClockEventLog> {
        Arc::clone(&self.log)
    }
}
#[async_trait]
impl Clock for RecordedClock {
    /// Reads the inner clock's wall time, logs it as `NowUtc`, and returns it.
    fn now_utc(&self) -> OffsetDateTime {
        let value = self.inner.now_utc();
        self.log.push(ClockEvent::NowUtc {
            wall_ns: value.unix_timestamp_nanos(),
        });
        value
    }

    /// Reads the inner clock's monotonic counter, logs it as `MonotonicMs`,
    /// and returns it.
    fn monotonic_ms(&self) -> i64 {
        let value = self.inner.monotonic_ms();
        self.log.push(ClockEvent::MonotonicMs { value });
        value
    }

    /// Logs a `Sleep` event, then sleeps on the inner clock.
    ///
    /// The event is pushed before awaiting, so it is recorded even if the
    /// sleep never completes. Durations longer than `u64::MAX` milliseconds
    /// are logged as `u64::MAX` instead of a wrapped-around value.
    async fn sleep(&self, duration: Duration) {
        // `as_millis` returns u128; saturate rather than truncate with `as`.
        let duration_ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX);
        self.log.push(ClockEvent::Sleep { duration_ms });
        self.inner.sleep(duration).await;
    }

    /// Logs a `SleepUntil` event, then waits on the inner clock.
    ///
    /// The event is pushed before awaiting, so it is recorded even if the
    /// wait never completes.
    async fn sleep_until_utc(&self, deadline: OffsetDateTime) {
        self.log.push(ClockEvent::SleepUntil {
            wall_ns: deadline.unix_timestamp_nanos(),
        });
        self.inner.sleep_until_utc(deadline).await;
    }
}
// Unit tests for the real, paused and recording clocks.
#[cfg(test)]
mod tests {
use super::*;
// Fixed wall-clock origin shared by the paused-clock tests.
fn epoch() -> OffsetDateTime {
OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap()
}
// Two consecutive monotonic readings must not go backwards.
#[tokio::test]
async fn real_clock_returns_increasing_monotonic() {
let clock = RealClock::new();
let a = clock.monotonic_ms();
tokio::task::yield_now().await;
let b = clock.monotonic_ms();
assert!(b >= a, "monotonic must not move backwards");
}
// A paused clock reports its origin until advanced, then moves wall and
// monotonic time by exactly the advanced amount.
#[tokio::test]
async fn paused_clock_pins_wall_and_monotonic_until_advanced() {
let clock = PausedClock::new(epoch());
assert_eq!(clock.now_utc(), epoch());
assert_eq!(clock.monotonic_ms(), 0);
clock.advance(Duration::from_millis(250));
assert_eq!(clock.monotonic_ms(), 250);
assert_eq!(clock.now_utc(), epoch() + time::Duration::milliseconds(250));
}
// A sleeper stays pending until the clock is advanced past its deadline.
#[tokio::test]
async fn paused_clock_sleep_resumes_after_advance() {
let clock = PausedClock::new(epoch());
let clock_for_sleep = clock.clone();
let task = tokio::spawn(async move {
clock_for_sleep.sleep(Duration::from_secs(5)).await;
});
// Yield so the spawned task gets a chance to start sleeping before the
// clock is advanced; the order of these steps matters.
tokio::task::yield_now().await;
assert!(!task.is_finished(), "sleep should still be pending");
clock.advance(Duration::from_secs(10));
task.await.expect("sleep task panicked");
}
// A deadline that is already in the past must not block.
#[tokio::test]
async fn paused_clock_sleep_until_returns_immediately_for_past_deadline() {
let clock = PausedClock::new(epoch());
clock.advance(Duration::from_secs(60));
clock.sleep_until_utc(epoch()).await;
}
// Each of the four trait methods appends exactly one event, in call order.
#[tokio::test]
async fn recorded_clock_appends_one_event_per_call() {
let log = Arc::new(ClockEventLog::new());
let clock = RecordedClock::new(PausedClock::new(epoch()), log.clone());
let _ = clock.now_utc();
let _ = clock.monotonic_ms();
// Zero duration and an already-reached deadline, so neither call blocks on
// the paused inner clock.
clock.sleep(Duration::ZERO).await;
clock.sleep_until_utc(epoch()).await;
let events = log.snapshot();
assert_eq!(events.len(), 4);
assert!(matches!(events[0], ClockEvent::NowUtc { .. }));
assert!(matches!(events[1], ClockEvent::MonotonicMs { .. }));
assert!(matches!(events[2], ClockEvent::Sleep { duration_ms: 0 }));
assert!(matches!(events[3], ClockEvent::SleepUntil { .. }));
}
// The helper must agree with converting the clock's wall time by hand.
#[tokio::test]
async fn now_wall_ms_helper_matches_clock_observation() {
let clock = PausedClock::new(epoch());
let observed = now_wall_ms(clock.as_ref());
let expected = epoch().unix_timestamp_nanos() / 1_000_000;
assert_eq!(observed, expected as i64);
}
}