rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Job execution. Two parts:
//!
//! 1. The [`Executor`] trait — the pluggable boundary between this crate
//!    and the per-kind work a consumer wants to do. Implementations
//!    receive a `Job` already claimed by the queue (attempts incremented,
//!    `locked_by` set) and return one of three outcomes: succeed, fail
//!    (with an error message that lands in `last_error`), or observe a
//!    cooperative cancel request mid-execution.
//!
//! 2. [`SimulatedExecutor`] — a built-in implementation that does
//!    deterministic fake work for the four demo job kinds. Useful for the
//!    test suite, the embedding example, and `docker compose up` demos.
//!    Real consumers replace this with their own `Executor`.
//!
//! ## Determinism
//!
//! The simulator's pass/fail outcome at each sub-step is derived from a
//! hash of `(job_id, attempt, step_idx)`. This produces the same draw
//! for the same input every time the executor sees it; combined with the
//! "retries always succeed at `attempts >= 2`" rule, the
//! queued → running → failed → retrying → running → succeeded path is
//! reproducible in tests without flakiness.

use std::time::Duration;

use async_trait::async_trait;
use sqlx_postgres::PgPool;
use tokio_util::sync::CancellationToken;

use crate::domain::{Job, JobKind};
use crate::queue;

/// Per-execution context passed to the executor.
///
/// Holds the shared `PgPool` (so the executor can write its own
/// auxiliary rows transactionally, e.g., to a dedupe table), the
/// process-wide [`CancellationToken`] (so a long-running executor can
/// observe shutdown via `ctx.shutdown.is_cancelled()`), and the
/// `worker_id` for log correlation.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Shared Postgres pool, available for executor-owned auxiliary
    /// writes (e.g., a dedupe table).
    pub pool: PgPool,
    /// Process-wide shutdown token. Observing it is optional; the
    /// runtime's shutdown grace period handles long jobs by default.
    pub shutdown: CancellationToken,
    /// Identifier of the worker driving this execution, for log
    /// correlation.
    pub worker_id: String,
}

/// Outcome of one `Executor::execute` call.
///
/// The runtime maps each variant to a queue transition:
/// `Succeeded → mark_succeeded`, `Failed → mark_failed_or_retry`,
/// `Cancelled → finalize_cancelled`.
///
/// There is deliberately no "retry" variant: the executor only reports
/// failure, and the queue's retry policy decides the next disposition.
#[derive(Debug)]
pub enum ExecutionOutcome {
    /// Job completed successfully. Transition to `succeeded`.
    Succeeded,
    /// Job failed. The message lands in `last_error`. The queue's retry
    /// policy decides whether the next disposition is `retrying` or
    /// `failed_permanent` based on `attempts >= max_attempts`.
    Failed(String),
    /// Executor observed `cancel_requested = TRUE` mid-execution.
    /// Transition to `cancelled`.
    Cancelled,
}

/// The pluggable boundary. Implementors do the per-kind work; the queue
/// machinery handles claim, retry, cancel propagation, and recovery.
///
/// Implementors are expected to:
/// - Periodically check `queue::get(&ctx.pool, job.id).await` to see if
///   `cancel_requested` was set after they started running, and return
///   [`ExecutionOutcome::Cancelled`] when so.
/// - Optionally observe `ctx.shutdown.is_cancelled()` if they want to
///   respond to process shutdown by returning before completion. The
///   default is to ignore this and let the runtime's shutdown grace
///   period handle long jobs.
/// - Return [`ExecutionOutcome::Failed`] for any application error; the
///   queue's retry policy takes care of whether that's a retryable
///   failure or terminal.
///
/// See [`SimulatedExecutor`] for a reference implementation of this
/// contract.
#[async_trait]
pub trait Executor: Send + Sync + 'static {
    /// Execute one claimed job. The job's `attempts` field has already
    /// been incremented at dequeue time, so attempt 1 corresponds to
    /// `job.attempts == 1`.
    ///
    /// This method never sees an unclaimed job: the queue has already
    /// set `locked_by` and incremented `attempts` before dispatch (see
    /// the module docs).
    async fn execute(&self, ctx: &ExecutionContext, job: &Job) -> ExecutionOutcome;
}

