// okee-wheel-timer 0.1.0
//
// Deterministic hashed wheel timer with keyed deduplication.
// Documentation
use std::collections::HashMap;

use crate::event::{Event, EventId};

use super::types::{ScheduleResult, TimeWheelError, UpdateResult};

/// One wheel slot: the events stored in that slot, keyed by their event ID.
type Bucket<T> = HashMap<EventId, Event<T>>;

/// Per-event index record kept in the timer's `by_id` map.
///
/// It remembers which bucket holds the event so that ID-based lookups and
/// removals can go straight to that bucket.
#[derive(Debug, Clone, Copy)]
struct IdEntry {
    /// Index into the timer's `buckets` vector where the event is stored.
    bucket_index: usize,
}

/// Single-level hashed wheel timer whose events are addressable by ID.
///
/// Pop order is deterministic:
/// - events are grouped into waves identified by `(tick, delta_tick)`;
/// - within one wave, events are popped in ascending `event_id` order.
///
/// The `on_tick` handed to [`schedule`](Self::schedule) is clamped to
/// `curr_tick`, so an event scheduled into the past lands on the current tick.
#[derive(Debug)]
pub struct HashedWheelTimer<T> {
    /// Current absolute tick.
    curr_tick: u64,
    /// Index of the bucket that corresponds to `curr_tick`.
    curr_bucket: usize,
    /// Wave (`delta_tick`) inside `curr_tick` most recently selected for popping.
    curr_delta_tick: u64,
    /// Most recently assigned event ID; `0` until the first event is scheduled.
    curr_sequence_id: EventId,
    /// Event storage, one map per wheel slot.
    buckets: Vec<Bucket<T>>,
    /// Maps every stored event ID to the bucket that holds it.
    by_id: HashMap<EventId, IdEntry>,
}

impl<T> HashedWheelTimer<T> {
    /// Creates a timer with a fixed number of buckets.
    ///
    /// The timer starts at tick `0`, bucket `0`, wave `0`, holds no events and
    /// has not assigned any sequence ID yet.
    ///
    /// # Panics
    ///
    /// Panics if `buckets_num` is `0`.
    pub fn new(buckets_num: usize) -> Self {
        assert!(buckets_num > 0, "buckets_num must be greater than 0");

        Self {
            curr_tick: 0,
            curr_bucket: 0,
            curr_delta_tick: 0,
            curr_sequence_id: 0,
            buckets: build_buckets(buckets_num),
            by_id: HashMap::new(),
        }
    }

    /// Returns the total number of scheduled events.
    pub fn count_all(&self) -> usize {
        self.by_id.len()
    }

    /// Returns the number of events currently stored in a bucket.
    ///
    /// This is a bucket-level view, not limited to `curr_tick`.
    ///
    /// # Errors
    ///
    /// Returns [`TimeWheelError::InvalidBucketIndex`] when `bucket_index` is
    /// not a valid bucket position.
    pub fn count_in_bucket(&self, bucket_index: usize) -> Result<usize, TimeWheelError> {
        self.buckets
            .get(bucket_index)
            .map(|bucket| bucket.len())
            // Build the error lazily: it is only needed on the failure path.
            .ok_or_else(|| TimeWheelError::InvalidBucketIndex {
                index: bucket_index,
                buckets: self.buckets.len(),
            })
    }

    /// Returns `true` when no events are scheduled.
    pub fn is_empty(&self) -> bool {
        self.by_id.is_empty()
    }

    /// Returns `true` when a bucket contains no events.
    ///
    /// # Errors
    ///
    /// Returns [`TimeWheelError::InvalidBucketIndex`] when `bucket_index` is
    /// not a valid bucket position.
    pub fn is_empty_bucket(&self, bucket_index: usize) -> Result<bool, TimeWheelError> {
        self.count_in_bucket(bucket_index).map(|count| count == 0)
    }

    /// Returns `true` if the current tick still has at least one wave to pop.
    pub fn has_events_in_current_tick(&self) -> bool {
        let now = self.curr_tick;
        // Existence is all that matters here, so stop at the first match
        // instead of computing the minimum wave over the whole bucket.
        self.buckets[self.curr_bucket]
            .values()
            .any(|event| event.tick() == now)
    }

    /// Returns current absolute tick.
    pub fn curr_tick(&self) -> u64 {
        self.curr_tick
    }

    /// Returns current bucket index.
    pub fn curr_bucket(&self) -> usize {
        self.curr_bucket
    }

    /// Returns current wave (`delta_tick`) inside `curr_tick`.
    pub fn curr_delta_tick(&self) -> u64 {
        self.curr_delta_tick
    }

    /// Returns the latest assigned sequence ID (`0` if none was assigned yet).
    pub fn curr_seq_id(&self) -> EventId {
        self.curr_sequence_id
    }

    /// Schedules a new event and returns its ID.
    ///
    /// IDs are handed out sequentially, starting at `1`.
    /// If `on_tick < curr_tick`, the event is scheduled on `curr_tick`.
    ///
    /// # Panics
    ///
    /// Panics if the ID sequence is exhausted.
    pub fn schedule(&mut self, on_tick: u64, data: T) -> ScheduleResult {
        // A plain `+= 1` would wrap silently in release builds, and a wrapped ID
        // could overwrite the index entry of an event that is still scheduled.
        let next_id = self
            .curr_sequence_id
            .checked_add(1)
            .expect("event ID sequence overflowed");
        self.curr_sequence_id = next_id;
        self.insert(next_id, on_tick, data)
    }

    /// Returns `true` if an event with `id` exists.
    pub fn contains(&self, id: EventId) -> bool {
        self.by_id.contains_key(&id)
    }

