kestrel_timer/
wheel.rs

1use crate::CallbackWrapper;
2use crate::config::{BatchConfig, WheelConfig};
3use crate::task::{TaskCompletion, TaskId, TaskLocation, TaskTypeWithCompletionNotifier, TimerTaskForWheel, TimerTaskWithCompletionNotifier};
4use rustc_hash::FxHashMap;
5use std::time::Duration;
6
7pub struct WheelAdvanceResult {
8    pub id: TaskId,
9    pub callback: Option<CallbackWrapper>
10}
11
12/// Timing wheel single layer data structure
13/// 
14/// 时间轮单层数据结构
15struct WheelLayer {
16    /// Slot array, each slot stores a group of timer tasks
17    /// 
18    /// 槽数组,每个槽存储一组定时器任务
19    slots: Vec<Vec<TimerTaskForWheel>>,
20    
21    /// Current time pointer (tick index)
22    /// 
23    /// 当前时间指针(tick 索引)
24    current_tick: u64,
25    
26    /// Slot count
27    /// 
28    /// 槽数量
29    slot_count: usize,
30    
31    /// Duration of each tick
32    /// 
33    /// 每个 tick 的持续时间
34    tick_duration: Duration,
35    
36    /// Cache: tick duration in milliseconds (u64) - avoid repeated conversion
37    /// 
38    /// 缓存:tick 持续时间(毫秒,u64)- 避免重复转换
39    tick_duration_ms: u64,
40    
41    /// Cache: slot mask (slot_count - 1) - for fast modulo operation
42    /// 
43    /// 缓存:槽掩码(slot_count - 1)- 用于快速取模运算
44    slot_mask: usize,
45}
46
47impl WheelLayer {
48    /// Create a new wheel layer
49    /// 
50    /// 创建一个新的时间轮层
51    fn new(slot_count: usize, tick_duration: Duration) -> Self {
52        let mut slots = Vec::with_capacity(slot_count);
53        // Pre-allocate capacity for each slot to reduce subsequent reallocation during push
54        // Most slots typically contain 0-4 tasks, pre-allocate capacity of 4
55        // 
56        // 为每个槽预分配容量以减少后续 push 时的重新分配
57        // 大多数槽通常包含 0-4 个任务,预分配容量为 4
58        for _ in 0..slot_count {
59            slots.push(Vec::with_capacity(4));
60        }
61        
62        let tick_duration_ms = tick_duration.as_millis() as u64;
63        let slot_mask = slot_count - 1;
64        
65        Self {
66            slots,
67            current_tick: 0,
68            slot_count,
69            tick_duration,
70            tick_duration_ms,
71            slot_mask,
72        }
73    }
74    
75    /// Calculate the number of ticks corresponding to the delay
76    /// 
77    /// 计算延迟对应的 tick 数量
78    fn delay_to_ticks(&self, delay: Duration) -> u64 {
79        let ticks = delay.as_millis() as u64 / self.tick_duration.as_millis() as u64;
80        ticks.max(1) // at least 1 tick (至少 1 个 tick)
81    }
82}
83
84/// Timing wheel data structure (hierarchical mode)
85/// 
86/// 时间轮数据结构(分层模式)
87pub struct Wheel {
88    /// L0 layer (bottom layer)
89    /// 
90    /// L0 层(底层)
91    l0: WheelLayer,
92    
93    /// L1 layer (top layer)
94    /// 
95    /// L1 层(顶层)
96    l1: WheelLayer,
97    
98    /// L1 tick ratio relative to L0 tick
99    /// 
100    /// L1 tick 相对于 L0 tick 的比率
101    l1_tick_ratio: u64,
102    
103    /// Task index for fast lookup and cancellation
104    /// 
105    /// 任务索引,用于快速查找和取消
106    task_index: FxHashMap<TaskId, TaskLocation>,
107    
108    /// Batch processing configuration
109    /// 
110    /// 批处理配置
111    batch_config: BatchConfig,
112    
113    /// Cache: L0 layer capacity in milliseconds - avoid repeated calculation
114    /// 
115    /// 缓存:L0 层容量(毫秒)- 避免重复计算
116    l0_capacity_ms: u64,
117    
118    /// Cache: L1 layer capacity in ticks - avoid repeated calculation
119    /// 
120    /// 缓存:L1 层容量(tick 数)- 避免重复计算
121    l1_capacity_ticks: u64,
122}
123
124impl Wheel {
125    /// Create new timing wheel
126    ///
127    /// # Parameters
128    /// - `config`: Timing wheel configuration (already validated)
129    /// - `batch_config`: Batch processing configuration
130    ///
131    /// # Notes
132    /// Configuration parameters have been validated in WheelConfig::builder().build(), so this method will not fail.
133    /// 
134    /// 创建新的时间轮
135    ///
136    /// # 参数
137    /// - `config`: 时间轮配置(已验证)
138    /// - `batch_config`: 批处理配置
139    ///
140    /// # 注意
141    /// 配置参数已在 WheelConfig::builder().build() 中验证,因此此方法不会失败。
142    pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
143        let l0 = WheelLayer::new(config.l0_slot_count, config.l0_tick_duration);
144        let l1 = WheelLayer::new(config.l1_slot_count, config.l1_tick_duration);
145        
146        // Calculate L1 tick ratio relative to L0 tick
147        // 计算 L1 tick 相对于 L0 tick 的比率
148        let l1_tick_ratio = l1.tick_duration_ms / l0.tick_duration_ms;
149        
150        // Pre-calculate capacity to avoid repeated calculation in insert
151        // 预计算容量,避免在 insert 中重复计算
152        let l0_capacity_ms = (l0.slot_count as u64) * l0.tick_duration_ms;
153        let l1_capacity_ticks = l1.slot_count as u64;
154        
155        Self {
156            l0,
157            l1,
158            l1_tick_ratio,
159            task_index: FxHashMap::default(),
160            batch_config,
161            l0_capacity_ms,
162            l1_capacity_ticks,
163        }
164    }
165
166    /// Get current tick (L0 layer tick)
167    /// 
168    /// 获取当前 tick(L0 层 tick)
169    #[allow(dead_code)]
170    pub fn current_tick(&self) -> u64 {
171        self.l0.current_tick
172    }
173
174    /// Get tick duration (L0 layer tick duration)
175    /// 
176    /// 获取 tick 持续时间(L0 层 tick 持续时间)
177    #[allow(dead_code)]
178    pub fn tick_duration(&self) -> Duration {
179        self.l0.tick_duration
180    }
181
182    /// Get slot count (L0 layer slot count)
183    /// 
184    /// 获取槽数量(L0 层槽数量)
185    #[allow(dead_code)]
186    pub fn slot_count(&self) -> usize {
187        self.l0.slot_count
188    }
189
190    /// Calculate the number of ticks corresponding to the delay (based on L0 layer)
191    /// 
192    /// 计算延迟对应的 tick 数量(基于 L0 层)
193    #[allow(dead_code)]
194    pub fn delay_to_ticks(&self, delay: Duration) -> u64 {
195        self.l0.delay_to_ticks(delay)
196    }
197    
198    /// Determine which layer the delay should be inserted into
199    ///
200    /// # Returns
201    /// Returns: (layer, ticks, rounds)
202    /// - Layer: 0 = L0, 1 = L1
203    /// - Ticks: number of ticks calculated from current tick
204    /// - Rounds: number of rounds (only used for very long delays in L1 layer)
205    /// 
206    /// 确定延迟应该插入到哪一层
207    ///
208    /// # 返回值
209    /// 返回:(层级, ticks, 轮数)
210    /// - 层级:0 = L0, 1 = L1
211    /// - Ticks:从当前 tick 计算的 tick 数量
212    /// - 轮数:轮数(仅用于 L1 层的超长延迟)
213    #[inline(always)]
214    fn determine_layer(&self, delay: Duration) -> (u8, u64, u32) {
215        let delay_ms = delay.as_millis() as u64;
216        
217        // Fast path: most tasks are within L0 range (using cached capacity)
218        // 快速路径:大多数任务在 L0 范围内(使用缓存的容量)
219        if delay_ms < self.l0_capacity_ms {
220            let l0_ticks = (delay_ms / self.l0.tick_duration_ms).max(1);
221            return (0, l0_ticks, 0);
222        }
223        
224        // Slow path: L1 layer tasks (using cached values)
225        // 慢速路径:L1 层任务(使用缓存的值)
226        let l1_ticks = (delay_ms / self.l1.tick_duration_ms).max(1);
227        
228        if l1_ticks < self.l1_capacity_ticks {
229            (1, l1_ticks, 0)
230        } else {
231            let rounds = (l1_ticks / self.l1_capacity_ticks) as u32;
232            (1, l1_ticks, rounds)
233        }
234    }
235
236    /// Insert timer task
237    ///
238    /// # Parameters
239    /// - `task`: Timer task
240    /// - `notifier`: Completion notifier (used to send notifications when tasks expire or are cancelled)
241    ///
242    /// # Returns
243    /// Unique identifier of the task (TaskId)
244    ///
245    /// # Implementation Details
246    /// - Automatically calculate the layer and slot where the task should be inserted
247    /// - Hierarchical mode: short delay tasks are inserted into L0, long delay tasks are inserted into L1
248    /// - Use bit operations to optimize slot index calculation
249    /// - Maintain task index to support O(1) lookup and cancellation
250    /// 
251    /// 插入定时器任务
252    ///
253    /// # 参数
254    /// - `task`: 定时器任务
255    /// - `notifier`: 完成通知器(用于在任务过期或取消时发送通知)
256    ///
257    /// # 返回值
258    /// 任务的唯一标识符(TaskId)
259    ///
260    /// # 实现细节
261    /// - 自动计算任务应该插入的层级和槽位
262    /// - 分层模式:短延迟任务插入 L0,长延迟任务插入 L1
263    /// - 使用位运算优化槽索引计算
264    /// - 维护任务索引以支持 O(1) 查找和取消
265    #[inline]
266    pub fn insert(&mut self, task: TimerTaskWithCompletionNotifier) -> TaskId {
267        let (level, ticks, rounds) = self.determine_layer(task.delay);
268        
269        // Use match to reduce branches, and use cached slot mask
270        // 使用 match 减少分支,并使用缓存的槽掩码
271        let (current_tick, slot_mask, slots) = match level {
272            0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
273            _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
274        };
275        
276        let total_ticks = current_tick + ticks;
277        let slot_index = (total_ticks as usize) & slot_mask;
278
279        let task = TimerTaskForWheel::new(task, total_ticks, rounds);
280
281        let task_id = task.get_id();
282        
283        // Get the index position of the task in Vec (the length before insertion is the index of the new task)
284        // 获取任务在 Vec 中的索引位置(插入前的长度就是新任务的索引)
285        let vec_index = slots[slot_index].len();
286        let location = TaskLocation::new(level, slot_index, vec_index);
287
288        // Insert task into slot
289        // 将任务插入槽中
290        slots[slot_index].push(task);
291        
292        // Record task location
293        // 记录任务位置
294        self.task_index.insert(task_id, location);
295
296        task_id
297    }
298
299    /// Batch insert timer tasks
300    ///
301    /// # Parameters
302    /// - `tasks`: List of tuples of (task, completion notifier)
303    ///
304    /// # Returns
305    /// List of task IDs
306    ///
307    /// # Performance Advantages
308    /// - Reduce repeated boundary checks and capacity adjustments
309    /// - For tasks with the same delay, calculation results can be reused
310    /// 
311    /// 批量插入定时器任务
312    ///
313    /// # 参数
314    /// - `tasks`: (任务, 完成通知器) 元组列表
315    ///
316    /// # 返回值
317    /// 任务 ID 列表
318    ///
319    /// # 性能优势
320    /// - 减少重复的边界检查和容量调整
321    /// - 对于相同延迟的任务,可以重用计算结果
322    #[inline]
323    pub fn insert_batch(&mut self, tasks: Vec<TimerTaskWithCompletionNotifier>) -> Vec<TaskId> {
324        let task_count = tasks.len();
325        
326        // Optimize: pre-allocate HashMap capacity to avoid reallocation
327        // 优化:预分配 HashMap 容量以避免重新分配
328        self.task_index.reserve(task_count);
329        
330        let mut task_ids = Vec::with_capacity(task_count);
331        
332        for task in tasks {
333            let (level, ticks, rounds) = self.determine_layer(task.delay);
334            
335            // Use match to reduce branches, and use cached slot mask
336            // 使用 match 减少分支,并使用缓存的槽掩码
337            let (current_tick, slot_mask, slots) = match level {
338                0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
339                _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
340            };
341            
342            let total_ticks = current_tick + ticks;
343            let slot_index = (total_ticks as usize) & slot_mask;
344
345            let task = TimerTaskForWheel::new(task, total_ticks, rounds);
346
347            let task_id = task.get_id();
348            
349            // Get the index position of the task in Vec
350            // 获取任务在 Vec 中的索引位置
351            let vec_index = slots[slot_index].len();
352            let location = TaskLocation::new(level, slot_index, vec_index);
353
354            // Insert task into slot
355            // 将任务插入槽中
356            slots[slot_index].push(task);
357            
358            // Record task location
359            // 记录任务位置
360            self.task_index.insert(task_id, location);
361            
362            task_ids.push(task_id);
363        }
364        
365        task_ids
366    }
367
368    /// Cancel timer task
369    ///
370    /// # Parameters
371    /// - `task_id`: Task ID
372    ///
373    /// # Returns
374    /// Returns true if the task exists and is successfully cancelled, otherwise returns false
375    /// 
376    /// 取消定时器任务
377    ///
378    /// # 参数
379    /// - `task_id`: 任务 ID
380    ///
381    /// # 返回值
382    /// 如果任务存在且成功取消则返回 true,否则返回 false
383    #[inline]
384    pub fn cancel(&mut self, task_id: TaskId) -> bool {
385        // Remove task location from index
386        // 从索引中移除任务位置
387        let location = match self.task_index.remove(&task_id) {
388            Some(loc) => loc,
389            None => return false,
390        };
391        
392        // Use match to get slot reference, reduce branches
393        // 使用 match 获取槽引用,减少分支
394        let slot = match location.level {
395            0 => &mut self.l0.slots[location.slot_index],
396            _ => &mut self.l1.slots[location.slot_index],
397        };
398        
399        // Boundary check and ID verification
400        // 边界检查和 ID 验证
401        if location.vec_index >= slot.len() || slot[location.vec_index].get_id() != task_id {
402            // Index inconsistent, re-insert location to maintain data consistency
403            // 索引不一致,重新插入位置以保持数据一致性
404            self.task_index.insert(task_id, location);
405            return false;
406        }
407        // Use swap_remove to remove task, record swapped task ID
408        // 使用 swap_remove 移除任务,记录被交换的任务 ID
409        let removed_task = slot.swap_remove(location.vec_index);
410        
411        // Ensure the correct task was removed
412        // 确保移除了正确的任务
413        debug_assert_eq!(removed_task.get_id(), task_id);
414
415        match removed_task.into_task_type() {
416            TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
417                // Use Notify + AtomicU8 for zero-allocation notification
418                // 使用 Notify + AtomicU8 实现零分配通知
419                completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
420            }
421            TaskTypeWithCompletionNotifier::Periodic { completion_notifier, .. } => {
422                // Use flume for high-performance periodic notification
423                // 使用 flume 实现高性能周期通知
424                let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
425            }
426        }
427        
428        // If a swap occurred (vec_index is not the last element)
429        // 如果发生了交换(vec_index 不是最后一个元素)
430        if location.vec_index < slot.len() {
431            let swapped_task_id = slot[location.vec_index].get_id();
432            // Update swapped element's index in one go, avoid another HashMap query
433            // 一次性更新被交换元素的索引,避免再次查询 HashMap
434            if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
435                swapped_location.vec_index = location.vec_index;
436            }
437        }
438        
439        true
440    }
441
442    /// Batch cancel timer tasks
443    ///
444    /// # Parameters
445    /// - `task_ids`: List of task IDs to cancel
446    ///
447    /// # Returns
448    /// Number of successfully cancelled tasks
449    ///
450    /// # Performance Advantages
451    /// - Reduce repeated HashMap lookup overhead
452    /// - Multiple cancellation operations on the same slot can be batch processed
453    /// - Use unstable sort to improve performance
454    /// - Small batch optimization: skip sorting based on configuration threshold, process directly
455    /// 
456    /// 批量取消定时器任务
457    ///
458    /// # 参数
459    /// - `task_ids`: 要取消的任务 ID 列表
460    ///
461    /// # 返回值
462    /// 成功取消的任务数量
463    ///
464    /// # 性能优势
465    /// - 减少重复的 HashMap 查找开销
466    /// - 同一槽位的多个取消操作可以批量处理
467    /// - 使用不稳定排序提高性能
468    /// - 小批量优化:根据配置阈值跳过排序,直接处理
469    #[inline]
470    pub fn cancel_batch(&mut self, task_ids: &[TaskId]) -> usize {
471        let mut cancelled_count = 0;
472        
473        // Small batch optimization: cancel one by one to avoid grouping and sorting overhead
474        // 小批量优化:逐个取消以避免分组和排序开销
475        if task_ids.len() <= self.batch_config.small_batch_threshold {
476            for &task_id in task_ids {
477                if self.cancel(task_id) {
478                    cancelled_count += 1;
479                }
480            }
481            return cancelled_count;
482        }
483        
484        // Group by layer and slot to optimize batch cancellation
485        // Use SmallVec to avoid heap allocation in most cases
486        // 按层级和槽位分组以优化批量取消
487        // 使用 SmallVec 避免在大多数情况下进行堆分配
488        let l0_slot_count = self.l0.slot_count;
489        let l1_slot_count = self.l1.slot_count;
490        
491        let mut l0_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l0_slot_count];
492        let mut l1_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l1_slot_count];
493        
494        // Collect information of tasks to be cancelled
495        // 收集要取消的任务信息
496        for &task_id in task_ids {
497            if let Some(location) = self.task_index.get(&task_id) {
498                if location.level == 0 {
499                    l0_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
500                } else {
501                    l1_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
502                }
503            }
504        }
505        
506        // Process L0 layer cancellation
507        // 处理 L0 层取消
508        for (slot_index, tasks) in l0_tasks_by_slot.iter_mut().enumerate() {
509            if tasks.is_empty() {
510                continue;
511            }
512            
513            // Sort by vec_index in descending order, delete from back to front to avoid index invalidation
514            // 按 vec_index 降序排序,从后向前删除以避免索引失效
515            tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
516            
517            let slot = &mut self.l0.slots[slot_index];
518            
519            for &(task_id, vec_index) in tasks.iter() {
520                if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
521                    let removed_task = slot.swap_remove(vec_index);
522                    match removed_task.into_task_type() {
523                        TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
524                            // Use Notify + AtomicU8 for zero-allocation notification
525                            // 使用 Notify + AtomicU8 实现零分配通知
526                            completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
527                        }
528                        TaskTypeWithCompletionNotifier::Periodic { completion_notifier, .. } => {
529                            // Use flume for high-performance periodic notification
530                            // 使用 flume 实现高性能周期通知
531                            let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
532                        }
533                    }
534                    
535                    
536                    if vec_index < slot.len() {
537                        let swapped_task_id = slot[vec_index].get_id();
538                        if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
539                            swapped_location.vec_index = vec_index;
540                        }
541                    }
542                    
543                    self.task_index.remove(&task_id);
544                    cancelled_count += 1;
545                }
546            }
547        }
548        
549        // Process L1 layer cancellation
550        // 处理 L1 层取消
551        for (slot_index, tasks) in l1_tasks_by_slot.iter_mut().enumerate() {
552            if tasks.is_empty() {
553                continue;
554            }
555            
556            tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
557            
558            let slot = &mut self.l1.slots[slot_index];
559            
560            for &(task_id, vec_index) in tasks.iter() {
561                if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
562            
563                    let removed_task = slot.swap_remove(vec_index);
564                    
565                    match removed_task.into_task_type() {
566                        TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
567                            // Use Notify + AtomicU8 for zero-allocation notification
568                            // 使用 Notify + AtomicU8 实现零分配通知
569                            completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
570                        }
571                        TaskTypeWithCompletionNotifier::Periodic { completion_notifier, .. } => {
572                            // Use flume for high-performance periodic notification
573                            // 使用 flume 实现高性能周期通知
574                            let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
575                        }
576                    }
577                    
578                    if vec_index < slot.len() {
579                        let swapped_task_id = slot[vec_index].get_id();
580                        if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
581                            swapped_location.vec_index = vec_index;
582                        }
583                    }
584                    
585                    self.task_index.remove(&task_id);
586                    cancelled_count += 1;
587                }
588            }
589        }
590        
591        cancelled_count
592    }
593
594    /// Reinsert periodic task with the same TaskId
595    /// 
596    /// Used to automatically reschedule periodic tasks after they expire
597    /// 
598    /// 重新插入周期性任务(保持相同的 TaskId)
599    /// 
600    /// 用于在周期性任务过期后自动重新调度
601    fn reinsert_periodic_task(
602        &mut self,
603        periodic_task: TimerTaskWithCompletionNotifier,
604    ) {
605        let task_id = periodic_task.get_id();
606
607        // Determine which layer the interval should be inserted into
608        // 确定间隔应该插入到哪一层
609        let (level, ticks, rounds) = self.determine_layer(periodic_task.get_interval().unwrap());
610        
611        // Use match to reduce branches, and use cached slot mask
612        // 使用 match 减少分支,并使用缓存的槽掩码
613        let (current_tick, slot_mask, slots) = match level {
614            0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
615            _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
616        };
617        
618        let total_ticks = current_tick + ticks;
619        let slot_index = (total_ticks as usize) & slot_mask;
620        
621        // Create new task with the same TaskId
622        // 创建新任务,使用相同的 TaskId
623        let task = TimerTaskForWheel::new(periodic_task, total_ticks, rounds);
624        
625        // Get the index position of the task in Vec
626        // 获取任务在 Vec 中的索引位置
627        let vec_index = slots[slot_index].len();
628        let location = TaskLocation::new(level, slot_index, vec_index);
629        
630        // Insert task into slot
631        // 将任务插入槽中
632        slots[slot_index].push(task);
633        
634        // Record task location
635        // 记录任务位置
636        self.task_index.insert(task_id, location);
637    }
638
639    /// Advance the timing wheel by one tick, return all expired tasks
640    ///
641    /// # Returns
642    /// List of expired tasks
643    ///
644    /// # Implementation Details
645    /// - L0 layer advances 1 tick each time (no rounds check)
646    /// - L1 layer advances once every (L1_tick / L0_tick) times
647    /// - L1 expired tasks are batch demoted to L0
648    /// 
649    /// 推进时间轮一个 tick,返回所有过期的任务
650    ///
651    /// # 返回值
652    /// 过期任务列表
653    ///
654    /// # 实现细节
655    /// - L0 层每次推进 1 个 tick(无轮数检查)
656    /// - L1 层每 (L1_tick / L0_tick) 次推进一次
657    /// - L1 层过期任务批量降级到 L0
658    pub fn advance(&mut self) -> Vec<WheelAdvanceResult> {
659        // Advance L0 layer
660        // 推进 L0 层
661        self.l0.current_tick += 1;
662        
663        let mut expired_tasks = Vec::new();
664        
665        // Process L0 layer expired tasks
666        // Distinguish between one-shot and periodic tasks
667        // 处理 L0 层过期任务
668        // 区分一次性任务和周期性任务
669        let l0_slot_index = (self.l0.current_tick as usize) & self.l0.slot_mask;
670        
671        // Collect periodic tasks to reinsert
672        // 收集需要重新插入的周期性任务
673        let mut periodic_tasks_to_reinsert = Vec::new();
674        
675        {
676            let l0_slot = &mut self.l0.slots[l0_slot_index];
677            
678            let i = 0;
679            while i < l0_slot.len() {
680                // Remove from index
681                // 从索引中移除
682                let task_id = l0_slot[i].get_id();
683                self.task_index.remove(&task_id);
684                
685                // Use swap_remove to remove task
686                // 使用 swap_remove 移除任务
687                let task_with_notifier = l0_slot.swap_remove(i);
688                
689                // Update swapped element's index
690                // 更新被交换元素的索引
691                if i < l0_slot.len() {
692                    let swapped_task_id = l0_slot[i].get_id();
693                    if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
694                        swapped_location.vec_index = i;
695                    }
696                }
697
698
699                let TimerTaskForWheel { task, .. } = task_with_notifier;
700
701                match task.task_type {
702                    TaskTypeWithCompletionNotifier::Periodic { interval, completion_notifier } => {
703                        // Use flume for high-performance periodic notification
704                        // 使用 flume 实现高性能周期通知
705                        let _ = completion_notifier.0.try_send(TaskCompletion::Called);
706                        
707                        periodic_tasks_to_reinsert.push(TimerTaskWithCompletionNotifier {
708                            id: task.id,
709                            task_type: TaskTypeWithCompletionNotifier::Periodic { interval, completion_notifier },
710                            delay: task.delay,
711                            callback: task.callback.clone(), 
712                        });
713                    }
714                    TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
715                        // Use Notify + AtomicU8 for zero-allocation notification
716                        // 使用 Notify + AtomicU8 实现零分配通知
717                        completion_notifier.notify(crate::task::TaskCompletion::Called);
718                    }
719                }
720
721                expired_tasks.push(WheelAdvanceResult {
722                    id: task_id,
723                    callback: task.callback,
724                });
725                
726                // Don't increment i, as we've removed the current element
727                // 不增加 i,因为我们已经移除了当前元素
728            }
729        }
730        
731        // Reinsert periodic tasks for next interval
732        // 重新插入周期性任务到下一个周期
733        for task_type in periodic_tasks_to_reinsert {
734            self.reinsert_periodic_task(task_type);
735        }
736        
737        // Process L1 layer
738        // Check if L1 layer needs to be advanced
739        // 处理 L1 层
740        // 检查是否需要推进 L1 层
741        if self.l0.current_tick % self.l1_tick_ratio == 0 {
742            self.l1.current_tick += 1;
743            let l1_slot_index = (self.l1.current_tick as usize) & self.l1.slot_mask;
744            let l1_slot = &mut self.l1.slots[l1_slot_index];
745            
746            // Collect L1 layer expired tasks
747            // 收集 L1 层过期任务
748            let mut tasks_to_demote = Vec::new();
749            let mut i = 0;
750            while i < l1_slot.len() {
751                let task = &mut l1_slot[i];
752                
753                if task.rounds > 0 {
754                    // Still has rounds, decrease rounds and keep
755                    // 还有轮数,减少轮数并保留
756                    task.rounds -= 1;
757                    if let Some(location) = self.task_index.get_mut(&task.get_id()) {
758                        location.vec_index = i;
759                    }
760                    i += 1;
761                } else {
762                    // rounds = 0, need to demote to L0
763                    // rounds = 0,需要降级到 L0
764                    self.task_index.remove(&task.get_id());
765                    let task_to_demote = l1_slot.swap_remove(i);
766                    
767                    if i < l1_slot.len() {
768                        let swapped_task_id = l1_slot[i].get_id();
769                        if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
770                            swapped_location.vec_index = i;
771                        }
772                    }
773                    
774                    tasks_to_demote.push(task_to_demote);
775                }
776            }
777            
778            // Demote tasks to L0
779            // 将任务降级到 L0
780            self.demote_tasks(tasks_to_demote);
781        }
782        
783        expired_tasks
784    }
785    
786    /// Demote tasks from L1 to L0
787    /// 
788    /// Recalculate and insert L1 expired tasks into L0 layer
789    /// 
790    /// 将任务从 L1 降级到 L0
791    /// 
792    /// 重新计算并将 L1 过期任务插入到 L0 层
793    fn demote_tasks(&mut self, tasks: Vec<TimerTaskForWheel>) {
794        for task in tasks {
795            // Calculate the remaining delay of the task in L0 layer
796            // The task's deadline_tick is based on L1 tick, needs to be converted to L0 tick
797            // 计算任务在 L0 层的剩余延迟
798            // 任务的 deadline_tick 基于 L1 tick,需要转换为 L0 tick
799            let l1_tick_ratio = self.l1_tick_ratio;
800            
801            // Calculate the original expiration time of the task (L1 tick)
802            // 计算任务的原始过期时间(L1 tick)
803            let l1_deadline = task.deadline_tick;
804            
805            // Convert to L0 tick expiration time
806            // 转换为 L0 tick 过期时间
807            let l0_deadline_tick = l1_deadline * l1_tick_ratio;
808            let l0_current_tick = self.l0.current_tick;
809            
810            // Calculate remaining L0 ticks
811            // 计算剩余 L0 ticks
812            let remaining_l0_ticks = if l0_deadline_tick > l0_current_tick {
813                l0_deadline_tick - l0_current_tick
814            } else {
815                1 // At least trigger in next tick (至少在下一个 tick 触发)
816            };
817            
818            // Calculate L0 slot index
819            // 计算 L0 槽索引
820            let target_l0_tick = l0_current_tick + remaining_l0_ticks;
821            let l0_slot_index = (target_l0_tick as usize) & self.l0.slot_mask;
822            
823            let task_id = task.get_id();
824            let vec_index = self.l0.slots[l0_slot_index].len();
825            let location = TaskLocation::new(0, l0_slot_index, vec_index);
826            
827            // Insert into L0 layer
828            // 插入到 L0 层
829            self.l0.slots[l0_slot_index].push(task);
830            self.task_index.insert(task_id, location);
831        }
832    }
833
834    /// Check if the timing wheel is empty
835    /// 
836    /// 检查时间轮是否为空
837    #[allow(dead_code)]
838    pub fn is_empty(&self) -> bool {
839        self.task_index.is_empty()
840    }
841
842    /// Postpone timer task (keep original TaskId)
843    ///
844    /// # Parameters
845    /// - `task_id`: Task ID to postpone
846    /// - `new_delay`: New delay time (recalculated from current tick, not continuing from original delay time)
847    /// - `new_callback`: New callback function (if None, keep original callback)
848    ///
849    /// # Returns
850    /// Returns true if the task exists and is successfully postponed, otherwise returns false
851    ///
852    /// # Implementation Details
853    /// - Remove task from original layer/slot, keep its completion_notifier (will not trigger cancellation notification)
854    /// - Update delay time and callback function (if provided)
855    /// - Recalculate target layer, slot, and rounds based on new_delay
856    /// - Cross-layer migration may occur (L0 <-> L1)
857    /// - Re-insert to new position using original TaskId
858    /// - Keep consistent with external held TaskId reference
859    /// 
860    /// 延期定时器任务(保留原始 TaskId)
861    ///
862    /// # 参数
863    /// - `task_id`: 要延期的任务 ID
864    /// - `new_delay`: 新延迟时间(从当前 tick 重新计算,而非从原延迟时间继续)
865    /// - `new_callback`: 新回调函数(如果为 None,则保留原回调)
866    ///
867    /// # 返回值
868    /// 如果任务存在且成功延期则返回 true,否则返回 false
869    ///
870    /// # 实现细节
871    /// - 从原层级/槽位移除任务,保留其 completion_notifier(不会触发取消通知)
872    /// - 更新延迟时间和回调函数(如果提供)
873    /// - 根据 new_delay 重新计算目标层级、槽位和轮数
874    /// - 可能发生跨层迁移(L0 <-> L1)
875    /// - 使用原始 TaskId 重新插入到新位置
876    /// - 与外部持有的 TaskId 引用保持一致
877    #[inline]
878    pub fn postpone(
879        &mut self,
880        task_id: TaskId,
881        new_delay: Duration,
882        new_callback: Option<crate::task::CallbackWrapper>,
883    ) -> bool {
884        // Step 1: Find and remove original task
885        // 步骤 1: 查找并移除原任务
886        let old_location = match self.task_index.remove(&task_id) {
887            Some(loc) => loc,
888            None => return false,
889        };
890        
891        // Use match to get slot reference
892        // 使用 match 获取槽引用
893        let slot = match old_location.level {
894            0 => &mut self.l0.slots[old_location.slot_index],
895            _ => &mut self.l1.slots[old_location.slot_index],
896        };
897        
898        // Verify task is still at expected position
899        // 验证任务仍在预期位置
900        if old_location.vec_index >= slot.len() || slot[old_location.vec_index].get_id() != task_id {
901            // Index inconsistent, re-insert and return failure
902            // 索引不一致,重新插入并返回失败
903            self.task_index.insert(task_id, old_location);
904            return false;
905        }
906        
907        // Use swap_remove to remove task
908        // 使用 swap_remove 移除任务
909        let mut task = slot.swap_remove(old_location.vec_index);
910        
911        // Update swapped element's index (if a swap occurred)
912        // 更新被交换元素的索引(如果发生了交换)
913        if old_location.vec_index < slot.len() {
914            let swapped_task_id = slot[old_location.vec_index].get_id();
915            if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
916                swapped_location.vec_index = old_location.vec_index;
917            }
918        }
919        
920        // Step 2: Update task's delay and callback
921        // 步骤 2: 更新任务的延迟和回调
922        task.update_delay(new_delay);
923        if let Some(callback) = new_callback {
924            task.update_callback(callback);
925        }
926        
927        // Step 3: Recalculate layer, slot, and rounds based on new delay
928        // 步骤 3: 根据新延迟重新计算层级、槽位和轮数
929        let (new_level, ticks, new_rounds) = self.determine_layer(new_delay);
930        
931        // Use match to reduce branches, and use cached slot mask
932        // 使用 match 减少分支,并使用缓存的槽掩码
933        let (current_tick, slot_mask, slots) = match new_level {
934            0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
935            _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
936        };
937        
938        let total_ticks = current_tick + ticks;
939        let new_slot_index = (total_ticks as usize) & slot_mask;
940        
941        // Update task's timing wheel parameters
942        // 更新任务的时间轮参数
943        task.deadline_tick = total_ticks;
944        task.rounds = new_rounds;
945        
946        // Step 4: Re-insert task to new layer/slot
947        // 步骤 4: 将任务重新插入到新层级/槽位
948        let new_vec_index = slots[new_slot_index].len();
949        let new_location = TaskLocation::new(new_level, new_slot_index, new_vec_index);
950        
951        slots[new_slot_index].push(task);
952        self.task_index.insert(task_id, new_location);
953        
954        true
955    }
956
957    /// Batch postpone timer tasks
958    ///
959    /// # Parameters
960    /// - `updates`: List of tuples of (task ID, new delay)
961    ///
962    /// # Returns
963    /// Number of successfully postponed tasks
964    ///
965    /// # Performance Advantages
966    /// - Batch processing reduces function call overhead
967    /// - All delays are recalculated from current_tick at call time
968    ///
969    /// # Notes
970    /// - If a task ID does not exist, that task will be skipped without affecting other tasks' postponement
971    /// 
972    /// 批量延期定时器任务
973    ///
974    /// # 参数
975    /// - `updates`: (任务 ID, 新延迟) 元组列表
976    ///
977    /// # 返回值
978    /// 成功延期的任务数量
979    ///
980    /// # 性能优势
981    /// - 批处理减少函数调用开销
982    /// - 所有延迟在调用时从 current_tick 重新计算
983    ///
984    /// # 注意
985    /// - 如果任务 ID 不存在,该任务将被跳过,不影响其他任务的延期
986    #[inline]
987    pub fn postpone_batch(
988        &mut self,
989        updates: Vec<(TaskId, Duration)>,
990    ) -> usize {
991        let mut postponed_count = 0;
992        
993        for (task_id, new_delay) in updates {
994            if self.postpone(task_id, new_delay, None) {
995                postponed_count += 1;
996            }
997        }
998        
999        postponed_count
1000    }
1001
1002    /// Batch postpone timer tasks (replace callbacks)
1003    ///
1004    /// # Parameters
1005    /// - `updates`: List of tuples of (task ID, new delay, new callback)
1006    ///
1007    /// # Returns
1008    /// Number of successfully postponed tasks
1009    /// 
1010    /// 批量延期定时器任务(替换回调)
1011    ///
1012    /// # 参数
1013    /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
1014    ///
1015    /// # 返回值
1016    /// 成功延期的任务数量
1017    pub fn postpone_batch_with_callbacks(
1018        &mut self,
1019        updates: Vec<(TaskId, Duration, Option<crate::task::CallbackWrapper>)>,
1020    ) -> usize {
1021        let mut postponed_count = 0;
1022        
1023        for (task_id, new_delay, new_callback) in updates {
1024            if self.postpone(task_id, new_delay, new_callback) {
1025                postponed_count += 1;
1026            }
1027        }
1028        
1029        postponed_count
1030    }
1031}
1032
1033#[cfg(test)]
1034mod tests {
1035    use super::*;
1036    use crate::task::{CallbackWrapper, TimerTask, TimerTaskWithCompletionNotifier};
1037
1038    #[test]
1039    fn test_wheel_creation() {
1040        let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1041        assert_eq!(wheel.slot_count(), 512);
1042        assert_eq!(wheel.current_tick(), 0);
1043        assert!(wheel.is_empty());
1044    }
1045
1046    #[test]
1047    fn test_hierarchical_wheel_creation() {
1048        let config = WheelConfig::default();
1049        
1050        let wheel = Wheel::new(config, BatchConfig::default());
1051        assert_eq!(wheel.slot_count(), 512); // L0 slot count (L0 层槽数量)
1052        assert_eq!(wheel.current_tick(), 0);
1053        assert!(wheel.is_empty());
1054        // L1 layer always exists in hierarchical mode (L1 层在分层模式下始终存在)
1055        assert_eq!(wheel.l1.slot_count, 64);
1056        assert_eq!(wheel.l1_tick_ratio, 100); // 1000ms / 10ms = 100 ticks (1000毫秒 / 10毫秒 = 100个tick)
1057    }
1058
1059    #[test]
1060    fn test_hierarchical_config_validation() {
1061        // L1 tick must be an integer multiple of L0 tick (L1 tick 必须是 L0 tick 的整数倍)
1062        let result = WheelConfig::builder()
1063            .l0_tick_duration(Duration::from_millis(10))
1064            .l0_slot_count(512)
1065            .l1_tick_duration(Duration::from_millis(15)) // Not an integer multiple (不是整数倍)
1066            .l1_slot_count(64)
1067            .build();
1068        
1069        assert!(result.is_err());
1070        
1071        // Correct configuration (正确的配置)
1072        let result = WheelConfig::builder()
1073            .l0_tick_duration(Duration::from_millis(10))
1074            .l0_slot_count(512)
1075            .l1_tick_duration(Duration::from_secs(1)) // 1000ms / 10ms = 100 ticks (1000毫秒 / 10毫秒 = 100个tick)
1076            .l1_slot_count(64)
1077            .build();
1078        
1079        assert!(result.is_ok());
1080    }
1081
1082    #[test]
1083    fn test_layer_determination() {
1084        let config = WheelConfig::default();
1085        
1086        let wheel = Wheel::new(config, BatchConfig::default());
1087        
1088        // Short delay should enter L0 layer (短延迟应该进入 L0 层)
1089        // L0: 512 slots * 10ms = 5120ms
1090        let (level, _, rounds) = wheel.determine_layer(Duration::from_millis(100));
1091        assert_eq!(level, 0);
1092        assert_eq!(rounds, 0);
1093        
1094        // Long delay should enter L1 layer
1095        // 超过 L0 范围(>5120ms) (超过 L0 范围 (>5120毫秒))
1096        let (level, _, rounds) = wheel.determine_layer(Duration::from_secs(10));
1097        assert_eq!(level, 1);
1098        assert_eq!(rounds, 0);
1099        
1100        // Long delay should enter L1 layer and have rounds
1101        // L1: 64 slots * 1000ms = 64000ms (L1: 64个槽 * 1000毫秒 = 64000毫秒)
1102        let (level, _, rounds) = wheel.determine_layer(Duration::from_secs(120));
1103        assert_eq!(level, 1);
1104        assert!(rounds > 0);
1105    }
1106
1107    #[test]
1108    fn test_hierarchical_insert_and_advance() {
1109        let config = WheelConfig::default();
1110        
1111        let mut wheel = Wheel::new(config, BatchConfig::default());
1112        
1113        // Insert short delay task into L0 (插入短延迟任务到 L0)
1114        let callback = CallbackWrapper::new(|| async {});
1115        let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
1116        let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1117        let task_id = wheel.insert(task_with_notifier);
1118        
1119        // Verify task is in L0 layer (验证任务在 L0 层)
1120        let location = wheel.task_index.get(&task_id).unwrap();
1121        assert_eq!(location.level, 0);
1122        
1123        // Advance 10 ticks (100ms) (前进 10 个 tick (100毫秒))
1124        for _ in 0..10 {
1125            let expired = wheel.advance();
1126            if !expired.is_empty() {
1127                assert_eq!(expired.len(), 1);
1128                assert_eq!(expired[0].id, task_id);
1129                return;
1130            }
1131        }
1132        panic!("Task should have expired");
1133    }
1134
1135    #[test]
1136    fn test_hierarchical_l1_to_l0_demotion() {
1137        let config = WheelConfig::builder()
1138            .l0_tick_duration(Duration::from_millis(10))
1139            .l0_slot_count(512)
1140            .l1_tick_duration(Duration::from_millis(100)) // L1 tick = 100ms (L1 tick = 100毫秒)
1141            .l1_slot_count(64)
1142            .build()
1143            .unwrap();
1144        
1145        let mut wheel = Wheel::new(config, BatchConfig::default());
1146        let l1_tick_ratio = wheel.l1_tick_ratio;
1147        assert_eq!(l1_tick_ratio, 10); // 100ms / 10ms = 10 (100毫秒 / 10毫秒 = 10)
1148        
1149        // Insert task, delay 6000ms (exceeds L0 range 5120ms) (插入任务,延迟 6000毫秒 (超过 L0 范围 5120毫秒))
1150        let callback = CallbackWrapper::new(|| async {});
1151        let task = TimerTask::new_oneshot(Duration::from_millis(6000), Some(callback));
1152        let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1153        let task_id = wheel.insert(task_with_notifier);
1154        
1155        // Verify task is in L1 layer (验证任务在 L1 层)
1156        let location = wheel.task_index.get(&task_id).unwrap();
1157        assert_eq!(location.level, 1);
1158        
1159        // Advance to L1 slot expiration (6000ms / 100ms = 60 L1 ticks) (前进到 L1 槽过期 (6000毫秒 / 100毫秒 = 60个L1 tick))
1160        // 60 L1 ticks = 600 L0 ticks (60个L1 tick = 600个L0 tick)
1161        let mut demoted = false;
1162        for i in 0..610 {
1163            wheel.advance();
1164            
1165            // Check if task is demoted to L0 (检查任务是否降级到 L0)
1166            if let Some(location) = wheel.task_index.get(&task_id) {
1167                if location.level == 0 && !demoted {
1168                    demoted = true;
1169                    println!("Task demoted to L0 at L0 tick {}", i); // 任务降级到 L0 在 L0 tick {i}
1170                }
1171            }
1172        }
1173        
1174        assert!(demoted, "Task should have been demoted from L1 to L0"); // 任务应该从 L1 降级到 L0
1175    }
1176
1177    #[test]
1178    fn test_cross_layer_cancel() {
1179        let config = WheelConfig::default();
1180        
1181        let mut wheel = Wheel::new(config, BatchConfig::default());
1182        
1183        // Insert L0 task (插入 L0 任务)
1184        let callback1 = CallbackWrapper::new(|| async {});
1185        let task1 = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback1));
1186        let (task_with_notifier1, _rx1) = TimerTaskWithCompletionNotifier::from_timer_task(task1);
1187        let task_id1 = wheel.insert(task_with_notifier1);
1188        
1189        // Insert L1 task (插入 L1 任务)
1190        let callback2 = CallbackWrapper::new(|| async {});
1191        let task2 = TimerTask::new_oneshot(Duration::from_secs(10), Some(callback2));
1192        let (task_with_notifier2, _rx2) = TimerTaskWithCompletionNotifier::from_timer_task(task2);
1193        let task_id2 = wheel.insert(task_with_notifier2);
1194        
1195        // Verify levels (验证层级)
1196        assert_eq!(wheel.task_index.get(&task_id1).unwrap().level, 0);
1197        assert_eq!(wheel.task_index.get(&task_id2).unwrap().level, 1);
1198        
1199        // Cancel L0 task (取消 L0 任务)
1200        assert!(wheel.cancel(task_id1));
1201        assert!(wheel.task_index.get(&task_id1).is_none());
1202        
1203        // Cancel L1 task (取消 L1 任务)
1204        assert!(wheel.cancel(task_id2));
1205        assert!(wheel.task_index.get(&task_id2).is_none());
1206        
1207        assert!(wheel.is_empty()); // 时间轮应该为空
1208    }
1209
1210    #[test]
1211    fn test_cross_layer_postpone() {
1212        let config = WheelConfig::default();
1213        
1214        let mut wheel = Wheel::new(config, BatchConfig::default());
1215        
1216        // Insert L0 task (100ms) (插入 L0 任务 (100毫秒))
1217        let callback = CallbackWrapper::new(|| async {});
1218        let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
1219        let (task_with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1220        let task_id = wheel.insert(task_with_notifier);
1221        
1222        // Verify in L0 layer (验证在 L0 层)
1223        assert_eq!(wheel.task_index.get(&task_id).unwrap().level, 0);
1224        
1225        // Postpone to 10 seconds (should migrate to L1) (延期到 10 秒 (应该迁移到 L1))
1226        assert!(wheel.postpone(task_id, Duration::from_secs(10), None));
1227        
1228        // Verify migrated to L1 layer (验证迁移到 L1 层)
1229        assert_eq!(wheel.task_index.get(&task_id).unwrap().level, 1);
1230        
1231        // Postpone back to 200ms (should migrate back to L0) (延期回 200毫秒 (应该迁移回 L0))
1232        assert!(wheel.postpone(task_id, Duration::from_millis(200), None));
1233        
1234        // Verify migrated back to L0 layer (验证迁移回 L0 层)
1235        assert_eq!(wheel.task_index.get(&task_id).unwrap().level, 0);
1236    }
1237
1238    #[test]
1239    fn test_delay_to_ticks() {
1240        let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1241        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(100)), 10);
1242        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(50)), 5);
1243        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(1)), 1); // Minimum 1 tick (最小 1 个 tick)
1244    }
1245
1246    #[test]
1247    fn test_wheel_invalid_slot_count() {
1248        let result = WheelConfig::builder()
1249            .l0_slot_count(100)
1250            .build();
1251        assert!(result.is_err());
1252        if let Err(crate::error::TimerError::InvalidSlotCount { slot_count, reason }) = result {
1253            assert_eq!(slot_count, 100);
1254            assert_eq!(reason, "L0 layer slot count must be power of 2"); // L0 层槽数量必须是 2 的幂
1255        } else {
1256            panic!("Expected InvalidSlotCount error"); // 期望 InvalidSlotCount 错误
1257        }
1258    }
1259
1260    #[test]
1261    fn test_insert_batch() {
1262        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1263        
1264        // Create batch tasks (创建批量任务)
1265        let tasks: Vec<TimerTaskWithCompletionNotifier> = (0..10)
1266            .map(|i| {
1267                let callback = CallbackWrapper::new(|| async {});
1268                let task = TimerTask::new_oneshot(Duration::from_millis(100 + i * 10), Some(callback));
1269                let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1270                task_with_notifier
1271            })
1272            .collect();
1273        
1274        let task_ids = wheel.insert_batch(tasks);
1275        
1276        assert_eq!(task_ids.len(), 10);
1277        assert!(!wheel.is_empty());
1278    }
1279
1280    #[test]
1281    fn test_cancel_batch() {
1282        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1283        
1284        // Insert multiple tasks
1285        let mut task_ids = Vec::new();
1286        for i in 0..10 {
1287            let callback = CallbackWrapper::new(|| async {});
1288            let task = TimerTask::new_oneshot(Duration::from_millis(100 + i * 10), Some(callback));
1289            let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1290            let task_id = wheel.insert(task_with_notifier);
1291            task_ids.push(task_id);
1292        }
1293        
1294        assert_eq!(task_ids.len(), 10);
1295        
1296        // Batch cancel first 5 tasks
1297        let to_cancel = &task_ids[0..5];
1298        let cancelled_count = wheel.cancel_batch(to_cancel);
1299        
1300        assert_eq!(cancelled_count, 5);
1301        
1302        // Try to cancel the same tasks again, should return 0
1303        let cancelled_again = wheel.cancel_batch(to_cancel);
1304        assert_eq!(cancelled_again, 0);
1305        
1306        // Cancel remaining tasks
1307        let remaining = &task_ids[5..10];
1308        let cancelled_remaining = wheel.cancel_batch(remaining);
1309        assert_eq!(cancelled_remaining, 5);
1310        
1311        assert!(wheel.is_empty());
1312    }
1313
1314    #[test]
1315    fn test_batch_operations_same_slot() {
1316        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1317        
1318        // Insert multiple tasks with the same delay (will enter the same slot) (插入多个任务,延迟相同,将进入同一个槽)
1319        let mut task_ids = Vec::new();
1320        for _ in 0..20 {
1321            let callback = CallbackWrapper::new(|| async {});
1322            let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
1323            let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1324            let task_id = wheel.insert(task_with_notifier);
1325            task_ids.push(task_id);
1326        }
1327        
1328        // Batch cancel all tasks (批量取消所有任务)
1329        let cancelled_count = wheel.cancel_batch(&task_ids);
1330        assert_eq!(cancelled_count, 20);
1331        assert!(wheel.is_empty());
1332    }
1333
1334    #[test]
1335    fn test_postpone_single_task() {
1336        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1337        
1338        // Insert task, delay 100ms (插入任务,延迟 100毫秒)
1339        let callback = CallbackWrapper::new(|| async {});
1340        let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
1341        let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1342        let task_id = wheel.insert(task_with_notifier);
1343        
1344        // Postpone task to 200ms (keep original callback) (延期任务到 200毫秒 (保留原始回调))
1345        let postponed = wheel.postpone(task_id, Duration::from_millis(200), None);
1346        assert!(postponed);
1347        
1348        // Verify task is still in the timing wheel (验证任务仍在时间轮中)
1349        assert!(!wheel.is_empty());
1350        
1351        // Advance 100ms (10 ticks), task should not trigger (前进 100毫秒 (10个tick),任务不应该触发)
1352        for _ in 0..10 {
1353            let expired = wheel.advance();
1354            assert!(expired.is_empty());
1355        }
1356        
1357        // Advance 100ms (10 ticks), task should trigger (前进 100毫秒 (10个tick),任务应该触发)
1358        let mut triggered = false;
1359        for _ in 0..10 {
1360            let expired = wheel.advance();
1361            if !expired.is_empty() {
1362                assert_eq!(expired.len(), 1);
1363                assert_eq!(expired[0].id, task_id);
1364                triggered = true;
1365                break;
1366            }
1367        }
1368        assert!(triggered);
1369    }
1370
1371    #[test]
1372    fn test_postpone_with_new_callback() {
1373        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1374        
1375        // Insert task, with original callback (插入任务,原始回调)
1376        let old_callback = CallbackWrapper::new(|| async {});
1377        let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(old_callback.clone()));
1378        let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1379        let task_id = wheel.insert(task_with_notifier);
1380        
1381        // Postpone task and replace callback (延期任务并替换回调)
1382        let new_callback = CallbackWrapper::new(|| async {});
1383        let postponed = wheel.postpone(task_id, Duration::from_millis(50), Some(new_callback));
1384        assert!(postponed);
1385        
1386        // Advance 50ms (5 ticks), task should trigger (前进 50毫秒 (5个tick),任务应该触发)
1387        // 注意:任务在第 5 个 tick 触发(current_tick 从 0 推进到 5)
1388        let mut triggered = false;
1389        for i in 0..5 {
1390            let expired = wheel.advance();
1391            if !expired.is_empty() {
1392                assert_eq!(expired.len(), 1, "On the {}th advance, there should be 1 task triggered", i + 1);
1393                assert_eq!(expired[0].id, task_id);
1394                triggered = true;
1395                break;
1396            }
1397        }
1398        assert!(triggered, "Task should be triggered within 5 ticks"); // 任务应该在 5 个 tick 内触发
1399    }
1400
1401    #[test]
1402    fn test_postpone_nonexistent_task() {
1403        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1404        
1405        // Try to postpone nonexistent task (尝试延期不存在任务)
1406        let fake_task_id = TaskId::new();
1407        let postponed = wheel.postpone(fake_task_id, Duration::from_millis(100), None);
1408        assert!(!postponed);
1409    }
1410
1411    #[test]
1412    fn test_postpone_batch() {
1413        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1414        
1415        // Insert 5 tasks, delay 50ms (5 ticks) (插入 5 个任务,延迟 50毫秒 (5个tick))
1416        let mut task_ids = Vec::new();
1417        for _ in 0..5 {
1418            let callback = CallbackWrapper::new(|| async {});
1419            let task = TimerTask::new_oneshot(Duration::from_millis(50), Some(callback));
1420            let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1421            let task_id = wheel.insert(task_with_notifier);
1422            task_ids.push(task_id);
1423        }
1424        
1425        // Batch postpone all tasks to 150ms (15 ticks) (批量延期所有任务到 150毫秒 (15个tick))
1426        let updates: Vec<_> = task_ids
1427            .iter()
1428            .map(|&id| (id, Duration::from_millis(150)))
1429            .collect();
1430        let postponed_count = wheel.postpone_batch(updates);
1431        assert_eq!(postponed_count, 5);
1432        
1433        // Advance 5 ticks (50ms), task should not trigger (前进 50毫秒 (5个tick),任务不应该触发)
1434        for _ in 0..5 {
1435            let expired = wheel.advance();
1436            assert!(expired.is_empty(), "The first 5 ticks should not have tasks triggered");
1437        }
1438        
1439        // Continue advancing 10 ticks (from tick 5 to tick 15), all tasks should trigger on the 15th tick (继续前进 10个tick (从tick 5到tick 15),所有任务应该在第 15 个 tick 触发)
1440        let mut total_triggered = 0;
1441        for _ in 0..10 {
1442            let expired = wheel.advance();
1443            total_triggered += expired.len();
1444        }
1445        assert_eq!(total_triggered, 5, "There should be 5 tasks triggered on the 15th tick"); // 第 15 个 tick 应该有 5 个任务触发
1446    }
1447
1448    #[test]
1449    fn test_postpone_batch_partial() {
1450        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1451        
1452        // Insert 10 tasks, delay 50ms (5 ticks) (插入 10 个任务,延迟 50毫秒 (5个tick))
1453        let mut task_ids = Vec::new();
1454        for _ in 0..10 {
1455            let callback = CallbackWrapper::new(|| async {});
1456            let task = TimerTask::new_oneshot(Duration::from_millis(50), Some(callback));
1457            let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1458            let task_id = wheel.insert(task_with_notifier);
1459            task_ids.push(task_id);
1460        }
1461        
1462        // Only postpone the first 5 tasks to 150ms, including a nonexistent task (只延期前 5 个任务到 150毫秒,包括一个不存在任务)
1463        let fake_task_id = TaskId::new();
1464        let mut updates: Vec<_> = task_ids[0..5]
1465            .iter()
1466            .map(|&id| (id, Duration::from_millis(150)))
1467            .collect();
1468        updates.push((fake_task_id, Duration::from_millis(150)));
1469        
1470        let postponed_count = wheel.postpone_batch(updates);
1471        assert_eq!(postponed_count, 5, "There should be 5 tasks successfully postponed (fake_task_id failed)"); // 应该有 5 个任务成功延期 (fake_task_id 失败)
1472        
1473        // Advance 5 ticks (50ms), the last 5 tasks that were not postponed should trigger (前进 50毫秒 (5个tick),最后一个没有延期的任务应该触发)
1474        let mut triggered_at_50ms = 0;
1475        for _ in 0..5 {
1476            let expired = wheel.advance();
1477            triggered_at_50ms += expired.len();
1478        }
1479        assert_eq!(triggered_at_50ms, 5, "There should be 5 tasks that were not postponed triggered on the 5th tick"); // 第 5 个 tick 应该有 5 个任务没有延期触发
1480        
1481        // Continue advancing 10 ticks (from tick 5 to tick 15), the first 5 tasks that were postponed should trigger (继续前进 10个tick (从tick 5到tick 15),第一个 5 个任务应该触发)
1482        let mut triggered_at_150ms = 0;
1483        for _ in 0..10 {
1484            let expired = wheel.advance();
1485            triggered_at_150ms += expired.len();
1486        }
1487        assert_eq!(triggered_at_150ms, 5, "There should be 5 tasks that were postponed triggered on the 15th tick"); // 第 15 个 tick 应该有 5 个任务延期触发
1488    }
1489
1490    #[test]
1491    fn test_multi_round_tasks() {
1492        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1493        
1494        // Test multi-round tasks in L1 layer in hierarchical mode (在分层模式下测试 L1 层的多轮任务)
1495        // L1: 64 slots * 1000ms = 64000ms
1496        // Insert a task that exceeds one L1 circle, delay 120000ms (120 seconds) (插入一个超过一个 L1 圈的任务,延迟 120000毫秒 (120秒))
1497        let callback = CallbackWrapper::new(|| async {});
1498        let task = TimerTask::new_oneshot(Duration::from_secs(120), Some(callback));
1499        let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1500        let task_id = wheel.insert(task_with_notifier);
1501        
1502        // 120000ms / 1000ms = 120 L1 ticks (120000毫秒 / 1000毫秒 = 120个L1 tick)
1503        // 120 ticks / 64 slots = 1 round + 56 ticks (120个tick / 64个槽 = 1轮 + 56个tick)
1504        // Task should be in L1 layer, slot 56, rounds = 1 (任务应该在 L1 层,槽 56,轮数 = 1)
1505        
1506        // Verify task is in L1 layer (验证任务在 L1 层)
1507        let location = wheel.task_index.get(&task_id).unwrap();
1508        assert_eq!(location.level, 1);
1509        
1510        // L1 tick advances every 100 L0 ticks (L1 tick 每 100 个 L0 tick 前进一次)
1511        // Advance 64 * 100 = 6400 L0 ticks (complete the first round of L1) (前进 64 * 100 = 6400 L0 tick (完成 L1 的第一轮))
1512        for _ in 0..6400 {
1513            let _expired = wheel.advance();
1514            // During the first round of L1, the task should not be demoted or triggered (在 L1 的第一轮中,任务不应该被降级或触发)
1515        }
1516        
1517        // Task should still be in L1 layer, but rounds has decreased (任务应该仍在 L1 层,但轮数已经减少)
1518        let location = wheel.task_index.get(&task_id);
1519        if let Some(loc) = location {
1520            assert_eq!(loc.level, 1);
1521        }
1522        
1523        // Continue advancing until the task is triggered (approximately another 56 * 100 = 5600 L0 ticks) (继续前进,直到任务被触发 (大约另一个 56 * 100 = 5600 L0 tick))
1524        let mut triggered = false;
1525        for _ in 0..6000 {
1526            let expired = wheel.advance();
1527            if expired.iter().any(|t| t.id == task_id) {
1528                triggered = true;
1529                break;
1530            }
1531        }
1532        assert!(triggered, "Task should be triggered in the second round of L1"); // 任务应该在 L1 的第二轮中触发
1533    }
1534
1535    #[test]
1536    fn test_minimum_delay() {
1537        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1538        
1539        // Test minimum delay (delays less than 1 tick should be rounded up to 1 tick) (测试最小延迟 (延迟小于 1 个 tick 应该向上舍入到 1 个 tick))
1540        let callback = CallbackWrapper::new(|| async {});
1541        let task = TimerTask::new_oneshot(Duration::from_millis(1), Some(callback));
1542        let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1543        let task_id: TaskId = wheel.insert(task_with_notifier);
1544        
1545        // Advance 1 tick, task should trigger (前进 1 个 tick,任务应该触发)
1546        let expired = wheel.advance();
1547        assert_eq!(expired.len(), 1, "Minimum delay task should be triggered after 1 tick"); // 最小延迟任务应该在 1 个 tick 后触发
1548        assert_eq!(expired[0].id, task_id);
1549    }
1550
1551    #[test]
1552    fn test_empty_batch_operations() {
1553        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1554        
1555        // Test empty batch insert (测试空批量插入)
1556        let task_ids = wheel.insert_batch(vec![]);
1557        assert_eq!(task_ids.len(), 0);
1558        
1559        // Test empty batch cancel (测试空批量取消)
1560        let cancelled = wheel.cancel_batch(&[]);
1561        assert_eq!(cancelled, 0);
1562        
1563        // Test empty batch postpone (测试空批量延期)
1564        let postponed = wheel.postpone_batch(vec![]);
1565        assert_eq!(postponed, 0);
1566    }
1567
1568    #[test]
1569    fn test_postpone_same_task_multiple_times() {
1570        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1571        
1572        // Insert task (插入任务)
1573        let callback = CallbackWrapper::new(|| async {});
1574        let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
1575        let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1576        let task_id = wheel.insert(task_with_notifier);
1577        
1578        // First postpone (第一次延期)
1579        let postponed = wheel.postpone(task_id, Duration::from_millis(200), None);
1580        assert!(postponed, "First postpone should succeed");
1581        
1582        // Second postpone (第二次延期)
1583        let postponed = wheel.postpone(task_id, Duration::from_millis(300), None);
1584        assert!(postponed, "Second postpone should succeed");
1585        
1586        // Third postpone (第三次延期)  
1587        let postponed = wheel.postpone(task_id, Duration::from_millis(50), None);
1588        assert!(postponed, "Third postpone should succeed");
1589        
1590        // Verify task is triggered at the last postpone time (50ms = 5 ticks) (验证任务在最后一次延期时触发 (50毫秒 = 5个tick))
1591        let mut triggered = false;
1592        for _ in 0..5 {
1593            let expired = wheel.advance();
1594            if !expired.is_empty() {
1595                assert_eq!(expired.len(), 1);
1596                assert_eq!(expired[0].id, task_id);
1597                triggered = true;
1598                break;
1599            }
1600        }
1601        assert!(triggered, "Task should be triggered at the last postpone time"); // 任务应该在最后一次延期时触发
1602    }
1603
1604    #[test]
1605    fn test_advance_empty_slots() {
1606        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1607        
1608        // Do not insert any tasks, advance multiple ticks (不插入任何任务,前进多个tick)
1609        for _ in 0..100 {
1610            let expired = wheel.advance();
1611            assert!(expired.is_empty(), "Empty slots should not return any tasks");
1612        }
1613        
1614        assert_eq!(wheel.current_tick(), 100, "current_tick should correctly increment"); // current_tick 应该正确递增
1615    }
1616
1617    #[test]
1618    fn test_cancel_after_postpone() {
1619        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1620        
1621        // Insert task (插入任务)
1622        let callback = CallbackWrapper::new(|| async {});
1623        let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
1624        let (task_with_notifier, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1625        let task_id = wheel.insert(task_with_notifier);
1626        
1627        // Postpone task (延期任务)
1628        let postponed = wheel.postpone(task_id, Duration::from_millis(200), None);
1629        assert!(postponed, "Postpone should succeed");
1630        
1631        // Cancel postponed task (取消延期的任务)
1632        let cancelled = wheel.cancel(task_id);
1633        assert!(cancelled, "Cancel should succeed");
1634        
1635        // Advance to original time, task should not trigger (前进到原始时间,任务不应该触发)
1636        for _ in 0..20 {
1637            let expired = wheel.advance();
1638            assert!(expired.is_empty(), "Cancelled task should not trigger"); // 取消的任务不应该触发
1639        }
1640        
1641        assert!(wheel.is_empty(), "Wheel should be empty"); // 时间轮应该为空
1642    }
1643
1644    #[test]
1645    fn test_slot_boundary() {
1646        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1647        
1648        // Test slot boundary and wraparound (测试槽边界和环绕)
1649        // 第一个任务:延迟 10ms(1 tick),应该在 slot 1 触发
1650        let callback1 = CallbackWrapper::new(|| async {});
1651        let task1 = TimerTask::new_oneshot(Duration::from_millis(10), Some(callback1));
1652        let (task_with_notifier1, _rx1) = TimerTaskWithCompletionNotifier::from_timer_task(task1);
1653        let task_id_1 = wheel.insert(task_with_notifier1);
1654        
1655        // Second task: delay 5110ms (511 ticks), should trigger on slot 511 (第二个任务:延迟 5110毫秒 (511个tick),应该在槽 511 触发) 
1656        let callback2 = CallbackWrapper::new(|| async {});
1657        let task2 = TimerTask::new_oneshot(Duration::from_millis(5110), Some(callback2));
1658        let (task_with_notifier2, _rx2) = TimerTaskWithCompletionNotifier::from_timer_task(task2);
1659        let task_id_2 = wheel.insert(task_with_notifier2);
1660        
1661        // Advance 1 tick, first task should trigger (前进 1 个 tick,第一个任务应该触发)
1662        let expired = wheel.advance();
1663        assert_eq!(expired.len(), 1, "First task should trigger on tick 1"); // 第一个任务应该在第 1 个 tick 触发
1664        assert_eq!(expired[0].id, task_id_1);
1665        
1666        // Continue advancing to 511 ticks (from tick 1 to tick 511), second task should trigger (继续前进 511个tick (从tick 1到tick 511),第二个任务应该触发)
1667        let mut triggered = false;
1668        for i in 0..510 {
1669            let expired = wheel.advance();
1670            if !expired.is_empty() {
1671                assert_eq!(expired.len(), 1, "The {}th advance should trigger the second task", i + 2); // 第 {i + 2} 次前进应该触发第二个任务
1672                assert_eq!(expired[0].id, task_id_2);
1673                triggered = true;
1674                break;
1675            }
1676        }
1677        assert!(triggered, "Second task should trigger on tick 511"); // 第二个任务应该在第 511 个 tick 触发
1678        
1679        assert!(wheel.is_empty(), "All tasks should have been triggered"); // 所有任务都应该被触发
1680    }
1681
1682    #[test]
1683    fn test_batch_cancel_small_threshold() {
1684        // Test small batch threshold optimization (测试小批量阈值优化)
1685        let batch_config = BatchConfig {
1686            small_batch_threshold: 5,
1687        };
1688        let mut wheel = Wheel::new(WheelConfig::default(), batch_config);
1689        
1690        // Insert 10 tasks (插入 10 个任务)
1691        let mut task_ids = Vec::new();
1692        for _ in 0..10 {
1693            let callback = CallbackWrapper::new(|| async {});
1694            let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
1695            let (task_with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1696            let task_id = wheel.insert(task_with_notifier);
1697            task_ids.push(task_id);
1698        }
1699        
1700        // Small batch cancel (should use direct cancel path) (小批量取消 (应该使用直接取消路径))
1701        let cancelled = wheel.cancel_batch(&task_ids[0..3]);
1702        assert_eq!(cancelled, 3);
1703        
1704        // Large batch cancel (should use grouped optimization path) (大批量取消 (应该使用分组优化路径))
1705        let cancelled = wheel.cancel_batch(&task_ids[3..10]);
1706        assert_eq!(cancelled, 7);
1707        
1708        assert!(wheel.is_empty()); // 时间轮应该为空
1709    }
1710
1711    #[test]
1712    fn test_task_id_uniqueness() {
1713        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1714        
1715        // Insert multiple tasks, verify TaskId uniqueness (插入多个任务,验证 TaskId 唯一性)
1716        let mut task_ids = std::collections::HashSet::new();
1717        for _ in 0..100 {
1718            let callback = CallbackWrapper::new(|| async {});
1719            let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
1720            let (task_with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1721            let task_id = wheel.insert(task_with_notifier);
1722            
1723            assert!(task_ids.insert(task_id), "TaskId should be unique"); // TaskId 应该唯一
1724        }
1725        
1726        assert_eq!(task_ids.len(), 100);
1727    }
1728
1729    #[test]
1730    fn test_periodic_task_basic() {
1731        use crate::task::CompletionReceiver;
1732        
1733        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1734        
1735        // Insert periodic task with 50ms interval (插入 50毫秒间隔的周期性任务)
1736        let callback = CallbackWrapper::new(|| async {});
1737        let task = TimerTask::new_periodic(
1738            Duration::from_millis(50),  // initial delay
1739            Duration::from_millis(50),  // interval
1740            Some(callback),
1741            None
1742        );
1743        let (task_with_notifier, completion_receiver) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1744        let task_id = wheel.insert(task_with_notifier);
1745        
1746        let mut rx = match completion_receiver {
1747            CompletionReceiver::Periodic(receiver) => receiver,
1748            _ => panic!("Expected periodic completion receiver"),
1749        };
1750        
1751        // Advance 5 ticks (50ms), task should trigger for the first time
1752        // 前进 5 个 tick (50毫秒),任务应该第一次触发
1753        for _ in 0..5 {
1754            wheel.advance();
1755        }
1756        
1757        // Check notification
1758        // 检查通知
1759        assert!(rx.try_recv().is_ok(), "Should receive first notification");
1760        
1761        // Verify task is still in the wheel (reinserted)
1762        // 验证任务仍在时间轮中(已重新插入)
1763        assert!(!wheel.is_empty(), "Periodic task should be reinserted");
1764        assert!(wheel.task_index.contains_key(&task_id), "Task should still be in index");
1765        
1766        // Advance another 5 ticks (50ms), task should trigger again
1767        // 再前进 5 个 tick (50毫秒),任务应该再次触发
1768        for _ in 0..5 {
1769            wheel.advance();
1770        }
1771        
1772        // Check second notification
1773        // 检查第二次通知
1774        assert!(rx.try_recv().is_ok(), "Should receive second notification");
1775        assert!(!wheel.is_empty(), "Periodic task should still be in the wheel");
1776        
1777        // Cancel the periodic task
1778        // 取消周期性任务
1779        assert!(wheel.cancel(task_id), "Should be able to cancel periodic task");
1780        assert!(wheel.is_empty(), "Wheel should be empty after cancellation");
1781    }
1782
1783    #[test]
1784    fn test_periodic_task_cancel() {
1785        use crate::task::{TaskCompletion, CompletionReceiver};
1786        
1787        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1788        
1789        // Insert periodic task (插入周期性任务)
1790        let callback = CallbackWrapper::new(|| async {});
1791        let task = TimerTask::new_periodic(
1792            Duration::from_millis(100),
1793            Duration::from_millis(100),
1794            Some(callback),
1795            None
1796        );
1797        let (task_with_notifier, completion_receiver) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1798        let task_id = wheel.insert(task_with_notifier);
1799        
1800        let mut rx = match completion_receiver {
1801            CompletionReceiver::Periodic(receiver) => receiver,
1802            _ => panic!("Expected periodic completion receiver"),
1803        };
1804        
1805        // Cancel immediately (立即取消)
1806        assert!(wheel.cancel(task_id), "Should successfully cancel");
1807        
1808        // Check cancellation notification
1809        // 检查取消通知
1810        if let Ok(reason) = rx.try_recv() {
1811            assert_eq!(reason, TaskCompletion::Cancelled, "Should receive Cancelled notification");
1812        } else {
1813            panic!("Should receive cancellation notification");
1814        }
1815        
1816        // Verify wheel is empty
1817        // 验证时间轮为空
1818        assert!(wheel.is_empty(), "Wheel should be empty");
1819        
1820        // Advance and verify no tasks expire
1821        // 前进并验证没有任务过期
1822        for _ in 0..20 {
1823            let expired = wheel.advance();
1824            assert!(expired.is_empty(), "No tasks should expire after cancellation");
1825        }
1826    }
1827
1828    #[test]
1829    fn test_periodic_task_multiple_triggers() {
1830        use crate::task::CompletionReceiver;
1831        
1832        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1833        
1834        // Insert periodic task with 30ms interval (插入 30毫秒间隔的周期性任务)
1835        let callback = CallbackWrapper::new(|| async {});
1836        let task = TimerTask::new_periodic(
1837            Duration::from_millis(30),
1838            Duration::from_millis(30),
1839            Some(callback),
1840            None
1841        );
1842        let (task_with_notifier, completion_receiver) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1843        let task_id = wheel.insert(task_with_notifier);
1844        
1845        let mut rx = match completion_receiver {
1846            CompletionReceiver::Periodic(receiver) => receiver,
1847            _ => panic!("Expected periodic completion receiver"),
1848        };
1849        
1850        // Advance and count triggers (前进并计数触发次数)
1851        let mut trigger_count = 0;
1852        for _ in 0..100 {
1853            wheel.advance();
1854            
1855            // Collect all notifications
1856            // 收集所有通知
1857            while let Ok(_) = rx.try_recv() {
1858                trigger_count += 1;
1859            }
1860        }
1861        
1862        // Task should trigger approximately 100ms / 30ms = 3 times
1863        // 任务应该触发大约 100毫秒 / 30毫秒 = 3 次
1864        assert!(trigger_count >= 3, "Should trigger at least 3 times, got {}", trigger_count);
1865        
1866        // Cancel the task
1867        // 取消任务
1868        wheel.cancel(task_id);
1869    }
1870
1871    #[test]
1872    fn test_periodic_task_cross_layer() {
1873        use crate::task::CompletionReceiver;
1874        
1875        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1876        
1877        // Insert periodic task with long interval (exceeds L0 range)
1878        // 插入长间隔的周期性任务(超过 L0 范围)
1879        // L0 range: 512 slots * 10ms = 5120ms
1880        let callback = CallbackWrapper::new(|| async {});
1881        let task = TimerTask::new_periodic(
1882            Duration::from_secs(10),  // 10000ms > 5120ms
1883            Duration::from_secs(10),
1884            Some(callback),
1885            None
1886        );
1887        let (task_with_notifier, completion_receiver) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1888        let task_id = wheel.insert(task_with_notifier);
1889        
1890        let mut rx = match completion_receiver {
1891            CompletionReceiver::Periodic(receiver) => receiver,
1892            _ => panic!("Expected periodic completion receiver"),
1893        };
1894        
1895        // Verify task is in L1 layer
1896        // 验证任务在 L1 层
1897        let location = wheel.task_index.get(&task_id).unwrap();
1898        assert_eq!(location.level, 1, "Long interval task should be in L1");
1899        
1900        // Advance to trigger the task (10000ms / 10ms = 1000 ticks)
1901        // 前进到触发任务 (10000毫秒 / 10毫秒 = 1000个tick)
1902        for _ in 0..1001 {
1903            wheel.advance();
1904        }
1905        
1906        // Check notification
1907        // 检查通知
1908        assert!(rx.try_recv().is_ok(), "Should receive notification");
1909        
1910        // Verify task is reinserted (should be in L1 again)
1911        // 验证任务已重新插入(应该再次在 L1 中)
1912        assert!(wheel.task_index.contains_key(&task_id), "Task should be reinserted");
1913        let location = wheel.task_index.get(&task_id).unwrap();
1914        assert_eq!(location.level, 1, "Reinserted task should still be in L1");
1915        
1916        // Cancel the task
1917        // 取消任务
1918        wheel.cancel(task_id);
1919    }
1920
1921    #[test]
1922    fn test_periodic_task_batch_cancel() {
1923        use crate::task::{TaskCompletion, CompletionReceiver};
1924        
1925        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1926        
1927        // Insert multiple periodic tasks (插入多个周期性任务)
1928        let mut task_ids = Vec::new();
1929        let mut receivers = Vec::new();
1930        
1931        for i in 0..5 {
1932            let callback = CallbackWrapper::new(|| async {});
1933            let task = TimerTask::new_periodic(
1934                Duration::from_millis(100 + i * 10),
1935                Duration::from_millis(100),
1936                Some(callback),
1937                None
1938            );
1939            let (task_with_notifier, completion_receiver) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1940            let task_id = wheel.insert(task_with_notifier);
1941            task_ids.push(task_id);
1942            
1943            if let CompletionReceiver::Periodic(rx) = completion_receiver {
1944                receivers.push(rx);
1945            }
1946        }
1947        
1948        // Batch cancel all tasks (批量取消所有任务)
1949        let cancelled_count = wheel.cancel_batch(&task_ids);
1950        assert_eq!(cancelled_count, 5, "Should cancel all 5 tasks");
1951        
1952        // Verify all receive cancellation notifications
1953        // 验证所有接收到取消通知
1954        for mut rx in receivers {
1955            if let Ok(reason) = rx.try_recv() {
1956                assert_eq!(reason, TaskCompletion::Cancelled);
1957            } else {
1958                panic!("Should receive cancellation notification");
1959            }
1960        }
1961        
1962        assert!(wheel.is_empty(), "Wheel should be empty");
1963    }
1964
1965    #[test]
1966    fn test_periodic_task_with_initial_delay() {
1967        use crate::task::CompletionReceiver;
1968        
1969        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1970        
1971        // Insert periodic task with different initial delay and interval
1972        // 插入具有不同初始延迟和间隔的周期性任务
1973        let callback = CallbackWrapper::new(|| async {});
1974        let task = TimerTask::new_periodic(
1975            Duration::from_millis(100),  // initial delay 100ms
1976            Duration::from_millis(50),   // interval 50ms
1977            Some(callback),
1978            None
1979        );
1980        let (task_with_notifier, completion_receiver) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1981        let task_id = wheel.insert(task_with_notifier);
1982        
1983        let mut rx = match completion_receiver {
1984            CompletionReceiver::Periodic(receiver) => receiver,
1985            _ => panic!("Expected periodic completion receiver"),
1986        };
1987        
1988        // Advance 10 ticks (100ms), task should trigger for the first time
1989        // 前进 10 个 tick (100毫秒),任务应该第一次触发
1990        for _ in 0..10 {
1991            wheel.advance();
1992        }
1993        
1994        assert!(rx.try_recv().is_ok(), "Should receive first notification after initial delay");
1995        
1996        // Advance another 5 ticks (50ms), task should trigger again (using interval)
1997        // 再前进 5 个 tick (50毫秒),任务应该再次触发(使用间隔)
1998        for _ in 0..5 {
1999            wheel.advance();
2000        }
2001        
2002        assert!(rx.try_recv().is_ok(), "Should receive second notification after interval");
2003        
2004        // Cancel the task
2005        // 取消任务
2006        wheel.cancel(task_id);
2007    }
2008
2009    #[test]
2010    fn test_periodic_task_postpone() {
2011        use crate::task::CompletionReceiver;
2012        
2013        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
2014        
2015        // Insert periodic task with 50ms interval
2016        // 插入 50毫秒间隔的周期性任务
2017        let callback = CallbackWrapper::new(|| async {});
2018        let task = TimerTask::new_periodic(
2019            Duration::from_millis(50),
2020            Duration::from_millis(50),
2021            Some(callback),
2022            None
2023        );
2024        let (task_with_notifier, completion_receiver) = TimerTaskWithCompletionNotifier::from_timer_task(task);
2025        let task_id = wheel.insert(task_with_notifier);
2026        
2027        let mut rx = match completion_receiver {
2028            CompletionReceiver::Periodic(receiver) => receiver,
2029            _ => panic!("Expected periodic completion receiver"),
2030        };
2031        
2032        // Postpone the periodic task to 100ms
2033        // 延期周期任务到 100毫秒
2034        assert!(wheel.postpone(task_id, Duration::from_millis(100), None), "Should postpone periodic task");
2035        
2036        // Advance 5 ticks (50ms), task should not trigger yet
2037        // 前进 5 个 tick (50毫秒),任务还不应该触发
2038        for _ in 0..5 {
2039            wheel.advance();
2040        }
2041        assert!(rx.try_recv().is_err(), "Should not receive notification before postponed time");
2042        
2043        // Advance another 5 ticks (total 100ms), task should trigger
2044        // 再前进 5 个 tick (总共 100毫秒),任务应该触发
2045        for _ in 0..5 {
2046            wheel.advance();
2047        }
2048        assert!(rx.try_recv().is_ok(), "Should receive notification at postponed time");
2049        
2050        // Verify task is reinserted and will trigger again at interval (50ms)
2051        // 验证任务已重新插入,并将在间隔时间 (50毫秒) 后再次触发
2052        for _ in 0..5 {
2053            wheel.advance();
2054        }
2055        assert!(rx.try_recv().is_ok(), "Should receive second notification after interval");
2056        
2057        wheel.cancel(task_id);
2058    }
2059
2060    #[test]
2061    fn test_periodic_task_postpone_cross_layer() {
2062        use crate::task::CompletionReceiver;
2063        
2064        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
2065        
2066        // Insert periodic task in L0 (short interval)
2067        // 在 L0 层插入周期性任务(短间隔)
2068        let callback = CallbackWrapper::new(|| async {});
2069        let task = TimerTask::new_periodic(
2070            Duration::from_millis(100),
2071            Duration::from_millis(100),
2072            Some(callback),
2073            None
2074        );
2075        let (task_with_notifier, completion_receiver) = TimerTaskWithCompletionNotifier::from_timer_task(task);
2076        let task_id = wheel.insert(task_with_notifier);
2077        
2078        let mut rx = match completion_receiver {
2079            CompletionReceiver::Periodic(receiver) => receiver,
2080            _ => panic!("Expected periodic completion receiver"),
2081        };
2082        
2083        // Verify task is in L0
2084        // 验证任务在 L0 层
2085        assert_eq!(wheel.task_index.get(&task_id).unwrap().level, 0);
2086        
2087        // Postpone to long delay (should migrate to L1)
2088        // 延期到长延迟(应该迁移到 L1)
2089        // L0 range: 512 slots * 10ms = 5120ms
2090        assert!(wheel.postpone(task_id, Duration::from_secs(10), None));
2091        
2092        // Verify task migrated to L1
2093        // 验证任务迁移到 L1
2094        assert_eq!(wheel.task_index.get(&task_id).unwrap().level, 1);
2095        
2096        // Postpone back to short delay (should migrate back to L0)
2097        // 延期回短延迟(应该迁移回 L0)
2098        assert!(wheel.postpone(task_id, Duration::from_millis(200), None));
2099        
2100        // Verify task migrated back to L0
2101        // 验证任务迁移回 L0
2102        assert_eq!(wheel.task_index.get(&task_id).unwrap().level, 0);
2103        
2104        // Advance to trigger
2105        // 前进到触发
2106        for _ in 0..20 {
2107            wheel.advance();
2108        }
2109        assert!(rx.try_recv().is_ok(), "Should receive notification");
2110        
2111        wheel.cancel(task_id);
2112    }
2113
2114    #[test]
2115    fn test_periodic_task_batch_insert() {
2116        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
2117        
2118        // Create batch of periodic tasks
2119        // 创建批量周期性任务
2120        let tasks: Vec<TimerTaskWithCompletionNotifier> = (0..10)
2121            .map(|i| {
2122                let callback = CallbackWrapper::new(|| async {});
2123                let task = TimerTask::new_periodic(
2124                    Duration::from_millis(100 + i * 10),
2125                    Duration::from_millis(50),
2126                    Some(callback),
2127                    None
2128                );
2129                let (task_with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
2130                task_with_notifier
2131            })
2132            .collect();
2133        
2134        let task_ids = wheel.insert_batch(tasks);
2135        
2136        assert_eq!(task_ids.len(), 10, "Should insert all 10 periodic tasks");
2137        assert_eq!(wheel.task_index.len(), 10, "All tasks should be in index");
2138        
2139        // Advance and verify tasks trigger
2140        // 前进并验证任务触发
2141        for _ in 0..200 {
2142            let _expired = wheel.advance();
2143            // Some tasks should trigger during advancement
2144            // 某些任务应该在推进过程中触发
2145        }
2146        
2147        // All tasks should still be in wheel (periodic tasks reinsert)
2148        // 所有任务应该仍在时间轮中(周期性任务会重新插入)
2149        assert_eq!(wheel.task_index.len(), 10, "All periodic tasks should still be in wheel");
2150        
2151        // Batch cancel all
2152        // 批量取消所有
2153        let cancelled = wheel.cancel_batch(&task_ids);
2154        assert_eq!(cancelled, 10, "Should cancel all periodic tasks");
2155        assert!(wheel.is_empty());
2156    }
2157
2158    #[test]
2159    fn test_periodic_task_batch_postpone() {
2160        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
2161        
2162        // Insert multiple periodic tasks
2163        // 插入多个周期性任务
2164        let mut task_ids = Vec::new();
2165        for _ in 0..5 {
2166            let callback = CallbackWrapper::new(|| async {});
2167            let task = TimerTask::new_periodic(
2168                Duration::from_millis(50),
2169                Duration::from_millis(50),
2170                Some(callback),
2171                None
2172            );
2173            let (task_with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
2174            let task_id = wheel.insert(task_with_notifier);
2175            task_ids.push(task_id);
2176        }
2177        
2178        // Batch postpone all tasks to 150ms
2179        // 批量延期所有任务到 150毫秒
2180        let updates: Vec<_> = task_ids
2181            .iter()
2182            .map(|&id| (id, Duration::from_millis(150)))
2183            .collect();
2184        let postponed_count = wheel.postpone_batch(updates);
2185        assert_eq!(postponed_count, 5, "Should postpone all 5 periodic tasks");
2186        
2187        // Advance 5 ticks (50ms), tasks should not trigger
2188        // 前进 5 个 tick (50毫秒),任务不应该触发
2189        for _ in 0..5 {
2190            let expired = wheel.advance();
2191            assert!(expired.is_empty(), "Tasks should not trigger before postponed time");
2192        }
2193        
2194        // Advance to 15 ticks (150ms), all tasks should trigger
2195        // 前进到 15 个 tick (150毫秒),所有任务应该触发
2196        let mut total_triggered = 0;
2197        for _ in 0..10 {
2198            let expired = wheel.advance();
2199            total_triggered += expired.len();
2200        }
2201        assert_eq!(total_triggered, 5, "All 5 tasks should trigger at postponed time");
2202        
2203        // Clean up
2204        // 清理
2205        wheel.cancel_batch(&task_ids);
2206    }
2207
2208    #[test]
2209    fn test_mixed_oneshot_and_periodic_tasks() {
2210        use crate::task::CompletionReceiver;
2211        
2212        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
2213        
2214        // Insert oneshot tasks
2215        // 插入一次性任务
2216        let mut oneshot_ids = Vec::new();
2217        for i in 0..5 {
2218            let callback = CallbackWrapper::new(|| async {});
2219            let task = TimerTask::new_oneshot(Duration::from_millis(100 + i * 10), Some(callback));
2220            let (task_with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
2221            let task_id = wheel.insert(task_with_notifier);
2222            oneshot_ids.push(task_id);
2223        }
2224        
2225        // Insert periodic tasks
2226        // 插入周期性任务
2227        let mut periodic_ids = Vec::new();
2228        let mut periodic_receivers = Vec::new();
2229        for _ in 0..5 {
2230            let callback = CallbackWrapper::new(|| async {});
2231            let task = TimerTask::new_periodic(
2232                Duration::from_millis(100),
2233                Duration::from_millis(100),
2234                Some(callback),
2235                None
2236            );
2237            let (task_with_notifier, completion_receiver) = TimerTaskWithCompletionNotifier::from_timer_task(task);
2238            let task_id = wheel.insert(task_with_notifier);
2239            periodic_ids.push(task_id);
2240            
2241            if let CompletionReceiver::Periodic(rx) = completion_receiver {
2242                periodic_receivers.push(rx);
2243            }
2244        }
2245        
2246        assert_eq!(wheel.task_index.len(), 10, "Should have 10 tasks total");
2247        
2248        // Advance 15 ticks (150ms)
2249        // 前进 15 个 tick (150毫秒)
2250        let mut total_expired = 0;
2251        for _ in 0..15 {
2252            let expired = wheel.advance();
2253            total_expired += expired.len();
2254        }
2255        
2256        // All oneshot tasks (5) + first trigger of periodic tasks (5) = 10
2257        // 所有一次性任务 (5) + 周期性任务的第一次触发 (5) = 10
2258        assert_eq!(total_expired, 10, "Should have triggered oneshot and periodic tasks");
2259        
2260        // Oneshot tasks should be removed, periodic tasks should remain
2261        // 一次性任务应该被移除,周期性任务应该保留
2262        assert_eq!(wheel.task_index.len(), 5, "Only periodic tasks should remain");
2263        
2264        // Verify all oneshot tasks are removed
2265        // 验证所有一次性任务已移除
2266        for id in &oneshot_ids {
2267            assert!(!wheel.task_index.contains_key(id), "Oneshot task should be removed");
2268        }
2269        
2270        // Verify all periodic tasks are still present
2271        // 验证所有周期性任务仍然存在
2272        for id in &periodic_ids {
2273            assert!(wheel.task_index.contains_key(id), "Periodic task should still be present");
2274        }
2275        
2276        // Clean up periodic tasks
2277        // 清理周期性任务
2278        wheel.cancel_batch(&periodic_ids);
2279        assert!(wheel.is_empty());
2280    }
2281
2282    #[test]
2283    fn test_periodic_task_postpone_with_callback() {
2284        use crate::task::CompletionReceiver;
2285        
2286        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
2287        
2288        // Insert periodic task
2289        // 插入周期性任务
2290        let old_callback = CallbackWrapper::new(|| async {});
2291        let task = TimerTask::new_periodic(
2292            Duration::from_millis(50),
2293            Duration::from_millis(50),
2294            Some(old_callback),
2295            None
2296        );
2297        let (task_with_notifier, completion_receiver) = TimerTaskWithCompletionNotifier::from_timer_task(task);
2298        let task_id = wheel.insert(task_with_notifier);
2299        
2300        let mut rx = match completion_receiver {
2301            CompletionReceiver::Periodic(receiver) => receiver,
2302            _ => panic!("Expected periodic completion receiver"),
2303        };
2304        
2305        // Postpone with new callback
2306        // 使用新回调延期
2307        let new_callback = CallbackWrapper::new(|| async {});
2308        assert!(wheel.postpone(task_id, Duration::from_millis(100), Some(new_callback)));
2309        
2310        // Advance to trigger
2311        // 前进到触发
2312        for _ in 0..10 {
2313            wheel.advance();
2314        }
2315        
2316        assert!(rx.try_recv().is_ok(), "Should receive notification with new callback");
2317        
2318        wheel.cancel(task_id);
2319    }
2320}
2321