use std::collections::{HashMap, HashSet};
use log::{debug, info, warn};
use serde::Serialize;
use crate::client::apis::configuration::Configuration;
use crate::client::apis::default_api;
use crate::client::report_models::ResourceUtilizationReport;
use crate::memory_utils::memory_string_to_bytes;
use crate::models;
use crate::time_utils::duration_string_to_seconds;
pub struct ResourceCorrectionContext<'a> {
pub config: &'a Configuration,
pub workflow_id: i64,
pub diagnosis: &'a ResourceUtilizationReport,
pub all_results: &'a [models::ResultModel],
pub all_jobs: &'a [models::JobModel],
pub all_resource_requirements: &'a [models::ResourceRequirementsModel],
}
pub struct ResourceCorrectionOptions {
pub memory_multiplier: f64,
pub cpu_multiplier: f64,
pub runtime_multiplier: f64,
pub include_jobs: Vec<i64>,
pub dry_run: bool,
pub no_downsize: bool,
}
#[derive(Debug, Clone, Serialize)]
pub struct ResourceCorrectionResult {
pub resource_requirements_updated: usize,
pub jobs_analyzed: usize,
pub memory_corrections: usize,
pub runtime_corrections: usize,
pub cpu_corrections: usize,
pub downsize_memory_corrections: usize,
pub downsize_runtime_corrections: usize,
pub downsize_cpu_corrections: usize,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub adjustments: Vec<ResourceAdjustmentReport>,
}
pub(crate) struct ResourceLookupContext<'a> {
jobs: &'a [models::JobModel],
resource_requirements: &'a [models::ResourceRequirementsModel],
}
impl<'a> ResourceLookupContext<'a> {
pub(crate) fn new(
jobs: &'a [models::JobModel],
resource_requirements: &'a [models::ResourceRequirementsModel],
) -> Self {
Self {
jobs,
resource_requirements,
}
}
pub(crate) fn find_job(&self, job_id: i64) -> Option<&models::JobModel> {
self.jobs.iter().find(|j| j.id == Some(job_id))
}
pub(crate) fn find_resource_requirements(
&self,
rr_id: i64,
) -> Option<&models::ResourceRequirementsModel> {
self.resource_requirements
.iter()
.find(|r| r.id == Some(rr_id))
}
}
pub(crate) fn detect_memory_violation(
ctx: &ResourceLookupContext,
result: &models::ResultModel,
job: &models::JobModel,
) -> bool {
if let Some(peak_mem) = result.peak_memory_bytes
&& let Some(rr_id) = job.resource_requirements_id
&& let Some(rr) = ctx.find_resource_requirements(rr_id)
&& let Ok(specified_memory_bytes) = memory_string_to_bytes(&rr.memory)
{
peak_mem > specified_memory_bytes
} else {
false
}
}
pub(crate) fn detect_cpu_violation(
ctx: &ResourceLookupContext,
result: &models::ResultModel,
job: &models::JobModel,
) -> bool {
if let Some(peak_cpu) = result.peak_cpu_percent
&& let Some(rr_id) = job.resource_requirements_id
&& let Some(rr) = ctx.find_resource_requirements(rr_id)
{
let configured_cpus = rr.num_cpus as f64;
let specified_cpu_percent = configured_cpus * 100.0;
peak_cpu > specified_cpu_percent
} else {
false
}
}
pub(crate) fn detect_runtime_violation(
ctx: &ResourceLookupContext,
result: &models::ResultModel,
job: &models::JobModel,
) -> bool {
if let Some(rr_id) = job.resource_requirements_id
&& let Some(rr) = ctx.find_resource_requirements(rr_id)
&& let Ok(specified_runtime_seconds) = duration_string_to_seconds(&rr.runtime)
{
let exec_time_seconds = result.exec_time_minutes * 60.0;
let specified_runtime_seconds = specified_runtime_seconds as f64;
exec_time_seconds > specified_runtime_seconds
} else {
false
}
}
pub(crate) fn detect_timeout(result: &models::ResultModel) -> bool {
result.return_code == 152
}
#[derive(Debug, Clone)]
struct DownsizeCandidate {
rr_id: i64,
job_count: usize,
job_ids: Vec<i64>,
job_names: Vec<String>,
max_peak_memory_bytes: Option<u64>,
max_peak_cpu_percent: Option<f64>,
max_peak_runtime_minutes: Option<f64>,
all_have_memory_data: bool,
all_have_cpu_data: bool,
}
#[derive(Debug, Clone, Serialize)]
pub struct ResourceAdjustmentReport {
pub resource_requirements_id: i64,
pub direction: String,
pub job_ids: Vec<i64>,
pub job_names: Vec<String>,
pub memory_adjusted: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub original_memory: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub new_memory: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub max_peak_memory_bytes: Option<u64>,
pub runtime_adjusted: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub original_runtime: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub new_runtime: Option<String>,
#[serde(default, skip_serializing_if = "is_false")]
pub cpu_adjusted: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub original_cpus: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub new_cpus: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub max_peak_cpu_percent: Option<f64>,
}
fn is_false(b: &bool) -> bool {
!*b
}
#[derive(Debug)]
struct ResourceAdjustment {
rr_id: i64,
job_ids: Vec<i64>,
job_names: Vec<String>,
max_peak_memory_bytes: Option<u64>,
has_memory_violation_without_peak: bool,
has_timeout: bool,
current_memory: String,
current_runtime: String,
max_peak_cpu_percent: Option<f64>,
has_cpu_violation: bool,
current_cpus: i64,
max_peak_runtime_minutes: Option<f64>,
has_runtime_violation: bool,
}
pub fn format_memory_bytes_short(bytes: u64) -> String {
const GB: u64 = 1024 * 1024 * 1024;
const MB: u64 = 1024 * 1024;
const KB: u64 = 1024;
if bytes >= GB {
format!("{}g", bytes.div_ceil(GB))
} else if bytes >= MB {
format!("{}m", bytes.div_ceil(MB))
} else if bytes >= KB {
format!("{}k", bytes.div_ceil(KB))
} else {
format!("{}b", bytes)
}
}
pub fn format_duration_iso8601(secs: u64) -> String {
let hours = secs / 3600;
let mins = (secs % 3600) / 60;
if hours > 0 && mins > 0 {
format!("PT{}H{}M", hours, mins)
} else if hours > 0 {
format!("PT{}H", hours)
} else {
format!("PT{}M", mins.max(1))
}
}
fn build_downsize_candidates(
ctx: &ResourceCorrectionContext,
opts: &ResourceCorrectionOptions,
violated_rr_ids: &HashSet<i64>,
) -> HashMap<i64, DownsizeCandidate> {
let lookup = ResourceLookupContext::new(ctx.all_jobs, ctx.all_resource_requirements);
let mut jobs_per_rr: HashMap<i64, usize> = HashMap::new();
for job in ctx.all_jobs {
if let Some(rr_id) = job.resource_requirements_id {
*jobs_per_rr.entry(rr_id).or_insert(0) += 1;
}
}
let mut failed_rr_ids: HashSet<i64> = HashSet::new();
for result in ctx.all_results {
if result.return_code != 0
&& let Some(job) = lookup.find_job(result.job_id)
&& let Some(rr_id) = job.resource_requirements_id
{
failed_rr_ids.insert(rr_id);
}
}
let mut candidates: HashMap<i64, DownsizeCandidate> = HashMap::new();
for result in ctx.all_results {
if !opts.include_jobs.is_empty() && !opts.include_jobs.contains(&result.job_id) {
continue;
}
if result.return_code != 0 {
continue;
}
if let Some(job) = lookup.find_job(result.job_id)
&& let Some(rr_id) = job.resource_requirements_id
{
if violated_rr_ids.contains(&rr_id) || failed_rr_ids.contains(&rr_id) {
continue;
}
let candidate = candidates
.entry(rr_id)
.or_insert_with(|| DownsizeCandidate {
rr_id,
job_count: 0,
job_ids: Vec::new(),
job_names: Vec::new(),
max_peak_memory_bytes: None,
max_peak_cpu_percent: None,
max_peak_runtime_minutes: None,
all_have_memory_data: true,
all_have_cpu_data: true,
});
candidate.job_count += 1;
candidate.job_ids.push(result.job_id);
candidate.job_names.push(job.name.clone());
if let Some(peak_mem) = result.peak_memory_bytes {
let peak = peak_mem.max(0) as u64;
candidate.max_peak_memory_bytes = Some(
candidate
.max_peak_memory_bytes
.map_or(peak, |cur| cur.max(peak)),
);
} else {
candidate.all_have_memory_data = false;
}
if let Some(peak_cpu) = result.peak_cpu_percent {
candidate.max_peak_cpu_percent = Some(
candidate
.max_peak_cpu_percent
.map_or(peak_cpu, |cur| cur.max(peak_cpu)),
);
} else {
candidate.all_have_cpu_data = false;
}
candidate.max_peak_runtime_minutes = Some(
candidate
.max_peak_runtime_minutes
.map_or(result.exec_time_minutes, |cur| {
cur.max(result.exec_time_minutes)
}),
);
}
}
candidates.retain(|rr_id, candidate| {
let expected = jobs_per_rr.get(rr_id).copied().unwrap_or(0);
if candidate.job_count < expected {
debug!(
"RR {}: skipping downsize — only {}/{} jobs have results",
rr_id, candidate.job_count, expected
);
false
} else {
true
}
});
candidates
}
struct AdjustmentOutcome {
report: ResourceAdjustmentReport,
memory_corrections: usize,
runtime_corrections: usize,
cpu_corrections: usize,
}
fn aggregate_violations(
ctx: &ResourceCorrectionContext,
opts: &ResourceCorrectionOptions,
) -> (HashMap<i64, ResourceAdjustment>, usize) {
let mut rr_adjustments: HashMap<i64, ResourceAdjustment> = HashMap::new();
let jobs_to_analyze = if opts.include_jobs.is_empty() {
ctx.diagnosis.resource_violations.clone()
} else {
ctx.diagnosis
.resource_violations
.iter()
.filter(|j| opts.include_jobs.contains(&j.job_id))
.cloned()
.collect()
};
let jobs_analyzed = jobs_to_analyze.len();
for job_info in &jobs_to_analyze {
let job_id = job_info.job_id;
let memory_violation = job_info.memory_violation;
let likely_timeout = job_info.likely_timeout;
let likely_cpu_violation = job_info.likely_cpu_violation;
let likely_runtime_violation = job_info.likely_runtime_violation;
if !memory_violation
&& !likely_timeout
&& !likely_cpu_violation
&& !likely_runtime_violation
{
continue;
}
let job = match default_api::get_job(ctx.config, job_id) {
Ok(j) => j,
Err(e) => {
warn!("Warning: couldn't get job {}: {}", job_id, e);
continue;
}
};
let rr_id = match job.resource_requirements_id {
Some(id) => id,
None => {
warn!("Warning: job {} has no resource requirements", job_id);
continue;
}
};
let adjustment = rr_adjustments.entry(rr_id).or_insert_with(|| {
let (current_memory, current_runtime, current_cpus) =
match default_api::get_resource_requirements(ctx.config, rr_id) {
Ok(rr) => (rr.memory, rr.runtime, rr.num_cpus),
Err(e) => {
warn!(
"Warning: couldn't get resource requirements {}: {}",
rr_id, e
);
(String::new(), String::new(), 0)
}
};
ResourceAdjustment {
rr_id,
job_ids: Vec::new(),
job_names: Vec::new(),
max_peak_memory_bytes: None,
has_memory_violation_without_peak: false,
has_timeout: false,
current_memory,
current_runtime,
max_peak_cpu_percent: None,
has_cpu_violation: false,
current_cpus,
max_peak_runtime_minutes: None,
has_runtime_violation: false,
}
});
if adjustment.current_memory.is_empty() {
continue;
}
adjustment.job_ids.push(job_id);
adjustment.job_names.push(job.name.clone());
if memory_violation {
let peak_bytes = job_info
.peak_memory_bytes
.filter(|&v| v > 0)
.map(|v| v.max(0) as u64);
if let Some(peak) = peak_bytes {
adjustment.max_peak_memory_bytes = Some(
adjustment
.max_peak_memory_bytes
.map_or(peak, |current_max| current_max.max(peak)),
);
} else {
adjustment.has_memory_violation_without_peak = true;
}
}
if likely_timeout {
adjustment.has_timeout = true;
}
if likely_cpu_violation && let Some(peak_cpu) = job_info.peak_cpu_percent {
adjustment.has_cpu_violation = true;
adjustment.max_peak_cpu_percent = Some(
adjustment
.max_peak_cpu_percent
.map_or(peak_cpu, |current_max| current_max.max(peak_cpu)),
);
}
if likely_runtime_violation {
adjustment.has_runtime_violation = true;
adjustment.max_peak_runtime_minutes = Some(
adjustment
.max_peak_runtime_minutes
.map_or(job_info.exec_time_minutes, |current_max| {
current_max.max(job_info.exec_time_minutes)
}),
);
}
}
(rr_adjustments, jobs_analyzed)
}
fn apply_upscale_for_adjustment(
config: &Configuration,
opts: &ResourceCorrectionOptions,
adjustment: &ResourceAdjustment,
) -> Option<AdjustmentOutcome> {
let rr_id = adjustment.rr_id;
let mut memory_corrections = 0;
let mut runtime_corrections = 0;
let mut cpu_corrections = 0;
let mut updated = false;
let mut memory_adjusted = false;
let mut runtime_adjusted = false;
let mut cpu_adjusted = false;
let mut original_memory = None;
let mut new_memory_str = None;
let mut original_runtime = None;
let mut new_runtime_str = None;
let mut original_cpus = None;
let mut new_cpus_value = None;
let rr = match default_api::get_resource_requirements(config, rr_id) {
Ok(r) => r,
Err(e) => {
warn!(
"Warning: couldn't get resource requirements {}: {}",
rr_id, e
);
return None;
}
};
let mut new_rr = rr.clone();
if adjustment.max_peak_memory_bytes.is_some() || adjustment.has_memory_violation_without_peak {
let new_bytes = if let Some(max_peak) = adjustment.max_peak_memory_bytes {
(max_peak as f64 * opts.memory_multiplier).ceil() as u64
} else if let Ok(current_bytes) = memory_string_to_bytes(&adjustment.current_memory) {
(current_bytes as f64 * opts.memory_multiplier).ceil() as u64
} else {
warn!(
"RR {}: memory violation detected but couldn't determine new memory",
rr_id
);
return None;
};
let new_memory = format_memory_bytes_short(new_bytes);
let job_count = adjustment.job_ids.len();
if let Some(max_peak) = adjustment.max_peak_memory_bytes {
if job_count > 1 {
info!(
"{} job(s) with RR {}: memory violation, max peak usage {} -> allocating {} ({}x)",
job_count,
rr_id,
format_memory_bytes_short(max_peak),
new_memory,
opts.memory_multiplier
);
debug!(" Jobs: {:?}", adjustment.job_names);
} else if let (Some(job_id), Some(job_name)) =
(adjustment.job_ids.first(), adjustment.job_names.first())
{
info!(
"Job {} ({}): memory violation, peak usage {} -> allocating {} ({}x)",
job_id,
job_name,
format_memory_bytes_short(max_peak),
new_memory,
opts.memory_multiplier
);
}
} else {
info!(
"{} job(s) with RR {}: memory violation, increasing memory {} -> {} ({}x, no peak data)",
job_count, rr_id, adjustment.current_memory, new_memory, opts.memory_multiplier
);
}
original_memory = Some(adjustment.current_memory.clone());
new_memory_str = Some(new_memory.clone());
memory_adjusted = true;
new_rr.memory = new_memory;
updated = true;
memory_corrections += adjustment.job_ids.len();
}
if adjustment.has_timeout
&& let Ok(current_secs) = duration_string_to_seconds(&adjustment.current_runtime)
{
let new_secs = (current_secs as f64 * opts.runtime_multiplier).ceil() as u64;
let new_runtime = format_duration_iso8601(new_secs);
let job_count = adjustment.job_ids.len();
if job_count > 1 {
info!(
"{} job(s) with RR {}: Timeout detected, increasing runtime {} -> {}",
job_count, rr_id, adjustment.current_runtime, new_runtime
);
} else if let (Some(job_id), Some(job_name)) =
(adjustment.job_ids.first(), adjustment.job_names.first())
{
info!(
"Job {} ({}): Timeout detected, increasing runtime {} -> {}",
job_id, job_name, adjustment.current_runtime, new_runtime
);
}
original_runtime = Some(adjustment.current_runtime.clone());
new_runtime_str = Some(new_runtime.clone());
runtime_adjusted = true;
new_rr.runtime = new_runtime;
updated = true;
runtime_corrections += adjustment.job_ids.len();
}
if adjustment.has_runtime_violation && !adjustment.has_timeout {
if let (Some(max_peak_runtime), Ok(current_secs)) = (
adjustment.max_peak_runtime_minutes,
duration_string_to_seconds(&adjustment.current_runtime),
) {
let max_peak_secs = (max_peak_runtime * 60.0).ceil() as i64;
if max_peak_secs > current_secs {
let new_secs = (max_peak_secs as f64 * opts.runtime_multiplier).ceil() as u64;
let new_runtime = format_duration_iso8601(new_secs);
let job_count = adjustment.job_ids.len();
if job_count > 1 {
info!(
"{} job(s) with RR {}: Runtime violation detected, peak {}m -> allocating {} ({}x)",
job_count, rr_id, max_peak_runtime, new_runtime, opts.runtime_multiplier
);
} else if let (Some(job_id), Some(job_name)) =
(adjustment.job_ids.first(), adjustment.job_names.first())
{
info!(
"Job {} ({}): Runtime violation detected, peak {}m -> allocating {} ({}x)",
job_id, job_name, max_peak_runtime, new_runtime, opts.runtime_multiplier
);
}
original_runtime = Some(adjustment.current_runtime.clone());
new_runtime_str = Some(new_runtime.clone());
runtime_adjusted = true;
new_rr.runtime = new_runtime;
updated = true;
runtime_corrections += adjustment.job_ids.len();
}
}
}
if adjustment.has_cpu_violation
&& let Some(max_peak_cpu) = adjustment.max_peak_cpu_percent
{
let required_cpus = (max_peak_cpu / 100.0 * opts.cpu_multiplier).ceil() as i64;
let new_cpus = std::cmp::max(required_cpus, 1);
if new_cpus > adjustment.current_cpus {
let job_count = adjustment.job_ids.len();
if job_count > 1 {
info!(
"{} job(s) with RR {}: CPU over-utilization detected, peak {}% -> allocating {} CPUs ({:.1}x safety margin)",
job_count, rr_id, max_peak_cpu, new_cpus, opts.cpu_multiplier
);
} else if let (Some(job_id), Some(job_name)) =
(adjustment.job_ids.first(), adjustment.job_names.first())
{
info!(
"Job {} ({}): CPU over-utilization detected, peak {}% -> allocating {} CPUs ({:.1}x safety margin)",
job_id, job_name, max_peak_cpu, new_cpus, opts.cpu_multiplier
);
}
cpu_adjusted = true;
original_cpus = Some(adjustment.current_cpus);
new_cpus_value = Some(new_cpus);
new_rr.num_cpus = new_cpus;
updated = true;
cpu_corrections += adjustment.job_ids.len();
}
}
if !updated {
return None;
}
if !opts.dry_run
&& let Err(e) = default_api::update_resource_requirements(config, rr_id, new_rr)
{
warn!(
"Warning: failed to update resource requirements {}: {}",
rr_id, e
);
}
Some(AdjustmentOutcome {
report: ResourceAdjustmentReport {
resource_requirements_id: rr_id,
direction: "upscale".to_string(),
job_ids: adjustment.job_ids.clone(),
job_names: adjustment.job_names.clone(),
memory_adjusted,
original_memory,
new_memory: new_memory_str,
max_peak_memory_bytes: adjustment.max_peak_memory_bytes,
runtime_adjusted,
original_runtime,
new_runtime: new_runtime_str,
cpu_adjusted,
original_cpus,
new_cpus: new_cpus_value,
max_peak_cpu_percent: adjustment.max_peak_cpu_percent,
},
memory_corrections,
runtime_corrections,
cpu_corrections,
})
}
fn apply_downscale_for_candidate(
config: &Configuration,
opts: &ResourceCorrectionOptions,
candidate: &DownsizeCandidate,
) -> Option<AdjustmentOutcome> {
const MEMORY_THRESHOLD_BYTES: u64 = 1024 * 1024 * 1024; const CPU_THRESHOLD_PERCENT: f64 = 5.0; const RUNTIME_THRESHOLD_SECS: i64 = 30 * 60;
let rr_id = candidate.rr_id;
let rr = match default_api::get_resource_requirements(config, rr_id) {
Ok(r) => r,
Err(e) => {
warn!(
"Warning: couldn't get resource requirements {}: {}",
rr_id, e
);
return None;
}
};
let mut new_rr = rr.clone();
let mut memory_corrections = 0;
let mut runtime_corrections = 0;
let mut cpu_corrections = 0;
let mut updated = false;
let mut memory_adjusted = false;
let mut runtime_adjusted = false;
let mut cpu_adjusted = false;
let mut original_memory = None;
let mut new_memory_str = None;
let mut original_runtime = None;
let mut new_runtime_str = None;
let mut original_cpus = None;
let mut new_cpus_value = None;
if candidate.all_have_memory_data
&& let Some(max_peak) = candidate.max_peak_memory_bytes
&& let Ok(current_bytes) = memory_string_to_bytes(&rr.memory)
{
let current_bytes = current_bytes as u64;
let new_bytes = std::cmp::max(
(max_peak as f64 * opts.memory_multiplier).ceil() as u64,
1024 * 1024,
); if current_bytes > new_bytes && current_bytes - new_bytes > MEMORY_THRESHOLD_BYTES {
let new_memory = format_memory_bytes_short(new_bytes);
info!(
"RR {}: Downsizing memory {} -> {} (peak {} across {} job(s), {:.1}x margin)",
rr_id,
rr.memory,
new_memory,
format_memory_bytes_short(max_peak),
candidate.job_count,
opts.memory_multiplier,
);
original_memory = Some(rr.memory.clone());
new_memory_str = Some(new_memory.clone());
memory_adjusted = true;
new_rr.memory = new_memory;
updated = true;
memory_corrections += candidate.job_count;
}
}
if candidate.all_have_cpu_data
&& let Some(max_peak_cpu) = candidate.max_peak_cpu_percent
{
let current_cpu_percent = rr.num_cpus as f64 * 100.0;
let new_cpus = std::cmp::max(
(max_peak_cpu / 100.0 * opts.cpu_multiplier).ceil() as i64,
1,
);
if current_cpu_percent - max_peak_cpu > CPU_THRESHOLD_PERCENT && new_cpus < rr.num_cpus {
info!(
"RR {}: Downsizing CPUs {} -> {} (peak {:.1}% across {} job(s), {:.1}x margin)",
rr_id,
rr.num_cpus,
new_cpus,
max_peak_cpu,
candidate.job_count,
opts.cpu_multiplier,
);
cpu_adjusted = true;
original_cpus = Some(rr.num_cpus);
new_cpus_value = Some(new_cpus);
new_rr.num_cpus = new_cpus;
updated = true;
cpu_corrections += candidate.job_count;
}
}
if let Some(max_peak_minutes) = candidate.max_peak_runtime_minutes
&& let Ok(current_secs) = duration_string_to_seconds(&rr.runtime)
{
let new_secs = std::cmp::max(
(max_peak_minutes * 60.0 * opts.runtime_multiplier).ceil() as i64,
60, );
if current_secs > new_secs && current_secs - new_secs > RUNTIME_THRESHOLD_SECS {
let new_runtime = format_duration_iso8601(new_secs as u64);
info!(
"RR {}: Downsizing runtime {} -> {} (peak {:.1}m across {} job(s), {:.1}x margin)",
rr_id,
rr.runtime,
new_runtime,
max_peak_minutes,
candidate.job_count,
opts.runtime_multiplier,
);
original_runtime = Some(rr.runtime.clone());
new_runtime_str = Some(new_runtime.clone());
runtime_adjusted = true;
new_rr.runtime = new_runtime;
updated = true;
runtime_corrections += candidate.job_count;
}
}
if !updated {
return None;
}
if !opts.dry_run
&& let Err(e) = default_api::update_resource_requirements(config, rr_id, new_rr)
{
warn!(
"Warning: failed to update resource requirements {}: {}",
rr_id, e
);
}
Some(AdjustmentOutcome {
report: ResourceAdjustmentReport {
resource_requirements_id: rr_id,
direction: "downscale".to_string(),
job_ids: candidate.job_ids.clone(),
job_names: candidate.job_names.clone(),
memory_adjusted,
original_memory,
new_memory: new_memory_str,
max_peak_memory_bytes: candidate.max_peak_memory_bytes,
runtime_adjusted,
original_runtime,
new_runtime: new_runtime_str,
cpu_adjusted,
original_cpus,
new_cpus: new_cpus_value,
max_peak_cpu_percent: candidate.max_peak_cpu_percent,
},
memory_corrections,
runtime_corrections,
cpu_corrections,
})
}
pub fn apply_resource_corrections(
ctx: &ResourceCorrectionContext,
opts: &ResourceCorrectionOptions,
) -> Result<ResourceCorrectionResult, String> {
let (rr_adjustments, jobs_analyzed) = aggregate_violations(ctx, opts);
let mut adjustment_reports = Vec::new();
let mut memory_corrections = 0;
let mut runtime_corrections = 0;
let mut cpu_corrections = 0;
for adjustment in rr_adjustments.values() {
if let Some(outcome) = apply_upscale_for_adjustment(ctx.config, opts, adjustment) {
memory_corrections += outcome.memory_corrections;
runtime_corrections += outcome.runtime_corrections;
cpu_corrections += outcome.cpu_corrections;
adjustment_reports.push(outcome.report);
}
}
let mut downsize_memory_corrections = 0;
let mut downsize_runtime_corrections = 0;
let mut downsize_cpu_corrections = 0;
if !opts.no_downsize {
let violated_rr_ids: HashSet<i64> = {
let lookup = ResourceLookupContext::new(ctx.all_jobs, ctx.all_resource_requirements);
ctx.diagnosis
.resource_violations
.iter()
.filter_map(|v| {
lookup
.find_job(v.job_id)
.and_then(|j| j.resource_requirements_id)
})
.collect()
};
let downsize_candidates = build_downsize_candidates(ctx, opts, &violated_rr_ids);
for candidate in downsize_candidates.values() {
if rr_adjustments.contains_key(&candidate.rr_id) {
continue;
}
if let Some(outcome) = apply_downscale_for_candidate(ctx.config, opts, candidate) {
downsize_memory_corrections += outcome.memory_corrections;
downsize_runtime_corrections += outcome.runtime_corrections;
downsize_cpu_corrections += outcome.cpu_corrections;
adjustment_reports.push(outcome.report);
}
}
}
Ok(ResourceCorrectionResult {
resource_requirements_updated: adjustment_reports.len(),
jobs_analyzed,
memory_corrections,
runtime_corrections,
cpu_corrections,
downsize_memory_corrections,
downsize_runtime_corrections,
downsize_cpu_corrections,
adjustments: adjustment_reports,
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_format_memory_bytes_short_gigabytes() {
assert_eq!(
format_memory_bytes_short(8 * 1024 * 1024 * 1024),
"8g".to_string()
);
}
#[test]
fn test_format_memory_bytes_short_megabytes() {
assert_eq!(
format_memory_bytes_short(512 * 1024 * 1024),
"512m".to_string()
);
}
#[test]
fn test_format_memory_bytes_short_kilobytes() {
assert_eq!(format_memory_bytes_short(1024 * 1024), "1m".to_string());
assert_eq!(format_memory_bytes_short(512 * 1024), "512k".to_string());
}
#[test]
fn test_format_memory_bytes_short_rounds_up() {
assert_eq!(
format_memory_bytes_short(3_500_000_000),
"4g".to_string(),
"3.5GB should round up to 4g"
);
assert_eq!(
format_memory_bytes_short(1_500_000),
"2m".to_string(),
"1.5MB should round up to 2m"
);
}
#[test]
fn test_format_memory_bytes_short_bytes() {
assert_eq!(format_memory_bytes_short(512), "512b".to_string());
}
#[test]
fn test_format_duration_iso8601_hours_and_minutes() {
assert_eq!(format_duration_iso8601(7200 + 1800), "PT2H30M".to_string());
}
#[test]
fn test_format_duration_iso8601_only_hours() {
assert_eq!(format_duration_iso8601(7200), "PT2H".to_string());
}
#[test]
fn test_format_duration_iso8601_only_minutes() {
assert_eq!(format_duration_iso8601(900), "PT15M".to_string());
}
#[test]
fn test_format_duration_iso8601_less_than_minute() {
assert_eq!(format_duration_iso8601(30), "PT1M".to_string());
}
#[test]
fn test_parse_format_memory_roundtrip() {
let original = "12g";
let bytes = memory_string_to_bytes(original).unwrap() as u64;
let formatted = format_memory_bytes_short(bytes);
assert_eq!(formatted, original);
}
}