kestrel_timer/timer.rs
1pub mod handle;
2
3use crate::config::{BatchConfig, ServiceConfig, WheelConfig};
4use crate::task::{CallbackWrapper, TaskId};
5use crate::wheel::Wheel;
6use parking_lot::Mutex;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::task::JoinHandle;
10use handle::{TimerHandle, TimerHandleWithCompletion, BatchHandle, BatchHandleWithCompletion};
11
12/// Timing Wheel Timer Manager
13///
14/// 时间轮定时器管理器
15pub struct TimerWheel {
16 /// Timing wheel instance, wrapped in Arc<Mutex> for multi-threaded access
17 ///
18 /// 时间轮实例,包装在 Arc<Mutex> 中以支持多线程访问
19 wheel: Arc<Mutex<Wheel>>,
20
21 /// Background tick loop task handle
22 ///
23 /// 后台 tick 循环任务句柄
24 tick_handle: Option<JoinHandle<()>>,
25}
26
27impl TimerWheel {
28 /// Create a new timer manager
29 ///
30 /// # Parameters
31 /// - `config`: Timing wheel configuration, already validated
32 ///
33 /// 创建新的定时器管理器
34 ///
35 /// # 参数
36 /// - `config`: 时间轮配置,已验证
37 ///
38 /// # Examples (示例)
39 /// ```no_run
40 /// use kestrel_timer::{TimerWheel, config::WheelConfig, TimerTask, config::BatchConfig};
41 /// use std::time::Duration;
42 ///
43 /// #[tokio::main]
44 /// async fn main() {
45 /// let config = WheelConfig::builder()
46 /// .l0_tick_duration(Duration::from_millis(10))
47 /// .l0_slot_count(512)
48 /// .l1_tick_duration(Duration::from_secs(1))
49 /// .l1_slot_count(64)
50 /// .build()
51 /// .unwrap();
52 /// let timer = TimerWheel::new(config, BatchConfig::default());
53 ///
54 /// // Use two-step API
55 /// // (Use two-step API)
56 /// let task = TimerTask::new_oneshot(Duration::from_secs(1), None);
57 /// let handle = timer.register(task);
58 /// }
59 /// ```
60 pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
61 let tick_duration = config.l0_tick_duration;
62 let wheel = Wheel::new(config, batch_config);
63 let wheel = Arc::new(Mutex::new(wheel));
64 let wheel_clone = wheel.clone();
65
66 // Start background tick loop
67 // 启动后台 tick 循环
68 let tick_handle = tokio::spawn(async move {
69 Self::tick_loop(wheel_clone, tick_duration).await;
70 });
71
72 Self {
73 wheel,
74 tick_handle: Some(tick_handle),
75 }
76 }
77
78 /// Create timer manager with default configuration, hierarchical mode
79 /// - L0 layer tick duration: 10ms, slot count: 512
80 /// - L1 layer tick duration: 1s, slot count: 64
81 ///
82 /// # Parameters
83 /// - `config`: Timing wheel configuration, already validated
84 ///
85 /// # Returns
86 /// Timer manager instance
87 ///
88 /// 使用默认配置创建定时器管理器,分层模式
89 /// - L0 层 tick 持续时间:10ms,槽数量:512
90 /// - L1 层 tick 持续时间:1s,槽数量:64
91 ///
92 /// # 参数
93 /// - `config`: 时间轮配置,已验证
94 ///
95 /// # 返回值
96 /// 定时器管理器实例
97 ///
98 /// # Examples (示例)
99 /// ```no_run
100 /// use kestrel_timer::TimerWheel;
101 ///
102 /// #[tokio::main]
103 /// async fn main() {
104 /// let timer = TimerWheel::with_defaults();
105 /// }
106 /// ```
107 pub fn with_defaults() -> Self {
108 Self::new(WheelConfig::default(), BatchConfig::default())
109 }
110
111 /// Create TimerService bound to this timing wheel with default configuration
112 ///
113 /// # Parameters
114 /// - `service_config`: Service configuration
115 ///
116 /// # Returns
117 /// TimerService instance bound to this timing wheel
118 ///
119 /// 创建绑定到此时间轮的 TimerService,使用默认配置
120 ///
121 /// # 参数
122 /// - `service_config`: 服务配置
123 ///
124 /// # 返回值
125 /// 绑定到此时间轮的 TimerService 实例
126 ///
127 /// # Examples (示例)
128 /// ```no_run
129 /// use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
130 /// use std::time::Duration;
131 ///
132 ///
133 /// #[tokio::main]
134 /// async fn main() {
135 /// let timer = TimerWheel::with_defaults();
136 /// let mut service = timer.create_service(ServiceConfig::default());
137 ///
138 /// // Use two-step API to batch schedule timers through service
139 /// // 使用两步 API 通过服务批量调度定时器
140 /// let tasks: Vec<_> = (0..5)
141 /// .map(|_| {
142 /// use kestrel_timer::TimerTask;
143 /// TimerTask::new_oneshot(Duration::from_millis(100), Some(CallbackWrapper::new(|| async {})))
144 /// })
145 /// .collect();
146 /// service.register_batch(tasks).unwrap();
147 ///
148 /// // Receive timeout notifications
149 /// // 接收超时通知
150 /// let mut rx = service.take_receiver().unwrap();
151 /// while let Some(task_id) = rx.recv().await {
152 /// println!("Task {:?} completed", task_id);
153 /// }
154 /// }
155 /// ```
156 pub fn create_service(&self, service_config: ServiceConfig) -> crate::service::TimerService {
157 crate::service::TimerService::new(self.wheel.clone(), service_config)
158 }
159
160 /// Create TimerService bound to this timing wheel with custom configuration
161 ///
162 /// # Parameters
163 /// - `config`: Service configuration
164 ///
165 /// # Returns
166 /// TimerService instance bound to this timing wheel
167 ///
168 /// 创建绑定到此时间轮的 TimerService,使用自定义配置
169 ///
170 /// # 参数
171 /// - `config`: 服务配置
172 ///
173 /// # 返回值
174 /// 绑定到此时间轮的 TimerService 实例
175 ///
176 /// # Examples (示例)
177 /// ```no_run
178 /// use kestrel_timer::{TimerWheel, config::ServiceConfig, TimerTask};
179 /// use std::num::NonZeroUsize;
180 ///
181 /// #[tokio::main]
182 /// async fn main() {
183 /// let timer = TimerWheel::with_defaults();
184 /// let config = ServiceConfig::builder()
185 /// .command_channel_capacity(NonZeroUsize::new(1024).unwrap())
186 /// .timeout_channel_capacity(NonZeroUsize::new(2000).unwrap())
187 /// .build();
188 /// let service = timer.create_service_with_config(config);
189 /// }
190 /// ```
191 pub fn create_service_with_config(&self, config: ServiceConfig) -> crate::service::TimerService {
192 crate::service::TimerService::new(self.wheel.clone(), config)
193 }
194
195 /// Register timer task to timing wheel (registration phase)
196 ///
197 /// # Parameters
198 /// - `task`: Task created via `create_task()`
199 ///
200 /// # Returns
201 /// Return timer handle with completion receiver that can be used to cancel timer and receive completion notifications
202 ///
203 /// 注册定时器任务到时间轮 (注册阶段)
204 ///
205 /// # 参数
206 /// - `task`: 通过 `create_task()` 创建的任务
207 ///
208 /// # 返回值
209 /// 返回包含完成通知接收器的定时器句柄,可用于取消定时器和接收完成通知
210 ///
211 /// # Examples (示例)
212 /// ```no_run
213 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
214 ///
215 /// use std::time::Duration;
216 ///
217 /// #[tokio::main]
218 /// async fn main() {
219 /// let timer = TimerWheel::with_defaults();
220 ///
221 /// let task = TimerTask::new_oneshot(Duration::from_secs(1), Some(CallbackWrapper::new(|| async {
222 /// println!("Timer fired!");
223 /// })));
224 /// let task_id = task.get_id();
225 ///
226 /// // Register task
227 /// // 注册任务
228 /// let handle = timer.register(task);
229 ///
230 /// // Wait for timer completion
231 /// // 等待定时器完成
232 /// use kestrel_timer::CompletionReceiver;
233 /// let (rx, _handle) = handle.into_parts();
234 /// match rx {
235 /// CompletionReceiver::OneShot(receiver) => {
236 /// receiver.wait().await;
237 /// },
238 /// _ => {}
239 /// }
240 /// }
241 /// ```
242 #[inline]
243 pub fn register(&self, task: crate::task::TimerTask) -> TimerHandleWithCompletion {
244 let (task, completion_rx) = crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
245
246 let task_id = task.id;
247
248 // 单次加锁完成所有操作
249 {
250 let mut wheel_guard = self.wheel.lock();
251 wheel_guard.insert(task);
252 }
253
254 TimerHandleWithCompletion::new(TimerHandle::new(task_id, self.wheel.clone()), completion_rx)
255 }
256
257 /// Batch register timer tasks to timing wheel (registration phase)
258 ///
259 /// # Parameters
260 /// - `tasks`: List of tasks created via `create_batch()`
261 ///
262 /// # Returns
263 /// Return batch timer handle with completion receivers
264 ///
265 /// 批量注册定时器任务到时间轮 (注册阶段)
266 ///
267 /// # 参数
268 /// - `tasks`: 通过 `create_batch()` 创建的任务列表
269 ///
270 /// # 返回值
271 /// 返回包含完成通知接收器的批量定时器句柄
272 ///
273 /// # Examples (示例)
274 /// ```no_run
275 /// use kestrel_timer::{TimerWheel, TimerTask};
276 /// use std::time::Duration;
277 ///
278 /// #[tokio::main]
279 /// async fn main() {
280 /// let timer = TimerWheel::with_defaults();
281 ///
282 /// let tasks: Vec<_> = (0..3)
283 /// .map(|_| TimerTask::new_oneshot(Duration::from_secs(1), None))
284 /// .collect();
285 ///
286 /// let batch = timer.register_batch(tasks);
287 /// println!("Registered {} timers", batch.len());
288 /// }
289 /// ```
290 #[inline]
291 pub fn register_batch(&self, tasks: Vec<crate::task::TimerTask>) -> BatchHandleWithCompletion {
292 let task_count = tasks.len();
293 let mut completion_rxs = Vec::with_capacity(task_count);
294 let mut task_ids = Vec::with_capacity(task_count);
295 let mut prepared_tasks = Vec::with_capacity(task_count);
296
297 // Step 1: Prepare all channels and notifiers
298 for task in tasks {
299 let (task, completion_rx) = crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
300 task_ids.push(task.id);
301 completion_rxs.push(completion_rx);
302 prepared_tasks.push(task);
303 }
304
305 // Step 2: Single lock, batch insert
306 {
307 let mut wheel_guard = self.wheel.lock();
308 wheel_guard.insert_batch(prepared_tasks);
309 }
310
311 BatchHandleWithCompletion::new(BatchHandle::new(task_ids, self.wheel.clone()), completion_rxs)
312 }
313
314 /// Cancel timer
315 ///
316 /// # Parameters
317 /// - `task_id`: Task ID
318 ///
319 /// # Returns
320 /// Returns true if task exists and is successfully cancelled, otherwise false
321 ///
322 /// 取消定时器
323 ///
324 /// # 参数
325 /// - `task_id`: 任务 ID
326 ///
327 /// # 返回值
328 /// 如果任务存在且成功取消则返回 true,否则返回 false
329 ///
330 /// # Examples (示例)
331 /// ```no_run
332 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
333 ///
334 /// use std::time::Duration;
335 ///
336 /// #[tokio::main]
337 /// async fn main() {
338 /// let timer = TimerWheel::with_defaults();
339 ///
340 /// let task = TimerTask::new_oneshot(Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
341 /// println!("Timer fired!");
342 /// })));
343 /// let task_id = task.get_id();
344 /// let _handle = timer.register(task);
345 ///
346 /// // Cancel task using task ID
347 /// // 使用任务 ID 取消任务
348 /// let cancelled = timer.cancel(task_id);
349 /// println!("Canceled successfully: {}", cancelled);
350 /// }
351 /// ```
352 #[inline]
353 pub fn cancel(&self, task_id: TaskId) -> bool {
354 let mut wheel = self.wheel.lock();
355 wheel.cancel(task_id)
356 }
357
358 /// Batch cancel timers
359 ///
360 /// # Parameters
361 /// - `task_ids`: List of task IDs to cancel
362 ///
363 /// # Returns
364 /// Number of successfully cancelled tasks
365 ///
366 /// 批量取消定时器
367 ///
368 /// # 参数
369 /// - `task_ids`: 要取消的任务 ID 列表
370 ///
371 /// # 返回值
372 /// 成功取消的任务数量
373 ///
374 /// # Performance Advantages
375 /// - Batch processing reduces lock contention
376 /// - Internally optimized batch cancellation operation
377 ///
378 /// # Examples (示例)
379 /// ```no_run
380 /// use kestrel_timer::{TimerWheel, TimerTask};
381 /// use std::time::Duration;
382 ///
383 /// #[tokio::main]
384 /// async fn main() {
385 /// let timer = TimerWheel::with_defaults();
386 ///
387 /// // Create multiple timers
388 /// // 创建多个定时器
389 /// let task1 = TimerTask::new_oneshot(Duration::from_secs(10), None);
390 /// let task2 = TimerTask::new_oneshot(Duration::from_secs(10), None);
391 /// let task3 = TimerTask::new_oneshot(Duration::from_secs(10), None);
392 ///
393 /// let task_ids = vec![task1.get_id(), task2.get_id(), task3.get_id()];
394 ///
395 /// let _h1 = timer.register(task1);
396 /// let _h2 = timer.register(task2);
397 /// let _h3 = timer.register(task3);
398 ///
399 /// // Batch cancel
400 /// // 批量取消
401 /// let cancelled = timer.cancel_batch(&task_ids);
402 /// println!("Canceled {} timers", cancelled);
403 /// }
404 /// ```
405 #[inline]
406 pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
407 let mut wheel = self.wheel.lock();
408 wheel.cancel_batch(task_ids)
409 }
410
411 /// Postpone timer
412 ///
413 /// # Parameters
414 /// - `task_id`: Task ID to postpone
415 /// - `new_delay`: New delay duration, recalculated from current time
416 /// - `callback`: New callback function, pass `None` to keep original callback, pass `Some` to replace with new callback
417 ///
418 /// # Returns
419 /// Returns true if task exists and is successfully postponed, otherwise false
420 ///
421 /// 推迟定时器
422 ///
423 /// # 参数
424 /// - `task_id`: 要推迟的任务 ID
425 /// - `new_delay`: 新的延迟时间,从当前时间重新计算
426 /// - `callback`: 新的回调函数,传递 `None` 保持原始回调,传递 `Some` 替换为新的回调
427 ///
428 /// # 返回值
429 /// 如果任务存在且成功推迟则返回 true,否则返回 false
430 ///
431 /// # Note
432 /// - Task ID remains unchanged after postponement
433 /// - Original completion_receiver remains valid
434 ///
435 /// # 注意
436 /// - 任务 ID 在推迟后保持不变
437 /// - 原始 completion_receiver 保持有效
438 ///
439 /// # Examples (示例)
440 ///
441 /// ## Keep original callback (保持原始回调)
442 /// ```no_run
443 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
444 /// use std::time::Duration;
445 ///
446 ///
447 /// #[tokio::main]
448 /// async fn main() {
449 /// let timer = TimerWheel::with_defaults();
450 ///
451 /// let task = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
452 /// println!("Timer fired!");
453 /// })));
454 /// let task_id = task.get_id();
455 /// let _handle = timer.register(task);
456 ///
457 /// // Postpone to 10 seconds after triggering, and keep original callback
458 /// // 推迟到 10 秒后触发,并保持原始回调
459 /// let success = timer.postpone(task_id, Duration::from_secs(10), None);
460 /// println!("Postponed successfully: {}", success);
461 /// }
462 /// ```
463 ///
464 /// ## Replace with new callback (替换为新的回调)
465 /// ```no_run
466 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
467 /// use std::time::Duration;
468 ///
469 /// #[tokio::main]
470 /// async fn main() {
471 /// let timer = TimerWheel::with_defaults();
472 ///
473 /// let task = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
474 /// println!("Original callback!");
475 /// })));
476 /// let task_id = task.get_id();
477 /// let _handle = timer.register(task);
478 ///
479 /// // Postpone to 10 seconds after triggering, and replace with new callback
480 /// // 推迟到 10 秒后触发,并替换为新的回调
481 /// let success = timer.postpone(task_id, Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
482 /// println!("New callback!");
483 /// })));
484 /// println!("Postponed successfully: {}", success);
485 /// }
486 /// ```
487 #[inline]
488 pub fn postpone(
489 &self,
490 task_id: TaskId,
491 new_delay: Duration,
492 callback: Option<CallbackWrapper>,
493 ) -> bool {
494 let mut wheel = self.wheel.lock();
495 wheel.postpone(task_id, new_delay, callback)
496 }
497
498 /// Batch postpone timers (keep original callbacks)
499 ///
500 /// # Parameters
501 /// - `updates`: List of tuples of (task ID, new delay)
502 ///
503 /// # Returns
504 /// Number of successfully postponed tasks
505 ///
506 /// 批量推迟定时器 (保持原始回调)
507 ///
508 /// # 参数
509 /// - `updates`: (任务 ID, 新延迟) 元组列表
510 ///
511 /// # 返回值
512 /// 成功推迟的任务数量
513 ///
514 /// # Note
515 /// - This method keeps all tasks' original callbacks unchanged
516 /// - Use `postpone_batch_with_callbacks` if you need to replace callbacks
517 ///
518 /// # 注意
519 /// - 此方法保持所有任务的原始回调不变
520 /// - 如果需要替换回调,请使用 `postpone_batch_with_callbacks`
521 ///
522 /// # Performance Advantages
523 /// - Batch processing reduces lock contention
524 /// - Internally optimized batch postponement operation
525 ///
526 /// # Examples (示例)
527 /// ```no_run
528 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
529 /// use std::time::Duration;
530 ///
531 /// #[tokio::main]
532 /// async fn main() {
533 /// let timer = TimerWheel::with_defaults();
534 ///
535 /// // Create multiple tasks with callbacks
536 /// // 创建多个带有回调的任务
537 /// let task1 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
538 /// println!("Task 1 fired!");
539 /// })));
540 /// let task2 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
541 /// println!("Task 2 fired!");
542 /// })));
543 /// let task3 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
544 /// println!("Task 3 fired!");
545 /// })));
546 ///
547 /// let task_ids = vec![
548 /// (task1.get_id(), Duration::from_secs(10)),
549 /// (task2.get_id(), Duration::from_secs(15)),
550 /// (task3.get_id(), Duration::from_secs(20)),
551 /// ];
552 ///
553 /// timer.register(task1);
554 /// timer.register(task2);
555 /// timer.register(task3);
556 ///
557 /// // Batch postpone (keep original callbacks)
558 /// // 批量推迟 (保持原始回调)
559 /// let postponed = timer.postpone_batch(task_ids);
560 /// println!("Postponed {} timers", postponed);
561 /// }
562 /// ```
563 #[inline]
564 pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize {
565 let mut wheel = self.wheel.lock();
566 wheel.postpone_batch(updates)
567 }
568
569 /// Batch postpone timers (replace callbacks)
570 ///
571 /// # Parameters
572 /// - `updates`: List of tuples of (task ID, new delay, new callback)
573 ///
574 /// # Returns
575 /// Number of successfully postponed tasks
576 ///
577 /// 批量推迟定时器 (替换回调)
578 ///
579 /// # 参数
580 /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
581 ///
582 /// # 返回值
583 /// 成功推迟的任务数量
584 ///
585 /// # Performance Advantages
586 /// - Batch processing reduces lock contention
587 /// - Internally optimized batch postponement operation
588 ///
589 /// # Examples (示例)
590 /// ```no_run
591 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
592 /// use std::time::Duration;
593 /// use std::sync::Arc;
594 /// use std::sync::atomic::{AtomicU32, Ordering};
595 ///
596 /// #[tokio::main]
597 /// async fn main() {
598 /// let timer = TimerWheel::with_defaults();
599 /// let counter = Arc::new(AtomicU32::new(0));
600 ///
601 /// // Create multiple timers
602 /// // 创建多个定时器
603 /// let task1 = TimerTask::new_oneshot(Duration::from_secs(5), None);
604 /// let task2 = TimerTask::new_oneshot(Duration::from_secs(5), None);
605 ///
606 /// let id1 = task1.get_id();
607 /// let id2 = task2.get_id();
608 ///
609 /// timer.register(task1);
610 /// timer.register(task2);
611 ///
612 /// // Batch postpone and replace callbacks
613 /// // 批量推迟并替换回调
614 /// let updates: Vec<_> = vec![id1, id2]
615 /// .into_iter()
616 /// .map(|id| {
617 /// let counter = Arc::clone(&counter);
618 /// (id, Duration::from_secs(10), Some(CallbackWrapper::new(move || {
619 /// let counter = Arc::clone(&counter);
620 /// async move { counter.fetch_add(1, Ordering::SeqCst); }
621 /// })))
622 /// })
623 /// .collect();
624 /// let postponed = timer.postpone_batch_with_callbacks(updates);
625 /// println!("Postponed {} timers", postponed);
626 /// }
627 /// ```
628 #[inline]
629 pub fn postpone_batch_with_callbacks(
630 &self,
631 updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
632 ) -> usize {
633 let mut wheel = self.wheel.lock();
634 wheel.postpone_batch_with_callbacks(updates.to_vec())
635 }
636
637 /// Core tick loop
638 async fn tick_loop(wheel: Arc<Mutex<Wheel>>, tick_duration: Duration) {
639 let mut interval = tokio::time::interval(tick_duration);
640 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
641
642 loop {
643 interval.tick().await;
644
645 // Advance timing wheel and get expired tasks
646 // Note: wheel.advance() already handles completion notifications
647 let expired_tasks = {
648 let mut wheel_guard = wheel.lock();
649 wheel_guard.advance()
650 };
651
652 // Execute callbacks for expired tasks
653 // Notifications have already been sent by wheel.advance()
654 for task in expired_tasks {
655 if let Some(callback) = task.callback {
656 // Spawn callback execution in a separate tokio task
657 tokio::spawn(async move {
658 let future = callback.call();
659 future.await;
660 });
661 }
662 }
663 }
664 }
665
666 /// Graceful shutdown of TimerWheel
667 ///
668 /// 优雅关闭 TimerWheel
669 ///
670 /// # Examples (示例)
671 /// ```no_run
672 /// # use kestrel_timer::TimerWheel;
673 /// # #[tokio::main]
674 /// # async fn main() {
675 /// let timer = TimerWheel::with_defaults();
676 ///
677 /// // Use timer... (使用定时器...)
678 ///
679 /// timer.shutdown().await;
680 /// # }
681 /// ```
682 pub async fn shutdown(mut self) {
683 if let Some(handle) = self.tick_handle.take() {
684 handle.abort();
685 let _ = handle.await;
686 }
687 }
688}
689
690impl Drop for TimerWheel {
691 fn drop(&mut self) {
692 if let Some(handle) = self.tick_handle.take() {
693 handle.abort();
694 }
695 }
696}
697
698#[cfg(test)]
699mod tests {
700 use super::*;
701 use std::sync::atomic::{AtomicU32, Ordering};
702 use crate::task::TimerTask;
703
704 #[tokio::test]
705 async fn test_timer_creation() {
706 let _timer = TimerWheel::with_defaults();
707 }
708
709 #[tokio::test]
710 async fn test_schedule_once() {
711 use std::sync::Arc;
712 let timer = TimerWheel::with_defaults();
713 let counter = Arc::new(AtomicU32::new(0));
714 let counter_clone = Arc::clone(&counter);
715
716 let task = TimerTask::new_oneshot(
717 Duration::from_millis(50),
718 Some(CallbackWrapper::new(move || {
719 let counter = Arc::clone(&counter_clone);
720 async move {
721 counter.fetch_add(1, Ordering::SeqCst);
722 }
723 })),
724 );
725 let _handle = timer.register(task);
726
727 // Wait for timer to trigger
728 // 等待定时器触发
729 tokio::time::sleep(Duration::from_millis(100)).await;
730 assert_eq!(counter.load(Ordering::SeqCst), 1);
731 }
732
733 #[tokio::test]
734 async fn test_cancel_timer() {
735 use std::sync::Arc;
736 let timer = TimerWheel::with_defaults();
737 let counter = Arc::new(AtomicU32::new(0));
738 let counter_clone = Arc::clone(&counter);
739
740 let task = TimerTask::new_oneshot(
741 Duration::from_millis(100),
742 Some(CallbackWrapper::new(move || {
743 let counter = Arc::clone(&counter_clone);
744 async move {
745 counter.fetch_add(1, Ordering::SeqCst);
746 }
747 })),
748 );
749 let handle = timer.register(task);
750
751 // Immediately cancel
752 // 立即取消
753 let cancel_result = handle.cancel();
754 assert!(cancel_result);
755
756 // Wait for enough time to ensure timer does not trigger
757 // 等待足够时间确保定时器不触发
758 tokio::time::sleep(Duration::from_millis(200)).await;
759 assert_eq!(counter.load(Ordering::SeqCst), 0);
760 }
761
762 #[tokio::test]
763 async fn test_cancel_immediate() {
764 use std::sync::Arc;
765 let timer = TimerWheel::with_defaults();
766 let counter = Arc::new(AtomicU32::new(0));
767 let counter_clone = Arc::clone(&counter);
768
769 let task = TimerTask::new_oneshot(
770 Duration::from_millis(100),
771 Some(CallbackWrapper::new(move || {
772 let counter = Arc::clone(&counter_clone);
773 async move {
774 counter.fetch_add(1, Ordering::SeqCst);
775 }
776 })),
777 );
778 let handle = timer.register(task);
779
780 // Immediately cancel
781 // 立即取消
782 let cancel_result = handle.cancel();
783 assert!(cancel_result);
784
785 // Wait for enough time to ensure timer does not trigger
786 // 等待足够时间确保定时器不触发
787 tokio::time::sleep(Duration::from_millis(200)).await;
788 assert_eq!(counter.load(Ordering::SeqCst), 0);
789 }
790
791 #[tokio::test]
792 async fn test_postpone_timer() {
793 use std::sync::Arc;
794 let timer = TimerWheel::with_defaults();
795 let counter = Arc::new(AtomicU32::new(0));
796 let counter_clone = Arc::clone(&counter);
797
798 let task = TimerTask::new_oneshot(
799 Duration::from_millis(50),
800 Some(CallbackWrapper::new(move || {
801 let counter = Arc::clone(&counter_clone);
802 async move {
803 counter.fetch_add(1, Ordering::SeqCst);
804 }
805 })),
806 );
807 let task_id = task.get_id();
808 let handle = timer.register(task);
809
810 // Postpone task to 150ms
811 // 推迟任务到 150ms
812 let postponed = timer.postpone(task_id, Duration::from_millis(150), None);
813 assert!(postponed);
814
815 // Wait for original time 50ms, task should not trigger
816 // 等待原始时间 50ms,任务不应触发
817 tokio::time::sleep(Duration::from_millis(70)).await;
818 assert_eq!(counter.load(Ordering::SeqCst), 0);
819
820 // Wait for new trigger time (from postponed start, need to wait about 150ms)
821 // 等待新的触发时间(从推迟开始算起,大约需要等待 150ms)
822 let (rx, _handle) = handle.into_parts();
823 let result = match rx {
824 crate::task::CompletionReceiver::OneShot(receiver) => {
825 tokio::time::timeout(
826 Duration::from_millis(200),
827 receiver.wait()
828 ).await
829 },
830 _ => panic!("Expected OneShot completion receiver"),
831 };
832 assert!(result.is_ok());
833
834 // Wait for callback to execute
835 // 等待回调执行
836 tokio::time::sleep(Duration::from_millis(20)).await;
837 assert_eq!(counter.load(Ordering::SeqCst), 1);
838 }
839
840 #[tokio::test]
841 async fn test_postpone_with_callback() {
842 use std::sync::Arc;
843 let timer = TimerWheel::with_defaults();
844 let counter = Arc::new(AtomicU32::new(0));
845 let counter_clone1 = Arc::clone(&counter);
846 let counter_clone2 = Arc::clone(&counter);
847
848 // Create task, original callback adds 1
849 let task = TimerTask::new_oneshot(
850 Duration::from_millis(50),
851 Some(CallbackWrapper::new(move || {
852 let counter = Arc::clone(&counter_clone1);
853 async move {
854 counter.fetch_add(1, Ordering::SeqCst);
855 }
856 })),
857 );
858 let task_id = task.get_id();
859 let handle = timer.register(task);
860
861 // Postpone task and replace callback, new callback adds 10
862 // 推迟任务并替换回调,新回调增加 10
863 let postponed = timer.postpone(
864 task_id,
865 Duration::from_millis(100),
866 Some(CallbackWrapper::new(move || {
867 let counter = Arc::clone(&counter_clone2);
868 async move {
869 counter.fetch_add(10, Ordering::SeqCst);
870 }
871 })),
872 );
873 assert!(postponed);
874
875 // Wait for task to trigger (after postponed, need to wait 100ms, plus margin)
876 // 等待任务触发(推迟后,需要等待 100ms,加上余量)
877 let (rx, _handle) = handle.into_parts();
878 let result = match rx {
879 crate::task::CompletionReceiver::OneShot(receiver) => {
880 tokio::time::timeout(
881 Duration::from_millis(200),
882 receiver.wait()
883 ).await
884 },
885 _ => panic!("Expected OneShot completion receiver"),
886 };
887 assert!(result.is_ok());
888
889 // Wait for callback to execute
890 // 等待回调执行
891 tokio::time::sleep(Duration::from_millis(20)).await;
892
893 // Verify new callback is executed (increased 10 instead of 1)
894 // 验证新回调已执行(增加 10 而不是 1)
895 assert_eq!(counter.load(Ordering::SeqCst), 10);
896 }
897
898 #[tokio::test]
899 async fn test_postpone_nonexistent_timer() {
900 let timer = TimerWheel::with_defaults();
901
902 // Try to postpone nonexistent task
903 // 尝试推迟一个不存在的任务
904 let fake_task = TimerTask::new_oneshot(Duration::from_millis(50), None);
905 let fake_task_id = fake_task.get_id();
906 // Do not register this task
907 // 不注册这个任务
908 let postponed = timer.postpone(fake_task_id, Duration::from_millis(100), None);
909 assert!(!postponed);
910 }
911
912 #[tokio::test]
913 async fn test_postpone_batch() {
914 use std::sync::Arc;
915 let timer = TimerWheel::with_defaults();
916 let counter = Arc::new(AtomicU32::new(0));
917
918 // Create 3 tasks
919 // 创建 3 个任务
920 let mut task_ids = Vec::new();
921 for _ in 0..3 {
922 let counter_clone = Arc::clone(&counter);
923 let task = TimerTask::new_oneshot(
924 Duration::from_millis(50),
925 Some(CallbackWrapper::new(move || {
926 let counter = Arc::clone(&counter_clone);
927 async move {
928 counter.fetch_add(1, Ordering::SeqCst);
929 }
930 })),
931 );
932 task_ids.push((task.get_id(), Duration::from_millis(150)));
933 timer.register(task);
934 }
935
936 // Batch postpone
937 // 批量推迟
938 let postponed = timer.postpone_batch(task_ids);
939 assert_eq!(postponed, 3);
940
941 // Wait for original time 50ms, task should not trigger
942 // 等待原始时间 50ms,任务不应触发
943 tokio::time::sleep(Duration::from_millis(70)).await;
944 assert_eq!(counter.load(Ordering::SeqCst), 0);
945
946 // Wait for new trigger time (from postponed start, need to wait about 150ms)
947 // 等待新的触发时间(从推迟开始算起,大约需要等待 150ms)
948 tokio::time::sleep(Duration::from_millis(200)).await;
949
950 // Wait for callback to execute
951 // 等待回调执行
952 tokio::time::sleep(Duration::from_millis(20)).await;
953 assert_eq!(counter.load(Ordering::SeqCst), 3);
954 }
955
956 #[tokio::test]
957 async fn test_postpone_batch_with_callbacks() {
958 use std::sync::Arc;
959 let timer = TimerWheel::with_defaults();
960 let counter = Arc::new(AtomicU32::new(0));
961
962 // Create 3 tasks
963 // 创建 3 个任务
964 let mut task_ids = Vec::new();
965 for _ in 0..3 {
966 let task = TimerTask::new_oneshot(
967 Duration::from_millis(50),
968 None
969 );
970 task_ids.push(task.get_id());
971 timer.register(task);
972 }
973
974 // Batch postpone and replace callbacks
975 // 批量推迟并替换回调
976 let updates: Vec<_> = task_ids
977 .into_iter()
978 .map(|id| {
979 let counter_clone = Arc::clone(&counter);
980 (id, Duration::from_millis(150), Some(CallbackWrapper::new(move || {
981 let counter = Arc::clone(&counter_clone);
982 async move {
983 counter.fetch_add(1, Ordering::SeqCst);
984 }
985 })))
986 })
987 .collect();
988
989 // Batch postpone and replace callbacks
990 // 批量推迟并替换回调
991 let postponed = timer.postpone_batch_with_callbacks(updates);
992 assert_eq!(postponed, 3);
993
994 // Wait for original time 50ms, task should not trigger
995 // 等待原始时间 50ms,任务不应触发
996 tokio::time::sleep(Duration::from_millis(70)).await;
997 assert_eq!(counter.load(Ordering::SeqCst), 0);
998
999 // Wait for new trigger time (from postponed start, need to wait about 150ms)
1000 // 等待新的触发时间(从推迟开始算起,大约需要等待 150ms)
1001 tokio::time::sleep(Duration::from_millis(200)).await;
1002
1003 // Wait for callback to execute
1004 // 等待回调执行
1005 tokio::time::sleep(Duration::from_millis(20)).await;
1006 assert_eq!(counter.load(Ordering::SeqCst), 3);
1007 }
1008
1009 #[tokio::test]
1010 async fn test_postpone_keeps_completion_receiver_valid() {
1011 use std::sync::Arc;
1012 let timer = TimerWheel::with_defaults();
1013 let counter = Arc::new(AtomicU32::new(0));
1014 let counter_clone = Arc::clone(&counter);
1015
1016 let task = TimerTask::new_oneshot(
1017 Duration::from_millis(50),
1018 Some(CallbackWrapper::new(move || {
1019 let counter = Arc::clone(&counter_clone);
1020 async move {
1021 counter.fetch_add(1, Ordering::SeqCst);
1022 }
1023 })),
1024 );
1025 let task_id = task.get_id();
1026 let handle = timer.register(task);
1027
1028 // Postpone task
1029 // 推迟任务
1030 timer.postpone(task_id, Duration::from_millis(100), None);
1031
1032 // Verify original completion_receiver is still valid (after postponed, need to wait 100ms, plus margin)
1033 // 验证原始完成接收器是否仍然有效(推迟后,需要等待 100ms,加上余量)
1034 let (rx, _handle) = handle.into_parts();
1035 let result = match rx {
1036 crate::task::CompletionReceiver::OneShot(receiver) => {
1037 tokio::time::timeout(
1038 Duration::from_millis(200),
1039 receiver.wait()
1040 ).await
1041 },
1042 _ => panic!("Expected OneShot completion receiver"),
1043 };
1044 assert!(result.is_ok(), "Completion receiver should still work after postpone");
1045
1046 // Wait for callback to execute
1047 tokio::time::sleep(Duration::from_millis(20)).await;
1048 assert_eq!(counter.load(Ordering::SeqCst), 1);
1049 }
1050
1051 // ==================== Periodic Task Tests ====================
1052 // ==================== 周期任务测试 ====================
1053
1054 #[tokio::test]
1055 async fn test_periodic_basic() {
1056 use std::sync::Arc;
1057 let timer = TimerWheel::with_defaults();
1058 let counter = Arc::new(AtomicU32::new(0));
1059 let counter_clone = Arc::clone(&counter);
1060
1061 // Create periodic task that triggers every 50ms
1062 // 创建每 50ms 触发一次的周期任务
1063 let task = TimerTask::new_periodic(
1064 Duration::from_millis(50), // initial delay
1065 Duration::from_millis(50), // interval
1066 Some(CallbackWrapper::new(move || {
1067 let counter = Arc::clone(&counter_clone);
1068 async move {
1069 counter.fetch_add(1, Ordering::SeqCst);
1070 }
1071 })),
1072 None, // use default buffer size
1073 );
1074 let (mut rx, _handle) = timer.register(task).into_parts();
1075
1076 // Wait for 3 periodic executions (50ms initial + 50ms * 2)
1077 // 等待 3 次周期执行(50ms 初始 + 50ms * 2)
1078 tokio::time::sleep(Duration::from_millis(200)).await;
1079
1080 // Verify callback was triggered multiple times
1081 // 验证回调被多次触发
1082 let count = counter.load(Ordering::SeqCst);
1083 assert!(count >= 3, "Expected at least 3 executions, got {}", count);
1084
1085 // Verify completion notifications were sent
1086 // 验证完成通知已发送
1087 match rx {
1088 crate::task::CompletionReceiver::Periodic(ref mut receiver) => {
1089 let mut notification_count = 0;
1090 while let Ok(completion) = receiver.try_recv() {
1091 assert_eq!(completion, crate::task::TaskCompletion::Called);
1092 notification_count += 1;
1093 }
1094 assert!(notification_count >= 3, "Expected at least 3 notifications, got {}", notification_count);
1095 },
1096 _ => panic!("Expected Periodic completion receiver"),
1097 }
1098 }
1099
1100 #[tokio::test]
1101 async fn test_periodic_cancel() {
1102 use std::sync::Arc;
1103 let timer = TimerWheel::with_defaults();
1104 let counter = Arc::new(AtomicU32::new(0));
1105 let counter_clone = Arc::clone(&counter);
1106
1107 // Create periodic task
1108 // 创建周期任务
1109 let task = TimerTask::new_periodic(
1110 Duration::from_millis(50),
1111 Duration::from_millis(50),
1112 Some(CallbackWrapper::new(move || {
1113 let counter = Arc::clone(&counter_clone);
1114 async move {
1115 counter.fetch_add(1, Ordering::SeqCst);
1116 }
1117 })),
1118 None,
1119 );
1120 let handle = timer.register(task);
1121
1122 // Wait for first execution
1123 // 等待第一次执行
1124 tokio::time::sleep(Duration::from_millis(80)).await;
1125 let count_before_cancel = counter.load(Ordering::SeqCst);
1126 assert!(count_before_cancel >= 1, "Expected at least 1 execution before cancel");
1127
1128 // Cancel periodic task
1129 // 取消周期任务
1130 let cancelled = handle.cancel();
1131 assert!(cancelled);
1132
1133 // Wait some more time and verify task stopped
1134 // 等待更多时间并验证任务已停止
1135 tokio::time::sleep(Duration::from_millis(150)).await;
1136 let count_after_cancel = counter.load(Ordering::SeqCst);
1137
1138 // Count should not increase significantly after cancellation
1139 // 取消后计数不应该显著增加
1140 assert!(
1141 count_after_cancel - count_before_cancel <= 1,
1142 "Task should stop after cancel, before: {}, after: {}",
1143 count_before_cancel,
1144 count_after_cancel
1145 );
1146 }
1147
1148 #[tokio::test]
1149 async fn test_periodic_cancel_notification() {
1150 use std::sync::Arc;
1151 let timer = TimerWheel::with_defaults();
1152 let counter = Arc::new(AtomicU32::new(0));
1153 let counter_clone = Arc::clone(&counter);
1154
1155 // Create periodic task
1156 // 创建周期任务
1157 let task = TimerTask::new_periodic(
1158 Duration::from_millis(50),
1159 Duration::from_millis(50),
1160 Some(CallbackWrapper::new(move || {
1161 let counter = Arc::clone(&counter_clone);
1162 async move {
1163 counter.fetch_add(1, Ordering::SeqCst);
1164 }
1165 })),
1166 None,
1167 );
1168 let (mut rx, handle) = timer.register(task).into_parts();
1169
1170 // Wait for first execution
1171 // 等待第一次执行
1172 tokio::time::sleep(Duration::from_millis(80)).await;
1173
1174 // Cancel the task
1175 // 取消任务
1176 let cancelled = handle.cancel();
1177 assert!(cancelled);
1178
1179 // Wait a bit for the cancel notification
1180 // 等待取消通知
1181 tokio::time::sleep(Duration::from_millis(50)).await;
1182
1183 // Verify we received cancellation notification
1184 // 验证收到取消通知
1185 match rx {
1186 crate::task::CompletionReceiver::Periodic(ref mut receiver) => {
1187 let mut found_cancelled = false;
1188 while let Ok(completion) = receiver.try_recv() {
1189 if completion == crate::task::TaskCompletion::Cancelled {
1190 found_cancelled = true;
1191 break;
1192 }
1193 }
1194 assert!(found_cancelled, "Expected to receive Cancelled notification");
1195 },
1196 _ => panic!("Expected Periodic completion receiver"),
1197 }
1198 }
1199
1200 #[tokio::test]
1201 async fn test_periodic_postpone() {
1202 use std::sync::Arc;
1203 let timer = TimerWheel::with_defaults();
1204 let counter = Arc::new(AtomicU32::new(0));
1205 let counter_clone = Arc::clone(&counter);
1206
1207 // Create periodic task with 50ms initial delay and 50ms interval
1208 // 创建初始延迟 50ms 和间隔 50ms 的周期任务
1209 let task = TimerTask::new_periodic(
1210 Duration::from_millis(50),
1211 Duration::from_millis(50),
1212 Some(CallbackWrapper::new(move || {
1213 let counter = Arc::clone(&counter_clone);
1214 async move {
1215 counter.fetch_add(1, Ordering::SeqCst);
1216 }
1217 })),
1218 None,
1219 );
1220 let task_id = task.get_id();
1221 let _handle = timer.register(task);
1222
1223 // Immediately postpone the task to 150ms
1224 // 立即推迟任务到 150ms
1225 let postponed = timer.postpone(task_id, Duration::from_millis(150), None);
1226 assert!(postponed);
1227
1228 // Wait original time, should not trigger
1229 // 等待原始时间,不应触发
1230 tokio::time::sleep(Duration::from_millis(80)).await;
1231 assert_eq!(counter.load(Ordering::SeqCst), 0, "Task should not trigger at original time");
1232
1233 // Wait for postponed time
1234 // 等待推迟后的时间
1235 tokio::time::sleep(Duration::from_millis(150)).await;
1236
1237 // Verify task started executing
1238 // 验证任务已开始执行
1239 let count = counter.load(Ordering::SeqCst);
1240 assert!(count >= 1, "Task should trigger after postpone, got count: {}", count);
1241 }
1242
1243 #[tokio::test]
1244 async fn test_periodic_postpone_with_callback() {
1245 use std::sync::Arc;
1246 let timer = TimerWheel::with_defaults();
1247 let counter = Arc::new(AtomicU32::new(0));
1248 let counter_clone1 = Arc::clone(&counter);
1249 let counter_clone2 = Arc::clone(&counter);
1250
1251 // Create periodic task, original callback adds 1
1252 // 创建周期任务,原始回调增加 1
1253 let task = TimerTask::new_periodic(
1254 Duration::from_millis(50),
1255 Duration::from_millis(50),
1256 Some(CallbackWrapper::new(move || {
1257 let counter = Arc::clone(&counter_clone1);
1258 async move {
1259 counter.fetch_add(1, Ordering::SeqCst);
1260 }
1261 })),
1262 None,
1263 );
1264 let task_id = task.get_id();
1265 let _handle = timer.register(task);
1266
1267 // Postpone task and replace callback, new callback adds 10
1268 // 推迟任务并替换回调,新回调增加 10
1269 let postponed = timer.postpone(
1270 task_id,
1271 Duration::from_millis(100),
1272 Some(CallbackWrapper::new(move || {
1273 let counter = Arc::clone(&counter_clone2);
1274 async move {
1275 counter.fetch_add(10, Ordering::SeqCst);
1276 }
1277 })),
1278 );
1279 assert!(postponed);
1280
1281 // Wait for task to trigger
1282 // 等待任务触发
1283 tokio::time::sleep(Duration::from_millis(200)).await;
1284
1285 let count = counter.load(Ordering::SeqCst);
1286 // Should be at least 10 (new callback), not 1 (old callback)
1287 // 应该至少是 10(新回调),而不是 1(旧回调)
1288 assert!(count >= 10, "New callback should be used, got count: {}", count);
1289
1290 // Verify it's a multiple of 10 (new callback)
1291 // 验证是 10 的倍数(新回调)
1292 assert_eq!(count % 10, 0, "Count should be multiple of 10, got: {}", count);
1293 }
1294
1295 #[tokio::test]
1296 async fn test_periodic_batch_cancel() {
1297 use std::sync::Arc;
1298 let timer = TimerWheel::with_defaults();
1299 let counter = Arc::new(AtomicU32::new(0));
1300
1301 // Create multiple periodic tasks
1302 // 创建多个周期任务
1303 let mut task_ids = Vec::new();
1304 for _ in 0..3 {
1305 let counter_clone = Arc::clone(&counter);
1306 let task = TimerTask::new_periodic(
1307 Duration::from_millis(50),
1308 Duration::from_millis(50),
1309 Some(CallbackWrapper::new(move || {
1310 let counter = Arc::clone(&counter_clone);
1311 async move {
1312 counter.fetch_add(1, Ordering::SeqCst);
1313 }
1314 })),
1315 None,
1316 );
1317 task_ids.push(task.get_id());
1318 timer.register(task);
1319 }
1320
1321 // Wait for first execution
1322 // 等待第一次执行
1323 tokio::time::sleep(Duration::from_millis(80)).await;
1324 let count_before_cancel = counter.load(Ordering::SeqCst);
1325 assert!(count_before_cancel >= 3, "Expected at least 3 executions before cancel");
1326
1327 // Batch cancel
1328 // 批量取消
1329 let cancelled = timer.cancel_batch(&task_ids);
1330 assert_eq!(cancelled, 3);
1331
1332 // Wait and verify tasks stopped
1333 // 等待并验证任务已停止
1334 tokio::time::sleep(Duration::from_millis(150)).await;
1335 let count_after_cancel = counter.load(Ordering::SeqCst);
1336
1337 // Count should not increase significantly after cancellation
1338 // 取消后计数不应该显著增加
1339 assert!(
1340 count_after_cancel - count_before_cancel <= 3,
1341 "Tasks should stop after cancel, before: {}, after: {}",
1342 count_before_cancel,
1343 count_after_cancel
1344 );
1345 }
1346
1347 #[tokio::test]
1348 async fn test_periodic_batch_postpone() {
1349 use std::sync::Arc;
1350 let timer = TimerWheel::with_defaults();
1351 let counter = Arc::new(AtomicU32::new(0));
1352
1353 // Create 3 periodic tasks
1354 // 创建 3 个周期任务
1355 let mut postpone_updates = Vec::new();
1356 for _ in 0..3 {
1357 let counter_clone = Arc::clone(&counter);
1358 let task = TimerTask::new_periodic(
1359 Duration::from_millis(50),
1360 Duration::from_millis(50),
1361 Some(CallbackWrapper::new(move || {
1362 let counter = Arc::clone(&counter_clone);
1363 async move {
1364 counter.fetch_add(1, Ordering::SeqCst);
1365 }
1366 })),
1367 None,
1368 );
1369 postpone_updates.push((task.get_id(), Duration::from_millis(150)));
1370 timer.register(task);
1371 }
1372
1373 // Batch postpone
1374 // 批量推迟
1375 let postponed = timer.postpone_batch(postpone_updates);
1376 assert_eq!(postponed, 3);
1377
1378 // Wait original time, should not trigger
1379 // 等待原始时间,不应触发
1380 tokio::time::sleep(Duration::from_millis(80)).await;
1381 assert_eq!(counter.load(Ordering::SeqCst), 0, "Tasks should not trigger at original time");
1382
1383 // Wait for postponed time
1384 // 等待推迟后的时间
1385 tokio::time::sleep(Duration::from_millis(150)).await;
1386
1387 let count = counter.load(Ordering::SeqCst);
1388 assert!(count >= 3, "All tasks should trigger after postpone, got count: {}", count);
1389 }
1390
1391 #[tokio::test]
1392 async fn test_periodic_completion_receiver() {
1393 use std::sync::Arc;
1394 let timer = TimerWheel::with_defaults();
1395 let counter = Arc::new(AtomicU32::new(0));
1396 let counter_clone = Arc::clone(&counter);
1397
1398 // Create periodic task
1399 // 创建周期任务
1400 let task = TimerTask::new_periodic(
1401 Duration::from_millis(50),
1402 Duration::from_millis(50),
1403 Some(CallbackWrapper::new(move || {
1404 let counter = Arc::clone(&counter_clone);
1405 async move {
1406 counter.fetch_add(1, Ordering::SeqCst);
1407 }
1408 })),
1409 None,
1410 );
1411 let (mut rx, _handle) = timer.register(task).into_parts();
1412
1413 // Continuously receive completion notifications
1414 // 持续接收完成通知
1415 let mut notification_count = 0;
1416 match rx {
1417 crate::task::CompletionReceiver::Periodic(ref mut receiver) => {
1418 // Receive notifications for ~200ms
1419 // 接收约 200ms 的通知
1420 let timeout = tokio::time::sleep(Duration::from_millis(200));
1421 tokio::pin!(timeout);
1422
1423 loop {
1424 tokio::select! {
1425 _ = &mut timeout => break,
1426 result = receiver.recv() => {
1427 match result {
1428 Some(completion) => {
1429 assert_eq!(completion, crate::task::TaskCompletion::Called);
1430 notification_count += 1;
1431 }
1432 None => break,
1433 }
1434 }
1435 }
1436 }
1437 },
1438 _ => panic!("Expected Periodic completion receiver"),
1439 }
1440
1441 // Should receive at least 3 notifications
1442 // 应该至少收到 3 次通知
1443 assert!(notification_count >= 3, "Expected at least 3 notifications, got {}", notification_count);
1444 }
1445
1446 #[tokio::test]
1447 async fn test_periodic_batch_register() {
1448 use std::sync::Arc;
1449 let timer = TimerWheel::with_defaults();
1450 let counter = Arc::new(AtomicU32::new(0));
1451
1452 // Create batch of periodic tasks
1453 // 创建批量周期任务
1454 let tasks: Vec<_> = (0..3)
1455 .map(|_| {
1456 let counter_clone = Arc::clone(&counter);
1457 TimerTask::new_periodic(
1458 Duration::from_millis(50),
1459 Duration::from_millis(50),
1460 Some(CallbackWrapper::new(move || {
1461 let counter = Arc::clone(&counter_clone);
1462 async move {
1463 counter.fetch_add(1, Ordering::SeqCst);
1464 }
1465 })),
1466 None,
1467 )
1468 })
1469 .collect();
1470
1471 let batch_handle = timer.register_batch(tasks);
1472 assert_eq!(batch_handle.len(), 3);
1473
1474 // Wait for executions
1475 // 等待执行
1476 tokio::time::sleep(Duration::from_millis(200)).await;
1477
1478 let count = counter.load(Ordering::SeqCst);
1479 // Each task should execute at least 3 times, total at least 9
1480 // 每个任务应至少执行 3 次,总共至少 9 次
1481 assert!(count >= 9, "Expected at least 9 total executions, got {}", count);
1482 }
1483}
1484