use bob::PackageNode;
use bob::scheduler::Scheduler;
use std::collections::{HashMap, HashSet, VecDeque};
use std::hash::{Hash, Hasher};
use std::io::BufRead;
use std::sync::{Arc, Mutex, OnceLock, mpsc};
use std::task::Poll;
use std::time::Duration;
// Zstd-compressed "dep -> dependent" edge list used by most tests here.
const DEPGRAPH_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/data/depgraph.zst");
/// Package dependency graph parsed from a "dep -> dependent" edge list.
struct DepGraph {
    // pkg -> its direct dependencies (packages that must be built first).
    incoming: HashMap<String, HashSet<String>>,
    // pkg -> its direct dependents (packages that require it).
    reverse_deps: HashMap<String, HashSet<String>>,
    // Number of distinct packages (the key count of `incoming`).
    pkg_count: usize,
    // Number of edges read from the input file.
    edge_count: usize,
}
// Process-wide cache of the full depgraph, loaded once on first use.
static DEPGRAPH: OnceLock<DepGraph> = OnceLock::new();
fn load_depgraph() -> &'static DepGraph {
DEPGRAPH.get_or_init(|| {
let file = std::fs::File::open(DEPGRAPH_PATH).expect("failed to open depgraph.zst");
let reader = std::io::BufReader::new(file);
let decoder = zstd::Decoder::new(reader).expect("failed to create zstd decoder");
let lines = std::io::BufReader::new(decoder);
let mut incoming: HashMap<String, HashSet<String>> = HashMap::new();
let mut reverse_deps: HashMap<String, HashSet<String>> = HashMap::new();
let mut edge_count = 0;
for line in lines.lines() {
let line = line.expect("failed to read line");
let line = line.trim();
if line.is_empty() {
continue;
}
let parts: Vec<&str> = line.splitn(2, " -> ").collect();
assert_eq!(parts.len(), 2, "malformed edge: {}", line);
let dep = parts[0].to_string();
let dependent = parts[1].to_string();
incoming
.entry(dependent.clone())
.or_default()
.insert(dep.clone());
incoming.entry(dep.clone()).or_default();
reverse_deps
.entry(dep)
.or_default()
.insert(dependent.clone());
reverse_deps.entry(dependent).or_default();
edge_count += 1;
}
let pkg_count = incoming.len();
DepGraph {
incoming,
reverse_deps,
pkg_count,
edge_count,
}
})
}
/// Parses a zstd-compressed "dep -> dependent" edge list at `path`
/// into a `DepGraph`. Panics on any I/O, decoding, or format error.
fn load_depgraph_zst(path: &str) -> DepGraph {
    let file = std::fs::File::open(path).unwrap_or_else(|e| panic!("open {}: {}", path, e));
    let reader = std::io::BufReader::new(file);
    let decoder = zstd::Decoder::new(reader).unwrap_or_else(|e| panic!("zstd {}: {}", path, e));
    let mut incoming: HashMap<String, HashSet<String>> = HashMap::new();
    let mut reverse_deps: HashMap<String, HashSet<String>> = HashMap::new();
    let mut edge_count = 0;
    for read in std::io::BufReader::new(decoder).lines() {
        let raw = read.expect("failed to read line");
        let edge = raw.trim();
        if edge.is_empty() {
            continue;
        }
        let parts: Vec<&str> = edge.splitn(2, " -> ").collect();
        assert_eq!(parts.len(), 2, "malformed edge: {}", edge);
        let (dep, dependent) = (parts[0].to_string(), parts[1].to_string());
        // Record the edge in both directions and make sure both endpoints
        // exist as keys even when they have no entries of their own.
        incoming.entry(dependent.clone()).or_default().insert(dep.clone());
        incoming.entry(dep.clone()).or_default();
        reverse_deps.entry(dep).or_default().insert(dependent.clone());
        reverse_deps.entry(dependent).or_default();
        edge_count += 1;
    }
    DepGraph {
        pkg_count: incoming.len(),
        incoming,
        reverse_deps,
        edge_count,
    }
}
/// Builds the scheduler input for `g`: one `PackageNode` per package
/// with a flat pbulk weight of 100 and no recorded CPU time.
fn default_packages(g: &DepGraph) -> HashMap<String, PackageNode<String>> {
    let mut nodes = HashMap::with_capacity(g.incoming.len());
    for (name, deps) in &g.incoming {
        let node = PackageNode {
            deps: deps.clone(),
            pbulk_weight: 100,
            cpu_time: 0,
        };
        nodes.insert(name.clone(), node);
    }
    nodes
}
/// Convenience constructor: a scheduler over `g` with uniform weights.
fn new_scheduler(g: &DepGraph) -> Scheduler<String> {
    let packages = default_packages(g);
    Scheduler::from_graph(packages)
}
/// Derives a small deterministic sleep (0-99 microseconds, scaled by the
/// worker count) from the package name, so multi-worker runs interleave
/// differently per package without real randomness.
fn pkg_sleep(pkg: &str, workers: usize) -> Duration {
    use std::hash::DefaultHasher;
    let mut hasher = DefaultHasher::new();
    pkg.hash(&mut hasher);
    let micros = (hasher.finish() % 100) * workers as u64;
    Duration::from_micros(micros)
}
/// Returns `roots` plus every package that (transitively) depends on any
/// of them, found by BFS over the reverse-dependency edges.
fn transitive_dependents(g: &DepGraph, roots: &HashSet<String>) -> HashSet<String> {
    let mut seen = roots.clone();
    let mut frontier: VecDeque<String> = roots.iter().cloned().collect();
    while let Some(cur) = frontier.pop_front() {
        let Some(rdeps) = g.reverse_deps.get(&cur) else {
            continue;
        };
        for dependent in rdeps {
            // insert() returning true means first visit: expand it.
            if seen.insert(dependent.clone()) {
                frontier.push_back(dependent.clone());
            }
        }
    }
    seen
}
/// Returns the `count` packages with the most direct dependents, ties
/// broken alphabetically so the result is stable.
fn most_depended_on(g: &DepGraph, count: usize) -> Vec<String> {
    let mut ranked: Vec<(String, usize)> = Vec::with_capacity(g.reverse_deps.len());
    for (pkg, rdeps) in &g.reverse_deps {
        ranked.push((pkg.clone(), rdeps.len()));
    }
    // Descending by dependent count, then ascending by name.
    ranked.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    ranked.truncate(count);
    ranked.into_iter().map(|(pkg, _)| pkg).collect()
}
/// Drives a full simulated build of the shared depgraph with `workers`
/// OS threads pulling from one shared `Scheduler`, and asserts:
/// - a package is only handed out after all its deps have completed,
/// - packages in `fail_set` poison exactly their transitive dependents,
/// - the final success/failure counts match the precomputed expectation.
fn run_build(workers: usize, fail_set: &HashSet<String>) {
    let g = load_depgraph();
    // Everything transitively depending on a failed package must fail too.
    let expected_failed = transitive_dependents(g, fail_set);
    let expected_success = g.pkg_count - expected_failed.len();
    let sched = Arc::new(Mutex::new(new_scheduler(g)));
    let built_set: Arc<Mutex<HashSet<String>>> = Arc::new(Mutex::new(HashSet::new()));
    let incoming = g.incoming.clone();
    // Workers report (pkg, success) to the main thread, which is the only
    // caller of mark_success/mark_failure.
    let (done_tx, done_rx) = mpsc::channel::<(String, bool)>();
    let mut handles = Vec::new();
    for _ in 0..workers {
        let sched = Arc::clone(&sched);
        let built_set = Arc::clone(&built_set);
        let incoming = incoming.clone();
        let done_tx = done_tx.clone();
        let fail_set = fail_set.clone();
        handles.push(std::thread::spawn(move || {
            loop {
                let pkg = {
                    let mut s = sched.lock().expect("lock poisoned");
                    match s.poll() {
                        Poll::Ready(Some(sp)) => sp.pkg,
                        // Scheduler drained: this worker is done.
                        Poll::Ready(None) => return,
                        // Nothing runnable yet: release the lock and back off.
                        Poll::Pending => {
                            drop(s);
                            std::thread::sleep(Duration::from_millis(1));
                            continue;
                        }
                    }
                };
                // Scheduler invariant: every dep finished before `pkg` was
                // handed out.
                if let Some(deps) = incoming.get(&pkg) {
                    let built = built_set.lock().expect("lock poisoned");
                    for dep in deps {
                        assert!(
                            built.contains(dep),
                            "building {} but dependency {} not yet completed",
                            pkg,
                            dep
                        );
                    }
                }
                if fail_set.contains(&pkg) {
                    done_tx.send((pkg, false)).expect("channel closed");
                } else {
                    // Hash-derived sleep desynchronizes workers so the
                    // multi-worker tests exercise varied interleavings.
                    if workers > 1 {
                        std::thread::sleep(pkg_sleep(&pkg, workers));
                    }
                    built_set.lock().expect("lock poisoned").insert(pkg.clone());
                    done_tx.send((pkg, true)).expect("channel closed");
                }
            }
        }));
    }
    // Drop the main thread's sender so the receive loop below ends once
    // every worker (each holding a clone) has returned.
    drop(done_tx);
    let mut succeeded = 0usize;
    let mut all_failed: HashSet<String> = HashSet::new();
    for (pkg, success) in done_rx {
        let mut s = sched.lock().expect("lock poisoned");
        if success {
            s.mark_success(&pkg);
            succeeded += 1;
        } else {
            // mark_failure returns packages failed indirectly by this one.
            let indirect = s.mark_failure(&pkg);
            all_failed.insert(pkg);
            for p in indirect {
                all_failed.insert(p);
            }
        }
    }
    for h in handles {
        h.join().expect("worker thread panicked");
    }
    assert_eq!(
        succeeded, expected_success,
        "expected {} successes, got {}",
        expected_success, succeeded
    );
    assert_eq!(
        all_failed, expected_failed,
        "failed set does not match expected transitive closure"
    );
    // A failed package must never have been recorded as built.
    let built = built_set.lock().expect("lock poisoned");
    for pkg in &all_failed {
        assert!(
            !built.contains(pkg),
            "failed package {} appears in built set",
            pkg
        );
    }
}
// Sanity-check the bundled depgraph fixture's size so later tests fail
// loudly if the data file is truncated or mis-parsed.
#[test]
fn depgraph_loads() {
    let g = load_depgraph();
    assert!(
        g.pkg_count > 25000,
        "expected >25k packages, got {}",
        g.pkg_count
    );
    assert!(
        g.edge_count > 100000,
        "expected >100k edges, got {}",
        g.edge_count
    );
}
// Serial baseline: one worker, no failure injection, no timing jitter.
#[test]
fn depgraph_1_worker() {
    run_build(1, &HashSet::new());
}
// Smallest concurrent case: two workers contending on the scheduler lock.
#[test]
fn depgraph_2_workers() {
    run_build(2, &HashSet::new());
}
// Moderate concurrency across four workers.
#[test]
fn depgraph_4_workers() {
    run_build(4, &HashSet::new());
}
// Stress the scheduler with 32 concurrent workers.
#[test]
fn depgraph_32_workers() {
    run_build(32, &HashSet::new());
}
// Heavy oversubscription: 128 workers hammering one scheduler.
#[test]
fn depgraph_128_workers() {
    run_build(128, &HashSet::new());
}
/// Deterministic pseudo-random duration (in ticks) for `pkg`'s given
/// phase: 1-3 ticks for phase 0, 1-10 ticks for any later phase.
fn phase_ticks(pkg: &str, phase: usize) -> u32 {
    let mut hasher = std::hash::DefaultHasher::new();
    pkg.hash(&mut hasher);
    phase.hash(&mut hasher);
    let cap = if phase == 0 { 3 } else { 10 };
    (hasher.finish() % cap) as u32 + 1
}
/// One package's phase timings from a historical build log, all in
/// milliseconds (see `load_history` for the CSV columns each comes from).
struct PkgTiming {
    // Sum of the phases before configure (CSV fields 8..=10).
    overhead_pre_ms: u32,
    // Configure-phase wall time (CSV field 11).
    configure_ms: u32,
    // Build-phase wall time (CSV field 12).
    build_ms: u32,
    // Sum of the phases after build (CSV fields 13..=16).
    overhead_post_ms: u32,
    // Configure-phase CPU time; exceeding configure_ms implies the
    // recorded run was parallel.
    cpu_configure_ms: u32,
    // Build-phase CPU time (CSV field 21).
    cpu_build_ms: u32,
    // End-to-end duration of the whole package build (CSV field 6).
    duration_ms: u32,
    // MAKE_JOBS the history was recorded with (1 when unsafe or unknown).
    history_jobs: u32,
}
/// Everything parsed from one history CSV: per-package timings plus the
/// packages recorded as not parallel-safe ("-" in the jobs column).
struct HistoryData {
    // Package name -> its recorded phase timings.
    timings: HashMap<String, PkgTiming>,
    // Packages that must be simulated with a single job.
    unsafe_pkgs: HashSet<String>,
}
/// Parses a zstd-compressed build-history CSV into `HistoryData`.
///
/// Column layout (0-based) as consumed below — inferred from the field
/// indices used here; confirm against whatever produces the log:
///   2 = package name, 5 = MAKE_JOBS ("-" marks a parallel-unsafe build),
///   6 = total duration, 8..=16 = per-phase wall times, 20/21 = CPU time
///   for configure/build. All times presumably milliseconds.
fn load_history(path: &str) -> HistoryData {
    let file = std::fs::File::open(path).unwrap_or_else(|e| panic!("open {}: {}", path, e));
    let reader = std::io::BufReader::new(file);
    let decoder = zstd::Decoder::new(reader).unwrap_or_else(|e| panic!("zstd {}: {}", path, e));
    let mut timings = HashMap::new();
    let mut unsafe_pkgs = HashSet::new();
    // Malformed numeric fields degrade to 0 instead of aborting the load.
    let parse = |s: &str| -> u32 { s.parse::<u32>().unwrap_or(0) };
    for line in std::io::BufReader::new(decoder).lines() {
        let line = line.expect("read line");
        let fields: Vec<&str> = line.split(',').collect();
        // Skip the header row and any truncated lines.
        if fields.len() < 22 || fields[0] == "timestamp" {
            continue;
        }
        let pkgname = fields[2].to_string();
        let is_unsafe = fields[5] == "-";
        if is_unsafe {
            unsafe_pkgs.insert(pkgname.clone());
        }
        let overhead_pre_ms = parse(fields[8]) + parse(fields[9]) + parse(fields[10]);
        let configure_ms = parse(fields[11]);
        let build_ms = parse(fields[12]);
        let overhead_post_ms =
            parse(fields[13]) + parse(fields[14]) + parse(fields[15]) + parse(fields[16]);
        let cpu_configure_ms = parse(fields[20]);
        let cpu_build_ms = parse(fields[21]);
        let duration_ms = parse(fields[6]);
        // Unsafe builds ran serially regardless of what the column says.
        let history_jobs = if is_unsafe {
            1
        } else {
            fields[5].parse::<u32>().unwrap_or(1)
        };
        // Later rows for the same package overwrite earlier ones.
        timings.insert(
            pkgname,
            PkgTiming {
                overhead_pre_ms,
                configure_ms,
                build_ms,
                overhead_post_ms,
                cpu_configure_ms,
                cpu_build_ms,
                duration_ms,
                history_jobs,
            },
        );
    }
    HistoryData {
        timings,
        unsafe_pkgs,
    }
}
// Phase indices used by the history-backed (timed) simulation.
const PHASE_OVERHEAD_PRE: usize = 0;
const PHASE_CONFIGURE: usize = 1;
const PHASE_BUILD: usize = 2;
const PHASE_OVERHEAD_POST: usize = 3;
// Phases per package: four when history timings exist, two for the
// hash-based synthetic timings.
const PHASE_COUNT_TIMED: usize = 4;
const PHASE_COUNT_HASH: usize = 2;
/// Full parameter set for one `run_sim` invocation.
struct SimConfig<'a> {
    // Number of packages that may build concurrently.
    build_threads: usize,
    // Core budget utilization is measured against.
    max_jobs: usize,
    // Graph to build; `None` falls back to the shared full depgraph.
    graph: Option<&'a DepGraph>,
    // Packages that must run with a single job.
    unsafe_pkgs: &'a HashSet<String>,
    // Historical phase timings; `None` switches to hash-based timings.
    timings: Option<&'a HashMap<String, PkgTiming>>,
    // Per-package scheduler weights (100 when a package is absent).
    weights: HashMap<String, usize>,
    // Print per-tick allocation lines and a summary report.
    verbose: bool,
    // The run asserts utilization strictly above this percentage.
    min_utilization: f64,
    // Fixed MAKE_JOBS for parallel phases; `None` means fair share.
    fixed_jobs: Option<usize>,
    // Per-package MAKE_JOBS table; takes precedence over `fixed_jobs`.
    jobs_override: Option<HashMap<String, usize>>,
}
/// Aggregate outcome of one simulation run.
struct SimResult {
    // Simulated wall-clock ticks (one tick ~ 1s when history-timed).
    ticks: u64,
    // Percentage of the `ticks * max_jobs` core budget actually used.
    utilization: f64,
    // Packages finished (kept for debugging; unused by current callers).
    _completed: usize,
    // Highest core allocation seen in any single tick.
    peak_cores: usize,
    // Ticks where allocation exceeded the overcommit threshold
    // (16 cores in the current runs; reported as ">16" by callers).
    overcommit_ticks: u64,
}
fn run_make_jobs_sim(
build_threads: usize,
max_jobs: usize,
graph: Option<&DepGraph>,
unsafe_pkgs: &HashSet<String>,
timings: Option<&HashMap<String, PkgTiming>>,
) -> SimResult {
let weights = timings.map(weights_from_history).unwrap_or_default();
run_sim(&SimConfig {
build_threads,
max_jobs,
graph,
unsafe_pkgs,
timings,
weights,
verbose: graph.is_some(),
min_utilization: 38.0,
fixed_jobs: None,
jobs_override: None,
})
}
/// Core MAKE_JOBS simulator: advances a virtual clock over the dependency
/// graph, running up to `cfg.build_threads` packages concurrently, with
/// each package moving through its phases tick by tick while holding a
/// per-phase core allocation.
///
/// Phase durations come either from recorded history (`cfg.timings`,
/// four phases) or from the deterministic hash in `phase_ticks` (two
/// phases). With history, parallel phases are rescaled from the recorded
/// job count to the allocated job count via an Amdahl-style
/// serial/parallel split.
///
/// Fix in this revision: `overcommit_ticks` previously compared against a
/// hard-coded 16 instead of the configurable `max_jobs`. All current
/// callers pass `max_jobs = 16`, so behavior is unchanged for them, but
/// the counter now follows the configured core budget.
///
/// Panics (fails the test) when final utilization is not strictly above
/// `cfg.min_utilization`.
fn run_sim(cfg: &SimConfig<'_>) -> SimResult {
    // Fall back to the shared full depgraph when no graph is supplied.
    let loaded;
    let g = match cfg.graph {
        Some(g) => g,
        None => {
            loaded = load_depgraph();
            loaded
        }
    };
    let packages: HashMap<String, PackageNode<String>> = g
        .incoming
        .iter()
        .map(|(k, deps)| {
            // Packages without a recorded weight get a neutral default.
            let pw = cfg.weights.get(k).copied().unwrap_or(100);
            (
                k.clone(),
                PackageNode {
                    deps: deps.clone(),
                    pbulk_weight: pw,
                    cpu_time: 0,
                },
            )
        })
        .collect();
    let mut sched = Scheduler::from_graph(packages);
    let timings = cfg.timings;
    let unsafe_pkgs = cfg.unsafe_pkgs;
    let max_jobs = cfg.max_jobs;
    let build_threads = cfg.build_threads;
    let verbose = cfg.verbose;
    let fixed_jobs = cfg.fixed_jobs;
    let jobs_override = &cfg.jobs_override;
    let phase_count = if timings.is_some() {
        PHASE_COUNT_TIMED
    } else {
        PHASE_COUNT_HASH
    };
    // One in-flight package: its current phase, ticks remaining in that
    // phase, and the cores allocated for the whole phase.
    struct Active {
        phase: usize,
        ticks_left: u32,
        jobs: usize,
    }
    let mut active: HashMap<String, Active> = HashMap::new();
    let mut total_job_ticks: u64 = 0;
    let mut total_ticks: u64 = 0;
    let mut histogram: Vec<u64> = Vec::new();
    let mut peak_cores: usize = 0;
    let mut overcommit_ticks: u64 = 0;
    let mut completed = 0usize;
    let mut max_active = 0usize;
    // Ticks one phase takes given the jobs allocated to it.
    let get_ticks = |pkg: &str, phase: usize, allocated_jobs: usize| -> u32 {
        if let Some(t) = timings {
            if let Some(pt) = t.get(pkg) {
                let (wall_ms, cpu_ms) = match phase {
                    PHASE_CONFIGURE => (pt.configure_ms, pt.cpu_configure_ms),
                    PHASE_BUILD => (pt.build_ms, pt.cpu_build_ms),
                    PHASE_OVERHEAD_PRE => (pt.overhead_pre_ms, 0),
                    PHASE_OVERHEAD_POST => (pt.overhead_post_ms, 0),
                    _ => (0, 0),
                };
                let j = allocated_jobs as f64;
                let hj = pt.history_jobs as f64;
                // cpu > wall means the recorded run was genuinely parallel:
                // solve wall = serial + parallel/hj for the serial part,
                // then predict wall time at the allocated job count.
                if cpu_ms > wall_ms && hj > 1.0 && j != hj {
                    let wall = wall_ms as f64;
                    let cpu = cpu_ms as f64;
                    let serial = (wall - cpu / hj) / (1.0 - 1.0 / hj);
                    let serial = serial.max(0.0);
                    let parallel = (cpu - serial).max(0.0);
                    let predicted = serial + parallel / j;
                    (predicted / 1000.0).ceil().max(1.0) as u32
                } else {
                    wall_ms.div_ceil(1000).max(1)
                }
            } else {
                phase_ticks(pkg, phase)
            }
        } else {
            phase_ticks(pkg, phase)
        }
    };
    // In timed mode only configure/build may use more than one core.
    let is_parallel_phase = |phase: usize| -> bool {
        if timings.is_some() {
            phase == PHASE_CONFIGURE || phase == PHASE_BUILD
        } else {
            true
        }
    };
    let fair_share = max_jobs / build_threads.max(1);
    // Cores to allocate for one phase of one package.
    let phase_jobs = |pkg: &str, phase: usize| -> usize {
        // Parallel-unsafe packages are pinned to a single job.
        let dominated = unsafe_pkgs.contains(pkg);
        if is_parallel_phase(phase) && !dominated {
            if let Some(overrides) = jobs_override {
                return overrides.get(pkg).copied().unwrap_or(fair_share);
            }
            if let Some(fj) = fixed_jobs {
                fj
            } else {
                fair_share
            }
        } else {
            1
        }
    };
    loop {
        // Admit runnable packages until every builder slot is occupied.
        while active.len() < build_threads {
            match sched.poll() {
                std::task::Poll::Ready(Some(sp)) => {
                    let first_phase = if timings.is_some() {
                        PHASE_OVERHEAD_PRE
                    } else {
                        0
                    };
                    let jobs = phase_jobs(&sp.pkg, first_phase);
                    let ticks = get_ticks(&sp.pkg, first_phase, jobs);
                    active.insert(
                        sp.pkg,
                        Active {
                            phase: first_phase,
                            ticks_left: ticks,
                            jobs,
                        },
                    );
                }
                _ => break,
            }
        }
        // Nothing running and nothing admitted: the build is finished.
        if active.is_empty() {
            break;
        }
        if verbose {
            // Trace ticks where the machine is not fully allocated.
            let used: usize = active.values().map(|a| a.jobs).sum();
            if used < max_jobs {
                let phase_char = |p: usize| -> &'static str {
                    match p {
                        PHASE_OVERHEAD_PRE => "pre",
                        PHASE_CONFIGURE => "conf",
                        PHASE_BUILD => "bld",
                        PHASE_OVERHEAD_POST => "post",
                        _ => "?",
                    }
                };
                let mut allocs: Vec<String> = active
                    .iter()
                    .map(|(p, a)| {
                        // Shorten "pkg-1.2.3" to its name prefix for the trace.
                        let short = p
                            .find(|c: char| c.is_ascii_digit())
                            .map(|i| &p[..i.max(1)])
                            .unwrap_or(p)
                            .trim_end_matches('-');
                        if timings.is_some() {
                            format!("{}={}:{}", short, a.jobs, phase_char(a.phase))
                        } else {
                            format!("{}={}", short, a.jobs)
                        }
                    })
                    .collect();
                allocs.sort();
                let pw = sched.queued_count();
                let marker = if active.len() < build_threads {
                    " (draining)"
                } else {
                    ""
                };
                eprintln!(
                    " tick {:>4}: {} = {}/{} pw={}{}",
                    total_ticks,
                    allocs.join(" + "),
                    used,
                    max_jobs,
                    pw,
                    marker
                );
            }
        }
        max_active = max_active.max(active.len());
        let used: usize = active.values().map(|a| a.jobs).sum();
        if used >= histogram.len() {
            histogram.resize(used + 1, 0);
        }
        histogram[used] += 1;
        total_job_ticks += used as u64;
        total_ticks += 1;
        if used > peak_cores {
            peak_cores = used;
        }
        // Overcommit is measured against the configured core budget
        // (previously a hard-coded 16, which only matched by coincidence
        // because every caller passes max_jobs = 16).
        if used > max_jobs {
            overcommit_ticks += 1;
        }
        // Advance every in-flight package by one tick.
        let mut finished_phase: Vec<String> = Vec::new();
        for (pkg, a) in active.iter_mut() {
            a.ticks_left -= 1;
            if a.ticks_left == 0 {
                finished_phase.push(pkg.clone());
            }
        }
        // Move finished phases forward; packages past the last phase are done.
        let mut done_pkgs: Vec<String> = Vec::new();
        for pkg in finished_phase {
            let a = active.get_mut(&pkg).expect("active");
            let next_phase = a.phase + 1;
            if next_phase < phase_count {
                let jobs = phase_jobs(&pkg, next_phase);
                let ticks = get_ticks(&pkg, next_phase, jobs);
                a.phase = next_phase;
                a.ticks_left = ticks;
                a.jobs = jobs;
            } else {
                done_pkgs.push(pkg);
            }
        }
        for pkg in done_pkgs {
            active.remove(&pkg);
            sched.mark_success(&pkg);
            completed += 1;
        }
    }
    let utilization = if total_ticks > 0 {
        total_job_ticks as f64 / (total_ticks as f64 * max_jobs as f64) * 100.0
    } else {
        0.0
    };
    if verbose {
        eprintln!(
            "\n=== MAKE_JOBS simulation: {} threads, {} max_jobs, {} packages ===",
            build_threads, max_jobs, completed
        );
        if timings.is_some() {
            // In timed mode one tick models one second of wall time.
            let mins = total_ticks / 60;
            let secs = total_ticks % 60;
            eprintln!("Simulated wall time: {}m{}s", mins, secs);
        }
        eprintln!(
            "Utilization: {:.1}% ({} job-ticks / {} tick-slots)",
            utilization,
            total_job_ticks,
            total_ticks * max_jobs as u64
        );
        eprintln!("Ticks: {}, max concurrent: {}", total_ticks, max_active);
        eprintln!(
            "Peak allocated: {} cores ({}x overcommit)",
            peak_cores,
            if max_jobs > 0 {
                format!("{:.1}", peak_cores as f64 / max_jobs as f64)
            } else {
                "-".to_string()
            }
        );
        let at_or_below: u64 = histogram.iter().take(max_jobs + 1).sum();
        let above: u64 = histogram.iter().skip(max_jobs + 1).sum();
        eprintln!(
            "Time at/below {} cores: {} ticks ({:.1}%), above: {} ticks ({:.1}%)",
            max_jobs,
            at_or_below,
            at_or_below as f64 / total_ticks as f64 * 100.0,
            above,
            above as f64 / total_ticks as f64 * 100.0,
        );
        eprintln!("Core usage histogram:");
        let max_count = *histogram.iter().max().unwrap_or(&1);
        for (cores, &count) in histogram.iter().enumerate() {
            if count > 0 {
                let bar_len = (count * 50 / max_count) as usize;
                let bar: String = "#".repeat(bar_len);
                let marker = if cores == max_jobs {
                    " <-- max_jobs"
                } else {
                    ""
                };
                eprintln!(
                    " {:>3} cores: {:>6} ticks {}{}",
                    cores, count, bar, marker
                );
            }
        }
    }
    assert!(
        utilization > cfg.min_utilization,
        "Utilization too low: {:.1}% (minimum {:.1}%)",
        utilization,
        cfg.min_utilization
    );
    SimResult {
        ticks: total_ticks,
        utilization,
        _completed: completed,
        peak_cores,
        overcommit_ticks,
    }
}
// Full-tree simulation with synthetic timings: 4 builders, 16-core budget.
#[test]
fn depgraph_make_jobs_full() {
    run_make_jobs_sim(4, 16, None, &HashSet::new(), None);
}
// Scenario fixtures: per-build dependency graphs and their matching
// recorded build-history CSVs (all zstd-compressed).
const MUTT_DEPGRAPH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/data/depgraph-mutt.zst");
const MUTT_HISTORY: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/data/history-mutt.zst");
const BULK_LARGE_DEPGRAPH: &str = concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/tests/data/depgraph-bulk-large.zst"
);
const BULK_LARGE_HISTORY: &str = concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/tests/data/history-bulk-large.zst"
);
/// Simulates the mutt build with synthetic timings and a hand-maintained
/// list of packages that are not parallel-safe.
#[test]
fn depgraph_make_jobs_mutt() {
    let g = load_depgraph_zst(MUTT_DEPGRAPH);
    eprintln!(
        "\nmutt build: {} packages, {} edges",
        g.pkg_count, g.edge_count
    );
    let mut unsafe_pkgs: HashSet<String> = HashSet::new();
    for name in [
        "cyrus-sasl-2.1.28nb2",
        "gnupg2-2.4.9nb1",
        "libusb1-1.0.29",
        "lynx-2.9.2nb6",
    ] {
        unsafe_pkgs.insert(name.to_string());
    }
    run_make_jobs_sim(4, 16, Some(&g), &unsafe_pkgs, None);
}
// Same mutt graph, but phase durations and the unsafe-package list come
// from the recorded build history instead of synthetic hash timings.
#[test]
fn depgraph_make_jobs_mutt_timed() {
    let g = load_depgraph_zst(MUTT_DEPGRAPH);
    let history = load_history(MUTT_HISTORY);
    let unsafe_list: Vec<&str> = history.unsafe_pkgs.iter().map(|s| s.as_str()).collect();
    eprintln!(
        "\nmutt build (timed): {} packages, {} edges, {} timings, {} unsafe {:?}",
        g.pkg_count,
        g.edge_count,
        history.timings.len(),
        history.unsafe_pkgs.len(),
        unsafe_list
    );
    run_make_jobs_sim(
        4,
        16,
        Some(&g),
        &history.unsafe_pkgs,
        Some(&history.timings),
    );
}
/// Converts historical total durations into scheduler weights: whole
/// seconds of recorded build time, floored at 1.
fn weights_from_history(timings: &HashMap<String, PkgTiming>) -> HashMap<String, usize> {
    let mut weights = HashMap::with_capacity(timings.len());
    for (pkg, timing) in timings {
        let secs = (timing.duration_ms / 1000).max(1) as usize;
        weights.insert(pkg.clone(), secs);
    }
    weights
}
/// Builds a per-package MAKE_JOBS table: packages whose score (a weighted
/// blend of historical build seconds and pbulk weight) reaches the given
/// percentile threshold get `boost_jobs`; everyone else gets `base_jobs`.
#[allow(clippy::too_many_arguments)]
fn compute_jobs_override(
    pkgs: &HashMap<String, HashSet<String>>,
    timings: &HashMap<String, PkgTiming>,
    weights: &HashMap<String, usize>,
    time_factor: f64,
    weight_factor: f64,
    percentile: f64,
    base_jobs: usize,
    boost_jobs: usize,
) -> HashMap<String, usize> {
    // Score every package from its history and weight.
    let mut scores: Vec<(String, f64)> = Vec::with_capacity(pkgs.len());
    for pkg in pkgs.keys() {
        let build_secs = match timings.get(pkg) {
            Some(t) => t.build_ms as f64 / 1000.0,
            None => 0.0,
        };
        let w = weights.get(pkg).copied().unwrap_or(100) as f64;
        scores.push((pkg.clone(), build_secs * time_factor + w * weight_factor));
    }
    // Ascending by score; the percentile index marks the boost cutoff.
    scores.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
    let threshold_idx = ((scores.len() as f64) * percentile / 100.0) as usize;
    let threshold = scores
        .get(threshold_idx)
        .map(|(_, s)| *s)
        .unwrap_or(f64::MAX);
    let mut jobs_for = HashMap::with_capacity(scores.len());
    for (pkg, score) in scores {
        let jobs = if score >= threshold {
            boost_jobs
        } else {
            base_jobs
        };
        jobs_for.insert(pkg, jobs);
    }
    jobs_for
}
/// Formats a tick count (seconds) as minutes and zero-padded seconds,
/// e.g. 125 -> "2m05s".
fn fmt_time(ticks: u64) -> String {
    let (mins, secs) = (ticks / 60, ticks % 60);
    format!("{}m{:02}s", mins, secs)
}
// Diagnostic run: prints the per-tick allocation trace and summary for
// the mutt build; min_utilization 0.0 so the utilization assert in
// run_sim can never fire here.
#[test]
fn depgraph_make_jobs_mutt_verbose() {
    let g = load_depgraph_zst(MUTT_DEPGRAPH);
    let history = load_history(MUTT_HISTORY);
    let weights = weights_from_history(&history.timings);
    run_sim(&SimConfig {
        build_threads: 4,
        max_jobs: 16,
        graph: Some(&g),
        unsafe_pkgs: &history.unsafe_pkgs,
        timings: Some(&history.timings),
        weights,
        verbose: true,
        min_utilization: 0.0,
        fixed_jobs: None,
        jobs_override: None,
    });
}
// What-if table for the mutt build: replays the recorded history under
// several MAKE_JOBS allocation policies and prints wall time (relative
// to the "ACTUAL" baseline row), utilization, peak cores, and overcommit.
#[test]
fn depgraph_make_jobs_mutt_experiments() {
    let g = load_depgraph_zst(MUTT_DEPGRAPH);
    let history = load_history(MUTT_HISTORY);
    let weights = weights_from_history(&history.timings);
    eprintln!(
        "\n=== mutt build experiments ({} packages) ===",
        g.pkg_count
    );
    eprintln!("Actual build time: 70m23s (4 workers x MAKE_JOBS=4)\n");
    eprintln!(
        "{:<55} {:>8} {:>8} {:>6} {:>5} {:>6}",
        "Configuration", "Wall", "vs base", "Util%", "Peak", ">16"
    );
    eprintln!("{}", "-".repeat(95));
    // One table row: a scheduler configuration to simulate.
    struct Experiment {
        label: &'static str,
        threads: usize,
        max_jobs: usize,
        use_weights: bool,
        fixed_jobs: Option<usize>,
        jobs_override: Option<HashMap<String, usize>>,
    }
    // Shorthand for a score-based per-package override table.
    let ov = |tf: f64, wf: f64, pct: f64, base: usize, boost: usize| {
        compute_jobs_override(
            &g.incoming,
            &history.timings,
            &weights,
            tf,
            wf,
            pct,
            base,
            boost,
        )
    };
    let experiments = [
        Experiment {
            label: "ACTUAL: fixed 4 workers x MAKE_JOBS=4",
            threads: 4,
            max_jobs: 16,
            use_weights: false,
            fixed_jobs: Some(4),
            jobs_override: None,
        },
        Experiment {
            label: "dynamic scheduler (score-based, mutt)",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: None,
        },
        Experiment {
            label: "time: top 10% -> 8, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(1.0, 0.0, 90.0, 4, 8)),
        },
        Experiment {
            label: "time: top 10% -> 16, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(1.0, 0.0, 90.0, 4, 16)),
        },
        Experiment {
            label: "weight: top 10% -> 8, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(0.0, 1.0, 90.0, 4, 8)),
        },
        Experiment {
            label: "weight: top 10% -> 16, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(0.0, 1.0, 90.0, 4, 16)),
        },
        Experiment {
            label: "combined: top 10% -> 8, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(1.0, 1.0, 90.0, 4, 8)),
        },
        Experiment {
            label: "combined: top 10% -> 16, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(1.0, 1.0, 90.0, 4, 16)),
        },
        Experiment {
            label: "combined: top 5% -> 16, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(1.0, 1.0, 95.0, 4, 16)),
        },
        Experiment {
            label: "combined: top 20% -> 8, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(1.0, 1.0, 80.0, 4, 8)),
        },
    ];
    {
        // Critical-path lower bound: longest dependency chain when every
        // parallel phase is rescaled to 16 jobs (same Amdahl-style split
        // as run_sim's get_ticks).
        let mut path_time: HashMap<String, u64> = HashMap::new();
        let mut topo_order: Vec<String> = Vec::new();
        let mut remaining: HashMap<String, usize> = g
            .incoming
            .iter()
            .map(|(k, v)| (k.clone(), v.len()))
            .collect();
        let mut queue: VecDeque<String> = remaining
            .iter()
            .filter(|(_, v)| **v == 0)
            .map(|(k, _)| k.clone())
            .collect();
        // Kahn's algorithm produces a topological order.
        while let Some(pkg) = queue.pop_front() {
            topo_order.push(pkg.clone());
            if let Some(rdeps) = g.reverse_deps.get(&pkg) {
                for rdep in rdeps {
                    if let Some(c) = remaining.get_mut(rdep) {
                        *c -= 1;
                        if *c == 0 {
                            queue.push_back(rdep.clone());
                        }
                    }
                }
            }
        }
        for pkg in &topo_order {
            let pt = history.timings.get(pkg);
            let pkg_secs = if let Some(pt) = pt {
                let pre = pt.overhead_pre_ms.div_ceil(1000) as u64;
                let post = pt.overhead_post_ms.div_ceil(1000) as u64;
                let conf_wall = if pt.cpu_configure_ms > pt.configure_ms && pt.history_jobs > 1 {
                    let hj = pt.history_jobs as f64;
                    let serial = ((pt.configure_ms as f64 - pt.cpu_configure_ms as f64 / hj)
                        / (1.0 - 1.0 / hj))
                        .max(0.0);
                    let parallel = (pt.cpu_configure_ms as f64 - serial).max(0.0);
                    ((serial + parallel / 16.0) / 1000.0).ceil().max(1.0) as u64
                } else {
                    pt.configure_ms.div_ceil(1000) as u64
                };
                let build_wall = if pt.cpu_build_ms > pt.build_ms && pt.history_jobs > 1 {
                    let hj = pt.history_jobs as f64;
                    let serial = ((pt.build_ms as f64 - pt.cpu_build_ms as f64 / hj)
                        / (1.0 - 1.0 / hj))
                        .max(0.0);
                    let parallel = (pt.cpu_build_ms as f64 - serial).max(0.0);
                    ((serial + parallel / 16.0) / 1000.0).ceil().max(1.0) as u64
                } else {
                    pt.build_ms.div_ceil(1000) as u64
                };
                pre + conf_wall + build_wall + post
            } else {
                // No history for this package: charge a flat 5 seconds.
                5
            };
            // Longest finishing time among the package's dependencies.
            let dep_max = g
                .incoming
                .get(pkg)
                .map(|deps| {
                    deps.iter()
                        .filter_map(|d| path_time.get(d))
                        .max()
                        .copied()
                        .unwrap_or(0)
                })
                .unwrap_or(0);
            path_time.insert(pkg.clone(), dep_max + pkg_secs);
        }
        let critical = path_time.values().max().copied().unwrap_or(0);
        eprintln!(
            "Theoretical minimum (critical path @16 jobs): {}\n",
            fmt_time(critical)
        );
    }
    // The first experiment (ACTUAL) becomes the "vs base" baseline.
    let mut baseline_ticks = 0u64;
    for exp in &experiments {
        let result = run_sim(&SimConfig {
            build_threads: exp.threads,
            max_jobs: exp.max_jobs,
            graph: Some(&g),
            unsafe_pkgs: &history.unsafe_pkgs,
            timings: Some(&history.timings),
            weights: if exp.use_weights {
                weights.clone()
            } else {
                HashMap::new()
            },
            verbose: false,
            min_utilization: 0.0,
            fixed_jobs: exp.fixed_jobs,
            jobs_override: exp.jobs_override.clone(),
        });
        if baseline_ticks == 0 {
            baseline_ticks = result.ticks;
        }
        let diff = result.ticks as i64 - baseline_ticks as i64;
        let diff_str = if diff == 0 {
            "--".to_string()
        } else if diff > 0 {
            format!("+{}s", diff)
        } else {
            format!("{}s", diff)
        };
        let oc_str = if result.overcommit_ticks > 0 {
            format!("{:>5}s", result.overcommit_ticks)
        } else {
            " -".to_string()
        };
        eprintln!(
            "{:<55} {:>8} {:>8} {:>5.1}% {:>5} {}",
            exp.label,
            fmt_time(result.ticks),
            diff_str,
            result.utilization,
            result.peak_cores,
            oc_str,
        );
    }
    eprintln!();
}
// bulk-large scenario with synthetic timings and no unsafe packages.
#[test]
fn depgraph_make_jobs_bulk_large() {
    let g = load_depgraph_zst(BULK_LARGE_DEPGRAPH);
    eprintln!(
        "\nbulk-large build: {} packages, {} edges",
        g.pkg_count, g.edge_count
    );
    run_make_jobs_sim(4, 16, Some(&g), &HashSet::new(), None);
}
// bulk-large scenario driven by its recorded build history (timings and
// unsafe-package list both come from the history file).
#[test]
fn depgraph_make_jobs_bulk_large_timed() {
    let g = load_depgraph_zst(BULK_LARGE_DEPGRAPH);
    let history = load_history(BULK_LARGE_HISTORY);
    let unsafe_list: Vec<&str> = history.unsafe_pkgs.iter().map(|s| s.as_str()).collect();
    eprintln!(
        "\nbulk-large build (timed): {} packages, {} edges, {} timings, {} unsafe {:?}",
        g.pkg_count,
        g.edge_count,
        history.timings.len(),
        history.unsafe_pkgs.len(),
        unsafe_list
    );
    run_make_jobs_sim(
        4,
        16,
        Some(&g),
        &history.unsafe_pkgs,
        Some(&history.timings),
    );
}
// Diagnostic run for bulk-large: per-tick trace plus summary, with the
// utilization floor disabled so the run cannot fail on that assert.
#[test]
fn depgraph_make_jobs_bulk_large_verbose() {
    let g = load_depgraph_zst(BULK_LARGE_DEPGRAPH);
    let history = load_history(BULK_LARGE_HISTORY);
    let weights = weights_from_history(&history.timings);
    run_sim(&SimConfig {
        build_threads: 4,
        max_jobs: 16,
        graph: Some(&g),
        unsafe_pkgs: &history.unsafe_pkgs,
        timings: Some(&history.timings),
        weights,
        verbose: true,
        min_utilization: 0.0,
        fixed_jobs: None,
        jobs_override: None,
    });
}
// What-if table for the bulk-large build: same policy matrix as the mutt
// experiments, replayed against this scenario's graph and history.
#[test]
fn depgraph_make_jobs_bulk_large_experiments() {
    let g = load_depgraph_zst(BULK_LARGE_DEPGRAPH);
    let history = load_history(BULK_LARGE_HISTORY);
    let weights = weights_from_history(&history.timings);
    eprintln!(
        "\n=== bulk-large build experiments ({} packages) ===\n",
        g.pkg_count
    );
    eprintln!(
        "{:<55} {:>8} {:>8} {:>6} {:>5} {:>6}",
        "Configuration", "Wall", "vs base", "Util%", "Peak", ">16"
    );
    eprintln!("{}", "-".repeat(95));
    // One table row: a scheduler configuration to simulate.
    struct Experiment {
        label: &'static str,
        threads: usize,
        max_jobs: usize,
        use_weights: bool,
        fixed_jobs: Option<usize>,
        jobs_override: Option<HashMap<String, usize>>,
    }
    // Shorthand for a score-based per-package override table.
    let ov = |tf: f64, wf: f64, pct: f64, base: usize, boost: usize| {
        compute_jobs_override(
            &g.incoming,
            &history.timings,
            &weights,
            tf,
            wf,
            pct,
            base,
            boost,
        )
    };
    let experiments = [
        Experiment {
            label: "ACTUAL: fixed 4 workers x MAKE_JOBS=4",
            threads: 4,
            max_jobs: 16,
            use_weights: false,
            fixed_jobs: Some(4),
            jobs_override: None,
        },
        Experiment {
            label: "dynamic scheduler (score-based, bulk)",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: None,
        },
        Experiment {
            label: "time: top 10% -> 8, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(1.0, 0.0, 90.0, 4, 8)),
        },
        Experiment {
            label: "time: top 10% -> 16, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(1.0, 0.0, 90.0, 4, 16)),
        },
        Experiment {
            label: "weight: top 10% -> 8, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(0.0, 1.0, 90.0, 4, 8)),
        },
        Experiment {
            label: "weight: top 10% -> 16, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(0.0, 1.0, 90.0, 4, 16)),
        },
        Experiment {
            label: "combined: top 10% -> 8, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(1.0, 1.0, 90.0, 4, 8)),
        },
        Experiment {
            label: "combined: top 10% -> 16, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(1.0, 1.0, 90.0, 4, 16)),
        },
        Experiment {
            label: "combined: top 5% -> 16, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(1.0, 1.0, 95.0, 4, 16)),
        },
        Experiment {
            label: "combined: top 20% -> 8, rest 4",
            threads: 4,
            max_jobs: 16,
            use_weights: true,
            fixed_jobs: None,
            jobs_override: Some(ov(1.0, 1.0, 80.0, 4, 8)),
        },
    ];
    {
        // Critical-path lower bound: longest dependency chain when every
        // parallel phase is rescaled to 16 jobs (same Amdahl-style split
        // as run_sim's get_ticks).
        let mut path_time: HashMap<String, u64> = HashMap::new();
        let mut topo_order: Vec<String> = Vec::new();
        let mut remaining: HashMap<String, usize> = g
            .incoming
            .iter()
            .map(|(k, v)| (k.clone(), v.len()))
            .collect();
        let mut queue: VecDeque<String> = remaining
            .iter()
            .filter(|(_, v)| **v == 0)
            .map(|(k, _)| k.clone())
            .collect();
        // Kahn's algorithm produces a topological order.
        while let Some(pkg) = queue.pop_front() {
            topo_order.push(pkg.clone());
            if let Some(rdeps) = g.reverse_deps.get(&pkg) {
                for rdep in rdeps {
                    if let Some(c) = remaining.get_mut(rdep) {
                        *c -= 1;
                        if *c == 0 {
                            queue.push_back(rdep.clone());
                        }
                    }
                }
            }
        }
        for pkg in &topo_order {
            let pt = history.timings.get(pkg);
            let pkg_secs = if let Some(pt) = pt {
                let pre = pt.overhead_pre_ms.div_ceil(1000) as u64;
                let post = pt.overhead_post_ms.div_ceil(1000) as u64;
                let conf_wall = if pt.cpu_configure_ms > pt.configure_ms && pt.history_jobs > 1 {
                    let hj = pt.history_jobs as f64;
                    let serial = ((pt.configure_ms as f64 - pt.cpu_configure_ms as f64 / hj)
                        / (1.0 - 1.0 / hj))
                        .max(0.0);
                    let parallel = (pt.cpu_configure_ms as f64 - serial).max(0.0);
                    ((serial + parallel / 16.0) / 1000.0).ceil().max(1.0) as u64
                } else {
                    pt.configure_ms.div_ceil(1000) as u64
                };
                let build_wall = if pt.cpu_build_ms > pt.build_ms && pt.history_jobs > 1 {
                    let hj = pt.history_jobs as f64;
                    let serial = ((pt.build_ms as f64 - pt.cpu_build_ms as f64 / hj)
                        / (1.0 - 1.0 / hj))
                        .max(0.0);
                    let parallel = (pt.cpu_build_ms as f64 - serial).max(0.0);
                    ((serial + parallel / 16.0) / 1000.0).ceil().max(1.0) as u64
                } else {
                    pt.build_ms.div_ceil(1000) as u64
                };
                pre + conf_wall + build_wall + post
            } else {
                // No history for this package: charge a flat 5 seconds.
                5
            };
            // Longest finishing time among the package's dependencies.
            let dep_max = g
                .incoming
                .get(pkg)
                .map(|deps| {
                    deps.iter()
                        .filter_map(|d| path_time.get(d))
                        .max()
                        .copied()
                        .unwrap_or(0)
                })
                .unwrap_or(0);
            path_time.insert(pkg.clone(), dep_max + pkg_secs);
        }
        let critical = path_time.values().max().copied().unwrap_or(0);
        eprintln!(
            "Theoretical minimum (critical path @16 jobs): {}\n",
            fmt_time(critical)
        );
    }
    // The first experiment (ACTUAL) becomes the "vs base" baseline.
    let mut baseline_ticks = 0u64;
    for exp in &experiments {
        let result = run_sim(&SimConfig {
            build_threads: exp.threads,
            max_jobs: exp.max_jobs,
            graph: Some(&g),
            unsafe_pkgs: &history.unsafe_pkgs,
            timings: Some(&history.timings),
            weights: if exp.use_weights {
                weights.clone()
            } else {
                HashMap::new()
            },
            verbose: false,
            min_utilization: 0.0,
            fixed_jobs: exp.fixed_jobs,
            jobs_override: exp.jobs_override.clone(),
        });
        if baseline_ticks == 0 {
            baseline_ticks = result.ticks;
        }
        let diff = result.ticks as i64 - baseline_ticks as i64;
        let diff_str = if diff == 0 {
            "--".to_string()
        } else if diff > 0 {
            format!("+{}s", diff)
        } else {
            format!("{}s", diff)
        };
        let oc_str = if result.overcommit_ticks > 0 {
            format!("{:>5}s", result.overcommit_ticks)
        } else {
            " -".to_string()
        };
        eprintln!(
            "{:<55} {:>8} {:>8} {:>5.1}% {:>5} {}",
            exp.label,
            fmt_time(result.ticks),
            diff_str,
            result.utilization,
            result.peak_cores,
            oc_str,
        );
    }
    eprintln!();
}
// Benchmark-style sweep (results are printed via eprintln, nothing is
// asserted): explores per-package MAKE_JOBS override policies on two
// recorded dependency graphs ("mutt" and "bulk-large") and ranks each
// policy by the geometric mean of its speedup over a fixed-jobs baseline.
#[test]
fn depgraph_make_jobs_sweep() {
// Load both graphs plus their recorded timing history and derived weights.
let mutt_g = load_depgraph_zst(MUTT_DEPGRAPH);
let mutt_h = load_history(MUTT_HISTORY);
let mutt_w = weights_from_history(&mutt_h.timings);
let bulk_g = load_depgraph_zst(BULK_LARGE_DEPGRAPH);
let bulk_h = load_history(BULK_LARGE_HISTORY);
let bulk_w = weights_from_history(&bulk_h.timings);
// Run one simulation (build_threads=4, max_jobs=16) and return its tick
// count. With no override map every package gets fixed_jobs=4 (that is
// the baseline configuration); with an override the per-package map
// decides and fixed_jobs is disabled.
let run_one = |g: &DepGraph,
h: &HistoryData,
w: &HashMap<String, usize>,
ovr: Option<HashMap<String, usize>>|
-> u64 {
run_sim(&SimConfig {
build_threads: 4,
max_jobs: 16,
graph: Some(g),
unsafe_pkgs: &h.unsafe_pkgs,
timings: Some(&h.timings),
weights: w.clone(),
verbose: false,
min_utilization: 0.0,
fixed_jobs: if ovr.is_none() { Some(4) } else { None },
jobs_override: ovr,
})
.ticks
};
// Fixed-jobs baselines that every experiment below is measured against.
let mutt_base = run_one(&mutt_g, &mutt_h, &mutt_w, None);
let bulk_base = run_one(&bulk_g, &bulk_h, &bulk_w, None);
eprintln!(
"\nBaselines: mutt={} bulk-large={}",
fmt_time(mutt_base),
fmt_time(bulk_base)
);
// One row per experiment; geo_mean is the geometric mean of the two
// per-graph speedup ratios (baseline_ticks / experiment_ticks).
struct Row {
label: String,
mutt_ticks: u64,
bulk_ticks: u64,
geo_mean: f64,
}
let mut results: Vec<Row> = Vec::new();
// Sweep 1: single-threshold policies. Packages scoring above the given
// percentile are boosted to `boost` jobs; everyone else stays at 4.
let percentiles = [80.0, 85.0, 90.0, 95.0];
let boosts = [6, 8, 10, 12, 16];
// (label, time factor, weight factor) pairs for the scoring function:
// score = build_time * tf + weight * wf.
let factors: &[(&str, f64, f64)] = &[
("time", 1.0, 0.0),
("weight", 0.0, 1.0),
("comb", 1.0, 1.0),
("comb2:1", 2.0, 1.0),
("comb1:2", 1.0, 2.0),
];
for &(fname, tf, wf) in factors {
for &pct in &percentiles {
for &boost in &boosts {
let label = format!("{} p{:.0} -> {}", fname, pct, boost);
let mutt_ovr = compute_jobs_override(
&mutt_g.incoming,
&mutt_h.timings,
&mutt_w,
tf,
wf,
pct,
4,
boost,
);
let bulk_ovr = compute_jobs_override(
&bulk_g.incoming,
&bulk_h.timings,
&bulk_w,
tf,
wf,
pct,
4,
boost,
);
let mt = run_one(&mutt_g, &mutt_h, &mutt_w, Some(mutt_ovr));
let bt = run_one(&bulk_g, &bulk_h, &bulk_w, Some(bulk_ovr));
// Ratios > 1.0 mean the experiment beat its baseline.
let mutt_ratio = mutt_base as f64 / mt as f64;
let bulk_ratio = bulk_base as f64 / bt as f64;
let geo = (mutt_ratio * bulk_ratio).sqrt();
results.push(Row {
label,
mutt_ticks: mt,
bulk_ticks: bt,
geo_mean: geo,
});
}
}
}
// Sweep 2: graduated (tiered) policies — instead of one threshold,
// assign jobs by score band. Two tier schemes are tried per factor.
for &(fname, tf, wf) in factors {
// Scheme A: top 5% -> 16 jobs, next 10% -> 8 jobs, rest -> 4.
let label = format!("{} graduated 5/15/rest", fname);
let grad = |pkgs: &HashMap<String, HashSet<String>>,
timings: &HashMap<String, PkgTiming>,
weights: &HashMap<String, usize>|
-> HashMap<String, usize> {
// Score each package: build seconds * tf + weight * wf.
// A package with no timing scores 0s of build time; a package
// with no weight entry defaults to weight 100.
let mut scores: Vec<(String, f64)> = pkgs
.keys()
.map(|pkg| {
let build_secs = timings
.get(pkg)
.map(|t| t.build_ms as f64 / 1000.0)
.unwrap_or(0.0);
let w = weights.get(pkg).copied().unwrap_or(100) as f64;
(pkg.clone(), build_secs * tf + w * wf)
})
.collect();
scores.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
// Percentile cutoffs read straight from the sorted score list;
// f64::MAX means "no package qualifies" for an out-of-range index.
let p85 = scores
.get(scores.len() * 85 / 100)
.map(|x| x.1)
.unwrap_or(f64::MAX);
let p95 = scores
.get(scores.len() * 95 / 100)
.map(|x| x.1)
.unwrap_or(f64::MAX);
scores
.into_iter()
.map(|(pkg, score)| {
let jobs = if score >= p95 {
16
} else if score >= p85 {
8
} else {
4
};
(pkg, jobs)
})
.collect()
};
let mt = run_one(
&mutt_g,
&mutt_h,
&mutt_w,
Some(grad(&mutt_g.incoming, &mutt_h.timings, &mutt_w)),
);
let bt = run_one(
&bulk_g,
&bulk_h,
&bulk_w,
Some(grad(&bulk_g.incoming, &bulk_h.timings, &bulk_w)),
);
let geo = ((mutt_base as f64 / mt as f64) * (bulk_base as f64 / bt as f64)).sqrt();
results.push(Row {
label,
mutt_ticks: mt,
bulk_ticks: bt,
geo_mean: geo,
});
// Scheme B: four bands — top 5% -> 16, next 5% -> 12, next 10% -> 8,
// rest -> 4 (cutoffs at p95/p90/p80).
let label2 = format!("{} graduated 5/10/20/rest", fname);
let grad2 = |pkgs: &HashMap<String, HashSet<String>>,
timings: &HashMap<String, PkgTiming>,
weights: &HashMap<String, usize>|
-> HashMap<String, usize> {
// Same scoring as `grad` above, only the tiering differs.
let mut scores: Vec<(String, f64)> = pkgs
.keys()
.map(|pkg| {
let build_secs = timings
.get(pkg)
.map(|t| t.build_ms as f64 / 1000.0)
.unwrap_or(0.0);
let w = weights.get(pkg).copied().unwrap_or(100) as f64;
(pkg.clone(), build_secs * tf + w * wf)
})
.collect();
scores.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
let p80 = scores
.get(scores.len() * 80 / 100)
.map(|x| x.1)
.unwrap_or(f64::MAX);
let p90 = scores
.get(scores.len() * 90 / 100)
.map(|x| x.1)
.unwrap_or(f64::MAX);
let p95 = scores
.get(scores.len() * 95 / 100)
.map(|x| x.1)
.unwrap_or(f64::MAX);
scores
.into_iter()
.map(|(pkg, score)| {
let jobs = if score >= p95 {
16
} else if score >= p90 {
12
} else if score >= p80 {
8
} else {
4
};
(pkg, jobs)
})
.collect()
};
let mt2 = run_one(
&mutt_g,
&mutt_h,
&mutt_w,
Some(grad2(&mutt_g.incoming, &mutt_h.timings, &mutt_w)),
);
let bt2 = run_one(
&bulk_g,
&bulk_h,
&bulk_w,
Some(grad2(&bulk_g.incoming, &bulk_h.timings, &bulk_w)),
);
let geo2 = ((mutt_base as f64 / mt2 as f64) * (bulk_base as f64 / bt2 as f64)).sqrt();
results.push(Row {
label: label2,
mutt_ticks: mt2,
bulk_ticks: bt2,
geo_mean: geo2,
});
}
// Rank all experiments best-first by geometric-mean speedup and print
// a table with per-graph ticks and percentage improvement.
results.sort_by(|a, b| {
b.geo_mean
.partial_cmp(&a.geo_mean)
.unwrap_or(std::cmp::Ordering::Equal)
});
eprintln!(
"\n{:<40} {:>8} {:>7} {:>10} {:>7} {:>6}",
"Configuration", "mutt", "mutt%", "bulk", "bulk%", "geo"
);
eprintln!("{}", "-".repeat(85));
for r in &results {
// Positive percentages mean faster than the fixed-jobs baseline.
let mutt_pct = (1.0 - r.mutt_ticks as f64 / mutt_base as f64) * 100.0;
let bulk_pct = (1.0 - r.bulk_ticks as f64 / bulk_base as f64) * 100.0;
eprintln!(
"{:<40} {:>8} {:>+6.1}% {:>10} {:>+6.1}% {:>5.3}",
r.label,
fmt_time(r.mutt_ticks),
mutt_pct,
fmt_time(r.bulk_ticks),
bulk_pct,
r.geo_mean,
);
}
eprintln!();
}
// Drive a build in which the single most-depended-on package fails.
#[test]
fn depgraph_single_failure() {
    let graph = load_depgraph();
    let failing: HashSet<String> = HashSet::from_iter(most_depended_on(graph, 1));
    run_build(4, &failing);
}
// Drive a build in which the three most-depended-on packages fail.
#[test]
fn depgraph_multi_failure() {
    let graph = load_depgraph();
    let failing: HashSet<String> = HashSet::from_iter(most_depended_on(graph, 3));
    run_build(32, &failing);
}
// Verify the scheduler's tie-break: among packages emitted with equal
// total_pbulk_weight, dep_count must be non-increasing in emission order.
#[test]
fn dep_count_breaks_pbulk_weight_tie() {
    let g = load_depgraph();
    let mut sched = new_scheduler(g);
    // Drain the whole schedule, recording (pkg, total_pbulk_weight, dep_count)
    // in the order packages are handed out.
    let mut order: Vec<(String, usize, usize)> = Vec::new();
    while let Poll::Ready(Some(sp)) = sched.poll() {
        let entry = (sp.pkg.clone(), sp.total_pbulk_weight, sp.dep_count);
        sched.mark_success(&sp.pkg);
        order.push(entry);
    }
    // Checking adjacent pairs within each equal-weight run is the same as
    // checking every adjacent pair whose weights happen to be equal.
    for pair in order.windows(2) {
        let (pkg_a, tw_a, dc_a) = &pair[0];
        let (pkg_b, tw_b, dc_b) = &pair[1];
        if tw_a == tw_b {
            assert!(
                dc_a >= dc_b,
                "pbulk_weight={}: {} (deps={}) before {} (deps={})",
                tw_a,
                pkg_a,
                dc_a,
                pkg_b,
                dc_b
            );
        }
    }
}
// Three independent roots with very different CPU histories should split the
// 32-core budget fully and in proportion to their recorded cost.
#[test]
fn tail_proportional_allocation() {
    use bob::makejobs::Allocator;
    let packages: HashMap<String, PackageNode> = ["A", "B", "C"]
        .iter()
        .map(|name| {
            (
                name.to_string(),
                PackageNode {
                    deps: HashSet::new(),
                    pbulk_weight: 1,
                    cpu_time: 0,
                },
            )
        })
        .collect();
    let mut sched = Scheduler::from_graph(packages);
    for (pkg, cpu) in [("A", 100_000), ("B", 1_000), ("C", 100)] {
        sched.set_pkg_cpu_history(&pkg.to_string(), cpu);
    }
    sched.set_allocator(Allocator::new(8, 32));
    let mut allocations = Vec::with_capacity(3);
    for _ in 0..3 {
        if let Poll::Ready(Some(sp)) = sched.poll() {
            allocations.push((sp.pkg.clone(), sp.make_jobs.allocated().unwrap_or(0)));
        }
    }
    // The whole budget must be handed out.
    let total: usize = allocations.iter().map(|(_, jobs)| jobs).sum();
    assert_eq!(total, 32, "all cores allocated: {allocations:?}");
    // Nobody drops below the minimum.
    for (pkg, jobs) in &allocations {
        assert!(
            *jobs >= 2,
            "{pkg} should get at least min_jobs: {allocations:?}"
        );
    }
    // Strictly more cores for strictly heavier history.
    let jobs_for = |name: &str| allocations.iter().find(|(p, _)| p == name).unwrap().1;
    let (a, b, c) = (jobs_for("A"), jobs_for("B"), jobs_for("C"));
    assert!(
        a > b && b > c,
        "heavier packages should get more: A={a} B={b} C={c}"
    );
}
// With exactly one buildable package, the allocator should hand it the full
// 32-job budget rather than a proportional share.
#[test]
fn sole_builder_gets_all_jobs() {
    use bob::makejobs::Allocator;
    let mut packages = HashMap::new();
    packages.insert(
        "only".to_string(),
        PackageNode {
            deps: HashSet::new(),
            pbulk_weight: 1,
            cpu_time: 0,
        },
    );
    let mut sched = Scheduler::from_graph(packages);
    sched.set_pkg_cpu_history(&"only".to_string(), 100);
    sched.set_allocator(Allocator::new(8, 32));
    match sched.poll() {
        Poll::Ready(Some(sp)) => assert_eq!(
            sp.make_jobs.allocated(),
            Some(32),
            "sole builder should get the entire budget"
        ),
        _ => panic!("expected a package from poll"),
    }
}
// Root "D" gates X and Y; after D is marked successful, both dependents
// become ready together and must split the budget evenly.
#[test]
fn tail_after_mark_success() {
    use bob::makejobs::Allocator;
    let mut packages = HashMap::new();
    packages.insert(
        "D".to_string(),
        PackageNode {
            deps: HashSet::new(),
            pbulk_weight: 1,
            cpu_time: 0,
        },
    );
    for name in ["X", "Y"] {
        packages.insert(
            name.to_string(),
            PackageNode {
                deps: HashSet::from(["D".to_string()]),
                pbulk_weight: 1,
                cpu_time: 0,
            },
        );
    }
    let mut sched = Scheduler::from_graph(packages);
    sched.set_allocator(Allocator::new(8, 32));
    sched.mark_success(&"D".to_string());
    let mut allocations = Vec::with_capacity(2);
    for _ in 0..2 {
        if let Poll::Ready(Some(sp)) = sched.poll() {
            allocations.push((sp.pkg.clone(), sp.make_jobs.allocated().unwrap_or(0)));
        }
    }
    // Budget fully allocated, then split equally between identical packages.
    let total: usize = allocations.iter().map(|(_, jobs)| jobs).sum();
    assert_eq!(total, 32, "all cores allocated: {allocations:?}");
    let jobs_for = |name: &str| allocations.iter().find(|(p, _)| p == name).unwrap().1;
    let (x, y) = (jobs_for("X"), jobs_for("Y"));
    assert_eq!(x, y, "equal packages should get equal share: X={x} Y={y}");
}
// Root "D" gates A/B/C; once D succeeds the three dependents become ready at
// the same time with very different CPU histories, and the 32-job budget is
// split proportionally — pinned here to the exact expected shares.
#[test]
fn tail_after_mark_success_weighted() {
    use bob::makejobs::Allocator;
    let node = |deps: HashSet<String>| PackageNode {
        deps,
        pbulk_weight: 1,
        cpu_time: 0,
    };
    let mut packages = HashMap::new();
    packages.insert("D".to_string(), node(HashSet::new()));
    for name in ["A", "B", "C"] {
        packages.insert(name.to_string(), node(HashSet::from(["D".to_string()])));
    }
    let mut sched = Scheduler::from_graph(packages);
    for (pkg, cpu) in [("A", 100_000), ("B", 1_000), ("C", 100)] {
        sched.set_pkg_cpu_history(&pkg.to_string(), cpu);
    }
    sched.set_allocator(Allocator::new(8, 32));
    sched.mark_success(&"D".to_string());
    let mut allocations = Vec::with_capacity(3);
    for _ in 0..3 {
        if let Poll::Ready(Some(sp)) = sched.poll() {
            allocations.push((sp.pkg.clone(), sp.make_jobs.allocated().unwrap_or(0)));
        }
    }
    let total: usize = allocations.iter().map(|(_, jobs)| jobs).sum();
    assert_eq!(total, 32, "all cores allocated: {allocations:?}");
    let jobs_for = |name: &str| allocations.iter().find(|(p, _)| p == name).unwrap().1;
    let (a, b, c) = (jobs_for("A"), jobs_for("B"), jobs_for("C"));
    assert_eq!((a, b, c), (19, 9, 4), "A={a} B={b} C={c}");
}
// A 4-core budget cannot cover three builders at two jobs each; the
// allocator must overcommit rather than starve anyone below the minimum.
#[test]
fn tail_overcommitted_budget() {
    use bob::makejobs::Allocator;
    let packages: HashMap<String, PackageNode> = ["A", "B", "C"]
        .iter()
        .map(|name| {
            (
                name.to_string(),
                PackageNode {
                    deps: HashSet::new(),
                    pbulk_weight: 1,
                    cpu_time: 0,
                },
            )
        })
        .collect();
    let mut sched = Scheduler::from_graph(packages);
    sched.set_allocator(Allocator::new(2, 4));
    for _ in 0..3 {
        if let Poll::Ready(Some(sp)) = sched.poll() {
            assert!(
                sp.make_jobs.allocated().unwrap_or(0) >= 2,
                "{} got {} jobs",
                sp.pkg,
                sp.make_jobs.allocated().unwrap_or(0),
            );
        }
    }
}