kestrel_timer/service.rs
1use crate::config::ServiceConfig;
2use crate::error::TimerError;
3use crate::task::{CallbackWrapper, CompletionReceiver, TaskCompletion, TaskId};
4use crate::wheel::Wheel;
5use crate::{BatchHandle, TimerHandle};
6use futures::future::BoxFuture;
7use futures::stream::{FuturesUnordered, StreamExt};
8use lite_sync::{
9 oneshot::lite::{Receiver, Sender, channel},
10 spsc,
11};
12use parking_lot::Mutex;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::task::JoinHandle;
16
17/// Task notification type for distinguishing between one-shot and periodic tasks
18///
19/// 任务通知类型,用于区分一次性任务和周期性任务
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum TaskNotification {
22 /// One-shot task expired notification
23 ///
24 /// 一次性任务过期通知
25 OneShot(TaskId),
26 /// Periodic task called notification
27 ///
28 /// 周期性任务被调用通知
29 Periodic(TaskId),
30}
31
32impl TaskNotification {
33 /// Get the task ID from the notification
34 ///
35 /// 从通知中获取任务 ID
36 pub fn task_id(&self) -> TaskId {
37 match self {
38 TaskNotification::OneShot(id) => *id,
39 TaskNotification::Periodic(id) => *id,
40 }
41 }
42
43 /// Check if this is a one-shot task notification
44 ///
45 /// 检查是否为一次性任务通知
46 pub fn is_oneshot(&self) -> bool {
47 matches!(self, TaskNotification::OneShot(_))
48 }
49
50 /// Check if this is a periodic task notification
51 ///
52 /// 检查是否为周期性任务通知
53 pub fn is_periodic(&self) -> bool {
54 matches!(self, TaskNotification::Periodic(_))
55 }
56}
57
58/// Service command type
59///
60/// 服务命令类型
61enum ServiceCommand {
62 /// Add batch timer handle, only contains necessary data: task_ids and completion_rxs
63 ///
64 /// 添加批量定时器句柄,仅包含必要数据:task_ids 和 completion_rxs
65 AddBatchHandle {
66 task_ids: Vec<TaskId>,
67 completion_rxs: Vec<CompletionReceiver>,
68 },
69 /// Add single timer handle, only contains necessary data: task_id and completion_rx
70 ///
71 /// 添加单个定时器句柄,仅包含必要数据:task_id 和 completion_rx
72 AddTimerHandle {
73 task_id: TaskId,
74 completion_rx: CompletionReceiver,
75 },
76}
77
78/// TimerService - timer service based on Actor pattern
79/// Manages multiple timer handles, listens to all timeout events, and aggregates notifications to be forwarded to the user.
80/// # Features
81/// - Automatically listens to all added timer handles' timeout events
82/// - Automatically removes one-shot tasks from internal management after timeout
83/// - Continuously monitors periodic tasks and forwards each invocation
84/// - Aggregates notifications (both one-shot and periodic) to be forwarded to the user's unified channel
85/// - Supports dynamic addition of BatchHandle and TimerHandle
86///
87///
88/// # 定时器服务,基于 Actor 模式管理多个定时器句柄,监听所有超时事件,并将通知聚合转发给用户
89/// - 自动监听所有添加的定时器句柄的超时事件
90/// - 自动在一次性任务超时后从内部管理中移除任务
91/// - 持续监听周期性任务并转发每次调用通知
92/// - 将通知(一次性和周期性)聚合转发给用户
93/// - 支持动态添加 BatchHandle 和 TimerHandle
94///
95/// # Examples (示例)
96/// ```no_run
97/// use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TaskNotification, config::ServiceConfig};
98/// use std::time::Duration;
99///
100/// #[tokio::main]
101/// async fn main() {
102/// let timer = TimerWheel::with_defaults();
103/// let mut service = timer.create_service(ServiceConfig::default());
104///
105/// // Register one-shot tasks (注册一次性任务)
106/// use kestrel_timer::TimerTask;
107/// let handles = service.allocate_handles(3);
108/// let tasks: Vec<_> = (0..3)
109/// .map(|i| {
110/// let callback = Some(CallbackWrapper::new(move || async move {
111/// println!("One-shot timer {} fired!", i);
112/// }));
113/// TimerTask::new_oneshot(Duration::from_millis(100), callback)
114/// })
115/// .collect();
116/// service.register_batch(handles, tasks).unwrap();
117///
118/// // Register periodic tasks (注册周期性任务)
119/// let handle = service.allocate_handle();
120/// let periodic_task = TimerTask::new_periodic(
121/// Duration::from_millis(100),
122/// Duration::from_millis(50),
123/// Some(CallbackWrapper::new(|| async { println!("Periodic timer fired!"); })),
124/// None
125/// );
126/// service.register(handle, periodic_task).unwrap();
127///
128/// // Receive notifications (接收通知)
129/// let rx = service.take_receiver().unwrap();
130/// while let Some(notification) = rx.recv().await {
131/// match notification {
132/// TaskNotification::OneShot(task_id) => {
133/// println!("One-shot task {:?} expired", task_id);
134/// }
135/// TaskNotification::Periodic(task_id) => {
136/// println!("Periodic task {:?} called", task_id);
137/// }
138/// }
139/// }
140/// }
141/// ```
142pub struct TimerService {
143 /// Command sender
144 ///
145 /// 命令发送器
146 command_tx: spsc::Sender<ServiceCommand, 32>,
147 /// Timeout receiver (supports both one-shot and periodic task notifications)
148 ///
149 /// 超时接收器(支持一次性和周期性任务通知)
150 timeout_rx: Option<spsc::Receiver<TaskNotification, 32>>,
151 /// Actor task handle
152 ///
153 /// Actor 任务句柄
154 actor_handle: Option<JoinHandle<()>>,
155 /// Timing wheel reference (for direct scheduling of timers)
156 ///
157 /// 时间轮引用(用于直接调度定时器)
158 wheel: Arc<Mutex<Wheel>>,
159 /// Actor shutdown signal sender
160 ///
161 /// Actor 关闭信号发送器
162 shutdown_tx: Option<Sender<()>>,
163}
164
165impl TimerService {
166 /// Allocate a handle from DeferredMap
167 ///
168 /// # Returns
169 /// A unique handle for later insertion
170 ///
171 /// # 返回值
172 /// 用于后续插入的唯一 handle
173 ///
174 /// # Examples (示例)
175 /// ```no_run
176 /// # use kestrel_timer::{TimerWheel, config::ServiceConfig};
177 /// # #[tokio::main]
178 /// # async fn main() {
179 /// let timer = TimerWheel::with_defaults();
180 /// let mut service = timer.create_service(ServiceConfig::default());
181 ///
182 /// // Allocate handle first
183 /// // 先分配handle
184 /// let handle = service.allocate_handle();
185 /// # }
186 /// ```
187 pub fn allocate_handle(&self) -> crate::task::TaskHandle {
188 self.wheel.lock().allocate_handle()
189 }
190
191 /// Batch allocate handles from DeferredMap
192 ///
193 /// # Parameters
194 /// - `count`: Number of handles to allocate
195 ///
196 /// # Returns
197 /// Vector of unique handles for later batch insertion
198 ///
199 /// # 参数
200 /// - `count`: 要分配的 handle 数量
201 ///
202 /// # 返回值
203 /// 用于后续批量插入的唯一 handles 向量
204 ///
205 /// # Examples (示例)
206 /// ```no_run
207 /// # use kestrel_timer::{TimerWheel, config::ServiceConfig};
208 /// # #[tokio::main]
209 /// # async fn main() {
210 /// let timer = TimerWheel::with_defaults();
211 /// let service = timer.create_service(ServiceConfig::default());
212 ///
213 /// // Batch allocate handles
214 /// // 批量分配 handles
215 /// let handles = service.allocate_handles(10);
216 /// assert_eq!(handles.len(), 10);
217 /// # }
218 /// ```
219 pub fn allocate_handles(&self, count: usize) -> Vec<crate::task::TaskHandle> {
220 self.wheel.lock().allocate_handles(count)
221 }
222
223 /// Create new TimerService
224 ///
225 /// # Parameters
226 /// - `wheel`: Timing wheel reference
227 /// - `config`: Service configuration
228 ///
229 /// # Notes
230 /// Typically not called directly, but used to create through `TimerWheel::create_service()`
231 ///
232 /// 创建新的 TimerService
233 ///
234 /// # 参数
235 /// - `wheel`: 时间轮引用
236 /// - `config`: 服务配置
237 ///
238 /// # 注意
239 /// 通常不直接调用,而是通过 `TimerWheel::create_service()` 创建
240 ///
241 pub(crate) fn new(wheel: Arc<Mutex<Wheel>>, config: ServiceConfig) -> Self {
242 let (command_tx, command_rx) = spsc::channel(config.command_channel_capacity);
243 let (timeout_tx, timeout_rx) = spsc::channel(config.timeout_channel_capacity);
244
245 let (shutdown_tx, shutdown_rx) = channel::<()>();
246 let actor = ServiceActor::new(command_rx, timeout_tx, shutdown_rx);
247 let actor_handle = tokio::spawn(async move {
248 actor.run().await;
249 });
250
251 Self {
252 command_tx,
253 timeout_rx: Some(timeout_rx),
254 actor_handle: Some(actor_handle),
255 wheel,
256 shutdown_tx: Some(shutdown_tx),
257 }
258 }
259
260 /// Get timeout receiver (transfer ownership)
261 ///
262 /// # Returns
263 /// Timeout notification receiver, if already taken, returns None
264 ///
265 /// # Notes
266 /// This method can only be called once, because it transfers ownership of the receiver
267 /// The receiver will receive both one-shot task expired notifications and periodic task called notifications
268 ///
269 /// 获取超时通知接收器(转移所有权)
270 ///
271 /// # 返回值
272 /// 超时通知接收器,如果已经取走,返回 None
273 ///
274 /// # 注意
275 /// 此方法只能调用一次,因为它转移了接收器的所有权
276 /// 接收器将接收一次性任务过期通知和周期性任务被调用通知
277 ///
278 /// # Examples (示例)
279 /// ```no_run
280 /// # use kestrel_timer::{TimerWheel, config::ServiceConfig, TaskNotification};
281 /// # #[tokio::main]
282 /// # async fn main() {
283 /// let timer = TimerWheel::with_defaults();
284 /// let mut service = timer.create_service(ServiceConfig::default());
285 ///
286 /// let rx = service.take_receiver().unwrap();
287 /// while let Some(notification) = rx.recv().await {
288 /// match notification {
289 /// TaskNotification::OneShot(task_id) => {
290 /// println!("One-shot task {:?} expired", task_id);
291 /// }
292 /// TaskNotification::Periodic(task_id) => {
293 /// println!("Periodic task {:?} called", task_id);
294 /// }
295 /// }
296 /// }
297 /// # }
298 /// ```
299 pub fn take_receiver(&mut self) -> Option<spsc::Receiver<TaskNotification, 32>> {
300 self.timeout_rx.take()
301 }
302
303 /// Cancel specified task
304 ///
305 /// # Parameters
306 /// - `task_id`: Task ID to cancel
307 ///
308 /// # Returns
309 /// - `true`: Task exists and cancellation is successful
310 /// - `false`: Task does not exist or cancellation fails
311 ///
312 /// 取消指定任务
313 ///
314 /// # 参数
315 /// - `task_id`: 任务 ID
316 ///
317 /// # 返回值
318 /// - `true`: 任务存在且取消成功
319 /// - `false`: 任务不存在或取消失败
320 ///
321 /// # Examples (示例)
322 /// ```no_run
323 /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
324 /// # use std::time::Duration;
325 /// #
326 /// # #[tokio::main]
327 /// # async fn main() {
328 /// let timer = TimerWheel::with_defaults();
329 /// let service = timer.create_service(ServiceConfig::default());
330 ///
331 /// // Use two-step API to schedule timers
332 /// let handle = service.allocate_handle();
333 /// let task_id = handle.task_id();
334 /// let callback = Some(CallbackWrapper::new(|| async move {
335 /// println!("Timer fired!"); // 定时器触发
336 /// }));
337 /// let task = TimerTask::new_oneshot(Duration::from_secs(10), callback);
338 /// service.register(handle, task).unwrap(); // 注册定时器
339 ///
340 /// // Cancel task
341 /// let cancelled = service.cancel_task(task_id);
342 /// println!("Task cancelled: {}", cancelled); // 任务取消
343 /// # }
344 /// ```
345 #[inline]
346 pub fn cancel_task(&self, task_id: TaskId) -> bool {
347 // Direct cancellation, no need to notify Actor
348 // FuturesUnordered will automatically clean up when tasks are cancelled
349 // 直接取消,无需通知 Actor
350 // FuturesUnordered 将在任务取消时自动清理
351 let mut wheel = self.wheel.lock();
352 wheel.cancel(task_id)
353 }
354
355 /// Batch cancel tasks
356 ///
357 /// Use underlying batch cancellation operation to cancel multiple tasks at once, performance is better than calling cancel_task repeatedly.
358 ///
359 /// # Parameters
360 /// - `task_ids`: List of task IDs to cancel
361 ///
362 /// # Returns
363 /// Number of successfully cancelled tasks
364 ///
365 /// 批量取消任务
366 ///
367 /// # 参数
368 /// - `task_ids`: 任务 ID 列表
369 ///
370 /// # 返回值
371 /// 成功取消的任务数量
372 ///
373 /// # Examples (示例)
374 /// ```no_run
375 /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
376 /// # use std::time::Duration;
377 /// #
378 /// # #[tokio::main]
379 /// # async fn main() {
380 /// let timer = TimerWheel::with_defaults();
381 /// let service = timer.create_service(ServiceConfig::default());
382 ///
383 /// let handles = service.allocate_handles(10);
384 /// let task_ids: Vec<_> = handles.iter().map(|h| h.task_id()).collect();
385 /// let tasks: Vec<_> = (0..10)
386 /// .map(|i| {
387 /// let callback = Some(CallbackWrapper::new(move || async move {
388 /// println!("Timer {} fired!", i); // 定时器触发
389 /// }));
390 /// TimerTask::new_oneshot(Duration::from_secs(10), callback)
391 /// })
392 /// .collect();
393 /// service.register_batch(handles, tasks).unwrap(); // 注册定时器
394 ///
395 /// // Batch cancel
396 /// let cancelled = service.cancel_batch(&task_ids);
397 /// println!("Cancelled {} tasks", cancelled); // 任务取消
398 /// # }
399 /// ```
400 #[inline]
401 pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
402 if task_ids.is_empty() {
403 return 0;
404 }
405
406 // Direct batch cancellation, no need to notify Actor
407 // FuturesUnordered will automatically clean up when tasks are cancelled
408 // 直接批量取消,无需通知 Actor
409 // FuturesUnordered 将在任务取消时自动清理
410 let mut wheel = self.wheel.lock();
411 wheel.cancel_batch(task_ids)
412 }
413
414 /// Postpone task (optionally replace callback)
415 ///
416 /// # Parameters
417 /// - `task_id`: Task ID to postpone
418 /// - `new_delay`: New delay time (recalculated from current time point)
419 /// - `callback`: New callback function (if `None`, keeps the original callback)
420 ///
421 /// # Returns
422 /// - `true`: Task exists and is successfully postponed
423 /// - `false`: Task does not exist or postponement fails
424 ///
425 /// # Notes
426 /// - Task ID remains unchanged after postponement
427 /// - Original timeout notification remains valid
428 /// - If callback is `Some`, it will replace the original callback
429 /// - If callback is `None`, the original callback is preserved
430 ///
431 /// 推迟任务 (可选替换回调)
432 ///
433 /// # 参数
434 /// - `task_id`: 任务 ID
435 /// - `new_delay`: 新的延迟时间 (从当前时间点重新计算)
436 /// - `callback`: 新的回调函数 (如果为 `None`,则保留原回调)
437 ///
438 /// # 返回值
439 /// - `true`: 任务存在且延期成功
440 /// - `false`: 任务不存在或延期失败
441 ///
442 /// # 注意
443 /// - 任务 ID 在延期后保持不变
444 /// - 原始超时通知保持有效
445 /// - 如果 callback 为 `Some`,将替换原始回调
446 /// - 如果 callback 为 `None`,保留原始回调
447 ///
448 /// # Examples (示例)
449 /// ```no_run
450 /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
451 /// # use std::time::Duration;
452 /// #
453 /// # #[tokio::main]
454 /// # async fn main() {
455 /// let timer = TimerWheel::with_defaults();
456 /// let service = timer.create_service(ServiceConfig::default());
457 ///
458 /// let handle = service.allocate_handle();
459 /// let task_id = handle.task_id();
460 /// let callback = Some(CallbackWrapper::new(|| async {
461 /// println!("Original callback"); // 原始回调
462 /// }));
463 /// let task = TimerTask::new_oneshot(Duration::from_secs(5), callback);
464 /// service.register(handle, task).unwrap(); // 注册定时器
465 ///
466 /// // Postpone and replace callback (延期并替换回调)
467 /// let new_callback = Some(CallbackWrapper::new(|| async { println!("New callback!"); }));
468 /// let success = service.postpone(
469 /// task_id,
470 /// Duration::from_secs(10),
471 /// new_callback
472 /// );
473 /// println!("Postponed successfully: {}", success);
474 /// # }
475 /// ```
476 #[inline]
477 pub fn postpone(
478 &self,
479 task_id: TaskId,
480 new_delay: Duration,
481 callback: Option<CallbackWrapper>,
482 ) -> bool {
483 let mut wheel = self.wheel.lock();
484 wheel.postpone(task_id, new_delay, callback)
485 }
486
487 /// Batch postpone tasks (keep original callbacks)
488 ///
489 /// # Parameters
490 /// - `updates`: List of tuples of (task ID, new delay)
491 ///
492 /// # Returns
493 /// Number of successfully postponed tasks
494 ///
495 /// 批量延期任务 (保持原始回调)
496 ///
497 /// # 参数
498 /// - `updates`: (任务 ID, 新延迟) 元组列表
499 ///
500 /// # 返回值
501 /// 成功延期的任务数量
502 ///
503 /// # Examples (示例)
504 /// ```no_run
505 /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
506 /// # use std::time::Duration;
507 /// #
508 /// # #[tokio::main]
509 /// # async fn main() {
510 /// let timer = TimerWheel::with_defaults();
511 /// let service = timer.create_service(ServiceConfig::default());
512 ///
513 /// let handles = service.allocate_handles(3);
514 /// let task_ids: Vec<_> = handles.iter().map(|h| h.task_id()).collect();
515 /// let tasks: Vec<_> = (0..3)
516 /// .map(|i| {
517 /// let callback = Some(CallbackWrapper::new(move || async move {
518 /// println!("Timer {} fired!", i);
519 /// }));
520 /// TimerTask::new_oneshot(Duration::from_secs(5), callback)
521 /// })
522 /// .collect();
523 /// service.register_batch(handles, tasks).unwrap();
524 ///
525 /// // Batch postpone (keep original callbacks)
526 /// // 批量延期任务 (保持原始回调)
527 /// let updates: Vec<_> = task_ids
528 /// .into_iter()
529 /// .map(|id| (id, Duration::from_secs(10)))
530 /// .collect();
531 /// let postponed = service.postpone_batch(updates);
532 /// println!("Postponed {} tasks", postponed);
533 /// # }
534 /// ```
535 #[inline]
536 pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize {
537 if updates.is_empty() {
538 return 0;
539 }
540
541 let mut wheel = self.wheel.lock();
542 wheel.postpone_batch(updates)
543 }
544
545 /// Batch postpone tasks (replace callbacks)
546 ///
547 /// # Parameters
548 /// - `updates`: List of tuples of (task ID, new delay, new callback)
549 ///
550 /// # Returns
551 /// Number of successfully postponed tasks
552 ///
553 /// 批量延期任务 (替换回调)
554 ///
555 /// # 参数
556 /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
557 ///
558 /// # 返回值
559 /// 成功延期的任务数量
560 ///
561 /// # Examples (示例)
562 /// ```no_run
563 /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
564 /// # use std::time::Duration;
565 /// #
566 /// # #[tokio::main]
567 /// # async fn main() {
568 /// # use kestrel_timer::TimerTask;
569 /// let timer = TimerWheel::with_defaults();
570 /// let service = timer.create_service(ServiceConfig::default());
571 ///
572 /// // Create 3 tasks, initially no callbacks
573 /// // 创建 3 个任务,最初没有回调
574 /// let handles = service.allocate_handles(3);
575 /// let task_ids: Vec<_> = handles.iter().map(|h| h.task_id()).collect();
576 /// let tasks: Vec<_> = (0..3)
577 /// .map(|_| {
578 /// TimerTask::new_oneshot(Duration::from_secs(5), None)
579 /// })
580 /// .collect();
581 /// service.register_batch(handles, tasks).unwrap();
582 ///
583 /// // Batch postpone and add new callbacks
584 /// // 批量延期并添加新的回调
585 /// let updates: Vec<_> = task_ids
586 /// .into_iter()
587 /// .enumerate()
588 /// .map(|(i, id)| {
589 /// let callback = Some(CallbackWrapper::new(move || async move {
590 /// println!("New callback {}", i);
591 /// }));
592 /// (id, Duration::from_secs(10), callback)
593 /// })
594 /// .collect();
595 /// let postponed = service.postpone_batch_with_callbacks(updates);
596 /// println!("Postponed {} tasks", postponed);
597 /// # }
598 /// ```
599 #[inline]
600 pub fn postpone_batch_with_callbacks(
601 &self,
602 updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
603 ) -> usize {
604 if updates.is_empty() {
605 return 0;
606 }
607
608 let mut wheel = self.wheel.lock();
609 wheel.postpone_batch_with_callbacks(updates)
610 }
611
612 /// Register timer task to service (registration phase)
613 ///
614 /// # Parameters
615 /// - `handle`: Handle allocated via `allocate_handle()`
616 /// - `task`: Task created via `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
617 ///
618 /// # Returns
619 /// - `Ok(TimerHandle)`: Register successfully
620 /// - `Err(TimerError::RegisterFailed)`: Register failed (internal channel is full or closed)
621 ///
622 /// 注册定时器任务到服务 (注册阶段)
623 /// # 参数
624 /// - `handle`: 通过 `allocate_handle()` 分配的 handle
625 /// - `task`: 通过 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建的任务
626 ///
627 /// # 返回值
628 /// - `Ok(TimerHandle)`: 注册成功
629 /// - `Err(TimerError::RegisterFailed)`: 注册失败 (内部通道已满或关闭)
630 ///
631 /// # Examples (示例)
632 /// ```no_run
633 /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig, TimerTask};
634 /// # use std::time::Duration;
635 /// #
636 /// # #[tokio::main]
637 /// # async fn main() {
638 /// let timer = TimerWheel::with_defaults();
639 /// let service = timer.create_service(ServiceConfig::default());
640 ///
641 /// // Step 1: allocate handle
642 /// // 分配 handle
643 /// let handle = service.allocate_handle();
644 /// let task_id = handle.task_id();
645 ///
646 /// // Step 2: create task
647 /// // 创建任务
648 /// let callback = Some(CallbackWrapper::new(|| async move {
649 /// println!("Timer fired!");
650 /// }));
651 /// let task = TimerTask::new_oneshot(Duration::from_millis(100), callback);
652 ///
653 /// // Step 3: register task
654 /// // 注册任务
655 /// service.register(handle, task).unwrap();
656 /// # }
657 /// ```
658 #[inline]
659 pub fn register(
660 &self,
661 handle: crate::task::TaskHandle,
662 task: crate::task::TimerTask,
663 ) -> Result<TimerHandle, TimerError> {
664 let task_id = handle.task_id();
665
666 let (task, completion_rx) =
667 crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
668
669 // Single lock, complete all operations
670 // 单次锁定,完成所有操作
671 {
672 let mut wheel_guard = self.wheel.lock();
673 wheel_guard.insert(handle, task);
674 }
675
676 // Add to service management (only send necessary data)
677 // 添加到服务管理(只发送必要数据)
678 self.command_tx
679 .try_send(ServiceCommand::AddTimerHandle {
680 task_id,
681 completion_rx,
682 })
683 .map_err(|_| TimerError::RegisterFailed)?;
684
685 Ok(TimerHandle::new(task_id, self.wheel.clone()))
686 }
687
688 /// Batch register timer tasks to service (registration phase)
689 ///
690 /// # Parameters
691 /// - `handles`: Pre-allocated handles for tasks
692 /// - `tasks`: List of timer tasks
693 ///
694 /// # Returns
695 /// - `Ok(BatchHandle)`: Register successfully
696 /// - `Err(TimerError::RegisterFailed)`: Register failed (internal channel is full or closed)
697 /// - `Err(TimerError::BatchLengthMismatch)`: Handles and tasks lengths don't match
698 ///
699 /// 批量注册定时器任务到服务 (注册阶段)
700 /// # 参数
701 /// - `handles`: 任务的预分配 handles
702 /// - `tasks`: 定时器任务列表
703 ///
704 /// # 返回值
705 /// - `Ok(BatchHandle)`: 注册成功
706 /// - `Err(TimerError::RegisterFailed)`: 注册失败 (内部通道已满或关闭)
707 /// - `Err(TimerError::BatchLengthMismatch)`: handles 和 tasks 长度不匹配
708 ///
709 /// # Examples (示例)
710 /// ```no_run
711 /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig, TimerTask};
712 /// # use std::time::Duration;
713 /// #
714 /// # #[tokio::main]
715 /// # async fn main() {
716 /// # use kestrel_timer::TimerTask;
717 /// let timer = TimerWheel::with_defaults();
718 /// let service = timer.create_service(ServiceConfig::default());
719 ///
720 /// // Step 1: batch allocate handles
721 /// // 批量分配 handles
722 /// let handles = service.allocate_handles(3);
723 ///
724 /// // Step 2: create tasks
725 /// // 创建任务
726 /// let tasks: Vec<_> = (0..3)
727 /// .map(|i| {
728 /// let callback = Some(CallbackWrapper::new(move || async move {
729 /// println!("Timer {} fired!", i);
730 /// }));
731 /// TimerTask::new_oneshot(Duration::from_secs(1), callback)
732 /// })
733 /// .collect();
734 ///
735 /// // Step 3: register batch
736 /// // 注册批量任务
737 /// service.register_batch(handles, tasks).unwrap();
738 /// # }
739 /// ```
740 #[inline]
741 pub fn register_batch(
742 &self,
743 handles: Vec<crate::task::TaskHandle>,
744 tasks: Vec<crate::task::TimerTask>,
745 ) -> Result<BatchHandle, TimerError> {
746 // Validate lengths match
747 if handles.len() != tasks.len() {
748 return Err(TimerError::BatchLengthMismatch {
749 handles_len: handles.len(),
750 tasks_len: tasks.len(),
751 });
752 }
753
754 let task_count = tasks.len();
755 let mut completion_rxs = Vec::with_capacity(task_count);
756 let mut task_ids = Vec::with_capacity(task_count);
757 let mut prepared_handles = Vec::with_capacity(task_count);
758 let mut prepared_tasks = Vec::with_capacity(task_count);
759
760 // Step 1: prepare all channels and notifiers (no lock)
761 // 步骤 1: 准备所有通道和通知器(无锁)
762 for (handle, task) in handles.into_iter().zip(tasks.into_iter()) {
763 let task_id = handle.task_id();
764 let (task, completion_rx) =
765 crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
766 task_ids.push(task_id);
767 completion_rxs.push(completion_rx);
768 prepared_handles.push(handle);
769 prepared_tasks.push(task);
770 }
771
772 // Step 2: single lock, batch insert
773 // 步骤 2: 单次锁定,批量插入
774 {
775 let mut wheel_guard = self.wheel.lock();
776 wheel_guard.insert_batch(prepared_handles, prepared_tasks)?;
777 }
778
779 // Add to service management (only send necessary data)
780 // 添加到服务管理(只发送必要数据)
781 self.command_tx
782 .try_send(ServiceCommand::AddBatchHandle {
783 task_ids: task_ids.clone(),
784 completion_rxs,
785 })
786 .map_err(|_| TimerError::RegisterFailed)?;
787
788 Ok(BatchHandle::new(task_ids, self.wheel.clone()))
789 }
790
791 /// Graceful shutdown of TimerService
792 ///
793 /// 优雅关闭 TimerService
794 ///
795 /// # Examples (示例)
796 /// ```no_run
797 /// # use kestrel_timer::{TimerWheel, config::ServiceConfig};
798 /// # #[tokio::main]
799 /// # async fn main() {
800 /// let timer = TimerWheel::with_defaults();
801 /// let mut service = timer.create_service(ServiceConfig::default());
802 ///
803 /// // Use service... (使用服务...)
804 ///
805 /// service.shutdown().await;
806 /// # }
807 /// ```
808 pub async fn shutdown(mut self) {
809 if let Some(shutdown_tx) = self.shutdown_tx.take() {
810 shutdown_tx.notify(());
811 }
812 if let Some(handle) = self.actor_handle.take() {
813 let _ = handle.await;
814 }
815 }
816}
817
818impl Drop for TimerService {
819 fn drop(&mut self) {
820 if let Some(handle) = self.actor_handle.take() {
821 handle.abort();
822 }
823 }
824}
825
826/// ServiceActor - internal Actor implementation
827///
828/// ServiceActor - 内部 Actor 实现
829struct ServiceActor {
830 /// Command receiver
831 ///
832 /// 命令接收器
833 command_rx: spsc::Receiver<ServiceCommand, 32>,
834 /// Timeout sender (supports both one-shot and periodic task notifications)
835 ///
836 /// 超时发送器(支持一次性和周期性任务通知)
837 timeout_tx: spsc::Sender<TaskNotification, 32>,
838 /// Actor shutdown signal receiver
839 ///
840 /// Actor 关闭信号接收器
841 shutdown_rx: Receiver<()>,
842}
843
844impl ServiceActor {
845 /// Create new ServiceActor
846 ///
847 /// 创建新的 ServiceActor
848 fn new(
849 command_rx: spsc::Receiver<ServiceCommand, 32>,
850 timeout_tx: spsc::Sender<TaskNotification, 32>,
851 shutdown_rx: Receiver<()>,
852 ) -> Self {
853 Self {
854 command_rx,
855 timeout_tx,
856 shutdown_rx,
857 }
858 }
859
860 /// Run Actor event loop
861 ///
862 /// 运行 Actor 事件循环
863 async fn run(self) {
864 // Use separate FuturesUnordered for one-shot and periodic tasks
865 // 为一次性任务和周期性任务使用独立的 FuturesUnordered
866
867 // One-shot futures: each future returns (TaskId, TaskCompletion)
868 // 一次性任务futures:每个future 返回 (TaskId, TaskCompletion)
869 let mut oneshot_futures: FuturesUnordered<BoxFuture<'static, (TaskId, TaskCompletion)>> =
870 FuturesUnordered::new();
871
872 // Periodic futures: each future returns (TaskId, Option<PeriodicTaskCompletion>, mpsc::Receiver)
873 // The receiver is returned so we can continue listening for next event
874 // 周期性任务futures:每个future 返回 (TaskId, Option<PeriodicTaskCompletion>, mpsc::Receiver)
875 // 返回接收器以便我们可以继续监听下一个事件
876 type PeriodicFutureResult = (
877 TaskId,
878 Option<TaskCompletion>,
879 crate::task::PeriodicCompletionReceiver,
880 );
881 let mut periodic_futures: FuturesUnordered<BoxFuture<'static, PeriodicFutureResult>> =
882 FuturesUnordered::new();
883
884 // Move shutdown_rx out of self, so it can be used in select! with &mut
885 // 将 shutdown_rx 从 self 中移出,以便在 select! 中使用 &mut
886 let mut shutdown_rx = self.shutdown_rx;
887
888 loop {
889 tokio::select! {
890 // Listen to high-priority shutdown signal
891 // 监听高优先级关闭信号
892 _ = &mut shutdown_rx => {
893 // Receive shutdown signal, exit loop immediately
894 // 接收到关闭信号,立即退出循环
895 break;
896 }
897
898 // Listen to one-shot task timeout events
899 // 监听一次性任务超时事件
900 Some((task_id, completion)) = oneshot_futures.next() => {
901 // Check completion reason, only forward Called events, do not forward Cancelled events
902 // 检查完成原因,只转发 Called 事件,不转发 Cancelled 事件
903 if completion == TaskCompletion::Called {
904 let _ = self.timeout_tx.send(TaskNotification::OneShot(task_id)).await;
905 }
906 // Task will be automatically removed from FuturesUnordered
907 // 任务将自动从 FuturesUnordered 中移除
908 }
909
910 // Listen to periodic task events
911 // 监听周期性任务事件
912 Some((task_id, reason, mut receiver)) = periodic_futures.next() => {
913 // Check completion reason, only forward Called events, do not forward Cancelled events
914 // 检查完成原因,只转发 Called 事件,不转发 Cancelled 事件
915 if let Some(TaskCompletion::Called) = reason {
916 let _ = self.timeout_tx.send(TaskNotification::Periodic(task_id)).await;
917
918 // Re-add the receiver to continue listening for next periodic event
919 // 重新添加接收器以继续监听下一个周期性事件
920 let future: BoxFuture<'static, PeriodicFutureResult> = Box::pin(async move {
921 let reason = receiver.recv().await;
922 (task_id, reason, receiver)
923 });
924 periodic_futures.push(future);
925 }
926 // If Cancelled or None, do not re-add the future (task is done)
927 // 如果 Cancelled 或 None,不重新添加 future(任务结束)
928 }
929
930 // Listen to commands
931 // 监听命令
932 Some(cmd) = self.command_rx.recv() => {
933 match cmd {
934 ServiceCommand::AddBatchHandle { task_ids, completion_rxs } => {
935 // Add all tasks to appropriate futures
936 // 将所有任务添加到相应 futures
937 for (task_id, rx) in task_ids.into_iter().zip(completion_rxs.into_iter()) {
938 match rx {
939 crate::task::CompletionReceiver::OneShot(receiver) => {
940 let future: BoxFuture<'static, (TaskId, TaskCompletion)> = Box::pin(async move {
941 (task_id, receiver.wait().await)
942 });
943 oneshot_futures.push(future);
944 },
945 crate::task::CompletionReceiver::Periodic(mut receiver) => {
946 let future: BoxFuture<'static, PeriodicFutureResult> = Box::pin(async move {
947 let reason = receiver.recv().await;
948 (task_id, reason, receiver)
949 });
950 periodic_futures.push(future);
951 }
952 }
953 }
954 }
955 ServiceCommand::AddTimerHandle { task_id, completion_rx } => {
956 // Add to appropriate futures
957 // 添加到相应的 futures
958 match completion_rx {
959 crate::task::CompletionReceiver::OneShot(receiver) => {
960 let future: BoxFuture<'static, (TaskId, TaskCompletion)> = Box::pin(async move {
961 (task_id, receiver.wait().await)
962 });
963 oneshot_futures.push(future);
964 },
965 crate::task::CompletionReceiver::Periodic(mut receiver) => {
966 let future: BoxFuture<'static, PeriodicFutureResult> = Box::pin(async move {
967 let reason = receiver.recv().await;
968 (task_id, reason, receiver)
969 });
970 periodic_futures.push(future);
971 }
972 }
973 }
974 }
975 }
976
977 // If no futures and command channel is closed, exit loop
978 // 如果没有 futures 且命令通道关闭,退出循环
979 else => {
980 break;
981 }
982 }
983 }
984 }
985}
986
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{TimerTask, TimerWheel};
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };
    use std::time::Duration;

    /// Building a service from a default wheel and a default config must not panic.
    #[tokio::test]
    async fn test_service_creation() {
        let wheel = TimerWheel::with_defaults();
        let _svc = wheel.create_service(ServiceConfig::default());
    }

    /// A registered 50 ms one-shot timer yields a `OneShot` notification carrying
    /// the id of the handle it was registered with.
    #[tokio::test]
    async fn test_add_timer_handle_and_receive_timeout() {
        let wheel = TimerWheel::with_defaults();
        let mut svc = wheel.create_service(ServiceConfig::default());

        // Reserve the handle up front so the task id is known before registration.
        let slot = svc.allocate_handle();
        let expected_id = slot.task_id();

        // One-shot task with a no-op async callback.
        let noop = CallbackWrapper::new(|| async {});
        let job = TimerTask::new_oneshot(Duration::from_millis(50), Some(noop));

        svc.register(slot, job).unwrap();

        // Bound the wait at 200 ms so a missing notification fails instead of hanging.
        let notifications = svc.take_receiver().unwrap();
        let waited = tokio::time::timeout(Duration::from_millis(200), notifications.recv()).await;
        let notice = waited
            .expect("Should receive timeout notification")
            .expect("Should receive Some value");

        assert_eq!(notice, TaskNotification::OneShot(expected_id));
    }

    /// Registers two 10-second timers and shuts the service down right away.
    ///
    /// The test only requires that registration succeeds and that `shutdown`
    /// returns without panicking; it makes no assertion about elapsed time.
    #[tokio::test]
    async fn test_shutdown() {
        let wheel = TimerWheel::with_defaults();
        let svc = wheel.create_service(ServiceConfig::default());

        // Two long-lived timers that are not expected to fire during the test.
        let far_future = Duration::from_secs(10);
        let (first_slot, second_slot) = (svc.allocate_handle(), svc.allocate_handle());
        let first_job = TimerTask::new_oneshot(far_future, None);
        let second_job = TimerTask::new_oneshot(far_future, None);
        svc.register(first_slot, first_job).unwrap();
        svc.register(second_slot, second_job).unwrap();

        // Shut down immediately, without waiting for either timer.
        svc.shutdown().await;
    }

    /// A one-shot timer scheduled through the service both delivers its
    /// notification and runs its callback exactly once.
    #[tokio::test]
    async fn test_schedule_once_direct() {
        let wheel = TimerWheel::with_defaults();
        let mut svc = wheel.create_service(ServiceConfig::default());
        let hits = Arc::new(AtomicU32::new(0));

        // The callback bumps a shared counter each time it is invoked.
        let hits_for_callback = Arc::clone(&hits);
        let slot = svc.allocate_handle();
        let expected_id = slot.task_id();
        let bump = CallbackWrapper::new(move || {
            let hits = Arc::clone(&hits_for_callback);
            async move {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        });
        let job = TimerTask::new_oneshot(Duration::from_millis(50), Some(bump));
        svc.register(slot, job).unwrap();

        // Wait (at most 200 ms) for the expiry notification.
        let notifications = svc.take_receiver().unwrap();
        let waited = tokio::time::timeout(Duration::from_millis(200), notifications.recv()).await;
        let notice = waited
            .expect("Should receive timeout notification")
            .expect("Should receive Some value");

        assert_eq!(notice, TaskNotification::OneShot(expected_id));

        // Give the callback time to run before inspecting the counter.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    /// A one-shot timer registered without a callback still produces its
    /// expiry notification.
    #[tokio::test]
    async fn test_schedule_once_notify_direct() {
        let wheel = TimerWheel::with_defaults();
        let mut svc = wheel.create_service(ServiceConfig::default());

        // Notification-only timer: the callback argument is `None`.
        let slot = svc.allocate_handle();
        let expected_id = slot.task_id();
        let job = TimerTask::new_oneshot(Duration::from_millis(50), None);
        svc.register(slot, job).unwrap();

        // Wait (at most 200 ms) for the expiry notification.
        let notifications = svc.take_receiver().unwrap();
        let waited = tokio::time::timeout(Duration::from_millis(200), notifications.recv()).await;
        let notice = waited
            .expect("Should receive timeout notification")
            .expect("Should receive Some value");

        assert_eq!(notice, TaskNotification::OneShot(expected_id));
    }

    /// After a one-shot timer has expired, cancelling it by id must report `false`.
    #[tokio::test]
    async fn test_task_timeout_cleans_up_task_sender() {
        let wheel = TimerWheel::with_defaults();
        let mut svc = wheel.create_service(ServiceConfig::default());

        // Short-lived timer without a callback.
        let slot = svc.allocate_handle();
        let expired_id = slot.task_id();
        let job = TimerTask::new_oneshot(Duration::from_millis(50), None);

        svc.register(slot, job).unwrap();

        // Wait (at most 200 ms) for the task to time out.
        let notifications = svc.take_receiver().unwrap();
        let waited = tokio::time::timeout(Duration::from_millis(200), notifications.recv()).await;
        let notice = waited
            .expect("Should receive timeout notification")
            .expect("Should receive Some value");

        assert_eq!(notice, TaskNotification::OneShot(expired_id));

        // Brief pause intended to let the service finish any internal cleanup.
        tokio::time::sleep(Duration::from_millis(10)).await;

        // The expired task should no longer be cancellable.
        let was_cancelled = svc.cancel_task(expired_id);
        assert!(!was_cancelled, "Timed out task should not exist anymore");
    }

    /// `take_receiver` hands out the receiver once; a second call yields `None`.
    #[tokio::test]
    async fn test_take_receiver_twice() {
        let wheel = TimerWheel::with_defaults();
        let mut svc = wheel.create_service(ServiceConfig::default());

        // First call is expected to return the receiver.
        let first_take = svc.take_receiver();
        assert!(first_take.is_some(), "First take_receiver should return Some");

        // Second call is expected to return nothing.
        let second_take = svc.take_receiver();
        assert!(second_take.is_none(), "Second take_receiver should return None");
    }
}