use std::fmt;
use crate::monitor::{MonitorSample, sample_looks_valid};
#[derive(Debug, Clone, Default)]
pub struct TimelineContext {
pub kernel: Option<String>,
pub topology: Option<String>,
pub scheduler: Option<String>,
pub scenario: Option<String>,
pub duration_s: Option<f64>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StimulusEvent {
pub elapsed_ms: u64,
pub label: String,
pub op_kind: Option<String>,
pub detail: Option<String>,
pub total_iterations: Option<u64>,
}
#[derive(Debug, Clone, Default)]
pub struct PhaseMetrics {
pub sample_count: usize,
pub avg_imbalance: f64,
pub max_imbalance: f64,
pub avg_dsq_depth: f64,
pub max_dsq_depth: u32,
pub stall_count: usize,
pub fallback_rate: Option<f64>,
pub keep_last_rate: Option<f64>,
pub iteration_rate: Option<f64>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeDirection {
Improved,
Degraded,
}
impl fmt::Display for ChangeDirection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ChangeDirection::Improved => write!(f, "IMPROVEMENT"),
ChangeDirection::Degraded => write!(f, "DEGRADATION"),
}
}
}
#[derive(Debug, Clone)]
pub struct PhaseChange {
pub direction: ChangeDirection,
pub metric: String,
pub before: f64,
pub after: f64,
}
#[derive(Debug, Clone)]
pub struct Phase {
pub index: usize,
pub start_ms: u64,
pub end_ms: u64,
pub stimulus: Option<StimulusEvent>,
pub metrics: PhaseMetrics,
pub changes: Vec<PhaseChange>,
}
#[derive(Debug, Clone)]
pub struct Timeline {
pub phases: Vec<Phase>,
}
const IMBALANCE_THRESHOLD: f64 = 0.5;
const DSQ_THRESHOLD: f64 = 3.0;
const FALLBACK_RATE_THRESHOLD: f64 = 10.0;
const KEEP_LAST_RATE_THRESHOLD: f64 = 10.0;
const ITERATION_RATE_REL_THRESHOLD: f64 = 0.3;
fn detect_change(
before: f64,
after: f64,
threshold: f64,
metric: &str,
higher_is_worse: bool,
) -> Option<PhaseChange> {
let delta = after - before;
if delta.abs() <= threshold {
return None;
}
let degraded = if higher_is_worse {
delta > 0.0
} else {
delta < 0.0
};
Some(PhaseChange {
direction: if degraded {
ChangeDirection::Degraded
} else {
ChangeDirection::Improved
},
metric: metric.to_string(),
before,
after,
})
}
impl Timeline {
pub fn build(stimulus_events: &[StimulusEvent], monitor_samples: &[MonitorSample]) -> Self {
if stimulus_events.is_empty() || monitor_samples.is_empty() {
return Self { phases: Vec::new() };
}
let mut events = stimulus_events.to_vec();
events.sort_by_key(|e| e.elapsed_ms);
let first_stimulus_ms = events[0].elapsed_ms;
let first_monitor_ms = monitor_samples
.iter()
.find(|s| s.elapsed_ms > 500 && !s.cpus.is_empty())
.map(|s| s.elapsed_ms)
.unwrap_or_else(|| monitor_samples.first().map(|s| s.elapsed_ms).unwrap_or(0));
let offset = first_monitor_ms as i64 - first_stimulus_ms as i64;
let last_monitor_ms = monitor_samples.last().map(|s| s.elapsed_ms).unwrap_or(0);
let mut boundaries: Vec<(u64, u64, Option<StimulusEvent>)> = Vec::new();
for i in 0..events.len() {
let start = (events[i].elapsed_ms as i64 + offset).max(0) as u64;
let end = if i + 1 < events.len() {
(events[i + 1].elapsed_ms as i64 + offset).max(0) as u64
} else {
last_monitor_ms.saturating_add(1)
};
let stimulus = if i == 0 {
None
} else {
Some(events[i].clone())
};
boundaries.push((start, end, stimulus));
}
let mut phases: Vec<Phase> = Vec::with_capacity(boundaries.len());
for (idx, (start, end, stimulus)) in boundaries.into_iter().enumerate() {
let phase_samples: Vec<&MonitorSample> = monitor_samples
.iter()
.filter(|s| s.elapsed_ms >= start && s.elapsed_ms < end && sample_looks_valid(s))
.collect();
let metrics = compute_metrics(&phase_samples);
phases.push(Phase {
index: idx,
start_ms: start,
end_ms: end,
stimulus,
metrics,
changes: Vec::new(),
});
}
#[allow(clippy::needless_range_loop)]
for i in 0..phases.len() {
let iter_start = if i == 0 {
events.first().and_then(|e| e.total_iterations)
} else {
events.get(i).and_then(|e| e.total_iterations)
};
let iter_end = events.get(i + 1).and_then(|e| e.total_iterations);
if let (Some(s), Some(e)) = (iter_start, iter_end) {
let duration_s =
phases[i].end_ms.saturating_sub(phases[i].start_ms) as f64 / 1000.0;
if duration_s > 0.0 && e > s {
phases[i].metrics.iteration_rate =
Some(e.saturating_sub(s) as f64 / duration_s);
}
}
}
for i in 1..phases.len() {
let before = &phases[i - 1].metrics;
let after_metrics = &phases[i].metrics;
let mut changes = Vec::new();
if before.sample_count > 0 && after_metrics.sample_count > 0 {
changes.extend(detect_change(
before.avg_imbalance,
after_metrics.avg_imbalance,
IMBALANCE_THRESHOLD,
"imbalance",
true,
));
changes.extend(detect_change(
before.avg_dsq_depth,
after_metrics.avg_dsq_depth,
DSQ_THRESHOLD,
"dsq_depth",
true,
));
if let (Some(bf), Some(af)) = (before.fallback_rate, after_metrics.fallback_rate) {
changes.extend(detect_change(
bf,
af,
FALLBACK_RATE_THRESHOLD,
"fallback",
true,
));
}
if let (Some(bk), Some(ak)) = (before.keep_last_rate, after_metrics.keep_last_rate)
{
changes.extend(detect_change(
bk,
ak,
KEEP_LAST_RATE_THRESHOLD,
"keep_last",
true,
));
}
if let (Some(bi), Some(ai)) = (before.iteration_rate, after_metrics.iteration_rate)
&& bi > 0.0
{
let rel_delta = (ai - bi) / bi;
if rel_delta.abs() > ITERATION_RATE_REL_THRESHOLD {
changes.push(PhaseChange {
direction: if rel_delta < 0.0 {
ChangeDirection::Degraded
} else {
ChangeDirection::Improved
},
metric: "throughput".to_string(),
before: bi,
after: ai,
});
}
}
}
phases[i].changes = changes;
}
Self { phases }
}
pub fn format_with_context(&self, ctx: &TimelineContext) -> String {
if self.phases.is_empty() {
return String::new();
}
let mut out = String::from("--- timeline ---\n");
let mut header_parts = Vec::new();
if let Some(ref k) = ctx.kernel {
header_parts.push(format!("kernel: {k}"));
}
if let Some(ref t) = ctx.topology {
header_parts.push(format!("topology: {t}"));
}
if let Some(ref s) = ctx.scheduler {
header_parts.push(format!("scheduler: {s}"));
}
if let Some(ref s) = ctx.scenario {
header_parts.push(format!("scenario: {s}"));
}
if let Some(d) = ctx.duration_s {
header_parts.push(format!("duration: {d:.1}s"));
}
if !header_parts.is_empty() {
for part in &header_parts {
out.push_str(part);
out.push_str(" ");
}
if out.len() >= 2 {
out.truncate(out.len() - 2);
}
out.push('\n');
}
self.format_phases(&mut out);
out
}
fn format_phases(&self, out: &mut String) {
for phase in &self.phases {
let duration_ms = phase.end_ms.saturating_sub(phase.start_ms);
if phase.index == 0 {
out.push_str(&format!(
"\nBASELINE (settle, {}ms, {} samples):\n",
duration_ms, phase.metrics.sample_count,
));
} else {
let label_start = phase
.stimulus
.as_ref()
.map(|s| {
let mut l = s.label.clone();
if let Some(op) = &s.op_kind {
l.push(' ');
l.push_str(op);
}
l
})
.unwrap_or_else(|| "?".to_string());
out.push_str(&format!(
"\nPhase {}: {} ({}ms, {} samples):\n",
phase.index, label_start, duration_ms, phase.metrics.sample_count,
));
}
let m = &phase.metrics;
if m.sample_count > 0 {
out.push_str(&format!(
" imbalance: avg={:.1} max={:.1} | dsq: avg={:.0} max={}",
m.avg_imbalance, m.max_imbalance, m.avg_dsq_depth, m.max_dsq_depth,
));
if let Some(fb) = m.fallback_rate {
out.push_str(&format!(" | fallback: {:.0}/s", fb));
}
if let Some(kl) = m.keep_last_rate {
out.push_str(&format!(" | keep_last: {:.0}/s", kl));
}
if let Some(ir) = m.iteration_rate {
out.push_str(&format!(" | throughput: {:.0} iter/s", ir));
}
out.push('\n');
if m.stall_count > 0 {
out.push_str(&format!(" stalls: {}\n", m.stall_count));
}
} else {
out.push_str(" [no samples]\n");
}
if let Some(ref stim) = phase.stimulus {
let detail = stim.detail.as_deref().unwrap_or("");
let op = stim.op_kind.as_deref().unwrap_or("?");
out.push_str(&format!(" >>> {}: {op}", stim.label));
if !detail.is_empty() {
out.push_str(&format!(" ({detail})"));
}
out.push('\n');
}
for change in &phase.changes {
let delta = change.after - change.before;
let sign = if delta > 0.0 { "+" } else { "" };
out.push_str(&format!(
" >>> {}: {} {sign}{:.1}\n",
change.direction, change.metric, delta,
));
}
}
}
#[cfg(test)]
pub fn degradations(&self) -> Vec<(&Phase, &PhaseChange)> {
let mut out = Vec::new();
for phase in &self.phases {
for change in &phase.changes {
if change.direction == ChangeDirection::Degraded {
out.push((phase, change));
}
}
}
out
}
}
fn compute_metrics(samples: &[&MonitorSample]) -> PhaseMetrics {
if samples.is_empty() {
return PhaseMetrics::default();
}
let valid: Vec<&MonitorSample> = samples
.iter()
.copied()
.filter(|s| !s.cpus.is_empty() && sample_looks_valid(s))
.collect();
if valid.is_empty() {
return PhaseMetrics {
sample_count: 0,
..PhaseMetrics::default()
};
}
let mut total_imbalance = 0.0f64;
let mut max_imbalance = 0.0f64;
let mut total_dsq = 0.0f64;
let mut max_dsq = 0u32;
let mut stall_count = 0usize;
for sample in &valid {
for cpu in &sample.cpus {
max_dsq = max_dsq.max(cpu.local_dsq_depth);
}
let ratio = sample.imbalance_ratio();
total_imbalance += ratio;
if ratio > max_imbalance {
max_imbalance = ratio;
}
let avg_dsq_this: f64 = sample
.cpus
.iter()
.map(|c| c.local_dsq_depth as f64)
.sum::<f64>()
/ sample.cpus.len() as f64;
total_dsq += avg_dsq_this;
}
for w in valid.windows(2) {
let prev = w[0];
let curr = w[1];
let cpu_count = prev.cpus.len().min(curr.cpus.len());
for cpu in 0..cpu_count {
let idle = curr.cpus[cpu].nr_running == 0 && prev.cpus[cpu].nr_running == 0;
if curr.cpus[cpu].rq_clock != 0
&& curr.cpus[cpu].rq_clock == prev.cpus[cpu].rq_clock
&& !idle
{
stall_count += 1;
}
}
}
let has_events = |s: &&MonitorSample| s.cpus.iter().any(|c| c.event_counters.is_some());
let first_ev = valid.iter().copied().find(|s| has_events(s));
let last_ev = valid.iter().copied().rev().find(|s| has_events(s));
let (fallback_rate, keep_last_rate) = match (first_ev, last_ev) {
(Some(first), Some(last)) if first.elapsed_ms < last.elapsed_ms => {
let duration_s = last.elapsed_ms.saturating_sub(first.elapsed_ms) as f64 / 1000.0;
let fb_delta = crate::monitor::counter_delta(
last.sum_event_field(|e| e.select_cpu_fallback).unwrap_or(0),
first
.sum_event_field(|e| e.select_cpu_fallback)
.unwrap_or(0),
);
let kl_delta = crate::monitor::counter_delta(
last.sum_event_field(|e| e.dispatch_keep_last).unwrap_or(0),
first.sum_event_field(|e| e.dispatch_keep_last).unwrap_or(0),
);
(
Some(fb_delta as f64 / duration_s),
Some(kl_delta as f64 / duration_s),
)
}
_ => (None, None),
};
let n = valid.len() as f64;
PhaseMetrics {
sample_count: valid.len(),
avg_imbalance: total_imbalance / n,
max_imbalance,
avg_dsq_depth: total_dsq / n,
max_dsq_depth: max_dsq,
stall_count,
fallback_rate,
keep_last_rate,
iteration_rate: None,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::monitor::{CpuSnapshot, MonitorSample};
fn sample(elapsed_ms: u64, cpus: Vec<(u32, u32, u64)>) -> MonitorSample {
MonitorSample {
prog_stats: None,
elapsed_ms,
cpus: cpus
.into_iter()
.map(|(nr_running, dsq, rq_clock)| CpuSnapshot {
nr_running,
scx_nr_running: 0,
local_dsq_depth: dsq,
rq_clock,
scx_flags: 0,
event_counters: None,
schedstat: None,
vcpu_cpu_time_ns: None,
vcpu_perf: None,
sched_domains: None,
})
.collect(),
}
}
fn stimulus(elapsed_ms: u64, label: &str) -> StimulusEvent {
StimulusEvent {
elapsed_ms,
label: label.to_string(),
op_kind: None,
detail: None,
total_iterations: None,
}
}
#[test]
fn empty_inputs_empty_timeline() {
let t = Timeline::build(&[], &[]);
assert!(t.phases.is_empty());
}
#[test]
fn no_stimulus_empty_timeline() {
let samples = vec![sample(1000, vec![(2, 1, 100)])];
let t = Timeline::build(&[], &samples);
assert!(t.phases.is_empty());
}
#[test]
fn no_monitor_empty_timeline() {
let events = vec![stimulus(0, "ScenarioStart")];
let t = Timeline::build(&events, &[]);
assert!(t.phases.is_empty());
}
#[test]
fn single_event_single_phase() {
let events = vec![stimulus(0, "ScenarioStart")];
let samples = vec![
sample(600, vec![(2, 1, 100), (2, 1, 200)]),
sample(700, vec![(2, 1, 300), (2, 1, 400)]),
];
let t = Timeline::build(&events, &samples);
assert_eq!(t.phases.len(), 1);
assert!(t.phases[0].metrics.sample_count > 0);
}
#[test]
fn two_events_two_phases() {
let events = vec![stimulus(0, "ScenarioStart"), stimulus(3000, "StepStart[0]")];
let samples: Vec<MonitorSample> = (5..65)
.map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
.collect();
let t = Timeline::build(&events, &samples);
assert_eq!(t.phases.len(), 2);
assert!(t.phases[0].metrics.sample_count > 0);
assert!(t.phases[1].metrics.sample_count > 0);
}
#[test]
fn improvement_detected() {
let events = vec![stimulus(0, "ScenarioStart"), stimulus(1000, "StepStart[0]")];
let mut samples = Vec::new();
for i in 5..15 {
samples.push(sample(
i * 100,
vec![(1, 1, i * 1000), (5, 1, i * 1000 + 100)],
));
}
for i in 15..25 {
samples.push(sample(
i * 100,
vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)],
));
}
let t = Timeline::build(&events, &samples);
let improvements: Vec<_> = t
.phases
.iter()
.flat_map(|p| p.changes.iter())
.filter(|c| c.direction == ChangeDirection::Improved)
.collect();
assert!(!improvements.is_empty());
}
#[test]
fn format_non_empty() {
let events = vec![stimulus(0, "ScenarioStart"), stimulus(1000, "StepStart[0]")];
let samples: Vec<MonitorSample> = (5..25)
.map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
.collect();
let t = Timeline::build(&events, &samples);
let formatted = t.format_with_context(&TimelineContext::default());
assert!(formatted.contains("BASELINE"));
assert!(formatted.contains("Phase 1"));
assert!(formatted.contains("imbalance"));
}
#[test]
fn unsorted_events_sorted() {
let events = vec![stimulus(3000, "StepStart[0]"), stimulus(0, "ScenarioStart")];
let samples: Vec<MonitorSample> = (5..35)
.map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
.collect();
let t = Timeline::build(&events, &samples);
assert_eq!(t.phases.len(), 2);
assert!(t.phases[0].stimulus.is_none());
}
#[test]
fn stall_detected_in_phase() {
let events = vec![stimulus(0, "ScenarioStart")];
let samples = vec![
sample(600, vec![(1, 0, 5000), (1, 0, 6000)]),
sample(700, vec![(1, 0, 5000), (1, 0, 7000)]), ];
let t = Timeline::build(&events, &samples);
assert_eq!(t.phases[0].metrics.stall_count, 1);
}
#[test]
fn compute_metrics_empty() {
let m = compute_metrics(&[]);
assert_eq!(m.sample_count, 0);
assert_eq!(m.avg_imbalance, 0.0);
assert_eq!(m.max_dsq_depth, 0);
}
#[test]
fn stimulus_event_with_detail() {
let e = StimulusEvent {
elapsed_ms: 100,
label: "StepStart[0]".to_string(),
op_kind: Some("SetCpuset".to_string()),
detail: Some("4 cpus".to_string()),
total_iterations: None,
};
let events = vec![stimulus(0, "ScenarioStart"), e];
let samples: Vec<MonitorSample> = (5..25)
.map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
.collect();
let t = Timeline::build(&events, &samples);
let formatted = t.format_with_context(&TimelineContext::default());
assert!(formatted.contains("SetCpuset"));
assert!(formatted.contains("4 cpus"));
}
#[test]
fn many_phases() {
let events: Vec<StimulusEvent> = (0..10)
.map(|i| stimulus(i * 500, &format!("Step[{i}]")))
.collect();
let samples: Vec<MonitorSample> = (5..55)
.map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
.collect();
let t = Timeline::build(&events, &samples);
assert_eq!(t.phases.len(), 10);
}
#[test]
fn phase_metrics_accuracy() {
let s1 = sample(600, vec![(1, 3, 100), (4, 5, 200)]); let s2 = sample(700, vec![(2, 1, 300), (2, 7, 400)]); let refs: Vec<&MonitorSample> = vec![&s1, &s2];
let m = compute_metrics(&refs);
assert_eq!(m.sample_count, 2);
assert!((m.avg_imbalance - 2.5).abs() < 0.01); assert!((m.max_imbalance - 4.0).abs() < 0.01);
assert_eq!(m.max_dsq_depth, 7);
}
#[test]
fn change_direction_display() {
assert_eq!(format!("{}", ChangeDirection::Improved), "IMPROVEMENT");
assert_eq!(format!("{}", ChangeDirection::Degraded), "DEGRADATION");
}
#[test]
fn compute_metrics_with_event_counters() {
use crate::monitor::ScxEventCounters;
let s1 = MonitorSample {
prog_stats: None,
elapsed_ms: 600,
cpus: vec![CpuSnapshot {
nr_running: 2,
local_dsq_depth: 1,
rq_clock: 100,
scx_nr_running: 0,
scx_flags: 0,
event_counters: Some(ScxEventCounters {
select_cpu_fallback: 10,
dispatch_keep_last: 5,
..Default::default()
}),
schedstat: None,
vcpu_cpu_time_ns: None,
vcpu_perf: None,
sched_domains: None,
}],
};
let s2 = MonitorSample {
prog_stats: None,
elapsed_ms: 1600,
cpus: vec![CpuSnapshot {
nr_running: 2,
local_dsq_depth: 1,
rq_clock: 200,
scx_nr_running: 0,
scx_flags: 0,
event_counters: Some(ScxEventCounters {
select_cpu_fallback: 110,
dispatch_keep_last: 55,
..Default::default()
}),
schedstat: None,
vcpu_cpu_time_ns: None,
vcpu_perf: None,
sched_domains: None,
}],
};
let refs: Vec<&MonitorSample> = vec![&s1, &s2];
let m = compute_metrics(&refs);
assert!((m.fallback_rate.unwrap() - 100.0).abs() < 0.01);
assert!((m.keep_last_rate.unwrap() - 50.0).abs() < 0.01);
}
#[test]
fn compute_metrics_no_event_counters() {
let s1 = sample(600, vec![(2, 1, 100)]);
let s2 = sample(700, vec![(2, 1, 200)]);
let refs: Vec<&MonitorSample> = vec![&s1, &s2];
let m = compute_metrics(&refs);
assert!(m.fallback_rate.is_none());
assert!(m.keep_last_rate.is_none());
}
#[test]
fn compute_metrics_counter_reset_clamps_rates_to_non_negative() {
use crate::monitor::ScxEventCounters;
let s1 = MonitorSample {
prog_stats: None,
elapsed_ms: 0,
cpus: vec![CpuSnapshot {
nr_running: 2,
local_dsq_depth: 1,
rq_clock: 100,
scx_nr_running: 0,
scx_flags: 0,
event_counters: Some(ScxEventCounters {
select_cpu_fallback: 1000,
dispatch_keep_last: 500,
..Default::default()
}),
schedstat: None,
vcpu_cpu_time_ns: None,
vcpu_perf: None,
sched_domains: None,
}],
};
let s2 = MonitorSample {
prog_stats: None,
elapsed_ms: 1000,
cpus: vec![CpuSnapshot {
nr_running: 2,
local_dsq_depth: 1,
rq_clock: 200,
scx_nr_running: 0,
scx_flags: 0,
event_counters: Some(ScxEventCounters {
select_cpu_fallback: 5,
dispatch_keep_last: 2,
..Default::default()
}),
schedstat: None,
vcpu_cpu_time_ns: None,
vcpu_perf: None,
sched_domains: None,
}],
};
let refs: Vec<&MonitorSample> = vec![&s1, &s2];
let m = compute_metrics(&refs);
let fb = m.fallback_rate.expect("reset still produces Some rate");
let kl = m.keep_last_rate.expect("reset still produces Some rate");
assert!(
fb >= 0.0,
"reset must not produce negative fallback_rate, got {fb}"
);
assert!(
kl >= 0.0,
"reset must not produce negative keep_last_rate, got {kl}"
);
}
#[test]
fn format_with_stalls_shown() {
let events = vec![stimulus(0, "ScenarioStart")];
let samples = vec![
sample(600, vec![(1, 0, 5000), (1, 0, 6000)]),
sample(700, vec![(1, 0, 5000), (1, 0, 7000)]), ];
let t = Timeline::build(&events, &samples);
let formatted = t.format_with_context(&TimelineContext::default());
assert!(formatted.contains("stalls: 1"));
}
#[test]
fn format_phase_no_samples() {
let events = vec![
stimulus(0, "ScenarioStart"),
stimulus(100, "StepStart[0]"),
stimulus(50000, "StepStart[1]"),
];
let samples: Vec<MonitorSample> = (5..15)
.map(|i| sample(i * 100, vec![(2, 1, i * 1000)]))
.collect();
let t = Timeline::build(&events, &samples);
let formatted = t.format_with_context(&TimelineContext::default());
assert!(formatted.contains("[no samples]"));
}
#[test]
fn fallback_rate_degradation_detected() {
use crate::monitor::ScxEventCounters;
let events = vec![stimulus(0, "ScenarioStart"), stimulus(1000, "StepStart[0]")];
let mut samples = Vec::new();
for i in 5..15 {
samples.push(MonitorSample {
prog_stats: None,
elapsed_ms: i * 100,
cpus: vec![CpuSnapshot {
nr_running: 2,
local_dsq_depth: 1,
rq_clock: i * 1000,
scx_nr_running: 0,
scx_flags: 0,
event_counters: Some(ScxEventCounters {
select_cpu_fallback: 0,
dispatch_keep_last: 0,
..Default::default()
}),
schedstat: None,
vcpu_cpu_time_ns: None,
vcpu_perf: None,
sched_domains: None,
}],
});
}
for i in 15..25 {
samples.push(MonitorSample {
prog_stats: None,
elapsed_ms: i * 100,
cpus: vec![CpuSnapshot {
nr_running: 2,
local_dsq_depth: 1,
rq_clock: i * 1000,
scx_nr_running: 0,
scx_flags: 0,
event_counters: Some(ScxEventCounters {
select_cpu_fallback: (i as i64 - 15) * 50,
dispatch_keep_last: 0,
..Default::default()
}),
schedstat: None,
vcpu_cpu_time_ns: None,
vcpu_perf: None,
sched_domains: None,
}],
});
}
let t = Timeline::build(&events, &samples);
let degs: Vec<_> = t
.degradations()
.into_iter()
.filter(|(_, c)| c.metric == "fallback")
.collect();
assert!(!degs.is_empty());
}
#[test]
fn format_with_context_includes_header() {
let events = vec![stimulus(0, "ScenarioStart")];
let samples = vec![
sample(600, vec![(2, 1, 100), (2, 1, 200)]),
sample(700, vec![(2, 1, 300), (2, 1, 400)]),
];
let t = Timeline::build(&events, &samples);
let ctx = TimelineContext {
kernel: Some("6.14.0-rc3+".to_string()),
topology: Some("2n4l4c2t (16 cpus)".to_string()),
scheduler: Some("scx_mitosis".to_string()),
scenario: Some("proportional".to_string()),
duration_s: Some(20.5),
};
let formatted = t.format_with_context(&ctx);
assert!(formatted.contains("--- timeline ---"));
assert!(formatted.contains("kernel: 6.14.0-rc3+"));
assert!(formatted.contains("topology: 2n4l4c2t (16 cpus)"));
assert!(formatted.contains("scheduler: scx_mitosis"));
assert!(formatted.contains("scenario: proportional"));
assert!(formatted.contains("duration: 20.5s"));
assert!(formatted.contains("BASELINE"));
}
#[test]
fn format_with_context_partial_fields() {
let events = vec![stimulus(0, "ScenarioStart")];
let samples = vec![sample(600, vec![(2, 1, 100)])];
let t = Timeline::build(&events, &samples);
let ctx = TimelineContext {
kernel: None,
topology: Some("1n1l1c1t (1 cpus)".to_string()),
scheduler: None,
scenario: Some("basic".to_string()),
duration_s: None,
};
let formatted = t.format_with_context(&ctx);
assert!(formatted.contains("topology: 1n1l1c1t"));
assert!(formatted.contains("scenario: basic"));
assert!(!formatted.contains("kernel:"));
assert!(!formatted.contains("scheduler:"));
assert!(!formatted.contains("duration:"));
}
#[test]
fn format_with_context_empty_timeline() {
let t = Timeline { phases: vec![] };
let ctx = TimelineContext {
kernel: Some("6.14.0".to_string()),
..Default::default()
};
assert!(t.format_with_context(&ctx).is_empty());
}
#[test]
fn format_with_context_empty_context() {
let events = vec![stimulus(0, "ScenarioStart")];
let samples = vec![sample(600, vec![(2, 1, 100)])];
let t = Timeline::build(&events, &samples);
let ctx = TimelineContext::default();
let formatted = t.format_with_context(&ctx);
assert!(formatted.contains("--- timeline ---"));
assert!(formatted.contains("BASELINE"));
let after_header = &formatted["--- timeline ---\n".len()..];
assert!(after_header.starts_with('\n'));
}
#[test]
fn garbage_dsq_samples_filtered_from_metrics() {
let events = vec![stimulus(0, "ScenarioStart")];
let garbage_dsq = 1_550_435_906u32;
let samples = vec![
MonitorSample {
prog_stats: None,
elapsed_ms: 600,
cpus: vec![CpuSnapshot {
nr_running: 1,
local_dsq_depth: garbage_dsq,
rq_clock: 1000,
..Default::default()
}],
},
sample(700, vec![(2, 3, 2000)]),
];
let t = Timeline::build(&events, &samples);
assert_eq!(t.phases.len(), 1);
assert_eq!(t.phases[0].metrics.sample_count, 1);
assert_eq!(t.phases[0].metrics.max_dsq_depth, 3);
}
#[test]
fn all_garbage_samples_yield_no_metrics() {
let events = vec![stimulus(0, "ScenarioStart")];
let samples = vec![MonitorSample {
prog_stats: None,
elapsed_ms: 600,
cpus: vec![CpuSnapshot {
nr_running: 1,
local_dsq_depth: 50_000,
rq_clock: 1000,
..Default::default()
}],
}];
let t = Timeline::build(&events, &samples);
assert_eq!(t.phases[0].metrics.sample_count, 0);
}
#[test]
fn neg_timeline_detects_imbalance_degradation() {
let events = vec![stimulus(0, "ScenarioStart"), stimulus(2000, "StepStart[0]")];
let mut samples = Vec::new();
for i in 6..25 {
samples.push(sample(
i * 100,
vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)],
));
}
for i in 26..45 {
samples.push(sample(
i * 100,
vec![(1, 1, i * 1000), (10, 1, i * 1000 + 100)],
));
}
let t = Timeline::build(&events, &samples);
assert_eq!(t.phases.len(), 2, "must have 2 phases");
assert!(!t.degradations().is_empty());
assert!(
t.phases[0].metrics.sample_count > 0,
"baseline must have samples"
);
assert!(
(t.phases[0].metrics.avg_imbalance - 1.0).abs() < 0.5,
"baseline imbalance should be ~1.0, got {:.1}",
t.phases[0].metrics.avg_imbalance,
);
assert!(
t.phases[1].metrics.sample_count > 0,
"phase 1 must have samples"
);
assert!(
t.phases[1]
.stimulus
.as_ref()
.is_some_and(|s| s.label == "StepStart[0]"),
"phase 1 stimulus must be StepStart[0]",
);
let degs = t.degradations();
assert!(!degs.is_empty());
let (phase, change) = °s[0];
assert_eq!(phase.index, 1);
assert_eq!(change.metric, "imbalance");
assert_eq!(change.direction, ChangeDirection::Degraded);
let delta = change.after - change.before;
assert!(delta > 0.0, "delta must be positive for degradation");
assert!(
delta > IMBALANCE_THRESHOLD,
"delta {:.1} must exceed threshold {:.1}",
delta,
IMBALANCE_THRESHOLD
);
assert!(
change.before < 2.0,
"before should be low: {:.1}",
change.before
);
assert!(
change.after > 5.0,
"after should be high: {:.1}",
change.after
);
let formatted = t.format_with_context(&TimelineContext::default());
assert!(
formatted.contains("BASELINE"),
"format must include BASELINE phase"
);
assert!(formatted.contains("Phase 1"), "format must include Phase 1");
assert!(
formatted.contains("DEGRADATION"),
"format must include DEGRADATION label"
);
assert!(
formatted.contains("imbalance"),
"format must name the metric"
);
}
#[test]
fn neg_timeline_detects_dsq_depth_degradation() {
let events = vec![stimulus(0, "ScenarioStart"), stimulus(2000, "StepStart[0]")];
let mut samples = Vec::new();
for i in 6..25 {
samples.push(sample(
i * 100,
vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)],
));
}
for i in 26..45 {
samples.push(sample(
i * 100,
vec![(2, 20, i * 1000), (2, 20, i * 1000 + 100)],
));
}
let t = Timeline::build(&events, &samples);
assert!(
!t.degradations().is_empty(),
"DSQ depth jump must be detected"
);
let degs = t.degradations();
let dsq_deg = degs.iter().find(|(_, c)| c.metric == "dsq_depth");
assert!(dsq_deg.is_some(), "must detect dsq_depth degradation");
let (phase, change) = dsq_deg.unwrap();
assert_eq!(phase.index, 1);
assert_eq!(change.direction, ChangeDirection::Degraded);
let delta = change.after - change.before;
assert!(
delta > DSQ_THRESHOLD,
"dsq delta {:.1} must exceed threshold {:.1}",
delta,
DSQ_THRESHOLD
);
assert!(
change.before < 5.0,
"before dsq should be low: {:.1}",
change.before
);
assert!(
change.after > 15.0,
"after dsq should be high: {:.1}",
change.after
);
let formatted = t.format_with_context(&TimelineContext::default());
assert!(
formatted.contains("dsq_depth"),
"format must name dsq_depth"
);
assert!(
formatted.contains("DEGRADATION"),
"format must label degradation"
);
}
#[test]
fn neg_timeline_no_degradation_when_stable() {
let events = vec![stimulus(0, "ScenarioStart"), stimulus(2000, "StepStart[0]")];
let mut samples = Vec::new();
for i in 6..45 {
samples.push(sample(
i * 100,
vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)],
));
}
let t = Timeline::build(&events, &samples);
assert_eq!(t.phases.len(), 2, "must have 2 phases");
assert!(t.phases[0].metrics.sample_count > 0);
assert!(t.phases[1].metrics.sample_count > 0);
assert!(
t.degradations().is_empty(),
"stable phases must not show degradation"
);
assert!(t.degradations().is_empty());
for phase in &t.phases {
assert!(
phase.changes.is_empty(),
"phase {} should have no changes",
phase.index
);
}
}
#[test]
fn detect_change_higher_is_worse_positive_delta_degraded() {
let c = detect_change(1.0, 5.0, 0.5, "imbalance", true).unwrap();
assert_eq!(c.direction, ChangeDirection::Degraded);
assert_eq!(c.metric, "imbalance");
assert!((c.before - 1.0).abs() < f64::EPSILON);
assert!((c.after - 5.0).abs() < f64::EPSILON);
}
#[test]
fn detect_change_higher_is_worse_negative_delta_improved() {
let c = detect_change(5.0, 1.0, 0.5, "imbalance", true).unwrap();
assert_eq!(c.direction, ChangeDirection::Improved);
}
#[test]
fn detect_change_lower_is_worse_negative_delta_degraded() {
let c = detect_change(100.0, 50.0, 10.0, "throughput", false).unwrap();
assert_eq!(c.direction, ChangeDirection::Degraded);
}
#[test]
fn detect_change_lower_is_worse_positive_delta_improved() {
let c = detect_change(50.0, 100.0, 10.0, "throughput", false).unwrap();
assert_eq!(c.direction, ChangeDirection::Improved);
}
#[test]
fn detect_change_below_threshold_returns_none() {
assert!(detect_change(1.0, 1.3, 0.5, "imbalance", true).is_none());
}
#[test]
fn detect_change_exactly_at_threshold_returns_none() {
assert!(detect_change(1.0, 1.5, 0.5, "imbalance", true).is_none());
}
fn stimulus_with_iters(elapsed_ms: u64, label: &str, total_iterations: u64) -> StimulusEvent {
StimulusEvent {
elapsed_ms,
label: label.to_string(),
op_kind: None,
detail: None,
total_iterations: Some(total_iterations),
}
}
#[test]
fn iteration_rate_computed_from_consecutive_events() {
let events = vec![
stimulus_with_iters(0, "ScenarioStart", 0),
stimulus_with_iters(3000, "StepStart[0]", 3000),
];
let samples: Vec<MonitorSample> = (5..35)
.map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
.collect();
let t = Timeline::build(&events, &samples);
assert_eq!(t.phases.len(), 2);
let rate = t.phases[0].metrics.iteration_rate;
assert!(rate.is_some(), "phase 0 should have iteration_rate");
let r = rate.unwrap();
assert!(r > 500.0 && r < 2000.0, "rate {r} outside expected range");
}
#[test]
fn iteration_rate_none_without_total_iterations() {
let events = vec![stimulus(0, "ScenarioStart"), stimulus(3000, "StepStart[0]")];
let samples: Vec<MonitorSample> = (5..35)
.map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
.collect();
let t = Timeline::build(&events, &samples);
assert!(t.phases[0].metrics.iteration_rate.is_none());
assert!(t.phases[1].metrics.iteration_rate.is_none());
}
#[test]
fn throughput_degradation_detected() {
let events = vec![
stimulus_with_iters(0, "ScenarioStart", 0),
stimulus_with_iters(2000, "StepStart[0]", 10000),
stimulus_with_iters(4000, "StepEnd[0]", 11000),
];
let samples: Vec<MonitorSample> = (5..45)
.map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
.collect();
let t = Timeline::build(&events, &samples);
assert_eq!(t.phases.len(), 3);
assert!(t.phases[0].metrics.iteration_rate.is_some());
assert!(t.phases[1].metrics.iteration_rate.is_some());
let r0 = t.phases[0].metrics.iteration_rate.unwrap();
let r1 = t.phases[1].metrics.iteration_rate.unwrap();
assert!(
r0 > r1,
"phase 0 rate ({r0}) should exceed phase 1 rate ({r1})"
);
let degs: Vec<_> = t
.degradations()
.into_iter()
.filter(|(_, c)| c.metric == "throughput")
.collect();
assert!(!degs.is_empty(), "throughput degradation must be detected");
let (phase, change) = °s[0];
assert_eq!(phase.index, 1);
assert_eq!(change.direction, ChangeDirection::Degraded);
assert!(change.before > change.after);
}
#[test]
fn throughput_improvement_detected() {
let events = vec![
stimulus_with_iters(0, "ScenarioStart", 0),
stimulus_with_iters(2000, "StepStart[0]", 500),
stimulus_with_iters(4000, "StepEnd[0]", 10500),
];
let samples: Vec<MonitorSample> = (5..45)
.map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
.collect();
let t = Timeline::build(&events, &samples);
let improvements: Vec<_> = t
.phases
.iter()
.flat_map(|p| p.changes.iter())
.filter(|c| c.metric == "throughput" && c.direction == ChangeDirection::Improved)
.collect();
assert!(
!improvements.is_empty(),
"throughput improvement must be detected"
);
}
#[test]
fn throughput_stable_below_threshold() {
let events = vec![
stimulus_with_iters(0, "ScenarioStart", 0),
stimulus_with_iters(2000, "StepStart[0]", 2000),
stimulus_with_iters(4000, "StepEnd[0]", 3800),
];
let samples: Vec<MonitorSample> = (5..45)
.map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
.collect();
let t = Timeline::build(&events, &samples);
let throughput_changes: Vec<_> = t
.phases
.iter()
.flat_map(|p| p.changes.iter())
.filter(|c| c.metric == "throughput")
.collect();
assert!(
throughput_changes.is_empty(),
"10% change should not trigger throughput change detection"
);
}
#[test]
fn iteration_rate_in_formatted_output() {
let events = vec![
stimulus_with_iters(0, "ScenarioStart", 0),
stimulus_with_iters(2000, "StepStart[0]", 5000),
];
let samples: Vec<MonitorSample> = (5..25)
.map(|i| sample(i * 100, vec![(2, 1, i * 1000), (2, 1, i * 1000 + 100)]))
.collect();
let t = Timeline::build(&events, &samples);
let formatted = t.format_with_context(&TimelineContext::default());
assert!(
formatted.contains("throughput:"),
"format output must contain throughput when iteration_rate is set"
);
assert!(formatted.contains("iter/s"));
}
}