1use std::{error::Error, fmt, future::Future, pin::Pin, sync::Arc};
5
6use reifydb_core::interface::catalog::task::TaskId;
7
8use crate::{context::TaskContext, schedule::Schedule};
9
10type SyncTaskFn = Arc<dyn Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync>;
11
12type AsyncTaskFn = Arc<
13 dyn Fn(TaskContext) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn Error + Send>>> + Send>> + Send + Sync,
14>;
15
16#[derive(Clone)]
17pub enum TaskWork {
18 Sync(SyncTaskFn),
19
20 Async(AsyncTaskFn),
21}
22
23impl fmt::Debug for TaskWork {
24 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25 match self {
26 TaskWork::Sync(_) => write!(f, "TaskWork::Sync"),
27 TaskWork::Async(_) => write!(f, "TaskWork::Async"),
28 }
29 }
30}
31
/// Identifies the executor a scheduled task is associated with.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TaskExecutor {
    /// NOTE(review): presumably a dedicated pool for blocking / CPU-bound work — confirm
    /// against the code that dispatches tasks.
    ComputePool,
    /// NOTE(review): presumably the Tokio async runtime — confirm against the code that
    /// dispatches tasks.
    Tokio,
}
38
39pub struct ScheduledTask {
40 pub id: TaskId,
41
42 pub name: String,
43
44 pub schedule: Schedule,
45
46 pub work: TaskWork,
47
48 pub executor: TaskExecutor,
49}
50
51impl fmt::Debug for ScheduledTask {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 f.debug_struct("ScheduledTask")
54 .field("id", &self.id)
55 .field("name", &self.name)
56 .field("schedule", &self.schedule)
57 .field("work", &self.work)
58 .field("executor", &self.executor)
59 .finish()
60 }
61}
62
63impl ScheduledTask {
64 pub fn builder(name: impl Into<String>) -> ScheduledTaskBuilder {
65 ScheduledTaskBuilder::new(name)
66 }
67}
68
69pub struct ScheduledTaskBuilder {
70 name: String,
71 schedule: Option<Schedule>,
72 work: Option<TaskWork>,
73 executor: Option<TaskExecutor>,
74}
75
76impl ScheduledTaskBuilder {
77 pub fn new(name: impl Into<String>) -> Self {
78 Self {
79 name: name.into(),
80 schedule: None,
81 work: None,
82 executor: None,
83 }
84 }
85
86 pub fn schedule(mut self, schedule: Schedule) -> Self {
87 self.schedule = Some(schedule);
88 self
89 }
90
91 pub fn work_sync<F>(mut self, f: F) -> Self
92 where
93 F: Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync + 'static,
94 {
95 self.work = Some(TaskWork::Sync(Arc::new(f)));
96 self
97 }
98
99 pub fn work_async<F, Fut>(mut self, f: F) -> Self
100 where
101 F: Fn(TaskContext) -> Fut + Send + Sync + 'static,
102 Fut: Future<Output = Result<(), Box<dyn Error + Send>>> + Send + 'static,
103 {
104 self.work = Some(TaskWork::Async(Arc::new(move |ctx| Box::pin(f(ctx)))));
105 self
106 }
107
108 pub fn executor(mut self, executor: TaskExecutor) -> Self {
109 self.executor = Some(executor);
110 self
111 }
112
113 pub fn build(self) -> Result<ScheduledTask, String> {
114 let schedule = self.schedule.ok_or("schedule is required")?;
115 let work = self.work.ok_or("work is required")?;
116 let executor = self.executor.ok_or("executor is required")?;
117
118 schedule.validate()?;
119
120 Ok(ScheduledTask {
121 id: TaskId::new(),
122 name: self.name,
123 schedule,
124 work,
125 executor,
126 })
127 }
128}