kestrel_timer/wheel.rs
1use crate::CallbackWrapper;
2use crate::config::{BatchConfig, WheelConfig};
3use crate::task::{
4 TaskCompletion, TaskHandle, TaskId, TaskLocation, TaskTypeWithCompletionNotifier,
5 TimerTaskForWheel, TimerTaskWithCompletionNotifier,
6};
7use deferred_map::DeferredMap;
8use std::time::Duration;
9
/// One expired task, as returned by `Wheel::advance`.
pub struct WheelAdvanceResult {
    /// Identifier of the task that expired.
    pub id: TaskId,
    /// Callback carried by the expired task, if it has one.
    pub callback: Option<CallbackWrapper>,
}
14
/// A single layer of the timing wheel.
///
/// Holds a ring of slots together with the layer's own tick counter. The
/// `tick_duration_ms` and `slot_mask` fields are caches derived from
/// `tick_duration` and `slot_count` when the layer is built in `WheelLayer::new`.
struct WheelLayer {
    /// Slot array; each slot stores the group of timer tasks that map to it.
    slots: Vec<Vec<TimerTaskForWheel>>,

    /// Current time pointer of this layer (tick index).
    current_tick: u64,

    /// Number of slots in this layer.
    slot_count: usize,

    /// Duration of one tick of this layer.
    tick_duration: Duration,

    /// Cached `tick_duration` in whole milliseconds, to avoid repeated conversion.
    tick_duration_ms: u64,

    /// Cached `slot_count - 1`, used as a bit mask to turn a tick into a slot index.
    ///
    /// NOTE(review): masking only equals `tick % slot_count` when `slot_count`
    /// is a power of two — presumably enforced by config validation; confirm.
    slot_mask: usize,
}
49
50impl WheelLayer {
51 /// Create a new wheel layer
52 ///
53 /// 创建一个新的时间轮层
54 fn new(slot_count: usize, tick_duration: Duration) -> Self {
55 let mut slots = Vec::with_capacity(slot_count);
56 // Pre-allocate capacity for each slot to reduce subsequent reallocation during push
57 // Most slots typically contain 0-4 tasks, pre-allocate capacity of 4
58 //
59 // 为每个槽预分配容量以减少后续 push 时的重新分配
60 // 大多数槽通常包含 0-4 个任务,预分配容量为 4
61 for _ in 0..slot_count {
62 slots.push(Vec::with_capacity(4));
63 }
64
65 let tick_duration_ms = tick_duration.as_millis() as u64;
66 let slot_mask = slot_count - 1;
67
68 Self {
69 slots,
70 current_tick: 0,
71 slot_count,
72 tick_duration,
73 tick_duration_ms,
74 slot_mask,
75 }
76 }
77
78 /// Calculate the number of ticks corresponding to the delay
79 ///
80 /// 计算延迟对应的 tick 数量
81 fn delay_to_ticks(&self, delay: Duration) -> u64 {
82 let ticks = delay.as_millis() as u64 / self.tick_duration.as_millis() as u64;
83 ticks.max(1) // at least 1 tick (至少 1 个 tick)
84 }
85}
86
/// Timing wheel data structure (hierarchical mode).
///
/// Two layers are kept: `l0` for short delays and `l1` for long delays.
/// A `DeferredMap` indexes every scheduled task by its `TaskId` so that a
/// task's slot position can be looked up directly.
pub struct Wheel {
    /// L0 layer (bottom layer).
    l0: WheelLayer,

    /// L1 layer (top layer).
    l1: WheelLayer,

    /// Number of L0 ticks per L1 tick.
    pub(crate) l1_tick_ratio: u64,

    /// Task index used for lookup and cancellation.
    ///
    /// Keys are derived from `TaskId`s; values are the `TaskLocation`
    /// (layer, slot index, position inside the slot) of each task.
    pub(crate) task_index: DeferredMap<TaskLocation>,

    /// Batch processing configuration.
    batch_config: BatchConfig,

    /// Cached L0 capacity in milliseconds (`slot_count * tick_duration_ms`).
    l0_capacity_ms: u64,

