ora_client/
job_definition.rs

1//! Job definitions and related types.
2
3use std::{borrow::Cow, time::SystemTime};
4
5use eyre::{Context, OptionExt};
6use ora_proto::{
7    common::v1::{JobRetryPolicy, JobTimeoutPolicy},
8    server::v1::{Job, JobExecutionStatus},
9};
10use serde::Serialize;
11use uuid::Uuid;
12
13use crate::{
14    schedule_definition::{
15        ScheduleDefinition, ScheduleJobTimingPolicy, ScheduleJobTimingPolicyCron,
16        ScheduleJobTimingPolicyRepeat,
17    },
18    IndexMap,
19};
20
21/// The definition of a job that can be executed
22/// in the future.
23#[derive(Debug, Clone)]
24#[must_use]
25pub struct JobDefinition {
26    /// The ID of the job type.
27    pub job_type_id: Cow<'static, str>,
28    /// The target execution time of the job.
29    ///
30    /// If not provided, it should be set to the current time.
31    pub target_execution_time: std::time::SystemTime,
32    /// The job input payload JSON that is passed to the worker.
33    pub input_payload_json: String,
34    /// The labels of the job.
35    pub labels: IndexMap<String, String>,
36    /// The timeout policy of the job.
37    pub timeout_policy: TimeoutPolicy,
38    /// Retry policy for the job.
39    pub retry_policy: RetryPolicy,
40    /// Additional metadata for the job.
41    pub metadata_json: Option<String>,
42}
43
44impl JobDefinition {
45    /// Set the target execution time of the job.
46    pub fn at(mut self, target_execution_time: impl Into<std::time::SystemTime>) -> Self {
47        self.target_execution_time = target_execution_time.into();
48        self
49    }
50
51    /// Set the target execution time of the job to the current time.
52    pub fn now(mut self) -> Self {
53        self.target_execution_time = std::time::SystemTime::now();
54        self
55    }
56
57    /// Add a label to the job.
58    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
59        self.labels.insert(key.into(), value.into());
60        self
61    }
62
63    /// Add a JSON label to the job.
64    ///
65    /// # Panics
66    ///
67    /// If the value cannot be serialized to JSON.
68    pub fn with_label_json(mut self, key: impl Into<String>, value: impl Serialize) -> Self {
69        self.labels.insert(
70            key.into(),
71            serde_json::to_string(&value).expect("label serialization failed"),
72        );
73        self
74    }
75
76    /// Set the timeout for the job.
77    ///
78    /// The timeout is calculated from the start time of the job
79    /// by default.
80    pub fn with_timeout(mut self, timeout: impl Into<std::time::Duration>) -> Self {
81        self.timeout_policy.timeout = Some(timeout.into());
82        self
83    }
84
85    /// Set the number of retries for the job.
86    pub fn with_retries(mut self, retries: u64) -> Self {
87        self.retry_policy.retries = retries;
88        self
89    }
90
91    /// Set additional metadata for the job.
92    ///
93    /// Note that this will replace any existing metadata.
94    ///
95    /// # Panics
96    ///
97    /// If the metadata cannot be serialized to JSON.
98    pub fn replace_metadata(mut self, metadata: impl Serialize) -> Self {
99        self.metadata_json =
100            Some(serde_json::to_string(&metadata).expect("metadata serialization failed"));
101        self
102    }
103}
104
105impl JobDefinition {
106    /// Create a schedule from the job definition
107    /// that repeats at a fixed interval.
108    pub fn repeat_every(self, interval: std::time::Duration) -> ScheduleDefinition {
109        ScheduleDefinition {
110            job_timing_policy: ScheduleJobTimingPolicy::Repeat(ScheduleJobTimingPolicyRepeat {
111                interval,
112                immediate: false,
113                missed_time_policy: Default::default(),
114            }),
115            job_creation_policy:
116                crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(self),
117            time_range: None,
118            labels: Default::default(),
119            metadata_json: None,
120            propagate_labels_to_jobs: true,
121        }
122    }
123
124    /// Create a schedule from the job definition
125    /// that repeats according to a cron expression.
126    ///
127    /// # Errors
128    ///
129    /// If the cron expression is invalid.
130    pub fn repeat_cron(
131        self,
132        cron_expression: impl Into<String>,
133    ) -> eyre::Result<ScheduleDefinition> {
134        let expr = cron_expression.into();
135
136        let mut parse_options = cronexpr::ParseOptions::default();
137        parse_options.fallback_timezone_option = cronexpr::FallbackTimezoneOption::UTC;
138
139        cronexpr::parse_crontab_with(&expr, parse_options)?;
140
141        Ok(ScheduleDefinition {
142            job_timing_policy: ScheduleJobTimingPolicy::Cron(ScheduleJobTimingPolicyCron {
143                cron_expression: expr,
144                immediate: false,
145                missed_time_policy: Default::default(),
146            }),
147            job_creation_policy:
148                crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(self),
149            time_range: None,
150            labels: Default::default(),
151            metadata_json: None,
152            propagate_labels_to_jobs: true,
153        })
154    }
155}
156
/// A timeout policy for a job.
///
/// The default policy has no timeout and uses
/// [`TimeoutBaseTime::StartTime`] as its base time.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TimeoutPolicy {
    /// The timeout for the job, `None` if the job has no timeout.
    pub timeout: Option<std::time::Duration>,
    /// The base time for the timeout.
    ///
    /// The timeout is calculated from this time.
    pub base_time: TimeoutBaseTime,
}

