use crate::workload::WorkerReport;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
// Label of the phase currently in progress on this thread; `None` when no
// `PhaseGuard` is installed. Read by `current_phase_label`, swapped by
// `PhaseGuard::install` and restored by `PhaseGuard`'s `Drop`.
thread_local! {
    static ACTIVE_PHASE: RefCell<Option<std::borrow::Cow<'static, str>>> =
        const { RefCell::new(None) };
}

/// Returns a copy of the calling thread's active phase label, or `None` when
/// no [`PhaseGuard`] is installed on this thread.
///
/// `AssertDetail::new`, `PassDetail::{binary, unary}` and `InfoNote::new` call
/// this to stamp each record with the phase it was created in.
pub fn current_phase_label() -> Option<std::borrow::Cow<'static, str>> {
    ACTIVE_PHASE.with(|slot| slot.borrow().as_ref().cloned())
}

/// Scope guard that sets the calling thread's phase label.
///
/// Creating it installs a new label and remembers the previous one. Dropping
/// it puts the previous label back, so nested guards unwind correctly when
/// they are dropped in reverse order of creation.
#[must_use = "PhaseGuard restores the prior phase on Drop — bind it to a local"]
pub struct PhaseGuard {
    previous: Option<std::borrow::Cow<'static, str>>,
}

impl PhaseGuard {
    /// Makes `label` the thread's active phase until the returned guard drops.
    pub fn install(label: impl Into<std::borrow::Cow<'static, str>>) -> Self {
        let label = label.into();
        PhaseGuard {
            previous: ACTIVE_PHASE.with(|slot| slot.replace(Some(label))),
        }
    }

    /// Installs the label `Step[<zero_indexed>]`.
    pub fn install_step(zero_indexed: u16) -> Self {
        PhaseGuard::install(format!("Step[{zero_indexed}]"))
    }

    /// Installs the static label `BASELINE` (no allocation).
    pub fn install_baseline() -> Self {
        PhaseGuard::install("BASELINE")
    }
}

impl Drop for PhaseGuard {
    /// Restores the label that was active when this guard was installed.
    fn drop(&mut self) {
        // The label being popped is handed back by `replace` and discarded.
        drop(ACTIVE_PHASE.with(|slot| slot.replace(self.previous.take())));
    }
}
/// One mapping parsed by `parse_numa_maps`.
///
/// The input format matches `/proc/<pid>/numa_maps`; presumably that is where
/// the text comes from (TODO confirm against callers).
#[derive(Debug, Clone, Default)]
pub struct NumaMapsEntry {
    /// Start address taken from the first (hex) column of the line.
    pub addr: u64,
    /// Page count per NUMA node, summed from the line's `N<node>=<pages>` tokens.
    pub node_pages: BTreeMap<usize, u64>,
}
pub fn parse_numa_maps(content: &str) -> Vec<NumaMapsEntry> {
let mut entries = Vec::new();
for line in content.lines() {
let line = line.trim();
if line.is_empty() {
continue;
}
let mut parts = line.split_whitespace();
let addr = match parts.next().and_then(|s| u64::from_str_radix(s, 16).ok()) {
Some(a) => a,
None => continue,
};
let _ = parts.next();
let mut entry = NumaMapsEntry {
addr,
..Default::default()
};
for token in parts {
if let Some(rest) = token.strip_prefix('N')
&& let Some((node_str, count_str)) = rest.split_once('=')
&& let (Ok(node), Ok(count)) = (node_str.parse::<usize>(), count_str.parse::<u64>())
{
*entry.node_pages.entry(node).or_insert(0) += count;
}
}
if !entry.node_pages.is_empty() {
entries.push(entry);
}
}
entries
}
/// Fraction of all pages in `entries` that live on one of `expected_nodes`.
///
/// Returns `0.0` when the entries contain no pages at all, so "no data" and
/// "nothing local" are indistinguishable to callers.
pub fn page_locality(entries: &[NumaMapsEntry], expected_nodes: &BTreeSet<usize>) -> f64 {
    let (local, total) = entries
        .iter()
        .flat_map(|entry| entry.node_pages.iter())
        .fold((0u64, 0u64), |(local, total), (node, &pages)| {
            let local = if expected_nodes.contains(node) {
                local + pages
            } else {
                local
            };
            (local, total + pages)
        });
    if total == 0 {
        return 0.0;
    }
    local as f64 / total as f64
}
/// Extracts the `numa_pages_migrated` counter from `/proc/vmstat`-style text.
///
/// Each line is trimmed and must start with `numa_pages_migrated`. The
/// remainder, trimmed, must parse as `u64`. The first line satisfying both
/// wins; `None` when no line does.
pub fn parse_vmstat_numa_pages_migrated(content: &str) -> Option<u64> {
    content.lines().find_map(|line| {
        line.trim()
            .strip_prefix("numa_pages_migrated")?
            .trim()
            .parse::<u64>()
            .ok()
    })
}
/// Default max-gap threshold in milliseconds.
///
/// The value is 3000 when debug assertions are enabled and 2000 otherwise.
/// The larger debug value is presumably to tolerate slower unoptimised builds.
fn gap_threshold_ms() -> u64 {
    const DEBUG_BUILD_MS: u64 = 3000;
    const RELEASE_BUILD_MS: u64 = 2000;
    if cfg!(debug_assertions) {
        DEBUG_BUILD_MS
    } else {
        RELEASE_BUILD_MS
    }
}

/// Default spread threshold in percent.
///
/// The value is 35.0 when debug assertions are enabled and 15.0 otherwise.
fn spread_threshold_pct() -> f64 {
    const DEBUG_BUILD_PCT: f64 = 35.0;
    const RELEASE_BUILD_PCT: f64 = 15.0;
    if cfg!(debug_assertions) {
        DEBUG_BUILD_PCT
    } else {
        RELEASE_BUILD_PCT
    }
}
/// Category tag carried by every `AssertDetail`.
///
/// The enum derives serde `Serialize`/`Deserialize`. Renaming or reordering
/// variants may therefore change the serialized form, so treat the variant
/// list as part of the persisted format.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum DetailKind {
    Starved,
    /// Used for per-worker max-gap threshold failures in `AssertPlan::assert_cgroup`.
    Stuck,
    /// Used for spread-threshold failures in `AssertPlan::assert_cgroup`.
    Unfair,
    Isolation,
    Benchmark,
    /// Used by `AssertPlan::assert_cgroup` for migration-ratio verdicts.
    Migration,
    PageLocality,
    CrossNodeMigration,
    SlowTier,
    Monitor,
    SchedulerCrashed,
    SchedulerExitedCleanly,
    SchedulerDiedUnknownReason,
    SchedulerEvent,
    Temporal,
    /// Kind given to skip outcomes by `AssertResult::skip` / `record_skip`.
    Skip,
    /// Fallback kind, used by:
    /// - `From<String>` / `From<&str>` for `AssertDetail`,
    /// - `fail_msg` and `inconclusive_msg`,
    /// - `any_of`'s synthesized fail/inconclusive summaries.
    Other,
}
/// Leading phrase shared by every "scheduler died" message built below.
/// Presumably this lets other code recognise these messages by prefix.
pub(crate) const SCHED_DIED_PREFIX: &str = "scheduler process died";

/// Message for a scheduler death noticed after step `step_idx` of
/// `total_steps` completed, `elapsed_s` seconds into the test (one decimal).
pub(crate) fn format_sched_died_after_step(
    step_idx: usize,
    total_steps: usize,
    elapsed_s: f64,
) -> String {
    format!(
        "{} unexpectedly after completing step {} of {} ({:.1}s into test)",
        SCHED_DIED_PREFIX, step_idx, total_steps, elapsed_s,
    )
}

/// Message for a scheduler death detected once all `total_steps` steps had
/// completed, `elapsed_s` seconds into the test (one decimal).
pub(crate) fn format_sched_died_after_all_steps(total_steps: usize, elapsed_s: f64) -> String {
    format!(
        "{} unexpectedly (detected after all {} steps completed, {:.1}s elapsed)",
        SCHED_DIED_PREFIX, total_steps, elapsed_s,
    )
}

/// Message for a scheduler death observed while the workload was running.
pub(crate) fn format_sched_died_during_workload(elapsed_s: f64) -> String {
    format!(
        "{} unexpectedly during workload ({:.1}s into test)",
        SCHED_DIED_PREFIX, elapsed_s,
    )
}
/// One verdict detail: a category, a human-readable message, and the phase
/// label that was active when it was created (see `AssertDetail::new`).
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AssertDetail {
    pub kind: DetailKind,
    pub message: String,
    /// Phase label captured from `current_phase_label()` at construction;
    /// override it with `with_phase`.
    pub phase: Option<std::borrow::Cow<'static, str>>,
}
impl AssertDetail {
pub fn new(kind: DetailKind, message: impl Into<String>) -> Self {
Self {
kind,
message: message.into(),
phase: current_phase_label(),
}
}
#[must_use = "builder methods consume self; bind the result"]
pub fn with_phase(mut self, phase: impl Into<std::borrow::Cow<'static, str>>) -> Self {
self.phase = Some(phase.into());
self
}
pub fn display_with_kind(&self) -> AssertDetailWithKind<'_> {
AssertDetailWithKind { detail: self }
}
}
/// Record of a check that passed.
///
/// Holds the check `name`, the comparator used, the observed `value`, and the
/// `expected` operand (`None` for unary checks built with
/// `PassDetail::unary`). It also carries the phase label active at creation.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PassDetail {
    pub name: String,
    pub comparator: std::borrow::Cow<'static, str>,
    pub value: String,
    pub expected: Option<String>,
    pub phase: Option<std::borrow::Cow<'static, str>>,
}
impl PassDetail {
    /// Records a passing binary check: `value <comparator> expected`.
    ///
    /// The phase label is captured from the calling thread at construction
    /// (`current_phase_label()`); override it with [`PassDetail::with_phase`].
    pub fn binary(
        name: impl Into<String>,
        comparator: impl Into<std::borrow::Cow<'static, str>>,
        value: impl Into<String>,
        expected: impl Into<String>,
    ) -> Self {
        // Same as a unary record, plus the expected operand.
        let mut detail = Self::unary(name, comparator, value);
        detail.expected = Some(expected.into());
        detail
    }

    /// Records a passing unary check, which has no `expected` operand.
    ///
    /// The phase label is captured from the calling thread at construction.
    pub fn unary(
        name: impl Into<String>,
        comparator: impl Into<std::borrow::Cow<'static, str>>,
        value: impl Into<String>,
    ) -> Self {
        Self {
            name: name.into(),
            comparator: comparator.into(),
            value: value.into(),
            expected: None,
            phase: current_phase_label(),
        }
    }

    /// Replaces the captured phase label with `phase`.
    ///
    /// Marked `#[must_use]` like `AssertDetail::with_phase` and
    /// `InfoNote::with_phase`: the builder consumes `self`, so discarding the
    /// return value silently loses the record.
    #[must_use = "builder methods consume self; bind the result"]
    pub fn with_phase(mut self, phase: impl Into<std::borrow::Cow<'static, str>>) -> Self {
        self.phase = Some(phase.into());
        self
    }
}
/// Comparator names for `PassDetail::comparator`.
///
/// Presumably this is the complete set the assertion helpers emit; confirm
/// before relying on it as a validation list.
pub const COMPARATOR_VOCABULARY: &[&str] = &[
    "eq",
    "ne",
    "ge",
    "le",
    "lt",
    "gt",
    "in_range",
    "near_within",
    "is_finite",
    "set_is_empty",
    "set_is_non_empty",
    "set_len_eq",
    "set_len_le",
    "set_len_ge",
    "set_contains",
    "subset_of",
    "disjoint_from",
    "sequence_is_empty",
    "sequence_is_non_empty",
    "sequence_len_eq",
    "sequence_len_le",
    "sequence_len_ge",
    "sequence_contains",
];
/// Upper bound on recorded pass entries.
///
/// NOTE(review): the cap is not enforced in this part of the file. Presumably
/// passes beyond it are replaced by one sentinel entry built from the two
/// constants below; confirm.
pub const MAX_RECORDED_PASSES: usize = 10_000;
/// Presumably the `PassDetail::name` of the truncation sentinel (TODO confirm).
pub const PASSES_TRUNCATION_SENTINEL_NAME: &str = "__ktstr_passes_truncated__";
/// Presumably the `PassDetail::comparator` of the truncation sentinel (TODO confirm).
pub const PASSES_TRUNCATION_SENTINEL_COMPARATOR: &str = "truncated";
#[must_use = "AssertDetailWithKind only renders when formatted"]
pub struct AssertDetailWithKind<'a> {
detail: &'a AssertDetail,
}
impl std::fmt::Display for AssertDetailWithKind<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[{:?}] {}", self.detail.kind, self.detail.message)
}
}
impl From<String> for AssertDetail {
fn from(message: String) -> Self {
Self::new(DetailKind::Other, message)
}
}
/// Informational (non-verdict) note carried in `AssertResult::info_notes`.
///
/// Stores the message and the phase label active when it was created.
/// `Display` prints only the message.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct InfoNote {
    pub message: String,
    pub phase: Option<std::borrow::Cow<'static, str>>,
}

impl InfoNote {
    /// Builds a note stamped with the calling thread's current phase label.
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        InfoNote {
            message,
            phase: current_phase_label(),
        }
    }

    /// Replaces the captured phase label with `phase`.
    #[must_use = "builder methods consume self; bind the result"]
    pub fn with_phase(self, phase: impl Into<std::borrow::Cow<'static, str>>) -> Self {
        InfoNote {
            phase: Some(phase.into()),
            ..self
        }
    }
}