    /// Cached L1 capacity in ticks (the L1 slot count).
    l1_capacity_ticks: u64,
}
134
135impl Wheel {
136 /// Create new timing wheel
137 ///
138 /// # Parameters
139 /// - `config`: Timing wheel configuration (already validated)
140 /// - `batch_config`: Batch processing configuration
141 ///
142 /// # Notes
143 /// Configuration parameters have been validated in WheelConfig::builder().build(), so this method will not fail.
144 /// Now uses DeferredMap for task indexing with generational safety.
145 ///
146 /// 创建新的时间轮
147 ///
148 /// # 参数
149 /// - `config`: 时间轮配置(已验证)
150 /// - `batch_config`: 批处理配置
151 ///
152 /// # 注意
153 /// 配置参数已在 WheelConfig::builder().build() 中验证,因此此方法不会失败。
154 /// 现在使用 DeferredMap 进行任务索引,具有代数安全特性。
155 pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
156 let l0 = WheelLayer::new(config.l0_slot_count, config.l0_tick_duration);
157 let l1 = WheelLayer::new(config.l1_slot_count, config.l1_tick_duration);
158
159 // Calculate L1 tick ratio relative to L0 tick
160 // 计算 L1 tick 相对于 L0 tick 的比率
161 let l1_tick_ratio = l1.tick_duration_ms / l0.tick_duration_ms;
162
163 // Pre-calculate capacity to avoid repeated calculation in insert
164 // 预计算容量,避免在 insert 中重复计算
165 let l0_capacity_ms = (l0.slot_count as u64) * l0.tick_duration_ms;
166 let l1_capacity_ticks = l1.slot_count as u64;
167
168 // Estimate initial capacity for DeferredMap based on L0 slot count
169 // 根据 L0 槽数量估算 DeferredMap 的初始容量
170 let estimated_capacity = (l0.slot_count / 4).max(64);
171
172 Self {
173 l0,
174 l1,
175 l1_tick_ratio,
176 task_index: DeferredMap::with_capacity(estimated_capacity),
177 batch_config,
178 l0_capacity_ms,
179 l1_capacity_ticks,
180 }
181 }
182
183 /// Get current tick (L0 layer tick)
184 ///
185 /// 获取当前 tick(L0 层 tick)
186 #[allow(dead_code)]
187 pub fn current_tick(&self) -> u64 {
188 self.l0.current_tick
189 }
190
191 /// Get tick duration (L0 layer tick duration)
192 ///
193 /// 获取 tick 持续时间(L0 层 tick 持续时间)
194 #[allow(dead_code)]
195 pub fn tick_duration(&self) -> Duration {
196 self.l0.tick_duration
197 }
198
199 /// Get slot count (L0 layer slot count)
200 ///
201 /// 获取槽数量(L0 层槽数量)
202 #[allow(dead_code)]
203 pub fn slot_count(&self) -> usize {
204 self.l0.slot_count
205 }
206
207 /// Calculate the number of ticks corresponding to the delay (based on L0 layer)
208 ///
209 /// 计算延迟对应的 tick 数量(基于 L0 层)
210 #[allow(dead_code)]
211 pub fn delay_to_ticks(&self, delay: Duration) -> u64 {
212 self.l0.delay_to_ticks(delay)
213 }
214
215 /// Determine which layer the delay should be inserted into
216 ///
217 /// # Returns
218 /// Returns: (layer, ticks, rounds)
219 /// - Layer: 0 = L0, 1 = L1
220 /// - Ticks: number of ticks calculated from current tick
221 /// - Rounds: number of rounds (only used for very long delays in L1 layer)
222 ///
223 /// 确定延迟应该插入到哪一层
224 ///
225 /// # 返回值
226 /// 返回:(层级, ticks, 轮数)
227 /// - 层级:0 = L0, 1 = L1
228 /// - Ticks:从当前 tick 计算的 tick 数量
229 /// - 轮数:轮数(仅用于 L1 层的超长延迟)
230 #[inline(always)]
231 fn determine_layer(&self, delay: Duration) -> (u8, u64, u32) {
232 let delay_ms = delay.as_millis() as u64;
233
234 // Fast path: most tasks are within L0 range (using cached capacity)
235 // 快速路径:大多数任务在 L0 范围内(使用缓存的容量)
236 if delay_ms < self.l0_capacity_ms {
237 let l0_ticks = (delay_ms / self.l0.tick_duration_ms).max(1);
238 return (0, l0_ticks, 0);
239 }
240
241 // Slow path: L1 layer tasks (using cached values)
242 // 慢速路径:L1 层任务(使用缓存的值)
243 let l1_ticks = (delay_ms / self.l1.tick_duration_ms).max(1);
244
245 if l1_ticks < self.l1_capacity_ticks {
246 (1, l1_ticks, 0)
247 } else {
248 let rounds = (l1_ticks / self.l1_capacity_ticks) as u32;
249 (1, l1_ticks, rounds)
250 }
251 }
252
253 /// Allocate a handle from DeferredMap
254 ///
255 /// 从 DeferredMap 分配一个 handle
256 ///
257 /// # Returns
258 /// A unique handle for later insertion
259 ///
260 /// # 返回值
261 /// 用于后续插入的唯一 handle
262 pub fn allocate_handle(&mut self) -> TaskHandle {
263 TaskHandle::new(self.task_index.allocate_handle())
264 }
265
266 /// Batch allocate handles from DeferredMap
267 ///
268 /// 从 DeferredMap 批量分配 handles
269 ///
270 /// # Parameters
271 /// - `count`: Number of handles to allocate
272 ///
273 /// # Returns
274 /// Vector of unique handles for later batch insertion
275 ///
276 /// # 参数
277 /// - `count`: 要分配的 handle 数量
278 ///
279 /// # 返回值
280 /// 用于后续批量插入的唯一 handles 向量
281 pub fn allocate_handles(&mut self, count: usize) -> Vec<TaskHandle> {
282 let mut handles = Vec::with_capacity(count);
283 for _ in 0..count {
284 handles.push(TaskHandle::new(self.task_index.allocate_handle()));
285 }
286 handles
287 }
288
289 /// Insert timer task
290 ///
291 /// # Parameters
292 /// - `handle`: Handle for the task
293 /// - `task`: Timer task with completion notifier
294 ///
295 /// # Returns
296 /// Unique identifier of the task (TaskId) - now generated from DeferredMap
297 ///
298 /// # Implementation Details
299 /// - Allocates Handle from DeferredMap to generate TaskId with generational safety
300 /// - Automatically calculate the layer and slot where the task should be inserted
301 /// - Hierarchical mode: short delay tasks are inserted into L0, long delay tasks are inserted into L1
302 /// - Use bit operations to optimize slot index calculation
303 /// - Use DeferredMap for O(1) lookup and cancellation with generation checking
304 ///
305 /// 插入定时器任务
306 ///
307 /// # 参数
308 /// - `handle`: 任务的 handle
309 /// - `task`: 带完成通知器的定时器任务
310 ///
311 /// # 返回值
312 /// 任务的唯一标识符(TaskId)- 现在从 DeferredMap 生成
313 ///
314 /// # 实现细节
315 /// - 自动计算任务应该插入的层级和槽位
316 /// - 分层模式:短延迟任务插入 L0,长延迟任务插入 L1
317 /// - 使用位运算优化槽索引计算
318 /// - 使用 DeferredMap 实现 O(1) 查找和取消,带代数检查
319 #[inline]
320 pub fn insert(&mut self, handle: TaskHandle, task: TimerTaskWithCompletionNotifier) {
321 let task_id = handle.task_id();
322
323 let (level, ticks, rounds) = self.determine_layer(task.delay);
324
325 // Use match to reduce branches, and use cached slot mask
326 // 使用 match 减少分支,并使用缓存的槽掩码
327 let (current_tick, slot_mask, slots) = match level {
328 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
329 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
330 };
331
332 let total_ticks = current_tick + ticks;
333 let slot_index = (total_ticks as usize) & slot_mask;
334
335 // Create task with the assigned TaskId
336 // 使用分配的 TaskId 创建任务
337 let task = TimerTaskForWheel::new_with_id(task_id, task, total_ticks, rounds);
338
339 // Get the index position of the task in Vec (the length before insertion is the index of the new task)
340 // 获取任务在 Vec 中的索引位置(插入前的长度就是新任务的索引)
341 let vec_index = slots[slot_index].len();
342 let location = TaskLocation::new(level, slot_index, vec_index);
343
344 // Insert task into slot
345 // 将任务插入槽中
346 slots[slot_index].push(task);
347
348 // Insert task location into DeferredMap using handle
349 // In non-debug mode, it is guaranteed that this method will not panic.
350 // 使用 handle 将任务位置插入 DeferredMap
351 // 由于 deferred_map::DeferredMap 是我实现的,底层保证非debug模式下不会panic
352 self.task_index
353 .insert(handle.into_handle(), location)
354 .expect("Handle should be valid for insertion");
355 }
356
357 /// Batch insert timer tasks
358 ///
359 /// # Parameters
360 /// - `handles`: List of pre-allocated handles for tasks
361 /// - `tasks`: List of timer tasks with completion notifiers
362 ///
363 /// # Returns
364 /// - `Ok(())` if all tasks are successfully inserted
365 /// - `Err(TimerError::BatchLengthMismatch)` if handles and tasks lengths don't match
366 ///
367 /// # Performance Advantages
368 /// - Reduce repeated boundary checks and capacity adjustments
369 /// - For tasks with the same delay, calculation results can be reused
370 /// - Uses DeferredMap for efficient task indexing with generational safety
371 ///
372 /// 批量插入定时器任务
373 ///
374 /// # 参数
375 /// - `handles`: 任务的预分配 handles 列表
376 /// - `tasks`: 带完成通知器的定时器任务列表
377 ///
378 /// # 返回值
379 /// - `Ok(())` 如果所有任务成功插入
380 /// - `Err(TimerError::BatchLengthMismatch)` 如果 handles 和 tasks 长度不匹配
381 ///
382 /// # 性能优势
383 /// - 减少重复的边界检查和容量调整
384 /// - 对于相同延迟的任务,可以重用计算结果
385 /// - 使用 DeferredMap 实现高效的任务索引,具有代数安全特性
386 #[inline]
387 pub fn insert_batch(
388 &mut self,
389 handles: Vec<TaskHandle>,
390 tasks: Vec<TimerTaskWithCompletionNotifier>,
391 ) -> Result<(), crate::error::TimerError> {
392 // Validate that handles and tasks have the same length
393 // 验证 handles 和 tasks 长度相同
394 if handles.len() != tasks.len() {
395 return Err(crate::error::TimerError::BatchLengthMismatch {
396 handles_len: handles.len(),
397 tasks_len: tasks.len(),
398 });
399 }
400
401 for (handle, task) in handles.into_iter().zip(tasks.into_iter()) {
402 let task_id = handle.task_id();
403
404 let (level, ticks, rounds) = self.determine_layer(task.delay);
405
406 // Use match to reduce branches, and use cached slot mask
407 // 使用 match 减少分支,并使用缓存的槽掩码
408 let (current_tick, slot_mask, slots) = match level {
409 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
410 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
411 };
412
413 let total_ticks = current_tick + ticks;
414 let slot_index = (total_ticks as usize) & slot_mask;
415
416 // Create task with the assigned TaskId
417 // 使用分配的 TaskId 创建任务
418 let task = TimerTaskForWheel::new_with_id(task_id, task, total_ticks, rounds);
419
420 // Get the index position of the task in Vec
421 // 获取任务在 Vec 中的索引位置
422 let vec_index = slots[slot_index].len();
423 let location = TaskLocation::new(level, slot_index, vec_index);
424
425 // Insert task into slot
426 // 将任务插入槽中
427 slots[slot_index].push(task);
428
429 // Insert task location into DeferredMap using handle
430 // In non-debug mode, it is guaranteed that this method will not panic.
431 // 使用 handle 将任务位置插入 DeferredMap
432 // 由于 deferred_map::DeferredMap 是我实现的,底层保证非debug模式下不会panic
433 self.task_index
434 .insert(handle.into_handle(), location)
435 .expect("Handle should be valid for insertion");
436 }
437
438 Ok(())
439 }
440
441 /// Cancel timer task
442 ///
443 /// # Parameters
444 /// - `task_id`: Task ID
445 ///
446 /// # Returns
447 /// Returns true if the task exists and is successfully cancelled, otherwise returns false
448 ///
449 /// Now uses DeferredMap for safe task removal with generational checking
450 ///
451 /// 取消定时器任务
452 ///
453 /// # 参数
454 /// - `task_id`: 任务 ID
455 ///
456 /// # 返回值
457 /// 如果任务存在且成功取消则返回 true,否则返回 false
458 ///
459 /// 现在使用 DeferredMap 实现安全的任务移除,带代数检查
460 #[inline]
461 pub fn cancel(&mut self, task_id: TaskId) -> bool {
462 // Remove task location from DeferredMap using TaskId as key
463 // 使用 TaskId 作为 key 从 DeferredMap 中移除任务位置
464 let location = match self.task_index.remove(task_id.key()) {
465 Some(loc) => loc,
466 None => return false, // Task not found or already removed (generation mismatch)
467 };
468
469 // Use match to get slot reference, reduce branches
470 // 使用 match 获取槽引用,减少分支
471 let slot = match location.level {
472 0 => &mut self.l0.slots[location.slot_index],
473 _ => &mut self.l1.slots[location.slot_index],
474 };
475
476 // Boundary check and ID verification
477 // 边界检查和 ID 验证
478 if location.vec_index >= slot.len() || slot[location.vec_index].get_id() != task_id {
479 // Index inconsistent - this shouldn't happen with DeferredMap, but handle it anyway
480 // 索引不一致 - DeferredMap 不应该出现这种情况,但还是处理一下
481 return false;
482 }
483
484 // Use swap_remove to remove task, record swapped task ID
485 // 使用 swap_remove 移除任务,记录被交换的任务 ID
486 let removed_task = slot.swap_remove(location.vec_index);
487
488 // Ensure the correct task was removed
489 // 确保移除了正确的任务
490 debug_assert_eq!(removed_task.get_id(), task_id);
491
492 match removed_task.into_task_type() {
493 TaskTypeWithCompletionNotifier::OneShot {
494 completion_notifier,
495 } => {
496 // Use Notify + AtomicU8 for zero-allocation notification
497 // 使用 Notify + AtomicU8 实现零分配通知
498 completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
499 }
500 TaskTypeWithCompletionNotifier::Periodic {
501 completion_notifier,
502 ..
503 } => {
504 // Use flume for high-performance periodic notification
505 // 使用 flume 实现高性能周期通知
506 let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
507 }
508 }
509
510 // If a swap occurred (vec_index is not the last element)
511 // 如果发生了交换(vec_index 不是最后一个元素)
512 if location.vec_index < slot.len() {
513 let swapped_task_id = slot[location.vec_index].get_id();
514 // Update swapped element's index in one go, using DeferredMap's get_mut
515 // 一次性更新被交换元素的索引,使用 DeferredMap 的 get_mut
516 if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
517 swapped_location.vec_index = location.vec_index;
518 }
519 }
520
521 true
522 }
523
524 /// Batch cancel timer tasks
525 ///
526 /// # Parameters
527 /// - `task_ids`: List of task IDs to cancel
528 ///
529 /// # Returns
530 /// Number of successfully cancelled tasks
531 ///
532 /// # Performance Advantages
533 /// - Reduce repeated HashMap lookup overhead
534 /// - Multiple cancellation operations on the same slot can be batch processed
535 /// - Use unstable sort to improve performance
536 /// - Small batch optimization: skip sorting based on configuration threshold, process directly
537 ///
538 /// 批量取消定时器任务
539 ///
540 /// # 参数
541 /// - `task_ids`: 要取消的任务 ID 列表
542 ///
543 /// # 返回值
544 /// 成功取消的任务数量
545 ///
546 /// # 性能优势
547 /// - 减少重复的 HashMap 查找开销
548 /// - 同一槽位的多个取消操作可以批量处理
549 /// - 使用不稳定排序提高性能
550 /// - 小批量优化:根据配置阈值跳过排序,直接处理
551 #[inline]
552 pub fn cancel_batch(&mut self, task_ids: &[TaskId]) -> usize {
553 let mut cancelled_count = 0;
554
555 // Small batch optimization: cancel one by one to avoid grouping and sorting overhead
556 // 小批量优化:逐个取消以避免分组和排序开销
557 if task_ids.len() <= self.batch_config.small_batch_threshold {
558 for &task_id in task_ids {
559 if self.cancel(task_id) {
560 cancelled_count += 1;
561 }
562 }
563 return cancelled_count;
564 }
565
566 // Group by layer and slot to optimize batch cancellation
567 // Use SmallVec to avoid heap allocation in most cases
568 // 按层级和槽位分组以优化批量取消
569 // 使用 SmallVec 避免在大多数情况下进行堆分配
570 let l0_slot_count = self.l0.slot_count;
571 let l1_slot_count = self.l1.slot_count;
572
573 let mut l0_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l0_slot_count];
574 let mut l1_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l1_slot_count];
575
576 // Collect information of tasks to be cancelled
577 // 收集要取消的任务信息
578 for &task_id in task_ids {
579 // Use DeferredMap's get with TaskId key
580 // 使用 DeferredMap 的 get,传入 TaskId key
581 if let Some(location) = self.task_index.get(task_id.key()) {
582 if location.level == 0 {
583 l0_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
584 } else {
585 l1_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
586 }
587 }
588 }
589
590 // Process L0 layer cancellation
591 // 处理 L0 层取消
592 for (slot_index, tasks) in l0_tasks_by_slot.iter_mut().enumerate() {
593 if tasks.is_empty() {
594 continue;
595 }
596
597 // Sort by vec_index in descending order, delete from back to front to avoid index invalidation
598 // 按 vec_index 降序排序,从后向前删除以避免索引失效
599 tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
600
601 let slot = &mut self.l0.slots[slot_index];
602
603 for &(task_id, vec_index) in tasks.iter() {
604 if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
605 let removed_task = slot.swap_remove(vec_index);
606 match removed_task.into_task_type() {
607 TaskTypeWithCompletionNotifier::OneShot {
608 completion_notifier,
609 } => {
610 // Use Notify + AtomicU8 for zero-allocation notification
611 // 使用 Notify + AtomicU8 实现零分配通知
612 completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
613 }
614 TaskTypeWithCompletionNotifier::Periodic {
615 completion_notifier,
616 ..
617 } => {
618 // Use flume for high-performance periodic notification
619 // 使用 flume 实现高性能周期通知
620 let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
621 }
622 }
623
624 if vec_index < slot.len() {
625 let swapped_task_id = slot[vec_index].get_id();
626 // Use DeferredMap's get_mut with TaskId key
627 // 使用 DeferredMap 的 get_mut,传入 TaskId key
628 if let Some(swapped_location) =
629 self.task_index.get_mut(swapped_task_id.key())
630 {
631 swapped_location.vec_index = vec_index;
632 }
633 }
634
635 // Use DeferredMap's remove with TaskId key
636 // 使用 DeferredMap 的 remove,传入 TaskId key
637 self.task_index.remove(task_id.key());
638 cancelled_count += 1;
639 }
640 }
641 }
642
643 // Process L1 layer cancellation
644 // 处理 L1 层取消
645 for (slot_index, tasks) in l1_tasks_by_slot.iter_mut().enumerate() {
646 if tasks.is_empty() {
647 continue;
648 }
649
650 tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
651
652 let slot = &mut self.l1.slots[slot_index];
653
654 for &(task_id, vec_index) in tasks.iter() {
655 if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
656 let removed_task = slot.swap_remove(vec_index);
657
658 match removed_task.into_task_type() {
659 TaskTypeWithCompletionNotifier::OneShot {
660 completion_notifier,
661 } => {
662 // Use Notify + AtomicU8 for zero-allocation notification
663 // 使用 Notify + AtomicU8 实现零分配通知
664 completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
665 }
666 TaskTypeWithCompletionNotifier::Periodic {
667 completion_notifier,
668 ..
669 } => {
670 // Use flume for high-performance periodic notification
671 // 使用 flume 实现高性能周期通知
672 let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
673 }
674 }
675
676 if vec_index < slot.len() {
677 let swapped_task_id = slot[vec_index].get_id();
678 // Use DeferredMap's get_mut with TaskId key
679 // 使用 DeferredMap 的 get_mut,传入 TaskId key
680 if let Some(swapped_location) =
681 self.task_index.get_mut(swapped_task_id.key())
682 {
683 swapped_location.vec_index = vec_index;
684 }
685 }
686
687 // Use DeferredMap's remove with TaskId key
688 // 使用 DeferredMap 的 remove,传入 TaskId key
689 self.task_index.remove(task_id.key());
690 cancelled_count += 1;
691 }
692 }
693 }
694
695 cancelled_count
696 }
697
698 /// Reinsert periodic task with the same TaskId
699 ///
700 /// Used to automatically reschedule periodic tasks after they expire
701 ///
702 /// TaskId remains unchanged, only location is updated in DeferredMap
703 ///
704 /// 重新插入周期性任务(保持相同的 TaskId)
705 ///
706 /// 用于在周期性任务过期后自动重新调度
707 ///
708 /// TaskId 保持不变,只更新 DeferredMap 中的位置
709 fn reinsert_periodic_task(&mut self, task_id: TaskId, task: TimerTaskWithCompletionNotifier) {
710 // Determine which layer the interval should be inserted into
711 // This method is only called by periodic tasks, so the interval is guaranteed to be Some.
712 // 确定间隔应该插入到哪一层
713 // 该方法只能由周期性任务调用,所以间隔是 guaranteed 应当保证为 Some.
714 let (level, ticks, rounds) = self.determine_layer(task.get_interval().unwrap());
715
716 // Use match to reduce branches, and use cached slot mask
717 // 使用 match 减少分支,并使用缓存的槽掩码
718 let (current_tick, slot_mask, slots) = match level {
719 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
720 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
721 };
722
723 let total_ticks = current_tick + ticks;
724 let slot_index = (total_ticks as usize) & slot_mask;
725
726 // Get the index position of the task in Vec
727 // 获取任务在 Vec 中的索引位置
728 let vec_index = slots[slot_index].len();
729 let new_location = TaskLocation::new(level, slot_index, vec_index);
730
731 // Insert task into slot
732 // 将任务插入槽中
733 slots[slot_index].push(TimerTaskForWheel::new_with_id(
734 task_id,
735 task,
736 total_ticks,
737 rounds,
738 ));
739
740 // Update task location in DeferredMap (doesn't remove, just updates)
741 // 更新 DeferredMap 中的任务位置(不删除,仅更新)
742 if let Some(location) = self.task_index.get_mut(task_id.key()) {
743 *location = new_location;
744 }
745 }
746
    /// Advance the timing wheel by one L0 tick and return all tasks that expired.
    ///
    /// # Returns
    /// One `WheelAdvanceResult` (task id + callback) for every task found in
    /// the L0 slot the pointer moved onto, one-shot and periodic alike.
    ///
    /// # Implementation Details
    /// - The L0 pointer advances by 1 tick on every call; every task in the
    ///   new L0 slot is treated as expired (no rounds check on L0).
    /// - One-shot tasks are removed from the task index and notified with
    ///   `TaskCompletion::Called`.
    /// - Periodic tasks stay in the task index, are notified with
    ///   `TaskCompletion::Called`, and are reinserted for their next interval
    ///   after the slot has been emptied.
    /// - The L1 pointer advances whenever the L0 tick is a multiple of
    ///   `l1_tick_ratio`. Tasks in the new L1 slot with `rounds > 0` have their
    ///   rounds decremented and stay; tasks with `rounds == 0` are handed to
    ///   `demote_tasks`.
    pub fn advance(&mut self) -> Vec<WheelAdvanceResult> {
        // Advance L0 layer
        self.l0.current_tick += 1;

        let mut expired_tasks = Vec::new();

        // Slot the L0 pointer now rests on; everything in it is due.
        let l0_slot_index = (self.l0.current_tick as usize) & self.l0.slot_mask;

        // Periodic tasks are collected here and reinserted once the slot borrow ends.
        let mut periodic_tasks_to_reinsert = Vec::new();

        {
            let l0_slot = &mut self.l0.slots[l0_slot_index];

            // Drain the slot by repeatedly removing index 0. `swap_remove(0)`
            // moves the last element to the front, so the processing order is:
            // first element, then the remaining elements from the back.
            while !l0_slot.is_empty() {
                let task_with_notifier = l0_slot.swap_remove(0);
                let task_id = task_with_notifier.get_id();

                // Check the task kind before the task is taken apart below.
                let is_periodic = matches!(
                    task_with_notifier.task.task_type,
                    TaskTypeWithCompletionNotifier::Periodic { .. }
                );

                // One-shot tasks leave the index now; periodic tasks keep their
                // entry, which is overwritten when they are reinserted.
                if !is_periodic {
                    self.task_index.remove(task_id.key());
                }

                // Keep the index entry of the element that was moved to index 0 in sync.
                if !l0_slot.is_empty() {
                    let swapped_task_id = l0_slot[0].get_id();
                    if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
                        swapped_location.vec_index = 0;
                    }
                }

                let TimerTaskForWheel { task_id, task, .. } = task_with_notifier;

                match &task.task_type {
                    TaskTypeWithCompletionNotifier::Periodic {
                        completion_notifier,
                        ..
                    } => {
                        // A failed send is deliberately ignored.
                        let _ = completion_notifier.0.try_send(TaskCompletion::Called);

                        // The callback is cloned because the original is moved
                        // into the returned result below.
                        periodic_tasks_to_reinsert.push((
                            task_id,
                            TimerTaskWithCompletionNotifier {
                                task_type: task.task_type,
                                delay: task.delay,
                                callback: task.callback.clone(),
                            },
                        ));
                    }
                    TaskTypeWithCompletionNotifier::OneShot {
                        completion_notifier,
                    } => {
                        completion_notifier.notify(crate::task::TaskCompletion::Called);
                    }
                }

                expired_tasks.push(WheelAdvanceResult {
                    id: task_id,
                    callback: task.callback,
                });
            }
        }

        // Reinsert periodic tasks for their next interval
        for (task_id, task) in periodic_tasks_to_reinsert {
            self.reinsert_periodic_task(task_id, task);
        }

        // Advance the L1 layer once every `l1_tick_ratio` L0 ticks.
        if self.l0.current_tick.is_multiple_of(self.l1_tick_ratio) {
            self.l1.current_tick += 1;
            let l1_slot_index = (self.l1.current_tick as usize) & self.l1.slot_mask;
            let l1_slot = &mut self.l1.slots[l1_slot_index];

            // Walk the slot in place: tasks that stay advance `i`, tasks that
            // leave are swap-removed so `i` is re-examined on the next iteration.
            let mut tasks_to_demote = Vec::new();
            let mut i = 0;
            while i < l1_slot.len() {
                let task = &mut l1_slot[i];

                if task.rounds > 0 {
                    // Still has rounds: decrement and keep the task in L1.
                    task.rounds -= 1;
                    if let Some(location) = self.task_index.get_mut(task.get_id().key()) {
                        location.vec_index = i;
                    }
                    i += 1;
                } else {
                    // rounds == 0: move the task down to L0. Its index entry is
                    // kept and overwritten in `demote_tasks`.
                    let task_to_demote = l1_slot.swap_remove(i);

                    // Keep the index entry of the element moved into position `i` in sync.
                    if i < l1_slot.len() {
                        let swapped_task_id = l1_slot[i].get_id();
                        if let Some(swapped_location) =
                            self.task_index.get_mut(swapped_task_id.key())
                        {
                            swapped_location.vec_index = i;
                        }
                    }

                    tasks_to_demote.push(task_to_demote);
                }
            }

            // Demote tasks to L0
            self.demote_tasks(tasks_to_demote);
        }

        expired_tasks
    }
