use crate::TimerWheel;
use std::time::{Duration, Instant};
/// Time source used by `DeadlineScheduler`: reports the current time as a
/// nanosecond count.
pub trait Clock {
/// Returns this clock's current reading in nanoseconds.
fn now_nanos(&self) -> u64;
}
/// A clock backed by [`Instant`] that measures time from a fixed origin.
pub struct MonotonicClock {
    /// Instant treated as time zero. Both [`MonotonicClock::new`] and
    /// [`Default::default`] store `Some`, so readings share one reference point.
    origin: Option<Instant>,
}

impl Default for MonotonicClock {
    /// Equivalent to [`MonotonicClock::new`].
    ///
    /// `Default` is implemented by hand because a derived implementation would
    /// leave `origin` as `None`, giving the clock no fixed instant to measure
    /// from.
    fn default() -> Self {
        Self::new()
    }
}

impl MonotonicClock {
    /// Creates a clock whose origin is the moment of construction.
    pub fn new() -> Self {
        Self {
            origin: Some(Instant::now()),
        }
    }
}
impl Clock for MonotonicClock {
    /// Returns the nanoseconds elapsed since the stored origin.
    ///
    /// When no origin is stored, the current instant stands in for it, so the
    /// reading is only the time between two consecutive `Instant::now()` calls.
    fn now_nanos(&self) -> u64 {
        let start = match self.origin {
            Some(instant) => instant,
            None => Instant::now(),
        };
        let elapsed = start.elapsed();
        elapsed.as_nanos() as u64
    }
}
/// A manually driven clock: its reading starts at zero and only changes when
/// [`TestClock::advance`] is called.
pub struct TestClock {
    /// Current reading in nanoseconds. Stored in a `Cell` so that `advance`
    /// can update it through a shared reference.
    now_nanos: std::cell::Cell<u64>,
}

impl Default for TestClock {
    /// Equivalent to [`TestClock::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl TestClock {
    /// Creates a clock reading zero nanoseconds.
    pub fn new() -> Self {
        Self {
            now_nanos: std::cell::Cell::new(0),
        }
    }

    /// Moves the clock forward by `d`, saturating at `u64::MAX` nanoseconds.
    pub fn advance(&self, d: Duration) {
        // `as_nanos` yields a `u128`; clamp before narrowing so an oversized
        // duration saturates instead of wrapping to an unrelated small value.
        let step = d.as_nanos().min(u128::from(u64::MAX)) as u64;
        let advanced = self.now_nanos.get().saturating_add(step);
        self.now_nanos.set(advanced);
    }
}
impl Clock for TestClock {
    /// Returns the reading currently held in the clock's cell.
    fn now_nanos(&self) -> u64 {
        let reading = &self.now_nanos;
        reading.get()
    }
}
/// Schedules values of type `V` on a `TimerWheel`, translating time read from
/// the clock `C` into whole wheel ticks.
pub struct DeadlineScheduler<V, C: Clock> {
/// Wheel holding the scheduled values; `poll` advances it one tick at a time.
wheel: TimerWheel<V>,
/// Time source queried for the current time in nanoseconds.
clock: C,
/// Length of one wheel tick in nanoseconds; `new` clamps it to at least 1.
tick_nanos: u64,
/// Clock time, in nanoseconds, up to which `poll` has already advanced the wheel.
consumed_nanos: u64,
}
impl<V, C: Clock> DeadlineScheduler<V, C> {
    /// Creates a scheduler over a wheel with `num_slots` slots whose ticks are
    /// `tick` long (clamped to at least one nanosecond).
    ///
    /// The wheel's timeline starts at the clock's reading at construction, so a
    /// clock that already reports a large value does not force the first
    /// [`poll`](Self::poll) to replay every tick since the clock's zero point.
    pub fn new(num_slots: usize, clock: C, tick: Duration) -> Self {
        let tick_nanos = Self::duration_to_nanos(tick).max(1);
        let consumed_nanos = clock.now_nanos();
        Self {
            wheel: TimerWheel::new(num_slots),
            clock,
            tick_nanos,
            consumed_nanos,
        }
    }

    /// Returns the length of one wheel tick in nanoseconds.
    pub fn tick_nanos(&self) -> u64 {
        self.tick_nanos
    }

    /// Schedules `value` to fire once at least `delay` has elapsed on the clock
    /// and returns the id assigned by the wheel.
    ///
    /// The tick count is measured from the wheel's current position rather than
    /// from "now": the wheel may trail the clock (by a sub-tick remainder, or by
    /// everything that elapsed since the last `poll`), and ignoring that lag
    /// would let the entry fire early during catch-up. A zero delay with no lag
    /// yields zero ticks, which is handed to the wheel unchanged.
    ///
    /// NOTE(review): assumes `TimerWheel::schedule` accepts tick counts larger
    /// than the number of slots — confirm against the wheel implementation.
    pub fn schedule_after(&mut self, delay: Duration, value: V) -> u64 {
        let now = self.clock.now_nanos();
        let deadline = now.saturating_add(Self::duration_to_nanos(delay));
        let ticks = self.ticks_until(deadline);
        self.wheel.schedule(ticks, value)
    }

    /// Schedules `value` for the absolute clock reading `when_nanos` and
    /// returns the id assigned by the wheel.
    ///
    /// A deadline at or before the current clock reading is placed one tick
    /// out, so it fires on the next wheel tick. Future deadlines are measured
    /// from the wheel's current position (see
    /// [`schedule_after`](Self::schedule_after)) and never use fewer than one
    /// tick.
    pub fn schedule_at(&mut self, when_nanos: u64, value: V) -> u64 {
        let now = self.clock.now_nanos();
        let ticks = if when_nanos <= now {
            1
        } else {
            self.ticks_until(when_nanos).max(1)
        };
        self.wheel.schedule(ticks, value)
    }

