zirv_queue/models/job.rs
1use diesel::prelude::*;
2use serde::{Deserialize, Serialize};
3use zirv_db::DB;
4
5use crate::{models::failed_job::FailedJob, Config};
6
7use super::processed_job::ProcessedJob;
8
9/// Represents a unit of work in the system. Each `Job` can be in different states
10/// (`pending`, `processing`, etc.) and may have multiple attempts if it fails.
11///
12/// Corresponds to the `jobs` table in the database.
13///
14/// # Fields
15///
16/// - `id`: Primary key of the job record. Auto-generated by the database.
17/// - `payload`: The data associated with this job (e.g., request parameters, message content).
18/// - `status`: Current status of the job (e.g., "pending", "processing").
19/// - `attempts`: Number of times an attempt has been made to run this job.
20/// - `available_at`: A timestamp indicating when this job is eligible to be processed.
21/// - `created_at`: Timestamp when the job was first created.
22/// - `updated_at`: Timestamp of the most recent update to the job record.
23#[derive(Queryable, Insertable, Serialize, Deserialize, Debug, Clone)]
24#[diesel(table_name = crate::schema::jobs)]
25pub struct Job {
26 pub id: i32,
27 pub payload: String,
28 pub status: String,
29 pub attempts: i32,
30 pub available_at: chrono::NaiveDateTime,
31 pub created_at: chrono::NaiveDateTime,
32 pub updated_at: chrono::NaiveDateTime,
33}
34
35/// Represents the data required to create a new `Job` record in the database.
36///
37/// Corresponds to the `jobs` table, but omits fields like `id`, `status`,
38/// `attempts`, and timestamps since they are either auto-generated or
39/// defaulted at creation time.
40///
41/// # Fields
42///
43/// - `payload`: The data associated with this job.
44/// - `available_at`: When the job can start being processed.
45#[derive(Insertable, Debug, Serialize, Deserialize)]
46#[diesel(table_name = crate::schema::jobs)]
47pub struct NewJob {
48 pub payload: String,
49 pub available_at: chrono::NaiveDateTime,
50}
51
52impl NewJob {
53 /// Creates a new `NewJob` struct with the provided payload, setting
54 /// `available_at` to the current local time.
55 ///
56 /// # Parameters
57 ///
58 /// - `payload`: The data associated with the job.
59 ///
60 /// # Returns
61 ///
62 /// A `NewJob` with the current local time as `available_at`.
63 pub fn new(payload: String) -> Self {
64 Self {
65 payload,
66 available_at: chrono::Local::now().naive_local(),
67 }
68 }
69
70 /// Creates a new `NewJob` struct with a custom `available_at` time.
71 ///
72 /// # Parameters
73 ///
74 /// - `payload`: The data associated with the job.
75 /// - `_available_at`: The point in time when the job becomes processable.
76 ///
77 /// # Returns
78 ///
79 /// A `NewJob` with the specified `available_at`.
80 pub fn with_time(payload: String, _available_at: chrono::NaiveDateTime) -> Self {
81 Self {
82 payload,
83 available_at: _available_at,
84 }
85 }
86}
87
88impl Job {
89 /// Creates a new `Job` instance without inserting it into the database.
90 ///
91 /// This sets default fields (e.g., `status = "pending"`, `attempts = 0`,
92 /// `created_at = now`, and `updated_at = now`).
93 ///
94 /// # Parameters
95 ///
96 /// - `payload`: The data associated with the job.
97 ///
98 /// # Returns
99 ///
100 /// A new `Job` struct ready to be inserted into the DB or used in-memory.
101 pub fn new(payload: String) -> Self {
102 Self {
103 id: 0,
104 payload,
105 status: "pending".to_string(),
106 attempts: 0,
107 available_at: chrono::Local::now().naive_local(),
108 created_at: chrono::Local::now().naive_local(),
109 updated_at: chrono::Local::now().naive_local(),
110 }
111 }
112
113 /// Creates a new `Job` with a custom `available_at` time, without inserting into the database.
114 ///
115 /// # Parameters
116 ///
117 /// - `payload`: The data associated with the job.
118 /// - `_available_at`: The desired time when the job becomes processable.
119 ///
120 /// # Returns
121 ///
122 /// A `Job` struct with default values for other fields.
123 pub fn with_time(payload: String, _available_at: chrono::NaiveDateTime) -> Self {
124 Self {
125 id: 0,
126 payload,
127 status: "pending".to_string(),
128 attempts: 0,
129 available_at: _available_at,
130 created_at: chrono::Local::now().naive_local(),
131 updated_at: chrono::Local::now().naive_local(),
132 }
133 }
134
135 /// Inserts a new `Job` record into the `jobs` table.
136 ///
137 /// This method will:
138 /// - Acquire a database connection from [`DB::get_conn`].
139 /// - Run a transaction inserting the provided `new_job` into the `jobs` table.
140 /// - Fetch and return the newly inserted `Job` (based on its descending `created_at`).
141 ///
142 /// # Parameters
143 ///
144 /// - `new_job`: The `NewJob` struct containing the data to be inserted.
145 ///
146 /// # Returns
147 ///
148 /// - `Ok(Job)` if the insertion succeeded.
149 /// - `Err(diesel::result::Error)` if there was a connection or insertion problem.
150 ///
151 /// # Errors
152 ///
153 /// - Returns [`diesel::result::Error::NotFound`] if unable to get a DB connection.
154 /// - Returns other Diesel errors if insertion or retrieval fails.
155 pub fn create(new_job: NewJob) -> QueryResult<Self> {
156 use crate::schema::jobs::dsl::*;
157
158 let mut conn = match DB::get_conn() {
159 Ok(conn) => conn,
160 Err(e) => {
161 eprintln!("Error getting DB connection: {:?}", e);
162 return Err(diesel::result::Error::NotFound);
163 }
164 };
165
166 conn.transaction::<_, diesel::result::Error, _>(|conn| {
167 diesel::insert_into(jobs).values(new_job).execute(conn)?;
168 let job: Job = jobs.order(created_at.desc()).first(conn)?;
169
170 Ok(job)
171 })
172 }
173
174 /// Updates an existing `Job` record in the `jobs` table.
175 ///
176 /// The following fields may be updated:
177 /// - `status` (copied from `self.status`)
178 /// - `attempts`
179 /// - `available_at`
180 /// - `updated_at` (set to `now`)
181 ///
182 /// # Returns
183 ///
184 /// - `Ok(())` if the update succeeded.
185 /// - `Err(diesel::result::Error)` if there was a connection or update issue.
186 pub fn update(self) -> QueryResult<()> {
187 use crate::schema::jobs::dsl::*;
188
189 let mut conn = match DB::get_conn() {
190 Ok(conn) => conn,
191 Err(e) => {
192 eprintln!("Error getting DB connection: {:?}", e);
193 return Err(diesel::result::Error::NotFound);
194 }
195 };
196
197 diesel::update(jobs.filter(id.eq(self.id)))
198 .set((
199 status.eq(self.status.clone()),
200 attempts.eq(self.attempts),
201 available_at.eq(self.available_at),
202 updated_at.eq(chrono::Local::now().naive_local()),
203 ))
204 .execute(&mut conn)?;
205
206 Ok(())
207 }
208
209 /// Marks a job as completed by inserting a corresponding `ProcessedJob` record
210 /// and then removing the `Job` from the `jobs` table.
211 ///
212 /// # Parameters
213 ///
214 /// - `_return_value`: A string containing any relevant result data or message
215 /// about the job’s completion.
216 ///
217 /// # Returns
218 ///
219 /// - `Ok(())` if the operation (inserting `ProcessedJob` and deleting the original `Job`) succeeded.
220 /// - `Err(diesel::result::Error)` if any database operation failed.
221 pub fn complete(&self, _return_value: String) -> QueryResult<()> {
222 match ProcessedJob::create(self, _return_value) {
223 Ok(_) => {
224 use crate::schema::jobs::dsl::*;
225
226 let mut conn = match DB::get_conn() {
227 Ok(conn) => conn,
228 Err(e) => {
229 eprintln!("Error getting DB connection: {:?}", e);
230 return Err(diesel::result::Error::NotFound);
231 }
232 };
233
234 diesel::delete(jobs.filter(id.eq(self.id))).execute(&mut conn)?;
235 }
236 Err(e) => {
237 eprintln!("Error creating processed job: {:?}", e);
238 return Err(diesel::result::Error::NotFound);
239 }
240 }
241
242 Ok(())
243 }
244
245 /// Marks a job as failed. If the job has remaining retry attempts, it increments
246 /// the job’s `attempts` and defers its `available_at` time by the configured retry interval.
247 /// Otherwise, it inserts a `FailedJob` record and removes the job from `jobs`.
248 ///
249 /// # Parameters
250 ///
251 /// - `_reason`: A description of why the job failed.
252 ///
253 /// # Returns
254 ///
255 /// - `Ok(())` if the job was updated or moved to failed jobs.
256 /// - `Err(diesel::result::Error)` if a database operation fails.
257 ///
258 /// # Behavior
259 ///
260 /// - If `max_retry_attempts` (from [`Config::get_config`]) is greater than the
261 /// current number of attempts, the job is set to retry later.
262 /// - Otherwise, a record is inserted into `failed_jobs`, and the job is deleted
263 /// from the `jobs` table.
264 pub fn fail(&mut self, _reason: &str) -> QueryResult<()> {
265 let config = Config::get_config();
266
267 if config.max_retry_attempts > self.attempts {
268 let mut job = self.clone();
269
270 job.attempts += 1;
271
272 // Convert milliseconds to seconds for chrono::Duration.
273 let retry_time =
274 chrono::Local::now().naive_local() + chrono::Duration::seconds(config.retry_interval_ms / 1000);
275 job.available_at = retry_time;
276 job.status = "pending".to_string();
277
278 match job.update() {
279 Ok(_) => {}
280 Err(e) => {
281 eprintln!("Error updating job: {:?}", e);
282 return Err(diesel::result::Error::NotFound);
283 }
284 };
285 } else {
286 match FailedJob::create(self, _reason) {
287 Ok(_) => {
288 use crate::schema::jobs::dsl::*;
289
290 let mut conn = match DB::get_conn() {
291 Ok(conn) => conn,
292 Err(e) => {
293 eprintln!("Error getting DB connection: {:?}", e);
294 return Err(diesel::result::Error::NotFound);
295 }
296 };
297
298 diesel::delete(jobs.filter(id.eq(self.id))).execute(&mut conn)?;
299 }
300 Err(e) => {
301 eprintln!("Error creating failed job: {:?}", e);
302 return Err(e);
303 }
304 }
305 }
306
307 Ok(())
308 }
309
310 /// Returns the count of jobs that are currently in `pending` status
311 /// and have an `available_at` time less than or equal to now.
312 ///
313 /// Useful for checking how many pending jobs are ready to be processed.
314 ///
315 /// # Returns
316 ///
317 /// - `Ok(i64)` with the count of matching jobs.
318 /// - `Err(diesel::result::Error)` if there was an issue retrieving data.
319 pub fn fetch_pending_jobs_count() -> QueryResult<i64> {
320 use crate::schema::jobs::dsl::*;
321
322 let mut conn = match DB::get_conn() {
323 Ok(conn) => conn,
324 Err(e) => {
325 eprintln!("Error getting DB connection: {:?}", e);
326 return Err(diesel::result::Error::NotFound);
327 }
328 };
329
330 jobs
331 .filter(status.eq("pending"))
332 .filter(available_at.le(chrono::Local::now().naive_local()))
333 .count()
334 .get_result(&mut conn)
335 }
336
337 /// Fetches the next pending job (the earliest by ID) that is ready to be processed,
338 /// and marks its status as `processing`.
339 ///
340 /// # Returns
341 ///
342 /// - `Ok(Some(Job))` if a pending job was found and updated.
343 /// - `Ok(None)` if no pending job is available.
344 /// - `Err(diesel::result::Error)` if a database operation fails.
345 ///
346 /// # Transaction
347 ///
348 /// This operation is performed within a transaction to ensure atomicity:
349 /// retrieving the job and updating its status happen together.
350 pub fn fetch_pending_job() -> QueryResult<Option<Self>> {
351 use crate::schema::jobs::dsl::*;
352
353 let mut conn = match DB::get_conn() {
354 Ok(conn) => conn,
355 Err(e) => {
356 eprintln!("Error getting DB connection: {:?}", e);
357 return Err(diesel::result::Error::NotFound);
358 }
359 };
360
361 conn.transaction::<_, diesel::result::Error, _>(|conn| {
362 let job = match jobs
363 .filter(status.eq("pending"))
364 .filter(available_at.le(chrono::Local::now().naive_local()))
365 .order(id.asc())
366 .for_update()
367 .first::<Job>(conn)
368 .optional()
369 {
370 Ok(job) => job,
371 Err(e) => {
372 eprintln!("Error fetching pending job: {:?}", e);
373 return Err(diesel::result::Error::NotFound);
374 }
375 };
376
377 if let Some(job) = job {
378 diesel::update(jobs.filter(id.eq(job.id)))
379 .set((
380 status.eq("processing"),
381 updated_at.eq(chrono::Local::now().naive_local()),
382 ))
383 .execute(conn)?;
384
385 Ok(Some(job))
386 } else {
387 Ok(None)
388 }
389 })
390 }
391}