use std::io::IsTerminal;
use clap::{CommandFactory, Parser};
use torc::cli::{Cli, Commands};
use torc::client::apis;
use torc::client::apis::configuration::{Configuration, TlsConfig};
use torc::client::commands::access_groups::handle_access_group_commands;
use torc::client::commands::admin::handle_admin_commands;
use torc::client::commands::compute_nodes::handle_compute_node_commands;
use torc::client::commands::config::handle_config_commands;
use torc::client::commands::events::handle_event_commands;
use torc::client::commands::failure_handlers::handle_failure_handler_commands;
use torc::client::commands::files::handle_file_commands;
use torc::client::commands::hpc::handle_hpc_commands;
use torc::client::commands::job_dependencies::handle_job_dependency_commands;
use torc::client::commands::jobs::handle_job_commands;
use torc::client::commands::logs::handle_log_commands;
use torc::client::commands::recover::{
RecoverArgs, RecoveryReport, diagnose_failures, recover_workflow,
};
use torc::client::commands::remote::handle_remote_commands;
use torc::client::commands::resource_requirements::handle_resource_requirements_commands;
use torc::client::commands::results::handle_result_commands;
use torc::client::commands::ro_crate::handle_ro_crate_commands;
use torc::client::commands::scheduled_compute_nodes::handle_scheduled_compute_node_commands;
use torc::client::commands::slurm::handle_slurm_commands;
use torc::client::commands::user_data::handle_user_data_commands;
use torc::client::commands::watch::{WatchArgs, run_watch};
use torc::client::commands::workflows::{handle_cancel, handle_workflow_commands};
use torc::client::config::TorcConfig;
use torc::client::version_check;
use torc::client::workflow_manager::WorkflowManager;
use torc::client::workflow_spec::WorkflowSpec;
use torc::plot_resources_cmd;
use torc::run_jobs_cmd;
use torc::tui_runner;
/// Print a single workflow-scoped message, honoring the requested output format.
///
/// When `format` is `"json"`, emits a one-line JSON object with the workflow id
/// and the message; otherwise prints the message as plain text.
fn print_workflow_message(format: &str, workflow_id: i64, message: &str) {
    match format {
        "json" => {
            let payload = serde_json::json!({
                "workflow_id": workflow_id,
                "message": message,
            });
            println!("{}", payload);
        }
        _ => println!("{}", message),
    }
}
/// Decide whether a CLI argument names a workflow spec file (as opposed to a
/// numeric workflow ID).
///
/// An argument counts as a spec file if it carries one of the known spec
/// extensions, or — regardless of extension — if it refers to an existing
/// regular file on disk.
fn is_spec_file(arg: &str) -> bool {
    const SPEC_EXTENSIONS: [&str; 4] = [".yaml", ".yml", ".json", ".json5"];
    SPEC_EXTENSIONS.iter().any(|ext| arg.ends_with(ext))
        || std::path::Path::new(arg).is_file()
}
/// CLI entry point: parses arguments, merges them with file-based configuration,
/// builds the API client configuration (TLS, auth, cookies), optionally checks
/// client/server version compatibility, then dispatches to the handler for the
/// selected subcommand.
fn main() {
    let cli = Cli::parse();
    // File-based configuration supplies defaults for anything not given on the CLI.
    let file_config = TorcConfig::load().unwrap_or_default();
    let log_level = cli
        .log_level
        .clone()
        .unwrap_or_else(|| file_config.client.log_level.clone());
    // These commands manage their own logging (or need clean stdout for shell
    // completion scripts), so the global logger must not be installed for them.
    let skip_logger_init = matches!(
        cli.command,
        Commands::Run { .. }
            | Commands::Watch { .. }
            | Commands::Tui(..)
            | Commands::Completions { .. }
    );
    if !skip_logger_init {
        env_logger::Builder::new().parse_filters(&log_level).init();
    }
    // CLI format wins when it differs from the default "table"; otherwise fall
    // back to the configured format. NOTE(review): an explicit `--format table`
    // cannot override a config-file "json" because the default is
    // indistinguishable from an explicit "table" here — TODO confirm intended.
    let format = if cli.format != "table" {
        cli.format.clone()
    } else {
        file_config.client.format.clone()
    };
    if !matches!(format.as_str(), "table" | "json") {
        eprintln!("Error: format must be either 'table' or 'json'");
        std::process::exit(1);
    }
    let url = cli
        .url
        .clone()
        .unwrap_or_else(|| file_config.client.api_url.clone());
    // TLS options: CLI flags take precedence over the config file; `insecure`
    // is enabled if either source enables it.
    let tls_ca_cert = cli
        .tls_ca_cert
        .clone()
        .or_else(|| file_config.client.tls.ca_cert.clone());
    let tls_insecure = cli.tls_insecure || file_config.client.tls.insecure;
    let tls = TlsConfig {
        ca_cert_path: tls_ca_cert.as_ref().map(std::path::PathBuf::from),
        insecure: tls_insecure,
    };
    let mut config = Configuration::with_tls(tls);
    config.base_path = url.clone();
    // Interactive password prompt overrides any password given on the CLI.
    let password = if cli.prompt_password {
        match rpassword::prompt_password("Password: ") {
            Ok(pwd) => Some(pwd),
            Err(e) => {
                eprintln!("Error reading password: {}", e);
                std::process::exit(1);
            }
        }
    } else {
        cli.password.clone()
    };
    if let Some(password) = password {
        let username = torc::get_username();
        config.basic_auth = Some((username, Some(password)));
    }
    if let Some(ref cookie_header) = cli.cookie_header {
        config.cookie_header = Some(cookie_header.clone());
        if let Err(e) = config.apply_cookie_header() {
            eprintln!("Error: {e}");
            std::process::exit(1);
        }
        // SAFETY: `set_var` is unsafe because it can race with concurrent reads
        // of the environment; at this point no additional threads have been
        // spawned by this process — TODO confirm no library thread reads env here.
        unsafe { std::env::set_var("TORC_COOKIE_HEADER", cookie_header) };
    }
    // Commands that never contact the server skip the version handshake.
    let requires_server = !matches!(
        cli.command,
        Commands::Completions { .. }
            | Commands::PlotResources(..)
            | Commands::Tui(..)
            | Commands::Config { .. }
            | Commands::Hpc { .. }
    );
    if requires_server && !cli.skip_version_check {
        let result = version_check::check_version(&config);
        // Only act when the server actually answered; an unreachable server is
        // reported by the command itself, not by the version check.
        if result.server_version.is_some() {
            let severity = version_check::print_version_warning(&result);
            if severity.is_blocking() {
                eprintln!("Use --skip-version-check to bypass this check (not recommended)");
                std::process::exit(1);
            }
        }
    }
    match &cli.command {
        Commands::Create {
            file,
            no_resource_monitoring,
            skip_checks,
            dry_run,
        } => {
            let user = torc::get_username();
            torc::client::commands::workflows::handle_create(
                &config,
                file,
                &user,
                *no_resource_monitoring,
                *skip_checks,
                *dry_run,
                &format,
            );
        }
        Commands::Run {
            workflow_spec_or_id,
            max_parallel_jobs,
            num_cpus,
            memory_gb,
            num_gpus,
            poll_interval,
            output_dir,
            time_limit,
            end_time,
            skip_checks,
        } => {
            // Accept either a spec file (create a new workflow) or an existing
            // workflow ID.
            let workflow_id = if is_spec_file(workflow_spec_or_id) {
                if !*skip_checks {
                    WorkflowSpec::prevalidate_or_exit(workflow_spec_or_id);
                }
                let user = torc::get_username();
                match WorkflowSpec::create_workflow_from_spec(
                    &config,
                    workflow_spec_or_id,
                    &user,
                    true,
                ) {
                    Ok(id) => {
                        print_workflow_message(&format, id, &format!("Created workflow {}", id));
                        id
                    }
                    Err(e) => {
                        eprintln!("Error creating workflow from spec: {}", e);
                        std::process::exit(1);
                    }
                }
            } else {
                match workflow_spec_or_id.parse::<i64>() {
                    Ok(id) => id,
                    Err(_) => {
                        eprintln!(
                            "Error: '{}' is neither a valid workflow spec file nor a workflow ID",
                            workflow_spec_or_id
                        );
                        std::process::exit(1);
                    }
                }
            };
            // CLI options fall back to the [client.run] section of the config file.
            let run_config = &file_config.client.run;
            let password = config.basic_auth.as_ref().and_then(|(_, p)| p.clone());
            let args = run_jobs_cmd::Args {
                workflow_id: Some(workflow_id),
                url: url.clone(),
                output_dir: output_dir
                    .clone()
                    .unwrap_or_else(|| run_config.output_dir.clone()),
                poll_interval: poll_interval.unwrap_or(run_config.poll_interval),
                max_parallel_jobs: max_parallel_jobs.or(run_config.max_parallel_jobs),
                time_limit: time_limit.clone(),
                end_time: end_time.clone(),
                num_cpus: num_cpus.or(run_config.num_cpus),
                memory_gb: memory_gb.or(run_config.memory_gb),
                num_gpus: num_gpus.or(run_config.num_gpus),
                num_nodes: None,
                scheduler_config_id: None,
                log_prefix: None,
                cpu_affinity_cpus_per_job: None,
                log_level: log_level.clone(),
                password,
                tls_ca_cert: tls_ca_cert.clone(),
                tls_insecure,
                cookie_header: config.cookie_header.clone(),
            };
            run_jobs_cmd::run(&args);
        }
        Commands::Submit {
            workflow_spec_or_id,
            ignore_missing_data,
            skip_checks,
            max_parallel_jobs,
            output_dir,
            poll_interval,
        } => {
            let workflow_id = if is_spec_file(workflow_spec_or_id) {
                let spec = match WorkflowSpec::from_spec_file(workflow_spec_or_id) {
                    Ok(spec) => spec,
                    Err(e) => {
                        eprintln!("Error loading workflow spec: {}", e);
                        std::process::exit(1);
                    }
                };
                // Submitting requires an on_workflow_start/schedule_nodes action;
                // without one the workflow would be created but never scheduled.
                if !spec.has_schedule_nodes_action() {
                    eprintln!("Error: Cannot submit workflow");
                    eprintln!();
                    eprintln!(
                        "The spec does not define an on_workflow_start action with schedule_nodes."
                    );
                    eprintln!("To submit to Slurm, either:");
                    eprintln!();
                    eprintln!("  1. Use 'torc slurm generate' to auto-generate schedulers:");
                    eprintln!(
                        "     torc slurm generate --account <account> -o {} {}",
                        workflow_spec_or_id, workflow_spec_or_id
                    );
                    eprintln!("     torc submit {}", workflow_spec_or_id);
                    eprintln!();
                    eprintln!("  2. Add a workflow action manually:");
                    eprintln!("     actions:");
                    eprintln!("       - trigger_type: on_workflow_start");
                    eprintln!("         action_type: schedule_nodes");
                    eprintln!("         scheduler_type: slurm");
                    eprintln!("         scheduler: \"my-scheduler\"");
                    eprintln!();
                    eprintln!("Or run locally instead:");
                    eprintln!("  torc run {}", workflow_spec_or_id);
                    std::process::exit(1);
                }
                if !*skip_checks {
                    WorkflowSpec::prevalidate_or_exit(workflow_spec_or_id);
                }
                let user = torc::get_username();
                match WorkflowSpec::create_workflow_from_spec(
                    &config,
                    workflow_spec_or_id,
                    &user,
                    true,
                ) {
                    Ok(id) => {
                        print_workflow_message(&format, id, &format!("Created workflow {}", id));
                        id
                    }
                    Err(e) => {
                        eprintln!("Error creating workflow from spec: {}", e);
                        std::process::exit(1);
                    }
                }
            } else {
                match workflow_spec_or_id.parse::<i64>() {
                    Ok(id) => id,
                    Err(_) => {
                        eprintln!(
                            "Error: '{}' is neither a valid workflow spec file nor a workflow ID",
                            workflow_spec_or_id
                        );
                        std::process::exit(1);
                    }
                }
            };
            // For an existing workflow (ID path), the schedule_nodes check must
            // query the server since there is no local spec to inspect.
            if !is_spec_file(workflow_spec_or_id) {
                match apis::workflow_actions_api::get_workflow_actions(&config, workflow_id) {
                    Ok(actions) => {
                        let has_schedule_nodes = actions.iter().any(|action| {
                            action.trigger_type == "on_workflow_start"
                                && action.action_type == "schedule_nodes"
                        });
                        if !has_schedule_nodes {
                            eprintln!("Error: Cannot submit workflow {}", workflow_id);
                            eprintln!();
                            eprintln!(
                                "The workflow does not define an on_workflow_start action with schedule_nodes."
                            );
                            eprintln!(
                                "To submit to a scheduler, the workflow must have an action configured."
                            );
                            eprintln!();
                            eprintln!("Or run locally instead:");
                            eprintln!("  torc run {}", workflow_id);
                            std::process::exit(1);
                        }
                    }
                    Err(e) => {
                        eprintln!("Error getting workflow actions: {}", e);
                        std::process::exit(1);
                    }
                }
            }
            match apis::workflows_api::get_workflow(&config, workflow_id) {
                Ok(workflow) => {
                    let torc_config = TorcConfig::load().unwrap_or_default();
                    let workflow_manager =
                        WorkflowManager::new(config.clone(), torc_config, workflow);
                    match workflow_manager.start(
                        *ignore_missing_data,
                        *max_parallel_jobs,
                        output_dir,
                        *poll_interval,
                    ) {
                        Ok(()) => {
                            print_workflow_message(
                                &format,
                                workflow_id,
                                &format!("Successfully submitted workflow {}", workflow_id),
                            );
                        }
                        Err(e) => {
                            eprintln!("Error submitting workflow {}: {}", workflow_id, e);
                            std::process::exit(1);
                        }
                    }
                }
                Err(e) => {
                    eprintln!("Error getting workflow {}: {}", workflow_id, e);
                    std::process::exit(1);
                }
            }
        }
        Commands::Watch {
            workflow_id,
            poll_interval,
            recover,
            max_retries,
            memory_multiplier,
            runtime_multiplier,
            retry_unknown,
            recovery_hook,
            output_dir,
            show_job_counts,
            auto_schedule,
            auto_schedule_threshold,
            auto_schedule_cooldown,
            auto_schedule_stranded_timeout,
            ai_recovery,
            ai_agent,
            partition,
            walltime,
        } => {
            let args = WatchArgs {
                workflow_id: *workflow_id,
                poll_interval: *poll_interval,
                recover: *recover,
                max_retries: *max_retries,
                memory_multiplier: *memory_multiplier,
                runtime_multiplier: *runtime_multiplier,
                retry_unknown: *retry_unknown,
                recovery_hook: recovery_hook.clone(),
                output_dir: output_dir.clone(),
                show_job_counts: *show_job_counts,
                log_level: log_level.clone(),
                auto_schedule: *auto_schedule,
                auto_schedule_threshold: *auto_schedule_threshold,
                auto_schedule_cooldown: *auto_schedule_cooldown,
                auto_schedule_stranded_timeout: *auto_schedule_stranded_timeout,
                ai_recovery: *ai_recovery,
                ai_agent: ai_agent.clone(),
                partition: partition.clone(),
                walltime: walltime.clone(),
            };
            run_watch(&config, &args);
        }
        Commands::Recover {
            workflow_id,
            output_dir,
            memory_multiplier,
            runtime_multiplier,
            retry_unknown,
            recovery_hook,
            dry_run,
            no_prompts,
            ai_recovery,
            ai_agent,
        } => {
            // Only prompt when attached to a TTY and prompting wasn't disabled.
            let interactive = !no_prompts && std::io::stdin().is_terminal();
            let args = RecoverArgs {
                workflow_id: *workflow_id,
                output_dir: output_dir.clone(),
                memory_multiplier: *memory_multiplier,
                runtime_multiplier: *runtime_multiplier,
                retry_unknown: *retry_unknown,
                recovery_hook: recovery_hook.clone(),
                dry_run: *dry_run,
                interactive,
                ai_recovery: *ai_recovery,
                ai_agent: ai_agent.clone(),
            };
            // Collect a failure diagnosis up front only for JSON output, where
            // it is embedded in the final report; failures here are non-fatal.
            let diagnosis = if format == "json" {
                diagnose_failures(&config, *workflow_id).ok()
            } else {
                None
            };
            match recover_workflow(&config, &args) {
                Ok(result) => {
                    if format == "json" {
                        let report = RecoveryReport {
                            workflow_id: *workflow_id,
                            dry_run: *dry_run,
                            memory_multiplier: *memory_multiplier,
                            runtime_multiplier: *runtime_multiplier,
                            result,
                            diagnosis,
                        };
                        println!(
                            "{}",
                            serde_json::to_string_pretty(&report).unwrap_or_else(|e| {
                                format!("{{\"error\": \"Failed to serialize: {}\"}}", e)
                            })
                        );
                    } else if *dry_run {
                        println!("[DRY RUN] Summary for workflow {}", workflow_id);
                        if result.oom_fixed > 0 {
                            println!(
                                "  - {} job(s) would have memory increased",
                                result.oom_fixed
                            );
                        }
                        if result.timeout_fixed > 0 {
                            println!(
                                "  - {} job(s) would have runtime increased",
                                result.timeout_fixed
                            );
                        }
                        if result.unknown_retried > 0 {
                            println!(
                                "  - {} job(s) with unknown failures would be reset",
                                result.unknown_retried
                            );
                        }
                        if result.jobs_to_retry.is_empty() {
                            println!("No recoverable jobs found.");
                        } else {
                            println!(
                                "Would reset {} job(s) and regenerate Slurm schedulers.",
                                result.jobs_to_retry.len()
                            );
                        }
                        println!("\nRun without --dry-run to apply these changes.");
                    } else {
                        println!("Recovery complete for workflow {}", workflow_id);
                        if result.oom_fixed > 0 {
                            println!("  - {} job(s) had memory increased", result.oom_fixed);
                        }
                        if result.timeout_fixed > 0 {
                            println!("  - {} job(s) had runtime increased", result.timeout_fixed);
                        }
                        if result.unknown_retried > 0 {
                            println!(
                                "  - {} job(s) with unknown failures reset",
                                result.unknown_retried
                            );
                        }
                        if result.jobs_to_retry.is_empty() {
                            println!("No recoverable jobs found.");
                        } else {
                            println!(
                                "Reset {} job(s). Slurm schedulers regenerated and submitted.",
                                result.jobs_to_retry.len()
                            );
                        }
                    }
                }
                Err(e) => {
                    if format == "json" {
                        println!(
                            "{}",
                            serde_json::json!({
                                "error": e,
                                "workflow_id": workflow_id,
                                "dry_run": dry_run,
                            })
                        );
                        std::process::exit(1);
                    } else {
                        eprintln!("Recovery failed: {}", e);
                        std::process::exit(1);
                    }
                }
            }
        }
        Commands::Cancel { workflow_id } => {
            handle_cancel(&config, workflow_id, &format);
        }
        Commands::Status { workflow_id } => {
            torc::client::commands::reports::generate_summary(&config, *workflow_id, &format);
        }
        Commands::Delete {
            workflow_ids,
            force,
        } => {
            torc::client::commands::workflows::handle_delete(
                &config,
                workflow_ids,
                *force,
                &format,
            );
        }
        Commands::Workflows { command } => {
            handle_workflow_commands(&config, command, &format);
        }
        Commands::ComputeNodes { command } => {
            handle_compute_node_commands(&config, command, &format);
        }
        Commands::Files { command } => {
            handle_file_commands(&config, command, &format);
        }
        Commands::Jobs { command } => {
            handle_job_commands(&config, command, &format);
        }
        Commands::JobDependencies { command } => {
            handle_job_dependency_commands(command, &config, &format);
        }
        Commands::ResourceRequirements { command } => {
            handle_resource_requirements_commands(&config, command, &format);
        }
        Commands::FailureHandlers { command } => {
            handle_failure_handler_commands(&config, command, &format);
        }
        Commands::RoCrate { command } => {
            handle_ro_crate_commands(&config, command, &format);
        }
        Commands::Events { command } => {
            handle_event_commands(&config, command, &format);
        }
        Commands::Results { command } => {
            handle_result_commands(&config, command, &format);
        }
        Commands::UserData { command } => {
            handle_user_data_commands(&config, command, &format);
        }
        Commands::Slurm { command } => {
            handle_slurm_commands(&config, command, &format);
        }
        Commands::Remote { command } => {
            handle_remote_commands(&config, command);
        }
        Commands::ScheduledComputeNodes { command } => {
            handle_scheduled_compute_node_commands(&config, command, &format);
        }
        Commands::Hpc { command } => {
            handle_hpc_commands(command, &format);
        }
        Commands::Logs { command } => {
            handle_log_commands(&config, command);
        }
        Commands::AccessGroups { command } => {
            handle_access_group_commands(&config, command, &format);
        }
        Commands::Admin { command } => {
            handle_admin_commands(&config, command, &format);
        }
        Commands::Config { command } => {
            handle_config_commands(command);
        }
        Commands::Tui(args) => {
            let basic_auth = config.basic_auth.clone();
            if let Err(e) = tui_runner::run(args, basic_auth) {
                eprintln!("Error running TUI: {}", e);
                std::process::exit(1);
            }
        }
        Commands::PlotResources(args) => {
            if let Err(e) = plot_resources_cmd::run(args) {
                eprintln!("Error generating plots: {}", e);
                std::process::exit(1);
            }
        }
        // Fix: use the resolved `format` (CLI merged with config file) like every
        // other command, instead of the raw `cli.format`, so `format = "json"`
        // in the config file is honored by `ping` as well.
        Commands::Ping => match apis::system_api::ping(&config) {
            Ok(_) => {
                if format == "json" {
                    println!(r#"{{"status": "Server is running"}}"#);
                } else {
                    println!("Server is running");
                }
            }
            Err(e) => {
                if format == "json" {
                    println!(
                        r#"{{"status": "error", "message": "{}"}}"#,
                        e.to_string().replace('"', "\\\"")
                    );
                } else {
                    eprintln!("Failed to connect to server: {}", e);
                }
                std::process::exit(1);
            }
        },
        Commands::Completions { shell } => {
            let mut cmd = Cli::command();
            clap_complete::generate(*shell, &mut cmd, "torc", &mut std::io::stdout());
        }
    }
}