kestrel_timer/
wheel.rs

1use crate::CallbackWrapper;
2use crate::config::{BatchConfig, WheelConfig};
3use crate::task::{
4    TaskCompletion, TaskHandle, TaskId, TaskLocation, TaskTypeWithCompletionNotifier,
5    TimerTaskForWheel, TimerTaskWithCompletionNotifier,
6};
7use deferred_map::DeferredMap;
8use std::time::Duration;
9
10pub struct WheelAdvanceResult {
11    pub id: TaskId,
12    pub callback: Option<CallbackWrapper>,
13}
14
15/// Timing wheel single layer data structure
16///
17/// 时间轮单层数据结构
18struct WheelLayer {
19    /// Slot array, each slot stores a group of timer tasks
20    ///
21    /// 槽数组,每个槽存储一组定时器任务
22    slots: Vec<Vec<TimerTaskForWheel>>,
23
24    /// Current time pointer (tick index)
25    ///
26    /// 当前时间指针(tick 索引)
27    current_tick: u64,
28
29    /// Slot count
30    ///
31    /// 槽数量
32    slot_count: usize,
33
34    /// Duration of each tick
35    ///
36    /// 每个 tick 的持续时间
37    tick_duration: Duration,
38
39    /// Cache: tick duration in milliseconds (u64) - avoid repeated conversion
40    ///
41    /// 缓存:tick 持续时间(毫秒,u64)- 避免重复转换
42    tick_duration_ms: u64,
43
44    /// Cache: slot mask (slot_count - 1) - for fast modulo operation
45    ///
46    /// 缓存:槽掩码(slot_count - 1)- 用于快速取模运算
47    slot_mask: usize,
48}
49
50impl WheelLayer {
51    /// Create a new wheel layer
52    ///
53    /// 创建一个新的时间轮层
54    fn new(slot_count: usize, tick_duration: Duration) -> Self {
55        let mut slots = Vec::with_capacity(slot_count);
56        // Pre-allocate capacity for each slot to reduce subsequent reallocation during push
57        // Most slots typically contain 0-4 tasks, pre-allocate capacity of 4
58        //
59        // 为每个槽预分配容量以减少后续 push 时的重新分配
60        // 大多数槽通常包含 0-4 个任务,预分配容量为 4
61        for _ in 0..slot_count {
62            slots.push(Vec::with_capacity(4));
63        }
64
65        let tick_duration_ms = tick_duration.as_millis() as u64;
66        let slot_mask = slot_count - 1;
67
68        Self {
69            slots,
70            current_tick: 0,
71            slot_count,
72            tick_duration,
73            tick_duration_ms,
74            slot_mask,
75        }
76    }
77
78    /// Calculate the number of ticks corresponding to the delay
79    ///
80    /// 计算延迟对应的 tick 数量
81    fn delay_to_ticks(&self, delay: Duration) -> u64 {
82        let ticks = delay.as_millis() as u64 / self.tick_duration.as_millis() as u64;
83        ticks.max(1) // at least 1 tick (至少 1 个 tick)
84    }
85}
86
87/// Timing wheel data structure (hierarchical mode)
88///
89/// Now uses DeferredMap for O(1) task lookup and generational safety.
90///
91/// 时间轮数据结构(分层模式)
92///
93/// 现在使用 DeferredMap 实现 O(1) 任务查找和代数安全
94pub struct Wheel {
95    /// L0 layer (bottom layer)
96    ///
97    /// L0 层(底层)
98    l0: WheelLayer,
99
100    /// L1 layer (top layer)
101    ///
102    /// L1 层(顶层)
103    l1: WheelLayer,
104
105    /// L1 tick ratio relative to L0 tick
106    ///
107    /// L1 tick 相对于 L0 tick 的比率
108    pub(crate) l1_tick_ratio: u64,
109
110    /// Task index for fast lookup and cancellation using DeferredMap
111    ///
112    /// Keys are TaskIds (u64 from DeferredMap), values are TaskLocations
113    ///
114    /// 任务索引,使用 DeferredMap 实现快速查找和取消
115    ///
116    /// 键是 TaskId(来自 DeferredMap 的 u64),值是 TaskLocation
117    pub(crate) task_index: DeferredMap<TaskLocation>,
118
119    /// Batch processing configuration
120    ///
121    /// 批处理配置
122    batch_config: BatchConfig,
123
124    /// Cache: L0 layer capacity in milliseconds - avoid repeated calculation
125    ///
126    /// 缓存:L0 层容量(毫秒)- 避免重复计算
127    l0_capacity_ms: u64,
128
129    /// Cache: L1 layer capacity in ticks - avoid repeated calculation
130    ///
131    /// 缓存:L1 层容量(tick 数)- 避免重复计算
132    l1_capacity_ticks: u64,
133}
134
135impl Wheel {
136    /// Create new timing wheel
137    ///
138    /// # Parameters
139    /// - `config`: Timing wheel configuration (already validated)
140    /// - `batch_config`: Batch processing configuration
141    ///
142    /// # Notes
143    /// Configuration parameters have been validated in WheelConfig::builder().build(), so this method will not fail.
144    /// Now uses DeferredMap for task indexing with generational safety.
145    ///
146    /// 创建新的时间轮
147    ///
148    /// # 参数
149    /// - `config`: 时间轮配置(已验证)
150    /// - `batch_config`: 批处理配置
151    ///
152    /// # 注意
153    /// 配置参数已在 WheelConfig::builder().build() 中验证,因此此方法不会失败。
154    /// 现在使用 DeferredMap 进行任务索引,具有代数安全特性。
155    pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
156        let l0 = WheelLayer::new(config.l0_slot_count, config.l0_tick_duration);
157        let l1 = WheelLayer::new(config.l1_slot_count, config.l1_tick_duration);
158
159        // Calculate L1 tick ratio relative to L0 tick
160        // 计算 L1 tick 相对于 L0 tick 的比率
161        let l1_tick_ratio = l1.tick_duration_ms / l0.tick_duration_ms;
162
163        // Pre-calculate capacity to avoid repeated calculation in insert
164        // 预计算容量,避免在 insert 中重复计算
165        let l0_capacity_ms = (l0.slot_count as u64) * l0.tick_duration_ms;
166        let l1_capacity_ticks = l1.slot_count as u64;
167
168        // Estimate initial capacity for DeferredMap based on L0 slot count
169        // 根据 L0 槽数量估算 DeferredMap 的初始容量
170        let estimated_capacity = (l0.slot_count / 4).max(64);
171
172        Self {
173            l0,
174            l1,
175            l1_tick_ratio,
176            task_index: DeferredMap::with_capacity(estimated_capacity),
177            batch_config,
178            l0_capacity_ms,
179            l1_capacity_ticks,
180        }
181    }
182
183    /// Get current tick (L0 layer tick)
184    ///
185    /// 获取当前 tick(L0 层 tick)
186    #[allow(dead_code)]
187    pub fn current_tick(&self) -> u64 {
188        self.l0.current_tick
189    }
190
191    /// Get tick duration (L0 layer tick duration)
192    ///
193    /// 获取 tick 持续时间(L0 层 tick 持续时间)
194    #[allow(dead_code)]
195    pub fn tick_duration(&self) -> Duration {
196        self.l0.tick_duration
197    }
198
199    /// Get slot count (L0 layer slot count)
200    ///
201    /// 获取槽数量(L0 层槽数量)
202    #[allow(dead_code)]
203    pub fn slot_count(&self) -> usize {
204        self.l0.slot_count
205    }
206
207    /// Calculate the number of ticks corresponding to the delay (based on L0 layer)
208    ///
209    /// 计算延迟对应的 tick 数量(基于 L0 层)
210    #[allow(dead_code)]
211    pub fn delay_to_ticks(&self, delay: Duration) -> u64 {
212        self.l0.delay_to_ticks(delay)
213    }
214
215    /// Determine which layer the delay should be inserted into
216    ///
217    /// # Returns
218    /// Returns: (layer, ticks, rounds)
219    /// - Layer: 0 = L0, 1 = L1
220    /// - Ticks: number of ticks calculated from current tick
221    /// - Rounds: number of rounds (only used for very long delays in L1 layer)
222    ///
223    /// 确定延迟应该插入到哪一层
224    ///
225    /// # 返回值
226    /// 返回:(层级, ticks, 轮数)
227    /// - 层级:0 = L0, 1 = L1
228    /// - Ticks:从当前 tick 计算的 tick 数量
229    /// - 轮数:轮数(仅用于 L1 层的超长延迟)
230    #[inline(always)]
231    fn determine_layer(&self, delay: Duration) -> (u8, u64, u32) {
232        let delay_ms = delay.as_millis() as u64;
233
234        // Fast path: most tasks are within L0 range (using cached capacity)
235        // 快速路径:大多数任务在 L0 范围内(使用缓存的容量)
236        if delay_ms < self.l0_capacity_ms {
237            let l0_ticks = (delay_ms / self.l0.tick_duration_ms).max(1);
238            return (0, l0_ticks, 0);
239        }
240
241        // Slow path: L1 layer tasks (using cached values)
242        // 慢速路径:L1 层任务(使用缓存的值)
243        let l1_ticks = (delay_ms / self.l1.tick_duration_ms).max(1);
244
245        if l1_ticks < self.l1_capacity_ticks {
246            (1, l1_ticks, 0)
247        } else {
248            let rounds = (l1_ticks / self.l1_capacity_ticks) as u32;
249            (1, l1_ticks, rounds)
250        }
251    }
252
253    /// Allocate a handle from DeferredMap
254    ///
255    /// 从 DeferredMap 分配一个 handle
256    ///
257    /// # Returns
258    /// A unique handle for later insertion
259    ///
260    /// # 返回值
261    /// 用于后续插入的唯一 handle
262    pub fn allocate_handle(&mut self) -> TaskHandle {
263        TaskHandle::new(self.task_index.allocate_handle())
264    }
265
266    /// Batch allocate handles from DeferredMap
267    ///
268    /// 从 DeferredMap 批量分配 handles
269    ///
270    /// # Parameters
271    /// - `count`: Number of handles to allocate
272    ///
273    /// # Returns
274    /// Vector of unique handles for later batch insertion
275    ///
276    /// # 参数
277    /// - `count`: 要分配的 handle 数量
278    ///
279    /// # 返回值
280    /// 用于后续批量插入的唯一 handles 向量
281    pub fn allocate_handles(&mut self, count: usize) -> Vec<TaskHandle> {
282        let mut handles = Vec::with_capacity(count);
283        for _ in 0..count {
284            handles.push(TaskHandle::new(self.task_index.allocate_handle()));
285        }
286        handles
287    }
288
289    /// Insert timer task
290    ///
291    /// # Parameters
292    /// - `handle`: Handle for the task
293    /// - `task`: Timer task with completion notifier
294    ///
295    /// # Returns
296    /// Unique identifier of the task (TaskId) - now generated from DeferredMap
297    ///
298    /// # Implementation Details
299    /// - Allocates Handle from DeferredMap to generate TaskId with generational safety
300    /// - Automatically calculate the layer and slot where the task should be inserted
301    /// - Hierarchical mode: short delay tasks are inserted into L0, long delay tasks are inserted into L1
302    /// - Use bit operations to optimize slot index calculation
303    /// - Use DeferredMap for O(1) lookup and cancellation with generation checking
304    ///
305    /// 插入定时器任务
306    ///
307    /// # 参数
308    /// - `handle`: 任务的 handle
309    /// - `task`: 带完成通知器的定时器任务
310    ///
311    /// # 返回值
312    /// 任务的唯一标识符(TaskId)- 现在从 DeferredMap 生成
313    ///
314    /// # 实现细节
315    /// - 自动计算任务应该插入的层级和槽位
316    /// - 分层模式:短延迟任务插入 L0,长延迟任务插入 L1
317    /// - 使用位运算优化槽索引计算
318    /// - 使用 DeferredMap 实现 O(1) 查找和取消,带代数检查
319    #[inline]
320    pub fn insert(&mut self, handle: TaskHandle, task: TimerTaskWithCompletionNotifier) {
321        let task_id = handle.task_id();
322
323        let (level, ticks, rounds) = self.determine_layer(task.delay);
324
325        // Use match to reduce branches, and use cached slot mask
326        // 使用 match 减少分支,并使用缓存的槽掩码
327        let (current_tick, slot_mask, slots) = match level {
328            0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
329            _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
330        };
331
332        let total_ticks = current_tick + ticks;
333        let slot_index = (total_ticks as usize) & slot_mask;
334
335        // Create task with the assigned TaskId
336        // 使用分配的 TaskId 创建任务
337        let task = TimerTaskForWheel::new_with_id(task_id, task, total_ticks, rounds);
338
339        // Get the index position of the task in Vec (the length before insertion is the index of the new task)
340        // 获取任务在 Vec 中的索引位置(插入前的长度就是新任务的索引)
341        let vec_index = slots[slot_index].len();
342        let location = TaskLocation::new(level, slot_index, vec_index);
343
344        // Insert task into slot
345        // 将任务插入槽中
346        slots[slot_index].push(task);
347
348        // Insert task location into DeferredMap using handle
349        // In non-debug mode, it is guaranteed that this method will not panic.
350        // 使用 handle 将任务位置插入 DeferredMap
351        // 由于 deferred_map::DeferredMap 是我实现的,底层保证非debug模式下不会panic
352        self.task_index
353            .insert(handle.into_handle(), location)
354            .expect("Handle should be valid for insertion");
355    }
356
357    /// Batch insert timer tasks
358    ///
359    /// # Parameters
360    /// - `handles`: List of pre-allocated handles for tasks
361    /// - `tasks`: List of timer tasks with completion notifiers
362    ///
363    /// # Returns
364    /// - `Ok(())` if all tasks are successfully inserted
365    /// - `Err(TimerError::BatchLengthMismatch)` if handles and tasks lengths don't match
366    ///
367    /// # Performance Advantages
368    /// - Reduce repeated boundary checks and capacity adjustments
369    /// - For tasks with the same delay, calculation results can be reused
370    /// - Uses DeferredMap for efficient task indexing with generational safety
371    ///
372    /// 批量插入定时器任务
373    ///
374    /// # 参数
375    /// - `handles`: 任务的预分配 handles 列表
376    /// - `tasks`: 带完成通知器的定时器任务列表
377    ///
378    /// # 返回值
379    /// - `Ok(())` 如果所有任务成功插入
380    /// - `Err(TimerError::BatchLengthMismatch)` 如果 handles 和 tasks 长度不匹配
381    ///
382    /// # 性能优势
383    /// - 减少重复的边界检查和容量调整
384    /// - 对于相同延迟的任务,可以重用计算结果
385    /// - 使用 DeferredMap 实现高效的任务索引,具有代数安全特性
386    #[inline]
387    pub fn insert_batch(
388        &mut self,
389        handles: Vec<TaskHandle>,
390        tasks: Vec<TimerTaskWithCompletionNotifier>,
391    ) -> Result<(), crate::error::TimerError> {
392        // Validate that handles and tasks have the same length
393        // 验证 handles 和 tasks 长度相同
394        if handles.len() != tasks.len() {
395            return Err(crate::error::TimerError::BatchLengthMismatch {
396                handles_len: handles.len(),
397                tasks_len: tasks.len(),
398            });
399        }
400
401        for (handle, task) in handles.into_iter().zip(tasks.into_iter()) {
402            let task_id = handle.task_id();
403
404            let (level, ticks, rounds) = self.determine_layer(task.delay);
405
406            // Use match to reduce branches, and use cached slot mask
407            // 使用 match 减少分支,并使用缓存的槽掩码
408            let (current_tick, slot_mask, slots) = match level {
409                0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
410                _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
411            };
412
413            let total_ticks = current_tick + ticks;
414            let slot_index = (total_ticks as usize) & slot_mask;
415
416            // Create task with the assigned TaskId
417            // 使用分配的 TaskId 创建任务
418            let task = TimerTaskForWheel::new_with_id(task_id, task, total_ticks, rounds);
419
420            // Get the index position of the task in Vec
421            // 获取任务在 Vec 中的索引位置
422            let vec_index = slots[slot_index].len();
423            let location = TaskLocation::new(level, slot_index, vec_index);
424
425            // Insert task into slot
426            // 将任务插入槽中
427            slots[slot_index].push(task);
428
429            // Insert task location into DeferredMap using handle
430            // In non-debug mode, it is guaranteed that this method will not panic.
431            // 使用 handle 将任务位置插入 DeferredMap
432            // 由于 deferred_map::DeferredMap 是我实现的,底层保证非debug模式下不会panic
433            self.task_index
434                .insert(handle.into_handle(), location)
435                .expect("Handle should be valid for insertion");
436        }
437
438        Ok(())
439    }
440
441    /// Cancel timer task
442    ///
443    /// # Parameters
444    /// - `task_id`: Task ID
445    ///
446    /// # Returns
447    /// Returns true if the task exists and is successfully cancelled, otherwise returns false
448    ///
449    /// Now uses DeferredMap for safe task removal with generational checking
450    ///
451    /// 取消定时器任务
452    ///
453    /// # 参数
454    /// - `task_id`: 任务 ID
455    ///
456    /// # 返回值
457    /// 如果任务存在且成功取消则返回 true,否则返回 false
458    ///
459    /// 现在使用 DeferredMap 实现安全的任务移除,带代数检查
460    #[inline]
461    pub fn cancel(&mut self, task_id: TaskId) -> bool {
462        // Remove task location from DeferredMap using TaskId as key
463        // 使用 TaskId 作为 key 从 DeferredMap 中移除任务位置
464        let location = match self.task_index.remove(task_id.key()) {
465            Some(loc) => loc,
466            None => return false, // Task not found or already removed (generation mismatch)
467        };
468
469        // Use match to get slot reference, reduce branches
470        // 使用 match 获取槽引用,减少分支
471        let slot = match location.level {
472            0 => &mut self.l0.slots[location.slot_index],
473            _ => &mut self.l1.slots[location.slot_index],
474        };
475
476        // Boundary check and ID verification
477        // 边界检查和 ID 验证
478        if location.vec_index >= slot.len() || slot[location.vec_index].get_id() != task_id {
479            // Index inconsistent - this shouldn't happen with DeferredMap, but handle it anyway
480            // 索引不一致 - DeferredMap 不应该出现这种情况,但还是处理一下
481            return false;
482        }
483
484        // Use swap_remove to remove task, record swapped task ID
485        // 使用 swap_remove 移除任务,记录被交换的任务 ID
486        let removed_task = slot.swap_remove(location.vec_index);
487
488        // Ensure the correct task was removed
489        // 确保移除了正确的任务
490        debug_assert_eq!(removed_task.get_id(), task_id);
491
492        match removed_task.into_task_type() {
493            TaskTypeWithCompletionNotifier::OneShot {
494                completion_notifier,
495            } => {
496                // Use Notify + AtomicU8 for zero-allocation notification
497                // 使用 Notify + AtomicU8 实现零分配通知
498                completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
499            }
500            TaskTypeWithCompletionNotifier::Periodic {
501                completion_notifier,
502                ..
503            } => {
504                // Use flume for high-performance periodic notification
505                // 使用 flume 实现高性能周期通知
506                let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
507            }
508        }
509
510        // If a swap occurred (vec_index is not the last element)
511        // 如果发生了交换(vec_index 不是最后一个元素)
512        if location.vec_index < slot.len() {
513            let swapped_task_id = slot[location.vec_index].get_id();
514            // Update swapped element's index in one go, using DeferredMap's get_mut
515            // 一次性更新被交换元素的索引,使用 DeferredMap 的 get_mut
516            if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
517                swapped_location.vec_index = location.vec_index;
518            }
519        }
520
521        true
522    }
523
524    /// Batch cancel timer tasks
525    ///
526    /// # Parameters
527    /// - `task_ids`: List of task IDs to cancel
528    ///
529    /// # Returns
530    /// Number of successfully cancelled tasks
531    ///
532    /// # Performance Advantages
533    /// - Reduce repeated HashMap lookup overhead
534    /// - Multiple cancellation operations on the same slot can be batch processed
535    /// - Use unstable sort to improve performance
536    /// - Small batch optimization: skip sorting based on configuration threshold, process directly
537    ///
538    /// 批量取消定时器任务
539    ///
540    /// # 参数
541    /// - `task_ids`: 要取消的任务 ID 列表
542    ///
543    /// # 返回值
544    /// 成功取消的任务数量
545    ///
546    /// # 性能优势
547    /// - 减少重复的 HashMap 查找开销
548    /// - 同一槽位的多个取消操作可以批量处理
549    /// - 使用不稳定排序提高性能
550    /// - 小批量优化:根据配置阈值跳过排序,直接处理
551    #[inline]
552    pub fn cancel_batch(&mut self, task_ids: &[TaskId]) -> usize {
553        let mut cancelled_count = 0;
554
555        // Small batch optimization: cancel one by one to avoid grouping and sorting overhead
556        // 小批量优化:逐个取消以避免分组和排序开销
557        if task_ids.len() <= self.batch_config.small_batch_threshold {
558            for &task_id in task_ids {
559                if self.cancel(task_id) {
560                    cancelled_count += 1;
561                }
562            }
563            return cancelled_count;
564        }
565
566        // Group by layer and slot to optimize batch cancellation
567        // Use SmallVec to avoid heap allocation in most cases
568        // 按层级和槽位分组以优化批量取消
569        // 使用 SmallVec 避免在大多数情况下进行堆分配
570        let l0_slot_count = self.l0.slot_count;
571        let l1_slot_count = self.l1.slot_count;
572
573        let mut l0_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l0_slot_count];
574        let mut l1_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l1_slot_count];
575
576        // Collect information of tasks to be cancelled
577        // 收集要取消的任务信息
578        for &task_id in task_ids {
579            // Use DeferredMap's get with TaskId key
580            // 使用 DeferredMap 的 get,传入 TaskId key
581            if let Some(location) = self.task_index.get(task_id.key()) {
582                if location.level == 0 {
583                    l0_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
584                } else {
585                    l1_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
586                }
587            }
588        }
589
590        // Process L0 layer cancellation
591        // 处理 L0 层取消
592        for (slot_index, tasks) in l0_tasks_by_slot.iter_mut().enumerate() {
593            if tasks.is_empty() {
594                continue;
595            }
596
597            // Sort by vec_index in descending order, delete from back to front to avoid index invalidation
598            // 按 vec_index 降序排序,从后向前删除以避免索引失效
599            tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
600
601            let slot = &mut self.l0.slots[slot_index];
602
603            for &(task_id, vec_index) in tasks.iter() {
604                if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
605                    let removed_task = slot.swap_remove(vec_index);
606                    match removed_task.into_task_type() {
607                        TaskTypeWithCompletionNotifier::OneShot {
608                            completion_notifier,
609                        } => {
610                            // Use Notify + AtomicU8 for zero-allocation notification
611                            // 使用 Notify + AtomicU8 实现零分配通知
612                            completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
613                        }
614                        TaskTypeWithCompletionNotifier::Periodic {
615                            completion_notifier,
616                            ..
617                        } => {
618                            // Use flume for high-performance periodic notification
619                            // 使用 flume 实现高性能周期通知
620                            let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
621                        }
622                    }
623
624                    if vec_index < slot.len() {
625                        let swapped_task_id = slot[vec_index].get_id();
626                        // Use DeferredMap's get_mut with TaskId key
627                        // 使用 DeferredMap 的 get_mut,传入 TaskId key
628                        if let Some(swapped_location) =
629                            self.task_index.get_mut(swapped_task_id.key())
630                        {
631                            swapped_location.vec_index = vec_index;
632                        }
633                    }
634
635                    // Use DeferredMap's remove with TaskId key
636                    // 使用 DeferredMap 的 remove,传入 TaskId key
637                    self.task_index.remove(task_id.key());
638                    cancelled_count += 1;
639                }
640            }
641        }
642
643        // Process L1 layer cancellation
644        // 处理 L1 层取消
645        for (slot_index, tasks) in l1_tasks_by_slot.iter_mut().enumerate() {
646            if tasks.is_empty() {
647                continue;
648            }
649
650            tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
651
652            let slot = &mut self.l1.slots[slot_index];
653
654            for &(task_id, vec_index) in tasks.iter() {
655                if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
656                    let removed_task = slot.swap_remove(vec_index);
657
658                    match removed_task.into_task_type() {
659                        TaskTypeWithCompletionNotifier::OneShot {
660                            completion_notifier,
661                        } => {
662                            // Use Notify + AtomicU8 for zero-allocation notification
663                            // 使用 Notify + AtomicU8 实现零分配通知
664                            completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
665                        }
666                        TaskTypeWithCompletionNotifier::Periodic {
667                            completion_notifier,
668                            ..
669                        } => {
670                            // Use flume for high-performance periodic notification
671                            // 使用 flume 实现高性能周期通知
672                            let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
673                        }
674                    }
675
676                    if vec_index < slot.len() {
677                        let swapped_task_id = slot[vec_index].get_id();
678                        // Use DeferredMap's get_mut with TaskId key
679                        // 使用 DeferredMap 的 get_mut,传入 TaskId key
680                        if let Some(swapped_location) =
681                            self.task_index.get_mut(swapped_task_id.key())
682                        {
683                            swapped_location.vec_index = vec_index;
684                        }
685                    }
686
687                    // Use DeferredMap's remove with TaskId key
688                    // 使用 DeferredMap 的 remove,传入 TaskId key
689                    self.task_index.remove(task_id.key());
690                    cancelled_count += 1;
691                }
692            }
693        }
694
695        cancelled_count
696    }
697
698    /// Reinsert periodic task with the same TaskId
699    ///
700    /// Used to automatically reschedule periodic tasks after they expire
701    ///
702    /// TaskId remains unchanged, only location is updated in DeferredMap
703    ///
704    /// 重新插入周期性任务(保持相同的 TaskId)
705    ///
706    /// 用于在周期性任务过期后自动重新调度
707    ///
708    /// TaskId 保持不变,只更新 DeferredMap 中的位置
709    fn reinsert_periodic_task(&mut self, task_id: TaskId, task: TimerTaskWithCompletionNotifier) {
710        // Determine which layer the interval should be inserted into
711        // This method is only called by periodic tasks, so the interval is guaranteed to be Some.
712        // 确定间隔应该插入到哪一层
713        // 该方法只能由周期性任务调用,所以间隔是 guaranteed 应当保证为 Some.
714        let (level, ticks, rounds) = self.determine_layer(task.get_interval().unwrap());
715
716        // Use match to reduce branches, and use cached slot mask
717        // 使用 match 减少分支,并使用缓存的槽掩码
718        let (current_tick, slot_mask, slots) = match level {
719            0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
720            _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
721        };
722
723        let total_ticks = current_tick + ticks;
724        let slot_index = (total_ticks as usize) & slot_mask;
725
726        // Get the index position of the task in Vec
727        // 获取任务在 Vec 中的索引位置
728        let vec_index = slots[slot_index].len();
729        let new_location = TaskLocation::new(level, slot_index, vec_index);
730
731        // Insert task into slot
732        // 将任务插入槽中
733        slots[slot_index].push(TimerTaskForWheel::new_with_id(
734            task_id,
735            task,
736            total_ticks,
737            rounds,
738        ));
739
740        // Update task location in DeferredMap (doesn't remove, just updates)
741        // 更新 DeferredMap 中的任务位置(不删除,仅更新)
742        if let Some(location) = self.task_index.get_mut(task_id.key()) {
743            *location = new_location;
744        }
745    }
746
747    /// Advance the timing wheel by one tick, return all expired tasks
748    ///
749    /// # Returns
750    /// List of expired tasks
751    ///
752    /// # Implementation Details
753    /// - L0 layer advances 1 tick each time (no rounds check)
754    /// - L1 layer advances once every (L1_tick / L0_tick) times
755    /// - L1 expired tasks are batch demoted to L0
756    ///
757    /// 推进时间轮一个 tick,返回所有过期的任务
758    ///
759    /// # 返回值
760    /// 过期任务列表
761    ///
762    /// # 实现细节
763    /// - L0 层每次推进 1 个 tick(无轮数检查)
764    /// - L1 层每 (L1_tick / L0_tick) 次推进一次
765    /// - L1 层过期任务批量降级到 L0
766    pub fn advance(&mut self) -> Vec<WheelAdvanceResult> {
767        // Advance L0 layer
768        // 推进 L0 层
769        self.l0.current_tick += 1;
770
771        let mut expired_tasks = Vec::new();
772
773        // Process L0 layer expired tasks
774        // Distinguish between one-shot and periodic tasks
775        // 处理 L0 层过期任务
776        // 区分一次性任务和周期性任务
777        let l0_slot_index = (self.l0.current_tick as usize) & self.l0.slot_mask;
778
779        // Collect periodic tasks to reinsert
780        // 收集需要重新插入的周期性任务
781        let mut periodic_tasks_to_reinsert = Vec::new();
782
783        {
784            let l0_slot = &mut self.l0.slots[l0_slot_index];
785
786            // Process all tasks in the current slot by always removing from index 0
787            // This avoids index tracking issues with swap_remove
788            // 通过始终从索引 0 移除来处理当前槽中的所有任务
789            // 这避免了 swap_remove 的索引跟踪问题
790            while !l0_slot.is_empty() {
791                // Always remove from index 0
792                // 始终从索引 0 移除
793                let task_with_notifier = l0_slot.swap_remove(0);
794                let task_id = task_with_notifier.get_id();
795
796                // Determine if this is a periodic task (check before consuming)
797                // 判断是否为周期性任务(在消耗前检查)
798                let is_periodic = matches!(
799                    task_with_notifier.task.task_type,
800                    TaskTypeWithCompletionNotifier::Periodic { .. }
801                );
802
803                // For one-shot tasks, remove from DeferredMap index
804                // For periodic tasks, keep in index (will update location in reinsert)
805                // 对于一次性任务,从 DeferredMap 索引中移除
806                // 对于周期性任务,保留在索引中(重新插入时会更新位置)
807                if !is_periodic {
808                    self.task_index.remove(task_id.key());
809                }
810
811                // Update swapped element's index (the element that was moved to index 0)
812                // 更新被交换元素的索引(被移动到索引 0 的元素)
813                if !l0_slot.is_empty() {
814                    let swapped_task_id = l0_slot[0].get_id();
815                    // Use DeferredMap's get_mut with TaskId key
816                    // 使用 DeferredMap 的 get_mut,传入 TaskId key
817                    if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
818                        swapped_location.vec_index = 0;
819                    }
820                }
821
822                let TimerTaskForWheel { task_id, task, .. } = task_with_notifier;
823
824                match &task.task_type {
825                    TaskTypeWithCompletionNotifier::Periodic {
826                        completion_notifier,
827                        ..
828                    } => {
829                        let _ = completion_notifier.0.try_send(TaskCompletion::Called);
830
831                        periodic_tasks_to_reinsert.push((
832                            task_id,
833                            TimerTaskWithCompletionNotifier {
834                                task_type: task.task_type,
835                                delay: task.delay,
836                                callback: task.callback.clone(),
837                            },
838                        ));
839                    }
840                    TaskTypeWithCompletionNotifier::OneShot {
841                        completion_notifier,
842                    } => {
843                        completion_notifier.notify(crate::task::TaskCompletion::Called);
844                    }
845                }
846
847                expired_tasks.push(WheelAdvanceResult {
848                    id: task_id,
849                    callback: task.callback,
850                });
851            }
852        }
853
854        // Reinsert periodic tasks for next interval
855        // 重新插入周期性任务到下一个周期
856        for (task_id, task) in periodic_tasks_to_reinsert {
857            self.reinsert_periodic_task(task_id, task);
858        }
859
860        // Process L1 layer
861        // Check if L1 layer needs to be advanced
862        // 处理 L1 层
863        // 检查是否需要推进 L1 层
864        if self.l0.current_tick.is_multiple_of(self.l1_tick_ratio) {
865            self.l1.current_tick += 1;
866            let l1_slot_index = (self.l1.current_tick as usize) & self.l1.slot_mask;
867            let l1_slot = &mut self.l1.slots[l1_slot_index];
868
869            // Collect L1 layer expired tasks
870            // 收集 L1 层过期任务
871            let mut tasks_to_demote = Vec::new();
872            let mut i = 0;
873            while i < l1_slot.len() {
874                let task = &mut l1_slot[i];
875
876                if task.rounds > 0 {
877                    // Still has rounds, decrease rounds and keep
878                    // 还有轮数,减少轮数并保留
879                    task.rounds -= 1;
880                    // Use DeferredMap's get_mut with TaskId key
881                    // 使用 DeferredMap 的 get_mut,传入 TaskId key
882                    if let Some(location) = self.task_index.get_mut(task.get_id().key()) {
883                        location.vec_index = i;
884                    }
885                    i += 1;
886                } else {
887                    // rounds = 0, need to demote to L0
888                    // Don't remove from task_index - will update location in demote_tasks
889                    // rounds = 0,需要降级到 L0
890                    // 不从 task_index 中移除 - 在 demote_tasks 中会更新位置
891                    let task_to_demote = l1_slot.swap_remove(i);
892
893                    if i < l1_slot.len() {
894                        let swapped_task_id = l1_slot[i].get_id();
895                        // Use DeferredMap's get_mut with TaskId key
896                        // 使用 DeferredMap 的 get_mut,传入 TaskId key
897                        if let Some(swapped_location) =
898                            self.task_index.get_mut(swapped_task_id.key())
899                        {
900                            swapped_location.vec_index = i;
901                        }
902                    }
903
904                    tasks_to_demote.push(task_to_demote);
905                }
906            }
907
908            // Demote tasks to L0
909            // 将任务降级到 L0
910            self.demote_tasks(tasks_to_demote);
911        }
912
913        expired_tasks
914    }
915
916    /// Demote tasks from L1 to L0
917    ///
918    /// Recalculate and insert L1 expired tasks into L0 layer
919    ///
920    /// Updates task location in DeferredMap without removing/reinserting
921    ///
922    /// 将任务从 L1 降级到 L0
923    ///
924    /// 重新计算并将 L1 过期任务插入到 L0 层
925    ///
926    /// 更新 DeferredMap 中的任务位置而不删除/重新插入
927    fn demote_tasks(&mut self, tasks: Vec<TimerTaskForWheel>) {
928        for task in tasks {
929            // Calculate the remaining delay of the task in L0 layer
930            // The task's deadline_tick is based on L1 tick, needs to be converted to L0 tick
931            // 计算任务在 L0 层的剩余延迟
932            // 任务的 deadline_tick 基于 L1 tick,需要转换为 L0 tick
933            let l1_tick_ratio = self.l1_tick_ratio;
934
935            // Calculate the original expiration time of the task (L1 tick)
936            // 计算任务的原始过期时间(L1 tick)
937            let l1_deadline = task.deadline_tick;
938
939            // Convert to L0 tick expiration time
940            // 转换为 L0 tick 过期时间
941            let l0_deadline_tick = l1_deadline * l1_tick_ratio;
942            let l0_current_tick = self.l0.current_tick;
943
944            // Calculate remaining L0 ticks
945            // 计算剩余 L0 ticks
946            let remaining_l0_ticks = if l0_deadline_tick > l0_current_tick {
947                l0_deadline_tick - l0_current_tick
948            } else {
949                1 // At least trigger in next tick (至少在下一个 tick 触发)
950            };
951
952            // Calculate L0 slot index
953            // 计算 L0 槽索引
954            let target_l0_tick = l0_current_tick + remaining_l0_ticks;
955            let l0_slot_index = (target_l0_tick as usize) & self.l0.slot_mask;
956
957            let task_id = task.get_id();
958            let vec_index = self.l0.slots[l0_slot_index].len();
959            let new_location = TaskLocation::new(0, l0_slot_index, vec_index);
960
961            // Insert into L0 layer
962            // 插入到 L0 层
963            self.l0.slots[l0_slot_index].push(task);
964
965            // Update task location in DeferredMap (task already exists in index)
966            // 更新 DeferredMap 中的任务位置(任务已在索引中)
967            if let Some(location) = self.task_index.get_mut(task_id.key()) {
968                *location = new_location;
969            }
970        }
971    }
972
973    /// Check if the timing wheel is empty
974    ///
975    /// 检查时间轮是否为空
976    #[allow(dead_code)]
977    pub fn is_empty(&self) -> bool {
978        self.task_index.is_empty()
979    }
980
981    /// Postpone timer task (keep original TaskId)
982    ///
983    /// # Parameters
984    /// - `task_id`: Task ID to postpone
985    /// - `new_delay`: New delay time (recalculated from current tick, not continuing from original delay time)
986    /// - `new_callback`: New callback function (if None, keep original callback)
987    ///
988    /// # Returns
989    /// Returns true if the task exists and is successfully postponed, otherwise returns false
990    ///
991    /// # Implementation Details
992    /// - Remove task from original layer/slot, keep its completion_notifier (will not trigger cancellation notification)
993    /// - Update delay time and callback function (if provided)
994    /// - Recalculate target layer, slot, and rounds based on new_delay
995    /// - Cross-layer migration may occur (L0 <-> L1)
996    /// - Re-insert to new position using original TaskId
997    /// - Keep consistent with external held TaskId reference
998    ///
999    /// 延期定时器任务(保留原始 TaskId)
1000    ///
1001    /// # 参数
1002    /// - `task_id`: 要延期的任务 ID
1003    /// - `new_delay`: 新延迟时间(从当前 tick 重新计算,而非从原延迟时间继续)
1004    /// - `new_callback`: 新回调函数(如果为 None,则保留原回调)
1005    ///
1006    /// # 返回值
1007    /// 如果任务存在且成功延期则返回 true,否则返回 false
1008    ///
1009    /// # 实现细节
1010    /// - 从原层级/槽位移除任务,保留其 completion_notifier(不会触发取消通知)
1011    /// - 更新延迟时间和回调函数(如果提供)
1012    /// - 根据 new_delay 重新计算目标层级、槽位和轮数
1013    /// - 可能发生跨层迁移(L0 <-> L1)
1014    /// - 使用原始 TaskId 重新插入到新位置
1015    /// - 与外部持有的 TaskId 引用保持一致
1016    #[inline]
1017    pub fn postpone(
1018        &mut self,
1019        task_id: TaskId,
1020        new_delay: Duration,
1021        new_callback: Option<crate::task::CallbackWrapper>,
1022    ) -> bool {
1023        // Step 1: Get task location from DeferredMap (don't remove)
1024        // 步骤 1: 从 DeferredMap 获取任务位置(不删除)
1025        let old_location = match self.task_index.get(task_id.key()) {
1026            Some(loc) => *loc,
1027            None => return false,
1028        };
1029
1030        // Use match to get slot reference
1031        // 使用 match 获取槽引用
1032        let slot = match old_location.level {
1033            0 => &mut self.l0.slots[old_location.slot_index],
1034            _ => &mut self.l1.slots[old_location.slot_index],
1035        };
1036
1037        // Verify task is still at expected position
1038        // 验证任务仍在预期位置
1039        if old_location.vec_index >= slot.len() || slot[old_location.vec_index].get_id() != task_id
1040        {
1041            // Index inconsistent, return failure
1042            // 索引不一致,返回失败
1043            return false;
1044        }
1045
1046        // Use swap_remove to remove task from slot
1047        // 使用 swap_remove 从槽中移除任务
1048        let mut task = slot.swap_remove(old_location.vec_index);
1049
1050        // Update swapped element's index (if a swap occurred)
1051        // 更新被交换元素的索引(如果发生了交换)
1052        if old_location.vec_index < slot.len() {
1053            let swapped_task_id = slot[old_location.vec_index].get_id();
1054            // Use DeferredMap's get_mut with TaskId key
1055            // 使用 DeferredMap 的 get_mut,传入 TaskId key
1056            if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
1057                swapped_location.vec_index = old_location.vec_index;
1058            }
1059        }
1060
1061        // Step 2: Update task's delay and callback
1062        // 步骤 2: 更新任务的延迟和回调
1063        task.update_delay(new_delay);
1064        if let Some(callback) = new_callback {
1065            task.update_callback(callback);
1066        }
1067
1068        // Step 3: Recalculate layer, slot, and rounds based on new delay
1069        // 步骤 3: 根据新延迟重新计算层级、槽位和轮数
1070        let (new_level, ticks, new_rounds) = self.determine_layer(new_delay);
1071
1072        // Use match to reduce branches, and use cached slot mask
1073        // 使用 match 减少分支,并使用缓存的槽掩码
1074        let (current_tick, slot_mask, slots) = match new_level {
1075            0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
1076            _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
1077        };
1078
1079        let total_ticks = current_tick + ticks;
1080        let new_slot_index = (total_ticks as usize) & slot_mask;
1081
1082        // Update task's timing wheel parameters
1083        // 更新任务的时间轮参数
1084        task.deadline_tick = total_ticks;
1085        task.rounds = new_rounds;
1086
1087        // Step 4: Re-insert task to new layer/slot
1088        // 步骤 4: 将任务重新插入到新层级/槽位
1089        let new_vec_index = slots[new_slot_index].len();
1090        let new_location = TaskLocation::new(new_level, new_slot_index, new_vec_index);
1091
1092        slots[new_slot_index].push(task);
1093
1094        // Update task location in DeferredMap (task already exists in index)
1095        // 更新 DeferredMap 中的任务位置(任务已在索引中)
1096        if let Some(location) = self.task_index.get_mut(task_id.key()) {
1097            *location = new_location;
1098        }
1099
1100        true
1101    }
1102
1103    /// Batch postpone timer tasks
1104    ///
1105    /// # Parameters
1106    /// - `updates`: List of tuples of (task ID, new delay)
1107    ///
1108    /// # Returns
1109    /// Number of successfully postponed tasks
1110    ///
1111    /// # Performance Advantages
1112    /// - Batch processing reduces function call overhead
1113    /// - All delays are recalculated from current_tick at call time
1114    ///
1115    /// # Notes
1116    /// - If a task ID does not exist, that task will be skipped without affecting other tasks' postponement
1117    ///
1118    /// 批量延期定时器任务
1119    ///
1120    /// # 参数
1121    /// - `updates`: (任务 ID, 新延迟) 元组列表
1122    ///
1123    /// # 返回值
1124    /// 成功延期的任务数量
1125    ///
1126    /// # 性能优势
1127    /// - 批处理减少函数调用开销
1128    /// - 所有延迟在调用时从 current_tick 重新计算
1129    ///
1130    /// # 注意
1131    /// - 如果任务 ID 不存在,该任务将被跳过,不影响其他任务的延期
1132    #[inline]
1133    pub fn postpone_batch(&mut self, updates: Vec<(TaskId, Duration)>) -> usize {
1134        let mut postponed_count = 0;
1135
1136        for (task_id, new_delay) in updates {
1137            if self.postpone(task_id, new_delay, None) {
1138                postponed_count += 1;
1139            }
1140        }
1141
1142        postponed_count
1143    }
1144
1145    /// Batch postpone timer tasks (replace callbacks)
1146    ///
1147    /// # Parameters
1148    /// - `updates`: List of tuples of (task ID, new delay, new callback)
1149    ///
1150    /// # Returns
1151    /// Number of successfully postponed tasks
1152    ///
1153    /// 批量延期定时器任务(替换回调)
1154    ///
1155    /// # 参数
1156    /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
1157    ///
1158    /// # 返回值
1159    /// 成功延期的任务数量
1160    pub fn postpone_batch_with_callbacks(
1161        &mut self,
1162        updates: Vec<(TaskId, Duration, Option<crate::task::CallbackWrapper>)>,
1163    ) -> usize {
1164        let mut postponed_count = 0;
1165
1166        for (task_id, new_delay, new_callback) in updates {
1167            if self.postpone(task_id, new_delay, new_callback) {
1168                postponed_count += 1;
1169            }
1170        }
1171
1172        postponed_count
1173    }
1174}
1175
1176#[cfg(test)]
1177mod tests {
1178    use super::*;
1179    use crate::task::{CallbackWrapper, TimerTask, TimerTaskWithCompletionNotifier};
1180
1181    #[test]
1182    fn test_wheel_creation() {
1183        let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1184        assert_eq!(wheel.slot_count(), 512);
1185        assert_eq!(wheel.current_tick(), 0);
1186        assert!(wheel.is_empty());
1187    }
1188
1189    #[test]
1190    fn test_hierarchical_wheel_creation() {
1191        let config = WheelConfig::default();
1192
1193        let wheel = Wheel::new(config, BatchConfig::default());
1194        assert_eq!(wheel.slot_count(), 512); // L0 slot count (L0 层槽数量)
1195        assert_eq!(wheel.current_tick(), 0);
1196        assert!(wheel.is_empty());
1197        // L1 layer always exists in hierarchical mode (L1 层在分层模式下始终存在)
1198        assert_eq!(wheel.l1.slot_count, 64);
1199        assert_eq!(wheel.l1_tick_ratio, 100); // 1000ms / 10ms = 100 ticks (1000毫秒 / 10毫秒 = 100个tick)
1200    }
1201
1202    #[test]
1203    fn test_hierarchical_config_validation() {
1204        // L1 tick must be an integer multiple of L0 tick (L1 tick 必须是 L0 tick 的整数倍)
1205        let result = WheelConfig::builder()
1206            .l0_tick_duration(Duration::from_millis(10))
1207            .l0_slot_count(512)
1208            .l1_tick_duration(Duration::from_millis(15)) // Not an integer multiple (不是整数倍)
1209            .l1_slot_count(64)
1210            .build();
1211
1212        assert!(result.is_err());
1213
1214        // Correct configuration (正确的配置)
1215        let result = WheelConfig::builder()
1216            .l0_tick_duration(Duration::from_millis(10))
1217            .l0_slot_count(512)
1218            .l1_tick_duration(Duration::from_secs(1)) // 1000ms / 10ms = 100 ticks (1000毫秒 / 10毫秒 = 100个tick)
1219            .l1_slot_count(64)
1220            .build();
1221
1222        assert!(result.is_ok());
1223    }
1224
1225    #[test]
1226    fn test_layer_determination() {
1227        let config = WheelConfig::default();
1228
1229        let wheel = Wheel::new(config, BatchConfig::default());
1230
1231        // Short delay should enter L0 layer (短延迟应该进入 L0 层)
1232        // L0: 512 slots * 10ms = 5120ms
1233        let (level, _, rounds) = wheel.determine_layer(Duration::from_millis(100));
1234        assert_eq!(level, 0);
1235        assert_eq!(rounds, 0);
1236
1237        // Long delay should enter L1 layer
1238        // 超过 L0 范围(>5120ms) (超过 L0 范围 (>5120毫秒))
1239        let (level, _, rounds) = wheel.determine_layer(Duration::from_secs(10));
1240        assert_eq!(level, 1);
1241        assert_eq!(rounds, 0);
1242
1243        // Long delay should enter L1 layer and have rounds
1244        // L1: 64 slots * 1000ms = 64000ms (L1: 64个槽 * 1000毫秒 = 64000毫秒)
1245        let (level, _, rounds) = wheel.determine_layer(Duration::from_secs(120));
1246        assert_eq!(level, 1);
1247        assert!(rounds > 0);
1248    }
1249
1250    #[test]
1251    fn test_hierarchical_insert_and_advance() {
1252        let config = WheelConfig::default();
1253
1254        let mut wheel = Wheel::new(config, BatchConfig::default());
1255
1256        // Insert short delay task into L0 (插入短延迟任务到 L0)
1257        let callback = CallbackWrapper::new(|| async {});
1258        let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
1259        let (task_with_notifier, _completion_rx) =
1260            TimerTaskWithCompletionNotifier::from_timer_task(task);
1261        let handle = wheel.allocate_handle();
1262        let task_id = handle.task_id();
1263        wheel.insert(handle, task_with_notifier);
1264
1265        // Verify task is in L0 layer (验证任务在 L0 层)
1266        let location = wheel.task_index.get(task_id.key()).unwrap();
1267        assert_eq!(location.level, 0);
1268
1269        // Advance 10 ticks (100ms) (前进 10 个 tick (100毫秒))
1270        for _ in 0..10 {
1271            let expired = wheel.advance();
1272            if !expired.is_empty() {
1273                assert_eq!(expired.len(), 1);
1274                assert_eq!(expired[0].id, task_id);
1275                return;
1276            }
1277        }
1278        panic!("Task should have expired");
1279    }
1280
1281    #[test]
1282    fn test_cross_layer_cancel() {
1283        let config = WheelConfig::default();
1284
1285        let mut wheel = Wheel::new(config, BatchConfig::default());
1286
1287        // Insert L0 task (插入 L0 任务)
1288        let callback1 = CallbackWrapper::new(|| async {});
1289        let task1 = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback1));
1290        let (task_with_notifier1, _rx1) = TimerTaskWithCompletionNotifier::from_timer_task(task1);
1291        let handle1 = wheel.allocate_handle();
1292        let task_id1 = handle1.task_id();
1293        wheel.insert(handle1, task_with_notifier1);
1294
1295        // Insert L1 task (插入 L1 任务)
1296        let callback2 = CallbackWrapper::new(|| async {});
1297        let task2 = TimerTask::new_oneshot(Duration::from_secs(10), Some(callback2));
1298        let (task_with_notifier2, _rx2) = TimerTaskWithCompletionNotifier::from_timer_task(task2);
1299        let handle2 = wheel.allocate_handle();
1300        let task_id2 = handle2.task_id();
1301        wheel.insert(handle2, task_with_notifier2);
1302
1303        // Verify levels (验证层级)
1304        assert_eq!(wheel.task_index.get(task_id1.key()).unwrap().level, 0);
1305        assert_eq!(wheel.task_index.get(task_id2.key()).unwrap().level, 1);
1306
1307        // Cancel L0 task (取消 L0 任务)
1308        assert!(wheel.cancel(task_id1));
1309        assert!(wheel.task_index.get(task_id1.key()).is_none());
1310
1311        // Cancel L1 task (取消 L1 任务)
1312        assert!(wheel.cancel(task_id2));
1313        assert!(wheel.task_index.get(task_id2.key()).is_none());
1314
1315        assert!(wheel.is_empty()); // 时间轮应该为空
1316    }
1317
1318    #[test]
1319    fn test_delay_to_ticks() {
1320        let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1321        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(100)), 10);
1322        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(50)), 5);
1323        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(1)), 1); // Minimum 1 tick (最小 1 个 tick)
1324    }
1325
1326    #[test]
1327    fn test_wheel_invalid_slot_count() {
1328        let result = WheelConfig::builder().l0_slot_count(100).build();
1329        assert!(result.is_err());
1330        if let Err(crate::error::TimerError::InvalidSlotCount { slot_count, reason }) = result {
1331            assert_eq!(slot_count, 100);
1332            assert_eq!(reason, "L0 layer slot count must be power of 2"); // L0 层槽数量必须是 2 的幂
1333        } else {
1334            panic!("Expected InvalidSlotCount error"); // 期望 InvalidSlotCount 错误
1335        }
1336    }
1337
1338    #[test]
1339    fn test_minimum_delay() {
1340        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1341
1342        // Test minimum delay (delays less than 1 tick should be rounded up to 1 tick) (测试最小延迟 (延迟小于 1 个 tick 应该向上舍入到 1 个 tick))
1343        let callback = CallbackWrapper::new(|| async {});
1344        let task = TimerTask::new_oneshot(Duration::from_millis(1), Some(callback));
1345        let (task_with_notifier, _completion_rx) =
1346            TimerTaskWithCompletionNotifier::from_timer_task(task);
1347        let handle = wheel.allocate_handle();
1348        let task_id: TaskId = handle.task_id();
1349        wheel.insert(handle, task_with_notifier);
1350
1351        // Advance 1 tick, task should trigger (前进 1 个 tick,任务应该触发)
1352        let expired = wheel.advance();
1353        assert_eq!(
1354            expired.len(),
1355            1,
1356            "Minimum delay task should be triggered after 1 tick"
1357        ); // 最小延迟任务应该在 1 个 tick 后触发
1358        assert_eq!(expired[0].id, task_id);
1359    }
1360
1361    #[test]
1362    fn test_advance_empty_slots() {
1363        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1364
1365        // Do not insert any tasks, advance multiple ticks (不插入任何任务,前进多个tick)
1366        for _ in 0..100 {
1367            let expired = wheel.advance();
1368            assert!(
1369                expired.is_empty(),
1370                "Empty slots should not return any tasks"
1371            );
1372        }
1373
1374        assert_eq!(
1375            wheel.current_tick(),
1376            100,
1377            "current_tick should correctly increment"
1378        ); // current_tick 应该正确递增
1379    }
1380
1381    #[test]
1382    fn test_slot_boundary() {
1383        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1384
1385        // Test slot boundary and wraparound (测试槽边界和环绕)
1386        // 第一个任务:延迟 10ms(1 tick),应该在 slot 1 触发
1387        let callback1 = CallbackWrapper::new(|| async {});
1388        let task1 = TimerTask::new_oneshot(Duration::from_millis(10), Some(callback1));
1389        let (task_with_notifier1, _rx1) = TimerTaskWithCompletionNotifier::from_timer_task(task1);
1390        let handle1 = wheel.allocate_handle();
1391        let task_id_1 = handle1.task_id();
1392        wheel.insert(handle1, task_with_notifier1);
1393
1394        // Second task: delay 5110ms (511 ticks), should trigger on slot 511 (第二个任务:延迟 5110毫秒 (511个tick),应该在槽 511 触发)
1395        let callback2 = CallbackWrapper::new(|| async {});
1396        let task2 = TimerTask::new_oneshot(Duration::from_millis(5110), Some(callback2));
1397        let (task_with_notifier2, _rx2) = TimerTaskWithCompletionNotifier::from_timer_task(task2);
1398        let handle2 = wheel.allocate_handle();
1399        let task_id_2 = handle2.task_id();
1400        wheel.insert(handle2, task_with_notifier2);
1401
1402        // Advance 1 tick, first task should trigger (前进 1 个 tick,第一个任务应该触发)
1403        let expired = wheel.advance();
1404        assert_eq!(expired.len(), 1, "First task should trigger on tick 1"); // 第一个任务应该在第 1 个 tick 触发
1405        assert_eq!(expired[0].id, task_id_1);
1406
1407        // Continue advancing to 511 ticks (from tick 1 to tick 511), second task should trigger (继续前进 511个tick (从tick 1到tick 511),第二个任务应该触发)
1408        let mut triggered = false;
1409        for i in 0..510 {
1410            let expired = wheel.advance();
1411            if !expired.is_empty() {
1412                assert_eq!(
1413                    expired.len(),
1414                    1,
1415                    "The {}th advance should trigger the second task",
1416                    i + 2
1417                ); // 第 {i + 2} 次前进应该触发第二个任务
1418                assert_eq!(expired[0].id, task_id_2);
1419                triggered = true;
1420                break;
1421            }
1422        }
1423        assert!(triggered, "Second task should trigger on tick 511"); // 第二个任务应该在第 511 个 tick 触发
1424
1425        assert!(wheel.is_empty(), "All tasks should have been triggered"); // 所有任务都应该被触发
1426    }
1427
1428    #[test]
1429    fn test_task_id_uniqueness() {
1430        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1431
1432        // Insert multiple tasks, verify TaskId uniqueness (插入多个任务,验证 TaskId 唯一性)
1433        let mut task_ids = std::collections::HashSet::new();
1434        for _ in 0..100 {
1435            let callback = CallbackWrapper::new(|| async {});
1436            let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
1437            let (task_with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1438            let handle = wheel.allocate_handle();
1439            let task_id = handle.task_id();
1440            wheel.insert(handle, task_with_notifier);
1441
1442            assert!(task_ids.insert(task_id), "TaskId should be unique"); // TaskId 应该唯一
1443        }
1444
1445        assert_eq!(task_ids.len(), 100);
1446    }
1447}