kestrel_timer/
service.rs

1use crate::config::ServiceConfig;
2use crate::error::TimerError;
3use crate::task::{CallbackWrapper, CompletionReceiver, TaskCompletion, TaskId};
4use crate::wheel::Wheel;
5use crate::{BatchHandle, TimerHandle};
6use futures::future::BoxFuture;
7use futures::stream::{FuturesUnordered, StreamExt};
8use lite_sync::{
9    oneshot::lite::{Receiver, Sender, channel},
10    spsc,
11};
12use parking_lot::Mutex;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::task::JoinHandle;
16
17/// Task notification type for distinguishing between one-shot and periodic tasks
18///
19/// 任务通知类型,用于区分一次性任务和周期性任务
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum TaskNotification {
22    /// One-shot task expired notification
23    ///
24    /// 一次性任务过期通知
25    OneShot(TaskId),
26    /// Periodic task called notification
27    ///
28    /// 周期性任务被调用通知
29    Periodic(TaskId),
30}
31
32impl TaskNotification {
33    /// Get the task ID from the notification
34    ///
35    /// 从通知中获取任务 ID
36    pub fn task_id(&self) -> TaskId {
37        match self {
38            TaskNotification::OneShot(id) => *id,
39            TaskNotification::Periodic(id) => *id,
40        }
41    }
42
43    /// Check if this is a one-shot task notification
44    ///
45    /// 检查是否为一次性任务通知
46    pub fn is_oneshot(&self) -> bool {
47        matches!(self, TaskNotification::OneShot(_))
48    }
49
50    /// Check if this is a periodic task notification
51    ///
52    /// 检查是否为周期性任务通知
53    pub fn is_periodic(&self) -> bool {
54        matches!(self, TaskNotification::Periodic(_))
55    }
56}
57
58/// Service command type
59///
60/// 服务命令类型
61enum ServiceCommand {
62    /// Add batch timer handle, only contains necessary data: task_ids and completion_rxs
63    ///
64    /// 添加批量定时器句柄,仅包含必要数据:task_ids 和 completion_rxs
65    AddBatchHandle {
66        task_ids: Vec<TaskId>,
67        completion_rxs: Vec<CompletionReceiver>,
68    },
69    /// Add single timer handle, only contains necessary data: task_id and completion_rx
70    ///
71    /// 添加单个定时器句柄,仅包含必要数据:task_id 和 completion_rx
72    AddTimerHandle {
73        task_id: TaskId,
74        completion_rx: CompletionReceiver,
75    },
76}
77
78/// TimerService - timer service based on Actor pattern
79/// Manages multiple timer handles, listens to all timeout events, and aggregates notifications to be forwarded to the user.
80/// # Features
81/// - Automatically listens to all added timer handles' timeout events
82/// - Automatically removes one-shot tasks from internal management after timeout
83/// - Continuously monitors periodic tasks and forwards each invocation
84/// - Aggregates notifications (both one-shot and periodic) to be forwarded to the user's unified channel
85/// - Supports dynamic addition of BatchHandle and TimerHandle
86///
87///
88/// # 定时器服务,基于 Actor 模式管理多个定时器句柄,监听所有超时事件,并将通知聚合转发给用户
89/// - 自动监听所有添加的定时器句柄的超时事件
90/// - 自动在一次性任务超时后从内部管理中移除任务
91/// - 持续监听周期性任务并转发每次调用通知
92/// - 将通知(一次性和周期性)聚合转发给用户
93/// - 支持动态添加 BatchHandle 和 TimerHandle
94///
95/// # Examples (示例)
96/// ```no_run
97/// use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TaskNotification, config::ServiceConfig};
98/// use std::time::Duration;
99///
100/// #[tokio::main]
101/// async fn main() {
102///     let timer = TimerWheel::with_defaults();
103///     let mut service = timer.create_service(ServiceConfig::default());
104///     
105///     // Register one-shot tasks (注册一次性任务)
106///     use kestrel_timer::TimerTask;
107///     let handles = service.allocate_handles(3);
108///     let tasks: Vec<_> = (0..3)
109///         .map(|i| {
110///             let callback = Some(CallbackWrapper::new(move || async move {
111///                 println!("One-shot timer {} fired!", i);
112///             }));
113///             TimerTask::new_oneshot(Duration::from_millis(100), callback)
114///         })
115///         .collect();
116///     service.register_batch(handles, tasks).unwrap();
117///     
118///     // Register periodic tasks (注册周期性任务)
119///     let handle = service.allocate_handle();
120///     let periodic_task = TimerTask::new_periodic(
121///         Duration::from_millis(100),
122///         Duration::from_millis(50),
123///         Some(CallbackWrapper::new(|| async { println!("Periodic timer fired!"); })),
124///         None
125///     );
126///     service.register(handle, periodic_task).unwrap();
127///     
128///     // Receive notifications (接收通知)
129///     let rx = service.take_receiver().unwrap();
130///     while let Some(notification) = rx.recv().await {
131///         match notification {
132///             TaskNotification::OneShot(task_id) => {
133///                 println!("One-shot task {:?} expired", task_id);
134///             }
135///             TaskNotification::Periodic(task_id) => {
136///                 println!("Periodic task {:?} called", task_id);
137///             }
138///         }
139///     }
140/// }
141/// ```
142pub struct TimerService {
143    /// Command sender
144    ///
145    /// 命令发送器
146    command_tx: spsc::Sender<ServiceCommand, 32>,
147    /// Timeout receiver (supports both one-shot and periodic task notifications)
148    ///
149    /// 超时接收器(支持一次性和周期性任务通知)
150    timeout_rx: Option<spsc::Receiver<TaskNotification, 32>>,
151    /// Actor task handle
152    ///
153    /// Actor 任务句柄
154    actor_handle: Option<JoinHandle<()>>,
155    /// Timing wheel reference (for direct scheduling of timers)
156    ///
157    /// 时间轮引用(用于直接调度定时器)
158    wheel: Arc<Mutex<Wheel>>,
159    /// Actor shutdown signal sender
160    ///
161    /// Actor 关闭信号发送器
162    shutdown_tx: Option<Sender<()>>,
163}
164
165impl TimerService {
166    /// Allocate a handle from DeferredMap
167    ///
168    /// # Returns
169    /// A unique handle for later insertion
170    ///
171    /// # 返回值
172    /// 用于后续插入的唯一 handle
173    ///
174    /// # Examples (示例)
175    /// ```no_run
176    /// # use kestrel_timer::{TimerWheel, config::ServiceConfig};
177    /// # #[tokio::main]
178    /// # async fn main() {
179    /// let timer = TimerWheel::with_defaults();
180    /// let mut service = timer.create_service(ServiceConfig::default());
181    ///
182    /// // Allocate handle first    
183    /// // 先分配handle
184    /// let handle = service.allocate_handle();
185    /// # }
186    /// ```
187    pub fn allocate_handle(&self) -> crate::task::TaskHandle {
188        self.wheel.lock().allocate_handle()
189    }
190
191    /// Batch allocate handles from DeferredMap
192    ///
193    /// # Parameters
194    /// - `count`: Number of handles to allocate
195    ///
196    /// # Returns
197    /// Vector of unique handles for later batch insertion
198    ///
199    /// # 参数
200    /// - `count`: 要分配的 handle 数量
201    ///
202    /// # 返回值
203    /// 用于后续批量插入的唯一 handles 向量
204    ///
205    /// # Examples (示例)
206    /// ```no_run
207    /// # use kestrel_timer::{TimerWheel, config::ServiceConfig};
208    /// # #[tokio::main]
209    /// # async fn main() {
210    /// let timer = TimerWheel::with_defaults();
211    /// let service = timer.create_service(ServiceConfig::default());
212    ///
213    /// // Batch allocate handles
214    /// // 批量分配 handles
215    /// let handles = service.allocate_handles(10);
216    /// assert_eq!(handles.len(), 10);
217    /// # }
218    /// ```
219    pub fn allocate_handles(&self, count: usize) -> Vec<crate::task::TaskHandle> {
220        self.wheel.lock().allocate_handles(count)
221    }
222
223    /// Create new TimerService
224    ///
225    /// # Parameters
226    /// - `wheel`: Timing wheel reference
227    /// - `config`: Service configuration
228    ///
229    /// # Notes
230    /// Typically not called directly, but used to create through `TimerWheel::create_service()`
231    ///
232    /// 创建新的 TimerService
233    ///
234    /// # 参数
235    /// - `wheel`: 时间轮引用
236    /// - `config`: 服务配置
237    ///
238    /// # 注意
239    /// 通常不直接调用,而是通过 `TimerWheel::create_service()` 创建
240    ///
241    pub(crate) fn new(wheel: Arc<Mutex<Wheel>>, config: ServiceConfig) -> Self {
242        let (command_tx, command_rx) = spsc::channel(config.command_channel_capacity);
243        let (timeout_tx, timeout_rx) = spsc::channel(config.timeout_channel_capacity);
244
245        let (shutdown_tx, shutdown_rx) = channel::<()>();
246        let actor = ServiceActor::new(command_rx, timeout_tx, shutdown_rx);
247        let actor_handle = tokio::spawn(async move {
248            actor.run().await;
249        });
250
251        Self {
252            command_tx,
253            timeout_rx: Some(timeout_rx),
254            actor_handle: Some(actor_handle),
255            wheel,
256            shutdown_tx: Some(shutdown_tx),
257        }
258    }
259
260    /// Get timeout receiver (transfer ownership)
261    ///
262    /// # Returns
263    /// Timeout notification receiver, if already taken, returns None
264    ///
265    /// # Notes
266    /// This method can only be called once, because it transfers ownership of the receiver
267    /// The receiver will receive both one-shot task expired notifications and periodic task called notifications
268    ///
269    /// 获取超时通知接收器(转移所有权)
270    ///
271    /// # 返回值
272    /// 超时通知接收器,如果已经取走,返回 None
273    ///
274    /// # 注意
275    /// 此方法只能调用一次,因为它转移了接收器的所有权
276    /// 接收器将接收一次性任务过期通知和周期性任务被调用通知
277    ///
278    /// # Examples (示例)
279    /// ```no_run
280    /// # use kestrel_timer::{TimerWheel, config::ServiceConfig, TaskNotification};
281    /// # #[tokio::main]
282    /// # async fn main() {
283    /// let timer = TimerWheel::with_defaults();
284    /// let mut service = timer.create_service(ServiceConfig::default());
285    ///
286    /// let rx = service.take_receiver().unwrap();
287    /// while let Some(notification) = rx.recv().await {
288    ///     match notification {
289    ///         TaskNotification::OneShot(task_id) => {
290    ///             println!("One-shot task {:?} expired", task_id);
291    ///         }
292    ///         TaskNotification::Periodic(task_id) => {
293    ///             println!("Periodic task {:?} called", task_id);
294    ///         }
295    ///     }
296    /// }
297    /// # }
298    /// ```
299    pub fn take_receiver(&mut self) -> Option<spsc::Receiver<TaskNotification, 32>> {
300        self.timeout_rx.take()
301    }
302
303    /// Cancel specified task
304    ///
305    /// # Parameters
306    /// - `task_id`: Task ID to cancel
307    ///
308    /// # Returns
309    /// - `true`: Task exists and cancellation is successful
310    /// - `false`: Task does not exist or cancellation fails
311    ///
312    /// 取消指定任务
313    ///
314    /// # 参数
315    /// - `task_id`: 任务 ID
316    ///
317    /// # 返回值
318    /// - `true`: 任务存在且取消成功
319    /// - `false`: 任务不存在或取消失败
320    ///
321    /// # Examples (示例)
322    /// ```no_run
323    /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
324    /// # use std::time::Duration;
325    /// #
326    /// # #[tokio::main]
327    /// # async fn main() {
328    /// let timer = TimerWheel::with_defaults();
329    /// let service = timer.create_service(ServiceConfig::default());
330    ///
331    /// // Use two-step API to schedule timers
332    /// let handle = service.allocate_handle();
333    /// let task_id = handle.task_id();
334    /// let callback = Some(CallbackWrapper::new(|| async move {
335    ///     println!("Timer fired!"); // 定时器触发
336    /// }));
337    /// let task = TimerTask::new_oneshot(Duration::from_secs(10), callback);
338    /// service.register(handle, task).unwrap(); // 注册定时器
339    ///
340    /// // Cancel task
341    /// let cancelled = service.cancel_task(task_id);
342    /// println!("Task cancelled: {}", cancelled); // 任务取消
343    /// # }
344    /// ```
345    #[inline]
346    pub fn cancel_task(&self, task_id: TaskId) -> bool {
347        // Direct cancellation, no need to notify Actor
348        // FuturesUnordered will automatically clean up when tasks are cancelled
349        // 直接取消,无需通知 Actor
350        // FuturesUnordered 将在任务取消时自动清理
351        let mut wheel = self.wheel.lock();
352        wheel.cancel(task_id)
353    }
354
355    /// Batch cancel tasks
356    ///
357    /// Use underlying batch cancellation operation to cancel multiple tasks at once, performance is better than calling cancel_task repeatedly.
358    ///
359    /// # Parameters
360    /// - `task_ids`: List of task IDs to cancel
361    ///
362    /// # Returns
363    /// Number of successfully cancelled tasks
364    ///
365    /// 批量取消任务
366    ///
367    /// # 参数
368    /// - `task_ids`: 任务 ID 列表
369    ///
370    /// # 返回值
371    /// 成功取消的任务数量
372    ///
373    /// # Examples (示例)
374    /// ```no_run
375    /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
376    /// # use std::time::Duration;
377    /// #
378    /// # #[tokio::main]
379    /// # async fn main() {
380    /// let timer = TimerWheel::with_defaults();
381    /// let service = timer.create_service(ServiceConfig::default());
382    ///
383    /// let handles = service.allocate_handles(10);
384    /// let task_ids: Vec<_> = handles.iter().map(|h| h.task_id()).collect();
385    /// let tasks: Vec<_> = (0..10)
386    ///     .map(|i| {
387    ///         let callback = Some(CallbackWrapper::new(move || async move {
388    ///             println!("Timer {} fired!", i); // 定时器触发
389    ///         }));
390    ///         TimerTask::new_oneshot(Duration::from_secs(10), callback)
391    ///     })
392    ///     .collect();
393    /// service.register_batch(handles, tasks).unwrap(); // 注册定时器
394    ///
395    /// // Batch cancel
396    /// let cancelled = service.cancel_batch(&task_ids);
397    /// println!("Cancelled {} tasks", cancelled); // 任务取消
398    /// # }
399    /// ```
400    #[inline]
401    pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
402        if task_ids.is_empty() {
403            return 0;
404        }
405
406        // Direct batch cancellation, no need to notify Actor
407        // FuturesUnordered will automatically clean up when tasks are cancelled
408        // 直接批量取消,无需通知 Actor
409        // FuturesUnordered 将在任务取消时自动清理
410        let mut wheel = self.wheel.lock();
411        wheel.cancel_batch(task_ids)
412    }
413
414    /// Postpone task (optionally replace callback)
415    ///
416    /// # Parameters
417    /// - `task_id`: Task ID to postpone
418    /// - `new_delay`: New delay time (recalculated from current time point)
419    /// - `callback`: New callback function (if `None`, keeps the original callback)
420    ///
421    /// # Returns
422    /// - `true`: Task exists and is successfully postponed
423    /// - `false`: Task does not exist or postponement fails
424    ///
425    /// # Notes
426    /// - Task ID remains unchanged after postponement
427    /// - Original timeout notification remains valid
428    /// - If callback is `Some`, it will replace the original callback
429    /// - If callback is `None`, the original callback is preserved
430    ///
431    /// 推迟任务 (可选替换回调)
432    ///
433    /// # 参数
434    /// - `task_id`: 任务 ID
435    /// - `new_delay`: 新的延迟时间 (从当前时间点重新计算)
436    /// - `callback`: 新的回调函数 (如果为 `None`,则保留原回调)
437    ///
438    /// # 返回值
439    /// - `true`: 任务存在且延期成功
440    /// - `false`: 任务不存在或延期失败
441    ///
442    /// # 注意
443    /// - 任务 ID 在延期后保持不变
444    /// - 原始超时通知保持有效
445    /// - 如果 callback 为 `Some`,将替换原始回调
446    /// - 如果 callback 为 `None`,保留原始回调
447    ///
448    /// # Examples (示例)
449    /// ```no_run
450    /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
451    /// # use std::time::Duration;
452    /// #
453    /// # #[tokio::main]
454    /// # async fn main() {
455    /// let timer = TimerWheel::with_defaults();
456    /// let service = timer.create_service(ServiceConfig::default());
457    ///
458    /// let handle = service.allocate_handle();
459    /// let task_id = handle.task_id();
460    /// let callback = Some(CallbackWrapper::new(|| async {
461    ///     println!("Original callback"); // 原始回调
462    /// }));
463    /// let task = TimerTask::new_oneshot(Duration::from_secs(5), callback);
464    /// service.register(handle, task).unwrap(); // 注册定时器
465    ///
466    /// // Postpone and replace callback (延期并替换回调)
467    /// let new_callback = Some(CallbackWrapper::new(|| async { println!("New callback!"); }));
468    /// let success = service.postpone(
469    ///     task_id,
470    ///     Duration::from_secs(10),
471    ///     new_callback
472    /// );
473    /// println!("Postponed successfully: {}", success);
474    /// # }
475    /// ```
476    #[inline]
477    pub fn postpone(
478        &self,
479        task_id: TaskId,
480        new_delay: Duration,
481        callback: Option<CallbackWrapper>,
482    ) -> bool {
483        let mut wheel = self.wheel.lock();
484        wheel.postpone(task_id, new_delay, callback)
485    }
486
487    /// Batch postpone tasks (keep original callbacks)
488    ///
489    /// # Parameters
490    /// - `updates`: List of tuples of (task ID, new delay)
491    ///
492    /// # Returns
493    /// Number of successfully postponed tasks
494    ///
495    /// 批量延期任务 (保持原始回调)
496    ///
497    /// # 参数
498    /// - `updates`: (任务 ID, 新延迟) 元组列表
499    ///
500    /// # 返回值
501    /// 成功延期的任务数量
502    ///
503    /// # Examples (示例)
504    /// ```no_run
505    /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
506    /// # use std::time::Duration;
507    /// #
508    /// # #[tokio::main]
509    /// # async fn main() {
510    /// let timer = TimerWheel::with_defaults();
511    /// let service = timer.create_service(ServiceConfig::default());
512    ///
513    /// let handles = service.allocate_handles(3);
514    /// let task_ids: Vec<_> = handles.iter().map(|h| h.task_id()).collect();
515    /// let tasks: Vec<_> = (0..3)
516    ///     .map(|i| {
517    ///         let callback = Some(CallbackWrapper::new(move || async move {
518    ///             println!("Timer {} fired!", i);
519    ///         }));
520    ///         TimerTask::new_oneshot(Duration::from_secs(5), callback)
521    ///     })
522    ///     .collect();
523    /// service.register_batch(handles, tasks).unwrap();
524    ///
525    /// // Batch postpone (keep original callbacks)
526    /// // 批量延期任务 (保持原始回调)
527    /// let updates: Vec<_> = task_ids
528    ///     .into_iter()
529    ///     .map(|id| (id, Duration::from_secs(10)))
530    ///     .collect();
531    /// let postponed = service.postpone_batch(updates);
532    /// println!("Postponed {} tasks", postponed);
533    /// # }
534    /// ```
535    #[inline]
536    pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize {
537        if updates.is_empty() {
538            return 0;
539        }
540
541        let mut wheel = self.wheel.lock();
542        wheel.postpone_batch(updates)
543    }
544
545    /// Batch postpone tasks (replace callbacks)
546    ///
547    /// # Parameters
548    /// - `updates`: List of tuples of (task ID, new delay, new callback)
549    ///
550    /// # Returns
551    /// Number of successfully postponed tasks
552    ///
553    /// 批量延期任务 (替换回调)
554    ///
555    /// # 参数
556    /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
557    ///
558    /// # 返回值
559    /// 成功延期的任务数量
560    ///
561    /// # Examples (示例)
562    /// ```no_run
563    /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
564    /// # use std::time::Duration;
565    /// #
566    /// # #[tokio::main]
567    /// # async fn main() {
568    /// # use kestrel_timer::TimerTask;
569    /// let timer = TimerWheel::with_defaults();
570    /// let service = timer.create_service(ServiceConfig::default());
571    ///
572    /// // Create 3 tasks, initially no callbacks
573    /// // 创建 3 个任务,最初没有回调
574    /// let handles = service.allocate_handles(3);
575    /// let task_ids: Vec<_> = handles.iter().map(|h| h.task_id()).collect();
576    /// let tasks: Vec<_> = (0..3)
577    ///     .map(|_| {
578    ///         TimerTask::new_oneshot(Duration::from_secs(5), None)
579    ///     })
580    ///     .collect();
581    /// service.register_batch(handles, tasks).unwrap();
582    ///
583    /// // Batch postpone and add new callbacks
584    /// // 批量延期并添加新的回调
585    /// let updates: Vec<_> = task_ids
586    ///     .into_iter()
587    ///     .enumerate()
588    ///     .map(|(i, id)| {
589    ///         let callback = Some(CallbackWrapper::new(move || async move {
590    ///             println!("New callback {}", i);
591    ///         }));
592    ///         (id, Duration::from_secs(10), callback)
593    ///     })
594    ///     .collect();
595    /// let postponed = service.postpone_batch_with_callbacks(updates);
596    /// println!("Postponed {} tasks", postponed);
597    /// # }
598    /// ```
599    #[inline]
600    pub fn postpone_batch_with_callbacks(
601        &self,
602        updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
603    ) -> usize {
604        if updates.is_empty() {
605            return 0;
606        }
607
608        let mut wheel = self.wheel.lock();
609        wheel.postpone_batch_with_callbacks(updates)
610    }
611
612    /// Register timer task to service (registration phase)
613    ///
614    /// # Parameters
615    /// - `handle`: Handle allocated via `allocate_handle()`
616    /// - `task`: Task created via `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
617    ///
618    /// # Returns
619    /// - `Ok(TimerHandle)`: Register successfully
620    /// - `Err(TimerError::RegisterFailed)`: Register failed (internal channel is full or closed)
621    ///
622    /// 注册定时器任务到服务 (注册阶段)
623    /// # 参数
624    /// - `handle`: 通过 `allocate_handle()` 分配的 handle
625    /// - `task`: 通过 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建的任务
626    ///
627    /// # 返回值
628    /// - `Ok(TimerHandle)`: 注册成功
629    /// - `Err(TimerError::RegisterFailed)`: 注册失败 (内部通道已满或关闭)
630    ///
631    /// # Examples (示例)
632    /// ```no_run
633    /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig, TimerTask};
634    /// # use std::time::Duration;
635    /// #
636    /// # #[tokio::main]
637    /// # async fn main() {
638    /// let timer = TimerWheel::with_defaults();
639    /// let service = timer.create_service(ServiceConfig::default());
640    ///
641    /// // Step 1: allocate handle
642    /// // 分配 handle
643    /// let handle = service.allocate_handle();
644    /// let task_id = handle.task_id();
645    ///
646    /// // Step 2: create task
647    /// // 创建任务
648    /// let callback = Some(CallbackWrapper::new(|| async move {
649    ///     println!("Timer fired!");
650    /// }));
651    /// let task = TimerTask::new_oneshot(Duration::from_millis(100), callback);
652    ///
653    /// // Step 3: register task
654    /// // 注册任务
655    /// service.register(handle, task).unwrap();
656    /// # }
657    /// ```
658    #[inline]
659    pub fn register(
660        &self,
661        handle: crate::task::TaskHandle,
662        task: crate::task::TimerTask,
663    ) -> Result<TimerHandle, TimerError> {
664        let task_id = handle.task_id();
665
666        let (task, completion_rx) =
667            crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
668
669        // Single lock, complete all operations
670        // 单次锁定,完成所有操作
671        {
672            let mut wheel_guard = self.wheel.lock();
673            wheel_guard.insert(handle, task);
674        }
675
676        // Add to service management (only send necessary data)
677        // 添加到服务管理(只发送必要数据)
678        self.command_tx
679            .try_send(ServiceCommand::AddTimerHandle {
680                task_id,
681                completion_rx,
682            })
683            .map_err(|_| TimerError::RegisterFailed)?;
684
685        Ok(TimerHandle::new(task_id, self.wheel.clone()))
686    }
687
688    /// Batch register timer tasks to service (registration phase)
689    ///
690    /// # Parameters
691    /// - `handles`: Pre-allocated handles for tasks
692    /// - `tasks`: List of timer tasks
693    ///
694    /// # Returns
695    /// - `Ok(BatchHandle)`: Register successfully
696    /// - `Err(TimerError::RegisterFailed)`: Register failed (internal channel is full or closed)
697    /// - `Err(TimerError::BatchLengthMismatch)`: Handles and tasks lengths don't match
698    ///
699    /// 批量注册定时器任务到服务 (注册阶段)
700    /// # 参数
701    /// - `handles`: 任务的预分配 handles
702    /// - `tasks`: 定时器任务列表
703    ///
704    /// # 返回值
705    /// - `Ok(BatchHandle)`: 注册成功
706    /// - `Err(TimerError::RegisterFailed)`: 注册失败 (内部通道已满或关闭)
707    /// - `Err(TimerError::BatchLengthMismatch)`: handles 和 tasks 长度不匹配
708    ///
709    /// # Examples (示例)
710    /// ```no_run
711    /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig, TimerTask};
712    /// # use std::time::Duration;
713    /// #
714    /// # #[tokio::main]
715    /// # async fn main() {
716    /// # use kestrel_timer::TimerTask;
717    /// let timer = TimerWheel::with_defaults();
718    /// let service = timer.create_service(ServiceConfig::default());
719    ///
720    /// // Step 1: batch allocate handles
721    /// // 批量分配 handles
722    /// let handles = service.allocate_handles(3);
723    ///
724    /// // Step 2: create tasks
725    /// // 创建任务
726    /// let tasks: Vec<_> = (0..3)
727    ///     .map(|i| {
728    ///         let callback = Some(CallbackWrapper::new(move || async move {
729    ///             println!("Timer {} fired!", i);
730    ///         }));
731    ///         TimerTask::new_oneshot(Duration::from_secs(1), callback)
732    ///     })
733    ///     .collect();
734    ///
735    /// // Step 3: register batch
736    /// // 注册批量任务
737    /// service.register_batch(handles, tasks).unwrap();
738    /// # }
739    /// ```
740    #[inline]
741    pub fn register_batch(
742        &self,
743        handles: Vec<crate::task::TaskHandle>,
744        tasks: Vec<crate::task::TimerTask>,
745    ) -> Result<BatchHandle, TimerError> {
746        // Validate lengths match
747        if handles.len() != tasks.len() {
748            return Err(TimerError::BatchLengthMismatch {
749                handles_len: handles.len(),
750                tasks_len: tasks.len(),
751            });
752        }
753
754        let task_count = tasks.len();
755        let mut completion_rxs = Vec::with_capacity(task_count);
756        let mut task_ids = Vec::with_capacity(task_count);
757        let mut prepared_handles = Vec::with_capacity(task_count);
758        let mut prepared_tasks = Vec::with_capacity(task_count);
759
760        // Step 1: prepare all channels and notifiers (no lock)
761        // 步骤 1: 准备所有通道和通知器(无锁)
762        for (handle, task) in handles.into_iter().zip(tasks.into_iter()) {
763            let task_id = handle.task_id();
764            let (task, completion_rx) =
765                crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
766            task_ids.push(task_id);
767            completion_rxs.push(completion_rx);
768            prepared_handles.push(handle);
769            prepared_tasks.push(task);
770        }
771
772        // Step 2: single lock, batch insert
773        // 步骤 2: 单次锁定,批量插入
774        {
775            let mut wheel_guard = self.wheel.lock();
776            wheel_guard.insert_batch(prepared_handles, prepared_tasks)?;
777        }
778
779        // Add to service management (only send necessary data)
780        // 添加到服务管理(只发送必要数据)
781        self.command_tx
782            .try_send(ServiceCommand::AddBatchHandle {
783                task_ids: task_ids.clone(),
784                completion_rxs,
785            })
786            .map_err(|_| TimerError::RegisterFailed)?;
787
788        Ok(BatchHandle::new(task_ids, self.wheel.clone()))
789    }
790
791    /// Graceful shutdown of TimerService
792    ///
793    /// 优雅关闭 TimerService
794    ///
795    /// # Examples (示例)
796    /// ```no_run
797    /// # use kestrel_timer::{TimerWheel, config::ServiceConfig};
798    /// # #[tokio::main]
799    /// # async fn main() {
800    /// let timer = TimerWheel::with_defaults();
801    /// let mut service = timer.create_service(ServiceConfig::default());
802    ///
803    /// // Use service... (使用服务...)
804    ///
805    /// service.shutdown().await;
806    /// # }
807    /// ```
808    pub async fn shutdown(mut self) {
809        if let Some(shutdown_tx) = self.shutdown_tx.take() {
810            shutdown_tx.notify(());
811        }
812        if let Some(handle) = self.actor_handle.take() {
813            let _ = handle.await;
814        }
815    }
816}
817
818impl Drop for TimerService {
819    fn drop(&mut self) {
820        if let Some(handle) = self.actor_handle.take() {
821            handle.abort();
822        }
823    }
824}
825
826/// ServiceActor - internal Actor implementation
827///
828/// ServiceActor - 内部 Actor 实现
829struct ServiceActor {
830    /// Command receiver
831    ///
832    /// 命令接收器
833    command_rx: spsc::Receiver<ServiceCommand, 32>,
834    /// Timeout sender (supports both one-shot and periodic task notifications)
835    ///
836    /// 超时发送器(支持一次性和周期性任务通知)
837    timeout_tx: spsc::Sender<TaskNotification, 32>,
838    /// Actor shutdown signal receiver
839    ///
840    /// Actor 关闭信号接收器
841    shutdown_rx: Receiver<()>,
842}
843
844impl ServiceActor {
845    /// Create new ServiceActor
846    ///
847    /// 创建新的 ServiceActor
848    fn new(
849        command_rx: spsc::Receiver<ServiceCommand, 32>,
850        timeout_tx: spsc::Sender<TaskNotification, 32>,
851        shutdown_rx: Receiver<()>,
852    ) -> Self {
853        Self {
854            command_rx,
855            timeout_tx,
856            shutdown_rx,
857        }
858    }
859
860    /// Run Actor event loop
861    ///
862    /// 运行 Actor 事件循环
863    async fn run(self) {
864        // Use separate FuturesUnordered for one-shot and periodic tasks
865        // 为一次性任务和周期性任务使用独立的 FuturesUnordered
866
867        // One-shot futures: each future returns (TaskId, TaskCompletion)
868        // 一次性任务futures:每个future 返回 (TaskId, TaskCompletion)
869        let mut oneshot_futures: FuturesUnordered<BoxFuture<'static, (TaskId, TaskCompletion)>> =
870            FuturesUnordered::new();
871
872        // Periodic futures: each future returns (TaskId, Option<PeriodicTaskCompletion>, mpsc::Receiver)
873        // The receiver is returned so we can continue listening for next event
874        // 周期性任务futures:每个future 返回 (TaskId, Option<PeriodicTaskCompletion>, mpsc::Receiver)
875        // 返回接收器以便我们可以继续监听下一个事件
876        type PeriodicFutureResult = (
877            TaskId,
878            Option<TaskCompletion>,
879            crate::task::PeriodicCompletionReceiver,
880        );
881        let mut periodic_futures: FuturesUnordered<BoxFuture<'static, PeriodicFutureResult>> =
882            FuturesUnordered::new();
883
884        // Move shutdown_rx out of self, so it can be used in select! with &mut
885        // 将 shutdown_rx 从 self 中移出,以便在 select! 中使用 &mut
886        let mut shutdown_rx = self.shutdown_rx;
887
888        loop {
889            tokio::select! {
890                // Listen to high-priority shutdown signal
891                // 监听高优先级关闭信号
892                _ = &mut shutdown_rx => {
893                    // Receive shutdown signal, exit loop immediately
894                    // 接收到关闭信号,立即退出循环
895                    break;
896                }
897
898                // Listen to one-shot task timeout events
899                // 监听一次性任务超时事件
900                Some((task_id, completion)) = oneshot_futures.next() => {
901                    // Check completion reason, only forward Called events, do not forward Cancelled events
902                    // 检查完成原因,只转发 Called 事件,不转发 Cancelled 事件
903                    if completion == TaskCompletion::Called {
904                        let _ = self.timeout_tx.send(TaskNotification::OneShot(task_id)).await;
905                    }
906                    // Task will be automatically removed from FuturesUnordered
907                    // 任务将自动从 FuturesUnordered 中移除
908                }
909
910                // Listen to periodic task events
911                // 监听周期性任务事件
912                Some((task_id, reason, mut receiver)) = periodic_futures.next() => {
913                    // Check completion reason, only forward Called events, do not forward Cancelled events
914                    // 检查完成原因,只转发 Called 事件,不转发 Cancelled 事件
915                    if let Some(TaskCompletion::Called) = reason {
916                        let _ = self.timeout_tx.send(TaskNotification::Periodic(task_id)).await;
917
918                        // Re-add the receiver to continue listening for next periodic event
919                        // 重新添加接收器以继续监听下一个周期性事件
920                        let future: BoxFuture<'static, PeriodicFutureResult> = Box::pin(async move {
921                            let reason = receiver.recv().await;
922                            (task_id, reason, receiver)
923                        });
924                        periodic_futures.push(future);
925                    }
926                    // If Cancelled or None, do not re-add the future (task is done)
927                    // 如果 Cancelled 或 None,不重新添加 future(任务结束)
928                }
929
930                // Listen to commands
931                // 监听命令
932                Some(cmd) = self.command_rx.recv() => {
933                    match cmd {
934                        ServiceCommand::AddBatchHandle { task_ids, completion_rxs } => {
935                            // Add all tasks to appropriate futures
936                            // 将所有任务添加到相应 futures
937                            for (task_id, rx) in task_ids.into_iter().zip(completion_rxs.into_iter()) {
938                                match rx {
939                                    crate::task::CompletionReceiver::OneShot(receiver) => {
940                                        let future: BoxFuture<'static, (TaskId, TaskCompletion)> = Box::pin(async move {
941                                            // unwrap() is safe here: the sender is held by the task and will send
942                                            // before being dropped. If the sender is dropped without sending,
943                                            // it's a logic error in the task implementation.
944                                            // unwrap() 在这里是安全的:发送器由任务持有,在被丢弃前会发送。
945                                            // 如果发送器在未发送的情况下被丢弃,这是任务实现中的逻辑错误。
946                                            (task_id, receiver.recv().await.unwrap())
947                                        });
948                                        oneshot_futures.push(future);
949                                    },
950                                    crate::task::CompletionReceiver::Periodic(mut receiver) => {
951                                        let future: BoxFuture<'static, PeriodicFutureResult> = Box::pin(async move {
952                                            let reason = receiver.recv().await;
953                                            (task_id, reason, receiver)
954                                        });
955                                        periodic_futures.push(future);
956                                    }
957                                }
958                            }
959                        }
960                        ServiceCommand::AddTimerHandle { task_id, completion_rx } => {
961                            // Add to appropriate futures
962                            // 添加到相应的 futures
963                            match completion_rx {
964                                crate::task::CompletionReceiver::OneShot(receiver) => {
965                                    let future: BoxFuture<'static, (TaskId, TaskCompletion)> = Box::pin(async move {
966                                        // unwrap() is safe here: the sender is held by the task and will send
967                                        // before being dropped. If the sender is dropped without sending,
968                                        // it's a logic error in the task implementation.
969                                        // unwrap() 在这里是安全的:发送器由任务持有,在被丢弃前会发送。
970                                        // 如果发送器在未发送的情况下被丢弃,这是任务实现中的逻辑错误。
971                                        (task_id, receiver.recv().await.unwrap())
972                                    });
973                                    oneshot_futures.push(future);
974                                },
975                                crate::task::CompletionReceiver::Periodic(mut receiver) => {
976                                    let future: BoxFuture<'static, PeriodicFutureResult> = Box::pin(async move {
977                                        let reason = receiver.recv().await;
978                                        (task_id, reason, receiver)
979                                    });
980                                    periodic_futures.push(future);
981                                }
982                            }
983                        }
984                    }
985                }
986
987                // If no futures and command channel is closed, exit loop
988                // 如果没有 futures 且命令通道关闭,退出循环
989                else => {
990                    break;
991                }
992            }
993        }
994    }
995}
996
997#[cfg(test)]
998mod tests {
999    use super::*;
1000    use crate::{TimerTask, TimerWheel};
1001    use std::sync::Arc;
1002    use std::sync::atomic::{AtomicU32, Ordering};
1003    use std::time::Duration;
1004
1005    #[tokio::test]
1006    async fn test_service_creation() {
1007        let timer = TimerWheel::with_defaults();
1008        let _service = timer.create_service(ServiceConfig::default());
1009    }
1010
1011    #[tokio::test]
1012    async fn test_add_timer_handle_and_receive_timeout() {
1013        let timer = TimerWheel::with_defaults();
1014        let mut service = timer.create_service(ServiceConfig::default());
1015
1016        // Allocate handle (分配 handle)
1017        let handle = service.allocate_handle();
1018        let task_id = handle.task_id();
1019
1020        // Create single timer (创建单个定时器)
1021        let task = TimerTask::new_oneshot(
1022            Duration::from_millis(50),
1023            Some(CallbackWrapper::new(|| async {})),
1024        );
1025
1026        // Register to service (注册到服务)
1027        service.register(handle, task).unwrap();
1028
1029        // Receive timeout notification (接收超时通知)
1030        let rx = service.take_receiver().unwrap();
1031        let received_notification = tokio::time::timeout(Duration::from_millis(200), rx.recv())
1032            .await
1033            .expect("Should receive timeout notification")
1034            .expect("Should receive Some value");
1035
1036        assert_eq!(received_notification, TaskNotification::OneShot(task_id));
1037    }
1038
1039    #[tokio::test]
1040    async fn test_shutdown() {
1041        let timer = TimerWheel::with_defaults();
1042        let service = timer.create_service(ServiceConfig::default());
1043
1044        // Add some timers (添加一些定时器)
1045        let handle1 = service.allocate_handle();
1046        let handle2 = service.allocate_handle();
1047        let task1 = TimerTask::new_oneshot(Duration::from_secs(10), None);
1048        let task2 = TimerTask::new_oneshot(Duration::from_secs(10), None);
1049        service.register(handle1, task1).unwrap();
1050        service.register(handle2, task2).unwrap();
1051
1052        // Immediately shutdown (without waiting for timers to trigger) (立即关闭(不等待定时器触发))
1053        service.shutdown().await;
1054    }
1055
1056    #[tokio::test]
1057    async fn test_schedule_once_direct() {
1058        let timer = TimerWheel::with_defaults();
1059        let mut service = timer.create_service(ServiceConfig::default());
1060        let counter = Arc::new(AtomicU32::new(0));
1061
1062        // Schedule timer directly through service
1063        // 直接通过服务调度定时器
1064        let counter_clone = Arc::clone(&counter);
1065        let handle = service.allocate_handle();
1066        let task_id = handle.task_id();
1067        let task = TimerTask::new_oneshot(
1068            Duration::from_millis(50),
1069            Some(CallbackWrapper::new(move || {
1070                let counter = Arc::clone(&counter_clone);
1071                async move {
1072                    counter.fetch_add(1, Ordering::SeqCst);
1073                }
1074            })),
1075        );
1076        service.register(handle, task).unwrap();
1077
1078        // Wait for timer to trigger
1079        // 等待定时器触发
1080        let rx = service.take_receiver().unwrap();
1081        let received_notification = tokio::time::timeout(Duration::from_millis(200), rx.recv())
1082            .await
1083            .expect("Should receive timeout notification")
1084            .expect("Should receive Some value");
1085
1086        assert_eq!(received_notification, TaskNotification::OneShot(task_id));
1087
1088        // Wait for callback to execute
1089        // 等待回调执行
1090        tokio::time::sleep(Duration::from_millis(50)).await;
1091        assert_eq!(counter.load(Ordering::SeqCst), 1);
1092    }
1093
1094    #[tokio::test]
1095    async fn test_schedule_once_notify_direct() {
1096        let timer = TimerWheel::with_defaults();
1097        let mut service = timer.create_service(ServiceConfig::default());
1098
1099        // Schedule only notification timer directly through service (no callback)
1100        // 直接通过服务调度通知定时器(没有回调函数)
1101        let handle = service.allocate_handle();
1102        let task_id = handle.task_id();
1103        let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
1104        service.register(handle, task).unwrap();
1105
1106        // Receive timeout notification
1107        // 接收超时通知
1108        let rx = service.take_receiver().unwrap();
1109        let received_notification = tokio::time::timeout(Duration::from_millis(200), rx.recv())
1110            .await
1111            .expect("Should receive timeout notification")
1112            .expect("Should receive Some value");
1113
1114        assert_eq!(received_notification, TaskNotification::OneShot(task_id));
1115    }
1116
1117    #[tokio::test]
1118    async fn test_task_timeout_cleans_up_task_sender() {
1119        let timer = TimerWheel::with_defaults();
1120        let mut service = timer.create_service(ServiceConfig::default());
1121
1122        // Add a short-term timer (添加短期定时器)
1123        let handle = service.allocate_handle();
1124        let task_id = handle.task_id();
1125        let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
1126
1127        service.register(handle, task).unwrap();
1128
1129        // Wait for task timeout (等待任务超时)
1130        let rx = service.take_receiver().unwrap();
1131        let received_notification = tokio::time::timeout(Duration::from_millis(200), rx.recv())
1132            .await
1133            .expect("Should receive timeout notification")
1134            .expect("Should receive Some value");
1135
1136        assert_eq!(received_notification, TaskNotification::OneShot(task_id));
1137
1138        // Wait a moment to ensure internal cleanup is complete (等待片刻以确保内部清理完成)
1139        tokio::time::sleep(Duration::from_millis(10)).await;
1140
1141        // Try to cancel the timed-out task, should return false (尝试取消超时任务,应返回 false)
1142        let cancelled = service.cancel_task(task_id);
1143        assert!(!cancelled, "Timed out task should not exist anymore");
1144    }
1145
1146    #[tokio::test]
1147    async fn test_take_receiver_twice() {
1148        let timer = TimerWheel::with_defaults();
1149        let mut service = timer.create_service(ServiceConfig::default());
1150
1151        // First call should return Some
1152        // 第一次调用应该返回 Some
1153        let rx1 = service.take_receiver();
1154        assert!(rx1.is_some(), "First take_receiver should return Some");
1155
1156        // Second call should return None
1157        // 第二次调用应该返回 None
1158        let rx2 = service.take_receiver();
1159        assert!(rx2.is_none(), "Second take_receiver should return None");
1160    }
1161}