use crate::dispatch::PriorityQueue;
use crate::types::{Batch, CrateUnit, Edition};
use crossbeam_channel::Receiver;
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};
/// Suggested `pack_multiplier` for [`run`]: how many batches' worth of units each edition's
/// bucket accumulates (`batch_size * pack_multiplier`) before it is packed and dispatched.
///
/// NOTE(review): the "default" role is inferred from the name — confirm that callers actually
/// pass this value as `run`'s `pack_multiplier`.
pub(crate) const DEFAULT_PACK_MULTIPLIER: usize = 4;
/// Drains `rx` until every sender has hung up, coalescing units into per-edition batches on `queue`.
///
/// * A unit whose `size_bytes` is at least `solo_threshold_bytes` bypasses bucketing and is
///   pushed immediately as a single-unit batch.
/// * Every other unit is buffered in a bucket keyed by its edition. Once a bucket holds
///   `batch_size * pack_multiplier` units (saturating), the whole bucket is handed to
///   `flush_window`, which splits it into size-balanced batches.
/// * When the channel disconnects, every bucket that still holds units is flushed the same way.
///
/// `batch_size` and `pack_multiplier` are each clamped to at least 1.
pub(crate) fn run(
    rx: &Receiver<CrateUnit>,
    queue: &PriorityQueue,
    batch_size: usize,
    pack_multiplier: usize,
    solo_threshold_bytes: u64,
) {
    let per_batch = batch_size.max(1);
    let flush_at = per_batch.saturating_mul(pack_multiplier.max(1));
    let mut pending: HashMap<Edition, Vec<CrateUnit>> = HashMap::new();

    // `iter()` blocks for each message and ends once the channel is empty and disconnected,
    // which is exactly when `recv()` would start returning an error.
    for unit in rx.iter() {
        if unit.size_bytes < solo_threshold_bytes {
            let bucket = pending.entry(unit.edition).or_default();
            bucket.push(unit);
            if bucket.len() >= flush_at {
                flush_window(std::mem::take(bucket), per_batch, queue);
            }
        } else {
            // At or above the threshold: ship alone instead of packing with others.
            queue.push(Batch {
                edition: unit.edition,
                units: vec![unit],
            });
        }
    }

    // Producers are gone: dispatch whatever is still buffered for each edition.
    for leftovers in pending.into_values() {
        flush_window(leftovers, per_batch, queue);
    }
}
/// Packs one edition's window of units into size-balanced batches and pushes them onto `queue`.
///
/// Uses the longest-processing-time (LPT) heuristic:
/// * `ceil(units.len() / batch_size)` bins are opened.
/// * Units are sorted largest-first (stable, so equal-sized units keep their arrival order).
/// * Each unit goes into the bin with the fewest bytes so far.
///
/// Ties on bytes are broken by the bin's unit count, then by bin index. This spreads zero-byte
/// (or otherwise equal-sized) units across bins instead of piling them into the first one.
///
/// Bins are balanced by bytes, not capped by count. When sizes are skewed, a bin can hold more
/// than `batch_size` units.
///
/// Every emitted batch is tagged with the first unit's edition. `run` only ever passes units
/// from a single edition's bucket. An empty `units` is a no-op, and empty bins are never pushed.
fn flush_window(mut units: Vec<CrateUnit>, batch_size: usize, queue: &PriorityQueue) {
    let Some(edition) = units.first().map(|u| u.edition) else {
        return;
    };
    // `run` already clamps `batch_size`, but guard here too so a direct caller passing 0
    // cannot trigger a divide-by-zero panic. `units` is non-empty, so this is always >= 1.
    let n_batches = units.len().div_ceil(batch_size.max(1));

    units.sort_by_key(|u| Reverse(u.size_bytes));

    let mut bins: Vec<Vec<CrateUnit>> = (0..n_batches).map(|_| Vec::new()).collect();
    // Min-heap keyed by (bytes so far, units so far, bin index). Without the unit-count
    // component, a bin that received a zero-byte unit would keep the smallest key and absorb
    // every following zero-byte unit.
    let mut heap: BinaryHeap<Reverse<(u64, usize, usize)>> =
        (0..n_batches).map(|idx| Reverse((0, 0, idx))).collect();
    for unit in units {
        let Some(Reverse((bytes, count, idx))) = heap.pop() else {
            unreachable!("LPT heap seeded with n_batches >= 1 bins and refilled each iteration")
        };
        // Saturate rather than overflow: the running total only steers balancing.
        let bytes = bytes.saturating_add(unit.size_bytes);
        bins[idx].push(unit);
        heap.push(Reverse((bytes, count + 1, idx)));
    }

    for bin in bins.into_iter().filter(|bin| !bin.is_empty()) {
        queue.push(Batch {
            edition,
            units: bin,
        });
    }
}
#[cfg(test)]
mod tests {
    // `unwrap` is acceptable here: a failure means the test itself is broken and should panic.
    #![allow(clippy::unwrap_used)]
    use super::{CrateUnit, DEFAULT_PACK_MULTIPLIER, PriorityQueue, run};
    use crate::types::{Batch, Edition};
    use crossbeam_channel::unbounded;
    use std::collections::HashSet;
    use std::path::PathBuf;

