kestrel_protocol_timer/timer.rs
1use crate::error::TimerError;
2use crate::task::{CallbackWrapper, CompletionNotifier, TaskId, TimerCallback, TimerTask};
3use crate::wheel::Wheel;
4use parking_lot::Mutex;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::oneshot;
8use tokio::task::JoinHandle;
9
10/// 完成通知接收器,用于接收定时器完成通知
11pub struct CompletionReceiver(pub oneshot::Receiver<()>);
12
13/// 定时器句柄,用于管理定时器的生命周期
14///
15/// 注意:此类型不实现 Clone,以防止重复取消同一个定时器。
16/// 每个定时器只应有一个所有者。
17pub struct TimerHandle {
18 pub(crate) task_id: TaskId,
19 pub(crate) wheel: Arc<Mutex<Wheel>>,
20 pub(crate) completion_rx: CompletionReceiver,
21}
22
23impl TimerHandle {
24 pub(crate) fn new(task_id: TaskId, wheel: Arc<Mutex<Wheel>>, completion_rx: oneshot::Receiver<()>) -> Self {
25 Self { task_id, wheel, completion_rx: CompletionReceiver(completion_rx) }
26 }
27
28 /// 取消定时器
29 ///
30 /// # 返回
31 /// 如果任务存在且成功取消返回 true,否则返回 false
32 ///
33 /// # 示例
34 /// ```no_run
35 /// # use kestrel_protocol_timer::TimerWheel;
36 /// # use std::time::Duration;
37 /// # #[tokio::main]
38 /// # async fn main() {
39 /// let timer = TimerWheel::with_defaults().unwrap();
40 /// let handle = timer.schedule_once(Duration::from_secs(1), || async {}).await.unwrap();
41 ///
42 /// // 取消定时器
43 /// let success = handle.cancel();
44 /// println!("取消成功: {}", success);
45 /// # }
46 /// ```
47 pub fn cancel(&self) -> bool {
48 let mut wheel = self.wheel.lock();
49 wheel.cancel(self.task_id)
50 }
51
52 /// 获取任务 ID
53 pub fn task_id(&self) -> TaskId {
54 self.task_id
55 }
56
57 /// 获取完成通知接收器的可变引用
58 ///
59 /// # 示例
60 /// ```no_run
61 /// # use kestrel_protocol_timer::TimerWheel;
62 /// # use std::time::Duration;
63 /// # #[tokio::main]
64 /// # async fn main() {
65 /// let timer = TimerWheel::with_defaults().unwrap();
66 /// let handle = timer.schedule_once(Duration::from_secs(1), || async {
67 /// println!("Timer fired!");
68 /// }).await.unwrap();
69 ///
70 /// // 等待定时器完成(使用 into_completion_receiver 消耗句柄)
71 /// handle.into_completion_receiver().0.await.ok();
72 /// println!("Timer completed!");
73 /// # }
74 /// ```
75 pub fn completion_receiver(&mut self) -> &mut CompletionReceiver {
76 &mut self.completion_rx
77 }
78
79 /// 消耗句柄,返回完成通知接收器
80 ///
81 /// # 示例
82 /// ```no_run
83 /// # use kestrel_protocol_timer::TimerWheel;
84 /// # use std::time::Duration;
85 /// # #[tokio::main]
86 /// # async fn main() {
87 /// let timer = TimerWheel::with_defaults().unwrap();
88 /// let handle = timer.schedule_once(Duration::from_secs(1), || async {
89 /// println!("Timer fired!");
90 /// }).await.unwrap();
91 ///
92 /// // 等待定时器完成
93 /// handle.into_completion_receiver().0.await.ok();
94 /// println!("Timer completed!");
95 /// # }
96 /// ```
97 pub fn into_completion_receiver(self) -> CompletionReceiver {
98 self.completion_rx
99 }
100}
101
102/// 批量定时器句柄,用于管理批量调度的定时器
103///
104/// 通过共享 Wheel 引用减少内存开销,同时提供批量操作和迭代器访问能力。
105///
106/// 注意:此类型不实现 Clone,以防止重复取消同一批定时器。
107/// 如需访问单个定时器句柄,请使用 `into_iter()` 或 `into_handles()` 进行转换。
108pub struct BatchHandle {
109 pub(crate) task_ids: Vec<TaskId>,
110 pub(crate) wheel: Arc<Mutex<Wheel>>,
111 pub(crate) completion_rxs: Vec<oneshot::Receiver<()>>,
112}
113
114impl BatchHandle {
115 pub(crate) fn new(task_ids: Vec<TaskId>, wheel: Arc<Mutex<Wheel>>, completion_rxs: Vec<oneshot::Receiver<()>>) -> Self {
116 Self { task_ids, wheel, completion_rxs }
117 }
118
119 /// 批量取消所有定时器
120 ///
121 /// # 返回
122 /// 成功取消的任务数量
123 ///
124 /// # 示例
125 /// ```no_run
126 /// # use kestrel_protocol_timer::TimerWheel;
127 /// # use std::time::Duration;
128 /// # #[tokio::main]
129 /// # async fn main() {
130 /// let timer = TimerWheel::with_defaults().unwrap();
131 /// let callbacks: Vec<_> = (0..10)
132 /// .map(|_| (Duration::from_secs(1), || async {}))
133 /// .collect();
134 /// let batch = timer.schedule_once_batch(callbacks).await.unwrap();
135 ///
136 /// let cancelled = batch.cancel_all();
137 /// println!("取消了 {} 个定时器", cancelled);
138 /// # }
139 /// ```
140 pub fn cancel_all(self) -> usize {
141 let mut wheel = self.wheel.lock();
142 wheel.cancel_batch(&self.task_ids)
143 }
144
145 /// 将批量句柄转换为单个定时器句柄的 Vec
146 ///
147 /// 消耗 BatchHandle,为每个任务创建独立的 TimerHandle。
148 ///
149 /// # 示例
150 /// ```no_run
151 /// # use kestrel_protocol_timer::TimerWheel;
152 /// # use std::time::Duration;
153 /// # #[tokio::main]
154 /// # async fn main() {
155 /// let timer = TimerWheel::with_defaults().unwrap();
156 /// let callbacks: Vec<_> = (0..3)
157 /// .map(|_| (Duration::from_secs(1), || async {}))
158 /// .collect();
159 /// let batch = timer.schedule_once_batch(callbacks).await.unwrap();
160 ///
161 /// // 转换为独立的句柄
162 /// let handles = batch.into_handles();
163 /// for handle in handles {
164 /// // 可以单独操作每个句柄
165 /// }
166 /// # }
167 /// ```
168 pub fn into_handles(self) -> Vec<TimerHandle> {
169 self.task_ids
170 .into_iter()
171 .zip(self.completion_rxs.into_iter())
172 .map(|(task_id, rx)| {
173 TimerHandle::new(task_id, self.wheel.clone(), rx)
174 })
175 .collect()
176 }
177
178 /// 获取批量任务的数量
179 pub fn len(&self) -> usize {
180 self.task_ids.len()
181 }
182
183 /// 检查批量任务是否为空
184 pub fn is_empty(&self) -> bool {
185 self.task_ids.is_empty()
186 }
187
188 /// 获取所有任务 ID 的引用
189 pub fn task_ids(&self) -> &[TaskId] {
190 &self.task_ids
191 }
192
193 /// 获取所有完成通知接收器的引用
194 ///
195 /// # 返回
196 /// 所有任务的完成通知接收器列表引用
197 pub fn completion_receivers(&mut self) -> &mut Vec<oneshot::Receiver<()>> {
198 &mut self.completion_rxs
199 }
200
201 /// 消耗句柄,返回所有完成通知接收器
202 ///
203 /// # 返回
204 /// 所有任务的完成通知接收器列表
205 ///
206 /// # 示例
207 /// ```no_run
208 /// # use kestrel_protocol_timer::TimerWheel;
209 /// # use std::time::Duration;
210 /// # #[tokio::main]
211 /// # async fn main() {
212 /// let timer = TimerWheel::with_defaults().unwrap();
213 /// let callbacks: Vec<_> = (0..3)
214 /// .map(|_| (Duration::from_secs(1), || async {}))
215 /// .collect();
216 /// let batch = timer.schedule_once_batch(callbacks).await.unwrap();
217 ///
218 /// // 获取所有完成通知接收器
219 /// let receivers = batch.into_completion_receivers();
220 /// for rx in receivers {
221 /// tokio::spawn(async move {
222 /// if rx.await.is_ok() {
223 /// println!("A timer completed!");
224 /// }
225 /// });
226 /// }
227 /// # }
228 /// ```
229 pub fn into_completion_receivers(self) -> Vec<oneshot::Receiver<()>> {
230 self.completion_rxs
231 }
232}
233
234/// 实现 IntoIterator,允许直接迭代 BatchHandle
235///
236/// # 示例
237/// ```no_run
238/// # use kestrel_protocol_timer::TimerWheel;
239/// # use std::time::Duration;
240/// # #[tokio::main]
241/// # async fn main() {
242/// let timer = TimerWheel::with_defaults().unwrap();
243/// let callbacks: Vec<_> = (0..3)
244/// .map(|_| (Duration::from_secs(1), || async {}))
245/// .collect();
246/// let batch = timer.schedule_once_batch(callbacks).await.unwrap();
247///
248/// // 直接迭代,每个元素都是独立的 TimerHandle
249/// for handle in batch {
250/// // 可以单独操作每个句柄
251/// }
252/// # }
253/// ```
254impl IntoIterator for BatchHandle {
255 type Item = TimerHandle;
256 type IntoIter = BatchHandleIter;
257
258 fn into_iter(self) -> Self::IntoIter {
259 BatchHandleIter {
260 task_ids: self.task_ids.into_iter(),
261 completion_rxs: self.completion_rxs.into_iter(),
262 wheel: self.wheel,
263 }
264 }
265}
266
267/// BatchHandle 的迭代器
268pub struct BatchHandleIter {
269 task_ids: std::vec::IntoIter<TaskId>,
270 completion_rxs: std::vec::IntoIter<oneshot::Receiver<()>>,
271 wheel: Arc<Mutex<Wheel>>,
272}
273
274impl Iterator for BatchHandleIter {
275 type Item = TimerHandle;
276
277 fn next(&mut self) -> Option<Self::Item> {
278 match (self.task_ids.next(), self.completion_rxs.next()) {
279 (Some(task_id), Some(rx)) => {
280 Some(TimerHandle::new(task_id, self.wheel.clone(), rx))
281 }
282 _ => None,
283 }
284 }
285
286 fn size_hint(&self) -> (usize, Option<usize>) {
287 self.task_ids.size_hint()
288 }
289}
290
291impl ExactSizeIterator for BatchHandleIter {
292 fn len(&self) -> usize {
293 self.task_ids.len()
294 }
295}
296
297/// 时间轮定时器管理器
298pub struct TimerWheel {
299 /// 时间轮唯一标识符
300
301 /// 时间轮实例(使用 Arc<Mutex> 包装以支持多线程访问)
302 wheel: Arc<Mutex<Wheel>>,
303
304 /// 后台 tick 循环任务句柄
305 tick_handle: Option<JoinHandle<()>>,
306}
307
308impl TimerWheel {
309 /// 创建新的定时器管理器
310 ///
311 /// # 参数
312 /// - `tick_duration`: 每个 tick 的时间长度(建议 10ms)
313 /// - `slot_count`: 槽位数量(必须是 2 的幂次方,建议 512 或 1024)
314 ///
315 /// # 返回
316 /// - `Ok(Self)`: 成功创建定时器管理器
317 /// - `Err(TimerError)`: 槽位数量无效
318 ///
319 /// # 示例
320 /// ```no_run
321 /// use kestrel_protocol_timer::TimerWheel;
322 /// use std::time::Duration;
323 ///
324 /// #[tokio::main]
325 /// async fn main() {
326 /// let timer = TimerWheel::new(Duration::from_millis(10), 512).unwrap();
327 /// }
328 /// ```
329 pub fn new(tick_duration: Duration, slot_count: usize) -> Result<Self, TimerError> {
330 let wheel = Wheel::new(tick_duration, slot_count)?;
331 let wheel = Arc::new(Mutex::new(wheel));
332 let wheel_clone = wheel.clone();
333
334 // 启动后台 tick 循环
335 let tick_handle = tokio::spawn(async move {
336 Self::tick_loop(wheel_clone, tick_duration).await;
337 });
338
339 Ok(Self {
340 wheel,
341 tick_handle: Some(tick_handle),
342 })
343 }
344
345 /// 创建带默认配置的定时器管理器
346 /// - tick 时长: 10ms
347 /// - 槽位数量: 512
348 ///
349 /// # 返回
350 /// - `Ok(Self)`: 成功创建定时器管理器
351 /// - `Err(TimerError)`: 创建失败(不太可能,因为使用的是有效的默认值)
352 pub fn with_defaults() -> Result<Self, TimerError> {
353 Self::new(Duration::from_millis(10), 512)
354 }
355
356 /// 创建与此时间轮绑定的 TimerService
357 ///
358 /// # 返回
359 /// 绑定到此时间轮的 TimerService 实例
360 ///
361 /// # 示例
362 /// ```no_run
363 /// use kestrel_protocol_timer::TimerWheel;
364 /// use std::time::Duration;
365 ///
366 /// #[tokio::main]
367 /// async fn main() {
368 /// let timer = TimerWheel::with_defaults().unwrap();
369 /// let mut service = timer.create_service();
370 ///
371 /// // 直接通过 service 批量调度定时器
372 /// let callbacks: Vec<_> = (0..5)
373 /// .map(|_| (Duration::from_millis(100), || async {}))
374 /// .collect();
375 /// service.schedule_once_batch(callbacks).await.unwrap();
376 ///
377 /// // 接收超时通知
378 /// let mut rx = service.take_receiver().unwrap();
379 /// while let Some(task_id) = rx.recv().await {
380 /// println!("Task {:?} completed", task_id);
381 /// }
382 /// }
383 /// ```
384 pub fn create_service(&self) -> crate::service::TimerService {
385 crate::service::TimerService::new(self.wheel.clone())
386 }
387
388 /// 内部辅助方法:创建定时器句柄
389 ///
390 /// 由 TimerWheel 和 TimerService 共用
391 pub(crate) fn create_timer_handle_internal(
392 wheel: &Arc<Mutex<Wheel>>,
393 delay: Duration,
394 callback: Option<CallbackWrapper>,
395 ) -> Result<TimerHandle, TimerError> {
396 let (completion_tx, completion_rx) = oneshot::channel();
397 let notifier = CompletionNotifier(completion_tx);
398
399 let task = TimerTask::once(0, 0, callback, notifier);
400
401 let task_id = {
402 let mut wheel_guard = wheel.lock();
403 wheel_guard.insert(delay, task)
404 };
405
406 Ok(TimerHandle::new(task_id, wheel.clone(), completion_rx))
407 }
408
409 /// 内部辅助方法:创建批量定时器句柄
410 ///
411 /// 由 TimerWheel 和 TimerService 共用
412 pub(crate) fn create_batch_handle_internal<C>(
413 wheel: &Arc<Mutex<Wheel>>,
414 callbacks: Vec<(Duration, C)>,
415 ) -> Result<BatchHandle, TimerError>
416 where
417 C: TimerCallback,
418 {
419 use std::sync::Arc;
420 let mut completion_rxs = Vec::with_capacity(callbacks.len());
421
422 let tasks: Vec<(Duration, TimerTask)> = callbacks
423 .into_iter()
424 .map(|(delay, callback)| {
425 let callback_wrapper = Arc::new(callback) as CallbackWrapper;
426 let (completion_tx, completion_rx) = oneshot::channel();
427 completion_rxs.push(completion_rx);
428 let notifier = CompletionNotifier(completion_tx);
429 let task = TimerTask::once(0, 0, Some(callback_wrapper), notifier);
430 (delay, task)
431 })
432 .collect();
433
434 let task_ids = {
435 let mut wheel_guard = wheel.lock();
436 wheel_guard.insert_batch(tasks)
437 };
438
439 Ok(BatchHandle::new(task_ids, wheel.clone(), completion_rxs))
440 }
441
442 /// 调度一次性定时器
443 ///
444 /// # 参数
445 /// - `delay`: 延迟时间
446 /// - `callback`: 实现了 TimerCallback trait 的回调对象
447 ///
448 /// # 返回
449 /// - `Ok(TimerHandle)`: 成功调度,返回定时器句柄,可用于取消定时器
450 /// - `Err(TimerError)`: 内部错误
451 ///
452 /// # 示例
453 /// ```no_run
454 /// use kestrel_protocol_timer::TimerWheel;
455 /// use std::time::Duration;
456 /// use std::sync::Arc;
457 ///
458 /// #[tokio::main]
459 /// async fn main() {
460 /// let timer = TimerWheel::with_defaults().unwrap();
461 ///
462 /// let handle = timer.schedule_once(Duration::from_secs(1), || async {
463 /// println!("Timer fired!");
464 /// }).await.unwrap();
465 ///
466 /// tokio::time::sleep(Duration::from_secs(2)).await;
467 /// }
468 /// ```
469 pub async fn schedule_once<C>(&self, delay: Duration, callback: C) -> Result<TimerHandle, TimerError>
470 where
471 C: TimerCallback,
472 {
473 use std::sync::Arc;
474 let callback_wrapper = Arc::new(callback) as CallbackWrapper;
475 Self::create_timer_handle_internal(&self.wheel, delay, Some(callback_wrapper))
476 }
477
478 /// 批量调度一次性定时器
479 ///
480 /// # 参数
481 /// - `tasks`: (延迟时间, 回调) 的元组列表
482 ///
483 /// # 返回
484 /// - `Ok(BatchHandle)`: 成功调度,返回批量定时器句柄
485 /// - `Err(TimerError)`: 内部错误
486 ///
487 /// # 性能优势
488 /// - 批量处理减少锁竞争
489 /// - 内部优化批量插入操作
490 /// - 共享 Wheel 引用减少内存开销
491 ///
492 /// # 示例
493 /// ```no_run
494 /// use kestrel_protocol_timer::TimerWheel;
495 /// use std::time::Duration;
496 /// use std::sync::Arc;
497 /// use std::sync::atomic::{AtomicU32, Ordering};
498 ///
499 /// #[tokio::main]
500 /// async fn main() {
501 /// let timer = TimerWheel::with_defaults().unwrap();
502 /// let counter = Arc::new(AtomicU32::new(0));
503 ///
504 /// // 动态生成批量回调
505 /// let callbacks: Vec<(Duration, _)> = (0..3)
506 /// .map(|i| {
507 /// let counter = Arc::clone(&counter);
508 /// let delay = Duration::from_millis(100 + i * 100);
509 /// let callback = move || {
510 /// let counter = Arc::clone(&counter);
511 /// async move {
512 /// counter.fetch_add(1, Ordering::SeqCst);
513 /// }
514 /// };
515 /// (delay, callback)
516 /// })
517 /// .collect();
518 ///
519 /// let batch = timer.schedule_once_batch(callbacks).await.unwrap();
520 /// println!("Scheduled {} timers", batch.len());
521 ///
522 /// // 批量取消所有定时器
523 /// let cancelled = batch.cancel_all();
524 /// println!("Cancelled {} timers", cancelled);
525 /// }
526 /// ```
527 pub async fn schedule_once_batch<C>(&self, callbacks: Vec<(Duration, C)>) -> Result<BatchHandle, TimerError>
528 where
529 C: TimerCallback,
530 {
531 Self::create_batch_handle_internal(&self.wheel, callbacks)
532 }
533
534
535 /// 调度一次性通知定时器(无回调,仅通知)
536 ///
537 /// # 参数
538 /// - `delay`: 延迟时间
539 ///
540 /// # 返回
541 /// - `Ok(TimerHandle)`: 成功调度,返回定时器句柄,可通过 `into_completion_receiver()` 获取通知接收器
542 /// - `Err(TimerError)`: 内部错误
543 ///
544 /// # 示例
545 /// ```no_run
546 /// use kestrel_protocol_timer::TimerWheel;
547 /// use std::time::Duration;
548 ///
549 /// #[tokio::main]
550 /// async fn main() {
551 /// let timer = TimerWheel::with_defaults().unwrap();
552 ///
553 /// let handle = timer.schedule_once_notify(Duration::from_secs(1)).await.unwrap();
554 ///
555 /// // 获取完成通知接收器
556 /// handle.into_completion_receiver().0.await.ok();
557 /// println!("Timer completed!");
558 /// }
559 /// ```
560 pub async fn schedule_once_notify(&self, delay: Duration) -> Result<TimerHandle, TimerError> {
561 Self::create_timer_handle_internal(&self.wheel, delay, None)
562 }
563
564 /// 取消定时器
565 ///
566 /// # 参数
567 /// - `task_id`: 任务 ID
568 ///
569 /// # 返回
570 /// 如果任务存在且成功取消返回 true,否则返回 false
571 pub fn cancel(&self, task_id: TaskId) -> bool {
572 let mut wheel = self.wheel.lock();
573 wheel.cancel(task_id)
574 }
575
576 /// 批量取消定时器
577 ///
578 /// # 参数
579 /// - `task_ids`: 要取消的任务 ID 列表
580 ///
581 /// # 返回
582 /// 成功取消的任务数量
583 ///
584 /// # 性能优势
585 /// - 批量处理减少锁竞争
586 /// - 内部优化批量取消操作
587 ///
588 /// # 示例
589 /// ```no_run
590 /// use kestrel_protocol_timer::TimerWheel;
591 /// use std::time::Duration;
592 ///
593 /// #[tokio::main]
594 /// async fn main() {
595 /// let timer = TimerWheel::with_defaults().unwrap();
596 ///
597 /// // 创建多个定时器
598 /// let handle1 = timer.schedule_once(Duration::from_secs(10), || async {}).await.unwrap();
599 /// let handle2 = timer.schedule_once(Duration::from_secs(10), || async {}).await.unwrap();
600 /// let handle3 = timer.schedule_once(Duration::from_secs(10), || async {}).await.unwrap();
601 ///
602 /// // 批量取消
603 /// let task_ids = vec![handle1.task_id(), handle2.task_id(), handle3.task_id()];
604 /// let cancelled = timer.cancel_batch(&task_ids);
605 /// println!("已取消 {} 个定时器", cancelled);
606 /// }
607 /// ```
608 pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
609 let mut wheel = self.wheel.lock();
610 wheel.cancel_batch(task_ids)
611 }
612
613 /// 核心 tick 循环
614 async fn tick_loop(wheel: Arc<Mutex<Wheel>>, tick_duration: Duration) {
615 let mut interval = tokio::time::interval(tick_duration);
616 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
617
618 loop {
619 interval.tick().await;
620
621 // 推进时间轮并获取到期任务
622 let expired_tasks = {
623 let mut wheel_guard = wheel.lock();
624 wheel_guard.advance()
625 };
626
627 // 执行到期任务
628 for task in expired_tasks {
629 let callback = task.get_callback();
630
631 // 移动task的所有权来获取completion_notifier
632 let notifier = task.completion_notifier;
633
634 // 在独立的 tokio 任务中执行回调,并在回调完成后发送通知
635 if let Some(callback) = callback {
636 tokio::spawn(async move {
637 // 执行回调
638 let future = callback.call();
639 future.await;
640
641 // 回调执行完成后发送通知
642 let _ = notifier.0.send(());
643 });
644 } else {
645 // 如果没有回调,立即发送完成通知
646 let _ = notifier.0.send(());
647 }
648 }
649 }
650 }
651
652 /// 停止定时器管理器
653 pub async fn shutdown(mut self) {
654 if let Some(handle) = self.tick_handle.take() {
655 handle.abort();
656 let _ = handle.await;
657 }
658 }
659}
660
661impl Drop for TimerWheel {
662 fn drop(&mut self) {
663 if let Some(handle) = self.tick_handle.take() {
664 handle.abort();
665 }
666 }
667}
668
669#[cfg(test)]
670mod tests {
671 use super::*;
672 use std::sync::atomic::{AtomicU32, Ordering};
673
674 #[tokio::test]
675 async fn test_timer_creation() {
676 let _timer = TimerWheel::with_defaults().unwrap();
677 }
678
679 #[tokio::test]
680 async fn test_schedule_once() {
681 use std::sync::Arc;
682 let timer = TimerWheel::with_defaults().unwrap();
683 let counter = Arc::new(AtomicU32::new(0));
684 let counter_clone = Arc::clone(&counter);
685
686 let _handle = timer.schedule_once(
687 Duration::from_millis(50),
688 move || {
689 let counter = Arc::clone(&counter_clone);
690 async move {
691 counter.fetch_add(1, Ordering::SeqCst);
692 }
693 },
694 ).await.unwrap();
695
696 // 等待定时器触发
697 tokio::time::sleep(Duration::from_millis(100)).await;
698 assert_eq!(counter.load(Ordering::SeqCst), 1);
699 }
700
701 #[tokio::test]
702 async fn test_cancel_timer() {
703 use std::sync::Arc;
704 let timer = TimerWheel::with_defaults().unwrap();
705 let counter = Arc::new(AtomicU32::new(0));
706 let counter_clone = Arc::clone(&counter);
707
708 let handle = timer.schedule_once(
709 Duration::from_millis(100),
710 move || {
711 let counter = Arc::clone(&counter_clone);
712 async move {
713 counter.fetch_add(1, Ordering::SeqCst);
714 }
715 },
716 ).await.unwrap();
717
718 // 立即取消
719 let cancel_result = handle.cancel();
720 assert!(cancel_result);
721
722 // 等待足够长时间确保定时器不会触发
723 tokio::time::sleep(Duration::from_millis(200)).await;
724 assert_eq!(counter.load(Ordering::SeqCst), 0);
725 }
726
727 #[tokio::test]
728 async fn test_cancel_immediate() {
729 use std::sync::Arc;
730 let timer = TimerWheel::with_defaults().unwrap();
731 let counter = Arc::new(AtomicU32::new(0));
732 let counter_clone = Arc::clone(&counter);
733
734 let handle = timer.schedule_once(
735 Duration::from_millis(100),
736 move || {
737 let counter = Arc::clone(&counter_clone);
738 async move {
739 counter.fetch_add(1, Ordering::SeqCst);
740 }
741 },
742 ).await.unwrap();
743
744 // 立即取消
745 let cancel_result = handle.cancel();
746 assert!(cancel_result);
747
748 // 等待足够长时间确保定时器不会触发
749 tokio::time::sleep(Duration::from_millis(200)).await;
750 assert_eq!(counter.load(Ordering::SeqCst), 0);
751 }
752}
753