kestrel_timer/
lib.rs

1//! # 高性能异步定时器系统 (High-Performance Async Timer System)
2//!
3//! 基于时间轮(Timing Wheel)算法实现的高性能异步定时器,支持 tokio 运行时。
4//! (High-performance async timer based on Timing Wheel algorithm, supports tokio runtime)
5//!
6//! ## 特性 (Features)
7//!
8//! - **高性能 (High Performance)**: 使用时间轮算法,插入和删除操作的时间复杂度为 O(1)
9//!   (Uses timing wheel algorithm, insert and delete operations are O(1))
10//! - **大规模支持 (Large-Scale Support)**: 能够高效管理 10000+ 并发定时器
11//!   (Efficiently manages 10000+ concurrent timers)
12//! - **异步支持 (Async Support)**: 基于 tokio 异步运行时
13//!   (Based on tokio async runtime)
14//! - **线程安全 (Thread-Safe)**: 使用 parking_lot 提供高性能的锁机制
15//!   (Uses parking_lot for high-performance locking mechanism)
16//!
17//! ## 快速开始 (Quick Start)
18//!
19//! ```no_run
20//! use kestrel_timer::{TimerWheel, CallbackWrapper};
21//! use std::time::Duration;
22//! use std::sync::Arc;
23//!
24//! #[tokio::main]
25//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
26//!     // 创建定时器管理器
27//!     let timer = TimerWheel::with_defaults();
28//!     
29//!     // 步骤 1: 创建定时器任务(使用回调)
30//!     let callback = Some(CallbackWrapper::new(|| async {
31//!         println!("Timer fired after 1 second!");
32//!     }));
33//!     let task = TimerWheel::create_task(Duration::from_secs(1), callback);
34//!     let task_id = task.get_id();
35//!     
36//!     // 步骤 2: 注册定时器任务并获取完成通知
37//!     let handle = timer.register(task);
38//!     
39//!     // 等待定时器完成
40//!     handle.into_completion_receiver().0.await?;
41//!     Ok(())
42//! }
43//! ```
44//!
45//! ## 中文架构说明
46//!
47//! ### 时间轮算法
48//!
49//! 采用分层时间轮(Hierarchical Timing Wheel)算法,包含 L0 和 L1 两层:
50//!
51//! - **L0 层(底层)**: 处理短延迟任务
52//!   - 槽位数量: 默认 512 个(可配置,必须是 2 的幂次方)
53//!   - 时间精度: 默认 10ms(可配置)
54//!   - 最大时间跨度: 5.12 秒
55//!
56//! - **L1 层(高层)**: 处理长延迟任务
57//!   - 槽位数量: 默认 64 个(可配置,必须是 2 的幂次方)
58//!   - 时间精度: 默认 1 秒(可配置)
59//!   - 最大时间跨度: 64 秒
60//!
61//! - **轮次机制**: 超出 L1 层范围的任务使用轮次计数处理
62//!
63//! ### 性能优化
64//!
65//! - 使用 `parking_lot::Mutex` 替代标准库的 Mutex,提供更好的性能
66//! - 使用 `FxHashMap`(rustc-hash)替代标准 HashMap,减少哈希冲突
67//! - 槽位数量为 2 的幂次方,使用位运算优化取模操作
68//! - 任务执行在独立的 tokio 任务中,避免阻塞时间轮推进
69//! 
70//! ## English Architecture Description
71//!
72//! ### Timing Wheel Algorithm
73//!
74//! Uses hierarchical timing wheel algorithm with L0 and L1 layers:
75//!
76//! - **L0 Layer (Bottom)**: Handles short delay tasks
77//!   - Slot count: Default 512, configurable, must be power of 2
78//!   - Time precision: Default 10ms, configurable
79//!   - Maximum time span: 5.12 seconds
80//!
81//! - **L1 Layer (Upper)**: Handles long delay tasks
82//!   - Slot count: Default 64, configurable, must be power of 2
83//!   - Time precision: Default 1 second, configurable
84//!   - Maximum time span: 64 seconds
85//!
86//! - **Round Mechanism**: Tasks beyond L1 range use round counting
87//! 
88//! ### Performance Optimization
89//!
90//! - Uses `parking_lot::Mutex` instead of standard library Mutex for better performance
91//!   - Uses `FxHashMap` (rustc-hash) instead of standard HashMap to reduce hash collisions
92//!   - Slot count is power of 2, uses bitwise operations to optimize modulo
93//!   - Task execution in separate tokio tasks to avoid blocking timing wheel advancement
94
95mod config;
96mod error;
97mod task;
98mod wheel;
99mod timer;
100mod service;
101
102// 重新导出公共 API (Re-export public API)
103pub use config::{
104    BatchConfig,
105    ServiceConfig, ServiceConfigBuilder,
106    TimerConfig, TimerConfigBuilder,
107    WheelConfig, WheelConfigBuilder,
108};
109pub use error::TimerError;
110pub use task::{CallbackWrapper, CompletionNotifier, TaskCompletionReason, TaskId, TimerCallback, TimerTask};
111pub use timer::{BatchHandle, BatchHandleIter, CompletionReceiver, TimerHandle, TimerWheel};
112pub use service::TimerService;
113
114#[cfg(test)]
115mod tests {
116    use super::*;
117    use std::sync::atomic::{AtomicU32, Ordering};
118    use std::sync::Arc;
119    use std::time::Duration;
120
121    #[tokio::test]
122    async fn test_basic_timer() {
123        let timer = TimerWheel::with_defaults();
124        let counter = Arc::new(AtomicU32::new(0));
125        let counter_clone = Arc::clone(&counter);
126
127        let task = TimerWheel::create_task(
128            Duration::from_millis(50),
129            Some(CallbackWrapper::new(move || {
130                let counter =  Arc::clone(&counter_clone);
131                async move {
132                    counter.fetch_add(1, Ordering::SeqCst);
133                }
134            })),
135        );
136        timer.register(task);
137
138        tokio::time::sleep(Duration::from_millis(100)).await;
139        assert_eq!(counter.load(Ordering::SeqCst), 1);
140    }
141
142    #[tokio::test]
143    async fn test_multiple_timers() {
144        let timer = TimerWheel::with_defaults();
145        let counter = Arc::new(AtomicU32::new(0));
146
147        // 创建 10 个定时器
148        for i in 0..10 {
149            let counter_clone = Arc::clone(&counter);
150            let task = TimerWheel::create_task(
151                Duration::from_millis(10 * (i + 1)),
152                Some(CallbackWrapper::new(move || {
153                    let counter = Arc::clone(&counter_clone);
154                    async move {
155                        counter.fetch_add(1, Ordering::SeqCst);
156                    }
157                })),
158            );
159            timer.register(task);
160        }
161
162        tokio::time::sleep(Duration::from_millis(200)).await;
163        assert_eq!(counter.load(Ordering::SeqCst), 10);
164    }
165
166    #[tokio::test]
167    async fn test_timer_cancellation() {
168        let timer = TimerWheel::with_defaults();
169        let counter = Arc::new(AtomicU32::new(0));
170
171        // 创建 5 个定时器
172        let mut handles = Vec::new();
173        for _ in 0..5 {
174            let counter_clone = Arc::clone(&counter);
175            let task = TimerWheel::create_task(
176                Duration::from_millis(100),
177                Some(CallbackWrapper::new(move || {
178                    let counter = Arc::clone(&counter_clone);
179                    async move {
180                        counter.fetch_add(1, Ordering::SeqCst);
181                    }
182                })),
183            );
184            let handle = timer.register(task);
185            handles.push(handle);
186        }
187
188        // 取消前 3 个定时器
189        for i in 0..3 {
190            let cancel_result = handles[i].cancel();
191            assert!(cancel_result);
192        }
193
194        tokio::time::sleep(Duration::from_millis(200)).await;
195        // 只有 2 个定时器应该被触发
196        assert_eq!(counter.load(Ordering::SeqCst), 2);
197    }
198
199    #[tokio::test]
200    async fn test_completion_notification_once() {
201        let timer = TimerWheel::with_defaults();
202        let counter = Arc::new(AtomicU32::new(0));
203        let counter_clone = Arc::clone(&counter);
204
205        let task = TimerWheel::create_task(
206            Duration::from_millis(50),
207            Some(CallbackWrapper::new(move || {
208                let counter = Arc::clone(&counter_clone);
209                async move {
210                    counter.fetch_add(1, Ordering::SeqCst);
211                }
212            })),
213        );
214        let handle = timer.register(task);
215
216        // 等待完成通知
217        handle.into_completion_receiver().0.await.expect("Should receive completion notification");
218
219        // 验证回调已执行(等待一下以确保回调执行完成)
220        tokio::time::sleep(Duration::from_millis(20)).await;
221        assert_eq!(counter.load(Ordering::SeqCst), 1);
222    }
223
224    #[tokio::test]
225    async fn test_notify_only_timer_once() {
226        let timer = TimerWheel::with_defaults();
227        
228        let task = TimerTask::new(Duration::from_millis(50), None);
229        let handle = timer.register(task);
230
231        // 等待完成通知(无回调,仅通知)
232        handle.into_completion_receiver().0.await.expect("Should receive completion notification");
233    }
234
235    #[tokio::test]
236    async fn test_batch_completion_notifications() {
237        let timer = TimerWheel::with_defaults();
238        let counter = Arc::new(AtomicU32::new(0));
239
240        // 创建批量回调
241        let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
242            .map(|i| {
243                let counter = Arc::clone(&counter);
244                let delay = Duration::from_millis(50 + i * 10);
245                let callback = CallbackWrapper::new(move || {
246                    let counter = Arc::clone(&counter);
247                    async move {
248                        counter.fetch_add(1, Ordering::SeqCst);
249                    }
250                });
251                (delay, Some(callback))
252            })
253            .collect();
254
255        let tasks = TimerWheel::create_batch_with_callbacks(callbacks);
256        let batch = timer.register_batch(tasks);
257        let receivers = batch.into_completion_receivers();
258
259        // 等待所有完成通知
260        for rx in receivers {
261            rx.await.expect("Should receive completion notification");
262        }
263
264        // 等待一下确保回调执行完成
265        tokio::time::sleep(Duration::from_millis(50)).await;
266
267        // 验证所有回调都已执行
268        assert_eq!(counter.load(Ordering::SeqCst), 5);
269    }
270
271    #[tokio::test]
272    async fn test_completion_reason_expired() {
273        let timer = TimerWheel::with_defaults();
274        
275        let task = TimerTask::new(Duration::from_millis(50), None);
276        let handle = timer.register(task);
277
278        // 等待完成通知并验证原因是 Expired
279        let result = handle.into_completion_receiver().0.await.expect("Should receive completion notification");
280        assert_eq!(result, TaskCompletionReason::Expired);
281    }
282
283    #[tokio::test]
284    async fn test_completion_reason_cancelled() {
285        let timer = TimerWheel::with_defaults();
286        
287        let task = TimerTask::new(Duration::from_secs(10), None);
288        let handle = timer.register(task);
289
290        // 取消任务
291        let cancelled = handle.cancel();
292        assert!(cancelled);
293
294        // 等待完成通知并验证原因是 Cancelled
295        let result = handle.into_completion_receiver().0.await.expect("Should receive completion notification");
296        assert_eq!(result, TaskCompletionReason::Cancelled);
297    }
298
299    #[tokio::test]
300    async fn test_batch_completion_reasons() {
301        let timer = TimerWheel::with_defaults();
302        
303        // 创建 5 个任务,延迟 10 秒
304        let tasks: Vec<_> = (0..5)
305            .map(|_| TimerTask::new(Duration::from_secs(10), None))
306            .collect();
307        
308        let batch = timer.register_batch(tasks);
309        let task_ids: Vec<_> = batch.task_ids.clone();
310        let mut receivers = batch.into_completion_receivers();
311
312        // 取消前 3 个任务
313        timer.cancel_batch(&task_ids[0..3]);
314
315        // 验证前 3 个任务收到 Cancelled 通知
316        for rx in receivers.drain(0..3) {
317            let result = rx.await.expect("Should receive completion notification");
318            assert_eq!(result, TaskCompletionReason::Cancelled);
319        }
320
321        // 取消剩余任务并验证
322        timer.cancel_batch(&task_ids[3..5]);
323        for rx in receivers {
324            let result = rx.await.expect("Should receive completion notification");
325            assert_eq!(result, TaskCompletionReason::Cancelled);
326        }
327    }
328}