subms-timer-wheel 0.5.2

submillisecond.com cookbook recipe - concurrency: subms-timer-wheel. Single-level hashed timer wheel with O(1) schedule and cancel.
Documentation
//! Hierarchical timer wheel (HHW). Three levels, each a wheel of
//! 64 slots: seconds (each slot = 1 tick), minutes (each slot = 64
//! ticks), hours (each slot = 64*64 ticks). A timer scheduled `d`
//! ticks out lands on the finest wheel whose range still covers `d`;
//! each time a coarser wheel advances to a new slot, that slot's
//! entries cascade down to a finer wheel, re-binned by their deadline.
//!
//! Capacity: 64 * 64 * 64 = 262_144 ticks per "day" (loose analogy);
//! the largest delay that can be scheduled is one tick less, 262_143.
//! Long delays no longer cost a no-op revolution per `mask` ticks the
//! way the base wheel does; they sit on the coarse wheel and get
//! cascaded down only as their fire time approaches.
//!
//! Memory: 3 * 64 = 192 buckets total regardless of how many timers
//! are scheduled - the buckets hold Vecs of entries, not a per-tick
//! slot. Compare with the base single-level wheel which needs a slot
//! count >= max-delay for O(1) firing.

/// Number of wheels in the hierarchy.
const LEVELS: usize = 3;
/// Buckets per wheel. Must stay a power of two so `MASK` selects a slot.
const SLOTS: usize = 64;
/// Bit mask that reduces a slot number to `0..SLOTS`.
const MASK: usize = SLOTS - 1;
/// Right shift that converts a tick count into a slot number on each level.
const LEVEL_SHIFT: [u32; LEVELS] = [0, 6, 12];
/// Exclusive upper bound on the remaining delay (`deadline - now`) that each
/// level accepts when an entry is binned.
const LEVEL_RANGE: [usize; LEVELS] = [
    SLOTS,                 // level 0: 0..=63 ticks remaining
    SLOTS * SLOTS,         // level 1: 64..=4_095 ticks remaining
    SLOTS * SLOTS * SLOTS, // level 2: 4_096..=262_143 ticks remaining
];

/// One scheduled timer as stored inside a bucket.
struct Entry<V> {
    /// Handle returned to the caller by `schedule` / `try_schedule`.
    id: u64,
    /// Absolute tick at which the timer is due.
    deadline: u64,
    /// Payload; `None` once the timer was cancelled or has fired.
    value: Option<V>,
    /// Tombstone flag: the entry stays in its bucket until the bucket is
    /// next swept, but is skipped by cascade and firing.
    cancelled: bool,
}

/// Three-level timer wheel driven by explicit calls to [`Self::tick`].
pub struct HierarchicalTimerWheel<V> {
    /// Three wheels of 64 buckets each.
    wheels: [[Vec<Entry<V>>; SLOTS]; LEVELS],
    /// Monotonically increasing tick counter. The level-i slot for
    /// `t` is `(t >> LEVEL_SHIFT[i]) & MASK`.
    now: u64,
    /// Id handed to the next scheduled timer; starts at 1.
    next_id: u64,
    /// Counts cascade events (entries moved from a coarser wheel down
    /// to a finer one). Useful for diagnostics; doubles as a stable
    /// hook for the metrics feature.
    cascades: u64,
}

impl<V> HierarchicalTimerWheel<V> {
    /// Create an empty wheel with the tick counter at 0.
    pub fn new() -> Self {
        Self {
            // `from_fn` builds the 3x64 grid of empty Vecs without
            // requiring `V: Clone` or `V: Copy`.
            wheels: std::array::from_fn(|_| std::array::from_fn(|_| Vec::new())),
            now: 0,
            next_id: 1,
            cascades: 0,
        }
    }

    /// Current value of the tick counter.
    pub fn now(&self) -> u64 {
        self.now
    }

    /// Number of entries moved from a coarser wheel to a finer one so far.
    pub fn cascades(&self) -> u64 {
        self.cascades
    }

    /// Exclusive upper bound (in ticks) on the delay the wheel can place.
    /// [`Self::try_schedule`] rejects any `delay >= max_delay()`;
    /// [`Self::schedule`] clamps to `max_delay() - 1`.
    pub const fn max_delay() -> usize {
        LEVEL_RANGE[LEVELS - 1]
    }

    /// Schedule `value` to fire in `delay` ticks and return its timer id.
    ///
    /// Delays of [`Self::max_delay`] or more are clamped to
    /// `max_delay() - 1`; use [`Self::try_schedule`] for explicit overflow
    /// handling. A delay of 0 fires on the next [`Self::tick`].
    ///
    /// # Panics
    ///
    /// Panics if the deadline would not fit in a `u64` tick counter.
    pub fn schedule(&mut self, delay: u64, value: V) -> u64 {
        let longest = (Self::max_delay() as u64).saturating_sub(1);
        self.try_schedule(delay.min(longest), value)
            .expect("clamped delay fits")
    }

