kestrel_timer/
task.rs

1use std::future::Future;
2use std::num::NonZeroUsize;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use lite_sync::oneshot::{channel, Sender, Receiver};
7use lite_sync::spsc::{self, TryRecvError};
8
9/// One-shot task completion state constants
10/// 
11/// 一次性任务完成状态常量
12const ONESHOT_PENDING: u8 = 0;
13const ONESHOT_CALLED: u8 = 1;
14const ONESHOT_CANCELLED: u8 = 2;
15
16/// Task Completion Reason for Periodic Tasks
17///
18/// Indicates the reason for task completion, called or cancelled.
19/// 
20/// 任务完成原因,调用或取消。
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub enum TaskCompletion {
23    /// Task was called
24    /// 
25    /// 任务被调用
26    Called,
27    /// Task was cancelled
28    /// 
29    /// 任务被取消
30    Cancelled,
31}
32
33impl lite_sync::oneshot::State for TaskCompletion {
34    #[inline]
35    fn to_u8(&self) -> u8 {
36        match self {
37            TaskCompletion::Called => ONESHOT_CALLED,
38            TaskCompletion::Cancelled => ONESHOT_CANCELLED,
39        }
40    }
41    
42    #[inline]
43    fn from_u8(value: u8) -> Option<Self> {
44        match value {
45            ONESHOT_CALLED => Some(TaskCompletion::Called),
46            ONESHOT_CANCELLED => Some(TaskCompletion::Cancelled),
47            _ => None,
48        }
49    }
50    
51    #[inline]
52    fn pending_value() -> u8 {
53        ONESHOT_PENDING
54    }
55}
56
57
58
59/// Unique identifier for timer tasks
60/// 
61/// Now wraps a DeferredMap key (u64) which includes generation information
62/// for safe reference and prevention of use-after-free.
63/// 
64/// 定时器任务唯一标识符
65/// 
66/// 现在封装 DeferredMap key (u64),包含代数信息以实现安全引用和防止释放后使用
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68pub struct TaskId(u64);
69
70impl TaskId {
71    /// Create TaskId from DeferredMap key (internal use)
72    /// 
73    /// 从 DeferredMap key 创建 TaskId (内部使用)
74    #[inline]
75    pub(crate) fn from_key(key: u64) -> Self {
76        TaskId(key)
77    }
78
79    /// Get the DeferredMap key
80    /// 
81    /// 获取 DeferredMap key
82    #[inline]
83    pub(crate) fn key(&self) -> u64 {
84        self.0
85    }
86
87    /// Get the numeric value of the task ID
88    /// 
89    /// 获取任务 ID 的数值
90    #[inline]
91    pub fn as_u64(&self) -> u64 {
92        self.0
93    }
94}
95
96pub struct TaskHandle {
97    handle: deferred_map::Handle,
98}
99
100impl TaskHandle {
101    /// Create a new task handle
102    /// 
103    /// 创建一个新的任务句柄
104    #[inline]
105    pub(crate) fn new(handle: deferred_map::Handle) -> Self {
106        Self { handle }
107    }
108
109    /// Get the task ID
110    /// 
111    /// 获取任务 ID
112    #[inline]
113    pub fn task_id(&self) -> TaskId {
114        TaskId::from_key(self.handle.key())
115    }
116
117    /// Convert to deferred map handle
118    /// 
119    /// 转换为 deferred map 句柄
120    #[inline]
121    pub(crate) fn into_handle(self) -> deferred_map::Handle {
122        self.handle
123    }
124}
125
126/// Timer Callback Trait
127/// 
128/// Types implementing this trait can be used as timer callbacks.
129/// 
130/// 可实现此特性的类型可以作为定时器回调函数。
131/// 
132/// # Examples (示例)
133/// 
134/// ```
135/// use kestrel_timer::task::TimerCallback;
136/// use std::future::Future;
137/// use std::pin::Pin;
138/// 
139/// struct MyCallback;
140/// 
141/// impl TimerCallback for MyCallback {
142///     fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
143///         Box::pin(async {
144///             println!("Timer callback executed!");
145///         })
146///     }
147/// }
148/// ```
149pub trait TimerCallback: Send + Sync + 'static {
150    /// Execute callback, returns a Future
151    /// 
152    /// 执行回调函数,返回一个 Future
153    fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
154}
155
156/// Implement TimerCallback trait for closures
157/// 
158/// Supports Fn() -> Future closures, can be called multiple times, suitable for periodic tasks
159/// 
160/// 实现 TimerCallback 特性的类型,支持 Fn() -> Future 闭包,可以多次调用,适合周期性任务
161impl<F, Fut> TimerCallback for F
162where
163    F: Fn() -> Fut + Send + Sync + 'static,
164    Fut: Future<Output = ()> + Send + 'static,
165{
166    fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
167        Box::pin(self())
168    }
169}
170
171/// Callback wrapper for standardized callback creation and management
172/// 
173/// Callback 包装器,用于标准化回调创建和管理
174/// 
175/// # Examples (示例)
176/// 
177/// ```
178/// use kestrel_timer::CallbackWrapper;
179/// 
180/// let callback = CallbackWrapper::new(|| async {
181///     println!("Timer callback executed!");
182/// });
183/// ```
184#[derive(Clone)]
185pub struct CallbackWrapper {
186    callback: Arc<dyn TimerCallback>,
187}
188
189impl CallbackWrapper {
190    /// Create a new callback wrapper
191    /// 
192    /// # Parameters
193    /// - `callback`: Callback object implementing TimerCallback trait
194    /// 
195    /// # 创建一个新的回调包装器
196    /// 
197    /// # 参数
198    /// - `callback`: 实现 TimerCallback 特性的回调对象
199    /// 
200    /// # Examples (示例)
201    /// 
202    /// ```
203    /// use kestrel_timer::CallbackWrapper;
204    /// 
205    /// let callback = CallbackWrapper::new(|| async {
206    ///     println!("Timer fired!"); // 定时器触发
207    /// });
208    /// ```
209    #[inline]
210    pub fn new(callback: impl TimerCallback) -> Self {
211        Self {
212            callback: Arc::new(callback),
213        }
214    }
215
216    /// Call the callback function
217    /// 
218    /// 调用回调函数
219    #[inline]
220    pub(crate) fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
221        self.callback.call()
222    }
223}
224
225/// Task type enum to distinguish between one-shot and periodic timers
226/// 
227/// 任务类型枚举,用于区分一次性和周期性定时器
228#[derive(Clone)]
229pub enum TaskType {
230    /// One-shot timer: executes once and completes
231    /// 
232    /// 一次性定时器:执行一次后完成
233    OneShot,
234    
235    /// Periodic timer: repeats at fixed intervals
236    /// 
237    /// 周期性定时器:按固定间隔重复执行
238    Periodic {
239        /// Period interval for periodic tasks
240        /// 
241        /// 周期任务的间隔时间
242        interval: std::time::Duration,
243        /// Buffer size for periodic task completion notifier
244        /// 
245        /// 周期性任务完成通知器的缓冲区大小
246        buffer_size: NonZeroUsize,
247    },
248}
249
250/// Task type enum to distinguish between one-shot and periodic timers
251/// 
252/// 任务类型枚举,用于区分一次性和周期性定时器
253pub enum TaskTypeWithCompletionNotifier {
254    /// One-shot timer: executes once and completes
255    /// 
256    /// 一次性定时器:执行一次后完成
257    OneShot {
258        completion_notifier: Sender<TaskCompletion>,
259    },
260    
261    /// Periodic timer: repeats at fixed intervals
262    /// 
263    /// 周期性定时器:按固定间隔重复执行
264    Periodic {
265        /// Period interval for periodic tasks
266        /// 
267        /// 周期任务的间隔时间
268        interval: std::time::Duration,
269        /// Completion notifier for periodic tasks
270        /// 
271        /// 周期性任务完成通知器
272        completion_notifier: PeriodicCompletionNotifier,
273    },
274}
275
276impl TaskTypeWithCompletionNotifier {
277    /// Get the interval for periodic tasks
278    /// 
279    /// Returns `None` for one-shot tasks
280    /// 
281    /// 获取周期任务的间隔时间
282    /// 
283    /// 对于一次性任务返回 `None`
284    #[inline]
285    pub fn get_interval(&self) -> Option<std::time::Duration> {
286        match self {
287            TaskTypeWithCompletionNotifier::Periodic { interval, .. } => Some(*interval),
288            TaskTypeWithCompletionNotifier::OneShot { .. } => None,
289        }
290    }
291}
292
293
294/// Completion notifier for periodic tasks
295/// 
296/// Uses custom SPSC channel for high-performance, low-latency notification
297/// 
298/// 周期任务完成通知器
299/// 
300/// 使用自定义 SPSC 通道实现高性能、低延迟的通知
301pub struct PeriodicCompletionNotifier(pub spsc::Sender<TaskCompletion, 32>);
302
303/// Completion receiver for periodic tasks
304/// 
305/// 周期任务完成通知接收器
306pub struct PeriodicCompletionReceiver(pub spsc::Receiver<TaskCompletion, 32>);
307
308impl PeriodicCompletionReceiver {
309    /// Try to receive a completion notification
310    /// 
311    /// 尝试接收完成通知
312    #[inline]
313    pub fn try_recv(&mut self) -> Result<TaskCompletion, TryRecvError> {
314        self.0.try_recv()
315    }
316    
317    /// Receive a completion notification
318    /// 
319    /// 接收完成通知
320    #[inline]
321    pub async fn recv(&mut self) -> Option<TaskCompletion> {
322        self.0.recv().await
323    }
324}
325
326
327/// Completion notifier for one-shot tasks
328/// 
329/// 一次性任务完成通知器
330pub enum CompletionNotifier {
331    OneShot(Sender<TaskCompletion>),
332    Periodic(PeriodicCompletionNotifier),
333}
334
335/// Completion receiver for one-shot and periodic tasks
336/// 
337/// 一次性和周期任务完成通知接收器
338pub enum CompletionReceiver {
339    OneShot(Receiver<TaskCompletion>),
340    Periodic(PeriodicCompletionReceiver),
341}
342
343/// Timer Task
344/// 
345/// Users interact via a two-step API
346/// 1. Create task using `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
347/// 2. Register task using `TimerWheel::register()` or `TimerService::register()`
348/// 
349/// TaskId is assigned when the task is inserted into the timing wheel.
350/// 
351/// 定时器任务
352/// 
353/// 用户通过两步 API 与定时器交互
354/// 1. 使用 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建任务
355/// 2. 使用 `TimerWheel::register()` 或 `TimerService::register()` 注册任务
356/// 
357/// TaskId 在任务插入到时间轮时分配
358pub struct TimerTask {
359    /// Task type (one-shot or periodic)
360    /// 
361    /// 任务类型(一次性或周期性)
362    pub(crate) task_type: TaskType,
363    
364    /// User-specified delay duration (initial delay for periodic tasks)
365    /// 
366    /// 用户指定的延迟时间(周期任务的初始延迟)
367    pub(crate) delay: std::time::Duration,
368    
369    /// Async callback function, optional
370    /// 
371    /// 异步回调函数,可选
372    pub(crate) callback: Option<CallbackWrapper>,
373}
374
375impl TimerTask {
376    /// Create a new one-shot timer task
377    /// 
378    /// # Parameters
379    /// - `delay`: Delay duration before task execution
380    /// - `callback`: Callback function, optional
381    /// 
382    /// # Note
383    /// TaskId will be assigned when the task is inserted into the timing wheel.
384    /// 
385    /// 创建一个新的一次性定时器任务
386    /// 
387    /// # 参数
388    /// - `delay`: 任务执行前的延迟时间
389    /// - `callback`: 回调函数,可选
390    /// 
391    /// # 注意
392    /// TaskId 将在任务插入到时间轮时分配
393    #[inline]
394    pub fn new_oneshot(delay: std::time::Duration, callback: Option<CallbackWrapper>) -> Self {
395        Self {
396            task_type: TaskType::OneShot,
397            delay,
398            callback,
399        }
400    }
401    
402    /// Create a new periodic timer task
403    /// 
404    /// # Parameters
405    /// - `initial_delay`: Initial delay before first execution
406    /// - `interval`: Interval between subsequent executions
407    /// - `callback`: Callback function, optional
408    /// 
409    /// # Note
410    /// TaskId will be assigned when the task is inserted into the timing wheel.
411    /// 
412    /// 创建一个新的周期性定时器任务
413    /// 
414    /// # 参数
415    /// - `initial_delay`: 首次执行前的初始延迟
416    /// - `interval`: 后续执行之间的间隔
417    /// - `callback`: 回调函数,可选
418    /// 
419    /// # 注意
420    /// TaskId 将在任务插入到时间轮时分配
421    #[inline]
422    pub fn new_periodic(
423        initial_delay: std::time::Duration,
424        interval: std::time::Duration,
425        callback: Option<CallbackWrapper>,
426        buffer_size: Option<NonZeroUsize>,
427    ) -> Self {
428        Self {
429            task_type: TaskType::Periodic { interval, buffer_size: buffer_size.unwrap_or(NonZeroUsize::new(32).unwrap()) },
430            delay: initial_delay,
431            callback,
432        }
433    }
434    
435    /// Get task type
436    /// 
437    /// 获取任务类型
438    #[inline]
439    pub fn get_task_type(&self) -> &TaskType {
440        &self.task_type
441    }
442
443    /// Get the interval for periodic tasks
444    /// 
445    /// Returns `None` for one-shot tasks
446    /// 
447    /// 获取周期任务的间隔时间
448    /// 
449    /// 对于一次性任务返回 `None`
450    #[inline]
451    pub fn get_interval(&self) -> Option<std::time::Duration> {
452        match self.task_type {
453            TaskType::Periodic { interval, .. } => Some(interval),
454            TaskType::OneShot => None,
455        }
456    }
457
458}
459
460/// Timer Task
461/// 
462/// Users interact via a two-step API
463/// 1. Create task using `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
464/// 2. Register task using `TimerWheel::register()` or `TimerService::register()`
465/// 
466/// 定时器任务
467/// 
468/// 用户通过两步 API 与定时器交互
469/// 1. 使用 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建任务
470/// 2. 使用 `TimerWheel::register()` 或 `TimerService::register()` 注册任务
471pub struct TimerTaskWithCompletionNotifier {
472    /// Task type (one-shot or periodic)
473    /// 
474    /// 任务类型(一次性或周期性)
475    pub(crate) task_type: TaskTypeWithCompletionNotifier,
476    
477    /// User-specified delay duration (initial delay for periodic tasks)
478    /// 
479    /// 用户指定的延迟时间(周期任务的初始延迟)
480    pub(crate) delay: std::time::Duration,
481    
482    /// Async callback function, optional
483    /// 
484    /// 异步回调函数,可选
485    pub(crate) callback: Option<CallbackWrapper>,
486}
487
488impl TimerTaskWithCompletionNotifier {
489
490    /// Create a new timer task with completion notifier from a timer task
491    /// 
492    /// TaskId will be assigned later when inserted into the timing wheel
493    /// 
494    /// 从定时器任务创建一个新的定时器任务完成通知器
495    /// 
496    /// TaskId 将在插入到时间轮时分配
497    /// 
498    /// # Parameters
499    /// - `task`: The timer task to create from
500    /// 
501    /// # Returns
502    /// A tuple containing the new timer task with completion notifier and the completion receiver
503    /// 
504    /// 返回一个包含新的定时器任务完成通知器和完成通知接收器的元组
505    /// 
506    pub fn from_timer_task(task: TimerTask) -> (Self, CompletionReceiver) {
507        match task.task_type {
508            TaskType::OneShot => {
509                // Create oneshot notifier and receiver with optimized single Arc allocation
510                // 创建 oneshot 通知器和接收器,使用优化的单个 Arc 分配
511                let (notifier, receiver) = channel();
512
513                (Self {
514                    task_type: TaskTypeWithCompletionNotifier::OneShot { 
515                        completion_notifier: notifier 
516                    },
517                    delay: task.delay,
518                    callback: task.callback,
519                }, CompletionReceiver::OneShot(receiver))
520            },
521            TaskType::Periodic { interval, buffer_size } => {
522                // Use custom SPSC channel for high-performance periodic notification
523                // 使用自定义 SPSC 通道实现高性能周期通知
524                let (tx, rx) = spsc::channel(buffer_size);
525                
526                let notifier = PeriodicCompletionNotifier(tx);
527                let receiver = PeriodicCompletionReceiver(rx);
528                
529                (Self {
530                    task_type: TaskTypeWithCompletionNotifier::Periodic { 
531                        interval, 
532                        completion_notifier: notifier 
533                    },
534                    delay: task.delay,
535                    callback: task.callback,
536                }, CompletionReceiver::Periodic(receiver))
537            },
538        }
539    }
540
541    /// Into task type
542    /// 
543    /// 将任务类型转换为完成通知器
544    #[inline]
545    pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
546        self.task_type
547    }
548
549    /// Get the interval for periodic tasks
550    /// 
551    /// Returns `None` for one-shot tasks
552    /// 
553    /// 获取周期任务的间隔时间
554    /// 
555    /// 对于一次性任务返回 `None`
556    #[inline]
557    pub fn get_interval(&self) -> Option<std::time::Duration> {
558        self.task_type.get_interval()
559    }
560}
561
562pub(crate) struct TimerTaskForWheel {
563    pub(crate) task_id: TaskId,
564    pub(crate) task: TimerTaskWithCompletionNotifier,
565    pub(crate) deadline_tick: u64,
566    pub(crate) rounds: u32,
567}
568
569impl TimerTaskForWheel {
570    /// Create a new timer task for wheel with assigned TaskId
571    /// 
572    /// 使用分配的 TaskId 创建一个新的定时器任务用于时间轮
573    /// 
574    /// # Parameters
575    /// - `task_id`: The TaskId assigned by DeferredMap
576    /// - `task`: The timer task to create from
577    /// - `deadline_tick`: The deadline tick for the task
578    /// - `rounds`: The rounds for the task
579    /// 
580    /// # Returns
581    /// A new timer task for wheel
582    /// 
583    /// 返回一个新的定时器任务用于时间轮
584    /// 
585    #[inline]
586    pub(crate) fn new_with_id(
587        task_id: TaskId,
588        task: TimerTaskWithCompletionNotifier,
589        deadline_tick: u64,
590        rounds: u32
591    ) -> Self {
592        Self { task_id, task, deadline_tick, rounds }
593    }
594
595    /// Get task ID
596    /// 
597    /// 获取任务 ID
598    #[inline]
599    pub fn get_id(&self) -> TaskId {
600        self.task_id
601    }
602
603    /// Into task type
604    /// 
605    /// 将任务类型转换为完成通知器
606    #[inline]
607    pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
608        self.task.into_task_type()
609    }
610
611    /// Update the delay of the timer task
612    /// 
613    /// 更新定时器任务的延迟
614    /// 
615    /// # Parameters
616    /// - `delay`: The new delay for the task
617    /// 
618    #[inline]
619    pub fn update_delay(&mut self, delay: std::time::Duration) {
620        self.task.delay = delay
621    }
622
623    /// Update the callback of the timer task
624    /// 
625    /// 更新定时器任务的回调
626    /// 
627    /// # Parameters
628    /// - `callback`: The new callback for the task
629    /// 
630    #[inline]
631    pub fn update_callback(&mut self, callback: CallbackWrapper) {
632        self.task.callback = Some(callback)
633    }
634}
635
636/// Task location information (including level) for hierarchical timing wheel
637/// 
638/// Memory layout optimization: level field placed first to reduce padding via struct alignment
639/// 
640/// 任务位置信息(包括层级)用于分层时间轮
641/// 
642/// 内存布局优化:将 level 字段放在第一位,通过结构体对齐来减少填充
643#[derive(Debug, Clone, Copy)]
644pub(crate) struct TaskLocation {
645    /// Slot index
646    /// 
647    /// 槽索引
648    pub slot_index: usize,
649    /// Index position of task in slot Vec for O(1) cancellation
650    /// 
651    /// 槽向量中任务的索引位置,用于 O(1) 取消
652    pub vec_index: usize,
653    /// Level: 0 = L0 (bottom layer), 1 = L1 (upper layer)
654    /// Using u8 instead of bool to reserve space for potential multi-layer expansion
655    /// 
656    /// 层级:0 = L0(底层),1 = L1(上层)
657    /// 使用 u8 而不是 bool 来保留空间,用于潜在的多层扩展
658    pub level: u8,
659}
660
661impl TaskLocation {
662    /// Create a new task location information
663    /// 
664    /// 创建一个新的任务位置信息
665    #[inline(always)]
666    pub fn new(level: u8, slot_index: usize, vec_index: usize) -> Self {
667        Self {
668            slot_index,
669            vec_index,
670            level,
671        }
672    }
673}
674