kestrel_protocol_timer/
timer.rs

1use crate::error::TimerError;
2use crate::task::{CallbackWrapper, CompletionNotifier, TaskId, TimerCallback, TimerTask};
3use crate::wheel::Wheel;
4use parking_lot::Mutex;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::oneshot;
8use tokio::task::JoinHandle;
9
10/// 完成通知接收器,用于接收定时器完成通知
11pub struct CompletionReceiver(pub oneshot::Receiver<()>);
12
13/// 定时器句柄,用于管理定时器的生命周期
14/// 
15/// 注意:此类型不实现 Clone,以防止重复取消同一个定时器。
16/// 每个定时器只应有一个所有者。
17pub struct TimerHandle {
18    pub(crate) task_id: TaskId,
19    pub(crate) wheel: Arc<Mutex<Wheel>>,
20    pub(crate) completion_rx: CompletionReceiver,
21}
22
23impl TimerHandle {
24    pub(crate) fn new(task_id: TaskId, wheel: Arc<Mutex<Wheel>>, completion_rx: oneshot::Receiver<()>) -> Self {
25        Self { task_id, wheel, completion_rx: CompletionReceiver(completion_rx) }
26    }
27
28    /// 取消定时器
29    ///
30    /// # 返回
31    /// 如果任务存在且成功取消返回 true,否则返回 false
32    ///
33    /// # 示例
34    /// ```no_run
35    /// # use timer::TimerWheel;
36    /// # use std::time::Duration;
37    /// # #[tokio::main]
38    /// # async fn main() {
39    /// let timer = TimerWheel::with_defaults().unwrap();
40    /// let handle = timer.schedule_once(Duration::from_secs(1), || async {}).await.unwrap();
41    /// 
42    /// // 取消定时器
43    /// let success = handle.cancel();
44    /// println!("取消成功: {}", success);
45    /// # }
46    /// ```
47    pub fn cancel(&self) -> bool {
48        let mut wheel = self.wheel.lock();
49        wheel.cancel(self.task_id)
50    }
51
52    /// 获取任务 ID
53    pub fn task_id(&self) -> TaskId {
54        self.task_id
55    }
56
57    /// 获取完成通知接收器的可变引用
58    ///
59    /// # 示例
60    /// ```no_run
61    /// # use timer::TimerWheel;
62    /// # use std::time::Duration;
63    /// # #[tokio::main]
64    /// # async fn main() {
65    /// let timer = TimerWheel::with_defaults().unwrap();
66    /// let handle = timer.schedule_once(Duration::from_secs(1), || async {
67    ///     println!("Timer fired!");
68    /// }).await.unwrap();
69    /// 
70    /// // 等待定时器完成(使用 into_completion_receiver 消耗句柄)
71    /// handle.into_completion_receiver().0.await.ok();
72    /// println!("Timer completed!");
73    /// # }
74    /// ```
75    pub fn completion_receiver(&mut self) -> &mut CompletionReceiver {
76        &mut self.completion_rx
77    }
78
79    /// 消耗句柄,返回完成通知接收器
80    ///
81    /// # 示例
82    /// ```no_run
83    /// # use timer::TimerWheel;
84    /// # use std::time::Duration;
85    /// # #[tokio::main]
86    /// # async fn main() {
87    /// let timer = TimerWheel::with_defaults().unwrap();
88    /// let handle = timer.schedule_once(Duration::from_secs(1), || async {
89    ///     println!("Timer fired!");
90    /// }).await.unwrap();
91    /// 
92    /// // 等待定时器完成
93    /// handle.into_completion_receiver().0.await.ok();
94    /// println!("Timer completed!");
95    /// # }
96    /// ```
97    pub fn into_completion_receiver(self) -> CompletionReceiver {
98        self.completion_rx
99    }
100}
101
102/// 批量定时器句柄,用于管理批量调度的定时器
103/// 
104/// 通过共享 Wheel 引用减少内存开销,同时提供批量操作和迭代器访问能力。
105/// 
106/// 注意:此类型不实现 Clone,以防止重复取消同一批定时器。
107/// 如需访问单个定时器句柄,请使用 `into_iter()` 或 `into_handles()` 进行转换。
108pub struct BatchHandle {
109    pub(crate) task_ids: Vec<TaskId>,
110    pub(crate) wheel: Arc<Mutex<Wheel>>,
111    pub(crate) completion_rxs: Vec<oneshot::Receiver<()>>,
112}
113
114impl BatchHandle {
115    pub(crate) fn new(task_ids: Vec<TaskId>, wheel: Arc<Mutex<Wheel>>, completion_rxs: Vec<oneshot::Receiver<()>>) -> Self {
116        Self { task_ids, wheel, completion_rxs }
117    }
118
119    /// 批量取消所有定时器
120    ///
121    /// # 返回
122    /// 成功取消的任务数量
123    ///
124    /// # 示例
125    /// ```no_run
126    /// # use timer::TimerWheel;
127    /// # use std::time::Duration;
128    /// # #[tokio::main]
129    /// # async fn main() {
130    /// let timer = TimerWheel::with_defaults().unwrap();
131    /// let callbacks: Vec<_> = (0..10)
132    ///     .map(|_| (Duration::from_secs(1), || async {}))
133    ///     .collect();
134    /// let batch = timer.schedule_once_batch(callbacks).await.unwrap();
135    /// 
136    /// let cancelled = batch.cancel_all();
137    /// println!("取消了 {} 个定时器", cancelled);
138    /// # }
139    /// ```
140    pub fn cancel_all(self) -> usize {
141        let mut wheel = self.wheel.lock();
142        wheel.cancel_batch(&self.task_ids)
143    }
144
145    /// 将批量句柄转换为单个定时器句柄的 Vec
146    ///
147    /// 消耗 BatchHandle,为每个任务创建独立的 TimerHandle。
148    ///
149    /// # 示例
150    /// ```no_run
151    /// # use timer::TimerWheel;
152    /// # use std::time::Duration;
153    /// # #[tokio::main]
154    /// # async fn main() {
155    /// let timer = TimerWheel::with_defaults().unwrap();
156    /// let callbacks: Vec<_> = (0..3)
157    ///     .map(|_| (Duration::from_secs(1), || async {}))
158    ///     .collect();
159    /// let batch = timer.schedule_once_batch(callbacks).await.unwrap();
160    /// 
161    /// // 转换为独立的句柄
162    /// let handles = batch.into_handles();
163    /// for handle in handles {
164    ///     // 可以单独操作每个句柄
165    /// }
166    /// # }
167    /// ```
168    pub fn into_handles(self) -> Vec<TimerHandle> {
169        self.task_ids
170            .into_iter()
171            .zip(self.completion_rxs.into_iter())
172            .map(|(task_id, rx)| {
173                TimerHandle::new(task_id, self.wheel.clone(), rx)
174            })
175            .collect()
176    }
177
178    /// 获取批量任务的数量
179    pub fn len(&self) -> usize {
180        self.task_ids.len()
181    }
182
183    /// 检查批量任务是否为空
184    pub fn is_empty(&self) -> bool {
185        self.task_ids.is_empty()
186    }
187
188    /// 获取所有任务 ID 的引用
189    pub fn task_ids(&self) -> &[TaskId] {
190        &self.task_ids
191    }
192
193    /// 获取所有完成通知接收器的引用
194    ///
195    /// # 返回
196    /// 所有任务的完成通知接收器列表引用
197    pub fn completion_receivers(&mut self) -> &mut Vec<oneshot::Receiver<()>> {
198        &mut self.completion_rxs
199    }
200
201    /// 消耗句柄,返回所有完成通知接收器
202    ///
203    /// # 返回
204    /// 所有任务的完成通知接收器列表
205    ///
206    /// # 示例
207    /// ```no_run
208    /// # use timer::TimerWheel;
209    /// # use std::time::Duration;
210    /// # #[tokio::main]
211    /// # async fn main() {
212    /// let timer = TimerWheel::with_defaults().unwrap();
213    /// let callbacks: Vec<_> = (0..3)
214    ///     .map(|_| (Duration::from_secs(1), || async {}))
215    ///     .collect();
216    /// let batch = timer.schedule_once_batch(callbacks).await.unwrap();
217    /// 
218    /// // 获取所有完成通知接收器
219    /// let receivers = batch.into_completion_receivers();
220    /// for rx in receivers {
221    ///     tokio::spawn(async move {
222    ///         if rx.await.is_ok() {
223    ///             println!("A timer completed!");
224    ///         }
225    ///     });
226    /// }
227    /// # }
228    /// ```
229    pub fn into_completion_receivers(self) -> Vec<oneshot::Receiver<()>> {
230        self.completion_rxs
231    }
232}
233
234/// 实现 IntoIterator,允许直接迭代 BatchHandle
235/// 
236/// # 示例
237/// ```no_run
238/// # use timer::TimerWheel;
239/// # use std::time::Duration;
240/// # #[tokio::main]
241/// # async fn main() {
242/// let timer = TimerWheel::with_defaults().unwrap();
243/// let callbacks: Vec<_> = (0..3)
244///     .map(|_| (Duration::from_secs(1), || async {}))
245///     .collect();
246/// let batch = timer.schedule_once_batch(callbacks).await.unwrap();
247/// 
248/// // 直接迭代,每个元素都是独立的 TimerHandle
249/// for handle in batch {
250///     // 可以单独操作每个句柄
251/// }
252/// # }
253/// ```
254impl IntoIterator for BatchHandle {
255    type Item = TimerHandle;
256    type IntoIter = BatchHandleIter;
257
258    fn into_iter(self) -> Self::IntoIter {
259        BatchHandleIter {
260            task_ids: self.task_ids.into_iter(),
261            completion_rxs: self.completion_rxs.into_iter(),
262            wheel: self.wheel,
263        }
264    }
265}
266
267/// BatchHandle 的迭代器
268pub struct BatchHandleIter {
269    task_ids: std::vec::IntoIter<TaskId>,
270    completion_rxs: std::vec::IntoIter<oneshot::Receiver<()>>,
271    wheel: Arc<Mutex<Wheel>>,
272}
273
274impl Iterator for BatchHandleIter {
275    type Item = TimerHandle;
276
277    fn next(&mut self) -> Option<Self::Item> {
278        match (self.task_ids.next(), self.completion_rxs.next()) {
279            (Some(task_id), Some(rx)) => {
280                Some(TimerHandle::new(task_id, self.wheel.clone(), rx))
281            }
282            _ => None,
283        }
284    }
285
286    fn size_hint(&self) -> (usize, Option<usize>) {
287        self.task_ids.size_hint()
288    }
289}
290
291impl ExactSizeIterator for BatchHandleIter {
292    fn len(&self) -> usize {
293        self.task_ids.len()
294    }
295}
296
297/// 时间轮定时器管理器
298pub struct TimerWheel {
299    /// 时间轮唯一标识符
300    
301    /// 时间轮实例(使用 Arc<Mutex> 包装以支持多线程访问)
302    wheel: Arc<Mutex<Wheel>>,
303    
304    /// 后台 tick 循环任务句柄
305    tick_handle: Option<JoinHandle<()>>,
306}
307
308impl TimerWheel {
309    /// 创建新的定时器管理器
310    ///
311    /// # 参数
312    /// - `tick_duration`: 每个 tick 的时间长度(建议 10ms)
313    /// - `slot_count`: 槽位数量(必须是 2 的幂次方,建议 512 或 1024)
314    ///
315    /// # 返回
316    /// - `Ok(Self)`: 成功创建定时器管理器
317    /// - `Err(TimerError)`: 槽位数量无效
318    ///
319    /// # 示例
320    /// ```no_run
321    /// use timer::TimerWheel;
322    /// use std::time::Duration;
323    ///
324    /// #[tokio::main]
325    /// async fn main() {
326    ///     let timer = TimerWheel::new(Duration::from_millis(10), 512).unwrap();
327    /// }
328    /// ```
329    pub fn new(tick_duration: Duration, slot_count: usize) -> Result<Self, TimerError> {
330        let wheel = Wheel::new(tick_duration, slot_count)?;
331        let wheel = Arc::new(Mutex::new(wheel));
332        let wheel_clone = wheel.clone();
333
334        // 启动后台 tick 循环
335        let tick_handle = tokio::spawn(async move {
336            Self::tick_loop(wheel_clone, tick_duration).await;
337        });
338
339        Ok(Self {
340            wheel,
341            tick_handle: Some(tick_handle),
342        })
343    }
344
345    /// 创建带默认配置的定时器管理器
346    /// - tick 时长: 10ms
347    /// - 槽位数量: 512
348    ///
349    /// # 返回
350    /// - `Ok(Self)`: 成功创建定时器管理器
351    /// - `Err(TimerError)`: 创建失败(不太可能,因为使用的是有效的默认值)
352    pub fn with_defaults() -> Result<Self, TimerError> {
353        Self::new(Duration::from_millis(10), 512)
354    }
355
356    /// 创建与此时间轮绑定的 TimerService
357    ///
358    /// # 返回
359    /// 绑定到此时间轮的 TimerService 实例
360    ///
361    /// # 示例
362    /// ```no_run
363    /// use timer::TimerWheel;
364    /// use std::time::Duration;
365    ///
366    /// #[tokio::main]
367    /// async fn main() {
368    ///     let timer = TimerWheel::with_defaults().unwrap();
369    ///     let mut service = timer.create_service();
370    ///     
371    ///     // 直接通过 service 批量调度定时器
372    ///     let callbacks: Vec<_> = (0..5)
373    ///         .map(|_| (Duration::from_millis(100), || async {}))
374    ///         .collect();
375    ///     service.schedule_once_batch(callbacks).await.unwrap();
376    ///     
377    ///     // 接收超时通知
378    ///     let mut rx = service.take_receiver().unwrap();
379    ///     while let Some(task_id) = rx.recv().await {
380    ///         println!("Task {:?} completed", task_id);
381    ///     }
382    /// }
383    /// ```
384    pub fn create_service(&self) -> crate::service::TimerService {
385        crate::service::TimerService::new(self.wheel.clone())
386    }
387
388    /// 内部辅助方法:创建定时器句柄
389    /// 
390    /// 由 TimerWheel 和 TimerService 共用
391    pub(crate) fn create_timer_handle_internal(
392        wheel: &Arc<Mutex<Wheel>>,
393        delay: Duration,
394        callback: Option<CallbackWrapper>,
395    ) -> Result<TimerHandle, TimerError> {
396        let (completion_tx, completion_rx) = oneshot::channel();
397        let notifier = CompletionNotifier(completion_tx);
398        
399        let task = TimerTask::once(0, 0, callback, notifier);
400        
401        let task_id = {
402            let mut wheel_guard = wheel.lock();
403            wheel_guard.insert(delay, task)
404        };
405        
406        Ok(TimerHandle::new(task_id, wheel.clone(), completion_rx))
407    }
408
409    /// 内部辅助方法:创建批量定时器句柄
410    /// 
411    /// 由 TimerWheel 和 TimerService 共用
412    pub(crate) fn create_batch_handle_internal<C>(
413        wheel: &Arc<Mutex<Wheel>>,
414        callbacks: Vec<(Duration, C)>,
415    ) -> Result<BatchHandle, TimerError>
416    where
417        C: TimerCallback,
418    {
419        use std::sync::Arc;
420        let mut completion_rxs = Vec::with_capacity(callbacks.len());
421        
422        let tasks: Vec<(Duration, TimerTask)> = callbacks
423            .into_iter()
424            .map(|(delay, callback)| {
425                let callback_wrapper = Arc::new(callback) as CallbackWrapper;
426                let (completion_tx, completion_rx) = oneshot::channel();
427                completion_rxs.push(completion_rx);
428                let notifier = CompletionNotifier(completion_tx);
429                let task = TimerTask::once(0, 0, Some(callback_wrapper), notifier);
430                (delay, task)
431            })
432            .collect();
433        
434        let task_ids = {
435            let mut wheel_guard = wheel.lock();
436            wheel_guard.insert_batch(tasks)
437        };
438        
439        Ok(BatchHandle::new(task_ids, wheel.clone(), completion_rxs))
440    }
441
442    /// 调度一次性定时器
443    ///
444    /// # 参数
445    /// - `delay`: 延迟时间
446    /// - `callback`: 实现了 TimerCallback trait 的回调对象
447    ///
448    /// # 返回
449    /// - `Ok(TimerHandle)`: 成功调度,返回定时器句柄,可用于取消定时器
450    /// - `Err(TimerError)`: 内部错误
451    ///
452    /// # 示例
453    /// ```no_run
454    /// use timer::TimerWheel;
455    /// use std::time::Duration;
456    /// use std::sync::Arc;
457    ///
458    /// #[tokio::main]
459    /// async fn main() {
460    ///     let timer = TimerWheel::with_defaults().unwrap();
461    ///     
462    ///     let handle = timer.schedule_once(Duration::from_secs(1), || async {
463    ///         println!("Timer fired!");
464    ///     }).await.unwrap();
465    ///     
466    ///     tokio::time::sleep(Duration::from_secs(2)).await;
467    /// }
468    /// ```
469    pub async fn schedule_once<C>(&self, delay: Duration, callback: C) -> Result<TimerHandle, TimerError>
470    where
471        C: TimerCallback,
472    {
473        use std::sync::Arc;
474        let callback_wrapper = Arc::new(callback) as CallbackWrapper;
475        Self::create_timer_handle_internal(&self.wheel, delay, Some(callback_wrapper))
476    }
477
478    /// 批量调度一次性定时器
479    ///
480    /// # 参数
481    /// - `tasks`: (延迟时间, 回调) 的元组列表
482    ///
483    /// # 返回
484    /// - `Ok(BatchHandle)`: 成功调度,返回批量定时器句柄
485    /// - `Err(TimerError)`: 内部错误
486    ///
487    /// # 性能优势
488    /// - 批量处理减少锁竞争
489    /// - 内部优化批量插入操作
490    /// - 共享 Wheel 引用减少内存开销
491    ///
492    /// # 示例
493    /// ```no_run
494    /// use timer::TimerWheel;
495    /// use std::time::Duration;
496    /// use std::sync::Arc;
497    /// use std::sync::atomic::{AtomicU32, Ordering};
498    ///
499    /// #[tokio::main]
500    /// async fn main() {
501    ///     let timer = TimerWheel::with_defaults().unwrap();
502    ///     let counter = Arc::new(AtomicU32::new(0));
503    ///     
504    ///     // 动态生成批量回调
505    ///     let callbacks: Vec<(Duration, _)> = (0..3)
506    ///         .map(|i| {
507    ///             let counter = Arc::clone(&counter);
508    ///             let delay = Duration::from_millis(100 + i * 100);
509    ///             let callback = move || {
510    ///                 let counter = Arc::clone(&counter);
511    ///                 async move {
512    ///                     counter.fetch_add(1, Ordering::SeqCst);
513    ///                 }
514    ///             };
515    ///             (delay, callback)
516    ///         })
517    ///         .collect();
518    ///     
519    ///     let batch = timer.schedule_once_batch(callbacks).await.unwrap();
520    ///     println!("Scheduled {} timers", batch.len());
521    ///     
522    ///     // 批量取消所有定时器
523    ///     let cancelled = batch.cancel_all();
524    ///     println!("Cancelled {} timers", cancelled);
525    /// }
526    /// ```
527    pub async fn schedule_once_batch<C>(&self, callbacks: Vec<(Duration, C)>) -> Result<BatchHandle, TimerError>
528    where
529        C: TimerCallback,
530    {
531        Self::create_batch_handle_internal(&self.wheel, callbacks)
532    }
533
534
535    /// 调度一次性通知定时器(无回调,仅通知)
536    ///
537    /// # 参数
538    /// - `delay`: 延迟时间
539    ///
540    /// # 返回
541    /// - `Ok(TimerHandle)`: 成功调度,返回定时器句柄,可通过 `into_completion_receiver()` 获取通知接收器
542    /// - `Err(TimerError)`: 内部错误
543    ///
544    /// # 示例
545    /// ```no_run
546    /// use timer::TimerWheel;
547    /// use std::time::Duration;
548    ///
549    /// #[tokio::main]
550    /// async fn main() {
551    ///     let timer = TimerWheel::with_defaults().unwrap();
552    ///     
553    ///     let handle = timer.schedule_once_notify(Duration::from_secs(1)).await.unwrap();
554    ///     
555    ///     // 获取完成通知接收器
556    ///     handle.into_completion_receiver().0.await.ok();
557    ///     println!("Timer completed!");
558    /// }
559    /// ```
560    pub async fn schedule_once_notify(&self, delay: Duration) -> Result<TimerHandle, TimerError> {
561        Self::create_timer_handle_internal(&self.wheel, delay, None)
562    }
563
564    /// 取消定时器
565    ///
566    /// # 参数
567    /// - `task_id`: 任务 ID
568    ///
569    /// # 返回
570    /// 如果任务存在且成功取消返回 true,否则返回 false
571    pub fn cancel(&self, task_id: TaskId) -> bool {
572        let mut wheel = self.wheel.lock();
573        wheel.cancel(task_id)
574    }
575
576    /// 批量取消定时器
577    ///
578    /// # 参数
579    /// - `task_ids`: 要取消的任务 ID 列表
580    ///
581    /// # 返回
582    /// 成功取消的任务数量
583    ///
584    /// # 性能优势
585    /// - 批量处理减少锁竞争
586    /// - 内部优化批量取消操作
587    ///
588    /// # 示例
589    /// ```no_run
590    /// use timer::TimerWheel;
591    /// use std::time::Duration;
592    ///
593    /// #[tokio::main]
594    /// async fn main() {
595    ///     let timer = TimerWheel::with_defaults().unwrap();
596    ///     
597    ///     // 创建多个定时器
598    ///     let handle1 = timer.schedule_once(Duration::from_secs(10), || async {}).await.unwrap();
599    ///     let handle2 = timer.schedule_once(Duration::from_secs(10), || async {}).await.unwrap();
600    ///     let handle3 = timer.schedule_once(Duration::from_secs(10), || async {}).await.unwrap();
601    ///     
602    ///     // 批量取消
603    ///     let task_ids = vec![handle1.task_id(), handle2.task_id(), handle3.task_id()];
604    ///     let cancelled = timer.cancel_batch(&task_ids);
605    ///     println!("已取消 {} 个定时器", cancelled);
606    /// }
607    /// ```
608    pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
609        let mut wheel = self.wheel.lock();
610        wheel.cancel_batch(task_ids)
611    }
612    
613    /// 核心 tick 循环
614    async fn tick_loop(wheel: Arc<Mutex<Wheel>>, tick_duration: Duration) {
615        let mut interval = tokio::time::interval(tick_duration);
616        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
617
618        loop {
619            interval.tick().await;
620
621            // 推进时间轮并获取到期任务
622            let expired_tasks = {
623                let mut wheel_guard = wheel.lock();
624                wheel_guard.advance()
625            };
626
627            // 执行到期任务
628            for task in expired_tasks {
629                let callback = task.get_callback();
630                
631                // 移动task的所有权来获取completion_notifier
632                let notifier = task.completion_notifier;
633                
634                // 在独立的 tokio 任务中执行回调,并在回调完成后发送通知
635                if let Some(callback) = callback {
636                    tokio::spawn(async move {
637                        // 执行回调
638                        let future = callback.call();
639                        future.await;
640                        
641                        // 回调执行完成后发送通知
642                        let _ = notifier.0.send(());
643                    });
644                } else {
645                    // 如果没有回调,立即发送完成通知
646                    let _ = notifier.0.send(());
647                }
648            }
649        }
650    }
651
652    /// 停止定时器管理器
653    pub async fn shutdown(mut self) {
654        if let Some(handle) = self.tick_handle.take() {
655            handle.abort();
656            let _ = handle.await;
657        }
658    }
659}
660
661impl Drop for TimerWheel {
662    fn drop(&mut self) {
663        if let Some(handle) = self.tick_handle.take() {
664            handle.abort();
665        }
666    }
667}
668
669#[cfg(test)]
670mod tests {
671    use super::*;
672    use std::sync::atomic::{AtomicU32, Ordering};
673
674    #[tokio::test]
675    async fn test_timer_creation() {
676        let _timer = TimerWheel::with_defaults().unwrap();
677    }
678
679    #[tokio::test]
680    async fn test_schedule_once() {
681        use std::sync::Arc;
682        let timer = TimerWheel::with_defaults().unwrap();
683        let counter = Arc::new(AtomicU32::new(0));
684        let counter_clone = Arc::clone(&counter);
685
686        let _handle = timer.schedule_once(
687            Duration::from_millis(50),
688            move || {
689                let counter = Arc::clone(&counter_clone);
690                async move {
691                    counter.fetch_add(1, Ordering::SeqCst);
692                }
693            },
694        ).await.unwrap();
695
696        // 等待定时器触发
697        tokio::time::sleep(Duration::from_millis(100)).await;
698        assert_eq!(counter.load(Ordering::SeqCst), 1);
699    }
700
701    #[tokio::test]
702    async fn test_cancel_timer() {
703        use std::sync::Arc;
704        let timer = TimerWheel::with_defaults().unwrap();
705        let counter = Arc::new(AtomicU32::new(0));
706        let counter_clone = Arc::clone(&counter);
707
708        let handle = timer.schedule_once(
709            Duration::from_millis(100),
710            move || {
711                let counter = Arc::clone(&counter_clone);
712                async move {
713                    counter.fetch_add(1, Ordering::SeqCst);
714                }
715            },
716        ).await.unwrap();
717
718        // 立即取消
719        let cancel_result = handle.cancel();
720        assert!(cancel_result);
721
722        // 等待足够长时间确保定时器不会触发
723        tokio::time::sleep(Duration::from_millis(200)).await;
724        assert_eq!(counter.load(Ordering::SeqCst), 0);
725    }
726
727    #[tokio::test]
728    async fn test_cancel_immediate() {
729        use std::sync::Arc;
730        let timer = TimerWheel::with_defaults().unwrap();
731        let counter = Arc::new(AtomicU32::new(0));
732        let counter_clone = Arc::clone(&counter);
733
734        let handle = timer.schedule_once(
735            Duration::from_millis(100),
736            move || {
737                let counter = Arc::clone(&counter_clone);
738                async move {
739                    counter.fetch_add(1, Ordering::SeqCst);
740                }
741            },
742        ).await.unwrap();
743
744        // 立即取消
745        let cancel_result = handle.cancel();
746        assert!(cancel_result);
747
748        // 等待足够长时间确保定时器不会触发
749        tokio::time::sleep(Duration::from_millis(200)).await;
750        assert_eq!(counter.load(Ordering::SeqCst), 0);
751    }
752}
753