use anyhow::Result;
use ktstr::assert::{AssertResult, Verdict};
use ktstr::ktstr_test;
use ktstr::prelude::{SampleSeries, VmResult};
use ktstr::scenario::ops::{CgroupDef, HoldSpec, Step, execute_steps};
use ktstr::test_support::{Scheduler, SchedulerSpec};
const KTSTR_SCHED: Scheduler =
Scheduler::new("ktstr_sched").binary(SchedulerSpec::Discover("scx-ktstr"));
const DISPATCHED_CEILING: u64 = 1_000_000_000_000;
fn assert_temporal_patterns(result: &VmResult) -> Result<()> {
let series = SampleSeries::from_drained(result.snapshot_bridge.drain_ordered_with_stats())
.periodic_only();
anyhow::ensure!(
!series.is_empty(),
"post_vm: no periodic samples on the bridge — the freeze \
coordinator never fired (periodic_target={}, \
periodic_fired={})",
result.periodic_target,
result.periodic_fired,
);
anyhow::ensure!(
result.periodic_target == 3,
"periodic_target must mirror the configured num_snapshots = 3, got {}",
result.periodic_target,
);
anyhow::ensure!(
series.len() >= 2,
"need at least 2 periodic samples for nondecreasing to be \
non-vacuous, got {}",
series.len(),
);
let bpf_dispatched = series.bpf("nr_dispatched", |snap| snap.var("nr_dispatched").as_u64());
let any_progress = bpf_dispatched
.iter_full()
.any(|(_, _, slot)| matches!(slot, Ok(v) if *v > 0));
anyhow::ensure!(
any_progress,
"BPF nr_dispatched read 0 across every periodic sample — the \
dispatch path never advanced under the 10 s workload (was \
scx-ktstr loaded?)",
);
let mut verdict = Verdict::new();
series
.bpf("nr_dispatched", |snap| snap.var("nr_dispatched").as_u64())
.nondecreasing(&mut verdict);
series
.stats("nr_dispatched", |s| s.path("nr_dispatched").as_u64())
.each(&mut verdict)
.at_most(DISPATCHED_CEILING);
let r = verdict.into_result();
if !r.passed {
let detail_lines: Vec<String> = r
.details
.iter()
.map(|d| format!(" [{:?}] {}", d.kind, d.message))
.collect();
anyhow::bail!(
"temporal assertions failed across {} sample(s):\n{}",
series.len(),
detail_lines.join("\n"),
);
}
Ok(())
}
#[ktstr_test(
scheduler = KTSTR_SCHED,
duration_s = 10,
watchdog_timeout_s = 15,
num_snapshots = 3,
auto_repro = false,
post_vm = assert_temporal_patterns,
)]
fn temporal_assertions_over_periodic_samples(ctx: &ktstr::scenario::Ctx) -> Result<AssertResult> {
let steps = vec![Step {
setup: vec![CgroupDef::named("cg_0").workers(ctx.workers_per_cgroup)].into(),
ops: vec![],
hold: HoldSpec::FULL,
}];
execute_steps(ctx, steps)
}