use console::style;
pub fn work(queue: Option<&str>, concurrency: Option<usize>) -> anyhow::Result<()> {
let queue_name = queue.unwrap_or("default");
let workers = concurrency.unwrap_or(4);
println!(
"{} Starting worker (queue={queue_name}, concurrency={workers})",
style("queue:work").green().bold()
);
println!(" Use Ctrl+C to stop.\n");
println!(
" {} This command requires rok-queue integration in your application.",
style("note:").yellow()
);
println!(" Add the following to your main.rs:\n");
println!(
" let worker = rok_queue::Worker::new(\n\
\t\tdbc.clone(), // Arc<dyn QueueDriver>\n\
\t\tregistry, // Arc<JobRegistry>\n\
\t\trok_queue::WorkerConfig {{\n\
\t\t\tqueues: vec![\"{queue_name}\".into()],\n\
\t\t\tconcurrency: {workers},\n\
\t\t\t..Default::default()\n\
\t\t}},\n\
\t);\n\
\t worker.run().await;"
);
Ok(())
}
pub async fn status() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let url = std::env::var("DATABASE_URL")
.map_err(|_| anyhow::anyhow!("DATABASE_URL not set — add it to .env"))?;
println!(
"{} Connecting to database...",
style("queue:status").green().bold()
);
let pool = sqlx::PgPool::connect(&url)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect: {e}"))?;
#[derive(sqlx::FromRow)]
struct Row {
queue: String,
status: String,
count: i64,
}
let rows = sqlx::query_as::<_, Row>(
"SELECT queue, status, COUNT(*) AS count
FROM jobs
GROUP BY queue, status
ORDER BY queue, status",
)
.fetch_all(&pool)
.await?;
if rows.is_empty() {
println!(" No jobs found.");
return Ok(());
}
println!(
"\n {:<20} {:<12} {}",
style("Queue").bold(),
style("Status").bold(),
style("Count").bold()
);
println!(" {}", "-".repeat(48));
for row in &rows {
let status_styled = match row.status.as_str() {
"pending" => style(row.status.as_str()).yellow(),
"running" => style(row.status.as_str()).cyan(),
"done" => style(row.status.as_str()).green(),
"failed" => style(row.status.as_str()).red(),
_ => style(row.status.as_str()).white(),
};
println!(" {:<20} {:<20} {}", row.queue, status_styled, row.count);
}
Ok(())
}
pub async fn retry(job_id: i64) -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let url = std::env::var("DATABASE_URL").map_err(|_| anyhow::anyhow!("DATABASE_URL not set"))?;
let pool = sqlx::PgPool::connect(&url)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect: {e}"))?;
let affected = sqlx::query(
"UPDATE jobs
SET status = 'pending',
failed_at = NULL,
error = NULL,
scheduled_at = NOW()
WHERE id = $1 AND status = 'failed'",
)
.bind(job_id)
.execute(&pool)
.await?
.rows_affected();
if affected == 0 {
println!(
"{} Job {} not found or not in 'failed' status.",
style("!").yellow().bold(),
job_id
);
} else {
println!(
"{} Job {} reset to pending.",
style("✓").green().bold(),
job_id
);
}
Ok(())
}
pub async fn flush(queue: Option<&str>) -> anyhow::Result<()> {
dotenvy::dotenv().ok();
let url = std::env::var("DATABASE_URL").map_err(|_| anyhow::anyhow!("DATABASE_URL not set"))?;
let pool = sqlx::PgPool::connect(&url)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect: {e}"))?;
let result = match queue {
Some(q) => {
let r = sqlx::query("DELETE FROM jobs WHERE queue = $1 AND status = 'failed'")
.bind(q)
.execute(&pool)
.await?;
println!(
"{} Flushed {} failed jobs from queue '{q}'.",
style("✓").green().bold(),
r.rows_affected()
);
r.rows_affected()
}
None => {
let r = sqlx::query("DELETE FROM jobs WHERE status = 'failed'")
.execute(&pool)
.await?;
println!(
"{} Flushed {} failed jobs across all queues.",
style("✓").green().bold(),
r.rows_affected()
);
r.rows_affected()
}
};
let _ = result;
Ok(())
}