rust-job-queue-api-worker-system 0.1.0

A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Per-kind payload validation.
//!
//! The queue itself stores payloads as opaque `JSONB`; the table's
//! `payload` column has no schema beyond "valid JSON". This file is
//! where the kind-specific shape lives. [`validate`] is called from
//! [`crate::queue::enqueue`] before the INSERT, so a bad payload fails
//! synchronously at the HTTP layer rather than later inside the worker.
//!
//! The per-kind structs (`SendEmailPayload`, `ResizeImagePayload`, …)
//! are also used by executors when they need typed access to the
//! payload — the executor calls `serde_json::from_value` on `job.payload`
//! to recover the typed shape.
//!
//! ## Why payloads aren't stored in typed columns
//!
//! Two reasons:
//!
//! 1. Adding a new job kind would otherwise require an `ALTER TABLE` and
//!    a new column. JSONB plus per-kind validation lets `JobKind` and
//!    `crate::payload` evolve together without touching the schema.
//! 2. JSONB supports partial-shape queries (`payload ? 'to'`, etc.) and
//!    GIN indexing if a payload field becomes hot. Typed columns would
//!    force every kind into the same schema.

use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::domain::JobKind;
use crate::error::JobError;

/// Schema for `JobKind::SendEmail` payloads.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SendEmailPayload {
    pub to: String,
    pub subject: String,
    pub body: String,
}

/// Schema for `JobKind::ResizeImage` payloads.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResizeImagePayload {
    pub source_url: String,
    pub width: u32,
    pub height: u32,
}

/// Schema for `JobKind::SummarizeText` payloads.
///
/// `max_words` is optional; consumers who don't supply it can choose
/// their own default in the executor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SummarizeTextPayload {
    pub text: String,
    #[serde(default)]
    pub max_words: Option<u32>,
}

/// Schema for `JobKind::WebhookDelivery` payloads.
///
/// `body` is `serde_json::Value` because webhook bodies are
/// per-recipient and the queue makes no assumption about their shape.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookDeliveryPayload {
    pub url: String,
    pub body: Value,
}

/// Check that the given `payload` JSON value matches the schema for
/// the given `kind`.
///
/// Returns `Err(JobError::PayloadInvalid)` if the payload does not match,
/// with the serde deserialisation error message in `reason`.
///
/// The check is implemented by attempting to deserialise into the
/// kind-specific type and discarding the result — we just want the
/// "does it parse?" answer, not the value.
pub fn validate(kind: JobKind, payload: &Value) -> Result<(), JobError> {
    /// Helper that tries to deserialise `payload` as `T` and surfaces
    /// any error as a [`JobError::PayloadInvalid`] with `kind`'s label.
    fn check<T: for<'de> Deserialize<'de>>(kind: JobKind, payload: &Value) -> Result<(), JobError> {
        // `from_value` requires owned input; clone is a single Arc bump
        // for JSON containers (serde_json::Value is shallow-clone
        // cheap), so the cost is negligible.
        serde_json::from_value::<T>(payload.clone())
            .map(|_| ())
            .map_err(|e| JobError::PayloadInvalid {
                kind: kind.as_str(),
                reason: e.to_string(),
            })
    }
    match kind {
        JobKind::SendEmail => check::<SendEmailPayload>(kind, payload),
        JobKind::ResizeImage => check::<ResizeImagePayload>(kind, payload),
        JobKind::SummarizeText => check::<SummarizeTextPayload>(kind, payload),
        JobKind::WebhookDelivery => check::<WebhookDeliveryPayload>(kind, payload),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn send_email_valid() {
        // The minimal valid shape: every required string field present.
        let p = json!({ "to": "a@b.c", "subject": "hi", "body": "hello" });
        validate(JobKind::SendEmail, &p).unwrap();
    }

    #[test]
    fn send_email_missing_field_fails() {
        // serde's "missing field" error should surface as `payload_invalid`.
        let p = json!({ "to": "a@b.c", "subject": "hi" });
        let err = validate(JobKind::SendEmail, &p).unwrap_err();
        assert_eq!(err.kind(), "payload_invalid");
    }

    #[test]
    fn resize_image_valid() {
        let p = json!({ "source_url": "https://x/y.png", "width": 100, "height": 100 });
        validate(JobKind::ResizeImage, &p).unwrap();
    }

    #[test]
    fn webhook_delivery_accepts_arbitrary_body() {
        // The `body` field is typed as `serde_json::Value`, so any JSON
        // value (nested object, mixed-type array, etc.) must validate.
        let p = json!({ "url": "https://x", "body": { "any": ["thing", 1, null] } });
        validate(JobKind::WebhookDelivery, &p).unwrap();
    }
}