zirv_queue/models/
job.rs

1use diesel::prelude::*;
2use serde::{Deserialize, Serialize};
3use zirv_db::DB;
4
5use crate::{models::failed_job::FailedJob, Config};
6
7use super::processed_job::ProcessedJob;
8
9/// Represents a unit of work in the system. Each `Job` can be in different states
10/// (`pending`, `processing`, etc.) and may have multiple attempts if it fails.
11///
12/// Corresponds to the `jobs` table in the database.
13///
14/// # Fields
15///
16/// - `id`: Primary key of the job record. Auto-generated by the database.
17/// - `payload`: The data associated with this job (e.g., request parameters, message content).
18/// - `status`: Current status of the job (e.g., "pending", "processing").
19/// - `attempts`: Number of times an attempt has been made to run this job.
20/// - `available_at`: A timestamp indicating when this job is eligible to be processed.
21/// - `created_at`: Timestamp when the job was first created.
22/// - `updated_at`: Timestamp of the most recent update to the job record.
23#[derive(Queryable, Insertable, Serialize, Deserialize, Debug, Clone)]
24#[diesel(table_name = crate::schema::jobs)]
25pub struct Job {
26    pub id: i32,
27    pub payload: String,
28    pub status: String,
29    pub attempts: i32,
30    pub available_at: chrono::NaiveDateTime,
31    pub created_at: chrono::NaiveDateTime,
32    pub updated_at: chrono::NaiveDateTime,
33}
34
35/// Represents the data required to create a new `Job` record in the database.
36///
37/// Corresponds to the `jobs` table, but omits fields like `id`, `status`,
38/// `attempts`, and timestamps since they are either auto-generated or
39/// defaulted at creation time.
40///
41/// # Fields
42///
43/// - `payload`: The data associated with this job.
44/// - `available_at`: When the job can start being processed.
45#[derive(Insertable, Debug, Serialize, Deserialize)]
46#[diesel(table_name = crate::schema::jobs)]
47pub struct NewJob {
48    pub payload: String,
49    pub available_at: chrono::NaiveDateTime,
50}
51
52impl NewJob {
53    /// Creates a new `NewJob` struct with the provided payload, setting
54    /// `available_at` to the current local time.
55    ///
56    /// # Parameters
57    ///
58    /// - `payload`: The data associated with the job.
59    ///
60    /// # Returns
61    ///
62    /// A `NewJob` with the current local time as `available_at`.
63    pub fn new(payload: String) -> Self {
64        Self {
65            payload,
66            available_at: chrono::Local::now().naive_local(),
67        }
68    }
69
70    /// Creates a new `NewJob` struct with a custom `available_at` time.
71    ///
72    /// # Parameters
73    ///
74    /// - `payload`: The data associated with the job.
75    /// - `_available_at`: The point in time when the job becomes processable.
76    ///
77    /// # Returns
78    ///
79    /// A `NewJob` with the specified `available_at`.
80    pub fn with_time(payload: String, _available_at: chrono::NaiveDateTime) -> Self {
81        Self {
82            payload,
83            available_at: _available_at,
84        }
85    }
86}
87
88impl Job {
89    /// Creates a new `Job` instance without inserting it into the database.
90    ///
91    /// This sets default fields (e.g., `status = "pending"`, `attempts = 0`,
92    /// `created_at = now`, and `updated_at = now`).
93    ///
94    /// # Parameters
95    ///
96    /// - `payload`: The data associated with the job.
97    ///
98    /// # Returns
99    ///
100    /// A new `Job` struct ready to be inserted into the DB or used in-memory.
101    pub fn new(payload: String) -> Self {
102        Self {
103            id: 0,
104            payload,
105            status: "pending".to_string(),
106            attempts: 0,
107            available_at: chrono::Local::now().naive_local(),
108            created_at: chrono::Local::now().naive_local(),
109            updated_at: chrono::Local::now().naive_local(),
110        }
111    }
112
113    /// Creates a new `Job` with a custom `available_at` time, without inserting into the database.
114    ///
115    /// # Parameters
116    ///
117    /// - `payload`: The data associated with the job.
118    /// - `_available_at`: The desired time when the job becomes processable.
119    ///
120    /// # Returns
121    ///
122    /// A `Job` struct with default values for other fields.
123    pub fn with_time(payload: String, _available_at: chrono::NaiveDateTime) -> Self {
124        Self {
125            id: 0,
126            payload,
127            status: "pending".to_string(),
128            attempts: 0,
129            available_at: _available_at,
130            created_at: chrono::Local::now().naive_local(),
131            updated_at: chrono::Local::now().naive_local(),
132        }
133    }
134
135    /// Inserts a new `Job` record into the `jobs` table.
136    ///
137    /// This method will:
138    /// - Acquire a database connection from [`DB::get_conn`].
139    /// - Run a transaction inserting the provided `new_job` into the `jobs` table.
140    /// - Fetch and return the newly inserted `Job` (based on its descending `created_at`).
141    ///
142    /// # Parameters
143    ///
144    /// - `new_job`: The `NewJob` struct containing the data to be inserted.
145    ///
146    /// # Returns
147    ///
148    /// - `Ok(Job)` if the insertion succeeded.
149    /// - `Err(diesel::result::Error)` if there was a connection or insertion problem.
150    ///
151    /// # Errors
152    ///
153    /// - Returns [`diesel::result::Error::NotFound`] if unable to get a DB connection.
154    /// - Returns other Diesel errors if insertion or retrieval fails.
155    pub fn create(new_job: NewJob) -> QueryResult<Self> {
156        use crate::schema::jobs::dsl::*;
157
158        let mut conn = match DB::get_conn() {
159            Ok(conn) => conn,
160            Err(e) => {
161                eprintln!("Error getting DB connection: {:?}", e);
162                return Err(diesel::result::Error::NotFound);
163            }
164        };
165
166        conn.transaction::<_, diesel::result::Error, _>(|conn| {
167            diesel::insert_into(jobs).values(new_job).execute(conn)?;
168            let job: Job = jobs.order(created_at.desc()).first(conn)?;
169
170            Ok(job)
171        })
172    }
173
174    /// Updates an existing `Job` record in the `jobs` table.
175    ///
176    /// The following fields may be updated:
177    /// - `status` (copied from `self.status`)
178    /// - `attempts`
179    /// - `available_at`
180    /// - `updated_at` (set to `now`)
181    ///
182    /// # Returns
183    ///
184    /// - `Ok(())` if the update succeeded.
185    /// - `Err(diesel::result::Error)` if there was a connection or update issue.
186    pub fn update(self) -> QueryResult<()> {
187        use crate::schema::jobs::dsl::*;
188
189        let mut conn = match DB::get_conn() {
190            Ok(conn) => conn,
191            Err(e) => {
192                eprintln!("Error getting DB connection: {:?}", e);
193                return Err(diesel::result::Error::NotFound);
194            }
195        };
196
197        diesel::update(jobs.filter(id.eq(self.id)))
198            .set((
199                status.eq(self.status.clone()),
200                attempts.eq(self.attempts),
201                available_at.eq(self.available_at),
202                updated_at.eq(chrono::Local::now().naive_local()),
203            ))
204            .execute(&mut conn)?;
205
206        Ok(())
207    }
208
209    /// Marks a job as completed by inserting a corresponding `ProcessedJob` record
210    /// and then removing the `Job` from the `jobs` table.
211    ///
212    /// # Parameters
213    ///
214    /// - `_return_value`: A string containing any relevant result data or message
215    ///   about the job’s completion.
216    ///
217    /// # Returns
218    ///
219    /// - `Ok(())` if the operation (inserting `ProcessedJob` and deleting the original `Job`) succeeded.
220    /// - `Err(diesel::result::Error)` if any database operation failed.
221    pub fn complete(&self, _return_value: String) -> QueryResult<()> {
222        match ProcessedJob::create(self, _return_value) {
223            Ok(_) => {
224                use crate::schema::jobs::dsl::*;
225
226                let mut conn = match DB::get_conn() {
227                    Ok(conn) => conn,
228                    Err(e) => {
229                        eprintln!("Error getting DB connection: {:?}", e);
230                        return Err(diesel::result::Error::NotFound);
231                    }
232                };
233
234                diesel::delete(jobs.filter(id.eq(self.id))).execute(&mut conn)?;
235            }
236            Err(e) => {
237                eprintln!("Error creating processed job: {:?}", e);
238                return Err(diesel::result::Error::NotFound);
239            }
240        }
241
242        Ok(())
243    }
244
245    /// Marks a job as failed. If the job has remaining retry attempts, it increments
246    /// the job’s `attempts` and defers its `available_at` time by the configured retry interval.
247    /// Otherwise, it inserts a `FailedJob` record and removes the job from `jobs`.
248    ///
249    /// # Parameters
250    ///
251    /// - `_reason`: A description of why the job failed.
252    ///
253    /// # Returns
254    ///
255    /// - `Ok(())` if the job was updated or moved to failed jobs.
256    /// - `Err(diesel::result::Error)` if a database operation fails.
257    ///
258    /// # Behavior
259    ///
260    /// - If `max_retry_attempts` (from [`Config::get_config`]) is greater than the
261    ///   current number of attempts, the job is set to retry later.
262    /// - Otherwise, a record is inserted into `failed_jobs`, and the job is deleted
263    ///   from the `jobs` table.
264    pub fn fail(&mut self, _reason: &str) -> QueryResult<()> {
265        let config = Config::get_config();
266
267        if config.max_retry_attempts > self.attempts {
268            let mut job = self.clone();
269
270            job.attempts += 1;
271
272            // Convert milliseconds to seconds for chrono::Duration.
273            let retry_time =
274                chrono::Local::now().naive_local() + chrono::Duration::seconds(config.retry_interval_ms / 1000);
275            job.available_at = retry_time;
276            job.status = "pending".to_string();
277
278            match job.update() {
279                Ok(_) => {}
280                Err(e) => {
281                    eprintln!("Error updating job: {:?}", e);
282                    return Err(diesel::result::Error::NotFound);
283                }
284            };
285        } else {
286            match FailedJob::create(self, _reason) {
287                Ok(_) => {
288                    use crate::schema::jobs::dsl::*;
289
290                    let mut conn = match DB::get_conn() {
291                        Ok(conn) => conn,
292                        Err(e) => {
293                            eprintln!("Error getting DB connection: {:?}", e);
294                            return Err(diesel::result::Error::NotFound);
295                        }
296                    };
297
298                    diesel::delete(jobs.filter(id.eq(self.id))).execute(&mut conn)?;
299                }
300                Err(e) => {
301                    eprintln!("Error creating failed job: {:?}", e);
302                    return Err(e);
303                }
304            }
305        }
306
307        Ok(())
308    }
309
310    /// Returns the count of jobs that are currently in `pending` status
311    /// and have an `available_at` time less than or equal to now.
312    ///
313    /// Useful for checking how many pending jobs are ready to be processed.
314    ///
315    /// # Returns
316    ///
317    /// - `Ok(i64)` with the count of matching jobs.
318    /// - `Err(diesel::result::Error)` if there was an issue retrieving data.
319    pub fn fetch_pending_jobs_count() -> QueryResult<i64> {
320        use crate::schema::jobs::dsl::*;
321
322        let mut conn = match DB::get_conn() {
323            Ok(conn) => conn,
324            Err(e) => {
325                eprintln!("Error getting DB connection: {:?}", e);
326                return Err(diesel::result::Error::NotFound);
327            }
328        };
329
330        jobs
331            .filter(status.eq("pending"))
332            .filter(available_at.le(chrono::Local::now().naive_local()))
333            .count()
334            .get_result(&mut conn)
335    }
336
337    /// Fetches the next pending job (the earliest by ID) that is ready to be processed,
338    /// and marks its status as `processing`.
339    ///
340    /// # Returns
341    ///
342    /// - `Ok(Some(Job))` if a pending job was found and updated.
343    /// - `Ok(None)` if no pending job is available.
344    /// - `Err(diesel::result::Error)` if a database operation fails.
345    ///
346    /// # Transaction
347    ///
348    /// This operation is performed within a transaction to ensure atomicity:
349    /// retrieving the job and updating its status happen together.
350    pub fn fetch_pending_job() -> QueryResult<Option<Self>> {
351        use crate::schema::jobs::dsl::*;
352
353        let mut conn = match DB::get_conn() {
354            Ok(conn) => conn,
355            Err(e) => {
356                eprintln!("Error getting DB connection: {:?}", e);
357                return Err(diesel::result::Error::NotFound);
358            }
359        };
360
361        conn.transaction::<_, diesel::result::Error, _>(|conn| {
362            let job = match jobs
363                .filter(status.eq("pending"))
364                .filter(available_at.le(chrono::Local::now().naive_local()))
365                .order(id.asc())
366                .for_update()
367                .first::<Job>(conn)
368                .optional()
369            {
370                Ok(job) => job,
371                Err(e) => {
372                    eprintln!("Error fetching pending job: {:?}", e);
373                    return Err(diesel::result::Error::NotFound);
374                }
375            };
376
377            if let Some(job) = job {
378                diesel::update(jobs.filter(id.eq(job.id)))
379                    .set((
380                        status.eq("processing"),
381                        updated_at.eq(chrono::Local::now().naive_local()),
382                    ))
383                    .execute(conn)?;
384
385                Ok(Some(job))
386            } else {
387                Ok(None)
388            }
389        })
390    }
391}