use super::*;
#[test]
fn build_phase_buckets_empty_series_yields_empty_phases() {
let samples = SampleSeries::from_drained_typed(Vec::new(), None);
let phases = crate::assert::build_phase_buckets(&samples);
assert!(
phases.is_empty(),
"empty input must yield empty phases, got {phases:?}"
);
}
#[test]
fn build_phase_buckets_baseline_only_yields_single_bucket() {
let drained = vec![
fixture_entry("periodic_000", 0, 100),
fixture_entry("periodic_001", 0, 200),
fixture_entry("periodic_002", 0, 300),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
assert_eq!(phases.len(), 1, "single phase observed -> single bucket");
let bucket = &phases[0];
assert_eq!(bucket.step_index, 0);
assert_eq!(bucket.label, "BASELINE");
assert_eq!(bucket.sample_count, 3);
assert_eq!(bucket.start_ms, 100);
assert_eq!(bucket.end_ms, 300);
assert!(
bucket.metrics.is_empty(),
"synthetic fixture report carries no BPF state -> metrics empty"
);
}
#[test]
fn build_phase_buckets_three_phases_round_trip_with_correct_labels() {
let drained = vec![
fixture_entry("periodic_000", 0, 10), fixture_entry("periodic_001", 0, 20), fixture_entry("periodic_002", 1, 100), fixture_entry("periodic_003", 1, 200), fixture_entry("periodic_004", 1, 300), fixture_entry("periodic_005", 2, 400), ];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
assert_eq!(phases.len(), 3);
assert_eq!(phases[0].step_index, 0);
assert_eq!(phases[0].label, "BASELINE");
assert_eq!(phases[0].sample_count, 2);
assert_eq!(phases[0].start_ms, 10);
assert_eq!(phases[0].end_ms, 20);
assert_eq!(phases[1].step_index, 1);
assert_eq!(phases[1].label, "Step[0]");
assert_eq!(phases[1].sample_count, 3);
assert_eq!(phases[1].start_ms, 100);
assert_eq!(phases[1].end_ms, 300);
assert_eq!(phases[2].step_index, 2);
assert_eq!(phases[2].label, "Step[1]");
assert_eq!(phases[2].sample_count, 1);
assert_eq!(phases[2].start_ms, 400);
assert_eq!(phases[2].end_ms, 400);
}
#[test]
fn build_phase_buckets_unstamped_samples_cluster_under_baseline() {
let unstamped = DrainedSnapshotEntry {
tag: "periodic_000".to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(50),
boundary_offset_ms: None,
step_index: None,
};
let samples = SampleSeries::from_drained_typed(vec![unstamped], None);
let phases = crate::assert::build_phase_buckets(&samples);
assert_eq!(phases.len(), 1);
assert_eq!(phases[0].step_index, 0);
assert_eq!(phases[0].label, "BASELINE");
assert_eq!(phases[0].sample_count, 1);
}
#[test]
fn build_phase_buckets_skipped_steps_yield_sparse_output() {
let drained = vec![
fixture_entry("periodic_000", 0, 10),
fixture_entry("periodic_001", 3, 500), ];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
assert_eq!(phases.len(), 2);
assert_eq!(phases[0].step_index, 0);
assert_eq!(phases[1].step_index, 3);
assert_eq!(phases[1].label, "Step[2]");
}
#[test]
fn build_phase_buckets_extracts_wired_metric_arms_end_to_end() {
use crate::monitor::dump::{EventCounterSample, FailureDumpReport, SCHEMA_SINGLE};
use crate::monitor::scx_walker::DsqState;
fn report_with(dsq_depths: &[u32], fallback: i64, keep_last: i64) -> FailureDumpReport {
let dsq_states = dsq_depths
.iter()
.enumerate()
.map(|(cpu, &nr)| DsqState {
id: 0,
origin: format!("local cpu {cpu}"),
nr,
seq: 0,
task_kvas: Vec::new(),
truncated: false,
})
.collect();
let event_counter_timeline = vec![EventCounterSample {
elapsed_ms: 0,
select_cpu_fallback: fallback,
dispatch_local_dsq_offline: 0,
dispatch_keep_last: keep_last,
enq_skip_exiting: 0,
enq_skip_migration_disabled: 0,
reenq_immed: 0,
reenq_local_repeat: 0,
refill_slice_dfl: 0,
bypass_duration: 0,
bypass_dispatch: 0,
bypass_activate: 0,
insert_not_owned: 0,
sub_bypass_dispatch: 0,
}];
FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
dsq_states,
event_counter_timeline,
..Default::default()
}
}
fn entry_with(
tag: &str,
step_index: u16,
elapsed_ms: u64,
dsq_depths: &[u32],
fallback: i64,
keep_last: i64,
) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: report_with(dsq_depths, fallback, keep_last),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(elapsed_ms),
boundary_offset_ms: None,
step_index: Some(step_index),
}
}
let drained = vec![
entry_with("periodic_000", 0, 10, &[5, 3], 10, 20),
entry_with("periodic_001", 0, 20, &[4, 8], 15, 30),
entry_with("periodic_002", 1, 100, &[12, 7], 18, 35),
entry_with("periodic_003", 1, 200, &[9, 11], 25, 42),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
assert_eq!(phases.len(), 2, "BASELINE + Step[0] -> 2 buckets");
let baseline = &phases[0];
assert_eq!(baseline.step_index, 0);
assert_eq!(
baseline.metrics.get("max_dsq_depth").copied(),
Some(8.0),
"BASELINE max_dsq_depth: Peak reduction over per-sample [5, 8] yields max 8"
);
assert_eq!(
baseline.metrics.get("total_fallback").copied(),
Some(5.0),
"BASELINE total_fallback: Counter delta 15 - 10 = 5"
);
assert_eq!(
baseline.metrics.get("total_keep_last").copied(),
Some(10.0),
"BASELINE total_keep_last: Counter delta 30 - 20 = 10"
);
let step0 = &phases[1];
assert_eq!(step0.step_index, 1);
assert_eq!(step0.label, "Step[0]");
assert_eq!(
step0.metrics.get("max_dsq_depth").copied(),
Some(12.0),
"Step[0] max_dsq_depth: Peak max of [12, 11] = 12"
);
assert_eq!(
step0.metrics.get("total_fallback").copied(),
Some(7.0),
"Step[0] total_fallback: Counter delta 25 - 18 = 7"
);
assert_eq!(
step0.metrics.get("total_keep_last").copied(),
Some(7.0),
"Step[0] total_keep_last: Counter delta 42 - 35 = 7"
);
for host_only in [
"worst_spread",
"worst_gap_ms",
"worst_migration_ratio",
"max_imbalance_ratio",
"worst_p99_wake_latency_us",
"worst_iterations_per_worker",
"worst_page_locality",
] {
assert!(
!baseline.metrics.contains_key(host_only),
"BASELINE must not carry host-only metric {host_only}"
);
assert!(
!step0.metrics.contains_key(host_only),
"Step[0] must not carry host-only metric {host_only}"
);
}
}
#[test]
fn periodic_only_excludes_off_cadence_captures_from_phase_buckets() {
use crate::monitor::dump::{FailureDumpReport, SCHEMA_SINGLE};
use crate::monitor::scx_walker::DsqState;
fn entry(tag: &str, elapsed_ms: u64, dsq_depth: u32) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
dsq_states: vec![DsqState {
id: 0,
origin: "local cpu 0".to_string(),
nr: dsq_depth,
seq: 0,
task_kvas: Vec::new(),
truncated: false,
}],
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(elapsed_ms),
boundary_offset_ms: None,
step_index: Some(1),
}
}
let drained = vec![
entry("periodic_000", 100, 5),
entry("periodic_001", 200, 8),
entry("ondemand_000", 300, 1000),
];
let series = SampleSeries::from_drained_typed(drained, None);
let full = crate::assert::build_phase_buckets(&series);
assert_eq!(
full.iter()
.find(|p| p.step_index == 1)
.expect("Step[0]")
.metrics
.get("max_dsq_depth")
.copied(),
Some(1000.0),
"full series includes the off-cadence capture's dsq depth",
);
let periodic = crate::assert::build_phase_buckets(&series.clone().periodic_only());
assert_eq!(
periodic
.iter()
.find(|p| p.step_index == 1)
.expect("Step[0]")
.metrics
.get("max_dsq_depth")
.copied(),
Some(8.0),
"periodic_only excludes the off-cadence outlier: max is 8, NOT 1000",
);
}
#[test]
fn build_phase_buckets_injects_per_group_cpu_time_delta() {
use crate::monitor::task_enrichment::TaskEnrichment;
fn task(tgid: i32, utime: u64, stime: u64) -> TaskEnrichment {
TaskEnrichment {
pid: tgid,
tgid,
utime,
stime,
signal_utime: Some(0),
signal_stime: Some(0),
..Default::default()
}
}
fn entry(
tag: &str,
step_index: u16,
elapsed_ms: u64,
tasks: Vec<TaskEnrichment>,
) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
task_enrichments: tasks,
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(elapsed_ms),
boundary_offset_ms: None,
step_index: Some(step_index),
}
}
let drained = vec![
entry("periodic_000", 0, 10, vec![task(100, 2000, 1000)]),
entry("periodic_001", 1, 100, vec![task(100, 2000, 1000)]),
entry(
"periodic_002",
1,
200,
vec![task(100, 9000, 4000), task(200, 2_000_000, 1_000_000)],
),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
assert_eq!(phases.len(), 2, "BASELINE + Step[0]");
let baseline = &phases[0];
assert_eq!(baseline.step_index, 0);
assert!(
!baseline.metrics.contains_key("system_time_ns")
&& !baseline.metrics.contains_key("user_time_ns"),
"single enriched sample -> CPU-time key omitted (absent != real 0)",
);
let step0 = &phases[1];
assert_eq!(step0.step_index, 1);
assert_eq!(
step0.metrics.get("system_time_ns").copied(),
Some(3000.0),
"system delta = tgid100 (4000-1000) + tgid200 (single-appearance \
-> 0); NOT 1_003_000 (sum-then-delta inflation)",
);
assert_eq!(
step0.metrics.get("user_time_ns").copied(),
Some(7000.0),
"user delta = tgid100 (9000-2000); tgid200 single-appearance -> 0",
);
}
#[test]
fn build_phase_buckets_cpu_time_includes_signal_accumulator() {
use crate::monitor::task_enrichment::TaskEnrichment;
fn entry(
tag: &str,
elapsed_ms: u64,
stime: u64,
signal_stime: Option<u64>,
) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
task_enrichments: vec![TaskEnrichment {
pid: 100,
tgid: 100,
stime,
signal_stime,
..Default::default()
}],
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(elapsed_ms),
boundary_offset_ms: None,
step_index: Some(1),
}
}
let drained = vec![
entry("periodic_000", 100, 1000, Some(0)),
entry("periodic_001", 200, 2000, Some(3000)),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
let step0 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("Step[0] bucket present");
assert_eq!(
step0.metrics.get("system_time_ns").copied(),
Some(4000.0),
"signal accumulator (3000) folds into the group total: \
(2000+3000) - (1000+0) = 4000",
);
}
#[test]
fn build_phase_buckets_cpu_time_excludes_group_with_unreadable_signal() {
use crate::monitor::task_enrichment::TaskEnrichment;
fn t(tgid: i32, stime: u64, signal_stime: Option<u64>) -> TaskEnrichment {
TaskEnrichment {
pid: tgid,
tgid,
stime,
signal_stime,
..Default::default()
}
}
fn entry(tag: &str, elapsed_ms: u64, tasks: Vec<TaskEnrichment>) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
task_enrichments: tasks,
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(elapsed_ms),
boundary_offset_ms: None,
step_index: Some(1),
}
}
let drained = vec![
entry(
"periodic_000",
100,
vec![t(100, 1000, None), t(200, 1000, Some(0))],
),
entry(
"periodic_001",
200,
vec![t(100, 1000, Some(5_000_000)), t(200, 3000, Some(0))],
),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
let step0 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("Step[0] bucket present");
assert_eq!(
step0.metrics.get("system_time_ns").copied(),
Some(2000.0),
"only tgid200 (readable at both) contributes (3000-1000=2000); \
tgid100's unreadable-signal first endpoint excludes it — the 5e6 \
accumulator must NOT leak as a phantom positive",
);
}
#[test]
fn build_phase_buckets_cpu_time_clamps_counter_decrease() {
use crate::monitor::task_enrichment::TaskEnrichment;
fn entry(tag: &str, elapsed_ms: u64, stime: u64) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
task_enrichments: vec![TaskEnrichment {
pid: 100,
tgid: 100,
stime,
signal_stime: Some(0),
..Default::default()
}],
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(elapsed_ms),
boundary_offset_ms: None,
step_index: Some(1),
}
}
let drained = vec![
entry("periodic_000", 100, 5000),
entry("periodic_001", 200, 1000),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
let step0 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("Step[0] bucket present");
assert_eq!(
step0.metrics.get("system_time_ns").copied(),
Some(0.0),
"a last < first read clamps to 0 (qualifying group, counters did \
not advance) — a real Some(0.0), never a wrapped huge value",
);
}
#[test]
fn build_phase_buckets_cpu_time_disjoint_groups_yield_absent_not_zero() {
use crate::monitor::task_enrichment::TaskEnrichment;
fn entry(tag: &str, elapsed_ms: u64, tgid: i32) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
task_enrichments: vec![TaskEnrichment {
pid: tgid,
tgid,
stime: 1000,
signal_stime: Some(0),
..Default::default()
}],
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: Some(elapsed_ms),
boundary_offset_ms: None,
step_index: Some(1),
}
}
let drained = vec![
entry("periodic_000", 100, 100),
entry("periodic_001", 200, 200),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
let step0 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("Step[0] bucket present");
assert!(
!step0.metrics.contains_key("system_time_ns"),
"disjoint groups -> no group spans two readable samples -> absent, \
not a sentinel Some(0.0)",
);
}
#[test]
fn build_phase_buckets_cpu_time_unanchored_sample_sorts_last_not_first() {
use crate::monitor::task_enrichment::TaskEnrichment;
fn task(stime: u64) -> TaskEnrichment {
TaskEnrichment {
pid: 100,
tgid: 100,
stime,
signal_stime: Some(0),
..Default::default()
}
}
fn entry(tag: &str, elapsed_ms: Option<u64>, stime: u64) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
task_enrichments: vec![task(stime)],
..Default::default()
},
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms,
boundary_offset_ms: None,
step_index: Some(1),
}
}
let drained = vec![
entry("on_demand_000", None, 3000),
entry("periodic_001", Some(100), 1000),
entry("periodic_002", Some(200), 3000),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
let step0 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("Step[0] bucket present");
assert_eq!(
step0.metrics.get("system_time_ns").copied(),
Some(2000.0),
"unanchored sample sorts LAST -> first_seen from the earliest \
timed sample (1000), delta = 2000; if it sorted first the delta \
would clamp to 0",
);
}
#[test]
fn build_phase_buckets_all_unanchored_phase_yields_inverted_window() {
fn unanchored(tag: &str) -> DrainedSnapshotEntry {
DrainedSnapshotEntry {
tag: tag.to_string(),
report: fixture_report(),
stats: Err(MissingStatsReason::NoSchedulerBinary),
elapsed_ms: None,
boundary_offset_ms: None,
step_index: Some(1),
}
}
let drained = vec![unanchored("on_demand_000"), unanchored("on_demand_001")];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
let step0 = phases
.iter()
.find(|p| p.step_index == 1)
.expect("phase bucket present even when every sample is unanchored");
assert_eq!(step0.sample_count, 2, "both unanchored samples counted");
assert_eq!(
(step0.start_ms, step0.end_ms),
(u64::MAX, 0),
"all-unanchored phase -> inverted window that folds nothing, \
not a (0, x) window anchored at the run start",
);
}
#[test]
fn build_phase_buckets_integration_with_scenario_stats_phase_accessor() {
let drained = vec![
fixture_entry("periodic_000", 0, 10),
fixture_entry("periodic_001", 1, 100),
fixture_entry("periodic_002", 2, 200),
];
let samples = SampleSeries::from_drained_typed(drained, None);
let phases = crate::assert::build_phase_buckets(&samples);
let stats = ScenarioStats {
phases,
..Default::default()
};
assert_eq!(stats.phase(0).map(|p| p.label.as_str()), Some("BASELINE"));
assert_eq!(stats.phase(1).map(|p| p.label.as_str()), Some("Step[0]"));
assert_eq!(stats.phase(2).map(|p| p.label.as_str()), Some("Step[1]"));
assert_eq!(stats.phase(3), None);
assert_eq!(stats.step(0).map(|p| p.label.as_str()), Some("Step[0]"));
assert_eq!(stats.step(1).map(|p| p.label.as_str()), Some("Step[1]"));
}
#[test]
fn phase_baseline_const_is_u16_zero() {
assert_eq!(crate::assert::Phase::BASELINE.as_u16(), 0);
assert!(crate::assert::Phase::BASELINE.is_baseline());
}
#[test]
fn phase_step_zero_indexed_constructor_encodes_1_indexed() {
assert_eq!(crate::assert::Phase::step(0).as_u16(), 1);
assert_eq!(crate::assert::Phase::step(1).as_u16(), 2);
assert_eq!(crate::assert::Phase::step(7).as_u16(), 8);
assert!(!crate::assert::Phase::step(0).is_baseline());
}
#[test]
fn phase_step_saturating_at_u16_max_does_not_overflow() {
let saturated = crate::assert::Phase::step(u16::MAX - 1);
assert_eq!(saturated.as_u16(), u16::MAX);
let still_saturated = crate::assert::Phase::step(u16::MAX);
assert_eq!(still_saturated.as_u16(), u16::MAX);
assert!(
!saturated.is_baseline(),
"saturating MUST NOT collide with BASELINE"
);
}
#[test]
fn phase_display_baseline_step_format() {
assert_eq!(format!("{}", crate::assert::Phase::BASELINE), "BASELINE");
assert_eq!(format!("{}", crate::assert::Phase::step(0)), "Step[0]");
assert_eq!(format!("{}", crate::assert::Phase::step(7)), "Step[7]");
}
#[test]
fn phase_serde_transparent_round_trips_as_bare_u16() {
let phase = crate::assert::Phase::step(4);
let json = serde_json::to_string(&phase).unwrap();
assert_eq!(
json, "5",
"wire format must be the inner 1-indexed u16, not a tagged struct"
);
let round_tripped: crate::assert::Phase = serde_json::from_str(&json).unwrap();
assert_eq!(round_tripped, phase);
let from_raw: crate::assert::Phase = serde_json::from_str("0").unwrap();
assert_eq!(from_raw, crate::assert::Phase::BASELINE);
}
#[test]
fn phase_from_u16_wraps_raw_value() {
let from: crate::assert::Phase = 3u16.into();
assert_eq!(from.as_u16(), 3);
let to: u16 = crate::assert::Phase::step(2).into();
assert_eq!(to, 3);
}
#[test]
fn scenario_stats_has_steps_false_for_empty_phases() {
let stats = ScenarioStats::default();
assert!(!stats.has_steps());
}
#[test]
fn scenario_stats_has_steps_false_when_only_baseline() {
let stats = ScenarioStats {
phases: vec![crate::assert::PhaseBucket {
step_index: 0,
label: "BASELINE".to_string(),
..Default::default()
}],
..Default::default()
};
assert!(
!stats.has_steps(),
"BASELINE-only must NOT count as 'has steps'"
);
}
#[test]
fn scenario_stats_has_steps_true_when_any_step_phase_present() {
let stats = ScenarioStats {
phases: vec![
crate::assert::PhaseBucket {
step_index: 0,
label: "BASELINE".to_string(),
..Default::default()
},
crate::assert::PhaseBucket {
step_index: 1,
label: "Step[0]".to_string(),
..Default::default()
},
],
..Default::default()
};
assert!(stats.has_steps());
}
#[test]
#[should_panic(expected = "metric 'missing' absent from phase step_index=1")]
fn phase_bucket_expect_metric_panics_with_diagnostic_when_absent() {
let bucket = crate::assert::PhaseBucket {
step_index: 1,
label: "Step[0]".to_string(),
sample_count: 3,
metrics: std::collections::BTreeMap::from([("throughput".to_string(), 42.0)]),
..Default::default()
};
bucket.expect_metric("missing");
}
#[test]
fn phase_bucket_expect_metric_returns_value_when_present() {
let bucket = crate::assert::PhaseBucket {
metrics: std::collections::BTreeMap::from([("throughput".to_string(), 42.5)]),
..Default::default()
};
assert_eq!(bucket.expect_metric("throughput"), 42.5);
}