1use crate::execution_guard::ExecutionGuardScope;
2use crate::model::{MissedRunPolicy, OverlapPolicy, Schedule};
3use chrono::{DateTime, Utc};
4use chrono_tz::Tz;
5use std::any::type_name;
6use std::future::{Future, ready};
7use std::panic::resume_unwind;
8use std::pin::Pin;
9use std::sync::Arc;
10
/// Outcome of a single job run: `Ok(())` on success, or `Err` carrying a
/// failure message.
pub type JobResult = Result<(), String>;
/// Boxed, pinned, `Send` future that resolves to a [`JobResult`].
pub type JobFuture = Pin<Box<dyn Future<Output = JobResult> + Send>>;
/// Shared, type-erased task callback: given a [`TaskContext`], it returns the
/// [`JobFuture`] for that run.
pub(crate) type TaskHandler<D> = Arc<dyn Fn(TaskContext<D>) -> JobFuture + Send + Sync>;
16
17#[derive(Clone)]
18pub struct Task<D> {
19 pub(crate) handler: TaskHandler<D>,
20}
21
22impl<D> Task<D>
23where
24 D: Send + Sync + 'static,
25{
26 fn from_handler(handler: TaskHandler<D>) -> Self {
27 Self { handler }
28 }
29
30 pub fn from_async<F, Fut>(task: F) -> Self
32 where
33 F: Fn(TaskContext<D>) -> Fut + Send + Sync + 'static,
34 Fut: Future<Output = JobResult> + Send + 'static,
35 {
36 Self::from_handler(wrap_async_handler(Arc::new(task)))
37 }
38
39 pub fn from_sync<F>(task: F) -> Self
41 where
42 F: Fn(TaskContext<D>) -> JobResult + Send + Sync + 'static,
43 {
44 Self::from_handler(wrap_sync_handler(Arc::new(task)))
45 }
46
47 pub fn from_blocking<F>(task: F) -> Self
49 where
50 F: Fn(TaskContext<D>) -> JobResult + Send + Sync + 'static,
51 {
52 Self::from_handler(wrap_blocking_handler(Arc::new(task)))
53 }
54}
55
56#[derive(Clone)]
57pub struct Job<D = ()> {
58 pub job_id: String,
59 pub execution_resource_id: String,
60 pub guard_scope: ExecutionGuardScope,
61 pub schedule: Schedule,
62 pub max_runs: Option<u32>,
63 pub missed_run_policy: MissedRunPolicy,
64 pub overlap_policy: OverlapPolicy,
65 pub(crate) task: TaskHandler<D>,
66 pub(crate) deps: Arc<D>,
67}
68
69impl Job<()> {
70 pub fn without_deps(job_id: impl Into<String>, schedule: Schedule, task: Task<()>) -> Self {
72 Self::from_parts(job_id.into(), schedule, Arc::new(()), task)
73 }
74}
75
76impl<D> Job<D>
77where
78 D: Send + Sync + 'static,
79{
80 pub fn new(
82 job_id: impl Into<String>,
83 schedule: Schedule,
84 deps: impl Into<Arc<D>>,
85 task: Task<D>,
86 ) -> Self {
87 Self::from_parts(job_id.into(), schedule, deps.into(), task)
88 }
89}
90
91impl<D> Job<D> {
92 fn default_policies() -> (MissedRunPolicy, OverlapPolicy) {
93 (MissedRunPolicy::CatchUpOnce, OverlapPolicy::Forbid)
94 }
95
96 fn from_parts(job_id: String, schedule: Schedule, deps: Arc<D>, task: Task<D>) -> Self {
97 let (missed_run_policy, overlap_policy) = Self::default_policies();
98 Self {
99 execution_resource_id: job_id.clone(),
100 job_id,
101 guard_scope: ExecutionGuardScope::Occurrence,
102 schedule,
103 max_runs: None,
104 missed_run_policy,
105 overlap_policy,
106 task: task.handler,
107 deps,
108 }
109 }
110
111 pub fn with_max_runs(mut self, max_runs: u32) -> Self {
117 self.max_runs = Some(max_runs);
118 self
119 }
120
121 pub fn with_missed_run_policy(mut self, policy: MissedRunPolicy) -> Self {
122 self.missed_run_policy = policy;
123 self
124 }
125
126 pub fn with_overlap_policy(mut self, policy: OverlapPolicy) -> Self {
127 self.overlap_policy = policy;
128 self
129 }
130
131 pub fn with_execution_resource_id(mut self, resource_id: impl Into<String>) -> Self {
132 self.execution_resource_id = resource_id.into();
133 self
134 }
135
136 pub fn with_guard_scope(mut self, scope: ExecutionGuardScope) -> Self {
137 self.guard_scope = scope;
138 self
139 }
140}
141
142impl<D> std::fmt::Debug for Job<D> {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 f.debug_struct("Job")
145 .field("job_id", &self.job_id)
146 .field("execution_resource_id", &self.execution_resource_id)
147 .field("guard_scope", &self.guard_scope)
148 .field("schedule", &self.schedule)
149 .field("max_runs", &self.max_runs)
150 .field("missed_run_policy", &self.missed_run_policy)
151 .field("overlap_policy", &self.overlap_policy)
152 .field("deps", &type_name::<D>())
153 .finish_non_exhaustive()
154 }
155}
156
157fn wrap_async_handler<D, F, Fut>(task: Arc<F>) -> TaskHandler<D>
158where
159 D: Send + Sync + 'static,
160 F: Fn(TaskContext<D>) -> Fut + Send + Sync + 'static,
161 Fut: Future<Output = JobResult> + Send + 'static,
162{
163 Arc::new(move |context| Box::pin((*task)(context)))
164}
165
166fn wrap_sync_handler<D, F>(task: Arc<F>) -> TaskHandler<D>
167where
168 D: Send + Sync + 'static,
169 F: Fn(TaskContext<D>) -> JobResult + Send + Sync + 'static,
170{
171 Arc::new(move |context| Box::pin(ready((*task)(context))))
172}
173
174fn wrap_blocking_handler<D, F>(task: Arc<F>) -> TaskHandler<D>
175where
176 D: Send + Sync + 'static,
177 F: Fn(TaskContext<D>) -> JobResult + Send + Sync + 'static,
178{
179 Arc::new(move |context| {
180 let task = task.clone();
181 Box::pin(async move { await_blocking(move || (*task)(context)).await })
182 })
183}
184
/// Per-run metadata handed to a task as [`TaskContext::run`].
#[derive(Debug, Clone)]
pub struct RunContext {
    /// Identifier of the job this run belongs to.
    pub job_id: String,
    /// UTC instant associated with this run.
    /// NOTE(review): presumably the occurrence time from the schedule — confirm
    /// against the code that builds `RunContext`.
    pub scheduled_at: DateTime<Utc>,
    /// NOTE(review): presumably `true` when the run makes up for a missed
    /// occurrence — confirm against the scheduler.
    pub catch_up: bool,
    /// Time zone associated with the run.
    /// NOTE(review): presumably the schedule's zone — confirm.
    pub timezone: Tz,
}
193
194#[derive(Clone)]
195pub struct TaskContext<D> {
196 pub run: RunContext,
197 pub deps: Arc<D>,
198}
199
200impl<D> std::fmt::Debug for TaskContext<D> {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 f.debug_struct("TaskContext")
203 .field("run", &self.run)
204 .field("deps", &type_name::<D>())
205 .finish()
206 }
207}
208
209async fn await_blocking<F>(task: F) -> JobResult
210where
211 F: FnOnce() -> JobResult + Send + 'static,
212{
213 match tokio::task::spawn_blocking(task).await {
214 Ok(result) => result,
215 Err(error) if error.is_panic() => resume_unwind(error.into_panic()),
216 Err(error) => panic!("blocking task failed to join: {error}"),
217 }
218}