// rapina 0.11.0
//
// A fast, type-safe web framework for Rust inspired by FastAPI
// Documentation
//! Background jobs support for Rapina applications.
//!
//! This module provides the full background jobs system: a migration to create
//! the `rapina_jobs` table, an in-process worker that polls and dispatches jobs,
//! and the core types used by the `#[job]` macro.
//!
//! **Note:** The migration uses PostgreSQL-specific features (`gen_random_uuid()`,
//! partial indexes, `FOR UPDATE SKIP LOCKED`). MySQL and SQLite are not currently
//! supported for background jobs.
//!
//! # Setup
//!
//! Add the framework migration to your project's migration list:
//!
//! ```rust,ignore
//! use rapina::jobs::create_rapina_jobs;
//!
//! rapina::migrations! {
//!     create_rapina_jobs,
//!     m20260315_000001_create_users,
//! }
//! ```
//!
//! Or run `rapina jobs init` to configure it automatically.
//!
//! # Defining Jobs
//!
//! Use the `#[job]` macro to define a background job handler. The first argument
//! is always the payload (must implement `serde::Serialize + serde::de::DeserializeOwned`).
//! Remaining arguments are dependency-injected from `AppState` — `State<T>` and `Db`
//! are the supported extractors here.
//!
//! ```rust,ignore
//! use rapina::prelude::*;
//!
//! #[derive(Serialize, Deserialize)]
//! pub struct WelcomeEmailPayload {
//!     pub email: String,
//! }
//!
//! #[job(queue = "emails", max_retries = 5)]
//! async fn send_welcome_email(
//!     payload: WelcomeEmailPayload,
//!     mailer: State<Mailer>,
//! ) -> JobResult {
//!     mailer.send(&payload.email).await?;
//!     Ok(())
//! }
//! ```
//!
//! The macro generates a `send_welcome_email(payload) -> JobRequest` helper.
//! Use the [`Jobs`] extractor in HTTP handlers to dispatch jobs via [`Jobs::enqueue`]
//! or [`Jobs::enqueue_with`] for transactional enqueue.
//!
//! # Starting the Worker
//!
//! Call `.jobs()` on the application builder before `.listen()`. The worker
//! spawns in-process alongside the HTTP server and shuts down gracefully on
//! SIGINT/SIGTERM — it finishes its current batch before stopping.
//!
//! ```rust,ignore
//! use rapina::jobs::JobConfig;
//!
//! Rapina::new()
//!     .with_database(db_config).await?
//!     .jobs(JobConfig::default().queues(["default", "emails"]))
//!     .listen("127.0.0.1:3000")
//!     .await
//! ```
//!
//! # Job Lifecycle
//!
//! ```text
//! pending → running → completed
//!                   ↘ failed   (or back to pending if retries remain)
//! ```
//!
//! The worker atomically claims a batch of `pending` jobs using
//! `FOR UPDATE SKIP LOCKED`, so concurrent workers never process the same row.
//! On success [`apply_success`](retry::apply_success) sets `completed`. On
//! failure [`apply_failure`](retry::apply_failure) either reschedules as
//! `pending` with exponential backoff or permanently marks the job `failed`
//! once `max_retries` is exhausted.
//!
//! # DI Limitations
//!
//! Job handlers run outside the request cycle with synthetic request context.
//! Only `State<T>` and `Db` work correctly — they source data from `AppState`
//! directly. Request-bound extractors (`Context`, `Headers`, `Path`, `Query`,
//! `CurrentUser`) will fail at runtime and must not be used in job handlers.
//!
//! # Trace Propagation
//!
//! When a job row has a `trace_id`, the worker opens a tracing span that
//! includes the original value so all log lines emitted during handler
//! execution are correlated with the HTTP request that enqueued the job.

pub mod create_rapina_jobs;
mod model;
pub(crate) mod retry;
pub(crate) mod worker;

pub use model::{JobRow, JobStatus};
pub use retry::RetryPolicy;
pub use worker::JobConfig;

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use sea_orm::{ConnectionTrait, DatabaseConnection, DbBackend, Statement};
use uuid::Uuid;

