use parking_lot::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
pub trait ClockProvider: Send + Sync {
fn now(&self) -> u64;
fn system_time_millis(&self) -> u64;
fn sleep(
&self,
duration: Duration,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>>;
fn advance(&self, duration: Duration);
fn is_mock(&self) -> bool;
}
#[derive(Debug, Clone)]
pub struct RealClock {
start: Instant,
}
impl RealClock {
pub fn new() -> Self {
Self {
start: Instant::now(),
}
}
}
impl Default for RealClock {
fn default() -> Self {
Self::new()
}
}
impl ClockProvider for RealClock {
fn now(&self) -> u64 {
self.start.elapsed().as_nanos() as u64
}
fn system_time_millis(&self) -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("System time before UNIX epoch")
.as_millis() as u64
}
fn sleep(
&self,
duration: Duration,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
Box::pin(async move {
tokio::time::sleep(duration).await;
})
}
fn advance(&self, _duration: Duration) {
}
fn is_mock(&self) -> bool {
false
}
}
pub struct MockClock {
current_nanos: AtomicU64,
system_time_millis: AtomicU64,
pending_sleeps: Mutex<Vec<PendingSleep>>,
}
struct PendingSleep {
wake_at_nanos: u64,
waker: Option<std::task::Waker>,
}
impl MockClock {
pub fn new() -> Self {
Self {
current_nanos: AtomicU64::new(0),
system_time_millis: AtomicU64::new(0),
pending_sleeps: Mutex::new(Vec::new()),
}
}
pub fn fixed(iso_time: &str) -> Self {
let dt = chrono::DateTime::parse_from_rfc3339(iso_time)
.expect("Invalid ISO 8601 datetime format");
let millis = dt.timestamp_millis() as u64;
Self {
current_nanos: AtomicU64::new(0),
system_time_millis: AtomicU64::new(millis),
pending_sleeps: Mutex::new(Vec::new()),
}
}
pub fn at_now() -> Self {
let millis = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("System time before UNIX epoch")
.as_millis() as u64;
Self {
current_nanos: AtomicU64::new(0),
system_time_millis: AtomicU64::new(millis),
pending_sleeps: Mutex::new(Vec::new()),
}
}
pub fn current_nanos(&self) -> u64 {
self.current_nanos.load(Ordering::SeqCst)
}
fn wake_expired_sleeps(&self, current: u64) {
let mut sleeps = self.pending_sleeps.lock();
sleeps.retain_mut(|sleep| {
if sleep.wake_at_nanos <= current {
if let Some(waker) = sleep.waker.take() {
waker.wake();
}
false } else {
true }
});
}
}
impl Default for MockClock {
fn default() -> Self {
Self::new()
}
}
impl ClockProvider for MockClock {
fn now(&self) -> u64 {
self.current_nanos.load(Ordering::SeqCst)
}
fn system_time_millis(&self) -> u64 {
self.system_time_millis.load(Ordering::SeqCst)
}
fn sleep(
&self,
duration: Duration,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
let wake_at = self.current_nanos.load(Ordering::SeqCst) + duration.as_nanos() as u64;
if self.current_nanos.load(Ordering::SeqCst) >= wake_at {
return Box::pin(std::future::ready(()));
}
let clock_ref = self as *const Self;
Box::pin(MockSleepFuture {
wake_at,
clock: clock_ref,
registered: false,
})
}
fn advance(&self, duration: Duration) {
let nanos = duration.as_nanos() as u64;
let new_time = self.current_nanos.fetch_add(nanos, Ordering::SeqCst) + nanos;
let millis = duration.as_millis() as u64;
self.system_time_millis.fetch_add(millis, Ordering::SeqCst);
self.wake_expired_sleeps(new_time);
}
fn is_mock(&self) -> bool {
true
}
}
struct MockSleepFuture {
wake_at: u64,
clock: *const MockClock,
registered: bool,
}
unsafe impl Send for MockSleepFuture {}
impl std::future::Future for MockSleepFuture {
type Output = ();
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let clock = unsafe { &*self.clock };
let current = clock.current_nanos.load(Ordering::SeqCst);
if current >= self.wake_at {
std::task::Poll::Ready(())
} else {
if !self.registered {
let mut sleeps = clock.pending_sleeps.lock();
sleeps.push(PendingSleep {
wake_at_nanos: self.wake_at,
waker: Some(cx.waker().clone()),
});
self.registered = true;
} else {
let mut sleeps = clock.pending_sleeps.lock();
for sleep in sleeps.iter_mut() {
if sleep.wake_at_nanos == self.wake_at {
sleep.waker = Some(cx.waker().clone());
break;
}
}
}
std::task::Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
#[test]
fn real_clock_advances() {
let clock = RealClock::new();
let t1 = clock.now();
std::thread::sleep(Duration::from_millis(10));
let t2 = clock.now();
assert!(t2 > t1);
}
#[test]
fn mock_clock_fixed_time() {
let clock = MockClock::fixed("2024-01-15T10:30:00Z");
let millis = clock.system_time_millis();
assert_eq!(millis, 1705314600000);
}
#[test]
fn mock_clock_does_not_advance_automatically() {
let clock = MockClock::new();
let t1 = clock.now();
std::thread::sleep(Duration::from_millis(10));
let t2 = clock.now();
assert_eq!(t1, t2);
}
#[test]
fn mock_clock_advance() {
let clock = MockClock::new();
assert_eq!(clock.now(), 0);
clock.advance(Duration::from_secs(1));
assert_eq!(clock.now(), 1_000_000_000);
clock.advance(Duration::from_millis(500));
assert_eq!(clock.now(), 1_500_000_000);
}
#[test]
fn mock_clock_system_time_advances() {
let clock = MockClock::fixed("2024-01-15T10:30:00Z");
let t1 = clock.system_time_millis();
clock.advance(Duration::from_secs(60));
let t2 = clock.system_time_millis();
assert_eq!(t2 - t1, 60_000);
}
#[tokio::test]
async fn mock_clock_sleep_completes_on_advance() {
let clock = Arc::new(MockClock::new());
let clock_ref = Arc::clone(&clock);
let handle = tokio::spawn(async move {
clock_ref.sleep(Duration::from_secs(1)).await;
true
});
tokio::task::yield_now().await;
clock.advance(Duration::from_secs(2));
let result = tokio::time::timeout(Duration::from_millis(100), handle)
.await
.expect("Timed out waiting for sleep")
.expect("Task panicked");
assert!(result);
}
}