// maniac_runtime/runtime/timer_wheel.rs

//! High-performance hierarchical hashed timing wheel for efficient timer scheduling.
//! Based on "Hashed and Hierarchical Timing Wheels" by Varghese and Lauck.
//!
//! Two-level hierarchy:
//! - L0 (Fast): 1.05ms/tick × 1024 ticks = 1.07s coverage
//! - L1 (Medium): 1.07s/tick × 1024 ticks = 18.33min coverage
//!
//! Complexity:
//! - Schedule: O(1) amortized, O(tick_allocation) worst-case when tick is full
//! - Cancel: O(1)
//! - Poll: O(expired_timers + ticks_per_wheel × tick_allocation) - scans from current_tick forward
//! - Next deadline: O(ticks_per_wheel × tick_allocation) when cache invalidated
//!
//! Timers in the same tick are not ordered relative to each other.
//! NOT thread-safe - the caller must synchronize.
16use std::{
17    sync::atomic::{AtomicU64, Ordering},
18    time::Duration,
19};
20
21/// Error types for timer wheel operations
/// Failure modes reported by timer wheel operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TimerWheelError {
    /// The timer ID is malformed or exceeds the maximum supported value.
    InvalidTimerId,
    /// The requested deadline cannot be honored (zero, in the past, overflow, etc.).
    InvalidDeadline,
    /// Growing the wheel would exceed its capacity limit.
    CapacityExceeded,
    /// An integer overflow occurred during a calculation.
    Overflow,
    /// No timer exists at the location the ID decodes to.
    TimerNotFound,
}
35
/// A single scheduled timer as stored inside a wheel slot.
///
/// Holds the absolute deadline, the precomputed tick that deadline hashes
/// to, and the caller-supplied payload.
#[derive(Debug)]
struct TimerEntry<T> {
    /// Absolute deadline, nanoseconds since the wheel's start time.
    deadline_ns: u64,
    /// `deadline_ns / tick_resolution_ns`, precomputed so spoke indices can
    /// be derived with a mask instead of a division.
    deadline_tick: u64,
    /// Payload handed back to the caller when the timer fires or is cancelled.
    data: T,
}
50
51/// Internal single-level timing wheel
52/// 
53/// A hashed timing wheel uses a circular array where each slot (spoke) represents a time interval.
54/// Timers are hashed into spokes based on their deadline, allowing O(1) insertion and cancellation.
55/// Multiple timers can hash to the same spoke, so each spoke has multiple slots.
56/// 
57/// The wheel uses power-of-2 sizes for efficient bitwise operations:
58/// - Spoke index = (deadline_tick & tick_mask) - uses bitwise AND instead of modulo
59/// - Slot index within spoke uses bit shifts for fast indexing
60/// 
61/// Used as a building block for the hierarchical TimerWheel
62struct SingleWheel<T> {
63    /// Time resolution per tick in nanoseconds (must be power of 2)
64    /// Determines the granularity of timer scheduling
65    tick_resolution_ns: u64,
66
67    /// Current tick position - tracks how far the wheel has advanced
68    /// Used to optimize polling by only scanning expired ticks
69    current_tick: u64,
70
71    /// Number of active timers in this wheel
72    /// Used for quick checks to avoid unnecessary work
73    timer_count: u64,
74
75    /// Cached next deadline for efficient querying
76    /// Invalidated (set to NULL_DEADLINE) when timers are cancelled or cascaded
77    cached_next_deadline: u64,
78
79    /// Number of ticks/spokes per wheel (must be power of 2)
80    /// Each spoke represents one tick interval
81    ticks_per_wheel: usize,
82
83    /// Mask for tick calculation (ticks_per_wheel - 1)
84    /// Used for fast modulo: tick & tick_mask instead of tick % ticks_per_wheel
85    tick_mask: usize,
86
87    /// Bit shift for resolution calculations
88    /// Precomputed: log2(tick_resolution_ns) for fast division: deadline_ns >> resolution_bits_to_shift
89    resolution_bits_to_shift: u32,
90
91    /// Current allocation size per tick (must be power of 2)
92    /// Each spoke can hold this many timers before needing to expand
93    tick_allocation: usize,
94
95    /// Bit shift for allocation calculations
96    /// Precomputed: log2(tick_allocation) for fast indexing: spoke << allocation_bits_to_shift
97    allocation_bits_to_shift: u32,
98
99    /// Current index within a tick during polling
100    /// Used for incremental polling (currently not fully utilized)
101    poll_index: usize,
102
103    /// Flat array storage: [tick0_slot0, tick0_slot1, ..., tick1_slot0, ...]
104    /// Index calculation: (spoke_index << allocation_bits_to_shift) + slot_index
105    /// None indicates an empty slot
106    wheel: Vec<Option<TimerEntry<T>>>,
107
108    /// Per-tick next available slot hint for O(1) scheduling
109    /// Tracks the last used slot index per spoke to enable fast insertion
110    /// When scheduling, we first check this hint before doing a linear search
111    next_free_hint: Vec<usize>,
112}
113
impl<T> SingleWheel<T> {
    /// Default slots per spoke.
    /// NOTE(review): unused inside this impl — `TimerWheel` defines its own
    /// copy of this constant; confirm before removing.
    const INITIAL_TICK_ALLOCATION: usize = 16;
    /// Sentinel meaning "no cached deadline"; forces lazy recomputation.
    const NULL_DEADLINE: u64 = u64::MAX;

