Skip to main content

nexus_timer/
wheel.rs

1//! Timer wheel — the main data structure.
2//!
3//! `TimerWheel<T, S>` is a multi-level, no-cascade timer wheel. Entries are
4//! placed into a level based on how far in the future their deadline is.
5//! Once placed, an entry never moves — poll checks `deadline_ticks <= now`
6//! per entry.
7
8use std::marker::PhantomData;
9use std::mem;
10use std::time::{Duration, Instant};
11
12use nexus_slab::{Full, Slot, bounded, unbounded};
13
14use crate::entry::{EntryPtr, WheelEntry, entry_ref};
15use crate::handle::TimerHandle;
16use crate::level::Level;
17use crate::store::{BoundedStore, SlabStore};
18
19// =============================================================================
20// WheelBuilder (typestate)
21// =============================================================================
22
23/// Builder for configuring a timer wheel.
24///
25/// Defaults match the Linux kernel timer wheel (1ms tick, 64 slots/level,
26/// 8x multiplier, 7 levels → ~4.7 hour range).
27///
28/// # Examples
29///
30/// ```
31/// use std::time::{Duration, Instant};
32/// use nexus_timer::{Wheel, WheelBuilder};
33///
34/// let now = Instant::now();
35///
36/// // All defaults
37/// let wheel: Wheel<u64> = WheelBuilder::default().unbounded(4096).build(now);
38///
39/// // Custom config
40/// let wheel: Wheel<u64> = WheelBuilder::default()
41///     .tick_duration(Duration::from_micros(100))
42///     .slots_per_level(32)
43///     .unbounded(4096)
44///     .build(now);
45/// ```
#[derive(Debug, Clone, Copy)]
pub struct WheelBuilder {
    /// Length of one tick. Must be non-zero (checked when the wheel is built).
    tick_duration: Duration,
    /// Number of slots in each level. Must be a power of 2 and at most 64.
    slots_per_level: usize,
    /// Bit shift between consecutive levels (multiplier = 2^clk_shift). Must be > 0.
    clk_shift: u32,
    /// Number of levels. Must be between 1 and 8 inclusive.
    num_levels: usize,
}
53
54impl Default for WheelBuilder {
55    fn default() -> Self {
56        WheelBuilder {
57            tick_duration: Duration::from_millis(1),
58            slots_per_level: 64,
59            clk_shift: 3,
60            num_levels: 7,
61        }
62    }
63}
64
65impl WheelBuilder {
66    /// Creates a new builder with default configuration.
67    pub fn new() -> Self {
68        Self::default()
69    }
70
71    /// Sets the tick duration. Default: 1ms.
72    pub fn tick_duration(mut self, d: Duration) -> Self {
73        self.tick_duration = d;
74        self
75    }
76
77    /// Sets the number of slots per level. Must be a power of 2. Default: 64.
78    pub fn slots_per_level(mut self, n: usize) -> Self {
79        self.slots_per_level = n;
80        self
81    }
82
83    /// Sets the bit shift between levels (multiplier = 2^clk_shift). Default: 3 (8x).
84    pub fn clk_shift(mut self, s: u32) -> Self {
85        self.clk_shift = s;
86        self
87    }
88
89    /// Sets the number of levels. Default: 7.
90    pub fn num_levels(mut self, n: usize) -> Self {
91        self.num_levels = n;
92        self
93    }
94
95    /// Transitions to an unbounded wheel builder.
96    ///
97    /// `chunk_capacity` is the slab chunk size (entries per chunk). The slab
98    /// grows by adding new chunks as needed.
99    pub fn unbounded(self, chunk_capacity: usize) -> UnboundedWheelBuilder {
100        UnboundedWheelBuilder {
101            config: self,
102            chunk_capacity,
103        }
104    }
105
106    /// Transitions to a bounded wheel builder.
107    ///
108    /// `capacity` is the maximum number of concurrent timers.
109    pub fn bounded(self, capacity: usize) -> BoundedWheelBuilder {
110        BoundedWheelBuilder {
111            config: self,
112            capacity,
113        }
114    }
115
116    fn validate(&self) {
117        assert!(
118            self.slots_per_level.is_power_of_two(),
119            "slots_per_level must be a power of 2, got {}",
120            self.slots_per_level
121        );
122        assert!(
123            self.slots_per_level <= 64,
124            "slots_per_level must be <= 64 (u64 bitmask), got {}",
125            self.slots_per_level
126        );
127        assert!(self.num_levels > 0, "num_levels must be > 0");
128        assert!(
129            self.num_levels <= 8,
130            "num_levels must be <= 8 (u8 bitmask), got {}",
131            self.num_levels
132        );
133        assert!(self.clk_shift > 0, "clk_shift must be > 0");
134        assert!(
135            !self.tick_duration.is_zero(),
136            "tick_duration must be non-zero"
137        );
138        let max_shift = (self.num_levels - 1) as u64 * self.clk_shift as u64;
139        assert!(
140            max_shift < 64,
141            "(num_levels - 1) * clk_shift must be < 64, got {}",
142            max_shift
143        );
144        let slots_log2 = self.slots_per_level.trailing_zeros() as u64;
145        assert!(
146            slots_log2 + max_shift < 64,
147            "slots_per_level << max_shift would overflow u64"
148        );
149    }
150
151    fn tick_ns(&self) -> u64 {
152        self.tick_duration.as_nanos() as u64
153    }
154}
155
156/// Terminal builder for an unbounded timer wheel.
157///
158/// Created via [`WheelBuilder::unbounded`]. The only method is `.build()`.
#[derive(Debug)]
pub struct UnboundedWheelBuilder {
    /// Wheel configuration captured from [`WheelBuilder`]; validated in `build`.
    config: WheelBuilder,
    /// Slab chunk size (entries per chunk) handed to the backing slab.
    chunk_capacity: usize,
}
164
165impl UnboundedWheelBuilder {
166    /// Builds the unbounded timer wheel.
167    ///
168    /// # Panics
169    ///
170    /// Panics if the configuration is invalid (non-power-of-2 slots, zero
171    /// levels, zero clk_shift, or zero tick duration).
172    pub fn build<T: 'static>(self, now: Instant) -> Wheel<T> {
173        self.config.validate();
174        // SAFETY: Timer wheel is single-threaded (!Send, !Sync). All slots
175        // are freed via Slot::from_raw() + slab.free() before the wheel drops.
176        // The slab is never shared across threads.
177        let slab = unsafe { unbounded::Slab::with_chunk_capacity(self.chunk_capacity) };
178        let levels = build_levels::<T>(&self.config);
179        TimerWheel {
180            slab,
181            num_levels: self.config.num_levels,
182            levels,
183            current_ticks: 0,
184            tick_ns: self.config.tick_ns(),
185            epoch: now,
186            active_levels: 0,
187            len: 0,
188            _marker: PhantomData,
189        }
190    }
191}
192
193/// Terminal builder for a bounded timer wheel.
194///
195/// Created via [`WheelBuilder::bounded`]. The only method is `.build()`.
#[derive(Debug)]
pub struct BoundedWheelBuilder {
    /// Wheel configuration captured from [`WheelBuilder`]; validated in `build`.
    config: WheelBuilder,
    /// Maximum number of concurrent timers, handed to the backing slab.
    capacity: usize,
}
201
202impl BoundedWheelBuilder {
203    /// Builds the bounded timer wheel.
204    ///
205    /// # Panics
206    ///
207    /// Panics if the configuration is invalid (non-power-of-2 slots, zero
208    /// levels, zero clk_shift, or zero tick duration).
209    pub fn build<T: 'static>(self, now: Instant) -> BoundedWheel<T> {
210        self.config.validate();
211        // SAFETY: Timer wheel is single-threaded (!Send, !Sync). All slots
212        // are freed via Slot::from_raw() + slab.free() before the wheel drops.
213        // The slab is never shared across threads.
214        let slab = unsafe { bounded::Slab::with_capacity(self.capacity) };
215        let levels = build_levels::<T>(&self.config);
216        TimerWheel {
217            slab,
218            num_levels: self.config.num_levels,
219            levels,
220            current_ticks: 0,
221            tick_ns: self.config.tick_ns(),
222            epoch: now,
223            active_levels: 0,
224            len: 0,
225            _marker: PhantomData,
226        }
227    }
228}
229
230// =============================================================================
231// TimerWheel
232// =============================================================================
233
234/// A multi-level, no-cascade timer wheel.
235///
236/// Generic over:
237/// - `T` — the user payload stored with each timer.
238/// - `S` — the slab storage backend. Defaults to `unbounded::Slab`.
239///
240/// # Thread Safety
241///
242/// `Send` but `!Sync`. Can be moved to a thread at setup but must not
243/// be shared. All internal raw pointers point into owned allocations
244/// (slab chunks, level slot arrays) — moving the wheel moves the heap
245/// data with it.
pub struct TimerWheel<
    T: 'static,
    S: SlabStore<Item = WheelEntry<T>> = unbounded::Slab<WheelEntry<T>>,
