rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Worker pool with cooperative graceful shutdown.
//!
//! [`WorkerRuntime`] spawns N task-pool workers, each running the loop in
//! `worker_loop`. Each worker independently polls the queue, processes
//! claimed jobs through the supplied [`Executor`], and updates the job's
//! final state. The shared [`CancellationToken`] is the shutdown channel:
//! once it is cancelled, each loop stops claiming new work and exits at
//! its next iteration boundary (between jobs; a loop never stops mid-job
//! of its own accord). A bounded grace period gives in-flight jobs time
//! to finish naturally before any straggler tasks are forcibly aborted.
//!
//! Why these design choices:
//!
//! - **Per-task loop, not a shared queue of futures.** Each worker
//!   independently calls `fetch_next`. Postgres's SKIP LOCKED guarantees
//!   the N workers see disjoint rows; building our own work-stealing
//!   queue on top would add complexity without changing the contract.
//!
//! - **Shutdown observed between jobs, not mid-job.** Async tasks
//!   cannot be preempted safely. Killing a task mid-job would orphan its
//!   in-flight side effects. We let each job finish and observe the
//!   shutdown signal at the top of the loop.
//!
//! - **Hard timeout, then abort.** A job that legitimately needs longer
//!   than the grace period (default 30 s) gets aborted; the row stays in
//!   `running` and is reclaimed by the next worker's startup recovery
//!   sweep. The trade-off is that an in-flight long job is processed
//!   twice (once partly, then fully on restart) rather than blocking
//!   shutdown indefinitely.
//!
//! - **Metrics emitted around each `execute()` call.** A counter pair
//!   (`started_total`, `completed_total`) gives in-flight count by
//!   subtraction; a histogram on duration gives latency distribution.

use std::sync::Arc;
use std::time::Duration;

use sqlx_postgres::PgPool;
use tokio::task::JoinSet;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{info, info_span, warn, Instrument};

use crate::queue;
use crate::worker::executor::{ExecutionContext, ExecutionOutcome, Executor};

/// A configured worker pool. Call [`WorkerRuntime::run`] to start it.
///
/// Use the builder methods (`with_*`) to override defaults; defaults are
/// chosen to be reasonable for a typical deployment but tunable for
/// specific workloads:
///
/// | Setting | Default | When to change |
/// |---|---|---|
/// | `concurrency` | 4 | Up: many small jobs. Down: jobs do heavy CPU/RAM work and contend with each other. |
/// | `poll_interval` | 500 ms | Smaller: tail-latency sensitive. Larger: reduce DB load on idle queues. |
/// | `max_poll_interval` | 2 s | Cap on the idle backoff growth. |
/// | `shutdown_grace` | 30 s | Increase for workloads with legitimately long jobs. |
pub struct WorkerRuntime {
    /// Database pool; a clone is handed to every worker task and used for
    /// all queue operations (claiming jobs and recording their outcome).
    pool: PgPool,
    /// Job executor shared by all workers; each claimed job is passed to
    /// its `execute` method.
    executor: Arc<dyn Executor>,
    /// Number of worker tasks spawned by `run`. The builder clamps this
    /// to at least 1.
    concurrency: usize,
    /// Starting wait between polls while the queue is empty; the idle
    /// backoff resets to this value after every claimed job.
    poll_interval: Duration,
    /// Upper bound for the idle backoff. Also used as the wait after a
    /// failed `fetch_next` call.
    max_poll_interval: Duration,
    /// Prefix of each worker's id; worker `n` is named `"{prefix}-{n}"`.
    worker_id_prefix: String,
    /// How long `run` waits for workers to exit after cancellation
    /// before aborting the ones still running.
    shutdown_grace: Duration,
}

impl WorkerRuntime {
    /// Construct a runtime with reasonable defaults. The pool and
    /// executor are required; everything else has a default that the
    /// builder methods can override.
    pub fn new(pool: PgPool, executor: Arc<dyn Executor>) -> Self {
        Self {
            pool,
            executor,
            concurrency: 4,
            poll_interval: Duration::from_millis(500),
            max_poll_interval: Duration::from_secs(2),
            worker_id_prefix: "worker".into(),
            shutdown_grace: Duration::from_secs(30),
        }
    }

    /// Override the worker count. Clamped to at least 1 — zero workers
    /// would be a silent no-op, which is rarely what a caller wants.
    pub fn with_concurrency(mut self, n: usize) -> Self {
        self.concurrency = n.max(1);
        self
    }

