kestrel_timer/
wheel.rs

1use crate::CallbackWrapper;
2use crate::config::{BatchConfig, WheelConfig};
3use crate::task::{TaskCompletion, TaskHandle, TaskId, TaskLocation, TaskTypeWithCompletionNotifier, TimerTaskForWheel, TimerTaskWithCompletionNotifier};
4use deferred_map::DeferredMap;
5use std::time::Duration;
6
/// One expired task as reported by `Wheel::advance`.
pub struct WheelAdvanceResult {
    /// Identifier of the task that expired.
    pub id: TaskId,
    /// Callback stored on the expired task; `None` when the task carries no callback.
    pub callback: Option<CallbackWrapper>
}
11
/// A single layer (one ring of slots) of the timing wheel.
struct WheelLayer {
    /// Slot array; each slot stores the group of timer tasks scheduled for it.
    slots: Vec<Vec<TimerTaskForWheel>>,

    /// Current time pointer of this layer, expressed as a tick index.
    current_tick: u64,

    /// Number of slots in this layer.
    slot_count: usize,

    /// Duration of one tick of this layer.
    tick_duration: Duration,

    /// Cached `tick_duration` in whole milliseconds, kept to avoid repeated conversion.
    tick_duration_ms: u64,

    /// Cached `slot_count - 1`, used as a bit mask in place of a modulo when mapping a tick
    /// to a slot index.
    slot_mask: usize,
}
46
47impl WheelLayer {
48    /// Create a new wheel layer
49    /// 
50    /// 创建一个新的时间轮层
51    fn new(slot_count: usize, tick_duration: Duration) -> Self {
52        let mut slots = Vec::with_capacity(slot_count);
53        // Pre-allocate capacity for each slot to reduce subsequent reallocation during push
54        // Most slots typically contain 0-4 tasks, pre-allocate capacity of 4
55        // 
56        // 为每个槽预分配容量以减少后续 push 时的重新分配
57        // 大多数槽通常包含 0-4 个任务,预分配容量为 4
58        for _ in 0..slot_count {
59            slots.push(Vec::with_capacity(4));
60        }
61        
62        let tick_duration_ms = tick_duration.as_millis() as u64;
63        let slot_mask = slot_count - 1;
64        
65        Self {
66            slots,
67            current_tick: 0,
68            slot_count,
69            tick_duration,
70            tick_duration_ms,
71            slot_mask,
72        }
73    }
74    
75    /// Calculate the number of ticks corresponding to the delay
76    /// 
77    /// 计算延迟对应的 tick 数量
78    fn delay_to_ticks(&self, delay: Duration) -> u64 {
79        let ticks = delay.as_millis() as u64 / self.tick_duration.as_millis() as u64;
80        ticks.max(1) // at least 1 tick (至少 1 个 tick)
81    }
82}
83
/// Timing wheel data structure (hierarchical mode, two layers).
///
/// Uses a `DeferredMap` as task index for O(1) task lookup and generational safety.
pub struct Wheel {
    /// L0 layer (bottom layer).
    l0: WheelLayer,

    /// L1 layer (top layer).
    l1: WheelLayer,

    /// Ratio of the L1 tick length to the L0 tick length, i.e. how many L0 ticks make up
    /// one L1 tick.
    pub(crate) l1_tick_ratio: u64,

    /// Task index used for fast lookup and cancellation.
    ///
    /// Keys are `TaskId`s (u64 keys issued by the `DeferredMap`), values are the
    /// `TaskLocation` (layer, slot, position in slot) of each task.
    pub(crate) task_index: DeferredMap<TaskLocation>,

    /// Batch processing configuration.
    batch_config: BatchConfig,

    /// Cached capacity of the L0 layer in milliseconds (`slot_count * tick_duration_ms`),
    /// kept to avoid recomputing it on every insertion.
    l0_capacity_ms: u64,

    /// Cached capacity of the L1 layer in ticks (its slot count), kept to avoid
    /// recomputing it on every insertion.
    l1_capacity_ticks: u64,
}
131
132impl Wheel {
133    /// Create new timing wheel
134    ///
135    /// # Parameters
136    /// - `config`: Timing wheel configuration (already validated)
137    /// - `batch_config`: Batch processing configuration
138    ///
139    /// # Notes
140    /// Configuration parameters have been validated in WheelConfig::builder().build(), so this method will not fail.
141    /// Now uses DeferredMap for task indexing with generational safety.
142    /// 
143    /// 创建新的时间轮
144    ///
145    /// # 参数
146    /// - `config`: 时间轮配置(已验证)
147    /// - `batch_config`: 批处理配置
148    ///
149    /// # 注意
150    /// 配置参数已在 WheelConfig::builder().build() 中验证,因此此方法不会失败。
151    /// 现在使用 DeferredMap 进行任务索引,具有代数安全特性。
152    pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
153        let l0 = WheelLayer::new(config.l0_slot_count, config.l0_tick_duration);
154        let l1 = WheelLayer::new(config.l1_slot_count, config.l1_tick_duration);
155        
156        // Calculate L1 tick ratio relative to L0 tick
157        // 计算 L1 tick 相对于 L0 tick 的比率
158        let l1_tick_ratio = l1.tick_duration_ms / l0.tick_duration_ms;
159        
160        // Pre-calculate capacity to avoid repeated calculation in insert
161        // 预计算容量,避免在 insert 中重复计算
162        let l0_capacity_ms = (l0.slot_count as u64) * l0.tick_duration_ms;
163        let l1_capacity_ticks = l1.slot_count as u64;
164        
165        // Estimate initial capacity for DeferredMap based on L0 slot count
166        // 根据 L0 槽数量估算 DeferredMap 的初始容量
167        let estimated_capacity = (l0.slot_count / 4).max(64);
168        
169        Self {
170            l0,
171            l1,
172            l1_tick_ratio,
173            task_index: DeferredMap::with_capacity(estimated_capacity),
174            batch_config,
175            l0_capacity_ms,
176            l1_capacity_ticks,
177        }
178    }
179
    /// Returns the current tick of the L0 (bottom) layer.
    ///
    /// This is the counter that `advance` increments by one on every call.
    // NOTE(review): `dead_code` is allowed although the method is `pub`; presumably it is
    // only used by tests or optional code paths — confirm.
    #[allow(dead_code)]
    pub fn current_tick(&self) -> u64 {
        self.l0.current_tick
    }
187
    /// Returns the duration of one tick of the L0 (bottom) layer.
    // NOTE(review): `dead_code` is allowed although the method is `pub`; presumably it is
    // only used by tests or optional code paths — confirm.
    #[allow(dead_code)]
    pub fn tick_duration(&self) -> Duration {
        self.l0.tick_duration
    }
