use rmcp::{
ErrorData as McpError,
model::{CallToolResult, RawResource, Resource, ResourceContents},
};
use std::fs;
use std::path::Path;
use std::process::Command;
use crate::client::apis::configuration::Configuration;
use crate::client::apis::default_api;
use crate::client::commands::pagination::jobs::{JobListParams, paginate_jobs};
use crate::client::commands::pagination::resource_requirements::{
ResourceRequirementsListParams, paginate_resource_requirements,
};
use crate::client::commands::pagination::results::{ResultListParams, paginate_results};
use crate::client::log_paths;
use crate::client::resource_correction::format_memory_bytes_short;
use crate::models::{JobStatus, ResourceRequirementsModel};
fn internal_error(msg: String) -> McpError {
McpError::internal_error(msg, None)
}
fn invalid_params(msg: &str) -> McpError {
McpError::invalid_request(msg.to_string(), None)
}
pub fn get_workflow_status(
config: &Configuration,
workflow_id: i64,
) -> Result<CallToolResult, McpError> {
let workflow = default_api::get_workflow(config, workflow_id)
.map_err(|e| internal_error(format!("Failed to get workflow: {}", e)))?;
let jobs = paginate_jobs(config, workflow_id, JobListParams::new())
.map_err(|e| internal_error(format!("Failed to list jobs: {}", e)))?;
let mut status_counts = std::collections::HashMap::new();
for job in &jobs {
if let Some(status) = &job.status {
let status_str = format!("{:?}", status);
*status_counts.entry(status_str).or_insert(0) += 1;
}
}
let result = serde_json::json!({
"workflow_id": workflow.id,
"name": workflow.name,
"user": workflow.user,
"description": workflow.description,
"total_jobs": jobs.len(),
"job_counts_by_status": status_counts,
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&result).unwrap_or_default(),
)]))
}
pub fn get_job_details(config: &Configuration, job_id: i64) -> Result<CallToolResult, McpError> {
let job = default_api::get_job(config, job_id)
.map_err(|e| internal_error(format!("Failed to get job: {}", e)))?;
let resource_reqs = if let Some(req_id) = job.resource_requirements_id {
default_api::get_resource_requirements(config, req_id).ok()
} else {
None
};
let result = paginate_results(
config,
job.workflow_id,
ResultListParams::new().with_job_id(job_id).with_limit(1),
)
.ok()
.and_then(|items| items.into_iter().next());
let response = serde_json::json!({
"job_id": job.id,
"workflow_id": job.workflow_id,
"name": job.name,
"command": job.command,
"status": format!("{:?}", job.status),
"invocation_script": job.invocation_script,
"supports_termination": job.supports_termination,
"cancel_on_blocking_job_failure": job.cancel_on_blocking_job_failure,
"depends_on_job_ids": job.depends_on_job_ids,
"resource_requirements": resource_reqs.map(|r| serde_json::json!({
"id": r.id,
"num_cpus": r.num_cpus,
"num_gpus": r.num_gpus,
"memory": r.memory,
"runtime": r.runtime,
})),
"latest_result": result.map(|r| serde_json::json!({
"run_id": r.run_id,
"return_code": r.return_code,
"exec_time_minutes": r.exec_time_minutes,
"completion_time": r.completion_time,
"peak_memory_bytes": r.peak_memory_bytes,
"avg_cpu_percent": r.avg_cpu_percent,
})),
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&response).unwrap_or_default(),
)]))
}
pub fn get_job_logs(
output_dir: &Path,
workflow_id: i64,
job_id: i64,
run_id: i64,
attempt_id: i64,
log_type: &str,
tail_lines: Option<usize>,
) -> Result<CallToolResult, McpError> {
let log_path = match log_type.to_lowercase().as_str() {
"stdout" => {
log_paths::get_job_stdout_path(output_dir, workflow_id, job_id, run_id, attempt_id)
}
"stderr" => {
log_paths::get_job_stderr_path(output_dir, workflow_id, job_id, run_id, attempt_id)
}
_ => return Err(invalid_params("log_type must be 'stdout' or 'stderr'")),
};
let content = fs::read_to_string(&log_path)
.map_err(|e| internal_error(format!("Failed to read log file {}: {}", log_path, e)))?;
let output = if let Some(n) = tail_lines {
let lines: Vec<&str> = content.lines().collect();
let start = lines.len().saturating_sub(n);
lines[start..].join("\n")
} else {
content
};
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
output,
)]))
}
pub fn list_failed_jobs(
config: &Configuration,
workflow_id: i64,
) -> Result<CallToolResult, McpError> {
let jobs = paginate_jobs(
config,
workflow_id,
JobListParams::new().with_status(JobStatus::Failed),
)
.map_err(|e| internal_error(format!("Failed to list jobs: {}", e)))?;
let failed_jobs: Vec<serde_json::Value> = jobs
.iter()
.map(|job| {
serde_json::json!({
"job_id": job.id,
"name": job.name,
"command": job.command,
})
})
.collect();
let result = serde_json::json!({
"workflow_id": workflow_id,
"failed_job_count": failed_jobs.len(),
"failed_jobs": failed_jobs,
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&result).unwrap_or_default(),
)]))
}
pub fn list_jobs_by_status(
config: &Configuration,
workflow_id: i64,
status: &str,
) -> Result<CallToolResult, McpError> {
let status_enum: JobStatus = status
.to_lowercase()
.parse()
.map_err(|_| invalid_params("Invalid status value"))?;
let jobs = paginate_jobs(
config,
workflow_id,
JobListParams::new().with_status(status_enum),
)
.map_err(|e| internal_error(format!("Failed to list jobs: {}", e)))?;
let job_list: Vec<serde_json::Value> = jobs
.iter()
.map(|job| {
serde_json::json!({
"job_id": job.id,
"name": job.name,
"command": job.command,
})
})
.collect();
let result = serde_json::json!({
"workflow_id": workflow_id,
"status": status,
"count": job_list.len(),
"jobs": job_list,
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&result).unwrap_or_default(),
)]))
}
pub fn check_resource_utilization(
workflow_id: i64,
include_failed: bool,
) -> Result<CallToolResult, McpError> {
let mut cmd = Command::new("torc");
cmd.args(["-f", "json", "reports", "check-resource-utilization"]);
cmd.arg(workflow_id.to_string());
if include_failed {
cmd.arg("--include-failed");
}
let output = cmd
.output()
.map_err(|e| internal_error(format!("Failed to execute torc command: {}", e)))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(internal_error(format!(
"torc command failed: {}",
stderr.trim()
)));
}
let stdout = String::from_utf8_lossy(&output.stdout);
let mut response = stdout.to_string();
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&stdout) {
let over_count = json
.get("over_utilization_count")
.and_then(|v| v.as_i64())
.unwrap_or(0);
let resource_violations_count = json
.get("resource_violations")
.and_then(|v| v.as_array())
.map(|a| a.len())
.unwrap_or(0);
if over_count > 0 {
response.push_str("\n\n[RECOVERABLE RESOURCE ISSUES DETECTED!");
response.push_str(&format!(
"\n{} job(s) exceeded their resource allocations.",
over_count
));
if resource_violations_count > 0 {
response.push_str(&format!(
"\n{} resource violations detected (jobs may have multiple violations).",
resource_violations_count
));
}
response
.push_str("\n\nUSE THE recover_workflow TOOL TO AUTOMATICALLY FIX THESE ISSUES:");
response.push_str(
"\n1. Call recover_workflow with dry_run=true to preview the recovery actions",
);
response.push_str(
"\n2. Show the user the preview (memory/runtime adjustments for each job)",
);
response.push_str(
"\n3. Ask user: 'Would you like me to proceed with these recovery actions?'",
);
response
.push_str("\n4. If approved, call recover_workflow with dry_run=false to execute");
response.push_str(&format!(
"\n\nExample: recover_workflow(workflow_id={}, dry_run=true)]",
workflow_id
));
}
}
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
response,
)]))
}
pub fn update_job_resources(
config: &Configuration,
job_id: i64,
num_cpus: Option<i64>,
memory: Option<String>,
runtime: Option<String>,
) -> Result<CallToolResult, McpError> {
let job = default_api::get_job(config, job_id)
.map_err(|e| internal_error(format!("Failed to get job: {}", e)))?;
let req_id = job
.resource_requirements_id
.ok_or_else(|| invalid_params("Job does not have resource requirements to update"))?;
let mut reqs = default_api::get_resource_requirements(config, req_id)
.map_err(|e| internal_error(format!("Failed to get resource requirements: {}", e)))?;
if let Some(cpus) = num_cpus {
reqs.num_cpus = cpus;
}
if let Some(mem) = memory {
reqs.memory = mem;
}
if let Some(rt) = runtime {
reqs.runtime = rt;
}
let updated = default_api::update_resource_requirements(
config,
req_id,
ResourceRequirementsModel {
id: reqs.id,
workflow_id: reqs.workflow_id,
name: reqs.name.clone(),
num_cpus: reqs.num_cpus,
num_gpus: reqs.num_gpus,
num_nodes: reqs.num_nodes,
memory: reqs.memory.clone(),
runtime: reqs.runtime.clone(),
},
)
.map_err(|e| internal_error(format!("Failed to update resource requirements: {}", e)))?;
let workflow_id = job.workflow_id;
let result = serde_json::json!({
"success": true,
"job_id": job_id,
"workflow_id": workflow_id,
"resource_requirements_id": req_id,
"updated": {
"num_cpus": updated.num_cpus,
"num_gpus": updated.num_gpus,
"memory": updated.memory,
"runtime": updated.runtime,
},
"next_steps": {
"note": "Resource updated. To restart the workflow after fixing all issues, \
use the recover_workflow tool (recommended) or manual commands.",
"recommended": format!(
"recover_workflow(workflow_id={}, dry_run=true) to preview, then dry_run=false to execute",
workflow_id
),
}
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&result).unwrap_or_default(),
)]))
}
#[allow(clippy::too_many_arguments)]
pub fn create_workflow(
config: &Configuration,
spec_json: &str,
user: &str,
action: &str,
workflow_type: &str,
account: Option<&str>,
hpc_profile: Option<&str>,
output_path: Option<&str>,
) -> Result<CallToolResult, McpError> {
use crate::client::workflow_spec::WorkflowSpec;
use std::io::Write;
if action != "create_workflow" && action != "save_spec_file" && action != "validate" {
return Err(invalid_params(
"action must be 'validate', 'create_workflow' or 'save_spec_file'",
));
}
if workflow_type != "local" && workflow_type != "slurm" {
return Err(invalid_params("workflow_type must be 'local' or 'slurm'"));
}
if workflow_type == "slurm" && account.is_none() {
let prompt_msg = serde_json::json!({
"status": "need_input",
"message": "Slurm workflows require an account for job submission.",
"action_required": "Please ask the user: What Slurm account should be used for this workflow? (This is typically a project or allocation name like 'myproject' or 'research-gpu')",
"then": "Call this tool again with the account parameter set to the user's response."
});
return Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&prompt_msg).unwrap_or_default(),
)]));
}
if action == "save_spec_file" && output_path.is_none() {
return Err(invalid_params(
"output_path is required for save_spec_file action",
));
}
let spec: serde_json::Value = serde_json::from_str(spec_json)
.map_err(|e| invalid_params(&format!("Invalid workflow spec JSON: {}", e)))?;
let name = spec
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("unnamed")
.to_string();
if workflow_type == "slurm" {
let has_resource_reqs = spec
.get("resource_requirements")
.and_then(|v| v.as_array())
.map(|a| !a.is_empty())
.unwrap_or(false);
let jobs_missing_reqs: Vec<String> = spec
.get("jobs")
.and_then(|v| v.as_array())
.map(|jobs| {
jobs.iter()
.filter(|job| {
job.get("resource_requirements").is_none()
|| job.get("resource_requirements") == Some(&serde_json::Value::Null)
})
.filter_map(|job| job.get("name").and_then(|n| n.as_str()).map(String::from))
.collect()
})
.unwrap_or_default();
if !has_resource_reqs || !jobs_missing_reqs.is_empty() {
let mut issues = Vec::new();
if !has_resource_reqs {
issues.push(
"The workflow spec is missing a 'resource_requirements' section.".to_string(),
);
}
if !jobs_missing_reqs.is_empty() {
issues.push(format!(
"The following jobs are missing resource_requirements: {}",
jobs_missing_reqs.join(", ")
));
}
let error_msg = serde_json::json!({
"error": "missing_resource_requirements",
"message": "Slurm workflows require resource requirements for all jobs.",
"issues": issues,
"help": "Please ask the user to specify resource requirements for their jobs. Each resource requirement needs: name, num_cpus (integer), memory (e.g., '4g', '512m'), runtime (ISO8601 duration like 'PT1H' for 1 hour, 'PT30M' for 30 minutes). Jobs can share requirements by referencing the same name. Example structure:",
"example": {
"resource_requirements": [
{"name": "small", "num_cpus": 1, "memory": "2g", "runtime": "PT30M", "num_gpus": 0, "num_nodes": 1},
{"name": "large", "num_cpus": 8, "memory": "32g", "runtime": "PT4H", "num_gpus": 0, "num_nodes": 1}
],
"jobs": [
{"name": "job1", "command": "...", "resource_requirements": "small"},
{"name": "job2", "command": "...", "resource_requirements": "large"}
]
}
});
return Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&error_msg).unwrap_or_default(),
)]));
}
}
let mut temp_file = tempfile::NamedTempFile::new()
.map_err(|e| internal_error(format!("Failed to create temp file: {}", e)))?;
temp_file
.write_all(spec_json.as_bytes())
.map_err(|e| internal_error(format!("Failed to write spec to temp file: {}", e)))?;
let temp_path = temp_file.path();
if action == "validate" {
let validation_result = WorkflowSpec::validate_spec(temp_path);
let result = serde_json::json!({
"action": "validate",
"valid": validation_result.valid,
"errors": validation_result.errors,
"warnings": validation_result.warnings,
"summary": {
"workflow_name": validation_result.summary.workflow_name,
"workflow_description": validation_result.summary.workflow_description,
"job_count": validation_result.summary.job_count,
"job_count_before_expansion": validation_result.summary.job_count_before_expansion,
"file_count": validation_result.summary.file_count,
"file_count_before_expansion": validation_result.summary.file_count_before_expansion,
"user_data_count": validation_result.summary.user_data_count,
"resource_requirements_count": validation_result.summary.resource_requirements_count,
"slurm_scheduler_count": validation_result.summary.slurm_scheduler_count,
"action_count": validation_result.summary.action_count,
"has_schedule_nodes_action": validation_result.summary.has_schedule_nodes_action,
"job_names": validation_result.summary.job_names,
"scheduler_names": validation_result.summary.scheduler_names,
},
"next_steps": if validation_result.valid {
"Validation passed! Call this tool again with action='create_workflow' to create the workflow."
} else {
"Please fix the errors listed above and call validate again."
}
});
return Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&result).unwrap_or_default(),
)]));
}
match (action, workflow_type) {
("create_workflow", "local") => {
let workflow_id =
crate::client::workflow_spec::WorkflowSpec::create_workflow_from_spec(
config, temp_path, user, false, false,
)
.map_err(|e| internal_error(format!("Failed to create workflow: {}", e)))?;
let result = serde_json::json!({
"success": true,
"workflow_id": workflow_id,
"message": format!("Created local workflow '{}' with ID {}", name, workflow_id),
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&result).unwrap_or_default(),
)]))
}
("create_workflow", "slurm") => {
let mut cmd = Command::new("torc");
cmd.args(["-f", "json", "workflows", "create-slurm"]);
cmd.args(["--account", account.unwrap()]);
cmd.args(["--user", user]);
if let Some(profile) = hpc_profile {
cmd.args(["--hpc-profile", profile]);
}
cmd.arg(temp_path);
let output = cmd
.output()
.map_err(|e| internal_error(format!("Failed to run torc command: {}", e)))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(internal_error(format!(
"Failed to create slurm workflow: {}",
stderr.trim()
)));
}
let stdout = String::from_utf8_lossy(&output.stdout);
let result = if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&stdout) {
parsed
} else {
serde_json::json!({
"success": true,
"message": format!("Created slurm workflow '{}'", name),
"output": stdout.trim(),
})
};
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&result).unwrap_or_default(),
)]))
}
("save_spec_file", "local") => {
let output_path = output_path.unwrap();
let content = serde_json::to_string_pretty(&spec)
.map_err(|e| internal_error(format!("Failed to serialize spec: {}", e)))?;
std::fs::write(output_path, &content)
.map_err(|e| internal_error(format!("Failed to write spec file: {}", e)))?;
let result = serde_json::json!({
"success": true,
"message": format!("Saved workflow spec '{}' to {}", name, output_path),
"output_path": output_path,
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&result).unwrap_or_default(),
)]))
}
("save_spec_file", "slurm") => {
let output_path = output_path.unwrap();
let mut cmd = Command::new("torc");
cmd.args(["slurm", "generate"]);
cmd.args(["--account", account.unwrap()]);
cmd.args(["--output", output_path]);
if let Some(profile) = hpc_profile {
cmd.args(["--profile", profile]);
}
cmd.arg(temp_path);
let output = cmd
.output()
.map_err(|e| internal_error(format!("Failed to run torc command: {}", e)))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(internal_error(format!(
"Failed to generate slurm spec: {}",
stderr.trim()
)));
}
let result = serde_json::json!({
"success": true,
"message": format!("Generated slurm workflow spec '{}' at {}", name, output_path),
"output_path": output_path,
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&result).unwrap_or_default(),
)]))
}
_ => Err(invalid_params("Invalid action/workflow_type combination")),
}
}
pub fn get_execution_plan(
config: &Configuration,
spec_or_id: &str,
) -> Result<CallToolResult, McpError> {
use crate::client::commands::pagination::resource_requirements::{
ResourceRequirementsListParams, paginate_resource_requirements,
};
use crate::client::commands::pagination::slurm_schedulers::{
SlurmSchedulersListParams, paginate_slurm_schedulers,
};
use crate::client::execution_plan::ExecutionPlan;
use crate::client::workflow_spec::WorkflowSpec;
use std::io::Write;
if let Ok(workflow_id) = spec_or_id.parse::<i64>() {
let workflow = default_api::get_workflow(config, workflow_id)
.map_err(|e| internal_error(format!("Failed to get workflow: {}", e)))?;
let jobs = paginate_jobs(
config,
workflow_id,
JobListParams::new().with_include_relationships(true),
)
.map_err(|e| internal_error(format!("Failed to list jobs: {}", e)))?;
let actions = default_api::get_workflow_actions(config, workflow_id)
.map_err(|e| internal_error(format!("Failed to get workflow actions: {}", e)))?;
let slurm_schedulers =
paginate_slurm_schedulers(config, workflow_id, SlurmSchedulersListParams::new())
.unwrap_or_default();
let resource_requirements = paginate_resource_requirements(
config,
workflow_id,
ResourceRequirementsListParams::new(),
)
.unwrap_or_default();
let plan = ExecutionPlan::from_database_models(
&workflow,
&jobs,
&actions,
&slurm_schedulers,
&resource_requirements,
)
.map_err(|e| internal_error(format!("Failed to build execution plan: {}", e)))?;
let events_json: Vec<serde_json::Value> = plan
.events
.values()
.map(|event| {
serde_json::json!({
"id": event.id,
"trigger": event.trigger,
"trigger_description": event.trigger_description,
"scheduler_allocations": event.scheduler_allocations.iter().map(|alloc| {
serde_json::json!({
"scheduler": alloc.scheduler,
"scheduler_type": alloc.scheduler_type,
"num_allocations": alloc.num_allocations,
"jobs": alloc.jobs,
})
}).collect::<Vec<_>>(),
"jobs_becoming_ready": event.jobs_becoming_ready,
"depends_on_events": event.depends_on_events,
"unlocks_events": event.unlocks_events,
})
})
.collect();
let result = serde_json::json!({
"source": "database",
"workflow_id": workflow_id,
"workflow_name": workflow.name,
"total_events": plan.events.len(),
"total_jobs": jobs.len(),
"root_events": plan.root_events,
"leaf_events": plan.leaf_events,
"events": events_json,
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&result).unwrap_or_default(),
)]))
} else {
let mut temp_file = tempfile::NamedTempFile::new()
.map_err(|e| internal_error(format!("Failed to create temp file: {}", e)))?;
temp_file
.write_all(spec_or_id.as_bytes())
.map_err(|e| internal_error(format!("Failed to write spec to temp file: {}", e)))?;
let temp_path = temp_file.path();
let mut spec = WorkflowSpec::from_spec_file(temp_path).map_err(|e| {
internal_error(format!("Failed to parse workflow specification: {}", e))
})?;
spec.expand_parameters()
.map_err(|e| internal_error(format!("Failed to expand parameters: {}", e)))?;
spec.validate_actions()
.map_err(|e| internal_error(format!("Failed to validate actions: {}", e)))?;
spec.substitute_variables()
.map_err(|e| internal_error(format!("Failed to substitute variables: {}", e)))?;
let plan = ExecutionPlan::from_spec(&spec)
.map_err(|e| internal_error(format!("Failed to build execution plan: {}", e)))?;
let events_json: Vec<serde_json::Value> = plan
.events
.values()
.map(|event| {
serde_json::json!({
"id": event.id,
"trigger": event.trigger,
"trigger_description": event.trigger_description,
"scheduler_allocations": event.scheduler_allocations.iter().map(|alloc| {
serde_json::json!({
"scheduler": alloc.scheduler,
"scheduler_type": alloc.scheduler_type,
"num_allocations": alloc.num_allocations,
"jobs": alloc.jobs,
})
}).collect::<Vec<_>>(),
"jobs_becoming_ready": event.jobs_becoming_ready,
"depends_on_events": event.depends_on_events,
"unlocks_events": event.unlocks_events,
})
})
.collect();
let result = serde_json::json!({
"source": "spec",
"workflow_name": spec.name,
"total_events": plan.events.len(),
"total_jobs": spec.jobs.len(),
"root_events": plan.root_events,
"leaf_events": plan.leaf_events,
"events": events_json,
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&result).unwrap_or_default(),
)]))
}
}
pub fn analyze_workflow_logs(
output_dir: &Path,
workflow_id: i64,
) -> Result<CallToolResult, McpError> {
use crate::client::commands::logs::analyze_workflow_logs as analyze_logs;
let result = analyze_logs(output_dir, workflow_id)
.map_err(|e| internal_error(format!("Failed to analyze logs: {}", e)))?;
let summary = if result.error_count == 0 && result.warning_count == 0 {
"No errors or warnings detected in log files.".to_string()
} else {
format!(
"Found {} error(s) and {} warning(s) across {} log files.",
result.error_count, result.warning_count, result.files_parsed
)
};
let errors_by_type: Vec<serde_json::Value> = result
.errors_by_type
.iter()
.map(|(pattern, count)| {
serde_json::json!({
"type": pattern,
"count": count,
})
})
.collect();
let sample_errors: Vec<serde_json::Value> = result
.errors
.iter()
.filter(|e| e.severity == crate::client::commands::logs::ErrorSeverity::Error)
.take(10)
.map(|e| {
serde_json::json!({
"file": e.file,
"line": e.line_number,
"type": e.pattern_name,
"content": e.line_content,
})
})
.collect();
let oom_count = result.errors_by_type.get("oom").copied().unwrap_or(0)
+ result
.errors_by_type
.get("memory_allocation_failed")
.copied()
.unwrap_or(0);
let timeout_count = result.errors_by_type.get("timeout").copied().unwrap_or(0)
+ result
.errors_by_type
.get("time_limit")
.copied()
.unwrap_or(0);
let has_recoverable_errors = oom_count > 0 || timeout_count > 0;
let mut response = serde_json::json!({
"workflow_id": workflow_id,
"summary": summary,
"files_parsed": result.files_parsed,
"error_count": result.error_count,
"warning_count": result.warning_count,
"errors_by_type": errors_by_type,
"sample_errors": sample_errors,
"files_with_errors": result.errors_by_file.keys().collect::<Vec<_>>(),
});
if has_recoverable_errors {
let mut recovery_info = serde_json::json!({
"oom_errors": oom_count,
"timeout_errors": timeout_count,
});
recovery_info["recommendation"] = serde_json::json!(
"RECOVERABLE ERRORS DETECTED! Use the recover_workflow tool to automatically fix these issues."
);
recovery_info["recovery_workflow"] = serde_json::json!([
"1. Call recover_workflow with dry_run=true to preview the recovery actions",
"2. Show the user the preview results (memory/runtime adjustments)",
"3. Ask user: 'Would you like me to proceed with these recovery actions?'",
"4. If approved, call recover_workflow with dry_run=false to execute"
]);
recovery_info["tool_call_example"] = serde_json::json!({
"tool": "recover_workflow",
"parameters": {
"workflow_id": workflow_id,
"dry_run": true,
"memory_multiplier": 1.5,
"runtime_multiplier": 1.4,
},
"note": "Start with dry_run=true to preview changes"
});
if oom_count > 0 {
recovery_info["oom_fix"] = serde_json::json!(format!(
"{} job(s) ran out of memory. Recovery will increase memory by 1.5x (configurable).",
oom_count
));
}
if timeout_count > 0 {
recovery_info["timeout_fix"] = serde_json::json!(format!(
"{} job(s) exceeded time limit. Recovery will increase runtime by 1.4x (configurable).",
timeout_count
));
}
response["recovery"] = recovery_info;
}
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&response).unwrap_or_default(),
)]))
}
pub fn get_workflow_summary(workflow_id: i64) -> Result<CallToolResult, McpError> {
let output = Command::new("torc")
.args(["-f", "json", "reports", "summary", &workflow_id.to_string()])
.output()
.map_err(|e| internal_error(format!("Failed to execute torc command: {}", e)))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(internal_error(format!(
"torc command failed: {}",
stderr.trim()
)));
}
let stdout = String::from_utf8_lossy(&output.stdout);
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
stdout.to_string(),
)]))
}
#[allow(clippy::too_many_arguments)]
pub fn list_results(
workflow_id: i64,
job_id: Option<i64>,
run_id: Option<i64>,
return_code: Option<i64>,
failed_only: bool,
status: Option<String>,
limit: i64,
sort_by: Option<String>,
reverse_sort: bool,
) -> Result<CallToolResult, McpError> {
let mut cmd = Command::new("torc");
cmd.args(["-f", "json", "results", "list", &workflow_id.to_string()]);
if let Some(jid) = job_id {
cmd.args(["--job-id", &jid.to_string()]);
}
if let Some(rid) = run_id {
cmd.args(["--run-id", &rid.to_string()]);
}
if let Some(rc) = return_code {
cmd.args(["--return-code", &rc.to_string()]);
}
if failed_only {
cmd.arg("--failed");
}
if let Some(s) = status {
cmd.args(["--status", &s]);
}
cmd.args(["--limit", &limit.to_string()]);
if let Some(sb) = sort_by {
cmd.args(["--sort-by", &sb]);
}
if reverse_sort {
cmd.arg("--reverse-sort");
}
let output = cmd
.output()
.map_err(|e| internal_error(format!("Failed to execute torc command: {}", e)))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(internal_error(format!(
"torc command failed: {}",
stderr.trim()
)));
}
let stdout = String::from_utf8_lossy(&output.stdout);
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
stdout.to_string(),
)]))
}
pub fn get_slurm_sacct(workflow_id: i64) -> Result<CallToolResult, McpError> {
let output = Command::new("torc")
.args(["-f", "json", "slurm", "sacct", &workflow_id.to_string()])
.output()
.map_err(|e| internal_error(format!("Failed to execute torc command: {}", e)))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(internal_error(format!(
"torc command failed: {}",
stderr.trim()
)));
}
let stdout = String::from_utf8_lossy(&output.stdout);
let mut response = stdout.to_string();
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&stdout)
&& let Some(rows) = json.get("rows").and_then(|r| r.as_array())
{
let mut total_seconds: i64 = 0;
for row in rows {
if let Some(elapsed) = row.get("elapsed").and_then(|e| e.as_str()) {
total_seconds += parse_elapsed_to_seconds(elapsed);
}
}
if total_seconds > 0 {
let hours = total_seconds / 3600;
let minutes = (total_seconds % 3600) / 60;
let seconds = total_seconds % 60;
let summary = format!(
"\n\n[SUMMARY: Total walltime consumed: {}h {}m {}s ({} seconds)]",
hours, minutes, seconds, total_seconds
);
response.push_str(&summary);
}
}
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
response,
)]))
}
fn parse_elapsed_to_seconds(elapsed: &str) -> i64 {
let mut total = 0i64;
let parts: Vec<&str> = elapsed.split_whitespace().collect();
for part in parts {
if let Some(num_str) = part.strip_suffix('d') {
if let Ok(days) = num_str.parse::<i64>() {
total += days * 86400;
}
} else if let Some(num_str) = part.strip_suffix('h') {
if let Ok(hours) = num_str.parse::<i64>() {
total += hours * 3600;
}
} else if let Some(num_str) = part.strip_suffix('m') {
if let Ok(minutes) = num_str.parse::<i64>() {
total += minutes * 60;
}
} else if let Some(num_str) = part.strip_suffix('s')
&& let Ok(seconds) = num_str.parse::<i64>()
{
total += seconds;
}
}
total
}
pub fn recover_workflow(
workflow_id: i64,
output_dir: &Path,
dry_run: bool,
memory_multiplier: f64,
runtime_multiplier: f64,
retry_unknown: bool,
) -> Result<CallToolResult, McpError> {
let mut cmd = Command::new("torc");
cmd.args(["recover", &workflow_id.to_string()]);
cmd.args(["--output-dir", &output_dir.display().to_string()]);
cmd.args(["--memory-multiplier", &memory_multiplier.to_string()]);
cmd.args(["--runtime-multiplier", &runtime_multiplier.to_string()]);
if dry_run {
cmd.arg("--dry-run");
}
if retry_unknown {
cmd.arg("--retry-unknown");
}
let output = cmd
.output()
.map_err(|e| internal_error(format!("Failed to execute torc recover: {}", e)))?;
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
if !output.status.success() {
return Err(internal_error(format!(
"torc recover failed: {}",
stderr.trim()
)));
}
let mut response = serde_json::json!({
"workflow_id": workflow_id,
"dry_run": dry_run,
"memory_multiplier": memory_multiplier,
"runtime_multiplier": runtime_multiplier,
"retry_unknown": retry_unknown,
"output": stdout.trim(),
});
if dry_run {
response["next_steps"] = serde_json::json!({
"instruction": "Review the recovery preview above. If the proposed changes look correct, \
ask the user: 'Would you like me to proceed with these recovery actions?'",
"if_approved": "Call recover_workflow again with dry_run=false to execute the recovery.",
});
} else {
response["status"] = serde_json::json!("Recovery complete");
response["message"] = serde_json::json!(
"The workflow has been recovered. Failed jobs have been reset, resources adjusted, \
and Slurm allocations regenerated and submitted."
);
}
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&response).unwrap_or_default(),
)]))
}
pub fn list_pending_failed_jobs(
config: &Configuration,
workflow_id: i64,
output_dir: &Path,
) -> Result<CallToolResult, McpError> {
let jobs = paginate_jobs(
config,
workflow_id,
JobListParams::new().with_status(JobStatus::PendingFailed),
)
.map_err(|e| internal_error(format!("Failed to list jobs: {}", e)))?;
let mut pending_jobs: Vec<serde_json::Value> = Vec::new();
for job in &jobs {
let job_id = match job.id {
Some(id) => id,
None => {
return Err(internal_error(format!(
"Encountered pending_failed job without an ID: name={:?}",
job.name
)));
}
};
let result = paginate_results(
config,
workflow_id,
ResultListParams::new().with_job_id(job_id).with_limit(1),
)
.ok()
.and_then(|items| items.into_iter().next());
let (stderr_tail, stderr_read_error) = if let Some(ref res) = result {
let run_id = res.run_id;
let attempt_id = job.attempt_id.unwrap_or(1);
let stderr_path =
log_paths::get_job_stderr_path(output_dir, workflow_id, job_id, run_id, attempt_id);
match fs::read_to_string(&stderr_path) {
Ok(content) => {
let mut lines: Vec<&str> = content.lines().collect();
let tail_lines = if lines.len() > 50 {
lines.split_off(lines.len() - 50)
} else {
lines
};
(tail_lines.join("\n"), None)
}
Err(e) => (String::new(), Some(format!("Failed to read stderr: {}", e))),
}
} else {
(String::new(), Some("No result found".to_string()))
};
pending_jobs.push(serde_json::json!({
"job_id": job_id,
"name": job.name,
"command": job.command,
"attempt_id": job.attempt_id,
"return_code": result.as_ref().map(|r| r.return_code),
"exec_time_minutes": result.as_ref().map(|r| r.exec_time_minutes),
"stderr_tail": stderr_tail,
"stderr_read_error": stderr_read_error,
}));
}
let result = serde_json::json!({
"workflow_id": workflow_id,
"pending_failed_count": pending_jobs.len(),
"pending_failed_jobs": pending_jobs,
"guidance": if pending_jobs.is_empty() {
"No jobs are awaiting classification."
} else {
"These jobs failed without a matching failure handler. Analyze the stderr output \
to classify each failure as transient (retry) or permanent (fail). Use \
classify_and_resolve_failures to act on your classification."
},
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&result).unwrap_or_default(),
)]))
}
#[derive(Debug, Clone, serde::Deserialize)]
pub struct FailureClassification {
pub job_id: i64,
pub action: String,
pub memory: Option<String>,
pub runtime: Option<String>,
pub reason: Option<String>,
}
pub fn classify_and_resolve_failures(
config: &Configuration,
workflow_id: i64,
classifications: Vec<FailureClassification>,
dry_run: bool,
) -> Result<CallToolResult, McpError> {
let workflow = match default_api::get_workflow(config, workflow_id) {
Ok(w) => w,
Err(e) => {
return Err(internal_error(format!(
"Failed to get workflow {}: {}",
workflow_id, e
)));
}
};
if !workflow.use_pending_failed.unwrap_or(false) {
return Err(invalid_params(&format!(
"Workflow {} does not have use_pending_failed enabled. \
AI-assisted recovery is disabled for this workflow. \
Jobs will use Failed status instead of PendingFailed.",
workflow_id
)));
}
let mut results: Vec<serde_json::Value> = Vec::new();
let mut jobs_to_retry: Vec<i64> = Vec::new();
let mut jobs_to_fail: Vec<i64> = Vec::new();
for classification in &classifications {
let job_id = classification.job_id;
let action = classification.action.to_lowercase();
let job = match default_api::get_job(config, job_id) {
Ok(j) => j,
Err(e) => {
results.push(serde_json::json!({
"job_id": job_id,
"status": "error",
"message": format!("Failed to get job: {}", e),
}));
continue;
}
};
if job.status != Some(JobStatus::PendingFailed) {
results.push(serde_json::json!({
"job_id": job_id,
"status": "skipped",
"message": format!("Job is not in pending_failed status (current: {:?})", job.status),
}));
continue;
}
if action != "retry" && action != "fail" {
results.push(serde_json::json!({
"job_id": job_id,
"status": "error",
"message": format!("Invalid action '{}'. Must be 'retry' or 'fail'.", action),
}));
continue;
}
if dry_run {
results.push(serde_json::json!({
"job_id": job_id,
"job_name": job.name,
"action": action,
"memory_adjustment": classification.memory,
"runtime_adjustment": classification.runtime,
"reason": classification.reason,
"status": "would_apply",
}));
} else {
let resource_adjustment_warning = if (classification.memory.is_some()
|| classification.runtime.is_some())
&& action == "retry"
{
if let Some(req_id) = job.resource_requirements_id {
match default_api::get_resource_requirements(config, req_id) {
Ok(mut reqs) => {
if let Some(ref mem) = classification.memory {
reqs.memory = mem.clone();
}
if let Some(ref rt) = classification.runtime {
reqs.runtime = rt.clone();
}
match default_api::update_resource_requirements(
config,
req_id,
ResourceRequirementsModel {
id: reqs.id,
workflow_id: reqs.workflow_id,
name: reqs.name.clone(),
num_cpus: reqs.num_cpus,
num_gpus: reqs.num_gpus,
num_nodes: reqs.num_nodes,
memory: reqs.memory.clone(),
runtime: reqs.runtime.clone(),
},
) {
Ok(_) => None,
Err(e) => Some(format!("Failed to update resources: {}", e)),
}
}
Err(e) => Some(format!("Failed to get resource requirements: {}", e)),
}
} else {
Some("No resource requirements defined for this job".to_string())
}
} else {
None
};
if action == "retry" {
jobs_to_retry.push(job_id);
} else {
jobs_to_fail.push(job_id);
}
let mut job_result = serde_json::json!({
"job_id": job_id,
"job_name": job.name,
"action": action,
"reason": classification.reason,
"status": "pending_application",
});
if let Some(warning) = resource_adjustment_warning {
job_result["resource_adjustment_warning"] = serde_json::json!(warning);
}
results.push(job_result);
}
}
if !dry_run && (!jobs_to_retry.is_empty() || !jobs_to_fail.is_empty()) {
for job_id in &jobs_to_retry {
let output = Command::new("torc")
.args(["jobs", "update", &job_id.to_string(), "--status", "ready"])
.output();
match output {
Ok(out) if out.status.success() => {
for r in &mut results {
if r.get("job_id").and_then(|v| v.as_i64()) == Some(*job_id) {
r["status"] = serde_json::json!("applied");
}
}
}
Ok(out) => {
let stderr = String::from_utf8_lossy(&out.stderr);
for r in &mut results {
if r.get("job_id").and_then(|v| v.as_i64()) == Some(*job_id) {
r["status"] = serde_json::json!("error");
r["message"] = serde_json::json!(format!(
"Failed to reset job: {}",
stderr.trim()
));
}
}
}
Err(e) => {
for r in &mut results {
if r.get("job_id").and_then(|v| v.as_i64()) == Some(*job_id) {
r["status"] = serde_json::json!("error");
r["message"] = serde_json::json!(format!(
"Failed to spawn 'torc' command: {}. Is torc in PATH?",
e
));
}
}
}
}
}
for job_id in &jobs_to_fail {
let output = Command::new("torc")
.args(["jobs", "update", &job_id.to_string(), "--status", "failed"])
.output();
match output {
Ok(out) if out.status.success() => {
for r in &mut results {
if r.get("job_id").and_then(|v| v.as_i64()) == Some(*job_id) {
r["status"] = serde_json::json!("applied");
}
}
}
Ok(out) => {
let stderr = String::from_utf8_lossy(&out.stderr);
for r in &mut results {
if r.get("job_id").and_then(|v| v.as_i64()) == Some(*job_id) {
r["status"] = serde_json::json!("error");
r["message"] = serde_json::json!(format!(
"Failed to mark as failed: {}",
stderr.trim()
));
}
}
}
Err(e) => {
for r in &mut results {
if r.get("job_id").and_then(|v| v.as_i64()) == Some(*job_id) {
r["status"] = serde_json::json!("error");
r["message"] = serde_json::json!(format!(
"Failed to spawn 'torc' command: {}. Is torc in PATH?",
e
));
}
}
}
}
}
}
let response = serde_json::json!({
"workflow_id": workflow_id,
"dry_run": dry_run,
"total_classifications": classifications.len(),
"jobs_to_retry": jobs_to_retry.len(),
"jobs_to_fail": jobs_to_fail.len(),
"results": results,
"next_steps": if dry_run {
"Review the classifications above. If they look correct, call this tool again with dry_run=false to apply them."
} else if !jobs_to_retry.is_empty() {
"Classifications applied. Jobs marked for retry are now in 'ready' status. \
You may need to regenerate Slurm schedulers if running on HPC."
} else {
"Classifications applied."
},
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&response).unwrap_or_default(),
)]))
}
fn compute_stats(values: &[f64]) -> serde_json::Value {
if values.is_empty() {
return serde_json::json!(null);
}
let mut sorted = values.to_vec();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let min = sorted[0];
let max = sorted[sorted.len() - 1];
let sum: f64 = sorted.iter().sum();
let mean = sum / sorted.len() as f64;
let median = if sorted.len().is_multiple_of(2) {
(sorted[sorted.len() / 2 - 1] + sorted[sorted.len() / 2]) / 2.0
} else {
sorted[sorted.len() / 2]
};
serde_json::json!({
"min": min,
"max": max,
"mean": (mean * 100.0).round() / 100.0,
"median": (median * 100.0).round() / 100.0,
})
}
fn compute_memory_stats(values: &[i64]) -> serde_json::Value {
if values.is_empty() {
return serde_json::json!(null);
}
let mut sorted = values.to_vec();
sorted.sort();
let min = sorted[0];
let max = sorted[sorted.len() - 1];
let sum: i64 = sorted.iter().sum();
let mean = sum as f64 / sorted.len() as f64;
let median = if sorted.len().is_multiple_of(2) {
(sorted[sorted.len() / 2 - 1] + sorted[sorted.len() / 2]) / 2
} else {
sorted[sorted.len() / 2]
};
serde_json::json!({
"min": min,
"max": max,
"mean": mean.round() as i64,
"median": median,
"min_formatted": format_memory_bytes_short(min as u64),
"max_formatted": format_memory_bytes_short(max as u64),
})
}
pub fn analyze_resource_usage(
config: &Configuration,
workflow_id: i64,
completed_only: bool,
) -> Result<CallToolResult, McpError> {
let jobs = paginate_jobs(config, workflow_id, JobListParams::new())
.map_err(|e| internal_error(format!("Failed to list jobs: {}", e)))?;
let resource_requirements =
paginate_resource_requirements(config, workflow_id, ResourceRequirementsListParams::new())
.map_err(|e| internal_error(format!("Failed to list resource requirements: {}", e)))?;
let results = paginate_results(config, workflow_id, ResultListParams::new())
.map_err(|e| internal_error(format!("Failed to list results: {}", e)))?;
let mut latest_results: std::collections::HashMap<i64, &crate::models::ResultModel> =
std::collections::HashMap::new();
for result in &results {
if completed_only && result.return_code != 0 {
continue;
}
let entry = latest_results.entry(result.job_id).or_insert(result);
if result.run_id > entry.run_id
|| (result.run_id == entry.run_id
&& result.attempt_id.unwrap_or(1) > entry.attempt_id.unwrap_or(1))
{
*entry = result;
}
}
let rr_map: std::collections::HashMap<i64, &ResourceRequirementsModel> = resource_requirements
.iter()
.filter_map(|rr| rr.id.map(|id| (id, rr)))
.collect();
let mut groups: std::collections::HashMap<Option<i64>, Vec<&crate::models::JobModel>> =
std::collections::HashMap::new();
for job in &jobs {
groups
.entry(job.resource_requirements_id)
.or_default()
.push(job);
}
let mut resource_groups: Vec<serde_json::Value> = Vec::new();
let mut jobs_without_results: Vec<serde_json::Value> = Vec::new();
let total_jobs = jobs.len();
let mut total_jobs_with_results = 0;
for (rr_id, group_jobs) in &groups {
let rr = rr_id.and_then(|id| rr_map.get(&id));
let mut job_details: Vec<serde_json::Value> = Vec::new();
let mut peak_memory_values: Vec<i64> = Vec::new();
let mut peak_cpu_values: Vec<f64> = Vec::new();
let mut exec_time_values: Vec<f64> = Vec::new();
let mut jobs_with_results_count = 0;
for job in group_jobs {
let job_id = match job.id {
Some(id) => id,
None => continue,
};
if let Some(result) = latest_results.get(&job_id) {
jobs_with_results_count += 1;
total_jobs_with_results += 1;
if let Some(peak_mem) = result.peak_memory_bytes {
peak_memory_values.push(peak_mem);
}
if let Some(peak_cpu) = result.peak_cpu_percent {
peak_cpu_values.push(peak_cpu);
}
exec_time_values.push(result.exec_time_minutes);
job_details.push(serde_json::json!({
"job_id": job_id,
"name": job.name,
"peak_memory_bytes": result.peak_memory_bytes,
"peak_memory_formatted": result.peak_memory_bytes
.map(|b| format_memory_bytes_short(b as u64)),
"peak_cpu_percent": result.peak_cpu_percent,
"exec_time_minutes": (result.exec_time_minutes * 100.0).round() / 100.0,
"return_code": result.return_code,
}));
} else {
jobs_without_results.push(serde_json::json!({
"job_id": job_id,
"name": job.name,
}));
}
}
let config_json = rr.map(|r| {
serde_json::json!({
"memory": r.memory,
"num_cpus": r.num_cpus,
"runtime": r.runtime,
"num_gpus": r.num_gpus,
"num_nodes": r.num_nodes,
})
});
let summary = serde_json::json!({
"peak_memory_bytes": compute_memory_stats(&peak_memory_values),
"peak_cpu_percent": compute_stats(&peak_cpu_values),
"exec_time_minutes": compute_stats(&exec_time_values),
});
resource_groups.push(serde_json::json!({
"resource_requirements_id": rr_id,
"name": rr.map(|r| r.name.as_str()),
"config": config_json,
"job_count": group_jobs.len(),
"jobs_with_results": jobs_with_results_count,
"summary": summary,
"jobs": job_details,
}));
}
resource_groups.sort_by_key(|g| {
g.get("resource_requirements_id")
.and_then(|v| v.as_i64())
.unwrap_or(i64::MAX)
});
let response = serde_json::json!({
"workflow_id": workflow_id,
"total_jobs": total_jobs,
"total_jobs_with_results": total_jobs_with_results,
"resource_groups": resource_groups,
"jobs_without_results": jobs_without_results,
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&response).unwrap_or_default(),
)]))
}
#[derive(Debug, Clone)]
pub struct ResourceGroup {
pub memory: String,
pub num_cpus: i64,
pub runtime: String,
pub num_gpus: Option<i64>,
pub num_nodes: Option<i64>,
pub name: Option<String>,
pub job_ids: Vec<i64>,
}
pub fn regroup_job_resources(
config: &Configuration,
workflow_id: i64,
groups: Vec<ResourceGroup>,
dry_run: bool,
) -> Result<CallToolResult, McpError> {
let jobs = paginate_jobs(config, workflow_id, JobListParams::new())
.map_err(|e| internal_error(format!("Failed to list jobs: {}", e)))?;
let resource_requirements =
paginate_resource_requirements(config, workflow_id, ResourceRequirementsListParams::new())
.map_err(|e| internal_error(format!("Failed to list resource requirements: {}", e)))?;
let job_map: std::collections::HashMap<i64, &crate::models::JobModel> =
jobs.iter().filter_map(|j| j.id.map(|id| (id, j))).collect();
let rr_map: std::collections::HashMap<i64, &ResourceRequirementsModel> = resource_requirements
.iter()
.filter_map(|rr| rr.id.map(|id| (id, rr)))
.collect();
let mut errors: Vec<String> = Vec::new();
let mut all_job_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
for (i, group) in groups.iter().enumerate() {
if group.job_ids.is_empty() {
errors.push(format!("Group {} has no job_ids", i));
}
for &job_id in &group.job_ids {
if !job_map.contains_key(&job_id) {
errors.push(format!(
"Job {} in group {} does not belong to workflow {}",
job_id, i, workflow_id
));
}
if !all_job_ids.insert(job_id) {
errors.push(format!("Job {} appears in multiple groups", job_id));
}
}
}
if !errors.is_empty() {
let response = serde_json::json!({
"success": false,
"errors": errors,
});
return Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&response).unwrap_or_default(),
)]));
}
let mut group_previews: Vec<serde_json::Value> = Vec::new();
for (i, group) in groups.iter().enumerate() {
let group_name = group.name.clone().unwrap_or_else(|| format!("group_{}", i));
let mut job_previews: Vec<serde_json::Value> = Vec::new();
for &job_id in &group.job_ids {
let job = job_map[&job_id];
let current_rr = job.resource_requirements_id.and_then(|id| rr_map.get(&id));
let resolved_num_gpus = group
.num_gpus
.or_else(|| current_rr.map(|rr| rr.num_gpus))
.unwrap_or(0);
let resolved_num_nodes = group
.num_nodes
.or_else(|| current_rr.map(|rr| rr.num_nodes))
.unwrap_or(1);
job_previews.push(serde_json::json!({
"job_id": job_id,
"name": job.name,
"current_rr": current_rr.map(|rr| serde_json::json!({
"id": rr.id,
"name": rr.name,
"memory": rr.memory,
"num_cpus": rr.num_cpus,
"runtime": rr.runtime,
"num_gpus": rr.num_gpus,
"num_nodes": rr.num_nodes,
})),
"new_rr": {
"memory": &group.memory,
"num_cpus": group.num_cpus,
"runtime": &group.runtime,
"num_gpus": resolved_num_gpus,
"num_nodes": resolved_num_nodes,
},
}));
}
group_previews.push(serde_json::json!({
"group_index": i,
"name": group_name,
"new_config": {
"memory": &group.memory,
"num_cpus": group.num_cpus,
"runtime": &group.runtime,
"num_gpus": group.num_gpus,
"num_nodes": group.num_nodes,
},
"job_count": group.job_ids.len(),
"jobs": job_previews,
}));
}
if dry_run {
let response = serde_json::json!({
"workflow_id": workflow_id,
"dry_run": true,
"groups": group_previews,
"total_jobs_affected": all_job_ids.len(),
"next_steps": "Review the proposed regrouping. If it looks correct, call again with dry_run=false to apply.",
});
return Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&response).unwrap_or_default(),
)]));
}
let mut applied_groups: Vec<serde_json::Value> = Vec::new();
let mut total_jobs_updated = 0;
let mut apply_errors: Vec<String> = Vec::new();
for (i, group) in groups.iter().enumerate() {
let group_name = group.name.clone().unwrap_or_else(|| format!("group_{}", i));
let first_job = job_map[&group.job_ids[0]];
let current_rr = first_job
.resource_requirements_id
.and_then(|id| rr_map.get(&id));
let resolved_num_gpus = group
.num_gpus
.or_else(|| current_rr.map(|rr| rr.num_gpus))
.unwrap_or(0);
let resolved_num_nodes = group
.num_nodes
.or_else(|| current_rr.map(|rr| rr.num_nodes))
.unwrap_or(1);
let new_rr = ResourceRequirementsModel {
id: None,
workflow_id,
name: group_name.clone(),
num_cpus: group.num_cpus,
num_gpus: resolved_num_gpus,
num_nodes: resolved_num_nodes,
memory: group.memory.clone(),
runtime: group.runtime.clone(),
};
let created_rr = match default_api::create_resource_requirements(config, new_rr) {
Ok(rr) => rr,
Err(e) => {
apply_errors.push(format!(
"Failed to create RR for group '{}': {}",
group_name, e
));
continue;
}
};
let new_rr_id = match created_rr.id {
Some(id) => id,
None => {
apply_errors.push(format!(
"Created RR for group '{}' but got no ID back",
group_name
));
continue;
}
};
let mut jobs_updated: Vec<i64> = Vec::new();
for &job_id in &group.job_ids {
let job = job_map[&job_id];
let mut updated_job = job.clone();
updated_job.resource_requirements_id = Some(new_rr_id);
match default_api::update_job(config, job_id, updated_job) {
Ok(_) => {
jobs_updated.push(job_id);
total_jobs_updated += 1;
}
Err(e) => {
apply_errors.push(format!(
"Failed to update job {} in group '{}': {}",
job_id, group_name, e
));
}
}
}
applied_groups.push(serde_json::json!({
"group_index": i,
"name": group_name,
"resource_requirements_id": new_rr_id,
"config": {
"memory": created_rr.memory,
"num_cpus": created_rr.num_cpus,
"runtime": created_rr.runtime,
"num_gpus": created_rr.num_gpus,
"num_nodes": created_rr.num_nodes,
},
"jobs_updated": jobs_updated,
"jobs_failed": group.job_ids.iter()
.filter(|id| !jobs_updated.contains(id))
.collect::<Vec<_>>(),
}));
}
let response = serde_json::json!({
"workflow_id": workflow_id,
"dry_run": false,
"success": apply_errors.is_empty(),
"groups": applied_groups,
"total_jobs_updated": total_jobs_updated,
"errors": apply_errors,
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&response).unwrap_or_default(),
)]))
}
const GITHUB_RAW_BASE: &str = "https://raw.githubusercontent.com/NatLabRockies/torc/main";
fn example_descriptions() -> Vec<(&'static str, &'static str)> {
vec![
(
"sample_workflow",
"Complete example with files, user_data, resource requirements, jobs, and Slurm schedulers",
),
(
"diamond_workflow",
"Classic diamond dependency pattern (fan-out and fan-in) using file-based dependencies",
),
(
"hundred_jobs_parameterized",
"Generates 100 parallel jobs using parameter ranges",
),
(
"data_pipeline_parameterized",
"Multi-dataset pipeline with parameter sweeps and fan-in aggregation",
),
(
"hyperparameter_sweep",
"ML hyperparameter grid search (learning rate × batch size × optimizer)",
),
(
"hyperparameter_sweep_shared_params",
"Hyperparameter sweep using shared workflow-level parameters",
),
(
"simulation_sweep",
"Parameter sweep for scientific simulations",
),
(
"multi_stage_barrier_pattern",
"Multi-stage workflow using barrier jobs (1000+ jobs per stage)",
),
(
"workflow_actions_simple",
"Basic workflow with on_workflow_start and on_workflow_complete actions",
),
(
"workflow_actions_simple_slurm",
"Multi-stage Slurm workflow with automated node scheduling per stage",
),
(
"workflow_actions_data_pipeline",
"Data pipeline with automated resource management via actions",
),
(
"workflow_actions_ml_training",
"ML training with dynamic GPU allocation using on_jobs_ready actions",
),
(
"slurm_staged_pipeline",
"Multi-stage Slurm pipeline with automated scheduling and resource monitoring",
),
(
"resource_monitoring_demo",
"Demonstrates CPU and memory monitoring with time-series data collection",
),
(
"failure_handler_simulation",
"Demonstrates failure handler rules for automatic retry on specific exit codes",
),
(
"failure_handler_demo",
"Simple failure handler demo with retry logic",
),
(
"simple_retry",
"Minimal example of automatic job retry on failure",
),
(
"zip_parameter_mode",
"Demonstrates zip parameter mode (parallel iteration vs Cartesian product)",
),
]
}
fn doc_topic_mapping() -> Vec<(&'static str, &'static str, &'static str)> {
vec![
(
"workflow-spec",
"core/reference/workflow-spec.md",
"Complete workflow specification reference",
),
(
"dependencies",
"core/concepts/dependencies.md",
"Job dependency types (explicit, file-based, user_data)",
),
(
"parameterization",
"core/reference/parameterization.md",
"Parameter sweeps, ranges, format specifiers",
),
(
"slurm",
"specialized/hpc/slurm.md",
"Slurm HPC integration and scheduler configuration",
),
(
"job-states",
"core/concepts/job-states.md",
"Job status lifecycle and transitions",
),
(
"actions",
"specialized/design/workflow-actions.md",
"Workflow actions (triggers, schedule_nodes, run_commands)",
),
(
"failure-handlers",
"specialized/fault-tolerance/failure-handlers.md",
"Automatic retry rules for exit codes",
),
(
"recovery",
"specialized/fault-tolerance/automatic-recovery.md",
"Automated workflow recovery (OOM, timeout)",
),
(
"ai-recovery",
"specialized/fault-tolerance/ai-assisted-recovery.md",
"AI-assisted failure classification",
),
(
"resource-monitoring",
"core/reference/resource-monitoring.md",
"CPU/memory monitoring configuration",
),
("cli", "core/reference/cli.md", "CLI command reference"),
(
"cli-cheatsheet",
"core/reference/cli-cheatsheet.md",
"CLI quick reference cheatsheet",
),
(
"quick-start",
"getting-started/quick-start.md",
"Getting started guide",
),
(
"quick-start-local",
"getting-started/quick-start-local.md",
"Quick start for local execution",
),
(
"quick-start-hpc",
"getting-started/quick-start-hpc.md",
"Quick start for HPC/Slurm",
),
(
"architecture",
"core/concepts/architecture.md",
"System architecture overview",
),
(
"checkpointing",
"specialized/fault-tolerance/checkpointing.md",
"Job checkpointing support",
),
(
"hpc-profiles",
"specialized/hpc/hpc-profiles.md",
"HPC profile configuration",
),
(
"hpc-profiles-reference",
"specialized/hpc/hpc-profiles-reference.md",
"HPC profiles reference",
),
(
"workflow-formats",
"core/workflows/workflow-formats.md",
"YAML, JSON, JSON5, KDL format comparison",
),
(
"workflow-definition",
"core/concepts/workflow-definition.md",
"Workflow definition concepts",
),
(
"parallelization",
"core/concepts/parallelization.md",
"Job parallelization strategies",
),
(
"job-runners",
"core/concepts/job-runners.md",
"Job runner types and configuration",
),
(
"reinitialization",
"core/concepts/reinitialization.md",
"Workflow reinitialization",
),
(
"resources",
"core/reference/resources.md",
"Resource requirements reference",
),
(
"configuration",
"core/reference/configuration.md",
"Configuration reference",
),
(
"environment-variables",
"core/reference/environment-variables.md",
"Environment variable reference",
),
(
"debug-failed-job",
"core/how-to/debug-failed-job.md",
"How to debug a failed job",
),
(
"rerun-failed-jobs",
"core/how-to/rerun-failed-jobs.md",
"How to rerun failed jobs",
),
(
"cancel-workflow",
"core/how-to/cancel-workflow.md",
"How to cancel a workflow",
),
(
"slurm-workflows",
"specialized/hpc/slurm-workflows.md",
"Slurm workflow patterns",
),
(
"allocation-strategies",
"specialized/hpc/allocation-strategies.md",
"Slurm allocation strategies: single-large vs many-small tradeoffs, fair-share, sbatch --test-only",
),
(
"tutorials",
"core/tutorials/index.md",
"List of available tutorials",
),
(
"tutorial-diamond",
"core/tutorials/diamond.md",
"Tutorial: diamond workflow pattern",
),
(
"tutorial-simple-params",
"core/tutorials/simple-params.md",
"Tutorial: simple parameterization",
),
(
"tutorial-advanced-params",
"core/tutorials/advanced-params.md",
"Tutorial: advanced parameterization",
),
(
"tutorial-many-jobs",
"core/tutorials/many-jobs.md",
"Tutorial: many jobs workflow",
),
(
"tutorial-user-data",
"core/tutorials/user-data.md",
"Tutorial: user data dependencies",
),
(
"tutorial-multi-stage",
"core/tutorials/multi-stage-barrier.md",
"Tutorial: multi-stage barrier pattern",
),
]
}
fn read_content(local_dir: Option<&Path>, rel_path: &str) -> Result<String, McpError> {
if let Some(dir) = local_dir {
let path = dir.join(rel_path);
if path.exists() {
return fs::read_to_string(&path)
.map_err(|e| internal_error(format!("Failed to read local file: {}", e)));
}
}
fetch_from_github(rel_path)
}
fn github_client() -> Result<reqwest::blocking::Client, McpError> {
reqwest::blocking::Client::builder()
.connect_timeout(std::time::Duration::from_secs(10))
.timeout(std::time::Duration::from_secs(10))
.build()
.map_err(|e| internal_error(format!("Failed to build HTTP client: {}", e)))
}
fn fetch_from_github(rel_path: &str) -> Result<String, McpError> {
let url = format!("{}/{}", GITHUB_RAW_BASE, rel_path);
let client = github_client()?;
let response = client
.get(&url)
.send()
.map_err(|e| internal_error(format!("Failed to fetch from GitHub: {}", e)))?;
if !response.status().is_success() {
return Err(internal_error(format!(
"GitHub returned {} for {}",
response.status(),
url
)));
}
response
.text()
.map_err(|e| internal_error(format!("Failed to read response body: {}", e)))
}
fn read_example_content(
local_dir: Option<&Path>,
name: &str,
format: &str,
) -> Result<(String, String), McpError> {
if !name
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
{
return Err(invalid_params(
"Example name must contain only alphanumeric characters, underscores, and hyphens.",
));
}
let search_order: Vec<(&str, &[&str])> = match format {
"yaml" | "yml" => vec![
("yaml", &["yaml", "yml"][..]),
("json", &["json5", "json"][..]),
("kdl", &["kdl"][..]),
],
"json" | "json5" => vec![
("json", &["json5", "json"][..]),
("yaml", &["yaml", "yml"][..]),
("kdl", &["kdl"][..]),
],
"kdl" => vec![
("kdl", &["kdl"][..]),
("yaml", &["yaml", "yml"][..]),
("json", &["json5", "json"][..]),
],
_ => vec![
("yaml", &["yaml", "yml"][..]),
("json", &["json5", "json"][..]),
("kdl", &["kdl"][..]),
],
};
let subdirs = ["yaml", "json", "kdl", "subgraphs"];
if let Some(dir) = local_dir {
for (_, exts) in &search_order {
for ext in *exts {
for subdir in &subdirs {
let path = dir.join(subdir).join(format!("{}.{}", name, ext));
if path.exists() {
let content = fs::read_to_string(&path).map_err(|e| {
internal_error(format!("Failed to read example file: {}", e))
})?;
return Ok((content, ext.to_string()));
}
}
}
}
}
for (_, exts) in &search_order {
for ext in *exts {
for subdir in &subdirs {
let rel_path = format!("examples/{}/{}.{}", subdir, name, ext);
let url = format!("{}/{}", GITHUB_RAW_BASE, rel_path);
if let Ok(client) = github_client()
&& let Ok(response) = client.get(&url).send()
&& response.status().is_success()
&& let Ok(content) = response.text()
{
return Ok((content, ext.to_string()));
}
}
}
}
Err(invalid_params(&format!(
"Example '{}' not found locally or on GitHub. Use list_examples to see available examples.",
name
)))
}
pub fn list_examples(examples_dir: Option<&Path>) -> Result<CallToolResult, McpError> {
let descriptions = example_descriptions();
let mut examples = Vec::new();
for (name, description) in &descriptions {
let mut formats = Vec::new();
if let Some(dir) = examples_dir {
for (fmt, subdir, exts) in &[
("yaml", "yaml", &["yaml", "yml"][..]),
("json", "json", &["json5", "json"][..]),
("kdl", "kdl", &["kdl"][..]),
] {
for ext in *exts {
let path = dir.join(subdir).join(format!("{}.{}", name, ext));
if path.exists() {
formats.push(*fmt);
break;
}
}
}
}
examples.push(serde_json::json!({
"name": name,
"description": description,
"formats": formats,
}));
}
let response = serde_json::json!({
"examples": examples,
"total": examples.len(),
"source": if examples_dir.is_some() { "local" } else { "github" },
"hint": "Use get_example with a name to retrieve the full specification",
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&response).unwrap_or_default(),
)]))
}
pub fn get_example(
examples_dir: Option<&Path>,
name: &str,
format: &str,
) -> Result<CallToolResult, McpError> {
let (content, ext) = read_example_content(examples_dir, name, format)?;
let description = example_descriptions()
.iter()
.find(|(n, _)| *n == name)
.map(|(_, d)| *d)
.unwrap_or("Example workflow specification");
let response = serde_json::json!({
"name": name,
"description": description,
"format": ext,
"content": content,
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&response).unwrap_or_default(),
)]))
}
pub fn get_docs(docs_dir: Option<&Path>, topic: &str) -> Result<CallToolResult, McpError> {
let mapping = doc_topic_mapping();
let topic_lower = topic.to_lowercase();
let matched = mapping
.iter()
.find(|(t, _, _)| t.to_lowercase() == topic_lower);
if let Some((topic_name, rel_path, description)) = matched {
let docs_rel_path = format!("docs/src/{}", rel_path);
let content = read_content(docs_dir, &docs_rel_path).or_else(|_| {
if let Some(dir) = docs_dir {
let path = dir.join(rel_path);
if path.exists() {
return fs::read_to_string(&path)
.map_err(|e| internal_error(format!("Failed to read doc: {}", e)));
}
}
fetch_from_github(&docs_rel_path)
})?;
let response = serde_json::json!({
"topic": topic_name,
"description": description,
"content": content,
});
return Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&response).unwrap_or_default(),
)]));
}
let partial: Vec<_> = mapping
.iter()
.filter(|(t, _, d)| {
t.to_lowercase().contains(&topic_lower) || d.to_lowercase().contains(&topic_lower)
})
.collect();
if !partial.is_empty() {
let suggestions: Vec<_> = partial
.iter()
.map(|(t, _, d)| serde_json::json!({"topic": t, "description": d}))
.collect();
let response = serde_json::json!({
"error": format!("Topic '{}' not found. Did you mean one of these?", topic),
"suggestions": suggestions,
});
return Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&response).unwrap_or_default(),
)]));
}
let all_topics: Vec<_> = mapping
.iter()
.map(|(t, _, d)| serde_json::json!({"topic": t, "description": d}))
.collect();
let response = serde_json::json!({
"error": format!("Topic '{}' not found.", topic),
"available_topics": all_topics,
});
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
serde_json::to_string_pretty(&response).unwrap_or_default(),
)]))
}
pub fn plan_allocations(
spec_json: &str,
account: &str,
partition: Option<&str>,
hpc_profile: Option<&str>,
skip_test_only: bool,
) -> Result<CallToolResult, McpError> {
use crate::client::commands::hpc::create_registry_with_config_public;
use crate::client::commands::slurm::{
GroupByStrategy, WalltimeStrategy, analyze_plan_allocations,
};
use crate::client::workflow_spec::WorkflowSpec;
use crate::config::TorcConfig;
let mut spec: WorkflowSpec = serde_json::from_str(spec_json)
.map_err(|e| invalid_params(&format!("Failed to parse workflow spec JSON: {}", e)))?;
let torc_config = TorcConfig::load().unwrap_or_default();
let registry = create_registry_with_config_public(&torc_config.client.hpc);
let profile = if let Some(name) = hpc_profile {
registry.get(name).ok_or_else(|| {
invalid_params(&format!(
"Unknown HPC profile: '{}'. Available profiles can be listed with 'torc hpc list'.",
name
))
})?
} else {
registry.detect().ok_or_else(|| {
invalid_params(
"No HPC profile detected. Specify hpc_profile parameter or run on an HPC system.",
)
})?
};
let result = analyze_plan_allocations(
&mut spec,
account,
partition,
&profile,
false, skip_test_only,
GroupByStrategy::ResourceRequirements,
WalltimeStrategy::MaxJobRuntime,
1.5, )
.map_err(|e| internal_error(format!("Analysis failed: {}", e)))?;
let mut response = serde_json::to_value(&result)
.map_err(|e| internal_error(format!("Failed to serialize result: {}", e)))?;
if let Some(obj) = response.as_object_mut() {
obj.insert(
"guidance".to_string(),
serde_json::json!({
"doc_topic": "allocation-strategies",
"key_points": [
"The 'many-small' sbatch estimate is for the FIRST job only — later jobs are delayed by fair-share degradation",
"Slurm's backfill scheduler prioritizes larger allocations, giving them reserved slots in the queue",
"Check max_parallelism vs ideal_nodes — deep DAGs may not benefit from many nodes",
"If dependency_depth > 1, not all jobs can run simultaneously, so fewer nodes may suffice",
"Present both raw estimates AND the recommendation to the user"
]
}),
);
}
let json_output = serde_json::to_string_pretty(&response)
.map_err(|e| internal_error(format!("Failed to serialize response: {}", e)))?;
Ok(CallToolResult::success(vec![rmcp::model::Content::text(
json_output,
)]))
}
pub fn list_mcp_resources(docs_dir: Option<&Path>, examples_dir: Option<&Path>) -> Vec<Resource> {
let mut resources = Vec::new();
for (topic, rel_path, description) in doc_topic_mapping() {
let size = docs_dir
.and_then(|dir| fs::metadata(dir.join(rel_path)).ok())
.map(|m| m.len() as u32);
resources.push(Resource::new(
RawResource {
uri: format!("torc://docs/{}", topic),
name: format!("docs/{}", topic),
description: Some(description.to_string()),
mime_type: Some("text/markdown".to_string()),
size,
title: None,
icons: None,
meta: None,
},
None,
));
}
for (name, description) in example_descriptions() {
let size = examples_dir.and_then(|dir| {
for (subdir, ext) in &[("yaml", "yaml"), ("json", "json5"), ("kdl", "kdl")] {
let path = dir.join(subdir).join(format!("{}.{}", name, ext));
if let Ok(m) = fs::metadata(&path) {
return Some(m.len() as u32);
}
}
None
});
resources.push(Resource::new(
RawResource {
uri: format!("torc://examples/{}", name),
name: format!("examples/{}", name),
description: Some(description.to_string()),
mime_type: Some("text/plain".to_string()),
size,
title: None,
icons: None,
meta: None,
},
None,
));
}
resources
}
pub fn read_mcp_resource(
docs_dir: Option<&Path>,
examples_dir: Option<&Path>,
uri: &str,
) -> Result<ResourceContents, McpError> {
if let Some(topic) = uri.strip_prefix("torc://docs/") {
let mapping = doc_topic_mapping();
let (_, rel_path, _) = mapping
.iter()
.find(|(t, _, _)| *t == topic)
.ok_or_else(|| invalid_params(&format!("Unknown docs topic: {}", topic)))?;
let content = read_content(docs_dir, &format!("docs/src/{}", rel_path)).or_else(|_| {
if let Some(dir) = docs_dir {
let path = dir.join(rel_path);
if path.exists() {
return fs::read_to_string(&path)
.map_err(|e| internal_error(format!("Failed to read doc: {}", e)));
}
}
fetch_from_github(&format!("docs/src/{}", rel_path))
})?;
Ok(ResourceContents::TextResourceContents {
uri: uri.to_string(),
mime_type: Some("text/markdown".to_string()),
text: content,
meta: None,
})
} else if let Some(name) = uri.strip_prefix("torc://examples/") {
let (content, _) = read_example_content(examples_dir, name, "yaml")?;
Ok(ResourceContents::TextResourceContents {
uri: uri.to_string(),
mime_type: Some("text/plain".to_string()),
text: content,
meta: None,
})
} else {
Err(invalid_params(&format!(
"Unknown resource URI scheme: {}",
uri
)))
}
}