mod cli;
mod config;
mod diagnostics;
mod error;
mod handlers;
mod job;
mod local;
mod ntfy;
mod output;
mod proxy;
mod registry;
mod runtime;
mod slurm;
mod ssh;
mod sync;
use anyhow::Result;
use clap::{CommandFactory, Parser};
use cli::{Cli, Commands};
use config::Config;
use console::style;
use job::StatusOptions;
use output::OutputFormat;
use registry::ArchivedFilter;
use runtime::RuntimeCtx;
use slurm::slurm_config_from_cli;
#[tokio::main]
async fn main() {
#[cfg(unix)]
unsafe {
libc::signal(libc::SIGPIPE, libc::SIG_DFL);
}
if let Err(e) = run().await {
eprintln!("{} {}", style("Error:").red().bold(), e);
std::process::exit(1);
}
}
fn check_dependencies() -> Result<()> {
use std::process::Command;
if Command::new("ssh").arg("-V").output().is_err() {
anyhow::bail!(
"ssh not found. Install it with:\n \
macOS: Pre-installed (check /usr/bin/ssh)\n \
Ubuntu: apt install openssh-client\n \
Windows: Install Git Bash, WSL, or OpenSSH"
);
}
if Command::new("rsync").arg("--version").output().is_err() {
anyhow::bail!(
"rsync not found. Install it with:\n \
macOS: brew install rsync\n \
Ubuntu: apt install rsync\n \
Fedora: dnf install rsync"
);
}
Ok(())
}
async fn run() -> Result<()> {
let cli = Cli::parse();
check_dependencies()?;
if let Some(ref dir) = cli.directory {
std::env::set_current_dir(dir).map_err(|e| {
anyhow::anyhow!("Cannot change to directory '{}': {}", dir.display(), e)
})?;
}
let optional_settings = Config::find_and_load().ok().map(|c| c.settings);
let runtime_ctx = RuntimeCtx::from_optional_settings(cli.debug, optional_settings.as_ref());
let format = if cli.json {
OutputFormat::Json
} else {
OutputFormat::Human
};
match cli.command {
Commands::Run(args) => {
let config = Config::find_and_load()?;
let runtime_ctx = RuntimeCtx::from_settings(cli.debug, &config.settings);
let slurm_overrides = slurm_config_from_cli(
args.partition,
args.time,
args.gpus,
args.cpus,
args.memory,
args.constraint,
args.nodes,
args.exclude,
);
job::run_job(
&config,
args.job_or_command.as_deref(),
args.command.as_deref(),
&args.env_vars,
&args.tags,
slurm_overrides,
args.host.as_deref(),
job::RunJobOptions {
background: args.bg,
notify: args.notify,
ntfy_topic: args.ntfy,
dry_run: args.dry_run,
after: args.after,
retry: args.retry,
note: args.note,
exec: args.exec,
},
runtime_ctx,
)
.await?;
}
Commands::Exec {
command,
env_vars,
host,
no_sync,
} => {
let config = Config::find_and_load()?;
let runtime_ctx = RuntimeCtx::from_settings(cli.debug, &config.settings);
job::exec_command(
&config,
&command,
&env_vars,
host.as_deref(),
no_sync,
runtime_ctx,
)
.await?;
}
Commands::Status(args) => {
let archived_filter = if args.archived {
ArchivedFilter::OnlyArchived
} else if args.all_jobs {
ArchivedFilter::IncludeAll
} else {
ArchivedFilter::ExcludeArchived
};
job::show_status(
args.job_id.as_deref(),
StatusOptions {
filters: &args.filter,
name: args.name.as_deref(),
tags: &args.tags,
last: args.last,
default_limit: optional_settings.as_ref().map(|s| s.default_list_limit),
archived: archived_filter,
compact: args.compact,
format,
},
runtime_ctx,
)
.await?;
}
Commands::Watch(args) => {
let archived_filter = if args.archived {
ArchivedFilter::OnlyArchived
} else if args.all_jobs {
ArchivedFilter::IncludeAll
} else {
ArchivedFilter::ExcludeArchived
};
let interval = job::parse_interval_secs(args.interval)?;
job::watch_status(
StatusOptions {
filters: &args.filter,
name: args.name.as_deref(),
tags: &args.tags,
last: args.last,
default_limit: optional_settings.as_ref().map(|s| s.default_list_limit),
archived: archived_filter,
compact: args.compact,
format,
},
interval,
runtime_ctx,
)
.await?;
}
Commands::Logs(args) => {
job::show_logs(
args.job_id.as_deref(),
&args.tags,
args.note.as_deref(),
job::ShowLogsOptions {
follow: args.follow,
only_stdout: args.stdout,
only_stderr: args.stderr,
tail: args.tail,
raw: args.raw,
ctx: runtime_ctx,
},
)
.await?;
}
Commands::Download(args) => {
job::download_outputs(
args.job_id.as_deref(),
args.partial,
args.path.as_deref(),
&args.filter,
&args.tags,
args.dry_run,
runtime_ctx,
)
.await?;
}
Commands::Cancel(args) => {
job::cancel_jobs(
args.job_id.as_deref(),
&args.tags,
job::CancelJobsOptions {
all: args.all,
skip_confirm: args.yes,
dry_run: args.dry_run,
format,
ctx: runtime_ctx,
},
)
.await?;
}
Commands::Clean(args) => {
job::clean_jobs(
args.job_id.as_deref(),
args.older_than.as_deref(),
&args.tags,
job::CleanJobsOptions {
all: args.all,
status_filters: args.filter,
delete: args.delete,
clean_workspace: args.workspace,
include_archived: args.archived,
unarchive: args.unarchive,
dry_run: args.dry_run,
skip_confirm: args.yes,
format,
ctx: runtime_ctx,
},
)
.await?;
}
Commands::Jobs => {
let config = Config::find_and_load()?;
handlers::list_jobs(&config, format)?;
}
Commands::Tags => {
job::list_tags(format)?;
}
Commands::Rerun {
job_id,
bg,
ntfy,
tags,
} => {
let config = Config::find_and_load()?;
let runtime_ctx = RuntimeCtx::from_settings(cli.debug, &config.settings);
job::rerun_job(&config, &job_id, &tags, bg, ntfy.as_deref(), runtime_ctx).await?;
}
Commands::Init => handlers::init()?,
Commands::Check { remote } => {
let config = Config::find_and_load()?;
let runtime_ctx = RuntimeCtx::from_settings(cli.debug, &config.settings);
handlers::check(&config);
if remote {
diagnostics::check_remote(&config, runtime_ctx).await?;
}
}
Commands::Skill { install } => {
if let Some(scope) = install {
handlers::install_skill(scope)?;
} else {
print!("{}", include_str!("../docs/skill.md"));
}
}
Commands::Doctor => {
diagnostics::doctor(runtime_ctx.debug).await?;
}
Commands::Ping => {
let config = Config::find_and_load()?;
let runtime_ctx = RuntimeCtx::from_settings(cli.debug, &config.settings);
job::ping_cluster(&config, runtime_ctx).await?;
}
Commands::Wait {
job_id,
notify,
ntfy,
tags,
} => {
job::wait_for_job(
job_id.as_deref(),
notify,
ntfy.as_deref(),
&tags,
format,
runtime_ctx,
)
.await?;
}
Commands::Completions { shell } => {
clap_complete::generate(shell, &mut Cli::command(), "fleche", &mut std::io::stdout());
}
Commands::Stats { job_id, last, tags } => {
job::show_stats(job_id.as_deref(), last, &tags, format, runtime_ctx).await?;
}
Commands::Note { job_id, note } => {
job::note_job(&job_id, note.as_deref())?;
}
Commands::Compare { job_a, job_b } => {
handlers::compare_jobs(&job_a, &job_b)?;
}
Commands::Proxy {
command,
port,
host,
} => {
let host = match host {
Some(h) => h,
None => Config::find_and_load()?.remote.host,
};
let code = proxy::run_proxy_command(&host, &command, port, cli.debug).await?;
if code != 0 {
std::process::exit(code);
}
}
}
Ok(())
}