/// The base time for the timeout.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TimeoutBaseTime {
    /// The base time is the start time of the job.
    ///
    /// This is the default.
    #[default]
    StartTime,
    /// The base time is the target execution time of the job.
    ///
    /// Note that if the target execution time is not set,
    /// the timeout is calculated from the start time of the job.
    ///
    /// If the target execution time is in the past,
    /// the jobs may be immediately timed out.
    TargetExecutionTime,
}
183
/// A retry policy for a job.
///
/// The default policy performs no retries.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RetryPolicy {
    /// The number of retries for the job.
    ///
    /// If the number of retries is zero, the job is not retried.
    pub retries: u64,
}
192
/// The status a job can be in.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum JobStatus {
    /// The job has not been scheduled for execution yet.
    Pending,
    /// The job is ready to be executed.
    Ready,
    /// The job was assigned to an executor but has not started yet.
    Assigned,
    /// The job is being executed right now.
    Running,
    /// The job finished successfully.
    Succeeded,
    /// The job failed.
    Failed,
}

impl JobStatus {
    /// Whether the status is terminal, that is either
    /// [`Succeeded`] or [`Failed`].
    ///
    /// [`Succeeded`]: JobStatus::Succeeded
    /// [`Failed`]: JobStatus::Failed
    #[must_use]
    pub fn is_terminal(&self) -> bool {
        self.is_succeeded() || self.is_failed()
    }

    /// Whether the job status is [`Pending`].
    ///
    /// [`Pending`]: JobStatus::Pending
    #[must_use]
    pub fn is_pending(&self) -> bool {
        *self == Self::Pending
    }

    /// Whether the job status is [`Ready`].
    ///
    /// [`Ready`]: JobStatus::Ready
    #[must_use]
    pub fn is_ready(&self) -> bool {
        *self == Self::Ready
    }

    /// Whether the job status is [`Assigned`].
    ///
    /// [`Assigned`]: JobStatus::Assigned
    #[must_use]
    pub fn is_assigned(&self) -> bool {
        *self == Self::Assigned
    }

    /// Whether the job status is [`Running`].
    ///
    /// [`Running`]: JobStatus::Running
    #[must_use]
    pub fn is_running(&self) -> bool {
        *self == Self::Running
    }

    /// Whether the job status is [`Succeeded`].
    ///
    /// [`Succeeded`]: JobStatus::Succeeded
    #[must_use]
    pub fn is_succeeded(&self) -> bool {
        *self == Self::Succeeded
    }

