rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Handlers for the `/jobs` routes.
//!
//! Four endpoints:
//!
//! - `POST /jobs` — enqueue (with optional `Idempotency-Key` header)
//! - `GET /jobs/{id}` — fetch one
//! - `GET /jobs` — paginated list
//! - `POST /jobs/{id}/cancel` — request cancellation
//!
//! Every handler delegates the actual work to [`crate::queue`]; this
//! file's job is to translate HTTP shapes into queue inputs and queue
//! outcomes into HTTP responses. The translations are intentionally
//! thin so the queue module remains the single place where
//! correctness-sensitive logic lives.

use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::Json;

use crate::api::dto::{CancelResponse, CreateJobRequest, JobResponse, ListJobsQuery};
use crate::api::error::ApiError;
use crate::api::AppState;
use crate::domain::NewJob;
use crate::ids::JobId;
use crate::queue::{self, CancelOutcome, ListFilter};

/// Name of the request header that carries a client-supplied
/// idempotency key, as looked up by [`create_job`].
///
/// Spelled in lowercase because HTTP/2 normalises header names to
/// lowercase; using the lowercase form for the lookup keeps the
/// behaviour predictable across HTTP versions.
// NOTE(review): this relies on `HeaderMap::get` resolving string keys
// case-insensitively, so an HTTP/1.1 client sending `Idempotency-Key`
// still matches — confirm against the `http` crate documentation.
const IDEMPOTENCY_HEADER: &str = "idempotency-key";

/// `POST /jobs` — enqueue a new job.
///
/// Builds a [`NewJob`] from the request body and hands it to
/// [`crate::queue::enqueue`].
///
/// # Idempotency
///
/// The idempotency key is taken from the `Idempotency-Key` header
/// when one is present and readable as a string; otherwise the
/// `idempotency_key` field of the body is used. Header-over-body
/// precedence follows the Stripe API convention, which existing
/// clients are commonly written against.
///
/// # Responses
///
/// - `201 Created` when the queue reports the job as newly inserted.
/// - `200 OK` when the queue returns an already-existing job instead
///   (an idempotency-key hit).
///
/// # Errors
///
/// Any error from [`crate::queue::enqueue`] is converted into
/// [`ApiError`] via `?`.
#[utoipa::path(
    post,
    path = "/jobs",
    request_body = CreateJobRequest,
    responses(
        (status = 201, description = "Job created", body = JobResponse),
        (status = 200, description = "Existing job returned via idempotency key", body = JobResponse),
        (status = 422, description = "Payload validation failed", body = crate::api::dto::ErrorBody),
        (status = 500, description = "Internal error", body = crate::api::dto::ErrorBody),
    ),
    tag = "jobs",
)]
pub async fn create_job(
    State(state): State<AppState>,
    headers: HeaderMap,
    Json(body): Json<CreateJobRequest>,
) -> Result<(StatusCode, Json<JobResponse>), ApiError> {
    // A header value that `to_str` rejects is treated the same as a
    // missing header, so the body field still gets a chance.
    let header_value = headers.get(IDEMPOTENCY_HEADER);
    let from_header = header_value
        .and_then(|raw| raw.to_str().ok())
        .map(str::to_owned);

    let request = NewJob {
        kind: body.kind,
        payload: body.payload,
        max_attempts: body.max_attempts,
        // Header wins; the body field is only the fallback.
        idempotency_key: from_header.or(body.idempotency_key),
    };

    let enqueued = queue::enqueue(&state.pool, request).await?;

    // 201 signals "a new resource was created"; 200 signals "here is
    // the resource that already existed for this key".
    let status = if enqueued.is_new() {
        StatusCode::CREATED
    } else {
        StatusCode::OK
    };
    let response = JobResponse::from(enqueued.job().clone());
    Ok((status, Json(response)))
}

/// `GET /jobs/{id}` — fetch a single job by id.
///
/// Wraps the path UUID in a [`JobId`] and looks it up through
/// [`crate::queue::get`].
///
/// # Errors
///
/// - [`ApiError::NotFound`] when the queue has no job with this id.
/// - Any error from the queue lookup, converted into [`ApiError`]
///   via `?`.
#[utoipa::path(
    get,
    path = "/jobs/{id}",
    params(("id" = String, Path, description = "Job UUID")),
    responses(
        (status = 200, description = "Job representation", body = JobResponse),
        (status = 404, description = "Job not found", body = crate::api::dto::ErrorBody),
    ),
    tag = "jobs",
)]
pub async fn get_job(
    State(state): State<AppState>,
    Path(id): Path<uuid::Uuid>,
) -> Result<Json<JobResponse>, ApiError> {
    let job_id = JobId::from_uuid(id);
    match queue::get(&state.pool, job_id).await? {
        Some(found) => Ok(Json(JobResponse::from(found))),
        None => Err(ApiError::NotFound),
    }
}

