kestrel_protocol_timer/task.rs
1use std::future::Future;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Arc;
5use tokio::sync::oneshot;
6
7/// 全局唯一的任务 ID 生成器
8static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);
9
10/// 任务完成原因
11///
12/// 表示定时器任务完成的原因,可以是正常到期或被取消。
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14pub enum TaskCompletionReason {
15 /// 任务正常到期
16 Expired,
17 /// 任务被取消
18 Cancelled,
19}
20
21/// 定时器任务的唯一标识符
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
23pub struct TaskId(u64);
24
25impl TaskId {
26 /// 生成一个新的唯一任务 ID(内部使用)
27 #[inline]
28 pub(crate) fn new() -> Self {
29 TaskId(NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed))
30 }
31
32 /// 获取任务 ID 的数值
33 #[inline]
34 pub fn as_u64(&self) -> u64 {
35 self.0
36 }
37}
38
39impl Default for TaskId {
40 #[inline]
41 fn default() -> Self {
42 Self::new()
43 }
44}
45
46/// 定时器回调 trait
47///
48/// 实现此 trait 的类型可以作为定时器的回调函数使用。
49///
50/// # 示例
51///
52/// ```
53/// use kestrel_protocol_timer::TimerCallback;
54/// use std::future::Future;
55/// use std::pin::Pin;
56///
57/// struct MyCallback;
58///
59/// impl TimerCallback for MyCallback {
60/// fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
61/// Box::pin(async {
62/// println!("Timer callback executed!");
63/// })
64/// }
65/// }
66/// ```
67pub trait TimerCallback: Send + Sync + 'static {
68 /// 执行回调,返回一个 Future
69 fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
70}
71
72/// 为闭包实现 TimerCallback trait
73/// 支持 Fn() -> Future 类型的闭包(可以多次调用,适合周期性任务)
74impl<F, Fut> TimerCallback for F
75where
76 F: Fn() -> Fut + Send + Sync + 'static,
77 Fut: Future<Output = ()> + Send + 'static,
78{
79 fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
80 Box::pin(self())
81 }
82}
83
84/// 回调包装器,用于规范化创建和管理回调
85///
86/// # 示例
87///
88/// ```
89/// use kestrel_protocol_timer::CallbackWrapper;
90///
91/// let callback = CallbackWrapper::new(|| async {
92/// println!("Timer callback executed!");
93/// });
94/// ```
95#[derive(Clone)]
96pub struct CallbackWrapper {
97 callback: Arc<dyn TimerCallback>,
98}
99
100impl CallbackWrapper {
101 /// 创建新的回调包装器
102 ///
103 /// # 参数
104 /// - `callback`: 实现了 TimerCallback trait 的回调对象
105 ///
106 /// # 示例
107 ///
108 /// ```
109 /// use kestrel_protocol_timer::CallbackWrapper;
110 ///
111 /// let callback = CallbackWrapper::new(|| async {
112 /// println!("Timer fired!");
113 /// });
114 /// ```
115 #[inline]
116 pub fn new(callback: impl TimerCallback) -> Self {
117 Self {
118 callback: Arc::new(callback),
119 }
120 }
121
122 /// 调用回调函数
123 #[inline]
124 pub(crate) fn call(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
125 self.callback.call()
126 }
127}
128
129/// 完成通知器,用于在任务完成时发送通知
130pub struct CompletionNotifier(pub oneshot::Sender<TaskCompletionReason>);
131
132/// 定时器任务
133///
134/// 用户通过两步式 API 使用:
135/// 1. 使用 `TimerTask::new()` 创建任务
136/// 2. 使用 `TimerWheel::register()` 或 `TimerService::register()` 注册任务
137pub struct TimerTask {
138 /// 任务唯一标识符
139 pub(crate) id: TaskId,
140
141 /// 用户指定的延迟时间
142 pub(crate) delay: std::time::Duration,
143
144 /// 到期时间(相对于时间轮的 tick 数)
145 pub(crate) deadline_tick: u64,
146
147 /// 轮次计数(用于超出时间轮范围的任务)
148 pub(crate) rounds: u32,
149
150 /// 异步回调函数(可选)
151 pub(crate) callback: Option<CallbackWrapper>,
152
153 /// 完成通知器(用于在任务完成时发送通知,注册时创建)
154 pub(crate) completion_notifier: Option<CompletionNotifier>,
155}
156
157impl TimerTask {
158 /// 创建新的定时器任务(内部使用)
159 ///
160 /// # 参数
161 /// - `delay`: 延迟时间
162 /// - `callback`: 回调函数(可选)
163 ///
164 /// # 注意
165 /// 这是内部方法,用户应该使用 `TimerWheel::create_task()` 或 `TimerService::create_task()` 创建任务。
166 #[inline]
167 pub(crate) fn new(delay: std::time::Duration, callback: Option<CallbackWrapper>) -> Self {
168 Self {
169 id: TaskId::new(),
170 delay,
171 deadline_tick: 0,
172 rounds: 0,
173 callback,
174 completion_notifier: None,
175 }
176 }
177
178 /// 获取任务 ID
179 ///
180 /// # 示例
181 /// ```no_run
182 /// use kestrel_protocol_timer::TimerWheel;
183 /// use std::time::Duration;
184 ///
185 /// let task = TimerWheel::create_task(Duration::from_secs(1), None);
186 /// let task_id = task.get_id();
187 /// println!("Task ID: {:?}", task_id);
188 /// ```
189 pub fn get_id(&self) -> TaskId {
190 self.id
191 }
192
193 /// 内部方法:准备注册(在注册时由时间轮调用)
194 ///
195 /// 注意:此方法已内联到 insert/insert_batch 中以提升性能,
196 /// 但保留此方法以供未来可能的其他用途
197 #[allow(dead_code)]
198 pub(crate) fn prepare_for_registration(
199 &mut self,
200 completion_notifier: CompletionNotifier,
201 deadline_tick: u64,
202 rounds: u32,
203 ) {
204 self.completion_notifier = Some(completion_notifier);
205 self.deadline_tick = deadline_tick;
206 self.rounds = rounds;
207 }
208
209 /// 获取回调函数的克隆(如果存在)
210 #[inline]
211 pub(crate) fn get_callback(&self) -> Option<CallbackWrapper> {
212 self.callback.clone()
213 }
214}
215
216/// 任务位置信息(包含层级),用于分层时间轮
217///
218/// 优化内存布局:将 level 字段放在最前,利用结构体对齐减少填充
219#[derive(Debug, Clone, Copy)]
220pub(crate) struct TaskLocation {
221 /// 槽位索引
222 pub slot_index: usize,
223 /// 任务在槽位 Vec 中的索引位置(用于 O(1) 取消)
224 pub vec_index: usize,
225 /// 层级:0 = L0(底层),1 = L1(高层)
226 /// 使用 u8 而非 bool,为未来可能的多层扩展预留空间
227 pub level: u8,
228}
229
230impl TaskLocation {
231 #[inline(always)]
232 pub fn new(level: u8, slot_index: usize, vec_index: usize) -> Self {
233 Self {
234 slot_index,
235 vec_index,
236 level,
237 }
238 }
239}
240