rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Domain types: the data model the rest of the crate is built around.
//!
//! Every type here corresponds directly to a column or column-set in the
//! `jobs` table. The same types are also used in the HTTP layer (via
//! `utoipa::ToSchema` for OpenAPI docs and serde for JSON), which keeps
//! the public contract identical to the internal model.
//!
//! Why this works at all layers without separate DTOs: the queue's
//! external interface (HTTP) and internal interface (worker) want the
//! same fields. The HTTP-only shapes (validation envelopes, error
//! bodies, idempotency-key header handling) live in `crate::api::dto`.

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{from_row::FromRow, row::Row};
use sqlx_postgres::PgRow;
use std::{error::Error, fmt, str::FromStr};
use uuid::Uuid;

use crate::ids::JobId;

/// Error produced when a string is not one of the known job-kind slugs.
///
/// The wrapped `String` is the rejected input, kept so the message can
/// echo it back to the caller.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseJobKindError(String);

impl fmt::Display for ParseJobKindError {
    /// Writes a fixed prefix followed by the rejected input in backticks.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(rejected) = self;
        f.write_fmt(format_args!("unknown job kind `{}`", rejected))
    }
}

// No underlying cause to expose, so the default `Error` methods suffice.
impl Error for ParseJobKindError {}

/// Error produced when a string is not one of the known job-status slugs.
///
/// The wrapped `String` is the rejected input, kept so the message can
/// echo it back to the caller.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseJobStatusError(String);

impl fmt::Display for ParseJobStatusError {
    /// Writes a fixed prefix followed by the rejected input in backticks.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(rejected) = self;
        f.write_fmt(format_args!("unknown job status `{}`", rejected))
    }
}

// No underlying cause to expose, so the default `Error` methods suffice.
impl Error for ParseJobStatusError {}

/// One of the four built-in job kinds. The Postgres column type is a
/// matching `job_kind` ENUM. SQL statements bind/read its snake_case
/// string form at the database boundary.
///
/// The serde representation is the same snake_case slug (see the
/// `rename_all` attribute below), so the JSON form, the value returned
/// by `as_str`, and the strings accepted by the `FromStr` impl all agree.
///
/// Adding a new kind requires three coordinated changes outside this
/// file: a new variant here, the `ALTER TYPE` in a new migration, and a
/// payload validator in [`crate::payload`]. Within this file, `as_str`
/// and the `FromStr` impl need a matching arm as well; the `FromStr`
/// match ends in a catch-all arm, so the compiler will not flag a
/// missing slug there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum JobKind {
    /// Slug: `send_email`.
    SendEmail,
    /// Slug: `resize_image`.
    ResizeImage,
    /// Slug: `summarize_text`.
    SummarizeText,
    /// Slug: `webhook_delivery`.
    WebhookDelivery,
}

impl JobKind {
    /// The snake_case slug matching the Postgres ENUM label and the
    /// serde representation. Useful as a metric label without paying for
    /// an allocation.
    pub fn as_str(self) -> &'static str {
        match self {
            JobKind::SendEmail => "send_email",
            JobKind::ResizeImage => "resize_image",
            JobKind::SummarizeText => "summarize_text",
            JobKind::WebhookDelivery => "webhook_delivery",
        }
    }
}

impl FromStr for JobKind {
    type Err = ParseJobKindError;

    /// Parses a snake_case slug into a [`JobKind`].
    ///
    /// Matching is exact (case-sensitive, no trimming).
    ///
    /// # Errors
    ///
    /// Returns [`ParseJobKindError`] carrying a copy of `s` when it is
    /// not one of the four known slugs.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let recognised = match s {
            "send_email" => Some(Self::SendEmail),
            "resize_image" => Some(Self::ResizeImage),
            "summarize_text" => Some(Self::SummarizeText),
            "webhook_delivery" => Some(Self::WebhookDelivery),
            _ => None,
        };

        // Only allocate the owned copy of the input on the failure path.
        recognised.ok_or_else(|| ParseJobKindError(s.to_owned()))
    }
}

