ora_client/
job_type.rs

//! Traits and types that allow defining jobs in a mostly type-safe way.
2
3use std::{borrow::Cow, time::SystemTime};
4
5use schemars::JsonSchema;
6use serde::{de::DeserializeOwned, Serialize};
7
8use crate::{
9    job_definition::{JobDefinition, RetryPolicy, TimeoutPolicy},
10    schedule_definition::ScheduleDefinition,
11    IndexMap,
12};
13
14/// A job type that can be executed by the server.
15///
16/// The instances of the type implementing this
17/// are also used as inputs of the job.
18pub trait JobType: Serialize + DeserializeOwned + JsonSchema + Send + Sync + 'static {
19    /// The output type of the job.
20    type Output: Serialize + DeserializeOwned + JsonSchema + Send + Sync + 'static;
21
22    /// A unique identifier for the job type.
23    ///
24    /// Executors will primarily use this
25    /// to determine which job type to execute.
26    fn id() -> &'static str;
27
28    /// A default timeout policy for the job type.
29    #[must_use]
30    fn default_timeout_policy() -> TimeoutPolicy {
31        TimeoutPolicy::default()
32    }
33
34    /// A default retry policy for the job type.
35    #[must_use]
36    fn default_retry_policy() -> RetryPolicy {
37        RetryPolicy::default()
38    }
39
40    /// Default labels for the job type.
41    #[must_use]
42    fn default_labels() -> IndexMap<String, String> {
43        IndexMap::default()
44    }
45}
46
47/// Extension trait for job types.
48pub trait JobTypeExt: JobType {
49    /// Create a new job definition for this job type.
50    /// The target execution time is set to the current time.
51    ///
52    /// # Panics
53    ///
54    /// Panics if the job type cannot be serialized.
55    fn job(&self) -> TypedJobDefinition<Self>;
56
57    /// Return metadata for the job type.
58    #[must_use]
59    fn metadata() -> JobTypeMetadata;
60}
61
62impl<J> JobTypeExt for J
63where
64    J: JobType,
65{
66    fn job(&self) -> TypedJobDefinition<Self> {
67        JobDefinition {
68            job_type_id: Self::id().into(),
69            target_execution_time: SystemTime::now(),
70            input_payload_json: serde_json::to_string(self).unwrap(),
71            labels: Self::default_labels(),
72            timeout_policy: Self::default_timeout_policy(),
73            retry_policy: Self::default_retry_policy(),
74            metadata_json: None,
75        }
76        .into()
77    }
78
79    fn metadata() -> JobTypeMetadata {
80        let input_schema = schemars::schema_for!(J);
81
82        JobTypeMetadata {
83            id: Self::id().into(),
84            name: input_schema
85                .schema
86                .metadata
87                .as_ref()
88                .and_then(|m| m.title.clone()),
89            description: input_schema
90                .schema
91                .metadata
92                .as_ref()
93                .and_then(|m| m.description.clone()),
94            input_schema_json: Some(serde_json::to_string(&input_schema).unwrap()),
95            output_schema_json: Some(
96                serde_json::to_string(&schemars::schema_for!(J::Output)).unwrap(),
97            ),
98        }
99    }
100}
101
102/// A job definition that carries the job type.
103///
104/// Note that this type does not guarantee that the job type
105/// matches the underlying job definition.
106#[derive(Debug, Clone)]
107#[must_use]
108pub struct TypedJobDefinition<J = ()> {
109    pub(crate) inner: JobDefinition,
110    _job_type: std::marker::PhantomData<fn() -> J>,
111}
112
113impl<J> TypedJobDefinition<J> {
114    /// Set the target execution time of the job.
115    pub fn at(mut self, target_execution_time: impl Into<std::time::SystemTime>) -> Self {
116        self.inner.target_execution_time = target_execution_time.into();
117        self
118    }
119
120    /// Set the target execution time of the job to the current time.
121    pub fn now(mut self) -> Self {
122        self.inner.target_execution_time = std::time::SystemTime::now();
123        self
124    }
125
126    /// Add a label to the job.
127    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
128        self.inner.labels.insert(key.into(), value.into());
129        self
130    }
131
132    /// Set the timeout for the job.
133    ///
134    /// The timeout is calculated from the start time of the job
135    /// by default.
136    pub fn with_timeout(mut self, timeout: impl Into<std::time::Duration>) -> Self {
137        self.inner.timeout_policy.timeout = Some(timeout.into());
138        self
139    }
140
141    /// Set the number of retries for the job.
142    pub fn with_retries(mut self, retries: u64) -> Self {
143        self.inner.retry_policy.retries = retries;
144        self
145    }
146
147    /// Get the inner job definition.
148    pub fn into_inner(self) -> JobDefinition {
149        self.inner
150    }
151
152    /// Convert the job type to a different job type.
153    pub fn cast_type<T>(self) -> TypedJobDefinition<T> {
154        TypedJobDefinition {
155            inner: self.inner,
156            _job_type: std::marker::PhantomData,
157        }
158    }
159}
160
161impl<J> TypedJobDefinition<J> {
162    /// Create a schedule from the job definition
163    /// that repeats at a fixed interval.
164    pub fn repeat_every(self, interval: std::time::Duration) -> ScheduleDefinition {
165        self.inner.repeat_every(interval)
166    }
167
168    /// Create a schedule from the job definition
169    /// that repeats according to a cron expression.
170    ///
171    /// # Errors
172    ///
173    /// If the cron expression is invalid.
174    pub fn repeat_cron(
175        self,
176        cron_expression: impl Into<String>,
177    ) -> eyre::Result<ScheduleDefinition> {
178        self.inner.repeat_cron(cron_expression)
179    }
180}
181
182impl<J> TypedJobDefinition<J>
183where
184    J: JobType,
185{
186    /// Get the input of the job, which is an instance of the job type.
187    ///
188    /// # Panics
189    ///
190    /// Panics if the input payload cannot be deserialized.
191    #[must_use]
192    pub fn input(&self) -> J {
193        serde_json::from_str(&self.inner.input_payload_json).unwrap()
194    }
195
196    /// Try to get the input of the job, which is an instance of the job type.
197    pub fn try_input(&self) -> Result<J, serde_json::Error> {
198        serde_json::from_str(&self.inner.input_payload_json)
199    }
200
201    /// Set the input of the job.
202    ///
203    /// # Panics
204    ///
205    /// Panics if the input cannot be serialized.
206    pub fn with_input(mut self, input: impl AsRef<J>) -> Self {
207        self.inner.input_payload_json = serde_json::to_string(input.as_ref()).unwrap();
208        self
209    }
210
211    /// Determine whether the job definition is valid
212    /// for the job type.
213    #[must_use]
214    pub fn is_valid(&self) -> bool {
215        self.inner.job_type_id == J::id() && self.try_input().is_ok()
216    }
217}
218
219impl<J> From<JobDefinition> for TypedJobDefinition<J> {
220    fn from(job_definition: JobDefinition) -> Self {
221        Self {
222            inner: job_definition,
223            _job_type: std::marker::PhantomData,
224        }
225    }
226}
227
228impl<J> From<TypedJobDefinition<J>> for JobDefinition {
229    fn from(typed: TypedJobDefinition<J>) -> Self {
230        typed.inner
231    }
232}
233
234impl JobDefinition {
235    /// Cast the job definition to a typed job definition
236    /// with an unknown job type.
237    pub fn into_typed_unknown(self) -> TypedJobDefinition {
238        TypedJobDefinition {
239            inner: self,
240            _job_type: std::marker::PhantomData,
241        }
242    }
243
244    /// Cast the job definition to a typed job definition
245    /// with a known job type.
246    pub fn into_typed<J>(self) -> TypedJobDefinition<J> {
247        TypedJobDefinition {
248            inner: self,
249            _job_type: std::marker::PhantomData,
250        }
251    }
252}
253
/// Descriptive information about a job type.
#[derive(Clone, Debug)]
pub struct JobTypeMetadata {
    /// Identifier of the job type.
    pub id: Cow<'static, str>,
    /// Name of the job type, if there is one.
    pub name: Option<String>,
    /// Description of the job type, if there is one.
    pub description: Option<String>,
    /// JSON schema of the job input, serialized to a string.
    pub input_schema_json: Option<String>,
    /// JSON schema of the job output, serialized to a string.
    pub output_schema_json: Option<String>,
}