rrq 0.11.1

RRQ orchestrator CLI and worker runtime.
Documentation
use anyhow::Result;

use rrq::constants::HEALTH_KEY_PREFIX;
use rrq::load_toml_settings;
use rrq::store::JobStore;

pub(crate) async fn check_workers(config: Option<String>) -> Result<()> {
    let settings = load_toml_settings(config.as_deref())?;
    let mut store = JobStore::new(settings.clone()).await?;
    let mut cursor = 0u64;
    let mut keys: Vec<String> = Vec::new();
    loop {
        let (next, batch) = store.scan_worker_health_keys(cursor, 100).await?;
        keys.extend(batch);
        if next == 0 {
            break;
        }
        cursor = next;
    }
    if keys.is_empty() {
        println!("Worker Health Check: FAIL (No active workers found)");
        return Ok(());
    }
    println!(
        "Worker Health Check: Found {} active worker(s):",
        keys.len()
    );
    for key in keys {
        let worker_id = key.trim_start_matches(HEALTH_KEY_PREFIX);
        let (health, ttl) = store.get_worker_health(worker_id).await?;
        if let Some(health) = health {
            println!("  - Worker ID: {worker_id}");
            if let Some(status) = health.get("status") {
                println!("    Status: {status}");
            }
            if let Some(active_jobs) = health.get("active_jobs") {
                println!("    Active Jobs: {active_jobs}");
            }
            if let Some(timestamp) = health.get("timestamp") {
                println!("    Last Heartbeat: {timestamp}");
            }
            println!("    TTL: {} seconds", ttl.unwrap_or(0));
        } else {
            println!("  - Worker ID: {worker_id} - Health data missing");
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::commands::test_support::RedisTestContext;
    use serde_json::Value;

    #[tokio::test]
    async fn health_check_handles_missing_and_present_workers() -> Result<()> {
        let mut ctx = RedisTestContext::new().await?;
        let config = ctx.write_config().await?;
        let config_path = Some(config.path().to_string_lossy().to_string());

        check_workers(config_path.clone()).await?;

        let mut payload = serde_json::Map::new();
        payload.insert(
            "worker_id".to_string(),
            Value::String("worker-1".to_string()),
        );
        payload.insert("status".to_string(), Value::String("running".to_string()));
        payload.insert("active_jobs".to_string(), Value::from(1));
        payload.insert("timestamp".to_string(), Value::from(1234));
        ctx.store
            .set_worker_health("worker-1", &payload, 60)
            .await?;

        check_workers(config_path).await?;
        Ok(())
    }
}