1use chrono::{DateTime, Utc};
10use sea_orm::{
11 ConnectionTrait, DatabaseBackend, DatabaseConnection, Statement, TransactionTrait, Value,
12};
13use serde::{Deserialize, Serialize};
14
15use crate::error::Error;
16
/// Process-wide database connection. It starts empty and is filled at most once
/// by `Queue::init`; `Queue::connection` reads it.
static GLOBAL_CONNECTION: std::sync::OnceLock<DatabaseConnection> = std::sync::OnceLock::new();

/// Type-erased callback that performs one registration on a worker loop.
type RegisterFn = Box<dyn Fn(&mut crate::WorkerLoop) + Send + Sync>;
/// Callbacks recorded by `Queue::register` and replayed by `Queue::apply_registrars`.
static JOB_REGISTRARS: std::sync::Mutex<Vec<RegisterFn>> = std::sync::Mutex::new(Vec::new());
26
/// Zero-sized handle whose associated functions manage the process-wide
/// connection and the list of job registrars.
pub struct Queue;
32
33impl Queue {
34 pub fn connection() -> &'static DatabaseConnection {
40 GLOBAL_CONNECTION
41 .get()
42 .expect("Queue not initialized. Call Queue::init() first.")
43 }
44
45 pub async fn init(conn: DatabaseConnection) -> Result<(), Error> {
49 GLOBAL_CONNECTION
50 .set(conn)
51 .map_err(|_| Error::custom("Queue already initialized"))?;
52 Ok(())
53 }
54
55 pub fn is_initialized() -> bool {
57 GLOBAL_CONNECTION.get().is_some()
58 }
59
60 pub fn register<J>()
66 where
67 J: crate::Job + serde::de::DeserializeOwned + 'static,
68 {
69 JOB_REGISTRARS
70 .lock()
71 .unwrap()
72 .push(Box::new(|w: &mut crate::WorkerLoop| w.register::<J>()));
73 }
74
75 pub fn has_registered_jobs() -> bool {
77 !JOB_REGISTRARS.lock().unwrap().is_empty()
78 }
79
80 pub(crate) fn apply_registrars(w: &mut crate::WorkerLoop) {
84 for r in JOB_REGISTRARS.lock().unwrap().iter() {
85 r(w);
86 }
87 }
88}
89
/// One row of the `jobs` table as returned by the claim queries.
#[derive(Debug, Clone)]
pub struct JobRow {
    /// Value of the `id` column.
    pub id: i64,
    /// Value of the `job_type` column (presumably names the job implementation to run).
    pub job_type: String,
    /// Value of the `payload` column, kept as raw text.
    pub payload: String,
    /// Name of the queue the row belongs to.
    pub queue: String,
    /// Value of the `attempts` column, read as `i32` and cast to `u32`.
    pub attempts: u32,
    /// Value of the `max_retries` column, read as `i32` and cast to `u32`.
    pub max_retries: u32,
    /// Optional de-duplication key; `None` when the column is NULL.
    pub idempotency_key: Option<String>,
    /// Optional tenant identifier; `None` when the column is NULL.
    pub tenant_id: Option<i64>,
    /// Earliest time at which the job may be claimed.
    pub available_at: DateTime<Utc>,
    /// Value of the `created_at` column.
    pub created_at: DateTime<Utc>,
}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
125#[serde(rename_all = "snake_case")]
126pub enum JobState {
127 Pending,
129 Delayed,
131 Failed,
133}
134
/// Serializable summary of a job row, as produced by the listing functions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobInfo {
    /// Value of the `id` column.
    pub id: i64,
    /// Value of the `job_type` column.
    pub job_type: String,
    /// Name of the queue the row belongs to.
    pub queue: String,
    /// Value of the `attempts` column.
    pub attempts: u32,
    /// Value of the `max_retries` column.
    pub max_retries: u32,
    /// Creation time rendered as an RFC 3339 string.
    pub created_at: String,
    /// Availability time rendered as an RFC 3339 string.
    pub available_at: String,
    /// State assigned by the listing function that produced this value.
    pub state: JobState,
}
155
/// Row counts for a single named queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SingleQueueStats {
    /// Queue name the counts refer to.
    pub name: String,
    /// Number of `pending` rows whose `available_at` is not in the future.
    pub pending: usize,
    /// Number of `pending` rows whose `available_at` is still in the future.
    pub delayed: usize,
}
166
/// Aggregate statistics returned by `get_stats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueueStats {
    /// Per-queue counts, one entry per requested queue name.
    pub queues: Vec<SingleQueueStats>,
    /// Number of rows with status `failed`, counted across all queues.
    pub total_failed: usize,
}
175
/// A failed job together with its recorded failure details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailedJobInfo {
    /// Summary of the failed row (its `state` is `JobState::Failed`).
    pub job: JobInfo,
    /// Value of the `error` column; empty when the column is NULL.
    pub error: String,
    /// Value of the `failed_at` column, or the creation time when it is NULL.
    pub failed_at: DateTime<Utc>,
}
187
188fn parse_job_row(row: &sea_orm::QueryResult) -> Result<JobRow, Error> {
194 let id: i64 = row
195 .try_get_by::<i64, _>("id")
196 .map_err(|e| Error::custom(format!("parse id: {e}")))?;
197 let job_type: String = row
198 .try_get_by::<String, _>("job_type")
199 .map_err(|e| Error::custom(format!("parse job_type: {e}")))?;
200 let payload: String = row
201 .try_get_by::<String, _>("payload")
202 .map_err(|e| Error::custom(format!("parse payload: {e}")))?;
203 let queue: String = row
204 .try_get_by::<String, _>("queue")
205 .map_err(|e| Error::custom(format!("parse queue: {e}")))?;
206 let attempts: i32 = row
207 .try_get_by::<i32, _>("attempts")
208 .map_err(|e| Error::custom(format!("parse attempts: {e}")))?;
209 let max_retries: i32 = row
210 .try_get_by::<i32, _>("max_retries")
211 .map_err(|e| Error::custom(format!("parse max_retries: {e}")))?;
212 let idempotency_key: Option<String> = row
213 .try_get_by::<Option<String>, _>("idempotency_key")
214 .map_err(|e| Error::custom(format!("parse idempotency_key: {e}")))?;
215 let tenant_id: Option<i64> = row
216 .try_get_by::<Option<i64>, _>("tenant_id")
217 .map_err(|e| Error::custom(format!("parse tenant_id: {e}")))?;
218
219 let available_at = parse_timestamp(row, "available_at")?;
223 let created_at = parse_timestamp(row, "created_at")?;
224
225 Ok(JobRow {
226 id,
227 job_type,
228 payload,
229 queue,
230 attempts: attempts as u32,
231 max_retries: max_retries as u32,
232 idempotency_key,
233 tenant_id,
234 available_at,
235 created_at,
236 })
237}
238
239fn parse_timestamp(row: &sea_orm::QueryResult, col: &str) -> Result<DateTime<Utc>, Error> {
242 if let Ok(dt) = row.try_get_by::<DateTime<Utc>, _>(col) {
244 return Ok(dt);
245 }
246 let s: String = row
248 .try_get_by::<String, _>(col)
249 .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
250 DateTime::parse_from_rfc3339(&s)
251 .map(|dt| dt.with_timezone(&Utc))
252 .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}")))
253}
254
255fn parse_optional_timestamp(
258 row: &sea_orm::QueryResult,
259 col: &str,
260) -> Result<Option<DateTime<Utc>>, Error> {
261 if let Ok(opt) = row.try_get_by::<Option<DateTime<Utc>>, _>(col) {
263 return Ok(opt);
264 }
265 let s: Option<String> = row
267 .try_get_by::<Option<String>, _>(col)
268 .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
269 match s {
270 None => Ok(None),
271 Some(s) => DateTime::parse_from_rfc3339(&s)
272 .map(|dt| Some(dt.with_timezone(&Utc)))
273 .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}"))),
274 }
275}
276
277fn ph(backend: DatabaseBackend, n: usize) -> String {
279 match backend {
280 DatabaseBackend::Postgres => format!("${n}"),
281 _ => format!("?{n}"),
282 }
283}
284
285pub async fn claim(
299 conn: &DatabaseConnection,
300 queue: &str,
301 worker_id: &str,
302) -> Result<Option<JobRow>, Error> {
303 match conn.get_database_backend() {
304 DatabaseBackend::Postgres => claim_postgres(conn, queue, worker_id).await,
305 DatabaseBackend::Sqlite => claim_sqlite(conn, queue, worker_id).await,
306 _ => Err(Error::UnsupportedBackend),
307 }
308}
309
310async fn claim_postgres(
311 conn: &DatabaseConnection,
312 queue: &str,
313 worker_id: &str,
314) -> Result<Option<JobRow>, Error> {
315 let txn = conn.begin().await.map_err(Error::Db)?;
316
317 let select = Statement::from_sql_and_values(
318 DatabaseBackend::Postgres,
319 "SELECT id, job_type, payload, queue, attempts, max_retries, idempotency_key, \
320 tenant_id, available_at, created_at FROM jobs \
321 WHERE status = 'pending' AND queue = $1 AND available_at <= NOW() \
322 ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED",
323 [Value::String(Some(Box::new(queue.to_string())))],
324 );
325
326 let row = txn.query_one(select).await.map_err(Error::Db)?;
327 let Some(row) = row else {
328 txn.commit().await.map_err(Error::Db)?;
329 return Ok(None);
330 };
331 let job = parse_job_row(&row)?;
332
333 let upd = Statement::from_sql_and_values(
334 DatabaseBackend::Postgres,
335 "UPDATE jobs SET status = 'claimed', claimed_at = NOW(), claimed_by = $2 WHERE id = $1",
336 [
337 Value::BigInt(Some(job.id)),
338 Value::String(Some(Box::new(worker_id.to_string()))),
339 ],
340 );
341 txn.execute(upd).await.map_err(Error::Db)?;
342 txn.commit().await.map_err(Error::Db)?;
343
344 Ok(Some(job))
345}
346
347async fn claim_sqlite(
348 conn: &DatabaseConnection,
349 queue: &str,
350 worker_id: &str,
351) -> Result<Option<JobRow>, Error> {
352 let now_iso = Utc::now().to_rfc3339();
353
354 let txn = conn.begin().await.map_err(Error::Db)?;
368
369 let stmt = Statement::from_sql_and_values(
370 DatabaseBackend::Sqlite,
371 "UPDATE jobs SET status='claimed', claimed_at=?1, claimed_by=?2 \
372 WHERE id = ( SELECT id FROM jobs WHERE status='pending' AND queue=?3 \
373 AND available_at <= ?1 ORDER BY id LIMIT 1 ) \
374 RETURNING id, job_type, payload, queue, attempts, max_retries, \
375 idempotency_key, tenant_id, available_at, created_at",
376 [
377 Value::String(Some(Box::new(now_iso))),
378 Value::String(Some(Box::new(worker_id.to_string()))),
379 Value::String(Some(Box::new(queue.to_string()))),
380 ],
381 );
382
383 let row = match txn.query_one(stmt).await {
384 Ok(r) => r,
385 Err(e) => {
386 let _ = txn.rollback().await;
388 return Err(Error::Db(e));
389 }
390 };
391 txn.commit().await.map_err(Error::Db)?;
392
393 row.map(|r| parse_job_row(&r)).transpose()
394}
395
396pub async fn reaper(
410 conn: &DatabaseConnection,
411 queue: &str,
412 visibility_timeout: std::time::Duration,
413) -> Result<(), Error> {
414 let now = Utc::now();
415 let duration = chrono::Duration::from_std(visibility_timeout)
416 .map_err(|e| Error::custom(format!("visibility_timeout out of range: {e}")))?;
417 let cutoff = (now - duration).to_rfc3339();
418 let now_iso = now.to_rfc3339();
419
420 let backend = conn.get_database_backend();
421 let txn = conn.begin().await.map_err(Error::Db)?;
422
423 let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
428 let requeue_sql = format!(
429 "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL, \
430 attempts = attempts + 1, available_at = {p1} \
431 WHERE status='claimed' AND claimed_at < {p2} \
432 AND attempts + 1 < max_retries AND queue = {p3}"
433 );
434 let requeue = Statement::from_sql_and_values(
435 backend,
436 &requeue_sql,
437 [
438 Value::String(Some(Box::new(now_iso.clone()))),
439 Value::String(Some(Box::new(cutoff.clone()))),
440 Value::String(Some(Box::new(queue.to_string()))),
441 ],
442 );
443 txn.execute(requeue).await.map_err(Error::Db)?;
444
445 let (pp1, pp2, pp3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
448 let park_sql = format!(
449 "UPDATE jobs SET status='failed', error='visibility timeout exceeded', failed_at={pp1} \
450 WHERE status='claimed' AND claimed_at < {pp2} \
451 AND attempts + 1 >= max_retries AND queue = {pp3}"
452 );
453 let park = Statement::from_sql_and_values(
454 backend,
455 &park_sql,
456 [
457 Value::String(Some(Box::new(now_iso))),
458 Value::String(Some(Box::new(cutoff))),
459 Value::String(Some(Box::new(queue.to_string()))),
460 ],
461 );
462 txn.execute(park).await.map_err(Error::Db)?;
463
464 txn.commit().await.map_err(Error::Db)
465}
466
467#[allow(clippy::too_many_arguments)]
477pub async fn enqueue(
478 conn: &DatabaseConnection,
479 queue: &str,
480 job_type: &str,
481 payload: &str,
482 max_retries: u32,
483 idempotency_key: Option<&str>,
484 tenant_id: Option<i64>,
485 available_at: DateTime<Utc>,
486) -> Result<(), Error> {
487 let backend = conn.get_database_backend();
488 let now_iso = Utc::now().to_rfc3339();
489 let available_iso = available_at.to_rfc3339();
490
491 if let Some(idem) = idempotency_key {
492 let (p1, p2, p3, p4, p5, p6, p7, p8) = (
494 ph(backend, 1),
495 ph(backend, 2),
496 ph(backend, 3),
497 ph(backend, 4),
498 ph(backend, 5),
499 ph(backend, 6),
500 ph(backend, 7),
501 ph(backend, 8),
502 );
503 let sql = format!(
504 "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
505 idempotency_key, tenant_id, available_at, created_at) \
506 SELECT {p1}, {p2}, {p3}, 'pending', 0, {p4}, {p5}, {p6}, {p7}, {p8} \
507 WHERE NOT EXISTS ( \
508 SELECT 1 FROM jobs WHERE job_type = {p2} AND idempotency_key = {p5} \
509 AND status IN ('pending','claimed') \
510 )"
511 );
512 let stmt = Statement::from_sql_and_values(
513 backend,
514 &sql,
515 [
516 Value::String(Some(Box::new(queue.to_string()))),
517 Value::String(Some(Box::new(job_type.to_string()))),
518 Value::String(Some(Box::new(payload.to_string()))),
519 Value::Int(Some(max_retries as i32)),
520 Value::String(Some(Box::new(idem.to_string()))),
521 tenant_id.map_or(Value::BigInt(None), |id| Value::BigInt(Some(id))),
522 Value::String(Some(Box::new(available_iso))),
523 Value::String(Some(Box::new(now_iso))),
524 ],
525 );
526 conn.execute(stmt).await.map_err(Error::Db)?;
527 } else {
528 let (p1, p2, p3, p4, p5, p6, p7) = (
530 ph(backend, 1),
531 ph(backend, 2),
532 ph(backend, 3),
533 ph(backend, 4),
534 ph(backend, 5),
535 ph(backend, 6),
536 ph(backend, 7),
537 );
538 let sql = format!(
539 "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
540 tenant_id, available_at, created_at) \
541 VALUES ({p1}, {p2}, {p3}, 'pending', 0, {p4}, {p5}, {p6}, {p7})"
542 );
543 let stmt = Statement::from_sql_and_values(
544 backend,
545 &sql,
546 [
547 Value::String(Some(Box::new(queue.to_string()))),
548 Value::String(Some(Box::new(job_type.to_string()))),
549 Value::String(Some(Box::new(payload.to_string()))),
550 Value::Int(Some(max_retries as i32)),
551 tenant_id.map_or(Value::BigInt(None), |id| Value::BigInt(Some(id))),
552 Value::String(Some(Box::new(available_iso))),
553 Value::String(Some(Box::new(now_iso))),
554 ],
555 );
556 conn.execute(stmt).await.map_err(Error::Db)?;
557 }
558
559 Ok(())
560}
561
562pub async fn delete_job(conn: &DatabaseConnection, id: i64) -> Result<(), Error> {
568 let backend = conn.get_database_backend();
569 let p1 = ph(backend, 1);
570 let sql = format!("DELETE FROM jobs WHERE id = {p1}");
571 let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(id))]);
572 conn.execute(stmt).await.map_err(Error::Db)?;
573 Ok(())
574}
575
576pub async fn fail_job(conn: &DatabaseConnection, id: i64, error: &str) -> Result<(), Error> {
578 let backend = conn.get_database_backend();
579 let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
580 let sql =
581 format!("UPDATE jobs SET status='failed', error={p1}, failed_at={p2} WHERE id = {p3}");
582 let stmt = Statement::from_sql_and_values(
583 backend,
584 &sql,
585 [
586 Value::String(Some(Box::new(error.to_string()))),
587 Value::String(Some(Box::new(Utc::now().to_rfc3339()))),
588 Value::BigInt(Some(id)),
589 ],
590 );
591 conn.execute(stmt).await.map_err(Error::Db)?;
592 Ok(())
593}
594
595pub async fn release_job(
598 conn: &DatabaseConnection,
599 id: i64,
600 attempts: u32,
601 available_at: DateTime<Utc>,
602) -> Result<(), Error> {
603 let backend = conn.get_database_backend();
604 let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
605 let sql = format!(
606 "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL, \
607 attempts={p1}, available_at={p2} WHERE id = {p3}"
608 );
609 let stmt = Statement::from_sql_and_values(
610 backend,
611 &sql,
612 [
613 Value::Int(Some(attempts as i32)),
614 Value::String(Some(Box::new(available_at.to_rfc3339()))),
615 Value::BigInt(Some(id)),
616 ],
617 );
618 conn.execute(stmt).await.map_err(Error::Db)?;
619 Ok(())
620}
621
622pub async fn requeue_claimed_by(conn: &DatabaseConnection, worker_id: &str) -> Result<(), Error> {
625 let backend = conn.get_database_backend();
626 let p1 = ph(backend, 1);
627 let sql = format!(
628 "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL \
629 WHERE status='claimed' AND claimed_by={p1}"
630 );
631 let stmt = Statement::from_sql_and_values(
632 backend,
633 &sql,
634 [Value::String(Some(Box::new(worker_id.to_string())))],
635 );
636 conn.execute(stmt).await.map_err(Error::Db)?;
637 Ok(())
638}
639
640pub async fn get_pending_jobs(
646 conn: &DatabaseConnection,
647 queue: &str,
648 limit: u64,
649) -> Result<Vec<JobInfo>, Error> {
650 let backend = conn.get_database_backend();
651 let now_iso = Utc::now().to_rfc3339();
652 let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
653 let sql = format!(
654 "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at \
655 FROM jobs WHERE status='pending' AND queue={p1} AND available_at <= {p2} \
656 ORDER BY id LIMIT {p3}"
657 );
658 let stmt = Statement::from_sql_and_values(
659 backend,
660 &sql,
661 [
662 Value::String(Some(Box::new(queue.to_string()))),
663 Value::String(Some(Box::new(now_iso))),
664 Value::BigInt(Some(limit as i64)),
665 ],
666 );
667 let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
668 rows.iter()
669 .map(|r| parse_job_info(r, JobState::Pending))
670 .collect()
671}
672
673pub async fn get_delayed_jobs(
675 conn: &DatabaseConnection,
676 queue: &str,
677 limit: u64,
678) -> Result<Vec<JobInfo>, Error> {
679 let backend = conn.get_database_backend();
680 let now_iso = Utc::now().to_rfc3339();
681 let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
682 let sql = format!(
683 "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at \
684 FROM jobs WHERE status='pending' AND queue={p1} AND available_at > {p2} \
685 ORDER BY id LIMIT {p3}"
686 );
687 let stmt = Statement::from_sql_and_values(
688 backend,
689 &sql,
690 [
691 Value::String(Some(Box::new(queue.to_string()))),
692 Value::String(Some(Box::new(now_iso))),
693 Value::BigInt(Some(limit as i64)),
694 ],
695 );
696 let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
697 rows.iter()
698 .map(|r| parse_job_info(r, JobState::Delayed))
699 .collect()
700}
701
702pub async fn get_failed_jobs(
704 conn: &DatabaseConnection,
705 limit: u64,
706) -> Result<Vec<FailedJobInfo>, Error> {
707 let backend = conn.get_database_backend();
708 let p1 = ph(backend, 1);
709 let sql = format!(
712 "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at, \
713 error, failed_at FROM jobs WHERE status='failed' \
714 ORDER BY COALESCE(failed_at, created_at) DESC LIMIT {p1}"
715 );
716 let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(limit as i64))]);
717 let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
718 rows.iter().map(parse_failed_job_info).collect()
719}
720
721pub async fn get_stats(conn: &DatabaseConnection, queues: &[&str]) -> Result<QueueStats, Error> {
723 let backend = conn.get_database_backend();
724 let now_iso = Utc::now().to_rfc3339();
725 let mut queue_stats = Vec::new();
726
727 for &q in queues {
728 let p1 = ph(backend, 1);
729 let p2 = ph(backend, 2);
730 let p3 = ph(backend, 3);
731 let pending_sql = format!(
732 "SELECT COUNT(*) as cnt FROM jobs \
733 WHERE status='pending' AND queue={p1} AND available_at <= {p2}"
734 );
735 let pending_stmt = Statement::from_sql_and_values(
736 backend,
737 &pending_sql,
738 [
739 Value::String(Some(Box::new(q.to_string()))),
740 Value::String(Some(Box::new(now_iso.clone()))),
741 ],
742 );
743 let pending_row = conn
744 .query_one(pending_stmt)
745 .await
746 .map_err(Error::Db)?
747 .ok_or_else(|| Error::custom("stats: no row returned for pending count"))?;
748 let pending: i64 = pending_row
749 .try_get_by::<i64, _>("cnt")
750 .map_err(|e| Error::custom(format!("stats pending cnt: {e}")))?;
751
752 let delayed_sql = format!(
753 "SELECT COUNT(*) as cnt FROM jobs \
754 WHERE status='pending' AND queue={p1} AND available_at > {p3}"
755 );
756 let delayed_stmt = Statement::from_sql_and_values(
757 backend,
758 &delayed_sql,
759 [
760 Value::String(Some(Box::new(q.to_string()))),
761 Value::String(Some(Box::new(now_iso.clone()))),
762 ],
763 );
764 let delayed_row = conn
765 .query_one(delayed_stmt)
766 .await
767 .map_err(Error::Db)?
768 .ok_or_else(|| Error::custom("stats: no row returned for delayed count"))?;
769 let delayed: i64 = delayed_row
770 .try_get_by::<i64, _>("cnt")
771 .map_err(|e| Error::custom(format!("stats delayed cnt: {e}")))?;
772
773 queue_stats.push(SingleQueueStats {
774 name: q.to_string(),
775 pending: pending as usize,
776 delayed: delayed as usize,
777 });
778 }
779
780 let failed_sql = "SELECT COUNT(*) as cnt FROM jobs WHERE status='failed'";
782 let failed_stmt = Statement::from_string(backend, failed_sql.to_string());
783 let failed_row = conn
784 .query_one(failed_stmt)
785 .await
786 .map_err(Error::Db)?
787 .ok_or_else(|| Error::custom("stats: no row returned for failed count"))?;
788 let total_failed: i64 = failed_row
789 .try_get_by::<i64, _>("cnt")
790 .map_err(|e| Error::custom(format!("stats failed cnt: {e}")))?;
791
792 Ok(QueueStats {
793 queues: queue_stats,
794 total_failed: total_failed as usize,
795 })
796}
797
798fn parse_job_info(row: &sea_orm::QueryResult, state: JobState) -> Result<JobInfo, Error> {
803 let id: i64 = row
804 .try_get_by::<i64, _>("id")
805 .map_err(|e| Error::custom(format!("parse id: {e}")))?;
806 let job_type: String = row
807 .try_get_by::<String, _>("job_type")
808 .map_err(|e| Error::custom(format!("parse job_type: {e}")))?;
809 let queue: String = row
810 .try_get_by::<String, _>("queue")
811 .map_err(|e| Error::custom(format!("parse queue: {e}")))?;
812 let attempts: i32 = row
813 .try_get_by::<i32, _>("attempts")
814 .map_err(|e| Error::custom(format!("parse attempts: {e}")))?;
815 let max_retries: i32 = row
816 .try_get_by::<i32, _>("max_retries")
817 .map_err(|e| Error::custom(format!("parse max_retries: {e}")))?;
818 let created_at = parse_timestamp(row, "created_at")?.to_rfc3339();
819 let available_at = parse_timestamp(row, "available_at")?.to_rfc3339();
820
821 Ok(JobInfo {
822 id,
823 job_type,
824 queue,
825 attempts: attempts as u32,
826 max_retries: max_retries as u32,
827 created_at,
828 available_at,
829 state,
830 })
831}
832
833fn parse_failed_job_info(row: &sea_orm::QueryResult) -> Result<FailedJobInfo, Error> {
834 let job = parse_job_info(row, JobState::Failed)?;
835 let error: String = row
836 .try_get_by::<Option<String>, _>("error")
837 .map_err(|e| Error::custom(format!("parse error: {e}")))?
838 .unwrap_or_default();
839 let failed_at =
842 parse_optional_timestamp(row, "failed_at")?.unwrap_or(parse_timestamp(row, "created_at")?);
843
844 Ok(FailedJobInfo {
845 job,
846 error,
847 failed_at,
848 })
849}
850
851#[cfg(test)]
856mod tests {
857 use super::*;
858 use sea_orm::Database;
859 use sea_orm_migration::MigratorTrait;
860
    /// Migrator used only by these tests; its sole migration is `CreateJobsTable`.
    struct TestMigrator;

    #[async_trait::async_trait]
    impl MigratorTrait for TestMigrator {
        /// Returns the single migration that creates the jobs table.
        fn migrations() -> Vec<Box<dyn sea_orm_migration::MigrationTrait>> {
            vec![Box::new(crate::migration::CreateJobsTable)]
        }
    }
869
870 async fn setup() -> DatabaseConnection {
872 let conn = Database::connect("sqlite::memory:")
873 .await
874 .expect("connect sqlite::memory:");
875 TestMigrator::up(&conn, None)
876 .await
877 .expect("run CreateJobsTable migration");
878 conn
879 }
880
881 #[allow(clippy::too_many_arguments)]
883 async fn insert_job(
884 conn: &DatabaseConnection,
885 queue: &str,
886 job_type: &str,
887 status: &str,
888 attempts: i32,
889 max_retries: i32,
890 claimed_at: Option<&str>,
891 available_at: &str,
892 ) -> i64 {
893 let now = Utc::now().to_rfc3339();
894 let claimed_at_sql = match claimed_at {
895 Some(ts) => format!("'{ts}'"),
896 None => "NULL".to_string(),
897 };
898 let sql = format!(
899 "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
900 available_at, claimed_at, created_at) \
901 VALUES ('{queue}', '{job_type}', '{{}}', '{status}', {attempts}, {max_retries}, \
902 '{available_at}', {claimed_at_sql}, '{now}') \
903 RETURNING id"
904 );
905 let row = conn
906 .query_one(Statement::from_string(DatabaseBackend::Sqlite, sql))
907 .await
908 .expect("insert_job query")
909 .expect("insert_job row");
910 row.try_get_by::<i64, _>("id").expect("insert id")
911 }
912
913 #[tokio::test]
914 async fn claim_returns_pending_job() {
915 let conn = setup().await;
916 let now = Utc::now().to_rfc3339();
917
918 insert_job(&conn, "default", "MyJob", "pending", 0, 3, None, &now).await;
920
921 let job = claim(&conn, "default", "worker-1")
923 .await
924 .expect("claim failed");
925 assert!(job.is_some(), "expected Some(job), got None");
926 let job = job.unwrap();
927 assert_eq!(job.job_type, "MyJob");
928
929 let second = claim(&conn, "default", "worker-2")
931 .await
932 .expect("second claim failed");
933 assert!(second.is_none(), "second claim should return None");
934 }
935
936 #[tokio::test]
937 async fn idempotency_dedup() {
938 let conn = setup().await;
939 let now = Utc::now().to_rfc3339();
940 let available_at = DateTime::parse_from_rfc3339(&now)
941 .unwrap()
942 .with_timezone(&Utc);
943
944 enqueue(
946 &conn,
947 "default",
948 "MyJob",
949 "{}",
950 3,
951 Some("key-abc"),
952 None,
953 available_at,
954 )
955 .await
956 .expect("first enqueue");
957 enqueue(
958 &conn,
959 "default",
960 "MyJob",
961 "{}",
962 3,
963 Some("key-abc"),
964 None,
965 available_at,
966 )
967 .await
968 .expect("second enqueue (should be a no-op)");
969
970 let row = conn
972 .query_one(Statement::from_string(
973 DatabaseBackend::Sqlite,
974 "SELECT COUNT(*) as cnt FROM jobs WHERE job_type='MyJob' AND idempotency_key='key-abc'".to_string(),
975 ))
976 .await
977 .expect("count query")
978 .expect("count row");
979 let cnt: i64 = row.try_get_by::<i64, _>("cnt").expect("cnt");
980 assert_eq!(
981 cnt, 1,
982 "idempotency key should deduplicate (expected 1 row)"
983 );
984
985 enqueue(
987 &conn,
988 "default",
989 "OtherJob",
990 "{}",
991 3,
992 None,
993 None,
994 available_at,
995 )
996 .await
997 .expect("first plain enqueue");
998 enqueue(
999 &conn,
1000 "default",
1001 "OtherJob",
1002 "{}",
1003 3,
1004 None,
1005 None,
1006 available_at,
1007 )
1008 .await
1009 .expect("second plain enqueue");
1010
1011 let row2 = conn
1012 .query_one(Statement::from_string(
1013 DatabaseBackend::Sqlite,
1014 "SELECT COUNT(*) as cnt FROM jobs WHERE job_type='OtherJob'".to_string(),
1015 ))
1016 .await
1017 .expect("count query 2")
1018 .expect("count row 2");
1019 let cnt2: i64 = row2.try_get_by::<i64, _>("cnt").expect("cnt2");
1020 assert_eq!(cnt2, 2, "plain enqueue should insert both rows");
1021 }
1022
1023 #[tokio::test]
1024 async fn reaper_reclaims_stuck_job() {
1025 let conn = setup().await;
1026
1027 let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
1029 let now = Utc::now().to_rfc3339();
1030 let id = insert_job(
1031 &conn,
1032 "default",
1033 "StuckJob",
1034 "claimed",
1035 0,
1036 3,
1037 Some(&ten_min_ago),
1038 &now,
1039 )
1040 .await;
1041
1042 reaper(&conn, "default", std::time::Duration::from_secs(5 * 60))
1044 .await
1045 .expect("reaper failed");
1046
1047 let row = conn
1049 .query_one(Statement::from_string(
1050 DatabaseBackend::Sqlite,
1051 format!("SELECT status, attempts FROM jobs WHERE id={id}"),
1052 ))
1053 .await
1054 .expect("select after reaper")
1055 .expect("row after reaper");
1056
1057 let status: String = row.try_get_by::<String, _>("status").expect("status");
1058 let attempts: i32 = row.try_get_by::<i32, _>("attempts").expect("attempts");
1059 assert_eq!(
1060 status, "pending",
1061 "reaper should reset stuck job to pending"
1062 );
1063 assert_eq!(attempts, 1, "reaper should increment attempts");
1064 }
1065
1066 #[tokio::test]
1067 async fn reaper_boundary_parks_last_attempt() {
1068 let conn = setup().await;
1072 let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
1073 let now = Utc::now().to_rfc3339();
1074 let id = insert_job(
1075 &conn,
1076 "default",
1077 "BoundaryJob",
1078 "claimed",
1079 2, 3, Some(&ten_min_ago),
1082 &now,
1083 )
1084 .await;
1085
1086 reaper(&conn, "default", std::time::Duration::from_secs(5 * 60))
1087 .await
1088 .expect("reaper failed");
1089
1090 let row = conn
1091 .query_one(Statement::from_string(
1092 DatabaseBackend::Sqlite,
1093 format!("SELECT status, attempts FROM jobs WHERE id={id}"),
1094 ))
1095 .await
1096 .expect("select after reaper")
1097 .expect("row");
1098 let status: String = row.try_get_by::<String, _>("status").expect("status");
1099 let attempts: i32 = row.try_get_by::<i32, _>("attempts").expect("attempts");
1100 assert_eq!(
1101 status, "failed",
1102 "job at attempts == max_retries - 1 must be parked by the reaper, not requeued"
1103 );
1104 assert_eq!(
1105 attempts, 2,
1106 "parked job keeps its attempt count (no further requeue)"
1107 );
1108 }
1109
1110 #[tokio::test]
1111 async fn poison_job_parked() {
1112 let conn = setup().await;
1113
1114 let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
1116 let now = Utc::now().to_rfc3339();
1117 let id = insert_job(
1118 &conn,
1119 "default",
1120 "PoisonJob",
1121 "claimed",
1122 3,
1123 3,
1124 Some(&ten_min_ago),
1125 &now,
1126 )
1127 .await;
1128
1129 reaper(&conn, "default", std::time::Duration::from_secs(5 * 60))
1131 .await
1132 .expect("reaper failed");
1133
1134 let row = conn
1136 .query_one(Statement::from_string(
1137 DatabaseBackend::Sqlite,
1138 format!("SELECT status, error FROM jobs WHERE id={id}"),
1139 ))
1140 .await
1141 .expect("select after reaper")
1142 .expect("row");
1143
1144 let status: String = row.try_get_by::<String, _>("status").expect("status");
1145 let error: Option<String> = row.try_get_by::<Option<String>, _>("error").expect("error");
1146 assert_eq!(status, "failed", "exhausted job should be parked as failed");
1147 assert!(error.is_some(), "failed job should have an error message");
1148
1149 let available = Utc::now().to_rfc3339();
1151 insert_job(
1152 &conn, "default", "FreshJob", "pending", 0, 3, None, &available,
1153 )
1154 .await;
1155
1156 let claimed = claim(&conn, "default", "worker-1")
1157 .await
1158 .expect("claim after poison park");
1159 assert!(
1160 claimed.is_some(),
1161 "fresh job should be claimable after poison job is parked"
1162 );
1163 let claimed = claimed.unwrap();
1164 assert_eq!(claimed.job_type, "FreshJob");
1165 }
1166}