Qrush 🌀 – Lightweight Job Queue for Rust (Actix + Redis)
Qrush is a lightweight background job queue system for Rust, designed for Actix-Web projects with Redis as the backend. It supports job definition, scheduling (including delayed jobs), and monitoring with optional Basic Auth-protected Prometheus metrics.
🚀 Features
- Simple job definition using a trait
- Background job execution
- Delayed job scheduling (
enqueue_in)
- Redis-based queue persistence
- Prometheus-style metrics endpoint (
/qrush/metrics)
- Easy integration with Actix-Web
- Optional Basic Auth protection
📦 Installation
Add this to your Cargo.toml:
qrush = "0.2.0"
Or use:
cargo add qrush
⚙️ Integration Guide for Version >= 0.2.0
1. Initialize Qrush
Create src/jobs/qrush.rs:
use std::sync::{Arc};
use tokio::sync::Notify;
use std::env;
use qrush::config::QueueConfig;
use qrush::registry::register_job;
use qrush::config::QUEUE_INITIALIZED;
use qrush::job::Job;
use qrush::config::{set_basic_auth, QrushBasicAuthConfig};
use crate::jobs::notify_user::NotifyUser;
pub async fn initiate(basic_auth: Option<QrushBasicAuthConfig>) {
let queueNotify = Arc::new(Notify::new());
let basic_auth = basic_auth.or_else(|| {
std::env::var("QRUSH_BASIC_AUTH").ok().and_then(|auth| {
let parts: Vec<&str> = auth.splitn(2, ':').collect();
if parts.len() == 2 {
Some(QrushBasicAuthConfig {
username: parts[0].to_string(),
password: parts[1].to_string(),
})
} else {
None
}
})
});
let _ = set_basic_auth(basic_auth);
let _ = QUEUE_INITIALIZED.set(queueNotify.clone());
register_job(NotifyUser::name(), NotifyUser::handler);
tokio::spawn({
let queueNotify = queueNotify.clone();
async move {
let redis_url = std::env::var("REDIS_URL")
.unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let queues = vec![
QueueConfig::new("default", 5, 1),
QueueConfig::new("critical", 10, 0),
];
if let Err(err) = QueueConfig::initialize(redis_url, queues).await {
eprintln!("❌ Failed to initialize qrush: {:?}", err);
} else {
println!("✅ qrush started successfully");
queueNotify.notify_waiters();
}
}
});
queueNotify.notified().await;
println!("🚀 Queue initialization complete. Continuing main logic...");
}
2. Call initiate() in main.rs
use actix_web::{web, App, HttpServer};
use qrush::routes::metrics_route::qrush_metrics_routes;
use crate::jobs::qrush::initiate as qrush_initiate;
use qrush::config::QrushBasicAuthConfig;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let basic_auth = Some(QrushBasicAuthConfig {
username: "qrush".into(),
password: "qrush".into(),
});
qrush_initiate(basic_auth).await;
HttpServer::new(|| {
App::new()
.service(web::scope("/qrush").configure(qrush_metrics_routes))
})
.bind("127.0.0.1:8080")?
.run()
.await
}
3. Define a Job
Create src/jobs/workers/notify_user.rs:
use qrush::job::Job;
use async_trait::async_trait;
use serde::{Serialize, Deserialize};
use anyhow::{Result, Error};
use futures::future::{BoxFuture, FutureExt};
#[derive(Debug, Serialize, Deserialize)]
pub struct NotifyUser {
pub user_id: String,
pub message: String,
}
#[async_trait]
impl Job for NotifyUser {
fn name(&self) -> &'static str {
"NotifyUser"
}
fn queue(&self) -> &'static str {
"default"
}
async fn before(&self) -> Result<()> {
println!("⏳ Before NotifyUser job for user: {}", self.user_id);
Ok(())
}
async fn perform(&self) -> Result<()> {
println!("📬 Performing NotifyUser: '{}' to user {}", self.message, self.user_id);
Ok(())
}
async fn after(&self) {
println!("✅ After NotifyUser job for user: {}", self.user_id);
}
async fn on_error(&self, err: &Error) {
eprintln!("❌ Error in NotifyUser job for user {}: {:?}", self.user_id, err);
}
async fn always(&self) {
println!("🔁 Always block executed for NotifyUser job");
}
}
impl NotifyUser {
pub fn name() -> &'static str {
"notify_user"
}
pub fn handler(payload: String) -> BoxFuture<'static, Result<Box<dyn Job>>> {
async move {
let job: NotifyUser = serde_json::from_str(&payload)?;
Ok(Box::new(job) as Box<dyn Job>)
}.boxed()
}
}
4. Enqueue a Job Anywhere
use qrush::queue::{enqueue, enqueue_in};
use crate::jobs::notify_user::NotifyUser;
let _ = enqueue(NotifyUser {
user_id: "abc".into(),
message: "Hello from Qrush!".into(),
}).await;
let _ = enqueue_in(NotifyUser {
user_id: "abc".into(),
message: "Delayed job after 60s".into(),
}, 60).await;
5. Metrics Route
Visit:
http://localhost:8080/qrush/metrics
⚙️ Integration Guide for Version <= 0.1.2
1. Initialize Qrush (Legacy)
Create src/jobs/qrush.rs:
use std::sync::Arc;
use tokio::sync::Notify;
use qrush::config::{QueueConfig, set_basic_auth, QUEUE_INITIALIZED, QrushBasicAuthConfig};
use qrush::registry::register_jobs;
use qrush::job::Job;
use crate::jobs::workers::notify_user_job::NotifyUser;
pub async fn initiate(basic_auth: Option<QrushBasicAuthConfig>) {
let queue_notify = Arc::new(Notify::new());
let _ = set_basic_auth(basic_auth);
let _ = QUEUE_INITIALIZED.set(queue_notify.clone());
register_jobs(<NotifyUser as Job>::name(), NotifyUser::handler);
tokio::spawn({
let queue_notify = queue_notify.clone();
async move {
let redis_url = std::env::var("REDIS_URL")
.unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let queues = vec![
QueueConfig::new("default", 5, 1),
QueueConfig::new("critical", 10, 0),
];
if let Err(err) = QueueConfig::initialize(redis_url, queues).await {
eprintln!("Failed to initialize Qrush: {:?}", err);
} else {
println!("Qrush started successfully");
queue_notify.notify_waiters();
}
}
});
queue_notify.notified().await;
}
2. Call initiate() in main.rs
use actix_web::{web, App, HttpServer};
use qrush::routes::metrics_route::qrush_metrics_routes;
use crate::jobs::qrush::initiate as qrush_initiate;
use qrush::config::QrushBasicAuthConfig;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let basic_auth = Some(QrushBasicAuthConfig {
username: "qrush".into(),
password: "qrush".into(),
});
qrush_initiate(basic_auth).await;
HttpServer::new(|| {
App::new()
.service(web::scope("/qrush").configure(qrush_metrics_routes))
})
.bind("127.0.0.1:8080")?
.run()
.await
}
3. Define a Job (Legacy Style)
Create src/jobs/workers/notify_user_job.rs:
use qrush::job::Job;
use async_trait::async_trait;
use serde::{Serialize, Deserialize};
use anyhow::Result;
use futures::future::BoxFuture;
#[derive(Debug, Serialize, Deserialize)]
pub struct NotifyUser {
pub user_id: String,
pub message: String,
}
#[async_trait]
impl Job for NotifyUser {
async fn perform(&self) -> Result<()> {
println!("Performing NotifyUser: '{}' to user {}", self.message, self.user_id);
Ok(())
}
fn name() -> &'static str {
"NotifyUser"
}
fn queue() -> &'static str {
"default"
}
}
impl NotifyUser {
pub fn handler(payload: String) -> BoxFuture<'static, Result<()>> {
Box::pin(async move {
let job: NotifyUser = serde_json::from_str(&payload)?;
job.perform().await
})
}
}
4. Enqueue a Job
use qrush::queue::{enqueue, enqueue_in};
use crate::jobs::workers::notify_user_job::NotifyUser;
let _ = enqueue(NotifyUser {
user_id: "abc".into(),
message: "Hello from Qrush!".into(),
}).await;
let _ = enqueue_in(NotifyUser {
user_id: "abc".into(),
message: "Delayed job after 60s".into(),
}, 60).await;
5. Metrics Route
Visit:
http://localhost:8080/qrush/metrics
📈 Optional Environment Variables
| Variable |
Description |
Default |
REDIS_URL |
Redis connection string |
redis://127.0.0.1:6379 |
QRUSH_BASIC_AUTH |
Basic Auth as user:pass |
Optional |
💪 TODO / Roadmap
- 💀 Dead Letter Queues
- 📜 Persistent Job Logs
- 🌐 Web UI for Monitoring
👨💼 Author
Built with ❤️ by SNM Maurya
🖼️ Screenshots
Qrush Metrics Example
https://snmmaurya.com/qrush/metrics.png
https://snmmaurya.com/qrush/summary.png
🪪 License
MIT
🔧 Optional Enhancements
Feel free to open issues or PRs with feature requests.