915
916 /// Demote tasks from L1 to L0
917 ///
918 /// Recalculate and insert L1 expired tasks into L0 layer
919 ///
920 /// Updates task location in DeferredMap without removing/reinserting
921 ///
922 /// 将任务从 L1 降级到 L0
923 ///
924 /// 重新计算并将 L1 过期任务插入到 L0 层
925 ///
926 /// 更新 DeferredMap 中的任务位置而不删除/重新插入
927 fn demote_tasks(&mut self, tasks: Vec<TimerTaskForWheel>) {
928 for task in tasks {
929 // Calculate the remaining delay of the task in L0 layer
930 // The task's deadline_tick is based on L1 tick, needs to be converted to L0 tick
931 // 计算任务在 L0 层的剩余延迟
932 // 任务的 deadline_tick 基于 L1 tick,需要转换为 L0 tick
933 let l1_tick_ratio = self.l1_tick_ratio;
934
935 // Calculate the original expiration time of the task (L1 tick)
936 // 计算任务的原始过期时间(L1 tick)
937 let l1_deadline = task.deadline_tick;
938
939 // Convert to L0 tick expiration time
940 // 转换为 L0 tick 过期时间
941 let l0_deadline_tick = l1_deadline * l1_tick_ratio;
942 let l0_current_tick = self.l0.current_tick;
943
944 // Calculate remaining L0 ticks
945 // 计算剩余 L0 ticks
946 let remaining_l0_ticks = if l0_deadline_tick > l0_current_tick {
947 l0_deadline_tick - l0_current_tick
948 } else {
949 1 // At least trigger in next tick (至少在下一个 tick 触发)
950 };
951
952 // Calculate L0 slot index
953 // 计算 L0 槽索引
954 let target_l0_tick = l0_current_tick + remaining_l0_ticks;
955 let l0_slot_index = (target_l0_tick as usize) & self.l0.slot_mask;
956
957 let task_id = task.get_id();
958 let vec_index = self.l0.slots[l0_slot_index].len();
959 let new_location = TaskLocation::new(0, l0_slot_index, vec_index);
960
961 // Insert into L0 layer
962 // 插入到 L0 层
963 self.l0.slots[l0_slot_index].push(task);
964
965 // Update task location in DeferredMap (task already exists in index)
966 // 更新 DeferredMap 中的任务位置(任务已在索引中)
967 if let Some(location) = self.task_index.get_mut(task_id.key()) {
968 *location = new_location;
969 }
970 }
971 }
972
973 /// Check if the timing wheel is empty
974 ///
975 /// 检查时间轮是否为空
976 #[allow(dead_code)]
977 pub fn is_empty(&self) -> bool {
978 self.task_index.is_empty()
979 }
980
981 /// Postpone timer task (keep original TaskId)
982 ///
983 /// # Parameters
984 /// - `task_id`: Task ID to postpone
985 /// - `new_delay`: New delay time (recalculated from current tick, not continuing from original delay time)
986 /// - `new_callback`: New callback function (if None, keep original callback)
987 ///
988 /// # Returns
989 /// Returns true if the task exists and is successfully postponed, otherwise returns false
990 ///
991 /// # Implementation Details
992 /// - Remove task from original layer/slot, keep its completion_notifier (will not trigger cancellation notification)
993 /// - Update delay time and callback function (if provided)
994 /// - Recalculate target layer, slot, and rounds based on new_delay
995 /// - Cross-layer migration may occur (L0 <-> L1)
996 /// - Re-insert to new position using original TaskId
997 /// - Keep consistent with external held TaskId reference
998 ///
999 /// 延期定时器任务(保留原始 TaskId)
1000 ///
1001 /// # 参数
1002 /// - `task_id`: 要延期的任务 ID
1003 /// - `new_delay`: 新延迟时间(从当前 tick 重新计算,而非从原延迟时间继续)
1004 /// - `new_callback`: 新回调函数(如果为 None,则保留原回调)
1005 ///
1006 /// # 返回值
1007 /// 如果任务存在且成功延期则返回 true,否则返回 false
1008 ///
1009 /// # 实现细节
1010 /// - 从原层级/槽位移除任务,保留其 completion_notifier(不会触发取消通知)
1011 /// - 更新延迟时间和回调函数(如果提供)
1012 /// - 根据 new_delay 重新计算目标层级、槽位和轮数
1013 /// - 可能发生跨层迁移(L0 <-> L1)
1014 /// - 使用原始 TaskId 重新插入到新位置
1015 /// - 与外部持有的 TaskId 引用保持一致
1016 #[inline]
1017 pub fn postpone(
1018 &mut self,
1019 task_id: TaskId,
1020 new_delay: Duration,
1021 new_callback: Option<crate::task::CallbackWrapper>,
1022 ) -> bool {
1023 // Step 1: Get task location from DeferredMap (don't remove)
1024 // 步骤 1: 从 DeferredMap 获取任务位置(不删除)
1025 let old_location = match self.task_index.get(task_id.key()) {
1026 Some(loc) => *loc,
1027 None => return false,
1028 };
1029
1030 // Use match to get slot reference
1031 // 使用 match 获取槽引用
1032 let slot = match old_location.level {
1033 0 => &mut self.l0.slots[old_location.slot_index],
1034 _ => &mut self.l1.slots[old_location.slot_index],
1035 };
1036
1037 // Verify task is still at expected position
1038 // 验证任务仍在预期位置
1039 if old_location.vec_index >= slot.len() || slot[old_location.vec_index].get_id() != task_id
1040 {
1041 // Index inconsistent, return failure
1042 // 索引不一致,返回失败
1043 return false;
1044 }
1045
1046 // Use swap_remove to remove task from slot
1047 // 使用 swap_remove 从槽中移除任务
1048 let mut task = slot.swap_remove(old_location.vec_index);
1049
1050 // Update swapped element's index (if a swap occurred)
1051 // 更新被交换元素的索引(如果发生了交换)
1052 if old_location.vec_index < slot.len() {
1053 let swapped_task_id = slot[old_location.vec_index].get_id();
1054 // Use DeferredMap's get_mut with TaskId key
1055 // 使用 DeferredMap 的 get_mut,传入 TaskId key
1056 if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
1057 swapped_location.vec_index = old_location.vec_index;
1058 }
1059 }
1060
1061 // Step 2: Update task's delay and callback
1062 // 步骤 2: 更新任务的延迟和回调
1063 task.update_delay(new_delay);
1064 if let Some(callback) = new_callback {
1065 task.update_callback(callback);
1066 }
1067
1068 // Step 3: Recalculate layer, slot, and rounds based on new delay
1069 // 步骤 3: 根据新延迟重新计算层级、槽位和轮数
1070 let (new_level, ticks, new_rounds) = self.determine_layer(new_delay);
1071
1072 // Use match to reduce branches, and use cached slot mask
1073 // 使用 match 减少分支,并使用缓存的槽掩码
1074 let (current_tick, slot_mask, slots) = match new_level {
1075 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
1076 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
1077 };
1078
1079 let total_ticks = current_tick + ticks;
1080 let new_slot_index = (total_ticks as usize) & slot_mask;
1081
1082 // Update task's timing wheel parameters
1083 // 更新任务的时间轮参数
1084 task.deadline_tick = total_ticks;
1085 task.rounds = new_rounds;
1086
1087 // Step 4: Re-insert task to new layer/slot
1088 // 步骤 4: 将任务重新插入到新层级/槽位
1089 let new_vec_index = slots[new_slot_index].len();
1090 let new_location = TaskLocation::new(new_level, new_slot_index, new_vec_index);
1091
1092 slots[new_slot_index].push(task);
1093
1094 // Update task location in DeferredMap (task already exists in index)
1095 // 更新 DeferredMap 中的任务位置(任务已在索引中)
1096 if let Some(location) = self.task_index.get_mut(task_id.key()) {
1097 *location = new_location;
1098 }
1099
1100 true
1101 }
1102
1103 /// Batch postpone timer tasks
1104 ///
1105 /// # Parameters
1106 /// - `updates`: List of tuples of (task ID, new delay)
1107 ///
1108 /// # Returns
1109 /// Number of successfully postponed tasks
1110 ///
1111 /// # Performance Advantages
1112 /// - Batch processing reduces function call overhead
1113 /// - All delays are recalculated from current_tick at call time
1114 ///
1115 /// # Notes
1116 /// - If a task ID does not exist, that task will be skipped without affecting other tasks' postponement
1117 ///
1118 /// 批量延期定时器任务
1119 ///
1120 /// # 参数
1121 /// - `updates`: (任务 ID, 新延迟) 元组列表
1122 ///
1123 /// # 返回值
1124 /// 成功延期的任务数量
1125 ///
1126 /// # 性能优势
1127 /// - 批处理减少函数调用开销
1128 /// - 所有延迟在调用时从 current_tick 重新计算
1129 ///
1130 /// # 注意
1131 /// - 如果任务 ID 不存在,该任务将被跳过,不影响其他任务的延期
1132 #[inline]
1133 pub fn postpone_batch(&mut self, updates: Vec<(TaskId, Duration)>) -> usize {
1134 let mut postponed_count = 0;
1135
1136 for (task_id, new_delay) in updates {
1137 if self.postpone(task_id, new_delay, None) {
1138 postponed_count += 1;
1139 }
1140 }
1141
1142 postponed_count
1143 }
1144
1145 /// Batch postpone timer tasks (replace callbacks)
1146 ///
1147 /// # Parameters
1148 /// - `updates`: List of tuples of (task ID, new delay, new callback)
1149 ///
1150 /// # Returns
1151 /// Number of successfully postponed tasks
1152 ///
1153 /// 批量延期定时器任务(替换回调)
1154 ///
1155 /// # 参数
1156 /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
1157 ///
1158 /// # 返回值
1159 /// 成功延期的任务数量
1160 pub fn postpone_batch_with_callbacks(
1161 &mut self,
1162 updates: Vec<(TaskId, Duration, Option<crate::task::CallbackWrapper>)>,
1163 ) -> usize {
1164 let mut postponed_count = 0;
1165
1166 for (task_id, new_delay, new_callback) in updates {
1167 if self.postpone(task_id, new_delay, new_callback) {
1168 postponed_count += 1;
1169 }
1170 }
1171
1172 postponed_count
1173 }
1174}
1175
#[cfg(test)]
mod tests {
    use super::*;
    use crate::task::{CallbackWrapper, TimerTask, TimerTaskWithCompletionNotifier};