use crate::state::AppState;

/// Unique identifier for an enqueued background job.
///
/// This is the value of the `id` column yielded by the
/// `INSERT INTO rapina_jobs ... RETURNING id` statement, and it is what
/// [`Jobs::enqueue`] and [`Jobs::enqueue_with`] hand back to the caller.
pub type JobId = Uuid;

/// The result type returned by job handler functions.
///
/// Equivalent to `Result<(), rapina::error::Error>`: a handler produces no
/// value on success and reports failure through the framework error type.
pub type JobResult = Result<(), crate::error::Error>;

/// Describes a job to be enqueued.
///
/// Typically constructed by the helper function generated by `#[job]`.
/// All fields are public so advanced users can construct it directly when
/// bypassing the macro (e.g. for non-standard payload formats).
///
/// A request is plain data: it is consumed by [`Jobs::enqueue`] /
/// [`Jobs::enqueue_with`], which bind each field as a parameter of the
/// `INSERT INTO rapina_jobs` statement. It derives `Debug` and `Clone` so a
/// request can be logged or reused for several enqueue calls.
#[derive(Debug, Clone)]
pub struct JobRequest {
    /// The job type name used to dispatch to the correct handler at runtime.
    ///
    /// Matches the `job_type` column in `rapina_jobs`. Derived from the
    /// function name, e.g. `send_welcome_email` → `"send_welcome_email"`.
    pub job_type: &'static str,
    /// Serialized payload passed to the handler.
    ///
    /// Stored in the `payload` column as JSON.
    pub payload: serde_json::Value,
    /// Queue to place the job in. Defaults to `"default"`.
    ///
    /// The default applies to requests built by the `#[job]` helper; when
    /// constructing the struct by hand the queue must be given explicitly.
    pub queue: &'static str,
    /// Maximum number of retry attempts before the job is marked `failed`.
    ///
    /// `i32` to match the `INTEGER` column type in `rapina_jobs`.
    pub max_retries: i32,
}

/// Function signature for the handler wrapper generated by `#[job]`.
///
/// The wrapper deserializes the payload from JSON, injects dependencies
/// from `AppState` via `FromRequestParts`, and calls the user-defined handler.
///
/// Shape of the signature:
///
/// - It is a plain `fn` pointer, not a closure type, so a value of this type
///   carries no captured environment.
/// - First argument: the job payload as an owned `serde_json::Value`.
/// - Second argument: the shared application state as `Arc<AppState>`.
/// - Return value: a boxed, pinned, `Send` future resolving to [`JobResult`].
#[doc(hidden)]
pub type JobHandlerFn =
    fn(serde_json::Value, Arc<AppState>) -> Pin<Box<dyn Future<Output = JobResult> + Send>>;

/// Describes a job handler collected at link time via `inventory`.
///
/// Registered automatically by the `#[job]` macro — no manual registration
/// needed. The worker looks up handlers by `job_type` when claiming a row.
///
/// Besides the handler itself, a descriptor carries the per-job-type retry
/// settings (`retry_policy`, `retry_delay_secs`). Those fields and `handle`
/// are `#[doc(hidden)]`: they are public for the macro-generated code, not
/// as a documented API.
pub struct JobDescriptor {
    /// Matches the `job_type` column in `rapina_jobs`.
    pub job_type: &'static str,
    /// The generated wrapper function.
    ///
    /// See [`JobHandlerFn`] for the exact signature.
    #[doc(hidden)]
    pub handle: JobHandlerFn,
    /// Retry policy for this job type: `"exponential"`, `"fixed"`, or `"none"`.
    ///
    /// Stored as a string; how it is parsed is not visible in this module
    /// (presumably via the `retry` module — confirm there).
    #[doc(hidden)]
    pub retry_policy: &'static str,
    /// Base delay in seconds for `"exponential"` and `"fixed"` policies.
    #[doc(hidden)]
    pub retry_delay_secs: f64,
}

