// maniac_runtime/runtime/timer_wheel.rs
//! High-performance hierarchical hashed timing wheel for efficient timer scheduling.
//! Based on "Hashed and Hierarchical Timing Wheels" by Varghese and Lauck.
//!
//! Two-level hierarchy:
//! - L0 (Fast): 1.05ms/tick × 1024 ticks ≈ 1.07s coverage
//! - L1 (Medium): 1.07s/tick × 1024 ticks ≈ 18.33min coverage
//!
//! Complexity:
//! - Schedule: O(1) amortized, O(tick_allocation) worst-case when a tick is full
//! - Cancel: O(1)
//! - Poll: O(expired_timers + ticks_per_wheel × tick_allocation) — scans from current_tick forward
//! - Next deadline: O(ticks_per_wheel × tick_allocation) when the cache is invalidated
//!
//! Timers in the same tick are not ordered relative to each other.
//! NOT thread-safe — the caller must synchronize access.
16use std::{
17 sync::atomic::{AtomicU64, Ordering},
18 time::Duration,
19};
20
/// Error types for timer wheel operations.
///
/// Implements `Display` and `std::error::Error` so it composes with the
/// standard error-handling ecosystem (`Box<dyn Error>`, `?`, etc.).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimerWheelError {
    /// Timer ID exceeds maximum supported value
    InvalidTimerId,
    /// Timer deadline is invalid (negative, overflow, etc.)
    InvalidDeadline,
    /// Capacity limit exceeded
    CapacityExceeded,
    /// Integer overflow in calculations
    Overflow,
    /// Timer ID does not correspond to a valid timer
    TimerNotFound,
}

impl std::fmt::Display for TimerWheelError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let msg = match self {
            Self::InvalidTimerId => "invalid timer id",
            Self::InvalidDeadline => "invalid timer deadline",
            Self::CapacityExceeded => "timer wheel capacity exceeded",
            Self::Overflow => "integer overflow in timer wheel calculation",
            Self::TimerNotFound => "timer not found",
        };
        f.write_str(msg)
    }
}

impl std::error::Error for TimerWheelError {}
35
/// Timer entry stored in the wheel
///
/// Each entry contains the absolute deadline in nanoseconds, the computed deadline tick
/// (for efficient spoke calculation), and the associated data.
#[derive(Debug)]
struct TimerEntry<T> {
    /// Absolute deadline in nanoseconds since wheel start
    deadline_ns: u64,
    /// Precomputed deadline tick (deadline_ns / tick_resolution_ns)
    /// Used for efficient spoke index calculation.
    /// NOTE(review): written at schedule time but not read anywhere in this
    /// file — confirm it is consumed elsewhere before removing.
    deadline_tick: u64,
    /// User data associated with this timer
    data: T,
}
50
/// Internal single-level timing wheel
///
/// A hashed timing wheel uses a circular array where each slot (spoke) represents a time interval.
/// Timers are hashed into spokes based on their deadline, allowing O(1) insertion and cancellation.
/// Multiple timers can hash to the same spoke, so each spoke has multiple slots.
///
/// The wheel uses power-of-2 sizes for efficient bitwise operations:
/// - Spoke index = (deadline_tick & tick_mask) - uses bitwise AND instead of modulo
/// - Slot index within spoke uses bit shifts for fast indexing
///
/// Used as a building block for the hierarchical TimerWheel
struct SingleWheel<T> {
    /// Time resolution per tick in nanoseconds (must be power of 2)
    /// Determines the granularity of timer scheduling.
    /// NOTE(review): after construction only `resolution_bits_to_shift` is
    /// consulted; this field appears unread in this file — confirm.
    tick_resolution_ns: u64,

    /// Current tick position - tracks how far the wheel has advanced
    /// Used to optimize polling by only scanning expired ticks
    current_tick: u64,

    /// Number of active timers in this wheel
    /// Used for quick checks to avoid unnecessary work
    timer_count: u64,

    /// Cached earliest deadline for efficient querying
    /// Invalidated (set to NULL_DEADLINE) when timers are cancelled or cascaded
    cached_next_deadline: u64,

    /// Number of ticks/spokes per wheel (must be power of 2)
    /// Each spoke represents one tick interval
    ticks_per_wheel: usize,

    /// Mask for tick calculation (ticks_per_wheel - 1)
    /// Used for fast modulo: tick & tick_mask instead of tick % ticks_per_wheel
    tick_mask: usize,

    /// Bit shift for resolution calculations
    /// Precomputed: log2(tick_resolution_ns) for fast division: deadline_ns >> resolution_bits_to_shift
    resolution_bits_to_shift: u32,

    /// Current allocation size per tick (must be power of 2)
    /// Each spoke can hold this many timers before needing to expand
    tick_allocation: usize,

    /// Bit shift for allocation calculations
    /// Precomputed: log2(tick_allocation) for fast indexing: spoke << allocation_bits_to_shift
    allocation_bits_to_shift: u32,

    /// Current index within a tick during polling
    /// NOTE(review): only ever reset to 0 in the visible code, never read —
    /// incremental polling is not fully implemented.
    poll_index: usize,

    /// Flat array storage: [tick0_slot0, tick0_slot1, ..., tick1_slot0, ...]
    /// Index calculation: (spoke_index << allocation_bits_to_shift) + slot_index
    /// None indicates an empty slot
    wheel: Vec<Option<TimerEntry<T>>>,

    /// Per-tick next available slot hint for O(1) scheduling
    /// Tracks the last used slot index per spoke to enable fast insertion
    /// When scheduling, we first check this hint before doing a linear search
    next_free_hint: Vec<usize>,
}
113
impl<T> SingleWheel<T> {
    /// Default slots per spoke.
    /// NOTE(review): unused inside `SingleWheel` — `TimerWheel` defines its
    /// own copy and always passes the allocation explicitly to `new`.
    const INITIAL_TICK_ALLOCATION: usize = 16;
    /// Sentinel (u64::MAX) meaning "no cached deadline".
    const NULL_DEADLINE: u64 = u64::MAX;

