qrush 0.6.0

Lightweight Job Queue and Task Scheduler for Rust (Actix + Redis + Cron)
Documentation
// Engine runtime utilities for separate process mode

use crate::config::{QueueConfig, trigger_shutdown};
use std::time::Duration;
use tracing::{info, warn};

/// Run the qrush-engine worker process.
/// 
/// This handles:
/// - Starting worker pools
/// - Waiting for shutdown signals
/// - Graceful shutdown
/// 
/// # Arguments
/// 
/// * `redis_url` - Redis connection URL
/// * `queues` - Queue configurations
/// * `shutdown_grace_secs` - Grace period for shutdown (default: 5)
/// 
/// # Note
/// 
/// Jobs must be registered BEFORE calling this function.
/// Use `qrush::registry::register_job()` to register your jobs.
pub async fn run_engine(
    redis_url: String,
    queues: Vec<QueueConfig>,
    shutdown_grace_secs: u64,
) -> anyhow::Result<()> {
    let queues = if queues.is_empty() {
        warn!("No queues configured; defaulting to default:10");
        vec![QueueConfig::new("default", 10, 0)]
    } else {
        queues
    };

    info!(redis = %redis_url, "Starting qrush-engine");

    // Initialize worker pools
    QueueConfig::initialize(redis_url, queues).await?;

    info!("qrush-engine running. Press Ctrl+C to stop.");

    // Wait for shutdown signal
    tokio::select! {
        _ = tokio::signal::ctrl_c() => {
            info!("Received Ctrl+C");
        }
        _ = async {
            #[cfg(unix)]
            {
                use tokio::signal::unix::{signal, SignalKind};
                if let Ok(mut sigterm) = signal(SignalKind::terminate()) {
                    sigterm.recv().await;
                    return;
                }
            }
            std::future::pending::<()>().await
        } => {}
    }

    // Signal worker loops to stop
    trigger_shutdown();

    // Give workers a short grace period to exit loops
    let grace = Duration::from_secs(shutdown_grace_secs);
    info!(?grace, "Waiting for graceful shutdown");
    tokio::time::sleep(grace).await;

    info!("qrush-engine exited");
    Ok(())
}

/// Parse queue specification string into QueueConfig vector.
/// 
/// Format: "name:concurrency:priority" or "name:concurrency" or "name"
/// Multiple queues: "default:10,critical:25:0"
pub fn parse_queues(spec: &str) -> Vec<QueueConfig> {
    spec.split(',')
        .filter_map(|raw| {
            let s = raw.trim();
            if s.is_empty() {
                return None;
            }
            let parts: Vec<&str> = s.split(':').collect();
            let name = parts.get(0).unwrap_or(&"default").trim().to_string();
            let concurrency = parts
                .get(1)
                .and_then(|v| v.parse::<usize>().ok())
                .unwrap_or(5);
            let priority = parts
                .get(2)
                .and_then(|v| v.parse::<usize>().ok())
                .unwrap_or(0);

            Some(QueueConfig::new(name, concurrency, priority))
        })
        .collect()
}