use crate::monitor::MonitorReport;
use crate::monitor::dump::FailureDumpReport;
use super::snapshot::{Snapshot, SnapshotResult};
use crate::assert::temporal::SeriesField;
mod bpf;
mod host;
mod monitor;
mod stats;
pub use bpf::BpfMapProjector;
pub use host::HostView;
pub use monitor::{ERROR_CLASS_NAMES, MonitorView, ScxEventsView};
pub use stats::{StatsPathProjector, StatsValue};
/// Borrowed, read-only view of a single row of a [`SampleSeries`].
///
/// Values of this type are produced by [`SampleSeries::iter_samples`] (and
/// the helpers built on top of it); every reference field borrows from the
/// series that yielded it. The struct is `#[non_exhaustive]`, so code outside
/// this crate cannot construct it with a struct literal.
#[derive(Debug)]
#[non_exhaustive]
pub struct Sample<'a> {
/// Tag string of the underlying row.
pub tag: &'a str,
/// Elapsed time stored on the row (milliseconds, going by the field name).
/// The `SampleSeries` constructors store `0` when the drained entry carried
/// no elapsed value, so `0` does not necessarily mean "captured at t=0".
pub elapsed_ms: u64,
/// [`Snapshot`] built over the row's failure-dump report.
pub snapshot: Snapshot<'a>,
/// Borrowed stats JSON for the row, or the reason no stats are available.
pub stats: Result<&'a serde_json::Value, &'a crate::scenario::snapshot::MissingStatsReason>,
/// Step index recorded for the row, if any. Rows built through
/// `SampleSeries::from_drained` always carry `None`.
pub step_index: Option<u16>,
}
/// Owned sequence of captured samples plus an optional monitor report.
///
/// Rows are kept in the order in which they were handed to the constructor
/// (`from_drained` / `from_drained_typed`); iteration follows that order.
#[derive(Debug)]
pub struct SampleSeries {
// One entry per captured sample, in insertion order.
rows: Vec<SampleRow>,
// Monitor report supplied at construction time, if any.
monitor: Option<MonitorReport>,
}
/// Owned storage for one sample inside a `SampleSeries`.
///
/// `Sample` is the borrowed view handed out to callers; this struct holds the
/// data that view points into.
#[derive(Debug)]
struct SampleRow {
// Tag string of the sample; the `periodic_` prefix is what the periodic
// filters on `SampleSeries` test for.
tag: String,
// Failure-dump report captured for this sample.
report: FailureDumpReport,
// Stats JSON, or the reason it is missing.
stats: Result<serde_json::Value, crate::scenario::snapshot::MissingStatsReason>,
// Elapsed time for the sample; `0` when the source entry had none.
elapsed_ms: u64,
// Step index of the sample, when the source entry provided one.
step_index: Option<u16>,
}
/// Projects every row of a series into a [`SeriesField`].
///
/// Four index-aligned columns are produced, one slot per row and in row
/// order: the row tag, its elapsed time, its step index converted to a
/// `crate::assert::Phase` (when present), and the value returned by
/// `row_to_slot`. `row_to_slot` is invoked exactly once per row, in order.
///
/// * `rows` - rows to project.
/// * `label` - label forwarded to `SeriesField::from_parts_with_phases`.
/// * `row_to_slot` - extracts the per-row value (or its error) from a row.
fn build_series_field<T>(
    rows: &[SampleRow],
    label: impl Into<String>,
    row_to_slot: impl FnMut(&SampleRow) -> SnapshotResult<T>,
) -> SeriesField<T> {
    // Each column is collected from the slice iterator, whose exact size
    // hint lets `collect` allocate the right capacity up front.
    let tags: Vec<String> = rows.iter().map(|row| row.tag.clone()).collect();
    let elapsed: Vec<u64> = rows.iter().map(|row| row.elapsed_ms).collect();
    let phases: Vec<Option<crate::assert::Phase>> = rows
        .iter()
        .map(|row| row.step_index.map(crate::assert::Phase::from))
        .collect();
    let values: Vec<SnapshotResult<T>> = rows.iter().map(row_to_slot).collect();

    SeriesField::from_parts_with_phases(label, tags, elapsed, values, phases)
}
impl SampleSeries {
    /// Tag prefix that marks a row as a periodic sample. Shared by
    /// [`Self::periodic_only`] and [`Self::periodic_ref`] so the two filters
    /// cannot drift apart.
    const PERIODIC_TAG_PREFIX: &'static str = "periodic_";

