1use chrono::{DateTime, Utc};
10use sea_orm::{
11 ConnectionTrait, DatabaseBackend, DatabaseConnection, Statement, TransactionTrait, Value,
12};
13use serde::{Deserialize, Serialize};
14
15use crate::error::Error;
16
/// Process-wide database connection.
///
/// Written at most once by `Queue::init` and read by `Queue::connection`
/// and `Queue::is_initialized`.
static GLOBAL_CONNECTION: std::sync::OnceLock<DatabaseConnection> = std::sync::OnceLock::new();

/// Callback that registers a single job type on a worker loop.
type RegisterFn = Box<dyn Fn(&mut crate::WorkerLoop) + Send + Sync>;
/// Registration callbacks collected by `Queue::register` and replayed, in
/// insertion order, by `Queue::apply_registrars`.
static JOB_REGISTRARS: std::sync::Mutex<Vec<RegisterFn>> = std::sync::Mutex::new(Vec::new());
26
27pub struct Queue;
32
33impl Queue {
34 pub fn connection() -> &'static DatabaseConnection {
40 GLOBAL_CONNECTION
41 .get()
42 .expect("Queue not initialized. Call Queue::init() first.")
43 }
44
45 pub async fn init(conn: DatabaseConnection) -> Result<(), Error> {
49 GLOBAL_CONNECTION
50 .set(conn)
51 .map_err(|_| Error::custom("Queue already initialized"))?;
52 Ok(())
53 }
54
55 pub fn is_initialized() -> bool {
57 GLOBAL_CONNECTION.get().is_some()
58 }
59
60 pub fn register<J>()
66 where
67 J: crate::Job + serde::de::DeserializeOwned + 'static,
68 {
69 JOB_REGISTRARS
70 .lock()
71 .unwrap()
72 .push(Box::new(|w: &mut crate::WorkerLoop| w.register::<J>()));
73 }
74
75 pub fn has_registered_jobs() -> bool {
77 !JOB_REGISTRARS.lock().unwrap().is_empty()
78 }
79
80 pub(crate) fn apply_registrars(w: &mut crate::WorkerLoop) {
84 for r in JOB_REGISTRARS.lock().unwrap().iter() {
85 r(w);
86 }
87 }
88}
89
/// A job as read from the `jobs` table by the claim queries.
#[derive(Debug, Clone)]
pub struct JobRow {
    /// Value of the `id` column.
    pub id: i64,
    /// Value of the `job_type` column.
    pub job_type: String,
    /// Value of the `payload` column, kept as the raw stored string.
    pub payload: String,
    /// Name of the queue the job belongs to.
    pub queue: String,
    /// Value of the `attempts` column.
    pub attempts: u32,
    /// Value of the `max_retries` column.
    pub max_retries: u32,
    /// Optional idempotency key; `None` when the column is NULL.
    pub idempotency_key: Option<String>,
    /// Optional tenant id; `None` when the column is NULL.
    pub tenant_id: Option<i64>,
    /// Value of the `available_at` column, converted to UTC.
    pub available_at: DateTime<Utc>,
    /// Value of the `created_at` column, converted to UTC.
    pub created_at: DateTime<Utc>,
}
118
/// State attached to a [`JobInfo`] by the listing functions.
///
/// Serialized in `snake_case` (`"pending"`, `"delayed"`, `"failed"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobState {
    /// Row has status `pending` and `available_at` is not in the future.
    Pending,
    /// Row has status `pending` and `available_at` is still in the future.
    Delayed,
    /// Row has status `failed`.
    Failed,
}
134
/// Serializable summary of a job, as returned by the listing functions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobInfo {
    /// Value of the `id` column.
    pub id: i64,
    /// Value of the `job_type` column.
    pub job_type: String,
    /// Name of the queue the job belongs to.
    pub queue: String,
    /// Value of the `attempts` column.
    pub attempts: u32,
    /// Value of the `max_retries` column.
    pub max_retries: u32,
    /// `created_at` rendered as an RFC 3339 string in UTC.
    pub created_at: String,
    /// `available_at` rendered as an RFC 3339 string in UTC.
    pub available_at: String,
    /// State assigned by the function that produced this summary.
    pub state: JobState,
}
155
/// Counters for one named queue, as computed by `get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SingleQueueStats {
    /// Queue name the counters refer to.
    pub name: String,
    /// Number of `pending` rows whose `available_at` is not in the future.
    pub pending: usize,
    /// Number of `pending` rows whose `available_at` is still in the future.
    pub delayed: usize,
}
166
/// Aggregate statistics returned by `get_stats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueueStats {
    /// One entry per requested queue, in request order.
    pub queues: Vec<SingleQueueStats>,
    /// Number of `failed` rows across all queues (not filtered by queue name).
    pub total_failed: usize,
}
175
/// A failed job together with its failure details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailedJobInfo {
    /// Summary of the failed job.
    pub job: JobInfo,
    /// Stored error message; empty when the `error` column is NULL.
    pub error: String,
    /// Failure time; the job's `created_at` is used when `failed_at` is NULL.
    pub failed_at: DateTime<Utc>,
}
187
188fn parse_job_row(row: &sea_orm::QueryResult) -> Result<JobRow, Error> {
194 let id: i64 = row
195 .try_get_by::<i64, _>("id")
196 .map_err(|e| Error::custom(format!("parse id: {e}")))?;
197 let job_type: String = row
198 .try_get_by::<String, _>("job_type")
199 .map_err(|e| Error::custom(format!("parse job_type: {e}")))?;
200 let payload: String = row
201 .try_get_by::<String, _>("payload")
202 .map_err(|e| Error::custom(format!("parse payload: {e}")))?;
203 let queue: String = row
204 .try_get_by::<String, _>("queue")
205 .map_err(|e| Error::custom(format!("parse queue: {e}")))?;
206 let attempts: i32 = row
207 .try_get_by::<i32, _>("attempts")
208 .map_err(|e| Error::custom(format!("parse attempts: {e}")))?;
209 let max_retries: i32 = row
210 .try_get_by::<i32, _>("max_retries")
211 .map_err(|e| Error::custom(format!("parse max_retries: {e}")))?;
212 let idempotency_key: Option<String> = row
213 .try_get_by::<Option<String>, _>("idempotency_key")
214 .map_err(|e| Error::custom(format!("parse idempotency_key: {e}")))?;
215 let tenant_id: Option<i64> = row
216 .try_get_by::<Option<i64>, _>("tenant_id")
217 .map_err(|e| Error::custom(format!("parse tenant_id: {e}")))?;
218
219 let available_at = parse_timestamp(row, "available_at")?;
223 let created_at = parse_timestamp(row, "created_at")?;
224
225 Ok(JobRow {
226 id,
227 job_type,
228 payload,
229 queue,
230 attempts: attempts as u32,
231 max_retries: max_retries as u32,
232 idempotency_key,
233 tenant_id,
234 available_at,
235 created_at,
236 })
237}
238
239fn parse_timestamp(row: &sea_orm::QueryResult, col: &str) -> Result<DateTime<Utc>, Error> {
242 if let Ok(dt) = row.try_get_by::<DateTime<Utc>, _>(col) {
244 return Ok(dt);
245 }
246 let s: String = row
248 .try_get_by::<String, _>(col)
249 .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
250 DateTime::parse_from_rfc3339(&s)
251 .map(|dt| dt.with_timezone(&Utc))
252 .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}")))
253}
254
255fn parse_optional_timestamp(
258 row: &sea_orm::QueryResult,
259 col: &str,
260) -> Result<Option<DateTime<Utc>>, Error> {
261 if let Ok(opt) = row.try_get_by::<Option<DateTime<Utc>>, _>(col) {
263 return Ok(opt);
264 }
265 let s: Option<String> = row
267 .try_get_by::<Option<String>, _>(col)
268 .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
269 match s {
270 None => Ok(None),
271 Some(s) => DateTime::parse_from_rfc3339(&s)
272 .map(|dt| Some(dt.with_timezone(&Utc)))
273 .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}"))),
274 }
275}
276
277fn ph(backend: DatabaseBackend, n: usize) -> String {
279 match backend {
280 DatabaseBackend::Postgres => format!("${n}"),
281 _ => format!("?{n}"),
282 }
283}
284
285pub async fn claim(
299 conn: &DatabaseConnection,
300 queue: &str,
301 worker_id: &str,
302) -> Result<Option<JobRow>, Error> {
303 match conn.get_database_backend() {
304 DatabaseBackend::Postgres => claim_postgres(conn, queue, worker_id).await,
305 DatabaseBackend::Sqlite => claim_sqlite(conn, queue, worker_id).await,
306 _ => Err(Error::UnsupportedBackend),
307 }
308}
309
310async fn claim_postgres(
311 conn: &DatabaseConnection,
312 queue: &str,
313 worker_id: &str,
314) -> Result<Option<JobRow>, Error> {
315 let txn = conn.begin().await.map_err(Error::Db)?;
316
317 let select = Statement::from_sql_and_values(
318 DatabaseBackend::Postgres,
319 "SELECT id, job_type, payload, queue, attempts, max_retries, idempotency_key, \
320 tenant_id, available_at, created_at FROM jobs \
321 WHERE status = 'pending' AND queue = $1 AND available_at <= NOW() \
322 ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED",
323 [Value::String(Some(Box::new(queue.to_string())))],
324 );
325
326 let row = txn.query_one(select).await.map_err(Error::Db)?;
327 let Some(row) = row else {
328 txn.commit().await.map_err(Error::Db)?;
329 return Ok(None);
330 };
331 let job = parse_job_row(&row)?;
332
333 let upd = Statement::from_sql_and_values(
334 DatabaseBackend::Postgres,
335 "UPDATE jobs SET status = 'claimed', claimed_at = NOW(), claimed_by = $2 WHERE id = $1",
336 [
337 Value::BigInt(Some(job.id)),
338 Value::String(Some(Box::new(worker_id.to_string()))),
339 ],
340 );
341 txn.execute(upd).await.map_err(Error::Db)?;
342 txn.commit().await.map_err(Error::Db)?;
343
344 Ok(Some(job))
345}
346
347async fn claim_sqlite(
348 conn: &DatabaseConnection,
349 queue: &str,
350 worker_id: &str,
351) -> Result<Option<JobRow>, Error> {
352 let now_iso = Utc::now().to_rfc3339();
353
354 let txn = conn.begin().await.map_err(Error::Db)?;
368
369 let stmt = Statement::from_sql_and_values(
370 DatabaseBackend::Sqlite,
371 "UPDATE jobs SET status='claimed', claimed_at=?1, claimed_by=?2 \
372 WHERE id = ( SELECT id FROM jobs WHERE status='pending' AND queue=?3 \
373 AND available_at <= ?1 ORDER BY id LIMIT 1 ) \
374 RETURNING id, job_type, payload, queue, attempts, max_retries, \
375 idempotency_key, tenant_id, available_at, created_at",
376 [
377 Value::String(Some(Box::new(now_iso))),
378 Value::String(Some(Box::new(worker_id.to_string()))),
379 Value::String(Some(Box::new(queue.to_string()))),
380 ],
381 );
382
383 let row = match txn.query_one(stmt).await {
384 Ok(r) => r,
385 Err(e) => {
386 let _ = txn.rollback().await;
388 return Err(Error::Db(e));
389 }
390 };
391 txn.commit().await.map_err(Error::Db)?;
392
393 row.map(|r| parse_job_row(&r)).transpose()
394}
395
396pub async fn reaper(
410 conn: &DatabaseConnection,
411 queue: &str,
412 visibility_timeout: std::time::Duration,
413) -> Result<(), Error> {
414 let now = Utc::now();
415 let duration = chrono::Duration::from_std(visibility_timeout)
416 .map_err(|e| Error::custom(format!("visibility_timeout out of range: {e}")))?;
417 let cutoff = (now - duration).to_rfc3339();
418 let now_iso = now.to_rfc3339();
419
420 let backend = conn.get_database_backend();
421 let txn = conn.begin().await.map_err(Error::Db)?;
422
423 let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
428 let requeue_sql = format!(
429 "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL, \
430 attempts = attempts + 1, available_at = {p1} \
431 WHERE status='claimed' AND claimed_at < {p2} \
432 AND attempts + 1 < max_retries AND queue = {p3}"
433 );
434 let requeue = Statement::from_sql_and_values(
435 backend,
436 &requeue_sql,
437 [
438 Value::String(Some(Box::new(now_iso.clone()))),
439 Value::String(Some(Box::new(cutoff.clone()))),
440 Value::String(Some(Box::new(queue.to_string()))),
441 ],
442 );
443 txn.execute(requeue).await.map_err(Error::Db)?;
444
445 let (pp1, pp2, pp3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
448 let park_sql = format!(
449 "UPDATE jobs SET status='failed', error='visibility timeout exceeded', failed_at={pp1} \
450 WHERE status='claimed' AND claimed_at < {pp2} \
451 AND attempts + 1 >= max_retries AND queue = {pp3}"
452 );
453 let park = Statement::from_sql_and_values(
454 backend,
455 &park_sql,
456 [
457 Value::String(Some(Box::new(now_iso))),
458 Value::String(Some(Box::new(cutoff))),
459 Value::String(Some(Box::new(queue.to_string()))),
460 ],
461 );
462 txn.execute(park).await.map_err(Error::Db)?;
463
464 txn.commit().await.map_err(Error::Db)
465}
466
467#[allow(clippy::too_many_arguments)]
477pub async fn enqueue(
478 conn: &DatabaseConnection,
479 queue: &str,
480 job_type: &str,
481 payload: &str,
482 max_retries: u32,
483 idempotency_key: Option<&str>,
484 tenant_id: Option<i64>,
485 available_at: DateTime<Utc>,
486) -> Result<(), Error> {
487 let backend = conn.get_database_backend();
488 let now_iso = Utc::now().to_rfc3339();
489 let available_iso = available_at.to_rfc3339();
490
491 if let Some(idem) = idempotency_key {
492 let (p1, p2, p3, p4, p5, p6, p7, p8) = (
494 ph(backend, 1),
495 ph(backend, 2),
496 ph(backend, 3),
497 ph(backend, 4),
498 ph(backend, 5),
499 ph(backend, 6),
500 ph(backend, 7),
501 ph(backend, 8),
502 );
503 let sql = format!(
504 "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
505 idempotency_key, tenant_id, available_at, created_at) \
506 SELECT {p1}, {p2}, {p3}, 'pending', 0, {p4}, {p5}, {p6}, {p7}, {p8} \
507 WHERE NOT EXISTS ( \
508 SELECT 1 FROM jobs WHERE job_type = {p2} AND idempotency_key = {p5} \
509 AND status IN ('pending','claimed') \
510 )"
511 );
512 let stmt = Statement::from_sql_and_values(
513 backend,
514 &sql,
515 [
516 Value::String(Some(Box::new(queue.to_string()))),
517 Value::String(Some(Box::new(job_type.to_string()))),
518 Value::String(Some(Box::new(payload.to_string()))),
519 Value::Int(Some(max_retries as i32)),
520 Value::String(Some(Box::new(idem.to_string()))),
521 tenant_id.map_or(Value::BigInt(None), |id| Value::BigInt(Some(id))),
522 Value::String(Some(Box::new(available_iso))),
523 Value::String(Some(Box::new(now_iso))),
524 ],
525 );
526 conn.execute(stmt).await.map_err(Error::Db)?;
527 } else {
528 let (p1, p2, p3, p4, p5, p6, p7) = (
530 ph(backend, 1),
531 ph(backend, 2),
532 ph(backend, 3),
533 ph(backend, 4),
534 ph(backend, 5),
535 ph(backend, 6),
536 ph(backend, 7),
537 );
538 let sql = format!(
539 "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
540 tenant_id, available_at, created_at) \
541 VALUES ({p1}, {p2}, {p3}, 'pending', 0, {p4}, {p5}, {p6}, {p7})"
542 );
543 let stmt = Statement::from_sql_and_values(
544 backend,
545 &sql,
546 [
547 Value::String(Some(Box::new(queue.to_string()))),
548 Value::String(Some(Box::new(job_type.to_string()))),
549 Value::String(Some(Box::new(payload.to_string()))),
550 Value::Int(Some(max_retries as i32)),
551 tenant_id.map_or(Value::BigInt(None), |id| Value::BigInt(Some(id))),
552 Value::String(Some(Box::new(available_iso))),
553 Value::String(Some(Box::new(now_iso))),
554 ],
555 );
556 conn.execute(stmt).await.map_err(Error::Db)?;
557 }
558
559 Ok(())
560}
561
562pub async fn delete_job(conn: &DatabaseConnection, id: i64) -> Result<(), Error> {
568 let backend = conn.get_database_backend();
569 let p1 = ph(backend, 1);
570 let sql = format!("DELETE FROM jobs WHERE id = {p1}");
571 let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(id))]);
572 conn.execute(stmt).await.map_err(Error::Db)?;
573 Ok(())
574}
575
576pub async fn fail_job(conn: &DatabaseConnection, id: i64, error: &str) -> Result<(), Error> {
578 let backend = conn.get_database_backend();
579 let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
580 let sql =
581 format!("UPDATE jobs SET status='failed', error={p1}, failed_at={p2} WHERE id = {p3}");
582 let stmt = Statement::from_sql_and_values(
583 backend,
584 &sql,
585 [
586 Value::String(Some(Box::new(error.to_string()))),
587 Value::String(Some(Box::new(Utc::now().to_rfc3339()))),
588 Value::BigInt(Some(id)),
589 ],
590 );
591 conn.execute(stmt).await.map_err(Error::Db)?;
592 Ok(())
593}
594
595pub async fn release_job(
598 conn: &DatabaseConnection,
599 id: i64,
600 attempts: u32,
601 available_at: DateTime<Utc>,
602) -> Result<(), Error> {
603 let backend = conn.get_database_backend();
604 let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
605 let sql = format!(
606 "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL, \
607 attempts={p1}, available_at={p2} WHERE id = {p3}"
608 );
609 let stmt = Statement::from_sql_and_values(
610 backend,
611 &sql,
612 [
613 Value::Int(Some(attempts as i32)),
614 Value::String(Some(Box::new(available_at.to_rfc3339()))),
615 Value::BigInt(Some(id)),
616 ],
617 );
618 conn.execute(stmt).await.map_err(Error::Db)?;
619 Ok(())
620}
621
622pub async fn requeue_claimed_by(conn: &DatabaseConnection, worker_id: &str) -> Result<(), Error> {
625 let backend = conn.get_database_backend();
626 let p1 = ph(backend, 1);
627 let sql = format!(
628 "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL \
629 WHERE status='claimed' AND claimed_by={p1}"
630 );
631 let stmt = Statement::from_sql_and_values(
632 backend,
633 &sql,
634 [Value::String(Some(Box::new(worker_id.to_string())))],
635 );
636 conn.execute(stmt).await.map_err(Error::Db)?;
637 Ok(())
638}
639
640pub async fn reap_startup_claims(
657 conn: &DatabaseConnection,
658 queues: &[String],
659) -> Result<u64, Error> {
660 if queues.is_empty() {
661 return Ok(0);
662 }
663 let backend = conn.get_database_backend();
664 let now_iso = Utc::now().to_rfc3339();
665
666 let queue_phs: Vec<String> = (1..=queues.len()).map(|i| ph(backend, i)).collect();
668 let ts_ph = ph(backend, queues.len() + 1);
669 let sql = format!(
670 "UPDATE jobs SET status='failed', \
671 error='reaped on worker startup (orphan claim from previous worker)', \
672 failed_at={ts_ph} \
673 WHERE status='claimed' AND queue IN ({})",
674 queue_phs.join(", "),
675 );
676
677 let mut values: Vec<Value> = queues
678 .iter()
679 .map(|q| Value::String(Some(Box::new(q.clone()))))
680 .collect();
681 values.push(Value::String(Some(Box::new(now_iso))));
682
683 let stmt = Statement::from_sql_and_values(backend, &sql, values);
684 let result = conn.execute(stmt).await.map_err(Error::Db)?;
685 Ok(result.rows_affected())
686}
687
688pub async fn get_pending_jobs(
694 conn: &DatabaseConnection,
695 queue: &str,
696 limit: u64,
697) -> Result<Vec<JobInfo>, Error> {
698 let backend = conn.get_database_backend();
699 let now_iso = Utc::now().to_rfc3339();
700 let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
701 let sql = format!(
702 "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at \
703 FROM jobs WHERE status='pending' AND queue={p1} AND available_at <= {p2} \
704 ORDER BY id LIMIT {p3}"
705 );
706 let stmt = Statement::from_sql_and_values(
707 backend,
708 &sql,
709 [
710 Value::String(Some(Box::new(queue.to_string()))),
711 Value::String(Some(Box::new(now_iso))),
712 Value::BigInt(Some(limit as i64)),
713 ],
714 );
715 let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
716 rows.iter()
717 .map(|r| parse_job_info(r, JobState::Pending))
718 .collect()
719}
720
721pub async fn get_delayed_jobs(
723 conn: &DatabaseConnection,
724 queue: &str,
725 limit: u64,
726) -> Result<Vec<JobInfo>, Error> {
727 let backend = conn.get_database_backend();
728 let now_iso = Utc::now().to_rfc3339();
729 let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
730 let sql = format!(
731 "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at \
732 FROM jobs WHERE status='pending' AND queue={p1} AND available_at > {p2} \
733 ORDER BY id LIMIT {p3}"
734 );
735 let stmt = Statement::from_sql_and_values(
736 backend,
737 &sql,
738 [
739 Value::String(Some(Box::new(queue.to_string()))),
740 Value::String(Some(Box::new(now_iso))),
741 Value::BigInt(Some(limit as i64)),
742 ],
743 );
744 let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
745 rows.iter()
746 .map(|r| parse_job_info(r, JobState::Delayed))
747 .collect()
748}
749
750pub async fn get_failed_jobs(
752 conn: &DatabaseConnection,
753 limit: u64,
754) -> Result<Vec<FailedJobInfo>, Error> {
755 let backend = conn.get_database_backend();
756 let p1 = ph(backend, 1);
757 let sql = format!(
760 "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at, \
761 error, failed_at FROM jobs WHERE status='failed' \
762 ORDER BY COALESCE(failed_at, created_at) DESC LIMIT {p1}"
763 );
764 let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(limit as i64))]);
765 let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
766 rows.iter().map(parse_failed_job_info).collect()
767}
768
769pub async fn get_stats(conn: &DatabaseConnection, queues: &[&str]) -> Result<QueueStats, Error> {
771 let backend = conn.get_database_backend();
772 let now_iso = Utc::now().to_rfc3339();
773 let mut queue_stats = Vec::new();
774
775 for &q in queues {
776 let p1 = ph(backend, 1);
777 let p2 = ph(backend, 2);
778 let p3 = ph(backend, 3);
779 let pending_sql = format!(
780 "SELECT COUNT(*) as cnt FROM jobs \
781 WHERE status='pending' AND queue={p1} AND available_at <= {p2}"
782 );
783 let pending_stmt = Statement::from_sql_and_values(
784 backend,
785 &pending_sql,
786 [
787 Value::String(Some(Box::new(q.to_string()))),
788 Value::String(Some(Box::new(now_iso.clone()))),
789 ],
790 );
791 let pending_row = conn
792 .query_one(pending_stmt)
793 .await
794 .map_err(Error::Db)?
795 .ok_or_else(|| Error::custom("stats: no row returned for pending count"))?;
796 let pending: i64 = pending_row
797 .try_get_by::<i64, _>("cnt")
798 .map_err(|e| Error::custom(format!("stats pending cnt: {e}")))?;
799
800 let delayed_sql = format!(
801 "SELECT COUNT(*) as cnt FROM jobs \
802 WHERE status='pending' AND queue={p1} AND available_at > {p3}"
803 );
804 let delayed_stmt = Statement::from_sql_and_values(
805 backend,
806 &delayed_sql,
807 [
808 Value::String(Some(Box::new(q.to_string()))),
809 Value::String(Some(Box::new(now_iso.clone()))),
810 ],
811 );
812 let delayed_row = conn
813 .query_one(delayed_stmt)
814 .await
815 .map_err(Error::Db)?
816 .ok_or_else(|| Error::custom("stats: no row returned for delayed count"))?;
817 let delayed: i64 = delayed_row
818 .try_get_by::<i64, _>("cnt")
819 .map_err(|e| Error::custom(format!("stats delayed cnt: {e}")))?;
820
821 queue_stats.push(SingleQueueStats {
822 name: q.to_string(),
823 pending: pending as usize,
824 delayed: delayed as usize,
825 });
826 }
827
828 let failed_sql = "SELECT COUNT(*) as cnt FROM jobs WHERE status='failed'";
830 let failed_stmt = Statement::from_string(backend, failed_sql.to_string());
831 let failed_row = conn
832 .query_one(failed_stmt)
833 .await
834 .map_err(Error::Db)?
835 .ok_or_else(|| Error::custom("stats: no row returned for failed count"))?;
836 let total_failed: i64 = failed_row
837 .try_get_by::<i64, _>("cnt")
838 .map_err(|e| Error::custom(format!("stats failed cnt: {e}")))?;
839
840 Ok(QueueStats {
841 queues: queue_stats,
842 total_failed: total_failed as usize,
843 })
844}
845
846fn parse_job_info(row: &sea_orm::QueryResult, state: JobState) -> Result<JobInfo, Error> {
851 let id: i64 = row
852 .try_get_by::<i64, _>("id")
853 .map_err(|e| Error::custom(format!("parse id: {e}")))?;
854 let job_type: String = row
855 .try_get_by::<String, _>("job_type")
856 .map_err(|e| Error::custom(format!("parse job_type: {e}")))?;
857 let queue: String = row
858 .try_get_by::<String, _>("queue")
859 .map_err(|e| Error::custom(format!("parse queue: {e}")))?;
860 let attempts: i32 = row
861 .try_get_by::<i32, _>("attempts")
862 .map_err(|e| Error::custom(format!("parse attempts: {e}")))?;
863 let max_retries: i32 = row
864 .try_get_by::<i32, _>("max_retries")
865 .map_err(|e| Error::custom(format!("parse max_retries: {e}")))?;
866 let created_at = parse_timestamp(row, "created_at")?.to_rfc3339();
867 let available_at = parse_timestamp(row, "available_at")?.to_rfc3339();
868
869 Ok(JobInfo {
870 id,
871 job_type,
872 queue,
873 attempts: attempts as u32,
874 max_retries: max_retries as u32,
875 created_at,
876 available_at,
877 state,
878 })
879}
880
881fn parse_failed_job_info(row: &sea_orm::QueryResult) -> Result<FailedJobInfo, Error> {
882 let job = parse_job_info(row, JobState::Failed)?;
883 let error: String = row
884 .try_get_by::<Option<String>, _>("error")
885 .map_err(|e| Error::custom(format!("parse error: {e}")))?
886 .unwrap_or_default();
887 let failed_at =
890 parse_optional_timestamp(row, "failed_at")?.unwrap_or(parse_timestamp(row, "created_at")?);
891
892 Ok(FailedJobInfo {
893 job,
894 error,
895 failed_at,
896 })
897}
898
#[cfg(test)]
mod tests {
    //! Tests for the queue storage functions, run against an in-memory
    //! SQLite database with the `jobs` table migration applied.

