use std::collections::HashMap;
use std::time::{Duration, Instant};
use crate::term::Term;
/// Number of buckets a wheel gets when it is built with `TimerWheel::new`.
const DEFAULT_BUCKETS: usize = 1024;
/// Opaque handle for a timer; a thin, copyable wrapper around a `u64` id.
///
/// `TimerWheel` uses these handles as the keys of its entry table.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct TimerRef(u64);

impl TimerRef {
    /// Returns the raw numeric id wrapped by this reference.
    #[must_use]
    pub const fn id(self) -> u64 {
        let Self(raw) = self;
        raw
    }

    /// Wraps a raw numeric id in a reference; the inverse of [`TimerRef::id`].
    #[must_use]
    pub const fn from_id(id: u64) -> Self {
        TimerRef(id)
    }
}
/// Record the wheel keeps for every pending timer, keyed by its `TimerRef`.
///
/// The public fields describe the timer itself; the private ones remember
/// where the timer's reference is stored inside the wheel's buckets.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TimerEntry {
/// Target identifier handed back in `ExpiredTimer::target_pid` on expiry.
pub target_pid: u64,
/// Term handed back in `ExpiredTimer::message` on expiry.
pub message: Term,
/// Instant at or after which a tick reports the timer as expired.
pub expires_at: Instant,
// Index of the bucket holding this timer's reference.
bucket: usize,
// Position of the reference inside that bucket; kept current when a
// swap-remove relocates it.
slot: usize,
}
/// A timer that a tick found to be due and removed from the wheel.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct ExpiredTimer {
/// Handle under which the timer was scheduled.
pub reference: TimerRef,
/// Target identifier supplied when the timer was scheduled.
pub target_pid: u64,
/// Term supplied when the timer was scheduled.
pub message: Term,
/// Deadline the timer was scheduled for (not the instant of the tick).
pub expires_at: Instant,
}
/// Hashed timer wheel with millisecond ticks.
///
/// Each pending timer is stored once in `entries` and its reference is filed
/// in the bucket selected by its expiry tick modulo the number of buckets.
#[derive(Debug)]
pub struct TimerWheel {
// One list of timer references per wheel position.
buckets: Vec<Vec<TimerRef>>,
// Every pending timer, keyed by its reference.
entries: HashMap<TimerRef, TimerEntry>,
// Id that the next allocated reference will carry.
next_ref: u64,
// Instant that counts as tick zero; moved earlier when a scheduling call
// passes a `now` that precedes it.
start: Instant,
// Millisecond tick (relative to `start`) marking how far ticking has
// advanced the wheel.
current_tick: u128,
}
impl TimerWheel {
    /// Creates an empty wheel with `DEFAULT_BUCKETS` buckets.
    #[must_use]
    pub fn new() -> Self {
        Self::with_bucket_count(DEFAULT_BUCKETS)
    }

    /// Creates an empty wheel with `bucket_count` buckets.
    ///
    /// A count of zero is raised to one. Tick zero is anchored at the moment
    /// of construction.
    #[must_use]
    pub fn with_bucket_count(bucket_count: usize) -> Self {
        let bucket_count = bucket_count.max(1);
        let buckets = (0..bucket_count).map(|_| Vec::new()).collect();
        Self {
            buckets,
            entries: HashMap::new(),
            next_ref: 1,
            start: Instant::now(),
            current_tick: 0,
        }
    }

    /// Schedules a timer `delay` from the current instant and returns its handle.
    ///
    /// See [`TimerWheel::schedule_at`] for details.
    pub fn schedule(&mut self, delay: Duration, target_pid: u64, message: Term) -> TimerRef {
        self.schedule_at(Instant::now(), delay, target_pid, message)
    }

    /// Allocates a fresh handle without scheduling anything for it yet.
    ///
    /// The handle can later be armed with [`TimerWheel::schedule_reserved`].
    pub fn reserve_reference(&mut self) -> TimerRef {
        self.allocate_ref()
    }

    /// Schedules a timer under a caller-supplied handle, `delay` from the
    /// current instant.
    ///
    /// Returns `None` (and changes nothing) if `reference` already has a
    /// pending timer.
    pub fn schedule_reserved(
        &mut self,
        reference: TimerRef,
        delay: Duration,
        target_pid: u64,
        message: Term,
    ) -> Option<TimerRef> {
        self.schedule_reserved_at(reference, Instant::now(), delay, target_pid, message)
    }

