kestrel_protocol_timer/
lib.rs

1//! # 高性能异步定时器系统
2//!
3//! 基于时间轮(Timing Wheel)算法实现的高性能异步定时器,支持 tokio 运行时。
4//!
5//! ## 特性
6//!
7//! - **高性能**: 使用时间轮算法,插入和删除操作的时间复杂度为 O(1)
8//! - **大规模支持**: 能够高效管理 10000+ 并发定时器
9//! - **异步支持**: 基于 tokio 异步运行时
10//! - **线程安全**: 使用 parking_lot 提供高性能的锁机制
11//!
12//! ## 快速开始
13//!
14//! ```no_run
15//! use kestrel_protocol_timer::{TimerWheel, CallbackWrapper};
16//! use std::time::Duration;
17//! use std::sync::Arc;
18//!
19//! #[tokio::main]
20//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
21//!     // 创建定时器管理器
22//!     let timer = TimerWheel::with_defaults();
23//!     
24//!     // 步骤 1: 创建定时器任务(使用回调)
25//!     let callback = Some(CallbackWrapper::new(|| async {
26//!         println!("Timer fired after 1 second!");
27//!     }));
28//!     let task = TimerWheel::create_task(Duration::from_secs(1), callback);
29//!     let task_id = task.get_id();
30//!     
31//!     // 步骤 2: 注册定时器任务并获取完成通知
32//!     let handle = timer.register(task);
33//!     
34//!     // 等待定时器完成
35//!     handle.into_completion_receiver().0.await?;
36//!     Ok(())
37//! }
38//! ```
39//!
40//! ## 架构说明
41//!
42//! ### 时间轮算法
43//!
44//! 时间轮是一个环形数组,每个槽位存储一组定时器任务。时间轮以固定的频率(tick)推进,
45//! 当指针移动到某个槽位时,该槽位中的所有任务会被检查是否到期。
46//!
47//! - **槽位数量**: 默认 512 个(可配置,必须是 2 的幂次方)
48//! - **时间精度**: 默认 10ms(可配置)
49//! - **最大时间跨度**: 槽位数量 × 时间精度(默认 5.12 秒)
50//! - **轮次机制**: 超出时间轮范围的任务使用轮次计数处理
51//!
52//! ### 性能优化
53//!
54//! - 使用 `parking_lot::Mutex` 替代标准库的 Mutex,提供更好的性能
55//! - 使用 `FxHashMap`(rustc-hash)替代标准 HashMap,减少哈希冲突
56//! - 槽位数量为 2 的幂次方,使用位运算优化取模操作
57//! - 任务执行在独立的 tokio 任务中,避免阻塞时间轮推进
58
59mod config;
60mod error;
61mod task;
62mod wheel;
63mod timer;
64mod service;
65
66// 重新导出公共 API
67pub use config::{
68    BatchConfig,
69    ServiceConfig, ServiceConfigBuilder,
70    TimerConfig, TimerConfigBuilder,
71    WheelConfig, WheelConfigBuilder,
72};
73pub use error::TimerError;
74pub use task::{CallbackWrapper, CompletionNotifier, TaskCompletionReason, TaskId, TimerCallback, TimerTask};
75pub use timer::{BatchHandle, BatchHandleIter, CompletionReceiver, TimerHandle, TimerWheel};
76pub use service::TimerService;
77
78#[cfg(test)]
79mod tests {
80    use super::*;
81    use std::sync::atomic::{AtomicU32, Ordering};
82    use std::sync::Arc;
83    use std::time::Duration;
84
85    #[tokio::test]
86    async fn test_basic_timer() {
87        let timer = TimerWheel::with_defaults();
88        let counter = Arc::new(AtomicU32::new(0));
89        let counter_clone = Arc::clone(&counter);
90
91        let task = TimerWheel::create_task(
92            Duration::from_millis(50),
93            Some(CallbackWrapper::new(move || {
94                let counter =  Arc::clone(&counter_clone);
95                async move {
96                    counter.fetch_add(1, Ordering::SeqCst);
97                }
98            })),
99        );
100        timer.register(task);
101
102        tokio::time::sleep(Duration::from_millis(100)).await;
103        assert_eq!(counter.load(Ordering::SeqCst), 1);
104    }
105
106    #[tokio::test]
107    async fn test_multiple_timers() {
108        let timer = TimerWheel::with_defaults();
109        let counter = Arc::new(AtomicU32::new(0));
110
111        // 创建 10 个定时器
112        for i in 0..10 {
113            let counter_clone = Arc::clone(&counter);
114            let task = TimerWheel::create_task(
115                Duration::from_millis(10 * (i + 1)),
116                Some(CallbackWrapper::new(move || {
117                    let counter = Arc::clone(&counter_clone);
118                    async move {
119                        counter.fetch_add(1, Ordering::SeqCst);
120                    }
121                })),
122            );
123            timer.register(task);
124        }
125
126        tokio::time::sleep(Duration::from_millis(200)).await;
127        assert_eq!(counter.load(Ordering::SeqCst), 10);
128    }
129
130    #[tokio::test]
131    async fn test_timer_cancellation() {
132        let timer = TimerWheel::with_defaults();
133        let counter = Arc::new(AtomicU32::new(0));
134
135        // 创建 5 个定时器
136        let mut handles = Vec::new();
137        for _ in 0..5 {
138            let counter_clone = Arc::clone(&counter);
139            let task = TimerWheel::create_task(
140                Duration::from_millis(100),
141                Some(CallbackWrapper::new(move || {
142                    let counter = Arc::clone(&counter_clone);
143                    async move {
144                        counter.fetch_add(1, Ordering::SeqCst);
145                    }
146                })),
147            );
148            let handle = timer.register(task);
149            handles.push(handle);
150        }
151
152        // 取消前 3 个定时器
153        for i in 0..3 {
154            let cancel_result = handles[i].cancel();
155            assert!(cancel_result);
156        }
157
158        tokio::time::sleep(Duration::from_millis(200)).await;
159        // 只有 2 个定时器应该被触发
160        assert_eq!(counter.load(Ordering::SeqCst), 2);
161    }
162
163    #[tokio::test]
164    async fn test_completion_notification_once() {
165        let timer = TimerWheel::with_defaults();
166        let counter = Arc::new(AtomicU32::new(0));
167        let counter_clone = Arc::clone(&counter);
168
169        let task = TimerWheel::create_task(
170            Duration::from_millis(50),
171            Some(CallbackWrapper::new(move || {
172                let counter = Arc::clone(&counter_clone);
173                async move {
174                    counter.fetch_add(1, Ordering::SeqCst);
175                }
176            })),
177        );
178        let handle = timer.register(task);
179
180        // 等待完成通知
181        handle.into_completion_receiver().0.await.expect("Should receive completion notification");
182
183        // 验证回调已执行(等待一下以确保回调执行完成)
184        tokio::time::sleep(Duration::from_millis(20)).await;
185        assert_eq!(counter.load(Ordering::SeqCst), 1);
186    }
187
188    #[tokio::test]
189    async fn test_notify_only_timer_once() {
190        let timer = TimerWheel::with_defaults();
191        
192        let task = TimerTask::new(Duration::from_millis(50), None);
193        let handle = timer.register(task);
194
195        // 等待完成通知(无回调,仅通知)
196        handle.into_completion_receiver().0.await.expect("Should receive completion notification");
197    }
198
199    #[tokio::test]
200    async fn test_batch_completion_notifications() {
201        let timer = TimerWheel::with_defaults();
202        let counter = Arc::new(AtomicU32::new(0));
203
204        // 创建批量回调
205        let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
206            .map(|i| {
207                let counter = Arc::clone(&counter);
208                let delay = Duration::from_millis(50 + i * 10);
209                let callback = CallbackWrapper::new(move || {
210                    let counter = Arc::clone(&counter);
211                    async move {
212                        counter.fetch_add(1, Ordering::SeqCst);
213                    }
214                });
215                (delay, Some(callback))
216            })
217            .collect();
218
219        let tasks = TimerWheel::create_batch_with_callbacks(callbacks);
220        let batch = timer.register_batch(tasks);
221        let receivers = batch.into_completion_receivers();
222
223        // 等待所有完成通知
224        for rx in receivers {
225            rx.await.expect("Should receive completion notification");
226        }
227
228        // 等待一下确保回调执行完成
229        tokio::time::sleep(Duration::from_millis(50)).await;
230
231        // 验证所有回调都已执行
232        assert_eq!(counter.load(Ordering::SeqCst), 5);
233    }
234
235    #[tokio::test]
236    async fn test_completion_reason_expired() {
237        let timer = TimerWheel::with_defaults();
238        
239        let task = TimerTask::new(Duration::from_millis(50), None);
240        let handle = timer.register(task);
241
242        // 等待完成通知并验证原因是 Expired
243        let result = handle.into_completion_receiver().0.await.expect("Should receive completion notification");
244        assert_eq!(result, TaskCompletionReason::Expired);
245    }
246
247    #[tokio::test]
248    async fn test_completion_reason_cancelled() {
249        let timer = TimerWheel::with_defaults();
250        
251        let task = TimerTask::new(Duration::from_secs(10), None);
252        let handle = timer.register(task);
253
254        // 取消任务
255        let cancelled = handle.cancel();
256        assert!(cancelled);
257
258        // 等待完成通知并验证原因是 Cancelled
259        let result = handle.into_completion_receiver().0.await.expect("Should receive completion notification");
260        assert_eq!(result, TaskCompletionReason::Cancelled);
261    }
262
263    #[tokio::test]
264    async fn test_batch_completion_reasons() {
265        let timer = TimerWheel::with_defaults();
266        
267        // 创建 5 个任务,延迟 10 秒
268        let tasks: Vec<_> = (0..5)
269            .map(|_| TimerTask::new(Duration::from_secs(10), None))
270            .collect();
271        
272        let batch = timer.register_batch(tasks);
273        let task_ids: Vec<_> = batch.task_ids.clone();
274        let mut receivers = batch.into_completion_receivers();
275
276        // 取消前 3 个任务
277        timer.cancel_batch(&task_ids[0..3]);
278
279        // 验证前 3 个任务收到 Cancelled 通知
280        for rx in receivers.drain(0..3) {
281            let result = rx.await.expect("Should receive completion notification");
282            assert_eq!(result, TaskCompletionReason::Cancelled);
283        }
284
285        // 取消剩余任务并验证
286        timer.cancel_batch(&task_ids[3..5]);
287        for rx in receivers {
288            let result = rx.await.expect("Should receive completion notification");
289            assert_eq!(result, TaskCompletionReason::Cancelled);
290        }
291    }
292}