1use std::{
10 fmt::{self, Display, Formatter},
11 ops::Deref,
12 sync::Arc,
13 time::Duration,
14};
15
16use reifydb_engine::StandardEngine;
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
19pub enum Priority {
20 Low = 0,
21 Normal = 1,
22 High = 2,
23}
24
25#[repr(transparent)]
27#[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Ord, Eq, Hash)]
28pub struct TaskHandle(pub u64);
29
30impl Display for TaskHandle {
31 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
32 Display::fmt(&self.0, f)
33 }
34}
35
36impl Deref for TaskHandle {
37 type Target = u64;
38
39 fn deref(&self) -> &Self::Target {
40 &self.0
41 }
42}
43
44impl PartialEq<u64> for TaskHandle {
45 fn eq(&self, other: &u64) -> bool {
46 self.0.eq(other)
47 }
48}
49
50impl From<TaskHandle> for u64 {
51 fn from(value: TaskHandle) -> Self {
52 value.0
53 }
54}
55
56impl From<u64> for TaskHandle {
57 fn from(value: u64) -> Self {
58 TaskHandle(value)
59 }
60}
61
62#[derive(Clone)]
63pub struct TaskContext {
64 engine: StandardEngine,
65}
66
67impl TaskContext {
68 pub fn new(engine: StandardEngine) -> Self {
69 Self {
70 engine,
71 }
72 }
73
74 pub fn engine(&self) -> &StandardEngine {
75 &self.engine
76 }
77}
78
79pub trait SchedulableTask: Send + Sync {
80 fn execute(&self, ctx: &TaskContext) -> reifydb_core::Result<()>;
81 fn name(&self) -> &str;
82 fn priority(&self) -> Priority;
83}
84
85pub type BoxedTask = Box<dyn SchedulableTask>;
86
87pub trait OnceTask: Send + Sync {
93 fn execute_once(self: Box<Self>, ctx: &TaskContext) -> reifydb_core::Result<()>;
94 fn name(&self) -> &str;
95 fn priority(&self) -> Priority;
96}
97
98pub type BoxedOnceTask = Box<dyn OnceTask>;
99
100pub struct OnceClosureTask<F>
102where
103 F: FnOnce(&TaskContext) -> reifydb_core::Result<()> + Send + Sync,
104{
105 name: String,
106 priority: Priority,
107 task: Option<F>, }
109
110impl<F> OnceClosureTask<F>
111where
112 F: FnOnce(&TaskContext) -> reifydb_core::Result<()> + Send + Sync,
113{
114 pub fn new(name: impl Into<String>, priority: Priority, task: F) -> Self {
115 Self {
116 name: name.into(),
117 priority,
118 task: Some(task),
119 }
120 }
121}
122
123impl<F> OnceTask for OnceClosureTask<F>
124where
125 F: FnOnce(&TaskContext) -> reifydb_core::Result<()> + Send + Sync,
126{
127 fn execute_once(mut self: Box<Self>, ctx: &TaskContext) -> reifydb_core::Result<()> {
128 let task = self.task.take().expect("Task already executed");
129 task(ctx)
130 }
131
132 fn name(&self) -> &str {
133 &self.name
134 }
135
136 fn priority(&self) -> Priority {
137 self.priority
138 }
139}
140
141pub struct ClosureTask<F>
143where
144 F: Fn(&TaskContext) -> reifydb_core::Result<()> + Send + Sync,
145{
146 name: String,
147 priority: Priority,
148 task: F,
149}
150
151impl<F> ClosureTask<F>
152where
153 F: Fn(&TaskContext) -> reifydb_core::Result<()> + Send + Sync,
154{
155 pub fn new(name: impl Into<String>, priority: Priority, task: F) -> Self {
156 Self {
157 name: name.into(),
158 priority,
159 task,
160 }
161 }
162}
163
164impl<F> SchedulableTask for ClosureTask<F>
165where
166 F: Fn(&TaskContext) -> reifydb_core::Result<()> + Send + Sync,
167{
168 fn execute(&self, ctx: &TaskContext) -> reifydb_core::Result<()> {
169 (self.task)(ctx)
170 }
171
172 fn name(&self) -> &str {
173 &self.name
174 }
175
176 fn priority(&self) -> Priority {
177 self.priority
178 }
179}
180
181#[macro_export]
218macro_rules! task {
219 ($closure:expr) => {
221 Box::new($crate::ClosureTask::new("unnamed", $crate::Priority::Normal, $closure))
222 };
223
224 (Low, $closure:expr) => {
226 Box::new($crate::ClosureTask::new("unnamed", $crate::Priority::Low, $closure))
227 };
228 (Normal, $closure:expr) => {
229 Box::new($crate::ClosureTask::new("unnamed", $crate::Priority::Normal, $closure))
230 };
231 (High, $closure:expr) => {
232 Box::new($crate::ClosureTask::new("unnamed", $crate::Priority::High, $closure))
233 };
234
235 ($name:literal, $closure:expr) => {
237 Box::new($crate::ClosureTask::new($name, $crate::Priority::Normal, $closure))
238 };
239
240 (Low, $name:literal, $closure:expr) => {
242 Box::new($crate::ClosureTask::new($name, $crate::Priority::Low, $closure))
243 };
244 (Normal, $name:literal, $closure:expr) => {
245 Box::new($crate::ClosureTask::new($name, $crate::Priority::Normal, $closure))
246 };
247 (High, $name:literal, $closure:expr) => {
248 Box::new($crate::ClosureTask::new($name, $crate::Priority::High, $closure))
249 };
250
251 ($name:literal, Low, $closure:expr) => {
253 Box::new($crate::ClosureTask::new($name, $crate::Priority::Low, $closure))
254 };
255 ($name:literal, Normal, $closure:expr) => {
256 Box::new($crate::ClosureTask::new($name, $crate::Priority::Normal, $closure))
257 };
258 ($name:literal, High, $closure:expr) => {
259 Box::new($crate::ClosureTask::new($name, $crate::Priority::High, $closure))
260 };
261
262 ($priority:expr, $closure:expr) => {
264 Box::new($crate::ClosureTask::new("unnamed", $priority, $closure))
265 };
266
267 ($name:expr, $priority:expr, $closure:expr) => {
269 Box::new($crate::ClosureTask::new($name, $priority, $closure))
270 };
271}
272
273#[macro_export]
295macro_rules! task_once {
296 ($closure:expr) => {
298 Box::new($crate::OnceClosureTask::new("unnamed", $crate::Priority::Normal, $closure))
299 };
300
301 (Low, $closure:expr) => {
303 Box::new($crate::OnceClosureTask::new("unnamed", $crate::Priority::Low, $closure))
304 };
305 (Normal, $closure:expr) => {
306 Box::new($crate::OnceClosureTask::new("unnamed", $crate::Priority::Normal, $closure))
307 };
308 (High, $closure:expr) => {
309 Box::new($crate::OnceClosureTask::new("unnamed", $crate::Priority::High, $closure))
310 };
311
312 ($name:literal, $closure:expr) => {
314 Box::new($crate::OnceClosureTask::new($name, $crate::Priority::Normal, $closure))
315 };
316
317 (Low, $name:literal, $closure:expr) => {
319 Box::new($crate::OnceClosureTask::new($name, $crate::Priority::Low, $closure))
320 };
321 (Normal, $name:literal, $closure:expr) => {
322 Box::new($crate::OnceClosureTask::new($name, $crate::Priority::Normal, $closure))
323 };
324 (High, $name:literal, $closure:expr) => {
325 Box::new($crate::OnceClosureTask::new($name, $crate::Priority::High, $closure))
326 };
327
328 ($name:literal, Low, $closure:expr) => {
330 Box::new($crate::OnceClosureTask::new($name, $crate::Priority::Low, $closure))
331 };
332 ($name:literal, Normal, $closure:expr) => {
333 Box::new($crate::OnceClosureTask::new($name, $crate::Priority::Normal, $closure))
334 };
335 ($name:literal, High, $closure:expr) => {
336 Box::new($crate::OnceClosureTask::new($name, $crate::Priority::High, $closure))
337 };
338
339 ($priority:expr, $closure:expr) => {
341 Box::new($crate::OnceClosureTask::new("unnamed", $priority, $closure))
342 };
343
344 ($name:expr, $priority:expr, $closure:expr) => {
346 Box::new($crate::OnceClosureTask::new($name, $priority, $closure))
347 };
348}
349
350pub trait Scheduler: Send + Sync {
351 fn every(&self, interval: Duration, task: BoxedTask) -> reifydb_core::Result<TaskHandle>;
358
359 fn once(&self, task: BoxedOnceTask) -> reifydb_core::Result<()>;
371
372 fn cancel(&self, handle: TaskHandle) -> reifydb_core::Result<()>;
374}
375
376#[derive(Clone)]
382pub struct SchedulerService(pub Arc<dyn Scheduler>);
383
384impl Deref for SchedulerService {
385 type Target = Arc<dyn Scheduler>;
386
387 fn deref(&self) -> &Self::Target {
388 &self.0
389 }
390}
391
392#[cfg(test)]
393mod tests {
394 use super::*;
395 use crate::Priority::{High, Low, Normal};
396
397 #[test]
398 fn test_task_macro_minimal() {
399 let task: BoxedTask = task!(|_ctx| { Ok(()) });
401
402 assert_eq!(task.name(), "unnamed");
403 assert_eq!(task.priority(), Normal);
404 }
405
406 #[test]
407 fn test_task_macro_with_name() {
408 let task: BoxedTask = task!("test_task", |_ctx| { Ok(()) });
410
411 assert_eq!(task.name(), "test_task");
412 assert_eq!(task.priority(), Normal);
413 }
414
415 #[test]
416 fn test_task_macro_with_priority() {
417 let task: BoxedTask = task!(High, |_ctx| { Ok(()) });
419
420 assert_eq!(task.name(), "unnamed");
421 assert_eq!(task.priority(), High);
422 }
423
424 #[test]
425 fn test_task_macro_priority_name() {
426 let task: BoxedTask = task!(Low, "priority_first", |_ctx| { Ok(()) });
428
429 assert_eq!(task.name(), "priority_first");
430 assert_eq!(task.priority(), Low);
431 }
432
433 #[test]
434 fn test_task_macro_name_priority() {
435 let task: BoxedTask = task!("name_first", High, |_ctx| { Ok(()) });
437
438 assert_eq!(task.name(), "name_first");
439 assert_eq!(task.priority(), High);
440 }
441
442 #[test]
443 fn test_task_macro_with_move_closure() {
444 let captured_value = 42;
446 let task: BoxedTask = task!("move_task", move |_ctx| {
447 let _val = captured_value;
449 Ok(())
450 });
451
452 assert_eq!(task.name(), "move_task");
453 assert_eq!(task.priority(), Normal);
454 }
455
456 #[test]
457 fn test_task_macro_all_priorities() {
458 let low_task: BoxedTask = task!(Low, |_ctx| { Ok(()) });
460 let normal_task: BoxedTask = task!(Normal, |_ctx| { Ok(()) });
461 let high_task: BoxedTask = task!(High, |_ctx| { Ok(()) });
462
463 assert_eq!(low_task.priority(), Low);
464 assert_eq!(normal_task.priority(), Normal);
465 assert_eq!(high_task.priority(), High);
466 }
467}