mod common;
use common::{
ServerProcess, create_test_compute_node, create_test_job, create_test_workflow, start_server,
};
use rstest::rstest;
use torc::client::default_api;
use torc::client::report_models::ResourceUtilizationReport;
use torc::client::resource_correction::{
ResourceCorrectionContext, ResourceCorrectionOptions, apply_resource_corrections,
};
use torc::client::workflow_manager::WorkflowManager;
use torc::config::TorcConfig;
use torc::models::{self, JobStatus};
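
/// Creates a test workflow, initializes it through `WorkflowManager`, and
/// returns `(workflow_id, run_id)` for the tests below.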
fn create_and_initialize_workflow(config: &torc::client::Configuration, name: &str) -> (i64, i64) {
let workflow = create_test_workflow(config, name);
let workflow_id = workflow.id.unwrap();
let torc_config = TorcConfig::load().unwrap_or_default();
let manager = WorkflowManager::new(config.clone(), torc_config, workflow);
manager
.initialize(false)
.expect("Failed to initialize workflow");
let run_id = manager.get_run_id().expect("Failed to get run_id");
(workflow_id, run_id)
}
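
/// A job with a 2g memory requirement fails with exit code 137 (128 + SIGKILL,
/// the conventional OOM kill) after peaking at 3 GB; verifies both the return
/// code and the peak memory are recorded in the result.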
#[rstest]
fn test_correct_resources_memory_violation_dry_run(start_server: &ServerProcess) {
let config = &start_server.config;
let (workflow_id, run_id) = create_and_initialize_workflow(config, "test_memory_violation");
let mut job = create_test_job(config, workflow_id, "memory_heavy_job");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let mut rr = models::ResourceRequirementsModel::new(workflow_id, "small".to_string());
rr.memory = "2g".to_string();
rr.runtime = "PT1H".to_string();
rr.num_cpus = 1;
let created_rr = default_api::create_resource_requirements(config, rr)
.expect("Failed to create resource requirement");
let rr_id = created_rr.id.unwrap();
job.resource_requirements_id = Some(rr_id);
default_api::update_job(config, job.id.unwrap(), job).expect("Failed to update job");
default_api::initialize_jobs(config, workflow_id, None, None, None)
.expect("Failed to reinitialize");
let resources = models::ComputeNodesResources::new(36, 100.0, 0, 1);
let result =
default_api::claim_jobs_based_on_resources(config, workflow_id, &resources, 10, None, None)
.expect("Failed to claim jobs");
let jobs = result.jobs.expect("Should return jobs");
assert_eq!(jobs.len(), 1);
let job_id = jobs[0].id.unwrap();
default_api::manage_status_change(config, job_id, JobStatus::Running, run_id, None)
.expect("Failed to set job running");
let mut job_result = models::ResultModel::new(
job_id,
workflow_id,
run_id,
1,
compute_node_id,
        137,
        0.5,
        chrono::Utc::now().to_rfc3339(),
JobStatus::Failed,
);
job_result.peak_memory_bytes = Some(3_000_000_000);
default_api::complete_job(config, job_id, job_result.status, run_id, job_result)
.expect("Failed to complete job");
let violations = default_api::list_results(
config,
workflow_id,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
)
.expect("Failed to list results");
let items = violations.items.expect("Should have items");
assert!(!items.is_empty(), "Should have results");
let result = &items[0];
assert_eq!(result.return_code, 137, "Should have OOM return code");
assert_eq!(
result.peak_memory_bytes,
Some(3_000_000_000),
"Should have peak memory recorded"
);
}
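
/// A job with a 3-CPU requirement completes while peaking at 502% CPU;
/// verifies the peak CPU percentage is recorded alongside the success code.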
#[rstest]
fn test_correct_resources_cpu_violation_dry_run(start_server: &ServerProcess) {
let config = &start_server.config;
let (workflow_id, run_id) = create_and_initialize_workflow(config, "test_cpu_violation");
let mut job = create_test_job(config, workflow_id, "cpu_heavy_job");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let mut rr = models::ResourceRequirementsModel::new(workflow_id, "medium".to_string());
rr.memory = "4g".to_string();
rr.runtime = "PT1H".to_string();
rr.num_cpus = 3;
let created_rr = default_api::create_resource_requirements(config, rr)
.expect("Failed to create resource requirement");
let rr_id = created_rr.id.unwrap();
job.resource_requirements_id = Some(rr_id);
default_api::update_job(config, job.id.unwrap(), job).expect("Failed to update job");
default_api::initialize_jobs(config, workflow_id, None, None, None)
.expect("Failed to reinitialize");
let resources = models::ComputeNodesResources::new(36, 100.0, 0, 1);
let result =
default_api::claim_jobs_based_on_resources(config, workflow_id, &resources, 10, None, None)
.expect("Failed to claim jobs");
let jobs = result.jobs.expect("Should return jobs");
assert_eq!(jobs.len(), 1);
let job_id = jobs[0].id.unwrap();
default_api::manage_status_change(config, job_id, JobStatus::Running, run_id, None)
.expect("Failed to set job running");
let mut job_result = models::ResultModel::new(
job_id,
workflow_id,
run_id,
1,
compute_node_id,
        0,
        5.0,
chrono::Utc::now().to_rfc3339(),
JobStatus::Completed,
);
job_result.peak_cpu_percent = Some(502.0);
default_api::complete_job(config, job_id, job_result.status, run_id, job_result)
.expect("Failed to complete job");
let violations = default_api::list_results(
config,
workflow_id,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
)
.expect("Failed to list results");
let items = violations.items.expect("Should have items");
assert!(!items.is_empty(), "Should have results");
let result = &items[0];
assert_eq!(result.return_code, 0, "Should have success return code");
assert_eq!(
result.peak_cpu_percent,
Some(502.0),
"Should have peak CPU recorded"
);
}
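
/// A job with a PT30M runtime requirement runs for 45 minutes; verifies the
/// execution time is recorded so the overrun can be diagnosed.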
#[rstest]
fn test_correct_resources_runtime_violation_dry_run(start_server: &ServerProcess) {
let config = &start_server.config;
let (workflow_id, run_id) = create_and_initialize_workflow(config, "test_runtime_violation");
let mut job = create_test_job(config, workflow_id, "slow_job");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let mut rr = models::ResourceRequirementsModel::new(workflow_id, "fast".to_string());
rr.memory = "2g".to_string();
rr.runtime = "PT30M".to_string(); rr.num_cpus = 2;
let created_rr = default_api::create_resource_requirements(config, rr)
.expect("Failed to create resource requirement");
let rr_id = created_rr.id.unwrap();
job.resource_requirements_id = Some(rr_id);
default_api::update_job(config, job.id.unwrap(), job).expect("Failed to update job");
default_api::initialize_jobs(config, workflow_id, None, None, None)
.expect("Failed to reinitialize");
let resources = models::ComputeNodesResources::new(36, 100.0, 0, 1);
let result =
default_api::claim_jobs_based_on_resources(config, workflow_id, &resources, 10, None, None)
.expect("Failed to claim jobs");
let jobs = result.jobs.expect("Should return jobs");
assert_eq!(jobs.len(), 1);
let job_id = jobs[0].id.unwrap();
default_api::manage_status_change(config, job_id, JobStatus::Running, run_id, None)
.expect("Failed to set job running");
let job_result = models::ResultModel::new(
job_id,
workflow_id,
run_id,
1,
compute_node_id,
        0,
        45.0,
        chrono::Utc::now().to_rfc3339(),
JobStatus::Completed,
);
default_api::complete_job(config, job_id, job_result.status, run_id, job_result)
.expect("Failed to complete job");
let violations = default_api::list_results(
config,
workflow_id,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
)
.expect("Failed to list results");
let items = violations.items.expect("Should have items");
assert!(!items.is_empty(), "Should have results");
let result = &items[0];
assert_eq!(result.return_code, 0, "Should have success return code");
assert_eq!(
result.exec_time_minutes, 45.0,
"Should have execution time recorded"
);
}
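
/// Three jobs violate memory, CPU, and runtime requirements respectively;
/// verifies each violation is captured in its own result record.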
#[rstest]
fn test_correct_resources_multiple_violations(start_server: &ServerProcess) {
let config = &start_server.config;
let (workflow_id, run_id) = create_and_initialize_workflow(config, "test_multiple_violations");
let job1 = create_test_job(config, workflow_id, "memory_job");
let job2 = create_test_job(config, workflow_id, "cpu_job");
let job3 = create_test_job(config, workflow_id, "runtime_job");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let mut rr1 = models::ResourceRequirementsModel::new(workflow_id, "rr1".to_string());
rr1.memory = "2g".to_string();
rr1.runtime = "PT1H".to_string();
rr1.num_cpus = 1;
let mut rr2 = models::ResourceRequirementsModel::new(workflow_id, "rr2".to_string());
rr2.memory = "4g".to_string();
rr2.runtime = "PT1H".to_string();
rr2.num_cpus = 2;
let mut rr3 = models::ResourceRequirementsModel::new(workflow_id, "rr3".to_string());
rr3.memory = "2g".to_string();
rr3.runtime = "PT30M".to_string();
rr3.num_cpus = 1;
let created_rr1 =
default_api::create_resource_requirements(config, rr1).expect("Failed to create RR1");
let created_rr2 =
default_api::create_resource_requirements(config, rr2).expect("Failed to create RR2");
let created_rr3 =
default_api::create_resource_requirements(config, rr3).expect("Failed to create RR3");
let mut job1_updated = job1;
job1_updated.resource_requirements_id = Some(created_rr1.id.unwrap());
default_api::update_job(config, job1_updated.id.unwrap(), job1_updated)
.expect("Failed to update job1");
let mut job2_updated = job2;
job2_updated.resource_requirements_id = Some(created_rr2.id.unwrap());
default_api::update_job(config, job2_updated.id.unwrap(), job2_updated)
.expect("Failed to update job2");
let mut job3_updated = job3;
job3_updated.resource_requirements_id = Some(created_rr3.id.unwrap());
default_api::update_job(config, job3_updated.id.unwrap(), job3_updated)
.expect("Failed to update job3");
default_api::initialize_jobs(config, workflow_id, None, None, None)
.expect("Failed to reinitialize");
let resources = models::ComputeNodesResources::new(36, 100.0, 0, 1);
let result =
default_api::claim_jobs_based_on_resources(config, workflow_id, &resources, 10, None, None)
.expect("Failed to claim jobs");
let jobs = result.jobs.expect("Should return jobs");
assert_eq!(jobs.len(), 3, "Should have 3 jobs");
let job1_id = jobs[0].id.unwrap();
let job2_id = jobs[1].id.unwrap();
let job3_id = jobs[2].id.unwrap();
for job_id in [job1_id, job2_id, job3_id] {
default_api::manage_status_change(config, job_id, JobStatus::Running, run_id, None)
.expect("Failed to set job running");
}
let mut result1 = models::ResultModel::new(
job1_id,
workflow_id,
run_id,
1,
compute_node_id,
        137,
        0.5,
chrono::Utc::now().to_rfc3339(),
JobStatus::Failed,
);
    result1.peak_memory_bytes = Some(3_000_000_000);
    default_api::complete_job(config, job1_id, result1.status, run_id, result1)
.expect("Failed to complete job1");
let mut result2 = models::ResultModel::new(
job2_id,
workflow_id,
run_id,
1,
compute_node_id,
        0,
        5.0,
chrono::Utc::now().to_rfc3339(),
JobStatus::Completed,
);
    result2.peak_cpu_percent = Some(250.0);
    default_api::complete_job(config, job2_id, result2.status, run_id, result2)
.expect("Failed to complete job2");
let result3 = models::ResultModel::new(
job3_id,
workflow_id,
run_id,
1,
compute_node_id,
        0,
        45.0,
        chrono::Utc::now().to_rfc3339(),
JobStatus::Completed,
);
default_api::complete_job(config, job3_id, result3.status, run_id, result3)
.expect("Failed to complete job3");
let results = default_api::list_results(
config,
workflow_id,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
)
.expect("Failed to list results");
let items = results.items.expect("Should have items");
assert_eq!(items.len(), 3, "Should have 3 results");
let result1 = items
.iter()
.find(|r| r.job_id == job1_id)
.expect("Should find job1 result");
assert_eq!(
result1.return_code, 137,
"Job 1 should have OOM return code"
);
assert_eq!(
result1.peak_memory_bytes,
Some(3_000_000_000),
"Job 1 should have memory violation"
);
let result2 = items
.iter()
.find(|r| r.job_id == job2_id)
.expect("Should find job2 result");
assert_eq!(
result2.return_code, 0,
"Job 2 should have success return code"
);
assert_eq!(
result2.peak_cpu_percent,
Some(250.0),
"Job 2 should have CPU violation"
);
let result3 = items
.iter()
.find(|r| r.job_id == job3_id)
.expect("Should find job3 result");
assert_eq!(
result3.return_code, 0,
"Job 3 should have success return code"
);
assert_eq!(
result3.exec_time_minutes, 45.0,
"Job 3 should have runtime violation"
);
}
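
/// Simulates the correction step by updating a resource requirement the way a
/// correction pass would (4g memory, 2 CPUs, runtime scaled to PT54M) and
/// verifies the new values persist.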
#[rstest]
fn test_correct_resources_applies_corrections(start_server: &ServerProcess) {
let config = &start_server.config;
let (workflow_id, run_id) = create_and_initialize_workflow(config, "test_corrections_applied");
let mut job = create_test_job(config, workflow_id, "heavy_job");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let mut rr = models::ResourceRequirementsModel::new(workflow_id, "initial".to_string());
rr.memory = "2g".to_string();
rr.runtime = "PT30M".to_string();
rr.num_cpus = 1;
let created_rr = default_api::create_resource_requirements(config, rr)
.expect("Failed to create resource requirement");
let rr_id = created_rr.id.unwrap();
job.resource_requirements_id = Some(rr_id);
default_api::update_job(config, job.id.unwrap(), job).expect("Failed to update job");
default_api::initialize_jobs(config, workflow_id, None, None, None)
.expect("Failed to reinitialize");
let resources = models::ComputeNodesResources::new(36, 100.0, 0, 1);
let result =
default_api::claim_jobs_based_on_resources(config, workflow_id, &resources, 10, None, None)
.expect("Failed to claim jobs");
let jobs = result.jobs.expect("Should return jobs");
assert_eq!(jobs.len(), 1);
let job_id = jobs[0].id.unwrap();
default_api::manage_status_change(config, job_id, JobStatus::Running, run_id, None)
.expect("Failed to set job running");
let mut job_result = models::ResultModel::new(
job_id,
workflow_id,
run_id,
1,
compute_node_id,
        137,
        45.0,
        chrono::Utc::now().to_rfc3339(),
JobStatus::Failed,
);
    job_result.peak_memory_bytes = Some(3_500_000_000);
    job_result.peak_cpu_percent = Some(150.0);
default_api::complete_job(config, job_id, job_result.status, run_id, job_result)
.expect("Failed to complete job");
let rr_before = default_api::get_resource_requirements(config, rr_id)
.expect("Failed to get resource requirement");
assert_eq!(rr_before.memory, "2g");
assert_eq!(rr_before.num_cpus, 1);
assert_eq!(rr_before.runtime, "PT30M");
default_api::update_resource_requirements(config, rr_id, rr_before.clone())
.expect("Failed to update RR before applying corrections");
let mut rr_corrected = rr_before.clone();
    rr_corrected.memory = "4g".to_string();
    rr_corrected.num_cpus = 2;
    rr_corrected.runtime = "PT54M".to_string();
default_api::update_resource_requirements(config, rr_id, rr_corrected)
.expect("Failed to update resource requirement with corrections");
let rr_after = default_api::get_resource_requirements(config, rr_id)
.expect("Failed to get corrected resource requirement");
assert_eq!(rr_after.memory, "4g", "Memory should be corrected to 4g");
assert_eq!(rr_after.num_cpus, 2, "CPU count should be corrected to 2");
assert_eq!(
rr_after.runtime, "PT54M",
"Runtime should be corrected to PT54M"
);
}
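
/// Records a result that violates memory, CPU, and runtime at once; verifies
/// every metric a dry-run report needs is present.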
#[rstest]
fn test_correct_resources_dry_run_mode(start_server: &ServerProcess) {
let config = &start_server.config;
let (workflow_id, run_id) = create_and_initialize_workflow(config, "test_dry_run_mode");
let mut job = create_test_job(config, workflow_id, "test_job");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let mut rr = models::ResourceRequirementsModel::new(workflow_id, "test_rr".to_string());
rr.memory = "1g".to_string();
rr.runtime = "PT10M".to_string();
rr.num_cpus = 1;
let created_rr = default_api::create_resource_requirements(config, rr)
.expect("Failed to create resource requirement");
let rr_id = created_rr.id.unwrap();
job.resource_requirements_id = Some(rr_id);
default_api::update_job(config, job.id.unwrap(), job).expect("Failed to update job");
default_api::initialize_jobs(config, workflow_id, None, None, None)
.expect("Failed to reinitialize");
let resources = models::ComputeNodesResources::new(36, 100.0, 0, 1);
let result =
default_api::claim_jobs_based_on_resources(config, workflow_id, &resources, 10, None, None)
.expect("Failed to claim jobs");
let jobs = result.jobs.expect("Should return jobs");
assert_eq!(jobs.len(), 1);
let job_id = jobs[0].id.unwrap();
default_api::manage_status_change(config, job_id, JobStatus::Running, run_id, None)
.expect("Failed to set job running");
let mut job_result = models::ResultModel::new(
job_id,
workflow_id,
run_id,
1,
compute_node_id,
        137,
        15.0,
        chrono::Utc::now().to_rfc3339(),
JobStatus::Failed,
);
    job_result.peak_memory_bytes = Some(2_000_000_000);
    job_result.peak_cpu_percent = Some(120.0);
default_api::complete_job(config, job_id, job_result.status, run_id, job_result)
.expect("Failed to complete job");
let violations = default_api::list_results(
config,
workflow_id,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
)
.expect("Failed to list results");
let items = violations.items.expect("Should have items");
assert_eq!(items.len(), 1, "Should have 1 result with violations");
let result = &items[0];
assert_eq!(result.return_code, 137, "Should have OOM return code");
assert_eq!(
result.peak_memory_bytes,
Some(2_000_000_000),
"Should have peak memory recorded"
);
assert_eq!(
result.peak_cpu_percent,
Some(120.0),
"Should have peak CPU recorded"
);
assert_eq!(
result.exec_time_minutes, 15.0,
"Should have execution time recorded"
);
}
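
/// A job can exceed its memory requirement (3.2 GB peak vs. 2g) and still
/// complete successfully; verifies the peak memory is recorded anyway.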
#[rstest]
fn test_correct_resources_memory_violation_successful_job(start_server: &ServerProcess) {
let config = &start_server.config;
let (workflow_id, run_id) = create_and_initialize_workflow(config, "test_memory_successful");
let mut job = create_test_job(config, workflow_id, "successful_memory_job");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let mut rr = models::ResourceRequirementsModel::new(workflow_id, "memory_rr".to_string());
rr.memory = "2g".to_string();
rr.runtime = "PT1H".to_string();
rr.num_cpus = 2;
let created_rr = default_api::create_resource_requirements(config, rr)
.expect("Failed to create resource requirement");
let rr_id = created_rr.id.unwrap();
job.resource_requirements_id = Some(rr_id);
default_api::update_job(config, job.id.unwrap(), job).expect("Failed to update job");
default_api::initialize_jobs(config, workflow_id, None, None, None)
.expect("Failed to reinitialize");
let resources = models::ComputeNodesResources::new(36, 100.0, 0, 1);
let result =
default_api::claim_jobs_based_on_resources(config, workflow_id, &resources, 10, None, None)
.expect("Failed to claim jobs");
let jobs = result.jobs.expect("Should return jobs");
assert_eq!(jobs.len(), 1);
let job_id = jobs[0].id.unwrap();
default_api::manage_status_change(config, job_id, JobStatus::Running, run_id, None)
.expect("Failed to set job running");
let mut job_result = models::ResultModel::new(
job_id,
workflow_id,
run_id,
1,
compute_node_id,
        0,
        10.0,
        chrono::Utc::now().to_rfc3339(),
JobStatus::Completed,
);
job_result.peak_memory_bytes = Some(3_200_000_000);
default_api::complete_job(config, job_id, job_result.status, run_id, job_result)
.expect("Failed to complete job");
let results = default_api::list_results(
config,
workflow_id,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
)
.expect("Failed to list results");
let items = results.items.expect("Should have items");
assert_eq!(items.len(), 1, "Should have 1 result");
let result = &items[0];
assert_eq!(result.return_code, 0, "Should have success return code");
assert_eq!(
result.peak_memory_bytes,
Some(3_200_000_000),
"Should have peak memory recorded"
);
}
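
/// Builds a `ResourceUtilizationReport` with no violations, used as the
/// diagnosis input when only downsize behavior is under test.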
fn empty_diagnosis(workflow_id: i64, total_results: usize) -> ResourceUtilizationReport {
ResourceUtilizationReport {
workflow_id,
run_id: None,
total_results,
over_utilization_count: 0,
violations: Vec::new(),
resource_violations_count: 0,
resource_violations: Vec::new(),
}
}
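
/// Fetches all results for a workflow with every filter defaulted to `None`.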
fn fetch_results(
config: &torc::client::Configuration,
workflow_id: i64,
) -> Vec<models::ResultModel> {
default_api::list_results(
config,
workflow_id,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
)
.expect("Failed to list results")
.items
.unwrap_or_default()
}
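
/// Fetches all jobs for a workflow with every filter defaulted to `None`.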
fn fetch_jobs(config: &torc::client::Configuration, workflow_id: i64) -> Vec<models::JobModel> {
default_api::list_jobs(
config,
workflow_id,
None,
None,
None,
None,
None,
None,
None,
None,
None,
)
.expect("Failed to list jobs")
.items
.unwrap_or_default()
}
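
/// Fetches all resource requirements for a workflow.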
fn fetch_resource_requirements(
config: &torc::client::Configuration,
workflow_id: i64,
) -> Vec<models::ResourceRequirementsModel> {
default_api::list_resource_requirements(
config,
workflow_id,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
)
.expect("Failed to list resource requirements")
.items
.unwrap_or_default()
}
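
/// Creates a test job bound to the given resource requirement and returns its id.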
fn create_job_with_rr(
config: &torc::client::Configuration,
workflow_id: i64,
job_name: &str,
rr_id: i64,
) -> i64 {
let mut job = create_test_job(config, workflow_id, job_name);
job.resource_requirements_id = Some(rr_id);
let job_id = job.id.unwrap();
default_api::update_job(config, job_id, job).expect("Failed to update job");
job_id
}
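
/// Claims all pending jobs with a generously sized node, then drives each job
/// in `job_metrics` (`(job_id, return_code, exec_time_minutes, status,
/// peak_memory_bytes, peak_cpu_percent)`) through Running to completion.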
#[allow(clippy::type_complexity)]
fn claim_and_complete_jobs(
config: &torc::client::Configuration,
workflow_id: i64,
run_id: i64,
compute_node_id: i64,
job_metrics: &[(i64, i64, f64, JobStatus, Option<i64>, Option<f64>)],
) {
let resources = models::ComputeNodesResources::new(128, 1024.0, 0, 1);
let claim_result = default_api::claim_jobs_based_on_resources(
config,
workflow_id,
&resources,
100,
None,
None,
)
.expect("Failed to claim jobs");
let claimed = claim_result.jobs.expect("Should return jobs");
for (job_id, return_code, exec_time, status, peak_mem, peak_cpu) in job_metrics {
assert!(
claimed.iter().any(|j| j.id == Some(*job_id)),
"Job {} should have been claimed",
job_id
);
default_api::manage_status_change(config, *job_id, JobStatus::Running, run_id, None)
.expect("Failed to set job running");
let mut result = models::ResultModel::new(
*job_id,
workflow_id,
run_id,
1,
compute_node_id,
*return_code,
*exec_time,
chrono::Utc::now().to_rfc3339(),
*status,
);
result.peak_memory_bytes = *peak_mem;
result.peak_cpu_percent = *peak_cpu;
default_api::complete_job(config, *job_id, result.status, run_id, result)
.expect("Failed to complete job");
}
}
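
/// Two jobs peak at 2 GiB against an 8g requirement; with a 1.2 multiplier the
/// correction pass downsizes memory to 3g (2 GiB × 1.2, apparently rounded up
/// to whole gigabytes).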
#[rstest]
fn test_downsize_memory(start_server: &ServerProcess) {
let config = &start_server.config;
let (workflow_id, run_id) = create_and_initialize_workflow(config, "test_downsize_memory");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let mut rr = models::ResourceRequirementsModel::new(workflow_id, "big_mem".to_string());
rr.memory = "8g".to_string();
rr.runtime = "PT1H".to_string();
rr.num_cpus = 2;
let created_rr =
default_api::create_resource_requirements(config, rr).expect("Failed to create RR");
let rr_id = created_rr.id.unwrap();
let job_a_id = create_job_with_rr(config, workflow_id, "job_a", rr_id);
let job_b_id = create_job_with_rr(config, workflow_id, "job_b", rr_id);
default_api::initialize_jobs(config, workflow_id, None, None, None)
.expect("Failed to reinitialize");
claim_and_complete_jobs(
config,
workflow_id,
run_id,
compute_node_id,
&[
(
job_a_id,
0,
10.0,
JobStatus::Completed,
Some(2 * 1024 * 1024 * 1024),
Some(150.0),
),
(
job_b_id,
0,
10.0,
JobStatus::Completed,
Some(2 * 1024 * 1024 * 1024),
Some(150.0),
),
],
);
let all_results = fetch_results(config, workflow_id);
let all_jobs = fetch_jobs(config, workflow_id);
let all_rrs = fetch_resource_requirements(config, workflow_id);
let diagnosis = empty_diagnosis(workflow_id, all_results.len());
let ctx = ResourceCorrectionContext {
config,
workflow_id,
diagnosis: &diagnosis,
all_results: &all_results,
all_jobs: &all_jobs,
all_resource_requirements: &all_rrs,
};
let opts = ResourceCorrectionOptions {
memory_multiplier: 1.2,
cpu_multiplier: 1.2,
runtime_multiplier: 1.2,
include_jobs: vec![],
dry_run: false,
no_downsize: false,
};
let result = apply_resource_corrections(&ctx, &opts).expect("Failed to apply corrections");
assert_eq!(result.downsize_memory_corrections, 2);
assert!(result.resource_requirements_updated > 0);
let rr_after = default_api::get_resource_requirements(config, rr_id)
.expect("Failed to get RR after downsize");
assert_eq!(rr_after.memory, "3g", "Memory should be downsized to 3g");
}
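
/// A job peaks at 150% CPU against an 8-CPU requirement; expects a downsize to
/// 2 CPUs (150% × 1.2 = 1.8, rounded up).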
#[rstest]
fn test_downsize_cpu(start_server: &ServerProcess) {
let config = &start_server.config;
let (workflow_id, run_id) = create_and_initialize_workflow(config, "test_downsize_cpu");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let mut rr = models::ResourceRequirementsModel::new(workflow_id, "big_cpu".to_string());
rr.memory = "2g".to_string();
rr.runtime = "PT1H".to_string();
rr.num_cpus = 8;
let created_rr =
default_api::create_resource_requirements(config, rr).expect("Failed to create RR");
let rr_id = created_rr.id.unwrap();
let job_id = create_job_with_rr(config, workflow_id, "low_cpu_job", rr_id);
default_api::initialize_jobs(config, workflow_id, None, None, None)
.expect("Failed to reinitialize");
claim_and_complete_jobs(
config,
workflow_id,
run_id,
compute_node_id,
&[(
job_id,
0,
10.0,
JobStatus::Completed,
Some(1024 * 1024 * 1024),
Some(150.0),
)],
);
let all_results = fetch_results(config, workflow_id);
let all_jobs = fetch_jobs(config, workflow_id);
let all_rrs = fetch_resource_requirements(config, workflow_id);
let diagnosis = empty_diagnosis(workflow_id, all_results.len());
let ctx = ResourceCorrectionContext {
config,
workflow_id,
diagnosis: &diagnosis,
all_results: &all_results,
all_jobs: &all_jobs,
all_resource_requirements: &all_rrs,
};
let opts = ResourceCorrectionOptions {
memory_multiplier: 1.2,
cpu_multiplier: 1.2,
runtime_multiplier: 1.2,
include_jobs: vec![],
dry_run: false,
no_downsize: false,
};
let result = apply_resource_corrections(&ctx, &opts).expect("Failed to apply corrections");
assert_eq!(result.downsize_cpu_corrections, 1);
let rr_after = default_api::get_resource_requirements(config, rr_id)
.expect("Failed to get RR after downsize");
assert_eq!(rr_after.num_cpus, 2, "CPUs should be downsized to 2");
}
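
/// Three jobs each finish in 10 minutes against a PT2H requirement; expects
/// the runtime downsized to PT12M (10 min × 1.2).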
#[rstest]
fn test_downsize_runtime(start_server: &ServerProcess) {
let config = &start_server.config;
let (workflow_id, run_id) = create_and_initialize_workflow(config, "test_downsize_runtime");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let mut rr = models::ResourceRequirementsModel::new(workflow_id, "long_rt".to_string());
rr.memory = "2g".to_string();
rr.runtime = "PT2H".to_string(); rr.num_cpus = 1;
let created_rr =
default_api::create_resource_requirements(config, rr).expect("Failed to create RR");
let rr_id = created_rr.id.unwrap();
let job1_id = create_job_with_rr(config, workflow_id, "fast1", rr_id);
let job2_id = create_job_with_rr(config, workflow_id, "fast2", rr_id);
let job3_id = create_job_with_rr(config, workflow_id, "fast3", rr_id);
default_api::initialize_jobs(config, workflow_id, None, None, None)
.expect("Failed to reinitialize");
claim_and_complete_jobs(
config,
workflow_id,
run_id,
compute_node_id,
&[
(
job1_id,
0,
10.0,
JobStatus::Completed,
Some(1024 * 1024 * 1024),
Some(80.0),
),
(
job2_id,
0,
10.0,
JobStatus::Completed,
Some(1024 * 1024 * 1024),
Some(80.0),
),
(
job3_id,
0,
10.0,
JobStatus::Completed,
Some(1024 * 1024 * 1024),
Some(80.0),
),
],
);
let all_results = fetch_results(config, workflow_id);
let all_jobs = fetch_jobs(config, workflow_id);
let all_rrs = fetch_resource_requirements(config, workflow_id);
let diagnosis = empty_diagnosis(workflow_id, all_results.len());
let ctx = ResourceCorrectionContext {
config,
workflow_id,
diagnosis: &diagnosis,
all_results: &all_results,
all_jobs: &all_jobs,
all_resource_requirements: &all_rrs,
};
let opts = ResourceCorrectionOptions {
memory_multiplier: 1.2,
cpu_multiplier: 1.2,
runtime_multiplier: 1.2,
include_jobs: vec![],
dry_run: false,
no_downsize: false,
};
let result = apply_resource_corrections(&ctx, &opts).expect("Failed to apply corrections");
assert_eq!(result.downsize_runtime_corrections, 3);
let rr_after = default_api::get_resource_requirements(config, rr_id)
.expect("Failed to get RR after downsize");
assert_eq!(
rr_after.runtime, "PT12M",
"Runtime should be downsized to PT12M"
);
}
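
/// Utilization sits close enough to the requirements (2.5 GB of 3g, 198% of
/// 2 CPUs, 15 of 40 minutes) that no downsize corrections are produced.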
#[rstest]
fn test_no_downsize_below_threshold(start_server: &ServerProcess) {
let config = &start_server.config;
let (workflow_id, run_id) =
create_and_initialize_workflow(config, "test_no_downsize_threshold");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let mut rr = models::ResourceRequirementsModel::new(workflow_id, "tight".to_string());
rr.memory = "3g".to_string();
rr.runtime = "PT40M".to_string(); rr.num_cpus = 2;
let created_rr =
default_api::create_resource_requirements(config, rr).expect("Failed to create RR");
let rr_id = created_rr.id.unwrap();
let job_id = create_job_with_rr(config, workflow_id, "tight_job", rr_id);
default_api::initialize_jobs(config, workflow_id, None, None, None)
.expect("Failed to reinitialize");
claim_and_complete_jobs(
config,
workflow_id,
run_id,
compute_node_id,
&[(
job_id,
0,
15.0,
JobStatus::Completed,
Some(2_500_000_000),
Some(198.0),
)],
);
let all_results = fetch_results(config, workflow_id);
let all_jobs = fetch_jobs(config, workflow_id);
let all_rrs = fetch_resource_requirements(config, workflow_id);
let diagnosis = empty_diagnosis(workflow_id, all_results.len());
let ctx = ResourceCorrectionContext {
config,
workflow_id,
diagnosis: &diagnosis,
all_results: &all_results,
all_jobs: &all_jobs,
all_resource_requirements: &all_rrs,
};
let opts = ResourceCorrectionOptions {
memory_multiplier: 1.2,
cpu_multiplier: 1.2,
runtime_multiplier: 1.2,
include_jobs: vec![],
dry_run: false,
no_downsize: false,
};
let result = apply_resource_corrections(&ctx, &opts).expect("Failed to apply corrections");
assert_eq!(result.downsize_memory_corrections, 0);
assert_eq!(result.downsize_cpu_corrections, 0);
assert_eq!(result.downsize_runtime_corrections, 0);
assert_eq!(result.resource_requirements_updated, 0);
let rr_after = default_api::get_resource_requirements(config, rr_id).expect("Failed to get RR");
assert_eq!(rr_after.memory, "3g");
assert_eq!(rr_after.num_cpus, 2);
assert_eq!(rr_after.runtime, "PT40M");
}
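
/// Even with heavily over-provisioned requirements, `no_downsize: true` must
/// leave every value untouched.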
#[rstest]
fn test_no_downsize_flag(start_server: &ServerProcess) {
let config = &start_server.config;
let (workflow_id, run_id) = create_and_initialize_workflow(config, "test_no_downsize_flag");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let mut rr = models::ResourceRequirementsModel::new(workflow_id, "wasteful".to_string());
rr.memory = "32g".to_string();
rr.runtime = "PT8H".to_string();
rr.num_cpus = 16;
let created_rr =
default_api::create_resource_requirements(config, rr).expect("Failed to create RR");
let rr_id = created_rr.id.unwrap();
let job_id = create_job_with_rr(config, workflow_id, "tiny_job", rr_id);
default_api::initialize_jobs(config, workflow_id, None, None, None)
.expect("Failed to reinitialize");
claim_and_complete_jobs(
config,
workflow_id,
run_id,
compute_node_id,
&[(
job_id,
0,
1.0,
JobStatus::Completed,
Some(512 * 1024 * 1024),
Some(50.0),
)],
);
let all_results = fetch_results(config, workflow_id);
let all_jobs = fetch_jobs(config, workflow_id);
let all_rrs = fetch_resource_requirements(config, workflow_id);
let diagnosis = empty_diagnosis(workflow_id, all_results.len());
let ctx = ResourceCorrectionContext {
config,
workflow_id,
diagnosis: &diagnosis,
all_results: &all_results,
all_jobs: &all_jobs,
all_resource_requirements: &all_rrs,
};
let opts = ResourceCorrectionOptions {
memory_multiplier: 1.2,
cpu_multiplier: 1.2,
runtime_multiplier: 1.2,
include_jobs: vec![],
dry_run: false,
        no_downsize: true,
    };
let result = apply_resource_corrections(&ctx, &opts).expect("Failed to apply corrections");
assert_eq!(result.downsize_memory_corrections, 0);
assert_eq!(result.downsize_cpu_corrections, 0);
assert_eq!(result.downsize_runtime_corrections, 0);
assert_eq!(result.resource_requirements_updated, 0);
let rr_after = default_api::get_resource_requirements(config, rr_id).expect("Failed to get RR");
assert_eq!(rr_after.memory, "32g");
assert_eq!(rr_after.num_cpus, 16);
assert_eq!(rr_after.runtime, "PT8H");
}
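
/// One of two jobs reports no peak memory/CPU data, so memory and CPUs are
/// left alone; runtime, recorded for both jobs, is still downsized to PT6M
/// (5 min × 1.2).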
#[rstest]
fn test_no_downsize_missing_peak_data(start_server: &ServerProcess) {
let config = &start_server.config;
let (workflow_id, run_id) = create_and_initialize_workflow(config, "test_no_downsize_missing");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let mut rr = models::ResourceRequirementsModel::new(workflow_id, "partial".to_string());
rr.memory = "16g".to_string();
rr.runtime = "PT2H".to_string();
rr.num_cpus = 8;
let created_rr =
default_api::create_resource_requirements(config, rr).expect("Failed to create RR");
let rr_id = created_rr.id.unwrap();
let job1_id = create_job_with_rr(config, workflow_id, "has_data", rr_id);
let job2_id = create_job_with_rr(config, workflow_id, "no_data", rr_id);
default_api::initialize_jobs(config, workflow_id, None, None, None)
.expect("Failed to reinitialize");
claim_and_complete_jobs(
config,
workflow_id,
run_id,
compute_node_id,
&[
(
job1_id,
0,
5.0,
JobStatus::Completed,
Some(1024 * 1024 * 1024),
Some(100.0),
),
(job2_id, 0, 5.0, JobStatus::Completed, None, None),
],
);
let all_results = fetch_results(config, workflow_id);
let all_jobs = fetch_jobs(config, workflow_id);
let all_rrs = fetch_resource_requirements(config, workflow_id);
let diagnosis = empty_diagnosis(workflow_id, all_results.len());
let ctx = ResourceCorrectionContext {
config,
workflow_id,
diagnosis: &diagnosis,
all_results: &all_results,
all_jobs: &all_jobs,
all_resource_requirements: &all_rrs,
};
let opts = ResourceCorrectionOptions {
memory_multiplier: 1.2,
cpu_multiplier: 1.2,
runtime_multiplier: 1.2,
include_jobs: vec![],
dry_run: false,
no_downsize: false,
};
let result = apply_resource_corrections(&ctx, &opts).expect("Failed to apply corrections");
assert_eq!(result.downsize_memory_corrections, 0);
assert_eq!(result.downsize_cpu_corrections, 0);
assert_eq!(result.downsize_runtime_corrections, 2);
let rr_after = default_api::get_resource_requirements(config, rr_id).expect("Failed to get RR");
assert_eq!(rr_after.memory, "16g", "Memory unchanged — missing data");
assert_eq!(rr_after.num_cpus, 8, "CPUs unchanged — missing data");
assert_eq!(rr_after.runtime, "PT6M", "Runtime downsized to PT6M");
}
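
/// In dry-run mode the downsize corrections are counted and reported, but no
/// resource requirement is actually modified.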
#[rstest]
fn test_downsize_dry_run(start_server: &ServerProcess) {
let config = &start_server.config;
let (workflow_id, run_id) = create_and_initialize_workflow(config, "test_downsize_dry_run");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let mut rr = models::ResourceRequirementsModel::new(workflow_id, "big".to_string());
rr.memory = "16g".to_string();
rr.runtime = "PT4H".to_string();
rr.num_cpus = 8;
let created_rr =
default_api::create_resource_requirements(config, rr).expect("Failed to create RR");
let rr_id = created_rr.id.unwrap();
let job_id = create_job_with_rr(config, workflow_id, "small_job", rr_id);
default_api::initialize_jobs(config, workflow_id, None, None, None)
.expect("Failed to reinitialize");
claim_and_complete_jobs(
config,
workflow_id,
run_id,
compute_node_id,
&[(
job_id,
0,
5.0,
JobStatus::Completed,
Some(1024 * 1024 * 1024),
Some(100.0),
)],
);
let all_results = fetch_results(config, workflow_id);
let all_jobs = fetch_jobs(config, workflow_id);
let all_rrs = fetch_resource_requirements(config, workflow_id);
let diagnosis = empty_diagnosis(workflow_id, all_results.len());
let ctx = ResourceCorrectionContext {
config,
workflow_id,
diagnosis: &diagnosis,
all_results: &all_results,
all_jobs: &all_jobs,
all_resource_requirements: &all_rrs,
};
let opts = ResourceCorrectionOptions {
memory_multiplier: 1.2,
cpu_multiplier: 1.2,
runtime_multiplier: 1.2,
include_jobs: vec![],
        dry_run: true,
        no_downsize: false,
};
let result = apply_resource_corrections(&ctx, &opts).expect("Failed to apply corrections");
assert!(result.downsize_memory_corrections > 0);
assert!(result.downsize_cpu_corrections > 0);
assert!(result.downsize_runtime_corrections > 0);
let rr_after = default_api::get_resource_requirements(config, rr_id).expect("Failed to get RR");
assert_eq!(rr_after.memory, "16g", "Memory unchanged in dry-run");
assert_eq!(rr_after.num_cpus, 8, "CPUs unchanged in dry-run");
assert_eq!(rr_after.runtime, "PT4H", "Runtime unchanged in dry-run");
}
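
/// The adjustment report for a pure downsize labels its direction "downscale"
/// and flags memory, CPU, and runtime as adjusted.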
#[rstest]
fn test_downsize_adjustment_report_direction(start_server: &ServerProcess) {
let config = &start_server.config;
let (workflow_id, run_id) = create_and_initialize_workflow(config, "test_downsize_direction");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let mut rr = models::ResourceRequirementsModel::new(workflow_id, "report".to_string());
rr.memory = "16g".to_string();
rr.runtime = "PT4H".to_string();
rr.num_cpus = 8;
let created_rr =
default_api::create_resource_requirements(config, rr).expect("Failed to create RR");
let rr_id = created_rr.id.unwrap();
let job_id = create_job_with_rr(config, workflow_id, "tiny", rr_id);
default_api::initialize_jobs(config, workflow_id, None, None, None)
.expect("Failed to reinitialize");
claim_and_complete_jobs(
config,
workflow_id,
run_id,
compute_node_id,
&[(
job_id,
0,
5.0,
JobStatus::Completed,
Some(1024 * 1024 * 1024),
Some(100.0),
)],
);
let all_results = fetch_results(config, workflow_id);
let all_jobs = fetch_jobs(config, workflow_id);
let all_rrs = fetch_resource_requirements(config, workflow_id);
let diagnosis = empty_diagnosis(workflow_id, all_results.len());
let ctx = ResourceCorrectionContext {
config,
workflow_id,
diagnosis: &diagnosis,
all_results: &all_results,
all_jobs: &all_jobs,
all_resource_requirements: &all_rrs,
};
let opts = ResourceCorrectionOptions {
memory_multiplier: 1.2,
cpu_multiplier: 1.2,
runtime_multiplier: 1.2,
include_jobs: vec![],
        dry_run: true,
        no_downsize: false,
};
let result = apply_resource_corrections(&ctx, &opts).expect("Failed to apply corrections");
assert!(!result.adjustments.is_empty(), "Should have adjustments");
let adj = &result.adjustments[0];
assert_eq!(adj.direction, "downscale", "Direction should be downscale");
assert!(adj.memory_adjusted);
assert!(adj.cpu_adjusted);
assert!(adj.runtime_adjusted);
}