> {
    /// Backing storage that every timer entry is allocated from.
    slab: S,
    /// One `Level` per wheel level, index 0 being the finest.
    levels: Vec<Level<T>>,
    /// Number of levels (copied from the builder configuration).
    num_levels: usize,
    /// Bitmask of levels that currently have at least one active slot
    /// (bit `i` corresponds to `levels[i]`).
    active_levels: u8,
    /// Tick count of the most recent poll; deltas for level selection are
    /// measured from here.
    current_ticks: u64,
    /// Length of one tick in nanoseconds.
    tick_ns: u64,
    /// The `Instant` that corresponds to tick 0.
    epoch: Instant,
    /// Number of timers scheduled and not yet fired or cancelled.
    len: usize,
    _marker: PhantomData<*const ()>, // !Send (overridden below), !Sync
}
260
261// SAFETY: TimerWheel<T, S> exclusively owns all memory behind its raw pointers.
262//
263// Pointer inventory and ownership:
264// - Slot `entry_head`/`entry_tail` — point into slab-owned memory (SlotCell
265//   in a slab chunk). Slab chunks are Vec<SlotCell<T>> heap allocations.
266// - DLL links (`WheelEntry::prev`, `WheelEntry::next`) — point to other
267//   SlotCells in the same slab.
268// - `Level::slots` — `Box<[WheelSlot<T>]>`, owned by the level.
269//
270// All pointed-to memory lives inside owned collections (Vec, Box<[T]>).
271// When TimerWheel is moved, the heap allocations stay at their addresses —
272// the internal pointers remain valid. No thread-local state. No shared
273// ownership.
274//
275// T: Send is required because timer values cross the thread boundary with
276// the wheel.
277//
278// S is NOT required to be Send. Slab types are !Send (raw pointers, Cell)
279// but the wheel exclusively owns its slab — no shared access, no aliasing.
280// Moving the wheel moves the slab; heap allocations stay at their addresses
281// so internal pointers remain valid.
282//
283// Outstanding TimerHandle<T> values are !Send and cannot follow the wheel
284// across threads. They become inert — consuming them requires &mut
285// TimerWheel which the original thread no longer has. The debug_assert in
286// TimerHandle::drop catches this as a programming error. Worst case is a
287// slot leak (refcount stuck at 1), not unsoundness.
288#[allow(clippy::non_send_fields_in_send_ty)]
289unsafe impl<T: Send + 'static, S: SlabStore<Item = WheelEntry<T>>> Send for TimerWheel<T, S> {}
290
/// A timer wheel backed by a fixed-capacity slab.
///
/// Created with `BoundedWheel::bounded` or via `WheelBuilder::bounded(..).build(..)`.
pub type BoundedWheel<T> = TimerWheel<T, bounded::Slab<WheelEntry<T>>>;
293
/// A timer wheel backed by a growable slab.
///
/// Created with `Wheel::unbounded` or via `WheelBuilder::unbounded(..).build(..)`.
pub type Wheel<T> = TimerWheel<T, unbounded::Slab<WheelEntry<T>>>;
296
297// =============================================================================
298// Construction
299// =============================================================================
300
301impl<T: 'static> Wheel<T> {
302    /// Creates an unbounded timer wheel with default configuration.
303    ///
304    /// For custom configuration, use [`WheelBuilder`].
305    pub fn unbounded(chunk_capacity: usize, now: Instant) -> Self {
306        WheelBuilder::default().unbounded(chunk_capacity).build(now)
307    }
308}
309
310impl<T: 'static> BoundedWheel<T> {
311    /// Creates a bounded timer wheel with default configuration.
312    ///
313    /// For custom configuration, use [`WheelBuilder`].
314    pub fn bounded(capacity: usize, now: Instant) -> Self {
315        WheelBuilder::default().bounded(capacity).build(now)
316    }
317}
318
319fn build_levels<T: 'static>(config: &WheelBuilder) -> Vec<Level<T>> {
320    (0..config.num_levels)
321        .map(|i| Level::new(config.slots_per_level, i, config.clk_shift))
322        .collect()
323}
324
325// =============================================================================
326// Schedule
327// =============================================================================
328
329impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
330    /// Schedules a timer and returns a handle for cancellation.
331    ///
332    /// The handle must be consumed via [`cancel`](Self::cancel) or
333    /// [`free`](Self::free). Dropping it is a programming error.
334    ///
335    /// # Panics
336    ///
337    /// Panics if the backing slab is at capacity (bounded slabs only).
338    /// This is a capacity planning error — size your wheel for peak load.
339    pub fn schedule(&mut self, deadline: Instant, value: T) -> TimerHandle<T> {
340        let deadline_ticks = self.instant_to_ticks(deadline);
341        let entry = WheelEntry::new(deadline_ticks, value, 2);
342        let slot = self.slab.alloc(entry);
343        let ptr = slot.into_raw();
344        self.insert_entry(ptr, deadline_ticks);
345        self.len += 1;
346        TimerHandle::new(ptr)
347    }
348
349    /// Schedules a fire-and-forget timer (no handle returned).
350    ///
351    /// The timer will fire during poll and the value will be collected.
352    /// Cannot be cancelled.
353    ///
354    /// # Panics
355    ///
356    /// Panics if the backing slab is at capacity (bounded slabs only).
357    /// This is a capacity planning error — size your wheel for peak load.
358    pub fn schedule_forget(&mut self, deadline: Instant, value: T) {
359        let deadline_ticks = self.instant_to_ticks(deadline);
360        let entry = WheelEntry::new(deadline_ticks, value, 1);
361        let slot = self.slab.alloc(entry);
362        let ptr = slot.into_raw();
363        self.insert_entry(ptr, deadline_ticks);
364        self.len += 1;
365    }
366}
367
368// =============================================================================
369// Schedule — fallible (bounded slabs only)
370// =============================================================================
371
372impl<T: 'static, S: BoundedStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
373    /// Attempts to schedule a timer, returning a handle on success.
374    ///
375    /// Returns `Err(Full(value))` if the slab is at capacity. Use this
376    /// when you need graceful error handling. For the common case where
377    /// capacity exhaustion is fatal, use [`schedule`](Self::schedule).
378    pub fn try_schedule(&mut self, deadline: Instant, value: T) -> Result<TimerHandle<T>, Full<T>> {
379        let deadline_ticks = self.instant_to_ticks(deadline);
380        let entry = WheelEntry::new(deadline_ticks, value, 2);
381        match self.slab.try_alloc(entry) {
382            Ok(slot) => {
383                let ptr = slot.into_raw();
384                self.insert_entry(ptr, deadline_ticks);
385                self.len += 1;
386                Ok(TimerHandle::new(ptr))
387            }
388            Err(full) => {
389                // Extract the user's T from the WheelEntry wrapper
390                // SAFETY: we just constructed this entry, take_value is valid
391                let wheel_entry = full.into_inner();
392                let value = unsafe { wheel_entry.take_value() }
393                    .expect("entry was just constructed with Some(value)");
394                Err(Full(value))
395            }
396        }
397    }
398
399    /// Attempts to schedule a fire-and-forget timer.
400    ///
401    /// Returns `Err(Full(value))` if the slab is at capacity. Use this
402    /// when you need graceful error handling. For the common case where
403    /// capacity exhaustion is fatal, use [`schedule_forget`](Self::schedule_forget).
404    pub fn try_schedule_forget(&mut self, deadline: Instant, value: T) -> Result<(), Full<T>> {
405        let deadline_ticks = self.instant_to_ticks(deadline);
406        let entry = WheelEntry::new(deadline_ticks, value, 1);
407        match self.slab.try_alloc(entry) {
408            Ok(slot) => {
409                let ptr = slot.into_raw();
410                self.insert_entry(ptr, deadline_ticks);
411                self.len += 1;
412                Ok(())
413            }
414            Err(full) => {
415                let wheel_entry = full.into_inner();
416                let value = unsafe { wheel_entry.take_value() }
417                    .expect("entry was just constructed with Some(value)");
418                Err(Full(value))
419            }
420        }
421    }
422}
423
424// =============================================================================
425// Cancel / Free / Poll / Query — generic over any store
426// =============================================================================
427
428impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
429    /// Cancels a timer and returns its value.
430    ///
431    /// - If the timer is still active: unlinks from the wheel, extracts value,
432    ///   frees the slab entry. Returns `Some(T)`.
433    /// - If the timer already fired (zombie handle): frees the slab entry.
434    ///   Returns `None`.
435    ///
436    /// Consumes the handle (no Drop runs).
437    pub fn cancel(&mut self, handle: TimerHandle<T>) -> Option<T> {
438        let ptr = handle.ptr;
439        // Consume handle without running Drop
440        mem::forget(handle);
441
442        // SAFETY: handle guarantees ptr is valid and allocated from our slab.
443        let entry = unsafe { entry_ref(ptr) };
444        let refs = entry.refs();
445
446        if refs == 2 {
447            // Active timer with handle — unlink, extract, free
448            let value = unsafe { entry.take_value() };
449            self.remove_entry(ptr);
450            self.len -= 1;
451            // SAFETY: ptr was allocated from our slab via into_raw()
452            self.slab.free(unsafe { Slot::from_raw(ptr) });
453            value
454        } else {
455            // refs == 1 means the wheel already fired this (zombie).
456            // The fire path decremented 2→1 and left the entry for us to free.
457            debug_assert_eq!(refs, 1, "unexpected refcount {refs} in cancel");
458            // SAFETY: ptr was allocated from our slab via into_raw()
459            self.slab.free(unsafe { Slot::from_raw(ptr) });
460            None
461        }
462    }
463
464    /// Releases a timer handle without cancelling.
465    ///
466    /// - If the timer is still active: converts to fire-and-forget (refs 2→1).
467    ///   Timer stays in the wheel and will fire normally during poll.
468    /// - If the timer already fired (zombie): frees the slab entry (refs 1→0).
469    ///
470    /// Consumes the handle (no Drop runs).
471    pub fn free(&mut self, handle: TimerHandle<T>) {
472        let ptr = handle.ptr;
473        mem::forget(handle);
474
475        // SAFETY: handle guarantees ptr is valid
476        let entry = unsafe { entry_ref(ptr) };
477        let new_refs = entry.dec_refs();
478
479        if new_refs == 0 {
480            // Was a zombie (fired already, refs was 1) — free the entry
481            // SAFETY: ptr was allocated from our slab via into_raw()
482            self.slab.free(unsafe { Slot::from_raw(ptr) });
483        }
484        // new_refs == 1: timer is now fire-and-forget, stays in wheel
485    }
486
487    /// Reschedules an active timer to a new deadline.
488    ///
489    /// Moves the entry from its current slot to the correct slot for
490    /// `new_deadline` without extracting or reconstructing the value.
491    ///
492    /// # Panics
493    ///
494    /// Panics if the timer has already fired (zombie handle). Only active
495    /// timers (refs == 2) can be rescheduled.
496    ///
497    /// Consumes and returns a new handle (same entry, new position).
498    pub fn reschedule(&mut self, handle: TimerHandle<T>, new_deadline: Instant) -> TimerHandle<T> {
499        let ptr = handle.ptr;
500        mem::forget(handle);
501
502        // SAFETY: handle guarantees ptr is valid
503        let entry = unsafe { entry_ref(ptr) };
504        assert_eq!(entry.refs(), 2, "cannot reschedule a fired timer");
505
506        // Remove from current position
507        self.remove_entry(ptr);
508
509        // Update deadline and reinsert
510        let new_ticks = self.instant_to_ticks(new_deadline);
511        entry.set_deadline_ticks(new_ticks);
512        self.insert_entry(ptr, new_ticks);
513
514        TimerHandle::new(ptr)
515    }
516
517    /// Fires all expired timers, collecting their values into `buf`.
518    ///
519    /// Returns the number of timers fired.
520    pub fn poll(&mut self, now: Instant, buf: &mut Vec<T>) -> usize {
521        self.poll_with_limit(now, usize::MAX, buf)
522    }
523
524    /// Fires expired timers up to `limit`, collecting values into `buf`.
525    ///
526    /// Resumable: if the limit is hit, the next call continues where this one
527    /// left off (as long as `now` hasn't changed).
528    ///
529    /// Returns the number of timers fired in this call.
530    pub fn poll_with_limit(&mut self, now: Instant, limit: usize, buf: &mut Vec<T>) -> usize {
531        let now_ticks = self.instant_to_ticks(now);
532        self.current_ticks = now_ticks;
533
534        let mut fired = 0;
535        let mut mask = self.active_levels;
536
537        while mask != 0 && fired < limit {
538            let lvl_idx = mask.trailing_zeros() as usize;
539            mask &= mask - 1; // clear lowest set bit
540            fired += self.poll_level(lvl_idx, now_ticks, limit - fired, buf);
541        }
542        fired
543    }
544
545    /// Returns the `Instant` of the next timer that will fire, or `None` if empty.
546    ///
547    /// Walks only active (non-empty) slots. O(active_slots) in the worst case,
548    /// but typically very fast because most slots are empty.
549    pub fn next_deadline(&self) -> Option<Instant> {
550        let mut min_ticks: Option<u64> = None;
551
552        let mut lvl_mask = self.active_levels;
553        while lvl_mask != 0 {
554            let lvl_idx = lvl_mask.trailing_zeros() as usize;
555            lvl_mask &= lvl_mask - 1;
556
557            let level = &self.levels[lvl_idx];
558            let mut slot_mask = level.active_slots();
559            while slot_mask != 0 {
560                let slot_idx = slot_mask.trailing_zeros() as usize;
561                slot_mask &= slot_mask - 1;
562
563                let slot = level.slot(slot_idx);
564                let mut entry_ptr = slot.entry_head();
565
566                while !entry_ptr.is_null() {
567                    // SAFETY: entry_ptr is in this slot's DLL
568                    let entry = unsafe { entry_ref(entry_ptr) };
569                    let dt = entry.deadline_ticks();
570                    min_ticks = Some(min_ticks.map_or(dt, |current| current.min(dt)));
571                    entry_ptr = entry.next();
572                }
573            }
574        }
575
576        min_ticks.map(|t| self.ticks_to_instant(t))
577    }
578
    /// Returns the number of timers currently in the wheel.
    ///
    /// Counts timers that have been scheduled and have neither fired nor
    /// been cancelled.
    #[inline]
    pub fn len(&self) -> usize {
        self.len
    }