// Declares `JobDescriptor` as an `inventory` collection type so descriptors
// submitted from anywhere in the final binary can be iterated at runtime.
inventory::collect!(JobDescriptor);

/// Extractor that provides access to the job queue from HTTP handlers.
///
/// Captures the database connection pool from `AppState` and the `trace_id`
/// from the current request's
/// [`RequestContext`](crate::context::RequestContext), so enqueued jobs
/// inherit the request's observability context automatically.
///
/// Two enqueue methods:
///
/// - [`enqueue`](Self::enqueue) grabs its own connection from the pool.
///   This is the 90% case.
/// - [`enqueue_with`](Self::enqueue_with) uses the caller's connection or
///   transaction. The job row is committed atomically with the surrounding
///   business logic — if the transaction rolls back, the job is never enqueued.
///
/// Both methods return the [`JobId`] of the inserted row and write the
/// captured `trace_id` (if any) into the row's `trace_id` column.
///
/// # Example
///
/// ```rust,ignore
/// use rapina::prelude::*;
///
/// #[post("/users")]
/// async fn create_user(body: Json<CreateUserRequest>, db: Db, jobs: Jobs) -> Result<StatusCode> {
///     // Simple enqueue — independent of any transaction.
///     jobs.enqueue(send_report(ReportPayload { user_id: 42 })).await?;
///
///     // Transactional enqueue — job row commits atomically with the user row.
///     let txn = db.conn().begin().await?;
///     let user = User::insert(&txn, &body).await?;
///     jobs.enqueue_with(&txn, send_welcome_email(WelcomeEmailPayload {
///         email: user.email.clone(),
///     })).await?;
///     txn.commit().await?;
///
///     Ok(StatusCode::CREATED)
/// }
/// ```
#[derive(Debug, Clone)]
pub struct Jobs {
    /// Connection used by [`enqueue`](Self::enqueue); ignored by
    /// [`enqueue_with`](Self::enqueue_with), which uses the caller's connection.
    pool: DatabaseConnection,
    /// Trace id stored alongside every job enqueued through this handle.
    /// `None` results in a NULL `trace_id` parameter.
    pub(crate) trace_id: Option<String>,
}

impl Jobs {
    /// Creates a `Jobs` instance from a connection pool and optional trace id.
    ///
    /// Normally constructed by the `FromRequestParts` implementation. Public
    /// so code that manages its own connection lifecycle outside the request
    /// cycle (e.g., a job handler that enqueues a follow-up job) can construct
    /// it directly.
    pub fn new(pool: DatabaseConnection, trace_id: Option<String>) -> Self {
        Self { pool, trace_id }
    }

    /// Enqueues a job using a connection from the pool.
    ///
    /// The job is inserted independently of any caller-managed transaction.
    /// For transactional enqueue, see [`enqueue_with`](Self::enqueue_with).
    pub async fn enqueue(&self, req: impl Into<JobRequest>) -> crate::error::Result<JobId> {
        insert_job(&self.pool, req.into(), self.trace_id.as_deref()).await
    }

    /// Enqueues a job using the caller's connection or transaction.
    ///
    /// Both `DatabaseConnection` and `DatabaseTransaction` implement
    /// `ConnectionTrait`, so the same method handles both cases.
    pub async fn enqueue_with<C>(
        &self,
        conn: &C,
        req: impl Into<JobRequest>,
    ) -> crate::error::Result<JobId>
    where
        C: ConnectionTrait,
    {
        insert_job(conn, req.into(), self.trace_id.as_deref()).await
    }
}

