kestrel_timer/
lib.rs

1//! # High-Performance Async Timer System
2//!
//! High-performance async timer based on a hierarchical timing wheel algorithm, with support for the tokio runtime
4//!
5//! ## Features
6//!
//! - **High Performance**: Uses a timing wheel algorithm; insert and delete operations are O(1)
//! - **Large-Scale Support**: Efficiently manages 10000+ concurrent timers
//! - **Async Support**: Built on the tokio async runtime
//! - **Thread-Safe**: Uses `parking_lot` for high-performance locking
11//!
12//! 
13//! # 高性能异步定时器库
14//! 
15//! 基于分层时间轮算法的高性能异步定时器库,支持 tokio 运行时
16//! 
17//! ## 特性
18//! 
19//! - **高性能**: 使用时间轮算法,插入和删除操作均为 O(1)
20//! - **大规模支持**: 高效管理 10000+ 并发定时器
21//! - **异步支持**: 基于 tokio 异步运行时
22//! - **线程安全**: 使用 parking_lot 提供高性能的锁机制
23//! 
24//! ## Quick Start (快速开始)
25//!
26//! ```no_run
27//! use kestrel_timer::{TimerWheel, CallbackWrapper};
28//! use std::time::Duration;
29//! use std::sync::Arc;
30//!
31//! #[tokio::main]
32//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
33//!     // Create timer manager (创建定时器管理器)
34//!     let timer = TimerWheel::with_defaults();
35//!     
36//!     // Step 1: Create timer task (使用回调创建定时器任务)
37//!     let callback = Some(CallbackWrapper::new(|| async {
38//!         println!("Timer fired after 1 second!");
39//!     }));
40//!     let task = TimerWheel::create_task(Duration::from_secs(1), callback);
41//!     let task_id = task.get_id();
42//!     
43//!     // Step 2: Register timer task and get completion notification (注册定时器任务并获取完成通知)
44//!     let handle = timer.register(task);
45//!     
46//!     // Wait for timer completion (等待定时器完成)
47//!     handle.into_completion_receiver().0.await?;
48//!     Ok(())
49//! }
50//! ```
51//!
52//! ## English Architecture Description
53//!
54//! ### Timing Wheel Algorithm
55//!
//! Uses a hierarchical timing wheel algorithm with two layers, L0 and L1:
57//!
//! - **L0 Layer (Bottom)**: Handles short-delay tasks
//!   - Slot count: Default 512, configurable, must be a power of 2
//!   - Time precision: Default 10ms, configurable
//!   - Maximum time span: 5.12 seconds with the defaults (512 slots × 10ms)
62//!
//! - **L1 Layer (Upper)**: Handles long-delay tasks
//!   - Slot count: Default 64, configurable, must be a power of 2
//!   - Time precision: Default 1 second, configurable
//!   - Maximum time span: 64 seconds with the defaults (64 slots × 1 second)
67//!
//! - **Round Mechanism**: Tasks whose delay exceeds the L1 range are handled with round counting
69//! 
70//! ### Performance Optimization
71//!
//! - Uses `parking_lot::Mutex` instead of the standard library `Mutex` for better performance
//! - Uses `FxHashMap` (rustc-hash) instead of the standard `HashMap` to reduce hash collisions
//! - Slot count is a power of 2, so modulo is computed with bitwise operations
//! - Tasks execute in separate tokio tasks to avoid blocking timing wheel advancement
76//! 
77//! 
78//! 
79//! ## 中文架构说明
80//!
81//! ### 时间轮算法
82//!
83//! 采用分层时间轮(Hierarchical Timing Wheel)算法,包含 L0 和 L1 两层:
84//!
85//! - **L0 层(底层)**: 处理短延迟任务
86//!   - 槽位数量: 默认 512 个(可配置,必须是 2 的幂次方)
87//!   - 时间精度: 默认 10ms(可配置)
88//!   - 最大时间跨度: 5.12 秒
89//!
90//! - **L1 层(高层)**: 处理长延迟任务
91//!   - 槽位数量: 默认 64 个(可配置,必须是 2 的幂次方)
92//!   - 时间精度: 默认 1 秒(可配置)
93//!   - 最大时间跨度: 64 秒
94//!
95//! - **轮次机制**: 超出 L1 层范围的任务使用轮次计数处理
96//!
97//! ### 性能优化
98//!
99//! - 使用 `parking_lot::Mutex` 替代标准库的 Mutex,提供更好的性能
100//! - 使用 `FxHashMap`(rustc-hash)替代标准 HashMap,减少哈希冲突
101//! - 槽位数量为 2 的幂次方,使用位运算优化取模操作
102//! - 任务执行在独立的 tokio 任务中,避免阻塞时间轮推进
103//! 
104
105pub mod config;
106pub mod error;
107pub mod task;
108pub mod wheel;
109pub mod timer;
110mod service;
111
112// Re-export public API
113pub use task::{CallbackWrapper, TaskId, TimerTask};
114pub use timer::{TimerWheel};
115pub use service::TimerService;
116
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::time::Duration;
    use crate::task::TaskCompletionReason;

    /// Builds a callback that adds one to `counter` every time it is invoked.
    ///
    /// Shared by the callback-based tests so each of them only has to state the
    /// timer delay and the expected final count.
    fn counting_callback(counter: &Arc<AtomicU32>) -> CallbackWrapper {
        let counter = Arc::clone(counter);
        CallbackWrapper::new(move || {
            // Each invocation hands its own `Arc` to the future it returns.
            let counter = Arc::clone(&counter);
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        })
    }

    /// Registers one 50 ms timer and checks that its callback has run exactly
    /// once after sleeping for 100 ms.
    #[tokio::test]
    async fn test_basic_timer() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let task = TimerWheel::create_task(
            Duration::from_millis(50),
            Some(counting_callback(&counter)),
        );
        timer.register(task);

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    /// Registers ten timers with delays of 10 ms, 20 ms, ..., 100 ms and checks
    /// that all ten callbacks have run after sleeping for 200 ms.
    #[tokio::test]
    async fn test_multiple_timers() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        // Create 10 timers
        for i in 0..10 {
            let task = TimerWheel::create_task(
                Duration::from_millis(10 * (i + 1)),
                Some(counting_callback(&counter)),
            );
            timer.register(task);
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 10);
    }

    /// Registers five 100 ms timers, cancels the first three through their
    /// handles, and checks that only the remaining two callbacks run.
    #[tokio::test]
    async fn test_timer_cancellation() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        // Create 5 timers and keep their handles alive for the whole test.
        let handles: Vec<_> = (0..5)
            .map(|_| {
                let task = TimerWheel::create_task(
                    Duration::from_millis(100),
                    Some(counting_callback(&counter)),
                );
                timer.register(task)
            })
            .collect();

        // Cancel first 3 timers; every cancel call must report success.
        for handle in handles.iter().take(3) {
            assert!(handle.cancel(), "cancelling a pending timer should succeed");
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        // Only 2 timers should be triggered
        assert_eq!(counter.load(Ordering::SeqCst), 2);
    }

    /// Awaits the completion receiver of a 50 ms timer and then checks that the
    /// timer's callback ran exactly once.
    #[tokio::test]
    async fn test_completion_notification_once() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let task = TimerWheel::create_task(
            Duration::from_millis(50),
            Some(counting_callback(&counter)),
        );
        let handle = timer.register(task);

        // Wait for completion notification
        handle.into_completion_receiver().0.await.expect("Should receive completion notification");

        // Verify callback has been executed (wait a moment to ensure callback execution is complete)
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    /// Checks that a task created without a callback still delivers a
    /// completion notification.
    #[tokio::test]
    async fn test_notify_only_timer_once() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new(Duration::from_millis(50), None);
        let handle = timer.register(task);

        // Wait for completion notification (no callback, only notification)
        handle.into_completion_receiver().0.await.expect("Should receive completion notification");
    }

    /// Registers a batch of five callback timers (50 ms to 90 ms), awaits every
    /// completion receiver, and checks that all five callbacks ran.
    #[tokio::test]
    async fn test_batch_completion_notifications() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        // Create batch callbacks
        let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
            .map(|i| {
                let delay = Duration::from_millis(50 + i * 10);
                (delay, Some(counting_callback(&counter)))
            })
            .collect();

        let tasks = TimerWheel::create_batch_with_callbacks(callbacks);
        let batch = timer.register_batch(tasks);
        let receivers = batch.into_completion_receivers();

        // Wait for all completion notifications
        for rx in receivers {
            rx.await.expect("Should receive completion notification");
        }

        // Wait a moment to ensure callback execution is complete
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Verify all callbacks have been executed
        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }

    /// Checks that a timer left to run reports `TaskCompletionReason::Expired`.
    #[tokio::test]
    async fn test_completion_reason_expired() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new(Duration::from_millis(50), None);
        let handle = timer.register(task);

        // Wait for completion notification and verify reason is Expired
        let result = handle.into_completion_receiver().0.await.expect("Should receive completion notification");
        assert_eq!(result, TaskCompletionReason::Expired);
    }

    /// Checks that cancelling a 10 s timer through its handle reports
    /// `TaskCompletionReason::Cancelled` on the completion receiver.
    #[tokio::test]
    async fn test_completion_reason_cancelled() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new(Duration::from_secs(10), None);
        let handle = timer.register(task);

        // Cancel task
        let cancelled = handle.cancel();
        assert!(cancelled);

        // Wait for completion notification and verify reason is Cancelled
        let result = handle.into_completion_receiver().0.await.expect("Should receive completion notification");
        assert_eq!(result, TaskCompletionReason::Cancelled);
    }

    /// Registers a batch of five 10 s tasks and cancels them in two groups via
    /// `cancel_batch`, checking that every receiver reports
    /// `TaskCompletionReason::Cancelled`.
    #[tokio::test]
    async fn test_batch_completion_reasons() {
        let timer = TimerWheel::with_defaults();

        // Create 5 tasks, delay 10 seconds
        let tasks: Vec<_> = (0..5)
            .map(|_| TimerTask::new(Duration::from_secs(10), None))
            .collect();

        let batch = timer.register_batch(tasks);
        // Copy the ids out first: `into_completion_receivers` consumes the batch.
        let task_ids: Vec<_> = batch.task_ids.clone();
        let mut receivers = batch.into_completion_receivers();

        // Cancel first 3 tasks
        timer.cancel_batch(&task_ids[0..3]);

        // Verify first 3 tasks received Cancelled notification
        for rx in receivers.drain(0..3) {
            let result = rx.await.expect("Should receive completion notification");
            assert_eq!(result, TaskCompletionReason::Cancelled);
        }

        // Cancel remaining tasks and verify
        timer.cancel_batch(&task_ids[3..5]);
        for rx in receivers {
            let result = rx.await.expect("Should receive completion notification");
            assert_eq!(result, TaskCompletionReason::Cancelled);
        }
    }
}