use crate::error::{FlecheError, Result};
use crate::local;
use crate::output::OutputFormat;
use crate::registry::{
ArchivedFilter, JobRecord, JobStatus, LiveStatus, Registry, build_job_filter_pattern,
};
use crate::runtime::RuntimeCtx;
use crate::slurm::{get_job_resource_usage, get_job_status};
use console::style;
use regex::Regex;
use serde::Serialize;
use std::path::PathBuf;
use super::display::{print_indexed_job_table, print_job_details, print_job_table};
use super::get_remote_direct_job_status;
use super::job_path_from_workspace;
/// Number of jobs listed when neither `StatusOptions::last` nor
/// `StatusOptions::default_limit` supplies a limit.
const DEFAULT_LIST_LIMIT: usize = 20;
/// Options that shape the job listing printed by `show_status` when no
/// specific job id is requested.
#[derive(Default)]
pub struct StatusOptions<'a> {
/// Status filter strings; each one is parsed into a `JobStatus` before listing.
pub filters: &'a [String],
/// Optional job-name filter. For the indexed listing it is turned into a regex
/// via `build_job_filter_pattern` and matched against the job id; otherwise it
/// is forwarded to `Registry::list_jobs`.
pub name: Option<&'a str>,
/// `(key, value)` tag pairs. In the indexed listing a job must carry every
/// pair with an equal value to be shown.
pub tags: &'a [(String, String)],
/// Explicit maximum number of jobs to list; takes precedence over `default_limit`.
pub last: Option<usize>,
/// Limit used when `last` is `None`; when this is also `None`,
/// `DEFAULT_LIST_LIMIT` applies.
pub default_limit: Option<usize>,
/// Archive filter. `ExcludeArchived` selects the indexed table; any other
/// value is forwarded to `Registry::list_jobs` and uses the plain table.
pub archived: ArchivedFilter,
/// Passed negated as the last argument of the table printers.
/// NOTE(review): presumably hides the more verbose columns — confirm in `display`.
pub compact: bool,
/// Output format; tables are only printed when `format.is_human()` is true.
pub format: OutputFormat,
}
/// Fetches the current status of `job` from wherever it runs.
///
/// Jobs whose `remote_host` is the literal `"local"` are resolved through the
/// `local` module using the job's project path. Every other host is queried
/// through the handle returned by `ctx.ssh`: jobs with a `slurm_id` go through
/// `get_job_status`, jobs without one through `get_remote_direct_job_status`
/// on the job directory derived from the remote workspace path.
async fn query_live_status(job: &JobRecord, ctx: &RuntimeCtx) -> Result<LiveStatus> {
    let runs_locally = job.remote_host == "local";
    if runs_locally {
        let local_project = PathBuf::from(&job.project_path);
        return local::get_local_job_status(&local_project, &job.id);
    }

    let ssh = ctx.ssh(&job.remote_host);
    match &job.slurm_id {
        // Scheduler-managed job: ask Slurm by id.
        Some(slurm_id) => get_job_status(&ssh, slurm_id).await,
        // No Slurm id recorded: inspect the job directory on the remote host.
        None => {
            let remote_job_dir = job_path_from_workspace(&job.remote_path, &job.id);
            get_remote_direct_job_status(&ssh, &remote_job_dir).await
        }
    }
}
/// Entry point for the status command.
///
/// With `job_id` set, prints the detail view for that single job. Otherwise
/// the stored status of every active job is refreshed first and the job
/// listing selected by `opts` is printed afterwards.
///
/// # Errors
///
/// Returns an error if the registry cannot be opened, or if the detail view,
/// the status refresh, or the listing fails.
pub async fn show_status(
    job_id: Option<&str>,
    opts: StatusOptions<'_>,
    ctx: RuntimeCtx,
) -> Result<()> {
    let registry = Registry::open()?;
    match job_id {
        Some(id) => {
            show_job_detail(&registry, id, &ctx, opts.format).await?;
        }
        None => {
            // Bring stored statuses up to date before rendering the table.
            refresh_active_job_statuses(&registry, &ctx).await?;
            list_recent_jobs(&registry, &opts)?;
        }
    }
    Ok(())
}
/// Prints the detail view for the job identified by `id`.
///
/// The live status is queried first. On success it is persisted with
/// `Registry::update_status` and copied onto the in-memory record; if the
/// query fails, the record is shown as stored. Resource usage is only
/// requested for jobs that have a Slurm id, do not run on `"local"`, and are
/// `Completed`, `Failed` or `Cancelled`; a failed usage lookup is treated as
/// "no usage available".
///
/// # Errors
///
/// Returns an error if the job cannot be loaded, if persisting the refreshed
/// status fails, or if `format.print` fails.
async fn show_job_detail(
    registry: &Registry,
    id: &str,
    ctx: &RuntimeCtx,
    format: OutputFormat,
) -> Result<()> {
    let mut job = registry.get_job(id)?;

    match query_live_status(&job, ctx).await {
        Ok(live) => {
            registry.update_status(&job.id, &live)?;
            job.status = live.status;
            job.exit_code = live.exit_code;
            job.slurm_state = live.slurm_state;
            job.sacct_exit_code = live.sacct_exit_code;
        }
        // Best effort: fall back to whatever the registry already has.
        Err(_) => {}
    }

    let on_remote_host = job.remote_host != "local";
    let finished = matches!(
        job.status,
        JobStatus::Completed | JobStatus::Failed | JobStatus::Cancelled
    );
    let usage = match &job.slurm_id {
        Some(slurm_id) if on_remote_host && finished => {
            let ssh = ctx.ssh(&job.remote_host);
            get_job_resource_usage(&ssh, slurm_id).await.ok()
        }
        _ => None,
    };

    format.print(&job, || {
        print_job_details(&job, usage.as_ref());
        Ok(())
    })
}
/// Prints the job listing described by `opts`.
///
/// The limit is `opts.last`, else `opts.default_limit`, else
/// `DEFAULT_LIST_LIMIT`. When archived jobs are excluded the indexed table is
/// used (rows come from `list_indexed_jobs`); for any other archive filter the
/// rows come straight from `Registry::list_jobs` and the plain table is used.
/// Tables are only printed for human output with at least one row; the
/// closure handed to `format.print` prints a hint when there are no jobs.
///
/// # Errors
///
/// Returns an error if a status filter string does not parse, if the registry
/// query fails, or if `format.print` fails.
fn list_recent_jobs(registry: &Registry, opts: &StatusOptions<'_>) -> Result<()> {
    let mut status_filters: Vec<JobStatus> = Vec::with_capacity(opts.filters.len());
    for raw in opts.filters {
        status_filters.push(raw.parse::<JobStatus>()?);
    }

    let limit = opts
        .last
        .or(opts.default_limit)
        .unwrap_or(DEFAULT_LIST_LIMIT);

    let indexed_view = opts.archived == ArchivedFilter::ExcludeArchived;
    let jobs = if indexed_view {
        let (indices, rows) = list_indexed_jobs(registry, opts, &status_filters, limit)?;
        let show_table = opts.format.is_human() && !rows.is_empty();
        if show_table {
            print_indexed_job_table(&rows, &indices, !opts.compact);
        }
        rows
    } else {
        let rows = registry.list_jobs(
            None,
            &status_filters,
            opts.name,
            None,
            opts.tags,
            opts.archived,
            limit,
        )?;
        let show_table = opts.format.is_human() && !rows.is_empty();
        if show_table {
            print_job_table(&rows, !opts.compact);
        }
        rows
    };

    opts.format.print(&jobs, || {
        if jobs.is_empty() {
            println!("No jobs found. Run `fleche run` to submit a job.");
        }
        Ok(())
    })
}
/// Collects up to `limit` jobs matching the filters in `opts`, together with
/// each job's 1-based position in the unfiltered `Registry::list_all_jobs`
/// result.
///
/// When any filter (status, name, tags) is active, a larger window of
/// `max(limit * 10, 1000)` jobs is fetched so that filtering still has enough
/// candidates; without filters exactly `limit` jobs are fetched.
///
/// A job is kept when its status is in `status_filters` (or that list is
/// empty), its id matches the `--name` regex (if any), and every requested
/// tag pair is present with an equal value.
///
/// # Errors
///
/// Returns an error if the registry query fails, or
/// `FlecheError::InvalidRegexPattern` if the name pattern does not compile.
fn list_indexed_jobs(
    registry: &Registry,
    opts: &StatusOptions<'_>,
    status_filters: &[JobStatus],
    limit: usize,
) -> Result<(Vec<usize>, Vec<JobRecord>)> {
    let unfiltered = status_filters.is_empty() && opts.name.is_none() && opts.tags.is_empty();
    let fetch_limit = if unfiltered {
        limit
    } else {
        limit.saturating_mul(10).max(1000)
    };
    let candidates = registry.list_all_jobs(fetch_limit)?;

    // Compile the name pattern once, up front, so a bad pattern fails early.
    let name_re = match opts.name {
        Some(p) => {
            let pattern = build_job_filter_pattern(p);
            let compiled = Regex::new(&pattern)
                .map_err(|e| FlecheError::InvalidRegexPattern(format!("--name '{p}': {e}")))?;
            Some(compiled)
        }
        None => None,
    };

    let mut indices: Vec<usize> = Vec::new();
    let mut selected: Vec<JobRecord> = Vec::new();
    for (position, job) in candidates.into_iter().enumerate() {
        if selected.len() >= limit {
            break;
        }
        if !status_filters.is_empty() && !status_filters.contains(&job.status) {
            continue;
        }
        if let Some(re) = &name_re {
            if !re.is_match(&job.id) {
                continue;
            }
        }
        let has_all_tags = opts
            .tags
            .iter()
            .all(|(k, v)| job.tags.get(k).is_some_and(|tv| tv == v));
        if !has_all_tags {
            continue;
        }
        // Indices shown to the user are 1-based positions in the unfiltered list.
        indices.push(position + 1);
        selected.push(job);
    }
    Ok((indices, selected))
}
/// Sets or shows the note attached to a job.
///
/// When `note` is `Some`, the text is stored on the job and a confirmation is
/// printed. When `note` is `None`, the job's current note is printed, or a
/// message saying that no note is set.
///
/// # Errors
///
/// Returns an error if the registry cannot be opened, the job cannot be
/// found, or the note cannot be stored.
pub fn note_job(job_id: &str, note: Option<&str>) -> Result<()> {
    let registry = Registry::open()?;
    let job = registry.get_job(job_id)?;

    match (note, &job.note) {
        (Some(new_text), _) => {
            registry.set_note(&job.id, Some(new_text))?;
            println!(
                "{} Note set for job {}",
                style("✓").green(),
                style(&job.id).bold()
            );
        }
        (None, Some(existing)) => {
            println!("{} {}", style("Note:").bold(), existing);
        }
        (None, None) => {
            println!("No note set for job {}.", job.id);
        }
    }
    Ok(())
}
/// One `(key, value)` tag pair in the serializable form handed to
/// `OutputFormat::print` by `list_tags`.
#[derive(Serialize)]
struct TagEntry {
/// Tag key.
key: String,
/// Tag value recorded under `key`.
value: String,
}
/// Converts `(key, value)` tuples into `TagEntry` values, preserving order.
fn to_tag_entries(tags: Vec<(String, String)>) -> Vec<TagEntry> {
    let mut entries = Vec::with_capacity(tags.len());
    for (key, value) in tags {
        entries.push(TagEntry { key, value });
    }
    entries
}
pub fn list_tags(format: OutputFormat) -> Result<()> {
let registry = Registry::open()?;
let tags = registry.list_unique_tags()?;
let entries = to_tag_entries(tags);
format.print(&entries, || {
if entries.is_empty() {
println!("No tags found. Use --tag when running jobs to add tags.");
return Ok(());
}
let mut current_key = "";
for entry in &entries {
if entry.key != current_key {
if !current_key.is_empty() {
println!();
}
println!("{}", style(&entry.key).bold());
current_key = &entry.key;
}
println!(" {}", entry.value);
}
Ok(())
})
}
/// Re-queries the live status of every active job and stores any change.
///
/// Jobs whose live status cannot be obtained are skipped, leaving their
/// stored status untouched. The registry is only written when the live status
/// differs from the stored one.
///
/// # Errors
///
/// Returns an error if the active jobs cannot be listed or if a status update
/// cannot be written.
pub async fn refresh_active_job_statuses(registry: &Registry, ctx: &RuntimeCtx) -> Result<()> {
    let active_jobs = registry.list_active_jobs()?;
    for job in active_jobs {
        // Best effort: an unreachable host must not abort the whole refresh.
        let Ok(live) = query_live_status(&job, ctx).await else {
            continue;
        };
        if live.status != job.status {
            registry.update_status(&job.id, &live)?;
        }
    }
    Ok(())
}
/// Resolves the job a command should act on.
///
/// With an explicit `job_id` the job is looked up directly. Without one, the
/// first non-archived job returned by `Registry::list_jobs` for the given
/// `tags` and `note_filter` (limit 1) is used.
///
/// # Errors
///
/// Returns the registry's error if a lookup fails, or
/// `FlecheError::NoRecentJob` when no job matches.
pub fn resolve_job(
    registry: &Registry,
    job_id: Option<&str>,
    tags: &[(String, String)],
    note_filter: Option<&str>,
) -> Result<JobRecord> {
    if let Some(id) = job_id {
        return registry.get_job(id);
    }

    let candidates = registry.list_jobs(
        None,
        &[],
        None,
        note_filter,
        tags,
        ArchivedFilter::ExcludeArchived,
        1,
    )?;
    match candidates.into_iter().next() {
        Some(job) => Ok(job),
        None => Err(FlecheError::NoRecentJob),
    }
}