async fn insert_job<C>(
    conn: &C,
    req: JobRequest,
    trace_id: Option<&str>,
) -> crate::error::Result<JobId>
where
    C: ConnectionTrait,
{
    // rapina_jobs is PostgreSQL-only (gen_random_uuid(), partial indexes).
    // If this panics, the user enabled a non-postgres database feature.
    debug_assert_eq!(
        conn.get_database_backend(),
        DbBackend::Postgres,
        "Jobs require PostgreSQL — rapina_jobs uses gen_random_uuid() and partial indexes"
    );

    let stmt = build_insert_stmt(req, trace_id);

    let row = conn
        .query_one(stmt)
        .await
        .map_err(|e| crate::error::Error::internal(format!("failed to enqueue job: {e}")))?
        .ok_or_else(|| crate::error::Error::internal("INSERT INTO rapina_jobs returned no rows"))?;

    let id: Uuid = row
        .try_get("", "id")
        .map_err(|e| crate::error::Error::internal(format!("failed to read job id: {e}")))?;

    Ok(id)
}

fn build_insert_stmt(req: JobRequest, trace_id: Option<&str>) -> Statement {
    Statement::from_sql_and_values(
        DbBackend::Postgres,
        "INSERT INTO rapina_jobs (job_type, queue, payload, max_retries, trace_id) \
         VALUES ($1, $2, $3, $4, $5) \
         RETURNING id",
        [
            req.job_type.into(),
            req.queue.into(),
            req.payload.into(),
            req.max_retries.into(),
            trace_id.map(ToOwned::to_owned).into(),
        ],
    )
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Request with a null payload, for tests that only look at the routing
    /// fields or at the generated statement.
    fn bare_request(job_type: &'static str, queue: &'static str, max_retries: i32) -> JobRequest {
        JobRequest {
            job_type,
            payload: serde_json::Value::Null,
            queue,
            max_retries,
        }
    }

    /// `send_email` request on the `emails` queue addressed to `recipient`.
    fn email_request(recipient: &str) -> JobRequest {
        JobRequest {
            job_type: "send_email",
            payload: serde_json::json!({ "to": recipient }),
            queue: "emails",
            max_retries: 5,
        }
    }

    #[test]
    fn job_request_fields() {
        let JobRequest {
            job_type,
            payload,
            queue,
            max_retries,
        } = email_request("test@example.com");

        assert_eq!(job_type, "send_email");
        assert_eq!(queue, "emails");
        assert_eq!(max_retries, 5);
        assert_eq!(payload["to"], "test@example.com");
    }

    #[test]
    fn default_convention() {
        // Verify the defaults the macro uses match what callers expect.
        let req = bare_request("process_event", "default", 3);
        assert_eq!(req.queue, "default");
        assert_eq!(req.max_retries, 3);
    }

    #[test]
    fn max_retries_is_i32() {
        // Must match the INTEGER column in rapina_jobs — same type as JobRow::max_retries.
        let req = bare_request("t", "default", i32::MAX);
        assert_eq!(req.max_retries, i32::MAX);
    }

    #[test]
    fn insert_stmt_has_correct_sql() {
        let stmt = build_insert_stmt(email_request("a@b.com"), Some("trace-123"));
        for fragment in ["INSERT INTO rapina_jobs", "RETURNING id"] {
            assert!(stmt.sql.contains(fragment));
        }
    }

    #[test]
    fn insert_stmt_uses_postgres_backend() {
        let stmt = build_insert_stmt(bare_request("t", "default", 3), None);
        assert_eq!(stmt.db_backend, DbBackend::Postgres);
    }

    #[test]
    fn insert_stmt_trace_id_some() {
        let stmt = build_insert_stmt(bare_request("t", "default", 3), Some("abc-123"));
        let params = &stmt
            .values
            .as_ref()
            .expect("statement should carry bound values")
            .0;
        // 5 params: job_type, queue, payload, max_retries, trace_id
        assert_eq!(params.len(), 5);
        assert_eq!(
            params[4],
            sea_orm::Value::String(Some(Box::new(String::from("abc-123"))))
        );
    }

    #[test]
    fn insert_stmt_trace_id_none() {
        let stmt = build_insert_stmt(bare_request("t", "default", 3), None);
        let params = &stmt
            .values
            .as_ref()
            .expect("statement should carry bound values")
            .0;
        assert_eq!(params[4], sea_orm::Value::String(None));
    }
}