Skip to main content

scheduler/model/
task.rs

1use crate::execution_guard::ExecutionGuardScope;
2use crate::model::{MissedRunPolicy, OverlapPolicy, Schedule};
3use chrono::{DateTime, Utc};
4use chrono_tz::Tz;
5use std::any::type_name;
6use std::future::{Future, ready};
7use std::panic::resume_unwind;
8use std::pin::Pin;
9use std::sync::Arc;
10
11/// The task return type used by scheduled jobs.
12pub type JobResult = Result<(), String>;
13/// The boxed future returned by a scheduled job.
14pub type JobFuture = Pin<Box<dyn Future<Output = JobResult> + Send>>;
15pub(crate) type TaskHandler<D> = Arc<dyn Fn(TaskContext<D>) -> JobFuture + Send + Sync>;
16
17#[derive(Clone)]
18pub struct Task<D> {
19    pub(crate) handler: TaskHandler<D>,
20}
21
22impl<D> Task<D>
23where
24    D: Send + Sync + 'static,
25{
26    fn from_handler(handler: TaskHandler<D>) -> Self {
27        Self { handler }
28    }
29
30    /// Create an async task from the full [`TaskContext`].
31    pub fn from_async<F, Fut>(task: F) -> Self
32    where
33        F: Fn(TaskContext<D>) -> Fut + Send + Sync + 'static,
34        Fut: Future<Output = JobResult> + Send + 'static,
35    {
36        Self::from_handler(wrap_async_handler(Arc::new(task)))
37    }
38
39    /// Create a lightweight synchronous task from the full [`TaskContext`].
40    pub fn from_sync<F>(task: F) -> Self
41    where
42        F: Fn(TaskContext<D>) -> JobResult + Send + Sync + 'static,
43    {
44        Self::from_handler(wrap_sync_handler(Arc::new(task)))
45    }
46
47    /// Create a blocking synchronous task from the full [`TaskContext`].
48    pub fn from_blocking<F>(task: F) -> Self
49    where
50        F: Fn(TaskContext<D>) -> JobResult + Send + Sync + 'static,
51    {
52        Self::from_handler(wrap_blocking_handler(Arc::new(task)))
53    }
54}
55
56#[derive(Clone)]
57pub struct Job<D = ()> {
58    pub job_id: String,
59    pub execution_resource_id: String,
60    pub guard_scope: ExecutionGuardScope,
61    pub schedule: Schedule,
62    pub max_runs: Option<u32>,
63    pub missed_run_policy: MissedRunPolicy,
64    pub overlap_policy: OverlapPolicy,
65    pub(crate) task: TaskHandler<D>,
66    pub(crate) deps: Arc<D>,
67}
68
69impl Job<()> {
70    /// Create a job that uses no injected dependencies.
71    pub fn without_deps(job_id: impl Into<String>, schedule: Schedule, task: Task<()>) -> Self {
72        Self::from_parts(job_id.into(), schedule, Arc::new(()), task)
73    }
74}
75
76impl<D> Job<D>
77where
78    D: Send + Sync + 'static,
79{
80    /// Create a job from explicit dependencies and a task handler.
81    pub fn new(
82        job_id: impl Into<String>,
83        schedule: Schedule,
84        deps: impl Into<Arc<D>>,
85        task: Task<D>,
86    ) -> Self {
87        Self::from_parts(job_id.into(), schedule, deps.into(), task)
88    }
89}
90
91impl<D> Job<D> {
92    fn default_policies() -> (MissedRunPolicy, OverlapPolicy) {
93        (MissedRunPolicy::CatchUpOnce, OverlapPolicy::Forbid)
94    }
95
96    fn from_parts(job_id: String, schedule: Schedule, deps: Arc<D>, task: Task<D>) -> Self {
97        let (missed_run_policy, overlap_policy) = Self::default_policies();
98        Self {
99            execution_resource_id: job_id.clone(),
100            job_id,
101            guard_scope: ExecutionGuardScope::Occurrence,
102            schedule,
103            max_runs: None,
104            missed_run_policy,
105            overlap_policy,
106            task: task.handler,
107            deps,
108        }
109    }
110
111    /// Limit how many triggers this job can consume before it exits.
112    ///
113    /// This applies to [`Schedule::Interval`], [`Schedule::AtTimes`], and
114    /// [`Schedule::Cron`].
115    /// A value of `0` makes the job exit immediately without running.
116    pub fn with_max_runs(mut self, max_runs: u32) -> Self {
117        self.max_runs = Some(max_runs);
118        self
119    }
120
121    pub fn with_missed_run_policy(mut self, policy: MissedRunPolicy) -> Self {
122        self.missed_run_policy = policy;
123        self
124    }
125
126    pub fn with_overlap_policy(mut self, policy: OverlapPolicy) -> Self {
127        self.overlap_policy = policy;
128        self
129    }
130
131    pub fn with_execution_resource_id(mut self, resource_id: impl Into<String>) -> Self {
132        self.execution_resource_id = resource_id.into();
133        self
134    }
135
136    pub fn with_guard_scope(mut self, scope: ExecutionGuardScope) -> Self {
137        self.guard_scope = scope;
138        self
139    }
140}
141
142impl<D> std::fmt::Debug for Job<D> {
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        f.debug_struct("Job")
145            .field("job_id", &self.job_id)
146            .field("execution_resource_id", &self.execution_resource_id)
147            .field("guard_scope", &self.guard_scope)
148            .field("schedule", &self.schedule)
149            .field("max_runs", &self.max_runs)
150            .field("missed_run_policy", &self.missed_run_policy)
151            .field("overlap_policy", &self.overlap_policy)
152            .field("deps", &type_name::<D>())
153            .finish_non_exhaustive()
154    }
155}
156
157fn wrap_async_handler<D, F, Fut>(task: Arc<F>) -> TaskHandler<D>
158where
159    D: Send + Sync + 'static,
160    F: Fn(TaskContext<D>) -> Fut + Send + Sync + 'static,
161    Fut: Future<Output = JobResult> + Send + 'static,
162{
163    Arc::new(move |context| Box::pin((*task)(context)))
164}
165
166fn wrap_sync_handler<D, F>(task: Arc<F>) -> TaskHandler<D>
167where
168    D: Send + Sync + 'static,
169    F: Fn(TaskContext<D>) -> JobResult + Send + Sync + 'static,
170{
171    Arc::new(move |context| Box::pin(ready((*task)(context))))
172}
173
174fn wrap_blocking_handler<D, F>(task: Arc<F>) -> TaskHandler<D>
175where
176    D: Send + Sync + 'static,
177    F: Fn(TaskContext<D>) -> JobResult + Send + Sync + 'static,
178{
179    Arc::new(move |context| {
180        let task = task.clone();
181        Box::pin(async move { await_blocking(move || (*task)(context)).await })
182    })
183}
184
185#[derive(Debug, Clone)]
186pub struct RunContext {
187    pub job_id: String,
188    pub scheduled_at: DateTime<Utc>,
189    pub catch_up: bool,
190    /// The scheduler-configured timezone for downstream task logic.
191    pub timezone: Tz,
192}
193
194#[derive(Clone)]
195pub struct TaskContext<D> {
196    pub run: RunContext,
197    pub deps: Arc<D>,
198}
199
200impl<D> std::fmt::Debug for TaskContext<D> {
201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202        f.debug_struct("TaskContext")
203            .field("run", &self.run)
204            .field("deps", &type_name::<D>())
205            .finish()
206    }
207}
208
209async fn await_blocking<F>(task: F) -> JobResult
210where
211    F: FnOnce() -> JobResult + Send + 'static,
212{
213    match tokio::task::spawn_blocking(task).await {
214        Ok(result) => result,
215        Err(error) if error.is_panic() => resume_unwind(error.into_panic()),
216        Err(error) => panic!("blocking task failed to join: {error}"),
217    }
218}