kestrel_timer/
wheel.rs

1use crate::config::{BatchConfig, WheelConfig};
2use crate::task::{TaskCompletionReason, TaskId, TaskLocation, TimerTask};
3use rustc_hash::FxHashMap;
4use std::time::Duration;
5
6/// Timing wheel single layer data structure
7/// 
8/// 时间轮单层数据结构
9struct WheelLayer {
10    /// Slot array, each slot stores a group of timer tasks
11    /// 
12    /// 槽数组,每个槽存储一组定时器任务
13    slots: Vec<Vec<TimerTask>>,
14    
15    /// Current time pointer (tick index)
16    /// 
17    /// 当前时间指针(tick 索引)
18    current_tick: u64,
19    
20    /// Slot count
21    /// 
22    /// 槽数量
23    slot_count: usize,
24    
25    /// Duration of each tick
26    /// 
27    /// 每个 tick 的持续时间
28    tick_duration: Duration,
29    
30    /// Cache: tick duration in milliseconds (u64) - avoid repeated conversion
31    /// 
32    /// 缓存:tick 持续时间(毫秒,u64)- 避免重复转换
33    tick_duration_ms: u64,
34    
35    /// Cache: slot mask (slot_count - 1) - for fast modulo operation
36    /// 
37    /// 缓存:槽掩码(slot_count - 1)- 用于快速取模运算
38    slot_mask: usize,
39}
40
41impl WheelLayer {
42    /// Create a new wheel layer
43    /// 
44    /// 创建一个新的时间轮层
45    fn new(slot_count: usize, tick_duration: Duration) -> Self {
46        let mut slots = Vec::with_capacity(slot_count);
47        // Pre-allocate capacity for each slot to reduce subsequent reallocation during push
48        // Most slots typically contain 0-4 tasks, pre-allocate capacity of 4
49        // 
50        // 为每个槽预分配容量以减少后续 push 时的重新分配
51        // 大多数槽通常包含 0-4 个任务,预分配容量为 4
52        for _ in 0..slot_count {
53            slots.push(Vec::with_capacity(4));
54        }
55        
56        let tick_duration_ms = tick_duration.as_millis() as u64;
57        let slot_mask = slot_count - 1;
58        
59        Self {
60            slots,
61            current_tick: 0,
62            slot_count,
63            tick_duration,
64            tick_duration_ms,
65            slot_mask,
66        }
67    }
68    
69    /// Calculate the number of ticks corresponding to the delay
70    /// 
71    /// 计算延迟对应的 tick 数量
72    fn delay_to_ticks(&self, delay: Duration) -> u64 {
73        let ticks = delay.as_millis() as u64 / self.tick_duration.as_millis() as u64;
74        ticks.max(1) // at least 1 tick (至少 1 个 tick)
75    }
76}
77
78/// Timing wheel data structure (hierarchical mode)
79/// 
80/// 时间轮数据结构(分层模式)
81pub struct Wheel {
82    /// L0 layer (bottom layer)
83    /// 
84    /// L0 层(底层)
85    l0: WheelLayer,
86    
87    /// L1 layer (top layer)
88    /// 
89    /// L1 层(顶层)
90    l1: WheelLayer,
91    
92    /// L1 tick ratio relative to L0 tick
93    /// 
94    /// L1 tick 相对于 L0 tick 的比率
95    l1_tick_ratio: u64,
96    
97    /// Task index for fast lookup and cancellation
98    /// 
99    /// 任务索引,用于快速查找和取消
100    task_index: FxHashMap<TaskId, TaskLocation>,
101    
102    /// Batch processing configuration
103    /// 
104    /// 批处理配置
105    batch_config: BatchConfig,
106    
107    /// Cache: L0 layer capacity in milliseconds - avoid repeated calculation
108    /// 
109    /// 缓存:L0 层容量(毫秒)- 避免重复计算
110    l0_capacity_ms: u64,
111    
112    /// Cache: L1 layer capacity in ticks - avoid repeated calculation
113    /// 
114    /// 缓存:L1 层容量(tick 数)- 避免重复计算
115    l1_capacity_ticks: u64,
116}
117
118impl Wheel {
119    /// Create new timing wheel
120    ///
121    /// # Parameters
122    /// - `config`: Timing wheel configuration (already validated)
123    /// - `batch_config`: Batch processing configuration
124    ///
125    /// # Notes
126    /// Configuration parameters have been validated in WheelConfig::builder().build(), so this method will not fail.
127    /// 
128    /// 创建新的时间轮
129    ///
130    /// # 参数
131    /// - `config`: 时间轮配置(已验证)
132    /// - `batch_config`: 批处理配置
133    ///
134    /// # 注意
135    /// 配置参数已在 WheelConfig::builder().build() 中验证,因此此方法不会失败。
136    pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
137        let l0 = WheelLayer::new(config.l0_slot_count, config.l0_tick_duration);
138        let l1 = WheelLayer::new(config.l1_slot_count, config.l1_tick_duration);
139        
140        // Calculate L1 tick ratio relative to L0 tick
141        // 计算 L1 tick 相对于 L0 tick 的比率
142        let l1_tick_ratio = l1.tick_duration_ms / l0.tick_duration_ms;
143        
144        // Pre-calculate capacity to avoid repeated calculation in insert
145        // 预计算容量,避免在 insert 中重复计算
146        let l0_capacity_ms = (l0.slot_count as u64) * l0.tick_duration_ms;
147        let l1_capacity_ticks = l1.slot_count as u64;
148        
149        Self {
150            l0,
151            l1,
152            l1_tick_ratio,
153            task_index: FxHashMap::default(),
154            batch_config,
155            l0_capacity_ms,
156            l1_capacity_ticks,
157        }
158    }
159
160    /// Get current tick (L0 layer tick)
161    /// 
162    /// 获取当前 tick(L0 层 tick)
163    #[allow(dead_code)]
164    pub fn current_tick(&self) -> u64 {
165        self.l0.current_tick
166    }
167
168    /// Get tick duration (L0 layer tick duration)
169    /// 
170    /// 获取 tick 持续时间(L0 层 tick 持续时间)
171    #[allow(dead_code)]
172    pub fn tick_duration(&self) -> Duration {
173        self.l0.tick_duration
174    }
175
176    /// Get slot count (L0 layer slot count)
177    /// 
178    /// 获取槽数量(L0 层槽数量)
179    #[allow(dead_code)]
180    pub fn slot_count(&self) -> usize {
181        self.l0.slot_count
182    }
183
184    /// Calculate the number of ticks corresponding to the delay (based on L0 layer)
185    /// 
186    /// 计算延迟对应的 tick 数量(基于 L0 层)
187    #[allow(dead_code)]
188    pub fn delay_to_ticks(&self, delay: Duration) -> u64 {
189        self.l0.delay_to_ticks(delay)
190    }
191    
192    /// Determine which layer the delay should be inserted into
193    ///
194    /// # Returns
195    /// Returns: (layer, ticks, rounds)
196    /// - Layer: 0 = L0, 1 = L1
197    /// - Ticks: number of ticks calculated from current tick
198    /// - Rounds: number of rounds (only used for very long delays in L1 layer)
199    /// 
200    /// 确定延迟应该插入到哪一层
201    ///
202    /// # 返回值
203    /// 返回:(层级, ticks, 轮数)
204    /// - 层级:0 = L0, 1 = L1
205    /// - Ticks:从当前 tick 计算的 tick 数量
206    /// - 轮数:轮数(仅用于 L1 层的超长延迟)
207    #[inline(always)]
208    fn determine_layer(&self, delay: Duration) -> (u8, u64, u32) {
209        let delay_ms = delay.as_millis() as u64;
210        
211        // Fast path: most tasks are within L0 range (using cached capacity)
212        // 快速路径:大多数任务在 L0 范围内(使用缓存的容量)
213        if delay_ms < self.l0_capacity_ms {
214            let l0_ticks = (delay_ms / self.l0.tick_duration_ms).max(1);
215            return (0, l0_ticks, 0);
216        }
217        
218        // Slow path: L1 layer tasks (using cached values)
219        // 慢速路径:L1 层任务(使用缓存的值)
220        let l1_ticks = (delay_ms / self.l1.tick_duration_ms).max(1);
221        
222        if l1_ticks < self.l1_capacity_ticks {
223            (1, l1_ticks, 0)
224        } else {
225            let rounds = (l1_ticks / self.l1_capacity_ticks) as u32;
226            (1, l1_ticks, rounds)
227        }
228    }
229
230    /// Insert timer task
231    ///
232    /// # Parameters
233    /// - `task`: Timer task
234    /// - `notifier`: Completion notifier (used to send notifications when tasks expire or are cancelled)
235    ///
236    /// # Returns
237    /// Unique identifier of the task (TaskId)
238    ///
239    /// # Implementation Details
240    /// - Automatically calculate the layer and slot where the task should be inserted
241    /// - Hierarchical mode: short delay tasks are inserted into L0, long delay tasks are inserted into L1
242    /// - Use bit operations to optimize slot index calculation
243    /// - Maintain task index to support O(1) lookup and cancellation
244    /// 
245    /// 插入定时器任务
246    ///
247    /// # 参数
248    /// - `task`: 定时器任务
249    /// - `notifier`: 完成通知器(用于在任务过期或取消时发送通知)
250    ///
251    /// # 返回值
252    /// 任务的唯一标识符(TaskId)
253    ///
254    /// # 实现细节
255    /// - 自动计算任务应该插入的层级和槽位
256    /// - 分层模式:短延迟任务插入 L0,长延迟任务插入 L1
257    /// - 使用位运算优化槽索引计算
258    /// - 维护任务索引以支持 O(1) 查找和取消
259    #[inline]
260    pub fn insert(&mut self, mut task: TimerTask, notifier: crate::task::CompletionNotifier) -> TaskId {
261        let (level, ticks, rounds) = self.determine_layer(task.delay);
262        
263        // Use match to reduce branches, and use cached slot mask
264        // 使用 match 减少分支,并使用缓存的槽掩码
265        let (current_tick, slot_mask, slots) = match level {
266            0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
267            _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
268        };
269        
270        let total_ticks = current_tick + ticks;
271        let slot_index = (total_ticks as usize) & slot_mask;
272
273        // Prepare task registration (set notifier and timing wheel parameters)
274        // 准备任务注册(设置通知器和时间轮参数)
275        task.prepare_for_registration(notifier, total_ticks, rounds);
276
277        let task_id = task.id;
278        
279        // Get the index position of the task in Vec (the length before insertion is the index of the new task)
280        // 获取任务在 Vec 中的索引位置(插入前的长度就是新任务的索引)
281        let vec_index = slots[slot_index].len();
282        let location = TaskLocation::new(level, slot_index, vec_index);
283
284        // Insert task into slot
285        // 将任务插入槽中
286        slots[slot_index].push(task);
287        
288        // Record task location
289        // 记录任务位置
290        self.task_index.insert(task_id, location);
291
292        task_id
293    }
294
295    /// Batch insert timer tasks
296    ///
297    /// # Parameters
298    /// - `tasks`: List of tuples of (task, completion notifier)
299    ///
300    /// # Returns
301    /// List of task IDs
302    ///
303    /// # Performance Advantages
304    /// - Reduce repeated boundary checks and capacity adjustments
305    /// - For tasks with the same delay, calculation results can be reused
306    /// 
307    /// 批量插入定时器任务
308    ///
309    /// # 参数
310    /// - `tasks`: (任务, 完成通知器) 元组列表
311    ///
312    /// # 返回值
313    /// 任务 ID 列表
314    ///
315    /// # 性能优势
316    /// - 减少重复的边界检查和容量调整
317    /// - 对于相同延迟的任务,可以重用计算结果
318    #[inline]
319    pub fn insert_batch(&mut self, tasks: Vec<(TimerTask, crate::task::CompletionNotifier)>) -> Vec<TaskId> {
320        let task_count = tasks.len();
321        
322        // Optimize: pre-allocate HashMap capacity to avoid reallocation
323        // 优化:预分配 HashMap 容量以避免重新分配
324        self.task_index.reserve(task_count);
325        
326        let mut task_ids = Vec::with_capacity(task_count);
327        
328        for (mut task, notifier) in tasks {
329            let (level, ticks, rounds) = self.determine_layer(task.delay);
330            
331            // Use match to reduce branches, and use cached slot mask
332            // 使用 match 减少分支,并使用缓存的槽掩码
333            let (current_tick, slot_mask, slots) = match level {
334                0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
335                _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
336            };
337            
338            let total_ticks = current_tick + ticks;
339            let slot_index = (total_ticks as usize) & slot_mask;
340
341            // Prepare task registration (set notifier and timing wheel parameters)
342            // 准备任务注册(设置通知器和时间轮参数)
343            task.prepare_for_registration(notifier, total_ticks, rounds);
344
345            let task_id = task.id;
346            
347            // Get the index position of the task in Vec
348            // 获取任务在 Vec 中的索引位置
349            let vec_index = slots[slot_index].len();
350            let location = TaskLocation::new(level, slot_index, vec_index);
351
352            // Insert task into slot
353            // 将任务插入槽中
354            slots[slot_index].push(task);
355            
356            // Record task location
357            // 记录任务位置
358            self.task_index.insert(task_id, location);
359            
360            task_ids.push(task_id);
361        }
362        
363        task_ids
364    }
365
366    /// Cancel timer task
367    ///
368    /// # Parameters
369    /// - `task_id`: Task ID
370    ///
371    /// # Returns
372    /// Returns true if the task exists and is successfully cancelled, otherwise returns false
373    /// 
374    /// 取消定时器任务
375    ///
376    /// # 参数
377    /// - `task_id`: 任务 ID
378    ///
379    /// # 返回值
380    /// 如果任务存在且成功取消则返回 true,否则返回 false
381    #[inline]
382    pub fn cancel(&mut self, task_id: TaskId) -> bool {
383        // Remove task location from index
384        // 从索引中移除任务位置
385        let location = match self.task_index.remove(&task_id) {
386            Some(loc) => loc,
387            None => return false,
388        };
389        
390        // Use match to get slot reference, reduce branches
391        // 使用 match 获取槽引用,减少分支
392        let slot = match location.level {
393            0 => &mut self.l0.slots[location.slot_index],
394            _ => &mut self.l1.slots[location.slot_index],
395        };
396        
397        // Boundary check and ID verification
398        // 边界检查和 ID 验证
399        if location.vec_index >= slot.len() || slot[location.vec_index].id != task_id {
400            // Index inconsistent, re-insert location to maintain data consistency
401            // 索引不一致,重新插入位置以保持数据一致性
402            self.task_index.insert(task_id, location);
403            return false;
404        }
405        
406        // Send cancellation notification
407        // 发送取消通知
408        if let Some(notifier) = slot[location.vec_index].completion_notifier.take() {
409            let _ = notifier.0.send(TaskCompletionReason::Cancelled);
410        }
411        
412        // Use swap_remove to remove task, record swapped task ID
413        // 使用 swap_remove 移除任务,记录被交换的任务 ID
414        let removed_task = slot.swap_remove(location.vec_index);
415        
416        // If a swap occurred (vec_index is not the last element)
417        // 如果发生了交换(vec_index 不是最后一个元素)
418        if location.vec_index < slot.len() {
419            let swapped_task_id = slot[location.vec_index].id;
420            // Update swapped element's index in one go, avoid another HashMap query
421            // 一次性更新被交换元素的索引,避免再次查询 HashMap
422            if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
423                swapped_location.vec_index = location.vec_index;
424            }
425        }
426        
427        // Ensure the correct task was removed
428        // 确保移除了正确的任务
429        debug_assert_eq!(removed_task.id, task_id);
430        true
431    }
432
433    /// Batch cancel timer tasks
434    ///
435    /// # Parameters
436    /// - `task_ids`: List of task IDs to cancel
437    ///
438    /// # Returns
439    /// Number of successfully cancelled tasks
440    ///
441    /// # Performance Advantages
442    /// - Reduce repeated HashMap lookup overhead
443    /// - Multiple cancellation operations on the same slot can be batch processed
444    /// - Use unstable sort to improve performance
445    /// - Small batch optimization: skip sorting based on configuration threshold, process directly
446    /// 
447    /// 批量取消定时器任务
448    ///
449    /// # 参数
450    /// - `task_ids`: 要取消的任务 ID 列表
451    ///
452    /// # 返回值
453    /// 成功取消的任务数量
454    ///
455    /// # 性能优势
456    /// - 减少重复的 HashMap 查找开销
457    /// - 同一槽位的多个取消操作可以批量处理
458    /// - 使用不稳定排序提高性能
459    /// - 小批量优化:根据配置阈值跳过排序,直接处理
460    #[inline]
461    pub fn cancel_batch(&mut self, task_ids: &[TaskId]) -> usize {
462        let mut cancelled_count = 0;
463        
464        // Small batch optimization: cancel one by one to avoid grouping and sorting overhead
465        // 小批量优化:逐个取消以避免分组和排序开销
466        if task_ids.len() <= self.batch_config.small_batch_threshold {
467            for &task_id in task_ids {
468                if self.cancel(task_id) {
469                    cancelled_count += 1;
470                }
471            }
472            return cancelled_count;
473        }
474        
475        // Group by layer and slot to optimize batch cancellation
476        // Use SmallVec to avoid heap allocation in most cases
477        // 按层级和槽位分组以优化批量取消
478        // 使用 SmallVec 避免在大多数情况下进行堆分配
479        let l0_slot_count = self.l0.slot_count;
480        let l1_slot_count = self.l1.slot_count;
481        
482        let mut l0_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l0_slot_count];
483        let mut l1_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l1_slot_count];
484        
485        // Collect information of tasks to be cancelled
486        // 收集要取消的任务信息
487        for &task_id in task_ids {
488            if let Some(location) = self.task_index.get(&task_id) {
489                if location.level == 0 {
490                    l0_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
491                } else {
492                    l1_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
493                }
494            }
495        }
496        
497        // Process L0 layer cancellation
498        // 处理 L0 层取消
499        for (slot_index, tasks) in l0_tasks_by_slot.iter_mut().enumerate() {
500            if tasks.is_empty() {
501                continue;
502            }
503            
504            // Sort by vec_index in descending order, delete from back to front to avoid index invalidation
505            // 按 vec_index 降序排序,从后向前删除以避免索引失效
506            tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
507            
508            let slot = &mut self.l0.slots[slot_index];
509            
510            for &(task_id, vec_index) in tasks.iter() {
511                if vec_index < slot.len() && slot[vec_index].id == task_id {
512                    if let Some(notifier) = slot[vec_index].completion_notifier.take() {
513                        let _ = notifier.0.send(TaskCompletionReason::Cancelled);
514                    }
515                    
516                    slot.swap_remove(vec_index);
517                    
518                    if vec_index < slot.len() {
519                        let swapped_task_id = slot[vec_index].id;
520                        if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
521                            swapped_location.vec_index = vec_index;
522                        }
523                    }
524                    
525                    self.task_index.remove(&task_id);
526                    cancelled_count += 1;
527                }
528            }
529        }
530        
531        // Process L1 layer cancellation
532        // 处理 L1 层取消
533        for (slot_index, tasks) in l1_tasks_by_slot.iter_mut().enumerate() {
534            if tasks.is_empty() {
535                continue;
536            }
537            
538            tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
539            
540            let slot = &mut self.l1.slots[slot_index];
541            
542            for &(task_id, vec_index) in tasks.iter() {
543                if vec_index < slot.len() && slot[vec_index].id == task_id {
544                    if let Some(notifier) = slot[vec_index].completion_notifier.take() {
545                        let _ = notifier.0.send(TaskCompletionReason::Cancelled);
546                    }
547                    
548                    slot.swap_remove(vec_index);
549                    
550                    if vec_index < slot.len() {
551                        let swapped_task_id = slot[vec_index].id;
552                        if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
553                            swapped_location.vec_index = vec_index;
554                        }
555                    }
556                    
557                    self.task_index.remove(&task_id);
558                    cancelled_count += 1;
559                }
560            }
561        }
562        
563        cancelled_count
564    }
565
566    /// Advance the timing wheel by one tick, return all expired tasks
567    ///
568    /// # Returns
569    /// List of expired tasks
570    ///
571    /// # Implementation Details
572    /// - L0 layer advances 1 tick each time (no rounds check)
573    /// - L1 layer advances once every (L1_tick / L0_tick) times
574    /// - L1 expired tasks are batch demoted to L0
575    /// 
576    /// 推进时间轮一个 tick,返回所有过期的任务
577    ///
578    /// # 返回值
579    /// 过期任务列表
580    ///
581    /// # 实现细节
582    /// - L0 层每次推进 1 个 tick(无轮数检查)
583    /// - L1 层每 (L1_tick / L0_tick) 次推进一次
584    /// - L1 层过期任务批量降级到 L0
585    pub fn advance(&mut self) -> Vec<TimerTask> {
586        // Advance L0 layer
587        // 推进 L0 层
588        self.l0.current_tick += 1;
589        
590        let mut expired_tasks = Vec::new();
591        
592        // Process L0 layer expired tasks (hierarchical mode: L0 layer has no rounds, return all tasks directly)
593        // 处理 L0 层过期任务(分层模式:L0 层无轮数,直接返回所有任务)
594        let l0_slot_index = (self.l0.current_tick as usize) & self.l0.slot_mask;
595        let l0_slot = &mut self.l0.slots[l0_slot_index];
596        
597        let i = 0;
598        while i < l0_slot.len() {
599            let task = &l0_slot[i];
600            
601            // Remove from index
602            // 从索引中移除
603            self.task_index.remove(&task.id);
604            
605            // Use swap_remove to remove task
606            // 使用 swap_remove 移除任务
607            let expired_task = l0_slot.swap_remove(i);
608            
609            // Update swapped element's index
610            // 更新被交换元素的索引
611            if i < l0_slot.len() {
612                let swapped_task_id = l0_slot[i].id;
613                if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
614                    swapped_location.vec_index = i;
615                }
616            }
617            
618            expired_tasks.push(expired_task);
619        }
620        
621        // Process L1 layer
622        // Check if L1 layer needs to be advanced
623        // 处理 L1 层
624        // 检查是否需要推进 L1 层
625        if self.l0.current_tick % self.l1_tick_ratio == 0 {
626            self.l1.current_tick += 1;
627            let l1_slot_index = (self.l1.current_tick as usize) & self.l1.slot_mask;
628            let l1_slot = &mut self.l1.slots[l1_slot_index];
629            
630            // Collect L1 layer expired tasks
631            // 收集 L1 层过期任务
632            let mut tasks_to_demote = Vec::new();
633            let mut i = 0;
634            while i < l1_slot.len() {
635                let task = &mut l1_slot[i];
636                
637                if task.rounds > 0 {
638                    // Still has rounds, decrease rounds and keep
639                    // 还有轮数,减少轮数并保留
640                    task.rounds -= 1;
641                    if let Some(location) = self.task_index.get_mut(&task.id) {
642                        location.vec_index = i;
643                    }
644                    i += 1;
645                } else {
646                    // rounds = 0, need to demote to L0
647                    // rounds = 0,需要降级到 L0
648                    self.task_index.remove(&task.id);
649                    let task_to_demote = l1_slot.swap_remove(i);
650                    
651                    if i < l1_slot.len() {
652                        let swapped_task_id = l1_slot[i].id;
653                        if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
654                            swapped_location.vec_index = i;
655                        }
656                    }
657                    
658                    tasks_to_demote.push(task_to_demote);
659                }
660            }
661            
662            // Demote tasks to L0
663            // 将任务降级到 L0
664            self.demote_tasks(tasks_to_demote);
665        }
666        
667        expired_tasks
668    }
669    
670    /// Demote tasks from L1 to L0
671    /// 
672    /// Recalculate and insert L1 expired tasks into L0 layer
673    /// 
674    /// 将任务从 L1 降级到 L0
675    /// 
676    /// 重新计算并将 L1 过期任务插入到 L0 层
677    fn demote_tasks(&mut self, tasks: Vec<TimerTask>) {
678        for task in tasks {
679            // Calculate the remaining delay of the task in L0 layer
680            // The task's deadline_tick is based on L1 tick, needs to be converted to L0 tick
681            // 计算任务在 L0 层的剩余延迟
682            // 任务的 deadline_tick 基于 L1 tick,需要转换为 L0 tick
683            let l1_tick_ratio = self.l1_tick_ratio;
684            
685            // Calculate the original expiration time of the task (L1 tick)
686            // 计算任务的原始过期时间(L1 tick)
687            let l1_deadline = task.deadline_tick;
688            
689            // Convert to L0 tick expiration time
690            // 转换为 L0 tick 过期时间
691            let l0_deadline_tick = l1_deadline * l1_tick_ratio;
692            let l0_current_tick = self.l0.current_tick;
693            
694            // Calculate remaining L0 ticks
695            // 计算剩余 L0 ticks
696            let remaining_l0_ticks = if l0_deadline_tick > l0_current_tick {
697                l0_deadline_tick - l0_current_tick
698            } else {
699                1 // At least trigger in next tick (至少在下一个 tick 触发)
700            };
701            
702            // Calculate L0 slot index
703            // 计算 L0 槽索引
704            let target_l0_tick = l0_current_tick + remaining_l0_ticks;
705            let l0_slot_index = (target_l0_tick as usize) & self.l0.slot_mask;
706            
707            let task_id = task.id;
708            let vec_index = self.l0.slots[l0_slot_index].len();
709            let location = TaskLocation::new(0, l0_slot_index, vec_index);
710            
711            // Insert into L0 layer
712            // 插入到 L0 层
713            self.l0.slots[l0_slot_index].push(task);
714            self.task_index.insert(task_id, location);
715        }
716    }
717
718    /// Check if the timing wheel is empty
719    /// 
720    /// 检查时间轮是否为空
721    #[allow(dead_code)]
722    pub fn is_empty(&self) -> bool {
723        self.task_index.is_empty()
724    }
725
726    /// Postpone timer task (keep original TaskId)
727    ///
728    /// # Parameters
729    /// - `task_id`: Task ID to postpone
730    /// - `new_delay`: New delay time (recalculated from current tick, not continuing from original delay time)
731    /// - `new_callback`: New callback function (if None, keep original callback)
732    ///
733    /// # Returns
734    /// Returns true if the task exists and is successfully postponed, otherwise returns false
735    ///
736    /// # Implementation Details
737    /// - Remove task from original layer/slot, keep its completion_notifier (will not trigger cancellation notification)
738    /// - Update delay time and callback function (if provided)
739    /// - Recalculate target layer, slot, and rounds based on new_delay
740    /// - Cross-layer migration may occur (L0 <-> L1)
741    /// - Re-insert to new position using original TaskId
742    /// - Keep consistent with external held TaskId reference
743    /// 
744    /// 延期定时器任务(保留原始 TaskId)
745    ///
746    /// # 参数
747    /// - `task_id`: 要延期的任务 ID
748    /// - `new_delay`: 新延迟时间(从当前 tick 重新计算,而非从原延迟时间继续)
749    /// - `new_callback`: 新回调函数(如果为 None,则保留原回调)
750    ///
751    /// # 返回值
752    /// 如果任务存在且成功延期则返回 true,否则返回 false
753    ///
754    /// # 实现细节
755    /// - 从原层级/槽位移除任务,保留其 completion_notifier(不会触发取消通知)
756    /// - 更新延迟时间和回调函数(如果提供)
757    /// - 根据 new_delay 重新计算目标层级、槽位和轮数
758    /// - 可能发生跨层迁移(L0 <-> L1)
759    /// - 使用原始 TaskId 重新插入到新位置
760    /// - 与外部持有的 TaskId 引用保持一致
761    #[inline]
762    pub fn postpone(
763        &mut self,
764        task_id: TaskId,
765        new_delay: Duration,
766        new_callback: Option<crate::task::CallbackWrapper>,
767    ) -> bool {
768        // Step 1: Find and remove original task
769        // 步骤 1: 查找并移除原任务
770        let old_location = match self.task_index.remove(&task_id) {
771            Some(loc) => loc,
772            None => return false,
773        };
774        
775        // Use match to get slot reference
776        // 使用 match 获取槽引用
777        let slot = match old_location.level {
778            0 => &mut self.l0.slots[old_location.slot_index],
779            _ => &mut self.l1.slots[old_location.slot_index],
780        };
781        
782        // Verify task is still at expected position
783        // 验证任务仍在预期位置
784        if old_location.vec_index >= slot.len() || slot[old_location.vec_index].id != task_id {
785            // Index inconsistent, re-insert and return failure
786            // 索引不一致,重新插入并返回失败
787            self.task_index.insert(task_id, old_location);
788            return false;
789        }
790        
791        // Use swap_remove to remove task
792        // 使用 swap_remove 移除任务
793        let mut task = slot.swap_remove(old_location.vec_index);
794        
795        // Update swapped element's index (if a swap occurred)
796        // 更新被交换元素的索引(如果发生了交换)
797        if old_location.vec_index < slot.len() {
798            let swapped_task_id = slot[old_location.vec_index].id;
799            if let Some(swapped_location) = self.task_index.get_mut(&swapped_task_id) {
800                swapped_location.vec_index = old_location.vec_index;
801            }
802        }
803        
804        // Step 2: Update task's delay and callback
805        // 步骤 2: 更新任务的延迟和回调
806        task.delay = new_delay;
807        if let Some(callback) = new_callback {
808            task.callback = Some(callback);
809        }
810        
811        // Step 3: Recalculate layer, slot, and rounds based on new delay
812        // 步骤 3: 根据新延迟重新计算层级、槽位和轮数
813        let (new_level, ticks, new_rounds) = self.determine_layer(new_delay);
814        
815        // Use match to reduce branches, and use cached slot mask
816        // 使用 match 减少分支,并使用缓存的槽掩码
817        let (current_tick, slot_mask, slots) = match new_level {
818            0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
819            _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
820        };
821        
822        let total_ticks = current_tick + ticks;
823        let new_slot_index = (total_ticks as usize) & slot_mask;
824        
825        // Update task's timing wheel parameters
826        // 更新任务的时间轮参数
827        task.deadline_tick = total_ticks;
828        task.rounds = new_rounds;
829        
830        // Step 4: Re-insert task to new layer/slot
831        // 步骤 4: 将任务重新插入到新层级/槽位
832        let new_vec_index = slots[new_slot_index].len();
833        let new_location = TaskLocation::new(new_level, new_slot_index, new_vec_index);
834        
835        slots[new_slot_index].push(task);
836        self.task_index.insert(task_id, new_location);
837        
838        true
839    }
840
841    /// Batch postpone timer tasks
842    ///
843    /// # Parameters
844    /// - `updates`: List of tuples of (task ID, new delay)
845    ///
846    /// # Returns
847    /// Number of successfully postponed tasks
848    ///
849    /// # Performance Advantages
850    /// - Batch processing reduces function call overhead
851    /// - All delays are recalculated from current_tick at call time
852    ///
853    /// # Notes
854    /// - If a task ID does not exist, that task will be skipped without affecting other tasks' postponement
855    /// 
856    /// 批量延期定时器任务
857    ///
858    /// # 参数
859    /// - `updates`: (任务 ID, 新延迟) 元组列表
860    ///
861    /// # 返回值
862    /// 成功延期的任务数量
863    ///
864    /// # 性能优势
865    /// - 批处理减少函数调用开销
866    /// - 所有延迟在调用时从 current_tick 重新计算
867    ///
868    /// # 注意
869    /// - 如果任务 ID 不存在,该任务将被跳过,不影响其他任务的延期
870    #[inline]
871    pub fn postpone_batch(
872        &mut self,
873        updates: Vec<(TaskId, Duration)>,
874    ) -> usize {
875        let mut postponed_count = 0;
876        
877        for (task_id, new_delay) in updates {
878            if self.postpone(task_id, new_delay, None) {
879                postponed_count += 1;
880            }
881        }
882        
883        postponed_count
884    }
885
886    /// Batch postpone timer tasks (replace callbacks)
887    ///
888    /// # Parameters
889    /// - `updates`: List of tuples of (task ID, new delay, new callback)
890    ///
891    /// # Returns
892    /// Number of successfully postponed tasks
893    /// 
894    /// 批量延期定时器任务(替换回调)
895    ///
896    /// # 参数
897    /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
898    ///
899    /// # 返回值
900    /// 成功延期的任务数量
901    pub fn postpone_batch_with_callbacks(
902        &mut self,
903        updates: Vec<(TaskId, Duration, Option<crate::task::CallbackWrapper>)>,
904    ) -> usize {
905        let mut postponed_count = 0;
906        
907        for (task_id, new_delay, new_callback) in updates {
908            if self.postpone(task_id, new_delay, new_callback) {
909                postponed_count += 1;
910            }
911        }
912        
913        postponed_count
914    }
915}
916
917#[cfg(test)]
918mod tests {
919    use super::*;
920    use crate::task::CallbackWrapper;
921
922    #[test]
923    fn test_wheel_creation() {
924        let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
925        assert_eq!(wheel.slot_count(), 512);
926        assert_eq!(wheel.current_tick(), 0);
927        assert!(wheel.is_empty());
928    }
929
930    #[test]
931    fn test_hierarchical_wheel_creation() {
932        let config = WheelConfig::default();
933        
934        let wheel = Wheel::new(config, BatchConfig::default());
935        assert_eq!(wheel.slot_count(), 512); // L0 slot count (L0 层槽数量)
936        assert_eq!(wheel.current_tick(), 0);
937        assert!(wheel.is_empty());
938        // L1 layer always exists in hierarchical mode (L1 层在分层模式下始终存在)
939        assert_eq!(wheel.l1.slot_count, 64);
940        assert_eq!(wheel.l1_tick_ratio, 100); // 1000ms / 10ms = 100 ticks (1000毫秒 / 10毫秒 = 100个tick)
941    }
942
943    #[test]
944    fn test_hierarchical_config_validation() {
945        // L1 tick must be an integer multiple of L0 tick (L1 tick 必须是 L0 tick 的整数倍)
946        let result = WheelConfig::builder()
947            .l0_tick_duration(Duration::from_millis(10))
948            .l0_slot_count(512)
949            .l1_tick_duration(Duration::from_millis(15)) // Not an integer multiple (不是整数倍)
950            .l1_slot_count(64)
951            .build();
952        
953        assert!(result.is_err());
954        
955        // Correct configuration (正确的配置)
956        let result = WheelConfig::builder()
957            .l0_tick_duration(Duration::from_millis(10))
958            .l0_slot_count(512)
959            .l1_tick_duration(Duration::from_secs(1)) // 1000ms / 10ms = 100 ticks (1000毫秒 / 10毫秒 = 100个tick)
960            .l1_slot_count(64)
961            .build();
962        
963        assert!(result.is_ok());
964    }
965
966    #[test]
967    fn test_layer_determination() {
968        let config = WheelConfig::default();
969        
970        let wheel = Wheel::new(config, BatchConfig::default());
971        
972        // Short delay should enter L0 layer (短延迟应该进入 L0 层)
973        // L0: 512 slots * 10ms = 5120ms
974        let (level, _, rounds) = wheel.determine_layer(Duration::from_millis(100));
975        assert_eq!(level, 0);
976        assert_eq!(rounds, 0);
977        
978        // Long delay should enter L1 layer
979        // 超过 L0 范围(>5120ms) (超过 L0 范围 (>5120毫秒))
980        let (level, _, rounds) = wheel.determine_layer(Duration::from_secs(10));
981        assert_eq!(level, 1);
982        assert_eq!(rounds, 0);
983        
984        // Long delay should enter L1 layer and have rounds
985        // L1: 64 slots * 1000ms = 64000ms (L1: 64个槽 * 1000毫秒 = 64000毫秒)
986        let (level, _, rounds) = wheel.determine_layer(Duration::from_secs(120));
987        assert_eq!(level, 1);
988        assert!(rounds > 0);
989    }
990
991    #[test]
992    fn test_hierarchical_insert_and_advance() {
993        use crate::task::{TimerTask, CompletionNotifier};
994        
995        let config = WheelConfig::default();
996        
997        let mut wheel = Wheel::new(config, BatchConfig::default());
998        
999        // Insert short delay task into L0 (插入短延迟任务到 L0)
1000        let callback = CallbackWrapper::new(|| async {});
1001        let (tx, _rx) = tokio::sync::oneshot::channel();
1002        let task = TimerTask::new(Duration::from_millis(100), Some(callback));
1003        let task_id = wheel.insert(task, CompletionNotifier(tx));
1004        
1005        // Verify task is in L0 layer (验证任务在 L0 层)
1006        let location = wheel.task_index.get(&task_id).unwrap();
1007        assert_eq!(location.level, 0);
1008        
1009        // Advance 10 ticks (100ms) (前进 10 个 tick (100毫秒))
1010        for _ in 0..10 {
1011            let expired = wheel.advance();
1012            if !expired.is_empty() {
1013                assert_eq!(expired.len(), 1);
1014                assert_eq!(expired[0].id, task_id);
1015                return;
1016            }
1017        }
1018        panic!("Task should have expired");
1019    }
1020
1021    #[test]
1022    fn test_hierarchical_l1_to_l0_demotion() {
1023        use crate::task::{TimerTask, CompletionNotifier};
1024        
1025        let config = WheelConfig::builder()
1026            .l0_tick_duration(Duration::from_millis(10))
1027            .l0_slot_count(512)
1028            .l1_tick_duration(Duration::from_millis(100)) // L1 tick = 100ms (L1 tick = 100毫秒)
1029            .l1_slot_count(64)
1030            .build()
1031            .unwrap();
1032        
1033        let mut wheel = Wheel::new(config, BatchConfig::default());
1034        let l1_tick_ratio = wheel.l1_tick_ratio;
1035        assert_eq!(l1_tick_ratio, 10); // 100ms / 10ms = 10 (100毫秒 / 10毫秒 = 10)
1036        
1037        // Insert task, delay 6000ms (exceeds L0 range 5120ms) (插入任务,延迟 6000毫秒 (超过 L0 范围 5120毫秒))
1038        let callback = CallbackWrapper::new(|| async {});
1039        let (tx, _rx) = tokio::sync::oneshot::channel();
1040        let task = TimerTask::new(Duration::from_millis(6000), Some(callback));
1041        let task_id = wheel.insert(task, CompletionNotifier(tx));
1042        
1043        // Verify task is in L1 layer (验证任务在 L1 层)
1044        let location = wheel.task_index.get(&task_id).unwrap();
1045        assert_eq!(location.level, 1);
1046        
1047        // Advance to L1 slot expiration (6000ms / 100ms = 60 L1 ticks) (前进到 L1 槽过期 (6000毫秒 / 100毫秒 = 60个L1 tick))
1048        // 60 L1 ticks = 600 L0 ticks (60个L1 tick = 600个L0 tick)
1049        let mut demoted = false;
1050        for i in 0..610 {
1051            wheel.advance();
1052            
1053            // Check if task is demoted to L0 (检查任务是否降级到 L0)
1054            if let Some(location) = wheel.task_index.get(&task_id) {
1055                if location.level == 0 && !demoted {
1056                    demoted = true;
1057                    println!("Task demoted to L0 at L0 tick {}", i); // 任务降级到 L0 在 L0 tick {i}
1058                }
1059            }
1060        }
1061        
1062        assert!(demoted, "Task should have been demoted from L1 to L0"); // 任务应该从 L1 降级到 L0
1063    }
1064
1065    #[test]
1066    fn test_cross_layer_cancel() {
1067        use crate::task::{TimerTask, CompletionNotifier};
1068        
1069        let config = WheelConfig::default();
1070        
1071        let mut wheel = Wheel::new(config, BatchConfig::default());
1072        
1073        // Insert L0 task (插入 L0 任务)
1074        let callback1 = CallbackWrapper::new(|| async {});
1075        let (tx1, _rx1) = tokio::sync::oneshot::channel();
1076        let task1 = TimerTask::new(Duration::from_millis(100), Some(callback1));
1077        let task_id1 = wheel.insert(task1, CompletionNotifier(tx1));
1078        
1079        // Insert L1 task (插入 L1 任务)
1080        let callback2 = CallbackWrapper::new(|| async {});
1081        let (tx2, _rx2) = tokio::sync::oneshot::channel();
1082        let task2 = TimerTask::new(Duration::from_secs(10), Some(callback2));
1083        let task_id2 = wheel.insert(task2, CompletionNotifier(tx2));
1084        
1085        // Verify levels (验证层级)
1086        assert_eq!(wheel.task_index.get(&task_id1).unwrap().level, 0);
1087        assert_eq!(wheel.task_index.get(&task_id2).unwrap().level, 1);
1088        
1089        // Cancel L0 task (取消 L0 任务)
1090        assert!(wheel.cancel(task_id1));
1091        assert!(wheel.task_index.get(&task_id1).is_none());
1092        
1093        // Cancel L1 task (取消 L1 任务)
1094        assert!(wheel.cancel(task_id2));
1095        assert!(wheel.task_index.get(&task_id2).is_none());
1096        
1097        assert!(wheel.is_empty()); // 时间轮应该为空
1098    }
1099
1100    #[test]
1101    fn test_cross_layer_postpone() {
1102        use crate::task::{TimerTask, CompletionNotifier};
1103        
1104        let config = WheelConfig::default();
1105        
1106        let mut wheel = Wheel::new(config, BatchConfig::default());
1107        
1108        // Insert L0 task (100ms) (插入 L0 任务 (100毫秒))
1109        let callback = CallbackWrapper::new(|| async {});
1110        let (tx, _rx) = tokio::sync::oneshot::channel();
1111        let task = TimerTask::new(Duration::from_millis(100), Some(callback));
1112        let task_id = wheel.insert(task, CompletionNotifier(tx));
1113        
1114        // Verify in L0 layer (验证在 L0 层)
1115        assert_eq!(wheel.task_index.get(&task_id).unwrap().level, 0);
1116        
1117        // Postpone to 10 seconds (should migrate to L1) (延期到 10 秒 (应该迁移到 L1))
1118        assert!(wheel.postpone(task_id, Duration::from_secs(10), None));
1119        
1120        // Verify migrated to L1 layer (验证迁移到 L1 层)
1121        assert_eq!(wheel.task_index.get(&task_id).unwrap().level, 1);
1122        
1123        // Postpone back to 200ms (should migrate back to L0) (延期回 200毫秒 (应该迁移回 L0))
1124        assert!(wheel.postpone(task_id, Duration::from_millis(200), None));
1125        
1126        // Verify migrated back to L0 layer (验证迁移回 L0 层)
1127        assert_eq!(wheel.task_index.get(&task_id).unwrap().level, 0);
1128    }
1129
1130    #[test]
1131    fn test_delay_to_ticks() {
1132        let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1133        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(100)), 10);
1134        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(50)), 5);
1135        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(1)), 1); // Minimum 1 tick (最小 1 个 tick)
1136    }
1137
1138    #[test]
1139    fn test_wheel_invalid_slot_count() {
1140        let result = WheelConfig::builder()
1141            .l0_slot_count(100)
1142            .build();
1143        assert!(result.is_err());
1144        if let Err(crate::error::TimerError::InvalidSlotCount { slot_count, reason }) = result {
1145            assert_eq!(slot_count, 100);
1146            assert_eq!(reason, "L0 layer slot count must be power of 2"); // L0 层槽数量必须是 2 的幂
1147        } else {
1148            panic!("Expected InvalidSlotCount error"); // 期望 InvalidSlotCount 错误
1149        }
1150    }
1151
1152    #[test]
1153    fn test_insert_batch() {
1154        use crate::task::{TimerTask, CompletionNotifier};
1155        
1156        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1157        
1158        // Create batch tasks (创建批量任务)
1159        let tasks: Vec<(TimerTask, CompletionNotifier)> = (0..10)
1160            .map(|i| {
1161                let callback = CallbackWrapper::new(|| async {});
1162                let (completion_tx, _completion_rx) = tokio::sync::oneshot::channel();
1163                let notifier = CompletionNotifier(completion_tx);
1164                let task = TimerTask::new(Duration::from_millis(100 + i * 10), Some(callback));
1165                (task, notifier)
1166            })
1167            .collect();
1168        
1169        let task_ids = wheel.insert_batch(tasks);
1170        
1171        assert_eq!(task_ids.len(), 10);
1172        assert!(!wheel.is_empty());
1173    }
1174
1175    #[test]
1176    fn test_cancel_batch() {
1177        use crate::task::{TimerTask, CompletionNotifier};
1178        
1179        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1180        
1181        // Insert multiple tasks
1182        let mut task_ids = Vec::new();
1183        for i in 0..10 {
1184            let callback = CallbackWrapper::new(|| async {});
1185            let (completion_tx, _completion_rx) = tokio::sync::oneshot::channel();
1186            let notifier = CompletionNotifier(completion_tx);
1187            let task = TimerTask::new(Duration::from_millis(100 + i * 10), Some(callback));
1188            let task_id = wheel.insert(task, notifier);
1189            task_ids.push(task_id);
1190        }
1191        
1192        assert_eq!(task_ids.len(), 10);
1193        
1194        // Batch cancel first 5 tasks
1195        let to_cancel = &task_ids[0..5];
1196        let cancelled_count = wheel.cancel_batch(to_cancel);
1197        
1198        assert_eq!(cancelled_count, 5);
1199        
1200        // Try to cancel the same tasks again, should return 0
1201        let cancelled_again = wheel.cancel_batch(to_cancel);
1202        assert_eq!(cancelled_again, 0);
1203        
1204        // Cancel remaining tasks
1205        let remaining = &task_ids[5..10];
1206        let cancelled_remaining = wheel.cancel_batch(remaining);
1207        assert_eq!(cancelled_remaining, 5);
1208        
1209        assert!(wheel.is_empty());
1210    }
1211
1212    #[test]
1213    fn test_batch_operations_same_slot() {
1214        use crate::task::{TimerTask, CompletionNotifier};
1215        
1216        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1217        
1218        // Insert multiple tasks with the same delay (will enter the same slot) (插入多个任务,延迟相同,将进入同一个槽)
1219        let mut task_ids = Vec::new();
1220        for _ in 0..20 {
1221            let callback = CallbackWrapper::new(|| async {});
1222            let (completion_tx, _completion_rx) = tokio::sync::oneshot::channel();
1223            let notifier = CompletionNotifier(completion_tx);
1224            let task = TimerTask::new(Duration::from_millis(100), Some(callback));
1225            let task_id = wheel.insert(task, notifier);
1226            task_ids.push(task_id);
1227        }
1228        
1229        // Batch cancel all tasks (批量取消所有任务)
1230        let cancelled_count = wheel.cancel_batch(&task_ids);
1231        assert_eq!(cancelled_count, 20);
1232        assert!(wheel.is_empty());
1233    }
1234
1235    #[test]
1236    fn test_postpone_single_task() {
1237        use crate::task::{TimerTask, CompletionNotifier};
1238        
1239        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1240        
1241        // Insert task, delay 100ms (插入任务,延迟 100毫秒)
1242        let callback = CallbackWrapper::new(|| async {});
1243        let (completion_tx, _completion_rx) = tokio::sync::oneshot::channel();
1244        let notifier = CompletionNotifier(completion_tx);
1245        let task = TimerTask::new(Duration::from_millis(100), Some(callback));
1246        let task_id = wheel.insert(task, notifier);
1247        
1248        // Postpone task to 200ms (keep original callback) (延期任务到 200毫秒 (保留原始回调))
1249        let postponed = wheel.postpone(task_id, Duration::from_millis(200), None);
1250        assert!(postponed);
1251        
1252        // Verify task is still in the timing wheel (验证任务仍在时间轮中)
1253        assert!(!wheel.is_empty());
1254        
1255        // Advance 100ms (10 ticks), task should not trigger (前进 100毫秒 (10个tick),任务不应该触发)
1256        for _ in 0..10 {
1257            let expired = wheel.advance();
1258            assert!(expired.is_empty());
1259        }
1260        
1261        // Advance 100ms (10 ticks), task should trigger (前进 100毫秒 (10个tick),任务应该触发)
1262        let mut triggered = false;
1263        for _ in 0..10 {
1264            let expired = wheel.advance();
1265            if !expired.is_empty() {
1266                assert_eq!(expired.len(), 1);
1267                assert_eq!(expired[0].id, task_id);
1268                triggered = true;
1269                break;
1270            }
1271        }
1272        assert!(triggered);
1273    }
1274
1275    #[test]
1276    fn test_postpone_with_new_callback() {
1277        use crate::task::{TimerTask, CompletionNotifier};
1278        
1279        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1280        
1281        // Insert task, with original callback (插入任务,原始回调)
1282        let old_callback = CallbackWrapper::new(|| async {});
1283        let (completion_tx, _completion_rx) = tokio::sync::oneshot::channel();
1284        let notifier = CompletionNotifier(completion_tx);
1285        let task = TimerTask::new(Duration::from_millis(100), Some(old_callback.clone()));
1286        let task_id = wheel.insert(task, notifier);
1287        
1288        // Postpone task and replace callback (延期任务并替换回调)
1289        let new_callback = CallbackWrapper::new(|| async {});
1290        let postponed = wheel.postpone(task_id, Duration::from_millis(50), Some(new_callback));
1291        assert!(postponed);
1292        
1293        // Advance 50ms (5 ticks), task should trigger (前进 50毫秒 (5个tick),任务应该触发)
1294        // 注意:任务在第 5 个 tick 触发(current_tick 从 0 推进到 5)
1295        let mut triggered = false;
1296        for i in 0..5 {
1297            let expired = wheel.advance();
1298            if !expired.is_empty() {
1299                assert_eq!(expired.len(), 1, "On the {}th advance, there should be 1 task triggered", i + 1);
1300                assert_eq!(expired[0].id, task_id);
1301                triggered = true;
1302                break;
1303            }
1304        }
1305        assert!(triggered, "Task should be triggered within 5 ticks"); // 任务应该在 5 个 tick 内触发
1306    }
1307
1308    #[test]
1309    fn test_postpone_nonexistent_task() {
1310        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1311        
1312        // Try to postpone nonexistent task (尝试延期不存在任务)
1313        let fake_task_id = TaskId::new();
1314        let postponed = wheel.postpone(fake_task_id, Duration::from_millis(100), None);
1315        assert!(!postponed);
1316    }
1317
1318    #[test]
1319    fn test_postpone_batch() {
1320        use crate::task::{TimerTask, CompletionNotifier};
1321        
1322        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1323        
1324        // Insert 5 tasks, delay 50ms (5 ticks) (插入 5 个任务,延迟 50毫秒 (5个tick))
1325        let mut task_ids = Vec::new();
1326        for _ in 0..5 {
1327            let callback = CallbackWrapper::new(|| async {});
1328            let (completion_tx, _completion_rx) = tokio::sync::oneshot::channel();
1329            let notifier = CompletionNotifier(completion_tx);
1330            let task = TimerTask::new(Duration::from_millis(50), Some(callback));
1331            let task_id = wheel.insert(task, notifier);
1332            task_ids.push(task_id);
1333        }
1334        
1335        // Batch postpone all tasks to 150ms (15 ticks) (批量延期所有任务到 150毫秒 (15个tick))
1336        let updates: Vec<_> = task_ids
1337            .iter()
1338            .map(|&id| (id, Duration::from_millis(150)))
1339            .collect();
1340        let postponed_count = wheel.postpone_batch(updates);
1341        assert_eq!(postponed_count, 5);
1342        
1343        // Advance 5 ticks (50ms), task should not trigger (前进 50毫秒 (5个tick),任务不应该触发)
1344        for _ in 0..5 {
1345            let expired = wheel.advance();
1346            assert!(expired.is_empty(), "The first 5 ticks should not have tasks triggered");
1347        }
1348        
1349        // Continue advancing 10 ticks (from tick 5 to tick 15), all tasks should trigger on the 15th tick (继续前进 10个tick (从tick 5到tick 15),所有任务应该在第 15 个 tick 触发)
1350        let mut total_triggered = 0;
1351        for _ in 0..10 {
1352            let expired = wheel.advance();
1353            total_triggered += expired.len();
1354        }
1355        assert_eq!(total_triggered, 5, "There should be 5 tasks triggered on the 15th tick"); // 第 15 个 tick 应该有 5 个任务触发
1356    }
1357
1358    #[test]
1359    fn test_postpone_batch_partial() {
1360        use crate::task::{TimerTask, CompletionNotifier};
1361        
1362        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1363        
1364        // Insert 10 tasks, delay 50ms (5 ticks) (插入 10 个任务,延迟 50毫秒 (5个tick))
1365        let mut task_ids = Vec::new();
1366        for _ in 0..10 {
1367            let callback = CallbackWrapper::new(|| async {});
1368            let (completion_tx, _completion_rx) = tokio::sync::oneshot::channel();
1369            let notifier = CompletionNotifier(completion_tx);
1370            let task = TimerTask::new(Duration::from_millis(50), Some(callback));
1371            let task_id = wheel.insert(task, notifier);
1372            task_ids.push(task_id);
1373        }
1374        
1375        // Only postpone the first 5 tasks to 150ms, including a nonexistent task (只延期前 5 个任务到 150毫秒,包括一个不存在任务)
1376        let fake_task_id = TaskId::new();
1377        let mut updates: Vec<_> = task_ids[0..5]
1378            .iter()
1379            .map(|&id| (id, Duration::from_millis(150)))
1380            .collect();
1381        updates.push((fake_task_id, Duration::from_millis(150)));
1382        
1383        let postponed_count = wheel.postpone_batch(updates);
1384        assert_eq!(postponed_count, 5, "There should be 5 tasks successfully postponed (fake_task_id failed)"); // 应该有 5 个任务成功延期 (fake_task_id 失败)
1385        
1386        // Advance 5 ticks (50ms), the last 5 tasks that were not postponed should trigger (前进 50毫秒 (5个tick),最后一个没有延期的任务应该触发)
1387        let mut triggered_at_50ms = 0;
1388        for _ in 0..5 {
1389            let expired = wheel.advance();
1390            triggered_at_50ms += expired.len();
1391        }
1392        assert_eq!(triggered_at_50ms, 5, "There should be 5 tasks that were not postponed triggered on the 5th tick"); // 第 5 个 tick 应该有 5 个任务没有延期触发
1393        
1394        // Continue advancing 10 ticks (from tick 5 to tick 15), the first 5 tasks that were postponed should trigger (继续前进 10个tick (从tick 5到tick 15),第一个 5 个任务应该触发)
1395        let mut triggered_at_150ms = 0;
1396        for _ in 0..10 {
1397            let expired = wheel.advance();
1398            triggered_at_150ms += expired.len();
1399        }
1400        assert_eq!(triggered_at_150ms, 5, "There should be 5 tasks that were postponed triggered on the 15th tick"); // 第 15 个 tick 应该有 5 个任务延期触发
1401    }
1402
1403    #[test]
1404    fn test_multi_round_tasks() {
1405        use crate::task::{TimerTask, CompletionNotifier};
1406        
1407        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1408        
1409        // Test multi-round tasks in L1 layer in hierarchical mode (在分层模式下测试 L1 层的多轮任务)
1410        // L1: 64 slots * 1000ms = 64000ms
1411        // Insert a task that exceeds one L1 circle, delay 120000ms (120 seconds) (插入一个超过一个 L1 圈的任务,延迟 120000毫秒 (120秒))
1412        let callback = CallbackWrapper::new(|| async {});
1413        let (completion_tx, _completion_rx) = tokio::sync::oneshot::channel();
1414        let notifier = CompletionNotifier(completion_tx);
1415        let task = TimerTask::new(Duration::from_secs(120), Some(callback));
1416        let task_id = wheel.insert(task, notifier);
1417        
1418        // 120000ms / 1000ms = 120 L1 ticks (120000毫秒 / 1000毫秒 = 120个L1 tick)
1419        // 120 ticks / 64 slots = 1 round + 56 ticks (120个tick / 64个槽 = 1轮 + 56个tick)
1420        // Task should be in L1 layer, slot 56, rounds = 1 (任务应该在 L1 层,槽 56,轮数 = 1)
1421        
1422        // Verify task is in L1 layer (验证任务在 L1 层)
1423        let location = wheel.task_index.get(&task_id).unwrap();
1424        assert_eq!(location.level, 1);
1425        
1426        // L1 tick advances every 100 L0 ticks (L1 tick 每 100 个 L0 tick 前进一次)
1427        // Advance 64 * 100 = 6400 L0 ticks (complete the first round of L1) (前进 64 * 100 = 6400 L0 tick (完成 L1 的第一轮))
1428        for _ in 0..6400 {
1429            let _expired = wheel.advance();
1430            // During the first round of L1, the task should not be demoted or triggered (在 L1 的第一轮中,任务不应该被降级或触发)
1431        }
1432        
1433        // Task should still be in L1 layer, but rounds has decreased (任务应该仍在 L1 层,但轮数已经减少)
1434        let location = wheel.task_index.get(&task_id);
1435        if let Some(loc) = location {
1436            assert_eq!(loc.level, 1);
1437        }
1438        
1439        // Continue advancing until the task is triggered (approximately another 56 * 100 = 5600 L0 ticks) (继续前进,直到任务被触发 (大约另一个 56 * 100 = 5600 L0 tick))
1440        let mut triggered = false;
1441        for _ in 0..6000 {
1442            let expired = wheel.advance();
1443            if expired.iter().any(|t| t.id == task_id) {
1444                triggered = true;
1445                break;
1446            }
1447        }
1448        assert!(triggered, "Task should be triggered in the second round of L1"); // 任务应该在 L1 的第二轮中触发
1449    }
1450
1451    #[test]
1452    fn test_minimum_delay() {
1453        use crate::task::{TimerTask, CompletionNotifier};
1454        
1455        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1456        
1457        // Test minimum delay (delays less than 1 tick should be rounded up to 1 tick) (测试最小延迟 (延迟小于 1 个 tick 应该向上舍入到 1 个 tick))
1458        let callback = CallbackWrapper::new(|| async {});
1459        let (completion_tx, _completion_rx) = tokio::sync::oneshot::channel();
1460        let notifier = CompletionNotifier(completion_tx);
1461        let task = TimerTask::new(Duration::from_millis(1), Some(callback));
1462        let task_id: TaskId = wheel.insert(task, notifier);
1463        
1464        // Advance 1 tick, task should trigger (前进 1 个 tick,任务应该触发)
1465        let expired = wheel.advance();
1466        assert_eq!(expired.len(), 1, "Minimum delay task should be triggered after 1 tick"); // 最小延迟任务应该在 1 个 tick 后触发
1467        assert_eq!(expired[0].id, task_id);
1468    }
1469
1470    #[test]
1471    fn test_empty_batch_operations() {
1472        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1473        
1474        // Test empty batch insert (测试空批量插入)
1475        let task_ids = wheel.insert_batch(vec![]);
1476        assert_eq!(task_ids.len(), 0);
1477        
1478        // Test empty batch cancel (测试空批量取消)
1479        let cancelled = wheel.cancel_batch(&[]);
1480        assert_eq!(cancelled, 0);
1481        
1482        // Test empty batch postpone (测试空批量延期)
1483        let postponed = wheel.postpone_batch(vec![]);
1484        assert_eq!(postponed, 0);
1485    }
1486
1487    #[test]
1488    fn test_postpone_same_task_multiple_times() {
1489        use crate::task::{TimerTask, CompletionNotifier};
1490        
1491        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1492        
1493        // Insert task (插入任务)
1494        let callback = CallbackWrapper::new(|| async {});
1495        let (completion_tx, _completion_rx) = tokio::sync::oneshot::channel();
1496        let notifier = CompletionNotifier(completion_tx);
1497        let task = TimerTask::new(Duration::from_millis(100), Some(callback));
1498        let task_id = wheel.insert(task, notifier);
1499        
1500        // First postpone (第一次延期)
1501        let postponed = wheel.postpone(task_id, Duration::from_millis(200), None);
1502        assert!(postponed, "First postpone should succeed");
1503        
1504        // Second postpone (第二次延期)
1505        let postponed = wheel.postpone(task_id, Duration::from_millis(300), None);
1506        assert!(postponed, "Second postpone should succeed");
1507        
1508        // Third postpone (第三次延期)  
1509        let postponed = wheel.postpone(task_id, Duration::from_millis(50), None);
1510        assert!(postponed, "Third postpone should succeed");
1511        
1512        // Verify task is triggered at the last postpone time (50ms = 5 ticks) (验证任务在最后一次延期时触发 (50毫秒 = 5个tick))
1513        let mut triggered = false;
1514        for _ in 0..5 {
1515            let expired = wheel.advance();
1516            if !expired.is_empty() {
1517                assert_eq!(expired.len(), 1);
1518                assert_eq!(expired[0].id, task_id);
1519                triggered = true;
1520                break;
1521            }
1522        }
1523        assert!(triggered, "Task should be triggered at the last postpone time"); // 任务应该在最后一次延期时触发
1524    }
1525
1526    #[test]
1527    fn test_advance_empty_slots() {
1528        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1529        
1530        // Do not insert any tasks, advance multiple ticks (不插入任何任务,前进多个tick)
1531        for _ in 0..100 {
1532            let expired = wheel.advance();
1533            assert!(expired.is_empty(), "Empty slots should not return any tasks");
1534        }
1535        
1536        assert_eq!(wheel.current_tick(), 100, "current_tick should correctly increment"); // current_tick 应该正确递增
1537    }
1538
1539    #[test]
1540    fn test_cancel_after_postpone() {
1541        use crate::task::{TimerTask, CompletionNotifier};
1542        
1543        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1544        
1545        // Insert task (插入任务)
1546        let callback = CallbackWrapper::new(|| async {});
1547        let (completion_tx, _completion_rx) = tokio::sync::oneshot::channel();
1548        let notifier = CompletionNotifier(completion_tx);
1549        let task = TimerTask::new(Duration::from_millis(100), Some(callback));
1550        let task_id = wheel.insert(task, notifier);
1551        
1552        // Postpone task (延期任务)
1553        let postponed = wheel.postpone(task_id, Duration::from_millis(200), None);
1554        assert!(postponed, "Postpone should succeed");
1555        
1556        // Cancel postponed task (取消延期的任务)
1557        let cancelled = wheel.cancel(task_id);
1558        assert!(cancelled, "Cancel should succeed");
1559        
1560        // Advance to original time, task should not trigger (前进到原始时间,任务不应该触发)
1561        for _ in 0..20 {
1562            let expired = wheel.advance();
1563            assert!(expired.is_empty(), "Cancelled task should not trigger"); // 取消的任务不应该触发
1564        }
1565        
1566        assert!(wheel.is_empty(), "Wheel should be empty"); // 时间轮应该为空
1567    }
1568
1569    #[test]
1570    fn test_slot_boundary() {
1571        use crate::task::{TimerTask, CompletionNotifier};
1572        
1573        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1574        
1575        // Test slot boundary and wraparound (测试槽边界和环绕)
1576        // 第一个任务:延迟 10ms(1 tick),应该在 slot 1 触发
1577        let callback1 = CallbackWrapper::new(|| async {});
1578        let (tx1, _rx1) = tokio::sync::oneshot::channel();
1579        let task1 = TimerTask::new(Duration::from_millis(10), Some(callback1));
1580        let task_id_1 = wheel.insert(task1, CompletionNotifier(tx1));
1581        
1582        // Second task: delay 5110ms (511 ticks), should trigger on slot 511 (第二个任务:延迟 5110毫秒 (511个tick),应该在槽 511 触发) 
1583        let callback2 = CallbackWrapper::new(|| async {});
1584        let (tx2, _rx2) = tokio::sync::oneshot::channel();
1585        let task2 = TimerTask::new(Duration::from_millis(5110), Some(callback2));
1586        let task_id_2 = wheel.insert(task2, CompletionNotifier(tx2));
1587        
1588        // Advance 1 tick, first task should trigger (前进 1 个 tick,第一个任务应该触发)
1589        let expired = wheel.advance();
1590        assert_eq!(expired.len(), 1, "First task should trigger on tick 1"); // 第一个任务应该在第 1 个 tick 触发
1591        assert_eq!(expired[0].id, task_id_1);
1592        
1593        // Continue advancing to 511 ticks (from tick 1 to tick 511), second task should trigger (继续前进 511个tick (从tick 1到tick 511),第二个任务应该触发)
1594        let mut triggered = false;
1595        for i in 0..510 {
1596            let expired = wheel.advance();
1597            if !expired.is_empty() {
1598                assert_eq!(expired.len(), 1, "The {}th advance should trigger the second task", i + 2); // 第 {i + 2} 次前进应该触发第二个任务
1599                assert_eq!(expired[0].id, task_id_2);
1600                triggered = true;
1601                break;
1602            }
1603        }
1604        assert!(triggered, "Second task should trigger on tick 511"); // 第二个任务应该在第 511 个 tick 触发
1605        
1606        assert!(wheel.is_empty(), "All tasks should have been triggered"); // 所有任务都应该被触发
1607    }
1608
1609    #[test]
1610    fn test_batch_cancel_small_threshold() {
1611        use crate::task::{TimerTask, CompletionNotifier};
1612        
1613        // Test small batch threshold optimization (测试小批量阈值优化)
1614        let batch_config = BatchConfig {
1615            small_batch_threshold: 5,
1616        };
1617        let mut wheel = Wheel::new(WheelConfig::default(), batch_config);
1618        
1619        // Insert 10 tasks (插入 10 个任务)
1620        let mut task_ids = Vec::new();
1621        for _ in 0..10 {
1622            let callback = CallbackWrapper::new(|| async {});
1623            let (tx, _rx) = tokio::sync::oneshot::channel();
1624            let task = TimerTask::new(Duration::from_millis(100), Some(callback));
1625            let task_id = wheel.insert(task, CompletionNotifier(tx));
1626            task_ids.push(task_id);
1627        }
1628        
1629        // Small batch cancel (should use direct cancel path) (小批量取消 (应该使用直接取消路径))
1630        let cancelled = wheel.cancel_batch(&task_ids[0..3]);
1631        assert_eq!(cancelled, 3);
1632        
1633        // Large batch cancel (should use grouped optimization path) (大批量取消 (应该使用分组优化路径))
1634        let cancelled = wheel.cancel_batch(&task_ids[3..10]);
1635        assert_eq!(cancelled, 7);
1636        
1637        assert!(wheel.is_empty()); // 时间轮应该为空
1638    }
1639
1640    #[test]
1641    fn test_task_id_uniqueness() {
1642        use crate::task::{TimerTask, CompletionNotifier};
1643        
1644        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1645        
1646        // Insert multiple tasks, verify TaskId uniqueness (插入多个任务,验证 TaskId 唯一性)
1647        let mut task_ids = std::collections::HashSet::new();
1648        for _ in 0..100 {
1649            let callback = CallbackWrapper::new(|| async {});
1650            let (tx, _rx) = tokio::sync::oneshot::channel();
1651            let task = TimerTask::new(Duration::from_millis(100), Some(callback));
1652            let task_id = wheel.insert(task, CompletionNotifier(tx));
1653            
1654            assert!(task_ids.insert(task_id), "TaskId should be unique"); // TaskId 应该唯一
1655        }
1656        
1657        assert_eq!(task_ids.len(), 100);
1658    }
1659}
1660