use std::path::PathBuf;
use anyhow::{Context, Result};
use clap::{Args, Subcommand};
use gradatum_core::{JobFilter, JobRecord, JobStatus, QueueStore};
use gradatum_db_sqlite::{apply_sqlite_pragmas, run_migrations, SqliteQueueStore};
use sqlx::SqlitePool;
use ulid::Ulid;
// Subcommands for inspecting and administering the job queue. Each variant
// carries its own argument struct and is dispatched by `run`.
// (Plain `//` here on purpose: a `///` on a Subcommand enum can leak into the
// parent command's help text.)
#[derive(Debug, Subcommand)]
pub enum JobsCmd {
    /// List jobs, optionally filtered by status and kind.
    List(JobsListArgs),
    /// Print a single job record as pretty-printed JSON.
    Get(JobsGetArgs),
    /// Cancel a job that is not currently running.
    Cancel(JobsCancelArgs),
    /// List Dead Letter Queue jobs and optionally replay them to Pending.
    Dlq(JobsDlqArgs),
}
// Arguments of the `list` subcommand (consumed by `jobs_list`).
// Field doc comments below become the `--help` text of each flag.
#[derive(Debug, Args)]
pub struct JobsListArgs {
    /// Gradatum data directory; the queue database is <ROOT>/db/queue.sqlite.
    #[arg(long, default_value = "/var/lib/gradatum")]
    pub root: PathBuf,
    /// Only list jobs with this status: pending, running, waiting, done, failed, dlq or cancelled.
    #[arg(long)]
    pub status: Option<String>,
    /// Only list jobs of this kind.
    #[arg(long)]
    pub kind: Option<String>,
    /// Maximum number of jobs to list (clamped to 1..=200).
    #[arg(long, default_value = "50")]
    pub limit: usize,
}
// Arguments of the `get` subcommand (consumed by `jobs_get`).
// Field doc comments below become the `--help` text of each argument.
#[derive(Debug, Args)]
pub struct JobsGetArgs {
    /// Gradatum data directory; the queue database is <ROOT>/db/queue.sqlite.
    #[arg(long, default_value = "/var/lib/gradatum")]
    pub root: PathBuf,
    /// ULID of the job to print.
    pub id: String,
}
// Arguments of the `cancel` subcommand (consumed by `jobs_cancel`).
// Field doc comments below become the `--help` text of each argument.
#[derive(Debug, Args)]
pub struct JobsCancelArgs {
    /// Gradatum data directory; the queue database is <ROOT>/db/queue.sqlite.
    #[arg(long, default_value = "/var/lib/gradatum")]
    pub root: PathBuf,
    /// ULID of the job to cancel.
    pub id: String,
}
// Arguments of the `dlq` subcommand (consumed by `jobs_dlq`).
// Field doc comments below become the `--help` text of each flag.
// `--replay` and `--replay-all` are mutually exclusive: previously, passing
// both silently ignored `--replay-all`.
#[derive(Debug, Args)]
pub struct JobsDlqArgs {
    /// Gradatum data directory; the queue database is <ROOT>/db/queue.sqlite.
    #[arg(long, default_value = "/var/lib/gradatum")]
    pub root: PathBuf,
    /// Move the DLQ job with this ULID back to Pending (the DLQ listing is skipped).
    #[arg(long, conflicts_with = "replay_all")]
    pub replay: Option<String>,
    /// After listing, move every listed DLQ job back to Pending.
    #[arg(long)]
    pub replay_all: bool,
    /// Maximum number of DLQ jobs to list, and thus to replay with --replay-all (clamped to 1..=200).
    #[arg(long, default_value = "50")]
    pub limit: usize,
}
/// Entry point of the jobs CLI: routes a parsed [`JobsCmd`] to its handler.
///
/// # Errors
///
/// Propagates whatever error the selected handler returns.
pub async fn run(cmd: JobsCmd) -> Result<()> {
    // One handler per subcommand; each takes ownership of its argument struct.
    match cmd {
        JobsCmd::List(list_args) => jobs_list(list_args).await,
        JobsCmd::Get(get_args) => jobs_get(get_args).await,
        JobsCmd::Cancel(cancel_args) => jobs_cancel(cancel_args).await,
        JobsCmd::Dlq(dlq_args) => jobs_dlq(dlq_args).await,
    }
}
/// Opens the SQLite queue database located at `<root>/db/queue.sqlite`.
///
/// After connecting, it calls `apply_sqlite_pragmas` and then `run_migrations`
/// on the pool, so the returned pool is ready to back a `SqliteQueueStore`.
///
/// # Errors
///
/// Fails if the connection, the pragmas, or the migrations fail. The
/// connection error message includes the database path.
async fn open_queue_pool(root: &std::path::Path) -> Result<SqlitePool> {
    let db_path = root.join("db").join("queue.sqlite");
    // SQLite URI `mode=rwc`: open read/write and create the database file if
    // it is missing (SQLite does not create the parent `db/` directory).
    // NOTE(review): the path is interpolated into a URL without escaping, so a
    // root containing `?`, `#` or `%` would be misparsed; building the options
    // with `SqliteConnectOptions::filename` would avoid that.
    let url = format!("sqlite://{}?mode=rwc", db_path.display());
    let describe_failure = || format!("impossible d'ouvrir la queue SQLite : {}", db_path.display());

    let pool = SqlitePool::connect(&url).await.with_context(describe_failure)?;
    apply_sqlite_pragmas(&pool).await.context("erreur pragmas WAL queue")?;
    run_migrations(&pool).await.context("erreur migrations queue")?;
    Ok(pool)
}
/// Renders a one-line summary of a job for the tabular CLI listings.
///
/// Columns, in order:
/// - the id;
/// - the status in `Debug` form, left-padded to 12;
/// - the kind in `Debug` form, truncated to 20 characters and padded to 20;
/// - the class (`Debug`);
/// - the numeric priority;
/// - the creation time as `%Y-%m-%dT%H:%M:%SZ`.
fn format_record_short(r: &JobRecord) -> String {
    let id = &r.id;
    let status = format!("{:?}", r.lifecycle.status);
    // Truncate by characters, not bytes, so multi-byte text never splits.
    let kind: String = format!("{:?}", r.spec.kind).chars().take(20).collect();
    let class = &r.spec.class;
    let prio = r.spec.priority.as_u8();
    // NOTE(review): the literal `Z` suffix assumes `created_at` is UTC — confirm.
    let created = r.lifecycle.created_at.format("%Y-%m-%dT%H:%M:%SZ");
    format!("{id} {status:<12} {kind:<20} class={class:?} prio={prio} created={created}")
}
/// Parses a user-supplied status name, case-insensitively, into a [`JobStatus`].
///
/// Accepted values are `pending`, `running`, `waiting`, `done`, `failed`,
/// `dlq` and `cancelled`. The US spelling `canceled` is also accepted.
///
/// # Errors
///
/// Returns an error listing the valid values when `s` is not recognized.
fn parse_status(s: &str) -> Result<JobStatus> {
    let lowered = s.to_lowercase();
    let status = match lowered.as_str() {
        "pending" => JobStatus::Pending,
        "running" => JobStatus::Running,
        "waiting" => JobStatus::Waiting,
        "done" => JobStatus::Done,
        "failed" => JobStatus::Failed,
        "dlq" => JobStatus::DLQ,
        "cancelled" | "canceled" => JobStatus::Cancelled,
        unknown => anyhow::bail!("statut inconnu : '{}' (valeurs valides : pending, running, waiting, done, failed, dlq, cancelled)", unknown),
    };
    Ok(status)
}
async fn jobs_list(args: JobsListArgs) -> Result<()> {
let pool = open_queue_pool(&args.root).await?;
let store = SqliteQueueStore::new(pool);
let status = args.status.as_deref().map(parse_status).transpose()?;
let limit = args.limit.clamp(1, 200);
let filter = JobFilter {
status,
kind: args.kind.clone(),
limit,
..Default::default()
};
let records = store
.list(filter)
.await
.context("erreur lors du listing des jobs")?;
if records.is_empty() {
println!("Aucun job trouvé.");
return Ok(());
}
println!(
"{} job(s) — filtres: status={:?} kind={:?} limit={}",
records.len(),
args.status,
args.kind,
limit
);
println!("{}", "─".repeat(90));
for r in &records {
println!("{}", format_record_short(r));
}
Ok(())
}
/// Handles `get`: prints one job record as pretty-printed JSON on stdout.
///
/// If no job has that id, a message goes to stderr and the process exits
/// with status 1.
///
/// # Errors
///
/// Fails if the id is not a valid ULID, the queue cannot be opened, the store
/// lookup fails, or the record cannot be serialized.
async fn jobs_get(args: JobsGetArgs) -> Result<()> {
    let id = args
        .id
        .parse::<Ulid>()
        .with_context(|| format!("ULID invalide : '{}'", args.id))?;
    let store = SqliteQueueStore::new(open_queue_pool(&args.root).await?);

    let maybe_record = store.get(id).await.context("erreur get job")?;
    if let Some(record) = maybe_record {
        let rendered = serde_json::to_string_pretty(&record)
            .context("erreur sérialisation JSON JobRecord")?;
        println!("{}", rendered);
        Ok(())
    } else {
        eprintln!("Job {} introuvable.", id);
        std::process::exit(1);
    }
}
/// Handles `cancel`: cancels one job, depending on its current status.
///
/// - Unknown id: message on stderr, process exits with status 1.
/// - `Running`: refused (reported as a 409 conflict), exit status 1.
/// - `Done`, `DLQ` or `Cancelled`: nothing to do; reported as an idempotent
///   success.
/// - Any other status: cancelled through the queue store.
///
/// # Errors
///
/// Fails if the id is not a valid ULID, the queue cannot be opened, or the
/// store `get`/`cancel` call fails.
async fn jobs_cancel(args: JobsCancelArgs) -> Result<()> {
    let id = args
        .id
        .parse::<Ulid>()
        .with_context(|| format!("ULID invalide : '{}'", args.id))?;
    let store = SqliteQueueStore::new(open_queue_pool(&args.root).await?);

    let record = if let Some(found) = store.get(id).await.context("erreur get job")? {
        found
    } else {
        eprintln!("Job {} introuvable.", id);
        std::process::exit(1);
    };

    let status = &record.lifecycle.status;
    if matches!(status, JobStatus::Running) {
        eprintln!(
            "Impossible d'annuler le job {} : statut Running (409 Conflict).",
            id
        );
        eprintln!("Attendre la fin d'exécution ou utiliser `fail_dlq` si le worker est mort.");
        std::process::exit(1);
    }
    if matches!(status, JobStatus::Done | JobStatus::DLQ | JobStatus::Cancelled) {
        println!(
            "Job {} déjà terminal (statut={:?}) — annulation idempotente.",
            id, status
        );
        return Ok(());
    }

    // NOTE(review): the status check above and `cancel` below are separate
    // store calls, so a worker could claim the job in between. Whether
    // `cancel` re-checks the status is not visible from here — confirm.
    store
        .cancel(id)
        .await
        .context("erreur lors de l'annulation du job")?;
    println!("Job {} annulé (statut=Cancelled).", id);
    Ok(())
}
/// Handles `dlq`: lists the Dead Letter Queue and optionally replays jobs.
///
/// - With `--replay <ULID>`, only that job is replayed (via `replay_single`)
///   and the function returns immediately. The listing and `--replay-all`
///   are skipped.
/// - Otherwise, up to `limit` DLQ jobs (clamped to 1..=200) are printed.
///   Each line shows the retry counters and at most the first 80 characters
///   of the job's last error.
/// - With `--replay-all`, every job printed above is then replayed, so at
///   most `limit` jobs per invocation. Per-job failures are printed to
///   stderr and counted; they do not abort the loop.
///
/// # Errors
///
/// Fails if the queue cannot be opened, `--replay` is not a valid ULID, the
/// single replay fails, or the DLQ listing fails.
async fn jobs_dlq(args: JobsDlqArgs) -> Result<()> {
    // Maximum length, in characters, of the `last_error` excerpt shown per job.
    const LAST_ERROR_MAX_CHARS: usize = 80;

    let pool = open_queue_pool(&args.root).await?;
    // The store owns one pool handle. `replay_single` needs its own handle for
    // the raw UPDATE; cloning a sqlx pool only copies a reference-counted handle.
    let store = SqliteQueueStore::new(pool.clone());
    if let Some(ref id_str) = args.replay {
        let id = id_str
            .parse::<Ulid>()
            .with_context(|| format!("ULID invalide : '{}'", id_str))?;
        return replay_single(&store, &pool, id).await;
    }
    let filter = JobFilter {
        status: Some(JobStatus::DLQ),
        limit: args.limit.clamp(1, 200),
        ..Default::default()
    };
    let dlq_jobs = store.list(filter).await.context("erreur listing DLQ")?;
    if dlq_jobs.is_empty() {
        println!("DLQ vide — aucun job en Dead Letter Queue.");
        return Ok(());
    }
    println!("{} job(s) en DLQ :", dlq_jobs.len());
    println!("{}", "─".repeat(90));
    for r in &dlq_jobs {
        let last_err = r.retry.last_error.as_deref().unwrap_or("(sans détail)");
        // Truncate on a character boundary. Slicing by byte offset
        // (`&s[..80]`) panics when byte 80 lands inside a multi-byte UTF-8
        // character, which accented error messages make likely.
        let excerpt = match last_err.char_indices().nth(LAST_ERROR_MAX_CHARS) {
            Some((cut, _)) => &last_err[..cut],
            None => last_err,
        };
        println!(
            "{} retries={}/{} last_error={}",
            format_record_short(r),
            r.retry.count,
            r.retry.max,
            excerpt,
        );
    }
    if args.replay_all {
        println!();
        println!("Replay de {} job(s) DLQ en Pending...", dlq_jobs.len());
        let mut replayed = 0usize;
        let mut errors = 0usize;
        for r in &dlq_jobs {
            // Best-effort: report each failure and keep going with the others.
            match replay_single(&store, &pool, r.id).await {
                Ok(()) => replayed += 1,
                Err(e) => {
                    eprintln!(" ERREUR replay {} : {e}", r.id);
                    errors += 1;
                }
            }
        }
        println!("Replay terminé : {} OK, {} erreur(s).", replayed, errors);
    }
    Ok(())
}
/// Moves one job from `DLQ` back to `Pending` with a raw SQL `UPDATE`.
///
/// The update makes these changes to the row:
/// - clears the lease;
/// - sets `scheduled_at` to SQLite's `datetime('now')`;
/// - resets `attempt_count` to 0 and clears `last_error`.
///
/// The `AND status = 'DLQ'` guard turns the update into a no-op for jobs in
/// any other state. `_store` is unused; the query goes through `pool`.
///
/// # Errors
///
/// Fails if the query errors, or if no row was updated (unknown id, or the
/// job is not in `DLQ`).
async fn replay_single(_store: &SqliteQueueStore, pool: &SqlitePool, id: Ulid) -> Result<()> {
    // Why the attempt counter is reset (per the SQL comment, in French):
    // without it, a replayed job would still have
    // attempt_count >= max_attempts, and promote_retries would send it
    // straight back to the DLQ on its next sweep (30s).
    // NOTE(review): `datetime('now')` yields "YYYY-MM-DD HH:MM:SS" in UTC with
    // a space separator. Confirm this matches the format the queue store
    // writes and compares for `scheduled_at`.
    // The SQL text below is kept byte-for-byte, including its unindented lines.
    const REPLAY_DLQ_SQL: &str = r#"
UPDATE gradatum_jobs
SET status = 'Pending',
lease_until = NULL,
scheduled_at = datetime('now'),
-- Réinitialise le compteur de tentatives : sans reset, le job replayed
-- aurait attempt_count >= max_attempts et serait immédiatement renvoyé
-- en DLQ par promote_retries dès le prochain sweep (30s).
attempt_count = 0,
last_error = NULL
WHERE id = ?
AND status = 'DLQ'
"#;

    let outcome = sqlx::query(REPLAY_DLQ_SQL)
        .bind(id.to_string())
        .execute(pool)
        .await
        .with_context(|| format!("erreur replay DLQ job {}", id))?;
    if outcome.rows_affected() == 0 {
        anyhow::bail!("Job {} non trouvé en DLQ (statut ≠ DLQ ou ID inconnu)", id);
    }
    println!(" Job {} remis en Pending (replay DLQ OK).", id);
    Ok(())
}