// scheduler/model/task.rs

1use crate::execution_guard::ExecutionGuardScope;
2use crate::model::{JobTimeWindow, MissedRunPolicy, OverlapPolicy, RunSkipReason, Schedule};
3use chrono::{DateTime, Utc};
4use chrono_tz::Tz;
5use std::any::type_name;
6use std::future::{Future, ready};
7use std::panic::resume_unwind;
8use std::pin::Pin;
9use std::sync::Arc;
10
/// The task return type used by scheduled jobs.
///
/// A failure is reported as a plain `String` message.
pub type JobResult = Result<(), String>;
/// The boxed future returned by a scheduled job.
///
/// The future is pinned, heap-allocated, `Send`, and resolves to a [`JobResult`].
pub type JobFuture = Pin<Box<dyn Future<Output = JobResult> + Send>>;
/// Shared, type-erased task callback: consumes a [`TaskContext`] and returns a [`JobFuture`].
pub(crate) type TaskHandler<D> = Arc<dyn Fn(TaskContext<D>) -> JobFuture + Send + Sync>;
16
17#[derive(Clone)]
18pub struct Task<D> {
19    pub(crate) handler: TaskHandler<D>,
20}
21
22impl<D> Task<D>
23where
24    D: Send + Sync + 'static,
25{
26    fn from_handler(handler: TaskHandler<D>) -> Self {
27        Self { handler }
28    }
29
30    /// Create an async task from the full [`TaskContext`].
31    pub fn from_async<F, Fut>(task: F) -> Self
32    where
33        F: Fn(TaskContext<D>) -> Fut + Send + Sync + 'static,
34        Fut: Future<Output = JobResult> + Send + 'static,
35    {
36        Self::from_handler(wrap_async_handler(Arc::new(task)))
37    }
38
39    /// Create a lightweight synchronous task from the full [`TaskContext`].
40    pub fn from_sync<F>(task: F) -> Self
41    where
42        F: Fn(TaskContext<D>) -> JobResult + Send + Sync + 'static,
43    {
44        Self::from_handler(wrap_sync_handler(Arc::new(task)))
45    }
46
47    /// Create a blocking synchronous task from the full [`TaskContext`].
48    pub fn from_blocking<F>(task: F) -> Self
49    where
50        F: Fn(TaskContext<D>) -> JobResult + Send + Sync + 'static,
51    {
52        Self::from_handler(wrap_blocking_handler(Arc::new(task)))
53    }
54}
55
56#[derive(Clone)]
57pub struct Job<D = ()> {
58    pub job_id: String,
59    pub execution_resource_id: String,
60    pub guard_scope: ExecutionGuardScope,
61    pub schedule: Schedule,
62    pub time_window: Option<JobTimeWindow>,
63    pub max_runs: Option<u32>,
64    pub missed_run_policy: MissedRunPolicy,
65    pub overlap_policy: OverlapPolicy,
66    pub(crate) task: TaskHandler<D>,
67    pub(crate) deps: Arc<D>,
68}
69
70impl Job<()> {
71    /// Create a job that uses no injected dependencies.
72    pub fn without_deps(job_id: impl Into<String>, schedule: Schedule, task: Task<()>) -> Self {
73        Self::from_parts(job_id.into(), schedule, Arc::new(()), task)
74    }
75}
76
77impl<D> Job<D>
78where
79    D: Send + Sync + 'static,
80{
81    /// Create a job from explicit dependencies and a task handler.
82    pub fn new(
83        job_id: impl Into<String>,
84        schedule: Schedule,
85        deps: impl Into<Arc<D>>,
86        task: Task<D>,
87    ) -> Self {
88        Self::from_parts(job_id.into(), schedule, deps.into(), task)
89    }
90}
91
92impl<D> Job<D> {
93    fn default_policies() -> (MissedRunPolicy, OverlapPolicy) {
94        (MissedRunPolicy::CatchUpOnce, OverlapPolicy::Forbid)
95    }
96
97    fn from_parts(job_id: String, schedule: Schedule, deps: Arc<D>, task: Task<D>) -> Self {
98        let (missed_run_policy, overlap_policy) = Self::default_policies();
99        Self {
100            execution_resource_id: job_id.clone(),
101            job_id,
102            guard_scope: ExecutionGuardScope::Occurrence,
103            schedule,
104            time_window: None,
105            max_runs: None,
106            missed_run_policy,
107            overlap_policy,
108            task: task.handler,
109            deps,
110        }
111    }
112
113    /// Limit how many triggers this job can consume before it exits.
114    ///
115    /// This applies to [`Schedule::Interval`], [`Schedule::AtTimes`], and
116    /// [`Schedule::Cron`].
117    /// A value of `0` makes the job exit immediately without running.
118    pub fn with_max_runs(mut self, max_runs: u32) -> Self {
119        self.max_runs = Some(max_runs);
120        self
121    }
122
123    pub fn with_missed_run_policy(mut self, policy: MissedRunPolicy) -> Self {
124        self.missed_run_policy = policy;
125        self
126    }
127
128    pub fn with_overlap_policy(mut self, policy: OverlapPolicy) -> Self {
129        self.overlap_policy = policy;
130        self
131    }
132
133    pub fn with_time_window(mut self, time_window: JobTimeWindow) -> Self {
134        self.time_window = Some(time_window);
135        self
136    }
137
138    pub fn with_execution_resource_id(mut self, resource_id: impl Into<String>) -> Self {
139        self.execution_resource_id = resource_id.into();
140        self
141    }
142
143    pub fn with_guard_scope(mut self, scope: ExecutionGuardScope) -> Self {
144        self.guard_scope = scope;
145        self
146    }
147
148    pub(crate) fn skip_reason_at(
149        &self,
150        now: DateTime<Utc>,
151        fallback_timezone: Tz,
152    ) -> Option<RunSkipReason> {
153        let window = self.time_window.as_ref()?;
154        if window.matches(now, fallback_timezone) {
155            None
156        } else {
157            Some(RunSkipReason::OutsideTimeWindow)
158        }
159    }
160}
161
162impl<D> std::fmt::Debug for Job<D> {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        f.debug_struct("Job")
165            .field("job_id", &self.job_id)
166            .field("execution_resource_id", &self.execution_resource_id)
167            .field("guard_scope", &self.guard_scope)
168            .field("schedule", &self.schedule)
169            .field("time_window", &self.time_window)
170            .field("max_runs", &self.max_runs)
171            .field("missed_run_policy", &self.missed_run_policy)
172            .field("overlap_policy", &self.overlap_policy)
173            .field("deps", &type_name::<D>())
174            .finish_non_exhaustive()
175    }
176}
177
178fn wrap_async_handler<D, F, Fut>(task: Arc<F>) -> TaskHandler<D>
179where
180    D: Send + Sync + 'static,
181    F: Fn(TaskContext<D>) -> Fut + Send + Sync + 'static,
182    Fut: Future<Output = JobResult> + Send + 'static,
183{
184    Arc::new(move |context| Box::pin((*task)(context)))
185}
186
187fn wrap_sync_handler<D, F>(task: Arc<F>) -> TaskHandler<D>
188where
189    D: Send + Sync + 'static,
190    F: Fn(TaskContext<D>) -> JobResult + Send + Sync + 'static,
191{
192    Arc::new(move |context| Box::pin(ready((*task)(context))))
193}
194
195fn wrap_blocking_handler<D, F>(task: Arc<F>) -> TaskHandler<D>
196where
197    D: Send + Sync + 'static,
198    F: Fn(TaskContext<D>) -> JobResult + Send + Sync + 'static,
199{
200    Arc::new(move |context| {
201        let task = task.clone();
202        Box::pin(async move { await_blocking(move || (*task)(context)).await })
203    })
204}
205
/// Per-run metadata handed to a task as part of its [`TaskContext`].
#[derive(Debug, Clone)]
pub struct RunContext {
    /// Identifier of the job this run belongs to.
    pub job_id: String,
    /// Scheduled time of this run, in UTC.
    // NOTE(review): presumably the trigger time rather than the actual start time — confirm.
    pub scheduled_at: DateTime<Utc>,
    /// Catch-up flag.
    // NOTE(review): presumably `true` when this run makes up for a missed trigger — confirm
    // against the scheduler.
    pub catch_up: bool,
    /// The scheduler-configured timezone for downstream task logic.
    pub timezone: Tz,
}
214
215#[derive(Clone)]
216pub struct TaskContext<D> {
217    pub run: RunContext,
218    pub deps: Arc<D>,
219}
220
221impl<D> std::fmt::Debug for TaskContext<D> {
222    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223        f.debug_struct("TaskContext")
224            .field("run", &self.run)
225            .field("deps", &type_name::<D>())
226            .finish()
227    }
228}
229
230async fn await_blocking<F>(task: F) -> JobResult
231where
232    F: FnOnce() -> JobResult + Send + 'static,
233{
234    match tokio::task::spawn_blocking(task).await {
235        Ok(result) => result,
236        Err(error) if error.is_panic() => resume_unwind(error.into_panic()),
237        Err(error) => panic!("blocking task failed to join: {error}"),
238    }
239}