use super::*;
/// Result of checking a scenario: the individual verdicts plus the data they
/// were judged against.
///
/// Marked `#[must_use]` so that computing a result and then dropping it
/// without inspection produces a compiler warning.
#[must_use = "test verdict is lost if not checked"]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AssertResult {
    /// Per-assertion outcomes (`Outcome` is defined outside this chunk).
    pub outcomes: Vec<Outcome>,
    /// Presumably details of the checks that passed — confirm against the
    /// producer.
    pub passes: Vec<PassDetail>,
    /// Aggregated scenario statistics (`ScenarioStats`, defined elsewhere).
    pub stats: ScenarioStats,
    /// Named measurements; a `BTreeMap`, so iteration/serialization order is
    /// sorted by name.
    pub measurements: std::collections::BTreeMap<String, NoteValue>,
    /// Informational notes attached to the result.
    pub info_notes: Vec<InfoNote>,
}
/// Summary statistics for a single cgroup.
///
/// NOTE(review): the code that fills this struct in is not part of this
/// chunk; the per-field notes below come from field names and from the
/// helper methods on this type — confirm against the producer.
#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize, crate::Claim)]
pub struct CgroupStats {
    pub cgroup_name: String,
    /// Worker count. `iterations_per_worker` / `iterations_per_cpu_sec`
    /// return `None` when this is 0.
    pub num_workers: usize,
    /// Presumably the set of CPU ids the workers ran on.
    pub cpus_used: BTreeSet<usize>,
    pub num_cpus: usize,
    /// Off-CPU percentage mean / min / max and the `max - min` spread; `None`
    /// when no samples exist. Presumably taken from
    /// `PhaseCgroupStats::off_cpu_summary`, which returns exactly
    /// `(avg, min, max, max - min)` — confirm.
    pub avg_off_cpu_pct: Option<f64>,
    pub min_off_cpu_pct: Option<f64>,
    pub max_off_cpu_pct: Option<f64>,
    pub spread: Option<f64>,
    /// Largest gap observed, in milliseconds, and the CPU it was seen on.
    pub max_gap_ms: u64,
    pub max_gap_cpu: usize,
    pub total_migrations: u64,
    pub migration_ratio: f64,
    /// Wake-latency p99 and median in microseconds; their quotient is
    /// `wake_latency_tail_ratio`.
    pub p99_wake_latency_us: f64,
    pub median_wake_latency_us: f64,
    pub wake_latency_cv: f64,
    /// Work done and CPU time consumed, used by `iterations_per_worker` and
    /// `iterations_per_cpu_sec`.
    pub total_iterations: u64,
    pub total_cpu_time_ns: u64,
    /// Run-delay mean and worst case in microseconds.
    pub mean_run_delay_us: f64,
    pub worst_run_delay_us: f64,
    pub page_locality: f64,
    pub cross_node_migration_ratio: f64,
    /// Additional named metrics; semantics are defined by whoever inserts
    /// them — TODO confirm.
    pub ext_metrics: BTreeMap<String, f64>,
}
/// Raw per-cgroup data accumulated for one phase, before summarization.
///
/// Instances from different sources are combined with
/// `PhaseCgroupStats::merge`: counters are summed, sample vectors are
/// concatenated, and wake-latency samples are capped at
/// `crate::workload::MAX_WAKE_SAMPLES`.
#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize, crate::Claim)]
pub struct PhaseCgroupStats {
    pub num_workers: usize,
    /// Union of CPU ids across merged sources.
    pub cpus_used: std::collections::BTreeSet<usize>,
    /// Wake-latency samples in nanoseconds. This may be a subsample: when a
    /// merge would exceed the cap, it is resampled with
    /// `weighted_merge_reservoirs`.
    pub wake_latencies_ns: Vec<u64>,
    /// Number of wake samples the reservoir stands for. This is the weight
    /// used when resampling and may exceed `wake_latencies_ns.len()`.
    pub wake_sample_total: u64,
    /// Run-delay samples in nanoseconds.
    pub run_delays_ns: Vec<u64>,
    /// Off-CPU percentages, one entry per contributing sample.
    pub off_cpu_pcts: Vec<f64>,
    pub total_migrations: u64,
    pub total_iterations: u64,
    pub total_cpu_time_ns: u64,
    pub numa_pages_local: u64,
    pub numa_pages_total: u64,
    /// Merged with `max`, not a sum. NOTE(review): presumably because the
    /// sources read a shared counter — confirm.
    pub cross_node_migrated: u64,
    /// Largest gap (ms) and the CPU it was observed on.
    pub max_gap_ms: u64,
    pub max_gap_cpu: usize,
    /// OR-ed across merges.
    pub stripped: bool,
}
impl PhaseCgroupStats {
    /// Combine two sources' statistics for the same cgroup into one.
    ///
    /// - Summed: `num_workers`, `wake_sample_total`, migrations, iterations,
    ///   CPU time, and both NUMA page counts.
    /// - Concatenated, `a` first: `run_delays_ns` and `off_cpu_pcts`.
    /// - Set union: `cpus_used`.
    /// - OR-ed: `stripped`.
    /// - Larger of the two: `cross_node_migrated`. NOTE(review): presumably
    ///   because both sides observe a shared counter; confirm before changing.
    /// - `max_gap_ms` / `max_gap_cpu`: the pair comes from the side with the
    ///   larger gap, and `b` wins a tie.
    ///
    /// Wake latencies are concatenated while the combined count stays within
    /// `crate::workload::MAX_WAKE_SAMPLES`. Otherwise they are resampled to
    /// exactly that many values, weighted by each side's `wake_sample_total`.
    pub(crate) fn merge(a: PhaseCgroupStats, b: PhaseCgroupStats) -> PhaseCgroupStats {
        let cap = crate::workload::MAX_WAKE_SAMPLES;
        let combined_wake = a.wake_latencies_ns.len() + b.wake_latencies_ns.len();
        let wake_latencies_ns: Vec<u64> = if combined_wake > cap {
            Self::weighted_merge_reservoirs(
                &a.wake_latencies_ns,
                a.wake_sample_total,
                &b.wake_latencies_ns,
                b.wake_sample_total,
                cap,
            )
        } else {
            a.wake_latencies_ns
                .into_iter()
                .chain(b.wake_latencies_ns)
                .collect()
        };
        let run_delays_ns: Vec<u64> = a.run_delays_ns.into_iter().chain(b.run_delays_ns).collect();
        let off_cpu_pcts: Vec<f64> = a.off_cpu_pcts.into_iter().chain(b.off_cpu_pcts).collect();
        let cpus_used: std::collections::BTreeSet<usize> =
            a.cpus_used.into_iter().chain(b.cpus_used).collect();
        // Strictly-greater test on `a`, so a tie falls through to `b`.
        let (max_gap_ms, max_gap_cpu) = if a.max_gap_ms > b.max_gap_ms {
            (a.max_gap_ms, a.max_gap_cpu)
        } else {
            (b.max_gap_ms, b.max_gap_cpu)
        };
        PhaseCgroupStats {
            num_workers: a.num_workers + b.num_workers,
            cpus_used,
            wake_latencies_ns,
            wake_sample_total: a.wake_sample_total + b.wake_sample_total,
            run_delays_ns,
            off_cpu_pcts,
            total_migrations: a.total_migrations + b.total_migrations,
            total_iterations: a.total_iterations + b.total_iterations,
            total_cpu_time_ns: a.total_cpu_time_ns + b.total_cpu_time_ns,
            numa_pages_local: a.numa_pages_local + b.numa_pages_local,
            numa_pages_total: a.numa_pages_total + b.numa_pages_total,
            cross_node_migrated: a.cross_node_migrated.max(b.cross_node_migrated),
            max_gap_ms,
            max_gap_cpu,
            stripped: a.stripped || b.stripped,
        }
    }

