use chrono::Utc;
use clap::{Subcommand, ValueEnum};
use log::{debug, error, info, warn};
/// Custom clap help template for the `torc slurm` subcommand tree.
///
/// The subcommand list is rendered by hand (with ANSI escape codes for bold
/// green section headings and bold cyan command names) because most variants
/// of `SlurmCommands` are declared with `hide = true`, which suppresses
/// clap's automatic subcommand listing.
const SLURM_HELP_TEMPLATE: &str = "\
{before-help}{about-with-newline}
{usage-heading} {usage}
{all-args}
\x1b[1;32mScheduler Configuration:\x1b[0m
\x1b[1;36mcreate\x1b[0m Add a Slurm scheduler to the database
\x1b[1;36mupdate\x1b[0m Modify a Slurm scheduler in the database
\x1b[1;36mlist\x1b[0m List Slurm schedulers for a workflow
\x1b[1;36mget\x1b[0m Get a specific Slurm scheduler by ID
\x1b[1;36mdelete\x1b[0m Delete a Slurm scheduler by ID
\x1b[1;32mScheduler Generation:\x1b[0m
\x1b[1;36mgenerate\x1b[0m Generate Slurm schedulers for a workflow spec
\x1b[1;36mregenerate\x1b[0m Regenerate schedulers for pending jobs (recovery)
\x1b[1;32mExecution:\x1b[0m
\x1b[1;36mschedule-nodes\x1b[0m Submit Slurm allocations for a scheduler
\x1b[1;32mPlanning:\x1b[0m
\x1b[1;36mplan-allocations\x1b[0m Analyze workflow and cluster to recommend allocation strategy
\x1b[1;32mDiagnostics:\x1b[0m
\x1b[1;36mparse-logs\x1b[0m Parse Slurm logs for error messages
\x1b[1;36msacct\x1b[0m Show Slurm accounting info for allocations
\x1b[1;36mstats\x1b[0m Show per-job Slurm accounting stats stored in the database
\x1b[1;36musage\x1b[0m Total compute node and CPU time consumed
{after-help}";
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use super::output::{print_if_json, print_json, print_wrapped_if_json};
use std::fs;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::LazyLock;
use crate::client::apis;
use crate::client::apis::configuration::Configuration;
use crate::client::commands::get_env_user_name;
use crate::client::commands::hpc::create_registry_with_config_public;
use crate::client::commands::pagination::{
ComputeNodeListParams, JobListParams, ResourceRequirementsListParams, ResultListParams,
ScheduledComputeNodeListParams, SlurmSchedulersListParams, paginate_compute_nodes,
paginate_jobs, paginate_resource_requirements, paginate_results,
paginate_scheduled_compute_nodes, paginate_slurm_schedulers,
};
use crate::client::commands::{
print_error, select_workflow_interactively, table_format::display_table_with_count,
};
use crate::client::hpc::HpcProfile;
use crate::client::hpc::hpc_interface::HpcInterface;
use crate::client::utils;
use crate::client::workflow_graph::WorkflowGraph;
use crate::client::workflow_manager::WorkflowManager;
use crate::client::workflow_spec::{
ExecutionConfig, ExecutionMode, ResourceRequirementsSpec, SlurmDefaultsSpec, WorkflowSpec,
};
use crate::config::TorcConfig;
use crate::models;
use tabled::Tabled;
// How generated Slurm schedulers group jobs together.
// NOTE: plain `//` comments are used here (not `///`) so the clap-derived
// ValueEnum help text is left unchanged.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, ValueEnum)]
pub enum GroupByStrategy {
    // Default: group jobs by their resource_requirements entry.
    #[default]
    #[value(name = "resource-requirements")]
    ResourceRequirements,
    // Group jobs by target partition.
    #[value(name = "partition")]
    Partition,
}
impl std::fmt::Display for GroupByStrategy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
GroupByStrategy::ResourceRequirements => write!(f, "resource-requirements"),
GroupByStrategy::Partition => write!(f, "partition"),
}
}
}
// How generated schedulers choose their walltime request.
// NOTE: plain `//` comments are used here (not `///`) so the clap-derived
// ValueEnum help text is left unchanged.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, ValueEnum, Serialize, Deserialize)]
pub enum WalltimeStrategy {
    // Default: base walltime on the maximum job runtime.
    #[default]
    #[value(name = "max-job-runtime")]
    MaxJobRuntime,
    // Base walltime on the partition's maximum allowed time.
    #[value(name = "max-partition-time")]
    MaxPartitionTime,
}
impl std::fmt::Display for WalltimeStrategy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WalltimeStrategy::MaxJobRuntime => write!(f, "max-job-runtime"),
WalltimeStrategy::MaxPartitionTime => write!(f, "max-partition-time"),
}
}
}
// One row of the `torc slurm stats` table: per-job accounting figures stored
// in the database. Numeric-looking columns are kept as pre-formatted Strings
// so missing values can be rendered as text.
#[derive(Tabled)]
struct SlurmStatsTableRow {
    #[tabled(rename = "Job ID")]
    job_id: i64,
    #[tabled(rename = "Run")]
    run_id: i64,
    #[tabled(rename = "Attempt")]
    attempt_id: i64,
    #[tabled(rename = "Slurm Job")]
    slurm_job_id: String,
    #[tabled(rename = "Max RSS")]
    max_rss: String,
    #[tabled(rename = "Max VM")]
    max_vm: String,
    #[tabled(rename = "Ave CPU (s)")]
    ave_cpu_seconds: String,
    #[tabled(rename = "CPU %")]
    cpu_percent: String,
    #[tabled(rename = "Nodes")]
    node_list: String,
}
// One row of the `torc slurm list` table: a flattened view of a Slurm
// scheduler record, with optional fields collapsed to empty strings upstream.
#[derive(Tabled)]
struct SlurmSchedulerTableRow {
    #[tabled(rename = "ID")]
    id: i64,
    #[tabled(rename = "Name")]
    name: String,
    #[tabled(rename = "Account")]
    account: String,
    #[tabled(rename = "Nodes")]
    nodes: i64,
    #[tabled(rename = "Walltime")]
    walltime: String,
    #[tabled(rename = "Partition")]
    partition: String,
    #[tabled(rename = "QOS")]
    qos: String,
}
/// Interactively choose a Slurm scheduler ID for `workflow_id`.
///
/// Behavior:
/// - Exits the process with an error if listing fails or no schedulers exist.
/// - Auto-selects (no prompt) when exactly one scheduler exists.
/// - Otherwise prints a scheduler table to stderr, reads an ID from stdin,
///   and exits on invalid or unknown input.
///
/// All interactive text is written to stderr so stdout stays clean for
/// machine-readable output.
fn select_slurm_scheduler_interactively(
    config: &Configuration,
    workflow_id: i64,
) -> Result<i64, Box<dyn std::error::Error>> {
    use std::io::{self, Write};

    let schedulers = match paginate_slurm_schedulers(
        config,
        workflow_id,
        SlurmSchedulersListParams::new().with_limit(50),
    ) {
        Ok(schedulers) => schedulers,
        Err(e) => {
            print_error("listing Slurm schedulers", &e);
            std::process::exit(1);
        }
    };

    if schedulers.is_empty() {
        eprintln!("No Slurm schedulers found for workflow: {}", workflow_id);
        std::process::exit(1);
    }
    // Only one candidate: select it without prompting.
    if schedulers.len() == 1 {
        return Ok(schedulers[0].id.unwrap_or(-1));
    }

    eprintln!("Available Slurm schedulers:");
    eprintln!(
        "{:<5} {:<20} {:<15} {:<8} {:<12}",
        "ID", "Name", "Account", "Nodes", "Walltime"
    );
    eprintln!("{}", "-".repeat(70));
    for scheduler in schedulers.iter() {
        eprintln!(
            "{:<5} {:<20} {:<15} {:<8} {:<12}",
            scheduler.id.unwrap_or(-1),
            scheduler.name.as_deref().unwrap_or(""),
            &scheduler.account,
            scheduler.nodes,
            &scheduler.walltime
        );
    }

    // BUGFIX: the prompt goes to stderr, so flush stderr — the previous code
    // flushed stdout, which had nothing pending. `eprint!` (not `eprintln!`)
    // keeps the cursor on the prompt line.
    eprint!("\nEnter scheduler ID: ");
    let _ = io::stderr().flush();

    let mut input = String::new();
    if let Err(e) = io::stdin().read_line(&mut input) {
        eprintln!("Failed to read input: {}", e);
        std::process::exit(1);
    }
    match input.trim().parse::<i64>() {
        // Accept only IDs that actually belong to this workflow's schedulers.
        Ok(id) if schedulers.iter().any(|s| s.id == Some(id)) => Ok(id),
        Ok(id) => {
            eprintln!("Invalid scheduler ID: {}", id);
            std::process::exit(1);
        }
        Err(_) => {
            eprintln!("Invalid input. Please enter a numeric scheduler ID.");
            std::process::exit(1);
        }
    }
}
// Subcommands of `torc slurm`. Most variants are `hide = true` because the
// listing shown to users is hand-rendered by SLURM_HELP_TEMPLATE; plain `//`
// comments are used throughout so clap's derived help text is unchanged.
#[derive(Subcommand)]
#[command(
    help_template = SLURM_HELP_TEMPLATE,
    subcommand_help_heading = None,
    after_long_help = "\
EXAMPLES:
# List Slurm schedulers for a workflow
torc slurm list 123
# Generate schedulers for a workflow spec
torc slurm generate --account myproject workflow.yaml
# Schedule compute nodes
torc slurm schedule-nodes 123 --scheduler-name gpu --num-nodes 4
# Get Slurm accounting info
torc slurm sacct 123
")]
pub enum SlurmCommands {
    // Add a Slurm scheduler record to the database for a workflow.
    #[command(
        hide = true,
        after_long_help = "\
EXAMPLES:
# Create a basic Slurm scheduler
torc slurm create 123 --name cpu_jobs --account myproject --walltime 04:00:00
# Create with GPU requirements
torc slurm create 123 --name gpu_jobs --account myproject \\
--partition gpu --gres gpu:1 --mem 32G
# Create with specific partition and QOS
torc slurm create 123 --name large_jobs --account myproject \\
--partition bigmem --qos high --nodes 2
"
    )]
    Create {
        // Target workflow; prompts interactively when omitted.
        #[arg()]
        workflow_id: Option<i64>,
        #[arg(short, long, required = true)]
        name: String,
        #[arg(short, long, required = true)]
        account: String,
        #[arg(short, long)]
        gres: Option<String>,
        #[arg(short, long)]
        mem: Option<String>,
        #[arg(short = 'N', long, default_value = "1")]
        nodes: i64,
        #[arg(short, long)]
        partition: Option<String>,
        #[arg(short, long, default_value = "normal")]
        qos: String,
        #[arg(short, long)]
        tmp: Option<String>,
        #[arg(short = 'W', long, default_value = "04:00:00")]
        walltime: String,
        // Extra sbatch arguments passed through verbatim.
        #[arg(short, long)]
        extra: Option<String>,
    },
    // Modify fields of an existing scheduler; only provided options change.
    #[command(hide = true)]
    Update {
        #[arg()]
        scheduler_id: i64,
        #[arg(short = 'N', long)]
        name: Option<String>,
        #[arg(short, long)]
        account: Option<String>,
        #[arg(short, long)]
        gres: Option<String>,
        #[arg(short, long)]
        mem: Option<String>,
        #[arg(short, long)]
        nodes: Option<i64>,
        #[arg(short, long)]
        partition: Option<String>,
        #[arg(short, long)]
        qos: Option<String>,
        #[arg(short, long)]
        tmp: Option<String>,
        #[arg(long)]
        walltime: Option<String>,
        #[arg(short, long)]
        extra: Option<String>,
    },
    // List schedulers for a workflow (table or JSON output).
    #[command(
        hide = true,
        after_long_help = "\
EXAMPLES:
torc slurm list 123
torc -f json slurm list 123
"
    )]
    List {
        #[arg()]
        workflow_id: Option<i64>,
        #[arg(short, long)]
        limit: Option<i64>,
        #[arg(long, default_value = "0")]
        offset: i64,
    },
    // Fetch a single scheduler by its ID.
    #[command(hide = true)]
    Get {
        #[arg()]
        id: i64,
    },
    // Delete a scheduler by its ID.
    #[command(hide = true)]
    Delete {
        #[arg()]
        id: i64,
    },
    // Submit Slurm allocations for a scheduler configuration.
    #[command(
        hide = true,
        after_long_help = "\
EXAMPLES:
# Auto-schedule: match ready jobs to existing configs or regenerate
torc slurm schedule-nodes 123 --auto
# Schedule 4 compute nodes with specific config
torc slurm schedule-nodes 123 --scheduler-config-id 456 --num-hpc-jobs 4
# Keep submission scripts for debugging
torc slurm schedule-nodes 123 --keep-submission-scripts --num-hpc-jobs 4
"
    )]
    ScheduleNodes {
        #[arg()]
        workflow_id: Option<i64>,
        // Hidden flag; currently rejected at runtime as unimplemented.
        #[arg(long, hide = true)]
        auto: bool,
        #[arg(long, default_value = "false")]
        start_one_worker_per_node: bool,
        #[arg(short, long, default_value = "")]
        job_prefix: String,
        #[arg(long, default_value = "false")]
        keep_submission_scripts: bool,
        #[arg(short, long)]
        max_parallel_jobs: Option<i32>,
        #[arg(short, long, default_value = "1")]
        num_hpc_jobs: i32,
        #[arg(short, long, default_value = "torc_output")]
        output: String,
        #[arg(short, long)]
        poll_interval: Option<i32>,
        // Prompts interactively when omitted.
        #[arg(long)]
        scheduler_config_id: Option<i64>,
    },
    // Scan a torc output directory for errors in Slurm logs.
    #[command(
        hide = true,
        after_long_help = "\
EXAMPLES:
torc slurm parse-logs ./torc_output --workflow-id 123
torc slurm parse-logs ./torc_output --workflow-id 123 --errors-only
"
    )]
    ParseLogs {
        #[arg()]
        path: PathBuf,
        // Auto-detected from log files when omitted.
        #[arg(short, long)]
        workflow_id: Option<i64>,
        #[arg(long, default_value = "false")]
        errors_only: bool,
    },
    // Run sacct for a workflow's allocations.
    #[command(
        hide = true,
        after_long_help = "\
EXAMPLES:
torc slurm sacct 123
torc slurm sacct 123 --save-json --output-dir ./reports
"
    )]
    Sacct {
        #[arg()]
        workflow_id: Option<i64>,
        #[arg(short, long, default_value = "torc_output")]
        output_dir: PathBuf,
        #[arg(long, default_value = "false")]
        save_json: bool,
    },
    // Show per-job accounting stats stored in the database (visible in help).
    #[command(after_long_help = "\
EXAMPLES:
torc slurm stats 123
torc slurm stats 123 --job-id 456
torc slurm stats 123 --run-id 2
torc slurm stats 123 --run-id 1 --attempt-id 1
torc -f json slurm stats 123
")]
    Stats {
        #[arg()]
        workflow_id: i64,
        #[arg(long)]
        job_id: Option<i64>,
        #[arg(long)]
        run_id: Option<i64>,
        #[arg(long)]
        attempt_id: Option<i64>,
    },
    // Report total compute node and CPU time consumed by a workflow.
    #[command(
        hide = true,
        after_long_help = "\
EXAMPLES:
torc slurm usage 123
torc -f json slurm usage 123
"
    )]
    Usage {
        #[arg()]
        workflow_id: Option<i64>,
    },
    // Generate schedulers for a workflow spec file (preview or write out).
    #[command(
        hide = true,
        after_long_help = "\
EXAMPLES:
# Preview generated schedulers
torc slurm generate --account myproject workflow.yaml
# Save to new file
torc slurm generate --account myproject -o workflow_with_slurm.yaml workflow.yaml
# Use specific HPC profile
torc slurm generate --account myproject --profile kestrel workflow.yaml
# Group by partition instead of resource requirements
torc slurm generate --account myproject --group-by partition workflow.yaml
"
    )]
    Generate {
        #[arg()]
        workflow_file: PathBuf,
        #[arg(short, long)]
        account: Option<String>,
        #[arg(long)]
        profile: Option<String>,
        #[arg(short, long)]
        output: Option<PathBuf>,
        #[arg(long)]
        single_allocation: bool,
        #[arg(long, value_enum, default_value_t = GroupByStrategy::ResourceRequirements)]
        group_by: GroupByStrategy,
        #[arg(long, value_enum, default_value_t = WalltimeStrategy::MaxJobRuntime)]
        walltime_strategy: WalltimeStrategy,
        // Multiplier applied to computed walltimes; validated > 0 at runtime.
        #[arg(long, default_value = "1.5")]
        walltime_multiplier: f64,
        #[arg(long)]
        no_actions: bool,
        #[arg(long)]
        overwrite: bool,
        #[arg(long)]
        dry_run: bool,
    },
    // Regenerate schedulers for a workflow already in the database (recovery).
    #[command(hide = true)]
    Regenerate {
        #[arg()]
        workflow_id: i64,
        #[arg(short, long)]
        account: Option<String>,
        #[arg(long)]
        profile: Option<String>,
        #[arg(long)]
        partition: Option<String>,
        #[arg(long)]
        walltime: Option<String>,
        #[arg(long)]
        single_allocation: bool,
        #[arg(long, value_enum, default_value_t = GroupByStrategy::ResourceRequirements)]
        group_by: GroupByStrategy,
        #[arg(long, value_enum, default_value_t = WalltimeStrategy::MaxJobRuntime)]
        walltime_strategy: WalltimeStrategy,
        #[arg(long, default_value = "1.5")]
        walltime_multiplier: f64,
        #[arg(long)]
        submit: bool,
        #[arg(short, long, default_value = "torc_output")]
        output_dir: PathBuf,
        #[arg(short, long)]
        poll_interval: Option<i32>,
        #[arg(long)]
        dry_run: bool,
        // Comma-separated list of job IDs to restrict regeneration to.
        #[arg(long, value_delimiter = ',')]
        include_job_ids: Option<Vec<i64>>,
    },
    // Analyze a workflow spec (and optionally the cluster) to recommend an
    // allocation strategy; visible in help.
    #[command(
        name = "plan-allocations",
        after_long_help = "\
EXAMPLES:
# Basic usage
torc slurm plan-allocations --account myproject workflow.yaml
# Specify partition explicitly
torc slurm plan-allocations --account myproject --partition standard workflow.yaml
# Offline mode (skip sinfo/squeue, only analyze workflow)
torc slurm plan-allocations --account myproject --offline workflow.yaml
"
    )]
    PlanAllocations {
        #[arg()]
        workflow_file: PathBuf,
        #[arg(short, long)]
        account: Option<String>,
        #[arg(short, long)]
        partition: Option<String>,
        #[arg(long)]
        profile: Option<String>,
        #[arg(long)]
        offline: bool,
        #[arg(long)]
        skip_test_only: bool,
        #[arg(long, value_enum, default_value_t = GroupByStrategy::ResourceRequirements)]
        group_by: GroupByStrategy,
        #[arg(long, value_enum, default_value_t = WalltimeStrategy::MaxJobRuntime)]
        walltime_strategy: WalltimeStrategy,
        #[arg(long, default_value = "1.5")]
        walltime_multiplier: f64,
    },
}
/// Format a duration in seconds as a Slurm walltime string:
/// `HH:MM:SS`, or `D-HH:MM:SS` once the duration reaches 24 hours.
pub fn secs_to_walltime(secs: u64) -> String {
    let total_hours = secs / 3600;
    let minutes = (secs % 3600) / 60;
    let seconds = secs % 60;
    if total_hours < 24 {
        format!("{:02}:{:02}:{:02}", total_hours, minutes, seconds)
    } else {
        // Split whole days out in front, Slurm's "days-hours" notation.
        let (days, hours) = (total_hours / 24, total_hours % 24);
        format!("{}-{:02}:{:02}:{:02}", days, hours, minutes, seconds)
    }
}
/// Generate Slurm schedulers (and optionally actions) for a workflow spec.
///
/// Mutates `spec` in place: existing `slurm_schedulers`/`actions` are only
/// replaced when `overwrite` is set (otherwise an explanatory error is
/// returned). Parameterized jobs are expanded temporarily so the planner can
/// see every concrete job; the original (unexpanded) job and file lists are
/// restored before returning, with scheduler assignments mapped back onto the
/// unexpanded jobs by name or name prefix.
///
/// Returns counts of generated schedulers/actions plus accumulated warnings,
/// or an error message when generation is impossible.
#[allow(clippy::too_many_arguments)]
pub fn generate_schedulers_for_workflow(
    spec: &mut WorkflowSpec,
    profile: &HpcProfile,
    account: &str,
    single_allocation: bool,
    group_by: GroupByStrategy,
    walltime_strategy: WalltimeStrategy,
    walltime_multiplier: f64,
    add_actions: bool,
    overwrite: bool,
) -> Result<GenerateResult, String> {
    let has_schedulers =
        spec.slurm_schedulers.is_some() && !spec.slurm_schedulers.as_ref().unwrap().is_empty();
    let has_actions = spec.actions.is_some() && !spec.actions.as_ref().unwrap().is_empty();
    if has_schedulers || has_actions {
        // Refuse to clobber existing schedulers/actions unless asked to.
        if !overwrite {
            let mut msg = String::from("Workflow spec already has ");
            if has_schedulers && has_actions {
                msg.push_str("slurm_schedulers and actions");
            } else if has_schedulers {
                msg.push_str("slurm_schedulers");
            } else {
                msg.push_str("actions");
            }
            msg.push_str(" defined.\n\nOptions:\n");
            msg.push_str(" 1. Use --overwrite to generate new schedulers (replaces existing)\n");
            msg.push_str(" 2. Use 'torc submit' to use the existing schedulers as-is\n");
            msg.push_str(
                " 3. Remove schedulers/actions from the spec and run 'torc slurm generate' again",
            );
            return Err(msg);
        }
        spec.slurm_schedulers = None;
        spec.actions = None;
    }
    use crate::client::scheduler_plan::{
        SchedulerOverrides, apply_plan_to_spec, generate_scheduler_plan,
    };
    // Snapshot the unexpanded jobs/files so they can be restored afterwards;
    // expansion below is only for the planner's benefit.
    let original_jobs = spec.jobs.clone();
    let original_files = spec.files.clone();
    spec.expand_parameters()
        .map_err(|e| format!("Failed to expand parameters: {}", e))?;
    let rr_vec = spec.resource_requirements.as_deref().unwrap_or(&[]);
    // Index resource requirements by name for the planner.
    let rr_map: HashMap<&str, &ResourceRequirementsSpec> =
        rr_vec.iter().map(|rr| (rr.name.as_str(), rr)).collect();
    if rr_map.is_empty() {
        return Err(
            "Workflow has no resource_requirements defined. Cannot generate schedulers."
                .to_string(),
        );
    }
    let graph = WorkflowGraph::from_spec(spec)
        .map_err(|e| format!("Failed to build workflow graph: {}", e))?;
    // Collect non-fatal issues (e.g. jobs the planner will skip).
    let mut warnings: Vec<String> = Vec::new();
    for job in &spec.jobs {
        if job.resource_requirements.is_none() {
            warnings.push(format!(
                "Job '{}' has no resource_requirements, skipping scheduler generation",
                job.name
            ));
        }
    }
    let plan = generate_scheduler_plan(
        &graph,
        &rr_map,
        profile,
        account,
        single_allocation,
        group_by,
        walltime_strategy,
        walltime_multiplier,
        add_actions,
        None, false, &SchedulerOverrides::default(),
    );
    warnings.extend(plan.warnings.clone());
    if plan.schedulers.is_empty() {
        // Nothing generated: build an error that surfaces up to 5 reasons.
        let mut msg = String::from("No schedulers could be generated.\n");
        if warnings.is_empty() {
            msg.push_str("No jobs with resource_requirements found.");
        } else {
            msg.push_str("\nReasons:\n");
            let max_warnings = 5;
            for warning in warnings.iter().take(max_warnings) {
                msg.push_str(&format!(" - {}\n", warning));
            }
            if warnings.len() > max_warnings {
                msg.push_str(&format!(
                    " ... and {} more issues\n",
                    warnings.len() - max_warnings
                ));
            }
        }
        return Err(msg);
    }
    apply_plan_to_spec(&plan, spec);
    // Map the planner's per-(expanded-)job scheduler assignments back onto the
    // original unexpanded jobs.
    let mut original_jobs = original_jobs;
    for orig_job in &mut original_jobs {
        if let Some(scheduler) = plan.job_assignments.get(&orig_job.name) {
            orig_job.scheduler = Some(scheduler.clone());
        } else if orig_job.use_parameters.is_some() || orig_job.parameters.is_some() {
            // Parameterized job: its expanded names share the prefix before the
            // first '{' placeholder, so adopt the first matching assignment.
            let pattern_prefix = orig_job.name.split('{').next().unwrap_or(&orig_job.name);
            for (expanded_name, scheduler) in &plan.job_assignments {
                if expanded_name.starts_with(pattern_prefix) {
                    orig_job.scheduler = Some(scheduler.clone());
                    break;
                }
            }
        }
    }
    // Restore the unexpanded jobs/files; only scheduler assignments persist.
    spec.jobs = original_jobs;
    spec.files = original_files;
    Ok(GenerateResult {
        scheduler_count: plan.schedulers.len(),
        action_count: plan.actions.len(),
        warnings,
    })
}
/// Summary returned by `generate_schedulers_for_workflow`.
pub struct GenerateResult {
    /// Number of Slurm schedulers added to the spec.
    pub scheduler_count: usize,
    /// Number of workflow actions added to the spec.
    pub action_count: usize,
    /// Non-fatal issues collected during generation.
    pub warnings: Vec<String>,
}
/// Parse a Slurm-style memory string into megabytes.
///
/// Accepts an optional case-insensitive suffix: `k` (kilobytes), `m`
/// (megabytes), `g` (gigabytes), or `t` (terabytes); a bare number is taken
/// as megabytes. Fractional values are allowed with any suffix (e.g. "1.5g").
///
/// Fixes over the previous version:
/// - `m` now accepts fractional values, consistent with `g`/`k`.
/// - `t` (terabytes) is supported, matching Slurm's K|M|G|T size suffixes.
/// - Negative values are rejected instead of silently saturating to 0.
///
/// Returns an error message for empty, negative, or unparseable input.
pub fn parse_memory_mb(s: &str) -> Result<u64, String> {
    let s = s.trim().to_lowercase();
    if s.is_empty() {
        return Err("Empty memory string".to_string());
    }
    // Parse the numeric part, rejecting negatives and NaN.
    let parse_num = |num_str: &str| -> Result<f64, String> {
        match num_str.parse::<f64>() {
            Ok(v) if v >= 0.0 => Ok(v),
            _ => Err(format!("Invalid number: {}", num_str)),
        }
    };
    if let Some(num_str) = s.strip_suffix('t') {
        Ok((parse_num(num_str)? * 1024.0 * 1024.0) as u64)
    } else if let Some(num_str) = s.strip_suffix('g') {
        Ok((parse_num(num_str)? * 1024.0) as u64)
    } else if let Some(num_str) = s.strip_suffix('m') {
        Ok(parse_num(num_str)? as u64)
    } else if let Some(num_str) = s.strip_suffix('k') {
        Ok((parse_num(num_str)? / 1024.0) as u64)
    } else {
        // No suffix: integer megabytes (negative values fail u64 parsing).
        s.parse()
            .map_err(|_| format!("Invalid memory value: {}", s))
    }
}
/// Parse a Slurm walltime string into total seconds.
///
/// Supported forms: `M`, `M:S`, `H:M:S`, and any of those prefixed with
/// `D-` for whole days (e.g. "2-12:00:00").
pub fn parse_walltime_secs(s: &str) -> Result<u64, String> {
    let trimmed = s.trim();
    match trimmed.split_once('-') {
        // Day prefix present: seconds = days * 86400 + the H:M:S remainder.
        Some((days_str, hms)) => {
            let days: u64 = days_str
                .parse()
                .map_err(|_| format!("Invalid days: {}", days_str))?;
            Ok(days * 86400 + parse_hms(hms)?)
        }
        None => parse_hms(trimmed),
    }
}

/// Parse the colon-separated portion of a walltime: `M`, `M:S`, or `H:M:S`.
fn parse_hms(s: &str) -> Result<u64, String> {
    // Shared numeric-field parser; `label` names the field in error messages.
    let field = |value: &str, label: &str| -> Result<u64, String> {
        value
            .parse::<u64>()
            .map_err(|_| format!("Invalid {}: {}", label, value))
    };
    let parts: Vec<&str> = s.split(':').collect();
    match parts.as_slice() {
        [m] => Ok(field(m, "minutes")? * 60),
        [m, sec] => Ok(field(m, "minutes")? * 60 + field(sec, "seconds")?),
        [h, m, sec] => {
            Ok(field(h, "hours")? * 3600 + field(m, "minutes")? * 60 + field(sec, "seconds")?)
        }
        _ => Err(format!("Invalid time format: {}", s)),
    }
}
/// Dispatch a `torc slurm` subcommand.
///
/// The `print_*_if_json` helpers choose between JSON output and human-readable
/// text based on `format`; human-readable status goes to stderr. Most arms
/// terminate the process directly (`std::process::exit(1)`) on error rather
/// than returning. Workflow/scheduler IDs omitted on the command line are
/// resolved via interactive prompts.
pub fn handle_slurm_commands(config: &Configuration, command: &SlurmCommands, format: &str) {
    match command {
        // Create a new scheduler record in the database.
        SlurmCommands::Create {
            workflow_id,
            name,
            account,
            gres,
            mem,
            nodes,
            partition,
            qos,
            tmp,
            walltime,
            extra,
        } => {
            let user_name = get_env_user_name();
            // Fall back to an interactive workflow picker when no ID given.
            let wf_id = workflow_id.unwrap_or_else(|| {
                select_workflow_interactively(config, &user_name).unwrap_or_else(|e| {
                    eprintln!("Error selecting workflow: {}", e);
                    std::process::exit(1);
                })
            });
            let scheduler = models::SlurmSchedulerModel {
                id: None,
                workflow_id: wf_id,
                name: Some(name.clone()),
                account: account.clone(),
                gres: gres.clone(),
                mem: mem.clone(),
                nodes: *nodes,
                ntasks_per_node: None,
                partition: partition.clone(),
                qos: Some(qos.clone()),
                tmp: tmp.clone(),
                walltime: walltime.clone(),
                extra: extra.clone(),
            };
            match apis::slurm_schedulers_api::create_slurm_scheduler(config, scheduler) {
                Ok(created) => {
                    // JSON was emitted by the helper; otherwise print a summary.
                    if print_if_json(format, &created, "Slurm scheduler") {
                    } else {
                        eprintln!(
                            "Added Slurm configuration '{}' (ID: {}) to workflow {}",
                            name,
                            created.id.unwrap_or(-1),
                            wf_id
                        );
                    }
                }
                Err(e) => {
                    print_error("creating Slurm scheduler", &e);
                    std::process::exit(1);
                }
            }
        }
        // Fetch, patch, and write back an existing scheduler record.
        SlurmCommands::Update {
            scheduler_id,
            name,
            account,
            gres,
            mem,
            nodes,
            partition,
            qos,
            tmp,
            walltime,
            extra,
        } => {
            let mut scheduler =
                match apis::slurm_schedulers_api::get_slurm_scheduler(config, *scheduler_id) {
                    Ok(s) => s,
                    Err(e) => {
                        print_error("getting Slurm scheduler", &e);
                        std::process::exit(1);
                    }
                };
            // Apply only the fields the user supplied; track whether anything
            // actually changed so we can skip a no-op update call.
            let mut changed = false;
            if let Some(n) = name {
                scheduler.name = Some(n.clone());
                changed = true;
            }
            if let Some(a) = account {
                scheduler.account = a.clone();
                changed = true;
            }
            if let Some(g) = gres {
                scheduler.gres = Some(g.clone());
                changed = true;
            }
            if let Some(m) = mem {
                scheduler.mem = Some(m.clone());
                changed = true;
            }
            if let Some(n) = nodes {
                scheduler.nodes = *n;
                changed = true;
            }
            if let Some(p) = partition {
                scheduler.partition = Some(p.clone());
                changed = true;
            }
            if let Some(q) = qos {
                scheduler.qos = Some(q.clone());
                changed = true;
            }
            if let Some(t) = tmp {
                scheduler.tmp = Some(t.clone());
                changed = true;
            }
            if let Some(w) = walltime {
                scheduler.walltime = w.clone();
                changed = true;
            }
            if let Some(e) = extra {
                scheduler.extra = Some(e.clone());
                changed = true;
            }
            if !changed {
                warn!("No changes requested");
                return;
            }
            match apis::slurm_schedulers_api::update_slurm_scheduler(
                config,
                *scheduler_id,
                scheduler,
            ) {
                Ok(updated) => {
                    if print_if_json(format, &updated, "Slurm scheduler") {
                    } else {
                        eprintln!("Updated Slurm configuration {}", scheduler_id);
                    }
                }
                Err(e) => {
                    print_error("updating Slurm scheduler", &e);
                    std::process::exit(1);
                }
            }
        }
        // List schedulers for a workflow as JSON or a table.
        SlurmCommands::List {
            workflow_id,
            limit,
            offset,
        } => {
            let user_name = get_env_user_name();
            let wf_id = workflow_id.unwrap_or_else(|| {
                select_workflow_interactively(config, &user_name).unwrap_or_else(|e| {
                    eprintln!("Error selecting workflow: {}", e);
                    std::process::exit(1);
                })
            });
            let mut params = SlurmSchedulersListParams::new().with_offset(*offset);
            if let Some(limit_val) = limit {
                params = params.with_limit(*limit_val);
            }
            match paginate_slurm_schedulers(config, wf_id, params) {
                Ok(schedulers) => {
                    if print_wrapped_if_json(
                        format,
                        "slurm_schedulers",
                        &schedulers,
                        "Slurm schedulers",
                    ) {
                    } else {
                        // Flatten records into table rows; missing optional
                        // fields render as empty strings.
                        let rows: Vec<SlurmSchedulerTableRow> = schedulers
                            .iter()
                            .map(|s| SlurmSchedulerTableRow {
                                id: s.id.unwrap_or(-1),
                                name: s.name.clone().unwrap_or_default(),
                                account: s.account.clone(),
                                nodes: s.nodes,
                                walltime: s.walltime.clone(),
                                partition: s.partition.clone().unwrap_or_default(),
                                qos: s.qos.clone().unwrap_or_default(),
                            })
                            .collect();
                        println!("Slurm configurations for workflow {}", wf_id);
                        display_table_with_count(&rows, "configs");
                    }
                }
                Err(e) => {
                    print_error("listing Slurm schedulers", &e);
                    std::process::exit(1);
                }
            }
        }
        // Show a single scheduler's full details.
        SlurmCommands::Get { id } => {
            match apis::slurm_schedulers_api::get_slurm_scheduler(config, *id) {
                Ok(scheduler) => {
                    if print_if_json(format, &scheduler, "Slurm scheduler") {
                    } else {
                        eprintln!("Slurm Config ID {}:", id);
                        eprintln!(" Name: {}", scheduler.name.unwrap_or_default());
                        eprintln!(" Workflow ID: {}", scheduler.workflow_id);
                        eprintln!(" Account: {}", scheduler.account);
                        eprintln!(" Nodes: {}", scheduler.nodes);
                        eprintln!(" Walltime: {}", scheduler.walltime);
                        eprintln!(" Partition: {}", scheduler.partition.unwrap_or_default());
                        eprintln!(" QOS: {}", scheduler.qos.unwrap_or_default());
                        eprintln!(
                            " GRES: {}",
                            scheduler.gres.unwrap_or_else(|| "None".to_string())
                        );
                        eprintln!(
                            " Memory: {}",
                            scheduler.mem.unwrap_or_else(|| "None".to_string())
                        );
                        eprintln!(
                            " Tmp: {}",
                            scheduler.tmp.unwrap_or_else(|| "None".to_string())
                        );
                        eprintln!(
                            " Extra: {}",
                            scheduler.extra.unwrap_or_else(|| "None".to_string())
                        );
                    }
                }
                Err(e) => {
                    print_error("getting Slurm scheduler", &e);
                    std::process::exit(1);
                }
            }
        }
        // Delete a scheduler and report what was removed.
        SlurmCommands::Delete { id } => {
            match apis::slurm_schedulers_api::delete_slurm_scheduler(config, *id) {
                Ok(deleted_scheduler) => {
                    if print_if_json(format, &deleted_scheduler, "Slurm scheduler") {
                    } else {
                        eprintln!("Successfully deleted Slurm config ID {}", id);
                        eprintln!(" Name: {}", deleted_scheduler.name.unwrap_or_default());
                        eprintln!(" Workflow ID: {}", deleted_scheduler.workflow_id);
                    }
                }
                Err(e) => {
                    print_error("deleting Slurm scheduler", &e);
                    std::process::exit(1);
                }
            }
        }
        // Submit Slurm allocations for a scheduler configuration.
        SlurmCommands::ScheduleNodes {
            workflow_id,
            auto,
            start_one_worker_per_node,
            job_prefix,
            keep_submission_scripts,
            max_parallel_jobs,
            num_hpc_jobs,
            output,
            poll_interval,
            scheduler_config_id,
        } => {
            let user_name = get_env_user_name();
            let wf_id = workflow_id.unwrap_or_else(|| {
                select_workflow_interactively(config, &user_name).unwrap_or_else(|e| {
                    eprintln!("Error selecting workflow: {}", e);
                    std::process::exit(1);
                })
            });
            let workflow = match apis::workflows_api::get_workflow(config, wf_id) {
                Ok(w) => w,
                Err(e) => {
                    print_error("getting workflow", &e);
                    std::process::exit(1);
                }
            };
            // Initialize the workflow first if every job is uninitialized.
            match apis::workflows_api::is_workflow_uninitialized(config, wf_id) {
                Ok(response) => {
                    if let Some(is_uninitialized) =
                        response.get("is_uninitialized").and_then(|v| v.as_bool())
                    {
                        if is_uninitialized {
                            info!(
                                "Workflow {} has all jobs uninitialized. Initializing workflow...",
                                wf_id
                            );
                            let torc_config = TorcConfig::load().unwrap_or_default();
                            let workflow_manager =
                                WorkflowManager::new(config.clone(), torc_config, workflow.clone());
                            match workflow_manager.initialize(false) {
                                Ok(()) => {
                                    info!("Successfully initialized workflow {}", wf_id);
                                }
                                Err(e) => {
                                    error!("Error initializing workflow: {}", e);
                                    eprintln!("Error initializing workflow: {}", e);
                                    std::process::exit(1);
                                }
                            }
                        } else {
                            info!("Workflow {} already has initialized jobs", wf_id);
                        }
                    }
                }
                Err(e) => {
                    error!("Error checking if workflow is uninitialized: {}", e);
                    eprintln!("Error checking if workflow is uninitialized: {}", e);
                    std::process::exit(1);
                }
            }
            // --auto is accepted by the parser but not implemented yet.
            if *auto {
                eprintln!(
                    "Auto-scheduling is not yet implemented. \
For now, use 'torc slurm regenerate' to create/update schedulers, \
then 'torc slurm schedule-nodes' with --scheduler-config-id."
                );
                std::process::exit(1);
            }
            let sched_config_id = scheduler_config_id.unwrap_or_else(|| {
                select_slurm_scheduler_interactively(config, wf_id).unwrap_or_else(|e| {
                    eprintln!("Error selecting scheduler: {}", e);
                    std::process::exit(1);
                })
            });
            let torc_config = TorcConfig::load().unwrap_or_default();
            // CLI flag wins; otherwise use the configured default.
            let effective_poll_interval =
                poll_interval.unwrap_or(torc_config.client.slurm.poll_interval);
            match schedule_slurm_nodes(
                config,
                wf_id,
                sched_config_id,
                *num_hpc_jobs,
                *start_one_worker_per_node,
                job_prefix,
                output,
                effective_poll_interval,
                *max_parallel_jobs,
                *keep_submission_scripts,
            ) {
                Ok(()) => {
                    eprintln!("Successfully running {} Slurm job(s)", num_hpc_jobs);
                }
                Err(e) => {
                    eprintln!("Error scheduling Slurm nodes: {}", e);
                    std::process::exit(1);
                }
            }
        }
        // Scan a torc output directory's Slurm logs for errors.
        SlurmCommands::ParseLogs {
            path,
            workflow_id,
            errors_only,
        } => {
            if !path.exists() {
                eprintln!("Error: Path not found: {}", path.display());
                std::process::exit(1);
            }
            if !path.is_dir() {
                eprintln!("Error: Path is not a directory: {}", path.display());
                std::process::exit(1);
            }
            // Infer the workflow from log filenames when not given explicitly;
            // ambiguity (multiple workflows) is a hard error.
            let wf_id = match workflow_id {
                Some(id) => *id,
                None => {
                    let detected = super::logs::detect_workflow_ids(path);
                    if detected.is_empty() {
                        eprintln!(
                            "No workflow log files found in directory: {}",
                            path.display()
                        );
                        std::process::exit(1);
                    }
                    if detected.len() > 1 {
                        eprintln!("Multiple workflows detected in directory: {:?}", detected);
                        eprintln!("Please specify a workflow ID with --workflow-id");
                        std::process::exit(1);
                    }
                    detected[0]
                }
            };
            parse_slurm_logs(config, wf_id, path, *errors_only, format);
        }
        // Run sacct for a workflow's allocations.
        SlurmCommands::Sacct {
            workflow_id,
            output_dir,
            save_json,
        } => {
            let user_name = get_env_user_name();
            let wf_id = workflow_id.unwrap_or_else(|| {
                select_workflow_interactively(config, &user_name).unwrap_or_else(|e| {
                    eprintln!("Error selecting workflow: {}", e);
                    std::process::exit(1);
                })
            });
            run_sacct_for_workflow(config, wf_id, output_dir, *save_json, format);
        }
        // Show per-job accounting stats stored in the database.
        SlurmCommands::Stats {
            workflow_id,
            job_id,
            run_id,
            attempt_id,
        } => {
            handle_slurm_stats(config, *workflow_id, *job_id, *run_id, *attempt_id, format);
        }
        // Report total compute node and CPU time consumed.
        SlurmCommands::Usage { workflow_id } => {
            let user_name = get_env_user_name();
            let wf_id = workflow_id.unwrap_or_else(|| {
                select_workflow_interactively(config, &user_name).unwrap_or_else(|e| {
                    eprintln!("Error selecting workflow: {}", e);
                    std::process::exit(1);
                })
            });
            run_usage_for_workflow(config, wf_id, format);
        }
        // Generate schedulers for a workflow spec file.
        SlurmCommands::Generate {
            workflow_file,
            account,
            profile: profile_name,
            output,
            single_allocation,
            group_by,
            walltime_strategy,
            walltime_multiplier,
            no_actions,
            overwrite,
            dry_run,
        } => {
            // Validate before delegating; clap can't express "> 0" for floats.
            if *walltime_multiplier <= 0.0 {
                eprintln!("Error: --walltime-multiplier must be greater than 0");
                std::process::exit(1);
            }
            handle_generate(
                workflow_file,
                account.as_deref(),
                profile_name.as_deref(),
                output.as_ref(),
                *single_allocation,
                *group_by,
                *walltime_strategy,
                *walltime_multiplier,
                *no_actions,
                *overwrite,
                *dry_run,
                format,
            );
        }
        // Regenerate schedulers for pending jobs of an existing workflow.
        SlurmCommands::Regenerate {
            workflow_id,
            account,
            profile: profile_name,
            partition,
            walltime,
            single_allocation,
            group_by,
            walltime_strategy,
            walltime_multiplier,
            submit,
            output_dir,
            poll_interval,
            dry_run,
            include_job_ids,
        } => {
            if *walltime_multiplier <= 0.0 {
                eprintln!("Error: --walltime-multiplier must be greater than 0");
                std::process::exit(1);
            }
            handle_regenerate(
                config,
                *workflow_id,
                account.as_deref(),
                profile_name.as_deref(),
                partition.as_deref(),
                walltime.as_deref(),
                *single_allocation,
                *group_by,
                *walltime_strategy,
                *walltime_multiplier,
                *submit,
                output_dir,
                *poll_interval,
                *dry_run,
                include_job_ids.as_deref(),
                format,
            );
        }
        // Analyze a workflow spec and recommend an allocation strategy.
        SlurmCommands::PlanAllocations {
            workflow_file,
            account,
            partition,
            profile: profile_name,
            offline,
            skip_test_only,
            group_by,
            walltime_strategy,
            walltime_multiplier,
        } => {
            if *walltime_multiplier <= 0.0 {
                eprintln!("Error: --walltime-multiplier must be greater than 0");
                std::process::exit(1);
            }
            handle_plan_allocations(
                workflow_file,
                account.as_deref(),
                partition.as_deref(),
                profile_name.as_deref(),
                *offline,
                *skip_test_only,
                *group_by,
                *walltime_strategy,
                *walltime_multiplier,
                format,
            );
        }
    }
}
/// How many minutes to keep retrying API calls while waiting for the
/// database to become healthy (used with `utils::send_with_retries`).
const WAIT_FOR_HEALTHY_DATABASE_MINUTES: u64 = 20;

/// String-backed error that lets heterogeneous API error types flow through
/// the shared retry machinery as a uniform `Box<dyn Error>`.
#[derive(Debug)]
struct RetryApiError(String);

impl std::fmt::Display for RetryApiError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl std::error::Error for RetryApiError {}

/// Convert any displayable error into a boxed `RetryApiError`, leaving the
/// success value untouched.
fn box_retry_error<T, E>(result: Result<T, E>) -> Result<T, Box<dyn std::error::Error>>
where
    E: std::fmt::Display,
{
    match result {
        Ok(value) => Ok(value),
        Err(err) => {
            let boxed: Box<dyn std::error::Error> = Box::new(RetryApiError(err.to_string()));
            Err(boxed)
        }
    }
}
/// Create and submit `num_hpc_jobs` Slurm allocations for a workflow.
///
/// For each allocation this writes an sbatch submission script under `output`,
/// submits it via the Slurm interface, and records a pending
/// `ScheduledComputeNodesModel` row so the server can track the allocation.
/// Database-facing calls retry for up to `WAIT_FOR_HEALTHY_DATABASE_MINUTES`.
/// Returns an error for any API or Slurm failure; panics only if sbatch
/// returns an unparsable job ID (a malformed response, treated as a bug).
#[allow(clippy::too_many_arguments)]
pub fn schedule_slurm_nodes(
    config: &Configuration,
    workflow_id: i64,
    scheduler_config_id: i64,
    num_hpc_jobs: i32,
    start_one_worker_per_node: bool,
    job_prefix: &str,
    output: &str,
    poll_interval: i32,
    max_parallel_jobs: Option<i32>,
    keep_submission_scripts: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    // Fetch the scheduler config, retrying while the database recovers.
    let scheduler = match utils::send_with_retries(
        config,
        || {
            box_retry_error(apis::slurm_schedulers_api::get_slurm_scheduler(
                config,
                scheduler_config_id,
            ))
        },
        WAIT_FOR_HEALTHY_DATABASE_MINUTES,
    ) {
        Ok(s) => s,
        Err(e) => {
            return Err(format!("Failed to get Slurm scheduler: {}", e).into());
        }
    };
    let workflow = match utils::send_with_retries(
        config,
        || apis::workflows_api::get_workflow(config, workflow_id),
        WAIT_FOR_HEALTHY_DATABASE_MINUTES,
    ) {
        Ok(w) => w,
        Err(e) => {
            return Err(format!("Failed to get workflow: {}", e).into());
        }
    };
    let execution_config = ExecutionConfig::from_workflow_model(&workflow);
    // One-worker-per-node only makes sense in 'direct' execution mode.
    if start_one_worker_per_node && execution_config.mode != ExecutionMode::Direct {
        return Err(
            "start_one_worker_per_node requires execution_config.mode to be 'direct'".into(),
        );
    }
    let slurm_interface = match crate::client::hpc::slurm_interface::SlurmInterface::new() {
        Ok(interface) => interface,
        Err(e) => {
            return Err(format!("Failed to create Slurm interface: {}", e).into());
        }
    };
    // Build the sbatch parameter map: workflow-level slurm_defaults first,
    // then scheduler-specific values, which overwrite matching keys.
    let mut config_map = HashMap::new();
    if let Some(ref defaults_json) = workflow.slurm_defaults {
        match serde_json::from_str::<SlurmDefaultsSpec>(defaults_json) {
            Ok(defaults) => {
                // Invalid (but parsable) defaults are fatal ...
                if let Err(e) = defaults.validate() {
                    return Err(e.into());
                }
                debug!("Applying slurm_defaults from workflow");
                for (key, value) in defaults.to_string_map() {
                    config_map.insert(key, value);
                }
            }
            Err(e) => {
                // ... but unparsable defaults are merely skipped with a warning.
                warn!("Failed to parse slurm_defaults: {}", e);
            }
        }
    }
    config_map.insert("account".to_string(), scheduler.account.clone());
    config_map.insert("walltime".to_string(), scheduler.walltime.clone());
    config_map.insert("nodes".to_string(), scheduler.nodes.to_string());
    if let Some(partition) = &scheduler.partition {
        config_map.insert("partition".to_string(), partition.clone());
    }
    if let Some(qos) = &scheduler.qos {
        config_map.insert("qos".to_string(), qos.clone());
    }
    if let Some(gres) = &scheduler.gres {
        config_map.insert("gres".to_string(), gres.clone());
    }
    if let Some(mem) = &scheduler.mem {
        config_map.insert("mem".to_string(), mem.clone());
    }
    if let Some(tmp) = &scheduler.tmp {
        config_map.insert("tmp".to_string(), tmp.clone());
    }
    if let Some(extra) = &scheduler.extra {
        config_map.insert("extra".to_string(), extra.clone());
    }
    std::fs::create_dir_all(output)?;
    // Optionally stagger worker startup so many runners coming online at once
    // don't stampede the server. The delay window scales with total runners.
    let startup_delay_seconds = if execution_config.staggered_start() {
        let nodes_per_alloc: i32 = config_map
            .get("nodes")
            .and_then(|v| v.parse().ok())
            .unwrap_or(1);
        // In one-worker-per-node mode each node of each allocation is a runner.
        let total_runners = if start_one_worker_per_node {
            num_hpc_jobs * nodes_per_alloc
        } else {
            num_hpc_jobs
        };
        let delay = compute_startup_delay(total_runners.max(0) as u32);
        if delay > 0 {
            info!(
                "Startup jitter: {} runners, delay window {} seconds",
                total_runners, delay
            );
        }
        delay
    } else {
        0
    };
    for job_num in 1..num_hpc_jobs + 1 {
        // Job names embed the workflow ID, our PID, and a 1-based counter so
        // concurrent submitters produce unique names.
        let job_name = format!(
            "{}wf{}_{}_{}",
            job_prefix,
            workflow_id,
            std::process::id(),
            job_num
        );
        let script_path = format!("{}/{}.sh", output, job_name);
        let tls_ca_cert = config.tls.ca_cert_path.as_ref().and_then(|p| p.to_str());
        let tls_insecure = config.tls.insecure;
        if let Err(e) = slurm_interface.create_submission_script(
            &job_name,
            &config.base_path,
            workflow_id,
            output,
            poll_interval,
            max_parallel_jobs,
            Path::new(&script_path),
            &config_map,
            start_one_worker_per_node,
            execution_config.srun_mpi.as_deref(),
            tls_ca_cert,
            tls_insecure,
            startup_delay_seconds,
        ) {
            error!("Error creating submission script: {}", e);
            return Err(e.into());
        }
        match slurm_interface.submit(Path::new(&script_path)) {
            Ok((return_code, slurm_job_id, stderr)) => {
                if return_code != 0 {
                    error!("Error submitting job: {}", stderr);
                    return Err(format!("Job submission failed: {}", stderr).into());
                }
                // A non-numeric ID from a successful sbatch indicates a
                // malformed response; treat it as a bug and panic.
                let slurm_job_id_int: i64 = slurm_job_id
                    .parse()
                    .unwrap_or_else(|_| panic!("Failed to parse Slurm job ID {}", slurm_job_id));
                // Record the allocation as "pending" so the server can track it.
                let scheduled_compute_node = models::ScheduledComputeNodesModel::new(
                    workflow_id,
                    slurm_job_id_int,
                    scheduler_config_id,
                    "slurm".to_string(),
                    "pending".to_string(),
                );
                let created_scn = match utils::send_with_retries(
                    config,
                    || {
                        box_retry_error(
                            apis::scheduled_compute_nodes_api::create_scheduled_compute_node(
                                config,
                                scheduled_compute_node.clone(),
                            ),
                        )
                    },
                    WAIT_FOR_HEALTHY_DATABASE_MINUTES,
                ) {
                    Ok(scn) => scn,
                    Err(e) => {
                        error!("Failed to create scheduled compute node: {}", e);
                        return Err(
                            format!("Failed to create scheduled compute node: {}", e).into()
                        );
                    }
                };
                let scn_id = created_scn
                    .id
                    .expect("Created scheduled compute node should have an ID");
                info!(
                    "Submitted Slurm job name={} with ID={} (scheduled_compute_node_id={})",
                    job_name, slurm_job_id_int, scn_id
                );
            }
            Err(e) => {
                error!("Error submitting job: {}", e);
                return Err(e.into());
            }
        }
        // Clean up the generated script unless the caller asked to keep it;
        // failure to remove is logged but not fatal.
        if !keep_submission_scripts && let Err(e) = std::fs::remove_file(&script_path) {
            error!("Failed to remove submission script: {}", e);
        }
    }
    Ok(())
}
/// Size (in seconds) of the startup-jitter window for a given runner count:
/// no delay for 0-1 runners, one second per runner up to 10, a linear ramp
/// from 10s to 60s between 11 and 100 runners, and capped at 60s beyond that.
pub fn compute_startup_delay(total_runners: u32) -> u64 {
    if total_runners <= 1 {
        0
    } else if total_runners <= 10 {
        u64::from(total_runners)
    } else if total_runners <= 100 {
        // Linear interpolation: 10s at 11 runners up to 60s at 100 runners.
        10 + (u64::from(total_runners) - 10) * 50 / 90
    } else {
        60
    }
}
/// Build a `ComputeNodesResources` record describing either the whole Slurm
/// allocation or a single subtask's slice of one node.
///
/// In subtask mode, CPUs come from the per-task setting, memory is the node's
/// memory divided evenly among the workers that fit on it, and the node count
/// is fixed at 1.
pub fn create_node_resources(
    interface: &crate::client::hpc::slurm_interface::SlurmInterface,
    scheduler_config_id: Option<i64>,
    is_subtask: bool,
) -> models::ComputeNodesResources {
    let node_cpus = interface.get_num_cpus() as i64;
    let node_memory_gb = interface.get_memory_gb();
    let (num_cpus, memory_gb) = if is_subtask {
        let task_cpus = interface.get_num_cpus_per_task() as i64;
        // NOTE(review): assumes get_num_cpus_per_task() > 0; a zero would
        // divide by zero here — confirm the interface guarantees that.
        let workers_per_node = node_cpus / task_cpus;
        (task_cpus, node_memory_gb / workers_per_node as f64)
    } else {
        (node_cpus, node_memory_gb)
    };
    let num_gpus = interface.get_num_gpus() as i64;
    let num_nodes = if is_subtask {
        1
    } else {
        interface.get_num_nodes() as i64
    };
    let mut resources =
        models::ComputeNodesResources::new(num_cpus, memory_gb, num_gpus, num_nodes);
    resources.scheduler_config_id = scheduler_config_id;
    resources
}
/// Register this process as an active compute node for the workflow.
///
/// Retries the creation call while the database recovers; if it still fails
/// after the retry window, the process exits with status 1 because nothing
/// else can proceed without a registered compute node.
pub fn create_compute_node(
    config: &Configuration,
    workflow_id: i64,
    resources: &models::ComputeNodesResources,
    hostname: &str,
    scheduler: serde_json::Value,
) -> models::ComputeNodeModel {
    let mut compute_node = models::ComputeNodeModel::new(
        workflow_id,
        hostname.to_string(),
        // Record our own PID as the node owner.
        std::process::id() as i64,
        Utc::now().to_rfc3339(),
        resources.num_cpus,
        resources.memory_gb,
        resources.num_gpus,
        resources.num_nodes,
        "slurm".to_string(),
        Some(scheduler),
    );
    compute_node.is_active = Some(true);
    let attempt = || {
        box_retry_error(apis::compute_nodes_api::create_compute_node(
            config,
            compute_node.clone(),
        ))
    };
    match utils::send_with_retries(config, attempt, WAIT_FOR_HEALTHY_DATABASE_MINUTES) {
        Ok(node) => node,
        Err(e) => {
            error!("Error creating compute node: {}", e);
            std::process::exit(1);
        }
    }
}
/// One log-line signature recognized by the `parse-logs` command.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlurmErrorPattern {
    // Regular-expression source text; compiled at scan time.
    pub pattern: String,
    // Human-readable explanation shown in reports.
    pub description: String,
    // "error" or "warning" (see get_slurm_error_patterns for the catalog).
    pub severity: String,
}
/// A Torc job that ran under a given Slurm allocation, used to correlate
/// log-file errors back to workflow jobs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AffectedJob {
    pub job_id: i64,
    pub job_name: String,
}
/// A single matched issue found while scanning a Slurm or job-stdio log file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlurmLogError {
    // Path of the log file, as displayed to the user.
    pub file: String,
    // Slurm job ID: from the line itself if it mentions one, else from the filename.
    pub slurm_job_id: String,
    // 1-based line number within the file.
    pub line_number: usize,
    // The matched line, trimmed.
    pub line: String,
    // Description of the pattern that matched (see SlurmErrorPattern).
    pub pattern_description: String,
    // Severity of the matched pattern: "error" or "warning".
    pub severity: String,
    // Node name extracted from the line, if one was recognized.
    pub node: Option<String>,
    // Torc jobs correlated to this Slurm job, if correlation data is available.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub affected_jobs: Option<Vec<AffectedJob>>,
}
/// The catalog of log-line signatures recognized by `parse-logs`.
///
/// Each entry is (regex source, human description, severity), where severity
/// is "error" or "warning". Order matters: scanning records only the first
/// pattern that matches a line.
fn get_slurm_error_patterns() -> Vec<SlurmErrorPattern> {
    const CATALOG: &[(&str, &str, &str)] = &[
        // Memory exhaustion
        (r"(?i)out of memory", "Out of memory error", "error"),
        (r"(?i)oom-kill", "OOM killer terminated process", "error"),
        (r"(?i)cannot allocate memory", "Memory allocation failure", "error"),
        (
            r"(?i)memory cgroup out of memory",
            "Cgroup memory limit exceeded",
            "error",
        ),
        // Slurm daemon and job-state messages
        (r"(?i)slurmstepd: error:", "Slurm step daemon error", "error"),
        (r"(?i)srun: error:", "Slurm srun error", "error"),
        (
            r"(?i)DUE TO TIME LIMIT",
            "Job terminated due to time limit",
            "error",
        ),
        (r"(?i)CANCELLED", "Job was cancelled", "warning"),
        (
            r"(?i)DUE TO PREEMPTION",
            "Job terminated due to preemption",
            "warning",
        ),
        (r"(?i)NODE_FAIL", "Node failure", "error"),
        (r"(?i)FAILED", "Job failed", "error"),
        (
            r"(?i)Exceeded job memory limit",
            "Exceeded job memory limit",
            "error",
        ),
        (r"(?i)task/cgroup: .*: Killed", "Task killed by cgroup", "error"),
        // Storage problems
        (r"(?i)No space left on device", "Disk full", "error"),
        (r"(?i)Disk quota exceeded", "Disk quota exceeded", "error"),
        (r"(?i)Read-only file system", "Read-only file system", "error"),
        // Networking problems
        (r"(?i)Connection refused", "Connection refused", "error"),
        (r"(?i)Connection timed out", "Connection timed out", "error"),
        (r"(?i)Network is unreachable", "Network unreachable", "error"),
        // GPU failures
        (r"(?i)CUDA out of memory", "CUDA out of memory", "error"),
        (r"(?i)CUDA error", "CUDA error", "error"),
        (r"(?i)GPU memory.*exceeded", "GPU memory exceeded", "error"),
        // Process crashes
        (r"(?i)Segmentation fault", "Segmentation fault", "error"),
        (r"(?i)SIGSEGV", "SIGSEGV signal", "error"),
        (r"(?i)Bus error", "Bus error", "error"),
        (r"(?i)SIGBUS", "SIGBUS signal", "error"),
        (r"(?i)killed by signal", "Process killed by signal", "error"),
        (r"(?i)core dumped", "Core dump generated", "error"),
        // Language-runtime failures (case-sensitive: these are exact spellings)
        (
            r"Traceback \(most recent call last\)",
            "Python traceback",
            "error",
        ),
        (r"ModuleNotFoundError", "Python module not found", "error"),
        (r"ImportError", "Python import error", "error"),
        (r"std::bad_alloc", "C++ memory allocation failure", "error"),
        (r"MemoryError", "Python memory error", "error"),
        // Permissions and limits
        (r"(?i)Permission denied", "Permission denied", "error"),
        (
            r"slurmstepd: error: .*Exceeded.*step.*limit",
            "Exceeded step resource limit",
            "error",
        ),
    ];
    CATALOG
        .iter()
        .map(|&(pattern, description, severity)| SlurmErrorPattern {
            pattern: pattern.to_string(),
            description: description.to_string(),
            severity: severity.to_string(),
        })
        .collect()
}
fn extract_node_from_line(line: &str) -> Option<String> {
let node_patterns = [
r"\b([a-z]+\d+[a-z]*\d*)\b", r"\b([xrc]\d+[a-z]\d+[a-z]\d+[a-z]\d+[a-z]\d+)\b", ];
for pattern in node_patterns.iter() {
if let Ok(re) = Regex::new(pattern)
&& let Some(caps) = re.captures(line)
&& let Some(node) = caps.get(1)
{
return Some(node.as_str().to_string());
}
}
None
}
/// Parse `(workflow_id, slurm_job_id)` out of a Slurm output filename of the
/// form `slurm_output_wf<WF>_sl<JOB>.o` / `.e`. Returns `None` on any mismatch.
fn extract_slurm_job_id_from_filename(filename: &str) -> Option<(i64, String)> {
    static RE: LazyLock<Regex> =
        LazyLock::new(|| Regex::new(r"slurm_output_wf(\d+)_sl(\d+)\.[oe]$").unwrap());
    let caps = RE.captures(filename)?;
    let wf_id: i64 = caps.get(1)?.as_str().parse().ok()?;
    let slurm_id = caps.get(2)?.as_str().to_string();
    Some((wf_id, slurm_id))
}
/// Parse `(workflow_id, torc_job_id)` out of a per-job stdio filename of the
/// form `job_wf<WF>_j<JOB>_...`. Returns `None` on any mismatch.
fn extract_torc_job_ids_from_filename(filename: &str) -> Option<(i64, i64)> {
    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"job_wf(\d+)_j(\d+)_").unwrap());
    let caps = RE.captures(filename)?;
    let wf_id: i64 = caps.get(1)?.as_str().parse().ok()?;
    let job_id: i64 = caps.get(2)?.as_str().parse().ok()?;
    Some((wf_id, job_id))
}
/// Pull a Slurm job ID out of a log line, recognizing forms like `StepId=`,
/// `JobId=`, `SLURM_JOB_ID=`, or "slurm/batch job [ID] <n>". A trailing
/// `.step` suffix is ignored. Returns `None` if no ID is present.
fn extract_slurm_job_id_from_line(line: &str) -> Option<String> {
    static RE: LazyLock<Regex> = LazyLock::new(|| {
        Regex::new(
            r"(?i)(?:StepId=|JobId=|SLURM_JOB_ID=|(?:slurm|batch)\s+job\s+(?:ID\s+)?)(\d+)(?:\.\d+)?",
        )
        .unwrap()
    });
    let caps = RE.captures(line)?;
    let id = caps.get(1)?;
    Some(id.as_str().to_string())
}
/// Correlate Slurm job IDs with the Torc jobs that ran under them.
///
/// Walks the chain: scheduled compute node -> compute node -> result -> job,
/// and returns a map from Slurm job ID (stringified) to the de-duplicated,
/// ID-sorted list of affected Torc jobs. Correlation is best-effort
/// diagnostics: any fetch failure logs a warning and returns whatever map
/// has been built so far (possibly empty).
fn build_slurm_to_jobs_map(
    config: &Configuration,
    workflow_id: i64,
) -> HashMap<String, Vec<AffectedJob>> {
    let mut slurm_to_jobs: HashMap<String, Vec<AffectedJob>> = HashMap::new();
    let scheduled_nodes = match paginate_scheduled_compute_nodes(
        config,
        workflow_id,
        ScheduledComputeNodeListParams::new(),
    ) {
        Ok(nodes) => nodes,
        Err(e) => {
            warn!(
                "Could not fetch scheduled compute nodes for job correlation: {}",
                e
            );
            return slurm_to_jobs;
        }
    };
    // Step 1: scheduled-compute-node DB id -> Slurm job ID (slurm-type only).
    let scn_to_slurm: HashMap<i64, String> = scheduled_nodes
        .iter()
        .filter(|scn| scn.scheduler_type == "slurm")
        .filter_map(|scn| scn.id.map(|id| (id, scn.scheduler_id.to_string())))
        .collect();
    if scn_to_slurm.is_empty() {
        return slurm_to_jobs;
    }
    let compute_nodes =
        match paginate_compute_nodes(config, workflow_id, ComputeNodeListParams::new()) {
            Ok(nodes) => nodes,
            Err(e) => {
                warn!("Could not fetch compute nodes for job correlation: {}", e);
                return slurm_to_jobs;
            }
        };
    // Step 2: Slurm job ID -> compute-node IDs, linked through the
    // "scheduler_id" field of each compute node's scheduler JSON blob.
    let mut slurm_to_compute_nodes: HashMap<String, Vec<i64>> = HashMap::new();
    for node in &compute_nodes {
        if node.compute_node_type != "slurm" {
            continue;
        }
        if let Some(scheduler) = &node.scheduler {
            if let Some(scn_id) = scheduler.get("scheduler_id").and_then(|v| v.as_i64()) {
                if let Some(slurm_job_id) = scn_to_slurm.get(&scn_id)
                    && let Some(node_id) = node.id
                {
                    slurm_to_compute_nodes
                        .entry(slurm_job_id.clone())
                        .or_default()
                        .push(node_id);
                }
            }
        }
    }
    if slurm_to_compute_nodes.is_empty() {
        return slurm_to_jobs;
    }
    // Step 3: compute-node ID -> job IDs, via results (all runs included so
    // jobs from earlier attempts are correlated too).
    let results = match paginate_results(
        config,
        workflow_id,
        ResultListParams::new().with_all_runs(true),
    ) {
        Ok(results) => results,
        Err(e) => {
            warn!("Could not fetch results for job correlation: {}", e);
            return slurm_to_jobs;
        }
    };
    let mut compute_node_to_jobs: HashMap<i64, Vec<i64>> = HashMap::new();
    for result in &results {
        compute_node_to_jobs
            .entry(result.compute_node_id)
            .or_default()
            .push(result.job_id);
    }
    let jobs = match paginate_jobs(config, workflow_id, JobListParams::new()) {
        Ok(jobs) => jobs,
        Err(e) => {
            warn!("Could not fetch jobs for job correlation: {}", e);
            return slurm_to_jobs;
        }
    };
    let job_id_to_name: HashMap<i64, String> = jobs
        .iter()
        .filter_map(|j| j.id.map(|id| (id, j.name.clone())))
        .collect();
    // Step 4: flatten the chain into Slurm job ID -> unique, sorted jobs.
    for (slurm_id, compute_node_ids) in &slurm_to_compute_nodes {
        let mut affected_jobs: Vec<AffectedJob> = Vec::new();
        let mut seen_job_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
        for compute_node_id in compute_node_ids {
            if let Some(job_ids) = compute_node_to_jobs.get(compute_node_id) {
                for job_id in job_ids {
                    if seen_job_ids.insert(*job_id) {
                        // Fall back to a synthetic name if the job record is gone.
                        let job_name = job_id_to_name
                            .get(job_id)
                            .cloned()
                            .unwrap_or_else(|| format!("job_{}", job_id));
                        affected_jobs.push(AffectedJob {
                            job_id: *job_id,
                            job_name,
                        });
                    }
                }
            }
        }
        if !affected_jobs.is_empty() {
            affected_jobs.sort_by_key(|j| j.job_id);
            slurm_to_jobs.insert(slurm_id.clone(), affected_jobs);
        }
    }
    slurm_to_jobs
}
/// Scan one log file line-by-line for known Slurm error patterns, appending
/// matches to `all_errors`.
///
/// `initial_slurm_job_id` is the ID derived from the filename; if a scanned
/// line itself mentions a Slurm job ID, that ID takes precedence for that
/// line. At most one issue (the first matching pattern) is recorded per line.
/// With `errors_only`, warning-severity patterns are skipped. Returns the
/// number of issues recorded, or `None` if the file could not be opened.
fn scan_file_for_slurm_errors(
    path: &Path,
    initial_slurm_job_id: &str,
    compiled_patterns: &[(Regex, &SlurmErrorPattern)],
    errors_only: bool,
    slurm_to_jobs: &HashMap<String, Vec<AffectedJob>>,
    all_errors: &mut Vec<SlurmLogError>,
) -> Option<usize> {
    let file = match fs::File::open(path) {
        Ok(f) => f,
        Err(e) => {
            warn!("Could not open file {}: {}", path.display(), e);
            return None;
        }
    };
    let file_display = path.display().to_string();
    let mut count = 0;
    let reader = BufReader::new(file);
    for (line_num, line_result) in reader.lines().enumerate() {
        // Skip undecodable lines rather than aborting the whole file.
        let Ok(line) = line_result else { continue };
        for (regex, pattern) in compiled_patterns {
            if errors_only && pattern.severity != "error" {
                continue;
            }
            if regex.is_match(&line) {
                let node = extract_node_from_line(&line);
                // Prefer a job ID mentioned on the line itself over the one
                // derived from the filename.
                let current_slurm_id = extract_slurm_job_id_from_line(&line)
                    .unwrap_or_else(|| initial_slurm_job_id.to_string());
                // Fixed: this lookup previously contained a corrupted
                // `&current_slurm_id` reference and did not compile.
                let affected_jobs = slurm_to_jobs.get(&current_slurm_id).cloned();
                all_errors.push(SlurmLogError {
                    file: file_display.clone(),
                    slurm_job_id: current_slurm_id,
                    line_number: line_num + 1,
                    line: line.trim().to_string(),
                    pattern_description: pattern.description.clone(),
                    severity: pattern.severity.clone(),
                    node,
                    affected_jobs,
                });
                count += 1;
                // Record only the first matching pattern for this line.
                break;
            }
        }
    }
    Some(count)
}
/// Scan Slurm allocation logs and per-job stdio files under `output_dir` for
/// known error patterns and report them, grouped by Slurm job, as text or JSON.
///
/// Exits the process with status 1 if the output directory does not exist or
/// the scheduled-compute-node listing fails; all other problems (unreadable
/// directories/files) degrade to warnings.
pub fn parse_slurm_logs(
    config: &Configuration,
    workflow_id: i64,
    output_dir: &PathBuf,
    errors_only: bool,
    format: &str,
) {
    if !output_dir.exists() {
        eprintln!(
            "Error: Output directory does not exist: {}",
            output_dir.display()
        );
        std::process::exit(1);
    }
    let all_scheduled_nodes = match paginate_scheduled_compute_nodes(
        config,
        workflow_id,
        ScheduledComputeNodeListParams::new(),
    ) {
        Ok(nodes) => nodes,
        Err(e) => {
            print_error("listing scheduled compute nodes", &e);
            std::process::exit(1);
        }
    };
    // Only Slurm-type allocations are relevant here.
    let scheduled_nodes: Vec<_> = all_scheduled_nodes
        .into_iter()
        .filter(|n| n.scheduler_type.to_lowercase() == "slurm")
        .collect();
    let valid_slurm_job_ids: std::collections::HashSet<String> = scheduled_nodes
        .iter()
        .map(|n| n.scheduler_id.to_string())
        .collect();
    // Nothing to scan: report an empty result in the requested format.
    if valid_slurm_job_ids.is_empty() {
        if format == "json" {
            print_json(
                &serde_json::json!({
                    "workflow_id": workflow_id,
                    "output_dir": output_dir.display().to_string(),
                    "message": "No Slurm scheduled compute nodes found for this workflow",
                    "total_issues": 0,
                    "errors": 0,
                    "warnings": 0,
                    "issues": []
                }),
                "Slurm parse logs",
            );
        } else {
            println!(
                "No Slurm scheduled compute nodes found for workflow {}",
                workflow_id
            );
        }
        return;
    }
    info!(
        "Found {} Slurm job(s) for workflow {}: {:?}",
        valid_slurm_job_ids.len(),
        workflow_id,
        valid_slurm_job_ids
    );
    // Correlation data: Slurm job -> Torc jobs, plus the reverse index used
    // to attribute per-job stdio files to a Slurm job.
    let slurm_to_jobs = build_slurm_to_jobs_map(config, workflow_id);
    let mut torc_job_to_slurm_id: HashMap<i64, String> = HashMap::new();
    for (slurm_id, affected_jobs) in &slurm_to_jobs {
        for job in affected_jobs {
            torc_job_to_slurm_id.insert(job.job_id, slurm_id.clone());
        }
    }
    // Compile the pattern catalog once; invalid patterns are silently dropped.
    let patterns = get_slurm_error_patterns();
    let compiled_patterns: Vec<(Regex, &SlurmErrorPattern)> = patterns
        .iter()
        .filter_map(|p| Regex::new(&p.pattern).ok().map(|re| (re, p)))
        .collect();
    let mut all_errors: Vec<SlurmLogError> = Vec::new();
    let mut scanned_files = 0;
    // Pass 1: Slurm allocation output files (slurm_output_wf<WF>_sl<ID>.[oe])
    // belonging to this workflow and to a known Slurm job.
    if let Ok(entries) = fs::read_dir(output_dir) {
        for entry in entries.flatten() {
            let path = entry.path();
            if !path.is_file() {
                continue;
            }
            let filename = match path.file_name().and_then(|n| n.to_str()) {
                Some(name) => name,
                None => continue,
            };
            if filename.starts_with("slurm_output_")
                && let Some((file_wf_id, slurm_job_id)) =
                    extract_slurm_job_id_from_filename(filename)
                && file_wf_id == workflow_id
                && valid_slurm_job_ids.contains(&slurm_job_id)
            {
                debug!("Scanning Slurm output file: {}", path.display());
                if scan_file_for_slurm_errors(
                    &path,
                    &slurm_job_id,
                    &compiled_patterns,
                    errors_only,
                    &slurm_to_jobs,
                    &mut all_errors,
                )
                .is_some()
                {
                    scanned_files += 1;
                }
            }
        }
    } else {
        warn!("Could not read output directory: {}", output_dir.display());
    }
    // Pass 2: per-job stdio files under job_stdio/, attributed to a Slurm job
    // via the correlation map (or "unknown" when no correlation exists).
    let job_stdio_dir = output_dir.join("job_stdio");
    if job_stdio_dir.exists()
        && let Ok(entries) = fs::read_dir(&job_stdio_dir)
    {
        for entry in entries.flatten() {
            let path = entry.path();
            if !path.is_file() {
                continue;
            }
            let filename = match path.file_name().and_then(|n| n.to_str()) {
                Some(name) => name,
                None => continue,
            };
            if let Some((file_wf_id, job_id)) = extract_torc_job_ids_from_filename(filename)
                && file_wf_id == workflow_id
            {
                let slurm_job_id = torc_job_to_slurm_id
                    .get(&job_id)
                    .cloned()
                    .unwrap_or_else(|| "unknown".to_string());
                debug!("Scanning job stdio file: {}", path.display());
                if scan_file_for_slurm_errors(
                    &path,
                    &slurm_job_id,
                    &compiled_patterns,
                    errors_only,
                    &slurm_to_jobs,
                    &mut all_errors,
                )
                .is_some()
                {
                    scanned_files += 1;
                }
            }
        }
    }
    info!(
        "Scanned {} log file(s) for workflow {}",
        scanned_files, workflow_id
    );
    // Reporting: JSON dump, or human-readable output grouped per Slurm job.
    if format == "json" {
        let output = serde_json::json!({
            "workflow_id": workflow_id,
            "output_dir": output_dir.display().to_string(),
            "slurm_jobs_count": valid_slurm_job_ids.len(),
            "files_scanned": scanned_files,
            "total_issues": all_errors.len(),
            "errors": all_errors.iter().filter(|e| e.severity == "error").count(),
            "warnings": all_errors.iter().filter(|e| e.severity == "warning").count(),
            "issues": all_errors,
        });
        print_json(&output, "Slurm parse logs");
    } else if all_errors.is_empty() {
        println!(
            "No issues found in Slurm log files for workflow {} (scanned {} file(s) in {})",
            workflow_id,
            scanned_files,
            output_dir.display()
        );
    } else {
        println!("Found {} issue(s) in Slurm log files:\n", all_errors.len());
        // Group issues per Slurm job and print in sorted job-ID order.
        let mut errors_by_job: HashMap<String, Vec<&SlurmLogError>> = HashMap::new();
        for err in &all_errors {
            errors_by_job
                .entry(err.slurm_job_id.clone())
                .or_default()
                .push(err);
        }
        let mut sorted_job_ids: Vec<_> = errors_by_job.keys().cloned().collect();
        sorted_job_ids.sort();
        for job_id in sorted_job_ids {
            let errors = errors_by_job.get(&job_id).unwrap();
            let job_label = if job_id == "unknown" || job_id.is_empty() {
                "Unknown Slurm Job".to_string()
            } else {
                format!("Slurm Job {}", job_id)
            };
            // affected_jobs is identical for every error of the same Slurm
            // job, so showing it once from the first entry is sufficient.
            let affected_jobs_info = errors
                .first()
                .and_then(|e| e.affected_jobs.as_ref())
                .map(|jobs| {
                    let job_list: Vec<String> = jobs
                        .iter()
                        .map(|j| format!("{} (ID: {})", j.job_name, j.job_id))
                        .collect();
                    format!("\n  Affected Torc jobs: {}", job_list.join(", "))
                })
                .unwrap_or_default();
            println!("=== {} ==={}", job_label, affected_jobs_info);
            for err in errors {
                let severity_marker = match err.severity.as_str() {
                    "error" => "[ERROR]",
                    "warning" => "[WARN]",
                    _ => "[INFO]",
                };
                let node_info = err
                    .node
                    .as_ref()
                    .map(|n| format!(" (node: {})", n))
                    .unwrap_or_default();
                println!(
                    "  {} {}{}: {}",
                    severity_marker, err.pattern_description, node_info, err.line
                );
                println!("    Location: {}:{}", err.file, err.line_number);
            }
            println!();
        }
        let error_count = all_errors.iter().filter(|e| e.severity == "error").count();
        let warning_count = all_errors
            .iter()
            .filter(|e| e.severity == "warning")
            .count();
        println!(
            "Summary: {} error(s), {} warning(s)",
            error_count, warning_count
        );
    }
}
/// One row of the `sacct` summary table: a Slurm job step (or whole job when
/// no steps are reported) with its state, resource usage, and node list.
#[derive(Tabled, Serialize, Deserialize, Clone)]
pub struct SacctSummaryRow {
    #[tabled(rename = "Slurm Job")]
    pub slurm_job_id: String,
    #[tabled(rename = "Job Step")]
    pub job_step: String,
    #[tabled(rename = "State")]
    pub state: String,
    // "return_code:signal" string; "-" when sacct reported no exit code.
    #[tabled(rename = "Exit Code")]
    pub exit_code: String,
    // Human-formatted durations/sizes; "-" when the field was unavailable.
    #[tabled(rename = "Elapsed")]
    pub elapsed: String,
    #[tabled(rename = "Max RSS")]
    pub max_rss: String,
    #[tabled(rename = "CPU Time")]
    pub cpu_time: String,
    #[tabled(rename = "Nodes")]
    pub nodes: String,
}
fn extract_state_from_job(job: &serde_json::Value) -> String {
if let Some(state_obj) = job.get("state") {
if let Some(current) = state_obj.get("current") {
if let Some(arr) = current.as_array() {
let states: Vec<&str> = arr.iter().filter_map(|v| v.as_str()).collect();
if !states.is_empty() {
return states.join(", ");
}
} else if let Some(s) = current.as_str() {
return s.to_string();
}
}
if let Some(s) = state_obj.as_str() {
return s.to_string();
}
}
if let Some(job_state) = job.get("job_state") {
if let Some(arr) = job_state.as_array() {
let states: Vec<&str> = arr.iter().filter_map(|v| v.as_str()).collect();
if !states.is_empty() {
return states.join(", ");
}
} else if let Some(s) = job_state.as_str() {
return s.to_string();
}
}
"-".to_string()
}
/// Per-allocation aggregates extracted from one sacct JSON payload, used by
/// the `usage` command to total node and CPU time.
struct SacctAllocationStats {
    // Longest elapsed wall-clock time (seconds) across the job's entries.
    max_elapsed_secs: i64,
    // Node count inferred from the allocation's comma-separated node list
    // (0 when sacct reported no node list).
    num_nodes: i64,
    // Largest total CPU time (seconds) across the job's entries.
    max_cpu_time_secs: i64,
}
fn extract_exit_code(entry: &serde_json::Value) -> String {
let exit_code = match entry.get("exit_code") {
Some(e) => e,
None => return "-".to_string(),
};
let return_code = exit_code
.get("return_code")
.and_then(|r| {
r.get("number")
.and_then(|n| n.as_i64())
.or_else(|| r.as_i64())
})
.unwrap_or(0);
let signal = exit_code
.get("signal")
.and_then(|s| {
s.get("id").and_then(|id| {
id.get("number")
.and_then(|n| n.as_i64())
.or_else(|| id.as_i64())
})
})
.unwrap_or(0);
format!("{}:{}", return_code, signal)
}
/// Pull the peak memory ("mem" entry of tres.requested.max) from an sacct
/// entry and render it as a human-readable size, or "-" when unavailable.
fn extract_max_rss(entry: &serde_json::Value) -> String {
    let tres_max = entry
        .get("tres")
        .and_then(|t| t.get("requested"))
        .and_then(|r| r.get("max"))
        .and_then(|m| m.as_array());
    let mem_bytes = tres_max.and_then(|arr| {
        arr.iter()
            .find(|item| item.get("type").and_then(|t| t.as_str()) == Some("mem"))
            .and_then(|mem| mem.get("count"))
            .and_then(|c| c.as_i64())
    });
    match mem_bytes {
        Some(bytes) => format_bytes(bytes as u64),
        None => "-".to_string(),
    }
}
/// Elapsed wall-clock seconds from an sacct entry (`time.elapsed`), if present.
fn extract_elapsed_secs(entry: &serde_json::Value) -> Option<i64> {
    entry.get("time")?.get("elapsed")?.as_i64()
}
/// Total CPU seconds from an sacct entry (`time.total.seconds`), if present.
fn extract_cpu_time_secs(entry: &serde_json::Value) -> Option<i64> {
    entry.get("time")?.get("total")?.get("seconds")?.as_i64()
}
/// Convert one sacct `--json` payload into summary rows plus per-allocation
/// aggregate stats.
///
/// Jobs that report steps contribute one row per step; jobs without steps
/// contribute a single job-level row. The aggregates (max elapsed, max CPU
/// time, max node count) are always taken from the job level, regardless of
/// whether step rows were emitted.
fn parse_sacct_json_to_rows(
    sacct_json: &serde_json::Value,
    slurm_job_id: &str,
) -> (Vec<SacctSummaryRow>, SacctAllocationStats) {
    let mut rows = Vec::new();
    let mut max_elapsed_secs: i64 = 0;
    let mut max_num_nodes: i64 = 0;
    let mut max_cpu_time_secs: i64 = 0;
    if let Some(jobs) = sacct_json.get("jobs").and_then(|j| j.as_array()) {
        for job in jobs {
            // Aggregate stats come from the job level for every job.
            if let Some(secs) = extract_elapsed_secs(job) {
                max_elapsed_secs = max_elapsed_secs.max(secs);
            }
            if let Some(secs) = extract_cpu_time_secs(job) {
                max_cpu_time_secs = max_cpu_time_secs.max(secs);
            }
            // Node count is inferred from the comma-separated node list.
            let alloc_nodes = job.get("nodes").and_then(|n| n.as_str()).unwrap_or("");
            if !alloc_nodes.is_empty() {
                max_num_nodes = max_num_nodes.max(alloc_nodes.split(',').count() as i64);
            }
            // Prefer per-step rows when steps are reported.
            let steps = job.get("steps").and_then(|s| s.as_array());
            if let Some(steps) = steps
                && !steps.is_empty()
            {
                for step in steps {
                    rows.push(parse_step_to_row(step, slurm_job_id));
                }
                continue;
            }
            // No steps: emit one row for the job itself.
            let job_step = job
                .get("name")
                .and_then(|n| n.as_str())
                .unwrap_or("")
                .to_string();
            let state = extract_state_from_job(job);
            let exit_code = extract_exit_code(job);
            let elapsed_secs = extract_elapsed_secs(job);
            // Fall back to sacct's preformatted "elapsed" string when the
            // numeric field is absent.
            let elapsed = elapsed_secs
                .map(format_duration_seconds)
                .or_else(|| {
                    job.get("elapsed")
                        .and_then(|e| e.as_str())
                        .map(|s| s.to_string())
                })
                .unwrap_or("-".to_string());
            let max_rss = extract_max_rss(job);
            let cpu_time_secs = extract_cpu_time_secs(job);
            let cpu_time = cpu_time_secs
                .map(format_duration_seconds)
                .unwrap_or("-".to_string());
            let nodes = if alloc_nodes.is_empty() {
                "-".to_string()
            } else {
                alloc_nodes.to_string()
            };
            rows.push(SacctSummaryRow {
                slurm_job_id: slurm_job_id.to_string(),
                job_step,
                state,
                exit_code,
                elapsed,
                max_rss,
                cpu_time,
                nodes,
            });
        }
    }
    (
        rows,
        SacctAllocationStats {
            max_elapsed_secs,
            num_nodes: max_num_nodes,
            max_cpu_time_secs,
        },
    )
}
/// Convert one sacct job-step object into a summary row, tolerating schema
/// variants across Slurm versions ("-" is used for any missing field).
fn parse_step_to_row(step: &serde_json::Value, slurm_job_id: &str) -> SacctSummaryRow {
    // Step label: step.name, falling back to step.id.
    let step_name = step
        .get("step")
        .and_then(|s| {
            s.get("name")
                .and_then(|n| n.as_str())
                .or_else(|| s.get("id").and_then(|i| i.as_str()))
        })
        .unwrap_or("")
        .to_string();
    // State may be an array of strings, an object with a "current" array,
    // or a bare string, depending on the Slurm version.
    let state = step
        .get("state")
        .and_then(|s| {
            if let Some(arr) = s.as_array() {
                let states: Vec<&str> = arr.iter().filter_map(|v| v.as_str()).collect();
                if !states.is_empty() {
                    return Some(states.join(", "));
                }
            }
            if let Some(current) = s.get("current")
                && let Some(arr) = current.as_array()
            {
                let states: Vec<&str> = arr.iter().filter_map(|v| v.as_str()).collect();
                if !states.is_empty() {
                    return Some(states.join(", "));
                }
            }
            s.as_str().map(|s| s.to_string())
        })
        .unwrap_or("-".to_string());
    let exit_code = extract_exit_code(step);
    let elapsed_secs = extract_elapsed_secs(step);
    let elapsed = elapsed_secs
        .map(format_duration_seconds)
        .unwrap_or("-".to_string());
    let max_rss = extract_max_rss(step);
    let cpu_time_secs = extract_cpu_time_secs(step);
    let cpu_time = cpu_time_secs
        .map(format_duration_seconds)
        .unwrap_or("-".to_string());
    // Node list: nodes.range object form, or nodes as a bare string.
    let nodes = step
        .get("nodes")
        .and_then(|n| {
            n.get("range")
                .and_then(|r| r.as_str())
                .or_else(|| n.as_str())
        })
        .unwrap_or("-")
        .to_string();
    SacctSummaryRow {
        slurm_job_id: slurm_job_id.to_string(),
        job_step: step_name,
        state,
        exit_code,
        elapsed,
        max_rss,
        cpu_time,
        nodes,
    }
}
/// Render a duration in a compact human form: "Ns" under a minute,
/// "Nm Ns" under an hour, "Nh Nm" otherwise (seconds dropped at hour scale).
fn format_duration_seconds(secs: i64) -> String {
    match secs {
        s if s < 60 => format!("{s}s"),
        s if s < 3600 => format!("{}m {}s", s / 60, s % 60),
        s => format!("{}h {}m", s / 3600, (s % 3600) / 60),
    }
}
/// Render a byte count with binary (1024-based) units and one decimal place:
/// "NB", "N.NKB", "N.NMB", or "N.NGB".
fn format_bytes(bytes: u64) -> String {
    const KIB: u64 = 1024;
    const MIB: u64 = KIB * 1024;
    const GIB: u64 = MIB * 1024;
    if bytes < KIB {
        format!("{bytes}B")
    } else if bytes < MIB {
        format!("{:.1}KB", bytes as f64 / KIB as f64)
    } else if bytes < GIB {
        format!("{:.1}MB", bytes as f64 / MIB as f64)
    } else {
        format!("{:.1}GB", bytes as f64 / GIB as f64)
    }
}
/// Run `sacct -j <id> --json` for every Slurm allocation of the workflow.
///
/// Returns the Slurm-type scheduled compute nodes, the parsed summary rows,
/// the per-allocation aggregate stats, and a list of per-job error strings
/// (sacct failures are collected, not fatal). When `save_json` is set and
/// `output_dir` is given, each raw sacct payload is also written to
/// `sacct_<id>.json` in that directory. Exits the process with status 1 only
/// if the scheduled-compute-node listing itself fails.
fn fetch_sacct_for_workflow(
    config: &Configuration,
    workflow_id: i64,
    save_json: bool,
    output_dir: Option<&PathBuf>,
) -> (
    Vec<models::ScheduledComputeNodesModel>,
    Vec<SacctSummaryRow>,
    Vec<SacctAllocationStats>,
    Vec<String>,
) {
    let all_nodes = match paginate_scheduled_compute_nodes(
        config,
        workflow_id,
        ScheduledComputeNodeListParams::new(),
    ) {
        Ok(nodes) => nodes,
        Err(e) => {
            print_error("listing scheduled compute nodes", &e);
            std::process::exit(1);
        }
    };
    // Only Slurm allocations have sacct data.
    let nodes: Vec<_> = all_nodes
        .into_iter()
        .filter(|n| n.scheduler_type.to_lowercase() == "slurm")
        .collect();
    let mut all_summary_rows: Vec<SacctSummaryRow> = Vec::new();
    let mut all_stats: Vec<SacctAllocationStats> = Vec::new();
    let mut errors: Vec<String> = Vec::new();
    for node in &nodes {
        let slurm_job_id = node.scheduler_id.to_string();
        info!("Running sacct for Slurm job ID: {}", slurm_job_id);
        let mut sacct_cmd = Command::new("sacct");
        sacct_cmd.args(["-j", &slurm_job_id, "--json"]);
        debug!(
            "sacct command for Slurm job {}: {:?}",
            slurm_job_id, sacct_cmd
        );
        let sacct_result = sacct_cmd.output();
        match sacct_result {
            Ok(output) => {
                if output.status.success() {
                    let stdout = String::from_utf8_lossy(&output.stdout);
                    match serde_json::from_str::<serde_json::Value>(&stdout) {
                        Ok(sacct_json) => {
                            let (rows, stats) =
                                parse_sacct_json_to_rows(&sacct_json, &slurm_job_id);
                            all_summary_rows.extend(rows);
                            all_stats.push(stats);
                            // Optionally persist the raw payload for later inspection.
                            if save_json && let Some(dir) = output_dir {
                                let output_file = dir.join(format!("sacct_{}.json", slurm_job_id));
                                if let Err(e) = fs::write(&output_file, stdout.as_bytes()) {
                                    error!(
                                        "Failed to write sacct output for job {}: {}",
                                        slurm_job_id, e
                                    );
                                    errors.push(format!(
                                        "Job {}: Failed to write output: {}",
                                        slurm_job_id, e
                                    ));
                                } else {
                                    info!("Saved sacct output to {}", output_file.display());
                                }
                            }
                        }
                        Err(e) => {
                            error!("Failed to parse sacct JSON for job {}: {}", slurm_job_id, e);
                            errors
                                .push(format!("Job {}: Invalid JSON output: {}", slurm_job_id, e));
                        }
                    }
                } else {
                    // sacct ran but reported failure; record its stderr.
                    let stderr = String::from_utf8_lossy(&output.stderr);
                    error!("sacct command failed for job {}: {}", slurm_job_id, stderr);
                    errors.push(format!("Job {}: sacct failed: {}", slurm_job_id, stderr));
                }
            }
            Err(e) => {
                // sacct could not be launched at all (e.g. not on PATH).
                error!("Failed to run sacct for job {}: {}", slurm_job_id, e);
                errors.push(format!(
                    "Job {}: Failed to execute sacct: {}",
                    slurm_job_id, e
                ));
            }
        }
    }
    (nodes, all_summary_rows, all_stats, errors)
}
/// The `sacct` subcommand: show Slurm accounting info for every allocation of
/// a workflow, as a table or JSON, optionally saving raw sacct payloads.
///
/// Exits with status 1 if the output directory cannot be created (when
/// `save_json` is set); per-job sacct failures are reported, not fatal.
pub fn run_sacct_for_workflow(
    config: &Configuration,
    workflow_id: i64,
    output_dir: &PathBuf,
    save_json: bool,
    format: &str,
) {
    // Only needed when raw payloads will be written.
    if save_json && let Err(e) = fs::create_dir_all(output_dir) {
        eprintln!("Error creating output directory: {}", e);
        std::process::exit(1);
    }
    let (nodes, all_summary_rows, _, errors) =
        fetch_sacct_for_workflow(config, workflow_id, save_json, Some(output_dir));
    if nodes.is_empty() {
        if format == "json" {
            print_json(
                &serde_json::json!({
                    "workflow_id": workflow_id,
                    "message": "No Slurm scheduled compute nodes found",
                    "summary": []
                }),
                "Slurm sacct",
            );
        } else {
            println!(
                "No Slurm scheduled compute nodes found for workflow {}",
                workflow_id
            );
        }
        return;
    }
    if format == "json" {
        let output = serde_json::json!({
            "workflow_id": workflow_id,
            "total_slurm_jobs": nodes.len(),
            "summary": all_summary_rows,
            "errors": errors,
        });
        print_json(&output, "Slurm sacct");
    } else if all_summary_rows.is_empty() && errors.is_empty() {
        println!(
            "No sacct data available for workflow {} (checked {} Slurm job(s))",
            workflow_id,
            nodes.len()
        );
    } else {
        println!("Slurm Accounting Summary for Workflow {}\n", workflow_id);
        if !all_summary_rows.is_empty() {
            display_table_with_count(&all_summary_rows, "job steps");
        }
        if !errors.is_empty() {
            println!("\nErrors:");
            for err in &errors {
                println!("  {}", err);
            }
        }
        if save_json {
            println!("\nFull JSON saved to: {}", output_dir.display());
        }
    }
}
/// CLI entry point for `torc slurm usage`: aggregate total node time and CPU
/// time consumed by a workflow's Slurm allocations and print a summary.
///
/// Node time is elapsed seconds multiplied by the allocation's node count;
/// allocations whose node count is unknown (`num_nodes <= 0`) are excluded
/// from the totals and reported separately. `format == "json"` selects
/// structured output.
fn run_usage_for_workflow(config: &Configuration, workflow_id: i64, format: &str) {
    // save_json=false / no output dir: only the parsed stats are needed here.
    let (nodes, _, all_stats, errors) = fetch_sacct_for_workflow(config, workflow_id, false, None);
    if nodes.is_empty() {
        if format == "json" {
            print_json(
                &serde_json::json!({
                    "workflow_id": workflow_id,
                    "total_slurm_jobs": 0,
                    "total_nodes": 0,
                    "total_node_time": "0s",
                    "total_node_time_seconds": 0,
                    "total_cpu_time": "0s",
                    "total_cpu_time_seconds": 0,
                }),
                "Slurm usage",
            );
        } else {
            println!(
                "No Slurm scheduled compute nodes found for workflow {}",
                workflow_id
            );
        }
        return;
    }
    let mut total_nodes: i64 = 0;
    let mut total_node_secs: i64 = 0;
    let mut total_cpu_time_secs: i64 = 0;
    // Allocations for which sacct did not report a usable node count.
    let mut unknown_node_count: usize = 0;
    for stats in &all_stats {
        total_cpu_time_secs += stats.max_cpu_time_secs;
        if stats.num_nodes > 0 {
            total_nodes += stats.num_nodes;
            // node-seconds = elapsed seconds x nodes in the allocation.
            total_node_secs += stats.max_elapsed_secs * stats.num_nodes;
        } else {
            unknown_node_count += 1;
        }
    }
    let total_node_time = format_duration_seconds(total_node_secs);
    let total_cpu_time = format_duration_seconds(total_cpu_time_secs);
    if format == "json" {
        let mut output = serde_json::json!({
            "workflow_id": workflow_id,
            "total_slurm_jobs": nodes.len(),
            "total_nodes": total_nodes,
            "total_node_time": total_node_time,
            "total_node_time_seconds": total_node_secs,
            "total_cpu_time": total_cpu_time,
            "total_cpu_time_seconds": total_cpu_time_secs,
            "errors": errors,
        });
        // Only surface this field when it is meaningful.
        if unknown_node_count > 0 {
            output["unknown_node_count_allocations"] = serde_json::json!(unknown_node_count);
        }
        print_json(&output, "Slurm usage");
    } else {
        println!("Workflow {}", workflow_id);
        println!("Slurm allocations: {}", nodes.len());
        println!("Total nodes: {}", total_nodes);
        println!("Total node time: {}", total_node_time);
        println!("Total CPU time: {}", total_cpu_time);
        if unknown_node_count > 0 {
            println!(
                "\nWarning: {} allocation(s) had unknown node count (excluded from totals)",
                unknown_node_count
            );
        }
        if !errors.is_empty() {
            println!("\nErrors:");
            for err in &errors {
                println!(" {}", err);
            }
        }
    }
}
/// Outcome of one `sbatch --test-only` probe for a candidate allocation
/// strategy, included in plan-allocations output.
#[derive(Debug, Clone, Serialize)]
struct SbatchEstimate {
    // Strategy label the probe represents ("single-large" or "many-small").
    strategy: String,
    // Number of nodes requested in the probe.
    nodes: i64,
    // Slurm's estimated start time ("%Y-%m-%dT%H:%M:%S"), if reported.
    estimated_start: Option<String>,
    // Estimated completion (start + walltime), if a start was reported.
    estimated_completion: Option<String>,
    // Seconds until the estimated start, clamped at zero.
    wait_seconds: Option<i64>,
    // Seconds until estimated completion (wait + walltime), clamped at zero.
    completion_seconds: Option<i64>,
    // False when the probe yielded no start estimate.
    success: bool,
    // Probe error message, set when `success` is false.
    error: Option<String>,
}
/// Complete result of [`analyze_plan_allocations`], serialized for JSON
/// output and rendered for the text report.
#[derive(Debug, Clone, Serialize)]
pub struct PlanAllocationsResult {
    pub workflow_analysis: WorkflowAnalysisInfo,
    // One entry per partition queried; empty in offline mode.
    pub cluster_state: Vec<ClusterStateInfo>,
    pub recommendations: Vec<AllocationRecommendation>,
    // Warnings propagated from scheduler-plan generation.
    pub warnings: Vec<String>,
    pub resource_groups: Vec<ResourceGroupInfo>,
    // HPC profile the analysis ran against.
    pub profile_name: String,
    pub profile_display_name: String,
    // Slurm account the plan was generated for.
    pub account: String,
}
/// Summary statistics derived from the workflow's job dependency graph.
#[derive(Debug, Clone, Serialize)]
pub struct WorkflowAnalysisInfo {
    // Number of distinct jobs in the graph.
    pub total_jobs: usize,
    // Total job instances across all jobs.
    pub total_instances: usize,
    // Number of topological levels in the dependency graph.
    pub dependency_depth: usize,
    // Peak concurrent instance count across any single level.
    pub max_parallelism: usize,
    // Index of the level where that peak occurs.
    pub max_parallelism_level: usize,
    // Number of scheduler/resource groups in the generated plan.
    pub resource_groups: usize,
}
/// Snapshot of one Slurm partition's node states and queue depth, populated
/// from sinfo/squeue queries in [`analyze_plan_allocations`].
#[derive(Debug, Clone, Serialize)]
pub struct ClusterStateInfo {
    pub partition: String,
    // Node-state counts (from partition availability).
    pub idle: u32,
    pub mixed: u32,
    pub allocated: u32,
    pub down: u32,
    pub total: u32,
    // Queue-depth counts (from queue-depth query).
    pub pending_jobs: u32,
    pub pending_nodes: u32,
    pub running_jobs: u32,
}
/// One resource-requirements group from the scheduler plan, as reported in
/// plan-allocations output.
#[derive(Debug, Clone, Serialize)]
pub struct ResourceGroupInfo {
    pub name: String,
    // Resolved partition for the group, when known.
    pub partition: Option<String>,
    pub job_count: usize,
    pub walltime: String,
    // Ideal node count (taken from the plan's num_allocations).
    pub ideal_nodes: i64,
}
/// Recommended allocation strategy for one resource group, produced by
/// [`compute_recommendations`].
#[derive(Debug, Clone, Serialize)]
pub struct AllocationRecommendation {
    group_name: String,
    partition: Option<String>,
    num_allocations: i64,
    nodes_per_allocation: i64,
    // num_allocations * nodes_per_allocation.
    total_nodes: i64,
    // "single", "many-small", "chunked", or "default" (offline mode).
    strategy: String,
    // Human-readable justification for the chosen strategy.
    reason: String,
    walltime: String,
    job_count: usize,
    // Raw sbatch --test-only probe results, omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    sbatch_estimates: Option<Vec<SbatchEstimate>>,
}
/// Convert one `sbatch --test-only` probe result into an [`SbatchEstimate`]
/// row for the given strategy label and node count.
///
/// When Slurm reported an estimated start, the wait and completion offsets
/// are computed relative to `now` and clamped at zero; otherwise a failed
/// estimate carrying the probe's error message is produced.
fn build_sbatch_estimate(
    strategy: &str,
    nodes: i64,
    walltime_secs: u64,
    result: &crate::client::hpc::slurm::SbatchTestResult,
    now: chrono::NaiveDateTime,
) -> SbatchEstimate {
    match result.estimated_start {
        Some(start) => {
            // Offsets from "now"; clamped below so past estimates don't go negative.
            let wait_secs = (start - now).num_seconds();
            let done_secs = wait_secs + walltime_secs as i64;
            let done_at = start + chrono::Duration::seconds(walltime_secs as i64);
            SbatchEstimate {
                strategy: strategy.to_string(),
                nodes,
                estimated_start: Some(start.format("%Y-%m-%dT%H:%M:%S").to_string()),
                estimated_completion: Some(done_at.format("%Y-%m-%dT%H:%M:%S").to_string()),
                wait_seconds: Some(wait_secs.max(0)),
                completion_seconds: Some(done_secs.max(0)),
                success: true,
                error: None,
            }
        }
        None => SbatchEstimate {
            strategy: strategy.to_string(),
            nodes,
            estimated_start: None,
            estimated_completion: None,
            wait_seconds: None,
            completion_seconds: None,
            success: false,
            error: result.error_message.clone(),
        },
    }
}
/// Run `sbatch --test-only` probes for each scheduler in the plan that spans
/// more than one allocation, comparing a single large submission against one
/// representative small submission.
///
/// Returns probe estimates keyed by the scheduler's resource-requirements
/// name. Schedulers whose total node count fits in one allocation are
/// skipped (there is no strategy choice to compare).
fn run_sbatch_estimates(
    plan: &crate::client::scheduler_plan::SchedulerPlan,
    account: &str,
) -> HashMap<String, Vec<SbatchEstimate>> {
    use crate::client::hpc::slurm::run_sbatch_test_only;
    let mut results: HashMap<String, Vec<SbatchEstimate>> = HashMap::new();
    // All estimates are computed relative to the same local "now".
    let now = chrono::Local::now().naive_local();
    for scheduler in &plan.schedulers {
        let partition = scheduler.partition.as_deref();
        let walltime = &scheduler.walltime;
        let nodes_per_alloc = scheduler.nodes;
        let total_nodes = scheduler.nodes * scheduler.num_allocations;
        let gres = scheduler.gres.as_deref();
        let qos = scheduler.qos.as_deref();
        if total_nodes <= nodes_per_alloc {
            // Single allocation: nothing to compare, skip the probes.
            continue;
        }
        let walltime_secs = crate::client::hpc::slurm::parse_slurm_timelimit(walltime);
        let mut estimates = Vec::new();
        // Probe 1: every node in a single large allocation.
        let large_result =
            run_sbatch_test_only(account, partition, total_nodes as u32, walltime, qos, gres);
        estimates.push(build_sbatch_estimate(
            "single-large",
            total_nodes,
            walltime_secs,
            &large_result,
            now,
        ));
        // Probe 2: one allocation at the per-allocation node count.
        let small_result = run_sbatch_test_only(
            account,
            partition,
            nodes_per_alloc as u32,
            walltime,
            qos,
            gres,
        );
        estimates.push(build_sbatch_estimate(
            "many-small",
            nodes_per_alloc,
            walltime_secs,
            &small_result,
            now,
        ));
        results.insert(scheduler.resource_requirements.clone(), estimates);
    }
    results
}
/// Render a whole-second duration as a compact human-readable string:
/// `"45s"`, `"3min"`, `"2h"`, or `"2h 30min"`.
///
/// Remainders below the chosen unit are truncated (e.g. 119 -> "1min",
/// 3659 -> "60min"... no: 3599 -> "59min"); minutes are only shown next to
/// hours when non-zero.
fn format_duration_human(secs: i64) -> String {
    match secs {
        s if s < 60 => format!("{}s", s),
        s if s < 3600 => format!("{}min", s / 60),
        s => {
            let hours = s / 3600;
            let mins = (s % 3600) / 60;
            match mins {
                0 => format!("{}h", hours),
                m => format!("{}h {}min", hours, m),
            }
        }
    }
}
/// Turn a scheduler plan, live cluster state, and optional sbatch probes into
/// one [`AllocationRecommendation`] per resource group.
///
/// In `offline` mode every group gets the default "N allocations x 1 node"
/// recommendation without consulting the cluster. Otherwise a heuristic
/// chooses between "single" (one large allocation), "many-small" (1-node
/// allocations), and "chunked" (a middle ground) from idle node counts and
/// queue pressure; when `sbatch --test-only` estimates are available they can
/// override that choice in favor of the strategy with the earlier projected
/// completion.
fn compute_recommendations(
    plan: &crate::client::scheduler_plan::SchedulerPlan,
    cluster_state: &HashMap<
        String,
        (
            crate::client::hpc::slurm::PartitionAvailability,
            crate::client::hpc::slurm::QueueDepthInfo,
        ),
    >,
    sbatch_estimates: &HashMap<String, Vec<SbatchEstimate>>,
    offline: bool,
) -> Vec<AllocationRecommendation> {
    let mut recommendations = Vec::new();
    for scheduler in &plan.schedulers {
        let partition_name = scheduler.partition.as_deref();
        // The plan's num_allocations is used as the ideal node count here.
        let ideal_nodes = scheduler.num_allocations;
        let estimates = sbatch_estimates.get(&scheduler.resource_requirements);
        let cluster_entry = partition_name.and_then(|p| cluster_state.get(p));
        if offline {
            recommendations.push(AllocationRecommendation {
                group_name: scheduler.resource_requirements.clone(),
                partition: partition_name.map(|s| s.to_string()),
                num_allocations: ideal_nodes,
                nodes_per_allocation: 1,
                total_nodes: ideal_nodes,
                strategy: "default".to_string(),
                reason: format!(
                    "Offline mode: {} allocations x 1 node (ideal from workflow analysis)",
                    ideal_nodes
                ),
                walltime: scheduler.walltime.clone(),
                job_count: scheduler.job_count,
                sbatch_estimates: None,
            });
            continue;
        }
        // Queue pressure = pending node demand relative to partition size;
        // missing cluster data degrades to all zeros.
        let (idle, _mixed, _total, queue_pressure) = if let Some((avail, queue)) = cluster_entry {
            let total = avail.total as i64;
            let pressure = if total > 0 {
                queue.pending_nodes as f64 / total as f64
            } else {
                0.0
            };
            (avail.idle as i64, avail.mixed as i64, total, pressure)
        } else {
            (0_i64, 0_i64, 0_i64, 0.0)
        };
        // Initial heuristic choice; may be overridden by sbatch probes below.
        let (mut strategy, mut num_allocs, mut nodes_per_alloc, mut reason) = if ideal_nodes <= 1 {
            ("single", 1_i64, 1_i64, "Only 1 node needed".to_string())
        } else if idle >= ideal_nodes && queue_pressure < 0.5 {
            // Enough idle nodes and a quiet queue: one big allocation.
            (
                "single",
                1_i64,
                ideal_nodes,
                format!(
                    "{} idle nodes >= {} needed, low queue pressure ({:.0}%)",
                    idle,
                    ideal_nodes,
                    queue_pressure * 100.0,
                ),
            )
        } else if idle < (ideal_nodes + 2) / 3 || queue_pressure > 2.0 {
            // Few idle nodes (less than a third of the need, ceiling) or a
            // very deep queue: many 1-node allocations.
            (
                "many-small",
                ideal_nodes,
                1_i64,
                format!(
                    "{} idle nodes < {} needed/3, or high queue pressure ({:.0}%): \
                     small allocations start as nodes become available",
                    idle,
                    ideal_nodes,
                    queue_pressure * 100.0,
                ),
            )
        } else {
            // Partial coverage: chunk into mid-sized allocations
            // (at least 2 nodes, at most min(idle/2, 8, ideal/2)).
            let chunk_size = std::cmp::max(
                2,
                std::cmp::min(idle / 2, std::cmp::min(8, ideal_nodes / 2)),
            );
            // Ceiling division so every node is covered.
            let num_chunks = (ideal_nodes + chunk_size - 1) / chunk_size;
            (
                "chunked",
                num_chunks,
                chunk_size,
                format!(
                    "{} idle nodes partially covers {} needed: \
                     {} allocations x {} nodes balances start time vs overhead",
                    idle, ideal_nodes, num_chunks, chunk_size,
                ),
            )
        };
        if let Some(ests) = estimates {
            let large_est = ests.iter().find(|e| e.strategy == "single-large");
            let small_est = ests.iter().find(|e| e.strategy == "many-small");
            if let (Some(large), Some(small)) = (large_est, small_est)
                && let (Some(large_completion), Some(small_completion)) =
                    (large.completion_seconds, small.completion_seconds)
            {
                let small_wait = small.wait_seconds.unwrap_or(0);
                // Model the last small allocation's completion: waits are
                // assumed to stack per allocation, capped at a factor of 10.
                let fairshare_factor = std::cmp::min(ideal_nodes, 10);
                let estimated_last_small_completion =
                    small_wait * fairshare_factor + (small_completion - small_wait);
                if large_completion <= estimated_last_small_completion {
                    // Probes say the single large allocation finishes first.
                    strategy = "single";
                    num_allocs = 1;
                    nodes_per_alloc = ideal_nodes;
                    reason = format!(
                        "sbatch --test-only: large ({} nodes) completes in ~{}, \
                         faster than {} small allocations (~{}). \
                         Slurm prioritizes larger allocations",
                        ideal_nodes,
                        format_duration_human(large_completion),
                        ideal_nodes,
                        format_duration_human(estimated_last_small_completion),
                    );
                } else if strategy != "single" {
                    // Probes agree with the non-single heuristic choice.
                    reason = format!(
                        "{}. sbatch --test-only confirms: small allocations \
                         complete sooner (large: ~{}, small: ~{})",
                        reason,
                        format_duration_human(large_completion),
                        format_duration_human(estimated_last_small_completion),
                    );
                }
            }
        }
        recommendations.push(AllocationRecommendation {
            group_name: scheduler.resource_requirements.clone(),
            partition: partition_name.map(|s| s.to_string()),
            num_allocations: num_allocs,
            nodes_per_allocation: nodes_per_alloc,
            total_nodes: num_allocs * nodes_per_alloc,
            strategy: strategy.to_string(),
            reason,
            walltime: scheduler.walltime.clone(),
            job_count: scheduler.job_count,
            sbatch_estimates: estimates.cloned(),
        });
    }
    recommendations
}
/// Core analysis behind `torc slurm plan-allocations`.
///
/// Expands the workflow spec's parameters, derives parallelism statistics
/// from the job graph, generates a scheduler plan, optionally queries live
/// cluster state (sinfo/squeue) and runs `sbatch --test-only` probes, and
/// returns recommendations plus everything needed to render the report.
///
/// `offline` skips all cluster queries and probes; `skip_test_only` skips
/// just the sbatch probes. Errors are returned as human-readable strings.
#[allow(clippy::too_many_arguments)]
pub fn analyze_plan_allocations(
    spec: &mut WorkflowSpec,
    account: &str,
    partition: Option<&str>,
    profile: &crate::client::hpc::HpcProfile,
    offline: bool,
    skip_test_only: bool,
    group_by: GroupByStrategy,
    walltime_strategy: WalltimeStrategy,
    walltime_multiplier: f64,
) -> Result<PlanAllocationsResult, String> {
    spec.expand_parameters()
        .map_err(|e| format!("Failed to expand parameters: {}", e))?;
    let rr_vec = spec.resource_requirements.as_deref().unwrap_or(&[]);
    // Index resource requirements by name for plan generation.
    let rr_map: HashMap<&str, &ResourceRequirementsSpec> =
        rr_vec.iter().map(|rr| (rr.name.as_str(), rr)).collect();
    if rr_map.is_empty() {
        return Err("Workflow has no resource_requirements defined.".to_string());
    }
    let mut graph = WorkflowGraph::from_spec(spec)
        .map_err(|e| format!("Failed to build workflow graph: {}", e))?;
    let levels = graph
        .topological_levels()
        .map_err(|e| format!("Failed to compute topological levels: {}", e))?
        .clone();
    // Peak number of concurrently runnable job instances across all levels.
    let max_parallelism: usize = levels
        .iter()
        .map(|level| {
            level
                .iter()
                .filter_map(|name| graph.get_job(name))
                .map(|node| node.instance_count)
                .sum()
        })
        .max()
        .unwrap_or(0);
    // Index of the level where that peak occurs.
    let max_parallelism_level = levels
        .iter()
        .enumerate()
        .max_by_key(|(_, level)| -> usize {
            level
                .iter()
                .filter_map(|name| graph.get_job(name))
                .map(|node| node.instance_count)
                .sum()
        })
        .map(|(i, _)| i)
        .unwrap_or(0);
    use crate::client::scheduler_plan::{SchedulerOverrides, generate_scheduler_plan};
    let overrides = SchedulerOverrides {
        partition: partition.map(|s| s.to_string()),
        walltime_secs: None,
    };
    // NOTE(review): positional args — the 5th (`false`) matches
    // `single_allocation` in handle_regenerate's call to the same function;
    // confirm the trailing `false, None, false` flags against
    // generate_scheduler_plan's signature.
    let plan = generate_scheduler_plan(
        &graph,
        &rr_map,
        profile,
        account,
        false,
        group_by,
        walltime_strategy,
        walltime_multiplier,
        false,
        None,
        false,
        &overrides,
    );
    let resource_groups: Vec<ResourceGroupInfo> = plan
        .schedulers
        .iter()
        .map(|s| ResourceGroupInfo {
            name: s.resource_requirements.clone(),
            partition: Some(s.resolved_partition.clone()),
            job_count: s.job_count,
            walltime: s.walltime.clone(),
            ideal_nodes: s.num_allocations,
        })
        .collect();
    use crate::client::hpc::slurm::{PartitionAvailability, QueueDepthInfo};
    let mut cluster_state_map: HashMap<String, (PartitionAvailability, QueueDepthInfo)> =
        HashMap::new();
    if !offline {
        // Deduplicate partitions through a HashSet, then query each once.
        let partitions: Vec<String> = plan
            .schedulers
            .iter()
            .map(|s| s.resolved_partition.clone())
            .collect::<std::collections::HashSet<_>>()
            .into_iter()
            .collect();
        for part_name in &partitions {
            // Query failures are non-fatal: warn and fall back to all-zero
            // availability so the heuristic still runs.
            let avail =
                match crate::client::hpc::slurm::query_partition_availability(Some(part_name)) {
                    Ok(mut v) => v.pop().unwrap_or(PartitionAvailability {
                        partition: part_name.clone(),
                        idle: 0,
                        mixed: 0,
                        allocated: 0,
                        down: 0,
                        total: 0,
                    }),
                    Err(e) => {
                        warn!("Failed to query sinfo for partition '{}': {}", part_name, e);
                        PartitionAvailability {
                            partition: part_name.clone(),
                            idle: 0,
                            mixed: 0,
                            allocated: 0,
                            down: 0,
                            total: 0,
                        }
                    }
                };
            let queue = match crate::client::hpc::slurm::query_queue_depth(Some(part_name)) {
                Ok(mut v) => v.pop().unwrap_or(QueueDepthInfo {
                    partition: part_name.clone(),
                    pending_jobs: 0,
                    pending_nodes: 0,
                    running_jobs: 0,
                }),
                Err(e) => {
                    warn!(
                        "Failed to query squeue for partition '{}': {}",
                        part_name, e
                    );
                    QueueDepthInfo {
                        partition: part_name.clone(),
                        pending_jobs: 0,
                        pending_nodes: 0,
                        running_jobs: 0,
                    }
                }
            };
            cluster_state_map.insert(part_name.clone(), (avail, queue));
        }
    }
    // sbatch probes run only when online and not explicitly disabled.
    let sbatch_estimates = if !offline && !skip_test_only {
        run_sbatch_estimates(&plan, account)
    } else {
        HashMap::new()
    };
    let recommendations =
        compute_recommendations(&plan, &cluster_state_map, &sbatch_estimates, offline);
    let mut cluster_state: Vec<ClusterStateInfo> = cluster_state_map
        .iter()
        .map(|(_, (a, q))| ClusterStateInfo {
            partition: a.partition.clone(),
            idle: a.idle,
            mixed: a.mixed,
            allocated: a.allocated,
            down: a.down,
            total: a.total,
            pending_jobs: q.pending_jobs,
            pending_nodes: q.pending_nodes,
            running_jobs: q.running_jobs,
        })
        .collect();
    // Deterministic ordering for report output.
    cluster_state.sort_by(|a, b| a.partition.cmp(&b.partition));
    Ok(PlanAllocationsResult {
        workflow_analysis: WorkflowAnalysisInfo {
            total_jobs: graph.job_count(),
            total_instances: graph.total_instance_count(),
            dependency_depth: levels.len(),
            max_parallelism,
            max_parallelism_level,
            resource_groups: plan.schedulers.len(),
        },
        cluster_state,
        recommendations,
        warnings: plan.warnings.clone(),
        resource_groups,
        profile_name: profile.name.clone(),
        profile_display_name: profile.display_name.clone(),
        account: account.to_string(),
    })
}
/// CLI handler for `torc slurm plan-allocations`.
///
/// Resolves the HPC profile (explicit `--profile` or auto-detected), parses
/// the workflow file, resolves the Slurm account (`--account` or the spec's
/// `slurm_defaults`), runs [`analyze_plan_allocations`], and prints either a
/// JSON document or a multi-section text report with suggested `torc slurm
/// generate` commands. Exits the process on any unrecoverable error.
#[allow(clippy::too_many_arguments)]
fn handle_plan_allocations(
    workflow_file: &PathBuf,
    account: Option<&str>,
    partition: Option<&str>,
    profile_name: Option<&str>,
    offline: bool,
    skip_test_only: bool,
    group_by: GroupByStrategy,
    walltime_strategy: WalltimeStrategy,
    walltime_multiplier: f64,
    format: &str,
) {
    let torc_config = TorcConfig::load().unwrap_or_default();
    let registry = create_registry_with_config_public(&torc_config.client.hpc);
    // Explicit profile name wins; otherwise try to detect the host system.
    let profile = if let Some(n) = profile_name {
        registry.get(n)
    } else {
        registry.detect()
    };
    let profile = match profile {
        Some(p) => p,
        None => {
            if let Some(name) = profile_name {
                eprintln!("Unknown HPC profile: {}", name);
            } else {
                eprintln!("No HPC profile specified and no system detected.");
                eprintln!("Use --profile <name> or run on an HPC system.");
            }
            std::process::exit(1);
        }
    };
    let mut spec: WorkflowSpec = match WorkflowSpec::from_spec_file(workflow_file) {
        Ok(s) => s,
        Err(e) => {
            eprintln!("Failed to parse workflow file: {}", e);
            std::process::exit(1);
        }
    };
    // Account resolution order: CLI flag, then the spec's slurm_defaults.
    let resolved_account = if let Some(acct) = account {
        acct.to_string()
    } else if let Some(ref defaults) = spec.slurm_defaults {
        defaults
            .0
            .get("account")
            .and_then(|v| v.as_str().map(String::from))
            .unwrap_or_else(|| {
                eprintln!(
                    "Error: No account specified. Use --account or set 'account' in slurm_defaults."
                );
                std::process::exit(1);
            })
    } else {
        eprintln!("Error: No account specified. Use --account or set 'account' in slurm_defaults.");
        std::process::exit(1);
    };
    let result = match analyze_plan_allocations(
        &mut spec,
        &resolved_account,
        partition,
        &profile,
        offline,
        skip_test_only,
        group_by,
        walltime_strategy,
        walltime_multiplier,
    ) {
        Ok(r) => r,
        Err(e) => {
            eprintln!("Error: {}", e);
            std::process::exit(1);
        }
    };
    if format == "json" {
        print_json(&result, "plan allocations");
    } else {
        // Text report: analysis, groups, cluster state, recommendations.
        let wa = &result.workflow_analysis;
        println!("Workflow Analysis");
        println!("=================");
        println!(" Total jobs: {}", wa.total_jobs);
        println!(" Total instances: {}", wa.total_instances);
        println!(" Dependency depth: {} levels", wa.dependency_depth);
        println!(
            " Max parallelism: {} instances (at level {})",
            wa.max_parallelism, wa.max_parallelism_level
        );
        println!(" Resource groups: {}", wa.resource_groups);
        println!();
        for rg in &result.resource_groups {
            println!(
                "Resource Group: \"{}\" (partition: {})",
                rg.name,
                rg.partition.as_deref().unwrap_or("<default>")
            );
            println!(
                " Jobs: {}, Walltime: {}, Ideal nodes: {}",
                rg.job_count, rg.walltime, rg.ideal_nodes
            );
            println!();
        }
        if !result.cluster_state.is_empty() {
            for cs in &result.cluster_state {
                println!("Cluster State (partition: {})", cs.partition);
                println!(
                    " Idle: {}, Mixed: {}, Allocated: {}, Down: {}, Total: {}",
                    cs.idle, cs.mixed, cs.allocated, cs.down, cs.total
                );
                println!(
                    " Queue depth: {} pending jobs ({} pending nodes), {} running jobs",
                    cs.pending_jobs, cs.pending_nodes, cs.running_jobs
                );
                println!();
            }
        } else if !offline {
            // Online run that still produced no cluster data.
            println!("Cluster State: unavailable (no partitions resolved)");
            println!();
        }
        println!("Recommendations");
        println!("===============");
        for rec in &result.recommendations {
            println!(
                " \"{}\": {} allocation(s) x {} node(s) [{}]",
                rec.group_name, rec.num_allocations, rec.nodes_per_allocation, rec.strategy
            );
            println!(" {}", rec.reason);
            println!(
                " Total nodes: {}, Walltime: {}, Jobs: {}",
                rec.total_nodes, rec.walltime, rec.job_count
            );
            if let Some(ref estimates) = rec.sbatch_estimates {
                println!();
                println!(" Scheduler Estimate (sbatch --test-only):");
                for est in estimates {
                    if est.success {
                        let wait_str = est
                            .wait_seconds
                            .map(format_duration_human)
                            .unwrap_or_else(|| "unknown".to_string());
                        let completion_str = est
                            .completion_seconds
                            .map(format_duration_human)
                            .unwrap_or_else(|| "unknown".to_string());
                        let label = if est.strategy == "single-large" {
                            format!("Single large ({} nodes)", est.nodes)
                        } else {
                            "Many small (1 node)".to_string()
                        };
                        println!(
                            " {}: start in ~{}, complete in ~{}",
                            label, wait_str, completion_str
                        );
                        if est.strategy == "many-small" {
                            println!(
                                " Note: estimate is for first job only; later jobs \
                                 delayed by fair-share"
                            );
                        }
                    } else {
                        let label = if est.strategy == "single-large" {
                            format!("Single large ({} nodes)", est.nodes)
                        } else {
                            "Many small (1 node)".to_string()
                        };
                        let err = est.error.as_deref().unwrap_or("unknown error");
                        println!(" {}: failed ({})", label, err);
                    }
                }
            }
            println!();
            // Suggest the concrete follow-up command for each strategy.
            match rec.strategy.as_str() {
                "single" if rec.total_nodes > 1 => {
                    println!(
                        " Suggested: torc slurm generate --account {} --single-allocation {}",
                        result.account,
                        workflow_file.display()
                    );
                }
                "many-small" => {
                    println!(
                        " Suggested: torc slurm generate --account {} {}",
                        result.account,
                        workflow_file.display()
                    );
                }
                "chunked" => {
                    println!(
                        " Note: Chunked allocations require manual configuration or multiple",
                    );
                    println!(
                        " schedulers. Use 'torc slurm generate' and adjust nodes per scheduler."
                    );
                }
                _ => {}
            }
            println!();
        }
        if !result.warnings.is_empty() {
            println!("Warnings:");
            for warning in &result.warnings {
                println!(" - {}", warning);
            }
        }
        println!(
            "Profile: {} ({})",
            result.profile_display_name, result.profile_name
        );
    }
}
/// CLI handler for `torc slurm generate`.
///
/// Resolves the HPC profile and Slurm account, parses the workflow file,
/// generates Slurm schedulers (and, unless `no_actions`, workflow actions)
/// into the spec, then either reports a dry run, writes the updated spec to
/// `output` (format chosen by file extension), or prints it to stdout with a
/// commented summary on stderr. Exits the process on unrecoverable errors.
#[allow(clippy::too_many_arguments)]
fn handle_generate(
    workflow_file: &PathBuf,
    account: Option<&str>,
    profile_name: Option<&str>,
    output: Option<&PathBuf>,
    single_allocation: bool,
    group_by: GroupByStrategy,
    walltime_strategy: WalltimeStrategy,
    walltime_multiplier: f64,
    no_actions: bool,
    force: bool,
    dry_run: bool,
    format: &str,
) {
    let torc_config = TorcConfig::load().unwrap_or_default();
    let registry = create_registry_with_config_public(&torc_config.client.hpc);
    // Explicit profile name wins; otherwise try to detect the host system.
    let profile = if let Some(n) = profile_name {
        registry.get(n)
    } else {
        registry.detect()
    };
    let profile = match profile {
        Some(p) => p,
        None => {
            if let Some(name) = profile_name {
                eprintln!("Unknown HPC profile: {}", name);
            } else {
                eprintln!("No HPC profile specified and no system detected.");
                eprintln!("Use --profile <name> or run on an HPC system.");
            }
            std::process::exit(1);
        }
    };
    let mut spec: WorkflowSpec = match WorkflowSpec::from_spec_file(workflow_file) {
        Ok(s) => s,
        Err(e) => {
            eprintln!("Failed to parse workflow file: {}", e);
            std::process::exit(1);
        }
    };
    // Account resolution order: CLI flag, then the spec's slurm_defaults.
    let resolved_account = if let Some(acct) = account {
        acct.to_string()
    } else if let Some(ref defaults) = spec.slurm_defaults {
        defaults
            .0
            .get("account")
            .and_then(|v| v.as_str().map(String::from))
            .unwrap_or_else(|| {
                eprintln!(
                    "Error: No account specified. Use --account or set 'account' in slurm_defaults."
                );
                std::process::exit(1);
            })
    } else {
        eprintln!("Error: No account specified. Use --account or set 'account' in slurm_defaults.");
        std::process::exit(1);
    };
    // Mutates `spec` in place: adds slurm_schedulers and (optionally) actions.
    let result = match generate_schedulers_for_workflow(
        &mut spec,
        &profile,
        &resolved_account,
        single_allocation,
        group_by,
        walltime_strategy,
        walltime_multiplier,
        !no_actions,
        force,
    ) {
        Ok(r) => r,
        Err(e) => {
            eprintln!("Error: {}", e);
            std::process::exit(1);
        }
    };
    if dry_run {
        // Borrow-only view of what would be generated, for JSON output.
        #[derive(Serialize)]
        struct GenerateDryRunResult<'a> {
            dry_run: bool,
            scheduler_count: usize,
            action_count: usize,
            profile_name: &'a str,
            profile_display_name: &'a str,
            slurm_schedulers: &'a Option<Vec<crate::client::workflow_spec::SlurmSchedulerSpec>>,
            actions: &'a Option<Vec<crate::client::workflow_spec::WorkflowActionSpec>>,
            warnings: &'a [String],
        }
        let dry_run_result = GenerateDryRunResult {
            dry_run: true,
            scheduler_count: result.scheduler_count,
            action_count: result.action_count,
            profile_name: &profile.name,
            profile_display_name: &profile.display_name,
            slurm_schedulers: &spec.slurm_schedulers,
            actions: &spec.actions,
            warnings: &result.warnings,
        };
        if format == "json" {
            print_json(&dry_run_result, "dry run result");
        } else {
            println!("[DRY RUN] Would generate the following Slurm schedulers:");
            println!();
            if let Some(schedulers) = &spec.slurm_schedulers {
                for sched in schedulers {
                    println!(
                        " Scheduler: {} (account: {}, partition: {}, walltime: {}, nodes: {})",
                        sched.name.as_deref().unwrap_or("unnamed"),
                        sched.account,
                        sched.partition.as_deref().unwrap_or("<default>"),
                        sched.walltime,
                        sched.nodes
                    );
                }
            }
            if !no_actions {
                println!();
                println!(
                    "[DRY RUN] Would add {} workflow action(s)",
                    result.action_count
                );
            }
            println!();
            println!("Profile: {} ({})", profile.display_name, profile.name);
            if !result.warnings.is_empty() {
                println!();
                println!("Warnings:");
                for warning in &result.warnings {
                    println!(" - {}", warning);
                }
            }
        }
        return;
    }
    // Output format follows the output file's extension, falling back to the
    // input file's extension when writing to stdout.
    let format_ext = if let Some(out_path) = output {
        out_path.extension().and_then(|e| e.to_str())
    } else {
        workflow_file.extension().and_then(|e| e.to_str())
    };
    let output_content = match format_ext {
        Some("json") => serde_json::to_string_pretty(&spec).unwrap(),
        // JSON is valid JSON5, so json5 files get plain pretty-printed JSON.
        Some("json5") => serde_json::to_string_pretty(&spec).unwrap(),
        Some("kdl") => spec.to_kdl_str(),
        Some("yaml") | Some("yml") => pretty_print_yaml(&spec),
        // Unknown or missing extension: default to JSON.
        _ => serde_json::to_string_pretty(&spec).unwrap(),
    };
    if let Some(output_path) = output {
        match std::fs::write(output_path, &output_content) {
            Ok(_) => {
                if format != "json" {
                    println!("Generated workflow written to: {}", output_path.display());
                    println!();
                    println!("Summary:");
                    println!(" Schedulers generated: {}", result.scheduler_count);
                    println!(" Actions added: {}", result.action_count);
                    println!(
                        " Profile used: {} ({})",
                        profile.display_name, profile.name
                    );
                    if !result.warnings.is_empty() {
                        println!();
                        println!("Warnings:");
                        for warning in &result.warnings {
                            println!(" - {}", warning);
                        }
                    }
                }
            }
            Err(e) => {
                eprintln!("Failed to write output file: {}", e);
                std::process::exit(1);
            }
        }
    } else {
        if format == "json" {
            print_json(&spec, "workflow spec");
        } else {
            // Spec goes to stdout; the summary goes to stderr as comments so
            // the stdout stream stays machine-parseable.
            println!("{}", output_content);
            eprintln!();
            eprintln!("// Summary:");
            eprintln!("// Schedulers generated: {}", result.scheduler_count);
            eprintln!("// Actions added: {}", result.action_count);
            eprintln!(
                "// Profile used: {} ({})",
                profile.display_name, profile.name
            );
            if !result.warnings.is_empty() {
                eprintln!("//");
                eprintln!("// Warnings:");
                for warning in &result.warnings {
                    eprintln!("// - {}", warning);
                }
            }
        }
    }
}
/// Serialize the spec to YAML and insert a blank line before each new
/// top-level key so the sections are visually separated.
///
/// A line counts as a top-level key when it is unindented, is not a list
/// item, is not a document marker (`---`/`...`) or comment, and contains a
/// `:`. No blank line is inserted at the very top or between two consecutive
/// top-level keys.
fn pretty_print_yaml(spec: &WorkflowSpec) -> String {
    let yaml = serde_yaml::to_string(spec).unwrap();
    let mut out = String::new();
    let mut last_was_top_level = false;
    for line in yaml.lines() {
        let stripped = line.trim_start();
        let top_level = !stripped.is_empty()
            && !line.starts_with(' ')
            && !line.starts_with('-')
            && !stripped.starts_with("---")
            && !stripped.starts_with("...")
            && !stripped.starts_with('#')
            && stripped.contains(':');
        if top_level && !out.is_empty() && !last_was_top_level {
            out.push('\n');
        }
        out.push_str(line);
        out.push('\n');
        last_was_top_level = top_level;
    }
    out
}
/// Outcome of a (non-dry-run) `torc slurm regenerate`, serialized as the
/// command's JSON output.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegenerateResult {
    pub workflow_id: i64,
    // Number of pending (uninitialized/ready/blocked) jobs found.
    pub pending_jobs: usize,
    pub schedulers_created: Vec<SchedulerInfo>,
    pub total_allocations: i64,
    pub allocations_submitted: i64,
    // Allocations counted but not submitted — presumably held back for
    // dependencies; TODO confirm against the submission logic.
    pub allocations_deferred: i64,
    pub warnings: Vec<String>,
    // Whether any submission was attempted for this run.
    pub submitted: bool,
}
/// A scheduler that `torc slurm regenerate` would create, as reported in
/// dry-run output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlannedSchedulerInfo {
    pub name: String,
    pub account: String,
    pub partition: Option<String>,
    pub walltime: String,
    pub mem: Option<String>,
    pub nodes: i64,
    pub num_allocations: i64,
    // Count and names of the pending jobs covered by this scheduler.
    pub job_count: usize,
    pub job_names: Vec<String>,
    pub has_dependencies: bool,
}
/// JSON payload for `torc slurm regenerate --dry-run`: what would be created
/// and whether allocations would be submitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegenerateDryRunResult {
    // Always true; distinguishes this payload from RegenerateResult.
    pub dry_run: bool,
    pub workflow_id: i64,
    // Number of pending (uninitialized/ready/blocked) jobs found.
    pub pending_jobs: usize,
    // HPC profile that would be used.
    pub profile_name: String,
    pub profile_display_name: String,
    pub planned_schedulers: Vec<PlannedSchedulerInfo>,
    pub total_allocations: i64,
    // Mirrors the --submit flag: whether a real run would submit.
    pub would_submit: bool,
    pub warnings: Vec<String>,
}
/// A Slurm scheduler actually created in the database by the regenerate
/// command.
#[derive(Debug, Serialize, Deserialize)]
pub struct SchedulerInfo {
    // Database ID of the created scheduler.
    pub id: i64,
    pub name: String,
    pub account: String,
    pub partition: Option<String>,
    pub walltime: String,
    pub nodes: i64,
    pub num_allocations: i64,
    // Number of pending jobs covered by this scheduler.
    pub job_count: usize,
    pub has_dependencies: bool,
}
#[allow(clippy::too_many_arguments, clippy::result_large_err)]
fn handle_regenerate(
config: &Configuration,
workflow_id: i64,
account: Option<&str>,
profile_name: Option<&str>,
partition: Option<&str>,
walltime: Option<&str>,
single_allocation: bool,
group_by: GroupByStrategy,
walltime_strategy: WalltimeStrategy,
walltime_multiplier: f64,
submit: bool,
output_dir: &PathBuf,
poll_interval: Option<i32>,
dry_run: bool,
include_job_ids: Option<&[i64]>,
format: &str,
) {
let torc_config = TorcConfig::load().unwrap_or_default();
let effective_poll_interval = poll_interval.unwrap_or(torc_config.client.slurm.poll_interval);
let registry = create_registry_with_config_public(&torc_config.client.hpc);
let profile = if let Some(n) = profile_name {
registry.get(n)
} else {
registry.detect()
};
let profile = match profile {
Some(p) => p,
None => {
if let Some(name) = profile_name {
eprintln!("Unknown HPC profile: {}", name);
} else {
eprintln!("No HPC profile specified and no system detected.");
eprintln!("Use --profile <name> or run on an HPC system.");
}
std::process::exit(1);
}
};
let pending_statuses = [
models::JobStatus::Uninitialized,
models::JobStatus::Ready,
models::JobStatus::Blocked,
];
let mut pending_jobs: Vec<models::JobModel> = Vec::new();
for status in &pending_statuses {
match paginate_jobs(
config,
workflow_id,
JobListParams::new()
.with_status(*status)
.with_include_relationships(true),
) {
Ok(jobs) => {
pending_jobs.extend(jobs);
}
Err(e) => {
print_error(&format!("listing {:?} jobs", status), &e);
std::process::exit(1);
}
}
}
if let Some(job_ids) = include_job_ids {
let existing_ids: std::collections::HashSet<i64> =
pending_jobs.iter().filter_map(|j| j.id).collect();
for &job_id in job_ids {
if !existing_ids.contains(&job_id) {
match apis::jobs_api::get_job(config, job_id) {
Ok(job) => {
pending_jobs.push(job);
}
Err(e) => {
debug!("Could not fetch job {}: {:?}", job_id, e);
}
}
}
}
}
if pending_jobs.is_empty() {
if format == "json" {
if dry_run {
print_json(
&RegenerateDryRunResult {
dry_run: true,
workflow_id,
pending_jobs: 0,
profile_name: profile.name.clone(),
profile_display_name: profile.display_name.clone(),
planned_schedulers: Vec::new(),
total_allocations: 0,
would_submit: submit,
warnings: vec!["No pending jobs found".to_string()],
},
"dry run result",
);
} else {
print_json(
&RegenerateResult {
workflow_id,
pending_jobs: 0,
schedulers_created: Vec::new(),
total_allocations: 0,
allocations_submitted: 0,
allocations_deferred: 0,
warnings: vec!["No pending jobs found".to_string()],
submitted: false,
},
"regenerate result",
);
}
} else {
println!(
"No pending jobs (uninitialized, ready, or blocked) found in workflow {}",
workflow_id
);
}
return;
}
let mut warnings: Vec<String> = Vec::new();
match utils::send_with_retries(
config,
|| apis::workflow_actions_api::get_workflow_actions(config, workflow_id),
WAIT_FOR_HEALTHY_DATABASE_MINUTES,
) {
Ok(actions) => {
for action in actions {
if action.action_type == "schedule_nodes"
&& !action.is_recovery
&& !action.executed
&& let Some(action_id) = action.id
{
match utils::send_with_retries(
config,
|| {
apis::workflow_actions_api::claim_action(
config,
workflow_id,
action_id,
models::ClaimActionRequest {
compute_node_id: None,
},
)
},
WAIT_FOR_HEALTHY_DATABASE_MINUTES,
) {
Ok(_) => {
info!(
"Marked action {} ({} -> schedule_nodes) as executed for recovery",
action_id, action.trigger_type
);
}
Err(e) => {
if !format!("{:?}", e).contains("409") {
warnings.push(format!(
"Failed to mark action {} as executed: {:?}",
action_id, e
));
}
}
}
}
}
}
Err(e) => {
warnings.push(format!("Failed to fetch workflow actions: {:?}", e));
}
}
let resource_requirements = match paginate_resource_requirements(
config,
workflow_id,
ResourceRequirementsListParams::new(),
) {
Ok(rrs) => rrs,
Err(e) => {
print_error("listing resource requirements", &e);
std::process::exit(1);
}
};
let existing_schedulers =
match paginate_slurm_schedulers(config, workflow_id, SlurmSchedulersListParams::new()) {
Ok(schedulers) => schedulers,
Err(e) => {
print_error("listing existing schedulers", &e);
std::process::exit(1);
}
};
let account_to_use = account
.map(|s| s.to_string())
.or_else(|| existing_schedulers.first().map(|s| s.account.clone()))
.unwrap_or_else(|| {
eprintln!("No account specified and no existing schedulers found.");
eprintln!("Use --account <account> to specify a Slurm account.");
std::process::exit(1);
});
use crate::client::scheduler_plan::{SchedulerOverrides, generate_scheduler_plan};
let overrides = SchedulerOverrides {
partition: partition.map(|s| s.to_string()),
walltime_secs: walltime.map(|w| {
parse_walltime_secs(w).unwrap_or_else(|e| {
eprintln!("Error: invalid --walltime '{}': {}", w, e);
std::process::exit(1);
})
}),
};
let graph = match WorkflowGraph::from_jobs(&pending_jobs, &resource_requirements) {
Ok(g) => g,
Err(e) => {
eprintln!("Failed to build workflow graph: {}", e);
std::process::exit(1);
}
};
for job in &pending_jobs {
if job.resource_requirements_id.is_none() {
warnings.push(format!(
"Job '{}' (ID: {}) has no resource requirements, skipping",
job.name,
job.id.unwrap_or(-1)
));
}
}
let rr_name_to_model: HashMap<&str, &models::ResourceRequirementsModel> = resource_requirements
.iter()
.map(|rr| (rr.name.as_str(), rr))
.collect();
let timestamp = Utc::now().format("%Y%m%d_%H%M%S").to_string();
let plan = generate_scheduler_plan(
&graph,
&rr_name_to_model,
&profile,
&account_to_use,
single_allocation,
group_by,
walltime_strategy,
walltime_multiplier,
true, Some(&format!("regen_{}", timestamp)),
true, &overrides,
);
warnings.extend(plan.warnings.clone());
if plan.schedulers.is_empty() {
if format == "json" {
if dry_run {
print_json(
&RegenerateDryRunResult {
dry_run: true,
workflow_id,
pending_jobs: pending_jobs.len(),
profile_name: profile.name.clone(),
profile_display_name: profile.display_name.clone(),
planned_schedulers: Vec::new(),
total_allocations: 0,
would_submit: submit,
warnings: warnings.clone(),
},
"dry run result",
);
} else {
print_json(
&RegenerateResult {
workflow_id,
pending_jobs: pending_jobs.len(),
schedulers_created: Vec::new(),
total_allocations: 0,
allocations_submitted: 0,
allocations_deferred: 0,
warnings,
submitted: false,
},
"regenerate result",
);
}
} else {
println!("No pending jobs with resource requirements found");
for warning in &warnings {
println!(" Warning: {}", warning);
}
}
return;
}
if dry_run {
let planned_schedulers: Vec<PlannedSchedulerInfo> = plan
.schedulers
.iter()
.map(|p| PlannedSchedulerInfo {
name: p.name.clone(),
account: p.account.clone(),
partition: p.partition.clone(),
walltime: p.walltime.clone(),
mem: p.mem.clone(),
nodes: p.nodes,
num_allocations: p.num_allocations,
job_count: p.job_count,
job_names: p.job_names.clone(),
has_dependencies: p.has_dependencies,
})
.collect();
let total_allocations: i64 = plan.schedulers.iter().map(|p| p.num_allocations).sum();
let dry_run_result = RegenerateDryRunResult {
dry_run: true,
workflow_id,
pending_jobs: pending_jobs.len(),
profile_name: profile.name.clone(),
profile_display_name: profile.display_name.clone(),
planned_schedulers,
total_allocations,
would_submit: submit,
warnings: warnings.clone(),
};
if format == "json" {
print_json(&dry_run_result, "dry run result");
} else {
println!("[DRY RUN] Would create the following Slurm schedulers:");
println!();
for sched in &dry_run_result.planned_schedulers {
let deps = if sched.has_dependencies {
" (deferred - has dependencies)"
} else {
""
};
println!(
" {} - {} job(s), {} allocation(s){}",
sched.name, sched.job_count, sched.num_allocations, deps
);
println!(
" Account: {}, Partition: {}, Walltime: {}, Nodes: {}",
sched.account,
sched.partition.as_deref().unwrap_or("<default>"),
sched.walltime,
sched.nodes
);
if let Some(mem) = &sched.mem {
println!(" Memory: {}", mem);
}
}
println!();
println!("Total allocations: {}", dry_run_result.total_allocations);
if submit {
println!("[DRY RUN] Would submit allocations immediately");
}
println!("Profile: {} ({})", profile.display_name, profile.name);
if !warnings.is_empty() {
println!();
println!("Warnings:");
for warning in &warnings {
println!(" - {}", warning);
}
}
}
return;
}
let mut schedulers_created: Vec<SchedulerInfo> = Vec::new();
let mut total_allocations: i64 = 0;
let mut scheduler_name_to_id: HashMap<String, i64> = HashMap::new();
for planned in &plan.schedulers {
let scheduler = models::SlurmSchedulerModel {
id: None,
workflow_id,
name: Some(planned.name.clone()),
account: planned.account.clone(),
partition: planned.partition.clone(),
mem: planned.mem.clone(),
walltime: planned.walltime.clone(),
nodes: planned.nodes,
gres: planned.gres.clone(),
ntasks_per_node: None,
qos: planned.qos.clone(),
tmp: None,
extra: None,
};
let created_scheduler = match utils::send_with_retries(
config,
|| apis::slurm_schedulers_api::create_slurm_scheduler(config, scheduler.clone()),
WAIT_FOR_HEALTHY_DATABASE_MINUTES,
) {
Ok(s) => s,
Err(e) => {
print_error("creating scheduler", &e);
std::process::exit(1);
}
};
let scheduler_id = created_scheduler.id.unwrap_or(-1);
scheduler_name_to_id.insert(planned.name.clone(), scheduler_id);
schedulers_created.push(SchedulerInfo {
id: scheduler_id,
name: planned.name.clone(),
account: planned.account.clone(),
partition: created_scheduler.partition.clone(),
walltime: created_scheduler.walltime.clone(),
nodes: planned.nodes,
num_allocations: planned.num_allocations,
job_count: planned.job_count,
has_dependencies: planned.has_dependencies,
});
total_allocations += planned.num_allocations;
for job_name in &planned.job_names {
if let Some(job) = pending_jobs.iter().find(|j| &j.name == job_name)
&& let Some(job_id) = job.id
{
let mut updated_job = job.clone();
updated_job.scheduler_id = Some(scheduler_id);
updated_job.status = None;
if let Err(e) = utils::send_with_retries(
config,
|| apis::jobs_api::update_job(config, job_id, updated_job.clone()),
WAIT_FOR_HEALTHY_DATABASE_MINUTES,
) {
warnings.push(format!(
"Failed to update job {} with scheduler: {}",
job_id, e
));
}
}
}
}
for action in &plan.actions {
if !action.is_recovery {
continue; }
let scheduler_id = match scheduler_name_to_id.get(&action.scheduler_name) {
Some(id) => *id,
None => continue,
};
let job_ids: Vec<i64> = if let Some(ref names) = action.job_names {
pending_jobs
.iter()
.filter(|j| names.contains(&j.name))
.filter_map(|j| j.id)
.collect()
} else if let Some(ref patterns) = action.job_name_patterns {
pending_jobs
.iter()
.filter(|j| {
patterns.iter().any(|p| {
regex::Regex::new(p)
.map(|re| re.is_match(&j.name))
.unwrap_or(false)
})
})
.filter_map(|j| j.id)
.collect()
} else {
Vec::new()
};
if job_ids.is_empty() {
continue;
}
let action_config = serde_json::json!({
"scheduler_type": "slurm",
"scheduler_id": scheduler_id,
"num_allocations": action.num_allocations,
});
let action_body = models::WorkflowActionModel {
id: None,
workflow_id,
trigger_type: "on_jobs_ready".to_string(),
action_type: "schedule_nodes".to_string(),
action_config,
job_ids: Some(job_ids.clone()),
trigger_count: 0,
required_triggers: 1,
executed: false,
executed_at: None,
executed_by: None,
persistent: false,
is_recovery: true,
};
match utils::send_with_retries(
config,
|| {
apis::workflow_actions_api::create_workflow_action(
config,
workflow_id,
action_body.clone(),
)
},
WAIT_FOR_HEALTHY_DATABASE_MINUTES,
) {
Ok(created_action) => {
info!(
"Created recovery action {} for {} deferred jobs using scheduler {}",
created_action.id.unwrap_or(-1),
job_ids.len(),
scheduler_id
);
}
Err(e) => {
warnings.push(format!(
"Failed to create recovery action for scheduler {}: {:?}",
scheduler_id, e
));
}
}
}
let mut submitted = false;
let mut allocations_submitted: i64 = 0;
let mut allocations_deferred: i64 = 0;
if submit && !schedulers_created.is_empty() {
if let Err(e) = std::fs::create_dir_all(output_dir) {
eprintln!("Error creating output directory: {}", e);
std::process::exit(1);
}
for scheduler_info in &schedulers_created {
if scheduler_info.has_dependencies {
println!(
" Deferring scheduler '{}' ({} allocation(s)) - will submit via on_jobs_ready action",
scheduler_info.name, scheduler_info.num_allocations
);
allocations_deferred += scheduler_info.num_allocations;
continue;
}
match schedule_slurm_nodes(
config,
workflow_id,
scheduler_info.id,
scheduler_info.num_allocations as i32,
false, "",
output_dir.to_str().unwrap_or("torc_output"),
effective_poll_interval,
None, false, ) {
Ok(()) => {
println!(
" Submitted {} allocation(s) for scheduler '{}'",
scheduler_info.num_allocations, scheduler_info.name
);
allocations_submitted += scheduler_info.num_allocations;
}
Err(e) => {
eprintln!(
"Error submitting allocations for scheduler '{}': {}",
scheduler_info.name, e
);
std::process::exit(1);
}
}
}
submitted = true;
}
let result = RegenerateResult {
workflow_id,
pending_jobs: pending_jobs.len(),
schedulers_created,
total_allocations,
allocations_submitted,
allocations_deferred,
warnings,
submitted,
};
if format == "json" {
print_json(&result, "regenerate result");
} else {
println!("Regenerated Slurm schedulers for workflow {}", workflow_id);
println!();
println!("Summary:");
println!(" Pending jobs: {}", result.pending_jobs);
println!(" Schedulers created: {}", result.schedulers_created.len());
if result.submitted {
println!(
" Allocations submitted: {} (deferred: {})",
result.allocations_submitted, result.allocations_deferred
);
} else {
println!(" Total allocations: {}", result.total_allocations);
}
println!(
" Profile used: {} ({})",
profile.display_name, profile.name
);
if !result.schedulers_created.is_empty() {
println!();
println!("Schedulers:");
for sched in &result.schedulers_created {
let deferred_marker = if sched.has_dependencies {
" [deferred]"
} else {
""
};
println!(
" - {} (ID: {}): {} job(s), {} allocation(s) × {} node(s){}",
sched.name,
sched.id,
sched.job_count,
sched.num_allocations,
sched.nodes,
deferred_marker
);
}
}
if !result.warnings.is_empty() {
println!();
println!("Warnings:");
for warning in &result.warnings {
println!(" - {}", warning);
}
}
if result.submitted && result.allocations_submitted > 0 {
println!();
if result.allocations_deferred > 0 {
println!(
"Submitted {} allocation(s). {} deferred allocation(s) will be submitted when dependencies complete.",
result.allocations_submitted, result.allocations_deferred
);
} else {
println!("Allocations submitted successfully.");
}
} else if !result.schedulers_created.is_empty() {
println!();
println!("To submit the allocations, run:");
println!(" torc slurm regenerate {} --submit", workflow_id);
}
}
}
/// Renders an optional byte count for table display.
///
/// `None` and negative values (treated as invalid) both render as "-";
/// otherwise the count is passed through `format_bytes` for a
/// human-readable string.
fn fmt_opt_bytes(v: Option<i64>) -> String {
    v.filter(|&bytes| bytes >= 0)
        .map(|bytes| format_bytes(bytes as u64))
        .unwrap_or_else(|| "-".to_string())
}
/// Renders an optional float with one decimal place for table display;
/// `None` renders as "-".
fn fmt_opt_f64(v: Option<f64>) -> String {
    v.map_or_else(|| "-".to_string(), |value| format!("{:.1}", value))
}
/// Handles the `slurm stats` subcommand: fetches per-job Slurm accounting
/// stats stored in the database for `workflow_id`, optionally filtered by
/// `job_id`, `run_id`, and `attempt_id`, then prints them either as JSON or
/// as a table with a derived CPU-utilization column.
///
/// Exits the process with status 1 if any page of the stats listing fails.
fn handle_slurm_stats(
    config: &Configuration,
    workflow_id: i64,
    job_id: Option<i64>,
    run_id: Option<i64>,
    attempt_id: Option<i64>,
    format: &str,
) {
    let mut all_items: Vec<models::SlurmStatsModel> = Vec::new();
    // Manual offset/limit pagination: keep fetching pages until a short or
    // empty page signals the end of the result set.
    let limit = crate::MAX_RECORD_TRANSFER_COUNT;
    let mut offset = 0i64;
    loop {
        match apis::slurm_stats_api::list_slurm_stats(
            config,
            workflow_id,
            job_id,
            run_id,
            attempt_id,
            Some(offset),
            Some(limit),
        ) {
            Ok(response) => {
                let items = response.items;
                if items.is_empty() {
                    break;
                }
                let fetched = items.len() as i64;
                all_items.extend(items);
                // A partial page means there is nothing left to fetch.
                if fetched < limit {
                    break;
                }
                offset += fetched;
            }
            Err(e) => {
                print_error("listing slurm stats", &e);
                std::process::exit(1);
            }
        }
    }
    // JSON output is emitted even when empty, so scripts get a stable shape.
    if format == "json" {
        print_json(&serde_json::json!({ "items": all_items }), "Slurm stats");
        return;
    }
    if all_items.is_empty() {
        println!("No Slurm stats found for workflow {}", workflow_id);
        return;
    }
    // Execution times are needed to derive CPU % for the table view only,
    // so the extra lookup is deferred until after the JSON/empty early-outs.
    let exec_time_map = build_exec_time_map(config, workflow_id);
    let rows: Vec<SlurmStatsTableRow> = all_items
        .iter()
        .map(|s| {
            let cpu_percent = compute_cpu_percent(s, &exec_time_map);
            SlurmStatsTableRow {
                job_id: s.job_id,
                run_id: s.run_id,
                attempt_id: s.attempt_id,
                // Missing values render as "-" to keep table columns aligned.
                slurm_job_id: s.slurm_job_id.clone().unwrap_or_else(|| "-".to_string()),
                max_rss: fmt_opt_bytes(s.max_rss_bytes),
                max_vm: fmt_opt_bytes(s.max_vm_size_bytes),
                ave_cpu_seconds: fmt_opt_f64(s.ave_cpu_seconds),
                cpu_percent,
                node_list: s.node_list.clone().unwrap_or_else(|| "-".to_string()),
            }
        })
        .collect();
    display_table_with_count(&rows, "slurm stats");
}
/// Builds a lookup table from (job_id, run_id, attempt_id) to execution time
/// in minutes, using the workflow's stored results.
///
/// Best-effort: if the results cannot be fetched, returns an empty map so
/// callers simply render "-" for every CPU-percent cell instead of failing.
/// A missing attempt id defaults to 1.
fn build_exec_time_map(config: &Configuration, workflow_id: i64) -> HashMap<(i64, i64, i64), f64> {
    let Ok(results) = paginate_results(config, workflow_id, ResultListParams::new()) else {
        return HashMap::new();
    };
    results
        .into_iter()
        .map(|r| {
            (
                (r.job_id, r.run_id, r.attempt_id.unwrap_or(1)),
                r.exec_time_minutes,
            )
        })
        .collect()
}
/// Derives a CPU-utilization percentage string ("NN.N%") for one stats row:
/// average CPU seconds divided by the job's wall execution time.
///
/// Returns "-" when the average CPU seconds are absent or non-positive, or
/// when no positive execution time is recorded for this
/// (job_id, run_id, attempt_id) key.
fn compute_cpu_percent(
    stats: &models::SlurmStatsModel,
    exec_time_map: &HashMap<(i64, i64, i64), f64>,
) -> String {
    let key = (stats.job_id, stats.run_id, stats.attempt_id);
    match (stats.ave_cpu_seconds, exec_time_map.get(&key).copied()) {
        (Some(cpu_seconds), Some(exec_minutes)) if cpu_seconds > 0.0 && exec_minutes > 0.0 => {
            // exec time is stored in minutes; convert to seconds for the ratio.
            format!("{:.1}%", cpu_seconds / (exec_minutes * 60.0) * 100.0)
        }
        _ => "-".to_string(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Unit-style suffixes ("2h", "30m", "120s") and plain garbage must be
    /// rejected; only Slurm-style clock formats are accepted.
    #[test]
    fn test_parse_walltime_secs_rejects_unit_suffixes() {
        assert!(parse_walltime_secs("2h").is_err());
        assert!(parse_walltime_secs("30m").is_err());
        assert!(parse_walltime_secs("120s").is_err());
        assert!(parse_walltime_secs("1h 30m").is_err());
        assert!(parse_walltime_secs("abc").is_err());
    }

    /// Slurm clock formats parse to total seconds: HH:MM:SS, D-HH:MM:SS,
    /// and the shorter MM:SS form.
    #[test]
    fn test_parse_walltime_secs_slurm() {
        assert_eq!(parse_walltime_secs("04:30:00").unwrap(), 4 * 3600 + 30 * 60);
        assert_eq!(parse_walltime_secs("1-00:00:00").unwrap(), 24 * 3600);
        assert_eq!(parse_walltime_secs("30:00").unwrap(), 30 * 60);
    }
}