/// Built-in deterministic simulator covering the four demo job kinds.
///
/// Each kind has a fixed plan: a sequence of "sub-steps", each with a
/// sleep duration and a probability of failing. The simulator walks the
/// plan, sleeping at each step and rolling against the failure
/// probability using a deterministic hash draw. Retries (attempt >= 2)
/// always succeed — the simulator is designed for reproducible
/// "fail-then-retry-then-succeed" demonstrations, not for stress-testing
/// the retry policy.
///
/// Stateless unit struct (`Copy`): all behavior is derived from the job
/// itself, so one value can be shared freely across workers.
#[derive(Debug, Default, Clone, Copy)]
pub struct SimulatedExecutor;

/// One sub-step in a simulated job's plan: how long to sleep, and the
/// probability of failing at this step on the first attempt.
// `Debug` added for consistency with the file's other types
// (`ExecutionContext`, `ExecutionOutcome`, `SimulatedExecutor`) and so
// plans can be dumped while debugging the simulator.
#[derive(Debug, Clone, Copy)]
struct Step {
    /// Sleep duration for this step, in milliseconds.
    sleep_ms: u64,
    /// Probability in `[0, 1]` of failing at this step on attempt 1;
    /// later attempts ignore it (retries always succeed — see
    /// `SimulatedExecutor`).
    fail_prob: f64,
}

/// The per-kind execution plan. Returning `&'static` keeps the plan
/// allocation-free and trivially copyable.
///
/// The plans are tuned so that:
/// - `SendEmail` has a moderate failure rate concentrated at one
///   "send" step (45% on attempt 1), modelling a real transient-failure
///   path.
/// - `ResizeImage` has a "fetch source" step that occasionally fails
///   (25% on attempt 1), with the "resize" step itself never failing.
/// - `SummarizeText` is the simplest: one step, 15% failure on attempt 1.
/// - `WebhookDelivery` models the flakiest external dependency: 35%
///   then 15% across two steps.
///
/// All probabilities apply only on attempt 1; subsequent attempts always
/// succeed (see [`SimulatedExecutor::execute`]).
fn steps(kind: JobKind) -> &'static [Step] {
    // Shorthand constructor so each plan reads as a compact
    // (sleep_ms, fail_prob) table.
    const fn step(sleep_ms: u64, fail_prob: f64) -> Step {
        Step { sleep_ms, fail_prob }
    }

    const SEND_EMAIL: [Step; 3] = [step(50, 0.0), step(50, 0.45), step(50, 0.0)];
    const RESIZE_IMAGE: [Step; 2] = [step(100, 0.25), step(200, 0.0)];
    const SUMMARIZE_TEXT: [Step; 1] = [step(150, 0.15)];
    const WEBHOOK_DELIVERY: [Step; 2] = [step(80, 0.35), step(80, 0.15)];

    // Borrows of constants are promoted to `'static`.
    match kind {
        JobKind::SendEmail => &SEND_EMAIL,
        JobKind::ResizeImage => &RESIZE_IMAGE,
        JobKind::SummarizeText => &SUMMARIZE_TEXT,
        JobKind::WebhookDelivery => &WEBHOOK_DELIVERY,
    }
}

/// Deterministic pseudo-random draw in `[0, 1)` from `(job_id, attempt,
/// step_idx)`.
///
/// Implementation note: we use `DefaultHasher` directly rather than a
/// proper PRNG seeded with the inputs. The reason is honest: we don't
/// need cryptographic quality, we don't need a stateful PRNG, and a
/// pure hash function is the simplest thing that gives us reproducible
/// per-input draws. The top 53 bits of the hash are reinterpreted as
/// an f64 mantissa, yielding a value in `[0, 1)`.
///
/// Not cryptographic. Not statistically rigorous. Not suitable for
/// anything other than this simulator's purpose.
fn draw(job_id: uuid::Uuid, attempt: i32, step_idx: usize) -> f64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    // Hashing the tuple feeds each field to the hasher in declaration
    // order — the same byte stream as hashing the three values one by
    // one.
    let mut hasher = DefaultHasher::new();
    (job_id, attempt, step_idx).hash(&mut hasher);
    // Keep the top 53 bits: an IEEE 754 double has a 52-bit mantissa
    // plus an implicit leading 1, i.e. 53 bits of precision, so the
    // quotient lands exactly in [0, 1).
    let bits = hasher.finish() >> 11;
    bits as f64 / (1u64 << 53) as f64
}

