Skip to main content

ferro_queue/
db.rs

1//! DB engine for ferro-queue: dual-backend atomic claim, idempotent enqueue,
2//! stuck-job reaper, job lifecycle ops, and the `Queue` global.
3//!
4//! Supports SQLite (single `UPDATE … RETURNING` inside a pinned transaction)
5//! and Postgres (`FOR UPDATE SKIP LOCKED` inside a transaction). All dynamic values are
6//! bound via `Statement::from_sql_and_values` — no string interpolation of
7//! caller-supplied data (T-185-01).
8
9use chrono::{DateTime, Utc};
10use sea_orm::{
11    ConnectionTrait, DatabaseBackend, DatabaseConnection, Statement, TransactionTrait, Value,
12};
13use serde::{Deserialize, Serialize};
14
15use crate::error::Error;
16
17// ---------------------------------------------------------------------------
18// Queue global
19// ---------------------------------------------------------------------------
20
21static GLOBAL_CONNECTION: std::sync::OnceLock<DatabaseConnection> = std::sync::OnceLock::new();
22
23/// Registered job-type applier functions, collected before the server starts.
24type RegisterFn = Box<dyn Fn(&mut crate::WorkerLoop) + Send + Sync>;
25static JOB_REGISTRARS: std::sync::Mutex<Vec<RegisterFn>> = std::sync::Mutex::new(Vec::new());
26
27/// Global handle to the queue's database connection.
28///
29/// Initialise once at application start with [`Queue::init`]; worker loops
30/// and dispatcher call [`Queue::connection`] to get the static reference.
31pub struct Queue;
32
33impl Queue {
34    /// Return a reference to the global `DatabaseConnection`.
35    ///
36    /// # Panics
37    ///
38    /// Panics if [`Queue::init`] has not been called yet.
39    pub fn connection() -> &'static DatabaseConnection {
40        GLOBAL_CONNECTION
41            .get()
42            .expect("Queue not initialized. Call Queue::init() first.")
43    }
44
45    /// Store `conn` as the global connection.
46    ///
47    /// Returns `Err` if called more than once.
48    pub async fn init(conn: DatabaseConnection) -> Result<(), Error> {
49        GLOBAL_CONNECTION
50            .set(conn)
51            .map_err(|_| Error::custom("Queue already initialized"))?;
52        Ok(())
53    }
54
55    /// Returns `true` if [`Queue::init`] has been called.
56    pub fn is_initialized() -> bool {
57        GLOBAL_CONNECTION.get().is_some()
58    }
59
60    /// Register a job type for auto-start by the framework's WorkerLoop.
61    ///
62    /// Call this in your application bootstrap before the server starts.
63    /// The framework's server boot path inspects [`Queue::has_registered_jobs`]
64    /// and spawns a `WorkerLoop` automatically when at least one type is registered.
65    pub fn register<J>()
66    where
67        J: crate::Job + serde::de::DeserializeOwned + 'static,
68    {
69        JOB_REGISTRARS
70            .lock()
71            .unwrap()
72            .push(Box::new(|w: &mut crate::WorkerLoop| w.register::<J>()));
73    }
74
75    /// Returns `true` if at least one job type has been registered via [`Queue::register`].
76    pub fn has_registered_jobs() -> bool {
77        !JOB_REGISTRARS.lock().unwrap().is_empty()
78    }
79
80    /// Apply all registered job types to the given `WorkerLoop`.
81    ///
82    /// Used internally by [`WorkerLoop::from_registry`].
83    pub(crate) fn apply_registrars(w: &mut crate::WorkerLoop) {
84        for r in JOB_REGISTRARS.lock().unwrap().iter() {
85            r(w);
86        }
87    }
88}
89
90// ---------------------------------------------------------------------------
91// JobRow — a claimed row from the jobs table
92// ---------------------------------------------------------------------------
93
94/// A row read from the `jobs` table during a claim operation.
95#[derive(Debug, Clone)]
96pub struct JobRow {
97    /// Primary key.
98    pub id: i64,
99    /// Fully-qualified type name of the job (used to route to the correct handler).
100    pub job_type: String,
101    /// JSON-serialized job data.
102    pub payload: String,
103    /// Name of the queue this job belongs to.
104    pub queue: String,
105    /// Number of execution attempts so far.
106    pub attempts: u32,
107    /// Maximum attempts before the job is parked as failed.
108    pub max_retries: u32,
109    /// Optional deduplication key.
110    pub idempotency_key: Option<String>,
111    /// Optional tenant scope.
112    pub tenant_id: Option<i64>,
113    /// Earliest time the job may be claimed.
114    pub available_at: DateTime<Utc>,
115    /// Insertion timestamp.
116    pub created_at: DateTime<Utc>,
117}
118
119// ---------------------------------------------------------------------------
120// Introspection types (moved from queue.rs)
121// ---------------------------------------------------------------------------
122
123/// Job status for introspection queries.
124#[derive(Debug, Clone, Serialize, Deserialize)]
125#[serde(rename_all = "snake_case")]
126pub enum JobState {
127    /// Waiting to be claimed.
128    Pending,
129    /// Scheduled for a future time.
130    Delayed,
131    /// Permanently failed after exhausting retries.
132    Failed,
133}
134
135/// Summary of a single job for introspection.
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct JobInfo {
138    /// Primary key.
139    pub id: i64,
140    /// Fully-qualified job type name.
141    pub job_type: String,
142    /// Queue name.
143    pub queue: String,
144    /// Execution attempt count.
145    pub attempts: u32,
146    /// Maximum retry count.
147    pub max_retries: u32,
148    /// Insertion timestamp (RFC 3339).
149    pub created_at: String,
150    /// Earliest eligible claim time (RFC 3339).
151    pub available_at: String,
152    /// Logical state.
153    pub state: JobState,
154}
155
156/// Per-queue pending/delayed counts.
157#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct SingleQueueStats {
159    /// Queue name.
160    pub name: String,
161    /// Jobs with `status='pending'` and `available_at <= now`.
162    pub pending: usize,
163    /// Jobs with `status='pending'` and `available_at > now`.
164    pub delayed: usize,
165}
166
167/// Aggregate stats across all queues.
168#[derive(Debug, Clone, Default, Serialize, Deserialize)]
169pub struct QueueStats {
170    /// Per-queue breakdown.
171    pub queues: Vec<SingleQueueStats>,
172    /// Total jobs with `status='failed'`.
173    pub total_failed: usize,
174}
175
176/// A failed job with the error message.
177#[derive(Debug, Clone, Serialize, Deserialize)]
178pub struct FailedJobInfo {
179    /// Core job fields.
180    pub job: JobInfo,
181    /// Error message stored at failure time.
182    pub error: String,
183    /// When the job was parked as failed (recorded in the `failed_at` column;
184    /// falls back to `created_at` only for legacy rows that predate the column).
185    pub failed_at: DateTime<Utc>,
186}
187
188// ---------------------------------------------------------------------------
189// Internal helpers
190// ---------------------------------------------------------------------------
191
192/// Map a `QueryResult` row to a `JobRow`.
193fn parse_job_row(row: &sea_orm::QueryResult) -> Result<JobRow, Error> {
194    let id: i64 = row
195        .try_get_by::<i64, _>("id")
196        .map_err(|e| Error::custom(format!("parse id: {e}")))?;
197    let job_type: String = row
198        .try_get_by::<String, _>("job_type")
199        .map_err(|e| Error::custom(format!("parse job_type: {e}")))?;
200    let payload: String = row
201        .try_get_by::<String, _>("payload")
202        .map_err(|e| Error::custom(format!("parse payload: {e}")))?;
203    let queue: String = row
204        .try_get_by::<String, _>("queue")
205        .map_err(|e| Error::custom(format!("parse queue: {e}")))?;
206    let attempts: i32 = row
207        .try_get_by::<i32, _>("attempts")
208        .map_err(|e| Error::custom(format!("parse attempts: {e}")))?;
209    let max_retries: i32 = row
210        .try_get_by::<i32, _>("max_retries")
211        .map_err(|e| Error::custom(format!("parse max_retries: {e}")))?;
212    let idempotency_key: Option<String> = row
213        .try_get_by::<Option<String>, _>("idempotency_key")
214        .map_err(|e| Error::custom(format!("parse idempotency_key: {e}")))?;
215    let tenant_id: Option<i64> = row
216        .try_get_by::<Option<i64>, _>("tenant_id")
217        .map_err(|e| Error::custom(format!("parse tenant_id: {e}")))?;
218
219    // available_at / created_at — SQLite stores as ISO-8601 text, Postgres as timestamptz.
220    // SeaORM maps both to DateTime<Utc> when the column is timestamp_with_time_zone; on
221    // SQLite we fall back to string parsing if the native mapping fails.
222    let available_at = parse_timestamp(row, "available_at")?;
223    let created_at = parse_timestamp(row, "created_at")?;
224
225    Ok(JobRow {
226        id,
227        job_type,
228        payload,
229        queue,
230        attempts: attempts as u32,
231        max_retries: max_retries as u32,
232        idempotency_key,
233        tenant_id,
234        available_at,
235        created_at,
236    })
237}
238
239/// Parse a timestamp column that may arrive as `DateTime<Utc>` (Postgres) or
240/// as an ISO-8601 string (SQLite).
241fn parse_timestamp(row: &sea_orm::QueryResult, col: &str) -> Result<DateTime<Utc>, Error> {
242    // Try native DateTime<Utc> first (Postgres timestamptz).
243    if let Ok(dt) = row.try_get_by::<DateTime<Utc>, _>(col) {
244        return Ok(dt);
245    }
246    // Fall back to string parsing (SQLite TEXT column).
247    let s: String = row
248        .try_get_by::<String, _>(col)
249        .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
250    DateTime::parse_from_rfc3339(&s)
251        .map(|dt| dt.with_timezone(&Utc))
252        .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}")))
253}
254
255/// Parse a nullable timestamp column. Returns `Ok(None)` when the column is
256/// SQL NULL, mirroring `parse_timestamp`'s Postgres-then-SQLite fallback.
257fn parse_optional_timestamp(
258    row: &sea_orm::QueryResult,
259    col: &str,
260) -> Result<Option<DateTime<Utc>>, Error> {
261    // Native Option<DateTime<Utc>> first (Postgres timestamptz).
262    if let Ok(opt) = row.try_get_by::<Option<DateTime<Utc>>, _>(col) {
263        return Ok(opt);
264    }
265    // Fall back to Option<String> (SQLite TEXT column).
266    let s: Option<String> = row
267        .try_get_by::<Option<String>, _>(col)
268        .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
269    match s {
270        None => Ok(None),
271        Some(s) => DateTime::parse_from_rfc3339(&s)
272            .map(|dt| Some(dt.with_timezone(&Utc)))
273            .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}"))),
274    }
275}
276
277/// SQL placeholder style: Postgres uses `$N`, SQLite uses `?N`.
278fn ph(backend: DatabaseBackend, n: usize) -> String {
279    match backend {
280        DatabaseBackend::Postgres => format!("${n}"),
281        _ => format!("?{n}"),
282    }
283}
284
285// ---------------------------------------------------------------------------
286// claim — atomic work-stealing claim (dual-backend)
287// ---------------------------------------------------------------------------
288
289/// Atomically claim one pending job from `queue`.
290///
291/// Returns `None` when the queue is empty or all eligible jobs are locked by
292/// another worker (Postgres). Uses:
293/// - Postgres: `SELECT … FOR UPDATE SKIP LOCKED` + `UPDATE` inside a transaction.
294/// - SQLite: single `UPDATE … RETURNING` inside a `conn.begin()` transaction so
295///   every statement is pinned to one pooled connection; the write lock is taken
296///   on the UPDATE itself (no prior read to upgrade, so no `BEGIN IMMEDIATE`
297///   needed).
298pub async fn claim(
299    conn: &DatabaseConnection,
300    queue: &str,
301    worker_id: &str,
302) -> Result<Option<JobRow>, Error> {
303    match conn.get_database_backend() {
304        DatabaseBackend::Postgres => claim_postgres(conn, queue, worker_id).await,
305        DatabaseBackend::Sqlite => claim_sqlite(conn, queue, worker_id).await,
306        _ => Err(Error::UnsupportedBackend),
307    }
308}
309
310async fn claim_postgres(
311    conn: &DatabaseConnection,
312    queue: &str,
313    worker_id: &str,
314) -> Result<Option<JobRow>, Error> {
315    let txn = conn.begin().await.map_err(Error::Db)?;
316
317    let select = Statement::from_sql_and_values(
318        DatabaseBackend::Postgres,
319        "SELECT id, job_type, payload, queue, attempts, max_retries, idempotency_key, \
320         tenant_id, available_at, created_at FROM jobs \
321         WHERE status = 'pending' AND queue = $1 AND available_at <= NOW() \
322         ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED",
323        [Value::String(Some(Box::new(queue.to_string())))],
324    );
325
326    let row = txn.query_one(select).await.map_err(Error::Db)?;
327    let Some(row) = row else {
328        txn.commit().await.map_err(Error::Db)?;
329        return Ok(None);
330    };
331    let job = parse_job_row(&row)?;
332
333    let upd = Statement::from_sql_and_values(
334        DatabaseBackend::Postgres,
335        "UPDATE jobs SET status = 'claimed', claimed_at = NOW(), claimed_by = $2 WHERE id = $1",
336        [
337            Value::BigInt(Some(job.id)),
338            Value::String(Some(Box::new(worker_id.to_string()))),
339        ],
340    );
341    txn.execute(upd).await.map_err(Error::Db)?;
342    txn.commit().await.map_err(Error::Db)?;
343
344    Ok(Some(job))
345}
346
347async fn claim_sqlite(
348    conn: &DatabaseConnection,
349    queue: &str,
350    worker_id: &str,
351) -> Result<Option<JobRow>, Error> {
352    let now_iso = Utc::now().to_rfc3339();
353
354    // Acquire a transaction handle: SeaORM `begin()` checks out ONE physical
355    // connection from the pool and pins every statement on this handle to it.
356    // This is the correctness-critical change (CR-01) — issuing BEGIN/UPDATE/
357    // COMMIT directly on `conn` let the statements land on different pooled
358    // connections, breaking atomicity and leaking an open transaction back into
359    // the pool on the error path.
360    //
361    // `begin()` emits a DEFERRED BEGIN on SQLite. We do not need an explicit
362    // BEGIN IMMEDIATE here: the claim is a single `UPDATE … RETURNING`, so the
363    // write lock is acquired on that first (and only) statement. There is no
364    // prior read in this txn that could trigger the deferred-read-then-upgrade
365    // deadlock that BEGIN IMMEDIATE exists to avoid. Atomicity is guaranteed by
366    // the statements being pinned to one connection inside the txn handle.
367    let txn = conn.begin().await.map_err(Error::Db)?;
368
369    let stmt = Statement::from_sql_and_values(
370        DatabaseBackend::Sqlite,
371        "UPDATE jobs SET status='claimed', claimed_at=?1, claimed_by=?2 \
372         WHERE id = ( SELECT id FROM jobs WHERE status='pending' AND queue=?3 \
373           AND available_at <= ?1 ORDER BY id LIMIT 1 ) \
374         RETURNING id, job_type, payload, queue, attempts, max_retries, \
375           idempotency_key, tenant_id, available_at, created_at",
376        [
377            Value::String(Some(Box::new(now_iso))),
378            Value::String(Some(Box::new(worker_id.to_string()))),
379            Value::String(Some(Box::new(queue.to_string()))),
380        ],
381    );
382
383    let row = match txn.query_one(stmt).await {
384        Ok(r) => r,
385        Err(e) => {
386            // Roll back so the connection returns to the pool with no open txn.
387            let _ = txn.rollback().await;
388            return Err(Error::Db(e));
389        }
390    };
391    txn.commit().await.map_err(Error::Db)?;
392
393    row.map(|r| parse_job_row(&r)).transpose()
394}
395
396// ---------------------------------------------------------------------------
397// reaper — re-queue stuck claimed rows; park exhausted ones as failed
398// ---------------------------------------------------------------------------
399
400/// Re-queue claimed rows that have been held longer than `visibility_timeout`.
401///
402/// `attempts` is the number of attempts already completed. A row is requeued
403/// (for one more attempt) only while `attempts + 1 < max_retries`; once the
404/// next attempt would be the last allowed (`attempts + 1 >= max_retries`) the
405/// row is parked as `failed`. This matches the worker's handler-failure
406/// boundary (`handle_failure`: park when `attempts + 1 >= max_retries`) so a
407/// job gets the same total attempt count whether it fails via handler error or
408/// via visibility timeout (WR-01, T-185-04).
409pub async fn reaper(
410    conn: &DatabaseConnection,
411    queue: &str,
412    visibility_timeout: std::time::Duration,
413) -> Result<(), Error> {
414    let now = Utc::now();
415    let duration = chrono::Duration::from_std(visibility_timeout)
416        .map_err(|e| Error::custom(format!("visibility_timeout out of range: {e}")))?;
417    let cutoff = (now - duration).to_rfc3339();
418    let now_iso = now.to_rfc3339();
419
420    let backend = conn.get_database_backend();
421    let txn = conn.begin().await.map_err(Error::Db)?;
422
423    // Step 1: re-queue rows that still have a retry left after counting this
424    // timed-out attempt. The in-flight attempt (number `attempts + 1`) failed,
425    // so requeue only while `attempts + 1 < max_retries` — matching
426    // `worker::handle_failure`.
427    let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
428    let requeue_sql = format!(
429        "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL, \
430         attempts = attempts + 1, available_at = {p1} \
431         WHERE status='claimed' AND claimed_at < {p2} \
432         AND attempts + 1 < max_retries AND queue = {p3}"
433    );
434    let requeue = Statement::from_sql_and_values(
435        backend,
436        &requeue_sql,
437        [
438            Value::String(Some(Box::new(now_iso.clone()))),
439            Value::String(Some(Box::new(cutoff.clone()))),
440            Value::String(Some(Box::new(queue.to_string()))),
441        ],
442    );
443    txn.execute(requeue).await.map_err(Error::Db)?;
444
445    // Step 2: park exhausted rows as failed, recording the failure time.
446    // Use fresh placeholders (independent statement, not continuing p1..p3).
447    let (pp1, pp2, pp3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
448    let park_sql = format!(
449        "UPDATE jobs SET status='failed', error='visibility timeout exceeded', failed_at={pp1} \
450         WHERE status='claimed' AND claimed_at < {pp2} \
451         AND attempts + 1 >= max_retries AND queue = {pp3}"
452    );
453    let park = Statement::from_sql_and_values(
454        backend,
455        &park_sql,
456        [
457            Value::String(Some(Box::new(now_iso))),
458            Value::String(Some(Box::new(cutoff))),
459            Value::String(Some(Box::new(queue.to_string()))),
460        ],
461    );
462    txn.execute(park).await.map_err(Error::Db)?;
463
464    txn.commit().await.map_err(Error::Db)
465}
466
467// ---------------------------------------------------------------------------
468// enqueue — idempotent insert
469// ---------------------------------------------------------------------------
470
471/// Insert a new job into the queue.
472///
473/// When `idempotency_key` is `Some`, skips the insert if a `pending` or
474/// `claimed` row with the same `(job_type, idempotency_key)` already exists
475/// (D-15, T-185-01).
476#[allow(clippy::too_many_arguments)]
477pub async fn enqueue(
478    conn: &DatabaseConnection,
479    queue: &str,
480    job_type: &str,
481    payload: &str,
482    max_retries: u32,
483    idempotency_key: Option<&str>,
484    tenant_id: Option<i64>,
485    available_at: DateTime<Utc>,
486) -> Result<(), Error> {
487    let backend = conn.get_database_backend();
488    let now_iso = Utc::now().to_rfc3339();
489    let available_iso = available_at.to_rfc3339();
490
491    if let Some(idem) = idempotency_key {
492        // Idempotent insert: skip if a pending/claimed row with the same key exists.
493        let (p1, p2, p3, p4, p5, p6, p7, p8) = (
494            ph(backend, 1),
495            ph(backend, 2),
496            ph(backend, 3),
497            ph(backend, 4),
498            ph(backend, 5),
499            ph(backend, 6),
500            ph(backend, 7),
501            ph(backend, 8),
502        );
503        let sql = format!(
504            "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
505             idempotency_key, tenant_id, available_at, created_at) \
506             SELECT {p1}, {p2}, {p3}, 'pending', 0, {p4}, {p5}, {p6}, {p7}, {p8} \
507             WHERE NOT EXISTS ( \
508               SELECT 1 FROM jobs WHERE job_type = {p2} AND idempotency_key = {p5} \
509               AND status IN ('pending','claimed') \
510             )"
511        );
512        let stmt = Statement::from_sql_and_values(
513            backend,
514            &sql,
515            [
516                Value::String(Some(Box::new(queue.to_string()))),
517                Value::String(Some(Box::new(job_type.to_string()))),
518                Value::String(Some(Box::new(payload.to_string()))),
519                Value::Int(Some(max_retries as i32)),
520                Value::String(Some(Box::new(idem.to_string()))),
521                tenant_id.map_or(Value::BigInt(None), |id| Value::BigInt(Some(id))),
522                Value::String(Some(Box::new(available_iso))),
523                Value::String(Some(Box::new(now_iso))),
524            ],
525        );
526        conn.execute(stmt).await.map_err(Error::Db)?;
527    } else {
528        // Plain insert — no deduplication guard.
529        let (p1, p2, p3, p4, p5, p6, p7) = (
530            ph(backend, 1),
531            ph(backend, 2),
532            ph(backend, 3),
533            ph(backend, 4),
534            ph(backend, 5),
535            ph(backend, 6),
536            ph(backend, 7),
537        );
538        let sql = format!(
539            "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
540             tenant_id, available_at, created_at) \
541             VALUES ({p1}, {p2}, {p3}, 'pending', 0, {p4}, {p5}, {p6}, {p7})"
542        );
543        let stmt = Statement::from_sql_and_values(
544            backend,
545            &sql,
546            [
547                Value::String(Some(Box::new(queue.to_string()))),
548                Value::String(Some(Box::new(job_type.to_string()))),
549                Value::String(Some(Box::new(payload.to_string()))),
550                Value::Int(Some(max_retries as i32)),
551                tenant_id.map_or(Value::BigInt(None), |id| Value::BigInt(Some(id))),
552                Value::String(Some(Box::new(available_iso))),
553                Value::String(Some(Box::new(now_iso))),
554            ],
555        );
556        conn.execute(stmt).await.map_err(Error::Db)?;
557    }
558
559    Ok(())
560}
561
562// ---------------------------------------------------------------------------
563// Lifecycle operations
564// ---------------------------------------------------------------------------
565
566/// Delete a successfully-completed job row (D-04 delete-on-success).
567pub async fn delete_job(conn: &DatabaseConnection, id: i64) -> Result<(), Error> {
568    let backend = conn.get_database_backend();
569    let p1 = ph(backend, 1);
570    let sql = format!("DELETE FROM jobs WHERE id = {p1}");
571    let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(id))]);
572    conn.execute(stmt).await.map_err(Error::Db)?;
573    Ok(())
574}
575
576/// Park a job as `failed` with an error message, recording the failure time.
577pub async fn fail_job(conn: &DatabaseConnection, id: i64, error: &str) -> Result<(), Error> {
578    let backend = conn.get_database_backend();
579    let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
580    let sql =
581        format!("UPDATE jobs SET status='failed', error={p1}, failed_at={p2} WHERE id = {p3}");
582    let stmt = Statement::from_sql_and_values(
583        backend,
584        &sql,
585        [
586            Value::String(Some(Box::new(error.to_string()))),
587            Value::String(Some(Box::new(Utc::now().to_rfc3339()))),
588            Value::BigInt(Some(id)),
589        ],
590    );
591    conn.execute(stmt).await.map_err(Error::Db)?;
592    Ok(())
593}
594
595/// Reset a job to `pending`, bump its attempt count, and set a new
596/// `available_at` (used by the worker after a retryable failure).
597pub async fn release_job(
598    conn: &DatabaseConnection,
599    id: i64,
600    attempts: u32,
601    available_at: DateTime<Utc>,
602) -> Result<(), Error> {
603    let backend = conn.get_database_backend();
604    let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
605    let sql = format!(
606        "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL, \
607         attempts={p1}, available_at={p2} WHERE id = {p3}"
608    );
609    let stmt = Statement::from_sql_and_values(
610        backend,
611        &sql,
612        [
613            Value::Int(Some(attempts as i32)),
614            Value::String(Some(Box::new(available_at.to_rfc3339()))),
615            Value::BigInt(Some(id)),
616        ],
617    );
618    conn.execute(stmt).await.map_err(Error::Db)?;
619    Ok(())
620}
621
622/// Reset all jobs claimed by `worker_id` back to `pending` (D-10 shutdown
623/// re-queue — called by the worker loop before the process exits).
624pub async fn requeue_claimed_by(conn: &DatabaseConnection, worker_id: &str) -> Result<(), Error> {
625    let backend = conn.get_database_backend();
626    let p1 = ph(backend, 1);
627    let sql = format!(
628        "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL \
629         WHERE status='claimed' AND claimed_by={p1}"
630    );
631    let stmt = Statement::from_sql_and_values(
632        backend,
633        &sql,
634        [Value::String(Some(Box::new(worker_id.to_string())))],
635    );
636    conn.execute(stmt).await.map_err(Error::Db)?;
637    Ok(())
638}
639
640/// Park leftover `claimed` rows on `queues` as `failed` at worker startup.
641///
642/// Closes the orphan-claim window that the visibility-timeout reaper leaves
643/// open: when a worker is killed mid-job (SIGKILL from a deploy restart, OOM),
644/// its claimed rows stay `claimed` until `visibility_timeout` elapses
645/// (default 300s). Consumers reading `jobs.status` see those rows as Active
646/// for the full timeout window — UI labels like "in progress" stick until the
647/// row is reaped or manually cleared.
648///
649/// Called once per `WorkerLoop::run` before the claim loop starts. Operates
650/// only on the queues this worker handles, so concurrent workers on disjoint
651/// queues never clobber each other. Assumes the single-worker-per-queue model
652/// the rest of the worker is built around — multi-worker-per-queue setups
653/// should rely on `visibility_timeout` instead.
654///
655/// Returns the number of rows reaped.
656pub async fn reap_startup_claims(
657    conn: &DatabaseConnection,
658    queues: &[String],
659) -> Result<u64, Error> {
660    if queues.is_empty() {
661        return Ok(0);
662    }
663    let backend = conn.get_database_backend();
664    let now_iso = Utc::now().to_rfc3339();
665
666    // Numbered placeholders: $1..$N for queue names, $N+1 for failed_at.
667    let queue_phs: Vec<String> = (1..=queues.len()).map(|i| ph(backend, i)).collect();
668    let ts_ph = ph(backend, queues.len() + 1);
669    let sql = format!(
670        "UPDATE jobs SET status='failed', \
671         error='reaped on worker startup (orphan claim from previous worker)', \
672         failed_at={ts_ph} \
673         WHERE status='claimed' AND queue IN ({})",
674        queue_phs.join(", "),
675    );
676
677    let mut values: Vec<Value> = queues
678        .iter()
679        .map(|q| Value::String(Some(Box::new(q.clone()))))
680        .collect();
681    values.push(Value::String(Some(Box::new(now_iso))));
682
683    let stmt = Statement::from_sql_and_values(backend, &sql, values);
684    let result = conn.execute(stmt).await.map_err(Error::Db)?;
685    Ok(result.rows_affected())
686}
687
688// ---------------------------------------------------------------------------
689// Introspection / stat queries
690// ---------------------------------------------------------------------------
691
692/// Return up to `limit` pending (immediately eligible) jobs in `queue`.
693pub async fn get_pending_jobs(
694    conn: &DatabaseConnection,
695    queue: &str,
696    limit: u64,
697) -> Result<Vec<JobInfo>, Error> {
698    let backend = conn.get_database_backend();
699    let now_iso = Utc::now().to_rfc3339();
700    let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
701    let sql = format!(
702        "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at \
703         FROM jobs WHERE status='pending' AND queue={p1} AND available_at <= {p2} \
704         ORDER BY id LIMIT {p3}"
705    );
706    let stmt = Statement::from_sql_and_values(
707        backend,
708        &sql,
709        [
710            Value::String(Some(Box::new(queue.to_string()))),
711            Value::String(Some(Box::new(now_iso))),
712            Value::BigInt(Some(limit as i64)),
713        ],
714    );
715    let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
716    rows.iter()
717        .map(|r| parse_job_info(r, JobState::Pending))
718        .collect()
719}
720
721/// Return up to `limit` delayed (not-yet-eligible) jobs in `queue`.
722pub async fn get_delayed_jobs(
723    conn: &DatabaseConnection,
724    queue: &str,
725    limit: u64,
726) -> Result<Vec<JobInfo>, Error> {
727    let backend = conn.get_database_backend();
728    let now_iso = Utc::now().to_rfc3339();
729    let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
730    let sql = format!(
731        "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at \
732         FROM jobs WHERE status='pending' AND queue={p1} AND available_at > {p2} \
733         ORDER BY id LIMIT {p3}"
734    );
735    let stmt = Statement::from_sql_and_values(
736        backend,
737        &sql,
738        [
739            Value::String(Some(Box::new(queue.to_string()))),
740            Value::String(Some(Box::new(now_iso))),
741            Value::BigInt(Some(limit as i64)),
742        ],
743    );
744    let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
745    rows.iter()
746        .map(|r| parse_job_info(r, JobState::Delayed))
747        .collect()
748}
749
750/// Return up to `limit` failed jobs (across all queues).
751pub async fn get_failed_jobs(
752    conn: &DatabaseConnection,
753    limit: u64,
754) -> Result<Vec<FailedJobInfo>, Error> {
755    let backend = conn.get_database_backend();
756    let p1 = ph(backend, 1);
757    // Order by failure time, falling back to created_at for any legacy row that
758    // predates the failed_at column (WR-06).
759    let sql = format!(
760        "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at, \
761         error, failed_at FROM jobs WHERE status='failed' \
762         ORDER BY COALESCE(failed_at, created_at) DESC LIMIT {p1}"
763    );
764    let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(limit as i64))]);
765    let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
766    rows.iter().map(parse_failed_job_info).collect()
767}
768
769/// Return aggregate pending/delayed counts per queue and the total failed count.
770pub async fn get_stats(conn: &DatabaseConnection, queues: &[&str]) -> Result<QueueStats, Error> {
771    let backend = conn.get_database_backend();
772    let now_iso = Utc::now().to_rfc3339();
773    let mut queue_stats = Vec::new();
774
775    for &q in queues {
776        let p1 = ph(backend, 1);
777        let p2 = ph(backend, 2);
778        let p3 = ph(backend, 3);
779        let pending_sql = format!(
780            "SELECT COUNT(*) as cnt FROM jobs \
781             WHERE status='pending' AND queue={p1} AND available_at <= {p2}"
782        );
783        let pending_stmt = Statement::from_sql_and_values(
784            backend,
785            &pending_sql,
786            [
787                Value::String(Some(Box::new(q.to_string()))),
788                Value::String(Some(Box::new(now_iso.clone()))),
789            ],
790        );
791        let pending_row = conn
792            .query_one(pending_stmt)
793            .await
794            .map_err(Error::Db)?
795            .ok_or_else(|| Error::custom("stats: no row returned for pending count"))?;
796        let pending: i64 = pending_row
797            .try_get_by::<i64, _>("cnt")
798            .map_err(|e| Error::custom(format!("stats pending cnt: {e}")))?;
799
800        let delayed_sql = format!(
801            "SELECT COUNT(*) as cnt FROM jobs \
802             WHERE status='pending' AND queue={p1} AND available_at > {p3}"
803        );
804        let delayed_stmt = Statement::from_sql_and_values(
805            backend,
806            &delayed_sql,
807            [
808                Value::String(Some(Box::new(q.to_string()))),
809                Value::String(Some(Box::new(now_iso.clone()))),
810            ],
811        );
812        let delayed_row = conn
813            .query_one(delayed_stmt)
814            .await
815            .map_err(Error::Db)?
816            .ok_or_else(|| Error::custom("stats: no row returned for delayed count"))?;
817        let delayed: i64 = delayed_row
818            .try_get_by::<i64, _>("cnt")
819            .map_err(|e| Error::custom(format!("stats delayed cnt: {e}")))?;
820
821        queue_stats.push(SingleQueueStats {
822            name: q.to_string(),
823            pending: pending as usize,
824            delayed: delayed as usize,
825        });
826    }
827
828    // Total failed across all queues.
829    let failed_sql = "SELECT COUNT(*) as cnt FROM jobs WHERE status='failed'";
830    let failed_stmt = Statement::from_string(backend, failed_sql.to_string());
831    let failed_row = conn
832        .query_one(failed_stmt)
833        .await
834        .map_err(Error::Db)?
835        .ok_or_else(|| Error::custom("stats: no row returned for failed count"))?;
836    let total_failed: i64 = failed_row
837        .try_get_by::<i64, _>("cnt")
838        .map_err(|e| Error::custom(format!("stats failed cnt: {e}")))?;
839
840    Ok(QueueStats {
841        queues: queue_stats,
842        total_failed: total_failed as usize,
843    })
844}
845
846// ---------------------------------------------------------------------------
847// Internal parse helpers for introspection types
848// ---------------------------------------------------------------------------
849
850fn parse_job_info(row: &sea_orm::QueryResult, state: JobState) -> Result<JobInfo, Error> {
851    let id: i64 = row
852        .try_get_by::<i64, _>("id")
853        .map_err(|e| Error::custom(format!("parse id: {e}")))?;
854    let job_type: String = row
855        .try_get_by::<String, _>("job_type")
856        .map_err(|e| Error::custom(format!("parse job_type: {e}")))?;
857    let queue: String = row
858        .try_get_by::<String, _>("queue")
859        .map_err(|e| Error::custom(format!("parse queue: {e}")))?;
860    let attempts: i32 = row
861        .try_get_by::<i32, _>("attempts")
862        .map_err(|e| Error::custom(format!("parse attempts: {e}")))?;
863    let max_retries: i32 = row
864        .try_get_by::<i32, _>("max_retries")
865        .map_err(|e| Error::custom(format!("parse max_retries: {e}")))?;
866    let created_at = parse_timestamp(row, "created_at")?.to_rfc3339();
867    let available_at = parse_timestamp(row, "available_at")?.to_rfc3339();
868
869    Ok(JobInfo {
870        id,
871        job_type,
872        queue,
873        attempts: attempts as u32,
874        max_retries: max_retries as u32,
875        created_at,
876        available_at,
877        state,
878    })
879}
880
881fn parse_failed_job_info(row: &sea_orm::QueryResult) -> Result<FailedJobInfo, Error> {
882    let job = parse_job_info(row, JobState::Failed)?;
883    let error: String = row
884        .try_get_by::<Option<String>, _>("error")
885        .map_err(|e| Error::custom(format!("parse error: {e}")))?
886        .unwrap_or_default();
887    // Prefer the recorded failure time; fall back to created_at for legacy rows
888    // parked before the failed_at column existed (WR-06).
889    let failed_at =
890        parse_optional_timestamp(row, "failed_at")?.unwrap_or(parse_timestamp(row, "created_at")?);
891
892    Ok(FailedJobInfo {
893        job,
894        error,
895        failed_at,
896    })
897}
898
899// ---------------------------------------------------------------------------
900// Tests
901// ---------------------------------------------------------------------------
902
#[cfg(test)]
mod tests {
    use super::*;
    use sea_orm::Database;
    use sea_orm_migration::MigratorTrait;