    /// Draw `cap` values, with replacement, from two sample reservoirs.
    ///
    /// For each draw, source `a` is chosen with probability `w_a / (w_a + w_b)`.
    /// Then an element of the chosen source is picked by `rng % len`.
    /// - When both weights are 0, the reservoir lengths act as weights.
    /// - When one reservoir is empty, every draw comes from the other one.
    /// - When both are empty, the result is empty.
    ///
    /// The result is deterministic. A xorshift64 generator with shifts
    /// (13, 7, 17) is seeded from the weights and lengths. It advances once
    /// before the source decision and once before the index pick, on every
    /// draw.
    pub(crate) fn weighted_merge_reservoirs(
        a: &[u64],
        w_a: u64,
        b: &[u64],
        w_b: u64,
        cap: usize,
    ) -> Vec<u64> {
        if a.is_empty() && b.is_empty() {
            return Vec::new();
        }
        let use_lengths = w_a == 0 && w_b == 0;
        let weight_a: u128 = if use_lengths { a.len() as u128 } else { w_a as u128 };
        let weight_b: u128 = if use_lengths { b.len() as u128 } else { w_b as u128 };
        let total = weight_a + weight_b;
        debug_assert!(
            total <= u64::MAX as u128,
            "weighted_merge_reservoirs: w_a + w_b overflows u64 ({total}); source draw would bias",
        );

        fn xorshift64(mut x: u64) -> u64 {
            x ^= x << 13;
            x ^= x >> 7;
            x ^= x << 17;
            x
        }

        const GOLDEN: u64 = 0x9E37_79B9_7F4A_7C15;
        let seed_mix =
            w_a ^ w_b.rotate_left(32) ^ (a.len() as u64).rotate_left(16) ^ (b.len() as u64);
        // xorshift has the all-zero state as a fixed point, so never start there.
        let mut state = match seed_mix.wrapping_mul(GOLDEN) {
            0 => GOLDEN,
            seeded => seeded,
        };

        (0..cap)
            .map(|_| {
                state = xorshift64(state);
                let take_a = match (a.is_empty(), b.is_empty()) {
                    (true, _) => false,
                    (false, true) => true,
                    (false, false) => (state as u128 % total) < weight_a,
                };
                state = xorshift64(state);
                let pool = if take_a { a } else { b };
                pool[(state % pool.len() as u64) as usize]
            })
            .collect()
    }