195
    /// Returns the number of slots of the L0 (bottom) layer.
    // NOTE(review): `dead_code` is allowed although the method is `pub`; presumably it is
    // only used by tests or optional code paths — confirm.
    #[allow(dead_code)]
    pub fn slot_count(&self) -> usize {
        self.l0.slot_count
    }
203
    /// Converts `delay` into a number of L0 ticks.
    ///
    /// Delegates to the L0 layer, which divides the delay (in whole milliseconds) by the L0
    /// tick length and returns at least 1.
    // NOTE(review): `dead_code` is allowed although the method is `pub`; presumably it is
    // only used by tests or optional code paths — confirm.
    #[allow(dead_code)]
    pub fn delay_to_ticks(&self, delay: Duration) -> u64 {
        self.l0.delay_to_ticks(delay)
    }
211    
212    /// Determine which layer the delay should be inserted into
213    ///
214    /// # Returns
215    /// Returns: (layer, ticks, rounds)
216    /// - Layer: 0 = L0, 1 = L1
217    /// - Ticks: number of ticks calculated from current tick
218    /// - Rounds: number of rounds (only used for very long delays in L1 layer)
219    /// 
220    /// 确定延迟应该插入到哪一层
221    ///
222    /// # 返回值
223    /// 返回:(层级, ticks, 轮数)
224    /// - 层级:0 = L0, 1 = L1
225    /// - Ticks:从当前 tick 计算的 tick 数量
226    /// - 轮数:轮数(仅用于 L1 层的超长延迟)
227    #[inline(always)]
228    fn determine_layer(&self, delay: Duration) -> (u8, u64, u32) {
229        let delay_ms = delay.as_millis() as u64;
230        
231        // Fast path: most tasks are within L0 range (using cached capacity)
232        // 快速路径:大多数任务在 L0 范围内(使用缓存的容量)
233        if delay_ms < self.l0_capacity_ms {
234            let l0_ticks = (delay_ms / self.l0.tick_duration_ms).max(1);
235            return (0, l0_ticks, 0);
236        }
237        
238        // Slow path: L1 layer tasks (using cached values)
239        // 慢速路径:L1 层任务(使用缓存的值)
240        let l1_ticks = (delay_ms / self.l1.tick_duration_ms).max(1);
241        
242        if l1_ticks < self.l1_capacity_ticks {
243            (1, l1_ticks, 0)
244        } else {
245            let rounds = (l1_ticks / self.l1_capacity_ticks) as u32;
246            (1, l1_ticks, rounds)
247        }
248    }
249
    /// Allocates a handle from the `DeferredMap` task index.
    ///
    /// # Returns
    /// A handle wrapping the one issued by the task index; it is meant to be passed to
    /// `insert` later, which stores the task's location under it.
    pub fn allocate_handle(&mut self) -> TaskHandle {
        TaskHandle::new(self.task_index.allocate_handle())
    }
