ChainMQ
A Redis-backed, type-safe job queue for Rust. Provides job registration and execution, delayed jobs, retries with backoff, and scalable workers.
This crate is library-first. Runnable examples demonstrate typical patterns (single worker, multiple jobs, multiple workers, delayed jobs, failure/retry).
Features
- 🚀 Redis-Powered: Built on Redis for reliable job persistence and distribution
- 🔄 Background Jobs: Process jobs asynchronously in the background
- 🏗️ Job Registry: Simple Type-safe job registration and execution
- 🔧 Worker Management: Configurable workers with lifecycle management
- ⚡ Async/Await: Full async support throughout the system
- ⏰ Delayed jobs: Schedule jobs for future execution with atomic operations
- 🗄️ Backoff strategies: Configurable retry logic for failed jobs
- 📊 Application Context: Share application state across jobs
Quick Start:
Add ChainMQ to your Cargo.toml:
[]
= "0.2.0"
= "4.0"
= "0.23"
= { = "1.0", = ["full"] }
= { = "1.0", = ["derive"] }
Basic Usage:
1. Define Your Job
use ;
use ;
use async_trait;
use Arc;
2. Set Up Application Context
use AppContext;
use Arc;
3. Configure Workers and Your Preffered Web Server
use ;
use ;
use Client as RedisClient;
use broadcast;
async
4. Enqueue Jobs from Anywhere
use ;
async
Examples
This repo provides runnable examples. Build them all:
Run Redis first, then in separate terminals run workers/enqueuers:
# Single worker for emails queue
# Enqueue email jobs (normal + delayed/high priority)
# One worker handling multiple job types on a single queue
# Two workers on different queues (emails + reports)
# Failure and retry with backoff demonstration
# Delayed jobs demonstration
Notes:
- You can enqueue before or after workers start. Jobs persist in Redis until claimed.
- Ensure both worker and enqueuer use the same Redis URL and queue name.
- Some examples default to
redis://localhost:6379. Adjust to your setup.
Core Concepts
- Job: Defines work to be done. Implements
trait Job { async fn perform(&self, &JobContext) -> Result<()>; fn name() -> &str; fn queue_name() -> &str } - Queue: Persists job metadata and manages wait/delayed/active/failed job lists in Redis
- Worker: Polls queues, claims jobs atomically via Lua scripts, executes jobs via JobRegistry
- Registry: Maps job type names to executors for deserialization and dispatch
- JobContext: Provides access to application state and job metadata during execution
Configuration
Redis Configuration
ChainMQ works with any Redis instance:
// Local Redis
let redis_client = open?;
// Redis with authentication
let redis_client = open?;
// Redis with database selection
let redis_client = open?;
Worker Configuration
// Using Redis instance
let worker = new_with_redis_instance
.with_app_context
.with_queue_name
.with_concurrency // Number of concurrent jobs
.with_poll_interval // How often to check for jobs
.spawn
.await?;
// Using Redis URI
let worker = new_with_redis_uri
.with_app_context
.with_queue_name
.with_concurrency
.spawn
.await?;
Queue Configuration
let options = QueueOptions ;
let queue = new.await?;
Job Configuration
let job = EmailJob ;
let opts = JobOptions ;
let job_id = queue.enqueue_with_options.await?;
Advanced Usage
Service Injection with AppContext
Inject your own services (database pools, HTTP clients, caches, etc.) via AppContext. The worker holds an Arc<dyn AppContext> and each job receives it through JobContext.
use AppContext;
use Arc;
Use it inside jobs via the helper ctx.app::<T>():
Multiple Job Types
Register multiple job types in a single registry:
let mut registry = new;
registry.;
registry.;
registry.;
registry.;
// Single worker can handle all job types
let worker = new_with_redis_instance
.with_queue_name
.spawn
.await?;
Delayed Jobs
Schedule jobs for future execution:
use JobOptions;
use Duration;
let delayed_job = EmailJob ;
let options = JobOptions ;
queue.enqueue_with_options.await?;
Error Handling and Retries
Jobs that fail are automatically retried with configurable backoff:
Service injection (AppContext)
Inject your own services (DB pools, HTTP clients, caches, etc.) via AppContext. The worker holds an Arc<dyn AppContext> and each job receives it through JobContext.
Define your application state:
use AppContext;
use Arc;
Pass it to the worker:
let app = new;
let mut worker = new_with_redis_uri
.with_app_context
.with_queue_name
.spawn
.await?;
Use it inside jobs via the helper ctx.app::<T>() (preferred) or explicit downcast:
Internals (high level)
ChainMQ uses Lua scripts to ensure atomic operations:
move_delayed.lua: Moves due jobs from delayed sorted set to wait listclaim_job.lua: Atomically pops from wait list and adds to active list
Redis keys use a configurable prefix (default rbq):
rbq:queue:{name}:wait- Jobs waiting to be processedrbq:queue:{name}:active- Jobs currently being processedrbq:queue:{name}:delayed- Jobs scheduled for future executionrbq:queue:{name}:failed- Jobs that have failed processingrbq:job:{id}- Individual job metadata and payload
Troubleshooting
Jobs not being processed:
- Ensure worker
.with_queue_name()matchesJob::queue_name() - Verify same Redis URL for both worker and enqueuer
- Check jobs are enqueued:
redis-cli LRANGE rbq:queue:{queue}:wait 0 -1
Connection issues:
- Verify Redis server is running and accessible
- Check Redis URL format and credentials
- Test connection with
redis-cli ping
Jobs failing silently:
- Check Redis logs and failed job queue:
LRANGE rbq:queue:{queue}:failed 0 -1 - Add logging/tracing to your job implementations
- Ensure job payload can be properly serialized/deserialized
Performance issues:
- Increase worker concurrency with
.with_concurrency(n) - Reduce poll interval with
.with_poll_interval(duration) - Monitor Redis memory usage and job queue lengths
Development
# Build the library
# Run examples (requires Redis)
License
MIT
Acknowledgements
Inspired by existing Redis-backed job queues; built for ergonomic, type-safe Rust applications.