/// A single job to be packed onto a node, described by a label and a predicted peak size.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PackJob {
/// Name identifying the job; it is copied into the node assignment the job lands on
/// and quoted in the `NoFit` reason when the job is too large for any node.
pub label: String,
/// Predicted peak usage of the job in bytes; this is the amount charged against a
/// node's capacity when the job is placed.
pub predicted_peak_bytes: u64,
}
/// The jobs placed on one node, together with the total bytes they are predicted to use.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeAssignment {
/// Index of the node; `first_fit_decreasing` numbers nodes from 0 in the order it opens them.
pub node: usize,
/// Labels of the jobs on this node, in the order they were placed.
pub job_labels: Vec<String>,
/// Sum of `predicted_peak_bytes` over the jobs placed on this node.
pub used_bytes: u64,
}
/// Result of a packing attempt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PackOutcome {
/// Every job was placed; holds one `NodeAssignment` per node that was opened
/// (an empty list when there were no jobs).
Packed(Vec<NodeAssignment>),
/// The jobs could not be packed under the given constraints.
NoFit {
/// Human-readable explanation of why packing failed, including a suggested remedy.
reason: String,
},
}
/// Packs `jobs` onto identical nodes using the first-fit-decreasing heuristic.
///
/// Jobs are visited in descending order of `predicted_peak_bytes`, with ties broken by
/// ascending `label` so the result does not depend on the order of `jobs`. Each job is
/// placed on the first already-open node that still has room for it; if none has room,
/// a new node is opened.
///
/// # Parameters
///
/// * `jobs` - the jobs to place; may be empty.
/// * `node_capacity_bytes` - capacity of every node, in bytes.
/// * `max_nodes` - optional upper bound on the number of nodes that may be opened;
///   `None` means unbounded.
///
/// # Returns
///
/// * [`PackOutcome::Packed`] with one [`NodeAssignment`] per opened node, numbered from 0
///   in opening order. An empty `jobs` slice yields an empty list.
/// * [`PackOutcome::NoFit`] when a single job is larger than `node_capacity_bytes`, or
///   when a job needs a new node but `max_nodes` nodes are already open.
///
/// The capacity check is overflow-safe: it works for any `u64` capacity, including
/// values close to `u64::MAX`.
pub fn first_fit_decreasing(
    jobs: &[PackJob],
    node_capacity_bytes: u64,
    max_nodes: Option<usize>,
) -> PackOutcome {
    // A job larger than a whole node can never be scheduled; report it up front.
    if let Some(j) = jobs
        .iter()
        .find(|j| j.predicted_peak_bytes > node_capacity_bytes)
    {
        return PackOutcome::NoFit {
            reason: format!(
                "job '{}' predicted peak {} B exceeds the node capacity {} B — it cannot run on any node; \
                 use a larger --node-mb or lower --max-depth",
                j.label, j.predicted_peak_bytes, node_capacity_bytes
            ),
        };
    }

    // Largest first; the label tie-break keeps the schedule deterministic.
    let mut order: Vec<&PackJob> = jobs.iter().collect();
    order.sort_by(|a, b| {
        b.predicted_peak_bytes
            .cmp(&a.predicted_peak_bytes)
            .then_with(|| a.label.cmp(&b.label))
    });

    let mut nodes: Vec<NodeAssignment> = Vec::new();
    for job in order {
        // Compare against the *remaining* capacity instead of computing
        // `used + peak`, which can overflow u64 when the capacity is near the top
        // of the range. `used_bytes <= node_capacity_bytes` holds for every node
        // (each job fits a node on its own, and jobs are only added when they fit),
        // so the subtraction cannot underflow.
        let slot = nodes
            .iter_mut()
            .find(|n| job.predicted_peak_bytes <= node_capacity_bytes - n.used_bytes);
        match slot {
            Some(n) => {
                // Cannot overflow: the job was just shown to fit in the remaining space.
                n.used_bytes += job.predicted_peak_bytes;
                n.job_labels.push(job.label.clone());
            }
            None => {
                // No open node has room; a new one is needed, subject to the budget.
                if let Some(max) = max_nodes {
                    if nodes.len() >= max {
                        return PackOutcome::NoFit {
                            reason: format!(
                                "jobs do not fit in {max} node(s) of {} B each; need more nodes or a larger --node-mb",
                                node_capacity_bytes
                            ),
                        };
                    }
                }
                nodes.push(NodeAssignment {
                    node: nodes.len(),
                    job_labels: vec![job.label.clone()],
                    used_bytes: job.predicted_peak_bytes,
                });
            }
        }
    }
    PackOutcome::Packed(nodes)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a `PackJob` from a label and a predicted peak in bytes.
    fn job(label: &str, peak: u64) -> PackJob {
        let label = label.to_string();
        PackJob {
            label,
            predicted_peak_bytes: peak,
        }
    }

    /// Every job is placed exactly once, no node exceeds capacity, and two nodes suffice.
    #[test]
    fn packs_all_jobs_and_every_node_is_within_capacity() {
        let cap = 100;
        let jobs = [job("a", 30), job("b", 40), job("c", 50), job("d", 20)];
        let nodes = match first_fit_decreasing(&jobs, cap, None) {
            PackOutcome::Packed(nodes) => nodes,
            PackOutcome::NoFit { .. } => panic!("should pack"),
        };

        for NodeAssignment {
            node, used_bytes, ..
        } in &nodes
        {
            assert!(
                *used_bytes <= cap,
                "node {} sums {} > capacity {cap}",
                node,
                used_bytes
            );
        }

        let mut placed: Vec<&str> = nodes
            .iter()
            .flat_map(|n| n.job_labels.iter().map(String::as_str))
            .collect();
        placed.sort();
        assert_eq!(placed, ["a", "b", "c", "d"]);
        assert_eq!(nodes.len(), 2, "FFD should use 2 nodes: {nodes:?}");
    }

    /// A job bigger than a whole node is rejected, and the reason names the job.
    #[test]
    fn refuses_a_job_larger_than_a_node() {
        let jobs = [job("ok", 50), job("toobig", 150)];
        let outcome = first_fit_decreasing(&jobs, 100, None);
        match &outcome {
            PackOutcome::NoFit { reason } => {
                let names_the_job = reason.contains("toobig");
                let explains_why = reason.contains("cannot run on any node");
                assert!(names_the_job && explains_why, "unexpected reason: {reason}");
            }
            other => panic!("a job bigger than a node must NoFit: {other:?}"),
        }
    }

    /// Three 60-byte jobs need three 100-byte nodes: two is refused, three is accepted.
    #[test]
    fn refuses_when_jobs_exceed_the_node_budget() {
        let jobs = [job("a", 60), job("b", 60), job("c", 60)];

        let too_few_nodes = first_fit_decreasing(&jobs, 100, Some(2));
        match &too_few_nodes {
            PackOutcome::NoFit { reason } => {
                assert!(reason.contains("2 node"), "reason: {reason}");
            }
            other => panic!("should not fit in 2 nodes: {other:?}"),
        }

        assert!(matches!(
            first_fit_decreasing(&jobs, 100, Some(3)),
            PackOutcome::Packed(_)
        ));
    }

    /// Reordering equal-sized jobs in the input must not change the schedule.
    #[test]
    fn is_deterministic_regardless_of_input_order() {
        let in_label_order = vec![job("x", 40), job("y", 40), job("z", 40)];
        // Rotating right by one yields z, x, y.
        let mut rotated = in_label_order.clone();
        rotated.rotate_right(1);

        let from_label_order = first_fit_decreasing(&in_label_order, 100, None);
        let from_rotated = first_fit_decreasing(&rotated, 100, None);
        assert_eq!(
            from_label_order, from_rotated,
            "packing must be order-independent (deterministic schedule)"
        );
    }

    /// With nothing to place, packing succeeds and opens no nodes.
    #[test]
    fn empty_jobs_pack_to_zero_nodes() {
        let no_jobs: &[PackJob] = &[];
        let outcome = first_fit_decreasing(no_jobs, 100, None);
        assert_eq!(outcome, PackOutcome::Packed(Vec::new()));
    }
}