//! rust-job-queue-api-worker-system 0.1.0
//!
//! A production-shaped Rust job queue: Axum API + async workers + Postgres SKIP LOCKED dequeue, retries with decorrelated jitter, idempotency, cooperative cancellation, OpenAPI, Prometheus metrics.
//! Embedding example: plug a custom `Executor` into [`WorkerRuntime`].
//!
//! This is the shape downstream consumers use when they want the queue
//! machinery from this crate but supply their own job-execution logic
//! (e.g., sending real emails, talking to S3, calling a third-party API).
//!
//! Run with:
//!
//! ```bash
//! DATABASE_URL=postgres://localhost/jobs \
//!   cargo run --example embed_worker --features worker
//! ```
//!
//! The example exits after processing a small number of pre-seeded jobs so
//! it can be used as a quick smoke test.

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use rust_job_queue_api_worker_system::{
    connect, migrate,
    queue::enqueue,
    worker::{ExecutionContext, ExecutionOutcome, Executor, WorkerRuntime},
    Job, JobKind, NewJob, PoolConfig,
};
use serde_json::json;
use tokio_util::sync::CancellationToken;

/// Minimal demonstration [`Executor`]: prints each job it receives and
/// reports success unconditionally.
///
/// A real implementation would perform actual work here (deliver an email,
/// transcode a file, call an external service). To drive retry semantics,
/// return [`ExecutionOutcome::Failed`] and the queue will retry or
/// permanently fail the job based on its `max_attempts`; return
/// [`ExecutionOutcome::Cancelled`] if the executor notices the job row's
/// cancel-requested flag while running.
struct LoggingExecutor;

#[async_trait]
impl Executor for LoggingExecutor {
    async fn execute(&self, _ctx: &ExecutionContext, job: &Job) -> ExecutionOutcome {
        // Bind the pieces we print so the format call stays short.
        let kind = job.kind.as_str();
        let attempt = job.attempts;
        println!(
            "executing job_id={} kind={} attempt={} payload={}",
            job.id, kind, attempt, job.payload
        );
        // This toy executor never fails or observes cancellation.
        ExecutionOutcome::Succeeded
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // The example refuses to start without a database to talk to.
    let Ok(database_url) = std::env::var("DATABASE_URL") else {
        return Err(anyhow::anyhow!("DATABASE_URL must be set"));
    };

    // Connect and bring the schema up to date before touching the queue.
    let pool = connect(&PoolConfig::from_url(database_url)).await?;
    migrate(&pool).await?;

    // Pre-seed a handful of jobs so the worker loop has work to pick up.
    for i in 0..3 {
        let job = NewJob {
            kind: JobKind::SummarizeText,
            payload: json!({ "text": format!("example {i}") }),
            max_attempts: Some(1),
            idempotency_key: None,
        };
        enqueue(&pool, job).await?;
    }

    let shutdown = CancellationToken::new();

    // Wire our toy executor into the crate's worker machinery.
    let runtime = WorkerRuntime::new(pool.clone(), Arc::new(LoggingExecutor))
        .with_concurrency(1)
        .with_poll_interval(Duration::from_millis(50), Duration::from_millis(200));

    // Trigger shutdown after a fixed window so the example terminates on its
    // own. A production binary would listen for SIGINT/SIGTERM here instead.
    let trigger = shutdown.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(3)).await;
        trigger.cancel();
    });

    runtime.run(shutdown).await;
    println!("example done");
    Ok(())
}