262
263    /// Batch allocate handles from DeferredMap
264    /// 
265    /// 从 DeferredMap 批量分配 handles
266    /// 
267    /// # Parameters
268    /// - `count`: Number of handles to allocate
269    /// 
270    /// # Returns
271    /// Vector of unique handles for later batch insertion
272    /// 
273    /// # 参数
274    /// - `count`: 要分配的 handle 数量
275    /// 
276    /// # 返回值
277    /// 用于后续批量插入的唯一 handles 向量
278    pub fn allocate_handles(&mut self, count: usize) -> Vec<TaskHandle> {
279        let mut handles = Vec::with_capacity(count);
280        for _ in 0..count {
281            handles.push(TaskHandle::new(self.task_index.allocate_handle()));
282        }
283        handles
284    }
285
286    /// Insert timer task
287    ///
288    /// # Parameters
289    /// - `handle`: Handle for the task
290    /// - `task`: Timer task with completion notifier
291    ///
292    /// # Returns
293    /// Unique identifier of the task (TaskId) - now generated from DeferredMap
294    ///
295    /// # Implementation Details
296    /// - Allocates Handle from DeferredMap to generate TaskId with generational safety
297    /// - Automatically calculate the layer and slot where the task should be inserted
298    /// - Hierarchical mode: short delay tasks are inserted into L0, long delay tasks are inserted into L1
299    /// - Use bit operations to optimize slot index calculation
300    /// - Use DeferredMap for O(1) lookup and cancellation with generation checking
301    /// 
302    /// 插入定时器任务
303    ///
304    /// # 参数
305    /// - `handle`: 任务的 handle
306    /// - `task`: 带完成通知器的定时器任务
307    ///
308    /// # 返回值
309    /// 任务的唯一标识符(TaskId)- 现在从 DeferredMap 生成
310    ///
311    /// # 实现细节
312    /// - 自动计算任务应该插入的层级和槽位
313    /// - 分层模式:短延迟任务插入 L0,长延迟任务插入 L1
314    /// - 使用位运算优化槽索引计算
315    /// - 使用 DeferredMap 实现 O(1) 查找和取消,带代数检查
316    #[inline]
317    pub fn insert(&mut self, handle: TaskHandle, task: TimerTaskWithCompletionNotifier) {
318        let task_id = handle.task_id();
319        
320        let (level, ticks, rounds) = self.determine_layer(task.delay);
321        
322        // Use match to reduce branches, and use cached slot mask
323        // 使用 match 减少分支,并使用缓存的槽掩码
324        let (current_tick, slot_mask, slots) = match level {
325            0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
326            _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
327        };
328        
329        let total_ticks = current_tick + ticks;
330        let slot_index = (total_ticks as usize) & slot_mask;
331
332        // Create task with the assigned TaskId
333        // 使用分配的 TaskId 创建任务
334        let task = TimerTaskForWheel::new_with_id(task_id, task, total_ticks, rounds);
335        
336        // Get the index position of the task in Vec (the length before insertion is the index of the new task)
337        // 获取任务在 Vec 中的索引位置(插入前的长度就是新任务的索引)
338        let vec_index = slots[slot_index].len();
339        let location = TaskLocation::new(level, slot_index, vec_index);
340
341        // Insert task into slot
342        // 将任务插入槽中
343        slots[slot_index].push(task);
344        
345        // Insert task location into DeferredMap using handle
346        // In non-debug mode, it is guaranteed that this method will not panic.
347        // 使用 handle 将任务位置插入 DeferredMap
348        // 由于 deferred_map::DeferredMap 是我实现的,底层保证非debug模式下不会panic
349        self.task_index.insert(handle.into_handle(), location)
350            .expect("Handle should be valid for insertion");
351    }
352
353    /// Batch insert timer tasks
354    ///
355    /// # Parameters
356    /// - `handles`: List of pre-allocated handles for tasks
357    /// - `tasks`: List of timer tasks with completion notifiers
358    ///
359    /// # Returns
360    /// - `Ok(())` if all tasks are successfully inserted
361    /// - `Err(TimerError::BatchLengthMismatch)` if handles and tasks lengths don't match
362    ///
363    /// # Performance Advantages
364    /// - Reduce repeated boundary checks and capacity adjustments
365    /// - For tasks with the same delay, calculation results can be reused
366    /// - Uses DeferredMap for efficient task indexing with generational safety
367    /// 
368    /// 批量插入定时器任务
369    ///
370    /// # 参数
371    /// - `handles`: 任务的预分配 handles 列表
372    /// - `tasks`: 带完成通知器的定时器任务列表
373    ///
374    /// # 返回值
375    /// - `Ok(())` 如果所有任务成功插入
376    /// - `Err(TimerError::BatchLengthMismatch)` 如果 handles 和 tasks 长度不匹配
377    ///
378    /// # 性能优势
379    /// - 减少重复的边界检查和容量调整
380    /// - 对于相同延迟的任务,可以重用计算结果
381    /// - 使用 DeferredMap 实现高效的任务索引,具有代数安全特性
382    #[inline]
383    pub fn insert_batch(
384        &mut self, 
385        handles: Vec<TaskHandle>, 
386        tasks: Vec<TimerTaskWithCompletionNotifier>
387    ) -> Result<(), crate::error::TimerError> {
388        // Validate that handles and tasks have the same length
389        // 验证 handles 和 tasks 长度相同
390        if handles.len() != tasks.len() {
391            return Err(crate::error::TimerError::BatchLengthMismatch {
392                handles_len: handles.len(),
393                tasks_len: tasks.len(),
394            });
395        }
396        
397        for (handle, task) in handles.into_iter().zip(tasks.into_iter()) {
398            let task_id = handle.task_id();
399            
400            let (level, ticks, rounds) = self.determine_layer(task.delay);
401            
402            // Use match to reduce branches, and use cached slot mask
403            // 使用 match 减少分支,并使用缓存的槽掩码
404            let (current_tick, slot_mask, slots) = match level {
405                0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
406                _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
407            };
408            
409            let total_ticks = current_tick + ticks;
410            let slot_index = (total_ticks as usize) & slot_mask;
411
412            // Create task with the assigned TaskId
413            // 使用分配的 TaskId 创建任务
414            let task = TimerTaskForWheel::new_with_id(task_id, task, total_ticks, rounds);
415            
416            // Get the index position of the task in Vec
417            // 获取任务在 Vec 中的索引位置
418            let vec_index = slots[slot_index].len();
419            let location = TaskLocation::new(level, slot_index, vec_index);
420
421            // Insert task into slot
422            // 将任务插入槽中
423            slots[slot_index].push(task);
424            
425            // Insert task location into DeferredMap using handle
426            // In non-debug mode, it is guaranteed that this method will not panic.
427            // 使用 handle 将任务位置插入 DeferredMap
428            // 由于 deferred_map::DeferredMap 是我实现的,底层保证非debug模式下不会panic
429            self.task_index.insert(handle.into_handle(), location)
430                .expect("Handle should be valid for insertion");
431        }
432        
433        Ok(())
434    }
435
436    /// Cancel timer task
437    ///
438    /// # Parameters
439    /// - `task_id`: Task ID
440    ///
441    /// # Returns
442    /// Returns true if the task exists and is successfully cancelled, otherwise returns false
443    /// 
444    /// Now uses DeferredMap for safe task removal with generational checking
445    /// 
446    /// 取消定时器任务
447    ///
448    /// # 参数
449    /// - `task_id`: 任务 ID
450    ///
451    /// # 返回值
452    /// 如果任务存在且成功取消则返回 true,否则返回 false
453    /// 
454    /// 现在使用 DeferredMap 实现安全的任务移除,带代数检查
455    #[inline]
456    pub fn cancel(&mut self, task_id: TaskId) -> bool {
457        // Remove task location from DeferredMap using TaskId as key
458        // 使用 TaskId 作为 key 从 DeferredMap 中移除任务位置
459        let location = match self.task_index.remove(task_id.key()) {
460            Some(loc) => loc,
461            None => return false, // Task not found or already removed (generation mismatch)
462        };
463        
464        // Use match to get slot reference, reduce branches
465        // 使用 match 获取槽引用,减少分支
466        let slot = match location.level {
467            0 => &mut self.l0.slots[location.slot_index],
468            _ => &mut self.l1.slots[location.slot_index],
469        };
470        
471        // Boundary check and ID verification
472        // 边界检查和 ID 验证
473        if location.vec_index >= slot.len() || slot[location.vec_index].get_id() != task_id {
474            // Index inconsistent - this shouldn't happen with DeferredMap, but handle it anyway
475            // 索引不一致 - DeferredMap 不应该出现这种情况,但还是处理一下
476            return false;
477        }
478        
479        // Use swap_remove to remove task, record swapped task ID
480        // 使用 swap_remove 移除任务,记录被交换的任务 ID
481        let removed_task = slot.swap_remove(location.vec_index);
482        
483        // Ensure the correct task was removed
484        // 确保移除了正确的任务
485        debug_assert_eq!(removed_task.get_id(), task_id);
486
487        match removed_task.into_task_type() {
488            TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
489                // Use Notify + AtomicU8 for zero-allocation notification
490                // 使用 Notify + AtomicU8 实现零分配通知
491                completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
492            }
493            TaskTypeWithCompletionNotifier::Periodic { completion_notifier, .. } => {
494                // Use flume for high-performance periodic notification
495                // 使用 flume 实现高性能周期通知
496                let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
497            }
498        }
499        
500        // If a swap occurred (vec_index is not the last element)
501        // 如果发生了交换(vec_index 不是最后一个元素)
502        if location.vec_index < slot.len() {
503            let swapped_task_id = slot[location.vec_index].get_id();
504            // Update swapped element's index in one go, using DeferredMap's get_mut
505            // 一次性更新被交换元素的索引,使用 DeferredMap 的 get_mut
506            if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
507                swapped_location.vec_index = location.vec_index;
508            }
509        }
510        
511        true
512    }
513
514    /// Batch cancel timer tasks
515    ///
516    /// # Parameters
517    /// - `task_ids`: List of task IDs to cancel
518    ///
519    /// # Returns
520    /// Number of successfully cancelled tasks
521    ///
522    /// # Performance Advantages
523    /// - Reduce repeated HashMap lookup overhead
524    /// - Multiple cancellation operations on the same slot can be batch processed
525    /// - Use unstable sort to improve performance
526    /// - Small batch optimization: skip sorting based on configuration threshold, process directly
527    /// 
528    /// 批量取消定时器任务
529    ///
530    /// # 参数
531    /// - `task_ids`: 要取消的任务 ID 列表
532    ///
533    /// # 返回值
534    /// 成功取消的任务数量
535    ///
536    /// # 性能优势
537    /// - 减少重复的 HashMap 查找开销
538    /// - 同一槽位的多个取消操作可以批量处理
539    /// - 使用不稳定排序提高性能
540    /// - 小批量优化:根据配置阈值跳过排序,直接处理
541    #[inline]
542    pub fn cancel_batch(&mut self, task_ids: &[TaskId]) -> usize {
543        let mut cancelled_count = 0;
544        
545        // Small batch optimization: cancel one by one to avoid grouping and sorting overhead
546        // 小批量优化:逐个取消以避免分组和排序开销
547        if task_ids.len() <= self.batch_config.small_batch_threshold {
548            for &task_id in task_ids {
549                if self.cancel(task_id) {
550                    cancelled_count += 1;
551                }
552            }
553            return cancelled_count;
554        }
555        
556        // Group by layer and slot to optimize batch cancellation
557        // Use SmallVec to avoid heap allocation in most cases
558        // 按层级和槽位分组以优化批量取消
559        // 使用 SmallVec 避免在大多数情况下进行堆分配
560        let l0_slot_count = self.l0.slot_count;
561        let l1_slot_count = self.l1.slot_count;
562        
563        let mut l0_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l0_slot_count];
564        let mut l1_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l1_slot_count];
565        
566        // Collect information of tasks to be cancelled
567        // 收集要取消的任务信息
568        for &task_id in task_ids {
569            // Use DeferredMap's get with TaskId key
570            // 使用 DeferredMap 的 get,传入 TaskId key
571            if let Some(location) = self.task_index.get(task_id.key()) {
572                if location.level == 0 {
573                    l0_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
574                } else {
575                    l1_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
576                }
577            }
578        }
579        
580        // Process L0 layer cancellation
581        // 处理 L0 层取消
582        for (slot_index, tasks) in l0_tasks_by_slot.iter_mut().enumerate() {
583            if tasks.is_empty() {
584                continue;
585            }
586            
587            // Sort by vec_index in descending order, delete from back to front to avoid index invalidation
588            // 按 vec_index 降序排序,从后向前删除以避免索引失效
589            tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
590            
591            let slot = &mut self.l0.slots[slot_index];
592            
593            for &(task_id, vec_index) in tasks.iter() {
594                if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
595                    let removed_task = slot.swap_remove(vec_index);
596                    match removed_task.into_task_type() {
597                        TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
598                            // Use Notify + AtomicU8 for zero-allocation notification
599                            // 使用 Notify + AtomicU8 实现零分配通知
600                            completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
601                        }
602                        TaskTypeWithCompletionNotifier::Periodic { completion_notifier, .. } => {
603                            // Use flume for high-performance periodic notification
604                            // 使用 flume 实现高性能周期通知
605                            let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
606                        }
607                    }
608                    
609                    
610                    if vec_index < slot.len() {
611                        let swapped_task_id = slot[vec_index].get_id();
612                        // Use DeferredMap's get_mut with TaskId key
613                        // 使用 DeferredMap 的 get_mut,传入 TaskId key
614                        if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
615                            swapped_location.vec_index = vec_index;
616                        }
617                    }
618                    
619                    // Use DeferredMap's remove with TaskId key
620                    // 使用 DeferredMap 的 remove,传入 TaskId key
621                    self.task_index.remove(task_id.key());
622                    cancelled_count += 1;
623                }
624            }
625        }
626        
627        // Process L1 layer cancellation
628        // 处理 L1 层取消
629        for (slot_index, tasks) in l1_tasks_by_slot.iter_mut().enumerate() {
630            if tasks.is_empty() {
631                continue;
632            }
633            
634            tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
635            
636            let slot = &mut self.l1.slots[slot_index];
637            
638            for &(task_id, vec_index) in tasks.iter() {
639                if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
640            
641                    let removed_task = slot.swap_remove(vec_index);
642                    
643                    match removed_task.into_task_type() {
644                        TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
645                            // Use Notify + AtomicU8 for zero-allocation notification
646                            // 使用 Notify + AtomicU8 实现零分配通知
647                            completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
648                        }
649                        TaskTypeWithCompletionNotifier::Periodic { completion_notifier, .. } => {
650                            // Use flume for high-performance periodic notification
651                            // 使用 flume 实现高性能周期通知
652                            let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
653                        }
654                    }
655                    
656                    if vec_index < slot.len() {
657                        let swapped_task_id = slot[vec_index].get_id();
658                        // Use DeferredMap's get_mut with TaskId key
659                        // 使用 DeferredMap 的 get_mut,传入 TaskId key
660                        if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
661                            swapped_location.vec_index = vec_index;
662                        }
663                    }
664                    
665                    // Use DeferredMap's remove with TaskId key
666                    // 使用 DeferredMap 的 remove,传入 TaskId key
667                    self.task_index.remove(task_id.key());
668                    cancelled_count += 1;
669                }
670            }
671        }
672        
673        cancelled_count
674    }
675
676    /// Reinsert periodic task with the same TaskId
677    /// 
678    /// Used to automatically reschedule periodic tasks after they expire
679    /// 
680    /// TaskId remains unchanged, only location is updated in DeferredMap
681    /// 
682    /// 重新插入周期性任务(保持相同的 TaskId)
683    /// 
684    /// 用于在周期性任务过期后自动重新调度
685    /// 
686    /// TaskId 保持不变,只更新 DeferredMap 中的位置
687    fn reinsert_periodic_task(
688        &mut self,
689        task_id: TaskId,
690        task: TimerTaskWithCompletionNotifier,
691    ) {
692        // Determine which layer the interval should be inserted into
693        // This method is only called by periodic tasks, so the interval is guaranteed to be Some.
694        // 确定间隔应该插入到哪一层
695        // 该方法只能由周期性任务调用,所以间隔是 guaranteed 应当保证为 Some.
696        let (level, ticks, rounds) = self.determine_layer(task.get_interval().unwrap());
697        
698        // Use match to reduce branches, and use cached slot mask
699        // 使用 match 减少分支,并使用缓存的槽掩码
700        let (current_tick, slot_mask, slots) = match level {
701            0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
702            _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
703        };
704        
705        let total_ticks = current_tick + ticks;
706        let slot_index = (total_ticks as usize) & slot_mask;
707
708        // Get the index position of the task in Vec
709        // 获取任务在 Vec 中的索引位置
710        let vec_index = slots[slot_index].len();
711        let new_location = TaskLocation::new(level, slot_index, vec_index);
712        
713        // Insert task into slot
714        // 将任务插入槽中
715        slots[slot_index].push(TimerTaskForWheel::new_with_id(task_id, task, total_ticks, rounds));
716        
717        // Update task location in DeferredMap (doesn't remove, just updates)
718        // 更新 DeferredMap 中的任务位置(不删除,仅更新)
719        if let Some(location) = self.task_index.get_mut(task_id.key()) {
720            *location = new_location;
721        }
722    }
723
724    /// Advance the timing wheel by one tick, return all expired tasks
725    ///
726    /// # Returns
727    /// List of expired tasks
728    ///
729    /// # Implementation Details
730    /// - L0 layer advances 1 tick each time (no rounds check)
731    /// - L1 layer advances once every (L1_tick / L0_tick) times
732    /// - L1 expired tasks are batch demoted to L0
733    /// 
734    /// 推进时间轮一个 tick,返回所有过期的任务
735    ///
736    /// # 返回值
737    /// 过期任务列表
738    ///
739    /// # 实现细节
740    /// - L0 层每次推进 1 个 tick(无轮数检查)
741    /// - L1 层每 (L1_tick / L0_tick) 次推进一次
742    /// - L1 层过期任务批量降级到 L0
743    pub fn advance(&mut self) -> Vec<WheelAdvanceResult> {
744        // Advance L0 layer
745        // 推进 L0 层
746        self.l0.current_tick += 1;
747        
748        let mut expired_tasks = Vec::new();
749        
750        // Process L0 layer expired tasks
751        // Distinguish between one-shot and periodic tasks
752        // 处理 L0 层过期任务
753        // 区分一次性任务和周期性任务
754        let l0_slot_index = (self.l0.current_tick as usize) & self.l0.slot_mask;
755        
756        // Collect periodic tasks to reinsert
757        // 收集需要重新插入的周期性任务
758        let mut periodic_tasks_to_reinsert = Vec::new();
759        
760        {
761            let l0_slot = &mut self.l0.slots[l0_slot_index];
762            
763            // Process all tasks in the current slot by always removing from index 0
764            // This avoids index tracking issues with swap_remove
765            // 通过始终从索引 0 移除来处理当前槽中的所有任务
766            // 这避免了 swap_remove 的索引跟踪问题
767            while !l0_slot.is_empty() {
768                // Always remove from index 0
769                // 始终从索引 0 移除
770                let task_with_notifier = l0_slot.swap_remove(0);
771                let task_id = task_with_notifier.get_id();
772                
773                // Determine if this is a periodic task (check before consuming)
774                // 判断是否为周期性任务(在消耗前检查)
775                let is_periodic = matches!(task_with_notifier.task.task_type, 
776                    TaskTypeWithCompletionNotifier::Periodic { .. });
777                
778                // For one-shot tasks, remove from DeferredMap index
779                // For periodic tasks, keep in index (will update location in reinsert)
780                // 对于一次性任务,从 DeferredMap 索引中移除
781                // 对于周期性任务,保留在索引中(重新插入时会更新位置)
782                if !is_periodic {
783                    self.task_index.remove(task_id.key());
784                }
785                
786                // Update swapped element's index (the element that was moved to index 0)
787                // 更新被交换元素的索引(被移动到索引 0 的元素)
788                if !l0_slot.is_empty() {
789                    let swapped_task_id = l0_slot[0].get_id();
790                    // Use DeferredMap's get_mut with TaskId key
791                    // 使用 DeferredMap 的 get_mut,传入 TaskId key
792                    if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
793                        swapped_location.vec_index = 0;
794                    }
795                }
796
797                let TimerTaskForWheel { task_id, task, .. } = task_with_notifier;
798
799                match &task.task_type {
800                    TaskTypeWithCompletionNotifier::Periodic { completion_notifier, .. } => {
801                        let _ = completion_notifier.0.try_send(TaskCompletion::Called);
802                        
803                        periodic_tasks_to_reinsert.push((task_id, TimerTaskWithCompletionNotifier {
804                            task_type: task.task_type,
805                            delay: task.delay,
806                            callback: task.callback.clone(), 
807                        }));
808                    }
809                    TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
810                        completion_notifier.notify(crate::task::TaskCompletion::Called);
811                    }
812                }
813
814                expired_tasks.push(WheelAdvanceResult {
815                    id: task_id,
816                    callback: task.callback,
817                });
818            }
819        }
820        
821        // Reinsert periodic tasks for next interval
822        // 重新插入周期性任务到下一个周期
823        for (task_id, task) in periodic_tasks_to_reinsert {
824            self.reinsert_periodic_task(task_id, task);
825        }
826        
827        // Process L1 layer
828        // Check if L1 layer needs to be advanced
829        // 处理 L1 层
830        // 检查是否需要推进 L1 层
831        if self.l0.current_tick.is_multiple_of(self.l1_tick_ratio) {
832            self.l1.current_tick += 1;
833            let l1_slot_index = (self.l1.current_tick as usize) & self.l1.slot_mask;
834            let l1_slot = &mut self.l1.slots[l1_slot_index];
835            
836            // Collect L1 layer expired tasks
837            // 收集 L1 层过期任务
838            let mut tasks_to_demote = Vec::new();
839            let mut i = 0;
840            while i < l1_slot.len() {
841                let task = &mut l1_slot[i];
842                
843                if task.rounds > 0 {
844                    // Still has rounds, decrease rounds and keep
845                    // 还有轮数,减少轮数并保留
846                    task.rounds -= 1;
847                    // Use DeferredMap's get_mut with TaskId key
848                    // 使用 DeferredMap 的 get_mut,传入 TaskId key
849                    if let Some(location) = self.task_index.get_mut(task.get_id().key()) {
850                        location.vec_index = i;
851                    }
852                    i += 1;
853                } else {
854                    // rounds = 0, need to demote to L0
855                    // Don't remove from task_index - will update location in demote_tasks
856                    // rounds = 0,需要降级到 L0
857                    // 不从 task_index 中移除 - 在 demote_tasks 中会更新位置
858                    let task_to_demote = l1_slot.swap_remove(i);
859                    
860                    if i < l1_slot.len() {
861                        let swapped_task_id = l1_slot[i].get_id();
862                        // Use DeferredMap's get_mut with TaskId key
863                        // 使用 DeferredMap 的 get_mut,传入 TaskId key
864                        if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
865                            swapped_location.vec_index = i;
866                        }
867                    }
868                    
869                    tasks_to_demote.push(task_to_demote);
870                }
871            }
872            
873            // Demote tasks to L0
874            // 将任务降级到 L0
875            self.demote_tasks(tasks_to_demote);
876        }
877        
878        expired_tasks
879    }
880    
881    /// Demote tasks from L1 to L0
882    /// 
883    /// Recalculate and insert L1 expired tasks into L0 layer
884    /// 
885    /// Updates task location in DeferredMap without removing/reinserting
886    /// 
887    /// 将任务从 L1 降级到 L0
888    /// 
889    /// 重新计算并将 L1 过期任务插入到 L0 层
890    /// 
891    /// 更新 DeferredMap 中的任务位置而不删除/重新插入
892    fn demote_tasks(&mut self, tasks: Vec<TimerTaskForWheel>) {
893        for task in tasks {
894            // Calculate the remaining delay of the task in L0 layer
895            // The task's deadline_tick is based on L1 tick, needs to be converted to L0 tick
896            // 计算任务在 L0 层的剩余延迟
897            // 任务的 deadline_tick 基于 L1 tick,需要转换为 L0 tick
898            let l1_tick_ratio = self.l1_tick_ratio;
899            
900            // Calculate the original expiration time of the task (L1 tick)
901            // 计算任务的原始过期时间(L1 tick)
902            let l1_deadline = task.deadline_tick;
903            
904            // Convert to L0 tick expiration time
905            // 转换为 L0 tick 过期时间
906            let l0_deadline_tick = l1_deadline * l1_tick_ratio;
907            let l0_current_tick = self.l0.current_tick;
908            
909            // Calculate remaining L0 ticks
910            // 计算剩余 L0 ticks
911            let remaining_l0_ticks = if l0_deadline_tick > l0_current_tick {
912                l0_deadline_tick - l0_current_tick
913            } else {
914                1 // At least trigger in next tick (至少在下一个 tick 触发)
915            };
916            
917            // Calculate L0 slot index
918            // 计算 L0 槽索引
919            let target_l0_tick = l0_current_tick + remaining_l0_ticks;
920            let l0_slot_index = (target_l0_tick as usize) & self.l0.slot_mask;
921            
922            let task_id = task.get_id();
923            let vec_index = self.l0.slots[l0_slot_index].len();
924            let new_location = TaskLocation::new(0, l0_slot_index, vec_index);
925            
926            // Insert into L0 layer
927            // 插入到 L0 层
928            self.l0.slots[l0_slot_index].push(task);
929            
930            // Update task location in DeferredMap (task already exists in index)
931            // 更新 DeferredMap 中的任务位置(任务已在索引中)
932            if let Some(location) = self.task_index.get_mut(task_id.key()) {
933                *location = new_location;
934            }
935        }
936    }
937
    /// Check if the timing wheel is empty
    ///
    /// Returns `true` when `task_index` reports that it holds no entries.
    #[allow(dead_code)]
    pub fn is_empty(&self) -> bool {
        self.task_index.is_empty()
    }
