kestrel_timer/task.rs
1use std::future::Future;
2use std::num::NonZeroUsize;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use lite_sync::oneshot::{channel, Sender, Receiver};
7use lite_sync::spsc::{self, TryRecvError};
8
9/// One-shot task completion state constants
10///
11/// 一次性任务完成状态常量
12const ONESHOT_PENDING: u8 = 0;
13const ONESHOT_CALLED: u8 = 1;
14const ONESHOT_CANCELLED: u8 = 2;
15
16/// Task Completion Reason for Periodic Tasks
17///
18/// Indicates the reason for task completion, called or cancelled.
19///
20/// 任务完成原因,调用或取消。
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub enum TaskCompletion {
23 /// Task was called
24 ///
25 /// 任务被调用
26 Called,
27 /// Task was cancelled
28 ///
29 /// 任务被取消
30 Cancelled,
31}
32
33impl lite_sync::oneshot::State for TaskCompletion {
34 #[inline]
35 fn to_u8(&self) -> u8 {
36 match self {
37 TaskCompletion::Called => ONESHOT_CALLED,
38 TaskCompletion::Cancelled => ONESHOT_CANCELLED,
39 }
40 }
41
42 #[inline]
43 fn from_u8(value: u8) -> Option<Self> {
44 match value {
45 ONESHOT_CALLED => Some(TaskCompletion::Called),
46 ONESHOT_CANCELLED => Some(TaskCompletion::Cancelled),
47 _ => None,
48 }
49 }
50
51 #[inline]
52 fn pending_value() -> u8 {
53 ONESHOT_PENDING
54 }
55}
56
57
58
59/// Unique identifier for timer tasks
60///
61/// Now wraps a DeferredMap key (u64) which includes generation information
62/// for safe reference and prevention of use-after-free.
63///
64/// 定时器任务唯一标识符
65///
66/// 现在封装 DeferredMap key (u64),包含代数信息以实现安全引用和防止释放后使用
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68pub struct TaskId(u64);
69
70impl TaskId {
71 /// Create TaskId from DeferredMap key (internal use)
72 ///
73 /// 从 DeferredMap key 创建 TaskId (内部使用)
74 #[inline]
75 pub(crate) fn from_key(key: u64) -> Self {
76 TaskId(key)
77 }
78
79 /// Get the DeferredMap key
80 ///
81 /// 获取 DeferredMap key
82 #[inline]
83 pub(crate) fn key(&self) -> u64 {
84 self.0
85 }
86
87 /// Get the numeric value of the task ID
88 ///
89 /// 获取任务 ID 的数值
90 #[inline]
91 pub fn as_u64(&self) -> u64 {
92 self.0
93 }
94}
95
96pub struct TaskHandle {
97 handle: deferred_map::Handle,
98}
99
100impl TaskHandle {
101 /// Create a new task handle
102 ///
103 /// 创建一个新的任务句柄
104 #[inline]
105 pub(crate) fn new(handle: deferred_map::Handle) -> Self {
106 Self { handle }
107 }
108
109 /// Get the task ID
110 ///
111 /// 获取任务 ID
112 #[inline]
113 pub fn task_id(&self) -> TaskId {
114 TaskId::from_key(self.handle.key())
115 }
116
117 /// Convert to deferred map handle
118 ///
119 /// 转换为 deferred map 句柄
120 #[inline]
121 pub(crate) fn into_handle(self) -> deferred_map::Handle {
122 self.handle
123 }
124}
125
126/// Timer Callback Trait
127///
128/// Types implementing this trait can be used as timer callbacks.
129///
130/// 可实现此特性的类型可以作为定时器回调函数。
131///
132/// # Examples (示例)
133///
134/// ```
135/// use kestrel_timer::task::TimerCallback;
136/// use std::future::Future;
137/// use std::pin::Pin;
138///
139/// struct MyCallback;
140///
141/// impl TimerCallback for MyCallback {
142/// fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
143/// Box::pin(async {
144/// println!("Timer callback executed!");
145/// })
146/// }
147/// }
148/// ```
149pub trait TimerCallback: Send + Sync + 'static {
150 /// Execute callback, returns a Future
151 ///
152 /// 执行回调函数,返回一个 Future
153 fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
154}
155
156/// Implement TimerCallback trait for closures
157///
158/// Supports Fn() -> Future closures, can be called multiple times, suitable for periodic tasks
159///
160/// 实现 TimerCallback 特性的类型,支持 Fn() -> Future 闭包,可以多次调用,适合周期性任务
161impl<F, Fut> TimerCallback for F
162where
163 F: Fn() -> Fut + Send + Sync + 'static,
164 Fut: Future<Output = ()> + Send + 'static,
165{
166 fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
167 Box::pin(self())
168 }
169}
170
171/// Callback wrapper for standardized callback creation and management
172///
173/// Callback 包装器,用于标准化回调创建和管理
174///
175/// # Examples (示例)
176///
177/// ```
178/// use kestrel_timer::CallbackWrapper;
179///
180/// let callback = CallbackWrapper::new(|| async {
181/// println!("Timer callback executed!");
182/// });
183/// ```
184#[derive(Clone)]
185pub struct CallbackWrapper {
186 callback: Arc<dyn TimerCallback>,
187}
188
189impl CallbackWrapper {
190 /// Create a new callback wrapper
191 ///
192 /// # Parameters
193 /// - `callback`: Callback object implementing TimerCallback trait
194 ///
195 /// # 创建一个新的回调包装器
196 ///
197 /// # 参数
198 /// - `callback`: 实现 TimerCallback 特性的回调对象
199 ///
200 /// # Examples (示例)
201 ///
202 /// ```
203 /// use kestrel_timer::CallbackWrapper;
204 ///
205 /// let callback = CallbackWrapper::new(|| async {
206 /// println!("Timer fired!"); // 定时器触发
207 /// });
208 /// ```
209 #[inline]
210 pub fn new(callback: impl TimerCallback) -> Self {
211 Self {
212 callback: Arc::new(callback),
213 }
214 }
215
216 /// Call the callback function
217 ///
218 /// 调用回调函数
219 #[inline]
220 pub(crate) fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
221 self.callback.call()
222 }
223}
224
225/// Task type enum to distinguish between one-shot and periodic timers
226///
227/// 任务类型枚举,用于区分一次性和周期性定时器
228#[derive(Clone)]
229pub enum TaskType {
230 /// One-shot timer: executes once and completes
231 ///
232 /// 一次性定时器:执行一次后完成
233 OneShot,
234
235 /// Periodic timer: repeats at fixed intervals
236 ///
237 /// 周期性定时器:按固定间隔重复执行
238 Periodic {
239 /// Period interval for periodic tasks
240 ///
241 /// 周期任务的间隔时间
242 interval: std::time::Duration,
243 /// Buffer size for periodic task completion notifier
244 ///
245 /// 周期性任务完成通知器的缓冲区大小
246 buffer_size: NonZeroUsize,
247 },
248}
249
250/// Task type enum to distinguish between one-shot and periodic timers
251///
252/// 任务类型枚举,用于区分一次性和周期性定时器
253pub enum TaskTypeWithCompletionNotifier {
254 /// One-shot timer: executes once and completes
255 ///
256 /// 一次性定时器:执行一次后完成
257 OneShot {
258 completion_notifier: Sender<TaskCompletion>,
259 },
260
261 /// Periodic timer: repeats at fixed intervals
262 ///
263 /// 周期性定时器:按固定间隔重复执行
264 Periodic {
265 /// Period interval for periodic tasks
266 ///
267 /// 周期任务的间隔时间
268 interval: std::time::Duration,
269 /// Completion notifier for periodic tasks
270 ///
271 /// 周期性任务完成通知器
272 completion_notifier: PeriodicCompletionNotifier,
273 },
274}
275
276impl TaskTypeWithCompletionNotifier {
277 /// Get the interval for periodic tasks
278 ///
279 /// Returns `None` for one-shot tasks
280 ///
281 /// 获取周期任务的间隔时间
282 ///
283 /// 对于一次性任务返回 `None`
284 #[inline]
285 pub fn get_interval(&self) -> Option<std::time::Duration> {
286 match self {
287 TaskTypeWithCompletionNotifier::Periodic { interval, .. } => Some(*interval),
288 TaskTypeWithCompletionNotifier::OneShot { .. } => None,
289 }
290 }
291}
292
293
294/// Completion notifier for periodic tasks
295///
296/// Uses custom SPSC channel for high-performance, low-latency notification
297///
298/// 周期任务完成通知器
299///
300/// 使用自定义 SPSC 通道实现高性能、低延迟的通知
301pub struct PeriodicCompletionNotifier(pub spsc::Sender<TaskCompletion, 32>);
302
303/// Completion receiver for periodic tasks
304///
305/// 周期任务完成通知接收器
306pub struct PeriodicCompletionReceiver(pub spsc::Receiver<TaskCompletion, 32>);
307
308impl PeriodicCompletionReceiver {
309 /// Try to receive a completion notification
310 ///
311 /// 尝试接收完成通知
312 #[inline]
313 pub fn try_recv(&mut self) -> Result<TaskCompletion, TryRecvError> {
314 self.0.try_recv()
315 }
316
317 /// Receive a completion notification
318 ///
319 /// 接收完成通知
320 #[inline]
321 pub async fn recv(&mut self) -> Option<TaskCompletion> {
322 self.0.recv().await
323 }
324}
325
326
327/// Completion notifier for one-shot tasks
328///
329/// 一次性任务完成通知器
330pub enum CompletionNotifier {
331 OneShot(Sender<TaskCompletion>),
332 Periodic(PeriodicCompletionNotifier),
333}
334
335/// Completion receiver for one-shot and periodic tasks
336///
337/// 一次性和周期任务完成通知接收器
338pub enum CompletionReceiver {
339 OneShot(Receiver<TaskCompletion>),
340 Periodic(PeriodicCompletionReceiver),
341}
342
343/// Timer Task
344///
345/// Users interact via a two-step API
346/// 1. Create task using `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
347/// 2. Register task using `TimerWheel::register()` or `TimerService::register()`
348///
349/// TaskId is assigned when the task is inserted into the timing wheel.
350///
351/// 定时器任务
352///
353/// 用户通过两步 API 与定时器交互
354/// 1. 使用 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建任务
355/// 2. 使用 `TimerWheel::register()` 或 `TimerService::register()` 注册任务
356///
357/// TaskId 在任务插入到时间轮时分配
358pub struct TimerTask {
359 /// Task type (one-shot or periodic)
360 ///
361 /// 任务类型(一次性或周期性)
362 pub(crate) task_type: TaskType,
363
364 /// User-specified delay duration (initial delay for periodic tasks)
365 ///
366 /// 用户指定的延迟时间(周期任务的初始延迟)
367 pub(crate) delay: std::time::Duration,
368
369 /// Async callback function, optional
370 ///
371 /// 异步回调函数,可选
372 pub(crate) callback: Option<CallbackWrapper>,
373}
374
375impl TimerTask {
376 /// Create a new one-shot timer task
377 ///
378 /// # Parameters
379 /// - `delay`: Delay duration before task execution
380 /// - `callback`: Callback function, optional
381 ///
382 /// # Note
383 /// TaskId will be assigned when the task is inserted into the timing wheel.
384 ///
385 /// 创建一个新的一次性定时器任务
386 ///
387 /// # 参数
388 /// - `delay`: 任务执行前的延迟时间
389 /// - `callback`: 回调函数,可选
390 ///
391 /// # 注意
392 /// TaskId 将在任务插入到时间轮时分配
393 #[inline]
394 pub fn new_oneshot(delay: std::time::Duration, callback: Option<CallbackWrapper>) -> Self {
395 Self {
396 task_type: TaskType::OneShot,
397 delay,
398 callback,
399 }
400 }
401
402 /// Create a new periodic timer task
403 ///
404 /// # Parameters
405 /// - `initial_delay`: Initial delay before first execution
406 /// - `interval`: Interval between subsequent executions
407 /// - `callback`: Callback function, optional
408 ///
409 /// # Note
410 /// TaskId will be assigned when the task is inserted into the timing wheel.
411 ///
412 /// 创建一个新的周期性定时器任务
413 ///
414 /// # 参数
415 /// - `initial_delay`: 首次执行前的初始延迟
416 /// - `interval`: 后续执行之间的间隔
417 /// - `callback`: 回调函数,可选
418 ///
419 /// # 注意
420 /// TaskId 将在任务插入到时间轮时分配
421 #[inline]
422 pub fn new_periodic(
423 initial_delay: std::time::Duration,
424 interval: std::time::Duration,
425 callback: Option<CallbackWrapper>,
426 buffer_size: Option<NonZeroUsize>,
427 ) -> Self {
428 Self {
429 task_type: TaskType::Periodic { interval, buffer_size: buffer_size.unwrap_or(NonZeroUsize::new(32).unwrap()) },
430 delay: initial_delay,
431 callback,
432 }
433 }
434
435 /// Get task type
436 ///
437 /// 获取任务类型
438 #[inline]
439 pub fn get_task_type(&self) -> &TaskType {
440 &self.task_type
441 }
442
443 /// Get the interval for periodic tasks
444 ///
445 /// Returns `None` for one-shot tasks
446 ///
447 /// 获取周期任务的间隔时间
448 ///
449 /// 对于一次性任务返回 `None`
450 #[inline]
451 pub fn get_interval(&self) -> Option<std::time::Duration> {
452 match self.task_type {
453 TaskType::Periodic { interval, .. } => Some(interval),
454 TaskType::OneShot => None,
455 }
456 }
457
458}
459
460/// Timer Task
461///
462/// Users interact via a two-step API
463/// 1. Create task using `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
464/// 2. Register task using `TimerWheel::register()` or `TimerService::register()`
465///
466/// 定时器任务
467///
468/// 用户通过两步 API 与定时器交互
469/// 1. 使用 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建任务
470/// 2. 使用 `TimerWheel::register()` 或 `TimerService::register()` 注册任务
471pub struct TimerTaskWithCompletionNotifier {
472 /// Task type (one-shot or periodic)
473 ///
474 /// 任务类型(一次性或周期性)
475 pub(crate) task_type: TaskTypeWithCompletionNotifier,
476
477 /// User-specified delay duration (initial delay for periodic tasks)
478 ///
479 /// 用户指定的延迟时间(周期任务的初始延迟)
480 pub(crate) delay: std::time::Duration,
481
482 /// Async callback function, optional
483 ///
484 /// 异步回调函数,可选
485 pub(crate) callback: Option<CallbackWrapper>,
486}
487
488impl TimerTaskWithCompletionNotifier {
489
490 /// Create a new timer task with completion notifier from a timer task
491 ///
492 /// TaskId will be assigned later when inserted into the timing wheel
493 ///
494 /// 从定时器任务创建一个新的定时器任务完成通知器
495 ///
496 /// TaskId 将在插入到时间轮时分配
497 ///
498 /// # Parameters
499 /// - `task`: The timer task to create from
500 ///
501 /// # Returns
502 /// A tuple containing the new timer task with completion notifier and the completion receiver
503 ///
504 /// 返回一个包含新的定时器任务完成通知器和完成通知接收器的元组
505 ///
506 pub fn from_timer_task(task: TimerTask) -> (Self, CompletionReceiver) {
507 match task.task_type {
508 TaskType::OneShot => {
509 // Create oneshot notifier and receiver with optimized single Arc allocation
510 // 创建 oneshot 通知器和接收器,使用优化的单个 Arc 分配
511 let (notifier, receiver) = channel();
512
513 (Self {
514 task_type: TaskTypeWithCompletionNotifier::OneShot {
515 completion_notifier: notifier
516 },
517 delay: task.delay,
518 callback: task.callback,
519 }, CompletionReceiver::OneShot(receiver))
520 },
521 TaskType::Periodic { interval, buffer_size } => {
522 // Use custom SPSC channel for high-performance periodic notification
523 // 使用自定义 SPSC 通道实现高性能周期通知
524 let (tx, rx) = spsc::channel(buffer_size);
525
526 let notifier = PeriodicCompletionNotifier(tx);
527 let receiver = PeriodicCompletionReceiver(rx);
528
529 (Self {
530 task_type: TaskTypeWithCompletionNotifier::Periodic {
531 interval,
532 completion_notifier: notifier
533 },
534 delay: task.delay,
535 callback: task.callback,
536 }, CompletionReceiver::Periodic(receiver))
537 },
538 }
539 }
540
541 /// Into task type
542 ///
543 /// 将任务类型转换为完成通知器
544 #[inline]
545 pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
546 self.task_type
547 }
548
549 /// Get the interval for periodic tasks
550 ///
551 /// Returns `None` for one-shot tasks
552 ///
553 /// 获取周期任务的间隔时间
554 ///
555 /// 对于一次性任务返回 `None`
556 #[inline]
557 pub fn get_interval(&self) -> Option<std::time::Duration> {
558 self.task_type.get_interval()
559 }
560}
561
562pub(crate) struct TimerTaskForWheel {
563 pub(crate) task_id: TaskId,
564 pub(crate) task: TimerTaskWithCompletionNotifier,
565 pub(crate) deadline_tick: u64,
566 pub(crate) rounds: u32,
567}
568
569impl TimerTaskForWheel {
570 /// Create a new timer task for wheel with assigned TaskId
571 ///
572 /// 使用分配的 TaskId 创建一个新的定时器任务用于时间轮
573 ///
574 /// # Parameters
575 /// - `task_id`: The TaskId assigned by DeferredMap
576 /// - `task`: The timer task to create from
577 /// - `deadline_tick`: The deadline tick for the task
578 /// - `rounds`: The rounds for the task
579 ///
580 /// # Returns
581 /// A new timer task for wheel
582 ///
583 /// 返回一个新的定时器任务用于时间轮
584 ///
585 #[inline]
586 pub(crate) fn new_with_id(
587 task_id: TaskId,
588 task: TimerTaskWithCompletionNotifier,
589 deadline_tick: u64,
590 rounds: u32
591 ) -> Self {
592 Self { task_id, task, deadline_tick, rounds }
593 }
594
595 /// Get task ID
596 ///
597 /// 获取任务 ID
598 #[inline]
599 pub fn get_id(&self) -> TaskId {
600 self.task_id
601 }
602
603 /// Into task type
604 ///
605 /// 将任务类型转换为完成通知器
606 #[inline]
607 pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
608 self.task.into_task_type()
609 }
610
611 /// Update the delay of the timer task
612 ///
613 /// 更新定时器任务的延迟
614 ///
615 /// # Parameters
616 /// - `delay`: The new delay for the task
617 ///
618 #[inline]
619 pub fn update_delay(&mut self, delay: std::time::Duration) {
620 self.task.delay = delay
621 }
622
623 /// Update the callback of the timer task
624 ///
625 /// 更新定时器任务的回调
626 ///
627 /// # Parameters
628 /// - `callback`: The new callback for the task
629 ///
630 #[inline]
631 pub fn update_callback(&mut self, callback: CallbackWrapper) {
632 self.task.callback = Some(callback)
633 }
634}
635
636/// Task location information (including level) for hierarchical timing wheel
637///
638/// Memory layout optimization: level field placed first to reduce padding via struct alignment
639///
640/// 任务位置信息(包括层级)用于分层时间轮
641///
642/// 内存布局优化:将 level 字段放在第一位,通过结构体对齐来减少填充
643#[derive(Debug, Clone, Copy)]
644pub(crate) struct TaskLocation {
645 /// Slot index
646 ///
647 /// 槽索引
648 pub slot_index: usize,
649 /// Index position of task in slot Vec for O(1) cancellation
650 ///
651 /// 槽向量中任务的索引位置,用于 O(1) 取消
652 pub vec_index: usize,
653 /// Level: 0 = L0 (bottom layer), 1 = L1 (upper layer)
654 /// Using u8 instead of bool to reserve space for potential multi-layer expansion
655 ///
656 /// 层级:0 = L0(底层),1 = L1(上层)
657 /// 使用 u8 而不是 bool 来保留空间,用于潜在的多层扩展
658 pub level: u8,
659}
660
661impl TaskLocation {
662 /// Create a new task location information
663 ///
664 /// 创建一个新的任务位置信息
665 #[inline(always)]
666 pub fn new(level: u8, slot_index: usize, vec_index: usize) -> Self {
667 Self {
668 slot_index,
669 vec_index,
670 level,
671 }
672 }
673}
674