kestrel_timer/
lib.rs

1//! # High-Performance Async Timer System
2//!
3//! High-performance async timer based on Timing Wheel algorithm, supports tokio runtime
4//!
5//! ## Features
6//!
7//! - **High Performance**: Uses timing wheel algorithm, insert and delete operations are O(1)
8//! - **Large-Scale Support**: Efficiently manages 10000+ concurrent timers
9//! - **Async Support**: Based on tokio async runtime
10//! - **Thread-Safe**: Uses parking_lot for high-performance locking mechanism
11//!
12//! 
13//! # 高性能异步定时器库
14//! 
15//! 基于分层时间轮算法的高性能异步定时器库,支持 tokio 运行时
16//! 
17//! ## 特性
18//! 
19//! - **高性能**: 使用时间轮算法,插入和删除操作均为 O(1)
20//! - **大规模支持**: 高效管理 10000+ 并发定时器
21//! - **异步支持**: 基于 tokio 异步运行时
22//! - **线程安全**: 使用 parking_lot 提供高性能的锁机制
23//! 
24//! ## Quick Start (快速开始)
25//!
26//! ```no_run
27//! use kestrel_timer::{TimerWheel, CallbackWrapper, TimerTask};
28//! use std::time::Duration;
29//! use std::sync::Arc;
30//!
31//! #[tokio::main]
32//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
33//!     // Create timer manager (创建定时器管理器)
34//!     let timer = TimerWheel::with_defaults();
35//!     
36//!     // Step 1: Create timer task (使用回调创建定时器任务)
37//!     let callback = Some(CallbackWrapper::new(|| async {
38//!         println!("Timer fired after 1 second!");
39//!     }));
40//!     let task = TimerTask::new_oneshot(Duration::from_secs(1), callback);
41//!     let task_id = task.get_id();
42//!     
43//!     // Step 2: Register timer task and get completion notification (注册定时器任务并获取完成通知)
44//!     let handle = timer.register(task);
45//!     
46//!     // Wait for timer completion (等待定时器完成)
47//!     use kestrel_timer::CompletionReceiver;
48//!     let (rx, _handle) = handle.into_parts();
49//!     match rx {
50//!         CompletionReceiver::OneShot(receiver) => {
51//!             receiver.wait().await;
52//!         },
53//!         _ => {}
54//!     }
55//!     Ok(())
56//! }
57//! ```
58//!
59//! ## English Architecture Description
60//!
61//! ### Timing Wheel Algorithm
62//!
63//! Uses hierarchical timing wheel algorithm with L0 and L1 layers:
64//!
65//! - **L0 Layer (Bottom)**: Handles short delay tasks
66//!   - Slot count: Default 512, configurable, must be power of 2
67//!   - Time precision: Default 10ms, configurable
68//!   - Maximum time span: 5.12 seconds
69//!
70//! - **L1 Layer (Upper)**: Handles long delay tasks
71//!   - Slot count: Default 64, configurable, must be power of 2
72//!   - Time precision: Default 1 second, configurable
73//!   - Maximum time span: 64 seconds
74//!
75//! - **Round Mechanism**: Tasks beyond L1 range use round counting
76//! 
77//! ### Performance Optimization
78//!
79//! - Uses `parking_lot::Mutex` instead of standard library Mutex for better performance
80//!   - Uses `FxHashMap` (rustc-hash) instead of standard HashMap to reduce hash collisions
81//!   - Slot count is power of 2, uses bitwise operations to optimize modulo
82//!   - Task execution in separate tokio tasks to avoid blocking timing wheel advancement
83//! 
84//! 
85//! 
86//! ## 中文架构说明
87//!
88//! ### 时间轮算法
89//!
90//! 采用分层时间轮(Hierarchical Timing Wheel)算法,包含 L0 和 L1 两层:
91//!
92//! - **L0 层(底层)**: 处理短延迟任务
93//!   - 槽位数量: 默认 512 个(可配置,必须是 2 的幂次方)
94//!   - 时间精度: 默认 10ms(可配置)
95//!   - 最大时间跨度: 5.12 秒
96//!
97//! - **L1 层(高层)**: 处理长延迟任务
98//!   - 槽位数量: 默认 64 个(可配置,必须是 2 的幂次方)
99//!   - 时间精度: 默认 1 秒(可配置)
100//!   - 最大时间跨度: 64 秒
101//!
102//! - **轮次机制**: 超出 L1 层范围的任务使用轮次计数处理
103//!
104//! ### 性能优化
105//!
106//! - 使用 `parking_lot::Mutex` 替代标准库的 Mutex,提供更好的性能
107//! - 使用 `FxHashMap`(rustc-hash)替代标准 HashMap,减少哈希冲突
108//! - 槽位数量为 2 的幂次方,使用位运算优化取模操作
109//! - 任务执行在独立的 tokio 任务中,避免阻塞时间轮推进
110//! 
111
112pub mod config;
113pub mod error;
114pub mod task;
115pub mod wheel;
116pub mod timer;
117mod service;
118pub mod utils {
119    pub(crate) mod atomic_waker;
120    pub mod oneshot;
121    pub mod spsc;
122    pub mod ringbuf;
123    pub mod notify;
124    pub(crate) mod vec;
125}
126
127// Re-export public API
128pub use task::{CallbackWrapper, TaskId, TimerTask, TaskCompletion};
129pub use timer::handle::{TimerHandle, TimerHandleWithCompletion, BatchHandle, BatchHandleWithCompletion};
130pub use task::CompletionReceiver;
131pub use timer::TimerWheel;
132pub use service::{TimerService, TaskNotification};
133
134#[cfg(test)]
135mod tests {
136    use super::*;
137    use std::sync::atomic::{AtomicU32, Ordering};
138    use std::sync::Arc;
139    use std::time::Duration;
140
141    #[tokio::test]
142    async fn test_basic_timer() {
143        let timer = TimerWheel::with_defaults();
144        let counter = Arc::new(AtomicU32::new(0));
145        let counter_clone = Arc::clone(&counter);
146
147        let task = TimerTask::new_oneshot(
148            Duration::from_millis(50),
149            Some(CallbackWrapper::new(move || {
150                let counter =  Arc::clone(&counter_clone);
151                async move {
152                    counter.fetch_add(1, Ordering::SeqCst);
153                }
154            })),
155        );
156        timer.register(task);
157
158        tokio::time::sleep(Duration::from_millis(100)).await;
159        assert_eq!(counter.load(Ordering::SeqCst), 1);
160    }
161
162    #[tokio::test]
163    async fn test_multiple_timers() {
164        let timer = TimerWheel::with_defaults();
165        let counter = Arc::new(AtomicU32::new(0));
166
167        // Create 10 timers
168        for i in 0..10 {
169            let counter_clone = Arc::clone(&counter);
170            let task = TimerTask::new_oneshot(
171                Duration::from_millis(10 * (i + 1)),
172                Some(CallbackWrapper::new(move || {
173                    let counter = Arc::clone(&counter_clone);
174                    async move {
175                        counter.fetch_add(1, Ordering::SeqCst);
176                    }
177                })),
178            );
179            timer.register(task);
180        }
181
182        tokio::time::sleep(Duration::from_millis(200)).await;
183        assert_eq!(counter.load(Ordering::SeqCst), 10);
184    }
185
186    #[tokio::test]
187    async fn test_timer_cancellation() {
188        let timer = TimerWheel::with_defaults();
189        let counter = Arc::new(AtomicU32::new(0));
190
191        // Create 5 timers
192        let mut handles = Vec::new();
193        for _ in 0..5 {
194            let counter_clone = Arc::clone(&counter);
195            let task = TimerTask::new_oneshot(
196                Duration::from_millis(100),
197                Some(CallbackWrapper::new(move || {
198                    let counter = Arc::clone(&counter_clone);
199                    async move {
200                        counter.fetch_add(1, Ordering::SeqCst);
201                    }
202                })),
203            );
204            let handle = timer.register(task);
205            handles.push(handle);
206        }
207
208        // Cancel first 3 timers
209        for i in 0..3 {
210            let cancel_result = handles[i].cancel();
211            assert!(cancel_result);
212        }
213
214        tokio::time::sleep(Duration::from_millis(200)).await;
215        // Only 2 timers should be triggered
216        assert_eq!(counter.load(Ordering::SeqCst), 2);
217    }
218
219    #[tokio::test]
220    async fn test_completion_notification_once() {
221        let timer = TimerWheel::with_defaults();
222        let counter = Arc::new(AtomicU32::new(0));
223        let counter_clone = Arc::clone(&counter);
224
225        let task = TimerTask::new_oneshot(
226            Duration::from_millis(50),
227            Some(CallbackWrapper::new(move || {
228                let counter = Arc::clone(&counter_clone);
229                async move {
230                    counter.fetch_add(1, Ordering::SeqCst);
231                }
232            })),
233        );
234        let handle = timer.register(task);
235
236        // Wait for completion notification
237        let (rx, _handle) = handle.into_parts();
238        match rx {
239            task::CompletionReceiver::OneShot(receiver) => {
240                receiver.wait().await;
241            },
242            _ => panic!("Expected OneShot completion receiver"),
243        }
244
245        // Verify callback has been executed (wait a moment to ensure callback execution is complete)
246        tokio::time::sleep(Duration::from_millis(20)).await;
247        assert_eq!(counter.load(Ordering::SeqCst), 1);
248    }
249
250    #[tokio::test]
251    async fn test_notify_only_timer_once() {
252        let timer = TimerWheel::with_defaults();
253        
254        let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
255        let handle = timer.register(task);
256
257        // Wait for completion notification (no callback, only notification)
258        let (rx, _handle) = handle.into_parts();
259        match rx {
260            task::CompletionReceiver::OneShot(receiver) => {
261                receiver.wait().await;
262            },
263            _ => panic!("Expected OneShot completion receiver"),
264        }
265    }
266
267    #[tokio::test]
268    async fn test_batch_completion_notifications() {
269        let timer = TimerWheel::with_defaults();
270        let counter = Arc::new(AtomicU32::new(0));
271
272        // Create batch callbacks
273        let callbacks: Vec<TimerTask> = (0..5)
274            .map(|i| {
275                let counter = Arc::clone(&counter);
276                let delay = Duration::from_millis(50 + i * 10);
277                let callback = CallbackWrapper::new(move || {
278                    let counter = Arc::clone(&counter);
279                    async move {
280                        counter.fetch_add(1, Ordering::SeqCst);
281                    }
282                });
283                TimerTask::new_oneshot(delay, Some(callback))
284            })
285            .collect();
286
287        let batch = timer.register_batch(callbacks);
288        let (receivers, _batch_handle) = batch.into_parts();
289
290        // Wait for all completion notifications
291        for rx in receivers {
292            match rx {
293                task::CompletionReceiver::OneShot(receiver) => {
294                    receiver.wait().await;
295                },
296                _ => panic!("Expected OneShot completion receiver"),
297            }
298        }
299
300        // Wait a moment to ensure callback execution is complete
301        tokio::time::sleep(Duration::from_millis(50)).await;
302
303        // Verify all callbacks have been executed
304        assert_eq!(counter.load(Ordering::SeqCst), 5);
305    }
306
307    #[tokio::test]
308    async fn test_completion_reason_expired() {
309        let timer = TimerWheel::with_defaults();
310        
311        let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
312        let handle = timer.register(task);
313
314        // Wait for completion notification and verify reason is Expired
315        let (rx, _handle) = handle.into_parts();
316        let result = match rx {
317            task::CompletionReceiver::OneShot(receiver) => {
318                receiver.wait().await
319            },
320            _ => panic!("Expected OneShot completion receiver"),
321        };
322        assert_eq!(result, TaskCompletion::Called);
323    }
324
325    #[tokio::test]
326    async fn test_completion_reason_cancelled() {
327        let timer = TimerWheel::with_defaults();
328        
329        let task = TimerTask::new_oneshot(Duration::from_secs(10), None);
330        let handle = timer.register(task);
331
332        // Cancel task
333        let cancelled = handle.cancel();
334        assert!(cancelled);
335
336        // Wait for completion notification and verify reason is Cancelled
337        let (rx, _handle) = handle.into_parts();
338        let result = match rx {
339            task::CompletionReceiver::OneShot(receiver) => {
340                receiver.wait().await
341            },
342            _ => panic!("Expected OneShot completion receiver"),
343        };
344        assert_eq!(result, TaskCompletion::Cancelled);
345    }
346
347    #[tokio::test]
348    async fn test_batch_completion_reasons() {
349        let timer = TimerWheel::with_defaults();
350        
351        // Create 5 tasks, delay 10 seconds
352        let tasks: Vec<_> = (0..5)
353            .map(|_| TimerTask::new_oneshot(Duration::from_secs(10), None))
354            .collect();
355        
356        let batch = timer.register_batch(tasks);
357        let task_ids: Vec<_> = batch.task_ids().to_vec();
358        let (mut receivers, _batch_handle) = batch.into_parts();
359
360        // Cancel first 3 tasks
361        timer.cancel_batch(&task_ids[0..3]);
362
363        // Verify first 3 tasks received Cancelled notification
364        for rx in receivers.drain(0..3) {
365            let result = match rx {
366                task::CompletionReceiver::OneShot(receiver) => {
367                    receiver.wait().await
368                },
369                _ => panic!("Expected OneShot completion receiver"),
370            };
371            assert_eq!(result, TaskCompletion::Cancelled);
372        }
373
374        // Cancel remaining tasks and verify
375        timer.cancel_batch(&task_ids[3..5]);
376        for rx in receivers {
377            let result = match rx {
378                task::CompletionReceiver::OneShot(receiver) => {
379                    receiver.wait().await
380                },
381                _ => panic!("Expected OneShot completion receiver"),
382            };
383            assert_eq!(result, TaskCompletion::Cancelled);
384        }
385    }
386}