Skip to main content

simple_queue/
job.rs

1use super::*;
2
3/// Represents a job processed/processing by the queue.
4pub struct Job {
5    /// Job ID
6    pub id: uuid::Uuid,
7    /// Fingerprint, made as a soft key for deduplication purposes or cancelling.
8    /// Does not violate any database constraints.
9    pub fingerprint: Option<String>,
10    /// Unique key - database constraint on `unique_key` jobs in `pending` and `running` states.
11    /// Repeated inserts with the same `unique_key` will be rejected by the database.
12    pub unique_key: Option<String>,
13    /// Name of the queue job belongs to.
14    pub queue: String,
15    /// Payload - `serde_json::Value`,
16    pub job_data: serde_json::Value,
17    /// Current status of the job in string form
18    pub status: String,
19    /// Timestamp when the job was created.
20    pub created_at: chrono::DateTime<chrono::Utc>,
21    /// Timestamp when the job is scheduled to run.
22    pub run_at: Option<chrono::DateTime<chrono::Utc>>,
23    /// Timestamp when the job was last updated (for use by heartbeat)
24    pub updated_at: Option<chrono::DateTime<chrono::Utc>>,
25    /// Attempt count of the job
26    pub attempt: i32,
27    /// Maximum number of attempts for the job
28    pub max_attempts: i32,
29    pub(crate) reprocess_count: i32,
30}
31impl std::fmt::Debug for Job {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        f.debug_struct("Job")
34            .field("id", &self.id)
35            .field("fingerprint", &self.fingerprint)
36            .field("unique_key", &self.unique_key)
37            .field("queue", &self.queue)
38            .field("status", &self.status)
39            .field("created_at", &self.created_at)
40            .field("attempt", &self.attempt)
41            .field("max_attempts", &self.max_attempts)
42            .finish()
43    }
44}
45impl Default for Job {
46    fn default() -> Self {
47        Self {
48            id: uuid::Uuid::new_v4(),
49            fingerprint: None,
50            unique_key: None,
51            queue: "default".into(),
52            job_data: serde_json::Value::default(),
53            status: result::JobResultInternal::Pending.to_string(),
54            created_at: chrono::Utc::now(),
55            run_at: None,
56            updated_at: None,
57            attempt: 0,
58            max_attempts: 3,
59            reprocess_count: 0,
60        }
61    }
62}
63
64impl Job {
65    /// Create a new [`Job`] instance with the given queue name and job payload.
66    pub fn new<T: serde::Serialize + serde::de::DeserializeOwned>(
67        queue: &'static str,
68        job_data: T,
69    ) -> Self {
70        Self {
71            queue: queue.to_string(),
72            job_data: serde_json::to_value(job_data).unwrap_or_default(),
73            ..Default::default()
74        }
75    }
76    /// Builder method: sets [`Job::unique_key`]
77    pub fn with_unique_key(self, unique_key: impl Into<String>) -> Self {
78        Self {
79            unique_key: Some(unique_key.into()),
80            ..self
81        }
82    }
83    /// Builder method: sets [`Job::run_at`]
84    pub fn with_run_at(self, run_at: chrono::DateTime<chrono::Utc>) -> Self {
85        Self {
86            run_at: Some(run_at),
87            ..self
88        }
89    }
90    /// Builder method: sets [`Job::max_attempts`]
91    pub fn with_max_attempts(self, max_attempts: i32) -> Self {
92        Self {
93            max_attempts,
94            ..self
95        }
96    }
97    /// Builder method: sets [`Job::fingerprint`]
98    pub fn with_fingerprint(self, fingerprint: impl Into<String>) -> Self {
99        Self {
100            fingerprint: Some(fingerprint.into()),
101            ..self
102        }
103    }
104}
105
106pub trait JobExt<T> {
107    fn into_job(self, queue: &'static str) -> Job;
108}
109
110impl<T: serde::Serialize + serde::de::DeserializeOwned> JobExt<T> for T {
111    fn into_job(self, queue: &'static str) -> Job {
112        Job::new(queue, self)
113    }
114}