1use std::{
5 error::Error,
6 fmt,
7 future::Future,
8 pin::Pin,
9 sync::{
10 Arc,
11 atomic::{AtomicU64, Ordering},
12 },
13};
14
15use crate::{context::TaskContext, schedule::Schedule};
16
17type SyncTaskFn = Arc<dyn Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync>;
19
20type AsyncTaskFn = Arc<
22 dyn Fn(TaskContext) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn Error + Send>>> + Send>> + Send + Sync,
23>;
24
/// Numeric identifier for a task, drawn from a process-wide counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(u64);

impl TaskId {
    /// Allocates the next identifier.
    ///
    /// Identifiers come from a single static counter that starts at 1 and advances by one on
    /// every call, so each call yields a value not handed out before (until the `u64` wraps).
    pub fn new() -> Self {
        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
        // Only the atomicity of the increment matters here; no other memory is published
        // through this counter, so relaxed ordering suffices.
        let raw = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        TaskId(raw)
    }
}

impl Default for TaskId {
    /// Equivalent to [`TaskId::new`]: every default value is a freshly allocated identifier.
    fn default() -> Self {
        TaskId::new()
    }
}

impl fmt::Display for TaskId {
    /// Renders the identifier as `task-<number>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let TaskId(raw) = *self;
        write!(f, "task-{}", raw)
    }
}
48
49#[derive(Clone)]
51pub enum TaskWork {
52 Sync(SyncTaskFn),
54 Async(AsyncTaskFn),
56}
57
58impl fmt::Debug for TaskWork {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 match self {
61 TaskWork::Sync(_) => write!(f, "TaskWork::Sync"),
62 TaskWork::Async(_) => write!(f, "TaskWork::Async"),
63 }
64 }
65}
66
/// Identifies the executor a task is associated with.
///
/// NOTE(review): the scheduling code that interprets these variants is not in this file; the
/// variant descriptions below are inferred from their names and should be confirmed there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskExecutor {
    /// Presumably a dedicated compute pool (e.g. for CPU-bound work) — confirm against the
    /// scheduler.
    ComputePool,
    /// Presumably the Tokio async runtime — confirm against the scheduler.
    Tokio,
}
75
76pub struct ScheduledTask {
78 pub id: TaskId,
80 pub name: String,
82 pub schedule: Schedule,
84 pub work: TaskWork,
86 pub executor: TaskExecutor,
88}
89
90impl fmt::Debug for ScheduledTask {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 f.debug_struct("ScheduledTask")
93 .field("id", &self.id)
94 .field("name", &self.name)
95 .field("schedule", &self.schedule)
96 .field("work", &self.work)
97 .field("executor", &self.executor)
98 .finish()
99 }
100}
101
102impl ScheduledTask {
103 pub fn builder(name: impl Into<String>) -> ScheduledTaskBuilder {
105 ScheduledTaskBuilder::new(name)
106 }
107}
108
109pub struct ScheduledTaskBuilder {
111 name: String,
112 schedule: Option<Schedule>,
113 work: Option<TaskWork>,
114 executor: Option<TaskExecutor>,
115}
116
117impl ScheduledTaskBuilder {
118 pub fn new(name: impl Into<String>) -> Self {
120 Self {
121 name: name.into(),
122 schedule: None,
123 work: None,
124 executor: None,
125 }
126 }
127
128 pub fn schedule(mut self, schedule: Schedule) -> Self {
130 self.schedule = Some(schedule);
131 self
132 }
133
134 pub fn work_sync<F>(mut self, f: F) -> Self
136 where
137 F: Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync + 'static,
138 {
139 self.work = Some(TaskWork::Sync(Arc::new(f)));
140 self
141 }
142
143 pub fn work_async<F, Fut>(mut self, f: F) -> Self
145 where
146 F: Fn(TaskContext) -> Fut + Send + Sync + 'static,
147 Fut: Future<Output = Result<(), Box<dyn Error + Send>>> + Send + 'static,
148 {
149 self.work = Some(TaskWork::Async(Arc::new(move |ctx| Box::pin(f(ctx)))));
150 self
151 }
152
153 pub fn executor(mut self, executor: TaskExecutor) -> Self {
155 self.executor = Some(executor);
156 self
157 }
158
159 pub fn build(self) -> Result<ScheduledTask, String> {
161 let schedule = self.schedule.ok_or("schedule is required")?;
162 let work = self.work.ok_or("work is required")?;
163 let executor = self.executor.ok_or("executor is required")?;
164
165 schedule.validate()?;
167
168 Ok(ScheduledTask {
169 id: TaskId::new(),
170 name: self.name,
171 schedule,
172 work,
173 executor,
174 })
175 }
176}