kestrel_timer/
timer.rs

1pub mod handle;
2
3use crate::config::{BatchConfig, ServiceConfig, WheelConfig};
4use crate::task::{CallbackWrapper, TaskHandle, TaskId};
5use crate::wheel::Wheel;
6use handle::{BatchHandle, BatchHandleWithCompletion, TimerHandle, TimerHandleWithCompletion};
7use parking_lot::Mutex;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::task::JoinHandle;
11
12/// Timing Wheel Timer Manager
13///
14/// 时间轮定时器管理器
15pub struct TimerWheel {
16    /// Timing wheel instance, wrapped in Arc<Mutex> for multi-threaded access
17    ///
18    /// 时间轮实例,包装在 Arc<Mutex> 中以支持多线程访问
19    wheel: Arc<Mutex<Wheel>>,
20
21    /// Background tick loop task handle
22    ///
23    /// 后台 tick 循环任务句柄
24    tick_handle: Option<JoinHandle<()>>,
25}
26
27impl TimerWheel {
28    /// Create a new timer manager
29    ///
30    /// # Parameters
31    /// - `config`: Timing wheel configuration, already validated
32    /// - `batch_config`: Batch operation configuration
33    ///
34    /// 创建新的定时器管理器
35    ///
36    /// # 参数
37    /// - `config`: 时间轮配置,已验证
38    /// - `batch_config`: 批量操作配置
39    ///
40    /// # Examples (示例)
41    /// ```no_run
42    /// use kestrel_timer::{TimerWheel, config::WheelConfig, TimerTask, config::BatchConfig};
43    /// use std::time::Duration;
44    ///
45    /// #[tokio::main]
46    /// async fn main() {
47    ///     let config = WheelConfig::builder()
48    ///         .l0_tick_duration(Duration::from_millis(10))
49    ///         .l0_slot_count(512)
50    ///         .l1_tick_duration(Duration::from_secs(1))
51    ///         .l1_slot_count(64)
52    ///         .build()
53    ///         .unwrap();
54    ///     let timer = TimerWheel::new(config, BatchConfig::default());
55    ///     
56    ///     // Use two-step API: allocate handle first, then register
57    ///     // 使用两步 API:先分配 handle,再注册
58    ///     let handle = timer.allocate_handle();
59    ///     let task = TimerTask::new_oneshot(Duration::from_secs(1), None);
60    ///     let _timer_handle = timer.register(handle, task);
61    /// }
62    /// ```
63    pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
64        let tick_duration = config.l0_tick_duration;
65        let wheel = Wheel::new(config, batch_config);
66        let wheel = Arc::new(Mutex::new(wheel));
67        let wheel_clone = wheel.clone();
68
69        // Start background tick loop
70        // 启动后台 tick 循环
71        let tick_handle = tokio::spawn(async move {
72            Self::tick_loop(wheel_clone, tick_duration).await;
73        });
74
75        Self {
76            wheel,
77            tick_handle: Some(tick_handle),
78        }
79    }
80
81    /// Create timer manager with default configuration, hierarchical mode
82    /// - L0 layer tick duration: 10ms, slot count: 512
83    /// - L1 layer tick duration: 1s, slot count: 64
84    ///
85    /// # Returns
86    /// Timer manager instance
87    ///
88    /// 使用默认配置创建定时器管理器,分层模式
89    /// - L0 层 tick 持续时间:10ms,槽数量:512
90    /// - L1 层 tick 持续时间:1s,槽数量:64
91    ///
92    /// # 返回值
93    /// 定时器管理器实例
94    ///
95    /// # Examples (示例)
96    /// ```no_run
97    /// use kestrel_timer::TimerWheel;
98    ///
99    /// #[tokio::main]
100    /// async fn main() {
101    ///     let timer = TimerWheel::with_defaults();
102    /// }
103    /// ```
104    pub fn with_defaults() -> Self {
105        Self::new(WheelConfig::default(), BatchConfig::default())
106    }
107
108    /// Create TimerService bound to this timing wheel with default configuration
109    ///
110    /// # Parameters
111    /// - `service_config`: Service configuration
112    ///
113    /// # Returns
114    /// TimerService instance bound to this timing wheel
115    ///
116    /// 创建绑定到此时间轮的 TimerService,使用默认配置
117    ///
118    /// # 参数
119    /// - `service_config`: 服务配置
120    ///
121    /// # 返回值
122    /// 绑定到此时间轮的 TimerService 实例
123    ///
124    /// # Examples (示例)
125    /// ```no_run
126    /// use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
127    /// use std::time::Duration;
128    ///
129    ///
130    /// #[tokio::main]
131    /// async fn main() {
132    ///     let timer = TimerWheel::with_defaults();
133    ///     let mut service = timer.create_service(ServiceConfig::default());
134    ///     
135    ///     // Use two-step API to batch schedule timers through service
136    ///     // 使用两步 API 通过服务批量调度定时器
137    ///     // Step 1: Allocate handles
138    ///     let handles = service.allocate_handles(5);
139    ///     
140    ///     // Step 2: Create tasks
141    ///     let tasks: Vec<_> = (0..5)
142    ///         .map(|_| {
143    ///             use kestrel_timer::TimerTask;
144    ///             TimerTask::new_oneshot(Duration::from_millis(100), Some(CallbackWrapper::new(|| async {})))
145    ///         })
146    ///         .collect();
147    ///     
148    ///     // Step 3: Register batch
149    ///     service.register_batch(handles, tasks).unwrap();
150    ///     
151    ///     // Receive timeout notifications
152    ///     // 接收超时通知
153    ///     let mut rx = service.take_receiver().unwrap();
154    ///     while let Some(task_id) = rx.recv().await {
155    ///         println!("Task {:?} completed", task_id);
156    ///     }
157    /// }
158    /// ```
159    pub fn create_service(&self, service_config: ServiceConfig) -> crate::service::TimerService {
160        crate::service::TimerService::new(self.wheel.clone(), service_config)
161    }
162
163    /// Create TimerService bound to this timing wheel with custom configuration
164    ///
165    /// # Parameters
166    /// - `config`: Service configuration
167    ///
168    /// # Returns
169    /// TimerService instance bound to this timing wheel
170    ///
171    /// 创建绑定到此时间轮的 TimerService,使用自定义配置
172    ///
173    /// # 参数
174    /// - `config`: 服务配置
175    ///
176    /// # 返回值
177    /// 绑定到此时间轮的 TimerService 实例
178    ///
179    /// # Examples (示例)
180    /// ```no_run
181    /// use kestrel_timer::{TimerWheel, config::ServiceConfig, TimerTask};
182    /// use std::num::NonZeroUsize;
183    ///
184    /// #[tokio::main]
185    /// async fn main() {
186    ///     let timer = TimerWheel::with_defaults();
187    ///     let config = ServiceConfig::builder()
188    ///         .command_channel_capacity(NonZeroUsize::new(1024).unwrap())
189    ///         .timeout_channel_capacity(NonZeroUsize::new(2000).unwrap())
190    ///         .build();
191    ///     let service = timer.create_service_with_config(config);
192    /// }
193    /// ```
194    pub fn create_service_with_config(
195        &self,
196        config: ServiceConfig,
197    ) -> crate::service::TimerService {
198        crate::service::TimerService::new(self.wheel.clone(), config)
199    }
200
201    /// Allocate a handle from DeferredMap
202    ///
203    /// # Returns
204    /// A unique handle for later insertion
205    ///
206    /// # 返回值
207    /// 用于后续插入的唯一 handle
208    pub fn allocate_handle(&self) -> TaskHandle {
209        self.wheel.lock().allocate_handle()
210    }
211
212    /// Batch allocate handles from DeferredMap
213    ///
214    /// # Parameters
215    /// - `count`: Number of handles to allocate
216    ///
217    /// # Returns
218    /// Vector of unique handles for later batch insertion
219    ///
220    /// # 参数
221    /// - `count`: 要分配的 handle 数量
222    ///
223    /// # 返回值
224    /// 用于后续批量插入的唯一 handles 向量
225    pub fn allocate_handles(&self, count: usize) -> Vec<TaskHandle> {
226        self.wheel.lock().allocate_handles(count)
227    }
228
229    /// Register timer task to timing wheel (registration phase)
230    ///
231    /// # Parameters
232    /// - `task`: Task created via `create_task()`
233    ///
234    /// # Returns
235    /// Return timer handle with completion receiver that can be used to cancel timer and receive completion notifications
236    ///
237    /// 注册定时器任务到时间轮 (注册阶段)
238    ///
239    /// # 参数
240    /// - `task`: 通过 `create_task()` 创建的任务
241    ///
242    /// # 返回值
243    /// 返回包含完成通知接收器的定时器句柄,可用于取消定时器和接收完成通知
244    ///
245    /// # Examples (示例)
246    /// ```no_run
247    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
248    ///
249    /// use std::time::Duration;
250    ///
251    /// #[tokio::main]
252    /// async fn main() {
253    ///     let timer = TimerWheel::with_defaults();
254    ///     
255    ///     // Step 1: Allocate handle
256    ///     let allocated_handle = timer.allocate_handle();
257    ///     let task_id = allocated_handle.task_id();
258    ///     
259    ///     // Step 2: Create task
260    ///     let task = TimerTask::new_oneshot(Duration::from_secs(1), Some(CallbackWrapper::new(|| async {
261    ///         println!("Timer fired!");
262    ///     })));
263    ///     
264    ///     // Step 3: Register task
265    ///     let handle = timer.register(allocated_handle, task);
266    ///     
267    ///     // Wait for timer completion
268    ///     // 等待定时器完成
269    ///     use kestrel_timer::CompletionReceiver;
270    ///     let (rx, _handle) = handle.into_parts();
271    ///     match rx {
272    ///         CompletionReceiver::OneShot(receiver) => {
273    ///             receiver.recv().await.unwrap();
274    ///         },
275    ///         _ => {}
276    ///     }
277    /// }
278    /// ```
279    #[inline]
280    pub fn register(
281        &self,
282        handle: TaskHandle,
283        task: crate::task::TimerTask,
284    ) -> TimerHandleWithCompletion {
285        let task_id = handle.task_id();
286
287        let (task, completion_rx) =
288            crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
289
290        // Single lock to complete all operations
291        // 单次加锁完成所有操作
292        let mut wheel_guard = self.wheel.lock();
293        wheel_guard.insert(handle, task);
294
295        TimerHandleWithCompletion::new(TimerHandle::new(task_id, self.wheel.clone()), completion_rx)
296    }
297
298    /// Batch register timer tasks to timing wheel (registration phase)
299    ///
300    /// # Parameters
301    /// - `handles`: Pre-allocated handles for tasks
302    /// - `tasks`: List of timer tasks
303    ///
304    /// # Returns
305    /// - `Ok(BatchHandleWithCompletion)` if all tasks are successfully registered
306    /// - `Err(TimerError::BatchLengthMismatch)` if handles and tasks lengths don't match
307    ///
308    /// 批量注册定时器任务到时间轮 (注册阶段)
309    ///
310    /// # 参数
311    /// - `handles`: 任务的预分配 handles
312    /// - `tasks`: 定时器任务列表
313    ///
314    /// # 返回值
315    /// - `Ok(BatchHandleWithCompletion)` 如果所有任务成功注册
316    /// - `Err(TimerError::BatchLengthMismatch)` 如果 handles 和 tasks 长度不匹配
317    ///
318    /// # Examples (示例)
319    /// ```no_run
320    /// use kestrel_timer::{TimerWheel, TimerTask};
321    /// use std::time::Duration;
322    ///
323    /// #[tokio::main]
324    /// async fn main() {
325    ///     let timer = TimerWheel::with_defaults();
326    ///     
327    ///     // Step 1: Allocate handles
328    ///     let handles = timer.allocate_handles(3);
329    ///     
330    ///     // Step 2: Create tasks
331    ///     let tasks: Vec<_> = (0..3)
332    ///         .map(|_| TimerTask::new_oneshot(Duration::from_secs(1), None))
333    ///         .collect();
334    ///     
335    ///     // Step 3: Batch register
336    ///     let batch = timer.register_batch(handles, tasks)
337    ///         .expect("register_batch should succeed");
338    ///     println!("Registered {} timers", batch.len());
339    /// }
340    /// ```
341    #[inline]
342    pub fn register_batch(
343        &self,
344        handles: Vec<TaskHandle>,
345        tasks: Vec<crate::task::TimerTask>,
346    ) -> Result<BatchHandleWithCompletion, crate::error::TimerError> {
347        // Validate lengths match
348        if handles.len() != tasks.len() {
349            return Err(crate::error::TimerError::BatchLengthMismatch {
350                handles_len: handles.len(),
351                tasks_len: tasks.len(),
352            });
353        }
354
355        let task_count = tasks.len();
356        let mut completion_rxs = Vec::with_capacity(task_count);
357        let mut task_ids = Vec::with_capacity(task_count);
358        let mut prepared_handles = Vec::with_capacity(task_count);
359        let mut prepared_tasks = Vec::with_capacity(task_count);
360
361        // Step 1: Prepare all channels and notifiers
362        for (handle, task) in handles.into_iter().zip(tasks.into_iter()) {
363            let task_id = handle.task_id();
364            let (task, completion_rx) =
365                crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
366            task_ids.push(task_id);
367            completion_rxs.push(completion_rx);
368            prepared_handles.push(handle);
369            prepared_tasks.push(task);
370        }
371
372        // Step 2: Single lock, batch insert
373        {
374            let mut wheel_guard = self.wheel.lock();
375            wheel_guard.insert_batch(prepared_handles, prepared_tasks)?;
376        }
377
378        Ok(BatchHandleWithCompletion::new(
379            BatchHandle::new(task_ids, self.wheel.clone()),
380            completion_rxs,
381        ))
382    }
383
384    /// Cancel timer
385    ///
386    /// # Parameters
387    /// - `task_id`: Task ID
388    ///
389    /// # Returns
390    /// Returns true if task exists and is successfully cancelled, otherwise false
391    ///
392    /// 取消定时器
393    ///
394    /// # 参数
395    /// - `task_id`: 任务 ID
396    ///
397    /// # 返回值
398    /// 如果任务存在且成功取消则返回 true,否则返回 false
399    ///
400    /// # Examples (示例)
401    /// ```no_run
402    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
403    ///
404    /// use std::time::Duration;
405    ///
406    /// #[tokio::main]
407    /// async fn main() {
408    ///     let timer = TimerWheel::with_defaults();
409    ///     
410    ///     // Step 1: Allocate handle
411    ///     let allocated_handle = timer.allocate_handle();
412    ///     let task_id = allocated_handle.task_id();
413    ///     
414    ///     // Step 2: Create and register task
415    ///     let task = TimerTask::new_oneshot(Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
416    ///         println!("Timer fired!");
417    ///     })));
418    ///     let _handle = timer.register(allocated_handle, task);
419    ///     
420    ///     // Cancel task using task ID
421    ///     // 使用任务 ID 取消任务
422    ///     let cancelled = timer.cancel(task_id);
423    ///     println!("Canceled successfully: {}", cancelled);
424    /// }
425    /// ```
426    #[inline]
427    pub fn cancel(&self, task_id: TaskId) -> bool {
428        let mut wheel = self.wheel.lock();
429        wheel.cancel(task_id)
430    }
431
432    /// Batch cancel timers
433    ///
434    /// # Parameters
435    /// - `task_ids`: List of task IDs to cancel
436    ///
437    /// # Returns
438    /// Number of successfully cancelled tasks
439    ///
440    /// 批量取消定时器
441    ///
442    /// # 参数
443    /// - `task_ids`: 要取消的任务 ID 列表
444    ///
445    /// # 返回值
446    /// 成功取消的任务数量
447    ///
448    /// # Performance Advantages
449    /// - Batch processing reduces lock contention
450    /// - Internally optimized batch cancellation operation
451    ///
452    /// # Examples (示例)
453    /// ```no_run
454    /// use kestrel_timer::{TimerWheel, TimerTask};
455    /// use std::time::Duration;
456    ///
457    /// #[tokio::main]
458    /// async fn main() {
459    ///     let timer = TimerWheel::with_defaults();
460    ///     
461    ///     // Create multiple timers
462    ///     // 创建多个定时器
463    ///     let task1 = TimerTask::new_oneshot(Duration::from_secs(10), None);
464    ///     let task2 = TimerTask::new_oneshot(Duration::from_secs(10), None);
465    ///     let task3 = TimerTask::new_oneshot(Duration::from_secs(10), None);
466    ///     
467    ///     // Allocate handles and get task IDs
468    ///     let h1 = timer.allocate_handle();
469    ///     let h2 = timer.allocate_handle();
470    ///     let h3 = timer.allocate_handle();
471    ///     let task_ids = vec![h1.task_id(), h2.task_id(), h3.task_id()];
472    ///     
473    ///     let _h1 = timer.register(h1, task1);
474    ///     let _h2 = timer.register(h2, task2);
475    ///     let _h3 = timer.register(h3, task3);
476    ///     
477    ///     // Batch cancel
478    ///     // 批量取消
479    ///     let cancelled = timer.cancel_batch(&task_ids);
480    ///     println!("Canceled {} timers", cancelled);
481    /// }
482    /// ```
483    #[inline]
484    pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
485        let mut wheel = self.wheel.lock();
486        wheel.cancel_batch(task_ids)
487    }
488
489    /// Postpone timer
490    ///
491    /// # Parameters
492    /// - `task_id`: Task ID to postpone
493    /// - `new_delay`: New delay duration, recalculated from current time
494    /// - `callback`: New callback function, pass `None` to keep original callback, pass `Some` to replace with new callback
495    ///
496    /// # Returns
497    /// Returns true if task exists and is successfully postponed, otherwise false
498    ///
499    /// 推迟定时器
500    ///
501    /// # 参数
502    /// - `task_id`: 要推迟的任务 ID
503    /// - `new_delay`: 新的延迟时间,从当前时间重新计算
504    /// - `callback`: 新的回调函数,传递 `None` 保持原始回调,传递 `Some` 替换为新的回调
505    ///
506    /// # 返回值
507    /// 如果任务存在且成功推迟则返回 true,否则返回 false
508    ///
509    /// # Note
510    /// - Task ID remains unchanged after postponement
511    /// - Original completion_receiver remains valid
512    ///
513    /// # 注意
514    /// - 任务 ID 在推迟后保持不变
515    /// - 原始 completion_receiver 保持有效
516    ///
517    /// # Examples (示例)
518    ///
519    /// ## Keep original callback (保持原始回调)
520    /// ```no_run
521    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
522    /// use std::time::Duration;
523    ///
524    ///
525    /// #[tokio::main]
526    /// async fn main() {
527    ///     let timer = TimerWheel::with_defaults();
528    ///     
529    ///     // Allocate handle first
530    ///     let allocated_handle = timer.allocate_handle();
531    ///     let task_id = allocated_handle.task_id();
532    ///     
533    ///     let task = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
534    ///         println!("Timer fired!");
535    ///     })));
536    ///     let _handle = timer.register(allocated_handle, task);
537    ///     
538    ///     // Postpone to 10 seconds after triggering, and keep original callback
539    ///     // 推迟到 10 秒后触发,并保持原始回调
540    ///     let success = timer.postpone(task_id, Duration::from_secs(10), None);
541    ///     println!("Postponed successfully: {}", success);
542    /// }
543    /// ```
544    ///
545    /// ## Replace with new callback (替换为新的回调)
546    /// ```no_run
547    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
548    /// use std::time::Duration;
549    ///
550    /// #[tokio::main]
551    /// async fn main() {
552    ///     let timer = TimerWheel::with_defaults();
553    ///     
554    ///     // Allocate handle first
555    ///     let allocated_handle = timer.allocate_handle();
556    ///     let task_id = allocated_handle.task_id();
557    ///     
558    ///     let task = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
559    ///         println!("Original callback!");
560    ///     })));
561    ///     let _handle = timer.register(allocated_handle, task);
562    ///     
563    ///     // Postpone to 10 seconds after triggering, and replace with new callback
564    ///     // 推迟到 10 秒后触发,并替换为新的回调
565    ///     let success = timer.postpone(task_id, Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
566    ///         println!("New callback!");
567    ///     })));
568    ///     println!("Postponed successfully: {}", success);
569    /// }
570    /// ```
571    #[inline]
572    pub fn postpone(
573        &self,
574        task_id: TaskId,
575        new_delay: Duration,
576        callback: Option<CallbackWrapper>,
577    ) -> bool {
578        let mut wheel = self.wheel.lock();
579        wheel.postpone(task_id, new_delay, callback)
580    }
581
582    /// Batch postpone timers (keep original callbacks)
583    ///
584    /// # Parameters
585    /// - `updates`: List of tuples of (task ID, new delay)
586    ///
587    /// # Returns
588    /// Number of successfully postponed tasks
589    ///
590    /// 批量推迟定时器 (保持原始回调)
591    ///
592    /// # 参数
593    /// - `updates`: (任务 ID, 新延迟) 元组列表
594    ///
595    /// # 返回值
596    /// 成功推迟的任务数量
597    ///
598    /// # Note
599    /// - This method keeps all tasks' original callbacks unchanged
600    /// - Use `postpone_batch_with_callbacks` if you need to replace callbacks
601    ///
602    /// # 注意
603    /// - 此方法保持所有任务的原始回调不变
604    /// - 如果需要替换回调,请使用 `postpone_batch_with_callbacks`
605    ///
606    /// # Performance Advantages
607    /// - Batch processing reduces lock contention
608    /// - Internally optimized batch postponement operation
609    ///
610    /// # Examples (示例)
611    /// ```no_run
612    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
613    /// use std::time::Duration;
614    ///
615    /// #[tokio::main]
616    /// async fn main() {
617    ///     let timer = TimerWheel::with_defaults();
618    ///     
619    ///     // Create multiple tasks with callbacks
620    ///     // 创建多个带有回调的任务
621    ///     let task1 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
622    ///         println!("Task 1 fired!");
623    ///     })));
624    ///     let task2 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
625    ///         println!("Task 2 fired!");
626    ///     })));
627    ///     let task3 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
628    ///         println!("Task 3 fired!");
629    ///     })));
630    ///     
631    ///     // Allocate handles and register
632    ///     let h1 = timer.allocate_handle();
633    ///     let h2 = timer.allocate_handle();
634    ///     let h3 = timer.allocate_handle();
635    ///     
636    ///     let task_ids = vec![
637    ///         (h1.task_id(), Duration::from_secs(10)),
638    ///         (h2.task_id(), Duration::from_secs(15)),
639    ///         (h3.task_id(), Duration::from_secs(20)),
640    ///     ];
641    ///     
642    ///     timer.register(h1, task1);
643    ///     timer.register(h2, task2);
644    ///     timer.register(h3, task3);
645    ///     
646    ///     // Batch postpone (keep original callbacks)
647    ///     // 批量推迟 (保持原始回调)
648    ///     let postponed = timer.postpone_batch(task_ids);
649    ///     println!("Postponed {} timers", postponed);
650    /// }
651    /// ```
652    #[inline]
653    pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize {
654        let mut wheel = self.wheel.lock();
655        wheel.postpone_batch(updates)
656    }
657
658    /// Batch postpone timers (replace callbacks)
659    ///
660    /// # Parameters
661    /// - `updates`: List of tuples of (task ID, new delay, new callback)
662    ///
663    /// # Returns
664    /// Number of successfully postponed tasks
665    ///
666    /// 批量推迟定时器 (替换回调)
667    ///
668    /// # 参数
669    /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
670    ///
671    /// # 返回值
672    /// 成功推迟的任务数量
673    ///
674    /// # Performance Advantages
675    /// - Batch processing reduces lock contention
676    /// - Internally optimized batch postponement operation
677    ///
678    /// # Examples (示例)
679    /// ```no_run
680    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
681    /// use std::time::Duration;
682    /// use std::sync::Arc;
683    /// use std::sync::atomic::{AtomicU32, Ordering};
684    ///
685    /// #[tokio::main]
686    /// async fn main() {
687    ///     let timer = TimerWheel::with_defaults();
688    ///     let counter = Arc::new(AtomicU32::new(0));
689    ///     
690    ///     // Create multiple timers
691    ///     // 创建多个定时器
692    ///     let task1 = TimerTask::new_oneshot(Duration::from_secs(5), None);
693    ///     let task2 = TimerTask::new_oneshot(Duration::from_secs(5), None);
694    ///     
695    ///     // Allocate handles first
696    ///     let h1 = timer.allocate_handle();
697    ///     let h2 = timer.allocate_handle();
698    ///     let id1 = h1.task_id();
699    ///     let id2 = h2.task_id();
700    ///     
701    ///     timer.register(h1, task1);
702    ///     timer.register(h2, task2);
703    ///     
704    ///     // Batch postpone and replace callbacks
705    ///     // 批量推迟并替换回调
706    ///     let updates: Vec<_> = vec![id1, id2]
707    ///         .into_iter()
708    ///         .map(|id| {
709    ///             let counter = Arc::clone(&counter);
710    ///             (id, Duration::from_secs(10), Some(CallbackWrapper::new(move || {
711    ///                 let counter = Arc::clone(&counter);
712    ///                 async move { counter.fetch_add(1, Ordering::SeqCst); }
713    ///             })))
714    ///         })
715    ///         .collect();
716    ///     let postponed = timer.postpone_batch_with_callbacks(updates);
717    ///     println!("Postponed {} timers", postponed);
718    /// }
719    /// ```
720    #[inline]
721    pub fn postpone_batch_with_callbacks(
722        &self,
723        updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
724    ) -> usize {
725        let mut wheel = self.wheel.lock();
726        wheel.postpone_batch_with_callbacks(updates.to_vec())
727    }
728
729    /// Core tick loop
730    ///
731    /// Background task that advances the timing wheel at regular intervals
732    ///
733    /// # Parameters
734    /// - `wheel`: Shared timing wheel instance
735    /// - `tick_duration`: Duration between ticks
736    ///
737    /// 核心 tick 循环
738    ///
739    /// 定期推进时间轮的后台任务
740    ///
741    /// # 参数
742    /// - `wheel`: 共享的时间轮实例
743    /// - `tick_duration`: tick 之间的持续时间
744    async fn tick_loop(wheel: Arc<Mutex<Wheel>>, tick_duration: Duration) {
745        let mut interval = tokio::time::interval(tick_duration);
746        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
747
748        loop {
749            interval.tick().await;
750
751            // Advance timing wheel and get expired tasks
752            // Note: wheel.advance() already handles completion notifications
753            let expired_tasks = {
754                let mut wheel_guard = wheel.lock();
755                wheel_guard.advance()
756            };
757
758            // Execute callbacks for expired tasks
759            // Notifications have already been sent by wheel.advance()
760            for task in expired_tasks {
761                if let Some(callback) = task.callback {
762                    // Spawn callback execution in a separate tokio task
763                    tokio::spawn(async move {
764                        let future = callback.call();
765                        future.await;
766                    });
767                }
768            }
769        }
770    }
771
772    /// Graceful shutdown of TimerWheel
773    ///
774    /// 优雅关闭 TimerWheel
775    ///
776    /// # Examples (示例)
777    /// ```no_run
778    /// # use kestrel_timer::TimerWheel;
779    /// # #[tokio::main]
780    /// # async fn main() {
781    /// let timer = TimerWheel::with_defaults();
782    ///
783    /// // Use timer... (使用定时器...)
784    ///
785    /// timer.shutdown().await;
786    /// # }
787    /// ```
788    pub async fn shutdown(mut self) {
789        if let Some(handle) = self.tick_handle.take() {
790            handle.abort();
791            let _ = handle.await;
792        }
793    }
794}
795
796/// Automatically abort the background tick task when TimerWheel is dropped
797///
798/// 当 TimerWheel 被销毁时自动中止后台 tick 任务
799impl Drop for TimerWheel {
800    fn drop(&mut self) {
801        if let Some(handle) = self.tick_handle.take() {
802            handle.abort();
803        }
804    }
805}
806
807#[cfg(test)]
808mod tests {
809    use super::*;
810    use crate::task::TimerTask;
811    use std::sync::atomic::{AtomicU32, Ordering};
812
813    #[tokio::test]
814    async fn test_timer_creation() {
815        let _timer = TimerWheel::with_defaults();
816    }
817
818    #[tokio::test]
819    async fn test_schedule_once() {
820        use std::sync::Arc;
821        let timer = TimerWheel::with_defaults();
822        let counter = Arc::new(AtomicU32::new(0));
823        let counter_clone = Arc::clone(&counter);
824
825        let task = TimerTask::new_oneshot(
826            Duration::from_millis(50),
827            Some(CallbackWrapper::new(move || {
828                let counter = Arc::clone(&counter_clone);
829                async move {
830                    counter.fetch_add(1, Ordering::SeqCst);
831                }
832            })),
833        );
834        let allocate_handle = timer.allocate_handle();
835        let _handle = timer.register(allocate_handle, task);
836
837        // Wait for timer to trigger
838        // 等待定时器触发
839        tokio::time::sleep(Duration::from_millis(100)).await;
840        assert_eq!(counter.load(Ordering::SeqCst), 1);
841    }
842}