use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use std::{
cmp::Ordering as CmpOrdering,
collections::BinaryHeap,
sync::atomic::{AtomicI64, AtomicU8, AtomicU64, Ordering},
task::Waker,
time::{Duration, Instant},
};
use super::config::{ArtificialClockConfig, ArtificialMode};
static NEXT_SLEEP_ID: AtomicU64 = AtomicU64::new(0);
pub(crate) fn next_sleep_id() -> u64 {
NEXT_SLEEP_ID.fetch_add(1, Ordering::Relaxed)
}
const MODE_MANUAL: u8 = 0;
const MODE_AUTO: u8 = 1;
const MODE_REALTIME: u8 = 2;
pub(crate) struct ArtificialClock {
mode: AtomicU8,
time_scale: f64,
current_ms: AtomicI64,
real_start: Instant,
pending_wakes: Mutex<BinaryHeap<PendingWake>>,
}
pub(crate) struct PendingWake {
wake_at_ms: i64,
sleep_id: u64,
waker: Waker,
}
impl PartialEq for PendingWake {
fn eq(&self, other: &Self) -> bool {
self.wake_at_ms == other.wake_at_ms && self.sleep_id == other.sleep_id
}
}
impl Eq for PendingWake {}
impl PartialOrd for PendingWake {
fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
Some(self.cmp(other))
}
}
impl Ord for PendingWake {
fn cmp(&self, other: &Self) -> CmpOrdering {
match other.wake_at_ms.cmp(&self.wake_at_ms) {
CmpOrdering::Equal => other.sleep_id.cmp(&self.sleep_id),
ord => ord,
}
}
}
impl ArtificialClock {
pub fn new(config: ArtificialClockConfig) -> Self {
let (mode, time_scale) = match config.mode {
ArtificialMode::Manual => (MODE_MANUAL, 0.0),
ArtificialMode::AutoAdvance { time_scale } => (MODE_AUTO, time_scale),
};
Self {
mode: AtomicU8::new(mode),
time_scale,
current_ms: AtomicI64::new(config.start_at.timestamp_millis()),
real_start: Instant::now(),
pending_wakes: Mutex::new(BinaryHeap::new()),
}
}
pub fn now(&self) -> DateTime<Utc> {
DateTime::from_timestamp_millis(self.now_ms()).expect("valid timestamp")
}
pub fn now_ms(&self) -> i64 {
match self.mode.load(Ordering::Acquire) {
MODE_REALTIME => Utc::now().timestamp_millis(),
MODE_MANUAL => self.current_ms.load(Ordering::SeqCst),
MODE_AUTO => {
let base_ms = self.current_ms.load(Ordering::SeqCst);
let real_elapsed = self.real_start.elapsed();
base_ms + (real_elapsed.as_millis() as f64 * self.time_scale) as i64
}
_ => unreachable!(),
}
}
pub(crate) fn real_duration(&self, duration: Duration) -> Duration {
match self.mode.load(Ordering::Acquire) {
MODE_REALTIME => duration,
MODE_MANUAL => Duration::ZERO,
MODE_AUTO => {
let real_ms = (duration.as_millis() as f64 / self.time_scale).ceil() as u64;
Duration::from_millis(real_ms.max(1))
}
_ => unreachable!(),
}
}
pub fn is_manual(&self) -> bool {
self.mode.load(Ordering::Acquire) == MODE_MANUAL
}
pub fn is_realtime(&self) -> bool {
self.mode.load(Ordering::Acquire) == MODE_REALTIME
}
pub fn transition_to_realtime(&self) {
self.mode.store(MODE_REALTIME, Ordering::Release);
self.wake_all_pending();
}
fn wake_all_pending(&self) {
let wakers: Vec<Waker> = {
let mut pending = self.pending_wakes.lock();
pending.drain().map(|w| w.waker).collect()
};
for waker in wakers {
waker.wake();
}
}
pub fn register_wake(&self, wake_at_ms: i64, sleep_id: u64, waker: Waker) {
let mut pending = self.pending_wakes.lock();
pending.push(PendingWake {
wake_at_ms,
sleep_id,
waker,
});
}
pub fn cancel_wake(&self, sleep_id: u64) {
let mut pending = self.pending_wakes.lock();
let entries: Vec<_> = pending.drain().filter(|w| w.sleep_id != sleep_id).collect();
pending.extend(entries);
}
pub fn clear_pending_wakes(&self) {
self.pending_wakes.lock().clear();
}
pub fn next_wake_time(&self) -> Option<i64> {
let pending = self.pending_wakes.lock();
pending.peek().map(|w| w.wake_at_ms)
}
pub fn wake_tasks_at(&self, up_to_ms: i64) -> usize {
let wakers: Vec<Waker> = {
let mut pending = self.pending_wakes.lock();
let mut wakers = Vec::new();
while let Some(wake) = pending.peek() {
if wake.wake_at_ms > up_to_ms {
break;
}
let wake = pending.pop().unwrap();
wakers.push(wake.waker);
}
wakers
};
let count = wakers.len();
for waker in wakers {
waker.wake();
}
count
}
pub fn set_time(&self, time: DateTime<Utc>) {
self.current_ms
.store(time.timestamp_millis(), Ordering::SeqCst);
}
pub async fn advance(&self, duration: Duration) -> usize {
if !self.is_manual() {
return 0;
}
let start_ms = self.current_ms.load(Ordering::SeqCst);
let target_ms = start_ms + duration.as_millis() as i64;
let mut total_woken = 0;
loop {
let next_wake_ms = self.next_wake_time();
match next_wake_ms {
Some(wake_ms) if wake_ms <= target_ms => {
self.current_ms.store(wake_ms, Ordering::SeqCst);
let woken = self.wake_tasks_at(wake_ms);
total_woken += woken;
tokio::task::yield_now().await;
}
_ => {
self.current_ms.store(target_ms, Ordering::SeqCst);
break;
}
}
}
total_woken
}
pub async fn advance_to_next_wake(&self) -> Option<DateTime<Utc>> {
if !self.is_manual() {
return None;
}
let next_wake_ms = self.next_wake_time()?;
self.current_ms.store(next_wake_ms, Ordering::SeqCst);
self.wake_tasks_at(next_wake_ms);
tokio::task::yield_now().await;
Some(DateTime::from_timestamp_millis(next_wake_ms).expect("valid timestamp"))
}
pub fn pending_wake_count(&self) -> usize {
self.pending_wakes.lock().len()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_manual_now() {
let clock = ArtificialClock::new(ArtificialClockConfig::manual());
let start = clock.now();
std::thread::sleep(Duration::from_millis(10));
assert_eq!(clock.now(), start);
}
#[test]
fn test_auto_advance_now() {
let start = Utc::now();
let clock = ArtificialClock::new(ArtificialClockConfig::auto_at(start, 1000.0));
let t1 = clock.now();
std::thread::sleep(Duration::from_millis(10));
let t2 = clock.now();
let elapsed = t2 - t1;
assert!(elapsed.num_seconds() >= 5 && elapsed.num_seconds() <= 20);
}
#[test]
fn test_transition_to_realtime() {
let clock = ArtificialClock::new(ArtificialClockConfig::manual());
assert!(clock.is_manual());
assert!(!clock.is_realtime());
clock.transition_to_realtime();
assert!(!clock.is_manual());
assert!(clock.is_realtime());
let clock_now = clock.now();
let utc_now = Utc::now();
let diff = (clock_now - utc_now).num_milliseconds().abs();
assert!(diff < 100); }
#[test]
fn test_pending_wake_ordering() {
let clock = ArtificialClock::new(ArtificialClockConfig::manual());
let waker = futures::task::noop_waker();
clock.register_wake(3000, 1, waker.clone());
clock.register_wake(1000, 2, waker.clone());
clock.register_wake(2000, 3, waker);
assert_eq!(clock.next_wake_time(), Some(1000));
clock.wake_tasks_at(1000);
assert_eq!(clock.next_wake_time(), Some(2000));
clock.wake_tasks_at(2000);
assert_eq!(clock.next_wake_time(), Some(3000));
}
#[test]
fn test_clear_pending_wakes() {
let clock = ArtificialClock::new(ArtificialClockConfig::manual());
let waker = futures::task::noop_waker();
clock.register_wake(1000, 1, waker.clone());
clock.register_wake(2000, 2, waker);
assert_eq!(clock.pending_wake_count(), 2);
clock.clear_pending_wakes();
assert_eq!(clock.pending_wake_count(), 0);
}
}