kestrel_protocol_timer/
lib.rs

1//! # 高性能异步定时器系统
2//!
3//! 基于时间轮(Timing Wheel)算法实现的高性能异步定时器,支持 tokio 运行时。
4//!
5//! ## 特性
6//!
7//! - **高性能**: 使用时间轮算法,插入和删除操作的时间复杂度为 O(1)
8//! - **大规模支持**: 能够高效管理 10000+ 并发定时器
9//! - **异步支持**: 基于 tokio 异步运行时
10//! - **线程安全**: 使用 parking_lot 提供高性能的锁机制
11//!
12//! ## 快速开始
13//!
14//! ```no_run
15//! use kestrel_protocol_timer::{TimerWheel, TimerTask};
16//! use std::time::Duration;
17//!
18//! #[tokio::main]
19//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
20//!     // 创建定时器管理器
21//!     let timer = TimerWheel::with_defaults();
22//!     
23//!     // 步骤 1: 创建定时器任务
24//!     let task = TimerWheel::create_task(Duration::from_secs(1), || async {
25//!         println!("Timer fired after 1 second!");
26//!     });
27//!     let task_id = task.get_id();
28//!     
29//!     // 步骤 2: 注册定时器任务
30//!     let handle = timer.register(task);
31//!     
32//!     // 等待定时器触发
33//!     tokio::time::sleep(Duration::from_secs(2)).await;
34//!     Ok(())
35//! }
36//! ```
37//!
38//! ## 架构说明
39//!
40//! ### 时间轮算法
41//!
42//! 时间轮是一个环形数组,每个槽位存储一组定时器任务。时间轮以固定的频率(tick)推进,
43//! 当指针移动到某个槽位时,该槽位中的所有任务会被检查是否到期。
44//!
45//! - **槽位数量**: 默认 512 个(可配置,必须是 2 的幂次方)
46//! - **时间精度**: 默认 10ms(可配置)
47//! - **最大时间跨度**: 槽位数量 × 时间精度(默认 5.12 秒)
48//! - **轮次机制**: 超出时间轮范围的任务使用轮次计数处理
49//!
50//! ### 性能优化
51//!
52//! - 使用 `parking_lot::Mutex` 替代标准库的 Mutex,提供更好的性能
53//! - 使用 `FxHashMap`(rustc-hash)替代标准 HashMap,减少哈希冲突
54//! - 槽位数量为 2 的幂次方,使用位运算优化取模操作
55//! - 任务执行在独立的 tokio 任务中,避免阻塞时间轮推进
56
57mod config;
58mod error;
59mod task;
60mod wheel;
61mod timer;
62mod service;
63
64// 重新导出公共 API
65pub use config::{
66    BatchConfig,
67    ServiceConfig, ServiceConfigBuilder,
68    TimerConfig, TimerConfigBuilder,
69    WheelConfig, WheelConfigBuilder,
70};
71pub use error::TimerError;
72pub use task::{CallbackWrapper, CompletionNotifier, TaskId, TimerCallback, TimerTask};
73pub use timer::{BatchHandle, BatchHandleIter, CompletionReceiver, TimerHandle, TimerWheel};
74pub use service::TimerService;
75
76#[cfg(test)]
77mod tests {
78    use super::*;
79    use std::sync::atomic::{AtomicU32, Ordering};
80    use std::sync::Arc;
81    use std::time::Duration;
82
83    #[tokio::test]
84    async fn test_basic_timer() {
85        let timer = TimerWheel::with_defaults();
86        let counter = Arc::new(AtomicU32::new(0));
87        let counter_clone = Arc::clone(&counter);
88
89        let task = TimerWheel::create_task(
90            Duration::from_millis(50),
91            move || {
92                let counter =  Arc::clone(&counter_clone);
93                async move {
94                    counter.fetch_add(1, Ordering::SeqCst);
95                }
96            },
97        );
98        timer.register(task);
99
100        tokio::time::sleep(Duration::from_millis(100)).await;
101        assert_eq!(counter.load(Ordering::SeqCst), 1);
102    }
103
104    #[tokio::test]
105    async fn test_multiple_timers() {
106        let timer = TimerWheel::with_defaults();
107        let counter = Arc::new(AtomicU32::new(0));
108
109        // 创建 10 个定时器
110        for i in 0..10 {
111            let counter_clone = Arc::clone(&counter);
112            let task = TimerWheel::create_task(
113                Duration::from_millis(10 * (i + 1)),
114                move || {
115                    let counter = Arc::clone(&counter_clone);
116                    async move {
117                        counter.fetch_add(1, Ordering::SeqCst);
118                    }
119                },
120            );
121            timer.register(task);
122        }
123
124        tokio::time::sleep(Duration::from_millis(200)).await;
125        assert_eq!(counter.load(Ordering::SeqCst), 10);
126    }
127
128    #[tokio::test]
129    async fn test_timer_cancellation() {
130        let timer = TimerWheel::with_defaults();
131        let counter = Arc::new(AtomicU32::new(0));
132
133        // 创建 5 个定时器
134        let mut handles = Vec::new();
135        for _ in 0..5 {
136            let counter_clone = Arc::clone(&counter);
137            let task = TimerWheel::create_task(
138                Duration::from_millis(100),
139                move || {
140                    let counter = Arc::clone(&counter_clone);
141                    async move {
142                        counter.fetch_add(1, Ordering::SeqCst);
143                    }
144                },
145            );
146            let handle = timer.register(task);
147            handles.push(handle);
148        }
149
150        // 取消前 3 个定时器
151        for i in 0..3 {
152            let cancel_result = handles[i].cancel();
153            assert!(cancel_result);
154        }
155
156        tokio::time::sleep(Duration::from_millis(200)).await;
157        // 只有 2 个定时器应该被触发
158        assert_eq!(counter.load(Ordering::SeqCst), 2);
159    }
160
161    #[tokio::test]
162    async fn test_completion_notification_once() {
163        let timer = TimerWheel::with_defaults();
164        let counter = Arc::new(AtomicU32::new(0));
165        let counter_clone = Arc::clone(&counter);
166
167        let task = TimerWheel::create_task(
168            Duration::from_millis(50),
169            move || {
170                let counter = Arc::clone(&counter_clone);
171                async move {
172                    counter.fetch_add(1, Ordering::SeqCst);
173                }
174            },
175        );
176        let handle = timer.register(task);
177
178        // 等待完成通知
179        handle.into_completion_receiver().0.await.expect("Should receive completion notification");
180
181        // 验证回调已执行(等待一下以确保回调执行完成)
182        tokio::time::sleep(Duration::from_millis(20)).await;
183        assert_eq!(counter.load(Ordering::SeqCst), 1);
184    }
185
186    #[tokio::test]
187    async fn test_notify_only_timer_once() {
188        let timer = TimerWheel::with_defaults();
189        
190        let task = TimerTask::new(Duration::from_millis(50), None);
191        let handle = timer.register(task);
192
193        // 等待完成通知(无回调,仅通知)
194        handle.into_completion_receiver().0.await.expect("Should receive completion notification");
195    }
196
197    #[tokio::test]
198    async fn test_batch_completion_notifications() {
199        let timer = TimerWheel::with_defaults();
200        let counter = Arc::new(AtomicU32::new(0));
201
202        // 创建批量回调
203        let callbacks: Vec<(Duration, _)> = (0..5)
204            .map(|i| {
205                let counter = Arc::clone(&counter);
206                let delay = Duration::from_millis(50 + i * 10);
207                let callback = move || {
208                    let counter = Arc::clone(&counter);
209                    async move {
210                        counter.fetch_add(1, Ordering::SeqCst);
211                    }
212                };
213                (delay, callback)
214            })
215            .collect();
216
217        let tasks = TimerWheel::create_batch(callbacks);
218        let batch = timer.register_batch(tasks);
219        let receivers = batch.into_completion_receivers();
220
221        // 等待所有完成通知
222        for rx in receivers {
223            rx.await.expect("Should receive completion notification");
224        }
225
226        // 等待一下确保回调执行完成
227        tokio::time::sleep(Duration::from_millis(50)).await;
228
229        // 验证所有回调都已执行
230        assert_eq!(counter.load(Ordering::SeqCst), 5);
231    }
232}