kestrel_timer/timer.rs
1use crate::config::{BatchConfig, ServiceConfig, WheelConfig};
2use crate::task::{CallbackWrapper, TaskId, TaskCompletionReason};
3use crate::wheel::Wheel;
4use parking_lot::Mutex;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::oneshot;
8use tokio::task::JoinHandle;
9
10/// 完成通知接收器,用于接收定时器完成通知
11/// (Completion receiver for receiving timer completion notifications)
12pub struct CompletionReceiver(pub oneshot::Receiver<TaskCompletionReason>);
13
14/// 定时器句柄,用于管理定时器的生命周期
15/// (Timer handle for managing timer lifecycle)
16///
17/// 注意:此类型不实现 Clone,以防止重复取消同一个定时器。每个定时器只应有一个所有者。
18/// Note: This type does not implement Clone to prevent duplicate cancellation of the same timer. Each timer should have only one owner.
19pub struct TimerHandle {
20 pub(crate) task_id: TaskId,
21 pub(crate) wheel: Arc<Mutex<Wheel>>,
22 pub(crate) completion_rx: CompletionReceiver,
23}
24
25impl TimerHandle {
26 pub(crate) fn new(task_id: TaskId, wheel: Arc<Mutex<Wheel>>, completion_rx: oneshot::Receiver<TaskCompletionReason>) -> Self {
27 Self { task_id, wheel, completion_rx: CompletionReceiver(completion_rx) }
28 }
29
30 /// 取消定时器 (Cancel the timer)
31 ///
32 /// # 返回 (Returns)
33 /// 如果任务存在且成功取消返回 true,否则返回 false
34 /// (Returns true if task exists and is successfully cancelled, otherwise false)
35 ///
36 /// # 示例 (Examples)
37 /// ```no_run
38 /// # use kestrel_timer::{TimerWheel, CallbackWrapper};
39 /// # use std::time::Duration;
40 /// #
41 /// # #[tokio::main]
42 /// # async fn main() {
43 /// let timer = TimerWheel::with_defaults();
44 /// let callback = Some(CallbackWrapper::new(|| async {}));
45 /// let task = TimerWheel::create_task(Duration::from_secs(1), callback);
46 /// let handle = timer.register(task);
47 ///
48 /// // 取消定时器
49 /// let success = handle.cancel();
50 /// println!("Canceled successfully: {}", success);
51 /// # }
52 /// ```
53 pub fn cancel(&self) -> bool {
54 let mut wheel = self.wheel.lock();
55 wheel.cancel(self.task_id)
56 }
57
58 /// 获取完成通知接收器的可变引用
59 /// (Get mutable reference to completion receiver)
60 ///
61 /// # 示例 (Examples)
62 /// ```no_run
63 /// # use kestrel_timer::{TimerWheel, CallbackWrapper};
64 /// # use std::time::Duration;
65 /// #
66 /// # #[tokio::main]
67 /// # async fn main() {
68 /// let timer = TimerWheel::with_defaults();
69 /// let callback = Some(CallbackWrapper::new(|| async {
70 /// println!("Timer fired!");
71 /// }));
72 /// let task = TimerWheel::create_task(Duration::from_secs(1), callback);
73 /// let handle = timer.register(task);
74 ///
75 /// // 等待定时器完成(使用 into_completion_receiver 消耗句柄)
76 /// // (Wait for timer completion (consume handle using into_completion_receiver))
77 /// handle.into_completion_receiver().0.await.ok();
78 /// println!("Timer completed!");
79 /// # }
80 /// ```
81 pub fn completion_receiver(&mut self) -> &mut CompletionReceiver {
82 &mut self.completion_rx
83 }
84
85 /// 消耗句柄,返回完成通知接收器
86 /// (Consume handle and return completion receiver)
87 ///
88 /// # 示例 (Examples)
89 /// ```no_run
90 /// # use kestrel_timer::{TimerWheel, CallbackWrapper};
91 /// # use std::time::Duration;
92 /// #
93 /// # #[tokio::main]
94 /// # async fn main() {
95 /// let timer = TimerWheel::with_defaults();
96 /// let callback = Some(CallbackWrapper::new(|| async {
97 /// println!("Timer fired!");
98 /// }));
99 /// let task = TimerWheel::create_task(Duration::from_secs(1), callback);
100 /// let handle = timer.register(task);
101 ///
102 /// // 等待定时器完成
103 /// // (Wait for timer completion)
104 /// handle.into_completion_receiver().0.await.ok();
105 /// println!("Timer completed!");
106 /// # }
107 /// ```
108 pub fn into_completion_receiver(self) -> CompletionReceiver {
109 self.completion_rx
110 }
111}
112
113/// 批量定时器句柄,用于管理批量调度的定时器
114/// (Batch timer handle for managing batch-scheduled timers)
115///
116/// 通过共享 Wheel 引用减少内存开销,同时提供批量操作和迭代器访问能力。
117/// (Reduces memory overhead through shared Wheel reference while providing batch operations and iterator access)
118///
119/// 注意:此类型不实现 Clone,以防止重复取消同一批定时器。如需访问单个定时器句柄,请使用 `into_iter()` 或 `into_handles()` 进行转换。
120/// Note: This type does not implement Clone to prevent duplicate cancellation of the same batch of timers. Use `into_iter()` or `into_handles()` to access individual timer handles.
121pub struct BatchHandle {
122 pub(crate) task_ids: Vec<TaskId>,
123 pub(crate) wheel: Arc<Mutex<Wheel>>,
124 pub(crate) completion_rxs: Vec<oneshot::Receiver<TaskCompletionReason>>,
125}
126
127impl BatchHandle {
128 pub(crate) fn new(task_ids: Vec<TaskId>, wheel: Arc<Mutex<Wheel>>, completion_rxs: Vec<oneshot::Receiver<TaskCompletionReason>>) -> Self {
129 Self { task_ids, wheel, completion_rxs }
130 }
131
132 /// 批量取消所有定时器 (Cancel all timers in batch)
133 ///
134 /// # 返回 (Returns)
135 /// 成功取消的任务数量 (Number of successfully cancelled tasks)
136 ///
137 /// # 示例 (Examples)
138 /// ```no_run
139 /// # use kestrel_timer::{TimerWheel, CallbackWrapper};
140 /// # use std::time::Duration;
141 /// #
142 /// # #[tokio::main]
143 /// # async fn main() {
144 /// let timer = TimerWheel::with_defaults();
145 /// let delays: Vec<Duration> = (0..10)
146 /// .map(|_| Duration::from_secs(1))
147 /// .collect();
148 /// let tasks = TimerWheel::create_batch(delays);
149 /// let batch = timer.register_batch(tasks);
150 ///
151 /// let cancelled = batch.cancel_all();
152 /// println!("Canceled {} timers", cancelled);
153 /// # }
154 /// ```
155 pub fn cancel_all(self) -> usize {
156 let mut wheel = self.wheel.lock();
157 wheel.cancel_batch(&self.task_ids)
158 }
159
160 /// 将批量句柄转换为单个定时器句柄的 Vec (Convert batch handle to Vec of individual timer handles)
161 ///
162 /// 消耗 BatchHandle,为每个任务创建独立的 TimerHandle。
163 /// (Consumes BatchHandle and creates independent TimerHandle for each task)
164 ///
165 /// # 示例 (Examples)
166 /// ```no_run
167 /// # use kestrel_timer::{TimerWheel, CallbackWrapper};
168 /// # use std::time::Duration;
169 /// #
170 /// # #[tokio::main]
171 /// # async fn main() {
172 /// let timer = TimerWheel::with_defaults();
173 /// let delays: Vec<Duration> = (0..3)
174 /// .map(|_| Duration::from_secs(1))
175 /// .collect();
176 /// let tasks = TimerWheel::create_batch(delays);
177 /// let batch = timer.register_batch(tasks);
178 ///
179 /// // 转换为独立的句柄
180 /// // (Convert to individual handles)
181 /// let handles = batch.into_handles();
182 /// for handle in handles {
183 /// // 可以单独操作每个句柄
184 /// // (Can operate each handle individually)
185 /// }
186 /// # }
187 /// ```
188 pub fn into_handles(self) -> Vec<TimerHandle> {
189 self.task_ids
190 .into_iter()
191 .zip(self.completion_rxs.into_iter())
192 .map(|(task_id, rx)| {
193 TimerHandle::new(task_id, self.wheel.clone(), rx)
194 })
195 .collect()
196 }
197
198 /// 获取批量任务的数量 (Get the number of batch tasks)
199 pub fn len(&self) -> usize {
200 self.task_ids.len()
201 }
202
203 /// 检查批量任务是否为空 (Check if batch tasks are empty)
204 pub fn is_empty(&self) -> bool {
205 self.task_ids.is_empty()
206 }
207
208 /// 获取所有任务 ID 的引用 (Get reference to all task IDs)
209 pub fn task_ids(&self) -> &[TaskId] {
210 &self.task_ids
211 }
212
213 /// 获取所有完成通知接收器的引用 (Get reference to all completion receivers)
214 ///
215 /// # 返回 (Returns)
216 /// 所有任务的完成通知接收器列表引用 (Reference to list of completion receivers for all tasks)
217 pub fn completion_receivers(&mut self) -> &mut Vec<oneshot::Receiver<TaskCompletionReason>> {
218 &mut self.completion_rxs
219 }
220
221 /// 消耗句柄,返回所有完成通知接收器 (Consume handle and return all completion receivers)
222 ///
223 /// # 返回 (Returns)
224 /// 所有任务的完成通知接收器列表 (List of completion receivers for all tasks)
225 ///
226 /// # 示例 (Examples)
227 /// ```no_run
228 /// # use kestrel_timer::{TimerWheel, CallbackWrapper};
229 /// # use std::time::Duration;
230 /// #
231 /// # #[tokio::main]
232 /// # async fn main() {
233 /// let timer = TimerWheel::with_defaults();
234 /// let delays: Vec<Duration> = (0..3)
235 /// .map(|_| Duration::from_secs(1))
236 /// .collect();
237 /// let tasks = TimerWheel::create_batch(delays);
238 /// let batch = timer.register_batch(tasks);
239 ///
240 /// // 获取所有完成通知接收器
241 /// // (Get all completion receivers)
242 /// let receivers = batch.into_completion_receivers();
243 /// for rx in receivers {
244 /// tokio::spawn(async move {
245 /// if rx.await.is_ok() {
246 /// println!("A timer completed!");
247 /// }
248 /// });
249 /// }
250 /// # }
251 /// ```
252 pub fn into_completion_receivers(self) -> Vec<oneshot::Receiver<TaskCompletionReason>> {
253 self.completion_rxs
254 }
255}
256
257/// 实现 IntoIterator,允许直接迭代 BatchHandle
258/// (Implement IntoIterator to allow direct iteration over BatchHandle)
259///
260/// # 示例 (Examples)
261/// ```no_run
262/// # use kestrel_timer::{TimerWheel, CallbackWrapper};
263/// # use std::time::Duration;
264/// #
265/// # #[tokio::main]
266/// # async fn main() {
267/// let timer = TimerWheel::with_defaults();
268/// let delays: Vec<Duration> = (0..3)
269/// .map(|_| Duration::from_secs(1))
270/// .collect();
271/// let tasks = TimerWheel::create_batch(delays);
272/// let batch = timer.register_batch(tasks);
273///
274/// // 直接迭代,每个元素都是独立的 TimerHandle
275/// // (Iterate directly, each element is an independent TimerHandle)
276/// for handle in batch {
277/// // 可以单独操作每个句柄
278/// // (Can operate each handle individually)
279/// }
280/// # }
281/// ```
282impl IntoIterator for BatchHandle {
283 type Item = TimerHandle;
284 type IntoIter = BatchHandleIter;
285
286 fn into_iter(self) -> Self::IntoIter {
287 BatchHandleIter {
288 task_ids: self.task_ids.into_iter(),
289 completion_rxs: self.completion_rxs.into_iter(),
290 wheel: self.wheel,
291 }
292 }
293}
294
295/// BatchHandle 的迭代器 (Iterator for BatchHandle)
296pub struct BatchHandleIter {
297 task_ids: std::vec::IntoIter<TaskId>,
298 completion_rxs: std::vec::IntoIter<oneshot::Receiver<TaskCompletionReason>>,
299 wheel: Arc<Mutex<Wheel>>,
300}
301
302impl Iterator for BatchHandleIter {
303 type Item = TimerHandle;
304
305 fn next(&mut self) -> Option<Self::Item> {
306 match (self.task_ids.next(), self.completion_rxs.next()) {
307 (Some(task_id), Some(rx)) => {
308 Some(TimerHandle::new(task_id, self.wheel.clone(), rx))
309 }
310 _ => None,
311 }
312 }
313
314 fn size_hint(&self) -> (usize, Option<usize>) {
315 self.task_ids.size_hint()
316 }
317}
318
319impl ExactSizeIterator for BatchHandleIter {
320 fn len(&self) -> usize {
321 self.task_ids.len()
322 }
323}
324
325/// 时间轮定时器管理器 (Timing Wheel Timer Manager)
326pub struct TimerWheel {
327 /// 时间轮唯一标识符 (Timing wheel unique identifier)
328
329 /// 时间轮实例(使用 Arc<Mutex> 包装以支持多线程访问)
330 /// (Timing wheel instance, wrapped in Arc<Mutex> for multi-threaded access)
331 wheel: Arc<Mutex<Wheel>>,
332
333 /// 后台 tick 循环任务句柄 (Background tick loop task handle)
334 tick_handle: Option<JoinHandle<()>>,
335}
336
337impl TimerWheel {
338 /// 创建新的定时器管理器 (Create a new timer manager)
339 ///
340 /// # 参数 (Parameters)
341 /// - `config`: 时间轮配置(已经过验证)
342 /// (Timing wheel configuration, already validated)
343 ///
344 /// # 示例 (Examples)
345 /// ```no_run
346 /// use kestrel_timer::{TimerWheel, WheelConfig, TimerTask, BatchConfig};
347 /// use std::time::Duration;
348 ///
349 /// #[tokio::main]
350 /// async fn main() {
351 /// let config = WheelConfig::builder()
352 /// .l0_tick_duration(Duration::from_millis(10))
353 /// .l0_slot_count(512)
354 /// .l1_tick_duration(Duration::from_secs(1))
355 /// .l1_slot_count(64)
356 /// .build()
357 /// .unwrap();
358 /// let timer = TimerWheel::new(config, BatchConfig::default());
359 ///
360 /// // 使用两步式 API
361 /// // (Use two-step API)
362 /// let task = TimerWheel::create_task(Duration::from_secs(1), None);
363 /// let handle = timer.register(task);
364 /// }
365 /// ```
366 pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
367 let tick_duration = config.l0_tick_duration;
368 let wheel = Wheel::new(config, batch_config);
369 let wheel = Arc::new(Mutex::new(wheel));
370 let wheel_clone = wheel.clone();
371
372 // 启动后台 tick 循环
373 let tick_handle = tokio::spawn(async move {
374 Self::tick_loop(wheel_clone, tick_duration).await;
375 });
376
377 Self {
378 wheel,
379 tick_handle: Some(tick_handle),
380 }
381 }
382
383 /// 创建带默认配置的定时器管理器(分层模式)
384 /// - L0 层 tick 时长: 10ms, 槽位数量: 512
385 /// - L1 层 tick 时长: 1s, 槽位数量: 64
386 ///
387 /// Create a timer manager with default configuration, hierarchical mode
388 /// - L0 layer tick duration: 10ms, slot count: 512
389 /// - L1 layer tick duration: 1s, slot count: 64
390 ///
391 /// # 参数 (Parameters)
392 /// - `config`: 时间轮配置(已经过验证)
393 /// (Timing wheel configuration, already validated)
394 ///
395 /// # 返回 (Returns)
396 /// 定时器管理器实例 (Timer manager instance)
397 /// (Timer manager instance)
398 ///
399 /// # 示例 (Examples)
400 /// ```no_run
401 /// use kestrel_timer::TimerWheel;
402 ///
403 /// #[tokio::main]
404 /// async fn main() {
405 /// let timer = TimerWheel::with_defaults();
406 /// }
407 /// ```
408 pub fn with_defaults() -> Self {
409 Self::new(WheelConfig::default(), BatchConfig::default())
410 }
411
412 /// 创建与此时间轮绑定的 TimerService(使用默认配置)(Create TimerService bound to this timing wheel with default configuration)
413 ///
414 /// # 返回 (Returns)
415 /// 绑定到此时间轮的 TimerService 实例 (TimerService instance bound to this timing wheel)
416 ///
417 /// # 参数 (Parameters)
418 /// - `service_config`: 服务配置 (Service configuration)
419 ///
420 /// # 示例 (Examples)
421 /// ```no_run
422 /// use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, ServiceConfig};
423 /// use std::time::Duration;
424 ///
425 ///
426 /// #[tokio::main]
427 /// async fn main() {
428 /// let timer = TimerWheel::with_defaults();
429 /// let mut service = timer.create_service(ServiceConfig::default());
430 ///
431 /// // 使用两步式 API 通过 service 批量调度定时器
432 /// let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
433 /// .map(|_| (Duration::from_millis(100), Some(CallbackWrapper::new(|| async {}))))
434 /// .collect();
435 /// let tasks = TimerService::create_batch_with_callbacks(callbacks);
436 /// service.register_batch(tasks).unwrap();
437 ///
438 /// // 接收超时通知
439 /// let mut rx = service.take_receiver().unwrap();
440 /// while let Some(task_id) = rx.recv().await {
441 /// println!("Task {:?} completed", task_id);
442 /// }
443 /// }
444 /// ```
445 pub fn create_service(&self, service_config: ServiceConfig) -> crate::service::TimerService {
446 crate::service::TimerService::new(self.wheel.clone(), service_config)
447 }
448
449 /// 创建与此时间轮绑定的 TimerService(使用自定义配置)
450 /// (Create TimerService bound to this timing wheel with custom configuration)
451 ///
452 /// # 参数 (Parameters)
453 /// - `config`: 服务配置 (Service configuration)
454 ///
455 /// # 返回 (Returns)
456 /// 绑定到此时间轮的 TimerService 实例 (TimerService instance bound to this timing wheel)
457 ///
458 /// # 示例 (Examples)
459 /// ```no_run
460 /// use kestrel_timer::{TimerWheel, ServiceConfig};
461 ///
462 /// #[tokio::main]
463 /// async fn main() {
464 /// let timer = TimerWheel::with_defaults();
465 /// let config = ServiceConfig::builder()
466 /// .command_channel_capacity(1024)
467 /// .timeout_channel_capacity(2000)
468 /// .build()
469 /// .unwrap();
470 /// let service = timer.create_service_with_config(config);
471 /// }
472 /// ```
473 pub fn create_service_with_config(&self, config: ServiceConfig) -> crate::service::TimerService {
474 crate::service::TimerService::new(self.wheel.clone(), config)
475 }
476
477 /// 创建定时器任务(申请阶段)(Create timer task, application phase)
478 ///
479 /// # 参数 (Parameters)
480 /// - `delay`: 延迟时间 (Delay duration)
481 /// - `callback`: 实现了 TimerCallback trait 的回调对象 (Callback object implementing TimerCallback trait)
482 ///
483 /// # 返回 (Returns)
484 /// 返回 TimerTask,需要通过 `register()` 注册到时间轮
485 /// (Returns TimerTask that needs to be registered to the timing wheel via `register()`)
486 ///
487 /// # 示例 (Examples)
488 /// ```no_run
489 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
490 /// use std::time::Duration;
491 ///
492 ///
493 /// #[tokio::main]
494 /// async fn main() {
495 /// let timer = TimerWheel::with_defaults();
496 ///
497 /// // 步骤 1: 创建任务
498 /// // (Create task)
499 /// let task = TimerWheel::create_task(Duration::from_secs(1), Some(CallbackWrapper::new(|| async {
500 /// println!("Timer fired!");
501 /// })));
502 ///
503 /// // 获取任务 ID
504 /// // (Get task ID)
505 /// let task_id = task.get_id();
506 /// println!("Created task: {:?}", task_id);
507 ///
508 /// // 步骤 2: 注册任务
509 /// // (Register task)
510 /// let handle = timer.register(task);
511 /// }
512 /// ```
513 #[inline]
514 pub fn create_task(delay: Duration, callback: Option<CallbackWrapper>) -> crate::task::TimerTask {
515 crate::task::TimerTask::new(delay, callback)
516 }
517
518 /// 批量创建定时器任务(申请阶段)
519 /// (Create batch of timer tasks, application phase)
520 ///
521 /// # 参数 (Parameters)
522 /// - `delays`: 延迟时间列表 (Delay duration list)
523 ///
524 /// # 返回 (Returns)
525 /// 返回 TimerTask 列表,需要通过 `register_batch()` 注册到时间轮
526 /// (Returns TimerTask list that needs to be registered to the timing wheel via `register_batch()`)
527 ///
528 /// # 示例 (Examples)
529 /// ```no_run
530 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
531 /// use std::time::Duration;
532 /// use std::sync::Arc;
533 /// use std::sync::atomic::{AtomicU32, Ordering};
534 ///
535 /// #[tokio::main]
536 /// async fn main() {
537 /// let timer = TimerWheel::with_defaults();
538 /// let counter = Arc::new(AtomicU32::new(0));
539 ///
540 /// // 步骤 1: 批量创建任务
541 /// // (Create batch of tasks)
542 /// let delays: Vec<Duration> = (0..3)
543 /// .map(|_| Duration::from_millis(100))
544 /// .collect();
545 ///
546 /// // 批量创建任务
547 /// // (Create batch of tasks)
548 /// let tasks = TimerWheel::create_batch(delays);
549 /// println!("Created {} tasks", tasks.len());
550 ///
551 /// // 步骤 2: 批量注册任务
552 /// // (Register batch of tasks)
553 /// let batch = timer.register_batch(tasks);
554 /// }
555 /// ```
556 #[inline]
557 pub fn create_batch(delays: Vec<Duration>) -> Vec<crate::task::TimerTask>
558 {
559 delays
560 .into_iter()
561 .map(|delay| crate::task::TimerTask::new(delay, None))
562 .collect()
563 }
564
565 /// 批量创建定时器任务(申请阶段,带回调)
566 /// (Create batch of timer tasks, application phase, with callbacks)
567 ///
568 /// # 参数 (Parameters)
569 /// - `callbacks`: (延迟时间, 回调) 的元组列表 (Tuple list of (delay duration, callback))
570 ///
571 /// # 返回 (Returns)
572 /// 返回 TimerTask 列表,需要通过 `register_batch()` 注册到时间轮
573 /// (Returns TimerTask list that needs to be registered to the timing wheel via `register_batch()`)
574 ///
575 /// # 示例 (Examples)
576 /// ```no_run
577 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
578 /// use std::time::Duration;
579 /// use std::sync::Arc;
580 /// use std::sync::atomic::{AtomicU32, Ordering};
581 ///
582 /// #[tokio::main]
583 /// async fn main() {
584 /// let timer = TimerWheel::with_defaults();
585 /// let counter = Arc::new(AtomicU32::new(0));
586 ///
587 /// // 步骤 1: 批量创建任务
588 /// // (Create batch of tasks)
589 /// let delays: Vec<Duration> = (0..3)
590 /// .map(|_| Duration::from_millis(100))
591 /// .collect();
592 /// let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = delays
593 /// .into_iter()
594 /// .map(|delay| {
595 /// let counter = Arc::clone(&counter);
596 /// let callback = Some(CallbackWrapper::new(move || {
597 /// let counter = Arc::clone(&counter);
598 /// async move {
599 /// counter.fetch_add(1, Ordering::SeqCst);
600 /// }
601 /// }));
602 /// (delay, callback)
603 /// })
604 /// .collect();
605 ///
606 /// // 批量创建任务
607 /// // (Create batch of tasks)
608 /// let tasks = TimerWheel::create_batch_with_callbacks(callbacks);
609 /// println!("Created {} tasks", tasks.len());
610 ///
611 /// // 步骤 2: 批量注册任务
612 /// // (Register batch of tasks)
613 /// let batch = timer.register_batch(tasks);
614 /// }
615 /// ```
616 #[inline]
617 pub fn create_batch_with_callbacks(callbacks: Vec<(Duration, Option<CallbackWrapper>)>) -> Vec<crate::task::TimerTask>
618 {
619 callbacks
620 .into_iter()
621 .map(|(delay, callback)| crate::task::TimerTask::new(delay, callback))
622 .collect()
623 }
624
625 /// 注册定时器任务到时间轮(注册阶段)
626 /// (Register timer task to timing wheel, registration phase)
627 ///
628 /// # 参数 (Parameters)
629 /// - `task`: 通过 `create_task()` 创建的任务 (Task created via `create_task()`)
630 ///
631 /// # 返回 (Returns)
632 /// 返回定时器句柄,可用于取消定时器和接收完成通知
633 /// (Returns timer handle that can be used to cancel timer and receive completion notifications)
634 ///
635 /// # 示例 (Examples)
636 /// ```no_run
637 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
638 ///
639 /// use std::time::Duration;
640 ///
641 /// #[tokio::main]
642 /// async fn main() {
643 /// let timer = TimerWheel::with_defaults();
644 ///
645 /// let task = TimerWheel::create_task(Duration::from_secs(1), Some(CallbackWrapper::new(|| async {
646 /// println!("Timer fired!");
647 /// })));
648 /// let task_id = task.get_id();
649 ///
650 /// // 注册任务
651 /// // (Register task)
652 /// let handle = timer.register(task);
653 ///
654 /// // 等待定时器完成
655 /// // (Wait for timer completion)
656 /// handle.into_completion_receiver().0.await.ok();
657 /// }
658 /// ```
659 #[inline]
660 pub fn register(&self, task: crate::task::TimerTask) -> TimerHandle {
661 let (completion_tx, completion_rx) = oneshot::channel();
662 let notifier = crate::task::CompletionNotifier(completion_tx);
663
664 let task_id = task.id;
665
666 // 单次加锁完成所有操作
667 {
668 let mut wheel_guard = self.wheel.lock();
669 wheel_guard.insert(task, notifier);
670 }
671
672 TimerHandle::new(task_id, self.wheel.clone(), completion_rx)
673 }
674
675 /// 批量注册定时器任务到时间轮(注册阶段)(Batch register timer tasks to timing wheel, registration phase)
676 ///
677 /// # 参数 (Parameters)
678 /// - `tasks`: 通过 `create_batch()` 创建的任务列表 (List of tasks created via `create_batch()`)
679 ///
680 /// # 返回 (Returns)
681 /// 返回批量定时器句柄 (Returns batch timer handle)
682 ///
683 /// # 示例 (Examples)
684 /// ```no_run
685 /// use kestrel_timer::{TimerWheel, TimerTask};
686 /// use std::time::Duration;
687 ///
688 /// #[tokio::main]
689 /// async fn main() {
690 /// let timer = TimerWheel::with_defaults();
691 ///
692 /// let delays: Vec<Duration> = (0..3)
693 /// .map(|_| Duration::from_secs(1))
694 /// .collect();
695 /// let tasks = TimerWheel::create_batch(delays);
696 ///
697 /// let batch = timer.register_batch(tasks);
698 /// println!("Registered {} timers", batch.len());
699 /// }
700 /// ```
701 #[inline]
702 pub fn register_batch(&self, tasks: Vec<crate::task::TimerTask>) -> BatchHandle {
703 let task_count = tasks.len();
704 let mut completion_rxs = Vec::with_capacity(task_count);
705 let mut task_ids = Vec::with_capacity(task_count);
706 let mut prepared_tasks = Vec::with_capacity(task_count);
707
708 // 步骤1: 准备所有 channels 和 notifiers(无锁)
709 // 优化:使用 for 循环代替 map + collect,避免闭包捕获开销
710 // (Step 1: Prepare all channels and notifiers)
711 // (Optimize: use for loop instead of map + collect, avoid closure capture overhead)
712 for task in tasks {
713 let (completion_tx, completion_rx) = oneshot::channel();
714 let notifier = crate::task::CompletionNotifier(completion_tx);
715
716 task_ids.push(task.id);
717 completion_rxs.push(completion_rx);
718 prepared_tasks.push((task, notifier));
719 }
720
721 // 步骤2: 单次加锁,批量插入
722 // (Step 2: Single lock, batch insert)
723 {
724 let mut wheel_guard = self.wheel.lock();
725 wheel_guard.insert_batch(prepared_tasks);
726 }
727
728 BatchHandle::new(task_ids, self.wheel.clone(), completion_rxs)
729 }
730
731 /// 取消定时器 (Cancel timer)
732 ///
733 /// # 参数 (Parameters)
734 /// - `task_id`: 任务 ID (Task ID)
735 ///
736 /// # 返回 (Returns)
737 /// 如果任务存在且成功取消返回 true,否则返回 false
738 /// (Returns true if task exists and is successfully cancelled, otherwise false)
739 ///
740 /// # 示例 (Examples)
741 /// ```no_run
742 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
743 ///
744 /// use std::time::Duration;
745 ///
746 /// #[tokio::main]
747 /// async fn main() {
748 /// let timer = TimerWheel::with_defaults();
749 ///
750 /// let task = TimerWheel::create_task(Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
751 /// println!("Timer fired!");
752 /// })));
753 /// let task_id = task.get_id();
754 /// let _handle = timer.register(task);
755 ///
756 /// // 使用任务 ID 取消
757 /// // (Cancel task using task ID)
758 /// let cancelled = timer.cancel(task_id);
759 /// println!("Canceled successfully: {}", cancelled);
760 /// }
761 /// ```
762 #[inline]
763 pub fn cancel(&self, task_id: TaskId) -> bool {
764 let mut wheel = self.wheel.lock();
765 wheel.cancel(task_id)
766 }
767
768 /// 批量取消定时器 (Batch cancel timers)
769 ///
770 /// # 参数 (Parameters)
771 /// - `task_ids`: 要取消的任务 ID 列表 (List of task IDs to cancel)
772 ///
773 /// # 返回 (Returns)
774 /// 成功取消的任务数量 (Number of successfully cancelled tasks)
775 ///
776 /// # 性能优势 (Performance Advantages)
777 /// - 批量处理减少锁竞争 (Batch processing reduces lock contention)
778 /// - 内部优化批量取消操作 (Internally optimized batch cancellation operation)
779 ///
780 /// # 示例 (Examples)
781 /// ```no_run
782 /// use kestrel_timer::{TimerWheel, TimerTask};
783 /// use std::time::Duration;
784 ///
785 /// #[tokio::main]
786 /// async fn main() {
787 /// let timer = TimerWheel::with_defaults();
788 ///
789 /// // 创建多个定时器
790 /// // (Create multiple timers)
791 /// let task1 = TimerWheel::create_task(Duration::from_secs(10), None);
792 /// let task2 = TimerWheel::create_task(Duration::from_secs(10), None);
793 /// let task3 = TimerWheel::create_task(Duration::from_secs(10), None);
794 ///
795 /// let task_ids = vec![task1.get_id(), task2.get_id(), task3.get_id()];
796 ///
797 /// let _h1 = timer.register(task1);
798 /// let _h2 = timer.register(task2);
799 /// let _h3 = timer.register(task3);
800 ///
801 /// // 批量取消
802 /// // (Batch cancel)
803 /// let cancelled = timer.cancel_batch(&task_ids);
804 /// println!("Canceled {} timers", cancelled);
805 /// }
806 /// ```
807 #[inline]
808 pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
809 let mut wheel = self.wheel.lock();
810 wheel.cancel_batch(task_ids)
811 }
812
813 /// 推迟定时器 (Postpone timer)
814 ///
815 /// # 参数 (Parameters)
816 /// - `task_id`: 要推迟的任务 ID (Task ID to postpone)
817 /// - `new_delay`: 新的延迟时间(从当前时间点重新计算)(New delay duration, recalculated from current time)
818 /// - `callback`: 新的回调函数,传入 `None` 保持原回调不变,传入 `Some` 替换为新回调
819 /// (New callback function, pass `None` to keep original callback, pass `Some` to replace with new callback)
820 ///
821 /// # 返回 (Returns)
822 /// 如果任务存在且成功推迟返回 true,否则返回 false
823 /// (Returns true if task exists and is successfully postponed, otherwise false)
824 ///
825 /// # 注意 (Note)
826 /// - 推迟后任务 ID 保持不变 (Task ID remains unchanged after postponement)
827 /// - 原有的 completion_receiver 仍然有效 (Original completion_receiver remains valid)
828 ///
829 /// # 示例 (Examples)
830 ///
831 /// ## 保持原回调 (Keep original callback)
832 /// ```no_run
833 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
834 /// use std::time::Duration;
835 ///
836 ///
837 /// #[tokio::main]
838 /// async fn main() {
839 /// let timer = TimerWheel::with_defaults();
840 ///
841 /// let task = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
842 /// println!("Timer fired!");
843 /// })));
844 /// let task_id = task.get_id();
845 /// let _handle = timer.register(task);
846 ///
847 /// // 推迟到 10 秒后触发(保持原回调)
848 /// // (Postpone to 10 seconds after triggering, and keep original callback)
849 /// let success = timer.postpone(task_id, Duration::from_secs(10), None);
850 /// println!("Postponed successfully: {}", success);
851 /// }
852 /// ```
853 ///
854 /// ## 替换为新回调 (Replace with new callback)
855 /// ```no_run
856 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
857 /// use std::time::Duration;
858 ///
859 /// #[tokio::main]
860 /// async fn main() {
861 /// let timer = TimerWheel::with_defaults();
862 ///
863 /// let task = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
864 /// println!("Original callback!");
865 /// })));
866 /// let task_id = task.get_id();
867 /// let _handle = timer.register(task);
868 ///
869 /// // 推迟到 10 秒后触发(并替换为新回调)
870 /// // (Postpone to 10 seconds after triggering, and replace with new callback)
871 /// let success = timer.postpone(task_id, Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
872 /// println!("New callback!");
873 /// })));
874 /// println!("Postponed successfully: {}", success);
875 /// }
876 /// ```
877 #[inline]
878 pub fn postpone(
879 &self,
880 task_id: TaskId,
881 new_delay: Duration,
882 callback: Option<CallbackWrapper>,
883 ) -> bool {
884 let mut wheel = self.wheel.lock();
885 wheel.postpone(task_id, new_delay, callback)
886 }
887
888 /// 批量推迟定时器(保持原回调)
889 /// (Batch postpone timers, keep original callbacks)
890 ///
891 /// # 参数 (Parameters)
892 /// - `updates`: (任务ID, 新延迟) 的元组列表
893 /// (List of tuples of (task ID, new delay))
894 ///
895 /// # 返回 (Returns)
896 /// 成功推迟的任务数量
897 /// (Number of successfully postponed tasks)
898 ///
899 /// # 注意 (Note)
900 /// - 此方法会保持所有任务的原回调不变
901 /// (This method keeps all tasks' original callbacks unchanged)
902 /// - 如需替换回调,请使用 `postpone_batch_with_callbacks`
903 /// (Use `postpone_batch_with_callbacks` if you need to replace callbacks)
904 ///
905 /// # 性能优势 (Performance Advantages)
906 /// - 批量处理减少锁竞争
907 /// (Batch processing reduces lock contention)
908 /// - 内部优化批量推迟操作
909 /// (Internally optimized batch postponement operation)
910 ///
911 /// # 示例 (Examples)
912 /// ```no_run
913 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
914 /// use std::time::Duration;
915 ///
916 /// #[tokio::main]
917 /// async fn main() {
918 /// let timer = TimerWheel::with_defaults();
919 ///
920 /// // 创建多个带回调的定时器
921 /// // (Create multiple tasks with callbacks)
922 /// let task1 = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
923 /// println!("Task 1 fired!");
924 /// })));
925 /// let task2 = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
926 /// println!("Task 2 fired!");
927 /// })));
928 /// let task3 = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
929 /// println!("Task 3 fired!");
930 /// })));
931 ///
932 /// let task_ids = vec![
933 /// (task1.get_id(), Duration::from_secs(10)),
934 /// (task2.get_id(), Duration::from_secs(15)),
935 /// (task3.get_id(), Duration::from_secs(20)),
936 /// ];
937 ///
938 /// timer.register(task1);
939 /// timer.register(task2);
940 /// timer.register(task3);
941 ///
942 /// // 批量推迟(保持原回调)
943 /// // (Batch postpone, keep original callbacks)
944 /// let postponed = timer.postpone_batch(task_ids);
945 /// println!("Postponed {} timers", postponed);
946 /// }
947 /// ```
948 #[inline]
949 pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize {
950 let mut wheel = self.wheel.lock();
951 wheel.postpone_batch(updates)
952 }
953
954 /// 批量推迟定时器(替换回调)
955 /// (Batch postpone timers, replace callbacks)
956 ///
957 /// # 参数 (Parameters)
958 /// - `updates`: (任务ID, 新延迟, 新回调) 的元组列表
959 /// (List of tuples of (task ID, new delay, new callback))
960 ///
961 /// # 返回 (Returns)
962 /// 成功推迟的任务数量
963 /// (Number of successfully postponed tasks)
964 ///
965 /// # 性能优势 (Performance Advantages)
966 /// - 批量处理减少锁竞争
967 /// (Batch processing reduces lock contention)
968 /// - 内部优化批量推迟操作
969 /// (Internally optimized batch postponement operation)
970 ///
971 /// # 示例 (Examples)
972 /// ```no_run
973 /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
974 /// use std::time::Duration;
975 /// use std::sync::Arc;
976 /// use std::sync::atomic::{AtomicU32, Ordering};
977 ///
978 /// #[tokio::main]
979 /// async fn main() {
980 /// let timer = TimerWheel::with_defaults();
981 /// let counter = Arc::new(AtomicU32::new(0));
982 ///
983 /// // 创建多个定时器
984 /// let task1 = TimerWheel::create_task(Duration::from_secs(5), None);
985 /// let task2 = TimerWheel::create_task(Duration::from_secs(5), None);
986 ///
987 /// let id1 = task1.get_id();
988 /// let id2 = task2.get_id();
989 ///
990 /// timer.register(task1);
991 /// timer.register(task2);
992 ///
993 /// // 批量推迟并替换回调
994 /// let updates: Vec<_> = vec![id1, id2]
995 /// .into_iter()
996 /// .map(|id| {
997 /// let counter = Arc::clone(&counter);
998 /// (id, Duration::from_secs(10), Some(CallbackWrapper::new(move || {
999 /// let counter = Arc::clone(&counter);
1000 /// async move { counter.fetch_add(1, Ordering::SeqCst); }
1001 /// })))
1002 /// })
1003 /// .collect();
1004 /// let postponed = timer.postpone_batch_with_callbacks(updates);
1005 /// println!("Postponed {} timers", postponed);
1006 /// }
1007 /// ```
1008 #[inline]
1009 pub fn postpone_batch_with_callbacks(
1010 &self,
1011 updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
1012 ) -> usize {
1013 let mut wheel = self.wheel.lock();
1014 wheel.postpone_batch_with_callbacks(updates.to_vec())
1015 }
1016
1017 /// 核心 tick 循环
1018 /// (Core tick loop)
1019 async fn tick_loop(wheel: Arc<Mutex<Wheel>>, tick_duration: Duration) {
1020 let mut interval = tokio::time::interval(tick_duration);
1021 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1022
1023 loop {
1024 interval.tick().await;
1025
1026 // 推进时间轮并获取到期任务
1027 // (Advance timing wheel and get expired tasks)
1028 let expired_tasks = {
1029 let mut wheel_guard = wheel.lock();
1030 wheel_guard.advance()
1031 // (Advance timing wheel and get expired tasks)
1032 };
1033
1034 // 执行到期任务
1035 // (Execute expired tasks)
1036 for task in expired_tasks {
1037 let callback = task.get_callback();
1038
1039 // 移动task的所有权来获取completion_notifier
1040 // (Move task ownership to get completion_notifier)
1041 let notifier = task.completion_notifier;
1042
1043 // 只有注册过的任务才有 notifier
1044 // (Only registered tasks have notifier)
1045 if let Some(notifier) = notifier {
1046 // 在独立的 tokio 任务中执行回调,并在回调完成后发送通知
1047 // (Execute callback in a separate tokio task, and send notification after callback completion)
1048 if let Some(callback) = callback {
1049 tokio::spawn(async move {
1050 // 执行回调
1051 // (Execute callback)
1052 let future = callback.call();
1053 future.await;
1054
1055 // 回调执行完成后发送通知
1056 // (Send notification after callback completion)
1057 let _ = notifier.0.send(TaskCompletionReason::Expired);
1058 });
1059 } else {
1060 // 如果没有回调,立即发送完成通知
1061 // (Send notification immediately if no callback)
1062 let _ = notifier.0.send(TaskCompletionReason::Expired);
1063 }
1064 }
1065 }
1066 }
1067 }
1068
1069 /// 停止定时器管理器
1070 /// (Stop timer manager)
1071 pub async fn shutdown(mut self) {
1072 if let Some(handle) = self.tick_handle.take() {
1073 handle.abort();
1074 // (Abort tick loop task)
1075 let _ = handle.await;
1076 }
1077 }
1078}
1079
1080impl Drop for TimerWheel {
1081 fn drop(&mut self) {
1082 if let Some(handle) = self.tick_handle.take() {
1083 handle.abort();
1084 }
1085 }
1086}
1087
1088#[cfg(test)]
1089mod tests {
1090 use super::*;
1091 use std::sync::atomic::{AtomicU32, Ordering};
1092
1093 #[tokio::test]
1094 async fn test_timer_creation() {
1095 let _timer = TimerWheel::with_defaults();
1096 }
1097
1098 #[tokio::test]
1099 async fn test_schedule_once() {
1100 use std::sync::Arc;
1101 let timer = TimerWheel::with_defaults();
1102 let counter = Arc::new(AtomicU32::new(0));
1103 let counter_clone = Arc::clone(&counter);
1104
1105 let task = TimerWheel::create_task(
1106 Duration::from_millis(50),
1107 Some(CallbackWrapper::new(move || {
1108 let counter = Arc::clone(&counter_clone);
1109 async move {
1110 counter.fetch_add(1, Ordering::SeqCst);
1111 }
1112 })),
1113 );
1114 let _handle = timer.register(task);
1115
1116 // 等待定时器触发
1117 tokio::time::sleep(Duration::from_millis(100)).await;
1118 assert_eq!(counter.load(Ordering::SeqCst), 1);
1119 }
1120
1121 #[tokio::test]
1122 async fn test_cancel_timer() {
1123 use std::sync::Arc;
1124 let timer = TimerWheel::with_defaults();
1125 let counter = Arc::new(AtomicU32::new(0));
1126 let counter_clone = Arc::clone(&counter);
1127
1128 let task = TimerWheel::create_task(
1129 Duration::from_millis(100),
1130 Some(CallbackWrapper::new(move || {
1131 let counter = Arc::clone(&counter_clone);
1132 async move {
1133 counter.fetch_add(1, Ordering::SeqCst);
1134 }
1135 })),
1136 );
1137 let handle = timer.register(task);
1138
1139 // 立即取消
1140 let cancel_result = handle.cancel();
1141 assert!(cancel_result);
1142
1143 // 等待足够长时间确保定时器不会触发
1144 tokio::time::sleep(Duration::from_millis(200)).await;
1145 assert_eq!(counter.load(Ordering::SeqCst), 0);
1146 }
1147
1148 #[tokio::test]
1149 async fn test_cancel_immediate() {
1150 use std::sync::Arc;
1151 let timer = TimerWheel::with_defaults();
1152 let counter = Arc::new(AtomicU32::new(0));
1153 let counter_clone = Arc::clone(&counter);
1154
1155 let task = TimerWheel::create_task(
1156 Duration::from_millis(100),
1157 Some(CallbackWrapper::new(move || {
1158 let counter = Arc::clone(&counter_clone);
1159 async move {
1160 counter.fetch_add(1, Ordering::SeqCst);
1161 }
1162 })),
1163 );
1164 let handle = timer.register(task);
1165
1166 // 立即取消
1167 let cancel_result = handle.cancel();
1168 assert!(cancel_result);
1169
1170 // 等待足够长时间确保定时器不会触发
1171 tokio::time::sleep(Duration::from_millis(200)).await;
1172 assert_eq!(counter.load(Ordering::SeqCst), 0);
1173 }
1174
1175 #[tokio::test]
1176 async fn test_postpone_timer() {
1177 use std::sync::Arc;
1178 let timer = TimerWheel::with_defaults();
1179 let counter = Arc::new(AtomicU32::new(0));
1180 let counter_clone = Arc::clone(&counter);
1181
1182 let task = TimerWheel::create_task(
1183 Duration::from_millis(50),
1184 Some(CallbackWrapper::new(move || {
1185 let counter = Arc::clone(&counter_clone);
1186 async move {
1187 counter.fetch_add(1, Ordering::SeqCst);
1188 }
1189 })),
1190 );
1191 let task_id = task.get_id();
1192 let handle = timer.register(task);
1193
1194 // 推迟任务到 150ms
1195 let postponed = timer.postpone(task_id, Duration::from_millis(150), None);
1196 assert!(postponed);
1197
1198 // 等待原定时间 50ms,任务不应该触发
1199 tokio::time::sleep(Duration::from_millis(70)).await;
1200 assert_eq!(counter.load(Ordering::SeqCst), 0);
1201
1202 // 等待新的触发时间(从推迟开始算,还需要等待约 150ms)
1203 let result = tokio::time::timeout(
1204 Duration::from_millis(200),
1205 handle.into_completion_receiver().0
1206 ).await;
1207 assert!(result.is_ok());
1208
1209 // 等待回调执行
1210 tokio::time::sleep(Duration::from_millis(20)).await;
1211 assert_eq!(counter.load(Ordering::SeqCst), 1);
1212 }
1213
1214 #[tokio::test]
1215 async fn test_postpone_with_callback() {
1216 use std::sync::Arc;
1217 let timer = TimerWheel::with_defaults();
1218 let counter = Arc::new(AtomicU32::new(0));
1219 let counter_clone1 = Arc::clone(&counter);
1220 let counter_clone2 = Arc::clone(&counter);
1221
1222 // 创建任务,原始回调增加 1
1223 let task = TimerWheel::create_task(
1224 Duration::from_millis(50),
1225 Some(CallbackWrapper::new(move || {
1226 let counter = Arc::clone(&counter_clone1);
1227 async move {
1228 counter.fetch_add(1, Ordering::SeqCst);
1229 }
1230 })),
1231 );
1232 let task_id = task.get_id();
1233 let handle = timer.register(task);
1234
1235 // 推迟任务并替换回调,新回调增加 10
1236 let postponed = timer.postpone(
1237 task_id,
1238 Duration::from_millis(100),
1239 Some(CallbackWrapper::new(move || {
1240 let counter = Arc::clone(&counter_clone2);
1241 async move {
1242 counter.fetch_add(10, Ordering::SeqCst);
1243 }
1244 })),
1245 );
1246 assert!(postponed);
1247
1248 // 等待任务触发(推迟后需要等待100ms,加上余量)
1249 let result = tokio::time::timeout(
1250 Duration::from_millis(200),
1251 handle.into_completion_receiver().0
1252 ).await;
1253 assert!(result.is_ok());
1254
1255 // 等待回调执行
1256 tokio::time::sleep(Duration::from_millis(20)).await;
1257
1258 // 验证新回调被执行(增加了 10 而不是 1)
1259 assert_eq!(counter.load(Ordering::SeqCst), 10);
1260 }
1261
1262 #[tokio::test]
1263 async fn test_postpone_nonexistent_timer() {
1264 let timer = TimerWheel::with_defaults();
1265
1266 // 尝试推迟不存在的任务
1267 let fake_task = TimerWheel::create_task(Duration::from_millis(50), None);
1268 let fake_task_id = fake_task.get_id();
1269 // 不注册这个任务
1270
1271 let postponed = timer.postpone(fake_task_id, Duration::from_millis(100), None);
1272 assert!(!postponed);
1273 }
1274
1275 #[tokio::test]
1276 async fn test_postpone_batch() {
1277 use std::sync::Arc;
1278 let timer = TimerWheel::with_defaults();
1279 let counter = Arc::new(AtomicU32::new(0));
1280
1281 // 创建 3 个任务
1282 let mut task_ids = Vec::new();
1283 for _ in 0..3 {
1284 let counter_clone = Arc::clone(&counter);
1285 let task = TimerWheel::create_task(
1286 Duration::from_millis(50),
1287 Some(CallbackWrapper::new(move || {
1288 let counter = Arc::clone(&counter_clone);
1289 async move {
1290 counter.fetch_add(1, Ordering::SeqCst);
1291 }
1292 })),
1293 );
1294 task_ids.push((task.get_id(), Duration::from_millis(150)));
1295 timer.register(task);
1296 }
1297
1298 // 批量推迟
1299 let postponed = timer.postpone_batch(task_ids);
1300 assert_eq!(postponed, 3);
1301
1302 // 等待原定时间 50ms,任务不应该触发
1303 tokio::time::sleep(Duration::from_millis(70)).await;
1304 assert_eq!(counter.load(Ordering::SeqCst), 0);
1305
1306 // 等待新的触发时间(从推迟开始算,还需要等待约 150ms)
1307 tokio::time::sleep(Duration::from_millis(200)).await;
1308
1309 // 等待回调执行
1310 tokio::time::sleep(Duration::from_millis(20)).await;
1311 assert_eq!(counter.load(Ordering::SeqCst), 3);
1312 }
1313
1314 #[tokio::test]
1315 async fn test_postpone_batch_with_callbacks() {
1316 use std::sync::Arc;
1317 let timer = TimerWheel::with_defaults();
1318 let counter = Arc::new(AtomicU32::new(0));
1319
1320 // 创建 3 个任务
1321 let mut task_ids = Vec::new();
1322 for _ in 0..3 {
1323 let task = TimerWheel::create_task(
1324 Duration::from_millis(50),
1325 None
1326 );
1327 task_ids.push(task.get_id());
1328 timer.register(task);
1329 }
1330
1331 // 批量推迟并替换回调
1332 let updates: Vec<_> = task_ids
1333 .into_iter()
1334 .map(|id| {
1335 let counter_clone = Arc::clone(&counter);
1336 (id, Duration::from_millis(150), Some(CallbackWrapper::new(move || {
1337 let counter = Arc::clone(&counter_clone);
1338 async move {
1339 counter.fetch_add(1, Ordering::SeqCst);
1340 }
1341 })))
1342 })
1343 .collect();
1344
1345 let postponed = timer.postpone_batch_with_callbacks(updates);
1346 assert_eq!(postponed, 3);
1347
1348 // 等待原定时间 50ms,任务不应该触发
1349 tokio::time::sleep(Duration::from_millis(70)).await;
1350 assert_eq!(counter.load(Ordering::SeqCst), 0);
1351
1352 // 等待新的触发时间(从推迟开始算,还需要等待约 150ms)
1353 tokio::time::sleep(Duration::from_millis(200)).await;
1354
1355 // 等待回调执行
1356 tokio::time::sleep(Duration::from_millis(20)).await;
1357 assert_eq!(counter.load(Ordering::SeqCst), 3);
1358 }
1359
1360 #[tokio::test]
1361 async fn test_postpone_keeps_completion_receiver_valid() {
1362 use std::sync::Arc;
1363 let timer = TimerWheel::with_defaults();
1364 let counter = Arc::new(AtomicU32::new(0));
1365 let counter_clone = Arc::clone(&counter);
1366
1367 let task = TimerWheel::create_task(
1368 Duration::from_millis(50),
1369 Some(CallbackWrapper::new(move || {
1370 let counter = Arc::clone(&counter_clone);
1371 async move {
1372 counter.fetch_add(1, Ordering::SeqCst);
1373 }
1374 })),
1375 );
1376 let task_id = task.get_id();
1377 let handle = timer.register(task);
1378
1379 // 推迟任务
1380 timer.postpone(task_id, Duration::from_millis(100), None);
1381
1382 // 验证原 completion_receiver 仍然有效(推迟后需要等待100ms,加上余量)
1383 let result = tokio::time::timeout(
1384 Duration::from_millis(200),
1385 handle.into_completion_receiver().0
1386 ).await;
1387 assert!(result.is_ok(), "Completion receiver should still work after postpone");
1388
1389 // 等待回调执行
1390 tokio::time::sleep(Duration::from_millis(20)).await;
1391 assert_eq!(counter.load(Ordering::SeqCst), 1);
1392 }
1393}
1394