kestrel_timer/
lib.rs

1//! # High-Performance Async Timer System
2//!
//! A high-performance async timer built on a hierarchical timing wheel algorithm, with support for the tokio runtime.
4//!
5//! ## Features
6//!
//! - **High Performance**: Uses a timing wheel algorithm; insert and delete operations are O(1)
//! - **Large-Scale Support**: Efficiently manages 10000+ concurrent timers
//! - **Async Support**: Built on the tokio async runtime
//! - **Thread-Safe**: Uses `parking_lot` for a high-performance locking mechanism
11//!
12//! 
13//! # 高性能异步定时器库
14//! 
15//! 基于分层时间轮算法的高性能异步定时器库,支持 tokio 运行时
16//! 
17//! ## 特性
18//! 
19//! - **高性能**: 使用时间轮算法,插入和删除操作均为 O(1)
20//! - **大规模支持**: 高效管理 10000+ 并发定时器
21//! - **异步支持**: 基于 tokio 异步运行时
22//! - **线程安全**: 使用 parking_lot 提供高性能的锁机制
23//! 
24//! ## Quick Start (快速开始)
25//!
26//! ```no_run
27//! use kestrel_timer::{TimerWheel, CallbackWrapper};
28//! use std::time::Duration;
29//! use std::sync::Arc;
30//!
31//! #[tokio::main]
32//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
33//!     // Create timer manager (创建定时器管理器)
34//!     let timer = TimerWheel::with_defaults();
35//!     
36//!     // Step 1: Create timer task (使用回调创建定时器任务)
37//!     let callback = Some(CallbackWrapper::new(|| async {
38//!         println!("Timer fired after 1 second!");
39//!     }));
40//!     let task = TimerWheel::create_task(Duration::from_secs(1), callback);
41//!     let task_id = task.get_id();
42//!     
43//!     // Step 2: Register timer task and get completion notification (注册定时器任务并获取完成通知)
44//!     let handle = timer.register(task);
45//!     
46//!     // Wait for timer completion (等待定时器完成)
47//!     let (rx, _handle) = handle.into_parts();
48//!     rx.0.await?;
49//!     Ok(())
50//! }
51//! ```
52//!
53//! ## English Architecture Description
54//!
55//! ### Timing Wheel Algorithm
56//!
//! Uses a hierarchical timing wheel algorithm with two layers, L0 and L1:
58//!
59//! - **L0 Layer (Bottom)**: Handles short delay tasks
60//!   - Slot count: Default 512, configurable, must be power of 2
61//!   - Time precision: Default 10ms, configurable
62//!   - Maximum time span: 5.12 seconds
63//!
64//! - **L1 Layer (Upper)**: Handles long delay tasks
65//!   - Slot count: Default 64, configurable, must be power of 2
66//!   - Time precision: Default 1 second, configurable
67//!   - Maximum time span: 64 seconds
68//!
69//! - **Round Mechanism**: Tasks beyond L1 range use round counting
70//! 
71//! ### Performance Optimization
72//!
73//! - Uses `parking_lot::Mutex` instead of standard library Mutex for better performance
//! - Uses `FxHashMap` (rustc-hash) instead of the standard `HashMap` to reduce hash collisions
//! - Slot count is a power of 2, so bitwise operations replace the modulo operation
//! - Tasks execute in separate tokio tasks to avoid blocking timing wheel advancement
77//! 
78//! 
79//! 
80//! ## 中文架构说明
81//!
82//! ### 时间轮算法
83//!
84//! 采用分层时间轮(Hierarchical Timing Wheel)算法,包含 L0 和 L1 两层:
85//!
86//! - **L0 层(底层)**: 处理短延迟任务
87//!   - 槽位数量: 默认 512 个(可配置,必须是 2 的幂次方)
88//!   - 时间精度: 默认 10ms(可配置)
89//!   - 最大时间跨度: 5.12 秒
90//!
91//! - **L1 层(高层)**: 处理长延迟任务
92//!   - 槽位数量: 默认 64 个(可配置,必须是 2 的幂次方)
93//!   - 时间精度: 默认 1 秒(可配置)
94//!   - 最大时间跨度: 64 秒
95//!
96//! - **轮次机制**: 超出 L1 层范围的任务使用轮次计数处理
97//!
98//! ### 性能优化
99//!
100//! - 使用 `parking_lot::Mutex` 替代标准库的 Mutex,提供更好的性能
101//! - 使用 `FxHashMap`(rustc-hash)替代标准 HashMap,减少哈希冲突
102//! - 槽位数量为 2 的幂次方,使用位运算优化取模操作
103//! - 任务执行在独立的 tokio 任务中,避免阻塞时间轮推进
104//! 
105
106pub mod config;
107pub mod error;
108pub mod task;
109pub mod wheel;
110pub mod timer;
111mod service;
112
113// Re-export public API
114pub use task::{CallbackWrapper, TaskId, TimerTask, TaskCompletionReason};
115pub use timer::handle::{TimerHandle, TimerHandleWithCompletion, BatchHandle, BatchHandleWithCompletion, CompletionReceiver};
116pub use timer::TimerWheel;
117pub use service::TimerService;
118
#[cfg(test)]
mod tests {
    // End-to-end tests of the crate's public timer API: registering tasks,
    // cancelling them, and observing completion notifications.
    //
    // Several tests rely on wall-clock sleeps that are longer than the timer
    // delays they wait for; the margins are kept exactly as originally chosen.

    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::time::Duration;
    use crate::task::TaskCompletionReason;