#[async_trait]
impl Executor for SimulatedExecutor {
    /// Walk the kind's plan: check the cancel flag, sleep, then roll a
    /// deterministic draw against the step's failure probability.
    async fn execute(&self, ctx: &ExecutionContext, job: &Job) -> ExecutionOutcome {
        let plan = steps(job.kind);
        let total = plan.len();

        for (step_idx, step) in plan.iter().enumerate() {
            // Cooperative cancellation: check the DB flag between
            // sub-steps, never mid-sleep. The per-step granularity is
            // intentional so a step's invariants (e.g., a transactional
            // write) are honoured. Process-wide shutdown is the
            // runtime's job, not ours.
            let row = match queue::get(&ctx.pool, job.id).await {
                Ok(row) => row,
                Err(e) => {
                    // A DB error reading our own row is rare but
                    // possible (connection drop, transient pool
                    // exhaustion). Surface it as a job failure — the
                    // runtime then routes it through the retry policy
                    // instead of treating it as an executor crash.
                    return ExecutionOutcome::Failed(format!(
                        "executor: db error checking cancel flag: {e}"
                    ));
                }
            };
            if row.map_or(false, |j| j.cancel_requested) {
                return ExecutionOutcome::Cancelled;
            }

            tokio::time::sleep(Duration::from_millis(step.sleep_ms)).await;

            // "Retries always succeed": attempts >= 2 get a zero
            // threshold, making the
            // queued → running → failed → retrying → running → succeeded
            // path deterministic for tests. This also means the
            // simulator is NOT a good way to exercise the
            // `failed_permanent` path — use an always-failing custom
            // executor for that (see
            // `tests/retry_and_cancel.rs::always_failing_lands_in_failed_permanent_after_max_attempts`).
            let threshold = if job.attempts >= 2 {
                0.0
            } else {
                step.fail_prob
            };
            if draw(job.id.as_uuid(), job.attempts, step_idx) < threshold {
                return ExecutionOutcome::Failed(format!(
                    "simulated failure at step {}/{} for kind={}",
                    step_idx + 1,
                    total,
                    job.kind.as_str()
                ));
            }
        }

        ExecutionOutcome::Succeeded
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    #[test]
    fn draw_is_deterministic() {
        // The same `(id, attempt, step)` triple must always produce the
        // same draw; the simulator's "fail-on-attempt-1,
        // succeed-on-attempt-2" promise would otherwise be flaky.
        let id = Uuid::now_v7();
        assert_eq!(draw(id, 1, 0), draw(id, 1, 0));
    }

    #[test]
    fn draw_is_in_unit_interval() {
        // Output domain check: every draw must land in [0, 1) for the
        // `< fail_prob` comparison to be meaningful.
        let id = Uuid::now_v7();
        let grid = (0..5).flat_map(|attempt| (0..5).map(move |step| (attempt, step)));
        for (attempt, step) in grid {
            let v = draw(id, attempt, step);
            assert!((0.0..1.0).contains(&v), "got {v}");
        }
    }

    #[test]
    fn draw_varies_with_attempt() {
        // A weak independence check: changing `attempt` should change
        // the draw. Without this property, a job that fails at step 0
        // on attempt 1 would also fail at step 0 on attempt 2 — making
        // retries useless even when fail_prob is 0 on attempt >= 2.
        // (The retry-success path doesn't actually depend on this
        // property because we short-circuit to fail_prob = 0, but it's a
        // sanity check on the hash mixing.)
        let id = Uuid::now_v7();
        let first = draw(id, 1, 0);
        let second = draw(id, 2, 0);
        assert_ne!(
            first, second,
            "expected different attempts to produce different draws"
        );
    }
}