kestrel_protocol_timer/
service.rs

1use crate::config::ServiceConfig;
2use crate::task::{CallbackWrapper, TaskId, TimerCallback};
3use crate::timer::{BatchHandle, TimerHandle};
4use crate::wheel::Wheel;
5use futures::stream::{FuturesUnordered, StreamExt};
6use futures::future::BoxFuture;
7use parking_lot::Mutex;
8use rustc_hash::FxHashSet;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::mpsc;
12use tokio::task::JoinHandle;
13
/// Commands sent from `TimerService` to its background actor.
enum ServiceCommand {
    /// Register a batch timer handle with the actor.
    AddBatchHandle(BatchHandle),
    /// Register a single timer handle with the actor.
    AddTimerHandle(TimerHandle),
    /// Remove a batch of tasks from the actor's active-task set
    /// (cleanup after the tasks were cancelled directly on the wheel).
    RemoveTasks {
        task_ids: Vec<TaskId>,
    },
    /// Stop the actor loop.
    Shutdown,
}
27
/// TimerService - an actor-style timer service.
///
/// Manages multiple timer handles, listens for all of their timeout events, and forwards the
/// corresponding `TaskId`s to the user through a single aggregated channel.
///
/// # Features
/// - Automatically listens for the timeout event of every timer handle added to it
/// - Removes a task from its internal bookkeeping once the task has timed out
/// - Forwards timed-out `TaskId`s to one unified channel for the user to receive
/// - Supports adding `BatchHandle`s and `TimerHandle`s dynamically
///
/// # Examples
/// ```no_run
/// use kestrel_protocol_timer::{TimerWheel, TimerService};
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() {
///     let timer = TimerWheel::with_defaults();
///     let mut service = timer.create_service();
///
///     // Schedule a batch of timers directly through the service
///     let callbacks: Vec<_> = (0..5)
///         .map(|_| (Duration::from_millis(100), || async {}))
///         .collect();
///     service.schedule_once_batch(callbacks).await;
///
///     // Receive timeout notifications
///     let mut rx = service.take_receiver().unwrap();
///     while let Some(task_id) = rx.recv().await {
///         println!("Task {:?} completed", task_id);
///     }
/// }
/// ```
pub struct TimerService {
    /// Sending half of the command channel to the actor.
    command_tx: mpsc::Sender<ServiceCommand>,
    /// Receiving half of the timeout channel; `None` once it has been handed out by
    /// `take_receiver`.
    timeout_rx: Option<mpsc::Receiver<TaskId>>,
    /// Join handle of the spawned actor task; taken by `shutdown`, aborted on drop otherwise.
    actor_handle: Option<JoinHandle<()>>,
    /// Shared timing wheel (used to schedule and cancel timers directly).
    wheel: Arc<Mutex<Wheel>>,
}
71
72impl TimerService {
73    /// 创建新的 TimerService
74    ///
75    /// # 参数
76    /// - `wheel`: 时间轮引用
77    /// - `config`: 服务配置
78    ///
79    /// # 注意
80    /// 通常不直接调用此方法,而是使用 `TimerWheel::create_service()` 来创建。
81    ///
82    /// # 示例
83    /// ```no_run
84    /// use kestrel_protocol_timer::TimerWheel;
85    ///
86    /// #[tokio::main]
87    /// async fn main() {
88    ///     let timer = TimerWheel::with_defaults();
89    ///     let mut service = timer.create_service();
90    /// }
91    /// ```
92    pub(crate) fn new(wheel: Arc<Mutex<Wheel>>, config: ServiceConfig) -> Self {
93        let (command_tx, command_rx) = mpsc::channel(config.command_channel_capacity);
94        let (timeout_tx, timeout_rx) = mpsc::channel(config.timeout_channel_capacity);
95
96        let actor = ServiceActor::new(command_rx, timeout_tx);
97        let actor_handle = tokio::spawn(async move {
98            actor.run().await;
99        });
100
101        Self {
102            command_tx,
103            timeout_rx: Some(timeout_rx),
104            actor_handle: Some(actor_handle),
105            wheel,
106        }
107    }
108
109    /// 添加批量定时器句柄(内部方法)
110    async fn add_batch_handle(&self, batch: BatchHandle) {
111        let _ = self.command_tx
112            .send(ServiceCommand::AddBatchHandle(batch))
113            .await;
114    }
115
116    /// 添加单个定时器句柄(内部方法)
117    async fn add_timer_handle(&self, handle: TimerHandle) {
118        let _ = self.command_tx
119            .send(ServiceCommand::AddTimerHandle(handle))
120            .await;
121    }
122
123    /// 获取超时接收器(转移所有权)
124    ///
125    /// # 返回
126    /// 超时通知接收器,如果已经被取走则返回 None
127    ///
128    /// # 注意
129    /// 此方法只能调用一次,因为它会转移接收器的所有权
130    ///
131    /// # 示例
132    /// ```no_run
133    /// # use kestrel_protocol_timer::TimerWheel;
134    /// # use std::time::Duration;
135    /// # #[tokio::main]
136    /// # async fn main() {
137    /// let timer = TimerWheel::with_defaults();
138    /// let mut service = timer.create_service();
139    /// 
140    /// let mut rx = service.take_receiver().unwrap();
141    /// while let Some(task_id) = rx.recv().await {
142    ///     println!("Task {:?} timed out", task_id);
143    /// }
144    /// # }
145    /// ```
146    pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<TaskId>> {
147        self.timeout_rx.take()
148    }
149
150    /// 取消指定的任务
151    ///
152    /// # 参数
153    /// - `task_id`: 要取消的任务 ID
154    ///
155    /// # 返回
156    /// - `Ok(true)`: 任务存在且成功取消
157    /// - `Ok(false)`: 任务不存在或取消失败
158    /// - `Err(String)`: 发送命令失败
159    ///
160    /// # 性能说明
161    /// 此方法使用直接取消优化,不需要等待 Actor 处理,大幅降低延迟
162    ///
163    /// # 示例
164    /// ```no_run
165    /// # use kestrel_protocol_timer::{TimerWheel, TimerService};
166    /// # use std::time::Duration;
167    /// # #[tokio::main]
168    /// # async fn main() {
169    /// let timer = TimerWheel::with_defaults();
170    /// let service = timer.create_service();
171    /// 
172    /// // 直接通过 service 调度定时器
173    /// let task_id = service.schedule_once(Duration::from_secs(10), || async {}).await;
174    /// 
175    /// // 取消任务
176    /// let cancelled = service.cancel_task(task_id).await;
177    /// println!("Task cancelled: {}", cancelled);
178    /// # }
179    /// ```
180    pub async fn cancel_task(&self, task_id: TaskId) -> bool {
181        // 优化:直接取消任务,避免通过 Actor 的异步往返
182        // 这将延迟从 "2次异步通信" 减少到 "0次等待"
183        let success = {
184            let mut wheel = self.wheel.lock();
185            wheel.cancel(task_id)
186        };
187        
188        // 异步通知 Actor 清理 active_tasks(无需等待结果)
189        if success {
190            let _ = self.command_tx
191                .send(ServiceCommand::RemoveTasks { 
192                    task_ids: vec![task_id] 
193                })
194                .await;
195        }
196        
197        success
198    }
199
200    /// 批量取消任务
201    ///
202    /// 使用底层的批量取消操作一次性取消多个任务,性能优于循环调用 cancel_task。
203    ///
204    /// # 参数
205    /// - `task_ids`: 要取消的任务 ID 列表
206    ///
207    /// # 返回
208    /// 成功取消的任务数量
209    ///
210    /// # 示例
211    /// ```no_run
212    /// # use kestrel_protocol_timer::{TimerWheel, TimerService};
213    /// # use std::time::Duration;
214    /// # #[tokio::main]
215    /// # async fn main() {
216    /// let timer = TimerWheel::with_defaults();
217    /// let service = timer.create_service();
218    /// 
219    /// let callbacks: Vec<_> = (0..10)
220    ///     .map(|_| (Duration::from_secs(10), || async {}))
221    ///     .collect();
222    /// let task_ids = service.schedule_once_batch(callbacks).await;
223    /// 
224    /// // 批量取消
225    /// let cancelled = service.cancel_batch(&task_ids).await;
226    /// println!("成功取消 {} 个任务", cancelled);
227    /// # }
228    /// ```
229    pub async fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
230        if task_ids.is_empty() {
231            return 0;
232        }
233
234        // 直接使用底层的批量取消
235        let cancelled_count = {
236            let mut wheel = self.wheel.lock();
237            wheel.cancel_batch(task_ids)
238        };
239
240        // 使用批量移除命令,一次性发送所有需要移除的任务ID
241        let _ = self.command_tx
242            .send(ServiceCommand::RemoveTasks { 
243                task_ids: task_ids.to_vec() 
244            })
245            .await;
246
247        cancelled_count
248    }
249
250    /// 调度一次性定时器
251    ///
252    /// 创建定时器并自动添加到服务管理中,无需手动调用 add_timer_handle
253    ///
254    /// # 参数
255    /// - `delay`: 延迟时间
256    /// - `callback`: 实现了 TimerCallback trait 的回调对象
257    ///
258    /// # 返回
259    /// 返回任务ID
260    ///
261    /// # 示例
262    /// ```no_run
263    /// # use kestrel_protocol_timer::TimerWheel;
264    /// # use std::time::Duration;
265    /// # #[tokio::main]
266    /// # async fn main() {
267    /// let timer = TimerWheel::with_defaults();
268    /// let mut service = timer.create_service();
269    /// 
270    /// let task_id = service.schedule_once(Duration::from_millis(100), || async {
271    ///     println!("Timer fired!");
272    /// }).await;
273    /// 
274    /// println!("Scheduled task: {:?}", task_id);
275    /// # }
276    /// ```
277    pub async fn schedule_once<C>(&self, delay: Duration, callback: C) -> TaskId
278    where
279        C: TimerCallback,
280    {
281        // 创建任务并获取句柄
282        let handle = self.create_timer_handle(delay, Some(Arc::new(callback)));
283        let task_id = handle.task_id();
284        
285        // 自动添加到服务管理
286        self.add_timer_handle(handle).await;
287        
288        task_id
289    }
290
291    /// 批量调度一次性定时器
292    ///
293    /// 批量创建定时器并自动添加到服务管理中
294    ///
295    /// # 参数
296    /// - `callbacks`: (延迟时间, 回调) 的元组列表
297    ///
298    /// # 返回
299    /// 返回所有任务ID
300    ///
301    /// # 示例
302    /// ```no_run
303    /// # use kestrel_protocol_timer::TimerWheel;
304    /// # use std::time::Duration;
305    /// # #[tokio::main]
306    /// # async fn main() {
307    /// let timer = TimerWheel::with_defaults();
308    /// let mut service = timer.create_service();
309    /// 
310    /// let callbacks: Vec<_> = (0..3)
311    ///     .map(|i| (Duration::from_millis(100 * (i + 1)), move || async move {
312    ///         println!("Timer {} fired!", i);
313    ///     }))
314    ///     .collect();
315    /// 
316    /// let task_ids = service.schedule_once_batch(callbacks).await;
317    /// println!("Scheduled {} tasks", task_ids.len());
318    /// # }
319    /// ```
320    pub async fn schedule_once_batch<C>(&self, callbacks: Vec<(Duration, C)>) -> Vec<TaskId>
321    where
322        C: TimerCallback,
323    {
324        // 创建批量任务并获取句柄
325        let batch_handle = self.create_batch_handle(callbacks);
326        let task_ids = batch_handle.task_ids().to_vec();
327        
328        // 自动添加到服务管理
329        self.add_batch_handle(batch_handle).await;
330        
331        task_ids
332    }
333
334    /// 调度一次性通知定时器(无回调,仅通知)
335    ///
336    /// 创建仅通知的定时器并自动添加到服务管理中
337    ///
338    /// # 参数
339    /// - `delay`: 延迟时间
340    ///
341    /// # 返回
342    /// 返回任务ID
343    ///
344    /// # 示例
345    /// ```no_run
346    /// # use kestrel_protocol_timer::TimerWheel;
347    /// # use std::time::Duration;
348    /// # #[tokio::main]
349    /// # async fn main() {
350    /// let timer = TimerWheel::with_defaults();
351    /// let mut service = timer.create_service();
352    /// 
353    /// let task_id = service.schedule_once_notify(Duration::from_millis(100)).await;
354    /// println!("Scheduled notify task: {:?}", task_id);
355    /// 
356    /// // 可以通过 timeout_receiver 接收超时通知
357    /// # }
358    /// ```
359    pub async fn schedule_once_notify(&self, delay: Duration) -> TaskId {
360        // 创建无回调任务并获取句柄
361        let handle = self.create_timer_handle(delay, None);
362        let task_id = handle.task_id();
363        
364        // 自动添加到服务管理
365        self.add_timer_handle(handle).await;
366        
367        task_id
368    }
369
370    /// 内部方法:创建定时器句柄
371    fn create_timer_handle(
372        &self,
373        delay: Duration,
374        callback: Option<CallbackWrapper>,
375    ) -> TimerHandle {
376        crate::timer::TimerWheel::create_timer_handle_internal(
377            &self.wheel,
378            delay,
379            callback
380        )
381    }
382
383    /// 内部方法:创建批量定时器句柄
384    fn create_batch_handle<C>(
385        &self,
386        callbacks: Vec<(Duration, C)>,
387    ) -> BatchHandle
388    where
389        C: TimerCallback,
390    {
391        crate::timer::TimerWheel::create_batch_handle_internal(
392            &self.wheel,
393            callbacks
394        )
395    }
396
397    /// 优雅关闭 TimerService
398    ///
399    /// # 示例
400    /// ```no_run
401    /// # use kestrel_protocol_timer::TimerWheel;
402    /// # #[tokio::main]
403    /// # async fn main() {
404    /// let timer = TimerWheel::with_defaults();
405    /// let mut service = timer.create_service();
406    /// 
407    /// // 使用 service...
408    /// 
409    /// service.shutdown().await;
410    /// # }
411    /// ```
412    pub async fn shutdown(mut self) {
413        let _ = self.command_tx.send(ServiceCommand::Shutdown).await;
414        if let Some(handle) = self.actor_handle.take() {
415            let _ = handle.await;
416        }
417    }
418}
419
420
421impl Drop for TimerService {
422    fn drop(&mut self) {
423        if let Some(handle) = self.actor_handle.take() {
424            handle.abort();
425        }
426    }
427}
428
/// ServiceActor - the internal actor behind `TimerService`.
struct ServiceActor {
    /// Receiving half of the command channel.
    command_rx: mpsc::Receiver<ServiceCommand>,
    /// Sending half of the timeout channel.
    timeout_tx: mpsc::Sender<TaskId>,
    /// Set of active task ids (`FxHashSet` is used for faster hashing).
    active_tasks: FxHashSet<TaskId>,
}
438
439impl ServiceActor {
440    fn new(command_rx: mpsc::Receiver<ServiceCommand>, timeout_tx: mpsc::Sender<TaskId>) -> Self {
441        Self {
442            command_rx,
443            timeout_tx,
444            active_tasks: FxHashSet::default(),
445        }
446    }
447
448    async fn run(mut self) {
449        // 使用 FuturesUnordered 来监听所有的 completion_rxs
450        // 每个 future 返回 (TaskId, Result)
451        let mut futures: FuturesUnordered<BoxFuture<'static, (TaskId, Result<(), tokio::sync::oneshot::error::RecvError>)>> = FuturesUnordered::new();
452
453        loop {
454            tokio::select! {
455                // 监听超时事件
456                Some((task_id, _result)) = futures.next() => {
457                    // 任务超时,转发 TaskId
458                    let _ = self.timeout_tx.send(task_id).await;
459                    // 从活跃任务集合中移除该任务
460                    self.active_tasks.remove(&task_id);
461                    // 任务会自动从 FuturesUnordered 中移除
462                }
463                
464                // 监听命令
465                Some(cmd) = self.command_rx.recv() => {
466                    match cmd {
467                        ServiceCommand::AddBatchHandle(batch) => {
468                            let BatchHandle {
469                                task_ids,
470                                completion_rxs,
471                                ..
472                            } = batch;
473                            
474                            // 将所有任务添加到 futures 和 active_tasks 中
475                            for (task_id, rx) in task_ids.into_iter().zip(completion_rxs.into_iter()) {
476                                // 记录到活跃任务集合
477                                self.active_tasks.insert(task_id);
478                                
479                                let future: BoxFuture<'static, (TaskId, Result<(), tokio::sync::oneshot::error::RecvError>)> = Box::pin(async move {
480                                    (task_id, rx.await)
481                                });
482                                futures.push(future);
483                            }
484                        }
485                        ServiceCommand::AddTimerHandle(handle) => {
486                            let TimerHandle{
487                                task_id,
488                                completion_rx,
489                                ..
490                            } = handle;
491                            
492                            // 记录到活跃任务集合
493                            self.active_tasks.insert(task_id);
494                            
495                            // 添加到 futures 中
496                            let future: BoxFuture<'static, (TaskId, Result<(), tokio::sync::oneshot::error::RecvError>)> = Box::pin(async move {
497                                (task_id, completion_rx.0.await)
498                            });
499                            futures.push(future);
500                        }
501                        ServiceCommand::RemoveTasks { task_ids } => {
502                            // 批量从活跃任务集合中移除任务
503                            // 用于直接取消后的清理工作
504                            for task_id in task_ids {
505                                self.active_tasks.remove(&task_id);
506                            }
507                        }
508                        ServiceCommand::Shutdown => {
509                            break;
510                        }
511                    }
512                }
513                
514                // 如果没有任何 future 且命令通道已关闭,退出循环
515                else => {
516                    break;
517                }
518            }
519        }
520    }
521}
522
// Unit tests for `TimerService`: scheduling, timeout forwarding, cancellation and shutdown.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TimerWheel;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    // A service can be created from a default timer wheel.
    #[tokio::test]
    async fn test_service_creation() {
        let timer = TimerWheel::with_defaults();
        let _service = timer.create_service();
    }


    // A handle created on the wheel and added to the service produces a timeout notification.
    #[tokio::test]
    async fn test_add_timer_handle_and_receive_timeout() {
        let timer = TimerWheel::with_defaults();
        let mut service = timer.create_service();

        // Create a single timer
        let handle = timer.schedule_once(Duration::from_millis(50), || async {}).await;
        let task_id = handle.task_id();

        // Add it to the service
        service.add_timer_handle(handle).await;

        // Receive the timeout notification
        let mut rx = service.take_receiver().unwrap();
        let received_task_id = tokio::time::timeout(Duration::from_millis(200), rx.recv())
            .await
            .expect("Should receive timeout notification")
            .expect("Should receive Some value");

        assert_eq!(received_task_id, task_id);
    }


    // Shutdown completes even though timers are still pending.
    #[tokio::test]
    async fn test_shutdown() {
        let timer = TimerWheel::with_defaults();
        let service = timer.create_service();

        // Add a few timers
        let _task_id1 = service.schedule_once(Duration::from_secs(10), || async {}).await;
        let _task_id2 = service.schedule_once(Duration::from_secs(10), || async {}).await;

        // Shut down immediately (without waiting for the timers to fire)
        service.shutdown().await;
    }



    // Cancelling a pending task succeeds once and fails the second time.
    #[tokio::test]
    async fn test_cancel_task() {
        let timer = TimerWheel::with_defaults();
        let service = timer.create_service();

        // Add a long-running timer
        let handle = timer.schedule_once(Duration::from_secs(10), || async {}).await;
        let task_id = handle.task_id();

        service.add_timer_handle(handle).await;

        // Cancel the task
        let cancelled = service.cancel_task(task_id).await;
        assert!(cancelled, "Task should be cancelled successfully");

        // Cancelling the same task again should return false
        let cancelled_again = service.cancel_task(task_id).await;
        assert!(!cancelled_again, "Task should not exist anymore");
    }

    // Cancelling an id that was never scheduled returns false.
    #[tokio::test]
    async fn test_cancel_nonexistent_task() {
        let timer = TimerWheel::with_defaults();
        let service = timer.create_service();

        // Add one timer so the service has something registered
        let handle = timer.schedule_once(Duration::from_millis(50), || async {}).await;
        service.add_timer_handle(handle).await;

        // Try to cancel a task that does not exist
        let fake_task_id = TaskId::new();
        let cancelled = service.cancel_task(fake_task_id).await;
        assert!(!cancelled, "Nonexistent task should not be cancelled");
    }


    // A task that has already timed out can no longer be cancelled.
    #[tokio::test]
    async fn test_task_timeout_cleans_up_task_sender() {
        let timer = TimerWheel::with_defaults();
        let mut service = timer.create_service();

        // Add a short timer
        let handle = timer.schedule_once(Duration::from_millis(50), || async {}).await;
        let task_id = handle.task_id();

        service.add_timer_handle(handle).await;

        // Wait for the task to time out
        let mut rx = service.take_receiver().unwrap();
        let received_task_id = tokio::time::timeout(Duration::from_millis(200), rx.recv())
            .await
            .expect("Should receive timeout notification")
            .expect("Should receive Some value");

        assert_eq!(received_task_id, task_id);

        // Wait a little to make sure internal cleanup has finished
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Cancelling the already timed-out task should return false
        let cancelled = service.cancel_task(task_id).await;
        assert!(!cancelled, "Timed out task should not exist anymore");
    }

    // A cancelled task never runs its callback and cannot be cancelled twice.
    #[tokio::test]
    async fn test_cancel_task_spawns_background_task() {
        let timer = TimerWheel::with_defaults();
        let service = timer.create_service();
        let counter = Arc::new(AtomicU32::new(0));

        // Create a timer
        let counter_clone = Arc::clone(&counter);
        let handle = timer.schedule_once(
            Duration::from_secs(10),
            move || {
                let counter = Arc::clone(&counter_clone);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                }
            },
        ).await;
        let task_id = handle.task_id();

        service.add_timer_handle(handle).await;

        // Cancel through the service: `cancel_task` cancels directly on the wheel and then
        // queues a cleanup command for the actor
        let cancelled = service.cancel_task(task_id).await;
        assert!(cancelled, "Task should be cancelled successfully");

        // Wait long enough to be sure the callback is not executed
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 0, "Callback should not have been executed");

        // A second cancel must fail. Note that `cancel_task` asks the wheel, so this checks
        // the wheel's state rather than the actor's active-task set
        let cancelled_again = service.cancel_task(task_id).await;
        assert!(!cancelled_again, "Task should have been removed from active_tasks");
    }

    // `schedule_once` on the service both notifies and runs the callback.
    #[tokio::test]
    async fn test_schedule_once_direct() {
        let timer = TimerWheel::with_defaults();
        let mut service = timer.create_service();
        let counter = Arc::new(AtomicU32::new(0));

        // Schedule a timer directly through the service
        let counter_clone = Arc::clone(&counter);
        let task_id = service.schedule_once(
            Duration::from_millis(50),
            move || {
                let counter = Arc::clone(&counter_clone);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                }
            },
        ).await;

        // Wait for the timer to fire
        let mut rx = service.take_receiver().unwrap();
        let received_task_id = tokio::time::timeout(Duration::from_millis(200), rx.recv())
            .await
            .expect("Should receive timeout notification")
            .expect("Should receive Some value");

        assert_eq!(received_task_id, task_id);

        // Give the callback time to run
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    // `schedule_once_batch` on the service notifies for every task and runs every callback.
    #[tokio::test]
    async fn test_schedule_once_batch_direct() {
        let timer = TimerWheel::with_defaults();
        let mut service = timer.create_service();
        let counter = Arc::new(AtomicU32::new(0));

        // Schedule a batch of timers directly through the service
        let callbacks: Vec<_> = (0..3)
            .map(|_| {
                let counter = Arc::clone(&counter);
                (Duration::from_millis(50), move || {
                    let counter = Arc::clone(&counter);
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                    }
                })
            })
            .collect();

        let task_ids = service.schedule_once_batch(callbacks).await;
        assert_eq!(task_ids.len(), 3);

        // Receive all timeout notifications
        let mut received_count = 0;
        let mut rx = service.take_receiver().unwrap();

        while received_count < 3 {
            match tokio::time::timeout(Duration::from_millis(200), rx.recv()).await {
                Ok(Some(_task_id)) => {
                    received_count += 1;
                }
                Ok(None) => break,
                Err(_) => break,
            }
        }

        assert_eq!(received_count, 3);

        // Give the callbacks time to run
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 3);
    }

    // A notify-only timer (no callback) still produces a timeout notification.
    #[tokio::test]
    async fn test_schedule_once_notify_direct() {
        let timer = TimerWheel::with_defaults();
        let mut service = timer.create_service();

        // Schedule a notify-only timer directly through the service
        let task_id = service.schedule_once_notify(Duration::from_millis(50)).await;

        // Receive the timeout notification
        let mut rx = service.take_receiver().unwrap();
        let received_task_id = tokio::time::timeout(Duration::from_millis(200), rx.recv())
            .await
            .expect("Should receive timeout notification")
            .expect("Should receive Some value");

        assert_eq!(received_task_id, task_id);
    }

    // A timer scheduled through the service can be cancelled before it fires.
    #[tokio::test]
    async fn test_schedule_and_cancel_direct() {
        let timer = TimerWheel::with_defaults();
        let service = timer.create_service();
        let counter = Arc::new(AtomicU32::new(0));

        // Schedule a timer directly
        let counter_clone = Arc::clone(&counter);
        let task_id = service.schedule_once(
            Duration::from_secs(10),
            move || {
                let counter = Arc::clone(&counter_clone);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                }
            },
        ).await;

        // Cancel it right away
        let cancelled = service.cancel_task(task_id).await;
        assert!(cancelled, "Task should be cancelled successfully");

        // Wait to make sure the callback is not executed
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 0, "Callback should not have been executed");
    }

    // `cancel_batch` cancels every task of a batch.
    #[tokio::test]
    async fn test_cancel_batch_direct() {
        let timer = TimerWheel::with_defaults();
        let service = timer.create_service();
        let counter = Arc::new(AtomicU32::new(0));

        // Schedule a batch of timers
        let callbacks: Vec<_> = (0..10)
            .map(|_| {
                let counter = Arc::clone(&counter);
                (Duration::from_secs(10), move || {
                    let counter = Arc::clone(&counter);
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                    }
                })
            })
            .collect();

        let task_ids = service.schedule_once_batch(callbacks).await;
        assert_eq!(task_ids.len(), 10);

        // Cancel all tasks in one batch
        let cancelled = service.cancel_batch(&task_ids).await;
        assert_eq!(cancelled, 10, "All 10 tasks should be cancelled");

        // Wait to make sure no callback is executed
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 0, "No callbacks should have been executed");
    }

    // `cancel_batch` on a subset reports only the subset as cancelled.
    #[tokio::test]
    async fn test_cancel_batch_partial() {
        let timer = TimerWheel::with_defaults();
        let service = timer.create_service();
        let counter = Arc::new(AtomicU32::new(0));

        // Schedule a batch of timers
        let callbacks: Vec<_> = (0..10)
            .map(|_| {
                let counter = Arc::clone(&counter);
                (Duration::from_secs(10), move || {
                    let counter = Arc::clone(&counter);
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                    }
                })
            })
            .collect();

        let task_ids = service.schedule_once_batch(callbacks).await;

        // Cancel only the first 5 tasks
        let to_cancel: Vec<_> = task_ids.iter().take(5).copied().collect();
        let cancelled = service.cancel_batch(&to_cancel).await;
        assert_eq!(cancelled, 5, "5 tasks should be cancelled");

        // Wait to make sure the first 5 callbacks are not executed (the remaining 5 have a
        // 10 s delay, so they cannot have fired within this window either)
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 0, "Cancelled tasks should not execute");
    }

    // `cancel_batch` with an empty slice cancels nothing.
    #[tokio::test]
    async fn test_cancel_batch_empty() {
        let timer = TimerWheel::with_defaults();
        let service = timer.create_service();

        // Cancel an empty list
        let empty: Vec<TaskId> = vec![];
        let cancelled = service.cancel_batch(&empty).await;
        assert_eq!(cancelled, 0, "No tasks should be cancelled");
    }
}
867