kestrel_timer/timer.rs
1use crate::config::{BatchConfig, ServiceConfig, WheelConfig};
2use crate::task::{CallbackWrapper, TaskId, TaskCompletionReason};
3use crate::wheel::Wheel;
4use parking_lot::Mutex;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::oneshot;
8use tokio::task::JoinHandle;
9
10/// Completion receiver for receiving timer completion notifications
11///
12/// 完成通知接收器,用于接收定时器完成通知
13pub struct CompletionReceiver(pub oneshot::Receiver<TaskCompletionReason>);
14
15/// Timer handle for managing timer lifecycle
16///
17/// Note: This type does not implement Clone to prevent duplicate cancellation of the same timer. Each timer should have only one owner.
18///
19/// 定时器句柄,用于管理定时器生命周期
20///
21/// 注意:此类型未实现 Clone 以防止重复取消同一定时器。每个定时器应该只有一个所有者。
22pub struct TimerHandle {
23 pub(crate) task_id: TaskId,
24 pub(crate) wheel: Arc<Mutex<Wheel>>,
25 pub(crate) completion_rx: CompletionReceiver,
26}
27
28impl TimerHandle {
29 pub(crate) fn new(task_id: TaskId, wheel: Arc<Mutex<Wheel>>, completion_rx: oneshot::Receiver<TaskCompletionReason>) -> Self {
30 Self { task_id, wheel, completion_rx: CompletionReceiver(completion_rx) }
31 }
32
33 /// Cancel the timer
34 ///
35 /// # Returns
36 /// Returns true if task exists and is successfully cancelled, otherwise false
37 ///
38 /// 取消定时器
39 ///
40 /// # 返回值
41 /// 如果任务存在且成功取消则返回 true,否则返回 false
42 ///
43 /// # Examples (示例)
44 /// ```no_run
45 /// # use kestrel_timer::{TimerWheel, CallbackWrapper};
46 /// # use std::time::Duration;
47 /// #
48 /// # #[tokio::main]
49 /// # async fn main() {
50 /// let timer = TimerWheel::with_defaults();
51 /// let callback = Some(CallbackWrapper::new(|| async {}));
52 /// let task = TimerWheel::create_task(Duration::from_secs(1), callback);
53 /// let handle = timer.register(task);
54 ///
55 /// // Cancel the timer
56 /// let success = handle.cancel();
57 /// println!("Canceled successfully: {}", success);
58 /// # }
59 /// ```
60 pub fn cancel(&self) -> bool {
61 let mut wheel = self.wheel.lock();
62 wheel.cancel(self.task_id)
63 }
64
65 /// Get mutable reference to completion receiver
66 ///
67 /// 获取完成通知接收器的可变引用
68 ///
69 /// # Examples (示例)
70 /// ```no_run
71 /// # use kestrel_timer::{TimerWheel, CallbackWrapper};
72 /// # use std::time::Duration;
73 /// #
74 /// # #[tokio::main]
75 /// # async fn main() {
76 /// let timer = TimerWheel::with_defaults();
77 /// let callback = Some(CallbackWrapper::new(|| async {
78 /// println!("Timer fired!");
79 /// }));
80 /// let task = TimerWheel::create_task(Duration::from_secs(1), callback);
81 /// let handle = timer.register(task);
82 ///
83 /// // Wait for timer completion (consume handle using into_completion_receiver)
84 /// // 等待定时器完成(使用 into_completion_receiver 消费句柄)
85 /// handle.into_completion_receiver().0.await.ok();
86 /// println!("Timer completed!");
87 /// # }
88 /// ```
89 pub fn completion_receiver(&mut self) -> &mut CompletionReceiver {
90 &mut self.completion_rx
91 }
92
93 /// Consume handle and return completion receiver
94 ///
95 /// 消费句柄并返回完成通知接收器
96 ///
97 /// # Examples (示例)
98 /// ```no_run
99 /// # use kestrel_timer::{TimerWheel, CallbackWrapper};
100 /// # use std::time::Duration;
101 /// #
102 /// # #[tokio::main]
103 /// # async fn main() {
104 /// let timer = TimerWheel::with_defaults();
105 /// let callback = Some(CallbackWrapper::new(|| async {
106 /// println!("Timer fired!");
107 /// }));
108 /// let task = TimerWheel::create_task(Duration::from_secs(1), callback);
109 /// let handle = timer.register(task);
110 ///
111 /// // Wait for timer completion
112 /// handle.into_completion_receiver().0.await.ok();
113 /// println!("Timer completed!");
114 /// # }
115 /// ```
116 pub fn into_completion_receiver(self) -> CompletionReceiver {
117 self.completion_rx
118 }
119}
120
121/// Batch timer handle for managing batch-scheduled timers
122///
123/// Note: This type does not implement Clone to prevent duplicate cancellation of the same batch of timers. Use `into_iter()` or `into_handles()` to access individual timer handles.
124///
125/// 批量定时器句柄,用于管理批量调度的定时器
126///
127/// 注意:此类型未实现 Clone 以防止重复取消同一批定时器。使用 `into_iter()` 或 `into_handles()` 访问单个定时器句柄。
128pub struct BatchHandle {
129 pub(crate) task_ids: Vec<TaskId>,
130 pub(crate) wheel: Arc<Mutex<Wheel>>,
131 pub(crate) completion_rxs: Vec<oneshot::Receiver<TaskCompletionReason>>,
132}
133
134impl BatchHandle {
135 pub(crate) fn new(task_ids: Vec<TaskId>, wheel: Arc<Mutex<Wheel>>, completion_rxs: Vec<oneshot::Receiver<TaskCompletionReason>>) -> Self {
136 Self { task_ids, wheel, completion_rxs }
137 }
138
139 /// Cancel all timers in batch
140 ///
141 /// # Returns
142 /// Number of successfully cancelled tasks
143 ///
144 /// 批量取消所有定时器
145 ///
146 /// # 返回值
147 /// 成功取消的任务数量
148 ///
149 /// # Examples (示例)
150 /// ```no_run
151 /// # use kestrel_timer::{TimerWheel, CallbackWrapper};
152 /// # use std::time::Duration;
153 /// #
154 /// # #[tokio::main]
155 /// # async fn main() {
156 /// let timer = TimerWheel::with_defaults();
157 /// let delays: Vec<Duration> = (0..10)
158 /// .map(|_| Duration::from_secs(1))
159 /// .collect();
160 /// let tasks = TimerWheel::create_batch(delays);
161 /// let batch = timer.register_batch(tasks);
162 ///
163 /// let cancelled = batch.cancel_all();
164 /// println!("Canceled {} timers", cancelled);
165 /// # }
166 /// ```
167 pub fn cancel_all(self) -> usize {
168 let mut wheel = self.wheel.lock();
169 wheel.cancel_batch(&self.task_ids)
170 }
171
172 /// Convert batch handle to Vec of individual timer handles
173 ///
174 /// Consumes BatchHandle and creates independent TimerHandle for each task
175 ///
176 /// 将批量句柄转换为单个定时器句柄的 Vec
177 ///
178 /// 消费 BatchHandle 并为每个任务创建独立的 TimerHandle
179 ///
180 /// # Examples (示例)
181 /// ```no_run
182 /// # use kestrel_timer::{TimerWheel, CallbackWrapper};
183 /// # use std::time::Duration;
184 /// #
185 /// # #[tokio::main]
186 /// # async fn main() {
187 /// let timer = TimerWheel::with_defaults();
188 /// let delays: Vec<Duration> = (0..3)
189 /// .map(|_| Duration::from_secs(1))
190 /// .collect();
191 /// let tasks = TimerWheel::create_batch(delays);
192 /// let batch = timer.register_batch(tasks);
193 ///
194 /// // Convert to individual handles
195 /// // 转换为单个句柄
196 /// let handles = batch.into_handles();
197 /// for handle in handles {
198 /// // Can operate each handle individually
199 /// // 可以单独操作每个句柄
200 /// }
201 /// # }
202 /// ```
203 pub fn into_handles(self) -> Vec<TimerHandle> {
204 self.task_ids
205 .into_iter()
206 .zip(self.completion_rxs.into_iter())
207 .map(|(task_id, rx)| {
208 TimerHandle::new(task_id, self.wheel.clone(), rx)
209 })
210 .collect()
211 }
212
213 /// Get the number of batch tasks
214 ///
215 /// 获取批量任务数量
216 pub fn len(&self) -> usize {
217 self.task_ids.len()
218 }
219
220 /// Check if batch tasks are empty
221 ///
222 /// 检查批量任务是否为空
223 pub fn is_empty(&self) -> bool {
224 self.task_ids.is_empty()
225 }
226
227 /// Get reference to all task IDs
228 ///
229 /// 获取所有任务 ID 的引用
230 pub fn task_ids(&self) -> &[TaskId] {
231 &self.task_ids
232 }
233
234 /// Get reference to all completion receivers
235 ///
236 /// # Returns
237 /// Reference to list of completion receivers for all tasks
238 ///
239 /// 获取所有完成通知接收器的引用
240 ///
241 /// # 返回值
242 /// 所有任务完成通知接收器列表的引用
243 pub fn completion_receivers(&mut self) -> &mut Vec<oneshot::Receiver<TaskCompletionReason>> {
244 &mut self.completion_rxs
245 }
246
247 /// Consume handle and return all completion receivers
248 ///
249 /// # Returns
250 /// List of completion receivers for all tasks
251 ///
252 /// 消费句柄并返回所有完成通知接收器
253 ///
254 /// # 返回值
255 /// 所有任务的完成通知接收器列表
256 ///
257 /// # Examples (示例)
258 /// ```no_run
259 /// # use kestrel_timer::{TimerWheel, CallbackWrapper};
260 /// # use std::time::Duration;
261 /// #
262 /// # #[tokio::main]
263 /// # async fn main() {
264 /// let timer = TimerWheel::with_defaults();
265 /// let delays: Vec<Duration> = (0..3)
266 /// .map(|_| Duration::from_secs(1))
267 /// .collect();
268 /// let tasks = TimerWheel::create_batch(delays);
269 /// let batch = timer.register_batch(tasks);
270 ///
271 /// // Get all completion receivers
272 /// // 获取所有完成通知接收器
273 /// let receivers = batch.into_completion_receivers();
274 /// for rx in receivers {
275 /// tokio::spawn(async move {
276 /// if rx.await.is_ok() {
277 /// println!("A timer completed!");
278 /// }
279 /// });
280 /// }
281 /// # }
282 /// ```
283 pub fn into_completion_receivers(self) -> Vec<oneshot::Receiver<TaskCompletionReason>> {
284 self.completion_rxs
285 }
286}
287
288/// Implement IntoIterator to allow direct iteration over BatchHandle
289///
290/// 实现 IntoIterator 以允许直接迭代 BatchHandle
291///
292/// # Examples (示例)
293/// ```no_run
294/// # use kestrel_timer::{TimerWheel, CallbackWrapper};
295/// # use std::time::Duration;
296/// #
297/// # #[tokio::main]
298/// # async fn main() {
299/// let timer = TimerWheel::with_defaults();
300/// let delays: Vec<Duration> = (0..3)
301/// .map(|_| Duration::from_secs(1))
302/// .collect();
303/// let tasks = TimerWheel::create_batch(delays);
304/// let batch = timer.register_batch(tasks);
305///
306/// // Iterate directly, each element is an independent TimerHandle
307/// // 直接迭代,每个元素是一个独立的 TimerHandle
308/// for handle in batch {
309/// // Can operate each handle individually
310/// // 可以单独操作每个句柄
311/// }
312/// # }
313/// ```
314impl IntoIterator for BatchHandle {
315 type Item = TimerHandle;
316 type IntoIter = BatchHandleIter;
317
318 fn into_iter(self) -> Self::IntoIter {
319 BatchHandleIter {
320 task_ids: self.task_ids.into_iter(),
321 completion_rxs: self.completion_rxs.into_iter(),
322 wheel: self.wheel,
323 }
324 }
325}
326
327/// Iterator for BatchHandle
328///
329/// BatchHandle 的迭代器
330pub struct BatchHandleIter {
331 task_ids: std::vec::IntoIter<TaskId>,
332 completion_rxs: std::vec::IntoIter<oneshot::Receiver<TaskCompletionReason>>,
333 wheel: Arc<Mutex<Wheel>>,
334}
335
336impl Iterator for BatchHandleIter {
337 type Item = TimerHandle;
338
339 fn next(&mut self) -> Option<Self::Item> {
340 match (self.task_ids.next(), self.completion_rxs.next()) {
341 (Some(task_id), Some(rx)) => {
342 Some(TimerHandle::new(task_id, self.wheel.clone(), rx))
343 }
344 _ => None,
345 }
346 }
347
348 fn size_hint(&self) -> (usize, Option<usize>) {
349 self.task_ids.size_hint()
350 }
351}
352
353impl ExactSizeIterator for BatchHandleIter {
354 fn len(&self) -> usize {
355 self.task_ids.len()
356 }
357}
358
359/// Timing Wheel Timer Manager
360///
361/// 时间轮定时器管理器
362pub struct TimerWheel {
363 /// Timing wheel instance, wrapped in Arc<Mutex> for multi-threaded access
364 ///
365 /// 时间轮实例,包装在 Arc<Mutex> 中以支持多线程访问
366 wheel: Arc<Mutex<Wheel>>,
367
368 /// Background tick loop task handle
369 ///
370 /// 后台 tick 循环任务句柄
371 tick_handle: Option<JoinHandle<()>>,
372}
373
374impl TimerWheel {
375 /// Create a new timer manager
376 ///
377 /// # Parameters
378 /// - `config`: Timing wheel configuration, already validated
379 ///
380 /// 创建新的定时器管理器
381 ///
382 /// # 参数
383 /// - `config`: 时间轮配置,已验证
384 ///
385 /// # Examples (示例)
386 /// ```no_run
387 /// use kestrel_timer::{TimerWheel, WheelConfig, TimerTask, BatchConfig};
388 /// use std::time::Duration;
389 ///
390 /// #[tokio::main]
391 /// async fn main() {
392 /// let config = WheelConfig::builder()
393 /// .l0_tick_duration(Duration::from_millis(10))
394 /// .l0_slot_count(512)
395 /// .l1_tick_duration(Duration::from_secs(1))
396 /// .l1_slot_count(64)
397 /// .build()
398 /// .unwrap();
399 /// let timer = TimerWheel::new(config, BatchConfig::default());
400 ///
401 /// // Use two-step API
402 /// // (Use two-step API)
403 /// let task = TimerWheel::create_task(Duration::from_secs(1), None);
404 /// let handle = timer.register(task);
405 /// }
406 /// ```
407 pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
408 let tick_duration = config.l0_tick_duration;
409 let wheel = Wheel::new(config, batch_config);
410 let wheel = Arc::new(Mutex::new(wheel));
411 let wheel_clone = wheel.clone();
412
413 // Start background tick loop
414 // 启动后台 tick 循环
415 let tick_handle = tokio::spawn(async move {
416 Self::tick_loop(wheel_clone, tick_duration).await;
417 });
418
419 Self {
420 wheel,
421 tick_handle: Some(tick_handle),
422 }
423 }
424
425 /// Create timer manager with default configuration, hierarchical mode
426 /// - L0 layer tick duration: 10ms, slot count: 512
427 /// - L1 layer tick duration: 1s, slot count: 64
428 ///
429 /// # Parameters
430 /// - `config`: Timing wheel configuration, already validated
431 ///
432 /// # Returns
433 /// Timer manager instance
434 ///
435 /// 使用默认配置创建定时器管理器,分层模式
436 /// - L0 层 tick 持续时间:10ms,槽数量:512
437 /// - L1 层 tick 持续时间:1s,槽数量:64
438 ///
439 /// # 参数
440 /// - `config`: 时间轮配置,已验证
441 ///
442 /// # 返回值
443 /// 定时器管理器实例
444 ///
445 /// # Examples (示例)
446 /// ```no_run
447 /// use kestrel_timer::TimerWheel;
448 ///
449 /// #[tokio::main]
450 /// async fn main() {
451 /// let timer = TimerWheel::with_defaults();
452 /// }
453 /// ```
454 pub fn with_defaults() -> Self {
455 Self::new(WheelConfig::default(), BatchConfig::default())
456 }
457
458 /// Create TimerService bound to this timing wheel with default configuration
459 ///
460 /// # Parameters
461 /// - `service_config`: Service configuration
462 ///
463 /// # Returns
464 /// TimerService instance bound to this timing wheel
465 ///
466 /// 创建绑定到此时间轮的 TimerService,使用默认配置
467 ///
468 /// # 参数
469 /// - `service_config`: 服务配置
470 ///
471 /// # 返回值
472 /// 绑定到此时间轮的 TimerService 实例
473 ///
474 /// # Examples (示例)
475 /// ```no_run
476 /// use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, ServiceConfig};
477 /// use std::time::Duration;
478 ///
479 ///
480 /// #[tokio::main]
481 /// async fn main() {
482 /// let timer = TimerWheel::with_defaults();
483 /// let mut service = timer.create_service(ServiceConfig::default());
484 ///
485 /// // Use two-step API to batch schedule timers through service
486 /// // 使用两步 API 通过服务批量调度定时器
487 /// let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
488 /// .map(|_| (Duration::from_millis(100), Some(CallbackWrapper::new(|| async {}))))
489 /// .collect();
490 /// let tasks = TimerService::create_batch_with_callbacks(callbacks);
491 /// service.register_batch(tasks).unwrap();
492 ///
493 /// // Receive timeout notifications
494 /// // 接收超时通知
495 /// let mut rx = service.take_receiver().unwrap();
496 /// while let Some(task_id) = rx.recv().await {
497 /// println!("Task {:?} completed", task_id);
498 /// }
499 /// }
500 /// ```
501 pub fn create_service(&self, service_config: ServiceConfig) -> crate::service::TimerService {
502 crate::service::TimerService::new(self.wheel.clone(), service_config)
503 }
504
505 /// Create TimerService bound to this timing wheel with custom configuration
506 ///
507 /// # Parameters
508 /// - `config`: Service configuration
509 ///
510 /// # Returns
511 /// TimerService instance bound to this timing wheel
512 ///
513 /// 创建绑定到此时间轮的 TimerService,使用自定义配置
514 ///
515 /// # 参数
516 /// - `config`: 服务配置
517 ///
518 /// # 返回值
519 /// 绑定到此时间轮的 TimerService 实例
520 ///
521 /// # Examples (示例)
522 /// ```no_run
523 /// use kestrel_timer::{TimerWheel, ServiceConfig};
524 ///
525 /// #[tokio::main]
526 /// async fn main() {
527 /// let timer = TimerWheel::with_defaults();
528 /// let config = ServiceConfig::builder()
529 /// .command_channel_capacity(1024)
530 /// .timeout_channel_capacity(2000)
531 /// .build()
532 /// .unwrap();
533 /// let service = timer.create_service_with_config(config);
534 /// }
535 /// ```
536 pub fn create_service_with_config(&self, config: ServiceConfig) -> crate::service::TimerService {
537 crate::service::TimerService::new(self.wheel.clone(), config)
538 }
539
540 /// Create timer task (static method, apply stage)
541 ///
542 /// # Parameters
543 /// - `delay`: Delay duration
544 /// - `callback`: Callback object implementing TimerCallback trait
545 ///
546 /// # Returns
547 /// Return TimerTask, needs to be registered through `register()`
548 ///
549 /// 创建定时器任务 (静态方法,应用阶段)
550 ///
551 /// # 参数
552 /// - `delay`: 延迟时间
553 /// - `callback`: 回调对象,实现 TimerCallback 特质
554 ///
555 /// # 返回值
556 /// 返回 TimerTask,需要通过 `register()` 注册
557 ///
558 /// # Examples (示例)
559 /// ```no_run
560 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
561 /// use std::time::Duration;
562 ///
563 ///
564 /// #[tokio::main]
565 /// async fn main() {
566 /// let timer = TimerWheel::with_defaults();
567 ///
568 /// // Step 1: Create task
569 /// // 创建任务
570 /// let task = TimerWheel::create_task(Duration::from_secs(1), Some(CallbackWrapper::new(|| async {
571 /// println!("Timer fired!");
572 /// })));
573 ///
574 /// // Get task ID (获取任务 ID)
575 /// let task_id = task.get_id();
576 /// println!("Created task: {:?}", task_id);
577 ///
578 /// // Step 2: Register task
579 /// // 注册任务
580 /// let handle = timer.register(task);
581 /// }
582 /// ```
583 #[inline]
584 pub fn create_task(delay: Duration, callback: Option<CallbackWrapper>) -> crate::task::TimerTask {
585 crate::task::TimerTask::new(delay, callback)
586 }
587
588 /// Create batch of timer tasks (static method, apply stage, no callbacks)
589 ///
590 /// # Parameters
591 /// - `delays`: List of delay times
592 ///
593 /// # Returns
594 /// Return TimerTask list, needs to be registered through `register_batch()`
595 ///
596 /// 创建定时器任务 (静态方法,应用阶段,没有回调)
597 ///
598 /// # 参数
599 /// - `delays`: 延迟时间列表
600 ///
601 /// # 返回值
602 /// 返回 TimerTask 列表,需要通过 `register_batch()` 注册
603 ///
604 /// # Examples (示例)
605 /// ```no_run
606 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
607 /// use std::time::Duration;
608 /// use std::sync::Arc;
609 /// use std::sync::atomic::{AtomicU32, Ordering};
610 ///
611 /// #[tokio::main]
612 /// async fn main() {
613 /// let timer = TimerWheel::with_defaults();
614 /// let counter = Arc::new(AtomicU32::new(0));
615 ///
616 /// // Step 1: Create batch of tasks
617 /// // 创建批量任务
618 /// let delays: Vec<Duration> = (0..3)
619 /// .map(|_| Duration::from_millis(100))
620 /// .collect();
621 ///
622 /// // Create batch of tasks
623 /// let tasks = TimerWheel::create_batch(delays);
624 /// println!("Created {} tasks", tasks.len());
625 ///
626 /// // Step 2: Register batch of tasks
627 /// // 注册批量任务
628 /// let batch = timer.register_batch(tasks);
629 /// }
630 /// ```
631 #[inline]
632 pub fn create_batch(delays: Vec<Duration>) -> Vec<crate::task::TimerTask>
633 {
634 delays
635 .into_iter()
636 .map(|delay| crate::task::TimerTask::new(delay, None))
637 .collect()
638 }
639
640 /// Create batch of timer tasks (static method, apply stage, with callbacks)
641 ///
642 /// # Parameters
643 /// - `callbacks`: List of tuples of (delay time, callback)
644 ///
645 /// # Returns
646 /// Return TimerTask list, needs to be registered through `register_batch()`
647 ///
648 /// 创建定时器任务 (静态方法,应用阶段,有回调)
649 ///
650 /// # 参数
651 /// - `callbacks`: (延迟时间, 回调) 元组列表
652 ///
653 /// # 返回值
654 /// 返回 TimerTask 列表,需要通过 `register_batch()` 注册
655 ///
656 /// # Examples (示例)
657 /// ```no_run
658 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
659 /// use std::time::Duration;
660 /// use std::sync::Arc;
661 /// use std::sync::atomic::{AtomicU32, Ordering};
662 ///
663 /// #[tokio::main]
664 /// async fn main() {
665 /// let timer = TimerWheel::with_defaults();
666 /// let counter = Arc::new(AtomicU32::new(0));
667 ///
668 /// // Step 1: Create batch of tasks
669 /// // 创建批量任务
670 /// let delays: Vec<Duration> = (0..3)
671 /// .map(|_| Duration::from_millis(100))
672 /// .collect();
673 /// let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = delays
674 /// .into_iter()
675 /// .map(|delay| {
676 /// let counter = Arc::clone(&counter);
677 /// let callback = Some(CallbackWrapper::new(move || {
678 /// let counter = Arc::clone(&counter);
679 /// async move {
680 /// counter.fetch_add(1, Ordering::SeqCst);
681 /// }
682 /// }));
683 /// (delay, callback)
684 /// })
685 /// .collect();
686 ///
687 /// // Create batch of tasks
688 /// let tasks = TimerWheel::create_batch_with_callbacks(callbacks);
689 /// println!("Created {} tasks", tasks.len());
690 ///
691 /// // Step 2: Register batch of tasks
692 /// // 注册批量任务
693 /// let batch = timer.register_batch(tasks);
694 /// }
695 /// ```
696 #[inline]
697 pub fn create_batch_with_callbacks(callbacks: Vec<(Duration, Option<CallbackWrapper>)>) -> Vec<crate::task::TimerTask>
698 {
699 callbacks
700 .into_iter()
701 .map(|(delay, callback)| crate::task::TimerTask::new(delay, callback))
702 .collect()
703 }
704
705 /// Register timer task to timing wheel (registration phase)
706 ///
707 /// # Parameters
708 /// - `task`: Task created via `create_task()`
709 ///
710 /// # Returns
711 /// Return timer handle that can be used to cancel timer and receive completion notifications
712 ///
713 /// 注册定时器任务到时间轮 (注册阶段)
714 ///
715 /// # 参数
716 /// - `task`: 通过 `create_task()` 创建的任务
717 ///
718 /// # 返回值
719 /// 返回定时器句柄,可用于取消定时器和接收完成通知
720 ///
721 /// # Examples (示例)
722 /// ```no_run
723 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
724 ///
725 /// use std::time::Duration;
726 ///
727 /// #[tokio::main]
728 /// async fn main() {
729 /// let timer = TimerWheel::with_defaults();
730 ///
731 /// let task = TimerWheel::create_task(Duration::from_secs(1), Some(CallbackWrapper::new(|| async {
732 /// println!("Timer fired!");
733 /// })));
734 /// let task_id = task.get_id();
735 ///
736 /// // Register task
737 /// // 注册任务
738 /// let handle = timer.register(task);
739 ///
740 /// // Wait for timer completion
741 /// // 等待定时器完成
742 /// handle.into_completion_receiver().0.await.ok();
743 /// }
744 /// ```
745 #[inline]
746 pub fn register(&self, task: crate::task::TimerTask) -> TimerHandle {
747 let (completion_tx, completion_rx) = oneshot::channel();
748 let notifier = crate::task::CompletionNotifier(completion_tx);
749
750 let task_id = task.id;
751
752 // 单次加锁完成所有操作
753 {
754 let mut wheel_guard = self.wheel.lock();
755 wheel_guard.insert(task, notifier);
756 }
757
758 TimerHandle::new(task_id, self.wheel.clone(), completion_rx)
759 }
760
761 /// Batch register timer tasks to timing wheel (registration phase)
762 ///
763 /// # Parameters
764 /// - `tasks`: List of tasks created via `create_batch()`
765 ///
766 /// # Returns
767 /// Return batch timer handle
768 ///
769 /// 批量注册定时器任务到时间轮 (注册阶段)
770 ///
771 /// # 参数
772 /// - `tasks`: 通过 `create_batch()` 创建的任务列表
773 ///
774 /// # 返回值
775 /// 返回批量定时器句柄
776 ///
777 /// # Examples (示例)
778 /// ```no_run
779 /// use kestrel_timer::{TimerWheel, TimerTask};
780 /// use std::time::Duration;
781 ///
782 /// #[tokio::main]
783 /// async fn main() {
784 /// let timer = TimerWheel::with_defaults();
785 ///
786 /// let delays: Vec<Duration> = (0..3)
787 /// .map(|_| Duration::from_secs(1))
788 /// .collect();
789 /// let tasks = TimerWheel::create_batch(delays);
790 ///
791 /// let batch = timer.register_batch(tasks);
792 /// println!("Registered {} timers", batch.len());
793 /// }
794 /// ```
795 #[inline]
796 pub fn register_batch(&self, tasks: Vec<crate::task::TimerTask>) -> BatchHandle {
797 let task_count = tasks.len();
798 let mut completion_rxs = Vec::with_capacity(task_count);
799 let mut task_ids = Vec::with_capacity(task_count);
800 let mut prepared_tasks = Vec::with_capacity(task_count);
801
802 // Step 1: Prepare all channels and notifiers
803 for task in tasks {
804 let (completion_tx, completion_rx) = oneshot::channel();
805 let notifier = crate::task::CompletionNotifier(completion_tx);
806
807 task_ids.push(task.id);
808 completion_rxs.push(completion_rx);
809 prepared_tasks.push((task, notifier));
810 }
811
812 // Step 2: Single lock, batch insert
813 {
814 let mut wheel_guard = self.wheel.lock();
815 wheel_guard.insert_batch(prepared_tasks);
816 }
817
818 BatchHandle::new(task_ids, self.wheel.clone(), completion_rxs)
819 }
820
821 /// Cancel timer
822 ///
823 /// # Parameters
824 /// - `task_id`: Task ID
825 ///
826 /// # Returns
827 /// Returns true if task exists and is successfully cancelled, otherwise false
828 ///
829 /// 取消定时器
830 ///
831 /// # 参数
832 /// - `task_id`: 任务 ID
833 ///
834 /// # 返回值
835 /// 如果任务存在且成功取消则返回 true,否则返回 false
836 ///
837 /// # Examples (示例)
838 /// ```no_run
839 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
840 ///
841 /// use std::time::Duration;
842 ///
843 /// #[tokio::main]
844 /// async fn main() {
845 /// let timer = TimerWheel::with_defaults();
846 ///
847 /// let task = TimerWheel::create_task(Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
848 /// println!("Timer fired!");
849 /// })));
850 /// let task_id = task.get_id();
851 /// let _handle = timer.register(task);
852 ///
853 /// // Cancel task using task ID
854 /// // 使用任务 ID 取消任务
855 /// let cancelled = timer.cancel(task_id);
856 /// println!("Canceled successfully: {}", cancelled);
857 /// }
858 /// ```
859 #[inline]
860 pub fn cancel(&self, task_id: TaskId) -> bool {
861 let mut wheel = self.wheel.lock();
862 wheel.cancel(task_id)
863 }
864
865 /// Batch cancel timers
866 ///
867 /// # Parameters
868 /// - `task_ids`: List of task IDs to cancel
869 ///
870 /// # Returns
871 /// Number of successfully cancelled tasks
872 ///
873 /// 批量取消定时器
874 ///
875 /// # 参数
876 /// - `task_ids`: 要取消的任务 ID 列表
877 ///
878 /// # 返回值
879 /// 成功取消的任务数量
880 ///
881 /// # Performance Advantages
882 /// - Batch processing reduces lock contention
883 /// - Internally optimized batch cancellation operation
884 ///
885 /// # Examples (示例)
886 /// ```no_run
887 /// use kestrel_timer::{TimerWheel, TimerTask};
888 /// use std::time::Duration;
889 ///
890 /// #[tokio::main]
891 /// async fn main() {
892 /// let timer = TimerWheel::with_defaults();
893 ///
894 /// // Create multiple timers
895 /// // 创建多个定时器
896 /// let task1 = TimerWheel::create_task(Duration::from_secs(10), None);
897 /// let task2 = TimerWheel::create_task(Duration::from_secs(10), None);
898 /// let task3 = TimerWheel::create_task(Duration::from_secs(10), None);
899 ///
900 /// let task_ids = vec![task1.get_id(), task2.get_id(), task3.get_id()];
901 ///
902 /// let _h1 = timer.register(task1);
903 /// let _h2 = timer.register(task2);
904 /// let _h3 = timer.register(task3);
905 ///
906 /// // Batch cancel
907 /// // 批量取消
908 /// let cancelled = timer.cancel_batch(&task_ids);
909 /// println!("Canceled {} timers", cancelled);
910 /// }
911 /// ```
912 #[inline]
913 pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
914 let mut wheel = self.wheel.lock();
915 wheel.cancel_batch(task_ids)
916 }
917
918 /// Postpone timer
919 ///
920 /// # Parameters
921 /// - `task_id`: Task ID to postpone
922 /// - `new_delay`: New delay duration, recalculated from current time
923 /// - `callback`: New callback function, pass `None` to keep original callback, pass `Some` to replace with new callback
924 ///
925 /// # Returns
926 /// Returns true if task exists and is successfully postponed, otherwise false
927 ///
928 /// 推迟定时器
929 ///
930 /// # 参数
931 /// - `task_id`: 要推迟的任务 ID
932 /// - `new_delay`: 新的延迟时间,从当前时间重新计算
933 /// - `callback`: 新的回调函数,传递 `None` 保持原始回调,传递 `Some` 替换为新的回调
934 ///
935 /// # 返回值
936 /// 如果任务存在且成功推迟则返回 true,否则返回 false
937 ///
938 /// # Note
939 /// - Task ID remains unchanged after postponement
940 /// - Original completion_receiver remains valid
941 ///
942 /// # 注意
943 /// - 任务 ID 在推迟后保持不变
944 /// - 原始 completion_receiver 保持有效
945 ///
946 /// # Examples (示例)
947 ///
948 /// ## Keep original callback (保持原始回调)
949 /// ```no_run
950 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
951 /// use std::time::Duration;
952 ///
953 ///
954 /// #[tokio::main]
955 /// async fn main() {
956 /// let timer = TimerWheel::with_defaults();
957 ///
958 /// let task = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
959 /// println!("Timer fired!");
960 /// })));
961 /// let task_id = task.get_id();
962 /// let _handle = timer.register(task);
963 ///
964 /// // Postpone to 10 seconds after triggering, and keep original callback
965 /// // 推迟到 10 秒后触发,并保持原始回调
966 /// let success = timer.postpone(task_id, Duration::from_secs(10), None);
967 /// println!("Postponed successfully: {}", success);
968 /// }
969 /// ```
970 ///
971 /// ## Replace with new callback (替换为新的回调)
972 /// ```no_run
973 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
974 /// use std::time::Duration;
975 ///
976 /// #[tokio::main]
977 /// async fn main() {
978 /// let timer = TimerWheel::with_defaults();
979 ///
980 /// let task = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
981 /// println!("Original callback!");
982 /// })));
983 /// let task_id = task.get_id();
984 /// let _handle = timer.register(task);
985 ///
986 /// // Postpone to 10 seconds after triggering, and replace with new callback
987 /// // 推迟到 10 秒后触发,并替换为新的回调
988 /// let success = timer.postpone(task_id, Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
989 /// println!("New callback!");
990 /// })));
991 /// println!("Postponed successfully: {}", success);
992 /// }
993 /// ```
994 #[inline]
995 pub fn postpone(
996 &self,
997 task_id: TaskId,
998 new_delay: Duration,
999 callback: Option<CallbackWrapper>,
1000 ) -> bool {
1001 let mut wheel = self.wheel.lock();
1002 wheel.postpone(task_id, new_delay, callback)
1003 }
1004
1005 /// Batch postpone timers (keep original callbacks)
1006 ///
1007 /// # Parameters
1008 /// - `updates`: List of tuples of (task ID, new delay)
1009 ///
1010 /// # Returns
1011 /// Number of successfully postponed tasks
1012 ///
1013 /// 批量推迟定时器 (保持原始回调)
1014 ///
1015 /// # 参数
1016 /// - `updates`: (任务 ID, 新延迟) 元组列表
1017 ///
1018 /// # 返回值
1019 /// 成功推迟的任务数量
1020 ///
1021 /// # Note
1022 /// - This method keeps all tasks' original callbacks unchanged
1023 /// - Use `postpone_batch_with_callbacks` if you need to replace callbacks
1024 ///
1025 /// # 注意
1026 /// - 此方法保持所有任务的原始回调不变
1027 /// - 如果需要替换回调,请使用 `postpone_batch_with_callbacks`
1028 ///
1029 /// # Performance Advantages
1030 /// - Batch processing reduces lock contention
1031 /// - Internally optimized batch postponement operation
1032 ///
1033 /// # Examples (示例)
1034 /// ```no_run
1035 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
1036 /// use std::time::Duration;
1037 ///
1038 /// #[tokio::main]
1039 /// async fn main() {
1040 /// let timer = TimerWheel::with_defaults();
1041 ///
1042 /// // Create multiple tasks with callbacks
1043 /// // 创建多个带有回调的任务
1044 /// let task1 = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
1045 /// println!("Task 1 fired!");
1046 /// })));
1047 /// let task2 = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
1048 /// println!("Task 2 fired!");
1049 /// })));
1050 /// let task3 = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
1051 /// println!("Task 3 fired!");
1052 /// })));
1053 ///
1054 /// let task_ids = vec![
1055 /// (task1.get_id(), Duration::from_secs(10)),
1056 /// (task2.get_id(), Duration::from_secs(15)),
1057 /// (task3.get_id(), Duration::from_secs(20)),
1058 /// ];
1059 ///
1060 /// timer.register(task1);
1061 /// timer.register(task2);
1062 /// timer.register(task3);
1063 ///
1064 /// // Batch postpone (keep original callbacks)
1065 /// // 批量推迟 (保持原始回调)
1066 /// let postponed = timer.postpone_batch(task_ids);
1067 /// println!("Postponed {} timers", postponed);
1068 /// }
1069 /// ```
1070 #[inline]
1071 pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize {
1072 let mut wheel = self.wheel.lock();
1073 wheel.postpone_batch(updates)
1074 }
1075
1076 /// Batch postpone timers (replace callbacks)
1077 ///
1078 /// # Parameters
1079 /// - `updates`: List of tuples of (task ID, new delay, new callback)
1080 ///
1081 /// # Returns
1082 /// Number of successfully postponed tasks
1083 ///
1084 /// 批量推迟定时器 (替换回调)
1085 ///
1086 /// # 参数
1087 /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
1088 ///
1089 /// # 返回值
1090 /// 成功推迟的任务数量
1091 ///
1092 /// # Performance Advantages
1093 /// - Batch processing reduces lock contention
1094 /// - Internally optimized batch postponement operation
1095 ///
1096 /// # Examples (示例)
1097 /// ```no_run
1098 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
1099 /// use std::time::Duration;
1100 /// use std::sync::Arc;
1101 /// use std::sync::atomic::{AtomicU32, Ordering};
1102 ///
1103 /// #[tokio::main]
1104 /// async fn main() {
1105 /// let timer = TimerWheel::with_defaults();
1106 /// let counter = Arc::new(AtomicU32::new(0));
1107 ///
1108 /// // Create multiple timers
1109 /// // 创建多个定时器
1110 /// let task1 = TimerWheel::create_task(Duration::from_secs(5), None);
1111 /// let task2 = TimerWheel::create_task(Duration::from_secs(5), None);
1112 ///
1113 /// let id1 = task1.get_id();
1114 /// let id2 = task2.get_id();
1115 ///
1116 /// timer.register(task1);
1117 /// timer.register(task2);
1118 ///
1119 /// // Batch postpone and replace callbacks
1120 /// // 批量推迟并替换回调
1121 /// let updates: Vec<_> = vec![id1, id2]
1122 /// .into_iter()
1123 /// .map(|id| {
1124 /// let counter = Arc::clone(&counter);
1125 /// (id, Duration::from_secs(10), Some(CallbackWrapper::new(move || {
1126 /// let counter = Arc::clone(&counter);
1127 /// async move { counter.fetch_add(1, Ordering::SeqCst); }
1128 /// })))
1129 /// })
1130 /// .collect();
1131 /// let postponed = timer.postpone_batch_with_callbacks(updates);
1132 /// println!("Postponed {} timers", postponed);
1133 /// }
1134 /// ```
1135 #[inline]
1136 pub fn postpone_batch_with_callbacks(
1137 &self,
1138 updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
1139 ) -> usize {
1140 let mut wheel = self.wheel.lock();
1141 wheel.postpone_batch_with_callbacks(updates.to_vec())
1142 }
1143
1144 /// Core tick loop
1145 async fn tick_loop(wheel: Arc<Mutex<Wheel>>, tick_duration: Duration) {
1146 let mut interval = tokio::time::interval(tick_duration);
1147 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1148
1149 loop {
1150 interval.tick().await;
1151
1152 // Advance timing wheel and get expired tasks
1153 let expired_tasks = {
1154 let mut wheel_guard = wheel.lock();
1155 wheel_guard.advance()
1156 };
1157
1158 // Execute expired tasks
1159 for task in expired_tasks {
1160 let callback = task.get_callback();
1161
1162 // Move task ownership to get completion_notifier
1163 let notifier = task.completion_notifier;
1164
1165 // Only registered tasks have notifier
1166 if let Some(notifier) = notifier {
1167 // Execute callback in a separate tokio task, and send notification after callback completion
1168 if let Some(callback) = callback {
1169 tokio::spawn(async move {
1170 // Execute callback
1171 let future = callback.call();
1172 future.await;
1173
1174 // Send notification after callback completion
1175 let _ = notifier.0.send(TaskCompletionReason::Expired);
1176 });
1177 } else {
1178 // Send notification immediately if no callback
1179 let _ = notifier.0.send(TaskCompletionReason::Expired);
1180 }
1181 }
1182 }
1183 }
1184 }
1185
1186 /// Graceful shutdown of TimerWheel
1187 ///
1188 /// 优雅关闭 TimerWheel
1189 ///
1190 /// # Examples (示例)
1191 /// ```no_run
1192 /// # use kestrel_timer::TimerWheel;
1193 /// # #[tokio::main]
1194 /// # async fn main() {
1195 /// let timer = TimerWheel::with_defaults();
1196 ///
1197 /// // Use timer... (使用定时器...)
1198 ///
1199 /// timer.shutdown().await;
1200 /// # }
1201 /// ```
1202 pub async fn shutdown(mut self) {
1203 if let Some(handle) = self.tick_handle.take() {
1204 handle.abort();
1205 let _ = handle.await;
1206 }
1207 }
1208}
1209
1210impl Drop for TimerWheel {
1211 fn drop(&mut self) {
1212 if let Some(handle) = self.tick_handle.take() {
1213 handle.abort();
1214 }
1215 }
1216}
1217
1218#[cfg(test)]
1219mod tests {
1220 use super::*;
1221 use std::sync::atomic::{AtomicU32, Ordering};
1222
1223 #[tokio::test]
1224 async fn test_timer_creation() {
1225 let _timer = TimerWheel::with_defaults();
1226 }
1227
1228 #[tokio::test]
1229 async fn test_schedule_once() {
1230 use std::sync::Arc;
1231 let timer = TimerWheel::with_defaults();
1232 let counter = Arc::new(AtomicU32::new(0));
1233 let counter_clone = Arc::clone(&counter);
1234
1235 let task = TimerWheel::create_task(
1236 Duration::from_millis(50),
1237 Some(CallbackWrapper::new(move || {
1238 let counter = Arc::clone(&counter_clone);
1239 async move {
1240 counter.fetch_add(1, Ordering::SeqCst);
1241 }
1242 })),
1243 );
1244 let _handle = timer.register(task);
1245
1246 // Wait for timer to trigger
1247 // 等待定时器触发
1248 tokio::time::sleep(Duration::from_millis(100)).await;
1249 assert_eq!(counter.load(Ordering::SeqCst), 1);
1250 }
1251
1252 #[tokio::test]
1253 async fn test_cancel_timer() {
1254 use std::sync::Arc;
1255 let timer = TimerWheel::with_defaults();
1256 let counter = Arc::new(AtomicU32::new(0));
1257 let counter_clone = Arc::clone(&counter);
1258
1259 let task = TimerWheel::create_task(
1260 Duration::from_millis(100),
1261 Some(CallbackWrapper::new(move || {
1262 let counter = Arc::clone(&counter_clone);
1263 async move {
1264 counter.fetch_add(1, Ordering::SeqCst);
1265 }
1266 })),
1267 );
1268 let handle = timer.register(task);
1269
1270 // Immediately cancel
1271 // 立即取消
1272 let cancel_result = handle.cancel();
1273 assert!(cancel_result);
1274
1275 // Wait for enough time to ensure timer does not trigger
1276 // 等待足够时间确保定时器不触发
1277 tokio::time::sleep(Duration::from_millis(200)).await;
1278 assert_eq!(counter.load(Ordering::SeqCst), 0);
1279 }
1280
1281 #[tokio::test]
1282 async fn test_cancel_immediate() {
1283 use std::sync::Arc;
1284 let timer = TimerWheel::with_defaults();
1285 let counter = Arc::new(AtomicU32::new(0));
1286 let counter_clone = Arc::clone(&counter);
1287
1288 let task = TimerWheel::create_task(
1289 Duration::from_millis(100),
1290 Some(CallbackWrapper::new(move || {
1291 let counter = Arc::clone(&counter_clone);
1292 async move {
1293 counter.fetch_add(1, Ordering::SeqCst);
1294 }
1295 })),
1296 );
1297 let handle = timer.register(task);
1298
1299 // Immediately cancel
1300 // 立即取消
1301 let cancel_result = handle.cancel();
1302 assert!(cancel_result);
1303
1304 // Wait for enough time to ensure timer does not trigger
1305 // 等待足够时间确保定时器不触发
1306 tokio::time::sleep(Duration::from_millis(200)).await;
1307 assert_eq!(counter.load(Ordering::SeqCst), 0);
1308 }
1309
1310 #[tokio::test]
1311 async fn test_postpone_timer() {
1312 use std::sync::Arc;
1313 let timer = TimerWheel::with_defaults();
1314 let counter = Arc::new(AtomicU32::new(0));
1315 let counter_clone = Arc::clone(&counter);
1316
1317 let task = TimerWheel::create_task(
1318 Duration::from_millis(50),
1319 Some(CallbackWrapper::new(move || {
1320 let counter = Arc::clone(&counter_clone);
1321 async move {
1322 counter.fetch_add(1, Ordering::SeqCst);
1323 }
1324 })),
1325 );
1326 let task_id = task.get_id();
1327 let handle = timer.register(task);
1328
1329 // Postpone task to 150ms
1330 // 推迟任务到 150ms
1331 let postponed = timer.postpone(task_id, Duration::from_millis(150), None);
1332 assert!(postponed);
1333
1334 // Wait for original time 50ms, task should not trigger
1335 // 等待原始时间 50ms,任务不应触发
1336 tokio::time::sleep(Duration::from_millis(70)).await;
1337 assert_eq!(counter.load(Ordering::SeqCst), 0);
1338
1339 // Wait for new trigger time (from postponed start, need to wait about 150ms)
1340 // 等待新的触发时间(从推迟开始算起,大约需要等待 150ms)
1341 let result = tokio::time::timeout(
1342 Duration::from_millis(200),
1343 handle.into_completion_receiver().0
1344 ).await;
1345 assert!(result.is_ok());
1346
1347 // Wait for callback to execute
1348 // 等待回调执行
1349 tokio::time::sleep(Duration::from_millis(20)).await;
1350 assert_eq!(counter.load(Ordering::SeqCst), 1);
1351 }
1352
1353 #[tokio::test]
1354 async fn test_postpone_with_callback() {
1355 use std::sync::Arc;
1356 let timer = TimerWheel::with_defaults();
1357 let counter = Arc::new(AtomicU32::new(0));
1358 let counter_clone1 = Arc::clone(&counter);
1359 let counter_clone2 = Arc::clone(&counter);
1360
1361 // Create task, original callback adds 1
1362 let task = TimerWheel::create_task(
1363 Duration::from_millis(50),
1364 Some(CallbackWrapper::new(move || {
1365 let counter = Arc::clone(&counter_clone1);
1366 async move {
1367 counter.fetch_add(1, Ordering::SeqCst);
1368 }
1369 })),
1370 );
1371 let task_id = task.get_id();
1372 let handle = timer.register(task);
1373
1374 // Postpone task and replace callback, new callback adds 10
1375 // 推迟任务并替换回调,新回调增加 10
1376 let postponed = timer.postpone(
1377 task_id,
1378 Duration::from_millis(100),
1379 Some(CallbackWrapper::new(move || {
1380 let counter = Arc::clone(&counter_clone2);
1381 async move {
1382 counter.fetch_add(10, Ordering::SeqCst);
1383 }
1384 })),
1385 );
1386 assert!(postponed);
1387
1388 // Wait for task to trigger (after postponed, need to wait 100ms, plus margin)
1389 // 等待任务触发(推迟后,需要等待 100ms,加上余量)
1390 let result = tokio::time::timeout(
1391 Duration::from_millis(200),
1392 handle.into_completion_receiver().0
1393 ).await;
1394 assert!(result.is_ok());
1395
1396 // Wait for callback to execute
1397 // 等待回调执行
1398 tokio::time::sleep(Duration::from_millis(20)).await;
1399
1400 // Verify new callback is executed (increased 10 instead of 1)
1401 // 验证新回调已执行(增加 10 而不是 1)
1402 assert_eq!(counter.load(Ordering::SeqCst), 10);
1403 }
1404
1405 #[tokio::test]
1406 async fn test_postpone_nonexistent_timer() {
1407 let timer = TimerWheel::with_defaults();
1408
1409 // Try to postpone nonexistent task
1410 // 尝试推迟一个不存在的任务
1411 let fake_task = TimerWheel::create_task(Duration::from_millis(50), None);
1412 let fake_task_id = fake_task.get_id();
1413 // Do not register this task
1414 // 不注册这个任务
1415 let postponed = timer.postpone(fake_task_id, Duration::from_millis(100), None);
1416 assert!(!postponed);
1417 }
1418
1419 #[tokio::test]
1420 async fn test_postpone_batch() {
1421 use std::sync::Arc;
1422 let timer = TimerWheel::with_defaults();
1423 let counter = Arc::new(AtomicU32::new(0));
1424
1425 // Create 3 tasks
1426 // 创建 3 个任务
1427 let mut task_ids = Vec::new();
1428 for _ in 0..3 {
1429 let counter_clone = Arc::clone(&counter);
1430 let task = TimerWheel::create_task(
1431 Duration::from_millis(50),
1432 Some(CallbackWrapper::new(move || {
1433 let counter = Arc::clone(&counter_clone);
1434 async move {
1435 counter.fetch_add(1, Ordering::SeqCst);
1436 }
1437 })),
1438 );
1439 task_ids.push((task.get_id(), Duration::from_millis(150)));
1440 timer.register(task);
1441 }
1442
1443 // Batch postpone
1444 // 批量推迟
1445 let postponed = timer.postpone_batch(task_ids);
1446 assert_eq!(postponed, 3);
1447
1448 // Wait for original time 50ms, task should not trigger
1449 // 等待原始时间 50ms,任务不应触发
1450 tokio::time::sleep(Duration::from_millis(70)).await;
1451 assert_eq!(counter.load(Ordering::SeqCst), 0);
1452
1453 // Wait for new trigger time (from postponed start, need to wait about 150ms)
1454 // 等待新的触发时间(从推迟开始算起,大约需要等待 150ms)
1455 tokio::time::sleep(Duration::from_millis(200)).await;
1456
1457 // Wait for callback to execute
1458 // 等待回调执行
1459 tokio::time::sleep(Duration::from_millis(20)).await;
1460 assert_eq!(counter.load(Ordering::SeqCst), 3);
1461 }
1462
1463 #[tokio::test]
1464 async fn test_postpone_batch_with_callbacks() {
1465 use std::sync::Arc;
1466 let timer = TimerWheel::with_defaults();
1467 let counter = Arc::new(AtomicU32::new(0));
1468
1469 // Create 3 tasks
1470 // 创建 3 个任务
1471 let mut task_ids = Vec::new();
1472 for _ in 0..3 {
1473 let task = TimerWheel::create_task(
1474 Duration::from_millis(50),
1475 None
1476 );
1477 task_ids.push(task.get_id());
1478 timer.register(task);
1479 }
1480
1481 // Batch postpone and replace callbacks
1482 // 批量推迟并替换回调
1483 let updates: Vec<_> = task_ids
1484 .into_iter()
1485 .map(|id| {
1486 let counter_clone = Arc::clone(&counter);
1487 (id, Duration::from_millis(150), Some(CallbackWrapper::new(move || {
1488 let counter = Arc::clone(&counter_clone);
1489 async move {
1490 counter.fetch_add(1, Ordering::SeqCst);
1491 }
1492 })))
1493 })
1494 .collect();
1495
1496 // Batch postpone and replace callbacks
1497 // 批量推迟并替换回调
1498 let postponed = timer.postpone_batch_with_callbacks(updates);
1499 assert_eq!(postponed, 3);
1500
1501 // Wait for original time 50ms, task should not trigger
1502 // 等待原始时间 50ms,任务不应触发
1503 tokio::time::sleep(Duration::from_millis(70)).await;
1504 assert_eq!(counter.load(Ordering::SeqCst), 0);
1505
1506 // Wait for new trigger time (from postponed start, need to wait about 150ms)
1507 // 等待新的触发时间(从推迟开始算起,大约需要等待 150ms)
1508 tokio::time::sleep(Duration::from_millis(200)).await;
1509
1510 // Wait for callback to execute
1511 // 等待回调执行
1512 tokio::time::sleep(Duration::from_millis(20)).await;
1513 assert_eq!(counter.load(Ordering::SeqCst), 3);
1514 }
1515
1516 #[tokio::test]
1517 async fn test_postpone_keeps_completion_receiver_valid() {
1518 use std::sync::Arc;
1519 let timer = TimerWheel::with_defaults();
1520 let counter = Arc::new(AtomicU32::new(0));
1521 let counter_clone = Arc::clone(&counter);
1522
1523 let task = TimerWheel::create_task(
1524 Duration::from_millis(50),
1525 Some(CallbackWrapper::new(move || {
1526 let counter = Arc::clone(&counter_clone);
1527 async move {
1528 counter.fetch_add(1, Ordering::SeqCst);
1529 }
1530 })),
1531 );
1532 let task_id = task.get_id();
1533 let handle = timer.register(task);
1534
1535 // Postpone task
1536 // 推迟任务
1537 timer.postpone(task_id, Duration::from_millis(100), None);
1538
1539 // Verify original completion_receiver is still valid (after postponed, need to wait 100ms, plus margin)
1540 // 验证原始完成接收器是否仍然有效(推迟后,需要等待 100ms,加上余量)
1541 let result = tokio::time::timeout(
1542 Duration::from_millis(200),
1543 handle.into_completion_receiver().0
1544 ).await;
1545 assert!(result.is_ok(), "Completion receiver should still work after postpone");
1546
1547 // Wait for callback to execute
1548 tokio::time::sleep(Duration::from_millis(20)).await;
1549 assert_eq!(counter.load(Ordering::SeqCst), 1);
1550 }
1551}
1552