use std::collections::HashMap;
use pkgsrc::PkgName;
use tracing::debug;
use crate::db::BuildStageTiming;
/// Splits a total job budget between workers, scaling each assignment by
/// where a package's historical CPU time falls on a logarithmic scale.
pub struct Allocator {
    /// Total job budget, as returned by [`Allocator::budget`].
    jobs: usize,
    /// `jobs` divided by the worker count, rounded up; used when there is no
    /// usable history.
    fair: usize,
    /// Smallest history-based assignment: 2, or `jobs` when that is smaller.
    min_jobs: usize,
    /// Largest history-based assignment: the greater of `fair + 2` and
    /// `jobs / 3`, capped at `jobs`.
    max_jobs: usize,
    /// Natural log of the smallest calibrated CPU time (0.0 until calibrated).
    log_min: f64,
    /// Width of the calibrated log scale; exactly 0.0 means "not calibrated".
    log_range: f64,
}

impl Allocator {
    /// Creates an uncalibrated allocator for `workers` workers sharing `jobs`
    /// jobs.
    ///
    /// A worker count of zero is treated as one so the fair share is always
    /// defined.
    pub fn new(workers: usize, jobs: usize) -> Self {
        let fair = jobs.div_ceil(workers.max(1));
        let min_jobs = 2.min(jobs);
        let max_jobs = (fair + 2).max(jobs / 3).min(jobs);
        Self {
            jobs,
            fair,
            min_jobs,
            max_jobs,
            log_min: 0.0,
            log_range: 0.0,
        }
    }

    /// Sets the log scale from the smallest and largest value in `cpu_times`.
    ///
    /// The slice may be in any order. An empty slice leaves the current
    /// calibration (or lack of one) untouched. Values of zero are treated as
    /// one so the logarithm stays finite.
    pub fn calibrate(&mut self, cpu_times: &[usize]) {
        // Scan for the true extremes rather than trusting the first and last
        // elements: an unsorted slice would otherwise collapse the scale.
        let (Some(&lowest), Some(&highest)) = (cpu_times.iter().min(), cpu_times.iter().max())
        else {
            return;
        };
        let min_t = lowest.max(1) as f64;
        let max_t = highest.max(1) as f64;
        self.log_min = min_t.ln();
        // Floor the range at 1.0 so it can never be mistaken for the
        // uncalibrated sentinel and near-identical times do not spread across
        // the whole job range.
        self.log_range = (max_t.ln() - self.log_min).max(1.0);
    }

    /// Returns the total job budget this allocator was created with.
    pub(crate) fn budget(&self) -> usize {
        self.jobs
    }

    /// Returns the number of jobs for a package with the given CPU time.
    ///
    /// `cpu_time` must be in the same unit as the values given to
    /// [`Allocator::calibrate`]. The fair share is returned when the allocator
    /// is uncalibrated, or when `cpu_time` is `None` or zero. Otherwise the
    /// result is interpolated between the minimum and maximum assignment
    /// according to the position of `cpu_time` on the calibrated log scale.
    pub fn assign(&self, cpu_time: Option<usize>) -> usize {
        // `calibrate` always stores a range >= 1.0, so 0.0 means it never ran.
        if self.log_range == 0.0 {
            return self.fair;
        }
        match cpu_time {
            Some(v) if v > 0 => {
                let t = ((v as f64).ln() - self.log_min) / self.log_range;
                // Times outside the calibrated range pin to the nearest end.
                let t = t.clamp(0.0, 1.0);
                let j = self.min_jobs as f64 + t * (self.max_jobs - self.min_jobs) as f64;
                (j.round() as usize).clamp(self.min_jobs, self.max_jobs)
            }
            _ => self.fair,
        }
    }
}
/// Per-package job bookkeeping: a safety flag plus two optional job counts.
#[derive(Clone, Copy, Debug)]
pub struct PkgMakeJobs {
    /// Flag checked (in debug builds) before an allocation is recorded.
    // NOTE(review): presumably marks packages that tolerate parallel jobs — confirm with callers.
    safe: bool,
    /// Value recorded by `allocate`, if any.
    allocated: Option<usize>,
    /// Value recorded by `set_jobs`, if any; the only field that is serialized.
    jobs: Option<usize>,
}

impl PkgMakeJobs {
    /// Builds a record with the given safety flag and no job counts yet.
    pub fn new(safe: bool) -> Self {
        let (allocated, jobs) = (None, None);
        Self {
            safe,
            allocated,
            jobs,
        }
    }

    /// Returns the safety flag this record was created with.
    pub fn safe(&self) -> bool {
        self.safe
    }

    /// Returns the count stored by `allocate`, or `None` if it was never called.
    pub fn allocated(&self) -> Option<usize> {
        self.allocated
    }

    /// Returns the count stored by `set_jobs`, or `None` if it was never called.
    pub fn jobs(&self) -> Option<usize> {
        self.jobs
    }

    /// Records an allocation of `jobs`.
    ///
    /// Debug builds panic if the record is not flagged safe.
    pub fn allocate(&mut self, jobs: usize) {
        debug_assert!(self.safe, "allocate called on unsafe package");
        self.allocated = Some(jobs);
    }

    /// Records `n` as the job count, replacing any previous value.
    pub fn set_jobs(&mut self, n: usize) {
        self.jobs = Some(n);
    }
}

