use super::*;
// Aggregated statistics for one scenario run: per-cgroup stats, run-wide
// totals/worst-case scalars, run-level named metrics, and per-phase buckets.
//
// NOTE(review): plain `//` comments are used on this struct (not `///`) because
// it derives `crate::Claim`, and how that derive treats doc attributes is not
// visible from this file.
#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize, crate::Claim)]
pub struct ScenarioStats {
// Per-cgroup statistics. `cgroup_balance_ratio` and the run-level
// distribution / worst-lowest / wake-tail metrics are computed from these.
pub cgroups: Vec<CgroupStats>,
// Run-wide scalar totals and worst-case values.
// NOTE(review): where these are filled in is not visible here — presumably
// aggregated across `cgroups`; confirm against the producer.
pub total_workers: usize,
pub total_cpus: usize,
pub total_migrations: u64,
pub worst_spread: f64,
pub worst_gap_ms: u64,
// Presumably the CPU on which `worst_gap_ms` was observed — TODO confirm.
pub worst_gap_cpu: usize,
pub worst_migration_ratio: f64,
// Also used as the iteration floor input for the wake-latency tail-ratio
// metric (see `populate_run_distribution_metrics`).
pub total_iterations: u64,
pub worst_page_locality: f64,
pub worst_cross_node_migration_ratio: f64,
// Run-level metrics keyed by metric name (read via `run_metric`). The
// `populate_run_*` functions in this file write into this map.
pub ext_metrics: BTreeMap<String, f64>,
// Per-phase buckets, looked up by `step_index` via `phase` / `step`.
// `#[serde(default)]` lets input that lacks this field deserialize with an
// empty list.
#[serde(default)]
pub phases: Vec<PhaseBucket>,
}
impl ScenarioStats {
    /// Returns the phase bucket whose `step_index` equals `step_index`.
    ///
    /// When several buckets share the index, the first one in `phases` wins.
    /// Returns `None` if no bucket carries that index.
    pub fn phase(&self, step_index: u16) -> Option<&PhaseBucket> {
        for bucket in &self.phases {
            if bucket.step_index == step_index {
                return Some(bucket);
            }
        }
        None
    }

    /// Returns the phase bucket for scenario step `scenario_step_idx`.
    ///
    /// Phase indices are the scenario step index plus one, so phase 0 is never
    /// reachable through this method. Returns `None` when the step has no
    /// bucket or when `scenario_step_idx` is `u16::MAX`, because the `+ 1`
    /// would overflow.
    pub fn step(&self, scenario_step_idx: u16) -> Option<&PhaseBucket> {
        let phase_idx = scenario_step_idx.checked_add(1)?;
        self.phase(phase_idx)
    }

    /// Looks up `metric` in the phase bucket with the given `step_index`.
    pub fn phase_metric(&self, step_index: u16, metric: &str) -> Option<f64> {
        let bucket = self.phase(step_index)?;
        bucket.get(metric)
    }

    /// Ratio of the highest to the lowest per-cgroup iterations-per-worker rate.
    ///
    /// Cgroups whose `iterations_per_worker()` is `None` are ignored.
    ///
    /// Returns:
    /// - `None` when fewer than two cgroups report a rate;
    /// - `f64::INFINITY` when the lowest rate is exactly zero;
    /// - otherwise `highest / lowest`.
    pub fn cgroup_balance_ratio(&self) -> Option<f64> {
        let (lowest, highest, counted) = self
            .cgroups
            .iter()
            .filter_map(|cg| cg.iterations_per_worker())
            .fold((f64::INFINITY, 0.0_f64, 0usize), |(lo, hi, n), rate| {
                (lo.min(rate), hi.max(rate), n + 1)
            });
        if counted < 2 {
            None
        } else if lowest == 0.0 {
            // A zero lowest rate would divide by zero; report unbounded imbalance.
            Some(f64::INFINITY)
        } else {
            Some(highest / lowest)
        }
    }

