use std::collections::{HashMap, HashSet};
use serde::{Deserialize, Serialize};
pub const MERGE_THRESHOLD: i64 = 2;
use crate::client::hpc::HpcProfile;
use crate::client::workflow_graph::{SchedulerGroup, WorkflowGraph};
use crate::time_utils::duration_string_to_seconds;
use super::commands::slurm::{
GroupByStrategy, WalltimeStrategy, parse_memory_mb, secs_to_walltime,
};
use crate::client::hpc::HpcPartition;
/// Inputs for sizing Slurm allocations for one group of jobs.
///
/// The `max_*` fields are per-job requirements (the maximum across the
/// group's jobs when groups are merged); `job_count` is the total number of
/// jobs the allocations must cover.
struct AllocationParams {
    // Per-job CPU requirement; 0 is treated as invalid by calculate_allocations.
    max_cpus: u32,
    // Per-job memory requirement in MB; 0 is treated as invalid.
    max_memory_mb: u64,
    // Per-job runtime in seconds; 0 disables time-slot packing.
    max_runtime_secs: u64,
    // Per-job GPU requirement; 0 means no GPU constraint.
    max_gpus: u32,
    // Nodes each job occupies (multiplies the node total).
    nodes_per_job: u32,
    // Number of jobs to place.
    job_count: usize,
    // Walltime granted to each allocation, used to stack sequential jobs.
    allocation_walltime_secs: u64,
}
/// Computes how many allocations (nodes) are needed to run the group's jobs
/// on `partition`, or `None` when the CPU or memory requirement is zero.
///
/// Jobs are packed per node by the tightest of the CPU/memory/GPU limits,
/// then stacked in time when several job runtimes fit in one allocation
/// walltime. In single-allocation mode the result is always 1.
fn calculate_allocations(
    params: &AllocationParams,
    partition: &HpcPartition,
    single_allocation: bool,
) -> Option<i64> {
    // Zero CPUs or memory would make the per-node packing math meaningless.
    if params.max_cpus == 0 || params.max_memory_mb == 0 {
        return None;
    }
    // Per-node concurrency limit from each resource dimension.
    let by_cpu = partition.cpus_per_node / params.max_cpus;
    let by_mem = (partition.memory_mb / params.max_memory_mb) as u32;
    let by_gpu = if params.max_gpus > 0 {
        match partition.gpus_per_node {
            Some(node_gpus) => node_gpus / params.max_gpus,
            // GPU-less partition: leave the GPU dimension unconstrained.
            None => u32::MAX,
        }
    } else {
        u32::MAX
    };
    // Tightest limit wins, but always allow at least one job per node.
    let concurrent_per_node = by_cpu.min(by_mem).min(by_gpu).max(1);
    // How many job runtimes fit sequentially inside one allocation walltime.
    let time_slots = match params.max_runtime_secs {
        0 => 1,
        runtime => (params.allocation_walltime_secs / runtime).max(1),
    };
    let jobs_per_allocation = u64::from(concurrent_per_node) * time_slots;
    let allocations_needed = (params.job_count as u64).div_ceil(jobs_per_allocation);
    let total_nodes = (allocations_needed * params.nodes_per_job as u64).max(1) as i64;
    // NOTE(review): in single-allocation mode the computed node total is
    // discarded and callers size the allocation from this return value —
    // confirm that collapsing to a single node is intended.
    Some(if single_allocation { 1 } else { total_nodes })
}
/// Chooses the allocation walltime in seconds.
///
/// `MaxPartitionTime` always uses the partition's maximum. `MaxJobRuntime`
/// scales the longest job runtime by `multiplier` (rounded up), capped at the
/// partition maximum; a zero runtime falls back to the partition maximum.
fn calculate_walltime(
    max_job_runtime_secs: u64,
    partition_max_walltime_secs: u64,
    strategy: WalltimeStrategy,
    multiplier: f64,
) -> u64 {
    match strategy {
        WalltimeStrategy::MaxPartitionTime => partition_max_walltime_secs,
        // No usable runtime estimate: take the partition maximum.
        WalltimeStrategy::MaxJobRuntime if max_job_runtime_secs == 0 => {
            partition_max_walltime_secs
        }
        WalltimeStrategy::MaxJobRuntime => {
            let scaled = (max_job_runtime_secs as f64 * multiplier).ceil() as u64;
            scaled.min(partition_max_walltime_secs)
        }
    }
}
/// A fully-resolved Slurm scheduler definition produced by planning.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlannedScheduler {
    /// Unique scheduler name, derived from the RR or partition name plus a
    /// `_deferred` marker and the configured suffix (or `_scheduler`).
    pub name: String,
    /// Slurm account to charge.
    pub account: String,
    /// Explicit partition name; `None` when the partition is the default one
    /// (i.e. it does not require an explicit request).
    pub partition: Option<String>,
    /// Memory request string, e.g. `"64g"` or `"512m"`.
    pub mem: Option<String>,
    /// Walltime in Slurm `HH:MM:SS`-style format.
    pub walltime: String,
    /// Nodes per allocation.
    pub nodes: i64,
    /// GPU gres string (`"gpu:N"`) when GPUs are requested.
    pub gres: Option<String>,
    /// QOS inherited from the partition's default, if any.
    pub qos: Option<String>,
    /// Name(s) of the resource requirements this scheduler serves
    /// (comma-joined when groups are merged).
    pub resource_requirements: String,
    /// True when the group's jobs have upstream dependencies (deferred start).
    pub has_dependencies: bool,
    /// Number of jobs assigned to this scheduler.
    pub job_count: usize,
    /// Explicit job names assigned to this scheduler.
    pub job_names: Vec<String>,
    /// Job-name regex patterns assigned to this scheduler.
    pub job_name_patterns: Vec<String>,
    /// Number of allocations this scheduler should request.
    pub num_allocations: i64,
}
/// A workflow action that triggers node scheduling for a planned scheduler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlannedAction {
    /// Trigger: `"on_jobs_ready"` for deferred groups, else `"on_workflow_start"`.
    pub trigger_type: String,
    /// Name of the `PlannedScheduler` this action activates.
    pub scheduler_name: String,
    /// Jobs gating an `on_jobs_ready` trigger; `None` for workflow-start actions.
    pub job_names: Option<Vec<String>>,
    /// Job-name patterns for the trigger, when used.
    pub job_name_patterns: Option<Vec<String>>,
    /// Allocations to request when the action fires.
    pub num_allocations: i64,
    /// True when this action was generated as part of a recovery plan.
    pub is_recovery: bool,
}
/// Complete output of scheduler planning: schedulers, the actions that start
/// them, the job-to-scheduler mapping, and any non-fatal warnings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchedulerPlan {
    /// Planned schedulers, one per group.
    pub schedulers: Vec<PlannedScheduler>,
    /// Scheduling actions (only populated when action generation is enabled).
    pub actions: Vec<PlannedAction>,
    /// Map of job name -> scheduler name.
    pub job_assignments: HashMap<String, String>,
    /// Human-readable warnings for groups that could not be planned.
    pub warnings: Vec<String>,
}
impl SchedulerPlan {
pub fn new() -> Self {
Self {
schedulers: Vec::new(),
actions: Vec::new(),
job_assignments: HashMap::new(),
warnings: Vec::new(),
}
}
pub fn total_allocations(&self) -> i64 {
self.schedulers.iter().map(|s| s.num_allocations).sum()
}
}
impl Default for SchedulerPlan {
    /// Delegates to [`SchedulerPlan::new`] (an empty plan).
    fn default() -> Self {
        Self::new()
    }
}
/// Read-only view of a resource-requirements record, letting the planner run
/// against both workflow-spec and database-model representations (see the
/// impls for `ResourceRequirementsSpec` and `ResourceRequirementsModel`).
pub trait ResourceRequirements {
    /// Record name (used in warnings and scheduler names).
    fn name(&self) -> &str;
    /// Memory as a human-readable string, parsed via `parse_memory_mb`.
    fn memory(&self) -> &str;
    /// Runtime as a duration string, parsed via `duration_string_to_seconds`.
    fn runtime(&self) -> &str;
    /// CPUs per job.
    fn num_cpus(&self) -> i64;
    /// GPUs per job (0 means none).
    fn num_gpus(&self) -> i64;
    /// Nodes per job.
    fn num_nodes(&self) -> i64;
}
/// Builds a `SchedulerPlan` for `graph` using the chosen grouping strategy.
///
/// With `GroupByStrategy::Partition`, jobs are bucketed by their best-fit
/// partition; with `GroupByStrategy::ResourceRequirements`, each RR group
/// becomes its own scheduler. Groups that cannot be planned are recorded as
/// warnings instead of failing the whole plan.
#[allow(clippy::too_many_arguments)]
pub fn generate_scheduler_plan<RR: ResourceRequirements>(
    graph: &WorkflowGraph,
    resource_requirements: &HashMap<&str, &RR>,
    profile: &HpcProfile,
    account: &str,
    single_allocation: bool,
    group_by: GroupByStrategy,
    walltime_strategy: WalltimeStrategy,
    walltime_multiplier: f64,
    add_actions: bool,
    scheduler_name_suffix: Option<&str>,
    is_recovery: bool,
) -> SchedulerPlan {
    let mut plan = SchedulerPlan::new();
    let scheduler_groups = graph.scheduler_groups();
    if matches!(group_by, GroupByStrategy::Partition) {
        generate_plan_grouped_by_partition(
            &scheduler_groups,
            resource_requirements,
            profile,
            account,
            single_allocation,
            walltime_strategy,
            walltime_multiplier,
            add_actions,
            scheduler_name_suffix,
            is_recovery,
            &mut plan,
        );
        return plan;
    }
    // GroupByStrategy::ResourceRequirements: one scheduler per RR group.
    for group in &scheduler_groups {
        let (scheduler, action) = match process_scheduler_group(
            group,
            resource_requirements,
            profile,
            account,
            single_allocation,
            walltime_strategy,
            walltime_multiplier,
            add_actions,
            scheduler_name_suffix,
            is_recovery,
        ) {
            Ok(pair) => pair,
            Err(warning) => {
                plan.warnings.push(warning);
                continue;
            }
        };
        // Record the job -> scheduler mapping before moving the scheduler in.
        for job_name in &scheduler.job_names {
            plan.job_assignments
                .insert(job_name.clone(), scheduler.name.clone());
        }
        plan.schedulers.push(scheduler);
        // Option<PlannedAction> extends with zero or one element.
        plan.actions.extend(action);
    }
    plan
}
/// Builds one `PlannedScheduler` (and, when `add_actions` is set, a matching
/// `PlannedAction`) for a single scheduler group keyed by its resource
/// requirements.
///
/// Returns `Err(warning)` when the RR record is missing, its memory/runtime
/// strings fail to parse, no partition satisfies it, or its CPU/memory
/// requirement is zero; callers collect these strings as plan warnings.
#[allow(clippy::too_many_arguments)]
fn process_scheduler_group<RR: ResourceRequirements>(
    group: &SchedulerGroup,
    resource_requirements: &HashMap<&str, &RR>,
    profile: &HpcProfile,
    account: &str,
    single_allocation: bool,
    walltime_strategy: WalltimeStrategy,
    walltime_multiplier: f64,
    add_actions: bool,
    scheduler_name_suffix: Option<&str>,
    is_recovery: bool,
) -> Result<(PlannedScheduler, Option<PlannedAction>), String> {
    let rr_name = &group.resource_requirements;
    // Look up the RR definition; a missing entry becomes a warning upstream.
    let rr = resource_requirements.get(rr_name.as_str()).ok_or_else(|| {
        format!(
            "Resource requirements '{}' not found, skipping {} job(s)",
            rr_name, group.job_count
        )
    })?;
    let memory_mb = parse_memory_mb(rr.memory()).map_err(|e| {
        format!(
            "Failed to parse memory '{}' for RR '{}': {}",
            rr.memory(),
            rr.name(),
            e
        )
    })?;
    let runtime_secs = duration_string_to_seconds(rr.runtime()).map_err(|e| {
        format!(
            "Failed to parse runtime '{}' for RR '{}': {}",
            rr.runtime(),
            rr.name(),
            e
        )
    })? as u64;
    // GPUs are optional: 0 means "no GPU constraint" for partition matching.
    let gpus = if rr.num_gpus() > 0 {
        Some(rr.num_gpus() as u32)
    } else {
        None
    };
    let partition = profile
        .find_best_partition(rr.num_cpus() as u32, memory_mb, runtime_secs, gpus)
        .ok_or_else(|| {
            format!(
                "No partition found for resource requirements '{}' (CPUs: {}, Memory: {}, Runtime: {}, GPUs: {:?})",
                rr.name(),
                rr.num_cpus(),
                rr.memory(),
                rr.runtime(),
                gpus
            )
        })?;
    let walltime_secs = calculate_walltime(
        runtime_secs,
        partition.max_walltime_secs,
        walltime_strategy,
        walltime_multiplier,
    );
    let alloc_params = AllocationParams {
        max_cpus: rr.num_cpus() as u32,
        max_memory_mb: memory_mb,
        max_runtime_secs: runtime_secs,
        max_gpus: gpus.unwrap_or(0),
        nodes_per_job: rr.num_nodes() as u32,
        job_count: group.job_count,
        allocation_walltime_secs: walltime_secs,
    };
    let num_allocations = calculate_allocations(&alloc_params, partition, single_allocation)
        .ok_or_else(|| {
            format!(
                "Invalid resource requirements for '{}': CPUs or memory is zero",
                rr.name()
            )
        })?;
    // Single-allocation mode folds everything into one allocation; otherwise
    // each allocation is one node.
    let nodes_per_alloc = if single_allocation {
        num_allocations
    } else {
        1
    };
    // Deferred (dependency-gated) groups get a distinct scheduler name.
    let base_name = if group.has_dependencies {
        format!("{}_deferred", rr_name)
    } else {
        rr_name.clone()
    };
    let scheduler_name = match scheduler_name_suffix {
        Some(suffix) => format!("{}_{}", base_name, suffix),
        None => format!("{}_scheduler", base_name),
    };
    // Advertise the partition's full node memory, in GB when >= 1 GB.
    let mem_str = if partition.memory_mb >= 1024 {
        format!("{}g", partition.memory_mb / 1024)
    } else {
        format!("{}m", partition.memory_mb)
    };
    let scheduler = PlannedScheduler {
        name: scheduler_name.clone(),
        account: account.to_string(),
        // Only name the partition when Slurm requires it explicitly.
        partition: if partition.requires_explicit_request {
            Some(partition.name.clone())
        } else {
            None
        },
        mem: Some(mem_str),
        walltime: secs_to_walltime(walltime_secs),
        nodes: nodes_per_alloc,
        gres: gpus.map(|g| format!("gpu:{}", g)),
        qos: partition.default_qos.clone(),
        resource_requirements: rr_name.clone(),
        has_dependencies: group.has_dependencies,
        job_count: group.job_count,
        job_names: group.job_names.clone(),
        job_name_patterns: group.job_name_patterns.clone(),
        num_allocations,
    };
    let action = if add_actions {
        // Deferred groups wait for their upstream jobs; others start with the
        // workflow.
        let (trigger_type, job_names, job_name_patterns) = if group.has_dependencies {
            ("on_jobs_ready", Some(group.job_names.clone()), None)
        } else {
            ("on_workflow_start", None, None)
        };
        Some(PlannedAction {
            trigger_type: trigger_type.to_string(),
            scheduler_name: scheduler_name.clone(),
            job_names,
            job_name_patterns,
            num_allocations,
            is_recovery,
        })
    } else {
        None
    };
    Ok((scheduler, action))
}
/// Accumulator for jobs bucketed by (partition, has_dependencies): tracks the
/// combined job lists and the maximum resource requirement in each dimension.
struct PartitionGroup {
    // Name of the partition this bucket targets.
    partition_name: String,
    // Whether the bucket holds dependency-gated (deferred) jobs.
    has_dependencies: bool,
    // Total jobs accumulated into this bucket.
    job_count: usize,
    // All job names from the merged scheduler groups.
    job_names: Vec<String>,
    // All job-name patterns from the merged scheduler groups.
    job_name_patterns: Vec<String>,
    // Names of every RR that contributed to this bucket.
    rr_names: Vec<String>,
    // Elementwise maxima across contributing RRs.
    max_memory_mb: u64,
    max_runtime_secs: u64,
    max_cpus: i64,
    max_gpus: i64,
    max_nodes: i64,
}
/// Buckets scheduler groups by their best-fit partition (split by dependency
/// status), merges a partition's deferred and non-deferred buckets when their
/// combined allocation count is small (<= `MERGE_THRESHOLD`), then emits one
/// scheduler (and optional action) per remaining bucket into `plan`.
///
/// Unresolvable RRs, unparsable memory/runtime strings, and unmatched
/// partitions are recorded as warnings and skipped.
///
/// Fix: `calculate_allocations(&params, ...)` — the `&` had been corrupted
/// into a `¶` character (`&para` entity mangling), which does not compile.
#[allow(clippy::too_many_arguments)]
fn generate_plan_grouped_by_partition<RR: ResourceRequirements>(
    scheduler_groups: &[SchedulerGroup],
    resource_requirements: &HashMap<&str, &RR>,
    profile: &HpcProfile,
    account: &str,
    single_allocation: bool,
    walltime_strategy: WalltimeStrategy,
    walltime_multiplier: f64,
    add_actions: bool,
    scheduler_name_suffix: Option<&str>,
    is_recovery: bool,
    plan: &mut SchedulerPlan,
) {
    // Phase 1: bucket every scheduler group by (partition name, deferred?).
    let mut partition_groups: HashMap<(String, bool), PartitionGroup> = HashMap::new();
    for group in scheduler_groups {
        let rr_name = &group.resource_requirements;
        let rr = match resource_requirements.get(rr_name.as_str()) {
            Some(rr) => *rr,
            None => {
                plan.warnings.push(format!(
                    "Resource requirements '{}' not found, skipping {} job(s)",
                    rr_name, group.job_count
                ));
                continue;
            }
        };
        let memory_mb = match parse_memory_mb(rr.memory()) {
            Ok(m) => m,
            Err(e) => {
                plan.warnings.push(format!(
                    "Failed to parse memory '{}' for RR '{}': {}",
                    rr.memory(),
                    rr.name(),
                    e
                ));
                continue;
            }
        };
        let runtime_secs = match duration_string_to_seconds(rr.runtime()) {
            Ok(s) => s as u64,
            Err(e) => {
                plan.warnings.push(format!(
                    "Failed to parse runtime '{}' for RR '{}': {}",
                    rr.runtime(),
                    rr.name(),
                    e
                ));
                continue;
            }
        };
        let gpus = if rr.num_gpus() > 0 {
            Some(rr.num_gpus() as u32)
        } else {
            None
        };
        let partition = match profile.find_best_partition(
            rr.num_cpus() as u32,
            memory_mb,
            runtime_secs,
            gpus,
        ) {
            Some(p) => p,
            None => {
                plan.warnings.push(format!(
                    "No partition found for resource requirements '{}' (CPUs: {}, Memory: {}, Runtime: {}, GPUs: {:?})",
                    rr.name(),
                    rr.num_cpus(),
                    rr.memory(),
                    rr.runtime(),
                    gpus
                ));
                continue;
            }
        };
        let key = (partition.name.clone(), group.has_dependencies);
        let pg = partition_groups
            .entry(key)
            .or_insert_with(|| PartitionGroup {
                partition_name: partition.name.clone(),
                has_dependencies: group.has_dependencies,
                job_count: 0,
                job_names: Vec::new(),
                job_name_patterns: Vec::new(),
                rr_names: Vec::new(),
                max_memory_mb: 0,
                max_runtime_secs: 0,
                max_cpus: 0,
                max_gpus: 0,
                max_nodes: 0,
            });
        // Accumulate jobs and take the elementwise max of each requirement so
        // the bucket's sizing satisfies every contributing RR.
        pg.job_count += group.job_count;
        pg.job_names.extend(group.job_names.clone());
        pg.job_name_patterns.extend(group.job_name_patterns.clone());
        pg.rr_names.push(rr_name.clone());
        pg.max_memory_mb = pg.max_memory_mb.max(memory_mb);
        pg.max_runtime_secs = pg.max_runtime_secs.max(runtime_secs);
        pg.max_cpus = pg.max_cpus.max(rr.num_cpus());
        pg.max_gpus = pg.max_gpus.max(rr.num_gpus());
        pg.max_nodes = pg.max_nodes.max(rr.num_nodes());
    }
    // Helper: allocation count a bucket would need on its best partition.
    let calc_group_allocations = |pg: &PartitionGroup| -> Option<i64> {
        let gpus = if pg.max_gpus > 0 {
            Some(pg.max_gpus as u32)
        } else {
            None
        };
        let partition = profile.find_best_partition(
            pg.max_cpus as u32,
            pg.max_memory_mb,
            pg.max_runtime_secs,
            gpus,
        )?;
        let alloc_walltime_secs = calculate_walltime(
            pg.max_runtime_secs,
            partition.max_walltime_secs,
            walltime_strategy,
            walltime_multiplier,
        );
        let params = AllocationParams {
            max_cpus: pg.max_cpus as u32,
            max_memory_mb: pg.max_memory_mb,
            max_runtime_secs: pg.max_runtime_secs,
            max_gpus: pg.max_gpus as u32,
            nodes_per_job: pg.max_nodes as u32,
            job_count: pg.job_count,
            allocation_walltime_secs: alloc_walltime_secs,
        };
        // FIX: was `calculate_allocations(¶ms, ...)` — `&params` had been
        // mangled into a `¶` character, which is invalid Rust.
        calculate_allocations(&params, partition, single_allocation)
    };
    // Phase 2: when a partition has both a deferred and a non-deferred bucket
    // and their combined allocation count is small, fold the deferred bucket
    // into the non-deferred one to avoid tiny schedulers.
    let partition_names: HashSet<String> = partition_groups.keys().map(|k| k.0.clone()).collect();
    for partition_name in partition_names {
        let deferred_key = (partition_name.clone(), true);
        let non_deferred_key = (partition_name.clone(), false);
        let (deferred, non_deferred) = match (
            partition_groups.get(&deferred_key),
            partition_groups.get(&non_deferred_key),
        ) {
            (Some(d), Some(nd)) => (d, nd),
            // Nothing to merge unless both buckets exist.
            _ => continue,
        };
        // Unsizable buckets (no partition match) count as "infinite" so they
        // never trigger a merge.
        let deferred_allocs = calc_group_allocations(deferred).unwrap_or(i64::MAX);
        let non_deferred_allocs = calc_group_allocations(non_deferred).unwrap_or(i64::MAX);
        let total_allocs = deferred_allocs.saturating_add(non_deferred_allocs);
        if total_allocs <= MERGE_THRESHOLD {
            let deferred = partition_groups.remove(&deferred_key).unwrap();
            let non_deferred = partition_groups.get_mut(&non_deferred_key).unwrap();
            non_deferred.job_count += deferred.job_count;
            non_deferred.job_names.extend(deferred.job_names);
            non_deferred
                .job_name_patterns
                .extend(deferred.job_name_patterns);
            // Keep rr_names unique while preserving insertion order.
            let existing: HashSet<_> = non_deferred.rr_names.iter().cloned().collect();
            for name in deferred.rr_names {
                if !existing.contains(&name) {
                    non_deferred.rr_names.push(name);
                }
            }
            non_deferred.max_memory_mb = non_deferred.max_memory_mb.max(deferred.max_memory_mb);
            non_deferred.max_runtime_secs =
                non_deferred.max_runtime_secs.max(deferred.max_runtime_secs);
            non_deferred.max_cpus = non_deferred.max_cpus.max(deferred.max_cpus);
            non_deferred.max_gpus = non_deferred.max_gpus.max(deferred.max_gpus);
            non_deferred.max_nodes = non_deferred.max_nodes.max(deferred.max_nodes);
        }
    }
    // Phase 3: emit one scheduler (and optional action) per surviving bucket.
    for pg in partition_groups.into_values() {
        let gpus = if pg.max_gpus > 0 {
            Some(pg.max_gpus as u32)
        } else {
            None
        };
        let partition = match profile.find_best_partition(
            pg.max_cpus as u32,
            pg.max_memory_mb,
            pg.max_runtime_secs,
            gpus,
        ) {
            Some(p) => p,
            None => {
                // Each bucket matched a partition in phase 1, so this only
                // fires if merging pushed the maxima past every partition.
                plan.warnings.push(format!(
                    "No partition found for merged group '{}' (this shouldn't happen)",
                    pg.partition_name
                ));
                continue;
            }
        };
        let walltime_secs = calculate_walltime(
            pg.max_runtime_secs,
            partition.max_walltime_secs,
            walltime_strategy,
            walltime_multiplier,
        );
        let alloc_params = AllocationParams {
            max_cpus: pg.max_cpus as u32,
            max_memory_mb: pg.max_memory_mb,
            max_runtime_secs: pg.max_runtime_secs,
            max_gpus: gpus.unwrap_or(0),
            nodes_per_job: pg.max_nodes as u32,
            job_count: pg.job_count,
            allocation_walltime_secs: walltime_secs,
        };
        let num_allocations =
            match calculate_allocations(&alloc_params, partition, single_allocation) {
                Some(n) => n,
                None => {
                    plan.warnings.push(format!(
                        "Invalid resource parameters for group '{}': CPUs or memory is zero",
                        pg.partition_name
                    ));
                    continue;
                }
            };
        let nodes_per_alloc = if single_allocation {
            num_allocations
        } else {
            1
        };
        // Deferred buckets get a distinct scheduler name.
        let base_name = if pg.has_dependencies {
            format!("{}_deferred", pg.partition_name)
        } else {
            pg.partition_name.clone()
        };
        let scheduler_name = match scheduler_name_suffix {
            Some(suffix) => format!("{}_{}", base_name, suffix),
            None => format!("{}_scheduler", base_name),
        };
        // Advertise the partition's full node memory, in GB when >= 1 GB.
        let mem_str = if partition.memory_mb >= 1024 {
            format!("{}g", partition.memory_mb / 1024)
        } else {
            format!("{}m", partition.memory_mb)
        };
        let scheduler = PlannedScheduler {
            name: scheduler_name.clone(),
            account: account.to_string(),
            partition: if partition.requires_explicit_request {
                Some(partition.name.clone())
            } else {
                None
            },
            mem: Some(mem_str),
            walltime: secs_to_walltime(walltime_secs),
            nodes: nodes_per_alloc,
            gres: gpus.map(|g| format!("gpu:{}", g)),
            qos: partition.default_qos.clone(),
            resource_requirements: pg.rr_names.join(","),
            has_dependencies: pg.has_dependencies,
            job_count: pg.job_count,
            job_names: pg.job_names.clone(),
            job_name_patterns: pg.job_name_patterns.clone(),
            num_allocations,
        };
        for job_name in &scheduler.job_names {
            plan.job_assignments
                .insert(job_name.clone(), scheduler.name.clone());
        }
        plan.schedulers.push(scheduler);
        if add_actions {
            let (trigger_type, job_names, job_name_patterns) = if pg.has_dependencies {
                ("on_jobs_ready", Some(pg.job_names.clone()), None)
            } else {
                ("on_workflow_start", None, None)
            };
            plan.actions.push(PlannedAction {
                trigger_type: trigger_type.to_string(),
                scheduler_name,
                job_names,
                job_name_patterns,
                num_allocations,
                is_recovery,
            });
        }
    }
}
/// Planner view over the workflow-spec representation; pure field delegation.
impl ResourceRequirements for crate::client::workflow_spec::ResourceRequirementsSpec {
    fn name(&self) -> &str {
        &self.name
    }
    fn memory(&self) -> &str {
        &self.memory
    }
    fn runtime(&self) -> &str {
        &self.runtime
    }
    fn num_cpus(&self) -> i64 {
        self.num_cpus
    }
    fn num_gpus(&self) -> i64 {
        self.num_gpus
    }
    fn num_nodes(&self) -> i64 {
        self.num_nodes
    }
}
/// Planner view over the database-model representation; pure field delegation.
impl ResourceRequirements for crate::models::ResourceRequirementsModel {
    fn name(&self) -> &str {
        &self.name
    }
    fn memory(&self) -> &str {
        &self.memory
    }
    fn runtime(&self) -> &str {
        &self.runtime
    }
    fn num_cpus(&self) -> i64 {
        self.num_cpus
    }
    fn num_gpus(&self) -> i64 {
        self.num_gpus
    }
    fn num_nodes(&self) -> i64 {
        self.num_nodes
    }
}
use crate::client::workflow_spec::{SlurmSchedulerSpec, WorkflowActionSpec, WorkflowSpec};
/// Writes a `SchedulerPlan` back into a `WorkflowSpec`: replaces the spec's
/// Slurm schedulers, appends the plan's actions to any existing ones, and
/// points each assigned job at its scheduler.
pub fn apply_plan_to_spec(plan: &SchedulerPlan, spec: &mut WorkflowSpec) {
    // Replace (not merge) the spec's scheduler list with the planned set.
    let mut schedulers = Vec::with_capacity(plan.schedulers.len());
    for planned in &plan.schedulers {
        schedulers.push(SlurmSchedulerSpec {
            name: Some(planned.name.clone()),
            account: planned.account.clone(),
            partition: planned.partition.clone(),
            mem: planned.mem.clone(),
            walltime: planned.walltime.clone(),
            nodes: planned.nodes,
            gres: planned.gres.clone(),
            ntasks_per_node: None,
            qos: planned.qos.clone(),
            tmp: None,
            extra: None,
        });
    }
    spec.slurm_schedulers = Some(schedulers);
    // Append planned actions after any the spec already carries.
    if !plan.actions.is_empty() {
        let mut all_actions = spec.actions.take().unwrap_or_default();
        for planned in &plan.actions {
            all_actions.push(WorkflowActionSpec {
                trigger_type: planned.trigger_type.clone(),
                action_type: "schedule_nodes".to_string(),
                jobs: planned.job_names.clone(),
                job_name_regexes: planned.job_name_patterns.clone(),
                commands: None,
                scheduler: Some(planned.scheduler_name.clone()),
                scheduler_type: Some("slurm".to_string()),
                num_allocations: Some(planned.num_allocations),
                start_one_worker_per_node: None,
                max_parallel_jobs: None,
                persistent: None,
            });
        }
        spec.actions = Some(all_actions);
    }
    // Point each assigned job at its scheduler; unassigned jobs are untouched.
    for job in &mut spec.jobs {
        if let Some(scheduler_name) = plan.job_assignments.get(&job.name) {
            job.scheduler = Some(scheduler_name.clone());
        }
    }
}