    /// Builds a series from `(tag, report, stats, elapsed_ms)` tuples,
    /// preserving their order.
    ///
    /// * A `None` stats value is recorded as
    ///   `Err(MissingStatsReason::NoSchedulerBinary)`.
    /// * A `None` elapsed value is stored as `0`.
    /// * Every row gets `step_index: None`, since the tuple form carries no
    ///   step information.
    pub fn from_drained(
        drained: Vec<(
            String,
            FailureDumpReport,
            Option<serde_json::Value>,
            Option<u64>,
        )>,
        monitor: Option<MonitorReport>,
    ) -> Self {
        let rows = drained
            .into_iter()
            .map(|(tag, report, stats, elapsed_ms)| SampleRow {
                tag,
                report,
                stats: stats
                    .ok_or(crate::scenario::snapshot::MissingStatsReason::NoSchedulerBinary),
                elapsed_ms: elapsed_ms.unwrap_or(0),
                step_index: None,
            })
            .collect();
        Self { rows, monitor }
    }

    /// Builds a series from typed drained entries, preserving their order.
    ///
    /// The entry's `tag`, `report`, `stats` and `step_index` are carried over
    /// unchanged; a `None` elapsed value is stored as `0`. Any other fields
    /// of the entry are ignored.
    pub fn from_drained_typed(
        drained: Vec<crate::scenario::snapshot::DrainedSnapshotEntry>,
        monitor: Option<MonitorReport>,
    ) -> Self {
        let rows = drained
            .into_iter()
            .map(
                |crate::scenario::snapshot::DrainedSnapshotEntry {
                     tag,
                     report,
                     stats,
                     elapsed_ms,
                     step_index,
                     ..
                 }| SampleRow {
                    tag,
                    report,
                    stats,
                    elapsed_ms: elapsed_ms.unwrap_or(0),
                    step_index,
                },
            )
            .collect();
        Self { rows, monitor }
    }

    /// Returns a series with no rows and no monitor report.
    pub fn empty() -> Self {
        Self {
            rows: Vec::new(),
            monitor: None,
        }
    }