    /// Create a new single-level timing wheel
    ///
    /// # Arguments
    /// * `tick_resolution_ns` - Duration of one tick in nanoseconds (must be power of 2)
    /// * `ticks_per_wheel` - Number of spokes/ticks in the wheel (must be power of 2)
    /// * `initial_tick_allocation` - Initial number of slots per tick (must be power of 2)
    ///
    /// # Panics
    /// Panics if any parameter is not a power of 2, as that breaks the
    /// shift/mask arithmetic used on the hot path
    fn new(
        tick_resolution_ns: u64,
        ticks_per_wheel: usize,
        initial_tick_allocation: usize,
    ) -> Self {
        // Validate power-of-2 constraints for efficient bitwise operations
        assert!(
            ticks_per_wheel.is_power_of_two(),
            "ticks_per_wheel must be power of 2"
        );
        assert!(
            initial_tick_allocation.is_power_of_two(),
            "tick_allocation must be power of 2"
        );
        assert!(
            tick_resolution_ns.is_power_of_two(),
            "tick_resolution must be power of 2 ns"
        );

        // Precompute masks and bit shifts for fast operations
        let tick_mask = ticks_per_wheel - 1; // For fast modulo: tick & tick_mask
        let resolution_bits_to_shift = tick_resolution_ns.trailing_zeros(); // log2(tick_resolution_ns)
        let allocation_bits_to_shift = initial_tick_allocation.trailing_zeros(); // log2(tick_allocation)

        // Allocate flat array: total capacity = ticks_per_wheel * slots_per_tick
        let capacity = ticks_per_wheel * initial_tick_allocation;
        let mut wheel = Vec::with_capacity(capacity);
        wheel.resize_with(capacity, || None);

        // Initialize per-spoke hints for O(1) scheduling (all slots start free)
        let mut next_free_hint = Vec::with_capacity(ticks_per_wheel);
        next_free_hint.resize(ticks_per_wheel, 0);

        Self {
            tick_resolution_ns,
            current_tick: 0,
            timer_count: 0,
            cached_next_deadline: Self::NULL_DEADLINE,
            ticks_per_wheel,
            tick_mask,
            resolution_bits_to_shift,
            tick_allocation: initial_tick_allocation,
            allocation_bits_to_shift,
            poll_index: 0,
            wheel,
            next_free_hint,
        }
    }

    /// Schedule a timer into this wheel
    ///
    /// Hashes the deadline into a spoke (tick), then finds a free slot in
    /// that spoke: first via the per-spoke hint (O(1) fast path), then a
    /// circular linear probe, and finally by growing the wheel if the spoke
    /// is full.
    ///
    /// Note: spokes are shared across wheel rotations — a timer whose
    /// deadline_tick differs by a multiple of ticks_per_wheel lands in the
    /// same spoke, so consumers must filter by exact deadline when expiring.
    ///
    /// `level` is currently unused in this body — presumably the owning
    /// wheel level, kept for symmetry/diagnostics; TODO confirm.
    ///
    /// # Returns
    /// `(spoke_index, slot_index)`, encodable into a timer ID by the caller
    ///
    /// # Errors
    /// * `InvalidDeadline` if `deadline_ns` precedes `start_time_ns`
    /// * `CapacityExceeded` if the wheel cannot grow any further
    fn schedule_internal(
        &mut self,
        deadline_ns: u64,
        start_time_ns: u64,
        data: T,
        level: u8,
    ) -> Result<(usize, usize), TimerWheelError> {
        // Validate the deadline is not before the wheel's epoch
        if deadline_ns < start_time_ns {
            return Err(TimerWheelError::InvalidDeadline);
        }

        // Compute which tick this deadline falls into:
        // deadline_tick = (deadline_ns - start_time_ns) / tick_resolution_ns,
        // done as a shift since the resolution is a power of 2. The
        // saturating_sub can never actually saturate thanks to the check above.
        let deadline_tick =
            (deadline_ns.saturating_sub(start_time_ns)) >> self.resolution_bits_to_shift;

        // Hash into a spoke with a bitwise AND (equivalent to % ticks_per_wheel)
        let spoke_index = (deadline_tick & self.tick_mask as u64) as usize;

        // Start index of this spoke in the flat array
        // (spoke_index * tick_allocation, done as a shift)
        let mut tick_start_index = spoke_index << self.allocation_bits_to_shift;

        // Fast path: try the per-spoke hint first for O(1) insertion
        let hint = self.next_free_hint[spoke_index];
        let mut slot_idx = None;

        if hint < self.tick_allocation {
            let hint_index = tick_start_index + hint;
            if self.wheel[hint_index].is_none() {
                slot_idx = Some(hint);
            }
        }

        // Slow path: circular linear probe starting from the hint position
        if slot_idx.is_none() {
            for i in 0..self.tick_allocation {
                let candidate = (hint + i) % self.tick_allocation;
                let index = tick_start_index + candidate;

                if self.wheel[index].is_none() {
                    slot_idx = Some(candidate);
                    break;
                }
            }
        }

        // Spoke is full: grow the wheel (doubles the allocation of EVERY spoke)
        let slot_idx = match slot_idx {
            Some(idx) => idx,
            None => {
                let new_slot = self.increase_capacity(spoke_index)
                    .map_err(|_| TimerWheelError::CapacityExceeded)?;
                // The allocation shift changed; recompute this spoke's start index
                tick_start_index = spoke_index << self.allocation_bits_to_shift;
                new_slot
            }
        };

        // Store the timer entry
        let wheel_index = tick_start_index + slot_idx;
        self.wheel[wheel_index] = Some(TimerEntry {
            deadline_ns,
            deadline_tick,
            data,
        });
        self.timer_count += 1;

        // Keep the cached earliest deadline current so next-deadline queries
        // stay O(1) while the cache is valid
        if deadline_ns < self.cached_next_deadline {
            self.cached_next_deadline = deadline_ns;
        }

        // Advance the hint to the next slot (wrapping) for future insertions;
        // the hinted slot is not guaranteed free — it is only a starting point
        let next_hint = if slot_idx + 1 >= self.tick_allocation {
            0
        } else {
            slot_idx + 1
        };
        self.next_free_hint[spoke_index] = next_hint;

        Ok((spoke_index, slot_idx))
    }