    /// Schedules a timer under a caller-supplied handle, expiring at `now + delay`.
    ///
    /// Returns `Some(reference)` on success and `None` (changing nothing) if
    /// `reference` already has a pending timer.
    ///
    /// If `now` precedes the wheel's tick-zero instant, tick zero is moved back
    /// to `now` and all pending timers are re-filed accordingly. If
    /// `now + delay` is not representable as an `Instant`, the timer is given
    /// the deadline `now`, i.e. it is due immediately.
    pub fn schedule_reserved_at(
        &mut self,
        reference: TimerRef,
        now: Instant,
        delay: Duration,
        target_pid: u64,
        message: Term,
    ) -> Option<TimerRef> {
        if self.entries.contains_key(&reference) {
            return None;
        }
        if now < self.start {
            self.rebase(now);
        }
        let expires_at = now.checked_add(delay).unwrap_or(now);
        let bucket = self.bucket_for(expires_at);
        let slot = self.buckets[bucket].len();
        self.buckets[bucket].push(reference);
        self.entries.insert(
            reference,
            TimerEntry {
                target_pid,
                message,
                expires_at,
                bucket,
                slot,
            },
        );
        Some(reference)
    }

    /// Schedules a timer expiring at `now + delay` under a freshly allocated
    /// handle and returns that handle.
    ///
    /// `target_pid` and `message` are handed back unchanged in the
    /// [`ExpiredTimer`] produced when the timer fires.
    pub fn schedule_at(
        &mut self,
        now: Instant,
        delay: Duration,
        target_pid: u64,
        message: Term,
    ) -> TimerRef {
        let reference = self.allocate_ref();
        // `allocate_ref` never hands out an id that is currently scheduled, so
        // the fallback only keeps the signature infallible.
        self.schedule_reserved_at(reference, now, delay, target_pid, message)
            .unwrap_or(reference)
    }

    /// Cancels a pending timer, measuring the remaining time from the current
    /// instant. See [`TimerWheel::cancel_at`].
    pub fn cancel(&mut self, reference: TimerRef) -> Option<Duration> {
        self.cancel_at(reference, Instant::now())
    }

    /// Cancels a pending timer.
    ///
    /// Returns the time that was left between `now` and the timer's deadline
    /// (zero if the deadline has already passed), or `None` if `reference` has
    /// no pending timer, for example because it already fired or was cancelled.
    pub fn cancel_at(&mut self, reference: TimerRef, now: Instant) -> Option<Duration> {
        let entry = self.remove_entry(reference)?;
        Some(entry.expires_at.saturating_duration_since(now))
    }

    /// Advances the wheel to the current instant and returns the timers that
    /// became due. See [`TimerWheel::tick_at`].
    pub fn tick(&mut self) -> Vec<ExpiredTimer> {
        self.tick_at(Instant::now())
    }

    /// Advances the wheel to `now` and removes and returns every scanned timer
    /// whose deadline is at or before `now`.
    ///
    /// The scan starts at the tick the previous call stopped on and ends at the
    /// tick containing `now`. That last tick is deliberately *not* stepped
    /// over: a timer due later within the same millisecond, or one scheduled
    /// into that tick afterwards, is picked up by the next call instead of
    /// waiting for a full wheel rotation. If `now` lies before the cursor only
    /// the cursor's bucket is checked and the cursor does not move.
    pub fn tick_at(&mut self, now: Instant) -> Vec<ExpiredTimer> {
        let mut expired = Vec::new();
        let target_tick = self.tick_for(now).max(self.current_tick);
        // One rotation already visits every bucket, and each visit removes all
        // due timers from that bucket, so further laps could never find more.
        let full_rotation = (self.buckets.len() as u128).saturating_sub(1);
        let last_tick = self.current_tick + (target_tick - self.current_tick).min(full_rotation);
        for tick in self.current_tick..=last_tick {
            let bucket_index = self.bucket_of_tick(tick);
            self.expire_bucket(bucket_index, now, &mut expired);
        }
        self.current_tick = target_tick;
        expired
    }

