use chrono::Utc;
use log::{debug, info, warn};
use serde::Serialize;
use crate::client::apis::configuration::Configuration;
use crate::client::apis::default_api;
use crate::client::commands::pagination::{
ComputeNodeListParams, JobListParams, ScheduledComputeNodeListParams, paginate_compute_nodes,
paginate_jobs, paginate_scheduled_compute_nodes,
};
use crate::client::hpc::common::HpcJobStatus;
use crate::client::hpc::hpc_interface::HpcInterface;
use crate::client::hpc::slurm_interface::SlurmInterface;
use crate::models;
/// Sentinel return code recorded on jobs that are force-failed by orphan
/// cleanup (their compute allocation disappeared before they could finish).
pub const ORPHANED_JOB_RETURN_CODE: i64 = -128;
/// Aggregated outcome of one orphan-cleanup pass (see `cleanup_orphaned_jobs`).
#[derive(Debug, Clone, Serialize)]
pub struct OrphanCleanupResult {
    // Jobs failed because their Slurm allocation was no longer running.
    pub slurm_jobs_failed: usize,
    // Pending scheduled compute nodes marked "complete" because their Slurm
    // job had already finished or disappeared.
    pub pending_allocations_cleaned: usize,
    // Running jobs failed because no active compute nodes remained.
    pub running_jobs_failed: usize,
    // Per-job records for every job that was (or, in dry-run, would be)
    // failed. Omitted from serialized output when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub failed_job_details: Vec<OrphanedJobDetail>,
}
/// Record of a single job identified as orphaned during cleanup.
#[derive(Debug, Clone, Serialize)]
pub struct OrphanedJobDetail {
    // Workflow job id.
    pub job_id: i64,
    // Human-readable job name (for reporting/logs).
    pub job_name: String,
    // Why the job was considered orphaned.
    pub reason: String,
    // The Slurm allocation id, when the orphan was tied to a dead Slurm job;
    // None for jobs orphaned by "no active compute nodes".
    pub slurm_job_id: Option<String>,
}
impl OrphanCleanupResult {
    /// Returns true when this pass took (or, in dry-run, would take) any
    /// cleanup action at all.
    pub fn any_cleaned(&self) -> bool {
        [
            self.slurm_jobs_failed,
            self.pending_allocations_cleaned,
            self.running_jobs_failed,
        ]
        .iter()
        .any(|&count| count > 0)
    }