    /// Cancel a timer by spoke and slot index
    ///
    /// O(1): directly indexes the flat array. Invalidates the cached earliest
    /// deadline when the removed timer carried that deadline.
    ///
    /// # Returns
    /// `Ok(Some(data))` when a timer was present at the location (the
    /// `Ok(None)` arm of the signature is never produced by this body)
    ///
    /// # Errors
    /// * `InvalidTimerId` if either index is out of bounds
    /// * `TimerNotFound` if the slot is empty
    fn cancel_internal(&mut self, spoke_index: usize, slot_index: usize) -> Result<Option<T>, TimerWheelError> {
        // Bounds-check BEFORE computing wheel_index so a bogus ID cannot
        // produce an out-of-range flat index
        if spoke_index >= self.ticks_per_wheel || slot_index >= self.tick_allocation {
            return Err(TimerWheelError::InvalidTimerId);
        }

        // Flat index: (spoke_index * tick_allocation) + slot_index
        let wheel_index = (spoke_index << self.allocation_bits_to_shift) + slot_index;

        // Remove the timer entry if present
        if let Some(entry) = self.wheel[wheel_index].take() {
            self.timer_count -= 1;
            let TimerEntry {
                deadline_ns, data, ..
            } = entry;

            // Cancelled the earliest timer: drop the cache; it is recomputed
            // lazily on the next next_deadline() query
            if deadline_ns == self.cached_next_deadline {
                self.cached_next_deadline = Self::NULL_DEADLINE;
            }

            return Ok(Some(data));
        }

        // Slot was empty
        Err(TimerWheelError::TimerNotFound)
    }

