Skip to main content

ferro_queue/
db.rs

1//! DB engine for ferro-queue: dual-backend atomic claim, idempotent enqueue,
2//! stuck-job reaper, job lifecycle ops, and the `Queue` global.
3//!
4//! Supports SQLite (single `UPDATE … RETURNING` inside a pinned transaction)
5//! and Postgres (`FOR UPDATE SKIP LOCKED` inside a transaction). All dynamic values are
6//! bound via `Statement::from_sql_and_values` — no string interpolation of
7//! caller-supplied data (T-185-01).
8
9use chrono::{DateTime, Utc};
10use sea_orm::{
11    ConnectionTrait, DatabaseBackend, DatabaseConnection, Statement, TransactionTrait, Value,
12};
13use serde::{Deserialize, Serialize};
14
15use crate::error::Error;
16
17// ---------------------------------------------------------------------------
18// Queue global
19// ---------------------------------------------------------------------------
20
/// Process-wide database connection: written at most once by [`Queue::init`]
/// and read by [`Queue::connection`] / [`Queue::is_initialized`].
static GLOBAL_CONNECTION: std::sync::OnceLock<DatabaseConnection> = std::sync::OnceLock::new();

/// Registered job-type applier functions, collected before the server starts.
///
/// Each entry is a boxed closure that receives a `WorkerLoop` and registers
/// one job type on it.
type RegisterFn = Box<dyn Fn(&mut crate::WorkerLoop) + Send + Sync>;
/// Mutex-guarded list of [`RegisterFn`] closures: appended to by
/// [`Queue::register`], replayed in insertion order by `Queue::apply_registrars`.
static JOB_REGISTRARS: std::sync::Mutex<Vec<RegisterFn>> = std::sync::Mutex::new(Vec::new());
26
/// Global handle to the queue's database connection.
///
/// Initialise once at application start with [`Queue::init`]; worker loops
/// and dispatcher call [`Queue::connection`] to get the static reference.
///
/// This is a unit struct: it carries no state itself and only namespaces the
/// associated functions that operate on the module-level statics.
pub struct Queue;
32
33impl Queue {
34    /// Return a reference to the global `DatabaseConnection`.
35    ///
36    /// # Panics
37    ///
38    /// Panics if [`Queue::init`] has not been called yet.
39    pub fn connection() -> &'static DatabaseConnection {
40        GLOBAL_CONNECTION
41            .get()
42            .expect("Queue not initialized. Call Queue::init() first.")
43    }
44
45    /// Store `conn` as the global connection.
46    ///
47    /// Returns `Err` if called more than once.
48    pub async fn init(conn: DatabaseConnection) -> Result<(), Error> {
49        GLOBAL_CONNECTION
50            .set(conn)
51            .map_err(|_| Error::custom("Queue already initialized"))?;
52        Ok(())
53    }
54
55    /// Returns `true` if [`Queue::init`] has been called.
56    pub fn is_initialized() -> bool {
57        GLOBAL_CONNECTION.get().is_some()
58    }
59
60    /// Register a job type for auto-start by the framework's WorkerLoop.
61    ///
62    /// Call this in your application bootstrap before the server starts.
63    /// The framework's server boot path inspects [`Queue::has_registered_jobs`]
64    /// and spawns a `WorkerLoop` automatically when at least one type is registered.
65    pub fn register<J>()
66    where
67        J: crate::Job + serde::de::DeserializeOwned + 'static,
68    {
69        JOB_REGISTRARS
70            .lock()
71            .unwrap()
72            .push(Box::new(|w: &mut crate::WorkerLoop| w.register::<J>()));
73    }
74
75    /// Returns `true` if at least one job type has been registered via [`Queue::register`].
76    pub fn has_registered_jobs() -> bool {
77        !JOB_REGISTRARS.lock().unwrap().is_empty()
78    }
79
80    /// Apply all registered job types to the given `WorkerLoop`.
81    ///
82    /// Used internally by [`WorkerLoop::from_registry`].
83    pub(crate) fn apply_registrars(w: &mut crate::WorkerLoop) {
84        for r in JOB_REGISTRARS.lock().unwrap().iter() {
85            r(w);
86        }
87    }
88}
89
90// ---------------------------------------------------------------------------
91// JobRow — a claimed row from the jobs table
92// ---------------------------------------------------------------------------
93
/// A row read from the `jobs` table during a claim operation.
///
/// Populated by `parse_job_row` from the columns selected (Postgres) or
/// returned via `RETURNING` (SQLite) by the claim statements.
#[derive(Debug, Clone)]
pub struct JobRow {
    /// Primary key.
    pub id: i64,
    /// Fully-qualified type name of the job (used to route to the correct handler).
    pub job_type: String,
    /// JSON-serialized job data.
    pub payload: String,
    /// Name of the queue this job belongs to.
    pub queue: String,
    /// Number of execution attempts so far (read from the DB as `i32`).
    pub attempts: u32,
    /// Maximum attempts before the job is parked as failed (read from the DB as `i32`).
    pub max_retries: u32,
    /// Optional deduplication key.
    pub idempotency_key: Option<String>,
    /// Optional tenant scope.
    pub tenant_id: Option<i64>,
    /// Earliest time the job may be claimed.
    pub available_at: DateTime<Utc>,
    /// Insertion timestamp.
    pub created_at: DateTime<Utc>,
}
118
119// ---------------------------------------------------------------------------
120// Introspection types (moved from queue.rs)
121// ---------------------------------------------------------------------------
122
/// Job status for introspection queries.
///
/// Serialized in `snake_case` (`"pending"`, `"delayed"`, `"failed"`). The
/// state is assigned by the introspection query that produced the row, not
/// read from a column.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobState {
    /// Waiting to be claimed.
    Pending,
    /// Scheduled for a future time.
    Delayed,
    /// Permanently failed after exhausting retries.
    Failed,
}
134
/// Summary of a single job for introspection.
///
/// Unlike [`JobRow`], this omits the payload and carries timestamps as
/// strings so the value can be serialized directly.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobInfo {
    /// Primary key.
    pub id: i64,
    /// Fully-qualified job type name.
    pub job_type: String,
    /// Queue name.
    pub queue: String,
    /// Execution attempt count.
    pub attempts: u32,
    /// Maximum retry count.
    pub max_retries: u32,
    /// Insertion timestamp (RFC 3339).
    pub created_at: String,
    /// Earliest eligible claim time (RFC 3339).
    pub available_at: String,
    /// Logical state.
    pub state: JobState,
}
155
/// Per-queue pending/delayed counts.
///
/// Both counters cover rows with `status='pending'`; they differ only in
/// whether `available_at` has already been reached.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SingleQueueStats {
    /// Queue name.
    pub name: String,
    /// Jobs with `status='pending'` and `available_at <= now`.
    pub pending: usize,
    /// Jobs with `status='pending'` and `available_at > now`.
    pub delayed: usize,
}
166
/// Aggregate stats across all queues.
///
/// `Default` yields an empty breakdown with a failed count of zero.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueueStats {
    /// Per-queue breakdown.
    pub queues: Vec<SingleQueueStats>,
    /// Total jobs with `status='failed'`.
    pub total_failed: usize,
}
175
/// A failed job with the error message.
///
/// Wraps the common [`JobInfo`] summary and adds the failure-specific fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailedJobInfo {
    /// Core job fields.
    pub job: JobInfo,
    /// Error message stored at failure time.
    pub error: String,
    /// When the job was parked as failed (recorded in the `failed_at` column;
    /// falls back to `created_at` only for legacy rows that predate the column).
    pub failed_at: DateTime<Utc>,
}
187
188// ---------------------------------------------------------------------------
189// Internal helpers
190// ---------------------------------------------------------------------------
191
192/// Map a `QueryResult` row to a `JobRow`.
193fn parse_job_row(row: &sea_orm::QueryResult) -> Result<JobRow, Error> {
194    let id: i64 = row
195        .try_get_by::<i64, _>("id")
196        .map_err(|e| Error::custom(format!("parse id: {e}")))?;
197    let job_type: String = row
198        .try_get_by::<String, _>("job_type")
199        .map_err(|e| Error::custom(format!("parse job_type: {e}")))?;
200    let payload: String = row
201        .try_get_by::<String, _>("payload")
202        .map_err(|e| Error::custom(format!("parse payload: {e}")))?;
203    let queue: String = row
204        .try_get_by::<String, _>("queue")
205        .map_err(|e| Error::custom(format!("parse queue: {e}")))?;
206    let attempts: i32 = row
207        .try_get_by::<i32, _>("attempts")
208        .map_err(|e| Error::custom(format!("parse attempts: {e}")))?;
209    let max_retries: i32 = row
210        .try_get_by::<i32, _>("max_retries")
211        .map_err(|e| Error::custom(format!("parse max_retries: {e}")))?;
212    let idempotency_key: Option<String> = row
213        .try_get_by::<Option<String>, _>("idempotency_key")
214        .map_err(|e| Error::custom(format!("parse idempotency_key: {e}")))?;
215    let tenant_id: Option<i64> = row
216        .try_get_by::<Option<i64>, _>("tenant_id")
217        .map_err(|e| Error::custom(format!("parse tenant_id: {e}")))?;
218
219    // available_at / created_at — SQLite stores as ISO-8601 text, Postgres as timestamptz.
220    // SeaORM maps both to DateTime<Utc> when the column is timestamp_with_time_zone; on
221    // SQLite we fall back to string parsing if the native mapping fails.
222    let available_at = parse_timestamp(row, "available_at")?;
223    let created_at = parse_timestamp(row, "created_at")?;
224
225    Ok(JobRow {
226        id,
227        job_type,
228        payload,
229        queue,
230        attempts: attempts as u32,
231        max_retries: max_retries as u32,
232        idempotency_key,
233        tenant_id,
234        available_at,
235        created_at,
236    })
237}
238
239/// Parse a timestamp column that may arrive as `DateTime<Utc>` (Postgres) or
240/// as an ISO-8601 string (SQLite).
241fn parse_timestamp(row: &sea_orm::QueryResult, col: &str) -> Result<DateTime<Utc>, Error> {
242    // Try native DateTime<Utc> first (Postgres timestamptz).
243    if let Ok(dt) = row.try_get_by::<DateTime<Utc>, _>(col) {
244        return Ok(dt);
245    }
246    // Fall back to string parsing (SQLite TEXT column).
247    let s: String = row
248        .try_get_by::<String, _>(col)
249        .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
250    DateTime::parse_from_rfc3339(&s)
251        .map(|dt| dt.with_timezone(&Utc))
252        .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}")))
253}
254
255/// Parse a nullable timestamp column. Returns `Ok(None)` when the column is
256/// SQL NULL, mirroring `parse_timestamp`'s Postgres-then-SQLite fallback.
257fn parse_optional_timestamp(
258    row: &sea_orm::QueryResult,
259    col: &str,
260) -> Result<Option<DateTime<Utc>>, Error> {
261    // Native Option<DateTime<Utc>> first (Postgres timestamptz).
262    if let Ok(opt) = row.try_get_by::<Option<DateTime<Utc>>, _>(col) {
263        return Ok(opt);
264    }
265    // Fall back to Option<String> (SQLite TEXT column).
266    let s: Option<String> = row
267        .try_get_by::<Option<String>, _>(col)
268        .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
269    match s {
270        None => Ok(None),
271        Some(s) => DateTime::parse_from_rfc3339(&s)
272            .map(|dt| Some(dt.with_timezone(&Utc)))
273            .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}"))),
274    }
275}
276
277/// SQL placeholder style: Postgres uses `$N`, SQLite uses `?N`.
278fn ph(backend: DatabaseBackend, n: usize) -> String {
279    match backend {
280        DatabaseBackend::Postgres => format!("${n}"),
281        _ => format!("?{n}"),
282    }
283}
284
285/// Placeholder for a value bound to a `timestamp_with_time_zone` column.
286///
287/// ferro-queue binds every timestamp as RFC 3339 *text* (`Value::String`),
288/// which is correct for SQLite, where the columns have TEXT affinity and are
289/// compared lexically. Postgres, however, types those columns as real
290/// `timestamptz` and performs NO implicit text↔timestamptz coercion: a bare
291/// `$N` then yields `operator does not exist: timestamp with time zone < text`
292/// in a comparison and `column "failed_at" is timestamp with time zone but
293/// expression is text` in an assignment. Casting the bound text to
294/// `timestamptz` resolves both (Postgres accepts the ISO-8601/RFC 3339 form).
295/// SQLite has no `::` cast operator and needs none, so it keeps the bare
296/// placeholder. Use this instead of [`ph`] at any placeholder that lands on a
297/// timestamp column (comparison RHS, SET/INSERT value).
298fn ts_ph(backend: DatabaseBackend, n: usize) -> String {
299    match backend {
300        DatabaseBackend::Postgres => format!("${n}::timestamptz"),
301        _ => format!("?{n}"),
302    }
303}
304
305// ---------------------------------------------------------------------------
306// claim — atomic work-stealing claim (dual-backend)
307// ---------------------------------------------------------------------------
308
309/// Atomically claim one pending job from `queue`.
310///
311/// Returns `None` when the queue is empty or all eligible jobs are locked by
312/// another worker (Postgres). Uses:
313/// - Postgres: `SELECT … FOR UPDATE SKIP LOCKED` + `UPDATE` inside a transaction.
314/// - SQLite: single `UPDATE … RETURNING` inside a `conn.begin()` transaction so
315///   every statement is pinned to one pooled connection; the write lock is taken
316///   on the UPDATE itself (no prior read to upgrade, so no `BEGIN IMMEDIATE`
317///   needed).
318pub async fn claim(
319    conn: &DatabaseConnection,
320    queue: &str,
321    worker_id: &str,
322) -> Result<Option<JobRow>, Error> {
323    match conn.get_database_backend() {
324        DatabaseBackend::Postgres => claim_postgres(conn, queue, worker_id).await,
325        DatabaseBackend::Sqlite => claim_sqlite(conn, queue, worker_id).await,
326        _ => Err(Error::UnsupportedBackend),
327    }
328}
329
330async fn claim_postgres(
331    conn: &DatabaseConnection,
332    queue: &str,
333    worker_id: &str,
334) -> Result<Option<JobRow>, Error> {
335    let txn = conn.begin().await.map_err(Error::Db)?;
336
337    let select = Statement::from_sql_and_values(
338        DatabaseBackend::Postgres,
339        "SELECT id, job_type, payload, queue, attempts, max_retries, idempotency_key, \
340         tenant_id, available_at, created_at FROM jobs \
341         WHERE status = 'pending' AND queue = $1 AND available_at <= NOW() \
342         ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED",
343        [Value::String(Some(Box::new(queue.to_string())))],
344    );
345
346    let row = txn.query_one(select).await.map_err(Error::Db)?;
347    let Some(row) = row else {
348        txn.commit().await.map_err(Error::Db)?;
349        return Ok(None);
350    };
351    let job = parse_job_row(&row)?;
352
353    let upd = Statement::from_sql_and_values(
354        DatabaseBackend::Postgres,
355        "UPDATE jobs SET status = 'claimed', claimed_at = NOW(), claimed_by = $2 WHERE id = $1",
356        [
357            Value::BigInt(Some(job.id)),
358            Value::String(Some(Box::new(worker_id.to_string()))),
359        ],
360    );
361    txn.execute(upd).await.map_err(Error::Db)?;
362    txn.commit().await.map_err(Error::Db)?;
363
364    Ok(Some(job))
365}
366
367async fn claim_sqlite(
368    conn: &DatabaseConnection,
369    queue: &str,
370    worker_id: &str,
371) -> Result<Option<JobRow>, Error> {
372    let now_iso = Utc::now().to_rfc3339();
373
374    // Acquire a transaction handle: SeaORM `begin()` checks out ONE physical
375    // connection from the pool and pins every statement on this handle to it.
376    // This is the correctness-critical change (CR-01) — issuing BEGIN/UPDATE/
377    // COMMIT directly on `conn` let the statements land on different pooled
378    // connections, breaking atomicity and leaking an open transaction back into
379    // the pool on the error path.
380    //
381    // `begin()` emits a DEFERRED BEGIN on SQLite. We do not need an explicit
382    // BEGIN IMMEDIATE here: the claim is a single `UPDATE … RETURNING`, so the
383    // write lock is acquired on that first (and only) statement. There is no
384    // prior read in this txn that could trigger the deferred-read-then-upgrade
385    // deadlock that BEGIN IMMEDIATE exists to avoid. Atomicity is guaranteed by
386    // the statements being pinned to one connection inside the txn handle.
387    let txn = conn.begin().await.map_err(Error::Db)?;
388
389    let stmt = Statement::from_sql_and_values(
390        DatabaseBackend::Sqlite,
391        "UPDATE jobs SET status='claimed', claimed_at=?1, claimed_by=?2 \
392         WHERE id = ( SELECT id FROM jobs WHERE status='pending' AND queue=?3 \
393           AND available_at <= ?1 ORDER BY id LIMIT 1 ) \
394         RETURNING id, job_type, payload, queue, attempts, max_retries, \
395           idempotency_key, tenant_id, available_at, created_at",
396        [
397            Value::String(Some(Box::new(now_iso))),
398            Value::String(Some(Box::new(worker_id.to_string()))),
399            Value::String(Some(Box::new(queue.to_string()))),
400        ],
401    );
402
403    let row = match txn.query_one(stmt).await {
404        Ok(r) => r,
405        Err(e) => {
406            // Roll back so the connection returns to the pool with no open txn.
407            let _ = txn.rollback().await;
408            return Err(Error::Db(e));
409        }
410    };
411    txn.commit().await.map_err(Error::Db)?;
412
413    row.map(|r| parse_job_row(&r)).transpose()
414}
415
416// ---------------------------------------------------------------------------
417// reaper — re-queue stuck claimed rows; park exhausted ones as failed
418// ---------------------------------------------------------------------------
419
420/// Re-queue claimed rows that have been held longer than `visibility_timeout`.
421///
422/// `attempts` is the number of attempts already completed. A row is requeued
423/// (for one more attempt) only while `attempts + 1 < max_retries`; once the
424/// next attempt would be the last allowed (`attempts + 1 >= max_retries`) the
425/// row is parked as `failed`. This matches the worker's handler-failure
426/// boundary (`handle_failure`: park when `attempts + 1 >= max_retries`) so a
427/// job gets the same total attempt count whether it fails via handler error or
428/// via visibility timeout (WR-01, T-185-04).
429pub async fn reaper(
430    conn: &DatabaseConnection,
431    queue: &str,
432    visibility_timeout: std::time::Duration,
433) -> Result<(), Error> {
434    let now = Utc::now();
435    let duration = chrono::Duration::from_std(visibility_timeout)
436        .map_err(|e| Error::custom(format!("visibility_timeout out of range: {e}")))?;
437    let cutoff = (now - duration).to_rfc3339();
438    let now_iso = now.to_rfc3339();
439
440    let backend = conn.get_database_backend();
441    let txn = conn.begin().await.map_err(Error::Db)?;
442
443    // Step 1: re-queue rows that still have a retry left after counting this
444    // timed-out attempt. The in-flight attempt (number `attempts + 1`) failed,
445    // so requeue only while `attempts + 1 < max_retries` — matching
446    // `worker::handle_failure`.
447    let (p1, p2, p3) = (ts_ph(backend, 1), ts_ph(backend, 2), ph(backend, 3));
448    let requeue_sql = format!(
449        "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL, \
450         attempts = attempts + 1, available_at = {p1} \
451         WHERE status='claimed' AND claimed_at < {p2} \
452         AND attempts + 1 < max_retries AND queue = {p3}"
453    );
454    let requeue = Statement::from_sql_and_values(
455        backend,
456        &requeue_sql,
457        [
458            Value::String(Some(Box::new(now_iso.clone()))),
459            Value::String(Some(Box::new(cutoff.clone()))),
460            Value::String(Some(Box::new(queue.to_string()))),
461        ],
462    );
463    txn.execute(requeue).await.map_err(Error::Db)?;
464
465    // Step 2: park exhausted rows as failed, recording the failure time.
466    // Use fresh placeholders (independent statement, not continuing p1..p3).
467    let (pp1, pp2, pp3) = (ts_ph(backend, 1), ts_ph(backend, 2), ph(backend, 3));
468    let park_sql = format!(
469        "UPDATE jobs SET status='failed', error='visibility timeout exceeded', failed_at={pp1} \
470         WHERE status='claimed' AND claimed_at < {pp2} \
471         AND attempts + 1 >= max_retries AND queue = {pp3}"
472    );
473    let park = Statement::from_sql_and_values(
474        backend,
475        &park_sql,
476        [
477            Value::String(Some(Box::new(now_iso))),
478            Value::String(Some(Box::new(cutoff))),
479            Value::String(Some(Box::new(queue.to_string()))),
480        ],
481    );
482    txn.execute(park).await.map_err(Error::Db)?;
483
484    txn.commit().await.map_err(Error::Db)
485}
486
487// ---------------------------------------------------------------------------
488// enqueue — idempotent insert
489// ---------------------------------------------------------------------------
490
491/// Insert a new job into the queue.
492///
493/// When `idempotency_key` is `Some`, skips the insert if a `pending` or
494/// `claimed` row with the same `(job_type, idempotency_key)` already exists
495/// (D-15, T-185-01).
496#[allow(clippy::too_many_arguments)]
497pub async fn enqueue(
498    conn: &DatabaseConnection,
499    queue: &str,
500    job_type: &str,
501    payload: &str,
502    max_retries: u32,
503    idempotency_key: Option<&str>,
504    tenant_id: Option<i64>,
505    available_at: DateTime<Utc>,
506) -> Result<(), Error> {
507    let backend = conn.get_database_backend();
508    let now_iso = Utc::now().to_rfc3339();
509    let available_iso = available_at.to_rfc3339();
510
511    if let Some(idem) = idempotency_key {
512        // Idempotent insert: skip if a pending/claimed row with the same key exists.
513        let (p1, p2, p3, p4, p5, p6, p7, p8) = (
514            ph(backend, 1),
515            ph(backend, 2),
516            ph(backend, 3),
517            ph(backend, 4),
518            ph(backend, 5),
519            ph(backend, 6),
520            ts_ph(backend, 7),
521            ts_ph(backend, 8),
522        );
523        let sql = format!(
524            "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
525             idempotency_key, tenant_id, available_at, created_at) \
526             SELECT {p1}, {p2}, {p3}, 'pending', 0, {p4}, {p5}, {p6}, {p7}, {p8} \
527             WHERE NOT EXISTS ( \
528               SELECT 1 FROM jobs WHERE job_type = {p2} AND idempotency_key = {p5} \
529               AND status IN ('pending','claimed') \
530             )"
531        );
532        let stmt = Statement::from_sql_and_values(
533            backend,
534            &sql,
535            [
536                Value::String(Some(Box::new(queue.to_string()))),
537                Value::String(Some(Box::new(job_type.to_string()))),
538                Value::String(Some(Box::new(payload.to_string()))),
539                Value::Int(Some(max_retries as i32)),
540                Value::String(Some(Box::new(idem.to_string()))),
541                tenant_id.map_or(Value::BigInt(None), |id| Value::BigInt(Some(id))),
542                Value::String(Some(Box::new(available_iso))),
543                Value::String(Some(Box::new(now_iso))),
544            ],
545        );
546        conn.execute(stmt).await.map_err(Error::Db)?;
547    } else {
548        // Plain insert — no deduplication guard.
549        let (p1, p2, p3, p4, p5, p6, p7) = (
550            ph(backend, 1),
551            ph(backend, 2),
552            ph(backend, 3),
553            ph(backend, 4),
554            ph(backend, 5),
555            ts_ph(backend, 6),
556            ts_ph(backend, 7),
557        );
558        let sql = format!(
559            "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
560             tenant_id, available_at, created_at) \
561             VALUES ({p1}, {p2}, {p3}, 'pending', 0, {p4}, {p5}, {p6}, {p7})"
562        );
563        let stmt = Statement::from_sql_and_values(
564            backend,
565            &sql,
566            [
567                Value::String(Some(Box::new(queue.to_string()))),
568                Value::String(Some(Box::new(job_type.to_string()))),
569                Value::String(Some(Box::new(payload.to_string()))),
570                Value::Int(Some(max_retries as i32)),
571                tenant_id.map_or(Value::BigInt(None), |id| Value::BigInt(Some(id))),
572                Value::String(Some(Box::new(available_iso))),
573                Value::String(Some(Box::new(now_iso))),
574            ],
575        );
576        conn.execute(stmt).await.map_err(Error::Db)?;
577    }
578
579    Ok(())
580}
581
582// ---------------------------------------------------------------------------
583// Lifecycle operations
584// ---------------------------------------------------------------------------
585
586/// Delete a successfully-completed job row (D-04 delete-on-success).
587pub async fn delete_job(conn: &DatabaseConnection, id: i64) -> Result<(), Error> {
588    let backend = conn.get_database_backend();
589    let p1 = ph(backend, 1);
590    let sql = format!("DELETE FROM jobs WHERE id = {p1}");
591    let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(id))]);
592    conn.execute(stmt).await.map_err(Error::Db)?;
593    Ok(())
594}
595
596/// Park a job as `failed` with an error message, recording the failure time.
597pub async fn fail_job(conn: &DatabaseConnection, id: i64, error: &str) -> Result<(), Error> {
598    let backend = conn.get_database_backend();
599    let (p1, p2, p3) = (ph(backend, 1), ts_ph(backend, 2), ph(backend, 3));
600    let sql =
601        format!("UPDATE jobs SET status='failed', error={p1}, failed_at={p2} WHERE id = {p3}");
602    let stmt = Statement::from_sql_and_values(
603        backend,
604        &sql,
605        [
606            Value::String(Some(Box::new(error.to_string()))),
607            Value::String(Some(Box::new(Utc::now().to_rfc3339()))),
608            Value::BigInt(Some(id)),
609        ],
610    );
611    conn.execute(stmt).await.map_err(Error::Db)?;
612    Ok(())
613}
614
615/// Reset a job to `pending`, bump its attempt count, and set a new
616/// `available_at` (used by the worker after a retryable failure).
617pub async fn release_job(
618    conn: &DatabaseConnection,
619    id: i64,
620    attempts: u32,
621    available_at: DateTime<Utc>,
622) -> Result<(), Error> {
623    let backend = conn.get_database_backend();
624    let (p1, p2, p3) = (ph(backend, 1), ts_ph(backend, 2), ph(backend, 3));
625    let sql = format!(
626        "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL, \
627         attempts={p1}, available_at={p2} WHERE id = {p3}"
628    );
629    let stmt = Statement::from_sql_and_values(
630        backend,
631        &sql,
632        [
633            Value::Int(Some(attempts as i32)),
634            Value::String(Some(Box::new(available_at.to_rfc3339()))),
635            Value::BigInt(Some(id)),
636        ],
637    );
638    conn.execute(stmt).await.map_err(Error::Db)?;
639    Ok(())
640}
641
642/// Reset all jobs claimed by `worker_id` back to `pending` (D-10 shutdown
643/// re-queue — called by the worker loop before the process exits).
644pub async fn requeue_claimed_by(conn: &DatabaseConnection, worker_id: &str) -> Result<(), Error> {
645    let backend = conn.get_database_backend();
646    let p1 = ph(backend, 1);
647    let sql = format!(
648        "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL \
649         WHERE status='claimed' AND claimed_by={p1}"
650    );
651    let stmt = Statement::from_sql_and_values(
652        backend,
653        &sql,
654        [Value::String(Some(Box::new(worker_id.to_string())))],
655    );
656    conn.execute(stmt).await.map_err(Error::Db)?;
657    Ok(())
658}
659
660/// Park leftover `claimed` rows on `queues` as `failed` at worker startup.
661///
662/// Closes the orphan-claim window that the visibility-timeout reaper leaves
663/// open: when a worker is killed mid-job (SIGKILL from a deploy restart, OOM),
664/// its claimed rows stay `claimed` until `visibility_timeout` elapses
665/// (default 300s). Consumers reading `jobs.status` see those rows as Active
666/// for the full timeout window — UI labels like "in progress" stick until the
667/// row is reaped or manually cleared.
668///
669/// Called once per `WorkerLoop::run` before the claim loop starts. Operates
670/// only on the queues this worker handles, so concurrent workers on disjoint
671/// queues never clobber each other. Assumes the single-worker-per-queue model
672/// the rest of the worker is built around — multi-worker-per-queue setups
673/// should rely on `visibility_timeout` instead.
674///
675/// Returns the number of rows reaped.
676pub async fn reap_startup_claims(
677    conn: &DatabaseConnection,
678    queues: &[String],
679) -> Result<u64, Error> {
680    if queues.is_empty() {
681        return Ok(0);
682    }
683    let backend = conn.get_database_backend();
684    let now_iso = Utc::now().to_rfc3339();
685
686    // Numbered placeholders: $1..$N for queue names, $N+1 for failed_at (cast
687    // to timestamptz on Postgres — the column is timestamptz, the bound value
688    // is RFC 3339 text).
689    let queue_phs: Vec<String> = (1..=queues.len()).map(|i| ph(backend, i)).collect();
690    let failed_at_ph = ts_ph(backend, queues.len() + 1);
691    let sql = format!(
692        "UPDATE jobs SET status='failed', \
693         error='reaped on worker startup (orphan claim from previous worker)', \
694         failed_at={failed_at_ph} \
695         WHERE status='claimed' AND queue IN ({})",
696        queue_phs.join(", "),
697    );
698
699    let mut values: Vec<Value> = queues
700        .iter()
701        .map(|q| Value::String(Some(Box::new(q.clone()))))
702        .collect();
703    values.push(Value::String(Some(Box::new(now_iso))));
704
705    let stmt = Statement::from_sql_and_values(backend, &sql, values);
706    let result = conn.execute(stmt).await.map_err(Error::Db)?;
707    Ok(result.rows_affected())
708}
709
710// ---------------------------------------------------------------------------
711// Introspection / stat queries
712// ---------------------------------------------------------------------------
713
714/// Return up to `limit` pending (immediately eligible) jobs in `queue`.
715pub async fn get_pending_jobs(
716    conn: &DatabaseConnection,
717    queue: &str,
718    limit: u64,
719) -> Result<Vec<JobInfo>, Error> {
720    let backend = conn.get_database_backend();
721    let now_iso = Utc::now().to_rfc3339();
722    let (p1, p2, p3) = (ph(backend, 1), ts_ph(backend, 2), ph(backend, 3));
723    let sql = format!(
724        "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at \
725         FROM jobs WHERE status='pending' AND queue={p1} AND available_at <= {p2} \
726         ORDER BY id LIMIT {p3}"
727    );
728    let stmt = Statement::from_sql_and_values(
729        backend,
730        &sql,
731        [
732            Value::String(Some(Box::new(queue.to_string()))),
733            Value::String(Some(Box::new(now_iso))),
734            Value::BigInt(Some(limit as i64)),
735        ],
736    );
737    let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
738    rows.iter()
739        .map(|r| parse_job_info(r, JobState::Pending))
740        .collect()
741}
742
743/// Return up to `limit` delayed (not-yet-eligible) jobs in `queue`.
744pub async fn get_delayed_jobs(
745    conn: &DatabaseConnection,
746    queue: &str,
747    limit: u64,
748) -> Result<Vec<JobInfo>, Error> {
749    let backend = conn.get_database_backend();
750    let now_iso = Utc::now().to_rfc3339();
751    let (p1, p2, p3) = (ph(backend, 1), ts_ph(backend, 2), ph(backend, 3));
752    let sql = format!(
753        "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at \
754         FROM jobs WHERE status='pending' AND queue={p1} AND available_at > {p2} \
755         ORDER BY id LIMIT {p3}"
756    );
757    let stmt = Statement::from_sql_and_values(
758        backend,
759        &sql,
760        [
761            Value::String(Some(Box::new(queue.to_string()))),
762            Value::String(Some(Box::new(now_iso))),
763            Value::BigInt(Some(limit as i64)),
764        ],
765    );
766    let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
767    rows.iter()
768        .map(|r| parse_job_info(r, JobState::Delayed))
769        .collect()
770}
771
772/// Return up to `limit` failed jobs (across all queues).
773pub async fn get_failed_jobs(
774    conn: &DatabaseConnection,
775    limit: u64,
776) -> Result<Vec<FailedJobInfo>, Error> {
777    let backend = conn.get_database_backend();
778    let p1 = ph(backend, 1);
779    // Order by failure time, falling back to created_at for any legacy row that
780    // predates the failed_at column (WR-06).
781    let sql = format!(
782        "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at, \
783         error, failed_at FROM jobs WHERE status='failed' \
784         ORDER BY COALESCE(failed_at, created_at) DESC LIMIT {p1}"
785    );
786    let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(limit as i64))]);
787    let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
788    rows.iter().map(parse_failed_job_info).collect()
789}
790
791/// Return aggregate pending/delayed counts per queue and the total failed count.
792pub async fn get_stats(conn: &DatabaseConnection, queues: &[&str]) -> Result<QueueStats, Error> {
793    let backend = conn.get_database_backend();
794    let now_iso = Utc::now().to_rfc3339();
795    let mut queue_stats = Vec::new();
796
797    for &q in queues {
798        let p1 = ph(backend, 1);
799        let p2 = ts_ph(backend, 2);
800        let pending_sql = format!(
801            "SELECT COUNT(*) as cnt FROM jobs \
802             WHERE status='pending' AND queue={p1} AND available_at <= {p2}"
803        );
804        let pending_stmt = Statement::from_sql_and_values(
805            backend,
806            &pending_sql,
807            [
808                Value::String(Some(Box::new(q.to_string()))),
809                Value::String(Some(Box::new(now_iso.clone()))),
810            ],
811        );
812        let pending_row = conn
813            .query_one(pending_stmt)
814            .await
815            .map_err(Error::Db)?
816            .ok_or_else(|| Error::custom("stats: no row returned for pending count"))?;
817        let pending: i64 = pending_row
818            .try_get_by::<i64, _>("cnt")
819            .map_err(|e| Error::custom(format!("stats pending cnt: {e}")))?;
820
821        let delayed_sql = format!(
822            "SELECT COUNT(*) as cnt FROM jobs \
823             WHERE status='pending' AND queue={p1} AND available_at > {p2}"
824        );
825        let delayed_stmt = Statement::from_sql_and_values(
826            backend,
827            &delayed_sql,
828            [
829                Value::String(Some(Box::new(q.to_string()))),
830                Value::String(Some(Box::new(now_iso.clone()))),
831            ],
832        );
833        let delayed_row = conn
834            .query_one(delayed_stmt)
835            .await
836            .map_err(Error::Db)?
837            .ok_or_else(|| Error::custom("stats: no row returned for delayed count"))?;
838        let delayed: i64 = delayed_row
839            .try_get_by::<i64, _>("cnt")
840            .map_err(|e| Error::custom(format!("stats delayed cnt: {e}")))?;
841
842        queue_stats.push(SingleQueueStats {
843            name: q.to_string(),
844            pending: pending as usize,
845            delayed: delayed as usize,
846        });
847    }
848
849    // Total failed across all queues.
850    let failed_sql = "SELECT COUNT(*) as cnt FROM jobs WHERE status='failed'";
851    let failed_stmt = Statement::from_string(backend, failed_sql.to_string());
852    let failed_row = conn
853        .query_one(failed_stmt)
854        .await
855        .map_err(Error::Db)?
856        .ok_or_else(|| Error::custom("stats: no row returned for failed count"))?;
857    let total_failed: i64 = failed_row
858        .try_get_by::<i64, _>("cnt")
859        .map_err(|e| Error::custom(format!("stats failed cnt: {e}")))?;
860
861    Ok(QueueStats {
862        queues: queue_stats,
863        total_failed: total_failed as usize,
864    })
865}
866
867// ---------------------------------------------------------------------------
868// Internal parse helpers for introspection types
869// ---------------------------------------------------------------------------
870
871fn parse_job_info(row: &sea_orm::QueryResult, state: JobState) -> Result<JobInfo, Error> {
872    let id: i64 = row
873        .try_get_by::<i64, _>("id")
874        .map_err(|e| Error::custom(format!("parse id: {e}")))?;
875    let job_type: String = row
876        .try_get_by::<String, _>("job_type")
877        .map_err(|e| Error::custom(format!("parse job_type: {e}")))?;
878    let queue: String = row
879        .try_get_by::<String, _>("queue")
880        .map_err(|e| Error::custom(format!("parse queue: {e}")))?;
881    let attempts: i32 = row
882        .try_get_by::<i32, _>("attempts")
883        .map_err(|e| Error::custom(format!("parse attempts: {e}")))?;
884    let max_retries: i32 = row
885        .try_get_by::<i32, _>("max_retries")
886        .map_err(|e| Error::custom(format!("parse max_retries: {e}")))?;
887    let created_at = parse_timestamp(row, "created_at")?.to_rfc3339();
888    let available_at = parse_timestamp(row, "available_at")?.to_rfc3339();
889
890    Ok(JobInfo {
891        id,
892        job_type,
893        queue,
894        attempts: attempts as u32,
895        max_retries: max_retries as u32,
896        created_at,
897        available_at,
898        state,
899    })
900}
901
902fn parse_failed_job_info(row: &sea_orm::QueryResult) -> Result<FailedJobInfo, Error> {
903    let job = parse_job_info(row, JobState::Failed)?;
904    let error: String = row
905        .try_get_by::<Option<String>, _>("error")
906        .map_err(|e| Error::custom(format!("parse error: {e}")))?
907        .unwrap_or_default();
908    // Prefer the recorded failure time; fall back to created_at for legacy rows
909    // parked before the failed_at column existed (WR-06).
910    let failed_at =
911        parse_optional_timestamp(row, "failed_at")?.unwrap_or(parse_timestamp(row, "created_at")?);
912
913    Ok(FailedJobInfo {
914        job,
915        error,
916        failed_at,
917    })
918}
919
920// ---------------------------------------------------------------------------
921// Tests
922// ---------------------------------------------------------------------------
923
924#[cfg(test)]
925mod tests {
926    use super::*;
927    use sea_orm::Database;
928    use sea_orm_migration::MigratorTrait;
929
930    struct TestMigrator;
931
932    #[async_trait::async_trait]
933    impl MigratorTrait for TestMigrator {
934        fn migrations() -> Vec<Box<dyn sea_orm_migration::MigrationTrait>> {
935            vec![Box::new(crate::migration::CreateJobsTable)]
936        }
937    }
938
939    /// Spin up an in-memory SQLite DB and run the jobs migration.
940    async fn setup() -> DatabaseConnection {
941        let conn = Database::connect("sqlite::memory:")
942            .await
943            .expect("connect sqlite::memory:");
944        TestMigrator::up(&conn, None)
945            .await
946            .expect("run CreateJobsTable migration");
947        conn
948    }
949
950    /// Insert a job row directly (bypassing enqueue) for test setup.
951    #[allow(clippy::too_many_arguments)]
952    async fn insert_job(
953        conn: &DatabaseConnection,
954        queue: &str,
955        job_type: &str,
956        status: &str,
957        attempts: i32,
958        max_retries: i32,
959        claimed_at: Option<&str>,
960        available_at: &str,
961    ) -> i64 {
962        let now = Utc::now().to_rfc3339();
963        let claimed_at_sql = match claimed_at {
964            Some(ts) => format!("'{ts}'"),
965            None => "NULL".to_string(),
966        };
967        let sql = format!(
968            "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
969             available_at, claimed_at, created_at) \
970             VALUES ('{queue}', '{job_type}', '{{}}', '{status}', {attempts}, {max_retries}, \
971             '{available_at}', {claimed_at_sql}, '{now}') \
972             RETURNING id"
973        );
974        let row = conn
975            .query_one(Statement::from_string(DatabaseBackend::Sqlite, sql))
976            .await
977            .expect("insert_job query")
978            .expect("insert_job row");
979        row.try_get_by::<i64, _>("id").expect("insert id")
980    }
981
982    #[test]
983    fn ts_ph_casts_timestamp_placeholders_on_postgres_only() {
984        // Regression guard for the Postgres timestamptz binding bug. The jobs
985        // table's timestamp columns are real `timestamptz` on Postgres, but
986        // ferro-queue binds timestamps as RFC 3339 text — so every placeholder
987        // landing on a timestamp column must cast on Postgres and stay bare on
988        // SQLite (TEXT columns, lexical compare). The in-memory DB tests below
989        // run on SQLite only and cannot catch the Postgres `timestamptz < text`
990        // / `column is timestamptz but expression is text` failures, so assert
991        // the generated placeholder form directly.
992        assert_eq!(ts_ph(DatabaseBackend::Postgres, 2), "$2::timestamptz");
993        assert_eq!(ts_ph(DatabaseBackend::Sqlite, 2), "?2");
994        // Non-timestamp placeholders must never be cast.
995        assert_eq!(ph(DatabaseBackend::Postgres, 2), "$2");
996        assert_eq!(ph(DatabaseBackend::Sqlite, 2), "?2");
997    }
998
999    #[tokio::test]
1000    async fn claim_returns_pending_job() {
1001        let conn = setup().await;
1002        let now = Utc::now().to_rfc3339();
1003
1004        // Enqueue one job via direct INSERT.
1005        insert_job(&conn, "default", "MyJob", "pending", 0, 3, None, &now).await;
1006
1007        // First claim should return the job.
1008        let job = claim(&conn, "default", "worker-1")
1009            .await
1010            .expect("claim failed");
1011        assert!(job.is_some(), "expected Some(job), got None");
1012        let job = job.unwrap();
1013        assert_eq!(job.job_type, "MyJob");
1014
1015        // Second claim on the same queue should return None (job is now claimed).
1016        let second = claim(&conn, "default", "worker-2")
1017            .await
1018            .expect("second claim failed");
1019        assert!(second.is_none(), "second claim should return None");
1020    }
1021
1022    #[tokio::test]
1023    async fn idempotency_dedup() {
1024        let conn = setup().await;
1025        let now = Utc::now().to_rfc3339();
1026        let available_at = DateTime::parse_from_rfc3339(&now)
1027            .unwrap()
1028            .with_timezone(&Utc);
1029
1030        // Enqueue with an idempotency key twice.
1031        enqueue(
1032            &conn,
1033            "default",
1034            "MyJob",
1035            "{}",
1036            3,
1037            Some("key-abc"),
1038            None,
1039            available_at,
1040        )
1041        .await
1042        .expect("first enqueue");
1043        enqueue(
1044            &conn,
1045            "default",
1046            "MyJob",
1047            "{}",
1048            3,
1049            Some("key-abc"),
1050            None,
1051            available_at,
1052        )
1053        .await
1054        .expect("second enqueue (should be a no-op)");
1055
1056        // Verify COUNT = 1.
1057        let row = conn
1058            .query_one(Statement::from_string(
1059                DatabaseBackend::Sqlite,
1060                "SELECT COUNT(*) as cnt FROM jobs WHERE job_type='MyJob' AND idempotency_key='key-abc'".to_string(),
1061            ))
1062            .await
1063            .expect("count query")
1064            .expect("count row");
1065        let cnt: i64 = row.try_get_by::<i64, _>("cnt").expect("cnt");
1066        assert_eq!(
1067            cnt, 1,
1068            "idempotency key should deduplicate (expected 1 row)"
1069        );
1070
1071        // Enqueue without idempotency key twice — both must insert.
1072        enqueue(
1073            &conn,
1074            "default",
1075            "OtherJob",
1076            "{}",
1077            3,
1078            None,
1079            None,
1080            available_at,
1081        )
1082        .await
1083        .expect("first plain enqueue");
1084        enqueue(
1085            &conn,
1086            "default",
1087            "OtherJob",
1088            "{}",
1089            3,
1090            None,
1091            None,
1092            available_at,
1093        )
1094        .await
1095        .expect("second plain enqueue");
1096
1097        let row2 = conn
1098            .query_one(Statement::from_string(
1099                DatabaseBackend::Sqlite,
1100                "SELECT COUNT(*) as cnt FROM jobs WHERE job_type='OtherJob'".to_string(),
1101            ))
1102            .await
1103            .expect("count query 2")
1104            .expect("count row 2");
1105        let cnt2: i64 = row2.try_get_by::<i64, _>("cnt").expect("cnt2");
1106        assert_eq!(cnt2, 2, "plain enqueue should insert both rows");
1107    }
1108
1109    #[tokio::test]
1110    async fn reaper_reclaims_stuck_job() {
1111        let conn = setup().await;
1112
1113        // Insert a claimed job with claimed_at 10 minutes ago, attempts=0, max_retries=3.
1114        let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
1115        let now = Utc::now().to_rfc3339();
1116        let id = insert_job(
1117            &conn,
1118            "default",
1119            "StuckJob",
1120            "claimed",
1121            0,
1122            3,
1123            Some(&ten_min_ago),
1124            &now,
1125        )
1126        .await;
1127
1128        // Run reaper with 5-minute visibility timeout.
1129        reaper(&conn, "default", std::time::Duration::from_secs(5 * 60))
1130            .await
1131            .expect("reaper failed");
1132
1133        // Verify status = 'pending' and attempts = 1.
1134        let row = conn
1135            .query_one(Statement::from_string(
1136                DatabaseBackend::Sqlite,
1137                format!("SELECT status, attempts FROM jobs WHERE id={id}"),
1138            ))
1139            .await
1140            .expect("select after reaper")
1141            .expect("row after reaper");
1142
1143        let status: String = row.try_get_by::<String, _>("status").expect("status");
1144        let attempts: i32 = row.try_get_by::<i32, _>("attempts").expect("attempts");
1145        assert_eq!(
1146            status, "pending",
1147            "reaper should reset stuck job to pending"
1148        );
1149        assert_eq!(attempts, 1, "reaper should increment attempts");
1150    }
1151
1152    #[tokio::test]
1153    async fn reaper_boundary_parks_last_attempt() {
1154        // attempts == max_retries - 1: the in-flight (timed-out) attempt is the
1155        // last allowed, so the reaper must PARK it, not requeue — matching the
1156        // worker's handle_failure boundary (WR-01).
1157        let conn = setup().await;
1158        let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
1159        let now = Utc::now().to_rfc3339();
1160        let id = insert_job(
1161            &conn,
1162            "default",
1163            "BoundaryJob",
1164            "claimed",
1165            2, // attempts
1166            3, // max_retries
1167            Some(&ten_min_ago),
1168            &now,
1169        )
1170        .await;
1171
1172        reaper(&conn, "default", std::time::Duration::from_secs(5 * 60))
1173            .await
1174            .expect("reaper failed");
1175
1176        let row = conn
1177            .query_one(Statement::from_string(
1178                DatabaseBackend::Sqlite,
1179                format!("SELECT status, attempts FROM jobs WHERE id={id}"),
1180            ))
1181            .await
1182            .expect("select after reaper")
1183            .expect("row");
1184        let status: String = row.try_get_by::<String, _>("status").expect("status");
1185        let attempts: i32 = row.try_get_by::<i32, _>("attempts").expect("attempts");
1186        assert_eq!(
1187            status, "failed",
1188            "job at attempts == max_retries - 1 must be parked by the reaper, not requeued"
1189        );
1190        assert_eq!(
1191            attempts, 2,
1192            "parked job keeps its attempt count (no further requeue)"
1193        );
1194    }
1195
1196    #[tokio::test]
1197    async fn reap_startup_claims_marks_orphans_failed() {
1198        // A worker killed mid-job leaves rows at status='claimed'. On the next
1199        // worker boot, reap_startup_claims must park them as failed so consumers
1200        // reading jobs.status no longer see them as Active.
1201        let conn = setup().await;
1202        let now = Utc::now().to_rfc3339();
1203
1204        // One orphan on "default", one orphan on "publish", one on an untouched
1205        // queue, plus a pending row that must NOT be reaped.
1206        let orphan_default = insert_job(
1207            &conn,
1208            "default",
1209            "Orphan1",
1210            "claimed",
1211            0,
1212            3,
1213            Some(&now),
1214            &now,
1215        )
1216        .await;
1217        let orphan_publish = insert_job(
1218            &conn,
1219            "publish",
1220            "Orphan2",
1221            "claimed",
1222            1,
1223            3,
1224            Some(&now),
1225            &now,
1226        )
1227        .await;
1228        let orphan_other =
1229            insert_job(&conn, "other", "Orphan3", "claimed", 0, 3, Some(&now), &now).await;
1230        let pending_default =
1231            insert_job(&conn, "default", "Fresh", "pending", 0, 3, None, &now).await;
1232
1233        // Worker handles "default" and "publish" — must reap exactly those orphans.
1234        let reaped = reap_startup_claims(&conn, &["default".to_string(), "publish".to_string()])
1235            .await
1236            .expect("reap_startup_claims failed");
1237        assert_eq!(
1238            reaped, 2,
1239            "expected 2 orphan rows reaped (default + publish)"
1240        );
1241
1242        // The two scoped orphans are now failed with a non-null error + failed_at.
1243        for id in [orphan_default, orphan_publish] {
1244            let row = conn
1245                .query_one(Statement::from_string(
1246                    DatabaseBackend::Sqlite,
1247                    format!("SELECT status, error, failed_at FROM jobs WHERE id={id}"),
1248                ))
1249                .await
1250                .expect("select after reap")
1251                .expect("row after reap");
1252            let status: String = row.try_get_by::<String, _>("status").expect("status");
1253            let error: Option<String> =
1254                row.try_get_by::<Option<String>, _>("error").expect("error");
1255            let failed_at: Option<String> = row
1256                .try_get_by::<Option<String>, _>("failed_at")
1257                .expect("failed_at");
1258            assert_eq!(status, "failed", "orphan must be parked as failed");
1259            assert!(
1260                error.as_deref().unwrap_or("").contains("orphan claim"),
1261                "error must record reap reason, got: {error:?}"
1262            );
1263            assert!(failed_at.is_some(), "failed_at must be stamped");
1264        }
1265
1266        // The untouched queue's orphan is left alone (not in this worker's queues).
1267        let row = conn
1268            .query_one(Statement::from_string(
1269                DatabaseBackend::Sqlite,
1270                format!("SELECT status FROM jobs WHERE id={orphan_other}"),
1271            ))
1272            .await
1273            .expect("select untouched")
1274            .expect("row untouched");
1275        let status: String = row.try_get_by::<String, _>("status").expect("status");
1276        assert_eq!(
1277            status, "claimed",
1278            "orphan on a queue not handled by this worker must be left alone"
1279        );
1280
1281        // The fresh pending row is untouched.
1282        let row = conn
1283            .query_one(Statement::from_string(
1284                DatabaseBackend::Sqlite,
1285                format!("SELECT status FROM jobs WHERE id={pending_default}"),
1286            ))
1287            .await
1288            .expect("select pending")
1289            .expect("row pending");
1290        let status: String = row.try_get_by::<String, _>("status").expect("status");
1291        assert_eq!(status, "pending", "pending row must not be reaped");
1292    }
1293
1294    #[tokio::test]
1295    async fn reap_startup_claims_empty_queues_is_noop() {
1296        // Defensive: an empty queue list (no queues configured) must short-circuit
1297        // without building an invalid `IN ()` SQL clause.
1298        let conn = setup().await;
1299        let now = Utc::now().to_rfc3339();
1300        insert_job(
1301            &conn,
1302            "default",
1303            "Orphan",
1304            "claimed",
1305            0,
1306            3,
1307            Some(&now),
1308            &now,
1309        )
1310        .await;
1311
1312        let reaped = reap_startup_claims(&conn, &[])
1313            .await
1314            .expect("reap on empty queues must succeed");
1315        assert_eq!(reaped, 0);
1316    }
1317
1318    #[tokio::test]
1319    async fn poison_job_parked() {
1320        let conn = setup().await;
1321
1322        // Insert an exhausted job (attempts == max_retries) claimed 10 min ago.
1323        let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
1324        let now = Utc::now().to_rfc3339();
1325        let id = insert_job(
1326            &conn,
1327            "default",
1328            "PoisonJob",
1329            "claimed",
1330            3,
1331            3,
1332            Some(&ten_min_ago),
1333            &now,
1334        )
1335        .await;
1336
1337        // Run reaper.
1338        reaper(&conn, "default", std::time::Duration::from_secs(5 * 60))
1339            .await
1340            .expect("reaper failed");
1341
1342        // Verify status = 'failed' and error IS NOT NULL.
1343        let row = conn
1344            .query_one(Statement::from_string(
1345                DatabaseBackend::Sqlite,
1346                format!("SELECT status, error FROM jobs WHERE id={id}"),
1347            ))
1348            .await
1349            .expect("select after reaper")
1350            .expect("row");
1351
1352        let status: String = row.try_get_by::<String, _>("status").expect("status");
1353        let error: Option<String> = row.try_get_by::<Option<String>, _>("error").expect("error");
1354        assert_eq!(status, "failed", "exhausted job should be parked as failed");
1355        assert!(error.is_some(), "failed job should have an error message");
1356
1357        // A fresh pending job should still be claimable (parked row does not block).
1358        let available = Utc::now().to_rfc3339();
1359        insert_job(
1360            &conn, "default", "FreshJob", "pending", 0, 3, None, &available,
1361        )
1362        .await;
1363
1364        let claimed = claim(&conn, "default", "worker-1")
1365            .await
1366            .expect("claim after poison park");
1367        assert!(
1368            claimed.is_some(),
1369            "fresh job should be claimable after poison job is parked"
1370        );
1371        let claimed = claimed.unwrap();
1372        assert_eq!(claimed.job_type, "FreshJob");
1373    }
1374}