    /// A wheel built from the default configs has 512 slots, sits at tick 0, and is empty.
    #[test]
    fn test_wheel_creation() {
        let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

        assert_eq!(wheel.slot_count(), 512);
        assert_eq!(wheel.current_tick(), 0);
        assert!(wheel.is_empty());
    }

    /// The default hierarchical wheel exposes both layers with the expected dimensions.
    #[test]
    fn test_hierarchical_wheel_creation() {
        let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

        // L0 layer: slot count and starting tick.
        assert_eq!(wheel.slot_count(), 512);
        assert_eq!(wheel.current_tick(), 0);
        assert!(wheel.is_empty());

        // The L1 layer always exists in hierarchical mode.
        assert_eq!(wheel.l1.slot_count, 64);
        // 1000ms / 10ms = 100 L0 ticks per L1 tick.
        assert_eq!(wheel.l1_tick_ratio, 100);
    }

    /// The config builder rejects an L1 tick that is not an integer multiple of the L0 tick.
    #[test]
    fn test_hierarchical_config_validation() {
        // 15ms is not an integer multiple of 10ms, so building must fail.
        let misaligned = WheelConfig::builder()
            .l0_tick_duration(Duration::from_millis(10))
            .l0_slot_count(512)
            .l1_tick_duration(Duration::from_millis(15))
            .l1_slot_count(64)
            .build();
        assert!(misaligned.is_err());

        // 1000ms / 10ms = 100 ticks: a valid configuration.
        let aligned = WheelConfig::builder()
            .l0_tick_duration(Duration::from_millis(10))
            .l0_slot_count(512)
            .l1_tick_duration(Duration::from_secs(1))
            .l1_slot_count(64)
            .build();
        assert!(aligned.is_ok());
    }