    /// Schedule `value` to fire in `delay` ticks.
    ///
    /// Returns the timer id, or `None` when `delay >= max_delay()` or the
    /// deadline would overflow the `u64` tick counter. A delay of 0 is
    /// treated as 1: the timer fires on the next [`Self::tick`].
    pub fn try_schedule(&mut self, delay: u64, value: V) -> Option<u64> {
        if delay >= Self::max_delay() as u64 {
            return None;
        }
        // `tick` increments `now` before it looks at a slot, so an entry
        // whose deadline equals the current `now` would sit in a slot that
        // has already been passed and would never fire. The earliest
        // observable deadline is therefore `now + 1`.
        let deadline = self.now.checked_add(delay.max(1))?;
        let id = self.next_id;
        self.next_id += 1;
        let (lvl, slot) = self.bucket_for(deadline);
        self.wheels[lvl][slot].push(Entry {
            id,
            deadline,
            value: Some(value),
            cancelled: false,
        });
        Some(id)
    }

    /// Mark `id` cancelled. Returns true if a pending entry was found.
    ///
    /// This is a linear sweep over every bucket on every level until the
    /// entry is found. The tradeoff vs the base wheel (which keeps an
    /// id->slot map) is that the hierarchical wheel moves entries on
    /// cascade, so an id->slot map would need to be patched on every
    /// cascade. The payload is dropped immediately; the tombstoned entry
    /// itself is discarded the next time its bucket is cascaded or fired.
    pub fn cancel(&mut self, id: u64) -> bool {
        let pending = self
            .wheels
            .iter_mut()
            .flatten()
            .flatten()
            .find(|e| e.id == id && !e.cancelled);
        match pending {
            Some(entry) => {
                entry.cancelled = true;
                entry.value = None;
                true
            }
            None => false,
        }
    }

    /// Advance one tick and return the values of all timers that are due
    /// at the new `now`, in the order they sit in the level-0 bucket.
    /// Entries on coarser wheels are cascaded down first as needed.
    pub fn tick(&mut self) -> Vec<V> {
        self.now += 1;
        let now = self.now;

        // A level-`lvl` slot spans `1 << LEVEL_SHIFT[lvl]` ticks. Whenever
        // `now` reaches a multiple of that span, the wheel has moved onto a
        // new slot whose entries must be re-binned on finer wheels.
        // Walk highest-to-lowest so a level-2 entry re-binned onto level 1
        // can be cascaded again on this same tick, and anything that
        // reaches level 0 with `deadline == now` is fired below.
        for lvl in (1..LEVELS).rev() {
            let span = 1u64 << LEVEL_SHIFT[lvl];
            if now % span != 0 {
                continue;
            }
            let slot = ((now >> LEVEL_SHIFT[lvl]) as usize) & MASK;
            for entry in std::mem::take(&mut self.wheels[lvl][slot]) {
                if entry.cancelled {
                    // Tombstones are dropped here instead of being moved.
                    continue;
                }
                self.cascades += 1;
                let (new_lvl, new_slot) = self.bucket_for(entry.deadline);
                self.wheels[new_lvl][new_slot].push(entry);
            }
        }

        let slot = (now as usize) & MASK;
        let mut fired = Vec::new();
        self.wheels[0][slot].retain_mut(|entry| {
            if entry.cancelled {
                return false;
            }
            if entry.deadline > now {
                // Not due yet. With the current level ranges this does not
                // happen, but keeping the entry (rather than dropping it)
                // means a future retuning cannot silently lose timers.
                return true;
            }
            if let Some(value) = entry.value.take() {
                fired.push(value);
            }
            false
        });
        fired
    }

    /// Pick the finest level whose range covers `deadline - now` (falling
    /// back to the coarsest level), then the slot within that level.
    fn bucket_for(&self, deadline: u64) -> (usize, usize) {
        let remaining = deadline.saturating_sub(self.now);
        let lvl = LEVEL_RANGE
            .iter()
            .position(|&range| remaining < range as u64)
            .unwrap_or(LEVELS - 1);
        let slot = ((deadline >> LEVEL_SHIFT[lvl]) as usize) & MASK;
        (lvl, slot)
    }
}

