chainmq 0.1.0

A Redis-backed, type-safe job queue for Rust. Provides job registration and execution, delayed jobs, retries with backoff, and scalable workers.

This crate is library-first. Runnable examples demonstrate typical patterns (single worker, multiple jobs, multiple workers, delayed jobs, failure/retry).

Features

  • Type-safe jobs with serde payloads
  • Redis-backed persistence (wait/active/delayed/failed)
  • Delayed jobs via Lua atomics
  • Retries with pluggable backoff strategies
  • Scalable workers with configurable concurrency
  • Simple registration and execution model

Requirements

  • Rust (stable)
  • Redis server (local or remote)

Start Redis locally (examples default to localhost):

redis-server

Install (use in another crate)

In your app's Cargo.toml:

[dependencies]
chainmq = { version = "0.1.0" }

Quick start (library usage)

Define a job type and run a worker in your application:

use chainmq::{async_trait, Job, JobContext, JobRegistry, WorkerBuilder, Result, AppContext};
use serde::{Serialize, Deserialize};
use std::sync::Arc;

#[derive(Serialize, Deserialize)]
struct Hello;

#[async_trait]
impl Job for Hello {
    async fn perform(&self, _ctx: &JobContext) -> Result<()> {
        println!("hello from job");
        Ok(())
    }
    fn name() -> &'static str { "Hello" }
    fn queue_name() -> &'static str { "default" }
}

#[derive(Clone, Default)]
struct AppState;
impl AppContext for AppState {
    fn clone_context(&self) -> Arc<dyn AppContext> {
        Arc::new(self.clone())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let app = Arc::new(AppState::default());
    let mut registry = JobRegistry::new();
    registry.register::<Hello>();

    let mut worker = WorkerBuilder::new("redis://localhost:6379", registry)
        .with_app_context(app)
        .with_queue_name("default")
        .with_concurrency(5)
        .spawn()
        .await?;

    worker.start().await?;
    // enqueue from somewhere else in your app:
    // let queue = chainmq::Queue::new(chainmq::QueueOptions::default()).await?;
    // queue.enqueue(Hello).await?;

    tokio::signal::ctrl_c().await?;
    worker.stop().await;
    Ok(())
}

Examples

This repo provides runnable examples. Build them all:

cargo build --examples

Run Redis first, then in separate terminals run workers/enqueuers.

  • Single worker for emails queue:
cargo run --example worker_main
  • Enqueue email jobs (normal + delayed/high priority):
cargo run --example enqueue_email
  • One worker handling multiple job types on a single queue:
cargo run --example multi_jobs_single_worker
  • Two workers on different queues (emails + reports):
cargo run --example multi_workers
  • Failure and retry with backoff demonstration:
cargo run --example failure_retry
  • Delayed jobs demonstration:
cargo run --example delayed_jobs

Notes:

  • You can enqueue before or after workers start. Jobs persist in Redis until claimed.
  • Ensure both worker and enqueuer use the same Redis URL and queue name.
  • Some examples default to redis://localhost:6379. Adjust to your setup.

Core concepts

  • Job: trait with async fn perform(&self, &JobContext) -> Result<()>, fn name() -> &'static str, and fn queue_name() -> &'static str
  • Queue: Persists job metadata and pushes to wait/delayed lists.
  • Worker: Polls a queue, claims jobs atomically via Lua, executes via JobRegistry.
  • Registry: Maps job type name -> executor for deserialization + dispatch.

Service injection (AppContext)

Inject your own services (DB pools, HTTP clients, caches, etc.) via AppContext. The worker holds an Arc<dyn AppContext> and each job receives it through JobContext.

Define your application state:

use chainmq::AppContext;
use std::sync::Arc;

#[derive(Clone)]
struct AppState {
    db: sqlx::PgPool,
    http: reqwest::Client,
}

impl AppContext for AppState {
    fn clone_context(&self) -> Arc<dyn AppContext> { Arc::new(self.clone()) }
}

Pass it to the worker:

let app = Arc::new(AppState { db: pool, http: reqwest::Client::new() });
let mut worker = WorkerBuilder::new("redis://localhost:6379", registry)
    .with_app_context(app)
    .with_queue_name("default")
    .spawn()
    .await?;

Use it inside jobs via the helper ctx.app::<T>() (preferred) or explicit downcast:

#[async_trait]
impl Job for MyJob {
    async fn perform(&self, ctx: &JobContext) -> Result<()> {
        // Preferred typed helper
        if let Some(app) = ctx.app::<AppState>() {
            let row = sqlx::query!("select 1 as one").fetch_one(&app.db).await?;
            let _ = app.http.get("https://example.com").send().await?;
            println!("db one = {}", row.one);
        }

        // Or manual downcast if needed
        // if let Some(app) = ctx.app_context.as_ref().as_any().downcast_ref::<AppState>() { /* ... */ }
        Ok(())
    }
    fn name() -> &'static str { "MyJob" }
}

Internals (high level)

  • Lua scripts in src/lua ensure atomic operations:
    • move_delayed.lua: moves due jobs from delayed zset to wait list
    • claim_job.lua: pops from wait -> adds to active, updates job state
  • Redis keys (default prefix rbq):
    • rbq:queue:{name}:wait, :active, :delayed, :failed
    • rbq:job:{id} stores serialized job metadata

Troubleshooting

  • Perform not running:
    • Ensure worker .with_queue_name(...) matches Job::queue_name()
    • Same Redis URL for worker/enqueuer
    • Check LRANGE rbq:queue:{queue}:wait 0 -1 in redis-cli
  • Lua invocation error (arguments must be strings/integers):
    • Fixed in the crate: move_delayed.lua is called with ARGV (queue_name, now)
  • No jobs claimed:
    • Verify jobs enqueued to the same queue and not delayed into the future
  • Connection issues:
    • Verify Redis URL and server is running

Development

  • Build: cargo build
  • Test: cargo test

License

MIT (or your chosen license)

Acknowledgements

Inspired by existing Redis-backed queues and workers; built for ergonomic, type-safe Rust use.