    /// Whether the job status is [`Failed`].
    ///
    /// [`Failed`]: JobStatus::Failed
    #[must_use]
    pub fn is_failed(&self) -> bool {
        *self == Self::Failed
    }
}
265
266impl From<JobExecutionStatus> for JobStatus {
267    fn from(status: JobExecutionStatus) -> Self {
268        match status {
269            JobExecutionStatus::Pending | JobExecutionStatus::Unspecified => Self::Pending,
270            JobExecutionStatus::Ready => Self::Ready,
271            JobExecutionStatus::Assigned => Self::Assigned,
272            JobExecutionStatus::Running => Self::Running,
273            JobExecutionStatus::Succeeded => Self::Succeeded,
274            JobExecutionStatus::Failed => Self::Failed,
275        }
276    }
277}
278
279impl From<JobStatus> for JobExecutionStatus {
280    fn from(status: JobStatus) -> Self {
281        match status {
282            JobStatus::Pending => JobExecutionStatus::Pending,
283            JobStatus::Ready => JobExecutionStatus::Ready,
284            JobStatus::Assigned => JobExecutionStatus::Assigned,
285            JobStatus::Running => JobExecutionStatus::Running,
286            JobStatus::Succeeded => JobExecutionStatus::Succeeded,
287            JobStatus::Failed => JobExecutionStatus::Failed,
288        }
289    }
290}
291
292/// All core information about a job.
293#[derive(Debug, Clone)]
294pub struct JobDetails {
295    /// The unique identifier of the job.
296    pub id: Uuid,
297    /// Whether the job is active.
298    ///
299    /// Inactive jobs are not scheduled for execution.
300    ///
301    /// Jobs become inactive when they succeed or fail all their retries,
302    /// or get cancelled.
303    pub active: bool,
304    /// Whether the job was cancelled.
305    pub cancelled: bool,
306    /// The ID of the job type.
307    pub job_type_id: String,
308    /// The target execution time of the job.
309    ///
310    /// If not provided, it should be set to the current time.
311    pub target_execution_time: SystemTime,
312    /// The job input payload JSON that is passed to the executor.
313    pub input_payload_json: String,
314    /// The labels of the job.
315    pub labels: IndexMap<String, String>,
316    /// The timeout policy of the job.
317    pub timeout_policy: JobTimeoutPolicy,
318    /// Retry policy for the job.
319    pub retry_policy: JobRetryPolicy,
320    /// The creation time of the job.
321    pub created_at: SystemTime,
322    /// A list of executions for the job.
323    pub executions: Vec<ExecutionDetails>,
324}
325
326impl JobDetails {
327    /// Get the status of the job.
328    #[must_use]
329    pub fn status(&self) -> JobStatus {
330        let Some(last_execution) = self.executions.last() else {
331            return JobStatus::Pending;
332        };
333
334        last_execution.status
335    }
336}
337
338impl TryFrom<Job> for JobDetails {
339    type Error = eyre::Report;
340
341    fn try_from(value: Job) -> Result<Self, Self::Error> {
342        let Some(definition) = value.definition else {
343            return Err(eyre::eyre!("missing job definition"));
344        };
345
346        Ok(Self {
347            id: value.id.parse().wrap_err("invalid job ID")?,
348            active: value.active,
349            cancelled: value.cancelled,
350            job_type_id: definition.job_type_id,
351            target_execution_time: definition
352                .target_execution_time
353                .and_then(|t| t.try_into().ok())
354                .unwrap_or(SystemTime::UNIX_EPOCH),
355            input_payload_json: definition.input_payload_json,
356            labels: definition
357                .labels
358                .into_iter()
359                .map(|job_label| (job_label.key, job_label.value))
360                .collect(),
361            timeout_policy: definition.timeout_policy.unwrap_or_default(),
362            retry_policy: definition.retry_policy.unwrap_or_default(),
363            created_at: value
364                .created_at
365                .map(TryInto::try_into)
366                .transpose()?
367                .ok_or_eyre("missing creation time")?,
368            executions: value
369                .executions
370                .into_iter()
371                .map(TryInto::try_into)
372                .collect::<Result<_, _>>()?,
373        })
374    }
375}
376
377/// All core information about an execution
378/// that is associated with a job.
379#[derive(Debug, Clone)]
380pub struct ExecutionDetails {
381    /// The ID of the job execution.
382    pub id: Uuid,
383    /// The ID of the job.
384    pub job_id: Uuid,
385    /// The ID of the associated executor.
386    pub executor_id: Option<Uuid>,
387    /// The status of the job execution.
388    pub status: JobStatus,
389    /// The time the job execution was created.
390    pub created_at: SystemTime,
391    /// The time the job execution was marked as ready.
392    pub ready_at: Option<SystemTime>,
393    /// The time the job execution was assigned to an executor.
394    pub assigned_at: Option<SystemTime>,
395    /// The time the job execution has started.
396    pub started_at: Option<SystemTime>,
397    /// The time the job execution has succeeded.
398    pub succeeded_at: Option<SystemTime>,
399    /// The time the job execution has failed.
400    pub failed_at: Option<SystemTime>,
401    /// The output payload of the execution.
402    pub output_payload_json: Option<String>,
403    /// The error message of the execution.
404    pub failure_reason: Option<String>,
405}
406
407impl TryFrom<ora_proto::server::v1::JobExecution> for ExecutionDetails {
408    type Error = eyre::Report;
409
410    fn try_from(value: ora_proto::server::v1::JobExecution) -> Result<Self, Self::Error> {
411        Ok(Self {
412            status: value.status().into(),
413            id: value.id.parse().wrap_err("invalid execution ID")?,
414            job_id: value.job_id.parse().wrap_err("invalid job ID")?,
415            executor_id: value
416                .executor_id
417                .map(|id| id.parse())
418                .transpose()
419                .wrap_err("invalid executor ID")?,
420            created_at: value
421                .created_at
422                .map(TryInto::try_into)
423                .transpose()?
424                .ok_or_eyre("missing creation time")?,
425            ready_at: value.ready_at.map(TryInto::try_into).transpose()?,
426            assigned_at: value.assigned_at.map(TryInto::try_into).transpose()?,
427            started_at: value.started_at.map(TryInto::try_into).transpose()?,
428            succeeded_at: value.succeeded_at.map(TryInto::try_into).transpose()?,
429            failed_at: value.failed_at.map(TryInto::try_into).transpose()?,
430            output_payload_json: value.output_payload_json,
431            failure_reason: value.failure_reason,
432        })
433    }
434}