945
946    /// Postpone timer task (keep original TaskId)
947    ///
948    /// # Parameters
949    /// - `task_id`: Task ID to postpone
950    /// - `new_delay`: New delay time (recalculated from current tick, not continuing from original delay time)
951    /// - `new_callback`: New callback function (if None, keep original callback)
952    ///
953    /// # Returns
954    /// Returns true if the task exists and is successfully postponed, otherwise returns false
955    ///
956    /// # Implementation Details
957    /// - Remove task from original layer/slot, keep its completion_notifier (will not trigger cancellation notification)
958    /// - Update delay time and callback function (if provided)
959    /// - Recalculate target layer, slot, and rounds based on new_delay
960    /// - Cross-layer migration may occur (L0 <-> L1)
961    /// - Re-insert to new position using original TaskId
962    /// - Keep consistent with external held TaskId reference
963    /// 
964    /// 延期定时器任务(保留原始 TaskId)
965    ///
966    /// # 参数
967    /// - `task_id`: 要延期的任务 ID
968    /// - `new_delay`: 新延迟时间(从当前 tick 重新计算,而非从原延迟时间继续)
969    /// - `new_callback`: 新回调函数(如果为 None,则保留原回调)
970    ///
971    /// # 返回值
972    /// 如果任务存在且成功延期则返回 true,否则返回 false
973    ///
974    /// # 实现细节
975    /// - 从原层级/槽位移除任务,保留其 completion_notifier(不会触发取消通知)
976    /// - 更新延迟时间和回调函数(如果提供)
977    /// - 根据 new_delay 重新计算目标层级、槽位和轮数
978    /// - 可能发生跨层迁移(L0 <-> L1)
979    /// - 使用原始 TaskId 重新插入到新位置
980    /// - 与外部持有的 TaskId 引用保持一致
981    #[inline]
982    pub fn postpone(
983        &mut self,
984        task_id: TaskId,
985        new_delay: Duration,
986        new_callback: Option<crate::task::CallbackWrapper>,
987    ) -> bool {
988        // Step 1: Get task location from DeferredMap (don't remove)
989        // 步骤 1: 从 DeferredMap 获取任务位置(不删除)
990        let old_location = match self.task_index.get(task_id.key()) {
991            Some(loc) => *loc,
992            None => return false,
993        };
994        
995        // Use match to get slot reference
996        // 使用 match 获取槽引用
997        let slot = match old_location.level {
998            0 => &mut self.l0.slots[old_location.slot_index],
999            _ => &mut self.l1.slots[old_location.slot_index],
1000        };
1001        
1002        // Verify task is still at expected position
1003        // 验证任务仍在预期位置
1004        if old_location.vec_index >= slot.len() || slot[old_location.vec_index].get_id() != task_id {
1005            // Index inconsistent, return failure
1006            // 索引不一致,返回失败
1007            return false;
1008        }
1009        
1010        // Use swap_remove to remove task from slot
1011        // 使用 swap_remove 从槽中移除任务
1012        let mut task = slot.swap_remove(old_location.vec_index);
1013        
1014        // Update swapped element's index (if a swap occurred)
1015        // 更新被交换元素的索引(如果发生了交换)
1016        if old_location.vec_index < slot.len() {
1017            let swapped_task_id = slot[old_location.vec_index].get_id();
1018            // Use DeferredMap's get_mut with TaskId key
1019            // 使用 DeferredMap 的 get_mut,传入 TaskId key
1020            if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
1021                swapped_location.vec_index = old_location.vec_index;
1022            }
1023        }
1024        
1025        // Step 2: Update task's delay and callback
1026        // 步骤 2: 更新任务的延迟和回调
1027        task.update_delay(new_delay);
1028        if let Some(callback) = new_callback {
1029            task.update_callback(callback);
1030        }
1031        
1032        // Step 3: Recalculate layer, slot, and rounds based on new delay
1033        // 步骤 3: 根据新延迟重新计算层级、槽位和轮数
1034        let (new_level, ticks, new_rounds) = self.determine_layer(new_delay);
1035        
1036        // Use match to reduce branches, and use cached slot mask
1037        // 使用 match 减少分支,并使用缓存的槽掩码
1038        let (current_tick, slot_mask, slots) = match new_level {
1039            0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
1040            _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
1041        };
1042        
1043        let total_ticks = current_tick + ticks;
1044        let new_slot_index = (total_ticks as usize) & slot_mask;
1045        
1046        // Update task's timing wheel parameters
1047        // 更新任务的时间轮参数
1048        task.deadline_tick = total_ticks;
1049        task.rounds = new_rounds;
1050        
1051        // Step 4: Re-insert task to new layer/slot
1052        // 步骤 4: 将任务重新插入到新层级/槽位
1053        let new_vec_index = slots[new_slot_index].len();
1054        let new_location = TaskLocation::new(new_level, new_slot_index, new_vec_index);
1055        
1056        slots[new_slot_index].push(task);
1057        
1058        // Update task location in DeferredMap (task already exists in index)
1059        // 更新 DeferredMap 中的任务位置(任务已在索引中)
1060        if let Some(location) = self.task_index.get_mut(task_id.key()) {
1061            *location = new_location;
1062        }
1063        
1064        true
1065    }
1066
1067    /// Batch postpone timer tasks
1068    ///
1069    /// # Parameters
1070    /// - `updates`: List of tuples of (task ID, new delay)
1071    ///
1072    /// # Returns
1073    /// Number of successfully postponed tasks
1074    ///
1075    /// # Performance Advantages
1076    /// - Batch processing reduces function call overhead
1077    /// - All delays are recalculated from current_tick at call time
1078    ///
1079    /// # Notes
1080    /// - If a task ID does not exist, that task will be skipped without affecting other tasks' postponement
1081    /// 
1082    /// 批量延期定时器任务
1083    ///
1084    /// # 参数
1085    /// - `updates`: (任务 ID, 新延迟) 元组列表
1086    ///
1087    /// # 返回值
1088    /// 成功延期的任务数量
1089    ///
1090    /// # 性能优势
1091    /// - 批处理减少函数调用开销
1092    /// - 所有延迟在调用时从 current_tick 重新计算
1093    ///
1094    /// # 注意
1095    /// - 如果任务 ID 不存在,该任务将被跳过,不影响其他任务的延期
1096    #[inline]
1097    pub fn postpone_batch(
1098        &mut self,
1099        updates: Vec<(TaskId, Duration)>,
1100    ) -> usize {
1101        let mut postponed_count = 0;
1102        
1103        for (task_id, new_delay) in updates {
1104            if self.postpone(task_id, new_delay, None) {
1105                postponed_count += 1;
1106            }
1107        }
1108        
1109        postponed_count
1110    }
1111
1112    /// Batch postpone timer tasks (replace callbacks)
1113    ///
1114    /// # Parameters
1115    /// - `updates`: List of tuples of (task ID, new delay, new callback)
1116    ///
1117    /// # Returns
1118    /// Number of successfully postponed tasks
1119    /// 
1120    /// 批量延期定时器任务(替换回调)
1121    ///
1122    /// # 参数
1123    /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
1124    ///
1125    /// # 返回值
1126    /// 成功延期的任务数量
1127    pub fn postpone_batch_with_callbacks(
1128        &mut self,
1129        updates: Vec<(TaskId, Duration, Option<crate::task::CallbackWrapper>)>,
1130    ) -> usize {
1131        let mut postponed_count = 0;
1132        
1133        for (task_id, new_delay, new_callback) in updates {
1134            if self.postpone(task_id, new_delay, new_callback) {
1135                postponed_count += 1;
1136            }
1137        }
1138        
1139        postponed_count
1140    }
1141}
1142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::task::{CallbackWrapper, TimerTask, TimerTaskWithCompletionNotifier};

    /// A wheel built from the default configs starts empty at tick 0 with 512 L0 slots.
    #[test]
    fn test_wheel_creation() {
        let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
        assert_eq!(wheel.slot_count(), 512);
        assert_eq!(wheel.current_tick(), 0);
        assert!(wheel.is_empty());
    }

    /// The default wheel also carries an L1 layer with 64 slots and an L1/L0 tick ratio of 100.
    #[test]
    fn test_hierarchical_wheel_creation() {
        let config = WheelConfig::default();
        
        let wheel = Wheel::new(config, BatchConfig::default());
        assert_eq!(wheel.slot_count(), 512); // L0 slot count
        assert_eq!(wheel.current_tick(), 0);
        assert!(wheel.is_empty());
        // L1 layer always exists in hierarchical mode
        assert_eq!(wheel.l1.slot_count, 64);
        assert_eq!(wheel.l1_tick_ratio, 100); // 1000ms / 10ms = 100 ticks
    }

    /// The config builder rejects an L1 tick that is not an integer multiple of the L0 tick.
    #[test]
    fn test_hierarchical_config_validation() {
        // L1 tick must be an integer multiple of L0 tick
        let result = WheelConfig::builder()
            .l0_tick_duration(Duration::from_millis(10))
            .l0_slot_count(512)
            .l1_tick_duration(Duration::from_millis(15)) // Not an integer multiple
            .l1_slot_count(64)
            .build();
        
        assert!(result.is_err());
        
        // Correct configuration
        let result = WheelConfig::builder()
            .l0_tick_duration(Duration::from_millis(10))
            .l0_slot_count(512)
            .l1_tick_duration(Duration::from_secs(1)) // 1000ms / 10ms = 100 ticks
            .l1_slot_count(64)
            .build();
        
        assert!(result.is_ok());
    }

    /// `determine_layer` picks L0 for short delays, L1 for longer ones, and
    /// reports rounds once the delay exceeds one full L1 revolution.
    #[test]
    fn test_layer_determination() {
        let config = WheelConfig::default();
        
        let wheel = Wheel::new(config, BatchConfig::default());
        
        // Short delay should enter L0 layer
        // L0: 512 slots * 10ms = 5120ms
        let (level, _, rounds) = wheel.determine_layer(Duration::from_millis(100));
        assert_eq!(level, 0);
        assert_eq!(rounds, 0);
        
        // Long delay should enter L1 layer
        // Beyond the L0 range (> 5120ms)
        let (level, _, rounds) = wheel.determine_layer(Duration::from_secs(10));
        assert_eq!(level, 1);
        assert_eq!(rounds, 0);
        
        // Long delay should enter L1 layer and have rounds
        // L1: 64 slots * 1000ms = 64000ms
        let (level, _, rounds) = wheel.determine_layer(Duration::from_secs(120));
        assert_eq!(level, 1);
        assert!(rounds > 0);
    }

    /// A 100ms one-shot task is indexed in L0 and expires within 10 advances.
    #[test]
    fn test_hierarchical_insert_and_advance() {
        let config = WheelConfig::default();
        
        let mut wheel = Wheel::new(config, BatchConfig::default());
        
        // Insert short delay task into L0
        let callback = CallbackWrapper::new(|| async {});
        let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
        let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
        let handle = wheel.allocate_handle();
        let task_id = handle.task_id();
        wheel.insert(handle, task_with_notifier);
        
        // Verify task is in L0 layer
        let location = wheel.task_index.get(task_id.key()).unwrap();
        assert_eq!(location.level, 0);
        
        // Advance 10 ticks (100ms)
        for _ in 0..10 {
            let expired = wheel.advance();
            if !expired.is_empty() {
                assert_eq!(expired.len(), 1);
                assert_eq!(expired[0].id, task_id);
                return;
            }
        }
        panic!("Task should have expired");
    }

    /// Cancelling works for a task in either layer and removes it from the index.
    #[test]
    fn test_cross_layer_cancel() {
        let config = WheelConfig::default();
        
        let mut wheel = Wheel::new(config, BatchConfig::default());
        
        // Insert L0 task
        let callback1 = CallbackWrapper::new(|| async {});
        let task1 = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback1));
        let (task_with_notifier1, _rx1) = TimerTaskWithCompletionNotifier::from_timer_task(task1);
        let handle1 = wheel.allocate_handle();
        let task_id1 = handle1.task_id();
        wheel.insert(handle1, task_with_notifier1);
        
        // Insert L1 task
        let callback2 = CallbackWrapper::new(|| async {});
        let task2 = TimerTask::new_oneshot(Duration::from_secs(10), Some(callback2));
        let (task_with_notifier2, _rx2) = TimerTaskWithCompletionNotifier::from_timer_task(task2);
        let handle2 = wheel.allocate_handle();
        let task_id2 = handle2.task_id();
        wheel.insert(handle2, task_with_notifier2);
        
        // Verify levels
        assert_eq!(wheel.task_index.get(task_id1.key()).unwrap().level, 0);
        assert_eq!(wheel.task_index.get(task_id2.key()).unwrap().level, 1);
        
        // Cancel L0 task
        assert!(wheel.cancel(task_id1));
        assert!(wheel.task_index.get(task_id1.key()).is_none());
        
        // Cancel L1 task
        assert!(wheel.cancel(task_id2));
        assert!(wheel.task_index.get(task_id2.key()).is_none());
        
        assert!(wheel.is_empty()); // The timing wheel should be empty
    }

    /// `delay_to_ticks` converts with a 10ms tick and never returns less than 1.
    #[test]
    fn test_delay_to_ticks() {
        let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(100)), 10);
        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(50)), 5);
        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(1)), 1); // Minimum 1 tick
    }

    /// An L0 slot count that is not a power of two is rejected with `InvalidSlotCount`.
    #[test]
    fn test_wheel_invalid_slot_count() {
        let result = WheelConfig::builder()
            .l0_slot_count(100)
            .build();
        assert!(result.is_err());
        if let Err(crate::error::TimerError::InvalidSlotCount { slot_count, reason }) = result {
            assert_eq!(slot_count, 100);
            assert_eq!(reason, "L0 layer slot count must be power of 2");
        } else {
            panic!("Expected InvalidSlotCount error");
        }
    }


    /// A delay shorter than one tick still fires on the very next advance.
    #[test]
    fn test_minimum_delay() {
        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
        
        // Test minimum delay (delays less than 1 tick should be rounded up to 1 tick)
        let callback = CallbackWrapper::new(|| async {});
        let task = TimerTask::new_oneshot(Duration::from_millis(1), Some(callback));
        let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
        let handle = wheel.allocate_handle();
        let task_id: TaskId = handle.task_id();
        wheel.insert(handle, task_with_notifier);
        
        // Advance 1 tick, task should trigger
        let expired = wheel.advance();
        assert_eq!(expired.len(), 1, "Minimum delay task should be triggered after 1 tick");
        assert_eq!(expired[0].id, task_id);
    }


    /// Advancing a wheel with no tasks returns nothing and still moves the tick counter.
    #[test]
    fn test_advance_empty_slots() {
        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
        
        // Do not insert any tasks, advance multiple ticks
        for _ in 0..100 {
            let expired = wheel.advance();
            assert!(expired.is_empty(), "Empty slots should not return any tasks");
        }
        
        assert_eq!(wheel.current_tick(), 100, "current_tick should correctly increment");
    }


    /// Tasks scheduled for the first and the last L0 slot both fire, each on its own tick.
    #[test]
    fn test_slot_boundary() {
        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
        
        // Test slot boundary and wraparound
        // First task: delay 10ms (1 tick), should trigger on slot 1
        let callback1 = CallbackWrapper::new(|| async {});
        let task1 = TimerTask::new_oneshot(Duration::from_millis(10), Some(callback1));
        let (task_with_notifier1, _rx1) = TimerTaskWithCompletionNotifier::from_timer_task(task1);
        let handle1 = wheel.allocate_handle();
        let task_id_1 = handle1.task_id();
        wheel.insert(handle1, task_with_notifier1);
        
        // Second task: delay 5110ms (511 ticks), should trigger on slot 511
        let callback2 = CallbackWrapper::new(|| async {});
        let task2 = TimerTask::new_oneshot(Duration::from_millis(5110), Some(callback2));
        let (task_with_notifier2, _rx2) = TimerTaskWithCompletionNotifier::from_timer_task(task2);
        let handle2 = wheel.allocate_handle();
        let task_id_2 = handle2.task_id();
        wheel.insert(handle2, task_with_notifier2);
        
        // Advance 1 tick, first task should trigger
        let expired = wheel.advance();
        assert_eq!(expired.len(), 1, "First task should trigger on tick 1");
        assert_eq!(expired[0].id, task_id_1);
        
        // Advance up to 510 more ticks (tick 2 through tick 511); the second task should trigger
        let mut triggered = false;
        for i in 0..510 {
            let expired = wheel.advance();
            if !expired.is_empty() {
                // `i + 2` is the overall advance count: one advance above plus `i + 1` in this loop
                assert_eq!(expired.len(), 1, "The {}th advance should trigger the second task", i + 2);
                assert_eq!(expired[0].id, task_id_2);
                triggered = true;
                break;
            }
        }
        assert!(triggered, "Second task should trigger on tick 511");
        
        assert!(wheel.is_empty(), "All tasks should have been triggered");
    }


    /// Every allocated handle yields a distinct `TaskId`.
    #[test]
    fn test_task_id_uniqueness() {
        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
        
        // Insert multiple tasks, verify TaskId uniqueness
        let mut task_ids = std::collections::HashSet::new();
        for _ in 0..100 {
            let callback = CallbackWrapper::new(|| async {});
            let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
            let (task_with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
            let handle = wheel.allocate_handle();
            let task_id = handle.task_id();
            wheel.insert(handle, task_with_notifier);
            
            // `HashSet::insert` returns false if the value was already present
            assert!(task_ids.insert(task_id), "TaskId should be unique");
        }
        
        assert_eq!(task_ids.len(), 100);
    }

}
1404