    /// Override the idle poll interval and its growth ceiling. `base` is
    /// the wait between polls when the queue is empty; this doubles up to
    /// `max` on each empty poll, then resets to `base` after a successful
    /// claim. The doubling avoids hammering an empty queue.
    pub fn with_poll_interval(mut self, base: Duration, max: Duration) -> Self {
        self.poll_interval = base;
        self.max_poll_interval = max.max(base);
        self
    }

    /// Override the prefix used for the `worker_id` written into the
    /// `jobs.locked_by` column. Useful for distinguishing different
    /// worker fleets in logs (e.g., `"web-worker"` vs `"batch-worker"`).
    /// The full id is `"{prefix}-{n}"` where n is the worker's index.
    pub fn with_worker_id_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.worker_id_prefix = prefix.into();
        self
    }

    /// Override the post-shutdown grace period for in-flight jobs.
    ///
    /// After the runtime observes cancellation, in-flight jobs are given
    /// up to this much time to complete before remaining tasks are
    /// hard-aborted. Aborted jobs leave their rows in `running`; the
    /// next worker's startup recovery sweep picks them up.
    pub fn with_shutdown_grace(mut self, grace: Duration) -> Self {
        self.shutdown_grace = grace;
        self
    }

    /// Start the worker pool and block until shutdown completes.
    ///
    /// `cancel` is the shutdown channel. The runtime spawns `concurrency`
    /// worker tasks, each running `worker_loop`. The function returns
    /// only after all workers have exited (either naturally within the
    /// grace period, or after being aborted).
    pub async fn run(self, cancel: CancellationToken) {
        let mut set = JoinSet::new();
        for n in 0..self.concurrency {
            // Each worker gets its own clones of the shared handles. The
            // PgPool is internally an Arc, so cloning is cheap.
            let worker_id = format!("{}-{n}", self.worker_id_prefix);
            let pool = self.pool.clone();
            let executor = self.executor.clone();
            let token = cancel.clone();
            let base = self.poll_interval;
            let max_iv = self.max_poll_interval;
            set.spawn(
                async move { worker_loop(worker_id, pool, executor, token, base, max_iv).await },
            );
        }

        // Block until shutdown is requested. The worker loops
        // independently observe the same token and exit between jobs;
        // this `cancelled().await` just keeps `run()` parked until then.
        cancel.cancelled().await;
        info!(
            grace_seconds = self.shutdown_grace.as_secs(),
            "shutdown requested; waiting for in-flight jobs"
        );

        // Drain the JoinSet under a timeout. Workers exit naturally as
        // their current job finishes; anything still running when the
        // grace period expires is aborted.
        let drain = async { while set.join_next().await.is_some() {} };
        match tokio::time::timeout(self.shutdown_grace, drain).await {
            Ok(()) => info!("all workers exited within grace period"),
            Err(_) => {
                warn!("shutdown grace period expired; aborting remaining workers");
                set.abort_all();
                // Drain the now-cancelled tasks so `run` doesn't return
                // with handles still in the JoinSet.
                while set.join_next().await.is_some() {}
            }
        }
    }
}

