use std::collections::BTreeMap;
use super::{PhaseBucket, ScenarioStats};
#[test]
fn phase_bucket_json_round_trips_all_fields() {
let mut metrics = BTreeMap::new();
metrics.insert("worst_spread".to_string(), 0.42);
metrics.insert("dsq_depth_max".to_string(), 12.0);
let bucket = PhaseBucket {
step_index: 7,
label: "Step[6]".to_string(),
start_ms: 1500,
end_ms: u64::MAX,
sample_count: 42,
metrics,
};
let json = serde_json::to_string(&bucket).expect("serialize");
let back: PhaseBucket = serde_json::from_str(&json).expect("deserialize");
assert_eq!(back, bucket);
}
#[test]
fn phase_bucket_empty_metrics_round_trips_as_empty_object() {
let bucket = PhaseBucket {
step_index: 0,
label: "BASELINE".to_string(),
start_ms: 0,
end_ms: 100,
sample_count: 0,
metrics: BTreeMap::new(),
};
let json = serde_json::to_string(&bucket).expect("serialize");
assert!(
json.contains(r#""metrics":{}"#),
"empty metrics must serialize as present `metrics: {{}}`, got: {json}"
);
let back: PhaseBucket = serde_json::from_str(&json).expect("deserialize");
assert_eq!(back, bucket);
}
#[test]
fn phase_bucket_step_index_u16_max_round_trips() {
let bucket = PhaseBucket {
step_index: u16::MAX,
label: "Step[65534]".to_string(),
start_ms: 0,
end_ms: 1,
sample_count: 0,
metrics: BTreeMap::new(),
};
let json = serde_json::to_string(&bucket).expect("serialize");
let back: PhaseBucket = serde_json::from_str(&json).expect("deserialize");
assert_eq!(back.step_index, u16::MAX);
assert_eq!(back, bucket);
}
#[test]
fn phase_bucket_empty_label_round_trips_as_present_field() {
let bucket = PhaseBucket {
step_index: 0,
label: String::new(),
start_ms: 0,
end_ms: 0,
sample_count: 0,
metrics: BTreeMap::new(),
};
let json = serde_json::to_string(&bucket).expect("serialize");
assert!(
json.contains(r#""label":"""#),
"empty label must serialize as present `label: \"\"`, got: {json}"
);
let back: PhaseBucket = serde_json::from_str(&json).expect("deserialize");
assert_eq!(back.label, "");
assert_eq!(back, bucket);
}
#[test]
fn phase_bucket_get_distinguishes_absent_from_zero() {
let mut metrics = BTreeMap::new();
metrics.insert("present".to_string(), 0.0);
let bucket = PhaseBucket {
step_index: 1,
label: "Step[0]".to_string(),
start_ms: 0,
end_ms: 1000,
sample_count: 10,
metrics,
};
assert_eq!(bucket.get("present"), Some(0.0));
assert_eq!(bucket.get("absent"), None);
}
#[test]
fn scenario_stats_default_has_empty_phases() {
let stats = ScenarioStats::default();
assert!(stats.phases.is_empty());
assert_eq!(stats.phase(0), None);
assert_eq!(stats.phase_metric(0, "any"), None);
}
#[test]
fn scenario_stats_phase_lookup_by_step_index_not_position() {
let mut metrics_baseline = BTreeMap::new();
metrics_baseline.insert("worst_spread".to_string(), 0.10);
let mut metrics_step2 = BTreeMap::new();
metrics_step2.insert("worst_spread".to_string(), 0.42);
let stats = ScenarioStats {
phases: vec![
PhaseBucket {
step_index: 0,
label: "BASELINE".to_string(),
start_ms: 0,
end_ms: 100,
sample_count: 2,
metrics: metrics_baseline,
},
PhaseBucket {
step_index: 3,
label: "Step[2]".to_string(),
start_ms: 200,
end_ms: 300,
sample_count: 5,
metrics: metrics_step2,
},
],
..Default::default()
};
assert_eq!(stats.phase(0).map(|p| p.step_index), Some(0));
assert_eq!(stats.phase(3).map(|p| p.step_index), Some(3));
assert_eq!(stats.phase(1), None);
assert_eq!(stats.phase(2), None);
}
#[test]
fn scenario_stats_phase_metric_resolves_typed_lookup() {
let mut metrics = BTreeMap::new();
metrics.insert("worst_spread".to_string(), 0.42);
metrics.insert("dsq_depth_max".to_string(), 12.0);
let stats = ScenarioStats {
phases: vec![PhaseBucket {
step_index: 1,
label: "Step[0]".to_string(),
start_ms: 100,
end_ms: 200,
sample_count: 3,
metrics,
}],
..Default::default()
};
assert_eq!(stats.phase_metric(1, "worst_spread"), Some(0.42));
assert_eq!(stats.phase_metric(1, "dsq_depth_max"), Some(12.0));
assert_eq!(stats.phase_metric(1, "absent"), None);
assert_eq!(stats.phase_metric(99, "worst_spread"), None);
}
#[test]
fn scenario_stats_step_translates_scenario_step_idx_to_phase_index() {
let stats = ScenarioStats {
phases: vec![
PhaseBucket {
step_index: 0, label: "BASELINE".to_string(),
..Default::default()
},
PhaseBucket {
step_index: 1, label: "Step[0]".to_string(),
..Default::default()
},
PhaseBucket {
step_index: 2, label: "Step[1]".to_string(),
..Default::default()
},
],
..Default::default()
};
assert_eq!(stats.step(0).map(|p| p.label.as_str()), Some("Step[0]"));
assert_eq!(stats.step(1).map(|p| p.label.as_str()), Some("Step[1]"));
assert_eq!(stats.step(99), None);
assert_eq!(stats.step(u16::MAX), None);
}
#[test]
fn scenario_stats_step_metric_resolves_scenario_indexed_lookup() {
let mut metrics = BTreeMap::new();
metrics.insert("worst_spread".to_string(), 0.42);
let stats = ScenarioStats {
phases: vec![PhaseBucket {
step_index: 1, label: "Step[0]".to_string(),
metrics,
..Default::default()
}],
..Default::default()
};
assert_eq!(stats.step_metric(0, "worst_spread"), Some(0.42));
assert_eq!(stats.step_metric(0, "absent"), None);
assert_eq!(stats.step_metric(1, "worst_spread"), None);
}
#[test]
fn scenario_stats_is_known_metric_distinguishes_typo_from_absent_data() {
assert!(ScenarioStats::is_known_metric("worst_spread"));
assert!(!ScenarioStats::is_known_metric("worts_spread"));
assert!(!ScenarioStats::is_known_metric(""));
assert!(!ScenarioStats::is_known_metric("totally_made_up"));
}
#[test]
fn scenario_stats_known_metrics_iterates_registry() {
let names: Vec<&'static str> = ScenarioStats::known_metrics().collect();
assert!(!names.is_empty(), "METRICS registry must have entries");
assert_eq!(names.len(), crate::stats::METRICS.len());
for name in names {
assert!(
ScenarioStats::is_known_metric(name),
"every known_metrics() entry must pass is_known_metric: {name}"
);
}
}
use crate::monitor::dump::{FailureDumpReport, SCHEMA_SINGLE};
use crate::scenario::sample::SampleSeries;
use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
fn fixture_report() -> FailureDumpReport {
FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
..Default::default()
}
}
fn fixture_entry(tag: &str, step_index: u16, elapsed_ms: u64) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(elapsed_ms),
boundary_offset_ms: None,
step_index: Some(step_index),
}
}
#[test]
fn build_phase_buckets_empty_series_yields_empty_phases() {
let samples = SampleSeries::from_drained_typed(Vec::new(), None);
let phases = crate::assert::build_phase_buckets(&samples);
assert!(
phases.is_empty(),
"empty input must yield empty phases, got {phases:?}"
);
}
#[test]
fn build_phase_buckets_baseline_only_yields_single_bucket() {
let drained = vec![
fixture_entry("periodic_000", 0, 100),
fixture_entry("periodic_001", 0, 200),
fixture_entry("periodic_002", 0, 300),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
assert_eq!(phases.len(), 1, "single phase observed -> single bucket");
let bucket = &phases[0];
assert_eq!(bucket.step_index, 0);
assert_eq!(bucket.label, "BASELINE");
assert_eq!(bucket.sample_count, 3);
assert_eq!(bucket.start_ms, 100);
assert_eq!(bucket.end_ms, 300);
assert!(
bucket.metrics.is_empty(),
"synthetic fixture report carries no BPF state -> metrics empty"
);
}
#[test]
fn build_phase_buckets_three_phases_round_trip_with_correct_labels() {
let drained = vec![
fixture_entry("periodic_000", 0, 10), fixture_entry("periodic_001", 0, 20), fixture_entry("periodic_002", 1, 100), fixture_entry("periodic_003", 1, 200), fixture_entry("periodic_004", 1, 300), fixture_entry("periodic_005", 2, 400), ];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
assert_eq!(phases.len(), 3);
assert_eq!(phases[0].step_index, 0);
assert_eq!(phases[0].label, "BASELINE");
assert_eq!(phases[0].sample_count, 2);
assert_eq!(phases[0].start_ms, 10);
assert_eq!(phases[0].end_ms, 20);
assert_eq!(phases[1].step_index, 1);
assert_eq!(phases[1].label, "Step[0]");
assert_eq!(phases[1].sample_count, 3);
assert_eq!(phases[1].start_ms, 100);
assert_eq!(phases[1].end_ms, 300);
assert_eq!(phases[2].step_index, 2);
assert_eq!(phases[2].label, "Step[1]");
assert_eq!(phases[2].sample_count, 1);
assert_eq!(phases[2].start_ms, 400);
assert_eq!(phases[2].end_ms, 400);
}
#[test]
fn build_phase_buckets_unstamped_samples_cluster_under_baseline() {
let unstamped = DrainedSnapshotEntry {
tag: "periodic_000".to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(50),
boundary_offset_ms: None,
step_index: None,
};
let samples = SampleSeries::from_drained_typed(vec![unstamped], None);
let phases = crate::assert::build_phase_buckets(&samples);
assert_eq!(phases.len(), 1);
assert_eq!(phases[0].step_index, 0);
assert_eq!(phases[0].label, "BASELINE");
assert_eq!(phases[0].sample_count, 1);
}
#[test]
fn build_phase_buckets_skipped_steps_yield_sparse_output() {
let drained = vec![
fixture_entry("periodic_000", 0, 10),
fixture_entry("periodic_001", 3, 500), ];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
assert_eq!(phases.len(), 2);
assert_eq!(phases[0].step_index, 0);
assert_eq!(phases[1].step_index, 3);
assert_eq!(phases[1].label, "Step[2]");
}
#[test]
fn build_phase_buckets_extracts_wired_metric_arms_end_to_end() {
use crate::monitor::dump::{EventCounterSample, FailureDumpReport, SCHEMA_SINGLE};
use crate::monitor::scx_walker::DsqState;
fn report_with(dsq_depths: &[u32], fallback: i64, keep_last: i64) -> FailureDumpReport {
let dsq_states = dsq_depths
.iter()
.enumerate()
.map(|(cpu, &nr)| DsqState {
id: 0,
origin: format!("local cpu {cpu}"),
nr,
seq: 0,
task_kvas: Vec::new(),
truncated: false,
})
.collect();
let event_counter_timeline = vec![EventCounterSample {
elapsed_ms: 0,
select_cpu_fallback: fallback,
dispatch_local_dsq_offline: 0,
dispatch_keep_last: keep_last,
enq_skip_exiting: 0,
enq_skip_migration_disabled: 0,
reenq_immed: 0,
reenq_local_repeat: 0,
refill_slice_dfl: 0,
bypass_duration: 0,
bypass_dispatch: 0,
bypass_activate: 0,
insert_not_owned: 0,
sub_bypass_dispatch: 0,
}];
FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
dsq_states,
event_counter_timeline,
..Default::default()
}
}
fn entry_with(
tag: &str,
step_index: u16,
elapsed_ms: u64,
dsq_depths: &[u32],
fallback: i64,
keep_last: i64,
) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: report_with(dsq_depths, fallback, keep_last),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(elapsed_ms),
boundary_offset_ms: None,
step_index: Some(step_index),
}
}
let drained = vec![
entry_with("periodic_000", 0, 10, &[5, 3], 10, 20),
entry_with("periodic_001", 0, 20, &[4, 8], 15, 30),
entry_with("periodic_002", 1, 100, &[12, 7], 18, 35),
entry_with("periodic_003", 1, 200, &[9, 11], 25, 42),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
assert_eq!(phases.len(), 2, "BASELINE + Step[0] -> 2 buckets");
let baseline = &phases[0];
assert_eq!(baseline.step_index, 0);
assert_eq!(
baseline.metrics.get("max_dsq_depth").copied(),
Some(8.0),
"BASELINE max_dsq_depth: Peak reduction over per-sample [5, 8] yields max 8"
);
assert_eq!(
baseline.metrics.get("total_fallback").copied(),
Some(5.0),
"BASELINE total_fallback: Counter delta 15 - 10 = 5"
);
assert_eq!(
baseline.metrics.get("total_keep_last").copied(),
Some(10.0),
"BASELINE total_keep_last: Counter delta 30 - 20 = 10"
);
let step0 = &phases[1];
assert_eq!(step0.step_index, 1);
assert_eq!(step0.label, "Step[0]");
assert_eq!(
step0.metrics.get("max_dsq_depth").copied(),
Some(12.0),
"Step[0] max_dsq_depth: Peak max of [12, 11] = 12"
);
assert_eq!(
step0.metrics.get("total_fallback").copied(),
Some(7.0),
"Step[0] total_fallback: Counter delta 25 - 18 = 7"
);
assert_eq!(
step0.metrics.get("total_keep_last").copied(),
Some(7.0),
"Step[0] total_keep_last: Counter delta 42 - 35 = 7"
);
for host_only in [
"worst_spread",
"worst_gap_ms",
"worst_migration_ratio",
"max_imbalance_ratio",
"worst_p99_wake_latency_us",
"worst_iterations_per_worker",
"worst_page_locality",
] {
assert!(
!baseline.metrics.contains_key(host_only),
"BASELINE must not carry host-only metric {host_only}"
);
assert!(
!step0.metrics.contains_key(host_only),
"Step[0] must not carry host-only metric {host_only}"
);
}
}
#[test]
fn periodic_only_excludes_off_cadence_captures_from_phase_buckets() {
use crate::monitor::dump::{FailureDumpReport, SCHEMA_SINGLE};
use crate::monitor::scx_walker::DsqState;
fn entry(tag: &str, elapsed_ms: u64, dsq_depth: u32) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
dsq_states: vec![DsqState {
id: 0,
origin: "local cpu 0".to_string(),
nr: dsq_depth,
seq: 0,
task_kvas: Vec::new(),
truncated: false,
}],
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(elapsed_ms),
boundary_offset_ms: None,
step_index: Some(1),
}
}
let drained = vec![
entry("periodic_000", 100, 5),
entry("periodic_001", 200, 8),
entry("ondemand_000", 300, 1000),
];
let series = SampleSeries::from_drained_typed(drained, None);
let full = crate::assert::build_phase_buckets(&series);
assert_eq!(
full.iter()
.find(|p| p.step_index == 1)
.expect("Step[0]")
.metrics
.get("max_dsq_depth")
.copied(),
Some(1000.0),
"full series includes the off-cadence capture's dsq depth",
);
let periodic = crate::assert::build_phase_buckets(&series.clone().periodic_only());
assert_eq!(
periodic
.iter()
.find(|p| p.step_index == 1)
.expect("Step[0]")
.metrics
.get("max_dsq_depth")
.copied(),
Some(8.0),
"periodic_only excludes the off-cadence outlier: max is 8, NOT 1000",
);
}
#[test]
fn build_phase_buckets_injects_per_group_cpu_time_delta() {
use crate::monitor::task_enrichment::TaskEnrichment;
fn task(tgid: i32, utime: u64, stime: u64) -> TaskEnrichment {
TaskEnrichment {
pid: tgid,
tgid,
utime,
stime,
signal_utime: Some(0),
signal_stime: Some(0),
..Default::default()
}
}
fn entry(
tag: &str,
step_index: u16,
elapsed_ms: u64,
tasks: Vec<TaskEnrichment>,
) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
task_enrichments: tasks,
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(elapsed_ms),
boundary_offset_ms: None,
step_index: Some(step_index),
}
}
let drained = vec![
entry("periodic_000", 0, 10, vec![task(100, 2000, 1000)]),
entry("periodic_001", 1, 100, vec![task(100, 2000, 1000)]),
entry(
"periodic_002",
1,
200,
vec![task(100, 9000, 4000), task(200, 2_000_000, 1_000_000)],
),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
assert_eq!(phases.len(), 2, "BASELINE + Step[0]");
let baseline = &phases[0];
assert_eq!(baseline.step_index, 0);
assert!(
!baseline.metrics.contains_key("system_time_ns")
&& !baseline.metrics.contains_key("user_time_ns"),
"single enriched sample -> CPU-time key omitted (absent != real 0)",
);
let step0 = &phases[1];
assert_eq!(step0.step_index, 1);
assert_eq!(
step0.metrics.get("system_time_ns").copied(),
Some(3000.0),
"system delta = tgid100 (4000-1000) + tgid200 (single-appearance \
-> 0); NOT 1_003_000 (sum-then-delta inflation)",
);
assert_eq!(
step0.metrics.get("user_time_ns").copied(),
Some(7000.0),
"user delta = tgid100 (9000-2000); tgid200 single-appearance -> 0",
);
}
#[test]
fn build_phase_buckets_cpu_time_includes_signal_accumulator() {
use crate::monitor::task_enrichment::TaskEnrichment;
fn entry(
tag: &str,
elapsed_ms: u64,
stime: u64,
signal_stime: Option<u64>,
) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
task_enrichments: vec![TaskEnrichment {
pid: 100,
tgid: 100,
stime,
signal_stime,
..Default::default()
}],
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(elapsed_ms),
boundary_offset_ms: None,
step_index: Some(1),
}
}
let drained = vec![
entry("periodic_000", 100, 1000, Some(0)),
entry("periodic_001", 200, 2000, Some(3000)),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
let step0 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("Step[0] bucket present");
assert_eq!(
step0.metrics.get("system_time_ns").copied(),
Some(4000.0),
"signal accumulator (3000) folds into the group total: \
(2000+3000) - (1000+0) = 4000",
);
}
#[test]
fn build_phase_buckets_cpu_time_excludes_group_with_unreadable_signal() {
use crate::monitor::task_enrichment::TaskEnrichment;
fn t(tgid: i32, stime: u64, signal_stime: Option<u64>) -> TaskEnrichment {
TaskEnrichment {
pid: tgid,
tgid,
stime,
signal_stime,
..Default::default()
}
}
fn entry(tag: &str, elapsed_ms: u64, tasks: Vec<TaskEnrichment>) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
task_enrichments: tasks,
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(elapsed_ms),
boundary_offset_ms: None,
step_index: Some(1),
}
}
let drained = vec![
entry(
"periodic_000",
100,
vec![t(100, 1000, None), t(200, 1000, Some(0))],
),
entry(
"periodic_001",
200,
vec![t(100, 1000, Some(5_000_000)), t(200, 3000, Some(0))],
),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
let step0 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("Step[0] bucket present");
assert_eq!(
step0.metrics.get("system_time_ns").copied(),
Some(2000.0),
"only tgid200 (readable at both) contributes (3000-1000=2000); \
tgid100's unreadable-signal first endpoint excludes it — the 5e6 \
accumulator must NOT leak as a phantom positive",
);
}
#[test]
fn build_phase_buckets_cpu_time_clamps_counter_decrease() {
use crate::monitor::task_enrichment::TaskEnrichment;
fn entry(tag: &str, elapsed_ms: u64, stime: u64) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
task_enrichments: vec![TaskEnrichment {
pid: 100,
tgid: 100,
stime,
signal_stime: Some(0),
..Default::default()
}],
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(elapsed_ms),
boundary_offset_ms: None,
step_index: Some(1),
}
}
let drained = vec![
entry("periodic_000", 100, 5000),
entry("periodic_001", 200, 1000),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
let step0 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("Step[0] bucket present");
assert_eq!(
step0.metrics.get("system_time_ns").copied(),
Some(0.0),
"a last < first read clamps to 0 (qualifying group, counters did \
not advance) — a real Some(0.0), never a wrapped huge value",
);
}
#[test]
fn build_phase_buckets_cpu_time_disjoint_groups_yield_absent_not_zero() {
use crate::monitor::task_enrichment::TaskEnrichment;
fn entry(tag: &str, elapsed_ms: u64, tgid: i32) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
task_enrichments: vec![TaskEnrichment {
pid: tgid,
tgid,
stime: 1000,
signal_stime: Some(0),
..Default::default()
}],
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(elapsed_ms),
boundary_offset_ms: None,
step_index: Some(1),
}
}
let drained = vec![
entry("periodic_000", 100, 100),
entry("periodic_001", 200, 200),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
let step0 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("Step[0] bucket present");
assert!(
!step0.metrics.contains_key("system_time_ns"),
"disjoint groups -> no group spans two readable samples -> absent, \
not a sentinel Some(0.0)",
);
}
#[test]
fn build_phase_buckets_cpu_time_unanchored_sample_sorts_last_not_first() {
use crate::monitor::task_enrichment::TaskEnrichment;
fn task(stime: u64) -> TaskEnrichment {
TaskEnrichment {
pid: 100,
tgid: 100,
stime,
signal_stime: Some(0),
..Default::default()
}
}
fn entry(tag: &str, elapsed_ms: Option<u64>, stime: u64) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
task_enrichments: vec![task(stime)],
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms,
boundary_offset_ms: None,
step_index: Some(1),
}
}
let drained = vec![
entry("on_demand_000", None, 3000),
entry("periodic_001", Some(100), 1000),
entry("periodic_002", Some(200), 3000),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
let step0 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("Step[0] bucket present");
assert_eq!(
step0.metrics.get("system_time_ns").copied(),
Some(2000.0),
"unanchored sample sorts LAST -> first_seen from the earliest \
timed sample (1000), delta = 2000; if it sorted first the delta \
would clamp to 0",
);
}
#[test]
fn build_phase_buckets_all_unanchored_phase_yields_inverted_window() {
fn unanchored(tag: &str) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: None,
boundary_offset_ms: None,
step_index: Some(1),
}
}
let drained = vec![unanchored("on_demand_000"), unanchored("on_demand_001")];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
let step0 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("phase bucket present even when every sample is unanchored");
assert_eq!(step0.sample_count, 2, "both unanchored samples counted");
assert_eq!(
(step0.start_ms, step0.end_ms),
(u64::MAX, 0),
"all-unanchored phase -> inverted window that folds nothing, \
not a (0, x) window anchored at the run start",
);
}
#[test]
fn build_phase_buckets_integration_with_scenario_stats_phase_accessor() {
let drained = vec![
fixture_entry("periodic_000", 0, 10),
fixture_entry("periodic_001", 1, 100),
fixture_entry("periodic_002", 2, 200),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
let stats = ScenarioStats {
phases,
..Default::default()
};
assert_eq!(stats.phase(0).map(|p| p.label.as_str()), Some("BASELINE"));
assert_eq!(stats.phase(1).map(|p| p.label.as_str()), Some("Step[0]"));
assert_eq!(stats.phase(2).map(|p| p.label.as_str()), Some("Step[1]"));
assert_eq!(stats.phase(3), None);
assert_eq!(stats.step(0).map(|p| p.label.as_str()), Some("Step[0]"));
assert_eq!(stats.step(1).map(|p| p.label.as_str()), Some("Step[1]"));
}
#[test]
fn phase_baseline_const_is_u16_zero() {
assert_eq!(crate::assert::Phase::BASELINE.as_u16(), 0);
assert!(crate::assert::Phase::BASELINE.is_baseline());
}
#[test]
fn phase_step_zero_indexed_constructor_encodes_1_indexed() {
assert_eq!(crate::assert::Phase::step(0).as_u16(), 1);
assert_eq!(crate::assert::Phase::step(1).as_u16(), 2);
assert_eq!(crate::assert::Phase::step(7).as_u16(), 8);
assert!(!crate::assert::Phase::step(0).is_baseline());
}
#[test]
fn phase_step_saturating_at_u16_max_does_not_overflow() {
let saturated = crate::assert::Phase::step(u16::MAX - 1);
assert_eq!(saturated.as_u16(), u16::MAX);
let still_saturated = crate::assert::Phase::step(u16::MAX);
assert_eq!(still_saturated.as_u16(), u16::MAX);
assert!(
!saturated.is_baseline(),
"saturating MUST NOT collide with BASELINE"
);
}
#[test]
fn phase_display_baseline_step_format() {
assert_eq!(format!("{}", crate::assert::Phase::BASELINE), "BASELINE");
assert_eq!(format!("{}", crate::assert::Phase::step(0)), "Step[0]");
assert_eq!(format!("{}", crate::assert::Phase::step(7)), "Step[7]");
}
#[test]
fn phase_serde_transparent_round_trips_as_bare_u16() {
let phase = crate::assert::Phase::step(4);
let json = serde_json::to_string(&phase).unwrap();
assert_eq!(
json, "5",
"wire format must be the inner 1-indexed u16, not a tagged struct"
);
let round_tripped: crate::assert::Phase = serde_json::from_str(&json).unwrap();
assert_eq!(round_tripped, phase);
let from_raw: crate::assert::Phase = serde_json::from_str("0").unwrap();
assert_eq!(from_raw, crate::assert::Phase::BASELINE);
}
#[test]
fn phase_from_u16_wraps_raw_value() {
let from: crate::assert::Phase = 3u16.into();
assert_eq!(from.as_u16(), 3);
let to: u16 = crate::assert::Phase::step(2).into();
assert_eq!(to, 3);
}
#[test]
fn scenario_stats_has_steps_false_for_empty_phases() {
let stats = ScenarioStats::default();
assert!(!stats.has_steps());
}
#[test]
fn scenario_stats_has_steps_false_when_only_baseline() {
let stats = ScenarioStats {
phases: vec![crate::assert::PhaseBucket {
step_index: 0,
label: "BASELINE".to_string(),
..Default::default()
}],
..Default::default()
};
assert!(
!stats.has_steps(),
"BASELINE-only must NOT count as 'has steps'"
);
}
#[test]
fn scenario_stats_has_steps_true_when_any_step_phase_present() {
let stats = ScenarioStats {
phases: vec![
crate::assert::PhaseBucket {
step_index: 0,
label: "BASELINE".to_string(),
..Default::default()
},
crate::assert::PhaseBucket {
step_index: 1,
label: "Step[0]".to_string(),
..Default::default()
},
],
..Default::default()
};
assert!(stats.has_steps());
}
#[test]
#[should_panic(expected = "metric 'missing' absent from phase step_index=1")]
fn phase_bucket_expect_metric_panics_with_diagnostic_when_absent() {
let bucket = crate::assert::PhaseBucket {
step_index: 1,
label: "Step[0]".to_string(),
sample_count: 3,
metrics: std::collections::BTreeMap::from([("throughput".to_string(), 42.0)]),
..Default::default()
};
bucket.expect_metric("missing");
}
#[test]
fn phase_bucket_expect_metric_returns_value_when_present() {
let bucket = crate::assert::PhaseBucket {
metrics: std::collections::BTreeMap::from([("throughput".to_string(), 42.5)]),
..Default::default()
};
assert_eq!(bucket.expect_metric("throughput"), 42.5);
}
#[test]
fn phase_guard_outside_scope_returns_none() {
assert!(crate::assert::current_phase_label().is_none());
let d = crate::assert::AssertDetail::new(crate::assert::DetailKind::Other, "no guard");
assert!(
d.phase.is_none(),
"AssertDetail constructed outside any PhaseGuard must stamp phase=None"
);
}
#[test]
fn phase_guard_install_step_sets_active_label() {
let _g = crate::assert::PhaseGuard::install_step(0);
assert_eq!(
crate::assert::current_phase_label().as_deref(),
Some("Step[0]"),
);
let d = crate::assert::AssertDetail::new(crate::assert::DetailKind::Other, "under Step[0]");
assert_eq!(d.phase.as_deref(), Some("Step[0]"));
}
#[test]
fn phase_guard_install_baseline_sets_active_label() {
let _g = crate::assert::PhaseGuard::install_baseline();
assert_eq!(
crate::assert::current_phase_label().as_deref(),
Some("BASELINE"),
);
}
#[test]
fn phase_guard_drop_restores_prior_label() {
{
let _outer = crate::assert::PhaseGuard::install_step(0); assert_eq!(
crate::assert::current_phase_label().as_deref(),
Some("Step[0]"),
);
{
let _inner = crate::assert::PhaseGuard::install_step(2); assert_eq!(
crate::assert::current_phase_label().as_deref(),
Some("Step[2]"),
);
} assert_eq!(
crate::assert::current_phase_label().as_deref(),
Some("Step[0]"),
"inner guard's Drop must restore the outer guard's label",
);
} assert!(
crate::assert::current_phase_label().is_none(),
"outermost guard's Drop must restore None",
);
}
#[test]
fn phase_guard_passdetail_binary_auto_stamps() {
let _g = crate::assert::PhaseGuard::install_step(1);
let p = crate::assert::PassDetail::binary("metric", "ge", "10.0", "5.0");
assert_eq!(p.phase.as_deref(), Some("Step[1]"));
}
#[test]
fn phase_guard_passdetail_unary_auto_stamps() {
let _g = crate::assert::PhaseGuard::install_step(2);
let p = crate::assert::PassDetail::unary("metric", "is_finite", "42.0");
assert_eq!(p.phase.as_deref(), Some("Step[2]"));
}
#[test]
fn phase_guard_infonote_auto_stamps() {
let _g = crate::assert::PhaseGuard::install_baseline();
let n = crate::assert::InfoNote::new("settle observed");
assert_eq!(n.phase.as_deref(), Some("BASELINE"));
}
#[test]
fn phase_guard_with_phase_builder_overrides_auto_stamp() {
let _g = crate::assert::PhaseGuard::install_step(0); let d = crate::assert::AssertDetail::new(crate::assert::DetailKind::Other, "override")
.with_phase("explicit_override");
assert_eq!(
d.phase.as_deref(),
Some("explicit_override"),
"with_phase builder must override the auto-stamp default",
);
}
#[test]
fn populate_run_ext_metrics_empty_series_inserts_nothing() {
let samples = SampleSeries::from_drained_typed(Vec::new(), None);
let mut target = std::collections::BTreeMap::new();
crate::assert::populate_run_ext_metrics(&samples, &mut target);
assert!(
target.is_empty(),
"no input samples must produce no ext_metrics entries, got {target:?}",
);
}
#[test]
fn populate_run_ext_metrics_does_not_overwrite_existing_keys() {
let samples = SampleSeries::from_drained_typed(Vec::new(), None);
let mut target = std::collections::BTreeMap::new();
target.insert("avg_dsq_depth".to_string(), 42.0);
crate::assert::populate_run_ext_metrics(&samples, &mut target);
assert_eq!(
target.get("avg_dsq_depth").copied(),
Some(42.0),
"existing key must survive populate_run_ext_metrics",
);
}
#[test]
fn build_phase_buckets_avg_imbalance_ratio_from_monitor_samples() {
use crate::monitor::{CpuSnapshot, MonitorReport, MonitorSample};
let cpu = |nr: u32| CpuSnapshot {
nr_running: nr,
..Default::default()
};
let mon = MonitorReport {
samples: vec![
MonitorSample::new(50, vec![cpu(2), cpu(2)]),
MonitorSample::new(100, vec![cpu(4), cpu(2)]),
MonitorSample::new(200, vec![cpu(6), cpu(2)]),
],
..Default::default()
};
let drained = vec![
fixture_entry("periodic_000", 1, 50),
fixture_entry("periodic_001", 1, 250),
];
let samples = SampleSeries::from_drained_typed(drained, Some(mon));
let phases = crate::assert::build_phase_buckets(&samples);
assert_eq!(phases.len(), 1, "single phase from two same-step samples");
let step0 = &phases[0];
let avg = step0
.metrics
.get("avg_imbalance_ratio")
.copied()
.expect("avg_imbalance_ratio must be populated from MonitorSamples");
assert!(
(avg - 2.0).abs() < f64::EPSILON,
"expected mean = 2.0, got {avg}",
);
}
#[test]
fn build_phase_buckets_avg_imbalance_excludes_out_of_window_monitor_samples() {
use crate::monitor::{CpuSnapshot, MonitorReport, MonitorSample};
let cpu = |nr: u32| CpuSnapshot {
nr_running: nr,
..Default::default()
};
let mon = MonitorReport {
samples: vec![
MonitorSample::new(100, vec![cpu(4), cpu(2)]),
MonitorSample::new(150, vec![cpu(4), cpu(2)]),
MonitorSample::new(200, vec![cpu(4), cpu(2)]),
MonitorSample::new(9999, vec![cpu(100), cpu(2)]),
],
..Default::default()
};
let drained = vec![
fixture_entry("periodic_000", 1, 100),
fixture_entry("periodic_001", 1, 200),
];
let samples = SampleSeries::from_drained_typed(drained, Some(mon));
let phases = crate::assert::build_phase_buckets(&samples);
let step0 = &phases[0];
let avg = step0
.metrics
.get("avg_imbalance_ratio")
.copied()
.expect("avg_imbalance_ratio populated");
assert!(
(avg - 2.0).abs() < f64::EPSILON,
"out-of-window sample must not contaminate in-window mean (got {avg})",
);
}
#[test]
fn build_phase_buckets_avg_dsq_depth_from_snapshot_dsq_states() {
use crate::monitor::dump::FailureDumpReport;
use crate::monitor::scx_walker::DsqState;
use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
let mk_entry = |tag: &str, ms: u64| DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
dsq_states: vec![
DsqState {
origin: "local cpu 0".to_string(),
nr: 2,
..Default::default()
},
DsqState {
origin: "local cpu 1".to_string(),
nr: 4,
..Default::default()
},
DsqState {
origin: "local cpu 2".to_string(),
nr: 6,
..Default::default()
},
],
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(ms),
boundary_offset_ms: None,
step_index: Some(1),
};
let drained = vec![mk_entry("periodic_000", 100), mk_entry("periodic_001", 200)];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
let step0 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("Step[0] bucket present");
let avg = step0
.metrics
.get("avg_dsq_depth")
.copied()
.expect("avg_dsq_depth populated from local-cpu DSQ states");
assert!(
(avg - 4.0).abs() < f64::EPSILON,
"expected per-phase avg of mean(2,4,6)=4.0, got {avg}",
);
let max = step0
.metrics
.get("max_dsq_depth")
.copied()
.expect("max_dsq_depth populated alongside avg");
assert!(
(max - 6.0).abs() < f64::EPSILON,
"expected max=6.0, got {max}"
);
}
#[test]
fn build_phase_buckets_with_stimulus_populates_iteration_rate() {
use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
use crate::timeline::StimulusEvent;
let mk_entry = |tag: &str, step: u16, ms: u64| DrainedSnapshotEntry {
tag: tag.to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(ms),
boundary_offset_ms: None,
step_index: Some(step),
};
let drained = vec![
mk_entry("periodic_000", 1, 100),
mk_entry("periodic_001", 1, 1100),
mk_entry("periodic_002", 2, 1100),
mk_entry("periodic_003", 2, 2100),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let stimulus = vec![
StimulusEvent {
elapsed_ms: 100,
label: "Step[0]".to_string(),
op_kind: None,
detail: None,
total_iterations: Some(0),
step_index: None,
is_terminal: false,
is_step_end: false,
},
StimulusEvent {
elapsed_ms: 1100,
label: "Step[1]".to_string(),
op_kind: None,
detail: None,
total_iterations: Some(1000),
step_index: None,
is_terminal: false,
is_step_end: false,
},
StimulusEvent {
elapsed_ms: 2100,
label: "end".to_string(),
op_kind: None,
detail: None,
total_iterations: Some(3000),
step_index: None,
is_terminal: false,
is_step_end: false,
},
];
let phases = crate::assert::build_phase_buckets_with_stimulus(&samples, &stimulus);
let step1 = phases
.iter()
.find(|p| p.step_index == 2)
.expect("Step[1] bucket present");
let rate = step1
.metrics
.get("iteration_rate")
.copied()
.expect("iteration_rate populated for Step[1]");
assert!(
(rate - 2000.0).abs() < f64::EPSILON,
"expected iteration_rate=2000.0 iter/s, got {rate}",
);
}
#[test]
fn build_phase_buckets_with_stimulus_remaps_by_boundary_offset_over_stamped_step() {
use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
use crate::timeline::StimulusEvent;
let mk = |tag: &str, offset_ms: u64| DrainedSnapshotEntry {
tag: tag.to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(9_000),
boundary_offset_ms: Some(offset_ms),
step_index: Some(3),
};
let drained = vec![
mk("periodic_base", 500), mk("periodic_000", 1_500), mk("periodic_001", 2_500), mk("periodic_002", 3_500), ];
let samples = SampleSeries::from_drained_typed(drained, None);
let stim = |elapsed_ms: u64, k: u16| StimulusEvent {
elapsed_ms,
label: format!("StepStart[{k}]"),
op_kind: None,
detail: None,
total_iterations: None,
step_index: Some(k),
is_terminal: false,
is_step_end: false,
};
let stimulus = vec![stim(1000, 1), stim(2000, 2), stim(3000, 3)];
let phases = crate::assert::build_phase_buckets_with_stimulus(&samples, &stimulus);
let idxs: Vec<u16> = phases.iter().map(|p| p.step_index).collect();
assert_eq!(
idxs,
vec![0, 1, 2, 3],
"boundary_offset_ms must drive grouping (BASELINE + one capture \
per step), NOT the uniformly-wrong stamped step_index=3 which \
would collapse all four into a single bucket; got {idxs:?}",
);
for p in &phases {
assert_eq!(
p.sample_count, 1,
"each remapped bucket holds exactly its one scheduled capture; \
step_index={} count={}",
p.step_index, p.sample_count,
);
}
}
#[test]
fn by_stimulus_phase_separates_what_by_stamped_phase_collapses() {
use crate::scenario::sample::SampleSeries;
use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
use crate::timeline::StimulusEvent;
let mk = |tag: &str, offset_ms: u64| DrainedSnapshotEntry {
tag: tag.to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(9_000),
boundary_offset_ms: Some(offset_ms),
step_index: Some(3),
};
let drained = vec![
mk("p0", 500), mk("p1", 1_500), mk("p2", 2_500), mk("p3", 3_500), ];
let samples = SampleSeries::from_drained_typed(drained, None);
let stim = |elapsed_ms: u64, k: u16| StimulusEvent {
elapsed_ms,
label: format!("StepStart[{k}]"),
op_kind: None,
detail: None,
total_iterations: None,
step_index: Some(k),
is_terminal: false,
is_step_end: false,
};
let stimulus = vec![stim(1000, 1), stim(2000, 2), stim(3000, 3)];
let stamped = samples.by_stamped_phase();
assert_eq!(stamped.keys().copied().collect::<Vec<_>>(), vec![3]);
assert_eq!(stamped[&3].len(), 4, "stamped grouping collapses the burst");
let by_stim = samples.by_stimulus_phase(&stimulus);
assert_eq!(
by_stim.keys().copied().collect::<Vec<_>>(),
vec![0, 1, 2, 3]
);
for k in [0u16, 1, 2, 3] {
assert_eq!(
by_stim[&k].len(),
1,
"phase {k} should hold exactly one sample"
);
}
}
#[test]
fn build_phase_buckets_with_stimulus_none_offset_falls_back_to_stamped_step() {
use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
use crate::timeline::StimulusEvent;
let mk = |tag: &str, step: u16| DrainedSnapshotEntry {
tag: tag.to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(100),
boundary_offset_ms: None,
step_index: Some(step),
};
let drained = vec![mk("periodic_000", 1), mk("periodic_001", 2)];
let samples = SampleSeries::from_drained_typed(drained, None);
let stimulus = vec![StimulusEvent {
elapsed_ms: 0,
label: "StepStart[5]".to_string(),
op_kind: None,
detail: None,
total_iterations: None,
step_index: Some(5),
is_terminal: false,
is_step_end: false,
}];
let phases = crate::assert::build_phase_buckets_with_stimulus(&samples, &stimulus);
let idxs: Vec<u16> = phases.iter().map(|p| p.step_index).collect();
assert_eq!(
idxs,
vec![1, 2],
"None-offset captures keep their stamped step_index (1, 2), not \
remapped to the step-5 stimulus; got {idxs:?}",
);
}
#[test]
fn build_phase_buckets_with_stimulus_iteration_rate_attaches_by_step_not_interior_window() {
use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
use crate::timeline::StimulusEvent;
let mk = |tag: &str, offset_ms: u64| DrainedSnapshotEntry {
tag: tag.to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(9_000),
boundary_offset_ms: Some(offset_ms),
step_index: Some(9),
};
let drained = vec![
mk("periodic_000", 1_500), mk("periodic_001", 2_500), ];
let samples = SampleSeries::from_drained_typed(drained, None);
let stim = |elapsed_ms: u64, k: u16, iters: u64| StimulusEvent {
elapsed_ms,
label: format!("StepStart[{k}]"),
op_kind: None,
detail: None,
total_iterations: Some(iters),
step_index: Some(k),
is_terminal: false,
is_step_end: false,
};
let stimulus = vec![stim(1000, 1, 0), stim(2000, 2, 1000), stim(3000, 3, 3000)];
let phases = crate::assert::build_phase_buckets_with_stimulus(&samples, &stimulus);
let step1 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("Step[0] bucket present");
assert_eq!(
step1.metrics.get("iteration_rate").copied(),
Some(1000.0),
"step 1 iteration_rate must attach by step_index to the interior \
bucket; got {:?} (start_ms={}, end_ms={})",
step1.metrics.get("iteration_rate"),
step1.start_ms,
step1.end_ms,
);
let step2 = phases
.iter()
.find(|p| p.step_index == 2)
.expect("Step[1] bucket present");
assert_eq!(
step2.metrics.get("iteration_rate").copied(),
Some(2000.0),
"step 2 iteration_rate must attach by step_index; got {:?}",
step2.metrics.get("iteration_rate"),
);
}
#[test]
fn build_phase_buckets_with_stimulus_pairs_step_local_for_respawned_workers() {
use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
use crate::timeline::StimulusEvent;
let mk = |tag: &str, offset_ms: u64| DrainedSnapshotEntry {
tag: tag.to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(9_000),
boundary_offset_ms: Some(offset_ms),
step_index: Some(9),
};
let drained = vec![
mk("periodic_000", 1_500), mk("periodic_001", 2_500), ];
let samples = SampleSeries::from_drained_typed(drained, None);
let start = |elapsed_ms: u64, k: u16, iters: u64| StimulusEvent {
elapsed_ms,
label: format!("StepStart[{k}]"),
op_kind: None,
detail: None,
total_iterations: Some(iters),
step_index: Some(k),
is_terminal: false,
is_step_end: false,
};
let end = |elapsed_ms: u64, k: u16, iters: u64| StimulusEvent {
elapsed_ms,
label: format!("StepEnd[{k}]"),
op_kind: None,
detail: None,
total_iterations: Some(iters),
step_index: Some(k),
is_terminal: false,
is_step_end: true,
};
let stimulus = vec![
start(1_000, 1, 0),
end(2_000, 1, 10_000),
start(2_100, 2, 0),
end(3_100, 2, 5_000),
StimulusEvent::terminal(3_200, 5_000),
];
let phases = crate::assert::build_phase_buckets_with_stimulus(&samples, &stimulus);
let step1 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("step 1 bucket present");
let step2 = phases
.iter()
.find(|p| p.step_index == 2)
.expect("step 2 bucket present");
assert_eq!(
step1.metrics.get("iteration_rate").copied(),
Some(10_000.0),
"step 1 must report its step-local StepStart->StepEnd rate, not the \
cross-step 0->0 delta (the old cross-step silent None); got {:?}",
step1.metrics.get("iteration_rate"),
);
assert_eq!(
step2.metrics.get("iteration_rate").copied(),
Some(5_000.0),
"step 2 (respawned workers) must report its step-local rate; got {:?}",
step2.metrics.get("iteration_rate"),
);
}
#[test]
fn build_phase_buckets_with_stimulus_step_local_wins_over_persistent_cross_step() {
use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
use crate::timeline::StimulusEvent;
let mk = |tag: &str, offset_ms: u64| DrainedSnapshotEntry {
tag: tag.to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(9_000),
boundary_offset_ms: Some(offset_ms),
step_index: Some(9),
};
let drained = vec![mk("periodic_000", 1_500), mk("periodic_001", 2_500)];
let samples = SampleSeries::from_drained_typed(drained, None);
let start = |elapsed_ms: u64, k: u16, iters: u64| StimulusEvent {
elapsed_ms,
label: format!("StepStart[{k}]"),
op_kind: None,
detail: None,
total_iterations: Some(iters),
step_index: Some(k),
is_terminal: false,
is_step_end: false,
};
let end = |elapsed_ms: u64, k: u16, iters: u64| StimulusEvent {
elapsed_ms,
label: format!("StepEnd[{k}]"),
op_kind: None,
detail: None,
total_iterations: Some(iters),
step_index: Some(k),
is_terminal: false,
is_step_end: true,
};
let stimulus = vec![
start(1_000, 1, 0),
end(2_000, 1, 10_000),
start(2_100, 2, 10_500),
end(3_100, 2, 15_500),
StimulusEvent::terminal(3_200, 15_500),
];
let phases = crate::assert::build_phase_buckets_with_stimulus(&samples, &stimulus);
let step1 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("step 1 bucket present");
let step2 = phases
.iter()
.find(|p| p.step_index == 2)
.expect("step 2 bucket present");
assert_eq!(
step1.metrics.get("iteration_rate").copied(),
Some(10_000.0),
"step-local rate must win over the 5000/s persistent cross-step \
delta (the is_step_end guard skips the cross-step pair); got {:?}",
step1.metrics.get("iteration_rate"),
);
assert_eq!(
step2.metrics.get("iteration_rate").copied(),
Some(5_000.0),
"step 2 must report its own start-to-end-of-hold rate; got {:?}",
step2.metrics.get("iteration_rate"),
);
}
#[test]
fn build_phase_buckets_with_stimulus_stalled_step_reports_measured_zero() {
use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
use crate::timeline::StimulusEvent;
let mk = |tag: &str, offset_ms: u64| DrainedSnapshotEntry {
tag: tag.to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(9_000),
boundary_offset_ms: Some(offset_ms),
step_index: Some(9),
};
let drained = vec![mk("periodic_000", 1_500), mk("periodic_001", 2_500)];
let samples = SampleSeries::from_drained_typed(drained, None);
let start = |elapsed_ms: u64, k: u16, iters: u64| StimulusEvent {
elapsed_ms,
label: format!("StepStart[{k}]"),
op_kind: None,
detail: None,
total_iterations: Some(iters),
step_index: Some(k),
is_terminal: false,
is_step_end: false,
};
let end = |elapsed_ms: u64, k: u16, iters: u64| StimulusEvent {
elapsed_ms,
label: format!("StepEnd[{k}]"),
op_kind: None,
detail: None,
total_iterations: Some(iters),
step_index: Some(k),
is_terminal: false,
is_step_end: true,
};
let stimulus = vec![
start(1_000, 1, 0),
end(2_000, 1, 0),
start(2_100, 2, 500),
end(3_100, 2, 1_500),
StimulusEvent::terminal(3_200, 1_500),
];
let phases = crate::assert::build_phase_buckets_with_stimulus(&samples, &stimulus);
let step1 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("step 1 bucket present");
let step2 = phases
.iter()
.find(|p| p.step_index == 2)
.expect("step 2 bucket present");
assert_eq!(
step1.metrics.get("iteration_rate").copied(),
Some(0.0),
"a stalled step reports measured-zero throughput, not the leaked \
5000/s teardown gap rate from the StepEnd[1] -> StepStart[2] pair; \
got {:?}",
step1.metrics.get("iteration_rate"),
);
assert_eq!(
step2.metrics.get("iteration_rate").copied(),
Some(1_000.0),
"step 2 still reports its own step-local rate; got {:?}",
step2.metrics.get("iteration_rate"),
);
}
#[test]
fn build_phase_buckets_with_stimulus_terminal_gives_last_step_rate_no_phantom_bucket() {
use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
use crate::timeline::StimulusEvent;
let mk = |tag: &str, offset_ms: u64| DrainedSnapshotEntry {
tag: tag.to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(9_000),
boundary_offset_ms: Some(offset_ms),
step_index: Some(9),
};
let drained = vec![mk("periodic_000", 1_500), mk("periodic_001", 2_500)];
let samples = SampleSeries::from_drained_typed(drained, None);
let stim = |elapsed_ms: u64, k: u16, iters: u64| StimulusEvent {
elapsed_ms,
label: format!("StepStart[{k}]"),
op_kind: None,
detail: None,
total_iterations: Some(iters),
step_index: Some(k),
is_terminal: false,
is_step_end: false,
};
let stimulus = vec![
stim(1000, 1, 0),
stim(2000, 2, 1000),
StimulusEvent::terminal(3000, 3000),
];
let phases = crate::assert::build_phase_buckets_with_stimulus(&samples, &stimulus);
let idxs: Vec<u16> = phases.iter().map(|p| p.step_index).collect();
assert_eq!(
idxs,
vec![1, 2],
"terminal must not add a phantom bucket; got {idxs:?}",
);
let step2 = phases
.iter()
.find(|p| p.step_index == 2)
.expect("step 2 bucket present");
assert_eq!(
step2.metrics.get("iteration_rate").copied(),
Some(2000.0),
"last step's iteration_rate must come from the terminal boundary",
);
}
#[test]
fn build_phase_buckets_with_stimulus_first_step_zero_baseline_from_wire() {
use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
use crate::timeline::StimulusEvent;
let mk = |tag: &str, offset_ms: u64| DrainedSnapshotEntry {
tag: tag.to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(9_000),
boundary_offset_ms: Some(offset_ms),
step_index: Some(9),
};
let drained = vec![mk("periodic_000", 1_500), mk("periodic_001", 2_500)];
let samples = SampleSeries::from_drained_typed(drained, None);
let wire = |elapsed_ms: u32, step_index: u16, iters: u64| crate::vmm::wire::StimulusEvent {
elapsed_ms,
step_index,
op_count: 0,
op_kinds: 0,
cgroup_count: 0,
worker_count: 1,
total_iterations: iters,
};
let stimulus: Vec<StimulusEvent> = [wire(1000, 1, 0), wire(2000, 2, 2000)]
.iter()
.map(StimulusEvent::from_wire)
.collect();
let phases = crate::assert::build_phase_buckets_with_stimulus(&samples, &stimulus);
let step1 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("step 1 bucket present");
assert_eq!(
step1.metrics.get("iteration_rate").copied(),
Some(2000.0),
"first step's iteration_rate must compute from the 0 baseline",
);
}
#[test]
fn build_phase_buckets_with_stimulus_loop_step_rate_no_prior_graft() {
use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
use crate::timeline::StimulusEvent;
let mk = |tag: &str, offset_ms: u64| DrainedSnapshotEntry {
tag: tag.to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(9_000),
boundary_offset_ms: Some(offset_ms),
step_index: Some(9),
};
let drained = vec![mk("periodic_000", 1_500), mk("periodic_001", 2_500)];
let samples = SampleSeries::from_drained_typed(drained, None);
let stim = |elapsed_ms: u64, k: u16, iters: u64| StimulusEvent {
elapsed_ms,
label: format!("StepStart[{k}]"),
op_kind: None,
detail: None,
total_iterations: Some(iters),
step_index: Some(k),
is_terminal: false,
is_step_end: false,
};
let stimulus = vec![
stim(1000, 1, 0),
stim(2000, 2, 1000),
StimulusEvent::terminal(3000, 3000),
];
let phases = crate::assert::build_phase_buckets_with_stimulus(&samples, &stimulus);
let step1 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("step 1 bucket");
let loop_step = phases
.iter()
.find(|p| p.step_index == 2)
.expect("loop step bucket");
assert_eq!(
step1.metrics.get("iteration_rate").copied(),
Some(1000.0),
"prior step must not absorb the loop step's window",
);
assert_eq!(
loop_step.metrics.get("iteration_rate").copied(),
Some(2000.0),
"loop step must get its own throughput from its start frame + terminal",
);
}
#[test]
fn populate_run_ext_metrics_populated_series_inserts_expected_keys() {
use crate::monitor::dump::FailureDumpReport;
use crate::monitor::scx_walker::DsqState;
use crate::scenario::snapshot::{DrainedSnapshotEntry, MissingStatsReason};
let mk_entry = |tag: &str, ms: u64| DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
dsq_states: vec![DsqState {
origin: "local cpu 0".to_string(),
nr: 5,
..Default::default()
}],
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(ms),
boundary_offset_ms: None,
step_index: Some(0),
};
let drained = vec![mk_entry("periodic_000", 100), mk_entry("periodic_001", 200)];
let samples = SampleSeries::from_drained_typed(drained, None);
let mut target = std::collections::BTreeMap::new();
crate::assert::populate_run_ext_metrics(&samples, &mut target);
let avg = target
.get("avg_dsq_depth")
.copied()
.expect("avg_dsq_depth populated for populated series");
assert!(
(avg - 5.0).abs() < f64::EPSILON,
"expected avg_dsq_depth=5.0, got {avg}",
);
assert!(
!target.contains_key("max_dsq_depth"),
"max_dsq_depth has a typed GauntletRow field; must not leak into ext_metrics",
);
}
#[test]
fn populate_run_ext_metrics_from_phases_folds_per_phase_keys() {
use crate::assert::PhaseBucket;
use std::collections::BTreeMap;
let mut m0 = BTreeMap::new();
m0.insert("avg_imbalance_ratio".to_string(), 2.0);
let mut m1 = BTreeMap::new();
m1.insert("avg_imbalance_ratio".to_string(), 4.0);
let phases = vec![
PhaseBucket {
step_index: 1,
label: "Step[0]".to_string(),
start_ms: 0,
end_ms: 100,
sample_count: 5,
metrics: m0,
},
PhaseBucket {
step_index: 2,
label: "Step[1]".to_string(),
start_ms: 100,
end_ms: 200,
sample_count: 15,
metrics: m1,
},
];
let mut target = BTreeMap::new();
crate::assert::populate_run_ext_metrics_from_phases(&phases, &mut target);
let avg = target
.get("avg_imbalance_ratio")
.copied()
.expect("avg_imbalance_ratio folded from per-phase");
assert!(
(avg - 3.5).abs() < f64::EPSILON,
"expected weighted mean 3.5, got {avg}",
);
}