kestrel_timer/service.rs
1use lite_sync::{oneshot, spsc};
2use crate::{BatchHandle, TimerHandle};
3use crate::config::ServiceConfig;
4use crate::error::TimerError;
5use crate::task::{CallbackWrapper, CompletionReceiver, TaskCompletion, TaskId};
6use crate::wheel::Wheel;
7use futures::stream::{FuturesUnordered, StreamExt};
8use futures::future::BoxFuture;
9use parking_lot::Mutex;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::task::JoinHandle;
13
/// Notification delivered to the user when a managed timer task fires.
///
/// The variant tells the consumer whether the task was registered as a
/// one-shot timer or as a periodic timer; both variants carry the ID of the
/// task that fired.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskNotification {
    /// A one-shot task expired.
    OneShot(TaskId),
    /// A periodic task was invoked (emitted once per invocation).
    Periodic(TaskId),
}
28
29impl TaskNotification {
30 /// Get the task ID from the notification
31 ///
32 /// 从通知中获取任�?ID
33 pub fn task_id(&self) -> TaskId {
34 match self {
35 TaskNotification::OneShot(id) => *id,
36 TaskNotification::Periodic(id) => *id,
37 }
38 }
39
40 /// Check if this is a one-shot task notification
41 ///
42 /// 检查是否为一次性任务通知
43 pub fn is_oneshot(&self) -> bool {
44 matches!(self, TaskNotification::OneShot(_))
45 }
46
47 /// Check if this is a periodic task notification
48 ///
49 /// 检查是否为周期性任务通知
50 pub fn is_periodic(&self) -> bool {
51 matches!(self, TaskNotification::Periodic(_))
52 }
53}
54
/// Commands sent from `TimerService` to its background actor.
///
/// Each command carries only what the actor needs in order to watch a task:
/// the task ID(s) and the matching completion receiver(s).
enum ServiceCommand {
    /// Start watching a batch of tasks.
    ///
    /// `task_ids` and `completion_rxs` are paired by position (the actor zips
    /// them together).
    AddBatchHandle {
        task_ids: Vec<TaskId>,
        completion_rxs: Vec<CompletionReceiver>,
    },
    /// Start watching a single task.
    AddTimerHandle {
        task_id: TaskId,
        completion_rx: CompletionReceiver,
    },
}
74
/// TimerService - timer service based on the Actor pattern.
///
/// Manages multiple timer handles, listens to all timeout events, and
/// aggregates the resulting notifications onto a single channel for the user.
///
/// # Features
/// - Automatically listens to the timeout events of every registered timer
/// - One-shot tasks are no longer tracked once they have completed
/// - Periodic tasks keep being monitored and every invocation is forwarded
/// - Notifications (both one-shot and periodic) are forwarded to one unified channel
/// - Single tasks and batches can be registered dynamically at any time
///
/// # Examples
/// ```no_run
/// use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TaskNotification, config::ServiceConfig};
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() {
///     let timer = TimerWheel::with_defaults();
///     let mut service = timer.create_service(ServiceConfig::default());
///
///     // Register one-shot tasks
///     use kestrel_timer::TimerTask;
///     let handles = service.allocate_handles(3);
///     let tasks: Vec<_> = (0..3)
///         .map(|i| {
///             let callback = Some(CallbackWrapper::new(move || async move {
///                 println!("One-shot timer {} fired!", i);
///             }));
///             TimerTask::new_oneshot(Duration::from_millis(100), callback)
///         })
///         .collect();
///     service.register_batch(handles, tasks).unwrap();
///
///     // Register periodic tasks
///     let handle = service.allocate_handle();
///     let periodic_task = TimerTask::new_periodic(
///         Duration::from_millis(100),
///         Duration::from_millis(50),
///         Some(CallbackWrapper::new(|| async { println!("Periodic timer fired!"); })),
///         None
///     );
///     service.register(handle, periodic_task).unwrap();
///
///     // Receive notifications
///     let rx = service.take_receiver().unwrap();
///     while let Some(notification) = rx.recv().await {
///         match notification {
///             TaskNotification::OneShot(task_id) => {
///                 println!("One-shot task {:?} expired", task_id);
///             }
///             TaskNotification::Periodic(task_id) => {
///                 println!("Periodic task {:?} called", task_id);
///             }
///         }
///     }
/// }
/// ```
pub struct TimerService {
    /// Sender half of the command channel to the background actor.
    command_tx: spsc::Sender<ServiceCommand, 32>,
    /// Receiver half of the notification channel (one-shot and periodic
    /// notifications); `None` once it has been handed out by `take_receiver`.
    timeout_rx: Option<spsc::Receiver<TaskNotification, 32>>,
    /// Join handle of the spawned actor task; taken on shutdown or drop.
    actor_handle: Option<JoinHandle<()>>,
    /// Shared timing wheel, used to schedule, cancel and postpone timers directly.
    wheel: Arc<Mutex<Wheel>>,
    /// Sender used to ask the actor to stop; taken by `shutdown`.
    shutdown_tx: Option<oneshot::Sender<()>>,
}
161
162impl TimerService {
163 /// Allocate a handle from DeferredMap
164 ///
165 /// # Returns
166 /// A unique handle for later insertion
167 ///
168 /// # 返回�?
169 /// 用于后续插入的唯一 handle
170 ///
171 /// # Examples (示例)
172 /// ```no_run
173 /// # use kestrel_timer::{TimerWheel, config::ServiceConfig};
174 /// # #[tokio::main]
175 /// # async fn main() {
176 /// let timer = TimerWheel::with_defaults();
177 /// let mut service = timer.create_service(ServiceConfig::default());
178 ///
179 /// // Allocate handle first
180 /// // 先分�?handle
181 /// let handle = service.allocate_handle();
182 /// # }
183 /// ```
184 pub fn allocate_handle(&self) -> crate::task::TaskHandle {
185 self.wheel.lock().allocate_handle()
186 }
187
188 /// Batch allocate handles from DeferredMap
189 ///
190 /// # Parameters
191 /// - `count`: Number of handles to allocate
192 ///
193 /// # Returns
194 /// Vector of unique handles for later batch insertion
195 ///
196 /// # 参数
197 /// - `count`: 要分配的 handle 数量
198 ///
199 /// # 返回�?
200 /// 用于后续批量插入的唯一 handles 向量
201 ///
202 /// # Examples (示例)
203 /// ```no_run
204 /// # use kestrel_timer::{TimerWheel, config::ServiceConfig};
205 /// # #[tokio::main]
206 /// # async fn main() {
207 /// let timer = TimerWheel::with_defaults();
208 /// let service = timer.create_service(ServiceConfig::default());
209 ///
210 /// // Batch allocate handles
211 /// // 批量分配 handles
212 /// let handles = service.allocate_handles(10);
213 /// assert_eq!(handles.len(), 10);
214 /// # }
215 /// ```
216 pub fn allocate_handles(&self, count: usize) -> Vec<crate::task::TaskHandle> {
217 self.wheel.lock().allocate_handles(count)
218 }
219
220 /// Create new TimerService
221 ///
222 /// # Parameters
223 /// - `wheel`: Timing wheel reference
224 /// - `config`: Service configuration
225 ///
226 /// # Notes
227 /// Typically not called directly, but used to create through `TimerWheel::create_service()`
228 ///
229 /// 创建新的 TimerService
230 ///
231 /// # 参数
232 /// - `wheel`: 时间轮引�?
233 /// - `config`: 服务配置
234 ///
235 /// # 注意
236 /// 通常不直接调用,而是通过 `TimerWheel::create_service()` 创建
237 ///
238 pub(crate) fn new(wheel: Arc<Mutex<Wheel>>, config: ServiceConfig) -> Self {
239 let (command_tx, command_rx) = spsc::channel(config.command_channel_capacity);
240 let (timeout_tx, timeout_rx) = spsc::channel(config.timeout_channel_capacity);
241
242 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
243 let actor = ServiceActor::new(command_rx, timeout_tx, shutdown_rx);
244 let actor_handle = tokio::spawn(async move {
245 actor.run().await;
246 });
247
248 Self {
249 command_tx,
250 timeout_rx: Some(timeout_rx),
251 actor_handle: Some(actor_handle),
252 wheel,
253 shutdown_tx: Some(shutdown_tx),
254 }
255 }
256
257 /// Get timeout receiver (transfer ownership)
258 ///
259 /// # Returns
260 /// Timeout notification receiver, if already taken, returns None
261 ///
262 /// # Notes
263 /// This method can only be called once, because it transfers ownership of the receiver
264 /// The receiver will receive both one-shot task expired notifications and periodic task called notifications
265 ///
266 /// 获取超时通知接收�?(转移所有权)
267 ///
268 /// # 返回�?
269 /// 超时通知接收器,如果已经取走,返�?None
270 ///
271 /// # 注意
272 /// 此方法只能调用一次,因为它转移了接收器的所有权
273 /// 接收器将接收一次性任务过期通知和周期性任务被调用通知
274 ///
275 /// # Examples (示例)
276 /// ```no_run
277 /// # use kestrel_timer::{TimerWheel, config::ServiceConfig, TaskNotification};
278 /// # #[tokio::main]
279 /// # async fn main() {
280 /// let timer = TimerWheel::with_defaults();
281 /// let mut service = timer.create_service(ServiceConfig::default());
282 ///
283 /// let rx = service.take_receiver().unwrap();
284 /// while let Some(notification) = rx.recv().await {
285 /// match notification {
286 /// TaskNotification::OneShot(task_id) => {
287 /// println!("One-shot task {:?} expired", task_id);
288 /// }
289 /// TaskNotification::Periodic(task_id) => {
290 /// println!("Periodic task {:?} called", task_id);
291 /// }
292 /// }
293 /// }
294 /// # }
295 /// ```
296 pub fn take_receiver(&mut self) -> Option<spsc::Receiver<TaskNotification, 32>> {
297 self.timeout_rx.take()
298 }
299
300 /// Cancel specified task
301 ///
302 /// # Parameters
303 /// - `task_id`: Task ID to cancel
304 ///
305 /// # Returns
306 /// - `true`: Task exists and cancellation is successful
307 /// - `false`: Task does not exist or cancellation fails
308 ///
309 /// 取消指定任务
310 ///
311 /// # 参数
312 /// - `task_id`: 任务 ID
313 ///
314 /// # 返回�?
315 /// - `true`: 任务存在且取消成�?
316 /// - `false`: 任务不存在或取消失败
317 ///
318 /// # Examples (示例)
319 /// ```no_run
320 /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
321 /// # use std::time::Duration;
322 /// #
323 /// # #[tokio::main]
324 /// # async fn main() {
325 /// let timer = TimerWheel::with_defaults();
326 /// let service = timer.create_service(ServiceConfig::default());
327 ///
328 /// // Use two-step API to schedule timers
329 /// let handle = service.allocate_handle();
330 /// let task_id = handle.task_id();
331 /// let callback = Some(CallbackWrapper::new(|| async move {
332 /// println!("Timer fired!"); // 定时器触�?
333 /// }));
334 /// let task = TimerTask::new_oneshot(Duration::from_secs(10), callback);
335 /// service.register(handle, task).unwrap(); // 注册定时�?
336 ///
337 /// // Cancel task
338 /// let cancelled = service.cancel_task(task_id);
339 /// println!("Task cancelled: {}", cancelled); // 任务取消
340 /// # }
341 /// ```
342 #[inline]
343 pub fn cancel_task(&self, task_id: TaskId) -> bool {
344 // Direct cancellation, no need to notify Actor
345 // FuturesUnordered will automatically clean up when tasks are cancelled
346 // 直接取消,无需通知 Actor
347 // FuturesUnordered 将在任务取消时自动清�?
348 let mut wheel = self.wheel.lock();
349 wheel.cancel(task_id)
350 }
351
352 /// Batch cancel tasks
353 ///
354 /// Use underlying batch cancellation operation to cancel multiple tasks at once, performance is better than calling cancel_task repeatedly.
355 ///
356 /// # Parameters
357 /// - `task_ids`: List of task IDs to cancel
358 ///
359 /// # Returns
360 /// Number of successfully cancelled tasks
361 ///
362 /// 批量取消任务
363 ///
364 /// # 参数
365 /// - `task_ids`: 任务 ID 列表
366 ///
367 /// # 返回�?
368 /// 成功取消的任务数�?
369 ///
370 /// # Examples (示例)
371 /// ```no_run
372 /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
373 /// # use std::time::Duration;
374 /// #
375 /// # #[tokio::main]
376 /// # async fn main() {
377 /// let timer = TimerWheel::with_defaults();
378 /// let service = timer.create_service(ServiceConfig::default());
379 ///
380 /// let handles = service.allocate_handles(10);
381 /// let task_ids: Vec<_> = handles.iter().map(|h| h.task_id()).collect();
382 /// let tasks: Vec<_> = (0..10)
383 /// .map(|i| {
384 /// let callback = Some(CallbackWrapper::new(move || async move {
385 /// println!("Timer {} fired!", i); // 定时器触�?
386 /// }));
387 /// TimerTask::new_oneshot(Duration::from_secs(10), callback)
388 /// })
389 /// .collect();
390 /// service.register_batch(handles, tasks).unwrap(); // 注册定时�?
391 ///
392 /// // Batch cancel
393 /// let cancelled = service.cancel_batch(&task_ids);
394 /// println!("Cancelled {} tasks", cancelled); // 任务取消
395 /// # }
396 /// ```
397 #[inline]
398 pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
399 if task_ids.is_empty() {
400 return 0;
401 }
402
403 // Direct batch cancellation, no need to notify Actor
404 // FuturesUnordered will automatically clean up when tasks are cancelled
405 // 直接批量取消,无需通知 Actor
406 // FuturesUnordered 将在任务取消时自动清�?
407 let mut wheel = self.wheel.lock();
408 wheel.cancel_batch(task_ids)
409 }
410
411 /// Postpone task (optionally replace callback)
412 ///
413 /// # Parameters
414 /// - `task_id`: Task ID to postpone
415 /// - `new_delay`: New delay time (recalculated from current time point)
416 /// - `callback`: New callback function (if `None`, keeps the original callback)
417 ///
418 /// # Returns
419 /// - `true`: Task exists and is successfully postponed
420 /// - `false`: Task does not exist or postponement fails
421 ///
422 /// # Notes
423 /// - Task ID remains unchanged after postponement
424 /// - Original timeout notification remains valid
425 /// - If callback is `Some`, it will replace the original callback
426 /// - If callback is `None`, the original callback is preserved
427 ///
428 /// 推迟任务 (可选替换回�?
429 ///
430 /// # 参数
431 /// - `task_id`: 任务 ID
432 /// - `new_delay`: 新的延迟时间 (从当前时间点重新计算)
433 /// - `callback`: 新的回调函数 (如果�?`None`,则保留原回�?
434 ///
435 /// # 返回�?
436 /// - `true`: 任务存在且延期成�?
437 /// - `false`: 任务不存在或延期失败
438 ///
439 /// # 注意
440 /// - 任务 ID 在延期后保持不变
441 /// - 原始超时通知保持有效
442 /// - 如果 callback �?`Some`,将替换原始回调
443 /// - 如果 callback �?`None`,保留原始回�?
444 ///
445 /// # Examples (示例)
446 /// ```no_run
447 /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
448 /// # use std::time::Duration;
449 /// #
450 /// # #[tokio::main]
451 /// # async fn main() {
452 /// let timer = TimerWheel::with_defaults();
453 /// let service = timer.create_service(ServiceConfig::default());
454 ///
455 /// let handle = service.allocate_handle();
456 /// let task_id = handle.task_id();
457 /// let callback = Some(CallbackWrapper::new(|| async {
458 /// println!("Original callback"); // 原始回调
459 /// }));
460 /// let task = TimerTask::new_oneshot(Duration::from_secs(5), callback);
461 /// service.register(handle, task).unwrap(); // 注册定时�?
462 ///
463 /// // Postpone and replace callback (延期并替换回�?
464 /// let new_callback = Some(CallbackWrapper::new(|| async { println!("New callback!"); }));
465 /// let success = service.postpone(
466 /// task_id,
467 /// Duration::from_secs(10),
468 /// new_callback
469 /// );
470 /// println!("Postponed successfully: {}", success);
471 /// # }
472 /// ```
473 #[inline]
474 pub fn postpone(&self, task_id: TaskId, new_delay: Duration, callback: Option<CallbackWrapper>) -> bool {
475 let mut wheel = self.wheel.lock();
476 wheel.postpone(task_id, new_delay, callback)
477 }
478
479 /// Batch postpone tasks (keep original callbacks)
480 ///
481 /// # Parameters
482 /// - `updates`: List of tuples of (task ID, new delay)
483 ///
484 /// # Returns
485 /// Number of successfully postponed tasks
486 ///
487 /// 批量延期任务 (保持原始回调)
488 ///
489 /// # 参数
490 /// - `updates`: (任务 ID, 新延�? 元组列表
491 ///
492 /// # 返回�?
493 /// 成功延期的任务数�?
494 ///
495 /// # Examples (示例)
496 /// ```no_run
497 /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, TimerTask, config::ServiceConfig};
498 /// # use std::time::Duration;
499 /// #
500 /// # #[tokio::main]
501 /// # async fn main() {
502 /// let timer = TimerWheel::with_defaults();
503 /// let service = timer.create_service(ServiceConfig::default());
504 ///
505 /// let handles = service.allocate_handles(3);
506 /// let task_ids: Vec<_> = handles.iter().map(|h| h.task_id()).collect();
507 /// let tasks: Vec<_> = (0..3)
508 /// .map(|i| {
509 /// let callback = Some(CallbackWrapper::new(move || async move {
510 /// println!("Timer {} fired!", i);
511 /// }));
512 /// TimerTask::new_oneshot(Duration::from_secs(5), callback)
513 /// })
514 /// .collect();
515 /// service.register_batch(handles, tasks).unwrap();
516 ///
517 /// // Batch postpone (keep original callbacks)
518 /// // 批量延期任务 (保持原始回调)
519 /// let updates: Vec<_> = task_ids
520 /// .into_iter()
521 /// .map(|id| (id, Duration::from_secs(10)))
522 /// .collect();
523 /// let postponed = service.postpone_batch(updates);
524 /// println!("Postponed {} tasks", postponed);
525 /// # }
526 /// ```
527 #[inline]
528 pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize {
529 if updates.is_empty() {
530 return 0;
531 }
532
533 let mut wheel = self.wheel.lock();
534 wheel.postpone_batch(updates)
535 }
536
537 /// Batch postpone tasks (replace callbacks)
538 ///
539 /// # Parameters
540 /// - `updates`: List of tuples of (task ID, new delay, new callback)
541 ///
542 /// # Returns
543 /// Number of successfully postponed tasks
544 ///
545 /// 批量延期任务 (替换回调)
546 ///
547 /// # 参数
548 /// - `updates`: (任务 ID, 新延�? 新回�? 元组列表
549 ///
550 /// # 返回�?
551 /// 成功延期的任务数�?
552 ///
553 /// # Examples (示例)
554 /// ```no_run
555 /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
556 /// # use std::time::Duration;
557 /// #
558 /// # #[tokio::main]
559 /// # async fn main() {
560 /// # use kestrel_timer::TimerTask;
561 /// let timer = TimerWheel::with_defaults();
562 /// let service = timer.create_service(ServiceConfig::default());
563 ///
564 /// // Create 3 tasks, initially no callbacks
565 /// // 创建 3 个任务,最初没有回�?
566 /// let handles = service.allocate_handles(3);
567 /// let task_ids: Vec<_> = handles.iter().map(|h| h.task_id()).collect();
568 /// let tasks: Vec<_> = (0..3)
569 /// .map(|_| {
570 /// TimerTask::new_oneshot(Duration::from_secs(5), None)
571 /// })
572 /// .collect();
573 /// service.register_batch(handles, tasks).unwrap();
574 ///
575 /// // Batch postpone and add new callbacks
576 /// // 批量延期并添加新的回�?
577 /// let updates: Vec<_> = task_ids
578 /// .into_iter()
579 /// .enumerate()
580 /// .map(|(i, id)| {
581 /// let callback = Some(CallbackWrapper::new(move || async move {
582 /// println!("New callback {}", i);
583 /// }));
584 /// (id, Duration::from_secs(10), callback)
585 /// })
586 /// .collect();
587 /// let postponed = service.postpone_batch_with_callbacks(updates);
588 /// println!("Postponed {} tasks", postponed);
589 /// # }
590 /// ```
591 #[inline]
592 pub fn postpone_batch_with_callbacks(
593 &self,
594 updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
595 ) -> usize {
596 if updates.is_empty() {
597 return 0;
598 }
599
600 let mut wheel = self.wheel.lock();
601 wheel.postpone_batch_with_callbacks(updates)
602 }
603
604 /// Register timer task to service (registration phase)
605 ///
606 /// # Parameters
607 /// - `handle`: Handle allocated via `allocate_handle()`
608 /// - `task`: Task created via `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
609 ///
610 /// # Returns
611 /// - `Ok(TimerHandle)`: Register successfully
612 /// - `Err(TimerError::RegisterFailed)`: Register failed (internal channel is full or closed)
613 ///
614 /// 注册定时器任务到服务 (注册阶段)
615 /// # 参数
616 /// - `handle`: 通过 `allocate_handle()` 分配�?handle
617 /// - `task`: 通过 `TimerTask::new_oneshot()` �?`TimerTask::new_periodic()` 创建的任�?
618 ///
619 /// # 返回�?
620 /// - `Ok(TimerHandle)`: 注册成功
621 /// - `Err(TimerError::RegisterFailed)`: 注册失败 (内部通道已满或关�?
622 ///
623 /// # Examples (示例)
624 /// ```no_run
625 /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig, TimerTask};
626 /// # use std::time::Duration;
627 /// #
628 /// # #[tokio::main]
629 /// # async fn main() {
630 /// let timer = TimerWheel::with_defaults();
631 /// let service = timer.create_service(ServiceConfig::default());
632 ///
633 /// // Step 1: allocate handle
634 /// // 分配 handle
635 /// let handle = service.allocate_handle();
636 /// let task_id = handle.task_id();
637 ///
638 /// // Step 2: create task
639 /// // 创建任务
640 /// let callback = Some(CallbackWrapper::new(|| async move {
641 /// println!("Timer fired!");
642 /// }));
643 /// let task = TimerTask::new_oneshot(Duration::from_millis(100), callback);
644 ///
645 /// // Step 3: register task
646 /// // 注册任务
647 /// service.register(handle, task).unwrap();
648 /// # }
649 /// ```
650 #[inline]
651 pub fn register(&self, handle: crate::task::TaskHandle, task: crate::task::TimerTask) -> Result<TimerHandle, TimerError> {
652 let task_id = handle.task_id();
653
654 let (task, completion_rx) = crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
655
656 // Single lock, complete all operations
657 // 单次锁定,完成所有操�?
658 {
659 let mut wheel_guard = self.wheel.lock();
660 wheel_guard.insert(handle, task);
661 }
662
663 // Add to service management (only send necessary data)
664 // 添加到服务管理(只发送必要数据)
665 self.command_tx
666 .try_send(ServiceCommand::AddTimerHandle {
667 task_id,
668 completion_rx,
669 })
670 .map_err(|_| TimerError::RegisterFailed)?;
671
672 Ok(TimerHandle::new(task_id, self.wheel.clone()))
673 }
674
675 /// Batch register timer tasks to service (registration phase)
676 ///
677 /// # Parameters
678 /// - `handles`: Pre-allocated handles for tasks
679 /// - `tasks`: List of timer tasks
680 ///
681 /// # Returns
682 /// - `Ok(BatchHandle)`: Register successfully
683 /// - `Err(TimerError::RegisterFailed)`: Register failed (internal channel is full or closed)
684 /// - `Err(TimerError::BatchLengthMismatch)`: Handles and tasks lengths don't match
685 ///
686 /// 批量注册定时器任务到服务 (注册阶段)
687 /// # 参数
688 /// - `handles`: 任务的预分配 handles
689 /// - `tasks`: 定时器任务列�?
690 ///
691 /// # 返回�?
692 /// - `Ok(BatchHandle)`: 注册成功
693 /// - `Err(TimerError::RegisterFailed)`: 注册失败 (内部通道已满或关�?
694 /// - `Err(TimerError::BatchLengthMismatch)`: handles �?tasks 长度不匹�?
695 ///
696 /// # Examples (示例)
697 /// ```no_run
698 /// # use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig, TimerTask};
699 /// # use std::time::Duration;
700 /// #
701 /// # #[tokio::main]
702 /// # async fn main() {
703 /// # use kestrel_timer::TimerTask;
704 /// let timer = TimerWheel::with_defaults();
705 /// let service = timer.create_service(ServiceConfig::default());
706 ///
707 /// // Step 1: batch allocate handles
708 /// // 批量分配 handles
709 /// let handles = service.allocate_handles(3);
710 ///
711 /// // Step 2: create tasks
712 /// // 创建任务
713 /// let tasks: Vec<_> = (0..3)
714 /// .map(|i| {
715 /// let callback = Some(CallbackWrapper::new(move || async move {
716 /// println!("Timer {} fired!", i);
717 /// }));
718 /// TimerTask::new_oneshot(Duration::from_secs(1), callback)
719 /// })
720 /// .collect();
721 ///
722 /// // Step 3: register batch
723 /// // 注册批量任务
724 /// service.register_batch(handles, tasks).unwrap();
725 /// # }
726 /// ```
727 #[inline]
728 pub fn register_batch(
729 &self,
730 handles: Vec<crate::task::TaskHandle>,
731 tasks: Vec<crate::task::TimerTask>
732 ) -> Result<BatchHandle, TimerError> {
733 // Validate lengths match
734 if handles.len() != tasks.len() {
735 return Err(TimerError::BatchLengthMismatch {
736 handles_len: handles.len(),
737 tasks_len: tasks.len(),
738 });
739 }
740
741 let task_count = tasks.len();
742 let mut completion_rxs = Vec::with_capacity(task_count);
743 let mut task_ids = Vec::with_capacity(task_count);
744 let mut prepared_handles = Vec::with_capacity(task_count);
745 let mut prepared_tasks = Vec::with_capacity(task_count);
746
747 // Step 1: prepare all channels and notifiers (no lock)
748 // 步骤 1: 准备所有通道和通知器(无锁�?
749 for (handle, task) in handles.into_iter().zip(tasks.into_iter()) {
750 let task_id = handle.task_id();
751 let (task, completion_rx) = crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
752 task_ids.push(task_id);
753 completion_rxs.push(completion_rx);
754 prepared_handles.push(handle);
755 prepared_tasks.push(task);
756 }
757
758 // Step 2: single lock, batch insert
759 // 步骤 2: 单次锁定,批量插�?
760 {
761 let mut wheel_guard = self.wheel.lock();
762 wheel_guard.insert_batch(prepared_handles, prepared_tasks)?;
763 }
764
765 // Add to service management (only send necessary data)
766 // 添加到服务管理(只发送必要数据)
767 self.command_tx
768 .try_send(ServiceCommand::AddBatchHandle {
769 task_ids: task_ids.clone(),
770 completion_rxs,
771 })
772 .map_err(|_| TimerError::RegisterFailed)?;
773
774 Ok(BatchHandle::new(task_ids, self.wheel.clone()))
775 }
776
777 /// Graceful shutdown of TimerService
778 ///
779 /// 优雅关闭 TimerService
780 ///
781 /// # Examples (示例)
782 /// ```no_run
783 /// # use kestrel_timer::{TimerWheel, config::ServiceConfig};
784 /// # #[tokio::main]
785 /// # async fn main() {
786 /// let timer = TimerWheel::with_defaults();
787 /// let mut service = timer.create_service(ServiceConfig::default());
788 ///
789 /// // Use service... (使用服务...)
790 ///
791 /// service.shutdown().await;
792 /// # }
793 /// ```
794 pub async fn shutdown(mut self) {
795 if let Some(shutdown_tx) = self.shutdown_tx.take() {
796 shutdown_tx.notify(());
797 }
798 if let Some(handle) = self.actor_handle.take() {
799 let _ = handle.await;
800 }
801 }
802}
803
804
805impl Drop for TimerService {
806 fn drop(&mut self) {
807 if let Some(handle) = self.actor_handle.take() {
808 handle.abort();
809 }
810 }
811}
812
/// ServiceActor - internal actor behind `TimerService`.
///
/// Owns the receiving end of the command channel, the sending end of the
/// notification channel and the shutdown signal; its `run` loop ties them
/// together.
struct ServiceActor {
    /// Receives commands (newly registered tasks) from the service.
    command_rx: spsc::Receiver<ServiceCommand, 32>,
    /// Sends one-shot and periodic task notifications to the user.
    timeout_tx: spsc::Sender<TaskNotification, 32>,
    /// Receives the signal that asks the actor to stop.
    shutdown_rx: oneshot::Receiver<()>,
}
830
831impl ServiceActor {
832 /// Create new ServiceActor
833 ///
834 /// 创建新的 ServiceActor
835 fn new(command_rx: spsc::Receiver<ServiceCommand, 32>, timeout_tx: spsc::Sender<TaskNotification, 32>, shutdown_rx: oneshot::Receiver<()>) -> Self {
836 Self {
837 command_rx,
838 timeout_tx,
839 shutdown_rx,
840 }
841 }
842
843 /// Run Actor event loop
844 ///
845 /// 运行 Actor 事件循环
846 async fn run(self) {
847 // Use separate FuturesUnordered for one-shot and periodic tasks
848 // 为一次性任务和周期性任务使用独立的 FuturesUnordered
849
850 // One-shot futures: each future returns (TaskId, TaskCompletion)
851 // 一次性任�?futures:每�?future 返回 (TaskId, TaskCompletion)
852 let mut oneshot_futures: FuturesUnordered<BoxFuture<'static, (TaskId, TaskCompletion)>> = FuturesUnordered::new();
853
854 // Periodic futures: each future returns (TaskId, Option<PeriodicTaskCompletion>, mpsc::Receiver)
855 // The receiver is returned so we can continue listening for next event
856 // 周期性任�?futures:每�?future 返回 (TaskId, Option<PeriodicTaskCompletion>, mpsc::Receiver)
857 // 返回接收器以便我们可以继续监听下一个事�?
858 type PeriodicFutureResult = (TaskId, Option<TaskCompletion>, crate::task::PeriodicCompletionReceiver);
859 let mut periodic_futures: FuturesUnordered<BoxFuture<'static, PeriodicFutureResult>> = FuturesUnordered::new();
860
861 // Move shutdown_rx out of self, so it can be used in select! with &mut
862 // �?shutdown_rx �?self 中移出,以便�?select! 中使�?&mut
863 let mut shutdown_rx = self.shutdown_rx;
864
865 loop {
866 tokio::select! {
867 // Listen to high-priority shutdown signal
868 // 监听高优先级关闭信号
869 _ = &mut shutdown_rx => {
870 // Receive shutdown signal, exit loop immediately
871 // 接收到关闭信号,立即退出循�?
872 break;
873 }
874
875 // Listen to one-shot task timeout events
876 // 监听一次性任务超时事�?
877 Some((task_id, completion)) = oneshot_futures.next() => {
878 // Check completion reason, only forward Called events, do not forward Cancelled events
879 // 检查完成原因,只转�?Called 事件,不转发 Cancelled 事件
880 if completion == TaskCompletion::Called {
881 let _ = self.timeout_tx.send(TaskNotification::OneShot(task_id)).await;
882 }
883 // Task will be automatically removed from FuturesUnordered
884 // 任务将自动从 FuturesUnordered 中移�?
885 }
886
887 // Listen to periodic task events
888 // 监听周期性任务事�?
889 Some((task_id, reason, mut receiver)) = periodic_futures.next() => {
890 // Check completion reason, only forward Called events, do not forward Cancelled events
891 // 检查完成原因,只转�?Called 事件,不转发 Cancelled 事件
892 if let Some(TaskCompletion::Called) = reason {
893 let _ = self.timeout_tx.send(TaskNotification::Periodic(task_id)).await;
894
895 // Re-add the receiver to continue listening for next periodic event
896 // 重新添加接收器以继续监听下一个周期性事�?
897 let future: BoxFuture<'static, PeriodicFutureResult> = Box::pin(async move {
898 let reason = receiver.recv().await;
899 (task_id, reason, receiver)
900 });
901 periodic_futures.push(future);
902 }
903 // If Cancelled or None, do not re-add the future (task is done)
904 // 如果�?Cancelled �?None,不重新添加 future(任务结束)
905 }
906
907 // Listen to commands
908 // 监听命令
909 Some(cmd) = self.command_rx.recv() => {
910 match cmd {
911 ServiceCommand::AddBatchHandle { task_ids, completion_rxs } => {
912 // Add all tasks to appropriate futures
913 // 将所有任务添加到相应�?futures
914 for (task_id, rx) in task_ids.into_iter().zip(completion_rxs.into_iter()) {
915 match rx {
916 crate::task::CompletionReceiver::OneShot(receiver) => {
917 let future: BoxFuture<'static, (TaskId, TaskCompletion)> = Box::pin(async move {
918 (task_id, receiver.wait().await)
919 });
920 oneshot_futures.push(future);
921 },
922 crate::task::CompletionReceiver::Periodic(mut receiver) => {
923 let future: BoxFuture<'static, PeriodicFutureResult> = Box::pin(async move {
924 let reason = receiver.recv().await;
925 (task_id, reason, receiver)
926 });
927 periodic_futures.push(future);
928 }
929 }
930 }
931 }
932 ServiceCommand::AddTimerHandle { task_id, completion_rx } => {
933 // Add to appropriate futures
934 // 添加到相应的 futures
935 match completion_rx {
936 crate::task::CompletionReceiver::OneShot(receiver) => {
937 let future: BoxFuture<'static, (TaskId, TaskCompletion)> = Box::pin(async move {
938 (task_id, receiver.wait().await)
939 });
940 oneshot_futures.push(future);
941 },
942 crate::task::CompletionReceiver::Periodic(mut receiver) => {
943 let future: BoxFuture<'static, PeriodicFutureResult> = Box::pin(async move {
944 let reason = receiver.recv().await;
945 (task_id, reason, receiver)
946 });
947 periodic_futures.push(future);
948 }
949 }
950 }
951 }
952 }
953
954 // If no futures and command channel is closed, exit loop
955 // 如果没有 futures 且命令通道关闭,退出循�?
956 else => {
957 break;
958 }
959 }
960 }
961 }
962}
963
964#[cfg(test)]
965mod tests {
966 use super::*;
967 use crate::{TimerWheel, TimerTask};
968 use std::sync::atomic::{AtomicU32, Ordering};
969 use std::sync::Arc;
970 use std::time::Duration;
971
972 #[tokio::test]
    // Smoke test: a service can be created from a default wheel and dropped
    // again without panicking.
    async fn test_service_creation() {
        let timer = TimerWheel::with_defaults();
        let _service = timer.create_service(ServiceConfig::default());
    }
977
978 #[tokio::test]
979 async fn test_add_timer_handle_and_receive_timeout() {
980 let timer = TimerWheel::with_defaults();
981 let mut service = timer.create_service(ServiceConfig::default());
982
983 // Allocate handle (分配 handle)
984 let handle = service.allocate_handle();
985 let task_id = handle.task_id();
986
987 // Create single timer (创建单个定时�?
988 let task = TimerTask::new_oneshot(Duration::from_millis(50), Some(CallbackWrapper::new(|| async {})));
989
990 // Register to service (注册到服�?
991 service.register(handle, task).unwrap();
992
993 // Receive timeout notification (接收超时通知)
994 let rx = service.take_receiver().unwrap();
995 let received_notification = tokio::time::timeout(Duration::from_millis(200), rx.recv())
996 .await
997 .expect("Should receive timeout notification")
998 .expect("Should receive Some value");
999
1000 assert_eq!(received_notification, TaskNotification::OneShot(task_id));
1001 }
1002
1003 #[tokio::test]
1004 async fn test_shutdown() {
1005 let timer = TimerWheel::with_defaults();
1006 let service = timer.create_service(ServiceConfig::default());
1007
1008 // Add some timers (添加一些定时器)
1009 let handle1 = service.allocate_handle();
1010 let handle2 = service.allocate_handle();
1011 let task1 = TimerTask::new_oneshot(Duration::from_secs(10), None);
1012 let task2 = TimerTask::new_oneshot(Duration::from_secs(10), None);
1013 service.register(handle1, task1).unwrap();
1014 service.register(handle2, task2).unwrap();
1015
1016 // Immediately shutdown (without waiting for timers to trigger) (立即关闭(不等待定时器触发))
1017 service.shutdown().await;
1018 }
1019
1020 #[tokio::test]
1021 async fn test_schedule_once_direct() {
1022 let timer = TimerWheel::with_defaults();
1023 let mut service = timer.create_service(ServiceConfig::default());
1024 let counter = Arc::new(AtomicU32::new(0));
1025
1026 // Schedule timer directly through service
1027 // 直接通过服务调度定时器
1028 let counter_clone = Arc::clone(&counter);
1029 let handle = service.allocate_handle();
1030 let task_id = handle.task_id();
1031 let task = TimerTask::new_oneshot(
1032 Duration::from_millis(50),
1033 Some(CallbackWrapper::new(move || {
1034 let counter = Arc::clone(&counter_clone);
1035 async move {
1036 counter.fetch_add(1, Ordering::SeqCst);
1037 }
1038 })),
1039 );
1040 service.register(handle, task).unwrap();
1041
1042 // Wait for timer to trigger
1043 // 等待定时器触�?
1044 let rx = service.take_receiver().unwrap();
1045 let received_notification = tokio::time::timeout(Duration::from_millis(200), rx.recv())
1046 .await
1047 .expect("Should receive timeout notification")
1048 .expect("Should receive Some value");
1049
1050 assert_eq!(received_notification, TaskNotification::OneShot(task_id));
1051
1052 // Wait for callback to execute
1053 // 等待回调执行
1054 tokio::time::sleep(Duration::from_millis(50)).await;
1055 assert_eq!(counter.load(Ordering::SeqCst), 1);
1056 }
1057
1058 #[tokio::test]
1059 async fn test_schedule_once_notify_direct() {
1060 let timer = TimerWheel::with_defaults();
1061 let mut service = timer.create_service(ServiceConfig::default());
1062
1063 // Schedule only notification timer directly through service (no callback)
1064 // 直接通过服务调度通知定时器(没有回调�?
1065 let handle = service.allocate_handle();
1066 let task_id = handle.task_id();
1067 let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
1068 service.register(handle, task).unwrap();
1069
1070 // Receive timeout notification
1071 // 接收超时通知
1072 let rx = service.take_receiver().unwrap();
1073 let received_notification = tokio::time::timeout(Duration::from_millis(200), rx.recv())
1074 .await
1075 .expect("Should receive timeout notification")
1076 .expect("Should receive Some value");
1077
1078 assert_eq!(received_notification, TaskNotification::OneShot(task_id));
1079 }
1080
1081 #[tokio::test]
1082 async fn test_task_timeout_cleans_up_task_sender() {
1083 let timer = TimerWheel::with_defaults();
1084 let mut service = timer.create_service(ServiceConfig::default());
1085
1086 // Add a short-term timer (添加短期定时�?
1087 let handle = service.allocate_handle();
1088 let task_id = handle.task_id();
1089 let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
1090
1091 service.register(handle, task).unwrap();
1092
1093 // Wait for task timeout (等待任务超时)
1094 let rx = service.take_receiver().unwrap();
1095 let received_notification = tokio::time::timeout(Duration::from_millis(200), rx.recv())
1096 .await
1097 .expect("Should receive timeout notification")
1098 .expect("Should receive Some value");
1099
1100 assert_eq!(received_notification, TaskNotification::OneShot(task_id));
1101
1102 // Wait a moment to ensure internal cleanup is complete (等待片刻以确保内部清理完�?
1103 tokio::time::sleep(Duration::from_millis(10)).await;
1104
1105 // Try to cancel the timed-out task, should return false (尝试取消超时任务,应返回 false)
1106 let cancelled = service.cancel_task(task_id);
1107 assert!(!cancelled, "Timed out task should not exist anymore");
1108 }
1109
1110 #[tokio::test]
1111 async fn test_take_receiver_twice() {
1112 let timer = TimerWheel::with_defaults();
1113 let mut service = timer.create_service(ServiceConfig::default());
1114
1115 // First call should return Some
1116 // 第一次调用应该返�?Some
1117 let rx1 = service.take_receiver();
1118 assert!(rx1.is_some(), "First take_receiver should return Some");
1119
1120 // Second call should return None
1121 // 第二次调用应该返�?None
1122 let rx2 = service.take_receiver();
1123 assert!(rx2.is_none(), "Second take_receiver should return None");
1124 }
1125}