impl Default for PkgMakeJobs {
    /// The default record is not flagged safe and carries no job counts.
    fn default() -> Self {
        Self {
            safe: false,
            allocated: None,
            jobs: None,
        }
    }
}
impl serde::Serialize for PkgMakeJobs {
    /// Writes only the `jobs` field, as an optional integer; `safe` and
    /// `allocated` are not part of the serialized form.
    fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self.jobs {
            Some(n) => s.serialize_some(&n),
            None => s.serialize_none(),
        }
    }
}
impl<'de> serde::Deserialize<'de> for PkgMakeJobs {
fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
let jobs = Option::<usize>::deserialize(d)?;
let mut mj = PkgMakeJobs::new(jobs.is_some());
if let Some(n) = jobs {
mj.set_jobs(n);
}
Ok(mj)
}
}
/// Collects the recorded CPU time for every package that has one.
///
/// Each entry of `pkg_paths` is looked up in `stage_timings` under the key
/// `(package path, package base name)`. Packages with no matching entry, or
/// whose recorded `cpu_ms` is not positive, are left out of the result. A
/// debug event reports how many packages had history out of the total.
pub(crate) fn pkg_cpu_history(
    stage_timings: &HashMap<(String, String), BuildStageTiming>,
    pkg_paths: &HashMap<PkgName, String>,
) -> HashMap<PkgName, usize> {
    let result: HashMap<PkgName, usize> = pkg_paths
        .iter()
        .filter_map(|(pkgname, pkgpath)| {
            let key = (pkgpath.clone(), pkgname.pkgbase().to_string());
            let timing = stage_timings.get(&key)?;
            (timing.cpu_ms > 0).then(|| (pkgname.clone(), timing.cpu_ms as usize))
        })
        .collect();

    // Keys of `pkg_paths` are unique, so the map size equals the hit count.
    let with_history = result.len();
    debug!(
        with_history,
        total = pkg_paths.len(),
        "pkg_cpu_history computed"
    );
    result
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an allocator and calibrates it with `cpu_times` in one step.
    fn calibrated(workers: usize, jobs: usize, cpu_times: &[usize]) -> Allocator {
        let mut alloc = Allocator::new(workers, jobs);
        alloc.calibrate(cpu_times);
        alloc
    }

    #[test]
    fn no_history_gets_fair_share() {
        for &(w, c, fair) in &[(4, 16, 4), (7, 10, 2), (24, 32, 2)] {
            let alloc = Allocator::new(w, c);
            for i in 0..w {
                assert_eq!(alloc.assign(None), fair, "w={w} c={c} worker {i}");
            }
        }
    }

    #[test]
    fn history_spreads_across_range() {
        let times = [100, 1_000, 10_000, 100_000];
        let alloc = calibrated(4, 16, &times);
        let assigned: Vec<usize> = times.iter().map(|&v| alloc.assign(Some(v))).collect();
        assert_eq!(assigned, [2, 3, 5, 6], "w=4 c=16");

        let times = [100, 300, 1_000, 3_000, 10_000, 30_000, 100_000];
        let alloc = calibrated(7, 10, &times);
        let assigned: Vec<usize> = times.iter().map(|&v| alloc.assign(Some(v))).collect();
        assert_eq!(assigned, [2, 2, 3, 3, 3, 4, 4], "w=7 c=10");

        // Doubling times give evenly spaced points on the log scale.
        let times: Vec<usize> = (0..24).map(|i| 100 * (1 << i)).collect();
        let alloc = calibrated(24, 32, &times);
        let assigned: Vec<usize> = times.iter().map(|&v| alloc.assign(Some(v))).collect();
        assert_eq!(
            assigned,
            [
                2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10
            ],
            "w=24 c=32"
        );
    }

    #[test]
    fn unsafe_package_gets_one_job() {
        let mut mj = PkgMakeJobs::new(false);
        assert!(!mj.safe());
        assert_eq!(mj.allocated(), None);
        mj.set_jobs(1);
        assert_eq!(mj.jobs(), Some(1));
    }

    #[test]
    fn empty_calibration_returns_fair() {
        let alloc = calibrated(4, 16, &[]);
        assert_eq!(alloc.assign(Some(5000)), 4);
    }

    #[test]
    fn no_history_after_calibration_returns_fair() {
        let alloc = calibrated(4, 16, &[100, 10_000]);
        assert_eq!(alloc.assign(None), 4);
    }

    #[test]
    fn safe_package_allocate_and_jobs() {
        let mut mj = PkgMakeJobs::new(true);
        assert!(mj.safe());
        mj.allocate(8);
        assert_eq!(mj.allocated(), Some(8));
        mj.set_jobs(6);
        assert_eq!(mj.jobs(), Some(6));
    }

    #[test]
    fn serde_roundtrip() {
        // A present job count round-trips and marks the record as safe.
        let mut mj = PkgMakeJobs::new(true);
        mj.set_jobs(4);
        let json = serde_json::to_string(&mj).expect("serialize");
        assert_eq!(json, "4");
        let de: PkgMakeJobs = serde_json::from_str(&json).expect("deserialize");
        assert!(de.safe());
        assert_eq!(de.jobs(), Some(4));

        // A missing job count serializes as null and comes back not safe.
        let mj = PkgMakeJobs::new(false);
        let json = serde_json::to_string(&mj).expect("serialize null");
        assert_eq!(json, "null");
        let de: PkgMakeJobs = serde_json::from_str(&json).expect("deserialize null");
        assert!(!de.safe());
        assert_eq!(de.jobs(), None);
    }

    #[test]
    fn jobs_less_than_workers() {
        let alloc = Allocator::new(4, 1);
        assert_eq!(alloc.assign(None), 1, "fair clamped to 1");
        assert_eq!(alloc.assign(Some(1000)), 1, "clamped to total");
    }

    #[test]
    fn max_jobs_clamped_to_total() {
        let alloc = calibrated(1, 3, &[100, 100_000]);
        assert_eq!(alloc.assign(Some(100_000)), 3, "max clamped to total");
    }
}