kestrel_timer/task.rs
1use std::future::Future;
2use std::num::NonZeroUsize;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use deferred_map::{DefaultKey, Key};
7use lite_sync::oneshot::lite::{Receiver, Sender, State, channel};
8use lite_sync::spsc::{self, TryRecvError};
9
10/// One-shot task completion state constants
11///
12/// 一次性任务完成状态常量
13const ONESHOT_PENDING: u8 = 0;
14const ONESHOT_CALLED: u8 = 1;
15const ONESHOT_CANCELLED: u8 = 2;
16const ONESHOT_CLOSED: u8 = 3;
17
18/// Task Completion Reason for Periodic Tasks
19///
20/// Indicates the reason for task completion, called or cancelled.
21///
22/// 任务完成原因,调用或取消。
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24pub enum TaskCompletion {
25 /// Task was called
26 ///
27 /// 任务被调用
28 Called,
29 /// Task was cancelled
30 ///
31 /// 任务被取消
32 Cancelled,
33}
34
35impl State for TaskCompletion {
36 #[inline]
37 fn to_u8(&self) -> u8 {
38 match self {
39 TaskCompletion::Called => ONESHOT_CALLED,
40 TaskCompletion::Cancelled => ONESHOT_CANCELLED,
41 }
42 }
43
44 #[inline]
45 fn from_u8(value: u8) -> Option<Self> {
46 match value {
47 ONESHOT_CALLED => Some(TaskCompletion::Called),
48 ONESHOT_CANCELLED => Some(TaskCompletion::Cancelled),
49 _ => None,
50 }
51 }
52
53 #[inline]
54 fn pending_value() -> u8 {
55 ONESHOT_PENDING
56 }
57
58 #[inline]
59 fn closed_value() -> u8 {
60 ONESHOT_CLOSED
61 }
62}
63
64/// Unique identifier for timer tasks
65///
66/// Now wraps a DeferredMap key (DefaultKey) which includes generation information
67/// for safe reference and prevention of use-after-free.
68///
69/// 定时器任务唯一标识符
70///
71/// 现在封装 DeferredMap key (DefaultKey),包含代数信息以实现安全引用和防止释放后使用
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
73pub struct TaskId(DefaultKey);
74
75impl TaskId {
76 /// Create TaskId from DeferredMap key (internal use)
77 ///
78 /// 从 DeferredMap key 创建 TaskId (内部使用)
79 #[inline]
80 pub(crate) fn from_key(key: DefaultKey) -> Self {
81 TaskId(key)
82 }
83
84 /// Get the DeferredMap key
85 ///
86 /// 获取 DeferredMap key
87 #[inline]
88 pub(crate) fn key(&self) -> DefaultKey {
89 self.0
90 }
91
92 /// Get the numeric value of the task ID
93 ///
94 /// 获取任务 ID 的数值
95 #[inline]
96 pub fn raw(&self) -> u64 {
97 self.0.raw()
98 }
99}
100
101pub struct TaskHandle {
102 handle: deferred_map::Handle,
103}
104
105impl TaskHandle {
106 /// Create a new task handle
107 ///
108 /// 创建一个新的任务句柄
109 #[inline]
110 pub(crate) fn new(handle: deferred_map::Handle) -> Self {
111 Self { handle }
112 }
113
114 /// Get the task ID
115 ///
116 /// 获取任务 ID
117 #[inline]
118 pub fn task_id(&self) -> TaskId {
119 TaskId::from_key(self.handle.key())
120 }
121
122 /// Convert to deferred map handle
123 ///
124 /// 转换为 deferred map 句柄
125 #[inline]
126 pub(crate) fn into_handle(self) -> deferred_map::Handle {
127 self.handle
128 }
129}
130
131/// Timer Callback Trait
132///
133/// Types implementing this trait can be used as timer callbacks.
134///
135/// 可实现此特性的类型可以作为定时器回调函数。
136///
137/// # Examples (示例)
138///
139/// ```
140/// use kestrel_timer::task::TimerCallback;
141/// use std::future::Future;
142/// use std::pin::Pin;
143///
144/// struct MyCallback;
145///
146/// impl TimerCallback for MyCallback {
147/// fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
148/// Box::pin(async {
149/// println!("Timer callback executed!");
150/// })
151/// }
152/// }
153/// ```
154pub trait TimerCallback: Send + Sync + 'static {
155 /// Execute callback, returns a Future
156 ///
157 /// 执行回调函数,返回一个 Future
158 fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
159}
160
161/// Implement TimerCallback trait for closures
162///
163/// Supports Fn() -> Future closures, can be called multiple times, suitable for periodic tasks
164///
165/// 实现 TimerCallback 特性的类型,支持 Fn() -> Future 闭包,可以多次调用,适合周期性任务
166impl<F, Fut> TimerCallback for F
167where
168 F: Fn() -> Fut + Send + Sync + 'static,
169 Fut: Future<Output = ()> + Send + 'static,
170{
171 fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
172 Box::pin(self())
173 }
174}
175
176/// Callback wrapper for standardized callback creation and management
177///
178/// Callback 包装器,用于标准化回调创建和管理
179///
180/// # Examples (示例)
181///
182/// ```
183/// use kestrel_timer::CallbackWrapper;
184///
185/// let callback = CallbackWrapper::new(|| async {
186/// println!("Timer callback executed!");
187/// });
188/// ```
189#[derive(Clone)]
190pub struct CallbackWrapper {
191 callback: Arc<dyn TimerCallback>,
192}
193
194impl CallbackWrapper {
195 /// Create a new callback wrapper
196 ///
197 /// # Parameters
198 /// - `callback`: Callback object implementing TimerCallback trait
199 ///
200 /// # 创建一个新的回调包装器
201 ///
202 /// # 参数
203 /// - `callback`: 实现 TimerCallback 特性的回调对象
204 ///
205 /// # Examples (示例)
206 ///
207 /// ```
208 /// use kestrel_timer::CallbackWrapper;
209 ///
210 /// let callback = CallbackWrapper::new(|| async {
211 /// println!("Timer fired!"); // 定时器触发
212 /// });
213 /// ```
214 #[inline]
215 pub fn new(callback: impl TimerCallback) -> Self {
216 Self {
217 callback: Arc::new(callback),
218 }
219 }
220
221 /// Call the callback function
222 ///
223 /// 调用回调函数
224 #[inline]
225 pub(crate) fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
226 self.callback.call()
227 }
228}
229
230/// Task type enum to distinguish between one-shot and periodic timers
231///
232/// 任务类型枚举,用于区分一次性和周期性定时器
233#[derive(Clone)]
234pub enum TaskType {
235 /// One-shot timer: executes once and completes
236 ///
237 /// 一次性定时器:执行一次后完成
238 OneShot,
239
240 /// Periodic timer: repeats at fixed intervals
241 ///
242 /// 周期性定时器:按固定间隔重复执行
243 Periodic {
244 /// Period interval for periodic tasks
245 ///
246 /// 周期任务的间隔时间
247 interval: std::time::Duration,
248 /// Buffer size for periodic task completion notifier
249 ///
250 /// 周期性任务完成通知器的缓冲区大小
251 buffer_size: NonZeroUsize,
252 },
253}
254
255/// Task type enum to distinguish between one-shot and periodic timers
256///
257/// 任务类型枚举,用于区分一次性和周期性定时器
258pub enum TaskTypeWithCompletionNotifier {
259 /// One-shot timer: executes once and completes
260 ///
261 /// 一次性定时器:执行一次后完成
262 OneShot {
263 completion_notifier: Sender<TaskCompletion>,
264 },
265
266 /// Periodic timer: repeats at fixed intervals
267 ///
268 /// 周期性定时器:按固定间隔重复执行
269 Periodic {
270 /// Period interval for periodic tasks
271 ///
272 /// 周期任务的间隔时间
273 interval: std::time::Duration,
274 /// Completion notifier for periodic tasks
275 ///
276 /// 周期性任务完成通知器
277 completion_notifier: PeriodicCompletionNotifier,
278 },
279}
280
281impl TaskTypeWithCompletionNotifier {
282 /// Get the interval for periodic tasks
283 ///
284 /// Returns `None` for one-shot tasks
285 ///
286 /// 获取周期任务的间隔时间
287 ///
288 /// 对于一次性任务返回 `None`
289 #[inline]
290 pub fn get_interval(&self) -> Option<std::time::Duration> {
291 match self {
292 TaskTypeWithCompletionNotifier::Periodic { interval, .. } => Some(*interval),
293 TaskTypeWithCompletionNotifier::OneShot { .. } => None,
294 }
295 }
296}
297
298/// Completion notifier for periodic tasks
299///
300/// Uses custom SPSC channel for high-performance, low-latency notification
301///
302/// 周期任务完成通知器
303///
304/// 使用自定义 SPSC 通道实现高性能、低延迟的通知
305pub struct PeriodicCompletionNotifier(pub spsc::Sender<TaskCompletion, 32>);
306
307/// Completion receiver for periodic tasks
308///
309/// 周期任务完成通知接收器
310pub struct PeriodicCompletionReceiver(pub spsc::Receiver<TaskCompletion, 32>);
311
312impl PeriodicCompletionReceiver {
313 /// Try to receive a completion notification
314 ///
315 /// 尝试接收完成通知
316 #[inline]
317 pub fn try_recv(&mut self) -> Result<TaskCompletion, TryRecvError> {
318 self.0.try_recv()
319 }
320
321 /// Receive a completion notification
322 ///
323 /// 接收完成通知
324 #[inline]
325 pub async fn recv(&mut self) -> Option<TaskCompletion> {
326 self.0.recv().await
327 }
328}
329
330/// Completion notifier for one-shot tasks
331///
332/// 一次性任务完成通知器
333pub enum CompletionNotifier {
334 OneShot(Sender<TaskCompletion>),
335 Periodic(PeriodicCompletionNotifier),
336}
337
338/// Completion receiver for one-shot and periodic tasks
339///
340/// 一次性和周期任务完成通知接收器
341pub enum CompletionReceiver {
342 OneShot(Receiver<TaskCompletion>),
343 Periodic(PeriodicCompletionReceiver),
344}
345
346/// Timer Task
347///
348/// Users interact via a two-step API
349/// 1. Create task using `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
350/// 2. Register task using `TimerWheel::register()` or `TimerService::register()`
351///
352/// TaskId is assigned when the task is inserted into the timing wheel.
353///
354/// 定时器任务
355///
356/// 用户通过两步 API 与定时器交互
357/// 1. 使用 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建任务
358/// 2. 使用 `TimerWheel::register()` 或 `TimerService::register()` 注册任务
359///
360/// TaskId 在任务插入到时间轮时分配
361pub struct TimerTask {
362 /// Task type (one-shot or periodic)
363 ///
364 /// 任务类型(一次性或周期性)
365 pub(crate) task_type: TaskType,
366
367 /// User-specified delay duration (initial delay for periodic tasks)
368 ///
369 /// 用户指定的延迟时间(周期任务的初始延迟)
370 pub(crate) delay: std::time::Duration,
371
372 /// Async callback function, optional
373 ///
374 /// 异步回调函数,可选
375 pub(crate) callback: Option<CallbackWrapper>,
376}
377
378impl TimerTask {
379 /// Create a new one-shot timer task
380 ///
381 /// # Parameters
382 /// - `delay`: Delay duration before task execution
383 /// - `callback`: Callback function, optional
384 ///
385 /// # Note
386 /// TaskId will be assigned when the task is inserted into the timing wheel.
387 ///
388 /// 创建一个新的一次性定时器任务
389 ///
390 /// # 参数
391 /// - `delay`: 任务执行前的延迟时间
392 /// - `callback`: 回调函数,可选
393 ///
394 /// # 注意
395 /// TaskId 将在任务插入到时间轮时分配
396 #[inline]
397 pub fn new_oneshot(delay: std::time::Duration, callback: Option<CallbackWrapper>) -> Self {
398 Self {
399 task_type: TaskType::OneShot,
400 delay,
401 callback,
402 }
403 }
404
405 /// Create a new periodic timer task
406 ///
407 /// # Parameters
408 /// - `initial_delay`: Initial delay before first execution
409 /// - `interval`: Interval between subsequent executions
410 /// - `callback`: Callback function, optional
411 ///
412 /// # Note
413 /// TaskId will be assigned when the task is inserted into the timing wheel.
414 ///
415 /// 创建一个新的周期性定时器任务
416 ///
417 /// # 参数
418 /// - `initial_delay`: 首次执行前的初始延迟
419 /// - `interval`: 后续执行之间的间隔
420 /// - `callback`: 回调函数,可选
421 ///
422 /// # 注意
423 /// TaskId 将在任务插入到时间轮时分配
424 #[inline]
425 pub fn new_periodic(
426 initial_delay: std::time::Duration,
427 interval: std::time::Duration,
428 callback: Option<CallbackWrapper>,
429 buffer_size: Option<NonZeroUsize>,
430 ) -> Self {
431 Self {
432 task_type: TaskType::Periodic {
433 interval,
434 buffer_size: buffer_size.unwrap_or(NonZeroUsize::new(32).unwrap()),
435 },
436 delay: initial_delay,
437 callback,
438 }
439 }
440
441 /// Get task type
442 ///
443 /// 获取任务类型
444 #[inline]
445 pub fn get_task_type(&self) -> &TaskType {
446 &self.task_type
447 }
448
449 /// Get the interval for periodic tasks
450 ///
451 /// Returns `None` for one-shot tasks
452 ///
453 /// 获取周期任务的间隔时间
454 ///
455 /// 对于一次性任务返回 `None`
456 #[inline]
457 pub fn get_interval(&self) -> Option<std::time::Duration> {
458 match self.task_type {
459 TaskType::Periodic { interval, .. } => Some(interval),
460 TaskType::OneShot => None,
461 }
462 }
463}
464
465/// Timer Task
466///
467/// Users interact via a two-step API
468/// 1. Create task using `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
469/// 2. Register task using `TimerWheel::register()` or `TimerService::register()`
470///
471/// 定时器任务
472///
473/// 用户通过两步 API 与定时器交互
474/// 1. 使用 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建任务
475/// 2. 使用 `TimerWheel::register()` 或 `TimerService::register()` 注册任务
476pub struct TimerTaskWithCompletionNotifier {
477 /// Task type (one-shot or periodic)
478 ///
479 /// 任务类型(一次性或周期性)
480 pub(crate) task_type: TaskTypeWithCompletionNotifier,
481
482 /// User-specified delay duration (initial delay for periodic tasks)
483 ///
484 /// 用户指定的延迟时间(周期任务的初始延迟)
485 pub(crate) delay: std::time::Duration,
486
487 /// Async callback function, optional
488 ///
489 /// 异步回调函数,可选
490 pub(crate) callback: Option<CallbackWrapper>,
491}
492
493impl TimerTaskWithCompletionNotifier {
494 /// Create a new timer task with completion notifier from a timer task
495 ///
496 /// TaskId will be assigned later when inserted into the timing wheel
497 ///
498 /// 从定时器任务创建一个新的定时器任务完成通知器
499 ///
500 /// TaskId 将在插入到时间轮时分配
501 ///
502 /// # Parameters
503 /// - `task`: The timer task to create from
504 ///
505 /// # Returns
506 /// A tuple containing the new timer task with completion notifier and the completion receiver
507 ///
508 /// 返回一个包含新的定时器任务完成通知器和完成通知接收器的元组
509 ///
510 pub fn from_timer_task(task: TimerTask) -> (Self, CompletionReceiver) {
511 match task.task_type {
512 TaskType::OneShot => {
513 // Create oneshot notifier and receiver with optimized single Arc allocation
514 // 创建 oneshot 通知器和接收器,使用优化的单个 Arc 分配
515 let (notifier, receiver) = channel();
516
517 (
518 Self {
519 task_type: TaskTypeWithCompletionNotifier::OneShot {
520 completion_notifier: notifier,
521 },
522 delay: task.delay,
523 callback: task.callback,
524 },
525 CompletionReceiver::OneShot(receiver),
526 )
527 }
528 TaskType::Periodic {
529 interval,
530 buffer_size,
531 } => {
532 // Use custom SPSC channel for high-performance periodic notification
533 // 使用自定义 SPSC 通道实现高性能周期通知
534 let (tx, rx) = spsc::channel(buffer_size);
535
536 let notifier = PeriodicCompletionNotifier(tx);
537 let receiver = PeriodicCompletionReceiver(rx);
538
539 (
540 Self {
541 task_type: TaskTypeWithCompletionNotifier::Periodic {
542 interval,
543 completion_notifier: notifier,
544 },
545 delay: task.delay,
546 callback: task.callback,
547 },
548 CompletionReceiver::Periodic(receiver),
549 )
550 }
551 }
552 }
553
554 /// Into task type
555 ///
556 /// 将任务类型转换为完成通知器
557 #[inline]
558 pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
559 self.task_type
560 }
561
562 /// Get the interval for periodic tasks
563 ///
564 /// Returns `None` for one-shot tasks
565 ///
566 /// 获取周期任务的间隔时间
567 ///
568 /// 对于一次性任务返回 `None`
569 #[inline]
570 pub fn get_interval(&self) -> Option<std::time::Duration> {
571 self.task_type.get_interval()
572 }
573}
574
575pub(crate) struct TimerTaskForWheel {
576 pub(crate) task_id: TaskId,
577 pub(crate) task: TimerTaskWithCompletionNotifier,
578 pub(crate) deadline_tick: u64,
579 pub(crate) rounds: u32,
580}
581
582impl TimerTaskForWheel {
583 /// Create a new timer task for wheel with assigned TaskId
584 ///
585 /// 使用分配的 TaskId 创建一个新的定时器任务用于时间轮
586 ///
587 /// # Parameters
588 /// - `task_id`: The TaskId assigned by DeferredMap
589 /// - `task`: The timer task to create from
590 /// - `deadline_tick`: The deadline tick for the task
591 /// - `rounds`: The rounds for the task
592 ///
593 /// # Returns
594 /// A new timer task for wheel
595 ///
596 /// 返回一个新的定时器任务用于时间轮
597 ///
598 #[inline]
599 pub(crate) fn new_with_id(
600 task_id: TaskId,
601 task: TimerTaskWithCompletionNotifier,
602 deadline_tick: u64,
603 rounds: u32,
604 ) -> Self {
605 Self {
606 task_id,
607 task,
608 deadline_tick,
609 rounds,
610 }
611 }
612
613 /// Get task ID
614 ///
615 /// 获取任务 ID
616 #[inline]
617 pub fn get_id(&self) -> TaskId {
618 self.task_id
619 }
620
621 /// Into task type
622 ///
623 /// 将任务类型转换为完成通知器
624 #[inline]
625 pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
626 self.task.into_task_type()
627 }
628
629 /// Update the delay of the timer task
630 ///
631 /// 更新定时器任务的延迟
632 ///
633 /// # Parameters
634 /// - `delay`: The new delay for the task
635 ///
636 #[inline]
637 pub fn update_delay(&mut self, delay: std::time::Duration) {
638 self.task.delay = delay
639 }
640
641 /// Update the callback of the timer task
642 ///
643 /// 更新定时器任务的回调
644 ///
645 /// # Parameters
646 /// - `callback`: The new callback for the task
647 ///
648 #[inline]
649 pub fn update_callback(&mut self, callback: CallbackWrapper) {
650 self.task.callback = Some(callback)
651 }
652}
653
654/// Task location information (including level) for hierarchical timing wheel
655///
656/// Memory layout optimization: level field placed first to reduce padding via struct alignment
657///
658/// 任务位置信息(包括层级)用于分层时间轮
659///
660/// 内存布局优化:将 level 字段放在第一位,通过结构体对齐来减少填充
661#[derive(Debug, Clone, Copy)]
662pub(crate) struct TaskLocation {
663 /// Slot index
664 ///
665 /// 槽索引
666 pub slot_index: usize,
667 /// Index position of task in slot Vec for O(1) cancellation
668 ///
669 /// 槽向量中任务的索引位置,用于 O(1) 取消
670 pub vec_index: usize,
671 /// Level: 0 = L0 (bottom layer), 1 = L1 (upper layer)
672 /// Using u8 instead of bool to reserve space for potential multi-layer expansion
673 ///
674 /// 层级:0 = L0(底层),1 = L1(上层)
675 /// 使用 u8 而不是 bool 来保留空间,用于潜在的多层扩展
676 pub level: u8,
677}
678
679impl TaskLocation {
680 /// Create a new task location information
681 ///
682 /// 创建一个新的任务位置信息
683 #[inline(always)]
684 pub fn new(level: u8, slot_index: usize, vec_index: usize) -> Self {
685 Self {
686 slot_index,
687 vec_index,
688 level,
689 }
690 }
691}