    /// Create a new single-level timing wheel
    ///
    /// # Arguments
    /// * `tick_resolution_ns` - Duration of one tick in nanoseconds (must be power of 2)
    /// * `ticks_per_wheel` - Number of spokes/ticks in the wheel (must be power of 2)
    /// * `initial_tick_allocation` - Initial number of slots per tick (must be power of 2)
    ///
    /// # Panics
    /// Panics if any parameter is not a power of 2, as this breaks bitwise optimizations
    fn new(
        tick_resolution_ns: u64,
        ticks_per_wheel: usize,
        initial_tick_allocation: usize,
    ) -> Self {
        // Validate power-of-2 constraints for efficient bitwise operations
        assert!(
            ticks_per_wheel.is_power_of_two(),
            "ticks_per_wheel must be power of 2"
        );
        assert!(
            initial_tick_allocation.is_power_of_two(),
            "tick_allocation must be power of 2"
        );
        assert!(
            tick_resolution_ns.is_power_of_two(),
            "tick_resolution must be power of 2 ns"
        );

        // Precompute masks and bit shifts for fast operations
        let tick_mask = ticks_per_wheel - 1; // For fast modulo: tick & tick_mask
        let resolution_bits_to_shift = tick_resolution_ns.trailing_zeros(); // log2(tick_resolution_ns)
        let allocation_bits_to_shift = initial_tick_allocation.trailing_zeros(); // log2(tick_allocation)

        // Allocate flat array: total capacity = ticks_per_wheel * slots_per_tick
        let capacity = ticks_per_wheel * initial_tick_allocation;
        let mut wheel = Vec::with_capacity(capacity);
        wheel.resize_with(capacity, || None);

        // Initialize per-spoke hints for O(1) scheduling
        let mut next_free_hint = Vec::with_capacity(ticks_per_wheel);
        next_free_hint.resize(ticks_per_wheel, 0);

        Self {
            tick_resolution_ns,
            current_tick: 0,
            timer_count: 0,
            cached_next_deadline: Self::NULL_DEADLINE,
            ticks_per_wheel,
            tick_mask,
            resolution_bits_to_shift,
            tick_allocation: initial_tick_allocation,
            allocation_bits_to_shift,
            poll_index: 0,
            wheel,
            next_free_hint,
        }
    }

    /// Schedule a timer into this wheel
    ///
    /// Uses hashing to determine which spoke (tick) the timer belongs to, then finds
    /// an available slot within that spoke. Employs a hint-based optimization for
    /// O(1) average-case insertion.
    ///
    /// NOTE(review): the `level` parameter is accepted but never used in the
    /// visible code — presumably reserved for diagnostics; TODO confirm.
    ///
    /// # Returns
    /// Returns `(spoke_index, slot_index)` tuple that can be encoded into a timer ID
    fn schedule_internal(
        &mut self,
        deadline_ns: u64,
        start_time_ns: u64,
        data: T,
        level: u8,
    ) -> Result<(usize, usize), TimerWheelError> {
        // Validate deadline is not before the wheel's epoch
        if deadline_ns < start_time_ns {
            return Err(TimerWheelError::InvalidDeadline);
        }

        // Compute which tick this deadline falls into
        // deadline_tick = (deadline_ns - start_time_ns) / tick_resolution_ns
        // Using bit shift instead of division for performance
        let deadline_tick =
            (deadline_ns.saturating_sub(start_time_ns)) >> self.resolution_bits_to_shift;

        // Hash into spoke using bitwise AND (faster than modulo)
        // spoke_index = deadline_tick % ticks_per_wheel
        let spoke_index = (deadline_tick & self.tick_mask as u64) as usize;

        // Calculate start index of this spoke in the flat array
        // tick_start_index = spoke_index * tick_allocation
        let mut tick_start_index = spoke_index << self.allocation_bits_to_shift;

        // Fast path: try hint first for O(1) insertion when spoke has free slots
        let hint = self.next_free_hint[spoke_index];
        let mut slot_idx = None;

        if hint < self.tick_allocation {
            let hint_index = tick_start_index + hint;
            if self.wheel[hint_index].is_none() {
                slot_idx = Some(hint);
            }
        }

        // Slow path: linear search if hint didn't work
        // Searches circularly starting from hint position
        if slot_idx.is_none() {
            for i in 0..self.tick_allocation {
                let candidate = (hint + i) % self.tick_allocation;
                let index = tick_start_index + candidate;

                if self.wheel[index].is_none() {
                    slot_idx = Some(candidate);
                    break;
                }
            }
        }

        // If spoke is full, expand capacity
        let slot_idx = match slot_idx {
            Some(idx) => idx,
            None => {
                // Double the allocation for this spoke (and all spokes)
                // NOTE(review): this map_err collapses a possible `Overflow`
                // error into `CapacityExceeded`, erasing the distinction the
                // error enum draws between them.
                let new_slot = self.increase_capacity(spoke_index)
                    .map_err(|_| TimerWheelError::CapacityExceeded)?;
                // Recalculate start index after capacity increase
                // (allocation_bits_to_shift changed, so the old index is stale)
                tick_start_index = spoke_index << self.allocation_bits_to_shift;
                new_slot
            }
        };

        // Store timer entry
        let wheel_index = tick_start_index + slot_idx;
        self.wheel[wheel_index] = Some(TimerEntry {
            deadline_ns,
            deadline_tick,
            data,
        });
        self.timer_count += 1;

        // Update cached earliest deadline if this timer is earlier
        // This enables O(1) next_deadline() queries when cache is valid
        if deadline_ns < self.cached_next_deadline {
            self.cached_next_deadline = deadline_ns;
        }

        // Update hint to next slot for future insertions
        // Wraps around to 0 if we've reached the end of allocation
        let next_hint = if slot_idx + 1 >= self.tick_allocation {
            0
        } else {
            slot_idx + 1
        };
        self.next_free_hint[spoke_index] = next_hint;

        Ok((spoke_index, slot_idx))
    }

