ktstr 0.6.0

Test harness for Linux process schedulers
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
//! Unified periodic-sample bundle and series projection.
//!
//! At every periodic boundary (see [`super::snapshot`] and the
//! freeze coordinator's periodic-capture loop), the framework
//! captures a coupled [`FailureDumpReport`] + scx_stats JSON pair.
//! [`Sample`] is the borrowed-view tuple over that pair plus the
//! per-sample tag and elapsed-millisecond timestamp;
//! [`SampleSeries`] is the ordered sequence of samples drained
//! from a `SnapshotBridge` after VM exit.
//!
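//! Test authors do not construct samples manually — they build a
//! series from the periodic bundle the bridge surfaces via
//! `SnapshotBridge::drain_ordered_with_stats`, typically with
//! [`SampleSeries::from_drained_typed`] (which preserves the typed
//! stats-failure reason and the stamped phase index), or with
//! [`SampleSeries::from_drained`] for simple `Option`-shaped fixture
//! tuples. They then project the series along one of four
//! orthogonal axes: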
//!
//!  - **bpf** — kernel BPF state through
//!    [`SampleSeries::bpf`] / the typed
//!    [`SampleSeries::bpf_map`] helper.
//!  - **stats** — userspace scx_stats JSON through
//!    [`SampleSeries::stats`] / the typed
//!    [`SampleSeries::stats_path`] helper.
//!  - **host** — per-sample per-CPU host timeline through
//!    [`SampleSeries::host`] (sourced from
//!    `FailureDumpReport::per_cpu_time`).
//!  - **monitor** — per-VM-run cross-CPU host monitor aggregate
//!    through [`SampleSeries::monitor`] (sourced from
//!    `MonitorReport::summary`).
//!
//! Each projection yields a
//! [`crate::assert::temporal::SeriesField`] that
//! flows into the temporal-assertion patterns
//! (`nondecreasing`, `rate_within`, `steady_within`,
//! `converges_to`, `always_true`, `ratio_within`) defined in
//! [`crate::assert::temporal`].
//!
//! # Lifetime model
//!
//! `SampleSeries` owns the drained rows (tag, report, stats,
//! elapsed_ms, step_index), so projection closures can borrow into
//! the reports / stats without copying. Constructing a `Sample` only
//! borrows; [`SampleSeries::iter_samples`] yields `Sample<'_>`
//! bound by the series' own lifetime.

use crate::monitor::MonitorReport;
use crate::monitor::dump::FailureDumpReport;

use super::snapshot::{Snapshot, SnapshotResult};
use crate::assert::temporal::SeriesField;

mod bpf;
mod host;
mod monitor;
mod stats;

pub use bpf::BpfMapProjector;
pub use host::HostView;
pub use monitor::{ERROR_CLASS_NAMES, MonitorView, ScxEventsView};
pub use stats::{StatsPathProjector, StatsValue};

/// One captured sample: a frozen BPF snapshot paired with the
/// scx_stats JSON observed just before the freeze rendezvous,
/// labelled with its capture tag and the elapsed milliseconds since
/// `run_start`.
///
/// Periodic captures carry tags `periodic_000` … `periodic_NNN`.
/// On-demand captures (for example `Op::CaptureSnapshot` or
/// watchpoint fires) share the same drain and carry other tags. Use
/// [`SampleSeries::periodic_only`] / [`SampleSeries::periodic_ref`]
/// to restrict a series to the periodic cadence.
///
/// Yielded by [`SampleSeries::iter_samples`] (and by
/// [`SampleSeries::periodic_ref`] / [`SampleSeries::by_phase`],
/// which build on it). The struct is `#[non_exhaustive]`, so code
/// outside this crate cannot assemble one with a struct literal.
/// The `'a` lifetime ties the borrowed `tag`, `snapshot`, and `stats`
/// references back to the owning [`SampleSeries`].
#[derive(Debug)]
#[non_exhaustive]
pub struct Sample<'a> {
    /// Tag stamped onto this sample at capture time. Periodic
    /// captures use `"periodic_"` followed by a zero-padded ordinal
    /// (see `crate::vmm::freeze_coord::periodic_tag`). Non-periodic
    /// captures surfaced through the same drain keep their own tags.
    pub tag: &'a str,
    /// Wall-clock elapsed milliseconds since the coordinator's
    /// `run_start` instant, measured at stats-request completion
    /// time (pre-freeze). The value is pause-adjusted: the
    /// coordinator subtracts cumulative ScenarioPause/Resume pause
    /// time and any in-flight pause window.
    ///
    /// The coordinator captures this timestamp AFTER the scx_stats
    /// request returns (or fails) and BEFORE entering the freeze
    /// rendezvous. It therefore reflects when the running
    /// scheduler's stats were observed. BPF state is observed up to
    /// `FREEZE_RENDEZVOUS_TIMEOUT` later than this anchor.
    ///
    /// `0` when the bridge did not record a timestamp: the
    /// constructors map an absent `elapsed_ms` to `0`. This happens
    /// for legacy stores without elapsed metadata and for
    /// non-periodic captures surfaced through the same drain.
    pub elapsed_ms: u64,
    /// Frozen BPF state captured at this boundary. The view is
    /// cheap to build — accessor methods walk the underlying
    /// [`FailureDumpReport`] in place.
    pub snapshot: Snapshot<'a>,
    /// scx_stats JSON observed by a stats request issued just
    /// BEFORE the freeze rendezvous.
    ///
    /// `Err(reason)` when the stats client was not wired
    /// (`scheduler_binary` is absent) or the request failed. The
    /// carried
    /// [`MissingStatsReason`](crate::scenario::snapshot::MissingStatsReason)
    /// identifies the specific failure mode (no scheduler, relay
    /// rejected, watchdog cancelled, scheduler errno, etc.). Series
    /// built by [`SampleSeries::from_drained`] map every absent
    /// stats value to `NoSchedulerBinary`.
    ///
    /// [`SampleSeries::stats`] surfaces this `Err` as a per-sample
    /// [`SnapshotError::MissingStats`](crate::scenario::snapshot::SnapshotError::MissingStats)
    /// slot in the resulting [`SeriesField`] instead of silently
    /// dropping the sample. Temporal patterns then handle that
    /// error per their own policy:
    /// - gap-tolerant patterns (`nondecreasing`, `rate_within`,
    ///   `steady_within`, `converges_to`, `ratio_within`) skip the
    ///   sample with a rendered Note;
    /// - strict patterns (`always_true`, `each`) fail the
    ///   assertion, so a stats-coverage gap can never silently slip
    ///   past the call site.
    pub stats: Result<&'a serde_json::Value, &'a crate::scenario::snapshot::MissingStatsReason>,
    /// Scenario phase index the freeze coordinator stamped onto
    /// this sample at capture time, in the framework's 1-indexed
    /// phase encoding: `0` is the BASELINE settle window and
    /// `1..=N` align with scenario Step ordinals.
    ///
    /// `None` for fixture-injected samples that took the unstamped
    /// legacy bridge paths:
    /// [`super::snapshot::SnapshotBridge::capture`],
    /// [`super::snapshot::SnapshotBridge::store`], and
    /// [`super::snapshot::SnapshotBridge::store_with_stats`].
    /// [`SampleSeries::from_drained`] always produces `None` here.
    ///
    /// Production captures always carry `Some(idx)`: the
    /// periodic-fire path and the on-demand `Op::CaptureSnapshot` /
    /// `Op::WatchSnapshot` apply arms stamp it. Read by
    /// [`SampleSeries::by_phase`] to bucket samples per scenario
    /// phase for the phase-aware aggregator.
    pub step_index: Option<u16>,
}

/// Ordered collection of [`Sample`]s drained from a
/// [`SnapshotBridge`](super::snapshot::SnapshotBridge) after a VM
/// run completes. It owns the underlying rows, so projection
/// closures can borrow into the reports / stats without copying.
///
/// Construction:
/// - Production code passes the typed output of
///   [`super::snapshot::SnapshotBridge::drain_ordered_with_stats`]
///   to [`Self::from_drained_typed`]. This preserves the specific
///   stats-failure reason and the stamped phase index.
/// - Fixtures use [`Self::from_drained`], which accepts a simpler
///   `Option`-shaped tuple.
///
/// Non-periodic tags (e.g. `Op::CaptureSnapshot` captures) coexist
/// in the drain output and are kept in order. The projection helpers
/// tolerate them. The typical pattern is to pre-filter to periodic
/// tags via [`Self::periodic_only`] (or the borrowed
/// [`Self::periodic_ref`]) before asserting.
#[derive(Debug)]
pub struct SampleSeries {
    // Owned per-sample rows, in the order the constructor received
    // them.
    rows: Vec<SampleRow>,
    /// Host-side monitor report for the VM run that produced this
    /// series. `None` when the monitor did not run (host-only tests,
    /// early VM failure) or when the constructor was passed `None`
    /// for the monitor argument.
    ///
    /// Aggregates inside the report refer to THAT series' monitoring
    /// window only; no cross-series merge is supported. Surfaced via
    /// [`Self::monitor`], which wraps it in a borrowed
    /// [`MonitorView`] for typed projection.
    monitor: Option<MonitorReport>,
}

/// Owned per-sample row stored inside [`SampleSeries`].
///
/// It mirrors the shape of
/// [`super::snapshot::SnapshotBridge::drain_ordered_with_stats`],
/// with two normalisations:
/// - the timestamp is stored as a plain `u64`, and both constructors
///   map a missing value to `0`, so iteration never has to handle
///   `Option<u64>`;
/// - the stats slot is always a typed `Result`.
#[derive(Debug)]
struct SampleRow {
    // Capture tag (`periodic_NNN` or an on-demand / watchpoint tag).
    tag: String,
    // Frozen BPF dump; `Sample::snapshot` borrows a view into it.
    report: FailureDumpReport,
    // scx_stats JSON, or the reason it is missing.
    // `SampleSeries::from_drained` maps absent stats to
    // `MissingStatsReason::NoSchedulerBinary`.
    stats: Result<serde_json::Value, crate::scenario::snapshot::MissingStatsReason>,
    // Pause-adjusted elapsed milliseconds; `0` when the source had none.
    elapsed_ms: u64,
    /// Scenario phase index stamped at capture time by the
    /// step-aware bridge entry points, mirrored from
    /// [`super::snapshot::DrainedSnapshotEntry::step_index`].
    /// `None` for unstamped legacy / fixture captures (see
    /// [`Sample::step_index`] for the surfaced semantic).
    step_index: Option<u16>,
}

/// Shared projection scaffolding used by every axis (bpf / stats /
/// host per-CPU).
///
/// Walks `rows` exactly once, front to back. For each row it records
/// the tag, the elapsed-ms timestamp, and the optional phase stamp,
/// then asks `row_to_slot` for that row's value (or per-sample
/// `SnapshotError`). Each row pushes exactly once into each of the
/// four parallel vectors. They therefore always share one length
/// when handed to [`SeriesField::from_parts_with_phases`], so its
/// length-parity invariant cannot trip.
fn build_series_field<T>(
    rows: &[SampleRow],
    label: impl Into<String>,
    mut row_to_slot: impl FnMut(&SampleRow) -> SnapshotResult<T>,
) -> SeriesField<T> {
    let count = rows.len();
    let (mut tag_col, mut elapsed_col, mut phase_col, mut value_col) = (
        Vec::<String>::with_capacity(count),
        Vec::<u64>::with_capacity(count),
        Vec::<Option<crate::assert::Phase>>::with_capacity(count),
        Vec::<SnapshotResult<T>>::with_capacity(count),
    );

    rows.iter().for_each(|row| {
        tag_col.push(row.tag.clone());
        elapsed_col.push(row.elapsed_ms);
        // `step_index` already uses the 1-indexed encoding that
        // `crate::assert::Phase` wraps (BASELINE = 0, Step[k] = k + 1).
        // Forwarding it lets `SeriesField::phase` / `value_at_phase` /
        // `last_per_phase` / `ratio_across_phases` see live phase
        // stamps. Unstamped rows (the `from_drained` fixture path) stay
        // `None`.
        phase_col.push(row.step_index.map(crate::assert::Phase::from));
        value_col.push(row_to_slot(row));
    });

    SeriesField::from_parts_with_phases(label, tag_col, elapsed_col, value_col, phase_col)
}

impl SampleSeries {
    /// True when `tag` carries the freeze coordinator's periodic
    /// prefix (`periodic_000` … `periodic_NNN`).
    ///
    /// Single source of truth for [`Self::periodic_only`] and
    /// [`Self::periodic_ref`], so the owned and borrowed filters
    /// cannot drift apart.
    fn has_periodic_tag_prefix(tag: &str) -> bool {
        tag.starts_with("periodic_")
    }

    /// Build a series from the simple fixture-shaped drain tuple
    /// `(tag, report, stats, elapsed_ms)`.
    ///
    /// Every entry is preserved in the order supplied, including
    /// non-periodic tags. Callers that want the periodic-only view
    /// chain `.periodic_only()`.
    ///
    /// Missing pieces are normalised on the way in:
    /// - `stats == None` becomes
    ///   `Err(MissingStatsReason::NoSchedulerBinary)`;
    /// - `elapsed_ms == None` becomes `0`;
    /// - every row's `step_index` is `None`, so [`Self::by_phase`]
    ///   buckets all of them under key `0`.
    ///
    /// Production callers holding the typed bridge drain should use
    /// [`Self::from_drained_typed`], which preserves the specific
    /// stats failure mode and the stamped phase index.
    ///
    /// `monitor` is the per-VM-run `MonitorReport`, typically
    /// `result.monitor.clone()` from a `VmResult`. Pass `None` when
    /// the monitor did not run (host-only tests, early VM failure).
    /// Surfaced via [`Self::monitor`] for typed projection of the
    /// summary + scx_events + (future) per-sample timelines.
    pub fn from_drained(
        drained: Vec<(
            String,
            FailureDumpReport,
            Option<serde_json::Value>,
            Option<u64>,
        )>,
        monitor: Option<MonitorReport>,
    ) -> Self {
        let rows = drained
            .into_iter()
            .map(|(tag, report, stats, elapsed_ms)| SampleRow {
                tag,
                report,
                // Test/synthetic caller convention: `None` collapses to
                // the `NoSchedulerBinary` reason. That is the shape every
                // fixture has historically modelled: no scheduler client
                // wired, no stats. Callers that hold a typed stats error
                // use `from_drained_typed` instead, which preserves the
                // specific failure mode. The reason is a unit variant, so
                // the eager `ok_or` costs nothing.
                stats: stats.ok_or(
                    crate::scenario::snapshot::MissingStatsReason::NoSchedulerBinary,
                ),
                elapsed_ms: elapsed_ms.unwrap_or(0),
                // Unstamped fixture path: samples surface with
                // `step_index = None` and fall under the by_phase
                // fallback bucket. Production callers thread the
                // bridge-stamped index via from_drained_typed.
                step_index: None,
            })
            .collect();
        Self { rows, monitor }
    }

    /// Production-path constructor.
    ///
    /// Takes the typed
    /// [`Result<serde_json::Value, MissingStatsReason>`](crate::scenario::snapshot::MissingStatsReason)
    /// shape returned by
    /// [`SnapshotBridge::drain_ordered_with_stats`](crate::scenario::snapshot::SnapshotBridge::drain_ordered_with_stats).
    /// It preserves the specific failure mode (relay error, scheduler
    /// errno, watchdog cancellation, etc.) and the stamped
    /// `step_index`. A missing elapsed timestamp becomes `0`.
    ///
    /// Use this when the caller has access to the bridge drain
    /// output. Tests prefer [`Self::from_drained`], which accepts the
    /// simpler `Option` shape and collapses absent →
    /// `NoSchedulerBinary`.
    pub fn from_drained_typed(
        drained: Vec<crate::scenario::snapshot::DrainedSnapshotEntry>,
        monitor: Option<MonitorReport>,
    ) -> Self {
        let rows = drained
            .into_iter()
            .map(|entry| {
                // `..` ignores any entry fields the series does not
                // store.
                let crate::scenario::snapshot::DrainedSnapshotEntry {
                    tag,
                    report,
                    stats,
                    elapsed_ms,
                    step_index,
                    ..
                } = entry;
                SampleRow {
                    tag,
                    report,
                    stats,
                    elapsed_ms: elapsed_ms.unwrap_or(0),
                    step_index,
                }
            })
            .collect();
        Self { rows, monitor }
    }

    /// Empty series with no monitor report. Useful for tests and for
    /// the no-periodic-capture case where every assertion vacuously
    /// passes.
    pub fn empty() -> Self {
        Self {
            rows: Vec::new(),
            monitor: None,
        }
    }

    /// True when no samples are present.
    pub fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }

    /// Number of samples in the series.
    pub fn len(&self) -> usize {
        self.rows.len()
    }

    /// Filter the series to entries whose tag begins with
    /// `"periodic_"`, preserving their relative order and the
    /// monitor report.
    ///
    /// Periodic captures are the only entries the temporal-assertion
    /// patterns are designed for. On-demand `Op::CaptureSnapshot` and
    /// watchpoint-fire captures share the bridge's tag namespace and
    /// would otherwise mix into the timeline as off-cadence outliers.
    ///
    /// Consumes `self` because the owned row vec is filtered in place.
    /// When a borrowed view is needed instead, use
    /// [`Self::periodic_ref`], which iterates the same rows without
    /// taking ownership.
    #[must_use = "periodic_only returns a filtered series; bind the result"]
    pub fn periodic_only(self) -> Self {
        let Self { mut rows, monitor } = self;
        rows.retain(|row| Self::has_periodic_tag_prefix(&row.tag));
        Self { rows, monitor }
    }

    /// Borrowed equivalent of [`Self::periodic_only`]: yields a
    /// borrowed-view iterator over [`Sample`]s whose tag starts
    /// with `"periodic_"`, without consuming the series. Use when
    /// a single test asserts on both periodic-only and
    /// all-captures views from the same series.
    pub fn periodic_ref(&self) -> impl Iterator<Item = Sample<'_>> {
        self.iter_samples()
            .filter(|s| Self::has_periodic_tag_prefix(s.tag))
    }

    /// Iterate over [`Sample`] views borrowing into this series, in
    /// row order.
    ///
    /// Each yielded `Sample<'_>` carries the tag, elapsed-ms, a
    /// borrowed [`Snapshot`], and the per-sample phase step index.
    /// It also carries the stats as a borrowed
    /// `Result<&serde_json::Value, &MissingStatsReason>`.
    pub fn iter_samples(&self) -> impl Iterator<Item = Sample<'_>> {
        self.rows.iter().map(|r| Sample {
            tag: r.tag.as_str(),
            elapsed_ms: r.elapsed_ms,
            snapshot: Snapshot::new(&r.report),
            stats: r.stats.as_ref(),
            step_index: r.step_index,
        })
    }

    /// Group samples by their stamped scenario phase.
    ///
    /// The returned map is keyed by `step_index` in the 1-indexed
    /// phase encoding: `0` is BASELINE and `1..=N` align with
    /// scenario Step ordinals. Each entry is the ordered run of
    /// samples that fell in that phase, preserving the iteration
    /// order produced by [`Self::iter_samples`]. Keys iterate in
    /// ascending order (`BTreeMap`).
    ///
    /// Samples without a stamped step index fall under key `0`, the
    /// same bucket BASELINE samples land in. These come from the
    /// unstamped fixture path:
    /// [`super::snapshot::SnapshotBridge::capture`],
    /// [`super::snapshot::SnapshotBridge::store`],
    /// [`super::snapshot::SnapshotBridge::store_with_stats`], or
    /// [`Self::from_drained`].
    ///
    /// The fixture / BASELINE collision is acceptable: from the
    /// bucketer's perspective, both represent pre-first-Step (or
    /// unstamped) state. Callers that need to distinguish the two can
    /// inspect `Sample::step_index` directly.
    ///
    /// The phase-aware aggregator consumes this map to compute
    /// per-phase metric reductions (Counter `last - first` delta,
    /// Gauge / Peak / Timestamp via `crate::stats::aggregate_samples`).
    pub fn by_phase(&self) -> std::collections::BTreeMap<u16, Vec<Sample<'_>>> {
        let mut by_phase: std::collections::BTreeMap<u16, Vec<Sample<'_>>> =
            std::collections::BTreeMap::new();
        for sample in self.iter_samples() {
            let key = sample.step_index.unwrap_or(0);
            by_phase.entry(key).or_default().push(sample);
        }
        by_phase
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::btf_render::{RenderedMember, RenderedValue};
    use crate::monitor::dump::{FailureDumpMap, FailureDumpReport, SCHEMA_SINGLE};

    /// Minimal single-map dump whose `.bss` carries `nr_dispatched =
    /// value` and `stall = 0`.
    fn synthetic_report(value: u64) -> FailureDumpReport {
        let bss_value = RenderedValue::Struct {
            type_name: Some(".bss".into()),
            members: vec![
                RenderedMember {
                    name: "nr_dispatched".into(),
                    value: RenderedValue::Uint { bits: 64, value },
                },
                RenderedMember {
                    name: "stall".into(),
                    value: RenderedValue::Uint { bits: 8, value: 0 },
                },
            ],
        };
        let bss_map = FailureDumpMap {
            name: "scx_obj.bss".into(),
            map_kva: 0,
            map_type: 2,
            value_size: 16,
            max_entries: 1,
            value: Some(bss_value),
            entries: Vec::new(),
            percpu_entries: Vec::new(),
            percpu_hash_entries: Vec::new(),
            arena: None,
            ringbuf: None,
            stack_trace: None,
            fd_array: None,
            error: None,
        };
        FailureDumpReport {
            schema: SCHEMA_SINGLE.to_string(),
            active_map_kvas: Vec::new(),
            maps: vec![bss_map],
            ..Default::default()
        }
    }

    /// scx_stats-shaped JSON with a top-level `busy` and a nested
    /// layer utilisation derived from it.
    fn synthetic_stats(busy: f64) -> serde_json::Value {
        serde_json::json!({
            "busy": busy,
            "antistall": 0,
            "layers": {
                "batch": { "util": busy * 0.5 }
            }
        })
    }

    #[test]
    fn from_drained_preserves_order() {
        let drained = vec![
            (
                "periodic_000".to_string(),
                synthetic_report(10),
                Some(synthetic_stats(50.0)),
                Some(100),
            ),
            (
                "periodic_001".to_string(),
                synthetic_report(20),
                Some(synthetic_stats(60.0)),
                Some(200),
            ),
        ];
        let series = SampleSeries::from_drained(drained, None);
        assert_eq!(series.len(), 2);
        let tags: Vec<&str> = series.iter_samples().map(|s| s.tag).collect();
        assert_eq!(tags, vec!["periodic_000", "periodic_001"]);
        let elapsed: Vec<u64> = series.iter_samples().map(|s| s.elapsed_ms).collect();
        assert_eq!(elapsed, vec![100, 200]);
        assert!(series.iter_samples().all(|s| s.stats.is_ok()));
    }

    #[test]
    fn from_drained_defaults_missing_metadata() {
        let drained = vec![
            (
                "periodic_000".to_string(),
                synthetic_report(10),
                None,
                None,
            ),
            (
                "periodic_001".to_string(),
                synthetic_report(20),
                Some(synthetic_stats(60.0)),
                Some(200),
            ),
        ];
        let series = SampleSeries::from_drained(drained, None);
        let samples: Vec<Sample<'_>> = series.iter_samples().collect();
        assert_eq!(samples.len(), 2);
        // An absent timestamp collapses to 0; a present one passes through.
        assert_eq!(samples[0].elapsed_ms, 0);
        assert_eq!(samples[1].elapsed_ms, 200);
        // Absent stats collapse to the fixture-convention reason.
        assert!(matches!(
            samples[0].stats,
            Err(crate::scenario::snapshot::MissingStatsReason::NoSchedulerBinary)
        ));
        assert!(samples[1].stats.is_ok());
        // The fixture constructor never stamps a phase.
        assert!(samples.iter().all(|s| s.step_index.is_none()));
    }

    #[test]
    fn periodic_only_filters_non_periodic_tags() {
        let drained = vec![
            (
                "periodic_000".to_string(),
                synthetic_report(10),
                None,
                Some(100),
            ),
            (
                "user_watchpoint_kind".to_string(),
                synthetic_report(99),
                None,
                Some(150),
            ),
            (
                "periodic_001".to_string(),
                synthetic_report(20),
                None,
                Some(200),
            ),
        ];
        let series = SampleSeries::from_drained(drained, None);
        // The borrowed filter must agree with the owned one and leave
        // the series intact.
        {
            let borrowed: Vec<&str> = series.periodic_ref().map(|s| s.tag).collect();
            assert_eq!(borrowed, vec!["periodic_000", "periodic_001"]);
        }
        assert_eq!(series.len(), 3);

        let series = series.periodic_only();
        assert_eq!(series.len(), 2);
        let tags: Vec<&str> = series.iter_samples().map(|s| s.tag).collect();
        assert_eq!(tags, vec!["periodic_000", "periodic_001"]);
    }

    #[test]
    fn by_phase_buckets_unstamped_samples_under_zero() {
        let drained = vec![
            (
                "periodic_000".to_string(),
                synthetic_report(10),
                None,
                Some(100),
            ),
            (
                "periodic_001".to_string(),
                synthetic_report(20),
                None,
                Some(200),
            ),
        ];
        let series = SampleSeries::from_drained(drained, None);
        let buckets = series.by_phase();
        assert_eq!(buckets.keys().copied().collect::<Vec<u16>>(), vec![0]);
        let tags: Vec<&str> = buckets[&0].iter().map(|s| s.tag).collect();
        assert_eq!(tags, vec!["periodic_000", "periodic_001"]);
    }

    #[test]
    fn empty_series_has_no_samples() {
        let series = SampleSeries::empty();
        assert!(series.is_empty());
        assert_eq!(series.len(), 0);
        assert_eq!(series.iter_samples().count(), 0);
        assert_eq!(series.periodic_ref().count(), 0);
        assert!(series.by_phase().is_empty());
    }
}