rexecutor/
job.rs

1//! Structs and definitions for jobs.
2//!
3//! The main type is the [`Job`] representing a single executable job in the system.
4use std::fmt::Display;
5
6use chrono::{DateTime, Utc};
7use serde::de::DeserializeOwned;
8
9use crate::backend;
10
11pub mod builder;
12pub mod query;
13pub(crate) mod runner;
14pub mod uniqueness_criteria;
15
/// The id of a job.
///
/// A newtype wrapper around a raw `i32`. It converts to and from `i32` via [`From`], can be
/// compared directly against a raw `i32` on either side of `==`, and can be used as a key in
/// hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct JobId(i32);

impl From<i32> for JobId {
    /// Wraps a raw `i32` as a [`JobId`].
    fn from(value: i32) -> Self {
        Self(value)
    }
}

impl From<JobId> for i32 {
    /// Unwraps a [`JobId`] back into its raw `i32`.
    fn from(value: JobId) -> Self {
        value.0
    }
}

impl PartialEq<i32> for JobId {
    /// Compares the wrapped value with a raw `i32` (`job_id == 42`).
    fn eq(&self, other: &i32) -> bool {
        self.0 == *other
    }
}

impl PartialEq<JobId> for i32 {
    /// Compares a raw `i32` with the wrapped value (`42 == job_id`).
    fn eq(&self, other: &JobId) -> bool {
        *self == other.0
    }
}

impl Display for JobId {
    /// Formats the id as `JobId(<value>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "JobId({})", self.0)
    }
}
55
56/// The main data type representing a single executable job.
57///
58/// The job is passed as an argument to a number of the methods of [`crate::executor::Executor`] in
59/// particular the [`crate::executor::Executor::execute`] method responsible for defining how the
60/// job should be ran.
61#[derive(Debug, Eq, PartialEq, Clone)]
62// #[non_exhaustive]
63pub struct Job<D, M> {
64    /// The id of the job.
65    pub id: JobId,
66    /// The current status of the job.
67    pub status: JobStatus,
68    /// The name of the executor which is responsible for running this job.
69    ///
70    /// Determined via [`crate::executor::Executor::NAME`].
71    pub executor: String,
72    /// The data for running this job.
73    pub data: D,
74    /// Any metadata associated with this job.
75    pub metadata: Option<M>,
76    /// The current attempt of the job.
77    pub attempt: u16,
78    /// The maximum number of times to attempt this job before discarding it.
79    pub max_attempts: u16,
80    /// The priority of the job.
81    ///
82    /// The lower the number the higher the priority, i.e. the highest priority jobs are those with
83    /// priority zero.
84    ///
85    /// The priority is used to determine which jobs should be ran first when many jobs are
86    /// scheduled to run at the same time. The priority is applied to all jobs for the same
87    /// executor and not used to prioritise between different executors. Instead all executors will
88    /// attempt to execute their highest priority jobs simultaneously.
89    pub priority: u16,
90    /// The tags associated with this job.
91    pub tags: Vec<String>,
92    /// Any errors that have occurred in previous attempts to run this job.
93    pub errors: Vec<JobError>,
94    /// The timestamp when this job was inserted.
95    pub inserted_at: DateTime<Utc>,
96    /// The timestamp when the job should next be executed.
97    pub scheduled_at: DateTime<Utc>,
98    /// The timestamp of the last attempt to execute the job.
99    pub attempted_at: Option<DateTime<Utc>>,
100    /// The timestamp when the job completed successfully.
101    pub completed_at: Option<DateTime<Utc>>,
102    /// The timestamp when the job was cancelled.
103    pub cancelled_at: Option<DateTime<Utc>>,
104    /// The timestamp when the job was discarded.
105    pub discarded_at: Option<DateTime<Utc>>,
106}
107
108impl<D, M> TryFrom<backend::Job> for Job<D, M>
109where
110    D: DeserializeOwned,
111    M: DeserializeOwned,
112{
113    type Error = serde_json::Error;
114
115    fn try_from(value: backend::Job) -> Result<Self, Self::Error> {
116        let data = serde_json::from_value(value.data)?;
117        let metadata = serde_json::from_value(value.metadata)?;
118        Ok(Self {
119            id: value.id.into(),
120            status: value.status,
121            executor: value.executor,
122            data,
123            metadata,
124            attempt: value.attempt.try_into().unwrap(),
125            attempted_at: value.attempted_at,
126            max_attempts: value.max_attempts.try_into().unwrap(),
127            priority: value.priority.try_into().unwrap(),
128            tags: value.tags,
129            errors: value.errors,
130            inserted_at: value.inserted_at,
131            scheduled_at: value.scheduled_at,
132            completed_at: value.completed_at,
133            cancelled_at: value.cancelled_at,
134            discarded_at: value.discarded_at,
135        })
136    }
137}
138
139impl<D, M> Job<D, M> {
140    pub(crate) fn is_final_attempt(&self) -> bool {
141        self.attempt == self.max_attempts
142    }
143}
144
/// The lifecycle status of a job.
///
/// A newly inserted job starts out in the [`JobStatus::Scheduled`] state.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JobStatus {
    /// The job has run successfully.
    ///
    /// I.e. the state of the job after [`crate::executor::Executor`] returns
    /// [`crate::executor::ExecutionResult::Done`].
    Complete,
    /// An executor is currently running the job.
    Executing,
    /// The initial state of the job, before any attempt has been made to run it.
    ///
    /// This usually implies that the job's `scheduled_at` field is in the future, but not
    /// always: a job can be inserted with a `scheduled_at` in the past, or the system may have
    /// been offline for some time.
    Scheduled,
    /// The state of the job after the first failed attempt to execute it, provided the job's
    /// `max_attempts` field is greater than one.
    Retryable,
    /// The state of the job after [`crate::executor::Executor`] returns
    /// [`crate::executor::ExecutionResult::Cancel`].
    Cancelled,
    /// The state of the job after it has been attempted `max_attempts` times without success.
    Discarded,
}
172
173impl JobStatus {
174    /// A static array containing all the possible job statuses.
175    pub const ALL: [JobStatus; 6] = [
176        Self::Complete,
177        Self::Executing,
178        Self::Scheduled,
179        Self::Retryable,
180        Self::Cancelled,
181        Self::Discarded,
182    ];
183}
184
185/// A record of an error that was returned from a job.
186#[derive(Debug, PartialEq, Eq, Clone)]
187pub struct JobError {
188    /// The attempt when this error occurred.
189    pub attempt: u16,
190    /// The type of error that occurred.
191    pub error_type: ErrorType,
192    /// The details of the error.
193    pub details: String,
194    /// The time at which this error was recorded.
195    pub recorded_at: DateTime<Utc>,
196}
197
/// Classification of the types of errors that can cause a job to fail to execute successfully.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ErrorType {
    /// Executing the job resulted in a `panic`.
    Panic,
    /// The job did not complete before it timed out.
    ///
    /// The timeout can be set using [`crate::executor::Executor::timeout`].
    Timeout,
    /// The job was cancelled.
    ///
    /// This happens when [`crate::executor::Executor`] returns
    /// [`crate::executor::ExecutionResult::Cancel`].
    Cancelled,
    /// A custom error type.
    ///
    /// Its details are taken from the error returned as part of
    /// [`crate::executor::ExecutionResult::Error`] from [`crate::executor::Executor`].
    Other(String),
}
218
#[cfg(test)]
mod test {
    use super::*;

