rok-cli 0.3.2

Developer CLI for rok-based Axum applications
//! `rok queue:*` — queue management CLI commands.

use console::style;

// ── queue:work ────────────────────────────────────────────────────────────────

pub fn work(queue: Option<&str>, concurrency: Option<usize>) -> anyhow::Result<()> {
    let queue_name = queue.unwrap_or("default");
    let workers = concurrency.unwrap_or(4);

    println!(
        "{} Starting worker (queue={queue_name}, concurrency={workers})",
        style("queue:work").green().bold()
    );
    println!("  Use Ctrl+C to stop.\n");
    println!(
        "  {} This command requires rok-queue integration in your application.",
        style("note:").yellow()
    );
    println!("  Add the following to your main.rs:\n");
    println!(
        "    let worker = rok_queue::Worker::new(\n\
         \t\tdbc.clone(),  // Arc<dyn QueueDriver>\n\
         \t\tregistry,     // Arc<JobRegistry>\n\
         \t\trok_queue::WorkerConfig {{\n\
         \t\t\tqueues: vec![\"{queue_name}\".into()],\n\
         \t\t\tconcurrency: {workers},\n\
         \t\t\t..Default::default()\n\
         \t\t}},\n\
         \t);\n\
         \t worker.run().await;"
    );
    Ok(())
}

// ── queue:status ──────────────────────────────────────────────────────────────

pub async fn status() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();

    let url = std::env::var("DATABASE_URL")
        .map_err(|_| anyhow::anyhow!("DATABASE_URL not set — add it to .env"))?;

    println!(
        "{} Connecting to database...",
        style("queue:status").green().bold()
    );

    let pool = sqlx::PgPool::connect(&url)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to connect: {e}"))?;

    #[derive(sqlx::FromRow)]
    struct Row {
        queue: String,
        status: String,
        count: i64,
    }

    let rows = sqlx::query_as::<_, Row>(
        "SELECT queue, status, COUNT(*) AS count
         FROM jobs
         GROUP BY queue, status
         ORDER BY queue, status",
    )
    .fetch_all(&pool)
    .await?;

    if rows.is_empty() {
        println!("  No jobs found.");
        return Ok(());
    }

    println!(
        "\n  {:<20} {:<12} {}",
        style("Queue").bold(),
        style("Status").bold(),
        style("Count").bold()
    );
    println!("  {}", "-".repeat(48));

    for row in &rows {
        let status_styled = match row.status.as_str() {
            "pending" => style(row.status.as_str()).yellow(),
            "running" => style(row.status.as_str()).cyan(),
            "done" => style(row.status.as_str()).green(),
            "failed" => style(row.status.as_str()).red(),
            _ => style(row.status.as_str()).white(),
        };
        println!("  {:<20} {:<20} {}", row.queue, status_styled, row.count);
    }

    Ok(())
}

// ── queue:retry ───────────────────────────────────────────────────────────────

pub async fn retry(job_id: i64) -> anyhow::Result<()> {
    dotenvy::dotenv().ok();

    let url = std::env::var("DATABASE_URL").map_err(|_| anyhow::anyhow!("DATABASE_URL not set"))?;

    let pool = sqlx::PgPool::connect(&url)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to connect: {e}"))?;

    let affected = sqlx::query(
        "UPDATE jobs
         SET status    = 'pending',
             failed_at = NULL,
             error     = NULL,
             scheduled_at = NOW()
         WHERE id = $1 AND status = 'failed'",
    )
    .bind(job_id)
    .execute(&pool)
    .await?
    .rows_affected();

    if affected == 0 {
        println!(
            "{} Job {} not found or not in 'failed' status.",
            style("!").yellow().bold(),
            job_id
        );
    } else {
        println!(
            "{} Job {} reset to pending.",
            style("").green().bold(),
            job_id
        );
    }

    Ok(())
}

// ── queue:flush ───────────────────────────────────────────────────────────────

pub async fn flush(queue: Option<&str>) -> anyhow::Result<()> {
    dotenvy::dotenv().ok();

    let url = std::env::var("DATABASE_URL").map_err(|_| anyhow::anyhow!("DATABASE_URL not set"))?;

    let pool = sqlx::PgPool::connect(&url)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to connect: {e}"))?;

    let result = match queue {
        Some(q) => {
            let r = sqlx::query("DELETE FROM jobs WHERE queue = $1 AND status = 'failed'")
                .bind(q)
                .execute(&pool)
                .await?;
            println!(
                "{} Flushed {} failed jobs from queue '{q}'.",
                style("").green().bold(),
                r.rows_affected()
            );
            r.rows_affected()
        }
        None => {
            let r = sqlx::query("DELETE FROM jobs WHERE status = 'failed'")
                .execute(&pool)
                .await?;
            println!(
                "{} Flushed {} failed jobs across all queues.",
                style("").green().bold(),
                r.rows_affected()
            );
            r.rows_affected()
        }
    };

    let _ = result;
    Ok(())
}