use std::collections::HashMap;
use anyhow::Result;
use serde_json::Value;
use crate::cli_utils;
use rrq::constants::JOB_KEY_PREFIX;
use rrq::load_toml_settings;
use rrq::store::JobStore;
use rrq::{EnqueueOptions, JobStatus, RRQClient};
use super::shared::queue_matches;
/// Prints the stored data of a single job.
///
/// Settings are loaded from `config` and used to open a [`JobStore`], from which the
/// data map for `job_id` is fetched. If the store has no such job, a "not found"
/// message is printed and the command still succeeds.
///
/// With `raw` set, the map is printed as pretty-printed JSON. Otherwise a `Job <id>`
/// heading is printed followed by one `key value` line per field, ordered by key.
///
/// # Errors
///
/// Propagates any error from loading the settings, creating the store, fetching the
/// job data, or serializing the map to JSON.
pub(crate) async fn job_show(job_id: String, config: Option<String>, raw: bool) -> Result<()> {
    let settings = load_toml_settings(config.as_deref())?;
    let mut store = JobStore::new(settings).await?;

    let job_map = match store.get_job_data_map(&job_id).await? {
        Some(found) => found,
        None => {
            println!("Job '{job_id}' not found");
            return Ok(());
        }
    };

    if raw {
        println!("{}", serde_json::to_string_pretty(&job_map)?);
        return Ok(());
    }

    // Map iteration order is arbitrary, so order the rows by field name before printing.
    let mut rows: Vec<_> = job_map.iter().collect();
    rows.sort_by(|left, right| left.0.cmp(right.0));

    println!("Job {job_id}");
    for (name, content) in rows {
        println!("{:<24} {}", name, content);
    }
    Ok(())
}
/// Prints a table of jobs, newest first, optionally filtered.
///
/// All job keys are collected from the store by scanning until the returned cursor is
/// `0`, then each job's data map is loaded and tested against the filters:
///
/// * `status` — compared case-insensitively (ASCII) with the job's `status` field.
/// * `queue` — matched against the job's `queue_name` field via `queue_matches`
///   (a missing field is treated as an empty name).
/// * `function` — must equal the job's `function_name` field exactly.
///
/// Surviving jobs are sorted by `enqueue_time` in descending order (jobs whose
/// timestamp is missing or unparseable sort as `0.0`, i.e. last) and at most `limit`
/// rows are printed. If the store holds no job keys at all, "No jobs found" is printed.
///
/// The `Duration` column shows `completion_time - start_time` when both are present,
/// the elapsed time since `start_time` for jobs whose status is `active`, and `N/A`
/// otherwise — including when a required timestamp cannot be parsed.
///
/// # Errors
///
/// Propagates any error from loading the settings, creating the store, scanning keys,
/// or loading a job's data.
pub(crate) async fn job_list(
    config: Option<String>,
    status: Option<String>,
    queue: Option<String>,
    function: Option<String>,
    limit: usize,
) -> Result<()> {
    let settings = load_toml_settings(config.as_deref())?;
    let mut store = JobStore::new(settings).await?;

    // Page through the key space; a returned cursor of 0 ends the scan.
    let mut job_keys = Vec::new();
    let mut cursor = 0u64;
    loop {
        let (next, keys) = store.scan_job_keys(cursor, 200).await?;
        job_keys.extend(keys);
        if next == 0 {
            break;
        }
        cursor = next;
    }
    if job_keys.is_empty() {
        println!("No jobs found");
        return Ok(());
    }

    // Each entry carries its parsed enqueue timestamp so the sort below does not have
    // to re-parse the field on every comparison.
    let mut jobs: Vec<(f64, String, HashMap<String, String>)> = Vec::new();
    for key in job_keys {
        let Some(job_map) = store.get_job_data_map_by_key(&key).await? else {
            continue;
        };
        if let Some(filter) = status.as_deref()
            && job_map
                .get("status")
                .map(|value| value.eq_ignore_ascii_case(filter))
                != Some(true)
        {
            continue;
        }
        if let Some(filter) = queue.as_deref() {
            let job_queue = job_map.get("queue_name").cloned().unwrap_or_default();
            if !queue_matches(filter, &job_queue) {
                continue;
            }
        }
        if let Some(filter) = function.as_deref()
            && job_map.get("function_name").map(|value| value.as_str()) != Some(filter)
        {
            continue;
        }

        // `strip_prefix` removes the key prefix exactly once. `trim_start_matches`
        // would keep stripping and mangle an ID that itself begins with the prefix.
        let job_id = key.strip_prefix(JOB_KEY_PREFIX).unwrap_or(&key).to_string();
        let enqueued_at = job_map
            .get("enqueue_time")
            .and_then(|value| cli_utils::parse_timestamp(value))
            .unwrap_or(0.0);
        jobs.push((enqueued_at, job_id, job_map));
    }

    // Newest first. `total_cmp` is a total order, so the comparator stays consistent
    // even if a timestamp were ever NaN; the sort is stable, so ties keep scan order.
    jobs.sort_by(|a, b| b.0.total_cmp(&a.0));
    jobs.truncate(limit);

    println!(
        "{:<36} {:<30} {:<24} {:<10} {:<18} {:>10}",
        "Job ID", "Function", "Queue", "Status", "Enqueued", "Duration"
    );
    for (_, job_id, job_map) in jobs {
        let function_name = job_map
            .get("function_name")
            .cloned()
            .unwrap_or_else(|| "unknown".to_string());
        let queue_name = job_map
            .get("queue_name")
            .cloned()
            .unwrap_or_else(|| "unknown".to_string());
        let status_label =
            cli_utils::format_status(job_map.get("status").map(|value| value.as_str()));
        let enqueue =
            cli_utils::format_timestamp(job_map.get("enqueue_time").map(|value| value.as_str()));

        // A timestamp that is present but unparseable yields "N/A" rather than being
        // treated as the epoch, which would print a meaningless duration.
        let duration = match (
            job_map.get("start_time"),
            job_map.get("completion_time"),
            job_map.get("status"),
        ) {
            (Some(start), Some(end), _) => {
                match (
                    cli_utils::parse_timestamp(start),
                    cli_utils::parse_timestamp(end),
                ) {
                    (Some(start_ts), Some(end_ts)) => {
                        cli_utils::format_duration(Some(end_ts - start_ts))
                    }
                    _ => "N/A".to_string(),
                }
            }
            (Some(start), None, Some(status)) if status.eq_ignore_ascii_case("active") => {
                match cli_utils::parse_timestamp(start) {
                    Some(start_ts) => {
                        let now = chrono::Utc::now().timestamp() as f64;
                        cli_utils::format_duration(Some(now - start_ts))
                    }
                    None => "N/A".to_string(),
                }
            }
            _ => "N/A".to_string(),
        };

        println!(
            "{:<36} {:<30} {:<24} {:<10} {:<18} {:>10}",
            job_id,
            cli_utils::truncate(&function_name, 28),
            cli_utils::truncate(&queue_name, 22),
            status_label,
            enqueue,
            duration
        );
    }
    Ok(())
}
/// Re-enqueues an existing job as a new job with the same function and parameters.
///
/// The stored data map for `job_id` is loaded; if it does not exist a "not found"
/// message is printed and the command succeeds without enqueuing anything.
///
/// The target queue is chosen in this order: the explicit `queue` argument, the
/// original job's `queue_name` field, then the settings' `default_queue_name`.
/// Parameters come from the `job_params` field; if that field is missing, fails to
/// parse, or is not a JSON object, the job is replayed with an empty parameter map.
/// The ID of the newly enqueued job is printed on success.
///
/// # Errors
///
/// Returns an error if the stored job has no `function_name` field, and propagates
/// any error from loading the settings, creating the store, fetching the job, or
/// enqueuing the replacement.
pub(crate) async fn job_replay(
    job_id: String,
    config: Option<String>,
    queue: Option<String>,
) -> Result<()> {
    let settings = load_toml_settings(config.as_deref())?;
    // `settings` is still needed below, so the store gets its own copy.
    let mut store = JobStore::new(settings.clone()).await?;
    let Some(job_map) = store.get_job_data_map(&job_id).await? else {
        println!("Job '{job_id}' not found");
        return Ok(());
    };

    let function_name = job_map
        .get("function_name")
        .ok_or_else(|| anyhow::anyhow!("job missing function_name"))?;

    // Anything other than a JSON object (including a parse failure) falls back to an
    // empty parameter map.
    let params_map =
        match cli_utils::parse_json(job_map.get("job_params").map(|v| v.as_str()))
            .unwrap_or_default()
        {
            Value::Object(map) => map,
            _ => serde_json::Map::new(),
        };

    let queue_name = queue
        .or_else(|| job_map.get("queue_name").cloned())
        .unwrap_or_else(|| settings.default_queue_name.clone());

    // Neither `settings` nor `store` is used after this point, so both are moved
    // into the client instead of being cloned again.
    let mut client = RRQClient::new(settings, store);
    let job = client
        .enqueue(
            function_name,
            params_map,
            EnqueueOptions {
                queue_name: Some(queue_name),
                ..Default::default()
            },
        )
        .await?;
    println!("Job replayed with new ID: {}", job.id);
    Ok(())
}
/// Cancels a job that is still waiting in its queue.
///
/// The command prints a message and returns `Ok(())` without changing anything when:
/// the job does not exist, its `status` field is not `pending` (ASCII
/// case-insensitive), or it has no (or an empty) `queue_name` field.
///
/// Otherwise the job is removed from its queue. Only if the store reports that at
/// least one entry was removed is the job's status updated to
/// [`JobStatus::Cancelled`]; if nothing was removed a failure message is printed.
///
/// # Errors
///
/// Propagates any error from loading the settings, creating the store, fetching the
/// job, removing it from the queue, or updating its status.
pub(crate) async fn job_cancel(job_id: String, config: Option<String>) -> Result<()> {
    let settings = load_toml_settings(config.as_deref())?;
    let mut store = JobStore::new(settings).await?;

    let job_map = match store.get_job_data_map(&job_id).await? {
        Some(found) => found,
        None => {
            println!("Job '{job_id}' not found");
            return Ok(());
        }
    };

    // A missing status is treated as the empty string, which is never "pending".
    let status = job_map
        .get("status")
        .map(|value| value.as_str())
        .unwrap_or("");
    if !status.eq_ignore_ascii_case("pending") {
        println!("Can only cancel pending jobs. Job status: {status}");
        return Ok(());
    }

    let queue_name = match job_map.get("queue_name") {
        Some(name) if !name.is_empty() => name.clone(),
        _ => {
            println!("Job has no associated queue");
            return Ok(());
        }
    };

    // Only mark the job cancelled once it has actually left the queue.
    let removed = store.remove_job_from_queue(&queue_name, &job_id).await?;
    if removed > 0 {
        store
            .update_job_status(&job_id, JobStatus::Cancelled)
            .await?;
        println!("Job '{job_id}' cancelled successfully");
    } else {
        println!("Failed to remove job from queue");
    }
    Ok(())
}
/// Prints a chronological timeline of a job's lifecycle.
///
/// Events are rebuilt from the job's stored fields:
///
/// * `Enqueued` from `enqueue_time` (details: `function_name`),
/// * `Started` from `start_time` (details: `worker_id`),
/// * `Retry N` from each `retry_{i}_at` field for `i` in `0..current_retries`
///   (details: attempt number and `max_retries`),
/// * `Finished` from `completion_time` (details: `status`).
///
/// A field that is missing or whose timestamp does not parse contributes no event.
/// Events are sorted by timestamp; every row after the first also shows the gap to
/// the previous event, and the span from first to last event is printed at the end.
/// If the job does not exist or yields no events, a message is printed instead.
///
/// # Errors
///
/// Propagates any error from loading the settings, creating the store, or fetching
/// the job data.
pub(crate) async fn job_trace(job_id: String, config: Option<String>) -> Result<()> {
    let settings = load_toml_settings(config.as_deref())?;
    let mut store = JobStore::new(settings).await?;
    let job_map = match store.get_job_data_map(&job_id).await? {
        Some(found) => found,
        None => {
            println!("Job '{job_id}' not found");
            return Ok(());
        }
    };

    // Field accessors shared by every event below.
    let timestamp_of = |field: &str| {
        job_map
            .get(field)
            .and_then(|raw| cli_utils::parse_timestamp(raw))
    };
    let text_of = |field: &str, fallback: &str| {
        job_map
            .get(field)
            .cloned()
            .unwrap_or_else(|| fallback.to_string())
    };

    let mut timeline: Vec<(String, f64, String)> = Vec::new();

    if let Some(enqueued_at) = timestamp_of("enqueue_time") {
        let function_name = text_of("function_name", "unknown");
        timeline.push((
            "Enqueued".to_string(),
            enqueued_at,
            format!("Function: {function_name}"),
        ));
    }

    if let Some(started_at) = timestamp_of("start_time") {
        let worker_id = text_of("worker_id", "unknown");
        timeline.push((
            "Started".to_string(),
            started_at,
            format!("Worker: {worker_id}"),
        ));
    }

    // Retry timestamps are stored under zero-based keys but reported one-based.
    let retry_count = job_map
        .get("current_retries")
        .and_then(|raw| raw.parse::<i64>().ok())
        .unwrap_or(0);
    for index in 0..retry_count {
        let Some(retried_at) = timestamp_of(&format!("retry_{index}_at")) else {
            continue;
        };
        let attempt = index + 1;
        let max_retries = text_of("max_retries", "0");
        timeline.push((
            format!("Retry {attempt}"),
            retried_at,
            format!("Attempt {attempt} of {max_retries}"),
        ));
    }

    if let Some(finished_at) = timestamp_of("completion_time") {
        let status = text_of("status", "unknown");
        timeline.push((
            "Finished".to_string(),
            finished_at,
            format!("Status: {status}"),
        ));
    }

    if timeline.is_empty() {
        println!("No trace data found for job {job_id}");
        return Ok(());
    }
    timeline.sort_by(|left, right| {
        left.1
            .partial_cmp(&right.1)
            .unwrap_or(std::cmp::Ordering::Equal)
    });

    println!("Job trace for {job_id}");
    println!("{:<12} {:<20} Details", "Event", "Timestamp");
    let mut earlier: Option<f64> = None;
    for (label, at, details) in &timeline {
        let rendered = cli_utils::to_utc_rfc3339(*at);
        // The first row has no predecessor and therefore no gap suffix.
        let gap = match earlier {
            Some(before) => format!(" (+{})", cli_utils::format_duration(Some(*at - before))),
            None => String::new(),
        };
        println!("{:<12} {:<20} {}{}", label, rendered, details, gap);
        earlier = Some(*at);
    }

    if let (Some(first), Some(last)) = (timeline.first(), timeline.last()) {
        let span = last.1 - first.1;
        println!("\nTotal Duration: {}", cli_utils::format_duration(Some(span)));
    }
    Ok(())
}
#[cfg(test)]
// Integration test for the job commands in this file. It drives them against the
// store and settings supplied by `RedisTestContext`.
mod tests {
use super::*;
use crate::commands::test_support::RedisTestContext;
use chrono::{Duration, Utc};
use rrq::Job;
use serde_json::json;
use std::collections::HashMap;
// Builds a `Job` fixture for the trace command: status `Completed`, function name
// `trace_job`, one recorded retry (`current_retries: 1`), a start time 10 seconds
// before now and a completion time of now, assigned to the given queue and DLQ names.
fn build_job(queue_name: &str, dlq_name: &str) -> Job {
Job {
id: Job::new_id(),
function_name: "trace_job".to_string(),
job_params: serde_json::Map::from_iter([
("arg".to_string(), json!("arg")),
("flag".to_string(), json!(true)),
]),
enqueue_time: Utc::now(),
start_time: Some(Utc::now() - Duration::seconds(10)),
status: JobStatus::Completed,
current_retries: 1,
next_scheduled_run_time: None,
max_retries: 3,
job_timeout_seconds: Some(30),
result_ttl_seconds: Some(60),
job_unique_key: None,
completion_time: Some(Utc::now()),
result: None,
last_error: None,
queue_name: Some(queue_name.to_string()),
dlq_name: Some(dlq_name.to_string()),
worker_id: Some("worker-1".to_string()),
trace_context: None,
correlation_context: None,
}
}
// Runs every job command in sequence against one shared store. The steps depend on
// the state left by earlier steps, so their order matters.
#[tokio::test]
async fn job_commands_cover_branches() -> Result<()> {
let mut ctx = RedisTestContext::new().await?;
let config = ctx.write_config().await?;
let config_path = Some(config.path().to_string_lossy().to_string());
// job_show on an unknown ID takes the "not found" path and must still return Ok.
job_show("missing".to_string(), config_path.clone(), false).await?;
let mut client = RRQClient::new(ctx.settings.clone(), ctx.store.clone());
let job = client
.enqueue(
"do_work",
serde_json::Map::from_iter([("n".to_string(), json!(1))]),
EnqueueOptions::default(),
)
.await?;
// job_show on an existing job, using the raw JSON output path.
job_show(job.id.clone(), config_path.clone(), true).await?;
// job_list with all three filters (status, queue, function) supplied.
job_list(
config_path.clone(),
Some("pending".to_string()),
Some(ctx.settings.default_queue_name.clone()),
Some("do_work".to_string()),
10,
)
.await?;
// Snapshot the job keys before replaying. A returned cursor of 0 means this single
// scan call covered every key, so the length comparison below is meaningful.
let (cursor, before_keys) = ctx.store.scan_job_keys(0, 200).await?;
assert_eq!(cursor, 0);
// Replay without a queue override: a new job key must appear.
job_replay(job.id.clone(), config_path.clone(), None).await?;
let (_, after_keys) = ctx.store.scan_job_keys(0, 200).await?;
assert!(after_keys.len() > before_keys.len());
// Replay with an explicit, unprefixed queue name. The assertion below expects the
// job to be found under the prefixed key "rrq:queue:manual-bare".
job_replay(
job.id.clone(),
config_path.clone(),
Some("manual-bare".to_string()),
)
.await?;
let manual_queue_size = ctx.store.queue_size("rrq:queue:manual-bare").await?;
assert!(manual_queue_size >= 1);
// A freshly enqueued job is cancelled and its stored status is checked afterwards.
let cancel_job = client
.enqueue(
"cancel_me",
serde_json::Map::new(),
EnqueueOptions::default(),
)
.await?;
job_cancel(cancel_job.id.clone(), config_path.clone()).await?;
let updated = ctx.store.get_job_definition(&cancel_job.id).await?.unwrap();
assert_eq!(updated.status, JobStatus::Cancelled);
// Store the trace fixture, then add the `retry_0_at` field. job_trace reads
// `retry_{i}_at` for i in 0..current_retries, and the fixture has current_retries = 1.
let trace_job = build_job(
&ctx.settings.default_queue_name,
&ctx.settings.default_dlq_name,
);
ctx.store.save_job_definition(&trace_job).await?;
let mut fields = HashMap::new();
fields.insert(
"retry_0_at".to_string(),
(Utc::now() - Duration::seconds(5)).to_rfc3339(),
);
ctx.store.update_job_fields(&trace_job.id, &fields).await?;
job_trace(trace_job.id, config_path.clone()).await?;
Ok(())
}
}