kestrel_timer/task.rs
1use std::future::Future;
2use std::num::NonZeroU16;
3use std::pin::Pin;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7use crate::utils::oneshot::{channel, Sender, Receiver};
8use crate::utils::spsc::{self, TryRecvError};
9
/// Process-wide counter from which task IDs are drawn.
///
/// Starts at 1. `TaskId::new` returns the current value and advances the
/// counter with an atomic `fetch_add`.
static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);
14
// State codes describing how a one-shot task finished.
//
// NOTE(review): none of these are read in this file; presumably they are the
// values stored by the one-shot completion channel — confirm against
// `crate::utils::oneshot`.

/// One-shot state: no outcome recorded yet.
pub(crate) const ONESHOT_PENDING: u8 = 0;
/// One-shot state: the task was called.
pub(crate) const ONESHOT_CALLED: u8 = 1;
/// One-shot state: the task was cancelled.
pub(crate) const ONESHOT_CANCELLED: u8 = 2;
21
/// Outcome reported for a periodic task.
///
/// Values of this type travel over the periodic completion channel and state
/// whether the task was called or cancelled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskCompletion {
    /// The task was called.
    Called,
    /// The task was cancelled.
    Cancelled,
}
38
39/// Unique identifier for timer tasks
40///
41/// 定时器任务唯一标识符
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
43pub struct TaskId(u64);
44
45impl TaskId {
46 /// Generate a new unique task ID (internal use)
47 ///
48 /// 生成一个新的唯一任务 ID (内部使用)
49 #[inline]
50 pub(crate) fn new() -> Self {
51 TaskId(NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed))
52 }
53
54 /// Get the numeric value of the task ID
55 ///
56 /// 获取任务 ID 的数值
57 #[inline]
58 pub fn as_u64(&self) -> u64 {
59 self.0
60 }
61}
62
63impl Default for TaskId {
64 #[inline]
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
/// Callback contract for timer tasks.
///
/// Any `Send + Sync + 'static` type that implements this trait can be used
/// as a timer callback.
///
/// # Examples
///
/// ```
/// use kestrel_timer::task::TimerCallback;
/// use std::future::Future;
/// use std::pin::Pin;
///
/// struct MyCallback;
///
/// impl TimerCallback for MyCallback {
///     fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
///         Box::pin(async {
///             println!("Timer callback executed!");
///         })
///     }
/// }
/// ```
pub trait TimerCallback: Send + Sync + 'static {
    /// Runs the callback and returns its work as a boxed, `Send` future.
    ///
    /// Takes `&self`, so one callback value can be invoked any number of
    /// times.
    fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
}
99
100/// Implement TimerCallback trait for closures
101///
102/// Supports Fn() -> Future closures, can be called multiple times, suitable for periodic tasks
103///
104/// 实现 TimerCallback 特性的类型,支持 Fn() -> Future 闭包,可以多次调用,适合周期性任务
105impl<F, Fut> TimerCallback for F
106where
107 F: Fn() -> Fut + Send + Sync + 'static,
108 Fut: Future<Output = ()> + Send + 'static,
109{
110 fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
111 Box::pin(self())
112 }
113}
114
115/// Callback wrapper for standardized callback creation and management
116///
117/// Callback 包装器,用于标准化回调创建和管理
118///
119/// # Examples (示例)
120///
121/// ```
122/// use kestrel_timer::CallbackWrapper;
123///
124/// let callback = CallbackWrapper::new(|| async {
125/// println!("Timer callback executed!");
126/// });
127/// ```
128#[derive(Clone)]
129pub struct CallbackWrapper {
130 callback: Arc<dyn TimerCallback>,
131}
132
133impl CallbackWrapper {
134 /// Create a new callback wrapper
135 ///
136 /// # Parameters
137 /// - `callback`: Callback object implementing TimerCallback trait
138 ///
139 /// # 创建一个新的回调包装器
140 ///
141 /// # 参数
142 /// - `callback`: 实现 TimerCallback 特性的回调对象
143 ///
144 /// # Examples (示例)
145 ///
146 /// ```
147 /// use kestrel_timer::CallbackWrapper;
148 ///
149 /// let callback = CallbackWrapper::new(|| async {
150 /// println!("Timer fired!"); // 定时器触发
151 /// });
152 /// ```
153 #[inline]
154 pub fn new(callback: impl TimerCallback) -> Self {
155 Self {
156 callback: Arc::new(callback),
157 }
158 }
159
160 /// Call the callback function
161 ///
162 /// 调用回调函数
163 #[inline]
164 pub(crate) fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
165 self.callback.call()
166 }
167}
168
/// Kind of timer task: fire once, or fire repeatedly.
#[derive(Clone)]
pub enum TaskType {
    /// One-shot timer: executes once and completes.
    OneShot,