    /// Return `(mean, min, max, max - min)` of the off-CPU percentages.
    ///
    /// Returns `None` if there are no samples.
    pub fn off_cpu_summary(&self) -> Option<(f64, f64, f64, f64)> {
        let pcts = self.off_cpu_pcts.as_slice();
        let (&first, rest) = pcts.split_first()?;
        let (lowest, highest) = rest
            .iter()
            .fold((first, first), |(lo, hi), &p| (lo.min(p), hi.max(p)));
        let mean = pcts.iter().sum::<f64>() / pcts.len() as f64;
        Some((mean, lowest, highest, highest - lowest))
    }

    /// Return `(p99, median)` wake latency in microseconds.
    ///
    /// The values are computed with `percentile` over a sorted copy of the
    /// nanosecond samples. Returns `None` if there are no samples.
    pub fn wake_summary(&self) -> Option<(f64, f64)> {
        if self.wake_latencies_ns.is_empty() {
            return None;
        }
        let mut ordered = self.wake_latencies_ns.to_vec();
        ordered.sort_unstable();
        let p99_us = percentile(&ordered, 0.99) as f64 / 1000.0;
        let median_us = percentile(&ordered, 0.5) as f64 / 1000.0;
        Some((p99_us, median_us))
    }

    /// Return `(mean, worst)` run delay in microseconds.
    ///
    /// Returns `None` if there are no samples.
    pub fn run_delay_summary(&self) -> Option<(f64, f64)> {
        let delays = &self.run_delays_ns;
        let worst_ns = *delays.iter().max()?;
        let sum_ns: f64 = delays.iter().map(|&d| d as f64).sum();
        let mean_us = sum_ns / delays.len() as f64 / 1000.0;
        Some((mean_us, worst_ns as f64 / 1000.0))
    }
}
impl CgroupStats {
    /// Return `p99 / median` wake latency.
    ///
    /// Returns `0.0` whenever the median is not strictly positive, which
    /// includes 0 and NaN.
    pub fn wake_latency_tail_ratio(&self) -> f64 {
        match self.median_wake_latency_us {
            median if median > 0.0 => self.p99_wake_latency_us / median,
            _ => 0.0,
        }
    }