584
585    /// Returns true if the wheel contains no timers.
586    #[inline]
587    pub fn is_empty(&self) -> bool {
588        self.len == 0
589    }
590
591    // =========================================================================
592    // Internal: tick conversion
593    // =========================================================================
594
595    #[inline]
596    fn instant_to_ticks(&self, instant: Instant) -> u64 {
597        // Saturate at 0 for instants before epoch
598        let dur = instant.saturating_duration_since(self.epoch);
599        dur.as_nanos() as u64 / self.tick_ns
600    }
601
602    #[inline]
603    fn ticks_to_instant(&self, ticks: u64) -> Instant {
604        self.epoch + Duration::from_nanos(ticks * self.tick_ns)
605    }
606
607    // =========================================================================
608    // Internal: level selection
609    // =========================================================================
610
611    /// Selects the appropriate level for a deadline.
612    ///
613    /// Walks levels from finest to coarsest, picking the first level whose
614    /// range can represent the delta. Clamps to the highest level if the
615    /// deadline exceeds the wheel's total range.
616    #[inline]
617    fn select_level(&self, deadline_ticks: u64) -> usize {
618        let delta = deadline_ticks.saturating_sub(self.current_ticks);
619
620        for (i, level) in self.levels.iter().enumerate() {
621            if delta < level.range() {
622                return i;
623            }
624        }
625
626        // Beyond max range — clamp to highest level
627        self.num_levels - 1
628    }
629
630    // =========================================================================
631    // Internal: entry insertion into a level's slot
632    // =========================================================================
633
634    /// Inserts an entry into the appropriate level and slot.
635    ///
636    /// Records the level and slot index on the entry so `remove_entry` can
637    /// find it without recomputing (which would be unsound after time advances).
638    #[inline]
639    #[allow(clippy::needless_pass_by_ref_mut)]
640    fn insert_entry(&mut self, entry_ptr: EntryPtr<T>, deadline_ticks: u64) {
641        let lvl_idx = self.select_level(deadline_ticks);
642        let slot_idx = self.levels[lvl_idx].slot_index(deadline_ticks);
643
644        // Record location on the entry for O(1) lookup at cancel time.
645        // SAFETY: entry_ptr is valid (just allocated)
646        let entry = unsafe { entry_ref(entry_ptr) };
647        entry.set_location(lvl_idx as u8, slot_idx as u16);
648
649        // SAFETY: entry_ptr is valid (just allocated), not in any DLL yet
650        unsafe { self.levels[lvl_idx].slot(slot_idx).push_entry(entry_ptr) };
651
652        // Activate slot and level (idempotent — OR is a no-op if already set)
653        self.levels[lvl_idx].activate_slot(slot_idx);
654        self.active_levels |= 1 << lvl_idx;
655    }
656
657    /// Removes an entry from its level's slot DLL.
658    ///
659    /// Reads the stored level and slot index from the entry (set at insertion
660    /// time). Does NOT recompute from delta — that would be unsound after
661    /// `current_ticks` advances.
662    #[inline]
663    #[allow(clippy::needless_pass_by_ref_mut)]
664    fn remove_entry(&mut self, entry_ptr: EntryPtr<T>) {
665        // SAFETY: entry_ptr is valid (caller guarantee)
666        let entry = unsafe { entry_ref(entry_ptr) };
667
668        let lvl_idx = entry.level() as usize;
669        let slot_idx = entry.slot_idx() as usize;
670
671        // SAFETY: entry_ptr is in this slot's DLL (invariant from insert_entry)
672        unsafe { self.levels[lvl_idx].slot(slot_idx).remove_entry(entry_ptr) };
673
674        if self.levels[lvl_idx].slot(slot_idx).is_empty() {
675            self.levels[lvl_idx].deactivate_slot(slot_idx);
676            if !self.levels[lvl_idx].is_active() {
677                self.active_levels &= !(1 << lvl_idx);
678            }
679        }
680    }
681
682    // =========================================================================
683    // Internal: fire an entry
684    // =========================================================================
685
686    /// Fires a single entry: extracts value, decrements refcount, possibly frees.
687    ///
688    /// Returns `Some(T)` if the value was still present (not already cancelled).
689    #[inline]
690    fn fire_entry(&mut self, entry_ptr: EntryPtr<T>) -> Option<T> {
691        // SAFETY: entry_ptr is valid (we're walking the DLL)
692        let entry = unsafe { entry_ref(entry_ptr) };
693
694        // Extract value
695        // SAFETY: single-threaded
696        let value = unsafe { entry.take_value() };
697
698        let new_refs = entry.dec_refs();
699        if new_refs == 0 {
700            // Fire-and-forget (was refs=1) — free the slab entry immediately
701            // SAFETY: entry_ptr was allocated from our slab via into_raw()
702            self.slab.free(unsafe { Slot::from_raw(entry_ptr) });
703        }
704        // new_refs == 1: handle exists (was refs=2), entry becomes zombie.
705        // Handle holder will free via cancel() or free().
706
707        self.len -= 1;
708        value
709    }
710
711    // =========================================================================
712    // Internal: poll a single level
713    // =========================================================================
714
    /// Polls a single level for expired entries up to `limit`.
    ///
    /// Walks the level's active slots and, within each slot, its entry list.
    /// Every entry with `deadline_ticks <= now_ticks` is unlinked and fired;
    /// values that are still present are pushed onto `buf`. Slots that end up
    /// empty are deactivated, and the level's bit in `active_levels` is
    /// cleared once the level reports no active slot.
    ///
    /// Returns the number of entries fired. An entry counts even when
    /// `fire_entry` yields no value.
    fn poll_level(
        &mut self,
        lvl_idx: usize,
        now_ticks: u64,
        limit: usize,
        buf: &mut Vec<T>,
    ) -> usize {
        let mut fired = 0;
        // Snapshot of the active-slot bitmask; bits are consumed lowest first.
        let mut mask = self.levels[lvl_idx].active_slots();

        while mask != 0 && fired < limit {
            let slot_idx = mask.trailing_zeros() as usize;
            mask &= mask - 1;

            // Go through a raw pointer so the slot reference does not keep
            // `self` borrowed across the `&mut self` call to fire_entry.
            let slot_ptr = self.levels[lvl_idx].slot(slot_idx) as *const crate::level::WheelSlot<T>;
            // SAFETY: slot_ptr points into self.levels[lvl_idx].slots
            // (Box<[WheelSlot<T>]>), a stable heap allocation. fire_entry
            // only mutates self.slab and self.len, not self.levels.
            let slot = unsafe { &*slot_ptr };
            let mut entry_ptr = slot.entry_head();

            while !entry_ptr.is_null() && fired < limit {
                // SAFETY: entry_ptr is in this slot's DLL
                let entry = unsafe { entry_ref(entry_ptr) };
                // Read the successor first: firing may unlink and free this entry.
                let next_entry = entry.next();

                if entry.deadline_ticks() <= now_ticks {
                    // SAFETY: entry_ptr was just reached by walking this
                    // slot's DLL and has not been unlinked yet.
                    unsafe { slot.remove_entry(entry_ptr) };

                    if let Some(value) = self.fire_entry(entry_ptr) {
                        buf.push(value);
                    }
                    fired += 1;
                }

                entry_ptr = next_entry;
            }

            // The slot may still hold unexpired entries, or expired ones we
            // did not reach because the limit was hit.
            if slot.is_empty() {
                self.levels[lvl_idx].deactivate_slot(slot_idx);
            }
        }

        // Deactivate level if all slots drained
        if !self.levels[lvl_idx].is_active() {
            self.active_levels &= !(1 << lvl_idx);
        }

        fired
    }
