use log::{debug, info, warn};
use serde::Serialize;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::process::Command;
use crate::client::apis::configuration::Configuration;
use crate::client::apis::default_api;
use crate::client::commands::slurm::RegenerateDryRunResult;
use crate::client::report_models::{ResourceUtilizationReport, ResultsReport};
use crate::client::resource_correction::{
ResourceAdjustmentReport, ResourceCorrectionContext, ResourceCorrectionOptions,
apply_resource_corrections,
};
use crate::models::JobStatus;
pub struct RecoverArgs {
pub workflow_id: i64,
pub output_dir: PathBuf,
pub memory_multiplier: f64,
pub runtime_multiplier: f64,
pub retry_unknown: bool,
pub recovery_hook: Option<String>,
pub dry_run: bool,
pub ai_recovery: bool,
pub ai_agent: String,
}
#[derive(Debug, Clone, Serialize)]
pub struct RecoveryResult {
pub oom_fixed: usize,
pub timeout_fixed: usize,
pub unknown_retried: usize,
pub other_failures: usize,
pub jobs_to_retry: Vec<i64>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub adjustments: Vec<ResourceAdjustmentReport>,
#[serde(skip_serializing_if = "Option::is_none")]
pub slurm_dry_run: Option<RegenerateDryRunResult>,
}
#[derive(Debug, Clone, Serialize)]
pub struct RecoveryReport {
pub workflow_id: i64,
pub dry_run: bool,
pub memory_multiplier: f64,
pub runtime_multiplier: f64,
pub result: RecoveryResult,
#[serde(skip_serializing_if = "Option::is_none")]
pub diagnosis: Option<ResourceUtilizationReport>,
}
#[derive(Debug)]
pub struct SlurmLogInfo {
pub slurm_job_id: Option<String>,
pub slurm_stdout: Option<String>,
pub slurm_stderr: Option<String>,
}
pub fn recover_workflow(
config: &Configuration,
args: &RecoverArgs,
) -> Result<RecoveryResult, String> {
if args.dry_run {
info!("Recovery dry_run workflow_id={}", args.workflow_id);
}
info!("Orphan check workflow_id={}", args.workflow_id);
match super::orphan_detection::cleanup_orphaned_jobs(config, args.workflow_id, args.dry_run) {
Ok(result) => {
if result.any_cleaned() {
if args.dry_run {
info!(
"Orphan cleanup dry_run workflow_id={} slurm_jobs={} pending_allocations={} running_jobs={}",
args.workflow_id,
result.slurm_jobs_failed,
result.pending_allocations_cleaned,
result.running_jobs_failed
);
} else {
info!(
"Orphans cleaned workflow_id={} slurm_jobs_failed={} pending_allocations_cleaned={} running_jobs_failed={}",
args.workflow_id,
result.slurm_jobs_failed,
result.pending_allocations_cleaned,
result.running_jobs_failed
);
}
} else {
info!("No orphans found workflow_id={}", args.workflow_id);
}
}
Err(e) => {
warn!(
"Orphan cleanup error workflow_id={} error={}",
args.workflow_id, e
);
}
}
let pending_failed_count = count_pending_failed_jobs(config, args.workflow_id).unwrap_or(0);
if pending_failed_count > 0 {
if args.ai_recovery {
info!(
"[EXPERIMENTAL] AI recovery: {} job(s) in pending_failed status",
pending_failed_count
);
info!("These jobs failed without a matching failure handler rule.");
if args.dry_run {
info!(
"[DRY RUN] Would invoke AI agent '{}' for classification",
args.ai_agent
);
} else {
match invoke_ai_agent(args.workflow_id, &args.ai_agent, &args.output_dir) {
Ok(()) => {
let remaining =
count_pending_failed_jobs(config, args.workflow_id).unwrap_or(0);
if remaining > 0 {
warn!(
"{} job(s) still in pending_failed status after AI classification",
remaining
);
} else {
info!("All pending_failed jobs have been classified");
}
}
Err(e) => {
warn!("AI agent invocation failed: {}", e);
warn!("You can manually classify jobs using the torc MCP server:");
warn!(" 1. list_pending_failed_jobs - View jobs with their stderr");
warn!(" 2. classify_and_resolve_failures - Apply retry/fail decisions");
warn!(
"Or reset them manually: torc workflows reset-status {} --failed-only",
args.workflow_id
);
}
}
}
} else {
warn!(
"{} job(s) in pending_failed status (awaiting classification)",
pending_failed_count
);
warn!("Use --ai-recovery to enable AI-assisted classification via MCP tools.");
warn!(
"Or reset them manually: torc workflows reset-status {} --failed-only",
args.workflow_id
);
}
}
check_recovery_preconditions(config, args.workflow_id)?;
info!("Diagnosing failures...");
let diagnosis = diagnose_failures(args.workflow_id, &args.output_dir)?;
if args.dry_run {
info!("[DRY RUN] Proposed resource adjustments:");
} else {
info!("Applying recovery heuristics...");
}
let mut result = apply_recovery_heuristics(
config,
args.workflow_id,
&diagnosis,
args.memory_multiplier,
args.runtime_multiplier,
args.retry_unknown,
&args.output_dir,
args.dry_run,
)?;
if result.oom_fixed > 0 || result.timeout_fixed > 0 {
if args.dry_run {
info!(
" Would apply fixes: {} OOM, {} timeout",
result.oom_fixed, result.timeout_fixed
);
} else {
info!(
" Applied fixes: {} OOM, {} timeout",
result.oom_fixed, result.timeout_fixed
);
}
}
if result.other_failures > 0 {
if args.retry_unknown {
if args.recovery_hook.is_some() {
info!(
" {} job(s) with unknown failure cause (would run recovery hook)",
result.other_failures
);
} else {
info!(
" {} job(s) with unknown failure cause (would retry)",
result.other_failures
);
}
result.unknown_retried = result.other_failures;
} else {
info!(
" {} job(s) with unknown failure cause (skipped, use --retry-unknown to include)",
result.other_failures
);
}
}
if args.dry_run {
if result.jobs_to_retry.is_empty() {
info!("[DRY RUN] No recoverable jobs found.");
} else {
info!(
"[DRY RUN] Would reset {} job(s) for retry",
result.jobs_to_retry.len()
);
info!("[DRY RUN] Would reinitialize workflow");
info!("[DRY RUN] Slurm schedulers that would be created:");
match get_scheduler_dry_run(args.workflow_id, &args.output_dir, &result.jobs_to_retry) {
Ok(mut dry_run_result) => {
for sched in &mut dry_run_result.planned_schedulers {
for adj in &result.adjustments {
let has_matching_job = sched
.job_names
.iter()
.any(|name| adj.job_names.contains(name));
if has_matching_job {
if adj.memory_adjusted
&& let Some(ref new_mem) = adj.new_memory
{
sched.mem = Some(new_mem.clone());
}
break;
}
}
}
for sched in &dry_run_result.planned_schedulers {
let deps = if sched.has_dependencies {
" (deferred)"
} else {
""
};
info!(
" {} - {} job(s), {} allocation(s){}",
sched.name, sched.job_count, sched.num_allocations, deps
);
info!(
" Account: {}, Partition: {}, Walltime: {}, Nodes: {}, Mem: {}",
sched.account,
sched.partition.as_deref().unwrap_or("default"),
sched.walltime,
sched.nodes,
sched.mem.as_deref().unwrap_or("default")
);
}
info!(
"[DRY RUN] Total: {} allocation(s) would be submitted",
dry_run_result.total_allocations
);
dry_run_result.would_submit = true;
result.slurm_dry_run = Some(dry_run_result);
}
Err(e) => {
warn!(" Could not get scheduler preview: {}", e);
info!(
"[DRY RUN] Would submit Slurm allocations for {} job(s)",
result.jobs_to_retry.len()
);
}
}
}
return Ok(result);
}
if result.other_failures > 0
&& let Some(ref hook_cmd) = args.recovery_hook
{
info!(
"{} job(s) with unknown failure cause - running recovery hook...",
result.other_failures
);
run_recovery_hook(args.workflow_id, hook_cmd)?;
}
if result.jobs_to_retry.is_empty() {
return Err(format!(
"No recoverable jobs found. {} job(s) failed with unknown causes. \
Use --retry-unknown to retry jobs with unknown failure causes.",
result.other_failures
));
}
info!(
"Jobs resetting workflow_id={} count={}",
args.workflow_id,
result.jobs_to_retry.len()
);
let reset_count = reset_failed_jobs(config, args.workflow_id, &result.jobs_to_retry)?;
info!(
"Jobs reset workflow_id={} count={}",
args.workflow_id, reset_count
);
info!("Workflow reinitializing workflow_id={}", args.workflow_id);
reinitialize_workflow(args.workflow_id)?;
info!("Schedulers regenerating workflow_id={}", args.workflow_id);
regenerate_and_submit(args.workflow_id, &args.output_dir)?;
Ok(result)
}
fn check_recovery_preconditions(config: &Configuration, workflow_id: i64) -> Result<(), String> {
let is_complete = default_api::is_workflow_complete(config, workflow_id)
.map_err(|e| format!("Failed to check workflow completion status: {}", e))?;
if !is_complete.is_complete && !is_complete.is_canceled {
return Err("Cannot recover: workflow is not complete. \
Wait for all jobs to finish or use 'torc workflows cancel' first."
.to_string());
}
let active_nodes = default_api::list_compute_nodes(
config,
workflow_id,
None, Some(1), None, None, None, Some(true), None, )
.map_err(|e| format!("Failed to check for active compute nodes: {}", e))?;
if let Some(nodes) = active_nodes.items
&& !nodes.is_empty()
{
return Err("Cannot recover: there are still active compute nodes. \
Wait for all workers to exit."
.to_string());
}
let pending_scn = default_api::list_scheduled_compute_nodes(
config,
workflow_id,
None, Some(1), None, None, None, None, Some("pending"), )
.map_err(|e| format!("Failed to check for pending scheduled compute nodes: {}", e))?;
if pending_scn.total_count > 0 {
return Err("Cannot recover: there are pending Slurm allocations. \
Wait for them to start or cancel them with 'torc slurm cancel'."
.to_string());
}
let active_scn = default_api::list_scheduled_compute_nodes(
config,
workflow_id,
None, Some(1), None, None, None, None, Some("active"), )
.map_err(|e| format!("Failed to check for active scheduled compute nodes: {}", e))?;
if active_scn.total_count > 0 {
return Err(
"Cannot recover: there are active Slurm allocations still running. \
Wait for all workers to exit."
.to_string(),
);
}
let failed_jobs = default_api::list_jobs(
config,
workflow_id,
Some(crate::models::JobStatus::Failed), None, None, None, Some(1), None, None, None, None, )
.map_err(|e| format!("Failed to list failed jobs: {}", e))?;
let terminated_jobs = default_api::list_jobs(
config,
workflow_id,
Some(crate::models::JobStatus::Terminated), None, None, None, Some(1), None, None, None, None, )
.map_err(|e| format!("Failed to list terminated jobs: {}", e))?;
if failed_jobs.total_count == 0 && terminated_jobs.total_count == 0 {
return Err("No failed or terminated jobs to recover. \
Workflow may have completed successfully."
.to_string());
}
Ok(())
}
pub fn invoke_ai_agent(workflow_id: i64, agent: &str, output_dir: &Path) -> Result<(), String> {
let prompt = format!(
"You are helping recover a Torc workflow. Workflow {} has jobs in 'pending_failed' status \
that need classification. \n\n\
Please use the torc MCP tools to:\n\
1. Call list_pending_failed_jobs with workflow_id={} to see the jobs and their stderr\n\
2. Analyze each job's stderr to determine if the error is transient (retry) or permanent (fail)\n\
3. Call classify_and_resolve_failures with your classifications\n\n\
The output directory is: {}\n\n\
After classification, the workflow can continue with recovery.",
workflow_id,
workflow_id,
output_dir.display()
);
info!(
"[EXPERIMENTAL] Invoking AI agent '{}' for pending_failed classification...",
agent
);
match agent {
"claude" => {
let check = Command::new("claude").arg("--version").output();
match check {
Ok(output) if output.status.success() => {
}
Ok(_) | Err(_) => {
return Err(
"Claude CLI not found. Install it from https://claude.ai/code \
or use --ai-agent to specify a different agent."
.to_string(),
);
}
}
info!("Running: claude --print \"<prompt>\"");
let output = Command::new("claude")
.arg("--print")
.arg(&prompt)
.output()
.map_err(|e| format!("Failed to run claude CLI: {}", e))?;
if !output.stdout.is_empty() {
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
info!("[claude] {}", line);
}
}
if !output.stderr.is_empty() {
let stderr = String::from_utf8_lossy(&output.stderr);
for line in stderr.lines() {
warn!("[claude] {}", line);
}
}
if !output.status.success() {
let exit_code = output.status.code().unwrap_or(-1);
return Err(format!("Claude CLI exited with code {}", exit_code));
}
info!("AI agent completed classification");
Ok(())
}
"copilot" | "github-copilot" => {
let check = Command::new("gh").arg("--version").output();
match check {
Ok(output) if output.status.success() => {
}
Ok(_) | Err(_) => {
return Err(
"GitHub CLI (gh) not found. Install it from https://cli.github.com/ \
or use --ai-agent to specify a different agent."
.to_string(),
);
}
}
info!("Running: gh copilot suggest \"<prompt>\"");
let output = Command::new("gh")
.args(["copilot", "suggest", &prompt])
.output()
.map_err(|e| format!("Failed to run gh copilot: {}", e))?;
if !output.stdout.is_empty() {
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
info!("[copilot] {}", line);
}
}
if !output.stderr.is_empty() {
let stderr = String::from_utf8_lossy(&output.stderr);
for line in stderr.lines() {
warn!("[copilot] {}", line);
}
}
if !output.status.success() {
let exit_code = output.status.code().unwrap_or(-1);
return Err(format!("GitHub Copilot CLI exited with code {}", exit_code));
}
info!("AI agent completed classification");
Ok(())
}
other => Err(format!(
"Unsupported AI agent '{}'. Supported agents: claude, copilot",
other
)),
}
}
fn count_pending_failed_jobs(config: &Configuration, workflow_id: i64) -> Result<i64, String> {
let pending_failed_jobs = default_api::list_jobs(
config,
workflow_id,
Some(JobStatus::PendingFailed),
None, None, None, Some(1), None, None, None, None, )
.map_err(|e| format!("Failed to list pending_failed jobs: {}", e))?;
Ok(pending_failed_jobs.total_count)
}
pub fn diagnose_failures(
workflow_id: i64,
_output_dir: &Path,
) -> Result<ResourceUtilizationReport, String> {
let output = Command::new("torc")
.args([
"-f",
"json",
"reports",
"check-resource-utilization",
&workflow_id.to_string(),
"--include-failed",
])
.output()
.map_err(|e| format!("Failed to run check-resource-utilization: {}", e))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(format!("check-resource-utilization failed: {}", stderr));
}
let stdout = String::from_utf8_lossy(&output.stdout);
serde_json::from_str(&stdout)
.map_err(|e| format!("Failed to parse resource utilization output: {}", e))
}
fn get_slurm_log_info(workflow_id: i64, output_dir: &Path) -> Result<ResultsReport, String> {
let output = Command::new("torc")
.args([
"-f",
"json",
"reports",
"results",
&workflow_id.to_string(),
"-o",
output_dir.to_str().unwrap_or("torc_output"),
])
.output()
.map_err(|e| format!("Failed to run reports results: {}", e))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(format!("reports results failed: {}", stderr));
}
let stdout = String::from_utf8_lossy(&output.stdout);
serde_json::from_str(&stdout)
.map_err(|e| format!("Failed to parse reports results output: {}", e))
}
fn correlate_slurm_logs(
diagnosis: &ResourceUtilizationReport,
slurm_info: &ResultsReport,
) -> HashMap<i64, SlurmLogInfo> {
let mut log_map = HashMap::new();
for result in &slurm_info.results {
if result.slurm_stdout.is_some() || result.slurm_stderr.is_some() {
log_map.insert(
result.job_id,
SlurmLogInfo {
slurm_job_id: result.slurm_job_id.clone(),
slurm_stdout: result.slurm_stdout.clone(),
slurm_stderr: result.slurm_stderr.clone(),
},
);
}
}
let mut failed_log_map = HashMap::new();
for violation in &diagnosis.resource_violations {
if let Some(log_info) = log_map.remove(&violation.job_id) {
failed_log_map.insert(violation.job_id, log_info);
}
}
failed_log_map
}
#[allow(clippy::too_many_arguments)]
pub fn apply_recovery_heuristics(
config: &Configuration,
workflow_id: i64,
diagnosis: &ResourceUtilizationReport,
memory_multiplier: f64,
runtime_multiplier: f64,
retry_unknown: bool,
output_dir: &Path,
dry_run: bool,
) -> Result<RecoveryResult, String> {
let slurm_log_map = match get_slurm_log_info(workflow_id, output_dir) {
Ok(slurm_info) => {
let log_map = correlate_slurm_logs(diagnosis, &slurm_info);
if !log_map.is_empty() {
info!(" Found Slurm logs for {} failed job(s)", log_map.len());
}
log_map
}
Err(e) => {
debug!("Could not get Slurm log info: {}", e);
HashMap::new()
}
};
for violation in &diagnosis.resource_violations {
if let Some(slurm_info) = slurm_log_map.get(&violation.job_id)
&& let Some(slurm_job_id) = &slurm_info.slurm_job_id
{
debug!(
" Job {} ran in Slurm allocation {}",
violation.job_id, slurm_job_id
);
}
}
let mut other_failures = 0;
let mut unknown_job_ids = Vec::new();
for violation in &diagnosis.resource_violations {
if !violation.memory_violation && !violation.likely_timeout {
other_failures += 1;
if retry_unknown {
unknown_job_ids.push(violation.job_id);
}
}
}
let correction_ctx = ResourceCorrectionContext {
config,
workflow_id,
diagnosis,
all_results: &[],
all_jobs: &[],
all_resource_requirements: &[],
};
let correction_opts = ResourceCorrectionOptions {
memory_multiplier,
cpu_multiplier: memory_multiplier, runtime_multiplier,
include_jobs: vec![],
dry_run,
no_downsize: true,
};
let correction_result = apply_resource_corrections(&correction_ctx, &correction_opts)?;
let oom_fixed = correction_result.memory_corrections;
let timeout_fixed = correction_result.runtime_corrections;
let mut jobs_to_retry = Vec::new();
for adj in &correction_result.adjustments {
jobs_to_retry.extend(&adj.job_ids);
}
jobs_to_retry.extend(&unknown_job_ids);
Ok(RecoveryResult {
oom_fixed,
timeout_fixed,
unknown_retried: unknown_job_ids.len(),
other_failures,
jobs_to_retry,
adjustments: correction_result.adjustments,
slurm_dry_run: None, })
}
pub fn reset_failed_jobs(
_config: &Configuration,
workflow_id: i64,
job_ids: &[i64],
) -> Result<usize, String> {
if job_ids.is_empty() {
return Ok(0);
}
let job_count = job_ids.len();
let output = Command::new("torc")
.args([
"workflows",
"reset-status",
&workflow_id.to_string(),
"--failed-only",
"--no-prompts",
])
.output()
.map_err(|e| format!("Failed to run workflow reset-status: {}", e))?;
if !output.stdout.is_empty() {
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
info!(" {}", line);
}
}
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(format!("workflow reset-status failed: {}", stderr));
}
Ok(job_count)
}
pub fn reinitialize_workflow(workflow_id: i64) -> Result<(), String> {
let output = Command::new("torc")
.args(["workflows", "reinitialize", &workflow_id.to_string()])
.output()
.map_err(|e| format!("Failed to run workflow reinitialize: {}", e))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(format!("workflow reinitialize failed: {}", stderr));
}
Ok(())
}
pub fn run_recovery_hook(workflow_id: i64, hook_command: &str) -> Result<(), String> {
info!("Running recovery hook: {}", hook_command);
let parts = shlex::split(hook_command)
.ok_or_else(|| format!("Invalid quoting in recovery hook command: {}", hook_command))?;
if parts.is_empty() {
return Err("Recovery hook command is empty".to_string());
}
let program = &parts[0];
let program_path = if !program.contains('/') && std::path::Path::new(program).exists() {
format!("./{}", program)
} else {
program.to_string()
};
let mut cmd = Command::new(&program_path);
if parts.len() > 1 {
cmd.args(&parts[1..]);
}
cmd.arg(workflow_id.to_string());
cmd.env("TORC_WORKFLOW_ID", workflow_id.to_string());
let output = cmd
.output()
.map_err(|e| format!("Failed to execute recovery hook '{}': {}", hook_command, e))?;
if !output.stdout.is_empty() {
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
info!(" [hook] {}", line);
}
}
if !output.stderr.is_empty() {
let stderr = String::from_utf8_lossy(&output.stderr);
for line in stderr.lines() {
warn!(" [hook] {}", line);
}
}
if !output.status.success() {
let exit_code = output.status.code().unwrap_or(-1);
return Err(format!(
"Recovery hook '{}' failed with exit code {}",
hook_command, exit_code
));
}
info!("Recovery hook completed successfully");
Ok(())
}
pub fn regenerate_and_submit(workflow_id: i64, output_dir: &Path) -> Result<(), String> {
let output = Command::new("torc")
.args([
"slurm",
"regenerate",
&workflow_id.to_string(),
"--submit",
"-o",
output_dir.to_str().unwrap_or("torc_output"),
])
.output()
.map_err(|e| format!("Failed to run slurm regenerate: {}", e))?;
if !output.stdout.is_empty() {
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
info!(" {}", line);
}
}
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(format!("slurm regenerate failed: {}", stderr));
}
Ok(())
}
fn get_scheduler_dry_run(
workflow_id: i64,
output_dir: &Path,
job_ids: &[i64],
) -> Result<RegenerateDryRunResult, String> {
let job_ids_str = job_ids
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
.join(",");
let output = Command::new("torc")
.args([
"-f",
"json",
"slurm",
"regenerate",
&workflow_id.to_string(),
"--dry-run",
"--include-job-ids",
&job_ids_str,
"-o",
output_dir.to_str().unwrap_or("torc_output"),
])
.output()
.map_err(|e| format!("Failed to run slurm regenerate --dry-run: {}", e))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(format!("slurm regenerate --dry-run failed: {}", stderr));
}
let stdout = String::from_utf8_lossy(&output.stdout);
serde_json::from_str(&stdout)
.map_err(|e| format!("Failed to parse slurm regenerate dry-run output: {}", e))
}