    /// Returns event by ID, or `None` if no such event is scheduled.
    pub fn get(&self, id: EventId) -> Option<&Event<T>> {
        let meta = self.by_id.get(&id)?;
        self.buckets[meta.bucket_index].get(&id)
    }

    /// Removes and returns an event by ID, or `None` if it is not scheduled.
    pub fn remove(&mut self, id: EventId) -> Option<Event<T>> {
        self.remove_internal(id)
    }

    /// Replaces event payload and scheduled tick by ID.
    ///
    /// The event keeps its ID. Returns `None` (and stores nothing) when no
    /// event with `id` exists. `on_tick` is clamped exactly as in
    /// [`schedule`](Self::schedule).
    pub fn update(&mut self, id: EventId, on_tick: u64, data: T) -> Option<UpdateResult> {
        self.remove_internal(id)?;
        let inserted = self.insert(id, on_tick, data);
        Some(UpdateResult { id: inserted.id })
    }

    /// Moves existing event to another tick preserving payload.
    ///
    /// The event keeps its ID. Returns `None` when no event with `id` exists.
    /// `on_tick` is clamped exactly as in [`schedule`](Self::schedule).
    pub fn reschedule(&mut self, id: EventId, on_tick: u64) -> Option<UpdateResult> {
        let old_event = self.remove_internal(id)?;
        let inserted = self.insert(id, on_tick, old_event.into_data());
        Some(UpdateResult { id: inserted.id })
    }

    /// Pops a single wave of events for current tick.
    ///
    /// The wave with the smallest `delta_tick` on the current tick is chosen,
    /// becomes the current wave, and its events are returned in ascending ID
    /// order. Returns empty vector when current tick has no remaining events.
    pub fn pop_events(&mut self) -> Vec<Event<T>> {
        let Some(min_delta) = self.find_min_delta_tick() else {
            return Vec::new();
        };

        self.curr_delta_tick = min_delta;
        self.drain_wave()
    }

    /// Advances timer by one tick and moves to next bucket.
    ///
    /// The current wave is reset to `0`. Events still scheduled on the tick
    /// being left are neither returned nor removed: they stay stored (and
    /// counted) but [`pop_events`](Self::pop_events) only yields events of the
    /// current tick, so drain the tick before stepping or handle such events
    /// by ID afterwards.
    pub fn step(&mut self) {
        self.curr_tick += 1;
        self.curr_bucket = (self.curr_bucket + 1) % self.buckets.len();
        self.curr_delta_tick = 0;
    }

    /// Clears all events and resets timer state.
    ///
    /// Tick, bucket, wave and sequence ID return to `0`; the number of buckets
    /// is unchanged.
    pub fn reset(&mut self) {
        self.curr_tick = 0;
        self.curr_bucket = 0;
        self.curr_delta_tick = 0;
        self.curr_sequence_id = 0;
        // Clearing in place keeps the bucket maps' allocations for reuse
        // instead of dropping and rebuilding every map.
        for bucket in &mut self.buckets {
            bucket.clear();
        }
        self.by_id.clear();
    }

    /// Stores an event under `event_id` and indexes it in `by_id`.
    ///
    /// `on_tick` is clamped to `curr_tick`. An event that lands on the current
    /// tick is placed in the wave after the current one, so it never joins a
    /// wave that is being popped; an event for a later tick starts in wave `0`.
    fn insert(&mut self, event_id: EventId, on_tick: u64, data: T) -> ScheduleResult {
        let tick = on_tick.max(self.curr_tick);
        let delta_tick = if tick == self.curr_tick {
            self.curr_delta_tick + 1
        } else {
            0
        };

        let bucket_index = self.bucket_index(tick);
        let event = Event::new(event_id, tick, delta_tick, data);

        self.buckets[bucket_index].insert(event_id, event);
        self.by_id.insert(event_id, IdEntry { bucket_index });

        ScheduleResult { id: event_id }
    }

    /// Removes an event from both the ID index and its bucket.
    fn remove_internal(&mut self, id: EventId) -> Option<Event<T>> {
        let meta = self.by_id.remove(&id)?;
        self.buckets[meta.bucket_index].remove(&id)
    }

    /// Returns the smallest `delta_tick` among events scheduled on `curr_tick`.
    ///
    /// Events that share the current bucket but belong to another tick are
    /// ignored. Returns `None` when the current tick has no events.
    fn find_min_delta_tick(&self) -> Option<u64> {
        self.buckets[self.curr_bucket]
            .values()
            .filter_map(|event| (event.tick() == self.curr_tick).then_some(event.delta_tick()))
            .min()
    }

    /// Removes and returns every event of the current `(tick, delta_tick)` wave.
    fn drain_wave(&mut self) -> Vec<Event<T>> {
        let mut event_ids: Vec<EventId> = self.buckets[self.curr_bucket]
            .iter()
            .filter_map(|(id, event)| {
                (event.tick() == self.curr_tick && event.delta_tick() == self.curr_delta_tick)
                    .then_some(*id)
            })
            .collect();

        // Map iteration order is unspecified; sorting the IDs is what makes the
        // pop order deterministic.
        event_ids.sort_unstable();

        event_ids
            .into_iter()
            .filter_map(|id| self.remove_internal(id))
            .collect()
    }

    /// Maps an absolute tick to its bucket position.
    fn bucket_index(&self, tick: u64) -> usize {
        // The remainder is smaller than the bucket count, so casting it back
        // to `usize` cannot truncate.
        (tick % self.buckets.len() as u64) as usize
    }
}

/// Builds the wheel storage: a vector of `buckets_num` empty buckets.
fn build_buckets<T>(buckets_num: usize) -> Vec<Bucket<T>> {
    (0..buckets_num).map(|_| HashMap::new()).collect()
}