    /// Builds a timer callback that adds one to `counter` every time it runs.
    ///
    /// The returned value is already wrapped in `Some`, so it can be handed
    /// directly to `TimerWheel::create_task` or placed in a batch tuple.
    fn counting_callback(counter: &Arc<AtomicU32>) -> Option<CallbackWrapper> {
        let shared = Arc::clone(counter);
        Some(CallbackWrapper::new(move || {
            // Each invocation gets its own handle so the future can own it.
            let counter = Arc::clone(&shared);
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        }))
    }

    /// A single 50 ms timer runs its callback exactly once.
    #[tokio::test]
    async fn test_basic_timer() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let task = TimerWheel::create_task(
            Duration::from_millis(50),
            counting_callback(&counter),
        );
        timer.register(task);

        // Sleep past the 50 ms delay before checking the counter.
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    /// Ten timers with staggered delays (10 ms, 20 ms, ..., 100 ms) all fire.
    #[tokio::test]
    async fn test_multiple_timers() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        // Create 10 timers
        for step in 1..=10 {
            let task = TimerWheel::create_task(
                Duration::from_millis(10 * step),
                counting_callback(&counter),
            );
            timer.register(task);
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 10);
    }

    /// Cancelling three of five pending timers leaves only two callbacks to run.
    #[tokio::test]
    async fn test_timer_cancellation() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        // Create 5 timers
        let handles: Vec<_> = (0..5)
            .map(|_| {
                let task = TimerWheel::create_task(
                    Duration::from_millis(100),
                    counting_callback(&counter),
                );
                timer.register(task)
            })
            .collect();

        // Cancel first 3 timers; each cancel must report success.
        for handle in handles.iter().take(3) {
            assert!(handle.cancel());
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        // Only 2 timers should be triggered
        assert_eq!(counter.load(Ordering::SeqCst), 2);
    }

    /// A registered task delivers a completion notification and runs its callback.
    #[tokio::test]
    async fn test_completion_notification_once() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let task = TimerWheel::create_task(
            Duration::from_millis(50),
            counting_callback(&counter),
        );
        let handle = timer.register(task);

        // Wait for completion notification
        let (rx, _handle) = handle.into_parts();
        rx.0.await.expect("Should receive completion notification");

        // Verify callback has been executed (wait a moment to ensure callback execution is complete)
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    /// A task without a callback still delivers a completion notification.
    #[tokio::test]
    async fn test_notify_only_timer_once() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new(Duration::from_millis(50), None);
        let handle = timer.register(task);

        // Wait for completion notification (no callback, only notification)
        let (rx, _handle) = handle.into_parts();
        rx.0.await.expect("Should receive completion notification");
    }

    /// Every task in a batch delivers a notification and runs its callback.
    #[tokio::test]
    async fn test_batch_completion_notifications() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        // Create batch callbacks with delays of 50 ms, 60 ms, ..., 90 ms.
        let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
            .map(|i| {
                let delay = Duration::from_millis(50 + i * 10);
                (delay, counting_callback(&counter))
            })
            .collect();

        let tasks = TimerWheel::create_batch_with_callbacks(callbacks);
        let batch = timer.register_batch(tasks);
        let (receivers, _batch_handle) = batch.into_parts();

        // Wait for all completion notifications
        for rx in receivers {
            rx.await.expect("Should receive completion notification");
        }

        // Wait a moment to ensure callback execution is complete
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Verify all callbacks have been executed
        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }

    /// A timer that runs to its deadline reports `TaskCompletionReason::Expired`.
    #[tokio::test]
    async fn test_completion_reason_expired() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new(Duration::from_millis(50), None);
        let handle = timer.register(task);

        // Wait for completion notification and verify reason is Expired
        let (rx, _handle) = handle.into_parts();
        let reason = rx.0.await.expect("Should receive completion notification");
        assert_eq!(reason, TaskCompletionReason::Expired);
    }

    /// A timer cancelled before its deadline reports `TaskCompletionReason::Cancelled`.
    #[tokio::test]
    async fn test_completion_reason_cancelled() {
        let timer = TimerWheel::with_defaults();

        // The 10 s delay is far longer than the test runs, so the task is
        // still pending when it is cancelled.
        let task = TimerTask::new(Duration::from_secs(10), None);
        let handle = timer.register(task);

        // Cancel task
        assert!(handle.cancel());

        // Wait for completion notification and verify reason is Cancelled
        let (rx, _handle) = handle.into_parts();
        let reason = rx.0.await.expect("Should receive completion notification");
        assert_eq!(reason, TaskCompletionReason::Cancelled);
    }

    /// Batch cancellation by task id notifies each cancelled task with `Cancelled`.
    #[tokio::test]
    async fn test_batch_completion_reasons() {
        let timer = TimerWheel::with_defaults();

        // Create 5 tasks, delay 10 seconds
        let tasks: Vec<_> = (0..5)
            .map(|_| TimerTask::new(Duration::from_secs(10), None))
            .collect();

        let batch = timer.register_batch(tasks);
        let task_ids: Vec<_> = batch.task_ids().to_vec();
        let (mut receivers, _batch_handle) = batch.into_parts();

        // Cancel first 3 tasks
        timer.cancel_batch(&task_ids[0..3]);

        // Verify first 3 tasks received Cancelled notification
        for rx in receivers.drain(0..3) {
            let reason = rx.await.expect("Should receive completion notification");
            assert_eq!(reason, TaskCompletionReason::Cancelled);
        }

        // Cancel remaining tasks and verify
        timer.cancel_batch(&task_ids[3..5]);
        for rx in receivers {
            let reason = rx.await.expect("Should receive completion notification");
            assert_eq!(reason, TaskCompletionReason::Cancelled);
        }
    }
}