    /// Return the average number of iterations per worker.
    ///
    /// Returns `None` when there are no workers.
    pub fn iterations_per_worker(&self) -> Option<f64> {
        (self.num_workers > 0).then(|| self.total_iterations as f64 / self.num_workers as f64)
    }

    /// Return the number of iterations per second of consumed CPU time.
    ///
    /// Returns `None` when there are no workers or no CPU time was recorded.
    pub fn iterations_per_cpu_sec(&self) -> Option<f64> {
        let measurable = self.num_workers != 0 && self.total_cpu_time_ns != 0;
        measurable.then(|| {
            let cpu_secs = self.total_cpu_time_ns as f64 / 1e9;
            self.total_iterations as f64 / cpu_secs
        })
    }
}
/// Identifier of a scenario phase.
///
/// - Raw value 0 is the baseline (`Phase::BASELINE`).
/// - Raw value `n > 0` denotes the zero-indexed step `n - 1`. This is built
///   by `Phase::step` and shown as `Step[n - 1]` by `Display`.
///
/// The derived `Default` is therefore the baseline. The derived `Ord` sorts
/// by raw value, so the baseline comes first. `#[serde(transparent)]` makes
/// the type serialize as the bare `u16`.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Hash,
    PartialOrd,
    Ord,
    Default,
    serde::Serialize,
    serde::Deserialize,
)]
#[serde(transparent)]
pub struct Phase(u16);
impl Phase {
pub const BASELINE: Self = Self(0);
pub const fn step(zero_indexed: u16) -> Self {
Self(zero_indexed.saturating_add(1))
}
pub const fn is_baseline(&self) -> bool {
self.0 == 0
}
pub const fn as_u16(self) -> u16 {
self.0
}
}
impl std::fmt::Display for Phase {
    /// Render as `BASELINE`, or as `Step[i]` with the zero-indexed step `i`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Raw 0 has no predecessor step, so checked_sub separates the baseline.
        match self.0.checked_sub(1) {
            None => write!(f, "BASELINE"),
            Some(step) => write!(f, "Step[{}]", step),
        }
    }
}
impl From<u16> for Phase {
fn from(value: u16) -> Self {
Self(value)
}
}
impl From<Phase> for u16 {
fn from(value: Phase) -> Self {
value.0
}
}
/// Metrics and per-cgroup data aggregated over one phase of a scenario.
///
/// Buckets that share a `step_index` are combined by
/// `merge_matched_phase_buckets`.
#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize, crate::Claim)]
pub struct PhaseBucket {
    /// Key used to pair buckets when merging.
    pub step_index: u16,
    /// Human-readable phase label; the merge keeps the first bucket's label.
    pub label: String,
    /// Phase window in milliseconds. Merging takes min(start) / max(end).
    /// Guest carriers use `(u64::MAX, 0)` so that they do not widen a host
    /// window.
    pub start_ms: u64,
    pub end_ms: u64,
    /// Number of captures that landed in this phase (0 means none did).
    pub sample_count: usize,
    /// Metric values keyed by metric name.
    pub metrics: std::collections::BTreeMap<String, f64>,
    /// Raw per-cgroup stats keyed by cgroup name.
    pub per_cgroup: std::collections::BTreeMap<String, PhaseCgroupStats>,
}
impl PhaseBucket {
    /// Look up `metric_name` in this bucket.
    ///
    /// Returns `None` if the metric is absent.
    pub fn get(&self, metric_name: &str) -> Option<f64> {
        let value = self.metrics.get(metric_name)?;
        Some(*value)
    }

