kestrel_protocol_timer/
lib.rs

1//! # 高性能异步定时器系统
2//!
3//! 基于时间轮(Timing Wheel)算法实现的高性能异步定时器,支持 tokio 运行时。
4//!
5//! ## 特性
6//!
7//! - **高性能**: 使用时间轮算法,插入和删除操作的时间复杂度为 O(1)
8//! - **大规模支持**: 能够高效管理 10000+ 并发定时器
9//! - **异步支持**: 基于 tokio 异步运行时
10//! - **线程安全**: 使用 parking_lot 提供高性能的锁机制
11//!
12//! ## 快速开始
13//!
14//! ```no_run
15//! use timer::TimerWheel;
16//! use std::time::Duration;
17//!
18//! #[tokio::main]
19//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
20//!     // 创建定时器管理器
21//!     let timer = TimerWheel::with_defaults()?;
22//!     
23//!     // 调度一次性定时器
24//!     let handle = timer.schedule_once(Duration::from_secs(1), || async {
25//!         println!("Timer fired after 1 second!");
26//!     }).await?;
27//!     
28//!     // 等待定时器触发
29//!     tokio::time::sleep(Duration::from_secs(2)).await;
30//!     Ok(())
31//! }
32//! ```
33//!
34//! ## 架构说明
35//!
36//! ### 时间轮算法
37//!
38//! 时间轮是一个环形数组,每个槽位存储一组定时器任务。时间轮以固定的频率(tick)推进,
39//! 当指针移动到某个槽位时,该槽位中的所有任务会被检查是否到期。
40//!
41//! - **槽位数量**: 默认 512 个(可配置,必须是 2 的幂次方)
42//! - **时间精度**: 默认 10ms(可配置)
43//! - **最大时间跨度**: 槽位数量 × 时间精度(默认 5.12 秒)
44//! - **轮次机制**: 超出时间轮范围的任务使用轮次计数处理
45//!
46//! ### 性能优化
47//!
48//! - 使用 `parking_lot::Mutex` 替代标准库的 Mutex,提供更好的性能
49//! - 使用 `FxHashMap`(rustc-hash)替代标准 HashMap,减少哈希冲突
50//! - 槽位数量为 2 的幂次方,使用位运算优化取模操作
51//! - 任务执行在独立的 tokio 任务中,避免阻塞时间轮推进
52
53mod error;
54mod task;
55mod wheel;
56mod timer;
57mod service;
58
59// 重新导出公共 API
60pub use error::TimerError;
61pub use task::{CallbackWrapper, CompletionNotifier, TaskId, TimerCallback};
62pub use timer::{BatchHandle, BatchHandleIter, CompletionReceiver, TimerHandle, TimerWheel};
63pub use service::TimerService;
64
65#[cfg(test)]
66mod tests {
67    use super::*;
68    use std::sync::atomic::{AtomicU32, Ordering};
69    use std::sync::Arc;
70    use std::time::Duration;
71
72    #[tokio::test]
73    async fn test_basic_timer() {
74        let timer = TimerWheel::with_defaults().unwrap();
75        let counter = Arc::new(AtomicU32::new(0));
76        let counter_clone = Arc::clone(&counter);
77
78        timer.schedule_once(
79            Duration::from_millis(50),
80            move || {
81                let counter =  Arc::clone(&counter_clone);
82                async move {
83                    counter.fetch_add(1, Ordering::SeqCst);
84                }
85            },
86        ).await.unwrap();
87
88        tokio::time::sleep(Duration::from_millis(100)).await;
89        assert_eq!(counter.load(Ordering::SeqCst), 1);
90    }
91
92    #[tokio::test]
93    async fn test_multiple_timers() {
94        let timer = TimerWheel::with_defaults().unwrap();
95        let counter = Arc::new(AtomicU32::new(0));
96
97        // 创建 10 个定时器
98        for i in 0..10 {
99            let counter_clone = Arc::clone(&counter);
100            timer.schedule_once(
101                Duration::from_millis(10 * (i + 1)),
102                move || {
103                    let counter = Arc::clone(&counter_clone);
104                    async move {
105                        counter.fetch_add(1, Ordering::SeqCst);
106                    }
107                },
108            ).await.unwrap();
109        }
110
111        tokio::time::sleep(Duration::from_millis(200)).await;
112        assert_eq!(counter.load(Ordering::SeqCst), 10);
113    }
114
115    #[tokio::test]
116    async fn test_timer_cancellation() {
117        let timer = TimerWheel::with_defaults().unwrap();
118        let counter = Arc::new(AtomicU32::new(0));
119
120        // 创建 5 个定时器
121        let mut handles = Vec::new();
122        for _ in 0..5 {
123            let counter_clone = Arc::clone(&counter);
124            let handle = timer.schedule_once(
125                Duration::from_millis(100),
126                move || {
127                    let counter = Arc::clone(&counter_clone);
128                    async move {
129                        counter.fetch_add(1, Ordering::SeqCst);
130                    }
131                },
132            ).await.unwrap();
133            handles.push(handle);
134        }
135
136        // 取消前 3 个定时器
137        for i in 0..3 {
138            let cancel_result = handles[i].cancel();
139            assert!(cancel_result);
140        }
141
142        tokio::time::sleep(Duration::from_millis(200)).await;
143        // 只有 2 个定时器应该被触发
144        assert_eq!(counter.load(Ordering::SeqCst), 2);
145    }
146
147    #[tokio::test]
148    async fn test_completion_notification_once() {
149        let timer = TimerWheel::with_defaults().unwrap();
150        let counter = Arc::new(AtomicU32::new(0));
151        let counter_clone = Arc::clone(&counter);
152
153        let handle = timer.schedule_once(
154            Duration::from_millis(50),
155            move || {
156                let counter = Arc::clone(&counter_clone);
157                async move {
158                    counter.fetch_add(1, Ordering::SeqCst);
159                }
160            },
161        ).await.unwrap();
162
163        // 等待完成通知
164        handle.into_completion_receiver().0.await.expect("Should receive completion notification");
165
166        // 验证回调已执行(等待一下以确保回调执行完成)
167        tokio::time::sleep(Duration::from_millis(20)).await;
168        assert_eq!(counter.load(Ordering::SeqCst), 1);
169    }
170
171    #[tokio::test]
172    async fn test_notify_only_timer_once() {
173        let timer = TimerWheel::with_defaults().unwrap();
174        
175        let handle = timer.schedule_once_notify(Duration::from_millis(50)).await.unwrap();
176
177        // 等待完成通知(无回调,仅通知)
178        handle.into_completion_receiver().0.await.expect("Should receive completion notification");
179    }
180
181    #[tokio::test]
182    async fn test_batch_completion_notifications() {
183        let timer = TimerWheel::with_defaults().unwrap();
184        let counter = Arc::new(AtomicU32::new(0));
185
186        // 创建批量回调
187        let callbacks: Vec<(Duration, _)> = (0..5)
188            .map(|i| {
189                let counter = Arc::clone(&counter);
190                let delay = Duration::from_millis(50 + i * 10);
191                let callback = move || {
192                    let counter = Arc::clone(&counter);
193                    async move {
194                        counter.fetch_add(1, Ordering::SeqCst);
195                    }
196                };
197                (delay, callback)
198            })
199            .collect();
200
201        let batch = timer.schedule_once_batch(callbacks).await.unwrap();
202        let receivers = batch.into_completion_receivers();
203
204        // 等待所有完成通知
205        for rx in receivers {
206            rx.await.expect("Should receive completion notification");
207        }
208
209        // 等待一下确保回调执行完成
210        tokio::time::sleep(Duration::from_millis(50)).await;
211
212        // 验证所有回调都已执行
213        assert_eq!(counter.load(Ordering::SeqCst), 5);
214    }
215}