kestrel_timer/
wheel.rs

1use crate::CallbackWrapper;
2use crate::config::{BatchConfig, WheelConfig};
3use crate::task::{
4    TaskCompletion, TaskHandle, TaskId, TaskLocation, TaskTypeWithCompletionNotifier,
5    TimerTaskForWheel, TimerTaskWithCompletionNotifier,
6};
7use deferred_map::DeferredMap;
8use std::time::Duration;
9
10pub struct WheelAdvanceResult {
11    pub id: TaskId,
12    pub callback: Option<CallbackWrapper>,
13}
14
15/// Timing wheel single layer data structure
16///
17/// 时间轮单层数据结构
18struct WheelLayer {
19    /// Slot array, each slot stores a group of timer tasks
20    ///
21    /// 槽数组,每个槽存储一组定时器任务
22    slots: Vec<Vec<TimerTaskForWheel>>,
23
24    /// Current time pointer (tick index)
25    ///
26    /// 当前时间指针(tick 索引)
27    current_tick: u64,
28
29    /// Slot count
30    ///
31    /// 槽数量
32    slot_count: usize,
33
34    /// Duration of each tick
35    ///
36    /// 每个 tick 的持续时间
37    tick_duration: Duration,
38
39    /// Cache: tick duration in milliseconds (u64) - avoid repeated conversion
40    ///
41    /// 缓存:tick 持续时间(毫秒,u64)- 避免重复转换
42    tick_duration_ms: u64,
43
44    /// Cache: slot mask (slot_count - 1) - for fast modulo operation
45    ///
46    /// 缓存:槽掩码(slot_count - 1)- 用于快速取模运算
47    slot_mask: usize,
48}
49
50impl WheelLayer {
51    /// Create a new wheel layer
52    ///
53    /// 创建一个新的时间轮层
54    fn new(slot_count: usize, tick_duration: Duration) -> Self {
55        let mut slots = Vec::with_capacity(slot_count);
56        // Pre-allocate capacity for each slot to reduce subsequent reallocation during push
57        // Most slots typically contain 0-4 tasks, pre-allocate capacity of 4
58        //
59        // 为每个槽预分配容量以减少后续 push 时的重新分配
60        // 大多数槽通常包含 0-4 个任务,预分配容量为 4
61        for _ in 0..slot_count {
62            slots.push(Vec::with_capacity(4));
63        }
64
65        let tick_duration_ms = tick_duration.as_millis() as u64;
66        let slot_mask = slot_count - 1;
67
68        Self {
69            slots,
70            current_tick: 0,
71            slot_count,
72            tick_duration,
73            tick_duration_ms,
74            slot_mask,
75        }
76    }
77
78    /// Calculate the number of ticks corresponding to the delay
79    ///
80    /// 计算延迟对应的 tick 数量
81    fn delay_to_ticks(&self, delay: Duration) -> u64 {
82        let ticks = delay.as_millis() as u64 / self.tick_duration.as_millis() as u64;
83        ticks.max(1) // at least 1 tick (至少 1 个 tick)
84    }
85}
86
87/// Timing wheel data structure (hierarchical mode)
88///
89/// Now uses DeferredMap for O(1) task lookup and generational safety.
90///
91/// 时间轮数据结构(分层模式)
92///
93/// 现在使用 DeferredMap 实现 O(1) 任务查找和代数安全
94pub struct Wheel {
95    /// L0 layer (bottom layer)
96    ///
97    /// L0 层(底层)
98    l0: WheelLayer,
99
100    /// L1 layer (top layer)
101    ///
102    /// L1 层(顶层)
103    l1: WheelLayer,
104
105    /// L1 tick ratio relative to L0 tick
106    ///
107    /// L1 tick 相对于 L0 tick 的比率
108    pub(crate) l1_tick_ratio: u64,
109
110    /// Task index for fast lookup and cancellation using DeferredMap
111    ///
112    /// Keys are TaskIds (u64 from DeferredMap), values are TaskLocations
113    ///
114    /// 任务索引,使用 DeferredMap 实现快速查找和取消
115    ///
116    /// 键是 TaskId(来自 DeferredMap 的 u64),值是 TaskLocation
117    pub(crate) task_index: DeferredMap<TaskLocation>,
118
119    /// Batch processing configuration
120    ///
121    /// 批处理配置
122    batch_config: BatchConfig,
123
124    /// Cache: L0 layer capacity in milliseconds - avoid repeated calculation
125    ///
126    /// 缓存:L0 层容量(毫秒)- 避免重复计算
127    l0_capacity_ms: u64,
128
129    /// Cache: L1 layer capacity in ticks - avoid repeated calculation
130    ///
131    /// 缓存:L1 层容量(tick 数)- 避免重复计算
132    l1_capacity_ticks: u64,
133}
134
135impl Wheel {
136    /// Create new timing wheel
137    ///
138    /// # Parameters
139    /// - `config`: Timing wheel configuration (already validated)
140    /// - `batch_config`: Batch processing configuration
141    ///
142    /// # Notes
143    /// Configuration parameters have been validated in WheelConfig::builder().build(), so this method will not fail.
144    /// Now uses DeferredMap for task indexing with generational safety.
145    ///
146    /// 创建新的时间轮
147    ///
148    /// # 参数
149    /// - `config`: 时间轮配置(已验证)
150    /// - `batch_config`: 批处理配置
151    ///
152    /// # 注意
153    /// 配置参数已在 WheelConfig::builder().build() 中验证,因此此方法不会失败。
154    /// 现在使用 DeferredMap 进行任务索引,具有代数安全特性。
155    pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
156        let l0 = WheelLayer::new(config.l0_slot_count, config.l0_tick_duration);
157        let l1 = WheelLayer::new(config.l1_slot_count, config.l1_tick_duration);
158
159        // Calculate L1 tick ratio relative to L0 tick
160        // 计算 L1 tick 相对于 L0 tick 的比率
161        let l1_tick_ratio = l1.tick_duration_ms / l0.tick_duration_ms;
162
163        // Pre-calculate capacity to avoid repeated calculation in insert
164        // 预计算容量,避免在 insert 中重复计算
165        let l0_capacity_ms = (l0.slot_count as u64) * l0.tick_duration_ms;
166        let l1_capacity_ticks = l1.slot_count as u64;
167
168        // Estimate initial capacity for DeferredMap based on L0 slot count
169        // 根据 L0 槽数量估算 DeferredMap 的初始容量
170        let estimated_capacity = (l0.slot_count / 4).max(64);
171
172        Self {
173            l0,
174            l1,
175            l1_tick_ratio,
176            task_index: DeferredMap::with_capacity(estimated_capacity),
177            batch_config,
178            l0_capacity_ms,
179            l1_capacity_ticks,
180        }
181    }
182
183    /// Get current tick (L0 layer tick)
184    ///
185    /// 获取当前 tick(L0 层 tick)
186    #[allow(dead_code)]
187    pub fn current_tick(&self) -> u64 {
188        self.l0.current_tick
189    }
190
191    /// Get tick duration (L0 layer tick duration)
192    ///
193    /// 获取 tick 持续时间(L0 层 tick 持续时间)
194    #[allow(dead_code)]
195    pub fn tick_duration(&self) -> Duration {
196        self.l0.tick_duration
197    }
198
199    /// Get slot count (L0 layer slot count)
200    ///
201    /// 获取槽数量(L0 层槽数量)
202    #[allow(dead_code)]
203    pub fn slot_count(&self) -> usize {
204        self.l0.slot_count
205    }
206
207    /// Calculate the number of ticks corresponding to the delay (based on L0 layer)
208    ///
209    /// 计算延迟对应的 tick 数量(基于 L0 层)
210    #[allow(dead_code)]
211    pub fn delay_to_ticks(&self, delay: Duration) -> u64 {
212        self.l0.delay_to_ticks(delay)
213    }
214
215    /// Determine which layer the delay should be inserted into
216    ///
217    /// # Returns
218    /// Returns: (layer, ticks, rounds)
219    /// - Layer: 0 = L0, 1 = L1
220    /// - Ticks: number of ticks calculated from current tick
221    /// - Rounds: number of rounds (only used for very long delays in L1 layer)
222    ///
223    /// 确定延迟应该插入到哪一层
224    ///
225    /// # 返回值
226    /// 返回:(层级, ticks, 轮数)
227    /// - 层级:0 = L0, 1 = L1
228    /// - Ticks:从当前 tick 计算的 tick 数量
229    /// - 轮数:轮数(仅用于 L1 层的超长延迟)
230    #[inline(always)]
231    fn determine_layer(&self, delay: Duration) -> (u8, u64, u32) {
232        let delay_ms = delay.as_millis() as u64;
233
234        // Fast path: most tasks are within L0 range (using cached capacity)
235        // 快速路径:大多数任务在 L0 范围内(使用缓存的容量)
236        if delay_ms < self.l0_capacity_ms {
237            let l0_ticks = (delay_ms / self.l0.tick_duration_ms).max(1);
238            return (0, l0_ticks, 0);
239        }
240
241        // Slow path: L1 layer tasks (using cached values)
242        // 慢速路径:L1 层任务(使用缓存的值)
243        let l1_ticks = (delay_ms / self.l1.tick_duration_ms).max(1);
244
245        if l1_ticks < self.l1_capacity_ticks {
246            (1, l1_ticks, 0)
247        } else {
248            let rounds = (l1_ticks / self.l1_capacity_ticks) as u32;
249            (1, l1_ticks, rounds)
250        }
251    }
252
253    /// Allocate a handle from DeferredMap
254    ///
255    /// 从 DeferredMap 分配一个 handle
256    ///
257    /// # Returns
258    /// A unique handle for later insertion
259    ///
260    /// # 返回值
261    /// 用于后续插入的唯一 handle
262    pub fn allocate_handle(&mut self) -> TaskHandle {
263        TaskHandle::new(self.task_index.allocate_handle())
264    }
265
266    /// Batch allocate handles from DeferredMap
267    ///
268    /// 从 DeferredMap 批量分配 handles
269    ///
270    /// # Parameters
271    /// - `count`: Number of handles to allocate
272    ///
273    /// # Returns
274    /// Vector of unique handles for later batch insertion
275    ///
276    /// # 参数
277    /// - `count`: 要分配的 handle 数量
278    ///
279    /// # 返回值
280    /// 用于后续批量插入的唯一 handles 向量
281    pub fn allocate_handles(&mut self, count: usize) -> Vec<TaskHandle> {
282        let mut handles = Vec::with_capacity(count);
283        for _ in 0..count {
284            handles.push(TaskHandle::new(self.task_index.allocate_handle()));
285        }
286        handles
287    }
288
289    /// Insert timer task
290    ///
291    /// # Parameters
292    /// - `handle`: Handle for the task
293    /// - `task`: Timer task with completion notifier
294    ///
295    /// # Returns
296    /// Unique identifier of the task (TaskId) - now generated from DeferredMap
297    ///
298    /// # Implementation Details
299    /// - Allocates Handle from DeferredMap to generate TaskId with generational safety
300    /// - Automatically calculate the layer and slot where the task should be inserted
301    /// - Hierarchical mode: short delay tasks are inserted into L0, long delay tasks are inserted into L1
302    /// - Use bit operations to optimize slot index calculation
303    /// - Use DeferredMap for O(1) lookup and cancellation with generation checking
304    ///
305    /// 插入定时器任务
306    ///
307    /// # 参数
308    /// - `handle`: 任务的 handle
309    /// - `task`: 带完成通知器的定时器任务
310    ///
311    /// # 返回值
312    /// 任务的唯一标识符(TaskId)- 现在从 DeferredMap 生成
313    ///
314    /// # 实现细节
315    /// - 自动计算任务应该插入的层级和槽位
316    /// - 分层模式:短延迟任务插入 L0,长延迟任务插入 L1
317    /// - 使用位运算优化槽索引计算
318    /// - 使用 DeferredMap 实现 O(1) 查找和取消,带代数检查
319    #[inline]
320    pub fn insert(&mut self, handle: TaskHandle, task: TimerTaskWithCompletionNotifier) {
321        let task_id = handle.task_id();
322
323        let (level, ticks, rounds) = self.determine_layer(task.delay);
324
325        // Use match to reduce branches, and use cached slot mask
326        // 使用 match 减少分支,并使用缓存的槽掩码
327        let (current_tick, slot_mask, slots) = match level {
328            0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
329            _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
330        };
331
332        let total_ticks = current_tick + ticks;
333        let slot_index = (total_ticks as usize) & slot_mask;
334
335        // Create task with the assigned TaskId
336        // 使用分配的 TaskId 创建任务
337        let task = TimerTaskForWheel::new_with_id(task_id, task, total_ticks, rounds);
338
339        // Get the index position of the task in Vec (the length before insertion is the index of the new task)
340        // 获取任务在 Vec 中的索引位置(插入前的长度就是新任务的索引)
341        let vec_index = slots[slot_index].len();
342        let location = TaskLocation::new(level, slot_index, vec_index);
343
344        // Insert task into slot
345        // 将任务插入槽中
346        slots[slot_index].push(task);
347
348        // Insert task location into DeferredMap using handle
349        // 使用 handle 将任务位置插入 DeferredMap
350        self.task_index
351            .insert(handle.into_handle(), location);
352    }
353
354    /// Batch insert timer tasks
355    ///
356    /// # Parameters
357    /// - `handles`: List of pre-allocated handles for tasks
358    /// - `tasks`: List of timer tasks with completion notifiers
359    ///
360    /// # Returns
361    /// - `Ok(())` if all tasks are successfully inserted
362    /// - `Err(TimerError::BatchLengthMismatch)` if handles and tasks lengths don't match
363    ///
364    /// # Performance Advantages
365    /// - Reduce repeated boundary checks and capacity adjustments
366    /// - For tasks with the same delay, calculation results can be reused
367    /// - Uses DeferredMap for efficient task indexing with generational safety
368    ///
369    /// 批量插入定时器任务
370    ///
371    /// # 参数
372    /// - `handles`: 任务的预分配 handles 列表
373    /// - `tasks`: 带完成通知器的定时器任务列表
374    ///
375    /// # 返回值
376    /// - `Ok(())` 如果所有任务成功插入
377    /// - `Err(TimerError::BatchLengthMismatch)` 如果 handles 和 tasks 长度不匹配
378    ///
379    /// # 性能优势
380    /// - 减少重复的边界检查和容量调整
381    /// - 对于相同延迟的任务,可以重用计算结果
382    /// - 使用 DeferredMap 实现高效的任务索引,具有代数安全特性
383    #[inline]
384    pub fn insert_batch(
385        &mut self,
386        handles: Vec<TaskHandle>,
387        tasks: Vec<TimerTaskWithCompletionNotifier>,
388    ) -> Result<(), crate::error::TimerError> {
389        // Validate that handles and tasks have the same length
390        // 验证 handles 和 tasks 长度相同
391        if handles.len() != tasks.len() {
392            return Err(crate::error::TimerError::BatchLengthMismatch {
393                handles_len: handles.len(),
394                tasks_len: tasks.len(),
395            });
396        }
397
398        for (handle, task) in handles.into_iter().zip(tasks.into_iter()) {
399            let task_id = handle.task_id();
400
401            let (level, ticks, rounds) = self.determine_layer(task.delay);
402
403            // Use match to reduce branches, and use cached slot mask
404            // 使用 match 减少分支,并使用缓存的槽掩码
405            let (current_tick, slot_mask, slots) = match level {
406                0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
407                _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
408            };
409
410            let total_ticks = current_tick + ticks;
411            let slot_index = (total_ticks as usize) & slot_mask;
412
413            // Create task with the assigned TaskId
414            // 使用分配的 TaskId 创建任务
415            let task = TimerTaskForWheel::new_with_id(task_id, task, total_ticks, rounds);
416
417            // Get the index position of the task in Vec
418            // 获取任务在 Vec 中的索引位置
419            let vec_index = slots[slot_index].len();
420            let location = TaskLocation::new(level, slot_index, vec_index);
421
422            // Insert task into slot
423            // 将任务插入槽中
424            slots[slot_index].push(task);
425
426            // Insert task location into DeferredMap using handle
427            // 使用 handle 将任务位置插入 DeferredMap
428            self.task_index
429                .insert(handle.into_handle(), location);
430        }
431
432        Ok(())
433    }
434
435    /// Cancel timer task
436    ///
437    /// # Parameters
438    /// - `task_id`: Task ID
439    ///
440    /// # Returns
441    /// Returns true if the task exists and is successfully cancelled, otherwise returns false
442    ///
443    /// Now uses DeferredMap for safe task removal with generational checking
444    ///
445    /// 取消定时器任务
446    ///
447    /// # 参数
448    /// - `task_id`: 任务 ID
449    ///
450    /// # 返回值
451    /// 如果任务存在且成功取消则返回 true,否则返回 false
452    ///
453    /// 现在使用 DeferredMap 实现安全的任务移除,带代数检查
454    #[inline]
455    pub fn cancel(&mut self, task_id: TaskId) -> bool {
456        // Remove task location from DeferredMap using TaskId as key
457        // 使用 TaskId 作为 key 从 DeferredMap 中移除任务位置
458        let location = match self.task_index.remove(task_id.key()) {
459            Some(loc) => loc,
460            None => return false, // Task not found or already removed (generation mismatch)
461        };
462
463        // Use match to get slot reference, reduce branches
464        // 使用 match 获取槽引用,减少分支
465        let slot = match location.level {
466            0 => &mut self.l0.slots[location.slot_index],
467            _ => &mut self.l1.slots[location.slot_index],
468        };
469
470        // Boundary check and ID verification
471        // 边界检查和 ID 验证
472        if location.vec_index >= slot.len() || slot[location.vec_index].get_id() != task_id {
473            // Index inconsistent - this shouldn't happen with DeferredMap, but handle it anyway
474            // 索引不一致 - DeferredMap 不应该出现这种情况,但还是处理一下
475            return false;
476        }
477
478        // Use swap_remove to remove task, record swapped task ID
479        // 使用 swap_remove 移除任务,记录被交换的任务 ID
480        let removed_task = slot.swap_remove(location.vec_index);
481
482        // Ensure the correct task was removed
483        // 确保移除了正确的任务
484        debug_assert_eq!(removed_task.get_id(), task_id);
485
486        match removed_task.into_task_type() {
487            TaskTypeWithCompletionNotifier::OneShot {
488                completion_notifier,
489            } => {
490                // Use Notify + AtomicU8 for zero-allocation notification
491                // 使用 Notify + AtomicU8 实现零分配通知
492                completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
493            }
494            TaskTypeWithCompletionNotifier::Periodic {
495                completion_notifier,
496                ..
497            } => {
498                // Use flume for high-performance periodic notification
499                // 使用 flume 实现高性能周期通知
500                let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
501            }
502        }
503
504        // If a swap occurred (vec_index is not the last element)
505        // 如果发生了交换(vec_index 不是最后一个元素)
506        if location.vec_index < slot.len() {
507            let swapped_task_id = slot[location.vec_index].get_id();
508            // Update swapped element's index in one go, using DeferredMap's get_mut
509            // 一次性更新被交换元素的索引,使用 DeferredMap 的 get_mut
510            if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
511                swapped_location.vec_index = location.vec_index;
512            }
513        }
514
515        true
516    }
517
518    /// Batch cancel timer tasks
519    ///
520    /// # Parameters
521    /// - `task_ids`: List of task IDs to cancel
522    ///
523    /// # Returns
524    /// Number of successfully cancelled tasks
525    ///
526    /// # Performance Advantages
527    /// - Reduce repeated HashMap lookup overhead
528    /// - Multiple cancellation operations on the same slot can be batch processed
529    /// - Use unstable sort to improve performance
530    /// - Small batch optimization: skip sorting based on configuration threshold, process directly
531    ///
532    /// 批量取消定时器任务
533    ///
534    /// # 参数
535    /// - `task_ids`: 要取消的任务 ID 列表
536    ///
537    /// # 返回值
538    /// 成功取消的任务数量
539    ///
540    /// # 性能优势
541    /// - 减少重复的 HashMap 查找开销
542    /// - 同一槽位的多个取消操作可以批量处理
543    /// - 使用不稳定排序提高性能
544    /// - 小批量优化:根据配置阈值跳过排序,直接处理
545    #[inline]
546    pub fn cancel_batch(&mut self, task_ids: &[TaskId]) -> usize {
547        let mut cancelled_count = 0;
548
549        // Small batch optimization: cancel one by one to avoid grouping and sorting overhead
550        // 小批量优化:逐个取消以避免分组和排序开销
551        if task_ids.len() <= self.batch_config.small_batch_threshold {
552            for &task_id in task_ids {
553                if self.cancel(task_id) {
554                    cancelled_count += 1;
555                }
556            }
557            return cancelled_count;
558        }
559
560        // Group by layer and slot to optimize batch cancellation
561        // Use SmallVec to avoid heap allocation in most cases
562        // 按层级和槽位分组以优化批量取消
563        // 使用 SmallVec 避免在大多数情况下进行堆分配
564        let l0_slot_count = self.l0.slot_count;
565        let l1_slot_count = self.l1.slot_count;
566
567        let mut l0_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l0_slot_count];
568        let mut l1_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l1_slot_count];
569
570        // Collect information of tasks to be cancelled
571        // 收集要取消的任务信息
572        for &task_id in task_ids {
573            // Use DeferredMap's get with TaskId key
574            // 使用 DeferredMap 的 get,传入 TaskId key
575            if let Some(location) = self.task_index.get(task_id.key()) {
576                if location.level == 0 {
577                    l0_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
578                } else {
579                    l1_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
580                }
581            }
582        }
583
584        // Process L0 layer cancellation
585        // 处理 L0 层取消
586        for (slot_index, tasks) in l0_tasks_by_slot.iter_mut().enumerate() {
587            if tasks.is_empty() {
588                continue;
589            }
590
591            // Sort by vec_index in descending order, delete from back to front to avoid index invalidation
592            // 按 vec_index 降序排序,从后向前删除以避免索引失效
593            tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
594
595            let slot = &mut self.l0.slots[slot_index];
596
597            for &(task_id, vec_index) in tasks.iter() {
598                if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
599                    let removed_task = slot.swap_remove(vec_index);
600                    match removed_task.into_task_type() {
601                        TaskTypeWithCompletionNotifier::OneShot {
602                            completion_notifier,
603                        } => {
604                            // Use Notify + AtomicU8 for zero-allocation notification
605                            // 使用 Notify + AtomicU8 实现零分配通知
606                            completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
607                        }
608                        TaskTypeWithCompletionNotifier::Periodic {
609                            completion_notifier,
610                            ..
611                        } => {
612                            // Use flume for high-performance periodic notification
613                            // 使用 flume 实现高性能周期通知
614                            let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
615                        }
616                    }
617
618                    if vec_index < slot.len() {
619                        let swapped_task_id = slot[vec_index].get_id();
620                        // Use DeferredMap's get_mut with TaskId key
621                        // 使用 DeferredMap 的 get_mut,传入 TaskId key
622                        if let Some(swapped_location) =
623                            self.task_index.get_mut(swapped_task_id.key())
624                        {
625                            swapped_location.vec_index = vec_index;
626                        }
627                    }
628
629                    // Use DeferredMap's remove with TaskId key
630                    // 使用 DeferredMap 的 remove,传入 TaskId key
631                    self.task_index.remove(task_id.key());
632                    cancelled_count += 1;
633                }
634            }
635        }
636
637        // Process L1 layer cancellation
638        // 处理 L1 层取消
639        for (slot_index, tasks) in l1_tasks_by_slot.iter_mut().enumerate() {
640            if tasks.is_empty() {
641                continue;
642            }
643
644            tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
645
646            let slot = &mut self.l1.slots[slot_index];
647
648            for &(task_id, vec_index) in tasks.iter() {
649                if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
650                    let removed_task = slot.swap_remove(vec_index);
651
652                    match removed_task.into_task_type() {
653                        TaskTypeWithCompletionNotifier::OneShot {
654                            completion_notifier,
655                        } => {
656                            // Use Notify + AtomicU8 for zero-allocation notification
657                            // 使用 Notify + AtomicU8 实现零分配通知
658                            completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
659                        }
660                        TaskTypeWithCompletionNotifier::Periodic {
661                            completion_notifier,
662                            ..
663                        } => {
664                            // Use flume for high-performance periodic notification
665                            // 使用 flume 实现高性能周期通知
666                            let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
667                        }
668                    }
669
670                    if vec_index < slot.len() {
671                        let swapped_task_id = slot[vec_index].get_id();
672                        // Use DeferredMap's get_mut with TaskId key
673                        // 使用 DeferredMap 的 get_mut,传入 TaskId key
674                        if let Some(swapped_location) =
675                            self.task_index.get_mut(swapped_task_id.key())
676                        {
677                            swapped_location.vec_index = vec_index;
678                        }
679                    }
680
681                    // Use DeferredMap's remove with TaskId key
682                    // 使用 DeferredMap 的 remove,传入 TaskId key
683                    self.task_index.remove(task_id.key());
684                    cancelled_count += 1;
685                }
686            }
687        }
688
689        cancelled_count
690    }
691
692    /// Reinsert periodic task with the same TaskId
693    ///
694    /// Used to automatically reschedule periodic tasks after they expire
695    ///
696    /// TaskId remains unchanged, only location is updated in DeferredMap
697    ///
698    /// 重新插入周期性任务(保持相同的 TaskId)
699    ///
700    /// 用于在周期性任务过期后自动重新调度
701    ///
702    /// TaskId 保持不变,只更新 DeferredMap 中的位置
703    fn reinsert_periodic_task(&mut self, task_id: TaskId, task: TimerTaskWithCompletionNotifier) {
704        // Determine which layer the interval should be inserted into
705        // This method is only called by periodic tasks, so the interval is guaranteed to be Some.
706        // 确定间隔应该插入到哪一层
707        // 该方法只能由周期性任务调用,所以间隔是 guaranteed 应当保证为 Some.
708        let (level, ticks, rounds) = self.determine_layer(task.get_interval().unwrap());
709
710        // Use match to reduce branches, and use cached slot mask
711        // 使用 match 减少分支,并使用缓存的槽掩码
712        let (current_tick, slot_mask, slots) = match level {
713            0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
714            _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
715        };
716
717        let total_ticks = current_tick + ticks;
718        let slot_index = (total_ticks as usize) & slot_mask;
719
720        // Get the index position of the task in Vec
721        // 获取任务在 Vec 中的索引位置
722        let vec_index = slots[slot_index].len();
723        let new_location = TaskLocation::new(level, slot_index, vec_index);
724
725        // Insert task into slot
726        // 将任务插入槽中
727        slots[slot_index].push(TimerTaskForWheel::new_with_id(
728            task_id,
729            task,
730            total_ticks,
731            rounds,
732        ));
733
734        // Update task location in DeferredMap (doesn't remove, just updates)
735        // 更新 DeferredMap 中的任务位置(不删除,仅更新)
736        if let Some(location) = self.task_index.get_mut(task_id.key()) {
737            *location = new_location;
738        }
739    }
740
741    /// Advance the timing wheel by one tick, return all expired tasks
742    ///
743    /// # Returns
744    /// List of expired tasks
745    ///
746    /// # Implementation Details
747    /// - L0 layer advances 1 tick each time (no rounds check)
748    /// - L1 layer advances once every (L1_tick / L0_tick) times
749    /// - L1 expired tasks are batch demoted to L0
750    ///
751    /// 推进时间轮一个 tick,返回所有过期的任务
752    ///
753    /// # 返回值
754    /// 过期任务列表
755    ///
756    /// # 实现细节
757    /// - L0 层每次推进 1 个 tick(无轮数检查)
758    /// - L1 层每 (L1_tick / L0_tick) 次推进一次
759    /// - L1 层过期任务批量降级到 L0
760    pub fn advance(&mut self) -> Vec<WheelAdvanceResult> {
761        // Advance L0 layer
762        // 推进 L0 层
763        self.l0.current_tick += 1;
764
765        let mut expired_tasks = Vec::new();
766
767        // Process L0 layer expired tasks
768        // Distinguish between one-shot and periodic tasks
769        // 处理 L0 层过期任务
770        // 区分一次性任务和周期性任务
771        let l0_slot_index = (self.l0.current_tick as usize) & self.l0.slot_mask;
772
773        // Collect periodic tasks to reinsert
774        // 收集需要重新插入的周期性任务
775        let mut periodic_tasks_to_reinsert = Vec::new();
776
777        {
778            let l0_slot = &mut self.l0.slots[l0_slot_index];
779
780            // Process all tasks in the current slot by always removing from index 0
781            // This avoids index tracking issues with swap_remove
782            // 通过始终从索引 0 移除来处理当前槽中的所有任务
783            // 这避免了 swap_remove 的索引跟踪问题
784            while !l0_slot.is_empty() {
785                // Always remove from index 0
786                // 始终从索引 0 移除
787                let task_with_notifier = l0_slot.swap_remove(0);
788                let task_id = task_with_notifier.get_id();
789
790                // Determine if this is a periodic task (check before consuming)
791                // 判断是否为周期性任务(在消耗前检查)
792                let is_periodic = matches!(
793                    task_with_notifier.task.task_type,
794                    TaskTypeWithCompletionNotifier::Periodic { .. }
795                );
796
797                // For one-shot tasks, remove from DeferredMap index
798                // For periodic tasks, keep in index (will update location in reinsert)
799                // 对于一次性任务,从 DeferredMap 索引中移除
800                // 对于周期性任务,保留在索引中(重新插入时会更新位置)
801                if !is_periodic {
802                    self.task_index.remove(task_id.key());
803                }
804
805                // Update swapped element's index (the element that was moved to index 0)
806                // 更新被交换元素的索引(被移动到索引 0 的元素)
807                if !l0_slot.is_empty() {
808                    let swapped_task_id = l0_slot[0].get_id();
809                    // Use DeferredMap's get_mut with TaskId key
810                    // 使用 DeferredMap 的 get_mut,传入 TaskId key
811                    if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
812                        swapped_location.vec_index = 0;
813                    }
814                }
815
816                let TimerTaskForWheel { task_id, task, .. } = task_with_notifier;
817
818                match &task.task_type {
819                    TaskTypeWithCompletionNotifier::Periodic {
820                        completion_notifier,
821                        ..
822                    } => {
823                        let _ = completion_notifier.0.try_send(TaskCompletion::Called);
824
825                        periodic_tasks_to_reinsert.push((
826                            task_id,
827                            TimerTaskWithCompletionNotifier {
828                                task_type: task.task_type,
829                                delay: task.delay,
830                                callback: task.callback.clone(),
831                            },
832                        ));
833                    }
834                    TaskTypeWithCompletionNotifier::OneShot {
835                        completion_notifier,
836                    } => {
837                        completion_notifier.notify(crate::task::TaskCompletion::Called);
838                    }
839                }
840
841                expired_tasks.push(WheelAdvanceResult {
842                    id: task_id,
843                    callback: task.callback,
844                });
845            }
846        }
847
848        // Reinsert periodic tasks for next interval
849        // 重新插入周期性任务到下一个周期
850        for (task_id, task) in periodic_tasks_to_reinsert {
851            self.reinsert_periodic_task(task_id, task);
852        }
853
854        // Process L1 layer
855        // Check if L1 layer needs to be advanced
856        // 处理 L1 层
857        // 检查是否需要推进 L1 层
858        if self.l0.current_tick.is_multiple_of(self.l1_tick_ratio) {
859            self.l1.current_tick += 1;
860            let l1_slot_index = (self.l1.current_tick as usize) & self.l1.slot_mask;
861            let l1_slot = &mut self.l1.slots[l1_slot_index];
862
863            // Collect L1 layer expired tasks
864            // 收集 L1 层过期任务
865            let mut tasks_to_demote = Vec::new();
866            let mut i = 0;
867            while i < l1_slot.len() {
868                let task = &mut l1_slot[i];
869
870                if task.rounds > 0 {
871                    // Still has rounds, decrease rounds and keep
872                    // 还有轮数,减少轮数并保留
873                    task.rounds -= 1;
874                    // Use DeferredMap's get_mut with TaskId key
875                    // 使用 DeferredMap 的 get_mut,传入 TaskId key
876                    if let Some(location) = self.task_index.get_mut(task.get_id().key()) {
877                        location.vec_index = i;
878                    }
879                    i += 1;
880                } else {
881                    // rounds = 0, need to demote to L0
882                    // Don't remove from task_index - will update location in demote_tasks
883                    // rounds = 0,需要降级到 L0
884                    // 不从 task_index 中移除 - 在 demote_tasks 中会更新位置
885                    let task_to_demote = l1_slot.swap_remove(i);
886
887                    if i < l1_slot.len() {
888                        let swapped_task_id = l1_slot[i].get_id();
889                        // Use DeferredMap's get_mut with TaskId key
890                        // 使用 DeferredMap 的 get_mut,传入 TaskId key
891                        if let Some(swapped_location) =
892                            self.task_index.get_mut(swapped_task_id.key())
893                        {
894                            swapped_location.vec_index = i;
895                        }
896                    }
897
898                    tasks_to_demote.push(task_to_demote);
899                }
900            }
901
902            // Demote tasks to L0
903            // 将任务降级到 L0
904            self.demote_tasks(tasks_to_demote);
905        }
906
907        expired_tasks
908    }
909
910    /// Demote tasks from L1 to L0
911    ///
912    /// Recalculate and insert L1 expired tasks into L0 layer
913    ///
914    /// Updates task location in DeferredMap without removing/reinserting
915    ///
916    /// 将任务从 L1 降级到 L0
917    ///
918    /// 重新计算并将 L1 过期任务插入到 L0 层
919    ///
920    /// 更新 DeferredMap 中的任务位置而不删除/重新插入
921    fn demote_tasks(&mut self, tasks: Vec<TimerTaskForWheel>) {
922        for task in tasks {
923            // Calculate the remaining delay of the task in L0 layer
924            // The task's deadline_tick is based on L1 tick, needs to be converted to L0 tick
925            // 计算任务在 L0 层的剩余延迟
926            // 任务的 deadline_tick 基于 L1 tick,需要转换为 L0 tick
927            let l1_tick_ratio = self.l1_tick_ratio;
928
929            // Calculate the original expiration time of the task (L1 tick)
930            // 计算任务的原始过期时间(L1 tick)
931            let l1_deadline = task.deadline_tick;
932
933            // Convert to L0 tick expiration time
934            // 转换为 L0 tick 过期时间
935            let l0_deadline_tick = l1_deadline * l1_tick_ratio;
936            let l0_current_tick = self.l0.current_tick;
937
938            // Calculate remaining L0 ticks
939            // 计算剩余 L0 ticks
940            let remaining_l0_ticks = if l0_deadline_tick > l0_current_tick {
941                l0_deadline_tick - l0_current_tick
942            } else {
943                1 // At least trigger in next tick (至少在下一个 tick 触发)
944            };
945
946            // Calculate L0 slot index
947            // 计算 L0 槽索引
948            let target_l0_tick = l0_current_tick + remaining_l0_ticks;
949            let l0_slot_index = (target_l0_tick as usize) & self.l0.slot_mask;
950
951            let task_id = task.get_id();
952            let vec_index = self.l0.slots[l0_slot_index].len();
953            let new_location = TaskLocation::new(0, l0_slot_index, vec_index);
954
955            // Insert into L0 layer
956            // 插入到 L0 层
957            self.l0.slots[l0_slot_index].push(task);
958
959            // Update task location in DeferredMap (task already exists in index)
960            // 更新 DeferredMap 中的任务位置(任务已在索引中)
961            if let Some(location) = self.task_index.get_mut(task_id.key()) {
962                *location = new_location;
963            }
964        }
965    }
966
967    /// Check if the timing wheel is empty
968    ///
969    /// 检查时间轮是否为空
970    #[allow(dead_code)]
971    pub fn is_empty(&self) -> bool {
972        self.task_index.is_empty()
973    }
974
975    /// Postpone timer task (keep original TaskId)
976    ///
977    /// # Parameters
978    /// - `task_id`: Task ID to postpone
979    /// - `new_delay`: New delay time (recalculated from current tick, not continuing from original delay time)
980    /// - `new_callback`: New callback function (if None, keep original callback)
981    ///
982    /// # Returns
983    /// Returns true if the task exists and is successfully postponed, otherwise returns false
984    ///
985    /// # Implementation Details
986    /// - Remove task from original layer/slot, keep its completion_notifier (will not trigger cancellation notification)
987    /// - Update delay time and callback function (if provided)
988    /// - Recalculate target layer, slot, and rounds based on new_delay
989    /// - Cross-layer migration may occur (L0 <-> L1)
990    /// - Re-insert to new position using original TaskId
991    /// - Keep consistent with external held TaskId reference
992    ///
993    /// 延期定时器任务(保留原始 TaskId)
994    ///
995    /// # 参数
996    /// - `task_id`: 要延期的任务 ID
997    /// - `new_delay`: 新延迟时间(从当前 tick 重新计算,而非从原延迟时间继续)
998    /// - `new_callback`: 新回调函数(如果为 None,则保留原回调)
999    ///
1000    /// # 返回值
1001    /// 如果任务存在且成功延期则返回 true,否则返回 false
1002    ///
1003    /// # 实现细节
1004    /// - 从原层级/槽位移除任务,保留其 completion_notifier(不会触发取消通知)
1005    /// - 更新延迟时间和回调函数(如果提供)
1006    /// - 根据 new_delay 重新计算目标层级、槽位和轮数
1007    /// - 可能发生跨层迁移(L0 <-> L1)
1008    /// - 使用原始 TaskId 重新插入到新位置
1009    /// - 与外部持有的 TaskId 引用保持一致
1010    #[inline]
1011    pub fn postpone(
1012        &mut self,
1013        task_id: TaskId,
1014        new_delay: Duration,
1015        new_callback: Option<crate::task::CallbackWrapper>,
1016    ) -> bool {
1017        // Step 1: Get task location from DeferredMap (don't remove)
1018        // 步骤 1: 从 DeferredMap 获取任务位置(不删除)
1019        let old_location = match self.task_index.get(task_id.key()) {
1020            Some(loc) => *loc,
1021            None => return false,
1022        };
1023
1024        // Use match to get slot reference
1025        // 使用 match 获取槽引用
1026        let slot = match old_location.level {
1027            0 => &mut self.l0.slots[old_location.slot_index],
1028            _ => &mut self.l1.slots[old_location.slot_index],
1029        };
1030
1031        // Verify task is still at expected position
1032        // 验证任务仍在预期位置
1033        if old_location.vec_index >= slot.len() || slot[old_location.vec_index].get_id() != task_id
1034        {
1035            // Index inconsistent, return failure
1036            // 索引不一致,返回失败
1037            return false;
1038        }
1039
1040        // Use swap_remove to remove task from slot
1041        // 使用 swap_remove 从槽中移除任务
1042        let mut task = slot.swap_remove(old_location.vec_index);
1043
1044        // Update swapped element's index (if a swap occurred)
1045        // 更新被交换元素的索引(如果发生了交换)
1046        if old_location.vec_index < slot.len() {
1047            let swapped_task_id = slot[old_location.vec_index].get_id();
1048            // Use DeferredMap's get_mut with TaskId key
1049            // 使用 DeferredMap 的 get_mut,传入 TaskId key
1050            if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
1051                swapped_location.vec_index = old_location.vec_index;
1052            }
1053        }
1054
1055        // Step 2: Update task's delay and callback
1056        // 步骤 2: 更新任务的延迟和回调
1057        task.update_delay(new_delay);
1058        if let Some(callback) = new_callback {
1059            task.update_callback(callback);
1060        }
1061
1062        // Step 3: Recalculate layer, slot, and rounds based on new delay
1063        // 步骤 3: 根据新延迟重新计算层级、槽位和轮数
1064        let (new_level, ticks, new_rounds) = self.determine_layer(new_delay);
1065
1066        // Use match to reduce branches, and use cached slot mask
1067        // 使用 match 减少分支,并使用缓存的槽掩码
1068        let (current_tick, slot_mask, slots) = match new_level {
1069            0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
1070            _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
1071        };
1072
1073        let total_ticks = current_tick + ticks;
1074        let new_slot_index = (total_ticks as usize) & slot_mask;
1075
1076        // Update task's timing wheel parameters
1077        // 更新任务的时间轮参数
1078        task.deadline_tick = total_ticks;
1079        task.rounds = new_rounds;
1080
1081        // Step 4: Re-insert task to new layer/slot
1082        // 步骤 4: 将任务重新插入到新层级/槽位
1083        let new_vec_index = slots[new_slot_index].len();
1084        let new_location = TaskLocation::new(new_level, new_slot_index, new_vec_index);
1085
1086        slots[new_slot_index].push(task);
1087
1088        // Update task location in DeferredMap (task already exists in index)
1089        // 更新 DeferredMap 中的任务位置(任务已在索引中)
1090        if let Some(location) = self.task_index.get_mut(task_id.key()) {
1091            *location = new_location;
1092        }
1093
1094        true
1095    }
1096
1097    /// Batch postpone timer tasks
1098    ///
1099    /// # Parameters
1100    /// - `updates`: List of tuples of (task ID, new delay)
1101    ///
1102    /// # Returns
1103    /// Number of successfully postponed tasks
1104    ///
1105    /// # Performance Advantages
1106    /// - Batch processing reduces function call overhead
1107    /// - All delays are recalculated from current_tick at call time
1108    ///
1109    /// # Notes
1110    /// - If a task ID does not exist, that task will be skipped without affecting other tasks' postponement
1111    ///
1112    /// 批量延期定时器任务
1113    ///
1114    /// # 参数
1115    /// - `updates`: (任务 ID, 新延迟) 元组列表
1116    ///
1117    /// # 返回值
1118    /// 成功延期的任务数量
1119    ///
1120    /// # 性能优势
1121    /// - 批处理减少函数调用开销
1122    /// - 所有延迟在调用时从 current_tick 重新计算
1123    ///
1124    /// # 注意
1125    /// - 如果任务 ID 不存在,该任务将被跳过,不影响其他任务的延期
1126    #[inline]
1127    pub fn postpone_batch(&mut self, updates: Vec<(TaskId, Duration)>) -> usize {
1128        let mut postponed_count = 0;
1129
1130        for (task_id, new_delay) in updates {
1131            if self.postpone(task_id, new_delay, None) {
1132                postponed_count += 1;
1133            }
1134        }
1135
1136        postponed_count
1137    }
1138
1139    /// Batch postpone timer tasks (replace callbacks)
1140    ///
1141    /// # Parameters
1142    /// - `updates`: List of tuples of (task ID, new delay, new callback)
1143    ///
1144    /// # Returns
1145    /// Number of successfully postponed tasks
1146    ///
1147    /// 批量延期定时器任务(替换回调)
1148    ///
1149    /// # 参数
1150    /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
1151    ///
1152    /// # 返回值
1153    /// 成功延期的任务数量
1154    pub fn postpone_batch_with_callbacks(
1155        &mut self,
1156        updates: Vec<(TaskId, Duration, Option<crate::task::CallbackWrapper>)>,
1157    ) -> usize {
1158        let mut postponed_count = 0;
1159
1160        for (task_id, new_delay, new_callback) in updates {
1161            if self.postpone(task_id, new_delay, new_callback) {
1162                postponed_count += 1;
1163            }
1164        }
1165
1166        postponed_count
1167    }
1168}
1169
1170#[cfg(test)]
1171mod tests {
1172    use super::*;
1173    use crate::task::{CallbackWrapper, TimerTask, TimerTaskWithCompletionNotifier};
1174
1175    #[test]
1176    fn test_wheel_creation() {
1177        let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1178        assert_eq!(wheel.slot_count(), 512);
1179        assert_eq!(wheel.current_tick(), 0);
1180        assert!(wheel.is_empty());
1181    }
1182
1183    #[test]
1184    fn test_hierarchical_wheel_creation() {
1185        let config = WheelConfig::default();
1186
1187        let wheel = Wheel::new(config, BatchConfig::default());
1188        assert_eq!(wheel.slot_count(), 512); // L0 slot count (L0 层槽数量)
1189        assert_eq!(wheel.current_tick(), 0);
1190        assert!(wheel.is_empty());
1191        // L1 layer always exists in hierarchical mode (L1 层在分层模式下始终存在)
1192        assert_eq!(wheel.l1.slot_count, 64);
1193        assert_eq!(wheel.l1_tick_ratio, 100); // 1000ms / 10ms = 100 ticks (1000毫秒 / 10毫秒 = 100个tick)
1194    }
1195
1196    #[test]
1197    fn test_hierarchical_config_validation() {
1198        // L1 tick must be an integer multiple of L0 tick (L1 tick 必须是 L0 tick 的整数倍)
1199        let result = WheelConfig::builder()
1200            .l0_tick_duration(Duration::from_millis(10))
1201            .l0_slot_count(512)
1202            .l1_tick_duration(Duration::from_millis(15)) // Not an integer multiple (不是整数倍)
1203            .l1_slot_count(64)
1204            .build();
1205
1206        assert!(result.is_err());
1207
1208        // Correct configuration (正确的配置)
1209        let result = WheelConfig::builder()
1210            .l0_tick_duration(Duration::from_millis(10))
1211            .l0_slot_count(512)
1212            .l1_tick_duration(Duration::from_secs(1)) // 1000ms / 10ms = 100 ticks (1000毫秒 / 10毫秒 = 100个tick)
1213            .l1_slot_count(64)
1214            .build();
1215
1216        assert!(result.is_ok());
1217    }
1218
1219    #[test]
1220    fn test_layer_determination() {
1221        let config = WheelConfig::default();
1222
1223        let wheel = Wheel::new(config, BatchConfig::default());
1224
1225        // Short delay should enter L0 layer (短延迟应该进入 L0 层)
1226        // L0: 512 slots * 10ms = 5120ms
1227        let (level, _, rounds) = wheel.determine_layer(Duration::from_millis(100));
1228        assert_eq!(level, 0);
1229        assert_eq!(rounds, 0);
1230
1231        // Long delay should enter L1 layer
1232        // 超过 L0 范围(>5120ms) (超过 L0 范围 (>5120毫秒))
1233        let (level, _, rounds) = wheel.determine_layer(Duration::from_secs(10));
1234        assert_eq!(level, 1);
1235        assert_eq!(rounds, 0);
1236
1237        // Long delay should enter L1 layer and have rounds
1238        // L1: 64 slots * 1000ms = 64000ms (L1: 64个槽 * 1000毫秒 = 64000毫秒)
1239        let (level, _, rounds) = wheel.determine_layer(Duration::from_secs(120));
1240        assert_eq!(level, 1);
1241        assert!(rounds > 0);
1242    }
1243
1244    #[test]
1245    fn test_hierarchical_insert_and_advance() {
1246        let config = WheelConfig::default();
1247
1248        let mut wheel = Wheel::new(config, BatchConfig::default());
1249
1250        // Insert short delay task into L0 (插入短延迟任务到 L0)
1251        let callback = CallbackWrapper::new(|| async {});
1252        let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
1253        let (task_with_notifier, _completion_rx) =
1254            TimerTaskWithCompletionNotifier::from_timer_task(task);
1255        let handle = wheel.allocate_handle();
1256        let task_id = handle.task_id();
1257        wheel.insert(handle, task_with_notifier);
1258
1259        // Verify task is in L0 layer (验证任务在 L0 层)
1260        let location = wheel.task_index.get(task_id.key()).unwrap();
1261        assert_eq!(location.level, 0);
1262
1263        // Advance 10 ticks (100ms) (前进 10 个 tick (100毫秒))
1264        for _ in 0..10 {
1265            let expired = wheel.advance();
1266            if !expired.is_empty() {
1267                assert_eq!(expired.len(), 1);
1268                assert_eq!(expired[0].id, task_id);
1269                return;
1270            }
1271        }
1272        panic!("Task should have expired");
1273    }
1274
1275    #[test]
1276    fn test_cross_layer_cancel() {
1277        let config = WheelConfig::default();
1278
1279        let mut wheel = Wheel::new(config, BatchConfig::default());
1280
1281        // Insert L0 task (插入 L0 任务)
1282        let callback1 = CallbackWrapper::new(|| async {});
1283        let task1 = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback1));
1284        let (task_with_notifier1, _rx1) = TimerTaskWithCompletionNotifier::from_timer_task(task1);
1285        let handle1 = wheel.allocate_handle();
1286        let task_id1 = handle1.task_id();
1287        wheel.insert(handle1, task_with_notifier1);
1288
1289        // Insert L1 task (插入 L1 任务)
1290        let callback2 = CallbackWrapper::new(|| async {});
1291        let task2 = TimerTask::new_oneshot(Duration::from_secs(10), Some(callback2));
1292        let (task_with_notifier2, _rx2) = TimerTaskWithCompletionNotifier::from_timer_task(task2);
1293        let handle2 = wheel.allocate_handle();
1294        let task_id2 = handle2.task_id();
1295        wheel.insert(handle2, task_with_notifier2);
1296
1297        // Verify levels (验证层级)
1298        assert_eq!(wheel.task_index.get(task_id1.key()).unwrap().level, 0);
1299        assert_eq!(wheel.task_index.get(task_id2.key()).unwrap().level, 1);
1300
1301        // Cancel L0 task (取消 L0 任务)
1302        assert!(wheel.cancel(task_id1));
1303        assert!(wheel.task_index.get(task_id1.key()).is_none());
1304
1305        // Cancel L1 task (取消 L1 任务)
1306        assert!(wheel.cancel(task_id2));
1307        assert!(wheel.task_index.get(task_id2.key()).is_none());
1308
1309        assert!(wheel.is_empty()); // 时间轮应该为空
1310    }
1311
1312    #[test]
1313    fn test_delay_to_ticks() {
1314        let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1315        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(100)), 10);
1316        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(50)), 5);
1317        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(1)), 1); // Minimum 1 tick (最小 1 个 tick)
1318    }
1319
1320    #[test]
1321    fn test_wheel_invalid_slot_count() {
1322        let result = WheelConfig::builder().l0_slot_count(100).build();
1323        assert!(result.is_err());
1324        if let Err(crate::error::TimerError::InvalidSlotCount { slot_count, reason }) = result {
1325            assert_eq!(slot_count, 100);
1326            assert_eq!(reason, "L0 layer slot count must be power of 2"); // L0 层槽数量必须是 2 的幂
1327        } else {
1328            panic!("Expected InvalidSlotCount error"); // 期望 InvalidSlotCount 错误
1329        }
1330    }
1331
1332    #[test]
1333    fn test_minimum_delay() {
1334        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1335
1336        // Test minimum delay (delays less than 1 tick should be rounded up to 1 tick) (测试最小延迟 (延迟小于 1 个 tick 应该向上舍入到 1 个 tick))
1337        let callback = CallbackWrapper::new(|| async {});
1338        let task = TimerTask::new_oneshot(Duration::from_millis(1), Some(callback));
1339        let (task_with_notifier, _completion_rx) =
1340            TimerTaskWithCompletionNotifier::from_timer_task(task);
1341        let handle = wheel.allocate_handle();
1342        let task_id: TaskId = handle.task_id();
1343        wheel.insert(handle, task_with_notifier);
1344
1345        // Advance 1 tick, task should trigger (前进 1 个 tick,任务应该触发)
1346        let expired = wheel.advance();
1347        assert_eq!(
1348            expired.len(),
1349            1,
1350            "Minimum delay task should be triggered after 1 tick"
1351        ); // 最小延迟任务应该在 1 个 tick 后触发
1352        assert_eq!(expired[0].id, task_id);
1353    }
1354
1355    #[test]
1356    fn test_advance_empty_slots() {
1357        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1358
1359        // Do not insert any tasks, advance multiple ticks (不插入任何任务,前进多个tick)
1360        for _ in 0..100 {
1361            let expired = wheel.advance();
1362            assert!(
1363                expired.is_empty(),
1364                "Empty slots should not return any tasks"
1365            );
1366        }
1367
1368        assert_eq!(
1369            wheel.current_tick(),
1370            100,
1371            "current_tick should correctly increment"
1372        ); // current_tick 应该正确递增
1373    }
1374
1375    #[test]
1376    fn test_slot_boundary() {
1377        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1378
1379        // Test slot boundary and wraparound (测试槽边界和环绕)
1380        // 第一个任务:延迟 10ms(1 tick),应该在 slot 1 触发
1381        let callback1 = CallbackWrapper::new(|| async {});
1382        let task1 = TimerTask::new_oneshot(Duration::from_millis(10), Some(callback1));
1383        let (task_with_notifier1, _rx1) = TimerTaskWithCompletionNotifier::from_timer_task(task1);
1384        let handle1 = wheel.allocate_handle();
1385        let task_id_1 = handle1.task_id();
1386        wheel.insert(handle1, task_with_notifier1);
1387
1388        // Second task: delay 5110ms (511 ticks), should trigger on slot 511 (第二个任务:延迟 5110毫秒 (511个tick),应该在槽 511 触发)
1389        let callback2 = CallbackWrapper::new(|| async {});
1390        let task2 = TimerTask::new_oneshot(Duration::from_millis(5110), Some(callback2));
1391        let (task_with_notifier2, _rx2) = TimerTaskWithCompletionNotifier::from_timer_task(task2);
1392        let handle2 = wheel.allocate_handle();
1393        let task_id_2 = handle2.task_id();
1394        wheel.insert(handle2, task_with_notifier2);
1395
1396        // Advance 1 tick, first task should trigger (前进 1 个 tick,第一个任务应该触发)
1397        let expired = wheel.advance();
1398        assert_eq!(expired.len(), 1, "First task should trigger on tick 1"); // 第一个任务应该在第 1 个 tick 触发
1399        assert_eq!(expired[0].id, task_id_1);
1400
1401        // Continue advancing to 511 ticks (from tick 1 to tick 511), second task should trigger (继续前进 511个tick (从tick 1到tick 511),第二个任务应该触发)
1402        let mut triggered = false;
1403        for i in 0..510 {
1404            let expired = wheel.advance();
1405            if !expired.is_empty() {
1406                assert_eq!(
1407                    expired.len(),
1408                    1,
1409                    "The {}th advance should trigger the second task",
1410                    i + 2
1411                ); // 第 {i + 2} 次前进应该触发第二个任务
1412                assert_eq!(expired[0].id, task_id_2);
1413                triggered = true;
1414                break;
1415            }
1416        }
1417        assert!(triggered, "Second task should trigger on tick 511"); // 第二个任务应该在第 511 个 tick 触发
1418
1419        assert!(wheel.is_empty(), "All tasks should have been triggered"); // 所有任务都应该被触发
1420    }
1421
1422    #[test]
1423    fn test_task_id_uniqueness() {
1424        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1425
1426        // Insert multiple tasks, verify TaskId uniqueness (插入多个任务,验证 TaskId 唯一性)
1427        let mut task_ids = std::collections::HashSet::new();
1428        for _ in 0..100 {
1429            let callback = CallbackWrapper::new(|| async {});
1430            let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
1431            let (task_with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1432            let handle = wheel.allocate_handle();
1433            let task_id = handle.task_id();
1434            wheel.insert(handle, task_with_notifier);
1435
1436            assert!(task_ids.insert(task_id), "TaskId should be unique"); // TaskId 应该唯一
1437        }
1438
1439        assert_eq!(task_ids.len(), 100);
1440    }
1441}