    /// Forwards the cancellation of `id` to the wheel and returns its answer.
    ///
    /// The tests in this file observe `true` for a still-pending entry and
    /// `false` for one that has already fired.
    pub fn cancel(&mut self, id: u64) -> bool {
        self.wheel.cancel(id)
    }

    /// Advances the wheel by every whole tick that has elapsed on the clock
    /// since the previous call and returns the values that fired, in the order
    /// the wheel produced them.
    ///
    /// Any sub-tick remainder is left unconsumed and carried into the next call.
    pub fn poll(&mut self) -> Vec<V> {
        let now = self.clock.now_nanos();
        let pending = now.saturating_sub(self.consumed_nanos);
        // Count ticks in `u64` so the amount consumed and the number of wheel
        // ticks cannot be narrowed on targets where `usize` is smaller.
        let ticks = pending / self.tick_nanos;
        // `ticks * tick_nanos <= pending`, so the product cannot overflow.
        self.consumed_nanos = self
            .consumed_nanos
            .saturating_add(ticks * self.tick_nanos);
        let mut fired = Vec::new();
        for _ in 0..ticks {
            fired.extend(self.wheel.tick());
        }
        fired
    }

    /// Number of wheel ticks from the wheel's current position until the clock
    /// reading `deadline_nanos`, rounded up.
    fn ticks_until(&self, deadline_nanos: u64) -> usize {
        self.nanos_to_ticks(deadline_nanos.saturating_sub(self.consumed_nanos))
    }

    /// Converts a nanosecond span into wheel ticks, rounding up and saturating
    /// at `usize::MAX`.
    fn nanos_to_ticks(&self, nanos: u64) -> usize {
        let ticks = nanos.div_ceil(self.tick_nanos);
        ticks.min(usize::MAX as u64) as usize
    }

    /// Converts a `Duration` into nanoseconds, saturating at `u64::MAX` instead
    /// of silently dropping the high bits of the `u128` count.
    fn duration_to_nanos(d: Duration) -> u64 {
        d.as_nanos().min(u128::from(u64::MAX)) as u64
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Tick length used by every scheduler built in this module.
    const TICK: Duration = Duration::from_millis(1);

    /// Shorthand for a whole number of milliseconds.
    fn ms(n: u64) -> Duration {
        Duration::from_millis(n)
    }

    /// Builds a 64-slot scheduler with a 1 ms tick, driven by a fresh `TestClock`.
    fn scheduler<V>() -> DeadlineScheduler<V, TestClock> {
        DeadlineScheduler::new(64, TestClock::new(), TICK)
    }

    #[test]
    fn schedule_after_fires_after_elapsed_time() {
        let mut sched = scheduler();
        sched.schedule_after(ms(3), "a");
        assert!(sched.poll().is_empty());

        sched.clock.advance(ms(2));
        assert!(sched.poll().is_empty());

        sched.clock.advance(ms(1));
        assert_eq!(sched.poll(), vec!["a"]);
    }

    #[test]
    fn schedule_at_with_absolute_deadline_fires_when_clock_passes_it() {
        let mut sched = scheduler();
        let when = sched.clock.now_nanos() + ms(5).as_nanos() as u64;
        sched.schedule_at(when, "five");

        sched.clock.advance(ms(4));
        assert!(sched.poll().is_empty());

        sched.clock.advance(ms(1));
        assert_eq!(sched.poll(), vec!["five"]);
    }

    #[test]
    fn schedule_at_in_the_past_fires_on_next_tick() {
        let mut sched = scheduler();
        sched.clock.advance(Duration::from_secs(10));
        let id = sched.schedule_at(0, "stale");

        sched.clock.advance(ms(1));
        assert_eq!(sched.poll(), vec!["stale"]);
        // The entry has already fired, so there is nothing left to cancel.
        assert!(!sched.cancel(id));
    }

    #[test]
    fn cancel_removes_before_fire() {
        let mut sched = scheduler();
        let id = sched.schedule_after(ms(3), "doomed");
        assert!(sched.cancel(id));

        sched.clock.advance(ms(10));
        assert!(sched.poll().is_empty());
    }

    #[test]
    fn poll_with_no_clock_movement_is_idempotent() {
        let mut sched = scheduler();
        sched.schedule_after(ms(2), "a");
        for _ in 0..2 {
            assert!(sched.poll().is_empty());
        }

        sched.clock.advance(ms(2));
        let first = sched.poll();
        let second = sched.poll();
        assert_eq!(first, vec!["a"]);
        assert!(second.is_empty(), "second poll must not refire");
    }

    #[test]
    fn sub_tick_delay_rounds_up_to_one_tick() {
        let mut sched = scheduler();
        sched.schedule_after(Duration::from_micros(500), "a");

        sched.clock.advance(ms(1));
        assert_eq!(sched.poll(), vec!["a"]);
    }

    #[test]
    fn many_deadlines_fire_in_order() {
        let mut sched = scheduler();
        for i in 1u32..=10 {
            sched.schedule_after(ms(u64::from(i)), i);
        }
        for expected in 1u32..=10 {
            sched.clock.advance(ms(1));
            assert_eq!(sched.poll(), vec![expected]);
        }
    }

    #[test]
    fn monotonic_clock_default_does_not_panic() {
        let clock = MonotonicClock::new();
        let earlier = clock.now_nanos();
        let later = clock.now_nanos();
        assert!(later >= earlier);
    }
}