/// The lifecycle status of a job.
///
/// The state machine:
///
/// ```text
///                ┌─────────┐
///   enqueue ───► │ queued  │
///                └────┬────┘
///                     │ dequeue (SKIP LOCKED)
///                     ▼
///                ┌─────────┐
///                │ running │
///                └────┬────┘
///       ┌─────────────┼─────────────┬─────────────┐
///       │             │             │             │
///       ▼             ▼             ▼             ▼
///  ┌─────────┐  ┌──────────┐  ┌──────────────┐  ┌──────────┐
///  │succeeded│  │ retrying │  │failed_       │  │cancelled │
///  └─────────┘  └────┬─────┘  │   permanent  │  └──────────┘
///                    │        └──────────────┘
///                    └─► dequeue (when run_at <= now()) ─► running
/// ```
///
/// `queued` and `retrying` can also transition directly to `cancelled`
/// via `request_cancel` (before a worker has claimed them).
///
/// The serde representation is the snake_case slug of each variant
/// (see the `rename_all` attribute below), matching `as_str` and the
/// strings accepted by the `FromStr` impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum JobStatus {
    /// Newly created, awaiting a worker.
    Queued,
    /// Claimed by a worker via SKIP LOCKED; currently executing.
    Running,
    /// Terminal: worker reported success.
    Succeeded,
    /// Worker reported failure; rescheduled for a future attempt
    /// (`run_at` is set to `now + backoff`).
    Retrying,
    /// Terminal: exhausted `max_attempts`; will not be retried again.
    FailedPermanent,
    /// Terminal: was queued/retrying and the user cancelled, OR was
    /// running and the worker observed the `cancel_requested` flag.
    Cancelled,
}

impl JobStatus {
    /// `true` if no further state transitions will occur for a job in
    /// this status. Used by `request_cancel` to short-circuit on
    /// already-terminal jobs.
    pub fn is_terminal(self) -> bool {
        matches!(
            self,
            JobStatus::Succeeded | JobStatus::FailedPermanent | JobStatus::Cancelled
        )
    }

    /// The snake_case slug matching the Postgres ENUM label and the
    /// serde representation. Useful as a metric label.
    pub fn as_str(self) -> &'static str {
        match self {
            JobStatus::Queued => "queued",
            JobStatus::Running => "running",
            JobStatus::Succeeded => "succeeded",
            JobStatus::Retrying => "retrying",
            JobStatus::FailedPermanent => "failed_permanent",
            JobStatus::Cancelled => "cancelled",
        }
    }
}

impl FromStr for JobStatus {
    type Err = ParseJobStatusError;

    /// Parses a snake_case slug into a [`JobStatus`].
    ///
    /// Matching is exact (case-sensitive, no trimming).
    ///
    /// # Errors
    ///
    /// Returns [`ParseJobStatusError`] carrying a copy of `s` when it
    /// is not one of the six known slugs.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let recognised = match s {
            "queued" => Some(Self::Queued),
            "running" => Some(Self::Running),
            "succeeded" => Some(Self::Succeeded),
            "retrying" => Some(Self::Retrying),
            "failed_permanent" => Some(Self::FailedPermanent),
            "cancelled" => Some(Self::Cancelled),
            _ => None,
        };

        // Only allocate the owned copy of the input on the failure path.
        recognised.ok_or_else(|| ParseJobStatusError(s.to_owned()))
    }
}

