kestrel_timer/
timer.rs

1pub mod handle;
2
3use crate::config::{BatchConfig, ServiceConfig, WheelConfig};
4use crate::task::{CallbackWrapper, TaskId, TaskCompletionReason};
5use crate::wheel::Wheel;
6use parking_lot::Mutex;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::oneshot;
10use tokio::task::JoinHandle;
11use handle::{TimerHandle, TimerHandleWithCompletion, BatchHandle, BatchHandleWithCompletion};
12
/// Timing-wheel timer manager.
///
/// Owns the shared [`Wheel`] and the handle of the background task that drives it.
pub struct TimerWheel {
    /// Timing wheel instance, wrapped in `Arc<Mutex<_>>` so the same wheel can be
    /// handed to the background tick task and to the handles and services created
    /// from this manager.
    wheel: Arc<Mutex<Wheel>>,

    /// Handle of the background tick-loop task. It becomes `None` once the handle
    /// has been taken by `shutdown()` or by the `Drop` implementation.
    tick_handle: Option<JoinHandle<()>>,
}
27
28impl TimerWheel {
29    /// Create a new timer manager
30    ///
31    /// # Parameters
32    /// - `config`: Timing wheel configuration, already validated
33    /// 
34    /// 创建新的定时器管理器
35    ///
36    /// # 参数
37    /// - `config`: 时间轮配置,已验证
38    ///
39    /// # Examples (示例)
40    /// ```no_run
41    /// use kestrel_timer::{TimerWheel, config::WheelConfig, TimerTask, config::BatchConfig};
42    /// use std::time::Duration;
43    ///
44    /// #[tokio::main]
45    /// async fn main() {
46    ///     let config = WheelConfig::builder()
47    ///         .l0_tick_duration(Duration::from_millis(10))
48    ///         .l0_slot_count(512)
49    ///         .l1_tick_duration(Duration::from_secs(1))
50    ///         .l1_slot_count(64)
51    ///         .build()
52    ///         .unwrap();
53    ///     let timer = TimerWheel::new(config, BatchConfig::default());
54    ///     
    ///     // Use the two-step API: create the task first, then register it.
57    ///     let task = TimerWheel::create_task(Duration::from_secs(1), None);
58    ///     let handle = timer.register(task);
59    /// }
60    /// ```
61    pub fn new(config: WheelConfig, batch_config: BatchConfig) -> Self {
62        let tick_duration = config.l0_tick_duration;
63        let wheel = Wheel::new(config, batch_config);
64        let wheel = Arc::new(Mutex::new(wheel));
65        let wheel_clone = wheel.clone();
66
67        // Start background tick loop
68        // 启动后台 tick 循环
69        let tick_handle = tokio::spawn(async move {
70            Self::tick_loop(wheel_clone, tick_duration).await;
71        });
72
73        Self {
74            wheel,
75            tick_handle: Some(tick_handle),
76        }
77    }
78
    /// Create a timer manager with the default configuration (hierarchical mode).
    ///
    /// Defaults:
    /// - L0 layer tick duration: 10ms, slot count: 512
    /// - L1 layer tick duration: 1s, slot count: 64
    ///
    /// This constructor takes no parameters; the wheel and batch settings come from
    /// `WheelConfig::default()` and `BatchConfig::default()`.
    ///
    /// # Returns
    /// Timer manager instance
98    ///
99    /// # Examples (示例)
100    /// ```no_run
101    /// use kestrel_timer::TimerWheel;
102    ///
103    /// #[tokio::main]
104    /// async fn main() {
105    ///     let timer = TimerWheel::with_defaults();
106    /// }
107    /// ```
108    pub fn with_defaults() -> Self {
109        Self::new(WheelConfig::default(), BatchConfig::default())
110    }
111
    /// Create a TimerService bound to this timing wheel, using the supplied service
    /// configuration.
    ///
    /// Pass `ServiceConfig::default()` to get the default configuration. This method
    /// behaves the same as `create_service_with_config`.
    ///
    /// # Parameters
    /// - `service_config`: Service configuration
    ///
    /// # Returns
    /// TimerService instance bound to this timing wheel
    ///
127    ///
128    /// # Examples (示例)
129    /// ```no_run
130    /// use kestrel_timer::{TimerWheel, TimerService, CallbackWrapper, config::ServiceConfig};
131    /// use std::time::Duration;
132    /// 
133    ///
134    /// #[tokio::main]
135    /// async fn main() {
136    ///     let timer = TimerWheel::with_defaults();
137    ///     let mut service = timer.create_service(ServiceConfig::default());
138    ///     
139    ///     // Use two-step API to batch schedule timers through service
140    ///     // 使用两步 API 通过服务批量调度定时器
141    ///     let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
142    ///         .map(|_| (Duration::from_millis(100), Some(CallbackWrapper::new(|| async {}))))
143    ///         .collect();
144    ///     let tasks = TimerService::create_batch_with_callbacks(callbacks);
145    ///     service.register_batch(tasks).unwrap();
146    ///     
147    ///     // Receive timeout notifications
148    ///     // 接收超时通知
149    ///     let mut rx = service.take_receiver().unwrap();
150    ///     while let Some(task_id) = rx.recv().await {
151    ///         println!("Task {:?} completed", task_id);
152    ///     }
153    /// }
154    /// ```
155    pub fn create_service(&self, service_config: ServiceConfig) -> crate::service::TimerService {
156        crate::service::TimerService::new(self.wheel.clone(), service_config)
157    }
158    
159    /// Create TimerService bound to this timing wheel with custom configuration
160    ///
161    /// # Parameters
162    /// - `config`: Service configuration
163    ///
164    /// # Returns
165    /// TimerService instance bound to this timing wheel
166    ///
167    /// 创建绑定到此时间轮的 TimerService,使用自定义配置
168    ///
169    /// # 参数
170    /// - `config`: 服务配置
171    ///
172    /// # 返回值
173    /// 绑定到此时间轮的 TimerService 实例
174    ///
175    /// # Examples (示例)
176    /// ```no_run
177    /// use kestrel_timer::{TimerWheel, config::ServiceConfig};
178    ///
179    /// #[tokio::main]
180    /// async fn main() {
181    ///     let timer = TimerWheel::with_defaults();
182    ///     let config = ServiceConfig::builder()
183    ///         .command_channel_capacity(1024)
184    ///         .timeout_channel_capacity(2000)
185    ///         .build()
186    ///         .unwrap();
187    ///     let service = timer.create_service_with_config(config);
188    /// }
189    /// ```
190    pub fn create_service_with_config(&self, config: ServiceConfig) -> crate::service::TimerService {
191        crate::service::TimerService::new(self.wheel.clone(), config)
192    }
193
194    /// Create timer task (static method, apply stage)
195    /// 
196    /// # Parameters
197    /// - `delay`: Delay duration
198    /// - `callback`: Callback object implementing TimerCallback trait
199    /// 
200    /// # Returns
201    /// Return TimerTask, needs to be registered through `register()`
202    /// 
203    /// 创建定时器任务 (静态方法,应用阶段)
204    /// 
205    /// # 参数
206    /// - `delay`: 延迟时间
207    /// - `callback`: 回调对象,实现 TimerCallback 特质
208    /// 
209    /// # 返回值
210    /// 返回 TimerTask,需要通过 `register()` 注册
211    /// 
212    /// # Examples (示例)
213    /// ```no_run
214    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
215    /// use std::time::Duration;
216    /// 
217    /// 
218    /// #[tokio::main]
219    /// async fn main() {
220    ///     let timer = TimerWheel::with_defaults();
221    ///     
222    ///     // Step 1: Create task
223    ///     // 创建任务
224    ///     let task = TimerWheel::create_task(Duration::from_secs(1), Some(CallbackWrapper::new(|| async {
225    ///         println!("Timer fired!");
226    ///     })));
227    ///     
228    ///     // Get task ID (获取任务 ID)
229    ///     let task_id = task.get_id();
230    ///     println!("Created task: {:?}", task_id);
231    ///     
232    ///     // Step 2: Register task
233    ///     // 注册任务
234    ///     let handle = timer.register(task);
235    /// }
236    /// ```
237    #[inline]
238    pub fn create_task(delay: Duration, callback: Option<CallbackWrapper>) -> crate::task::TimerTask {
239        crate::task::TimerTask::new(delay, callback)
240    }
241    
242    /// Create batch of timer tasks (static method, apply stage, no callbacks)
243    /// 
244    /// # Parameters
245    /// - `delays`: List of delay times
246    /// 
247    /// # Returns
248    /// Return TimerTask list, needs to be registered through `register_batch()`
249    /// 
250    /// 创建定时器任务 (静态方法,应用阶段,没有回调)
251    /// 
252    /// # 参数
253    /// - `delays`: 延迟时间列表
254    /// 
255    /// # 返回值
256    /// 返回 TimerTask 列表,需要通过 `register_batch()` 注册
257    /// 
258    /// # Examples (示例)
259    /// ```no_run
260    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
261    /// use std::time::Duration;
262    /// use std::sync::Arc;
263    /// use std::sync::atomic::{AtomicU32, Ordering};
264    /// 
265    /// #[tokio::main]
266    /// async fn main() {
267    ///     let timer = TimerWheel::with_defaults();
268    ///     let counter = Arc::new(AtomicU32::new(0));
269    ///     
270    ///     // Step 1: Create batch of tasks
271    ///     // 创建批量任务
272    ///     let delays: Vec<Duration> = (0..3)
273    ///         .map(|_| Duration::from_millis(100))
274    ///         .collect();
275    ///     
276    ///     // Create batch of tasks
277    ///     let tasks = TimerWheel::create_batch(delays);
278    ///     println!("Created {} tasks", tasks.len());
279    ///     
280    ///     // Step 2: Register batch of tasks
281    ///     // 注册批量任务
282    ///     let batch = timer.register_batch(tasks);
283    /// }
284    /// ```
285    #[inline]
286    pub fn create_batch(delays: Vec<Duration>) -> Vec<crate::task::TimerTask>
287    {
288        delays
289            .into_iter()
290            .map(|delay| crate::task::TimerTask::new(delay, None))
291            .collect()
292    }
293
294    /// Create batch of timer tasks (static method, apply stage, with callbacks)
295    /// 
296    /// # Parameters
297    /// - `callbacks`: List of tuples of (delay time, callback)
298    /// 
299    /// # Returns
300    /// Return TimerTask list, needs to be registered through `register_batch()`
301    /// 
302    /// 创建定时器任务 (静态方法,应用阶段,有回调)
303    /// 
304    /// # 参数
305    /// - `callbacks`: (延迟时间, 回调) 元组列表
306    /// 
307    /// # 返回值
308    /// 返回 TimerTask 列表,需要通过 `register_batch()` 注册
309    /// 
310    /// # Examples (示例)
311    /// ```no_run
312    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
313    /// use std::time::Duration;
314    /// use std::sync::Arc;
315    /// use std::sync::atomic::{AtomicU32, Ordering};
316    /// 
317    /// #[tokio::main]
318    /// async fn main() {
319    ///     let timer = TimerWheel::with_defaults();
320    ///     let counter = Arc::new(AtomicU32::new(0));
321    ///     
322    ///     // Step 1: Create batch of tasks
323    ///     // 创建批量任务
324    ///     let delays: Vec<Duration> = (0..3)
325    ///         .map(|_| Duration::from_millis(100))
326    ///         .collect();
327    ///     let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = delays
328    ///         .into_iter()
329    ///         .map(|delay| {
330    ///             let counter = Arc::clone(&counter);
331    ///             let callback = Some(CallbackWrapper::new(move || {
332    ///                 let counter = Arc::clone(&counter);
333    ///                 async move {
334    ///                     counter.fetch_add(1, Ordering::SeqCst);
335    ///                 }
336    ///             }));
337    ///             (delay, callback)
338    ///         })
339    ///         .collect();
340    ///     
341    ///     // Create batch of tasks
342    ///     let tasks = TimerWheel::create_batch_with_callbacks(callbacks);
343    ///     println!("Created {} tasks", tasks.len());
344    ///     
345    ///     // Step 2: Register batch of tasks
346    ///     // 注册批量任务
347    ///     let batch = timer.register_batch(tasks);
348    /// }
349    /// ```
350    #[inline]
351    pub fn create_batch_with_callbacks(callbacks: Vec<(Duration, Option<CallbackWrapper>)>) -> Vec<crate::task::TimerTask>
352    {
353        callbacks
354            .into_iter()
355            .map(|(delay, callback)| crate::task::TimerTask::new(delay, callback))
356            .collect()
357    }
358    
359    /// Register timer task to timing wheel (registration phase)
360    /// 
361    /// # Parameters
362    /// - `task`: Task created via `create_task()`
363    /// 
364    /// # Returns
365    /// Return timer handle with completion receiver that can be used to cancel timer and receive completion notifications
366    /// 
367    /// 注册定时器任务到时间轮 (注册阶段)
368    /// 
369    /// # 参数
370    /// - `task`: 通过 `create_task()` 创建的任务
371    /// 
372    /// # 返回值
373    /// 返回包含完成通知接收器的定时器句柄,可用于取消定时器和接收完成通知
374    /// 
375    /// # Examples (示例)
376    /// ```no_run
377    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
378    /// 
379    /// use std::time::Duration;
380    /// 
381    /// #[tokio::main]
382    /// async fn main() {
383    ///     let timer = TimerWheel::with_defaults();
384    ///     
385    ///     let task = TimerWheel::create_task(Duration::from_secs(1), Some(CallbackWrapper::new(|| async {
386    ///         println!("Timer fired!");
387    ///     })));
388    ///     let task_id = task.get_id();
389    ///     
390    ///     // Register task
391    ///     // 注册任务
392    ///     let handle = timer.register(task);
393    ///     
394    ///     // Wait for timer completion
395    ///     // 等待定时器完成
396    ///     let (rx, _handle) = handle.into_parts();
397    ///     rx.0.await.ok();
398    /// }
399    /// ```
400    #[inline]
401    pub fn register(&self, task: crate::task::TimerTask) -> TimerHandleWithCompletion {
402        let (completion_tx, completion_rx) = oneshot::channel();
403        let notifier = crate::task::CompletionNotifier(completion_tx);
404        
405        let task_id = task.id;
406        
407        // 单次加锁完成所有操作
408        {
409            let mut wheel_guard = self.wheel.lock();
410            wheel_guard.insert(task, notifier);
411        }
412        
413        TimerHandleWithCompletion::new(TimerHandle::new(task_id, self.wheel.clone()), completion_rx)
414    }
415    
416    /// Batch register timer tasks to timing wheel (registration phase)
417    /// 
418    /// # Parameters
419    /// - `tasks`: List of tasks created via `create_batch()`
420    /// 
421    /// # Returns
422    /// Return batch timer handle with completion receivers
423    /// 
424    /// 批量注册定时器任务到时间轮 (注册阶段)
425    /// 
426    /// # 参数
427    /// - `tasks`: 通过 `create_batch()` 创建的任务列表
428    /// 
429    /// # 返回值
430    /// 返回包含完成通知接收器的批量定时器句柄
431    /// 
432    /// # Examples (示例)
433    /// ```no_run
434    /// use kestrel_timer::{TimerWheel, TimerTask};
435    /// use std::time::Duration;
436    /// 
437    /// #[tokio::main]
438    /// async fn main() {
439    ///     let timer = TimerWheel::with_defaults();
440    ///     
441    ///     let delays: Vec<Duration> = (0..3)
442    ///         .map(|_| Duration::from_secs(1))
443    ///         .collect();
444    ///     let tasks = TimerWheel::create_batch(delays);
445    ///     
446    ///     let batch = timer.register_batch(tasks);
447    ///     println!("Registered {} timers", batch.len());
448    /// }
449    /// ```
450    #[inline]
451    pub fn register_batch(&self, tasks: Vec<crate::task::TimerTask>) -> BatchHandleWithCompletion {
452        let task_count = tasks.len();
453        let mut completion_rxs = Vec::with_capacity(task_count);
454        let mut task_ids = Vec::with_capacity(task_count);
455        let mut prepared_tasks = Vec::with_capacity(task_count);
456        
457        // Step 1: Prepare all channels and notifiers
458        for task in tasks {
459            let (completion_tx, completion_rx) = oneshot::channel();
460            let notifier = crate::task::CompletionNotifier(completion_tx);
461            
462            task_ids.push(task.id);
463            completion_rxs.push(completion_rx);
464            prepared_tasks.push((task, notifier));
465        }
466        
467        // Step 2: Single lock, batch insert
468        {
469            let mut wheel_guard = self.wheel.lock();
470            wheel_guard.insert_batch(prepared_tasks);
471        }
472        
473        BatchHandleWithCompletion::new(BatchHandle::new(task_ids, self.wheel.clone()), completion_rxs)
474    }
475
476    /// Cancel timer
477    ///
478    /// # Parameters
479    /// - `task_id`: Task ID
480    ///
481    /// # Returns
482    /// Returns true if task exists and is successfully cancelled, otherwise false
483    /// 
484    /// 取消定时器
485    ///
486    /// # 参数
487    /// - `task_id`: 任务 ID
488    ///
489    /// # 返回值
490    /// 如果任务存在且成功取消则返回 true,否则返回 false
491    /// 
492    /// # Examples (示例)
493    /// ```no_run
494    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
495    /// 
496    /// use std::time::Duration;
497    ///
498    /// #[tokio::main]
499    /// async fn main() {
500    ///     let timer = TimerWheel::with_defaults();
501    ///     
502    ///     let task = TimerWheel::create_task(Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
503    ///         println!("Timer fired!");
504    ///     })));
505    ///     let task_id = task.get_id();
506    ///     let _handle = timer.register(task);
507    ///     
508    ///     // Cancel task using task ID
509    ///     // 使用任务 ID 取消任务
510    ///     let cancelled = timer.cancel(task_id);
511    ///     println!("Canceled successfully: {}", cancelled);
512    /// }
513    /// ```
514    #[inline]
515    pub fn cancel(&self, task_id: TaskId) -> bool {
516        let mut wheel = self.wheel.lock();
517        wheel.cancel(task_id)
518    }
519
520    /// Batch cancel timers
521    ///
522    /// # Parameters
523    /// - `task_ids`: List of task IDs to cancel
524    ///
525    /// # Returns
526    /// Number of successfully cancelled tasks
527    ///
528    /// 批量取消定时器
529    ///
530    /// # 参数
531    /// - `task_ids`: 要取消的任务 ID 列表
532    ///
533    /// # 返回值
534    /// 成功取消的任务数量
535    ///
536    /// # Performance Advantages
537    /// - Batch processing reduces lock contention
538    /// - Internally optimized batch cancellation operation
539    ///
540    /// # Examples (示例)
541    /// ```no_run
542    /// use kestrel_timer::{TimerWheel, TimerTask};
543    /// use std::time::Duration;
544    ///
545    /// #[tokio::main]
546    /// async fn main() {
547    ///     let timer = TimerWheel::with_defaults();
548    ///     
549    ///     // Create multiple timers
550    ///     // 创建多个定时器
551    ///     let task1 = TimerWheel::create_task(Duration::from_secs(10), None);
552    ///     let task2 = TimerWheel::create_task(Duration::from_secs(10), None);
553    ///     let task3 = TimerWheel::create_task(Duration::from_secs(10), None);
554    ///     
555    ///     let task_ids = vec![task1.get_id(), task2.get_id(), task3.get_id()];
556    ///     
557    ///     let _h1 = timer.register(task1);
558    ///     let _h2 = timer.register(task2);
559    ///     let _h3 = timer.register(task3);
560    ///     
561    ///     // Batch cancel
562    ///     // 批量取消
563    ///     let cancelled = timer.cancel_batch(&task_ids);
564    ///     println!("Canceled {} timers", cancelled);
565    /// }
566    /// ```
567    #[inline]
568    pub fn cancel_batch(&self, task_ids: &[TaskId]) -> usize {
569        let mut wheel = self.wheel.lock();
570        wheel.cancel_batch(task_ids)
571    }
572
573    /// Postpone timer
574    ///
575    /// # Parameters
576    /// - `task_id`: Task ID to postpone
577    /// - `new_delay`: New delay duration, recalculated from current time
578    /// - `callback`: New callback function, pass `None` to keep original callback, pass `Some` to replace with new callback
579    ///
580    /// # Returns
581    /// Returns true if task exists and is successfully postponed, otherwise false
582    ///
583    /// 推迟定时器
584    ///
585    /// # 参数
586    /// - `task_id`: 要推迟的任务 ID
587    /// - `new_delay`: 新的延迟时间,从当前时间重新计算
588    /// - `callback`: 新的回调函数,传递 `None` 保持原始回调,传递 `Some` 替换为新的回调
589    ///
590    /// # 返回值
591    /// 如果任务存在且成功推迟则返回 true,否则返回 false
592    ///
593    /// # Note
594    /// - Task ID remains unchanged after postponement
595    /// - Original completion_receiver remains valid
596    ///
597    /// # 注意
598    /// - 任务 ID 在推迟后保持不变
599    /// - 原始 completion_receiver 保持有效
600    ///
601    /// # Examples (示例)
602    ///
603    /// ## Keep original callback (保持原始回调)
604    /// ```no_run
605    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
606    /// use std::time::Duration;
607    /// 
608    ///
609    /// #[tokio::main]
610    /// async fn main() {
611    ///     let timer = TimerWheel::with_defaults();
612    ///     
613    ///     let task = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
614    ///         println!("Timer fired!");
615    ///     })));
616    ///     let task_id = task.get_id();
617    ///     let _handle = timer.register(task);
618    ///     
    ///     // Postpone so the timer fires 10 seconds from now, keeping the original callback.
621    ///     let success = timer.postpone(task_id, Duration::from_secs(10), None);
622    ///     println!("Postponed successfully: {}", success);
623    /// }
624    /// ```
625    ///
626    /// ## Replace with new callback (替换为新的回调)
627    /// ```no_run
628    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
629    /// use std::time::Duration;
630    ///
631    /// #[tokio::main]
632    /// async fn main() {
633    ///     let timer = TimerWheel::with_defaults();
634    ///     
635    ///     let task = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
636    ///         println!("Original callback!");
637    ///     })));
638    ///     let task_id = task.get_id();
639    ///     let _handle = timer.register(task);
640    ///     
    ///     // Postpone so the timer fires 10 seconds from now, replacing the callback.
643    ///     let success = timer.postpone(task_id, Duration::from_secs(10), Some(CallbackWrapper::new(|| async {
644    ///         println!("New callback!");
645    ///     })));
646    ///     println!("Postponed successfully: {}", success);
647    /// }
648    /// ```
649    #[inline]
650    pub fn postpone(
651        &self,
652        task_id: TaskId,
653        new_delay: Duration,
654        callback: Option<CallbackWrapper>,
655    ) -> bool {
656        let mut wheel = self.wheel.lock();
657        wheel.postpone(task_id, new_delay, callback)
658    }
659
660    /// Batch postpone timers (keep original callbacks)
661    ///
662    /// # Parameters
663    /// - `updates`: List of tuples of (task ID, new delay)
664    ///
665    /// # Returns
666    /// Number of successfully postponed tasks
667    ///
668    /// 批量推迟定时器 (保持原始回调)
669    ///
670    /// # 参数
671    /// - `updates`: (任务 ID, 新延迟) 元组列表
672    ///
673    /// # 返回值
674    /// 成功推迟的任务数量
675    ///
676    /// # Note
677    /// - This method keeps all tasks' original callbacks unchanged
678    /// - Use `postpone_batch_with_callbacks` if you need to replace callbacks
679    ///
680    /// # 注意
681    /// - 此方法保持所有任务的原始回调不变
682    /// - 如果需要替换回调,请使用 `postpone_batch_with_callbacks`
683    ///
684    /// # Performance Advantages
685    /// - Batch processing reduces lock contention
686    /// - Internally optimized batch postponement operation
687    ///
688    /// # Examples (示例)
689    /// ```no_run
690    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
691    /// use std::time::Duration;
692    ///
693    /// #[tokio::main]
694    /// async fn main() {
695    ///     let timer = TimerWheel::with_defaults();
696    ///     
697    ///     // Create multiple tasks with callbacks
698    ///     // 创建多个带有回调的任务
699    ///     let task1 = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
700    ///         println!("Task 1 fired!");
701    ///     })));
702    ///     let task2 = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
703    ///         println!("Task 2 fired!");
704    ///     })));
705    ///     let task3 = TimerWheel::create_task(Duration::from_secs(5), Some(CallbackWrapper::new(|| async {
706    ///         println!("Task 3 fired!");
707    ///     })));
708    ///     
709    ///     let task_ids = vec![
710    ///         (task1.get_id(), Duration::from_secs(10)),
711    ///         (task2.get_id(), Duration::from_secs(15)),
712    ///         (task3.get_id(), Duration::from_secs(20)),
713    ///     ];
714    ///     
715    ///     timer.register(task1);
716    ///     timer.register(task2);
717    ///     timer.register(task3);
718    ///     
719    ///     // Batch postpone (keep original callbacks)
720    ///     // 批量推迟 (保持原始回调)
721    ///     let postponed = timer.postpone_batch(task_ids);
722    ///     println!("Postponed {} timers", postponed);
723    /// }
724    /// ```
725    #[inline]
726    pub fn postpone_batch(&self, updates: Vec<(TaskId, Duration)>) -> usize {
727        let mut wheel = self.wheel.lock();
728        wheel.postpone_batch(updates)
729    }
730
731    /// Batch postpone timers (replace callbacks)
732    ///
733    /// # Parameters
734    /// - `updates`: List of tuples of (task ID, new delay, new callback)
735    ///
736    /// # Returns
737    /// Number of successfully postponed tasks
738    ///
739    /// 批量推迟定时器 (替换回调)
740    ///
741    /// # 参数
742    /// - `updates`: (任务 ID, 新延迟, 新回调) 元组列表
743    ///
744    /// # 返回值
745    /// 成功推迟的任务数量
746    ///
747    /// # Performance Advantages
748    /// - Batch processing reduces lock contention
749    /// - Internally optimized batch postponement operation
750    ///
751    /// # Examples (示例)
752    /// ```no_run
753    /// use kestrel_timer::{TimerWheel, TimerTask, CallbackWrapper};
754    /// use std::time::Duration;
755    /// use std::sync::Arc;
756    /// use std::sync::atomic::{AtomicU32, Ordering};
757    ///
758    /// #[tokio::main]
759    /// async fn main() {
760    ///     let timer = TimerWheel::with_defaults();
761    ///     let counter = Arc::new(AtomicU32::new(0));
762    ///     
763    ///     // Create multiple timers
764    ///     // 创建多个定时器
765    ///     let task1 = TimerWheel::create_task(Duration::from_secs(5), None);
766    ///     let task2 = TimerWheel::create_task(Duration::from_secs(5), None);
767    ///     
768    ///     let id1 = task1.get_id();
769    ///     let id2 = task2.get_id();
770    ///     
771    ///     timer.register(task1);
772    ///     timer.register(task2);
773    ///     
774    ///     // Batch postpone and replace callbacks
775    ///     // 批量推迟并替换回调
776    ///     let updates: Vec<_> = vec![id1, id2]
777    ///         .into_iter()
778    ///         .map(|id| {
779    ///             let counter = Arc::clone(&counter);
780    ///             (id, Duration::from_secs(10), Some(CallbackWrapper::new(move || {
781    ///                 let counter = Arc::clone(&counter);
782    ///                 async move { counter.fetch_add(1, Ordering::SeqCst); }
783    ///             })))
784    ///         })
785    ///         .collect();
786    ///     let postponed = timer.postpone_batch_with_callbacks(updates);
787    ///     println!("Postponed {} timers", postponed);
788    /// }
789    /// ```
790    #[inline]
791    pub fn postpone_batch_with_callbacks(
792        &self,
793        updates: Vec<(TaskId, Duration, Option<CallbackWrapper>)>,
794    ) -> usize {
795        let mut wheel = self.wheel.lock();
796        wheel.postpone_batch_with_callbacks(updates.to_vec())
797    }
798    
799    /// Core tick loop
800    async fn tick_loop(wheel: Arc<Mutex<Wheel>>, tick_duration: Duration) {
801        let mut interval = tokio::time::interval(tick_duration);
802        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
803
804        loop {
805            interval.tick().await;
806
807            // Advance timing wheel and get expired tasks
808            let expired_tasks = {
809                let mut wheel_guard = wheel.lock();
810                wheel_guard.advance()
811            };
812
813            // Execute expired tasks
814            for task in expired_tasks {
815                let callback = task.get_callback();
816                
817                // Move task ownership to get completion_notifier
818                let notifier = task.completion_notifier;
819                
820                // Only registered tasks have notifier
821                if let Some(notifier) = notifier {
822                    // Execute callback in a separate tokio task, and send notification after callback completion
823                    if let Some(callback) = callback {
824                        tokio::spawn(async move {
825                            // Execute callback
826                            let future = callback.call();
827                            future.await;
828                            
829                            // Send notification after callback completion
830                            let _ = notifier.0.send(TaskCompletionReason::Expired);
831                        });
832                    } else {
833                        // Send notification immediately if no callback
834                        let _ = notifier.0.send(TaskCompletionReason::Expired);
835                    }
836                }
837            }
838        }
839    }
840
841    /// Graceful shutdown of TimerWheel
842    /// 
843    /// 优雅关闭 TimerWheel
844    /// 
845    /// # Examples (示例)
846    /// ```no_run
847    /// # use kestrel_timer::TimerWheel;
848    /// # #[tokio::main]
849    /// # async fn main() {
850    /// let timer = TimerWheel::with_defaults();
851    /// 
852    /// // Use timer... (使用定时器...)
853    /// 
854    /// timer.shutdown().await;
855    /// # }
856    /// ```
857    pub async fn shutdown(mut self) {
858        if let Some(handle) = self.tick_handle.take() {
859            handle.abort();
860            let _ = handle.await;
861        }
862    }
863}
864
865impl Drop for TimerWheel {
866    fn drop(&mut self) {
867        if let Some(handle) = self.tick_handle.take() {
868            handle.abort();
869        }
870    }
871}
872
873#[cfg(test)]
874mod tests {
875    use super::*;
876    use std::sync::atomic::{AtomicU32, Ordering};
877
878    #[tokio::test]
879    async fn test_timer_creation() {
880        let _timer = TimerWheel::with_defaults();
881    }
882
883    #[tokio::test]
884    async fn test_schedule_once() {
885        use std::sync::Arc;
886        let timer = TimerWheel::with_defaults();
887        let counter = Arc::new(AtomicU32::new(0));
888        let counter_clone = Arc::clone(&counter);
889
890        let task = TimerWheel::create_task(
891            Duration::from_millis(50),
892            Some(CallbackWrapper::new(move || {
893                let counter = Arc::clone(&counter_clone);
894                async move {
895                    counter.fetch_add(1, Ordering::SeqCst);
896                }
897            })),
898        );
899        let _handle = timer.register(task);
900
901        // Wait for timer to trigger
902        // 等待定时器触发
903        tokio::time::sleep(Duration::from_millis(100)).await;
904        assert_eq!(counter.load(Ordering::SeqCst), 1);
905    }
906
907    #[tokio::test]
908    async fn test_cancel_timer() {
909        use std::sync::Arc;
910        let timer = TimerWheel::with_defaults();
911        let counter = Arc::new(AtomicU32::new(0));
912        let counter_clone = Arc::clone(&counter);
913
914        let task = TimerWheel::create_task(
915            Duration::from_millis(100),
916            Some(CallbackWrapper::new(move || {
917                let counter = Arc::clone(&counter_clone);
918                async move {
919                    counter.fetch_add(1, Ordering::SeqCst);
920                }
921            })),
922        );
923        let handle = timer.register(task);
924
925        // Immediately cancel
926        // 立即取消
927        let cancel_result = handle.cancel();
928        assert!(cancel_result);
929
930        // Wait for enough time to ensure timer does not trigger
931        // 等待足够时间确保定时器不触发
932        tokio::time::sleep(Duration::from_millis(200)).await;
933        assert_eq!(counter.load(Ordering::SeqCst), 0);
934    }
935
936    #[tokio::test]
937    async fn test_cancel_immediate() {
938        use std::sync::Arc;
939        let timer = TimerWheel::with_defaults();
940        let counter = Arc::new(AtomicU32::new(0));
941        let counter_clone = Arc::clone(&counter);
942
943        let task = TimerWheel::create_task(
944            Duration::from_millis(100),
945            Some(CallbackWrapper::new(move || {
946                let counter = Arc::clone(&counter_clone);
947                async move {
948                    counter.fetch_add(1, Ordering::SeqCst);
949                }
950            })),
951        );
952        let handle = timer.register(task);
953
954        // Immediately cancel
955        // 立即取消
956        let cancel_result = handle.cancel();
957        assert!(cancel_result);
958
959        // Wait for enough time to ensure timer does not trigger
960        // 等待足够时间确保定时器不触发
961        tokio::time::sleep(Duration::from_millis(200)).await;
962        assert_eq!(counter.load(Ordering::SeqCst), 0);
963    }
964
965    #[tokio::test]
966    async fn test_postpone_timer() {
967        use std::sync::Arc;
968        let timer = TimerWheel::with_defaults();
969        let counter = Arc::new(AtomicU32::new(0));
970        let counter_clone = Arc::clone(&counter);
971
972        let task = TimerWheel::create_task(
973            Duration::from_millis(50),
974            Some(CallbackWrapper::new(move || {
975                let counter = Arc::clone(&counter_clone);
976                async move {
977                    counter.fetch_add(1, Ordering::SeqCst);
978                }
979            })),
980        );
981        let task_id = task.get_id();
982        let handle = timer.register(task);
983
984        // Postpone task to 150ms
985        // 推迟任务到 150ms
986        let postponed = timer.postpone(task_id, Duration::from_millis(150), None);
987        assert!(postponed);
988
989        // Wait for original time 50ms, task should not trigger
990        // 等待原始时间 50ms,任务不应触发
991        tokio::time::sleep(Duration::from_millis(70)).await;
992        assert_eq!(counter.load(Ordering::SeqCst), 0);
993
994        // Wait for new trigger time (from postponed start, need to wait about 150ms)
995        // 等待新的触发时间(从推迟开始算起,大约需要等待 150ms)
996        let (rx, _handle) = handle.into_parts();
997        let result = tokio::time::timeout(
998            Duration::from_millis(200),
999            rx.0
1000        ).await;
1001        assert!(result.is_ok());
1002        
1003        // Wait for callback to execute
1004        // 等待回调执行
1005        tokio::time::sleep(Duration::from_millis(20)).await;
1006        assert_eq!(counter.load(Ordering::SeqCst), 1);
1007    }
1008
1009    #[tokio::test]
1010    async fn test_postpone_with_callback() {
1011        use std::sync::Arc;
1012        let timer = TimerWheel::with_defaults();
1013        let counter = Arc::new(AtomicU32::new(0));
1014        let counter_clone1 = Arc::clone(&counter);
1015        let counter_clone2 = Arc::clone(&counter);
1016
1017        // Create task, original callback adds 1
1018        let task = TimerWheel::create_task(
1019            Duration::from_millis(50),
1020            Some(CallbackWrapper::new(move || {
1021                let counter = Arc::clone(&counter_clone1);
1022                async move {
1023                    counter.fetch_add(1, Ordering::SeqCst);
1024                }
1025            })),
1026        );
1027        let task_id = task.get_id();
1028        let handle = timer.register(task);
1029
1030        // Postpone task and replace callback, new callback adds 10
1031        // 推迟任务并替换回调,新回调增加 10
1032        let postponed = timer.postpone(
1033            task_id,
1034            Duration::from_millis(100),
1035            Some(CallbackWrapper::new(move || {
1036                let counter = Arc::clone(&counter_clone2);
1037                async move {
1038                    counter.fetch_add(10, Ordering::SeqCst);
1039                }
1040            })),
1041        );
1042        assert!(postponed);
1043
1044        // Wait for task to trigger (after postponed, need to wait 100ms, plus margin)
1045        // 等待任务触发(推迟后,需要等待 100ms,加上余量)
1046        let (rx, _handle) = handle.into_parts();
1047        let result = tokio::time::timeout(
1048            Duration::from_millis(200),
1049            rx.0
1050        ).await;
1051        assert!(result.is_ok());
1052        
1053        // Wait for callback to execute
1054        // 等待回调执行
1055        tokio::time::sleep(Duration::from_millis(20)).await;
1056        
1057        // Verify new callback is executed (increased 10 instead of 1)
1058        // 验证新回调已执行(增加 10 而不是 1)
1059        assert_eq!(counter.load(Ordering::SeqCst), 10);
1060    }
1061
1062    #[tokio::test]
1063    async fn test_postpone_nonexistent_timer() {
1064        let timer = TimerWheel::with_defaults();
1065        
1066        // Try to postpone nonexistent task
1067        // 尝试推迟一个不存在的任务
1068        let fake_task = TimerWheel::create_task(Duration::from_millis(50), None);
1069        let fake_task_id = fake_task.get_id();
1070        // Do not register this task
1071        // 不注册这个任务
1072        let postponed = timer.postpone(fake_task_id, Duration::from_millis(100), None);
1073        assert!(!postponed);
1074    }
1075
1076    #[tokio::test]
1077    async fn test_postpone_batch() {
1078        use std::sync::Arc;
1079        let timer = TimerWheel::with_defaults();
1080        let counter = Arc::new(AtomicU32::new(0));
1081
1082        // Create 3 tasks
1083        // 创建 3 个任务
1084        let mut task_ids = Vec::new();
1085        for _ in 0..3 {
1086            let counter_clone = Arc::clone(&counter);
1087            let task = TimerWheel::create_task(
1088                Duration::from_millis(50),
1089                Some(CallbackWrapper::new(move || {
1090                    let counter = Arc::clone(&counter_clone);
1091                    async move {
1092                        counter.fetch_add(1, Ordering::SeqCst);
1093                    }
1094                })),
1095            );
1096            task_ids.push((task.get_id(), Duration::from_millis(150)));
1097            timer.register(task);
1098        }
1099
1100        // Batch postpone
1101        // 批量推迟
1102        let postponed = timer.postpone_batch(task_ids);
1103        assert_eq!(postponed, 3);
1104
1105        // Wait for original time 50ms, task should not trigger
1106        // 等待原始时间 50ms,任务不应触发
1107        tokio::time::sleep(Duration::from_millis(70)).await;
1108        assert_eq!(counter.load(Ordering::SeqCst), 0);
1109
1110        // Wait for new trigger time (from postponed start, need to wait about 150ms)
1111        // 等待新的触发时间(从推迟开始算起,大约需要等待 150ms)
1112        tokio::time::sleep(Duration::from_millis(200)).await;
1113        
1114        // Wait for callback to execute
1115        // 等待回调执行
1116        tokio::time::sleep(Duration::from_millis(20)).await;
1117        assert_eq!(counter.load(Ordering::SeqCst), 3);
1118    }
1119
1120    #[tokio::test]
1121    async fn test_postpone_batch_with_callbacks() {
1122        use std::sync::Arc;
1123        let timer = TimerWheel::with_defaults();
1124        let counter = Arc::new(AtomicU32::new(0));
1125
1126        // Create 3 tasks
1127        // 创建 3 个任务
1128        let mut task_ids = Vec::new();
1129        for _ in 0..3 {
1130            let task = TimerWheel::create_task(
1131                Duration::from_millis(50),
1132                None
1133            );
1134            task_ids.push(task.get_id());
1135            timer.register(task);
1136        }
1137
1138        // Batch postpone and replace callbacks
1139        // 批量推迟并替换回调
1140        let updates: Vec<_> = task_ids
1141            .into_iter()
1142            .map(|id| {
1143                let counter_clone = Arc::clone(&counter);
1144                (id, Duration::from_millis(150), Some(CallbackWrapper::new(move || {
1145                    let counter = Arc::clone(&counter_clone);
1146                    async move {
1147                        counter.fetch_add(1, Ordering::SeqCst);
1148                    }
1149                })))
1150            })
1151            .collect();
1152
1153        // Batch postpone and replace callbacks
1154        // 批量推迟并替换回调
1155        let postponed = timer.postpone_batch_with_callbacks(updates);
1156        assert_eq!(postponed, 3);
1157
1158        // Wait for original time 50ms, task should not trigger
1159        // 等待原始时间 50ms,任务不应触发
1160        tokio::time::sleep(Duration::from_millis(70)).await;
1161        assert_eq!(counter.load(Ordering::SeqCst), 0);
1162
1163        // Wait for new trigger time (from postponed start, need to wait about 150ms)
1164        // 等待新的触发时间(从推迟开始算起,大约需要等待 150ms)
1165        tokio::time::sleep(Duration::from_millis(200)).await;
1166        
1167        // Wait for callback to execute
1168        // 等待回调执行
1169        tokio::time::sleep(Duration::from_millis(20)).await;
1170        assert_eq!(counter.load(Ordering::SeqCst), 3);
1171    }
1172
1173    #[tokio::test]
1174    async fn test_postpone_keeps_completion_receiver_valid() {
1175        use std::sync::Arc;
1176        let timer = TimerWheel::with_defaults();
1177        let counter = Arc::new(AtomicU32::new(0));
1178        let counter_clone = Arc::clone(&counter);
1179
1180        let task = TimerWheel::create_task(
1181            Duration::from_millis(50),
1182            Some(CallbackWrapper::new(move || {
1183                let counter = Arc::clone(&counter_clone);
1184                async move {
1185                    counter.fetch_add(1, Ordering::SeqCst);
1186                }
1187            })),
1188        );
1189        let task_id = task.get_id();
1190        let handle = timer.register(task);
1191
1192        // Postpone task
1193        // 推迟任务
1194        timer.postpone(task_id, Duration::from_millis(100), None);
1195
1196        // Verify original completion_receiver is still valid (after postponed, need to wait 100ms, plus margin)
1197        // 验证原始完成接收器是否仍然有效(推迟后,需要等待 100ms,加上余量)
1198        let (rx, _handle) = handle.into_parts();
1199        let result = tokio::time::timeout(
1200            Duration::from_millis(200),
1201            rx.0
1202        ).await;
1203        assert!(result.is_ok(), "Completion receiver should still work after postpone");
1204        
1205        // Wait for callback to execute
1206        tokio::time::sleep(Duration::from_millis(20)).await;
1207        assert_eq!(counter.load(Ordering::SeqCst), 1);
1208    }
1209}
1210