kestrel_timer/task.rs
1use std::future::Future;
2use std::num::NonZeroUsize;
3use std::pin::Pin;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use lite_sync::oneshot::{channel, Sender, Receiver};
8use lite_sync::spsc::{self, TryRecvError};
9
/// Process-wide counter from which task IDs are drawn; the first ID issued is 1.
static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);

// Raw `u8` encodings of the completion state. They are what the
// `lite_sync::oneshot::State` impl of `TaskCompletion` converts to and from.

/// No completion has been recorded yet.
const ONESHOT_PENDING: u8 = 0;
/// The task was called.
const ONESHOT_CALLED: u8 = 1;
/// The task was cancelled.
const ONESHOT_CANCELLED: u8 = 2;
21
/// Outcome reported through a task's completion channel.
///
/// The same type is carried by both the one-shot channel and the periodic
/// SPSC channel declared in this module.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TaskCompletion {
    /// The task was called.
    Called,
    /// The task was cancelled.
    Cancelled,
}
38
39impl lite_sync::oneshot::State for TaskCompletion {
40 #[inline]
41 fn to_u8(&self) -> u8 {
42 match self {
43 TaskCompletion::Called => ONESHOT_CALLED,
44 TaskCompletion::Cancelled => ONESHOT_CANCELLED,
45 }
46 }
47
48 #[inline]
49 fn from_u8(value: u8) -> Option<Self> {
50 match value {
51 ONESHOT_CALLED => Some(TaskCompletion::Called),
52 ONESHOT_CANCELLED => Some(TaskCompletion::Cancelled),
53 _ => None,
54 }
55 }
56
57 #[inline]
58 fn pending_value() -> u8 {
59 ONESHOT_PENDING
60 }
61}
62
63
64
/// Identifier assigned to a timer task.
///
/// A thin wrapper around a `u64`; the raw value is available through
/// `TaskId::as_u64`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct TaskId(u64);
70
71impl TaskId {
72 /// Generate a new unique task ID (internal use)
73 ///
74 /// 生成一个新的唯一任务 ID (内部使用)
75 #[inline]
76 pub(crate) fn new() -> Self {
77 TaskId(NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed))
78 }
79
80 /// Get the numeric value of the task ID
81 ///
82 /// 获取任务 ID 的数值
83 #[inline]
84 pub fn as_u64(&self) -> u64 {
85 self.0
86 }
87}
88
89impl Default for TaskId {
90 #[inline]
91 fn default() -> Self {
92 Self::new()
93 }
94}
95
/// Asynchronous callback that a timer can invoke.
///
/// `call` takes `&self`, so a single callback value can be invoked any number
/// of times; each invocation returns a new boxed future.
///
/// # Examples
///
/// ```
/// use kestrel_timer::task::TimerCallback;
/// use std::future::Future;
/// use std::pin::Pin;
///
/// struct MyCallback;
///
/// impl TimerCallback for MyCallback {
///     fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
///         Box::pin(async {
///             println!("Timer callback executed!");
///         })
///     }
/// }
/// ```
pub trait TimerCallback: Send + Sync + 'static {
    /// Starts one execution of the callback and returns the future driving it.
    fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
}