    /// Like [`PhaseBucket::get`], but panics when the metric is missing.
    ///
    /// # Panics
    ///
    /// Panics if `metric_name` is absent. The message names the phase, its
    /// sample count and the metric keys that are present, to help tell an
    /// empty phase apart from a misspelled metric name.
    pub fn expect_metric(&self, metric_name: &str) -> f64 {
        if let Some(value) = self.get(metric_name) {
            return value;
        }
        let present_keys: Vec<&String> = self.metrics.keys().collect();
        panic!(
            "PhaseBucket::expect_metric: metric '{}' absent from phase \
             step_index={} ('{}') with sample_count={}. \
             metric keys present in this bucket: {:?}. \
             Possible causes: (a) phase carried 0 samples for this \
             metric (sample_count==0 means no captures landed in the \
             phase at all; sample_count>0 means captures landed but \
             the metric extracted no finite values from them); \
             (b) metric name typo (verify against \
             ScenarioStats::is_known_metric / known_metrics).",
            metric_name,
            self.step_index,
            self.label,
            self.sample_count,
            present_keys,
        )
    }
}
/// Merge two [`PhaseBucket`]s that describe the same step.
///
/// Metrics are combined key by key over the union of both maps:
/// - Keys whose `crate::stats::metric_def` kind reports `is_derived()` are
///   dropped. `crate::stats::derive_rate_metrics` is then run over the
///   merged map.
/// - Keys present on both sides are combined with [`merge_metric_values`].
///   It receives the metric's kind (`None` if unknown), both bucket sample
///   counts and both `end_ms` values.
/// - Keys present on only one side are copied through unchanged.
///
/// Per-cgroup stats sharing a cgroup name are combined with
/// [`PhaseCgroupStats::merge`]; the rest are carried over as-is.
///
/// The merged window is `[min(start_ms), max(end_ms)]`. `sample_count` is
/// summed, and `label` is taken from `a`.
///
/// # Panics
///
/// Panics if `a.step_index != b.step_index`.
pub(crate) fn merge_matched_phase_buckets(a: PhaseBucket, b: PhaseBucket) -> PhaseBucket {
    assert_eq!(
        a.step_index, b.step_index,
        "merge_matched_phase_buckets: caller must pair by step_index",
    );
    let mut metrics = std::collections::BTreeMap::new();
    let keys: std::collections::BTreeSet<&String> =
        a.metrics.keys().chain(b.metrics.keys()).collect();
    for key in keys {
        // Resolve the metric definition once per key. The same kind decides
        // both whether the key is skipped and how two values are combined.
        let kind = crate::stats::metric_def(key).map(|m| m.kind);
        if kind.as_ref().is_some_and(|k| k.is_derived()) {
            // Derived kinds are never merged as values; they are left to
            // derive_rate_metrics after the loop.
            continue;
        }
        let merged = match (a.metrics.get(key).copied(), b.metrics.get(key).copied()) {
            (Some(av), Some(bv)) => merge_metric_values(
                kind,
                av,
                bv,
                a.sample_count,
                b.sample_count,
                a.end_ms,
                b.end_ms,
            ),
            (Some(v), None) | (None, Some(v)) => v,
            // Unreachable in practice: every key comes from one of the maps.
            (None, None) => continue,
        };
        metrics.insert(key.clone(), merged);
    }
    crate::stats::derive_rate_metrics(&mut metrics);

    let mut per_cgroup = a.per_cgroup;
    for (name, b_cg) in b.per_cgroup {
        let combined = match per_cgroup.remove(&name) {
            Some(a_cg) => PhaseCgroupStats::merge(a_cg, b_cg),
            None => b_cg,
        };
        per_cgroup.insert(name, combined);
    }

    PhaseBucket {
        step_index: a.step_index,
        label: a.label,
        start_ms: a.start_ms.min(b.start_ms),
        end_ms: a.end_ms.max(b.end_ms),
        sample_count: a.sample_count + b.sample_count,
        metrics,
        per_cgroup,
    }
}
pub(crate) fn fold_guest_per_cgroup_into_host_buckets(
host_buckets: Vec<PhaseBucket>,
guest_buckets: Vec<PhaseBucket>,
) -> Vec<PhaseBucket> {
let host_len = host_buckets.len();
let mut by_idx: std::collections::BTreeMap<u16, PhaseBucket> =
std::collections::BTreeMap::new();
for b in host_buckets {
match by_idx.remove(&b.step_index) {
Some(existing) => {
by_idx.insert(b.step_index, merge_matched_phase_buckets(existing, b));
}
None => {
by_idx.insert(b.step_index, b);
}
}
}
debug_assert_eq!(
by_idx.len(),
host_len,
"host buckets must have unique step_index; a collision merged (not dropped)",
);
for gb in guest_buckets {
debug_assert!(
gb.start_ms == u64::MAX && gb.end_ms == 0,
"guest carrier must carry the merge-neutral (u64::MAX, 0) window; got ({}, {})",
gb.start_ms,
gb.end_ms,
);
match by_idx.remove(&gb.step_index) {
Some(hb) => {
by_idx.insert(gb.step_index, merge_matched_phase_buckets(hb, gb));
}
None => {
let mut orphan = gb;
orphan.start_ms = 0;
orphan.end_ms = 0;
by_idx.insert(orphan.step_index, orphan);
}
}
}
by_idx.into_values().collect()
}
/// Combine two values of the same metric from two buckets of one step.
///
/// The rule depends on `kind`:
/// - Counter / DeltaSum: the values are summed.
/// - Peak / Gauge(Max): the larger value is kept.
/// - Gauge(Avg): a mean weighted by each side's sample count, with each count
///   clamped to at least 1.
/// - Gauge(Last) / Timestamp: the value from the bucket with the later
///   `end_ms`; `a` wins a tie.
/// - Unknown (`None`): the plain midpoint `(a + b) / 2`.
///
/// # Panics
///
/// Panics (via `unreachable!`) for derived kinds: Rate, Distribution,
/// WorstLowest and WakeLatencyTailRatio. Callers must filter those out first.
fn merge_metric_values(
    kind: Option<crate::stats::MetricKind>,
    a: f64,
    b: f64,
    a_count: usize,
    b_count: usize,
    a_end_ms: u64,
    b_end_ms: u64,
) -> f64 {
    use crate::stats::{GaugeAgg, MetricKind};
    let Some(kind) = kind else {
        return (a + b) / 2.0;
    };
    match kind {
        MetricKind::Counter | MetricKind::DeltaSum => a + b,
        MetricKind::Peak | MetricKind::Gauge(GaugeAgg::Max) => a.max(b),
        MetricKind::Gauge(GaugeAgg::Avg) => {
            let (weight_a, weight_b) = (a_count.max(1) as f64, b_count.max(1) as f64);
            (a * weight_a + b * weight_b) / (weight_a + weight_b)
        }
        MetricKind::Gauge(GaugeAgg::Last) | MetricKind::Timestamp => {
            if a_end_ms >= b_end_ms {
                a
            } else {
                b
            }
        }
        MetricKind::Rate { .. }
        | MetricKind::Distribution { .. }
        | MetricKind::WorstLowest { .. }
        | MetricKind::WakeLatencyTailRatio => unreachable!(
            "derived metrics (Rate/Distribution/WorstLowest/WakeLatencyTailRatio) are produced post-merge, not merged as values"
        ),
    }
}