kestrel_timer/
lib.rs

1//! # High-Performance Async Timer System
2//!
//! A high-performance async timer library based on a hierarchical timing wheel algorithm, with support for the tokio runtime.
4//!
5//! ## Features
6//!
//! - **High Performance**: Uses a timing wheel algorithm; insert and delete operations are both O(1)
8//! - **Large-Scale Support**: Efficiently manages 10000+ concurrent timers
9//! - **Async Support**: Based on tokio async runtime
//! - **Thread-Safe**: Uses `parking_lot` locks for high-performance synchronization
11//!
12//! 
13//! # 高性能异步定时器库
14//! 
15//! 基于分层时间轮算法的高性能异步定时器库,支持 tokio 运行时
16//! 
17//! ## 特性
18//! 
19//! - **高性能**: 使用时间轮算法,插入和删除操作均为 O(1)
20//! - **大规模支持**: 高效管理 10000+ 并发定时器
21//! - **异步支持**: 基于 tokio 异步运行时
22//! - **线程安全**: 使用 parking_lot 提供高性能的锁机制
23//! 
24//! ## Quick Start (快速开始)
25//!
26//! ```no_run
27//! use kestrel_timer::{TimerWheel, CallbackWrapper, TimerTask};
28//! use std::time::Duration;
29//! use std::sync::Arc;
30//!
31//! #[tokio::main]
32//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
33//!     // Create timer manager (创建定时器管理器)
34//!     let timer = TimerWheel::with_defaults();
35//!     
36//!     // Step 1: Allocate handle to get task_id (分配 handle 获取 task_id)
37//!     let handle = timer.allocate_handle();
38//!     let task_id = handle.task_id();
39//!     
40//!     // Step 2: Create timer task (创建定时器任务)
41//!     let callback = Some(CallbackWrapper::new(|| async {
42//!         println!("Timer fired after 1 second!");
43//!     }));
44//!     let task = TimerTask::new_oneshot(Duration::from_secs(1), callback);
45//!     
46//!     // Step 3: Register timer task and get completion notification (注册定时器任务并获取完成通知)
47//!     let timer_handle = timer.register(handle, task);
48//!     
49//!     // Wait for timer completion (等待定时器完成)
50//!     use kestrel_timer::CompletionReceiver;
51//!     let (rx, _handle) = timer_handle.into_parts();
52//!     match rx {
53//!         CompletionReceiver::OneShot(receiver) => {
54//!             receiver.wait().await;
55//!         },
56//!         _ => {}
57//!     }
58//!     Ok(())
59//! }
60//! ```
61//!
62//! ## English Architecture Description
63//!
64//! ### Timing Wheel Algorithm
65//!
//! Uses a hierarchical timing wheel algorithm with two layers, L0 and L1:
67//!
68//! - **L0 Layer (Bottom)**: Handles short delay tasks
69//!   - Slot count: Default 512, configurable, must be power of 2
70//!   - Time precision: Default 10ms, configurable
//!   - Maximum time span: 5.12 seconds with the defaults (512 slots × 10 ms)
72//!
73//! - **L1 Layer (Upper)**: Handles long delay tasks
74//!   - Slot count: Default 64, configurable, must be power of 2
75//!   - Time precision: Default 1 second, configurable
//!   - Maximum time span: 64 seconds with the defaults (64 slots × 1 second)
77//!
//! - **Round Mechanism**: Tasks beyond the L1 range are handled with round counting
79//!
80//! ### Task Indexing with DeferredMap
81//!
82//! Uses `DeferredMap` (a generational arena) for efficient task management:
83//!
84//! - **Two-Step Registration**: 
85//!   1. Allocate handle to get task ID (cheap, no value needed)
86//!   2. Insert task using the handle (with completion notifiers)
87//!
88//! - **Generational Safety**: Each task ID includes:
89//!   - Lower 32 bits: Slot index
90//!   - Upper 32 bits: Generation counter
91//!   - Prevents use-after-free and ABA problems
92//!
93//! - **Memory Efficiency**: Slots use union-based storage
94//!   - Occupied slots: Store task data
95//!   - Vacant slots: Store free-list pointer
96//!
97//! ### Performance Optimization
98//!
99//! - Uses `parking_lot::Mutex` instead of standard library Mutex for better performance
100//! - Uses `DeferredMap` (generational arena) for task indexing:
101//!   - O(1) task lookup, insertion, and removal
102//!   - Generational indices prevent use-after-free bugs
103//!   - Memory-efficient slot reuse with union-based storage
104//!   - Deferred insertion allows getting task ID before inserting task
//! - Slot counts are powers of 2, so modulo operations are replaced with bitwise operations
//! - Tasks are executed in separate tokio tasks to avoid blocking timing wheel advancement
107//! 
108//! 
109//! 
110//! ## 中文架构说明
111//!
112//! ### 时间轮算法
113//!
114//! 采用分层时间轮(Hierarchical Timing Wheel)算法,包含 L0 和 L1 两层:
115//!
116//! - **L0 层(底层)**: 处理短延迟任务
117//!   - 槽位数量: 默认 512 个(可配置,必须是 2 的幂次方)
118//!   - 时间精度: 默认 10ms(可配置)
119//!   - 最大时间跨度: 5.12 秒
120//!
121//! - **L1 层(高层)**: 处理长延迟任务
122//!   - 槽位数量: 默认 64 个(可配置,必须是 2 的幂次方)
123//!   - 时间精度: 默认 1 秒(可配置)
124//!   - 最大时间跨度: 64 秒
125//!
126//! - **轮次机制**: 超出 L1 层范围的任务使用轮次计数处理
127//!
128//! ### 基于 DeferredMap 的任务索引
129//!
130//! 使用 `DeferredMap`(代数竞技场)实现高效任务管理:
131//!
132//! - **两步注册流程**:
133//!   1. 分配 handle 获取任务 ID(轻量操作,无需准备任务值)
134//!   2. 使用 handle 插入任务(携带完成通知器)
135//!
136//! - **代数安全**: 每个任务 ID 包含:
137//!   - 低 32 位:槽位索引
138//!   - 高 32 位:代数计数器
139//!   - 防止释放后使用和 ABA 问题
140//!
141//! - **内存高效**: 槽位使用联合体存储
142//!   - 已占用槽位:存储任务数据
143//!   - 空闲槽位:存储空闲链表指针
144//!
145//! ### 性能优化
146//!
147//! - 使用 `parking_lot::Mutex` 替代标准库的 Mutex,提供更好的性能
148//! - 使用 `DeferredMap`(代数竞技场)进行任务索引:
149//!   - O(1) 任务查找、插入和删除
150//!   - 代数索引防止释放后使用(use-after-free)错误
151//!   - 基于联合体的槽位存储,内存高效复用
152//!   - 延迟插入允许在插入任务前获取任务 ID
153//! - 槽位数量为 2 的幂次方,使用位运算优化取模操作
154//! - 任务执行在独立的 tokio 任务中,避免阻塞时间轮推进
155//! 
156
157pub mod config;
158pub mod error;
159pub mod task;
160pub mod wheel;
161pub mod timer;
162mod service;
163
164#[cfg(test)]
165mod tests;
166
167// Re-export public API
168pub use task::{CallbackWrapper, TaskId, TimerTask, TaskCompletion};
169pub use timer::handle::{TimerHandle, TimerHandleWithCompletion, BatchHandle, BatchHandleWithCompletion};
170pub use task::CompletionReceiver;
171pub use timer::TimerWheel;
172pub use service::{TimerService, TaskNotification};
173
174#[cfg(test)]
175mod integration_tests {
176    use super::*;
177    use std::sync::atomic::{AtomicU32, Ordering};
178    use std::sync::Arc;
179    use std::time::Duration;
180
181    #[tokio::test]
182    async fn test_basic_timer() {
183        let timer = TimerWheel::with_defaults();
184        let counter = Arc::new(AtomicU32::new(0));
185        let counter_clone = Arc::clone(&counter);
186
187        let handle = timer.allocate_handle();
188        let task = TimerTask::new_oneshot(
189            Duration::from_millis(50),
190            Some(CallbackWrapper::new(move || {
191                let counter =  Arc::clone(&counter_clone);
192                async move {
193                    counter.fetch_add(1, Ordering::SeqCst);
194                }
195            })),
196        );
197        timer.register(handle, task);
198
199        tokio::time::sleep(Duration::from_millis(100)).await;
200        assert_eq!(counter.load(Ordering::SeqCst), 1);
201    }
202
203    #[tokio::test]
204    async fn test_multiple_timers() {
205        let timer = TimerWheel::with_defaults();
206        let counter = Arc::new(AtomicU32::new(0));
207
208        // Create 10 timers
209        for i in 0..10 {
210            let counter_clone = Arc::clone(&counter);
211            let handle = timer.allocate_handle();
212            let task = TimerTask::new_oneshot(
213                Duration::from_millis(10 * (i + 1)),
214                Some(CallbackWrapper::new(move || {
215                    let counter = Arc::clone(&counter_clone);
216                    async move {
217                        counter.fetch_add(1, Ordering::SeqCst);
218                    }
219                })),
220            );
221            timer.register(handle, task);
222        }
223
224        tokio::time::sleep(Duration::from_millis(200)).await;
225        assert_eq!(counter.load(Ordering::SeqCst), 10);
226    }
227
228    #[tokio::test]
229    async fn test_timer_cancellation() {
230        let timer = TimerWheel::with_defaults();
231        let counter = Arc::new(AtomicU32::new(0));
232
233        // Create 5 timers
234        let mut handles = Vec::new();
235        for _ in 0..5 {
236            let counter_clone = Arc::clone(&counter);
237            let alloc_handle = timer.allocate_handle();
238            let task = TimerTask::new_oneshot(
239                Duration::from_millis(100),
240                Some(CallbackWrapper::new(move || {
241                    let counter = Arc::clone(&counter_clone);
242                    async move {
243                        counter.fetch_add(1, Ordering::SeqCst);
244                    }
245                })),
246            );
247            let handle = timer.register(alloc_handle, task);
248            handles.push(handle);
249        }
250
251        // Cancel first 3 timers
252        for i in 0..3 {
253            let cancel_result = handles[i].cancel();
254            assert!(cancel_result);
255        }
256
257        tokio::time::sleep(Duration::from_millis(200)).await;
258        // Only 2 timers should be triggered
259        assert_eq!(counter.load(Ordering::SeqCst), 2);
260    }
261
262    #[tokio::test]
263    async fn test_completion_notification_once() {
264        let timer = TimerWheel::with_defaults();
265        let counter = Arc::new(AtomicU32::new(0));
266        let counter_clone = Arc::clone(&counter);
267
268        let alloc_handle = timer.allocate_handle();
269        let task = TimerTask::new_oneshot(
270            Duration::from_millis(50),
271            Some(CallbackWrapper::new(move || {
272                let counter = Arc::clone(&counter_clone);
273                async move {
274                    counter.fetch_add(1, Ordering::SeqCst);
275                }
276            })),
277        );
278        let handle = timer.register(alloc_handle, task);
279
280        // Wait for completion notification
281        let (rx, _handle) = handle.into_parts();
282        match rx {
283            task::CompletionReceiver::OneShot(receiver) => {
284                receiver.wait().await;
285            },
286            _ => panic!("Expected OneShot completion receiver"),
287        }
288
289        // Verify callback has been executed (wait a moment to ensure callback execution is complete)
290        tokio::time::sleep(Duration::from_millis(20)).await;
291        assert_eq!(counter.load(Ordering::SeqCst), 1);
292    }
293
294    #[tokio::test]
295    async fn test_notify_only_timer_once() {
296        let timer = TimerWheel::with_defaults();
297        
298        let alloc_handle = timer.allocate_handle();
299        let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
300        let handle = timer.register(alloc_handle, task);
301
302        // Wait for completion notification (no callback, only notification)
303        let (rx, _handle) = handle.into_parts();
304        match rx {
305            task::CompletionReceiver::OneShot(receiver) => {
306                receiver.wait().await;
307            },
308            _ => panic!("Expected OneShot completion receiver"),
309        }
310    }
311
312    #[tokio::test]
313    async fn test_batch_completion_notifications() {
314        let timer = TimerWheel::with_defaults();
315        let counter = Arc::new(AtomicU32::new(0));
316
317        // Step 1: Allocate handles
318        let handles = timer.allocate_handles(5);
319
320        // Step 2: Create batch callbacks
321        let tasks: Vec<_> = (0..5)
322            .map(|i| {
323                let counter = Arc::clone(&counter);
324                let delay = Duration::from_millis(50 + i as u64 * 10);
325                let callback = CallbackWrapper::new(move || {
326                    let counter = Arc::clone(&counter);
327                    async move {
328                        counter.fetch_add(1, Ordering::SeqCst);
329                    }
330                });
331                TimerTask::new_oneshot(delay, Some(callback))
332            })
333            .collect();
334
335        // Step 3: Batch register
336        let batch = timer.register_batch(handles, tasks).expect("register_batch should succeed");
337        let (receivers, _batch_handle) = batch.into_parts();
338
339        // Wait for all completion notifications
340        for rx in receivers {
341            match rx {
342                task::CompletionReceiver::OneShot(receiver) => {
343                    receiver.wait().await;
344                },
345                _ => panic!("Expected OneShot completion receiver"),
346            }
347        }
348
349        // Wait a moment to ensure callback execution is complete
350        tokio::time::sleep(Duration::from_millis(50)).await;
351
352        // Verify all callbacks have been executed
353        assert_eq!(counter.load(Ordering::SeqCst), 5);
354    }
355
356    #[tokio::test]
357    async fn test_completion_reason_expired() {
358        let timer = TimerWheel::with_defaults();
359        
360        let alloc_handle = timer.allocate_handle();
361        let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
362        let handle = timer.register(alloc_handle, task);
363
364        // Wait for completion notification and verify reason is Expired
365        let (rx, _handle) = handle.into_parts();
366        let result = match rx {
367            task::CompletionReceiver::OneShot(receiver) => {
368                receiver.wait().await
369            },
370            _ => panic!("Expected OneShot completion receiver"),
371        };
372        assert_eq!(result, TaskCompletion::Called);
373    }
374
375    #[tokio::test]
376    async fn test_completion_reason_cancelled() {
377        let timer = TimerWheel::with_defaults();
378        
379        let alloc_handle = timer.allocate_handle();
380        let task = TimerTask::new_oneshot(Duration::from_secs(10), None);
381        let handle = timer.register(alloc_handle, task);
382
383        // Cancel task
384        let cancelled = handle.cancel();
385        assert!(cancelled);
386
387        // Wait for completion notification and verify reason is Cancelled
388        let (rx, _handle) = handle.into_parts();
389        let result = match rx {
390            task::CompletionReceiver::OneShot(receiver) => {
391                receiver.wait().await
392            },
393            _ => panic!("Expected OneShot completion receiver"),
394        };
395        assert_eq!(result, TaskCompletion::Cancelled);
396    }
397
398    #[tokio::test]
399    async fn test_batch_completion_reasons() {
400        let timer = TimerWheel::with_defaults();
401        
402        // Step 1: Allocate handles
403        let handles = timer.allocate_handles(5);
404        
405        // Step 2: Create 5 tasks with 10 seconds delay
406        let tasks: Vec<_> = (0..5)
407            .map(|_| TimerTask::new_oneshot(Duration::from_secs(10), None))
408            .collect();
409        
410        // Step 3: Batch register
411        let batch = timer.register_batch(handles, tasks).expect("register_batch should succeed");
412        let task_ids: Vec<_> = batch.task_ids().to_vec();
413        let (mut receivers, _batch_handle) = batch.into_parts();
414
415        // Cancel first 3 tasks
416        timer.cancel_batch(&task_ids[0..3]);
417
418        // Verify first 3 tasks received Cancelled notification
419        for rx in receivers.drain(0..3) {
420            let result = match rx {
421                task::CompletionReceiver::OneShot(receiver) => {
422                    receiver.wait().await
423                },
424                _ => panic!("Expected OneShot completion receiver"),
425            };
426            assert_eq!(result, TaskCompletion::Cancelled);
427        }
428
429        // Cancel remaining tasks and verify
430        timer.cancel_batch(&task_ids[3..5]);
431        for rx in receivers {
432            let result = match rx {
433                task::CompletionReceiver::OneShot(receiver) => {
434                    receiver.wait().await
435                },
436                _ => panic!("Expected OneShot completion receiver"),
437            };
438            assert_eq!(result, TaskCompletion::Cancelled);
439        }
440    }
441}