kestrel_timer/
task.rs

1use std::future::Future;
2use std::num::NonZeroUsize;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use deferred_map::{DefaultKey, Key};
7use lite_sync::oneshot::lite::{Receiver, Sender, State, channel};
8use lite_sync::spsc::{self, TryRecvError};
9
10/// One-shot task completion state constants
11///
12/// 一次性任务完成状态常量
13const ONESHOT_PENDING: u8 = 0;
14const ONESHOT_CALLED: u8 = 1;
15const ONESHOT_CANCELLED: u8 = 2;
16const ONESHOT_CLOSED: u8 = 3;
17
18/// Task Completion Reason for Periodic Tasks
19///
20/// Indicates the reason for task completion, called or cancelled.
21///
22/// 任务完成原因,调用或取消。
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24pub enum TaskCompletion {
25    /// Task was called
26    ///
27    /// 任务被调用
28    Called,
29    /// Task was cancelled
30    ///
31    /// 任务被取消
32    Cancelled,
33}
34
35impl State for TaskCompletion {
36    #[inline]
37    fn to_u8(&self) -> u8 {
38        match self {
39            TaskCompletion::Called => ONESHOT_CALLED,
40            TaskCompletion::Cancelled => ONESHOT_CANCELLED,
41        }
42    }
43
44    #[inline]
45    fn from_u8(value: u8) -> Option<Self> {
46        match value {
47            ONESHOT_CALLED => Some(TaskCompletion::Called),
48            ONESHOT_CANCELLED => Some(TaskCompletion::Cancelled),
49            _ => None,
50        }
51    }
52
53    #[inline]
54    fn pending_value() -> u8 {
55        ONESHOT_PENDING
56    }
57
58    #[inline]
59    fn closed_value() -> u8 {
60        ONESHOT_CLOSED
61    }
62}
63
64/// Unique identifier for timer tasks
65///
66/// Now wraps a DeferredMap key (DefaultKey) which includes generation information
67/// for safe reference and prevention of use-after-free.
68///
69/// 定时器任务唯一标识符
70///
71/// 现在封装 DeferredMap key (DefaultKey),包含代数信息以实现安全引用和防止释放后使用
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
73pub struct TaskId(DefaultKey);
74
75impl TaskId {
76    /// Create TaskId from DeferredMap key (internal use)
77    ///
78    /// 从 DeferredMap key 创建 TaskId (内部使用)
79    #[inline]
80    pub(crate) fn from_key(key: DefaultKey) -> Self {
81        TaskId(key)
82    }
83
84    /// Get the DeferredMap key
85    ///
86    /// 获取 DeferredMap key
87    #[inline]
88    pub(crate) fn key(&self) -> DefaultKey {
89        self.0
90    }
91
92    /// Get the numeric value of the task ID
93    ///
94    /// 获取任务 ID 的数值
95    #[inline]
96    pub fn raw(&self) -> u64 {
97        self.0.raw()
98    }
99}
100
101pub struct TaskHandle {
102    handle: deferred_map::Handle,
103}
104
105impl TaskHandle {
106    /// Create a new task handle
107    ///
108    /// 创建一个新的任务句柄
109    #[inline]
110    pub(crate) fn new(handle: deferred_map::Handle) -> Self {
111        Self { handle }
112    }
113
114    /// Get the task ID
115    ///
116    /// 获取任务 ID
117    #[inline]
118    pub fn task_id(&self) -> TaskId {
119        TaskId::from_key(self.handle.key())
120    }
121
122    /// Convert to deferred map handle
123    ///
124    /// 转换为 deferred map 句柄
125    #[inline]
126    pub(crate) fn into_handle(self) -> deferred_map::Handle {
127        self.handle
128    }
129}
130
131/// Timer Callback Trait
132///
133/// Types implementing this trait can be used as timer callbacks.
134///
135/// 可实现此特性的类型可以作为定时器回调函数。
136///
137/// # Examples (示例)
138///
139/// ```
140/// use kestrel_timer::task::TimerCallback;
141/// use std::future::Future;
142/// use std::pin::Pin;
143///
144/// struct MyCallback;
145///
146/// impl TimerCallback for MyCallback {
147///     fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
148///         Box::pin(async {
149///             println!("Timer callback executed!");
150///         })
151///     }
152/// }
153/// ```
154pub trait TimerCallback: Send + Sync + 'static {
155    /// Execute callback, returns a Future
156    ///
157    /// 执行回调函数,返回一个 Future
158    fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
159}
160
161/// Implement TimerCallback trait for closures
162///
163/// Supports Fn() -> Future closures, can be called multiple times, suitable for periodic tasks
164///
165/// 实现 TimerCallback 特性的类型,支持 Fn() -> Future 闭包,可以多次调用,适合周期性任务
166impl<F, Fut> TimerCallback for F
167where
168    F: Fn() -> Fut + Send + Sync + 'static,
169    Fut: Future<Output = ()> + Send + 'static,
170{
171    fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
172        Box::pin(self())
173    }
174}
175
176/// Callback wrapper for standardized callback creation and management
177///
178/// Callback 包装器,用于标准化回调创建和管理
179///
180/// # Examples (示例)
181///
182/// ```
183/// use kestrel_timer::CallbackWrapper;
184///
185/// let callback = CallbackWrapper::new(|| async {
186///     println!("Timer callback executed!");
187/// });
188/// ```
189#[derive(Clone)]
190pub struct CallbackWrapper {
191    callback: Arc<dyn TimerCallback>,
192}
193
194impl CallbackWrapper {
195    /// Create a new callback wrapper
196    ///
197    /// # Parameters
198    /// - `callback`: Callback object implementing TimerCallback trait
199    ///
200    /// # 创建一个新的回调包装器
201    ///
202    /// # 参数
203    /// - `callback`: 实现 TimerCallback 特性的回调对象
204    ///
205    /// # Examples (示例)
206    ///
207    /// ```
208    /// use kestrel_timer::CallbackWrapper;
209    ///
210    /// let callback = CallbackWrapper::new(|| async {
211    ///     println!("Timer fired!"); // 定时器触发
212    /// });
213    /// ```
214    #[inline]
215    pub fn new(callback: impl TimerCallback) -> Self {
216        Self {
217            callback: Arc::new(callback),
218        }
219    }
220
221    /// Call the callback function
222    ///
223    /// 调用回调函数
224    #[inline]
225    pub(crate) fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
226        self.callback.call()
227    }
228}
229
230/// Task type enum to distinguish between one-shot and periodic timers
231///
232/// 任务类型枚举,用于区分一次性和周期性定时器
233#[derive(Clone)]
234pub enum TaskType {
235    /// One-shot timer: executes once and completes
236    ///
237    /// 一次性定时器:执行一次后完成
238    OneShot,
239
240    /// Periodic timer: repeats at fixed intervals
241    ///
242    /// 周期性定时器:按固定间隔重复执行
243    Periodic {
244        /// Period interval for periodic tasks
245        ///
246        /// 周期任务的间隔时间
247        interval: std::time::Duration,
248        /// Buffer size for periodic task completion notifier
249        ///
250        /// 周期性任务完成通知器的缓冲区大小
251        buffer_size: NonZeroUsize,
252    },
253}
254
255/// Task type enum to distinguish between one-shot and periodic timers
256///
257/// 任务类型枚举,用于区分一次性和周期性定时器
258pub enum TaskTypeWithCompletionNotifier {
259    /// One-shot timer: executes once and completes
260    ///
261    /// 一次性定时器:执行一次后完成
262    OneShot {
263        completion_notifier: Sender<TaskCompletion>,
264    },
265
266    /// Periodic timer: repeats at fixed intervals
267    ///
268    /// 周期性定时器:按固定间隔重复执行
269    Periodic {
270        /// Period interval for periodic tasks
271        ///
272        /// 周期任务的间隔时间
273        interval: std::time::Duration,
274        /// Completion notifier for periodic tasks
275        ///
276        /// 周期性任务完成通知器
277        completion_notifier: PeriodicCompletionNotifier,
278    },
279}
280
281impl TaskTypeWithCompletionNotifier {
282    /// Get the interval for periodic tasks
283    ///
284    /// Returns `None` for one-shot tasks
285    ///
286    /// 获取周期任务的间隔时间
287    ///
288    /// 对于一次性任务返回 `None`
289    #[inline]
290    pub fn get_interval(&self) -> Option<std::time::Duration> {
291        match self {
292            TaskTypeWithCompletionNotifier::Periodic { interval, .. } => Some(*interval),
293            TaskTypeWithCompletionNotifier::OneShot { .. } => None,
294        }
295    }
296}
297
298/// Completion notifier for periodic tasks
299///
300/// Uses custom SPSC channel for high-performance, low-latency notification
301///
302/// 周期任务完成通知器
303///
304/// 使用自定义 SPSC 通道实现高性能、低延迟的通知
305pub struct PeriodicCompletionNotifier(pub spsc::Sender<TaskCompletion, 32>);
306
307/// Completion receiver for periodic tasks
308///
309/// 周期任务完成通知接收器
310pub struct PeriodicCompletionReceiver(pub spsc::Receiver<TaskCompletion, 32>);
311
312impl PeriodicCompletionReceiver {
313    /// Try to receive a completion notification
314    ///
315    /// 尝试接收完成通知
316    #[inline]
317    pub fn try_recv(&mut self) -> Result<TaskCompletion, TryRecvError> {
318        self.0.try_recv()
319    }
320
321    /// Receive a completion notification
322    ///
323    /// 接收完成通知
324    #[inline]
325    pub async fn recv(&mut self) -> Option<TaskCompletion> {
326        self.0.recv().await
327    }
328}
329
330/// Completion notifier for one-shot tasks
331///
332/// 一次性任务完成通知器
333pub enum CompletionNotifier {
334    OneShot(Sender<TaskCompletion>),
335    Periodic(PeriodicCompletionNotifier),
336}
337
338/// Completion receiver for one-shot and periodic tasks
339///
340/// 一次性和周期任务完成通知接收器
341pub enum CompletionReceiver {
342    OneShot(Receiver<TaskCompletion>),
343    Periodic(PeriodicCompletionReceiver),
344}
345
346/// Timer Task
347///
348/// Users interact via a two-step API
349/// 1. Create task using `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
350/// 2. Register task using `TimerWheel::register()` or `TimerService::register()`
351///
352/// TaskId is assigned when the task is inserted into the timing wheel.
353///
354/// 定时器任务
355///
356/// 用户通过两步 API 与定时器交互
357/// 1. 使用 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建任务
358/// 2. 使用 `TimerWheel::register()` 或 `TimerService::register()` 注册任务
359///
360/// TaskId 在任务插入到时间轮时分配
361pub struct TimerTask {
362    /// Task type (one-shot or periodic)
363    ///
364    /// 任务类型(一次性或周期性)
365    pub(crate) task_type: TaskType,
366
367    /// User-specified delay duration (initial delay for periodic tasks)
368    ///
369    /// 用户指定的延迟时间(周期任务的初始延迟)
370    pub(crate) delay: std::time::Duration,
371
372    /// Async callback function, optional
373    ///
374    /// 异步回调函数,可选
375    pub(crate) callback: Option<CallbackWrapper>,
376}
377
378impl TimerTask {
379    /// Create a new one-shot timer task
380    ///
381    /// # Parameters
382    /// - `delay`: Delay duration before task execution
383    /// - `callback`: Callback function, optional
384    ///
385    /// # Note
386    /// TaskId will be assigned when the task is inserted into the timing wheel.
387    ///
388    /// 创建一个新的一次性定时器任务
389    ///
390    /// # 参数
391    /// - `delay`: 任务执行前的延迟时间
392    /// - `callback`: 回调函数,可选
393    ///
394    /// # 注意
395    /// TaskId 将在任务插入到时间轮时分配
396    #[inline]
397    pub fn new_oneshot(delay: std::time::Duration, callback: Option<CallbackWrapper>) -> Self {
398        Self {
399            task_type: TaskType::OneShot,
400            delay,
401            callback,
402        }
403    }
404
405    /// Create a new periodic timer task
406    ///
407    /// # Parameters
408    /// - `initial_delay`: Initial delay before first execution
409    /// - `interval`: Interval between subsequent executions
410    /// - `callback`: Callback function, optional
411    ///
412    /// # Note
413    /// TaskId will be assigned when the task is inserted into the timing wheel.
414    ///
415    /// 创建一个新的周期性定时器任务
416    ///
417    /// # 参数
418    /// - `initial_delay`: 首次执行前的初始延迟
419    /// - `interval`: 后续执行之间的间隔
420    /// - `callback`: 回调函数,可选
421    ///
422    /// # 注意
423    /// TaskId 将在任务插入到时间轮时分配
424    #[inline]
425    pub fn new_periodic(
426        initial_delay: std::time::Duration,
427        interval: std::time::Duration,
428        callback: Option<CallbackWrapper>,
429        buffer_size: Option<NonZeroUsize>,
430    ) -> Self {
431        Self {
432            task_type: TaskType::Periodic {
433                interval,
434                buffer_size: buffer_size.unwrap_or(NonZeroUsize::new(32).unwrap()),
435            },
436            delay: initial_delay,
437            callback,
438        }
439    }
440
441    /// Get task type
442    ///
443    /// 获取任务类型
444    #[inline]
445    pub fn get_task_type(&self) -> &TaskType {
446        &self.task_type
447    }
448
449    /// Get the interval for periodic tasks
450    ///
451    /// Returns `None` for one-shot tasks
452    ///
453    /// 获取周期任务的间隔时间
454    ///
455    /// 对于一次性任务返回 `None`
456    #[inline]
457    pub fn get_interval(&self) -> Option<std::time::Duration> {
458        match self.task_type {
459            TaskType::Periodic { interval, .. } => Some(interval),
460            TaskType::OneShot => None,
461        }
462    }
463}
464
465/// Timer Task
466///
467/// Users interact via a two-step API
468/// 1. Create task using `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
469/// 2. Register task using `TimerWheel::register()` or `TimerService::register()`
470///
471/// 定时器任务
472///
473/// 用户通过两步 API 与定时器交互
474/// 1. 使用 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建任务
475/// 2. 使用 `TimerWheel::register()` 或 `TimerService::register()` 注册任务
476pub struct TimerTaskWithCompletionNotifier {
477    /// Task type (one-shot or periodic)
478    ///
479    /// 任务类型(一次性或周期性)
480    pub(crate) task_type: TaskTypeWithCompletionNotifier,
481
482    /// User-specified delay duration (initial delay for periodic tasks)
483    ///
484    /// 用户指定的延迟时间(周期任务的初始延迟)
485    pub(crate) delay: std::time::Duration,
486
487    /// Async callback function, optional
488    ///
489    /// 异步回调函数,可选
490    pub(crate) callback: Option<CallbackWrapper>,
491}
492
493impl TimerTaskWithCompletionNotifier {
494    /// Create a new timer task with completion notifier from a timer task
495    ///
496    /// TaskId will be assigned later when inserted into the timing wheel
497    ///
498    /// 从定时器任务创建一个新的定时器任务完成通知器
499    ///
500    /// TaskId 将在插入到时间轮时分配
501    ///
502    /// # Parameters
503    /// - `task`: The timer task to create from
504    ///
505    /// # Returns
506    /// A tuple containing the new timer task with completion notifier and the completion receiver
507    ///
508    /// 返回一个包含新的定时器任务完成通知器和完成通知接收器的元组
509    ///
510    pub fn from_timer_task(task: TimerTask) -> (Self, CompletionReceiver) {
511        match task.task_type {
512            TaskType::OneShot => {
513                // Create oneshot notifier and receiver with optimized single Arc allocation
514                // 创建 oneshot 通知器和接收器,使用优化的单个 Arc 分配
515                let (notifier, receiver) = channel();
516
517                (
518                    Self {
519                        task_type: TaskTypeWithCompletionNotifier::OneShot {
520                            completion_notifier: notifier,
521                        },
522                        delay: task.delay,
523                        callback: task.callback,
524                    },
525                    CompletionReceiver::OneShot(receiver),
526                )
527            }
528            TaskType::Periodic {
529                interval,
530                buffer_size,
531            } => {
532                // Use custom SPSC channel for high-performance periodic notification
533                // 使用自定义 SPSC 通道实现高性能周期通知
534                let (tx, rx) = spsc::channel(buffer_size);
535
536                let notifier = PeriodicCompletionNotifier(tx);
537                let receiver = PeriodicCompletionReceiver(rx);
538
539                (
540                    Self {
541                        task_type: TaskTypeWithCompletionNotifier::Periodic {
542                            interval,
543                            completion_notifier: notifier,
544                        },
545                        delay: task.delay,
546                        callback: task.callback,
547                    },
548                    CompletionReceiver::Periodic(receiver),
549                )
550            }
551        }
552    }
553
554    /// Into task type
555    ///
556    /// 将任务类型转换为完成通知器
557    #[inline]
558    pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
559        self.task_type
560    }
561
562    /// Get the interval for periodic tasks
563    ///
564    /// Returns `None` for one-shot tasks
565    ///
566    /// 获取周期任务的间隔时间
567    ///
568    /// 对于一次性任务返回 `None`
569    #[inline]
570    pub fn get_interval(&self) -> Option<std::time::Duration> {
571        self.task_type.get_interval()
572    }
573}
574
575pub(crate) struct TimerTaskForWheel {
576    pub(crate) task_id: TaskId,
577    pub(crate) task: TimerTaskWithCompletionNotifier,
578    pub(crate) deadline_tick: u64,
579    pub(crate) rounds: u32,
580}
581
582impl TimerTaskForWheel {
583    /// Create a new timer task for wheel with assigned TaskId
584    ///
585    /// 使用分配的 TaskId 创建一个新的定时器任务用于时间轮
586    ///
587    /// # Parameters
588    /// - `task_id`: The TaskId assigned by DeferredMap
589    /// - `task`: The timer task to create from
590    /// - `deadline_tick`: The deadline tick for the task
591    /// - `rounds`: The rounds for the task
592    ///
593    /// # Returns
594    /// A new timer task for wheel
595    ///
596    /// 返回一个新的定时器任务用于时间轮
597    ///
598    #[inline]
599    pub(crate) fn new_with_id(
600        task_id: TaskId,
601        task: TimerTaskWithCompletionNotifier,
602        deadline_tick: u64,
603        rounds: u32,
604    ) -> Self {
605        Self {
606            task_id,
607            task,
608            deadline_tick,
609            rounds,
610        }
611    }
612
613    /// Get task ID
614    ///
615    /// 获取任务 ID
616    #[inline]
617    pub fn get_id(&self) -> TaskId {
618        self.task_id
619    }
620
621    /// Into task type
622    ///
623    /// 将任务类型转换为完成通知器
624    #[inline]
625    pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
626        self.task.into_task_type()
627    }
628
629    /// Update the delay of the timer task
630    ///
631    /// 更新定时器任务的延迟
632    ///
633    /// # Parameters
634    /// - `delay`: The new delay for the task
635    ///
636    #[inline]
637    pub fn update_delay(&mut self, delay: std::time::Duration) {
638        self.task.delay = delay
639    }
640
641    /// Update the callback of the timer task
642    ///
643    /// 更新定时器任务的回调
644    ///
645    /// # Parameters
646    /// - `callback`: The new callback for the task
647    ///
648    #[inline]
649    pub fn update_callback(&mut self, callback: CallbackWrapper) {
650        self.task.callback = Some(callback)
651    }
652}
653
654/// Task location information (including level) for hierarchical timing wheel
655///
656/// Memory layout optimization: level field placed first to reduce padding via struct alignment
657///
658/// 任务位置信息(包括层级)用于分层时间轮
659///
660/// 内存布局优化:将 level 字段放在第一位,通过结构体对齐来减少填充
661#[derive(Debug, Clone, Copy)]
662pub(crate) struct TaskLocation {
663    /// Slot index
664    ///
665    /// 槽索引
666    pub slot_index: usize,
667    /// Index position of task in slot Vec for O(1) cancellation
668    ///
669    /// 槽向量中任务的索引位置,用于 O(1) 取消
670    pub vec_index: usize,
671    /// Level: 0 = L0 (bottom layer), 1 = L1 (upper layer)
672    /// Using u8 instead of bool to reserve space for potential multi-layer expansion
673    ///
674    /// 层级:0 = L0(底层),1 = L1(上层)
675    /// 使用 u8 而不是 bool 来保留空间,用于潜在的多层扩展
676    pub level: u8,
677}
678
679impl TaskLocation {
680    /// Create a new task location information
681    ///
682    /// 创建一个新的任务位置信息
683    #[inline(always)]
684    pub fn new(level: u8, slot_index: usize, vec_index: usize) -> Self {
685        Self {
686            slot_index,
687            vec_index,
688            level,
689        }
690    }
691}