# Queue Workers
A Redis-backed job queue system for Rust applications with support for retries and concurrent workers.
## Features
- Redis-backed persistent job queue
- Automatic job retries with configurable attempts and delay
- Concurrent worker support
- Async/await based API
- Serializable jobs using Serde
- Type-safe job definitions
## Prerequisites
- Rust 1.86.0 or later
- Redis server (local or remote)
- Docker and Docker Compose (for development)
## Quick Start
1. Add the dependency to your `Cargo.toml`:
```toml
[dependencies]
queue_workers = "0.1.0"
```
2. Define your job:
```rust
use serde::{Serialize, Deserialize};
use async_trait::async_trait;
use queue_workers::job::Job;

#[derive(Debug, Serialize, Deserialize)]
struct EmailJob {
    id: String,
    to: String,
    subject: String,
    body: String,
}

#[async_trait]
impl Job for EmailJob {
    type Output = String;
    type Error = String;

    async fn execute(&self) -> Result<Self::Output, Self::Error> {
        // Implement your job logic here
        Ok(format!("Email sent to {}", self.to))
    }
}
```
3. Create a queue and worker:
```rust
use queue_workers::{redis_queue::RedisQueue, worker::{Worker, WorkerConfig}};
use std::time::Duration;

// `EmailJob` is the job type defined in step 2.
#[tokio::main]
async fn main() {
    // Initialize the queue
    let queue = RedisQueue::<EmailJob>::new(
        "redis://127.0.0.1:6379",
        "email_queue"
    ).expect("Failed to create queue");

    // Configure the worker
    let config = WorkerConfig {
        retry_attempts: 3,
        retry_delay: Duration::from_secs(5),
        shutdown_timeout: Duration::from_secs(30),
    };

    // Create the worker (it does not process anything until `start` is called)
    let worker = Worker::new(queue.clone(), config);

    // Push a job
    let job = EmailJob {
        id: "email-1".to_string(),
        to: "user@example.com".to_string(),
        subject: "Hello".to_string(),
        body: "World".to_string(),
    };
    queue.push(job).await.expect("Failed to push job");

    // Start processing jobs
    worker.start().await.expect("Worker failed");
}
```
## Development Setup
1. Clone the repository:
```bash
git clone https://github.com/yourusername/queue_workers.git
cd queue_workers
```
2. Install development dependencies:
```bash
rustup component add rustfmt
rustup component add clippy
```
3. Set up git hooks:
```bash
chmod +x scripts/setup-git-hooks.sh
./scripts/setup-git-hooks.sh
```
4. Start Redis using Docker Compose:
```bash
docker-compose up -d redis
```
5. Run the tests:
```bash
cargo test
```
### Code Quality
This project enforces code quality through:
- Formatting using `rustfmt`
- Linting using `clippy`
To manually run the checks:
```bash
# Check formatting
cargo fmt -- --check
# Run clippy
cargo clippy -- -D warnings
```
These checks run automatically:
- As a pre-commit hook when committing changes
- In the CI/CD pipeline for all pull requests
## Configuration
### Worker Configuration
The `WorkerConfig` struct allows you to customize worker behavior:
```rust
let config = WorkerConfig {
    retry_attempts: 3,                         // Number of retry attempts for failed jobs
    retry_delay: Duration::from_secs(5),       // Delay between retries
    shutdown_timeout: Duration::from_secs(30), // Graceful shutdown timeout
};
```
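To make the interplay of `retry_attempts` and `retry_delay` concrete, here is a minimal, std-only sketch of a retry loop. It is not the library's implementation: `run_with_retries` is a hypothetical helper, and it assumes that `retry_attempts` counts retries after the first run rather than total runs, which this README does not specify.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical helper, not part of queue_workers.
/// Runs `op` once, then retries it up to `retry_attempts` more times
/// (an assumption about the semantics), sleeping `retry_delay` between tries.
/// Returns the final result together with the number of calls made.
fn run_with_retries<T, E>(
    retry_attempts: u32,
    retry_delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> (Result<T, E>, u32) {
    let mut calls = 0;
    loop {
        calls += 1;
        match op() {
            Ok(value) => return (Ok(value), calls),
            // Retries exhausted: surface the last error.
            Err(err) if calls > retry_attempts => return (Err(err), calls),
            // Otherwise wait, then try again.
            Err(_) => thread::sleep(retry_delay),
        }
    }
}
```

Under this assumption, a job that always fails with `retry_attempts: 3` is executed four times in total before the error is reported.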
### Redis Configuration
The Redis queue can be configured with a Redis URL and queue name:
```rust
let queue = RedisQueue::<MyJob>::new(
    "redis://username:password@hostname:6379/0", // Redis URL with authentication
    "my_queue_name"
).expect("Failed to create queue");
```
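If the connection details come from separate settings, the URL can be assembled before it is passed to `RedisQueue::new`. The helper below is a hypothetical sketch, not part of the library. It assumes the `redis://[username:password@]host:port/db` layout shown above and does not percent-encode special characters in the credentials.

```rust
/// Hypothetical helper, not part of queue_workers.
/// Builds a Redis URL from its parts. Credentials are inserted verbatim,
/// so characters such as `@` or `:` in a password would need escaping first.
fn build_redis_url(
    credentials: Option<(&str, &str)>,
    host: &str,
    port: u16,
    db: u16,
) -> String {
    match credentials {
        Some((username, password)) => {
            format!("redis://{username}:{password}@{host}:{port}/{db}")
        }
        None => format!("redis://{host}:{port}/{db}"),
    }
}
```

Calling `build_redis_url(None, "127.0.0.1", 6379, 0)` yields `redis://127.0.0.1:6379/0`, which matches the unauthenticated URL used in the Quick Start apart from the explicit database index.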
### Queue Types
The queue supports both FIFO (First In, First Out) and LIFO (Last In, First Out) behaviors:
```rust
use queue_workers::{redis_queue::RedisQueue, queue::QueueType};

// Create a FIFO queue (default behavior)
let fifo_queue = RedisQueue::<MyJob>::new(redis_url, "fifo_queue")?;

// Create a LIFO queue
let lifo_queue = RedisQueue::<MyJob>::with_type(
    redis_url,
    "lifo_queue",
    QueueType::LIFO
)?;
```
- FIFO: Jobs are processed in the order they were added (oldest first)
- LIFO: Jobs are processed in reverse order (newest first)
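The difference between the two orderings can be illustrated without Redis. The sketch below uses an in-memory `VecDeque`; `Order` and `MemoryQueue` are hypothetical stand-ins for illustration and say nothing about how the library stores jobs in Redis.

```rust
use std::collections::VecDeque;

/// Local stand-in for the two ordering modes (hypothetical, not the library's `QueueType`).
#[derive(Clone, Copy)]
enum Order {
    Fifo,
    Lifo,
}

/// Hypothetical in-memory queue used only to demonstrate ordering.
struct MemoryQueue<T> {
    items: VecDeque<T>,
    order: Order,
}

impl<T> MemoryQueue<T> {
    fn new(order: Order) -> Self {
        Self { items: VecDeque::new(), order }
    }

    /// New jobs always go to the back.
    fn push(&mut self, item: T) {
        self.items.push_back(item);
    }

    /// FIFO takes the oldest job (front); LIFO takes the newest job (back).
    fn pop(&mut self) -> Option<T> {
        match self.order {
            Order::Fifo => self.items.pop_front(),
            Order::Lifo => self.items.pop_back(),
        }
    }
}
```

After pushing `"a"`, `"b"`, `"c"` in that order, a FIFO queue pops `"a"` first, while a LIFO queue pops `"c"` first.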
## Running Multiple Workers
You can run multiple workers processing the same queue:
```rust
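let queue = RedisQueue::<EmailJob>::new(redis_url, "email_queue")?;

// Spawn multiple workers and keep their task handles
let mut handles = Vec::new();
for _ in 0..3 {
    let worker_queue = queue.clone();
    let worker = Worker::new(worker_queue, WorkerConfig::default());
    handles.push(tokio::spawn(async move {
        worker.start().await.expect("Worker failed");
    }));
}

// Wait for the workers; otherwise the tasks are cancelled when `main` returns
for handle in handles {
    handle.await.expect("Worker task panicked");
}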
```
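The idea behind several workers sharing one queue is that each job is claimed by exactly one of them. The following std-only sketch models that with threads and a mutex-protected `VecDeque` in place of Redis. `drain_with_workers` is a hypothetical function for illustration and does not reflect the library's internals.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for workers competing on one queue.
/// Drains a shared in-memory queue with `workers` threads and returns,
/// for each worker, the list of jobs it claimed.
fn drain_with_workers(jobs: Vec<u32>, workers: usize) -> Vec<Vec<u32>> {
    let queue = Arc::new(Mutex::new(VecDeque::from(jobs)));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                let mut handled = Vec::new();
                loop {
                    // The lock is held only for the pop, so a claimed job
                    // is never visible to another worker.
                    let job = queue.lock().unwrap().pop_front();
                    match job {
                        Some(job) => handled.push(job),
                        None => break, // queue is empty, worker exits
                    }
                }
                handled
            })
        })
        .collect();

    // Wait for every worker and collect what each one processed.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

How the jobs are split between workers varies from run to run, but the union of all per-worker lists always equals the original job list, with no job appearing twice.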
## Error Handling
The library provides a custom error type `QueueWorkerError` that covers various failure scenarios:
- Redis connection issues
- Serialization errors
- Job not found
- Worker errors
- Connection timeouts
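Callers often want to treat these categories differently, for example retrying transient failures but not malformed payloads. The sketch below shows one way to express such a policy. `SketchError` and its variant names are assumptions that only mirror the list above, so check the crate documentation for the actual shape of `QueueWorkerError`.

```rust
use std::fmt;

/// Hypothetical error enum mirroring the categories listed above.
/// The variant names are assumptions, not the real `QueueWorkerError`.
#[derive(Debug, PartialEq)]
enum SketchError {
    Redis(String),
    Serialization(String),
    JobNotFound(String),
    Worker(String),
    ConnectionTimeout,
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::Redis(msg) => write!(f, "redis error: {msg}"),
            SketchError::Serialization(msg) => write!(f, "serialization error: {msg}"),
            SketchError::JobNotFound(id) => write!(f, "job not found: {id}"),
            SketchError::Worker(msg) => write!(f, "worker error: {msg}"),
            SketchError::ConnectionTimeout => write!(f, "connection timed out"),
        }
    }
}

impl std::error::Error for SketchError {}

/// One possible policy: connection-level failures may succeed on a later
/// attempt, while the remaining categories will not fix themselves.
fn is_transient(err: &SketchError) -> bool {
    matches!(err, SketchError::Redis(_) | SketchError::ConnectionTimeout)
}
```

With this policy, `is_transient(&SketchError::ConnectionTimeout)` is `true`, whereas a serialization error is reported immediately.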
## Worker Types
The library provides two types of workers:
### Sequential Worker
Processes jobs one at a time, with retry support:
```rust
use queue_workers::{
    redis_queue::RedisQueue,
    worker::{Worker, WorkerConfig}
};
use std::time::Duration;

let config = WorkerConfig {
    retry_attempts: 3,
    retry_delay: Duration::from_secs(5),
    shutdown_timeout: Duration::from_secs(30),
};

let worker = Worker::new(queue.clone(), config);
worker.start().await?;
```
### Concurrent Worker
Processes multiple jobs in parallel:
```rust
use queue_workers::{
    redis_queue::RedisQueue,
    concurrent_worker::{ConcurrentWorker, ConcurrentWorkerConfig}
};
use std::time::Duration;

let config = ConcurrentWorkerConfig {
    max_concurrent_jobs: 5, // Process up to 5 jobs simultaneously
    retry_attempts: 3,
    retry_delay: Duration::from_secs(5),
    shutdown_timeout: Duration::from_secs(30),
};

let worker = ConcurrentWorker::new(queue.clone(), config);

// Either run the worker without an external shutdown signal...
worker.start().await?;

// ...or, instead of `start()`, run it with shutdown support.
// Keep `shutdown_tx` and send on it when the worker should stop.
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
worker.start_with_shutdown(shutdown_rx).await?;
```
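The shutdown variant above follows a common signalling pattern: the worker checks a channel between jobs and stops once a message arrives. Here is a minimal std-only sketch of that pattern, using `std::sync::mpsc` in place of `tokio::sync::broadcast`. `run_until_shutdown` is a hypothetical function, and the library's own loop may behave differently.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError, TryRecvError};
use std::time::Duration;

/// Hypothetical worker loop, not the library's implementation.
/// Processes jobs until a shutdown signal arrives or the job channel closes,
/// and returns the number of jobs completed.
fn run_until_shutdown(jobs: Receiver<u32>, shutdown: Receiver<()>) -> usize {
    let mut completed = 0;
    loop {
        // Check for shutdown between jobs, never in the middle of one.
        match shutdown.try_recv() {
            Ok(()) | Err(TryRecvError::Disconnected) => break,
            Err(TryRecvError::Empty) => {}
        }
        // Wait briefly for a job so the shutdown check runs regularly.
        match jobs.recv_timeout(Duration::from_millis(10)) {
            Ok(_job) => completed += 1, // stand-in for executing the job
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    completed
}
```

Because the signal is only checked between jobs, a job that has already started always runs to completion, which is the usual meaning of a graceful shutdown.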
## Contributing
1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add some amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request
## License
This project is licensed under the MIT License - see the LICENSE file for details.