kestrel_timer/
timer.rs

1pub mod handle;
2
3use crate::config::{BatchConfig, ServiceConfig, WheelConfig};
4use crate::task::{CallbackWrapper, TaskHandle, TaskId};
5use crate::wheel::Wheel;
6use parking_lot::Mutex;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::task::JoinHandle;
10use handle::{TimerHandle, TimerHandleWithCompletion, BatchHandle, BatchHandleWithCompletion};
11
12/// Timing Wheel Timer Manager
13/// 
14/// 时间轮定时器管理器
15pub struct TimerWheel {
16    /// Timing wheel instance, wrapped in Arc<Mutex> for multi-threaded access
17    /// 
18    /// 时间轮实例,包装在 Arc<Mutex> 中以支持多线程访问
19    wheel: Arc<Mutex<Wheel>>,
20    
21    /// Background tick loop task handle
22    /// 
23    /// 后台 tick 循环任务句柄
24    tick_handle: Option<JoinHandle<()>>,
25}
26
27impl TimerWheel {
28    /// Create a new timer manager
29    ///
30    /// # Parameters
31    /// - `config`: Timing wheel configuration, already validated
32    /// - `batch_config`: Batch operation configuration
33    /// 
34    /// 创建新的定时器管理器
35    ///
36    /// # 参数
37    /// - `config`: 时间轮配置,已验证
38    /// - `batch_config`: 批量操作配置
39    ///
40    /// # Examples (示例)
41    /// ```no_run
42    /// use kestrel_timer::{TimerWheel, config::WheelConfig, TimerTask, config::BatchConfig};
43    /// use std::time::Duration;
44    ///
45    /// #[tokio::main]
46    /// async fn main() {
47    ///     let config = WheelConfig::builder()
48    ///         .l0_tick_duration(Duration::from_millis(10))
49    ///         .l0_slot_count(512)
50    ///         .l1_tick_duration(Duration::from_secs(1))
51    ///         .l1_slot_count(64)
52    ///         .build()
53    ///         .unwrap();
54    ///     let timer = TimerWheel::new(config, BatchConfig::default());
55    ///     
56    ///     // Use two-step API: allocate handle first, then register
57    ///     // 使用两步 API:先分配 handle,再注册
58    ///     let handle = timer.allocate_handle();
59    ///     let task = TimerTask::new_oneshot(Duration::from_secs(1), None);
60    ///     let _timer_handle = timer.register(handle, task);
61    /// }
62    /// ```
63    pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
64        let tick_duration = config.l0_tick_duration;
65        let wheel = Wheel::new(config, batch_config);
66        let wheel = Arc::new(Mutex::new(wheel));
67        let wheel_clone = wheel.clone();
68
69        // Start background tick loop
70        // 启动后台 tick 循环
71        let tick_handle = tokio::spawn(async move {
72            Self::tick_loop(wheel_clone, tick_duration).await;
73        });
74
75        Self {
76            wheel,
77            tick_handle: Some(tick_handle),
78        }
79    }
80
81    /// Create timer manager with default configuration, hierarchical mode
82    /// - L0 layer tick duration: 10ms, slot count: 512
83    /// - L1 layer tick duration: 1s, slot count: 64
84    ///
85    /// # Returns
86    /// Timer manager instance
87    /// 
88    /// 使用默认配置创建定时器管理器,分层模式
89    /// - L0 层 tick 持续时间:10ms,槽数量:512
90    /// - L1 层 tick 持续时间:1s,槽数量:64
91    ///
92    /// # 返回值
93    /// 定时器管理器实例
94    ///
95    /// # Examples (示例)
96    /// ```no_run
97    /// use kestrel_timer::TimerWheel;
98    ///
99    /// #[tokio::main]
100    /// async fn main() {
101    ///     let timer = TimerWheel::with_defaults();
102    /// }
103    /// ```
104    pub fn with_defaults() -> Self {
105        Self::new(WheelConfig::default(), BatchConfig::default())
106    }
107
108    /// Create TimerService bound to this timing wheel with default configuration
109    ///
110    /// # Parameters
111    /// - `service_config`: Service configuration
112    ///
113    /// # Returns
114    /// TimerService instance bound to this timing wheel
115    ///
116    /// 创建绑定到此时间轮的 TimerService,使用默认配置
117    ///
118    /// # 参数
119    /// - `service_config`: 服务配置
120    ///
121    /// # 返回值
122    /// 绑定到此时间轮的 TimerService 实例
123    ///
124    /// # Examples (示例)
125    /// ```no_run
126    /// use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
127    /// use std::time::Duration;
128    /// 
129    ///
130    /// #[tokio::main]
131    /// async fn main() {
132    ///     let timer = TimerWheel::with_defaults();
133    ///     let mut service = timer.create_service(ServiceConfig::default());
134    ///     
135    ///     // Use two-step API to batch schedule timers through service
136    ///     // 使用两步 API 通过服务批量调度定时器
137    ///     // Step 1: Allocate handles
138    ///     let handles = service.allocate_handles(5);
139    ///     
140    ///     // Step 2: Create tasks
141    ///     let tasks: Vec<_> = (0..5)
142    ///         .map(|_| {
143    ///             use kestrel_timer::TimerTask;
144    ///             TimerTask::new_oneshot(Duration::from_millis(100), Some(CallbackWrapper::new(|| async {})))
145    ///         })
146    ///         .collect();
147    ///     
148    ///     // Step 3: Register batch
149    ///     service.register_batch(handles, tasks).unwrap();
150    ///     
151    ///     // Receive timeout notifications
152    ///     // 接收超时通知
153    ///     let mut rx = service.take_receiver().unwrap();
154    ///     while let Some(task_id) = rx.recv().await {
155    ///         println!("Task {:?} completed", task_id);
156    ///     }
157    /// }
158    /// ```
159    pub fn create_service(&self, service_config: ServiceConfig) -> crate::service::TimerService {
160        crate::service::TimerService::new(self.wheel.clone(), service_config)
161    }
162    
163    /// Create TimerService bound to this timing wheel with custom configuration
164    ///
165    /// # Parameters
166    /// - `config`: Service configuration
167    ///
168    /// # Returns
169    /// TimerService instance bound to this timing wheel
170    ///
171    /// 创建绑定到此时间轮的 TimerService,使用自定义配置
172    ///
173    /// # 参数
174    /// - `config`: 服务配置
175    ///
176    /// # 返回值
177    /// 绑定到此时间轮的 TimerService 实例
178    ///
179    /// # Examples (示例)
180    /// ```no_run
181    /// use kestrel_timer::{TimerWheel, config::ServiceConfig, TimerTask};
182    /// use std::num::NonZeroUsize;
183    ///
184    /// #[tokio::main]
185    /// async fn main() {
186    ///     let timer = TimerWheel::with_defaults();
187    ///     let config = ServiceConfig::builder()
188    ///         .command_channel_capacity(NonZeroUsize::new(1024).unwrap())
189    ///         .timeout_channel_capacity(NonZeroUsize::new(2000).unwrap())
190    ///         .build();
191    ///     let service = timer.create_service_with_config(config);
192    /// }
193    /// ```
194    pub fn create_service_with_config(&self, config: ServiceConfig) -> crate::service::TimerService {
195        crate::service::TimerService::new(self.wheel.clone(), config)
196    }
197
198
199    /// Allocate a handle from DeferredMap
200    /// 
201    /// # Returns
202    /// A unique handle for later insertion
203    /// 
204    /// # 返回值
205    /// 用于后续插入的唯一 handle
206    pub fn allocate_handle(&self) -> TaskHandle {
207        self.wheel.lock().allocate_handle()
208    }
209
210    /// Batch allocate handles from DeferredMap
211    /// 
212    /// # Parameters
213    /// - `count`: Number of handles to allocate
214    /// 
215    /// # Returns
216    /// Vector of unique handles for later batch insertion
217    /// 
218    /// # 参数
219    /// - `count`: 要分配的 handle 数量
220    /// 
221    /// # 返回值
222    /// 用于后续批量插入的唯一 handles 向量
223    pub fn allocate_handles(&self, count: usize) -> Vec<TaskHandle> {
224        self.wheel.lock().allocate_handles(count)
225    }
226    
227    /// Register timer task to timing wheel (registration phase)
228    /// 
229    /// # Parameters
230    /// - `task`: Task created via `create_task()`
231    /// 
232    /// # Returns
233    /// Return timer handle with completion receiver that can be used to cancel timer and receive completion notifications
234    /// 
235    /// 注册定时器任务到时间轮 (注册阶段)
236    /// 
237    /// # 参数
238    /// - `task`: 通过 `create_task()` 创建的任务
239    /// 
240    /// # 返回值
241    /// 返回包含完成通知接收器的定时器句柄,可用于取消定时器和接收完成通知
242    /// 
243    /// # Examples (示例)
244    /// ```no_run
245    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
246    /// 
247    /// use std::time::Duration;
248    /// 
249    /// #[tokio::main]
250    /// async fn main() {
251    ///     let timer = TimerWheel::with_defaults();
252    ///     
253    ///     // Step 1: Allocate handle
254    ///     let allocated_handle = timer.allocate_handle();
255    ///     let task_id = allocated_handle.task_id();
256    ///     
257    ///     // Step 2: Create task
258    ///     let task = TimerTask::new_oneshot(Duration::from_secs(1), Some(CallbackWrapper::new(|| async {
259    ///         println!("Timer fired!");
260    ///     })));
261    ///     
262    ///     // Step 3: Register task
263    ///     let handle = timer.register(allocated_handle, task);
264    ///     
265    ///     // Wait for timer completion
266    ///     // 等待定时器完成
267    ///     use kestrel_timer::CompletionReceiver;
268    ///     let (rx, _handle) = handle.into_parts();
269    ///     match rx {
270    ///         CompletionReceiver::OneShot(receiver) => {
271    ///             receiver.wait().await;
272    ///         },
273    ///         _ => {}
274    ///     }
275    /// }
276    /// ```
277    #[inline]
278    pub fn register(&self, handle: TaskHandle, task: crate::task::TimerTask) -> TimerHandleWithCompletion {
279        let task_id = handle.task_id();
280
281        let (task, completion_rx) = crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
282        
283        // Single lock to complete all operations
284        // 单次加锁完成所有操作
285        let mut wheel_guard = self.wheel.lock();
286        wheel_guard.insert(handle, task);
287        
288        TimerHandleWithCompletion::new(TimerHandle::new(task_id, self.wheel.clone()), completion_rx)
289    }
290    
291    /// Batch register timer tasks to timing wheel (registration phase)
292    /// 
293    /// # Parameters
294    /// - `handles`: Pre-allocated handles for tasks
295    /// - `tasks`: List of timer tasks
296    /// 
297    /// # Returns
298    /// - `Ok(BatchHandleWithCompletion)` if all tasks are successfully registered
299    /// - `Err(TimerError::BatchLengthMismatch)` if handles and tasks lengths don't match
300    /// 
301    /// 批量注册定时器任务到时间轮 (注册阶段)
302    /// 
303    /// # 参数
304    /// - `handles`: 任务的预分配 handles
305    /// - `tasks`: 定时器任务列表
306    /// 
307    /// # 返回值
308    /// - `Ok(BatchHandleWithCompletion)` 如果所有任务成功注册
309    /// - `Err(TimerError::BatchLengthMismatch)` 如果 handles 和 tasks 长度不匹配
310    /// 
311    /// # Examples (示例)
312    /// ```no_run
313    /// use kestrel_timer::{TimerWheel, TimerTask};
314    /// use std::time::Duration;
315    /// 
316    /// #[tokio::main]
317    /// async fn main() {
318    ///     let timer = TimerWheel::with_defaults();
319    ///     
320    ///     // Step 1: Allocate handles
321    ///     let handles = timer.allocate_handles(3);
322    ///     
323    ///     // Step 2: Create tasks
324    ///     let tasks: Vec<_> = (0..3)
325    ///         .map(|_| TimerTask::new_oneshot(Duration::from_secs(1), None))
326    ///         .collect();
327    ///     
328    ///     // Step 3: Batch register
329    ///     let batch = timer.register_batch(handles, tasks)
330    ///         .expect("register_batch should succeed");
331    ///     println!("Registered {} timers", batch.len());
332    /// }
333    /// ```
334    #[inline]
335    pub fn register_batch(
336        &self, 
337        handles: Vec<TaskHandle>, 
338        tasks: Vec<crate::task::TimerTask>
339    ) -> Result<BatchHandleWithCompletion, crate::error::TimerError> {
340        // Validate lengths match
341        if handles.len() != tasks.len() {
342            return Err(crate::error::TimerError::BatchLengthMismatch {
343                handles_len: handles.len(),
344                tasks_len: tasks.len(),
345            });
346        }
347        
348        let task_count = tasks.len();
349        let mut completion_rxs = Vec::with_capacity(task_count);
350        let mut task_ids = Vec::with_capacity(task_count);
351        let mut prepared_handles = Vec::with_capacity(task_count);
352        let mut prepared_tasks = Vec::with_capacity(task_count);
353        
354        // Step 1: Prepare all channels and notifiers
355        for (handle, task) in handles.into_iter().zip(tasks.into_iter()) {
356            let task_id = handle.task_id();
357            let (task, completion_rx) = crate::task::TimerTaskWithCompletionNotifier::from_timer_task(task);
358            task_ids.push(task_id);
359            completion_rxs.push(completion_rx);
360            prepared_handles.push(handle);
361            prepared_tasks.push(task);
362        }
363        
364        // Step 2: Single lock, batch insert
365        {
366            let mut wheel_guard = self.wheel.lock();
367            wheel_guard.insert_batch(prepared_handles, prepared_tasks)?;
368        }
369        
370        Ok(BatchHandleWithCompletion::new(BatchHandle::new(task_ids, self.wheel.clone()), completion_rxs))
371    }
372
373    /// Cancel timer
374    ///
375    /// # Parameters
376    /// - `task_id`: Task ID
377    ///
378    /// # Returns
379    /// Returns true if task exists and is successfully cancelled, otherwise false
380    /// 
381    /// 取消定时器
382    ///
383    /// # 参数
384    /// - `task_id`: 任务 ID
385    ///
386    /// # 返回值
387    /// 如果任务存在且成功取消则返回 true,否则返回 false
388    /// 
389    /// # Examples (示例)
390    /// ```no_run
391    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
392    /// 
393    /// use std::time::Duration;
394    ///
395    /// #[tokio::main]
396    /// async fn main() {
397    ///     let timer = TimerWheel::with_defaults();
398    ///     
399    ///     // Step 1: Allocate handle
400    ///     let allocated_handle = timer.allocate_handle();
401    ///     let task_id = allocated_handle.task_id();
402    ///     
403    ///     // Step 2: Create and register task
404    ///     let task = TimerTask::new_oneshot(Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
405    ///         println!("Timer fired!");
406    ///     })));
407    ///     let _handle = timer.register(allocated_handle, task);
408    ///     
409    ///     // Cancel task using task ID
410    ///     // 使用任务 ID 取消任务
411    ///     let cancelled = timer.cancel(task_id);
412    ///     println!("Canceled successfully: {}", cancelled);
413    /// }
414    /// ```
415    #[inline]
416    pub fn cancel(&self, task_id: TaskId) -> bool {
417        let mut wheel = self.wheel.lock();
418        wheel.cancel(task_id)
419    }
420
421    /// Batch cancel timers
422    ///
423    /// # Parameters
424    /// - `task_ids`: List of task IDs to cancel
425    ///
426    /// # Returns
427    /// Number of successfully cancelled tasks
428    ///
429    /// 批量取消定时器
430    ///
431    /// # 参数
432    /// - `task_ids`: 要取消的任务 ID 列表
433    ///
434    /// # 返回值
435    /// 成功取消的任务数量
436    ///
437    /// # Performance Advantages
438    /// - Batch processing reduces lock contention
439    /// - Internally optimized batch cancellation operation
440    ///
441    /// # Examples (示例)
442    /// ```no_run
443    /// use kestrel_timer::{TimerWheel, TimerTask};
444    /// use std::time::Duration;
445    ///
446    /// #[tokio::main]
447    /// async fn main() {
448    ///     let timer = TimerWheel::with_defaults();
449    ///     
450    ///     // Create multiple timers
451    ///     // 创建多个定时器
452    ///     let task1 = TimerTask::new_oneshot(Duration::from_secs(10), None);
453    ///     let task2 = TimerTask::new_oneshot(Duration::from_secs(10), None);
454    ///     let task3 = TimerTask::new_oneshot(Duration::from_secs(10), None);
455    ///     
456    ///     // Allocate handles and get task IDs
457    ///     let h1 = timer.allocate_handle();
458    ///     let h2 = timer.allocate_handle();
459    ///     let h3 = timer.allocate_handle();
460    ///     let task_ids = vec![h1.task_id(), h2.task_id(), h3.task_id()];
461    ///     
462    ///     let _h1 = timer.register(h1, task1);
463    ///     let _h2 = timer.register(h2, task2);
464    ///     let _h3 = timer.register(h3, task3);
465    ///     
466    ///     // Batch cancel
467    ///     // 批量取消
468    ///     let cancelled = timer.cancel_batch(&task_ids);
469    ///     println!("Canceled {} timers", cancelled);
470    /// }
471    /// ```
472    #[inline]
473    pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
474        let mut wheel = self.wheel.lock();
475        wheel.cancel_batch(task_ids)
476    }
477
478    /// Postpone timer
479    ///
480    /// # Parameters
481    /// - `task_id`: Task ID to postpone
482    /// - `new_delay`: New delay duration, recalculated from current time
483    /// - `callback`: New callback function, pass `None` to keep original callback, pass `Some` to replace with new callback
484    ///
485    /// # Returns
486    /// Returns true if task exists and is successfully postponed, otherwise false
487    ///
488    /// 推迟定时器
489    ///
490    /// # 参数
491    /// - `task_id`: 要推迟的任务 ID
492    /// - `new_delay`: 新的延迟时间,从当前时间重新计算
493    /// - `callback`: 新的回调函数,传递 `None` 保持原始回调,传递 `Some` 替换为新的回调
494    ///
495    /// # 返回值
496    /// 如果任务存在且成功推迟则返回 true,否则返回 false
497    ///
498    /// # Note
499    /// - Task ID remains unchanged after postponement
500    /// - Original completion_receiver remains valid
501    ///
502    /// # 注意
503    /// - 任务 ID 在推迟后保持不变
504    /// - 原始 completion_receiver 保持有效
505    ///
506    /// # Examples (示例)
507    ///
508    /// ## Keep original callback (保持原始回调)
509    /// ```no_run
510    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
511    /// use std::time::Duration;
512    /// 
513    ///
514    /// #[tokio::main]
515    /// async fn main() {
516    ///     let timer = TimerWheel::with_defaults();
517    ///     
518    ///     // Allocate handle first
519    ///     let allocated_handle = timer.allocate_handle();
520    ///     let task_id = allocated_handle.task_id();
521    ///     
522    ///     let task = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
523    ///         println!("Timer fired!");
524    ///     })));
525    ///     let _handle = timer.register(allocated_handle, task);
526    ///     
527    ///     // Postpone to 10 seconds after triggering, and keep original callback
528    ///     // 推迟到 10 秒后触发,并保持原始回调
529    ///     let success = timer.postpone(task_id, Duration::from_secs(10), None);
530    ///     println!("Postponed successfully: {}", success);
531    /// }
532    /// ```
533    ///
534    /// ## Replace with new callback (替换为新的回调)
535    /// ```no_run
536    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
537    /// use std::time::Duration;
538    ///
539    /// #[tokio::main]
540    /// async fn main() {
541    ///     let timer = TimerWheel::with_defaults();
542    ///     
543    ///     // Allocate handle first
544    ///     let allocated_handle = timer.allocate_handle();
545    ///     let task_id = allocated_handle.task_id();
546    ///     
547    ///     let task = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
548    ///         println!("Original callback!");
549    ///     })));
550    ///     let _handle = timer.register(allocated_handle, task);
551    ///     
552    ///     // Postpone to 10 seconds after triggering, and replace with new callback
553    ///     // 推迟到 10 秒后触发,并替换为新的回调
554    ///     let success = timer.postpone(task_id, Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
555    ///         println!("New callback!");
556    ///     })));
557    ///     println!("Postponed successfully: {}", success);
558    /// }
559    /// ```
560    #[inline]
561    pub fn postpone(
562        &self,
563        task_id: TaskId,
564        new_delay: Duration,
565        callback: Option<CallbackWrapper>,
566    ) -> bool {
567        let mut wheel = self.wheel.lock();
568        wheel.postpone(task_id, new_delay, callback)
569    }
570
571    /// Batch postpone timers (keep original callbacks)
572    ///
573    /// # Parameters
574    /// - `updates`: List of tuples of (task ID, new delay)
575    ///
576    /// # Returns
577    /// Number of successfully postponed tasks
578    ///
579    /// 批量推迟定时器 (保持原始回调)
580    ///
581    /// # 参数
582    /// - `updates`: (任务 ID, 新延迟) 元组列表
583    ///
584    /// # 返回值
585    /// 成功推迟的任务数量
586    ///
587    /// # Note
588    /// - This method keeps all tasks' original callbacks unchanged
589    /// - Use `postpone_batch_with_callbacks` if you need to replace callbacks
590    ///
591    /// # 注意
592    /// - 此方法保持所有任务的原始回调不变
593    /// - 如果需要替换回调,请使用 `postpone_batch_with_callbacks`
594    ///
595    /// # Performance Advantages
596    /// - Batch processing reduces lock contention
597    /// - Internally optimized batch postponement operation
598    ///
599    /// # Examples (示例)
600    /// ```no_run
601    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
602    /// use std::time::Duration;
603    ///
604    /// #[tokio::main]
605    /// async fn main() {
606    ///     let timer = TimerWheel::with_defaults();
607    ///     
608    ///     // Create multiple tasks with callbacks
609    ///     // 创建多个带有回调的任务
610    ///     let task1 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
611    ///         println!("Task 1 fired!");
612    ///     })));
613    ///     let task2 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
614    ///         println!("Task 2 fired!");
615    ///     })));
616    ///     let task3 = TimerTask::new_oneshot(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
617    ///         println!("Task 3 fired!");
618    ///     })));
619    ///     
620    ///     // Allocate handles and register
621    ///     let h1 = timer.allocate_handle();
622    ///     let h2 = timer.allocate_handle();
623    ///     let h3 = timer.allocate_handle();
624    ///     
625    ///     let task_ids = vec![
626    ///         (h1.task_id(), Duration::from_secs(10)),
627    ///         (h2.task_id(), Duration::from_secs(15)),
628    ///         (h3.task_id(), Duration::from_secs(20)),
629    ///     ];
630    ///     
631    ///     timer.register(h1, task1);
632    ///     timer.register(h2, task2);
633    ///     timer.register(h3, task3);
634    ///     
635    ///     // Batch postpone (keep original callbacks)
636    ///     // 批量推迟 (保持原始回调)
637    ///     let postponed = timer.postpone_batch(task_ids);
638    ///     println!("Postponed {} timers", postponed);
639    /// }
640    /// ```
641    #[inline]
642    pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize {
643        let mut wheel = self.wheel.lock();
644        wheel.postpone_batch(updates)
645    }
646
647    /// Batch postpone timers (replace callbacks)
648    ///
649    /// # Parameters
650    /// - `updates`: List of tuples of (task ID, new delay, new callback)
651    ///
652    /// # Returns
653    /// Number of successfully postponed tasks
654    ///
655    /// 批量推迟定时器 (替换回调)
656    ///
657    /// # 参数
658    /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
659    ///
660    /// # 返回值
661    /// 成功推迟的任务数量
662    ///
663    /// # Performance Advantages
664    /// - Batch processing reduces lock contention
665    /// - Internally optimized batch postponement operation
666    ///
667    /// # Examples (示例)
668    /// ```no_run
669    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
670    /// use std::time::Duration;
671    /// use std::sync::Arc;
672    /// use std::sync::atomic::{AtomicU32, Ordering};
673    ///
674    /// #[tokio::main]
675    /// async fn main() {
676    ///     let timer = TimerWheel::with_defaults();
677    ///     let counter = Arc::new(AtomicU32::new(0));
678    ///     
679    ///     // Create multiple timers
680    ///     // 创建多个定时器
681    ///     let task1 = TimerTask::new_oneshot(Duration::from_secs(5), None);
682    ///     let task2 = TimerTask::new_oneshot(Duration::from_secs(5), None);
683    ///     
684    ///     // Allocate handles first
685    ///     let h1 = timer.allocate_handle();
686    ///     let h2 = timer.allocate_handle();
687    ///     let id1 = h1.task_id();
688    ///     let id2 = h2.task_id();
689    ///     
690    ///     timer.register(h1, task1);
691    ///     timer.register(h2, task2);
692    ///     
693    ///     // Batch postpone and replace callbacks
694    ///     // 批量推迟并替换回调
695    ///     let updates: Vec<_> = vec![id1, id2]
696    ///         .into_iter()
697    ///         .map(|id| {
698    ///             let counter = Arc::clone(&counter);
699    ///             (id, Duration::from_secs(10), Some(CallbackWrapper::new(move || {
700    ///                 let counter = Arc::clone(&counter);
701    ///                 async move { counter.fetch_add(1, Ordering::SeqCst); }
702    ///             })))
703    ///         })
704    ///         .collect();
705    ///     let postponed = timer.postpone_batch_with_callbacks(updates);
706    ///     println!("Postponed {} timers", postponed);
707    /// }
708    /// ```
709    #[inline]
710    pub fn postpone_batch_with_callbacks(
711        &self,
712        updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
713    ) -> usize {
714        let mut wheel = self.wheel.lock();
715        wheel.postpone_batch_with_callbacks(updates.to_vec())
716    }
717    
718    /// Core tick loop
719    /// 
720    /// Background task that advances the timing wheel at regular intervals
721    /// 
722    /// # Parameters
723    /// - `wheel`: Shared timing wheel instance
724    /// - `tick_duration`: Duration between ticks
725    /// 
726    /// 核心 tick 循环
727    /// 
728    /// 定期推进时间轮的后台任务
729    /// 
730    /// # 参数
731    /// - `wheel`: 共享的时间轮实例
732    /// - `tick_duration`: tick 之间的持续时间
733    async fn tick_loop(wheel: Arc<Mutex<Wheel>>, tick_duration: Duration) {
734        let mut interval = tokio::time::interval(tick_duration);
735        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
736
737        loop {
738            interval.tick().await;
739
740            // Advance timing wheel and get expired tasks
741            // Note: wheel.advance() already handles completion notifications
742            let expired_tasks = {
743                let mut wheel_guard = wheel.lock();
744                wheel_guard.advance()
745            };
746
747            // Execute callbacks for expired tasks
748            // Notifications have already been sent by wheel.advance()
749            for task in expired_tasks {
750                if let Some(callback) = task.callback {
751                    // Spawn callback execution in a separate tokio task
752                    tokio::spawn(async move {
753                        let future = callback.call();
754                        future.await;
755                    });
756                }
757            }
758        }
759    }
760
761    /// Graceful shutdown of TimerWheel
762    /// 
763    /// 优雅关闭 TimerWheel
764    /// 
765    /// # Examples (示例)
766    /// ```no_run
767    /// # use kestrel_timer::TimerWheel;
768    /// # #[tokio::main]
769    /// # async fn main() {
770    /// let timer = TimerWheel::with_defaults();
771    /// 
772    /// // Use timer... (使用定时器...)
773    /// 
774    /// timer.shutdown().await;
775    /// # }
776    /// ```
777    pub async fn shutdown(mut self) {
778        if let Some(handle) = self.tick_handle.take() {
779            handle.abort();
780            let _ = handle.await;
781        }
782    }
783}
784
785/// Automatically abort the background tick task when TimerWheel is dropped
786/// 
787/// 当 TimerWheel 被销毁时自动中止后台 tick 任务
788impl Drop for TimerWheel {
789    fn drop(&mut self) {
790        if let Some(handle) = self.tick_handle.take() {
791            handle.abort();
792        }
793    }
794}
795
#[cfg(test)]
mod tests {
    use super::*;
    use crate::task::TimerTask;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// Constructing a wheel inside a Tokio runtime must not panic.
    #[tokio::test]
    async fn test_timer_creation() {
        let _timer = TimerWheel::with_defaults();
    }

    /// A one-shot task with a 50 ms delay runs its callback exactly once
    /// within 100 ms.
    #[tokio::test]
    async fn test_schedule_once() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        // The callback owns its own reference to the shared counter and
        // bumps it every time it is invoked.
        let callback = {
            let counter_clone = Arc::clone(&counter);
            CallbackWrapper::new(move || {
                let counter = Arc::clone(&counter_clone);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                }
            })
        };
        let task = TimerTask::new_oneshot(Duration::from_millis(50), Some(callback));

        let allocated = timer.allocate_handle();
        let _handle = timer.register(allocated, task);

        // Wait for the timer to trigger.
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }
}
832