    /// Builds a single-file crate unit named `name`.
    fn unit(edition: Edition, size_bytes: u64, name: &str) -> CrateUnit {
        CrateUnit {
            edition,
            manifest_dir: PathBuf::from(name),
            files: vec![PathBuf::from(name)],
            size_bytes,
        }
    }

    /// Feeds `units` through [`run`] with [`DEFAULT_PACK_MULTIPLIER`] and returns every batch
    /// it queued, in `PriorityQueue::pop` order.
    fn coalesce(units: Vec<CrateUnit>, batch_size: usize, solo_threshold: u64) -> Vec<Batch> {
        // Unbounded: every unit is sent before `run` starts draining. A bounded channel would
        // deadlock as soon as a test supplied more units than its capacity.
        let (tx, rx) = unbounded::<CrateUnit>();
        for u in units {
            tx.send(u).unwrap();
        }
        // Disconnect so `run` returns once the buffered units are drained.
        drop(tx);
        let queue = PriorityQueue::new();
        run(&rx, &queue, batch_size, DEFAULT_PACK_MULTIPLIER, solo_threshold);
        // NOTE(review): presumably `close` makes `pop` return `None` once empty instead of
        // blocking — the drain below relies on that.
        queue.close();
        std::iter::from_fn(|| queue.pop()).collect()
    }

    #[test]
    fn units_at_or_above_threshold_ship_solo() {
        let batches = coalesce(
            vec![
                unit(Edition::E2021, 1_000_000, "giant"),
                unit(Edition::E2021, 10, "small"),
            ],
            3,
            1_000_000,
        );
        let solo = batches
            .iter()
            .find(|b| b.units.iter().any(|u| u.size_bytes >= 1_000_000))
            .expect("the giant must be dispatched");
        assert_eq!(
            solo.units.len(),
            1,
            "a unit at the solo threshold ships as its own batch",
        );
    }

    #[test]
    fn batches_never_mix_editions() {
        let mut units = Vec::new();
        for i in 0..6 {
            units.push(unit(Edition::E2018, 10 + i, &format!("a{i}")));
            units.push(unit(Edition::E2021, 10 + i, &format!("b{i}")));
        }
        let batches = coalesce(units, 3, 1_000_000);
        for b in &batches {
            let editions: HashSet<Edition> = b.units.iter().map(|u| u.edition).collect();
            assert_eq!(editions.len(), 1, "rustfmt takes one --edition per process");
        }
        let total: usize = batches.iter().map(|b| b.units.len()).sum();
        assert_eq!(total, 12, "every unit lands in exactly one batch");
    }

    #[test]
    fn packs_all_units_without_dropping_any() {
        let units = (0..5)
            .map(|i| unit(Edition::E2021, 10, &format!("c{i}")))
            .collect();
        let batches = coalesce(units, 2, 1_000_000);
        let total: usize = batches.iter().map(|b| b.units.len()).sum();
        assert_eq!(total, 5);
        assert!(
            batches.iter().all(|b| !b.units.is_empty()),
            "the packer never emits an empty batch",
        );
    }

    #[test]
    fn handles_more_units_than_a_small_channel_could_buffer() {
        let units = (0..100)
            .map(|i| unit(Edition::E2021, 10, &format!("d{i}")))
            .collect();
        let batches = coalesce(units, 3, 1_000_000);
        let total: usize = batches.iter().map(|b| b.units.len()).sum();
        assert_eq!(total, 100, "every unit lands in exactly one batch");
        assert!(
            batches.iter().all(|b| !b.units.is_empty()),
            "the packer never emits an empty batch",
        );
    }
}