qrush 0.6.0

Lightweight Job Queue and Task Scheduler for Rust (Actix + Redis + Cron)

A lightweight, production-ready job queue and task scheduler for Rust applications built on Actix Web and Redis. Qrush provides both integrated and separate process modes, making it suitable for everything from simple background tasks to large-scale distributed systems.

Features

  • 🚀 Dual Deployment Modes: Integrated (single process) or separate worker process
  • ⚡ High Performance: Built on Actix Web and Redis for maximum throughput
  • 📅 Cron Scheduling: Full cron expression support for recurring tasks
  • ⏱️ Delayed Jobs: Schedule jobs to run after a specified delay
  • 📊 Built-in Metrics UI: Real-time dashboard for monitoring queues, jobs, and workers
  • 🔒 Security: Optional Basic Auth for metrics endpoints
  • 🎯 Type-Safe: Leverages Rust's type system for safe job handling
  • 🔄 Graceful Shutdown: Clean worker shutdown with configurable grace periods
  • 📈 Scalable: Support for multiple queues with different priorities and concurrency levels

Quick Start

Installation

Add to your Cargo.toml:

[dependencies]
qrush = "0.6.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
async-trait = "0.1"
anyhow = "1"
futures = "0.3"

Basic Usage (Integrated Mode)

use qrush::job::Job;
use qrush::queue::enqueue;
use qrush::config::QueueConfig;
use qrush::registry::register_job;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use futures::future::BoxFuture;
use anyhow::Result;

#[derive(Clone, Serialize, Deserialize)]
pub struct EmailJob {
    pub to: String,
    pub subject: String,
}

#[async_trait]
impl Job for EmailJob {
    async fn perform(&self) -> Result<()> {
        println!("Sending email to {}: {}", self.to, self.subject);
        // Your email sending logic here
        Ok(())
    }

    fn name(&self) -> &'static str { "EmailJob" }
    fn queue(&self) -> &'static str { "default" }
}

impl EmailJob {
    pub fn name() -> &'static str { "EmailJob" }
    pub fn handler(payload: String) -> BoxFuture<'static, Result<Box<dyn Job>>> {
        Box::pin(async move {
            let job: EmailJob = serde_json::from_str(&payload)?;
            Ok(Box::new(job) as Box<dyn Job>)
        })
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Set Redis URL
    std::env::set_var("REDIS_URL", "redis://127.0.0.1:6379");
    
    // Register job
    register_job(EmailJob::name(), EmailJob::handler);
    
    // Initialize queues
    let queues = vec![
        QueueConfig::new("default", 5, 0), // name, concurrency, priority
    ];
    QueueConfig::initialize(
        "redis://127.0.0.1:6379".to_string(),
        queues
    ).await?;
    
    // Enqueue a job
    enqueue(EmailJob {
        to: "user@example.com".to_string(),
        subject: "Hello!".to_string(),
    }).await?;
    
    // Keep running
    tokio::signal::ctrl_c().await?;
    Ok(())
}

Architecture

Qrush supports two deployment modes:

Integrated Mode

Workers run in the same process as your application. Perfect for small to medium applications.

┌──────────────────────┐
│  Application         │
│  (Single Process)    │
│                      │
│  • HTTP Server       │
│  • Enqueue Jobs      │
│  • Process Jobs      │ ← Workers here
└──────────────────────┘

Separate Process Mode

Workers run in a dedicated process. Recommended for production environments.

┌──────────────────────┐         ┌─────────────────────┐
│  Web Server          │         │  qrush-engine       │
│  (cargo run)         │         │  (separate process) │
│                      │         │                     │
│  • HTTP Server       │         │  • Worker Pools     │
│  • Enqueue Jobs ─────┼──Redis──┼─▶ Process Jobs      │
│  • Serve Routes      │         │  • Cron Scheduler   │
└──────────────────────┘         └─────────────────────┘

Documentation

Integrated Mode

See Integrated Mode (Detailed) below for complete setup instructions.

Separate Process Mode

See Separate Process Mode (Detailed) below for production deployment.

API Reference

Core Traits

  • Job: Implement this trait for your job types
  • CronJob: Implement for recurring scheduled jobs

Core Functions

  • enqueue(job): Enqueue a job immediately
  • enqueue_in(job, delay_secs): Enqueue a job with a delay
  • register_job(name, handler): Register a job handler
  • QueueConfig::initialize(redis_url, queues): Start worker pools

Engine Runtime

  • qrush::engine::run_engine(redis_url, queues, shutdown_grace_secs): Run worker process
  • qrush::engine::parse_queues(spec): Parse queue specification string
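The spec string that parse_queues accepts is a comma-separated list of name:concurrency:priority entries (the same format used in the engine example later in this README). The sketch below illustrates that format in plain Rust; QueueSpec and parse_queue_spec are illustrative stand-ins, not qrush APIs:

```rust
// Illustrative parser for the "name:concurrency:priority" queue spec
// format. QueueSpec is a hypothetical stand-in for qrush's QueueConfig.
#[derive(Debug, PartialEq)]
struct QueueSpec {
    name: String,
    concurrency: usize,
    priority: u8,
}

fn parse_queue_spec(spec: &str) -> Option<Vec<QueueSpec>> {
    spec.split(',')
        .map(|entry| {
            let mut parts = entry.trim().split(':');
            Some(QueueSpec {
                name: parts.next()?.to_string(),
                concurrency: parts.next()?.parse().ok()?,
                priority: parts.next()?.parse().ok()?,
            })
        })
        .collect() // any malformed entry turns the whole result into None
}

fn main() {
    let queues = parse_queue_spec("default:5:0,critical:10:0").unwrap();
    assert_eq!(queues.len(), 2);
    assert_eq!(queues[0].name, "default");
    assert_eq!(queues[1].concurrency, 10);
    println!("{:?}", queues);
}
```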

Examples

Cron Jobs

use qrush::cron::cron_job::CronJob;
use qrush::cron::cron_scheduler::CronScheduler;

#[async_trait]
impl CronJob for EmailJob {
    fn cron_expression(&self) -> &'static str {
        "0 0 * * * *"  // Every hour
    }
    fn cron_id(&self) -> &'static str { "hourly_email" }
}

// Register cron job
let job = EmailJob { /* ... */ };
CronScheduler::register_cron_job(job).await?;

Multiple Queues

let queues = vec![
    QueueConfig::new("default", 5, 0),   // 5 workers, priority 0
    QueueConfig::new("critical", 10, 0), // 10 workers, priority 0
    QueueConfig::new("low", 2, 1),       // 2 workers, priority 1
];

Metrics UI

Access the built-in metrics dashboard at /qrush/metrics:

  • Queue statistics and job counts
  • Worker status and health
  • Cron job management
  • Job retry and deletion
  • CSV export

Requirements

  • Rust 1.89.0 or later
  • Redis 6.0 or later
  • Tokio runtime (multi-threaded)

Environment Variables

# Required
REDIS_URL=redis://127.0.0.1:6379

# Optional
QRUSH_BASIC_AUTH=admin:password  # Basic auth for metrics
RUST_LOG=info,qrush=info         # Logging level

Detailed Documentation

Integrated Mode (Detailed)

Use this mode when: You want a simple setup with workers running in the same process as your web server.

1. Add Dependencies

[dependencies]
qrush = "0.6.0"
actix-web = "4"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
async-trait = "0.1"
anyhow = "1"
futures = "0.3"

2. Define a Job

use qrush::job::Job;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use futures::future::BoxFuture;
use anyhow::Result;

#[derive(Clone, Serialize, Deserialize)]
pub struct NotifyUser {
    pub user_id: String,
    pub message: String,
}

#[async_trait]
impl Job for NotifyUser {
    async fn perform(&self) -> Result<()> {
        println!("Notify {} -> {}", self.user_id, self.message);
        Ok(())
    }

    fn name(&self) -> &'static str { "NotifyUser" }
    fn queue(&self) -> &'static str { "default" }
}

impl NotifyUser {
    pub fn name() -> &'static str { "NotifyUser" }
    pub fn handler(payload: String) -> BoxFuture<'static, Result<Box<dyn Job>>> {
        Box::pin(async move {
            let job: NotifyUser = serde_json::from_str(&payload)?;
            Ok(Box::new(job) as Box<dyn Job>)
        })
    }
}

3. Initialize Qrush

use qrush::config::{QueueConfig, set_redis_url};
use qrush::registry::register_job;
use actix_web::{web, App, HttpServer};

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
    // Set Redis URL
    let redis_url = std::env::var("REDIS_URL")
        .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
    set_redis_url(redis_url.clone())?;
    
    // Register jobs
    register_job(NotifyUser::name(), NotifyUser::handler);
    
    // Initialize queues (this also starts the worker pools)
    let queues = vec![
        QueueConfig::new("default", 5, 0),
    ];
    QueueConfig::initialize(redis_url, queues).await?;
    
    // Start web server
    HttpServer::new(|| {
        App::new()
            .service(web::scope("/qrush")
                .configure(|cfg| {
                    // Mount the built-in metrics routes
                    use qrush::routes::metrics_route::qrush_metrics_routes;
                    qrush_metrics_routes(cfg);
                }))
    })
    .bind("0.0.0.0:8080")?
    .run()
    .await?;
    Ok(())
}

4. Enqueue Jobs

use qrush::queue::{enqueue, enqueue_in};

// Immediate
enqueue(NotifyUser {
    user_id: "123".to_string(),
    message: "Hello".to_string(),
}).await?;

// Delayed (300 seconds)
enqueue_in(NotifyUser {
    user_id: "123".to_string(),
    message: "Reminder".to_string(),
}, 300).await?;
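enqueue_in takes the delay as a plain number of seconds. If you prefer to state delays in larger units, std::time::Duration keeps the arithmetic readable; delay_secs below is just an illustrative helper, not a qrush API:

```rust
use std::time::Duration;

// Illustrative helper: express a delay in readable units, then hand
// enqueue_in the whole-second count it expects.
fn delay_secs(d: Duration) -> u64 {
    d.as_secs()
}

fn main() {
    assert_eq!(delay_secs(Duration::from_secs(5 * 60)), 300);    // 5 minutes
    assert_eq!(delay_secs(Duration::from_secs(2 * 3600)), 7200); // 2 hours
    println!("ok");
}
```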

Separate Process Mode (Detailed)

Use this mode when: You want production-ready separation with workers in a dedicated process.

1. Create Engine Binary

Create src/bin/qrush_engine.rs:

use qrush::engine::{run_engine, parse_queues};
use qrush::config::set_redis_url;
use qrush::registry::register_job;
use qrush::cron::cron_scheduler::CronScheduler;

// Your job types (DailyReportJob is whatever cron job type you define)
use your_app::jobs::{NotifyUser, DailyReportJob};

#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    
    // Setup logging
    tracing_subscriber::fmt::init();
    
    // Get Redis URL
    let redis_url = std::env::var("REDIS_URL")
        .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
    
    // Set Redis URL (needed before registering cron jobs)
    set_redis_url(redis_url.clone())?;
    
    // Register jobs
    register_job(NotifyUser::name(), NotifyUser::handler);
    
    // Register cron jobs (optional)
    let daily_report = DailyReportJob { /* ... */ };
    CronScheduler::register_cron_job(daily_report).await?;
    
    // Parse queues: "name:concurrency:priority"
    let queues = parse_queues("default:5:0,critical:10:0");
    
    // Run engine (third argument: shutdown grace period in seconds)
    run_engine(redis_url, queues, 5).await
}

2. Update Cargo.toml

[[bin]]
name = "qrush_engine"
path = "src/bin/qrush_engine.rs"

The engine example above also uses dotenvy and tracing-subscriber; add both to [dependencies] if they are not there already.

3. Web Server (No Workers)

In your web server main.rs, only register jobs for enqueueing:

use qrush::config::set_redis_url;
use qrush::registry::register_job;
use actix_web::{App, HttpServer};

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
    // Set Redis URL (needed for enqueue operations)
    let redis_url = std::env::var("REDIS_URL")
        .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
    set_redis_url(redis_url)?;
    
    // Register jobs (for payload serialization only; no workers run here)
    register_job(NotifyUser::name(), NotifyUser::handler);
    
    // Start web server (NO worker initialization)
    HttpServer::new(|| {
        App::new()
            // Your routes
    })
    .bind("0.0.0.0:8080")?
    .run()
    .await?;
    Ok(())
}

4. Run Both Processes

Terminal 1 - Web Server:

export REDIS_URL=redis://127.0.0.1:6379
cargo run

Terminal 2 - Worker Engine:

export REDIS_URL=redis://127.0.0.1:6379
cargo run --bin qrush_engine

Cron Expressions

Qrush uses 6-field cron expressions (seconds, minutes, hours, day-of-month, month, day-of-week):

  • "0 * * * * *" - Every minute
  • "0 */5 * * * *" - Every 5 minutes
  • "0 0 * * * *" - Every hour
  • "0 0 0 * * *" - Daily at midnight
  • "0 0 0 * * 1" - Every Monday at midnight
  • "0 0 0 1 * *" - First day of month at midnight
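The six fields above can be labeled positionally, which makes unfamiliar expressions easier to read. The sketch below is plain Rust with no qrush dependency; label_fields is an illustrative helper, not part of the qrush API, and qrush's own parser does the real validation:

```rust
// Split a 6-field cron expression into (field-name, value) pairs.
// Purely illustrative; rejects anything that is not exactly six fields.
const FIELDS: [&str; 6] = ["seconds", "minutes", "hours", "day-of-month", "month", "day-of-week"];

fn label_fields(expr: &str) -> Option<Vec<(String, String)>> {
    let parts: Vec<&str> = expr.split_whitespace().collect();
    if parts.len() != 6 {
        return None; // wrong field count
    }
    Some(
        FIELDS
            .iter()
            .zip(parts)
            .map(|(f, v)| (f.to_string(), v.to_string()))
            .collect(),
    )
}

fn main() {
    let labeled = label_fields("0 */5 * * * *").unwrap();
    assert_eq!(labeled[1], ("minutes".to_string(), "*/5".to_string()));
    assert!(label_fields("0 0 * * *").is_none()); // 5-field (classic cron) form is rejected
    println!("{:?}", labeled);
}
```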

Metrics Endpoints

  • GET /qrush/metrics - Dashboard overview
  • GET /qrush/metrics/queues/{queue} - Queue details
  • GET /qrush/metrics/extras/cron - Cron job management
  • GET /qrush/metrics/extras/workers - Worker status
  • GET /qrush/metrics/health - Health check
  • POST /qrush/metrics/jobs/action - Job actions (retry/delete)

Production Tips

  • Use separate process mode for production
  • Set QRUSH_BASIC_AUTH to protect metrics endpoints
  • Configure appropriate queue concurrency based on your workload
  • Monitor Redis memory usage
  • Use graceful shutdown for zero-downtime deployments
  • Scale workers horizontally by running multiple engine processes

License

This project is licensed under the MIT License - see the LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Made with ❤️ by Srotas Space