use serial_test::serial;
use std::env;
use std::fs;
use std::path::PathBuf;
use std::thread;
use std::time::Duration;
use tempfile::TempDir;
use torc::client::async_cli_command::AsyncCliCommand;
use torc::client::workflow_spec::{ExecutionMode, StdioMode};
use torc::models::{JobModel, JobStatus, ResourceRequirementsModel};
/// Builds a `Ready` job in workflow 1 whose id is `job_id` and whose name is
/// `test_job_<job_id>`, running `command`.
fn make_job(job_id: i64, command: &str) -> JobModel {
    let job_name = format!("test_job_{}", job_id);
    let mut model = JobModel::new(1, job_name, command.to_string());
    model.status = Some(JobStatus::Ready);
    model.id = Some(job_id);
    model
}
/// Builds a CPU-only resource requirement (zero GPUs); every other field is
/// filled in by `make_rr_with_gpus`.
fn make_rr(name: &str, num_cpus: i64, memory: &str, num_nodes: i64) -> ResourceRequirementsModel {
    let no_gpus = 0;
    make_rr_with_gpus(name, num_cpus, memory, num_nodes, no_gpus)
}
/// Builds a resource requirement with the given name, CPU/GPU/node counts and
/// memory string.
///
/// The id and workflow id are both fixed at 1 and the runtime is fixed at
/// `PT30M`.
fn make_rr_with_gpus(
    name: &str,
    num_cpus: i64,
    memory: &str,
    num_nodes: i64,
    num_gpus: i64,
) -> ResourceRequirementsModel {
    let name = name.to_string();
    let memory = memory.to_string();
    let runtime = String::from("PT30M");
    ResourceRequirementsModel {
        id: Some(1),
        workflow_id: 1,
        name,
        memory,
        runtime,
        num_cpus,
        num_gpus,
        num_nodes,
    }
}
/// Absolute path of the fake `srun` script that logs its arguments.
///
/// Resolves the path against the crate root when `CARGO_MANIFEST_DIR` was set
/// at compile time (always true under `cargo test`). This keeps the lookup
/// independent of the process working directory, for example when the test
/// binary is run directly from elsewhere. It falls back to the current
/// directory otherwise.
///
/// # Panics
/// Only in the fallback path, if the current directory cannot be determined.
fn fake_srun_path() -> PathBuf {
    let base = match option_env!("CARGO_MANIFEST_DIR") {
        Some(manifest_dir) => PathBuf::from(manifest_dir),
        None => env::current_dir().expect("Failed to get current directory"),
    };
    base.join("tests/scripts/fake_srun_log_args.sh")
}
/// Fakes a Slurm allocation and points torc at the logging fake `srun`.
///
/// Sets three environment variables:
/// - `SLURM_JOB_ID=99999`
/// - `TORC_FAKE_SRUN` to the fake script's path
/// - `TORC_SRUN_ARGS_LOG` to `srun_args.log` inside `temp_dir`
///
/// Returns the args-log path. The capture helpers treat a missing file as
/// "srun was not invoked". Pair every call with `cleanup_srun_env()`.
fn setup_srun_env(temp_dir: &TempDir) -> PathBuf {
    let log_path = temp_dir.path().join("srun_args.log");
    let srun_script = fake_srun_path().to_string_lossy().to_string();
    let log_value = log_path.to_string_lossy().to_string();
    // SAFETY: `set_var` is unsafe because another thread may read the
    // environment concurrently. Every caller is a `#[serial(srun)]` test, so srun
    // tests never overlap each other. NOTE(review): that attribute does not
    // serialize against unrelated non-serial tests in this binary.
    unsafe {
        env::set_var("SLURM_JOB_ID", "99999");
        env::set_var("TORC_FAKE_SRUN", srun_script);
        env::set_var("TORC_SRUN_ARGS_LOG", log_value);
    }
    log_path
}
/// Removes every environment variable that `setup_srun_env` sets.
fn cleanup_srun_env() {
    for key in ["SLURM_JOB_ID", "TORC_FAKE_SRUN", "TORC_SRUN_ARGS_LOG"] {
        // SAFETY: same contract as `setup_srun_env`. This is only called from
        // `#[serial(srun)]` tests, so no other srun test touches the environment
        // concurrently.
        unsafe {
            env::remove_var(key);
        }
    }
}
/// SIGKILL headroom (seconds) that `run_and_capture_srun_args` forwards to
/// `AsyncCliCommand::start`.
const DEFAULT_SIGKILL_HEADROOM_SECONDS: i64 = 60;

