kestrel_timer/timer.rs
1pub mod handle;
2
3use crate::config::{BatchConfig, ServiceConfig, WheelConfig};
4use crate::task::{CallbackWrapper, TaskHandle, TaskId};
5use crate::wheel::Wheel;
6use parking_lot::Mutex;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::task::JoinHandle;
10use handle::{TimerHandle, TimerHandleWithCompletion, BatchHandle, BatchHandleWithCompletion};
11
12/// Timing Wheel Timer Manager
13///
14/// 时间轮定时器管理器
15pub struct TimerWheel {
16 /// Timing wheel instance, wrapped in Arc<Mutex> for multi-threaded access
17 ///
18 /// 时间轮实例,包装在 Arc<Mutex> 中以支持多线程访问
19 wheel: Arc<Mutex<Wheel>>,
20
21 /// Background tick loop task handle
22 ///
23 /// 后台 tick 循环任务句柄
24 tick_handle: Option<JoinHandle<()>>,
25}
26
27impl TimerWheel {
28 /// Create a new timer manager
29 ///
30 /// # Parameters
31 /// - `config`: Timing wheel configuration, already validated
32 /// - `batch_config`: Batch operation configuration
33 ///
34 /// 创建新的定时器管理器
35 ///
36 /// # 参数
37 /// - `config`: 时间轮配置,已验证
38 /// - `batch_config`: 批量操作配置
39 ///
40 /// # Examples (示例)
41 /// ```no_run
42 /// use kestrel_timer::{TimerWheel, config::WheelConfig, TimerTask, config::BatchConfig};
43 /// use std::time::Duration;
44 ///
45 /// #[tokio::main]
46 /// async fn main() {
47 /// let config = WheelConfig::builder()
48 /// .l0_tick_duration(Duration::from_millis(10))
49 /// .l0_slot_count(512)
50 /// .l1_tick_duration(Duration::from_secs(1))
51 /// .l1_slot_count(64)
52 /// .build()
53 /// .unwrap();
54 /// let timer = TimerWheel::new(config, BatchConfig::default());
55 ///
56 /// // Use two-step API: allocate handle first, then register
57 /// // 使用两步 API:先分配 handle,再注册
58 /// let handle = timer.allocate_handle();
59 /// let task = TimerTask::new_oneshot(Duration::from_secs(1), None);
60 /// let _timer_handle = timer.register(handle, task);
61 /// }
62 /// ```
63 pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
64 let tick_duration = config.l0_tick_duration;
65 let wheel = Wheel::new(config, batch_config);
66 let wheel = Arc::new(Mutex::new(wheel));
67 let wheel_clone = wheel.clone();
68
69 // Start background tick loop
70 // 启动后台 tick 循环
71 let tick_handle = tokio::spawn(async move {
72 Self::tick_loop(wheel_clone, tick_duration).await;
73 });
74
75 Self {
76 wheel,
77 tick_handle: Some(tick_handle),
78 }
79 }
80
81 /// Create timer manager with default configuration, hierarchical mode
82 /// - L0 layer tick duration: 10ms, slot count: 512
83 /// - L1 layer tick duration: 1s, slot count: 64
84 ///
85 /// # Returns
86 /// Timer manager instance
87 ///
88 /// 使用默认配置创建定时器管理器,分层模式
89 /// - L0 层 tick 持续时间:10ms,槽数量:512
90 /// - L1 层 tick 持续时间:1s,槽数量:64
91 ///
92 /// # 返回值
93 /// 定时器管理器实例
94 ///
95 /// # Examples (示例)
96 /// ```no_run
97 /// use kestrel_timer::TimerWheel;
98 ///
99 /// #[tokio::main]
100 /// async fn main() {
101 /// let timer = TimerWheel::with_defaults();
102 /// }
103 /// ```
104 pub fn with_defaults() -> Self {
105 Self::new(WheelConfig::default(), BatchConfig::default())
106 }
107
108 /// Create TimerService bound to this timing wheel with default configuration
109 ///
110 /// # Parameters
111 /// - `service_config`: Service configuration
112 ///
113 /// # Returns
114 /// TimerService instance bound to this timing wheel
115 ///
116 /// 创建绑定到此时间轮的 TimerService,使用默认配置
117 ///
118 /// # 参数
119 /// - `service_config`: 服务配置
120 ///
121 /// # 返回值
122 /// 绑定到此时间轮的 TimerService 实例
123 ///
124 /// # Examples (示例)
125 /// ```no_run
126 /// use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
127 /// use std::time::Duration;
128 ///
129 ///
130 /// #[tokio::main]
131 /// async fn main() {
132 /// let timer = TimerWheel::with_defaults();
133 /// let mut service = timer.create_service(ServiceConfig::default());
134 ///
135 /// // Use two-step API to batch schedule timers through service
136 /// // 使用两步 API 通过服务批量调度定时器
137 /// // Step 1: Allocate handles
138 /// let handles = service.allocate_handles(5);
139 ///
140 /// // Step 2: Create tasks
141 /// let tasks: Vec<_> = (0..5)
142 /// .map(|_| {
143 /// use kestrel_timer::TimerTask;
144 /// TimerTask::new_oneshot(Duration::from_millis(100), Some(CallbackWrapper::new(|| async {})))
145 /// })
146 /// .collect();
147 ///
148 /// // Step 3: Register batch
149 /// service.register_batch(handles, tasks).unwrap();
150 ///
151 /// // Receive timeout notifications
152 /// // 接收超时通知
153 /// let mut rx = service.take_receiver().unwrap();
154 /// while let Some(task_id) = rx.recv().await {
155 /// println!("Task {:?} completed", task_id);
156 /// }
157 /// }
158 /// ```
159 pub fn create_service(&self, service_config: ServiceConfig) -> crate::service::TimerService {
160 crate::service::TimerService::new(self.wheel.clone(), service_config)
161 }
162
163 /// Create TimerService bound to this timing wheel with custom configuration
164 ///
165 /// # Parameters
166 /// - `config`: Service configuration
167 ///
168 /// # Returns
169 /// TimerService instance bound to this timing wheel
170 ///
171 /// 创建绑定到此时间轮的 TimerService,使用自定义配置
172 ///
173 /// # 参数
174 /// - `config`: 服务配置
175 ///
176 /// # 返回值
177 /// 绑定到此时间轮的 TimerService 实例
178 ///
179 /// # Examples (示例)
180 /// ```no_run
181 /// use kestrel_timer::{TimerWheel, config::ServiceConfig, TimerTask};
182 /// use std::num::NonZeroUsize;
183 ///
184 /// #[tokio::main]
185 /// async fn main() {
186 /// let timer = TimerWheel::with_defaults();
187 /// let config = ServiceConfig::builder()
188 /// .command_channel_capacity(NonZeroUsize::new(1024).unwrap())
189 /// .timeout_channel_capacity(NonZeroUsize::new(2000).unwrap())
190 /// .build();
191 /// let service = timer.create_service_with_config(config);
192 /// }
193 /// ```
194 pub fn create_service_with_config(&self, config: ServiceConfig) -> crate::service::TimerService {
195 crate::service::TimerService::new(self.wheel.clone(), config)
196 }
197
198
199 /// Allocate a handle from DeferredMap
200 ///
201 /// # Returns
202 /// A unique handle for later insertion
203 ///
204 /// # 返回值
205 /// 用于后续插入的唯一 handle
206 pub fn allocate_handle(&self) -> TaskHandle {
207 self.wheel.lock().allocate_handle()
208 }
209
210 /// Batch allocate handles from DeferredMap
211 ///
212 /// # Parameters
213 /// - `count`: Number of handles to allocate
214 ///
215 /// # Returns
216 /// Vector of unique handles for later batch insertion
217 ///
218 /// # 参数
219 /// - `count`: 要分配的 handle 数量
220 ///
221 /// # 返回值
222 /// 用于后续批量插入的唯一 handles 向量
223 pub fn allocate_handles(&self, count: usize) -> Vec<TaskHandle> {
224 self.wheel.lock().allocate_handles(count)
225 }
226
227 /// Register timer task to timing wheel (registration phase)
228 ///
229 /// # Parameters
230 /// - `task`: Task created via `create_task()`
231 ///
232 /// # Returns
233 /// Return timer handle with completion receiver that can be used to cancel timer and receive completion notifications
234 ///
235 /// 注册定时器任务到时间轮 (注册阶段)
236 ///
237 /// # 参数
238 /// - `task`: 通过 `create_task()` 创建的任务
239 ///
240 /// # 返回值
241 /// 返回包含完成通知接收器的定时器句柄,可用于取消定时器和接收完成通知
242 ///
243 /// # Examples (示例)
244 /// ```no_run
245 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
246 ///
247 /// use std::time::Duration;
248 ///
249 /// #[tokio::main]
250 /// async fn main() {
251 /// let timer = TimerWheel::with_defaults();
252 ///
253 /// // Step 1: Allocate handle
254 /// let allocated_handle = timer.allocate_handle();
255 /// let task_id = allocated_handle.task_id();
256 ///
257 /// // Step 2: Create task
258 /// let task = TimerTask::new_oneshot(Duration::from_secs(1), Some(CallbackWrapper::new(|| async {
259 /// println!("Timer fired!");
260 /// })));
261 ///
262 /// // Step 3: Register task
263 /// let handle = timer.register(allocated_handle, task);
264 ///
265 /// // Wait for timer completion
266 /// // 等待定时器完成
267 /// use kestrel_timer::CompletionReceiver;
268 /// let (rx, _handle) = handle.into_parts();
269 /// match rx {
270 /// CompletionReceiver::OneShot(receiver) => {
271 /// receiver.wait().await;
272 /// },
273 /// _ => {}
274 /// }
275 /// }
276 /// ```
277 #[inline]
278 pub fn register(&self, handle: TaskHandle, task: crate::task::TimerTask) -> TimerHandleWithCompletion {
279 let task_id = handle.task_id();
280
281 let (task, completion_rx) = crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
282
283 // Single lock to complete all operations
284 // 单次加锁完成所有操作
285 let mut wheel_guard = self.wheel.lock();
286 wheel_guard.insert(handle, task);
287
288 TimerHandleWithCompletion::new(TimerHandle::new(task_id, self.wheel.clone()), completion_rx)
289 }
290
291 /// Batch register timer tasks to timing wheel (registration phase)
292 ///
293 /// # Parameters
294 /// - `handles`: Pre-allocated handles for tasks
295 /// - `tasks`: List of timer tasks
296 ///
297 /// # Returns
298 /// - `Ok(BatchHandleWithCompletion)` if all tasks are successfully registered
299 /// - `Err(TimerError::BatchLengthMismatch)` if handles and tasks lengths don't match
300 ///
301 /// 批量注册定时器任务到时间轮 (注册阶段)
302 ///
303 /// # 参数
304 /// - `handles`: 任务的预分配 handles
305 /// - `tasks`: 定时器任务列表
306 ///
307 /// # 返回值
308 /// - `Ok(BatchHandleWithCompletion)` 如果所有任务成功注册
309 /// - `Err(TimerError::BatchLengthMismatch)` 如果 handles 和 tasks 长度不匹配
310 ///
311 /// # Examples (示例)
312 /// ```no_run
313 /// use kestrel_timer::{TimerWheel, TimerTask};
314 /// use std::time::Duration;
315 ///
316 /// #[tokio::main]
317 /// async fn main() {
318 /// let timer = TimerWheel::with_defaults();
319 ///
320 /// // Step 1: Allocate handles
321 /// let handles = timer.allocate_handles(3);
322 ///
323 /// // Step 2: Create tasks
324 /// let tasks: Vec<_> = (0..3)
325 /// .map(|_| TimerTask::new_oneshot(Duration::from_secs(1), None))
326 /// .collect();
327 ///
328 /// // Step 3: Batch register
329 /// let batch = timer.register_batch(handles, tasks)
330 /// .expect("register_batch should succeed");
331 /// println!("Registered {} timers", batch.len());
332 /// }
333 /// ```
334 #[inline]
335 pub fn register_batch(
336 &self,
337 handles: Vec<TaskHandle>,
338 tasks: Vec<crate::task::TimerTask>
339 ) -> Result<BatchHandleWithCompletion, crate::error::TimerError> {
340 // Validate lengths match
341 if handles.len() != tasks.len() {
342 return Err(crate::error::TimerError::BatchLengthMismatch {
343 handles_len: handles.len(),
344 tasks_len: tasks.len(),
345 });
346 }
347
348 let task_count = tasks.len();
349 let mut completion_rxs = Vec::with_capacity(task_count);
350 let mut task_ids = Vec::with_capacity(task_count);
351 let mut prepared_handles = Vec::with_capacity(task_count);
352 let mut prepared_tasks = Vec::with_capacity(task_count);
353
354 // Step 1: Prepare all channels and notifiers
355 for (handle, task) in handles.into_iter().zip(tasks.into_iter()) {
356 let task_id = handle.task_id();
357 let (task, completion_rx) = crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
358 task_ids.push(task_id);
359 completion_rxs.push(completion_rx);
360 prepared_handles.push(handle);
361 prepared_tasks.push(task);
362 }
363
364 // Step 2: Single lock, batch insert
365 {
366 let mut wheel_guard = self.wheel.lock();
367 wheel_guard.insert_batch(prepared_handles, prepared_tasks)?;
368 }
369
370 Ok(BatchHandleWithCompletion::new(BatchHandle::new(task_ids, self.wheel.clone()), completion_rxs))
371 }
372
373 /// Cancel timer
374 ///
375 /// # Parameters
376 /// - `task_id`: Task ID
377 ///
378 /// # Returns
379 /// Returns true if task exists and is successfully cancelled, otherwise false
380 ///
381 /// 取消定时器
382 ///
383 /// # 参数
384 /// - `task_id`: 任务 ID
385 ///
386 /// # 返回值
387 /// 如果任务存在且成功取消则返回 true,否则返回 false
388 ///
389 /// # Examples (示例)
390 /// ```no_run
391 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
392 ///
393 /// use std::time::Duration;
394 ///
395 /// #[tokio::main]
396 /// async fn main() {
397 /// let timer = TimerWheel::with_defaults();
398 ///
399 /// // Step 1: Allocate handle
400 /// let allocated_handle = timer.allocate_handle();
401 /// let task_id = allocated_handle.task_id();
402 ///
403 /// // Step 2: Create and register task
404 /// let task = TimerTask::new_oneshot(Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
405 /// println!("Timer fired!");
406 /// })));
407 /// let _handle = timer.register(allocated_handle, task);
408 ///
409 /// // Cancel task using task ID
410 /// // 使用任务 ID 取消任务
411 /// let cancelled = timer.cancel(task_id);
412 /// println!("Canceled successfully: {}", cancelled);
413 /// }
414 /// ```
415 #[inline]
416 pub fn cancel(&self, task_id: TaskId) -> bool {
417 let mut wheel = self.wheel.lock();
418 wheel.cancel(task_id)
419 }
420
421 /// Batch cancel timers
422 ///
423 /// # Parameters
424 /// - `task_ids`: List of task IDs to cancel
425 ///
426 /// # Returns
427 /// Number of successfully cancelled tasks
428 ///
429 /// 批量取消定时器
430 ///
431 /// # 参数
432 /// - `task_ids`: 要取消的任务 ID 列表
433 ///
434 /// # 返回值
435 /// 成功取消的任务数量
436 ///
437 /// # Performance Advantages
438 /// - Batch processing reduces lock contention
439 /// - Internally optimized batch cancellation operation
440 ///
441 /// # Examples (示例)
442 /// ```no_run
443 /// use kestrel_timer::{TimerWheel, TimerTask};
444 /// use std::time::Duration;
445 ///
446 /// #[tokio::main]
447 /// async fn main() {
448 /// let timer = TimerWheel::with_defaults();
449 ///
450 /// // Create multiple timers
451 /// // 创建多个定时器
452 /// let task1 = TimerTask::new_oneshot(Duration::from_secs(10), None);
453 /// let task2 = TimerTask::new_oneshot(Duration::from_secs(10), None);
454 /// let task3 = TimerTask::new_oneshot(Duration::from_secs(10), None);
455 ///
456 /// // Allocate handles and get task IDs
457 /// let h1 = timer.allocate_handle();
458 /// let h2 = timer.allocate_handle();
459 /// let h3 = timer.allocate_handle();
460 /// let task_ids = vec![h1.task_id(), h2.task_id(), h3.task_id()];
461 ///
462 /// let _h1 = timer.register(h1, task1);
463 /// let _h2 = timer.register(h2, task2);
464 /// let _h3 = timer.register(h3, task3);
465 ///
466 /// // Batch cancel
467 /// // 批量取消
468 /// let cancelled = timer.cancel_batch(&task_ids);
469 /// println!("Canceled {} timers", cancelled);
470 /// }
471 /// ```
472 #[inline]
473 pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
474 let mut wheel = self.wheel.lock();
475 wheel.cancel_batch(task_ids)
476 }
477
478 /// Postpone timer
479 ///
480 /// # Parameters
481 /// - `task_id`: Task ID to postpone
482 /// - `new_delay`: New delay duration, recalculated from current time
483 /// - `callback`: New callback function, pass `None` to keep original callback, pass `Some` to replace with new callback
484 ///
485 /// # Returns
486 /// Returns true if task exists and is successfully postponed, otherwise false
487 ///
488 /// 推迟定时器
489 ///
490 /// # 参数
491 /// - `task_id`: 要推迟的任务 ID
492 /// - `new_delay`: 新的延迟时间,从当前时间重新计算
493 /// - `callback`: 新的回调函数,传递 `None` 保持原始回调,传递 `Some` 替换为新的回调
494 ///
495 /// # 返回值
496 /// 如果任务存在且成功推迟则返回 true,否则返回 false
497 ///
498 /// # Note
499 /// - Task ID remains unchanged after postponement
500 /// - Original completion_receiver remains valid
501 ///
502 /// # 注意
503 /// - 任务 ID 在推迟后保持不变
504 /// - 原始 completion_receiver 保持有效
505 ///
506 /// # Examples (示例)
507 ///
508 /// ## Keep original callback (保持原始回调)
509 /// ```no_run
510 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
511 /// use std::time::Duration;
512 ///
513 ///
514 /// #[tokio::main]
515 /// async fn main() {
516 /// let timer = TimerWheel::with_defaults();
517 ///
518 /// // Allocate handle first
519 /// let allocated_handle = timer.allocate_handle();
520 /// let task_id = allocated_handle.task_id();
521 ///
522 /// let task = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
523 /// println!("Timer fired!");
524 /// })));
525 /// let _handle = timer.register(allocated_handle, task);
526 ///
527 /// // Postpone to 10 seconds after triggering, and keep original callback
528 /// // 推迟到 10 秒后触发,并保持原始回调
529 /// let success = timer.postpone(task_id, Duration::from_secs(10), None);
530 /// println!("Postponed successfully: {}", success);
531 /// }
532 /// ```
533 ///
534 /// ## Replace with new callback (替换为新的回调)
535 /// ```no_run
536 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
537 /// use std::time::Duration;
538 ///
539 /// #[tokio::main]
540 /// async fn main() {
541 /// let timer = TimerWheel::with_defaults();
542 ///
543 /// // Allocate handle first
544 /// let allocated_handle = timer.allocate_handle();
545 /// let task_id = allocated_handle.task_id();
546 ///
547 /// let task = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
548 /// println!("Original callback!");
549 /// })));
550 /// let _handle = timer.register(allocated_handle, task);
551 ///
552 /// // Postpone to 10 seconds after triggering, and replace with new callback
553 /// // 推迟到 10 秒后触发,并替换为新的回调
554 /// let success = timer.postpone(task_id, Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
555 /// println!("New callback!");
556 /// })));
557 /// println!("Postponed successfully: {}", success);
558 /// }
559 /// ```
560 #[inline]
561 pub fn postpone(
562 &self,
563 task_id: TaskId,
564 new_delay: Duration,
565 callback: Option<CallbackWrapper>,
566 ) -> bool {
567 let mut wheel = self.wheel.lock();
568 wheel.postpone(task_id, new_delay, callback)
569 }
570
571 /// Batch postpone timers (keep original callbacks)
572 ///
573 /// # Parameters
574 /// - `updates`: List of tuples of (task ID, new delay)
575 ///
576 /// # Returns
577 /// Number of successfully postponed tasks
578 ///
579 /// 批量推迟定时器 (保持原始回调)
580 ///
581 /// # 参数
582 /// - `updates`: (任务 ID, 新延迟) 元组列表
583 ///
584 /// # 返回值
585 /// 成功推迟的任务数量
586 ///
587 /// # Note
588 /// - This method keeps all tasks' original callbacks unchanged
589 /// - Use `postpone_batch_with_callbacks` if you need to replace callbacks
590 ///
591 /// # 注意
592 /// - 此方法保持所有任务的原始回调不变
593 /// - 如果需要替换回调,请使用 `postpone_batch_with_callbacks`
594 ///
595 /// # Performance Advantages
596 /// - Batch processing reduces lock contention
597 /// - Internally optimized batch postponement operation
598 ///
599 /// # Examples (示例)
600 /// ```no_run
601 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
602 /// use std::time::Duration;
603 ///
604 /// #[tokio::main]
605 /// async fn main() {
606 /// let timer = TimerWheel::with_defaults();
607 ///
608 /// // Create multiple tasks with callbacks
609 /// // 创建多个带有回调的任务
610 /// let task1 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
611 /// println!("Task 1 fired!");
612 /// })));
613 /// let task2 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
614 /// println!("Task 2 fired!");
615 /// })));
616 /// let task3 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
617 /// println!("Task 3 fired!");
618 /// })));
619 ///
620 /// // Allocate handles and register
621 /// let h1 = timer.allocate_handle();
622 /// let h2 = timer.allocate_handle();
623 /// let h3 = timer.allocate_handle();
624 ///
625 /// let task_ids = vec![
626 /// (h1.task_id(), Duration::from_secs(10)),
627 /// (h2.task_id(), Duration::from_secs(15)),
628 /// (h3.task_id(), Duration::from_secs(20)),
629 /// ];
630 ///
631 /// timer.register(h1, task1);
632 /// timer.register(h2, task2);
633 /// timer.register(h3, task3);
634 ///
635 /// // Batch postpone (keep original callbacks)
636 /// // 批量推迟 (保持原始回调)
637 /// let postponed = timer.postpone_batch(task_ids);
638 /// println!("Postponed {} timers", postponed);
639 /// }
640 /// ```
641 #[inline]
642 pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize {
643 let mut wheel = self.wheel.lock();
644 wheel.postpone_batch(updates)
645 }
646
647 /// Batch postpone timers (replace callbacks)
648 ///
649 /// # Parameters
650 /// - `updates`: List of tuples of (task ID, new delay, new callback)
651 ///
652 /// # Returns
653 /// Number of successfully postponed tasks
654 ///
655 /// 批量推迟定时器 (替换回调)
656 ///
657 /// # 参数
658 /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
659 ///
660 /// # 返回值
661 /// 成功推迟的任务数量
662 ///
663 /// # Performance Advantages
664 /// - Batch processing reduces lock contention
665 /// - Internally optimized batch postponement operation
666 ///
667 /// # Examples (示例)
668 /// ```no_run
669 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
670 /// use std::time::Duration;
671 /// use std::sync::Arc;
672 /// use std::sync::atomic::{AtomicU32, Ordering};
673 ///
674 /// #[tokio::main]
675 /// async fn main() {
676 /// let timer = TimerWheel::with_defaults();
677 /// let counter = Arc::new(AtomicU32::new(0));
678 ///
679 /// // Create multiple timers
680 /// // 创建多个定时器
681 /// let task1 = TimerTask::new_oneshot(Duration::from_secs(5), None);
682 /// let task2 = TimerTask::new_oneshot(Duration::from_secs(5), None);
683 ///
684 /// // Allocate handles first
685 /// let h1 = timer.allocate_handle();
686 /// let h2 = timer.allocate_handle();
687 /// let id1 = h1.task_id();
688 /// let id2 = h2.task_id();
689 ///
690 /// timer.register(h1, task1);
691 /// timer.register(h2, task2);
692 ///
693 /// // Batch postpone and replace callbacks
694 /// // 批量推迟并替换回调
695 /// let updates: Vec<_> = vec![id1, id2]
696 /// .into_iter()
697 /// .map(|id| {
698 /// let counter = Arc::clone(&counter);
699 /// (id, Duration::from_secs(10), Some(CallbackWrapper::new(move || {
700 /// let counter = Arc::clone(&counter);
701 /// async move { counter.fetch_add(1, Ordering::SeqCst); }
702 /// })))
703 /// })
704 /// .collect();
705 /// let postponed = timer.postpone_batch_with_callbacks(updates);
706 /// println!("Postponed {} timers", postponed);
707 /// }
708 /// ```
709 #[inline]
710 pub fn postpone_batch_with_callbacks(
711 &self,
712 updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
713 ) -> usize {
714 let mut wheel = self.wheel.lock();
715 wheel.postpone_batch_with_callbacks(updates.to_vec())
716 }
717
718 /// Core tick loop
719 ///
720 /// Background task that advances the timing wheel at regular intervals
721 ///
722 /// # Parameters
723 /// - `wheel`: Shared timing wheel instance
724 /// - `tick_duration`: Duration between ticks
725 ///
726 /// 核心 tick 循环
727 ///
728 /// 定期推进时间轮的后台任务
729 ///
730 /// # 参数
731 /// - `wheel`: 共享的时间轮实例
732 /// - `tick_duration`: tick 之间的持续时间
733 async fn tick_loop(wheel: Arc<Mutex<Wheel>>, tick_duration: Duration) {
734 let mut interval = tokio::time::interval(tick_duration);
735 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
736
737 loop {
738 interval.tick().await;
739
740 // Advance timing wheel and get expired tasks
741 // Note: wheel.advance() already handles completion notifications
742 let expired_tasks = {
743 let mut wheel_guard = wheel.lock();
744 wheel_guard.advance()
745 };
746
747 // Execute callbacks for expired tasks
748 // Notifications have already been sent by wheel.advance()
749 for task in expired_tasks {
750 if let Some(callback) = task.callback {
751 // Spawn callback execution in a separate tokio task
752 tokio::spawn(async move {
753 let future = callback.call();
754 future.await;
755 });
756 }
757 }
758 }
759 }
760
761 /// Graceful shutdown of TimerWheel
762 ///
763 /// 优雅关闭 TimerWheel
764 ///
765 /// # Examples (示例)
766 /// ```no_run
767 /// # use kestrel_timer::TimerWheel;
768 /// # #[tokio::main]
769 /// # async fn main() {
770 /// let timer = TimerWheel::with_defaults();
771 ///
772 /// // Use timer... (使用定时器...)
773 ///
774 /// timer.shutdown().await;
775 /// # }
776 /// ```
777 pub async fn shutdown(mut self) {
778 if let Some(handle) = self.tick_handle.take() {
779 handle.abort();
780 let _ = handle.await;
781 }
782 }
783}
784
785/// Automatically abort the background tick task when TimerWheel is dropped
786///
787/// 当 TimerWheel 被销毁时自动中止后台 tick 任务
788impl Drop for TimerWheel {
789 fn drop(&mut self) {
790 if let Some(handle) = self.tick_handle.take() {
791 handle.abort();
792 }
793 }
794}
795
796#[cfg(test)]
797mod tests {
798 use super::*;
799 use std::sync::atomic::{AtomicU32, Ordering};
800 use crate::task::TimerTask;
801
802 #[tokio::test]
803 async fn test_timer_creation() {
804 let _timer = TimerWheel::with_defaults();
805 }
806
807 #[tokio::test]
808 async fn test_schedule_once() {
809 use std::sync::Arc;
810 let timer = TimerWheel::with_defaults();
811 let counter = Arc::new(AtomicU32::new(0));
812 let counter_clone = Arc::clone(&counter);
813
814 let task = TimerTask::new_oneshot(
815 Duration::from_millis(50),
816 Some(CallbackWrapper::new(move || {
817 let counter = Arc::clone(&counter_clone);
818 async move {
819 counter.fetch_add(1, Ordering::SeqCst);
820 }
821 })),
822 );
823 let allocate_handle = timer.allocate_handle();
824 let _handle = timer.register(allocate_handle, task);
825
826 // Wait for timer to trigger
827 // 等待定时器触发
828 tokio::time::sleep(Duration::from_millis(100)).await;
829 assert_eq!(counter.load(Ordering::SeqCst), 1);
830 }
831}
832