use crate::config::{Config, ResolvedJob, SlurmConfig, reject_empty_path_entries};
use crate::error::{FlecheError, Result};
use crate::local;
use crate::ntfy;
use crate::registry::{JobStatus, LiveStatus, Registry};
use crate::runtime::{RuntimeCtx, send_notification};
use crate::slurm::{generate_sbatch_script, get_job_status, submit_job};
use crate::ssh::{SshClient, shell_escape};
use crate::sync::{
list_input_sync_files, list_project_sync_files, sync_inputs_to_workspace,
sync_project_to_workspace,
};
use chrono::Utc;
use console::style;
use rand::Rng;
use std::io::Write;
use std::time::Duration;
use super::{job_path, workspace_path};
/// Checks that a POSIX `sh` can be spawned, because local execution runs commands through it.
///
/// The probe is deliberately not gated on `cfg(unix)`: the installation hint in the error is
/// aimed at Windows users, who are the ones most likely to lack `sh`, so the check has to run
/// there for the hint to ever be shown.
///
/// # Errors
///
/// Returns [`FlecheError::MissingDependency`] if `sh -c true` cannot be started.
fn require_shell() -> Result<()> {
    let sh_available = std::process::Command::new("sh")
        .arg("-c")
        .arg("true")
        .output()
        .is_ok();
    if sh_available {
        return Ok(());
    }
    Err(FlecheError::MissingDependency(
        "sh not found. Local execution requires a Unix shell.\n \
         Windows: Install Git Bash, WSL, or Cygwin"
            .to_string(),
    ))
}
/// Flags that control how [`run_job`] runs and supervises a job.
#[derive(Debug, Default)]
pub struct RunJobOptions {
    /// Do not stream the job's output. When notifications or an ntfy topic are requested,
    /// the call still waits for completion so it can notify.
    pub background: bool,
    /// Request a terminal notification (combined with the runtime default through
    /// `RuntimeCtx::should_notify`).
    pub notify: bool,
    /// ntfy topic that job state changes are published to, if any.
    pub ntfy_topic: Option<String>,
    /// Print what would be run (script or command, plus files to sync) without running it.
    pub dry_run: bool,
    /// Id of a job this one depends on. Slurm jobs get a Slurm dependency on its Slurm id;
    /// local jobs require it to have already completed.
    pub after: Option<String>,
    /// Number of additional attempts after a failed run (only used when not in background).
    pub retry: Option<u32>,
    /// Free-form note stored in the registry with the first attempt and sent with ntfy
    /// notifications.
    pub note: Option<String>,
    /// Run the command directly over SSH instead of submitting an sbatch script.
    pub exec: bool,
}
/// Runs a configured job (or an ad-hoc command) on its target host.
///
/// `job_or_command` is treated as a job name when it is a key of `config.jobs`, otherwise as
/// a literal command. Host `local` is delegated to local execution and `exec` jobs to direct
/// SSH execution. Every other job is submitted to Slurm as an sbatch script. In the
/// foreground its logs are streamed, and failed attempts are retried with exponential backoff
/// when `opts.retry` is set.
///
/// # Errors
///
/// Propagates configuration, registry, SSH and submission errors. Fails with
/// [`FlecheError::NoSlurmId`] when the `--after` dependency has no Slurm id.
pub async fn run_job(
    config: &Config,
    job_or_command: Option<&str>,
    command_override: Option<&str>,
    env_overrides: &[(String, String)],
    tags: &[(String, String)],
    slurm_overrides: SlurmConfig,
    host_override: Option<&str>,
    opts: RunJobOptions,
    ctx: RuntimeCtx,
) -> Result<()> {
    // A name matching a configured job selects that job; anything else is an ad-hoc command.
    let (job_name, actual_command) = match job_or_command {
        Some(joc) if config.jobs.contains_key(joc) => (Some(joc), command_override),
        Some(joc) => (None, Some(joc)),
        None => (None, command_override),
    };
    let mut job = config.resolve_job(job_name, actual_command, env_overrides, &slurm_overrides)?;
    if opts.exec {
        job.exec = true;
    }
    let host = host_override.map_or_else(|| job.host.clone(), String::from);
    if host == "local" {
        return run_job_locally(config, &job, tags, &opts, ctx).await;
    }
    if job.exec {
        return run_job_direct_remote(config, &job, &host, tags, &opts, ctx).await;
    }
    // sbatch expresses `--after` as a dependency on the other job's Slurm id.
    let dependency_slurm_id = if let Some(ref dep_job_id) = opts.after {
        let registry = Registry::open()?;
        let dep_job = registry.get_job(dep_job_id)?;
        let slurm_id = dep_job
            .slurm_id
            .ok_or_else(|| FlecheError::NoSlurmId(dep_job.id.clone()))?;
        Some(slurm_id)
    } else {
        None
    };
    let job_id = generate_job_id(&job.name);
    let workspace = workspace_path(config);
    let job_dir = job_path(config, &job_id);
    if opts.dry_run {
        let script = generate_sbatch_script(&job_id, &job, &workspace, &job_dir);
        println!(
            "{}",
            style("[dry-run] Generated sbatch script:").bold().yellow()
        );
        println!();
        println!("{script}");
        println!();
        print_dry_run_synced_files(config, &job.inputs).await?;
        return Ok(());
    }
    let ssh =
        prepare_remote_workspace(config, &host, &workspace, &job_dir, &job.inputs, ctx).await?;
    // `retry` counts extra attempts; saturate so `--retry u32::MAX` cannot overflow.
    let max_attempts = opts.retry.map_or(1, |r| r.saturating_add(1));
    let mut attempt: u32 = 0;
    loop {
        attempt += 1;
        // Each retry gets a fresh id (and job directory) so every attempt keeps its own logs.
        let job_id = if attempt == 1 {
            job_id.clone()
        } else {
            generate_job_id(&job.name)
        };
        let job_dir = job_path(config, &job_id);
        if attempt > 1 {
            ssh.mkdir(&job_dir).await?;
        }
        let script = generate_sbatch_script(&job_id, &job, &workspace, &job_dir);
        println!("{} Submitting job to Slurm...", style("[4/4]").bold().dim());
        ssh.write_file(&format!("{job_dir}/job.sbatch"), &script)
            .await?;
        // Only the first attempt waits on the dependency; a retry runs after it has finished.
        let dep = if attempt == 1 {
            dependency_slurm_id.as_deref()
        } else {
            None
        };
        let slurm_id = submit_job(&ssh, &job_dir, dep).await?;
        let registry = Registry::open()?;
        let job_note = if attempt == 1 {
            opts.note.as_deref()
        } else {
            None
        };
        registry.insert_job(
            &job_id,
            Some(&slurm_id),
            &job,
            &config.project_name,
            &config.project_path.to_string_lossy(),
            &host,
            &workspace,
            tags,
            job_note,
        )?;
        if let Some(ref topic) = opts.ntfy_topic {
            ntfy::notify_state_change(
                topic,
                &job_id,
                None,
                JobStatus::Pending,
                opts.note.as_deref(),
            );
        }
        println!();
        if attempt > 1 {
            println!(
                "{} {} (attempt {}/{})",
                style("Job ID:").green().bold(),
                job_id,
                attempt,
                max_attempts
            );
        } else {
            println!("{} {}", style("Job ID:").green().bold(), job_id);
        }
        println!("{} {}", style("Slurm ID:").green().bold(), slurm_id);
        if opts.background {
            if ctx.should_notify(opts.notify) || opts.ntfy_topic.is_some() {
                wait_and_notify(&job_id, &host, opts.ntfy_topic.as_deref(), ctx).await?;
            }
            break;
        }
        println!();
        let live = follow_job_logs(
            &host,
            &slurm_id,
            &job_dir,
            opts.ntfy_topic.as_deref(),
            &job_id,
            ctx,
        )
        .await?;
        registry.update_status(&job_id, &live)?;
        if live.status == JobStatus::Failed && attempt < max_attempts {
            // Exponential backoff. Saturating arithmetic avoids the shift/multiply overflow
            // (a debug-build panic) reachable with a zero base delay and more than 64 retries.
            let delay_secs = ctx
                .retry_base_delay_secs
                .saturating_mul(2u64.saturating_pow(attempt - 1));
            println!();
            println!(
                "{} Retrying in {} seconds (attempt {}/{})...",
                style("↻").yellow().bold(),
                delay_secs,
                attempt + 1,
                max_attempts
            );
            tokio::time::sleep(Duration::from_secs(delay_secs)).await;
            println!();
        } else {
            break;
        }
    }
    Ok(())
}
async fn run_job_locally(
config: &Config,
job: &ResolvedJob,
tags: &[(String, String)],
opts: &RunJobOptions,
ctx: RuntimeCtx,
) -> Result<()> {
require_shell()?;
if let Some(ref dep_job_id) = opts.after {
let registry = Registry::open()?;
let dep_job = registry.get_job(dep_job_id)?;
if dep_job.status != JobStatus::Completed {
return Err(FlecheError::MissingDependency(format!(
"Dependency job '{}' has not completed successfully (status: {:?}). \
Use 'fleche wait {}' to wait for it.",
dep_job.id, dep_job.status, dep_job.id
)));
}
}
let job_id = generate_job_id(&job.name);
if !job.inputs.is_empty() {
eprintln!(
"{}",
style("Warning: inputs are ignored for local jobs (files are already local)").yellow()
);
}
if !job.outputs.is_empty() {
eprintln!(
"{}",
style("Warning: outputs are ignored for local jobs (files are already local)").yellow()
);
}
if job.slurm.partition.is_some()
|| job.slurm.time.is_some()
|| job.slurm.gpus.is_some()
|| job.slurm.cpus.is_some()
|| job.slurm.memory.is_some()
{
eprintln!(
"{}",
style("Warning: Slurm options are ignored for local jobs").yellow()
);
}
if opts.dry_run {
println!("{}", style("[dry-run] Would run locally:").bold().yellow());
println!();
println!(" Command: {}", job.command);
println!(" Working directory: {}", config.project_path.display());
if !job.env.is_empty() {
println!(" Environment:");
for (k, v) in &job.env {
println!(" {k}={v}");
}
}
return Ok(());
}
let job_dir = local::ensure_job_dir(&config.project_path, &job_id)?;
let registry = Registry::open()?;
registry.insert_job(
&job_id,
None, job,
&config.project_name,
&config.project_path.to_string_lossy(),
"local",
&config.project_path.to_string_lossy(),
tags,
opts.note.as_deref(),
)?;
println!("{} {}", style("Job ID:").green().bold(), job_id);
println!("{} {}", style("Job directory:").dim(), job_dir.display());
println!();
if opts.background {
#[cfg(windows)]
return Err(FlecheError::MissingDependency(
"Background local jobs (--bg) are not supported on Windows.\n \
Use foreground mode or run in WSL."
.to_string(),
));
#[cfg(unix)]
{
let pid = local::run_background(&config.project_path, &job_id, &job.command, &job.env)?;
println!("{} {}", style("PID:").green().bold(), pid);
println!(
"{}",
style("Job running in background. Use 'fleche logs' to view output.").dim()
);
registry.update_status(&job_id, &LiveStatus::new(JobStatus::Running))?;
if ctx.should_notify(opts.notify) || opts.ntfy_topic.is_some() {
let project_path = config.project_path.clone();
let job_id_clone = job_id.clone();
let poll_interval = ctx.poll_interval_local_secs;
let ntfy_topic = opts.ntfy_topic.clone();
let note = opts.note.clone();
let should_term_notify = ctx.should_notify(opts.notify);
tokio::spawn(async move {
let mut prev_status: Option<JobStatus> = Some(JobStatus::Running);
loop {
tokio::time::sleep(Duration::from_secs(poll_interval)).await;
match local::get_local_job_status(&project_path, &job_id_clone) {
Ok(live) => {
if let Ok(registry) = Registry::open() {
let _ = registry.update_status(&job_id_clone, &live);
}
if let Some(ref topic) = ntfy_topic {
ntfy::notify_state_change(
topic,
&job_id_clone,
prev_status,
live.status,
note.as_deref(),
);
prev_status = Some(live.status);
}
match live.status {
JobStatus::Completed => {
if should_term_notify {
send_notification(&format!(
"Job {job_id_clone} completed successfully."
));
}
break;
}
JobStatus::Failed => {
if should_term_notify {
send_notification(&format!(
"Job {job_id_clone} failed."
));
}
break;
}
JobStatus::Cancelled => {
if should_term_notify {
send_notification(&format!(
"Job {job_id_clone} was cancelled."
));
}
break;
}
_ => {}
}
}
Err(_) => break,
}
}
});
}
}
} else {
let max_attempts = opts.retry.map_or(1, |r| r + 1);
let mut attempt = 0;
loop {
attempt += 1;
let job_id = if attempt == 1 {
job_id.clone()
} else {
let new_id = generate_job_id(&job.name);
let _job_dir = local::ensure_job_dir(&config.project_path, &new_id)?;
let registry = Registry::open()?;
registry.insert_job(
&new_id,
None,
job,
&config.project_name,
&config.project_path.to_string_lossy(),
"local",
&config.project_path.to_string_lossy(),
tags,
None, )?;
println!("{} {}", style("Job ID:").green().bold(), new_id);
println!();
new_id
};
println!(
"{}",
style("Running locally (Ctrl+C to cancel)...").yellow()
);
if attempt > 1 {
println!(
"{}",
style(format!("(attempt {attempt}/{max_attempts})")).dim()
);
}
println!();
let registry = Registry::open()?;
registry.update_status(&job_id, &LiveStatus::new(JobStatus::Running))?;
let exit_code =
local::run_foreground(&config.project_path, &job_id, &job.command, &job.env)?;
let final_status = if exit_code == 0 {
JobStatus::Completed
} else {
JobStatus::Failed
};
registry.update_status(
&job_id,
&LiveStatus::with_exit_code(final_status, exit_code),
)?;
println!();
if exit_code == 0 {
println!("{}", style("Job completed successfully.").green().bold());
if ctx.should_notify(opts.notify) {
send_notification(&format!("Job {job_id} completed successfully."));
}
break;
}
println!(
"{} (exit code: {})",
style("Job failed.").red().bold(),
exit_code
);
if attempt < max_attempts {
let delay_secs = ctx.retry_base_delay_secs * (1 << (attempt - 1));
println!();
println!(
"{} Retrying in {} seconds (attempt {}/{})...",
style("↻").yellow().bold(),
delay_secs,
attempt + 1,
max_attempts
);
tokio::time::sleep(Duration::from_secs(delay_secs)).await;
println!();
} else {
if ctx.should_notify(opts.notify) {
send_notification(&format!("Job {job_id} failed."));
}
break;
}
}
}
Ok(())
}
async fn prepare_remote_workspace(
config: &Config,
host: &str,
workspace: &str,
job_dir: &str,
inputs: &[String],
ctx: RuntimeCtx,
) -> Result<crate::ssh::SshClient> {
let ssh = ctx.ssh(host);
println!(
"{} Creating remote directories...",
style("[1/4]").bold().dim()
);
ssh.mkdir(workspace).await?;
ssh.mkdir(job_dir).await?;
print!("{} Syncing project code...", style("[2/4]").bold().dim());
let _ = std::io::stdout().flush();
let stats = sync_project_to_workspace(&config.project_path, host, workspace).await?;
println!(" {}", style(format!("({})", stats.human_readable())).dim());
if inputs.is_empty() {
println!("{} No input files to sync", style("[3/4]").bold().dim());
} else {
print!("{} Syncing input files...", style("[3/4]").bold().dim());
let _ = std::io::stdout().flush();
let stats = sync_inputs_to_workspace(&config.project_path, inputs, host, workspace).await?;
println!(" {}", style(format!("({})", stats.human_readable())).dim());
}
Ok(ssh)
}
/// For a dry run, lists the project files and input files that would be synced.
///
/// The input section (and the blank line before it) is omitted when no input files match.
async fn print_dry_run_synced_files(config: &Config, inputs: &[String]) -> Result<()> {
    /// Prints a highlighted `[dry-run]` heading with the entry count, then one entry per line.
    fn print_listing<I>(kind: &str, count: usize, files: I)
    where
        I: IntoIterator,
        I::Item: std::fmt::Display,
    {
        let heading = format!("[dry-run] {kind} files to sync ({count}):");
        println!("{}", style(heading).bold().yellow());
        for file in files {
            println!(" {file}");
        }
    }

    let project_files = list_project_sync_files(&config.project_path).await?;
    print_listing("Project", project_files.len(), &project_files);

    let input_files = list_input_sync_files(&config.project_path, inputs).await?;
    if input_files.is_empty() {
        return Ok(());
    }
    println!();
    print_listing("Input", input_files.len(), &input_files);
    Ok(())
}
/// Starts a new run of a previously recorded job, using the resolved configuration stored
/// with it.
///
/// The old job's tags are kept. An entry in `tags` replaces the tag with the same key, or is
/// appended when the key is new.
///
/// # Errors
///
/// Fails in any of these cases:
/// - the job is unknown;
/// - its stored configuration cannot be deserialized;
/// - the new run fails to start.
pub async fn rerun_job(
    config: &Config,
    job_id: &str,
    tags: &[(String, String)],
    background: bool,
    ntfy_topic: Option<&str>,
    ctx: RuntimeCtx,
) -> Result<()> {
    let registry = Registry::open()?;
    let previous = registry.get_job(job_id)?;
    let resolved: ResolvedJob = serde_json::from_str(&previous.config_json)?;

    let mut merged_tags: Vec<(String, String)> = previous
        .tags
        .iter()
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect();
    for (key, value) in tags {
        if let Some(slot) = merged_tags.iter_mut().find(|(existing, _)| existing == key) {
            slot.1 = value.clone();
        } else {
            merged_tags.push((key.clone(), value.clone()));
        }
    }

    run_job_with_resolved(config, &resolved, &merged_tags, background, ntfy_topic, ctx).await
}
async fn run_job_with_resolved(
config: &Config,
job: &ResolvedJob,
tags: &[(String, String)],
background: bool,
ntfy_topic: Option<&str>,
ctx: RuntimeCtx,
) -> Result<()> {
if job.host == "local" {
let opts = RunJobOptions {
background,
notify: false,
ntfy_topic: ntfy_topic.map(String::from),
dry_run: false,
after: None,
retry: None,
note: None,
exec: false,
};
return run_job_locally(config, job, tags, &opts, ctx).await;
}
if job.exec {
let opts = RunJobOptions {
background,
notify: false,
ntfy_topic: ntfy_topic.map(String::from),
dry_run: false,
after: None,
retry: None,
note: None,
exec: true,
};
return run_job_direct_remote(config, job, &job.host, tags, &opts, ctx).await;
}
let workspace = workspace_path(config);
let host = job.host.clone();
let job_id = generate_job_id(&job.name);
let job_dir = job_path(config, &job_id);
let ssh =
prepare_remote_workspace(config, &host, &workspace, &job_dir, &job.inputs, ctx).await?;
println!("{} Submitting job to Slurm...", style("[4/4]").bold().dim());
let script = generate_sbatch_script(&job_id, job, &workspace, &job_dir);
ssh.write_file(&format!("{job_dir}/job.sbatch"), &script)
.await?;
let slurm_id = submit_job(&ssh, &job_dir, None).await?;
let registry = Registry::open()?;
registry.insert_job(
&job_id,
Some(&slurm_id),
job,
&config.project_name,
&config.project_path.to_string_lossy(),
&host,
&workspace,
tags,
None, )?;
if let Some(topic) = ntfy_topic {
ntfy::notify_state_change(topic, &job_id, None, JobStatus::Pending, None);
}
println!();
println!("{} {}", style("Job ID:").green().bold(), job_id);
println!("{} {}", style("Slurm ID:").green().bold(), slurm_id);
if !background {
println!();
follow_job_logs(&host, &slurm_id, &job_dir, ntfy_topic, &job_id, ctx).await?;
}
Ok(())
}
/// Runs an ad-hoc shell command in the project's remote workspace, or locally for host
/// `local`.
///
/// Unless `no_sync` is set, three steps run first:
/// - every job's `inputs` list is validated;
/// - the workspace is created;
/// - the project code plus the inputs of all configured jobs are synced.
///
/// `env_overrides` are prepended to the command as shell-escaped `KEY=value` assignments. The
/// captured stdout/stderr are echoed once the command finishes.
///
/// # Errors
///
/// Returns [`FlecheError::SshCommand`] when the command exits non-zero. Validation, SSH and
/// sync errors are propagated.
pub async fn exec_command(
    config: &Config,
    command: &str,
    env_overrides: &[(String, String)],
    host_override: Option<&str>,
    no_sync: bool,
    ctx: RuntimeCtx,
) -> Result<()> {
    let host = host_override.map_or_else(|| config.remote.host.clone(), String::from);
    if host == "local" {
        return exec_command_locally(config, command, env_overrides);
    }
    let workspace = workspace_path(config);
    let client = ctx.ssh(&host);

    if no_sync {
        println!("Skipping sync, executing command directly...");
    } else {
        for (name, job) in &config.jobs {
            reject_empty_path_entries(name, "inputs", &job.inputs, &job.inputs)?;
        }
        println!(
            "{} Creating remote directories...",
            style("[1/3]").bold().dim()
        );
        client.mkdir(&workspace).await?;

        print!("{} Syncing project code...", style("[2/3]").bold().dim());
        let _ = std::io::stdout().flush();
        let project_stats =
            sync_project_to_workspace(&config.project_path, &host, &workspace).await?;
        println!(
            " {}",
            style(format!("({})", project_stats.human_readable())).dim()
        );

        // Without a specific job, sync the union of every job's declared inputs.
        let all_inputs: Vec<String> = config
            .jobs
            .values()
            .flat_map(|j| j.inputs.clone())
            .collect();
        if all_inputs.is_empty() {
            println!("{} Executing command...", style("[3/3]").bold().dim());
        } else {
            print!("{} Syncing input files...", style("[3/3]").bold().dim());
            let _ = std::io::stdout().flush();
            let input_stats =
                sync_inputs_to_workspace(&config.project_path, &all_inputs, &host, &workspace)
                    .await?;
            println!(
                " {}",
                style(format!("({})", input_stats.human_readable())).dim()
            );
        }
    }

    // Each assignment carries its own trailing space, so an empty override list adds nothing.
    let env_prefix: String = env_overrides
        .iter()
        .map(|(key, value)| format!("{key}={} ", shell_escape(value)))
        .collect();
    println!();
    let full_command = format!(
        "cd {} && {env_prefix}{command}",
        shell_escape(&workspace)
    );
    let (succeeded, captured_out, captured_err) = client.exec_allow_failure(&full_command).await?;
    if !captured_out.is_empty() {
        print!("{captured_out}");
    }
    if !captured_err.is_empty() {
        eprint!("{captured_err}");
    }
    if succeeded {
        Ok(())
    } else {
        Err(FlecheError::SshCommand(
            "Command exited with non-zero status".to_string(),
        ))
    }
}
/// Runs `command` through `local::shell_command` in the project directory, adding
/// `env_overrides` to its environment.
///
/// # Errors
///
/// Fails in any of these cases:
/// - `sh` is unavailable;
/// - the process cannot be spawned;
/// - it exits unsuccessfully, reported as [`FlecheError::SshCommand`], the same variant the
///   remote path uses.
fn exec_command_locally(
    config: &Config,
    command: &str,
    env_overrides: &[(String, String)],
) -> Result<()> {
    require_shell()?;
    println!(
        "{} Executing command locally...",
        style("[1/1]").bold().dim()
    );
    println!();
    let mut process = local::shell_command(command);
    process.current_dir(&config.project_path);
    for (key, value) in env_overrides {
        process.env(key, value);
    }
    if process.status()?.success() {
        Ok(())
    } else {
        Err(FlecheError::SshCommand(
            "Command exited with non-zero status".to_string(),
        ))
    }
}
/// Builds a job id of the form `<job_name>-<YYYYMMDD>-<HHMMSS>-<millis>-<suffix>`.
///
/// The timestamp is in UTC. The 4-character suffix is random lowercase ASCII alphanumeric, so
/// two runs started in the same millisecond still get distinct ids (with high probability).
fn generate_job_id(job_name: &str) -> String {
    let now = Utc::now();
    let timestamp = now.format("%Y%m%d-%H%M%S-%3f");
    let mut rng = rand::thread_rng();
    let suffix: String = (0..4)
        .map(|_| char::from(rng.sample(rand::distributions::Alphanumeric)).to_ascii_lowercase())
        .collect();
    format!("{job_name}-{timestamp}-{suffix}")
}
/// Streams a Slurm job's `job.out`/`job.err` over SSH until the job finishes, then prints a
/// colored summary and calls `send_notification` with it.
///
/// Two futures race:
/// - the `tail` child process that streams the two log files;
/// - a poller that queries Slurm every `ctx.poll_interval_remote_secs` seconds. It publishes
///   state changes to `ntfy_topic` and resolves once the job is `Completed`, `Failed` or
///   `Cancelled`.
///
/// Returns the last observed [`LiveStatus`]. If `tail` exits first, the status is queried up
/// to 6 times, 2 seconds apart, and `Failed` is assumed if every query errors.
///
/// NOTE(review): when `tail` exits first, the first successfully queried status is accepted
/// even if it is not terminal. The summary then reads "Job finished.".
/// NOTE(review): `send_notification` is called unconditionally here, whereas the local paths
/// check `ctx.should_notify(...)` first. Confirm whether `send_notification` gates itself.
async fn follow_job_logs(
    host: &str,
    slurm_id: &str,
    job_dir: &str,
    ntfy_topic: Option<&str>,
    job_id: &str,
    ctx: RuntimeCtx,
) -> Result<LiveStatus> {
    println!(
        "{}",
        style("Streaming output (Ctrl+C to disconnect, job keeps running)...").yellow()
    );
    let ssh = ctx.ssh(host);
    let stdout_path = format!("{job_dir}/job.out");
    let stderr_path = format!("{job_dir}/job.err");
    let mut child = ssh.tail_follow(&[&stdout_path, &stderr_path])?;
    // Owned copies: one set is moved into the polling future, the other is used by the
    // `tail`-finished branch below.
    let slurm_id = slurm_id.to_string();
    let host = host.to_string();
    let slurm_id_for_check = slurm_id.clone();
    let host_for_check = host.clone();
    let ntfy_topic_owned = ntfy_topic.map(String::from);
    let job_id_owned = job_id.to_string();
    // Polls Slurm until a terminal state; a failed query is skipped and retried next tick.
    let status_check = async move {
        let mut prev_status: Option<JobStatus> = None;
        loop {
            tokio::time::sleep(Duration::from_secs(ctx.poll_interval_remote_secs)).await;
            let ssh = ctx.ssh(&host_for_check);
            if let Ok(live) = get_job_status(&ssh, &slurm_id_for_check).await {
                if let Some(ref topic) = ntfy_topic_owned {
                    ntfy::notify_state_change(topic, &job_id_owned, prev_status, live.status, None);
                    prev_status = Some(live.status);
                }
                match live.status {
                    JobStatus::Completed | JobStatus::Failed | JobStatus::Cancelled => {
                        return live;
                    }
                    _ => {}
                }
            }
        }
    };
    // Whichever branch completes first decides the result; the other future is dropped.
    let live = tokio::select! {
        _ = child.wait() => {
            // `tail` ended on its own: ask Slurm directly, retrying briefly on errors.
            let ssh = ctx.ssh(&host);
            let mut result = None;
            for attempt in 0..6 {
                if attempt > 0 {
                    tokio::time::sleep(Duration::from_secs(2)).await;
                }
                if let Ok(live) = get_job_status(&ssh, &slurm_id).await {
                    result = Some(live);
                    break;
                }
            }
            result.unwrap_or_else(|| LiveStatus::new(JobStatus::Failed))
        }
        result = status_check => {
            // The job finished: stop tailing. The short pause presumably lets the last
            // streamed output reach the terminal (TODO confirm).
            let _ = child.kill().await;
            tokio::time::sleep(Duration::from_millis(500)).await;
            result
        }
    };
    println!();
    let message = match live.status {
        JobStatus::Completed => "Job completed successfully.".to_string(),
        JobStatus::Failed => match live.exit_code {
            Some(code) => format!("Job failed (exit code: {code})."),
            None => "Job failed.".to_string(),
        },
        JobStatus::Cancelled => "Job cancelled.".to_string(),
        _ => "Job finished.".to_string(),
    };
    // Non-terminal statuses get no colored summary line, only the notification below.
    match live.status {
        JobStatus::Completed => {
            println!("{}", style(&message).green().bold());
        }
        JobStatus::Failed => {
            println!("{}", style(&message).red().bold());
        }
        JobStatus::Cancelled => {
            println!("{}", style(&message).yellow().bold());
        }
        _ => {}
    }
    send_notification(&message);
    Ok(live)
}
async fn wait_and_notify(
job_id: &str,
remote_host: &str,
ntfy_topic: Option<&str>,
ctx: RuntimeCtx,
) -> Result<()> {
println!(
"{}",
style("Waiting for job to complete (will notify when done)...").dim()
);
let registry = Registry::open()?;
let ssh = ctx.ssh(remote_host);
let mut prev_status: Option<JobStatus> = None;
loop {
let job = registry.get_job(job_id)?;
if let Some(ref slurm_id) = job.slurm_id {
let live = get_job_status(&ssh, slurm_id).await?;
registry.update_status(job_id, &live)?;
if let Some(topic) = ntfy_topic {
ntfy::notify_state_change(topic, job_id, prev_status, live.status, None);
prev_status = Some(live.status);
}
match live.status {
JobStatus::Completed => {
let message = format!("Job {job_id} completed successfully.");
println!("{}", style(&message).green().bold());
send_notification(&message);
return Ok(());
}
JobStatus::Failed => {
let message = format!("Job {job_id} failed.");
println!("{}", style(&message).red().bold());
send_notification(&message);
return Ok(());
}
JobStatus::Cancelled => {
let message = format!("Job {job_id} was cancelled.");
println!("{}", style(&message).yellow().bold());
send_notification(&message);
return Ok(());
}
_ => {}
}
}
tokio::time::sleep(Duration::from_secs(ctx.poll_interval_remote_secs)).await;
}
}
async fn run_job_direct_remote(
config: &Config,
job: &ResolvedJob,
host: &str,
tags: &[(String, String)],
opts: &RunJobOptions,
ctx: RuntimeCtx,
) -> Result<()> {
if job.slurm.partition.is_some()
|| job.slurm.time.is_some()
|| job.slurm.gpus.is_some()
|| job.slurm.cpus.is_some()
|| job.slurm.memory.is_some()
{
eprintln!(
"{}",
style("Warning: Slurm options are ignored for exec jobs").yellow()
);
}
let job_id = generate_job_id(&job.name);
let workspace = workspace_path(config);
let job_dir = job_path(config, &job_id);
if opts.dry_run {
let script = generate_exec_script(job, &workspace, &job_dir);
println!(
"{}",
style("[dry-run] Generated exec script:").bold().yellow()
);
println!();
println!("{script}");
println!();
print_dry_run_synced_files(config, &job.inputs).await?;
return Ok(());
}
let ssh =
prepare_remote_workspace(config, host, &workspace, &job_dir, &job.inputs, ctx).await?;
let max_attempts = opts.retry.map_or(1, |r| r + 1);
let mut attempt = 0;
loop {
attempt += 1;
let job_id = if attempt == 1 {
job_id.clone()
} else {
generate_job_id(&job.name)
};
let job_dir = job_path(config, &job_id);
if attempt > 1 {
ssh.mkdir(&job_dir).await?;
}
let script = generate_exec_script(job, &workspace, &job_dir);
println!(
"{} Starting remote exec job...",
style("[4/4]").bold().dim()
);
ssh.write_file(&format!("{job_dir}/run.sh"), &script)
.await?;
ssh.exec(&format!(
"nohup sh {job_dir}/run.sh > /dev/null 2>&1 & echo started"
))
.await?;
let registry = Registry::open()?;
let job_note = if attempt == 1 {
opts.note.as_deref()
} else {
None
};
registry.insert_job(
&job_id,
None, job,
&config.project_name,
&config.project_path.to_string_lossy(),
host,
&workspace,
tags,
job_note,
)?;
registry.update_status(&job_id, &LiveStatus::new(JobStatus::Running))?;
if let Some(ref topic) = opts.ntfy_topic {
ntfy::notify_state_change(
topic,
&job_id,
None,
JobStatus::Running,
opts.note.as_deref(),
);
}
println!();
if attempt > 1 {
println!(
"{} {} (attempt {}/{})",
style("Job ID:").green().bold(),
job_id,
attempt,
max_attempts
);
} else {
println!("{} {}", style("Job ID:").green().bold(), job_id);
}
if opts.background {
println!(
"{}",
style("Job running in background. Use 'fleche logs' to view output.").dim()
);
if ctx.should_notify(opts.notify) || opts.ntfy_topic.is_some() {
wait_and_notify_direct(&job_id, host, &job_dir, opts.ntfy_topic.as_deref(), ctx)
.await?;
}
break;
}
println!();
let live = follow_direct_job_logs(host, &job_dir, opts.ntfy_topic.as_deref(), &job_id, ctx)
.await?;
registry.update_status(&job_id, &live)?;
if live.status == JobStatus::Failed && attempt < max_attempts {
let delay_secs = ctx.retry_base_delay_secs * (1 << (attempt - 1));
println!();
println!(
"{} Retrying in {} seconds (attempt {}/{})...",
style("↻").yellow().bold(),
delay_secs,
attempt + 1,
max_attempts
);
tokio::time::sleep(Duration::from_secs(delay_secs)).await;
println!();
} else {
break;
}
}
Ok(())
}
/// Renders the `run.sh` used for direct (`exec`) jobs. The script:
/// 1. records its own PID in `{job_dir}/pid`;
/// 2. changes into the shell-escaped workspace;
/// 3. exports the job's environment;
/// 4. runs the command with stdout/stderr redirected to `job.out`/`job.err`;
/// 5. writes the command's exit status to `{job_dir}/exit_code`.
///
/// NOTE(review): the redirections attach only to the last simple command of `job.command`.
/// For a compound command such as `a; b` or `a && b`, `a`'s output goes wherever the script's
/// own stdout/stderr point. The launcher sends those to `/dev/null`. Wrapping the command in
/// `{ ...; }` would fix this but would change the generated text the unit tests pin.
/// `job_dir` is also interpolated without shell escaping.
fn generate_exec_script(job: &ResolvedJob, workspace: &str, job_dir: &str) -> String {
    let mut lines = vec![
        "#!/bin/sh".to_string(),
        format!("echo $$ > {job_dir}/pid"),
        format!("cd {}", shell_escape(workspace)),
    ];
    lines.extend(
        job.env
            .iter()
            .map(|(key, value)| format!("export {}={}", key, shell_escape(value))),
    );
    lines.push(format!(
        "{} > {job_dir}/job.out 2> {job_dir}/job.err",
        job.command
    ));
    lines.push(format!("echo $? > {job_dir}/exit_code"));
    // Every line, including the last, is newline-terminated.
    let mut script = lines.join("\n");
    script.push('\n');
    script
}
/// Determines the state of a direct (`exec`) job from the marker files its `run.sh` writes:
/// - `exit_code` present: `Completed` for 0, otherwise `Failed`. Unparsable contents count
///   as exit code 1.
/// - otherwise, `pid` present: `Running` while that process is alive. If it is gone,
///   `exit_code` is checked once more (the job may have exited between the two checks)
///   before reporting `Failed`.
/// - neither file: `Pending`.
///
/// # Errors
///
/// Propagates SSH errors from `exec_allow_failure`; a missing file is not an error.
pub async fn get_remote_direct_job_status(ssh: &SshClient, job_dir: &str) -> Result<LiveStatus> {
    if let Some(finished) = read_direct_exit_status(ssh, job_dir).await? {
        return Ok(finished);
    }
    let (has_pid, pid_content, _) = ssh
        .exec_allow_failure(&format!("cat {job_dir}/pid 2>/dev/null"))
        .await?;
    let pid = pid_content.trim();
    if !has_pid || pid.is_empty() {
        return Ok(LiveStatus::new(JobStatus::Pending));
    }
    // The pid is interpolated into a shell command, so only probe a positive decimal number.
    let pid_is_valid = pid.bytes().all(|b| b.is_ascii_digit()) && pid.bytes().any(|b| b != b'0');
    if pid_is_valid {
        let (is_running, _, _) = ssh
            .exec_allow_failure(&format!("kill -0 {pid} 2>/dev/null"))
            .await?;
        if is_running {
            return Ok(LiveStatus::new(JobStatus::Running));
        }
    }
    // The process is gone. It may have exited (and written `exit_code`) after the first check,
    // so look again before concluding it died without recording a status.
    if let Some(finished) = read_direct_exit_status(ssh, job_dir).await? {
        return Ok(finished);
    }
    Ok(LiveStatus::new(JobStatus::Failed))
}