    /// Looks up `metric` in the bucket for scenario step `scenario_step_idx`.
    ///
    /// The bucket is resolved the same way as in [`ScenarioStats::step`].
    pub fn step_metric(&self, scenario_step_idx: u16, metric: &str) -> Option<f64> {
        self.step(scenario_step_idx)?.get(metric)
    }

    /// Whether `name` is registered in `crate::stats::METRICS`.
    pub fn is_known_metric(name: &str) -> bool {
        Self::known_metrics().any(|known| known == name)
    }

    /// Names of every metric registered in `crate::stats::METRICS`, in registry order.
    pub fn known_metrics() -> impl Iterator<Item = &'static str> {
        crate::stats::METRICS.iter().map(|def| def.name)
    }

    /// Whether any phase bucket corresponds to a scenario step.
    ///
    /// This is true when some bucket has a phase index of 1 or more.
    pub fn has_steps(&self) -> bool {
        !self.phases.iter().all(|bucket| bucket.step_index == 0)
    }

    /// Run-level value of metric `name` from `ext_metrics`, if present.
    pub fn run_metric(&self, name: &str) -> Option<f64> {
        self.ext_metrics.get(name).copied()
    }
}
// Metric names that `populate_run_ext_metrics_from_phases` and
// `populate_run_ext_metrics` never write into the target `ext_metrics` map.
// NOTE(review): the name suggests these values are carried in typed struct
// fields instead. Only `total_iterations` and `total_migrations` are fields of
// `ScenarioStats` in this file; the others presumably live on another stats
// type. Confirm before adding entries.
const TYPED_FIELD_NAMES: &[&str] = &[
"max_dsq_depth",
"max_imbalance_ratio",
"total_fallback",
"total_keep_last",
"stuck_count",
"total_iterations",
"total_migrations",
];
pub fn populate_run_ext_metrics_from_phases(
phases: &[PhaseBucket],
target: &mut std::collections::BTreeMap<String, f64>,
) {
let mut keys: std::collections::BTreeSet<&String> = std::collections::BTreeSet::new();
for phase in phases {
for key in phase.metrics.keys() {
keys.insert(key);
}
}
for key in keys {
if target.contains_key(key) {
continue;
}
let Some(def) = crate::stats::metric_def(key) else {
continue;
};
if def.kind.is_derived() {
continue;
}
if TYPED_FIELD_NAMES.contains(&key.as_str()) {
continue;
}
let pairs: Vec<(f64, usize)> = phases
.iter()
.filter_map(|phase| {
phase
.metrics
.get(key)
.copied()
.map(|v| (v, phase.sample_count.max(1)))
})
.collect();
if pairs.is_empty() {
continue;
}
if let Some(reduced) = crate::stats::aggregate_samples_weighted(&pairs, def.kind) {
target.insert(key.clone(), reduced);
}
}
crate::stats::derive_rate_metrics(target);
}
/// Writes pooled iteration and CPU-time totals into `stats.ext_metrics`.
///
/// Only cgroups with a non-zero `total_cpu_time_ns` contribute. When at least
/// one does, two entries are inserted, overwriting any existing values:
/// - `total_iterations_pooled`: the summed iterations;
/// - `total_cpu_time_sec`: the summed CPU time converted from ns to seconds.
///
/// `crate::stats::derive_rate_metrics` then runs on the map. If no cgroup
/// recorded CPU time, the function returns without touching `ext_metrics`.
pub fn populate_run_pooled_iterations_per_cpu_sec(stats: &mut ScenarioStats) {
    let (cpu_ns, iterations) = stats
        .cgroups
        .iter()
        .filter(|cg| cg.total_cpu_time_ns > 0)
        .fold((0u64, 0u64), |(ns, iters), cg| {
            (ns + cg.total_cpu_time_ns, iters + cg.total_iterations)
        });
    if cpu_ns == 0 {
        return;
    }

    let ext = &mut stats.ext_metrics;
    ext.insert("total_iterations_pooled".to_string(), iterations as f64);
    ext.insert("total_cpu_time_sec".to_string(), cpu_ns as f64 / 1e9);
    crate::stats::derive_rate_metrics(ext);
}
/// Pools raw per-phase, per-cgroup samples and writes run-level distribution,
/// worst-lowest and wake-latency tail-ratio metrics into `stats.ext_metrics`.
///
/// Wake-latency reservoirs are pooled with a per-sample weight of
/// `wake_sample_total / reservoir_len`. A reservoir that kept only part of the
/// wakeups its cgroup observed therefore counts in proportion to the observed
/// total. Run-delay samples are pooled unweighted.
///
/// Every cgroup name that contributed at least one raw sample is recorded as a
/// "carrier" for that source. Both pools are sorted ascending before being
/// handed to `populate_run_distribution_metrics_from`, together with
/// `stats.total_iterations` for the tail-ratio gate.
pub fn populate_run_distribution_metrics(stats: &mut ScenarioStats) {
    use std::collections::BTreeSet;

    let mut weighted_wakes: Vec<(u64, f64)> = Vec::new();
    let mut delays: Vec<u64> = Vec::new();
    let mut wake_sources: BTreeSet<&str> = BTreeSet::new();
    let mut delay_sources: BTreeSet<&str> = BTreeSet::new();

    // Phases in order, then each phase's per-cgroup entries in their own order.
    // This order is kept so pooled float sums stay identical.
    let entries = stats
        .phases
        .iter()
        .flat_map(|phase| phase.per_cgroup.iter());
    for (cgroup, bucket) in entries {
        let reservoir = &bucket.wake_latencies_ns;
        if !reservoir.is_empty() {
            let kept = reservoir.len() as u64;
            debug_assert!(
                bucket.wake_sample_total >= kept,
                "wake_sample_total ({}) < reservoir len ({}): malformed carrier",
                bucket.wake_sample_total,
                kept,
            );
            // Each retained sample stands in for `total / kept` observed wakeups.
            // Clamping with `max(kept)` keeps the weight >= 1 for malformed input.
            let weight = bucket.wake_sample_total.max(kept) as f64 / kept as f64;
            weighted_wakes.extend(reservoir.iter().map(|&ns| (ns, weight)));
            wake_sources.insert(cgroup.as_str());
        }
        if !bucket.run_delays_ns.is_empty() {
            delays.extend_from_slice(&bucket.run_delays_ns);
            delay_sources.insert(cgroup.as_str());
        }
    }

    weighted_wakes.sort_unstable_by_key(|&(ns, _)| ns);
    delays.sort_unstable();

    let pooled_kinds = crate::stats::METRICS
        .iter()
        .filter(|def| {
            matches!(
                def.kind,
                crate::stats::MetricKind::Distribution { .. }
                    | crate::stats::MetricKind::WorstLowest { .. }
                    | crate::stats::MetricKind::WakeLatencyTailRatio
            )
        })
        .map(|def| (def.name, def.kind));

    populate_run_distribution_metrics_from(
        &mut stats.ext_metrics,
        pooled_kinds,
        &weighted_wakes,
        &wake_sources,
        &delays,
        &delay_sources,
        &stats.cgroups,
        stats.total_iterations,
    );
}
/// Computes the run-level value of each `(name, kind)` in `metrics` and inserts
/// it into `target`, overwriting existing entries. Non-finite results are not
/// written.
///
/// - `Distribution { source, reduction }`: reduces the pooled samples for
///   `source`. `wake_pool` is `(ns, weight)` pairs and `run_delay_pool` is raw
///   ns; both must be sorted ascending by value. The pooled value is then
///   combined by `max` with the precomputed per-cgroup value of every cgroup
///   missing from that source's carrier set.
/// - `WorstLowest { denominator, .. }`: the lowest per-cgroup rate
///   (iterations per worker or per CPU-second). Cgroups without a rate are
///   skipped.
/// - `WakeLatencyTailRatio`: `None` when `run_total_iterations` is below
///   `crate::stats::WAKE_LATENCY_TAIL_RATIO_MIN_ITERATIONS`. Otherwise it is
///   the maximum of the strictly positive per-cgroup tail ratios.
/// - Any other kind produces nothing.
// The pools and carrier sets are passed pre-built rather than derived from a
// `ScenarioStats`, hence the long parameter list.
#[allow(clippy::too_many_arguments)]
pub(crate) fn populate_run_distribution_metrics_from<'a>(
    target: &mut std::collections::BTreeMap<String, f64>,
    metrics: impl Iterator<Item = (&'a str, crate::stats::MetricKind)>,
    wake_pool: &[(u64, f64)],
    wake_carriers: &std::collections::BTreeSet<&str>,
    run_delay_pool: &[u64],
    run_delay_carriers: &std::collections::BTreeSet<&str>,
    cgroups: &[CgroupStats],
    run_total_iterations: u64,
) {
    use crate::stats::{MetricKind, SampleSource, WorstLowestDenominator};

    for (name, kind) in metrics {
        let computed: Option<f64> = match kind {
            MetricKind::Distribution { source, reduction } => {
                // Reduce the raw pool (if non-empty) and pick the set of
                // cgroups already represented in it.
                let (pooled, carriers): (Option<f64>, &std::collections::BTreeSet<&str>) =
                    match source {
                        SampleSource::WakeLatencyNs => {
                            let pooled = if wake_pool.is_empty() {
                                None
                            } else {
                                Some(reduce_weighted_sorted_distribution(wake_pool, reduction))
                            };
                            (pooled, wake_carriers)
                        }
                        SampleSource::RunDelayNs => {
                            let pooled = if run_delay_pool.is_empty() {
                                None
                            } else {
                                Some(reduce_sorted_distribution(run_delay_pool, reduction))
                            };
                            (pooled, run_delay_carriers)
                        }
                    };
                // Cgroups absent from the pool contribute their precomputed
                // per-cgroup value; the run-level value is the maximum.
                cgroups
                    .iter()
                    .filter(|cg| !carriers.contains(cg.cgroup_name.as_str()))
                    .map(|cg| distribution_cgroup_reduction(cg, source, reduction))
                    .fold(pooled, |acc, r| Some(acc.map_or(r, |seen| seen.max(r))))
            }
            MetricKind::WorstLowest { denominator, .. } => cgroups
                .iter()
                .filter_map(|cg| match denominator {
                    WorstLowestDenominator::NumWorkers => cg.iterations_per_worker(),
                    WorstLowestDenominator::CpuTimeNs => cg.iterations_per_cpu_sec(),
                })
                .fold(None, |lowest: Option<f64>, rate| {
                    if lowest.is_none_or(|w| rate < w) {
                        Some(rate)
                    } else {
                        lowest
                    }
                }),
            MetricKind::WakeLatencyTailRatio
                if run_total_iterations >= crate::stats::WAKE_LATENCY_TAIL_RATIO_MIN_ITERATIONS =>
            {
                cgroups
                    .iter()
                    .map(|cg| cg.wake_latency_tail_ratio())
                    .filter(|&r| r > 0.0)
                    .reduce(|w, r| w.max(r))
            }
            // Includes `WakeLatencyTailRatio` below the iteration floor.
            _ => None,
        };

        let Some(v) = computed.filter(|v| v.is_finite()) else {
            continue;
        };
        target.insert(name.to_string(), v);
    }
}
/// Reduces an ascending-sorted, unweighted sample slice to a single value.
///
/// Sample values are nanoseconds; in this file the caller is the
/// `SampleSource::RunDelayNs` path. Every reduction except `Cv` is divided by
/// 1000, i.e. reported in microseconds.
///
/// `Cv` is the population coefficient of variation (standard deviation
/// divided by mean). It is dimensionless and is 0.0 unless the mean is > 0.0.
///
/// # Panics
///
/// `SampleReduction::Worst` panics on an empty slice, so callers must pass a
/// non-empty slice. How `P99`/`Median` behave on empty input depends on
/// `percentile`, which is defined outside this block.
pub(crate) fn reduce_sorted_distribution(
    sorted: &[u64],
    reduction: crate::stats::SampleReduction,
) -> f64 {
    use crate::stats::SampleReduction;
    match reduction {
        SampleReduction::P99 => percentile(sorted, 0.99) as f64 / 1000.0,
        SampleReduction::Median => percentile(sorted, 0.5) as f64 / 1000.0,
        SampleReduction::Cv => {
            let n = sorted.len() as f64;
            // Accumulate in f64, as the `Mean` arm and the weighted variant do.
            // A `u64` sum can overflow: it panics in debug builds and wraps
            // silently in release builds, producing a garbage mean. Totals
            // below 2^53 ns sum exactly in f64, so ordinary inputs are
            // unaffected.
            let mean_ns = sorted.iter().map(|&v| v as f64).sum::<f64>() / n;
            if mean_ns > 0.0 {
                let variance = sorted
                    .iter()
                    .map(|&v| (v as f64 - mean_ns).powi(2))
                    .sum::<f64>()
                    / n;
                variance.sqrt() / mean_ns
            } else {
                0.0
            }
        }
        SampleReduction::Mean => {
            sorted.iter().map(|&v| v as f64).sum::<f64>() / sorted.len() as f64 / 1000.0
        }
        SampleReduction::Worst => *sorted.last().expect("non-empty by caller") as f64 / 1000.0,
    }
}
/// Weighted nearest-rank percentile of `sorted`, a slice of `(value, weight)`
/// pairs ordered by ascending value.
///
/// Returns the first value whose running weight total reaches
/// `ceil(total_weight * p)`, clamped to at least 1.0.
///
/// If that rank is never reached, the largest value is returned. This happens,
/// for example, when the total weight is below 1.0, or when `ceil` overshoots a
/// fractional total. An empty slice yields 0. Debug builds assert that the
/// input is sorted by value.
pub(crate) fn weighted_percentile(sorted: &[(u64, f64)], p: f64) -> u64 {
    let Some(&(last_value, _)) = sorted.last() else {
        return 0;
    };
    debug_assert!(
        sorted.windows(2).all(|pair| pair[0].0 <= pair[1].0),
        "weighted_percentile() requires value-sorted input",
    );
    let total_weight: f64 = sorted.iter().map(|&(_, w)| w).sum();
    let rank = (total_weight * p).ceil().max(1.0);
    sorted
        .iter()
        .scan(0.0_f64, |running, &(value, weight)| {
            *running += weight;
            Some((value, *running))
        })
        .find(|&(_, running)| running >= rank)
        .map_or(last_value, |(value, _)| value)
}
/// Reduces a value-sorted slice of `(value_ns, weight)` pairs to one number.
///
/// This is the weighted counterpart of `reduce_sorted_distribution`:
/// - `P99`/`Median` use `weighted_percentile`;
/// - `Mean`/`Cv` use weight-normalised moments;
/// - `Worst` takes the last (largest) value.
///
/// Every reduction except the dimensionless `Cv` is divided by 1000 (ns → µs).
/// `Mean` and `Cv` return 0.0 when the total weight is `<= 0.0`, and `Cv` is
/// also 0.0 unless the weighted mean is > 0.0. An empty slice yields 0.0 for
/// every reduction.
pub(crate) fn reduce_weighted_sorted_distribution(
    sorted: &[(u64, f64)],
    reduction: crate::stats::SampleReduction,
) -> f64 {
    use crate::stats::SampleReduction;
    const NS_PER_US: f64 = 1000.0;

    let weight_total = || -> f64 { sorted.iter().map(|&(_, w)| w).sum() };
    let weighted_value_total = || -> f64 { sorted.iter().map(|&(v, w)| v as f64 * w).sum() };

    match reduction {
        SampleReduction::P99 => weighted_percentile(sorted, 0.99) as f64 / NS_PER_US,
        SampleReduction::Median => weighted_percentile(sorted, 0.5) as f64 / NS_PER_US,
        SampleReduction::Worst => sorted.last().map_or(0, |&(v, _)| v) as f64 / NS_PER_US,
        SampleReduction::Mean => {
            let total_w = weight_total();
            if total_w <= 0.0 {
                0.0
            } else {
                weighted_value_total() / total_w / NS_PER_US
            }
        }
        SampleReduction::Cv => {
            let total_w = weight_total();
            if total_w <= 0.0 {
                return 0.0;
            }
            let mean_ns = weighted_value_total() / total_w;
            if mean_ns > 0.0 {
                let spread = sorted
                    .iter()
                    .map(|&(v, w)| w * (v as f64 - mean_ns).powi(2))
                    .sum::<f64>()
                    / total_w;
                spread.sqrt() / mean_ns
            } else {
                0.0
            }
        }
    }
}
/// Precomputed per-cgroup value for a `(source, reduction)` pair.
///
/// In this file it is used for cgroups that contributed no raw samples to the
/// run-level pool. Only the combinations backed by a `CgroupStats` field are
/// supported: wake-latency P99/Median/Cv and run-delay Mean/Worst. Any other
/// combination trips a `debug_assert!` in debug builds and returns `f64::NAN`
/// in release builds.
fn distribution_cgroup_reduction(
    cg: &CgroupStats,
    source: crate::stats::SampleSource,
    reduction: crate::stats::SampleReduction,
) -> f64 {
    use crate::stats::{SampleReduction, SampleSource};
    match (source, reduction) {
        (SampleSource::WakeLatencyNs, SampleReduction::P99) => cg.p99_wake_latency_us,
        (SampleSource::WakeLatencyNs, SampleReduction::Median) => cg.median_wake_latency_us,
        (SampleSource::WakeLatencyNs, SampleReduction::Cv) => cg.wake_latency_cv,
        (SampleSource::WakeLatencyNs, SampleReduction::Mean | SampleReduction::Worst) => {
            debug_assert!(false, "no CgroupStats wake reduction for {reduction:?}");
            f64::NAN
        }
        (SampleSource::RunDelayNs, SampleReduction::Mean) => cg.mean_run_delay_us,
        (SampleSource::RunDelayNs, SampleReduction::Worst) => cg.worst_run_delay_us,
        (
            SampleSource::RunDelayNs,
            SampleReduction::P99 | SampleReduction::Median | SampleReduction::Cv,
        ) => {
            debug_assert!(false, "no CgroupStats run-delay reduction for {reduction:?}");
            f64::NAN
        }
    }
}
/// Fills `target` with run-level values reduced from a sample series.
///
/// Metrics are taken from `crate::stats::METRICS` in registry order. A metric
/// is skipped if `target` already holds it or if it is listed in
/// `TYPED_FIELD_NAMES`.
///
/// For each remaining metric, the value is read from every sample via
/// `read_sample`. When at least one reading exists, the
/// `crate::stats::aggregate_samples_for_phase` reduction is stored (if it
/// returns one). Finally `crate::stats::derive_rate_metrics` runs over `target`.
pub fn populate_run_ext_metrics(
    samples: &crate::scenario::sample::SampleSeries,
    target: &mut std::collections::BTreeMap<String, f64>,
) {
    for def in crate::stats::METRICS {
        let name = def.name;
        let already_handled = target.contains_key(name) || TYPED_FIELD_NAMES.contains(&name);
        if already_handled {
            continue;
        }

        let readings: Vec<f64> = samples
            .iter_samples()
            .filter_map(|sample| def.read_sample(&sample))
            .collect();
        if readings.is_empty() {
            continue;
        }

        if let Some(reduced) = crate::stats::aggregate_samples_for_phase(def, &readings) {
            target.insert(name.to_string(), reduced);
        }
    }

    crate::stats::derive_rate_metrics(target);
}