impl std::fmt::Display for InfoNote {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `write_str` ignores width/fill flags, matching a raw message dump.
        f.write_str(self.message.as_str())
    }
}
impl From<&str> for AssertDetail {
fn from(s: &str) -> Self {
Self::new(DetailKind::Other, s)
}
}
impl std::fmt::Display for AssertDetail {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.message)
}
}
/// Typed value stored in `AssertResult::measurements` via `note_value`.
///
/// Only `PartialEq` is derived, not `Eq`, because of the `f64` payload.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum NoteValue {
    Int(i64),
    Uint(u64),
    Float(f64),
    Bool(bool),
    Text(String),
}
impl From<i64> for NoteValue {
fn from(v: i64) -> Self {
Self::Int(v)
}
}
impl From<u64> for NoteValue {
fn from(v: u64) -> Self {
Self::Uint(v)
}
}
impl From<f64> for NoteValue {
fn from(v: f64) -> Self {
Self::Float(v)
}
}
impl From<bool> for NoteValue {
fn from(v: bool) -> Self {
Self::Bool(v)
}
}
impl From<String> for NoteValue {
fn from(v: String) -> Self {
Self::Text(v)
}
}
impl From<&str> for NoteValue {
fn from(v: &str) -> Self {
Self::Text(v.to_string())
}
}
/// A single verdict.
///
/// `Outcome::merge` orders severity as Fail > Inconclusive > Pass > Skip.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum Outcome {
    Pass,
    Skip(AssertDetail),
    Inconclusive(AssertDetail),
    Fail(AssertDetail),
}
impl Outcome {
pub fn is_pass(&self) -> bool {
matches!(self, Outcome::Pass)
}
pub fn is_skip(&self) -> bool {
matches!(self, Outcome::Skip(_))
}
pub fn is_fail(&self) -> bool {
matches!(self, Outcome::Fail(_))
}
pub fn is_inconclusive(&self) -> bool {
matches!(self, Outcome::Inconclusive(_))
}
pub fn merge(self, other: Outcome) -> Outcome {
use Outcome::*;
match (self, other) {
(Fail(d), _) | (_, Fail(d)) => Fail(d),
(Inconclusive(d), _) | (_, Inconclusive(d)) => Inconclusive(d),
(Pass, _) | (_, Pass) => Pass,
(Skip(d), Skip(_)) => Skip(d),
}
}
pub fn as_ref(&self) -> OutcomeRef<'_> {
match self {
Outcome::Pass => OutcomeRef::Pass,
Outcome::Skip(d) => OutcomeRef::Skip(d),
Outcome::Inconclusive(d) => OutcomeRef::Inconclusive(d),
Outcome::Fail(d) => OutcomeRef::Fail(d),
}
}
}
/// Borrowed counterpart of `Outcome`, produced by `Outcome::as_ref` and
/// `AssertResult::outcome_ref`. It is `Copy` because it only holds references.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeRef<'a> {
    Pass,
    Skip(&'a AssertDetail),
    Inconclusive(&'a AssertDetail),
    Fail(&'a AssertDetail),
}
impl OutcomeRef<'_> {
    /// `true` for `OutcomeRef::Pass`.
    pub fn is_pass(&self) -> bool {
        matches!(*self, OutcomeRef::Pass)
    }

    /// `true` for `OutcomeRef::Skip(_)`.
    pub fn is_skip(&self) -> bool {
        matches!(*self, OutcomeRef::Skip(_))
    }

    /// `true` for `OutcomeRef::Fail(_)`.
    pub fn is_fail(&self) -> bool {
        matches!(*self, OutcomeRef::Fail(_))
    }

    /// `true` for `OutcomeRef::Inconclusive(_)`.
    pub fn is_inconclusive(&self) -> bool {
        matches!(*self, OutcomeRef::Inconclusive(_))
    }

    /// Clones the referenced detail (if any) into an owned `Outcome`.
    pub fn to_owned(&self) -> Outcome {
        // `OutcomeRef` is `Copy`, so matching on `*self` copies the references out.
        match *self {
            OutcomeRef::Pass => Outcome::Pass,
            OutcomeRef::Skip(detail) => Outcome::Skip(detail.clone()),
            OutcomeRef::Inconclusive(detail) => Outcome::Inconclusive(detail.clone()),
            OutcomeRef::Fail(detail) => Outcome::Fail(detail.clone()),
        }
    }
}
/// Aggregated result of one or more assertions.
///
/// Fields:
/// - `outcomes`: the overall verdict is derived from these (see `is_pass`,
///   `is_fail`, `is_skip`, `is_inconclusive`, `outcome_ref`).
/// - `passes`: checks that succeeded.
/// - `stats`: scenario statistics.
/// - `measurements`: named values recorded via `note_value`.
/// - `info_notes`: notes that `into_anyhow_or_log` logs before reporting.
#[must_use = "test verdict is lost if not checked"]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AssertResult {
    pub outcomes: Vec<Outcome>,
    pub passes: Vec<PassDetail>,
    pub stats: ScenarioStats,
    pub measurements: std::collections::BTreeMap<String, NoteValue>,
    pub info_notes: Vec<InfoNote>,
}
/// Per-cgroup statistics.
///
/// `ScenarioStats::cgroups` collects one entry per cgroup;
/// `AssertResult::merge` concatenates them.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize, crate::Claim)]
pub struct CgroupStats {
    pub num_workers: usize,
    pub num_cpus: usize,
    pub avg_off_cpu_pct: f64,
    pub min_off_cpu_pct: f64,
    pub max_off_cpu_pct: f64,
    /// Compared against `AssertPlan::max_spread_pct`. The failure message
    /// prints it alongside min/max off-CPU %, so it presumably is
    /// `max_off_cpu_pct - min_off_cpu_pct` (confirm).
    pub spread: f64,
    pub max_gap_ms: u64,
    pub max_gap_cpu: usize,
    pub total_migrations: u64,
    pub migration_ratio: f64,
    pub p99_wake_latency_us: f64,
    pub median_wake_latency_us: f64,
    pub wake_latency_cv: f64,
    pub total_iterations: u64,
    pub mean_run_delay_us: f64,
    pub worst_run_delay_us: f64,
    pub page_locality: f64,
    pub cross_node_migration_ratio: f64,
    pub ext_metrics: BTreeMap<String, f64>,
}
impl CgroupStats {
    /// p99 wake latency divided by median wake latency.
    ///
    /// Returns `0.0` unless the median is strictly positive; that includes a
    /// NaN median.
    pub fn wake_latency_tail_ratio(&self) -> f64 {
        match self.median_wake_latency_us {
            median if median > 0.0 => self.p99_wake_latency_us / median,
            _ => 0.0,
        }
    }

    /// Mean iterations per worker; `0.0` when the cgroup has no workers.
    pub fn iterations_per_worker(&self) -> f64 {
        match self.num_workers {
            0 => 0.0,
            workers => self.total_iterations as f64 / workers as f64,
        }
    }
}
/// Phase index.
///
/// `0` is the baseline; `n >= 1` is scenario step `n - 1` (see `Phase::step`
/// and the `Display` impl). `#[serde(transparent)]` makes it serialize as the
/// bare `u16`.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Hash,
    PartialOrd,
    Ord,
    Default,
    serde::Serialize,
    serde::Deserialize,
)]
#[serde(transparent)]
pub struct Phase(u16);
impl Phase {
pub const BASELINE: Self = Self(0);
pub const fn step(zero_indexed: u16) -> Self {
Self(zero_indexed.saturating_add(1))
}
pub const fn is_baseline(&self) -> bool {
self.0 == 0
}
pub const fn as_u16(self) -> u16 {
self.0
}
}
impl std::fmt::Display for Phase {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.is_baseline() {
write!(f, "BASELINE")
} else {
write!(f, "Step[{}]", self.0 - 1)
}
}
}
impl From<u16> for Phase {
fn from(value: u16) -> Self {
Self(value)
}
}
impl From<Phase> for u16 {
fn from(value: Phase) -> Self {
value.0
}
}
/// Per-phase aggregation produced by `build_phase_buckets`.
#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize, crate::Claim)]
pub struct PhaseBucket {
    /// Raw phase index: 0 = baseline, `n >= 1` = scenario step `n - 1`.
    pub step_index: u16,
    /// `BASELINE` or `Step[<n - 1>]`, as built by `build_phase_buckets`.
    pub label: String,
    /// `elapsed_ms` of the first sample in the phase.
    pub start_ms: u64,
    /// `elapsed_ms` of the last sample in the phase.
    pub end_ms: u64,
    /// Number of samples that fell into the phase.
    pub sample_count: usize,
    /// Metric name -> value reduced over the phase's samples.
    pub metrics: std::collections::BTreeMap<String, f64>,
}
impl PhaseBucket {
    /// Value recorded for `metric_name` in this phase, if any.
    pub fn get(&self, metric_name: &str) -> Option<f64> {
        let value = self.metrics.get(metric_name)?;
        Some(*value)
    }

    /// Like [`PhaseBucket::get`], but panics when the metric is absent.
    ///
    /// # Panics
    /// When `metric_name` has no value in this bucket. The message lists the
    /// metric keys that are present.
    pub fn expect_metric(&self, metric_name: &str) -> f64 {
        let Some(value) = self.get(metric_name) else {
            panic!(
                "PhaseBucket::expect_metric: metric '{}' absent from phase \
                 step_index={} ('{}') with sample_count={}. \
                 metric keys present in this bucket: {:?}. \
                 Possible causes: (a) phase carried 0 samples for this \
                 metric (sample_count==0 means no captures landed in the \
                 phase at all; sample_count>0 means captures landed but \
                 the metric extracted no finite values from them); \
                 (b) metric name typo (verify against \
                 ScenarioStats::is_known_metric / known_metrics).",
                metric_name,
                self.step_index,
                self.label,
                self.sample_count,
                self.metrics.keys().collect::<Vec<_>>(),
            );
        };
        value
    }
}
/// Merges two buckets describing the same `step_index`.
///
/// - Metrics present on both sides are combined by `merge_metric_values`,
///   using the metric's registered kind (if any).
/// - Metrics present on one side only are copied as-is.
/// - The time window becomes the union: min start, max end.
/// - Sample counts add; the label is taken from `a`.
///
/// # Panics
/// If the two buckets have different `step_index` values.
fn merge_matched_phase_buckets(a: PhaseBucket, b: PhaseBucket) -> PhaseBucket {
    assert_eq!(
        a.step_index, b.step_index,
        "merge_matched_phase_buckets: caller must pair by step_index",
    );
    let union_keys: std::collections::BTreeSet<&String> =
        a.metrics.keys().chain(b.metrics.keys()).collect();
    let metrics: std::collections::BTreeMap<String, f64> = union_keys
        .into_iter()
        .filter_map(|key| {
            let value = match (a.metrics.get(key), b.metrics.get(key)) {
                (Some(&left), Some(&right)) => merge_metric_values(
                    crate::stats::metric_def(key).map(|m| m.kind),
                    left,
                    right,
                    a.sample_count,
                    b.sample_count,
                    a.end_ms,
                    b.end_ms,
                ),
                (Some(&only), None) | (None, Some(&only)) => only,
                (None, None) => return None,
            };
            Some((key.clone(), value))
        })
        .collect();
    PhaseBucket {
        step_index: a.step_index,
        label: a.label,
        start_ms: a.start_ms.min(b.start_ms),
        end_ms: a.end_ms.max(b.end_ms),
        sample_count: a.sample_count + b.sample_count,
        metrics,
    }
}
/// Combines two per-phase values of one metric according to its kind.
///
/// - `Counter`: sum.
/// - `Peak` / `Gauge(Max)`: maximum.
/// - `Gauge(Avg)`: sample-count-weighted mean; plain mean when both counts
///   are zero.
/// - `Gauge(Last)` / `Timestamp`: value from the side with the later
///   `end_ms` (ties keep `a`).
/// - Unknown kind (`None`): plain arithmetic mean.
fn merge_metric_values(
    kind: Option<crate::stats::MetricKind>,
    a: f64,
    b: f64,
    a_count: usize,
    b_count: usize,
    a_end_ms: u64,
    b_end_ms: u64,
) -> f64 {
    use crate::stats::{GaugeAgg, MetricKind};
    let Some(kind) = kind else {
        return (a + b) / 2.0;
    };
    match kind {
        MetricKind::Counter => a + b,
        MetricKind::Peak | MetricKind::Gauge(GaugeAgg::Max) => a.max(b),
        MetricKind::Gauge(GaugeAgg::Avg) => match a_count + b_count {
            0 => (a + b) / 2.0,
            total => (a * a_count as f64 + b * b_count as f64) / total as f64,
        },
        MetricKind::Gauge(GaugeAgg::Last) | MetricKind::Timestamp => {
            if a_end_ms < b_end_ms {
                b
            } else {
                a
            }
        }
    }
}
/// Scenario-wide statistics aggregated across cgroups.
///
/// `AssertResult::merge` combines two instances:
/// - Counters (`total_*`) add.
/// - `worst_*` fields take the maximum, except `worst_page_locality` and
///   `worst_iterations_per_worker`, which keep the lowest non-zero value.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize, crate::Claim)]
pub struct ScenarioStats {
    pub cgroups: Vec<CgroupStats>,
    pub total_workers: usize,
    pub total_cpus: usize,
    pub total_migrations: u64,
    pub worst_spread: f64,
    pub worst_gap_ms: u64,
    /// CPU on which `worst_gap_ms` was observed.
    pub worst_gap_cpu: usize,
    pub worst_migration_ratio: f64,
    pub worst_p99_wake_latency_us: f64,
    pub worst_median_wake_latency_us: f64,
    pub worst_wake_latency_cv: f64,
    pub total_iterations: u64,
    pub worst_mean_run_delay_us: f64,
    pub worst_run_delay_us: f64,
    pub worst_page_locality: f64,
    pub worst_cross_node_migration_ratio: f64,
    pub worst_wake_latency_tail_ratio: f64,
    pub worst_iterations_per_worker: f64,
    pub ext_metrics: BTreeMap<String, f64>,
    /// Per-phase buckets. `#[serde(default)]` lets data serialized without
    /// this field deserialize with an empty list.
    #[serde(default)]
    pub phases: Vec<PhaseBucket>,
}
impl ScenarioStats {
    /// Bucket whose raw `step_index` equals `step_index` (0 = baseline).
    pub fn phase(&self, step_index: u16) -> Option<&PhaseBucket> {
        self.phases
            .iter()
            .find(|bucket| bucket.step_index == step_index)
    }