    /// Migrator that applies only the jobs-table migration.
    struct TestMigrator;

    #[async_trait::async_trait]
    impl MigratorTrait for TestMigrator {
        fn migrations() -> Vec<Box<dyn sea_orm_migration::MigrationTrait>> {
            vec![Box::new(crate::migration::CreateJobsTable)]
        }
    }

    /// Spin up an in-memory SQLite DB and run the jobs migration.
    async fn setup() -> DatabaseConnection {
        let conn = Database::connect("sqlite::memory:")
            .await
            .expect("connect sqlite::memory:");
        TestMigrator::up(&conn, None)
            .await
            .expect("run CreateJobsTable migration");
        conn
    }

    /// Insert a job row directly (bypassing enqueue) for test setup and return its id.
    ///
    /// Values are interpolated straight into the SQL text, so this helper must
    /// only ever receive test-controlled literals.
    // One argument per jobs column a test needs to control.
    #[allow(clippy::too_many_arguments)]
    async fn insert_job(
        conn: &DatabaseConnection,
        queue: &str,
        job_type: &str,
        status: &str,
        attempts: i32,
        max_retries: i32,
        claimed_at: Option<&str>,
        available_at: &str,
    ) -> i64 {
        let now = Utc::now().to_rfc3339();
        let claimed_at_sql = match claimed_at {
            Some(ts) => format!("'{ts}'"),
            None => "NULL".to_string(),
        };
        let sql = format!(
            "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
             available_at, claimed_at, created_at) \
             VALUES ('{queue}', '{job_type}', '{{}}', '{status}', {attempts}, {max_retries}, \
             '{available_at}', {claimed_at_sql}, '{now}') \
             RETURNING id"
        );
        let row = conn
            .query_one(Statement::from_string(DatabaseBackend::Sqlite, sql))
            .await
            .expect("insert_job query")
            .expect("insert_job row");
        row.try_get_by::<i64, _>("id").expect("insert id")
    }

    /// Read back the `status` column of job `id`.
    async fn job_status(conn: &DatabaseConnection, id: i64) -> String {
        let row = conn
            .query_one(Statement::from_string(
                DatabaseBackend::Sqlite,
                format!("SELECT status FROM jobs WHERE id={id}"),
            ))
            .await
            .expect("select status")
            .expect("status row");
        row.try_get_by::<String, _>("status").expect("status")
    }

    /// Read back `(status, attempts)` for job `id`.
    async fn status_and_attempts(conn: &DatabaseConnection, id: i64) -> (String, i32) {
        let row = conn
            .query_one(Statement::from_string(
                DatabaseBackend::Sqlite,
                format!("SELECT status, attempts FROM jobs WHERE id={id}"),
            ))
            .await
            .expect("select status, attempts")
            .expect("status/attempts row");
        (
            row.try_get_by::<String, _>("status").expect("status"),
            row.try_get_by::<i32, _>("attempts").expect("attempts"),
        )
    }

    /// Count rows matching a fixed, test-controlled WHERE clause.
    async fn count_where(conn: &DatabaseConnection, predicate: &str) -> i64 {
        let row = conn
            .query_one(Statement::from_string(
                DatabaseBackend::Sqlite,
                format!("SELECT COUNT(*) as cnt FROM jobs WHERE {predicate}"),
            ))
            .await
            .expect("count query")
            .expect("count row");
        row.try_get_by::<i64, _>("cnt").expect("cnt")
    }

    #[tokio::test]
    async fn claim_returns_pending_job() {
        let conn = setup().await;
        let now = Utc::now().to_rfc3339();

        // Enqueue one job via direct INSERT.
        insert_job(&conn, "default", "MyJob", "pending", 0, 3, None, &now).await;

        // First claim should return the job.
        let job = claim(&conn, "default", "worker-1")
            .await
            .expect("claim failed")
            .expect("expected Some(job), got None");
        assert_eq!(job.job_type, "MyJob");

        // Second claim on the same queue should return None (job is now claimed).
        let second = claim(&conn, "default", "worker-2")
            .await
            .expect("second claim failed");
        assert!(second.is_none(), "second claim should return None");
    }

    #[tokio::test]
    async fn idempotency_dedup() {
        let conn = setup().await;
        let available_at = Utc::now();

        // Enqueue with an idempotency key twice.
        for label in ["first enqueue", "second enqueue (should be a no-op)"] {
            enqueue(
                &conn,
                "default",
                "MyJob",
                "{}",
                3,
                Some("key-abc"),
                None,
                available_at,
            )
            .await
            .expect(label);
        }

        let cnt = count_where(&conn, "job_type='MyJob' AND idempotency_key='key-abc'").await;
        assert_eq!(
            cnt, 1,
            "idempotency key should deduplicate (expected 1 row)"
        );

        // Enqueue without idempotency key twice — both must insert.
        for label in ["first plain enqueue", "second plain enqueue"] {
            enqueue(
                &conn,
                "default",
                "OtherJob",
                "{}",
                3,
                None,
                None,
                available_at,
            )
            .await
            .expect(label);
        }

        let cnt2 = count_where(&conn, "job_type='OtherJob'").await;
        assert_eq!(cnt2, 2, "plain enqueue should insert both rows");
    }

    #[tokio::test]
    async fn reaper_reclaims_stuck_job() {
        let conn = setup().await;

        // Insert a claimed job with claimed_at 10 minutes ago, attempts=0, max_retries=3.
        let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
        let now = Utc::now().to_rfc3339();
        let id = insert_job(
            &conn,
            "default",
            "StuckJob",
            "claimed",
            0,
            3,
            Some(&ten_min_ago),
            &now,
        )
        .await;

        // Run reaper with 5-minute visibility timeout.
        reaper(&conn, "default", std::time::Duration::from_secs(5 * 60))
            .await
            .expect("reaper failed");

        // Verify status = 'pending' and attempts = 1.
        let (status, attempts) = status_and_attempts(&conn, id).await;
        assert_eq!(
            status, "pending",
            "reaper should reset stuck job to pending"
        );
        assert_eq!(attempts, 1, "reaper should increment attempts");
    }

    #[tokio::test]
    async fn reaper_boundary_parks_last_attempt() {
        // attempts == max_retries - 1: the in-flight (timed-out) attempt is the
        // last allowed, so the reaper must PARK it, not requeue — matching the
        // worker's handle_failure boundary (WR-01).
        let conn = setup().await;
        let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
        let now = Utc::now().to_rfc3339();
        let id = insert_job(
            &conn,
            "default",
            "BoundaryJob",
            "claimed",
            2, // attempts
            3, // max_retries
            Some(&ten_min_ago),
            &now,
        )
        .await;

        reaper(&conn, "default", std::time::Duration::from_secs(5 * 60))
            .await
            .expect("reaper failed");

        let (status, attempts) = status_and_attempts(&conn, id).await;
        assert_eq!(
            status, "failed",
            "job at attempts == max_retries - 1 must be parked by the reaper, not requeued"
        );
        assert_eq!(
            attempts, 2,
            "parked job keeps its attempt count (no further requeue)"
        );
    }

    #[tokio::test]
    async fn reap_startup_claims_marks_orphans_failed() {
        // A worker killed mid-job leaves rows at status='claimed'. On the next
        // worker boot, reap_startup_claims must park them as failed so consumers
        // reading jobs.status no longer see them as Active.
        let conn = setup().await;
        let now = Utc::now().to_rfc3339();

        // One orphan on "default", one orphan on "publish", one on an untouched
        // queue, plus a pending row that must NOT be reaped.
        let orphan_default = insert_job(
            &conn,
            "default",
            "Orphan1",
            "claimed",
            0,
            3,
            Some(&now),
            &now,
        )
        .await;
        let orphan_publish = insert_job(
            &conn,
            "publish",
            "Orphan2",
            "claimed",
            1,
            3,
            Some(&now),
            &now,
        )
        .await;
        let orphan_other =
            insert_job(&conn, "other", "Orphan3", "claimed", 0, 3, Some(&now), &now).await;
        let pending_default =
            insert_job(&conn, "default", "Fresh", "pending", 0, 3, None, &now).await;

        // Worker handles "default" and "publish" — must reap exactly those orphans.
        let reaped = reap_startup_claims(&conn, &["default".to_string(), "publish".to_string()])
            .await
            .expect("reap_startup_claims failed");
        assert_eq!(
            reaped, 2,
            "expected 2 orphan rows reaped (default + publish)"
        );

        // The two scoped orphans are now failed with a non-null error + failed_at.
        for id in [orphan_default, orphan_publish] {
            let row = conn
                .query_one(Statement::from_string(
                    DatabaseBackend::Sqlite,
                    format!("SELECT status, error, failed_at FROM jobs WHERE id={id}"),
                ))
                .await
                .expect("select after reap")
                .expect("row after reap");
            let status: String = row.try_get_by::<String, _>("status").expect("status");
            let error: Option<String> =
                row.try_get_by::<Option<String>, _>("error").expect("error");
            let failed_at: Option<String> = row
                .try_get_by::<Option<String>, _>("failed_at")
                .expect("failed_at");
            assert_eq!(status, "failed", "orphan must be parked as failed");
            assert!(
                error.as_deref().unwrap_or("").contains("orphan claim"),
                "error must record reap reason, got: {error:?}"
            );
            assert!(failed_at.is_some(), "failed_at must be stamped");
        }

        // The untouched queue's orphan is left alone (not in this worker's queues).
        assert_eq!(
            job_status(&conn, orphan_other).await,
            "claimed",
            "orphan on a queue not handled by this worker must be left alone"
        );

        // The fresh pending row is untouched.
        assert_eq!(
            job_status(&conn, pending_default).await,
            "pending",
            "pending row must not be reaped"
        );
    }

    #[tokio::test]
    async fn reap_startup_claims_empty_queues_is_noop() {
        // Defensive: an empty queue list (no queues configured) must short-circuit
        // without building an invalid `IN ()` SQL clause.
        let conn = setup().await;
        let now = Utc::now().to_rfc3339();
        let id = insert_job(
            &conn,
            "default",
            "Orphan",
            "claimed",
            0,
            3,
            Some(&now),
            &now,
        )
        .await;

        let reaped = reap_startup_claims(&conn, &[])
            .await
            .expect("reap on empty queues must succeed");
        assert_eq!(reaped, 0);
        // A no-op must really be a no-op: the existing claim is untouched.
        assert_eq!(
            job_status(&conn, id).await,
            "claimed",
            "empty queue list must not modify any row"
        );
    }

    #[tokio::test]
    async fn poison_job_parked() {
        let conn = setup().await;

        // Insert an exhausted job (attempts == max_retries) claimed 10 min ago.
        let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
        let now = Utc::now().to_rfc3339();
        let id = insert_job(
            &conn,
            "default",
            "PoisonJob",
            "claimed",
            3,
            3,
            Some(&ten_min_ago),
            &now,
        )
        .await;

        // Run reaper.
        reaper(&conn, "default", std::time::Duration::from_secs(5 * 60))
            .await
            .expect("reaper failed");

        // Verify status = 'failed' and error IS NOT NULL.
        let row = conn
            .query_one(Statement::from_string(
                DatabaseBackend::Sqlite,
                format!("SELECT status, error FROM jobs WHERE id={id}"),
            ))
            .await
            .expect("select after reaper")
            .expect("row");

        let status: String = row.try_get_by::<String, _>("status").expect("status");
        let error: Option<String> = row.try_get_by::<Option<String>, _>("error").expect("error");
        assert_eq!(status, "failed", "exhausted job should be parked as failed");
        assert!(error.is_some(), "failed job should have an error message");

        // A fresh pending job should still be claimable (parked row does not block).
        let available = Utc::now().to_rfc3339();
        insert_job(
            &conn, "default", "FreshJob", "pending", 0, 3, None, &available,
        )
        .await;

        let claimed = claim(&conn, "default", "worker-1")
            .await
            .expect("claim after poison park")
            .expect("fresh job should be claimable after poison job is parked");
        assert_eq!(claimed.job_type, "FreshJob");
    }

    #[tokio::test]
    async fn stats_split_ready_delayed_and_failed() {
        // Pending rows split at "now" into ready vs delayed; failed rows are
        // counted globally.
        let conn = setup().await;
        let past = (Utc::now() - chrono::Duration::minutes(1)).to_rfc3339();
        let future = (Utc::now() + chrono::Duration::hours(1)).to_rfc3339();

        insert_job(&conn, "default", "Ready", "pending", 0, 3, None, &past).await;
        insert_job(&conn, "default", "Later1", "pending", 0, 3, None, &future).await;
        insert_job(&conn, "default", "Later2", "pending", 0, 3, None, &future).await;
        insert_job(&conn, "default", "Dead", "failed", 3, 3, None, &past).await;

        let stats = get_stats(&conn, &["default"]).await.expect("get_stats");
        assert_eq!(stats.queues.len(), 1);
        assert_eq!(stats.queues[0].name, "default");
        assert_eq!(stats.queues[0].pending, 1, "one ready pending job");
        assert_eq!(stats.queues[0].delayed, 2, "two future-dated pending jobs");
        assert_eq!(stats.total_failed, 1);
    }

    #[tokio::test]
    async fn failed_jobs_fall_back_to_created_at() {
        // A legacy failed row with NULL failed_at / error must still be listed,
        // with failed_at falling back to created_at (WR-06).
        let conn = setup().await;
        let now = Utc::now().to_rfc3339();
        insert_job(&conn, "default", "LegacyFailed", "failed", 3, 3, None, &now).await;

        let failed = get_failed_jobs(&conn, 10).await.expect("get_failed_jobs");
        assert_eq!(failed.len(), 1, "expected exactly one failed job");
        let info = &failed[0];
        assert_eq!(info.job.job_type, "LegacyFailed");
        assert_eq!(info.error, "", "NULL error column decodes as empty string");
        assert_eq!(
            info.failed_at.to_rfc3339(),
            info.job.created_at,
            "failed_at must fall back to created_at when the column is NULL"
        );
    }
}