rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Startup recovery sweep.
//!
//! A worker that exits ungracefully (SIGKILL, OOM, hardware failure)
//! leaves its in-flight rows in `status = 'running'` with `locked_at`
//! pointing at the crash time. The recovery sweep runs at the next
//! worker startup, looks for rows whose `locked_at` is older than a
//! threshold (default 5 minutes), and resets them to `retrying` so the
//! queue can re-claim them.
//!
//! The threshold is the key tunable: if it is too short, we may reset
//! (and then reprocess) a job that a live worker is still running; if
//! it is too long, a crashed worker's rows stay stuck in `running` for
//! that whole duration before they can be re-claimed. See `tradeoffs.md`
//! and `docs/runbook.md` for the operator-facing details.

use std::time::Duration;

use sqlx_postgres::PgPool;
use tracing::info;

use crate::error::JobError;
use crate::queue;

/// Run the crash-recovery sweep once, resetting rows whose `locked_at`
/// is more than `threshold` in the past.
///
/// The work itself is delegated to `queue::recover_stale`; this wrapper
/// converts `threshold` to whole seconds, forwards the call, and
/// reports the outcome.
///
/// # Returns
///
/// The number of rows that were reset.
///
/// # Errors
///
/// Any error returned by `queue::recover_stale` is propagated
/// unchanged.
///
/// # Logging
///
/// Emits one `info!` event (with `count` and `threshold_seconds`
/// fields) only when at least one row was reset. Clean startups stay
/// silent, so operators notice recovery activity in ordinary logs
/// without consulting metrics.
pub async fn recover_stale_at_startup(pool: &PgPool, threshold: Duration) -> Result<u64, JobError> {
    // `as_secs()` is unsigned, so the only way the conversion fails is a
    // value above `i64::MAX`; saturate in that case. `queue::recover_stale`
    // turns this number into a Postgres `interval` literal, which must
    // not be negative. A threshold that large would mean "never
    // recover" and is not something a real configuration should produce.
    let whole_secs = threshold.as_secs();
    let threshold_seconds = i64::try_from(whole_secs).unwrap_or(i64::MAX);

    let recovered = queue::recover_stale(pool, threshold_seconds).await?;

    // Nothing was stale: return without logging.
    if recovered == 0 {
        return Ok(recovered);
    }

    info!(
        count = recovered,
        threshold_seconds,
        "recovered stale running jobs from prior shutdown"
    );
    Ok(recovered)
}