    /// Bucket for 0-indexed scenario step `scenario_step_idx`, stored at raw
    /// index `scenario_step_idx + 1`. Returns `None` if that overflows `u16`.
    pub fn step(&self, scenario_step_idx: u16) -> Option<&PhaseBucket> {
        let phase_idx = scenario_step_idx.checked_add(1)?;
        self.phase(phase_idx)
    }

    /// `metric` from the bucket at raw `step_index`, if both exist.
    pub fn phase_metric(&self, step_index: u16, metric: &str) -> Option<f64> {
        self.phase(step_index)?.get(metric)
    }

    /// `metric` from the bucket for 0-indexed scenario step, if both exist.
    pub fn step_metric(&self, scenario_step_idx: u16, metric: &str) -> Option<f64> {
        self.step(scenario_step_idx)?.get(metric)
    }

    /// `true` when `name` is registered in `crate::stats::METRICS`.
    pub fn is_known_metric(name: &str) -> bool {
        Self::known_metrics().any(|known| known == name)
    }

    /// Names of every metric registered in `crate::stats::METRICS`.
    pub fn known_metrics() -> impl Iterator<Item = &'static str> {
        crate::stats::METRICS.iter().map(|m| m.name)
    }

    /// `true` when at least one non-baseline bucket is present.
    pub fn has_steps(&self) -> bool {
        self.phases.iter().any(|bucket| bucket.step_index > 0)
    }
}
pub fn populate_run_ext_metrics_from_phases(
phases: &[PhaseBucket],
target: &mut std::collections::BTreeMap<String, f64>,
) {
if phases.is_empty() {
return;
}
let mut keys: std::collections::BTreeSet<&String> = std::collections::BTreeSet::new();
for phase in phases {
for key in phase.metrics.keys() {
keys.insert(key);
}
}
for key in keys {
if target.contains_key(key) {
continue;
}
let Some(def) = crate::stats::metric_def(key) else {
continue;
};
let pairs: Vec<(f64, usize)> = phases
.iter()
.filter_map(|phase| {
phase
.metrics
.get(key)
.copied()
.map(|v| (v, phase.sample_count.max(1)))
})
.collect();
if pairs.is_empty() {
continue;
}
if let Some(reduced) = crate::stats::aggregate_samples_weighted(&pairs, def.kind) {
target.insert(key.clone(), reduced);
}
}
}
/// Fills `target` with run-wide reductions of each registered metric, read
/// from every sample in `samples`.
///
/// Skipped names:
/// - any name already present in `target`;
/// - the names in `TYPED_FIELD_NAMES`, presumably because they are reported
///   through dedicated typed fields (TODO confirm).
///
/// Metrics with no readings are left absent.
pub fn populate_run_ext_metrics(
    samples: &crate::scenario::sample::SampleSeries,
    target: &mut std::collections::BTreeMap<String, f64>,
) {
    const TYPED_FIELD_NAMES: &[&str] = &[
        "max_dsq_depth",
        "total_fallback",
        "total_keep_last",
        "stuck_count",
        "total_iterations",
        "total_migrations",
    ];
    for def in crate::stats::METRICS {
        let already_set = target.contains_key(def.name);
        if already_set || TYPED_FIELD_NAMES.contains(&def.name) {
            continue;
        }
        let readings: Vec<f64> = samples
            .iter_samples()
            .filter_map(|sample| def.read_sample(&sample))
            .collect();
        if readings.is_empty() {
            continue;
        }
        if let Some(reduced) = crate::stats::aggregate_samples_for_phase(def, &readings) {
            target.insert(def.name.to_string(), reduced);
        }
    }
}
/// Builds phase buckets and adds an `iteration_rate` metric derived from
/// stimulus events.
///
/// How the rate is computed:
/// 1. Events are stably sorted by `elapsed_ms`.
/// 2. For each consecutive pair where both carry `total_iterations`, the
///    count strictly increased, and time advanced, the rate is
///    `(Δ iterations) / (Δ seconds)`.
/// 3. The rate goes to the first bucket whose window contains the earlier
///    event. A window is `[start, end)`, or exactly `start` for a
///    single-sample bucket.
///
/// An existing `iteration_rate` in a bucket is never overwritten, so only the
/// first qualifying pair per bucket counts.
pub fn build_phase_buckets_with_stimulus(
    samples: &crate::scenario::sample::SampleSeries,
    stimulus_events: &[crate::timeline::StimulusEvent],
) -> Vec<PhaseBucket> {
    let mut buckets = build_phase_buckets(samples);
    let mut ordered: Vec<&crate::timeline::StimulusEvent> = stimulus_events.iter().collect();
    // Stable sort: events with equal timestamps keep their input order.
    ordered.sort_by_key(|event| event.elapsed_ms);
    for pair in ordered.windows(2) {
        let (prev, curr) = (pair[0], pair[1]);
        let (Some(start_iters), Some(end_iters)) = (prev.total_iterations, curr.total_iterations)
        else {
            continue;
        };
        let span_ms = curr.elapsed_ms.saturating_sub(prev.elapsed_ms);
        if end_iters <= start_iters || span_ms == 0 {
            continue;
        }
        let rate = (end_iters - start_iters) as f64 / (span_ms as f64 / 1000.0);
        let owner = buckets.iter_mut().find(|bucket| {
            if bucket.start_ms == bucket.end_ms {
                prev.elapsed_ms == bucket.start_ms
            } else {
                (bucket.start_ms..bucket.end_ms).contains(&prev.elapsed_ms)
            }
        });
        if let Some(bucket) = owner {
            bucket
                .metrics
                .entry("iteration_rate".to_string())
                .or_insert(rate);
        }
    }
    buckets
}
/// Groups `samples` by phase (via `SampleSeries::by_phase`) into [`PhaseBucket`]s.
///
/// For each phase:
/// - Label: `BASELINE` for index 0, otherwise `Step[<index - 1>]`.
/// - Window: first/last sample `elapsed_ms`; an empty phase gets
///   `(0, u64::MAX)`.
/// - Metrics: one value per registered metric that has at least one reading,
///   reduced with `aggregate_samples_for_phase`.
/// - `avg_imbalance_ratio`: added when not already produced. It is the mean
///   `imbalance_ratio()` of valid monitor samples inside the window (`[start,
///   end)`, or exactly `start` for a single-sample phase), kept only if finite.
pub fn build_phase_buckets(samples: &crate::scenario::sample::SampleSeries) -> Vec<PhaseBucket> {
    let monitor_samples: &[crate::monitor::MonitorSample] = match samples.monitor() {
        Some(monitor) => monitor.samples(),
        None => &[],
    };
    let phases = samples.by_phase();
    let mut buckets: Vec<PhaseBucket> = Vec::with_capacity(phases.len());
    for (step_index, samples_in_phase) in phases {
        let phase_samples = samples_in_phase.as_slice();
        let label = match step_index {
            0 => String::from("BASELINE"),
            later => format!("Step[{}]", later - 1),
        };
        let sample_count = phase_samples.len();
        let (start_ms, end_ms) = match (phase_samples.first(), phase_samples.last()) {
            (Some(first), Some(last)) => (first.elapsed_ms, last.elapsed_ms),
            _ => (0, u64::MAX),
        };

        let mut metrics: std::collections::BTreeMap<String, f64> =
            std::collections::BTreeMap::new();
        for def in crate::stats::METRICS {
            let readings: Vec<f64> = phase_samples
                .iter()
                .filter_map(|s| def.read_sample(s))
                .collect();
            if readings.is_empty() {
                continue;
            }
            if let Some(reduced) = crate::stats::aggregate_samples_for_phase(def, &readings) {
                metrics.insert(def.name.to_string(), reduced);
            }
        }

        let valid_in_phase: Vec<&crate::monitor::MonitorSample> = monitor_samples
            .iter()
            .filter(|sample| {
                let ms = sample.elapsed_ms;
                let inside = if start_ms == end_ms {
                    ms == start_ms
                } else {
                    (start_ms..end_ms).contains(&ms)
                };
                inside && crate::monitor::sample_looks_valid(sample)
            })
            .collect();
        if !valid_in_phase.is_empty() && !metrics.contains_key("avg_imbalance_ratio") {
            let total: f64 = valid_in_phase.iter().map(|s| s.imbalance_ratio()).sum();
            let avg = total / valid_in_phase.len() as f64;
            if avg.is_finite() {
                metrics.insert("avg_imbalance_ratio".to_string(), avg);
            }
        }

        buckets.push(PhaseBucket {
            step_index,
            label,
            start_ms,
            end_ms,
            sample_count,
            metrics,
        });
    }
    buckets
}
impl AssertResult {
pub fn pass() -> Self {
Self {
outcomes: vec![],
passes: vec![],
stats: Default::default(),
measurements: std::collections::BTreeMap::new(),
info_notes: vec![],
}
}
pub fn skip(reason: impl Into<String>) -> Self {
Self {
outcomes: vec![Outcome::Skip(AssertDetail::new(DetailKind::Skip, reason))],
..Self::pass()
}
}
pub fn fail(detail: AssertDetail) -> Self {
Self {
outcomes: vec![Outcome::Fail(detail)],
..Self::pass()
}
}
pub fn fail_msg(msg: impl Into<String>) -> Self {
Self::fail(AssertDetail::new(DetailKind::Other, msg))
}
pub fn inconclusive(detail: AssertDetail) -> Self {
Self {
outcomes: vec![Outcome::Inconclusive(detail)],
..Self::pass()
}
}
pub fn inconclusive_msg(msg: impl Into<String>) -> Self {
Self::inconclusive(AssertDetail::new(DetailKind::Other, msg))
}
pub fn record_fail(&mut self, detail: AssertDetail) -> &mut Self {
self.outcomes.push(Outcome::Fail(detail));
self
}
pub fn record_skip(&mut self, reason: impl Into<String>) -> &mut Self {
self.outcomes
.push(Outcome::Skip(AssertDetail::new(DetailKind::Skip, reason)));
self
}
pub fn record_inconclusive(&mut self, detail: AssertDetail) -> &mut Self {
self.outcomes.push(Outcome::Inconclusive(detail));
self
}
pub fn record_pass(&mut self) -> &mut Self {
self.outcomes.push(Outcome::Pass);
self
}
pub fn record_outcome(&mut self, outcome: Outcome) -> &mut Self {
self.outcomes.push(outcome);
self
}
pub fn is_pass(&self) -> bool {
!self.is_fail() && !self.is_inconclusive() && !self.is_skip()
}
pub fn is_fail(&self) -> bool {
self.outcomes.iter().any(Outcome::is_fail)
}
pub fn is_skip(&self) -> bool {
!self.outcomes.is_empty() && self.outcomes.iter().all(Outcome::is_skip)
}
pub fn is_inconclusive(&self) -> bool {
!self.is_fail() && self.outcomes.iter().any(Outcome::is_inconclusive)
}
pub fn failure_details(&self) -> impl Iterator<Item = &AssertDetail> {
self.outcomes.iter().filter_map(|o| match o {
Outcome::Fail(d) => Some(d),
_ => None,
})
}
pub fn skip_details(&self) -> impl Iterator<Item = &AssertDetail> {
self.outcomes.iter().filter_map(|o| match o {
Outcome::Skip(d) => Some(d),
_ => None,
})
}
pub fn inconclusive_details(&self) -> impl Iterator<Item = &AssertDetail> {
self.outcomes.iter().filter_map(|o| match o {
Outcome::Inconclusive(d) => Some(d),
_ => None,
})
}
pub fn into_anyhow_or_log(self) -> anyhow::Result<()> {
for note in &self.info_notes {
tracing::info!(target: "ktstr::assert", "{}", note.message);
}
let failures: Vec<String> = self.failure_details().map(|d| d.message.clone()).collect();
if !failures.is_empty() {
let combined = if failures.len() == 1 {
failures.into_iter().next().unwrap()
} else {
let mut out = format!("{} assertion failures:\n", failures.len());
for (i, msg) in failures.iter().enumerate() {
out.push_str(&format!(" {}. {}\n", i + 1, msg));
}
out.trim_end().to_string()
};
anyhow::bail!("{}", combined);
}
let inconclusives: Vec<String> = self
.inconclusive_details()
.map(|d| d.message.clone())
.collect();
if !inconclusives.is_empty() {
let combined = if inconclusives.len() == 1 {
format!("1 inconclusive verdict: {}", inconclusives[0])
} else {
let mut out = format!("{} inconclusive verdicts:\n", inconclusives.len());
for (i, msg) in inconclusives.iter().enumerate() {
out.push_str(&format!(" {}. {}\n", i + 1, msg));
}
out.trim_end().to_string()
};
anyhow::bail!("{}", combined);
}
Ok(())
}
pub fn note(&mut self, msg: impl Into<String>) -> &mut Self {
self.info_notes.push(InfoNote::new(msg));
self
}
pub fn with_note(mut self, msg: impl Into<String>) -> Self {
self.note(msg);
self
}
pub fn outcome(&self) -> Outcome {
self.outcome_ref().to_owned()
}
pub fn outcome_ref(&self) -> OutcomeRef<'_> {
if let Some(d) = self.failure_details().next() {
OutcomeRef::Fail(d)
} else if let Some(d) = self.inconclusive_details().next() {
OutcomeRef::Inconclusive(d)
} else if let Some(d) = self.skip_details().next() {
if self.outcomes.iter().all(Outcome::is_skip) {
OutcomeRef::Skip(d)
} else {
OutcomeRef::Pass
}
} else {
OutcomeRef::Pass
}
}
pub fn merge(&mut self, mut other: AssertResult) {
fn fold_lowest_nonzero(self_field: &mut f64, other_field: f64) {
if other_field > 0.0 && (*self_field == 0.0 || other_field < *self_field) {
*self_field = other_field;
}
}
self.outcomes.extend(other.outcomes);
self.passes.extend(other.passes);
self.info_notes.extend(other.info_notes);
let s = &mut self.stats;
let o = &other.stats;
s.total_workers += o.total_workers;
s.total_cpus += o.total_cpus;
s.total_migrations += o.total_migrations;
s.total_iterations += o.total_iterations;
s.worst_spread = s.worst_spread.max(o.worst_spread);
s.worst_migration_ratio = s.worst_migration_ratio.max(o.worst_migration_ratio);
s.worst_p99_wake_latency_us = s.worst_p99_wake_latency_us.max(o.worst_p99_wake_latency_us);
s.worst_median_wake_latency_us = s
.worst_median_wake_latency_us
.max(o.worst_median_wake_latency_us);
s.worst_wake_latency_cv = s.worst_wake_latency_cv.max(o.worst_wake_latency_cv);
s.worst_run_delay_us = s.worst_run_delay_us.max(o.worst_run_delay_us);
s.worst_mean_run_delay_us = s.worst_mean_run_delay_us.max(o.worst_mean_run_delay_us);
s.worst_cross_node_migration_ratio = s
.worst_cross_node_migration_ratio
.max(o.worst_cross_node_migration_ratio);
s.worst_wake_latency_tail_ratio = s
.worst_wake_latency_tail_ratio
.max(o.worst_wake_latency_tail_ratio);
fold_lowest_nonzero(
&mut s.worst_iterations_per_worker,
o.worst_iterations_per_worker,
);
if o.worst_gap_ms > s.worst_gap_ms {
s.worst_gap_ms = o.worst_gap_ms;
s.worst_gap_cpu = o.worst_gap_cpu;
}
fold_lowest_nonzero(&mut s.worst_page_locality, o.worst_page_locality);
for (k, v) in &other.stats.ext_metrics {
let higher_is_worse = crate::stats::metric_def(k)
.map(|m| m.higher_is_worse())
.unwrap_or_else(|| crate::stats::infer_higher_is_worse(k));
let entry = self.stats.ext_metrics.entry(k.clone()).or_insert(*v);
*entry = if higher_is_worse {
entry.max(*v)
} else {
entry.min(*v)
};
}
let other_phases = std::mem::take(&mut other.stats.phases);
if !self.stats.phases.is_empty() || !other_phases.is_empty() {
let mut other_by_idx: std::collections::BTreeMap<u16, PhaseBucket> = other_phases
.into_iter()
.map(|b| (b.step_index, b))
.collect();
let self_buckets = std::mem::take(&mut self.stats.phases);
let mut merged: Vec<PhaseBucket> =
Vec::with_capacity(self_buckets.len() + other_by_idx.len());
for s_bucket in self_buckets {
if let Some(o_bucket) = other_by_idx.remove(&s_bucket.step_index) {
merged.push(merge_matched_phase_buckets(s_bucket, o_bucket));
} else {
merged.push(s_bucket);
}
}
merged.extend(other_by_idx.into_values());
merged.sort_by_key(|b| b.step_index);
self.stats.phases = merged;
}
self.stats.cgroups.extend(other.stats.cgroups);
for (k, v) in other.measurements {
self.measurements.insert(k, v);
}
}
pub fn note_value(&mut self, key: impl Into<String>, value: impl Into<NoteValue>) -> &mut Self {
self.measurements.insert(key.into(), value.into());
self
}
pub fn with_note_value(mut self, key: impl Into<String>, value: impl Into<NoteValue>) -> Self {
self.note_value(key, value);
self
}
pub fn any_of(branches: impl IntoIterator<Item = AssertResult>) -> AssertResult {
let branches: Vec<AssertResult> = branches.into_iter().collect();
if branches.is_empty() {
return AssertResult::fail(AssertDetail::new(
DetailKind::Other,
"any_of: empty branch list — a disjunction of zero alternatives is logically false",
));
}
let first_pass_idx = branches.iter().position(|b| b.is_pass());
if let Some(idx) = first_pass_idx {
let mut chosen: Option<AssertResult> = None;
let mut union_measurements: std::collections::BTreeMap<String, NoteValue> =
std::collections::BTreeMap::new();
let mut union_info_notes: Vec<InfoNote> = Vec::new();
for (orig_idx, b) in branches.into_iter().enumerate() {
if orig_idx == idx {
let mut b = b;
let pre_notes = std::mem::take(&mut b.info_notes);
let pre_meas = std::mem::take(&mut b.measurements);
chosen = Some(b);
for n in pre_notes {
union_info_notes
.push(InfoNote::new(format!("any_of[{orig_idx}]: {}", n.message)));
}
for (k, v) in pre_meas {
union_measurements.insert(k, v);
}
} else if b.is_pass() {
for n in b.info_notes {
union_info_notes
.push(InfoNote::new(format!("any_of[{orig_idx}]: {}", n.message)));
}
for (k, v) in b.measurements {
union_measurements.insert(k, v);
}
}
}
let mut chosen = chosen.expect("first_pass_idx matched a branch");
chosen.measurements = union_measurements;
chosen.info_notes = union_info_notes;
chosen.info_notes.push(InfoNote::new(format!(
"any_of: branch {idx} satisfied the disjunction"
)));
chosen
} else {
let total_branches = branches.len();
let (n_fail, n_inc, n_skip) =
branches
.iter()
.fold((0usize, 0usize, 0usize), |(f, i, s), b| {
if b.is_fail() {
(f + 1, i, s)
} else if b.is_inconclusive() {
(f, i + 1, s)
} else if b.is_skip() {
(f, i, s + 1)
} else {
(f, i, s)
}
});
let mut iter = branches.into_iter().enumerate();
let (_, first) = iter.next().expect("non-empty checked above");
let mut acc = AssertResult {
outcomes: Vec::new(),
passes: first.passes,
stats: first.stats,
measurements: first.measurements,
info_notes: Vec::new(),
};
fn reemit_with_prefix(
acc: &mut AssertResult,
idx: usize,
outcomes: Vec<Outcome>,
info_notes: Vec<InfoNote>,
) {
for o in outcomes {
match o {
Outcome::Pass => acc.outcomes.push(Outcome::Pass),
Outcome::Fail(d) => acc.outcomes.push(Outcome::Fail(AssertDetail::new(
d.kind,
format!("any_of[{idx}]: {}", d.message),
))),
Outcome::Inconclusive(d) => acc.outcomes.push(Outcome::Inconclusive(
AssertDetail::new(d.kind, format!("any_of[{idx}]: {}", d.message)),
)),
Outcome::Skip(d) => acc.outcomes.push(Outcome::Skip(AssertDetail::new(
d.kind,
format!("any_of[{idx}]: {}", d.message),
))),
}
}
for n in info_notes {
acc.info_notes
.push(InfoNote::new(format!("any_of[{idx}]: {}", n.message)));
}
}
reemit_with_prefix(&mut acc, 0, first.outcomes, first.info_notes);
for (idx, b) in iter {
reemit_with_prefix(&mut acc, idx, b.outcomes, b.info_notes);
}
let summary = format!(
"any_of: no branch passed ({n_fail} failed, {n_inc} inconclusive, {n_skip} skipped of {total_branches} branches)"
);
let synth = if n_fail > 0 {
Outcome::Fail(AssertDetail::new(DetailKind::Other, summary))
} else if n_inc > 0 {
Outcome::Inconclusive(AssertDetail::new(DetailKind::Other, summary))
} else {
Outcome::Skip(AssertDetail::new(DetailKind::Skip, summary))
};
acc.outcomes.push(synth);
acc
}
}
pub fn all_of(branches: impl IntoIterator<Item = AssertResult>) -> AssertResult {
let mut acc = AssertResult::pass();
for b in branches {
acc.merge(b);
}
acc
}
}
/// Set of per-cgroup checks evaluated by `AssertPlan::assert_cgroup`.
///
/// A `false`/`None` field disables the corresponding check.
#[derive(Clone, Debug, Default)]
pub(crate) struct AssertPlan {
    /// Run `assert_not_starved` over the cgroup's worker reports.
    pub(crate) not_starved: bool,
    /// Run `assert_isolation` when a cpuset is supplied.
    pub(crate) isolation: bool,
    /// Only consulted when `not_starved` is set. Replaces the `Stuck` failures
    /// from `assert_not_starved` with per-worker `max_gap_ms` checks against
    /// this threshold.
    pub(crate) max_gap_ms: Option<u64>,
    /// Only consulted when `not_starved` is set. Replaces the `Unfair`
    /// failures from `assert_not_starved` with a spread check against this
    /// limit; needs at least 2 workers.
    pub(crate) max_spread_pct: Option<f64>,
    /// Passed to `assert_throughput_parity`.
    pub(crate) max_throughput_cv: Option<f64>,
    /// Passed to `assert_throughput_parity`.
    pub(crate) min_work_rate: Option<f64>,
    /// Passed to `assert_benchmarks`.
    pub(crate) max_p99_wake_latency_ns: Option<u64>,
    /// Passed to `assert_benchmarks`.
    pub(crate) max_wake_latency_cv: Option<f64>,
    /// Passed to `assert_benchmarks`.
    pub(crate) min_iteration_rate: Option<f64>,
    /// Limit on total migrations divided by total iterations. The verdict is
    /// inconclusive when there are zero iterations.
    pub(crate) max_migration_ratio: Option<f64>,
    /// NOTE(review): consumed later in `assert_cgroup` (not visible here);
    /// presumably a lower bound on page locality.
    pub(crate) min_page_locality: Option<f64>,
    /// NOTE(review): consumed later in `assert_cgroup` (not visible here);
    /// presumably an upper bound on the cross-node migration ratio.
    pub(crate) max_cross_node_migration_ratio: Option<f64>,
    /// NOTE(review): consumed later in `assert_cgroup` (not visible here);
    /// presumably an upper bound on a slow-tier ratio.
    pub(crate) max_slow_tier_ratio: Option<f64>,
}
/// Evaluation of a resolved `AssertPlan` against one cgroup's worker reports.
impl AssertPlan {
/// Test-only constructor; identical to `AssertPlan::default()` (every check
/// disabled).
#[cfg(test)]
pub(crate) fn new() -> Self {
Self::default()
}
/// Run every check enabled in this plan against `reports` and collect the
/// outcomes into one `AssertResult`.
///
/// * `reports` — per-worker measurements for the cgroup under test.
/// * `cpuset` — expected CPUs; the isolation check is skipped when `None`.
/// * `numa_nodes` — expected NUMA nodes; the page-locality and slow-tier
///   checks are skipped when `None`.
///
/// Checks run in a fixed order: starvation/fairness/stuck, isolation,
/// throughput parity, benchmarks, migration ratio, page locality,
/// cross-node migration, slow tier. In the migration-ratio and slow-tier
/// checks a zero denominator is recorded as inconclusive, not as pass or fail.
pub(crate) fn assert_cgroup(
&self,
reports: &[WorkerReport],
cpuset: Option<&BTreeSet<usize>>,
numa_nodes: Option<&BTreeSet<usize>>,
) -> AssertResult {
let mut r = AssertResult::pass();
if self.not_starved {
let mut cgroup_result = assert_not_starved(reports);
// A plan-level spread limit replaces the default threshold applied inside
// `assert_not_starved`: drop its `Unfair` failures, then re-judge the
// spread recorded in the cgroup stats entry it produced.
if let Some(spread_limit) = self.max_spread_pct {
cgroup_result
.outcomes
.retain(|o| !matches!(o, Outcome::Fail(d) if d.kind == DetailKind::Unfair));
if let Some(cg) = cgroup_result.stats.cgroups.first()
&& cg.spread > spread_limit
&& cg.num_workers >= 2
{
cgroup_result.record_fail(AssertDetail::new(
DetailKind::Unfair,
format!(
"unfair cgroup: spread={:.0}% ({:.0}-{:.0}%) {} workers on {} cpus (threshold {:.0}%)",
cg.spread, cg.min_off_cpu_pct, cg.max_off_cpu_pct,
cg.num_workers, cg.num_cpus, spread_limit
),
));
}
}
// Likewise, a plan-level gap limit replaces the default stuck threshold:
// drop the default `Stuck` failures and re-check every worker.
if let Some(threshold) = self.max_gap_ms {
cgroup_result
.outcomes
.retain(|o| !matches!(o, Outcome::Fail(d) if d.kind == DetailKind::Stuck));
for w in reports {
if w.max_gap_ms > threshold {
cgroup_result.record_fail(AssertDetail::new(
DetailKind::Stuck,
format!(
"tid {} stuck {}ms on cpu{} at +{}ms (threshold {}ms)",
w.tid, w.max_gap_ms, w.max_gap_cpu, w.max_gap_at_ms, threshold,
),
));
}
}
}
r.merge(cgroup_result);
}
// Isolation is only meaningful against a known cpuset; skipped without one.
if self.isolation
&& let Some(cs) = cpuset
{
r.merge(assert_isolation(reports, cs));
}
if self.max_throughput_cv.is_some() || self.min_work_rate.is_some() {
r.merge(assert_throughput_parity(
reports,
self.max_throughput_cv,
self.min_work_rate,
));
}
if self.max_p99_wake_latency_ns.is_some()
|| self.max_wake_latency_cv.is_some()
|| self.min_iteration_rate.is_some()
{
r.merge(assert_benchmarks(
reports,
self.max_p99_wake_latency_ns,
self.max_wake_latency_cv,
self.min_iteration_rate,
));
}
// Migrations per iteration, pooled over all workers. With zero iterations
// the ratio is undefined, so the check is inconclusive.
if let Some(max_ratio) = self.max_migration_ratio {
let total_mig: u64 = reports.iter().map(|w| w.migration_count).sum();
let total_iters: u64 = reports.iter().map(|w| w.iterations).sum();
if total_iters == 0 {
r.record_inconclusive(AssertDetail::new(
DetailKind::Migration,
format!(
"migration ratio inconclusive: 0 iterations across {} workers — \
denominator is zero, ratio cannot be computed; threshold {:.4} \
neither pass nor fail (was the workload able to run?)",
reports.len(),
max_ratio,
),
));
} else {
let ratio = total_mig as f64 / total_iters as f64;
if ratio > max_ratio {
r.record_fail(AssertDetail::new(
DetailKind::Migration,
format!(
"migration ratio {:.4} exceeds threshold {:.4} ({} migrations / {} iterations)",
ratio, max_ratio, total_mig, total_iters,
),
));
}
}
}
// Pool per-node page counts over all workers; pages on any node in `nodes`
// count as local. An empty sample gives locality 0.0, and `total` (0) is
// forwarded so `assert_page_locality` receives the raw counts too.
if let Some(min_locality) = self.min_page_locality
&& let Some(nodes) = numa_nodes
{
let mut total: u64 = 0;
let mut local: u64 = 0;
for w in reports {
for (&node, &count) in &w.numa_pages {
total += count;
if nodes.contains(&node) {
local += count;
}
}
}
let locality = if total > 0 {
local as f64 / total as f64
} else {
0.0
};
r.merge(assert_page_locality(
locality,
Some(min_locality),
total,
local,
));
}
// Denominator: pages observed across all workers. Numerator: the largest
// `vmstat_numa_pages_migrated` any worker reported. NOTE(review): `max`
// rather than `sum` is presumably used because the vmstat counter is
// system-wide and every worker samples the same value; confirm.
if let Some(max_ratio) = self.max_cross_node_migration_ratio {
let total_pages: u64 = reports
.iter()
.map(|w| w.numa_pages.values().sum::<u64>())
.sum();
let migrated_pages: u64 = reports
.iter()
.map(|w| w.vmstat_numa_pages_migrated)
.max()
.unwrap_or(0);
r.merge(assert_cross_node_migration(
migrated_pages,
total_pages,
Some(max_ratio),
));
}
// Slow tier is judged per worker: each worker with pages contributes its
// own result. If no worker reported any pages, the check is inconclusive.
if let Some(max_ratio) = self.max_slow_tier_ratio
&& numa_nodes.is_some()
{
let mut evaluated = 0usize;
for w in reports {
if w.numa_pages.is_empty() {
continue;
}
let total: u64 = w.numa_pages.values().sum();
if total > 0 {
evaluated += 1;
r.merge(assert_slow_tier_ratio(
&w.numa_pages,
max_ratio,
total,
numa_nodes,
));
}
}
if evaluated == 0 {
r.record_inconclusive(AssertDetail::new(
DetailKind::SlowTier,
format!(
"slow-tier ratio inconclusive: no worker reported any NUMA pages \
(across {} workers) — denominator is zero, ratio cannot be computed; \
threshold {max_ratio:.4} neither pass nor fail \
(did the workload allocate any memory?)",
reports.len(),
),
));
}
}
r
}
}
/// Fail when the share of one worker's pages that sit on nodes outside
/// `numa_nodes` exceeds `max_ratio`.
///
/// `total_pages` is the caller-computed sum of `numa_pages`. The caller only
/// passes workers whose total is non-zero. With `numa_nodes == None` there is
/// no reference node set, and the result is a plain pass.
fn assert_slow_tier_ratio(
    numa_pages: &BTreeMap<usize, u64>,
    max_ratio: f64,
    total_pages: u64,
    numa_nodes: Option<&BTreeSet<usize>>,
) -> AssertResult {
    let mut result = AssertResult::pass();
    if let Some(cpu_nodes) = numa_nodes {
        // Pages living on any node that is not one of the expected (CPU) nodes.
        let slow_pages: u64 = numa_pages
            .iter()
            .filter_map(|(node, &count)| (!cpu_nodes.contains(node)).then_some(count))
            .sum();
        let ratio = slow_pages as f64 / total_pages as f64;
        if ratio > max_ratio {
            result.record_fail(AssertDetail::new(
                DetailKind::SlowTier,
                format!(
                    "slow-tier page ratio {ratio:.4} ({pct:.2}%) exceeds threshold {max_ratio:.4} ({thr_pct:.2}%) \
                     ({slow_pages}/{total_pages} pages on non-CPU nodes)",
                    pct = ratio * 100.0,
                    thr_pct = max_ratio * 100.0,
                ),
            ));
        }
    }
    result
}
/// Check observed NUMA page locality against an optional minimum.
///
/// * `observed` — caller-computed fraction `local_pages / total_pages`.
/// * `min_locality` — minimum acceptable fraction; `None` disables the check.
/// * `total_pages` / `local_pages` — raw counts, used for the diagnostic
///   message and to detect an empty sample.
///
/// When `min_locality` is `Some(threshold)`:
/// * `total_pages == 0`: inconclusive. No pages were observed, so locality is
///   undefined. This matches the zero-denominator handling of
///   `assert_cross_node_migration` and the migration / slow-tier ratio checks
///   instead of reporting a spurious `0/0` failure.
/// * `observed < threshold`: fail with a `DetailKind::PageLocality` detail.
/// * otherwise: pass.
pub fn assert_page_locality(
    observed: f64,
    min_locality: Option<f64>,
    total_pages: u64,
    local_pages: u64,
) -> AssertResult {
    let mut r = AssertResult::pass();
    let Some(threshold) = min_locality else {
        return r;
    };
    if total_pages == 0 {
        r.record_inconclusive(AssertDetail::new(
            DetailKind::PageLocality,
            format!(
                "page locality inconclusive: 0 pages observed in numa_maps — \
                 denominator is zero, locality cannot be computed; threshold {threshold:.4} \
                 neither pass nor fail (did the workload allocate any memory?)",
            ),
        ));
        return r;
    }
    if observed < threshold {
        r.record_fail(AssertDetail::new(
            DetailKind::PageLocality,
            format!(
                "page locality {observed:.4} ({pct:.2}%) below threshold {threshold:.4} ({thr_pct:.2}%) ({local_pages}/{total_pages} pages local)",
                pct = observed * 100.0,
                thr_pct = threshold * 100.0,
            ),
        ));
    }
    r
}
/// Check the ratio of migrated NUMA pages to observed pages against an
/// optional maximum.
///
/// With `max_ratio == None` the result is a plain pass. Otherwise:
/// * no pages observed and none migrated: inconclusive (ratio undefined);
/// * no pages observed but some migrated: fail (inconsistent inputs);
/// * `migrated_pages / total_pages > threshold`: fail;
/// * anything else: pass.
pub fn assert_cross_node_migration(
    migrated_pages: u64,
    total_pages: u64,
    max_ratio: Option<f64>,
) -> AssertResult {
    let mut result = AssertResult::pass();
    let Some(threshold) = max_ratio else {
        return result;
    };
    match (total_pages, migrated_pages) {
        (0, 0) => {
            result.record_inconclusive(AssertDetail::new(
                DetailKind::CrossNodeMigration,
                format!(
                    "cross-node migration inconclusive: 0 pages observed in numa_maps and 0 pages migrated — \
                     denominator is zero, ratio cannot be computed; threshold {threshold:.4} \
                     neither pass nor fail (did the workload allocate any memory?)",
                ),
            ));
        }
        (0, _) => {
            result.record_fail(AssertDetail::new(
                DetailKind::CrossNodeMigration,
                format!(
                    "cross-node migration inconsistent: {migrated_pages} pages migrated but 0 pages observed in numa_maps (threshold {threshold:.4})",
                ),
            ));
        }
        _ => {
            let ratio = migrated_pages as f64 / total_pages as f64;
            if ratio > threshold {
                result.record_fail(AssertDetail::new(
                    DetailKind::CrossNodeMigration,
                    format!(
                        "cross-node migration ratio {ratio:.4} ({pct:.2}%) exceeds threshold {threshold:.4} ({thr_pct:.2}%) ({migrated_pages}/{total_pages} pages migrated)",
                        pct = ratio * 100.0,
                        thr_pct = threshold * 100.0,
                    ),
                ));
            }
        }
    }
    result
}
/// Test-only builder helpers for assembling an `AssertPlan` fluently.
#[cfg(test)]
impl AssertPlan {
    /// Enable the starvation / fairness / stuck-worker check.
    fn check_not_starved(self) -> Self {
        Self {
            not_starved: true,
            ..self
        }
    }

