kestrel_timer/wheel.rs
1use crate::CallbackWrapper;
2use crate::config::{BatchConfig, WheelConfig};
3use crate::task::{TaskCompletion, TaskHandle, TaskId, TaskLocation, TaskTypeWithCompletionNotifier, TimerTaskForWheel, TimerTaskWithCompletionNotifier};
4use deferred_map::DeferredMap;
5use std::time::Duration;
6
7pub struct WheelAdvanceResult {
8 pub id: TaskId,
9 pub callback: Option<CallbackWrapper>
10}
11
12/// Timing wheel single layer data structure
13///
14/// 时间轮单层数据结构
15struct WheelLayer {
16 /// Slot array, each slot stores a group of timer tasks
17 ///
18 /// 槽数组,每个槽存储一组定时器任务
19 slots: Vec<Vec<TimerTaskForWheel>>,
20
21 /// Current time pointer (tick index)
22 ///
23 /// 当前时间指针(tick 索引)
24 current_tick: u64,
25
26 /// Slot count
27 ///
28 /// 槽数量
29 slot_count: usize,
30
31 /// Duration of each tick
32 ///
33 /// 每个 tick 的持续时间
34 tick_duration: Duration,
35
36 /// Cache: tick duration in milliseconds (u64) - avoid repeated conversion
37 ///
38 /// 缓存:tick 持续时间(毫秒,u64)- 避免重复转换
39 tick_duration_ms: u64,
40
41 /// Cache: slot mask (slot_count - 1) - for fast modulo operation
42 ///
43 /// 缓存:槽掩码(slot_count - 1)- 用于快速取模运算
44 slot_mask: usize,
45}
46
47impl WheelLayer {
48 /// Create a new wheel layer
49 ///
50 /// 创建一个新的时间轮层
51 fn new(slot_count: usize, tick_duration: Duration) -> Self {
52 let mut slots = Vec::with_capacity(slot_count);
53 // Pre-allocate capacity for each slot to reduce subsequent reallocation during push
54 // Most slots typically contain 0-4 tasks, pre-allocate capacity of 4
55 //
56 // 为每个槽预分配容量以减少后续 push 时的重新分配
57 // 大多数槽通常包含 0-4 个任务,预分配容量为 4
58 for _ in 0..slot_count {
59 slots.push(Vec::with_capacity(4));
60 }
61
62 let tick_duration_ms = tick_duration.as_millis() as u64;
63 let slot_mask = slot_count - 1;
64
65 Self {
66 slots,
67 current_tick: 0,
68 slot_count,
69 tick_duration,
70 tick_duration_ms,
71 slot_mask,
72 }
73 }
74
75 /// Calculate the number of ticks corresponding to the delay
76 ///
77 /// 计算延迟对应的 tick 数量
78 fn delay_to_ticks(&self, delay: Duration) -> u64 {
79 let ticks = delay.as_millis() as u64 / self.tick_duration.as_millis() as u64;
80 ticks.max(1) // at least 1 tick (至少 1 个 tick)
81 }
82}
83
84/// Timing wheel data structure (hierarchical mode)
85///
86/// Now uses DeferredMap for O(1) task lookup and generational safety.
87///
88/// 时间轮数据结构(分层模式)
89///
90/// 现在使用 DeferredMap 实现 O(1) 任务查找和代数安全
91pub struct Wheel {
92 /// L0 layer (bottom layer)
93 ///
94 /// L0 层(底层)
95 l0: WheelLayer,
96
97 /// L1 layer (top layer)
98 ///
99 /// L1 层(顶层)
100 l1: WheelLayer,
101
102 /// L1 tick ratio relative to L0 tick
103 ///
104 /// L1 tick 相对于 L0 tick 的比率
105 pub(crate) l1_tick_ratio: u64,
106
107 /// Task index for fast lookup and cancellation using DeferredMap
108 ///
109 /// Keys are TaskIds (u64 from DeferredMap), values are TaskLocations
110 ///
111 /// 任务索引,使用 DeferredMap 实现快速查找和取消
112 ///
113 /// 键是 TaskId(来自 DeferredMap 的 u64),值是 TaskLocation
114 pub(crate) task_index: DeferredMap<TaskLocation>,
115
116 /// Batch processing configuration
117 ///
118 /// 批处理配置
119 batch_config: BatchConfig,
120
121 /// Cache: L0 layer capacity in milliseconds - avoid repeated calculation
122 ///
123 /// 缓存:L0 层容量(毫秒)- 避免重复计算
124 l0_capacity_ms: u64,
125
126 /// Cache: L1 layer capacity in ticks - avoid repeated calculation
127 ///
128 /// 缓存:L1 层容量(tick 数)- 避免重复计算
129 l1_capacity_ticks: u64,
130}
131
132impl Wheel {
133 /// Create new timing wheel
134 ///
135 /// # Parameters
136 /// - `config`: Timing wheel configuration (already validated)
137 /// - `batch_config`: Batch processing configuration
138 ///
139 /// # Notes
140 /// Configuration parameters have been validated in WheelConfig::builder().build(), so this method will not fail.
141 /// Now uses DeferredMap for task indexing with generational safety.
142 ///
143 /// 创建新的时间轮
144 ///
145 /// # 参数
146 /// - `config`: 时间轮配置(已验证)
147 /// - `batch_config`: 批处理配置
148 ///
149 /// # 注意
150 /// 配置参数已在 WheelConfig::builder().build() 中验证,因此此方法不会失败。
151 /// 现在使用 DeferredMap 进行任务索引,具有代数安全特性。
152 pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
153 let l0 = WheelLayer::new(config.l0_slot_count, config.l0_tick_duration);
154 let l1 = WheelLayer::new(config.l1_slot_count, config.l1_tick_duration);
155
156 // Calculate L1 tick ratio relative to L0 tick
157 // 计算 L1 tick 相对于 L0 tick 的比率
158 let l1_tick_ratio = l1.tick_duration_ms / l0.tick_duration_ms;
159
160 // Pre-calculate capacity to avoid repeated calculation in insert
161 // 预计算容量,避免在 insert 中重复计算
162 let l0_capacity_ms = (l0.slot_count as u64) * l0.tick_duration_ms;
163 let l1_capacity_ticks = l1.slot_count as u64;
164
165 // Estimate initial capacity for DeferredMap based on L0 slot count
166 // 根据 L0 槽数量估算 DeferredMap 的初始容量
167 let estimated_capacity = (l0.slot_count / 4).max(64);
168
169 Self {
170 l0,
171 l1,
172 l1_tick_ratio,
173 task_index: DeferredMap::with_capacity(estimated_capacity),
174 batch_config,
175 l0_capacity_ms,
176 l1_capacity_ticks,
177 }
178 }
179
180 /// Get current tick (L0 layer tick)
181 ///
182 /// 获取当前 tick(L0 层 tick)
183 #[allow(dead_code)]
184 pub fn current_tick(&self) -> u64 {
185 self.l0.current_tick
186 }
187
188 /// Get tick duration (L0 layer tick duration)
189 ///
190 /// 获取 tick 持续时间(L0 层 tick 持续时间)
191 #[allow(dead_code)]
192 pub fn tick_duration(&self) -> Duration {
193 self.l0.tick_duration
194 }
195
196 /// Get slot count (L0 layer slot count)
197 ///
198 /// 获取槽数量(L0 层槽数量)
199 #[allow(dead_code)]
200 pub fn slot_count(&self) -> usize {
201 self.l0.slot_count
202 }
203
204 /// Calculate the number of ticks corresponding to the delay (based on L0 layer)
205 ///
206 /// 计算延迟对应的 tick 数量(基于 L0 层)
207 #[allow(dead_code)]
208 pub fn delay_to_ticks(&self, delay: Duration) -> u64 {
209 self.l0.delay_to_ticks(delay)
210 }
211
212 /// Determine which layer the delay should be inserted into
213 ///
214 /// # Returns
215 /// Returns: (layer, ticks, rounds)
216 /// - Layer: 0 = L0, 1 = L1
217 /// - Ticks: number of ticks calculated from current tick
218 /// - Rounds: number of rounds (only used for very long delays in L1 layer)
219 ///
220 /// 确定延迟应该插入到哪一层
221 ///
222 /// # 返回值
223 /// 返回:(层级, ticks, 轮数)
224 /// - 层级:0 = L0, 1 = L1
225 /// - Ticks:从当前 tick 计算的 tick 数量
226 /// - 轮数:轮数(仅用于 L1 层的超长延迟)
227 #[inline(always)]
228 fn determine_layer(&self, delay: Duration) -> (u8, u64, u32) {
229 let delay_ms = delay.as_millis() as u64;
230
231 // Fast path: most tasks are within L0 range (using cached capacity)
232 // 快速路径:大多数任务在 L0 范围内(使用缓存的容量)
233 if delay_ms < self.l0_capacity_ms {
234 let l0_ticks = (delay_ms / self.l0.tick_duration_ms).max(1);
235 return (0, l0_ticks, 0);
236 }
237
238 // Slow path: L1 layer tasks (using cached values)
239 // 慢速路径:L1 层任务(使用缓存的值)
240 let l1_ticks = (delay_ms / self.l1.tick_duration_ms).max(1);
241
242 if l1_ticks < self.l1_capacity_ticks {
243 (1, l1_ticks, 0)
244 } else {
245 let rounds = (l1_ticks / self.l1_capacity_ticks) as u32;
246 (1, l1_ticks, rounds)
247 }
248 }
249
250 /// Allocate a handle from DeferredMap
251 ///
252 /// 从 DeferredMap 分配一个 handle
253 ///
254 /// # Returns
255 /// A unique handle for later insertion
256 ///
257 /// # 返回值
258 /// 用于后续插入的唯一 handle
259 pub fn allocate_handle(&mut self) -> TaskHandle {
260 TaskHandle::new(self.task_index.allocate_handle())
261 }
262
263 /// Batch allocate handles from DeferredMap
264 ///
265 /// 从 DeferredMap 批量分配 handles
266 ///
267 /// # Parameters
268 /// - `count`: Number of handles to allocate
269 ///
270 /// # Returns
271 /// Vector of unique handles for later batch insertion
272 ///
273 /// # 参数
274 /// - `count`: 要分配的 handle 数量
275 ///
276 /// # 返回值
277 /// 用于后续批量插入的唯一 handles 向量
278 pub fn allocate_handles(&mut self, count: usize) -> Vec<TaskHandle> {
279 let mut handles = Vec::with_capacity(count);
280 for _ in 0..count {
281 handles.push(TaskHandle::new(self.task_index.allocate_handle()));
282 }
283 handles
284 }
285
286 /// Insert timer task
287 ///
288 /// # Parameters
289 /// - `handle`: Handle for the task
290 /// - `task`: Timer task with completion notifier
291 ///
292 /// # Returns
293 /// Unique identifier of the task (TaskId) - now generated from DeferredMap
294 ///
295 /// # Implementation Details
296 /// - Allocates Handle from DeferredMap to generate TaskId with generational safety
297 /// - Automatically calculate the layer and slot where the task should be inserted
298 /// - Hierarchical mode: short delay tasks are inserted into L0, long delay tasks are inserted into L1
299 /// - Use bit operations to optimize slot index calculation
300 /// - Use DeferredMap for O(1) lookup and cancellation with generation checking
301 ///
302 /// 插入定时器任务
303 ///
304 /// # 参数
305 /// - `handle`: 任务的 handle
306 /// - `task`: 带完成通知器的定时器任务
307 ///
308 /// # 返回值
309 /// 任务的唯一标识符(TaskId)- 现在从 DeferredMap 生成
310 ///
311 /// # 实现细节
312 /// - 自动计算任务应该插入的层级和槽位
313 /// - 分层模式:短延迟任务插入 L0,长延迟任务插入 L1
314 /// - 使用位运算优化槽索引计算
315 /// - 使用 DeferredMap 实现 O(1) 查找和取消,带代数检查
316 #[inline]
317 pub fn insert(&mut self, handle: TaskHandle, task: TimerTaskWithCompletionNotifier) {
318 let task_id = handle.task_id();
319
320 let (level, ticks, rounds) = self.determine_layer(task.delay);
321
322 // Use match to reduce branches, and use cached slot mask
323 // 使用 match 减少分支,并使用缓存的槽掩码
324 let (current_tick, slot_mask, slots) = match level {
325 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
326 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
327 };
328
329 let total_ticks = current_tick + ticks;
330 let slot_index = (total_ticks as usize) & slot_mask;
331
332 // Create task with the assigned TaskId
333 // 使用分配的 TaskId 创建任务
334 let task = TimerTaskForWheel::new_with_id(task_id, task, total_ticks, rounds);
335
336 // Get the index position of the task in Vec (the length before insertion is the index of the new task)
337 // 获取任务在 Vec 中的索引位置(插入前的长度就是新任务的索引)
338 let vec_index = slots[slot_index].len();
339 let location = TaskLocation::new(level, slot_index, vec_index);
340
341 // Insert task into slot
342 // 将任务插入槽中
343 slots[slot_index].push(task);
344
345 // Insert task location into DeferredMap using handle
346 // In non-debug mode, it is guaranteed that this method will not panic.
347 // 使用 handle 将任务位置插入 DeferredMap
348 // 由于 deferred_map::DeferredMap 是我实现的,底层保证非debug模式下不会panic
349 self.task_index.insert(handle.into_handle(), location)
350 .expect("Handle should be valid for insertion");
351 }
352
353 /// Batch insert timer tasks
354 ///
355 /// # Parameters
356 /// - `handles`: List of pre-allocated handles for tasks
357 /// - `tasks`: List of timer tasks with completion notifiers
358 ///
359 /// # Returns
360 /// - `Ok(())` if all tasks are successfully inserted
361 /// - `Err(TimerError::BatchLengthMismatch)` if handles and tasks lengths don't match
362 ///
363 /// # Performance Advantages
364 /// - Reduce repeated boundary checks and capacity adjustments
365 /// - For tasks with the same delay, calculation results can be reused
366 /// - Uses DeferredMap for efficient task indexing with generational safety
367 ///
368 /// 批量插入定时器任务
369 ///
370 /// # 参数
371 /// - `handles`: 任务的预分配 handles 列表
372 /// - `tasks`: 带完成通知器的定时器任务列表
373 ///
374 /// # 返回值
375 /// - `Ok(())` 如果所有任务成功插入
376 /// - `Err(TimerError::BatchLengthMismatch)` 如果 handles 和 tasks 长度不匹配
377 ///
378 /// # 性能优势
379 /// - 减少重复的边界检查和容量调整
380 /// - 对于相同延迟的任务,可以重用计算结果
381 /// - 使用 DeferredMap 实现高效的任务索引,具有代数安全特性
382 #[inline]
383 pub fn insert_batch(
384 &mut self,
385 handles: Vec<TaskHandle>,
386 tasks: Vec<TimerTaskWithCompletionNotifier>
387 ) -> Result<(), crate::error::TimerError> {
388 // Validate that handles and tasks have the same length
389 // 验证 handles 和 tasks 长度相同
390 if handles.len() != tasks.len() {
391 return Err(crate::error::TimerError::BatchLengthMismatch {
392 handles_len: handles.len(),
393 tasks_len: tasks.len(),
394 });
395 }
396
397 for (handle, task) in handles.into_iter().zip(tasks.into_iter()) {
398 let task_id = handle.task_id();
399
400 let (level, ticks, rounds) = self.determine_layer(task.delay);
401
402 // Use match to reduce branches, and use cached slot mask
403 // 使用 match 减少分支,并使用缓存的槽掩码
404 let (current_tick, slot_mask, slots) = match level {
405 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
406 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
407 };
408
409 let total_ticks = current_tick + ticks;
410 let slot_index = (total_ticks as usize) & slot_mask;
411
412 // Create task with the assigned TaskId
413 // 使用分配的 TaskId 创建任务
414 let task = TimerTaskForWheel::new_with_id(task_id, task, total_ticks, rounds);
415
416 // Get the index position of the task in Vec
417 // 获取任务在 Vec 中的索引位置
418 let vec_index = slots[slot_index].len();
419 let location = TaskLocation::new(level, slot_index, vec_index);
420
421 // Insert task into slot
422 // 将任务插入槽中
423 slots[slot_index].push(task);
424
425 // Insert task location into DeferredMap using handle
426 // In non-debug mode, it is guaranteed that this method will not panic.
427 // 使用 handle 将任务位置插入 DeferredMap
428 // 由于 deferred_map::DeferredMap 是我实现的,底层保证非debug模式下不会panic
429 self.task_index.insert(handle.into_handle(), location)
430 .expect("Handle should be valid for insertion");
431 }
432
433 Ok(())
434 }
435
436 /// Cancel timer task
437 ///
438 /// # Parameters
439 /// - `task_id`: Task ID
440 ///
441 /// # Returns
442 /// Returns true if the task exists and is successfully cancelled, otherwise returns false
443 ///
444 /// Now uses DeferredMap for safe task removal with generational checking
445 ///
446 /// 取消定时器任务
447 ///
448 /// # 参数
449 /// - `task_id`: 任务 ID
450 ///
451 /// # 返回值
452 /// 如果任务存在且成功取消则返回 true,否则返回 false
453 ///
454 /// 现在使用 DeferredMap 实现安全的任务移除,带代数检查
455 #[inline]
456 pub fn cancel(&mut self, task_id: TaskId) -> bool {
457 // Remove task location from DeferredMap using TaskId as key
458 // 使用 TaskId 作为 key 从 DeferredMap 中移除任务位置
459 let location = match self.task_index.remove(task_id.key()) {
460 Some(loc) => loc,
461 None => return false, // Task not found or already removed (generation mismatch)
462 };
463
464 // Use match to get slot reference, reduce branches
465 // 使用 match 获取槽引用,减少分支
466 let slot = match location.level {
467 0 => &mut self.l0.slots[location.slot_index],
468 _ => &mut self.l1.slots[location.slot_index],
469 };
470
471 // Boundary check and ID verification
472 // 边界检查和 ID 验证
473 if location.vec_index >= slot.len() || slot[location.vec_index].get_id() != task_id {
474 // Index inconsistent - this shouldn't happen with DeferredMap, but handle it anyway
475 // 索引不一致 - DeferredMap 不应该出现这种情况,但还是处理一下
476 return false;
477 }
478
479 // Use swap_remove to remove task, record swapped task ID
480 // 使用 swap_remove 移除任务,记录被交换的任务 ID
481 let removed_task = slot.swap_remove(location.vec_index);
482
483 // Ensure the correct task was removed
484 // 确保移除了正确的任务
485 debug_assert_eq!(removed_task.get_id(), task_id);
486
487 match removed_task.into_task_type() {
488 TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
489 // Use Notify + AtomicU8 for zero-allocation notification
490 // 使用 Notify + AtomicU8 实现零分配通知
491 completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
492 }
493 TaskTypeWithCompletionNotifier::Periodic { completion_notifier, .. } => {
494 // Use flume for high-performance periodic notification
495 // 使用 flume 实现高性能周期通知
496 let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
497 }
498 }
499
500 // If a swap occurred (vec_index is not the last element)
501 // 如果发生了交换(vec_index 不是最后一个元素)
502 if location.vec_index < slot.len() {
503 let swapped_task_id = slot[location.vec_index].get_id();
504 // Update swapped element's index in one go, using DeferredMap's get_mut
505 // 一次性更新被交换元素的索引,使用 DeferredMap 的 get_mut
506 if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
507 swapped_location.vec_index = location.vec_index;
508 }
509 }
510
511 true
512 }
513
514 /// Batch cancel timer tasks
515 ///
516 /// # Parameters
517 /// - `task_ids`: List of task IDs to cancel
518 ///
519 /// # Returns
520 /// Number of successfully cancelled tasks
521 ///
522 /// # Performance Advantages
523 /// - Reduce repeated HashMap lookup overhead
524 /// - Multiple cancellation operations on the same slot can be batch processed
525 /// - Use unstable sort to improve performance
526 /// - Small batch optimization: skip sorting based on configuration threshold, process directly
527 ///
528 /// 批量取消定时器任务
529 ///
530 /// # 参数
531 /// - `task_ids`: 要取消的任务 ID 列表
532 ///
533 /// # 返回值
534 /// 成功取消的任务数量
535 ///
536 /// # 性能优势
537 /// - 减少重复的 HashMap 查找开销
538 /// - 同一槽位的多个取消操作可以批量处理
539 /// - 使用不稳定排序提高性能
540 /// - 小批量优化:根据配置阈值跳过排序,直接处理
541 #[inline]
542 pub fn cancel_batch(&mut self, task_ids: &[TaskId]) -> usize {
543 let mut cancelled_count = 0;
544
545 // Small batch optimization: cancel one by one to avoid grouping and sorting overhead
546 // 小批量优化:逐个取消以避免分组和排序开销
547 if task_ids.len() <= self.batch_config.small_batch_threshold {
548 for &task_id in task_ids {
549 if self.cancel(task_id) {
550 cancelled_count += 1;
551 }
552 }
553 return cancelled_count;
554 }
555
556 // Group by layer and slot to optimize batch cancellation
557 // Use SmallVec to avoid heap allocation in most cases
558 // 按层级和槽位分组以优化批量取消
559 // 使用 SmallVec 避免在大多数情况下进行堆分配
560 let l0_slot_count = self.l0.slot_count;
561 let l1_slot_count = self.l1.slot_count;
562
563 let mut l0_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l0_slot_count];
564 let mut l1_tasks_by_slot: Vec<Vec<(TaskId, usize)>> = vec![Vec::new(); l1_slot_count];
565
566 // Collect information of tasks to be cancelled
567 // 收集要取消的任务信息
568 for &task_id in task_ids {
569 // Use DeferredMap's get with TaskId key
570 // 使用 DeferredMap 的 get,传入 TaskId key
571 if let Some(location) = self.task_index.get(task_id.key()) {
572 if location.level == 0 {
573 l0_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
574 } else {
575 l1_tasks_by_slot[location.slot_index].push((task_id, location.vec_index));
576 }
577 }
578 }
579
580 // Process L0 layer cancellation
581 // 处理 L0 层取消
582 for (slot_index, tasks) in l0_tasks_by_slot.iter_mut().enumerate() {
583 if tasks.is_empty() {
584 continue;
585 }
586
587 // Sort by vec_index in descending order, delete from back to front to avoid index invalidation
588 // 按 vec_index 降序排序,从后向前删除以避免索引失效
589 tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
590
591 let slot = &mut self.l0.slots[slot_index];
592
593 for &(task_id, vec_index) in tasks.iter() {
594 if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
595 let removed_task = slot.swap_remove(vec_index);
596 match removed_task.into_task_type() {
597 TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
598 // Use Notify + AtomicU8 for zero-allocation notification
599 // 使用 Notify + AtomicU8 实现零分配通知
600 completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
601 }
602 TaskTypeWithCompletionNotifier::Periodic { completion_notifier, .. } => {
603 // Use flume for high-performance periodic notification
604 // 使用 flume 实现高性能周期通知
605 let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
606 }
607 }
608
609
610 if vec_index < slot.len() {
611 let swapped_task_id = slot[vec_index].get_id();
612 // Use DeferredMap's get_mut with TaskId key
613 // 使用 DeferredMap 的 get_mut,传入 TaskId key
614 if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
615 swapped_location.vec_index = vec_index;
616 }
617 }
618
619 // Use DeferredMap's remove with TaskId key
620 // 使用 DeferredMap 的 remove,传入 TaskId key
621 self.task_index.remove(task_id.key());
622 cancelled_count += 1;
623 }
624 }
625 }
626
627 // Process L1 layer cancellation
628 // 处理 L1 层取消
629 for (slot_index, tasks) in l1_tasks_by_slot.iter_mut().enumerate() {
630 if tasks.is_empty() {
631 continue;
632 }
633
634 tasks.sort_unstable_by(|a, b| b.1.cmp(&a.1));
635
636 let slot = &mut self.l1.slots[slot_index];
637
638 for &(task_id, vec_index) in tasks.iter() {
639 if vec_index < slot.len() && slot[vec_index].get_id() == task_id {
640
641 let removed_task = slot.swap_remove(vec_index);
642
643 match removed_task.into_task_type() {
644 TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
645 // Use Notify + AtomicU8 for zero-allocation notification
646 // 使用 Notify + AtomicU8 实现零分配通知
647 completion_notifier.notify(crate::task::TaskCompletion::Cancelled);
648 }
649 TaskTypeWithCompletionNotifier::Periodic { completion_notifier, .. } => {
650 // Use flume for high-performance periodic notification
651 // 使用 flume 实现高性能周期通知
652 let _ = completion_notifier.0.try_send(TaskCompletion::Cancelled);
653 }
654 }
655
656 if vec_index < slot.len() {
657 let swapped_task_id = slot[vec_index].get_id();
658 // Use DeferredMap's get_mut with TaskId key
659 // 使用 DeferredMap 的 get_mut,传入 TaskId key
660 if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
661 swapped_location.vec_index = vec_index;
662 }
663 }
664
665 // Use DeferredMap's remove with TaskId key
666 // 使用 DeferredMap 的 remove,传入 TaskId key
667 self.task_index.remove(task_id.key());
668 cancelled_count += 1;
669 }
670 }
671 }
672
673 cancelled_count
674 }
675
676 /// Reinsert periodic task with the same TaskId
677 ///
678 /// Used to automatically reschedule periodic tasks after they expire
679 ///
680 /// TaskId remains unchanged, only location is updated in DeferredMap
681 ///
682 /// 重新插入周期性任务(保持相同的 TaskId)
683 ///
684 /// 用于在周期性任务过期后自动重新调度
685 ///
686 /// TaskId 保持不变,只更新 DeferredMap 中的位置
687 fn reinsert_periodic_task(
688 &mut self,
689 task_id: TaskId,
690 task: TimerTaskWithCompletionNotifier,
691 ) {
692 // Determine which layer the interval should be inserted into
693 // This method is only called by periodic tasks, so the interval is guaranteed to be Some.
694 // 确定间隔应该插入到哪一层
695 // 该方法只能由周期性任务调用,所以间隔是 guaranteed 应当保证为 Some.
696 let (level, ticks, rounds) = self.determine_layer(task.get_interval().unwrap());
697
698 // Use match to reduce branches, and use cached slot mask
699 // 使用 match 减少分支,并使用缓存的槽掩码
700 let (current_tick, slot_mask, slots) = match level {
701 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
702 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
703 };
704
705 let total_ticks = current_tick + ticks;
706 let slot_index = (total_ticks as usize) & slot_mask;
707
708 // Get the index position of the task in Vec
709 // 获取任务在 Vec 中的索引位置
710 let vec_index = slots[slot_index].len();
711 let new_location = TaskLocation::new(level, slot_index, vec_index);
712
713 // Insert task into slot
714 // 将任务插入槽中
715 slots[slot_index].push(TimerTaskForWheel::new_with_id(task_id, task, total_ticks, rounds));
716
717 // Update task location in DeferredMap (doesn't remove, just updates)
718 // 更新 DeferredMap 中的任务位置(不删除,仅更新)
719 if let Some(location) = self.task_index.get_mut(task_id.key()) {
720 *location = new_location;
721 }
722 }
723
724 /// Advance the timing wheel by one tick, return all expired tasks
725 ///
726 /// # Returns
727 /// List of expired tasks
728 ///
729 /// # Implementation Details
730 /// - L0 layer advances 1 tick each time (no rounds check)
731 /// - L1 layer advances once every (L1_tick / L0_tick) times
732 /// - L1 expired tasks are batch demoted to L0
733 ///
734 /// 推进时间轮一个 tick,返回所有过期的任务
735 ///
736 /// # 返回值
737 /// 过期任务列表
738 ///
739 /// # 实现细节
740 /// - L0 层每次推进 1 个 tick(无轮数检查)
741 /// - L1 层每 (L1_tick / L0_tick) 次推进一次
742 /// - L1 层过期任务批量降级到 L0
743 pub fn advance(&mut self) -> Vec<WheelAdvanceResult> {
744 // Advance L0 layer
745 // 推进 L0 层
746 self.l0.current_tick += 1;
747
748 let mut expired_tasks = Vec::new();
749
750 // Process L0 layer expired tasks
751 // Distinguish between one-shot and periodic tasks
752 // 处理 L0 层过期任务
753 // 区分一次性任务和周期性任务
754 let l0_slot_index = (self.l0.current_tick as usize) & self.l0.slot_mask;
755
756 // Collect periodic tasks to reinsert
757 // 收集需要重新插入的周期性任务
758 let mut periodic_tasks_to_reinsert = Vec::new();
759
760 {
761 let l0_slot = &mut self.l0.slots[l0_slot_index];
762
763 // Process all tasks in the current slot by always removing from index 0
764 // This avoids index tracking issues with swap_remove
765 // 通过始终从索引 0 移除来处理当前槽中的所有任务
766 // 这避免了 swap_remove 的索引跟踪问题
767 while !l0_slot.is_empty() {
768 // Always remove from index 0
769 // 始终从索引 0 移除
770 let task_with_notifier = l0_slot.swap_remove(0);
771 let task_id = task_with_notifier.get_id();
772
773 // Determine if this is a periodic task (check before consuming)
774 // 判断是否为周期性任务(在消耗前检查)
775 let is_periodic = matches!(task_with_notifier.task.task_type,
776 TaskTypeWithCompletionNotifier::Periodic { .. });
777
778 // For one-shot tasks, remove from DeferredMap index
779 // For periodic tasks, keep in index (will update location in reinsert)
780 // 对于一次性任务,从 DeferredMap 索引中移除
781 // 对于周期性任务,保留在索引中(重新插入时会更新位置)
782 if !is_periodic {
783 self.task_index.remove(task_id.key());
784 }
785
786 // Update swapped element's index (the element that was moved to index 0)
787 // 更新被交换元素的索引(被移动到索引 0 的元素)
788 if !l0_slot.is_empty() {
789 let swapped_task_id = l0_slot[0].get_id();
790 // Use DeferredMap's get_mut with TaskId key
791 // 使用 DeferredMap 的 get_mut,传入 TaskId key
792 if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
793 swapped_location.vec_index = 0;
794 }
795 }
796
797 let TimerTaskForWheel { task_id, task, .. } = task_with_notifier;
798
799 match &task.task_type {
800 TaskTypeWithCompletionNotifier::Periodic { completion_notifier, .. } => {
801 let _ = completion_notifier.0.try_send(TaskCompletion::Called);
802
803 periodic_tasks_to_reinsert.push((task_id, TimerTaskWithCompletionNotifier {
804 task_type: task.task_type,
805 delay: task.delay,
806 callback: task.callback.clone(),
807 }));
808 }
809 TaskTypeWithCompletionNotifier::OneShot { completion_notifier } => {
810 completion_notifier.notify(crate::task::TaskCompletion::Called);
811 }
812 }
813
814 expired_tasks.push(WheelAdvanceResult {
815 id: task_id,
816 callback: task.callback,
817 });
818 }
819 }
820
821 // Reinsert periodic tasks for next interval
822 // 重新插入周期性任务到下一个周期
823 for (task_id, task) in periodic_tasks_to_reinsert {
824 self.reinsert_periodic_task(task_id, task);
825 }
826
827 // Process L1 layer
828 // Check if L1 layer needs to be advanced
829 // 处理 L1 层
830 // 检查是否需要推进 L1 层
831 if self.l0.current_tick.is_multiple_of(self.l1_tick_ratio) {
832 self.l1.current_tick += 1;
833 let l1_slot_index = (self.l1.current_tick as usize) & self.l1.slot_mask;
834 let l1_slot = &mut self.l1.slots[l1_slot_index];
835
836 // Collect L1 layer expired tasks
837 // 收集 L1 层过期任务
838 let mut tasks_to_demote = Vec::new();
839 let mut i = 0;
840 while i < l1_slot.len() {
841 let task = &mut l1_slot[i];
842
843 if task.rounds > 0 {
844 // Still has rounds, decrease rounds and keep
845 // 还有轮数,减少轮数并保留
846 task.rounds -= 1;
847 // Use DeferredMap's get_mut with TaskId key
848 // 使用 DeferredMap 的 get_mut,传入 TaskId key
849 if let Some(location) = self.task_index.get_mut(task.get_id().key()) {
850 location.vec_index = i;
851 }
852 i += 1;
853 } else {
854 // rounds = 0, need to demote to L0
855 // Don't remove from task_index - will update location in demote_tasks
856 // rounds = 0,需要降级到 L0
857 // 不从 task_index 中移除 - 在 demote_tasks 中会更新位置
858 let task_to_demote = l1_slot.swap_remove(i);
859
860 if i < l1_slot.len() {
861 let swapped_task_id = l1_slot[i].get_id();
862 // Use DeferredMap's get_mut with TaskId key
863 // 使用 DeferredMap 的 get_mut,传入 TaskId key
864 if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
865 swapped_location.vec_index = i;
866 }
867 }
868
869 tasks_to_demote.push(task_to_demote);
870 }
871 }
872
873 // Demote tasks to L0
874 // 将任务降级到 L0
875 self.demote_tasks(tasks_to_demote);
876 }
877
878 expired_tasks
879 }
880
881 /// Demote tasks from L1 to L0
882 ///
883 /// Recalculate and insert L1 expired tasks into L0 layer
884 ///
885 /// Updates task location in DeferredMap without removing/reinserting
886 ///
887 /// 将任务从 L1 降级到 L0
888 ///
889 /// 重新计算并将 L1 过期任务插入到 L0 层
890 ///
891 /// 更新 DeferredMap 中的任务位置而不删除/重新插入
892 fn demote_tasks(&mut self, tasks: Vec<TimerTaskForWheel>) {
893 for task in tasks {
894 // Calculate the remaining delay of the task in L0 layer
895 // The task's deadline_tick is based on L1 tick, needs to be converted to L0 tick
896 // 计算任务在 L0 层的剩余延迟
897 // 任务的 deadline_tick 基于 L1 tick,需要转换为 L0 tick
898 let l1_tick_ratio = self.l1_tick_ratio;
899
900 // Calculate the original expiration time of the task (L1 tick)
901 // 计算任务的原始过期时间(L1 tick)
902 let l1_deadline = task.deadline_tick;
903
904 // Convert to L0 tick expiration time
905 // 转换为 L0 tick 过期时间
906 let l0_deadline_tick = l1_deadline * l1_tick_ratio;
907 let l0_current_tick = self.l0.current_tick;
908
909 // Calculate remaining L0 ticks
910 // 计算剩余 L0 ticks
911 let remaining_l0_ticks = if l0_deadline_tick > l0_current_tick {
912 l0_deadline_tick - l0_current_tick
913 } else {
914 1 // At least trigger in next tick (至少在下一个 tick 触发)
915 };
916
917 // Calculate L0 slot index
918 // 计算 L0 槽索引
919 let target_l0_tick = l0_current_tick + remaining_l0_ticks;
920 let l0_slot_index = (target_l0_tick as usize) & self.l0.slot_mask;
921
922 let task_id = task.get_id();
923 let vec_index = self.l0.slots[l0_slot_index].len();
924 let new_location = TaskLocation::new(0, l0_slot_index, vec_index);
925
926 // Insert into L0 layer
927 // 插入到 L0 层
928 self.l0.slots[l0_slot_index].push(task);
929
930 // Update task location in DeferredMap (task already exists in index)
931 // 更新 DeferredMap 中的任务位置(任务已在索引中)
932 if let Some(location) = self.task_index.get_mut(task_id.key()) {
933 *location = new_location;
934 }
935 }
936 }
937
938 /// Check if the timing wheel is empty
939 ///
940 /// 检查时间轮是否为空
941 #[allow(dead_code)]
942 pub fn is_empty(&self) -> bool {
943 self.task_index.is_empty()
944 }
945
946 /// Postpone timer task (keep original TaskId)
947 ///
948 /// # Parameters
949 /// - `task_id`: Task ID to postpone
950 /// - `new_delay`: New delay time (recalculated from current tick, not continuing from original delay time)
951 /// - `new_callback`: New callback function (if None, keep original callback)
952 ///
953 /// # Returns
954 /// Returns true if the task exists and is successfully postponed, otherwise returns false
955 ///
956 /// # Implementation Details
957 /// - Remove task from original layer/slot, keep its completion_notifier (will not trigger cancellation notification)
958 /// - Update delay time and callback function (if provided)
959 /// - Recalculate target layer, slot, and rounds based on new_delay
960 /// - Cross-layer migration may occur (L0 <-> L1)
961 /// - Re-insert to new position using original TaskId
962 /// - Keep consistent with external held TaskId reference
963 ///
964 /// 延期定时器任务(保留原始 TaskId)
965 ///
966 /// # 参数
967 /// - `task_id`: 要延期的任务 ID
968 /// - `new_delay`: 新延迟时间(从当前 tick 重新计算,而非从原延迟时间继续)
969 /// - `new_callback`: 新回调函数(如果为 None,则保留原回调)
970 ///
971 /// # 返回值
972 /// 如果任务存在且成功延期则返回 true,否则返回 false
973 ///
974 /// # 实现细节
975 /// - 从原层级/槽位移除任务,保留其 completion_notifier(不会触发取消通知)
976 /// - 更新延迟时间和回调函数(如果提供)
977 /// - 根据 new_delay 重新计算目标层级、槽位和轮数
978 /// - 可能发生跨层迁移(L0 <-> L1)
979 /// - 使用原始 TaskId 重新插入到新位置
980 /// - 与外部持有的 TaskId 引用保持一致
981 #[inline]
982 pub fn postpone(
983 &mut self,
984 task_id: TaskId,
985 new_delay: Duration,
986 new_callback: Option<crate::task::CallbackWrapper>,
987 ) -> bool {
988 // Step 1: Get task location from DeferredMap (don't remove)
989 // 步骤 1: 从 DeferredMap 获取任务位置(不删除)
990 let old_location = match self.task_index.get(task_id.key()) {
991 Some(loc) => *loc,
992 None => return false,
993 };
994
995 // Use match to get slot reference
996 // 使用 match 获取槽引用
997 let slot = match old_location.level {
998 0 => &mut self.l0.slots[old_location.slot_index],
999 _ => &mut self.l1.slots[old_location.slot_index],
1000 };
1001
1002 // Verify task is still at expected position
1003 // 验证任务仍在预期位置
1004 if old_location.vec_index >= slot.len() || slot[old_location.vec_index].get_id() != task_id {
1005 // Index inconsistent, return failure
1006 // 索引不一致,返回失败
1007 return false;
1008 }
1009
1010 // Use swap_remove to remove task from slot
1011 // 使用 swap_remove 从槽中移除任务
1012 let mut task = slot.swap_remove(old_location.vec_index);
1013
1014 // Update swapped element's index (if a swap occurred)
1015 // 更新被交换元素的索引(如果发生了交换)
1016 if old_location.vec_index < slot.len() {
1017 let swapped_task_id = slot[old_location.vec_index].get_id();
1018 // Use DeferredMap's get_mut with TaskId key
1019 // 使用 DeferredMap 的 get_mut,传入 TaskId key
1020 if let Some(swapped_location) = self.task_index.get_mut(swapped_task_id.key()) {
1021 swapped_location.vec_index = old_location.vec_index;
1022 }
1023 }
1024
1025 // Step 2: Update task's delay and callback
1026 // 步骤 2: 更新任务的延迟和回调
1027 task.update_delay(new_delay);
1028 if let Some(callback) = new_callback {
1029 task.update_callback(callback);
1030 }
1031
1032 // Step 3: Recalculate layer, slot, and rounds based on new delay
1033 // 步骤 3: 根据新延迟重新计算层级、槽位和轮数
1034 let (new_level, ticks, new_rounds) = self.determine_layer(new_delay);
1035
1036 // Use match to reduce branches, and use cached slot mask
1037 // 使用 match 减少分支,并使用缓存的槽掩码
1038 let (current_tick, slot_mask, slots) = match new_level {
1039 0 => (self.l0.current_tick, self.l0.slot_mask, &mut self.l0.slots),
1040 _ => (self.l1.current_tick, self.l1.slot_mask, &mut self.l1.slots),
1041 };
1042
1043 let total_ticks = current_tick + ticks;
1044 let new_slot_index = (total_ticks as usize) & slot_mask;
1045
1046 // Update task's timing wheel parameters
1047 // 更新任务的时间轮参数
1048 task.deadline_tick = total_ticks;
1049 task.rounds = new_rounds;
1050
1051 // Step 4: Re-insert task to new layer/slot
1052 // 步骤 4: 将任务重新插入到新层级/槽位
1053 let new_vec_index = slots[new_slot_index].len();
1054 let new_location = TaskLocation::new(new_level, new_slot_index, new_vec_index);
1055
1056 slots[new_slot_index].push(task);
1057
1058 // Update task location in DeferredMap (task already exists in index)
1059 // 更新 DeferredMap 中的任务位置(任务已在索引中)
1060 if let Some(location) = self.task_index.get_mut(task_id.key()) {
1061 *location = new_location;
1062 }
1063
1064 true
1065 }
1066
1067 /// Batch postpone timer tasks
1068 ///
1069 /// # Parameters
1070 /// - `updates`: List of tuples of (task ID, new delay)
1071 ///
1072 /// # Returns
1073 /// Number of successfully postponed tasks
1074 ///
1075 /// # Performance Advantages
1076 /// - Batch processing reduces function call overhead
1077 /// - All delays are recalculated from current_tick at call time
1078 ///
1079 /// # Notes
1080 /// - If a task ID does not exist, that task will be skipped without affecting other tasks' postponement
1081 ///
1082 /// 批量延期定时器任务
1083 ///
1084 /// # 参数
1085 /// - `updates`: (任务 ID, 新延迟) 元组列表
1086 ///
1087 /// # 返回值
1088 /// 成功延期的任务数量
1089 ///
1090 /// # 性能优势
1091 /// - 批处理减少函数调用开销
1092 /// - 所有延迟在调用时从 current_tick 重新计算
1093 ///
1094 /// # 注意
1095 /// - 如果任务 ID 不存在,该任务将被跳过,不影响其他任务的延期
1096 #[inline]
1097 pub fn postpone_batch(
1098 &mut self,
1099 updates: Vec<(TaskId, Duration)>,
1100 ) -> usize {
1101 let mut postponed_count = 0;
1102
1103 for (task_id, new_delay) in updates {
1104 if self.postpone(task_id, new_delay, None) {
1105 postponed_count += 1;
1106 }
1107 }
1108
1109 postponed_count
1110 }
1111
1112 /// Batch postpone timer tasks (replace callbacks)
1113 ///
1114 /// # Parameters
1115 /// - `updates`: List of tuples of (task ID, new delay, new callback)
1116 ///
1117 /// # Returns
1118 /// Number of successfully postponed tasks
1119 ///
1120 /// 批量延期定时器任务(替换回调)
1121 ///
1122 /// # 参数
1123 /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
1124 ///
1125 /// # 返回值
1126 /// 成功延期的任务数量
1127 pub fn postpone_batch_with_callbacks(
1128 &mut self,
1129 updates: Vec<(TaskId, Duration, Option<crate::task::CallbackWrapper>)>,
1130 ) -> usize {
1131 let mut postponed_count = 0;
1132
1133 for (task_id, new_delay, new_callback) in updates {
1134 if self.postpone(task_id, new_delay, new_callback) {
1135 postponed_count += 1;
1136 }
1137 }
1138
1139 postponed_count
1140 }
1141}
1142
1143#[cfg(test)]
1144mod tests {
1145 use super::*;
1146 use crate::task::{CallbackWrapper, TimerTask, TimerTaskWithCompletionNotifier};
1147
#[test]
fn test_wheel_creation() {
    // A wheel built from the default configs: 512 slots, tick 0, no tasks.
    let wheel_config = WheelConfig::default();
    let batch_config = BatchConfig::default();
    let fresh = Wheel::new(wheel_config, batch_config);

    assert_eq!(fresh.slot_count(), 512);
    assert_eq!(fresh.current_tick(), 0);
    assert!(fresh.is_empty());
}
1155
#[test]
fn test_hierarchical_wheel_creation() {
    let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // L0 layer: 512 slots, nothing scheduled, clock at tick 0.
    assert_eq!(wheel.slot_count(), 512);
    assert_eq!(wheel.current_tick(), 0);
    assert!(wheel.is_empty());

    // The L1 layer always exists in hierarchical mode.
    assert_eq!(wheel.l1.slot_count, 64);
    // One L1 tick spans 1000ms / 10ms = 100 L0 ticks.
    assert_eq!(wheel.l1_tick_ratio, 100);
}
1168
#[test]
fn test_hierarchical_config_validation() {
    let l0_tick = Duration::from_millis(10);

    // The L1 tick must be an integer multiple of the L0 tick; 15ms is not a
    // multiple of 10ms, so the builder must refuse it.
    let rejected = WheelConfig::builder()
        .l0_tick_duration(l0_tick)
        .l0_slot_count(512)
        .l1_tick_duration(Duration::from_millis(15))
        .l1_slot_count(64)
        .build();
    assert!(rejected.is_err());

    // 1000ms / 10ms = 100 ticks, an exact multiple, so this one is accepted.
    let accepted = WheelConfig::builder()
        .l0_tick_duration(l0_tick)
        .l0_slot_count(512)
        .l1_tick_duration(Duration::from_secs(1))
        .l1_slot_count(64)
        .build();
    assert!(accepted.is_ok());
}
1191
#[test]
fn test_layer_determination() {
    let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // 100ms fits inside the L0 range (512 slots * 10ms = 5120ms): level 0, no rounds.
    let short = wheel.determine_layer(Duration::from_millis(100));
    assert_eq!(short.0, 0);
    assert_eq!(short.2, 0);

    // 10s exceeds the L0 range (> 5120ms) but fits in one L1 revolution: level 1, no rounds.
    let long = wheel.determine_layer(Duration::from_secs(10));
    assert_eq!(long.0, 1);
    assert_eq!(long.2, 0);

    // 120s exceeds the L1 range (64 slots * 1000ms = 64000ms): level 1 with rounds.
    let very_long = wheel.determine_layer(Duration::from_secs(120));
    assert_eq!(very_long.0, 1);
    assert!(very_long.2 > 0);
}
1216
#[test]
fn test_hierarchical_insert_and_advance() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Schedule a short (100ms) one-shot task; it is expected to land in L0.
    let noop = CallbackWrapper::new(|| async {});
    let oneshot = TimerTask::new_oneshot(Duration::from_millis(100), Some(noop));
    let (oneshot_with_notifier, _completion_rx) =
        TimerTaskWithCompletionNotifier::from_timer_task(oneshot);
    let handle = wheel.allocate_handle();
    let task_id = handle.task_id();
    wheel.insert(handle, oneshot_with_notifier);

    // The index must place the task in the L0 layer.
    let level = wheel.task_index.get(task_id.key()).unwrap().level;
    assert_eq!(level, 0);

    // Advance up to 10 ticks (100ms); the first non-empty batch must be exactly this task.
    let mut remaining_ticks = 10;
    while remaining_ticks > 0 {
        remaining_ticks -= 1;
        let fired = wheel.advance();
        if fired.is_empty() {
            continue;
        }
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].id, task_id);
        return;
    }
    panic!("Task should have expired");
}
1246
#[test]
fn test_cross_layer_cancel() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Schedules a one-shot task with a no-op callback. The completion receiver is
    // returned so the caller can keep it alive until the end of the test.
    let schedule = |wheel: &mut Wheel, delay: Duration| {
        let noop = CallbackWrapper::new(|| async {});
        let task = TimerTask::new_oneshot(delay, Some(noop));
        let (with_notifier, completion_rx) =
            TimerTaskWithCompletionNotifier::from_timer_task(task);
        let handle = wheel.allocate_handle();
        let id = handle.task_id();
        wheel.insert(handle, with_notifier);
        (id, completion_rx)
    };

    // 100ms goes to L0, 10s goes to L1.
    let (task_id1, _rx1) = schedule(&mut wheel, Duration::from_millis(100));
    let (task_id2, _rx2) = schedule(&mut wheel, Duration::from_secs(10));

    // Verify the layer each task was placed in.
    assert_eq!(wheel.task_index.get(task_id1.key()).unwrap().level, 0);
    assert_eq!(wheel.task_index.get(task_id2.key()).unwrap().level, 1);

    // Cancelling the L0 task removes it from the index.
    assert!(wheel.cancel(task_id1));
    assert!(wheel.task_index.get(task_id1.key()).is_none());

    // Cancelling the L1 task removes it from the index as well.
    assert!(wheel.cancel(task_id2));
    assert!(wheel.task_index.get(task_id2.key()).is_none());

    // With both tasks cancelled the wheel should be empty.
    assert!(wheel.is_empty());
}
1283
#[test]
fn test_delay_to_ticks() {
    let wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // (delay in milliseconds, expected tick count). The 1ms case checks that a
    // delay shorter than one tick still yields the minimum of 1 tick.
    let cases = [(100, 10), (50, 5), (1, 1)];
    for (delay_ms, expected_ticks) in cases {
        assert_eq!(wheel.delay_to_ticks(Duration::from_millis(delay_ms)), expected_ticks);
    }
}
1291
#[test]
fn test_wheel_invalid_slot_count() {
    // 100 is not a power of two, so building the config must fail.
    let result = WheelConfig::builder().l0_slot_count(100).build();
    assert!(result.is_err());

    // The failure must be the InvalidSlotCount variant carrying the rejected value
    // and the expected reason text.
    match result {
        Err(crate::error::TimerError::InvalidSlotCount { slot_count, reason }) => {
            assert_eq!(slot_count, 100);
            assert_eq!(reason, "L0 layer slot count must be power of 2");
        }
        _ => panic!("Expected InvalidSlotCount error"),
    }
}
1305
1306
#[test]
fn test_minimum_delay() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // A 1ms delay is shorter than one tick; it should be rounded up to 1 tick.
    let noop = CallbackWrapper::new(|| async {});
    let tiny_task = TimerTask::new_oneshot(Duration::from_millis(1), Some(noop));
    let (tiny_with_notifier, _completion_rx) =
        TimerTaskWithCompletionNotifier::from_timer_task(tiny_task);
    let handle = wheel.allocate_handle();
    let task_id: TaskId = handle.task_id();
    wheel.insert(handle, tiny_with_notifier);

    // A single advance must fire exactly that task.
    let fired = wheel.advance();
    assert_eq!(fired.len(), 1, "Minimum delay task should be triggered after 1 tick");
    assert_eq!(fired[0].id, task_id);
}
1324
1325
#[test]
fn test_advance_empty_slots() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // With nothing scheduled, each of 100 advances must come back empty.
    let mut ticks_left = 100;
    while ticks_left > 0 {
        let fired = wheel.advance();
        assert!(fired.is_empty(), "Empty slots should not return any tasks");
        ticks_left -= 1;
    }

    // The tick counter must have moved forward once per advance.
    assert_eq!(wheel.current_tick(), 100, "current_tick should correctly increment");
}
1338
1339
#[test]
fn test_slot_boundary() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Schedules a one-shot task with a no-op callback. The completion receiver is
    // returned so the caller can keep it alive until the end of the test.
    let schedule = |wheel: &mut Wheel, delay: Duration| {
        let noop = CallbackWrapper::new(|| async {});
        let task = TimerTask::new_oneshot(delay, Some(noop));
        let (with_notifier, completion_rx) =
            TimerTaskWithCompletionNotifier::from_timer_task(task);
        let handle = wheel.allocate_handle();
        let id = handle.task_id();
        wheel.insert(handle, with_notifier);
        (id, completion_rx)
    };

    // Slot boundary and wraparound check.
    // First task: 10ms (1 tick), should trigger on slot 1.
    let (task_id_1, _rx1) = schedule(&mut wheel, Duration::from_millis(10));
    // Second task: 5110ms (511 ticks), should trigger on slot 511.
    let (task_id_2, _rx2) = schedule(&mut wheel, Duration::from_millis(5110));

    // The first advance fires the first task only.
    let first_batch = wheel.advance();
    assert_eq!(first_batch.len(), 1, "First task should trigger on tick 1");
    assert_eq!(first_batch[0].id, task_id_1);

    // Advances number 2 through 511 (510 more ticks): the first non-empty batch must
    // be the second task.
    let mut triggered = false;
    for advance_no in 2..512 {
        let batch = wheel.advance();
        if batch.is_empty() {
            continue;
        }
        assert_eq!(batch.len(), 1, "The {}th advance should trigger the second task", advance_no);
        assert_eq!(batch[0].id, task_id_2);
        triggered = true;
        break;
    }
    assert!(triggered, "Second task should trigger on tick 511");

    assert!(wheel.is_empty(), "All tasks should have been triggered");
}
1381
1382
#[test]
fn test_task_id_uniqueness() {
    let mut wheel = Wheel::new(WheelConfig::default(), BatchConfig::default());

    // Insert 100 tasks and collect their IDs; HashSet::insert returns false on a
    // duplicate, which would fail the assertion inside the loop.
    let mut seen_ids = std::collections::HashSet::new();
    let mut inserted = 0;
    while inserted < 100 {
        let noop = CallbackWrapper::new(|| async {});
        let task = TimerTask::new_oneshot(Duration::from_millis(100), Some(noop));
        let (with_notifier, _rx) = TimerTaskWithCompletionNotifier::from_timer_task(task);
        let handle = wheel.allocate_handle();
        let task_id = handle.task_id();
        wheel.insert(handle, with_notifier);

        let is_new = seen_ids.insert(task_id);
        assert!(is_new, "TaskId should be unique");
        inserted += 1;
    }

    assert_eq!(seen_ids.len(), 100);
}
1402
1403}
1404