Skip to main content

nexus_timer/
wheel.rs

1//! Timer wheel — the main data structure.
2//!
3//! `TimerWheel<T, S>` is a multi-level, no-cascade timer wheel. Entries are
4//! placed into a level based on how far in the future their deadline is.
5//! Once placed, an entry never moves — poll checks `deadline_ticks <= now`
6//! per entry.
7
8use std::marker::PhantomData;
9use std::mem;
10use std::time::{Duration, Instant};
11
12use nexus_slab::{Full, bounded, unbounded};
13
14use crate::entry::{EntryPtr, WheelEntry, entry_ref};
15use crate::handle::TimerHandle;
16use crate::level::Level;
17use crate::store::{BoundedStore, SlabStore};
18
19// =============================================================================
20// WheelBuilder (typestate)
21// =============================================================================
22
23/// Builder for configuring a timer wheel.
24///
25/// Defaults match the Linux kernel timer wheel (1ms tick, 64 slots/level,
26/// 8x multiplier, 7 levels → ~4.7 hour range).
27///
28/// # Examples
29///
30/// ```
31/// use std::time::{Duration, Instant};
32/// use nexus_timer::{Wheel, WheelBuilder};
33///
34/// let now = Instant::now();
35///
36/// // All defaults
37/// let wheel: Wheel<u64> = WheelBuilder::default().unbounded(4096).build(now);
38///
39/// // Custom config
40/// let wheel: Wheel<u64> = WheelBuilder::default()
41///     .tick_duration(Duration::from_micros(100))
42///     .slots_per_level(32)
43///     .unbounded(4096)
44///     .build(now);
45/// ```
#[derive(Debug, Clone, Copy)]
pub struct WheelBuilder {
    /// Length of one tick. Default: 1ms. Must be non-zero (checked at build).
    tick_duration: Duration,
    /// Slots in each level. Default: 64. Must be a power of two and at most
    /// 64 (checked at build).
    slots_per_level: usize,
    /// Bit shift between adjacent levels (multiplier = 2^clk_shift).
    /// Default: 3. Must be non-zero (checked at build).
    clk_shift: u32,
    /// Number of levels. Default: 7. Must be in 1..=8 (checked at build).
    num_levels: usize,
}
53
54impl Default for WheelBuilder {
55    fn default() -> Self {
56        WheelBuilder {
57            tick_duration: Duration::from_millis(1),
58            slots_per_level: 64,
59            clk_shift: 3,
60            num_levels: 7,
61        }
62    }
63}
64
65impl WheelBuilder {
66    /// Creates a new builder with default configuration.
67    pub fn new() -> Self {
68        Self::default()
69    }
70
71    /// Sets the tick duration. Default: 1ms.
72    pub fn tick_duration(mut self, d: Duration) -> Self {
73        self.tick_duration = d;
74        self
75    }
76
77    /// Sets the number of slots per level. Must be a power of 2. Default: 64.
78    pub fn slots_per_level(mut self, n: usize) -> Self {
79        self.slots_per_level = n;
80        self
81    }
82
83    /// Sets the bit shift between levels (multiplier = 2^clk_shift). Default: 3 (8x).
84    pub fn clk_shift(mut self, s: u32) -> Self {
85        self.clk_shift = s;
86        self
87    }
88
89    /// Sets the number of levels. Default: 7.
90    pub fn num_levels(mut self, n: usize) -> Self {
91        self.num_levels = n;
92        self
93    }
94
95    /// Transitions to an unbounded wheel builder.
96    ///
97    /// `chunk_capacity` is the slab chunk size (entries per chunk). The slab
98    /// grows by adding new chunks as needed.
99    pub fn unbounded(self, chunk_capacity: usize) -> UnboundedWheelBuilder {
100        UnboundedWheelBuilder {
101            config: self,
102            chunk_capacity,
103        }
104    }
105
106    /// Transitions to a bounded wheel builder.
107    ///
108    /// `capacity` is the maximum number of concurrent timers.
109    pub fn bounded(self, capacity: usize) -> BoundedWheelBuilder {
110        BoundedWheelBuilder {
111            config: self,
112            capacity,
113        }
114    }
115
116    fn validate(&self) {
117        assert!(
118            self.slots_per_level.is_power_of_two(),
119            "slots_per_level must be a power of 2, got {}",
120            self.slots_per_level
121        );
122        assert!(
123            self.slots_per_level <= 64,
124            "slots_per_level must be <= 64 (u64 bitmask), got {}",
125            self.slots_per_level
126        );
127        assert!(self.num_levels > 0, "num_levels must be > 0");
128        assert!(
129            self.num_levels <= 8,
130            "num_levels must be <= 8 (u8 bitmask), got {}",
131            self.num_levels
132        );
133        assert!(self.clk_shift > 0, "clk_shift must be > 0");
134        assert!(
135            !self.tick_duration.is_zero(),
136            "tick_duration must be non-zero"
137        );
138        let max_shift = (self.num_levels - 1) as u64 * self.clk_shift as u64;
139        assert!(
140            max_shift < 64,
141            "(num_levels - 1) * clk_shift must be < 64, got {}",
142            max_shift
143        );
144        let slots_log2 = self.slots_per_level.trailing_zeros() as u64;
145        assert!(
146            slots_log2 + max_shift < 64,
147            "slots_per_level << max_shift would overflow u64"
148        );
149    }
150
151    fn tick_ns(&self) -> u64 {
152        self.tick_duration.as_nanos() as u64
153    }
154}
155
156/// Terminal builder for an unbounded timer wheel.
157///
158/// Created via [`WheelBuilder::unbounded`]. The only method is `.build()`.
#[derive(Debug)]
pub struct UnboundedWheelBuilder {
    /// Wheel geometry and tick settings captured from [`WheelBuilder`].
    config: WheelBuilder,
    /// Slab chunk size (entries per chunk) passed to the slab at build time.
    chunk_capacity: usize,
}
164
165impl UnboundedWheelBuilder {
166    /// Builds the unbounded timer wheel.
167    ///
168    /// # Panics
169    ///
170    /// Panics if the configuration is invalid (non-power-of-2 slots, zero
171    /// levels, zero clk_shift, or zero tick duration).
172    pub fn build<T: 'static>(self, now: Instant) -> Wheel<T> {
173        self.config.validate();
174        let slab = unbounded::Slab::with_chunk_capacity(self.chunk_capacity);
175        let levels = build_levels::<T>(&self.config);
176        TimerWheel {
177            slab,
178            num_levels: self.config.num_levels,
179            levels,
180            current_ticks: 0,
181            tick_ns: self.config.tick_ns(),
182            epoch: now,
183            active_levels: 0,
184            len: 0,
185            _marker: PhantomData,
186        }
187    }
188}
189
190/// Terminal builder for a bounded timer wheel.
191///
192/// Created via [`WheelBuilder::bounded`]. The only method is `.build()`.
#[derive(Debug)]
pub struct BoundedWheelBuilder {
    /// Wheel geometry and tick settings captured from [`WheelBuilder`].
    config: WheelBuilder,
    /// Maximum number of concurrent timers; passed to the slab at build time.
    capacity: usize,
}
198
199impl BoundedWheelBuilder {
200    /// Builds the bounded timer wheel.
201    ///
202    /// # Panics
203    ///
204    /// Panics if the configuration is invalid (non-power-of-2 slots, zero
205    /// levels, zero clk_shift, or zero tick duration).
206    pub fn build<T: 'static>(self, now: Instant) -> BoundedWheel<T> {
207        self.config.validate();
208        let slab = bounded::Slab::with_capacity(self.capacity);
209        let levels = build_levels::<T>(&self.config);
210        TimerWheel {
211            slab,
212            num_levels: self.config.num_levels,
213            levels,
214            current_ticks: 0,
215            tick_ns: self.config.tick_ns(),
216            epoch: now,
217            active_levels: 0,
218            len: 0,
219            _marker: PhantomData,
220        }
221    }
222}
223
224// =============================================================================
225// TimerWheel
226// =============================================================================
227
228/// A multi-level, no-cascade timer wheel.
229///
230/// Generic over:
231/// - `T` — the user payload stored with each timer.
232/// - `S` — the slab storage backend. Defaults to `unbounded::Slab`.
233///
234/// # Thread Safety
235///
/// `Send` (when `T: Send`) but `!Sync`. Can be moved to a thread at setup but
/// must not be shared. All internal raw pointers point into owned heap
/// allocations (slab chunks, level slot arrays); moving the wheel leaves
/// those allocations at the same addresses, so the pointers stay valid.
pub struct TimerWheel<
    T: 'static,
    S: SlabStore<Item = WheelEntry<T>> = unbounded::Slab<WheelEntry<T>>,
