Skip to main content

nexus_timer/
wheel.rs

1//! Timer wheel — the main data structure.
2//!
3//! `TimerWheel<T, S>` is a multi-level, no-cascade timer wheel. Entries are
4//! placed into a level based on how far in the future their deadline is.
5//! Once placed, an entry never moves — poll checks `deadline_ticks <= now`
6//! per entry.
7
8use std::marker::PhantomData;
9use std::mem;
10use std::time::{Duration, Instant};
11
12use nexus_slab::{Full, bounded, unbounded};
13
14use crate::entry::{EntryPtr, WheelEntry, entry_ref};
15use crate::handle::TimerHandle;
16use crate::level::Level;
17use crate::store::{BoundedStore, SlabStore, UnboundedStore};
18
19// =============================================================================
20// WheelBuilder (typestate)
21// =============================================================================
22
23/// Builder for configuring a timer wheel.
24///
25/// Defaults match the Linux kernel timer wheel (1ms tick, 64 slots/level,
26/// 8x multiplier, 7 levels → ~4.7 hour range).
27///
28/// # Examples
29///
30/// ```
31/// use std::time::{Duration, Instant};
32/// use nexus_timer::{Wheel, WheelBuilder};
33///
34/// let now = Instant::now();
35///
36/// // All defaults
37/// let wheel: Wheel<u64> = WheelBuilder::default().unbounded(4096).build(now);
38///
39/// // Custom config
40/// let wheel: Wheel<u64> = WheelBuilder::default()
41///     .tick_duration(Duration::from_micros(100))
42///     .slots_per_level(32)
43///     .unbounded(4096)
44///     .build(now);
45/// ```
46#[derive(Debug, Clone, Copy)]
47pub struct WheelBuilder {
48    tick_duration: Duration,
49    slots_per_level: usize,
50    clk_shift: u32,
51    num_levels: usize,
52}
53
54impl Default for WheelBuilder {
55    fn default() -> Self {
56        WheelBuilder {
57            tick_duration: Duration::from_millis(1),
58            slots_per_level: 64,
59            clk_shift: 3,
60            num_levels: 7,
61        }
62    }
63}
64
65impl WheelBuilder {
66    /// Creates a new builder with default configuration.
67    pub fn new() -> Self {
68        Self::default()
69    }
70
71    /// Sets the tick duration. Default: 1ms.
72    pub fn tick_duration(mut self, d: Duration) -> Self {
73        self.tick_duration = d;
74        self
75    }
76
77    /// Sets the number of slots per level. Must be a power of 2. Default: 64.
78    pub fn slots_per_level(mut self, n: usize) -> Self {
79        self.slots_per_level = n;
80        self
81    }
82
83    /// Sets the bit shift between levels (multiplier = 2^clk_shift). Default: 3 (8x).
84    pub fn clk_shift(mut self, s: u32) -> Self {
85        self.clk_shift = s;
86        self
87    }
88
89    /// Sets the number of levels. Default: 7.
90    pub fn num_levels(mut self, n: usize) -> Self {
91        self.num_levels = n;
92        self
93    }
94
95    /// Transitions to an unbounded wheel builder.
96    ///
97    /// `chunk_capacity` is the slab chunk size (entries per chunk). The slab
98    /// grows by adding new chunks as needed.
99    pub fn unbounded(self, chunk_capacity: usize) -> UnboundedWheelBuilder {
100        UnboundedWheelBuilder {
101            config: self,
102            chunk_capacity,
103        }
104    }
105
106    /// Transitions to a bounded wheel builder.
107    ///
108    /// `capacity` is the maximum number of concurrent timers.
109    pub fn bounded(self, capacity: usize) -> BoundedWheelBuilder {
110        BoundedWheelBuilder {
111            config: self,
112            capacity,
113        }
114    }
115
116    fn validate(&self) {
117        assert!(
118            self.slots_per_level.is_power_of_two(),
119            "slots_per_level must be a power of 2, got {}",
120            self.slots_per_level
121        );
122        assert!(
123            self.slots_per_level <= 64,
124            "slots_per_level must be <= 64 (u64 bitmask), got {}",
125            self.slots_per_level
126        );
127        assert!(self.num_levels > 0, "num_levels must be > 0");
128        assert!(
129            self.num_levels <= 8,
130            "num_levels must be <= 8 (u8 bitmask), got {}",
131            self.num_levels
132        );
133        assert!(self.clk_shift > 0, "clk_shift must be > 0");
134        assert!(
135            !self.tick_duration.is_zero(),
136            "tick_duration must be non-zero"
137        );
138        let max_shift = (self.num_levels - 1) as u64 * self.clk_shift as u64;
139        assert!(
140            max_shift < 64,
141            "(num_levels - 1) * clk_shift must be < 64, got {}",
142            max_shift
143        );
144        let slots_log2 = self.slots_per_level.trailing_zeros() as u64;
145        assert!(
146            slots_log2 + max_shift < 64,
147            "slots_per_level << max_shift would overflow u64"
148        );
149    }
150
151    fn tick_ns(&self) -> u64 {
152        self.tick_duration.as_nanos() as u64
153    }
154}
155
156/// Terminal builder for an unbounded timer wheel.
157///
158/// Created via [`WheelBuilder::unbounded`]. The only method is `.build()`.
159#[derive(Debug)]
160pub struct UnboundedWheelBuilder {
161    config: WheelBuilder,
162    chunk_capacity: usize,
163}
164
165impl UnboundedWheelBuilder {
166    /// Builds the unbounded timer wheel.
167    ///
168    /// # Panics
169    ///
170    /// Panics if the configuration is invalid (non-power-of-2 slots, zero
171    /// levels, zero clk_shift, or zero tick duration).
172    pub fn build<T: 'static>(self, now: Instant) -> Wheel<T> {
173        self.config.validate();
174        let slab = unbounded::Slab::with_chunk_capacity(self.chunk_capacity);
175        let levels = build_levels::<T>(&self.config);
176        TimerWheel {
177            slab,
178            num_levels: self.config.num_levels,
179            levels,
180            current_ticks: 0,
181            tick_ns: self.config.tick_ns(),
182            epoch: now,
183            active_levels: 0,
184            len: 0,
185            _marker: PhantomData,
186        }
187    }
188}
189
190/// Terminal builder for a bounded timer wheel.
191///
192/// Created via [`WheelBuilder::bounded`]. The only method is `.build()`.
193#[derive(Debug)]
194pub struct BoundedWheelBuilder {
195    config: WheelBuilder,
196    capacity: usize,
197}
198
199impl BoundedWheelBuilder {
200    /// Builds the bounded timer wheel.
201    ///
202    /// # Panics
203    ///
204    /// Panics if the configuration is invalid (non-power-of-2 slots, zero
205    /// levels, zero clk_shift, or zero tick duration).
206    pub fn build<T: 'static>(self, now: Instant) -> BoundedWheel<T> {
207        self.config.validate();
208        let slab = bounded::Slab::with_capacity(self.capacity);
209        let levels = build_levels::<T>(&self.config);
210        TimerWheel {
211            slab,
212            num_levels: self.config.num_levels,
213            levels,
214            current_ticks: 0,
215            tick_ns: self.config.tick_ns(),
216            epoch: now,
217            active_levels: 0,
218            len: 0,
219            _marker: PhantomData,
220        }
221    }
222}
223
224// =============================================================================
225// TimerWheel
226// =============================================================================
227
228/// A multi-level, no-cascade timer wheel.
229///
230/// Generic over:
231/// - `T` — the user payload stored with each timer.
232/// - `S` — the slab storage backend. Defaults to `unbounded::Slab`.
233///
234/// # Thread Safety
235///
236/// `Send` but `!Sync`. Can be moved to a thread at setup but must not
237/// be shared. All internal raw pointers point into owned allocations
238/// (slab chunks, level slot arrays) — moving the wheel moves the heap
239/// data with it.
240pub struct TimerWheel<
241    T: 'static,
242    S: SlabStore<Item = WheelEntry<T>> = unbounded::Slab<WheelEntry<T>>,
243> {
244    slab: S,
245    levels: Vec<Level<T>>,
246    num_levels: usize,
247    active_levels: u8,
248    current_ticks: u64,
249    tick_ns: u64,
250    epoch: Instant,
251    len: usize,
252    _marker: PhantomData<*const ()>, // !Send (overridden below), !Sync
253}
254
255// SAFETY: TimerWheel<T, S> exclusively owns all memory behind its raw pointers.
256//
257// Pointer inventory and ownership:
258// - Slot `entry_head`/`entry_tail` — point into slab-owned memory (SlotCell
259//   in a slab chunk). Slab chunks are Vec<SlotCell<T>> heap allocations.
260// - DLL links (`WheelEntry::prev`, `WheelEntry::next`) — point to other
261//   SlotCells in the same slab.
262// - `Level::slots` — `Box<[WheelSlot<T>]>`, owned by the level.
263//
264// All pointed-to memory lives inside owned collections (Vec, Box<[T]>).
265// When TimerWheel is moved, the heap allocations stay at their addresses —
266// the internal pointers remain valid. No thread-local state. No shared
267// ownership.
268//
269// T: Send is required because timer values cross the thread boundary with
270// the wheel.
271//
272// Outstanding TimerHandle<T> values are !Send and cannot follow the wheel
273// across threads. They become inert — consuming them requires &mut
274// TimerWheel which the original thread no longer has. The debug_assert in
275// TimerHandle::drop catches this as a programming error. Worst case is a
276// slot leak (refcount stuck at 1), not unsoundness.
277#[allow(clippy::non_send_fields_in_send_ty)]
278unsafe impl<T: Send + 'static, S: SlabStore<Item = WheelEntry<T>>> Send for TimerWheel<T, S> {}
279
280/// A timer wheel backed by a fixed-capacity slab.
281pub type BoundedWheel<T> = TimerWheel<T, bounded::Slab<WheelEntry<T>>>;
282
283/// A timer wheel backed by a growable slab.
284pub type Wheel<T> = TimerWheel<T, unbounded::Slab<WheelEntry<T>>>;
285
286// =============================================================================
287// Construction
288// =============================================================================
289
290impl<T: 'static> Wheel<T> {
291    /// Creates an unbounded timer wheel with default configuration.
292    ///
293    /// For custom configuration, use [`WheelBuilder`].
294    pub fn unbounded(chunk_capacity: usize, now: Instant) -> Self {
295        WheelBuilder::default().unbounded(chunk_capacity).build(now)
296    }
297}
298
299impl<T: 'static> BoundedWheel<T> {
300    /// Creates a bounded timer wheel with default configuration.
301    ///
302    /// For custom configuration, use [`WheelBuilder`].
303    pub fn bounded(capacity: usize, now: Instant) -> Self {
304        WheelBuilder::default().bounded(capacity).build(now)
305    }
306}
307
308fn build_levels<T: 'static>(config: &WheelBuilder) -> Vec<Level<T>> {
309    (0..config.num_levels)
310        .map(|i| Level::new(config.slots_per_level, i, config.clk_shift))
311        .collect()
312}
313
314// =============================================================================
315// Schedule — unbounded (always succeeds)
316// =============================================================================
317
318impl<T: 'static, S: UnboundedStore<Item = WheelEntry<T>> + SlabStore<Item = WheelEntry<T>>>
319    TimerWheel<T, S>
320{
321    /// Schedules a timer and returns a handle for cancellation.
322    ///
323    /// The handle must be consumed via [`cancel`](Self::cancel) or
324    /// [`free`](Self::free). Dropping it is a programming error.
325    pub fn schedule(&mut self, deadline: Instant, value: T) -> TimerHandle<T> {
326        let deadline_ticks = self.instant_to_ticks(deadline);
327        let entry = WheelEntry::new(deadline_ticks, value, 2);
328        let slot = self.slab.alloc(entry);
329        let ptr = slot.as_ptr();
330        self.insert_entry(ptr, deadline_ticks);
331        self.len += 1;
332        TimerHandle::new(ptr)
333    }
334
335    /// Schedules a fire-and-forget timer (no handle returned).
336    ///
337    /// The timer will fire during poll and the value will be collected.
338    /// Cannot be cancelled.
339    pub fn schedule_forget(&mut self, deadline: Instant, value: T) {
340        let deadline_ticks = self.instant_to_ticks(deadline);
341        let entry = WheelEntry::new(deadline_ticks, value, 1);
342        let slot = self.slab.alloc(entry);
343        let ptr = slot.as_ptr();
344        self.insert_entry(ptr, deadline_ticks);
345        self.len += 1;
346    }
347}
348
349// =============================================================================
350// Schedule — bounded (can fail)
351// =============================================================================
352
353impl<T: 'static, S: BoundedStore<Item = WheelEntry<T>> + SlabStore<Item = WheelEntry<T>>>
354    TimerWheel<T, S>
355{
356    /// Attempts to schedule a timer, returning a handle on success.
357    ///
358    /// Returns `Err(Full(value))` if the slab is at capacity.
359    pub fn try_schedule(&mut self, deadline: Instant, value: T) -> Result<TimerHandle<T>, Full<T>> {
360        let deadline_ticks = self.instant_to_ticks(deadline);
361        let entry = WheelEntry::new(deadline_ticks, value, 2);
362        match self.slab.try_alloc(entry) {
363            Ok(slot) => {
364                let ptr = slot.as_ptr();
365                self.insert_entry(ptr, deadline_ticks);
366                self.len += 1;
367                Ok(TimerHandle::new(ptr))
368            }
369            Err(full) => {
370                // Extract the user's T from the WheelEntry wrapper
371                // SAFETY: we just constructed this entry, take_value is valid
372                let wheel_entry = full.into_inner();
373                let value = unsafe { wheel_entry.take_value() }
374                    .expect("entry was just constructed with Some(value)");
375                Err(Full(value))
376            }
377        }
378    }
379
380    /// Attempts to schedule a fire-and-forget timer.
381    ///
382    /// Returns `Err(Full(value))` if the slab is at capacity.
383    pub fn try_schedule_forget(&mut self, deadline: Instant, value: T) -> Result<(), Full<T>> {
384        let deadline_ticks = self.instant_to_ticks(deadline);
385        let entry = WheelEntry::new(deadline_ticks, value, 1);
386        match self.slab.try_alloc(entry) {
387            Ok(slot) => {
388                let ptr = slot.as_ptr();
389                self.insert_entry(ptr, deadline_ticks);
390                self.len += 1;
391                Ok(())
392            }
393            Err(full) => {
394                let wheel_entry = full.into_inner();
395                let value = unsafe { wheel_entry.take_value() }
396                    .expect("entry was just constructed with Some(value)");
397                Err(Full(value))
398            }
399        }
400    }
401}
402
403// =============================================================================
404// Cancel / Free / Poll / Query — generic over any store
405// =============================================================================
406
407impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
408    /// Cancels a timer and returns its value.
409    ///
410    /// - If the timer is still active: unlinks from the wheel, extracts value,
411    ///   frees the slab entry. Returns `Some(T)`.
412    /// - If the timer already fired (zombie handle): frees the slab entry.
413    ///   Returns `None`.
414    ///
415    /// Consumes the handle (no Drop runs).
416    pub fn cancel(&mut self, handle: TimerHandle<T>) -> Option<T> {
417        let ptr = handle.ptr;
418        // Consume handle without running Drop
419        mem::forget(handle);
420
421        // SAFETY: handle guarantees ptr is valid and allocated from our slab.
422        let entry = unsafe { entry_ref(ptr) };
423        let refs = entry.refs();
424
425        if refs == 2 {
426            // Active timer with handle — unlink, extract, free
427            let value = unsafe { entry.take_value() };
428            self.remove_entry(ptr);
429            self.len -= 1;
430            // SAFETY: ptr was allocated from our slab, entry is now spent
431            unsafe { self.slab.free_ptr(ptr) };
432            value
433        } else {
434            // refs == 1 means the wheel already fired this (zombie).
435            // The fire path decremented 2→1 and left the entry for us to free.
436            debug_assert_eq!(refs, 1, "unexpected refcount {refs} in cancel");
437            // SAFETY: ptr was allocated from our slab
438            unsafe { self.slab.free_ptr(ptr) };
439            None
440        }
441    }
442
443    /// Releases a timer handle without cancelling.
444    ///
445    /// - If the timer is still active: converts to fire-and-forget (refs 2→1).
446    ///   Timer stays in the wheel and will fire normally during poll.
447    /// - If the timer already fired (zombie): frees the slab entry (refs 1→0).
448    ///
449    /// Consumes the handle (no Drop runs).
450    pub fn free(&mut self, handle: TimerHandle<T>) {
451        let ptr = handle.ptr;
452        mem::forget(handle);
453
454        // SAFETY: handle guarantees ptr is valid
455        let entry = unsafe { entry_ref(ptr) };
456        let new_refs = entry.dec_refs();
457
458        if new_refs == 0 {
459            // Was a zombie (fired already, refs was 1) — free the entry
460            // SAFETY: ptr was allocated from our slab
461            unsafe { self.slab.free_ptr(ptr) };
462        }
463        // new_refs == 1: timer is now fire-and-forget, stays in wheel
464    }
465
466    /// Reschedules an active timer to a new deadline.
467    ///
468    /// Moves the entry from its current slot to the correct slot for
469    /// `new_deadline` without extracting or reconstructing the value.
470    ///
471    /// # Panics
472    ///
473    /// Panics if the timer has already fired (zombie handle). Only active
474    /// timers (refs == 2) can be rescheduled.
475    ///
476    /// Consumes and returns a new handle (same entry, new position).
477    pub fn reschedule(&mut self, handle: TimerHandle<T>, new_deadline: Instant) -> TimerHandle<T> {
478        let ptr = handle.ptr;
479        mem::forget(handle);
480
481        // SAFETY: handle guarantees ptr is valid
482        let entry = unsafe { entry_ref(ptr) };
483        assert_eq!(entry.refs(), 2, "cannot reschedule a fired timer");
484
485        // Remove from current position
486        self.remove_entry(ptr);
487
488        // Update deadline and reinsert
489        let new_ticks = self.instant_to_ticks(new_deadline);
490        entry.set_deadline_ticks(new_ticks);
491        self.insert_entry(ptr, new_ticks);
492
493        TimerHandle::new(ptr)
494    }
495
496    /// Fires all expired timers, collecting their values into `buf`.
497    ///
498    /// Returns the number of timers fired.
499    pub fn poll(&mut self, now: Instant, buf: &mut Vec<T>) -> usize {
500        self.poll_with_limit(now, usize::MAX, buf)
501    }
502
503    /// Fires expired timers up to `limit`, collecting values into `buf`.
504    ///
505    /// Resumable: if the limit is hit, the next call continues where this one
506    /// left off (as long as `now` hasn't changed).
507    ///
508    /// Returns the number of timers fired in this call.
509    pub fn poll_with_limit(&mut self, now: Instant, limit: usize, buf: &mut Vec<T>) -> usize {
510        let now_ticks = self.instant_to_ticks(now);
511        self.current_ticks = now_ticks;
512
513        let mut fired = 0;
514        let mut mask = self.active_levels;
515
516        while mask != 0 && fired < limit {
517            let lvl_idx = mask.trailing_zeros() as usize;
518            mask &= mask - 1; // clear lowest set bit
519            fired += self.poll_level(lvl_idx, now_ticks, limit - fired, buf);
520        }
521        fired
522    }
523
524    /// Returns the `Instant` of the next timer that will fire, or `None` if empty.
525    ///
526    /// Walks only active (non-empty) slots. O(active_slots) in the worst case,
527    /// but typically very fast because most slots are empty.
528    pub fn next_deadline(&self) -> Option<Instant> {
529        let mut min_ticks: Option<u64> = None;
530
531        let mut lvl_mask = self.active_levels;
532        while lvl_mask != 0 {
533            let lvl_idx = lvl_mask.trailing_zeros() as usize;
534            lvl_mask &= lvl_mask - 1;
535
536            let level = &self.levels[lvl_idx];
537            let mut slot_mask = level.active_slots();
538            while slot_mask != 0 {
539                let slot_idx = slot_mask.trailing_zeros() as usize;
540                slot_mask &= slot_mask - 1;
541
542                let slot = level.slot(slot_idx);
543                let mut entry_ptr = slot.entry_head();
544
545                while !entry_ptr.is_null() {
546                    // SAFETY: entry_ptr is in this slot's DLL
547                    let entry = unsafe { entry_ref(entry_ptr) };
548                    let dt = entry.deadline_ticks();
549                    min_ticks = Some(min_ticks.map_or(dt, |current| current.min(dt)));
550                    entry_ptr = entry.next();
551                }
552            }
553        }
554
555        min_ticks.map(|t| self.ticks_to_instant(t))
556    }
557
558    /// Returns the number of timers currently in the wheel.
559    #[inline]
560    pub fn len(&self) -> usize {
561        self.len
562    }
563
564    /// Returns true if the wheel contains no timers.
565    #[inline]
566    pub fn is_empty(&self) -> bool {
567        self.len == 0
568    }
569
570    // =========================================================================
571    // Internal: tick conversion
572    // =========================================================================
573
574    #[inline]
575    fn instant_to_ticks(&self, instant: Instant) -> u64 {
576        // Saturate at 0 for instants before epoch
577        let dur = instant.saturating_duration_since(self.epoch);
578        dur.as_nanos() as u64 / self.tick_ns
579    }
580
581    #[inline]
582    fn ticks_to_instant(&self, ticks: u64) -> Instant {
583        self.epoch + Duration::from_nanos(ticks * self.tick_ns)
584    }
585
586    // =========================================================================
587    // Internal: level selection
588    // =========================================================================
589
590    /// Selects the appropriate level for a deadline.
591    ///
592    /// Walks levels from finest to coarsest, picking the first level whose
593    /// range can represent the delta. Clamps to the highest level if the
594    /// deadline exceeds the wheel's total range.
595    #[inline]
596    fn select_level(&self, deadline_ticks: u64) -> usize {
597        let delta = deadline_ticks.saturating_sub(self.current_ticks);
598
599        for (i, level) in self.levels.iter().enumerate() {
600            if delta < level.range() {
601                return i;
602            }
603        }
604
605        // Beyond max range — clamp to highest level
606        self.num_levels - 1
607    }
608
609    // =========================================================================
610    // Internal: entry insertion into a level's slot
611    // =========================================================================
612
613    /// Inserts an entry into the appropriate level and slot.
614    ///
615    /// Records the level and slot index on the entry so `remove_entry` can
616    /// find it without recomputing (which would be unsound after time advances).
617    #[inline]
618    #[allow(clippy::needless_pass_by_ref_mut)]
619    fn insert_entry(&mut self, entry_ptr: EntryPtr<T>, deadline_ticks: u64) {
620        let lvl_idx = self.select_level(deadline_ticks);
621        let slot_idx = self.levels[lvl_idx].slot_index(deadline_ticks);
622
623        // Record location on the entry for O(1) lookup at cancel time.
624        // SAFETY: entry_ptr is valid (just allocated)
625        let entry = unsafe { entry_ref(entry_ptr) };
626        entry.set_location(lvl_idx as u8, slot_idx as u16);
627
628        // SAFETY: entry_ptr is valid (just allocated), not in any DLL yet
629        unsafe { self.levels[lvl_idx].slot(slot_idx).push_entry(entry_ptr) };
630
631        // Activate slot and level (idempotent — OR is a no-op if already set)
632        self.levels[lvl_idx].activate_slot(slot_idx);
633        self.active_levels |= 1 << lvl_idx;
634    }
635
636    /// Removes an entry from its level's slot DLL.
637    ///
638    /// Reads the stored level and slot index from the entry (set at insertion
639    /// time). Does NOT recompute from delta — that would be unsound after
640    /// `current_ticks` advances.
641    #[inline]
642    #[allow(clippy::needless_pass_by_ref_mut)]
643    fn remove_entry(&mut self, entry_ptr: EntryPtr<T>) {
644        // SAFETY: entry_ptr is valid (caller guarantee)
645        let entry = unsafe { entry_ref(entry_ptr) };
646
647        let lvl_idx = entry.level() as usize;
648        let slot_idx = entry.slot_idx() as usize;
649
650        // SAFETY: entry_ptr is in this slot's DLL (invariant from insert_entry)
651        unsafe { self.levels[lvl_idx].slot(slot_idx).remove_entry(entry_ptr) };
652
653        if self.levels[lvl_idx].slot(slot_idx).is_empty() {
654            self.levels[lvl_idx].deactivate_slot(slot_idx);
655            if !self.levels[lvl_idx].is_active() {
656                self.active_levels &= !(1 << lvl_idx);
657            }
658        }
659    }
660
661    // =========================================================================
662    // Internal: fire an entry
663    // =========================================================================
664
665    /// Fires a single entry: extracts value, decrements refcount, possibly frees.
666    ///
667    /// Returns `Some(T)` if the value was still present (not already cancelled).
668    #[inline]
669    fn fire_entry(&mut self, entry_ptr: EntryPtr<T>) -> Option<T> {
670        // SAFETY: entry_ptr is valid (we're walking the DLL)
671        let entry = unsafe { entry_ref(entry_ptr) };
672
673        // Extract value
674        // SAFETY: single-threaded
675        let value = unsafe { entry.take_value() };
676
677        let new_refs = entry.dec_refs();
678        if new_refs == 0 {
679            // Fire-and-forget (was refs=1) — free the slab entry immediately
680            // SAFETY: entry_ptr was allocated from our slab
681            unsafe { self.slab.free_ptr(entry_ptr) };
682        }
683        // new_refs == 1: handle exists (was refs=2), entry becomes zombie.
684        // Handle holder will free via cancel() or free().
685
686        self.len -= 1;
687        value
688    }
689
690    // =========================================================================
691    // Internal: poll a single level
692    // =========================================================================
693
694    /// Polls a single level for expired entries up to `limit`.
695    ///
696    fn poll_level(
697        &mut self,
698        lvl_idx: usize,
699        now_ticks: u64,
700        limit: usize,
701        buf: &mut Vec<T>,
702    ) -> usize {
703        let mut fired = 0;
704        let mut mask = self.levels[lvl_idx].active_slots();
705
706        while mask != 0 && fired < limit {
707            let slot_idx = mask.trailing_zeros() as usize;
708            mask &= mask - 1;
709
710            let slot_ptr = self.levels[lvl_idx].slot(slot_idx) as *const crate::level::WheelSlot<T>;
711            // SAFETY: slot_ptr points into self.levels[lvl_idx].slots
712            // (Box<[WheelSlot<T>]>), a stable heap allocation. fire_entry
713            // only mutates self.slab and self.len, not self.levels.
714            let slot = unsafe { &*slot_ptr };
715            let mut entry_ptr = slot.entry_head();
716
717            while !entry_ptr.is_null() && fired < limit {
718                // SAFETY: entry_ptr is in this slot's DLL
719                let entry = unsafe { entry_ref(entry_ptr) };
720                let next_entry = entry.next();
721
722                if entry.deadline_ticks() <= now_ticks {
723                    unsafe { slot.remove_entry(entry_ptr) };
724
725                    if let Some(value) = self.fire_entry(entry_ptr) {
726                        buf.push(value);
727                    }
728                    fired += 1;
729                }
730
731                entry_ptr = next_entry;
732            }
733
734            if slot.is_empty() {
735                self.levels[lvl_idx].deactivate_slot(slot_idx);
736            }
737        }
738
739        // Deactivate level if all slots drained
740        if !self.levels[lvl_idx].is_active() {
741            self.active_levels &= !(1 << lvl_idx);
742        }
743
744        fired
745    }
746}
747
748// =============================================================================
749// Drop
750// =============================================================================
751
752impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> Drop for TimerWheel<T, S> {
753    fn drop(&mut self) {
754        // Walk active levels and slots via bitmasks, free every entry.
755        let mut lvl_mask = self.active_levels;
756        while lvl_mask != 0 {
757            let lvl_idx = lvl_mask.trailing_zeros() as usize;
758            lvl_mask &= lvl_mask - 1;
759
760            let level = &self.levels[lvl_idx];
761            let mut slot_mask = level.active_slots();
762            while slot_mask != 0 {
763                let slot_idx = slot_mask.trailing_zeros() as usize;
764                slot_mask &= slot_mask - 1;
765
766                let slot = level.slot(slot_idx);
767                let mut entry_ptr = slot.entry_head();
768                while !entry_ptr.is_null() {
769                    // SAFETY: entry_ptr is in this slot's DLL
770                    let entry = unsafe { entry_ref(entry_ptr) };
771                    let next_entry = entry.next();
772
773                    // SAFETY: entry_ptr was allocated from our slab
774                    unsafe { self.slab.free(nexus_slab::Slot::from_ptr(entry_ptr)) };
775
776                    entry_ptr = next_entry;
777                }
778            }
779        }
780    }
781}
782
783// =============================================================================
784// Tests
785// =============================================================================
786
787#[cfg(test)]
788mod tests {
789    use super::*;
790    use std::time::{Duration, Instant};
791
792    fn ms(millis: u64) -> Duration {
793        Duration::from_millis(millis)
794    }
795
796    // -------------------------------------------------------------------------
797    // Thread safety
798    // -------------------------------------------------------------------------
799
800    fn _assert_send<T: Send>() {}
801
802    #[test]
803    fn wheel_is_send() {
804        _assert_send::<Wheel<u64>>();
805        _assert_send::<BoundedWheel<u64>>();
806    }
807
808    // -------------------------------------------------------------------------
809    // Construction
810    // -------------------------------------------------------------------------
811
812    #[test]
813    fn default_config() {
814        let now = Instant::now();
815        let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
816        assert!(wheel.is_empty());
817        assert_eq!(wheel.len(), 0);
818    }
819
820    #[test]
821    fn bounded_construction() {
822        let now = Instant::now();
823        let wheel: BoundedWheel<u64> = BoundedWheel::bounded(128, now);
824        assert!(wheel.is_empty());
825    }
826
827    #[test]
828    #[should_panic(expected = "slots_per_level must be a power of 2")]
829    fn invalid_config_non_power_of_two() {
830        let now = Instant::now();
831        WheelBuilder::default()
832            .slots_per_level(65)
833            .unbounded(1024)
834            .build::<u64>(now);
835    }
836
837    // -------------------------------------------------------------------------
838    // Schedule + Cancel
839    // -------------------------------------------------------------------------
840
841    #[test]
842    fn schedule_and_cancel() {
843        let now = Instant::now();
844        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
845
846        let h = wheel.schedule(now + ms(50), 42);
847        assert_eq!(wheel.len(), 1);
848
849        let val = wheel.cancel(h);
850        assert_eq!(val, Some(42));
851        assert_eq!(wheel.len(), 0);
852    }
853
854    #[test]
855    fn schedule_forget_fires() {
856        let now = Instant::now();
857        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
858
859        wheel.schedule_forget(now + ms(10), 99);
860        assert_eq!(wheel.len(), 1);
861
862        let mut buf = Vec::new();
863        let fired = wheel.poll(now + ms(20), &mut buf);
864        assert_eq!(fired, 1);
865        assert_eq!(buf, vec![99]);
866        assert_eq!(wheel.len(), 0);
867    }
868
869    #[test]
870    fn cancel_after_fire_returns_none() {
871        let now = Instant::now();
872        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
873
874        let h = wheel.schedule(now + ms(10), 42);
875
876        let mut buf = Vec::new();
877        wheel.poll(now + ms(20), &mut buf);
878        assert_eq!(buf, vec![42]);
879
880        // Handle is now a zombie
881        let val = wheel.cancel(h);
882        assert_eq!(val, None);
883    }
884
885    #[test]
886    fn free_active_timer_becomes_fire_and_forget() {
887        let now = Instant::now();
888        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
889
890        let h = wheel.schedule(now + ms(10), 42);
891        wheel.free(h); // releases handle, timer stays
892        assert_eq!(wheel.len(), 1);
893
894        let mut buf = Vec::new();
895        wheel.poll(now + ms(20), &mut buf);
896        assert_eq!(buf, vec![42]);
897        assert_eq!(wheel.len(), 0);
898    }
899
900    #[test]
901    fn free_zombie_handle() {
902        let now = Instant::now();
903        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
904
905        let h = wheel.schedule(now + ms(10), 42);
906
907        let mut buf = Vec::new();
908        wheel.poll(now + ms(20), &mut buf);
909
910        // Handle is zombie, free should clean up
911        wheel.free(h);
912    }
913
914    // -------------------------------------------------------------------------
915    // Bounded wheel
916    // -------------------------------------------------------------------------
917
918    #[test]
919    fn bounded_full() {
920        let now = Instant::now();
921        let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(2, now);
922
923        let h1 = wheel.try_schedule(now + ms(10), 1).unwrap();
924        let h2 = wheel.try_schedule(now + ms(20), 2).unwrap();
925
926        let err = wheel.try_schedule(now + ms(30), 3);
927        assert!(err.is_err());
928        let recovered = err.unwrap_err().into_inner();
929        assert_eq!(recovered, 3);
930
931        // Cancel one, should have room
932        wheel.cancel(h1);
933        let h3 = wheel.try_schedule(now + ms(30), 3).unwrap();
934
935        // Clean up handles
936        wheel.free(h2);
937        wheel.free(h3);
938    }
939
940    #[test]
941    fn bounded_schedule_forget_full() {
942        let now = Instant::now();
943        let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(1, now);
944
945        wheel.try_schedule_forget(now + ms(10), 1).unwrap();
946        let err = wheel.try_schedule_forget(now + ms(20), 2);
947        assert!(err.is_err());
948    }
949
950    // -------------------------------------------------------------------------
951    // Poll
952    // -------------------------------------------------------------------------
953
954    #[test]
955    fn poll_respects_deadline() {
956        let now = Instant::now();
957        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
958
959        wheel.schedule_forget(now + ms(10), 1);
960        wheel.schedule_forget(now + ms(50), 2);
961        wheel.schedule_forget(now + ms(100), 3);
962
963        let mut buf = Vec::new();
964
965        // At 20ms: only timer 1 should fire
966        let fired = wheel.poll(now + ms(20), &mut buf);
967        assert_eq!(fired, 1);
968        assert_eq!(buf, vec![1]);
969        assert_eq!(wheel.len(), 2);
970
971        // At 60ms: timer 2 fires
972        buf.clear();
973        let fired = wheel.poll(now + ms(60), &mut buf);
974        assert_eq!(fired, 1);
975        assert_eq!(buf, vec![2]);
976
977        // At 200ms: timer 3 fires
978        buf.clear();
979        let fired = wheel.poll(now + ms(200), &mut buf);
980        assert_eq!(fired, 1);
981        assert_eq!(buf, vec![3]);
982
983        assert!(wheel.is_empty());
984    }
985
986    #[test]
987    fn poll_with_limit() {
988        let now = Instant::now();
989        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
990
991        for i in 0..10 {
992            wheel.schedule_forget(now + ms(1), i);
993        }
994
995        let mut buf = Vec::new();
996
997        // Fire 3 at a time
998        let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
999        assert_eq!(fired, 3);
1000        assert_eq!(wheel.len(), 7);
1001
1002        let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1003        assert_eq!(fired, 3);
1004        assert_eq!(wheel.len(), 4);
1005
1006        // Fire remaining
1007        let fired = wheel.poll(now + ms(5), &mut buf);
1008        assert_eq!(fired, 4);
1009        assert!(wheel.is_empty());
1010        assert_eq!(buf.len(), 10);
1011    }
1012
1013    // -------------------------------------------------------------------------
1014    // Multi-level
1015    // -------------------------------------------------------------------------
1016
1017    #[test]
1018    fn timers_across_levels() {
1019        let now = Instant::now();
1020        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1021
1022        // Level 0: 0-63ms
1023        wheel.schedule_forget(now + ms(5), 0);
1024        // Level 1: 64-511ms
1025        wheel.schedule_forget(now + ms(200), 1);
1026        // Level 2: 512-4095ms
1027        wheel.schedule_forget(now + ms(1000), 2);
1028
1029        let mut buf = Vec::new();
1030
1031        wheel.poll(now + ms(10), &mut buf);
1032        assert_eq!(buf, vec![0]);
1033
1034        buf.clear();
1035        wheel.poll(now + ms(250), &mut buf);
1036        assert_eq!(buf, vec![1]);
1037
1038        buf.clear();
1039        wheel.poll(now + ms(1500), &mut buf);
1040        assert_eq!(buf, vec![2]);
1041
1042        assert!(wheel.is_empty());
1043    }
1044
1045    // -------------------------------------------------------------------------
1046    // next_deadline
1047    // -------------------------------------------------------------------------
1048
1049    #[test]
1050    fn next_deadline_empty() {
1051        let now = Instant::now();
1052        let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1053        assert!(wheel.next_deadline().is_none());
1054    }
1055
1056    #[test]
1057    fn next_deadline_returns_earliest() {
1058        let now = Instant::now();
1059        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1060
1061        wheel.schedule_forget(now + ms(100), 1);
1062        wheel.schedule_forget(now + ms(50), 2);
1063        wheel.schedule_forget(now + ms(200), 3);
1064
1065        let next = wheel.next_deadline().unwrap();
1066        // Should be close to now + 50ms (within tick granularity)
1067        let delta = next.duration_since(now);
1068        assert!(delta >= ms(49) && delta <= ms(51));
1069    }
1070
1071    // -------------------------------------------------------------------------
1072    // Deadline in the past
1073    // -------------------------------------------------------------------------
1074
1075    #[test]
1076    fn deadline_in_the_past_fires_immediately() {
1077        let now = Instant::now();
1078        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1079
1080        // Schedule at epoch (which is "now" at construction)
1081        wheel.schedule_forget(now, 42);
1082
1083        let mut buf = Vec::new();
1084        let fired = wheel.poll(now + ms(1), &mut buf);
1085        assert_eq!(fired, 1);
1086        assert_eq!(buf, vec![42]);
1087    }
1088
1089    // -------------------------------------------------------------------------
1090    // Deadline beyond max range — clamped
1091    // -------------------------------------------------------------------------
1092
1093    #[test]
1094    fn deadline_beyond_max_range_clamped() {
1095        let now = Instant::now();
1096        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1097
1098        // Way in the future — should clamp to highest level
1099        let h = wheel.schedule(now + Duration::from_secs(100_000), 99);
1100        assert_eq!(wheel.len(), 1);
1101
1102        // Won't fire at any reasonable time but will fire when enough ticks pass
1103        let mut buf = Vec::new();
1104        wheel.poll(now + Duration::from_secs(100_001), &mut buf);
1105        assert_eq!(buf, vec![99]);
1106
1107        // Note: handle was already consumed by the poll (fire-and-forget path won't
1108        // apply since refs=2). Actually the handle still exists. Let's clean up.
1109        // The timer already fired, so cancel returns None.
1110        // Actually buf got the value, which means it fired. But handle still needs cleanup.
1111        // We already pushed the value so we need to handle the zombie.
1112        // Wait — we used schedule (refs=2), poll fired it (refs 2→1 zombie), handle `h` exists.
1113        // Actually we consumed it with the poll — no we didn't, we still have `h`.
1114
1115        // h is a zombie handle now
1116        let val = wheel.cancel(h);
1117        assert_eq!(val, None);
1118    }
1119
1120    // -------------------------------------------------------------------------
1121    // Drop
1122    // -------------------------------------------------------------------------
1123
1124    #[test]
1125    fn drop_cleans_up_active_entries() {
1126        let now = Instant::now();
1127        let mut wheel: Wheel<String> = Wheel::unbounded(1024, now);
1128
1129        for i in 0..100 {
1130            wheel.schedule_forget(now + ms(i * 10), format!("timer-{i}"));
1131        }
1132
1133        assert_eq!(wheel.len(), 100);
1134        // Drop should free all entries without leaking
1135        drop(wheel);
1136    }
1137
1138    #[test]
1139    fn drop_with_outstanding_handles() {
1140        let now = Instant::now();
1141        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1142
1143        // Schedule but DON'T cancel — just free the handles
1144        let h1 = wheel.schedule(now + ms(10), 1);
1145        let h2 = wheel.schedule(now + ms(20), 2);
1146
1147        // Free the handles (convert to fire-and-forget) so they don't debug_assert
1148        wheel.free(h1);
1149        wheel.free(h2);
1150
1151        // Drop the wheel — should clean up the entries
1152        drop(wheel);
1153    }
1154
1155    // -------------------------------------------------------------------------
1156    // Level selection
1157    // -------------------------------------------------------------------------
1158
1159    #[test]
1160    fn level_selection_boundaries() {
1161        let now = Instant::now();
1162        let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1163
1164        // Level 0: delta < 64
1165        assert_eq!(wheel.select_level(0), 0);
1166        assert_eq!(wheel.select_level(63), 0);
1167
1168        // Level 1: 64 <= delta < 512
1169        assert_eq!(wheel.select_level(64), 1);
1170        assert_eq!(wheel.select_level(511), 1);
1171
1172        // Level 2: 512 <= delta < 4096
1173        assert_eq!(wheel.select_level(512), 2);
1174    }
1175
1176    // -------------------------------------------------------------------------
1177    // Bug fix validation: cancel after time advance
1178    // -------------------------------------------------------------------------
1179
1180    #[test]
1181    fn cancel_after_time_advance() {
1182        // The critical bug: schedule at T+500ms (level 2, delta=500 ticks),
1183        // poll at T+400ms (no fire, but current_ticks advances to 400),
1184        // cancel at T+400ms. Old code would recompute delta = 500-400 = 100
1185        // → level 1. But the entry is in level 2. Stored location fixes this.
1186        let now = Instant::now();
1187        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1188
1189        let h = wheel.schedule(now + ms(500), 42);
1190        assert_eq!(wheel.len(), 1);
1191
1192        // Advance time — timer doesn't fire (deadline is 500ms)
1193        let mut buf = Vec::new();
1194        let fired = wheel.poll(now + ms(400), &mut buf);
1195        assert_eq!(fired, 0);
1196        assert!(buf.is_empty());
1197
1198        // Cancel after time advance — must find the entry in the correct slot
1199        let val = wheel.cancel(h);
1200        assert_eq!(val, Some(42));
1201        assert_eq!(wheel.len(), 0);
1202    }
1203
1204    // -------------------------------------------------------------------------
1205    // Same-slot entries
1206    // -------------------------------------------------------------------------
1207
1208    #[test]
1209    fn multiple_entries_same_slot() {
1210        let now = Instant::now();
1211        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1212
1213        // All 5 timers at the same deadline → same slot
1214        let mut handles = Vec::new();
1215        for i in 0..5 {
1216            handles.push(wheel.schedule(now + ms(10), i));
1217        }
1218        assert_eq!(wheel.len(), 5);
1219
1220        // Cancel the middle ones
1221        let v2 = wheel.cancel(handles.remove(2));
1222        assert_eq!(v2, Some(2));
1223        let v0 = wheel.cancel(handles.remove(0));
1224        assert_eq!(v0, Some(0));
1225        assert_eq!(wheel.len(), 3);
1226
1227        // Poll — remaining 3 should fire
1228        let mut buf = Vec::new();
1229        let fired = wheel.poll(now + ms(20), &mut buf);
1230        assert_eq!(fired, 3);
1231
1232        // Clean up zombie handles
1233        for h in handles {
1234            let val = wheel.cancel(h);
1235            assert_eq!(val, None); // already fired
1236        }
1237    }
1238
1239    // -------------------------------------------------------------------------
1240    // Level boundary
1241    // -------------------------------------------------------------------------
1242
1243    #[test]
1244    fn entry_at_level_boundary() {
1245        // Default config: level 0 range = 64 ticks (64ms).
1246        // A deadline at exactly tick 64 should go to level 1, not level 0.
1247        let now = Instant::now();
1248        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1249
1250        let h = wheel.schedule(now + ms(64), 99);
1251        assert_eq!(wheel.len(), 1);
1252
1253        // Should NOT fire at 63ms
1254        let mut buf = Vec::new();
1255        let fired = wheel.poll(now + ms(63), &mut buf);
1256        assert_eq!(fired, 0);
1257
1258        // Should fire at 64ms
1259        let fired = wheel.poll(now + ms(65), &mut buf);
1260        assert_eq!(fired, 1);
1261        assert_eq!(buf, vec![99]);
1262
1263        // Clean up zombie handle
1264        wheel.cancel(h);
1265    }
1266
1267    // -------------------------------------------------------------------------
1268    // Bookmark/resumption with mixed expiry
1269    // -------------------------------------------------------------------------
1270
1271    #[test]
1272    fn poll_with_limit_mixed_expiry() {
1273        let now = Instant::now();
1274        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1275
1276        // 3 expired at poll time, 2 not
1277        wheel.schedule_forget(now + ms(5), 1);
1278        wheel.schedule_forget(now + ms(5), 2);
1279        wheel.schedule_forget(now + ms(5), 3);
1280        wheel.schedule_forget(now + ms(500), 4); // not expired
1281        wheel.schedule_forget(now + ms(500), 5); // not expired
1282        assert_eq!(wheel.len(), 5);
1283
1284        let mut buf = Vec::new();
1285
1286        // Fire 2 of the 3 expired
1287        let fired = wheel.poll_with_limit(now + ms(10), 2, &mut buf);
1288        assert_eq!(fired, 2);
1289        assert_eq!(wheel.len(), 3);
1290
1291        // Fire remaining expired (1 more)
1292        let fired = wheel.poll_with_limit(now + ms(10), 5, &mut buf);
1293        assert_eq!(fired, 1);
1294        assert_eq!(wheel.len(), 2);
1295
1296        // The 2 unexpired should still be there
1297        assert_eq!(buf.len(), 3);
1298    }
1299
1300    // -------------------------------------------------------------------------
1301    // Re-add after drain
1302    // -------------------------------------------------------------------------
1303
1304    #[test]
1305    fn reuse_after_full_drain() {
1306        let now = Instant::now();
1307        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1308
1309        // Round 1: schedule and drain
1310        for i in 0..10 {
1311            wheel.schedule_forget(now + ms(1), i);
1312        }
1313        let mut buf = Vec::new();
1314        wheel.poll(now + ms(5), &mut buf);
1315        assert_eq!(buf.len(), 10);
1316        assert!(wheel.is_empty());
1317
1318        // Round 2: schedule and drain again — wheel must work normally
1319        buf.clear();
1320        for i in 10..20 {
1321            wheel.schedule_forget(now + ms(100), i);
1322        }
1323        assert_eq!(wheel.len(), 10);
1324
1325        wheel.poll(now + ms(200), &mut buf);
1326        assert_eq!(buf.len(), 10);
1327        assert!(wheel.is_empty());
1328    }
1329
1330    // -------------------------------------------------------------------------
1331    // All levels active simultaneously
1332    // -------------------------------------------------------------------------
1333
1334    #[test]
1335    fn all_levels_active() {
1336        let now = Instant::now();
1337        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1338
1339        // Schedule one timer per level with increasing distances.
1340        // Level 0: <64ms, Level 1: 64-511ms, Level 2: 512-4095ms, etc.
1341        let distances = [10, 100, 1000, 5000, 40_000, 300_000, 3_000_000];
1342        let mut handles: Vec<TimerHandle<u64>> = Vec::new();
1343        for (i, &d) in distances.iter().enumerate() {
1344            handles.push(wheel.schedule(now + ms(d), i as u64));
1345        }
1346        assert_eq!(wheel.len(), 7);
1347
1348        // Cancel in a shuffled order: 4, 1, 6, 0, 3, 5, 2
1349        let order = [4, 1, 6, 0, 3, 5, 2];
1350        // Take ownership by swapping with dummies — actually we need to
1351        // cancel by index. Let's use Option to track.
1352        let mut opt_handles: Vec<Option<TimerHandle<u64>>> =
1353            handles.into_iter().map(Some).collect();
1354
1355        for &idx in &order {
1356            let h = opt_handles[idx].take().unwrap();
1357            let val = wheel.cancel(h);
1358            assert_eq!(val, Some(idx as u64));
1359        }
1360        assert!(wheel.is_empty());
1361    }
1362
1363    // -------------------------------------------------------------------------
1364    // Poll values match
1365    // -------------------------------------------------------------------------
1366
1367    #[test]
1368    fn poll_values_match() {
1369        let now = Instant::now();
1370        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1371
1372        let expected: Vec<u64> = (100..110).collect();
1373        for &v in &expected {
1374            wheel.schedule_forget(now + ms(5), v);
1375        }
1376
1377        let mut buf = Vec::new();
1378        wheel.poll(now + ms(10), &mut buf);
1379
1380        buf.sort();
1381        assert_eq!(buf, expected);
1382    }
1383
1384    // -------------------------------------------------------------------------
1385    // Reschedule
1386    // -------------------------------------------------------------------------
1387
1388    #[test]
1389    fn reschedule_moves_deadline() {
1390        let now = Instant::now();
1391        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1392
1393        let h = wheel.schedule(now + ms(100), 42);
1394        assert_eq!(wheel.len(), 1);
1395
1396        // Reschedule to earlier
1397        let h = wheel.reschedule(h, now + ms(50));
1398        assert_eq!(wheel.len(), 1);
1399
1400        // Should NOT fire at 40ms
1401        let mut buf = Vec::new();
1402        let fired = wheel.poll(now + ms(40), &mut buf);
1403        assert_eq!(fired, 0);
1404
1405        // Should fire at 50ms
1406        let fired = wheel.poll(now + ms(55), &mut buf);
1407        assert_eq!(fired, 1);
1408        assert_eq!(buf, vec![42]);
1409
1410        // Clean up zombie
1411        wheel.cancel(h);
1412    }
1413
1414    #[test]
1415    fn reschedule_to_later() {
1416        let now = Instant::now();
1417        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1418
1419        let h = wheel.schedule(now + ms(50), 7);
1420
1421        // Reschedule to later
1422        let h = wheel.reschedule(h, now + ms(200));
1423
1424        // Should NOT fire at original deadline
1425        let mut buf = Vec::new();
1426        let fired = wheel.poll(now + ms(60), &mut buf);
1427        assert_eq!(fired, 0);
1428
1429        // Should fire at new deadline
1430        let fired = wheel.poll(now + ms(210), &mut buf);
1431        assert_eq!(fired, 1);
1432        assert_eq!(buf, vec![7]);
1433
1434        wheel.cancel(h);
1435    }
1436
1437    #[test]
1438    #[should_panic(expected = "cannot reschedule a fired timer")]
1439    fn reschedule_panics_on_zombie() {
1440        let now = Instant::now();
1441        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1442
1443        let h = wheel.schedule(now + ms(10), 42);
1444
1445        let mut buf = Vec::new();
1446        wheel.poll(now + ms(20), &mut buf);
1447
1448        // h is now a zombie — reschedule should panic
1449        let _h = wheel.reschedule(h, now + ms(100));
1450    }
1451}
1452
1453#[cfg(test)]
1454mod proptests {
1455    use super::*;
1456    use proptest::prelude::*;
1457    use std::collections::HashSet;
1458    use std::mem;
1459    use std::time::{Duration, Instant};
1460
1461    /// Operation in a schedule/cancel interleaving.
1462    #[derive(Debug, Clone)]
1463    enum Op {
1464        /// Schedule a timer at `deadline_ms` milliseconds from epoch.
1465        Schedule { deadline_ms: u64 },
1466        /// Cancel the timer at the given index (modulo outstanding handles).
1467        Cancel { idx: usize },
1468    }
1469
1470    fn op_strategy() -> impl Strategy<Value = Op> {
1471        prop_oneof![
1472            // Schedule with deadlines from 1ms to 10_000ms
1473            (1u64..10_000).prop_map(|deadline_ms| Op::Schedule { deadline_ms }),
1474            // Cancel at random index
1475            any::<usize>().prop_map(|idx| Op::Cancel { idx }),
1476        ]
1477    }
1478
1479    proptest! {
1480        #![proptest_config(ProptestConfig::with_cases(500))]
1481
1482        /// Fuzz schedule/cancel interleaving.
1483        ///
1484        /// Random sequence of schedule and cancel operations. Invariants:
1485        /// - `len` always matches outstanding active timers
1486        /// - cancel on active handle returns `Some`
1487        /// - poll collects all un-cancelled values
1488        #[test]
1489        fn fuzz_schedule_cancel_interleaving(ops in proptest::collection::vec(op_strategy(), 1..200)) {
1490            let now = Instant::now();
1491            let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1492
1493            let mut handles: Vec<TimerHandle<u64>> = Vec::new();
1494            let mut active_values: HashSet<u64> = HashSet::new();
1495            let mut next_id: u64 = 0;
1496
1497            for op in &ops {
1498                match op {
1499                    Op::Schedule { deadline_ms } => {
1500                        let id = next_id;
1501                        next_id += 1;
1502                        let h = wheel.schedule(now + Duration::from_millis(*deadline_ms), id);
1503                        handles.push(h);
1504                        active_values.insert(id);
1505                    }
1506                    Op::Cancel { idx } => {
1507                        if !handles.is_empty() {
1508                            let i = idx % handles.len();
1509                            let h = handles.swap_remove(i);
1510                            let val = wheel.cancel(h);
1511                            // Value should be Some (all handles are for active timers)
1512                            let v = val.unwrap();
1513                            assert!(active_values.remove(&v));
1514                        }
1515                    }
1516                }
1517                // len must match active values
1518                prop_assert_eq!(wheel.len(), active_values.len());
1519            }
1520
1521            // Poll everything — should collect exactly the remaining active values
1522            let mut buf = Vec::new();
1523            // Use a far-future time to fire everything
1524            wheel.poll(now + Duration::from_secs(100_000), &mut buf);
1525
1526            // Clean up zombie handles (poll fired them, handles still exist)
1527            for h in handles {
1528                mem::forget(h);
1529            }
1530
1531            let fired_set: HashSet<u64> = buf.into_iter().collect();
1532            prop_assert_eq!(fired_set, active_values);
1533            prop_assert!(wheel.is_empty());
1534        }
1535
1536        /// Fuzz poll timing.
1537        ///
1538        /// Schedule N timers with random deadlines. Poll at random increasing
1539        /// times. Assert every timer fires exactly once, fired deadlines are
1540        /// all <= poll time, unfired deadlines are all > poll time.
1541        #[test]
1542        fn fuzz_poll_timing(
1543            deadlines in proptest::collection::vec(1u64..5000, 1..100),
1544            poll_times in proptest::collection::vec(1u64..10_000, 1..20),
1545        ) {
1546            let now = Instant::now();
1547            let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1548
1549            // Schedule all timers (fire-and-forget)
1550            for (i, &d) in deadlines.iter().enumerate() {
1551                wheel.schedule_forget(now + Duration::from_millis(d), i as u64);
1552            }
1553
1554            // Sort poll times to be monotonically increasing
1555            let mut sorted_times: Vec<u64> = poll_times;
1556            sorted_times.sort();
1557            sorted_times.dedup();
1558
1559            let mut all_fired: Vec<u64> = Vec::new();
1560
1561            for &t in &sorted_times {
1562                let mut buf = Vec::new();
1563                wheel.poll(now + Duration::from_millis(t), &mut buf);
1564
1565                // Every fired entry should have deadline_ms <= t
1566                for &id in &buf {
1567                    let deadline_ms = deadlines[id as usize];
1568                    prop_assert!(deadline_ms <= t,
1569                        "Timer {} with deadline {}ms fired at {}ms", id, deadline_ms, t);
1570                }
1571
1572                all_fired.extend(buf);
1573            }
1574
1575            // Fire everything remaining
1576            let mut final_buf = Vec::new();
1577            wheel.poll(now + Duration::from_secs(100_000), &mut final_buf);
1578            all_fired.extend(final_buf);
1579
1580            // Every timer should have fired exactly once
1581            all_fired.sort();
1582            let expected: Vec<u64> = (0..deadlines.len() as u64).collect();
1583            prop_assert_eq!(all_fired, expected, "Not all timers fired exactly once");
1584            prop_assert!(wheel.is_empty());
1585        }
1586    }
1587}