okee-wheel-timer 0.1.0

Deterministic hashed wheel timer with keyed deduplication.
Documentation
use std::collections::HashMap;
use std::hash::Hash;

use crate::event::{Event, EventId};
use crate::time_wheel::{HashedWheelTimer, TimeWheelError};

use super::types::{KeyedScheduleResult, KeyedUpdateResult};

#[derive(Debug)]
/// Keyed wrapper over [`HashedWheelTimer`].
///
/// A dedup key is produced by `dedup_key_fn`.
/// Scheduling/updating an event with an existing key replaces the previous one.
pub struct KeyedHashedWheelTimer<T, K, F>
where
    K: Eq + Hash + Clone,
    F: Fn(&T) -> K,
{
    dedup_key_fn: F,
    by_key: HashMap<K, EventId>,
    wheel: HashedWheelTimer<T>,
}

impl<T, K, F> KeyedHashedWheelTimer<T, K, F>
where
    K: Eq + Hash + Clone,
    F: Fn(&T) -> K,
{
    /// Creates a keyed timer.
    pub fn new(buckets_num: usize, dedup_key_fn: F) -> Self {
        Self {
            wheel: HashedWheelTimer::new(buckets_num),
            dedup_key_fn,
            by_key: HashMap::new(),
        }
    }

    /// Returns total number of scheduled events.
    pub fn count_all(&self) -> usize {
        self.wheel.count_all()
    }

    /// Returns number of events in bucket by index.
    pub fn count_in_bucket(&self, bucket_index: usize) -> Result<usize, TimeWheelError> {
        self.wheel.count_in_bucket(bucket_index)
    }

    /// Returns `true` when no events are scheduled.
    pub fn is_empty(&self) -> bool {
        self.wheel.is_empty()
    }

    /// Returns `true` when bucket contains no events.
    pub fn is_empty_bucket(&self, bucket_index: usize) -> Result<bool, TimeWheelError> {
        self.wheel.is_empty_bucket(bucket_index)
    }

    /// Returns `true` if current tick still has events to pop.
    pub fn has_events_in_current_tick(&self) -> bool {
        self.wheel.has_events_in_current_tick()
    }

    /// Returns current absolute tick.
    pub fn curr_tick(&self) -> u64 {
        self.wheel.curr_tick()
    }

    /// Returns current bucket index.
    pub fn curr_bucket(&self) -> usize {
        self.wheel.curr_bucket()
    }

    /// Returns current wave (`delta_tick`) in the current tick.
    pub fn curr_delta_tick(&self) -> u64 {
        self.wheel.curr_delta_tick()
    }

    /// Returns latest assigned sequence ID.
    pub fn curr_seq_id(&self) -> EventId {
        self.wheel.curr_seq_id()
    }

    /// Advances timer by one tick.
    pub fn step(&mut self) {
        self.wheel.step()
    }

    /// Clears all events and key index, then resets timer state.
    pub fn reset(&mut self) {
        self.wheel.reset();
        self.by_key.clear();
    }

    /// Returns `true` if event with ID exists.
    pub fn contains_by_id(&self, id: EventId) -> bool {
        self.wheel.contains(id)
    }

    /// Returns `true` if key exists.
    pub fn contains_by_key(&self, key: &K) -> bool {
        self.by_key.contains_key(key)
    }

    /// Returns event ID by key.
    pub fn id_by_key(&self, key: &K) -> Option<EventId> {
        self.by_key.get(key).copied()
    }

    /// Returns event by ID.
    pub fn get(&self, id: EventId) -> Option<&Event<T>> {
        self.wheel.get(id)
    }

    /// Returns event by key.
    pub fn get_by_key(&self, key: &K) -> Option<&Event<T>> {
        let id = self.id_by_key(key)?;
        self.wheel.get(id)
    }

    /// Schedules an event.
    ///
    /// If key already exists, previous event is replaced.
    pub fn schedule(&mut self, on_tick: u64, data: T) -> KeyedScheduleResult {
        let key = (self.dedup_key_fn)(&data);
        let replaced_id = self.by_key.get(&key).copied();
        if let Some(old_id) = replaced_id {
            let _ = self.wheel.remove(old_id);
        }

        let result = self.wheel.schedule(on_tick, data);
        self.by_key.insert(key, result.id);

        KeyedScheduleResult {
            id: result.id,
            replaced_id,
        }
    }

    /// Removes event by ID and updates key index.
    pub fn remove(&mut self, id: EventId) -> Option<Event<T>> {
        let event = self.wheel.remove(id)?;
        let key = (self.dedup_key_fn)(event.data());
        if self.by_key.get(&key).copied() == Some(id) {
            self.by_key.remove(&key);
        }
        Some(event)
    }

    /// Removes event by dedup key.
    pub fn remove_by_key(&mut self, key: &K) -> Option<Event<T>> {
        let id = self.by_key.get(key).copied()?;
        let event = self.wheel.remove(id)?;
        self.by_key.remove(key);
        Some(event)
    }

    /// Updates event by ID and applies key replacement semantics.
    pub fn update(&mut self, id: EventId, on_tick: u64, data: T) -> Option<KeyedUpdateResult> {
        let old_key = {
            let old_event = self.wheel.get(id)?;
            (self.dedup_key_fn)(old_event.data())
        };

        if self.by_key.get(&old_key).copied() == Some(id) {
            self.by_key.remove(&old_key);
        }

        let new_key = (self.dedup_key_fn)(&data);
        let replaced_id = self.by_key.get(&new_key).copied();
        if let Some(other_id) = replaced_id {
            let _ = self.wheel.remove(other_id);
            self.by_key.remove(&new_key);
        }

        let result = self.wheel.update(id, on_tick, data)?;
        self.by_key.insert(new_key, result.id);

        Some(KeyedUpdateResult {
            id: result.id,
            replaced_id,
        })
    }

    /// Updates event by key.
    pub fn update_by_key(&mut self, key: &K, on_tick: u64, data: T) -> Option<KeyedUpdateResult> {
        let id = self.id_by_key(key)?;
        self.update(id, on_tick, data)
    }

    /// Reschedules event by ID preserving payload and key.
    pub fn reschedule(&mut self, id: EventId, on_tick: u64) -> Option<KeyedUpdateResult> {
        let key = {
            let event = self.wheel.get(id)?;
            (self.dedup_key_fn)(event.data())
        };

        let result = self.wheel.reschedule(id, on_tick)?;
        self.by_key.insert(key, result.id);

        Some(KeyedUpdateResult {
            id: result.id,
            replaced_id: None,
        })
    }

    /// Reschedules event by key.
    pub fn reschedule_by_key(&mut self, key: &K, on_tick: u64) -> Option<KeyedUpdateResult> {
        let id = self.id_by_key(key)?;
        self.reschedule(id, on_tick)
    }

    /// Pops one current-tick wave and keeps key index in sync.
    pub fn pop_events(&mut self) -> Vec<Event<T>> {
        let events = self.wheel.pop_events();
        for event in &events {
            let key = (self.dedup_key_fn)(event.data());
            if self.by_key.get(&key).copied() == Some(event.id()) {
                self.by_key.remove(&key);
            }
        }
        events
    }
}