    /// Double the slot allocation of every spoke in the wheel
    ///
    /// The flat layout requires a uniform per-spoke size, so when one spoke
    /// overflows, ALL spokes are doubled. Expensive, but amortized across
    /// many insertions. (`spoke_index` only identifies the requester; it is
    /// otherwise unused since the growth is global.)
    ///
    /// # Returns
    /// The old per-spoke allocation, which is also the index of the first
    /// newly created (guaranteed-free) slot in every spoke
    ///
    /// # Errors
    /// * `Overflow` if doubling the allocation overflows `usize`
    /// * `CapacityExceeded` if the total slot count would exceed 2^30
    fn increase_capacity(&mut self, spoke_index: usize) -> Result<usize, TimerWheelError> {
        // Double the allocation (stays a power of 2)
        let new_tick_allocation = self.tick_allocation.checked_mul(2)
            .ok_or(TimerWheelError::Overflow)?;
        let new_allocation_bits = new_tick_allocation.trailing_zeros();

        // Cap the total SLOT count at 2^30 entries (note: entries, not bytes —
        // actual memory is 2^30 × size_of::<Option<TimerEntry<T>>>())
        let new_capacity = self.ticks_per_wheel * new_tick_allocation;
        if new_capacity > (1 << 30) {
            return Err(TimerWheelError::CapacityExceeded);
        }

        // Save the old geometry for the copy below
        let old_tick_allocation = self.tick_allocation;
        let old_allocation_bits = self.allocation_bits_to_shift;

        // Allocate the larger flat array
        let mut new_wheel = Vec::with_capacity(new_capacity);
        new_wheel.resize_with(new_capacity, || None);

        // Move every existing timer to the same relative position inside its
        // (now larger) spoke
        for j in 0..self.ticks_per_wheel {
            let old_start = j << old_allocation_bits;  // Old spoke start
            let new_start = j << new_allocation_bits;  // New spoke start
            for k in 0..old_tick_allocation {
                new_wheel[new_start + k] = self.wheel[old_start + k].take();
            }
        }

        // Install the new geometry
        self.tick_allocation = new_tick_allocation;
        self.allocation_bits_to_shift = new_allocation_bits;
        self.wheel = new_wheel;

        // Point every hint at the first newly added slot so insertions fill
        // the fresh space before probing
        for hint in &mut self.next_free_hint {
            *hint = old_tick_allocation;
        }

        // First new slot index for the caller
        Ok(old_tick_allocation)
    }
}
366
367/// High-performance hierarchical timing wheel
368/// 
369/// Uses a two-level hierarchy to efficiently handle timers across a wide time range:
370/// - **L0 (Fast wheel)**: Handles near-term timers (0 to ~1 second)
371///   - Fine-grained resolution for accurate short-term scheduling
372///   - Polled frequently for expired timers
373/// - **L1 (Medium wheel)**: Handles long-term timers (~1 second to ~18 minutes)
374///   - Coarser resolution to reduce memory overhead
375///   - Timers cascade down to L0 as their deadline approaches
376/// 
377/// The hierarchy allows O(1) scheduling and cancellation while supporting
378/// a wide range of timer durations without excessive memory usage.
379pub struct TimerWheel<T> {
380    /// L0 (Fast) wheel: handles timers in the near future
381    /// Typical: 1.05ms ticks × 1024 ticks = 1.07s coverage
382    l0: SingleWheel<T>,
383
384    /// L1 (Medium) wheel: handles timers further in the future
385    /// Typical: 1.07s ticks × 1024 ticks = 18.33min coverage
386    l1: SingleWheel<T>,
387
388    /// Null deadline sentinel value (u64::MAX)
389    /// Used to represent "no deadline" in cached values
390    null_deadline: u64,
391
392    /// Start time in nanoseconds - when the wheel was created
393    /// All deadlines are relative to this time
394    start_time_ns: u64,
395
396    /// Current time in nanoseconds (atomic for cross-thread updates)
397    /// Updated by external tick thread, read by worker threads
398    now_ns: AtomicU64,
399
400    /// Worker ID that owns this timer wheel
401    /// Used for cross-worker timer operations
402    worker_id: u32,
403
404    /// L0 coverage in nanoseconds (precalculated for efficiency)
405    /// Threshold for deciding whether to schedule in L0 or L1
406    l0_coverage_ns: u64,
407}
408
409impl<T> TimerWheel<T> {
410    const INITIAL_TICK_ALLOCATION: usize = 16;
411    const NULL_DEADLINE: u64 = u64::MAX;
412    const TICKS_PER_WHEEL: usize = 1024;
413
414    /// Create new hierarchical timer wheel
415    ///
416    /// # Arguments
417    /// * `tick_resolution` - L0 tick duration (must be power of 2 nanoseconds, typically 1.05ms)
418    /// * `ticks_per_wheel` - Number of spokes per wheel (must be power of 2, typically 1024)
419    /// * `worker_id` - Worker ID that owns this timer wheel
420    ///
421    /// # Timer Deadlines
422    /// All timer deadlines are specified as nanoseconds elapsed since wheel creation.
423    pub fn new(tick_resolution: Duration, ticks_per_wheel: usize, worker_id: u32) -> Self {
424        Self::with_allocation(
425            tick_resolution,
426            ticks_per_wheel,
427            Self::INITIAL_TICK_ALLOCATION,
428            worker_id,
429        )
430    }
431
432    /// Create hierarchical timer wheel with custom initial allocation per tick
433    /// 
434    /// # Arguments
435    /// * `tick_resolution` - L0 tick duration (must be power of 2 nanoseconds)
436    /// * `ticks_per_wheel` - Number of spokes per wheel (must be power of 2)
437    /// * `initial_tick_allocation` - Initial slots per tick (must be power of 2)
438    /// * `worker_id` - Worker ID that owns this timer wheel
439    /// 
440    /// # Design Notes
441    /// The L1 tick resolution equals one full L0 rotation, creating a natural
442    /// cascading relationship: when an L1 tick expires, all its timers are
443    /// within L0's coverage range and can be cascaded down.
444    pub fn with_allocation(
445        tick_resolution: Duration,
446        ticks_per_wheel: usize,
447        initial_tick_allocation: usize,
448        worker_id: u32,
449    ) -> Self {
450        let l0_tick_ns = tick_resolution.as_nanos() as u64;
451        // L0 coverage = one full rotation of L0 wheel
452        let l0_coverage_ns = l0_tick_ns * ticks_per_wheel as u64;
453        // L1 tick equals one full L0 rotation - this enables efficient cascading
454        let l1_tick_ns = l0_coverage_ns;
455
456        let l0 = SingleWheel::new(l0_tick_ns, ticks_per_wheel, initial_tick_allocation);
457        let l1 = SingleWheel::new(l1_tick_ns, ticks_per_wheel, initial_tick_allocation);
458
459        Self {
460            l0,
461            l1,
462            null_deadline: Self::NULL_DEADLINE,
463            start_time_ns: 0,
464            now_ns: AtomicU64::new(0),
465            worker_id,
466            l0_coverage_ns,
467        }
468    }
469
470    /// Schedule a timer for an absolute deadline
471    /// 
472    /// The deadline is specified as nanoseconds since wheel creation. The timer
473    /// is automatically placed in the appropriate wheel (L0 for near-term, L1 for
474    /// long-term) based on the deadline value.
475    /// 
476    /// # Arguments
477    /// * `deadline_ns` - Absolute deadline in nanoseconds (must be > 0 and in the future)
478    /// * `data` - User data to associate with this timer
479    /// 
480    /// # Returns
481    /// Returns a timer ID that can be used to cancel the timer, or an error if
482    /// the deadline is invalid or capacity is exceeded.
483    pub fn schedule_timer(&mut self, deadline_ns: u64, data: T) -> Result<u64, TimerWheelError> {
484        // Validate deadline: must be > 0, >= start_time_ns, and >= now_ns
485        // This prevents scheduling timers in the past
486        let now_ns = self.now_ns.load(Ordering::Relaxed);
487        if deadline_ns == 0 || deadline_ns < self.start_time_ns || deadline_ns < now_ns {
488            return Err(TimerWheelError::InvalidDeadline);
489        }
490
491        // Route to appropriate wheel based on deadline distance
492        if deadline_ns < self.l0_coverage_ns {
493            // Near-term timer: schedule in L0 (fast wheel)
494            let (spoke, slot) = self
495                .l0
496                .schedule_internal(deadline_ns, self.start_time_ns, data, 0)?;
497            Ok(Self::encode_timer_id(0, spoke, slot)?)
498        } else {
499            // Long-term timer: schedule in L1 (medium wheel)
500            // Will cascade down to L0 as deadline approaches
501            let (spoke, slot) = self
502                .l1
503                .schedule_internal(deadline_ns, self.start_time_ns, data, 1)?;
504            Ok(Self::encode_timer_id(1, spoke, slot)?)
505        }
506    }
507
508    /// Cancel a previously scheduled timer
509    /// 
510    /// O(1) operation - directly indexes into the wheel using the timer ID.
511    /// 
512    /// # Arguments
513    /// * `timer_id` - Timer ID returned from `schedule_timer()`
514    /// 
515    /// # Returns
516    /// Returns the timer data if found and cancelled, or an error if the timer
517    /// doesn't exist or the timer ID is invalid.
518    pub fn cancel_timer(&mut self, timer_id: u64) -> Result<Option<T>, TimerWheelError> {
519        // Decode timer ID to extract wheel level, spoke, and slot indices
520        let (level, spoke, slot) = Self::decode_timer_id(timer_id)?;
521        match level {
522            0 => self.l0.cancel_internal(spoke, slot),
523            1 => self.l1.cancel_internal(spoke, slot),
524            _ => Err(TimerWheelError::InvalidTimerId),
525        }
526    }
527
528    /// Poll for expired timers
529    /// 
530    /// Checks for timers that have expired by the given `now_ns` time. First cascades
531    /// L1 timers that are now within L0 coverage down to L0, then polls L0 for
532    /// expired timers.
533    /// 
534    /// # Arguments
535    /// * `now_ns` - Current time in nanoseconds
536    /// * `expiry_limit` - Maximum number of expired timers to return
537    /// * `output` - Buffer to write expired timer tuples: `(timer_id, deadline_ns, data)`
538    /// 
539    /// # Returns
540    /// Returns the number of expired timers found (may be less than `expiry_limit`)
541    pub fn poll(
542        &mut self,
543        now_ns: u64,
544        expiry_limit: usize,
545        output: &mut Vec<(u64, u64, T)>,
546    ) -> usize {
547        // Update internal clock
548        self.now_ns.store(now_ns, Ordering::Release);
549        output.clear();
550
551        // Step 1: Cascade L1 -> L0 for timers now within L0 coverage
552        // This moves long-term timers to the fast wheel as their deadline approaches
553        self.cascade_l1_to_l0(now_ns);
554
555        // Step 2: Poll L0 for expired timers
556        // All expired timers should now be in L0 (either originally or cascaded)
557        self.poll_l0(now_ns, expiry_limit, output)
558    }
559
    /// Cascade timers from L1 to L0
    ///
    /// Moves every L1 timer whose deadline now falls within one L0 rotation
    /// (deadline_ns < now_ns + l0_coverage_ns) down into L0 so the fast wheel
    /// can expire it precisely.
    ///
    /// Cost: O(l1.ticks_per_wheel × l1.tick_allocation) whenever L1 holds any
    /// timers, because every L1 slot is inspected.
    fn cascade_l1_to_l0(&mut self, now_ns: u64) {
        // Keep L1's tick position in sync with the clock
        let l1_target_tick =
            (now_ns.saturating_sub(self.start_time_ns)) >> self.l1.resolution_bits_to_shift;
        self.l1.current_tick = l1_target_tick;

        // Nothing to move if L1 is empty
        if self.l1.timer_count == 0 {
            return;
        }

        // Anything due within one L0 rotation from `now` belongs in L0
        let cascade_deadline_threshold = now_ns.saturating_add(self.l0_coverage_ns);

        // Full sweep over every L1 slot (see cost note above)
        for spoke_index in 0..self.l1.ticks_per_wheel {
            let tick_start = spoke_index << self.l1.allocation_bits_to_shift;

            for slot_idx in 0..self.l1.tick_allocation {
                let wheel_index = tick_start + slot_idx;

                // Cascade when this timer's deadline is inside L0 coverage
                let should_cascade = matches!(
                    self.l1.wheel[wheel_index].as_ref(),
                    Some(entry) if entry.deadline_ns < cascade_deadline_threshold
                );

                if should_cascade {
                    // Remove from L1 and reschedule in L0
                    if let Some(entry) = self.l1.wheel[wheel_index].take() {
                        self.l1.timer_count -= 1;
                        let TimerEntry {
                            deadline_ns, data, ..
                        } = entry;

                        // Invalidate L1's cache if this was its earliest timer
                        if deadline_ns == self.l1.cached_next_deadline {
                            self.l1.cached_next_deadline = SingleWheel::<T>::NULL_DEADLINE;
                        }

                        // Reschedule into L0; may land in an already-passed
                        // spoke if the timer is overdue (poll's fallback scan
                        // picks those up).
                        // NOTE(review): the result is discarded — if L0
                        // scheduling fails (e.g. CapacityExceeded) the timer
                        // is silently dropped here; confirm that loss is
                        // acceptable.
                        let _ = self
                            .l0
                            .schedule_internal(deadline_ns, self.start_time_ns, data, 0);
                    }
                }
            }

            // Reset this L1 spoke's insertion hint after draining from it
            self.l1.next_free_hint[spoke_index] = 0;
        }

        // Invalidate both deadline caches: cascaded timers may land in L0
        // ticks behind current_tick, so only a fresh scan can recompute the
        // true minimum.
        if self.l1.cached_next_deadline != SingleWheel::<T>::NULL_DEADLINE {
            self.l1.cached_next_deadline = SingleWheel::<T>::NULL_DEADLINE;
        }
        if self.l0.cached_next_deadline != SingleWheel::<T>::NULL_DEADLINE {
            self.l0.cached_next_deadline = SingleWheel::<T>::NULL_DEADLINE;
        }
    }
637
    /// Poll L0 wheel for expired timers
    ///
    /// Two phases:
    /// 1. Incremental scan of the spokes between `current_tick` and the tick
    ///    implied by `now_ns` (cheap when only a few ticks elapsed).
    /// 2. A fallback full-wheel scan catching timers cascaded into spokes
    ///    behind `current_tick`.
    ///
    /// Expiry is decided by comparing `now_ns` against each entry's exact
    /// deadline, never by tick position alone: a spoke can hold timers for
    /// future wheel rotations that must NOT fire yet.
    ///
    /// # Performance
    /// - Best case: O(expired_timers) when the limit is reached in phase 1
    /// - Worst case: O(ticks_per_wheel × tick_allocation)
    /// NOTE(review): phase 2 runs whenever any timer remains and the limit
    /// was not hit, so a poll with pending *future* timers still pays a full
    /// sweep — confirm this matches the intended complexity budget.
    fn poll_l0(
        &mut self,
        now_ns: u64,
        expiry_limit: usize,
        output: &mut Vec<(u64, u64, T)>,
    ) -> usize {
        // Tick the wheel should be at for the given time
        let target_tick =
            (now_ns.saturating_sub(self.start_time_ns)) >> self.l0.resolution_bits_to_shift;

        // Nothing scheduled: just advance the position
        if self.l0.timer_count == 0 {
            self.l0.current_tick = target_tick;
            return 0;
        }

        // Phase 1: incremental scan from current_tick forward
        let start_tick = self.l0.current_tick;
        let ticks_to_scan = if target_tick >= start_tick {
            // Normal case: scan forward, capped at one full rotation
            (target_tick - start_tick + 1).min(self.l0.ticks_per_wheel as u64) as usize
        } else {
            // current_tick is ahead of target (shouldn't happen; time went
            // backwards?) — scan the whole wheel to stay safe
            self.l0.ticks_per_wheel
        };

        for tick_offset in 0..ticks_to_scan {
            if output.len() >= expiry_limit {
                break;
            }

            // Spoke index with wrap-around via the power-of-2 mask
            let spoke = ((start_tick as usize + tick_offset) & self.l0.tick_mask) as usize;
            let tick_start = spoke << self.l0.allocation_bits_to_shift;

            // Check every slot in this spoke
            for slot_idx in 0..self.l0.tick_allocation {
                if output.len() >= expiry_limit {
                    break;
                }

                let wheel_index = tick_start + slot_idx;

                if let Some(entry) = self.l0.wheel[wheel_index].as_ref() {
                    // Deadline-based check: entries in this spoke may belong
                    // to a later rotation and must be left alone
                    if now_ns >= entry.deadline_ns {
                        // Remove and emit the expired timer
                        let entry = self.l0.wheel[wheel_index].take().unwrap();
                        let TimerEntry {
                            deadline_ns, data, ..
                        } = entry;
                        self.l0.timer_count -= 1;

                        // NOTE(review): unwrap_or(0) silently maps an encode
                        // failure to timer ID 0 — confirm in-range
                        // spoke/slot values are always encodable
                        let timer_id = Self::encode_timer_id(0, spoke, slot_idx).unwrap_or(0);
                        output.push((timer_id, deadline_ns, data));

                        // Expired the earliest timer: invalidate the cache
                        if deadline_ns == self.l0.cached_next_deadline {
                            self.l0.cached_next_deadline = self.null_deadline;
                        }
                    }
                }
            }
        }

        // Phase 2: full-wheel fallback. Cascaded timers may sit in spokes
        // before current_tick (e.g. current_tick advanced while L0 was
        // empty), which phase 1 never visits.
        if output.len() < expiry_limit && self.l0.timer_count > 0 {
            for spoke in 0..self.l0.ticks_per_wheel {
                if output.len() >= expiry_limit {
                    break;
                }

                let tick_start = spoke << self.l0.allocation_bits_to_shift;

                for slot_idx in 0..self.l0.tick_allocation {
                    if output.len() >= expiry_limit {
                        break;
                    }

                    let wheel_index = tick_start + slot_idx;

                    if let Some(entry) = self.l0.wheel[wheel_index].as_ref() {
                        if now_ns >= entry.deadline_ns {
                            let entry = self.l0.wheel[wheel_index].take().unwrap();
                            let TimerEntry {
                                deadline_ns, data, ..
                            } = entry;
                            self.l0.timer_count -= 1;

                            let timer_id = Self::encode_timer_id(0, spoke, slot_idx).unwrap_or(0);
                            output.push((timer_id, deadline_ns, data));

                            if deadline_ns == self.l0.cached_next_deadline {
                                self.l0.cached_next_deadline = self.null_deadline;
                            }
                        }
                    }
                }
            }
        }

        // All expired work up to `now` is done; advance the wheel position
        self.l0.current_tick = target_tick;
        self.l0.poll_index = 0;

        output.len()
    }
764
765    /// Advance the wheel's current tick position without polling
766    /// 
767    /// Useful for synchronizing the wheel's position when time advances
768    /// without immediately processing expired timers.
769    pub fn advance_to(&mut self, now_ns: u64) {
770        let new_tick =
771            (now_ns.saturating_sub(self.start_time_ns)) >> self.l0.resolution_bits_to_shift;
772        // Only advance, never go backwards
773        self.l0.current_tick = self.l0.current_tick.max(new_tick);
774    }
775
776    /// Get the time corresponding to the next tick
777    /// 
778    /// Returns the absolute time (in nanoseconds) when the next tick will occur.
779    pub fn current_tick_time_ns(&self) -> u64 {
780        ((self.l0.current_tick + 1) << self.l0.resolution_bits_to_shift) + self.start_time_ns
781    }
782
783    /// Get the total number of active timers across both wheels
784    pub fn timer_count(&self) -> u64 {
785        self.l0.timer_count + self.l1.timer_count
786    }
787
788    /// Get the earliest deadline among all active timers
789    /// 
790    /// Uses cached values when available for O(1) performance. Falls back to
791    /// full wheel scans when cache is invalidated (O(ticks_per_wheel × tick_allocation)).
792    /// 
793    /// # Returns
794    /// Returns the earliest deadline in nanoseconds, or `None` if no timers are scheduled.
795    pub fn next_deadline(&mut self) -> Option<u64> {
796        let total_count = self.l0.timer_count + self.l1.timer_count;
797        if total_count == 0 {
798            return None;
799        }
800
801        // Get earliest deadline from L0 (with lazy cache recomputation)
802        let l0_next = if self.l0.timer_count > 0 {
803            if self.l0.cached_next_deadline == SingleWheel::<T>::NULL_DEADLINE {
804                self.recompute_l0_deadline();
805            }
806            if self.l0.cached_next_deadline != SingleWheel::<T>::NULL_DEADLINE {
807                Some(self.l0.cached_next_deadline)
808            } else {
809                None
810            }
811        } else {
812            None
813        };
814
815        // Get earliest deadline from L1 (with lazy cache recomputation)
816        let l1_next = if self.l1.timer_count > 0 {
817            if self.l1.cached_next_deadline == SingleWheel::<T>::NULL_DEADLINE {
818                self.recompute_l1_deadline();
819            }
820            if self.l1.cached_next_deadline != SingleWheel::<T>::NULL_DEADLINE {
821                Some(self.l1.cached_next_deadline)
822            } else {
823                None
824            }
825        } else {
826            None
827        };
828
829        // Return the minimum of L0 and L1 earliest deadlines
830        match (l0_next, l1_next) {
831            (Some(d0), Some(d1)) => Some(d0.min(d1)),
832            (Some(d0), None) => Some(d0),
833            (None, Some(d1)) => Some(d1),
834            (None, None) => None,
835        }
836    }
837
838    /// Recompute the cached earliest deadline for L0
839    /// 
840    /// Scans the entire L0 wheel to find the earliest deadline. This is expensive
841    /// (O(ticks_per_wheel × tick_allocation)) but only runs when the cache is invalidated.
842    /// 
843    /// Must scan the entire wheel because cascaded timers may exist in any tick,
844    /// not just those after current_tick.
845    fn recompute_l0_deadline(&mut self) {
846        if self.l0.timer_count == 0 {
847            self.l0.cached_next_deadline = SingleWheel::<T>::NULL_DEADLINE;
848            return;
849        }
850
851        let mut earliest = SingleWheel::<T>::NULL_DEADLINE;
852
853        // Scan entire wheel to find earliest deadline
854        // Must scan all ticks because cascaded timers can be placed anywhere
855        for spoke in 0..self.l0.ticks_per_wheel {
856            for slot_idx in 0..self.l0.tick_allocation {
857                let wheel_index = (spoke << self.l0.allocation_bits_to_shift) + slot_idx;
858                if let Some(entry) = self.l0.wheel[wheel_index].as_ref() {
859                    if entry.deadline_ns < earliest {
860                        earliest = entry.deadline_ns;
861                    }
862                }
863            }
864        }
865
866        self.l0.cached_next_deadline = earliest;
867    }
868
869    /// Recompute the cached earliest deadline for L1
870    /// 
871    /// Scans the entire L1 wheel to find the earliest deadline. This is expensive
872    /// (O(ticks_per_wheel × tick_allocation)) but only runs when the cache is invalidated.
873    fn recompute_l1_deadline(&mut self) {
874        if self.l1.timer_count == 0 {
875            self.l1.cached_next_deadline = SingleWheel::<T>::NULL_DEADLINE;
876            return;
877        }
878
879        let mut earliest = SingleWheel::<T>::NULL_DEADLINE;
880
881        // Scan entire wheel to find earliest deadline
882        for spoke in 0..self.l1.ticks_per_wheel {
883            for slot_idx in 0..self.l1.tick_allocation {
884                let wheel_index = (spoke << self.l1.allocation_bits_to_shift) + slot_idx;
885                if let Some(entry) = self.l1.wheel[wheel_index].as_ref() {
886                    if entry.deadline_ns < earliest {
887                        earliest = entry.deadline_ns;
888                    }
889                }
890            }
891        }
892
893        self.l1.cached_next_deadline = earliest;
894    }
895
    /// Returns the wheel's current time in nanoseconds, as last stored via
    /// `set_now_ns`. Plain Relaxed load — no cross-memory ordering implied.
    #[inline]
    pub fn now_ns(&self) -> u64 {
        self.now_ns.load(Ordering::Relaxed)
    }
900
    /// Publishes the current time (in nanoseconds) into the wheel's atomic clock.
    ///
    /// NOTE(review): this uses a Release store while `now_ns()` reads with
    /// Relaxed, so the release ordering only matters if some other reader
    /// performs an Acquire load — confirm whether one exists.
    #[inline]
    pub fn set_now_ns(&mut self, now_ns: u64) {
        self.now_ns.store(now_ns, Ordering::Release);
    }
905
    /// Returns the identifier of the worker associated with this wheel.
    #[inline]
    pub fn worker_id(&self) -> u32 {
        self.worker_id
    }
910
911    pub fn clear(&mut self) {
912        for slot in &mut self.l0.wheel {
913            *slot = None;
914        }
915        for slot in &mut self.l1.wheel {
916            *slot = None;
917        }
918        
919        self.l0.timer_count = 0;
920        self.l1.timer_count = 0;
921        self.l0.cached_next_deadline = SingleWheel::<T>::NULL_DEADLINE;
922        self.l1.cached_next_deadline = SingleWheel::<T>::NULL_DEADLINE;
923    }
924
925    /// Encode wheel level, spoke, and slot into a compact timer ID
926    /// 
927    /// Timer ID format (64 bits):
928    /// - Bits 62-63: Level (0=L0, 1=L1)
929    /// - Bits 32-61: Spoke index (30 bits, max 1<<30 spokes)
930    /// - Bits 0-31:  Slot index (32 bits, max 1<<32 slots)
931    /// 
932    /// This encoding allows O(1) timer cancellation by directly decoding
933    /// the wheel location from the timer ID.
934    #[inline]
935    fn encode_timer_id(level: u8, spoke: usize, slot: usize) -> Result<u64, TimerWheelError> {
936        // Validate bounds to prevent truncation during encoding
937        if spoke >= (1 << 30) {
938            return Err(TimerWheelError::InvalidTimerId);
939        }
940        if slot >= (1 << 32) {
941            return Err(TimerWheelError::InvalidTimerId);
942        }
943        if level > 1 {
944            return Err(TimerWheelError::InvalidTimerId);
945        }
946
947        // Pack into 64-bit integer: level (2 bits) | spoke (30 bits) | slot (32 bits)
948        Ok(((level as u64) << 62) | ((spoke as u64) << 32) | (slot as u64))
949    }
950
951    /// Decode a timer ID into wheel level, spoke, and slot indices
952    /// 
953    /// Reverse of `encode_timer_id()`. Extracts the three components
954    /// needed to locate a timer in the wheel structure.
955    #[inline]
956    fn decode_timer_id(timer_id: u64) -> Result<(u8, usize, usize), TimerWheelError> {
957        // Extract level from top 2 bits
958        let level = ((timer_id >> 62) & 0x3) as u8;
959        // Extract spoke from middle 30 bits
960        let spoke = ((timer_id >> 32) & 0x3FFFFFFF) as usize;
961        // Extract slot from bottom 32 bits
962        let slot = (timer_id & 0xFFFFFFFF) as usize;
963
964        // Validate decoded level
965        if level > 1 {
966            return Err(TimerWheelError::InvalidTimerId);
967        }
968
969        Ok((level, spoke, slot))
970    }
971}
972
#[cfg(test)]
mod tests {
    use super::*;