    /// Cancel a timer by spoke and slot index
    ///
    /// O(1) operation - directly indexes into the wheel array using the provided indices.
    /// Invalidates the cached deadline if the cancelled timer was the earliest one.
    ///
    /// # Returns
    /// Returns `Ok(Some(data))` on success, or an error if the timer doesn't exist.
    /// NOTE(review): `Ok(None)` is never produced — the `Option` in the return
    /// type is redundant as written; kept for caller compatibility.
    fn cancel_internal(&mut self, spoke_index: usize, slot_index: usize) -> Result<Option<T>, TimerWheelError> {
        // Validate bounds BEFORE computing wheel_index to prevent out-of-range
        // indexing (stale IDs may carry slot indices beyond current allocation)
        if spoke_index >= self.ticks_per_wheel || slot_index >= self.tick_allocation {
            return Err(TimerWheelError::InvalidTimerId);
        }

        // Calculate flat array index: (spoke_index * tick_allocation) + slot_index
        let wheel_index = (spoke_index << self.allocation_bits_to_shift) + slot_index;

        // Remove timer entry if present
        if let Some(entry) = self.wheel[wheel_index].take() {
            self.timer_count -= 1;
            let TimerEntry {
                deadline_ns, data, ..
            } = entry;

            // If we cancelled the earliest timer, invalidate the cache
            // The cache will be recomputed lazily on the next next_deadline() call
            if deadline_ns == self.cached_next_deadline {
                self.cached_next_deadline = Self::NULL_DEADLINE;
            }

            return Ok(Some(data));
        }

        // Timer not found at this location
        Err(TimerWheelError::TimerNotFound)
    }

    /// Double the capacity of all spokes in the wheel
    ///
    /// When a spoke runs out of slots, we double the allocation for ALL spokes
    /// (not just the full one) to maintain uniform indexing. This is expensive
    /// but amortized over many insertions.
    ///
    /// NOTE(review): the `spoke_index` parameter is unused — growth always
    /// affects every spoke uniformly.
    ///
    /// # Returns
    /// Returns the old tick allocation size (which becomes the first new slot index)
    fn increase_capacity(&mut self, spoke_index: usize) -> Result<usize, TimerWheelError> {
        // Double the allocation (must remain power of 2)
        let new_tick_allocation = self.tick_allocation.checked_mul(2)
            .ok_or(TimerWheelError::Overflow)?;
        let new_allocation_bits = new_tick_allocation.trailing_zeros();

        // Check total capacity limit: 2^30 slots total (not bytes — the
        // actual memory footprint is 2^30 × size_of::<Option<TimerEntry<T>>>)
        let new_capacity = self.ticks_per_wheel * new_tick_allocation;
        if new_capacity > (1 << 30) {
            return Err(TimerWheelError::CapacityExceeded);
        }

        // Save old values for copying
        let old_tick_allocation = self.tick_allocation;
        let old_allocation_bits = self.allocation_bits_to_shift;

        // Allocate new larger array
        let mut new_wheel = Vec::with_capacity(new_capacity);
        new_wheel.resize_with(new_capacity, || None);

        // Copy all existing timers to new array
        // Each spoke's data is copied to the same relative position in the new spoke
        for j in 0..self.ticks_per_wheel {
            let old_start = j << old_allocation_bits; // Old spoke start
            let new_start = j << new_allocation_bits; // New spoke start
            // Copy all slots from old spoke to new spoke
            for k in 0..old_tick_allocation {
                new_wheel[new_start + k] = self.wheel[old_start + k].take();
            }
        }

        // Update wheel state
        self.tick_allocation = new_tick_allocation;
        self.allocation_bits_to_shift = new_allocation_bits;
        self.wheel = new_wheel;

        // Reset hints to point to the first new slot (old allocation size)
        // This ensures future insertions use the newly allocated space first
        for hint in &mut self.next_free_hint {
            *hint = old_tick_allocation;
        }

        // Return old allocation as the first available new slot
        Ok(old_tick_allocation)
    }
}
366
/// High-performance hierarchical timing wheel
///
/// Uses a two-level hierarchy to efficiently handle timers across a wide time range:
/// - **L0 (Fast wheel)**: Handles near-term timers (0 to ~1 second)
///   - Fine-grained resolution for accurate short-term scheduling
///   - Polled frequently for expired timers
/// - **L1 (Medium wheel)**: Handles long-term timers (~1 second to ~18 minutes)
///   - Coarser resolution to reduce memory overhead
///   - Timers cascade down to L0 as their deadline approaches
///
/// The hierarchy allows O(1) scheduling and cancellation while supporting
/// a wide range of timer durations without excessive memory usage.
pub struct TimerWheel<T> {
    /// L0 (Fast) wheel: handles timers in the near future
    /// Typical: 1.05ms ticks × 1024 ticks = 1.07s coverage
    l0: SingleWheel<T>,

    /// L1 (Medium) wheel: handles timers further in the future
    /// Typical: 1.07s ticks × 1024 ticks = 18.33min coverage
    l1: SingleWheel<T>,

    /// Null deadline sentinel value (u64::MAX)
    /// NOTE(review): duplicates the associated const `Self::NULL_DEADLINE`;
    /// both are used interchangeably in this file.
    null_deadline: u64,

    /// Start time in nanoseconds - when the wheel was created
    /// All deadlines are relative to this time (always 0 in the visible
    /// constructors).
    start_time_ns: u64,

    /// Current time in nanoseconds (atomic for cross-thread reads)
    /// Written by `poll()`/`set_now_ns()`, read by `now_ns()`/`schedule_timer()`
    now_ns: AtomicU64,