767}
768
769// =============================================================================
770// Drop
771// =============================================================================
772
773impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> Drop for TimerWheel<T, S> {
774    fn drop(&mut self) {
775        // Walk active levels and slots via bitmasks, free every entry.
776        let mut lvl_mask = self.active_levels;
777        while lvl_mask != 0 {
778            let lvl_idx = lvl_mask.trailing_zeros() as usize;
779            lvl_mask &= lvl_mask - 1;
780
781            let level = &self.levels[lvl_idx];
782            let mut slot_mask = level.active_slots();
783            while slot_mask != 0 {
784                let slot_idx = slot_mask.trailing_zeros() as usize;
785                slot_mask &= slot_mask - 1;
786
787                let slot = level.slot(slot_idx);
788                let mut entry_ptr = slot.entry_head();
789                while !entry_ptr.is_null() {
790                    // SAFETY: entry_ptr is in this slot's DLL
791                    let entry = unsafe { entry_ref(entry_ptr) };
792                    let next_entry = entry.next();
793
794                    // SAFETY: entry_ptr was allocated from our slab via into_raw()
795                    self.slab.free(unsafe { Slot::from_raw(entry_ptr) });
796
797                    entry_ptr = next_entry;
798                }
799            }
800        }
801    }
802}
803
804// =============================================================================
805// Tests
806// =============================================================================
807
808#[cfg(test)]
809mod tests {
810    use super::*;
811    use std::time::{Duration, Instant};
812
813    fn ms(millis: u64) -> Duration {
814        Duration::from_millis(millis)
815    }
816
817    // -------------------------------------------------------------------------
818    // Thread safety
819    // -------------------------------------------------------------------------
820
821    fn assert_send<T: Send>() {}
822
823    #[test]
824    fn wheel_is_send() {
825        assert_send::<Wheel<u64>>();
826        assert_send::<BoundedWheel<u64>>();
827    }
828
829    // -------------------------------------------------------------------------
830    // Construction
831    // -------------------------------------------------------------------------
832
833    #[test]
834    fn default_config() {
835        let now = Instant::now();
836        let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
837        assert!(wheel.is_empty());
838        assert_eq!(wheel.len(), 0);
839    }
840
841    #[test]
842    fn bounded_construction() {
843        let now = Instant::now();
844        let wheel: BoundedWheel<u64> = BoundedWheel::bounded(128, now);
845        assert!(wheel.is_empty());
846    }
847
848    #[test]
849    #[should_panic(expected = "slots_per_level must be a power of 2")]
850    fn invalid_config_non_power_of_two() {
851        let now = Instant::now();
852        WheelBuilder::default()
853            .slots_per_level(65)
854            .unbounded(1024)
855            .build::<u64>(now);
856    }
857
858    // -------------------------------------------------------------------------
859    // Schedule + Cancel
860    // -------------------------------------------------------------------------
861
862    #[test]
863    fn schedule_and_cancel() {
864        let now = Instant::now();
865        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
866
867        let h = wheel.schedule(now + ms(50), 42);
868        assert_eq!(wheel.len(), 1);
869
870        let val = wheel.cancel(h);
871        assert_eq!(val, Some(42));
872        assert_eq!(wheel.len(), 0);
873    }
874
875    #[test]
876    fn schedule_forget_fires() {
877        let now = Instant::now();
878        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
879
880        wheel.schedule_forget(now + ms(10), 99);
881        assert_eq!(wheel.len(), 1);
882
883        let mut buf = Vec::new();
884        let fired = wheel.poll(now + ms(20), &mut buf);
885        assert_eq!(fired, 1);
886        assert_eq!(buf, vec![99]);
887        assert_eq!(wheel.len(), 0);
888    }
889
890    #[test]
891    fn cancel_after_fire_returns_none() {
892        let now = Instant::now();
893        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
894
895        let h = wheel.schedule(now + ms(10), 42);
896
897        let mut buf = Vec::new();
898        wheel.poll(now + ms(20), &mut buf);
899        assert_eq!(buf, vec![42]);
900
901        // Handle is now a zombie
902        let val = wheel.cancel(h);
903        assert_eq!(val, None);
904    }
905
906    #[test]
907    fn free_active_timer_becomes_fire_and_forget() {
908        let now = Instant::now();
909        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
910
911        let h = wheel.schedule(now + ms(10), 42);
912        wheel.free(h); // releases handle, timer stays
913        assert_eq!(wheel.len(), 1);
914
915        let mut buf = Vec::new();
916        wheel.poll(now + ms(20), &mut buf);
917        assert_eq!(buf, vec![42]);
918        assert_eq!(wheel.len(), 0);
919    }
920
921    #[test]
922    fn free_zombie_handle() {
923        let now = Instant::now();
924        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
925
926        let h = wheel.schedule(now + ms(10), 42);
927
928        let mut buf = Vec::new();
929        wheel.poll(now + ms(20), &mut buf);
930
931        // Handle is zombie, free should clean up
932        wheel.free(h);
933    }
934
935    // -------------------------------------------------------------------------
936    // Bounded wheel
937    // -------------------------------------------------------------------------
938
939    #[test]
940    fn bounded_full() {
941        let now = Instant::now();
942        let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(2, now);
943
944        let h1 = wheel.try_schedule(now + ms(10), 1).unwrap();
945        let h2 = wheel.try_schedule(now + ms(20), 2).unwrap();
946
947        let err = wheel.try_schedule(now + ms(30), 3);
948        assert!(err.is_err());
949        let recovered = err.unwrap_err().into_inner();
950        assert_eq!(recovered, 3);
951
952        // Cancel one, should have room
953        wheel.cancel(h1);
954        let h3 = wheel.try_schedule(now + ms(30), 3).unwrap();
955
956        // Clean up handles
957        wheel.free(h2);
958        wheel.free(h3);
959    }
960
961    #[test]
962    fn bounded_schedule_forget_full() {
963        let now = Instant::now();
964        let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(1, now);
965
966        wheel.try_schedule_forget(now + ms(10), 1).unwrap();
967        let err = wheel.try_schedule_forget(now + ms(20), 2);
968        assert!(err.is_err());
969    }
970
971    // -------------------------------------------------------------------------
972    // Poll
973    // -------------------------------------------------------------------------
974
975    #[test]
976    fn poll_respects_deadline() {
977        let now = Instant::now();
978        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
979
980        wheel.schedule_forget(now + ms(10), 1);
981        wheel.schedule_forget(now + ms(50), 2);
982        wheel.schedule_forget(now + ms(100), 3);
983
984        let mut buf = Vec::new();
985
986        // At 20ms: only timer 1 should fire
987        let fired = wheel.poll(now + ms(20), &mut buf);
988        assert_eq!(fired, 1);
989        assert_eq!(buf, vec![1]);
990        assert_eq!(wheel.len(), 2);
991
992        // At 60ms: timer 2 fires
993        buf.clear();
994        let fired = wheel.poll(now + ms(60), &mut buf);
995        assert_eq!(fired, 1);
996        assert_eq!(buf, vec![2]);
997
998        // At 200ms: timer 3 fires
999        buf.clear();
1000        let fired = wheel.poll(now + ms(200), &mut buf);
1001        assert_eq!(fired, 1);
1002        assert_eq!(buf, vec![3]);
1003
1004        assert!(wheel.is_empty());
1005    }
1006
1007    #[test]
1008    fn poll_with_limit() {
1009        let now = Instant::now();
1010        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1011
1012        for i in 0..10 {
1013            wheel.schedule_forget(now + ms(1), i);
1014        }
1015
1016        let mut buf = Vec::new();
1017
1018        // Fire 3 at a time
1019        let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1020        assert_eq!(fired, 3);
1021        assert_eq!(wheel.len(), 7);
1022
1023        let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1024        assert_eq!(fired, 3);
1025        assert_eq!(wheel.len(), 4);
1026
1027        // Fire remaining
1028        let fired = wheel.poll(now + ms(5), &mut buf);
1029        assert_eq!(fired, 4);
1030        assert!(wheel.is_empty());
1031        assert_eq!(buf.len(), 10);
1032    }
1033
1034    // -------------------------------------------------------------------------
1035    // Multi-level
1036    // -------------------------------------------------------------------------
1037
1038    #[test]
1039    fn timers_across_levels() {
1040        let now = Instant::now();
1041        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1042
1043        // Level 0: 0-63ms
1044        wheel.schedule_forget(now + ms(5), 0);
1045        // Level 1: 64-511ms
1046        wheel.schedule_forget(now + ms(200), 1);
1047        // Level 2: 512-4095ms
1048        wheel.schedule_forget(now + ms(1000), 2);
1049
1050        let mut buf = Vec::new();
1051
1052        wheel.poll(now + ms(10), &mut buf);
1053        assert_eq!(buf, vec![0]);
1054
1055        buf.clear();
1056        wheel.poll(now + ms(250), &mut buf);
1057        assert_eq!(buf, vec![1]);
1058
1059        buf.clear();
1060        wheel.poll(now + ms(1500), &mut buf);
1061        assert_eq!(buf, vec![2]);
1062
1063        assert!(wheel.is_empty());
1064    }
1065
1066    // -------------------------------------------------------------------------
1067    // next_deadline
1068    // -------------------------------------------------------------------------
1069
1070    #[test]
1071    fn next_deadline_empty() {
1072        let now = Instant::now();
1073        let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1074        assert!(wheel.next_deadline().is_none());
1075    }
1076
1077    #[test]
1078    fn next_deadline_returns_earliest() {
1079        let now = Instant::now();
1080        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1081
1082        wheel.schedule_forget(now + ms(100), 1);
1083        wheel.schedule_forget(now + ms(50), 2);
1084        wheel.schedule_forget(now + ms(200), 3);
1085
1086        let next = wheel.next_deadline().unwrap();
1087        // Should be close to now + 50ms (within tick granularity)
1088        let delta = next.duration_since(now);
1089        assert!(delta >= ms(49) && delta <= ms(51));
1090    }
1091
1092    // -------------------------------------------------------------------------
1093    // Deadline in the past
1094    // -------------------------------------------------------------------------
1095
1096    #[test]
1097    fn deadline_in_the_past_fires_immediately() {
1098        let now = Instant::now();
1099        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1100
1101        // Schedule at epoch (which is "now" at construction)
1102        wheel.schedule_forget(now, 42);
1103
1104        let mut buf = Vec::new();
1105        let fired = wheel.poll(now + ms(1), &mut buf);
1106        assert_eq!(fired, 1);
1107        assert_eq!(buf, vec![42]);
1108    }
1109
1110    // -------------------------------------------------------------------------
1111    // Deadline beyond max range — clamped
1112    // -------------------------------------------------------------------------
1113
1114    #[test]
1115    fn deadline_beyond_max_range_clamped() {
1116        let now = Instant::now();
1117        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1118
1119        // Way in the future — should clamp to highest level
1120        let h = wheel.schedule(now + Duration::from_secs(100_000), 99);
1121        assert_eq!(wheel.len(), 1);
1122
1123        // Won't fire at any reasonable time but will fire when enough ticks pass
1124        let mut buf = Vec::new();
1125        wheel.poll(now + Duration::from_secs(100_001), &mut buf);
1126        assert_eq!(buf, vec![99]);
1127
1128        // Note: handle was already consumed by the poll (fire-and-forget path won't
1129        // apply since refs=2). Actually the handle still exists. Let's clean up.
1130        // The timer already fired, so cancel returns None.
1131        // Actually buf got the value, which means it fired. But handle still needs cleanup.
1132        // We already pushed the value so we need to handle the zombie.
1133        // Wait — we used schedule (refs=2), poll fired it (refs 2→1 zombie), handle `h` exists.
1134        // Actually we consumed it with the poll — no we didn't, we still have `h`.
1135
1136        // h is a zombie handle now
1137        let val = wheel.cancel(h);
1138        assert_eq!(val, None);
1139    }
1140
1141    // -------------------------------------------------------------------------
1142    // Drop
1143    // -------------------------------------------------------------------------
1144
1145    #[test]
1146    fn drop_cleans_up_active_entries() {
1147        let now = Instant::now();
1148        let mut wheel: Wheel<String> = Wheel::unbounded(1024, now);
1149
1150        for i in 0..100 {
1151            wheel.schedule_forget(now + ms(i * 10), format!("timer-{i}"));
1152        }
1153
1154        assert_eq!(wheel.len(), 100);
1155        // Drop should free all entries without leaking
1156        drop(wheel);
1157    }
1158
1159    #[test]
1160    fn drop_with_outstanding_handles() {
1161        let now = Instant::now();
1162        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1163
1164        // Schedule but DON'T cancel — just free the handles
1165        let h1 = wheel.schedule(now + ms(10), 1);
1166        let h2 = wheel.schedule(now + ms(20), 2);
1167
1168        // Free the handles (convert to fire-and-forget) so they don't debug_assert
1169        wheel.free(h1);
1170        wheel.free(h2);
1171
1172        // Drop the wheel — should clean up the entries
1173        drop(wheel);
1174    }
1175
1176    // -------------------------------------------------------------------------
1177    // Level selection
1178    // -------------------------------------------------------------------------
1179
1180    #[test]
1181    fn level_selection_boundaries() {
1182        let now = Instant::now();
1183        let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1184
1185        // Level 0: delta < 64
1186        assert_eq!(wheel.select_level(0), 0);
1187        assert_eq!(wheel.select_level(63), 0);
1188
1189        // Level 1: 64 <= delta < 512
1190        assert_eq!(wheel.select_level(64), 1);
1191        assert_eq!(wheel.select_level(511), 1);
1192
1193        // Level 2: 512 <= delta < 4096
1194        assert_eq!(wheel.select_level(512), 2);
1195    }
1196
1197    // -------------------------------------------------------------------------
1198    // Bug fix validation: cancel after time advance
1199    // -------------------------------------------------------------------------
1200
1201    #[test]
1202    fn cancel_after_time_advance() {
1203        // The critical bug: schedule at T+500ms (level 2, delta=500 ticks),
1204        // poll at T+400ms (no fire, but current_ticks advances to 400),
1205        // cancel at T+400ms. Old code would recompute delta = 500-400 = 100
1206        // → level 1. But the entry is in level 2. Stored location fixes this.
1207        let now = Instant::now();
1208        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1209
1210        let h = wheel.schedule(now + ms(500), 42);
1211        assert_eq!(wheel.len(), 1);
1212
1213        // Advance time — timer doesn't fire (deadline is 500ms)
1214        let mut buf = Vec::new();
1215        let fired = wheel.poll(now + ms(400), &mut buf);
1216        assert_eq!(fired, 0);
1217        assert!(buf.is_empty());
1218
1219        // Cancel after time advance — must find the entry in the correct slot
1220        let val = wheel.cancel(h);
1221        assert_eq!(val, Some(42));
1222        assert_eq!(wheel.len(), 0);
1223    }
1224
1225    // -------------------------------------------------------------------------
1226    // Same-slot entries
1227    // -------------------------------------------------------------------------
1228
1229    #[test]
1230    fn multiple_entries_same_slot() {
1231        let now = Instant::now();
1232        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1233
1234        // All 5 timers at the same deadline → same slot
1235        let mut handles = Vec::new();
1236        for i in 0..5 {
1237            handles.push(wheel.schedule(now + ms(10), i));
1238        }
1239        assert_eq!(wheel.len(), 5);
1240
1241        // Cancel the middle ones
1242        let v2 = wheel.cancel(handles.remove(2));
1243        assert_eq!(v2, Some(2));
1244        let v0 = wheel.cancel(handles.remove(0));
1245        assert_eq!(v0, Some(0));
1246        assert_eq!(wheel.len(), 3);
1247
1248        // Poll — remaining 3 should fire
1249        let mut buf = Vec::new();
1250        let fired = wheel.poll(now + ms(20), &mut buf);
1251        assert_eq!(fired, 3);
1252
1253        // Clean up zombie handles
1254        for h in handles {
1255            let val = wheel.cancel(h);
1256            assert_eq!(val, None); // already fired
1257        }
1258    }
1259
1260    // -------------------------------------------------------------------------
1261    // Level boundary
1262    // -------------------------------------------------------------------------
1263
1264    #[test]
1265    fn entry_at_level_boundary() {
1266        // Default config: level 0 range = 64 ticks (64ms).
1267        // A deadline at exactly tick 64 should go to level 1, not level 0.
1268        let now = Instant::now();
1269        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1270
1271        let h = wheel.schedule(now + ms(64), 99);
1272        assert_eq!(wheel.len(), 1);
1273
1274        // Should NOT fire at 63ms
1275        let mut buf = Vec::new();
1276        let fired = wheel.poll(now + ms(63), &mut buf);
1277        assert_eq!(fired, 0);
1278
1279        // Should fire at 64ms
1280        let fired = wheel.poll(now + ms(65), &mut buf);
1281        assert_eq!(fired, 1);
1282        assert_eq!(buf, vec![99]);
1283
1284        // Clean up zombie handle
1285        wheel.cancel(h);
1286    }
1287
1288    // -------------------------------------------------------------------------
1289    // Bookmark/resumption with mixed expiry
1290    // -------------------------------------------------------------------------
1291
1292    #[test]
1293    fn poll_with_limit_mixed_expiry() {
1294        let now = Instant::now();
1295        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1296
1297        // 3 expired at poll time, 2 not
1298        wheel.schedule_forget(now + ms(5), 1);
1299        wheel.schedule_forget(now + ms(5), 2);
1300        wheel.schedule_forget(now + ms(5), 3);
1301        wheel.schedule_forget(now + ms(500), 4); // not expired
1302        wheel.schedule_forget(now + ms(500), 5); // not expired
1303        assert_eq!(wheel.len(), 5);
1304
1305        let mut buf = Vec::new();
1306
1307        // Fire 2 of the 3 expired
1308        let fired = wheel.poll_with_limit(now + ms(10), 2, &mut buf);
1309        assert_eq!(fired, 2);
1310        assert_eq!(wheel.len(), 3);
1311
1312        // Fire remaining expired (1 more)
1313        let fired = wheel.poll_with_limit(now + ms(10), 5, &mut buf);
1314        assert_eq!(fired, 1);
1315        assert_eq!(wheel.len(), 2);
1316
1317        // The 2 unexpired should still be there
1318        assert_eq!(buf.len(), 3);
1319    }
1320
1321    // -------------------------------------------------------------------------
1322    // Re-add after drain
1323    // -------------------------------------------------------------------------
1324
1325    #[test]
1326    fn reuse_after_full_drain() {
1327        let now = Instant::now();
1328        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1329
1330        // Round 1: schedule and drain
1331        for i in 0..10 {
1332            wheel.schedule_forget(now + ms(1), i);
1333        }
1334        let mut buf = Vec::new();
1335        wheel.poll(now + ms(5), &mut buf);
1336        assert_eq!(buf.len(), 10);
1337        assert!(wheel.is_empty());
1338
1339        // Round 2: schedule and drain again — wheel must work normally
1340        buf.clear();
1341        for i in 10..20 {
1342            wheel.schedule_forget(now + ms(100), i);
1343        }
1344        assert_eq!(wheel.len(), 10);
1345
1346        wheel.poll(now + ms(200), &mut buf);
1347        assert_eq!(buf.len(), 10);
1348        assert!(wheel.is_empty());
1349    }
1350
1351    // -------------------------------------------------------------------------
1352    // All levels active simultaneously
1353    // -------------------------------------------------------------------------
1354
1355    #[test]
1356    fn all_levels_active() {
1357        let now = Instant::now();
1358        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1359
1360        // Schedule one timer per level with increasing distances.
1361        // Level 0: <64ms, Level 1: 64-511ms, Level 2: 512-4095ms, etc.
1362        let distances = [10, 100, 1000, 5000, 40_000, 300_000, 3_000_000];
1363        let mut handles: Vec<TimerHandle<u64>> = Vec::new();
1364        for (i, &d) in distances.iter().enumerate() {
1365            handles.push(wheel.schedule(now + ms(d), i as u64));
1366        }
1367        assert_eq!(wheel.len(), 7);
1368
1369        // Cancel in a shuffled order: 4, 1, 6, 0, 3, 5, 2
1370        let order = [4, 1, 6, 0, 3, 5, 2];
1371        // Take ownership by swapping with dummies — actually we need to
1372        // cancel by index. Let's use Option to track.
1373        let mut opt_handles: Vec<Option<TimerHandle<u64>>> =
1374            handles.into_iter().map(Some).collect();
1375
1376        for &idx in &order {
1377            let h = opt_handles[idx].take().unwrap();
1378            let val = wheel.cancel(h);
1379            assert_eq!(val, Some(idx as u64));
1380        }
1381        assert!(wheel.is_empty());
1382    }
1383
1384    // -------------------------------------------------------------------------
1385    // Poll values match
1386    // -------------------------------------------------------------------------
1387
1388    #[test]
1389    fn poll_values_match() {
1390        let now = Instant::now();
1391        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1392
1393        let expected: Vec<u64> = (100..110).collect();
1394        for &v in &expected {
1395            wheel.schedule_forget(now + ms(5), v);
1396        }
1397
1398        let mut buf = Vec::new();
1399        wheel.poll(now + ms(10), &mut buf);
1400
1401        buf.sort_unstable();
1402        assert_eq!(buf, expected);
1403    }
1404
1405    // -------------------------------------------------------------------------
1406    // Reschedule
1407    // -------------------------------------------------------------------------
1408
1409    #[test]
1410    fn reschedule_moves_deadline() {
1411        let now = Instant::now();
1412        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1413
1414        let h = wheel.schedule(now + ms(100), 42);
1415        assert_eq!(wheel.len(), 1);
1416
1417        // Reschedule to earlier
1418        let h = wheel.reschedule(h, now + ms(50));
1419        assert_eq!(wheel.len(), 1);
1420
1421        // Should NOT fire at 40ms
1422        let mut buf = Vec::new();
1423        let fired = wheel.poll(now + ms(40), &mut buf);
1424        assert_eq!(fired, 0);
1425
1426        // Should fire at 50ms
1427        let fired = wheel.poll(now + ms(55), &mut buf);
1428        assert_eq!(fired, 1);
1429        assert_eq!(buf, vec![42]);
1430
1431        // Clean up zombie
1432        wheel.cancel(h);
1433    }
1434
1435    #[test]
1436    fn reschedule_to_later() {
1437        let now = Instant::now();
1438        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1439
1440        let h = wheel.schedule(now + ms(50), 7);
1441
1442        // Reschedule to later
1443        let h = wheel.reschedule(h, now + ms(200));
1444
1445        // Should NOT fire at original deadline
1446        let mut buf = Vec::new();
1447        let fired = wheel.poll(now + ms(60), &mut buf);
1448        assert_eq!(fired, 0);
1449
1450        // Should fire at new deadline
1451        let fired = wheel.poll(now + ms(210), &mut buf);
1452        assert_eq!(fired, 1);
1453        assert_eq!(buf, vec![7]);
1454
1455        wheel.cancel(h);
1456    }
1457
1458    #[test]
1459    #[should_panic(expected = "cannot reschedule a fired timer")]
1460    fn reschedule_panics_on_zombie() {
1461        let now = Instant::now();
1462        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1463
1464        let h = wheel.schedule(now + ms(10), 42);
1465
1466        let mut buf = Vec::new();
1467        wheel.poll(now + ms(20), &mut buf);
1468
1469        // h is now a zombie — reschedule should panic
1470        let _h = wheel.reschedule(h, now + ms(100));
1471    }
1472
1473    // -------------------------------------------------------------------------
1474    // Non-default builder configurations (L13)
1475    // -------------------------------------------------------------------------
1476
1477    #[test]
1478    fn custom_slots_per_level() {
1479        let now = Instant::now();
1480        // 32 slots/level instead of default 64
1481        let mut wheel: Wheel<u64> = WheelBuilder::default()
1482            .slots_per_level(32)
1483            .unbounded(256)
1484            .build(now);
1485
1486        // Level 0 range = 32 ticks (32ms with 1ms tick)
1487        // Deadline at 20ms should go to level 0
1488        let h1 = wheel.schedule(now + ms(20), 1);
1489        // Deadline at 40ms should go to level 1 (>= 32 ticks)
1490        let h2 = wheel.schedule(now + ms(40), 2);
1491
1492        let mut buf = Vec::new();
1493        wheel.poll(now + ms(25), &mut buf);
1494        assert_eq!(buf, vec![1]);
1495
1496        buf.clear();
1497        wheel.poll(now + ms(50), &mut buf);
1498        assert_eq!(buf, vec![2]);
1499
1500        wheel.cancel(h1);
1501        wheel.cancel(h2);
1502    }
1503
1504    #[test]
1505    fn custom_clk_shift() {
1506        let now = Instant::now();
1507        // clk_shift=2 means 4x multiplier between levels (instead of 8x)
1508        let mut wheel: Wheel<u64> = WheelBuilder::default()
1509            .clk_shift(2)
1510            .unbounded(256)
1511            .build(now);
1512
1513        // Level 0: 64 slots × 1ms = 64ms range
1514        // Level 1: 64 slots × 4ms = 256ms range (with 4x multiplier)
1515        let h1 = wheel.schedule(now + ms(50), 1); // level 0
1516        let h2 = wheel.schedule(now + ms(100), 2); // level 1 (>= 64 ticks, <256 ticks)
1517
1518        let mut buf = Vec::new();
1519        wheel.poll(now + ms(55), &mut buf);
1520        assert_eq!(buf, vec![1]);
1521
1522        buf.clear();
1523        wheel.poll(now + ms(110), &mut buf);
1524        assert_eq!(buf, vec![2]);
1525
1526        wheel.cancel(h1);
1527        wheel.cancel(h2);
1528    }
1529
1530    #[test]
1531    fn custom_num_levels() {
1532        let now = Instant::now();
1533        // Only 3 levels instead of 7
1534        let mut wheel: Wheel<u64> = WheelBuilder::default()
1535            .num_levels(3)
1536            .unbounded(256)
1537            .build(now);
1538
1539        // Level 0: 64ms, Level 1: 512ms, Level 2: 4096ms
1540        // Max range is level 2 = 4096ms. Beyond that, clamped.
1541        let h = wheel.schedule(now + ms(3000), 42);
1542        assert_eq!(wheel.len(), 1);
1543
1544        let mut buf = Vec::new();
1545        wheel.poll(now + ms(3100), &mut buf);
1546        assert_eq!(buf, vec![42]);
1547
1548        wheel.cancel(h);
1549    }
1550
1551    #[test]
1552    fn custom_tick_duration() {
1553        let now = Instant::now();
1554        // 100μs ticks instead of 1ms
1555        let mut wheel: Wheel<u64> = WheelBuilder::default()
1556            .tick_duration(Duration::from_micros(100))
1557            .unbounded(256)
1558            .build(now);
1559
1560        // 1ms = 10 ticks, should be level 0 (< 64 ticks)
1561        wheel.schedule_forget(now + ms(1), 1);
1562        // 10ms = 100 ticks, should be level 1 (>= 64 ticks)
1563        wheel.schedule_forget(now + ms(10), 2);
1564
1565        let mut buf = Vec::new();
1566        wheel.poll(now + ms(2), &mut buf);
1567        assert_eq!(buf, vec![1]);
1568
1569        buf.clear();
1570        wheel.poll(now + ms(15), &mut buf);
1571        assert_eq!(buf, vec![2]);
1572    }
1573
1574    #[test]
1575    fn bounded_custom_config() {
1576        let now = Instant::now();
1577        let mut wheel: BoundedWheel<u64> = WheelBuilder::default()
1578            .slots_per_level(16)
1579            .num_levels(4)
1580            .bounded(8)
1581            .build(now);
1582
1583        // Fill to capacity
1584        let mut handles = Vec::new();
1585        for i in 0..8 {
1586            handles.push(wheel.try_schedule(now + ms(i * 10 + 10), i).unwrap());
1587        }
1588        assert!(wheel.try_schedule(now + ms(100), 99).is_err());
1589
1590        // Cancel one, schedule another
1591        wheel.cancel(handles.remove(0));
1592        let h = wheel.try_schedule(now + ms(100), 99).unwrap();
1593        handles.push(h);
1594
1595        // Clean up
1596        for h in handles {
1597            wheel.cancel(h);
1598        }
1599    }
1600
1601    // -------------------------------------------------------------------------
1602    // Builder validation (L13)
1603    // -------------------------------------------------------------------------
1604
1605    #[test]
1606    #[should_panic(expected = "slots_per_level must be <= 64")]
1607    fn invalid_config_too_many_slots() {
1608        let now = Instant::now();
1609        WheelBuilder::default()
1610            .slots_per_level(128)
1611            .unbounded(1024)
1612            .build::<u64>(now);
1613    }
1614
1615    #[test]
1616    #[should_panic(expected = "num_levels must be > 0")]
1617    fn invalid_config_zero_levels() {
1618        let now = Instant::now();
1619        WheelBuilder::default()
1620            .num_levels(0)
1621            .unbounded(1024)
1622            .build::<u64>(now);
1623    }
1624
1625    #[test]
1626    #[should_panic(expected = "num_levels must be <= 8")]
1627    fn invalid_config_too_many_levels() {
1628        let now = Instant::now();
1629        WheelBuilder::default()
1630            .num_levels(9)
1631            .unbounded(1024)
1632            .build::<u64>(now);
1633    }
1634
1635    #[test]
1636    #[should_panic(expected = "clk_shift must be > 0")]
1637    fn invalid_config_zero_shift() {
1638        let now = Instant::now();
1639        WheelBuilder::default()
1640            .clk_shift(0)
1641            .unbounded(1024)
1642            .build::<u64>(now);
1643    }
1644
1645    #[test]
1646    #[should_panic(expected = "tick_duration must be non-zero")]
1647    fn invalid_config_zero_tick() {
1648        let now = Instant::now();
1649        WheelBuilder::default()
1650            .tick_duration(Duration::ZERO)
1651            .unbounded(1024)
1652            .build::<u64>(now);
1653    }
1654
1655    #[test]
1656    #[should_panic(expected = "overflow")]
1657    fn invalid_config_shift_overflow() {
1658        let now = Instant::now();
1659        // 8 levels × clk_shift=8 = 56 bits shift on level 7
1660        // Plus 64 slots (6 bits) = 62 bits, should be OK
1661        // But 8 levels × clk_shift=9 = 63 + 6 = 69 bits — overflow
1662        WheelBuilder::default()
1663            .num_levels(8)
1664            .clk_shift(9)
1665            .unbounded(1024)
1666            .build::<u64>(now);
1667    }
1668
1669    // -------------------------------------------------------------------------
1670    // Miri-compatible tests (L12)
1671    // -------------------------------------------------------------------------
1672    // These tests exercise the raw pointer paths (DLL operations, entry
1673    // lifecycle, poll) with Drop types to catch UB under Miri.
1674
1675    #[test]
1676    fn miri_schedule_cancel_drop_type() {
1677        let now = Instant::now();
1678        let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1679
1680        let h = wheel.schedule(now + ms(50), "hello".to_string());
1681        let val = wheel.cancel(h);
1682        assert_eq!(val, Some("hello".to_string()));
1683        assert!(wheel.is_empty());
1684    }
1685
1686    #[test]
1687    fn miri_poll_fires_drop_type() {
1688        let now = Instant::now();
1689        let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1690
1691        wheel.schedule_forget(now + ms(10), "a".to_string());
1692        wheel.schedule_forget(now + ms(10), "b".to_string());
1693        wheel.schedule_forget(now + ms(10), "c".to_string());
1694
1695        let mut buf = Vec::new();
1696        let fired = wheel.poll(now + ms(20), &mut buf);
1697        assert_eq!(fired, 3);
1698        assert_eq!(buf.len(), 3);
1699        assert!(wheel.is_empty());
1700    }
1701
1702    #[test]
1703    fn miri_cancel_zombie_drop_type() {
1704        let now = Instant::now();
1705        let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1706
1707        let h = wheel.schedule(now + ms(10), "zombie".to_string());
1708
1709        let mut buf = Vec::new();
1710        wheel.poll(now + ms(20), &mut buf);
1711        assert_eq!(buf, vec!["zombie".to_string()]);
1712
1713        // h is now a zombie — cancel frees the entry
1714        let val = wheel.cancel(h);
1715        assert_eq!(val, None);
1716    }
1717
1718    #[test]
1719    fn miri_free_active_and_zombie() {
1720        let now = Instant::now();
1721        let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1722
1723        // Active → fire-and-forget via free
1724        let h1 = wheel.schedule(now + ms(10), "active".to_string());
1725        wheel.free(h1);
1726
1727        // Poll fires the fire-and-forget entry
1728        let mut buf = Vec::new();
1729        wheel.poll(now + ms(20), &mut buf);
1730        assert_eq!(buf, vec!["active".to_string()]);
1731
1732        // Zombie → free
1733        let h2 = wheel.schedule(now + ms(10), "will-fire".to_string());
1734        buf.clear();
1735        wheel.poll(now + ms(20), &mut buf);
1736        wheel.free(h2); // zombie cleanup
1737    }
1738
1739    #[test]
1740    fn miri_reschedule_drop_type() {
1741        let now = Instant::now();
1742        let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1743
1744        let h = wheel.schedule(now + ms(100), "moveme".to_string());
1745        let h = wheel.reschedule(h, now + ms(50));
1746
1747        let mut buf = Vec::new();
1748        wheel.poll(now + ms(55), &mut buf);
1749        assert_eq!(buf, vec!["moveme".to_string()]);
1750
1751        wheel.cancel(h);
1752    }
1753
1754    #[test]
1755    fn miri_dll_multi_entry_same_slot() {
1756        // Multiple entries in same slot exercises DLL push/remove paths
1757        let now = Instant::now();
1758        let mut wheel: Wheel<Vec<u8>> = Wheel::unbounded(64, now);
1759
1760        let mut handles = Vec::new();
1761        for i in 0..5 {
1762            handles.push(wheel.schedule(now + ms(10), vec![i; 32]));
1763        }
1764
1765        // Cancel middle, then head, then tail
1766        let v2 = wheel.cancel(handles.remove(2));
1767        assert_eq!(v2.unwrap(), vec![2; 32]);
1768
1769        let v0 = wheel.cancel(handles.remove(0));
1770        assert_eq!(v0.unwrap(), vec![0; 32]);
1771
1772        // Poll remaining
1773        let mut buf = Vec::new();
1774        wheel.poll(now + ms(20), &mut buf);
1775        assert_eq!(buf.len(), 3);
1776
1777        // Clean up zombie handles
1778        for h in handles {
1779            wheel.cancel(h);
1780        }
1781    }
1782
1783    #[test]
1784    fn miri_drop_wheel_with_entries() {
1785        let now = Instant::now();
1786        let mut wheel: Wheel<String> = Wheel::unbounded(64, now);
1787
1788        // Schedule entries across multiple levels
1789        for i in 0..20 {
1790            wheel.schedule_forget(now + ms(i * 100), format!("entry-{i}"));
1791        }
1792        assert_eq!(wheel.len(), 20);
1793
1794        // Drop with active entries — must not leak or UB
1795        drop(wheel);
1796    }
1797
1798    #[test]
1799    fn miri_bounded_lifecycle() {
1800        let now = Instant::now();
1801        let mut wheel: BoundedWheel<String> = BoundedWheel::bounded(4, now);
1802
1803        let h1 = wheel.try_schedule(now + ms(10), "a".to_string()).unwrap();
1804        let h2 = wheel.try_schedule(now + ms(20), "b".to_string()).unwrap();
1805        let h3 = wheel.try_schedule(now + ms(30), "c".to_string()).unwrap();
1806        let h4 = wheel.try_schedule(now + ms(40), "d".to_string()).unwrap();
1807
1808        // Full
1809        let err = wheel.try_schedule(now + ms(50), "e".to_string());
1810        assert!(err.is_err());
1811
1812        // Cancel and reuse
1813        wheel.cancel(h1);
1814        let h5 = wheel.try_schedule(now + ms(50), "e".to_string()).unwrap();
1815
1816        // Poll fires some
1817        let mut buf = Vec::new();
1818        wheel.poll(now + ms(25), &mut buf);
1819
1820        // Clean up all remaining handles
1821        wheel.cancel(h2);
1822        wheel.free(h3);
1823        wheel.free(h4);
1824        wheel.free(h5);
1825    }
1826}
1827
#[cfg(test)]
mod proptests {
    use super::*;
    use proptest::prelude::*;
    use std::collections::HashSet;
    use std::time::{Duration, Instant};

