use super::*;
/// Verifies that every worker ran only on CPUs in `expected`.
///
/// For each report, the CPUs in `cpus_used` that are not members of
/// `expected` are gathered. A non-empty set is recorded as one `Isolation`
/// failure naming the worker's tid. Workers that stayed in bounds add nothing.
pub fn assert_isolation(reports: &[WorkerReport], expected: &BTreeSet<usize>) -> AssertResult {
    let mut result = AssertResult::pass();
    for report in reports {
        let stray: BTreeSet<usize> = report
            .cpus_used
            .iter()
            .copied()
            .filter(|cpu| !expected.contains(cpu))
            .collect();
        if stray.is_empty() {
            continue;
        }
        result.record_fail(AssertDetail::new(
            DetailKind::Isolation,
            format!("tid {} ran on unexpected CPUs {:?}", report.tid, stray),
        ));
    }
    result
}
/// Nearest-rank percentile of an ascending-sorted slice.
///
/// `p` is a fraction (e.g. `0.99` for p99). The element at 1-based rank
/// `ceil(len * p)` is returned, with the rank clamped into the slice, so
/// `p <= 0` yields the first element and `p >= 1` the last. An empty slice
/// yields `0`. Debug builds assert that `sorted` is non-decreasing.
pub(crate) fn percentile(sorted: &[u64], p: f64) -> u64 {
    let Some(last) = sorted.len().checked_sub(1) else {
        return 0;
    };
    debug_assert!(
        sorted.windows(2).all(|w| w[0] <= w[1]),
        "percentile() requires sorted input; got slice with out-of-order pair",
    );
    // Float-to-int `as` saturates (negatives and NaN become 0), so the rank
    // can only underflow to 0, which saturating_sub handles.
    let rank = (sorted.len() as f64 * p).ceil() as usize;
    let index = rank.saturating_sub(1).min(last);
    sorted[index]
}
/// Aggregates per-worker reports into a single [`CgroupStats`] summary.
///
/// * `cpus_used` is the union of every worker's CPUs.
/// * Off-CPU percentage (`off_cpu_ns / wall_time_ns * 100`) is computed only
///   for workers with non-zero wall time. The min, max, mean and max−min
///   spread are `None` when no worker qualifies.
/// * The worst gap comes from the report with the largest `max_gap_ms`
///   (`(0, 0)` when `reports` is empty).
/// * Wake latencies from all workers are pooled. From the pool come the
///   nearest-rank p99 and median (in microseconds) and the population
///   coefficient of variation. All three are `0.0` when there are no samples.
/// * Run delay is reported in microseconds as a per-worker mean and maximum.
/// * `migration_ratio` is total migrations per total iteration (0.0 when no
///   iterations were recorded).
/// * `cross_node_migration_ratio` divides the *largest* per-worker
///   `vmstat_numa_pages_migrated` by the summed NUMA page counts.
/// * `page_locality` is left at `0.0`, `cgroup_name` is empty, and
///   `ext_metrics` is empty.
pub fn cgroup_stats(reports: &[WorkerReport]) -> CgroupStats {
    let cpus: BTreeSet<usize> = reports
        .iter()
        .flat_map(|w| w.cpus_used.iter().copied())
        .collect();

    // Workers with zero wall time cannot be measured; they are left out
    // rather than counted as 0% off-CPU.
    let pcts: Vec<f64> = reports
        .iter()
        .filter(|w| w.wall_time_ns > 0)
        .map(|w| w.off_cpu_ns as f64 / w.wall_time_ns as f64 * 100.0)
        .collect();
    let min = pcts.iter().cloned().reduce(f64::min);
    let max = pcts.iter().cloned().reduce(f64::max);
    let avg = if pcts.is_empty() {
        None
    } else {
        Some(pcts.iter().sum::<f64>() / pcts.len() as f64)
    };
    let spread = match (min, max) {
        (Some(lo), Some(hi)) => Some(hi - lo),
        _ => None,
    };

    let worst_gap = reports.iter().max_by_key(|w| w.max_gap_ms);
    let (gap_ms, gap_cpu) = worst_gap
        .map(|w| (w.max_gap_ms, w.max_gap_cpu))
        .unwrap_or((0, 0));

    // Pool every worker's wake-latency samples. The pool is sorted in place
    // for the percentile lookups instead of cloning it first. Mean and
    // variance depend only on the set of samples, not their order (up to
    // floating-point rounding).
    let mut all_latencies: Vec<u64> = reports
        .iter()
        .flat_map(|w| w.wake_latencies_ns.iter().copied())
        .collect();
    let (p99_us, median_us, lat_cv) = if all_latencies.is_empty() {
        (0.0, 0.0, 0.0)
    } else {
        all_latencies.sort_unstable();
        let p99 = percentile(&all_latencies, 0.99) as f64 / 1000.0;
        let median = percentile(&all_latencies, 0.5) as f64 / 1000.0;
        let n = all_latencies.len() as f64;
        let mean_ns = all_latencies.iter().sum::<u64>() as f64 / n;
        // Population CV (stddev / mean). An all-zero pool reports 0.0
        // instead of dividing by zero.
        let cv = if mean_ns > 0.0 {
            let variance = all_latencies
                .iter()
                .map(|&v| (v as f64 - mean_ns).powi(2))
                .sum::<f64>()
                / n;
            variance.sqrt() / mean_ns
        } else {
            0.0
        };
        (p99, median, cv)
    };

    let total_iters: u64 = reports.iter().map(|w| w.iterations).sum();
    let run_delays: Vec<f64> = reports
        .iter()
        .map(|w| w.schedstat_run_delay_ns as f64 / 1000.0)
        .collect();
    let mean_run_delay = if run_delays.is_empty() {
        0.0
    } else {
        run_delays.iter().sum::<f64>() / run_delays.len() as f64
    };
    let worst_run_delay = run_delays.iter().cloned().reduce(f64::max).unwrap_or(0.0);

    let total_mig: u64 = reports.iter().map(|w| w.migration_count).sum();
    let mig_ratio = if total_iters > 0 {
        total_mig as f64 / total_iters as f64
    } else {
        0.0
    };

    let total_numa_pages: u64 = reports
        .iter()
        .map(|w| w.numa_pages.values().sum::<u64>())
        .sum();
    // NOTE(review): max rather than sum. Presumably the vmstat counter is
    // host-wide, so every worker observes the same value. Confirm.
    let migrated_pages: u64 = reports
        .iter()
        .map(|w| w.vmstat_numa_pages_migrated)
        .max()
        .unwrap_or(0);
    let cross_node_ratio = if total_numa_pages > 0 {
        migrated_pages as f64 / total_numa_pages as f64
    } else {
        0.0
    };

    CgroupStats {
        cgroup_name: String::new(),
        num_workers: reports.len(),
        num_cpus: cpus.len(),
        cpus_used: cpus,
        avg_off_cpu_pct: avg,
        min_off_cpu_pct: min,
        max_off_cpu_pct: max,
        spread,
        max_gap_ms: gap_ms,
        max_gap_cpu: gap_cpu,
        total_migrations: total_mig,
        migration_ratio: mig_ratio,
        p99_wake_latency_us: p99_us,
        median_wake_latency_us: median_us,
        wake_latency_cv: lat_cv,
        total_iterations: total_iters,
        total_cpu_time_ns: reports.iter().map(|w| w.schedstat_cpu_time_ns).sum(),
        mean_run_delay_us: mean_run_delay,
        worst_run_delay_us: worst_run_delay,
        page_locality: 0.0,
        cross_node_migration_ratio: cross_node_ratio,
        ext_metrics: BTreeMap::new(),
    }
}
/// Builds the mergeable per-phase statistics for one cgroup from its workers.
///
/// All fields are accumulated in a single pass over `reports`:
/// * `cpus_used` is the union of CPUs.
/// * `off_cpu_pcts` holds one entry per worker with non-zero wall time.
/// * Wake latencies are pooled through `crate::workload::reservoir_push`,
///   capped at `crate::workload::MAX_WAKE_SAMPLES`.
/// * `wake_sample_total` sums each worker's own reported total.
/// * The worst gap is that of the last worker with the maximal `max_gap_ms`,
///   or `(0, 0)` if there are none.
/// * `numa_pages_local` counts pages on nodes in `expected_nodes` (0 when
///   `None`).
/// * `cross_node_migrated` is the largest per-worker
///   `vmstat_numa_pages_migrated`.
pub(crate) fn phase_cgroup_stats(
    reports: &[WorkerReport],
    expected_nodes: Option<&BTreeSet<usize>>,
) -> PhaseCgroupStats {
    let mut cpus_used = BTreeSet::new();
    let mut off_cpu_pcts = Vec::new();
    let mut wake_latencies_ns: Vec<u64> = Vec::new();
    let mut pooled_wake_count: u64 = 0;
    let mut wake_sample_total = 0u64;
    let mut run_delays_ns = Vec::with_capacity(reports.len());
    let mut worst_gap: Option<&WorkerReport> = None;
    let mut total_migrations = 0u64;
    let mut total_iterations = 0u64;
    let mut total_cpu_time_ns = 0u64;
    let mut numa_pages_total = 0u64;
    let mut numa_pages_local = 0u64;
    let mut cross_node_migrated = 0u64;

    for w in reports {
        cpus_used.extend(w.cpus_used.iter().copied());
        if w.wall_time_ns > 0 {
            off_cpu_pcts.push(w.off_cpu_ns as f64 / w.wall_time_ns as f64 * 100.0);
        }
        // Samples are fed worker by worker, in report order, so the
        // reservoir sees them in the same sequence as a flat walk would.
        for &sample in &w.wake_latencies_ns {
            crate::workload::reservoir_push(
                &mut wake_latencies_ns,
                &mut pooled_wake_count,
                sample,
                crate::workload::MAX_WAKE_SAMPLES,
            );
        }
        wake_sample_total += w.wake_sample_total;
        run_delays_ns.push(w.schedstat_run_delay_ns);
        // `>=` keeps the last of several equal maxima, matching
        // `Iterator::max_by_key`.
        if worst_gap.is_none_or(|g| w.max_gap_ms >= g.max_gap_ms) {
            worst_gap = Some(w);
        }
        total_migrations += w.migration_count;
        total_iterations += w.iterations;
        total_cpu_time_ns += w.schedstat_cpu_time_ns;
        numa_pages_total += w.numa_pages.values().sum::<u64>();
        if let Some(nodes) = expected_nodes {
            numa_pages_local += w
                .numa_pages
                .iter()
                .filter(|&(node, _)| nodes.contains(node))
                .map(|(_, &count)| count)
                .sum::<u64>();
        }
        cross_node_migrated = cross_node_migrated.max(w.vmstat_numa_pages_migrated);
    }

    let (max_gap_ms, max_gap_cpu) = worst_gap
        .map(|w| (w.max_gap_ms, w.max_gap_cpu))
        .unwrap_or((0, 0));

    PhaseCgroupStats {
        num_workers: reports.len(),
        cpus_used,
        wake_latencies_ns,
        wake_sample_total,
        run_delays_ns,
        off_cpu_pcts,
        total_migrations,
        total_iterations,
        total_cpu_time_ns,
        numa_pages_local,
        numa_pages_total,
        cross_node_migrated,
        max_gap_ms,
        max_gap_cpu,
        stripped: false,
    }
}
/// Converts one phase slice into a single-worker [`PhaseCgroupStats`] so
/// that slices can later be merged.
///
/// The off-CPU percentage is included only when the slice's wall time is
/// non-zero. NUMA pages are totalled over all nodes. The local count covers
/// only nodes in `expected_nodes` and is 0 when that is `None`. Every other
/// field is copied from the slice.
pub(crate) fn phase_slice_to_cgroup_stats(
    slice: &crate::workload::PhaseSlice,
    expected_nodes: Option<&BTreeSet<usize>>,
) -> PhaseCgroupStats {
    let mut off_cpu_pcts = Vec::new();
    if slice.wall_ns > 0 {
        off_cpu_pcts.push(slice.off_cpu_ns as f64 / slice.wall_ns as f64 * 100.0);
    }

    let mut numa_pages_total = 0u64;
    let mut numa_pages_local = 0u64;
    for (node, &count) in &slice.numa_pages {
        numa_pages_total += count;
        if expected_nodes.is_some_and(|nodes| nodes.contains(node)) {
            numa_pages_local += count;
        }
    }

    PhaseCgroupStats {
        num_workers: 1,
        cpus_used: slice.cpus_used.clone(),
        wake_latencies_ns: slice.wake_latencies_ns.clone(),
        wake_sample_total: slice.wake_sample_total,
        run_delays_ns: vec![slice.run_delay_ns],
        off_cpu_pcts,
        total_migrations: slice.migration_count,
        total_iterations: slice.iterations,
        total_cpu_time_ns: slice.schedstat_cpu_time_ns,
        numa_pages_local,
        numa_pages_total,
        cross_node_migrated: slice.vmstat_numa_pages_migrated,
        max_gap_ms: slice.max_gap_ms,
        max_gap_cpu: slice.max_gap_cpu,
        stripped: false,
    }
}
/// Pools several phase slices into one [`PhaseCgroupStats`].
///
/// Each slice is converted with `phase_slice_to_cgroup_stats`. The results
/// are combined left to right with `PhaseCgroupStats::merge`. An empty
/// `slices` yields an all-zero record with no workers.
pub(crate) fn pool_phase_slice_stats(
    slices: &[&crate::workload::PhaseSlice],
    expected_nodes: Option<&BTreeSet<usize>>,
) -> PhaseCgroupStats {
    slices
        .iter()
        .map(|slice| phase_slice_to_cgroup_stats(slice, expected_nodes))
        .reduce(PhaseCgroupStats::merge)
        .unwrap_or_else(|| PhaseCgroupStats {
            num_workers: 0,
            cpus_used: BTreeSet::new(),
            wake_latencies_ns: Vec::new(),
            wake_sample_total: 0,
            run_delays_ns: Vec::new(),
            off_cpu_pcts: Vec::new(),
            total_migrations: 0,
            total_iterations: 0,
            total_cpu_time_ns: 0,
            numa_pages_local: 0,
            numa_pages_total: 0,
            cross_node_migrated: 0,
            max_gap_ms: 0,
            max_gap_cpu: 0,
            stripped: false,
        })
}
/// Splits the workers' phase slices into one [`PhaseBucket`] per phase epoch.
///
/// Slices whose `phase_epoch` is `0` or `u32::MAX` are skipped. These are
/// presumably "no phase" / unset sentinels; confirm with the producer. The
/// remaining slices are grouped by epoch. Each bucket, emitted in ascending
/// epoch order, carries the pooled stats for cgroup `name` only. Timing
/// fields are left at placeholders: `start_ms = u64::MAX`, `end_ms = 0`,
/// `sample_count = 0`, and empty metrics.
pub(crate) fn expand_backdrop_phase_buckets(
    name: &str,
    reports: &[WorkerReport],
    expected_nodes: Option<&BTreeSet<usize>>,
) -> Vec<PhaseBucket> {
    let mut by_epoch: std::collections::BTreeMap<u32, Vec<&crate::workload::PhaseSlice>> =
        std::collections::BTreeMap::new();
    let tagged = reports
        .iter()
        .flat_map(|report| report.phase_slices.iter())
        .filter(|slice| !matches!(slice.phase_epoch, 0 | u32::MAX));
    for slice in tagged {
        by_epoch.entry(slice.phase_epoch).or_default().push(slice);
    }

    let mut buckets = Vec::with_capacity(by_epoch.len());
    for (epoch, slices) in by_epoch {
        // NOTE(review): `as u16` silently truncates epochs above u16::MAX,
        // which would alias distinct epochs onto the same step index.
        let step_index = epoch as u16;
        let mut per_cgroup = std::collections::BTreeMap::new();
        per_cgroup.insert(
            name.to_string(),
            pool_phase_slice_stats(&slices, expected_nodes),
        );
        buckets.push(PhaseBucket {
            step_index,
            label: Phase::from(step_index).to_string(),
            start_ms: u64::MAX,
            end_ms: 0,
            sample_count: 0,
            metrics: std::collections::BTreeMap::new(),
            per_cgroup,
        });
    }
    buckets
}
/// Builds the [`PhaseBucket`] for step `step_index` of a single cgroup.
///
/// The bucket's only `per_cgroup` entry is `name`, computed by
/// `phase_cgroup_stats` over `reports`. Its label comes from `Phase`. Timing
/// fields are placeholders: `start_ms = u64::MAX`, `end_ms = 0`,
/// `sample_count = 0`, and empty metrics.
pub(crate) fn step_per_cgroup_bucket(
    name: &str,
    reports: &[WorkerReport],
    expected_nodes: Option<&BTreeSet<usize>>,
    step_index: u16,
) -> PhaseBucket {
    let stats = phase_cgroup_stats(reports, expected_nodes);
    let per_cgroup = std::collections::BTreeMap::from([(name.to_string(), stats)]);
    let label = Phase::from(step_index).to_string();
    PhaseBucket {
        step_index,
        label,
        start_ms: u64::MAX,
        end_ms: 0,
        sample_count: 0,
        metrics: std::collections::BTreeMap::new(),
        per_cgroup,
    }
}
/// Lifts one cgroup's statistics into scenario-level [`ScenarioStats`].
///
/// With a single cgroup, every "total" and "worst" field is that cgroup's
/// own value, and a missing spread becomes `0.0`. A clone of the cgroup is
/// the sole entry in `cgroups`, and `phases` starts empty.
pub(crate) fn scenario_stats_for_cgroup(cg: &CgroupStats) -> ScenarioStats {
    let worst_spread = cg.spread.unwrap_or_default();
    ScenarioStats {
        cgroups: vec![cg.clone()],
        phases: Vec::new(),
        ext_metrics: cg.ext_metrics.clone(),
        total_workers: cg.num_workers,
        total_cpus: cg.num_cpus,
        total_migrations: cg.total_migrations,
        total_iterations: cg.total_iterations,
        worst_spread,
        worst_gap_ms: cg.max_gap_ms,
        worst_gap_cpu: cg.max_gap_cpu,
        worst_migration_ratio: cg.migration_ratio,
        worst_page_locality: cg.page_locality,
        worst_cross_node_migration_ratio: cg.cross_node_migration_ratio,
    }
}
/// Records the default fairness failures for one cgroup into `r`.
///
/// Failures are recorded in this order:
/// 1. `Starved`: one per worker with zero `work_units`.
/// 2. `Unfair`: at most one, when the cgroup's off-CPU spread exceeds
///    `spread_threshold_pct()` and at least two workers had non-zero wall
///    time.
/// 3. `Stuck`: one per worker whose `max_gap_ms` exceeds `gap_threshold_ms()`.
pub(crate) fn record_default_fairness(
    r: &mut AssertResult,
    cg: &CgroupStats,
    reports: &[WorkerReport],
) {
    for starved in reports.iter().filter(|w| w.work_units == 0) {
        r.record_fail(AssertDetail::new(
            DetailKind::Starved,
            format!("tid {} starved (0 work units)", starved.tid),
        ));
    }

    // A spread between off-CPU shares needs at least two measurable workers.
    let measurable = reports.iter().filter(|w| w.wall_time_ns > 0).count();
    let spread_limit = spread_threshold_pct();
    if let Some(spread) = cg
        .spread
        .filter(|&s| s > spread_limit && measurable >= 2)
    {
        r.record_fail(AssertDetail::new(
            DetailKind::Unfair,
            format!(
                "unfair cgroup: spread={:.0}% ({:.0}-{:.0}%) {} workers on {} cpus (threshold {:.0}%)",
                spread,
                cg.min_off_cpu_pct.unwrap_or(0.0),
                cg.max_off_cpu_pct.unwrap_or(0.0),
                cg.num_workers,
                cg.num_cpus,
                spread_limit,
            ),
        ));
    }

    let gap_limit = gap_threshold_ms();
    for stuck in reports.iter().filter(|w| w.max_gap_ms > gap_limit) {
        r.record_fail(AssertDetail::new(
            DetailKind::Stuck,
            format!(
                "tid {} stuck {}ms on cpu{} at +{}ms (threshold {}ms)",
                stuck.tid, stuck.max_gap_ms, stuck.max_gap_cpu, stuck.max_gap_at_ms, gap_limit,
            ),
        ));
    }
}
/// Runs the default fairness checks (starvation, unfair spread, stuck
/// workers) over `reports`.
///
/// The returned result's `stats` field holds the scenario summary for the
/// cgroup.
pub fn assert_not_starved(reports: &[WorkerReport]) -> AssertResult {
    let summary = cgroup_stats(reports);
    let mut outcome = AssertResult::pass();
    record_default_fairness(&mut outcome, &summary, reports);
    outcome.stats = scenario_stats_for_cgroup(&summary);
    outcome
}
/// Checks that workers achieve similar throughput per CPU-second.
///
/// A worker's rate is `work_units / (cpu_time_ns / 1e9)`. Workers that
/// recorded zero `cpu_time_ns` have no defined rate and are excluded from
/// both checks. Treating them as 0.0 would drag the mean down and inflate
/// the CV of the workers that did run.
///
/// * `max_cv`: when set, and at least two workers have a defined rate with
///   a positive mean, fails if the population coefficient of variation of
///   the rates exceeds the limit.
/// * `min_rate`: when set, fails once per measurable worker whose rate is
///   below the floor.
///
/// Empty `reports` pass. If every worker recorded zero CPU time and at least
/// one limit is configured, the result is marked inconclusive.
pub fn assert_throughput_parity(
    reports: &[WorkerReport],
    max_cv: Option<f64>,
    min_rate: Option<f64>,
) -> AssertResult {
    let mut r = AssertResult::pass();
    if reports.is_empty() {
        return r;
    }

    // (worker, rate) for every worker with a usable denominator.
    let measured: Vec<(&WorkerReport, f64)> = reports
        .iter()
        .filter(|w| w.cpu_time_ns != 0)
        .map(|w| (w, w.work_units as f64 / (w.cpu_time_ns as f64 / 1e9)))
        .collect();

    if measured.is_empty() {
        if max_cv.is_some() || min_rate.is_some() {
            let mut limits: Vec<String> = Vec::with_capacity(2);
            if let Some(cv_limit) = max_cv {
                limits.push(format!("max_cv {cv_limit:.3}"));
            }
            if let Some(floor) = min_rate {
                limits.push(format!("min_rate {floor:.0}"));
            }
            r.record_inconclusive(AssertDetail::new(
                DetailKind::Benchmark,
                format!(
                    "throughput parity inconclusive: all {} workers recorded zero cpu_time_ns — \
                     denominator is zero, rates cannot be computed; {} neither pass nor fail \
                     (was the workload able to run?)",
                    reports.len(),
                    limits.join(" + "),
                ),
            ));
        }
        return r;
    }

    if let Some(cv_limit) = max_cv
        && measured.len() >= 2
    {
        let n = measured.len() as f64;
        let mean = measured.iter().map(|&(_, rate)| rate).sum::<f64>() / n;
        if mean > 0.0 {
            let variance = measured
                .iter()
                .map(|&(_, rate)| (rate - mean).powi(2))
                .sum::<f64>()
                / n;
            let cv = variance.sqrt() / mean;
            if cv > cv_limit {
                r.record_fail(AssertDetail::new(
                    DetailKind::Benchmark,
                    format!(
                        "throughput CV {cv:.3} exceeds limit {cv_limit:.3} (mean={mean:.0} work/cpu_s)"
                    ),
                ));
            }
        }
    }

    if let Some(floor) = min_rate {
        for &(w, rate) in &measured {
            if rate < floor {
                r.record_fail(AssertDetail::new(
                    DetailKind::Benchmark,
                    format!(
                        "worker {} throughput {rate:.0} work/cpu_s below floor {floor:.0}",
                        w.tid
                    ),
                ));
            }
        }
    }
    r
}
/// Checks pooled benchmark metrics against optional limits.
///
/// Returns a skip result when `reports` is empty. Otherwise each limit that
/// is set adds one check:
/// * `max_p99_ns`: fails if the nearest-rank p99 of all pooled wake
///   latencies exceeds the limit. There is no check when no samples exist.
/// * `max_cv`: needs at least two samples. Fails if the population
///   coefficient of variation of the pooled wake latencies exceeds the
///   limit, or records an inconclusive detail when their mean is zero.
/// * `min_iter_rate`: fails for each worker whose `iterations` per second
///   of wall time is below the floor, skipping zero-wall-time workers. If
///   every worker has zero wall time, the check is inconclusive.
pub fn assert_benchmarks(
    reports: &[WorkerReport],
    max_p99_ns: Option<u64>,
    max_cv: Option<f64>,
    min_iter_rate: Option<f64>,
) -> AssertResult {
    if reports.is_empty() {
        return AssertResult::skip("no worker reports — benchmark skipped");
    }
    let mut r = AssertResult::pass();

    let mut all_latencies: Vec<u64> = reports
        .iter()
        .flat_map(|w| w.wake_latencies_ns.iter().copied())
        .collect();

    if let Some(p99_limit) = max_p99_ns
        && !all_latencies.is_empty()
    {
        // Sort the pool in place rather than cloning it. The CV check below
        // depends only on the multiset of samples, not their order (up to
        // floating-point rounding).
        all_latencies.sort_unstable();
        let p99 = percentile(&all_latencies, 0.99);
        if p99 > p99_limit {
            r.record_fail(AssertDetail::new(
                DetailKind::Benchmark,
                format!(
                    "p99 wake latency {p99}ns exceeds limit {p99_limit}ns ({} samples)",
                    all_latencies.len()
                ),
            ));
        }
    }

    if let Some(cv_limit) = max_cv
        && all_latencies.len() >= 2
    {
        let n = all_latencies.len() as f64;
        let mean = all_latencies.iter().sum::<u64>() as f64 / n;
        if mean > 0.0 {
            let variance = all_latencies
                .iter()
                .map(|&v| (v as f64 - mean).powi(2))
                .sum::<f64>()
                / n;
            let cv = variance.sqrt() / mean;
            if cv > cv_limit {
                r.record_fail(AssertDetail::new(
                    DetailKind::Benchmark,
                    format!(
                        "wake latency CV {cv:.3} exceeds limit {cv_limit:.3} (mean={mean:.0}ns)"
                    ),
                ));
            }
        } else {
            r.record_inconclusive(AssertDetail::new(
                DetailKind::Benchmark,
                format!(
                    "wake latency CV inconclusive: all {} sample(s) had zero mean wake \
                     latency — denominator is zero, CV cannot be computed; limit \
                     {cv_limit:.3} neither pass nor fail (did any wake event capture a \
                     non-zero latency?)",
                    all_latencies.len(),
                ),
            ));
        }
    }

    if let Some(rate_floor) = min_iter_rate {
        let mut zero_wall_count = 0usize;
        for w in reports {
            if w.wall_time_ns == 0 {
                zero_wall_count += 1;
                continue;
            }
            let rate = w.iterations as f64 / (w.wall_time_ns as f64 / 1e9);
            if rate < rate_floor {
                r.record_fail(AssertDetail::new(
                    DetailKind::Benchmark,
                    format!(
                        "worker {} iteration rate {rate:.1}/s below floor {rate_floor:.1}/s",
                        w.tid
                    ),
                ));
            }
        }
        if zero_wall_count == reports.len() {
            r.record_inconclusive(AssertDetail::new(
                DetailKind::Benchmark,
                format!(
                    "min iteration rate inconclusive: all {} workers recorded zero wall_time_ns — \
                     denominator is zero, rate cannot be computed; floor {rate_floor:.1}/s \
                     neither pass nor fail (was the workload able to run?)",
                    reports.len()
                ),
            ));
        }
    }
    r
}
/// Checks `scx` event counters against an allowed bound.
///
/// With `max_count == None`, every event count must be exactly zero. With
/// `Some(bound)`, each count must lie in `0..=bound`. Each offending event
/// is recorded as a `SchedulerEvent` failure. A negative count is reported
/// as negative instead of as "exceeding" a bound it is actually below.
pub fn assert_scx_events_clean(events: &[(&str, i64)], max_count: Option<i64>) -> AssertResult {
    let mut r = AssertResult::pass();
    // `None` means "must be zero", i.e. an effective upper bound of 0.
    let bound = max_count.unwrap_or(0);
    for &(name, count) in events {
        if (0..=bound).contains(&count) {
            continue;
        }
        let message = if count < 0 {
            format!("scx event `{name}` count {count} is negative (bound {bound})")
        } else {
            format!("scx event `{name}` count {count} exceeds bound {bound}")
        };
        r.record_fail(AssertDetail::new(DetailKind::SchedulerEvent, message));
    }
    r
}
/// Absolute per-run limits evaluated by `assert_thresholds`.
///
/// Every limit is optional, and `None` disables that check. `Default`
/// disables all of them. [`AbsoluteThresholds::strict`] enables all of them
/// with fixed values. The chainable `const` setters override a single limit.
#[must_use = "AbsoluteThresholds only takes effect when passed to assert_thresholds"]
#[derive(Debug, Clone, Copy, Default)]
pub struct AbsoluteThresholds {
    /// Upper bound on the pooled p99 wake latency, in nanoseconds.
    pub max_p99_wake_latency_ns: Option<u64>,
    /// Upper bound on the pooled p99 per-iteration cost, in nanoseconds.
    pub max_iteration_cost_p99_ns: Option<u64>,
    /// Upper bound on migrations summed across all workers.
    pub max_migrations: Option<u64>,
    /// Lower bound on each individual worker's `work_units`.
    pub min_work_units: Option<u64>,
}

