1use std::{
2 error::Error,
3 fmt,
4 future::Future,
5 pin::Pin,
6 sync::{
7 Arc,
8 atomic::{AtomicU64, Ordering},
9 },
10};
11
12use crate::{context::TaskContext, schedule::Schedule};
13
/// Numeric identifier attached to a scheduled task.
///
/// A thin newtype over `u64`. The wrapped value is private, so outside this
/// module an id can only be obtained through the constructors defined on the
/// type. Ids are cheap to copy and can be compared and hashed.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct TaskId(u64);
17
18impl TaskId {
19 pub fn new() -> Self {
21 static COUNTER: AtomicU64 = AtomicU64::new(1);
22 Self(COUNTER.fetch_add(1, Ordering::Relaxed))
23 }
24}
25
26impl Default for TaskId {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl fmt::Display for TaskId {
33 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34 write!(f, "task-{}", self.0)
35 }
36}
37
38#[derive(Clone)]
40pub enum TaskWork {
41 Sync(Arc<dyn Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync>),
43 Async(
45 Arc<
46 dyn Fn(
47 TaskContext,
48 ) -> Pin<Box<dyn Future<Output = Result<(), Box<dyn Error + Send>>> + Send>>
49 + Send
50 + Sync,
51 >,
52 ),
53}
54
55impl fmt::Debug for TaskWork {
56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57 match self {
58 TaskWork::Sync(_) => write!(f, "TaskWork::Sync"),
59 TaskWork::Async(_) => write!(f, "TaskWork::Async"),
60 }
61 }
62}
63
/// Selects which executor a task's work is handed to.
///
/// NOTE(review): the meaning of each variant is inferred from its name only;
/// confirm against the scheduler code that consumes this enum.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TaskExecutor {
    /// Presumably a dedicated pool for compute-heavy work — TODO confirm.
    ComputePool,
    /// Presumably the Tokio async runtime — TODO confirm.
    Tokio,
}
72
73pub struct ScheduledTask {
75 pub id: TaskId,
77 pub name: String,
79 pub schedule: Schedule,
81 pub work: TaskWork,
83 pub executor: TaskExecutor,
85}
86
87impl fmt::Debug for ScheduledTask {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 f.debug_struct("ScheduledTask")
90 .field("id", &self.id)
91 .field("name", &self.name)
92 .field("schedule", &self.schedule)
93 .field("work", &self.work)
94 .field("executor", &self.executor)
95 .finish()
96 }
97}
98
99impl ScheduledTask {
100 pub fn builder(name: impl Into<String>) -> ScheduledTaskBuilder {
102 ScheduledTaskBuilder::new(name)
103 }
104}
105
106pub struct ScheduledTaskBuilder {
108 name: String,
109 schedule: Option<Schedule>,
110 work: Option<TaskWork>,
111 executor: Option<TaskExecutor>,
112}
113
114impl ScheduledTaskBuilder {
115 pub fn new(name: impl Into<String>) -> Self {
117 Self {
118 name: name.into(),
119 schedule: None,
120 work: None,
121 executor: None,
122 }
123 }
124
125 pub fn schedule(mut self, schedule: Schedule) -> Self {
127 self.schedule = Some(schedule);
128 self
129 }
130
131 pub fn work_sync<F>(mut self, f: F) -> Self
133 where
134 F: Fn(TaskContext) -> Result<(), Box<dyn Error + Send>> + Send + Sync + 'static,
135 {
136 self.work = Some(TaskWork::Sync(Arc::new(f)));
137 self
138 }
139
140 pub fn work_async<F, Fut>(mut self, f: F) -> Self
142 where
143 F: Fn(TaskContext) -> Fut + Send + Sync + 'static,
144 Fut: Future<Output = Result<(), Box<dyn Error + Send>>> + Send + 'static,
145 {
146 self.work = Some(TaskWork::Async(Arc::new(move |ctx| Box::pin(f(ctx)))));
147 self
148 }
149
150 pub fn executor(mut self, executor: TaskExecutor) -> Self {
152 self.executor = Some(executor);
153 self
154 }
155
156 pub fn build(self) -> Result<ScheduledTask, String> {
158 let schedule = self.schedule.ok_or("schedule is required")?;
159 let work = self.work.ok_or("work is required")?;
160 let executor = self.executor.ok_or("executor is required")?;
161
162 schedule.validate()?;
164
165 Ok(ScheduledTask {
166 id: TaskId::new(),
167 name: self.name,
168 schedule,
169 work,
170 executor,
171 })
172 }
173}