    /// Returns `true` when the series holds no rows.
    pub fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }

    /// Returns the number of rows in the series.
    pub fn len(&self) -> usize {
        self.rows.len()
    }

    /// Consumes the series and keeps only the rows whose tag starts with
    /// `periodic_`. Row order and the monitor report are preserved.
    #[must_use = "periodic_only returns a filtered series; bind the result"]
    pub fn periodic_only(self) -> Self {
        let Self { rows, monitor } = self;
        Self {
            rows: rows
                .into_iter()
                .filter(|row| row.tag.starts_with(Self::PERIODIC_TAG_PREFIX))
                .collect(),
            monitor,
        }
    }

    /// Borrowing counterpart of [`Self::periodic_only`]: yields a [`Sample`]
    /// view for every row whose tag starts with `periodic_`, leaving the
    /// series itself untouched.
    pub fn periodic_ref(&self) -> impl Iterator<Item = Sample<'_>> {
        self.iter_samples()
            .filter(|sample| sample.tag.starts_with(Self::PERIODIC_TAG_PREFIX))
    }

    /// Iterates over all rows in order, yielding a borrowed [`Sample`] view
    /// of each. A fresh [`Snapshot`] is built over the row's report for every
    /// yielded sample.
    pub fn iter_samples(&self) -> impl Iterator<Item = Sample<'_>> {
        self.rows.iter().map(|row| Sample {
            tag: row.tag.as_str(),
            elapsed_ms: row.elapsed_ms,
            snapshot: Snapshot::new(&row.report),
            stats: row.stats.as_ref(),
            step_index: row.step_index,
        })
    }

    /// Groups the samples by step index, keeping row order within each group.
    ///
    /// Samples whose `step_index` is `None` are placed under key `0`, so in
    /// the returned map they are indistinguishable from samples that were
    /// explicitly recorded with step index `0`.
    pub fn by_phase(&self) -> std::collections::BTreeMap<u16, Vec<Sample<'_>>> {
        let mut grouped = std::collections::BTreeMap::<u16, Vec<Sample<'_>>>::new();
        for sample in self.iter_samples() {
            let phase_key = sample.step_index.unwrap_or(0);
            grouped.entry(phase_key).or_default().push(sample);
        }
        grouped
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::btf_render::{RenderedMember, RenderedValue};
    use crate::monitor::dump::{FailureDumpMap, FailureDumpReport, SCHEMA_SINGLE};

    /// Builds a minimal report containing a single `scx_obj.bss` map whose
    /// struct value has `nr_dispatched = value` and `stall = 0`.
    fn synthetic_report(value: u64) -> FailureDumpReport {
        let bss_value = RenderedValue::Struct {
            type_name: Some(".bss".into()),
            members: vec![
                RenderedMember {
                    name: "nr_dispatched".into(),
                    value: RenderedValue::Uint { bits: 64, value },
                },
                RenderedMember {
                    name: "stall".into(),
                    value: RenderedValue::Uint { bits: 8, value: 0 },
                },
            ],
        };
        let bss_map = FailureDumpMap {
            name: "scx_obj.bss".into(),
            map_kva: 0,
            map_type: 2,
            value_size: 16,
            max_entries: 1,
            value: Some(bss_value),
            entries: Vec::new(),
            percpu_entries: Vec::new(),
            percpu_hash_entries: Vec::new(),
            arena: None,
            ringbuf: None,
            stack_trace: None,
            fd_array: None,
            error: None,
        };
        FailureDumpReport {
            schema: SCHEMA_SINGLE.to_string(),
            active_map_kvas: Vec::new(),
            maps: vec![bss_map],
            ..Default::default()
        }
    }

    /// Builds a small stats document with a top-level `busy` value and a
    /// nested `layers.batch.util` value derived from it.
    fn synthetic_stats(busy: f64) -> serde_json::Value {
        serde_json::json!({
            "busy": busy,
            "antistall": 0,
            "layers": {
                "batch": { "util": busy * 0.5 }
            }
        })
    }

    #[test]
    fn from_drained_preserves_order() {
        let drained = vec![
            (
                "periodic_000".to_string(),
                synthetic_report(10),
                Some(synthetic_stats(50.0)),
                Some(100),
            ),
            (
                "periodic_001".to_string(),
                synthetic_report(20),
                Some(synthetic_stats(60.0)),
                Some(200),
            ),
        ];
        let series = SampleSeries::from_drained(drained, None);
        assert_eq!(series.len(), 2);
        assert!(!series.is_empty());

        let tags: Vec<&str> = series.iter_samples().map(|s| s.tag).collect();
        assert_eq!(tags, vec!["periodic_000", "periodic_001"]);

        // The per-row payload must travel with its tag, not just the tag order.
        let elapsed: Vec<u64> = series.iter_samples().map(|s| s.elapsed_ms).collect();
        assert_eq!(elapsed, vec![100, 200]);
        assert!(series.iter_samples().all(|s| s.stats.is_ok()));
        assert!(series.iter_samples().all(|s| s.step_index.is_none()));
    }

    #[test]
    fn periodic_only_filters_non_periodic_tags() {
        let drained = vec![
            (
                "periodic_000".to_string(),
                synthetic_report(10),
                None,
                Some(100),
            ),
            (
                "user_watchpoint_kind".to_string(),
                synthetic_report(99),
                None,
                Some(150),
            ),
            (
                "periodic_001".to_string(),
                synthetic_report(20),
                None,
                Some(200),
            ),
        ];
        let series = SampleSeries::from_drained(drained, None).periodic_only();
        assert_eq!(series.len(), 2);

        // A count alone would also pass if the wrong row had been dropped;
        // pin exactly which rows survive and in which order.
        let tags: Vec<&str> = series.iter_samples().map(|s| s.tag).collect();
        assert_eq!(tags, vec!["periodic_000", "periodic_001"]);
        let elapsed: Vec<u64> = series.iter_samples().map(|s| s.elapsed_ms).collect();
        assert_eq!(elapsed, vec![100, 200]);

        // Entries drained without stats surface as an error, never as `Ok`.
        assert!(series.iter_samples().all(|s| s.stats.is_err()));
    }
}