    /// Enable the CPU isolation check.
    fn check_isolation(self) -> Self {
        Self {
            isolation: true,
            ..self
        }
    }

    /// Set the per-worker stuck-gap limit, in milliseconds.
    fn max_gap_ms(self, ms: u64) -> Self {
        Self {
            max_gap_ms: Some(ms),
            ..self
        }
    }
}
/// Declarative, layerable assertion configuration for a test.
///
/// Every `Option` field is an override: `None` means "not set at this layer",
/// and `Assert::merge` lets a later layer's `Some` replace an earlier value.
/// Worker-level fields are projected into an `AssertPlan` by `worker_plan`.
/// Monitor fields are projected into `MonitorThresholds` by
/// `monitor_thresholds`. The two `expect_scx_bpf_error_*` matchers hold
/// `&'static str` and are skipped by serde.
#[must_use = "builder methods return a new Assert; discard means config is lost"]
#[derive(Clone, Copy, Debug, serde::Serialize, serde::Deserialize)]
pub struct Assert {
/// Enable the starvation / fairness / stuck-worker check.
pub not_starved: Option<bool>,
/// Enable the CPU isolation check against the cgroup cpuset.
pub isolation: Option<bool>,
/// Per-worker scheduling-gap limit in ms, replacing the default stuck
/// threshold; `AssertPlan::assert_cgroup` only consults it when
/// `not_starved` is enabled.
pub max_gap_ms: Option<u64>,
/// Off-CPU percentage spread limit, replacing the default fairness
/// threshold; `AssertPlan::assert_cgroup` only consults it when
/// `not_starved` is enabled.
pub max_spread_pct: Option<f64>,
/// Max coefficient of variation of per-worker work units per CPU-second.
pub max_throughput_cv: Option<f64>,
/// Per-worker floor on work units per CPU-second.
pub min_work_rate: Option<f64>,
/// Limit on the pooled p99 wake latency, in ns.
pub max_p99_wake_latency_ns: Option<u64>,
/// Limit on the coefficient of variation of pooled wake latencies.
pub max_wake_latency_cv: Option<f64>,
/// Per-worker floor on iterations per wall-clock second.
pub min_iteration_rate: Option<f64>,
/// Limit on total migrations divided by total iterations.
pub max_migration_ratio: Option<f64>,
/// Monitor threshold override; unset falls back to `MonitorThresholds::new()`.
pub max_imbalance_ratio: Option<f64>,
/// Monitor threshold override; unset falls back to `MonitorThresholds::new()`.
pub max_local_dsq_depth: Option<u32>,
/// Monitor threshold override; unset falls back to `MonitorThresholds::new()`.
pub fail_on_stall: Option<bool>,
/// Monitor threshold override; unset falls back to `MonitorThresholds::new()`.
pub sustained_samples: Option<usize>,
/// Monitor threshold override; unset falls back to `MonitorThresholds::new()`.
pub max_fallback_rate: Option<f64>,
/// Monitor threshold override; unset falls back to `MonitorThresholds::new()`.
pub max_keep_last_rate: Option<f64>,
/// Copied into `MonitorThresholds::enforce`. `merge` ORs it and
/// `with_monitor_defaults` sets it to `true`.
pub enforce_monitor_thresholds: bool,
/// Minimum fraction of observed NUMA pages that sit on the expected nodes.
pub min_page_locality: Option<f64>,
/// Limit on migrated NUMA pages divided by pages observed in numa_maps.
pub max_cross_node_migration_ratio: Option<f64>,
/// Per-worker limit on the fraction of pages on nodes outside the expected
/// NUMA node set.
pub max_slow_tier_ratio: Option<f64>,
/// Substring that must appear in the captured scheduler log + sched_ext dump.
#[serde(skip)]
pub expect_scx_bpf_error_contains: Option<&'static str>,
/// Regex that must match the captured scheduler log + sched_ext dump.
#[serde(skip)]
pub expect_scx_bpf_error_matches: Option<&'static str>,
}
/// Builders, layering, and evaluation helpers for `Assert`.
impl Assert {
/// Render the configured options as a human-readable listing, one option per
/// line.
///
/// Each line is a single space, the option name left-aligned in a 38-column
/// field, `: `, and the value. Unset options print `none`. Options appear in
/// field declaration order. `enforce_monitor_thresholds` is not part of this
/// listing.
pub fn format_human(&self) -> String {
use std::fmt::Write;
let mut out = String::new();
// Writing into a `String` cannot fail, so these `unwrap`s never panic.
fn row<T: std::fmt::Display>(out: &mut String, name: &str, v: &Option<T>) {
match v {
Some(x) => writeln!(out, " {name:<38}: {x}").unwrap(),
None => writeln!(out, " {name:<38}: none").unwrap(),
}
}
row(&mut out, "not_starved", &self.not_starved);
row(&mut out, "isolation", &self.isolation);
row(&mut out, "max_gap_ms", &self.max_gap_ms);
row(&mut out, "max_spread_pct", &self.max_spread_pct);
row(&mut out, "max_throughput_cv", &self.max_throughput_cv);
row(&mut out, "min_work_rate", &self.min_work_rate);
row(
&mut out,
"max_p99_wake_latency_ns",
&self.max_p99_wake_latency_ns,
);
row(&mut out, "max_wake_latency_cv", &self.max_wake_latency_cv);
row(&mut out, "min_iteration_rate", &self.min_iteration_rate);
row(&mut out, "max_migration_ratio", &self.max_migration_ratio);
row(&mut out, "max_imbalance_ratio", &self.max_imbalance_ratio);
row(&mut out, "max_local_dsq_depth", &self.max_local_dsq_depth);
row(&mut out, "fail_on_stall", &self.fail_on_stall);
row(&mut out, "sustained_samples", &self.sustained_samples);
row(&mut out, "max_fallback_rate", &self.max_fallback_rate);
row(&mut out, "max_keep_last_rate", &self.max_keep_last_rate);
row(&mut out, "min_page_locality", &self.min_page_locality);
row(
&mut out,
"max_cross_node_migration_ratio",
&self.max_cross_node_migration_ratio,
);
row(&mut out, "max_slow_tier_ratio", &self.max_slow_tier_ratio);
row(
&mut out,
"expect_scx_bpf_error_contains",
&self.expect_scx_bpf_error_contains,
);
row(
&mut out,
"expect_scx_bpf_error_matches",
&self.expect_scx_bpf_error_matches,
);
out
}
/// An `Assert` with every option unset and monitor enforcement off.
///
/// This is the identity for `Assert::merge`: merging it on either side
/// leaves the other operand unchanged.
pub const NO_OVERRIDES: Assert = Assert {
not_starved: None,
isolation: None,
max_gap_ms: None,
max_spread_pct: None,
max_throughput_cv: None,
min_work_rate: None,
max_p99_wake_latency_ns: None,
max_wake_latency_cv: None,
min_iteration_rate: None,
max_migration_ratio: None,
max_imbalance_ratio: None,
max_local_dsq_depth: None,
fail_on_stall: None,
sustained_samples: None,
max_fallback_rate: None,
max_keep_last_rate: None,
enforce_monitor_thresholds: false,
min_page_locality: None,
max_cross_node_migration_ratio: None,
max_slow_tier_ratio: None,
expect_scx_bpf_error_contains: None,
expect_scx_bpf_error_matches: None,
};
/// Const starting point for builder chains; identical to `NO_OVERRIDES`.
pub const fn default_checks() -> Assert {
Self::NO_OVERRIDES
}
/// Start a `Verdict` seeded with this configuration (`Verdict::with_assert`).
pub fn verdict(self) -> Verdict {
Verdict::with_assert(self)
}
/// Enable the starvation / fairness / stuck-worker check.
pub const fn check_not_starved(mut self) -> Self {
self.not_starved = Some(true);
self
}
/// Enable the CPU isolation check against the cgroup cpuset.
pub const fn check_isolation(mut self) -> Self {
self.isolation = Some(true);
self
}
/// Set the per-worker scheduling-gap limit in ms. In
/// `AssertPlan::assert_cgroup` it only takes effect together with
/// `check_not_starved`.
pub const fn max_gap_ms(mut self, ms: u64) -> Self {
self.max_gap_ms = Some(ms);
self
}
/// Set the off-CPU percentage spread limit. In `AssertPlan::assert_cgroup`
/// it only takes effect together with `check_not_starved`.
pub const fn max_spread_pct(mut self, pct: f64) -> Self {
self.max_spread_pct = Some(pct);
self
}
/// Set the max coefficient of variation of per-worker work units per
/// CPU-second.
pub const fn max_throughput_cv(mut self, v: f64) -> Self {
self.max_throughput_cv = Some(v);
self
}
/// Set the per-worker floor on work units per CPU-second.
pub const fn min_work_rate(mut self, v: f64) -> Self {
self.min_work_rate = Some(v);
self
}
/// Set the limit on pooled p99 wake latency, in nanoseconds.
pub const fn max_p99_wake_latency_ns(mut self, v: u64) -> Self {
self.max_p99_wake_latency_ns = Some(v);
self
}
/// Set the limit on the coefficient of variation of pooled wake latencies.
pub const fn max_wake_latency_cv(mut self, v: f64) -> Self {
self.max_wake_latency_cv = Some(v);
self
}
/// Set the per-worker floor on iterations per wall-clock second.
pub const fn min_iteration_rate(mut self, v: f64) -> Self {
self.min_iteration_rate = Some(v);
self
}
/// Set the limit on total migrations divided by total iterations.
pub const fn max_migration_ratio(mut self, v: f64) -> Self {
self.max_migration_ratio = Some(v);
self
}
/// Override the monitor's `max_imbalance_ratio` threshold.
pub const fn max_imbalance_ratio(mut self, v: f64) -> Self {
self.max_imbalance_ratio = Some(v);
self
}
/// Override the monitor's `max_local_dsq_depth` threshold.
pub const fn max_local_dsq_depth(mut self, v: u32) -> Self {
self.max_local_dsq_depth = Some(v);
self
}
/// Override the monitor's `fail_on_stall` setting.
pub const fn fail_on_stall(mut self, v: bool) -> Self {
self.fail_on_stall = Some(v);
self
}
/// Override the monitor's `sustained_samples` setting.
pub const fn sustained_samples(mut self, v: usize) -> Self {
self.sustained_samples = Some(v);
self
}
/// Override the monitor's `max_fallback_rate` threshold.
pub const fn max_fallback_rate(mut self, v: f64) -> Self {
self.max_fallback_rate = Some(v);
self
}
/// Override the monitor's `max_keep_last_rate` threshold.
pub const fn max_keep_last_rate(mut self, v: f64) -> Self {
self.max_keep_last_rate = Some(v);
self
}
/// Set the minimum fraction of NUMA pages that must sit on the expected
/// nodes.
pub const fn min_page_locality(mut self, v: f64) -> Self {
self.min_page_locality = Some(v);
self
}
/// Set the limit on migrated NUMA pages divided by observed pages.
pub const fn max_cross_node_migration_ratio(mut self, v: f64) -> Self {
self.max_cross_node_migration_ratio = Some(v);
self
}
/// Set the per-worker limit on the fraction of pages outside the expected
/// NUMA nodes.
pub const fn max_slow_tier_ratio(mut self, v: f64) -> Self {
self.max_slow_tier_ratio = Some(v);
self
}
/// True when any worker-level option, i.e. any field that `worker_plan`
/// projects, is set. Monitor thresholds and the scx error matchers are not
/// considered.
pub const fn has_worker_checks(&self) -> bool {
self.not_starved.is_some()
|| self.isolation.is_some()
|| self.max_gap_ms.is_some()
|| self.max_spread_pct.is_some()
|| self.max_throughput_cv.is_some()
|| self.min_work_rate.is_some()
|| self.max_p99_wake_latency_ns.is_some()
|| self.max_wake_latency_cv.is_some()
|| self.min_iteration_rate.is_some()
|| self.max_migration_ratio.is_some()
|| self.min_page_locality.is_some()
|| self.max_cross_node_migration_ratio.is_some()
|| self.max_slow_tier_ratio.is_some()
}
/// Layer `other` on top of `self`, field by field.
///
/// Each `Some` in `other` wins; a `None` keeps `self`'s value.
/// `enforce_monitor_thresholds` is OR-ed, so once enabled by either side it
/// stays enabled. Written with explicit `match`es so the function stays
/// `const`.
pub const fn merge(&self, other: &Assert) -> Assert {
Assert {
not_starved: match other.not_starved {
Some(v) => Some(v),
None => self.not_starved,
},
isolation: match other.isolation {
Some(v) => Some(v),
None => self.isolation,
},
max_gap_ms: match other.max_gap_ms {
Some(v) => Some(v),
None => self.max_gap_ms,
},
max_spread_pct: match other.max_spread_pct {
Some(v) => Some(v),
None => self.max_spread_pct,
},
max_throughput_cv: match other.max_throughput_cv {
Some(v) => Some(v),
None => self.max_throughput_cv,
},
min_work_rate: match other.min_work_rate {
Some(v) => Some(v),
None => self.min_work_rate,
},
max_p99_wake_latency_ns: match other.max_p99_wake_latency_ns {
Some(v) => Some(v),
None => self.max_p99_wake_latency_ns,
},
max_wake_latency_cv: match other.max_wake_latency_cv {
Some(v) => Some(v),
None => self.max_wake_latency_cv,
},
min_iteration_rate: match other.min_iteration_rate {
Some(v) => Some(v),
None => self.min_iteration_rate,
},
max_migration_ratio: match other.max_migration_ratio {
Some(v) => Some(v),
None => self.max_migration_ratio,
},
max_imbalance_ratio: match other.max_imbalance_ratio {
Some(v) => Some(v),
None => self.max_imbalance_ratio,
},
max_local_dsq_depth: match other.max_local_dsq_depth {
Some(v) => Some(v),
None => self.max_local_dsq_depth,
},
fail_on_stall: match other.fail_on_stall {
Some(v) => Some(v),
None => self.fail_on_stall,
},
sustained_samples: match other.sustained_samples {
Some(v) => Some(v),
None => self.sustained_samples,
},
max_fallback_rate: match other.max_fallback_rate {
Some(v) => Some(v),
None => self.max_fallback_rate,
},
max_keep_last_rate: match other.max_keep_last_rate {
Some(v) => Some(v),
None => self.max_keep_last_rate,
},
enforce_monitor_thresholds: self.enforce_monitor_thresholds
|| other.enforce_monitor_thresholds,
min_page_locality: match other.min_page_locality {
Some(v) => Some(v),
None => self.min_page_locality,
},
max_cross_node_migration_ratio: match other.max_cross_node_migration_ratio {
Some(v) => Some(v),
None => self.max_cross_node_migration_ratio,
},
max_slow_tier_ratio: match other.max_slow_tier_ratio {
Some(v) => Some(v),
None => self.max_slow_tier_ratio,
},
expect_scx_bpf_error_contains: match other.expect_scx_bpf_error_contains {
Some(v) => Some(v),
None => self.expect_scx_bpf_error_contains,
},
expect_scx_bpf_error_matches: match other.expect_scx_bpf_error_matches {
Some(v) => Some(v),
None => self.expect_scx_bpf_error_matches,
},
}
}
/// Project the worker-level options into an `AssertPlan`. Unset boolean
/// checks become `false`; thresholds are copied as-is.
pub(crate) fn worker_plan(&self) -> AssertPlan {
AssertPlan {
not_starved: self.not_starved.unwrap_or(false),
isolation: self.isolation.unwrap_or(false),
max_gap_ms: self.max_gap_ms,
max_spread_pct: self.max_spread_pct,
max_throughput_cv: self.max_throughput_cv,
min_work_rate: self.min_work_rate,
max_p99_wake_latency_ns: self.max_p99_wake_latency_ns,
max_wake_latency_cv: self.max_wake_latency_cv,
min_iteration_rate: self.min_iteration_rate,
max_migration_ratio: self.max_migration_ratio,
min_page_locality: self.min_page_locality,
max_cross_node_migration_ratio: self.max_cross_node_migration_ratio,
max_slow_tier_ratio: self.max_slow_tier_ratio,
}
}
/// Evaluate the worker-level checks for one cgroup without NUMA node
/// information. The page-locality and slow-tier checks are skipped.
pub fn assert_cgroup(
&self,
reports: &[crate::workload::WorkerReport],
cpuset: Option<&BTreeSet<usize>>,
) -> AssertResult {
self.worker_plan().assert_cgroup(reports, cpuset, None)
}
/// Like `assert_cgroup`, but `numa_nodes` names the expected NUMA nodes,
/// which enables the page-locality and slow-tier checks.
pub fn assert_cgroup_with_numa(
&self,
reports: &[crate::workload::WorkerReport],
cpuset: Option<&BTreeSet<usize>>,
numa_nodes: Option<&BTreeSet<usize>>,
) -> AssertResult {
self.worker_plan()
.assert_cgroup(reports, cpuset, numa_nodes)
}
/// Check a pre-computed page locality against `self.min_page_locality`.
/// The result is a plain pass when that option is unset.
pub fn assert_page_locality(
&self,
observed: f64,
total_pages: u64,
local_pages: u64,
) -> AssertResult {
assert_page_locality(observed, self.min_page_locality, total_pages, local_pages)
}
/// Check migrated vs. observed NUMA pages against
/// `self.max_cross_node_migration_ratio`. The result is a plain pass when
/// that option is unset.
pub fn assert_cross_node_migration(
&self,
migrated_pages: u64,
total_pages: u64,
) -> AssertResult {
assert_cross_node_migration(
migrated_pages,
total_pages,
self.max_cross_node_migration_ratio,
)
}
/// True when any monitor threshold override is set.
/// `enforce_monitor_thresholds` on its own does not count.
pub(crate) fn has_monitor_thresholds(&self) -> bool {
self.max_imbalance_ratio.is_some()
|| self.max_local_dsq_depth.is_some()
|| self.fail_on_stall.is_some()
|| self.sustained_samples.is_some()
|| self.max_fallback_rate.is_some()
|| self.max_keep_last_rate.is_some()
}
/// Build the monitor's `MonitorThresholds`. Unset overrides are filled
/// from `MonitorThresholds::new()`, and `enforce` mirrors
/// `enforce_monitor_thresholds`.
pub(crate) fn monitor_thresholds(&self) -> crate::monitor::MonitorThresholds {
use crate::monitor::MonitorThresholds;
let d = MonitorThresholds::new();
MonitorThresholds {
max_imbalance_ratio: self.max_imbalance_ratio.unwrap_or(d.max_imbalance_ratio),
max_local_dsq_depth: self.max_local_dsq_depth.unwrap_or(d.max_local_dsq_depth),
fail_on_stall: self.fail_on_stall.unwrap_or(d.fail_on_stall),
sustained_samples: self.sustained_samples.unwrap_or(d.sustained_samples),
max_fallback_rate: self.max_fallback_rate.unwrap_or(d.max_fallback_rate),
max_keep_last_rate: self.max_keep_last_rate.unwrap_or(d.max_keep_last_rate),
enforce: self.enforce_monitor_thresholds,
}
}
/// Fill every unset monitor threshold from `MonitorThresholds::new()` and
/// turn enforcement on. Overrides that are already set are kept.
pub const fn with_monitor_defaults(mut self) -> Self {
use crate::monitor::MonitorThresholds;
let d = MonitorThresholds::new();
if self.max_imbalance_ratio.is_none() {
self.max_imbalance_ratio = Some(d.max_imbalance_ratio);
}
if self.max_local_dsq_depth.is_none() {
self.max_local_dsq_depth = Some(d.max_local_dsq_depth);
}
if self.fail_on_stall.is_none() {
self.fail_on_stall = Some(d.fail_on_stall);
}
if self.sustained_samples.is_none() {
self.sustained_samples = Some(d.sustained_samples);
}
if self.max_fallback_rate.is_none() {
self.max_fallback_rate = Some(d.max_fallback_rate);
}
if self.max_keep_last_rate.is_none() {
self.max_keep_last_rate = Some(d.max_keep_last_rate);
}
self.enforce_monitor_thresholds = true;
self
}
/// Require the captured scheduler log + sched_ext dump to contain `literal`
/// (evaluated by `evaluate_scx_bpf_error_match`).
///
/// # Panics
///
/// If `literal` is empty. In a const context this surfaces as a
/// compile-time error.
#[must_use = "builder methods consume self; bind the result"]
pub const fn expect_scx_bpf_error_contains(mut self, literal: &'static str) -> Self {
assert!(
!literal.is_empty(),
"Assert::expect_scx_bpf_error_contains: literal must be non-empty",
);
self.expect_scx_bpf_error_contains = Some(literal);
self
}
/// Require the captured scheduler log + sched_ext dump to match the regex
/// `pattern` (evaluated by `evaluate_scx_bpf_error_match`).
///
/// # Panics
///
/// If `pattern` is empty, is not a valid regex, or matches the empty
/// string, because such a pattern would match any corpus.
#[must_use = "builder methods consume self; bind the result"]
pub fn expect_scx_bpf_error_matches(mut self, pattern: &'static str) -> Self {
assert!(
!pattern.is_empty(),
"Assert::expect_scx_bpf_error_matches: pattern must be non-empty",
);
let compiled = regex::Regex::new(pattern).unwrap_or_else(|e| {
panic!(
"Assert::expect_scx_bpf_error_matches: pattern {pattern:?} is not valid regex: {e}",
)
});
assert!(
!compiled.is_match(""),
"Assert::expect_scx_bpf_error_matches: pattern {pattern:?} matches the empty \
string (e.g. `a?`, `.*`, `(?:)`, `^$`); such patterns vacuously match any \
corpus and turn the matcher into a no-op — use a meaningful pattern that \
requires at least one character",
);
self.expect_scx_bpf_error_matches = Some(pattern);
self
}
/// Check `captured_text` against the configured scx error matchers.
///
/// Returns one `AssertDetail` per problem. The vector is empty when no
/// matcher is configured or every configured matcher is satisfied. If a
/// matcher is configured but `expect_err` is false, a single
/// configuration-error detail is returned and nothing is evaluated.
/// Mismatch details include a prefix of the corpus of at most 400 bytes.
pub fn evaluate_scx_bpf_error_match(
&self,
captured_text: &str,
expect_err: bool,
) -> Vec<AssertDetail> {
let mut details = Vec::new();
if self.expect_scx_bpf_error_contains.is_none()
&& self.expect_scx_bpf_error_matches.is_none()
{
return details;
}
if !expect_err {
details.push(AssertDetail::new(
DetailKind::Other,
"expect_scx_bpf_error_contains or expect_scx_bpf_error_matches \
requires expect_err = true on the test entry — the matcher narrows \
which failure counts as the expected bug, and only applies to \
expected-error tests; set #[ktstr_test(expect_err = true, ...)] or \
drop the matcher",
));
return details;
}
// Prefix of at most 400 bytes. The end is walked back to a UTF-8 char
// boundary so the slice cannot panic on multi-byte text.
let excerpt = || -> &str {
let len = captured_text.len();
if len <= 400 {
captured_text
} else {
let mut end = 400;
while end > 0 && !captured_text.is_char_boundary(end) {
end -= 1;
}
&captured_text[..end]
}
};
if let Some(literal) = self.expect_scx_bpf_error_contains
&& !captured_text.contains(literal)
{
details.push(AssertDetail::new(
DetailKind::Other,
format!(
"expect_scx_bpf_error_contains({literal:?}): substring not found \
in the scheduler log + sched_ext dump corpus (the expected bug \
did not fire, or its message text changed). Captured corpus \
{} bytes; up to 400 bytes follow:\n{}",
captured_text.len(),
excerpt(),
),
));
}
// The pattern is compiled again here. The `Err` arm covers a pattern
// stored without the builder's validation, which is possible because the
// field is `pub`.
if let Some(pattern) = self.expect_scx_bpf_error_matches {
match regex::Regex::new(pattern) {
Ok(re) => {
if !re.is_match(captured_text) {
details.push(AssertDetail::new(
DetailKind::Other,
format!(
"expect_scx_bpf_error_matches({pattern:?}): regex did \
not match the scheduler log + sched_ext dump corpus \
(the expected bug did not fire, or its message text \
changed). Captured corpus {} bytes; up to 400 bytes \
follow:\n{}",
captured_text.len(),
excerpt(),
),
));
}
}
Err(e) => {
details.push(AssertDetail::new(
DetailKind::Other,
format!(
"expect_scx_bpf_error_matches({pattern:?}): regex \
compilation failed: {e}. Fix the pattern at the test \
declaration site — the matcher cannot evaluate against an \
invalid pattern",
),
));
}
}
}
details
}
}
pub mod claim;
pub mod temporal;
pub use claim::{ClaimBuilder, SeqClaim, SetClaim, Verdict};
pub use temporal::{EachClaim, FracPair, PhaseMapExt, SeriesField};
/// Fail once for every worker that ran on a CPU outside `expected`.
///
/// Each offending worker contributes one `DetailKind::Isolation` failure
/// listing its stray CPUs in ascending order. Workers are reported in input
/// order, and an empty `reports` slice passes.
pub fn assert_isolation(reports: &[WorkerReport], expected: &BTreeSet<usize>) -> AssertResult {
    let mut result = AssertResult::pass();
    let offenders = reports.iter().filter_map(|worker| {
        let stray: BTreeSet<usize> = worker.cpus_used.difference(expected).copied().collect();
        if stray.is_empty() {
            None
        } else {
            Some((worker, stray))
        }
    });
    for (worker, stray) in offenders {
        result.record_fail(AssertDetail::new(
            DetailKind::Isolation,
            format!("tid {} ran on unexpected CPUs {:?}", worker.tid, stray),
        ));
    }
    result
}
/// Nearest-rank percentile of an ascending-sorted slice.
///
/// Returns the `ceil(n * p)`-th smallest element (1-based), clamped into the
/// slice, so `p <= 0` yields the minimum and `p >= 1` the maximum. An empty
/// slice yields 0. Sortedness is only verified in debug builds.
fn percentile(sorted: &[u64], p: f64) -> u64 {
    let Some(last_idx) = sorted.len().checked_sub(1) else {
        return 0;
    };
    debug_assert!(
        sorted.windows(2).all(|pair| pair[0] <= pair[1]),
        "percentile() requires sorted input; got slice with out-of-order pair",
    );
    // `as usize` saturates: negative or NaN ranks become 0, huge ones usize::MAX.
    let rank = (sorted.len() as f64 * p).ceil() as usize;
    sorted[rank.saturating_sub(1).min(last_idx)]
}
/// Starvation, fairness, and stuck-worker check over one group of workers.
/// It also fills in the `ScenarioStats` summary (with a single
/// `CgroupStats` entry) stored on the returned result.
///
/// Records a failure when:
/// * a worker completed 0 work units (`DetailKind::Starved`, one per worker);
/// * the spread between the highest and lowest per-worker off-CPU percentage
///   exceeds `spread_threshold_pct()` and at least two workers had non-zero
///   wall time (`DetailKind::Unfair`);
/// * a worker's `max_gap_ms` exceeds `gap_threshold_ms()`
///   (`DetailKind::Stuck`, one per worker).
///
/// An empty `reports` slice returns a plain pass with default stats.
pub fn assert_not_starved(reports: &[WorkerReport]) -> AssertResult {
let mut r = AssertResult::pass();
if reports.is_empty() {
return r;
}
// Distinct CPUs touched by any worker.
let cpus: BTreeSet<usize> = reports
.iter()
.flat_map(|w| w.cpus_used.iter().copied())
.collect();
// Off-CPU time as a percentage of wall time, only for workers that
// recorded non-zero wall time.
let mut pcts: Vec<f64> = Vec::new();
for w in reports {
if w.work_units == 0 {
r.record_fail(AssertDetail::new(
DetailKind::Starved,
format!("tid {} starved (0 work units)", w.tid),
));
}
if w.wall_time_ns > 0 {
pcts.push(w.off_cpu_ns as f64 / w.wall_time_ns as f64 * 100.0);
}
}
let min = pcts.iter().cloned().reduce(f64::min).unwrap_or(0.0);
let max = pcts.iter().cloned().reduce(f64::max).unwrap_or(0.0);
let avg = if pcts.is_empty() {
0.0
} else {
pcts.iter().sum::<f64>() / pcts.len() as f64
};
let spread = max - min;
let worst_gap = reports.iter().max_by_key(|w| w.max_gap_ms);
let (gap_ms, gap_cpu) = worst_gap
.map(|w| (w.max_gap_ms, w.max_gap_cpu))
.unwrap_or((0, 0));
// Wake latencies pooled over all workers: nearest-rank p99 and median (in
// microseconds) plus the population coefficient of variation.
let all_latencies: Vec<u64> = reports
.iter()
.flat_map(|w| w.wake_latencies_ns.iter().copied())
.collect();
let (p99_us, median_us, lat_cv) = if all_latencies.is_empty() {
(0.0, 0.0, 0.0)
} else {
let mut sorted = all_latencies.clone();
sorted.sort_unstable();
let p99 = percentile(&sorted, 0.99) as f64 / 1000.0;
let median = percentile(&sorted, 0.5) as f64 / 1000.0;
let n = all_latencies.len() as f64;
let mean_ns = all_latencies.iter().sum::<u64>() as f64 / n;
let cv = if mean_ns > 0.0 {
let variance = all_latencies
.iter()
.map(|&v| (v as f64 - mean_ns).powi(2))
.sum::<f64>()
/ n;
variance.sqrt() / mean_ns
} else {
0.0
};
(p99, median, cv)
};
let total_iters: u64 = reports.iter().map(|w| w.iterations).sum();
// Scheduler run delay per worker, converted from ns to microseconds.
let run_delays: Vec<f64> = reports
.iter()
.map(|w| w.schedstat_run_delay_ns as f64 / 1000.0)
.collect();
let mean_run_delay = if run_delays.is_empty() {
0.0
} else {
run_delays.iter().sum::<f64>() / run_delays.len() as f64
};
let worst_run_delay = run_delays.iter().cloned().reduce(f64::max).unwrap_or(0.0);
let total_mig: u64 = reports.iter().map(|w| w.migration_count).sum();
let mig_ratio = if total_iters > 0 {
total_mig as f64 / total_iters as f64
} else {
0.0
};
// NUMA metrics are not measured here and are left at 0.0.
let cg = CgroupStats {
num_workers: reports.len(),
num_cpus: cpus.len(),
avg_off_cpu_pct: avg,
min_off_cpu_pct: min,
max_off_cpu_pct: max,
spread,
max_gap_ms: gap_ms,
max_gap_cpu: gap_cpu,
total_migrations: total_mig,
migration_ratio: mig_ratio,
p99_wake_latency_us: p99_us,
median_wake_latency_us: median_us,
wake_latency_cv: lat_cv,
total_iterations: total_iters,
mean_run_delay_us: mean_run_delay,
worst_run_delay_us: worst_run_delay,
page_locality: 0.0,
cross_node_migration_ratio: 0.0,
ext_metrics: BTreeMap::new(),
};
// Default fairness threshold. With fewer than two wall-time samples the
// spread is trivially 0, so the check needs at least two.
let spread_limit = spread_threshold_pct();
if spread > spread_limit && pcts.len() >= 2 {
r.record_fail(AssertDetail::new(
DetailKind::Unfair,
format!(
"unfair cgroup: spread={:.0}% ({:.0}-{:.0}%) {} workers on {} cpus (threshold {:.0}%)",
spread,
min,
max,
reports.len(),
cpus.len(),
spread_limit,
),
));
}
// Default stuck threshold, applied to every worker.
let gap_limit = gap_threshold_ms();
for w in reports {
if w.max_gap_ms > gap_limit {
r.record_fail(AssertDetail::new(
DetailKind::Stuck,
format!(
"tid {} stuck {}ms on cpu{} at +{}ms (threshold {}ms)",
w.tid, w.max_gap_ms, w.max_gap_cpu, w.max_gap_at_ms, gap_limit,
),
));
}
}
// Scenario-level summary. `cgroups` holds exactly this one entry, which
// `AssertPlan::assert_cgroup` reads back via `stats.cgroups.first()`.
r.stats = ScenarioStats {
total_workers: reports.len(),
total_cpus: cpus.len(),
total_migrations: reports.iter().map(|w| w.migration_count).sum(),
worst_spread: spread,
worst_gap_ms: gap_ms,
worst_gap_cpu: gap_cpu,
worst_migration_ratio: cg.migration_ratio,
worst_p99_wake_latency_us: cg.p99_wake_latency_us,
worst_median_wake_latency_us: cg.median_wake_latency_us,
worst_wake_latency_cv: cg.wake_latency_cv,
total_iterations: cg.total_iterations,
worst_mean_run_delay_us: cg.mean_run_delay_us,
worst_run_delay_us: cg.worst_run_delay_us,
worst_page_locality: 0.0,
worst_cross_node_migration_ratio: 0.0,
worst_wake_latency_tail_ratio: cg.wake_latency_tail_ratio(),
worst_iterations_per_worker: cg.iterations_per_worker(),
ext_metrics: cg.ext_metrics.clone(),
cgroups: vec![cg],
phases: Vec::new(),
};
r
}
/// Throughput fairness across workers, measured as work units per second of
/// CPU time.
///
/// * `max_cv`: the population coefficient of variation of per-worker rates
///   must not exceed this limit. It needs at least two workers and a positive
///   mean. Workers with zero CPU time enter the computation with rate 0.0.
/// * `min_rate`: every worker with non-zero CPU time must reach this floor.
///
/// If every worker recorded zero CPU time and any limit is set, the result is
/// inconclusive. An empty `reports` slice passes.
pub fn assert_throughput_parity(
    reports: &[WorkerReport],
    max_cv: Option<f64>,
    min_rate: Option<f64>,
) -> AssertResult {
    let mut result = AssertResult::pass();
    if reports.is_empty() {
        return result;
    }
    let rates: Vec<f64> = reports
        .iter()
        .map(|w| match w.cpu_time_ns {
            0 => 0.0,
            cpu_ns => w.work_units as f64 / (cpu_ns as f64 / 1e9),
        })
        .collect();
    let n = rates.len() as f64;
    let mean = rates.iter().sum::<f64>() / n;

    let any_cpu_time = reports.iter().any(|w| w.cpu_time_ns != 0);
    if !any_cpu_time && (max_cv.is_some() || min_rate.is_some()) {
        // Name whichever limits were requested, e.g. "max_cv 0.250 + min_rate 10".
        let limits: Vec<String> = [
            max_cv.map(|cv_limit| format!("max_cv {cv_limit:.3}")),
            min_rate.map(|floor| format!("min_rate {floor:.0}")),
        ]
        .into_iter()
        .flatten()
        .collect();
        result.record_inconclusive(AssertDetail::new(
            DetailKind::Benchmark,
            format!(
                "throughput parity inconclusive: all {} workers recorded zero cpu_time_ns — \
                 denominator is zero, rates cannot be computed; {} neither pass nor fail \
                 (was the workload able to run?)",
                reports.len(),
                limits.join(" + "),
            ),
        ));
        return result;
    }

    if let Some(cv_limit) = max_cv
        && mean > 0.0
        && rates.len() >= 2
    {
        let variance = rates.iter().map(|rate| (rate - mean).powi(2)).sum::<f64>() / n;
        let cv = variance.sqrt() / mean;
        if cv > cv_limit {
            result.record_fail(AssertDetail::new(
                DetailKind::Benchmark,
                format!(
                    "throughput CV {cv:.3} exceeds limit {cv_limit:.3} (mean={mean:.0} work/cpu_s)"
                ),
            ));
        }
    }

    if let Some(floor) = min_rate {
        // Rates of workers without CPU time are undefined, so they are exempt.
        for (w, &rate) in reports.iter().zip(&rates) {
            if w.cpu_time_ns != 0 && rate < floor {
                result.record_fail(AssertDetail::new(
                    DetailKind::Benchmark,
                    format!(
                        "worker {} throughput {rate:.0} work/cpu_s below floor {floor:.0}",
                        w.tid
                    ),
                ));
            }
        }
    }
    result
}
/// Latency and iteration-rate benchmark checks over a group of workers.
///
/// Each limit is optional and evaluated independently:
/// * `max_p99_ns`: the nearest-rank p99 of all wake latencies, pooled across
///   workers, must not exceed the limit. With no samples the check is skipped.
/// * `max_cv`: the population coefficient of variation of the pooled wake
///   latencies must not exceed the limit. It needs at least two samples, and
///   a zero mean is reported as inconclusive.
/// * `min_iter_rate`: every worker with non-zero wall time must reach the
///   floor in iterations per wall-clock second. If all workers have zero wall
///   time the check is inconclusive.
///
/// An empty `reports` slice yields a skip rather than a pass.
pub fn assert_benchmarks(
    reports: &[WorkerReport],
    max_p99_ns: Option<u64>,
    max_cv: Option<f64>,
    min_iter_rate: Option<f64>,
) -> AssertResult {
    if reports.is_empty() {
        return AssertResult::skip("no worker reports — benchmark skipped");
    }
    let mut result = AssertResult::pass();

    // Pooled samples in report order. They are kept unsorted so the
    // floating-point variance sum below always runs in the same order.
    let samples: Vec<u64> = reports
        .iter()
        .flat_map(|w| w.wake_latencies_ns.iter().copied())
        .collect();

    if let Some(p99_limit) = max_p99_ns
        && !samples.is_empty()
    {
        let mut ordered = samples.clone();
        ordered.sort_unstable();
        let p99 = percentile(&ordered, 0.99);
        if p99 > p99_limit {
            result.record_fail(AssertDetail::new(
                DetailKind::Benchmark,
                format!(
                    "p99 wake latency {p99}ns exceeds limit {p99_limit}ns ({} samples)",
                    ordered.len()
                ),
            ));
        }
    }

    if let Some(cv_limit) = max_cv
        && samples.len() >= 2
    {
        let n = samples.len() as f64;
        let mean = samples.iter().sum::<u64>() as f64 / n;
        if mean > 0.0 {
            let variance = samples
                .iter()
                .map(|&sample| (sample as f64 - mean).powi(2))
                .sum::<f64>()
                / n;
            let cv = variance.sqrt() / mean;
            if cv > cv_limit {
                result.record_fail(AssertDetail::new(
                    DetailKind::Benchmark,
                    format!(
                        "wake latency CV {cv:.3} exceeds limit {cv_limit:.3} (mean={mean:.0}ns)"
                    ),
                ));
            }
        } else {
            result.record_inconclusive(AssertDetail::new(
                DetailKind::Benchmark,
                format!(
                    "wake latency CV inconclusive: all {} sample(s) had zero mean wake \
                     latency — denominator is zero, CV cannot be computed; limit \
                     {cv_limit:.3} neither pass nor fail (did any wake event capture a \
                     non-zero latency?)",
                    samples.len(),
                ),
            ));
        }
    }

    if let Some(rate_floor) = min_iter_rate {
        let mut zero_wall_count = 0usize;
        for w in reports {
            match w.wall_time_ns {
                0 => zero_wall_count += 1,
                wall_ns => {
                    let rate = w.iterations as f64 / (wall_ns as f64 / 1e9);
                    if rate < rate_floor {
                        result.record_fail(AssertDetail::new(
                            DetailKind::Benchmark,
                            format!(
                                "worker {} iteration rate {rate:.1}/s below floor {rate_floor:.1}/s",
                                w.tid
                            ),
                        ));
                    }
                }
            }
        }
        if zero_wall_count == reports.len() {
            result.record_inconclusive(AssertDetail::new(
                DetailKind::Benchmark,
                format!(
                    "min iteration rate inconclusive: all {} workers recorded zero wall_time_ns — \
                     denominator is zero, rate cannot be computed; floor {rate_floor:.1}/s \
                     neither pass nor fail (was the workload able to run?)",
                    reports.len()
                ),
            ));
        }
    }
    result
}
/// Check scheduler event counters against a bound.
///
/// With `max_count == None` every event count must be exactly zero. With
/// `Some(bound)` each count must lie in `0..=bound`, so negative counts
/// always fail. One `DetailKind::SchedulerEvent` failure is recorded per
/// offending event, in input order.
pub fn assert_scx_events_clean(events: &[(&str, i64)], max_count: Option<i64>) -> AssertResult {
    let mut result = AssertResult::pass();
    let within_bound = |count: i64| match max_count {
        Some(bound) => (0..=bound).contains(&count),
        None => count == 0,
    };
    for &(name, count) in events {
        if within_bound(count) {
            continue;
        }
        let bound_desc = max_count.map_or_else(|| "0".to_string(), |b| b.to_string());
        result.record_fail(AssertDetail::new(
            DetailKind::SchedulerEvent,
            format!("scx event `{name}` count {count} exceeds bound {bound_desc}",),
        ));
    }
    result
}
/// Absolute per-run limits checked by `assert_baseline`.
///
/// Every field is optional and `None` disables that limit. `Default`
/// disables all of them, while `SchedulerBaseline::strict` enables all four.
#[must_use = "SchedulerBaseline only takes effect when passed to assert_baseline"]
#[derive(Clone, Copy, Debug, Default)]
pub struct SchedulerBaseline {
    /// Upper bound on the pooled p99 wake latency, in ns.
    pub max_p99_wake_latency_ns: Option<u64>,
    /// Upper bound on the pooled p99 per-iteration cost, in ns.
    pub max_iteration_cost_p99_ns: Option<u64>,
    /// Upper bound on migrations summed over all workers.
    pub max_migrations: Option<u64>,
    /// Lower bound on each individual worker's work units.
    pub min_work_units: Option<u64>,
}