    /// Periodic timer: repeats at a fixed interval.
    Periodic {
        /// Time between consecutive executions.
        interval: std::time::Duration,
        /// Capacity requested for the periodic completion channel.
        buffer_size: NonZeroU16,
    },
}
193
194/// Task type enum to distinguish between one-shot and periodic timers
195///
196/// 任务类型枚举,用于区分一次性和周期性定时器
197pub enum TaskTypeWithCompletionNotifier {
198 /// One-shot timer: executes once and completes
199 ///
200 /// 一次性定时器:执行一次后完成
201 OneShot {
202 completion_notifier: Sender,
203 },
204
205 /// Periodic timer: repeats at fixed intervals
206 ///
207 /// 周期性定时器:按固定间隔重复执行
208 Periodic {
209 /// Period interval for periodic tasks
210 ///
211 /// 周期任务的间隔时间
212 interval: std::time::Duration,
213 /// Completion notifier for periodic tasks
214 ///
215 /// 周期性任务完成通知器
216 completion_notifier: PeriodicCompletionNotifier,
217 },
218}
219
220
221
222/// Completion notifier for periodic tasks
223///
224/// Uses custom SPSC channel for high-performance, low-latency notification
225///
226/// 周期任务完成通知器
227///
228/// 使用自定义 SPSC 通道实现高性能、低延迟的通知
229pub struct PeriodicCompletionNotifier(pub spsc::Sender<TaskCompletion>);
230
231/// Completion receiver for periodic tasks
232///
233/// 周期任务完成通知接收器
234pub struct PeriodicCompletionReceiver(pub spsc::Receiver<TaskCompletion>);
235
236impl PeriodicCompletionReceiver {
237 /// Try to receive a completion notification
238 ///
239 /// 尝试接收完成通知
240 #[inline]
241 pub fn try_recv(&mut self) -> Result<TaskCompletion, TryRecvError> {
242 self.0.try_recv()
243 }
244
245 /// Receive a completion notification
246 ///
247 /// 接收完成通知
248 #[inline]
249 pub async fn recv(&mut self) -> Option<TaskCompletion> {
250 self.0.recv().await
251 }
252}
253
254
255/// Completion notifier for one-shot tasks
256///
257/// 一次性任务完成通知器
258pub enum CompletionNotifier {
259 OneShot(Sender),
260 Periodic(PeriodicCompletionNotifier),
261}
262
263/// Completion receiver for one-shot and periodic tasks
264///
265/// 一次性和周期任务完成通知接收器
266pub enum CompletionReceiver {
267 OneShot(Receiver),
268 Periodic(PeriodicCompletionReceiver),
269}
270
271/// Timer Task
272///
273/// Users interact via a two-step API
274/// 1. Create task using `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
275/// 2. Register task using `TimerWheel::register()` or `TimerService::register()`
276///
277/// 定时器任务
278///
279/// 用户通过两步 API 与定时器交互
280/// 1. 使用 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建任务
281/// 2. 使用 `TimerWheel::register()` 或 `TimerService::register()` 注册任务
282pub struct TimerTask {
283 /// Unique task identifier
284 ///
285 /// 唯一任务标识符
286 pub(crate) id: TaskId,
287
288 /// Task type (one-shot or periodic)
289 ///
290 /// 任务类型(一次性或周期性)
291 pub(crate) task_type: TaskType,
292
293 /// User-specified delay duration (initial delay for periodic tasks)
294 ///
295 /// 用户指定的延迟时间(周期任务的初始延迟)
296 pub(crate) delay: std::time::Duration,
297
298 /// Async callback function, optional
299 ///
300 /// 异步回调函数,可选
301 pub(crate) callback: Option<CallbackWrapper>,
302}
303
304impl TimerTask {
305 /// Create a new one-shot timer task
306 ///
307 /// # Parameters
308 /// - `delay`: Delay duration before task execution
309 /// - `callback`: Callback function, optional
310 ///
311 /// # Note
312 /// This is an internal method, users should use `TimerWheel::create_task()` or `TimerService::create_task()` to create tasks.
313 ///
314 /// 创建一个新的一次性定时器任务
315 ///
316 /// # 参数
317 /// - `delay`: 任务执行前的延迟时间
318 /// - `callback`: 回调函数,可选
319 ///
320 #[inline]
321 pub fn new_oneshot(delay: std::time::Duration, callback: Option<CallbackWrapper>) -> Self {
322 Self {
323 id: TaskId::new(),
324 task_type: TaskType::OneShot,
325 delay,
326 callback,
327 }
328 }
329
330 /// Create a new periodic timer task
331 ///
332 /// # Parameters
333 /// - `initial_delay`: Initial delay before first execution
334 /// - `interval`: Interval between subsequent executions
335 /// - `callback`: Callback function, optional
336 ///
337 /// # Note
338 /// This is an internal method, users should use `TimerWheel::create_periodic_task()` or `TimerService::create_periodic_task()` to create tasks.
339 ///
340 /// 创建一个新的周期性定时器任务
341 ///
342 /// # 参数
343 /// - `initial_delay`: 首次执行前的初始延迟
344 /// - `interval`: 后续执行之间的间隔
345 /// - `callback`: 回调函数,可选
346 ///
347 #[inline]
348 pub fn new_periodic(
349 initial_delay: std::time::Duration,
350 interval: std::time::Duration,
351 callback: Option<CallbackWrapper>,
352 buffer_size: Option<NonZeroU16>,
353 ) -> Self {
354 Self {
355 id: TaskId::new(),
356 task_type: TaskType::Periodic { interval, buffer_size: buffer_size.unwrap_or(NonZeroU16::new(32).unwrap()) },
357 delay: initial_delay,
358 callback,
359 }
360 }
361
362 /// Get task ID
363 ///
364 /// 获取任务 ID
365 ///
366 /// # Examples (示例)
367 ///
368 /// ```no_run
369 /// use kestrel_timer::TimerTask;
370 /// use std::time::Duration;
371 ///
372 /// let task = TimerTask::new_oneshot(Duration::from_secs(1), None);
373 /// let task_id = task.get_id();
374 /// println!("Task ID: {:?}", task_id);
375 /// ```
376 #[inline]
377 pub fn get_id(&self) -> TaskId {
378 self.id
379 }
380
381 /// Get task type
382 ///
383 /// 获取任务类型
384 #[inline]
385 pub fn get_task_type(&self) -> &TaskType {
386 &self.task_type
387 }
388
389 /// Get the interval for periodic tasks
390 ///
391 /// Returns `None` for one-shot tasks
392 ///
393 /// 获取周期任务的间隔时间
394 ///
395 /// 对于一次性任务返回 `None`
396 #[inline]
397 pub fn get_interval(&self) -> Option<std::time::Duration> {
398 match self.task_type {
399 TaskType::Periodic { interval, .. } => Some(interval),
400 TaskType::OneShot { .. } => None,
401 }
402 }
403
404}
405
406/// Timer Task
407///
408/// Users interact via a two-step API
409/// 1. Create task using `TimerTask::new_oneshot()` or `TimerTask::new_periodic()`
410/// 2. Register task using `TimerWheel::register()` or `TimerService::register()`
411///
412/// 定时器任务
413///
414/// 用户通过两步 API 与定时器交互
415/// 1. 使用 `TimerTask::new_oneshot()` 或 `TimerTask::new_periodic()` 创建任务
416/// 2. 使用 `TimerWheel::register()` 或 `TimerService::register()` 注册任务
417pub struct TimerTaskWithCompletionNotifier {
418 /// Unique task identifier
419 ///
420 /// 唯一任务标识符
421 pub(crate) id: TaskId,
422
423 /// Task type (one-shot or periodic)
424 ///
425 /// 任务类型(一次性或周期性)
426 pub(crate) task_type: TaskTypeWithCompletionNotifier,
427
428 /// User-specified delay duration (initial delay for periodic tasks)
429 ///
430 /// 用户指定的延迟时间(周期任务的初始延迟)
431 pub(crate) delay: std::time::Duration,
432
433 /// Async callback function, optional
434 ///
435 /// 异步回调函数,可选
436 pub(crate) callback: Option<CallbackWrapper>,
437}
438
439impl TimerTaskWithCompletionNotifier {
440
441 /// Create a new timer task with completion notifier from a timer task
442 ///
443 /// 从定时器任务创建一个新的定时器任务完成通知器
444 ///
445 /// # Parameters
446 /// - `task`: The timer task to create from
447 /// - `buffer_size`: The buffer size for the periodic task completion notifier
448 ///
449 /// # Returns
450 /// A tuple containing the new timer task with completion notifier and the completion receiver
451 ///
452 /// 返回一个包含新的定时器任务完成通知器和完成通知接收器的元组
453 ///
454 pub fn from_timer_task(task: TimerTask) -> (Self, CompletionReceiver) {
455 match task.task_type {
456 TaskType::OneShot { .. } => {
457 // Create oneshot notifier and receiver with optimized single Arc allocation
458 // 创建 oneshot 通知器和接收器,使用优化的单个 Arc 分配
459 let (notifier, receiver) = channel();
460
461 (Self {
462 id: task.id,
463 task_type: TaskTypeWithCompletionNotifier::OneShot {
464 completion_notifier: notifier
465 },
466 delay: task.delay,
467 callback: task.callback,
468 }, CompletionReceiver::OneShot(receiver))
469 },
470 TaskType::Periodic { interval, buffer_size } => {
471 // Use custom SPSC channel for high-performance periodic notification
472 // 使用自定义 SPSC 通道实现高性能周期通知
473 let (tx, rx) = spsc::channel(buffer_size.get() as usize);
474
475 let notifier = PeriodicCompletionNotifier(tx);
476 let receiver = PeriodicCompletionReceiver(rx);
477
478 (Self {
479 id: task.id,
480 task_type: TaskTypeWithCompletionNotifier::Periodic {
481 interval,
482 completion_notifier: notifier
483 },
484 delay: task.delay,
485 callback: task.callback,
486 }, CompletionReceiver::Periodic(receiver))
487 },
488 }
489 }
490
491 /// Get task ID
492 ///
493 /// 获取任务 ID
494 ///
495 /// # Examples (示例)
496 ///
497 /// ```no_run
498 /// use kestrel_timer::TimerTask;
499 /// use std::time::Duration;
500 ///
501 /// let task = TimerTask::new_oneshot(Duration::from_secs(1), None);
502 /// let task_id = task.get_id();
503 /// println!("Task ID: {:?}", task_id);
504 /// ```
505 #[inline]
506 pub fn get_id(&self) -> TaskId {
507 self.id
508 }
509
510 /// Into task type
511 ///
512 /// 将任务类型转换为完成通知器
513 #[inline]
514 pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
515 self.task_type
516 }
517
518 /// Get the interval for periodic tasks
519 ///
520 /// Returns `None` for one-shot tasks
521 ///
522 /// 获取周期任务的间隔时间
523 ///
524 /// 对于一次性任务返回 `None`
525 #[inline]
526 pub fn get_interval(&self) -> Option<std::time::Duration> {
527 match self.task_type {
528 TaskTypeWithCompletionNotifier::Periodic { interval, .. } => Some(interval),
529 TaskTypeWithCompletionNotifier::OneShot { .. } => None,
530 }
531 }
532}
533
534pub(crate) struct TimerTaskForWheel {
535 pub(crate) task: TimerTaskWithCompletionNotifier,
536 pub(crate) deadline_tick: u64,
537 pub(crate) rounds: u32,
538}
539
540impl TimerTaskForWheel {
541 /// Create a new timer task for wheel
542 ///
543 /// 创建一个新的定时器任务用于时间轮
544 ///
545 /// # Parameters
546 /// - `task`: The timer task to create from
547 /// - `deadline_tick`: The deadline tick for the task
548 /// - `rounds`: The rounds for the task
549 ///
550 /// # Returns
551 /// A new timer task for wheel
552 ///
553 /// 返回一个新的定时器任务用于时间轮
554 ///
555 #[inline]
556 pub(crate) fn new(task: TimerTaskWithCompletionNotifier, deadline_tick: u64, rounds: u32) -> Self {
557 Self { task, deadline_tick, rounds }
558 }
559
560 /// Get task ID
561 ///
562 /// 获取任务 ID
563 #[inline]
564 pub fn get_id(&self) -> TaskId {
565 self.task.get_id()
566 }
567
568 /// Into task type
569 ///
570 /// 将任务类型转换为完成通知器
571 #[inline]
572 pub fn into_task_type(self) -> TaskTypeWithCompletionNotifier {
573 self.task.into_task_type()
574 }
575
576 /// Update the delay of the timer task
577 ///
578 /// 更新定时器任务的延迟
579 ///
580 /// # Parameters
581 /// - `delay`: The new delay for the task
582 ///
583 #[inline]
584 pub fn update_delay(&mut self, delay: std::time::Duration) {
585 self.task.delay = delay
586 }
587
588 /// Update the callback of the timer task
589 ///
590 /// 更新定时器任务的回调
591 ///
592 /// # Parameters
593 /// - `callback`: The new callback for the task
594 ///
595 #[inline]
596 pub fn update_callback(&mut self, callback: CallbackWrapper) {
597 self.task.callback = Some(callback)
598 }
599}
600
/// Where a task lives inside the hierarchical timing wheel (level included).
///
/// Layout note: the one-byte `level` field is declared after the two `usize`
/// fields. The struct uses the default Rust representation, under which the
/// compiler is free to choose the in-memory field order itself.
#[derive(Debug, Clone, Copy)]
pub(crate) struct TaskLocation {
    /// Slot index.
    pub slot_index: usize,
    /// Position of the task inside the slot's `Vec`, kept for O(1)
    /// cancellation.
    pub vec_index: usize,
    /// Level: 0 = L0 (bottom layer), 1 = L1 (upper layer).
    /// A `u8` rather than a `bool`, leaving room for more layers later.
    pub level: u8,
}

impl TaskLocation {
    /// Builds a location from its level, slot index, and in-slot index.
    ///
    /// Note that the argument order (`level` first) differs from the field
    /// declaration order.
    #[inline(always)]
    pub fn new(level: u8, slot_index: usize, vec_index: usize) -> Self {
        TaskLocation { slot_index, vec_index, level }
    }
}
639