1use std::{error::Error, fmt, future::Future, pin::Pin, sync::Arc};
5
6use reifydb_core::interface::catalog::task::TaskId;
7
8use crate::{context::TaskContext, schedule::Schedule};
9
10type SyncTaskFn = Arc<dyn Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync>;
12
13type AsyncTaskFn = Arc<
15 dyn Fn(TaskContext) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn Error + Send>>> + Send>> + Send + Sync,
16>;
17
18#[derive(Clone)]
20pub enum TaskWork {
21 Sync(SyncTaskFn),
23 Async(AsyncTaskFn),
25}
26
27impl fmt::Debug for TaskWork {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 match self {
30 TaskWork::Sync(_) => write!(f, "TaskWork::Sync"),
31 TaskWork::Async(_) => write!(f, "TaskWork::Async"),
32 }
33 }
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38pub enum TaskExecutor {
39 ComputePool,
41 Tokio,
43}
44
45pub struct ScheduledTask {
47 pub id: TaskId,
49 pub name: String,
51 pub schedule: Schedule,
53 pub work: TaskWork,
55 pub executor: TaskExecutor,
57}
58
59impl fmt::Debug for ScheduledTask {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 f.debug_struct("ScheduledTask")
62 .field("id", &self.id)
63 .field("name", &self.name)
64 .field("schedule", &self.schedule)
65 .field("work", &self.work)
66 .field("executor", &self.executor)
67 .finish()
68 }
69}
70
71impl ScheduledTask {
72 pub fn builder(name: impl Into<String>) -> ScheduledTaskBuilder {
74 ScheduledTaskBuilder::new(name)
75 }
76}
77
78pub struct ScheduledTaskBuilder {
80 name: String,
81 schedule: Option<Schedule>,
82 work: Option<TaskWork>,
83 executor: Option<TaskExecutor>,
84}
85
86impl ScheduledTaskBuilder {
87 pub fn new(name: impl Into<String>) -> Self {
89 Self {
90 name: name.into(),
91 schedule: None,
92 work: None,
93 executor: None,
94 }
95 }
96
97 pub fn schedule(mut self, schedule: Schedule) -> Self {
99 self.schedule = Some(schedule);
100 self
101 }
102
103 pub fn work_sync<F>(mut self, f: F) -> Self
105 where
106 F: Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync + 'static,
107 {
108 self.work = Some(TaskWork::Sync(Arc::new(f)));
109 self
110 }
111
112 pub fn work_async<F, Fut>(mut self, f: F) -> Self
114 where
115 F: Fn(TaskContext) -> Fut + Send + Sync + 'static,
116 Fut: Future<Output = Result<(), Box<dyn Error + Send>>> + Send + 'static,
117 {
118 self.work = Some(TaskWork::Async(Arc::new(move |ctx| Box::pin(f(ctx)))));
119 self
120 }
121
122 pub fn executor(mut self, executor: TaskExecutor) -> Self {
124 self.executor = Some(executor);
125 self
126 }
127
128 pub fn build(self) -> Result<ScheduledTask, String> {
130 let schedule = self.schedule.ok_or("schedule is required")?;
131 let work = self.work.ok_or("work is required")?;
132 let executor = self.executor.ok_or("executor is required")?;
133
134 schedule.validate()?;
136
137 Ok(ScheduledTask {
138 id: TaskId::new(),
139 name: self.name,
140 schedule,
141 work,
142 executor,
143 })
144 }
145}