    /// `determine_layer` picks L0 for short delays and L1 (with rounds when needed) for long ones.
    #[test]
    fn test_layer_determination() {
        let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

        // L0 covers 512 slots * 10ms = 5120ms, so 100ms stays in L0.
        let (short_level, _, short_rounds) = wheel.determine_layer(Duration::from_millis(100));
        assert_eq!(short_level, 0);
        assert_eq!(short_rounds, 0);

        // 10s exceeds the L0 range (> 5120ms) and therefore goes to L1.
        let (long_level, _, long_rounds) = wheel.determine_layer(Duration::from_secs(10));
        assert_eq!(long_level, 1);
        assert_eq!(long_rounds, 0);

        // L1 covers 64 slots * 1000ms = 64000ms, so 120s needs extra rounds in L1.
        let (huge_level, _, huge_rounds) = wheel.determine_layer(Duration::from_secs(120));
        assert_eq!(huge_level, 1);
        assert!(huge_rounds > 0);
    }

    /// A 100ms task is placed in L0 and expires within 10 ticks.
    #[test]
    fn test_hierarchical_insert_and_advance() {
        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

        // Insert a short-delay task; it should land in L0.
        let noop = CallbackWrapper::new(|| async {});
        let oneshot = TimerTask::new_oneshot(Duration::from_millis(100), Some(noop));
        let (wrapped, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(oneshot);
        let handle = wheel.allocate_handle();
        let task_id = handle.task_id();
        wheel.insert(handle, wrapped);

        let location = wheel.task_index.get(task_id.key()).unwrap();
        assert_eq!(location.level, 0);

        // Advance up to 10 ticks (100ms) and stop at the first tick that yields tasks.
        let mut fired = false;
        for _ in 0..10 {
            let expired = wheel.advance();
            if expired.is_empty() {
                continue;
            }
            assert_eq!(expired.len(), 1);
            assert_eq!(expired[0].id, task_id);
            fired = true;
            break;
        }
        assert!(fired, "Task should have expired");
    }

    /// Tasks can be cancelled regardless of which layer they live in.
    #[test]
    fn test_cross_layer_cancel() {
        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

        // 100ms task: expected in L0.
        let near_callback = CallbackWrapper::new(|| async {});
        let near_task = TimerTask::new_oneshot(Duration::from_millis(100), Some(near_callback));
        let (near_wrapped, _rx1) = TimerTaskWithCompletionNotifier::from_timer_task(near_task);
        let near_handle = wheel.allocate_handle();
        let near_id = near_handle.task_id();
        wheel.insert(near_handle, near_wrapped);

        // 10s task: expected in L1.
        let far_callback = CallbackWrapper::new(|| async {});
        let far_task = TimerTask::new_oneshot(Duration::from_secs(10), Some(far_callback));
        let (far_wrapped, _rx2) = TimerTaskWithCompletionNotifier::from_timer_task(far_task);
        let far_handle = wheel.allocate_handle();
        let far_id = far_handle.task_id();
        wheel.insert(far_handle, far_wrapped);

        // Confirm each task landed in the expected layer.
        assert_eq!(wheel.task_index.get(near_id.key()).unwrap().level, 0);
        assert_eq!(wheel.task_index.get(far_id.key()).unwrap().level, 1);

        // Cancelling the L0 task removes it from the index.
        assert!(wheel.cancel(near_id));
        assert!(wheel.task_index.get(near_id.key()).is_none());

        // Cancelling the L1 task removes it from the index as well.
        assert!(wheel.cancel(far_id));
        assert!(wheel.task_index.get(far_id.key()).is_none());

        // With both tasks gone the wheel must be empty.
        assert!(wheel.is_empty());
    }

    /// Delays convert to tick counts using the default 10ms tick.
    #[test]
    fn test_delay_to_ticks() {
        let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

        // (delay in ms, expected ticks); the 1ms case checks the 1-tick minimum.
        for &(millis, expected) in &[(100, 10), (50, 5), (1, 1)] {
            assert_eq!(wheel.delay_to_ticks(Duration::from_millis(millis)), expected);
        }
    }

    /// A slot count that is not a power of two is rejected with `InvalidSlotCount`.
    #[test]
    fn test_wheel_invalid_slot_count() {
        let result = WheelConfig::builder().l0_slot_count(100).build();
        assert!(result.is_err());

        match result {
            Err(crate::error::TimerError::InvalidSlotCount { slot_count, reason }) => {
                assert_eq!(slot_count, 100);
                assert_eq!(reason, "L0 layer slot count must be power of 2");
            }
            _ => panic!("Expected InvalidSlotCount error"),
        }
    }

    /// A delay shorter than one tick is rounded up and fires on the first tick.
    #[test]
    fn test_minimum_delay() {
        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

        // 1ms is below the tick duration, so it should be scheduled one tick out.
        let noop = CallbackWrapper::new(|| async {});
        let oneshot = TimerTask::new_oneshot(Duration::from_millis(1), Some(noop));
        let (wrapped, _completion_rx) = TimerTaskWithCompletionNotifier::from_timer_task(oneshot);
        let handle = wheel.allocate_handle();
        let task_id: TaskId = handle.task_id();
        wheel.insert(handle, wrapped);

        // A single advance must deliver the task.
        let expired = wheel.advance();
        assert_eq!(
            expired.len(),
            1,
            "Minimum delay task should be triggered after 1 tick"
        );
        assert_eq!(expired[0].id, task_id);
    }

    /// Advancing an empty wheel yields nothing but still moves the tick counter.
    #[test]
    fn test_advance_empty_slots() {
        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

        // No tasks inserted: every one of the 100 ticks must come back empty.
        for _ in 0..100 {
            let expired = wheel.advance();
            assert!(
                expired.is_empty(),
                "Empty slots should not return any tasks"
            );
        }

        assert_eq!(
            wheel.current_tick(),
            100,
            "current_tick should correctly increment"
        );
    }

    /// Tasks at the first and last L0 slots fire on exactly their own ticks.
    #[test]
    fn test_slot_boundary() {
        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

        // First task: 10ms (1 tick), expected to fire from slot 1.
        let first_callback = CallbackWrapper::new(|| async {});
        let first_task = TimerTask::new_oneshot(Duration::from_millis(10), Some(first_callback));
        let (first_wrapped, _rx1) = TimerTaskWithCompletionNotifier::from_timer_task(first_task);
        let first_handle = wheel.allocate_handle();
        let first_id = first_handle.task_id();
        wheel.insert(first_handle, first_wrapped);

        // Second task: 5110ms (511 ticks), expected to fire from slot 511.
        let last_callback = CallbackWrapper::new(|| async {});
        let last_task = TimerTask::new_oneshot(Duration::from_millis(5110), Some(last_callback));
        let (last_wrapped, _rx2) = TimerTaskWithCompletionNotifier::from_timer_task(last_task);
        let last_handle = wheel.allocate_handle();
        let last_id = last_handle.task_id();
        wheel.insert(last_handle, last_wrapped);

        // Tick 1 delivers only the first task.
        let expired = wheel.advance();
        assert_eq!(expired.len(), 1, "First task should trigger on tick 1");
        assert_eq!(expired[0].id, first_id);

        // Keep advancing from tick 1 towards tick 511; the second task must show up on the way.
        let mut triggered = false;
        for step in 0..510 {
            let expired = wheel.advance();
            if expired.is_empty() {
                continue;
            }
            assert_eq!(
                expired.len(),
                1,
                "The {}th advance should trigger the second task",
                step + 2
            );
            assert_eq!(expired[0].id, last_id);
            triggered = true;
            break;
        }
        assert!(triggered, "Second task should trigger on tick 511");

        assert!(wheel.is_empty(), "All tasks should have been triggered");
    }

    /// Every inserted task receives a distinct `TaskId`.
    #[test]
    fn test_task_id_uniqueness() {
        let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

        // Insert 100 tasks and record each ID; `insert` returns false on a duplicate.
        let mut seen_ids = std::collections::HashSet::new();
        for _ in 0..100 {
            let noop = CallbackWrapper::new(|| async {});
            let oneshot = TimerTask::new_oneshot(Duration::from_millis(100), Some(noop));
            let (wrapped, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(oneshot);
            let handle = wheel.allocate_handle();
            let task_id = handle.task_id();
            wheel.insert(handle, wrapped);

            assert!(seen_ids.insert(task_id), "TaskId should be unique");
        }

        assert_eq!(seen_ids.len(), 100);
    }
}