kestrel_timer/
timer.rs

1pub mod handle;
2
3use crate::config::{BatchConfig, ServiceConfig, WheelConfig};
4use crate::task::{CallbackWrapper, TaskId};
5use crate::wheel::Wheel;
6use parking_lot::Mutex;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::task::JoinHandle;
10use handle::{TimerHandle, TimerHandleWithCompletion, BatchHandle, BatchHandleWithCompletion};
11
/// Timing wheel timer manager.
///
/// Owns the shared timing wheel and the handle of the background task that
/// periodically advances it.
pub struct TimerWheel {
    /// Timing wheel instance, wrapped in `Arc<Mutex<_>>` for multi-threaded access.
    wheel: Arc<Mutex<Wheel>>,

    /// Handle of the background tick-loop task.
    ///
    /// Stored as an `Option` so that `shutdown` and `Drop` can take it out
    /// before aborting the task.
    tick_handle: Option<JoinHandle<()>>,
}
26
27impl TimerWheel {
28    /// Create a new timer manager
29    ///
30    /// # Parameters
31    /// - `config`: Timing wheel configuration, already validated
32    /// 
33    /// 创建新的定时器管理器
34    ///
35    /// # 参数
36    /// - `config`: 时间轮配置,已验证
37    ///
38    /// # Examples (示例)
39    /// ```no_run
40    /// use kestrel_timer::{TimerWheel, config::WheelConfig, TimerTask, config::BatchConfig};
41    /// use std::time::Duration;
42    ///
43    /// #[tokio::main]
44    /// async fn main() {
45    ///     let config = WheelConfig::builder()
46    ///         .l0_tick_duration(Duration::from_millis(10))
47    ///         .l0_slot_count(512)
48    ///         .l1_tick_duration(Duration::from_secs(1))
49    ///         .l1_slot_count(64)
50    ///         .build()
51    ///         .unwrap();
52    ///     let timer = TimerWheel::new(config, BatchConfig::default());
53    ///     
54    ///     // Use two-step API
55    ///     // (Use two-step API)
56    ///     let task = TimerTask::new_oneshot(Duration::from_secs(1), None);
57    ///     let handle = timer.register(task);
58    /// }
59    /// ```
60    pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
61        let tick_duration = config.l0_tick_duration;
62        let wheel = Wheel::new(config, batch_config);
63        let wheel = Arc::new(Mutex::new(wheel));
64        let wheel_clone = wheel.clone();
65
66        // Start background tick loop
67        // 启动后台 tick 循环
68        let tick_handle = tokio::spawn(async move {
69            Self::tick_loop(wheel_clone, tick_duration).await;
70        });
71
72        Self {
73            wheel,
74            tick_handle: Some(tick_handle),
75        }
76    }
77
78    /// Create timer manager with default configuration, hierarchical mode
79    /// - L0 layer tick duration: 10ms, slot count: 512
80    /// - L1 layer tick duration: 1s, slot count: 64
81    ///
82    /// # Parameters
83    /// - `config`: Timing wheel configuration, already validated
84    ///
85    /// # Returns
86    /// Timer manager instance
87    /// 
88    /// 使用默认配置创建定时器管理器,分层模式
89    /// - L0 层 tick 持续时间:10ms,槽数量:512
90    /// - L1 层 tick 持续时间:1s,槽数量:64
91    ///
92    /// # 参数
93    /// - `config`: 时间轮配置,已验证
94    ///
95    /// # 返回值
96    /// 定时器管理器实例
97    ///
98    /// # Examples (示例)
99    /// ```no_run
100    /// use kestrel_timer::TimerWheel;
101    ///
102    /// #[tokio::main]
103    /// async fn main() {
104    ///     let timer = TimerWheel::with_defaults();
105    /// }
106    /// ```
107    pub fn with_defaults() -> Self {
108        Self::new(WheelConfig::default(), BatchConfig::default())
109    }
110
111    /// Create TimerService bound to this timing wheel with default configuration
112    ///
113    /// # Parameters
114    /// - `service_config`: Service configuration
115    ///
116    /// # Returns
117    /// TimerService instance bound to this timing wheel
118    ///
119    /// 创建绑定到此时间轮的 TimerService,使用默认配置
120    ///
121    /// # 参数
122    /// - `service_config`: 服务配置
123    ///
124    /// # 返回值
125    /// 绑定到此时间轮的 TimerService 实例
126    ///
127    /// # Examples (示例)
128    /// ```no_run
129    /// use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
130    /// use std::time::Duration;
131    /// 
132    ///
133    /// #[tokio::main]
134    /// async fn main() {
135    ///     let timer = TimerWheel::with_defaults();
136    ///     let mut service = timer.create_service(ServiceConfig::default());
137    ///     
138    ///     // Use two-step API to batch schedule timers through service
139    ///     // 使用两步 API 通过服务批量调度定时器
140    ///     let tasks: Vec<_> = (0..5)
141    ///         .map(|_| {
142    ///             use kestrel_timer::TimerTask;
143    ///             TimerTask::new_oneshot(Duration::from_millis(100), Some(CallbackWrapper::new(|| async {})))
144    ///         })
145    ///         .collect();
146    ///     service.register_batch(tasks).unwrap();
147    ///     
148    ///     // Receive timeout notifications
149    ///     // 接收超时通知
150    ///     let mut rx = service.take_receiver().unwrap();
151    ///     while let Some(task_id) = rx.recv().await {
152    ///         println!("Task {:?} completed", task_id);
153    ///     }
154    /// }
155    /// ```
156    pub fn create_service(&self, service_config: ServiceConfig) -> crate::service::TimerService {
157        crate::service::TimerService::new(self.wheel.clone(), service_config)
158    }
159    
160    /// Create TimerService bound to this timing wheel with custom configuration
161    ///
162    /// # Parameters
163    /// - `config`: Service configuration
164    ///
165    /// # Returns
166    /// TimerService instance bound to this timing wheel
167    ///
168    /// 创建绑定到此时间轮的 TimerService,使用自定义配置
169    ///
170    /// # 参数
171    /// - `config`: 服务配置
172    ///
173    /// # 返回值
174    /// 绑定到此时间轮的 TimerService 实例
175    ///
176    /// # Examples (示例)
177    /// ```no_run
178    /// use kestrel_timer::{TimerWheel, config::ServiceConfig, TimerTask};
179    ///
180    /// #[tokio::main]
181    /// async fn main() {
182    ///     let timer = TimerWheel::with_defaults();
183    ///     let config = ServiceConfig::builder()
184    ///         .command_channel_capacity(1024)
185    ///         .timeout_channel_capacity(2000)
186    ///         .build()
187    ///         .unwrap();
188    ///     let service = timer.create_service_with_config(config);
189    /// }
190    /// ```
191    pub fn create_service_with_config(&self, config: ServiceConfig) -> crate::service::TimerService {
192        crate::service::TimerService::new(self.wheel.clone(), config)
193    }
194    
195    /// Register timer task to timing wheel (registration phase)
196    /// 
197    /// # Parameters
198    /// - `task`: Task created via `create_task()`
199    /// 
200    /// # Returns
201    /// Return timer handle with completion receiver that can be used to cancel timer and receive completion notifications
202    /// 
203    /// 注册定时器任务到时间轮 (注册阶段)
204    /// 
205    /// # 参数
206    /// - `task`: 通过 `create_task()` 创建的任务
207    /// 
208    /// # 返回值
209    /// 返回包含完成通知接收器的定时器句柄,可用于取消定时器和接收完成通知
210    /// 
211    /// # Examples (示例)
212    /// ```no_run
213    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
214    /// 
215    /// use std::time::Duration;
216    /// 
217    /// #[tokio::main]
218    /// async fn main() {
219    ///     let timer = TimerWheel::with_defaults();
220    ///     
221    ///     let task = TimerTask::new_oneshot(Duration::from_secs(1), Some(CallbackWrapper::new(|| async {
222    ///         println!("Timer fired!");
223    ///     })));
224    ///     let task_id = task.get_id();
225    ///     
226    ///     // Register task
227    ///     // 注册任务
228    ///     let handle = timer.register(task);
229    ///     
230    ///     // Wait for timer completion
231    ///     // 等待定时器完成
232    ///     use kestrel_timer::CompletionReceiver;
233    ///     let (rx, _handle) = handle.into_parts();
234    ///     match rx {
235    ///         CompletionReceiver::OneShot(receiver) => {
236    ///             receiver.wait().await;
237    ///         },
238    ///         _ => {}
239    ///     }
240    /// }
241    /// ```
242    #[inline]
243    pub fn register(&self, task: crate::task::TimerTask) -> TimerHandleWithCompletion {
244        let (task, completion_rx) = crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
245        
246        let task_id = task.id;
247        
248        // 单次加锁完成所有操作
249        {
250            let mut wheel_guard = self.wheel.lock();
251            wheel_guard.insert(task);
252        }
253        
254        TimerHandleWithCompletion::new(TimerHandle::new(task_id, self.wheel.clone()), completion_rx)
255    }
256    
257    /// Batch register timer tasks to timing wheel (registration phase)
258    /// 
259    /// # Parameters
260    /// - `tasks`: List of tasks created via `create_batch()`
261    /// 
262    /// # Returns
263    /// Return batch timer handle with completion receivers
264    /// 
265    /// 批量注册定时器任务到时间轮 (注册阶段)
266    /// 
267    /// # 参数
268    /// - `tasks`: 通过 `create_batch()` 创建的任务列表
269    /// 
270    /// # 返回值
271    /// 返回包含完成通知接收器的批量定时器句柄
272    /// 
273    /// # Examples (示例)
274    /// ```no_run
275    /// use kestrel_timer::{TimerWheel, TimerTask};
276    /// use std::time::Duration;
277    /// 
278    /// #[tokio::main]
279    /// async fn main() {
280    ///     let timer = TimerWheel::with_defaults();
281    ///     
282    ///     let tasks: Vec<_> = (0..3)
283    ///         .map(|_| TimerTask::new_oneshot(Duration::from_secs(1), None))
284    ///         .collect();
285    ///     
286    ///     let batch = timer.register_batch(tasks);
287    ///     println!("Registered {} timers", batch.len());
288    /// }
289    /// ```
290    #[inline]
291    pub fn register_batch(&self, tasks: Vec<crate::task::TimerTask>) -> BatchHandleWithCompletion {
292        let task_count = tasks.len();
293        let mut completion_rxs = Vec::with_capacity(task_count);
294        let mut task_ids = Vec::with_capacity(task_count);
295        let mut prepared_tasks = Vec::with_capacity(task_count);
296        
297        // Step 1: Prepare all channels and notifiers
298        for task in tasks {
299            let (task, completion_rx) = crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
300            task_ids.push(task.id);
301            completion_rxs.push(completion_rx);
302            prepared_tasks.push(task);
303        }
304        
305        // Step 2: Single lock, batch insert
306        {
307            let mut wheel_guard = self.wheel.lock();
308            wheel_guard.insert_batch(prepared_tasks);
309        }
310        
311        BatchHandleWithCompletion::new(BatchHandle::new(task_ids, self.wheel.clone()), completion_rxs)
312    }
313
314    /// Cancel timer
315    ///
316    /// # Parameters
317    /// - `task_id`: Task ID
318    ///
319    /// # Returns
320    /// Returns true if task exists and is successfully cancelled, otherwise false
321    /// 
322    /// 取消定时器
323    ///
324    /// # 参数
325    /// - `task_id`: 任务 ID
326    ///
327    /// # 返回值
328    /// 如果任务存在且成功取消则返回 true,否则返回 false
329    /// 
330    /// # Examples (示例)
331    /// ```no_run
332    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
333    /// 
334    /// use std::time::Duration;
335    ///
336    /// #[tokio::main]
337    /// async fn main() {
338    ///     let timer = TimerWheel::with_defaults();
339    ///     
340    ///     let task = TimerTask::new_oneshot(Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
341    ///         println!("Timer fired!");
342    ///     })));
343    ///     let task_id = task.get_id();
344    ///     let _handle = timer.register(task);
345    ///     
346    ///     // Cancel task using task ID
347    ///     // 使用任务 ID 取消任务
348    ///     let cancelled = timer.cancel(task_id);
349    ///     println!("Canceled successfully: {}", cancelled);
350    /// }
351    /// ```
352    #[inline]
353    pub fn cancel(&self, task_id: TaskId) -> bool {
354        let mut wheel = self.wheel.lock();
355        wheel.cancel(task_id)
356    }
357
358    /// Batch cancel timers
359    ///
360    /// # Parameters
361    /// - `task_ids`: List of task IDs to cancel
362    ///
363    /// # Returns
364    /// Number of successfully cancelled tasks
365    ///
366    /// 批量取消定时器
367    ///
368    /// # 参数
369    /// - `task_ids`: 要取消的任务 ID 列表
370    ///
371    /// # 返回值
372    /// 成功取消的任务数量
373    ///
374    /// # Performance Advantages
375    /// - Batch processing reduces lock contention
376    /// - Internally optimized batch cancellation operation
377    ///
378    /// # Examples (示例)
379    /// ```no_run
380    /// use kestrel_timer::{TimerWheel, TimerTask};
381    /// use std::time::Duration;
382    ///
383    /// #[tokio::main]
384    /// async fn main() {
385    ///     let timer = TimerWheel::with_defaults();
386    ///     
387    ///     // Create multiple timers
388    ///     // 创建多个定时器
389    ///     let task1 = TimerTask::new_oneshot(Duration::from_secs(10), None);
390    ///     let task2 = TimerTask::new_oneshot(Duration::from_secs(10), None);
391    ///     let task3 = TimerTask::new_oneshot(Duration::from_secs(10), None);
392    ///     
393    ///     let task_ids = vec![task1.get_id(), task2.get_id(), task3.get_id()];
394    ///     
395    ///     let _h1 = timer.register(task1);
396    ///     let _h2 = timer.register(task2);
397    ///     let _h3 = timer.register(task3);
398    ///     
399    ///     // Batch cancel
400    ///     // 批量取消
401    ///     let cancelled = timer.cancel_batch(&task_ids);
402    ///     println!("Canceled {} timers", cancelled);
403    /// }
404    /// ```
405    #[inline]
406    pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
407        let mut wheel = self.wheel.lock();
408        wheel.cancel_batch(task_ids)
409    }
410
411    /// Postpone timer
412    ///
413    /// # Parameters
414    /// - `task_id`: Task ID to postpone
415    /// - `new_delay`: New delay duration, recalculated from current time
416    /// - `callback`: New callback function, pass `None` to keep original callback, pass `Some` to replace with new callback
417    ///
418    /// # Returns
419    /// Returns true if task exists and is successfully postponed, otherwise false
420    ///
421    /// 推迟定时器
422    ///
423    /// # 参数
424    /// - `task_id`: 要推迟的任务 ID
425    /// - `new_delay`: 新的延迟时间,从当前时间重新计算
426    /// - `callback`: 新的回调函数,传递 `None` 保持原始回调,传递 `Some` 替换为新的回调
427    ///
428    /// # 返回值
429    /// 如果任务存在且成功推迟则返回 true,否则返回 false
430    ///
431    /// # Note
432    /// - Task ID remains unchanged after postponement
433    /// - Original completion_receiver remains valid
434    ///
435    /// # 注意
436    /// - 任务 ID 在推迟后保持不变
437    /// - 原始 completion_receiver 保持有效
438    ///
439    /// # Examples (示例)
440    ///
441    /// ## Keep original callback (保持原始回调)
442    /// ```no_run
443    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
444    /// use std::time::Duration;
445    /// 
446    ///
447    /// #[tokio::main]
448    /// async fn main() {
449    ///     let timer = TimerWheel::with_defaults();
450    ///     
451    ///     let task = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
452    ///         println!("Timer fired!");
453    ///     })));
454    ///     let task_id = task.get_id();
455    ///     let _handle = timer.register(task);
456    ///     
457    ///     // Postpone to 10 seconds after triggering, and keep original callback
458    ///     // 推迟到 10 秒后触发,并保持原始回调
459    ///     let success = timer.postpone(task_id, Duration::from_secs(10), None);
460    ///     println!("Postponed successfully: {}", success);
461    /// }
462    /// ```
463    ///
464    /// ## Replace with new callback (替换为新的回调)
465    /// ```no_run
466    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
467    /// use std::time::Duration;
468    ///
469    /// #[tokio::main]
470    /// async fn main() {
471    ///     let timer = TimerWheel::with_defaults();
472    ///     
473    ///     let task = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
474    ///         println!("Original callback!");
475    ///     })));
476    ///     let task_id = task.get_id();
477    ///     let _handle = timer.register(task);
478    ///     
479    ///     // Postpone to 10 seconds after triggering, and replace with new callback
480    ///     // 推迟到 10 秒后触发,并替换为新的回调
481    ///     let success = timer.postpone(task_id, Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
482    ///         println!("New callback!");
483    ///     })));
484    ///     println!("Postponed successfully: {}", success);
485    /// }
486    /// ```
487    #[inline]
488    pub fn postpone(
489        &self,
490        task_id: TaskId,
491        new_delay: Duration,
492        callback: Option<CallbackWrapper>,
493    ) -> bool {
494        let mut wheel = self.wheel.lock();
495        wheel.postpone(task_id, new_delay, callback)
496    }
497
498    /// Batch postpone timers (keep original callbacks)
499    ///
500    /// # Parameters
501    /// - `updates`: List of tuples of (task ID, new delay)
502    ///
503    /// # Returns
504    /// Number of successfully postponed tasks
505    ///
506    /// 批量推迟定时器 (保持原始回调)
507    ///
508    /// # 参数
509    /// - `updates`: (任务 ID, 新延迟) 元组列表
510    ///
511    /// # 返回值
512    /// 成功推迟的任务数量
513    ///
514    /// # Note
515    /// - This method keeps all tasks' original callbacks unchanged
516    /// - Use `postpone_batch_with_callbacks` if you need to replace callbacks
517    ///
518    /// # 注意
519    /// - 此方法保持所有任务的原始回调不变
520    /// - 如果需要替换回调,请使用 `postpone_batch_with_callbacks`
521    ///
522    /// # Performance Advantages
523    /// - Batch processing reduces lock contention
524    /// - Internally optimized batch postponement operation
525    ///
526    /// # Examples (示例)
527    /// ```no_run
528    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
529    /// use std::time::Duration;
530    ///
531    /// #[tokio::main]
532    /// async fn main() {
533    ///     let timer = TimerWheel::with_defaults();
534    ///     
535    ///     // Create multiple tasks with callbacks
536    ///     // 创建多个带有回调的任务
537    ///     let task1 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
538    ///         println!("Task 1 fired!");
539    ///     })));
540    ///     let task2 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
541    ///         println!("Task 2 fired!");
542    ///     })));
543    ///     let task3 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
544    ///         println!("Task 3 fired!");
545    ///     })));
546    ///     
547    ///     let task_ids = vec![
548    ///         (task1.get_id(), Duration::from_secs(10)),
549    ///         (task2.get_id(), Duration::from_secs(15)),
550    ///         (task3.get_id(), Duration::from_secs(20)),
551    ///     ];
552    ///     
553    ///     timer.register(task1);
554    ///     timer.register(task2);
555    ///     timer.register(task3);
556    ///     
557    ///     // Batch postpone (keep original callbacks)
558    ///     // 批量推迟 (保持原始回调)
559    ///     let postponed = timer.postpone_batch(task_ids);
560    ///     println!("Postponed {} timers", postponed);
561    /// }
562    /// ```
563    #[inline]
564    pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize {
565        let mut wheel = self.wheel.lock();
566        wheel.postpone_batch(updates)
567    }
568
569    /// Batch postpone timers (replace callbacks)
570    ///
571    /// # Parameters
572    /// - `updates`: List of tuples of (task ID, new delay, new callback)
573    ///
574    /// # Returns
575    /// Number of successfully postponed tasks
576    ///
577    /// 批量推迟定时器 (替换回调)
578    ///
579    /// # 参数
580    /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
581    ///
582    /// # 返回值
583    /// 成功推迟的任务数量
584    ///
585    /// # Performance Advantages
586    /// - Batch processing reduces lock contention
587    /// - Internally optimized batch postponement operation
588    ///
589    /// # Examples (示例)
590    /// ```no_run
591    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
592    /// use std::time::Duration;
593    /// use std::sync::Arc;
594    /// use std::sync::atomic::{AtomicU32, Ordering};
595    ///
596    /// #[tokio::main]
597    /// async fn main() {
598    ///     let timer = TimerWheel::with_defaults();
599    ///     let counter = Arc::new(AtomicU32::new(0));
600    ///     
601    ///     // Create multiple timers
602    ///     // 创建多个定时器
603    ///     let task1 = TimerTask::new_oneshot(Duration::from_secs(5), None);
604    ///     let task2 = TimerTask::new_oneshot(Duration::from_secs(5), None);
605    ///     
606    ///     let id1 = task1.get_id();
607    ///     let id2 = task2.get_id();
608    ///     
609    ///     timer.register(task1);
610    ///     timer.register(task2);
611    ///     
612    ///     // Batch postpone and replace callbacks
613    ///     // 批量推迟并替换回调
614    ///     let updates: Vec<_> = vec![id1, id2]
615    ///         .into_iter()
616    ///         .map(|id| {
617    ///             let counter = Arc::clone(&counter);
618    ///             (id, Duration::from_secs(10), Some(CallbackWrapper::new(move || {
619    ///                 let counter = Arc::clone(&counter);
620    ///                 async move { counter.fetch_add(1, Ordering::SeqCst); }
621    ///             })))
622    ///         })
623    ///         .collect();
624    ///     let postponed = timer.postpone_batch_with_callbacks(updates);
625    ///     println!("Postponed {} timers", postponed);
626    /// }
627    /// ```
628    #[inline]
629    pub fn postpone_batch_with_callbacks(
630        &self,
631        updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
632    ) -> usize {
633        let mut wheel = self.wheel.lock();
634        wheel.postpone_batch_with_callbacks(updates.to_vec())
635    }
636    
637    /// Core tick loop
638    async fn tick_loop(wheel: Arc<Mutex<Wheel>>, tick_duration: Duration) {
639        let mut interval = tokio::time::interval(tick_duration);
640        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
641
642        loop {
643            interval.tick().await;
644
645            // Advance timing wheel and get expired tasks
646            // Note: wheel.advance() already handles completion notifications
647            let expired_tasks = {
648                let mut wheel_guard = wheel.lock();
649                wheel_guard.advance()
650            };
651
652            // Execute callbacks for expired tasks
653            // Notifications have already been sent by wheel.advance()
654            for task in expired_tasks {
655                if let Some(callback) = task.callback {
656                    // Spawn callback execution in a separate tokio task
657                    tokio::spawn(async move {
658                        let future = callback.call();
659                        future.await;
660                    });
661                }
662            }
663        }
664    }
665
666    /// Graceful shutdown of TimerWheel
667    /// 
668    /// 优雅关闭 TimerWheel
669    /// 
670    /// # Examples (示例)
671    /// ```no_run
672    /// # use kestrel_timer::TimerWheel;
673    /// # #[tokio::main]
674    /// # async fn main() {
675    /// let timer = TimerWheel::with_defaults();
676    /// 
677    /// // Use timer... (使用定时器...)
678    /// 
679    /// timer.shutdown().await;
680    /// # }
681    /// ```
682    pub async fn shutdown(mut self) {
683        if let Some(handle) = self.tick_handle.take() {
684            handle.abort();
685            let _ = handle.await;
686        }
687    }
688}
689
690impl Drop for TimerWheel {
691    fn drop(&mut self) {
692        if let Some(handle) = self.tick_handle.take() {
693            handle.abort();
694        }
695    }
696}
697
698#[cfg(test)]
699mod tests {
700    use super::*;
701    use std::sync::atomic::{AtomicU32, Ordering};
702    use crate::task::TimerTask;
703    
704    #[tokio::test]
705    async fn test_timer_creation() {
706        let _timer = TimerWheel::with_defaults();
707    }
708
709    #[tokio::test]
710    async fn test_schedule_once() {
711        use std::sync::Arc;
712        let timer = TimerWheel::with_defaults();
713        let counter = Arc::new(AtomicU32::new(0));
714        let counter_clone = Arc::clone(&counter);
715
716        let task = TimerTask::new_oneshot(
717            Duration::from_millis(50),
718            Some(CallbackWrapper::new(move || {
719                let counter = Arc::clone(&counter_clone);
720                async move {
721                    counter.fetch_add(1, Ordering::SeqCst);
722                }
723            })),
724        );
725        let _handle = timer.register(task);
726
727        // Wait for timer to trigger
728        // 等待定时器触发
729        tokio::time::sleep(Duration::from_millis(100)).await;
730        assert_eq!(counter.load(Ordering::SeqCst), 1);
731    }
732
733    #[tokio::test]
734    async fn test_cancel_timer() {
735        use std::sync::Arc;
736        let timer = TimerWheel::with_defaults();
737        let counter = Arc::new(AtomicU32::new(0));
738        let counter_clone = Arc::clone(&counter);
739
740        let task = TimerTask::new_oneshot(
741            Duration::from_millis(100),
742            Some(CallbackWrapper::new(move || {
743                let counter = Arc::clone(&counter_clone);
744                async move {
745                    counter.fetch_add(1, Ordering::SeqCst);
746                }
747            })),
748        );
749        let handle = timer.register(task);
750
751        // Immediately cancel
752        // 立即取消
753        let cancel_result = handle.cancel();
754        assert!(cancel_result);
755
756        // Wait for enough time to ensure timer does not trigger
757        // 等待足够时间确保定时器不触发
758        tokio::time::sleep(Duration::from_millis(200)).await;
759        assert_eq!(counter.load(Ordering::SeqCst), 0);
760    }
761
762    #[tokio::test]
763    async fn test_cancel_immediate() {
764        use std::sync::Arc;
765        let timer = TimerWheel::with_defaults();
766        let counter = Arc::new(AtomicU32::new(0));
767        let counter_clone = Arc::clone(&counter);
768
769        let task = TimerTask::new_oneshot(
770            Duration::from_millis(100),
771            Some(CallbackWrapper::new(move || {
772                let counter = Arc::clone(&counter_clone);
773                async move {
774                    counter.fetch_add(1, Ordering::SeqCst);
775                }
776            })),
777        );
778        let handle = timer.register(task);
779
780        // Immediately cancel
781        // 立即取消
782        let cancel_result = handle.cancel();
783        assert!(cancel_result);
784
785        // Wait for enough time to ensure timer does not trigger
786        // 等待足够时间确保定时器不触发
787        tokio::time::sleep(Duration::from_millis(200)).await;
788        assert_eq!(counter.load(Ordering::SeqCst), 0);
789    }
790
791    #[tokio::test]
792    async fn test_postpone_timer() {
793        use std::sync::Arc;
794        let timer = TimerWheel::with_defaults();
795        let counter = Arc::new(AtomicU32::new(0));
796        let counter_clone = Arc::clone(&counter);
797
798        let task = TimerTask::new_oneshot(
799            Duration::from_millis(50),
800            Some(CallbackWrapper::new(move || {
801                let counter = Arc::clone(&counter_clone);
802                async move {
803                    counter.fetch_add(1, Ordering::SeqCst);
804                }
805            })),
806        );
807        let task_id = task.get_id();
808        let handle = timer.register(task);
809
810        // Postpone task to 150ms
811        // 推迟任务到 150ms
812        let postponed = timer.postpone(task_id, Duration::from_millis(150), None);
813        assert!(postponed);
814
815        // Wait for original time 50ms, task should not trigger
816        // 等待原始时间 50ms,任务不应触发
817        tokio::time::sleep(Duration::from_millis(70)).await;
818        assert_eq!(counter.load(Ordering::SeqCst), 0);
819
820        // Wait for new trigger time (from postponed start, need to wait about 150ms)
821        // 等待新的触发时间(从推迟开始算起,大约需要等待 150ms)
822        let (rx, _handle) = handle.into_parts();
823        let result = match rx {
824            crate::task::CompletionReceiver::OneShot(receiver) => {
825                tokio::time::timeout(
826                    Duration::from_millis(200),
827                    receiver.wait()
828                ).await
829            },
830            _ => panic!("Expected OneShot completion receiver"),
831        };
832        assert!(result.is_ok());
833        
834        // Wait for callback to execute
835        // 等待回调执行
836        tokio::time::sleep(Duration::from_millis(20)).await;
837        assert_eq!(counter.load(Ordering::SeqCst), 1);
838    }
839
840    #[tokio::test]
841    async fn test_postpone_with_callback() {
842        use std::sync::Arc;
843        let timer = TimerWheel::with_defaults();
844        let counter = Arc::new(AtomicU32::new(0));
845        let counter_clone1 = Arc::clone(&counter);
846        let counter_clone2 = Arc::clone(&counter);
847
848        // Create task, original callback adds 1
849        let task = TimerTask::new_oneshot(
850            Duration::from_millis(50),
851            Some(CallbackWrapper::new(move || {
852                let counter = Arc::clone(&counter_clone1);
853                async move {
854                    counter.fetch_add(1, Ordering::SeqCst);
855                }
856            })),
857        );
858        let task_id = task.get_id();
859        let handle = timer.register(task);
860
861        // Postpone task and replace callback, new callback adds 10
862        // 推迟任务并替换回调,新回调增加 10
863        let postponed = timer.postpone(
864            task_id,
865            Duration::from_millis(100),
866            Some(CallbackWrapper::new(move || {
867                let counter = Arc::clone(&counter_clone2);
868                async move {
869                    counter.fetch_add(10, Ordering::SeqCst);
870                }
871            })),
872        );
873        assert!(postponed);
874
875        // Wait for task to trigger (after postponed, need to wait 100ms, plus margin)
876        // 等待任务触发(推迟后,需要等待 100ms,加上余量)
877        let (rx, _handle) = handle.into_parts();
878        let result = match rx {
879            crate::task::CompletionReceiver::OneShot(receiver) => {
880                tokio::time::timeout(
881                    Duration::from_millis(200),
882                    receiver.wait()
883                ).await
884            },
885            _ => panic!("Expected OneShot completion receiver"),
886        };
887        assert!(result.is_ok());
888        
889        // Wait for callback to execute
890        // 等待回调执行
891        tokio::time::sleep(Duration::from_millis(20)).await;
892        
893        // Verify new callback is executed (increased 10 instead of 1)
894        // 验证新回调已执行(增加 10 而不是 1)
895        assert_eq!(counter.load(Ordering::SeqCst), 10);
896    }
897
898    #[tokio::test]
899    async fn test_postpone_nonexistent_timer() {
900        let timer = TimerWheel::with_defaults();
901        
902        // Try to postpone nonexistent task
903        // 尝试推迟一个不存在的任务
904        let fake_task = TimerTask::new_oneshot(Duration::from_millis(50), None);
905        let fake_task_id = fake_task.get_id();
906        // Do not register this task
907        // 不注册这个任务
908        let postponed = timer.postpone(fake_task_id, Duration::from_millis(100), None);
909        assert!(!postponed);
910    }
911
912    #[tokio::test]
913    async fn test_postpone_batch() {
914        use std::sync::Arc;
915        let timer = TimerWheel::with_defaults();
916        let counter = Arc::new(AtomicU32::new(0));
917
918        // Create 3 tasks
919        // 创建 3 个任务
920        let mut task_ids = Vec::new();
921        for _ in 0..3 {
922            let counter_clone = Arc::clone(&counter);
923            let task = TimerTask::new_oneshot(
924                Duration::from_millis(50),
925                Some(CallbackWrapper::new(move || {
926                    let counter = Arc::clone(&counter_clone);
927                    async move {
928                        counter.fetch_add(1, Ordering::SeqCst);
929                    }
930                })),
931            );
932            task_ids.push((task.get_id(), Duration::from_millis(150)));
933            timer.register(task);
934        }
935
936        // Batch postpone
937        // 批量推迟
938        let postponed = timer.postpone_batch(task_ids);
939        assert_eq!(postponed, 3);
940
941        // Wait for original time 50ms, task should not trigger
942        // 等待原始时间 50ms,任务不应触发
943        tokio::time::sleep(Duration::from_millis(70)).await;
944        assert_eq!(counter.load(Ordering::SeqCst), 0);
945
946        // Wait for new trigger time (from postponed start, need to wait about 150ms)
947        // 等待新的触发时间(从推迟开始算起,大约需要等待 150ms)
948        tokio::time::sleep(Duration::from_millis(200)).await;
949        
950        // Wait for callback to execute
951        // 等待回调执行
952        tokio::time::sleep(Duration::from_millis(20)).await;
953        assert_eq!(counter.load(Ordering::SeqCst), 3);
954    }
955
956    #[tokio::test]
957    async fn test_postpone_batch_with_callbacks() {
958        use std::sync::Arc;
959        let timer = TimerWheel::with_defaults();
960        let counter = Arc::new(AtomicU32::new(0));
961
962        // Create 3 tasks
963        // 创建 3 个任务
964        let mut task_ids = Vec::new();
965        for _ in 0..3 {
966            let task = TimerTask::new_oneshot(
967                Duration::from_millis(50),
968                None
969            );
970            task_ids.push(task.get_id());
971            timer.register(task);
972        }
973
974        // Batch postpone and replace callbacks
975        // 批量推迟并替换回调
976        let updates: Vec<_> = task_ids
977            .into_iter()
978            .map(|id| {
979                let counter_clone = Arc::clone(&counter);
980                (id, Duration::from_millis(150), Some(CallbackWrapper::new(move || {
981                    let counter = Arc::clone(&counter_clone);
982                    async move {
983                        counter.fetch_add(1, Ordering::SeqCst);
984                    }
985                })))
986            })
987            .collect();
988
989        // Batch postpone and replace callbacks
990        // 批量推迟并替换回调
991        let postponed = timer.postpone_batch_with_callbacks(updates);
992        assert_eq!(postponed, 3);
993
994        // Wait for original time 50ms, task should not trigger
995        // 等待原始时间 50ms,任务不应触发
996        tokio::time::sleep(Duration::from_millis(70)).await;
997        assert_eq!(counter.load(Ordering::SeqCst), 0);
998
999        // Wait for new trigger time (from postponed start, need to wait about 150ms)
1000        // 等待新的触发时间(从推迟开始算起,大约需要等待 150ms)
1001        tokio::time::sleep(Duration::from_millis(200)).await;
1002        
1003        // Wait for callback to execute
1004        // 等待回调执行
1005        tokio::time::sleep(Duration::from_millis(20)).await;
1006        assert_eq!(counter.load(Ordering::SeqCst), 3);
1007    }
1008
1009    #[tokio::test]
1010    async fn test_postpone_keeps_completion_receiver_valid() {
1011        use std::sync::Arc;
1012        let timer = TimerWheel::with_defaults();
1013        let counter = Arc::new(AtomicU32::new(0));
1014        let counter_clone = Arc::clone(&counter);
1015
1016        let task = TimerTask::new_oneshot(
1017            Duration::from_millis(50),
1018            Some(CallbackWrapper::new(move || {
1019                let counter = Arc::clone(&counter_clone);
1020                async move {
1021                    counter.fetch_add(1, Ordering::SeqCst);
1022                }
1023            })),
1024        );
1025        let task_id = task.get_id();
1026        let handle = timer.register(task);
1027
1028        // Postpone task
1029        // 推迟任务
1030        timer.postpone(task_id, Duration::from_millis(100), None);
1031
1032        // Verify original completion_receiver is still valid (after postponed, need to wait 100ms, plus margin)
1033        // 验证原始完成接收器是否仍然有效(推迟后,需要等待 100ms,加上余量)
1034        let (rx, _handle) = handle.into_parts();
1035        let result = match rx {
1036            crate::task::CompletionReceiver::OneShot(receiver) => {
1037                tokio::time::timeout(
1038                    Duration::from_millis(200),
1039                    receiver.wait()
1040                ).await
1041            },
1042            _ => panic!("Expected OneShot completion receiver"),
1043        };
1044        assert!(result.is_ok(), "Completion receiver should still work after postpone");
1045        
1046        // Wait for callback to execute
1047        tokio::time::sleep(Duration::from_millis(20)).await;
1048        assert_eq!(counter.load(Ordering::SeqCst), 1);
1049    }
1050
1051    // ==================== Periodic Task Tests ====================
1052    // ==================== 周期任务测试 ====================
1053
1054    #[tokio::test]
1055    async fn test_periodic_basic() {
1056        use std::sync::Arc;
1057        let timer = TimerWheel::with_defaults();
1058        let counter = Arc::new(AtomicU32::new(0));
1059        let counter_clone = Arc::clone(&counter);
1060
1061        // Create periodic task that triggers every 50ms
1062        // 创建每 50ms 触发一次的周期任务
1063        let task = TimerTask::new_periodic(
1064            Duration::from_millis(50),  // initial delay
1065            Duration::from_millis(50),  // interval
1066            Some(CallbackWrapper::new(move || {
1067                let counter = Arc::clone(&counter_clone);
1068                async move {
1069                    counter.fetch_add(1, Ordering::SeqCst);
1070                }
1071            })),
1072            None,  // use default buffer size
1073        );
1074        let (mut rx, _handle) = timer.register(task).into_parts();
1075
1076        // Wait for 3 periodic executions (50ms initial + 50ms * 2)
1077        // 等待 3 次周期执行(50ms 初始 + 50ms * 2)
1078        tokio::time::sleep(Duration::from_millis(200)).await;
1079        
1080        // Verify callback was triggered multiple times
1081        // 验证回调被多次触发
1082        let count = counter.load(Ordering::SeqCst);
1083        assert!(count >= 3, "Expected at least 3 executions, got {}", count);
1084        
1085        // Verify completion notifications were sent
1086        // 验证完成通知已发送
1087        match rx {
1088            crate::task::CompletionReceiver::Periodic(ref mut receiver) => {
1089                let mut notification_count = 0;
1090                while let Ok(completion) = receiver.try_recv() {
1091                    assert_eq!(completion, crate::task::TaskCompletion::Called);
1092                    notification_count += 1;
1093                }
1094                assert!(notification_count >= 3, "Expected at least 3 notifications, got {}", notification_count);
1095            },
1096            _ => panic!("Expected Periodic completion receiver"),
1097        }
1098    }
1099
1100    #[tokio::test]
1101    async fn test_periodic_cancel() {
1102        use std::sync::Arc;
1103        let timer = TimerWheel::with_defaults();
1104        let counter = Arc::new(AtomicU32::new(0));
1105        let counter_clone = Arc::clone(&counter);
1106
1107        // Create periodic task
1108        // 创建周期任务
1109        let task = TimerTask::new_periodic(
1110            Duration::from_millis(50),
1111            Duration::from_millis(50),
1112            Some(CallbackWrapper::new(move || {
1113                let counter = Arc::clone(&counter_clone);
1114                async move {
1115                    counter.fetch_add(1, Ordering::SeqCst);
1116                }
1117            })),
1118            None,
1119        );
1120        let handle = timer.register(task);
1121
1122        // Wait for first execution
1123        // 等待第一次执行
1124        tokio::time::sleep(Duration::from_millis(80)).await;
1125        let count_before_cancel = counter.load(Ordering::SeqCst);
1126        assert!(count_before_cancel >= 1, "Expected at least 1 execution before cancel");
1127
1128        // Cancel periodic task
1129        // 取消周期任务
1130        let cancelled = handle.cancel();
1131        assert!(cancelled);
1132
1133        // Wait some more time and verify task stopped
1134        // 等待更多时间并验证任务已停止
1135        tokio::time::sleep(Duration::from_millis(150)).await;
1136        let count_after_cancel = counter.load(Ordering::SeqCst);
1137        
1138        // Count should not increase significantly after cancellation
1139        // 取消后计数不应该显著增加
1140        assert!(
1141            count_after_cancel - count_before_cancel <= 1,
1142            "Task should stop after cancel, before: {}, after: {}",
1143            count_before_cancel,
1144            count_after_cancel
1145        );
1146    }
1147
1148    #[tokio::test]
1149    async fn test_periodic_cancel_notification() {
1150        use std::sync::Arc;
1151        let timer = TimerWheel::with_defaults();
1152        let counter = Arc::new(AtomicU32::new(0));
1153        let counter_clone = Arc::clone(&counter);
1154
1155        // Create periodic task
1156        // 创建周期任务
1157        let task = TimerTask::new_periodic(
1158            Duration::from_millis(50),
1159            Duration::from_millis(50),
1160            Some(CallbackWrapper::new(move || {
1161                let counter = Arc::clone(&counter_clone);
1162                async move {
1163                    counter.fetch_add(1, Ordering::SeqCst);
1164                }
1165            })),
1166            None,
1167        );
1168        let (mut rx, handle) = timer.register(task).into_parts();
1169
1170        // Wait for first execution
1171        // 等待第一次执行
1172        tokio::time::sleep(Duration::from_millis(80)).await;
1173
1174        // Cancel the task
1175        // 取消任务
1176        let cancelled = handle.cancel();
1177        assert!(cancelled);
1178
1179        // Wait a bit for the cancel notification
1180        // 等待取消通知
1181        tokio::time::sleep(Duration::from_millis(50)).await;
1182
1183        // Verify we received cancellation notification
1184        // 验证收到取消通知
1185        match rx {
1186            crate::task::CompletionReceiver::Periodic(ref mut receiver) => {
1187                let mut found_cancelled = false;
1188                while let Ok(completion) = receiver.try_recv() {
1189                    if completion == crate::task::TaskCompletion::Cancelled {
1190                        found_cancelled = true;
1191                        break;
1192                    }
1193                }
1194                assert!(found_cancelled, "Expected to receive Cancelled notification");
1195            },
1196            _ => panic!("Expected Periodic completion receiver"),
1197        }
1198    }
1199
1200    #[tokio::test]
1201    async fn test_periodic_postpone() {
1202        use std::sync::Arc;
1203        let timer = TimerWheel::with_defaults();
1204        let counter = Arc::new(AtomicU32::new(0));
1205        let counter_clone = Arc::clone(&counter);
1206
1207        // Create periodic task with 50ms initial delay and 50ms interval
1208        // 创建初始延迟 50ms 和间隔 50ms 的周期任务
1209        let task = TimerTask::new_periodic(
1210            Duration::from_millis(50),
1211            Duration::from_millis(50),
1212            Some(CallbackWrapper::new(move || {
1213                let counter = Arc::clone(&counter_clone);
1214                async move {
1215                    counter.fetch_add(1, Ordering::SeqCst);
1216                }
1217            })),
1218            None,
1219        );
1220        let task_id = task.get_id();
1221        let _handle = timer.register(task);
1222
1223        // Immediately postpone the task to 150ms
1224        // 立即推迟任务到 150ms
1225        let postponed = timer.postpone(task_id, Duration::from_millis(150), None);
1226        assert!(postponed);
1227
1228        // Wait original time, should not trigger
1229        // 等待原始时间,不应触发
1230        tokio::time::sleep(Duration::from_millis(80)).await;
1231        assert_eq!(counter.load(Ordering::SeqCst), 0, "Task should not trigger at original time");
1232
1233        // Wait for postponed time
1234        // 等待推迟后的时间
1235        tokio::time::sleep(Duration::from_millis(150)).await;
1236        
1237        // Verify task started executing
1238        // 验证任务已开始执行
1239        let count = counter.load(Ordering::SeqCst);
1240        assert!(count >= 1, "Task should trigger after postpone, got count: {}", count);
1241    }
1242
1243    #[tokio::test]
1244    async fn test_periodic_postpone_with_callback() {
1245        use std::sync::Arc;
1246        let timer = TimerWheel::with_defaults();
1247        let counter = Arc::new(AtomicU32::new(0));
1248        let counter_clone1 = Arc::clone(&counter);
1249        let counter_clone2 = Arc::clone(&counter);
1250
1251        // Create periodic task, original callback adds 1
1252        // 创建周期任务,原始回调增加 1
1253        let task = TimerTask::new_periodic(
1254            Duration::from_millis(50),
1255            Duration::from_millis(50),
1256            Some(CallbackWrapper::new(move || {
1257                let counter = Arc::clone(&counter_clone1);
1258                async move {
1259                    counter.fetch_add(1, Ordering::SeqCst);
1260                }
1261            })),
1262            None,
1263        );
1264        let task_id = task.get_id();
1265        let _handle = timer.register(task);
1266
1267        // Postpone task and replace callback, new callback adds 10
1268        // 推迟任务并替换回调,新回调增加 10
1269        let postponed = timer.postpone(
1270            task_id,
1271            Duration::from_millis(100),
1272            Some(CallbackWrapper::new(move || {
1273                let counter = Arc::clone(&counter_clone2);
1274                async move {
1275                    counter.fetch_add(10, Ordering::SeqCst);
1276                }
1277            })),
1278        );
1279        assert!(postponed);
1280
1281        // Wait for task to trigger
1282        // 等待任务触发
1283        tokio::time::sleep(Duration::from_millis(200)).await;
1284        
1285        let count = counter.load(Ordering::SeqCst);
1286        // Should be at least 10 (new callback), not 1 (old callback)
1287        // 应该至少是 10(新回调),而不是 1(旧回调)
1288        assert!(count >= 10, "New callback should be used, got count: {}", count);
1289        
1290        // Verify it's a multiple of 10 (new callback)
1291        // 验证是 10 的倍数(新回调)
1292        assert_eq!(count % 10, 0, "Count should be multiple of 10, got: {}", count);
1293    }
1294
1295    #[tokio::test]
1296    async fn test_periodic_batch_cancel() {
1297        use std::sync::Arc;
1298        let timer = TimerWheel::with_defaults();
1299        let counter = Arc::new(AtomicU32::new(0));
1300
1301        // Create multiple periodic tasks
1302        // 创建多个周期任务
1303        let mut task_ids = Vec::new();
1304        for _ in 0..3 {
1305            let counter_clone = Arc::clone(&counter);
1306            let task = TimerTask::new_periodic(
1307                Duration::from_millis(50),
1308                Duration::from_millis(50),
1309                Some(CallbackWrapper::new(move || {
1310                    let counter = Arc::clone(&counter_clone);
1311                    async move {
1312                        counter.fetch_add(1, Ordering::SeqCst);
1313                    }
1314                })),
1315                None,
1316            );
1317            task_ids.push(task.get_id());
1318            timer.register(task);
1319        }
1320
1321        // Wait for first execution
1322        // 等待第一次执行
1323        tokio::time::sleep(Duration::from_millis(80)).await;
1324        let count_before_cancel = counter.load(Ordering::SeqCst);
1325        assert!(count_before_cancel >= 3, "Expected at least 3 executions before cancel");
1326
1327        // Batch cancel
1328        // 批量取消
1329        let cancelled = timer.cancel_batch(&task_ids);
1330        assert_eq!(cancelled, 3);
1331
1332        // Wait and verify tasks stopped
1333        // 等待并验证任务已停止
1334        tokio::time::sleep(Duration::from_millis(150)).await;
1335        let count_after_cancel = counter.load(Ordering::SeqCst);
1336        
1337        // Count should not increase significantly after cancellation
1338        // 取消后计数不应该显著增加
1339        assert!(
1340            count_after_cancel - count_before_cancel <= 3,
1341            "Tasks should stop after cancel, before: {}, after: {}",
1342            count_before_cancel,
1343            count_after_cancel
1344        );
1345    }
1346
1347    #[tokio::test]
1348    async fn test_periodic_batch_postpone() {
1349        use std::sync::Arc;
1350        let timer = TimerWheel::with_defaults();
1351        let counter = Arc::new(AtomicU32::new(0));
1352
1353        // Create 3 periodic tasks
1354        // 创建 3 个周期任务
1355        let mut postpone_updates = Vec::new();
1356        for _ in 0..3 {
1357            let counter_clone = Arc::clone(&counter);
1358            let task = TimerTask::new_periodic(
1359                Duration::from_millis(50),
1360                Duration::from_millis(50),
1361                Some(CallbackWrapper::new(move || {
1362                    let counter = Arc::clone(&counter_clone);
1363                    async move {
1364                        counter.fetch_add(1, Ordering::SeqCst);
1365                    }
1366                })),
1367                None,
1368            );
1369            postpone_updates.push((task.get_id(), Duration::from_millis(150)));
1370            timer.register(task);
1371        }
1372
1373        // Batch postpone
1374        // 批量推迟
1375        let postponed = timer.postpone_batch(postpone_updates);
1376        assert_eq!(postponed, 3);
1377
1378        // Wait original time, should not trigger
1379        // 等待原始时间,不应触发
1380        tokio::time::sleep(Duration::from_millis(80)).await;
1381        assert_eq!(counter.load(Ordering::SeqCst), 0, "Tasks should not trigger at original time");
1382
1383        // Wait for postponed time
1384        // 等待推迟后的时间
1385        tokio::time::sleep(Duration::from_millis(150)).await;
1386        
1387        let count = counter.load(Ordering::SeqCst);
1388        assert!(count >= 3, "All tasks should trigger after postpone, got count: {}", count);
1389    }
1390
1391    #[tokio::test]
1392    async fn test_periodic_completion_receiver() {
1393        use std::sync::Arc;
1394        let timer = TimerWheel::with_defaults();
1395        let counter = Arc::new(AtomicU32::new(0));
1396        let counter_clone = Arc::clone(&counter);
1397
1398        // Create periodic task
1399        // 创建周期任务
1400        let task = TimerTask::new_periodic(
1401            Duration::from_millis(50),
1402            Duration::from_millis(50),
1403            Some(CallbackWrapper::new(move || {
1404                let counter = Arc::clone(&counter_clone);
1405                async move {
1406                    counter.fetch_add(1, Ordering::SeqCst);
1407                }
1408            })),
1409            None,
1410        );
1411        let (mut rx, _handle) = timer.register(task).into_parts();
1412
1413        // Continuously receive completion notifications
1414        // 持续接收完成通知
1415        let mut notification_count = 0;
1416        match rx {
1417            crate::task::CompletionReceiver::Periodic(ref mut receiver) => {
1418                // Receive notifications for ~200ms
1419                // 接收约 200ms 的通知
1420                let timeout = tokio::time::sleep(Duration::from_millis(200));
1421                tokio::pin!(timeout);
1422                
1423                loop {
1424                    tokio::select! {
1425                        _ = &mut timeout => break,
1426                        result = receiver.recv() => {
1427                            match result {
1428                                Some(completion) => {
1429                                    assert_eq!(completion, crate::task::TaskCompletion::Called);
1430                                    notification_count += 1;
1431                                }
1432                                None => break,
1433                            }
1434                        }
1435                    }
1436                }
1437            },
1438            _ => panic!("Expected Periodic completion receiver"),
1439        }
1440
1441        // Should receive at least 3 notifications
1442        // 应该至少收到 3 次通知
1443        assert!(notification_count >= 3, "Expected at least 3 notifications, got {}", notification_count);
1444    }
1445
1446    #[tokio::test]
1447    async fn test_periodic_batch_register() {
1448        use std::sync::Arc;
1449        let timer = TimerWheel::with_defaults();
1450        let counter = Arc::new(AtomicU32::new(0));
1451
1452        // Create batch of periodic tasks
1453        // 创建批量周期任务
1454        let tasks: Vec<_> = (0..3)
1455            .map(|_| {
1456                let counter_clone = Arc::clone(&counter);
1457                TimerTask::new_periodic(
1458                    Duration::from_millis(50),
1459                    Duration::from_millis(50),
1460                    Some(CallbackWrapper::new(move || {
1461                        let counter = Arc::clone(&counter_clone);
1462                        async move {
1463                            counter.fetch_add(1, Ordering::SeqCst);
1464                        }
1465                    })),
1466                    None,
1467                )
1468            })
1469            .collect();
1470
1471        let batch_handle = timer.register_batch(tasks);
1472        assert_eq!(batch_handle.len(), 3);
1473
1474        // Wait for executions
1475        // 等待执行
1476        tokio::time::sleep(Duration::from_millis(200)).await;
1477        
1478        let count = counter.load(Ordering::SeqCst);
1479        // Each task should execute at least 3 times, total at least 9
1480        // 每个任务应至少执行 3 次,总共至少 9 次
1481        assert!(count >= 9, "Expected at least 9 total executions, got {}", count);
1482    }
1483}
1484