kestrel_protocol_timer/
lib.rs

1//! # 高性能异步定时器系统
2//!
3//! 基于时间轮(Timing Wheel)算法实现的高性能异步定时器,支持 tokio 运行时。
4//!
5//! ## 特性
6//!
7//! - **高性能**: 使用时间轮算法,插入和删除操作的时间复杂度为 O(1)
8//! - **大规模支持**: 能够高效管理 10000+ 并发定时器
9//! - **异步支持**: 基于 tokio 异步运行时
10//! - **线程安全**: 使用 parking_lot 提供高性能的锁机制
11//!
12//! ## 快速开始
13//!
14//! ```no_run
15//! use kestrel_protocol_timer::TimerWheel;
16//! use std::time::Duration;
17//!
18//! #[tokio::main]
19//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
20//!     // 创建定时器管理器
21//!     let timer = TimerWheel::with_defaults();
22//!     
23//!     // 调度一次性定时器
24//!     let handle = timer.schedule_once(Duration::from_secs(1), || async {
25//!         println!("Timer fired after 1 second!");
26//!     }).await;
27//!     
28//!     // 等待定时器触发
29//!     tokio::time::sleep(Duration::from_secs(2)).await;
30//!     Ok(())
31//! }
32//! ```
33//!
34//! ## 架构说明
35//!
36//! ### 时间轮算法
37//!
38//! 时间轮是一个环形数组,每个槽位存储一组定时器任务。时间轮以固定的频率(tick)推进,
39//! 当指针移动到某个槽位时,该槽位中的所有任务会被检查是否到期。
40//!
41//! - **槽位数量**: 默认 512 个(可配置,必须是 2 的幂次方)
42//! - **时间精度**: 默认 10ms(可配置)
43//! - **最大时间跨度**: 槽位数量 × 时间精度(默认 5.12 秒)
44//! - **轮次机制**: 超出时间轮范围的任务使用轮次计数处理
45//!
46//! ### 性能优化
47//!
48//! - 使用 `parking_lot::Mutex` 替代标准库的 Mutex,提供更好的性能
49//! - 使用 `FxHashMap`(rustc-hash)替代标准 HashMap,减少哈希冲突
50//! - 槽位数量为 2 的幂次方,使用位运算优化取模操作
51//! - 任务执行在独立的 tokio 任务中,避免阻塞时间轮推进
52
53mod config;
54mod error;
55mod task;
56mod wheel;
57mod timer;
58mod service;
59
60// 重新导出公共 API
61pub use config::{
62    BatchConfig,
63    ServiceConfig, ServiceConfigBuilder,
64    TimerConfig, TimerConfigBuilder,
65    WheelConfig, WheelConfigBuilder,
66};
67pub use error::TimerError;
68pub use task::{CallbackWrapper, CompletionNotifier, TaskId, TimerCallback};
69pub use timer::{BatchHandle, BatchHandleIter, CompletionReceiver, TimerHandle, TimerWheel};
70pub use service::TimerService;
71
72#[cfg(test)]
73mod tests {
74    use super::*;
75    use std::sync::atomic::{AtomicU32, Ordering};
76    use std::sync::Arc;
77    use std::time::Duration;
78
79    #[tokio::test]
80    async fn test_basic_timer() {
81        let timer = TimerWheel::with_defaults();
82        let counter = Arc::new(AtomicU32::new(0));
83        let counter_clone = Arc::clone(&counter);
84
85        timer.schedule_once(
86            Duration::from_millis(50),
87            move || {
88                let counter =  Arc::clone(&counter_clone);
89                async move {
90                    counter.fetch_add(1, Ordering::SeqCst);
91                }
92            },
93        ).await;
94
95        tokio::time::sleep(Duration::from_millis(100)).await;
96        assert_eq!(counter.load(Ordering::SeqCst), 1);
97    }
98
99    #[tokio::test]
100    async fn test_multiple_timers() {
101        let timer = TimerWheel::with_defaults();
102        let counter = Arc::new(AtomicU32::new(0));
103
104        // 创建 10 个定时器
105        for i in 0..10 {
106            let counter_clone = Arc::clone(&counter);
107            timer.schedule_once(
108                Duration::from_millis(10 * (i + 1)),
109                move || {
110                    let counter = Arc::clone(&counter_clone);
111                    async move {
112                        counter.fetch_add(1, Ordering::SeqCst);
113                    }
114                },
115            ).await;
116        }
117
118        tokio::time::sleep(Duration::from_millis(200)).await;
119        assert_eq!(counter.load(Ordering::SeqCst), 10);
120    }
121
122    #[tokio::test]
123    async fn test_timer_cancellation() {
124        let timer = TimerWheel::with_defaults();
125        let counter = Arc::new(AtomicU32::new(0));
126
127        // 创建 5 个定时器
128        let mut handles = Vec::new();
129        for _ in 0..5 {
130            let counter_clone = Arc::clone(&counter);
131            let handle = timer.schedule_once(
132                Duration::from_millis(100),
133                move || {
134                    let counter = Arc::clone(&counter_clone);
135                    async move {
136                        counter.fetch_add(1, Ordering::SeqCst);
137                    }
138                },
139            ).await;
140            handles.push(handle);
141        }
142
143        // 取消前 3 个定时器
144        for i in 0..3 {
145            let cancel_result = handles[i].cancel();
146            assert!(cancel_result);
147        }
148
149        tokio::time::sleep(Duration::from_millis(200)).await;
150        // 只有 2 个定时器应该被触发
151        assert_eq!(counter.load(Ordering::SeqCst), 2);
152    }
153
154    #[tokio::test]
155    async fn test_completion_notification_once() {
156        let timer = TimerWheel::with_defaults();
157        let counter = Arc::new(AtomicU32::new(0));
158        let counter_clone = Arc::clone(&counter);
159
160        let handle = timer.schedule_once(
161            Duration::from_millis(50),
162            move || {
163                let counter = Arc::clone(&counter_clone);
164                async move {
165                    counter.fetch_add(1, Ordering::SeqCst);
166                }
167            },
168        ).await;
169
170        // 等待完成通知
171        handle.into_completion_receiver().0.await.expect("Should receive completion notification");
172
173        // 验证回调已执行(等待一下以确保回调执行完成)
174        tokio::time::sleep(Duration::from_millis(20)).await;
175        assert_eq!(counter.load(Ordering::SeqCst), 1);
176    }
177
178    #[tokio::test]
179    async fn test_notify_only_timer_once() {
180        let timer = TimerWheel::with_defaults();
181        
182        let handle = timer.schedule_once_notify(Duration::from_millis(50)).await;
183
184        // 等待完成通知(无回调,仅通知)
185        handle.into_completion_receiver().0.await.expect("Should receive completion notification");
186    }
187
188    #[tokio::test]
189    async fn test_batch_completion_notifications() {
190        let timer = TimerWheel::with_defaults();
191        let counter = Arc::new(AtomicU32::new(0));
192
193        // 创建批量回调
194        let callbacks: Vec<(Duration, _)> = (0..5)
195            .map(|i| {
196                let counter = Arc::clone(&counter);
197                let delay = Duration::from_millis(50 + i * 10);
198                let callback = move || {
199                    let counter = Arc::clone(&counter);
200                    async move {
201                        counter.fetch_add(1, Ordering::SeqCst);
202                    }
203                };
204                (delay, callback)
205            })
206            .collect();
207
208        let batch = timer.schedule_once_batch(callbacks).await;
209        let receivers = batch.into_completion_receivers();
210
211        // 等待所有完成通知
212        for rx in receivers {
213            rx.await.expect("Should receive completion notification");
214        }
215
216        // 等待一下确保回调执行完成
217        tokio::time::sleep(Duration::from_millis(50)).await;
218
219        // 验证所有回调都已执行
220        assert_eq!(counter.load(Ordering::SeqCst), 5);
221    }
222}