    /// Worker ID that owns this timer wheel
    /// Used for cross-worker timer operations
    worker_id: u32,

    /// L0 coverage in nanoseconds (precalculated for efficiency)
    /// Threshold for deciding whether to schedule in L0 or L1
    l0_coverage_ns: u64,
}
408
impl<T> TimerWheel<T> {
    /// Default number of slots allocated per spoke (see `new`).
    const INITIAL_TICK_ALLOCATION: usize = 16;
    /// Sentinel deadline meaning "no timer" (u64::MAX).
    const NULL_DEADLINE: u64 = u64::MAX;
    /// Suggested spoke count per wheel.
    /// NOTE(review): not referenced in the visible code — callers pass
    /// `ticks_per_wheel` explicitly; confirm intended use.
    const TICKS_PER_WHEEL: usize = 1024;
413
414 /// Create new hierarchical timer wheel
415 ///
416 /// # Arguments
417 /// * `tick_resolution` - L0 tick duration (must be power of 2 nanoseconds, typically 1.05ms)
418 /// * `ticks_per_wheel` - Number of spokes per wheel (must be power of 2, typically 1024)
419 /// * `worker_id` - Worker ID that owns this timer wheel
420 ///
421 /// # Timer Deadlines
422 /// All timer deadlines are specified as nanoseconds elapsed since wheel creation.
423 pub fn new(tick_resolution: Duration, ticks_per_wheel: usize, worker_id: u32) -> Self {
424 Self::with_allocation(
425 tick_resolution,
426 ticks_per_wheel,
427 Self::INITIAL_TICK_ALLOCATION,
428 worker_id,
429 )
430 }
431
432 /// Create hierarchical timer wheel with custom initial allocation per tick
433 ///
434 /// # Arguments
435 /// * `tick_resolution` - L0 tick duration (must be power of 2 nanoseconds)
436 /// * `ticks_per_wheel` - Number of spokes per wheel (must be power of 2)
437 /// * `initial_tick_allocation` - Initial slots per tick (must be power of 2)
438 /// * `worker_id` - Worker ID that owns this timer wheel
439 ///
440 /// # Design Notes
441 /// The L1 tick resolution equals one full L0 rotation, creating a natural
442 /// cascading relationship: when an L1 tick expires, all its timers are
443 /// within L0's coverage range and can be cascaded down.
444 pub fn with_allocation(
445 tick_resolution: Duration,
446 ticks_per_wheel: usize,
447 initial_tick_allocation: usize,
448 worker_id: u32,
449 ) -> Self {
450 let l0_tick_ns = tick_resolution.as_nanos() as u64;
451 // L0 coverage = one full rotation of L0 wheel
452 let l0_coverage_ns = l0_tick_ns * ticks_per_wheel as u64;
453 // L1 tick equals one full L0 rotation - this enables efficient cascading
454 let l1_tick_ns = l0_coverage_ns;
455
456 let l0 = SingleWheel::new(l0_tick_ns, ticks_per_wheel, initial_tick_allocation);
457 let l1 = SingleWheel::new(l1_tick_ns, ticks_per_wheel, initial_tick_allocation);
458
459 Self {
460 l0,
461 l1,
462 null_deadline: Self::NULL_DEADLINE,
463 start_time_ns: 0,
464 now_ns: AtomicU64::new(0),
465 worker_id,
466 l0_coverage_ns,
467 }
468 }
469
470 /// Schedule a timer for an absolute deadline
471 ///
472 /// The deadline is specified as nanoseconds since wheel creation. The timer
473 /// is automatically placed in the appropriate wheel (L0 for near-term, L1 for
474 /// long-term) based on the deadline value.
475 ///
476 /// # Arguments
477 /// * `deadline_ns` - Absolute deadline in nanoseconds (must be > 0 and in the future)
478 /// * `data` - User data to associate with this timer
479 ///
480 /// # Returns
481 /// Returns a timer ID that can be used to cancel the timer, or an error if
482 /// the deadline is invalid or capacity is exceeded.
483 pub fn schedule_timer(&mut self, deadline_ns: u64, data: T) -> Result<u64, TimerWheelError> {
484 // Validate deadline: must be > 0, >= start_time_ns, and >= now_ns
485 // This prevents scheduling timers in the past
486 let now_ns = self.now_ns.load(Ordering::Relaxed);
487 if deadline_ns == 0 || deadline_ns < self.start_time_ns || deadline_ns < now_ns {
488 return Err(TimerWheelError::InvalidDeadline);
489 }
490
491 // Route to appropriate wheel based on deadline distance
492 if deadline_ns < self.l0_coverage_ns {
493 // Near-term timer: schedule in L0 (fast wheel)
494 let (spoke, slot) = self
495 .l0
496 .schedule_internal(deadline_ns, self.start_time_ns, data, 0)?;
497 Ok(Self::encode_timer_id(0, spoke, slot)?)
498 } else {
499 // Long-term timer: schedule in L1 (medium wheel)
500 // Will cascade down to L0 as deadline approaches
501 let (spoke, slot) = self
502 .l1
503 .schedule_internal(deadline_ns, self.start_time_ns, data, 1)?;
504 Ok(Self::encode_timer_id(1, spoke, slot)?)
505 }
506 }
507
508 /// Cancel a previously scheduled timer
509 ///
510 /// O(1) operation - directly indexes into the wheel using the timer ID.
511 ///
512 /// # Arguments
513 /// * `timer_id` - Timer ID returned from `schedule_timer()`
514 ///
515 /// # Returns
516 /// Returns the timer data if found and cancelled, or an error if the timer
517 /// doesn't exist or the timer ID is invalid.
518 pub fn cancel_timer(&mut self, timer_id: u64) -> Result<Option<T>, TimerWheelError> {
519 // Decode timer ID to extract wheel level, spoke, and slot indices
520 let (level, spoke, slot) = Self::decode_timer_id(timer_id)?;
521 match level {
522 0 => self.l0.cancel_internal(spoke, slot),
523 1 => self.l1.cancel_internal(spoke, slot),
524 _ => Err(TimerWheelError::InvalidTimerId),
525 }
526 }
527
528 /// Poll for expired timers
529 ///
530 /// Checks for timers that have expired by the given `now_ns` time. First cascades
531 /// L1 timers that are now within L0 coverage down to L0, then polls L0 for
532 /// expired timers.
533 ///
534 /// # Arguments
535 /// * `now_ns` - Current time in nanoseconds
536 /// * `expiry_limit` - Maximum number of expired timers to return
537 /// * `output` - Buffer to write expired timer tuples: `(timer_id, deadline_ns, data)`
538 ///
539 /// # Returns
540 /// Returns the number of expired timers found (may be less than `expiry_limit`)
541 pub fn poll(
542 &mut self,
543 now_ns: u64,
544 expiry_limit: usize,
545 output: &mut Vec<(u64, u64, T)>,
546 ) -> usize {
547 // Update internal clock
548 self.now_ns.store(now_ns, Ordering::Release);
549 output.clear();
550
551 // Step 1: Cascade L1 -> L0 for timers now within L0 coverage
552 // This moves long-term timers to the fast wheel as their deadline approaches
553 self.cascade_l1_to_l0(now_ns);
554
555 // Step 2: Poll L0 for expired timers
556 // All expired timers should now be in L0 (either originally or cascaded)
557 self.poll_l0(now_ns, expiry_limit, output)
558 }
559
    /// Cascade timers from L1 to L0
    ///
    /// Moves L1 timers that are now within L0's coverage range down to L0.
    /// This ensures long-term timers are handled by the fast wheel as their
    /// deadline approaches, enabling accurate expiration.
    ///
    /// Cascading happens when: deadline_ns < now_ns + l0_coverage_ns
    /// This means the timer will expire within one L0 rotation, so it should
    /// be in L0 for precise timing.
    fn cascade_l1_to_l0(&mut self, now_ns: u64) {
        // Update L1's current tick position
        let l1_target_tick =
            (now_ns.saturating_sub(self.start_time_ns)) >> self.l1.resolution_bits_to_shift;
        self.l1.current_tick = l1_target_tick;

        // Early exit if no timers to cascade
        if self.l1.timer_count == 0 {
            return;
        }

        // Calculate threshold: cascade timers with deadline within L0 coverage
        // Threshold = now_ns + l0_coverage_ns
        // Any timer with deadline < threshold should be in L0
        let cascade_deadline_threshold = now_ns.saturating_add(self.l0_coverage_ns);

        // Scan all L1 spokes for timers to cascade
        // Note: This is O(ticks_per_wheel × tick_allocation) and runs on every
        // poll that finds l1.timer_count > 0 — every poll pays a full L1 scan
        // while any long-term timer exists.
        for spoke_index in 0..self.l1.ticks_per_wheel {
            let tick_start = spoke_index << self.l1.allocation_bits_to_shift;

            // Check each slot in this spoke
            for slot_idx in 0..self.l1.tick_allocation {
                let wheel_index = tick_start + slot_idx;

                // Check if this timer should be cascaded to L0
                // Criteria: deadline is within L0 coverage from now
                // (checked via matches! first so the immutable borrow ends
                // before we take() the entry below)
                let should_cascade = matches!(
                    self.l1.wheel[wheel_index].as_ref(),
                    Some(entry) if entry.deadline_ns < cascade_deadline_threshold
                );

                if should_cascade {
                    // Remove from L1 and reschedule in L0
                    if let Some(entry) = self.l1.wheel[wheel_index].take() {
                        self.l1.timer_count -= 1;
                        let TimerEntry {
                            deadline_ns, data, ..
                        } = entry;

                        // Invalidate L1 cache if this was the earliest timer
                        if deadline_ns == self.l1.cached_next_deadline {
                            self.l1.cached_next_deadline = SingleWheel::<T>::NULL_DEADLINE;
                        }

                        // Reschedule into L0 (may place in past tick if already expired)
                        // NOTE(review): the result is discarded — if L0 hits its
                        // capacity limit here, the timer is silently dropped.
                        let _ = self
                            .l0
                            .schedule_internal(deadline_ns, self.start_time_ns, data, 0);
                    }
                }
            }

            // Reset L1 spoke hint after scanning (timers may have been removed;
            // the hint is only an optimization, so 0 is always a safe value)
            self.l1.next_free_hint[spoke_index] = 0;
        }

        // Invalidate deadline caches after cascading
        // L0 cache must be invalidated because cascaded timers may have deadlines
        // in past ticks (already processed), requiring a full scan to recompute
        if self.l1.cached_next_deadline != SingleWheel::<T>::NULL_DEADLINE {
            self.l1.cached_next_deadline = SingleWheel::<T>::NULL_DEADLINE;
        }
        if self.l0.cached_next_deadline != SingleWheel::<T>::NULL_DEADLINE {
            self.l0.cached_next_deadline = SingleWheel::<T>::NULL_DEADLINE;
        }
    }
637
    /// Poll L0 wheel for expired timers
    ///
    /// Scans L0 for timers that have expired by `now_ns`. Uses an optimized
    /// incremental scan starting from `current_tick` to avoid scanning the entire
    /// wheel when only a few ticks have expired. Falls back to a full scan if
    /// cascaded timers may exist in earlier ticks.
    ///
    /// # Performance
    /// - Best case: O(expired_timers) - only scans expired ticks
    /// - Worst case: O(ticks_per_wheel × tick_allocation) - full wheel scan
    /// NOTE(review): the fallback below runs whenever `output` is below the
    /// limit AND any timer remains — i.e. on nearly every poll that has
    /// pending future timers — so in practice each poll costs a full scan.
    fn poll_l0(
        &mut self,
        now_ns: u64,
        expiry_limit: usize,
        output: &mut Vec<(u64, u64, T)>,
    ) -> usize {
        // Calculate which tick we should be at given current time
        let target_tick =
            (now_ns.saturating_sub(self.start_time_ns)) >> self.l0.resolution_bits_to_shift;

        // Early exit if no timers
        if self.l0.timer_count == 0 {
            self.l0.current_tick = target_tick;
            return 0;
        }

        // Optimized incremental scan: only scan ticks from current_tick to target_tick
        // This avoids scanning the entire wheel when only a few ticks have expired
        let start_tick = self.l0.current_tick;
        let ticks_to_scan = if target_tick >= start_tick {
            // Normal case: scan forward from current_tick to target_tick
            // Cap at ticks_per_wheel to handle wrap-around
            (target_tick - start_tick + 1).min(self.l0.ticks_per_wheel as u64) as usize
        } else {
            // Wrapped around: need to scan entire wheel
            // (This shouldn't happen in practice, but handle it safely)
            self.l0.ticks_per_wheel
        };

        // Scan ticks incrementally from current_tick forward
        for tick_offset in 0..ticks_to_scan {
            if output.len() >= expiry_limit {
                break;
            }

            // Calculate spoke index with wrap-around using bitwise AND
            let spoke = ((start_tick as usize + tick_offset) & self.l0.tick_mask) as usize;
            let tick_start = spoke << self.l0.allocation_bits_to_shift;

            // Check all slots in this spoke
            for slot_idx in 0..self.l0.tick_allocation {
                if output.len() >= expiry_limit {
                    break;
                }

                let wheel_index = tick_start + slot_idx;

                if let Some(entry) = self.l0.wheel[wheel_index].as_ref() {
                    // Use deadline-based check (not tick-based) for accuracy
                    // A timer may expire even if its tick hasn't been reached yet
                    if now_ns >= entry.deadline_ns {
                        // Remove expired timer (unwrap is safe: as_ref() above
                        // established the slot is Some, and nothing in between
                        // touched it)
                        let entry = self.l0.wheel[wheel_index].take().unwrap();
                        let TimerEntry {
                            deadline_ns, data, ..
                        } = entry;
                        self.l0.timer_count -= 1;

                        // Encode timer ID and add to output
                        // NOTE(review): encode failure is mapped to ID 0 —
                        // callers cannot cancel such a timer, but it is
                        // already expired at this point
                        let timer_id = Self::encode_timer_id(0, spoke, slot_idx).unwrap_or(0);
                        output.push((timer_id, deadline_ns, data));

                        // Invalidate cache if this was the earliest timer
                        if deadline_ns == self.l0.cached_next_deadline {
                            self.l0.cached_next_deadline = self.null_deadline;
                        }
                    }
                }
            }
        }

        // Fallback: full wheel scan if we didn't find enough expired timers
        // This handles the case where cascaded timers were placed in ticks
        // before current_tick (e.g., if current_tick advanced while L0 was empty)
        if output.len() < expiry_limit && self.l0.timer_count > 0 {
            // Full scan: check all ticks for expired timers
            for spoke in 0..self.l0.ticks_per_wheel {
                if output.len() >= expiry_limit {
                    break;
                }

                let tick_start = spoke << self.l0.allocation_bits_to_shift;

                for slot_idx in 0..self.l0.tick_allocation {
                    if output.len() >= expiry_limit {
                        break;
                    }

                    let wheel_index = tick_start + slot_idx;

                    if let Some(entry) = self.l0.wheel[wheel_index].as_ref() {
                        if now_ns >= entry.deadline_ns {
                            let entry = self.l0.wheel[wheel_index].take().unwrap();
                            let TimerEntry {
                                deadline_ns, data, ..
                            } = entry;
                            self.l0.timer_count -= 1;

                            let timer_id = Self::encode_timer_id(0, spoke, slot_idx).unwrap_or(0);
                            output.push((timer_id, deadline_ns, data));

                            if deadline_ns == self.l0.cached_next_deadline {
                                self.l0.cached_next_deadline = self.null_deadline;
                            }
                        }
                    }
                }
            }
        }

        // Advance current_tick to target_tick since we've processed all expired timers
        // (leftovers cut off by expiry_limit are caught by the fallback scan
        // on the next poll)
        self.l0.current_tick = target_tick;
        self.l0.poll_index = 0;

        output.len()
    }
764
765 /// Advance the wheel's current tick position without polling
766 ///
767 /// Useful for synchronizing the wheel's position when time advances
768 /// without immediately processing expired timers.
769 pub fn advance_to(&mut self, now_ns: u64) {
770 let new_tick =
771 (now_ns.saturating_sub(self.start_time_ns)) >> self.l0.resolution_bits_to_shift;
772 // Only advance, never go backwards
773 self.l0.current_tick = self.l0.current_tick.max(new_tick);
774 }
775
776 /// Get the time corresponding to the next tick
777 ///
778 /// Returns the absolute time (in nanoseconds) when the next tick will occur.
779 pub fn current_tick_time_ns(&self) -> u64 {
780 ((self.l0.current_tick + 1) << self.l0.resolution_bits_to_shift) + self.start_time_ns
781 }
782
783 /// Get the total number of active timers across both wheels
784 pub fn timer_count(&self) -> u64 {
785 self.l0.timer_count + self.l1.timer_count
786 }
787
788 /// Get the earliest deadline among all active timers
789 ///
790 /// Uses cached values when available for O(1) performance. Falls back to
791 /// full wheel scans when cache is invalidated (O(ticks_per_wheel × tick_allocation)).
792 ///
793 /// # Returns
794 /// Returns the earliest deadline in nanoseconds, or `None` if no timers are scheduled.
795 pub fn next_deadline(&mut self) -> Option<u64> {
796 let total_count = self.l0.timer_count + self.l1.timer_count;
797 if total_count == 0 {
798 return None;
799 }
800
801 // Get earliest deadline from L0 (with lazy cache recomputation)
802 let l0_next = if self.l0.timer_count > 0 {
803 if self.l0.cached_next_deadline == SingleWheel::<T>::NULL_DEADLINE {
804 self.recompute_l0_deadline();
805 }
806 if self.l0.cached_next_deadline != SingleWheel::<T>::NULL_DEADLINE {
807 Some(self.l0.cached_next_deadline)
808 } else {
809 None
810 }
811 } else {
812 None
813 };
814
815 // Get earliest deadline from L1 (with lazy cache recomputation)
816 let l1_next = if self.l1.timer_count > 0 {
817 if self.l1.cached_next_deadline == SingleWheel::<T>::NULL_DEADLINE {
818 self.recompute_l1_deadline();
819 }
820 if self.l1.cached_next_deadline != SingleWheel::<T>::NULL_DEADLINE {
821 Some(self.l1.cached_next_deadline)
822 } else {
823 None
824 }
825 } else {
826 None
827 };
828
829 // Return the minimum of L0 and L1 earliest deadlines
830 match (l0_next, l1_next) {
831 (Some(d0), Some(d1)) => Some(d0.min(d1)),
832 (Some(d0), None) => Some(d0),
833 (None, Some(d1)) => Some(d1),
834 (None, None) => None,
835 }
836 }
837
838 /// Recompute the cached earliest deadline for L0
839 ///
840 /// Scans the entire L0 wheel to find the earliest deadline. This is expensive
841 /// (O(ticks_per_wheel × tick_allocation)) but only runs when the cache is invalidated.
842 ///
843 /// Must scan the entire wheel because cascaded timers may exist in any tick,
844 /// not just those after current_tick.
845 fn recompute_l0_deadline(&mut self) {
846 if self.l0.timer_count == 0 {
847 self.l0.cached_next_deadline = SingleWheel::<T>::NULL_DEADLINE;
848 return;
849 }
850
851 let mut earliest = SingleWheel::<T>::NULL_DEADLINE;
852
853 // Scan entire wheel to find earliest deadline
854 // Must scan all ticks because cascaded timers can be placed anywhere
855 for spoke in 0..self.l0.ticks_per_wheel {
856 for slot_idx in 0..self.l0.tick_allocation {
857 let wheel_index = (spoke << self.l0.allocation_bits_to_shift) + slot_idx;
858 if let Some(entry) = self.l0.wheel[wheel_index].as_ref() {
859 if entry.deadline_ns < earliest {
860 earliest = entry.deadline_ns;
861 }
862 }
863 }
864 }
865
866 self.l0.cached_next_deadline = earliest;
867 }
868
869 /// Recompute the cached earliest deadline for L1
870 ///
871 /// Scans the entire L1 wheel to find the earliest deadline. This is expensive
872 /// (O(ticks_per_wheel × tick_allocation)) but only runs when the cache is invalidated.
873 fn recompute_l1_deadline(&mut self) {
874 if self.l1.timer_count == 0 {
875 self.l1.cached_next_deadline = SingleWheel::<T>::NULL_DEADLINE;
876 return;
877 }
878
879 let mut earliest = SingleWheel::<T>::NULL_DEADLINE;
880
881 // Scan entire wheel to find earliest deadline
882 for spoke in 0..self.l1.ticks_per_wheel {
883 for slot_idx in 0..self.l1.tick_allocation {
884 let wheel_index = (spoke << self.l1.allocation_bits_to_shift) + slot_idx;
885 if let Some(entry) = self.l1.wheel[wheel_index].as_ref() {
886 if entry.deadline_ns < earliest {
887 earliest = entry.deadline_ns;
888 }
889 }
890 }
891 }
892
893 self.l1.cached_next_deadline = earliest;
894 }
895
    /// Current wheel time in nanoseconds, as last set via `set_now_ns()`.
    ///
    /// Relaxed load; the wheel is documented as not thread-safe (see module docs),
    /// so no ordering guarantee is needed for the single-owner path.
    #[inline]
    pub fn now_ns(&self) -> u64 {
        self.now_ns.load(Ordering::Relaxed)
    }

