use std::marker::PhantomData;
use std::mem;
use std::time::{Duration, Instant};
use nexus_slab::{Full, Slot, bounded, unbounded};
use crate::entry::{EntryPtr, WheelEntry, entry_ref};
use crate::handle::TimerHandle;
use crate::level::Level;
use crate::store::{BoundedStore, SlabStore};
/// Configuration for a timer wheel.
///
/// Values are adjusted through the chainable setters on this type and are
/// checked by `validate` when a wheel is built.
#[derive(Clone, Copy, Debug)]
pub struct WheelBuilder {
    /// Length of one tick.
    tick_duration: Duration,
    /// Number of slots in every level (checked to be a power of two, at most 64).
    slots_per_level: usize,
    /// Shift handed to each level on construction.
    clk_shift: u32,
    /// Number of levels in the wheel (checked to be between 1 and 8).
    num_levels: usize,
}
impl Default for WheelBuilder {
fn default() -> Self {
WheelBuilder {
tick_duration: Duration::from_millis(1),
slots_per_level: 64,
clk_shift: 3,
num_levels: 7,
}
}
}
impl WheelBuilder {
    /// Creates a builder holding the default configuration.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the length of one tick. Must be non-zero and fit in a `u64`
    /// nanosecond count; this is checked when the wheel is built.
    pub fn tick_duration(mut self, d: Duration) -> Self {
        self.tick_duration = d;
        self
    }

    /// Sets the number of slots per level. Must be a power of two and at
    /// most 64; this is checked when the wheel is built.
    pub fn slots_per_level(mut self, n: usize) -> Self {
        self.slots_per_level = n;
        self
    }

    /// Sets the per-level clock shift. Must be greater than zero; this is
    /// checked when the wheel is built.
    pub fn clk_shift(mut self, s: u32) -> Self {
        self.clk_shift = s;
        self
    }

    /// Sets the number of levels. Must be between 1 and 8; this is checked
    /// when the wheel is built.
    pub fn num_levels(mut self, n: usize) -> Self {
        self.num_levels = n;
        self
    }

    /// Continues building a wheel backed by the unbounded slab, which is
    /// created with the given `chunk_capacity`.
    pub fn unbounded(self, chunk_capacity: usize) -> UnboundedWheelBuilder {
        UnboundedWheelBuilder {
            config: self,
            chunk_capacity,
        }
    }

    /// Continues building a wheel backed by the bounded slab, which is
    /// created with the given `capacity`.
    pub fn bounded(self, capacity: usize) -> BoundedWheelBuilder {
        BoundedWheelBuilder {
            config: self,
            capacity,
        }
    }

    /// Checks every configuration invariant.
    ///
    /// # Panics
    ///
    /// Panics with a descriptive message on the first violated invariant.
    fn validate(&self) {
        assert!(
            self.slots_per_level.is_power_of_two(),
            "slots_per_level must be a power of 2, got {}",
            self.slots_per_level
        );
        assert!(
            self.slots_per_level <= 64,
            "slots_per_level must be <= 64 (u64 bitmask), got {}",
            self.slots_per_level
        );
        assert!(self.num_levels > 0, "num_levels must be > 0");
        assert!(
            self.num_levels <= 8,
            "num_levels must be <= 8 (u8 bitmask), got {}",
            self.num_levels
        );
        assert!(self.clk_shift > 0, "clk_shift must be > 0");
        assert!(
            !self.tick_duration.is_zero(),
            "tick_duration must be non-zero"
        );
        // A tick longer than u64::MAX nanoseconds cannot be represented by
        // `tick_ns`; a plain narrowing cast could even wrap it to 0 and make
        // every later tick conversion divide by zero.
        assert!(
            self.tick_duration.as_nanos() <= u128::from(u64::MAX),
            "tick_duration must fit in u64 nanoseconds, got {:?}",
            self.tick_duration
        );
        let max_shift = (self.num_levels - 1) as u64 * self.clk_shift as u64;
        assert!(
            max_shift < 64,
            "(num_levels - 1) * clk_shift must be < 64, got {}",
            max_shift
        );
        let slots_log2 = self.slots_per_level.trailing_zeros() as u64;
        assert!(
            slots_log2 + max_shift < 64,
            "slots_per_level << max_shift would overflow u64"
        );
    }

