BullMQ Wrapper in Rust - Advanced Job Management with Redis
📌 Description
This library is a modular BullMQ-inspired wrapper in Rust, allowing queue, job, and worker management via Redis. It provides advanced features such as job prioritization, delays, retries, and queue management.
📂 Project Architecture
/src
├── config_service.rs # Centralized Redis configuration management
├── queue_service.rs # Queue and job management
├── worker_service.rs # Workers for job execution
├── job_model.rs # Job model with advanced options
├── log_service.rs # Logging service for job events
├── lib.rs # Library module declarations
├── main.rs # Application entry point
🚀 Installation
Ensure you have Rust installed and Redis running locally or in the cloud.
🛠️ Configuration
Create a .env file with Redis parameters:
REDIS_URL=redis://127.0.0.1:6379
## 📖 Usage exemple
1. start redis
docker compose up -d
2. Create a queue trigger service :
cargo run --bin queue_trigger
3. Push message to queue :
cargo run --bin push_message
### 1️⃣ Add a Job
```rust
use queue_service::QueueService;
use job_model::JobData;
use chrono::Utc;
#[tokio::main]
async fn main() {
let config = ConfigService::new();
let queue_service = QueueService::new(&config).unwrap();
let job = JobData {
message: "Hello, Rust!".to_string(),
timestamp: Utc::now().to_rfc3339(),
priority: Some(1),
delay: Some(5),
retries: Some(3),
expires_in: None,
progress: Some(0),
};
queue_service.add_job("testQueue", job).await.unwrap();
}
2️⃣ Start a Worker to Process Jobs
use Arc;
use WorkerService;
use QueueService;
use ConfigService;
async
3️⃣ Retry Failed Jobs
use Arc;
use WorkerService;
use QueueService;
use ConfigService;
async
4️⃣ Track Job Progress
use Arc;
use WorkerService;
use QueueService;
use ConfigService;
async
📜 Detailed Documentation
ConfigService
Manages Redis configuration.
Methods:
new() -> Self: Creates a newConfigServiceinstance.get_client(&self) -> RedisResult<Client>: Returns a Redis client.
QueueService
Manages queues and jobs in Redis.
Methods:
new(config: &ConfigService) -> RedisResult<Self>: Creates a newQueueServiceinstance.add_job(&self, queue_name: &str, job: JobData) -> RedisResult<()>: Adds a job to the specified queue.get_next_job(&self, queue_name: &str) -> RedisResult<Option<String>>: Retrieves the next job from the specified queue.count_jobs(&self, queue_name: &str) -> RedisResult<u64>: Counts the number of jobs in the specified queue.move_to_failed(&self, queue_name: &str, job: JobData) -> RedisResult<()>: Moves a job to the failed queue.log_job_status(&self, queue_name: &str, job: &JobData, status: &str) -> RedisResult<()>: Logs the status of a job.update_job_progress(&self, queue_name: &str, job_id: &str, progress: u32) -> RedisResult<()>: Updates the progress of a job.get_job_progress(&self, queue_name: &str, job_id: &str) -> RedisResult<u32>: Retrieves the progress of a job.
WorkerService
Manages workers that process jobs from a queue.
Methods:
new(queue_name: String, queue_service: Arc<QueueService>) -> Self: Creates a newWorkerServiceinstance.start(&self): Starts the worker to process jobs from the queue.retry_failed_jobs(&self): Retries failed jobs from the failed queue.
LogService
Logs job events to Redis.
Methods:
new(client: Arc<Mutex<Client>>) -> Self: Creates a newLogServiceinstance.log(&self, queue_name: &str, message: &str) -> RedisResult<()>: Logs a message to the specified queue's log.
JobData
Represents the data of a job.
Fields:
message: String: The message of the job.timestamp: String: The timestamp when the job was created.priority: Option<i32>: The priority of the job.delay: Option<i64>: The delay before the job can be processed.retries: Option<u32>: The number of retries allowed for the job.expires_in: Option<i64>: The expiration time of the job.progress: Option<u32>: The progress of the job.
🐳 Docker Setup
Dockerfile
FROM rust:latest
WORKDIR /usr/src/app
COPY . .
RUN apt-get update && apt-get install -y libssl-dev pkg-config
RUN cargo build --release
CMD ["cargo", "run"]
docker-compose.yml
services:
redis:
image: redis:latest
container_name: redis
ports:
- "6379:6379"
volumes:
- redis-data:/data
app:
build:
context: .
dockerfile: Dockerfile
container_name: bullmq-rust-app
environment:
- REDIS_URL=redis://redis:6379
depends_on:
- redis
volumes:
- .:/usr/src/app
command:
volumes:
redis-data:
.env.sample
REDIS_URL=redis://localhost:6379
.env
REDIS_URL=redis://localhost:6379
📜 License
This project is licensed under the MIT License.