impl SchedulerBaseline {
    /// Preset with every limit enabled: p99 wake latency 10 ms, p99 iteration
    /// cost 1 ms, at most 1000 migrations, and at least one work unit per
    /// worker.
    pub const fn strict() -> Self {
        Self {
            max_p99_wake_latency_ns: Some(10_000_000),
            max_iteration_cost_p99_ns: Some(1_000_000),
            max_migrations: Some(1000),
            min_work_units: Some(1),
        }
    }

    /// Replace the p99 wake-latency limit (ns).
    pub const fn max_p99_wake_latency_ns(self, v: u64) -> Self {
        Self {
            max_p99_wake_latency_ns: Some(v),
            ..self
        }
    }

    /// Replace the p99 iteration-cost limit (ns).
    pub const fn max_iteration_cost_p99_ns(self, v: u64) -> Self {
        Self {
            max_iteration_cost_p99_ns: Some(v),
            ..self
        }
    }

    /// Replace the total-migration limit.
    pub const fn max_migrations(self, v: u64) -> Self {
        Self {
            max_migrations: Some(v),
            ..self
        }
    }

    /// Replace the per-worker work-unit floor.
    pub const fn min_work_units(self, v: u64) -> Self {
        Self {
            min_work_units: Some(v),
            ..self
        }
    }
}
/// Check a run against the absolute limits in `baseline`.
///
/// * p99 wake latency is delegated to `assert_benchmarks` with only the p99
///   limit set.
/// * The p99 of all per-iteration costs, pooled across workers, must not
///   exceed its limit. With no samples the check is skipped.
/// * Migrations summed over all workers must not exceed their limit.
/// * Every worker must reach the work-unit floor (one failure per worker).
///
/// An empty `reports` slice yields a skip.
pub fn assert_baseline(reports: &[WorkerReport], baseline: &SchedulerBaseline) -> AssertResult {
    if reports.is_empty() {
        return AssertResult::skip("no worker reports to evaluate");
    }
    let mut result = AssertResult::pass();

    if let Some(p99_limit) = baseline.max_p99_wake_latency_ns {
        result.merge(assert_benchmarks(reports, Some(p99_limit), None, None));
    }

    if let Some(cost_limit) = baseline.max_iteration_cost_p99_ns {
        let mut costs: Vec<u64> = reports
            .iter()
            .flat_map(|w| w.iteration_costs_ns.iter().copied())
            .collect();
        if !costs.is_empty() {
            costs.sort_unstable();
            let p99 = percentile(&costs, 0.99);
            if p99 > cost_limit {
                result.record_fail(AssertDetail::new(
                    DetailKind::Benchmark,
                    format!(
                        "p99 iteration cost {p99}ns exceeds limit {cost_limit}ns ({} samples)",
                        costs.len(),
                    ),
                ));
            }
        }
    }

    if let Some(max_mig) = baseline.max_migrations {
        let total_mig: u64 = reports.iter().map(|w| w.migration_count).sum();
        if total_mig > max_mig {
            result.record_fail(AssertDetail::new(
                DetailKind::Migration,
                format!(
                    "total migrations {total_mig} exceeds limit {max_mig} ({} workers)",
                    reports.len(),
                ),
            ));
        }
    }

    if let Some(min_units) = baseline.min_work_units {
        for w in reports.iter().filter(|w| w.work_units < min_units) {
            result.record_fail(AssertDetail::new(
                DetailKind::Starved,
                format!(
                    "tid {} work_units {} below floor {min_units}",
                    w.tid, w.work_units,
                ),
            ));
        }
    }
    result
}
#[cfg(test)]
mod tests_assert;
#[cfg(test)]
mod tests_benchmarks;
#[cfg(test)]
mod tests_common;
#[cfg(test)]
mod tests_merge;
#[cfg(test)]
mod tests_note;
#[cfg(test)]
mod tests_numa;
#[cfg(test)]
mod tests_percentile;
#[cfg(test)]
mod tests_phase_bucket;
#[cfg(test)]
mod tests_plan;
#[cfg(test)]
mod tests_sched_died;
#[cfg(test)]
mod tests_serde;
#[cfg(test)]
mod tests_stats;
#[cfg(test)]
mod tests_verdict;
#[cfg(test)]
mod tests_worker;