use crate::dispatch::PriorityQueue;
use crate::types::{Batch, CrateUnit, Edition, Result};
use crossbeam_channel::Receiver;
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};
/// Drains `rx` and turns the incoming units into batches on `queue`.
///
/// * A unit whose `size_bytes` is at least `solo_threshold_bytes` bypasses
///   buffering and is pushed straight away as a single-unit batch.
/// * Every other unit is buffered per edition. As soon as one edition has
///   buffered `batch_size * pack_multiplier` units (each factor clamped to a
///   minimum of 1, the product saturating), that buffer is handed to
///   `flush_window` together with the clamped batch size.
/// * Once the receiver stops yielding units, every remaining per-edition
///   buffer is handed to `flush_window` as well, in `HashMap` iteration order.
///
/// The function as written has no failing path and always returns `Ok(())`.
pub(crate) fn run(
    rx: Receiver<CrateUnit>,
    queue: &PriorityQueue,
    batch_size: usize,
    pack_multiplier: usize,
    solo_threshold_bytes: u64,
) -> Result<()> {
    let per_batch = batch_size.max(1);
    let flush_at = per_batch.saturating_mul(pack_multiplier.max(1));
    let mut pending: HashMap<Edition, Vec<CrateUnit>> = HashMap::new();

    // `iter()` blocks on each message and ends when the channel can yield no more.
    for unit in rx.iter() {
        let edition = unit.edition;

        // Oversized units are dispatched alone, ahead of anything still buffered.
        if unit.size_bytes >= solo_threshold_bytes {
            queue.push(Batch {
                edition,
                units: vec![unit],
            });
            continue;
        }

        let slot = pending.entry(edition).or_default();
        slot.push(unit);
        if slot.len() < flush_at {
            continue;
        }
        // Leave an empty Vec behind in the map and pack the full window.
        flush_window(std::mem::take(slot), per_batch, queue);
    }

    // Input is exhausted: pack whatever partial windows are left.
    pending
        .into_iter()
        .for_each(|(_, leftover)| flush_window(leftover, per_batch, queue));

    Ok(())
}
/// Packs one window of units into byte-balanced batches and pushes them to `queue`.
///
/// The number of batches is `ceil(units.len() / batch_size)`. Units are taken
/// largest-first (stable sort, so equal-sized units keep their arrival order)
/// and each one goes into the bin whose accumulated `size_bytes` total is
/// currently smallest, with ties resolved in favour of the lowest bin index.
/// Non-empty bins are then pushed in bin-index order.
///
/// Notes:
/// * Every produced batch carries the edition of the first unit in `units`;
///   the caller is expected to pass units of a single edition.
/// * `batch_size` only determines how many bins exist. Balancing is by bytes,
///   so an individual bin may end up holding more than `batch_size` units.
/// * A `batch_size` of 0 is treated as 1 instead of panicking on division by zero.
/// * An empty `units` pushes nothing.
fn flush_window(mut units: Vec<CrateUnit>, batch_size: usize, queue: &PriorityQueue) {
    let Some(first) = units.first() else {
        return;
    };
    let edition = first.edition;

    // `units` is non-empty here, so this is always at least 1.
    let n_batches = units.len().div_ceil(batch_size.max(1));

    // Must stay a stable sort: it decides which of several equal-sized units
    // lands in which bin.
    units.sort_by_key(|u| Reverse(u.size_bytes));

    // Built element by element so `CrateUnit` does not need to be `Clone`.
    let mut bins: Vec<Vec<CrateUnit>> = (0..n_batches).map(|_| Vec::new()).collect();

    // Min-heap over (accumulated bytes, bin index).
    let mut heap: BinaryHeap<Reverse<(u64, usize)>> =
        (0..n_batches).map(|i| Reverse((0u64, i))).collect();

    for unit in units {
        let Reverse((load, idx)) = heap.pop().expect("bins non-empty");
        // Saturate rather than overflow: a wrapped total would make an
        // overfull bin look like the emptiest one.
        let new_load = load.saturating_add(unit.size_bytes);
        bins[idx].push(unit);
        heap.push(Reverse((new_load, idx)));
    }

    for bin in bins.into_iter().filter(|bin| !bin.is_empty()) {
        queue.push(Batch {
            edition,
            units: bin,
        });
    }
}