    use super::*;
    use sea_orm::Database;
    use sea_orm_migration::MigratorTrait;

    /// Migrator that installs only the `jobs` table.
    struct TestMigrator;

    #[async_trait::async_trait]
    impl MigratorTrait for TestMigrator {
        fn migrations() -> Vec<Box<dyn sea_orm_migration::MigrationTrait>> {
            let jobs_table: Box<dyn sea_orm_migration::MigrationTrait> =
                Box::new(crate::migration::CreateJobsTable);
            vec![jobs_table]
        }
    }

    /// Opens a fresh in-memory SQLite database and applies the migration.
    async fn setup() -> DatabaseConnection {
        let db = Database::connect("sqlite::memory:")
            .await
            .expect("connect sqlite::memory:");
        TestMigrator::up(&db, None)
            .await
            .expect("run CreateJobsTable migration");
        db
    }

    /// Inserts a row directly into `jobs` and returns its id.
    // The argument list mirrors the columns a test needs to control.
    #[allow(clippy::too_many_arguments)]
    async fn insert_job(
        conn: &DatabaseConnection,
        queue: &str,
        job_type: &str,
        status: &str,
        attempts: i32,
        max_retries: i32,
        claimed_at: Option<&str>,
        available_at: &str,
    ) -> i64 {
        let now = Utc::now().to_rfc3339();
        let claimed_at_sql =
            claimed_at.map_or_else(|| "NULL".to_string(), |ts| format!("'{ts}'"));
        let sql = format!(
            "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
             available_at, claimed_at, created_at) \
             VALUES ('{queue}', '{job_type}', '{{}}', '{status}', {attempts}, {max_retries}, \
             '{available_at}', {claimed_at_sql}, '{now}') \
             RETURNING id"
        );
        fetch_row(conn, sql, "insert_job query", "insert_job row")
            .await
            .try_get_by::<i64, _>("id")
            .expect("insert id")
    }

    /// Runs `sql` on SQLite and returns its single row. Panics with
    /// `query_msg` if the query fails and with `row_msg` if it yields no row.
    async fn fetch_row(
        conn: &DatabaseConnection,
        sql: String,
        query_msg: &str,
        row_msg: &str,
    ) -> sea_orm::QueryResult {
        conn.query_one(Statement::from_string(DatabaseBackend::Sqlite, sql))
            .await
            .expect(query_msg)
            .expect(row_msg)
    }

    /// Inserts a job on `default` that was claimed ten minutes ago, runs the
    /// reaper with a five minute visibility timeout, and returns the job id.
    async fn insert_stale_claim_and_reap(
        conn: &DatabaseConnection,
        job_type: &str,
        attempts: i32,
        max_retries: i32,
    ) -> i64 {
        let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
        let now = Utc::now().to_rfc3339();
        let id = insert_job(
            conn,
            "default",
            job_type,
            "claimed",
            attempts,
            max_retries,
            Some(&ten_min_ago),
            &now,
        )
        .await;

        reaper(conn, "default", std::time::Duration::from_secs(5 * 60))
            .await
            .expect("reaper failed");
        id
    }

    #[tokio::test]
    async fn claim_returns_pending_job() {
        let conn = setup().await;
        let now = Utc::now().to_rfc3339();

        insert_job(&conn, "default", "MyJob", "pending", 0, 3, None, &now).await;

        let first = claim(&conn, "default", "worker-1")
            .await
            .expect("claim failed");
        assert!(first.is_some(), "expected Some(job), got None");
        let first = first.unwrap();
        assert_eq!(first.job_type, "MyJob");

        // The only job is now claimed, so a second worker gets nothing.
        let second = claim(&conn, "default", "worker-2")
            .await
            .expect("second claim failed");
        assert!(second.is_none(), "second claim should return None");
    }

    #[tokio::test]
    async fn idempotency_dedup() {
        let conn = setup().await;
        let now = Utc::now().to_rfc3339();
        let available_at = DateTime::parse_from_rfc3339(&now)
            .unwrap()
            .with_timezone(&Utc);

        // Two enqueues sharing an idempotency key collapse into one row.
        for msg in ["first enqueue", "second enqueue (should be a no-op)"] {
            enqueue(
                &conn,
                "default",
                "MyJob",
                "{}",
                3,
                Some("key-abc"),
                None,
                available_at,
            )
            .await
            .expect(msg);
        }

        let keyed = fetch_row(
            &conn,
            "SELECT COUNT(*) as cnt FROM jobs WHERE job_type='MyJob' AND idempotency_key='key-abc'"
                .to_string(),
            "count query",
            "count row",
        )
        .await;
        let cnt: i64 = keyed.try_get_by::<i64, _>("cnt").expect("cnt");
        assert_eq!(
            cnt, 1,
            "idempotency key should deduplicate (expected 1 row)"
        );

        // Without a key every enqueue inserts a row.
        for msg in ["first plain enqueue", "second plain enqueue"] {
            enqueue(
                &conn,
                "default",
                "OtherJob",
                "{}",
                3,
                None,
                None,
                available_at,
            )
            .await
            .expect(msg);
        }

        let plain = fetch_row(
            &conn,
            "SELECT COUNT(*) as cnt FROM jobs WHERE job_type='OtherJob'".to_string(),
            "count query 2",
            "count row 2",
        )
        .await;
        let cnt2: i64 = plain.try_get_by::<i64, _>("cnt").expect("cnt2");
        assert_eq!(cnt2, 2, "plain enqueue should insert both rows");
    }

    #[tokio::test]
    async fn reaper_reclaims_stuck_job() {
        let conn = setup().await;

        let id = insert_stale_claim_and_reap(&conn, "StuckJob", 0, 3).await;

        let row = fetch_row(
            &conn,
            format!("SELECT status, attempts FROM jobs WHERE id={id}"),
            "select after reaper",
            "row after reaper",
        )
        .await;

        let status: String = row.try_get_by::<String, _>("status").expect("status");
        let attempts: i32 = row.try_get_by::<i32, _>("attempts").expect("attempts");
        assert_eq!(
            status, "pending",
            "reaper should reset stuck job to pending"
        );
        assert_eq!(attempts, 1, "reaper should increment attempts");
    }

    #[tokio::test]
    async fn reaper_boundary_parks_last_attempt() {
        let conn = setup().await;

        // attempts == max_retries - 1: one more attempt would reach the limit.
        let id = insert_stale_claim_and_reap(&conn, "BoundaryJob", 2, 3).await;

        let row = fetch_row(
            &conn,
            format!("SELECT status, attempts FROM jobs WHERE id={id}"),
            "select after reaper",
            "row",
        )
        .await;
        let status: String = row.try_get_by::<String, _>("status").expect("status");
        let attempts: i32 = row.try_get_by::<i32, _>("attempts").expect("attempts");
        assert_eq!(
            status, "failed",
            "job at attempts == max_retries - 1 must be parked by the reaper, not requeued"
        );
        assert_eq!(
            attempts, 2,
            "parked job keeps its attempt count (no further requeue)"
        );
    }

    #[tokio::test]
    async fn reap_startup_claims_marks_orphans_failed() {
        let conn = setup().await;
        let now = Utc::now().to_rfc3339();

        let orphan_default = insert_job(
            &conn,
            "default",
            "Orphan1",
            "claimed",
            0,
            3,
            Some(&now),
            &now,
        )
        .await;
        let orphan_publish = insert_job(
            &conn,
            "publish",
            "Orphan2",
            "claimed",
            1,
            3,
            Some(&now),
            &now,
        )
        .await;
        let orphan_other =
            insert_job(&conn, "other", "Orphan3", "claimed", 0, 3, Some(&now), &now).await;
        let pending_default =
            insert_job(&conn, "default", "Fresh", "pending", 0, 3, None, &now).await;

        let handled = vec![String::from("default"), String::from("publish")];
        let reaped = reap_startup_claims(&conn, &handled)
            .await
            .expect("reap_startup_claims failed");
        assert_eq!(
            reaped, 2,
            "expected 2 orphan rows reaped (default + publish)"
        );

        // Claimed rows on the handled queues are parked with a reason and a timestamp.
        for id in [orphan_default, orphan_publish] {
            let row = fetch_row(
                &conn,
                format!("SELECT status, error, failed_at FROM jobs WHERE id={id}"),
                "select after reap",
                "row after reap",
            )
            .await;
            let status: String = row.try_get_by::<String, _>("status").expect("status");
            let error: Option<String> =
                row.try_get_by::<Option<String>, _>("error").expect("error");
            let failed_at: Option<String> = row
                .try_get_by::<Option<String>, _>("failed_at")
                .expect("failed_at");
            assert_eq!(status, "failed", "orphan must be parked as failed");
            assert!(
                error.as_deref().unwrap_or("").contains("orphan claim"),
                "error must record reap reason, got: {error:?}"
            );
            assert!(failed_at.is_some(), "failed_at must be stamped");
        }

        // A claimed row on a queue that was not passed in stays claimed.
        let untouched = fetch_row(
            &conn,
            format!("SELECT status FROM jobs WHERE id={orphan_other}"),
            "select untouched",
            "row untouched",
        )
        .await;
        let status: String = untouched
            .try_get_by::<String, _>("status")
            .expect("status");
        assert_eq!(
            status, "claimed",
            "orphan on a queue not handled by this worker must be left alone"
        );

        // A pending row on a handled queue stays pending.
        let pending = fetch_row(
            &conn,
            format!("SELECT status FROM jobs WHERE id={pending_default}"),
            "select pending",
            "row pending",
        )
        .await;
        let status: String = pending.try_get_by::<String, _>("status").expect("status");
        assert_eq!(status, "pending", "pending row must not be reaped");
    }

    #[tokio::test]
    async fn reap_startup_claims_empty_queues_is_noop() {
        let conn = setup().await;
        let now = Utc::now().to_rfc3339();
        insert_job(
            &conn,
            "default",
            "Orphan",
            "claimed",
            0,
            3,
            Some(&now),
            &now,
        )
        .await;

        let no_queues: Vec<String> = Vec::new();
        let reaped = reap_startup_claims(&conn, &no_queues)
            .await
            .expect("reap on empty queues must succeed");
        assert_eq!(reaped, 0);
    }

    #[tokio::test]
    async fn poison_job_parked() {
        let conn = setup().await;

        // attempts == max_retries: the retry budget is already used up.
        let id = insert_stale_claim_and_reap(&conn, "PoisonJob", 3, 3).await;

        let row = fetch_row(
            &conn,
            format!("SELECT status, error FROM jobs WHERE id={id}"),
            "select after reaper",
            "row",
        )
        .await;

        let status: String = row.try_get_by::<String, _>("status").expect("status");
        let error: Option<String> = row.try_get_by::<Option<String>, _>("error").expect("error");
        assert_eq!(status, "failed", "exhausted job should be parked as failed");
        assert!(error.is_some(), "failed job should have an error message");

        // The parked job must not block later work on the same queue.
        let available = Utc::now().to_rfc3339();
        insert_job(
            &conn, "default", "FreshJob", "pending", 0, 3, None, &available,
        )
        .await;

        let claimed = claim(&conn, "default", "worker-1")
            .await
            .expect("claim after poison park");
        assert!(
            claimed.is_some(),
            "fresh job should be claimable after poison job is parked"
        );
        let claimed = claimed.unwrap();
        assert_eq!(claimed.job_type, "FreshJob");
    }
}