    /// Advance (or set) the wheel's notion of "now", in nanoseconds.
    ///
    /// NOTE(review): this store uses `Release` while `now_ns()` loads with
    /// `Relaxed` — if another thread is meant to observe this clock, its load
    /// would normally need `Acquire` to pair with the `Release`. Confirm the
    /// intended cross-thread contract.
    #[inline]
    pub fn set_now_ns(&mut self, now_ns: u64) {
        self.now_ns.store(now_ns, Ordering::Release);
    }

    /// The worker ID this wheel was constructed with.
    #[inline]
    pub fn worker_id(&self) -> u32 {
        self.worker_id
    }
910
911 pub fn clear(&mut self) {
912 for slot in &mut self.l0.wheel {
913 *slot = None;
914 }
915 for slot in &mut self.l1.wheel {
916 *slot = None;
917 }
918
919 self.l0.timer_count = 0;
920 self.l1.timer_count = 0;
921 self.l0.cached_next_deadline = SingleWheel::<T>::NULL_DEADLINE;
922 self.l1.cached_next_deadline = SingleWheel::<T>::NULL_DEADLINE;
923 }
924
925 /// Encode wheel level, spoke, and slot into a compact timer ID
926 ///
927 /// Timer ID format (64 bits):
928 /// - Bits 62-63: Level (0=L0, 1=L1)
929 /// - Bits 32-61: Spoke index (30 bits, max 1<<30 spokes)
930 /// - Bits 0-31: Slot index (32 bits, max 1<<32 slots)
931 ///
932 /// This encoding allows O(1) timer cancellation by directly decoding
933 /// the wheel location from the timer ID.
934 #[inline]
935 fn encode_timer_id(level: u8, spoke: usize, slot: usize) -> Result<u64, TimerWheelError> {
936 // Validate bounds to prevent truncation during encoding
937 if spoke >= (1 << 30) {
938 return Err(TimerWheelError::InvalidTimerId);
939 }
940 if slot >= (1 << 32) {
941 return Err(TimerWheelError::InvalidTimerId);
942 }
943 if level > 1 {
944 return Err(TimerWheelError::InvalidTimerId);
945 }
946
947 // Pack into 64-bit integer: level (2 bits) | spoke (30 bits) | slot (32 bits)
948 Ok(((level as u64) << 62) | ((spoke as u64) << 32) | (slot as u64))
949 }
950
951 /// Decode a timer ID into wheel level, spoke, and slot indices
952 ///
953 /// Reverse of `encode_timer_id()`. Extracts the three components
954 /// needed to locate a timer in the wheel structure.
955 #[inline]
956 fn decode_timer_id(timer_id: u64) -> Result<(u8, usize, usize), TimerWheelError> {
957 // Extract level from top 2 bits
958 let level = ((timer_id >> 62) & 0x3) as u8;
959 // Extract spoke from middle 30 bits
960 let spoke = ((timer_id >> 32) & 0x3FFFFFFF) as usize;
961 // Extract slot from bottom 32 bits
962 let slot = (timer_id & 0xFFFFFFFF) as usize;
963
964 // Validate decoded level
965 if level > 1 {
966 return Err(TimerWheelError::InvalidTimerId);
967 }
968
969 Ok((level, spoke, slot))
970 }
971}
972
#[cfg(test)]
mod tests {
    use super::*;