    /// Scheduling then cancelling returns the stored data and empties the wheel.
    #[test]
    fn test_basic_scheduling() {
        // ~1.05ms tick resolution, 1024 ticks per wheel, worker id 0.
        let mut wheel = TimerWheel::<u32>::new(Duration::from_nanos(1024 * 1024), 1024, 0);

        let timer_id = wheel.schedule_timer(100_000_000, 42).unwrap(); // 100ms
        assert_eq!(wheel.timer_count(), 1);

        let data = wheel.cancel_timer(timer_id).unwrap().unwrap();
        assert_eq!(data, 42);
        assert_eq!(wheel.timer_count(), 0);
    }

    /// Deadlines within L0's ~1.07s coverage land in the fast wheel.
    #[test]
    fn test_l0_timer() {
        let mut wheel = TimerWheel::<&str>::new(Duration::from_nanos(1024 * 1024), 1024, 0);

        // 500ms timer should go to L0
        wheel.schedule_timer(500_000_000, "fast").unwrap();
        assert_eq!(wheel.l0.timer_count, 1);
        assert_eq!(wheel.l1.timer_count, 0);
    }

    /// Deadlines beyond L0's coverage land in the medium (L1) wheel.
    #[test]
    fn test_l1_timer() {
        let mut wheel = TimerWheel::<&str>::new(Duration::from_nanos(1024 * 1024), 1024, 0);

        // 5s timer should go to L1
        wheel.schedule_timer(5_000_000_000, "slow").unwrap();
        assert_eq!(wheel.l0.timer_count, 0);
        assert_eq!(wheel.l1.timer_count, 1);
    }

