1use chrono::{DateTime, Utc};
10use sea_orm::{
11 ConnectionTrait, DatabaseBackend, DatabaseConnection, Statement, TransactionTrait, Value,
12};
13use serde::{Deserialize, Serialize};
14
15use crate::error::Error;
16
17static GLOBAL_CONNECTION: std::sync::OnceLock<DatabaseConnection> = std::sync::OnceLock::new();
22
23type RegisterFn = Box<dyn Fn(&mut crate::WorkerLoop) + Send + Sync>;
25static JOB_REGISTRARS: std::sync::Mutex<Vec<RegisterFn>> = std::sync::Mutex::new(Vec::new());
26
27pub struct Queue;
32
33impl Queue {
34 pub fn connection() -> &'static DatabaseConnection {
40 GLOBAL_CONNECTION
41 .get()
42 .expect("Queue not initialized. Call Queue::init() first.")
43 }
44
45 pub async fn init(conn: DatabaseConnection) -> Result<(), Error> {
49 GLOBAL_CONNECTION
50 .set(conn)
51 .map_err(|_| Error::custom("Queue already initialized"))?;
52 Ok(())
53 }
54
55 pub fn is_initialized() -> bool {
57 GLOBAL_CONNECTION.get().is_some()
58 }
59
60 pub fn register<J>()
66 where
67 J: crate::Job + serde::de::DeserializeOwned + 'static,
68 {
69 JOB_REGISTRARS
70 .lock()
71 .unwrap()
72 .push(Box::new(|w: &mut crate::WorkerLoop| w.register::<J>()));
73 }
74
75 pub fn has_registered_jobs() -> bool {
77 !JOB_REGISTRARS.lock().unwrap().is_empty()
78 }
79
80 pub(crate) fn apply_registrars(w: &mut crate::WorkerLoop) {
84 for r in JOB_REGISTRARS.lock().unwrap().iter() {
85 r(w);
86 }
87 }
88}
89
90#[derive(Debug, Clone)]
96pub struct JobRow {
97 pub id: i64,
99 pub job_type: String,
101 pub payload: String,
103 pub queue: String,
105 pub attempts: u32,
107 pub max_retries: u32,
109 pub idempotency_key: Option<String>,
111 pub tenant_id: Option<i64>,
113 pub available_at: DateTime<Utc>,
115 pub created_at: DateTime<Utc>,
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
125#[serde(rename_all = "snake_case")]
126pub enum JobState {
127 Pending,
129 Delayed,
131 Failed,
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct JobInfo {
138 pub id: i64,
140 pub job_type: String,
142 pub queue: String,
144 pub attempts: u32,
146 pub max_retries: u32,
148 pub created_at: String,
150 pub available_at: String,
152 pub state: JobState,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct SingleQueueStats {
159 pub name: String,
161 pub pending: usize,
163 pub delayed: usize,
165}
166
167#[derive(Debug, Clone, Default, Serialize, Deserialize)]
169pub struct QueueStats {
170 pub queues: Vec<SingleQueueStats>,
172 pub total_failed: usize,
174}
175
176#[derive(Debug, Clone, Serialize, Deserialize)]
178pub struct FailedJobInfo {
179 pub job: JobInfo,
181 pub error: String,
183 pub failed_at: DateTime<Utc>,
186}
187
188fn parse_job_row(row: &sea_orm::QueryResult) -> Result<JobRow, Error> {
194 let id: i64 = row
195 .try_get_by::<i64, _>("id")
196 .map_err(|e| Error::custom(format!("parse id: {e}")))?;
197 let job_type: String = row
198 .try_get_by::<String, _>("job_type")
199 .map_err(|e| Error::custom(format!("parse job_type: {e}")))?;
200 let payload: String = row
201 .try_get_by::<String, _>("payload")
202 .map_err(|e| Error::custom(format!("parse payload: {e}")))?;
203 let queue: String = row
204 .try_get_by::<String, _>("queue")
205 .map_err(|e| Error::custom(format!("parse queue: {e}")))?;
206 let attempts: i32 = row
207 .try_get_by::<i32, _>("attempts")
208 .map_err(|e| Error::custom(format!("parse attempts: {e}")))?;
209 let max_retries: i32 = row
210 .try_get_by::<i32, _>("max_retries")
211 .map_err(|e| Error::custom(format!("parse max_retries: {e}")))?;
212 let idempotency_key: Option<String> = row
213 .try_get_by::<Option<String>, _>("idempotency_key")
214 .map_err(|e| Error::custom(format!("parse idempotency_key: {e}")))?;
215 let tenant_id: Option<i64> = row
216 .try_get_by::<Option<i64>, _>("tenant_id")
217 .map_err(|e| Error::custom(format!("parse tenant_id: {e}")))?;
218
219 let available_at = parse_timestamp(row, "available_at")?;
223 let created_at = parse_timestamp(row, "created_at")?;
224
225 Ok(JobRow {
226 id,
227 job_type,
228 payload,
229 queue,
230 attempts: attempts as u32,
231 max_retries: max_retries as u32,
232 idempotency_key,
233 tenant_id,
234 available_at,
235 created_at,
236 })
237}
238
239fn parse_timestamp(row: &sea_orm::QueryResult, col: &str) -> Result<DateTime<Utc>, Error> {
242 if let Ok(dt) = row.try_get_by::<DateTime<Utc>, _>(col) {
244 return Ok(dt);
245 }
246 let s: String = row
248 .try_get_by::<String, _>(col)
249 .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
250 DateTime::parse_from_rfc3339(&s)
251 .map(|dt| dt.with_timezone(&Utc))
252 .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}")))
253}
254
255fn parse_optional_timestamp(
258 row: &sea_orm::QueryResult,
259 col: &str,
260) -> Result<Option<DateTime<Utc>>, Error> {
261 if let Ok(opt) = row.try_get_by::<Option<DateTime<Utc>>, _>(col) {
263 return Ok(opt);
264 }
265 let s: Option<String> = row
267 .try_get_by::<Option<String>, _>(col)
268 .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
269 match s {
270 None => Ok(None),
271 Some(s) => DateTime::parse_from_rfc3339(&s)
272 .map(|dt| Some(dt.with_timezone(&Utc)))
273 .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}"))),
274 }
275}
276
277fn ph(backend: DatabaseBackend, n: usize) -> String {
279 match backend {
280 DatabaseBackend::Postgres => format!("${n}"),
281 _ => format!("?{n}"),
282 }
283}
284
285fn ts_ph(backend: DatabaseBackend, n: usize) -> String {
299 match backend {
300 DatabaseBackend::Postgres => format!("${n}::timestamptz"),
301 _ => format!("?{n}"),
302 }
303}
304
305pub async fn claim(
319 conn: &DatabaseConnection,
320 queue: &str,
321 worker_id: &str,
322) -> Result<Option<JobRow>, Error> {
323 match conn.get_database_backend() {
324 DatabaseBackend::Postgres => claim_postgres(conn, queue, worker_id).await,
325 DatabaseBackend::Sqlite => claim_sqlite(conn, queue, worker_id).await,
326 _ => Err(Error::UnsupportedBackend),
327 }
328}
329
330async fn claim_postgres(
331 conn: &DatabaseConnection,
332 queue: &str,
333 worker_id: &str,
334) -> Result<Option<JobRow>, Error> {
335 let txn = conn.begin().await.map_err(Error::Db)?;
336
337 let select = Statement::from_sql_and_values(
338 DatabaseBackend::Postgres,
339 "SELECT id, job_type, payload, queue, attempts, max_retries, idempotency_key, \
340 tenant_id, available_at, created_at FROM jobs \
341 WHERE status = 'pending' AND queue = $1 AND available_at <= NOW() \
342 ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED",
343 [Value::String(Some(Box::new(queue.to_string())))],
344 );
345
346 let row = txn.query_one(select).await.map_err(Error::Db)?;
347 let Some(row) = row else {
348 txn.commit().await.map_err(Error::Db)?;
349 return Ok(None);
350 };
351 let job = parse_job_row(&row)?;
352
353 let upd = Statement::from_sql_and_values(
354 DatabaseBackend::Postgres,
355 "UPDATE jobs SET status = 'claimed', claimed_at = NOW(), claimed_by = $2 WHERE id = $1",
356 [
357 Value::BigInt(Some(job.id)),
358 Value::String(Some(Box::new(worker_id.to_string()))),
359 ],
360 );
361 txn.execute(upd).await.map_err(Error::Db)?;
362 txn.commit().await.map_err(Error::Db)?;
363
364 Ok(Some(job))
365}
366
367async fn claim_sqlite(
368 conn: &DatabaseConnection,
369 queue: &str,
370 worker_id: &str,
371) -> Result<Option<JobRow>, Error> {
372 let now_iso = Utc::now().to_rfc3339();
373
374 let txn = conn.begin().await.map_err(Error::Db)?;
388
389 let stmt = Statement::from_sql_and_values(
390 DatabaseBackend::Sqlite,
391 "UPDATE jobs SET status='claimed', claimed_at=?1, claimed_by=?2 \
392 WHERE id = ( SELECT id FROM jobs WHERE status='pending' AND queue=?3 \
393 AND available_at <= ?1 ORDER BY id LIMIT 1 ) \
394 RETURNING id, job_type, payload, queue, attempts, max_retries, \
395 idempotency_key, tenant_id, available_at, created_at",
396 [
397 Value::String(Some(Box::new(now_iso))),
398 Value::String(Some(Box::new(worker_id.to_string()))),
399 Value::String(Some(Box::new(queue.to_string()))),
400 ],
401 );
402
403 let row = match txn.query_one(stmt).await {
404 Ok(r) => r,
405 Err(e) => {
406 let _ = txn.rollback().await;
408 return Err(Error::Db(e));
409 }
410 };
411 txn.commit().await.map_err(Error::Db)?;
412
413 row.map(|r| parse_job_row(&r)).transpose()
414}
415
416pub async fn reaper(
430 conn: &DatabaseConnection,
431 queue: &str,
432 visibility_timeout: std::time::Duration,
433) -> Result<(), Error> {
434 let now = Utc::now();
435 let duration = chrono::Duration::from_std(visibility_timeout)
436 .map_err(|e| Error::custom(format!("visibility_timeout out of range: {e}")))?;
437 let cutoff = (now - duration).to_rfc3339();
438 let now_iso = now.to_rfc3339();
439
440 let backend = conn.get_database_backend();
441 let txn = conn.begin().await.map_err(Error::Db)?;
442
443 let (p1, p2, p3) = (ts_ph(backend, 1), ts_ph(backend, 2), ph(backend, 3));
448 let requeue_sql = format!(
449 "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL, \
450 attempts = attempts + 1, available_at = {p1} \
451 WHERE status='claimed' AND claimed_at < {p2} \
452 AND attempts + 1 < max_retries AND queue = {p3}"
453 );
454 let requeue = Statement::from_sql_and_values(
455 backend,
456 &requeue_sql,
457 [
458 Value::String(Some(Box::new(now_iso.clone()))),
459 Value::String(Some(Box::new(cutoff.clone()))),
460 Value::String(Some(Box::new(queue.to_string()))),
461 ],
462 );
463 txn.execute(requeue).await.map_err(Error::Db)?;
464
465 let (pp1, pp2, pp3) = (ts_ph(backend, 1), ts_ph(backend, 2), ph(backend, 3));
468 let park_sql = format!(
469 "UPDATE jobs SET status='failed', error='visibility timeout exceeded', failed_at={pp1} \
470 WHERE status='claimed' AND claimed_at < {pp2} \
471 AND attempts + 1 >= max_retries AND queue = {pp3}"
472 );
473 let park = Statement::from_sql_and_values(
474 backend,
475 &park_sql,
476 [
477 Value::String(Some(Box::new(now_iso))),
478 Value::String(Some(Box::new(cutoff))),
479 Value::String(Some(Box::new(queue.to_string()))),
480 ],
481 );
482 txn.execute(park).await.map_err(Error::Db)?;
483
484 txn.commit().await.map_err(Error::Db)
485}
486
487#[allow(clippy::too_many_arguments)]
497pub async fn enqueue(
498 conn: &DatabaseConnection,
499 queue: &str,
500 job_type: &str,
501 payload: &str,
502 max_retries: u32,
503 idempotency_key: Option<&str>,
504 tenant_id: Option<i64>,
505 available_at: DateTime<Utc>,
506) -> Result<(), Error> {
507 let backend = conn.get_database_backend();
508 let now_iso = Utc::now().to_rfc3339();
509 let available_iso = available_at.to_rfc3339();
510
511 if let Some(idem) = idempotency_key {
512 let (p1, p2, p3, p4, p5, p6, p7, p8) = (
514 ph(backend, 1),
515 ph(backend, 2),
516 ph(backend, 3),
517 ph(backend, 4),
518 ph(backend, 5),
519 ph(backend, 6),
520 ts_ph(backend, 7),
521 ts_ph(backend, 8),
522 );
523 let sql = format!(
524 "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
525 idempotency_key, tenant_id, available_at, created_at) \
526 SELECT {p1}, {p2}, {p3}, 'pending', 0, {p4}, {p5}, {p6}, {p7}, {p8} \
527 WHERE NOT EXISTS ( \
528 SELECT 1 FROM jobs WHERE job_type = {p2} AND idempotency_key = {p5} \
529 AND status IN ('pending','claimed') \
530 )"
531 );
532 let stmt = Statement::from_sql_and_values(
533 backend,
534 &sql,
535 [
536 Value::String(Some(Box::new(queue.to_string()))),
537 Value::String(Some(Box::new(job_type.to_string()))),
538 Value::String(Some(Box::new(payload.to_string()))),
539 Value::Int(Some(max_retries as i32)),
540 Value::String(Some(Box::new(idem.to_string()))),
541 tenant_id.map_or(Value::BigInt(None), |id| Value::BigInt(Some(id))),
542 Value::String(Some(Box::new(available_iso))),
543 Value::String(Some(Box::new(now_iso))),
544 ],
545 );
546 conn.execute(stmt).await.map_err(Error::Db)?;
547 } else {
548 let (p1, p2, p3, p4, p5, p6, p7) = (
550 ph(backend, 1),
551 ph(backend, 2),
552 ph(backend, 3),
553 ph(backend, 4),
554 ph(backend, 5),
555 ts_ph(backend, 6),
556 ts_ph(backend, 7),
557 );
558 let sql = format!(
559 "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
560 tenant_id, available_at, created_at) \
561 VALUES ({p1}, {p2}, {p3}, 'pending', 0, {p4}, {p5}, {p6}, {p7})"
562 );
563 let stmt = Statement::from_sql_and_values(
564 backend,
565 &sql,
566 [
567 Value::String(Some(Box::new(queue.to_string()))),
568 Value::String(Some(Box::new(job_type.to_string()))),
569 Value::String(Some(Box::new(payload.to_string()))),
570 Value::Int(Some(max_retries as i32)),
571 tenant_id.map_or(Value::BigInt(None), |id| Value::BigInt(Some(id))),
572 Value::String(Some(Box::new(available_iso))),
573 Value::String(Some(Box::new(now_iso))),
574 ],
575 );
576 conn.execute(stmt).await.map_err(Error::Db)?;
577 }
578
579 Ok(())
580}
581
582pub async fn delete_job(conn: &DatabaseConnection, id: i64) -> Result<(), Error> {
588 let backend = conn.get_database_backend();
589 let p1 = ph(backend, 1);
590 let sql = format!("DELETE FROM jobs WHERE id = {p1}");
591 let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(id))]);
592 conn.execute(stmt).await.map_err(Error::Db)?;
593 Ok(())
594}
595
596pub async fn fail_job(conn: &DatabaseConnection, id: i64, error: &str) -> Result<(), Error> {
598 let backend = conn.get_database_backend();
599 let (p1, p2, p3) = (ph(backend, 1), ts_ph(backend, 2), ph(backend, 3));
600 let sql =
601 format!("UPDATE jobs SET status='failed', error={p1}, failed_at={p2} WHERE id = {p3}");
602 let stmt = Statement::from_sql_and_values(
603 backend,
604 &sql,
605 [
606 Value::String(Some(Box::new(error.to_string()))),
607 Value::String(Some(Box::new(Utc::now().to_rfc3339()))),
608 Value::BigInt(Some(id)),
609 ],
610 );
611 conn.execute(stmt).await.map_err(Error::Db)?;
612 Ok(())
613}
614
615pub async fn release_job(
618 conn: &DatabaseConnection,
619 id: i64,
620 attempts: u32,
621 available_at: DateTime<Utc>,
622) -> Result<(), Error> {
623 let backend = conn.get_database_backend();
624 let (p1, p2, p3) = (ph(backend, 1), ts_ph(backend, 2), ph(backend, 3));
625 let sql = format!(
626 "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL, \
627 attempts={p1}, available_at={p2} WHERE id = {p3}"
628 );
629 let stmt = Statement::from_sql_and_values(
630 backend,
631 &sql,
632 [
633 Value::Int(Some(attempts as i32)),
634 Value::String(Some(Box::new(available_at.to_rfc3339()))),
635 Value::BigInt(Some(id)),
636 ],
637 );
638 conn.execute(stmt).await.map_err(Error::Db)?;
639 Ok(())
640}
641
642pub async fn requeue_claimed_by(conn: &DatabaseConnection, worker_id: &str) -> Result<(), Error> {
645 let backend = conn.get_database_backend();
646 let p1 = ph(backend, 1);
647 let sql = format!(
648 "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL \
649 WHERE status='claimed' AND claimed_by={p1}"
650 );
651 let stmt = Statement::from_sql_and_values(
652 backend,
653 &sql,
654 [Value::String(Some(Box::new(worker_id.to_string())))],
655 );
656 conn.execute(stmt).await.map_err(Error::Db)?;
657 Ok(())
658}
659
660pub async fn reap_startup_claims(
677 conn: &DatabaseConnection,
678 queues: &[String],
679) -> Result<u64, Error> {
680 if queues.is_empty() {
681 return Ok(0);
682 }
683 let backend = conn.get_database_backend();
684 let now_iso = Utc::now().to_rfc3339();
685
686 let queue_phs: Vec<String> = (1..=queues.len()).map(|i| ph(backend, i)).collect();
690 let failed_at_ph = ts_ph(backend, queues.len() + 1);
691 let sql = format!(
692 "UPDATE jobs SET status='failed', \
693 error='reaped on worker startup (orphan claim from previous worker)', \
694 failed_at={failed_at_ph} \
695 WHERE status='claimed' AND queue IN ({})",
696 queue_phs.join(", "),
697 );
698
699 let mut values: Vec<Value> = queues
700 .iter()
701 .map(|q| Value::String(Some(Box::new(q.clone()))))
702 .collect();
703 values.push(Value::String(Some(Box::new(now_iso))));
704
705 let stmt = Statement::from_sql_and_values(backend, &sql, values);
706 let result = conn.execute(stmt).await.map_err(Error::Db)?;
707 Ok(result.rows_affected())
708}
709
710pub async fn get_pending_jobs(
716 conn: &DatabaseConnection,
717 queue: &str,
718 limit: u64,
719) -> Result<Vec<JobInfo>, Error> {
720 let backend = conn.get_database_backend();
721 let now_iso = Utc::now().to_rfc3339();
722 let (p1, p2, p3) = (ph(backend, 1), ts_ph(backend, 2), ph(backend, 3));
723 let sql = format!(
724 "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at \
725 FROM jobs WHERE status='pending' AND queue={p1} AND available_at <= {p2} \
726 ORDER BY id LIMIT {p3}"
727 );
728 let stmt = Statement::from_sql_and_values(
729 backend,
730 &sql,
731 [
732 Value::String(Some(Box::new(queue.to_string()))),
733 Value::String(Some(Box::new(now_iso))),
734 Value::BigInt(Some(limit as i64)),
735 ],
736 );
737 let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
738 rows.iter()
739 .map(|r| parse_job_info(r, JobState::Pending))
740 .collect()
741}
742
743pub async fn get_delayed_jobs(
745 conn: &DatabaseConnection,
746 queue: &str,
747 limit: u64,
748) -> Result<Vec<JobInfo>, Error> {
749 let backend = conn.get_database_backend();
750 let now_iso = Utc::now().to_rfc3339();
751 let (p1, p2, p3) = (ph(backend, 1), ts_ph(backend, 2), ph(backend, 3));
752 let sql = format!(
753 "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at \
754 FROM jobs WHERE status='pending' AND queue={p1} AND available_at > {p2} \
755 ORDER BY id LIMIT {p3}"
756 );
757 let stmt = Statement::from_sql_and_values(
758 backend,
759 &sql,
760 [
761 Value::String(Some(Box::new(queue.to_string()))),
762 Value::String(Some(Box::new(now_iso))),
763 Value::BigInt(Some(limit as i64)),
764 ],
765 );
766 let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
767 rows.iter()
768 .map(|r| parse_job_info(r, JobState::Delayed))
769 .collect()
770}
771
772pub async fn get_failed_jobs(
774 conn: &DatabaseConnection,
775 limit: u64,
776) -> Result<Vec<FailedJobInfo>, Error> {
777 let backend = conn.get_database_backend();
778 let p1 = ph(backend, 1);
779 let sql = format!(
782 "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at, \
783 error, failed_at FROM jobs WHERE status='failed' \
784 ORDER BY COALESCE(failed_at, created_at) DESC LIMIT {p1}"
785 );
786 let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(limit as i64))]);
787 let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
788 rows.iter().map(parse_failed_job_info).collect()
789}
790
791pub async fn get_stats(conn: &DatabaseConnection, queues: &[&str]) -> Result<QueueStats, Error> {
793 let backend = conn.get_database_backend();
794 let now_iso = Utc::now().to_rfc3339();
795 let mut queue_stats = Vec::new();
796
797 for &q in queues {
798 let p1 = ph(backend, 1);
799 let p2 = ts_ph(backend, 2);
800 let pending_sql = format!(
801 "SELECT COUNT(*) as cnt FROM jobs \
802 WHERE status='pending' AND queue={p1} AND available_at <= {p2}"
803 );
804 let pending_stmt = Statement::from_sql_and_values(
805 backend,
806 &pending_sql,
807 [
808 Value::String(Some(Box::new(q.to_string()))),
809 Value::String(Some(Box::new(now_iso.clone()))),
810 ],
811 );
812 let pending_row = conn
813 .query_one(pending_stmt)
814 .await
815 .map_err(Error::Db)?
816 .ok_or_else(|| Error::custom("stats: no row returned for pending count"))?;
817 let pending: i64 = pending_row
818 .try_get_by::<i64, _>("cnt")
819 .map_err(|e| Error::custom(format!("stats pending cnt: {e}")))?;
820
821 let delayed_sql = format!(
822 "SELECT COUNT(*) as cnt FROM jobs \
823 WHERE status='pending' AND queue={p1} AND available_at > {p2}"
824 );
825 let delayed_stmt = Statement::from_sql_and_values(
826 backend,
827 &delayed_sql,
828 [
829 Value::String(Some(Box::new(q.to_string()))),
830 Value::String(Some(Box::new(now_iso.clone()))),
831 ],
832 );
833 let delayed_row = conn
834 .query_one(delayed_stmt)
835 .await
836 .map_err(Error::Db)?
837 .ok_or_else(|| Error::custom("stats: no row returned for delayed count"))?;
838 let delayed: i64 = delayed_row
839 .try_get_by::<i64, _>("cnt")
840 .map_err(|e| Error::custom(format!("stats delayed cnt: {e}")))?;
841
842 queue_stats.push(SingleQueueStats {
843 name: q.to_string(),
844 pending: pending as usize,
845 delayed: delayed as usize,
846 });
847 }
848
849 let failed_sql = "SELECT COUNT(*) as cnt FROM jobs WHERE status='failed'";
851 let failed_stmt = Statement::from_string(backend, failed_sql.to_string());
852 let failed_row = conn
853 .query_one(failed_stmt)
854 .await
855 .map_err(Error::Db)?
856 .ok_or_else(|| Error::custom("stats: no row returned for failed count"))?;
857 let total_failed: i64 = failed_row
858 .try_get_by::<i64, _>("cnt")
859 .map_err(|e| Error::custom(format!("stats failed cnt: {e}")))?;
860
861 Ok(QueueStats {
862 queues: queue_stats,
863 total_failed: total_failed as usize,
864 })
865}
866
867fn parse_job_info(row: &sea_orm::QueryResult, state: JobState) -> Result<JobInfo, Error> {
872 let id: i64 = row
873 .try_get_by::<i64, _>("id")
874 .map_err(|e| Error::custom(format!("parse id: {e}")))?;
875 let job_type: String = row
876 .try_get_by::<String, _>("job_type")
877 .map_err(|e| Error::custom(format!("parse job_type: {e}")))?;
878 let queue: String = row
879 .try_get_by::<String, _>("queue")
880 .map_err(|e| Error::custom(format!("parse queue: {e}")))?;
881 let attempts: i32 = row
882 .try_get_by::<i32, _>("attempts")
883 .map_err(|e| Error::custom(format!("parse attempts: {e}")))?;
884 let max_retries: i32 = row
885 .try_get_by::<i32, _>("max_retries")
886 .map_err(|e| Error::custom(format!("parse max_retries: {e}")))?;
887 let created_at = parse_timestamp(row, "created_at")?.to_rfc3339();
888 let available_at = parse_timestamp(row, "available_at")?.to_rfc3339();
889
890 Ok(JobInfo {
891 id,
892 job_type,
893 queue,
894 attempts: attempts as u32,
895 max_retries: max_retries as u32,
896 created_at,
897 available_at,
898 state,
899 })
900}
901
902fn parse_failed_job_info(row: &sea_orm::QueryResult) -> Result<FailedJobInfo, Error> {
903 let job = parse_job_info(row, JobState::Failed)?;
904 let error: String = row
905 .try_get_by::<Option<String>, _>("error")
906 .map_err(|e| Error::custom(format!("parse error: {e}")))?
907 .unwrap_or_default();
908 let failed_at =
911 parse_optional_timestamp(row, "failed_at")?.unwrap_or(parse_timestamp(row, "created_at")?);
912
913 Ok(FailedJobInfo {
914 job,
915 error,
916 failed_at,
917 })
918}
919
920#[cfg(test)]
925mod tests {
926 use super::*;
927 use sea_orm::Database;
928 use sea_orm_migration::MigratorTrait;
929
930 struct TestMigrator;
931
932 #[async_trait::async_trait]
933 impl MigratorTrait for TestMigrator {
934 fn migrations() -> Vec<Box<dyn sea_orm_migration::MigrationTrait>> {
935 vec![Box::new(crate::migration::CreateJobsTable)]
936 }
937 }
938
939 async fn setup() -> DatabaseConnection {
941 let conn = Database::connect("sqlite::memory:")
942 .await
943 .expect("connect sqlite::memory:");
944 TestMigrator::up(&conn, None)
945 .await
946 .expect("run CreateJobsTable migration");
947 conn
948 }
949
950 #[allow(clippy::too_many_arguments)]
952 async fn insert_job(
953 conn: &DatabaseConnection,
954 queue: &str,
955 job_type: &str,
956 status: &str,
957 attempts: i32,
958 max_retries: i32,
959 claimed_at: Option<&str>,
960 available_at: &str,
961 ) -> i64 {
962 let now = Utc::now().to_rfc3339();
963 let claimed_at_sql = match claimed_at {
964 Some(ts) => format!("'{ts}'"),
965 None => "NULL".to_string(),
966 };
967 let sql = format!(
968 "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
969 available_at, claimed_at, created_at) \
970 VALUES ('{queue}', '{job_type}', '{{}}', '{status}', {attempts}, {max_retries}, \
971 '{available_at}', {claimed_at_sql}, '{now}') \
972 RETURNING id"
973 );
974 let row = conn
975 .query_one(Statement::from_string(DatabaseBackend::Sqlite, sql))
976 .await
977 .expect("insert_job query")
978 .expect("insert_job row");
979 row.try_get_by::<i64, _>("id").expect("insert id")
980 }
981
982 #[test]
983 fn ts_ph_casts_timestamp_placeholders_on_postgres_only() {
984 assert_eq!(ts_ph(DatabaseBackend::Postgres, 2), "$2::timestamptz");
993 assert_eq!(ts_ph(DatabaseBackend::Sqlite, 2), "?2");
994 assert_eq!(ph(DatabaseBackend::Postgres, 2), "$2");
996 assert_eq!(ph(DatabaseBackend::Sqlite, 2), "?2");
997 }
998
999 #[tokio::test]
1000 async fn claim_returns_pending_job() {
1001 let conn = setup().await;
1002 let now = Utc::now().to_rfc3339();
1003
1004 insert_job(&conn, "default", "MyJob", "pending", 0, 3, None, &now).await;
1006
1007 let job = claim(&conn, "default", "worker-1")
1009 .await
1010 .expect("claim failed");
1011 assert!(job.is_some(), "expected Some(job), got None");
1012 let job = job.unwrap();
1013 assert_eq!(job.job_type, "MyJob");
1014
1015 let second = claim(&conn, "default", "worker-2")
1017 .await
1018 .expect("second claim failed");
1019 assert!(second.is_none(), "second claim should return None");
1020 }
1021
1022 #[tokio::test]
1023 async fn idempotency_dedup() {
1024 let conn = setup().await;
1025 let now = Utc::now().to_rfc3339();
1026 let available_at = DateTime::parse_from_rfc3339(&now)
1027 .unwrap()
1028 .with_timezone(&Utc);
1029
1030 enqueue(
1032 &conn,
1033 "default",
1034 "MyJob",
1035 "{}",
1036 3,
1037 Some("key-abc"),
1038 None,
1039 available_at,
1040 )
1041 .await
1042 .expect("first enqueue");
1043 enqueue(
1044 &conn,
1045 "default",
1046 "MyJob",
1047 "{}",
1048 3,
1049 Some("key-abc"),
1050 None,
1051 available_at,
1052 )
1053 .await
1054 .expect("second enqueue (should be a no-op)");
1055
1056 let row = conn
1058 .query_one(Statement::from_string(
1059 DatabaseBackend::Sqlite,
1060 "SELECT COUNT(*) as cnt FROM jobs WHERE job_type='MyJob' AND idempotency_key='key-abc'".to_string(),
1061 ))
1062 .await
1063 .expect("count query")
1064 .expect("count row");
1065 let cnt: i64 = row.try_get_by::<i64, _>("cnt").expect("cnt");
1066 assert_eq!(
1067 cnt, 1,
1068 "idempotency key should deduplicate (expected 1 row)"
1069 );
1070
1071 enqueue(
1073 &conn,
1074 "default",
1075 "OtherJob",
1076 "{}",
1077 3,
1078 None,
1079 None,
1080 available_at,
1081 )
1082 .await
1083 .expect("first plain enqueue");
1084 enqueue(
1085 &conn,
1086 "default",
1087 "OtherJob",
1088 "{}",
1089 3,
1090 None,
1091 None,
1092 available_at,
1093 )
1094 .await
1095 .expect("second plain enqueue");
1096
1097 let row2 = conn
1098 .query_one(Statement::from_string(
1099 DatabaseBackend::Sqlite,
1100 "SELECT COUNT(*) as cnt FROM jobs WHERE job_type='OtherJob'".to_string(),
1101 ))
1102 .await
1103 .expect("count query 2")
1104 .expect("count row 2");
1105 let cnt2: i64 = row2.try_get_by::<i64, _>("cnt").expect("cnt2");
1106 assert_eq!(cnt2, 2, "plain enqueue should insert both rows");
1107 }
1108
1109 #[tokio::test]
1110 async fn reaper_reclaims_stuck_job() {
1111 let conn = setup().await;
1112
1113 let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
1115 let now = Utc::now().to_rfc3339();
1116 let id = insert_job(
1117 &conn,
1118 "default",
1119 "StuckJob",
1120 "claimed",
1121 0,
1122 3,
1123 Some(&ten_min_ago),
1124 &now,
1125 )
1126 .await;
1127
1128 reaper(&conn, "default", std::time::Duration::from_secs(5 * 60))
1130 .await
1131 .expect("reaper failed");
1132
1133 let row = conn
1135 .query_one(Statement::from_string(
1136 DatabaseBackend::Sqlite,
1137 format!("SELECT status, attempts FROM jobs WHERE id={id}"),
1138 ))
1139 .await
1140 .expect("select after reaper")
1141 .expect("row after reaper");
1142
1143 let status: String = row.try_get_by::<String, _>("status").expect("status");
1144 let attempts: i32 = row.try_get_by::<i32, _>("attempts").expect("attempts");
1145 assert_eq!(
1146 status, "pending",
1147 "reaper should reset stuck job to pending"
1148 );
1149 assert_eq!(attempts, 1, "reaper should increment attempts");
1150 }
1151
1152 #[tokio::test]
1153 async fn reaper_boundary_parks_last_attempt() {
1154 let conn = setup().await;
1158 let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
1159 let now = Utc::now().to_rfc3339();
1160 let id = insert_job(
1161 &conn,
1162 "default",
1163 "BoundaryJob",
1164 "claimed",
1165 2, 3, Some(&ten_min_ago),
1168 &now,
1169 )
1170 .await;
1171
1172 reaper(&conn, "default", std::time::Duration::from_secs(5 * 60))
1173 .await
1174 .expect("reaper failed");
1175
1176 let row = conn
1177 .query_one(Statement::from_string(
1178 DatabaseBackend::Sqlite,
1179 format!("SELECT status, attempts FROM jobs WHERE id={id}"),
1180 ))
1181 .await
1182 .expect("select after reaper")
1183 .expect("row");
1184 let status: String = row.try_get_by::<String, _>("status").expect("status");
1185 let attempts: i32 = row.try_get_by::<i32, _>("attempts").expect("attempts");
1186 assert_eq!(
1187 status, "failed",
1188 "job at attempts == max_retries - 1 must be parked by the reaper, not requeued"
1189 );
1190 assert_eq!(
1191 attempts, 2,
1192 "parked job keeps its attempt count (no further requeue)"
1193 );
1194 }
1195
1196 #[tokio::test]
1197 async fn reap_startup_claims_marks_orphans_failed() {
1198 let conn = setup().await;
1202 let now = Utc::now().to_rfc3339();
1203
1204 let orphan_default = insert_job(
1207 &conn,
1208 "default",
1209 "Orphan1",
1210 "claimed",
1211 0,
1212 3,
1213 Some(&now),
1214 &now,
1215 )
1216 .await;
1217 let orphan_publish = insert_job(
1218 &conn,
1219 "publish",
1220 "Orphan2",
1221 "claimed",
1222 1,
1223 3,
1224 Some(&now),
1225 &now,
1226 )
1227 .await;
1228 let orphan_other =
1229 insert_job(&conn, "other", "Orphan3", "claimed", 0, 3, Some(&now), &now).await;
1230 let pending_default =
1231 insert_job(&conn, "default", "Fresh", "pending", 0, 3, None, &now).await;
1232
1233 let reaped = reap_startup_claims(&conn, &["default".to_string(), "publish".to_string()])
1235 .await
1236 .expect("reap_startup_claims failed");
1237 assert_eq!(
1238 reaped, 2,
1239 "expected 2 orphan rows reaped (default + publish)"
1240 );
1241
1242 for id in [orphan_default, orphan_publish] {
1244 let row = conn
1245 .query_one(Statement::from_string(
1246 DatabaseBackend::Sqlite,
1247 format!("SELECT status, error, failed_at FROM jobs WHERE id={id}"),
1248 ))
1249 .await
1250 .expect("select after reap")
1251 .expect("row after reap");
1252 let status: String = row.try_get_by::<String, _>("status").expect("status");
1253 let error: Option<String> =
1254 row.try_get_by::<Option<String>, _>("error").expect("error");
1255 let failed_at: Option<String> = row
1256 .try_get_by::<Option<String>, _>("failed_at")
1257 .expect("failed_at");
1258 assert_eq!(status, "failed", "orphan must be parked as failed");
1259 assert!(
1260 error.as_deref().unwrap_or("").contains("orphan claim"),
1261 "error must record reap reason, got: {error:?}"
1262 );
1263 assert!(failed_at.is_some(), "failed_at must be stamped");
1264 }
1265
1266 let row = conn
1268 .query_one(Statement::from_string(
1269 DatabaseBackend::Sqlite,
1270 format!("SELECT status FROM jobs WHERE id={orphan_other}"),
1271 ))
1272 .await
1273 .expect("select untouched")
1274 .expect("row untouched");
1275 let status: String = row.try_get_by::<String, _>("status").expect("status");
1276 assert_eq!(
1277 status, "claimed",
1278 "orphan on a queue not handled by this worker must be left alone"
1279 );
1280
1281 let row = conn
1283 .query_one(Statement::from_string(
1284 DatabaseBackend::Sqlite,
1285 format!("SELECT status FROM jobs WHERE id={pending_default}"),
1286 ))
1287 .await
1288 .expect("select pending")
1289 .expect("row pending");
1290 let status: String = row.try_get_by::<String, _>("status").expect("status");
1291 assert_eq!(status, "pending", "pending row must not be reaped");
1292 }
1293
1294 #[tokio::test]
1295 async fn reap_startup_claims_empty_queues_is_noop() {
1296 let conn = setup().await;
1299 let now = Utc::now().to_rfc3339();
1300 insert_job(
1301 &conn,
1302 "default",
1303 "Orphan",
1304 "claimed",
1305 0,
1306 3,
1307 Some(&now),
1308 &now,
1309 )
1310 .await;
1311
1312 let reaped = reap_startup_claims(&conn, &[])
1313 .await
1314 .expect("reap on empty queues must succeed");
1315 assert_eq!(reaped, 0);
1316 }
1317
1318 #[tokio::test]
1319 async fn poison_job_parked() {
1320 let conn = setup().await;
1321
1322 let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
1324 let now = Utc::now().to_rfc3339();
1325 let id = insert_job(
1326 &conn,
1327 "default",
1328 "PoisonJob",
1329 "claimed",
1330 3,
1331 3,
1332 Some(&ten_min_ago),
1333 &now,
1334 )
1335 .await;
1336
1337 reaper(&conn, "default", std::time::Duration::from_secs(5 * 60))
1339 .await
1340 .expect("reaper failed");
1341
1342 let row = conn
1344 .query_one(Statement::from_string(
1345 DatabaseBackend::Sqlite,
1346 format!("SELECT status, error FROM jobs WHERE id={id}"),
1347 ))
1348 .await
1349 .expect("select after reaper")
1350 .expect("row");
1351
1352 let status: String = row.try_get_by::<String, _>("status").expect("status");
1353 let error: Option<String> = row.try_get_by::<Option<String>, _>("error").expect("error");
1354 assert_eq!(status, "failed", "exhausted job should be parked as failed");
1355 assert!(error.is_some(), "failed job should have an error message");
1356
1357 let available = Utc::now().to_rfc3339();
1359 insert_job(
1360 &conn, "default", "FreshJob", "pending", 0, 3, None, &available,
1361 )
1362 .await;
1363
1364 let claimed = claim(&conn, "default", "worker-1")
1365 .await
1366 .expect("claim after poison park");
1367 assert!(
1368 claimed.is_some(),
1369 "fresh job should be claimable after poison job is parked"
1370 );
1371 let claimed = claimed.unwrap();
1372 assert_eq!(claimed.job_type, "FreshJob");
1373 }
1374}