# Qrush
[](https://crates.io/crates/qrush)
[](https://docs.rs/qrush)
[](LICENSE)
[](https://crates.io/crates/qrush)
A lightweight, production-ready job queue and task scheduler for Rust applications built on Actix Web and Redis. Qrush provides both integrated and separate process modes, making it suitable for everything from simple background tasks to large-scale distributed systems.
## Features
- π **Dual Deployment Modes**: Integrated (single process) or separate worker process
- β‘ **High Performance**: Built on Actix Web and Redis for maximum throughput
- π
**Cron Scheduling**: Full cron expression support for recurring tasks
- β±οΈ **Delayed Jobs**: Schedule jobs to run after a specified delay
- π **Built-in Metrics UI**: Real-time dashboard for monitoring queues, jobs, and workers
- π **Security**: Optional Basic Auth for metrics endpoints
- π― **Type-Safe**: Leverages Rust's type system for safe job handling
- π **Graceful Shutdown**: Clean worker shutdown with configurable grace periods
- π **Scalable**: Support for multiple queues with different priorities and concurrency levels
## Quick Start
### Installation
Add to your `Cargo.toml`:
```toml
[dependencies]
qrush = "0.6.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
serde = { version = "1", features = ["derive"] }
async-trait = "0.1"
anyhow = "1"
```
### Basic Usage (Integrated Mode)
```rust
use qrush::job::Job;
use qrush::queue::{enqueue, enqueue_in};
use qrush::config::QueueConfig;
use qrush::registry::register_job;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use futures::future::BoxFuture;
use anyhow::Result;
#[derive(Clone, Serialize, Deserialize)]
pub struct EmailJob {
pub to: String,
pub subject: String,
}
#[async_trait]
impl Job for EmailJob {
async fn perform(&self) -> Result<()> {
println!("Sending email to {}: {}", self.to, self.subject);
// Your email sending logic here
Ok(())
}
fn name(&self) -> &'static str { "EmailJob" }
fn queue(&self) -> &'static str { "default" }
}
impl EmailJob {
pub fn name() -> &'static str { "EmailJob" }
pub fn handler(payload: String) -> BoxFuture<'static, Result<Box<dyn Job>>> {
Box::pin(async move {
let job: EmailJob = serde_json::from_str(&payload)?;
Ok(Box::new(job) as Box<dyn Job>)
})
}
}
#[tokio::main]
async fn main() -> Result<()> {
// Set Redis URL
std::env::set_var("REDIS_URL", "redis://127.0.0.1:6379");
// Register job
register_job(EmailJob::name(), EmailJob::handler);
// Initialize queues
let queues = vec![
QueueConfig::new("default", 5, 0),
];
QueueConfig::initialize(
"redis://127.0.0.1:6379".to_string(),
queues
).await?;
// Enqueue a job
enqueue(EmailJob {
to: "user@example.com".to_string(),
subject: "Hello!".to_string(),
}).await?;
// Keep running
tokio::signal::ctrl_c().await?;
Ok(())
}
```
## Architecture
QRush supports two deployment modes:
### Integrated Mode
Workers run in the same process as your application. Perfect for small to medium applications.
```
βββββββββββββββββββββββ
β Application β
β (Single Process) β
β β
β β’ HTTP Server β
β β’ Enqueue Jobs β
β β’ Process Jobs β β Workers here
βββββββββββββββββββββββ
```
### Separate Process Mode
Workers run in a dedicated process. Recommended for production environments.
```
βββββββββββββββββββββββ βββββββββββββββββββββββ
β Web Server β β qrush-engine β
β (cargo run) β β (separate process) β
β β β β
β β’ HTTP Server β β β’ Worker Pools β
β β’ Enqueue Jobs ββββββΌββRedisβββΌββΆ Process Jobs β
β β’ Serve Routes β β β’ Cron Scheduler β
βββββββββββββββββββββββ βββββββββββββββββββββββ
```
## Documentation
### Integrated Mode
See [Part 1: Integrated Mode](#integrated-mode-detailed) below for complete setup instructions.
### Separate Process Mode
See [Part 2: Separate Process Mode](#separate-process-mode-detailed) below for production deployment.
## API Reference
### Core Traits
- `Job`: Implement this trait for your job types
- `CronJob`: Implement for recurring scheduled jobs
### Core Functions
- `enqueue(job)`: Enqueue a job immediately
- `enqueue_in(job, delay_secs)`: Enqueue a job with a delay
- `register_job(name, handler)`: Register a job handler
- `QueueConfig::initialize(redis_url, queues)`: Start worker pools
### Engine Runtime
- `qrush::engine::run_engine(redis_url, queues, shutdown_grace_secs)`: Run worker process
- `qrush::engine::parse_queues(spec)`: Parse queue specification string
## Examples
### Cron Jobs
```rust
use qrush::cron::cron_job::CronJob;
use qrush::cron::cron_scheduler::CronScheduler;
#[async_trait]
impl CronJob for EmailJob {
fn cron_expression(&self) -> &'static str {
"0 0 * * * *" // Every hour
}
fn cron_id(&self) -> &'static str { "hourly_email" }
}
// Register cron job
let job = EmailJob { /* ... */ };
CronScheduler::register_cron_job(job).await?;
```
### Multiple Queues
```rust
let queues = vec![
QueueConfig::new("default", 5, 0), // 5 workers, priority 0
QueueConfig::new("critical", 10, 0), // 10 workers, priority 0
QueueConfig::new("low", 2, 1), // 2 workers, priority 1
];
```
### Metrics UI
Access the built-in metrics dashboard at `/qrush/metrics`:
- Queue statistics and job counts
- Worker status and health
- Cron job management
- Job retry and deletion
- CSV export
## Requirements
- Rust 1.89.0 or later
- Redis 6.0 or later
- Tokio runtime (multi-threaded)
## Environment Variables
```bash
# Required
REDIS_URL=redis://127.0.0.1:6379
# Optional
QRUSH_BASIC_AUTH=admin:password # Basic auth for metrics
RUST_LOG=info,qrush=info # Logging level
```
# Detailed Documentation
## Integrated Mode (Detailed)
**Use this mode when:** You want a simple setup with workers running in the same process as your web server.
### 1. Add Dependencies
```toml
[dependencies]
qrush = "0.6.0"
actix-web = "4"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
serde = { version = "1", features = ["derive"] }
async-trait = "0.1"
anyhow = "1"
futures = "0.3"
```
### 2. Define a Job
```rust
use qrush::job::Job;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use futures::future::BoxFuture;
use anyhow::Result;
#[derive(Clone, Serialize, Deserialize)]
pub struct NotifyUser {
pub user_id: String,
pub message: String,
}
#[async_trait]
impl Job for NotifyUser {
async fn perform(&self) -> Result<()> {
println!("Notify {} -> {}", self.user_id, self.message);
Ok(())
}
fn name(&self) -> &'static str { "NotifyUser" }
fn queue(&self) -> &'static str { "default" }
}
impl NotifyUser {
pub fn name() -> &'static str { "NotifyUser" }
pub fn handler(payload: String) -> BoxFuture<'static, Result<Box<dyn Job>>> {
Box::pin(async move {
let job: NotifyUser = serde_json::from_str(&payload)?;
Ok(Box::new(job) as Box<dyn Job>)
})
}
}
```
### 3. Initialize QRush
```rust
use qrush::config::{QueueConfig, set_redis_url};
use qrush::registry::register_job;
use actix_web::{web, App, HttpServer};
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// Set Redis URL
let redis_url = std::env::var("REDIS_URL")
.unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
set_redis_url(redis_url.clone())?;
// Register jobs
register_job(NotifyUser::name(), NotifyUser::handler);
// Initialize queues
let queues = vec![
QueueConfig::new("default", 5, 0),
];
QueueConfig::initialize(redis_url, queues).await?;
// Start web server
HttpServer::new(|| {
App::new()
.service(web::scope("/qrush")
.configure(|cfg| {
// Add metrics routes
use qrush::routes::metrics_route::qrush_metrics_routes;
qrush_metrics_routes(cfg);
}))
})
.bind("0.0.0.0:8080")?
.run()
.await
}
```
### 4. Enqueue Jobs
```rust
use qrush::queue::{enqueue, enqueue_in};
// Immediate
enqueue(NotifyUser {
user_id: "123".to_string(),
message: "Hello".to_string(),
}).await?;
// Delayed (300 seconds)
enqueue_in(NotifyUser {
user_id: "123".to_string(),
message: "Reminder".to_string(),
}, 300).await?;
```
---
## Separate Process Mode (Detailed)
**Use this mode when:** You want production-ready separation with workers in a dedicated process.
### 1. Create Engine Binary
Create `src/bin/qrush_engine.rs`:
```rust
use qrush::engine::{run_engine, parse_queues};
use qrush::config::set_redis_url;
use qrush::registry::register_job;
use qrush::cron::cron_scheduler::CronScheduler;
// Your job types
use your_app::jobs::NotifyUser;
#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
// Setup logging
tracing_subscriber::fmt::init();
// Get Redis URL
let redis_url = std::env::var("REDIS_URL")
.unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
// Set Redis URL (needed before registering cron jobs)
set_redis_url(redis_url.clone())?;
// Register jobs
register_job(NotifyUser::name(), NotifyUser::handler);
// Register cron jobs (optional)
let daily_report = DailyReportJob { /* ... */ };
CronScheduler::register_cron_job(daily_report).await?;
// Parse queues: "name:concurrency:priority"
let queues = parse_queues("default:5:0,critical:10:0");
// Run engine
run_engine(redis_url, queues, 5).await
}
```
### 2. Update Cargo.toml
```toml
[[bin]]
name = "qrush_engine"
path = "src/bin/qrush_engine.rs"
```
### 3. Web Server (No Workers)
In your web server `main.rs`, only register jobs for enqueueing:
```rust
use qrush::config::set_redis_url;
use qrush::registry::register_job;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// Set Redis URL (needed for enqueue operations)
let redis_url = std::env::var("REDIS_URL")
.unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
set_redis_url(redis_url)?;
// Register jobs (for serialization only)
register_job(NotifyUser::name(), NotifyUser::handler);
// Start web server (NO worker initialization)
HttpServer::new(|| {
App::new()
// Your routes
})
.bind("0.0.0.0:8080")?
.run()
.await
}
```
### 4. Run Both Processes
**Terminal 1 - Web Server:**
```bash
export REDIS_URL=redis://127.0.0.1:6379
cargo run
```
**Terminal 2 - Worker Engine:**
```bash
export REDIS_URL=redis://127.0.0.1:6379
cargo run --bin qrush_engine
```
---
## Cron Expressions
QRush uses 6-field cron expressions (seconds, minutes, hours, day, month, weekday):
- `"0 * * * * *"` - Every minute
- `"0 */5 * * * *"` - Every 5 minutes
- `"0 0 * * * *"` - Every hour
- `"0 0 0 * * *"` - Daily at midnight
- `"0 0 0 * * 1"` - Every Monday at midnight
- `"0 0 0 1 * *"` - First day of month at midnight
## Metrics Endpoints
- `GET /qrush/metrics` - Dashboard overview
- `GET /qrush/metrics/queues/{queue}` - Queue details
- `GET /qrush/metrics/extras/cron` - Cron job management
- `GET /qrush/metrics/extras/workers` - Worker status
- `GET /qrush/metrics/health` - Health check
- `POST /qrush/metrics/jobs/action` - Job actions (retry/delete)
## Production Tips
- Use separate process mode for production
- Set `QRUSH_BASIC_AUTH` to protect metrics endpoints
- Configure appropriate queue concurrency based on your workload
- Monitor Redis memory usage
- Use graceful shutdown for zero-downtime deployments
- Scale workers horizontally by running multiple engine processes
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
## Support
- **Documentation**: [docs.rs/qrush](https://docs.rs/qrush)
- **Issues**: [GitHub Issues](https://github.com/srotas-space/qrush/issues)
- **Discussions**: [GitHub Discussions](https://github.com/srotas-space/qrush/discussions)
---
Made with β€οΈ by [Srotas Space](https://srotas.space/open-source)
---