    /// Returns the tick length in nanoseconds.
    ///
    /// `validate` guarantees the value fits; the conversion saturates rather
    /// than wraps so an unvalidated builder can never yield 0 here for a
    /// non-zero duration.
    fn tick_ns(&self) -> u64 {
        self.tick_duration.as_nanos().min(u128::from(u64::MAX)) as u64
    }
}
/// Final builder stage for a wheel backed by the unbounded slab.
///
/// Produced by `WheelBuilder::unbounded`.
#[derive(Debug)]
pub struct UnboundedWheelBuilder {
    /// Wheel configuration captured from the first builder stage.
    config: WheelBuilder,
    /// Chunk capacity handed to the slab constructor.
    chunk_capacity: usize,
}
impl UnboundedWheelBuilder {
    /// Validates the configuration and builds an empty [`Wheel`].
    ///
    /// `now` is stored as the wheel's epoch, i.e. the instant that maps to
    /// tick 0.
    ///
    /// # Panics
    ///
    /// Panics if the configuration fails validation.
    pub fn build<T: 'static>(self, now: Instant) -> Wheel<T> {
        let Self {
            config,
            chunk_capacity,
        } = self;
        // Reject bad configurations before anything is allocated.
        config.validate();
        // NOTE(review): the slab constructor is `unsafe`; its contract is not
        // visible from this file — confirm it is upheld here.
        let slab = unsafe { unbounded::Slab::with_chunk_capacity(chunk_capacity) };
        let levels = build_levels::<T>(&config);
        let tick_ns = config.tick_ns();
        TimerWheel {
            slab,
            levels,
            num_levels: config.num_levels,
            active_levels: 0,
            current_ticks: 0,
            tick_ns,
            epoch: now,
            len: 0,
            _marker: PhantomData,
        }
    }
}
/// Final builder stage for a wheel backed by the bounded slab.
///
/// Produced by `WheelBuilder::bounded`.
#[derive(Debug)]
pub struct BoundedWheelBuilder {
    /// Wheel configuration captured from the first builder stage.
    config: WheelBuilder,
    /// Capacity handed to the slab constructor.
    capacity: usize,
}
impl BoundedWheelBuilder {
    /// Validates the configuration and builds an empty [`BoundedWheel`].
    ///
    /// `now` is stored as the wheel's epoch, i.e. the instant that maps to
    /// tick 0.
    ///
    /// # Panics
    ///
    /// Panics if the configuration fails validation.
    pub fn build<T: 'static>(self, now: Instant) -> BoundedWheel<T> {
        let Self { config, capacity } = self;
        // Reject bad configurations before anything is allocated.
        config.validate();
        // NOTE(review): the slab constructor is `unsafe`; its contract is not
        // visible from this file — confirm it is upheld here.
        let slab = unsafe { bounded::Slab::with_capacity(capacity) };
        let levels = build_levels::<T>(&config);
        let tick_ns = config.tick_ns();
        TimerWheel {
            slab,
            levels,
            num_levels: config.num_levels,
            active_levels: 0,
            current_ticks: 0,
            tick_ns,
            epoch: now,
            len: 0,
            _marker: PhantomData,
        }
    }
}
/// Timer wheel holding values of type `T`, with entry storage provided by the
/// slab type `S` (the unbounded slab by default).
pub struct TimerWheel<
T: 'static,
S: SlabStore<Item = WheelEntry<T>> = unbounded::Slab<WheelEntry<T>>,
> {
/// Storage from which every wheel entry is allocated and to which it is freed.
slab: S,
/// One `Level` per configured level, index 0 first.
levels: Vec<Level<T>>,
/// Number of levels (copied from the builder configuration).
num_levels: usize,
/// Bitmask of levels; bit `i` is set when an entry is inserted into level `i`
/// and cleared once that level reports itself inactive.
active_levels: u8,
/// Tick count of the most recent poll; level selection is relative to it.
current_ticks: u64,
/// Length of one tick in nanoseconds.
tick_ns: u64,
/// Instant that corresponds to tick 0.
epoch: Instant,
/// Number of timers currently scheduled (not yet fired or cancelled).
len: usize,
/// Raw-pointer marker: suppresses the automatic `Send`/`Sync` impls.
_marker: PhantomData<*const ()>, }
// The `PhantomData<*const ()>` field removes the automatic `Send`/`Sync`
// impls; `Send` is re-added here for `T: Send`. `Sync` is not implemented.
// NOTE(review): no SAFETY argument is visible in this file. Presumably the
// raw entry pointers are only dereferenced through `&mut TimerWheel`, so
// moving the whole wheel to another thread is sound — confirm that
// `TimerHandle` cannot be used independently on a different thread.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<T: Send + 'static, S: SlabStore<Item = WheelEntry<T>>> Send for TimerWheel<T, S> {}
/// Timer wheel whose entries live in a `bounded::Slab`.
pub type BoundedWheel<T> = TimerWheel<T, bounded::Slab<WheelEntry<T>>>;
/// Timer wheel whose entries live in an `unbounded::Slab`.
pub type Wheel<T> = TimerWheel<T, unbounded::Slab<WheelEntry<T>>>;
impl<T: 'static> Wheel<T> {
    /// Builds a wheel with the default configuration on top of an unbounded
    /// slab created with `chunk_capacity`; `now` becomes the wheel's epoch.
    pub fn unbounded(chunk_capacity: usize, now: Instant) -> Self {
        let defaults = WheelBuilder::default();
        defaults.unbounded(chunk_capacity).build(now)
    }
}
impl<T: 'static> BoundedWheel<T> {
    /// Builds a wheel with the default configuration on top of a bounded
    /// slab created with `capacity`; `now` becomes the wheel's epoch.
    pub fn bounded(capacity: usize, now: Instant) -> Self {
        let defaults = WheelBuilder::default();
        defaults.bounded(capacity).build(now)
    }
}
/// Creates the `num_levels` levels described by `config`, lowest index first.
fn build_levels<T: 'static>(config: &WheelBuilder) -> Vec<Level<T>> {
    let mut levels = Vec::with_capacity(config.num_levels);
    for index in 0..config.num_levels {
        levels.push(Level::new(config.slots_per_level, index, config.clk_shift));
    }
    levels
}
impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
    /// Schedules `value` to fire at `deadline` and returns a handle that can
    /// later be passed to `cancel`, `free` or `reschedule`.
    ///
    /// The deadline is converted to whole ticks relative to the wheel's epoch.
    pub fn schedule(&mut self, deadline: Instant, value: T) -> TimerHandle<T> {
        let ticks = self.instant_to_ticks(deadline);
        // Reference count 2: one for the wheel, one for the returned handle.
        let slot = self.slab.alloc(WheelEntry::new(ticks, value, 2));
        let raw = slot.into_raw();
        self.insert_entry(raw, ticks);
        self.len += 1;
        TimerHandle::new(raw)
    }

    /// Schedules `value` to fire at `deadline` without handing out a handle,
    /// so the timer cannot be cancelled or rescheduled afterwards.
    pub fn schedule_forget(&mut self, deadline: Instant, value: T) {
        let ticks = self.instant_to_ticks(deadline);
        // Reference count 1: only the wheel refers to the entry.
        let slot = self.slab.alloc(WheelEntry::new(ticks, value, 1));
        let raw = slot.into_raw();
        self.insert_entry(raw, ticks);
        self.len += 1;
    }
}
impl<T: 'static, S: BoundedStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
    /// Fallible counterpart of `schedule`.
    ///
    /// # Errors
    ///
    /// Returns `Full(value)`, handing the value back, when the slab rejects
    /// the allocation.
    pub fn try_schedule(&mut self, deadline: Instant, value: T) -> Result<TimerHandle<T>, Full<T>> {
        let ticks = self.instant_to_ticks(deadline);
        // Reference count 2: one for the wheel, one for the returned handle.
        let candidate = WheelEntry::new(ticks, value, 2);
        let slot = match self.slab.try_alloc(candidate) {
            Ok(slot) => slot,
            Err(rejected) => {
                // Unwrap the rejected entry to give the caller its value back.
                let returned_entry = rejected.into_inner();
                let recovered = unsafe { returned_entry.take_value() }
                    .expect("entry was just constructed with Some(value)");
                return Err(Full(recovered));
            }
        };
        let raw = slot.into_raw();
        self.insert_entry(raw, ticks);
        self.len += 1;
        Ok(TimerHandle::new(raw))
    }

    /// Fallible counterpart of `schedule_forget`.
    ///
    /// # Errors
    ///
    /// Returns `Full(value)`, handing the value back, when the slab rejects
    /// the allocation.
    pub fn try_schedule_forget(&mut self, deadline: Instant, value: T) -> Result<(), Full<T>> {
        let ticks = self.instant_to_ticks(deadline);
        // Reference count 1: only the wheel refers to the entry.
        let candidate = WheelEntry::new(ticks, value, 1);
        let slot = match self.slab.try_alloc(candidate) {
            Ok(slot) => slot,
            Err(rejected) => {
                // Unwrap the rejected entry to give the caller its value back.
                let returned_entry = rejected.into_inner();
                let recovered = unsafe { returned_entry.take_value() }
                    .expect("entry was just constructed with Some(value)");
                return Err(Full(recovered));
            }
        };
        let raw = slot.into_raw();
        self.insert_entry(raw, ticks);
        self.len += 1;
        Ok(())
    }
}
impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
    /// Cancels the timer behind `handle`.
    ///
    /// Returns `Some(value)` when the timer was still scheduled, or `None`
    /// when it had already fired. In both cases the entry's slab storage is
    /// released.
    pub fn cancel(&mut self, handle: TimerHandle<T>) -> Option<T> {
        let ptr = handle.ptr;
        // Consume the handle without running its destructor.
        mem::forget(handle);
        // SAFETY (assumed): the handle was issued by this wheel, so `ptr`
        // refers to a live slab entry — TODO confirm against `entry_ref`.
        let entry = unsafe { entry_ref(ptr) };
        let refs = entry.refs();
        if refs == 2 {
            // Still scheduled: wheel + handle both reference the entry.
            let value = unsafe { entry.take_value() };
            self.remove_entry(ptr);
            self.len -= 1;
            self.slab.free(unsafe { Slot::from_raw(ptr) });
            value
        } else {
            // Already fired: the handle held the last reference.
            debug_assert_eq!(refs, 1, "unexpected refcount {refs} in cancel");
            self.slab.free(unsafe { Slot::from_raw(ptr) });
            None
        }
    }

    /// Gives up `handle` without cancelling the timer.
    ///
    /// The handle's reference is dropped; the slab storage is released only
    /// when that was the last reference (i.e. the timer had already fired).
    /// A still-scheduled timer stays in the wheel and fires normally.
    pub fn free(&mut self, handle: TimerHandle<T>) {
        let ptr = handle.ptr;
        // Consume the handle without running its destructor.
        mem::forget(handle);
        // SAFETY (assumed): see `cancel`.
        let entry = unsafe { entry_ref(ptr) };
        let new_refs = entry.dec_refs();
        if new_refs == 0 {
            self.slab.free(unsafe { Slot::from_raw(ptr) });
        }
    }

    /// Moves a still-scheduled timer to `new_deadline` and returns a fresh
    /// handle for it. `len` is unchanged.
    ///
    /// # Panics
    ///
    /// Panics with "cannot reschedule a fired timer" when the entry's
    /// reference count is not 2.
    pub fn reschedule(&mut self, handle: TimerHandle<T>, new_deadline: Instant) -> TimerHandle<T> {
        let ptr = handle.ptr;
        // Consume the handle without running its destructor.
        mem::forget(handle);
        // SAFETY (assumed): see `cancel`.
        let entry = unsafe { entry_ref(ptr) };
        assert_eq!(entry.refs(), 2, "cannot reschedule a fired timer");
        // Unlink from the old slot before the deadline (and location) change.
        self.remove_entry(ptr);
        let new_ticks = self.instant_to_ticks(new_deadline);
        entry.set_deadline_ticks(new_ticks);
        self.insert_entry(ptr, new_ticks);
        TimerHandle::new(ptr)
    }

    /// Fires every timer due at `now`, appending the values to `buf`.
    ///
    /// Returns the number of entries fired. Equivalent to
    /// `poll_with_limit(now, usize::MAX, buf)`.
    pub fn poll(&mut self, now: Instant, buf: &mut Vec<T>) -> usize {
        self.poll_with_limit(now, usize::MAX, buf)
    }

    /// Fires at most `limit` timers due at `now`, appending the values to
    /// `buf`, and returns the number of entries fired.
    ///
    /// Deadlines and `now` are compared at tick granularity (both rounded
    /// down), so a timer may fire up to one tick before its requested
    /// instant. The wheel's current tick is updated to `now` even when
    /// nothing fires.
    pub fn poll_with_limit(&mut self, now: Instant, limit: usize, buf: &mut Vec<T>) -> usize {
        let now_ticks = self.instant_to_ticks(now);
        self.current_ticks = now_ticks;
        let mut fired = 0;
        // Work on a snapshot of the mask; `poll_level` updates the live one.
        let mut mask = self.active_levels;
        while mask != 0 && fired < limit {
            let lvl_idx = mask.trailing_zeros() as usize;
            mask &= mask - 1; // clear the lowest set bit
            fired += self.poll_level(lvl_idx, now_ticks, limit - fired, buf);
        }
        fired
    }

    /// Returns the earliest deadline among scheduled timers, or `None` when
    /// no level is active.
    ///
    /// Visits every entry in every active slot. The result is tick-aligned,
    /// and deadlines further than `u64::MAX` nanoseconds past the epoch are
    /// reported at that bound.
    pub fn next_deadline(&self) -> Option<Instant> {
        let mut min_ticks: Option<u64> = None;
        let mut lvl_mask = self.active_levels;
        while lvl_mask != 0 {
            let lvl_idx = lvl_mask.trailing_zeros() as usize;
            lvl_mask &= lvl_mask - 1;
            let level = &self.levels[lvl_idx];
            let mut slot_mask = level.active_slots();
            while slot_mask != 0 {
                let slot_idx = slot_mask.trailing_zeros() as usize;
                slot_mask &= slot_mask - 1;
                let slot = level.slot(slot_idx);
                let mut entry_ptr = slot.entry_head();
                while !entry_ptr.is_null() {
                    // SAFETY (assumed): pointers linked into a slot refer to
                    // live slab entries owned by this wheel.
                    let entry = unsafe { entry_ref(entry_ptr) };
                    let dt = entry.deadline_ticks();
                    min_ticks = Some(min_ticks.map_or(dt, |current| current.min(dt)));
                    entry_ptr = entry.next();
                }
            }
        }
        min_ticks.map(|t| self.ticks_to_instant(t))
    }

    /// Number of timers currently scheduled.
    #[inline]
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when no timer is scheduled.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Converts `instant` to whole ticks since the epoch, rounding down.
    ///
    /// Instants before the epoch map to tick 0; tick counts beyond
    /// `u64::MAX` saturate.
    #[inline]
    fn instant_to_ticks(&self, instant: Instant) -> u64 {
        let elapsed = instant.saturating_duration_since(self.epoch);
        // Divide in u128 *before* narrowing. Casting the nanosecond count to
        // u64 first silently wraps for instants more than ~584 years past
        // the epoch, turning a far-future deadline into a near-term one.
        let ticks = elapsed.as_nanos() / u128::from(self.tick_ns);
        ticks.min(u128::from(u64::MAX)) as u64
    }

    /// Converts a tick count back to an instant; the nanosecond offset
    /// saturates at `u64::MAX`.
    #[inline]
    fn ticks_to_instant(&self, ticks: u64) -> Instant {
        self.epoch + Duration::from_nanos(ticks.saturating_mul(self.tick_ns))
    }

    /// Picks the first level whose range covers the distance from the
    /// current tick to `deadline_ticks`; distances beyond every range (and
    /// nothing else) fall through to the last level.
    #[inline]
    fn select_level(&self, deadline_ticks: u64) -> usize {
        let delta = deadline_ticks.saturating_sub(self.current_ticks);
        for (i, level) in self.levels.iter().enumerate() {
            if delta < level.range() {
                return i;
            }
        }
        self.num_levels - 1
    }

    /// Links `entry_ptr` into the slot chosen for `deadline_ticks`, records
    /// that location on the entry and marks the slot and level active.
    // NOTE(review): lint allowance kept from the original; this method does
    // write `self.active_levels`, so the allowance may be unnecessary.
    #[inline]
    #[allow(clippy::needless_pass_by_ref_mut)]
    fn insert_entry(&mut self, entry_ptr: EntryPtr<T>, deadline_ticks: u64) {
        let lvl_idx = self.select_level(deadline_ticks);
        let slot_idx = self.levels[lvl_idx].slot_index(deadline_ticks);
        // SAFETY (assumed): callers pass a pointer to a live slab entry.
        let entry = unsafe { entry_ref(entry_ptr) };
        // `validate` caps levels at 8 and slots at 64, so both casts fit.
        entry.set_location(lvl_idx as u8, slot_idx as u16);
        unsafe { self.levels[lvl_idx].slot(slot_idx).push_entry(entry_ptr) };
        self.levels[lvl_idx].activate_slot(slot_idx);
        self.active_levels |= 1 << lvl_idx;
    }

    /// Unlinks `entry_ptr` from the slot recorded on the entry, clearing the
    /// slot's and level's active bits when they become empty.
    // NOTE(review): lint allowance kept from the original; this method does
    // write `self.active_levels`, so the allowance may be unnecessary.
    #[inline]
    #[allow(clippy::needless_pass_by_ref_mut)]
    fn remove_entry(&mut self, entry_ptr: EntryPtr<T>) {
        // SAFETY (assumed): callers pass a pointer to an entry that is
        // currently linked into this wheel.
        let entry = unsafe { entry_ref(entry_ptr) };
        let lvl_idx = entry.level() as usize;
        let slot_idx = entry.slot_idx() as usize;
        unsafe { self.levels[lvl_idx].slot(slot_idx).remove_entry(entry_ptr) };
        if self.levels[lvl_idx].slot(slot_idx).is_empty() {
            self.levels[lvl_idx].deactivate_slot(slot_idx);
            if !self.levels[lvl_idx].is_active() {
                self.active_levels &= !(1 << lvl_idx);
            }
        }
    }

    /// Takes the value out of an already-unlinked entry, drops the wheel's
    /// reference (freeing the storage when it was the last one) and
    /// decrements `len`.
    #[inline]
    fn fire_entry(&mut self, entry_ptr: EntryPtr<T>) -> Option<T> {
        // SAFETY (assumed): the entry was just unlinked from a slot of this
        // wheel and has not been freed.
        let entry = unsafe { entry_ref(entry_ptr) };
        let value = unsafe { entry.take_value() };
        let new_refs = entry.dec_refs();
        if new_refs == 0 {
            // No outstanding handle: release the storage now. Otherwise the
            // handle's `cancel`/`free` releases it later.
            self.slab.free(unsafe { Slot::from_raw(entry_ptr) });
        }
        self.len -= 1;
        value
    }

    /// Fires up to `limit` due entries in level `lvl_idx`, pushing their
    /// values into `buf`, and returns the number fired.
    ///
    /// Every active slot of the level is scanned and each entry's own
    /// deadline is compared against `now_ticks`.
    fn poll_level(
        &mut self,
        lvl_idx: usize,
        now_ticks: u64,
        limit: usize,
        buf: &mut Vec<T>,
    ) -> usize {
        let mut fired = 0;
        let mut mask = self.levels[lvl_idx].active_slots();
        while mask != 0 && fired < limit {
            let slot_idx = mask.trailing_zeros() as usize;
            mask &= mask - 1;
            // Detach the slot borrow from `self` via a raw pointer so that
            // `fire_entry(&mut self)` can be called while walking the slot.
            // NOTE(review): sound only while `self.levels` is not resized or
            // replaced during the walk; nothing in this loop does so.
            let slot_ptr = self.levels[lvl_idx].slot(slot_idx) as *const crate::level::WheelSlot<T>;
            let slot = unsafe { &*slot_ptr };
            let mut entry_ptr = slot.entry_head();
            while !entry_ptr.is_null() && fired < limit {
                let entry = unsafe { entry_ref(entry_ptr) };
                // Read the successor first: firing may free this entry.
                let next_entry = entry.next();
                if entry.deadline_ticks() <= now_ticks {
                    unsafe { slot.remove_entry(entry_ptr) };
                    if let Some(value) = self.fire_entry(entry_ptr) {
                        buf.push(value);
                    }
                    fired += 1;
                }
                entry_ptr = next_entry;
            }
            if slot.is_empty() {
                self.levels[lvl_idx].deactivate_slot(slot_idx);
            }
        }
        if !self.levels[lvl_idx].is_active() {
            self.active_levels &= !(1 << lvl_idx);
        }
        fired
    }
}
impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> Drop for TimerWheel<T, S> {
    /// Returns every entry still linked into the wheel to the slab.
    ///
    /// Only levels and slots flagged active are visited.
    fn drop(&mut self) {
        let mut pending_levels = self.active_levels;
        while pending_levels != 0 {
            let level_index = pending_levels.trailing_zeros() as usize;
            pending_levels &= pending_levels - 1;
            let level = &self.levels[level_index];

            let mut pending_slots = level.active_slots();
            while pending_slots != 0 {
                let slot_index = pending_slots.trailing_zeros() as usize;
                pending_slots &= pending_slots - 1;

                let mut cursor = level.slot(slot_index).entry_head();
                while !cursor.is_null() {
                    // Grab the successor before this entry's storage goes away.
                    let following = {
                        let current = unsafe { entry_ref(cursor) };
                        current.next()
                    };
                    self.slab.free(unsafe { Slot::from_raw(cursor) });
                    cursor = following;
                }
            }
        }
    }
}
// Unit tests for the builder validation and the wheel's schedule / cancel /
// free / reschedule / poll behavior. Tests prefixed `miri_` use heap-owning
// value types (`String`, `Vec<u8>`).
#[cfg(test)]
mod tests {
use super::*;
use std::time::{Duration, Instant};
// Shorthand for a millisecond `Duration`.
fn ms(millis: u64) -> Duration {
Duration::from_millis(millis)
}
// Compiles only when `T: Send`.
fn assert_send<T: Send>() {}
// Both wheel aliases must be `Send` for a `Send` value type.
#[test]
fn wheel_is_send() {
assert_send::<Wheel<u64>>();
assert_send::<BoundedWheel<u64>>();
}
// A freshly built default wheel is empty.
#[test]
fn default_config() {
let now = Instant::now();
let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
assert!(wheel.is_empty());
assert_eq!(wheel.len(), 0);
}
// A freshly built bounded wheel is empty.
#[test]
fn bounded_construction() {
let now = Instant::now();
let wheel: BoundedWheel<u64> = BoundedWheel::bounded(128, now);
assert!(wheel.is_empty());
}
// 65 slots is not a power of two and must be rejected at build time.
#[test]
#[should_panic(expected = "slots_per_level must be a power of 2")]
fn invalid_config_non_power_of_two() {
let now = Instant::now();
WheelBuilder::default()
.slots_per_level(65)
.unbounded(1024)
.build::<u64>(now);
}
// Cancelling a pending timer returns its value and decrements `len`.
#[test]
fn schedule_and_cancel() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
let h = wheel.schedule(now + ms(50), 42);
assert_eq!(wheel.len(), 1);
let val = wheel.cancel(h);
assert_eq!(val, Some(42));
assert_eq!(wheel.len(), 0);
}
// A handle-less timer fires once its deadline has passed.
#[test]
fn schedule_forget_fires() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
wheel.schedule_forget(now + ms(10), 99);
assert_eq!(wheel.len(), 1);
let mut buf = Vec::new();
let fired = wheel.poll(now + ms(20), &mut buf);
assert_eq!(fired, 1);
assert_eq!(buf, vec![99]);
assert_eq!(wheel.len(), 0);
}
// Cancelling after the timer fired yields `None`.
#[test]
fn cancel_after_fire_returns_none() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
let h = wheel.schedule(now + ms(10), 42);
let mut buf = Vec::new();
wheel.poll(now + ms(20), &mut buf);
assert_eq!(buf, vec![42]);
let val = wheel.cancel(h);
assert_eq!(val, None);
}
// Freeing the handle of a pending timer leaves the timer scheduled.
#[test]
fn free_active_timer_becomes_fire_and_forget() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
let h = wheel.schedule(now + ms(10), 42);
wheel.free(h); assert_eq!(wheel.len(), 1);
let mut buf = Vec::new();
wheel.poll(now + ms(20), &mut buf);
assert_eq!(buf, vec![42]);
assert_eq!(wheel.len(), 0);
}
// Freeing the handle of an already-fired timer must not panic.
#[test]
fn free_zombie_handle() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
let h = wheel.schedule(now + ms(10), 42);
let mut buf = Vec::new();
wheel.poll(now + ms(20), &mut buf);
wheel.free(h);
}
// A full bounded wheel hands the value back; cancelling makes room again.
#[test]
fn bounded_full() {
let now = Instant::now();
let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(2, now);
let h1 = wheel.try_schedule(now + ms(10), 1).unwrap();
let h2 = wheel.try_schedule(now + ms(20), 2).unwrap();
let err = wheel.try_schedule(now + ms(30), 3);
assert!(err.is_err());
let recovered = err.unwrap_err().into_inner();
assert_eq!(recovered, 3);
wheel.cancel(h1);
let h3 = wheel.try_schedule(now + ms(30), 3).unwrap();
wheel.free(h2);
wheel.free(h3);
}
// `try_schedule_forget` also reports a full slab.
#[test]
fn bounded_schedule_forget_full() {
let now = Instant::now();
let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(1, now);
wheel.try_schedule_forget(now + ms(10), 1).unwrap();
let err = wheel.try_schedule_forget(now + ms(20), 2);
assert!(err.is_err());
}
// Each poll fires only the timers whose deadline has been reached.
#[test]
fn poll_respects_deadline() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
wheel.schedule_forget(now + ms(10), 1);
wheel.schedule_forget(now + ms(50), 2);
wheel.schedule_forget(now + ms(100), 3);
let mut buf = Vec::new();
let fired = wheel.poll(now + ms(20), &mut buf);
assert_eq!(fired, 1);
assert_eq!(buf, vec![1]);
assert_eq!(wheel.len(), 2);
buf.clear();
let fired = wheel.poll(now + ms(60), &mut buf);
assert_eq!(fired, 1);
assert_eq!(buf, vec![2]);
buf.clear();
let fired = wheel.poll(now + ms(200), &mut buf);
assert_eq!(fired, 1);
assert_eq!(buf, vec![3]);
assert!(wheel.is_empty());
}
// `poll_with_limit` caps the number fired per call; the rest stay scheduled.
#[test]
fn poll_with_limit() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
for i in 0..10 {
wheel.schedule_forget(now + ms(1), i);
}
let mut buf = Vec::new();
let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
assert_eq!(fired, 3);
assert_eq!(wheel.len(), 7);
let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
assert_eq!(fired, 3);
assert_eq!(wheel.len(), 4);
let fired = wheel.poll(now + ms(5), &mut buf);
assert_eq!(fired, 4);
assert!(wheel.is_empty());
assert_eq!(buf.len(), 10);
}
// Timers at increasing distances (5 ms, 200 ms, 1 s) each fire at the right poll.
#[test]
fn timers_across_levels() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
wheel.schedule_forget(now + ms(5), 0);
wheel.schedule_forget(now + ms(200), 1);
wheel.schedule_forget(now + ms(1000), 2);
let mut buf = Vec::new();
wheel.poll(now + ms(10), &mut buf);
assert_eq!(buf, vec![0]);
buf.clear();
wheel.poll(now + ms(250), &mut buf);
assert_eq!(buf, vec![1]);
buf.clear();
wheel.poll(now + ms(1500), &mut buf);
assert_eq!(buf, vec![2]);
assert!(wheel.is_empty());
}
// An empty wheel has no next deadline.
#[test]
fn next_deadline_empty() {
let now = Instant::now();
let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
assert!(wheel.next_deadline().is_none());
}
// `next_deadline` reports the earliest scheduled deadline (within one tick).
#[test]
fn next_deadline_returns_earliest() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
wheel.schedule_forget(now + ms(100), 1);
wheel.schedule_forget(now + ms(50), 2);
wheel.schedule_forget(now + ms(200), 3);
let next = wheel.next_deadline().unwrap();
let delta = next.duration_since(now);
assert!(delta >= ms(49) && delta <= ms(51));
}
// A deadline equal to the epoch fires on the next poll.
#[test]
fn deadline_in_the_past_fires_immediately() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
wheel.schedule_forget(now, 42);
let mut buf = Vec::new();
let fired = wheel.poll(now + ms(1), &mut buf);
assert_eq!(fired, 1);
assert_eq!(buf, vec![42]);
}
// A very distant deadline (100 000 s) is still accepted and fires once reached.
#[test]
fn deadline_beyond_max_range_clamped() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
let h = wheel.schedule(now + Duration::from_secs(100_000), 99);
assert_eq!(wheel.len(), 1);
let mut buf = Vec::new();
wheel.poll(now + Duration::from_secs(100_001), &mut buf);
assert_eq!(buf, vec![99]);
let val = wheel.cancel(h);
assert_eq!(val, None);
}
// Dropping a wheel that still holds scheduled entries must not panic.
#[test]
fn drop_cleans_up_active_entries() {
let now = Instant::now();
let mut wheel: Wheel<String> = Wheel::unbounded(1024, now);
for i in 0..100 {
wheel.schedule_forget(now + ms(i * 10), format!("timer-{i}"));
}
assert_eq!(wheel.len(), 100);
drop(wheel);
}
// Dropping a wheel after its handles were freed (timers still pending) must not panic.
#[test]
fn drop_with_outstanding_handles() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
let h1 = wheel.schedule(now + ms(10), 1);
let h2 = wheel.schedule(now + ms(20), 2);
wheel.free(h1);
wheel.free(h2);
drop(wheel);
}
// With the default configuration the level boundaries sit at 64 and 512 ticks.
#[test]
fn level_selection_boundaries() {
let now = Instant::now();
let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
assert_eq!(wheel.select_level(0), 0);
assert_eq!(wheel.select_level(63), 0);
assert_eq!(wheel.select_level(64), 1);
assert_eq!(wheel.select_level(511), 1);
assert_eq!(wheel.select_level(512), 2);
}
// Cancel still finds the entry after a poll advanced the current tick.
#[test]
fn cancel_after_time_advance() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
let h = wheel.schedule(now + ms(500), 42);
assert_eq!(wheel.len(), 1);
let mut buf = Vec::new();
let fired = wheel.poll(now + ms(400), &mut buf);
assert_eq!(fired, 0);
assert!(buf.is_empty());
let val = wheel.cancel(h);
assert_eq!(val, Some(42));
assert_eq!(wheel.len(), 0);
}
// Several entries with the same deadline: cancel two, the rest fire.
#[test]
fn multiple_entries_same_slot() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
let mut handles = Vec::new();
for i in 0..5 {
handles.push(wheel.schedule(now + ms(10), i));
}
assert_eq!(wheel.len(), 5);
let v2 = wheel.cancel(handles.remove(2));
assert_eq!(v2, Some(2));
let v0 = wheel.cancel(handles.remove(0));
assert_eq!(v0, Some(0));
assert_eq!(wheel.len(), 3);
let mut buf = Vec::new();
let fired = wheel.poll(now + ms(20), &mut buf);
assert_eq!(fired, 3);
for h in handles {
let val = wheel.cancel(h);
assert_eq!(val, None); }
}
// A deadline exactly on the 64-tick boundary fires only after it is reached.
#[test]
fn entry_at_level_boundary() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
let h = wheel.schedule(now + ms(64), 99);
assert_eq!(wheel.len(), 1);
let mut buf = Vec::new();
let fired = wheel.poll(now + ms(63), &mut buf);
assert_eq!(fired, 0);
let fired = wheel.poll(now + ms(65), &mut buf);
assert_eq!(fired, 1);
assert_eq!(buf, vec![99]);
wheel.cancel(h);
}
// The limit only counts due timers; not-yet-due ones are neither fired nor counted.
#[test]
fn poll_with_limit_mixed_expiry() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
wheel.schedule_forget(now + ms(5), 1);
wheel.schedule_forget(now + ms(5), 2);
wheel.schedule_forget(now + ms(5), 3);
wheel.schedule_forget(now + ms(500), 4); wheel.schedule_forget(now + ms(500), 5); assert_eq!(wheel.len(), 5);
let mut buf = Vec::new();
let fired = wheel.poll_with_limit(now + ms(10), 2, &mut buf);
assert_eq!(fired, 2);
assert_eq!(wheel.len(), 3);
let fired = wheel.poll_with_limit(now + ms(10), 5, &mut buf);
assert_eq!(fired, 1);
assert_eq!(wheel.len(), 2);
assert_eq!(buf.len(), 3);
}
// A wheel that was fully drained can be refilled and drained again.
#[test]
fn reuse_after_full_drain() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
for i in 0..10 {
wheel.schedule_forget(now + ms(1), i);
}
let mut buf = Vec::new();
wheel.poll(now + ms(5), &mut buf);
assert_eq!(buf.len(), 10);
assert!(wheel.is_empty());
buf.clear();
for i in 10..20 {
wheel.schedule_forget(now + ms(100), i);
}
assert_eq!(wheel.len(), 10);
wheel.poll(now + ms(200), &mut buf);
assert_eq!(buf.len(), 10);
assert!(wheel.is_empty());
}
// Seven timers at widely spread distances, cancelled in a shuffled order.
#[test]
fn all_levels_active() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
let distances = [10, 100, 1000, 5000, 40_000, 300_000, 3_000_000];
let mut handles: Vec<TimerHandle<u64>> = Vec::new();
for (i, &d) in distances.iter().enumerate() {
handles.push(wheel.schedule(now + ms(d), i as u64));
}
assert_eq!(wheel.len(), 7);
let order = [4, 1, 6, 0, 3, 5, 2];
let mut opt_handles: Vec<Option<TimerHandle<u64>>> =
handles.into_iter().map(Some).collect();
for &idx in &order {
let h = opt_handles[idx].take().unwrap();
let val = wheel.cancel(h);
assert_eq!(val, Some(idx as u64));
}
assert!(wheel.is_empty());
}
// The values delivered by poll are exactly the ones scheduled (order-insensitive).
#[test]
fn poll_values_match() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
let expected: Vec<u64> = (100..110).collect();
for &v in &expected {
wheel.schedule_forget(now + ms(5), v);
}
let mut buf = Vec::new();
wheel.poll(now + ms(10), &mut buf);
buf.sort_unstable();
assert_eq!(buf, expected);
}
// Rescheduling to an earlier deadline makes the timer fire at the new time.
#[test]
fn reschedule_moves_deadline() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
let h = wheel.schedule(now + ms(100), 42);
assert_eq!(wheel.len(), 1);
let h = wheel.reschedule(h, now + ms(50));
assert_eq!(wheel.len(), 1);
let mut buf = Vec::new();
let fired = wheel.poll(now + ms(40), &mut buf);
assert_eq!(fired, 0);
let fired = wheel.poll(now + ms(55), &mut buf);
assert_eq!(fired, 1);
assert_eq!(buf, vec![42]);
wheel.cancel(h);
}
// Rescheduling to a later deadline suppresses firing at the old time.
#[test]
fn reschedule_to_later() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
let h = wheel.schedule(now + ms(50), 7);
let h = wheel.reschedule(h, now + ms(200));
let mut buf = Vec::new();
let fired = wheel.poll(now + ms(60), &mut buf);
assert_eq!(fired, 0);
let fired = wheel.poll(now + ms(210), &mut buf);
assert_eq!(fired, 1);
assert_eq!(buf, vec![7]);
wheel.cancel(h);
}
// Rescheduling a timer that already fired panics.
#[test]
#[should_panic(expected = "cannot reschedule a fired timer")]
fn reschedule_panics_on_zombie() {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
let h = wheel.schedule(now + ms(10), 42);
let mut buf = Vec::new();
wheel.poll(now + ms(20), &mut buf);
let _h = wheel.reschedule(h, now + ms(100));
}
// A wheel with 32 slots per level still fires timers at the right polls.
#[test]
fn custom_slots_per_level() {
let now = Instant::now();
let mut wheel: Wheel<u64> = WheelBuilder::default()
.slots_per_level(32)
.unbounded(256)
.build(now);
let h1 = wheel.schedule(now + ms(20), 1);
let h2 = wheel.schedule(now + ms(40), 2);
let mut buf = Vec::new();
wheel.poll(now + ms(25), &mut buf);
assert_eq!(buf, vec![1]);
buf.clear();
wheel.poll(now + ms(50), &mut buf);
assert_eq!(buf, vec![2]);
wheel.cancel(h1);
wheel.cancel(h2);
}
// A wheel with clock shift 2 still fires timers at the right polls.
#[test]
fn custom_clk_shift() {
let now = Instant::now();
let mut wheel: Wheel<u64> = WheelBuilder::default()
.clk_shift(2)
.unbounded(256)
.build(now);
let h1 = wheel.schedule(now + ms(50), 1); let h2 = wheel.schedule(now + ms(100), 2);
let mut buf = Vec::new();
wheel.poll(now + ms(55), &mut buf);
assert_eq!(buf, vec![1]);
buf.clear();
wheel.poll(now + ms(110), &mut buf);
assert_eq!(buf, vec![2]);
wheel.cancel(h1);
wheel.cancel(h2);
}
// A 3-level wheel accepts and fires a 3 s timer.
#[test]
fn custom_num_levels() {
let now = Instant::now();
let mut wheel: Wheel<u64> = WheelBuilder::default()
.num_levels(3)
.unbounded(256)
.build(now);
let h = wheel.schedule(now + ms(3000), 42);
assert_eq!(wheel.len(), 1);
let mut buf = Vec::new();
wheel.poll(now + ms(3100), &mut buf);
assert_eq!(buf, vec![42]);
wheel.cancel(h);
}
// A wheel with 100 µs ticks fires millisecond deadlines correctly.
#[test]
fn custom_tick_duration() {
let now = Instant::now();
let mut wheel: Wheel<u64> = WheelBuilder::default()
.tick_duration(Duration::from_micros(100))
.unbounded(256)
.build(now);
wheel.schedule_forget(now + ms(1), 1);
wheel.schedule_forget(now + ms(10), 2);
let mut buf = Vec::new();
wheel.poll(now + ms(2), &mut buf);
assert_eq!(buf, vec![1]);
buf.clear();
wheel.poll(now + ms(15), &mut buf);
assert_eq!(buf, vec![2]);
}
// Custom configuration combined with a bounded slab of capacity 8.
#[test]
fn bounded_custom_config() {
let now = Instant::now();
let mut wheel: BoundedWheel<u64> = WheelBuilder::default()
.slots_per_level(16)
.num_levels(4)
.bounded(8)
.build(now);
let mut handles = Vec::new();
for i in 0..8 {
handles.push(wheel.try_schedule(now + ms(i * 10 + 10), i).unwrap());
}
assert!(wheel.try_schedule(now + ms(100), 99).is_err());
wheel.cancel(handles.remove(0));
let h = wheel.try_schedule(now + ms(100), 99).unwrap();
handles.push(h);
for h in handles {
wheel.cancel(h);
}
}
// 128 slots exceeds the 64-slot maximum.
#[test]
#[should_panic(expected = "slots_per_level must be <= 64")]
fn invalid_config_too_many_slots() {
let now = Instant::now();
WheelBuilder::default()
.slots_per_level(128)
.unbounded(1024)
.build::<u64>(now);
}
// Zero levels is rejected.
#[test]
#[should_panic(expected = "num_levels must be > 0")]
fn invalid_config_zero_levels() {
let now = Instant::now();
WheelBuilder::default()
.num_levels(0)
.unbounded(1024)
.build::<u64>(now);
}
// Nine levels exceeds the 8-level maximum.
#[test]
#[should_panic(expected = "num_levels must be <= 8")]
fn invalid_config_too_many_levels() {
let now = Instant::now();
WheelBuilder::default()
.num_levels(9)
.unbounded(1024)
.build::<u64>(now);
}
// A zero clock shift is rejected.
#[test]
#[should_panic(expected = "clk_shift must be > 0")]
fn invalid_config_zero_shift() {
let now = Instant::now();
WheelBuilder::default()
.clk_shift(0)
.unbounded(1024)
.build::<u64>(now);
}
// A zero tick duration is rejected.
#[test]
#[should_panic(expected = "tick_duration must be non-zero")]
fn invalid_config_zero_tick() {
let now = Instant::now();
WheelBuilder::default()
.tick_duration(Duration::ZERO)
.unbounded(1024)
.build::<u64>(now);
}
// 8 levels with shift 9: (8 - 1) * 9 = 63 passes the first shift check, but
// adding log2(64) = 6 slot bits reaches 69 and trips the overflow check.
#[test]
#[should_panic(expected = "overflow")]
fn invalid_config_shift_overflow() {
let now = Instant::now();
WheelBuilder::default()
.num_levels(8)
.clk_shift(9)
.unbounded(1024)
.build::<u64>(now);
}
// Schedule then cancel with a heap-owning value type.
#[test]
fn miri_schedule_cancel_drop_type() {
let now = Instant::now();
let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
let h = wheel.schedule(now + ms(50), "hello".to_string());
let val = wheel.cancel(h);
assert_eq!(val, Some("hello".to_string()));
assert!(wheel.is_empty());
}
// Poll-fire path with a heap-owning value type.
#[test]
fn miri_poll_fires_drop_type() {
let now = Instant::now();
let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
wheel.schedule_forget(now + ms(10), "a".to_string());
wheel.schedule_forget(now + ms(10), "b".to_string());
wheel.schedule_forget(now + ms(10), "c".to_string());
let mut buf = Vec::new();
let fired = wheel.poll(now + ms(20), &mut buf);
assert_eq!(fired, 3);
assert_eq!(buf.len(), 3);
assert!(wheel.is_empty());
}
// Cancel of an already-fired timer with a heap-owning value type.
#[test]
fn miri_cancel_zombie_drop_type() {
let now = Instant::now();
let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
let h = wheel.schedule(now + ms(10), "zombie".to_string());
let mut buf = Vec::new();
wheel.poll(now + ms(20), &mut buf);
assert_eq!(buf, vec!["zombie".to_string()]);
let val = wheel.cancel(h);
assert_eq!(val, None);
}
// `free` on a pending handle and on an already-fired handle.
#[test]
fn miri_free_active_and_zombie() {
let now = Instant::now();
let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
let h1 = wheel.schedule(now + ms(10), "active".to_string());
wheel.free(h1);
let mut buf = Vec::new();
wheel.poll(now + ms(20), &mut buf);
assert_eq!(buf, vec!["active".to_string()]);
let h2 = wheel.schedule(now + ms(10), "will-fire".to_string());
buf.clear();
wheel.poll(now + ms(20), &mut buf);
wheel.free(h2); }
// Reschedule path with a heap-owning value type.
#[test]
fn miri_reschedule_drop_type() {
let now = Instant::now();
let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
let h = wheel.schedule(now + ms(100), "moveme".to_string());
let h = wheel.reschedule(h, now + ms(50));
let mut buf = Vec::new();
wheel.poll(now + ms(55), &mut buf);
assert_eq!(buf, vec!["moveme".to_string()]);
wheel.cancel(h);
}
// Unlinking from the middle and head of a shared slot list, then firing the rest.
#[test]
fn miri_dll_multi_entry_same_slot() {
let now = Instant::now();
let mut wheel: Wheel<Vec<u8>> = Wheel::unbounded(64, now);
let mut handles = Vec::new();
for i in 0..5 {
handles.push(wheel.schedule(now + ms(10), vec![i; 32]));
}
let v2 = wheel.cancel(handles.remove(2));
assert_eq!(v2.unwrap(), vec![2; 32]);
let v0 = wheel.cancel(handles.remove(0));
assert_eq!(v0.unwrap(), vec![0; 32]);
let mut buf = Vec::new();
wheel.poll(now + ms(20), &mut buf);
assert_eq!(buf.len(), 3);
for h in handles {
wheel.cancel(h);
}
}
// Dropping a wheel that still holds heap-owning entries.
#[test]
fn miri_drop_wheel_with_entries() {
let now = Instant::now();
let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
for i in 0..20 {
wheel.schedule_forget(now + ms(i * 100), format!("entry-{i}"));
}
assert_eq!(wheel.len(), 20);
drop(wheel);
}
// Full lifecycle on a bounded wheel: fill, overflow, cancel, refill, poll, release.
#[test]
fn miri_bounded_lifecycle() {
let now = Instant::now();
let mut wheel: BoundedWheel<String> = BoundedWheel::bounded(4, now);
let h1 = wheel.try_schedule(now + ms(10), "a".to_string()).unwrap();
let h2 = wheel.try_schedule(now + ms(20), "b".to_string()).unwrap();
let h3 = wheel.try_schedule(now + ms(30), "c".to_string()).unwrap();
let h4 = wheel.try_schedule(now + ms(40), "d".to_string()).unwrap();
let err = wheel.try_schedule(now + ms(50), "e".to_string());
assert!(err.is_err());
wheel.cancel(h1);
let h5 = wheel.try_schedule(now + ms(50), "e".to_string()).unwrap();
let mut buf = Vec::new();
wheel.poll(now + ms(25), &mut buf);
wheel.cancel(h2);
wheel.free(h3);
wheel.free(h4);
wheel.free(h5);
}
}
// Property-based tests (proptest) exercising random schedule/cancel
// interleavings and random poll timings.
#[cfg(test)]
mod proptests {
use super::*;
use proptest::prelude::*;
use std::collections::HashSet;
use std::mem;
use std::time::{Duration, Instant};
// One step of a generated scenario.
#[derive(Debug, Clone)]
enum Op {
// Schedule a timer `deadline_ms` milliseconds after the epoch.
Schedule { deadline_ms: u64 },
// Cancel the live handle at `idx % handles.len()` (no-op when none exist).
Cancel { idx: usize },
}
// Generates either a schedule with a 1..10 000 ms deadline or a cancel with
// an arbitrary index.
fn op_strategy() -> impl Strategy<Value = Op> {
prop_oneof![
(1u64..10_000).prop_map(|deadline_ms| Op::Schedule { deadline_ms }),
any::<usize>().prop_map(|idx| Op::Cancel { idx }),
]
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(500))]
// Random schedule/cancel sequences: `len` must track the model set after
// every step, and a final far-future poll must fire exactly the timers that
// were never cancelled.
#[test]
fn fuzz_schedule_cancel_interleaving(ops in proptest::collection::vec(op_strategy(), 1..200)) {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
let mut handles: Vec<TimerHandle<u64>> = Vec::new();
let mut active_values: HashSet<u64> = HashSet::new();
let mut next_id: u64 = 0;
for op in &ops {
match op {
Op::Schedule { deadline_ms } => {
let id = next_id;
next_id += 1;
let h = wheel.schedule(now + Duration::from_millis(*deadline_ms), id);
handles.push(h);
active_values.insert(id);
}
Op::Cancel { idx } => {
if !handles.is_empty() {
let i = idx % handles.len();
let h = handles.swap_remove(i);
let val = wheel.cancel(h);
// No poll has happened yet, so every cancel must return the value.
let v = val.unwrap();
assert!(active_values.remove(&v));
}
}
}
prop_assert_eq!(wheel.len(), active_values.len());
}
let mut buf = Vec::new();
wheel.poll(now + Duration::from_secs(100_000), &mut buf);
// The remaining handles are forgotten rather than cancelled or freed.
// NOTE(review): presumably this sidesteps `TimerHandle`'s destructor — confirm.
for h in handles {
mem::forget(h);
}
let fired_set: HashSet<u64> = buf.into_iter().collect();
prop_assert_eq!(fired_set, active_values);
prop_assert!(wheel.is_empty());
}
// Random deadlines polled at random increasing times: no timer may fire at a
// poll time earlier than its deadline, and every timer fires exactly once.
#[test]
fn fuzz_poll_timing(
deadlines in proptest::collection::vec(1u64..5000, 1..100),
poll_times in proptest::collection::vec(1u64..10_000, 1..20),
) {
let now = Instant::now();
let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
// The timer's value is its index into `deadlines`.
for (i, &d) in deadlines.iter().enumerate() {
wheel.schedule_forget(now + Duration::from_millis(d), i as u64);
}
let mut sorted_times: Vec<u64> = poll_times;
sorted_times.sort_unstable();
sorted_times.dedup();
let mut all_fired: Vec<u64> = Vec::new();
for &t in &sorted_times {
let mut buf = Vec::new();
wheel.poll(now + Duration::from_millis(t), &mut buf);
for &id in &buf {
let deadline_ms = deadlines[id as usize];
prop_assert!(deadline_ms <= t,
"Timer {} with deadline {}ms fired at {}ms", id, deadline_ms, t);
}
all_fired.extend(buf);
}
// Drain whatever is left with a far-future poll.
let mut final_buf = Vec::new();
wheel.poll(now + Duration::from_secs(100_000), &mut final_buf);
all_fired.extend(final_buf);
all_fired.sort_unstable();
let expected: Vec<u64> = (0..deadlines.len() as u64).collect();
prop_assert_eq!(all_fired, expected, "Not all timers fired exactly once");
prop_assert!(wheel.is_empty());
}
}
}