/// Runs `echo hello` through `AsyncCliCommand::start` and returns what the fake
/// `srun` logged, using `DEFAULT_SIGKILL_HEADROOM_SECONDS`.
///
/// Returns `None` when the args log was never written, i.e. `srun` was not
/// invoked.
///
/// # Panics
/// If the command fails to start, or if the log exists but cannot be read.
// Positional parameters mirror `AsyncCliCommand::start`.
#[allow(clippy::too_many_arguments)]
fn run_and_capture_srun_args(
    temp_dir: &TempDir,
    args_log: &std::path::Path,
    rr: Option<&ResourceRequirementsModel>,
    limit_resources: bool,
    execution_mode: ExecutionMode,
    enable_cpu_bind: bool,
    end_time: Option<chrono::DateTime<chrono::Utc>>,
    srun_termination_signal: Option<&str>,
) -> Option<String> {
    run_and_capture_srun_args_with_headroom(
        temp_dir,
        args_log,
        rr,
        limit_resources,
        execution_mode,
        enable_cpu_bind,
        end_time,
        srun_termination_signal,
        DEFAULT_SIGKILL_HEADROOM_SECONDS,
    )
}

/// Same as `run_and_capture_srun_args`, but with an explicit SIGKILL headroom.
///
/// Starts job 1 (`echo hello`) with workflow/run/attempt ids of 1 against the
/// localhost service URL. It then polls `check_status` every 100 ms, for up to
/// about 10 s, until the command stops running, and finally reads the args log.
///
/// # Panics
/// If `start` returns an error, or if the args log exists but cannot be read.
// Positional parameters mirror `AsyncCliCommand::start`.
#[allow(clippy::too_many_arguments)]
fn run_and_capture_srun_args_with_headroom(
    temp_dir: &TempDir,
    args_log: &std::path::Path,
    rr: Option<&ResourceRequirementsModel>,
    limit_resources: bool,
    execution_mode: ExecutionMode,
    enable_cpu_bind: bool,
    end_time: Option<chrono::DateTime<chrono::Utc>>,
    srun_termination_signal: Option<&str>,
    sigkill_headroom_seconds: i64,
) -> Option<String> {
    const POLL_ATTEMPTS: u32 = 100;
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    let job = make_job(1, "echo hello");
    let mut cmd = AsyncCliCommand::new(job);
    let result = cmd.start(
        temp_dir.path(),
        1, // rendered as "wf1" in the step name
        1, // rendered as "r1"
        1, // rendered as "a1"
        None,
        "http://localhost:8080/torc-service/v1",
        rr,
        None,
        limit_resources,
        execution_mode,
        enable_cpu_bind,
        end_time,
        srun_termination_signal,
        sigkill_headroom_seconds,
        None,
        &StdioMode::Separate,
    );
    assert!(
        result.is_ok(),
        "Failed to start command: {:?}",
        result.err()
    );

    // Wait (bounded) for the child to finish so the args log is complete.
    for _ in 0..POLL_ATTEMPTS {
        let _ = cmd.check_status();
        if !cmd.is_running {
            break;
        }
        thread::sleep(POLL_INTERVAL);
    }

    // Read directly instead of `exists()` + read, which could race.
    match fs::read_to_string(args_log) {
        Ok(contents) => Some(contents),
        // No log file means the fake srun was never invoked.
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => None,
        Err(err) => panic!("Failed to read srun args log: {}", err),
    }
}
/// A single-node job with CPU and memory limits gets the full default flag set.
#[test]
#[serial(srun)]
fn test_srun_default_single_node() {
    let temp_dir = TempDir::new().unwrap();
    let args_log = setup_srun_env(&temp_dir);
    let rr = make_rr("compute", 4, "8g", 1);
    let captured = run_and_capture_srun_args(
        &temp_dir,
        &args_log,
        Some(&rr),
        true,                 // limit_resources
        ExecutionMode::Slurm, // execution_mode
        false,                // enable_cpu_bind
        None,                 // end_time
        None,                 // srun_termination_signal
    );
    // Restore the environment before anything below can panic, so the fake
    // srun variables never leak into later tests.
    cleanup_srun_env();
    let args = captured.expect("srun should have been invoked");
    assert!(args.contains("--jobid=99999"), "Missing --jobid: {}", args);
    assert!(args.contains("--ntasks=1"), "Missing --ntasks=1: {}", args);
    assert!(
        args.contains("--cpu-bind=none"),
        "Missing --cpu-bind=none: {}",
        args
    );
    assert!(args.contains("--exact"), "Missing --exact: {}", args);
    assert!(args.contains("--nodes=1"), "Missing --nodes=1: {}", args);
    assert!(
        args.contains("--cpus-per-task=4"),
        "Missing --cpus-per-task=4: {}",
        args
    );
    assert!(
        args.contains("--mem=8192M"),
        "Missing --mem=8192M: {}",
        args
    );
    assert!(!args.contains("--signal"), "Unexpected --signal: {}", args);
    assert!(!args.contains("--time="), "Unexpected --time: {}", args);
    assert!(args.contains("bash -c"), "Missing 'bash -c': {}", args);
}
/// With `enable_cpu_bind = true`, no `--cpu-bind` flag is emitted.
#[test]
#[serial(srun)]
fn test_srun_cpu_bind_enabled() {
    let temp_dir = TempDir::new().unwrap();
    let args_log = setup_srun_env(&temp_dir);
    let rr = make_rr("compute", 2, "4g", 1);
    let captured = run_and_capture_srun_args(
        &temp_dir,
        &args_log,
        Some(&rr),
        true,                 // limit_resources
        ExecutionMode::Slurm, // execution_mode
        true,                 // enable_cpu_bind
        None,                 // end_time
        None,                 // srun_termination_signal
    );
    // Clean up before any panic so env vars do not leak into later tests.
    cleanup_srun_env();
    let args = captured.expect("srun should have been invoked");
    assert!(
        !args.contains("--cpu-bind"),
        "Unexpected --cpu-bind with enable_cpu_bind=true: {}",
        args
    );
    assert!(args.contains("--exact"), "Missing --exact: {}", args);
    assert!(
        args.contains("--cpus-per-task=2"),
        "Missing --cpus-per-task=2: {}",
        args
    );
}
/// A configured termination signal is forwarded verbatim as `--signal=`.
#[test]
#[serial(srun)]
fn test_srun_termination_signal() {
    let temp_dir = TempDir::new().unwrap();
    let args_log = setup_srun_env(&temp_dir);
    let rr = make_rr("compute", 2, "4g", 1);
    let captured = run_and_capture_srun_args(
        &temp_dir,
        &args_log,
        Some(&rr),
        true,                 // limit_resources
        ExecutionMode::Slurm, // execution_mode
        false,                // enable_cpu_bind
        None,                 // end_time
        Some("TERM@120"),     // srun_termination_signal
    );
    // Clean up before any panic so env vars do not leak into later tests.
    cleanup_srun_env();
    let args = captured.expect("srun should have been invoked");
    assert!(
        args.contains("--signal=TERM@120"),
        "Missing --signal=TERM@120: {}",
        args
    );
}
/// A non-TERM signal spec (`USR1@60`) is forwarded unchanged.
#[test]
#[serial(srun)]
fn test_srun_termination_signal_usr1() {
    let temp_dir = TempDir::new().unwrap();
    let args_log = setup_srun_env(&temp_dir);
    let rr = make_rr("compute", 2, "4g", 1);
    let captured = run_and_capture_srun_args(
        &temp_dir,
        &args_log,
        Some(&rr),
        true,                 // limit_resources
        ExecutionMode::Slurm, // execution_mode
        false,                // enable_cpu_bind
        None,                 // end_time
        Some("USR1@60"),      // srun_termination_signal
    );
    // Clean up before any panic so env vars do not leak into later tests.
    cleanup_srun_env();
    let args = captured.expect("srun should have been invoked");
    assert!(
        args.contains("--signal=USR1@60"),
        "Missing --signal=USR1@60: {}",
        args
    );
}
/// A multi-node requirement is reflected in `--nodes`, `--cpus-per-task` and `--mem`.
#[test]
#[serial(srun)]
fn test_srun_multi_node_step() {
    let temp_dir = TempDir::new().unwrap();
    let args_log = setup_srun_env(&temp_dir);
    let rr = make_rr("mpi_compute", 8, "16g", 4);
    let captured = run_and_capture_srun_args(
        &temp_dir,
        &args_log,
        Some(&rr),
        true,                 // limit_resources
        ExecutionMode::Slurm, // execution_mode
        false,                // enable_cpu_bind
        None,                 // end_time
        None,                 // srun_termination_signal
    );
    // Clean up before any panic so env vars do not leak into later tests.
    cleanup_srun_env();
    let args = captured.expect("srun should have been invoked");
    assert!(
        args.contains("--nodes=4"),
        "Missing --nodes=4 for num_nodes=4: {}",
        args
    );
    assert!(
        args.contains("--cpus-per-task=8"),
        "Missing --cpus-per-task=8: {}",
        args
    );
    assert!(
        args.contains("--mem=16384M"),
        "Missing --mem=16384M: {}",
        args
    );
}
/// An allocation end time about 30 minutes away yields a `--time` of roughly 30.
#[test]
#[serial(srun)]
fn test_srun_with_end_time() {
    let temp_dir = TempDir::new().unwrap();
    let args_log = setup_srun_env(&temp_dir);
    let rr = make_rr("compute", 2, "4g", 1);
    let end_time = chrono::Utc::now() + chrono::Duration::minutes(30);
    let captured = run_and_capture_srun_args(
        &temp_dir,
        &args_log,
        Some(&rr),
        true,                 // limit_resources
        ExecutionMode::Slurm, // execution_mode
        false,                // enable_cpu_bind
        Some(end_time),       // end_time
        None,                 // srun_termination_signal
    );
    // Clean up before any panic so env vars do not leak into later tests.
    cleanup_srun_env();
    let args = captured.expect("srun should have been invoked");
    // Accept 28-30 to tolerate elapsed time, rounding, and any headroom the
    // launcher may subtract.
    assert!(
        args.contains("--time=30") || args.contains("--time=29") || args.contains("--time=28"),
        "Missing --time=28-30 for 30-minute end_time: {}",
        args
    );
}
/// With only ~10 s left in the allocation, `start` must refuse to launch the step.
#[test]
#[serial(srun)]
fn test_srun_with_end_time_insufficient_time_rejected() {
    let temp_dir = TempDir::new().unwrap();
    let _args_log = setup_srun_env(&temp_dir);
    let rr = make_rr("compute", 2, "4g", 1);
    // Presumably rejected because 10 s is below the 60 s SIGKILL headroom
    // passed below. TODO confirm against the launcher.
    let end_time = chrono::Utc::now() + chrono::Duration::seconds(10);
    let mut cmd = AsyncCliCommand::new(make_job(1, "echo hello"));
    let outcome = cmd.start(
        temp_dir.path(),
        1,
        1,
        1,
        None,
        "http://localhost:8080/torc-service/v1",
        Some(&rr),
        None,
        true,
        ExecutionMode::Slurm,
        false,
        Some(end_time),
        None,
        60,
        None,
        &StdioMode::Separate,
    );
    cleanup_srun_env();

    let err_msg = match outcome {
        Err(err) => err.to_string(),
        Ok(_) => panic!("Should refuse to launch srun step when insufficient time remains"),
    };
    assert!(
        err_msg.contains("Refusing to launch"),
        "Error should mention refusing to launch: {}",
        err_msg
    );
}
/// In `ExecutionMode::Direct` the command runs without srun, so no args log is written.
#[test]
#[serial(srun)]
fn test_srun_use_srun_false() {
    let temp_dir = TempDir::new().unwrap();
    let args_log = setup_srun_env(&temp_dir);
    let rr = make_rr("compute", 4, "8g", 1);
    let limit_resources = true;
    let enable_cpu_bind = false;
    let logged = run_and_capture_srun_args(
        &temp_dir,
        &args_log,
        Some(&rr),
        limit_resources,
        ExecutionMode::Direct,
        enable_cpu_bind,
        None, // end_time
        None, // srun_termination_signal
    );
    cleanup_srun_env();
    assert!(
        logged.is_none(),
        "srun should not have been invoked when use_srun=false, but got: {:?}",
        logged
    );
}
/// A requirement named `default` emits no CPU or memory limits but keeps
/// `--nodes` and `--exact`.
#[test]
#[serial(srun)]
fn test_srun_default_resource_requirements_name() {
    let temp_dir = TempDir::new().unwrap();
    let args_log = setup_srun_env(&temp_dir);
    let rr = make_rr("default", 4, "8g", 1);
    let captured = run_and_capture_srun_args(
        &temp_dir,
        &args_log,
        Some(&rr),
        true,                 // limit_resources
        ExecutionMode::Slurm, // execution_mode
        false,                // enable_cpu_bind
        None,                 // end_time
        None,                 // srun_termination_signal
    );
    // Clean up before any panic so env vars do not leak into later tests.
    cleanup_srun_env();
    let args = captured.expect("srun should have been invoked");
    assert!(
        !args.contains("--cpus-per-task"),
        "Unexpected --cpus-per-task for 'default' RR name: {}",
        args
    );
    assert!(
        !args.contains("--mem="),
        "Unexpected --mem for 'default' RR name: {}",
        args
    );
    assert!(args.contains("--nodes=1"), "Missing --nodes=1: {}", args);
    assert!(args.contains("--exact"), "Missing --exact: {}", args);
}
/// Without resource requirements, only the base step flags are emitted.
#[test]
#[serial(srun)]
fn test_srun_no_resource_requirements() {
    let temp_dir = TempDir::new().unwrap();
    let args_log = setup_srun_env(&temp_dir);
    let captured = run_and_capture_srun_args(
        &temp_dir,
        &args_log,
        None,                 // rr
        true,                 // limit_resources
        ExecutionMode::Slurm, // execution_mode
        false,                // enable_cpu_bind
        None,                 // end_time
        None,                 // srun_termination_signal
    );
    // Clean up before any panic so env vars do not leak into later tests.
    cleanup_srun_env();
    let args = captured.expect("srun should have been invoked");
    assert!(
        !args.contains("--nodes="),
        "Unexpected --nodes without RR: {}",
        args
    );
    assert!(
        !args.contains("--cpus-per-task"),
        "Unexpected --cpus-per-task without RR: {}",
        args
    );
    assert!(
        !args.contains("--mem="),
        "Unexpected --mem without RR: {}",
        args
    );
    assert!(args.contains("--jobid=99999"), "Missing --jobid: {}", args);
    assert!(args.contains("--ntasks=1"), "Missing --ntasks=1: {}", args);
    assert!(args.contains("--exact"), "Missing --exact: {}", args);
}
/// With every option enabled, all corresponding flags appear together.
#[test]
#[serial(srun)]
fn test_srun_all_options_set() {
    let temp_dir = TempDir::new().unwrap();
    let args_log = setup_srun_env(&temp_dir);
    let rr = make_rr("gpu_compute", 16, "32g", 2);
    let end_time = chrono::Utc::now() + chrono::Duration::hours(2);
    let captured = run_and_capture_srun_args(
        &temp_dir,
        &args_log,
        Some(&rr),
        true,                 // limit_resources
        ExecutionMode::Slurm, // execution_mode
        true,                 // enable_cpu_bind
        Some(end_time),       // end_time
        Some("USR1@60"),      // srun_termination_signal
    );
    // Clean up before any panic so env vars do not leak into later tests.
    cleanup_srun_env();
    let args = captured.expect("srun should have been invoked");
    assert!(args.contains("--jobid=99999"), "Missing --jobid: {}", args);
    assert!(args.contains("--ntasks=1"), "Missing --ntasks=1: {}", args);
    assert!(args.contains("--exact"), "Missing --exact: {}", args);
    assert!(args.contains("--nodes=2"), "Missing --nodes=2: {}", args);
    assert!(
        args.contains("--cpus-per-task=16"),
        "Missing --cpus-per-task=16: {}",
        args
    );
    assert!(
        args.contains("--mem=32768M"),
        "Missing --mem=32768M: {}",
        args
    );
    assert!(
        args.contains("--signal=USR1@60"),
        "Missing --signal=USR1@60: {}",
        args
    );
    assert!(
        args.contains("--time="),
        "Missing --time for end_time: {}",
        args
    );
    assert!(
        !args.contains("--cpu-bind"),
        "Unexpected --cpu-bind with enable_cpu_bind=true: {}",
        args
    );
}
/// The srun step name encodes the three leading `start` ids and the job id as
/// `wf<id>_j<job>_r<id>_a<id>`.
#[test]
#[serial(srun)]
fn test_srun_step_name_format() {
    let temp_dir = TempDir::new().unwrap();
    let args_log = setup_srun_env(&temp_dir);
    let rr = make_rr("compute", 2, "4g", 1);
    let job = make_job(42, "echo hello");
    let mut cmd = AsyncCliCommand::new(job);
    let result = cmd.start(
        temp_dir.path(),
        100, // rendered as "wf100"
        3,   // rendered as "r3"
        5,   // rendered as "a5"
        None,
        "http://localhost:8080/torc-service/v1",
        Some(&rr),
        None,
        true,
        ExecutionMode::Slurm,
        false,
        None,
        None,
        60,
        None,
        &StdioMode::Separate,
    );
    // Poll (bounded at ~10 s) until the fake srun finishes, instead of a fixed
    // sleep that may race the log write.
    let args = if result.is_ok() {
        for _ in 0..100 {
            let _ = cmd.check_status();
            if !cmd.is_running {
                break;
            }
            thread::sleep(Duration::from_millis(100));
        }
        fs::read_to_string(&args_log).ok()
    } else {
        None
    };
    // Clean up before any assertion so env vars never leak into later tests.
    cleanup_srun_env();
    assert!(
        result.is_ok(),
        "Failed to start command: {:?}",
        result.err()
    );
    let args = args.expect("Failed to read args log");
    assert!(
        args.contains("--job-name=wf100_j42_r3_a5"),
        "Missing expected step name --job-name=wf100_j42_r3_a5: {}",
        args
    );
}
/// Memory below one megabyte (`100k`) is rounded up to `--mem=1M`.
#[test]
#[serial(srun)]
fn test_srun_small_memory_rounds_up_to_1mb() {
    let temp_dir = TempDir::new().unwrap();
    let args_log = setup_srun_env(&temp_dir);
    let rr = make_rr("tiny", 1, "100k", 1);
    let captured = run_and_capture_srun_args(
        &temp_dir,
        &args_log,
        Some(&rr),
        true,                 // limit_resources
        ExecutionMode::Slurm, // execution_mode
        false,                // enable_cpu_bind
        None,                 // end_time
        None,                 // srun_termination_signal
    );
    // Clean up before any panic so env vars do not leak into later tests.
    cleanup_srun_env();
    let args = captured.expect("srun should have been invoked");
    assert!(
        args.contains("--mem=1M"),
        "Sub-MB memory should round up to --mem=1M: {}",
        args
    );
    assert!(
        args.contains("--cpus-per-task=1"),
        "Missing --cpus-per-task=1: {}",
        args
    );
}
/// A zero memory requirement (`0m`) omits `--mem` entirely.
#[test]
#[serial(srun)]
fn test_srun_zero_memory_omits_mem() {
    let temp_dir = TempDir::new().unwrap();
    let args_log = setup_srun_env(&temp_dir);
    let rr = make_rr("zero_mem", 1, "0m", 1);
    let captured = run_and_capture_srun_args(
        &temp_dir,
        &args_log,
        Some(&rr),
        true,                 // limit_resources
        ExecutionMode::Slurm, // execution_mode
        false,                // enable_cpu_bind
        None,                 // end_time
        None,                 // srun_termination_signal
    );
    // Clean up before any panic so env vars do not leak into later tests.
    cleanup_srun_env();
    let args = captured.expect("srun should have been invoked");
    assert!(
        !args.contains("--mem="),
        "Should omit --mem for zero memory: {}",
        args
    );
    assert!(
        args.contains("--cpus-per-task=1"),
        "Missing --cpus-per-task=1: {}",
        args
    );
}
/// A GPU requirement is forwarded as `--gpus=<n>` alongside CPU and memory limits.
#[test]
#[serial(srun)]
fn test_srun_with_gpus() {
    let temp_dir = TempDir::new().unwrap();
    let args_log = setup_srun_env(&temp_dir);
    let rr = make_rr_with_gpus("gpu_compute", 8, "32g", 1, 2);
    let captured = run_and_capture_srun_args(
        &temp_dir,
        &args_log,
        Some(&rr),
        true,                 // limit_resources
        ExecutionMode::Slurm, // execution_mode
        false,                // enable_cpu_bind
        None,                 // end_time
        None,                 // srun_termination_signal
    );
    // Clean up before any panic so env vars do not leak into later tests.
    cleanup_srun_env();
    let args = captured.expect("srun should have been invoked");
    assert!(
        args.contains("--gpus=2"),
        "Missing --gpus=2 for job with 2 GPUs: {}",
        args
    );
    assert!(
        args.contains("--cpus-per-task=8"),
        "Missing --cpus-per-task=8: {}",
        args
    );
    assert!(
        args.contains("--mem=32768M"),
        "Missing --mem=32768M: {}",
        args
    );
}
/// A requirement with zero GPUs emits no `--gpus` flag at all.
#[test]
#[serial(srun)]
fn test_srun_zero_gpus_omits_flag() {
    let temp_dir = TempDir::new().unwrap();
    let args_log = setup_srun_env(&temp_dir);
    let rr = make_rr_with_gpus("cpu_only", 4, "8g", 1, 0);
    let captured = run_and_capture_srun_args(
        &temp_dir,
        &args_log,
        Some(&rr),
        true,                 // limit_resources
        ExecutionMode::Slurm, // execution_mode
        false,                // enable_cpu_bind
        None,                 // end_time
        None,                 // srun_termination_signal
    );
    // Clean up before any panic so env vars do not leak into later tests.
    cleanup_srun_env();
    let args = captured.expect("srun should have been invoked");
    assert!(
        !args.contains("--gpus"),
        "Unexpected --gpus flag for job with 0 GPUs: {}",
        args
    );
    assert!(
        args.contains("--cpus-per-task=4"),
        "Missing --cpus-per-task=4: {}",
        args
    );
}