    /// Operation in a schedule/cancel interleaving.
    #[derive(Debug, Clone)]
    enum Op {
        /// Schedule a timer `deadline_ms` milliseconds after the wheel's
        /// start instant.
        Schedule { deadline_ms: u64 },
        /// Cancel the timer at the given index (modulo outstanding handles).
        Cancel { idx: usize },
    }

    /// Strategy producing either a `Schedule` with a bounded deadline or a
    /// `Cancel` with an arbitrary index.
    fn op_strategy() -> impl Strategy<Value = Op> {
        prop_oneof![
            // Schedule with deadlines from 1ms up to (but excluding) 10_000ms
            (1u64..10_000).prop_map(|deadline_ms| Op::Schedule { deadline_ms }),
            // Cancel at random index
            any::<usize>().prop_map(|idx| Op::Cancel { idx }),
        ]
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(500))]

        /// Fuzz schedule/cancel interleaving.
        ///
        /// Random sequence of schedule and cancel operations. Invariants:
        /// - `len` always matches outstanding active timers
        /// - cancel on active handle returns `Some`
        /// - poll collects all un-cancelled values
        /// - cancel on a handle whose timer already fired returns `None`
        #[test]
        fn fuzz_schedule_cancel_interleaving(ops in proptest::collection::vec(op_strategy(), 1..200)) {
            let now = Instant::now();
            let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);

            let mut handles: Vec<TimerHandle<u64>> = Vec::new();
            let mut active_values: HashSet<u64> = HashSet::new();
            let mut next_id: u64 = 0;

            for op in &ops {
                match op {
                    Op::Schedule { deadline_ms } => {
                        // Each timer carries a unique id so it can be tracked.
                        let id = next_id;
                        next_id += 1;
                        let h = wheel.schedule(now + Duration::from_millis(*deadline_ms), id);
                        handles.push(h);
                        active_values.insert(id);
                    }
                    Op::Cancel { idx } => {
                        // A cancel with nothing outstanding is a no-op.
                        if !handles.is_empty() {
                            let i = idx % handles.len();
                            let h = handles.swap_remove(i);
                            let cancelled = wheel.cancel(h);
                            // Nothing has been polled yet, so every handle is
                            // for an active timer and must yield its value.
                            prop_assert!(
                                cancelled.is_some(),
                                "cancel on an active handle returned None"
                            );
                            if let Some(v) = cancelled {
                                prop_assert!(
                                    active_values.remove(&v),
                                    "cancelled value {} was not tracked as active",
                                    v
                                );
                            }
                        }
                    }
                }
                // len must match active values
                prop_assert_eq!(wheel.len(), active_values.len());
            }

            // Poll everything — should collect exactly the remaining active values
            let mut buf = Vec::new();
            // Use a far-future time to fire everything
            wheel.poll(now + Duration::from_secs(100_000), &mut buf);

            // Poll fired every remaining timer, so the surviving handles are
            // zombies. Release each one through `cancel` rather than leaking
            // it. Results are collected first so that every handle is consumed
            // before any assertion can return early.
            let zombie_results: Vec<Option<u64>> =
                handles.into_iter().map(|h| wheel.cancel(h)).collect();
            prop_assert!(
                zombie_results.iter().all(Option::is_none),
                "cancel on an already-fired handle returned a value"
            );

            let fired_set: HashSet<u64> = buf.into_iter().collect();
            prop_assert_eq!(fired_set, active_values);
            prop_assert!(wheel.is_empty());
        }

        /// Fuzz poll timing.
        ///
        /// Schedule N timers with random deadlines. Poll at random increasing
        /// times. Assert that no timer fires before its deadline and that,
        /// after a final far-future poll, every timer has fired exactly once.
        /// The test does not check how promptly a due timer fires.
        #[test]
        fn fuzz_poll_timing(
            deadlines in proptest::collection::vec(1u64..5000, 1..100),
            poll_times in proptest::collection::vec(1u64..10_000, 1..20),
        ) {
            let now = Instant::now();
            let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);

            // Schedule all timers (fire-and-forget); the payload is the
            // timer's index into `deadlines`.
            for (i, &d) in deadlines.iter().enumerate() {
                wheel.schedule_forget(now + Duration::from_millis(d), i as u64);
            }

            // Sort poll times to be monotonically increasing
            let mut sorted_times: Vec<u64> = poll_times;
            sorted_times.sort_unstable();
            sorted_times.dedup();

            let mut all_fired: Vec<u64> = Vec::new();

            for &t in &sorted_times {
                let mut buf = Vec::new();
                wheel.poll(now + Duration::from_millis(t), &mut buf);

                // Every fired entry should have deadline_ms <= t
                for &id in &buf {
                    let deadline_ms = deadlines[id as usize];
                    prop_assert!(deadline_ms <= t,
                        "Timer {} with deadline {}ms fired at {}ms", id, deadline_ms, t);
                }

                all_fired.extend(buf);
            }

            // Fire everything remaining
            let mut final_buf = Vec::new();
            wheel.poll(now + Duration::from_secs(100_000), &mut final_buf);
            all_fired.extend(final_buf);

            // Every timer should have fired exactly once: sorted ids must be
            // exactly 0..N with no gaps or duplicates.
            all_fired.sort_unstable();
            let expected: Vec<u64> = (0..deadlines.len() as u64).collect();
            prop_assert_eq!(all_fired, expected, "Not all timers fired exactly once");
            prop_assert!(wheel.is_empty());
        }
    }
}