/// One worker's main loop: poll the queue, process a job if available,
/// emit metrics, repeat until shutdown.
///
/// The loop has three branches per iteration:
///
/// 1. `fetch_next` returned a job → call the executor, record the
///    outcome through the matching `queue::*` call, emit metrics, and
///    reset the idle backoff. No sleep follows, so the next poll happens
///    immediately.
/// 2. `fetch_next` returned no job → sleep for the current idle backoff,
///    then grow the backoff (doubling, capped at `max_interval`) and poll
///    again.
/// 3. `fetch_next` errored → log a warning, sleep at the max interval,
///    then retry. Transient DB errors should self-heal; persistent ones
///    will surface as a steady stream of warnings.
///
/// Both sleeps race against `cancel`, so shutdown wakes an idle worker
/// without waiting out the interval.
async fn worker_loop(
    worker_id: String,
    pool: PgPool,
    executor: Arc<dyn Executor>,
    cancel: CancellationToken,
    base_interval: Duration,
    max_interval: Duration,
) {
    // Smallest step the idle backoff grows to after an empty poll.
    // Without it a zero `base_interval` would stay at zero forever
    // (0 * 2 == 0) and an empty queue would be polled with no delay.
    const MIN_IDLE_BACKOFF: Duration = Duration::from_millis(1);

    let mut idle_backoff = base_interval;
    loop {
        // Shutdown check at the *top* of the loop, not the bottom — we
        // want to exit *before* the next dequeue, not after we've claimed
        // a job we then can't finish.
        if cancel.is_cancelled() {
            break;
        }
        match queue::fetch_next(&pool, &worker_id).await {
            Ok(Some(job)) => {
                // We claimed a job. Reset the idle backoff so the next
                // empty poll starts from the base interval again.
                idle_backoff = base_interval;

                let span = info_span!(
                    "job",
                    job_id = %job.id,
                    kind = job.kind.as_str(),
                    attempt = job.attempts,
                    worker_id = %worker_id
                );
                // The executor receives the shutdown token too, so it
                // can observe cancellation while a job is running.
                let ctx = ExecutionContext {
                    pool: pool.clone(),
                    shutdown: cancel.clone(),
                    worker_id: worker_id.clone(),
                };
                let job_id = job.id;
                let kind_label = job.kind.as_str();

                // `started_total` lets operators derive in-flight count
                // by subtracting `completed_total`.
                metrics::counter!("worker_jobs_started_total", "kind" => kind_label).increment(1);

                // Only the `execute()` call is timed; the bookkeeping
                // writes below are not part of the recorded duration.
                let started = std::time::Instant::now();
                let outcome = executor.execute(&ctx, &job).instrument(span).await;
                let elapsed = started.elapsed().as_secs_f64();

                // Compute the durable outcome label for metrics. The
                // `Failed` arm splits into `retrying` vs `failed_permanent`
                // based on `mark_failed_or_retry`'s returned row — we
                // can't know the disposition without consulting it.
                let outcome_label: &'static str = match outcome {
                    ExecutionOutcome::Succeeded => {
                        if let Err(e) = queue::mark_succeeded(&pool, job_id).await {
                            // Possible if, e.g., a concurrent cancel
                            // landed before this mark. Surface as a warn
                            // and continue; the metric still reports the
                            // executor's verdict.
                            warn!(error = %e, %job_id, "failed to mark succeeded");
                        }
                        "succeeded"
                    }
                    ExecutionOutcome::Failed(msg) => {
                        match queue::mark_failed_or_retry(&pool, job_id, &msg).await {
                            Ok(updated) => {
                                if updated.status == crate::JobStatus::FailedPermanent {
                                    "failed_permanent"
                                } else {
                                    "retrying"
                                }
                            }
                            Err(e) => {
                                // The disposition is unknown when the
                                // write fails, hence the separate label.
                                warn!(error = %e, %job_id, "failed to mark failed_or_retry");
                                "error"
                            }
                        }
                    }
                    ExecutionOutcome::Cancelled => {
                        if let Err(e) = queue::finalize_cancelled(&pool, job_id).await {
                            warn!(error = %e, %job_id, "failed to finalize cancelled");
                        }
                        "cancelled"
                    }
                };

                metrics::counter!(
                    "worker_jobs_completed_total",
                    "kind" => kind_label,
                    "outcome" => outcome_label,
                )
                .increment(1);
                metrics::histogram!(
                    "worker_job_duration_seconds",
                    "kind" => kind_label,
                    "outcome" => outcome_label,
                )
                .record(elapsed);
            }
            Ok(None) => {
                // No work available. Sleep, but allow shutdown to wake
                // us early (`select!` resolves on whichever future
                // completes first).
                tokio::select! {
                    _ = cancel.cancelled() => break,
                    _ = sleep(idle_backoff) => {}
                }
                // Exponential backoff while the queue stays empty.
                // `saturating_mul` avoids the overflow panic of
                // `Duration * u32` for very large intervals; the floor
                // lets a zero base interval still grow. `max_interval`
                // is applied last, so the configured cap always wins.
                idle_backoff = idle_backoff
                    .saturating_mul(2)
                    .max(MIN_IDLE_BACKOFF)
                    .min(max_interval);
            }
            Err(e) => {
                // Transient DB error (network, pool exhaustion, etc.).
                // Back off at the max interval — no point hammering a
                // broken connection — and try again.
                warn!(error = %e, "fetch_next error; backing off");
                tokio::select! {
                    _ = cancel.cancelled() => break,
                    _ = sleep(max_interval) => {}
                }
            }
        }
    }
    info!(worker_id, "worker loop exited");
}