/// Reads `{job_dir}/exit_code` and maps it to a terminal status. Returns `None` when the file
/// is missing or empty.
async fn read_direct_exit_status(ssh: &SshClient, job_dir: &str) -> Result<Option<LiveStatus>> {
    let (has_exit_code, exit_code_content, _) = ssh
        .exec_allow_failure(&format!("cat {job_dir}/exit_code 2>/dev/null"))
        .await?;
    let content = exit_code_content.trim();
    if !has_exit_code || content.is_empty() {
        return Ok(None);
    }
    let code: i32 = content.parse().unwrap_or(1);
    let status = if code == 0 {
        JobStatus::Completed
    } else {
        JobStatus::Failed
    };
    Ok(Some(LiveStatus::with_exit_code(status, code)))
}
/// Streams a direct (`exec`) job's `job.out`/`job.err` over SSH until the job finishes, then
/// prints a colored summary and calls `send_notification` with it.
///
/// This mirrors `follow_job_logs`, but reads the status from the job directory's marker files
/// (via `get_remote_direct_job_status`) instead of from Slurm. Two futures race:
/// - the `tail` child process that streams the two log files;
/// - a poller that checks the marker files every `ctx.poll_interval_remote_secs` seconds. It
///   publishes state changes to `ntfy_topic` and resolves on a terminal status.
///
/// If `tail` exits first, the status is queried up to 6 times, 2 seconds apart, and `Failed`
/// is assumed if every query errors.
///
/// NOTE(review): when `tail` exits first, the first successfully read status is accepted even
/// if it is still `Running` or `Pending`.
/// NOTE(review): `send_notification` is called unconditionally. Confirm whether it checks the
/// notify settings itself.
async fn follow_direct_job_logs(
    host: &str,
    job_dir: &str,
    ntfy_topic: Option<&str>,
    job_id: &str,
    ctx: RuntimeCtx,
) -> Result<LiveStatus> {
    println!(
        "{}",
        style("Streaming output (Ctrl+C to disconnect, job keeps running)...").yellow()
    );
    let ssh = ctx.ssh(host);
    let stdout_path = format!("{job_dir}/job.out");
    let stderr_path = format!("{job_dir}/job.err");
    let mut child = ssh.tail_follow(&[&stdout_path, &stderr_path])?;
    // Owned copies moved into the polling future.
    let job_dir_owned = job_dir.to_string();
    let host_owned = host.to_string();
    let ntfy_topic_owned = ntfy_topic.map(String::from);
    let job_id_owned = job_id.to_string();
    // Polls the marker files until a terminal state; a failed read is retried next tick.
    let status_check = async move {
        let mut prev_status: Option<JobStatus> = None;
        loop {
            tokio::time::sleep(Duration::from_secs(ctx.poll_interval_remote_secs)).await;
            let ssh = ctx.ssh(&host_owned);
            if let Ok(live) = get_remote_direct_job_status(&ssh, &job_dir_owned).await {
                if let Some(ref topic) = ntfy_topic_owned {
                    ntfy::notify_state_change(topic, &job_id_owned, prev_status, live.status, None);
                    prev_status = Some(live.status);
                }
                match live.status {
                    JobStatus::Completed | JobStatus::Failed | JobStatus::Cancelled => {
                        return live;
                    }
                    _ => {}
                }
            }
        }
    };
    // Whichever branch completes first decides the result; the other future is dropped.
    let live = tokio::select! {
        _ = child.wait() => {
            // `tail` ended on its own: read the status directly, retrying briefly on errors.
            let ssh = ctx.ssh(host);
            let mut result = None;
            for attempt in 0..6 {
                if attempt > 0 {
                    tokio::time::sleep(Duration::from_secs(2)).await;
                }
                if let Ok(live) = get_remote_direct_job_status(&ssh, job_dir).await {
                    result = Some(live);
                    break;
                }
            }
            result.unwrap_or_else(|| LiveStatus::new(JobStatus::Failed))
        }
        result = status_check => {
            // The job finished: stop tailing. The short pause presumably lets the last
            // streamed output reach the terminal (TODO confirm).
            let _ = child.kill().await;
            tokio::time::sleep(Duration::from_millis(500)).await;
            result
        }
    };
    println!();
    let message = match live.status {
        JobStatus::Completed => "Job completed successfully.".to_string(),
        JobStatus::Failed => match live.exit_code {
            Some(code) => format!("Job failed (exit code: {code})."),
            None => "Job failed.".to_string(),
        },
        JobStatus::Cancelled => "Job cancelled.".to_string(),
        _ => "Job finished.".to_string(),
    };
    // Non-terminal statuses get no colored summary line, only the notification below.
    match live.status {
        JobStatus::Completed => {
            println!("{}", style(&message).green().bold());
        }
        JobStatus::Failed => {
            println!("{}", style(&message).red().bold());
        }
        JobStatus::Cancelled => {
            println!("{}", style(&message).yellow().bold());
        }
        _ => {}
    }
    send_notification(&message);
    Ok(live)
}
/// Polls a direct (`exec`) job's marker files until it finishes. On each poll it:
/// - updates the registry (best-effort);
/// - publishes the state change to `ntfy_topic` (if given).
///
/// When the job finishes, it prints and sends a notification with the outcome.
///
/// # Errors
///
/// Propagates SSH errors from reading the job status; registry failures are ignored.
async fn wait_and_notify_direct(
    job_id: &str,
    remote_host: &str,
    job_dir: &str,
    ntfy_topic: Option<&str>,
    ctx: RuntimeCtx,
) -> Result<()> {
    println!(
        "{}",
        style("Waiting for job to complete (will notify when done)...").dim()
    );
    let client = ctx.ssh(remote_host);
    let mut last_status: Option<JobStatus> = None;
    let outcome = loop {
        let live = get_remote_direct_job_status(&client, job_dir).await?;
        // Registry bookkeeping is best-effort: a failure must not stop the wait.
        if let Ok(registry) = Registry::open() {
            let _ = registry.update_status(job_id, &live);
        }
        if let Some(topic) = ntfy_topic {
            ntfy::notify_state_change(topic, job_id, last_status, live.status, None);
            last_status = Some(live.status);
        }
        match live.status {
            JobStatus::Completed => {
                let message = format!("Job {job_id} completed successfully.");
                println!("{}", style(&message).green().bold());
                break message;
            }
            JobStatus::Failed => {
                let message = format!("Job {job_id} failed.");
                println!("{}", style(&message).red().bold());
                break message;
            }
            JobStatus::Cancelled => {
                let message = format!("Job {job_id} was cancelled.");
                println!("{}", style(&message).yellow().bold());
                break message;
            }
            _ => {}
        }
        tokio::time::sleep(Duration::from_secs(ctx.poll_interval_remote_secs)).await;
    };
    send_notification(&outcome);
    Ok(())
}
/// Attempts to stop a running direct (`exec`) job. It sends `kill` (SIGTERM by default) to
/// the pid recorded in `{job_dir}/pid` and, on success, records exit code 143 (the
/// conventional shell status for a SIGTERM-terminated process) in `{job_dir}/exit_code`.
///
/// Returns `Ok(false)` when there is nothing to cancel:
/// - the job has already recorded an exit code;
/// - no valid pid was recorded;
/// - the signal could not be delivered.
///
/// NOTE(review): the recorded pid belongs to the `sh` running `run.sh` (`echo $$`).
/// Terminating that shell may leave the command it was waiting on running. Confirm whether
/// child processes need to be signalled too.
///
/// # Errors
///
/// Propagates SSH errors from reading the marker files or sending the signal.
pub async fn cancel_remote_direct_job(ssh: &SshClient, job_dir: &str) -> Result<bool> {
    // A finished job must not be "cancelled": its pid may since have been reused by an
    // unrelated process, and overwriting `exit_code` would clobber the real result.
    let (has_exit_code, exit_code_content, _) = ssh
        .exec_allow_failure(&format!("cat {job_dir}/exit_code 2>/dev/null"))
        .await?;
    if has_exit_code && !exit_code_content.trim().is_empty() {
        return Ok(false);
    }
    let (has_pid, pid_content, _) = ssh
        .exec_allow_failure(&format!("cat {job_dir}/pid 2>/dev/null"))
        .await?;
    let pid = pid_content.trim();
    // The pid is interpolated into a shell command, so accept only a positive decimal number.
    let pid_is_valid = !pid.is_empty()
        && pid.bytes().all(|b| b.is_ascii_digit())
        && pid.bytes().any(|b| b != b'0');
    if !has_pid || !pid_is_valid {
        return Ok(false);
    }
    let (success, _, _) = ssh
        .exec_allow_failure(&format!("kill {pid} 2>/dev/null"))
        .await?;
    if success {
        // Best-effort: the signal was delivered even if recording the code fails.
        let _ = ssh
            .exec_allow_failure(&format!("echo 143 > {job_dir}/exit_code"))
            .await;
    }
    Ok(success)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{ResolvedJob, SlurmConfig};
    use indexmap::IndexMap;

    /// Builds a minimal `exec` job named `test` on host `cluster` for script-generation tests.
    fn exec_job(command: &str, env: IndexMap<String, String>) -> ResolvedJob {
        ResolvedJob {
            name: "test".to_string(),
            command: command.to_string(),
            inputs: vec![],
            outputs: vec![],
            slurm: SlurmConfig::default(),
            env,
            host: "cluster".to_string(),
            exec: true,
        }
    }

    #[test]
    fn test_generate_job_id_format() {
        let id = generate_job_id("train");
        assert!(id.starts_with("train-"));
        let parts: Vec<&str> = id.split('-').collect();
        assert_eq!(parts.len(), 5);
        let all_digits = |s: &str| s.chars().all(|c| c.is_ascii_digit());
        assert!(all_digits(parts[1]));
        assert!(all_digits(parts[2]));
        let suffix = parts[4];
        assert_eq!(suffix.len(), 4);
        assert!(suffix
            .chars()
            .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit()));
    }

    #[test]
    fn test_generate_job_id_uniqueness() {
        let ids: Vec<String> = (0..100).map(|_| generate_job_id("test")).collect();
        let distinct: std::collections::HashSet<&String> = ids.iter().collect();
        assert_eq!(ids.len(), distinct.len());
    }

    #[test]
    fn test_generate_job_id_with_hyphenated_name() {
        let id = generate_job_id("my-job");
        assert!(id.starts_with("my-job-"));
        let suffix = id.rsplit('-').next().unwrap();
        assert_eq!(suffix.len(), 4);
    }

    #[test]
    fn test_generate_exec_script_basic() {
        let job = exec_job("echo hello", IndexMap::new());
        let script = generate_exec_script(&job, "/workspace", "/jobs/test-123");
        assert!(script.starts_with("#!/bin/sh\n"));
        for expected in [
            "echo $$ > /jobs/test-123/pid",
            "cd '/workspace'",
            "echo hello > /jobs/test-123/job.out 2> /jobs/test-123/job.err",
            "echo $? > /jobs/test-123/exit_code",
        ] {
            assert!(script.contains(expected), "missing line: {expected}");
        }
    }

    #[test]
    fn test_generate_exec_script_with_env() {
        let mut env = IndexMap::new();
        env.insert("FOO".to_string(), "bar".to_string());
        env.insert("PATH_VAR".to_string(), "/some/path".to_string());
        let job = exec_job("python train.py", env);
        let script = generate_exec_script(&job, "/ws", "/jobs/test-456");
        for expected in [
            "export FOO='bar'",
            "export PATH_VAR='/some/path'",
            "python train.py > /jobs/test-456/job.out",
        ] {
            assert!(script.contains(expected), "missing line: {expected}");
        }
    }
}