graphile_worker_job/
lib.rs

1use chrono::{DateTime, Utc};
2use derive_builder::Builder;
3use getset::Getters;
4use serde_json::Value;
5use sqlx::FromRow;
6
7/// `DbJob` represents a job as stored in the database.
8///
9/// It contains all the fields from the database table but doesn't include
10/// the task identifier string, which is provided separately when constructing
11/// a `Job` instance for use in the worker system.
12#[derive(FromRow, Getters, Debug, Clone, PartialEq, Eq)]
13#[getset(get = "pub")]
14#[allow(dead_code)]
15pub struct DbJob {
16    /// Unique identifier for this job
17    id: i64,
18    /// Foreign key to job_queues table if this job is part of a queue
19    job_queue_id: Option<i32>,
20    /// The JSON payload/data associated with this job
21    payload: serde_json::Value,
22    /// Priority level (lower number means higher priority)
23    priority: i16,
24    /// When the job is scheduled to run
25    run_at: DateTime<Utc>,
26    /// How many times this job has been attempted
27    attempts: i16,
28    /// Maximum number of retry attempts before considering the job permanently failed
29    max_attempts: i16,
30    /// Error message from the last failed attempt
31    last_error: Option<String>,
32    /// When the job was created
33    created_at: DateTime<Utc>,
34    /// When the job was last updated
35    updated_at: DateTime<Utc>,
36    /// Optional unique key for identifying/updating this job from user code
37    key: Option<String>,
38    /// Counter tracking the number of revisions/updates to this job
39    revision: i32,
40    /// When the job was locked for processing
41    locked_at: Option<DateTime<Utc>>,
42    /// Worker ID that has locked this job
43    locked_by: Option<String>,
44    /// Optional JSON flags to control job processing behavior
45    flags: Option<Value>,
46    /// Foreign key to the tasks table identifying the task type
47    task_id: i32,
48}
49
50/// `Job` extends `DbJob` with an additional task_identifier field.
51///
52/// This struct is used throughout the worker system for job processing and
53/// contains everything needed to execute a job, including the string identifier
54/// of the task which is used to look up the appropriate handler function.
55#[derive(FromRow, Getters, Debug, Clone, PartialEq, Eq, Builder)]
56#[getset(get = "pub")]
57#[builder(build_fn(private, name = "build_internal"), pattern = "owned")]
58#[allow(dead_code)]
59pub struct Job {
60    #[builder(default)]
61    id: i64,
62    #[builder(default, setter(strip_option))]
63    job_queue_id: Option<i32>,
64    #[builder(default = "serde_json::json!({})")]
65    payload: serde_json::Value,
66    #[builder(default)]
67    priority: i16,
68    #[builder(default = "Utc::now()")]
69    run_at: DateTime<Utc>,
70    #[builder(default)]
71    attempts: i16,
72    #[builder(default = "25")]
73    max_attempts: i16,
74    #[builder(default, setter(strip_option))]
75    last_error: Option<String>,
76    #[builder(default = "Utc::now()")]
77    created_at: DateTime<Utc>,
78    #[builder(default = "Utc::now()")]
79    updated_at: DateTime<Utc>,
80    #[builder(default, setter(strip_option))]
81    key: Option<String>,
82    #[builder(default)]
83    revision: i32,
84    #[builder(default, setter(strip_option))]
85    locked_at: Option<DateTime<Utc>>,
86    #[builder(default, setter(strip_option))]
87    locked_by: Option<String>,
88    #[builder(default, setter(strip_option))]
89    flags: Option<Value>,
90    #[builder(default)]
91    task_id: i32,
92    #[builder(default, setter(into))]
93    task_identifier: String,
94}
95
96/// Conversion from `Job` to `DbJob`, dropping the task_identifier field
97impl From<Job> for DbJob {
98    fn from(job: Job) -> DbJob {
99        DbJob {
100            id: job.id,
101            job_queue_id: job.job_queue_id,
102            payload: job.payload,
103            priority: job.priority,
104            run_at: job.run_at,
105            attempts: job.attempts,
106            max_attempts: job.max_attempts,
107            last_error: job.last_error,
108            created_at: job.created_at,
109            updated_at: job.updated_at,
110            key: job.key,
111            revision: job.revision,
112            locked_at: job.locked_at,
113            locked_by: job.locked_by,
114            flags: job.flags,
115            task_id: job.task_id,
116        }
117    }
118}
119
120impl Job {
121    /// Creates a new builder for constructing a `Job`.
122    pub fn builder() -> JobBuilder {
123        JobBuilder::default()
124    }
125
126    /// Creates a `Job` from a `DbJob` and a task identifier string.
127    ///
128    /// The task identifier is used to look up the appropriate handler function
129    /// when the job is executed.
130    ///
131    /// # Arguments
132    ///
133    /// * `db_job` - The database job record
134    /// * `task_identifier` - The string identifier for the task type
135    ///
136    /// # Returns
137    ///
138    /// A new `Job` instance with all fields from `db_job` plus the provided task identifier
139    pub fn from_db_job(db_job: DbJob, task_identifier: String) -> Job {
140        Job {
141            id: db_job.id,
142            job_queue_id: db_job.job_queue_id,
143            payload: db_job.payload,
144            priority: db_job.priority,
145            run_at: db_job.run_at,
146            attempts: db_job.attempts,
147            max_attempts: db_job.max_attempts,
148            last_error: db_job.last_error,
149            created_at: db_job.created_at,
150            updated_at: db_job.updated_at,
151            key: db_job.key,
152            revision: db_job.revision,
153            locked_at: db_job.locked_at,
154            locked_by: db_job.locked_by,
155            flags: db_job.flags,
156            task_id: db_job.task_id,
157            task_identifier,
158        }
159    }
160}
161
162impl JobBuilder {
163    /// Builds the Job with all configured values.
164    pub fn build(self) -> Job {
165        self.build_internal()
166            .expect("All fields have defaults, build should never fail")
167    }
168}
169
170#[cfg(test)]
171mod tests {
172    use super::*;
173
174    #[test]
175    fn test_from_db_job() {
176        let db_job = DbJob {
177            id: 1,
178            job_queue_id: Some(1),
179            payload: serde_json::json!({}),
180            priority: 1,
181            run_at: Utc::now(),
182            attempts: 1,
183            max_attempts: 1,
184            last_error: Some("error".to_string()),
185            created_at: Utc::now(),
186            updated_at: Utc::now(),
187            key: Some("key".to_string()),
188            revision: 1,
189            locked_at: Some(Utc::now()),
190            locked_by: Some("locked_by".to_string()),
191            flags: Some(serde_json::json!({})),
192            task_id: 1,
193        };
194        let task_identifier = "task_identifier".to_string();
195        let job = Job::from_db_job(db_job, task_identifier);
196        assert_eq!(job.id, 1);
197        assert_eq!(job.job_queue_id, Some(1));
198        assert_eq!(job.payload, serde_json::json!({}));
199        assert_eq!(job.priority, 1);
200        assert_eq!(job.attempts, 1);
201        assert_eq!(job.max_attempts, 1);
202        assert_eq!(job.last_error, Some("error".to_string()));
203        assert_eq!(job.key, Some("key".to_string()));
204        assert_eq!(job.revision, 1);
205        assert_eq!(job.locked_by, Some("locked_by".to_string()));
206        assert_eq!(job.task_id, 1);
207        assert_eq!(job.task_identifier, "task_identifier".to_string());
208    }
209
210    #[test]
211    fn test_from() {
212        let job = Job {
213            id: 1,
214            job_queue_id: Some(1),
215            payload: serde_json::json!({}),
216            priority: 1,
217            run_at: Utc::now(),
218            attempts: 1,
219            max_attempts: 1,
220            last_error: Some("error".to_string()),
221            created_at: Utc::now(),
222            updated_at: Utc::now(),
223            key: Some("key".to_string()),
224            revision: 1,
225            locked_at: Some(Utc::now()),
226            locked_by: Some("locked_by".to_string()),
227            flags: Some(serde_json::json!({})),
228            task_id: 1,
229            task_identifier: "task_identifier".to_string(),
230        };
231
232        let db_job: DbJob = job.clone().into();
233
234        assert_eq!(db_job.id, 1);
235        assert_eq!(db_job.job_queue_id, Some(1));
236        assert_eq!(db_job.payload, serde_json::json!({}));
237        assert_eq!(db_job.priority, 1);
238        assert_eq!(db_job.attempts, 1);
239        assert_eq!(db_job.max_attempts, 1);
240        assert_eq!(db_job.last_error, Some("error".to_string()));
241        assert_eq!(db_job.key, Some("key".to_string()));
242        assert_eq!(db_job.revision, 1);
243        assert_eq!(db_job.locked_by, Some("locked_by".to_string()));
244        assert_eq!(db_job.task_id, 1);
245    }
246}