impl<V> Default for HierarchicalTimerWheel<V> {
    /// Same as [`HierarchicalTimerWheel::new`].
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A delay that stays on level 0 fires on exactly its own tick.
    #[test]
    fn short_delay_fires_on_correct_tick() {
        let mut wheel = HierarchicalTimerWheel::<&'static str>::new();
        wheel.schedule(5, "a");
        assert!((0..4).all(|_| wheel.tick().is_empty()));
        assert_eq!(wheel.tick(), vec!["a"]);
    }

    /// 64 ticks is the first delay that no longer fits on level 0, so the
    /// entry starts on level 1 and has to be cascaded before it can fire.
    #[test]
    fn cascade_boundary_64_ticks_fires_correctly() {
        let mut wheel = HierarchicalTimerWheel::<u32>::new();
        wheel.schedule(64, 7);
        let fired_at = (1..=70u64).find_map(|step| {
            let fired = wheel.tick();
            if fired.is_empty() {
                None
            } else {
                assert_eq!(fired, vec![7]);
                Some(step)
            }
        });
        assert_eq!(fired_at, Some(64));
        assert!(wheel.cascades() >= 1, "expected at least one cascade event");
    }

    /// 4096 = SLOTS*SLOTS is the first delay that no longer fits on level 1.
    #[test]
    fn cascade_boundary_4096_ticks_fires_correctly() {
        let mut wheel = HierarchicalTimerWheel::<u32>::new();
        wheel.schedule(4096, 42);
        let fired_at = (1..=4100u64).find(|_| !wheel.tick().is_empty());
        assert_eq!(fired_at, Some(4096));
        // The entry began on level 2 and was cascaded toward level 0
        // before firing, so the counter cannot still be zero.
        assert!(wheel.cascades() >= 1);
    }

    /// A timer cancelled before its deadline never shows up in `tick`.
    #[test]
    fn cancel_before_fire_drops_value() {
        let mut wheel = HierarchicalTimerWheel::<&'static str>::new();
        let doomed = wheel.schedule(10, "doomed");
        assert!(wheel.cancel(doomed));
        assert!((0..20).all(|_| wheel.tick().is_empty()));
    }

    /// Once a timer has fired its id is no longer pending.
    #[test]
    fn cancel_after_fire_returns_false() {
        let mut wheel = HierarchicalTimerWheel::<&'static str>::new();
        let id = wheel.schedule(2, "x");
        let _ = wheel.tick();
        assert_eq!(wheel.tick(), vec!["x"]);
        assert!(!wheel.cancel(id), "cancel after fire must return false");
    }

    /// Cancelling an id that was never handed out reports `false`.
    #[test]
    fn cancel_unknown_id_returns_false() {
        let mut wheel = HierarchicalTimerWheel::<()>::new();
        let cancelled = wheel.cancel(99_999);
        assert!(!cancelled);
    }

    /// A 5000-tick delay starts on level 2 and has to be cascaded down
    /// through level 1 and level 0 before it fires.
    #[test]
    fn long_delay_uses_coarse_wheel_then_cascades() {
        let delay: u64 = 5000;
        let mut wheel = HierarchicalTimerWheel::<u32>::new();
        wheel.schedule(delay, 1);
        let found = (1..=(delay + 5)).find(|_| !wheel.tick().is_empty());
        assert_eq!(found, Some(delay));
    }

    /// `max_delay()` itself is rejected; one tick less is accepted.
    #[test]
    fn overflow_delay_rejected_by_try_schedule() {
        let mut wheel = HierarchicalTimerWheel::<u32>::new();
        let too_big = HierarchicalTimerWheel::<u32>::max_delay() as u64;
        let rejected = wheel.try_schedule(too_big, 1);
        assert!(rejected.is_none());
        let accepted = wheel.try_schedule(too_big - 1, 1);
        assert!(accepted.is_some());
    }

    /// 200 timers with delays 1..=200 each fire on their own tick, whether
    /// they started on level 0 or level 1.
    #[test]
    fn many_timers_fire_at_correct_distinct_ticks() {
        let mut wheel = HierarchicalTimerWheel::<u32>::new();
        (1u32..=200).for_each(|d| {
            wheel.schedule(u64::from(d), d);
        });
        let mut seen_total = 0usize;
        for i in 1u32..=200 {
            let fired = wheel.tick();
            for v in &fired {
                assert_eq!(*v, i, "expected delay {i} to fire on tick {i}");
            }
            seen_total += fired.len();
        }
        assert_eq!(seen_total, 200);
    }

    /// Timers that never leave level 0 do not touch the cascade counter.
    #[test]
    fn cascades_counter_zero_for_short_delays() {
        let mut wheel = HierarchicalTimerWheel::<u32>::new();
        wheel.schedule(3, 1);
        for _ in 0..3 {
            wheel.tick();
        }
        assert_eq!(wheel.cascades(), 0);
    }
}