use crate::monitor::dump::FailureDumpReport;
use super::snapshot::{JsonField, Snapshot, SnapshotField, SnapshotResult, stats_path};
use crate::assert::temporal::SeriesField;
#[derive(Debug)]
#[non_exhaustive]
pub struct Sample<'a> {
pub tag: &'a str,
pub elapsed_ms: u64,
pub snapshot: Snapshot<'a>,
pub stats: Option<&'a serde_json::Value>,
}
#[derive(Debug)]
pub struct SampleSeries {
rows: Vec<SampleRow>,
}
#[derive(Debug)]
struct SampleRow {
tag: String,
report: FailureDumpReport,
stats: Option<serde_json::Value>,
elapsed_ms: u64,
}
impl SampleSeries {
pub fn from_drained(
drained: Vec<(
String,
FailureDumpReport,
Option<serde_json::Value>,
Option<u64>,
)>,
) -> Self {
let rows = drained
.into_iter()
.map(|(tag, report, stats, elapsed_ms)| SampleRow {
tag,
report,
stats,
elapsed_ms: elapsed_ms.unwrap_or(0),
})
.collect();
Self { rows }
}
pub fn empty() -> Self {
Self { rows: Vec::new() }
}
pub fn is_empty(&self) -> bool {
self.rows.is_empty()
}
pub fn len(&self) -> usize {
self.rows.len()
}
#[must_use = "periodic_only returns a filtered series; bind the result"]
pub fn periodic_only(self) -> Self {
Self {
rows: self
.rows
.into_iter()
.filter(|r| r.tag.starts_with("periodic_"))
.collect(),
}
}
pub fn periodic_ref(&self) -> impl Iterator<Item = Sample<'_>> {
self.iter_samples()
.filter(|s| s.tag.starts_with("periodic_"))
}
pub fn iter_samples(&self) -> impl Iterator<Item = Sample<'_>> {
self.rows.iter().map(|r| Sample {
tag: r.tag.as_str(),
elapsed_ms: r.elapsed_ms,
snapshot: Snapshot::new(&r.report),
stats: r.stats.as_ref(),
})
}
pub fn bpf<T, F>(&self, label: impl Into<String>, project: F) -> SeriesField<T>
where
F: Fn(&Snapshot<'_>) -> SnapshotResult<T>,
{
let mut values: Vec<SnapshotResult<T>> = Vec::with_capacity(self.rows.len());
let mut tags: Vec<String> = Vec::with_capacity(self.rows.len());
let mut elapsed: Vec<u64> = Vec::with_capacity(self.rows.len());
for row in &self.rows {
tags.push(row.tag.clone());
elapsed.push(row.elapsed_ms);
if row.report.is_placeholder {
values.push(Err(
crate::scenario::snapshot::SnapshotError::PlaceholderSample {
tag: row.tag.clone(),
reason: row
.report
.scx_walker_unavailable
.clone()
.unwrap_or_else(|| "placeholder report".to_string()),
},
));
continue;
}
let snap = Snapshot::new(&row.report);
values.push(project(&snap));
}
SeriesField::from_parts(label, tags, elapsed, values)
}
pub fn stats<T, F>(&self, label: impl Into<String>, project: F) -> SeriesField<T>
where
F: Fn(StatsValue<'_>) -> SnapshotResult<T>,
{
let mut values: Vec<SnapshotResult<T>> = Vec::with_capacity(self.rows.len());
let mut tags: Vec<String> = Vec::with_capacity(self.rows.len());
let mut elapsed: Vec<u64> = Vec::with_capacity(self.rows.len());
for row in &self.rows {
tags.push(row.tag.clone());
elapsed.push(row.elapsed_ms);
let outcome = match row.stats.as_ref() {
Some(v) => project(StatsValue { value: v }),
None => Err(crate::scenario::snapshot::SnapshotError::MissingStats {
tag: row.tag.clone(),
}),
};
values.push(outcome);
}
SeriesField::from_parts(label, tags, elapsed, values)
}
pub fn bpf_map<'a>(&'a self, map_name: &'a str) -> BpfMapProjector<'a> {
BpfMapProjector {
series: self,
map_name,
entry_index: 0,
}
}
pub fn stats_path<'a>(&'a self, path: &str) -> StatsPathProjector<'a> {
StatsPathProjector {
series: self,
path: path.to_string(),
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct StatsValue<'a> {
value: &'a serde_json::Value,
}
impl<'a> StatsValue<'a> {
pub fn raw(&self) -> &'a serde_json::Value {
self.value
}
pub fn path(&self, path: &str) -> JsonField<'a> {
stats_path(self.value, path)
}
}
pub struct BpfMapProjector<'a> {
series: &'a SampleSeries,
map_name: &'a str,
entry_index: usize,
}
impl<'a> BpfMapProjector<'a> {
pub fn at(mut self, index: usize) -> Self {
self.entry_index = index;
self
}
pub fn field_u64(&self, field: &str) -> SeriesField<u64> {
let map_name = self.map_name.to_string();
let entry_index = self.entry_index;
let field_owned = field.to_string();
self.series.bpf(field, move |snap| {
let entry = match snap.map(&map_name) {
Ok(m) => m.at(entry_index),
Err(e) => return Err(e),
};
entry.get(&field_owned).as_u64()
})
}
pub fn field_i64(&self, field: &str) -> SeriesField<i64> {
let map_name = self.map_name.to_string();
let entry_index = self.entry_index;
let field_owned = field.to_string();
self.series.bpf(field, move |snap| {
let entry = match snap.map(&map_name) {
Ok(m) => m.at(entry_index),
Err(e) => return Err(e),
};
entry.get(&field_owned).as_i64()
})
}
pub fn field_f64(&self, field: &str) -> SeriesField<f64> {
let map_name = self.map_name.to_string();
let entry_index = self.entry_index;
let field_owned = field.to_string();
self.series.bpf(field, move |snap| {
let entry = match snap.map(&map_name) {
Ok(m) => m.at(entry_index),
Err(e) => return Err(e),
};
entry.get(&field_owned).as_f64()
})
}
pub fn member_names(&self) -> Vec<String> {
let row = match self.series.rows.first() {
Some(r) => r,
None => return Vec::new(),
};
let snap = Snapshot::new(&row.report);
let map = match snap.map(self.map_name) {
Ok(m) => m,
Err(_) => return Vec::new(),
};
let entry = map.at(self.entry_index);
let field = entry.get("");
match field {
SnapshotField::Value(crate::monitor::btf_render::RenderedValue::Struct {
members,
..
}) => members.iter().map(|m| m.name.clone()).collect(),
_ => Vec::new(),
}
}
pub fn u64_fields(&self) -> Vec<(String, SeriesField<u64>)> {
self.member_names()
.into_iter()
.filter_map(|name| {
let field = self.field_u64(&name);
let any_ok = field.values_iter().any(|r| r.is_ok());
any_ok.then_some((name, field))
})
.collect()
}
pub fn f64_fields(&self) -> Vec<(String, SeriesField<f64>)> {
self.member_names()
.into_iter()
.filter_map(|name| {
let field = self.field_f64(&name);
let any_ok = field.values_iter().any(|r| r.is_ok());
any_ok.then_some((name, field))
})
.collect()
}
}
pub struct StatsPathProjector<'a> {
series: &'a SampleSeries,
path: String,
}
impl<'a> StatsPathProjector<'a> {
pub fn field_u64(&self, key: &str) -> SeriesField<u64> {
let full_path = join_paths(&self.path, key);
self.series
.stats(key, move |sv| sv.path(&full_path).as_u64())
}
pub fn field_i64(&self, key: &str) -> SeriesField<i64> {
let full_path = join_paths(&self.path, key);
self.series
.stats(key, move |sv| sv.path(&full_path).as_i64())
}
pub fn field_f64(&self, key: &str) -> SeriesField<f64> {
let full_path = join_paths(&self.path, key);
self.series
.stats(key, move |sv| sv.path(&full_path).as_f64())
}
pub fn key(&self, key: &str) -> StatsPathProjector<'a> {
StatsPathProjector {
series: self.series,
path: join_paths(&self.path, key),
}
}
pub fn key_names(&self) -> Vec<String> {
let row = match self.series.rows.first() {
Some(r) => r,
None => return Vec::new(),
};
let stats = match row.stats.as_ref() {
Some(s) => s,
None => return Vec::new(),
};
let resolved = stats_path(stats, &self.path);
let raw = match resolved.raw() {
Some(v) => v,
None => return Vec::new(),
};
match raw {
serde_json::Value::Object(map) => {
let mut names: Vec<String> = map.keys().cloned().collect();
names.sort();
names
}
_ => Vec::new(),
}
}
pub fn u64_fields(&self) -> Vec<(String, SeriesField<u64>)> {
self.key_names()
.into_iter()
.filter_map(|name| {
let field = self.field_u64(&name);
let any_ok = field.values_iter().any(|r| r.is_ok());
any_ok.then_some((name, field))
})
.collect()
}
pub fn f64_fields(&self) -> Vec<(String, SeriesField<f64>)> {
self.key_names()
.into_iter()
.filter_map(|name| {
let field = self.field_f64(&name);
let any_ok = field.values_iter().any(|r| r.is_ok());
any_ok.then_some((name, field))
})
.collect()
}
}
fn join_paths(base: &str, leaf: &str) -> String {
if base.is_empty() {
leaf.to_string()
} else if leaf.is_empty() {
base.to_string()
} else {
format!("{base}.{leaf}")
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::monitor::btf_render::{RenderedMember, RenderedValue};
use crate::monitor::dump::{FailureDumpMap, FailureDumpReport, SCHEMA_SINGLE};
fn synthetic_report(value: u64) -> FailureDumpReport {
let bss_value = RenderedValue::Struct {
type_name: Some(".bss".into()),
members: vec![
RenderedMember {
name: "nr_dispatched".into(),
value: RenderedValue::Uint { bits: 64, value },
},
RenderedMember {
name: "stall".into(),
value: RenderedValue::Uint { bits: 8, value: 0 },
},
],
};
let bss_map = FailureDumpMap {
name: "scx_obj.bss".into(),
map_type: 2,
value_size: 16,
max_entries: 1,
value: Some(bss_value),
entries: Vec::new(),
percpu_entries: Vec::new(),
percpu_hash_entries: Vec::new(),
arena: None,
ringbuf: None,
stack_trace: None,
fd_array: None,
error: None,
};
FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
maps: vec![bss_map],
..Default::default()
}
}
fn synthetic_stats(busy: f64) -> serde_json::Value {
serde_json::json!({
"busy": busy,
"antistall": 0,
"layers": {
"batch": { "util": busy * 0.5 }
}
})
}
#[test]
fn from_drained_preserves_order() {
let drained = vec![
(
"periodic_000".to_string(),
synthetic_report(10),
Some(synthetic_stats(50.0)),
Some(100),
),
(
"periodic_001".to_string(),
synthetic_report(20),
Some(synthetic_stats(60.0)),
Some(200),
),
];
let series = SampleSeries::from_drained(drained);
assert_eq!(series.len(), 2);
let tags: Vec<&str> = series.iter_samples().map(|s| s.tag).collect();
assert_eq!(tags, vec!["periodic_000", "periodic_001"]);
}
#[test]
fn periodic_only_filters_non_periodic_tags() {
let drained = vec![
(
"periodic_000".to_string(),
synthetic_report(10),
None,
Some(100),
),
(
"user_watchpoint_kind".to_string(),
synthetic_report(99),
None,
Some(150),
),
(
"periodic_001".to_string(),
synthetic_report(20),
None,
Some(200),
),
];
let series = SampleSeries::from_drained(drained).periodic_only();
assert_eq!(series.len(), 2);
}
#[test]
fn bpf_projection_extracts_field_per_sample() {
let drained = vec![
(
"periodic_000".to_string(),
synthetic_report(10),
None,
Some(100),
),
(
"periodic_001".to_string(),
synthetic_report(20),
None,
Some(200),
),
];
let series = SampleSeries::from_drained(drained);
let field: SeriesField<u64> =
series.bpf("nr_dispatched", |snap| snap.var("nr_dispatched").as_u64());
let values: Vec<u64> = field
.values_iter()
.filter_map(|v| v.as_ref().ok().copied())
.collect();
assert_eq!(values, vec![10, 20]);
}
#[test]
fn stats_projection_handles_missing_stats_as_error() {
let drained = vec![
(
"periodic_000".to_string(),
synthetic_report(10),
Some(synthetic_stats(50.0)),
Some(100),
),
(
"periodic_001".to_string(),
synthetic_report(20),
None,
Some(200),
),
];
let series = SampleSeries::from_drained(drained);
let field: SeriesField<f64> = series.stats("busy", |s| s.path("busy").as_f64());
let outcomes: Vec<SnapshotResult<f64>> = field.values_iter().cloned().collect();
assert_eq!(outcomes.len(), 2);
assert_eq!(
outcomes[0].as_ref().copied(),
Ok(50.0),
"sample with stats present must project the `busy` field verbatim"
);
match &outcomes[1] {
Err(crate::scenario::snapshot::SnapshotError::MissingStats { tag }) => {
assert_eq!(
tag, "periodic_001",
"MissingStats tag must identify the sample whose stats slot was None"
);
}
other => panic!(
"sample with stats=None must surface SnapshotError::MissingStats, got {other:?}"
),
}
}
#[test]
fn bpf_map_projector_field_u64_extracts_field() {
let drained = vec![
(
"periodic_000".to_string(),
synthetic_report(10),
None,
Some(100),
),
(
"periodic_001".to_string(),
synthetic_report(20),
None,
Some(200),
),
];
let series = SampleSeries::from_drained(drained);
let field = series
.bpf_map("scx_obj.bss")
.at(0)
.field_u64("nr_dispatched");
let values: Vec<u64> = field
.values_iter()
.filter_map(|v| v.as_ref().ok().copied())
.collect();
assert_eq!(values, vec![10, 20]);
}
#[test]
fn bpf_map_projector_member_names_lists_struct_fields() {
let drained = vec![(
"periodic_000".to_string(),
synthetic_report(10),
None,
Some(100),
)];
let series = SampleSeries::from_drained(drained);
let names = series.bpf_map("scx_obj.bss").at(0).member_names();
assert!(names.contains(&"nr_dispatched".to_string()));
assert!(names.contains(&"stall".to_string()));
}
#[test]
fn stats_path_projector_field_f64_extracts_root_scalar() {
let drained = vec![
(
"periodic_000".to_string(),
synthetic_report(0),
Some(synthetic_stats(50.0)),
Some(100),
),
(
"periodic_001".to_string(),
synthetic_report(0),
Some(synthetic_stats(60.0)),
Some(200),
),
];
let series = SampleSeries::from_drained(drained);
let field = series.stats_path("").field_f64("busy");
let values: Vec<f64> = field
.values_iter()
.filter_map(|v| v.as_ref().ok().copied())
.collect();
assert_eq!(values.len(), 2);
assert!((values[0] - 50.0).abs() < f64::EPSILON);
assert!((values[1] - 60.0).abs() < f64::EPSILON);
}
#[test]
fn stats_path_projector_key_names_at_root() {
let drained = vec![(
"periodic_000".to_string(),
synthetic_report(0),
Some(synthetic_stats(50.0)),
Some(100),
)];
let series = SampleSeries::from_drained(drained);
let names = series.stats_path("").key_names();
assert!(names.contains(&"busy".to_string()));
assert!(names.contains(&"layers".to_string()));
}
#[test]
fn stats_path_projector_nested_key_drills_in() {
let drained = vec![(
"periodic_000".to_string(),
synthetic_report(0),
Some(synthetic_stats(50.0)),
Some(100),
)];
let series = SampleSeries::from_drained(drained);
let field = series.stats_path("layers").key("batch").field_f64("util");
let values: Vec<f64> = field
.values_iter()
.filter_map(|v| v.as_ref().ok().copied())
.collect();
assert_eq!(values.len(), 1);
assert!((values[0] - 25.0).abs() < f64::EPSILON);
}
fn mixed_shape_report(disp: u64, balance: f64) -> FailureDumpReport {
let bss_value = RenderedValue::Struct {
type_name: Some(".bss".into()),
members: vec![
RenderedMember {
name: "nr_dispatched".into(),
value: RenderedValue::Uint {
bits: 64,
value: disp,
},
},
RenderedMember {
name: "stall".into(),
value: RenderedValue::Uint { bits: 8, value: 0 },
},
RenderedMember {
name: "balance".into(),
value: RenderedValue::Float {
bits: 64,
value: balance,
},
},
RenderedMember {
name: "flag_str".into(),
value: RenderedValue::Bytes {
hex: "de ad".into(),
},
},
],
};
let bss_map = FailureDumpMap {
name: "scx_obj.bss".into(),
map_type: 2,
value_size: 32,
max_entries: 1,
value: Some(bss_value),
entries: Vec::new(),
percpu_entries: Vec::new(),
percpu_hash_entries: Vec::new(),
arena: None,
ringbuf: None,
stack_trace: None,
fd_array: None,
error: None,
};
FailureDumpReport {
schema: SCHEMA_SINGLE.to_string(),
maps: vec![bss_map],
..Default::default()
}
}
#[test]
fn bpf_map_projector_u64_fields_keeps_at_least_one_ok_excludes_all_err() {
let drained = vec![
(
"periodic_000".to_string(),
mixed_shape_report(10, 1.5),
None,
Some(100),
),
(
"periodic_001".to_string(),
mixed_shape_report(20, 2.5),
None,
Some(200),
),
];
let series = SampleSeries::from_drained(drained);
let fields = series.bpf_map("scx_obj.bss").at(0).u64_fields();
let names: Vec<&str> = fields.iter().map(|(n, _)| n.as_str()).collect();
assert!(
names.contains(&"nr_dispatched"),
"u64-shaped member must be kept: {names:?}",
);
assert!(
names.contains(&"stall"),
"u64-shaped member must be kept: {names:?}",
);
assert!(
!names.contains(&"balance"),
"Float-shaped member must be excluded — every u64 projection errors: {names:?}",
);
assert!(
!names.contains(&"flag_str"),
"Bytes-shaped member must be excluded — every u64 projection errors: {names:?}",
);
let dispatched = fields
.iter()
.find(|(n, _)| n == "nr_dispatched")
.expect("nr_dispatched kept above");
let values: Vec<u64> = dispatched
.1
.values_iter()
.filter_map(|r| r.as_ref().ok().copied())
.collect();
assert_eq!(
values,
vec![10, 20],
"kept SeriesField must carry the per-sample u64 projection",
);
}
#[test]
fn bpf_map_projector_f64_fields_keeps_at_least_one_ok_excludes_all_err() {
let drained = vec![
(
"periodic_000".to_string(),
mixed_shape_report(10, 1.5),
None,
Some(100),
),
(
"periodic_001".to_string(),
mixed_shape_report(20, 2.5),
None,
Some(200),
),
];
let series = SampleSeries::from_drained(drained);
let fields = series.bpf_map("scx_obj.bss").at(0).f64_fields();
let names: Vec<&str> = fields.iter().map(|(n, _)| n.as_str()).collect();
assert!(
names.contains(&"nr_dispatched"),
"Uint coerces to f64 — must be kept: {names:?}",
);
assert!(
names.contains(&"stall"),
"Uint coerces to f64 — must be kept: {names:?}",
);
assert!(
names.contains(&"balance"),
"Float coerces to f64 — must be kept: {names:?}",
);
assert!(
!names.contains(&"flag_str"),
"Bytes does not coerce to f64 — must be excluded: {names:?}",
);
let balance = fields
.iter()
.find(|(n, _)| n == "balance")
.expect("balance kept above");
let values: Vec<f64> = balance
.1
.values_iter()
.filter_map(|r| r.as_ref().ok().copied())
.collect();
assert_eq!(values.len(), 2, "balance must surface one f64 per sample",);
assert!((values[0] - 1.5).abs() < f64::EPSILON);
assert!((values[1] - 2.5).abs() < f64::EPSILON);
}
#[test]
fn bpf_map_projector_field_helpers_empty_series_yields_empty_vec() {
let series = SampleSeries::empty();
let u64s = series.bpf_map("scx_obj.bss").at(0).u64_fields();
assert!(
u64s.is_empty(),
"empty series must yield empty u64_fields, got {} entries",
u64s.len(),
);
let f64s = series.bpf_map("scx_obj.bss").at(0).f64_fields();
assert!(
f64s.is_empty(),
"empty series must yield empty f64_fields, got {} entries",
f64s.len(),
);
}
fn mixed_stats(busy: u64, count: u64) -> serde_json::Value {
serde_json::json!({
"busy": busy,
"count": count,
"ratio": 0.5,
"name": "nope",
})
}
#[test]
fn stats_path_projector_u64_fields_keeps_at_least_one_ok_excludes_all_err() {
let drained = vec![
(
"periodic_000".to_string(),
synthetic_report(0),
Some(mixed_stats(50, 7)),
Some(100),
),
(
"periodic_001".to_string(),
synthetic_report(0),
Some(mixed_stats(60, 9)),
Some(200),
),
];
let series = SampleSeries::from_drained(drained);
let fields = series.stats_path("").u64_fields();
let names: Vec<&str> = fields.iter().map(|(n, _)| n.as_str()).collect();
assert!(
names.contains(&"busy"),
"Number(integer) key must be kept: {names:?}",
);
assert!(
names.contains(&"count"),
"Number(integer) key must be kept: {names:?}",
);
assert!(
!names.contains(&"ratio"),
"Number(non-integer float) errors on every u64 projection — must be excluded: {names:?}",
);
assert!(
!names.contains(&"name"),
"String('nope') errors on every u64 projection — must be excluded: {names:?}",
);
let busy = fields
.iter()
.find(|(n, _)| n == "busy")
.expect("busy kept above");
let values: Vec<u64> = busy
.1
.values_iter()
.filter_map(|r| r.as_ref().ok().copied())
.collect();
assert_eq!(values, vec![50, 60]);
}
#[test]
fn stats_path_projector_f64_fields_keeps_at_least_one_ok_excludes_all_err() {
let drained = vec![
(
"periodic_000".to_string(),
synthetic_report(0),
Some(mixed_stats(50, 7)),
Some(100),
),
(
"periodic_001".to_string(),
synthetic_report(0),
Some(mixed_stats(60, 9)),
Some(200),
),
];
let series = SampleSeries::from_drained(drained);
let fields = series.stats_path("").f64_fields();
let names: Vec<&str> = fields.iter().map(|(n, _)| n.as_str()).collect();
assert!(
names.contains(&"busy"),
"Number coerces to f64 — must be kept: {names:?}",
);
assert!(
names.contains(&"count"),
"Number coerces to f64 — must be kept: {names:?}",
);
assert!(
names.contains(&"ratio"),
"Number(float) coerces to f64 — must be kept: {names:?}",
);
assert!(
!names.contains(&"name"),
"String('nope') errors on every f64 projection — must be excluded: {names:?}",
);
let ratio = fields
.iter()
.find(|(n, _)| n == "ratio")
.expect("ratio kept above");
let values: Vec<f64> = ratio
.1
.values_iter()
.filter_map(|r| r.as_ref().ok().copied())
.collect();
assert_eq!(values.len(), 2);
assert!((values[0] - 0.5).abs() < f64::EPSILON);
assert!((values[1] - 0.5).abs() < f64::EPSILON);
}
#[test]
fn stats_path_projector_field_helpers_empty_series_yields_empty_vec() {
let series = SampleSeries::empty();
let u64s = series.stats_path("").u64_fields();
assert!(
u64s.is_empty(),
"empty series must yield empty u64_fields, got {} entries",
u64s.len(),
);
let f64s = series.stats_path("").f64_fields();
assert!(
f64s.is_empty(),
"empty series must yield empty f64_fields, got {} entries",
f64s.len(),
);
}
}