    /// Returns the number of pending timers.
    #[must_use]
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` when no timer is pending.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Looks up the pending timer registered under `reference`, if any.
    #[must_use]
    pub fn get(&self, reference: TimerRef) -> Option<&TimerEntry> {
        self.entries.get(&reference)
    }

    /// Hands out the next handle id, skipping ids that currently have a
    /// pending timer (possible when a caller scheduled under a handle built
    /// with `TimerRef::from_id`). The counter wraps from `u64::MAX` to 1.
    fn allocate_ref(&mut self) -> TimerRef {
        loop {
            let reference = TimerRef(self.next_ref);
            self.next_ref = self.next_ref.checked_add(1).unwrap_or(1);
            if !self.entries.contains_key(&reference) {
                return reference;
            }
        }
    }

    /// Picks the bucket for a deadline.
    ///
    /// Deadlines that fall before the scan cursor are filed under the cursor's
    /// tick so the very next tick finds them instead of a full rotation later.
    fn bucket_for(&self, expires_at: Instant) -> usize {
        let tick = self.tick_for(expires_at).max(self.current_tick);
        self.bucket_of_tick(tick)
    }

    /// Maps a tick number onto a bucket index.
    fn bucket_of_tick(&self, tick: u128) -> usize {
        // The remainder is smaller than `buckets.len()`, so it fits in `usize`.
        (tick % self.buckets.len() as u128) as usize
    }

    /// Whole milliseconds between tick zero and `instant`; zero for instants
    /// before tick zero.
    fn tick_for(&self, instant: Instant) -> u128 {
        instant.saturating_duration_since(self.start).as_millis()
    }

    /// Moves tick zero to `new_start` and re-files every pending timer.
    ///
    /// Bucket indices are derived from ticks relative to tick zero, so they
    /// all change when tick zero moves.
    fn rebase(&mut self, new_start: Instant) {
        self.start = new_start;
        self.current_tick = 0;
        if self.entries.is_empty() {
            return;
        }
        for bucket in &mut self.buckets {
            bucket.clear();
        }
        // Re-file in id order so the bucket layout does not depend on the hash
        // map's iteration order.
        let mut references: Vec<TimerRef> = self.entries.keys().copied().collect();
        references.sort_unstable_by_key(|reference| reference.0);
        let bucket_count = self.buckets.len() as u128;
        for reference in references {
            let Some(entry) = self.entries.get_mut(&reference) else {
                continue;
            };
            let tick = entry
                .expires_at
                .saturating_duration_since(new_start)
                .as_millis();
            let bucket = (tick % bucket_count) as usize;
            entry.bucket = bucket;
            entry.slot = self.buckets[bucket].len();
            self.buckets[bucket].push(reference);
        }
    }

    /// Moves every timer in `bucket_index` whose deadline is at or before
    /// `now` into `expired`; timers due later stay in place.
    fn expire_bucket(
        &mut self,
        bucket_index: usize,
        now: Instant,
        expired: &mut Vec<ExpiredTimer>,
    ) {
        let mut slot = 0;
        while let Some(&reference) = self.buckets[bucket_index].get(slot) {
            let is_due = self
                .entries
                .get(&reference)
                .map(|entry| entry.expires_at <= now);
            match is_due {
                // Reference without an entry: drop it and look at whatever was
                // swapped into this slot.
                None => self.swap_remove_bucket_slot(bucket_index, slot),
                Some(true) => {
                    // Removal swaps another reference into `slot`, so the slot
                    // index is not advanced here.
                    if let Some(entry) = self.remove_entry(reference) {
                        expired.push(ExpiredTimer {
                            reference,
                            target_pid: entry.target_pid,
                            message: entry.message,
                            expires_at: entry.expires_at,
                        });
                    }
                }
                Some(false) => slot += 1,
            }
        }
    }

    /// Removes a timer from both the entry table and its bucket.
    fn remove_entry(&mut self, reference: TimerRef) -> Option<TimerEntry> {
        let entry = self.entries.remove(&reference)?;
        self.swap_remove_bucket_slot(entry.bucket, entry.slot);
        Some(entry)
    }

    /// Removes the reference at `slot` of `bucket` in O(1) and records the new
    /// position of the reference that was swapped into its place.
    ///
    /// Out-of-range indices are ignored.
    fn swap_remove_bucket_slot(&mut self, bucket: usize, slot: usize) {
        let Some(bucket_entries) = self.buckets.get_mut(bucket) else {
            return;
        };
        if slot >= bucket_entries.len() {
            return;
        }
        bucket_entries.swap_remove(slot);
        if let Some(&relocated) = bucket_entries.get(slot) {
            if let Some(entry) = self.entries.get_mut(&relocated) {
                entry.slot = slot;
            }
        }
    }
}
impl Default for TimerWheel {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use super::TimerWheel;
    use crate::atom::Atom;
    use crate::term::Term;

    /// A timer stays pending before its deadline and is reported, then
    /// forgotten, once a tick reaches the deadline.
    #[test]
    fn timer_schedule_and_tick_expire_due_timers() {
        let origin = Instant::now();
        let mut timers = TimerWheel::with_bucket_count(8);
        let scheduled =
            timers.schedule_at(origin, Duration::from_millis(10), 12, Term::atom(Atom::OK));

        let too_early = timers.tick_at(origin + Duration::from_millis(9));
        assert!(too_early.is_empty());

        let fired = timers.tick_at(origin + Duration::from_millis(10));
        assert_eq!(fired.len(), 1);
        let only = &fired[0];
        assert_eq!(only.reference, scheduled);
        assert_eq!(only.target_pid, 12);
        assert_eq!(only.message, Term::atom(Atom::OK));
        assert!(timers.is_empty());
    }

    /// Cancelling reports the time left, works only once, and prevents the
    /// timer from firing later.
    #[test]
    fn timer_cancellation_is_constant_time_and_returns_remaining_time() {
        let origin = Instant::now();
        let mut timers = TimerWheel::with_bucket_count(4);
        let scheduled =
            timers.schedule_at(origin, Duration::from_millis(100), 1, Term::small_int(1));

        let remaining = timers.cancel_at(scheduled, origin + Duration::from_millis(40));
        assert_eq!(remaining, Some(Duration::from_millis(60)));

        let second_attempt = timers.cancel_at(scheduled, origin);
        assert_eq!(second_attempt, None);

        let fired = timers.tick_at(origin + Duration::from_millis(100));
        assert!(fired.is_empty());
    }

    /// A timer that has already fired can no longer be cancelled.
    #[test]
    fn timer_cancellation_after_fire_returns_none() {
        let origin = Instant::now();
        let deadline = origin + Duration::from_millis(1);
        let mut timers = TimerWheel::with_bucket_count(4);
        let scheduled = timers.schedule_at(origin, Duration::from_millis(1), 1, Term::small_int(1));

        let fired = timers.tick_at(deadline);
        assert_eq!(fired.len(), 1);

        assert_eq!(timers.cancel_at(scheduled, deadline), None);
    }

    /// Scheduling twice under one reserved reference keeps the first timer
    /// and rejects the second.
    #[test]
    fn timer_reserved_reference_cannot_be_scheduled_twice() {
        let origin = Instant::now();
        let mut timers = TimerWheel::with_bucket_count(4);
        let reserved = timers.reserve_reference();

        let first_attempt = timers.schedule_reserved_at(
            reserved,
            origin,
            Duration::from_millis(10),
            1,
            Term::small_int(1),
        );
        assert_eq!(first_attempt, Some(reserved));

        let second_attempt = timers.schedule_reserved_at(
            reserved,
            origin,
            Duration::from_millis(20),
            1,
            Term::small_int(2),
        );
        assert_eq!(second_attempt, None);

        let fired = timers.tick_at(origin + Duration::from_millis(20));
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].message, Term::small_int(1));
        assert!(timers.is_empty());
    }

    /// Ten thousand timers spread over 100 ms all fire on a single tick at
    /// the 100 ms mark.
    #[test]
    fn timer_handles_ten_thousand_concurrent_timers() {
        let origin = Instant::now();
        let mut timers = TimerWheel::with_bucket_count(256);

        (0..10_000u64).for_each(|index| {
            timers.schedule_at(
                origin,
                Duration::from_millis(index % 100),
                index,
                Term::small_int(index as i64),
            );
        });
        assert_eq!(timers.len(), 10_000);

        let fired = timers.tick_at(origin + Duration::from_millis(100));
        assert_eq!(fired.len(), 10_000);
        assert!(timers.is_empty());
    }
}