kestrel_timer/lib.rs
1//! # High-Performance Async Timer System
2//!
//! A high-performance async timer built on a hierarchical timing wheel algorithm, with support for the tokio runtime.
4//!
5//! ## Features
6//!
//! - **High Performance**: Uses a timing wheel algorithm; insert and delete operations are O(1)
8//! - **Large-Scale Support**: Efficiently manages 10000+ concurrent timers
9//! - **Async Support**: Based on tokio async runtime
//! - **Thread-Safe**: Uses `parking_lot` locks for high-performance synchronization
11//!
12//!
13//! # 高性能异步定时器库
14//!
15//! 基于分层时间轮算法的高性能异步定时器库,支持 tokio 运行时
16//!
17//! ## 特性
18//!
19//! - **高性能**: 使用时间轮算法,插入和删除操作均为 O(1)
20//! - **大规模支持**: 高效管理 10000+ 并发定时器
21//! - **异步支持**: 基于 tokio 异步运行时
22//! - **线程安全**: 使用 parking_lot 提供高性能的锁机制
23//!
24//! ## Quick Start (快速开始)
25//!
26//! ```no_run
27//! use kestrel_timer::{TimerWheel, CallbackWrapper, TimerTask};
28//! use std::time::Duration;
29//! use std::sync::Arc;
30//!
31//! #[tokio::main]
32//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
33//! // Create timer manager (创建定时器管理器)
34//! let timer = TimerWheel::with_defaults();
35//!
36//! // Step 1: Allocate handle to get task_id (分配 handle 获取 task_id)
37//! let handle = timer.allocate_handle();
38//! let task_id = handle.task_id();
39//!
40//! // Step 2: Create timer task (创建定时器任务)
41//! let callback = Some(CallbackWrapper::new(|| async {
42//! println!("Timer fired after 1 second!");
43//! }));
44//! let task = TimerTask::new_oneshot(Duration::from_secs(1), callback);
45//!
46//! // Step 3: Register timer task and get completion notification (注册定时器任务并获取完成通知)
47//! let timer_handle = timer.register(handle, task);
48//!
49//! // Wait for timer completion (等待定时器完成)
50//! use kestrel_timer::CompletionReceiver;
51//! let (rx, _handle) = timer_handle.into_parts();
52//! match rx {
53//! CompletionReceiver::OneShot(receiver) => {
54//! receiver.recv().await.unwrap();
55//! },
56//! _ => {}
57//! }
58//! Ok(())
59//! }
60//! ```
61//!
62//! ## English Architecture Description
63//!
64//! ### Timing Wheel Algorithm
65//!
66//! Uses hierarchical timing wheel algorithm with L0 and L1 layers:
67//!
68//! - **L0 Layer (Bottom)**: Handles short delay tasks
69//! - Slot count: Default 512, configurable, must be power of 2
70//! - Time precision: Default 10ms, configurable
71//! - Maximum time span: 5.12 seconds
72//!
73//! - **L1 Layer (Upper)**: Handles long delay tasks
74//! - Slot count: Default 64, configurable, must be power of 2
75//! - Time precision: Default 1 second, configurable
76//! - Maximum time span: 64 seconds
77//!
78//! - **Round Mechanism**: Tasks beyond L1 range use round counting
79//!
80//! ### Task Indexing with DeferredMap
81//!
82//! Uses `DeferredMap` (a generational arena) for efficient task management:
83//!
84//! - **Two-Step Registration**:
85//! 1. Allocate handle to get task ID (cheap, no value needed)
86//! 2. Insert task using the handle (with completion notifiers)
87//!
88//! - **Generational Safety**: Each task ID includes:
89//! - Lower 32 bits: Slot index
90//! - Upper 32 bits: Generation counter
91//! - Prevents use-after-free and ABA problems
92//!
93//! - **Memory Efficiency**: Slots use union-based storage
94//! - Occupied slots: Store task data
95//! - Vacant slots: Store free-list pointer
96//!
97//! ### Performance Optimization
98//!
99//! - Uses `parking_lot::Mutex` instead of standard library Mutex for better performance
100//! - Uses `DeferredMap` (generational arena) for task indexing:
101//! - O(1) task lookup, insertion, and removal
102//! - Generational indices prevent use-after-free bugs
103//! - Memory-efficient slot reuse with union-based storage
104//! - Deferred insertion allows getting task ID before inserting task
105//! - Slot count is power of 2, uses bitwise operations to optimize modulo
//! - Tasks execute in separate tokio tasks so they do not block timing wheel advancement
107//!
108//!
109//!
110//! ## 中文架构说明
111//!
112//! ### 时间轮算法
113//!
114//! 采用分层时间轮(Hierarchical Timing Wheel)算法,包含 L0 和 L1 两层:
115//!
116//! - **L0 层(底层)**: 处理短延迟任务
117//! - 槽位数量: 默认 512 个(可配置,必须是 2 的幂次方)
118//! - 时间精度: 默认 10ms(可配置)
119//! - 最大时间跨度: 5.12 秒
120//!
121//! - **L1 层(高层)**: 处理长延迟任务
122//! - 槽位数量: 默认 64 个(可配置,必须是 2 的幂次方)
123//! - 时间精度: 默认 1 秒(可配置)
124//! - 最大时间跨度: 64 秒
125//!
126//! - **轮次机制**: 超出 L1 层范围的任务使用轮次计数处理
127//!
128//! ### 基于 DeferredMap 的任务索引
129//!
130//! 使用 `DeferredMap`(代数竞技场)实现高效任务管理:
131//!
132//! - **两步注册流程**:
133//! 1. 分配 handle 获取任务 ID(轻量操作,无需准备任务值)
134//! 2. 使用 handle 插入任务(携带完成通知器)
135//!
136//! - **代数安全**: 每个任务 ID 包含:
137//! - 低 32 位:槽位索引
138//! - 高 32 位:代数计数器
139//! - 防止释放后使用和 ABA 问题
140//!
141//! - **内存高效**: 槽位使用联合体存储
142//! - 已占用槽位:存储任务数据
143//! - 空闲槽位:存储空闲链表指针
144//!
145//! ### 性能优化
146//!
147//! - 使用 `parking_lot::Mutex` 替代标准库的 Mutex,提供更好的性能
148//! - 使用 `DeferredMap`(代数竞技场)进行任务索引:
149//! - O(1) 任务查找、插入和删除
150//! - 代数索引防止释放后使用(use-after-free)错误
151//! - 基于联合体的槽位存储,内存高效复用
152//! - 延迟插入允许在插入任务前获取任务 ID
153//! - 槽位数量为 2 的幂次方,使用位运算优化取模操作
154//! - 任务执行在独立的 tokio 任务中,避免阻塞时间轮推进
155//!
156
157pub mod config;
158pub mod error;
159mod service;
160pub mod task;
161pub mod timer;
162pub mod wheel;
163
164#[cfg(test)]
165mod tests;
166
167// Re-export public API
168pub use service::{TaskNotification, TimerService};
169pub use task::CompletionReceiver;
170pub use task::{CallbackWrapper, TaskCompletion, TaskId, TimerTask};
171pub use timer::TimerWheel;
172pub use timer::handle::{
173 BatchHandle, BatchHandleWithCompletion, TimerHandle, TimerHandleWithCompletion,
174};
175pub use lite_sync::spsc;
176
177#[cfg(test)]
178mod integration_tests {
179 use super::*;
180 use std::sync::Arc;
181 use std::sync::atomic::{AtomicU32, Ordering};
182 use std::time::Duration;
183
184 #[tokio::test]
185 async fn test_basic_timer() {
186 let timer = TimerWheel::with_defaults();
187 let counter = Arc::new(AtomicU32::new(0));
188 let counter_clone = Arc::clone(&counter);
189
190 let handle = timer.allocate_handle();
191 let task = TimerTask::new_oneshot(
192 Duration::from_millis(50),
193 Some(CallbackWrapper::new(move || {
194 let counter = Arc::clone(&counter_clone);
195 async move {
196 counter.fetch_add(1, Ordering::SeqCst);
197 }
198 })),
199 );
200 timer.register(handle, task);
201
202 tokio::time::sleep(Duration::from_millis(100)).await;
203 assert_eq!(counter.load(Ordering::SeqCst), 1);
204 }
205
206 #[tokio::test]
207 async fn test_multiple_timers() {
208 let timer = TimerWheel::with_defaults();
209 let counter = Arc::new(AtomicU32::new(0));
210
211 // Create 10 timers
212 for i in 0..10 {
213 let counter_clone = Arc::clone(&counter);
214 let handle = timer.allocate_handle();
215 let task = TimerTask::new_oneshot(
216 Duration::from_millis(10 * (i + 1)),
217 Some(CallbackWrapper::new(move || {
218 let counter = Arc::clone(&counter_clone);
219 async move {
220 counter.fetch_add(1, Ordering::SeqCst);
221 }
222 })),
223 );
224 timer.register(handle, task);
225 }
226
227 tokio::time::sleep(Duration::from_millis(200)).await;
228 assert_eq!(counter.load(Ordering::SeqCst), 10);
229 }
230
231 #[tokio::test]
232 async fn test_timer_cancellation() {
233 let timer = TimerWheel::with_defaults();
234 let counter = Arc::new(AtomicU32::new(0));
235
236 // Create 5 timers
237 let mut handles = Vec::new();
238 for _ in 0..5 {
239 let counter_clone = Arc::clone(&counter);
240 let alloc_handle = timer.allocate_handle();
241 let task = TimerTask::new_oneshot(
242 Duration::from_millis(100),
243 Some(CallbackWrapper::new(move || {
244 let counter = Arc::clone(&counter_clone);
245 async move {
246 counter.fetch_add(1, Ordering::SeqCst);
247 }
248 })),
249 );
250 let handle = timer.register(alloc_handle, task);
251 handles.push(handle);
252 }
253
254 // Cancel first 3 timers
255 for i in 0..3 {
256 let cancel_result = handles[i].cancel();
257 assert!(cancel_result);
258 }
259
260 tokio::time::sleep(Duration::from_millis(200)).await;
261 // Only 2 timers should be triggered
262 assert_eq!(counter.load(Ordering::SeqCst), 2);
263 }
264
265 #[tokio::test]
266 async fn test_completion_notification_once() {
267 let timer = TimerWheel::with_defaults();
268 let counter = Arc::new(AtomicU32::new(0));
269 let counter_clone = Arc::clone(&counter);
270
271 let alloc_handle = timer.allocate_handle();
272 let task = TimerTask::new_oneshot(
273 Duration::from_millis(50),
274 Some(CallbackWrapper::new(move || {
275 let counter = Arc::clone(&counter_clone);
276 async move {
277 counter.fetch_add(1, Ordering::SeqCst);
278 }
279 })),
280 );
281 let handle = timer.register(alloc_handle, task);
282
283 // Wait for completion notification
284 let (rx, _handle) = handle.into_parts();
285 match rx {
286 task::CompletionReceiver::OneShot(receiver) => {
287 receiver.recv().await.unwrap();
288 }
289 _ => panic!("Expected OneShot completion receiver"),
290 }
291
292 // Verify callback has been executed (wait a moment to ensure callback execution is complete)
293 tokio::time::sleep(Duration::from_millis(20)).await;
294 assert_eq!(counter.load(Ordering::SeqCst), 1);
295 }
296
297 #[tokio::test]
298 async fn test_notify_only_timer_once() {
299 let timer = TimerWheel::with_defaults();
300
301 let alloc_handle = timer.allocate_handle();
302 let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
303 let handle = timer.register(alloc_handle, task);
304
305 // Wait for completion notification (no callback, only notification)
306 let (rx, _handle) = handle.into_parts();
307 match rx {
308 task::CompletionReceiver::OneShot(receiver) => {
309 receiver.recv().await.unwrap();
310 }
311 _ => panic!("Expected OneShot completion receiver"),
312 }
313 }
314
315 #[tokio::test]
316 async fn test_batch_completion_notifications() {
317 let timer = TimerWheel::with_defaults();
318 let counter = Arc::new(AtomicU32::new(0));
319
320 // Step 1: Allocate handles
321 let handles = timer.allocate_handles(5);
322
323 // Step 2: Create batch callbacks
324 let tasks: Vec<_> = (0..5)
325 .map(|i| {
326 let counter = Arc::clone(&counter);
327 let delay = Duration::from_millis(50 + i as u64 * 10);
328 let callback = CallbackWrapper::new(move || {
329 let counter = Arc::clone(&counter);
330 async move {
331 counter.fetch_add(1, Ordering::SeqCst);
332 }
333 });
334 TimerTask::new_oneshot(delay, Some(callback))
335 })
336 .collect();
337
338 // Step 3: Batch register
339 let batch = timer
340 .register_batch(handles, tasks)
341 .expect("register_batch should succeed");
342 let (receivers, _batch_handle) = batch.into_parts();
343
344 // Wait for all completion notifications
345 for rx in receivers {
346 match rx {
347 task::CompletionReceiver::OneShot(receiver) => {
348 receiver.recv().await.unwrap();
349 }
350 _ => panic!("Expected OneShot completion receiver"),
351 }
352 }
353
354 // Wait a moment to ensure callback execution is complete
355 tokio::time::sleep(Duration::from_millis(50)).await;
356
357 // Verify all callbacks have been executed
358 assert_eq!(counter.load(Ordering::SeqCst), 5);
359 }
360
361 #[tokio::test]
362 async fn test_completion_reason_expired() {
363 let timer = TimerWheel::with_defaults();
364
365 let alloc_handle = timer.allocate_handle();
366 let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
367 let handle = timer.register(alloc_handle, task);
368
369 // Wait for completion notification and verify reason is Expired
370 let (rx, _handle) = handle.into_parts();
371 let result = match rx {
372 task::CompletionReceiver::OneShot(receiver) => receiver.recv().await.unwrap(),
373 _ => panic!("Expected OneShot completion receiver"),
374 };
375 assert_eq!(result, TaskCompletion::Called);
376 }
377
378 #[tokio::test]
379 async fn test_completion_reason_cancelled() {
380 let timer = TimerWheel::with_defaults();
381
382 let alloc_handle = timer.allocate_handle();
383 let task = TimerTask::new_oneshot(Duration::from_secs(10), None);
384 let handle = timer.register(alloc_handle, task);
385
386 // Cancel task
387 let cancelled = handle.cancel();
388 assert!(cancelled);
389
390 // Wait for completion notification and verify reason is Cancelled
391 let (rx, _handle) = handle.into_parts();
392 let result = match rx {
393 task::CompletionReceiver::OneShot(receiver) => receiver.recv().await.unwrap(),
394 _ => panic!("Expected OneShot completion receiver"),
395 };
396 assert_eq!(result, TaskCompletion::Cancelled);
397 }
398
399 #[tokio::test]
400 async fn test_batch_completion_reasons() {
401 let timer = TimerWheel::with_defaults();
402
403 // Step 1: Allocate handles
404 let handles = timer.allocate_handles(5);
405
406 // Step 2: Create 5 tasks with 10 seconds delay
407 let tasks: Vec<_> = (0..5)
408 .map(|_| TimerTask::new_oneshot(Duration::from_secs(10), None))
409 .collect();
410
411 // Step 3: Batch register
412 let batch = timer
413 .register_batch(handles, tasks)
414 .expect("register_batch should succeed");
415 let task_ids: Vec<_> = batch.task_ids().to_vec();
416 let (mut receivers, _batch_handle) = batch.into_parts();
417
418 // Cancel first 3 tasks
419 timer.cancel_batch(&task_ids[0..3]);
420
421 // Verify first 3 tasks received Cancelled notification
422 for rx in receivers.drain(0..3) {
423 let result = match rx {
424 task::CompletionReceiver::OneShot(receiver) => receiver.recv().await.unwrap(),
425 _ => panic!("Expected OneShot completion receiver"),
426 };
427 assert_eq!(result, TaskCompletion::Cancelled);
428 }
429
430 // Cancel remaining tasks and verify
431 timer.cancel_batch(&task_ids[3..5]);
432 for rx in receivers {
433 let result = match rx {
434 task::CompletionReceiver::OneShot(receiver) => receiver.recv().await.unwrap(),
435 _ => panic!("Expected OneShot completion receiver"),
436 };
437 assert_eq!(result, TaskCompletion::Cancelled);
438 }
439 }
440}