    /// `Display` renders the id wrapped in `JobId(..)`.
    #[test]
    fn job_id_display() {
        let rendered = JobId(42).to_string();
        assert_eq!(rendered, "JobId(42)");
    }

    /// `JobId` converts losslessly to and from `i32`.
    #[test]
    fn job_id_from_i32() {
        let raw = 42;
        let expected = JobId(42);

        let from_raw = JobId::from(raw);
        let back_to_raw = i32::from(expected);

        assert_eq!(from_raw, expected);
        assert_eq!(back_to_raw, raw);
    }

    /// Conversion fails when either the data or the metadata cannot be deserialized.
    #[test]
    fn from_backend_job_errors() {
        // The raw job's data does not deserialize into a `String`.
        let bad_data = backend::Job::raw_job();
        assert!(Job::<String, String>::try_from(bad_data).is_err());

        // A numeric metadata value does not deserialize into an optional `String`.
        let bad_metadata =
            backend::Job::raw_job().with_raw_metadata(serde_json::Value::Number(42.into()));
        assert!(Job::<(), String>::try_from(bad_metadata).is_err());
    }

    /// A successful conversion carries every field across from the backend job.
    #[test]
    fn from_backend_job() {
        let raw =
            backend::Job::raw_job().with_raw_data(serde_json::Value::String("data".to_owned()));
        let typed = Job::<String, String>::try_from(raw.clone()).expect("Should not error");

        // Identity and payload.
        assert_eq!(typed.id, raw.id);
        assert_eq!(typed.status, raw.status);
        assert_eq!(typed.executor, raw.executor);
        assert_eq!(typed.data, "data");
        assert!(typed.metadata.is_none());

        // Counters are narrowed to `u16`; widen them again to compare.
        assert_eq!(i32::from(typed.attempt), raw.attempt);
        assert_eq!(i32::from(typed.max_attempts), raw.max_attempts);
        assert_eq!(i32::from(typed.priority), raw.priority);

        // Collections and timestamps.
        assert_eq!(typed.tags, raw.tags);
        assert_eq!(typed.errors, raw.errors);
        assert_eq!(typed.inserted_at, raw.inserted_at);
        assert_eq!(typed.scheduled_at, raw.scheduled_at);
        assert_eq!(typed.attempted_at, raw.attempted_at);
        assert_eq!(typed.completed_at, raw.completed_at);
        assert_eq!(typed.cancelled_at, raw.cancelled_at);
        assert_eq!(typed.discarded_at, raw.discarded_at);
    }
}