    // All tests use tick resolution 1024*1024 ns ≈ 1.05 ms with 1024 ticks per
    // wheel, matching the L0/L1 geometry described in the module docs.

    /// Schedule one timer, then cancel it; the stored user data comes back intact.
    #[test]
    fn test_basic_scheduling() {
        let mut wheel = TimerWheel::<u32>::new(Duration::from_nanos(1024 * 1024), 1024, 0);

        let timer_id = wheel.schedule_timer(100_000_000, 42).unwrap(); // 100ms
        assert_eq!(wheel.timer_count(), 1);

        let data = wheel.cancel_timer(timer_id).unwrap().unwrap();
        assert_eq!(data, 42);
        assert_eq!(wheel.timer_count(), 0);
    }

    /// A deadline inside L0's ~1.07s coverage lands in the fast wheel.
    #[test]
    fn test_l0_timer() {
        let mut wheel = TimerWheel::<&str>::new(Duration::from_nanos(1024 * 1024), 1024, 0);

        // 500ms timer should go to L0
        wheel.schedule_timer(500_000_000, "fast").unwrap();
        assert_eq!(wheel.l0.timer_count, 1);
        assert_eq!(wheel.l1.timer_count, 0);
    }

    /// A deadline beyond L0's coverage lands in the medium (L1) wheel.
    #[test]
    fn test_l1_timer() {
        let mut wheel = TimerWheel::<&str>::new(Duration::from_nanos(1024 * 1024), 1024, 0);

        // 5s timer should go to L1
        wheel.schedule_timer(5_000_000_000, "slow").unwrap();
        assert_eq!(wheel.l0.timer_count, 0);
        assert_eq!(wheel.l1.timer_count, 1);
    }