/// `GET /jobs` — paginated list of jobs, optionally filtered by
/// `status` and `kind`.
///
/// The query string is turned into a [`ListFilter`] and passed to
/// [`crate::queue::list`]; every returned job is converted into a
/// [`JobResponse`].
///
/// Pagination defaults applied here: `limit = 50`, `offset = 0`.
/// Ordering (reverse-chronological) and clamping of `limit` to
/// `[1, 200]` are the responsibility of [`crate::queue::list`], not
/// of this handler.
///
/// # Errors
///
/// Any error from the queue listing, converted into [`ApiError`]
/// via `?`.
#[utoipa::path(
    get,
    path = "/jobs",
    params(ListJobsQuery),
    responses(
        (status = 200, description = "Paginated job list (reverse chronological)", body = [JobResponse]),
    ),
    tag = "jobs",
)]
pub async fn list_jobs(
    State(state): State<AppState>,
    Query(q): Query<ListJobsQuery>,
) -> Result<Json<Vec<JobResponse>>, ApiError> {
    // Fill in pagination defaults for anything the caller left out.
    let page_size = q.limit.unwrap_or(50);
    let skip = q.offset.unwrap_or(0);

    let rows = queue::list(
        &state.pool,
        ListFilter {
            status: q.status,
            kind: q.kind,
            limit: page_size,
            offset: skip,
        },
    )
    .await?;

    let page: Vec<JobResponse> = rows.into_iter().map(JobResponse::from).collect();
    Ok(Json(page))
}

/// `POST /jobs/{id}/cancel` — request cancellation of a job.
///
/// Delegates to [`crate::queue::request_cancel`] and translates the
/// resulting [`CancelOutcome`] into an HTTP response:
///
/// | [`CancelOutcome`] | HTTP status | body `status` |
/// |---|---|---|
/// | `CancelledNow` (was queued/retrying) | 200 | `"cancelled"` |
/// | `PendingOnWorker` (was running; flag set, worker will observe) | 202 | `"pending"` |
/// | `AlreadyTerminal(_)` (succeeded/failed_permanent/cancelled) | 409 | error body |
///
/// # Errors
///
/// - [`ApiError::Conflict`] when the job is already in a terminal
///   state; the message names that state.
/// - Any error from [`crate::queue::request_cancel`], converted into
///   [`ApiError`] via `?`. A missing job is expected to surface this
///   way as `JobError::NotFound` → [`ApiError::NotFound`] → HTTP 404.
#[utoipa::path(
    post,
    path = "/jobs/{id}/cancel",
    params(("id" = String, Path, description = "Job UUID")),
    responses(
        (status = 200, description = "Cancelled immediately (was queued/retrying)", body = CancelResponse),
        (status = 202, description = "Cancel signalled to running worker", body = CancelResponse),
        (status = 404, description = "Job not found", body = crate::api::dto::ErrorBody),
        (status = 409, description = "Job already in terminal state", body = crate::api::dto::ErrorBody),
    ),
    tag = "jobs",
)]
pub async fn cancel_job(
    State(state): State<AppState>,
    Path(id): Path<uuid::Uuid>,
) -> Result<(StatusCode, Json<CancelResponse>), ApiError> {
    let job_id = JobId::from_uuid(id);

    match queue::request_cancel(&state.pool, job_id).await? {
        // A finished job cannot be cancelled; tell the caller which
        // terminal state it is already in.
        CancelOutcome::AlreadyTerminal(terminal) => Err(ApiError::Conflict(format!(
            "job is already in terminal state {}",
            terminal.as_str()
        ))),
        CancelOutcome::CancelledNow => Ok((
            StatusCode::OK,
            Json(CancelResponse {
                status: "cancelled",
                job_id,
            }),
        )),
        CancelOutcome::PendingOnWorker => Ok((
            StatusCode::ACCEPTED,
            Json(CancelResponse {
                status: "pending",
                job_id,
            }),
        )),
    }
}