# ChainMQ
A Redis-backed, type-safe job queue for Rust. Provides job registration and execution, delayed jobs, retries with backoff, and scalable workers.
This crate is library-first. Runnable examples demonstrate typical patterns (single worker, multiple jobs, multiple workers, delayed jobs, failure/retry).
## Features
- 🚀 Redis-Powered: Built on Redis for reliable job persistence and distribution
- 🔄 Background Jobs: Process jobs asynchronously in the background
- 🏗️ Job Registry: Simple, type-safe job registration and execution
- 🔧 Worker Management: Configurable workers with lifecycle management
- ⚡ Async/Await: Full async support throughout the system
- ⏰ Delayed jobs: Schedule jobs for future execution with atomic operations
- 🗄️ Backoff strategies: Configurable retry logic for failed jobs
- 📊 Application Context: Share application state across jobs
- 🖥️ Web UI: Dashboard for monitoring and managing queues (one server sees every logical queue under the same Redis `key_prefix`; see README_UI.md)
- 📜 Queue lifecycle events (Redis Stream + pub/sub) and dashboard Activity + Redis modal (`INFO` snapshot; documented in README_UI.md)
- 📝 Optional Redis-backed job log lines when the worker opts in to `tracing_job_logs` and the job-log layer (README_UI.md)
## Web dashboard (responsive)
The dashboard adapts from wide layouts (sidebar + dense tables) to narrow viewports (mobile chrome, off-canvas queue menu, stacked controls and cards). Setup and options are in README_UI.md.
_Screenshots: desktop and mobile views of the queue page and the job detail page._
## Quick Start
Add ChainMQ to your `Cargo.toml`:

```toml
[dependencies]
chainmq = "1.3.1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
async-trait = "0.1"
```
The crate enables the `web-ui` feature by default (an Axum dashboard router you nest on your server). For a smaller dependency tree, disable default features with `chainmq = { version = "1.3.1", default-features = false }`, then add `features = ["web-ui"]` (or `["web-ui-axum"]` / `["web-ui-actix"]`) when you want the dashboard. See README_UI.md for setup, log capture, and `WebUIMountConfig`.
### Basic Usage
1. Define Your Job

A minimal sketch of a job type. The trait shape follows the Core Concepts section below; the struct fields and the `chainmq::Result` alias are illustrative placeholders.

```rust
use chainmq::{Job, JobContext};
use serde::{Deserialize, Serialize};
use async_trait::async_trait;

#[derive(Debug, Serialize, Deserialize)]
struct EmailJob {
    to: String,      // placeholder field
    subject: String, // placeholder field
}

#[async_trait]
impl Job for EmailJob {
    // Signature per Core Concepts; the exact Result alias may differ.
    async fn perform(&self, _ctx: &JobContext) -> chainmq::Result<()> {
        // Send the email here.
        Ok(())
    }

    fn name() -> &'static str { "email_job" }
    fn queue_name() -> &'static str { "emails" }
}
```
2. Set Up Application Context

A minimal sketch, assuming `AppContext` can be implemented on a plain struct; the fields are placeholders for your own services.

```rust
use chainmq::AppContext;
use std::sync::Arc;

#[derive(Clone)]
struct MyContext {
    // e.g. database pool, HTTP client, cache handles, ...
}

impl AppContext for MyContext {} // assumes no required trait methods here

let app_ctx = Arc::new(MyContext {});
```
3. Configure workers and mount the dashboard (Axum)

A sketch built from the builder methods documented under Worker Configuration; the argument lists and the registry registration call are illustrative.

```rust
use chainmq::{JobRegistry, WorkerBuilder, chainmq_dashboard_router};
use axum::Router;

let mut registry = JobRegistry::new();
registry.register::<EmailJob>(); // hypothetical registration method

let worker = WorkerBuilder::new_with_redis_uri("redis://127.0.0.1:6379")
    .with_app_context(app_ctx.clone())
    .with_queue_name("emails")
    .with_concurrency(4)
    .spawn()
    .await?;

// Nest the dashboard router on your Axum server (see README_UI.md)
let app = Router::new().nest("/dashboard", chainmq_dashboard_router(queue.clone()));
```
4. Enqueue Jobs from Anywhere

A sketch; `Queue::new` and the plain `enqueue` call are inferred from the Configuration section and may differ slightly.

```rust
use chainmq::{Queue, QueueOptions};

let queue = Queue::new(QueueOptions::default()).await?;
let job_id = queue
    .enqueue(EmailJob { to: "user@example.com".into(), subject: "Hello".into() })
    .await?;
```
## Examples
Runnable examples live under `examples/`. Build them all:

```shell
cargo build --examples
```
Run Redis first, then use separate terminals for workers and enqueuers:
Each comment below corresponds to one example; run it with `cargo run --example <name>` (see `examples/` for the exact names):

```shell
# Single worker for the emails queue (enables tracing → Redis job logs like the web UI example)
# Enqueue email jobs (normal + delayed / high priority); optional UI entrypoint at end of file
# One worker handling multiple job types on one logical queue name
# Multiple job types each with their own queue_name(); enqueue then start UI (same Redis)
# Two workers polling different logical queues (emails + reports)
# Failure and retry with backoff
# Delayed jobs
# Axum host + nested dashboard — see README_UI.md
# Then open http://127.0.0.1:8080/dashboard/ (see example source for path and port)
# Larger enqueue/processing demo (data volume / stress-style usage)
# Minimal Axum binary + dashboard (`REDIS_URL` optional)
```
Notes:
- You can enqueue before or after workers start. Jobs persist in Redis until claimed.
- Workers must use a `.with_queue_name()` that matches the jobs they should claim; use the same Redis endpoint (or equivalent `RedisClient` setting) and `key_prefix` as producers and the web UI so everyone sees the same data.
- Some examples use non-default Redis URLs or ports (for example `6370`). Check the top of each example and adjust for your environment.
## Core Concepts
- Job: Defines work to be done. Implements `trait Job { async fn perform(&self, &JobContext) -> Result<()>; fn name() -> &str; fn queue_name() -> &str }`
- Queue: One client handle for a Redis instance and `key_prefix`. It persists job metadata and manages wait / delayed / active / failed lists. Logical queues are the string returned by `Job::queue_name()`; many names can coexist under one `Queue`. Listing and the web UI operate on every queue name in that namespace.
- Worker: Polls a configured queue name, claims jobs atomically via Lua scripts, and executes them through the `JobRegistry`
- Registry: Maps job type names to executors for deserialization and dispatch
- JobContext: Application state (`AppContext`), job metadata, the same `Arc<Queue>` the worker uses (`queue()`), optional progress updates, and a cooperative `CancellationToken`
- Web UI: Mount `chainmq_dashboard_router` (or the Actix `configure_chainmq_web_ui` helper) with one `Queue` for the same Redis + `key_prefix` as workers; it discovers all logical queue names in that namespace. Details: README_UI.md.
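The registry concept above can be sketched with a std-only stand-in: a map from job-type names to executors that take a serialized payload and run it. ChainMQ's real `JobRegistry` is async and serde-based; the `Registry`, `Executor`, and `dispatch` names here are ours, for illustration only.

```rust
use std::collections::HashMap;

// Executors take a serialized payload and either run the job or fail.
type Executor = Box<dyn Fn(&str) -> Result<(), String>>;

struct Registry {
    executors: HashMap<String, Executor>,
}

impl Registry {
    fn new() -> Self {
        Self { executors: HashMap::new() }
    }

    // Map a job-type name to its executor.
    fn register(&mut self, name: &str, exec: Executor) {
        self.executors.insert(name.to_string(), exec);
    }

    // Look up the executor by name and run it; unknown names are an error.
    fn dispatch(&self, name: &str, payload: &str) -> Result<(), String> {
        match self.executors.get(name) {
            Some(exec) => exec(payload),
            None => Err(format!("unknown job type: {name}")),
        }
    }
}

fn main() {
    let mut registry = Registry::new();
    registry.register("email_job", Box::new(|payload| {
        println!("sending email with payload {payload}");
        Ok(())
    }));

    assert!(registry.dispatch("email_job", r#"{"to":"user@example.com"}"#).is_ok());
    assert!(registry.dispatch("missing_job", "{}").is_err());
}
```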
## Configuration

### Redis Configuration
ChainMQ targets the redis 1.x crate and uses redis::aio::ConnectionManager internally for queues (automatic reconnect with bounded connect and response timeouts when built from a URL or redis::Client).
You describe how to reach Redis on QueueOptions via the RedisClient enum (exported at the crate root as chainmq::RedisClient):
- `RedisClient::Url(String)` — open a client and build a connection manager from the URL (the default in `QueueOptions::default()`).
- `RedisClient::Client(redis::Client)` — share an existing synchronous client (for example the same one you keep in application state).
- `RedisClient::Manager(ConnectionManager)` — reuse an already-built manager (for example one you created for custom tuning, or to share one manager across several components).
A sketch of the three variants; the `redis` field name on `QueueOptions` is an assumption.

```rust
use chainmq::{QueueOptions, RedisClient};

// URL only (typical for scripts and examples)
let _ = QueueOptions {
    redis: RedisClient::Url("redis://127.0.0.1:6379".into()),
    ..Default::default()
};

// Existing redis::Client
let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
let _ = QueueOptions {
    redis: RedisClient::Client(redis_client),
    ..Default::default()
};

// With authentication or DB index in the URL
let _ = QueueOptions {
    redis: RedisClient::Url("redis://:password@127.0.0.1:6379/1".into()),
    ..Default::default()
};
```
### Worker Configuration
Argument values below are illustrative; the builder constructors and methods are those documented for `WorkerBuilder`.

```rust
use chainmq::WorkerBuilder;
use redis::aio::ConnectionManager;
use std::time::Duration;

// Using an existing redis::Client (takes a reference; clones internally for options)
let worker = WorkerBuilder::new_with_redis_instance(&redis_client)
    .with_app_context(app_ctx.clone())
    .with_queue_name("emails")
    .with_concurrency(4)                             // number of concurrent jobs
    .with_poll_interval(Duration::from_millis(500))  // how often to check for jobs
    .spawn()
    .await?;

// Using a Redis URI (stored as RedisClient::Url on the worker's QueueOptions)
let worker = WorkerBuilder::new_with_redis_uri("redis://127.0.0.1:6379")
    .with_app_context(app_ctx.clone())
    .with_queue_name("emails")
    .with_concurrency(4)
    .spawn()
    .await?;

// Optional: reuse a ConnectionManager you built yourself
// (`manager` is a ConnectionManager created earlier)
let worker = WorkerBuilder::new_with_redis_manager(manager)
    .with_queue_name("emails")
    .spawn()
    .await?;
```
### Queue Configuration
A sketch; field names besides `key_prefix` are assumptions.

```rust
use chainmq::{Queue, QueueOptions, RedisClient};

let options = QueueOptions {
    redis: RedisClient::Url("redis://127.0.0.1:6379".into()),
    key_prefix: "rbq".into(), // default prefix; see Internals
    ..Default::default()
};
let queue = Queue::new(options).await?;
```
### Job Configuration
A sketch; the `EmailJob` fields are placeholders, and `priority` is one of the documented `JobOptions` fields.

```rust
use chainmq::JobOptions;

let job = EmailJob { to: "user@example.com".into(), subject: "Welcome".into() };
let opts = JobOptions {
    priority: 1, // persisted but not yet enforced; see the note below
    ..Default::default()
};
let job_id = queue.enqueue_with_options(job, opts).await?;
```
Note: `priority` and `rate_limit_key` are persisted on job metadata but not yet enforced by ChainMQ (the wait queue is FIFO). Use application-level logic if you need strict prioritization or rate limits today.
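One application-level workaround for the FIFO wait queue is routing high-priority jobs to a dedicated logical queue serviced by its own worker. The `queue_for_priority` helper, the `:high` suffix, and the threshold below are all illustrative choices, not crate API.

```rust
/// Route jobs to a dedicated logical queue by priority, as an app-level
/// workaround for the FIFO wait queue. The `:high` suffix and the
/// `priority > 0` threshold are arbitrary choices for this sketch.
fn queue_for_priority(base: &str, priority: u8) -> String {
    if priority > 0 {
        format!("{base}:high") // serviced by its own dedicated worker
    } else {
        base.to_string()
    }
}

fn main() {
    assert_eq!(queue_for_priority("emails", 5), "emails:high");
    assert_eq!(queue_for_priority("emails", 0), "emails");
    println!("ok");
}
```

A worker started with `.with_queue_name("emails:high")` would then drain the high-priority queue independently of the default one.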
## Advanced Usage

### Service Injection with AppContext
Inject your own services (database pools, HTTP clients, caches, etc.) via AppContext. The worker holds an Arc<dyn AppContext> and each job receives it through JobContext.
A sketch of the context type (fields are placeholders):

```rust
use chainmq::AppContext;
use std::sync::Arc;

#[derive(Clone)]
struct MyContext {
    // database pool, HTTP client, caches, ...
}

impl AppContext for MyContext {}
```

Use it inside jobs via the helper `ctx.app::<T>()`:

```rust
// Inside Job::perform (sketch; the exact return type of app() may differ):
let my_ctx = ctx.app::<MyContext>();
```
### Multiple Job Types
Register multiple job types in a single registry:
The registration method name below is a placeholder, and the job types after `EmailJob` are illustrative:

```rust
let mut registry = JobRegistry::new();
registry.register::<EmailJob>();
registry.register::<ReportJob>();
registry.register::<CleanupJob>();
registry.register::<NotifyJob>();

// A single worker can handle all registered job types
let worker = WorkerBuilder::new_with_redis_instance(&redis_client)
    .with_queue_name("default")
    .spawn()
    .await?;
```
### Delayed Jobs
Schedule jobs for future execution:
A sketch; the `delay` field name is an assumption.

```rust
use chainmq::JobOptions;
use std::time::Duration;

let delayed_job = EmailJob { to: "user@example.com".into(), subject: "Reminder".into() };
let options = JobOptions {
    delay: Some(Duration::from_secs(3600)), // run in one hour
    ..Default::default()
};
queue.enqueue_with_options(delayed_job, options).await?;
```
### Error Handling and Retries

Jobs that fail are automatically retried with configurable backoff.
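The common exponential strategy behind such retries can be sketched as follows; this is the generic pattern, not ChainMQ's exact formula, and the `retry_delay` name, base, and cap are illustrative.

```rust
use std::time::Duration;

/// Exponential backoff with a cap: base * 2^(attempt - 1), clamped to `max`.
/// Generic pattern only; ChainMQ's configurable strategies may differ.
fn retry_delay(base: Duration, max: Duration, attempt: u32) -> Duration {
    let factor = 2u32.saturating_pow(attempt.saturating_sub(1));
    base.saturating_mul(factor).min(max)
}

fn main() {
    let base = Duration::from_secs(1);
    let max = Duration::from_secs(30);
    for attempt in 1..=6 {
        // 1s, 2s, 4s, 8s, 16s, then capped at 30s
        println!("attempt {attempt}: {:?}", retry_delay(base, max, attempt));
    }
}
```

In practice a random jitter is often added on top so simultaneous failures do not retry in lockstep.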
## Internals (high level)
Each Queue holds a clone-friendly Redis async ConnectionManager (built from your RedisClient choice) for commands and script evaluation.
ChainMQ uses Lua scripts to ensure atomic operations:
- `move_delayed.lua`: Moves due jobs from the delayed sorted set to the wait list
- `claim_job.lua`: Atomically pops from the wait list and adds to the active list
Redis keys use a configurable prefix (default `rbq`):
- `rbq:queues` - Set of logical queue names (also discovered by scanning queue key patterns)
- `rbq:queue:{name}:wait` - Jobs waiting to be processed
- `rbq:queue:{name}:active` - Jobs currently being processed
- `rbq:queue:{name}:delayed` - Jobs scheduled for future execution
- `rbq:queue:{name}:failed` - Jobs that have failed processing
- `rbq:queue:{name}:completed` - Completed job IDs (when used)
- `rbq:job:{id}` - Individual job metadata and payload
- `rbq:job:{id}:logs` - Per-job log lines for the UI (when enabled)
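The key layout above is easy to reproduce with small helpers, which is handy when inspecting queues from `redis-cli` or scripts. The helper names here are ours, not crate API:

```rust
/// Build a per-queue state key, e.g. "rbq:queue:emails:wait".
fn queue_key(prefix: &str, queue: &str, state: &str) -> String {
    format!("{prefix}:queue:{queue}:{state}")
}

/// Build a per-job key, e.g. "rbq:job:42".
fn job_key(prefix: &str, job_id: &str) -> String {
    format!("{prefix}:job:{job_id}")
}

fn main() {
    assert_eq!(queue_key("rbq", "emails", "wait"), "rbq:queue:emails:wait");
    assert_eq!(job_key("rbq", "42"), "rbq:job:42");
    println!("ok");
}
```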
## Troubleshooting
Jobs not being processed:
- Ensure the worker's `.with_queue_name()` matches `Job::queue_name()`
- Verify the same Redis endpoint and `RedisClient` mode (URL vs shared client vs manager) for both worker and enqueuer, not only the host string
- Check jobs are enqueued: `redis-cli LRANGE rbq:queue:{queue}:wait 0 -1`
Connection issues:
- Verify Redis server is running and accessible
- Check Redis URL format and credentials
- Test connection with `redis-cli ping`
Jobs failing silently:
- Check Redis logs and the failed job queue: `LRANGE rbq:queue:{queue}:failed 0 -1`
- Add logging/tracing to your job implementations
- Ensure the job payload can be properly serialized/deserialized
Performance issues:
- Increase worker concurrency with `.with_concurrency(n)`
- Reduce the poll interval with `.with_poll_interval(duration)`
- Monitor Redis memory usage and job queue lengths
Development
# Build the library
# Run examples (requires Redis)
## License

MIT

## Acknowledgements

Inspired by existing Redis-backed job queues; built for ergonomic, type-safe Rust applications.