    /// An L1 timer whose tick expires is cascaded down and fires in the same poll
    /// when its deadline has already passed.
    #[test]
    fn test_cascade() {
        let mut wheel = TimerWheel::<&str>::new(Duration::from_nanos(1024 * 1024), 1024, 0);

        // Schedule timer at 2 seconds (in L1 tick 1, which covers 1.07s-2.15s)
        wheel.schedule_timer(2_000_000_000, "cascaded").unwrap();
        assert_eq!(wheel.l1.timer_count, 1);
        assert_eq!(wheel.l0.timer_count, 0);

        // Poll at 2.2 seconds - should cascade tick 1 (we're now at tick 2)
        // L1 tick 2 starts at 2.147s, so polling at 2.2s triggers cascade of tick 1
        let mut output = Vec::new();
        wheel.poll(2_200_000_000, 100, &mut output);
        // The timer cascades out of L1 and, because its 2.0s deadline has already
        // passed at 2.2s, it fires within this same poll.
        assert_eq!(output.len(), 1);
        assert_eq!(output[0].2, "cascaded");
    }

    /// Invalid inputs are reported as typed errors, not panics.
    #[test]
    fn test_error_handling() {
        let mut wheel = TimerWheel::<u32>::new(Duration::from_nanos(1024 * 1024), 1024, 0);

        // Test invalid deadline (zero)
        assert!(matches!(
            wheel.schedule_timer(0, 42),
            Err(TimerWheelError::InvalidDeadline)
        ));

        // Test invalid timer ID (level bits = 3, which decode rejects)
        assert!(matches!(
            wheel.cancel_timer(0xFFFFFFFFFFFFFFFF),
            Err(TimerWheelError::InvalidTimerId)
        ));
    }

    /// Cancelling the same timer twice fails with TimerNotFound the second time.
    #[test]
    fn test_timer_validation() {
        let mut wheel = TimerWheel::<u32>::new(Duration::from_nanos(1024 * 1024), 1024, 0);

        let timer_id = wheel.schedule_timer(100_000_000, 42).unwrap();

        // Cancel the timer
        let data = wheel.cancel_timer(timer_id).unwrap().unwrap();
        assert_eq!(data, 42);

        // Try to cancel again - should fail
        assert!(matches!(
            wheel.cancel_timer(timer_id),
            Err(TimerWheelError::TimerNotFound)
        ));
    }
}