kestrel_timer/timer.rs
1pub mod handle;
2
3use crate::config::{BatchConfig, ServiceConfig, WheelConfig};
4use crate::task::{CallbackWrapper, TaskHandle, TaskId};
5use crate::wheel::Wheel;
6use handle::{BatchHandle, BatchHandleWithCompletion, TimerHandle, TimerHandleWithCompletion};
7use parking_lot::Mutex;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::task::JoinHandle;
11
/// Timing wheel timer manager.
///
/// Owns the shared [`Wheel`] together with the handle of the background task
/// that advances it.
pub struct TimerWheel {
    /// Timing wheel instance, wrapped in `Arc<Mutex<_>>` for multi-threaded
    /// access. Clones of this `Arc` are handed to the background tick task
    /// and to the handles and services created from this manager.
    wheel: Arc<Mutex<Wheel>>,

    /// Handle of the background tick-loop task.
    ///
    /// Becomes `None` once the handle has been taken (by `shutdown` or by
    /// the `Drop` implementation).
    tick_handle: Option<JoinHandle<()>>,
}
26
27impl TimerWheel {
28 /// Create a new timer manager
29 ///
30 /// # Parameters
31 /// - `config`: Timing wheel configuration, already validated
32 /// - `batch_config`: Batch operation configuration
33 ///
34 /// 创建新的定时器管理器
35 ///
36 /// # 参数
37 /// - `config`: 时间轮配置,已验证
38 /// - `batch_config`: 批量操作配置
39 ///
40 /// # Examples (示例)
41 /// ```no_run
42 /// use kestrel_timer::{TimerWheel, config::WheelConfig, TimerTask, config::BatchConfig};
43 /// use std::time::Duration;
44 ///
45 /// #[tokio::main]
46 /// async fn main() {
47 /// let config = WheelConfig::builder()
48 /// .l0_tick_duration(Duration::from_millis(10))
49 /// .l0_slot_count(512)
50 /// .l1_tick_duration(Duration::from_secs(1))
51 /// .l1_slot_count(64)
52 /// .build()
53 /// .unwrap();
54 /// let timer = TimerWheel::new(config, BatchConfig::default());
55 ///
56 /// // Use two-step API: allocate handle first, then register
57 /// // 使用两步 API:先分配 handle,再注册
58 /// let handle = timer.allocate_handle();
59 /// let task = TimerTask::new_oneshot(Duration::from_secs(1), None);
60 /// let _timer_handle = timer.register(handle, task);
61 /// }
62 /// ```
63 pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
64 let tick_duration = config.l0_tick_duration;
65 let wheel = Wheel::new(config, batch_config);
66 let wheel = Arc::new(Mutex::new(wheel));
67 let wheel_clone = wheel.clone();
68
69 // Start background tick loop
70 // 启动后台 tick 循环
71 let tick_handle = tokio::spawn(async move {
72 Self::tick_loop(wheel_clone, tick_duration).await;
73 });
74
75 Self {
76 wheel,
77 tick_handle: Some(tick_handle),
78 }
79 }
80
81 /// Create timer manager with default configuration, hierarchical mode
82 /// - L0 layer tick duration: 10ms, slot count: 512
83 /// - L1 layer tick duration: 1s, slot count: 64
84 ///
85 /// # Returns
86 /// Timer manager instance
87 ///
88 /// 使用默认配置创建定时器管理器,分层模式
89 /// - L0 层 tick 持续时间:10ms,槽数量:512
90 /// - L1 层 tick 持续时间:1s,槽数量:64
91 ///
92 /// # 返回值
93 /// 定时器管理器实例
94 ///
95 /// # Examples (示例)
96 /// ```no_run
97 /// use kestrel_timer::TimerWheel;
98 ///
99 /// #[tokio::main]
100 /// async fn main() {
101 /// let timer = TimerWheel::with_defaults();
102 /// }
103 /// ```
104 pub fn with_defaults() -> Self {
105 Self::new(WheelConfig::default(), BatchConfig::default())
106 }
107
108 /// Create TimerService bound to this timing wheel with default configuration
109 ///
110 /// # Parameters
111 /// - `service_config`: Service configuration
112 ///
113 /// # Returns
114 /// TimerService instance bound to this timing wheel
115 ///
116 /// 创建绑定到此时间轮的 TimerService,使用默认配置
117 ///
118 /// # 参数
119 /// - `service_config`: 服务配置
120 ///
121 /// # 返回值
122 /// 绑定到此时间轮的 TimerService 实例
123 ///
124 /// # Examples (示例)
125 /// ```no_run
126 /// use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
127 /// use std::time::Duration;
128 ///
129 ///
130 /// #[tokio::main]
131 /// async fn main() {
132 /// let timer = TimerWheel::with_defaults();
133 /// let mut service = timer.create_service(ServiceConfig::default());
134 ///
135 /// // Use two-step API to batch schedule timers through service
136 /// // 使用两步 API 通过服务批量调度定时器
137 /// // Step 1: Allocate handles
138 /// let handles = service.allocate_handles(5);
139 ///
140 /// // Step 2: Create tasks
141 /// let tasks: Vec<_> = (0..5)
142 /// .map(|_| {
143 /// use kestrel_timer::TimerTask;
144 /// TimerTask::new_oneshot(Duration::from_millis(100), Some(CallbackWrapper::new(|| async {})))
145 /// })
146 /// .collect();
147 ///
148 /// // Step 3: Register batch
149 /// service.register_batch(handles, tasks).unwrap();
150 ///
151 /// // Receive timeout notifications
152 /// // 接收超时通知
153 /// let mut rx = service.take_receiver().unwrap();
154 /// while let Some(task_id) = rx.recv().await {
155 /// println!("Task {:?} completed", task_id);
156 /// }
157 /// }
158 /// ```
159 pub fn create_service(&self, service_config: ServiceConfig) -> crate::service::TimerService {
160 crate::service::TimerService::new(self.wheel.clone(), service_config)
161 }
162
163 /// Create TimerService bound to this timing wheel with custom configuration
164 ///
165 /// # Parameters
166 /// - `config`: Service configuration
167 ///
168 /// # Returns
169 /// TimerService instance bound to this timing wheel
170 ///
171 /// 创建绑定到此时间轮的 TimerService,使用自定义配置
172 ///
173 /// # 参数
174 /// - `config`: 服务配置
175 ///
176 /// # 返回值
177 /// 绑定到此时间轮的 TimerService 实例
178 ///
179 /// # Examples (示例)
180 /// ```no_run
181 /// use kestrel_timer::{TimerWheel, config::ServiceConfig, TimerTask};
182 /// use std::num::NonZeroUsize;
183 ///
184 /// #[tokio::main]
185 /// async fn main() {
186 /// let timer = TimerWheel::with_defaults();
187 /// let config = ServiceConfig::builder()
188 /// .command_channel_capacity(NonZeroUsize::new(1024).unwrap())
189 /// .timeout_channel_capacity(NonZeroUsize::new(2000).unwrap())
190 /// .build();
191 /// let service = timer.create_service_with_config(config);
192 /// }
193 /// ```
194 pub fn create_service_with_config(
195 &self,
196 config: ServiceConfig,
197 ) -> crate::service::TimerService {
198 crate::service::TimerService::new(self.wheel.clone(), config)
199 }
200
201 /// Allocate a handle from DeferredMap
202 ///
203 /// # Returns
204 /// A unique handle for later insertion
205 ///
206 /// # 返回值
207 /// 用于后续插入的唯一 handle
208 pub fn allocate_handle(&self) -> TaskHandle {
209 self.wheel.lock().allocate_handle()
210 }
211
212 /// Batch allocate handles from DeferredMap
213 ///
214 /// # Parameters
215 /// - `count`: Number of handles to allocate
216 ///
217 /// # Returns
218 /// Vector of unique handles for later batch insertion
219 ///
220 /// # 参数
221 /// - `count`: 要分配的 handle 数量
222 ///
223 /// # 返回值
224 /// 用于后续批量插入的唯一 handles 向量
225 pub fn allocate_handles(&self, count: usize) -> Vec<TaskHandle> {
226 self.wheel.lock().allocate_handles(count)
227 }
228
229 /// Register timer task to timing wheel (registration phase)
230 ///
231 /// # Parameters
232 /// - `task`: Task created via `create_task()`
233 ///
234 /// # Returns
235 /// Return timer handle with completion receiver that can be used to cancel timer and receive completion notifications
236 ///
237 /// 注册定时器任务到时间轮 (注册阶段)
238 ///
239 /// # 参数
240 /// - `task`: 通过 `create_task()` 创建的任务
241 ///
242 /// # 返回值
243 /// 返回包含完成通知接收器的定时器句柄,可用于取消定时器和接收完成通知
244 ///
245 /// # Examples (示例)
246 /// ```no_run
247 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
248 ///
249 /// use std::time::Duration;
250 ///
251 /// #[tokio::main]
252 /// async fn main() {
253 /// let timer = TimerWheel::with_defaults();
254 ///
255 /// // Step 1: Allocate handle
256 /// let allocated_handle = timer.allocate_handle();
257 /// let task_id = allocated_handle.task_id();
258 ///
259 /// // Step 2: Create task
260 /// let task = TimerTask::new_oneshot(Duration::from_secs(1), Some(CallbackWrapper::new(|| async {
261 /// println!("Timer fired!");
262 /// })));
263 ///
264 /// // Step 3: Register task
265 /// let handle = timer.register(allocated_handle, task);
266 ///
267 /// // Wait for timer completion
268 /// // 等待定时器完成
269 /// use kestrel_timer::CompletionReceiver;
270 /// let (rx, _handle) = handle.into_parts();
271 /// match rx {
272 /// CompletionReceiver::OneShot(receiver) => {
273 /// receiver.wait().await;
274 /// },
275 /// _ => {}
276 /// }
277 /// }
278 /// ```
279 #[inline]
280 pub fn register(
281 &self,
282 handle: TaskHandle,
283 task: crate::task::TimerTask,
284 ) -> TimerHandleWithCompletion {
285 let task_id = handle.task_id();
286
287 let (task, completion_rx) =
288 crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
289
290 // Single lock to complete all operations
291 // 单次加锁完成所有操作
292 let mut wheel_guard = self.wheel.lock();
293 wheel_guard.insert(handle, task);
294
295 TimerHandleWithCompletion::new(TimerHandle::new(task_id, self.wheel.clone()), completion_rx)
296 }
297
298 /// Batch register timer tasks to timing wheel (registration phase)
299 ///
300 /// # Parameters
301 /// - `handles`: Pre-allocated handles for tasks
302 /// - `tasks`: List of timer tasks
303 ///
304 /// # Returns
305 /// - `Ok(BatchHandleWithCompletion)` if all tasks are successfully registered
306 /// - `Err(TimerError::BatchLengthMismatch)` if handles and tasks lengths don't match
307 ///
308 /// 批量注册定时器任务到时间轮 (注册阶段)
309 ///
310 /// # 参数
311 /// - `handles`: 任务的预分配 handles
312 /// - `tasks`: 定时器任务列表
313 ///
314 /// # 返回值
315 /// - `Ok(BatchHandleWithCompletion)` 如果所有任务成功注册
316 /// - `Err(TimerError::BatchLengthMismatch)` 如果 handles 和 tasks 长度不匹配
317 ///
318 /// # Examples (示例)
319 /// ```no_run
320 /// use kestrel_timer::{TimerWheel, TimerTask};
321 /// use std::time::Duration;
322 ///
323 /// #[tokio::main]
324 /// async fn main() {
325 /// let timer = TimerWheel::with_defaults();
326 ///
327 /// // Step 1: Allocate handles
328 /// let handles = timer.allocate_handles(3);
329 ///
330 /// // Step 2: Create tasks
331 /// let tasks: Vec<_> = (0..3)
332 /// .map(|_| TimerTask::new_oneshot(Duration::from_secs(1), None))
333 /// .collect();
334 ///
335 /// // Step 3: Batch register
336 /// let batch = timer.register_batch(handles, tasks)
337 /// .expect("register_batch should succeed");
338 /// println!("Registered {} timers", batch.len());
339 /// }
340 /// ```
341 #[inline]
342 pub fn register_batch(
343 &self,
344 handles: Vec<TaskHandle>,
345 tasks: Vec<crate::task::TimerTask>,
346 ) -> Result<BatchHandleWithCompletion, crate::error::TimerError> {
347 // Validate lengths match
348 if handles.len() != tasks.len() {
349 return Err(crate::error::TimerError::BatchLengthMismatch {
350 handles_len: handles.len(),
351 tasks_len: tasks.len(),
352 });
353 }
354
355 let task_count = tasks.len();
356 let mut completion_rxs = Vec::with_capacity(task_count);
357 let mut task_ids = Vec::with_capacity(task_count);
358 let mut prepared_handles = Vec::with_capacity(task_count);
359 let mut prepared_tasks = Vec::with_capacity(task_count);
360
361 // Step 1: Prepare all channels and notifiers
362 for (handle, task) in handles.into_iter().zip(tasks.into_iter()) {
363 let task_id = handle.task_id();
364 let (task, completion_rx) =
365 crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
366 task_ids.push(task_id);
367 completion_rxs.push(completion_rx);
368 prepared_handles.push(handle);
369 prepared_tasks.push(task);
370 }
371
372 // Step 2: Single lock, batch insert
373 {
374 let mut wheel_guard = self.wheel.lock();
375 wheel_guard.insert_batch(prepared_handles, prepared_tasks)?;
376 }
377
378 Ok(BatchHandleWithCompletion::new(
379 BatchHandle::new(task_ids, self.wheel.clone()),
380 completion_rxs,
381 ))
382 }
383
384 /// Cancel timer
385 ///
386 /// # Parameters
387 /// - `task_id`: Task ID
388 ///
389 /// # Returns
390 /// Returns true if task exists and is successfully cancelled, otherwise false
391 ///
392 /// 取消定时器
393 ///
394 /// # 参数
395 /// - `task_id`: 任务 ID
396 ///
397 /// # 返回值
398 /// 如果任务存在且成功取消则返回 true,否则返回 false
399 ///
400 /// # Examples (示例)
401 /// ```no_run
402 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
403 ///
404 /// use std::time::Duration;
405 ///
406 /// #[tokio::main]
407 /// async fn main() {
408 /// let timer = TimerWheel::with_defaults();
409 ///
410 /// // Step 1: Allocate handle
411 /// let allocated_handle = timer.allocate_handle();
412 /// let task_id = allocated_handle.task_id();
413 ///
414 /// // Step 2: Create and register task
415 /// let task = TimerTask::new_oneshot(Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
416 /// println!("Timer fired!");
417 /// })));
418 /// let _handle = timer.register(allocated_handle, task);
419 ///
420 /// // Cancel task using task ID
421 /// // 使用任务 ID 取消任务
422 /// let cancelled = timer.cancel(task_id);
423 /// println!("Canceled successfully: {}", cancelled);
424 /// }
425 /// ```
426 #[inline]
427 pub fn cancel(&self, task_id: TaskId) -> bool {
428 let mut wheel = self.wheel.lock();
429 wheel.cancel(task_id)
430 }
431
432 /// Batch cancel timers
433 ///
434 /// # Parameters
435 /// - `task_ids`: List of task IDs to cancel
436 ///
437 /// # Returns
438 /// Number of successfully cancelled tasks
439 ///
440 /// 批量取消定时器
441 ///
442 /// # 参数
443 /// - `task_ids`: 要取消的任务 ID 列表
444 ///
445 /// # 返回值
446 /// 成功取消的任务数量
447 ///
448 /// # Performance Advantages
449 /// - Batch processing reduces lock contention
450 /// - Internally optimized batch cancellation operation
451 ///
452 /// # Examples (示例)
453 /// ```no_run
454 /// use kestrel_timer::{TimerWheel, TimerTask};
455 /// use std::time::Duration;
456 ///
457 /// #[tokio::main]
458 /// async fn main() {
459 /// let timer = TimerWheel::with_defaults();
460 ///
461 /// // Create multiple timers
462 /// // 创建多个定时器
463 /// let task1 = TimerTask::new_oneshot(Duration::from_secs(10), None);
464 /// let task2 = TimerTask::new_oneshot(Duration::from_secs(10), None);
465 /// let task3 = TimerTask::new_oneshot(Duration::from_secs(10), None);
466 ///
467 /// // Allocate handles and get task IDs
468 /// let h1 = timer.allocate_handle();
469 /// let h2 = timer.allocate_handle();
470 /// let h3 = timer.allocate_handle();
471 /// let task_ids = vec![h1.task_id(), h2.task_id(), h3.task_id()];
472 ///
473 /// let _h1 = timer.register(h1, task1);
474 /// let _h2 = timer.register(h2, task2);
475 /// let _h3 = timer.register(h3, task3);
476 ///
477 /// // Batch cancel
478 /// // 批量取消
479 /// let cancelled = timer.cancel_batch(&task_ids);
480 /// println!("Canceled {} timers", cancelled);
481 /// }
482 /// ```
483 #[inline]
484 pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
485 let mut wheel = self.wheel.lock();
486 wheel.cancel_batch(task_ids)
487 }
488
489 /// Postpone timer
490 ///
491 /// # Parameters
492 /// - `task_id`: Task ID to postpone
493 /// - `new_delay`: New delay duration, recalculated from current time
494 /// - `callback`: New callback function, pass `None` to keep original callback, pass `Some` to replace with new callback
495 ///
496 /// # Returns
497 /// Returns true if task exists and is successfully postponed, otherwise false
498 ///
499 /// 推迟定时器
500 ///
501 /// # 参数
502 /// - `task_id`: 要推迟的任务 ID
503 /// - `new_delay`: 新的延迟时间,从当前时间重新计算
504 /// - `callback`: 新的回调函数,传递 `None` 保持原始回调,传递 `Some` 替换为新的回调
505 ///
506 /// # 返回值
507 /// 如果任务存在且成功推迟则返回 true,否则返回 false
508 ///
509 /// # Note
510 /// - Task ID remains unchanged after postponement
511 /// - Original completion_receiver remains valid
512 ///
513 /// # 注意
514 /// - 任务 ID 在推迟后保持不变
515 /// - 原始 completion_receiver 保持有效
516 ///
517 /// # Examples (示例)
518 ///
519 /// ## Keep original callback (保持原始回调)
520 /// ```no_run
521 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
522 /// use std::time::Duration;
523 ///
524 ///
525 /// #[tokio::main]
526 /// async fn main() {
527 /// let timer = TimerWheel::with_defaults();
528 ///
529 /// // Allocate handle first
530 /// let allocated_handle = timer.allocate_handle();
531 /// let task_id = allocated_handle.task_id();
532 ///
533 /// let task = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
534 /// println!("Timer fired!");
535 /// })));
536 /// let _handle = timer.register(allocated_handle, task);
537 ///
538 /// // Postpone to 10 seconds after triggering, and keep original callback
539 /// // 推迟到 10 秒后触发,并保持原始回调
540 /// let success = timer.postpone(task_id, Duration::from_secs(10), None);
541 /// println!("Postponed successfully: {}", success);
542 /// }
543 /// ```
544 ///
545 /// ## Replace with new callback (替换为新的回调)
546 /// ```no_run
547 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
548 /// use std::time::Duration;
549 ///
550 /// #[tokio::main]
551 /// async fn main() {
552 /// let timer = TimerWheel::with_defaults();
553 ///
554 /// // Allocate handle first
555 /// let allocated_handle = timer.allocate_handle();
556 /// let task_id = allocated_handle.task_id();
557 ///
558 /// let task = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
559 /// println!("Original callback!");
560 /// })));
561 /// let _handle = timer.register(allocated_handle, task);
562 ///
563 /// // Postpone to 10 seconds after triggering, and replace with new callback
564 /// // 推迟到 10 秒后触发,并替换为新的回调
565 /// let success = timer.postpone(task_id, Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
566 /// println!("New callback!");
567 /// })));
568 /// println!("Postponed successfully: {}", success);
569 /// }
570 /// ```
571 #[inline]
572 pub fn postpone(
573 &self,
574 task_id: TaskId,
575 new_delay: Duration,
576 callback: Option<CallbackWrapper>,
577 ) -> bool {
578 let mut wheel = self.wheel.lock();
579 wheel.postpone(task_id, new_delay, callback)
580 }
581
582 /// Batch postpone timers (keep original callbacks)
583 ///
584 /// # Parameters
585 /// - `updates`: List of tuples of (task ID, new delay)
586 ///
587 /// # Returns
588 /// Number of successfully postponed tasks
589 ///
590 /// 批量推迟定时器 (保持原始回调)
591 ///
592 /// # 参数
593 /// - `updates`: (任务 ID, 新延迟) 元组列表
594 ///
595 /// # 返回值
596 /// 成功推迟的任务数量
597 ///
598 /// # Note
599 /// - This method keeps all tasks' original callbacks unchanged
600 /// - Use `postpone_batch_with_callbacks` if you need to replace callbacks
601 ///
602 /// # 注意
603 /// - 此方法保持所有任务的原始回调不变
604 /// - 如果需要替换回调,请使用 `postpone_batch_with_callbacks`
605 ///
606 /// # Performance Advantages
607 /// - Batch processing reduces lock contention
608 /// - Internally optimized batch postponement operation
609 ///
610 /// # Examples (示例)
611 /// ```no_run
612 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
613 /// use std::time::Duration;
614 ///
615 /// #[tokio::main]
616 /// async fn main() {
617 /// let timer = TimerWheel::with_defaults();
618 ///
619 /// // Create multiple tasks with callbacks
620 /// // 创建多个带有回调的任务
621 /// let task1 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
622 /// println!("Task 1 fired!");
623 /// })));
624 /// let task2 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
625 /// println!("Task 2 fired!");
626 /// })));
627 /// let task3 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
628 /// println!("Task 3 fired!");
629 /// })));
630 ///
631 /// // Allocate handles and register
632 /// let h1 = timer.allocate_handle();
633 /// let h2 = timer.allocate_handle();
634 /// let h3 = timer.allocate_handle();
635 ///
636 /// let task_ids = vec![
637 /// (h1.task_id(), Duration::from_secs(10)),
638 /// (h2.task_id(), Duration::from_secs(15)),
639 /// (h3.task_id(), Duration::from_secs(20)),
640 /// ];
641 ///
642 /// timer.register(h1, task1);
643 /// timer.register(h2, task2);
644 /// timer.register(h3, task3);
645 ///
646 /// // Batch postpone (keep original callbacks)
647 /// // 批量推迟 (保持原始回调)
648 /// let postponed = timer.postpone_batch(task_ids);
649 /// println!("Postponed {} timers", postponed);
650 /// }
651 /// ```
652 #[inline]
653 pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize {
654 let mut wheel = self.wheel.lock();
655 wheel.postpone_batch(updates)
656 }
657
658 /// Batch postpone timers (replace callbacks)
659 ///
660 /// # Parameters
661 /// - `updates`: List of tuples of (task ID, new delay, new callback)
662 ///
663 /// # Returns
664 /// Number of successfully postponed tasks
665 ///
666 /// 批量推迟定时器 (替换回调)
667 ///
668 /// # 参数
669 /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
670 ///
671 /// # 返回值
672 /// 成功推迟的任务数量
673 ///
674 /// # Performance Advantages
675 /// - Batch processing reduces lock contention
676 /// - Internally optimized batch postponement operation
677 ///
678 /// # Examples (示例)
679 /// ```no_run
680 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
681 /// use std::time::Duration;
682 /// use std::sync::Arc;
683 /// use std::sync::atomic::{AtomicU32, Ordering};
684 ///
685 /// #[tokio::main]
686 /// async fn main() {
687 /// let timer = TimerWheel::with_defaults();
688 /// let counter = Arc::new(AtomicU32::new(0));
689 ///
690 /// // Create multiple timers
691 /// // 创建多个定时器
692 /// let task1 = TimerTask::new_oneshot(Duration::from_secs(5), None);
693 /// let task2 = TimerTask::new_oneshot(Duration::from_secs(5), None);
694 ///
695 /// // Allocate handles first
696 /// let h1 = timer.allocate_handle();
697 /// let h2 = timer.allocate_handle();
698 /// let id1 = h1.task_id();
699 /// let id2 = h2.task_id();
700 ///
701 /// timer.register(h1, task1);
702 /// timer.register(h2, task2);
703 ///
704 /// // Batch postpone and replace callbacks
705 /// // 批量推迟并替换回调
706 /// let updates: Vec<_> = vec![id1, id2]
707 /// .into_iter()
708 /// .map(|id| {
709 /// let counter = Arc::clone(&counter);
710 /// (id, Duration::from_secs(10), Some(CallbackWrapper::new(move || {
711 /// let counter = Arc::clone(&counter);
712 /// async move { counter.fetch_add(1, Ordering::SeqCst); }
713 /// })))
714 /// })
715 /// .collect();
716 /// let postponed = timer.postpone_batch_with_callbacks(updates);
717 /// println!("Postponed {} timers", postponed);
718 /// }
719 /// ```
720 #[inline]
721 pub fn postpone_batch_with_callbacks(
722 &self,
723 updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
724 ) -> usize {
725 let mut wheel = self.wheel.lock();
726 wheel.postpone_batch_with_callbacks(updates.to_vec())
727 }
728
729 /// Core tick loop
730 ///
731 /// Background task that advances the timing wheel at regular intervals
732 ///
733 /// # Parameters
734 /// - `wheel`: Shared timing wheel instance
735 /// - `tick_duration`: Duration between ticks
736 ///
737 /// 核心 tick 循环
738 ///
739 /// 定期推进时间轮的后台任务
740 ///
741 /// # 参数
742 /// - `wheel`: 共享的时间轮实例
743 /// - `tick_duration`: tick 之间的持续时间
744 async fn tick_loop(wheel: Arc<Mutex<Wheel>>, tick_duration: Duration) {
745 let mut interval = tokio::time::interval(tick_duration);
746 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
747
748 loop {
749 interval.tick().await;
750
751 // Advance timing wheel and get expired tasks
752 // Note: wheel.advance() already handles completion notifications
753 let expired_tasks = {
754 let mut wheel_guard = wheel.lock();
755 wheel_guard.advance()
756 };
757
758 // Execute callbacks for expired tasks
759 // Notifications have already been sent by wheel.advance()
760 for task in expired_tasks {
761 if let Some(callback) = task.callback {
762 // Spawn callback execution in a separate tokio task
763 tokio::spawn(async move {
764 let future = callback.call();
765 future.await;
766 });
767 }
768 }
769 }
770 }
771
772 /// Graceful shutdown of TimerWheel
773 ///
774 /// 优雅关闭 TimerWheel
775 ///
776 /// # Examples (示例)
777 /// ```no_run
778 /// # use kestrel_timer::TimerWheel;
779 /// # #[tokio::main]
780 /// # async fn main() {
781 /// let timer = TimerWheel::with_defaults();
782 ///
783 /// // Use timer... (使用定时器...)
784 ///
785 /// timer.shutdown().await;
786 /// # }
787 /// ```
788 pub async fn shutdown(mut self) {
789 if let Some(handle) = self.tick_handle.take() {
790 handle.abort();
791 let _ = handle.await;
792 }
793 }
794}
795
796/// Automatically abort the background tick task when TimerWheel is dropped
797///
798/// 当 TimerWheel 被销毁时自动中止后台 tick 任务
799impl Drop for TimerWheel {
800 fn drop(&mut self) {
801 if let Some(handle) = self.tick_handle.take() {
802 handle.abort();
803 }
804 }
805}
806
#[cfg(test)]
mod tests {
    use super::*;
    use crate::task::TimerTask;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// Building a wheel with the default configuration must not panic.
    #[tokio::test]
    async fn test_timer_creation() {
        let _timer = TimerWheel::with_defaults();
    }

    /// A one-shot timer runs its callback once after the delay has elapsed.
    #[tokio::test]
    async fn test_schedule_once() {
        let timer = TimerWheel::with_defaults();
        let fired = Arc::new(AtomicU32::new(0));

        // The callback bumps the shared counter each time it is invoked.
        let callback = {
            let fired = Arc::clone(&fired);
            CallbackWrapper::new(move || {
                let fired = Arc::clone(&fired);
                async move {
                    fired.fetch_add(1, Ordering::SeqCst);
                }
            })
        };
        let task = TimerTask::new_oneshot(Duration::from_millis(50), Some(callback));

        let slot = timer.allocate_handle();
        let _handle = timer.register(slot, task);

        // Wait for the timer to trigger (50ms delay, 100ms wait).
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(fired.load(Ordering::SeqCst), 1);
    }
}