    /// A timer parked in L1 cascades into L0 and fires once its deadline passes.
    #[test]
    fn test_cascade() {
        let mut wheel = TimerWheel::<&str>::new(Duration::from_nanos(1024 * 1024), 1024, 0);

        // Schedule timer at 2 seconds (in L1 tick 1, which covers 1.07s-2.15s)
        wheel.schedule_timer(2_000_000_000, "cascaded").unwrap();
        assert_eq!(wheel.l1.timer_count, 1);
        assert_eq!(wheel.l0.timer_count, 0);

        // Poll at 2.2 seconds - should cascade tick 1 (we're now at tick 2)
        // L1 tick 2 starts at 2.147s, so polling at 2.2s triggers cascade of tick 1
        let mut output = Vec::new();
        wheel.poll(2_200_000_000, 100, &mut output);
        // The cascaded timer's deadline (2.0s) is already past at 2.2s, so it
        // fires in this same poll rather than waiting for a later one.
        assert_eq!(output.len(), 1);
        assert_eq!(output[0].2, "cascaded");
    }

    /// Invalid inputs surface as the documented error variants.
    #[test]
    fn test_error_handling() {
        let mut wheel = TimerWheel::<u32>::new(Duration::from_nanos(1024 * 1024), 1024, 0);

        // Test invalid deadline (zero)
        assert!(matches!(
            wheel.schedule_timer(0, 42),
            Err(TimerWheelError::InvalidDeadline)
        ));

        // Test invalid timer ID (top level bits = 3, which no wheel level uses)
        assert!(matches!(
            wheel.cancel_timer(0xFFFFFFFFFFFFFFFF),
            Err(TimerWheelError::InvalidTimerId)
        ));
    }

    /// Cancelling the same timer ID twice reports TimerNotFound.
    #[test]
    fn test_timer_validation() {
        let mut wheel = TimerWheel::<u32>::new(Duration::from_nanos(1024 * 1024), 1024, 0);

        let timer_id = wheel.schedule_timer(100_000_000, 42).unwrap();

        // Cancel the timer
        let data = wheel.cancel_timer(timer_id).unwrap().unwrap();
        assert_eq!(data, 42);

        // Try to cancel again - should fail
        assert!(matches!(
            wheel.cancel_timer(timer_id),
            Err(TimerWheelError::TimerNotFound)
        ));
    }
}