impl AbsoluteThresholds {
    /// Preset enabling every check: p99 wake latency ≤ 10 ms, p99 iteration
    /// cost ≤ 1 ms, at most 1000 total migrations, and at least one work
    /// unit per worker.
    pub const fn strict() -> Self {
        Self {
            min_work_units: Some(1),
            max_migrations: Some(1_000),
            max_iteration_cost_p99_ns: Some(1_000_000),
            max_p99_wake_latency_ns: Some(10_000_000),
        }
    }

    /// Returns a copy with the p99 wake-latency limit set to `limit_ns`.
    pub const fn max_p99_wake_latency_ns(self, limit_ns: u64) -> Self {
        Self {
            max_p99_wake_latency_ns: Some(limit_ns),
            ..self
        }
    }

    /// Returns a copy with the p99 iteration-cost limit set to `limit_ns`.
    pub const fn max_iteration_cost_p99_ns(self, limit_ns: u64) -> Self {
        Self {
            max_iteration_cost_p99_ns: Some(limit_ns),
            ..self
        }
    }

    /// Returns a copy with the total-migrations limit set to `limit`.
    pub const fn max_migrations(self, limit: u64) -> Self {
        Self {
            max_migrations: Some(limit),
            ..self
        }
    }

    /// Returns a copy with the per-worker work-unit floor set to `floor`.
    pub const fn min_work_units(self, floor: u64) -> Self {
        Self {
            min_work_units: Some(floor),
            ..self
        }
    }
}
/// Evaluates [`AbsoluteThresholds`] against a set of worker reports.
///
/// Returns a skip result for empty `reports`. Each configured limit adds its
/// own check:
/// * pooled p99 wake latency, delegated to `assert_benchmarks` and merged in;
/// * pooled nearest-rank p99 of `iteration_costs_ns`, skipped when there are
///   no samples;
/// * migrations summed over all workers (`Migration` failure);
/// * a per-worker `work_units` floor (one `Starved` failure per offender).
pub fn assert_thresholds(
    reports: &[WorkerReport],
    thresholds: &AbsoluteThresholds,
) -> AssertResult {
    if reports.is_empty() {
        return AssertResult::skip("no worker reports to evaluate");
    }
    let mut r = AssertResult::pass();

    if let Some(p99_limit) = thresholds.max_p99_wake_latency_ns {
        r.merge(assert_benchmarks(reports, Some(p99_limit), None, None));
    }

    if let Some(cost_limit) = thresholds.max_iteration_cost_p99_ns {
        let mut all_costs: Vec<u64> = reports
            .iter()
            .flat_map(|w| w.iteration_costs_ns.iter().copied())
            .collect();
        if !all_costs.is_empty() {
            // The pooled buffer is not needed afterwards, so sort it in place
            // instead of cloning it.
            all_costs.sort_unstable();
            let p99 = percentile(&all_costs, 0.99);
            if p99 > cost_limit {
                r.record_fail(AssertDetail::new(
                    DetailKind::Benchmark,
                    format!(
                        "p99 iteration cost {p99}ns exceeds limit {cost_limit}ns ({} samples)",
                        all_costs.len(),
                    ),
                ));
            }
        }
    }

    if let Some(max_mig) = thresholds.max_migrations {
        let total_mig: u64 = reports.iter().map(|w| w.migration_count).sum();
        if total_mig > max_mig {
            r.record_fail(AssertDetail::new(
                DetailKind::Migration,
                format!(
                    "total migrations {total_mig} exceeds limit {max_mig} ({} workers)",
                    reports.len(),
                ),
            ));
        }
    }

    if let Some(min_units) = thresholds.min_work_units {
        for w in reports.iter().filter(|w| w.work_units < min_units) {
            r.record_fail(AssertDetail::new(
                DetailKind::Starved,
                format!(
                    "tid {} work_units {} below floor {min_units}",
                    w.tid, w.work_units,
                ),
            ));
        }
    }
    r
}