kestrel_timer/wheel.rs
1use crate::CallbackWrapper;
2use crate::config::{BatchConfig, WheelConfig};
3use crate::task::{
4 TaskCompletion, TaskHandle, TaskId, TaskLocation, TaskTypeWithCompletionNotifier,
5 TimerTaskForWheel, TimerTaskWithCompletionNotifier,
6};
7use deferred_map::DeferredMap;
8use std::time::Duration;
9
10pub struct WheelAdvanceResult {
11 pub id: TaskId,
12 pub callback: Option<CallbackWrapper>,
13}
14
15/// Timing wheel single layer data structure
16///
17/// 时间轮单层数据结构
18struct WheelLayer {
19 /// Slot array, each slot stores a group of timer tasks
20 ///
21 /// 槽数组,每个槽存储一组定时器任务
22 slots: Vec<Vec<TimerTaskForWheel>>,
23
24 /// Current time pointer (tick index)
25 ///
26 /// 当前时间指针(tick 索引)
27 current_tick: u64,
28
29 /// Slot count
30 ///
31 /// 槽数量
32 slot_count: usize,
33
34 /// Duration of each tick
35 ///
36 /// 每个 tick 的持续时间
37 tick_duration: Duration,
38
39 /// Cache: tick duration in milliseconds (u64) - avoid repeated conversion
40 ///
41 /// 缓存:tick 持续时间(毫秒,u64)- 避免重复转换
42 tick_duration_ms: u64,
43
44 /// Cache: slot mask (slot_count - 1) - for fast modulo operation
45 ///
46 /// 缓存:槽掩码(slot_count - 1)- 用于快速取模运算
47 slot_mask: usize,
48}
49
50impl WheelLayer {
51 /// Create a new wheel layer
52 ///
53 /// 创建一个新的时间轮层
54 fn new(slot_count: usize, tick_duration: Duration) -> Self {
55 let mut slots = Vec::with_capacity(slot_count);
56 // Pre-allocate capacity for each slot to reduce subsequent reallocation during push
57 // Most slots typically contain 0-4 tasks, pre-allocate capacity of 4
58 //
59 // 为每个槽预分配容量以减少后续 push 时的重新分配
60 // 大多数槽通常包含 0-4 个任务,预分配容量为 4
61 for _ in 0..slot_count {
62 slots.push(Vec::with_capacity(4));
63 }
64
65 let tick_duration_ms = tick_duration.as_millis() as u64;
66 let slot_mask = slot_count - 1;
67
68 Self {
69 slots,
70 current_tick: 0,
71 slot_count,
72 tick_duration,
73 tick_duration_ms,
74 slot_mask,
75 }
76 }
77
78 /// Calculate the number of ticks corresponding to the delay
79 ///
80 /// 计算延迟对应的 tick 数量
81 fn delay_to_ticks(&self, delay: Duration) -> u64 {
82 let ticks = delay.as_millis() as u64 / self.tick_duration.as_millis() as u64;
83 ticks.max(1) // at least 1 tick (至少 1 个 tick)
84 }
85}
86
87/// Timing wheel data structure (hierarchical mode)
88///
89/// Now uses DeferredMap for O(1) task lookup and generational safety.
90///
91/// 时间轮数据结构(分层模式)
92///
93/// 现在使用 DeferredMap 实现 O(1) 任务查找和代数安全
94pub struct Wheel {
95 /// L0 layer (bottom layer)
96 ///
97 /// L0 层(底层)
98 l0: WheelLayer,
99
100 /// L1 layer (top layer)
101 ///
102 /// L1 层(顶层)
103 l1: WheelLayer,
104
105 /// L1 tick ratio relative to L0 tick
106 ///
107 /// L1 tick 相对于 L0 tick 的比率
108 pub(crate) l1_tick_ratio: u64,
109
110 /// Task index for fast lookup and cancellation using DeferredMap
111 ///
112 /// Keys are TaskIds (u64 from DeferredMap), values are TaskLocations
113 ///
114 /// 任务索引,使用 DeferredMap 实现快速查找和取消
115 ///
116 /// 键是 TaskId(来自 DeferredMap 的 u64),值是 TaskLocation
117 pub(crate) task_index: DeferredMap<TaskLocation>,
118
119 /// Batch processing configuration
120 ///
121 /// 批处理配置
122 batch_config: BatchConfig,
123
124 /// Cache: L0 layer capacity in milliseconds - avoid repeated calculation
125 ///
126 /// 缓存:L0 层容量(毫秒)- 避免重复计算
127 l0_capacity_ms: u64,
128
129 /// Cache: L1 layer capacity in ticks - avoid repeated calculation
130 ///
131 /// 缓存:L1 层容量(tick 数)- 避免重复计算
132 l1_capacity_ticks: u64,
133}
134
135impl Wheel {
136 /// Create new timing wheel
137 ///
138 /// # Parameters
139 /// - `config`: Timing wheel configuration (already validated)
140 /// - `batch_config`: Batch processing configuration
141 ///
142 /// # Notes
143 /// Configuration parameters have been validated in WheelConfig::builder().build(), so this method will not fail.
144 /// Now uses DeferredMap for task indexing with generational safety.
145 ///
146 /// 创建新的时间轮
147 ///
148 /// # 参数
149 /// - `config`: 时间轮配置(已验证)
150 /// - `batch_config`: 批处理配置
151 ///
152 /// # 注意
153 /// 配置参数已在 WheelConfig::builder().build() 中验证,因此此方法不会失败。
154 /// 现在使用 DeferredMap 进行任务索引,具有代数安全特性。
155 pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
156 let l0 = WheelLayer::new(config.l0_slot_count, config.l0_tick_duration);
157 let l1 = WheelLayer::new(config.l1_slot_count, config.l1_tick_duration);
158
159 // Calculate L1 tick ratio relative to L0 tick
160 // 计算 L1 tick 相对于 L0 tick 的比率
161 let l1_tick_ratio = l1.tick_duration_ms / l0.tick_duration_ms;
162
163 // Pre-calculate capacity to avoid repeated calculation in insert
164 // 预计算容量,避免在 insert 中重复计算
165 let l0_capacity_ms = (l0.slot_count as u64) * l0.tick_duration_ms;
166 let l1_capacity_ticks = l1.slot_count as u64;
167
168 // Estimate initial capacity for DeferredMap based on L0 slot count
169 // 根据 L0 槽数量估算 DeferredMap 的初始容量
170 let estimated_capacity = (l0.slot_count / 4).max(64);
171
172 Self {
173 l0,
174 l1,
175 l1_tick_ratio,
176 task_index: DeferredMap::with_capacity(estimated_capacity),
177 batch_config,
178 l0_capacity_ms,
179 l1_capacity_ticks,
180 }
181 }
182
183 /// Get current tick (L0 layer tick)
184 ///
185 /// 获取当前 tick(L0 层 tick)
186 #[allow(dead_code)]
187 pub fn current_tick(&self) -> u64 {
188 self.l0.current_tick
189 }
190
191 /// Get tick duration (L0 layer tick duration)
192 ///
193 /// 获取 tick 持续时间(L0 层 tick 持续时间)
194 #[allow(dead_code)]
195 pub fn tick_duration(&self) -> Duration {
196 self.l0.tick_duration
197 }
198
199 /// Get slot count (L0 layer slot count)
200 ///
201 /// 获取槽数量(L0 层槽数量)
202 #[allow(dead_code)]
203 pub fn slot_count(&self) -> usize {
204 self.l0.slot_count
205 }
206
207 /// Calculate the number of ticks corresponding to the delay (based on L0 layer)
208 ///
209 /// 计算延迟对应的 tick 数量(基于 L0 层)
210 #[allow(dead_code)]
211 pub fn delay_to_ticks(&self, delay: Duration) -> u64 {
212 self.l0.delay_to_ticks(delay)
213 }
214
215 /// Determine which layer the delay should be inserted into
216 ///
217 /// # Returns
218 /// Returns: (layer, ticks, rounds)
219 /// - Layer: 0 = L0, 1 = L1
220 /// - Ticks: number of ticks calculated from current tick
221 /// - Rounds: number of rounds (only used for very long delays in L1 layer)
222 ///
223 /// 确定延迟应该插入到哪一层
224 ///
225 /// # 返回值
226 /// 返回:(层级, ticks, 轮数)
227 /// - 层级:0 = L0, 1 = L1
228 /// - Ticks:从当前 tick 计算的 tick 数量
229 /// - 轮数:轮数(仅用于 L1 层的超长延迟)
230 #[inline(always)]
231 fn determine_layer(&self, delay: Duration) -> (u8, u64, u32) {
232 let delay_ms = delay.as_millis() as u64;
233
234 // Fast path: most tasks are within L0 range (using cached capacity)
235 // 快速路径:大多数任务在 L0 范围内(使用缓存的容量)
236 if delay_ms < self.l0_capacity_ms {
237 let l0_ticks = (delay_ms / self.l0.tick_duration_ms).max(1);
238 return (0, l0_ticks, 0);
239 }
240
241 // Slow path: L1 layer tasks (using cached values)
242 // 慢速路径:L1 层任务(使用缓存的值)
243 let l1_ticks = (delay_ms / self.l1.tick_duration_ms).max(1);
244
245 if l1_ticks < self.l1_capacity_ticks {
246 (1, l1_ticks, 0)
247 } else {
248 let rounds = (l1_ticks / self.l1_capacity_ticks) as u32;
249 (1, l1_ticks, rounds)
250 }
251 }
252
253 /// Allocate a handle from DeferredMap
254 ///
255 /// 从 DeferredMap 分配一个 handle
256 ///
257 /// # Returns
258 /// A unique handle for later insertion
259 ///
260 /// # 返回值
261 /// 用于后续插入的唯一 handle
262 pub fn allocate_handle(&mut self) -> TaskHandle {
263 TaskHandle::new(self.task_index.allocate_handle())
264 }
265
266 /// Batch allocate handles from DeferredMap
267 ///
268 /// 从 DeferredMap 批量分配 handles
269 ///
270 /// # Parameters
271 /// - `count`: Number of handles to allocate
272 ///
273 /// # Returns
274 /// Vector of unique handles for later batch insertion
275 ///
276 /// # 参数
277 /// - `count`: 要分配的 handle 数量
278 ///
279 /// # 返回值
280 /// 用于后续批量插入的唯一 handles 向量
281 pub fn allocate_handles(&mut self, count: usize) -> Vec<TaskHandle> {
282 let mut handles = Vec::with_capacity(count);
283 for _ in 0..count {
284 handles.push(TaskHandle::new(self.task_index.allocate_handle()));
285 }
286 handles
287 }
288
289 /// Insert timer task
290 ///
291 /// # Parameters
292 /// - `handle`: Handle for the task
293 /// - `task`: Timer task with completion notifier
294 ///
295 /// # Returns
296 /// Unique identifier of the task (TaskId) - now generated from DeferredMap
297 ///
298 /// # Implementation Details
299 /// - Allocates Handle from DeferredMap to generate TaskId with generational safety
300 /// - Automatically calculate the layer and slot where the task should be inserted
301 /// - Hierarchical mode: short delay tasks are inserted into L0, long delay tasks are inserted into L1
302 /// - Use bit operations to optimize slot index calculation
303 /// - Use DeferredMap for O(1) lookup and cancellation with generation checking
304 ///
305 /// 插入定时器任务
306 ///
307 /// # 参数
308 /// - `handle`: 任务的 handle
309 /// - `task`: 带完成通知器的定时器任务
310 ///
311 /// # 返回值
312 /// 任务的唯一标识符(TaskId)- 现在从 DeferredMap 生成
313 ///
314 /// # 实现细节
315 /// - 自动计算任务应该插入的层级和槽位
316 /// - 分层模式:短延迟任务插入 L0,长延迟任务插入 L1
317 /// - 使用位运算优化槽索引计算
318 /// - 使用 DeferredMap 实现 O(1) 查找和取消,带代数检查
319 #[inline]
320 pub fn insert(&mut self, handle: TaskHandle, task: TimerTaskWithCompletionNotifier) {
321 let task_id = handle.task_id();
322
323 let (level, ticks, rounds) = self.determine_layer(task.delay);
324
325 // Use match to reduce branches, and use cached slot mask
326 // 使用 match 减少分支,并使用缓存的槽掩码
327 let (current_tick, slot_mask, slots) = match level {
328 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
329 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
330 };
331
332 let total_ticks = current_tick + ticks;
333 let slot_index = (total_ticks as usize) & slot_mask;
334
335 // Create task with the assigned TaskId
336 // 使用分配的 TaskId 创建任务
337 let task = TimerTaskForWheel::new_with_id(task_id, task, total_ticks, rounds);
338
339 // Get the index position of the task in Vec (the length before insertion is the index of the new task)
340 // 获取任务在 Vec 中的索引位置(插入前的长度就是新任务的索引)
341 let vec_index = slots[slot_index].len();
342 let location = TaskLocation::new(level, slot_index, vec_index);
343
344 // Insert task into slot
345 // 将任务插入槽中
346 slots[slot_index].push(task);
347
348 // Insert task location into DeferredMap using handle
349 // 使用 handle 将任务位置插入 DeferredMap
350 self.task_index
351 .insert(handle.into_handle(), location);
352 }
353
354 /// Batch insert timer tasks
355 ///
356 /// # Parameters
357 /// - `handles`: List of pre-allocated handles for tasks
358 /// - `tasks`: List of timer tasks with completion notifiers
359 ///
360 /// # Returns
361 /// - `Ok(())` if all tasks are successfully inserted
362 /// - `Err(TimerError::BatchLengthMismatch)` if handles and tasks lengths don't match
363 ///
364 /// # Performance Advantages
365 /// - Reduce repeated boundary checks and capacity adjustments
366 /// - For tasks with the same delay, calculation results can be reused
367 /// - Uses DeferredMap for efficient task indexing with generational safety
368 ///
369 /// 批量插入定时器任务
370 ///
371 /// # 参数
372 /// - `handles`: 任务的预分配 handles 列表
373 /// - `tasks`: 带完成通知器的定时器任务列表
374 ///
375 /// # 返回值
376 /// - `Ok(())` 如果所有任务成功插入
377 /// - `Err(TimerError::BatchLengthMismatch)` 如果 handles 和 tasks 长度不匹配
378 ///
379 /// # 性能优势
380 /// - 减少重复的边界检查和容量调整
381 /// - 对于相同延迟的任务,可以重用计算结果
382 /// - 使用 DeferredMap 实现高效的任务索引,具有代数安全特性
383 #[inline]
384 pub fn insert_batch(
385 &mut self,
386 handles: Vec<TaskHandle>,
387 tasks: Vec<TimerTaskWithCompletionNotifier>,
388 ) -> Result<(), crate::error::TimerError> {
389 // Validate that handles and tasks have the same length
390 // 验证 handles 和 tasks 长度相同
391 if handles.len() != tasks.len() {
392 return Err(crate::error::TimerError::BatchLengthMismatch {
393 handles_len: handles.len(),
394 tasks_len: tasks.len(),
395 });
396 }
397
398 for (handle, task) in handles.into_iter().zip(tasks.into_iter()) {
399 let task_id = handle.task_id();
400
401 let (level, ticks, rounds) = self.determine_layer(task.delay);
402
403 // Use match to reduce branches, and use cached slot mask
404 // 使用 match 减少分支,并使用缓存的槽掩码
405 let (current_tick, slot_mask, slots) = match level {
406 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
407 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
408 };
409
410 let total_ticks = current_tick + ticks;
411 let slot_index = (total_ticks as usize) & slot_mask;
412
413 // Create task with the assigned TaskId
414 // 使用分配的 TaskId 创建任务
415 let task = TimerTaskForWheel::new_with_id(task_id, task, total_ticks, rounds);
416
417 // Get the index position of the task in Vec
418 // 获取任务在 Vec 中的索引位置
419 let vec_index = slots[slot_index].len();
420 let location = TaskLocation::new(level, slot_index, vec_index);
421
422 // Insert task into slot
423 // 将任务插入槽中
424 slots[slot_index].push(task);
425
426 // Insert task location into DeferredMap using handle
427 // 使用 handle 将任务位置插入 DeferredMap
428 self.task_index
429 .insert(handle.into_handle(), location);
430 }
431
432 Ok(())
433 }
434
435 /// Cancel timer task
436 ///
437 /// # Parameters
438 /// - `task_id`: Task ID
439 ///
440 /// # Returns
441 /// Returns true if the task exists and is successfully cancelled, otherwise returns false
442 ///
443 /// Now uses DeferredMap for safe task removal with generational checking
444 ///
445 /// 取消定时器任务
446 ///
447 /// # 参数
448 /// - `task_id`: 任务 ID
449 ///
450 /// # 返回值
451 /// 如果任务存在且成功取消则返回 true,否则返回 false
452 ///
453 /// 现在使用 DeferredMap 实现安全的任务移除,带代数检查
454 #[inline]
455 pub fn cancel(&mut self, task_id: TaskId) -> bool {
456 // Remove task location from DeferredMap using TaskId as key
457 // 使用 TaskId 作为 key 从 DeferredMap 中移除任务位置
458 let location = match self.task_index.remove(task_id.key()) {
459 Some(loc) => loc,
460 None => return false, // Task not found or already removed (generation mismatch)
461 };
462
463 // Use match to get slot reference, reduce branches
464 // 使用 match 获取槽引用,减少分支
465 let slot = match location.level {
466 0 => &mut self.l0.slots[location.slot_index],
467 _ => &mut self.l1.slots[location.slot_index],
468 };
469
470 // Boundary check and ID verification
471 // 边界检查和 ID 验证
472 if location.vec_index >= slot.len() || slot[location.vec_index].get_id() != task_id {
473 // Index inconsistent - this shouldn't happen with DeferredMap, but handle it anyway
474 // 索引不一致 - DeferredMap 不应该出现这种情况,但还是处理一下
475 return false;
476 }
477
478 // Use swap_remove to remove task, record swapped task ID
479 // 使用 swap_remove 移除任务,记录被交换的任务 ID
480 let removed_task = slot.swap_remove(location.vec_index);
481
482 // Ensure the correct task was removed
483 // 确保移除了正确的任务
484 debug_assert_eq!(removed_task.get_id(), task_id);
485
486 match removed_task.into_task_type() {
487 TaskTypeWithCompletionNotifier::OneShot {
488 completion_notifier,
489 } => {
490 // Use Notify + AtomicU8 for zero-allocation notification
491 // 使用 Notify + AtomicU8 实现零分配通知
492 completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
493 }
494 TaskTypeWithCompletionNotifier::Periodic {
495 completion_notifier,
496 ..
497 } => {
498 // Use flume for high-performance periodic notification
499 // 使用 flume 实现高性能周期通知
500 let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
501 }
502 }
503
504 // If a swap occurred (vec_index is not the last element)
505 // 如果发生了交换(vec_index 不是最后一个元素)
506 if location.vec_index < slot.len() {
507 let swapped_task_id = slot[location.vec_index].get_id();
508 // Update swapped element's index in one go, using DeferredMap's get_mut
509 // 一次性更新被交换元素的索引,使用 DeferredMap 的 get_mut
510 if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
511 swapped_location.vec_index = location.vec_index;
512 }
513 }
514
515 true
516 }
517
518 /// Batch cancel timer tasks
519 ///
520 /// # Parameters
521 /// - `task_ids`: List of task IDs to cancel
522 ///
523 /// # Returns
524 /// Number of successfully cancelled tasks
525 ///
526 /// # Performance Advantages
527 /// - Reduce repeated HashMap lookup overhead
528 /// - Multiple cancellation operations on the same slot can be batch processed
529 /// - Use unstable sort to improve performance
530 /// - Small batch optimization: skip sorting based on configuration threshold, process directly
531 ///
532 /// 批量取消定时器任务
533 ///
534 /// # 参数
535 /// - `task_ids`: 要取消的任务 ID 列表
536 ///
537 /// # 返回值
538 /// 成功取消的任务数量
539 ///
540 /// # 性能优势
541 /// - 减少重复的 HashMap 查找开销
542 /// - 同一槽位的多个取消操作可以批量处理
543 /// - 使用不稳定排序提高性能
544 /// - 小批量优化:根据配置阈值跳过排序,直接处理
545 #[inline]
546 pub fn cancel_batch(&mut self, task_ids: &[TaskId]) -> usize {
547 let mut cancelled_count = 0;
548
549 // Small batch optimization: cancel one by one to avoid grouping and sorting overhead
550 // 小批量优化:逐个取消以避免分组和排序开销
551 if task_ids.len() <= self.batch_config.small_batch_threshold {
552 for &task_id in task_ids {
553 if self.cancel(task_id) {
554 cancelled_count += 1;
555 }
556 }
557 return cancelled_count;
558 }
559
560 // Group by layer and slot to optimize batch cancellation
561 // Use SmallVec to avoid heap allocation in most cases
562 // 按层级和槽位分组以优化批量取消
563 // 使用 SmallVec 避免在大多数情况下进行堆分配
564 let l0_slot_count = self.l0.slot_count;
565 let l1_slot_count = self.l1.slot_count;
566
567 let mut l0_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l0_slot_count];
568 let mut l1_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l1_slot_count];
569
570 // Collect information of tasks to be cancelled
571 // 收集要取消的任务信息
572 for &task_id in task_ids {
573 // Use DeferredMap's get with TaskId key
574 // 使用 DeferredMap 的 get,传入 TaskId key
575 if let Some(location) = self.task_index.get(task_id.key()) {
576 if location.level == 0 {
577 l0_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
578 } else {
579 l1_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
580 }
581 }
582 }
583
584 // Process L0 layer cancellation
585 // 处理 L0 层取消
586 for (slot_index, tasks) in l0_tasks_by_slot.iter_mut().enumerate() {
587 if tasks.is_empty() {
588 continue;
589 }
590
591 // Sort by vec_index in descending order, delete from back to front to avoid index invalidation
592 // 按 vec_index 降序排序,从后向前删除以避免索引失效
593 tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
594
595 let slot = &mut self.l0.slots[slot_index];
596
597 for &(task_id, vec_index) in tasks.iter() {
598 if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
599 let removed_task = slot.swap_remove(vec_index);
600 match removed_task.into_task_type() {
601 TaskTypeWithCompletionNotifier::OneShot {
602 completion_notifier,
603 } => {
604 // Use Notify + AtomicU8 for zero-allocation notification
605 // 使用 Notify + AtomicU8 实现零分配通知
606 completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
607 }
608 TaskTypeWithCompletionNotifier::Periodic {
609 completion_notifier,
610 ..
611 } => {
612 // Use flume for high-performance periodic notification
613 // 使用 flume 实现高性能周期通知
614 let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
615 }
616 }
617
618 if vec_index < slot.len() {
619 let swapped_task_id = slot[vec_index].get_id();
620 // Use DeferredMap's get_mut with TaskId key
621 // 使用 DeferredMap 的 get_mut,传入 TaskId key
622 if let Some(swapped_location) =
623 self.task_index.get_mut(swapped_task_id.key())
624 {
625 swapped_location.vec_index = vec_index;
626 }
627 }
628
629 // Use DeferredMap's remove with TaskId key
630 // 使用 DeferredMap 的 remove,传入 TaskId key
631 self.task_index.remove(task_id.key());
632 cancelled_count += 1;
633 }
634 }
635 }
636
637 // Process L1 layer cancellation
638 // 处理 L1 层取消
639 for (slot_index, tasks) in l1_tasks_by_slot.iter_mut().enumerate() {
640 if tasks.is_empty() {
641 continue;
642 }
643
644 tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
645
646 let slot = &mut self.l1.slots[slot_index];
647
648 for &(task_id, vec_index) in tasks.iter() {
649 if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
650 let removed_task = slot.swap_remove(vec_index);
651
652 match removed_task.into_task_type() {
653 TaskTypeWithCompletionNotifier::OneShot {
654 completion_notifier,
655 } => {
656 // Use Notify + AtomicU8 for zero-allocation notification
657 // 使用 Notify + AtomicU8 实现零分配通知
658 completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
659 }
660 TaskTypeWithCompletionNotifier::Periodic {
661 completion_notifier,
662 ..
663 } => {
664 // Use flume for high-performance periodic notification
665 // 使用 flume 实现高性能周期通知
666 let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
667 }
668 }
669
670 if vec_index < slot.len() {
671 let swapped_task_id = slot[vec_index].get_id();
672 // Use DeferredMap's get_mut with TaskId key
673 // 使用 DeferredMap 的 get_mut,传入 TaskId key
674 if let Some(swapped_location) =
675 self.task_index.get_mut(swapped_task_id.key())
676 {
677 swapped_location.vec_index = vec_index;
678 }
679 }
680
681 // Use DeferredMap's remove with TaskId key
682 // 使用 DeferredMap 的 remove,传入 TaskId key
683 self.task_index.remove(task_id.key());
684 cancelled_count += 1;
685 }
686 }
687 }
688
689 cancelled_count
690 }
691
692 /// Reinsert periodic task with the same TaskId
693 ///
694 /// Used to automatically reschedule periodic tasks after they expire
695 ///
696 /// TaskId remains unchanged, only location is updated in DeferredMap
697 ///
698 /// 重新插入周期性任务(保持相同的 TaskId)
699 ///
700 /// 用于在周期性任务过期后自动重新调度
701 ///
702 /// TaskId 保持不变,只更新 DeferredMap 中的位置
703 fn reinsert_periodic_task(&mut self, task_id: TaskId, task: TimerTaskWithCompletionNotifier) {
704 // Determine which layer the interval should be inserted into
705 // This method is only called by periodic tasks, so the interval is guaranteed to be Some.
706 // 确定间隔应该插入到哪一层
707 // 该方法只能由周期性任务调用,所以间隔是 guaranteed 应当保证为 Some.
708 let (level, ticks, rounds) = self.determine_layer(task.get_interval().unwrap());
709
710 // Use match to reduce branches, and use cached slot mask
711 // 使用 match 减少分支,并使用缓存的槽掩码
712 let (current_tick, slot_mask, slots) = match level {
713 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
714 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
715 };
716
717 let total_ticks = current_tick + ticks;
718 let slot_index = (total_ticks as usize) & slot_mask;
719
720 // Get the index position of the task in Vec
721 // 获取任务在 Vec 中的索引位置
722 let vec_index = slots[slot_index].len();
723 let new_location = TaskLocation::new(level, slot_index, vec_index);
724
725 // Insert task into slot
726 // 将任务插入槽中
727 slots[slot_index].push(TimerTaskForWheel::new_with_id(
728 task_id,
729 task,
730 total_ticks,
731 rounds,
732 ));
733
734 // Update task location in DeferredMap (doesn't remove, just updates)
735 // 更新 DeferredMap 中的任务位置(不删除,仅更新)
736 if let Some(location) = self.task_index.get_mut(task_id.key()) {
737 *location = new_location;
738 }
739 }
740
741 /// Advance the timing wheel by one tick, return all expired tasks
742 ///
743 /// # Returns
744 /// List of expired tasks
745 ///
746 /// # Implementation Details
747 /// - L0 layer advances 1 tick each time (no rounds check)
748 /// - L1 layer advances once every (L1_tick / L0_tick) times
749 /// - L1 expired tasks are batch demoted to L0
750 ///
751 /// 推进时间轮一个 tick,返回所有过期的任务
752 ///
753 /// # 返回值
754 /// 过期任务列表
755 ///
756 /// # 实现细节
757 /// - L0 层每次推进 1 个 tick(无轮数检查)
758 /// - L1 层每 (L1_tick / L0_tick) 次推进一次
759 /// - L1 层过期任务批量降级到 L0
760 pub fn advance(&mut self) -> Vec<WheelAdvanceResult> {
761 // Advance L0 layer
762 // 推进 L0 层
763 self.l0.current_tick += 1;
764
765 let mut expired_tasks = Vec::new();
766
767 // Process L0 layer expired tasks
768 // Distinguish between one-shot and periodic tasks
769 // 处理 L0 层过期任务
770 // 区分一次性任务和周期性任务
771 let l0_slot_index = (self.l0.current_tick as usize) & self.l0.slot_mask;
772
773 // Collect periodic tasks to reinsert
774 // 收集需要重新插入的周期性任务
775 let mut periodic_tasks_to_reinsert = Vec::new();
776
777 {
778 let l0_slot = &mut self.l0.slots[l0_slot_index];
779
780 // Process all tasks in the current slot by always removing from index 0
781 // This avoids index tracking issues with swap_remove
782 // 通过始终从索引 0 移除来处理当前槽中的所有任务
783 // 这避免了 swap_remove 的索引跟踪问题
784 while !l0_slot.is_empty() {
785 // Always remove from index 0
786 // 始终从索引 0 移除
787 let task_with_notifier = l0_slot.swap_remove(0);
788 let task_id = task_with_notifier.get_id();
789
790 // Determine if this is a periodic task (check before consuming)
791 // 判断是否为周期性任务(在消耗前检查)
792 let is_periodic = matches!(
793 task_with_notifier.task.task_type,
794 TaskTypeWithCompletionNotifier::Periodic { .. }
795 );
796
797 // For one-shot tasks, remove from DeferredMap index
798 // For periodic tasks, keep in index (will update location in reinsert)
799 // 对于一次性任务,从 DeferredMap 索引中移除
800 // 对于周期性任务,保留在索引中(重新插入时会更新位置)
801 if !is_periodic {
802 self.task_index.remove(task_id.key());
803 }
804
805 // Update swapped element's index (the element that was moved to index 0)
806 // 更新被交换元素的索引(被移动到索引 0 的元素)
807 if !l0_slot.is_empty() {
808 let swapped_task_id = l0_slot[0].get_id();
809 // Use DeferredMap's get_mut with TaskId key
810 // 使用 DeferredMap 的 get_mut,传入 TaskId key
811 if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
812 swapped_location.vec_index = 0;
813 }
814 }
815
816 let TimerTaskForWheel { task_id, task, .. } = task_with_notifier;
817
818 match &task.task_type {
819 TaskTypeWithCompletionNotifier::Periodic {
820 completion_notifier,
821 ..
822 } => {
823 let _ = completion_notifier.0.try_send(TaskCompletion::Called);
824
825 periodic_tasks_to_reinsert.push((
826 task_id,
827 TimerTaskWithCompletionNotifier {
828 task_type: task.task_type,
829 delay: task.delay,
830 callback: task.callback.clone(),
831 },
832 ));
833 }
834 TaskTypeWithCompletionNotifier::OneShot {
835 completion_notifier,
836 } => {
837 completion_notifier.notify(crate::task::TaskCompletion::Called);
838 }
839 }
840
841 expired_tasks.push(WheelAdvanceResult {
842 id: task_id,
843 callback: task.callback,
844 });
845 }
846 }
847
848 // Reinsert periodic tasks for next interval
849 // 重新插入周期性任务到下一个周期
850 for (task_id, task) in periodic_tasks_to_reinsert {
851 self.reinsert_periodic_task(task_id, task);
852 }
853
854 // Process L1 layer
855 // Check if L1 layer needs to be advanced
856 // 处理 L1 层
857 // 检查是否需要推进 L1 层
858 if self.l0.current_tick.is_multiple_of(self.l1_tick_ratio) {
859 self.l1.current_tick += 1;
860 let l1_slot_index = (self.l1.current_tick as usize) & self.l1.slot_mask;
861 let l1_slot = &mut self.l1.slots[l1_slot_index];
862
863 // Collect L1 layer expired tasks
864 // 收集 L1 层过期任务
865 let mut tasks_to_demote = Vec::new();
866 let mut i = 0;
867 while i < l1_slot.len() {
868 let task = &mut l1_slot[i];
869
870 if task.rounds > 0 {
871 // Still has rounds, decrease rounds and keep
872 // 还有轮数,减少轮数并保留
873 task.rounds -= 1;
874 // Use DeferredMap's get_mut with TaskId key
875 // 使用 DeferredMap 的 get_mut,传入 TaskId key
876 if let Some(location) = self.task_index.get_mut(task.get_id().key()) {
877 location.vec_index = i;
878 }
879 i += 1;
880 } else {
881 // rounds = 0, need to demote to L0
882 // Don't remove from task_index - will update location in demote_tasks
883 // rounds = 0,需要降级到 L0
884 // 不从 task_index 中移除 - 在 demote_tasks 中会更新位置
885 let task_to_demote = l1_slot.swap_remove(i);
886
887 if i < l1_slot.len() {
888 let swapped_task_id = l1_slot[i].get_id();
889 // Use DeferredMap's get_mut with TaskId key
890 // 使用 DeferredMap 的 get_mut,传入 TaskId key
891 if let Some(swapped_location) =
892 self.task_index.get_mut(swapped_task_id.key())
893 {
894 swapped_location.vec_index = i;
895 }
896 }
897
898 tasks_to_demote.push(task_to_demote);
899 }
900 }
901
902 // Demote tasks to L0
903 // 将任务降级到 L0
904 self.demote_tasks(tasks_to_demote);
905 }
906
907 expired_tasks
908 }
909
910 /// Demote tasks from L1 to L0
911 ///
912 /// Recalculate and insert L1 expired tasks into L0 layer
913 ///
914 /// Updates task location in DeferredMap without removing/reinserting
915 ///
916 /// 将任务从 L1 降级到 L0
917 ///
918 /// 重新计算并将 L1 过期任务插入到 L0 层
919 ///
920 /// 更新 DeferredMap 中的任务位置而不删除/重新插入
921 fn demote_tasks(&mut self, tasks: Vec<TimerTaskForWheel>) {
922 for task in tasks {
923 // Calculate the remaining delay of the task in L0 layer
924 // The task's deadline_tick is based on L1 tick, needs to be converted to L0 tick
925 // 计算任务在 L0 层的剩余延迟
926 // 任务的 deadline_tick 基于 L1 tick,需要转换为 L0 tick
927 let l1_tick_ratio = self.l1_tick_ratio;
928
929 // Calculate the original expiration time of the task (L1 tick)
930 // 计算任务的原始过期时间(L1 tick)
931 let l1_deadline = task.deadline_tick;
932
933 // Convert to L0 tick expiration time
934 // 转换为 L0 tick 过期时间
935 let l0_deadline_tick = l1_deadline * l1_tick_ratio;
936 let l0_current_tick = self.l0.current_tick;
937
938 // Calculate remaining L0 ticks
939 // 计算剩余 L0 ticks
940 let remaining_l0_ticks = if l0_deadline_tick > l0_current_tick {
941 l0_deadline_tick - l0_current_tick
942 } else {
943 1 // At least trigger in next tick (至少在下一个 tick 触发)
944 };
945
946 // Calculate L0 slot index
947 // 计算 L0 槽索引
948 let target_l0_tick = l0_current_tick + remaining_l0_ticks;
949 let l0_slot_index = (target_l0_tick as usize) & self.l0.slot_mask;
950
951 let task_id = task.get_id();
952 let vec_index = self.l0.slots[l0_slot_index].len();
953 let new_location = TaskLocation::new(0, l0_slot_index, vec_index);
954
955 // Insert into L0 layer
956 // 插入到 L0 层
957 self.l0.slots[l0_slot_index].push(task);
958
959 // Update task location in DeferredMap (task already exists in index)
960 // 更新 DeferredMap 中的任务位置(任务已在索引中)
961 if let Some(location) = self.task_index.get_mut(task_id.key()) {
962 *location = new_location;
963 }
964 }
965 }
966
967 /// Check if the timing wheel is empty
968 ///
969 /// 检查时间轮是否为空
970 #[allow(dead_code)]
971 pub fn is_empty(&self) -> bool {
972 self.task_index.is_empty()
973 }
974
975 /// Postpone timer task (keep original TaskId)
976 ///
977 /// # Parameters
978 /// - `task_id`: Task ID to postpone
979 /// - `new_delay`: New delay time (recalculated from current tick, not continuing from original delay time)
980 /// - `new_callback`: New callback function (if None, keep original callback)
981 ///
982 /// # Returns
983 /// Returns true if the task exists and is successfully postponed, otherwise returns false
984 ///
985 /// # Implementation Details
986 /// - Remove task from original layer/slot, keep its completion_notifier (will not trigger cancellation notification)
987 /// - Update delay time and callback function (if provided)
988 /// - Recalculate target layer, slot, and rounds based on new_delay
989 /// - Cross-layer migration may occur (L0 <-> L1)
990 /// - Re-insert to new position using original TaskId
991 /// - Keep consistent with external held TaskId reference
992 ///
993 /// 延期定时器任务(保留原始 TaskId)
994 ///
995 /// # 参数
996 /// - `task_id`: 要延期的任务 ID
997 /// - `new_delay`: 新延迟时间(从当前 tick 重新计算,而非从原延迟时间继续)
998 /// - `new_callback`: 新回调函数(如果为 None,则保留原回调)
999 ///
1000 /// # 返回值
1001 /// 如果任务存在且成功延期则返回 true,否则返回 false
1002 ///
1003 /// # 实现细节
1004 /// - 从原层级/槽位移除任务,保留其 completion_notifier(不会触发取消通知)
1005 /// - 更新延迟时间和回调函数(如果提供)
1006 /// - 根据 new_delay 重新计算目标层级、槽位和轮数
1007 /// - 可能发生跨层迁移(L0 <-> L1)
1008 /// - 使用原始 TaskId 重新插入到新位置
1009 /// - 与外部持有的 TaskId 引用保持一致
1010 #[inline]
1011 pub fn postpone(
1012 &mut self,
1013 task_id: TaskId,
1014 new_delay: Duration,
1015 new_callback: Option<crate::task::CallbackWrapper>,
1016 ) -> bool {
1017 // Step 1: Get task location from DeferredMap (don't remove)
1018 // 步骤 1: 从 DeferredMap 获取任务位置(不删除)
1019 let old_location = match self.task_index.get(task_id.key()) {
1020 Some(loc) => *loc,
1021 None => return false,
1022 };
1023
1024 // Use match to get slot reference
1025 // 使用 match 获取槽引用
1026 let slot = match old_location.level {
1027 0 => &mut self.l0.slots[old_location.slot_index],
1028 _ => &mut self.l1.slots[old_location.slot_index],
1029 };
1030
1031 // Verify task is still at expected position
1032 // 验证任务仍在预期位置
1033 if old_location.vec_index >= slot.len() || slot[old_location.vec_index].get_id() != task_id
1034 {
1035 // Index inconsistent, return failure
1036 // 索引不一致,返回失败
1037 return false;
1038 }
1039
1040 // Use swap_remove to remove task from slot
1041 // 使用 swap_remove 从槽中移除任务
1042 let mut task = slot.swap_remove(old_location.vec_index);
1043
1044 // Update swapped element's index (if a swap occurred)
1045 // 更新被交换元素的索引(如果发生了交换)
1046 if old_location.vec_index < slot.len() {
1047 let swapped_task_id = slot[old_location.vec_index].get_id();
1048 // Use DeferredMap's get_mut with TaskId key
1049 // 使用 DeferredMap 的 get_mut,传入 TaskId key
1050 if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
1051 swapped_location.vec_index = old_location.vec_index;
1052 }
1053 }
1054
1055 // Step 2: Update task's delay and callback
1056 // 步骤 2: 更新任务的延迟和回调
1057 task.update_delay(new_delay);
1058 if let Some(callback) = new_callback {
1059 task.update_callback(callback);
1060 }
1061
1062 // Step 3: Recalculate layer, slot, and rounds based on new delay
1063 // 步骤 3: 根据新延迟重新计算层级、槽位和轮数
1064 let (new_level, ticks, new_rounds) = self.determine_layer(new_delay);
1065
1066 // Use match to reduce branches, and use cached slot mask
1067 // 使用 match 减少分支,并使用缓存的槽掩码
1068 let (current_tick, slot_mask, slots) = match new_level {
1069 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
1070 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
1071 };
1072
1073 let total_ticks = current_tick + ticks;
1074 let new_slot_index = (total_ticks as usize) & slot_mask;
1075
1076 // Update task's timing wheel parameters
1077 // 更新任务的时间轮参数
1078 task.deadline_tick = total_ticks;
1079 task.rounds = new_rounds;
1080
1081 // Step 4: Re-insert task to new layer/slot
1082 // 步骤 4: 将任务重新插入到新层级/槽位
1083 let new_vec_index = slots[new_slot_index].len();
1084 let new_location = TaskLocation::new(new_level, new_slot_index, new_vec_index);
1085
1086 slots[new_slot_index].push(task);
1087
1088 // Update task location in DeferredMap (task already exists in index)
1089 // 更新 DeferredMap 中的任务位置(任务已在索引中)
1090 if let Some(location) = self.task_index.get_mut(task_id.key()) {
1091 *location = new_location;
1092 }
1093
1094 true
1095 }
1096
1097 /// Batch postpone timer tasks
1098 ///
1099 /// # Parameters
1100 /// - `updates`: List of tuples of (task ID, new delay)
1101 ///
1102 /// # Returns
1103 /// Number of successfully postponed tasks
1104 ///
1105 /// # Performance Advantages
1106 /// - Batch processing reduces function call overhead
1107 /// - All delays are recalculated from current_tick at call time
1108 ///
1109 /// # Notes
1110 /// - If a task ID does not exist, that task will be skipped without affecting other tasks' postponement
1111 ///
1112 /// 批量延期定时器任务
1113 ///
1114 /// # 参数
1115 /// - `updates`: (任务 ID, 新延迟) 元组列表
1116 ///
1117 /// # 返回值
1118 /// 成功延期的任务数量
1119 ///
1120 /// # 性能优势
1121 /// - 批处理减少函数调用开销
1122 /// - 所有延迟在调用时从 current_tick 重新计算
1123 ///
1124 /// # 注意
1125 /// - 如果任务 ID 不存在,该任务将被跳过,不影响其他任务的延期
1126 #[inline]
1127 pub fn postpone_batch(&mut self, updates: Vec<(TaskId, Duration)>) -> usize {
1128 let mut postponed_count = 0;
1129
1130 for (task_id, new_delay) in updates {
1131 if self.postpone(task_id, new_delay, None) {
1132 postponed_count += 1;
1133 }
1134 }
1135
1136 postponed_count
1137 }
1138
1139 /// Batch postpone timer tasks (replace callbacks)
1140 ///
1141 /// # Parameters
1142 /// - `updates`: List of tuples of (task ID, new delay, new callback)
1143 ///
1144 /// # Returns
1145 /// Number of successfully postponed tasks
1146 ///
1147 /// 批量延期定时器任务(替换回调)
1148 ///
1149 /// # 参数
1150 /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
1151 ///
1152 /// # 返回值
1153 /// 成功延期的任务数量
1154 pub fn postpone_batch_with_callbacks(
1155 &mut self,
1156 updates: Vec<(TaskId, Duration, Option<crate::task::CallbackWrapper>)>,
1157 ) -> usize {
1158 let mut postponed_count = 0;
1159
1160 for (task_id, new_delay, new_callback) in updates {
1161 if self.postpone(task_id, new_delay, new_callback) {
1162 postponed_count += 1;
1163 }
1164 }
1165
1166 postponed_count
1167 }
1168}
1169
1170#[cfg(test)]
1171mod tests {
1172 use super::*;
1173 use crate::task::{CallbackWrapper, TimerTask, TimerTaskWithCompletionNotifier};
1174
1175 #[test]
1176 fn test_wheel_creation() {
1177 let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1178 assert_eq!(wheel.slot_count(), 512);
1179 assert_eq!(wheel.current_tick(), 0);
1180 assert!(wheel.is_empty());
1181 }
1182
1183 #[test]
1184 fn test_hierarchical_wheel_creation() {
1185 let config = WheelConfig::default();
1186
1187 let wheel = Wheel::new(config, BatchConfig::default());
1188 assert_eq!(wheel.slot_count(), 512); // L0 slot count (L0 层槽数量)
1189 assert_eq!(wheel.current_tick(), 0);
1190 assert!(wheel.is_empty());
1191 // L1 layer always exists in hierarchical mode (L1 层在分层模式下始终存在)
1192 assert_eq!(wheel.l1.slot_count, 64);
1193 assert_eq!(wheel.l1_tick_ratio, 100); // 1000ms / 10ms = 100 ticks (1000毫秒 / 10毫秒 = 100个tick)
1194 }
1195
1196 #[test]
1197 fn test_hierarchical_config_validation() {
1198 // L1 tick must be an integer multiple of L0 tick (L1 tick 必须是 L0 tick 的整数倍)
1199 let result = WheelConfig::builder()
1200 .l0_tick_duration(Duration::from_millis(10))
1201 .l0_slot_count(512)
1202 .l1_tick_duration(Duration::from_millis(15)) // Not an integer multiple (不是整数倍)
1203 .l1_slot_count(64)
1204 .build();
1205
1206 assert!(result.is_err());
1207
1208 // Correct configuration (正确的配置)
1209 let result = WheelConfig::builder()
1210 .l0_tick_duration(Duration::from_millis(10))
1211 .l0_slot_count(512)
1212 .l1_tick_duration(Duration::from_secs(1)) // 1000ms / 10ms = 100 ticks (1000毫秒 / 10毫秒 = 100个tick)
1213 .l1_slot_count(64)
1214 .build();
1215
1216 assert!(result.is_ok());
1217 }
1218
1219 #[test]
1220 fn test_layer_determination() {
1221 let config = WheelConfig::default();
1222
1223 let wheel = Wheel::new(config, BatchConfig::default());
1224
1225 // Short delay should enter L0 layer (短延迟应该进入 L0 层)
1226 // L0: 512 slots * 10ms = 5120ms
1227 let (level, _, rounds) = wheel.determine_layer(Duration::from_millis(100));
1228 assert_eq!(level, 0);
1229 assert_eq!(rounds, 0);
1230
1231 // Long delay should enter L1 layer
1232 // 超过 L0 范围(>5120ms) (超过 L0 范围 (>5120毫秒))
1233 let (level, _, rounds) = wheel.determine_layer(Duration::from_secs(10));
1234 assert_eq!(level, 1);
1235 assert_eq!(rounds, 0);
1236
1237 // Long delay should enter L1 layer and have rounds
1238 // L1: 64 slots * 1000ms = 64000ms (L1: 64个槽 * 1000毫秒 = 64000毫秒)
1239 let (level, _, rounds) = wheel.determine_layer(Duration::from_secs(120));
1240 assert_eq!(level, 1);
1241 assert!(rounds > 0);
1242 }
1243
1244 #[test]
1245 fn test_hierarchical_insert_and_advance() {
1246 let config = WheelConfig::default();
1247
1248 let mut wheel = Wheel::new(config, BatchConfig::default());
1249
1250 // Insert short delay task into L0 (插入短延迟任务到 L0)
1251 let callback = CallbackWrapper::new(|| async {});
1252 let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
1253 let (task_with_notifier, _completion_rx) =
1254 TimerTaskWithCompletionNotifier::from_timer_task(task);
1255 let handle = wheel.allocate_handle();
1256 let task_id = handle.task_id();
1257 wheel.insert(handle, task_with_notifier);
1258
1259 // Verify task is in L0 layer (验证任务在 L0 层)
1260 let location = wheel.task_index.get(task_id.key()).unwrap();
1261 assert_eq!(location.level, 0);
1262
1263 // Advance 10 ticks (100ms) (前进 10 个 tick (100毫秒))
1264 for _ in 0..10 {
1265 let expired = wheel.advance();
1266 if !expired.is_empty() {
1267 assert_eq!(expired.len(), 1);
1268 assert_eq!(expired[0].id, task_id);
1269 return;
1270 }
1271 }
1272 panic!("Task should have expired");
1273 }
1274
1275 #[test]
1276 fn test_cross_layer_cancel() {
1277 let config = WheelConfig::default();
1278
1279 let mut wheel = Wheel::new(config, BatchConfig::default());
1280
1281 // Insert L0 task (插入 L0 任务)
1282 let callback1 = CallbackWrapper::new(|| async {});
1283 let task1 = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback1));
1284 let (task_with_notifier1, _rx1) = TimerTaskWithCompletionNotifier::from_timer_task(task1);
1285 let handle1 = wheel.allocate_handle();
1286 let task_id1 = handle1.task_id();
1287 wheel.insert(handle1, task_with_notifier1);
1288
1289 // Insert L1 task (插入 L1 任务)
1290 let callback2 = CallbackWrapper::new(|| async {});
1291 let task2 = TimerTask::new_oneshot(Duration::from_secs(10), Some(callback2));
1292 let (task_with_notifier2, _rx2) = TimerTaskWithCompletionNotifier::from_timer_task(task2);
1293 let handle2 = wheel.allocate_handle();
1294 let task_id2 = handle2.task_id();
1295 wheel.insert(handle2, task_with_notifier2);
1296
1297 // Verify levels (验证层级)
1298 assert_eq!(wheel.task_index.get(task_id1.key()).unwrap().level, 0);
1299 assert_eq!(wheel.task_index.get(task_id2.key()).unwrap().level, 1);
1300
1301 // Cancel L0 task (取消 L0 任务)
1302 assert!(wheel.cancel(task_id1));
1303 assert!(wheel.task_index.get(task_id1.key()).is_none());
1304
1305 // Cancel L1 task (取消 L1 任务)
1306 assert!(wheel.cancel(task_id2));
1307 assert!(wheel.task_index.get(task_id2.key()).is_none());
1308
1309 assert!(wheel.is_empty()); // 时间轮应该为空
1310 }
1311
1312 #[test]
1313 fn test_delay_to_ticks() {
1314 let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1315 assert_eq!(wheel.delay_to_ticks(Duration::from_millis(100)), 10);
1316 assert_eq!(wheel.delay_to_ticks(Duration::from_millis(50)), 5);
1317 assert_eq!(wheel.delay_to_ticks(Duration::from_millis(1)), 1); // Minimum 1 tick (最小 1 个 tick)
1318 }
1319
1320 #[test]
1321 fn test_wheel_invalid_slot_count() {
1322 let result = WheelConfig::builder().l0_slot_count(100).build();
1323 assert!(result.is_err());
1324 if let Err(crate::error::TimerError::InvalidSlotCount { slot_count, reason }) = result {
1325 assert_eq!(slot_count, 100);
1326 assert_eq!(reason, "L0 layer slot count must be power of 2"); // L0 层槽数量必须是 2 的幂
1327 } else {
1328 panic!("Expected InvalidSlotCount error"); // 期望 InvalidSlotCount 错误
1329 }
1330 }
1331
1332 #[test]
1333 fn test_minimum_delay() {
1334 let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1335
1336 // Test minimum delay (delays less than 1 tick should be rounded up to 1 tick) (测试最小延迟 (延迟小于 1 个 tick 应该向上舍入到 1 个 tick))
1337 let callback = CallbackWrapper::new(|| async {});
1338 let task = TimerTask::new_oneshot(Duration::from_millis(1), Some(callback));
1339 let (task_with_notifier, _completion_rx) =
1340 TimerTaskWithCompletionNotifier::from_timer_task(task);
1341 let handle = wheel.allocate_handle();
1342 let task_id: TaskId = handle.task_id();
1343 wheel.insert(handle, task_with_notifier);
1344
1345 // Advance 1 tick, task should trigger (前进 1 个 tick,任务应该触发)
1346 let expired = wheel.advance();
1347 assert_eq!(
1348 expired.len(),
1349 1,
1350 "Minimum delay task should be triggered after 1 tick"
1351 ); // 最小延迟任务应该在 1 个 tick 后触发
1352 assert_eq!(expired[0].id, task_id);
1353 }
1354
1355 #[test]
1356 fn test_advance_empty_slots() {
1357 let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1358
1359 // Do not insert any tasks, advance multiple ticks (不插入任何任务,前进多个tick)
1360 for _ in 0..100 {
1361 let expired = wheel.advance();
1362 assert!(
1363 expired.is_empty(),
1364 "Empty slots should not return any tasks"
1365 );
1366 }
1367
1368 assert_eq!(
1369 wheel.current_tick(),
1370 100,
1371 "current_tick should correctly increment"
1372 ); // current_tick 应该正确递增
1373 }
1374
1375 #[test]
1376 fn test_slot_boundary() {
1377 let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1378
1379 // Test slot boundary and wraparound (测试槽边界和环绕)
1380 // 第一个任务:延迟 10ms(1 tick),应该在 slot 1 触发
1381 let callback1 = CallbackWrapper::new(|| async {});
1382 let task1 = TimerTask::new_oneshot(Duration::from_millis(10), Some(callback1));
1383 let (task_with_notifier1, _rx1) = TimerTaskWithCompletionNotifier::from_timer_task(task1);
1384 let handle1 = wheel.allocate_handle();
1385 let task_id_1 = handle1.task_id();
1386 wheel.insert(handle1, task_with_notifier1);
1387
1388 // Second task: delay 5110ms (511 ticks), should trigger on slot 511 (第二个任务:延迟 5110毫秒 (511个tick),应该在槽 511 触发)
1389 let callback2 = CallbackWrapper::new(|| async {});
1390 let task2 = TimerTask::new_oneshot(Duration::from_millis(5110), Some(callback2));
1391 let (task_with_notifier2, _rx2) = TimerTaskWithCompletionNotifier::from_timer_task(task2);
1392 let handle2 = wheel.allocate_handle();
1393 let task_id_2 = handle2.task_id();
1394 wheel.insert(handle2, task_with_notifier2);
1395
1396 // Advance 1 tick, first task should trigger (前进 1 个 tick,第一个任务应该触发)
1397 let expired = wheel.advance();
1398 assert_eq!(expired.len(), 1, "First task should trigger on tick 1"); // 第一个任务应该在第 1 个 tick 触发
1399 assert_eq!(expired[0].id, task_id_1);
1400
1401 // Continue advancing to 511 ticks (from tick 1 to tick 511), second task should trigger (继续前进 511个tick (从tick 1到tick 511),第二个任务应该触发)
1402 let mut triggered = false;
1403 for i in 0..510 {
1404 let expired = wheel.advance();
1405 if !expired.is_empty() {
1406 assert_eq!(
1407 expired.len(),
1408 1,
1409 "The {}th advance should trigger the second task",
1410 i + 2
1411 ); // 第 {i + 2} 次前进应该触发第二个任务
1412 assert_eq!(expired[0].id, task_id_2);
1413 triggered = true;
1414 break;
1415 }
1416 }
1417 assert!(triggered, "Second task should trigger on tick 511"); // 第二个任务应该在第 511 个 tick 触发
1418
1419 assert!(wheel.is_empty(), "All tasks should have been triggered"); // 所有任务都应该被触发
1420 }
1421
1422 #[test]
1423 fn test_task_id_uniqueness() {
1424 let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());
1425
1426 // Insert multiple tasks, verify TaskId uniqueness (插入多个任务,验证 TaskId 唯一性)
1427 let mut task_ids = std::collections::HashSet::new();
1428 for _ in 0..100 {
1429 let callback = CallbackWrapper::new(|| async {});
1430 let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(callback));
1431 let (task_with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
1432 let handle = wheel.allocate_handle();
1433 let task_id = handle.task_id();
1434 wheel.insert(handle, task_with_notifier);
1435
1436 assert!(task_ids.insert(task_id), "TaskId should be unique"); // TaskId 应该唯一
1437 }
1438
1439 assert_eq!(task_ids.len(), 100);
1440 }
1441}