/// Blanket implementation for closures.
///
/// Any `Fn() -> Future` closure that is `Send + Sync + 'static` is a
/// `TimerCallback`. Because the bound is `Fn` (not `FnOnce`), the closure can be
/// called repeatedly, which is what periodic tasks need.
impl<F, Fut> TimerCallback for F
where
    Fut: Future<Output = ()> + Send + 'static,
    F: Fn() -> Fut + Send + Sync + 'static,
{
    fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
        // Invoke the closure, then box and pin the future it produced.
        let future = self();
        Box::pin(future)
    }
}
140
141/// Callback wrapper for standardized callback creation and management
142///
143/// Callback 包装器,用于标准化回调创建和管理
144///
145/// # Examples (示例)
146///
147/// ```
148/// use kestrel_timer::CallbackWrapper;
149///
150/// let callback = CallbackWrapper::new(|| async {
151/// println!("Timer callback executed!");
152/// });
153/// ```
154#[derive(Clone)]
155pub struct CallbackWrapper {
156 callback: Arc<dyn TimerCallback>,
157}
158
159impl CallbackWrapper {
160 /// Create a new callback wrapper
161 ///
162 /// # Parameters
163 /// - `callback`: Callback object implementing TimerCallback trait
164 ///
165 /// # 创建一个新的回调包装器
166 ///
167 /// # 参数
168 /// - `callback`: 实现 TimerCallback 特性的回调对象
169 ///
170 /// # Examples (示例)
171 ///
172 /// ```
173 /// use kestrel_timer::CallbackWrapper;
174 ///
175 /// let callback = CallbackWrapper::new(|| async {
176 /// println!("Timer fired!"); // 定时器触发
177 /// });
178 /// ```
179 #[inline]
180 pub fn new(callback: impl TimerCallback) -> Self {
181 Self {
182 callback: Arc::new(callback),
183 }
184 }
185
186 /// Call the callback function
187 ///
188 /// 调用回调函数
189 #[inline]
190 pub(crate) fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
191 self.callback.call()
192 }
193}
194
/// Kind of timer task: one-shot or periodic.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone`, so values can
/// be logged and compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskType {
    /// One-shot timer: executes once and completes.
    OneShot,

    /// Periodic timer: repeats at a fixed interval.
    Periodic {
        /// Time between consecutive executions.
        interval: std::time::Duration,
        /// Buffer size handed to the SPSC channel that carries the periodic
        /// task's completion notifications.
        buffer_size: NonZeroUsize,
    },
}
219
220/// Task type enum to distinguish between one-shot and periodic timers
221///
222/// 任务类型枚举,用于区分一次性和周期性定时器
223pub enum TaskTypeWithCompletionNotifier {
224 /// One-shot timer: executes once and completes
225 ///
226 /// 一次性定时器:执行一次后完成
227 OneShot {
228 completion_notifier: Sender<TaskCompletion>,
229 },
230
231 /// Periodic timer: repeats at fixed intervals
232 ///
233 /// 周期性定时器:按固定间隔重复执行
234 Periodic {
235 /// Period interval for periodic tasks
236 ///
237 /// 周期任务的间隔时间
238 interval: std::time::Duration,
239 /// Completion notifier for periodic tasks
240 ///
241 /// 周期性任务完成通知器
242 completion_notifier: PeriodicCompletionNotifier,
243 },
244}
245
246
247
248/// Completion notifier for periodic tasks
249///
250/// Uses custom SPSC channel for high-performance, low-latency notification
251///
252/// 周期任务完成通知器
253///
254/// 使用自定义 SPSC 通道实现高性能、低延迟的通知
255pub struct PeriodicCompletionNotifier(pub spsc::Sender<TaskCompletion, 32>);
256
257/// Completion receiver for periodic tasks
258///
259/// 周期任务完成通知接收器
260pub struct PeriodicCompletionReceiver(pub spsc::Receiver<TaskCompletion, 32>);
261
262impl PeriodicCompletionReceiver {
263 /// Try to receive a completion notification
264 ///
265 /// 尝试接收完成通知
266 #[inline]
267 pub fn try_recv(&mut self) -> Result<TaskCompletion, TryRecvError> {
268 self.0.try_recv()
269 }
270
271 /// Receive a completion notification
272 ///
273 /// 接收完成通知
274 #[inline]
275 pub async fn recv(&mut self) -> Option<TaskCompletion> {
276 self.0.recv().await
277 }
278}
279
280
281/// Completion notifier for one-shot tasks
282///
283/// 一次性任务完成通知器
284pub enum CompletionNotifier {
285 OneShot(Sender<TaskCompletion>),
286 Periodic(PeriodicCompletionNotifier),
287}
288
289/// Completion receiver for one-shot and periodic tasks
290///
291/// 一次性和周期任务完成通知接收器
292pub enum CompletionReceiver {
293 OneShot(Receiver<TaskCompletion>),
294 Periodic(PeriodicCompletionReceiver),
295}
296
297/// Timer Task
298///
299/// Users interact via a two-step API
300/// 1. Create task using `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
301/// 2. Register task using `TimerWheel::register()` or `TimerService::register()`
302///
303/// 定时器任务
304///
305/// 用户通过两步 API 与定时器交互
306/// 1. 使用 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建任务
307/// 2. 使用 `TimerWheel::register()` 或 `TimerService::register()` 注册任务
308pub struct TimerTask {
309 /// Unique task identifier
310 ///
311 /// 唯一任务标识符
312 pub(crate) id: TaskId,
313
314 /// Task type (one-shot or periodic)
315 ///
316 /// 任务类型(一次性或周期性)
317 pub(crate) task_type: TaskType,
318
319 /// User-specified delay duration (initial delay for periodic tasks)
320 ///
321 /// 用户指定的延迟时间(周期任务的初始延迟)
322 pub(crate) delay: std::time::Duration,
323
324 /// Async callback function, optional
325 ///
326 /// 异步回调函数,可选
327 pub(crate) callback: Option<CallbackWrapper>,
328}
329
330impl TimerTask {
331 /// Create a new one-shot timer task
332 ///
333 /// # Parameters
334 /// - `delay`: Delay duration before task execution
335 /// - `callback`: Callback function, optional
336 ///
337 /// # Note
338 /// This is an internal method, users should use `TimerWheel::create_task()` or `TimerService::create_task()` to create tasks.
339 ///
340 /// 创建一个新的一次性定时器任务
341 ///
342 /// # 参数
343 /// - `delay`: 任务执行前的延迟时间
344 /// - `callback`: 回调函数,可选
345 ///
346 #[inline]
347 pub fn new_oneshot(delay: std::time::Duration, callback: Option<CallbackWrapper>) -> Self {
348 Self {
349 id: TaskId::new(),
350 task_type: TaskType::OneShot,
351 delay,
352 callback,
353 }
354 }
355
356 /// Create a new periodic timer task
357 ///
358 /// # Parameters
359 /// - `initial_delay`: Initial delay before first execution
360 /// - `interval`: Interval between subsequent executions
361 /// - `callback`: Callback function, optional
362 ///
363 /// # Note
364 /// This is an internal method, users should use `TimerWheel::create_periodic_task()` or `TimerService::create_periodic_task()` to create tasks.
365 ///
366 /// 创建一个新的周期性定时器任务
367 ///
368 /// # 参数
369 /// - `initial_delay`: 首次执行前的初始延迟
370 /// - `interval`: 后续执行之间的间隔
371 /// - `callback`: 回调函数,可选
372 ///
373 #[inline]
374 pub fn new_periodic(
375 initial_delay: std::time::Duration,
376 interval: std::time::Duration,
377 callback: Option<CallbackWrapper>,
378 buffer_size: Option<NonZeroUsize>,
379 ) -> Self {
380 Self {
381 id: TaskId::new(),
382 task_type: TaskType::Periodic { interval, buffer_size: buffer_size.unwrap_or(NonZeroUsize::new(32).unwrap()) },
383 delay: initial_delay,
384 callback,
385 }
386 }
387
388 /// Get task ID
389 ///
390 /// 获取任务 ID
391 ///
392 /// # Examples (示例)
393 ///
394 /// ```no_run
395 /// use kestrel_timer::TimerTask;
396 /// use std::time::Duration;
397 ///
398 /// let task = TimerTask::new_oneshot(Duration::from_secs(1), None);
399 /// let task_id = task.get_id();
400 /// println!("Task ID: {:?}", task_id);
401 /// ```
402 #[inline]
403 pub fn get_id(&self) -> TaskId {
404 self.id
405 }
406
407 /// Get task type
408 ///
409 /// 获取任务类型
410 #[inline]
411 pub fn get_task_type(&self) -> &TaskType {
412 &self.task_type
413 }
414
415 /// Get the interval for periodic tasks
416 ///
417 /// Returns `None` for one-shot tasks
418 ///
419 /// 获取周期任务的间隔时间
420 ///
421 /// 对于一次性任务返回 `None`
422 #[inline]
423 pub fn get_interval(&self) -> Option<std::time::Duration> {
424 match self.task_type {
425 TaskType::Periodic { interval, .. } => Some(interval),
426 TaskType::OneShot { .. } => None,
427 }
428 }
429
430}
431
432/// Timer Task
433///
434/// Users interact via a two-step API
435/// 1. Create task using `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
436/// 2. Register task using `TimerWheel::register()` or `TimerService::register()`
437///
438/// 定时器任务
439///
440/// 用户通过两步 API 与定时器交互
441/// 1. 使用 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建任务
442/// 2. 使用 `TimerWheel::register()` 或 `TimerService::register()` 注册任务
443pub struct TimerTaskWithCompletionNotifier {
444 /// Unique task identifier
445 ///
446 /// 唯一任务标识符
447 pub(crate) id: TaskId,
448
449 /// Task type (one-shot or periodic)
450 ///
451 /// 任务类型(一次性或周期性)
452 pub(crate) task_type: TaskTypeWithCompletionNotifier,
453
454 /// User-specified delay duration (initial delay for periodic tasks)
455 ///
456 /// 用户指定的延迟时间(周期任务的初始延迟)
457 pub(crate) delay: std::time::Duration,
458
459 /// Async callback function, optional
460 ///
461 /// 异步回调函数,可选
462 pub(crate) callback: Option<CallbackWrapper>,
463}
464
465impl TimerTaskWithCompletionNotifier {
466
467 /// Create a new timer task with completion notifier from a timer task
468 ///
469 /// 从定时器任务创建一个新的定时器任务完成通知器
470 ///
471 /// # Parameters
472 /// - `task`: The timer task to create from
473 /// - `buffer_size`: The buffer size for the periodic task completion notifier
474 ///
475 /// # Returns
476 /// A tuple containing the new timer task with completion notifier and the completion receiver
477 ///
478 /// 返回一个包含新的定时器任务完成通知器和完成通知接收器的元组
479 ///
480 pub fn from_timer_task(task: TimerTask) -> (Self, CompletionReceiver) {
481 match task.task_type {
482 TaskType::OneShot { .. } => {
483 // Create oneshot notifier and receiver with optimized single Arc allocation
484 // 创建 oneshot 通知器和接收器,使用优化的单个 Arc 分配
485 let (notifier, receiver) = channel();
486
487 (Self {
488 id: task.id,
489 task_type: TaskTypeWithCompletionNotifier::OneShot {
490 completion_notifier: notifier
491 },
492 delay: task.delay,
493 callback: task.callback,
494 }, CompletionReceiver::OneShot(receiver))
495 },
496 TaskType::Periodic { interval, buffer_size } => {
497 // Use custom SPSC channel for high-performance periodic notification
498 // 使用自定义 SPSC 通道实现高性能周期通知
499 let (tx, rx) = spsc::channel(buffer_size);
500
501 let notifier = PeriodicCompletionNotifier(tx);
502 let receiver = PeriodicCompletionReceiver(rx);
503
504 (Self {
505 id: task.id,
506 task_type: TaskTypeWithCompletionNotifier::Periodic {
507 interval,
508 completion_notifier: notifier
509 },
510 delay: task.delay,
511 callback: task.callback,
512 }, CompletionReceiver::Periodic(receiver))
513 },
514 }
515 }
516
517 /// Get task ID
518 ///
519 /// 获取任务 ID
520 ///
521 /// # Examples (示例)
522 ///
523 /// ```no_run
524 /// use kestrel_timer::TimerTask;
525 /// use std::time::Duration;
526 ///
527 /// let task = TimerTask::new_oneshot(Duration::from_secs(1), None);
528 /// let task_id = task.get_id();
529 /// println!("Task ID: {:?}", task_id);
530 /// ```
531 #[inline]
532 pub fn get_id(&self) -> TaskId {
533 self.id
534 }
535
536 /// Into task type
537 ///
538 /// 将任务类型转换为完成通知器
539 #[inline]
540 pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
541 self.task_type
542 }
543
544 /// Get the interval for periodic tasks
545 ///
546 /// Returns `None` for one-shot tasks
547 ///
548 /// 获取周期任务的间隔时间
549 ///
550 /// 对于一次性任务返回 `None`
551 #[inline]
552 pub fn get_interval(&self) -> Option<std::time::Duration> {
553 match self.task_type {
554 TaskTypeWithCompletionNotifier::Periodic { interval, .. } => Some(interval),
555 TaskTypeWithCompletionNotifier::OneShot { .. } => None,
556 }
557 }
558}
559
560pub(crate) struct TimerTaskForWheel {
561 pub(crate) task: TimerTaskWithCompletionNotifier,
562 pub(crate) deadline_tick: u64,
563 pub(crate) rounds: u32,
564}
565
566impl TimerTaskForWheel {
567 /// Create a new timer task for wheel
568 ///
569 /// 创建一个新的定时器任务用于时间轮
570 ///
571 /// # Parameters
572 /// - `task`: The timer task to create from
573 /// - `deadline_tick`: The deadline tick for the task
574 /// - `rounds`: The rounds for the task
575 ///
576 /// # Returns
577 /// A new timer task for wheel
578 ///
579 /// 返回一个新的定时器任务用于时间轮
580 ///
581 #[inline]
582 pub(crate) fn new(task: TimerTaskWithCompletionNotifier, deadline_tick: u64, rounds: u32) -> Self {
583 Self { task, deadline_tick, rounds }
584 }
585
586 /// Get task ID
587 ///
588 /// 获取任务 ID
589 #[inline]
590 pub fn get_id(&self) -> TaskId {
591 self.task.get_id()
592 }
593
594 /// Into task type
595 ///
596 /// 将任务类型转换为完成通知器
597 #[inline]
598 pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
599 self.task.into_task_type()
600 }
601
602 /// Update the delay of the timer task
603 ///
604 /// 更新定时器任务的延迟
605 ///
606 /// # Parameters
607 /// - `delay`: The new delay for the task
608 ///
609 #[inline]
610 pub fn update_delay(&mut self, delay: std::time::Duration) {
611 self.task.delay = delay
612 }
613
614 /// Update the callback of the timer task
615 ///
616 /// 更新定时器任务的回调
617 ///
618 /// # Parameters
619 /// - `callback`: The new callback for the task
620 ///
621 #[inline]
622 pub fn update_callback(&mut self, callback: CallbackWrapper) {
623 self.task.callback = Some(callback)
624 }
625}
626
/// Where a task is stored inside the hierarchical timing wheel.
///
/// The fields are declared in the order `slot_index`, `vec_index`, `level`.
/// The struct uses Rust's default representation, under which declaration
/// order carries no layout guarantee.
#[derive(Debug, Clone, Copy)]
pub(crate) struct TaskLocation {
    /// Index of the slot that holds the task.
    pub slot_index: usize,
    /// Position of the task inside the slot's `Vec`, kept so the task can be
    /// found for cancellation without a search.
    pub vec_index: usize,
    /// Wheel level: 0 = L0 (bottom layer), 1 = L1 (upper layer).
    ///
    /// Stored as `u8` rather than `bool` to leave room for more layers.
    pub level: u8,
}

impl TaskLocation {
    /// Builds a location from its level, slot index and in-slot index.
    ///
    /// Note that the argument order (`level` first) differs from the field
    /// declaration order.
    #[inline(always)]
    pub fn new(level: u8, slot_index: usize, vec_index: usize) -> Self {
        TaskLocation {
            level,
            vec_index,
            slot_index,
        }
    }
}
665