kestrel_timer/timer.rs
1pub mod handle;
2
3use crate::config::{BatchConfig, ServiceConfig, WheelConfig};
4use crate::task::{CallbackWrapper, TaskId, TaskCompletionReason};
5use crate::wheel::Wheel;
6use parking_lot::Mutex;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::oneshot;
10use tokio::task::JoinHandle;
11use handle::{TimerHandle, TimerHandleWithCompletion, BatchHandle, BatchHandleWithCompletion};
12
13/// Timing Wheel Timer Manager
14///
15/// 时间轮定时器管理器
16pub struct TimerWheel {
17 /// Timing wheel instance, wrapped in Arc<Mutex> for multi-threaded access
18 ///
19 /// 时间轮实例,包装在 Arc<Mutex> 中以支持多线程访问
20 wheel: Arc<Mutex<Wheel>>,
21
22 /// Background tick loop task handle
23 ///
24 /// 后台 tick 循环任务句柄
25 tick_handle: Option<JoinHandle<()>>,
26}
27
28impl TimerWheel {
29 /// Create a new timer manager
30 ///
31 /// # Parameters
32 /// - `config`: Timing wheel configuration, already validated
33 ///
34 /// 创建新的定时器管理器
35 ///
36 /// # 参数
37 /// - `config`: 时间轮配置,已验证
38 ///
39 /// # Examples (示例)
40 /// ```no_run
41 /// use kestrel_timer::{TimerWheel, config::WheelConfig, TimerTask, config::BatchConfig};
42 /// use std::time::Duration;
43 ///
44 /// #[tokio::main]
45 /// async fn main() {
46 /// let config = WheelConfig::builder()
47 /// .l0_tick_duration(Duration::from_millis(10))
48 /// .l0_slot_count(512)
49 /// .l1_tick_duration(Duration::from_secs(1))
50 /// .l1_slot_count(64)
51 /// .build()
52 /// .unwrap();
53 /// let timer = TimerWheel::new(config, BatchConfig::default());
54 ///
55 /// // Use two-step API
56 /// // (Use two-step API)
57 /// let task = TimerWheel::create_task(Duration::from_secs(1), None);
58 /// let handle = timer.register(task);
59 /// }
60 /// ```
61 pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
62 let tick_duration = config.l0_tick_duration;
63 let wheel = Wheel::new(config, batch_config);
64 let wheel = Arc::new(Mutex::new(wheel));
65 let wheel_clone = wheel.clone();
66
67 // Start background tick loop
68 // 启动后台 tick 循环
69 let tick_handle = tokio::spawn(async move {
70 Self::tick_loop(wheel_clone, tick_duration).await;
71 });
72
73 Self {
74 wheel,
75 tick_handle: Some(tick_handle),
76 }
77 }
78
79 /// Create timer manager with default configuration, hierarchical mode
80 /// - L0 layer tick duration: 10ms, slot count: 512
81 /// - L1 layer tick duration: 1s, slot count: 64
82 ///
83 /// # Parameters
84 /// - `config`: Timing wheel configuration, already validated
85 ///
86 /// # Returns
87 /// Timer manager instance
88 ///
89 /// 使用默认配置创建定时器管理器,分层模式
90 /// - L0 层 tick 持续时间:10ms,槽数量:512
91 /// - L1 层 tick 持续时间:1s,槽数量:64
92 ///
93 /// # 参数
94 /// - `config`: 时间轮配置,已验证
95 ///
96 /// # 返回值
97 /// 定时器管理器实例
98 ///
99 /// # Examples (示例)
100 /// ```no_run
101 /// use kestrel_timer::TimerWheel;
102 ///
103 /// #[tokio::main]
104 /// async fn main() {
105 /// let timer = TimerWheel::with_defaults();
106 /// }
107 /// ```
108 pub fn with_defaults() -> Self {
109 Self::new(WheelConfig::default(), BatchConfig::default())
110 }
111
112 /// Create TimerService bound to this timing wheel with default configuration
113 ///
114 /// # Parameters
115 /// - `service_config`: Service configuration
116 ///
117 /// # Returns
118 /// TimerService instance bound to this timing wheel
119 ///
120 /// 创建绑定到此时间轮的 TimerService,使用默认配置
121 ///
122 /// # 参数
123 /// - `service_config`: 服务配置
124 ///
125 /// # 返回值
126 /// 绑定到此时间轮的 TimerService 实例
127 ///
128 /// # Examples (示例)
129 /// ```no_run
130 /// use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
131 /// use std::time::Duration;
132 ///
133 ///
134 /// #[tokio::main]
135 /// async fn main() {
136 /// let timer = TimerWheel::with_defaults();
137 /// let mut service = timer.create_service(ServiceConfig::default());
138 ///
139 /// // Use two-step API to batch schedule timers through service
140 /// // 使用两步 API 通过服务批量调度定时器
141 /// let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
142 /// .map(|_| (Duration::from_millis(100), Some(CallbackWrapper::new(|| async {}))))
143 /// .collect();
144 /// let tasks = TimerService::create_batch_with_callbacks(callbacks);
145 /// service.register_batch(tasks).unwrap();
146 ///
147 /// // Receive timeout notifications
148 /// // 接收超时通知
149 /// let mut rx = service.take_receiver().unwrap();
150 /// while let Some(task_id) = rx.recv().await {
151 /// println!("Task {:?} completed", task_id);
152 /// }
153 /// }
154 /// ```
155 pub fn create_service(&self, service_config: ServiceConfig) -> crate::service::TimerService {
156 crate::service::TimerService::new(self.wheel.clone(), service_config)
157 }
158
159 /// Create TimerService bound to this timing wheel with custom configuration
160 ///
161 /// # Parameters
162 /// - `config`: Service configuration
163 ///
164 /// # Returns
165 /// TimerService instance bound to this timing wheel
166 ///
167 /// 创建绑定到此时间轮的 TimerService,使用自定义配置
168 ///
169 /// # 参数
170 /// - `config`: 服务配置
171 ///
172 /// # 返回值
173 /// 绑定到此时间轮的 TimerService 实例
174 ///
175 /// # Examples (示例)
176 /// ```no_run
177 /// use kestrel_timer::{TimerWheel, config::ServiceConfig};
178 ///
179 /// #[tokio::main]
180 /// async fn main() {
181 /// let timer = TimerWheel::with_defaults();
182 /// let config = ServiceConfig::builder()
183 /// .command_channel_capacity(1024)
184 /// .timeout_channel_capacity(2000)
185 /// .build()
186 /// .unwrap();
187 /// let service = timer.create_service_with_config(config);
188 /// }
189 /// ```
190 pub fn create_service_with_config(&self, config: ServiceConfig) -> crate::service::TimerService {
191 crate::service::TimerService::new(self.wheel.clone(), config)
192 }
193
194 /// Create timer task (static method, apply stage)
195 ///
196 /// # Parameters
197 /// - `delay`: Delay duration
198 /// - `callback`: Callback object implementing TimerCallback trait
199 ///
200 /// # Returns
201 /// Return TimerTask, needs to be registered through `register()`
202 ///
203 /// 创建定时器任务 (静态方法,应用阶段)
204 ///
205 /// # 参数
206 /// - `delay`: 延迟时间
207 /// - `callback`: 回调对象,实现 TimerCallback 特质
208 ///
209 /// # 返回值
210 /// 返回 TimerTask,需要通过 `register()` 注册
211 ///
212 /// # Examples (示例)
213 /// ```no_run
214 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
215 /// use std::time::Duration;
216 ///
217 ///
218 /// #[tokio::main]
219 /// async fn main() {
220 /// let timer = TimerWheel::with_defaults();
221 ///
222 /// // Step 1: Create task
223 /// // 创建任务
224 /// let task = TimerWheel::create_task(Duration::from_secs(1), Some(CallbackWrapper::new(|| async {
225 /// println!("Timer fired!");
226 /// })));
227 ///
228 /// // Get task ID (获取任务 ID)
229 /// let task_id = task.get_id();
230 /// println!("Created task: {:?}", task_id);
231 ///
232 /// // Step 2: Register task
233 /// // 注册任务
234 /// let handle = timer.register(task);
235 /// }
236 /// ```
237 #[inline]
238 pub fn create_task(delay: Duration, callback: Option<CallbackWrapper>) -> crate::task::TimerTask {
239 crate::task::TimerTask::new(delay, callback)
240 }
241
242 /// Create batch of timer tasks (static method, apply stage, no callbacks)
243 ///
244 /// # Parameters
245 /// - `delays`: List of delay times
246 ///
247 /// # Returns
248 /// Return TimerTask list, needs to be registered through `register_batch()`
249 ///
250 /// 创建定时器任务 (静态方法,应用阶段,没有回调)
251 ///
252 /// # 参数
253 /// - `delays`: 延迟时间列表
254 ///
255 /// # 返回值
256 /// 返回 TimerTask 列表,需要通过 `register_batch()` 注册
257 ///
258 /// # Examples (示例)
259 /// ```no_run
260 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
261 /// use std::time::Duration;
262 /// use std::sync::Arc;
263 /// use std::sync::atomic::{AtomicU32, Ordering};
264 ///
265 /// #[tokio::main]
266 /// async fn main() {
267 /// let timer = TimerWheel::with_defaults();
268 /// let counter = Arc::new(AtomicU32::new(0));
269 ///
270 /// // Step 1: Create batch of tasks
271 /// // 创建批量任务
272 /// let delays: Vec<Duration> = (0..3)
273 /// .map(|_| Duration::from_millis(100))
274 /// .collect();
275 ///
276 /// // Create batch of tasks
277 /// let tasks = TimerWheel::create_batch(delays);
278 /// println!("Created {} tasks", tasks.len());
279 ///
280 /// // Step 2: Register batch of tasks
281 /// // 注册批量任务
282 /// let batch = timer.register_batch(tasks);
283 /// }
284 /// ```
285 #[inline]
286 pub fn create_batch(delays: Vec<Duration>) -> Vec<crate::task::TimerTask>
287 {
288 delays
289 .into_iter()
290 .map(|delay| crate::task::TimerTask::new(delay, None))
291 .collect()
292 }
293
294 /// Create batch of timer tasks (static method, apply stage, with callbacks)
295 ///
296 /// # Parameters
297 /// - `callbacks`: List of tuples of (delay time, callback)
298 ///
299 /// # Returns
300 /// Return TimerTask list, needs to be registered through `register_batch()`
301 ///
302 /// 创建定时器任务 (静态方法,应用阶段,有回调)
303 ///
304 /// # 参数
305 /// - `callbacks`: (延迟时间, 回调) 元组列表
306 ///
307 /// # 返回值
308 /// 返回 TimerTask 列表,需要通过 `register_batch()` 注册
309 ///
310 /// # Examples (示例)
311 /// ```no_run
312 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
313 /// use std::time::Duration;
314 /// use std::sync::Arc;
315 /// use std::sync::atomic::{AtomicU32, Ordering};
316 ///
317 /// #[tokio::main]
318 /// async fn main() {
319 /// let timer = TimerWheel::with_defaults();
320 /// let counter = Arc::new(AtomicU32::new(0));
321 ///
322 /// // Step 1: Create batch of tasks
323 /// // 创建批量任务
324 /// let delays: Vec<Duration> = (0..3)
325 /// .map(|_| Duration::from_millis(100))
326 /// .collect();
327 /// let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = delays
328 /// .into_iter()
329 /// .map(|delay| {
330 /// let counter = Arc::clone(&counter);
331 /// let callback = Some(CallbackWrapper::new(move || {
332 /// let counter = Arc::clone(&counter);
333 /// async move {
334 /// counter.fetch_add(1, Ordering::SeqCst);
335 /// }
336 /// }));
337 /// (delay, callback)
338 /// })
339 /// .collect();
340 ///
341 /// // Create batch of tasks
342 /// let tasks = TimerWheel::create_batch_with_callbacks(callbacks);
343 /// println!("Created {} tasks", tasks.len());
344 ///
345 /// // Step 2: Register batch of tasks
346 /// // 注册批量任务
347 /// let batch = timer.register_batch(tasks);
348 /// }
349 /// ```
350 #[inline]
351 pub fn create_batch_with_callbacks(callbacks: Vec<(Duration, Option<CallbackWrapper>)>) -> Vec<crate::task::TimerTask>
352 {
353 callbacks
354 .into_iter()
355 .map(|(delay, callback)| crate::task::TimerTask::new(delay, callback))
356 .collect()
357 }
358
359 /// Register timer task to timing wheel (registration phase)
360 ///
361 /// # Parameters
362 /// - `task`: Task created via `create_task()`
363 ///
364 /// # Returns
365 /// Return timer handle with completion receiver that can be used to cancel timer and receive completion notifications
366 ///
367 /// 注册定时器任务到时间轮 (注册阶段)
368 ///
369 /// # 参数
370 /// - `task`: 通过 `create_task()` 创建的任务
371 ///
372 /// # 返回值
373 /// 返回包含完成通知接收器的定时器句柄,可用于取消定时器和接收完成通知
374 ///
375 /// # Examples (示例)
376 /// ```no_run
377 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
378 ///
379 /// use std::time::Duration;
380 ///
381 /// #[tokio::main]
382 /// async fn main() {
383 /// let timer = TimerWheel::with_defaults();
384 ///
385 /// let task = TimerWheel::create_task(Duration::from_secs(1), Some(CallbackWrapper::new(|| async {
386 /// println!("Timer fired!");
387 /// })));
388 /// let task_id = task.get_id();
389 ///
390 /// // Register task
391 /// // 注册任务
392 /// let handle = timer.register(task);
393 ///
394 /// // Wait for timer completion
395 /// // 等待定时器完成
396 /// let (rx, _handle) = handle.into_parts();
397 /// rx.0.await.ok();
398 /// }
399 /// ```
400 #[inline]
401 pub fn register(&self, task: crate::task::TimerTask) -> TimerHandleWithCompletion {
402 let (completion_tx, completion_rx) = oneshot::channel();
403 let notifier = crate::task::CompletionNotifier(completion_tx);
404
405 let task_id = task.id;
406
407 // 单次加锁完成所有操作
408 {
409 let mut wheel_guard = self.wheel.lock();
410 wheel_guard.insert(task, notifier);
411 }
412
413 TimerHandleWithCompletion::new(TimerHandle::new(task_id, self.wheel.clone()), completion_rx)
414 }
415
416 /// Batch register timer tasks to timing wheel (registration phase)
417 ///
418 /// # Parameters
419 /// - `tasks`: List of tasks created via `create_batch()`
420 ///
421 /// # Returns
422 /// Return batch timer handle with completion receivers
423 ///
424 /// 批量注册定时器任务到时间轮 (注册阶段)
425 ///
426 /// # 参数
427 /// - `tasks`: 通过 `create_batch()` 创建的任务列表
428 ///
429 /// # 返回值
430 /// 返回包含完成通知接收器的批量定时器句柄
431 ///
432 /// # Examples (示例)
433 /// ```no_run
434 /// use kestrel_timer::{TimerWheel, TimerTask};
435 /// use std::time::Duration;
436 ///
437 /// #[tokio::main]
438 /// async fn main() {
439 /// let timer = TimerWheel::with_defaults();
440 ///
441 /// let delays: Vec<Duration> = (0..3)
442 /// .map(|_| Duration::from_secs(1))
443 /// .collect();
444 /// let tasks = TimerWheel::create_batch(delays);
445 ///
446 /// let batch = timer.register_batch(tasks);
447 /// println!("Registered {} timers", batch.len());
448 /// }
449 /// ```
450 #[inline]
451 pub fn register_batch(&self, tasks: Vec<crate::task::TimerTask>) -> BatchHandleWithCompletion {
452 let task_count = tasks.len();
453 let mut completion_rxs = Vec::with_capacity(task_count);
454 let mut task_ids = Vec::with_capacity(task_count);
455 let mut prepared_tasks = Vec::with_capacity(task_count);
456
457 // Step 1: Prepare all channels and notifiers
458 for task in tasks {
459 let (completion_tx, completion_rx) = oneshot::channel();
460 let notifier = crate::task::CompletionNotifier(completion_tx);
461
462 task_ids.push(task.id);
463 completion_rxs.push(completion_rx);
464 prepared_tasks.push((task, notifier));
465 }
466
467 // Step 2: Single lock, batch insert
468 {
469 let mut wheel_guard = self.wheel.lock();
470 wheel_guard.insert_batch(prepared_tasks);
471 }
472
473 BatchHandleWithCompletion::new(BatchHandle::new(task_ids, self.wheel.clone()), completion_rxs)
474 }
475
476 /// Cancel timer
477 ///
478 /// # Parameters
479 /// - `task_id`: Task ID
480 ///
481 /// # Returns
482 /// Returns true if task exists and is successfully cancelled, otherwise false
483 ///
484 /// 取消定时器
485 ///
486 /// # 参数
487 /// - `task_id`: 任务 ID
488 ///
489 /// # 返回值
490 /// 如果任务存在且成功取消则返回 true,否则返回 false
491 ///
492 /// # Examples (示例)
493 /// ```no_run
494 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
495 ///
496 /// use std::time::Duration;
497 ///
498 /// #[tokio::main]
499 /// async fn main() {
500 /// let timer = TimerWheel::with_defaults();
501 ///
502 /// let task = TimerWheel::create_task(Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
503 /// println!("Timer fired!");
504 /// })));
505 /// let task_id = task.get_id();
506 /// let _handle = timer.register(task);
507 ///
508 /// // Cancel task using task ID
509 /// // 使用任务 ID 取消任务
510 /// let cancelled = timer.cancel(task_id);
511 /// println!("Canceled successfully: {}", cancelled);
512 /// }
513 /// ```
514 #[inline]
515 pub fn cancel(&self, task_id: TaskId) -> bool {
516 let mut wheel = self.wheel.lock();
517 wheel.cancel(task_id)
518 }
519
520 /// Batch cancel timers
521 ///
522 /// # Parameters
523 /// - `task_ids`: List of task IDs to cancel
524 ///
525 /// # Returns
526 /// Number of successfully cancelled tasks
527 ///
528 /// 批量取消定时器
529 ///
530 /// # 参数
531 /// - `task_ids`: 要取消的任务 ID 列表
532 ///
533 /// # 返回值
534 /// 成功取消的任务数量
535 ///
536 /// # Performance Advantages
537 /// - Batch processing reduces lock contention
538 /// - Internally optimized batch cancellation operation
539 ///
540 /// # Examples (示例)
541 /// ```no_run
542 /// use kestrel_timer::{TimerWheel, TimerTask};
543 /// use std::time::Duration;
544 ///
545 /// #[tokio::main]
546 /// async fn main() {
547 /// let timer = TimerWheel::with_defaults();
548 ///
549 /// // Create multiple timers
550 /// // 创建多个定时器
551 /// let task1 = TimerWheel::create_task(Duration::from_secs(10), None);
552 /// let task2 = TimerWheel::create_task(Duration::from_secs(10), None);
553 /// let task3 = TimerWheel::create_task(Duration::from_secs(10), None);
554 ///
555 /// let task_ids = vec![task1.get_id(), task2.get_id(), task3.get_id()];
556 ///
557 /// let _h1 = timer.register(task1);
558 /// let _h2 = timer.register(task2);
559 /// let _h3 = timer.register(task3);
560 ///
561 /// // Batch cancel
562 /// // 批量取消
563 /// let cancelled = timer.cancel_batch(&task_ids);
564 /// println!("Canceled {} timers", cancelled);
565 /// }
566 /// ```
567 #[inline]
568 pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
569 let mut wheel = self.wheel.lock();
570 wheel.cancel_batch(task_ids)
571 }
572
573 /// Postpone timer
574 ///
575 /// # Parameters
576 /// - `task_id`: Task ID to postpone
577 /// - `new_delay`: New delay duration, recalculated from current time
578 /// - `callback`: New callback function, pass `None` to keep original callback, pass `Some` to replace with new callback
579 ///
580 /// # Returns
581 /// Returns true if task exists and is successfully postponed, otherwise false
582 ///
583 /// 推迟定时器
584 ///
585 /// # 参数
586 /// - `task_id`: 要推迟的任务 ID
587 /// - `new_delay`: 新的延迟时间,从当前时间重新计算
588 /// - `callback`: 新的回调函数,传递 `None` 保持原始回调,传递 `Some` 替换为新的回调
589 ///
590 /// # 返回值
591 /// 如果任务存在且成功推迟则返回 true,否则返回 false
592 ///
593 /// # Note
594 /// - Task ID remains unchanged after postponement
595 /// - Original completion_receiver remains valid
596 ///
597 /// # 注意
598 /// - 任务 ID 在推迟后保持不变
599 /// - 原始 completion_receiver 保持有效
600 ///
601 /// # Examples (示例)
602 ///
603 /// ## Keep original callback (保持原始回调)
604 /// ```no_run
605 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
606 /// use std::time::Duration;
607 ///
608 ///
609 /// #[tokio::main]
610 /// async fn main() {
611 /// let timer = TimerWheel::with_defaults();
612 ///
613 /// let task = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
614 /// println!("Timer fired!");
615 /// })));
616 /// let task_id = task.get_id();
617 /// let _handle = timer.register(task);
618 ///
619 /// // Postpone to 10 seconds after triggering, and keep original callback
620 /// // 推迟到 10 秒后触发,并保持原始回调
621 /// let success = timer.postpone(task_id, Duration::from_secs(10), None);
622 /// println!("Postponed successfully: {}", success);
623 /// }
624 /// ```
625 ///
626 /// ## Replace with new callback (替换为新的回调)
627 /// ```no_run
628 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
629 /// use std::time::Duration;
630 ///
631 /// #[tokio::main]
632 /// async fn main() {
633 /// let timer = TimerWheel::with_defaults();
634 ///
635 /// let task = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
636 /// println!("Original callback!");
637 /// })));
638 /// let task_id = task.get_id();
639 /// let _handle = timer.register(task);
640 ///
641 /// // Postpone to 10 seconds after triggering, and replace with new callback
642 /// // 推迟到 10 秒后触发,并替换为新的回调
643 /// let success = timer.postpone(task_id, Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
644 /// println!("New callback!");
645 /// })));
646 /// println!("Postponed successfully: {}", success);
647 /// }
648 /// ```
649 #[inline]
650 pub fn postpone(
651 &self,
652 task_id: TaskId,
653 new_delay: Duration,
654 callback: Option<CallbackWrapper>,
655 ) -> bool {
656 let mut wheel = self.wheel.lock();
657 wheel.postpone(task_id, new_delay, callback)
658 }
659
660 /// Batch postpone timers (keep original callbacks)
661 ///
662 /// # Parameters
663 /// - `updates`: List of tuples of (task ID, new delay)
664 ///
665 /// # Returns
666 /// Number of successfully postponed tasks
667 ///
668 /// 批量推迟定时器 (保持原始回调)
669 ///
670 /// # 参数
671 /// - `updates`: (任务 ID, 新延迟) 元组列表
672 ///
673 /// # 返回值
674 /// 成功推迟的任务数量
675 ///
676 /// # Note
677 /// - This method keeps all tasks' original callbacks unchanged
678 /// - Use `postpone_batch_with_callbacks` if you need to replace callbacks
679 ///
680 /// # 注意
681 /// - 此方法保持所有任务的原始回调不变
682 /// - 如果需要替换回调,请使用 `postpone_batch_with_callbacks`
683 ///
684 /// # Performance Advantages
685 /// - Batch processing reduces lock contention
686 /// - Internally optimized batch postponement operation
687 ///
688 /// # Examples (示例)
689 /// ```no_run
690 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
691 /// use std::time::Duration;
692 ///
693 /// #[tokio::main]
694 /// async fn main() {
695 /// let timer = TimerWheel::with_defaults();
696 ///
697 /// // Create multiple tasks with callbacks
698 /// // 创建多个带有回调的任务
699 /// let task1 = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
700 /// println!("Task 1 fired!");
701 /// })));
702 /// let task2 = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
703 /// println!("Task 2 fired!");
704 /// })));
705 /// let task3 = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
706 /// println!("Task 3 fired!");
707 /// })));
708 ///
709 /// let task_ids = vec![
710 /// (task1.get_id(), Duration::from_secs(10)),
711 /// (task2.get_id(), Duration::from_secs(15)),
712 /// (task3.get_id(), Duration::from_secs(20)),
713 /// ];
714 ///
715 /// timer.register(task1);
716 /// timer.register(task2);
717 /// timer.register(task3);
718 ///
719 /// // Batch postpone (keep original callbacks)
720 /// // 批量推迟 (保持原始回调)
721 /// let postponed = timer.postpone_batch(task_ids);
722 /// println!("Postponed {} timers", postponed);
723 /// }
724 /// ```
725 #[inline]
726 pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize {
727 let mut wheel = self.wheel.lock();
728 wheel.postpone_batch(updates)
729 }
730
731 /// Batch postpone timers (replace callbacks)
732 ///
733 /// # Parameters
734 /// - `updates`: List of tuples of (task ID, new delay, new callback)
735 ///
736 /// # Returns
737 /// Number of successfully postponed tasks
738 ///
739 /// 批量推迟定时器 (替换回调)
740 ///
741 /// # 参数
742 /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
743 ///
744 /// # 返回值
745 /// 成功推迟的任务数量
746 ///
747 /// # Performance Advantages
748 /// - Batch processing reduces lock contention
749 /// - Internally optimized batch postponement operation
750 ///
751 /// # Examples (示例)
752 /// ```no_run
753 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
754 /// use std::time::Duration;
755 /// use std::sync::Arc;
756 /// use std::sync::atomic::{AtomicU32, Ordering};
757 ///
758 /// #[tokio::main]
759 /// async fn main() {
760 /// let timer = TimerWheel::with_defaults();
761 /// let counter = Arc::new(AtomicU32::new(0));
762 ///
763 /// // Create multiple timers
764 /// // 创建多个定时器
765 /// let task1 = TimerWheel::create_task(Duration::from_secs(5), None);
766 /// let task2 = TimerWheel::create_task(Duration::from_secs(5), None);
767 ///
768 /// let id1 = task1.get_id();
769 /// let id2 = task2.get_id();
770 ///
771 /// timer.register(task1);
772 /// timer.register(task2);
773 ///
774 /// // Batch postpone and replace callbacks
775 /// // 批量推迟并替换回调
776 /// let updates: Vec<_> = vec![id1, id2]
777 /// .into_iter()
778 /// .map(|id| {
779 /// let counter = Arc::clone(&counter);
780 /// (id, Duration::from_secs(10), Some(CallbackWrapper::new(move || {
781 /// let counter = Arc::clone(&counter);
782 /// async move { counter.fetch_add(1, Ordering::SeqCst); }
783 /// })))
784 /// })
785 /// .collect();
786 /// let postponed = timer.postpone_batch_with_callbacks(updates);
787 /// println!("Postponed {} timers", postponed);
788 /// }
789 /// ```
790 #[inline]
791 pub fn postpone_batch_with_callbacks(
792 &self,
793 updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
794 ) -> usize {
795 let mut wheel = self.wheel.lock();
796 wheel.postpone_batch_with_callbacks(updates.to_vec())
797 }
798
799 /// Core tick loop
800 async fn tick_loop(wheel: Arc<Mutex<Wheel>>, tick_duration: Duration) {
801 let mut interval = tokio::time::interval(tick_duration);
802 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
803
804 loop {
805 interval.tick().await;
806
807 // Advance timing wheel and get expired tasks
808 let expired_tasks = {
809 let mut wheel_guard = wheel.lock();
810 wheel_guard.advance()
811 };
812
813 // Execute expired tasks
814 for task in expired_tasks {
815 let callback = task.get_callback();
816
817 // Move task ownership to get completion_notifier
818 let notifier = task.completion_notifier;
819
820 // Only registered tasks have notifier
821 if let Some(notifier) = notifier {
822 // Execute callback in a separate tokio task, and send notification after callback completion
823 if let Some(callback) = callback {
824 tokio::spawn(async move {
825 // Execute callback
826 let future = callback.call();
827 future.await;
828
829 // Send notification after callback completion
830 let _ = notifier.0.send(TaskCompletionReason::Expired);
831 });
832 } else {
833 // Send notification immediately if no callback
834 let _ = notifier.0.send(TaskCompletionReason::Expired);
835 }
836 }
837 }
838 }
839 }
840
841 /// Graceful shutdown of TimerWheel
842 ///
843 /// 优雅关闭 TimerWheel
844 ///
845 /// # Examples (示例)
846 /// ```no_run
847 /// # use kestrel_timer::TimerWheel;
848 /// # #[tokio::main]
849 /// # async fn main() {
850 /// let timer = TimerWheel::with_defaults();
851 ///
852 /// // Use timer... (使用定时器...)
853 ///
854 /// timer.shutdown().await;
855 /// # }
856 /// ```
857 pub async fn shutdown(mut self) {
858 if let Some(handle) = self.tick_handle.take() {
859 handle.abort();
860 let _ = handle.await;
861 }
862 }
863}
864
865impl Drop for TimerWheel {
866 fn drop(&mut self) {
867 if let Some(handle) = self.tick_handle.take() {
868 handle.abort();
869 }
870 }
871}
872
873#[cfg(test)]
874mod tests {
875 use super::*;
876 use std::sync::atomic::{AtomicU32, Ordering};
877
878 #[tokio::test]
879 async fn test_timer_creation() {
880 let _timer = TimerWheel::with_defaults();
881 }
882
883 #[tokio::test]
884 async fn test_schedule_once() {
885 use std::sync::Arc;
886 let timer = TimerWheel::with_defaults();
887 let counter = Arc::new(AtomicU32::new(0));
888 let counter_clone = Arc::clone(&counter);
889
890 let task = TimerWheel::create_task(
891 Duration::from_millis(50),
892 Some(CallbackWrapper::new(move || {
893 let counter = Arc::clone(&counter_clone);
894 async move {
895 counter.fetch_add(1, Ordering::SeqCst);
896 }
897 })),
898 );
899 let _handle = timer.register(task);
900
901 // Wait for timer to trigger
902 // 等待定时器触发
903 tokio::time::sleep(Duration::from_millis(100)).await;
904 assert_eq!(counter.load(Ordering::SeqCst), 1);
905 }
906
907 #[tokio::test]
908 async fn test_cancel_timer() {
909 use std::sync::Arc;
910 let timer = TimerWheel::with_defaults();
911 let counter = Arc::new(AtomicU32::new(0));
912 let counter_clone = Arc::clone(&counter);
913
914 let task = TimerWheel::create_task(
915 Duration::from_millis(100),
916 Some(CallbackWrapper::new(move || {
917 let counter = Arc::clone(&counter_clone);
918 async move {
919 counter.fetch_add(1, Ordering::SeqCst);
920 }
921 })),
922 );
923 let handle = timer.register(task);
924
925 // Immediately cancel
926 // 立即取消
927 let cancel_result = handle.cancel();
928 assert!(cancel_result);
929
930 // Wait for enough time to ensure timer does not trigger
931 // 等待足够时间确保定时器不触发
932 tokio::time::sleep(Duration::from_millis(200)).await;
933 assert_eq!(counter.load(Ordering::SeqCst), 0);
934 }
935
936 #[tokio::test]
937 async fn test_cancel_immediate() {
938 use std::sync::Arc;
939 let timer = TimerWheel::with_defaults();
940 let counter = Arc::new(AtomicU32::new(0));
941 let counter_clone = Arc::clone(&counter);
942
943 let task = TimerWheel::create_task(
944 Duration::from_millis(100),
945 Some(CallbackWrapper::new(move || {
946 let counter = Arc::clone(&counter_clone);
947 async move {
948 counter.fetch_add(1, Ordering::SeqCst);
949 }
950 })),
951 );
952 let handle = timer.register(task);
953
954 // Immediately cancel
955 // 立即取消
956 let cancel_result = handle.cancel();
957 assert!(cancel_result);
958
959 // Wait for enough time to ensure timer does not trigger
960 // 等待足够时间确保定时器不触发
961 tokio::time::sleep(Duration::from_millis(200)).await;
962 assert_eq!(counter.load(Ordering::SeqCst), 0);
963 }
964
965 #[tokio::test]
966 async fn test_postpone_timer() {
967 use std::sync::Arc;
968 let timer = TimerWheel::with_defaults();
969 let counter = Arc::new(AtomicU32::new(0));
970 let counter_clone = Arc::clone(&counter);
971
972 let task = TimerWheel::create_task(
973 Duration::from_millis(50),
974 Some(CallbackWrapper::new(move || {
975 let counter = Arc::clone(&counter_clone);
976 async move {
977 counter.fetch_add(1, Ordering::SeqCst);
978 }
979 })),
980 );
981 let task_id = task.get_id();
982 let handle = timer.register(task);
983
984 // Postpone task to 150ms
985 // 推迟任务到 150ms
986 let postponed = timer.postpone(task_id, Duration::from_millis(150), None);
987 assert!(postponed);
988
989 // Wait for original time 50ms, task should not trigger
990 // 等待原始时间 50ms,任务不应触发
991 tokio::time::sleep(Duration::from_millis(70)).await;
992 assert_eq!(counter.load(Ordering::SeqCst), 0);
993
994 // Wait for new trigger time (from postponed start, need to wait about 150ms)
995 // 等待新的触发时间(从推迟开始算起,大约需要等待 150ms)
996 let (rx, _handle) = handle.into_parts();
997 let result = tokio::time::timeout(
998 Duration::from_millis(200),
999 rx.0
1000 ).await;
1001 assert!(result.is_ok());
1002
1003 // Wait for callback to execute
1004 // 等待回调执行
1005 tokio::time::sleep(Duration::from_millis(20)).await;
1006 assert_eq!(counter.load(Ordering::SeqCst), 1);
1007 }
1008
1009 #[tokio::test]
1010 async fn test_postpone_with_callback() {
1011 use std::sync::Arc;
1012 let timer = TimerWheel::with_defaults();
1013 let counter = Arc::new(AtomicU32::new(0));
1014 let counter_clone1 = Arc::clone(&counter);
1015 let counter_clone2 = Arc::clone(&counter);
1016
1017 // Create task, original callback adds 1
1018 let task = TimerWheel::create_task(
1019 Duration::from_millis(50),
1020 Some(CallbackWrapper::new(move || {
1021 let counter = Arc::clone(&counter_clone1);
1022 async move {
1023 counter.fetch_add(1, Ordering::SeqCst);
1024 }
1025 })),
1026 );
1027 let task_id = task.get_id();
1028 let handle = timer.register(task);
1029
1030 // Postpone task and replace callback, new callback adds 10
1031 // 推迟任务并替换回调,新回调增加 10
1032 let postponed = timer.postpone(
1033 task_id,
1034 Duration::from_millis(100),
1035 Some(CallbackWrapper::new(move || {
1036 let counter = Arc::clone(&counter_clone2);
1037 async move {
1038 counter.fetch_add(10, Ordering::SeqCst);
1039 }
1040 })),
1041 );
1042 assert!(postponed);
1043
1044 // Wait for task to trigger (after postponed, need to wait 100ms, plus margin)
1045 // 等待任务触发(推迟后,需要等待 100ms,加上余量)
1046 let (rx, _handle) = handle.into_parts();
1047 let result = tokio::time::timeout(
1048 Duration::from_millis(200),
1049 rx.0
1050 ).await;
1051 assert!(result.is_ok());
1052
1053 // Wait for callback to execute
1054 // 等待回调执行
1055 tokio::time::sleep(Duration::from_millis(20)).await;
1056
1057 // Verify new callback is executed (increased 10 instead of 1)
1058 // 验证新回调已执行(增加 10 而不是 1)
1059 assert_eq!(counter.load(Ordering::SeqCst), 10);
1060 }
1061
1062 #[tokio::test]
1063 async fn test_postpone_nonexistent_timer() {
1064 let timer = TimerWheel::with_defaults();
1065
1066 // Try to postpone nonexistent task
1067 // 尝试推迟一个不存在的任务
1068 let fake_task = TimerWheel::create_task(Duration::from_millis(50), None);
1069 let fake_task_id = fake_task.get_id();
1070 // Do not register this task
1071 // 不注册这个任务
1072 let postponed = timer.postpone(fake_task_id, Duration::from_millis(100), None);
1073 assert!(!postponed);
1074 }
1075
1076 #[tokio::test]
1077 async fn test_postpone_batch() {
1078 use std::sync::Arc;
1079 let timer = TimerWheel::with_defaults();
1080 let counter = Arc::new(AtomicU32::new(0));
1081
1082 // Create 3 tasks
1083 // 创建 3 个任务
1084 let mut task_ids = Vec::new();
1085 for _ in 0..3 {
1086 let counter_clone = Arc::clone(&counter);
1087 let task = TimerWheel::create_task(
1088 Duration::from_millis(50),
1089 Some(CallbackWrapper::new(move || {
1090 let counter = Arc::clone(&counter_clone);
1091 async move {
1092 counter.fetch_add(1, Ordering::SeqCst);
1093 }
1094 })),
1095 );
1096 task_ids.push((task.get_id(), Duration::from_millis(150)));
1097 timer.register(task);
1098 }
1099
1100 // Batch postpone
1101 // 批量推迟
1102 let postponed = timer.postpone_batch(task_ids);
1103 assert_eq!(postponed, 3);
1104
1105 // Wait for original time 50ms, task should not trigger
1106 // 等待原始时间 50ms,任务不应触发
1107 tokio::time::sleep(Duration::from_millis(70)).await;
1108 assert_eq!(counter.load(Ordering::SeqCst), 0);
1109
1110 // Wait for new trigger time (from postponed start, need to wait about 150ms)
1111 // 等待新的触发时间(从推迟开始算起,大约需要等待 150ms)
1112 tokio::time::sleep(Duration::from_millis(200)).await;
1113
1114 // Wait for callback to execute
1115 // 等待回调执行
1116 tokio::time::sleep(Duration::from_millis(20)).await;
1117 assert_eq!(counter.load(Ordering::SeqCst), 3);
1118 }
1119
1120 #[tokio::test]
1121 async fn test_postpone_batch_with_callbacks() {
1122 use std::sync::Arc;
1123 let timer = TimerWheel::with_defaults();
1124 let counter = Arc::new(AtomicU32::new(0));
1125
1126 // Create 3 tasks
1127 // 创建 3 个任务
1128 let mut task_ids = Vec::new();
1129 for _ in 0..3 {
1130 let task = TimerWheel::create_task(
1131 Duration::from_millis(50),
1132 None
1133 );
1134 task_ids.push(task.get_id());
1135 timer.register(task);
1136 }
1137
1138 // Batch postpone and replace callbacks
1139 // 批量推迟并替换回调
1140 let updates: Vec<_> = task_ids
1141 .into_iter()
1142 .map(|id| {
1143 let counter_clone = Arc::clone(&counter);
1144 (id, Duration::from_millis(150), Some(CallbackWrapper::new(move || {
1145 let counter = Arc::clone(&counter_clone);
1146 async move {
1147 counter.fetch_add(1, Ordering::SeqCst);
1148 }
1149 })))
1150 })
1151 .collect();
1152
1153 // Batch postpone and replace callbacks
1154 // 批量推迟并替换回调
1155 let postponed = timer.postpone_batch_with_callbacks(updates);
1156 assert_eq!(postponed, 3);
1157
1158 // Wait for original time 50ms, task should not trigger
1159 // 等待原始时间 50ms,任务不应触发
1160 tokio::time::sleep(Duration::from_millis(70)).await;
1161 assert_eq!(counter.load(Ordering::SeqCst), 0);
1162
1163 // Wait for new trigger time (from postponed start, need to wait about 150ms)
1164 // 等待新的触发时间(从推迟开始算起,大约需要等待 150ms)
1165 tokio::time::sleep(Duration::from_millis(200)).await;
1166
1167 // Wait for callback to execute
1168 // 等待回调执行
1169 tokio::time::sleep(Duration::from_millis(20)).await;
1170 assert_eq!(counter.load(Ordering::SeqCst), 3);
1171 }
1172
1173 #[tokio::test]
1174 async fn test_postpone_keeps_completion_receiver_valid() {
1175 use std::sync::Arc;
1176 let timer = TimerWheel::with_defaults();
1177 let counter = Arc::new(AtomicU32::new(0));
1178 let counter_clone = Arc::clone(&counter);
1179
1180 let task = TimerWheel::create_task(
1181 Duration::from_millis(50),
1182 Some(CallbackWrapper::new(move || {
1183 let counter = Arc::clone(&counter_clone);
1184 async move {
1185 counter.fetch_add(1, Ordering::SeqCst);
1186 }
1187 })),
1188 );
1189 let task_id = task.get_id();
1190 let handle = timer.register(task);
1191
1192 // Postpone task
1193 // 推迟任务
1194 timer.postpone(task_id, Duration::from_millis(100), None);
1195
1196 // Verify original completion_receiver is still valid (after postponed, need to wait 100ms, plus margin)
1197 // 验证原始完成接收器是否仍然有效(推迟后,需要等待 100ms,加上余量)
1198 let (rx, _handle) = handle.into_parts();
1199 let result = tokio::time::timeout(
1200 Duration::from_millis(200),
1201 rx.0
1202 ).await;
1203 assert!(result.is_ok(), "Completion receiver should still work after postpone");
1204
1205 // Wait for callback to execute
1206 tokio::time::sleep(Duration::from_millis(20)).await;
1207 assert_eq!(counter.load(Ordering::SeqCst), 1);
1208 }
1209}
1210