kestrel_protocol_timer/
lib.rs

1//! # 高性能异步定时器系统
2//!
3//! 基于时间轮(Timing Wheel)算法实现的高性能异步定时器,支持 tokio 运行时。
4//!
5//! ## 特性
6//!
7//! - **高性能**: 使用时间轮算法,插入和删除操作的时间复杂度为 O(1)
8//! - **大规模支持**: 能够高效管理 10000+ 并发定时器
9//! - **异步支持**: 基于 tokio 异步运行时
10//! - **线程安全**: 使用 parking_lot 提供高性能的锁机制
11//!
12//! ## 快速开始
13//!
14//! ```no_run
15//! use kestrel_protocol_timer::{TimerWheel, CallbackWrapper};
16//! use std::time::Duration;
17//! use std::sync::Arc;
18//!
19//! #[tokio::main]
20//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
21//!     // 创建定时器管理器
22//!     let timer = TimerWheel::with_defaults();
23//!     
24//!     // 步骤 1: 创建定时器任务(使用回调)
25//!     let callback = Some(CallbackWrapper::new(|| async {
26//!         println!("Timer fired after 1 second!");
27//!     }));
28//!     let task = TimerWheel::create_task(Duration::from_secs(1), callback);
29//!     let task_id = task.get_id();
30//!     
31//!     // 步骤 2: 注册定时器任务并获取完成通知
32//!     let handle = timer.register(task);
33//!     
34//!     // 等待定时器完成
35//!     handle.into_completion_receiver().0.await?;
36//!     Ok(())
37//! }
38//! ```
39//!
40//! ## 架构说明
41//!
42//! ### 时间轮算法
43//!
44//! 采用分层时间轮(Hierarchical Timing Wheel)算法,包含 L0 和 L1 两层:
45//!
46//! - **L0 层(底层)**: 处理短延迟任务
47//!   - 槽位数量: 默认 512 个(可配置,必须是 2 的幂次方)
48//!   - 时间精度: 默认 10ms(可配置)
49//!   - 最大时间跨度: 5.12 秒
50//!
51//! - **L1 层(高层)**: 处理长延迟任务
52//!   - 槽位数量: 默认 64 个(可配置,必须是 2 的幂次方)
53//!   - 时间精度: 默认 1 秒(可配置)
54//!   - 最大时间跨度: 64 秒
55//!
56//! - **轮次机制**: 超出 L1 层范围的任务使用轮次计数处理
57//!
58//! ### 性能优化
59//!
60//! - 使用 `parking_lot::Mutex` 替代标准库的 Mutex,提供更好的性能
61//! - 使用 `FxHashMap`(rustc-hash)替代标准 HashMap,减少哈希冲突
62//! - 槽位数量为 2 的幂次方,使用位运算优化取模操作
63//! - 任务执行在独立的 tokio 任务中,避免阻塞时间轮推进
64
65mod config;
66mod error;
67mod task;
68mod wheel;
69mod timer;
70mod service;
71
72// 重新导出公共 API
73pub use config::{
74    BatchConfig,
75    HierarchicalWheelConfig,
76    ServiceConfig, ServiceConfigBuilder,
77    TimerConfig, TimerConfigBuilder,
78    WheelConfig, WheelConfigBuilder,
79};
80pub use error::TimerError;
81pub use task::{CallbackWrapper, CompletionNotifier, TaskCompletionReason, TaskId, TimerCallback, TimerTask};
82pub use timer::{BatchHandle, BatchHandleIter, CompletionReceiver, TimerHandle, TimerWheel};
83pub use service::TimerService;
84
85#[cfg(test)]
86mod tests {
87    use super::*;
88    use std::sync::atomic::{AtomicU32, Ordering};
89    use std::sync::Arc;
90    use std::time::Duration;
91
92    #[tokio::test]
93    async fn test_basic_timer() {
94        let timer = TimerWheel::with_defaults();
95        let counter = Arc::new(AtomicU32::new(0));
96        let counter_clone = Arc::clone(&counter);
97
98        let task = TimerWheel::create_task(
99            Duration::from_millis(50),
100            Some(CallbackWrapper::new(move || {
101                let counter =  Arc::clone(&counter_clone);
102                async move {
103                    counter.fetch_add(1, Ordering::SeqCst);
104                }
105            })),
106        );
107        timer.register(task);
108
109        tokio::time::sleep(Duration::from_millis(100)).await;
110        assert_eq!(counter.load(Ordering::SeqCst), 1);
111    }
112
113    #[tokio::test]
114    async fn test_multiple_timers() {
115        let timer = TimerWheel::with_defaults();
116        let counter = Arc::new(AtomicU32::new(0));
117
118        // 创建 10 个定时器
119        for i in 0..10 {
120            let counter_clone = Arc::clone(&counter);
121            let task = TimerWheel::create_task(
122                Duration::from_millis(10 * (i + 1)),
123                Some(CallbackWrapper::new(move || {
124                    let counter = Arc::clone(&counter_clone);
125                    async move {
126                        counter.fetch_add(1, Ordering::SeqCst);
127                    }
128                })),
129            );
130            timer.register(task);
131        }
132
133        tokio::time::sleep(Duration::from_millis(200)).await;
134        assert_eq!(counter.load(Ordering::SeqCst), 10);
135    }
136
137    #[tokio::test]
138    async fn test_timer_cancellation() {
139        let timer = TimerWheel::with_defaults();
140        let counter = Arc::new(AtomicU32::new(0));
141
142        // 创建 5 个定时器
143        let mut handles = Vec::new();
144        for _ in 0..5 {
145            let counter_clone = Arc::clone(&counter);
146            let task = TimerWheel::create_task(
147                Duration::from_millis(100),
148                Some(CallbackWrapper::new(move || {
149                    let counter = Arc::clone(&counter_clone);
150                    async move {
151                        counter.fetch_add(1, Ordering::SeqCst);
152                    }
153                })),
154            );
155            let handle = timer.register(task);
156            handles.push(handle);
157        }
158
159        // 取消前 3 个定时器
160        for i in 0..3 {
161            let cancel_result = handles[i].cancel();
162            assert!(cancel_result);
163        }
164
165        tokio::time::sleep(Duration::from_millis(200)).await;
166        // 只有 2 个定时器应该被触发
167        assert_eq!(counter.load(Ordering::SeqCst), 2);
168    }
169
170    #[tokio::test]
171    async fn test_completion_notification_once() {
172        let timer = TimerWheel::with_defaults();
173        let counter = Arc::new(AtomicU32::new(0));
174        let counter_clone = Arc::clone(&counter);
175
176        let task = TimerWheel::create_task(
177            Duration::from_millis(50),
178            Some(CallbackWrapper::new(move || {
179                let counter = Arc::clone(&counter_clone);
180                async move {
181                    counter.fetch_add(1, Ordering::SeqCst);
182                }
183            })),
184        );
185        let handle = timer.register(task);
186
187        // 等待完成通知
188        handle.into_completion_receiver().0.await.expect("Should receive completion notification");
189
190        // 验证回调已执行(等待一下以确保回调执行完成)
191        tokio::time::sleep(Duration::from_millis(20)).await;
192        assert_eq!(counter.load(Ordering::SeqCst), 1);
193    }
194
195    #[tokio::test]
196    async fn test_notify_only_timer_once() {
197        let timer = TimerWheel::with_defaults();
198        
199        let task = TimerTask::new(Duration::from_millis(50), None);
200        let handle = timer.register(task);
201
202        // 等待完成通知(无回调,仅通知)
203        handle.into_completion_receiver().0.await.expect("Should receive completion notification");
204    }
205
206    #[tokio::test]
207    async fn test_batch_completion_notifications() {
208        let timer = TimerWheel::with_defaults();
209        let counter = Arc::new(AtomicU32::new(0));
210
211        // 创建批量回调
212        let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
213            .map(|i| {
214                let counter = Arc::clone(&counter);
215                let delay = Duration::from_millis(50 + i * 10);
216                let callback = CallbackWrapper::new(move || {
217                    let counter = Arc::clone(&counter);
218                    async move {
219                        counter.fetch_add(1, Ordering::SeqCst);
220                    }
221                });
222                (delay, Some(callback))
223            })
224            .collect();
225
226        let tasks = TimerWheel::create_batch_with_callbacks(callbacks);
227        let batch = timer.register_batch(tasks);
228        let receivers = batch.into_completion_receivers();
229
230        // 等待所有完成通知
231        for rx in receivers {
232            rx.await.expect("Should receive completion notification");
233        }
234
235        // 等待一下确保回调执行完成
236        tokio::time::sleep(Duration::from_millis(50)).await;
237
238        // 验证所有回调都已执行
239        assert_eq!(counter.load(Ordering::SeqCst), 5);
240    }
241
242    #[tokio::test]
243    async fn test_completion_reason_expired() {
244        let timer = TimerWheel::with_defaults();
245        
246        let task = TimerTask::new(Duration::from_millis(50), None);
247        let handle = timer.register(task);
248
249        // 等待完成通知并验证原因是 Expired
250        let result = handle.into_completion_receiver().0.await.expect("Should receive completion notification");
251        assert_eq!(result, TaskCompletionReason::Expired);
252    }
253
254    #[tokio::test]
255    async fn test_completion_reason_cancelled() {
256        let timer = TimerWheel::with_defaults();
257        
258        let task = TimerTask::new(Duration::from_secs(10), None);
259        let handle = timer.register(task);
260
261        // 取消任务
262        let cancelled = handle.cancel();
263        assert!(cancelled);
264
265        // 等待完成通知并验证原因是 Cancelled
266        let result = handle.into_completion_receiver().0.await.expect("Should receive completion notification");
267        assert_eq!(result, TaskCompletionReason::Cancelled);
268    }
269
270    #[tokio::test]
271    async fn test_batch_completion_reasons() {
272        let timer = TimerWheel::with_defaults();
273        
274        // 创建 5 个任务,延迟 10 秒
275        let tasks: Vec<_> = (0..5)
276            .map(|_| TimerTask::new(Duration::from_secs(10), None))
277            .collect();
278        
279        let batch = timer.register_batch(tasks);
280        let task_ids: Vec<_> = batch.task_ids.clone();
281        let mut receivers = batch.into_completion_receivers();
282
283        // 取消前 3 个任务
284        timer.cancel_batch(&task_ids[0..3]);
285
286        // 验证前 3 个任务收到 Cancelled 通知
287        for rx in receivers.drain(0..3) {
288            let result = rx.await.expect("Should receive completion notification");
289            assert_eq!(result, TaskCompletionReason::Cancelled);
290        }
291
292        // 取消剩余任务并验证
293        timer.cancel_batch(&task_ids[3..5]);
294        for rx in receivers {
295            let result = rx.await.expect("Should receive completion notification");
296            assert_eq!(result, TaskCompletionReason::Cancelled);
297        }
298    }
299}