kestrel_timer/
service.rs

1use lite_sync::{oneshot, spsc};
2use crate::{BatchHandle, TimerHandle};
3use crate::config::ServiceConfig;
4use crate::error::TimerError;
5use crate::task::{CallbackWrapper,  CompletionReceiver, TaskCompletion, TaskId};
6use crate::wheel::Wheel;
7use futures::stream::{FuturesUnordered, StreamExt};
8use futures::future::BoxFuture;
9use parking_lot::Mutex;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::task::JoinHandle;
13
14/// Task notification type for distinguishing between one-shot and periodic tasks
15/// 
16/// 任务通知类型,用于区分一次性任务和周期性任�?
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum TaskNotification {
19    /// One-shot task expired notification
20    /// 
21    /// 一次性任务过期通知
22    OneShot(TaskId),
23    /// Periodic task called notification
24    /// 
25    /// 周期性任务被调用通知
26    Periodic(TaskId),
27}
28
29impl TaskNotification {
30    /// Get the task ID from the notification
31    /// 
32    /// 从通知中获取任�?ID
33    pub fn task_id(&self) -> TaskId {
34        match self {
35            TaskNotification::OneShot(id) => *id,
36            TaskNotification::Periodic(id) => *id,
37        }
38    }
39    
40    /// Check if this is a one-shot task notification
41    /// 
42    /// 检查是否为一次性任务通知
43    pub fn is_oneshot(&self) -> bool {
44        matches!(self, TaskNotification::OneShot(_))
45    }
46    
47    /// Check if this is a periodic task notification
48    /// 
49    /// 检查是否为周期性任务通知
50    pub fn is_periodic(&self) -> bool {
51        matches!(self, TaskNotification::Periodic(_))
52    }
53}
54
55/// Service command type
56/// 
57/// 服务命令类型
58enum ServiceCommand {
59    /// Add batch timer handle, only contains necessary data: task_ids and completion_rxs
60    /// 
61    /// 添加批量定时器句柄,仅包含必要数�? task_ids �?completion_rxs
62    AddBatchHandle {
63        task_ids: Vec<TaskId>,
64        completion_rxs: Vec<CompletionReceiver>,
65    },
66    /// Add single timer handle, only contains necessary data: task_id and completion_rx
67    /// 
68    /// 添加单个定时器句柄,仅包含必要数�? task_id �?completion_rx
69    AddTimerHandle {
70        task_id: TaskId,
71        completion_rx: CompletionReceiver,
72    },
73}
74
75/// TimerService - timer service based on Actor pattern
76/// Manages multiple timer handles, listens to all timeout events, and aggregates notifications to be forwarded to the user.
77/// # Features
78/// - Automatically listens to all added timer handles' timeout events
79/// - Automatically removes one-shot tasks from internal management after timeout
80/// - Continuously monitors periodic tasks and forwards each invocation
81/// - Aggregates notifications (both one-shot and periodic) to be forwarded to the user's unified channel
82/// - Supports dynamic addition of BatchHandle and TimerHandle
83///
84/// 
85/// # 定时器服务,基于 Actor 模式管理多个定时器句柄,监听所有超时事件,并将通知聚合转发给用户�?
86/// - 自动监听所有添加的定时器句柄的超时事件
87/// - 自动在一次性任务超时后从内部管理中移除任务
88/// - 持续监听周期性任务并转发每次调用通知
89/// - 将通知(一次性和周期性)聚合转发给用�?
90/// - 支持动态添�?BatchHandle �?TimerHandle
91/// 
92/// # Examples (示例)
93/// ```no_run
94/// use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TaskNotification, config::ServiceConfig};
95/// use std::time::Duration;
96///
97/// #[tokio::main]
98/// async fn main() {
99///     let timer = TimerWheel::with_defaults();
100///     let mut service = timer.create_service(ServiceConfig::default());
101///     
102///     // Register one-shot tasks (注册一次性任�?
103///     use kestrel_timer::TimerTask;
104///     let handles = service.allocate_handles(3);
105///     let tasks: Vec<_> = (0..3)
106///         .map(|i| {
107///             let callback = Some(CallbackWrapper::new(move || async move {
108///                 println!("One-shot timer {} fired!", i);
109///             }));
110///             TimerTask::new_oneshot(Duration::from_millis(100), callback)
111///         })
112///         .collect();
113///     service.register_batch(handles, tasks).unwrap();
114///     
115///     // Register periodic tasks (注册周期性任�?
116///     let handle = service.allocate_handle();
117///     let periodic_task = TimerTask::new_periodic(
118///         Duration::from_millis(100),
119///         Duration::from_millis(50),
120///         Some(CallbackWrapper::new(|| async { println!("Periodic timer fired!"); })),
121///         None
122///     );
123///     service.register(handle, periodic_task).unwrap();
124///     
125///     // Receive notifications (接收通知)
126///     let rx = service.take_receiver().unwrap();
127///     while let Some(notification) = rx.recv().await {
128///         match notification {
129///             TaskNotification::OneShot(task_id) => {
130///                 println!("One-shot task {:?} expired", task_id);
131///             }
132///             TaskNotification::Periodic(task_id) => {
133///                 println!("Periodic task {:?} called", task_id);
134///             }
135///         }
136///     }
137/// }
138/// ```
139pub struct TimerService {
140    /// Command sender
141    /// 
142    /// 命令发送器
143    command_tx: spsc::Sender<ServiceCommand, 32>,
144    /// Timeout receiver (supports both one-shot and periodic task notifications)
145    /// 
146    /// 超时接收器(支持一次性和周期性任务通知�?
147    timeout_rx: Option<spsc::Receiver<TaskNotification, 32>>,
148    /// Actor task handle
149    /// 
150    /// Actor 任务句柄
151    actor_handle: Option<JoinHandle<()>>,
152    /// Timing wheel reference (for direct scheduling of timers)
153    /// 
154    /// 时间轮引�?(用于直接调度定时�?
155    wheel: Arc<Mutex<Wheel>>,
156    /// Actor shutdown signal sender
157    /// 
158    /// Actor 关闭信号发送器
159    shutdown_tx: Option<oneshot::Sender<()>>,
160}
161
162impl TimerService {
163    /// Allocate a handle from DeferredMap
164    /// 
165    /// # Returns
166    /// A unique handle for later insertion
167    /// 
168    /// # 返回�?
169    /// 用于后续插入的唯一 handle
170    /// 
171    /// # Examples (示例)
172    /// ```no_run
173    /// # use kestrel_timer::{TimerWheel, config::ServiceConfig};
174    /// # #[tokio::main]
175    /// # async fn main() {
176    /// let timer = TimerWheel::with_defaults();
177    /// let mut service = timer.create_service(ServiceConfig::default());
178    /// 
179    /// // Allocate handle first
180    /// // 先分�?handle
181    /// let handle = service.allocate_handle();
182    /// # }
183    /// ```
184    pub fn allocate_handle(&self) -> crate::task::TaskHandle {
185        self.wheel.lock().allocate_handle()
186    }
187
188    /// Batch allocate handles from DeferredMap
189    /// 
190    /// # Parameters
191    /// - `count`: Number of handles to allocate
192    /// 
193    /// # Returns
194    /// Vector of unique handles for later batch insertion
195    /// 
196    /// # 参数
197    /// - `count`: 要分配的 handle 数量
198    /// 
199    /// # 返回�?
200    /// 用于后续批量插入的唯一 handles 向量
201    /// 
202    /// # Examples (示例)
203    /// ```no_run
204    /// # use kestrel_timer::{TimerWheel, config::ServiceConfig};
205    /// # #[tokio::main]
206    /// # async fn main() {
207    /// let timer = TimerWheel::with_defaults();
208    /// let service = timer.create_service(ServiceConfig::default());
209    /// 
210    /// // Batch allocate handles
211    /// // 批量分配 handles
212    /// let handles = service.allocate_handles(10);
213    /// assert_eq!(handles.len(), 10);
214    /// # }
215    /// ```
216    pub fn allocate_handles(&self, count: usize) -> Vec<crate::task::TaskHandle> {
217        self.wheel.lock().allocate_handles(count)
218    }
219
220    /// Create new TimerService
221    ///
222    /// # Parameters
223    /// - `wheel`: Timing wheel reference
224    /// - `config`: Service configuration
225    ///
226    /// # Notes
227    /// Typically not called directly, but used to create through `TimerWheel::create_service()`
228    ///
229    /// 创建新的 TimerService
230    /// 
231    /// # 参数
232    /// - `wheel`: 时间轮引�?
233    /// - `config`: 服务配置
234    ///
235    /// # 注意
236    /// 通常不直接调用,而是通过 `TimerWheel::create_service()` 创建
237    ///
238    pub(crate) fn new(wheel: Arc<Mutex<Wheel>>, config: ServiceConfig) -> Self {
239        let (command_tx, command_rx) = spsc::channel(config.command_channel_capacity);
240        let (timeout_tx, timeout_rx) = spsc::channel(config.timeout_channel_capacity);
241
242        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
243        let actor = ServiceActor::new(command_rx, timeout_tx, shutdown_rx);
244        let actor_handle = tokio::spawn(async move {
245            actor.run().await;
246        });
247
248        Self {
249            command_tx,
250            timeout_rx: Some(timeout_rx),
251            actor_handle: Some(actor_handle),
252            wheel,
253            shutdown_tx: Some(shutdown_tx),
254        }
255    }
256
257    /// Get timeout receiver (transfer ownership)
258    /// 
259    /// # Returns
260    /// Timeout notification receiver, if already taken, returns None
261    ///
262    /// # Notes
263    /// This method can only be called once, because it transfers ownership of the receiver
264    /// The receiver will receive both one-shot task expired notifications and periodic task called notifications
265    ///
266    /// 获取超时通知接收�?(转移所有权)
267    /// 
268    /// # 返回�?
269    /// 超时通知接收器,如果已经取走,返�?None
270    /// 
271    /// # 注意
272    /// 此方法只能调用一次,因为它转移了接收器的所有权
273    /// 接收器将接收一次性任务过期通知和周期性任务被调用通知
274    /// 
275    /// # Examples (示例)
276    /// ```no_run
277    /// # use kestrel_timer::{TimerWheel, config::ServiceConfig, TaskNotification};
278    /// # #[tokio::main]
279    /// # async fn main() {
280    /// let timer = TimerWheel::with_defaults();
281    /// let mut service = timer.create_service(ServiceConfig::default());
282    /// 
283    /// let rx = service.take_receiver().unwrap();
284    /// while let Some(notification) = rx.recv().await {
285    ///     match notification {
286    ///         TaskNotification::OneShot(task_id) => {
287    ///             println!("One-shot task {:?} expired", task_id);
288    ///         }
289    ///         TaskNotification::Periodic(task_id) => {
290    ///             println!("Periodic task {:?} called", task_id);
291    ///         }
292    ///     }
293    /// }
294    /// # }
295    /// ```
296    pub fn take_receiver(&mut self) -> Option<spsc::Receiver<TaskNotification, 32>> {
297        self.timeout_rx.take()
298    }
299
300    /// Cancel specified task
301    /// 
302    /// # Parameters
303    /// - `task_id`: Task ID to cancel
304    ///
305    /// # Returns
306    /// - `true`: Task exists and cancellation is successful
307    /// - `false`: Task does not exist or cancellation fails
308    ///
309    /// 取消指定任务
310    /// 
311    /// # 参数
312    /// - `task_id`: 任务 ID
313    ///
314    /// # 返回�?
315    /// - `true`: 任务存在且取消成�?
316    /// - `false`: 任务不存在或取消失败
317    ///
318    /// # Examples (示例)
319    /// ```no_run
320    /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
321    /// # use std::time::Duration;
322    /// # 
323    /// # #[tokio::main]
324    /// # async fn main() {
325    /// let timer = TimerWheel::with_defaults();
326    /// let service = timer.create_service(ServiceConfig::default());
327    /// 
328    /// // Use two-step API to schedule timers
329    /// let handle = service.allocate_handle();
330    /// let task_id = handle.task_id();
331    /// let callback = Some(CallbackWrapper::new(|| async move {
332    ///     println!("Timer fired!"); // 定时器触�?
333    /// }));
334    /// let task = TimerTask::new_oneshot(Duration::from_secs(10), callback);
335    /// service.register(handle, task).unwrap(); // 注册定时�?
336    /// 
337    /// // Cancel task
338    /// let cancelled = service.cancel_task(task_id);
339    /// println!("Task cancelled: {}", cancelled); // 任务取消
340    /// # }
341    /// ```
342    #[inline]
343    pub fn cancel_task(&self, task_id: TaskId) -> bool {
344        // Direct cancellation, no need to notify Actor
345        // FuturesUnordered will automatically clean up when tasks are cancelled
346        // 直接取消,无需通知 Actor
347        // FuturesUnordered 将在任务取消时自动清�?
348        let mut wheel = self.wheel.lock();
349        wheel.cancel(task_id)
350    }
351
352    /// Batch cancel tasks
353    /// 
354    /// Use underlying batch cancellation operation to cancel multiple tasks at once, performance is better than calling cancel_task repeatedly.
355    /// 
356    /// # Parameters
357    /// - `task_ids`: List of task IDs to cancel
358    ///
359    /// # Returns
360    /// Number of successfully cancelled tasks
361    ///
362    /// 批量取消任务
363    /// 
364    /// # 参数
365    /// - `task_ids`: 任务 ID 列表
366    ///
367    /// # 返回�?
368    /// 成功取消的任务数�?
369    ///
370    /// # Examples (示例)
371    /// ```no_run
372    /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
373    /// # use std::time::Duration;
374    /// # 
375    /// # #[tokio::main]
376    /// # async fn main() {
377    /// let timer = TimerWheel::with_defaults();
378    /// let service = timer.create_service(ServiceConfig::default());
379    /// 
380    /// let handles = service.allocate_handles(10);
381    /// let task_ids: Vec<_> = handles.iter().map(|h| h.task_id()).collect();
382    /// let tasks: Vec<_> = (0..10)
383    ///     .map(|i| {
384    ///         let callback = Some(CallbackWrapper::new(move || async move {
385    ///             println!("Timer {} fired!", i); // 定时器触�?
386    ///         }));
387    ///         TimerTask::new_oneshot(Duration::from_secs(10), callback)
388    ///     })
389    ///     .collect();
390    /// service.register_batch(handles, tasks).unwrap(); // 注册定时�?
391    /// 
392    /// // Batch cancel
393    /// let cancelled = service.cancel_batch(&task_ids);
394    /// println!("Cancelled {} tasks", cancelled); // 任务取消
395    /// # }
396    /// ```
397    #[inline]
398    pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
399        if task_ids.is_empty() {
400            return 0;
401        }
402
403        // Direct batch cancellation, no need to notify Actor
404        // FuturesUnordered will automatically clean up when tasks are cancelled
405        // 直接批量取消,无需通知 Actor
406        // FuturesUnordered 将在任务取消时自动清�?
407        let mut wheel = self.wheel.lock();
408        wheel.cancel_batch(task_ids)
409    }
410
411    /// Postpone task (optionally replace callback)
412    /// 
413    /// # Parameters
414    /// - `task_id`: Task ID to postpone
415    /// - `new_delay`: New delay time (recalculated from current time point)
416    /// - `callback`: New callback function (if `None`, keeps the original callback)
417    ///
418    /// # Returns
419    /// - `true`: Task exists and is successfully postponed
420    /// - `false`: Task does not exist or postponement fails
421    ///
422    /// # Notes
423    /// - Task ID remains unchanged after postponement
424    /// - Original timeout notification remains valid
425    /// - If callback is `Some`, it will replace the original callback
426    /// - If callback is `None`, the original callback is preserved
427    ///
428    /// 推迟任务 (可选替换回�?
429    /// 
430    /// # 参数
431    /// - `task_id`: 任务 ID
432    /// - `new_delay`: 新的延迟时间 (从当前时间点重新计算)
433    /// - `callback`: 新的回调函数 (如果�?`None`,则保留原回�?
434    ///
435    /// # 返回�?
436    /// - `true`: 任务存在且延期成�?
437    /// - `false`: 任务不存在或延期失败
438    ///
439    /// # 注意
440    /// - 任务 ID 在延期后保持不变
441    /// - 原始超时通知保持有效
442    /// - 如果 callback �?`Some`,将替换原始回调
443    /// - 如果 callback �?`None`,保留原始回�?
444    ///
445    /// # Examples (示例)
446    /// ```no_run
447    /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
448    /// # use std::time::Duration;
449    /// # 
450    /// # #[tokio::main]
451    /// # async fn main() {
452    /// let timer = TimerWheel::with_defaults();
453    /// let service = timer.create_service(ServiceConfig::default());
454    /// 
455    /// let handle = service.allocate_handle();
456    /// let task_id = handle.task_id();
457    /// let callback = Some(CallbackWrapper::new(|| async {
458    ///     println!("Original callback"); // 原始回调
459    /// }));
460    /// let task = TimerTask::new_oneshot(Duration::from_secs(5), callback);
461    /// service.register(handle, task).unwrap(); // 注册定时�?
462    /// 
463    /// // Postpone and replace callback (延期并替换回�?
464    /// let new_callback = Some(CallbackWrapper::new(|| async { println!("New callback!"); }));
465    /// let success = service.postpone(
466    ///     task_id,
467    ///     Duration::from_secs(10),
468    ///     new_callback
469    /// );
470    /// println!("Postponed successfully: {}", success);
471    /// # }
472    /// ```
473    #[inline]
474    pub fn postpone(&self, task_id: TaskId, new_delay: Duration, callback: Option<CallbackWrapper>) -> bool {
475        let mut wheel = self.wheel.lock();
476        wheel.postpone(task_id, new_delay, callback)
477    }
478
479    /// Batch postpone tasks (keep original callbacks)
480    /// 
481    /// # Parameters
482    /// - `updates`: List of tuples of (task ID, new delay)
483    ///
484    /// # Returns
485    /// Number of successfully postponed tasks
486    ///
487    /// 批量延期任务 (保持原始回调)
488    /// 
489    /// # 参数
490    /// - `updates`: (任务 ID, 新延�? 元组列表
491    ///
492    /// # 返回�?
493    /// 成功延期的任务数�?
494    ///
495    /// # Examples (示例)
496    /// ```no_run
497    /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
498    /// # use std::time::Duration;
499    /// # 
500    /// # #[tokio::main]
501    /// # async fn main() {
502    /// let timer = TimerWheel::with_defaults();
503    /// let service = timer.create_service(ServiceConfig::default());
504    /// 
505    /// let handles = service.allocate_handles(3);
506    /// let task_ids: Vec<_> = handles.iter().map(|h| h.task_id()).collect();
507    /// let tasks: Vec<_> = (0..3)
508    ///     .map(|i| {
509    ///         let callback = Some(CallbackWrapper::new(move || async move {
510    ///             println!("Timer {} fired!", i);
511    ///         }));
512    ///         TimerTask::new_oneshot(Duration::from_secs(5), callback)
513    ///     })
514    ///     .collect();
515    /// service.register_batch(handles, tasks).unwrap();
516    /// 
517    /// // Batch postpone (keep original callbacks)
518    /// // 批量延期任务 (保持原始回调)
519    /// let updates: Vec<_> = task_ids
520    ///     .into_iter()
521    ///     .map(|id| (id, Duration::from_secs(10)))
522    ///     .collect();
523    /// let postponed = service.postpone_batch(updates);
524    /// println!("Postponed {} tasks", postponed);
525    /// # }
526    /// ```
527    #[inline]
528    pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize {
529        if updates.is_empty() {
530            return 0;
531        }
532
533        let mut wheel = self.wheel.lock();
534        wheel.postpone_batch(updates)
535    }
536
537    /// Batch postpone tasks (replace callbacks)
538    /// 
539    /// # Parameters
540    /// - `updates`: List of tuples of (task ID, new delay, new callback)
541    ///
542    /// # Returns
543    /// Number of successfully postponed tasks
544    ///
545    /// 批量延期任务 (替换回调)
546    /// 
547    /// # 参数
548    /// - `updates`: (任务 ID, 新延�? 新回�? 元组列表
549    ///
550    /// # 返回�?
551    /// 成功延期的任务数�?
552    ///
553    /// # Examples (示例)
554    /// ```no_run
555    /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
556    /// # use std::time::Duration;
557    /// # 
558    /// # #[tokio::main]
559    /// # async fn main() {
560    /// # use kestrel_timer::TimerTask;
561    /// let timer = TimerWheel::with_defaults();
562    /// let service = timer.create_service(ServiceConfig::default());
563    /// 
564    /// // Create 3 tasks, initially no callbacks
565    /// // 创建 3 个任务,最初没有回�?
566    /// let handles = service.allocate_handles(3);
567    /// let task_ids: Vec<_> = handles.iter().map(|h| h.task_id()).collect();
568    /// let tasks: Vec<_> = (0..3)
569    ///     .map(|_| {
570    ///         TimerTask::new_oneshot(Duration::from_secs(5), None)
571    ///     })
572    ///     .collect();
573    /// service.register_batch(handles, tasks).unwrap();
574    /// 
575    /// // Batch postpone and add new callbacks
576    /// // 批量延期并添加新的回�?
577    /// let updates: Vec<_> = task_ids
578    ///     .into_iter()
579    ///     .enumerate()
580    ///     .map(|(i, id)| {
581    ///         let callback = Some(CallbackWrapper::new(move || async move {
582    ///             println!("New callback {}", i);
583    ///         }));
584    ///         (id, Duration::from_secs(10), callback)
585    ///     })
586    ///     .collect();
587    /// let postponed = service.postpone_batch_with_callbacks(updates);
588    /// println!("Postponed {} tasks", postponed);
589    /// # }
590    /// ```
591    #[inline]
592    pub fn postpone_batch_with_callbacks(
593        &self,
594        updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
595    ) -> usize {
596        if updates.is_empty() {
597            return 0;
598        }
599
600        let mut wheel = self.wheel.lock();
601        wheel.postpone_batch_with_callbacks(updates)
602    }
603    
604    /// Register timer task to service (registration phase)
605    /// 
606    /// # Parameters
607    /// - `handle`: Handle allocated via `allocate_handle()`
608    /// - `task`: Task created via `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
609    /// 
610    /// # Returns
611    /// - `Ok(TimerHandle)`: Register successfully
612    /// - `Err(TimerError::RegisterFailed)`: Register failed (internal channel is full or closed)
613    /// 
614    /// 注册定时器任务到服务 (注册阶段)
615    /// # 参数
616    /// - `handle`: 通过 `allocate_handle()` 分配�?handle
617    /// - `task`: 通过 `TimerTask::new_oneshot()` �?`TimerTask::new_periodic()` 创建的任�?
618    ///
619    /// # 返回�?
620    /// - `Ok(TimerHandle)`: 注册成功
621    /// - `Err(TimerError::RegisterFailed)`: 注册失败 (内部通道已满或关�?
622    ///
623    /// # Examples (示例)
624    /// ```no_run
625    /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig, TimerTask};
626    /// # use std::time::Duration;
627    /// # 
628    /// # #[tokio::main]
629    /// # async fn main() {
630    /// let timer = TimerWheel::with_defaults();
631    /// let service = timer.create_service(ServiceConfig::default());
632    /// 
633    /// // Step 1: allocate handle
634    /// // 分配 handle
635    /// let handle = service.allocate_handle();
636    /// let task_id = handle.task_id();
637    /// 
638    /// // Step 2: create task
639    /// // 创建任务
640    /// let callback = Some(CallbackWrapper::new(|| async move {
641    ///     println!("Timer fired!");
642    /// }));
643    /// let task = TimerTask::new_oneshot(Duration::from_millis(100), callback);
644    /// 
645    /// // Step 3: register task
646    /// // 注册任务
647    /// service.register(handle, task).unwrap();
648    /// # }
649    /// ```
650    #[inline]
651    pub fn register(&self, handle: crate::task::TaskHandle, task: crate::task::TimerTask) -> Result<TimerHandle, TimerError> {
652        let task_id = handle.task_id();
653        
654        let (task, completion_rx) = crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
655        
656        // Single lock, complete all operations
657        // 单次锁定,完成所有操�?
658        {
659            let mut wheel_guard = self.wheel.lock();
660            wheel_guard.insert(handle, task);
661        }
662        
663        // Add to service management (only send necessary data)
664        // 添加到服务管理(只发送必要数据)
665        self.command_tx
666            .try_send(ServiceCommand::AddTimerHandle {
667                task_id,
668                completion_rx,
669            })
670            .map_err(|_| TimerError::RegisterFailed)?;
671        
672        Ok(TimerHandle::new(task_id, self.wheel.clone()))
673    }
674    
675    /// Batch register timer tasks to service (registration phase)
676    /// 
677    /// # Parameters
678    /// - `handles`: Pre-allocated handles for tasks
679    /// - `tasks`: List of timer tasks
680    /// 
681    /// # Returns
682    /// - `Ok(BatchHandle)`: Register successfully
683    /// - `Err(TimerError::RegisterFailed)`: Register failed (internal channel is full or closed)
684    /// - `Err(TimerError::BatchLengthMismatch)`: Handles and tasks lengths don't match
685    /// 
686    /// 批量注册定时器任务到服务 (注册阶段)
687    /// # 参数
688    /// - `handles`: 任务的预分配 handles
689    /// - `tasks`: 定时器任务列�?
690    ///
691    /// # 返回�?
692    /// - `Ok(BatchHandle)`: 注册成功
693    /// - `Err(TimerError::RegisterFailed)`: 注册失败 (内部通道已满或关�?
694    /// - `Err(TimerError::BatchLengthMismatch)`: handles �?tasks 长度不匹�?
695    ///
696    /// # Examples (示例)
697    /// ```no_run
698    /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig, TimerTask};
699    /// # use std::time::Duration;
700    /// # 
701    /// # #[tokio::main]
702    /// # async fn main() {
703    /// # use kestrel_timer::TimerTask;
704    /// let timer = TimerWheel::with_defaults();
705    /// let service = timer.create_service(ServiceConfig::default());
706    /// 
707    /// // Step 1: batch allocate handles
708    /// // 批量分配 handles
709    /// let handles = service.allocate_handles(3);
710    /// 
711    /// // Step 2: create tasks
712    /// // 创建任务
713    /// let tasks: Vec<_> = (0..3)
714    ///     .map(|i| {
715    ///         let callback = Some(CallbackWrapper::new(move || async move {
716    ///             println!("Timer {} fired!", i);
717    ///         }));
718    ///         TimerTask::new_oneshot(Duration::from_secs(1), callback)
719    ///     })
720    ///     .collect();
721    /// 
722    /// // Step 3: register batch
723    /// // 注册批量任务
724    /// service.register_batch(handles, tasks).unwrap();
725    /// # }
726    /// ```
727    #[inline]
728    pub fn register_batch(
729        &self, 
730        handles: Vec<crate::task::TaskHandle>, 
731        tasks: Vec<crate::task::TimerTask>
732    ) -> Result<BatchHandle, TimerError> {
733        // Validate lengths match
734        if handles.len() != tasks.len() {
735            return Err(TimerError::BatchLengthMismatch {
736                handles_len: handles.len(),
737                tasks_len: tasks.len(),
738            });
739        }
740        
741        let task_count = tasks.len();
742        let mut completion_rxs = Vec::with_capacity(task_count);
743        let mut task_ids = Vec::with_capacity(task_count);
744        let mut prepared_handles = Vec::with_capacity(task_count);
745        let mut prepared_tasks = Vec::with_capacity(task_count);
746        
747        // Step 1: prepare all channels and notifiers (no lock)
748        // 步骤 1: 准备所有通道和通知器(无锁�?
749        for (handle, task) in handles.into_iter().zip(tasks.into_iter()) {
750            let task_id = handle.task_id();
751            let (task, completion_rx) = crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
752            task_ids.push(task_id);
753            completion_rxs.push(completion_rx);
754            prepared_handles.push(handle);
755            prepared_tasks.push(task);
756        }
757        
758        // Step 2: single lock, batch insert
759        // 步骤 2: 单次锁定,批量插�?
760        {
761            let mut wheel_guard = self.wheel.lock();
762            wheel_guard.insert_batch(prepared_handles, prepared_tasks)?;
763        }
764        
765        // Add to service management (only send necessary data)
766        // 添加到服务管理(只发送必要数据)
767        self.command_tx
768            .try_send(ServiceCommand::AddBatchHandle {
769                task_ids: task_ids.clone(),
770                completion_rxs,
771            })
772            .map_err(|_| TimerError::RegisterFailed)?;
773        
774        Ok(BatchHandle::new(task_ids, self.wheel.clone()))
775    }
776
777    /// Graceful shutdown of TimerService
778    /// 
779    /// 优雅关闭 TimerService
780    /// 
781    /// # Examples (示例)
782    /// ```no_run
783    /// # use kestrel_timer::{TimerWheel, config::ServiceConfig};
784    /// # #[tokio::main]
785    /// # async fn main() {
786    /// let timer = TimerWheel::with_defaults();
787    /// let mut service = timer.create_service(ServiceConfig::default());
788    /// 
789    /// // Use service... (使用服务...)
790    /// 
791    /// service.shutdown().await;
792    /// # }
793    /// ```
794    pub async fn shutdown(mut self) {
795        if let Some(shutdown_tx) = self.shutdown_tx.take() {
796            shutdown_tx.notify(());
797        }
798        if let Some(handle) = self.actor_handle.take() {
799            let _ = handle.await;
800        }
801    }
802}
803
804
805impl Drop for TimerService {
806    fn drop(&mut self) {
807        if let Some(handle) = self.actor_handle.take() {
808            handle.abort();
809        }
810    }
811}
812
813/// ServiceActor - internal Actor implementation
814/// 
815/// ServiceActor - 内部 Actor 实现
816struct ServiceActor {
817    /// Command receiver
818    /// 
819    /// 命令接收�?
820    command_rx: spsc::Receiver<ServiceCommand, 32>,
821    /// Timeout sender (supports both one-shot and periodic task notifications)
822    /// 
823    /// 超时发送器(支持一次性和周期性任务通知�?
824    timeout_tx: spsc::Sender<TaskNotification, 32>,
825    /// Actor shutdown signal receiver
826    /// 
827    /// Actor 关闭信号接收�?
828    shutdown_rx: oneshot::Receiver<()>,
829}
830
831impl ServiceActor {
832    /// Create new ServiceActor
833    /// 
834    /// 创建新的 ServiceActor
835    fn new(command_rx: spsc::Receiver<ServiceCommand, 32>, timeout_tx: spsc::Sender<TaskNotification, 32>, shutdown_rx: oneshot::Receiver<()>) -> Self {
836        Self {
837            command_rx,
838            timeout_tx,
839            shutdown_rx,
840        }
841    }
842
843    /// Run Actor event loop
844    /// 
845    /// 运行 Actor 事件循环
846    async fn run(self) {
847        // Use separate FuturesUnordered for one-shot and periodic tasks
848        // 为一次性任务和周期性任务使用独立的 FuturesUnordered
849        
850        // One-shot futures: each future returns (TaskId, TaskCompletion)
851        // 一次性任�?futures:每�?future 返回 (TaskId, TaskCompletion)
852        let mut oneshot_futures: FuturesUnordered<BoxFuture<'static, (TaskId, TaskCompletion)>> = FuturesUnordered::new();
853
854        // Periodic futures: each future returns (TaskId, Option<PeriodicTaskCompletion>, mpsc::Receiver)
855        // The receiver is returned so we can continue listening for next event
856        // 周期性任�?futures:每�?future 返回 (TaskId, Option<PeriodicTaskCompletion>, mpsc::Receiver)
857        // 返回接收器以便我们可以继续监听下一个事�?
858        type PeriodicFutureResult = (TaskId, Option<TaskCompletion>, crate::task::PeriodicCompletionReceiver);
859        let mut periodic_futures: FuturesUnordered<BoxFuture<'static, PeriodicFutureResult>> = FuturesUnordered::new();
860
861        // Move shutdown_rx out of self, so it can be used in select! with &mut
862        // �?shutdown_rx �?self 中移出,以便�?select! 中使�?&mut
863        let mut shutdown_rx = self.shutdown_rx;
864
865        loop {
866            tokio::select! {
867                // Listen to high-priority shutdown signal
868                // 监听高优先级关闭信号
869                _ = &mut shutdown_rx => {
870                    // Receive shutdown signal, exit loop immediately
871                    // 接收到关闭信号,立即退出循�?
872                    break;
873                }
874
875                // Listen to one-shot task timeout events
876                // 监听一次性任务超时事�?
877                Some((task_id, completion)) = oneshot_futures.next() => {
878                    // Check completion reason, only forward Called events, do not forward Cancelled events
879                    // 检查完成原因,只转�?Called 事件,不转发 Cancelled 事件
880                    if completion == TaskCompletion::Called {
881                        let _ = self.timeout_tx.send(TaskNotification::OneShot(task_id)).await;
882                    }
883                    // Task will be automatically removed from FuturesUnordered
884                    // 任务将自动从 FuturesUnordered 中移�?
885                }
886                
887                // Listen to periodic task events
888                // 监听周期性任务事�?
889                Some((task_id, reason, mut receiver)) = periodic_futures.next() => {
890                    // Check completion reason, only forward Called events, do not forward Cancelled events
891                    // 检查完成原因,只转�?Called 事件,不转发 Cancelled 事件
892                    if let Some(TaskCompletion::Called) = reason {
893                        let _ = self.timeout_tx.send(TaskNotification::Periodic(task_id)).await;
894                        
895                        // Re-add the receiver to continue listening for next periodic event
896                        // 重新添加接收器以继续监听下一个周期性事�?
897                        let future: BoxFuture<'static, PeriodicFutureResult> = Box::pin(async move {
898                            let reason = receiver.recv().await;
899                            (task_id, reason, receiver)
900                        });
901                        periodic_futures.push(future);
902                    }
903                    // If Cancelled or None, do not re-add the future (task is done)
904                    // 如果�?Cancelled �?None,不重新添加 future(任务结束)
905                }
906                
907                // Listen to commands
908                // 监听命令
909                Some(cmd) = self.command_rx.recv() => {
910                    match cmd {
911                        ServiceCommand::AddBatchHandle { task_ids, completion_rxs } => {
912                            // Add all tasks to appropriate futures
913                            // 将所有任务添加到相应�?futures
914                            for (task_id, rx) in task_ids.into_iter().zip(completion_rxs.into_iter()) {
915                                match rx {
916                                    crate::task::CompletionReceiver::OneShot(receiver) => {
917                                        let future: BoxFuture<'static, (TaskId, TaskCompletion)> = Box::pin(async move {
918                                            (task_id, receiver.wait().await)
919                                        });
920                                        oneshot_futures.push(future);
921                                    },
922                                    crate::task::CompletionReceiver::Periodic(mut receiver) => {
923                                        let future: BoxFuture<'static, PeriodicFutureResult> = Box::pin(async move {
924                                            let reason = receiver.recv().await;
925                                            (task_id, reason, receiver)
926                                        });
927                                        periodic_futures.push(future);
928                                    }
929                                }
930                            }
931                        }
932                        ServiceCommand::AddTimerHandle { task_id, completion_rx } => {
933                            // Add to appropriate futures
934                            // 添加到相应的 futures
935                            match completion_rx {
936                                crate::task::CompletionReceiver::OneShot(receiver) => {
937                                    let future: BoxFuture<'static, (TaskId, TaskCompletion)> = Box::pin(async move {
938                                        (task_id, receiver.wait().await)
939                                    });
940                                    oneshot_futures.push(future);
941                                },
942                                crate::task::CompletionReceiver::Periodic(mut receiver) => {
943                                    let future: BoxFuture<'static, PeriodicFutureResult> = Box::pin(async move {
944                                        let reason = receiver.recv().await;
945                                        (task_id, reason, receiver)
946                                    });
947                                    periodic_futures.push(future);
948                                }
949                            }
950                        }
951                    }
952                }
953                
954                // If no futures and command channel is closed, exit loop
955                // 如果没有 futures 且命令通道关闭,退出循�?
956                else => {
957                    break;
958                }
959            }
960        }
961    }
962}
963
964#[cfg(test)]
965mod tests {
966    use super::*;
967    use crate::{TimerWheel, TimerTask};
968    use std::sync::atomic::{AtomicU32, Ordering};
969    use std::sync::Arc;
970    use std::time::Duration;
971
972    #[tokio::test]
973    async fn test_service_creation() {
974        let timer = TimerWheel::with_defaults();
975        let _service = timer.create_service(ServiceConfig::default());
976    }
977
978    #[tokio::test]
979    async fn test_add_timer_handle_and_receive_timeout() {
980        let timer = TimerWheel::with_defaults();
981        let mut service = timer.create_service(ServiceConfig::default());
982
983        // Allocate handle (分配 handle)
984        let handle = service.allocate_handle();
985        let task_id = handle.task_id();
986        
987        // Create single timer (创建单个定时�?
988        let task = TimerTask::new_oneshot(Duration::from_millis(50), Some(CallbackWrapper::new(|| async {})));
989        
990        // Register to service (注册到服�?
991        service.register(handle, task).unwrap();
992
993        // Receive timeout notification (接收超时通知)
994        let rx = service.take_receiver().unwrap();
995        let received_notification = tokio::time::timeout(Duration::from_millis(200), rx.recv())
996            .await
997            .expect("Should receive timeout notification")
998            .expect("Should receive Some value");
999
1000        assert_eq!(received_notification, TaskNotification::OneShot(task_id));
1001    }
1002
1003    #[tokio::test]
1004    async fn test_shutdown() {
1005        let timer = TimerWheel::with_defaults();
1006        let service = timer.create_service(ServiceConfig::default());
1007
1008        // Add some timers (添加一些定时器)
1009        let handle1 = service.allocate_handle();
1010        let handle2 = service.allocate_handle();
1011        let task1 = TimerTask::new_oneshot(Duration::from_secs(10), None);
1012        let task2 = TimerTask::new_oneshot(Duration::from_secs(10), None);
1013        service.register(handle1, task1).unwrap();
1014        service.register(handle2, task2).unwrap();
1015
1016        // Immediately shutdown (without waiting for timers to trigger) (立即关闭(不等待定时器触发))
1017        service.shutdown().await;
1018    }
1019
1020    #[tokio::test]
1021    async fn test_schedule_once_direct() {
1022        let timer = TimerWheel::with_defaults();
1023        let mut service = timer.create_service(ServiceConfig::default());
1024        let counter = Arc::new(AtomicU32::new(0));
1025
1026        // Schedule timer directly through service
1027        // 直接通过服务调度定时器
1028        let counter_clone = Arc::clone(&counter);
1029        let handle = service.allocate_handle();
1030        let task_id = handle.task_id();
1031        let task = TimerTask::new_oneshot(
1032            Duration::from_millis(50),
1033            Some(CallbackWrapper::new(move || {
1034                let counter = Arc::clone(&counter_clone);
1035                async move {
1036                    counter.fetch_add(1, Ordering::SeqCst);
1037                }
1038            })),
1039        );
1040        service.register(handle, task).unwrap();
1041
1042        // Wait for timer to trigger
1043        // 等待定时器触�?
1044        let rx = service.take_receiver().unwrap();
1045        let received_notification = tokio::time::timeout(Duration::from_millis(200), rx.recv())
1046            .await
1047            .expect("Should receive timeout notification")
1048            .expect("Should receive Some value");
1049
1050        assert_eq!(received_notification, TaskNotification::OneShot(task_id));
1051        
1052        // Wait for callback to execute
1053        // 等待回调执行
1054        tokio::time::sleep(Duration::from_millis(50)).await;
1055        assert_eq!(counter.load(Ordering::SeqCst), 1);
1056    }
1057
1058    #[tokio::test]
1059    async fn test_schedule_once_notify_direct() {
1060        let timer = TimerWheel::with_defaults();
1061        let mut service = timer.create_service(ServiceConfig::default());
1062
1063        // Schedule only notification timer directly through service (no callback)
1064        // 直接通过服务调度通知定时器(没有回调�?
1065        let handle = service.allocate_handle();
1066        let task_id = handle.task_id();
1067        let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
1068        service.register(handle, task).unwrap();
1069
1070        // Receive timeout notification
1071        // 接收超时通知
1072        let rx = service.take_receiver().unwrap();
1073        let received_notification = tokio::time::timeout(Duration::from_millis(200), rx.recv())
1074            .await
1075            .expect("Should receive timeout notification")
1076            .expect("Should receive Some value");
1077
1078        assert_eq!(received_notification, TaskNotification::OneShot(task_id));
1079    }
1080
1081    #[tokio::test]
1082    async fn test_task_timeout_cleans_up_task_sender() {
1083        let timer = TimerWheel::with_defaults();
1084        let mut service = timer.create_service(ServiceConfig::default());
1085
1086        // Add a short-term timer (添加短期定时�?
1087        let handle = service.allocate_handle();
1088        let task_id = handle.task_id();
1089        let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
1090        
1091        service.register(handle, task).unwrap();
1092
1093        // Wait for task timeout (等待任务超时)
1094        let rx = service.take_receiver().unwrap();
1095        let received_notification = tokio::time::timeout(Duration::from_millis(200), rx.recv())
1096            .await
1097            .expect("Should receive timeout notification")
1098            .expect("Should receive Some value");
1099        
1100        assert_eq!(received_notification, TaskNotification::OneShot(task_id));
1101
1102        // Wait a moment to ensure internal cleanup is complete (等待片刻以确保内部清理完�?
1103        tokio::time::sleep(Duration::from_millis(10)).await;
1104
1105        // Try to cancel the timed-out task, should return false (尝试取消超时任务,应返回 false)
1106        let cancelled = service.cancel_task(task_id);
1107        assert!(!cancelled, "Timed out task should not exist anymore");
1108    }
1109
1110    #[tokio::test]
1111    async fn test_take_receiver_twice() {
1112        let timer = TimerWheel::with_defaults();
1113        let mut service = timer.create_service(ServiceConfig::default());
1114
1115        // First call should return Some
1116        // 第一次调用应该返�?Some
1117        let rx1 = service.take_receiver();
1118        assert!(rx1.is_some(), "First take_receiver should return Some");
1119
1120        // Second call should return None
1121        // 第二次调用应该返�?None
1122        let rx2 = service.take_receiver();
1123        assert!(rx2.is_none(), "Second take_receiver should return None");
1124    }
1125}