> {
    /// Backing store every timer entry is allocated from.
    slab: S,
    /// One `Level` per configured level, finest first.
    levels: Vec<Level<T>>,
    /// Number of levels; used to clamp out-of-range deadlines to the last level.
    num_levels: usize,
    /// Bitmask of levels that currently have at least one active slot
    /// (bit `i` corresponds to `levels[i]`).
    active_levels: u8,
    /// Tick count of the most recent poll; level selection measures deltas
    /// from this value.
    current_ticks: u64,
    /// Length of one tick in nanoseconds.
    tick_ns: u64,
    /// The `Instant` that corresponds to tick 0.
    epoch: Instant,
    /// Number of timers scheduled and not yet fired or cancelled.
    len: usize,
    _marker: PhantomData<*const ()>, // !Send (overridden below), !Sync
}
254
255// SAFETY: TimerWheel<T, S> exclusively owns all memory behind its raw pointers.
256//
257// Pointer inventory and ownership:
258// - Slot `entry_head`/`entry_tail` — point into slab-owned memory (SlotCell
259//   in a slab chunk). Slab chunks are Vec<SlotCell<T>> heap allocations.
260// - DLL links (`WheelEntry::prev`, `WheelEntry::next`) — point to other
261//   SlotCells in the same slab.
262// - `Level::slots` — `Box<[WheelSlot<T>]>`, owned by the level.
263//
264// All pointed-to memory lives inside owned collections (Vec, Box<[T]>).
265// When TimerWheel is moved, the heap allocations stay at their addresses —
266// the internal pointers remain valid. No thread-local state. No shared
267// ownership.
268//
269// T: Send is required because timer values cross the thread boundary with
270// the wheel.
271//
272// S is NOT required to be Send. Slab types are !Send (raw pointers, Cell)
273// but the wheel exclusively owns its slab — no shared access, no aliasing.
274// Moving the wheel moves the slab; heap allocations stay at their addresses
275// so internal pointers remain valid.
276//
277// Outstanding TimerHandle<T> values are !Send and cannot follow the wheel
278// across threads. They become inert — consuming them requires &mut
279// TimerWheel which the original thread no longer has. The debug_assert in
280// TimerHandle::drop catches this as a programming error. Worst case is a
281// slot leak (refcount stuck at 1), not unsoundness.
// The lint is silenced deliberately: `S` is not required to be `Send` and
// `_marker` is `PhantomData<*const ()>`; the SAFETY argument above explains
// why sending the wheel is still sound.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<T: Send + 'static, S: SlabStore<Item = WheelEntry<T>>> Send for TimerWheel<T, S> {}
284
/// A timer wheel backed by a fixed-capacity slab (`bounded::Slab`).
///
/// Build one with [`BoundedWheel::bounded`] or [`WheelBuilder::bounded`].
pub type BoundedWheel<T> = TimerWheel<T, bounded::Slab<WheelEntry<T>>>;
287
/// A timer wheel backed by a growable slab (`unbounded::Slab`).
///
/// Build one with [`Wheel::unbounded`] or [`WheelBuilder::unbounded`].
pub type Wheel<T> = TimerWheel<T, unbounded::Slab<WheelEntry<T>>>;
290
291// =============================================================================
292// Construction
293// =============================================================================
294
295impl<T: 'static> Wheel<T> {
296    /// Creates an unbounded timer wheel with default configuration.
297    ///
298    /// For custom configuration, use [`WheelBuilder`].
299    pub fn unbounded(chunk_capacity: usize, now: Instant) -> Self {
300        WheelBuilder::default().unbounded(chunk_capacity).build(now)
301    }
302}
303
304impl<T: 'static> BoundedWheel<T> {
305    /// Creates a bounded timer wheel with default configuration.
306    ///
307    /// For custom configuration, use [`WheelBuilder`].
308    pub fn bounded(capacity: usize, now: Instant) -> Self {
309        WheelBuilder::default().bounded(capacity).build(now)
310    }
311}
312
313fn build_levels<T: 'static>(config: &WheelBuilder) -> Vec<Level<T>> {
314    (0..config.num_levels)
315        .map(|i| Level::new(config.slots_per_level, i, config.clk_shift))
316        .collect()
317}
318
319// =============================================================================
320// Schedule
321// =============================================================================
322
323impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
324    /// Schedules a timer and returns a handle for cancellation.
325    ///
326    /// The handle must be consumed via [`cancel`](Self::cancel) or
327    /// [`free`](Self::free). Dropping it is a programming error.
328    ///
329    /// # Panics
330    ///
331    /// Panics if the backing slab is at capacity (bounded slabs only).
332    /// This is a capacity planning error — size your wheel for peak load.
333    pub fn schedule(&mut self, deadline: Instant, value: T) -> TimerHandle<T> {
334        let deadline_ticks = self.instant_to_ticks(deadline);
335        let entry = WheelEntry::new(deadline_ticks, value, 2);
336        let slot = self.slab.alloc(entry);
337        let ptr = slot.as_ptr();
338        self.insert_entry(ptr, deadline_ticks);
339        self.len += 1;
340        TimerHandle::new(ptr)
341    }
342
343    /// Schedules a fire-and-forget timer (no handle returned).
344    ///
345    /// The timer will fire during poll and the value will be collected.
346    /// Cannot be cancelled.
347    ///
348    /// # Panics
349    ///
350    /// Panics if the backing slab is at capacity (bounded slabs only).
351    /// This is a capacity planning error — size your wheel for peak load.
352    pub fn schedule_forget(&mut self, deadline: Instant, value: T) {
353        let deadline_ticks = self.instant_to_ticks(deadline);
354        let entry = WheelEntry::new(deadline_ticks, value, 1);
355        let slot = self.slab.alloc(entry);
356        let ptr = slot.as_ptr();
357        self.insert_entry(ptr, deadline_ticks);
358        self.len += 1;
359    }
360}
361
362// =============================================================================
363// Schedule — fallible (bounded slabs only)
364// =============================================================================
365
366impl<T: 'static, S: BoundedStore<Item = WheelEntry<T>>>
367    TimerWheel<T, S>
368{
369    /// Attempts to schedule a timer, returning a handle on success.
370    ///
371    /// Returns `Err(Full(value))` if the slab is at capacity. Use this
372    /// when you need graceful error handling. For the common case where
373    /// capacity exhaustion is fatal, use [`schedule`](Self::schedule).
374    pub fn try_schedule(&mut self, deadline: Instant, value: T) -> Result<TimerHandle<T>, Full<T>> {
375        let deadline_ticks = self.instant_to_ticks(deadline);
376        let entry = WheelEntry::new(deadline_ticks, value, 2);
377        match self.slab.try_alloc(entry) {
378            Ok(slot) => {
379                let ptr = slot.as_ptr();
380                self.insert_entry(ptr, deadline_ticks);
381                self.len += 1;
382                Ok(TimerHandle::new(ptr))
383            }
384            Err(full) => {
385                // Extract the user's T from the WheelEntry wrapper
386                // SAFETY: we just constructed this entry, take_value is valid
387                let wheel_entry = full.into_inner();
388                let value = unsafe { wheel_entry.take_value() }
389                    .expect("entry was just constructed with Some(value)");
390                Err(Full(value))
391            }
392        }
393    }
394
395    /// Attempts to schedule a fire-and-forget timer.
396    ///
397    /// Returns `Err(Full(value))` if the slab is at capacity. Use this
398    /// when you need graceful error handling. For the common case where
399    /// capacity exhaustion is fatal, use [`schedule_forget`](Self::schedule_forget).
400    pub fn try_schedule_forget(&mut self, deadline: Instant, value: T) -> Result<(), Full<T>> {
401        let deadline_ticks = self.instant_to_ticks(deadline);
402        let entry = WheelEntry::new(deadline_ticks, value, 1);
403        match self.slab.try_alloc(entry) {
404            Ok(slot) => {
405                let ptr = slot.as_ptr();
406                self.insert_entry(ptr, deadline_ticks);
407                self.len += 1;
408                Ok(())
409            }
410            Err(full) => {
411                let wheel_entry = full.into_inner();
412                let value = unsafe { wheel_entry.take_value() }
413                    .expect("entry was just constructed with Some(value)");
414                Err(Full(value))
415            }
416        }
417    }
418}
419
420// =============================================================================
421// Cancel / Free / Poll / Query — generic over any store
422// =============================================================================
423
424impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> TimerWheel<T, S> {
    /// Cancels a timer and returns its value.
    ///
    /// - If the timer is still active (refcount 2): unlinks it from the
    ///   wheel, extracts the value, decrements `len`, and frees the slab
    ///   entry. Returns `Some(T)`.
    /// - If the timer already fired (zombie handle, refcount 1): frees the
    ///   slab entry. Returns `None`.
    ///
    /// Consumes the handle (no Drop runs).
    pub fn cancel(&mut self, handle: TimerHandle<T>) -> Option<T> {
        let ptr = handle.ptr;
        // Consume handle without running Drop
        mem::forget(handle);

        // SAFETY: handle guarantees ptr is valid and allocated from our slab.
        let entry = unsafe { entry_ref(ptr) };
        let refs = entry.refs();

        if refs == 2 {
            // Active timer with handle — unlink, extract, free
            // SAFETY: the entry is live and we hold `&mut self`, so nothing
            // else is reading or writing its value.
            let value = unsafe { entry.take_value() };
            self.remove_entry(ptr);
            self.len -= 1;
            // SAFETY: ptr was allocated from our slab, entry is now spent
            unsafe { self.slab.free_ptr(ptr) };
            value
        } else {
            // refs == 1 means the wheel already fired this (zombie).
            // The fire path decremented 2→1 and left the entry for us to free.
            // `len` was already decremented when the entry fired.
            debug_assert_eq!(refs, 1, "unexpected refcount {refs} in cancel");
            // SAFETY: ptr was allocated from our slab
            unsafe { self.slab.free_ptr(ptr) };
            None
        }
    }
459
    /// Releases a timer handle without cancelling.
    ///
    /// - If the timer is still active: converts to fire-and-forget (refs 2→1).
    ///   Timer stays in the wheel and will fire normally during poll.
    /// - If the timer already fired (zombie): frees the slab entry (refs 1→0).
    ///
    /// `len` is not changed here: an active timer is still pending, and a
    /// zombie was already discounted when it fired.
    ///
    /// Consumes the handle (no Drop runs).
    pub fn free(&mut self, handle: TimerHandle<T>) {
        let ptr = handle.ptr;
        // Consume the handle without running its Drop.
        mem::forget(handle);

        // SAFETY: handle guarantees ptr is valid
        let entry = unsafe { entry_ref(ptr) };
        // Give up the handle's reference; the result tells us which case we are in.
        let new_refs = entry.dec_refs();

        if new_refs == 0 {
            // Was a zombie (fired already, refs was 1) — free the entry
            // SAFETY: ptr was allocated from our slab
            unsafe { self.slab.free_ptr(ptr) };
        }
        // new_refs == 1: timer is now fire-and-forget, stays in wheel
    }
482
    /// Reschedules an active timer to a new deadline.
    ///
    /// Moves the entry from its current slot to the correct slot for
    /// `new_deadline` without extracting or reconstructing the value.
    /// `len` is unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the timer has already fired (zombie handle). Only active
    /// timers (refs == 2) can be rescheduled. The handle has already been
    /// forgotten when this check runs, so its Drop does not run on panic.
    ///
    /// Consumes and returns a new handle (same entry, new position).
    pub fn reschedule(&mut self, handle: TimerHandle<T>, new_deadline: Instant) -> TimerHandle<T> {
        let ptr = handle.ptr;
        // Consume the handle without running its Drop.
        mem::forget(handle);

        // SAFETY: handle guarantees ptr is valid
        let entry = unsafe { entry_ref(ptr) };
        assert_eq!(entry.refs(), 2, "cannot reschedule a fired timer");

        // Remove from current position
        // (uses the level/slot recorded on the entry at insertion time).
        self.remove_entry(ptr);

        // Update deadline and reinsert
        let new_ticks = self.instant_to_ticks(new_deadline);
        entry.set_deadline_ticks(new_ticks);
        self.insert_entry(ptr, new_ticks);

        TimerHandle::new(ptr)
    }
512
    /// Fires all expired timers, collecting their values into `buf`.
    ///
    /// Equivalent to [`poll_with_limit`](Self::poll_with_limit) with a limit
    /// of `usize::MAX`. Values are appended to `buf`; existing contents are
    /// left in place.
    ///
    /// Returns the number of timers fired.
    pub fn poll(&mut self, now: Instant, buf: &mut Vec<T>) -> usize {
        self.poll_with_limit(now, usize::MAX, buf)
    }
519
520    /// Fires expired timers up to `limit`, collecting values into `buf`.
521    ///
522    /// Resumable: if the limit is hit, the next call continues where this one
523    /// left off (as long as `now` hasn't changed).
524    ///
525    /// Returns the number of timers fired in this call.
526    pub fn poll_with_limit(&mut self, now: Instant, limit: usize, buf: &mut Vec<T>) -> usize {
527        let now_ticks = self.instant_to_ticks(now);
528        self.current_ticks = now_ticks;
529
530        let mut fired = 0;
531        let mut mask = self.active_levels;
532
533        while mask != 0 && fired < limit {
534            let lvl_idx = mask.trailing_zeros() as usize;
535            mask &= mask - 1; // clear lowest set bit
536            fired += self.poll_level(lvl_idx, now_ticks, limit - fired, buf);
537        }
538        fired
539    }
540
541    /// Returns the `Instant` of the next timer that will fire, or `None` if empty.
542    ///
543    /// Walks only active (non-empty) slots. O(active_slots) in the worst case,
544    /// but typically very fast because most slots are empty.
545    pub fn next_deadline(&self) -> Option<Instant> {
546        let mut min_ticks: Option<u64> = None;
547
548        let mut lvl_mask = self.active_levels;
549        while lvl_mask != 0 {
550            let lvl_idx = lvl_mask.trailing_zeros() as usize;
551            lvl_mask &= lvl_mask - 1;
552
553            let level = &self.levels[lvl_idx];
554            let mut slot_mask = level.active_slots();
555            while slot_mask != 0 {
556                let slot_idx = slot_mask.trailing_zeros() as usize;
557                slot_mask &= slot_mask - 1;
558
559                let slot = level.slot(slot_idx);
560                let mut entry_ptr = slot.entry_head();
561
562                while !entry_ptr.is_null() {
563                    // SAFETY: entry_ptr is in this slot's DLL
564                    let entry = unsafe { entry_ref(entry_ptr) };
565                    let dt = entry.deadline_ticks();
566                    min_ticks = Some(min_ticks.map_or(dt, |current| current.min(dt)));
567                    entry_ptr = entry.next();
568                }
569            }
570        }
571
572        min_ticks.map(|t| self.ticks_to_instant(t))
573    }
574
    /// Returns the number of timers currently in the wheel.
    ///
    /// Counts timers that are scheduled and have not yet fired or been
    /// cancelled; fired entries whose handle is still outstanding are not
    /// included.
    #[inline]
    pub fn len(&self) -> usize {
        self.len
    }
580
581    /// Returns true if the wheel contains no timers.
582    #[inline]
583    pub fn is_empty(&self) -> bool {
584        self.len == 0
585    }
586
587    // =========================================================================
588    // Internal: tick conversion
589    // =========================================================================
590
591    #[inline]
592    fn instant_to_ticks(&self, instant: Instant) -> u64 {
593        // Saturate at 0 for instants before epoch
594        let dur = instant.saturating_duration_since(self.epoch);
595        dur.as_nanos() as u64 / self.tick_ns
596    }
597
598    #[inline]
599    fn ticks_to_instant(&self, ticks: u64) -> Instant {
600        self.epoch + Duration::from_nanos(ticks * self.tick_ns)
601    }
602
603    // =========================================================================
604    // Internal: level selection
605    // =========================================================================
606
607    /// Selects the appropriate level for a deadline.
608    ///
609    /// Walks levels from finest to coarsest, picking the first level whose
610    /// range can represent the delta. Clamps to the highest level if the
611    /// deadline exceeds the wheel's total range.
612    #[inline]
613    fn select_level(&self, deadline_ticks: u64) -> usize {
614        let delta = deadline_ticks.saturating_sub(self.current_ticks);
615
616        for (i, level) in self.levels.iter().enumerate() {
617            if delta < level.range() {
618                return i;
619            }
620        }
621
622        // Beyond max range — clamp to highest level
623        self.num_levels - 1
624    }
625
626    // =========================================================================
627    // Internal: entry insertion into a level's slot
628    // =========================================================================
629
    /// Inserts an entry into the appropriate level and slot.
    ///
    /// The level is chosen by `select_level` relative to `current_ticks`;
    /// the slot comes from that level's `slot_index`. Does not touch `len` —
    /// callers account for that.
    ///
    /// Records the level and slot index on the entry so `remove_entry` can
    /// find it without recomputing (which would be unsound after time advances).
    // NOTE(review): this method assigns `self.active_levels` directly, so the
    // `needless_pass_by_ref_mut` allow may no longer be needed — confirm.
    #[inline]
    #[allow(clippy::needless_pass_by_ref_mut)]
    fn insert_entry(&mut self, entry_ptr: EntryPtr<T>, deadline_ticks: u64) {
        let lvl_idx = self.select_level(deadline_ticks);
        let slot_idx = self.levels[lvl_idx].slot_index(deadline_ticks);

        // Record location on the entry for O(1) lookup at cancel time.
        // SAFETY: entry_ptr is valid (just allocated)
        let entry = unsafe { entry_ref(entry_ptr) };
        // The narrowing casts cannot truncate for a validated config
        // (at most 8 levels and 64 slots per level).
        entry.set_location(lvl_idx as u8, slot_idx as u16);

        // SAFETY: entry_ptr is valid (just allocated), not in any DLL yet
        unsafe { self.levels[lvl_idx].slot(slot_idx).push_entry(entry_ptr) };

        // Activate slot and level (idempotent — OR is a no-op if already set)
        self.levels[lvl_idx].activate_slot(slot_idx);
        self.active_levels |= 1 << lvl_idx;
    }
652
    /// Removes an entry from its level's slot DLL.
    ///
    /// Reads the stored level and slot index from the entry (set at insertion
    /// time). Does NOT recompute from delta — that would be unsound after
    /// `current_ticks` advances.
    ///
    /// Clears the slot's active bit if the slot becomes empty, and the
    /// level's bit in `active_levels` if the level has no active slots left.
    /// Does not free the entry or touch `len`.
    // NOTE(review): this method assigns `self.active_levels` directly, so the
    // `needless_pass_by_ref_mut` allow may no longer be needed — confirm.
    #[inline]
    #[allow(clippy::needless_pass_by_ref_mut)]
    fn remove_entry(&mut self, entry_ptr: EntryPtr<T>) {
        // SAFETY: entry_ptr is valid (caller guarantee)
        let entry = unsafe { entry_ref(entry_ptr) };

        let lvl_idx = entry.level() as usize;
        let slot_idx = entry.slot_idx() as usize;

        // SAFETY: entry_ptr is in this slot's DLL (invariant from insert_entry)
        unsafe { self.levels[lvl_idx].slot(slot_idx).remove_entry(entry_ptr) };

        // Keep the slot and level bitmasks in sync with the now-shorter list.
        if self.levels[lvl_idx].slot(slot_idx).is_empty() {
            self.levels[lvl_idx].deactivate_slot(slot_idx);
            if !self.levels[lvl_idx].is_active() {
                self.active_levels &= !(1 << lvl_idx);
            }
        }
    }
677
678    // =========================================================================
679    // Internal: fire an entry
680    // =========================================================================
681
    /// Fires a single entry: extracts value, decrements refcount, possibly frees.
    ///
    /// The caller must already have unlinked the entry from its slot; this
    /// function does not touch the level structures. `len` is decremented
    /// unconditionally.
    ///
    /// Returns `Some(T)` if the value was still present (not already cancelled).
    #[inline]
    fn fire_entry(&mut self, entry_ptr: EntryPtr<T>) -> Option<T> {
        // SAFETY: entry_ptr is valid (we're walking the DLL)
        let entry = unsafe { entry_ref(entry_ptr) };

        // Extract value
        // SAFETY: single-threaded
        let value = unsafe { entry.take_value() };

        // Drop the wheel's reference to the entry.
        let new_refs = entry.dec_refs();
        if new_refs == 0 {
            // Fire-and-forget (was refs=1) — free the slab entry immediately
            // SAFETY: entry_ptr was allocated from our slab
            unsafe { self.slab.free_ptr(entry_ptr) };
        }
        // new_refs == 1: handle exists (was refs=2), entry becomes zombie.
        // Handle holder will free via cancel() or free().

        self.len -= 1;
        value
    }
706
707    // =========================================================================
708    // Internal: poll a single level
709    // =========================================================================
710
    /// Polls a single level for expired entries up to `limit`.
    ///
    /// Walks every slot that was active on entry and fires each entry whose
    /// `deadline_ticks <= now_ticks`, pushing its value onto `buf`. Entries
    /// that are not yet due stay linked. Slots that end up empty are
    /// deactivated, and the level's bit in `active_levels` is cleared if no
    /// active slot remains.
    ///
    /// Returns the number of entries fired.
    fn poll_level(
        &mut self,
        lvl_idx: usize,
        now_ticks: u64,
        limit: usize,
        buf: &mut Vec<T>,
    ) -> usize {
        let mut fired = 0;
        // Snapshot of the slot bitmask; bits are cleared on the level itself
        // as slots drain.
        let mut mask = self.levels[lvl_idx].active_slots();

        while mask != 0 && fired < limit {
            let slot_idx = mask.trailing_zeros() as usize;
            mask &= mask - 1;

            // Go through a raw pointer so the slot reference does not keep
            // `self` borrowed across the `&mut self` call to fire_entry.
            let slot_ptr = self.levels[lvl_idx].slot(slot_idx) as *const crate::level::WheelSlot<T>;
            // SAFETY: slot_ptr points into self.levels[lvl_idx].slots
            // (Box<[WheelSlot<T>]>), a stable heap allocation. fire_entry
            // only mutates self.slab and self.len, not self.levels.
            let slot = unsafe { &*slot_ptr };
            let mut entry_ptr = slot.entry_head();

            while !entry_ptr.is_null() && fired < limit {
                // SAFETY: entry_ptr is in this slot's DLL
                let entry = unsafe { entry_ref(entry_ptr) };
                // Read the successor first: the entry may be unlinked and
                // freed below.
                let next_entry = entry.next();

                if entry.deadline_ticks() <= now_ticks {
                    // SAFETY: entry_ptr was reached by walking this slot's
                    // list, so it is linked in this slot.
                    unsafe { slot.remove_entry(entry_ptr) };

                    if let Some(value) = self.fire_entry(entry_ptr) {
                        buf.push(value);
                    }
                    // Counted even if no value was present.
                    fired += 1;
                }

                entry_ptr = next_entry;
            }

            if slot.is_empty() {
                self.levels[lvl_idx].deactivate_slot(slot_idx);
            }
        }

        // Deactivate level if all slots drained
        if !self.levels[lvl_idx].is_active() {
            self.active_levels &= !(1 << lvl_idx);
        }

        fired
    }
763}
764
765// =============================================================================
766// Drop
767// =============================================================================
768
impl<T: 'static, S: SlabStore<Item = WheelEntry<T>>> Drop for TimerWheel<T, S> {
    /// Frees every entry that is still linked into a slot.
    ///
    /// Entries are freed regardless of their refcount. Entries that already
    /// fired but whose handle is still outstanding are not linked into any
    /// slot, so this walk does not visit them.
    // NOTE(review): whether the slab reclaims such unlinked entries on its
    // own drop cannot be determined from this file — confirm.
    fn drop(&mut self) {
        // Walk active levels and slots via bitmasks, free every entry.
        let mut lvl_mask = self.active_levels;
        while lvl_mask != 0 {
            let lvl_idx = lvl_mask.trailing_zeros() as usize;
            lvl_mask &= lvl_mask - 1;

            let level = &self.levels[lvl_idx];
            let mut slot_mask = level.active_slots();
            while slot_mask != 0 {
                let slot_idx = slot_mask.trailing_zeros() as usize;
                slot_mask &= slot_mask - 1;

                let slot = level.slot(slot_idx);
                let mut entry_ptr = slot.entry_head();
                while !entry_ptr.is_null() {
                    // SAFETY: entry_ptr is in this slot's DLL
                    let entry = unsafe { entry_ref(entry_ptr) };
                    // Read the successor before the entry is freed.
                    let next_entry = entry.next();

                    // SAFETY: entry_ptr was allocated from our slab
                    unsafe { self.slab.free(nexus_slab::Slot::from_ptr(entry_ptr)) };

                    entry_ptr = next_entry;
                }
            }
        }
    }
}
799
800// =============================================================================
801// Tests
802// =============================================================================
803
804#[cfg(test)]
805mod tests {
806    use super::*;
807    use std::time::{Duration, Instant};
808
    /// Test shorthand: a `Duration` of `millis` milliseconds.
    fn ms(millis: u64) -> Duration {
        Duration::from_millis(millis)
    }
812
813    // -------------------------------------------------------------------------
814    // Thread safety
815    // -------------------------------------------------------------------------
816
    /// Compile-time check: instantiating this with `T` only type-checks if
    /// `T: Send`.
    fn _assert_send<T: Send>() {}
818
    /// Both wheel flavours must be `Send` for a `Send` payload; a failure
    /// here is a compile error rather than a runtime assertion.
    #[test]
    fn wheel_is_send() {
        _assert_send::<Wheel<u64>>();
        _assert_send::<BoundedWheel<u64>>();
    }
824
825    // -------------------------------------------------------------------------
826    // Construction
827    // -------------------------------------------------------------------------
828
829    #[test]
830    fn default_config() {
831        let now = Instant::now();
832        let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
833        assert!(wheel.is_empty());
834        assert_eq!(wheel.len(), 0);
835    }
836
837    #[test]
838    fn bounded_construction() {
839        let now = Instant::now();
840        let wheel: BoundedWheel<u64> = BoundedWheel::bounded(128, now);
841        assert!(wheel.is_empty());
842    }
843
844    #[test]
845    #[should_panic(expected = "slots_per_level must be a power of 2")]
846    fn invalid_config_non_power_of_two() {
847        let now = Instant::now();
848        WheelBuilder::default()
849            .slots_per_level(65)
850            .unbounded(1024)
851            .build::<u64>(now);
852    }
853
854    // -------------------------------------------------------------------------
855    // Schedule + Cancel
856    // -------------------------------------------------------------------------
857
858    #[test]
859    fn schedule_and_cancel() {
860        let now = Instant::now();
861        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
862
863        let h = wheel.schedule(now + ms(50), 42);
864        assert_eq!(wheel.len(), 1);
865
866        let val = wheel.cancel(h);
867        assert_eq!(val, Some(42));
868        assert_eq!(wheel.len(), 0);
869    }
870
871    #[test]
872    fn schedule_forget_fires() {
873        let now = Instant::now();
874        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
875
876        wheel.schedule_forget(now + ms(10), 99);
877        assert_eq!(wheel.len(), 1);
878
879        let mut buf = Vec::new();
880        let fired = wheel.poll(now + ms(20), &mut buf);
881        assert_eq!(fired, 1);
882        assert_eq!(buf, vec![99]);
883        assert_eq!(wheel.len(), 0);
884    }
885
886    #[test]
887    fn cancel_after_fire_returns_none() {
888        let now = Instant::now();
889        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
890
891        let h = wheel.schedule(now + ms(10), 42);
892
893        let mut buf = Vec::new();
894        wheel.poll(now + ms(20), &mut buf);
895        assert_eq!(buf, vec![42]);
896
897        // Handle is now a zombie
898        let val = wheel.cancel(h);
899        assert_eq!(val, None);
900    }
901
902    #[test]
903    fn free_active_timer_becomes_fire_and_forget() {
904        let now = Instant::now();
905        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
906
907        let h = wheel.schedule(now + ms(10), 42);
908        wheel.free(h); // releases handle, timer stays
909        assert_eq!(wheel.len(), 1);
910
911        let mut buf = Vec::new();
912        wheel.poll(now + ms(20), &mut buf);
913        assert_eq!(buf, vec![42]);
914        assert_eq!(wheel.len(), 0);
915    }
916
917    #[test]
918    fn free_zombie_handle() {
919        let now = Instant::now();
920        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
921
922        let h = wheel.schedule(now + ms(10), 42);
923
924        let mut buf = Vec::new();
925        wheel.poll(now + ms(20), &mut buf);
926
927        // Handle is zombie, free should clean up
928        wheel.free(h);
929    }
930
931    // -------------------------------------------------------------------------
932    // Bounded wheel
933    // -------------------------------------------------------------------------
934
935    #[test]
936    fn bounded_full() {
937        let now = Instant::now();
938        let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(2, now);
939
940        let h1 = wheel.try_schedule(now + ms(10), 1).unwrap();
941        let h2 = wheel.try_schedule(now + ms(20), 2).unwrap();
942
943        let err = wheel.try_schedule(now + ms(30), 3);
944        assert!(err.is_err());
945        let recovered = err.unwrap_err().into_inner();
946        assert_eq!(recovered, 3);
947
948        // Cancel one, should have room
949        wheel.cancel(h1);
950        let h3 = wheel.try_schedule(now + ms(30), 3).unwrap();
951
952        // Clean up handles
953        wheel.free(h2);
954        wheel.free(h3);
955    }
956
957    #[test]
958    fn bounded_schedule_forget_full() {
959        let now = Instant::now();
960        let mut wheel: BoundedWheel<u64> = BoundedWheel::bounded(1, now);
961
962        wheel.try_schedule_forget(now + ms(10), 1).unwrap();
963        let err = wheel.try_schedule_forget(now + ms(20), 2);
964        assert!(err.is_err());
965    }
966
967    // -------------------------------------------------------------------------
968    // Poll
969    // -------------------------------------------------------------------------
970
971    #[test]
972    fn poll_respects_deadline() {
973        let now = Instant::now();
974        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
975
976        wheel.schedule_forget(now + ms(10), 1);
977        wheel.schedule_forget(now + ms(50), 2);
978        wheel.schedule_forget(now + ms(100), 3);
979
980        let mut buf = Vec::new();
981
982        // At 20ms: only timer 1 should fire
983        let fired = wheel.poll(now + ms(20), &mut buf);
984        assert_eq!(fired, 1);
985        assert_eq!(buf, vec![1]);
986        assert_eq!(wheel.len(), 2);
987
988        // At 60ms: timer 2 fires
989        buf.clear();
990        let fired = wheel.poll(now + ms(60), &mut buf);
991        assert_eq!(fired, 1);
992        assert_eq!(buf, vec![2]);
993
994        // At 200ms: timer 3 fires
995        buf.clear();
996        let fired = wheel.poll(now + ms(200), &mut buf);
997        assert_eq!(fired, 1);
998        assert_eq!(buf, vec![3]);
999
1000        assert!(wheel.is_empty());
1001    }
1002
1003    #[test]
1004    fn poll_with_limit() {
1005        let now = Instant::now();
1006        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1007
1008        for i in 0..10 {
1009            wheel.schedule_forget(now + ms(1), i);
1010        }
1011
1012        let mut buf = Vec::new();
1013
1014        // Fire 3 at a time
1015        let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1016        assert_eq!(fired, 3);
1017        assert_eq!(wheel.len(), 7);
1018
1019        let fired = wheel.poll_with_limit(now + ms(5), 3, &mut buf);
1020        assert_eq!(fired, 3);
1021        assert_eq!(wheel.len(), 4);
1022
1023        // Fire remaining
1024        let fired = wheel.poll(now + ms(5), &mut buf);
1025        assert_eq!(fired, 4);
1026        assert!(wheel.is_empty());
1027        assert_eq!(buf.len(), 10);
1028    }
1029
1030    // -------------------------------------------------------------------------
1031    // Multi-level
1032    // -------------------------------------------------------------------------
1033
1034    #[test]
1035    fn timers_across_levels() {
1036        let now = Instant::now();
1037        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1038
1039        // Level 0: 0-63ms
1040        wheel.schedule_forget(now + ms(5), 0);
1041        // Level 1: 64-511ms
1042        wheel.schedule_forget(now + ms(200), 1);
1043        // Level 2: 512-4095ms
1044        wheel.schedule_forget(now + ms(1000), 2);
1045
1046        let mut buf = Vec::new();
1047
1048        wheel.poll(now + ms(10), &mut buf);
1049        assert_eq!(buf, vec![0]);
1050
1051        buf.clear();
1052        wheel.poll(now + ms(250), &mut buf);
1053        assert_eq!(buf, vec![1]);
1054
1055        buf.clear();
1056        wheel.poll(now + ms(1500), &mut buf);
1057        assert_eq!(buf, vec![2]);
1058
1059        assert!(wheel.is_empty());
1060    }
1061
1062    // -------------------------------------------------------------------------
1063    // next_deadline
1064    // -------------------------------------------------------------------------
1065
1066    #[test]
1067    fn next_deadline_empty() {
1068        let now = Instant::now();
1069        let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1070        assert!(wheel.next_deadline().is_none());
1071    }
1072
1073    #[test]
1074    fn next_deadline_returns_earliest() {
1075        let now = Instant::now();
1076        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1077
1078        wheel.schedule_forget(now + ms(100), 1);
1079        wheel.schedule_forget(now + ms(50), 2);
1080        wheel.schedule_forget(now + ms(200), 3);
1081
1082        let next = wheel.next_deadline().unwrap();
1083        // Should be close to now + 50ms (within tick granularity)
1084        let delta = next.duration_since(now);
1085        assert!(delta >= ms(49) && delta <= ms(51));
1086    }
1087
1088    // -------------------------------------------------------------------------
1089    // Deadline in the past
1090    // -------------------------------------------------------------------------
1091
1092    #[test]
1093    fn deadline_in_the_past_fires_immediately() {
1094        let now = Instant::now();
1095        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1096
1097        // Schedule at epoch (which is "now" at construction)
1098        wheel.schedule_forget(now, 42);
1099
1100        let mut buf = Vec::new();
1101        let fired = wheel.poll(now + ms(1), &mut buf);
1102        assert_eq!(fired, 1);
1103        assert_eq!(buf, vec![42]);
1104    }
1105
1106    // -------------------------------------------------------------------------
1107    // Deadline beyond max range — clamped
1108    // -------------------------------------------------------------------------
1109
1110    #[test]
1111    fn deadline_beyond_max_range_clamped() {
1112        let now = Instant::now();
1113        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1114
1115        // Way in the future — should clamp to highest level
1116        let h = wheel.schedule(now + Duration::from_secs(100_000), 99);
1117        assert_eq!(wheel.len(), 1);
1118
1119        // Won't fire at any reasonable time but will fire when enough ticks pass
1120        let mut buf = Vec::new();
1121        wheel.poll(now + Duration::from_secs(100_001), &mut buf);
1122        assert_eq!(buf, vec![99]);
1123
1124        // Note: handle was already consumed by the poll (fire-and-forget path won't
1125        // apply since refs=2). Actually the handle still exists. Let's clean up.
1126        // The timer already fired, so cancel returns None.
1127        // Actually buf got the value, which means it fired. But handle still needs cleanup.
1128        // We already pushed the value so we need to handle the zombie.
1129        // Wait — we used schedule (refs=2), poll fired it (refs 2→1 zombie), handle `h` exists.
1130        // Actually we consumed it with the poll — no we didn't, we still have `h`.
1131
1132        // h is a zombie handle now
1133        let val = wheel.cancel(h);
1134        assert_eq!(val, None);
1135    }
1136
1137    // -------------------------------------------------------------------------
1138    // Drop
1139    // -------------------------------------------------------------------------
1140
1141    #[test]
1142    fn drop_cleans_up_active_entries() {
1143        let now = Instant::now();
1144        let mut wheel: Wheel<String> = Wheel::unbounded(1024, now);
1145
1146        for i in 0..100 {
1147            wheel.schedule_forget(now + ms(i * 10), format!("timer-{i}"));
1148        }
1149
1150        assert_eq!(wheel.len(), 100);
1151        // Drop should free all entries without leaking
1152        drop(wheel);
1153    }
1154
1155    #[test]
1156    fn drop_with_outstanding_handles() {
1157        let now = Instant::now();
1158        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1159
1160        // Schedule but DON'T cancel — just free the handles
1161        let h1 = wheel.schedule(now + ms(10), 1);
1162        let h2 = wheel.schedule(now + ms(20), 2);
1163
1164        // Free the handles (convert to fire-and-forget) so they don't debug_assert
1165        wheel.free(h1);
1166        wheel.free(h2);
1167
1168        // Drop the wheel — should clean up the entries
1169        drop(wheel);
1170    }
1171
1172    // -------------------------------------------------------------------------
1173    // Level selection
1174    // -------------------------------------------------------------------------
1175
1176    #[test]
1177    fn level_selection_boundaries() {
1178        let now = Instant::now();
1179        let wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1180
1181        // Level 0: delta < 64
1182        assert_eq!(wheel.select_level(0), 0);
1183        assert_eq!(wheel.select_level(63), 0);
1184
1185        // Level 1: 64 <= delta < 512
1186        assert_eq!(wheel.select_level(64), 1);
1187        assert_eq!(wheel.select_level(511), 1);
1188
1189        // Level 2: 512 <= delta < 4096
1190        assert_eq!(wheel.select_level(512), 2);
1191    }
1192
1193    // -------------------------------------------------------------------------
1194    // Bug fix validation: cancel after time advance
1195    // -------------------------------------------------------------------------
1196
1197    #[test]
1198    fn cancel_after_time_advance() {
1199        // The critical bug: schedule at T+500ms (level 2, delta=500 ticks),
1200        // poll at T+400ms (no fire, but current_ticks advances to 400),
1201        // cancel at T+400ms. Old code would recompute delta = 500-400 = 100
1202        // → level 1. But the entry is in level 2. Stored location fixes this.
1203        let now = Instant::now();
1204        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1205
1206        let h = wheel.schedule(now + ms(500), 42);
1207        assert_eq!(wheel.len(), 1);
1208
1209        // Advance time — timer doesn't fire (deadline is 500ms)
1210        let mut buf = Vec::new();
1211        let fired = wheel.poll(now + ms(400), &mut buf);
1212        assert_eq!(fired, 0);
1213        assert!(buf.is_empty());
1214
1215        // Cancel after time advance — must find the entry in the correct slot
1216        let val = wheel.cancel(h);
1217        assert_eq!(val, Some(42));
1218        assert_eq!(wheel.len(), 0);
1219    }
1220
1221    // -------------------------------------------------------------------------
1222    // Same-slot entries
1223    // -------------------------------------------------------------------------
1224
1225    #[test]
1226    fn multiple_entries_same_slot() {
1227        let now = Instant::now();
1228        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1229
1230        // All 5 timers at the same deadline → same slot
1231        let mut handles = Vec::new();
1232        for i in 0..5 {
1233            handles.push(wheel.schedule(now + ms(10), i));
1234        }
1235        assert_eq!(wheel.len(), 5);
1236
1237        // Cancel the middle ones
1238        let v2 = wheel.cancel(handles.remove(2));
1239        assert_eq!(v2, Some(2));
1240        let v0 = wheel.cancel(handles.remove(0));
1241        assert_eq!(v0, Some(0));
1242        assert_eq!(wheel.len(), 3);
1243
1244        // Poll — remaining 3 should fire
1245        let mut buf = Vec::new();
1246        let fired = wheel.poll(now + ms(20), &mut buf);
1247        assert_eq!(fired, 3);
1248
1249        // Clean up zombie handles
1250        for h in handles {
1251            let val = wheel.cancel(h);
1252            assert_eq!(val, None); // already fired
1253        }
1254    }
1255
1256    // -------------------------------------------------------------------------
1257    // Level boundary
1258    // -------------------------------------------------------------------------
1259
1260    #[test]
1261    fn entry_at_level_boundary() {
1262        // Default config: level 0 range = 64 ticks (64ms).
1263        // A deadline at exactly tick 64 should go to level 1, not level 0.
1264        let now = Instant::now();
1265        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1266
1267        let h = wheel.schedule(now + ms(64), 99);
1268        assert_eq!(wheel.len(), 1);
1269
1270        // Should NOT fire at 63ms
1271        let mut buf = Vec::new();
1272        let fired = wheel.poll(now + ms(63), &mut buf);
1273        assert_eq!(fired, 0);
1274
1275        // Should fire at 64ms
1276        let fired = wheel.poll(now + ms(65), &mut buf);
1277        assert_eq!(fired, 1);
1278        assert_eq!(buf, vec![99]);
1279
1280        // Clean up zombie handle
1281        wheel.cancel(h);
1282    }
1283
1284    // -------------------------------------------------------------------------
1285    // Bookmark/resumption with mixed expiry
1286    // -------------------------------------------------------------------------
1287
1288    #[test]
1289    fn poll_with_limit_mixed_expiry() {
1290        let now = Instant::now();
1291        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1292
1293        // 3 expired at poll time, 2 not
1294        wheel.schedule_forget(now + ms(5), 1);
1295        wheel.schedule_forget(now + ms(5), 2);
1296        wheel.schedule_forget(now + ms(5), 3);
1297        wheel.schedule_forget(now + ms(500), 4); // not expired
1298        wheel.schedule_forget(now + ms(500), 5); // not expired
1299        assert_eq!(wheel.len(), 5);
1300
1301        let mut buf = Vec::new();
1302
1303        // Fire 2 of the 3 expired
1304        let fired = wheel.poll_with_limit(now + ms(10), 2, &mut buf);
1305        assert_eq!(fired, 2);
1306        assert_eq!(wheel.len(), 3);
1307
1308        // Fire remaining expired (1 more)
1309        let fired = wheel.poll_with_limit(now + ms(10), 5, &mut buf);
1310        assert_eq!(fired, 1);
1311        assert_eq!(wheel.len(), 2);
1312
1313        // The 2 unexpired should still be there
1314        assert_eq!(buf.len(), 3);
1315    }
1316
1317    // -------------------------------------------------------------------------
1318    // Re-add after drain
1319    // -------------------------------------------------------------------------
1320
1321    #[test]
1322    fn reuse_after_full_drain() {
1323        let now = Instant::now();
1324        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1325
1326        // Round 1: schedule and drain
1327        for i in 0..10 {
1328            wheel.schedule_forget(now + ms(1), i);
1329        }
1330        let mut buf = Vec::new();
1331        wheel.poll(now + ms(5), &mut buf);
1332        assert_eq!(buf.len(), 10);
1333        assert!(wheel.is_empty());
1334
1335        // Round 2: schedule and drain again — wheel must work normally
1336        buf.clear();
1337        for i in 10..20 {
1338            wheel.schedule_forget(now + ms(100), i);
1339        }
1340        assert_eq!(wheel.len(), 10);
1341
1342        wheel.poll(now + ms(200), &mut buf);
1343        assert_eq!(buf.len(), 10);
1344        assert!(wheel.is_empty());
1345    }
1346
1347    // -------------------------------------------------------------------------
1348    // All levels active simultaneously
1349    // -------------------------------------------------------------------------
1350
1351    #[test]
1352    fn all_levels_active() {
1353        let now = Instant::now();
1354        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1355
1356        // Schedule one timer per level with increasing distances.
1357        // Level 0: <64ms, Level 1: 64-511ms, Level 2: 512-4095ms, etc.
1358        let distances = [10, 100, 1000, 5000, 40_000, 300_000, 3_000_000];
1359        let mut handles: Vec<TimerHandle<u64>> = Vec::new();
1360        for (i, &d) in distances.iter().enumerate() {
1361            handles.push(wheel.schedule(now + ms(d), i as u64));
1362        }
1363        assert_eq!(wheel.len(), 7);
1364
1365        // Cancel in a shuffled order: 4, 1, 6, 0, 3, 5, 2
1366        let order = [4, 1, 6, 0, 3, 5, 2];
1367        // Take ownership by swapping with dummies — actually we need to
1368        // cancel by index. Let's use Option to track.
1369        let mut opt_handles: Vec<Option<TimerHandle<u64>>> =
1370            handles.into_iter().map(Some).collect();
1371
1372        for &idx in &order {
1373            let h = opt_handles[idx].take().unwrap();
1374            let val = wheel.cancel(h);
1375            assert_eq!(val, Some(idx as u64));
1376        }
1377        assert!(wheel.is_empty());
1378    }
1379
1380    // -------------------------------------------------------------------------
1381    // Poll values match
1382    // -------------------------------------------------------------------------
1383
1384    #[test]
1385    fn poll_values_match() {
1386        let now = Instant::now();
1387        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1388
1389        let expected: Vec<u64> = (100..110).collect();
1390        for &v in &expected {
1391            wheel.schedule_forget(now + ms(5), v);
1392        }
1393
1394        let mut buf = Vec::new();
1395        wheel.poll(now + ms(10), &mut buf);
1396
1397        buf.sort();
1398        assert_eq!(buf, expected);
1399    }
1400
1401    // -------------------------------------------------------------------------
1402    // Reschedule
1403    // -------------------------------------------------------------------------
1404
1405    #[test]
1406    fn reschedule_moves_deadline() {
1407        let now = Instant::now();
1408        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1409
1410        let h = wheel.schedule(now + ms(100), 42);
1411        assert_eq!(wheel.len(), 1);
1412
1413        // Reschedule to earlier
1414        let h = wheel.reschedule(h, now + ms(50));
1415        assert_eq!(wheel.len(), 1);
1416
1417        // Should NOT fire at 40ms
1418        let mut buf = Vec::new();
1419        let fired = wheel.poll(now + ms(40), &mut buf);
1420        assert_eq!(fired, 0);
1421
1422        // Should fire at 50ms
1423        let fired = wheel.poll(now + ms(55), &mut buf);
1424        assert_eq!(fired, 1);
1425        assert_eq!(buf, vec![42]);
1426
1427        // Clean up zombie
1428        wheel.cancel(h);
1429    }
1430
1431    #[test]
1432    fn reschedule_to_later() {
1433        let now = Instant::now();
1434        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1435
1436        let h = wheel.schedule(now + ms(50), 7);
1437
1438        // Reschedule to later
1439        let h = wheel.reschedule(h, now + ms(200));
1440
1441        // Should NOT fire at original deadline
1442        let mut buf = Vec::new();
1443        let fired = wheel.poll(now + ms(60), &mut buf);
1444        assert_eq!(fired, 0);
1445
1446        // Should fire at new deadline
1447        let fired = wheel.poll(now + ms(210), &mut buf);
1448        assert_eq!(fired, 1);
1449        assert_eq!(buf, vec![7]);
1450
1451        wheel.cancel(h);
1452    }
1453
1454    #[test]
1455    #[should_panic(expected = "cannot reschedule a fired timer")]
1456    fn reschedule_panics_on_zombie() {
1457        let now = Instant::now();
1458        let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1459
1460        let h = wheel.schedule(now + ms(10), 42);
1461
1462        let mut buf = Vec::new();
1463        wheel.poll(now + ms(20), &mut buf);
1464
1465        // h is now a zombie — reschedule should panic
1466        let _h = wheel.reschedule(h, now + ms(100));
1467    }
1468}
1469
1470#[cfg(test)]
1471mod proptests {
1472    use super::*;
1473    use proptest::prelude::*;
1474    use std::collections::HashSet;
1475    use std::mem;
1476    use std::time::{Duration, Instant};
1477
    /// One step in a randomly generated schedule/cancel interleaving.
    #[derive(Debug, Clone)]
    enum Op {
        /// Schedule a timer `deadline_ms` milliseconds after the wheel's epoch.
        Schedule { deadline_ms: u64 },
        /// Cancel an outstanding handle. `idx` is an arbitrary index that the
        /// test reduces modulo the number of outstanding handles.
        Cancel { idx: usize },
    }
1486
1487    fn op_strategy() -> impl Strategy<Value = Op> {
1488        prop_oneof![
1489            // Schedule with deadlines from 1ms to 10_000ms
1490            (1u64..10_000).prop_map(|deadline_ms| Op::Schedule { deadline_ms }),
1491            // Cancel at random index
1492            any::<usize>().prop_map(|idx| Op::Cancel { idx }),
1493        ]
1494    }
1495
1496    proptest! {
1497        #![proptest_config(ProptestConfig::with_cases(500))]
1498
1499        /// Fuzz schedule/cancel interleaving.
1500        ///
1501        /// Random sequence of schedule and cancel operations. Invariants:
1502        /// - `len` always matches outstanding active timers
1503        /// - cancel on active handle returns `Some`
1504        /// - poll collects all un-cancelled values
1505        #[test]
1506        fn fuzz_schedule_cancel_interleaving(ops in proptest::collection::vec(op_strategy(), 1..200)) {
1507            let now = Instant::now();
1508            let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1509
1510            let mut handles: Vec<TimerHandle<u64>> = Vec::new();
1511            let mut active_values: HashSet<u64> = HashSet::new();
1512            let mut next_id: u64 = 0;
1513
1514            for op in &ops {
1515                match op {
1516                    Op::Schedule { deadline_ms } => {
1517                        let id = next_id;
1518                        next_id += 1;
1519                        let h = wheel.schedule(now + Duration::from_millis(*deadline_ms), id);
1520                        handles.push(h);
1521                        active_values.insert(id);
1522                    }
1523                    Op::Cancel { idx } => {
1524                        if !handles.is_empty() {
1525                            let i = idx % handles.len();
1526                            let h = handles.swap_remove(i);
1527                            let val = wheel.cancel(h);
1528                            // Value should be Some (all handles are for active timers)
1529                            let v = val.unwrap();
1530                            assert!(active_values.remove(&v));
1531                        }
1532                    }
1533                }
1534                // len must match active values
1535                prop_assert_eq!(wheel.len(), active_values.len());
1536            }
1537
1538            // Poll everything — should collect exactly the remaining active values
1539            let mut buf = Vec::new();
1540            // Use a far-future time to fire everything
1541            wheel.poll(now + Duration::from_secs(100_000), &mut buf);
1542
1543            // Clean up zombie handles (poll fired them, handles still exist)
1544            for h in handles {
1545                mem::forget(h);
1546            }
1547
1548            let fired_set: HashSet<u64> = buf.into_iter().collect();
1549            prop_assert_eq!(fired_set, active_values);
1550            prop_assert!(wheel.is_empty());
1551        }
1552
1553        /// Fuzz poll timing.
1554        ///
1555        /// Schedule N timers with random deadlines. Poll at random increasing
1556        /// times. Assert every timer fires exactly once, fired deadlines are
1557        /// all <= poll time, unfired deadlines are all > poll time.
1558        #[test]
1559        fn fuzz_poll_timing(
1560            deadlines in proptest::collection::vec(1u64..5000, 1..100),
1561            poll_times in proptest::collection::vec(1u64..10_000, 1..20),
1562        ) {
1563            let now = Instant::now();
1564            let mut wheel: Wheel<u64> = Wheel::unbounded(1024, now);
1565
1566            // Schedule all timers (fire-and-forget)
1567            for (i, &d) in deadlines.iter().enumerate() {
1568                wheel.schedule_forget(now + Duration::from_millis(d), i as u64);
1569            }
1570
1571            // Sort poll times to be monotonically increasing
1572            let mut sorted_times: Vec<u64> = poll_times;
1573            sorted_times.sort();
1574            sorted_times.dedup();
1575
1576            let mut all_fired: Vec<u64> = Vec::new();
1577
1578            for &t in &sorted_times {
1579                let mut buf = Vec::new();
1580                wheel.poll(now + Duration::from_millis(t), &mut buf);
1581
1582                // Every fired entry should have deadline_ms <= t
1583                for &id in &buf {
1584                    let deadline_ms = deadlines[id as usize];
1585                    prop_assert!(deadline_ms <= t,
1586                        "Timer {} with deadline {}ms fired at {}ms", id, deadline_ms, t);
1587                }
1588
1589                all_fired.extend(buf);
1590            }
1591
1592            // Fire everything remaining
1593            let mut final_buf = Vec::new();
1594            wheel.poll(now + Duration::from_secs(100_000), &mut final_buf);
1595            all_fired.extend(final_buf);
1596
1597            // Every timer should have fired exactly once
1598            all_fired.sort();
1599            let expected: Vec<u64> = (0..deadlines.len() as u64).collect();
1600            prop_assert_eq!(all_fired, expected, "Not all timers fired exactly once");
1601            prop_assert!(wheel.is_empty());
1602        }
1603    }
1604}