use anyhow::{Context, Result};
use std::path::{Path, PathBuf};
use crate::assert::AssertResult;
use crate::timeline::StimulusEvent;
use crate::vmm;
use super::output::{
classify_init_stage, extract_exit_from_dump_trace, extract_kernel_version,
extract_panic_message, extract_sched_ext_dump, format_console_diagnostics,
parse_assert_result_from_drain, sched_log_fingerprint,
};
use super::probe::attempt_auto_repro;
use super::profraw::write_profraw;
use super::sidecar::{write_sidecar, write_skip_sidecar};
use super::topo::TopoOverride;
use super::{KtstrTestEntry, SchedulerSpec, Topology};
mod kernel;
#[cfg(feature = "llm")]
mod llm_extract;
mod post_vm;
#[cfg(feature = "llm")]
pub(crate) use llm_extract::host_side_llm_extract;
pub use post_vm::post_vm_skip;
pub(crate) use post_vm::{
ExpectAutoReproSatisfied, HostSkipRequest, LLM_MODEL_LOAD_FAILED_PREFIX,
PostVmAssertionFailure, ScxBpfErrorMatcherMismatch, record_skip_sidecar, run_post_vm_callbacks,
should_skip_on_llm_model_load_failure,
};
mod reporting;
mod scheduler;
use crate::verifier::{SCHED_OUTPUT_START, parse_sched_output};
pub use kernel::{KernelUnavailable, resolve_test_kernel};
pub(crate) use kernel::{acquire_test_kernel_lock_if_cached, ensure_kvm};
pub use scheduler::{ResolveSource, resolve_scheduler};
pub(crate) use scheduler::{dedupe_include_files, resolve_staged_schedulers_strict};
use super::runtime::{config_content_parts, config_file_parts, verbose, vm_timeout_from_entry};
/// Failure text for a run that timed out with no result on either the bulk
/// port or COM2.
/// NOTE(review): the use site is outside this view; presumably shared so
/// that other modules and tests can match on the exact wording — confirm.
pub(crate) const ERR_TIMED_OUT_NO_RESULT: &str = "timed out (no result via bulk port or COM2)";
/// Headline used when the guest-side scenario passed but the host-side
/// monitor-threshold evaluation failed; interpolated into the error built by
/// `evaluate_vm_result`.
pub(crate) const ERR_MONITOR_FAILED_AFTER_SCENARIO: &str = "passed scenario but monitor failed";
/// Failure text for a run where no `AssertResult` arrived from the guest.
/// The trailing `\` continuations join the three source lines into a single
/// line of text at compile time.
pub(crate) const ERR_NO_TEST_RESULT_FROM_GUEST: &str = "no test result received from guest \
(no AssertResult arrived via bulk port or COM2; check kernel log and \
scheduler exit status)";
/// Failure text for a test function that produced no output at all.
pub(crate) const ERR_NO_TEST_FUNCTION_OUTPUT: &str =
"test function produced no output (no test result found)";
/// Prefix of guest-crash failure messages.
/// NOTE(review): presumably followed by the crash description — confirm at
/// the use site, which is outside this view.
pub(crate) const ERR_GUEST_CRASHED_PREFIX: &str = "guest crashed:";
/// Runs a single ktstr test and post-processes the outcome of
/// [`run_ktstr_test_inner_impl`].
///
/// * A successful verdict is returned unchanged.
/// * Any error classified as resource contention records a skip sidecar for
///   `entry` before the error is handled further.
/// * An error carrying the [`ExpectAutoReproSatisfied`] marker — and *not*
///   the [`PostVmAssertionFailure`] marker — is printed to stderr and turned
///   into a passing [`AssertResult`].
/// * Every other error is returned to the caller as-is.
///
/// # Errors
///
/// Returns whatever error the implementation produced, except for the
/// expect-auto-repro inversion described above.
pub(crate) fn run_ktstr_test_inner(
    entry: &KtstrTestEntry,
    topo: Option<&TopoOverride>,
) -> Result<AssertResult> {
    // Success needs no post-processing at all.
    let failure = match run_ktstr_test_inner_impl(entry, topo) {
        Ok(verdict) => return Ok(verdict),
        Err(failure) => failure,
    };

    if super::is_resource_contention(&failure) {
        record_skip_sidecar(entry);
    }

    // A post-VM assertion failure always wins over the auto-repro inversion,
    // so the inversion only applies when that marker is absent.
    let post_vm_failed = failure.downcast_ref::<PostVmAssertionFailure>().is_some();
    let repro_satisfied = failure.downcast_ref::<ExpectAutoReproSatisfied>().is_some();
    if repro_satisfied && !post_vm_failed {
        eprintln!("{failure:#}");
        return Ok(AssertResult::pass());
    }

    Err(failure)
}
/// Writes a placeholder failure-dump JSON file at `path` unless something
/// already exists there.
///
/// The placeholder's reason string names the init stage classified from the
/// guest messages and, when one can be extracted from the scheduler log and
/// the sched_ext dump, appends a bug summary.
///
/// The file is written to a sibling `*.json.tmp` file first and then renamed
/// onto `path`. Every step is best-effort: failures are reported through
/// `tracing::warn!` and the function simply returns (removing the temp file
/// where one may have been left behind).
fn write_placeholder_failure_dump_if_missing(path: &std::path::Path, result: &vmm::VmResult) {
    use std::io::Write;

    if path.exists() {
        return;
    }

    // Gather the ingredients of the reason string.
    let guest_msgs = result.guest_messages.as_ref();
    let stage_label = crate::test_support::output::classify_init_stage(guest_msgs);
    let merged_log = crate::verifier::concat_sched_log_chunks(guest_msgs);
    // Prefer the scheduler log chunks; fall back to the raw VM output.
    let log_text: &str = if merged_log.is_empty() {
        &result.output
    } else {
        &merged_log
    };
    let dump_text = extract_sched_ext_dump(&result.stderr).unwrap_or_default();

    let mut reason = format!(
        "test failed at stage `{stage_label}`; no BPF state captured \
         (probe did not attach before failure)"
    );
    if let Some(summary) = crate::test_support::output::extract_bug_summary(log_text, &dump_text) {
        reason = format!("{reason}. BUG SUMMARY: {summary}");
    }

    let stub = crate::monitor::dump::FailureDumpReport::placeholder(reason);
    let Ok(json) = serde_json::to_string_pretty(&stub).inspect_err(|e| {
        tracing::warn!(
            error = %e,
            path = %path.display(),
            "eval: failed to serialize placeholder failure-dump"
        );
    }) else {
        return;
    };

    if let Some(dir) = path.parent() {
        if let Err(e) = std::fs::create_dir_all(dir) {
            tracing::warn!(
                error = %e,
                path = %dir.display(),
                "eval: failed to create parent dir for placeholder failure-dump"
            );
            return;
        }
    }

    // Stage the content next to the target so the final rename replaces the
    // target in one step.
    let tmp = path.with_extension("json.tmp");
    let Ok(mut file) = std::fs::File::create(&tmp).inspect_err(|e| {
        tracing::warn!(
            error = %e,
            path = %tmp.display(),
            "eval: failed to create placeholder failure-dump tmp file"
        );
    }) else {
        return;
    };

    if let Err(e) = file.write_all(json.as_bytes()) {
        tracing::warn!(
            error = %e,
            path = %tmp.display(),
            "eval: failed to write placeholder failure-dump"
        );
        let _ = std::fs::remove_file(&tmp);
        return;
    }

    // An fsync failure is only reported; the rename is still attempted.
    if let Err(e) = file.sync_all() {
        tracing::warn!(
            error = %e,
            path = %tmp.display(),
            "eval: failed to fsync placeholder failure-dump tmp file"
        );
    }
    drop(file);

    if let Err(e) = std::fs::rename(&tmp, path) {
        tracing::warn!(
            error = %e,
            tmp = %tmp.display(),
            target = %path.display(),
            "eval: failed to rename placeholder failure-dump tmp file"
        );
        let _ = std::fs::remove_file(&tmp);
    }
}
fn run_ktstr_test_inner_impl(
entry: &KtstrTestEntry,
topo: Option<&TopoOverride>,
) -> Result<AssertResult> {
entry.validate().context("KtstrTestEntry validation")?;
if let Some(t) = topo {
t.validate().context("TopoOverride validation")?;
}
static FIRST_RAYON_CPUSET: std::sync::OnceLock<Vec<usize>> = std::sync::OnceLock::new();
let host_cpus = crate::vmm::host_topology::host_allowed_cpus();
if !host_cpus.is_empty() {
let cpus = host_cpus.clone();
let n = cpus.len();
let range = format!("{}-{}", cpus[0], cpus[n - 1]);
let cpus_for_handler = cpus.clone();
let built = rayon::ThreadPoolBuilder::new()
.num_threads(n.min(32))
.start_handler(move |_idx| {
let mut cpuset = nix::sched::CpuSet::new();
for &cpu in &cpus_for_handler {
let _ = cpuset.set(cpu);
}
let _ = nix::sched::sched_setaffinity(nix::unistd::Pid::from_raw(0), &cpuset);
})
.build_global()
.is_ok();
if built {
let _ = FIRST_RAYON_CPUSET.set(cpus);
eprintln!("no_perf_mode: rayon pool pinned to {n} CPUs ({range})");
} else if let Some(first) = FIRST_RAYON_CPUSET.get()
&& first != &host_cpus
{
let first_n = first.len();
let first_range = if first_n > 0 {
format!("{}-{}", first[0], first[first_n - 1])
} else {
"empty".to_string()
};
eprintln!(
"no_perf_mode: WARNING: rayon pool already pinned to {first_n} CPUs \
({first_range}); requested {n} CPUs ({range}) won't take effect — \
build_global is one-shot per process",
);
}
}
if entry.performance_mode && super::runtime::no_perf_mode_active() {
const REASON: &str =
"test requires performance_mode but --no-perf-mode or KTSTR_NO_PERF_MODE is active";
crate::report::test_skip(format_args!("{}: {REASON}", entry.name));
record_skip_sidecar(entry);
return Ok(AssertResult::skip(REASON));
}
ensure_kvm()?;
let kernel = resolve_test_kernel()?;
let kernel_lock = acquire_test_kernel_lock_if_cached(&kernel)?;
let scheduler = resolve_scheduler(&entry.scheduler.binary)?.0;
let resolved_staged = resolve_staged_schedulers_strict(entry, |spec| {
resolve_scheduler(spec).map(|(opt, _src)| opt)
})?;
let ktstr_bin = crate::resolve_current_exe()?;
let guest_args = vec![
"run".to_string(),
"--ktstr-test-fn".to_string(),
entry.name.to_string(),
];
let cmdline_extra = super::runtime::build_cmdline_extra(entry);
let (vm_topology, memory_mib) = super::runtime::resolve_vm_topology(entry, topo);
let no_perf_mode = super::runtime::no_perf_mode_for_entry(entry);
let primary_dump_path =
super::sidecar::sidecar_dir().join(format!("{}.failure-dump.json", entry.name));
let repro_dump_path =
super::sidecar::sidecar_dir().join(format!("{}.repro.failure-dump.json", entry.name));
for stale in [&primary_dump_path, &repro_dump_path] {
match std::fs::remove_file(stale) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => tracing::warn!(
path = %stale.display(),
error = %e,
"eval: failed to pre-clear stale failure-dump file"
),
}
}
let mut builder = super::runtime::build_vm_builder_base(
entry,
&kernel,
&ktstr_bin,
scheduler.as_deref(),
&resolved_staged,
vm_topology,
memory_mib,
&cmdline_extra,
&guest_args,
no_perf_mode,
)
.failure_dump_path(primary_dump_path.clone())
.performance_mode(entry.performance_mode);
let merged_assert = crate::assert::Assert::default_checks()
.merge(&entry.scheduler.assert)
.merge(&entry.assert);
#[cfg(feature = "wprof")]
{
builder = super::runtime::attach_wprof_if_requested(builder, entry, "primary")?;
}
if let SchedulerSpec::KernelBuiltin { enable, disable } = &entry.scheduler.binary {
builder = builder.sched_enable_cmds(enable);
builder = builder.sched_disable_cmds(disable);
}
if entry.scheduler.has_bpf_scheduler() {
builder = builder.monitor_thresholds(merged_assert.monitor_thresholds());
}
let mut sched_args: Vec<String> = Vec::new();
let declarative_specs: Vec<std::path::PathBuf> = entry
.all_include_files()
.into_iter()
.map(std::path::PathBuf::from)
.collect();
let mut resolved_includes: Vec<(String, std::path::PathBuf, &'static str)> =
if declarative_specs.is_empty() {
Vec::new()
} else {
crate::cli::resolve_include_files(&declarative_specs)
.context("resolving declarative include_files from Payload definitions")?
.into_iter()
.map(|(a, h)| (a, h, "declarative"))
.collect()
};
if let Some((archive_path, host_path, guest_path)) = config_file_parts(entry) {
resolved_includes.push((archive_path, host_path, "scheduler config_file"));
sched_args.push("--config".to_string());
sched_args.push(guest_path);
}
if let Some((archive_path, host_path, _guest_path, args)) = config_content_parts(entry) {
resolved_includes.push((archive_path, host_path, "inline config_content"));
sched_args.extend(args);
}
let unioned = dedupe_include_files(&resolved_includes)?;
if !unioned.is_empty() {
builder = builder.include_files(unioned);
}
super::runtime::append_base_sched_args(entry, &mut sched_args);
if !sched_args.is_empty() {
builder = builder.sched_args(&sched_args);
}
#[cfg(target_arch = "x86_64")]
struct CpuStateGuard {
#[allow(dead_code)]
xsave_buf: Vec<u8>,
align_ptr: *mut u8,
has_xsave: bool,
has_fsgsbase: bool,
has_pku: bool,
fsbase: u64,
gsbase: u64,
pkru: u32,
sigmask: libc::sigset_t,
sigrtmin_action: libc::sigaction,
}
#[cfg(target_arch = "x86_64")]
impl Drop for CpuStateGuard {
fn drop(&mut self) {
unsafe {
if self.has_xsave {
core::arch::asm!(
"xrstor [{}]",
in(reg) self.align_ptr,
in("eax") 0xFFFF_FFFFu32,
in("edx") 0xFFFF_FFFFu32,
options(nostack),
);
}
if self.has_fsgsbase {
core::arch::asm!("wrfsbase {}", in(reg) self.fsbase, options(nostack));
core::arch::asm!("wrgsbase {}", in(reg) self.gsbase, options(nostack));
}
if self.has_pku {
core::arch::asm!(
"xor ecx, ecx", "wrpkru",
in("eax") self.pkru, in("edx") 0u32, out("ecx") _,
options(nostack),
);
}
libc::pthread_sigmask(libc::SIG_SETMASK, &self.sigmask, std::ptr::null_mut());
libc::sigaction(
libc::SIGRTMIN(),
&self.sigrtmin_action,
std::ptr::null_mut(),
);
}
}
}
#[cfg(target_arch = "aarch64")]
#[repr(C, align(16))]
struct FpsimdState {
v: [u128; 32],
fpcr: u64,
fpsr: u64,
}
#[cfg(target_arch = "aarch64")]
const _: () = {
assert!(std::mem::offset_of!(FpsimdState, v) == 0);
assert!(std::mem::offset_of!(FpsimdState, fpcr) == 512);
assert!(std::mem::offset_of!(FpsimdState, fpsr) == 520);
assert!(std::mem::align_of::<FpsimdState>() == 16);
};
#[cfg(target_arch = "aarch64")]
struct CpuStateGuard {
fpsimd: Option<Box<FpsimdState>>,
sigmask: libc::sigset_t,
sigrtmin_action: libc::sigaction,
}
#[cfg(target_arch = "aarch64")]
impl Drop for CpuStateGuard {
fn drop(&mut self) {
unsafe {
if let Some(fp) = self.fpsimd.as_deref() {
let ptr = fp as *const FpsimdState as *const u8;
core::arch::asm!(
"ldp q0, q1, [{p}, #0]",
"ldp q2, q3, [{p}, #32]",
"ldp q4, q5, [{p}, #64]",
"ldp q6, q7, [{p}, #96]",
"ldp q8, q9, [{p}, #128]",
"ldp q10, q11, [{p}, #160]",
"ldp q12, q13, [{p}, #192]",
"ldp q14, q15, [{p}, #224]",
"ldp q16, q17, [{p}, #256]",
"ldp q18, q19, [{p}, #288]",
"ldp q20, q21, [{p}, #320]",
"ldp q22, q23, [{p}, #352]",
"ldp q24, q25, [{p}, #384]",
"ldp q26, q27, [{p}, #416]",
"ldp q28, q29, [{p}, #448]",
"ldp q30, q31, [{p}, #480]",
"ldr {tmp}, [{p}, #512]",
"msr FPCR, {tmp}",
"ldr {tmp}, [{p}, #520]",
"msr FPSR, {tmp}",
p = in(reg) ptr,
tmp = out(reg) _,
out("v0") _, out("v1") _, out("v2") _, out("v3") _,
out("v4") _, out("v5") _, out("v6") _, out("v7") _,
out("v8") _, out("v9") _, out("v10") _, out("v11") _,
out("v12") _, out("v13") _, out("v14") _, out("v15") _,
out("v16") _, out("v17") _, out("v18") _, out("v19") _,
out("v20") _, out("v21") _, out("v22") _, out("v23") _,
out("v24") _, out("v25") _, out("v26") _, out("v27") _,
out("v28") _, out("v29") _, out("v30") _, out("v31") _,
options(nostack, readonly),
);
}
libc::pthread_sigmask(libc::SIG_SETMASK, &self.sigmask, std::ptr::null_mut());
libc::sigaction(
libc::SIGRTMIN(),
&self.sigrtmin_action,
std::ptr::null_mut(),
);
}
}
}
#[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
struct CpuStateGuard {
sigmask: libc::sigset_t,
sigrtmin_action: libc::sigaction,
}
#[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
impl Drop for CpuStateGuard {
fn drop(&mut self) {
unsafe {
libc::pthread_sigmask(libc::SIG_SETMASK, &self.sigmask, std::ptr::null_mut());
libc::sigaction(
libc::SIGRTMIN(),
&self.sigrtmin_action,
std::ptr::null_mut(),
);
}
}
}
#[cfg(target_arch = "x86_64")]
let _cpu_guard = unsafe {
let has_xsave = std::arch::is_x86_feature_detected!("xsave");
let has_fsgsbase = core::arch::x86_64::__cpuid_count(7, 0).ebx & 1 != 0;
let cpuid7 = core::arch::x86_64::__cpuid_count(7, 0);
let has_pku = (cpuid7.ecx & (1 << 3)) != 0 && (cpuid7.ecx & (1 << 4)) != 0;
let mut fsbase: u64 = 0;
let mut gsbase: u64 = 0;
if has_fsgsbase {
core::arch::asm!("rdfsbase {}", out(reg) fsbase, options(nostack));
core::arch::asm!("rdgsbase {}", out(reg) gsbase, options(nostack));
}
let mut pkru: u32 = 0;
if has_pku {
core::arch::asm!(
"xor ecx, ecx", "rdpkru",
out("eax") pkru, out("ecx") _, out("edx") _,
options(nostack),
);
}
let xsave_size = if has_xsave {
let cpuid = core::arch::x86_64::__cpuid_count(0xD, 0);
(cpuid.ebx as usize).max(16384)
} else {
0
};
let mut xsave_buf = vec![0u8; xsave_size + 64];
let align_ptr = ((xsave_buf.as_mut_ptr() as usize + 63) & !63) as *mut u8;
if has_xsave {
core::arch::asm!(
"xsave [{}]",
in(reg) align_ptr,
in("eax") 0xFFFF_FFFFu32,
in("edx") 0xFFFF_FFFFu32,
options(nostack),
);
}
let mut sigmask: libc::sigset_t = std::mem::zeroed();
libc::pthread_sigmask(libc::SIG_SETMASK, std::ptr::null(), &mut sigmask);
let mut sigrtmin_action: libc::sigaction = std::mem::zeroed();
libc::sigaction(libc::SIGRTMIN(), std::ptr::null(), &mut sigrtmin_action);
CpuStateGuard {
xsave_buf,
align_ptr,
has_xsave,
has_fsgsbase,
has_pku,
fsbase,
gsbase,
pkru,
sigmask,
sigrtmin_action,
}
};
#[cfg(target_arch = "aarch64")]
let _cpu_guard = unsafe {
let fpsimd = if std::arch::is_aarch64_feature_detected!("fp") {
let mut state = Box::new(FpsimdState {
v: [0u128; 32],
fpcr: 0,
fpsr: 0,
});
let ptr = state.as_mut() as *mut FpsimdState as *mut u8;
core::arch::asm!(
"stp q0, q1, [{p}, #0]",
"stp q2, q3, [{p}, #32]",
"stp q4, q5, [{p}, #64]",
"stp q6, q7, [{p}, #96]",
"stp q8, q9, [{p}, #128]",
"stp q10, q11, [{p}, #160]",
"stp q12, q13, [{p}, #192]",
"stp q14, q15, [{p}, #224]",
"stp q16, q17, [{p}, #256]",
"stp q18, q19, [{p}, #288]",
"stp q20, q21, [{p}, #320]",
"stp q22, q23, [{p}, #352]",
"stp q24, q25, [{p}, #384]",
"stp q26, q27, [{p}, #416]",
"stp q28, q29, [{p}, #448]",
"stp q30, q31, [{p}, #480]",
"mrs {tmp}, FPCR",
"str {tmp}, [{p}, #512]",
"mrs {tmp}, FPSR",
"str {tmp}, [{p}, #520]",
p = in(reg) ptr,
tmp = out(reg) _,
options(nostack),
);
Some(state)
} else {
None
};
let mut sigmask: libc::sigset_t = std::mem::zeroed();
libc::pthread_sigmask(libc::SIG_SETMASK, std::ptr::null(), &mut sigmask);
let mut sigrtmin_action: libc::sigaction = std::mem::zeroed();
libc::sigaction(libc::SIGRTMIN(), std::ptr::null(), &mut sigrtmin_action);
CpuStateGuard {
fpsimd,
sigmask,
sigrtmin_action,
}
};
#[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
let _cpu_guard = unsafe {
let mut sigmask: libc::sigset_t = std::mem::zeroed();
libc::pthread_sigmask(libc::SIG_SETMASK, std::ptr::null(), &mut sigmask);
let mut sigrtmin_action: libc::sigaction = std::mem::zeroed();
libc::sigaction(libc::SIGRTMIN(), std::ptr::null(), &mut sigrtmin_action);
CpuStateGuard {
sigmask,
sigrtmin_action,
}
};
#[cfg(feature = "wprof")]
if entry.wprof {
let sidecar = crate::test_support::sidecar_dir();
let _ = std::fs::remove_file(sidecar.join(format!("{}.wprof.pb", entry.name)));
let _ = std::fs::remove_file(sidecar.join(format!("{}.repro.wprof.pb", entry.name)));
}
let vm = match builder.build() {
Ok(vm) => vm,
Err(e) => {
if e.downcast_ref::<crate::vmm::host_topology::ResourceContention>()
.is_some()
{
record_skip_sidecar(entry);
}
return Err(e.context("build ktstr_test VM"));
}
};
let mut result = match vm.run() {
Ok(r) => r,
Err(e) => {
if e.downcast_ref::<crate::vmm::host_topology::ResourceContention>()
.is_some()
{
record_skip_sidecar(entry);
}
return Err(e.context("run ktstr_test VM"));
}
};
result.entry_name = Some(entry.name);
if !result.success {
write_placeholder_failure_dump_if_missing(&primary_dump_path, &result);
}
#[cfg(feature = "wprof")]
if let Some(ref bulk) = result.guest_messages {
for bulk_entry in &bulk.entries {
if crate::vmm::wire::MsgType::from_wire(bulk_entry.msg_type)
== Some(crate::vmm::wire::MsgType::WprofTrace)
&& bulk_entry.crc_ok
&& !bulk_entry.payload.is_empty()
{
let wprof_path =
crate::test_support::sidecar_dir().join(format!("{}.wprof.pb", entry.name));
if let Err(e) = std::fs::create_dir_all(
wprof_path
.parent()
.expect("sidecar_dir join always has parent"),
) {
eprintln!("ktstr_test: create sidecar dir for wprof trace: {e}");
} else if let Err(e) = std::fs::write(&wprof_path, &bulk_entry.payload) {
eprintln!(
"ktstr_test: write wprof trace to {}: {e}",
wprof_path.display()
);
}
}
}
}
let guest_already_failed = parse_assert_result_from_drain(result.guest_messages.as_ref())
.map(|r| r.is_fail())
.unwrap_or(false);
let post_vm_err = run_post_vm_callbacks(entry, &result, guest_already_failed);
if post_vm_err.is_some() {
write_placeholder_failure_dump_if_missing(&primary_dump_path, &result);
}
let post_vm_t = std::time::Instant::now();
drop(vm);
drop(kernel_lock);
let host_cpus = crate::vmm::host_topology::host_allowed_cpus();
if !host_cpus.is_empty() {
crate::vmm::set_thread_cpumask(&host_cpus, "test");
}
if !result.verifier_stats.is_empty() {
eprintln!(
"ktstr_test: verifier_stats: {} struct_ops programs",
result.verifier_stats.len(),
);
}
if entry.scheduler.has_bpf_scheduler() && result.success && result.verifier_stats.is_empty() {
eprintln!("ktstr_test: WARNING: scheduler loaded but verifier_stats is empty");
}
let mut stimulus_events = Vec::new();
let mut payload_metrics: Vec<crate::test_support::PayloadMetrics> = Vec::new();
let mut raw_outputs: Vec<crate::test_support::RawPayloadOutput> = Vec::new();
if let Some(ref bulk) = result.guest_messages {
for bulk_entry in &bulk.entries {
let kind = crate::vmm::wire::MsgType::from_wire(bulk_entry.msg_type);
match kind {
Some(crate::vmm::wire::MsgType::Profraw) => {
if bulk_entry.crc_ok
&& !bulk_entry.payload.is_empty()
&& let Err(e) = write_profraw(&bulk_entry.payload)
{
eprintln!("ktstr_test: write guest profraw: {e}");
}
}
Some(crate::vmm::wire::MsgType::WprofTrace) => {
if bulk_entry.crc_ok && !bulk_entry.payload.is_empty() {
let wprof_path = crate::test_support::sidecar_dir()
.join(format!("{}.wprof.pb", entry.name));
if let Err(e) = std::fs::create_dir_all(
wprof_path
.parent()
.expect("sidecar_dir join always has parent"),
) {
eprintln!("ktstr_test: create sidecar dir for wprof trace: {e}");
} else if let Err(e) = std::fs::write(&wprof_path, &bulk_entry.payload) {
eprintln!(
"ktstr_test: write wprof trace to {}: {e}",
wprof_path.display()
);
}
}
}
Some(crate::vmm::wire::MsgType::Stimulus) => {
if bulk_entry.crc_ok
&& let Some(ev) =
crate::vmm::wire::StimulusEvent::from_payload(&bulk_entry.payload)
{
stimulus_events.push(crate::timeline::StimulusEvent::from_wire(&ev));
}
}
Some(crate::vmm::wire::MsgType::PayloadMetrics) => {
if bulk_entry.crc_ok {
match postcard::from_bytes::<crate::test_support::PayloadMetrics>(
&bulk_entry.payload,
) {
Ok(pm) => payload_metrics.push(pm),
Err(e) => {
eprintln!("ktstr_test: decode payload metrics from bulk port: {e}")
}
}
}
}
Some(crate::vmm::wire::MsgType::RawPayloadOutput) => {
if bulk_entry.crc_ok {
match postcard::from_bytes::<crate::test_support::RawPayloadOutput>(
&bulk_entry.payload,
) {
Ok(raw) => raw_outputs.push(raw),
Err(e) => eprintln!(
"ktstr_test: decode raw payload output from bulk port: {e}"
),
}
}
}
Some(
crate::vmm::wire::MsgType::TestResult
| crate::vmm::wire::MsgType::Exit
| crate::vmm::wire::MsgType::SchedExit
| crate::vmm::wire::MsgType::ScenarioStart
| crate::vmm::wire::MsgType::ScenarioEnd
| crate::vmm::wire::MsgType::ScenarioPause
| crate::vmm::wire::MsgType::ScenarioResume
| crate::vmm::wire::MsgType::Stdout
| crate::vmm::wire::MsgType::SchedLog
| crate::vmm::wire::MsgType::Lifecycle
| crate::vmm::wire::MsgType::ExecExit
| crate::vmm::wire::MsgType::Dmesg
| crate::vmm::wire::MsgType::ProbeOutput
| crate::vmm::wire::MsgType::SnapshotReply
| crate::vmm::wire::MsgType::Crash,
) => {}
Some(crate::vmm::wire::MsgType::Stderr) => {
if bulk_entry.crc_ok && !bulk_entry.payload.is_empty() {
eprint!("GUEST: {}", String::from_utf8_lossy(&bulk_entry.payload));
}
}
Some(crate::vmm::wire::MsgType::SnapshotRequest)
| Some(crate::vmm::wire::MsgType::KernelOpRequest)
| Some(crate::vmm::wire::MsgType::KernelOpReply)
| Some(crate::vmm::wire::MsgType::SysRdy) => {}
None => {
tracing::warn!(
msg_type = bulk_entry.msg_type,
len = bulk_entry.payload.len(),
crc_ok = bulk_entry.crc_ok,
"ktstr_test: unknown MSG_TYPE_* on bulk port; dropping"
);
}
}
}
}
#[cfg(feature = "llm")]
let host_extract_failures = host_side_llm_extract(&mut payload_metrics, &raw_outputs);
#[cfg(not(feature = "llm"))]
let host_extract_failures: Vec<crate::assert::AssertDetail> = Vec::new();
if let Some(err) = &post_vm_err
&& err.downcast_ref::<HostSkipRequest>().is_some()
{
let reason = format!("{err:#}");
crate::report::test_skip(format_args!("{}: {}", entry.name, reason));
record_skip_sidecar(entry);
return Ok(AssertResult::skip(reason));
}
if let Some(skip_reason) =
should_skip_on_llm_model_load_failure(&host_extract_failures, post_vm_err.is_some())
{
crate::report::test_skip(format_args!("{}: {}", entry.name, skip_reason));
record_skip_sidecar(entry);
return Ok(AssertResult::skip(skip_reason));
}
let effective_auto_repro = entry.auto_repro && scheduler.is_some() && !entry.expect_err;
let primary_exit_kind = {
let dump_path =
super::sidecar::sidecar_dir().join(format!("{}.failure-dump.json", entry.name));
std::fs::read_to_string(&dump_path)
.ok()
.and_then(|json| serde_json::from_str::<serde_json::Value>(&json).ok())
.and_then(|v| {
v.get("scx_sched_state")
.and_then(|s| s.get("exit_kind"))
.and_then(|k| k.as_u64())
})
};
let primary_reached_workload =
crate::test_support::output::primary_reached_workload(result.guest_messages.as_ref());
let repro_fn = |output: &str| -> Option<String> {
if !effective_auto_repro {
return None;
}
let repro = attempt_auto_repro(
entry,
&kernel,
scheduler.as_deref(),
&ktstr_bin,
output,
&result.stderr,
topo,
primary_exit_kind,
primary_reached_workload,
);
Some(repro.unwrap_or_else(|| {
"auto-repro: no probe data — the scheduler may have \
exited before probes could capture events, or the \
crash did not reproduce in the repro VM. Re-run with \
RUST_LOG=debug for probe pipeline diagnostics. Check \
the sched_ext dump and scheduler log sections above \
for crash details."
.to_string()
}))
};
eprintln!("post-VM overhead before eval: {:?}", post_vm_t.elapsed());
let eval_result = evaluate_vm_result(
entry,
&result,
&merged_assert,
&stimulus_events,
&payload_metrics,
&host_extract_failures,
&vm_topology,
&repro_fn,
post_vm_err.as_ref(),
);
apply_expect_auto_repro_inversion(entry, &mut result);
let eval_result = if result.expect_auto_repro_satisfied {
eval_result.map_err(|e| e.context(ExpectAutoReproSatisfied))
} else {
eval_result
};
let eval_result = if post_vm_err.is_some() {
eval_result.map_err(|e| e.context(PostVmAssertionFailure))
} else {
eval_result
};
eprintln!(
"evaluate_vm_result (includes auto-repro): {:?}",
post_vm_t.elapsed()
);
eval_result
}
pub(crate) fn apply_expect_auto_repro_inversion(
entry: &KtstrTestEntry,
result: &mut vmm::VmResult,
) {
#[cfg(not(feature = "wprof"))]
let _ = (entry, result);
#[cfg(feature = "wprof")]
{
if !entry.expect_auto_repro {
return;
}
if result.success {
return;
}
let Ok(repro_path) = result.repro_wprof_pb_path() else {
return;
};
if crate::test_support::wprof::assert_wprof_pb_shape(&repro_path).is_ok() {
result.expect_auto_repro_satisfied = true;
}
}
}
#[allow(clippy::too_many_arguments)]
fn evaluate_vm_result(
entry: &KtstrTestEntry,
result: &vmm::VmResult,
merged_assert: &crate::assert::Assert,
stimulus_events: &[StimulusEvent],
payload_metrics: &[crate::test_support::PayloadMetrics],
host_extract_failures: &[crate::assert::AssertDetail],
topo: &Topology,
repro_fn: &dyn Fn(&str) -> Option<String>,
post_vm_err: Option<&anyhow::Error>,
) -> Result<AssertResult> {
let drained_for_phases = result.snapshot_bridge.drain_ordered_with_stats();
let early_sample_series = crate::scenario::sample::SampleSeries::from_drained_typed(
drained_for_phases,
result.monitor.clone(),
);
let mut early_phase_buckets =
crate::assert::build_phase_buckets_with_stimulus(&early_sample_series, stimulus_events);
let timeline = if !early_phase_buckets.is_empty() {
Some(crate::timeline::Timeline::from_phase_buckets(
&early_phase_buckets,
stimulus_events,
&crate::timeline::TimelineContext::default(),
))
} else {
result
.monitor
.as_ref()
.map(|m| crate::timeline::Timeline::build(stimulus_events, &m.samples))
};
let sched_label = reporting::scheduler_label(&entry.scheduler.binary);
let output = &result.output;
let raw_dump = extract_sched_ext_dump(&result.stderr).unwrap_or_default();
let dump_section = if raw_dump.is_empty() {
String::new()
} else {
format!("\n\n--- sched_ext dump ---\n{raw_dump}")
};
let sched_log_merged = crate::verifier::concat_sched_log_chunks(result.guest_messages.as_ref());
let sched_log_input: &str = if !sched_log_merged.is_empty() {
&sched_log_merged
} else {
output
};
let sched_log_section = parse_sched_output(sched_log_input)
.map(|s| {
let collapsed = crate::verifier::collapse_cycles(s);
let is_verifier = collapsed.contains("processed") && collapsed.contains("insns");
let lines: Vec<&str> = collapsed.lines().collect();
let tail = if !is_verifier && lines.len() > 200 {
let skipped = lines.len() - 200;
format!(
"[{skipped} lines truncated]\n{}",
lines[lines.len() - 200..].join("\n")
)
} else {
collapsed
};
format!("\n\n--- scheduler log ---\n{tail}")
})
.unwrap_or_default();
let fingerprint_line = sched_log_fingerprint(sched_log_input)
.map(|fp| {
if crate::cli::stderr_color() {
format!("\x1b[1;31m{fp}\x1b[0m\n")
} else {
format!("{fp}\n")
}
})
.unwrap_or_default();
let bug_summary_line = || -> String {
match crate::test_support::output::extract_bug_summary(sched_log_input, &raw_dump) {
Some(text) => {
if crate::cli::stderr_color() {
format!("\x1b[1;31mBUG SUMMARY:\x1b[0m {text}\n")
} else {
format!("BUG SUMMARY: {text}\n")
}
}
None => String::new(),
}
};
let tl_ctx = crate::timeline::TimelineContext {
kernel: extract_kernel_version(&result.stderr),
topology: Some(format!("{topo} ({} cpus)", topo.total_cpus())),
scheduler: Some(entry.scheduler.name.to_string()),
scenario: Some(entry.name.to_string()),
duration_s: Some(result.duration.as_secs_f64()),
};
let build_timeline_section = || -> String {
timeline
.as_ref()
.filter(|t| !t.phases.is_empty())
.map(|t| format!("\n\n{}", t.format_with_context(&tl_ctx)))
.unwrap_or_default()
};
let build_monitor_section = || -> String {
if entry.scheduler.has_active_scheduling()
&& let Some(ref monitor) = result.monitor
{
reporting::format_monitor_section(monitor, merged_assert)
} else {
String::new()
}
};
if let Ok(mut check_result) = parse_assert_result_from_drain(result.guest_messages.as_ref()) {
for detail in host_extract_failures {
check_result.merge(AssertResult::fail(detail.clone()));
}
if let Some(err) = post_vm_err {
check_result.merge(AssertResult::fail(crate::assert::AssertDetail::new(
crate::assert::DetailKind::Other,
format!("post_vm callback returned Err: {err:#}"),
)));
}
if let (Some(budget), Some(measured)) = (entry.cleanup_budget, result.cleanup_duration)
&& measured > budget
{
check_result.merge(AssertResult::fail(crate::assert::AssertDetail::new(
crate::assert::DetailKind::Other,
format!(
"vm cleanup overran budget: measured {:.3}s, budget {:.3}s. \
Likely a regression in host-side teardown — investigate \
the post-BSP-exit join/drain path \
(`vmm::KtstrVm::collect_results`).",
measured.as_secs_f64(),
budget.as_secs_f64(),
),
)));
}
let matcher_configured = merged_assert.expect_scx_bpf_error_contains.is_some()
|| merged_assert.expect_scx_bpf_error_matches.is_some();
let matcher_details = if matcher_configured {
let matcher_corpus = format!("{sched_log_input}\n{dump_section}");
merged_assert.evaluate_scx_bpf_error_match(&matcher_corpus, entry.expect_err)
} else {
Vec::new()
};
let matcher_mismatch = !matcher_details.is_empty();
for d in matcher_details {
check_result.merge(AssertResult::fail(d));
}
let args: Vec<String> = std::env::args().collect();
let work_type =
super::args::extract_work_type_arg(&args).unwrap_or_else(|| "SpinWait".to_string());
if let Err(e) = write_sidecar(
entry,
result,
stimulus_events,
&check_result,
&work_type,
payload_metrics,
) {
eprintln!("ktstr_test: {e:#}");
}
if !check_result.is_pass() {
let details = check_result
.failure_details()
.chain(check_result.inconclusive_details())
.chain(check_result.skip_details())
.map(|d| d.message.as_str())
.collect::<Vec<_>>()
.join("\n ");
let info_section = if check_result.info_notes.is_empty() {
String::new()
} else {
let lines: Vec<String> = check_result
.info_notes
.iter()
.map(|n| format!(" {}", n.message))
.collect();
format!("\n\n--- info ---\n{}", lines.join("\n"))
};
let repro = if entry.scheduler.has_active_scheduling() {
repro_fn(output)
} else {
None
};
let repro_section = repro
.map(|r| format!("\n\n--- auto-repro ---\n{r}"))
.unwrap_or_default();
let timeline_section = build_timeline_section();
let stats_section = if !check_result.stats.cgroups.is_empty() {
let s = &check_result.stats;
let mut lines = vec![format!(
"\n\n--- stats ---\n{} workers, {} cpus, {} migrations, worst_spread={:.1}%, worst_gap={}ms",
s.total_workers,
s.total_cpus,
s.total_migrations,
s.worst_spread,
s.worst_gap_ms,
)];
for (i, cg) in s.cgroups.iter().enumerate() {
lines.push(format!(
" cg{}: workers={} cpus={} spread={:.1}% gap={}ms migrations={} iter={}",
i,
cg.num_workers,
cg.num_cpus,
cg.spread,
cg.max_gap_ms,
cg.total_migrations,
cg.total_iterations,
));
}
lines.join("\n")
} else {
String::new()
};
let console_section = if check_result.failure_details().any(|d| {
matches!(
d.kind,
crate::assert::DetailKind::SchedulerCrashed
| crate::assert::DetailKind::SchedulerExitedCleanly
| crate::assert::DetailKind::SchedulerDiedUnknownReason
)
}) || verbose()
{
let init_stage = classify_init_stage(result.guest_messages.as_ref());
format_console_diagnostics(&result.stderr, result.exit_code, init_stage)
} else {
String::new()
};
let monitor_section = build_monitor_section();
let periodic_section =
crate::test_support::output::format_periodic_samples_section(result);
let temporal_section =
crate::test_support::output::format_temporal_assertions_section(&check_result);
let verdict_word = if check_result.is_inconclusive() {
"inconclusive"
} else {
"failed"
};
let msg = format!(
"{}{}ktstr_test '{}'{} [topo={}] {verdict_word}:\n {}{}{}{}{}{}{}{}{}{}{}",
fingerprint_line,
bug_summary_line(),
entry.name,
sched_label,
topo,
details,
info_section,
stats_section,
console_section,
timeline_section,
periodic_section,
temporal_section,
sched_log_section,
monitor_section,
dump_section,
repro_section,
);
let err = anyhow::anyhow!("{msg}");
return Err(if matcher_mismatch {
err.context(ScxBpfErrorMatcherMismatch)
} else {
err
});
}
if entry.scheduler.has_active_scheduling()
&& merged_assert.has_monitor_thresholds()
&& let Some(ref monitor) = result.monitor
{
let eval_report = reporting::trim_settle_samples(monitor);
let thresholds = merged_assert.monitor_thresholds();
let verdict = thresholds.evaluate(&eval_report);
if verdict.is_fail() {
let details = verdict.details.join("\n ");
let timeline_section = build_timeline_section();
let monitor_section = reporting::format_monitor_section(monitor, merged_assert);
let msg = format!(
"{}{}ktstr_test '{}'{} [topo={}] {ERR_MONITOR_FAILED_AFTER_SCENARIO}:\n {}{}{}{}{}",
fingerprint_line,
bug_summary_line(),
entry.name,
sched_label,
topo,
details,
timeline_section,
monitor_section,
sched_log_section,
dump_section,
);
anyhow::bail!("{msg}");
} else if verdict.is_inconclusive() {
check_result.merge(crate::assert::AssertResult::inconclusive(
crate::assert::AssertDetail::new(
crate::assert::DetailKind::Monitor,
format!("monitor evaluation inconclusive: {}", verdict.summary),
),
));
}
}
check_result.stats.phases = std::mem::take(&mut early_phase_buckets);
let sample_series_for_phases = &early_sample_series;
crate::assert::populate_run_ext_metrics(
sample_series_for_phases,
&mut check_result.stats.ext_metrics,
);
crate::assert::populate_run_ext_metrics_from_phases(
&check_result.stats.phases,
&mut check_result.stats.ext_metrics,
);
return Ok(check_result);
}
let repro_section = if entry.scheduler.has_active_scheduling() {
repro_fn(output)
.map(|r| format!("\n\n--- auto-repro ---\n{r}"))
.unwrap_or_default()
} else {
String::new()
};
let bulk_sched_log = crate::verifier::concat_sched_log_chunks(result.guest_messages.as_ref());
let has_sched_output =
output.contains(SCHED_OUTPUT_START) || bulk_sched_log.contains(SCHED_OUTPUT_START);
let console_section =
if !has_sched_output || verbose() || entry.scheduler.has_active_scheduling() {
let init_stage = classify_init_stage(result.guest_messages.as_ref());
format_console_diagnostics(&result.stderr, result.exit_code, init_stage)
} else {
String::new()
};
let timeline_section = build_timeline_section();
let monitor_section = build_monitor_section();
let post_vm_prefix = post_vm_err
.map(|e| format!("post_vm callback returned Err: {e:#}\n\n"))
.unwrap_or_default();
if result.timed_out {
let crash_section = if let Some(ref guest_crash) = result.crash_message {
format!("\n\n{ERR_GUEST_CRASHED_PREFIX}\n{guest_crash}")
} else {
String::new()
};
let vm_timeout = vm_timeout_from_entry(entry);
let watchdog_section = format!(
"\n\n--- watchdog ---\n\
elapsed={:?} (VM run wall-clock)\n\
vm_timeout={:?} (host watchdog deadline = max(watchdog_timeout, \
duration, 1s) + vCPU-scaled vm_boot_headroom [+ 30s cold-BTF \
budget for bpf_map_write tests])\n\
watchdog_timeout={:?} (scx_sched.watchdog_timeout override)\n\
duration={:?} (workload duration)\n\
hint: if the test body needs more wall time, increase \
duration (the `duration` field on `KtstrTestEntry` / \
`#[ktstr_test(duration_ms = ...)]`); the VM timeout adds \
vCPU-scaled boot headroom on top of max(watchdog_timeout, \
duration, 1s), so raising duration also extends the host \
watchdog deadline",
result.duration, vm_timeout, entry.watchdog_timeout, entry.duration,
);
let timeout_reason = {
let scx_exits = crate::monitor::dmesg_scx::parse_kmsg_window(&result.stderr);
if let Some(ev) = scx_exits.last() {
if ev.message.is_empty() {
format!("timed out (scheduler {} exited)", ev.scheduler_name)
} else {
format!("timed out (scheduler exited: {})", ev.message)
}
} else {
ERR_TIMED_OUT_NO_RESULT.to_string()
}
};
let msg = format!(
"{post_vm_prefix}{}{}ktstr_test '{}'{} [topo={}] {}{}{}{}{}{}{}{}{}",
fingerprint_line,
bug_summary_line(),
entry.name,
sched_label,
topo,
timeout_reason,
watchdog_section,
crash_section,
console_section,
timeline_section,
sched_log_section,
dump_section,
monitor_section,
repro_section,
);
anyhow::bail!("{msg}");
}
let reason = if let Some(ref guest_crash) = result.crash_message {
format!("{ERR_GUEST_CRASHED_PREFIX}\n{guest_crash}")
} else if let Some(crash_msg) = extract_panic_message(output) {
format!("{ERR_GUEST_CRASHED_PREFIX} {crash_msg}")
} else if entry.scheduler.has_active_scheduling() {
let scx_exits = crate::monitor::dmesg_scx::parse_kmsg_window(&result.stderr);
if let Some(ev) = scx_exits.last() {
if ev.message.is_empty() {
format!("scheduler exited ({})", ev.scheduler_name)
} else {
format!("scheduler exited: {}", ev.message)
}
} else if let Some(reason) = extract_exit_from_dump_trace(&result.stderr) {
format!("scheduler exited: {reason}")
} else {
ERR_NO_TEST_RESULT_FROM_GUEST.to_string()
}
} else {
ERR_NO_TEST_FUNCTION_OUTPUT.to_string()
};
let msg = format!(
"{post_vm_prefix}{}{}ktstr_test '{}'{} [topo={}] {}{}{}{}{}{}{}",
fingerprint_line,
bug_summary_line(),
entry.name,
sched_label,
topo,
reason,
console_section,
timeline_section,
sched_log_section,
dump_section,
monitor_section,
repro_section,
);
anyhow::bail!("{msg}")
}
// Unit-test submodules for this module. Every declaration below is gated on
// `cfg(test)`, so none of them is compiled into non-test builds.
#[cfg(test)]
mod eval_tests;
#[cfg(test)]
mod eval_tests_eval;
// NOTE(review): this is gated on `cfg(test)` only, whereas `mod llm_extract`
// near the top of this file additionally requires `feature = "llm"` — confirm
// these tests build with that feature disabled.
#[cfg(test)]
mod eval_tests_llm;
#[cfg(test)]
mod eval_tests_reporting;