    /// Total jobs marked failed across both the Slurm-orphan and the
    /// no-active-node sweeps (pending-allocation cleanups are not jobs).
    pub fn total_jobs_failed(&self) -> usize {
        self.running_jobs_failed + self.slurm_jobs_failed
    }
}
/// Runs the full orphan-cleanup pass for one workflow.
///
/// Three sweeps are performed, in order:
/// 1. fail jobs attached to dead Slurm allocations,
/// 2. complete pending Slurm allocations that no longer exist,
/// 3. fail running jobs that have no active compute node.
///
/// With `dry_run`, the sweeps report what they would do without mutating
/// anything. Returns an error string if any sweep's listing/lookup fails.
pub fn cleanup_orphaned_jobs(
    config: &Configuration,
    workflow_id: i64,
    dry_run: bool,
) -> Result<OrphanCleanupResult, String> {
    // Sweep 1: jobs orphaned by Slurm allocations that stopped running.
    let (slurm_jobs_failed, mut failed_job_details) =
        fail_orphaned_slurm_jobs(config, workflow_id, dry_run)?;
    // Sweep 2: pending allocations whose Slurm job is already gone.
    let pending_allocations_cleaned =
        cleanup_dead_pending_slurm_jobs(config, workflow_id, dry_run)?;
    // Sweep 3: running jobs with no active compute node left.
    let (running_jobs_failed, running_details) =
        fail_orphaned_running_jobs(config, workflow_id, dry_run)?;
    failed_job_details.extend(running_details);
    Ok(OrphanCleanupResult {
        slurm_jobs_failed,
        pending_allocations_cleaned,
        running_jobs_failed,
        failed_job_details,
    })
}
/// Fails jobs orphaned by Slurm allocations that are no longer running.
///
/// For every "active" Slurm scheduled compute node in the workflow, queries
/// Slurm for the real allocation status. When the allocation is neither
/// Running nor Queued, every job still attached to that allocation's compute
/// nodes is completed as Failed (return code `ORPHANED_JOB_RETURN_CODE`),
/// the compute nodes are marked inactive, and the scheduled node's status is
/// set to "complete".
///
/// Returns `(jobs_failed, details)`. With `dry_run`, nothing is mutated but
/// would-be actions are counted, recorded in `details`, and logged.
///
/// A `String` error is returned only for listing/lookup failures; per-job
/// and per-node update failures are logged and skipped so one bad record
/// does not abort the whole sweep.
fn fail_orphaned_slurm_jobs(
    config: &Configuration,
    workflow_id: i64,
    dry_run: bool,
) -> Result<(usize, Vec<OrphanedJobDetail>), String> {
    // run_id is needed to build the failure ResultModel below.
    let workflow_status = default_api::get_workflow_status(config, workflow_id)
        .map_err(|e| format!("Failed to get workflow status: {}", e))?;
    let run_id = workflow_status.run_id;
    // Only "active" scheduled nodes can still be holding orphaned jobs.
    let scheduled_nodes = paginate_scheduled_compute_nodes(
        config,
        workflow_id,
        ScheduledComputeNodeListParams::new().with_status("active".to_string()),
    )
    .map_err(|e| format!("Failed to list scheduled compute nodes: {}", e))?;
    let slurm_nodes: Vec<_> = scheduled_nodes
        .iter()
        .filter(|node| node.scheduler_type.to_lowercase() == "slurm")
        .collect();
    if slurm_nodes.is_empty() {
        return Ok((0, Vec::new()));
    }
    // If we cannot talk to Slurm at all (e.g. not on a cluster login node),
    // treat it as nothing-to-do rather than an error.
    let slurm = match SlurmInterface::new() {
        Ok(s) => s,
        Err(e) => {
            warn!("Could not create SlurmInterface: {}", e);
            return Ok((0, Vec::new()));
        }
    };
    let mut total_failed = 0;
    let mut details = Vec::new();
    for scheduled_node in slurm_nodes {
        let slurm_job_id = scheduled_node.scheduler_id.to_string();
        let scheduled_compute_node_id = match scheduled_node.id {
            Some(id) => id,
            None => continue,
        };
        let slurm_status = match slurm.get_status(&slurm_job_id) {
            Ok(info) => info.status,
            Err(e) => {
                // Status lookup failed — skip this allocation rather than
                // guess whether it is dead.
                warn!(
                    "Error checking Slurm status for job {}: {}",
                    slurm_job_id, e
                );
                continue;
            }
        };
        // Allocation is still alive; its jobs are not orphaned.
        if slurm_status == HpcJobStatus::Running || slurm_status == HpcJobStatus::Queued {
            continue;
        }
        info!(
            "Slurm job {} is no longer running (status: {:?}), checking for orphaned jobs",
            slurm_job_id, slurm_status
        );
        // Walk the compute nodes that ran under this dead allocation.
        let compute_nodes = paginate_compute_nodes(
            config,
            workflow_id,
            ComputeNodeListParams::new().with_scheduled_compute_node_id(scheduled_compute_node_id),
        )
        .map_err(|e| format!("Failed to list compute nodes: {}", e))?;
        for compute_node in &compute_nodes {
            let compute_node_id = match compute_node.id {
                Some(id) => id,
                None => continue,
            };
            // Jobs the server still believes are active on this node.
            let orphaned_jobs = paginate_jobs(
                config,
                workflow_id,
                JobListParams::new().with_active_compute_node_id(compute_node_id),
            )
            .map_err(|e| format!("Failed to list jobs for compute node: {}", e))?;
            if orphaned_jobs.is_empty() {
                continue;
            }
            let action = if dry_run { "Would fail" } else { "Found" };
            info!(
                "{} {} orphaned job(s) from Slurm job {} (compute node {})",
                action,
                orphaned_jobs.len(),
                slurm_job_id,
                compute_node_id
            );
            for job in &orphaned_jobs {
                let job_id = match job.id {
                    Some(id) => id,
                    None => continue,
                };
                let reason = format!("Slurm job {} no longer running", slurm_job_id);
                // NOTE(review): the detail record is pushed before the
                // complete_job call, so details can include jobs whose
                // failure update below did not succeed — confirm intended.
                details.push(OrphanedJobDetail {
                    job_id,
                    job_name: job.name.clone(),
                    reason: reason.clone(),
                    slurm_job_id: Some(slurm_job_id.clone()),
                });
                if dry_run {
                    info!(
                        " [DRY RUN] Would mark orphaned job {} ({}) as failed",
                        job_id, job.name
                    );
                    total_failed += 1;
                    continue;
                }
                // presumably the server rejects attempt_id 0, hence the
                // fallback to 1 — TODO confirm against the API contract.
                let attempt_id = job.attempt_id.unwrap_or(1);
                let result = models::ResultModel::new(
                    job_id,
                    workflow_id,
                    run_id,
                    attempt_id,
                    compute_node_id,
                    ORPHANED_JOB_RETURN_CODE,
                    0.0,
                    Utc::now().to_rfc3339(),
                    models::JobStatus::Failed,
                );
                match default_api::complete_job(
                    config,
                    job_id,
                    models::JobStatus::Failed,
                    run_id,
                    result,
                ) {
                    Ok(_) => {
                        info!(
                            " Marked orphaned job {} ({}) as failed (Slurm job {} no longer running)",
                            job_id, job.name, slurm_job_id
                        );
                        total_failed += 1;
                    }
                    Err(e) => {
                        // Log and continue; the job stays as-is and a later
                        // pass can retry it.
                        warn!(" Failed to mark job {} as failed: {}", job_id, e);
                    }
                }
            }
            // Deactivate the compute node so future sweeps skip it.
            if !dry_run {
                let mut updated_node = compute_node.clone();
                updated_node.is_active = Some(false);
                match default_api::update_compute_node(config, compute_node_id, updated_node) {
                    Ok(_) => {
                        debug!(
                            "Marked compute node {} as inactive (Slurm job {} no longer running)",
                            compute_node_id, slurm_job_id
                        );
                    }
                    Err(e) => {
                        warn!(
                            "Failed to mark compute node {} as inactive: {}",
                            compute_node_id, e
                        );
                    }
                }
            }
        }
        // Finally retire the scheduled node itself so it no longer shows as
        // "active".
        if !dry_run {
            match default_api::update_scheduled_compute_node(
                config,
                scheduled_compute_node_id,
                models::ScheduledComputeNodesModel::new(
                    workflow_id,
                    scheduled_node.scheduler_id,
                    scheduled_node.scheduler_config_id,
                    scheduled_node.scheduler_type.clone(),
                    "complete".to_string(),
                ),
            ) {
                Ok(_) => {
                    info!(
                        "Updated scheduled compute node {} status to 'complete'",
                        scheduled_compute_node_id
                    );
                }
                Err(e) => {
                    warn!(
                        "Failed to update scheduled compute node {} status: {}",
                        scheduled_compute_node_id, e
                    );
                }
            }
        }
    }
    if total_failed > 0 {
        let action = if dry_run { "Would mark" } else { "Marked" };
        info!(
            "{} {} orphaned Slurm job(s) as failed (return code {})",
            action, total_failed, ORPHANED_JOB_RETURN_CODE
        );
    }
    Ok((total_failed, details))
}
/// Completes "pending" Slurm scheduled compute nodes whose Slurm job is gone.
///
/// A scheduled node can remain "pending" in our system while the underlying
/// Slurm job has already completed or vanished. This sweep checks each
/// pending Slurm node's real status and, when it is neither Queued nor
/// Running, marks the scheduled node "complete".
///
/// Returns the number of nodes cleaned (or that would be cleaned under
/// `dry_run`). Listing failures return `Err(String)`; per-node status and
/// update failures are logged at debug/warn and skipped.
fn cleanup_dead_pending_slurm_jobs(
    config: &Configuration,
    workflow_id: i64,
    dry_run: bool,
) -> Result<usize, String> {
    let scheduled_nodes = paginate_scheduled_compute_nodes(
        config,
        workflow_id,
        ScheduledComputeNodeListParams::new().with_status("pending".to_string()),
    )
    .map_err(|e| format!("Failed to list pending scheduled compute nodes: {}", e))?;
    let slurm_nodes: Vec<_> = scheduled_nodes
        .iter()
        .filter(|node| node.scheduler_type.to_lowercase() == "slurm")
        .collect();
    if slurm_nodes.is_empty() {
        return Ok(0);
    }
    // Slurm unavailable here is only debug-logged (unlike the active-node
    // sweep, which warns) since pending cleanup is best-effort.
    let slurm = match SlurmInterface::new() {
        Ok(s) => s,
        Err(e) => {
            debug!(
                "Could not create SlurmInterface for pending job check: {}",
                e
            );
            return Ok(0);
        }
    };
    let mut total_cleaned = 0;
    for scheduled_node in slurm_nodes {
        let slurm_job_id = scheduled_node.scheduler_id.to_string();
        let scheduled_compute_node_id = match scheduled_node.id {
            Some(id) => id,
            None => continue,
        };
        let slurm_status = match slurm.get_status(&slurm_job_id) {
            Ok(info) => info.status,
            Err(e) => {
                debug!(
                    "Error checking Slurm status for pending job {}: {}",
                    slurm_job_id, e
                );
                continue;
            }
        };
        // Still waiting or already running — leave it alone.
        if slurm_status == HpcJobStatus::Queued || slurm_status == HpcJobStatus::Running {
            continue;
        }
        // Distinguish "finished normally" from "disappeared" in the logs;
        // both are handled identically below.
        if slurm_status == HpcJobStatus::Complete {
            info!(
                "Slurm job {} completed but was still pending in our system, marking as complete",
                slurm_job_id
            );
        } else {
            info!(
                "Pending Slurm job {} no longer exists (status: {:?}), marking as complete",
                slurm_job_id, slurm_status
            );
        }
        if dry_run {
            info!(
                "[DRY RUN] Would mark pending scheduled compute node {} (Slurm job {}) as complete",
                scheduled_compute_node_id, slurm_job_id
            );
            total_cleaned += 1;
            continue;
        }
        match default_api::update_scheduled_compute_node(
            config,
            scheduled_compute_node_id,
            models::ScheduledComputeNodesModel::new(
                workflow_id,
                scheduled_node.scheduler_id,
                scheduled_node.scheduler_config_id,
                scheduled_node.scheduler_type.clone(),
                "complete".to_string(),
            ),
        ) {
            Ok(_) => {
                info!(
                    "Updated pending scheduled compute node {} (Slurm job {}) status to 'complete'",
                    scheduled_compute_node_id, slurm_job_id
                );
                total_cleaned += 1;
            }
            Err(e) => {
                warn!(
                    "Failed to update scheduled compute node {} status: {}",
                    scheduled_compute_node_id, e
                );
            }
        }
    }
    if total_cleaned > 0 {
        let action = if dry_run {
            "Would clean up"
        } else {
            "Cleaned up"
        };
        info!("{} {} dead pending Slurm job(s)", action, total_cleaned);
    }
    Ok(total_cleaned)
}
/// Fails jobs still marked Running when no active compute nodes remain.
///
/// If the workflow has zero active compute nodes but still has jobs in
/// Running status, those jobs can never complete; this sweep marks them all
/// Failed with return code `ORPHANED_JOB_RETURN_CODE`.
///
/// Returns `(jobs_failed, details)`. With `dry_run`, details are collected
/// and logged but nothing is mutated.
fn fail_orphaned_running_jobs(
    config: &Configuration,
    workflow_id: i64,
    dry_run: bool,
) -> Result<(usize, Vec<OrphanedJobDetail>), String> {
    let workflow_status = default_api::get_workflow_status(config, workflow_id)
        .map_err(|e| format!("Failed to get workflow status: {}", e))?;
    let run_id = workflow_status.run_id;
    // NOTE(review): positional args — Some(1) looks like a limit (we only
    // need total_count) and Some(true) like an is_active filter; confirm
    // against list_compute_nodes's signature.
    let active_nodes_response = default_api::list_compute_nodes(
        config,
        workflow_id,
        None, Some(1), None, None, None, Some(true), None, )
    .map_err(|e| format!("Failed to list active compute nodes: {}", e))?;
    let active_node_count = active_nodes_response.total_count;
    // Any active node means running jobs may still legitimately finish.
    if active_node_count > 0 {
        return Ok((0, Vec::new()));
    }
    let running_jobs = paginate_jobs(
        config,
        workflow_id,
        JobListParams::new().with_status(models::JobStatus::Running),
    )
    .map_err(|e| format!("Failed to list running jobs: {}", e))?;
    if running_jobs.is_empty() {
        return Ok((0, Vec::new()));
    }
    let action = if dry_run { "Would fail" } else { "Detected" };
    info!(
        "{} {} orphaned running job(s) with no active compute nodes",
        action,
        running_jobs.len()
    );
    // Dry run: report every job that would be failed, mutating nothing.
    // Jobs with no id are silently skipped (filter_map on job.id).
    if dry_run {
        let details: Vec<OrphanedJobDetail> = running_jobs
            .iter()
            .filter_map(|job| {
                let job_id = job.id?;
                info!(
                    " [DRY RUN] Would mark orphaned job {} ({}) as failed",
                    job_id, job.name
                );
                Some(OrphanedJobDetail {
                    job_id,
                    job_name: job.name.clone(),
                    reason: "No active compute nodes".to_string(),
                    slurm_job_id: None,
                })
            })
            .collect();
        return Ok((details.len(), details));
    }
    // complete_job needs a compute node id on the ResultModel; reuse any
    // existing node for the workflow (active or not). 0 acts as a
    // "none found" sentinel here.
    let compute_node_id = match default_api::list_compute_nodes(
        config,
        workflow_id,
        None, Some(1), None, None, None, None, None, ) {
        Ok(response) => {
            if let Some(nodes) = response.items {
                if let Some(node) = nodes.first() {
                    node.id.unwrap_or(0)
                } else {
                    0
                }
            } else {
                0
            }
        }
        Err(_) => 0,
    };
    // No node at all: create a synthetic "orphan-recovery" node to attach
    // the failure results to. If even that fails, fall through with 0.
    // NOTE(review): the positional args (0, 1, 1.0, 0, 1, "local") are
    // opaque here — confirm their meaning against ComputeNodeModel::new.
    let compute_node_id = if compute_node_id == 0 {
        match default_api::create_compute_node(
            config,
            models::ComputeNodeModel::new(
                workflow_id,
                "orphan-recovery".to_string(),
                0, Utc::now().to_rfc3339(),
                1, 1.0, 0, 1, "local".to_string(),
                None, ),
        ) {
            Ok(node) => node.id.unwrap_or(0),
            Err(e) => {
                warn!("Could not create recovery compute node: {}", e);
                0
            }
        }
    } else {
        compute_node_id
    };
    let mut failed_count = 0;
    let mut details = Vec::new();
    for job in &running_jobs {
        let job_id = match job.id {
            Some(id) => id,
            None => continue,
        };
        // NOTE(review): detail is recorded even when complete_job fails
        // below, so details may exceed failed_count — confirm intended.
        details.push(OrphanedJobDetail {
            job_id,
            job_name: job.name.clone(),
            reason: "No active compute nodes".to_string(),
            slurm_job_id: None,
        });
        let attempt_id = job.attempt_id.unwrap_or(1);
        let result = models::ResultModel::new(
            job_id,
            workflow_id,
            run_id,
            attempt_id,
            compute_node_id,
            ORPHANED_JOB_RETURN_CODE, 0.0, Utc::now().to_rfc3339(), models::JobStatus::Failed, );
        match default_api::complete_job(config, job_id, models::JobStatus::Failed, run_id, result) {
            Ok(_) => {
                info!(
                    " Marked orphaned job {} ({}) as failed with return code {}",
                    job_id, job.name, ORPHANED_JOB_RETURN_CODE
                );
                failed_count += 1;
            }
            Err(e) => {
                warn!(" Failed to mark job {} as failed: {}", job_id, e);
            }
        }
    }
    if failed_count > 0 {
        info!(
            "Marked {} orphaned job(s) as failed (return code {})",
            failed_count, ORPHANED_JOB_RETURN_CODE
        );
    }
    Ok((failed_count, details))
}
#[cfg(test)]
mod tests {
    use super::*;

    // Convenience constructor: a result with the given counters and no
    // detail records.
    fn result(slurm: usize, pending: usize, running: usize) -> OrphanCleanupResult {
        OrphanCleanupResult {
            slurm_jobs_failed: slurm,
            pending_allocations_cleaned: pending,
            running_jobs_failed: running,
            failed_job_details: Vec::new(),
        }
    }

    #[test]
    fn test_orphan_cleanup_result_any_cleaned() {
        // All-zero result reports nothing cleaned.
        assert!(!result(0, 0, 0).any_cleaned());
        // Any single non-zero counter makes any_cleaned() true.
        assert!(result(1, 0, 0).any_cleaned());
        assert!(result(0, 1, 0).any_cleaned());
        assert!(result(0, 0, 1).any_cleaned());
    }

    #[test]
    fn test_orphan_cleanup_result_total_jobs_failed() {
        // Pending-allocation cleanups are not jobs: 3 + 1, not 3 + 2 + 1.
        assert_eq!(result(3, 2, 1).total_jobs_failed(), 4);
    }
}