use chrono::{DateTime, Utc};
use sea_orm::{
ConnectionTrait, DatabaseBackend, DatabaseConnection, Statement, TransactionTrait, Value,
};
use serde::{Deserialize, Serialize};
use crate::error::Error;
/// Process-wide database connection; written once by `Queue::init` and read by
/// `Queue::connection` / `Queue::is_initialized`.
static GLOBAL_CONNECTION: std::sync::OnceLock<DatabaseConnection> = std::sync::OnceLock::new();
/// Boxed callback that registers something on a `WorkerLoop`; `Queue::register`
/// stores one per job type.
type RegisterFn = Box<dyn Fn(&mut crate::WorkerLoop) + Send + Sync>;
/// Callbacks accumulated by `Queue::register` and replayed by `Queue::apply_registrars`.
static JOB_REGISTRARS: std::sync::Mutex<Vec<RegisterFn>> = std::sync::Mutex::new(Vec::new());
pub struct Queue;
impl Queue {
pub fn connection() -> &'static DatabaseConnection {
GLOBAL_CONNECTION
.get()
.expect("Queue not initialized. Call Queue::init() first.")
}
pub async fn init(conn: DatabaseConnection) -> Result<(), Error> {
GLOBAL_CONNECTION
.set(conn)
.map_err(|_| Error::custom("Queue already initialized"))?;
Ok(())
}
pub fn is_initialized() -> bool {
GLOBAL_CONNECTION.get().is_some()
}
pub fn register<J>()
where
J: crate::Job + serde::de::DeserializeOwned + 'static,
{
JOB_REGISTRARS
.lock()
.unwrap()
.push(Box::new(|w: &mut crate::WorkerLoop| w.register::<J>()));
}
pub fn has_registered_jobs() -> bool {
!JOB_REGISTRARS.lock().unwrap().is_empty()
}
pub(crate) fn apply_registrars(w: &mut crate::WorkerLoop) {
for r in JOB_REGISTRARS.lock().unwrap().iter() {
r(w);
}
}
}
/// A job record as read from the `jobs` table by `parse_job_row`; this is what
/// the claim functions hand back to the caller.
#[derive(Debug, Clone)]
pub struct JobRow {
/// `id` column.
pub id: i64,
/// `job_type` column.
pub job_type: String,
/// `payload` column, kept as the raw string stored in the table.
pub payload: String,
/// `queue` column.
pub queue: String,
/// `attempts` column (read as `i32`, then cast to `u32`).
pub attempts: u32,
/// `max_retries` column (read as `i32`, then cast to `u32`).
pub max_retries: u32,
/// `idempotency_key` column; `None` when the column is NULL.
pub idempotency_key: Option<String>,
/// `tenant_id` column; `None` when the column is NULL.
pub tenant_id: Option<i64>,
/// `available_at` column, converted to UTC.
pub available_at: DateTime<Utc>,
/// `created_at` column, converted to UTC.
pub created_at: DateTime<Utc>,
}
/// State label attached to a `JobInfo`; serialized with snake_case variant names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobState {
/// Set by `get_pending_jobs`: status `pending` and `available_at` not later than the query time.
Pending,
/// Set by `get_delayed_jobs`: status `pending` and `available_at` later than the query time.
Delayed,
/// Set by `parse_failed_job_info` for rows selected with status `failed`.
Failed,
}
/// Serializable summary of a job, built by `parse_job_info` (no payload included).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobInfo {
/// `id` column.
pub id: i64,
/// `job_type` column.
pub job_type: String,
/// `queue` column.
pub queue: String,
/// `attempts` column (read as `i32`, then cast to `u32`).
pub attempts: u32,
/// `max_retries` column (read as `i32`, then cast to `u32`).
pub max_retries: u32,
/// `created_at` column re-formatted as an RFC 3339 string in UTC.
pub created_at: String,
/// `available_at` column re-formatted as an RFC 3339 string in UTC.
pub available_at: String,
/// State supplied by the listing function that produced this entry.
pub state: JobState,
}
/// Per-queue counters produced by `get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SingleQueueStats {
/// Queue name the counters were computed for.
pub name: String,
/// Count of `pending` rows whose `available_at` is not later than the query time.
pub pending: usize,
/// Count of `pending` rows whose `available_at` is later than the query time.
pub delayed: usize,
}
/// Aggregate result of `get_stats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueueStats {
/// One entry per queue name passed to `get_stats`, in the same order.
pub queues: Vec<SingleQueueStats>,
/// Count of rows with status `failed`, across all queues.
pub total_failed: usize,
}
/// A failed job together with its failure details, built by `parse_failed_job_info`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailedJobInfo {
/// Job summary, with `state` set to `JobState::Failed`.
pub job: JobInfo,
/// `error` column; empty string when the column is NULL.
pub error: String,
/// `failed_at` column; falls back to `created_at` when `failed_at` is NULL.
pub failed_at: DateTime<Utc>,
}
fn parse_job_row(row: &sea_orm::QueryResult) -> Result<JobRow, Error> {
let id: i64 = row
.try_get_by::<i64, _>("id")
.map_err(|e| Error::custom(format!("parse id: {e}")))?;
let job_type: String = row
.try_get_by::<String, _>("job_type")
.map_err(|e| Error::custom(format!("parse job_type: {e}")))?;
let payload: String = row
.try_get_by::<String, _>("payload")
.map_err(|e| Error::custom(format!("parse payload: {e}")))?;
let queue: String = row
.try_get_by::<String, _>("queue")
.map_err(|e| Error::custom(format!("parse queue: {e}")))?;
let attempts: i32 = row
.try_get_by::<i32, _>("attempts")
.map_err(|e| Error::custom(format!("parse attempts: {e}")))?;
let max_retries: i32 = row
.try_get_by::<i32, _>("max_retries")
.map_err(|e| Error::custom(format!("parse max_retries: {e}")))?;
let idempotency_key: Option<String> = row
.try_get_by::<Option<String>, _>("idempotency_key")
.map_err(|e| Error::custom(format!("parse idempotency_key: {e}")))?;
let tenant_id: Option<i64> = row
.try_get_by::<Option<i64>, _>("tenant_id")
.map_err(|e| Error::custom(format!("parse tenant_id: {e}")))?;
let available_at = parse_timestamp(row, "available_at")?;
let created_at = parse_timestamp(row, "created_at")?;
Ok(JobRow {
id,
job_type,
payload,
queue,
attempts: attempts as u32,
max_retries: max_retries as u32,
idempotency_key,
tenant_id,
available_at,
created_at,
})
}
fn parse_timestamp(row: &sea_orm::QueryResult, col: &str) -> Result<DateTime<Utc>, Error> {
if let Ok(dt) = row.try_get_by::<DateTime<Utc>, _>(col) {
return Ok(dt);
}
let s: String = row
.try_get_by::<String, _>(col)
.map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
DateTime::parse_from_rfc3339(&s)
.map(|dt| dt.with_timezone(&Utc))
.map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}")))
}
fn parse_optional_timestamp(
row: &sea_orm::QueryResult,
col: &str,
) -> Result<Option<DateTime<Utc>>, Error> {
if let Ok(opt) = row.try_get_by::<Option<DateTime<Utc>>, _>(col) {
return Ok(opt);
}
let s: Option<String> = row
.try_get_by::<Option<String>, _>(col)
.map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
match s {
None => Ok(None),
Some(s) => DateTime::parse_from_rfc3339(&s)
.map(|dt| Some(dt.with_timezone(&Utc)))
.map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}"))),
}
}
/// Renders the `n`-th numbered bind placeholder for `backend`: `$n` on
/// Postgres, `?n` on every other backend.
fn ph(backend: DatabaseBackend, n: usize) -> String {
    let sigil = if matches!(backend, DatabaseBackend::Postgres) {
        '$'
    } else {
        '?'
    };
    format!("{sigil}{n}")
}
pub async fn claim(
conn: &DatabaseConnection,
queue: &str,
worker_id: &str,
) -> Result<Option<JobRow>, Error> {
match conn.get_database_backend() {
DatabaseBackend::Postgres => claim_postgres(conn, queue, worker_id).await,
DatabaseBackend::Sqlite => claim_sqlite(conn, queue, worker_id).await,
_ => Err(Error::UnsupportedBackend),
}
}
async fn claim_postgres(
conn: &DatabaseConnection,
queue: &str,
worker_id: &str,
) -> Result<Option<JobRow>, Error> {
let txn = conn.begin().await.map_err(Error::Db)?;
let select = Statement::from_sql_and_values(
DatabaseBackend::Postgres,
"SELECT id, job_type, payload, queue, attempts, max_retries, idempotency_key, \
tenant_id, available_at, created_at FROM jobs \
WHERE status = 'pending' AND queue = $1 AND available_at <= NOW() \
ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED",
[Value::String(Some(Box::new(queue.to_string())))],
);
let row = txn.query_one(select).await.map_err(Error::Db)?;
let Some(row) = row else {
txn.commit().await.map_err(Error::Db)?;
return Ok(None);
};
let job = parse_job_row(&row)?;
let upd = Statement::from_sql_and_values(
DatabaseBackend::Postgres,
"UPDATE jobs SET status = 'claimed', claimed_at = NOW(), claimed_by = $2 WHERE id = $1",
[
Value::BigInt(Some(job.id)),
Value::String(Some(Box::new(worker_id.to_string()))),
],
);
txn.execute(upd).await.map_err(Error::Db)?;
txn.commit().await.map_err(Error::Db)?;
Ok(Some(job))
}
async fn claim_sqlite(
conn: &DatabaseConnection,
queue: &str,
worker_id: &str,
) -> Result<Option<JobRow>, Error> {
let now_iso = Utc::now().to_rfc3339();
let txn = conn.begin().await.map_err(Error::Db)?;
let stmt = Statement::from_sql_and_values(
DatabaseBackend::Sqlite,
"UPDATE jobs SET status='claimed', claimed_at=?1, claimed_by=?2 \
WHERE id = ( SELECT id FROM jobs WHERE status='pending' AND queue=?3 \
AND available_at <= ?1 ORDER BY id LIMIT 1 ) \
RETURNING id, job_type, payload, queue, attempts, max_retries, \
idempotency_key, tenant_id, available_at, created_at",
[
Value::String(Some(Box::new(now_iso))),
Value::String(Some(Box::new(worker_id.to_string()))),
Value::String(Some(Box::new(queue.to_string()))),
],
);
let row = match txn.query_one(stmt).await {
Ok(r) => r,
Err(e) => {
let _ = txn.rollback().await;
return Err(Error::Db(e));
}
};
txn.commit().await.map_err(Error::Db)?;
row.map(|r| parse_job_row(&r)).transpose()
}
pub async fn reaper(
conn: &DatabaseConnection,
queue: &str,
visibility_timeout: std::time::Duration,
) -> Result<(), Error> {
let now = Utc::now();
let duration = chrono::Duration::from_std(visibility_timeout)
.map_err(|e| Error::custom(format!("visibility_timeout out of range: {e}")))?;
let cutoff = (now - duration).to_rfc3339();
let now_iso = now.to_rfc3339();
let backend = conn.get_database_backend();
let txn = conn.begin().await.map_err(Error::Db)?;
let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
let requeue_sql = format!(
"UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL, \
attempts = attempts + 1, available_at = {p1} \
WHERE status='claimed' AND claimed_at < {p2} \
AND attempts + 1 < max_retries AND queue = {p3}"
);
let requeue = Statement::from_sql_and_values(
backend,
&requeue_sql,
[
Value::String(Some(Box::new(now_iso.clone()))),
Value::String(Some(Box::new(cutoff.clone()))),
Value::String(Some(Box::new(queue.to_string()))),
],
);
txn.execute(requeue).await.map_err(Error::Db)?;
let (pp1, pp2, pp3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
let park_sql = format!(
"UPDATE jobs SET status='failed', error='visibility timeout exceeded', failed_at={pp1} \
WHERE status='claimed' AND claimed_at < {pp2} \
AND attempts + 1 >= max_retries AND queue = {pp3}"
);
let park = Statement::from_sql_and_values(
backend,
&park_sql,
[
Value::String(Some(Box::new(now_iso))),
Value::String(Some(Box::new(cutoff))),
Value::String(Some(Box::new(queue.to_string()))),
],
);
txn.execute(park).await.map_err(Error::Db)?;
txn.commit().await.map_err(Error::Db)
}
/// Inserts a new `pending` job with `attempts = 0`.
///
/// * `queue`, `job_type`, `payload` are stored as given.
/// * `max_retries` is stored as a 32-bit signed integer; values above
///   `i32::MAX` are clamped to `i32::MAX` instead of wrapping negative.
/// * `idempotency_key`: when `Some`, the insert is skipped if a row with the
///   same `job_type` and key already exists in status `pending` or `claimed`.
///   When `None`, the row is always inserted and the key column is not set.
/// * `tenant_id` is stored as NULL when `None`.
/// * `available_at` and the creation time (now) are bound as RFC 3339 strings.
///
/// Returns `Ok(())` whether or not the idempotency check suppressed the insert.
///
/// # Errors
/// Database failures are wrapped in `Error::Db`.
// The argument list mirrors the columns of the inserted row.
#[allow(clippy::too_many_arguments)]
pub async fn enqueue(
    conn: &DatabaseConnection,
    queue: &str,
    job_type: &str,
    payload: &str,
    max_retries: u32,
    idempotency_key: Option<&str>,
    tenant_id: Option<i64>,
    available_at: DateTime<Utc>,
) -> Result<(), Error> {
    let backend = conn.get_database_backend();
    let created_iso = Utc::now().to_rfc3339();
    let available_iso = available_at.to_rfc3339();

    // A plain `as i32` cast would turn e.g. u32::MAX into -1; saturate instead.
    let max_retries_db = max_retries.min(i32::MAX as u32) as i32;
    let text = |s: &str| Value::String(Some(Box::new(s.to_string())));

    let stmt = if let Some(idem) = idempotency_key {
        let [p1, p2, p3, p4, p5, p6, p7, p8] =
            [1, 2, 3, 4, 5, 6, 7, 8].map(|n| ph(backend, n));
        // NOTE(review): the NOT EXISTS guard is evaluated inside the INSERT
        // statement itself; whether concurrent enqueues can still both insert
        // depends on the schema (e.g. a unique index) — confirm.
        let sql = format!(
            "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
             idempotency_key, tenant_id, available_at, created_at) \
             SELECT {p1}, {p2}, {p3}, 'pending', 0, {p4}, {p5}, {p6}, {p7}, {p8} \
             WHERE NOT EXISTS ( \
             SELECT 1 FROM jobs WHERE job_type = {p2} AND idempotency_key = {p5} \
             AND status IN ('pending','claimed') \
             )"
        );
        Statement::from_sql_and_values(
            backend,
            &sql,
            [
                text(queue),
                text(job_type),
                text(payload),
                Value::Int(Some(max_retries_db)),
                text(idem),
                Value::BigInt(tenant_id),
                Value::String(Some(Box::new(available_iso))),
                Value::String(Some(Box::new(created_iso))),
            ],
        )
    } else {
        let [p1, p2, p3, p4, p5, p6, p7] = [1, 2, 3, 4, 5, 6, 7].map(|n| ph(backend, n));
        let sql = format!(
            "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
             tenant_id, available_at, created_at) \
             VALUES ({p1}, {p2}, {p3}, 'pending', 0, {p4}, {p5}, {p6}, {p7})"
        );
        Statement::from_sql_and_values(
            backend,
            &sql,
            [
                text(queue),
                text(job_type),
                text(payload),
                Value::Int(Some(max_retries_db)),
                Value::BigInt(tenant_id),
                Value::String(Some(Box::new(available_iso))),
                Value::String(Some(Box::new(created_iso))),
            ],
        )
    };

    conn.execute(stmt).await.map_err(Error::Db)?;
    Ok(())
}
pub async fn delete_job(conn: &DatabaseConnection, id: i64) -> Result<(), Error> {
let backend = conn.get_database_backend();
let p1 = ph(backend, 1);
let sql = format!("DELETE FROM jobs WHERE id = {p1}");
let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(id))]);
conn.execute(stmt).await.map_err(Error::Db)?;
Ok(())
}
pub async fn fail_job(conn: &DatabaseConnection, id: i64, error: &str) -> Result<(), Error> {
let backend = conn.get_database_backend();
let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
let sql =
format!("UPDATE jobs SET status='failed', error={p1}, failed_at={p2} WHERE id = {p3}");
let stmt = Statement::from_sql_and_values(
backend,
&sql,
[
Value::String(Some(Box::new(error.to_string()))),
Value::String(Some(Box::new(Utc::now().to_rfc3339()))),
Value::BigInt(Some(id)),
],
);
conn.execute(stmt).await.map_err(Error::Db)?;
Ok(())
}
pub async fn release_job(
conn: &DatabaseConnection,
id: i64,
attempts: u32,
available_at: DateTime<Utc>,
) -> Result<(), Error> {
let backend = conn.get_database_backend();
let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
let sql = format!(
"UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL, \
attempts={p1}, available_at={p2} WHERE id = {p3}"
);
let stmt = Statement::from_sql_and_values(
backend,
&sql,
[
Value::Int(Some(attempts as i32)),
Value::String(Some(Box::new(available_at.to_rfc3339()))),
Value::BigInt(Some(id)),
],
);
conn.execute(stmt).await.map_err(Error::Db)?;
Ok(())
}
pub async fn requeue_claimed_by(conn: &DatabaseConnection, worker_id: &str) -> Result<(), Error> {
let backend = conn.get_database_backend();
let p1 = ph(backend, 1);
let sql = format!(
"UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL \
WHERE status='claimed' AND claimed_by={p1}"
);
let stmt = Statement::from_sql_and_values(
backend,
&sql,
[Value::String(Some(Box::new(worker_id.to_string())))],
);
conn.execute(stmt).await.map_err(Error::Db)?;
Ok(())
}
/// Lists up to `limit` jobs of `queue` that are `pending` and whose
/// `available_at` is not later than the current time, ordered by `id`.
///
/// The current time is bound as an RFC 3339 string. `limit` values above
/// `i64::MAX` are clamped to `i64::MAX` rather than wrapping to a negative
/// `LIMIT`.
///
/// # Errors
/// Database failures are wrapped in `Error::Db`; a row that cannot be decoded
/// yields the error from `parse_job_info`.
pub async fn get_pending_jobs(
    conn: &DatabaseConnection,
    queue: &str,
    limit: u64,
) -> Result<Vec<JobInfo>, Error> {
    let backend = conn.get_database_backend();
    let now_iso = Utc::now().to_rfc3339();
    // Saturate: a plain `as i64` cast would make huge limits negative.
    let limit_db = limit.min(i64::MAX as u64) as i64;
    let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
    let sql = format!(
        "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at \
         FROM jobs WHERE status='pending' AND queue={p1} AND available_at <= {p2} \
         ORDER BY id LIMIT {p3}"
    );
    let stmt = Statement::from_sql_and_values(
        backend,
        &sql,
        [
            Value::String(Some(Box::new(queue.to_string()))),
            Value::String(Some(Box::new(now_iso))),
            Value::BigInt(Some(limit_db)),
        ],
    );
    let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
    rows.iter()
        .map(|r| parse_job_info(r, JobState::Pending))
        .collect()
}
/// Lists up to `limit` jobs of `queue` that are `pending` but whose
/// `available_at` is still later than the current time, ordered by `id`.
///
/// The current time is bound as an RFC 3339 string. `limit` values above
/// `i64::MAX` are clamped to `i64::MAX` rather than wrapping to a negative
/// `LIMIT`.
///
/// # Errors
/// Database failures are wrapped in `Error::Db`; a row that cannot be decoded
/// yields the error from `parse_job_info`.
pub async fn get_delayed_jobs(
    conn: &DatabaseConnection,
    queue: &str,
    limit: u64,
) -> Result<Vec<JobInfo>, Error> {
    let backend = conn.get_database_backend();
    let now_iso = Utc::now().to_rfc3339();
    // Saturate: a plain `as i64` cast would make huge limits negative.
    let limit_db = limit.min(i64::MAX as u64) as i64;
    let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
    let sql = format!(
        "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at \
         FROM jobs WHERE status='pending' AND queue={p1} AND available_at > {p2} \
         ORDER BY id LIMIT {p3}"
    );
    let stmt = Statement::from_sql_and_values(
        backend,
        &sql,
        [
            Value::String(Some(Box::new(queue.to_string()))),
            Value::String(Some(Box::new(now_iso))),
            Value::BigInt(Some(limit_db)),
        ],
    );
    let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
    rows.iter()
        .map(|r| parse_job_info(r, JobState::Delayed))
        .collect()
}
/// Lists up to `limit` jobs with status `failed` across all queues, most
/// recent first (ordered by `failed_at`, falling back to `created_at` when
/// `failed_at` is NULL).
///
/// `limit` values above `i64::MAX` are clamped to `i64::MAX` rather than
/// wrapping to a negative `LIMIT`.
///
/// # Errors
/// Database failures are wrapped in `Error::Db`; a row that cannot be decoded
/// yields the error from `parse_failed_job_info`.
pub async fn get_failed_jobs(
    conn: &DatabaseConnection,
    limit: u64,
) -> Result<Vec<FailedJobInfo>, Error> {
    let backend = conn.get_database_backend();
    // Saturate: a plain `as i64` cast would make huge limits negative.
    let limit_db = limit.min(i64::MAX as u64) as i64;
    let p1 = ph(backend, 1);
    let sql = format!(
        "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at, \
         error, failed_at FROM jobs WHERE status='failed' \
         ORDER BY COALESCE(failed_at, created_at) DESC LIMIT {p1}"
    );
    let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(limit_db))]);
    let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
    rows.iter().map(parse_failed_job_info).collect()
}
pub async fn get_stats(conn: &DatabaseConnection, queues: &[&str]) -> Result<QueueStats, Error> {
let backend = conn.get_database_backend();
let now_iso = Utc::now().to_rfc3339();
let mut queue_stats = Vec::new();
for &q in queues {
let p1 = ph(backend, 1);
let p2 = ph(backend, 2);
let p3 = ph(backend, 3);
let pending_sql = format!(
"SELECT COUNT(*) as cnt FROM jobs \
WHERE status='pending' AND queue={p1} AND available_at <= {p2}"
);
let pending_stmt = Statement::from_sql_and_values(
backend,
&pending_sql,
[
Value::String(Some(Box::new(q.to_string()))),
Value::String(Some(Box::new(now_iso.clone()))),
],
);
let pending_row = conn
.query_one(pending_stmt)
.await
.map_err(Error::Db)?
.ok_or_else(|| Error::custom("stats: no row returned for pending count"))?;
let pending: i64 = pending_row
.try_get_by::<i64, _>("cnt")
.map_err(|e| Error::custom(format!("stats pending cnt: {e}")))?;
let delayed_sql = format!(
"SELECT COUNT(*) as cnt FROM jobs \
WHERE status='pending' AND queue={p1} AND available_at > {p3}"
);
let delayed_stmt = Statement::from_sql_and_values(
backend,
&delayed_sql,
[
Value::String(Some(Box::new(q.to_string()))),
Value::String(Some(Box::new(now_iso.clone()))),
],
);
let delayed_row = conn
.query_one(delayed_stmt)
.await
.map_err(Error::Db)?
.ok_or_else(|| Error::custom("stats: no row returned for delayed count"))?;
let delayed: i64 = delayed_row
.try_get_by::<i64, _>("cnt")
.map_err(|e| Error::custom(format!("stats delayed cnt: {e}")))?;
queue_stats.push(SingleQueueStats {
name: q.to_string(),
pending: pending as usize,
delayed: delayed as usize,
});
}
let failed_sql = "SELECT COUNT(*) as cnt FROM jobs WHERE status='failed'";
let failed_stmt = Statement::from_string(backend, failed_sql.to_string());
let failed_row = conn
.query_one(failed_stmt)
.await
.map_err(Error::Db)?
.ok_or_else(|| Error::custom("stats: no row returned for failed count"))?;
let total_failed: i64 = failed_row
.try_get_by::<i64, _>("cnt")
.map_err(|e| Error::custom(format!("stats failed cnt: {e}")))?;
Ok(QueueStats {
queues: queue_stats,
total_failed: total_failed as usize,
})
}
fn parse_job_info(row: &sea_orm::QueryResult, state: JobState) -> Result<JobInfo, Error> {
let id: i64 = row
.try_get_by::<i64, _>("id")
.map_err(|e| Error::custom(format!("parse id: {e}")))?;
let job_type: String = row
.try_get_by::<String, _>("job_type")
.map_err(|e| Error::custom(format!("parse job_type: {e}")))?;
let queue: String = row
.try_get_by::<String, _>("queue")
.map_err(|e| Error::custom(format!("parse queue: {e}")))?;
let attempts: i32 = row
.try_get_by::<i32, _>("attempts")
.map_err(|e| Error::custom(format!("parse attempts: {e}")))?;
let max_retries: i32 = row
.try_get_by::<i32, _>("max_retries")
.map_err(|e| Error::custom(format!("parse max_retries: {e}")))?;
let created_at = parse_timestamp(row, "created_at")?.to_rfc3339();
let available_at = parse_timestamp(row, "available_at")?.to_rfc3339();
Ok(JobInfo {
id,
job_type,
queue,
attempts: attempts as u32,
max_retries: max_retries as u32,
created_at,
available_at,
state,
})
}
/// Builds a [`FailedJobInfo`] from a query result that exposes the columns
/// needed by `parse_job_info` plus `error` and `failed_at`.
///
/// * `error` becomes an empty string when the column is NULL.
/// * `failed_at` falls back to `created_at` when the column is NULL.
///
/// # Errors
/// Propagates errors from `parse_job_info` and the timestamp parsers, and
/// returns a custom error if `error` cannot be decoded.
fn parse_failed_job_info(row: &sea_orm::QueryResult) -> Result<FailedJobInfo, Error> {
    let job = parse_job_info(row, JobState::Failed)?;
    let error: String = row
        .try_get_by::<Option<String>, _>("error")
        .map_err(|e| Error::custom(format!("parse error: {e}")))?
        .unwrap_or_default();
    // Only read `created_at` when it is actually needed as a fallback; the
    // previous `unwrap_or(..)` form parsed it on every row.
    let failed_at = match parse_optional_timestamp(row, "failed_at")? {
        Some(ts) => ts,
        None => parse_timestamp(row, "created_at")?,
    };
    Ok(FailedJobInfo {
        job,
        error,
        failed_at,
    })
}
#[cfg(test)]
mod tests {
use super::*;
use sea_orm::Database;
use sea_orm_migration::MigratorTrait;
/// Test-only migrator whose single migration is `crate::migration::CreateJobsTable`.
struct TestMigrator;
#[async_trait::async_trait]
impl MigratorTrait for TestMigrator {
// The list handed to the migration runner: just the jobs-table migration.
fn migrations() -> Vec<Box<dyn sea_orm_migration::MigrationTrait>> {
vec![Box::new(crate::migration::CreateJobsTable)]
}
}
/// Opens a fresh in-memory SQLite database and applies `TestMigrator`'s
/// migrations to it. Panics if either step fails.
async fn setup() -> DatabaseConnection {
    let db = Database::connect("sqlite::memory:")
        .await
        .expect("connect sqlite::memory:");
    let migrated = TestMigrator::up(&db, None).await;
    migrated.expect("run CreateJobsTable migration");
    db
}
/// Test helper: inserts one row into `jobs` with an empty-object payload
/// (`{}`) and `created_at` set to now, and returns the new row's `id`.
///
/// `claimed_at` is written as SQL `NULL` when `None`. Values are spliced
/// directly into the SQL text, so this is only suitable for trusted test
/// literals.
// The argument list mirrors the columns of the inserted row.
#[allow(clippy::too_many_arguments)]
async fn insert_job(
    conn: &DatabaseConnection,
    queue: &str,
    job_type: &str,
    status: &str,
    attempts: i32,
    max_retries: i32,
    claimed_at: Option<&str>,
    available_at: &str,
) -> i64 {
    let created_at = Utc::now().to_rfc3339();
    let claimed_literal = claimed_at.map_or_else(|| "NULL".to_string(), |ts| format!("'{ts}'"));
    let insert_sql = format!(
        "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
         available_at, claimed_at, created_at) \
         VALUES ('{queue}', '{job_type}', '{{}}', '{status}', {attempts}, {max_retries}, \
         '{available_at}', {claimed_literal}, '{created_at}') \
         RETURNING id"
    );
    let inserted = conn
        .query_one(Statement::from_string(DatabaseBackend::Sqlite, insert_sql))
        .await
        .expect("insert_job query")
        .expect("insert_job row");
    inserted.try_get_by::<i64, _>("id").expect("insert id")
}
/// A single pending job is handed to the first claimer; a second claim on the
/// same queue then finds nothing.
#[tokio::test]
async fn claim_returns_pending_job() {
    let conn = setup().await;
    let available_at = Utc::now().to_rfc3339();
    insert_job(
        &conn,
        "default",
        "MyJob",
        "pending",
        0,
        3,
        None,
        &available_at,
    )
    .await;

    let first = claim(&conn, "default", "worker-1")
        .await
        .expect("claim failed");
    assert!(first.is_some(), "expected Some(job), got None");
    let first = first.unwrap();
    assert_eq!(first.job_type, "MyJob");

    // The only job is now claimed, so another worker gets nothing.
    let second = claim(&conn, "default", "worker-2")
        .await
        .expect("second claim failed");
    assert!(second.is_none(), "second claim should return None");
}
/// Enqueueing twice with the same idempotency key stores one row; enqueueing
/// twice without a key stores two.
#[tokio::test]
async fn idempotency_dedup() {
    let conn = setup().await;
    let now = Utc::now().to_rfc3339();
    let available_at = DateTime::parse_from_rfc3339(&now)
        .unwrap()
        .with_timezone(&Utc);

    // Same job type and key, twice: the second call must be a no-op.
    for step in ["first enqueue", "second enqueue (should be a no-op)"] {
        enqueue(
            &conn,
            "default",
            "MyJob",
            "{}",
            3,
            Some("key-abc"),
            None,
            available_at,
        )
        .await
        .expect(step);
    }
    let keyed_count_stmt = Statement::from_string(
        DatabaseBackend::Sqlite,
        "SELECT COUNT(*) as cnt FROM jobs WHERE job_type='MyJob' AND idempotency_key='key-abc'"
            .to_string(),
    );
    let keyed_row = conn
        .query_one(keyed_count_stmt)
        .await
        .expect("count query")
        .expect("count row");
    let keyed_count: i64 = keyed_row.try_get_by::<i64, _>("cnt").expect("cnt");
    assert_eq!(
        keyed_count, 1,
        "idempotency key should deduplicate (expected 1 row)"
    );

    // No key: every call inserts.
    for step in ["first plain enqueue", "second plain enqueue"] {
        enqueue(
            &conn,
            "default",
            "OtherJob",
            "{}",
            3,
            None,
            None,
            available_at,
        )
        .await
        .expect(step);
    }
    let plain_count_stmt = Statement::from_string(
        DatabaseBackend::Sqlite,
        "SELECT COUNT(*) as cnt FROM jobs WHERE job_type='OtherJob'".to_string(),
    );
    let plain_row = conn
        .query_one(plain_count_stmt)
        .await
        .expect("count query 2")
        .expect("count row 2");
    let plain_count: i64 = plain_row.try_get_by::<i64, _>("cnt").expect("cnt2");
    assert_eq!(plain_count, 2, "plain enqueue should insert both rows");
}
/// A job claimed ten minutes ago with retries left is put back to `pending`
/// by a reaper run with a five-minute timeout, and its attempt count goes up
/// by one.
#[tokio::test]
async fn reaper_reclaims_stuck_job() {
    let conn = setup().await;
    let claimed_at = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
    let available_at = Utc::now().to_rfc3339();
    let job_id = insert_job(
        &conn,
        "default",
        "StuckJob",
        "claimed",
        0,
        3,
        Some(&claimed_at),
        &available_at,
    )
    .await;

    let timeout = std::time::Duration::from_secs(5 * 60);
    reaper(&conn, "default", timeout)
        .await
        .expect("reaper failed");

    let lookup = Statement::from_string(
        DatabaseBackend::Sqlite,
        format!("SELECT status, attempts FROM jobs WHERE id={job_id}"),
    );
    let after = conn
        .query_one(lookup)
        .await
        .expect("select after reaper")
        .expect("row after reaper");
    let status: String = after.try_get_by::<String, _>("status").expect("status");
    let attempts: i32 = after.try_get_by::<i32, _>("attempts").expect("attempts");
    assert_eq!(
        status, "pending",
        "reaper should reset stuck job to pending"
    );
    assert_eq!(attempts, 1, "reaper should increment attempts");
}
/// Boundary case: a timed-out job with `attempts == max_retries - 1` is
/// marked `failed` rather than requeued, and its attempt count is unchanged.
#[tokio::test]
async fn reaper_boundary_parks_last_attempt() {
    let conn = setup().await;
    let claimed_at = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
    let available_at = Utc::now().to_rfc3339();
    // attempts = 2, max_retries = 3: one more attempt would reach the limit.
    let job_id = insert_job(
        &conn,
        "default",
        "BoundaryJob",
        "claimed",
        2,
        3,
        Some(&claimed_at),
        &available_at,
    )
    .await;

    let timeout = std::time::Duration::from_secs(5 * 60);
    reaper(&conn, "default", timeout)
        .await
        .expect("reaper failed");

    let lookup = Statement::from_string(
        DatabaseBackend::Sqlite,
        format!("SELECT status, attempts FROM jobs WHERE id={job_id}"),
    );
    let after = conn
        .query_one(lookup)
        .await
        .expect("select after reaper")
        .expect("row");
    let status: String = after.try_get_by::<String, _>("status").expect("status");
    let attempts: i32 = after.try_get_by::<i32, _>("attempts").expect("attempts");
    assert_eq!(
        status, "failed",
        "job at attempts == max_retries - 1 must be parked by the reaper, not requeued"
    );
    assert_eq!(
        attempts, 2,
        "parked job keeps its attempt count (no further requeue)"
    );
}
/// A timed-out job that has already used all its retries is marked `failed`
/// with an error message, and does not prevent a fresh job on the same queue
/// from being claimed afterwards.
#[tokio::test]
async fn poison_job_parked() {
    let conn = setup().await;
    let claimed_at = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
    let poison_available_at = Utc::now().to_rfc3339();
    let poison_id = insert_job(
        &conn,
        "default",
        "PoisonJob",
        "claimed",
        3,
        3,
        Some(&claimed_at),
        &poison_available_at,
    )
    .await;

    let timeout = std::time::Duration::from_secs(5 * 60);
    reaper(&conn, "default", timeout)
        .await
        .expect("reaper failed");

    let lookup = Statement::from_string(
        DatabaseBackend::Sqlite,
        format!("SELECT status, error FROM jobs WHERE id={poison_id}"),
    );
    let after = conn
        .query_one(lookup)
        .await
        .expect("select after reaper")
        .expect("row");
    let status: String = after.try_get_by::<String, _>("status").expect("status");
    let error: Option<String> = after
        .try_get_by::<Option<String>, _>("error")
        .expect("error");
    assert_eq!(status, "failed", "exhausted job should be parked as failed");
    assert!(error.is_some(), "failed job should have an error message");

    // A new job on the same queue must still be claimable.
    let fresh_available_at = Utc::now().to_rfc3339();
    insert_job(
        &conn,
        "default",
        "FreshJob",
        "pending",
        0,
        3,
        None,
        &fresh_available_at,
    )
    .await;
    let claimed = claim(&conn, "default", "worker-1")
        .await
        .expect("claim after poison park");
    assert!(
        claimed.is_some(),
        "fresh job should be claimable after poison job is parked"
    );
    let claimed = claimed.unwrap();
    assert_eq!(claimed.job_type, "FreshJob");
}
}