kestrel_timer/
task.rs

1use std::future::Future;
2use std::num::NonZeroUsize;
3use std::pin::Pin;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use lite_sync::oneshot::{channel, Sender, Receiver};
8use lite_sync::spsc::{self, TryRecvError};
9
10/// Global unique task ID generator
11/// 
12/// 全局唯一任务 ID 生成器
13static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);
14
15/// One-shot task completion state constants
16/// 
17/// 一次性任务完成状态常量
18const ONESHOT_PENDING: u8 = 0;
19const ONESHOT_CALLED: u8 = 1;
20const ONESHOT_CANCELLED: u8 = 2;
21
22/// Task Completion Reason for Periodic Tasks
23///
24/// Indicates the reason for task completion, called or cancelled.
25/// 
26/// 任务完成原因,调用或取消。
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum TaskCompletion {
29    /// Task was called
30    /// 
31    /// 任务被调用
32    Called,
33    /// Task was cancelled
34    /// 
35    /// 任务被取消
36    Cancelled,
37}
38
39impl lite_sync::oneshot::State for TaskCompletion {
40    #[inline]
41    fn to_u8(&self) -> u8 {
42        match self {
43            TaskCompletion::Called => ONESHOT_CALLED,
44            TaskCompletion::Cancelled => ONESHOT_CANCELLED,
45        }
46    }
47    
48    #[inline]
49    fn from_u8(value: u8) -> Option<Self> {
50        match value {
51            ONESHOT_CALLED => Some(TaskCompletion::Called),
52            ONESHOT_CANCELLED => Some(TaskCompletion::Cancelled),
53            _ => None,
54        }
55    }
56    
57    #[inline]
58    fn pending_value() -> u8 {
59        ONESHOT_PENDING
60    }
61}
62
63
64
65/// Unique identifier for timer tasks
66/// 
67/// 定时器任务唯一标识符
68#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
69pub struct TaskId(u64);
70
71impl TaskId {
72    /// Generate a new unique task ID (internal use)
73    /// 
74    /// 生成一个新的唯一任务 ID (内部使用)
75    #[inline]
76    pub(crate) fn new() -> Self {
77        TaskId(NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed))
78    }
79
80    /// Get the numeric value of the task ID
81    /// 
82    /// 获取任务 ID 的数值
83    #[inline]
84    pub fn as_u64(&self) -> u64 {
85        self.0
86    }
87}
88
89impl Default for TaskId {
90    #[inline]
91    fn default() -> Self {
92        Self::new()
93    }
94}
95
96/// Timer Callback Trait
97/// 
98/// Types implementing this trait can be used as timer callbacks.
99/// 
100/// 可实现此特性的类型可以作为定时器回调函数。
101/// 
102/// # Examples (示例)
103/// 
104/// ```
105/// use kestrel_timer::task::TimerCallback;
106/// use std::future::Future;
107/// use std::pin::Pin;
108/// 
109/// struct MyCallback;
110/// 
111/// impl TimerCallback for MyCallback {
112///     fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
113///         Box::pin(async {
114///             println!("Timer callback executed!");
115///         })
116///     }
117/// }
118/// ```
119pub trait TimerCallback: Send + Sync + 'static {
120    /// Execute callback, returns a Future
121    /// 
122    /// 执行回调函数,返回一个 Future
123    fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
124}
125
126/// Implement TimerCallback trait for closures
127/// 
128/// Supports Fn() -> Future closures, can be called multiple times, suitable for periodic tasks
129/// 
130/// 实现 TimerCallback 特性的类型,支持 Fn() -> Future 闭包,可以多次调用,适合周期性任务
131impl<F, Fut> TimerCallback for F
132where
133    F: Fn() -> Fut + Send + Sync + 'static,
134    Fut: Future<Output = ()> + Send + 'static,
135{
136    fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
137        Box::pin(self())
138    }
139}
140
141/// Callback wrapper for standardized callback creation and management
142/// 
143/// Callback 包装器,用于标准化回调创建和管理
144/// 
145/// # Examples (示例)
146/// 
147/// ```
148/// use kestrel_timer::CallbackWrapper;
149/// 
150/// let callback = CallbackWrapper::new(|| async {
151///     println!("Timer callback executed!");
152/// });
153/// ```
154#[derive(Clone)]
155pub struct CallbackWrapper {
156    callback: Arc<dyn TimerCallback>,
157}
158
159impl CallbackWrapper {
160    /// Create a new callback wrapper
161    /// 
162    /// # Parameters
163    /// - `callback`: Callback object implementing TimerCallback trait
164    /// 
165    /// # 创建一个新的回调包装器
166    /// 
167    /// # 参数
168    /// - `callback`: 实现 TimerCallback 特性的回调对象
169    /// 
170    /// # Examples (示例)
171    /// 
172    /// ```
173    /// use kestrel_timer::CallbackWrapper;
174    /// 
175    /// let callback = CallbackWrapper::new(|| async {
176    ///     println!("Timer fired!"); // 定时器触发
177    /// });
178    /// ```
179    #[inline]
180    pub fn new(callback: impl TimerCallback) -> Self {
181        Self {
182            callback: Arc::new(callback),
183        }
184    }
185
186    /// Call the callback function
187    /// 
188    /// 调用回调函数
189    #[inline]
190    pub(crate) fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
191        self.callback.call()
192    }
193}
194
195/// Task type enum to distinguish between one-shot and periodic timers
196/// 
197/// 任务类型枚举,用于区分一次性和周期性定时器
198#[derive(Clone)]
199pub enum TaskType {
200    /// One-shot timer: executes once and completes
201    /// 
202    /// 一次性定时器:执行一次后完成
203    OneShot,
204    
205    /// Periodic timer: repeats at fixed intervals
206    /// 
207    /// 周期性定时器:按固定间隔重复执行
208    Periodic {
209        /// Period interval for periodic tasks
210        /// 
211        /// 周期任务的间隔时间
212        interval: std::time::Duration,
213        /// Buffer size for periodic task completion notifier
214        /// 
215        /// 周期性任务完成通知器的缓冲区大小
216        buffer_size: NonZeroUsize,
217    },
218}
219
220/// Task type enum to distinguish between one-shot and periodic timers
221/// 
222/// 任务类型枚举,用于区分一次性和周期性定时器
223pub enum TaskTypeWithCompletionNotifier {
224    /// One-shot timer: executes once and completes
225    /// 
226    /// 一次性定时器:执行一次后完成
227    OneShot {
228        completion_notifier: Sender<TaskCompletion>,
229    },
230    
231    /// Periodic timer: repeats at fixed intervals
232    /// 
233    /// 周期性定时器:按固定间隔重复执行
234    Periodic {
235        /// Period interval for periodic tasks
236        /// 
237        /// 周期任务的间隔时间
238        interval: std::time::Duration,
239        /// Completion notifier for periodic tasks
240        /// 
241        /// 周期性任务完成通知器
242        completion_notifier: PeriodicCompletionNotifier,
243    },
244}
245
246
247
248/// Completion notifier for periodic tasks
249/// 
250/// Uses custom SPSC channel for high-performance, low-latency notification
251/// 
252/// 周期任务完成通知器
253/// 
254/// 使用自定义 SPSC 通道实现高性能、低延迟的通知
255pub struct PeriodicCompletionNotifier(pub spsc::Sender<TaskCompletion, 32>);
256
257/// Completion receiver for periodic tasks
258/// 
259/// 周期任务完成通知接收器
260pub struct PeriodicCompletionReceiver(pub spsc::Receiver<TaskCompletion, 32>);
261
262impl PeriodicCompletionReceiver {
263    /// Try to receive a completion notification
264    /// 
265    /// 尝试接收完成通知
266    #[inline]
267    pub fn try_recv(&mut self) -> Result<TaskCompletion, TryRecvError> {
268        self.0.try_recv()
269    }
270    
271    /// Receive a completion notification
272    /// 
273    /// 接收完成通知
274    #[inline]
275    pub async fn recv(&mut self) -> Option<TaskCompletion> {
276        self.0.recv().await
277    }
278}
279
280
281/// Completion notifier for one-shot tasks
282/// 
283/// 一次性任务完成通知器
284pub enum CompletionNotifier {
285    OneShot(Sender<TaskCompletion>),
286    Periodic(PeriodicCompletionNotifier),
287}
288
289/// Completion receiver for one-shot and periodic tasks
290/// 
291/// 一次性和周期任务完成通知接收器
292pub enum CompletionReceiver {
293    OneShot(Receiver<TaskCompletion>),
294    Periodic(PeriodicCompletionReceiver),
295}
296
297/// Timer Task
298/// 
299/// Users interact via a two-step API
300/// 1. Create task using `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
301/// 2. Register task using `TimerWheel::register()` or `TimerService::register()`
302/// 
303/// 定时器任务
304/// 
305/// 用户通过两步 API 与定时器交互
306/// 1. 使用 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建任务
307/// 2. 使用 `TimerWheel::register()` 或 `TimerService::register()` 注册任务
308pub struct TimerTask {
309    /// Unique task identifier
310    /// 
311    /// 唯一任务标识符
312    pub(crate) id: TaskId,
313    
314    /// Task type (one-shot or periodic)
315    /// 
316    /// 任务类型(一次性或周期性)
317    pub(crate) task_type: TaskType,
318    
319    /// User-specified delay duration (initial delay for periodic tasks)
320    /// 
321    /// 用户指定的延迟时间(周期任务的初始延迟)
322    pub(crate) delay: std::time::Duration,
323    
324    /// Async callback function, optional
325    /// 
326    /// 异步回调函数,可选
327    pub(crate) callback: Option<CallbackWrapper>,
328}
329
330impl TimerTask {
331    /// Create a new one-shot timer task
332    /// 
333    /// # Parameters
334    /// - `delay`: Delay duration before task execution
335    /// - `callback`: Callback function, optional
336    /// 
337    /// # Note
338    /// This is an internal method, users should use `TimerWheel::create_task()` or `TimerService::create_task()` to create tasks.
339    /// 
340    /// 创建一个新的一次性定时器任务
341    /// 
342    /// # 参数
343    /// - `delay`: 任务执行前的延迟时间
344    /// - `callback`: 回调函数,可选
345    /// 
346    #[inline]
347    pub fn new_oneshot(delay: std::time::Duration, callback: Option<CallbackWrapper>) -> Self {
348        Self {
349            id: TaskId::new(),
350            task_type: TaskType::OneShot,
351            delay,
352            callback,
353        }
354    }
355    
356    /// Create a new periodic timer task
357    /// 
358    /// # Parameters
359    /// - `initial_delay`: Initial delay before first execution
360    /// - `interval`: Interval between subsequent executions
361    /// - `callback`: Callback function, optional
362    /// 
363    /// # Note
364    /// This is an internal method, users should use `TimerWheel::create_periodic_task()` or `TimerService::create_periodic_task()` to create tasks.
365    /// 
366    /// 创建一个新的周期性定时器任务
367    /// 
368    /// # 参数
369    /// - `initial_delay`: 首次执行前的初始延迟
370    /// - `interval`: 后续执行之间的间隔
371    /// - `callback`: 回调函数,可选
372    /// 
373    #[inline]
374    pub fn new_periodic(
375        initial_delay: std::time::Duration,
376        interval: std::time::Duration,
377        callback: Option<CallbackWrapper>,
378        buffer_size: Option<NonZeroUsize>,
379    ) -> Self {
380        Self {
381            id: TaskId::new(),
382            task_type: TaskType::Periodic { interval, buffer_size: buffer_size.unwrap_or(NonZeroUsize::new(32).unwrap()) },
383            delay: initial_delay,
384            callback,
385        }
386    }
387
388    /// Get task ID
389    /// 
390    /// 获取任务 ID
391    /// 
392    /// # Examples (示例)
393    /// 
394    /// ```no_run
395    /// use kestrel_timer::TimerTask;
396    /// use std::time::Duration;
397    /// 
398    /// let task = TimerTask::new_oneshot(Duration::from_secs(1), None);
399    /// let task_id = task.get_id();
400    /// println!("Task ID: {:?}", task_id);
401    /// ```
402    #[inline]
403    pub fn get_id(&self) -> TaskId {
404        self.id
405    }
406    
407    /// Get task type
408    /// 
409    /// 获取任务类型
410    #[inline]
411    pub fn get_task_type(&self) -> &TaskType {
412        &self.task_type
413    }
414
415    /// Get the interval for periodic tasks
416    /// 
417    /// Returns `None` for one-shot tasks
418    /// 
419    /// 获取周期任务的间隔时间
420    /// 
421    /// 对于一次性任务返回 `None`
422    #[inline]
423    pub fn get_interval(&self) -> Option<std::time::Duration> {
424        match self.task_type {
425            TaskType::Periodic { interval, .. } => Some(interval),
426            TaskType::OneShot { .. } => None,
427        }
428    }
429
430}
431
432/// Timer Task
433/// 
434/// Users interact via a two-step API
435/// 1. Create task using `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
436/// 2. Register task using `TimerWheel::register()` or `TimerService::register()`
437/// 
438/// 定时器任务
439/// 
440/// 用户通过两步 API 与定时器交互
441/// 1. 使用 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建任务
442/// 2. 使用 `TimerWheel::register()` 或 `TimerService::register()` 注册任务
443pub struct TimerTaskWithCompletionNotifier {
444    /// Unique task identifier
445    /// 
446    /// 唯一任务标识符
447    pub(crate) id: TaskId,
448    
449    /// Task type (one-shot or periodic)
450    /// 
451    /// 任务类型(一次性或周期性)
452    pub(crate) task_type: TaskTypeWithCompletionNotifier,
453    
454    /// User-specified delay duration (initial delay for periodic tasks)
455    /// 
456    /// 用户指定的延迟时间(周期任务的初始延迟)
457    pub(crate) delay: std::time::Duration,
458    
459    /// Async callback function, optional
460    /// 
461    /// 异步回调函数,可选
462    pub(crate) callback: Option<CallbackWrapper>,
463}
464
465impl TimerTaskWithCompletionNotifier {
466
467    /// Create a new timer task with completion notifier from a timer task
468    /// 
469    /// 从定时器任务创建一个新的定时器任务完成通知器
470    /// 
471    /// # Parameters
472    /// - `task`: The timer task to create from
473    /// - `buffer_size`: The buffer size for the periodic task completion notifier
474    /// 
475    /// # Returns
476    /// A tuple containing the new timer task with completion notifier and the completion receiver
477    /// 
478    /// 返回一个包含新的定时器任务完成通知器和完成通知接收器的元组
479    /// 
480    pub fn from_timer_task(task: TimerTask) -> (Self, CompletionReceiver) {
481        match task.task_type {
482            TaskType::OneShot { .. } => {
483                // Create oneshot notifier and receiver with optimized single Arc allocation
484                // 创建 oneshot 通知器和接收器,使用优化的单个 Arc 分配
485                let (notifier, receiver) = channel();
486
487                (Self {
488                    id: task.id,
489                    task_type: TaskTypeWithCompletionNotifier::OneShot { 
490                        completion_notifier: notifier 
491                    },
492                    delay: task.delay,
493                    callback: task.callback,
494                }, CompletionReceiver::OneShot(receiver))
495            },
496            TaskType::Periodic { interval, buffer_size } => {
497                // Use custom SPSC channel for high-performance periodic notification
498                // 使用自定义 SPSC 通道实现高性能周期通知
499                let (tx, rx) = spsc::channel(buffer_size);
500                
501                let notifier = PeriodicCompletionNotifier(tx);
502                let receiver = PeriodicCompletionReceiver(rx);
503                
504                (Self {
505                    id: task.id,
506                    task_type: TaskTypeWithCompletionNotifier::Periodic { 
507                        interval, 
508                        completion_notifier: notifier 
509                    },
510                    delay: task.delay,
511                    callback: task.callback,
512                }, CompletionReceiver::Periodic(receiver))
513            },
514        }
515    }
516
517    /// Get task ID
518    /// 
519    /// 获取任务 ID
520    /// 
521    /// # Examples (示例)
522    /// 
523    /// ```no_run
524    /// use kestrel_timer::TimerTask;
525    /// use std::time::Duration;
526    /// 
527    /// let task = TimerTask::new_oneshot(Duration::from_secs(1), None);
528    /// let task_id = task.get_id();
529    /// println!("Task ID: {:?}", task_id);
530    /// ```
531    #[inline]
532    pub fn get_id(&self) -> TaskId {
533        self.id
534    }
535
536    /// Into task type
537    /// 
538    /// 将任务类型转换为完成通知器
539    #[inline]
540    pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
541        self.task_type
542    }
543
544    /// Get the interval for periodic tasks
545    /// 
546    /// Returns `None` for one-shot tasks
547    /// 
548    /// 获取周期任务的间隔时间
549    /// 
550    /// 对于一次性任务返回 `None`
551    #[inline]
552    pub fn get_interval(&self) -> Option<std::time::Duration> {
553        match self.task_type {
554            TaskTypeWithCompletionNotifier::Periodic { interval, .. } => Some(interval),
555            TaskTypeWithCompletionNotifier::OneShot { .. } => None,
556        }
557    }
558}
559
560pub(crate) struct TimerTaskForWheel {
561    pub(crate) task: TimerTaskWithCompletionNotifier,
562    pub(crate) deadline_tick: u64,
563    pub(crate) rounds: u32,
564}
565
566impl TimerTaskForWheel {
567    /// Create a new timer task for wheel
568    /// 
569    /// 创建一个新的定时器任务用于时间轮
570    /// 
571    /// # Parameters
572    /// - `task`: The timer task to create from
573    /// - `deadline_tick`: The deadline tick for the task
574    /// - `rounds`: The rounds for the task
575    /// 
576    /// # Returns
577    /// A new timer task for wheel
578    /// 
579    /// 返回一个新的定时器任务用于时间轮
580    /// 
581    #[inline]
582    pub(crate) fn new(task: TimerTaskWithCompletionNotifier, deadline_tick: u64, rounds: u32) -> Self {
583        Self { task, deadline_tick, rounds }
584    }
585
586    /// Get task ID
587    /// 
588    /// 获取任务 ID
589    #[inline]
590    pub fn get_id(&self) -> TaskId {
591        self.task.get_id()
592    }
593
594    /// Into task type
595    /// 
596    /// 将任务类型转换为完成通知器
597    #[inline]
598    pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
599        self.task.into_task_type()
600    }
601
602    /// Update the delay of the timer task
603    /// 
604    /// 更新定时器任务的延迟
605    /// 
606    /// # Parameters
607    /// - `delay`: The new delay for the task
608    /// 
609    #[inline]
610    pub fn update_delay(&mut self, delay: std::time::Duration) {
611        self.task.delay = delay
612    }
613
614    /// Update the callback of the timer task
615    /// 
616    /// 更新定时器任务的回调
617    /// 
618    /// # Parameters
619    /// - `callback`: The new callback for the task
620    /// 
621    #[inline]
622    pub fn update_callback(&mut self, callback: CallbackWrapper) {
623        self.task.callback = Some(callback)
624    }
625}
626
627/// Task location information (including level) for hierarchical timing wheel
628/// 
629/// Memory layout optimization: level field placed first to reduce padding via struct alignment
630/// 
631/// 任务位置信息(包括层级)用于分层时间轮
632/// 
633/// 内存布局优化:将 level 字段放在第一位,通过结构体对齐来减少填充
634#[derive(Debug, Clone, Copy)]
635pub(crate) struct TaskLocation {
636    /// Slot index
637    /// 
638    /// 槽索引
639    pub slot_index: usize,
640    /// Index position of task in slot Vec for O(1) cancellation
641    /// 
642    /// 槽向量中任务的索引位置,用于 O(1) 取消
643    pub vec_index: usize,
644    /// Level: 0 = L0 (bottom layer), 1 = L1 (upper layer)
645    /// Using u8 instead of bool to reserve space for potential multi-layer expansion
646    /// 
647    /// 层级:0 = L0(底层),1 = L1(上层)
648    /// 使用 u8 而不是 bool 来保留空间,用于潜在的多层扩展
649    pub level: u8,
650}
651
652impl TaskLocation {
653    /// Create a new task location information
654    /// 
655    /// 创建一个新的任务位置信息
656    #[inline(always)]
657    pub fn new(level: u8, slot_index: usize, vec_index: usize) -> Self {
658        Self {
659            slot_index,
660            vec_index,
661            level,
662        }
663    }
664}
665