ChainMQ
A Redis-backed, type-safe job queue for Rust. Provides job registration and execution, delayed jobs, retries with backoff, and scalable workers.
This crate is library-first. Runnable examples demonstrate typical patterns (single worker, multiple jobs, multiple workers, delayed jobs, failure/retry).
Features
- Type-safe jobs with
serdepayloads - Redis-backed persistence (wait/active/delayed/failed)
- Delayed jobs via Lua atomics
- Retries with pluggable backoff strategies
- Scalable workers with configurable concurrency
- Simple registration and execution model
Requirements
- Rust (stable)
- Redis server (local or remote)
Start Redis locally (examples default to localhost):
Install (use in another crate)
In your app's Cargo.toml:
[]
= { = "0.1.0" }
Quick start (library usage)
Define a job type and run a worker in your application:
use ;
use ;
use Arc;
;
;
async
Examples
This repo provides runnable examples. Build them all:
Run Redis first, then in separate terminals run workers/enqueuers.
- Single worker for emails queue:
- Enqueue email jobs (normal + delayed/high priority):
- One worker handling multiple job types on a single queue:
- Two workers on different queues (emails + reports):
- Failure and retry with backoff demonstration:
- Delayed jobs demonstration:
Notes:
- You can enqueue before or after workers start. Jobs persist in Redis until claimed.
- Ensure both worker and enqueuer use the same Redis URL and queue name.
- Some examples default to
redis://localhost:6379. Adjust to your setup.
Core concepts
- Job:
trait Job { async fn perform(&self, &JobContext) -> Result<()>; fn name() -> &str; fn queue_name() -> &str } - Queue: Persists job metadata and pushes to wait/delayed lists.
- Worker: Polls a queue, claims jobs atomically via Lua, executes via
JobRegistry. - Registry: Maps job type name -> executor for deserialization + dispatch.
Service injection (AppContext)
Inject your own services (DB pools, HTTP clients, caches, etc.) via AppContext. The worker holds an Arc<dyn AppContext> and each job receives it through JobContext.
Define your application state:
use AppContext;
use Arc;
Pass it to the worker:
let app = new;
let mut worker = new
.with_app_context
.with_queue_name
.spawn
.await?;
Use it inside jobs via the helper ctx.app::<T>() (preferred) or explicit downcast:
Internals (high level)
- Lua scripts in
src/luaensure atomic operations:move_delayed.lua: moves due jobs from delayed zset to wait listclaim_job.lua: pops from wait -> adds to active, updates job state
- Redis keys (default prefix
rbq):rbq:queue:{name}:wait,:active,:delayed,:failedrbq:job:{id}stores serialized job metadata
Troubleshooting
- Perform not running:
- Ensure worker
.with_queue_name(...)matchesJob::queue_name() - Same Redis URL for worker/enqueuer
- Check
LRANGE rbq:queue:{queue}:wait 0 -1inredis-cli
- Ensure worker
- Lua invocation error (arguments must be strings/integers):
- Fixed in the crate:
move_delayed.luais called withARGV(queue_name, now)
- Fixed in the crate:
- No jobs claimed:
- Verify jobs enqueued to the same queue and not delayed into the future
- Connection issues:
- Verify Redis URL and server is running
Development
- Build:
cargo build - Test:
cargo test
License
MIT (or your chosen license)
Acknowledgements
Inspired by existing Redis-backed queues and workers; built for ergonomic, type-safe Rust use.