Skip to main content

ferro_queue/
db.rs

1//! DB engine for ferro-queue: dual-backend atomic claim, idempotent enqueue,
2//! stuck-job reaper, job lifecycle ops, and the `Queue` global.
3//!
4//! Supports SQLite (single `UPDATE … RETURNING` inside a pinned transaction)
5//! and Postgres (`FOR UPDATE SKIP LOCKED` inside a transaction). All dynamic values are
6//! bound via `Statement::from_sql_and_values` — no string interpolation of
7//! caller-supplied data (T-185-01).
8
9use chrono::{DateTime, Utc};
10use sea_orm::{
11    ConnectionTrait, DatabaseBackend, DatabaseConnection, Statement, TransactionTrait, Value,
12};
13use serde::{Deserialize, Serialize};
14
15use crate::error::Error;
16
17// ---------------------------------------------------------------------------
18// Queue global
19// ---------------------------------------------------------------------------
20
/// Process-wide database connection. Written at most once by [`Queue::init`]
/// and read by [`Queue::connection`] / [`Queue::is_initialized`].
static GLOBAL_CONNECTION: std::sync::OnceLock<DatabaseConnection> = std::sync::OnceLock::new();

/// A boxed callback that registers one job type on a `WorkerLoop`.
type RegisterFn = Box<dyn Fn(&mut crate::WorkerLoop) + Send + Sync>;
/// Callbacks queued by [`Queue::register`]; `Queue::apply_registrars` replays
/// them, in insertion order, onto a worker loop.
static JOB_REGISTRARS: std::sync::Mutex<Vec<RegisterFn>> = std::sync::Mutex::new(Vec::new());
26
27/// Global handle to the queue's database connection.
28///
29/// Initialise once at application start with [`Queue::init`]; worker loops
30/// and dispatcher call [`Queue::connection`] to get the static reference.
31pub struct Queue;
32
33impl Queue {
34    /// Return a reference to the global `DatabaseConnection`.
35    ///
36    /// # Panics
37    ///
38    /// Panics if [`Queue::init`] has not been called yet.
39    pub fn connection() -> &'static DatabaseConnection {
40        GLOBAL_CONNECTION
41            .get()
42            .expect("Queue not initialized. Call Queue::init() first.")
43    }
44
45    /// Store `conn` as the global connection.
46    ///
47    /// Returns `Err` if called more than once.
48    pub async fn init(conn: DatabaseConnection) -> Result<(), Error> {
49        GLOBAL_CONNECTION
50            .set(conn)
51            .map_err(|_| Error::custom("Queue already initialized"))?;
52        Ok(())
53    }
54
55    /// Returns `true` if [`Queue::init`] has been called.
56    pub fn is_initialized() -> bool {
57        GLOBAL_CONNECTION.get().is_some()
58    }
59
60    /// Register a job type for auto-start by the framework's WorkerLoop.
61    ///
62    /// Call this in your application bootstrap before the server starts.
63    /// The framework's server boot path inspects [`Queue::has_registered_jobs`]
64    /// and spawns a `WorkerLoop` automatically when at least one type is registered.
65    pub fn register<J>()
66    where
67        J: crate::Job + serde::de::DeserializeOwned + 'static,
68    {
69        JOB_REGISTRARS
70            .lock()
71            .unwrap()
72            .push(Box::new(|w: &mut crate::WorkerLoop| w.register::<J>()));
73    }
74
75    /// Returns `true` if at least one job type has been registered via [`Queue::register`].
76    pub fn has_registered_jobs() -> bool {
77        !JOB_REGISTRARS.lock().unwrap().is_empty()
78    }
79
80    /// Apply all registered job types to the given `WorkerLoop`.
81    ///
82    /// Used internally by [`WorkerLoop::from_registry`].
83    pub(crate) fn apply_registrars(w: &mut crate::WorkerLoop) {
84        for r in JOB_REGISTRARS.lock().unwrap().iter() {
85            r(w);
86        }
87    }
88}
89
90// ---------------------------------------------------------------------------
91// JobRow — a claimed row from the jobs table
92// ---------------------------------------------------------------------------
93
/// A row read from the `jobs` table during a claim operation.
///
/// Built by `parse_job_row` from the columns returned by the claim queries.
#[derive(Debug, Clone)]
pub struct JobRow {
    /// Primary key.
    pub id: i64,
    /// Fully-qualified type name of the job (used to route to the correct handler).
    pub job_type: String,
    /// JSON-serialized job data.
    pub payload: String,
    /// Name of the queue this job belongs to.
    pub queue: String,
    /// Number of execution attempts so far (read from the database as `i32`
    /// and cast to `u32`).
    pub attempts: u32,
    /// Maximum attempts before the job is parked as failed (read from the
    /// database as `i32` and cast to `u32`).
    pub max_retries: u32,
    /// Optional deduplication key.
    pub idempotency_key: Option<String>,
    /// Optional tenant scope.
    pub tenant_id: Option<i64>,
    /// Earliest time the job may be claimed.
    pub available_at: DateTime<Utc>,
    /// Insertion timestamp.
    pub created_at: DateTime<Utc>,
}
118
119// ---------------------------------------------------------------------------
120// Introspection types (moved from queue.rs)
121// ---------------------------------------------------------------------------
122
/// Job status for introspection queries.
///
/// Serialized in snake case (`"pending"`, `"delayed"`, `"failed"`). Pending and
/// delayed jobs both have `status='pending'` in the table; they differ only in
/// whether `available_at` has passed.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobState {
    /// Waiting to be claimed.
    Pending,
    /// Scheduled for a future time.
    Delayed,
    /// Permanently failed after exhausting retries.
    Failed,
}
134
/// Summary of a single job for introspection.
///
/// Unlike [`JobRow`], this carries no payload and exposes its timestamps as
/// RFC 3339 strings so it can be serialized directly.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobInfo {
    /// Primary key.
    pub id: i64,
    /// Fully-qualified job type name.
    pub job_type: String,
    /// Queue name.
    pub queue: String,
    /// Execution attempt count.
    pub attempts: u32,
    /// Maximum retry count.
    pub max_retries: u32,
    /// Insertion timestamp (RFC 3339).
    pub created_at: String,
    /// Earliest eligible claim time (RFC 3339).
    pub available_at: String,
    /// Logical state.
    pub state: JobState,
}
155
/// Per-queue pending/delayed counts, as produced by `get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SingleQueueStats {
    /// Queue name.
    pub name: String,
    /// Jobs with `status='pending'` and `available_at <= now`.
    pub pending: usize,
    /// Jobs with `status='pending'` and `available_at > now`.
    pub delayed: usize,
}
166
/// Aggregate stats across all queues, as returned by `get_stats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueueStats {
    /// Per-queue breakdown, one entry per queue name passed to `get_stats`.
    pub queues: Vec<SingleQueueStats>,
    /// Total jobs with `status='failed'`, counted across every queue.
    pub total_failed: usize,
}
175
/// A failed job with the error message.
///
/// Returned by `get_failed_jobs`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailedJobInfo {
    /// Core job fields.
    pub job: JobInfo,
    /// Error message stored at failure time.
    pub error: String,
    /// When the job was parked as failed (recorded in the `failed_at` column;
    /// falls back to `created_at` only for legacy rows that predate the column).
    // NOTE(review): the fallback itself lives in the row parser, which is not
    // shown here — confirm it matches this description.
    pub failed_at: DateTime<Utc>,
}
187
188// ---------------------------------------------------------------------------
189// Internal helpers
190// ---------------------------------------------------------------------------
191
192/// Map a `QueryResult` row to a `JobRow`.
193fn parse_job_row(row: &sea_orm::QueryResult) -> Result<JobRow, Error> {
194    let id: i64 = row
195        .try_get_by::<i64, _>("id")
196        .map_err(|e| Error::custom(format!("parse id: {e}")))?;
197    let job_type: String = row
198        .try_get_by::<String, _>("job_type")
199        .map_err(|e| Error::custom(format!("parse job_type: {e}")))?;
200    let payload: String = row
201        .try_get_by::<String, _>("payload")
202        .map_err(|e| Error::custom(format!("parse payload: {e}")))?;
203    let queue: String = row
204        .try_get_by::<String, _>("queue")
205        .map_err(|e| Error::custom(format!("parse queue: {e}")))?;
206    let attempts: i32 = row
207        .try_get_by::<i32, _>("attempts")
208        .map_err(|e| Error::custom(format!("parse attempts: {e}")))?;
209    let max_retries: i32 = row
210        .try_get_by::<i32, _>("max_retries")
211        .map_err(|e| Error::custom(format!("parse max_retries: {e}")))?;
212    let idempotency_key: Option<String> = row
213        .try_get_by::<Option<String>, _>("idempotency_key")
214        .map_err(|e| Error::custom(format!("parse idempotency_key: {e}")))?;
215    let tenant_id: Option<i64> = row
216        .try_get_by::<Option<i64>, _>("tenant_id")
217        .map_err(|e| Error::custom(format!("parse tenant_id: {e}")))?;
218
219    // available_at / created_at — SQLite stores as ISO-8601 text, Postgres as timestamptz.
220    // SeaORM maps both to DateTime<Utc> when the column is timestamp_with_time_zone; on
221    // SQLite we fall back to string parsing if the native mapping fails.
222    let available_at = parse_timestamp(row, "available_at")?;
223    let created_at = parse_timestamp(row, "created_at")?;
224
225    Ok(JobRow {
226        id,
227        job_type,
228        payload,
229        queue,
230        attempts: attempts as u32,
231        max_retries: max_retries as u32,
232        idempotency_key,
233        tenant_id,
234        available_at,
235        created_at,
236    })
237}
238
239/// Parse a timestamp column that may arrive as `DateTime<Utc>` (Postgres) or
240/// as an ISO-8601 string (SQLite).
241fn parse_timestamp(row: &sea_orm::QueryResult, col: &str) -> Result<DateTime<Utc>, Error> {
242    // Try native DateTime<Utc> first (Postgres timestamptz).
243    if let Ok(dt) = row.try_get_by::<DateTime<Utc>, _>(col) {
244        return Ok(dt);
245    }
246    // Fall back to string parsing (SQLite TEXT column).
247    let s: String = row
248        .try_get_by::<String, _>(col)
249        .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
250    DateTime::parse_from_rfc3339(&s)
251        .map(|dt| dt.with_timezone(&Utc))
252        .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}")))
253}
254
255/// Parse a nullable timestamp column. Returns `Ok(None)` when the column is
256/// SQL NULL, mirroring `parse_timestamp`'s Postgres-then-SQLite fallback.
257fn parse_optional_timestamp(
258    row: &sea_orm::QueryResult,
259    col: &str,
260) -> Result<Option<DateTime<Utc>>, Error> {
261    // Native Option<DateTime<Utc>> first (Postgres timestamptz).
262    if let Ok(opt) = row.try_get_by::<Option<DateTime<Utc>>, _>(col) {
263        return Ok(opt);
264    }
265    // Fall back to Option<String> (SQLite TEXT column).
266    let s: Option<String> = row
267        .try_get_by::<Option<String>, _>(col)
268        .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
269    match s {
270        None => Ok(None),
271        Some(s) => DateTime::parse_from_rfc3339(&s)
272            .map(|dt| Some(dt.with_timezone(&Utc)))
273            .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}"))),
274    }
275}
276
277/// SQL placeholder style: Postgres uses `$N`, SQLite uses `?N`.
278fn ph(backend: DatabaseBackend, n: usize) -> String {
279    match backend {
280        DatabaseBackend::Postgres => format!("${n}"),
281        _ => format!("?{n}"),
282    }
283}
284
285// ---------------------------------------------------------------------------
286// claim — atomic work-stealing claim (dual-backend)
287// ---------------------------------------------------------------------------
288
289/// Atomically claim one pending job from `queue`.
290///
291/// Returns `None` when the queue is empty or all eligible jobs are locked by
292/// another worker (Postgres). Uses:
293/// - Postgres: `SELECT … FOR UPDATE SKIP LOCKED` + `UPDATE` inside a transaction.
294/// - SQLite: single `UPDATE … RETURNING` inside a `conn.begin()` transaction so
295///   every statement is pinned to one pooled connection; the write lock is taken
296///   on the UPDATE itself (no prior read to upgrade, so no `BEGIN IMMEDIATE`
297///   needed).
298pub async fn claim(
299    conn: &DatabaseConnection,
300    queue: &str,
301    worker_id: &str,
302) -> Result<Option<JobRow>, Error> {
303    match conn.get_database_backend() {
304        DatabaseBackend::Postgres => claim_postgres(conn, queue, worker_id).await,
305        DatabaseBackend::Sqlite => claim_sqlite(conn, queue, worker_id).await,
306        _ => Err(Error::UnsupportedBackend),
307    }
308}
309
310async fn claim_postgres(
311    conn: &DatabaseConnection,
312    queue: &str,
313    worker_id: &str,
314) -> Result<Option<JobRow>, Error> {
315    let txn = conn.begin().await.map_err(Error::Db)?;
316
317    let select = Statement::from_sql_and_values(
318        DatabaseBackend::Postgres,
319        "SELECT id, job_type, payload, queue, attempts, max_retries, idempotency_key, \
320         tenant_id, available_at, created_at FROM jobs \
321         WHERE status = 'pending' AND queue = $1 AND available_at <= NOW() \
322         ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED",
323        [Value::String(Some(Box::new(queue.to_string())))],
324    );
325
326    let row = txn.query_one(select).await.map_err(Error::Db)?;
327    let Some(row) = row else {
328        txn.commit().await.map_err(Error::Db)?;
329        return Ok(None);
330    };
331    let job = parse_job_row(&row)?;
332
333    let upd = Statement::from_sql_and_values(
334        DatabaseBackend::Postgres,
335        "UPDATE jobs SET status = 'claimed', claimed_at = NOW(), claimed_by = $2 WHERE id = $1",
336        [
337            Value::BigInt(Some(job.id)),
338            Value::String(Some(Box::new(worker_id.to_string()))),
339        ],
340    );
341    txn.execute(upd).await.map_err(Error::Db)?;
342    txn.commit().await.map_err(Error::Db)?;
343
344    Ok(Some(job))
345}
346
347async fn claim_sqlite(
348    conn: &DatabaseConnection,
349    queue: &str,
350    worker_id: &str,
351) -> Result<Option<JobRow>, Error> {
352    let now_iso = Utc::now().to_rfc3339();
353
354    // Acquire a transaction handle: SeaORM `begin()` checks out ONE physical
355    // connection from the pool and pins every statement on this handle to it.
356    // This is the correctness-critical change (CR-01) — issuing BEGIN/UPDATE/
357    // COMMIT directly on `conn` let the statements land on different pooled
358    // connections, breaking atomicity and leaking an open transaction back into
359    // the pool on the error path.
360    //
361    // `begin()` emits a DEFERRED BEGIN on SQLite. We do not need an explicit
362    // BEGIN IMMEDIATE here: the claim is a single `UPDATE … RETURNING`, so the
363    // write lock is acquired on that first (and only) statement. There is no
364    // prior read in this txn that could trigger the deferred-read-then-upgrade
365    // deadlock that BEGIN IMMEDIATE exists to avoid. Atomicity is guaranteed by
366    // the statements being pinned to one connection inside the txn handle.
367    let txn = conn.begin().await.map_err(Error::Db)?;
368
369    let stmt = Statement::from_sql_and_values(
370        DatabaseBackend::Sqlite,
371        "UPDATE jobs SET status='claimed', claimed_at=?1, claimed_by=?2 \
372         WHERE id = ( SELECT id FROM jobs WHERE status='pending' AND queue=?3 \
373           AND available_at <= ?1 ORDER BY id LIMIT 1 ) \
374         RETURNING id, job_type, payload, queue, attempts, max_retries, \
375           idempotency_key, tenant_id, available_at, created_at",
376        [
377            Value::String(Some(Box::new(now_iso))),
378            Value::String(Some(Box::new(worker_id.to_string()))),
379            Value::String(Some(Box::new(queue.to_string()))),
380        ],
381    );
382
383    let row = match txn.query_one(stmt).await {
384        Ok(r) => r,
385        Err(e) => {
386            // Roll back so the connection returns to the pool with no open txn.
387            let _ = txn.rollback().await;
388            return Err(Error::Db(e));
389        }
390    };
391    txn.commit().await.map_err(Error::Db)?;
392
393    row.map(|r| parse_job_row(&r)).transpose()
394}
395
396// ---------------------------------------------------------------------------
397// reaper — re-queue stuck claimed rows; park exhausted ones as failed
398// ---------------------------------------------------------------------------
399
400/// Re-queue claimed rows that have been held longer than `visibility_timeout`.
401///
402/// `attempts` is the number of attempts already completed. A row is requeued
403/// (for one more attempt) only while `attempts + 1 < max_retries`; once the
404/// next attempt would be the last allowed (`attempts + 1 >= max_retries`) the
405/// row is parked as `failed`. This matches the worker's handler-failure
406/// boundary (`handle_failure`: park when `attempts + 1 >= max_retries`) so a
407/// job gets the same total attempt count whether it fails via handler error or
408/// via visibility timeout (WR-01, T-185-04).
409pub async fn reaper(
410    conn: &DatabaseConnection,
411    queue: &str,
412    visibility_timeout: std::time::Duration,
413) -> Result<(), Error> {
414    let now = Utc::now();
415    let duration = chrono::Duration::from_std(visibility_timeout)
416        .map_err(|e| Error::custom(format!("visibility_timeout out of range: {e}")))?;
417    let cutoff = (now - duration).to_rfc3339();
418    let now_iso = now.to_rfc3339();
419
420    let backend = conn.get_database_backend();
421    let txn = conn.begin().await.map_err(Error::Db)?;
422
423    // Step 1: re-queue rows that still have a retry left after counting this
424    // timed-out attempt. The in-flight attempt (number `attempts + 1`) failed,
425    // so requeue only while `attempts + 1 < max_retries` — matching
426    // `worker::handle_failure`.
427    let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
428    let requeue_sql = format!(
429        "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL, \
430         attempts = attempts + 1, available_at = {p1} \
431         WHERE status='claimed' AND claimed_at < {p2} \
432         AND attempts + 1 < max_retries AND queue = {p3}"
433    );
434    let requeue = Statement::from_sql_and_values(
435        backend,
436        &requeue_sql,
437        [
438            Value::String(Some(Box::new(now_iso.clone()))),
439            Value::String(Some(Box::new(cutoff.clone()))),
440            Value::String(Some(Box::new(queue.to_string()))),
441        ],
442    );
443    txn.execute(requeue).await.map_err(Error::Db)?;
444
445    // Step 2: park exhausted rows as failed, recording the failure time.
446    // Use fresh placeholders (independent statement, not continuing p1..p3).
447    let (pp1, pp2, pp3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
448    let park_sql = format!(
449        "UPDATE jobs SET status='failed', error='visibility timeout exceeded', failed_at={pp1} \
450         WHERE status='claimed' AND claimed_at < {pp2} \
451         AND attempts + 1 >= max_retries AND queue = {pp3}"
452    );
453    let park = Statement::from_sql_and_values(
454        backend,
455        &park_sql,
456        [
457            Value::String(Some(Box::new(now_iso))),
458            Value::String(Some(Box::new(cutoff))),
459            Value::String(Some(Box::new(queue.to_string()))),
460        ],
461    );
462    txn.execute(park).await.map_err(Error::Db)?;
463
464    txn.commit().await.map_err(Error::Db)
465}
466
467// ---------------------------------------------------------------------------
468// enqueue — idempotent insert
469// ---------------------------------------------------------------------------
470
471/// Insert a new job into the queue.
472///
473/// When `idempotency_key` is `Some`, skips the insert if a `pending` or
474/// `claimed` row with the same `(job_type, idempotency_key)` already exists
475/// (D-15, T-185-01).
476#[allow(clippy::too_many_arguments)]
477pub async fn enqueue(
478    conn: &DatabaseConnection,
479    queue: &str,
480    job_type: &str,
481    payload: &str,
482    max_retries: u32,
483    idempotency_key: Option<&str>,
484    tenant_id: Option<i64>,
485    available_at: DateTime<Utc>,
486) -> Result<(), Error> {
487    let backend = conn.get_database_backend();
488    let now_iso = Utc::now().to_rfc3339();
489    let available_iso = available_at.to_rfc3339();
490
491    if let Some(idem) = idempotency_key {
492        // Idempotent insert: skip if a pending/claimed row with the same key exists.
493        let (p1, p2, p3, p4, p5, p6, p7, p8) = (
494            ph(backend, 1),
495            ph(backend, 2),
496            ph(backend, 3),
497            ph(backend, 4),
498            ph(backend, 5),
499            ph(backend, 6),
500            ph(backend, 7),
501            ph(backend, 8),
502        );
503        let sql = format!(
504            "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
505             idempotency_key, tenant_id, available_at, created_at) \
506             SELECT {p1}, {p2}, {p3}, 'pending', 0, {p4}, {p5}, {p6}, {p7}, {p8} \
507             WHERE NOT EXISTS ( \
508               SELECT 1 FROM jobs WHERE job_type = {p2} AND idempotency_key = {p5} \
509               AND status IN ('pending','claimed') \
510             )"
511        );
512        let stmt = Statement::from_sql_and_values(
513            backend,
514            &sql,
515            [
516                Value::String(Some(Box::new(queue.to_string()))),
517                Value::String(Some(Box::new(job_type.to_string()))),
518                Value::String(Some(Box::new(payload.to_string()))),
519                Value::Int(Some(max_retries as i32)),
520                Value::String(Some(Box::new(idem.to_string()))),
521                tenant_id.map_or(Value::BigInt(None), |id| Value::BigInt(Some(id))),
522                Value::String(Some(Box::new(available_iso))),
523                Value::String(Some(Box::new(now_iso))),
524            ],
525        );
526        conn.execute(stmt).await.map_err(Error::Db)?;
527    } else {
528        // Plain insert — no deduplication guard.
529        let (p1, p2, p3, p4, p5, p6, p7) = (
530            ph(backend, 1),
531            ph(backend, 2),
532            ph(backend, 3),
533            ph(backend, 4),
534            ph(backend, 5),
535            ph(backend, 6),
536            ph(backend, 7),
537        );
538        let sql = format!(
539            "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
540             tenant_id, available_at, created_at) \
541             VALUES ({p1}, {p2}, {p3}, 'pending', 0, {p4}, {p5}, {p6}, {p7})"
542        );
543        let stmt = Statement::from_sql_and_values(
544            backend,
545            &sql,
546            [
547                Value::String(Some(Box::new(queue.to_string()))),
548                Value::String(Some(Box::new(job_type.to_string()))),
549                Value::String(Some(Box::new(payload.to_string()))),
550                Value::Int(Some(max_retries as i32)),
551                tenant_id.map_or(Value::BigInt(None), |id| Value::BigInt(Some(id))),
552                Value::String(Some(Box::new(available_iso))),
553                Value::String(Some(Box::new(now_iso))),
554            ],
555        );
556        conn.execute(stmt).await.map_err(Error::Db)?;
557    }
558
559    Ok(())
560}
561
562// ---------------------------------------------------------------------------
563// Lifecycle operations
564// ---------------------------------------------------------------------------
565
566/// Delete a successfully-completed job row (D-04 delete-on-success).
567pub async fn delete_job(conn: &DatabaseConnection, id: i64) -> Result<(), Error> {
568    let backend = conn.get_database_backend();
569    let p1 = ph(backend, 1);
570    let sql = format!("DELETE FROM jobs WHERE id = {p1}");
571    let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(id))]);
572    conn.execute(stmt).await.map_err(Error::Db)?;
573    Ok(())
574}
575
576/// Park a job as `failed` with an error message, recording the failure time.
577pub async fn fail_job(conn: &DatabaseConnection, id: i64, error: &str) -> Result<(), Error> {
578    let backend = conn.get_database_backend();
579    let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
580    let sql =
581        format!("UPDATE jobs SET status='failed', error={p1}, failed_at={p2} WHERE id = {p3}");
582    let stmt = Statement::from_sql_and_values(
583        backend,
584        &sql,
585        [
586            Value::String(Some(Box::new(error.to_string()))),
587            Value::String(Some(Box::new(Utc::now().to_rfc3339()))),
588            Value::BigInt(Some(id)),
589        ],
590    );
591    conn.execute(stmt).await.map_err(Error::Db)?;
592    Ok(())
593}
594
595/// Reset a job to `pending`, bump its attempt count, and set a new
596/// `available_at` (used by the worker after a retryable failure).
597pub async fn release_job(
598    conn: &DatabaseConnection,
599    id: i64,
600    attempts: u32,
601    available_at: DateTime<Utc>,
602) -> Result<(), Error> {
603    let backend = conn.get_database_backend();
604    let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
605    let sql = format!(
606        "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL, \
607         attempts={p1}, available_at={p2} WHERE id = {p3}"
608    );
609    let stmt = Statement::from_sql_and_values(
610        backend,
611        &sql,
612        [
613            Value::Int(Some(attempts as i32)),
614            Value::String(Some(Box::new(available_at.to_rfc3339()))),
615            Value::BigInt(Some(id)),
616        ],
617    );
618    conn.execute(stmt).await.map_err(Error::Db)?;
619    Ok(())
620}
621
622/// Reset all jobs claimed by `worker_id` back to `pending` (D-10 shutdown
623/// re-queue — called by the worker loop before the process exits).
624pub async fn requeue_claimed_by(conn: &DatabaseConnection, worker_id: &str) -> Result<(), Error> {
625    let backend = conn.get_database_backend();
626    let p1 = ph(backend, 1);
627    let sql = format!(
628        "UPDATE jobs SET status='pending', claimed_at=NULL, claimed_by=NULL \
629         WHERE status='claimed' AND claimed_by={p1}"
630    );
631    let stmt = Statement::from_sql_and_values(
632        backend,
633        &sql,
634        [Value::String(Some(Box::new(worker_id.to_string())))],
635    );
636    conn.execute(stmt).await.map_err(Error::Db)?;
637    Ok(())
638}
639
640// ---------------------------------------------------------------------------
641// Introspection / stat queries
642// ---------------------------------------------------------------------------
643
644/// Return up to `limit` pending (immediately eligible) jobs in `queue`.
645pub async fn get_pending_jobs(
646    conn: &DatabaseConnection,
647    queue: &str,
648    limit: u64,
649) -> Result<Vec<JobInfo>, Error> {
650    let backend = conn.get_database_backend();
651    let now_iso = Utc::now().to_rfc3339();
652    let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
653    let sql = format!(
654        "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at \
655         FROM jobs WHERE status='pending' AND queue={p1} AND available_at <= {p2} \
656         ORDER BY id LIMIT {p3}"
657    );
658    let stmt = Statement::from_sql_and_values(
659        backend,
660        &sql,
661        [
662            Value::String(Some(Box::new(queue.to_string()))),
663            Value::String(Some(Box::new(now_iso))),
664            Value::BigInt(Some(limit as i64)),
665        ],
666    );
667    let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
668    rows.iter()
669        .map(|r| parse_job_info(r, JobState::Pending))
670        .collect()
671}
672
673/// Return up to `limit` delayed (not-yet-eligible) jobs in `queue`.
674pub async fn get_delayed_jobs(
675    conn: &DatabaseConnection,
676    queue: &str,
677    limit: u64,
678) -> Result<Vec<JobInfo>, Error> {
679    let backend = conn.get_database_backend();
680    let now_iso = Utc::now().to_rfc3339();
681    let (p1, p2, p3) = (ph(backend, 1), ph(backend, 2), ph(backend, 3));
682    let sql = format!(
683        "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at \
684         FROM jobs WHERE status='pending' AND queue={p1} AND available_at > {p2} \
685         ORDER BY id LIMIT {p3}"
686    );
687    let stmt = Statement::from_sql_and_values(
688        backend,
689        &sql,
690        [
691            Value::String(Some(Box::new(queue.to_string()))),
692            Value::String(Some(Box::new(now_iso))),
693            Value::BigInt(Some(limit as i64)),
694        ],
695    );
696    let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
697    rows.iter()
698        .map(|r| parse_job_info(r, JobState::Delayed))
699        .collect()
700}
701
702/// Return up to `limit` failed jobs (across all queues).
703pub async fn get_failed_jobs(
704    conn: &DatabaseConnection,
705    limit: u64,
706) -> Result<Vec<FailedJobInfo>, Error> {
707    let backend = conn.get_database_backend();
708    let p1 = ph(backend, 1);
709    // Order by failure time, falling back to created_at for any legacy row that
710    // predates the failed_at column (WR-06).
711    let sql = format!(
712        "SELECT id, job_type, queue, attempts, max_retries, created_at, available_at, \
713         error, failed_at FROM jobs WHERE status='failed' \
714         ORDER BY COALESCE(failed_at, created_at) DESC LIMIT {p1}"
715    );
716    let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(limit as i64))]);
717    let rows = conn.query_all(stmt).await.map_err(Error::Db)?;
718    rows.iter().map(parse_failed_job_info).collect()
719}
720
721/// Return aggregate pending/delayed counts per queue and the total failed count.
722pub async fn get_stats(conn: &DatabaseConnection, queues: &[&str]) -> Result<QueueStats, Error> {
723    let backend = conn.get_database_backend();
724    let now_iso = Utc::now().to_rfc3339();
725    let mut queue_stats = Vec::new();
726
727    for &q in queues {
728        let p1 = ph(backend, 1);
729        let p2 = ph(backend, 2);
730        let p3 = ph(backend, 3);
731        let pending_sql = format!(
732            "SELECT COUNT(*) as cnt FROM jobs \
733             WHERE status='pending' AND queue={p1} AND available_at <= {p2}"
734        );
735        let pending_stmt = Statement::from_sql_and_values(
736            backend,
737            &pending_sql,
738            [
739                Value::String(Some(Box::new(q.to_string()))),
740                Value::String(Some(Box::new(now_iso.clone()))),
741            ],
742        );
743        let pending_row = conn
744            .query_one(pending_stmt)
745            .await
746            .map_err(Error::Db)?
747            .ok_or_else(|| Error::custom("stats: no row returned for pending count"))?;
748        let pending: i64 = pending_row
749            .try_get_by::<i64, _>("cnt")
750            .map_err(|e| Error::custom(format!("stats pending cnt: {e}")))?;
751
752        let delayed_sql = format!(
753            "SELECT COUNT(*) as cnt FROM jobs \
754             WHERE status='pending' AND queue={p1} AND available_at > {p3}"
755        );
756        let delayed_stmt = Statement::from_sql_and_values(
757            backend,
758            &delayed_sql,
759            [
760                Value::String(Some(Box::new(q.to_string()))),
761                Value::String(Some(Box::new(now_iso.clone()))),
762            ],
763        );
764        let delayed_row = conn
765            .query_one(delayed_stmt)
766            .await
767            .map_err(Error::Db)?
768            .ok_or_else(|| Error::custom("stats: no row returned for delayed count"))?;
769        let delayed: i64 = delayed_row
770            .try_get_by::<i64, _>("cnt")
771            .map_err(|e| Error::custom(format!("stats delayed cnt: {e}")))?;
772
773        queue_stats.push(SingleQueueStats {
774            name: q.to_string(),
775            pending: pending as usize,
776            delayed: delayed as usize,
777        });
778    }
779
780    // Total failed across all queues.
781    let failed_sql = "SELECT COUNT(*) as cnt FROM jobs WHERE status='failed'";
782    let failed_stmt = Statement::from_string(backend, failed_sql.to_string());
783    let failed_row = conn
784        .query_one(failed_stmt)
785        .await
786        .map_err(Error::Db)?
787        .ok_or_else(|| Error::custom("stats: no row returned for failed count"))?;
788    let total_failed: i64 = failed_row
789        .try_get_by::<i64, _>("cnt")
790        .map_err(|e| Error::custom(format!("stats failed cnt: {e}")))?;
791
792    Ok(QueueStats {
793        queues: queue_stats,
794        total_failed: total_failed as usize,
795    })
796}
797
798// ---------------------------------------------------------------------------
799// Internal parse helpers for introspection types
800// ---------------------------------------------------------------------------
801
802fn parse_job_info(row: &sea_orm::QueryResult, state: JobState) -> Result<JobInfo, Error> {
803    let id: i64 = row
804        .try_get_by::<i64, _>("id")
805        .map_err(|e| Error::custom(format!("parse id: {e}")))?;
806    let job_type: String = row
807        .try_get_by::<String, _>("job_type")
808        .map_err(|e| Error::custom(format!("parse job_type: {e}")))?;
809    let queue: String = row
810        .try_get_by::<String, _>("queue")
811        .map_err(|e| Error::custom(format!("parse queue: {e}")))?;
812    let attempts: i32 = row
813        .try_get_by::<i32, _>("attempts")
814        .map_err(|e| Error::custom(format!("parse attempts: {e}")))?;
815    let max_retries: i32 = row
816        .try_get_by::<i32, _>("max_retries")
817        .map_err(|e| Error::custom(format!("parse max_retries: {e}")))?;
818    let created_at = parse_timestamp(row, "created_at")?.to_rfc3339();
819    let available_at = parse_timestamp(row, "available_at")?.to_rfc3339();
820
821    Ok(JobInfo {
822        id,
823        job_type,
824        queue,
825        attempts: attempts as u32,
826        max_retries: max_retries as u32,
827        created_at,
828        available_at,
829        state,
830    })
831}
832
833fn parse_failed_job_info(row: &sea_orm::QueryResult) -> Result<FailedJobInfo, Error> {
834    let job = parse_job_info(row, JobState::Failed)?;
835    let error: String = row
836        .try_get_by::<Option<String>, _>("error")
837        .map_err(|e| Error::custom(format!("parse error: {e}")))?
838        .unwrap_or_default();
839    // Prefer the recorded failure time; fall back to created_at for legacy rows
840    // parked before the failed_at column existed (WR-06).
841    let failed_at =
842        parse_optional_timestamp(row, "failed_at")?.unwrap_or(parse_timestamp(row, "created_at")?);
843
844    Ok(FailedJobInfo {
845        job,
846        error,
847        failed_at,
848    })
849}
850
851// ---------------------------------------------------------------------------
852// Tests
853// ---------------------------------------------------------------------------
854
855#[cfg(test)]
856mod tests {
857    use super::*;
858    use sea_orm::Database;
859    use sea_orm_migration::MigratorTrait;
860
861    struct TestMigrator;
862
863    #[async_trait::async_trait]
864    impl MigratorTrait for TestMigrator {
865        fn migrations() -> Vec<Box<dyn sea_orm_migration::MigrationTrait>> {
866            vec![Box::new(crate::migration::CreateJobsTable)]
867        }
868    }
869
870    /// Spin up an in-memory SQLite DB and run the jobs migration.
871    async fn setup() -> DatabaseConnection {
872        let conn = Database::connect("sqlite::memory:")
873            .await
874            .expect("connect sqlite::memory:");
875        TestMigrator::up(&conn, None)
876            .await
877            .expect("run CreateJobsTable migration");
878        conn
879    }
880
881    /// Insert a job row directly (bypassing enqueue) for test setup.
882    #[allow(clippy::too_many_arguments)]
883    async fn insert_job(
884        conn: &DatabaseConnection,
885        queue: &str,
886        job_type: &str,
887        status: &str,
888        attempts: i32,
889        max_retries: i32,
890        claimed_at: Option<&str>,
891        available_at: &str,
892    ) -> i64 {
893        let now = Utc::now().to_rfc3339();
894        let claimed_at_sql = match claimed_at {
895            Some(ts) => format!("'{ts}'"),
896            None => "NULL".to_string(),
897        };
898        let sql = format!(
899            "INSERT INTO jobs (queue, job_type, payload, status, attempts, max_retries, \
900             available_at, claimed_at, created_at) \
901             VALUES ('{queue}', '{job_type}', '{{}}', '{status}', {attempts}, {max_retries}, \
902             '{available_at}', {claimed_at_sql}, '{now}') \
903             RETURNING id"
904        );
905        let row = conn
906            .query_one(Statement::from_string(DatabaseBackend::Sqlite, sql))
907            .await
908            .expect("insert_job query")
909            .expect("insert_job row");
910        row.try_get_by::<i64, _>("id").expect("insert id")
911    }
912
913    #[tokio::test]
914    async fn claim_returns_pending_job() {
915        let conn = setup().await;
916        let now = Utc::now().to_rfc3339();
917
918        // Enqueue one job via direct INSERT.
919        insert_job(&conn, "default", "MyJob", "pending", 0, 3, None, &now).await;
920
921        // First claim should return the job.
922        let job = claim(&conn, "default", "worker-1")
923            .await
924            .expect("claim failed");
925        assert!(job.is_some(), "expected Some(job), got None");
926        let job = job.unwrap();
927        assert_eq!(job.job_type, "MyJob");
928
929        // Second claim on the same queue should return None (job is now claimed).
930        let second = claim(&conn, "default", "worker-2")
931            .await
932            .expect("second claim failed");
933        assert!(second.is_none(), "second claim should return None");
934    }
935
936    #[tokio::test]
937    async fn idempotency_dedup() {
938        let conn = setup().await;
939        let now = Utc::now().to_rfc3339();
940        let available_at = DateTime::parse_from_rfc3339(&now)
941            .unwrap()
942            .with_timezone(&Utc);
943
944        // Enqueue with an idempotency key twice.
945        enqueue(
946            &conn,
947            "default",
948            "MyJob",
949            "{}",
950            3,
951            Some("key-abc"),
952            None,
953            available_at,
954        )
955        .await
956        .expect("first enqueue");
957        enqueue(
958            &conn,
959            "default",
960            "MyJob",
961            "{}",
962            3,
963            Some("key-abc"),
964            None,
965            available_at,
966        )
967        .await
968        .expect("second enqueue (should be a no-op)");
969
970        // Verify COUNT = 1.
971        let row = conn
972            .query_one(Statement::from_string(
973                DatabaseBackend::Sqlite,
974                "SELECT COUNT(*) as cnt FROM jobs WHERE job_type='MyJob' AND idempotency_key='key-abc'".to_string(),
975            ))
976            .await
977            .expect("count query")
978            .expect("count row");
979        let cnt: i64 = row.try_get_by::<i64, _>("cnt").expect("cnt");
980        assert_eq!(
981            cnt, 1,
982            "idempotency key should deduplicate (expected 1 row)"
983        );
984
985        // Enqueue without idempotency key twice — both must insert.
986        enqueue(
987            &conn,
988            "default",
989            "OtherJob",
990            "{}",
991            3,
992            None,
993            None,
994            available_at,
995        )
996        .await
997        .expect("first plain enqueue");
998        enqueue(
999            &conn,
1000            "default",
1001            "OtherJob",
1002            "{}",
1003            3,
1004            None,
1005            None,
1006            available_at,
1007        )
1008        .await
1009        .expect("second plain enqueue");
1010
1011        let row2 = conn
1012            .query_one(Statement::from_string(
1013                DatabaseBackend::Sqlite,
1014                "SELECT COUNT(*) as cnt FROM jobs WHERE job_type='OtherJob'".to_string(),
1015            ))
1016            .await
1017            .expect("count query 2")
1018            .expect("count row 2");
1019        let cnt2: i64 = row2.try_get_by::<i64, _>("cnt").expect("cnt2");
1020        assert_eq!(cnt2, 2, "plain enqueue should insert both rows");
1021    }
1022
1023    #[tokio::test]
1024    async fn reaper_reclaims_stuck_job() {
1025        let conn = setup().await;
1026
1027        // Insert a claimed job with claimed_at 10 minutes ago, attempts=0, max_retries=3.
1028        let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
1029        let now = Utc::now().to_rfc3339();
1030        let id = insert_job(
1031            &conn,
1032            "default",
1033            "StuckJob",
1034            "claimed",
1035            0,
1036            3,
1037            Some(&ten_min_ago),
1038            &now,
1039        )
1040        .await;
1041
1042        // Run reaper with 5-minute visibility timeout.
1043        reaper(&conn, "default", std::time::Duration::from_secs(5 * 60))
1044            .await
1045            .expect("reaper failed");
1046
1047        // Verify status = 'pending' and attempts = 1.
1048        let row = conn
1049            .query_one(Statement::from_string(
1050                DatabaseBackend::Sqlite,
1051                format!("SELECT status, attempts FROM jobs WHERE id={id}"),
1052            ))
1053            .await
1054            .expect("select after reaper")
1055            .expect("row after reaper");
1056
1057        let status: String = row.try_get_by::<String, _>("status").expect("status");
1058        let attempts: i32 = row.try_get_by::<i32, _>("attempts").expect("attempts");
1059        assert_eq!(
1060            status, "pending",
1061            "reaper should reset stuck job to pending"
1062        );
1063        assert_eq!(attempts, 1, "reaper should increment attempts");
1064    }
1065
1066    #[tokio::test]
1067    async fn reaper_boundary_parks_last_attempt() {
1068        // attempts == max_retries - 1: the in-flight (timed-out) attempt is the
1069        // last allowed, so the reaper must PARK it, not requeue — matching the
1070        // worker's handle_failure boundary (WR-01).
1071        let conn = setup().await;
1072        let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
1073        let now = Utc::now().to_rfc3339();
1074        let id = insert_job(
1075            &conn,
1076            "default",
1077            "BoundaryJob",
1078            "claimed",
1079            2, // attempts
1080            3, // max_retries
1081            Some(&ten_min_ago),
1082            &now,
1083        )
1084        .await;
1085
1086        reaper(&conn, "default", std::time::Duration::from_secs(5 * 60))
1087            .await
1088            .expect("reaper failed");
1089
1090        let row = conn
1091            .query_one(Statement::from_string(
1092                DatabaseBackend::Sqlite,
1093                format!("SELECT status, attempts FROM jobs WHERE id={id}"),
1094            ))
1095            .await
1096            .expect("select after reaper")
1097            .expect("row");
1098        let status: String = row.try_get_by::<String, _>("status").expect("status");
1099        let attempts: i32 = row.try_get_by::<i32, _>("attempts").expect("attempts");
1100        assert_eq!(
1101            status, "failed",
1102            "job at attempts == max_retries - 1 must be parked by the reaper, not requeued"
1103        );
1104        assert_eq!(
1105            attempts, 2,
1106            "parked job keeps its attempt count (no further requeue)"
1107        );
1108    }
1109
1110    #[tokio::test]
1111    async fn poison_job_parked() {
1112        let conn = setup().await;
1113
1114        // Insert an exhausted job (attempts == max_retries) claimed 10 min ago.
1115        let ten_min_ago = (Utc::now() - chrono::Duration::minutes(10)).to_rfc3339();
1116        let now = Utc::now().to_rfc3339();
1117        let id = insert_job(
1118            &conn,
1119            "default",
1120            "PoisonJob",
1121            "claimed",
1122            3,
1123            3,
1124            Some(&ten_min_ago),
1125            &now,
1126        )
1127        .await;
1128
1129        // Run reaper.
1130        reaper(&conn, "default", std::time::Duration::from_secs(5 * 60))
1131            .await
1132            .expect("reaper failed");
1133
1134        // Verify status = 'failed' and error IS NOT NULL.
1135        let row = conn
1136            .query_one(Statement::from_string(
1137                DatabaseBackend::Sqlite,
1138                format!("SELECT status, error FROM jobs WHERE id={id}"),
1139            ))
1140            .await
1141            .expect("select after reaper")
1142            .expect("row");
1143
1144        let status: String = row.try_get_by::<String, _>("status").expect("status");
1145        let error: Option<String> = row.try_get_by::<Option<String>, _>("error").expect("error");
1146        assert_eq!(status, "failed", "exhausted job should be parked as failed");
1147        assert!(error.is_some(), "failed job should have an error message");
1148
1149        // A fresh pending job should still be claimable (parked row does not block).
1150        let available = Utc::now().to_rfc3339();
1151        insert_job(
1152            &conn, "default", "FreshJob", "pending", 0, 3, None, &available,
1153        )
1154        .await;
1155
1156        let claimed = claim(&conn, "default", "worker-1")
1157            .await
1158            .expect("claim after poison park");
1159        assert!(
1160            claimed.is_some(),
1161            "fresh job should be claimable after poison job is parked"
1162        );
1163        let claimed = claimed.unwrap();
1164        assert_eq!(claimed.job_type, "FreshJob");
1165    }
1166}