/// A full job row, exactly as stored in Postgres.
///
/// Every field maps 1:1 to a column in the `jobs` table. `sqlx::FromRow`
/// is implemented by hand (see the `FromRow` impl in this file) rather
/// than derived, because `kind` and `status` are read as their
/// snake_case string form and then parsed into [`JobKind`] and
/// [`JobStatus`]. Rows selected from `jobs` deserialise into this struct
/// through that impl.
///
/// Fields worth a note:
///
/// - `payload` is `serde_json::Value` because the queue itself is
///   payload-agnostic — see [`crate::payload`] for the kind-specific
///   typed shapes used during validation and inside executors.
/// - `last_error` is `Option<String>` and is overwritten on every
///   retry. The history of errors is not preserved; see `tradeoffs.md`
///   for the rationale.
/// - `locked_at` and `locked_by` are diagnostic (the recovery sweep
///   uses `locked_at`; logs use `locked_by`); they are not required for
///   correctness because SKIP LOCKED provides the actual claim.
/// - `cancel_requested` is the cooperative-cancellation flag observed
///   by executors between sub-steps.
/// - `idempotency_key` has a partial unique index — see
///   `migrations/0001_init.sql`.
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct Job {
    pub id: JobId,
    pub kind: JobKind,
    #[schema(value_type = serde_json::Value)]
    pub payload: serde_json::Value,
    pub status: JobStatus,
    pub attempts: i32,
    pub max_attempts: i32,
    pub last_error: Option<String>,
    pub run_at: DateTime<Utc>,
    pub locked_at: Option<DateTime<Utc>>,
    pub locked_by: Option<String>,
    pub cancel_requested: bool,
    pub idempotency_key: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}

impl<'r> FromRow<'r, PgRow> for Job {
    fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
        let kind_raw: String = row.try_get("kind")?;
        let status_raw: String = row.try_get("status")?;
        let kind = JobKind::from_str(&kind_raw).map_err(|e| sqlx::Error::ColumnDecode {
            index: "kind".into(),
            source: Box::new(e),
        })?;
        let status = JobStatus::from_str(&status_raw).map_err(|e| sqlx::Error::ColumnDecode {
            index: "status".into(),
            source: Box::new(e),
        })?;

        Ok(Self {
            id: JobId(row.try_get::<Uuid, _>("id")?),
            kind,
            payload: row.try_get("payload")?,
            status,
            attempts: row.try_get("attempts")?,
            max_attempts: row.try_get("max_attempts")?,
            last_error: row.try_get("last_error")?,
            run_at: row.try_get("run_at")?,
            locked_at: row.try_get("locked_at")?,
            locked_by: row.try_get("locked_by")?,
            cancel_requested: row.try_get("cancel_requested")?,
            idempotency_key: row.try_get("idempotency_key")?,
            created_at: row.try_get("created_at")?,
            updated_at: row.try_get("updated_at")?,
        })
    }
}

/// Input to [`crate::queue::enqueue`].
///
/// Only the kind and payload are required; `max_attempts` defaults to
/// 3 and `idempotency_key` is optional (when absent, every call creates
/// a new row). The serde `default` attributes mean a request body can
/// omit the optional fields entirely.
// NOTE(review): the default of 3 is not applied in this file — here an
// omitted `max_attempts` is simply `None`; confirm where the fallback lives.
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct NewJob {
    // Required: which kind of job to enqueue.
    pub kind: JobKind,
    // Required: arbitrary JSON; this struct places no constraint on its shape.
    #[schema(value_type = serde_json::Value)]
    pub payload: serde_json::Value,
    // Optional: `None` when omitted. Unlike `idempotency_key`, a `None`
    // here is still serialized (as `null`), since there is no
    // `skip_serializing_if`.
    #[serde(default)]
    pub max_attempts: Option<i32>,
    // Optional: `None` when omitted, and left out of serialized output
    // entirely when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
}

/// Coarse-grained outcome label for metrics and reporting.
///
/// Not the same as [`JobStatus`]: `Failed` is the umbrella for a failed
/// attempt that has not yet become permanent, i.e. the job is
/// `retrying` in [`JobStatus`] terms. Used in API layers and reporting
/// where the finer-grained distinction is noise.
// NOTE(review): unlike `JobKind` and `JobStatus`, this enum has no
// `#[serde(rename_all = "snake_case")]`, so its variants serialize under
// their Rust names (`Succeeded`, `FailedPermanent`, ...). Confirm that is
// intended before relying on it as a wire format.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, utoipa::ToSchema)]
pub enum JobOutcome {
    Succeeded,
    Failed,
    FailedPermanent,
    Cancelled,
}