use std::{
cmp::Reverse,
collections::{BinaryHeap, HashMap},
time::{Duration, Instant},
};
/// Heap entry: a tick number wrapped in [`Reverse`] so that the *smallest*
/// tick sits at the top of the (max-oriented) [`BinaryHeap`].
type Slot = Reverse<u64>;

/// Converts `duration` to whole microseconds, saturating at `u64::MAX`
/// instead of silently truncating the `u128` returned by `as_micros`.
fn saturating_micros(duration: Duration) -> u64 {
    duration.as_micros().min(u128::from(u64::MAX)) as u64
}

/// A timer queue that groups values into fixed-width time slots ("ticks")
/// measured from the moment the wheel was created.
///
/// Values are registered with [`TimeWheel::deadline`] or [`TimeWheel::after`]
/// and handed back by [`TimeWheel::spin`] once their slot has been reached.
pub struct TimeWheel<T> {
    /// Instant the wheel was created; tick `0` starts here.
    start: Instant,
    /// Width of one tick in microseconds. Always at least `1`.
    tick_interval: u64,
    /// Tick number observed by the most recent call to `spin` (`0` before that).
    ticks: u64,
    /// Pending tick numbers, smallest first. Each pending tick appears once.
    priority_queue: BinaryHeap<Slot>,
    /// Values waiting in each pending tick, in registration order.
    timers: HashMap<u64, Vec<T>>,
    /// Total number of values currently stored in `timers`.
    counter: usize,
}

impl<T> TimeWheel<T> {
    /// Creates an empty wheel whose ticks are `tick_interval` wide, starting now.
    ///
    /// The interval is stored with microsecond resolution. Intervals shorter
    /// than one microsecond (including zero) are rounded up to one microsecond
    /// so that tick computations never divide by zero.
    pub fn new(tick_interval: Duration) -> Self {
        Self {
            start: Instant::now(),
            tick_interval: saturating_micros(tick_interval).max(1),
            ticks: 0,
            priority_queue: BinaryHeap::new(),
            timers: HashMap::new(),
            counter: 0,
        }
    }

    /// Returns the number of values that are still waiting in the wheel.
    pub fn len(&self) -> usize {
        self.counter
    }

    /// Returns `true` when no value is waiting in the wheel.
    pub fn is_empty(&self) -> bool {
        self.counter == 0
    }

    /// Registers `value` to be released once `deadline` has been reached.
    ///
    /// The deadline is rounded *up* to the next tick boundary. Returns the
    /// tick number the value was stored under, or `None` (dropping `value`)
    /// when that tick is not later than the tick seen by the last
    /// [`TimeWheel::spin`] call. A deadline earlier than the wheel's start is
    /// treated as tick `0` and therefore always rejected.
    pub fn deadline(&mut self, deadline: Instant, value: T) -> Option<u64> {
        // Saturating: a deadline before `start` yields a zero offset rather
        // than depending on the panic behaviour of `Instant` subtraction.
        let offset = saturating_micros(deadline.saturating_duration_since(self.start));
        // Ceiling division: a partially covered tick counts as a whole one.
        let ticks = offset / self.tick_interval + u64::from(offset % self.tick_interval != 0);
        if ticks <= self.ticks {
            return None;
        }

        let bucket = self.timers.entry(ticks).or_default();
        // Buckets are never stored empty, so an empty one was just created
        // and its tick is not yet scheduled in the heap.
        if bucket.is_empty() {
            self.priority_queue.push(Reverse(ticks));
        }
        bucket.push(value);
        self.counter += 1;
        Some(ticks)
    }

    /// Registers `value` to be released `duration` from now.
    ///
    /// Equivalent to calling [`TimeWheel::deadline`] with
    /// `Instant::now() + duration`; see there for the return value.
    pub fn after(&mut self, duration: Duration, value: T) -> Option<u64> {
        self.deadline(Instant::now() + duration, value)
    }

    /// Advances the wheel to the current time and moves every due value into
    /// `wakers`.
    ///
    /// Each appended entry is `(tick, value)`. Entries are appended in
    /// ascending tick order, and in registration order within one tick.
    /// Existing contents of `wakers` are kept.
    pub fn spin(&mut self, wakers: &mut Vec<(u64, T)>) {
        let elapsed = Instant::now().saturating_duration_since(self.start);
        // Floor division: only ticks that have fully elapsed are due.
        self.ticks = saturating_micros(elapsed) / self.tick_interval;

        while let Some(&Reverse(slot)) = self.priority_queue.peek() {
            if slot > self.ticks {
                break;
            }
            self.priority_queue.pop();
            if let Some(timers) = self.timers.remove(&slot) {
                self.counter -= timers.len();
                wakers.extend(timers.into_iter().map(|value| (slot, value)));
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use std::{thread::sleep, time::Duration};

    use super::*;

    /// `len` counts a registered value and drops back to zero once it is drained.
    #[test]
    fn test_len() {
        let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
        time_wheel
            .deadline(time_wheel.start + Duration::from_millis(1), ())
            .expect("deadline is valid.");
        assert_eq!(time_wheel.len(), 1);

        sleep(Duration::from_millis(1));

        let mut wakers = vec![];
        time_wheel.spin(&mut wakers);
        assert_eq!(
            wakers.into_iter().map(|v| v.1).collect::<Vec<_>>(),
            vec![()]
        );
        assert_eq!(time_wheel.len(), 0);
    }

    /// Values sharing one tick are released in registration order.
    #[test]
    fn test_order() {
        let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
        let deadline = time_wheel.start + Duration::from_millis(1);
        time_wheel
            .deadline(deadline, 1)
            .expect("deadline is valid.");
        time_wheel
            .deadline(deadline, 2)
            .expect("deadline is valid.");

        sleep(Duration::from_millis(1));

        let mut wakers = vec![];
        time_wheel.spin(&mut wakers);
        assert_eq!(
            wakers.into_iter().map(|v| v.1).collect::<Vec<_>>(),
            vec![1, 2]
        );
    }

    /// Values in different ticks are released only once their own tick is due.
    #[test]
    fn test_order2() {
        let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
        time_wheel
            .deadline(time_wheel.start + Duration::from_millis(500), 1)
            .expect("deadline is valid.");
        time_wheel
            .deadline(time_wheel.start + Duration::from_millis(1000), 2)
            .expect("deadline is valid.");
        assert_eq!(time_wheel.len(), 2);

        sleep(Duration::from_millis(500));

        let mut wakers = vec![];
        time_wheel.spin(&mut wakers);
        assert_eq!(
            wakers.iter().cloned().map(|v| v.1).collect::<Vec<_>>(),
            vec![1]
        );
        assert_eq!(time_wheel.len(), 1);

        sleep(Duration::from_millis(1000));

        // `spin` appends, so the first value is still in `wakers`.
        time_wheel.spin(&mut wakers);
        assert_eq!(
            wakers.into_iter().map(|v| v.1).collect::<Vec<_>>(),
            vec![1, 2]
        );
        assert_eq!(time_wheel.len(), 0);
    }

    /// `after` schedules relative to the current time.
    #[test]
    fn test_after() {
        let mut time_wheel = TimeWheel::new(Duration::from_millis(1));
        // Fail here, with a clear message, if the timer is rejected.
        time_wheel
            .after(Duration::from_millis(1), ())
            .expect("deadline is valid.");

        sleep(Duration::from_millis(2));

        let mut wakers = vec![];
        time_wheel.spin(&mut wakers);
        assert_eq!(
            wakers.into_iter().map(|v| v.1).collect::<Vec<_>>(),
            vec![()]
        );
    }
}