use std::path::Path;
use std::time::{Duration, Instant};
use crate::assert::AssertResult;
use super::args::{
extract_probe_stack_arg, extract_test_fn_arg, extract_work_type_arg, resolve_cgroup_root,
};
use super::entry::find_test;
use super::output::{extract_sched_ext_dump, print_assert_result};
use super::profraw::try_flush_profraw;
use super::runtime::{config_content_parts, config_file_parts, verbose};
use super::{KtstrTestEntry, TopoOverride};
use crate::verifier::{
SCHED_OUTPUT_END, SCHED_OUTPUT_START, parse_sched_output, parse_sched_output_partial,
};
/// Sentinel value passed via `--ktstr-probe-stack` to request in-VM BPF
/// discovery instead of probing an explicit comma-separated function list.
const DISCOVER_SENTINEL: &str = "__discover__";
/// Reads `/proc/cmdline` and re-exports the harness-relevant environment
/// assignments found there (`RUST_BACKTRACE`, `RUST_LOG`,
/// `KTSTR_SIDECAR_DIR`) into this process's environment. Silently does
/// nothing when the file cannot be read.
pub(crate) fn propagate_rust_env_from_cmdline() {
    let cmdline = match std::fs::read_to_string("/proc/cmdline") {
        Ok(content) => content,
        Err(_) => return,
    };
    for (key, val) in parse_rust_env_from_cmdline(&cmdline) {
        // SAFETY: assumes no concurrent environment access during early
        // init — TODO confirm at call sites.
        unsafe { std::env::set_var(key, val) };
    }
}
/// Scans a kernel command line for the environment assignments the harness
/// forwards into the guest, returning `(key, value)` pairs in the order
/// the tokens appear.
fn parse_rust_env_from_cmdline(cmdline: &str) -> Vec<(&'static str, &str)> {
    const FORWARDED_KEYS: [&str; 3] = ["RUST_BACKTRACE", "RUST_LOG", "KTSTR_SIDECAR_DIR"];
    cmdline
        .split_whitespace()
        .filter_map(|token| {
            FORWARDED_KEYS.iter().find_map(|&key| {
                token
                    .strip_prefix(key)
                    .and_then(|rest| rest.strip_prefix('='))
                    .map(|val| (key, val))
            })
        })
        .collect()
}
/// Markers bracketing the JSON probe payload emitted on the repro VM's
/// COM2 stream; `extract_probe_output` slices the payload between them.
pub(crate) const PROBE_OUTPUT_START: &str = "===PROBE_OUTPUT_START===";
pub(crate) const PROBE_OUTPUT_END: &str = "===PROBE_OUTPUT_END===";
/// Returns the last `n` lines of `text` under a `--- header ---` banner,
/// or `None` when `text` contains no lines at all.
fn format_tail(text: &str, n: usize, header: &str) -> Option<String> {
    let all_lines: Vec<&str> = text.lines().collect();
    match all_lines.len() {
        0 => None,
        len => {
            let tail = &all_lines[len.saturating_sub(n)..];
            Some(format!("--- {header} ---\n{}", tail.join("\n")))
        }
    }
}
/// Kernel-log marker identifying sched_ext dump lines; these are filtered
/// out of the dmesg tail because the dump is rendered in its own section.
pub(crate) const SCHED_EXT_DUMP_MARKER: &str = "sched_ext_dump:";
/// Renders the tail of the repro VM's COM1 (dmesg) output, dropping
/// `sched_ext_dump:` lines that are already shown in their own section,
/// and substituting a short diagnostic when nothing readable remains.
fn render_dmesg_tail(stderr: &str, tail_lines: usize) -> String {
    let mut dropped_dump_lines = false;
    let kept: Vec<&str> = stderr
        .lines()
        .filter(|line| {
            if line.contains(SCHED_EXT_DUMP_MARKER) {
                dropped_dump_lines = true;
                false
            } else {
                true
            }
        })
        .collect();
    let filtered = kept.join("\n");
    match classify_dmesg_corruption(&filtered) {
        // Everything readable was a dump line we filtered — point at the
        // dump section instead of printing a corruption diagnostic.
        Some(_) if dropped_dump_lines => "--- repro VM dmesg ---\n(no kernel printk other than \
             sched_ext_dump — full dump in section above)"
            .to_string(),
        Some(diag) => format!("--- repro VM dmesg ---\n{diag}"),
        None => format_tail(&filtered, tail_lines, "repro VM dmesg")
            .unwrap_or_else(|| "--- repro VM dmesg ---\n(unavailable)".to_string()),
    }
}
/// Heuristically decides whether a dmesg capture is usable.
///
/// Returns `None` when at least one ordinary printable character is
/// present; otherwise returns a static diagnostic: "empty" when nothing
/// (or only whitespace) was captured, "corrupt" when only replacement
/// characters or control bytes were seen.
fn classify_dmesg_corruption(text: &str) -> Option<&'static str> {
    const EMPTY_MSG: &str =
        "empty (scheduler crashed before kernel printk reached the UART buffer)";
    let mut any_corrupt = false;
    for ch in text.chars().filter(|c| !c.is_whitespace()) {
        if ch == '\u{fffd}' || ch.is_control() {
            any_corrupt = true;
        } else {
            // One readable character is enough to call the log usable.
            return None;
        }
    }
    if any_corrupt {
        Some(
            "corrupt or no readable text (UART buffer uninitialized or \
             trimmed — scheduler likely crashed before any kernel printk)",
        )
    } else {
        Some(EMPTY_MSG)
    }
}
/// Reads a failure-dump JSON file and renders it under a section banner.
/// Returns `None` when the file is unreadable or fails to parse.
fn render_failure_dump_file(path: &std::path::Path) -> Option<String> {
    use crate::monitor::dump::FailureDumpReportAny;
    use std::fmt::Write;
    let raw = std::fs::read_to_string(path).ok()?;
    let report = FailureDumpReportAny::from_json(&raw)?;
    let mut rendered = String::with_capacity(raw.len());
    rendered.push_str("--- repro VM failure dump ---\n");
    // Writing into a String cannot fail; the fmt::Result is ignored.
    let _ = write!(rendered, "{report}");
    Some(rendered)
}
/// Produces a one-line human summary of how the repro VM terminated,
/// combining the timeout flag, crash-message presence, guest lifecycle
/// messages, and the host-reported exit code.
fn classify_repro_vm_status(
    timed_out: bool,
    has_crash_message: bool,
    _output: &str,
    exit_code: i32,
    guest_messages: Option<&crate::vmm::host_comms::BulkDrainResult>,
) -> String {
    use crate::vmm::wire::{LifecyclePhase, MSG_TYPE_LIFECYCLE};
    if timed_out {
        return "repro VM: timed out".to_string();
    }
    if let Some(reason) = extract_not_attached_reason(guest_messages) {
        return format!("repro VM: scheduler did not attach ({reason}) (exit code {exit_code})",);
    }
    // Did the guest report the scheduler dying via a lifecycle message?
    let scheduler_died = guest_messages.is_some_and(|drain| {
        drain.entries.iter().any(|entry| {
            entry.msg_type == MSG_TYPE_LIFECYCLE
                && entry.crc_ok
                && !entry.payload.is_empty()
                && LifecyclePhase::from_wire(entry.payload[0])
                    == Some(LifecyclePhase::SchedulerDied)
        })
    });
    if has_crash_message || scheduler_died {
        let exit_clause = match exit_code {
            // -1 is the host's "no final exit status" sentinel.
            -1 => "VM host reported no final exit status (the scheduler did not \
                   deliver an exit signal before the VM ended)"
                .to_string(),
            code if code < 0 => format!("killed by signal ({code})"),
            0 => "exited cleanly".to_string(),
            code => format!("exited with non-zero status ({code})"),
        };
        return format!("repro VM: scheduler crashed — {exit_clause}");
    }
    if exit_code != 0 {
        return format!("repro VM: exited abnormally (exit code {exit_code})");
    }
    "repro VM: scheduler ran normally (crash did not reproduce)".to_string()
}
/// Scans the drained guest messages for the first `SchedulerNotAttached`
/// lifecycle event and returns its textual reason. Returns `None` when no
/// such event exists or when the first one carries an empty reason.
fn extract_not_attached_reason(
    drain: Option<&crate::vmm::host_comms::BulkDrainResult>,
) -> Option<String> {
    use crate::vmm::wire::{LifecyclePhase, MSG_TYPE_LIFECYCLE};
    // Only the first matching entry is consulted, mirroring the wire order.
    let entry = drain?.entries.iter().find(|e| {
        e.msg_type == MSG_TYPE_LIFECYCLE
            && e.crc_ok
            && !e.payload.is_empty()
            && LifecyclePhase::from_wire(e.payload[0]) == Some(LifecyclePhase::SchedulerNotAttached)
    })?;
    let reason = String::from_utf8_lossy(&entry.payload[1..]).trim().to_string();
    (!reason.is_empty()).then_some(reason)
}
/// Launches a second "repro" VM that re-runs the failing test with stack
/// probes attached, then renders whatever evidence it produces (probe
/// events, scheduler-log / sched_ext-dump / dmesg tails, failure dump)
/// into a single report string.
///
/// Inputs come from the first (failing) run: its COM2 stream
/// (`first_vm_output`), COM1 console (`console_output`), and the primary
/// exit kind. Returns `None` when the repro VM cannot be built or run, or
/// when it yields neither probe data nor any log tails.
#[allow(clippy::too_many_arguments)]
pub(crate) fn attempt_auto_repro(
    entry: &KtstrTestEntry,
    kernel: &Path,
    scheduler: Option<&Path>,
    ktstr_bin: &Path,
    first_vm_output: &str,
    console_output: &str,
    topo: Option<&TopoOverride>,
    active_flags: &[String],
    primary_exit_kind: Option<u64>,
) -> Option<String> {
    use crate::probe::stack::extract_stack_functions_all;
    let auto_repro_start = Instant::now();
    let has_sched_start = first_vm_output.contains(SCHED_OUTPUT_START);
    let has_sched_end = first_vm_output.contains(SCHED_OUTPUT_END);
    eprintln!(
        "ktstr_test: auto-repro: COM2 length={} has_sched_start={has_sched_start} has_sched_end={has_sched_end}",
        first_vm_output.len(),
    );
    // Extract crash-backtrace functions: prefer the (possibly partial)
    // COM2 scheduler output, falling back to the COM1 console stream.
    let sched_output = parse_sched_output_partial(first_vm_output);
    let stack_funcs = if let Some(sched) = sched_output {
        let funcs = extract_stack_functions_all(sched);
        if funcs.is_empty() {
            if has_sched_start && !has_sched_end {
                eprintln!(
                    "ktstr_test: auto-repro: no functions from partial COM2 (missing \
                     SCHED_OUTPUT_END), trying COM1",
                );
            } else {
                eprintln!("ktstr_test: auto-repro: no functions from COM2, trying COM1");
            }
            extract_stack_functions_all(console_output)
        } else {
            if has_sched_start && !has_sched_end {
                eprintln!(
                    "ktstr_test: auto-repro: extracted {} functions from partial COM2 \
                     (missing SCHED_OUTPUT_END)",
                    funcs.len(),
                );
            }
            funcs
        }
    } else {
        eprintln!("ktstr_test: auto-repro: no scheduler output on COM2, trying COM1");
        extract_stack_functions_all(console_output)
    };
    let func_names: Vec<String> = stack_funcs.iter().map(|f| f.raw_name.clone()).collect();
    let is_stall = primary_exit_kind == Some(crate::probe::scx_defs::EXIT_ERROR_STALL);
    // Build the guest command line. Stall exits skip probe attachment
    // entirely (see probe_section below for the rationale).
    let mut guest_args = vec![
        "run".to_string(),
        "--ktstr-test-fn".to_string(),
        entry.name.to_string(),
    ];
    if !is_stall {
        let probe_arg = if func_names.is_empty() {
            eprintln!(
                "ktstr_test: auto-repro: no stack functions, using BPF discovery in repro VM"
            );
            format!("--ktstr-probe-stack={DISCOVER_SENTINEL}")
        } else {
            eprintln!(
                "ktstr_test: auto-repro: probing {} functions in second VM",
                func_names.len()
            );
            format!("--ktstr-probe-stack={}", func_names.join(","))
        };
        guest_args.push(probe_arg);
    } else {
        eprintln!("ktstr_test: auto-repro: stall exit — skipping probe attachment");
    }
    // Configure the repro VM to mirror the primary run's topology, memory
    // and scheduler settings.
    let cmdline_extra = super::runtime::build_cmdline_extra(entry);
    let (vm_topology, memory_mb) = super::runtime::resolve_vm_topology(entry, topo);
    let no_perf_mode = super::runtime::no_perf_mode_for_entry(entry);
    let mut builder = super::runtime::build_vm_builder_base(
        entry,
        kernel,
        ktstr_bin,
        scheduler,
        vm_topology,
        memory_mb,
        &cmdline_extra,
        &guest_args,
        no_perf_mode,
    );
    let repro_dump_path =
        super::sidecar::sidecar_dir().join(format!("{}.repro.failure-dump.json", entry.name));
    builder = builder.failure_dump_path(&repro_dump_path);
    builder = builder
        .dual_snapshot(true)
        .performance_mode(entry.performance_mode);
    if let Some(crate::test_support::entry::SchedulerSpec::KernelBuiltin { enable, disable }) =
        entry.scheduler.scheduler_binary()
    {
        builder = builder.sched_enable_cmds(enable);
        builder = builder.sched_disable_cmds(disable);
    }
    let merged_assert = crate::assert::Assert::default_checks()
        .merge(entry.scheduler.assert())
        .merge(&entry.assert);
    if entry.scheduler.has_active_scheduling() {
        builder = builder.monitor_thresholds(merged_assert.monitor_thresholds());
    }
    // Assemble scheduler args and include files (declarative specs plus
    // config file/content) for the guest.
    {
        let mut args: Vec<String> = Vec::new();
        let declarative_specs: Vec<std::path::PathBuf> = entry
            .all_include_files()
            .into_iter()
            .map(std::path::PathBuf::from)
            .collect();
        let mut resolved_includes: Vec<(String, std::path::PathBuf, &'static str)> =
            if declarative_specs.is_empty() {
                Vec::new()
            } else {
                match crate::cli::resolve_include_files(&declarative_specs) {
                    Ok(v) => v.into_iter().map(|(a, h)| (a, h, "declarative")).collect(),
                    Err(e) => {
                        // Best-effort: a resolve failure drops the includes
                        // but the repro still proceeds.
                        eprintln!("ktstr_test: auto-repro: include_files resolve: {e:#}");
                        Vec::new()
                    }
                }
            };
        if let Some((archive_path, host_path, guest_path)) = config_file_parts(entry) {
            resolved_includes.push((archive_path, host_path, "scheduler config_file"));
            args.push("--config".to_string());
            args.push(guest_path);
        }
        if let Some((archive_path, host_path, _guest_path, cfg_args)) = config_content_parts(entry)
        {
            resolved_includes.push((archive_path, host_path, "inline config_content"));
            args.extend(cfg_args);
        }
        match super::eval::dedupe_include_files(&resolved_includes) {
            Ok(unioned) if !unioned.is_empty() => {
                builder = builder.include_files(unioned);
            }
            Ok(_) => {}
            Err(e) => {
                eprintln!("ktstr_test: auto-repro: include_files dedupe: {e:#}");
            }
        }
        super::runtime::append_base_sched_args(entry, &mut args);
        for flag_name in active_flags {
            if let Some(flag_args) = entry.scheduler.flag_args(flag_name) {
                args.extend(flag_args.iter().map(|s| s.to_string()));
            }
        }
        if !args.is_empty() {
            builder = builder.sched_args(&args);
        }
    }
    // Build and run the repro VM, timing each phase for tracing.
    let build_start = Instant::now();
    let vm = match builder.build() {
        Ok(vm) => vm,
        Err(e) => {
            eprintln!("ktstr_test: auto-repro: failed to build VM: {e:#}");
            tracing::info!(
                elapsed_ms = auto_repro_start.elapsed().as_millis() as u64,
                outcome = "build_failed",
                "auto_repro: total",
            );
            return None;
        }
    };
    tracing::info!(
        elapsed_ms = build_start.elapsed().as_millis() as u64,
        "auto_repro: vm_build",
    );
    let run_start = Instant::now();
    let repro_result = match vm.run() {
        Ok(r) => r,
        Err(e) => {
            eprintln!("ktstr_test: auto-repro: VM run failed: {e:#}");
            tracing::info!(
                elapsed_ms = auto_repro_start.elapsed().as_millis() as u64,
                outcome = "run_failed",
                "auto_repro: total",
            );
            return None;
        }
    };
    tracing::info!(
        elapsed_ms = run_start.elapsed().as_millis() as u64,
        guest_duration_ms = repro_result.duration.as_millis() as u64,
        "auto_repro: vm_run",
    );
    drop(vm);
    // Verbose mode echoes the repro VM's COM1 and probe-related COM2 lines.
    if verbose() {
        eprintln!(
            "ktstr_test: auto-repro: COM1 stderr length={} COM2 stdout length={}",
            repro_result.stderr.len(),
            repro_result.output.len(),
        );
        for line in repro_result.stderr.lines() {
            eprintln!(" repro-vm-com1: {line}");
        }
        // Only echo COM2 from the first probe banner onward.
        let mut in_probe = false;
        for line in repro_result.output.lines() {
            if line.contains("ktstr_test: probe:") {
                in_probe = true;
            }
            if in_probe {
                eprintln!(" repro-vm-com2: {line}");
            }
        }
    }
    // Resolve a kernel source dir for symbolizing BPF source locations.
    let kernel_dir = crate::kernel_path::derive_kernel_dir(kernel)
        .map(|dir| crate::cache::prefer_source_tree_for_dwarf(&dir).unwrap_or(dir))
        .and_then(|p| p.to_str().map(String::from));
    let kernel_dir_str = kernel_dir.as_deref();
    let probe_payload_partial_path = super::sidecar::sidecar_dir()
        .join(format!("{}.repro.probe-payload.partial.json", entry.name));
    // Remove any stale partial payload from a previous attempt.
    let _ = std::fs::remove_file(&probe_payload_partial_path);
    let probe_section = if is_stall {
        tracing::debug!(
            "auto-repro: suppressing chain-to-failure for stall exit \
             (no causal task — probe events are always empty after stitch)",
        );
        None
    } else {
        extract_probe_output(
            &repro_result.output,
            kernel_dir_str,
            Some(probe_payload_partial_path.as_path()),
        )
    };
    // Collect the log tails shown alongside (or instead of) probe data.
    const REPRO_TAIL_LINES: usize = 40;
    let sched_log_tail = parse_sched_output(&repro_result.output).and_then(|log| {
        let collapsed = crate::verifier::collapse_cycles(log);
        format_tail(&collapsed, REPRO_TAIL_LINES, "repro VM scheduler log")
    });
    let dump_tail = extract_sched_ext_dump(&repro_result.stderr)
        .and_then(|dump| format_tail(&dump, REPRO_TAIL_LINES, "repro VM sched_ext dump"));
    let dmesg_tail = Some(render_dmesg_tail(&repro_result.stderr, REPRO_TAIL_LINES));
    let failure_dump_tail = render_failure_dump_file(&repro_dump_path);
    let tails: Vec<String> = [sched_log_tail, dump_tail, failure_dump_tail, dmesg_tail]
        .into_iter()
        .flatten()
        .collect();
    if probe_section.is_none() && tails.is_empty() {
        tracing::info!(
            elapsed_ms = auto_repro_start.elapsed().as_millis() as u64,
            outcome = "no_data",
            "auto_repro: total",
        );
        return None;
    }
    // Without probe data, lead the report with a VM status line instead.
    let has_probe = probe_section.is_some();
    let mut out = probe_section.unwrap_or_default();
    if !has_probe {
        out.push_str(&classify_repro_vm_status(
            repro_result.timed_out,
            repro_result.crash_message.is_some(),
            &repro_result.output,
            repro_result.exit_code,
            repro_result.guest_messages.as_ref(),
        ));
    }
    if !out.is_empty() {
        out.push('\n');
    }
    out.push_str(&format!(
        "repro VM duration: {:.1}s",
        repro_result.duration.as_secs_f64(),
    ));
    for tail in &tails {
        out.push_str("\n\n");
        out.push_str(tail);
    }
    tracing::info!(
        elapsed_ms = auto_repro_start.elapsed().as_millis() as u64,
        outcome = if has_probe {
            "probe_data"
        } else {
            "tails_only"
        },
        "auto_repro: total",
    );
    Some(out)
}
/// Pulls the probe JSON payload out of the repro VM's COM2 stream and
/// formats it for display. Returns `None` when there is no payload and no
/// diagnostics worth showing.
pub(crate) fn extract_probe_output(
    output: &str,
    kernel_dir: Option<&str>,
    partial_dump_path: Option<&Path>,
) -> Option<String> {
    let json = crate::probe::output::extract_section(output, PROBE_OUTPUT_START, PROBE_OUTPUT_END);
    if json.is_empty() {
        return None;
    }
    let payload = parse_probe_payload(&json, partial_dump_path)?;
    // Diagnostics (when present) lead the rendered section.
    let mut rendered = payload
        .diagnostics
        .as_ref()
        .map(|d| format_probe_diagnostics(&d.pipeline, &d.skeleton))
        .unwrap_or_default();
    if payload.events.is_empty() {
        // Diagnostics alone are still useful; emit them if non-empty.
        return (!rendered.is_empty()).then_some(rendered);
    }
    rendered.push_str(&crate::probe::output::format_probe_events_with_bpf_locs(
        &payload.events,
        &payload.func_names,
        kernel_dir,
        &payload.bpf_source_locs,
        payload.nr_cpus,
        &payload.param_names,
        &payload.render_hints,
    ));
    Some(rendered)
}
/// Deserializes the probe payload JSON, falling back to best-effort event
/// recovery when the payload was truncated mid-stream. On any parse
/// failure, optionally writes the raw bytes to `partial_dump_path` for
/// offline inspection.
fn parse_probe_payload(json: &str, partial_dump_path: Option<&Path>) -> Option<ProbeBytes> {
    let err = match serde_json::from_str::<ProbeBytes>(json) {
        Ok(payload) => return Some(payload),
        Err(e) => e,
    };
    let category = if err.is_eof() { "truncated" } else { "malformed" };
    eprintln!(
        "ktstr_test: probe payload {category}: {err} \
         (line {}, column {}, total {} bytes)",
        err.line(),
        err.column(),
        json.len(),
    );
    if let Some(path) = partial_dump_path {
        match std::fs::write(path, json) {
            Ok(()) => eprintln!(
                "ktstr_test: probe payload: wrote raw bytes to {}",
                path.display(),
            ),
            Err(write_err) => eprintln!(
                "ktstr_test: probe payload: failed to write raw bytes to {}: {write_err}",
                path.display(),
            ),
        }
    }
    // Only truncation is recoverable; malformed payloads are a hard failure.
    if !err.is_eof() {
        return None;
    }
    let recovered = recover_partial_events(json);
    if recovered.is_empty() {
        return None;
    }
    eprintln!(
        "ktstr_test: probe payload: recovered {} event(s) from truncated input",
        recovered.len(),
    );
    // Only the events survive recovery; the auxiliary maps are defaulted.
    Some(ProbeBytes {
        events: recovered,
        func_names: Vec::new(),
        bpf_source_locs: Default::default(),
        diagnostics: None,
        nr_cpus: None,
        param_names: Default::default(),
        render_hints: Default::default(),
    })
}
/// Best-effort recovery of complete `ProbeEvent` objects from a truncated
/// JSON payload: locates the `"events"` array and parses each balanced
/// `{...}` element until the data runs out or stops parsing cleanly.
fn recover_partial_events(json: &str) -> Vec<crate::probe::process::ProbeEvent> {
    const EVENTS_KEY: &str = "\"events\":";
    let array_body = json
        .find(EVENTS_KEY)
        .map(|idx| &json[idx + EVENTS_KEY.len()..])
        .and_then(|after| after.find('[').map(|open| &after[open + 1..]));
    let Some(mut rest) = array_body else {
        return Vec::new();
    };
    let mut recovered = Vec::new();
    loop {
        rest = rest.trim_start();
        // Skip the separator between array elements, if present.
        if let Some(tail) = rest.strip_prefix(',') {
            rest = tail.trim_start();
        }
        if rest.is_empty() || rest.starts_with(']') || !rest.starts_with('{') {
            break;
        }
        let Some(end) = find_balanced_object_end(rest) else {
            break;
        };
        let Ok(event) = serde_json::from_str::<crate::probe::process::ProbeEvent>(&rest[..end])
        else {
            break;
        };
        recovered.push(event);
        rest = &rest[end..];
    }
    recovered
}
/// Given a string starting with `{`, returns the byte index one past the
/// matching closing `}`, honoring JSON string literals and backslash
/// escapes. Returns `None` when the input does not start with `{` or the
/// braces never balance (truncated input).
fn find_balanced_object_end(s: &str) -> Option<usize> {
    let bytes = s.as_bytes();
    if !bytes.first().is_some_and(|&b| b == b'{') {
        return None;
    }
    let mut depth: u32 = 0;
    // Tiny state machine: 0 = outside string, 1 = inside string,
    // 2 = inside string just after a backslash (next byte is consumed).
    let mut state = 0u8;
    for (i, &b) in bytes.iter().enumerate() {
        match state {
            2 => state = 1,
            1 => match b {
                b'\\' => state = 2,
                b'"' => state = 0,
                _ => {}
            },
            _ => match b {
                b'"' => state = 1,
                b'{' => depth += 1,
                b'}' => {
                    depth -= 1;
                    if depth == 0 {
                        return Some(i + 1);
                    }
                }
                _ => {}
            },
        }
    }
    None
}
/// Explains why stitching dropped every captured probe event, based on the
/// BPF-side trigger counters and the snapshot of the exit kind.
fn stitch_drop_cause(
    skeleton: &crate::probe::process::ProbeDiagnostics,
) -> std::borrow::Cow<'static, str> {
    use crate::probe::scx_defs::{EXIT_ERROR, EXIT_ERROR_BPF, EXIT_ERROR_STALL};
    use std::borrow::Cow;
    if skeleton.bpf_trigger_fires == 0 {
        return Cow::Borrowed(
            "trigger never fired (timing race or scheduler clean-exited; \
             no error-class sched_ext_exit observed)",
        );
    }
    let fixed: &'static str = match skeleton.bpf_exit_kind_snap as u64 {
        EXIT_ERROR_STALL => {
            "trigger fired with kind=STALL (no causal task; pre-trigger events \
             suppressed because watchdog-context exit lacks a current task)"
        }
        EXIT_ERROR => {
            "trigger fired with kind=ERROR (no current task at exit time; \
             pre-trigger events suppressed because generic ERROR can fire \
             from kworker context where `current` is not the causal task)"
        }
        EXIT_ERROR_BPF => {
            "trigger fired with kind=BPF_ERROR but stitch found no matching \
             task_ptr (suspected ID mismatch or func_idx_offset bug — file a ticket)"
        }
        other => {
            // Unknown exit kind: the message must embed the value, so this
            // arm allocates.
            return Cow::Owned(format!(
                "trigger fired but exit kind {other} is unrecognized; \
                 pre-trigger events suppressed because no causal task \
                 was identified (map value via include/linux/sched/ext.h)"
            ));
        }
    };
    Cow::Borrowed(fixed)
}
/// Renders the "--- probe pipeline ---" diagnostics section: how many
/// functions survived each pipeline stage, kprobe/fentry attachment
/// results, trigger state, stitch outcome, and BPF-side counters.
pub(crate) fn format_probe_diagnostics(
    pipeline: &PipelineDiagnostics,
    skeleton: &crate::probe::process::ProbeDiagnostics,
) -> String {
    let mut out = String::new();
    out.push_str("--- probe pipeline ---\n");
    out.push_str(&format!(
        " extracted: {} functions from crash backtrace\n",
        pipeline.stack_extracted,
    ));
    // Functions that survived the traceability filter.
    let passed = (pipeline.stack_extracted as usize).saturating_sub(pipeline.filter_dropped.len());
    if pipeline.filter_dropped.is_empty() {
        out.push_str(&format!(" traceable: {passed} passed filter\n"));
    } else {
        out.push_str(&format!(
            " traceable: {passed} passed, {} dropped: {}\n",
            pipeline.filter_dropped.len(),
            pipeline.filter_dropped.join(", "),
        ));
    }
    out.push_str(&format!(
        " bpf_discover: {} programs found\n",
        pipeline.bpf_discovered,
    ));
    out.push_str(&format!(
        " after_expand: {} total probe targets\n",
        pipeline.total_after_expand,
    ));
    // kprobe attach summary, listing per-function failures when present.
    if skeleton.kprobe_attach_failed.is_empty() {
        out.push_str(&format!(
            " kprobes: {} attached\n",
            skeleton.kprobe_attached,
        ));
    } else {
        out.push_str(&format!(
            " kprobes: {} attached, {} failed: {}\n",
            skeleton.kprobe_attached,
            skeleton.kprobe_attach_failed.len(),
            skeleton
                .kprobe_attach_failed
                .iter()
                .map(|(n, e)| format!("{n} ({e})"))
                .collect::<Vec<_>>()
                .join(", "),
        ));
    }
    if !skeleton.kprobe_resolve_failed.is_empty() {
        out.push_str(&format!(
            " kprobe_miss: {} unresolved: {}\n",
            skeleton.kprobe_resolve_failed.len(),
            skeleton.kprobe_resolve_failed.join(", "),
        ));
    }
    // fentry summary is only printed when there were candidates at all.
    if skeleton.fentry_candidates > 0 {
        if skeleton.fentry_attach_failed.is_empty() {
            out.push_str(&format!(
                " fentry: {} attached\n",
                skeleton.fentry_attached,
            ));
        } else {
            out.push_str(&format!(
                " fentry: {} attached, {} failed: {}\n",
                skeleton.fentry_attached,
                skeleton.fentry_attach_failed.len(),
                skeleton
                    .fentry_attach_failed
                    .iter()
                    .map(|(n, e)| format!("{n} ({e})"))
                    .collect::<Vec<_>>()
                    .join(", "),
            ));
        }
    }
    let trigger_type = if skeleton.trigger_type.is_empty() {
        "unknown"
    } else {
        &skeleton.trigger_type
    };
    if let Some(ref err) = skeleton.trigger_attach_error {
        out.push_str(&format!(" trigger: attach failed ({err})\n"));
    } else {
        out.push_str(&format!(
            " trigger: {} ({})\n",
            if skeleton.trigger_fired {
                "fired"
            } else {
                "not fired"
            },
            trigger_type,
        ));
    }
    if let Some(ref panic_msg) = skeleton.host_thread_panic {
        out.push_str(&format!(
            " ERROR: probe-collection thread panicked: {panic_msg}\n"
        ));
    }
    out.push_str(&format!(
        " probe_data: {} keys, {} unmatched IPs\n",
        skeleton.probe_data_keys, skeleton.probe_data_unmatched_ips,
    ));
    out.push_str(&format!(
        " events: {} captured, {} after stitch",
        skeleton.events_before_stitch, skeleton.events_after_stitch,
    ));
    // Explain why stitching dropped everything, or note fallback grouping.
    if skeleton.events_before_stitch > 0 && skeleton.events_after_stitch == 0 {
        let cause = stitch_drop_cause(skeleton);
        out.push_str(" — ");
        out.push_str(&cause);
    } else if skeleton.stitch_fallback_used {
        out.push_str(" — trigger absent, grouped by task_ptr frequency (best-effort)");
    }
    out.push('\n');
    // Raw BPF counters, only when anything fired or missed.
    if skeleton.bpf_kprobe_fires > 0
        || skeleton.bpf_trigger_fires > 0
        || skeleton.bpf_meta_misses > 0
    {
        out.push_str(&format!(
            " bpf_counts: {} kprobe fires, {} trigger fires, {} meta misses\n",
            skeleton.bpf_kprobe_fires, skeleton.bpf_trigger_fires, skeleton.bpf_meta_misses,
        ));
        if !skeleton.bpf_miss_ips.is_empty() {
            let ips: Vec<String> = skeleton
                .bpf_miss_ips
                .iter()
                .map(|ip| format!("0x{ip:x}"))
                .collect();
            out.push_str(&format!(" miss_ips: {}\n", ips.join(", ")));
        }
    }
    out
}
/// Convenience entry point: dispatches based on this process's actual
/// command-line arguments.
pub(crate) fn maybe_dispatch_vm_test() -> Option<i32> {
    let argv: Vec<String> = std::env::args().collect();
    maybe_dispatch_vm_test_with_args(&argv)
}
/// Builds the dispatch-context pieces shared by the dispatch entry points:
/// system topology (with a VM-spec fallback when sysfs fails), the cgroup
/// manager, the scheduler PID, and the fully merged assertion set.
fn build_dispatch_ctx_parts(
    entry: &KtstrTestEntry,
    args: &[String],
) -> (
    crate::topology::TestTopology,
    crate::cgroup::CgroupManager,
    Option<libc::pid_t>,
    crate::assert::Assert,
) {
    let topo = crate::topology::TestTopology::from_system().unwrap_or_else(|e| {
        eprintln!("ktstr_test: topology from sysfs failed ({e}), using VM spec fallback");
        crate::topology::TestTopology::from_vm_topology(&entry.topology)
    });
    let cgroups = crate::cgroup::CgroupManager::new(&resolve_cgroup_root(args));
    let sched_pid = crate::vmm::rust_init::sched_pid();
    // Defaults, then scheduler-level asserts, then the entry's own.
    let merged_assert = crate::assert::Assert::default_checks()
        .merge(entry.scheduler.assert())
        .merge(&entry.assert);
    (topo, cgroups, sched_pid, merged_assert)
}
/// Guest-side dispatch: parses `--ktstr-test-fn` (and related flags) from
/// `args`, optionally spawns the probe-collection thread, runs the named
/// test scenario, publishes its result, and returns the process exit code.
/// Returns `None` when `args` carry no `--ktstr-test-fn` (not a dispatch).
pub(crate) fn maybe_dispatch_vm_test_with_args(args: &[String]) -> Option<i32> {
    let name = match extract_test_fn_arg(args) {
        Some(n) => n,
        None => {
            tracing::debug!("ktstr-init: no --ktstr-test-fn in args, skipping dispatch");
            return None;
        }
    };
    let entry = match find_test(name) {
        Some(e) => e,
        None => {
            eprintln!("ktstr_test: unknown test function '{name}'");
            return Some(1);
        }
    };
    let probe_stack = extract_probe_stack_arg(args);
    // Validate any work-type override, suggesting a close match on typos;
    // an unknown type is reported and ignored (None).
    let work_type_override = extract_work_type_arg(args).and_then(|s| {
        crate::workload::WorkType::from_name(&s).or_else(|| {
            match crate::workload::WorkType::suggest(&s) {
                Some(canonical) => eprintln!(
                    "ktstr_test: unknown work type '{s}'; did you mean \
                     '{canonical}'? Valid types: {:?}",
                    crate::workload::WorkType::ALL_NAMES,
                ),
                None => eprintln!(
                    "ktstr_test: unknown work type '{s}'. Valid types: {:?}",
                    crate::workload::WorkType::ALL_NAMES,
                ),
            }
            None
        })
    });
    let pipeline = ProbePipeline::new();
    let probe_stop = pipeline.stop.clone();
    // When a probe stack was supplied, resolve it to traceable functions
    // and spawn the collection thread before the workload starts.
    let probe_handle: Option<ProbeHandle> = probe_stack.as_ref().and_then(|stack_input| {
        use crate::probe::stack::load_probe_stack;
        eprintln!("ktstr_test: probe: loading probe stack from --ktstr-probe-stack");
        let mut pipe_diag = PipelineDiagnostics::default();
        let raw_functions = load_probe_stack(stack_input);
        pipe_diag.stack_extracted = raw_functions.len() as u32;
        let pre_filter: Vec<String> = raw_functions.iter().map(|f| f.raw_name.clone()).collect();
        let mut functions = crate::probe::stack::filter_traceable(raw_functions);
        // Record which names the traceability filter dropped.
        for name in &pre_filter {
            if !functions.iter().any(|f| f.raw_name == *name) {
                pipe_diag.filter_dropped.push(name.clone());
            }
        }
        // Discover BPF symbols matching the BPF-flagged stack functions.
        let stack_display_names: Vec<&str> = functions
            .iter()
            .filter(|f| f.is_bpf)
            .map(|f| f.display_name.as_str())
            .collect();
        let bpf_syms = crate::probe::btf::discover_bpf_symbols(&stack_display_names);
        pipe_diag.bpf_discovered = bpf_syms.len() as u32;
        if !bpf_syms.is_empty() {
            eprintln!(
                "ktstr_test: probe: {} BPF symbols discovered",
                bpf_syms.len()
            );
            functions.extend(bpf_syms);
        }
        let functions = crate::probe::stack::expand_bpf_to_kernel_callers(functions);
        pipe_diag.total_after_expand = functions.len() as u32;
        if functions.is_empty() {
            eprintln!("ktstr_test: no traceable functions from --ktstr-probe-stack");
            return None;
        }
        eprintln!(
            "ktstr_test: probe: {} functions loaded, spawning probe thread",
            functions.len()
        );
        // Resolve BTF info for kernel functions and BPF programs alike.
        let kernel_names: Vec<&str> = functions
            .iter()
            .filter(|f| !f.is_bpf)
            .map(|f| f.raw_name.as_str())
            .collect();
        let mut btf_funcs = crate::probe::btf::parse_btf_functions(&kernel_names, None);
        let bpf_btf_args: Vec<(&str, u32)> = functions
            .iter()
            .filter(|f| f.is_bpf)
            .filter_map(|f| Some((f.display_name.as_str(), f.bpf_prog_id?)))
            .collect();
        if !bpf_btf_args.is_empty() {
            btf_funcs.extend(crate::probe::btf::parse_bpf_btf_functions(&bpf_btf_args));
        }
        let func_names: Vec<(u32, String)> = functions
            .iter()
            .enumerate()
            .map(|(i, f)| (i as u32, f.display_name.clone()))
            .collect();
        let bpf_fds = crate::probe::process::open_bpf_prog_fds(&functions);
        let pnames = crate::probe::output::build_param_names(&btf_funcs);
        let rhints = crate::probe::output::build_render_hints(&btf_funcs);
        // Clone what the collection thread needs; the thread emits the
        // payload itself and then signals output_done.
        let pnames_thread = pnames.clone();
        let rhints_thread = rhints.clone();
        let thread_pipeline = pipeline.clone();
        let funcs = functions.clone();
        let fn_names = func_names.clone();
        let pd = pipe_diag.clone();
        let handle = std::thread::spawn(move || {
            use crate::probe::process::run_probe_skeleton;
            let (events, diag, accumulated_fn_names) = run_probe_skeleton(
                &funcs,
                &btf_funcs,
                &thread_pipeline.stop,
                &bpf_fds,
                &thread_pipeline.probes_ready,
                None,
            );
            // Prefer names accumulated during the run when available.
            let emit_fn_names = if accumulated_fn_names.is_empty() {
                &fn_names
            } else {
                &accumulated_fn_names
            };
            emit_probe_payload(
                events.as_deref().unwrap_or(&[]),
                emit_fn_names,
                &pd,
                &diag,
                &pnames_thread,
                &rhints_thread,
            );
            thread_pipeline.output_done.set();
            (events, diag, accumulated_fn_names)
        });
        // Wait until the collection thread signals probes_ready so the
        // workload cannot race ahead of probe attachment.
        pipeline.probes_ready.wait();
        Some(ProbeHandle {
            thread: handle,
            func_names,
            pipeline_diag: pipe_diag,
            output_done: pipeline.output_done.clone(),
            param_names: pnames,
            render_hints: rhints,
        })
    });
    // Build the scenario context and run the test function.
    let (topo, cgroups, sched_pid, merged_assert) = build_dispatch_ctx_parts(entry, args);
    let ctx = crate::scenario::Ctx::builder(&cgroups, &topo)
        .duration(entry.duration)
        .workers_per_cgroup(entry.workers_per_cgroup as usize)
        .sched_pid(sched_pid)
        .settle(Duration::ZERO)
        .work_type_override(work_type_override)
        .assert(merged_assert)
        .wait_for_map_write(!entry.bpf_map_write.is_empty())
        .build();
    if crate::vmm::guest_comms::is_guest() {
        crate::vmm::guest_comms::send_scenario_start();
    }
    let result = match (entry.func)(&ctx) {
        Ok(r) => r,
        Err(e) => {
            // Convert a scenario error into a failed AssertResult so it is
            // published the same way as an assertion failure.
            let r = AssertResult {
                passed: false,
                skipped: false,
                details: vec![format!("{e:#}").into()],
                stats: Default::default(),
                measurements: std::collections::BTreeMap::new(),
            };
            publish_result_and_collect(&r, probe_stop, probe_handle);
            return Some(1);
        }
    };
    let exit_code = if result.passed { 0 } else { 1 };
    publish_result_and_collect(&result, probe_stop, probe_handle);
    Some(exit_code)
}
/// Result tuple returned by the probe-collection thread: the captured
/// events (if any), the skeleton diagnostics, and the function names
/// accumulated during the run.
type ProbeThreadResult = (
    Option<Vec<crate::probe::process::ProbeEvent>>,
    crate::probe::process::ProbeDiagnostics,
    Vec<(u32, String)>,
);
/// Handle to a spawned probe-collection thread plus the metadata needed to
/// interpret its payload after the workload finishes.
struct ProbeHandle {
    // Joinable collection thread; yields the final ProbeThreadResult.
    thread: std::thread::JoinHandle<ProbeThreadResult>,
    // (index, display name) pairs for the attached functions.
    func_names: Vec<(u32, String)>,
    // Pipeline-stage counters gathered before the thread was spawned.
    pipeline_diag: PipelineDiagnostics,
    // Set by the thread once it has emitted the probe payload.
    output_done: std::sync::Arc<crate::sync::Latch>,
    // Keyed by function name; presumably (param name, type) pairs from
    // BTF — confirm against build_param_names.
    param_names: std::collections::HashMap<String, Vec<(String, String)>>,
    // Per-function rendering hints derived from BTF.
    render_hints: std::collections::HashMap<String, crate::probe::btf::RenderHint>,
}
/// Synchronization state shared between the dispatcher and the
/// probe-collection thread (cloned into the thread).
#[derive(Clone, Default)]
pub(crate) struct ProbePipeline {
    // Cooperative shutdown flag for the collection thread.
    pub stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
    // Set once the probe payload has been emitted.
    pub output_done: std::sync::Arc<crate::sync::Latch>,
    // Set once probes are attached; the dispatcher waits on this.
    pub probes_ready: std::sync::Arc<crate::sync::Latch>,
}
impl ProbePipeline {
    /// Creates a pipeline with fresh (unset) latches and a cleared stop flag.
    pub fn new() -> Self {
        Default::default()
    }
}
/// Counters describing how many functions survived each stage of the
/// probe-target pipeline (extraction → filter → BPF discovery → expand);
/// serialized as part of the probe payload.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub(crate) struct PipelineDiagnostics {
    // Functions extracted from the crash backtrace.
    pub stack_extracted: u32,
    // Names dropped by the traceability filter.
    pub filter_dropped: Vec<String>,
    // BPF programs found during discovery.
    pub bpf_discovered: u32,
    // Final probe-target count after BPF-to-kernel-caller expansion.
    pub total_after_expand: u32,
}
/// State produced by Phase A probe attachment (kernel functions only) and
/// consumed when Phase B (BPF discovery) feeds additional targets into the
/// already-running collection thread.
pub(crate) struct ProbePhaseAState {
    // Joinable probe-collection thread.
    pub handle: std::thread::JoinHandle<ProbeThreadResult>,
    // Channel for sending Phase B input into the running thread.
    pub phase_b_tx: std::sync::mpsc::Sender<crate::probe::process::PhaseBInput>,
    // Shared latches/stop flag for the thread.
    pub pipeline: ProbePipeline,
    // (index, display name) pairs for the Phase A kernel functions.
    pub kernel_func_names: Vec<(u32, String)>,
    // Number of kernel functions attached in Phase A.
    pub kernel_func_count: u32,
    // Pipeline counters accumulated so far; Phase B updates them further.
    pub pipe_diag: PipelineDiagnostics,
    // BTF-derived parameter names for rendering.
    pub param_names: std::collections::HashMap<String, Vec<(String, String)>>,
    // BTF-derived render hints.
    pub render_hints: std::collections::HashMap<String, crate::probe::btf::RenderHint>,
}
/// Phase A of split probe attachment: loads the kernel-function portion of
/// the probe stack, attaches probes on a background thread, and returns a
/// handle used later to feed Phase B (BPF discovery) into that same
/// thread. Returns `None` when no `--ktstr-probe-stack` argument is
/// present.
///
/// Fix: the original fused two `let` statements onto one line
/// (`bpf_fds` / `param_names`), which hides the empty-map initialization;
/// they are now separate, conventionally formatted statements.
pub(crate) fn start_probe_phase_a(args: &[String]) -> Option<ProbePhaseAState> {
    use crate::probe::stack::{filter_traceable, load_probe_stack};
    let stack_input = extract_probe_stack_arg(args)?;
    let phase_a_start = Instant::now();
    eprintln!("ktstr_test: probe phase_a: loading kernel functions");
    let mut pipe_diag = PipelineDiagnostics::default();
    let raw_functions = load_probe_stack(&stack_input);
    pipe_diag.stack_extracted = raw_functions.len() as u32;
    let pre_filter: Vec<String> = raw_functions.iter().map(|f| f.raw_name.clone()).collect();
    let functions = filter_traceable(raw_functions);
    // Record which names the traceability filter dropped.
    for name in &pre_filter {
        if !functions.iter().any(|f| f.raw_name == *name) {
            pipe_diag.filter_dropped.push(name.clone());
        }
    }
    // Phase A handles only kernel functions; BPF programs are discovered
    // later in Phase B.
    let kernel_functions: Vec<crate::probe::stack::StackFunction> =
        functions.into_iter().filter(|f| !f.is_bpf).collect();
    let kernel_names: Vec<&str> = kernel_functions
        .iter()
        .map(|f| f.raw_name.as_str())
        .collect();
    let btf_funcs = crate::probe::btf::parse_btf_functions(&kernel_names, None);
    let func_names: Vec<(u32, String)> = kernel_functions
        .iter()
        .enumerate()
        .map(|(i, f)| (i as u32, f.display_name.clone()))
        .collect();
    pipe_diag.total_after_expand = kernel_functions.len() as u32;
    // No BPF program fds exist yet in Phase A; the map starts empty.
    let bpf_fds = std::collections::HashMap::new();
    let param_names = crate::probe::output::build_param_names(&btf_funcs);
    let render_hints = crate::probe::output::build_render_hints(&btf_funcs);
    let pipeline = ProbePipeline::new();
    let (phase_b_tx, phase_b_rx) = std::sync::mpsc::channel();
    // Clone what the collection thread needs; it emits the payload itself
    // and then signals output_done.
    let thread_pipeline = pipeline.clone();
    let funcs = kernel_functions.clone();
    let btf = btf_funcs.clone();
    let fn_names = func_names.clone();
    let pd = pipe_diag.clone();
    let pnames = param_names.clone();
    let rhints = render_hints.clone();
    let handle = std::thread::spawn(move || {
        let (events, diag, accumulated_fn_names) = crate::probe::process::run_probe_skeleton(
            &funcs,
            &btf,
            &thread_pipeline.stop,
            &bpf_fds,
            &thread_pipeline.probes_ready,
            Some(phase_b_rx),
        );
        // Prefer names accumulated during the run when available.
        let emit_fn_names = if accumulated_fn_names.is_empty() {
            &fn_names
        } else {
            &accumulated_fn_names
        };
        emit_probe_payload(
            events.as_deref().unwrap_or(&[]),
            emit_fn_names,
            &pd,
            &diag,
            &pnames,
            &rhints,
        );
        thread_pipeline.output_done.set();
        (events, diag, accumulated_fn_names)
    });
    // Wait until the collection thread signals probes_ready so callers
    // cannot race ahead of probe attachment.
    pipeline.probes_ready.wait();
    tracing::info!(
        elapsed_ms = phase_a_start.elapsed().as_millis() as u64,
        kernel_functions = kernel_functions.len(),
        "auto_repro: phase_a_attach",
    );
    eprintln!(
        "ktstr_test: probe phase_a: {} kernel functions attached, waiting for Phase B",
        kernel_functions.len(),
    );
    let kernel_func_count = kernel_functions.len() as u32;
    Some(ProbePhaseAState {
        handle,
        phase_b_tx,
        pipeline,
        kernel_func_names: func_names,
        kernel_func_count,
        pipe_diag,
        param_names,
        render_hints,
    })
}
/// Phase B of the two-phase probe pipeline: given the Phase A state (kernel
/// fentry probes already attached), discover BPF struct_ops programs, send
/// them to the Phase A thread for attachment, then run the requested test
/// scenario and publish its result.
///
/// Returns `None` when `args` carries no `--ktstr-test-fn` (caller falls
/// through to other dispatch paths); otherwise returns the process exit code.
pub(crate) fn maybe_dispatch_vm_test_with_phase_a(
args: &[String],
pa: ProbePhaseAState,
) -> Option<i32> {
use crate::probe::btf::discover_bpf_symbols;
use crate::probe::stack::expand_bpf_to_kernel_callers;
// No test-fn argument means this invocation is not a VM test dispatch.
let name = match extract_test_fn_arg(args) {
Some(n) => n,
None => {
tracing::debug!("ktstr-init: no --ktstr-test-fn in args, skipping dispatch");
return None;
}
};
let entry = match find_test(name) {
Some(e) => e,
None => {
eprintln!("ktstr_test: unknown test function '{name}'");
return Some(1);
}
};
// An unknown work type is reported (with a did-you-mean suggestion when
// available) but does not abort the run — the override simply stays None.
let work_type_override = extract_work_type_arg(args).and_then(|s| {
crate::workload::WorkType::from_name(&s).or_else(|| {
match crate::workload::WorkType::suggest(&s) {
Some(canonical) => eprintln!(
"ktstr_test: unknown work type '{s}'; did you mean \
'{canonical}'? Valid types: {:?}",
crate::workload::WorkType::ALL_NAMES,
),
None => eprintln!(
"ktstr_test: unknown work type '{s}'. Valid types: {:?}",
crate::workload::WorkType::ALL_NAMES,
),
}
None
})
});
// Destructure up front so each piece has a clear owner below.
let ProbePhaseAState {
handle: pa_handle,
phase_b_tx: pa_phase_b_tx,
pipeline: pa_pipeline,
kernel_func_names: pa_kernel_func_names,
kernel_func_count: pa_kernel_func_count,
pipe_diag: mut pa_pipe_diag,
param_names: pa_param_names,
render_hints: pa_render_hints,
} = pa;
eprintln!("ktstr_test: probe phase_b: discovering BPF symbols");
let discover_start = Instant::now();
let stack_display_names: Vec<&str> = Vec::new(); let bpf_syms = discover_bpf_symbols(&stack_display_names);
// Zero discovered programs is only suspicious when the scheduler is still
// alive; a fast-crashing scheduler can legitimately exit before discovery.
if bpf_syms.is_empty() {
let sched_alive = crate::vmm::rust_init::sched_pid()
.is_some_and(|pid| unsafe { libc::kill(pid, 0) == 0 });
if sched_alive {
tracing::warn!(
"phase_b: bpf_discover returned 0 programs while scheduler is \
still alive — verify ProgInfoIter access permissions or BTF \
(this is the unexpected case; the auto-repro pipeline is now \
attached to no BPF struct_ops callbacks)"
);
} else {
tracing::info!(
"phase_b: bpf_discover returned 0 programs — scheduler exited \
before the discovery window (expected for fast-crash paths)"
);
}
}
pa_pipe_diag.bpf_discovered = bpf_syms.len() as u32;
tracing::info!(
elapsed_ms = discover_start.elapsed().as_millis() as u64,
bpf_syms = bpf_syms.len(),
"auto_repro: phase_b_discover",
);
eprintln!(
"ktstr_test: probe phase_b: {} BPF symbols discovered",
bpf_syms.len()
);
if !bpf_syms.is_empty() {
// Expand BPF programs to their kernel callers, then split the result:
// BPF entries get their BTF parsed via prog id, kernel callers via the
// regular kernel-BTF path.
let phase_b_functions = expand_bpf_to_kernel_callers(bpf_syms);
pa_pipe_diag.total_after_expand = pa_pipe_diag
.total_after_expand
.saturating_add(phase_b_functions.len() as u32);
let bpf_fds = crate::probe::process::open_bpf_prog_fds(&phase_b_functions);
let bpf_btf_args: Vec<(&str, u32)> = phase_b_functions
.iter()
.filter(|f| f.is_bpf)
.filter_map(|f| Some((f.display_name.as_str(), f.bpf_prog_id?)))
.collect();
let mut phase_b_btf = if !bpf_btf_args.is_empty() {
crate::probe::btf::parse_bpf_btf_functions(&bpf_btf_args)
} else {
Vec::new()
};
let kernel_caller_names: Vec<&str> = phase_b_functions
.iter()
.filter(|f| !f.is_bpf)
.map(|f| f.raw_name.as_str())
.collect();
if !kernel_caller_names.is_empty() {
phase_b_btf.extend(crate::probe::btf::parse_btf_functions(
&kernel_caller_names,
None,
));
}
// The latch lets us block here until the Phase A thread has finished
// attaching the Phase B probes, so the scenario starts fully probed.
let phase_b_done = std::sync::Arc::new(crate::sync::Latch::new());
let phase_b_done_clone = phase_b_done.clone();
let n_phase_b_functions = phase_b_functions.len();
// func_idx_offset keeps Phase B function indices disjoint from Phase A's.
let phase_b_input = crate::probe::process::PhaseBInput {
functions: phase_b_functions,
bpf_prog_fds: bpf_fds,
btf_funcs: phase_b_btf,
done: phase_b_done_clone,
func_idx_offset: pa_kernel_func_count,
};
let attach_start = Instant::now();
if let Err(e) = pa_phase_b_tx.send(phase_b_input) {
eprintln!("ktstr_test: probe phase_b: failed to send: {e}");
} else {
phase_b_done.wait();
tracing::info!(
elapsed_ms = attach_start.elapsed().as_millis() as u64,
phase_b_functions = n_phase_b_functions,
"auto_repro: phase_b_attach",
);
eprintln!("ktstr_test: probe phase_b: BPF fentry attached");
}
} else {
eprintln!("ktstr_test: probe phase_b: no BPF symbols, skipping fentry");
// Dropping the sender unblocks the Phase A thread's receiver.
drop(pa_phase_b_tx);
}
// Build the scenario context and run the test with probes in place.
let (topo, cgroups, sched_pid, merged_assert) = build_dispatch_ctx_parts(entry, args);
let ctx = crate::scenario::Ctx::builder(&cgroups, &topo)
.duration(entry.duration)
.workers_per_cgroup(entry.workers_per_cgroup as usize)
.sched_pid(sched_pid)
.settle(Duration::ZERO)
.work_type_override(work_type_override)
.assert(merged_assert)
.wait_for_map_write(!entry.bpf_map_write.is_empty())
.build();
let stop = pa_pipeline.stop.clone();
let handle = ProbeHandle {
thread: pa_handle,
func_names: pa_kernel_func_names,
pipeline_diag: pa_pipe_diag,
output_done: pa_pipeline.output_done,
param_names: pa_param_names,
render_hints: pa_render_hints,
};
if crate::vmm::guest_comms::is_guest() {
crate::vmm::guest_comms::send_scenario_start();
}
// Scenario errors are converted into a failed AssertResult so the probe
// data is still published on the error path.
let result = match (entry.func)(&ctx) {
Ok(r) => r,
Err(e) => {
let r = AssertResult {
passed: false,
skipped: false,
details: vec![format!("{e:#}").into()],
stats: Default::default(),
measurements: std::collections::BTreeMap::new(),
};
publish_result_and_collect(&r, stop, Some(handle));
return Some(1);
}
};
let exit_code = if result.passed { 0 } else { 1 };
publish_result_and_collect(&result, stop, Some(handle));
Some(exit_code)
}
/// Wire payload for probe data, serialized as JSON between the
/// PROBE_OUTPUT_START/END markers on stdout and parsed back by the host.
#[derive(serde::Serialize, serde::Deserialize)]
pub(crate) struct ProbeBytes {
// Collected probe events (possibly truncated in transit; see the
// partial-recovery tests below).
pub events: Vec<crate::probe::process::ProbeEvent>,
// (func_idx, display name) pairs mapping event indices to names.
pub func_names: Vec<(u32, String)>,
// BPF function display name -> source location string.
pub bpf_source_locs: std::collections::HashMap<String, String>,
// Pipeline + skeleton diagnostics; None in older/minimal payloads.
pub diagnostics: Option<ProbeBytesDiagnostics>,
// CPU count of the probed machine, when it could be determined.
pub nr_cpus: Option<u32>,
// Function name -> (param name, type name) pairs from BTF.
pub param_names: std::collections::HashMap<String, Vec<(String, String)>>,
// Function name -> rendering hint for pretty-printing events.
pub render_hints: std::collections::HashMap<String, crate::probe::btf::RenderHint>,
}
/// Diagnostics bundle carried inside [`ProbeBytes`]: counters from the
/// two-phase pipeline plus diagnostics from the BPF skeleton run.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub(crate) struct ProbeBytesDiagnostics {
pub pipeline: PipelineDiagnostics,
pub skeleton: crate::probe::process::ProbeDiagnostics,
}
/// Serialize the collected probe data as one JSON document framed by
/// `PROBE_OUTPUT_START`/`PROBE_OUTPUT_END` markers on stdout, for the
/// host-side parser (`extract_probe_output`) to pick up.
///
/// `func_names` are `(func_idx, display_name)` pairs; `param_names` and
/// `render_hints` are keyed by function name.
fn emit_probe_payload(
    events: &[crate::probe::process::ProbeEvent],
    func_names: &[(u32, String)],
    pipeline_diag: &PipelineDiagnostics,
    skeleton_diag: &crate::probe::process::ProbeDiagnostics,
    param_names: &std::collections::HashMap<String, Vec<(String, String)>>,
    render_hints: &std::collections::HashMap<String, crate::probe::btf::RenderHint>,
) {
    // Only resolve BPF source locations when there are events to annotate:
    // resolution re-discovers BPF symbols and queries prog ids, which is
    // pointless work for an empty payload.
    let bpf_source_locs = if events.is_empty() {
        std::collections::HashMap::new()
    } else {
        let source_loc_names: Vec<&str> =
            func_names.iter().map(|(_, name)| name.as_str()).collect();
        let bpf_syms = crate::probe::btf::discover_bpf_symbols(&source_loc_names);
        // Map each traced function name back to its BPF program id, if any.
        let bpf_prog_ids: Vec<u32> = func_names
            .iter()
            .filter_map(|(_, name)| {
                bpf_syms
                    .iter()
                    .find(|s| s.display_name == *name)
                    .and_then(|s| s.bpf_prog_id)
            })
            .collect();
        crate::probe::btf::resolve_bpf_source_locs(&bpf_prog_ids)
    };
    let payload = ProbeBytes {
        events: events.to_vec(),
        func_names: func_names.to_vec(),
        bpf_source_locs,
        diagnostics: Some(ProbeBytesDiagnostics {
            pipeline: pipeline_diag.clone(),
            skeleton: skeleton_diag.clone(),
        }),
        nr_cpus: crate::probe::output::get_nr_cpus(),
        param_names: param_names.clone(),
        render_hints: render_hints.clone(),
    };
    println!("{PROBE_OUTPUT_START}");
    // Report serialization failures on stderr instead of silently emitting
    // an empty marker pair: the host treats an empty payload as "no probe
    // data" and would otherwise give no hint why the data vanished.
    match serde_json::to_string(&payload) {
        Ok(json) => println!("{json}"),
        Err(e) => eprintln!("ktstr_test: probe payload serialization failed: {e}"),
    }
    println!("{PROBE_OUTPUT_END}");
}
/// Probe state parked by `publish_result_and_collect` when running inside
/// the guest, to be drained later by `finalize_probe_after_unwind`.
struct DeferredProbe {
// Signal that tells the probe thread to stop collecting.
stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
// Join handle plus metadata needed to emit the payload; None when no
// probe thread was started.
handle: Option<ProbeHandle>,
}
// Single parking slot for deferred probe collection (at most one test runs
// per process, so a single slot suffices).
static DEFERRED_PROBE_COLLECT: std::sync::Mutex<Option<DeferredProbe>> =
std::sync::Mutex::new(None);
/// Park probe state for later collection by `finalize_probe_after_unwind`.
/// Any previously stashed state is replaced.
fn stash_deferred_probe(
    stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
    handle: Option<ProbeHandle>,
) {
    // Recover from a poisoned mutex instead of panicking: this runs on
    // failure paths, where a secondary panic would mask the original error
    // and drop the probe data on the floor. The slot's Option payload is
    // valid even if a holder panicked mid-critical-section.
    let mut guard = DEFERRED_PROBE_COLLECT
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    *guard = Some(DeferredProbe { stop, handle });
}
/// Remove and return the parked probe state, leaving the slot empty.
/// Returns `None` when nothing was stashed.
fn take_deferred_probe() -> Option<DeferredProbe> {
    // This runs after an unwind; recover from mutex poisoning rather than
    // panicking again and losing the deferred probe data.
    DEFERRED_PROBE_COLLECT
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .take()
}
/// Poll the sched_ext sysfs state file until it reports "disabled" or
/// `timeout` elapses, sleeping 50ms between polls.
///
/// Returns `true` once the state reads "disabled"; returns `false` when the
/// deadline passes first, or immediately if the file cannot be read (e.g.
/// sched_ext is not present on this kernel).
fn wait_for_sched_disabled(timeout: std::time::Duration) -> bool {
    const STATE_PATH: &str = "/sys/kernel/sched_ext/state";
    let give_up_at = std::time::Instant::now() + timeout;
    loop {
        // Read first, deadline-check second: even a zero timeout gets one
        // attempt at observing the state.
        match std::fs::read_to_string(STATE_PATH) {
            Ok(state) if state.trim() == "disabled" => return true,
            Ok(_) => {}
            Err(_) => return false,
        }
        if std::time::Instant::now() >= give_up_at {
            return false;
        }
        std::thread::sleep(std::time::Duration::from_millis(50));
    }
}
/// Drain any probe state that `publish_result_and_collect` parked while
/// running inside the guest. Called after a panic/unwind has been caught;
/// a no-op when nothing was stashed.
pub(crate) fn finalize_probe_after_unwind() {
    let deferred = match take_deferred_probe() {
        Some(d) => d,
        None => return,
    };
    // If a probe thread exists, give the scheduler up to five seconds to
    // detach before collecting, so the probe data is as complete as possible.
    if deferred.handle.is_some() {
        let _ = wait_for_sched_disabled(std::time::Duration::from_secs(5));
    }
    collect_and_print_probe_data(deferred.stop, deferred.handle);
}
/// Print the test verdict and arrange for probe data collection.
///
/// Coverage data is flushed and the assert result printed unconditionally
/// first, so they are emitted even if probe collection goes wrong. Inside
/// the guest VM the probe state is parked for later draining (see
/// `finalize_probe_after_unwind`); on the host it is collected right away.
fn publish_result_and_collect(
    result: &AssertResult,
    stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
    handle: Option<ProbeHandle>,
) {
    try_flush_profraw();
    print_assert_result(result);
    if crate::vmm::guest_comms::is_guest() {
        stash_deferred_probe(stop, handle);
        return;
    }
    collect_and_print_probe_data(stop, handle);
}
/// Stop the probe thread, join it, and emit the probe payload if the thread
/// has not already emitted it itself (tracked via the `output_done` latch).
/// A no-op when no probe handle exists.
fn collect_and_print_probe_data(
stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
handle: Option<ProbeHandle>,
) {
let Some(ph) = handle else {
return;
};
// Release ordering pairs with the probe thread's read of the stop flag.
stop.store(true, std::sync::atomic::Ordering::Release);
// Join and normalize the three outcomes: events collected, no events,
// or the thread panicked (panic message recorded as a diagnostic).
let (events, skeleton_diag, accumulated_fn_names) = match ph.thread.join() {
Ok((Some(events), diag, fnames)) => (events, diag, fnames),
Ok((None, diag, fnames)) => (Vec::new(), diag, fnames),
Err(payload) => {
// Panic payloads are usually &'static str or String; anything else
// is reported generically.
let msg = if let Some(s) = payload.downcast_ref::<&'static str>() {
(*s).to_string()
} else if let Some(s) = payload.downcast_ref::<String>() {
s.clone()
} else {
"<non-string panic>".to_string()
};
let diag = crate::probe::process::ProbeDiagnostics {
host_thread_panic: Some(msg),
..Default::default()
};
(Vec::new(), diag, Vec::new())
}
};
// Prefer the names accumulated by the thread (they include Phase B
// functions); fall back to the Phase A names stored in the handle.
let effective_fn_names = if accumulated_fn_names.is_empty() {
&ph.func_names
} else {
&accumulated_fn_names
};
// Skip emission if the probe thread already printed the payload itself.
if !ph.output_done.is_set() {
emit_probe_payload(
&events,
effective_fn_names,
&ph.pipeline_diag,
&skeleton_diag,
&ph.param_names,
&ph.render_hints,
);
}
}
#[cfg(test)]
mod tests {
use super::*;
// A well-formed payload between the markers renders with function and
// field names visible in the formatted output.
#[test]
fn extract_probe_output_valid_json() {
use crate::probe::process::ProbeEvent;
let payload = ProbeBytes {
events: vec![ProbeEvent {
func_idx: 0,
task_ptr: 1,
ts: 100,
args: [0; 6],
fields: vec![("p:task_struct.pid".to_string(), 42)],
kstack: vec![],
str_val: None,
..Default::default()
}],
func_names: vec![(0, "schedule".to_string())],
bpf_source_locs: Default::default(),
diagnostics: None,
nr_cpus: None,
param_names: Default::default(),
render_hints: Default::default(),
};
let json = serde_json::to_string(&payload).unwrap();
// Surround the framed payload with noise to prove marker extraction.
let output = format!("noise\n{PROBE_OUTPUT_START}\n{json}\n{PROBE_OUTPUT_END}\nmore");
let parsed = extract_probe_output(&output, None, None);
assert!(parsed.is_some());
let formatted = parsed.unwrap();
assert!(
formatted.contains("schedule"),
"should contain func name: {formatted}"
);
assert!(
formatted.contains("pid"),
"should contain field name: {formatted}"
);
}
// No markers at all -> None.
#[test]
fn extract_probe_output_missing() {
assert!(extract_probe_output("no markers", None, None).is_none());
}
// Markers present but nothing between them -> None.
#[test]
fn extract_probe_output_empty() {
let output = format!("{PROBE_OUTPUT_START}\n\n{PROBE_OUTPUT_END}");
assert!(extract_probe_output(&output, None, None).is_none());
}
// Garbage between the markers (non-JSON, not truncation) -> None.
#[test]
fn extract_probe_output_invalid_json() {
let output = format!("{PROBE_OUTPUT_START}\nnot valid json\n{PROBE_OUTPUT_END}");
assert!(extract_probe_output(&output, None, None).is_none());
}
// Events with typed fields render with per-type headers ("task_struct
// *prev", "rq *rq") and suppress the raw argN fallback.
#[test]
fn extract_probe_output_enriched_fields() {
use crate::probe::process::ProbeEvent;
let payload = ProbeBytes {
events: vec![
ProbeEvent {
func_idx: 0,
task_ptr: 1,
ts: 100,
args: [0xDEAD, 0, 0, 0, 0, 0],
fields: vec![
("prev:task_struct.pid".to_string(), 42),
("prev:task_struct.scx_flags".to_string(), 0x1c),
],
kstack: vec![],
str_val: None,
..Default::default()
},
ProbeEvent {
func_idx: 1,
task_ptr: 1,
ts: 200,
args: [0; 6],
fields: vec![("rq:rq.cpu".to_string(), 3)],
kstack: vec![],
str_val: None,
..Default::default()
},
],
func_names: vec![
(0, "schedule".to_string()),
(1, "pick_task_scx".to_string()),
],
bpf_source_locs: Default::default(),
diagnostics: None,
nr_cpus: None,
param_names: Default::default(),
render_hints: Default::default(),
};
let json = serde_json::to_string(&payload).unwrap();
let output = format!("{PROBE_OUTPUT_START}\n{json}\n{PROBE_OUTPUT_END}");
let formatted = extract_probe_output(&output, None, None).unwrap();
assert!(formatted.contains("pid"), "pid field: {formatted}");
assert!(formatted.contains("42"), "pid value: {formatted}");
assert!(
formatted.contains("scx_flags"),
"scx_flags field: {formatted}"
);
assert!(formatted.contains("cpu"), "cpu field: {formatted}");
assert!(formatted.contains("3"), "cpu value: {formatted}");
assert!(
formatted.contains("task_struct *prev"),
"type header for task_struct: {formatted}"
);
assert!(
formatted.contains("rq *rq"),
"type header for rq: {formatted}"
);
// When decoded fields exist, the raw register args must not leak through.
assert!(
!formatted.contains("arg0"),
"raw args should not appear when fields exist: {formatted}"
);
assert!(formatted.contains("schedule"), "func schedule: {formatted}");
assert!(
formatted.contains("pick_task_scx"),
"func pick_task_scx: {formatted}"
);
}
/// Serialize `payload` and keep only its first `cut_after` bytes, clamped
/// to the full length. The fixtures serialized here are ASCII, so byte
/// slicing cannot split a UTF-8 code point.
fn truncate_probe_json(payload: &ProbeBytes, cut_after: usize) -> String {
    let serialized = serde_json::to_string(payload).expect("serialize ProbeBytes");
    let end = cut_after.min(serialized.len());
    serialized[..end].to_string()
}
// Truncating the payload mid-way through the third event must recover the
// first two complete events, print them under an `unknown` function header
// (func_names was cut off), and dump the raw truncated JSON to the partial
// dump path.
#[test]
fn extract_probe_output_truncated_recovers_complete_events() {
use crate::probe::process::ProbeEvent;
let payload = ProbeBytes {
events: vec![
ProbeEvent {
func_idx: 0,
task_ptr: 0xa,
ts: 100,
args: [0; 6],
fields: vec![("p:task_struct.pid".to_string(), 11)],
kstack: vec![],
str_val: None,
..Default::default()
},
ProbeEvent {
func_idx: 1,
task_ptr: 0xb,
ts: 200,
args: [0; 6],
fields: vec![("p:task_struct.pid".to_string(), 22)],
kstack: vec![],
str_val: None,
..Default::default()
},
ProbeEvent {
func_idx: 2,
task_ptr: 0xc,
ts: 300,
args: [0; 6],
fields: vec![("p:task_struct.pid".to_string(), 33)],
kstack: vec![],
str_val: None,
..Default::default()
},
],
func_names: vec![
(0, "first".to_string()),
(1, "second".to_string()),
(2, "third".to_string()),
],
bpf_source_locs: Default::default(),
diagnostics: None,
nr_cpus: None,
param_names: Default::default(),
render_hints: Default::default(),
};
let full = serde_json::to_string(&payload).unwrap();
// Mini JSON scanner: find the byte offset where each top-level event
// object starts inside the "events" array, ignoring braces that appear
// inside string literals (including escaped quotes).
let events_start = full.find("\"events\":[").unwrap() + "\"events\":[".len();
let mut depth: u32 = 0;
let mut in_string = false;
let mut escape = false;
let mut event_starts: Vec<usize> = Vec::new();
for (i, b) in full.bytes().enumerate().skip(events_start) {
if in_string {
if escape {
escape = false;
} else if b == b'\\' {
escape = true;
} else if b == b'"' {
in_string = false;
}
continue;
}
match b {
b'"' => in_string = true,
b'{' => {
if depth == 0 {
event_starts.push(i);
}
depth += 1;
}
b'}' => depth = depth.saturating_sub(1),
b']' if depth == 0 => break,
_ => {}
}
}
assert_eq!(
event_starts.len(),
3,
"test fixture should produce 3 events"
);
// Cut a few bytes into the third event so it is incomplete.
let cut = event_starts[2] + 5;
let truncated = &full[..cut];
let output = format!("{PROBE_OUTPUT_START}\n{truncated}\n");
let dir = tempfile::tempdir().expect("tempdir");
let partial = dir.path().join("payload.partial.json");
let formatted = extract_probe_output(&output, None, Some(&partial))
.expect("recovery must surface partial events");
assert!(
formatted.contains("unknown"),
"recovered events should print under `unknown` func header (no func_names): {formatted}",
);
assert!(
formatted.contains("11"),
"first event's pid value should appear: {formatted}",
);
assert!(
formatted.contains("22"),
"second event's pid value should appear: {formatted}",
);
assert!(
!formatted.contains("33"),
"third (truncated) event's pid value must NOT appear: {formatted}",
);
assert!(
partial.exists(),
"raw truncated payload must be written to partial dump path",
);
let dumped = std::fs::read_to_string(&partial).expect("read partial dump");
assert_eq!(
dumped.trim(),
truncated.trim(),
"partial dump must contain the raw extracted JSON verbatim",
);
}
// Truncation so early that no complete event survives: result is None, but
// the partial dump is still written for post-mortem inspection.
#[test]
fn extract_probe_output_truncated_no_complete_events_returns_none() {
use crate::probe::process::ProbeEvent;
let payload = ProbeBytes {
events: vec![ProbeEvent {
func_idx: 0,
task_ptr: 0xa,
ts: 100,
args: [0; 6],
fields: vec![("p:task_struct.pid".to_string(), 11)],
kstack: vec![],
str_val: None,
..Default::default()
}],
func_names: vec![(0, "first".to_string())],
bpf_source_locs: Default::default(),
diagnostics: None,
nr_cpus: None,
param_names: Default::default(),
render_hints: Default::default(),
};
let full = serde_json::to_string(&payload).unwrap();
let events_open = full.find("\"events\":[").unwrap() + "\"events\":[".len();
// Cut inside the first (only) event, so nothing is recoverable.
let cut = events_open + 5;
let truncated = &full[..cut];
let output = format!("{PROBE_OUTPUT_START}\n{truncated}\n");
let dir = tempfile::tempdir().expect("tempdir");
let partial = dir.path().join("payload.partial.json");
let result = extract_probe_output(&output, None, Some(&partial));
assert!(
result.is_none(),
"no complete events recoverable should yield None: {result:?}",
);
assert!(
partial.exists(),
"partial dump must be written even when recovery yields zero events",
);
let dumped = std::fs::read_to_string(&partial).expect("read partial dump");
assert_eq!(dumped.trim(), truncated.trim());
}
// Truncation that lands inside a long string value of the second event
// must still recover the first (complete) event.
#[test]
fn extract_probe_output_truncated_string_value_recovers_prior() {
use crate::probe::process::ProbeEvent;
let payload = ProbeBytes {
events: vec![
ProbeEvent {
func_idx: 0,
task_ptr: 0xa,
ts: 100,
args: [0; 6],
fields: vec![("p:task_struct.pid".to_string(), 11)],
kstack: vec![],
str_val: None,
..Default::default()
},
ProbeEvent {
func_idx: 1,
task_ptr: 0xb,
ts: 200,
args: [0; 6],
fields: vec![],
kstack: vec![],
str_val: Some("a".repeat(200)),
..Default::default()
},
],
func_names: vec![(0, "first".to_string()), (1, "second".to_string())],
bpf_source_locs: Default::default(),
diagnostics: None,
nr_cpus: None,
param_names: Default::default(),
render_hints: Default::default(),
};
let full = serde_json::to_string(&payload).unwrap();
// Locate the 200-char "aaa…" string and cut in the middle of it.
let needle = "aaaaaaaaaa"; let idx = full
.find(needle)
.expect("fixture must contain the long string");
let cut = idx + needle.len() + 50; let truncated = &full[..cut];
let output = format!("{PROBE_OUTPUT_START}\n{truncated}\n");
let formatted = extract_probe_output(&output, None, None)
.expect("recovery must surface the first complete event");
assert!(
formatted.contains("11"),
"first event's pid must survive truncation in the second event: {formatted}",
);
}
// Recovery still works when no partial-dump path is supplied; the dump is
// simply skipped.
#[test]
fn extract_probe_output_truncated_partial_dump_path_none_skips_write() {
use crate::probe::process::ProbeEvent;
let payload = ProbeBytes {
events: vec![
ProbeEvent {
func_idx: 0,
task_ptr: 0xa,
ts: 100,
args: [0; 6],
fields: vec![("p:task_struct.pid".to_string(), 11)],
kstack: vec![],
str_val: None,
..Default::default()
},
ProbeEvent {
func_idx: 1,
task_ptr: 0xb,
ts: 200,
args: [0; 6],
fields: vec![],
kstack: vec![],
str_val: None,
..Default::default()
},
],
func_names: vec![(0, "f0".to_string()), (1, "f1".to_string())],
bpf_source_locs: Default::default(),
diagnostics: None,
nr_cpus: None,
param_names: Default::default(),
render_hints: Default::default(),
};
let full = serde_json::to_string(&payload).unwrap();
// Cut just after the "},{" separator, i.e. at the start of event #2.
let first_close = full.find("},{").expect("two events present");
let cut = first_close + 3; let truncated = &full[..cut];
let output = format!("{PROBE_OUTPUT_START}\n{truncated}\n");
let formatted = extract_probe_output(&output, None, None)
.expect("recovery must yield the first event without a dump path");
assert!(
formatted.contains("11"),
"first event recovered: {formatted}"
);
}
// Complete, valid JSON parses strictly with all fields intact.
#[test]
fn parse_probe_payload_strict_success() {
use crate::probe::process::ProbeEvent;
let payload = ProbeBytes {
events: vec![ProbeEvent {
func_idx: 0,
task_ptr: 0xa,
ts: 100,
args: [0; 6],
fields: vec![],
kstack: vec![],
str_val: None,
..Default::default()
}],
func_names: vec![(0, "schedule".to_string())],
bpf_source_locs: Default::default(),
diagnostics: None,
nr_cpus: None,
param_names: Default::default(),
render_hints: Default::default(),
};
let json = serde_json::to_string(&payload).unwrap();
let parsed = parse_probe_payload(&json, None).expect("strict parse must succeed");
assert_eq!(parsed.events.len(), 1);
assert_eq!(parsed.func_names.len(), 1);
}
// EOF-style truncation (payload ends right after a complete event) recovers
// that event, leaves trailing sections empty, and writes the dump file.
#[test]
fn parse_probe_payload_eof_with_dump_path_writes_file_returns_recovered() {
use crate::probe::process::ProbeEvent;
let payload = ProbeBytes {
events: vec![ProbeEvent {
func_idx: 0,
task_ptr: 0xa,
ts: 100,
args: [0; 6],
fields: vec![],
kstack: vec![],
str_val: None,
..Default::default()
}],
func_names: vec![],
bpf_source_locs: Default::default(),
diagnostics: None,
nr_cpus: None,
param_names: Default::default(),
render_hints: Default::default(),
};
let full = serde_json::to_string(&payload).unwrap();
// Keep the closing `}` of the event but drop the array's `]` and beyond.
let first_close = full.find("}]").expect("single-event array ends with `}]`");
let truncated = &full[..first_close + 1]; let dir = tempfile::tempdir().expect("tempdir");
let partial = dir.path().join("dump.json");
let parsed = parse_probe_payload(truncated, Some(&partial))
.expect("EOF with one complete event must recover");
assert_eq!(parsed.events.len(), 1);
assert!(parsed.func_names.is_empty());
assert!(parsed.diagnostics.is_none());
assert!(partial.exists());
assert_eq!(std::fs::read_to_string(&partial).unwrap(), truncated);
}
// A non-truncation parse error (outright garbage) yields None but still
// dumps the raw input for diagnosis.
#[test]
fn parse_probe_payload_non_eof_error_returns_none_and_dumps() {
let dir = tempfile::tempdir().expect("tempdir");
let partial = dir.path().join("dump.json");
let result = parse_probe_payload("not json", Some(&partial));
assert!(result.is_none());
assert!(partial.exists());
assert_eq!(std::fs::read_to_string(&partial).unwrap(), "not json");
}
// Recovery does not require a dump path.
#[test]
fn parse_probe_payload_truncated_no_dump_path_still_recovers() {
use crate::probe::process::ProbeEvent;
let payload = ProbeBytes {
events: vec![ProbeEvent {
func_idx: 0,
task_ptr: 1,
ts: 100,
args: [0; 6],
fields: vec![],
kstack: vec![],
str_val: None,
..Default::default()
}],
func_names: vec![],
bpf_source_locs: Default::default(),
diagnostics: None,
nr_cpus: None,
param_names: Default::default(),
render_hints: Default::default(),
};
let full = serde_json::to_string(&payload).unwrap();
let first_close = full.find("}]").unwrap();
let truncated = &full[..first_close + 1];
let parsed = parse_probe_payload(truncated, None).expect("recovery without dump path");
assert_eq!(parsed.events.len(), 1);
}
// Input without an "events" key (or empty input) recovers nothing.
#[test]
fn recover_partial_events_no_events_key_returns_empty() {
assert!(recover_partial_events(r#"{"foo":1}"#).is_empty());
assert!(recover_partial_events("").is_empty());
}
// An events array that is empty or cut off before any object yields nothing.
#[test]
fn recover_partial_events_empty_array() {
assert!(recover_partial_events(r#"{"events":[]"#).is_empty());
assert!(recover_partial_events(r#"{"events":["#).is_empty());
}
// Braces and escaped quotes inside a string value must not confuse the
// brace-balancing recovery scanner.
#[test]
fn recover_partial_events_handles_braces_in_strings() {
use crate::probe::process::ProbeEvent;
let event = ProbeEvent {
func_idx: 0,
task_ptr: 1,
ts: 100,
args: [0; 6],
fields: vec![],
kstack: vec![],
str_val: Some("contains {nested} and \\\"quoted\\\"".to_string()),
..Default::default()
};
let event_json = serde_json::to_string(&event).unwrap();
let payload = format!(r#"{{"events":[{event_json}]}}"#);
// Remove the closing `]` so recovery (not strict parsing) kicks in.
let cut = payload.rfind(']').unwrap();
let truncated = &payload[..cut];
let recovered = recover_partial_events(truncated);
assert_eq!(
recovered.len(),
1,
"one event should recover: {recovered:?}"
);
assert_eq!(
recovered[0].str_val.as_deref(),
Some("contains {nested} and \\\"quoted\\\""),
);
}
// find_balanced_object_end returns the byte offset one past a balanced
// top-level JSON object, or None when the input is not a complete object.
#[test]
fn find_balanced_object_end_simple_object() {
assert_eq!(find_balanced_object_end("{}"), Some(2));
assert_eq!(find_balanced_object_end("{}rest"), Some(2));
}
#[test]
fn find_balanced_object_end_nested_objects() {
assert_eq!(find_balanced_object_end(r#"{"a":{"b":{}}}"#), Some(14));
}
// Braces inside string literals must not count toward nesting depth.
#[test]
fn find_balanced_object_end_braces_in_strings_ignored() {
assert_eq!(find_balanced_object_end(r#"{"x":"{{}}"}"#), Some(12));
}
// An escaped quote must not terminate the string scan.
#[test]
fn find_balanced_object_end_escaped_quote_does_not_close_string() {
let s = r#"{"x":"\"}"}"#;
assert_eq!(find_balanced_object_end(s), Some(s.len()));
}
#[test]
fn find_balanced_object_end_truncated_returns_none() {
assert_eq!(find_balanced_object_end(r#"{"a":1"#), None);
}
#[test]
fn find_balanced_object_end_truncated_in_string_returns_none() {
assert_eq!(find_balanced_object_end(r#"{"a":"hello"#), None);
}
// Input that does not start with `{` is not an object at all.
#[test]
fn find_balanced_object_end_non_object_returns_none() {
assert_eq!(find_balanced_object_end("[1,2]"), None);
assert_eq!(find_balanced_object_end(""), None);
assert_eq!(find_balanced_object_end("null"), None);
}
// The truncation helper clamps an oversized cut to the full string and an
// explicit zero cut to the empty string.
#[test]
fn truncate_probe_json_clamps_to_full_length() {
let payload = ProbeBytes {
events: vec![],
func_names: vec![],
bpf_source_locs: Default::default(),
diagnostics: None,
nr_cpus: None,
param_names: Default::default(),
render_hints: Default::default(),
};
let full = serde_json::to_string(&payload).unwrap();
let s = truncate_probe_json(&payload, full.len() + 1024);
assert_eq!(s, full);
assert_eq!(truncate_probe_json(&payload, 0), "");
}
// format_tail: empty input yields None rather than a bare header.
#[test]
fn format_tail_empty_text_returns_none() {
assert_eq!(format_tail("", 5, "scheduler"), None);
}
// Fewer lines than requested: everything is kept.
#[test]
fn format_tail_fewer_lines_than_n_returns_all() {
let out = format_tail("one\ntwo\nthree", 10, "scheduler").unwrap();
assert_eq!(out, "--- scheduler ---\none\ntwo\nthree");
}
#[test]
fn format_tail_trims_to_last_n_lines() {
let out = format_tail("1\n2\n3\n4\n5", 3, "log").unwrap();
assert_eq!(out, "--- log ---\n3\n4\n5");
}
// n == 0 is a quirk the implementation pins: header followed by an empty
// body (lines[len..] joins to "").
#[test]
fn format_tail_zero_n_returns_empty_body_under_header() {
let out = format_tail("a\nb", 0, "hdr").unwrap();
assert_eq!(out, "--- hdr ---\n");
}
// Interior blank lines survive the lines()/join round-trip.
#[test]
fn format_tail_preserves_trailing_blank_lines() {
let out = format_tail("a\n\nb", 3, "hdr").unwrap();
assert_eq!(out, "--- hdr ---\na\n\nb");
}
// parse_rust_env_from_cmdline: only the three whitelisted keys are picked
// out of /proc/cmdline, in token order, with exact-prefix matching.
#[test]
fn parse_rust_env_empty_cmdline_is_empty() {
assert!(parse_rust_env_from_cmdline("").is_empty());
}
#[test]
fn parse_rust_env_no_matches() {
assert!(parse_rust_env_from_cmdline("console=ttyS0 ro quiet").is_empty());
}
#[test]
fn parse_rust_env_backtrace_only() {
let parsed = parse_rust_env_from_cmdline("console=ttyS0 RUST_BACKTRACE=1 ro");
assert_eq!(parsed, vec![("RUST_BACKTRACE", "1")]);
}
#[test]
fn parse_rust_env_log_only() {
let parsed = parse_rust_env_from_cmdline("RUST_LOG=debug other=x");
assert_eq!(parsed, vec![("RUST_LOG", "debug")]);
}
#[test]
fn parse_rust_env_both() {
let parsed = parse_rust_env_from_cmdline("RUST_BACKTRACE=full RUST_LOG=trace other=y");
assert_eq!(
parsed,
vec![("RUST_BACKTRACE", "full"), ("RUST_LOG", "trace")]
);
}
// Output order follows the cmdline, not a fixed key order.
#[test]
fn parse_rust_env_preserves_token_order() {
let parsed = parse_rust_env_from_cmdline("RUST_LOG=info RUST_BACKTRACE=1");
assert_eq!(parsed, vec![("RUST_LOG", "info"), ("RUST_BACKTRACE", "1")]);
}
// `KEY=` (empty value) is preserved as an empty string, not skipped.
#[test]
fn parse_rust_env_empty_value() {
let parsed = parse_rust_env_from_cmdline("RUST_LOG=");
assert_eq!(parsed, vec![("RUST_LOG", "")]);
}
// strip_prefix matching: "xRUST_LOG=" must not match "RUST_LOG=".
#[test]
fn parse_rust_env_ignores_prefix_mismatch() {
assert!(parse_rust_env_from_cmdline("xRUST_LOG=x").is_empty());
}
#[test]
fn parse_rust_env_sidecar_dir() {
let parsed = parse_rust_env_from_cmdline(
"console=ttyS0 KTSTR_SIDECAR_DIR=/host/target/ktstr/run-key ro",
);
assert_eq!(
parsed,
vec![("KTSTR_SIDECAR_DIR", "/host/target/ktstr/run-key")]
);
}
#[test]
fn parse_rust_env_all_three_keys() {
let parsed =
parse_rust_env_from_cmdline("RUST_LOG=info KTSTR_SIDECAR_DIR=/dir RUST_BACKTRACE=1");
assert_eq!(
parsed,
vec![
("RUST_LOG", "info"),
("KTSTR_SIDECAR_DIR", "/dir"),
("RUST_BACKTRACE", "1")
]
);
}
/// Test helper: build a drain result containing a single lifecycle entry.
/// Wire format: first byte is the phase discriminant, the rest is the
/// UTF-8 reason text.
fn lifecycle_drain(
    phase: crate::vmm::wire::LifecyclePhase,
    reason: &str,
) -> crate::vmm::host_comms::BulkDrainResult {
    let mut payload = Vec::with_capacity(1 + reason.len());
    payload.push(phase.wire_value());
    payload.extend_from_slice(reason.as_bytes());
    let entry = crate::vmm::wire::ShmEntry {
        msg_type: crate::vmm::wire::MSG_TYPE_LIFECYCLE,
        payload,
        crc_ok: true,
    };
    crate::vmm::host_comms::BulkDrainResult {
        entries: vec![entry],
    }
}
// extract_not_attached_reason: pulls the reason string out of the first
// valid SchedulerNotAttached lifecycle entry.
#[test]
fn extract_not_attached_reason_timeout() {
let drain = lifecycle_drain(
crate::vmm::wire::LifecyclePhase::SchedulerNotAttached,
"timeout",
);
assert_eq!(
extract_not_attached_reason(Some(&drain)).as_deref(),
Some("timeout"),
);
}
#[test]
fn extract_not_attached_reason_sysfs_absent() {
let drain = lifecycle_drain(
crate::vmm::wire::LifecyclePhase::SchedulerNotAttached,
"sched_ext sysfs absent",
);
assert_eq!(
extract_not_attached_reason(Some(&drain)).as_deref(),
Some("sched_ext sysfs absent"),
);
}
// Reason text is trimmed before being returned.
#[test]
fn extract_not_attached_reason_trims_surrounding_whitespace() {
let drain = lifecycle_drain(
crate::vmm::wire::LifecyclePhase::SchedulerNotAttached,
" timeout ",
);
assert_eq!(
extract_not_attached_reason(Some(&drain)).as_deref(),
Some("timeout"),
);
}
// No drain, or a drain with only other phases, yields None.
#[test]
fn extract_not_attached_reason_absent_returns_none() {
assert_eq!(extract_not_attached_reason(None), None);
let died = lifecycle_drain(crate::vmm::wire::LifecyclePhase::SchedulerDied, "");
assert_eq!(extract_not_attached_reason(Some(&died)), None);
}
// An empty (or whitespace-only) reason is treated as no reason at all.
#[test]
fn extract_not_attached_reason_empty_suffix_returns_none() {
let drain = lifecycle_drain(crate::vmm::wire::LifecyclePhase::SchedulerNotAttached, "");
assert_eq!(extract_not_attached_reason(Some(&drain)), None);
let drain_ws = lifecycle_drain(
crate::vmm::wire::LifecyclePhase::SchedulerNotAttached,
" ",
);
assert_eq!(extract_not_attached_reason(Some(&drain_ws)), None);
}
// When multiple NotAttached entries are present, the first one wins.
#[test]
fn extract_not_attached_reason_first_match_wins() {
let drain = crate::vmm::host_comms::BulkDrainResult {
entries: vec![
lifecycle_drain(
crate::vmm::wire::LifecyclePhase::SchedulerNotAttached,
"timeout",
)
.entries
.pop()
.unwrap(),
lifecycle_drain(
crate::vmm::wire::LifecyclePhase::SchedulerNotAttached,
"sched_ext sysfs absent",
)
.entries
.pop()
.unwrap(),
],
};
assert_eq!(
extract_not_attached_reason(Some(&drain)).as_deref(),
Some("timeout"),
);
}
// Entries that failed their CRC check are ignored entirely.
#[test]
fn extract_not_attached_reason_skips_crc_bad() {
let mut bad = lifecycle_drain(
crate::vmm::wire::LifecyclePhase::SchedulerNotAttached,
"timeout",
);
bad.entries[0].crc_ok = false;
assert_eq!(extract_not_attached_reason(Some(&bad)), None);
}
// Shorthand: a drain containing only a SchedulerDied sentinel.
fn died_drain() -> crate::vmm::host_comms::BulkDrainResult {
lifecycle_drain(crate::vmm::wire::LifecyclePhase::SchedulerDied, "")
}
// Shorthand: a drain containing a SchedulerNotAttached entry with `reason`.
fn not_attached_drain(reason: &str) -> crate::vmm::host_comms::BulkDrainResult {
lifecycle_drain(
crate::vmm::wire::LifecyclePhase::SchedulerNotAttached,
reason,
)
}
#[test]
fn classify_repro_vm_status_timeout_wins_over_other_signals() {
let drain = not_attached_drain("timeout");
let status = classify_repro_vm_status(
true,
true,
"",
137,
Some(&drain),
);
assert_eq!(status, "repro VM: timed out");
}
#[test]
fn classify_repro_vm_status_not_attached_with_reason() {
let drain = not_attached_drain("sched_ext sysfs absent");
let status = classify_repro_vm_status(false, false, "", 1, Some(&drain));
assert_eq!(
status,
"repro VM: scheduler did not attach (sched_ext sysfs absent) (exit code 1)",
);
}
#[test]
fn classify_repro_vm_status_not_attached_takes_precedence_over_crashed() {
let mut drain = died_drain();
drain
.entries
.push(not_attached_drain("timeout").entries.pop().unwrap());
let status = classify_repro_vm_status(false, true, "", 1, Some(&drain));
assert_eq!(
status,
"repro VM: scheduler did not attach (timeout) (exit code 1)",
);
}
#[test]
fn classify_repro_vm_status_crashed_from_sentinel() {
let drain = died_drain();
let status = classify_repro_vm_status(false, false, "", 139, Some(&drain));
assert_eq!(
status,
"repro VM: scheduler crashed — exited with non-zero status (139)",
);
}
#[test]
fn classify_repro_vm_status_crashed_from_crash_message() {
let status = classify_repro_vm_status(false, true, "no sentinels here", 134, None);
assert_eq!(
status,
"repro VM: scheduler crashed — exited with non-zero status (134)",
);
}
#[test]
fn classify_repro_vm_status_crashed_from_sentinel_qemu_clean_exit() {
let drain = died_drain();
let status = classify_repro_vm_status(false, false, "", 0, Some(&drain));
assert_eq!(status, "repro VM: scheduler crashed — exited cleanly");
}
#[test]
fn classify_repro_vm_status_crashed_from_sentinel_killed_by_signal() {
let drain = died_drain();
let status = classify_repro_vm_status(false, false, "", -9, Some(&drain));
assert_eq!(
status,
"repro VM: scheduler crashed — killed by signal (-9)",
);
}
#[test]
fn classify_repro_vm_status_crashed_from_sentinel_vmm_exit_code_unset() {
let drain = died_drain();
let status = classify_repro_vm_status(false, false, "", -1, Some(&drain));
assert_eq!(
status,
"repro VM: scheduler crashed — VM host reported no final exit \
status (the scheduler did not deliver an exit signal before \
the VM ended)",
);
assert!(
!status.contains("BSP"),
"user-facing status leaks BSP: {status}"
);
assert!(
!status.contains("VmResult::exit_code"),
"user-facing status leaks VmResult::exit_code: {status}",
);
assert!(
!status.contains("MSG_TYPE_EXIT"),
"user-facing status leaks MSG_TYPE_EXIT: {status}",
);
}
#[test]
fn classify_repro_vm_status_abnormal_exit() {
    // No crash evidence at all, but a non-zero exit: plain abnormal exit.
    let got = classify_repro_vm_status(false, false, "clean output", 2, None);
    assert_eq!(got, "repro VM: exited abnormally (exit code 2)");
}
#[test]
fn classify_repro_vm_status_clean_run() {
    // No crash evidence and exit code 0: the crash did not reproduce.
    let got = classify_repro_vm_status(false, false, "clean output", 0, None);
    let want = "repro VM: scheduler ran normally (crash did not reproduce)";
    assert_eq!(got, want);
}
#[test]
fn classify_repro_vm_status_malformed_not_attached_falls_through() {
    // A not-attached sentinel with an empty reason is malformed and must
    // fall through to the generic abnormal-exit classification.
    let malformed = not_attached_drain("");
    let got = classify_repro_vm_status(false, false, "", 1, Some(&malformed));
    assert_eq!(got, "repro VM: exited abnormally (exit code 1)");
}
#[test]
fn render_failure_dump_file_missing_returns_none() {
    // A path that does not exist on disk must render as None, not error.
    let missing = std::env::temp_dir().join("ktstr-render-failure-dump-missing");
    let _ = std::fs::remove_file(&missing);
    assert!(render_failure_dump_file(&missing).is_none());
}
#[test]
fn render_failure_dump_file_single_schema() {
    use crate::monitor::dump::{FailureDumpReport, SCHEMA_SINGLE};
    // A default report tagged with the single schema renders via the
    // FailureDumpReport Display impl under the standard section header.
    let report = FailureDumpReport {
        schema: SCHEMA_SINGLE.to_string(),
        ..Default::default()
    };
    let tmp = tempfile::NamedTempFile::new().expect("tempfile");
    let json = serde_json::to_string(&report).expect("serialize single");
    std::fs::write(tmp.path(), json).expect("write tempfile");
    let rendered =
        render_failure_dump_file(tmp.path()).expect("single-schema must render Some");
    assert!(
        rendered.starts_with("--- repro VM failure dump ---"),
        "header missing: {rendered}"
    );
    assert!(
        rendered.contains("(empty failure dump)"),
        "single-schema body must come from FailureDumpReport Display: {rendered}"
    );
}
#[test]
fn render_failure_dump_file_dual_schema() {
    use crate::monitor::dump::{DualFailureDumpReport, FailureDumpReport, SCHEMA_DUAL};
    // A dual-schema report must render through the DualFailureDumpReport
    // Display impl, under the same section header as the single schema.
    let dual = DualFailureDumpReport {
        schema: SCHEMA_DUAL.to_string(),
        early: None,
        late: FailureDumpReport::default(),
        early_max_age_jiffies: 0,
        early_threshold_jiffies: 0,
        early_skipped_reason: None,
    };
    let tmp = tempfile::NamedTempFile::new().expect("tempfile");
    let json = serde_json::to_string(&dual).expect("serialize dual");
    std::fs::write(tmp.path(), json).expect("write tempfile");
    let rendered = render_failure_dump_file(tmp.path()).expect("dual-schema must render Some");
    assert!(
        rendered.starts_with("--- repro VM failure dump ---"),
        "header missing: {rendered}"
    );
    assert!(
        rendered.contains("DualFailureDumpReport:"),
        "dual-schema body must come from DualFailureDumpReport Display: {rendered}"
    );
}
#[test]
fn render_failure_dump_file_absent_schema_defaults_to_single() {
    // JSON with no "schema" key at all must be parsed as the single schema.
    let tmp = tempfile::NamedTempFile::new().expect("tempfile");
    let json = r#"{"maps":[],"vcpu_regs":[],"sdt_allocations":[]}"#;
    std::fs::write(tmp.path(), json).expect("write tempfile");
    let rendered =
        render_failure_dump_file(tmp.path()).expect("absent-schema JSON must render as single");
    assert!(
        rendered.contains("(empty failure dump)"),
        "absent schema must default to single: {rendered}"
    );
}
#[test]
fn render_failure_dump_file_unknown_schema_returns_none() {
    // An unrecognized schema tag must be rejected rather than guessed at.
    let tmp = tempfile::NamedTempFile::new().expect("tempfile");
    std::fs::write(
        tmp.path(),
        r#"{"schema":"triple","maps":[],"vcpu_regs":[],"sdt_allocations":[]}"#,
    )
    .expect("write tempfile");
    assert!(render_failure_dump_file(tmp.path()).is_none());
}
#[test]
fn render_failure_dump_file_invalid_json_returns_none() {
    // Unparseable file contents must degrade to None, never panic.
    let garbage = tempfile::NamedTempFile::new().expect("tempfile");
    std::fs::write(garbage.path(), "not json").expect("write tempfile");
    assert!(render_failure_dump_file(garbage.path()).is_none());
}
/// Build a `ProbeDiagnostics` with just the four stitch-relevant counters
/// set; every other field stays at its `Default` value.
fn diag_with_events(
    captured: u32,
    stitched: u32,
    trigger_fires: u64,
    exit_kind: u32,
) -> crate::probe::process::ProbeDiagnostics {
    crate::probe::process::ProbeDiagnostics {
        events_before_stitch: captured,
        events_after_stitch: stitched,
        bpf_trigger_fires: trigger_fires,
        bpf_exit_kind_snap: exit_kind,
        ..Default::default()
    }
}
#[test]
fn stitch_drop_cause_trigger_never_fired() {
    // Events captured, nothing after stitch, and zero trigger fires:
    // the cause must name the never-fired trigger.
    let cause = stitch_drop_cause(&diag_with_events(146, 0, 0, 0));
    assert!(
        cause.contains("trigger never fired"),
        "expected 'trigger never fired' branch, got: {cause}"
    );
}
#[test]
fn stitch_drop_cause_kind_stall() {
    use crate::probe::scx_defs::EXIT_ERROR_STALL;
    // One trigger fire with a STALL exit kind must be labelled kind=STALL.
    let cause = stitch_drop_cause(&diag_with_events(146, 0, 1, EXIT_ERROR_STALL as u32));
    assert!(
        cause.contains("kind=STALL"),
        "expected 'kind=STALL' branch, got: {cause}"
    );
}
#[test]
fn stitch_drop_cause_kind_error_generic() {
    use crate::probe::scx_defs::EXIT_ERROR;
    // The generic ERROR kind must be reported as exactly kind=ERROR and
    // must not bleed into the STALL or BPF_ERROR wordings.
    let cause = stitch_drop_cause(&diag_with_events(146, 0, 1, EXIT_ERROR as u32));
    assert!(
        cause.contains("kind=ERROR"),
        "expected 'kind=ERROR' branch, got: {cause}"
    );
    assert!(
        !cause.contains("kind=STALL"),
        "kind=ERROR branch must not say STALL: {cause}"
    );
    assert!(
        !cause.contains("kind=BPF_ERROR"),
        "kind=ERROR branch must not say BPF_ERROR: {cause}"
    );
}
#[test]
fn stitch_drop_cause_kind_bpf_error() {
    use crate::probe::scx_defs::EXIT_ERROR_BPF;
    // A BPF_ERROR exit kind is a suspected tool bug: the cause must say
    // kind=BPF_ERROR and steer the operator toward reporting it.
    let cause = stitch_drop_cause(&diag_with_events(146, 0, 1, EXIT_ERROR_BPF as u32));
    assert!(
        cause.contains("kind=BPF_ERROR"),
        "expected 'kind=BPF_ERROR' branch, got: {cause}"
    );
    assert!(
        cause.contains("file a ticket") || cause.contains("ID mismatch"),
        "kind=BPF_ERROR branch must signal a suspected bug: {cause}"
    );
}
#[test]
fn stitch_drop_cause_unrecognized_kind() {
    // An exit kind the classifier does not know must still yield a
    // diagnostic rather than silence.
    let cause = stitch_drop_cause(&diag_with_events(146, 0, 1, 9999));
    assert!(
        cause.contains("unrecognized") || cause.contains("no causal"),
        "unrecognized-kind branch must surface a diagnostic, got: {cause}"
    );
}
#[test]
fn format_probe_diagnostics_appends_cause_when_zero_after_stitch() {
    // When stitching dropped every event, the rendered diagnostics must
    // carry both the counter pair and the drop-cause explanation.
    let skeleton = diag_with_events(146, 0, 0, 0);
    let rendered = format_probe_diagnostics(&PipelineDiagnostics::default(), &skeleton);
    assert!(
        rendered.contains("146 captured, 0 after stitch"),
        "rendered output missing counter pair: {rendered}"
    );
    assert!(
        rendered.contains("trigger never fired"),
        "rendered output missing cause explanation: {rendered}"
    );
}
#[test]
fn format_probe_diagnostics_appends_kind_stall_diagnostic() {
    use crate::probe::scx_defs::EXIT_ERROR_STALL;
    // A STALL exit kind must flow through to the rendered diagnostics.
    let skeleton = diag_with_events(146, 0, 1, EXIT_ERROR_STALL as u32);
    let rendered = format_probe_diagnostics(&PipelineDiagnostics::default(), &skeleton);
    assert!(
        rendered.contains("kind=STALL"),
        "rendered output missing kind=STALL diagnostic: {rendered}"
    );
}
#[test]
fn format_probe_diagnostics_no_cause_when_clean_run() {
    // Zero events in, zero events out: the counters render, but no
    // drop-cause line may be appended.
    let skeleton = diag_with_events(0, 0, 0, 0);
    let rendered = format_probe_diagnostics(&PipelineDiagnostics::default(), &skeleton);
    assert!(
        rendered.contains("0 captured, 0 after stitch"),
        "missing zero-zero counter line: {rendered}"
    );
    for cause in ["trigger never fired", "kind=STALL"] {
        assert!(
            !rendered.contains(cause),
            "must not append cause for clean run: {rendered}"
        );
    }
}
#[test]
fn format_probe_diagnostics_no_cause_when_stitch_succeeded() {
    // Events survived stitching: counters render, but no drop-cause line
    // may be appended even though an exit kind is present.
    let skeleton = diag_with_events(146, 100, 1, 1025);
    let rendered = format_probe_diagnostics(&PipelineDiagnostics::default(), &skeleton);
    assert!(
        rendered.contains("146 captured, 100 after stitch"),
        "missing counter line: {rendered}"
    );
    for cause in ["trigger never fired", "kind=STALL"] {
        assert!(
            !rendered.contains(cause),
            "must not append cause when stitch succeeded: {rendered}"
        );
    }
}
#[test]
fn format_probe_diagnostics_appends_fallback_marker() {
    // When the frequency-based fallback stitch was used, the rendered
    // output must say so ("trigger absent" plus the frequency wording).
    let diag = crate::probe::process::ProbeDiagnostics {
        events_before_stitch: 146,
        events_after_stitch: 80,
        stitch_fallback_used: true,
        bpf_trigger_fires: 0,
        bpf_exit_kind_snap: 0,
        ..Default::default()
    };
    let rendered = format_probe_diagnostics(&PipelineDiagnostics::default(), &diag);
    assert!(
        rendered.contains("trigger absent") && rendered.contains("frequency"),
        "fallback marker missing from rendered output: {rendered}"
    );
}
#[test]
fn classify_dmesg_corruption_empty_text() {
    // Empty text must be classified, and the diagnostic must say "empty".
    // (Replaces the is_some()+unwrap() anti-pattern with expect.)
    let diag = classify_dmesg_corruption("").expect("empty text must be classified");
    assert!(diag.contains("empty"), "diagnostic must mention 'empty': {diag}");
}
#[test]
fn classify_dmesg_corruption_only_whitespace() {
    // Whitespace-only text is as good as empty and must say so.
    // (Replaces the is_some()+unwrap() anti-pattern with expect.)
    let diag =
        classify_dmesg_corruption(" \n\n\t \n").expect("whitespace-only text must be classified");
    assert!(diag.contains("empty"), "diagnostic must mention 'empty': {diag}");
}
#[test]
fn classify_dmesg_corruption_only_replacement_chars() {
    // Pure U+FFFD residue must be classified as corruption.
    // (Replaces the is_some()+unwrap() anti-pattern with expect.)
    let diag = classify_dmesg_corruption("\u{fffd}\u{fffd}\u{fffd}")
        .expect("replacement-only text must be classified");
    assert!(diag.contains("corrupt"), "diagnostic must mention 'corrupt': {diag}");
}
#[test]
fn classify_dmesg_corruption_only_control_chars() {
    // Strings made only of control bytes must be classified as corrupt:
    // NULs, low control codes, and DEL are all covered. The loop replaces
    // three hand-copied is_some()+unwrap() triplets.
    for input in ["\0\0\0", "\0\x01\x02\x07\x08", "\x7f\x7f"] {
        let diag =
            classify_dmesg_corruption(input).expect("control-only text must be classified");
        assert!(
            diag.contains("corrupt"),
            "diagnostic must mention 'corrupt' for {input:?}: {diag}"
        );
    }
}
#[test]
fn classify_dmesg_corruption_only_replacement_and_control_mix() {
    // A mix of U+FFFD and control bytes is still pure corruption.
    // (Replaces the is_some()+unwrap() anti-pattern with expect.)
    let diag = classify_dmesg_corruption("\u{fffd}\0\u{fffd}")
        .expect("replacement/control mix must be classified");
    assert!(diag.contains("corrupt"), "diagnostic must mention 'corrupt': {diag}");
}
#[test]
fn classify_dmesg_corruption_latin1_text_passes_through() {
    // Runs of printable Latin-1 characters are real console text (vendor
    // strings and the like), not corruption, and must not be flagged.
    for ch in ['\u{c0}', '\u{e9}', '\u{f1}', '\u{ff}'] {
        let text: String = std::iter::repeat_n(ch, 5).collect();
        let diag = classify_dmesg_corruption(&text);
        assert!(
            diag.is_none(),
            "Latin-1 char U+{:04X} must NOT be classified as corrupt: {diag:?}",
            ch as u32,
        );
    }
}
#[test]
fn classify_dmesg_corruption_real_kernel_text_passes_through() {
    // Genuine kernel boot output must pass the classifier untouched.
    let kernel_line = "[ 0.000000] Linux version 6.16.0\n";
    let diag = classify_dmesg_corruption(kernel_line);
    assert!(
        diag.is_none(),
        "real kernel text must not be classified as corrupt: {diag:?}"
    );
}
#[test]
fn classify_dmesg_corruption_one_control_amid_text_passes_through() {
    // A single stray control byte inside otherwise-real text does not
    // make the whole buffer corrupt.
    let noisy_line = "[0.1] Linux\u{1}version";
    let diag = classify_dmesg_corruption(noisy_line);
    assert!(
        diag.is_none(),
        "one control char amid real text is not corruption: {diag:?}"
    );
}
#[test]
fn classify_dmesg_corruption_mixed_garbage_and_text_passes_through() {
    // One real character amid U+FFFD residue is enough to pass through.
    // Adds a failure-context message for consistency with the sibling
    // pass-through tests, which all echo the diagnostic on failure.
    let diag = classify_dmesg_corruption("\u{fffd}A\u{fffd}");
    assert!(
        diag.is_none(),
        "mixed garbage and real text must not be classified as corrupt: {diag:?}"
    );
}
#[test]
fn render_dmesg_tail_filter_empties_non_empty_stderr_emits_pointer_diag() {
    // stderr made up entirely of sched_ext_dump lines: the filter drops
    // every line, but because real output existed the tail must point the
    // operator at the separately-rendered dump section rather than
    // misreport a crash.
    let stderr = "[ 0.5] sched_ext_dump: header\n\
        [ 0.6] sched_ext_dump: body line A\n\
        [ 0.7] sched_ext_dump: body line B\n";
    let tail = render_dmesg_tail(stderr, 40);
    assert!(
        tail.contains("--- repro VM dmesg ---"),
        "tail must carry the section header: {tail}",
    );
    assert!(
        tail.contains("no kernel printk other than sched_ext_dump"),
        "tail must point operators at the sched_ext_dump section, \
        not falsely report a crash: {tail}",
    );
    assert!(
        !tail.contains("scheduler crashed"),
        "filter-emptied real output must NOT surface the crash \
        classifier's diagnostic: {tail}",
    );
}
#[test]
fn render_dmesg_tail_truly_empty_stderr_emits_crash_diagnostic() {
    // Nothing on stderr at all means the scheduler died before any
    // kernel printk could be captured.
    let tail = render_dmesg_tail("", 40);
    assert!(
        tail.contains("scheduler crashed before kernel printk"),
        "genuinely-empty stderr must surface the crash diagnostic: {tail}",
    );
}
#[test]
fn render_dmesg_tail_real_kernel_text_passes_through_to_format_tail() {
    // Mixed stderr: real printk lines survive the filter, dump-marker
    // lines are stripped (they are rendered in a separate section), and
    // no crash diagnostic is emitted.
    let stderr = "[ 0.1] Linux version 6.16.0\n\
        [ 0.5] sched_ext_dump: header\n\
        [ 0.6] sched_ext_dump: body\n\
        [ 0.9] systemd: starting\n";
    let tail = render_dmesg_tail(stderr, 40);
    assert!(
        tail.contains("Linux version 6.16.0"),
        "real kernel text must survive the filter: {tail}",
    );
    assert!(
        tail.contains("systemd: starting"),
        "non-dump lines must survive the filter: {tail}",
    );
    assert!(
        !tail.contains("sched_ext_dump"),
        "dump lines must be stripped (rendered separately): {tail}",
    );
    assert!(
        !tail.contains("scheduler crashed"),
        "real kernel text must NOT surface the crash diagnostic: {tail}",
    );
}
#[test]
fn render_dmesg_tail_only_whitespace_emits_crash_diagnostic() {
    // Whitespace-only stderr is indistinguishable from silence and must
    // be reported as a pre-printk crash.
    let tail = render_dmesg_tail(" \n\n\t \n", 40);
    assert!(
        tail.contains("scheduler crashed before kernel printk"),
        "whitespace-only stderr must surface the crash diagnostic: {tail}",
    );
}
#[test]
fn render_dmesg_tail_dump_plus_replacement_noise_emits_pointer_diag() {
    // Dump lines plus U+FFFD residue: the residue alone must not be
    // misread as a crash once real dump lines were filtered out.
    let stderr = "[1] sched_ext_dump: header\n\u{fffd}\u{fffd}\u{fffd}\n";
    let tail = render_dmesg_tail(stderr, 40);
    assert!(
        tail.contains("no kernel printk other than sched_ext_dump"),
        "filter-dropped-real-lines + U+FFFD residue must point at \
        the dump section, not surface the crash diagnostic: {tail}",
    );
    assert!(
        !tail.contains("scheduler crashed"),
        "must NOT misclassify residue as a crash when real dump lines \
        were filtered: {tail}",
    );
}
#[test]
fn render_dmesg_tail_dump_plus_control_noise_emits_pointer_diag() {
    // Dump lines plus NUL residue behave like the U+FFFD case: point at
    // the dump section rather than misreport a crash.
    let stderr = "[ 0.5] sched_ext_dump: header\n\0\0\0\n";
    let tail = render_dmesg_tail(stderr, 40);
    assert!(
        tail.contains("no kernel printk other than sched_ext_dump"),
        "control-char residue with dump lines must point at the dump \
        section: {tail}",
    );
}
#[test]
fn render_dmesg_tail_dump_plus_whitespace_emits_pointer_diag() {
    // Dump lines plus whitespace residue: still a pointer diagnostic,
    // never a crash report.
    let stderr = "[1.0] sched_ext_dump: dump\n \n\t\n";
    let tail = render_dmesg_tail(stderr, 40);
    assert!(
        tail.contains("no kernel printk other than sched_ext_dump"),
        "whitespace residue with dump lines must point at the dump \
        section, not surface the crash diagnostic: {tail}",
    );
    assert!(
        !tail.contains("scheduler crashed"),
        "must NOT report a crash when real dump lines were filtered: {tail}",
    );
}
#[test]
fn render_dmesg_tail_pure_corruption_no_dump_emits_corrupt_diag() {
    // All-U+FFFD stderr with no dump lines at all is genuine corruption
    // and must surface one of the corruption diagnostics.
    let tail = render_dmesg_tail("\u{fffd}\u{fffd}\u{fffd}", 40);
    let flagged = tail.contains("scheduler crashed") || tail.contains("UART buffer");
    assert!(
        flagged,
        "pure-corruption stderr must surface the corruption diagnostic: {tail}",
    );
}
#[test]
fn render_dmesg_tail_pure_whitespace_no_dump_emits_empty_diag() {
    // Whitespace-only stderr with no dump line present is treated as a
    // pre-printk crash, same as truly-empty stderr.
    let tail = render_dmesg_tail(" \n\t\n", 40);
    assert!(
        tail.contains("scheduler crashed before kernel printk"),
        "whitespace-only stderr (no dump line) must surface the \
        crash diagnostic: {tail}",
    );
}
#[test]
fn render_dmesg_tail_latin1_residue_no_dump_passes_through() {
    // Printable Latin-1 bytes inside a real line read like hardware
    // vendor strings, not corruption: they must render verbatim.
    let tail = render_dmesg_tail("[0.1] hw vendor: foo\u{ff}bar\n", 40);
    assert!(
        tail.contains("foo\u{ff}bar"),
        "Latin-1 residue must format-tail-render unchanged: {tail}"
    );
    assert!(
        !tail.contains("scheduler crashed"),
        "Latin-1 residue must NOT trigger the corruption diag: {tail}"
    );
}
#[test]
fn render_dmesg_tail_uses_tightened_marker() {
    // A line that merely mentions sched_ext_dump without the trailing
    // colon is not a dump-marker line and must survive the filter.
    let tail = render_dmesg_tail("[ 0.1] BUG in sched_ext_dump_disable callback\n", 40);
    assert!(
        tail.contains("BUG in sched_ext_dump_disable callback"),
        "non-marker line (no colon after) must survive the filter: {tail}",
    );
}
#[test]
fn render_dmesg_tail_bug_line_and_real_dump_split_correctly() {
    // A near-miss BUG line (no colon after the marker word) and a real
    // dump-marker line in the same stderr must split: the BUG line stays
    // in the dmesg tail, the dump line is stripped for the separately
    // rendered dump section.
    let stderr = "[ 0.1] BUG in sched_ext_dump_disable callback\n\
        ktstr-0 [001] 0.5: sched_ext_dump: scheduler state\n";
    let tail = render_dmesg_tail(stderr, 40);
    assert!(
        tail.contains("BUG in sched_ext_dump_disable callback"),
        "BUG line must survive the filter and land in dmesg: {tail}",
    );
    assert!(
        !tail.contains("scheduler state"),
        "real dump line must be stripped from dmesg (it goes to \
        the dump section rendered separately): {tail}",
    );
}
#[test]
fn extract_probe_output_emits_kind_stall_diagnostic_end_to_end() {
    use crate::probe::process::ProbeDiagnostics;
    use crate::probe::scx_defs::EXIT_ERROR_STALL;
    // End-to-end: a serialized ProbeBytes payload carrying a STALL exit
    // kind, wrapped in the probe sentinels, must come back out of
    // extract_probe_output with the pipeline header, the counter line,
    // and the kind=STALL diagnostic.
    let skeleton = ProbeDiagnostics {
        events_before_stitch: 146,
        events_after_stitch: 0,
        bpf_trigger_fires: 1,
        bpf_exit_kind_snap: EXIT_ERROR_STALL as u32,
        ..Default::default()
    };
    let payload = ProbeBytes {
        events: Vec::new(),
        func_names: Vec::new(),
        bpf_source_locs: Default::default(),
        diagnostics: Some(ProbeBytesDiagnostics {
            pipeline: PipelineDiagnostics::default(),
            skeleton,
        }),
        nr_cpus: None,
        param_names: Default::default(),
        render_hints: Default::default(),
    };
    let json = serde_json::to_string(&payload).unwrap();
    // Leading noise before the start sentinel must be ignored.
    let output = format!("noise\n{PROBE_OUTPUT_START}\n{json}\n{PROBE_OUTPUT_END}\n");
    let formatted = extract_probe_output(&output, None, None)
        .expect("ProbeBytes with diagnostics must produce some output");
    assert!(
        formatted.contains("--- probe pipeline ---"),
        "missing pipeline header: {formatted}"
    );
    assert!(
        formatted.contains("146 captured, 0 after stitch"),
        "missing events counter line: {formatted}"
    );
    assert!(
        formatted.contains("kind=STALL"),
        "missing kind=STALL diagnostic in end-to-end output: {formatted}"
    );
}
#[test]
fn extract_probe_output_emits_trigger_never_fired_end_to_end() {
    use crate::probe::process::ProbeDiagnostics;
    // End-to-end: zero trigger fires with events dropped at stitch must
    // surface the 'trigger never fired' cause, and the kprobe-fires
    // counter must appear in the bpf_counts line.
    let skeleton = ProbeDiagnostics {
        events_before_stitch: 146,
        events_after_stitch: 0,
        bpf_trigger_fires: 0,
        bpf_exit_kind_snap: 0,
        bpf_kprobe_fires: 16567,
        bpf_meta_misses: 0,
        ..Default::default()
    };
    let payload = ProbeBytes {
        events: Vec::new(),
        func_names: Vec::new(),
        bpf_source_locs: Default::default(),
        diagnostics: Some(ProbeBytesDiagnostics {
            pipeline: PipelineDiagnostics::default(),
            skeleton,
        }),
        nr_cpus: None,
        param_names: Default::default(),
        render_hints: Default::default(),
    };
    let json = serde_json::to_string(&payload).unwrap();
    // Note: no trailing newline after the end sentinel — that must parse too.
    let output = format!("{PROBE_OUTPUT_START}\n{json}\n{PROBE_OUTPUT_END}");
    let formatted = extract_probe_output(&output, None, None)
        .expect("ProbeBytes with diagnostics must produce some output");
    assert!(
        formatted.contains("trigger never fired"),
        "missing 'trigger never fired' diagnostic: {formatted}"
    );
    assert!(
        formatted.contains("16567 kprobe fires"),
        "missing bpf_counts line: {formatted}"
    );
}
#[test]
fn extract_probe_output_emits_fallback_marker_end_to_end() {
    use crate::probe::process::{ProbeDiagnostics, ProbeEvent};
    // End-to-end: a payload that used the frequency-based fallback stitch
    // (stitch_fallback_used) must carry the fallback marker through the
    // rendered output. One real event is included so the event-rendering
    // path runs as well.
    let skeleton = ProbeDiagnostics {
        events_before_stitch: 146,
        events_after_stitch: 80,
        stitch_fallback_used: true,
        bpf_trigger_fires: 0,
        ..Default::default()
    };
    let payload = ProbeBytes {
        events: vec![ProbeEvent {
            func_idx: 0,
            task_ptr: 0xa,
            ts: 100,
            args: [0; 6],
            fields: Vec::new(),
            kstack: Vec::new(),
            str_val: None,
            ..Default::default()
        }],
        func_names: vec![(0, "schedule".to_string())],
        bpf_source_locs: Default::default(),
        diagnostics: Some(ProbeBytesDiagnostics {
            pipeline: PipelineDiagnostics::default(),
            skeleton,
        }),
        nr_cpus: None,
        param_names: Default::default(),
        render_hints: Default::default(),
    };
    let json = serde_json::to_string(&payload).unwrap();
    let output = format!("{PROBE_OUTPUT_START}\n{json}\n{PROBE_OUTPUT_END}");
    let formatted = extract_probe_output(&output, None, None)
        .expect("ProbeBytes with events must produce output");
    assert!(
        formatted.contains("trigger absent") && formatted.contains("frequency"),
        "fallback marker missing from end-to-end output: {formatted}"
    );
}
#[test]
fn deferred_probe_stash_take_invariants() {
    // The deferred-probe stash is process-global state; serialize this
    // test against any concurrent test run that touches it.
    static SERIALISE: std::sync::Mutex<()> = std::sync::Mutex::new(());
    let _guard = SERIALISE.lock().unwrap();
    // Drain first so the invariants hold regardless of prior test order.
    let _ = take_deferred_probe();
    assert!(
        take_deferred_probe().is_none(),
        "empty stash must yield None"
    );
    // Round-trip: stash then take must return the same stop flag value
    // and the same (None) handle.
    let stop = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    stash_deferred_probe(stop.clone(), None);
    let taken = take_deferred_probe().expect("stash must round-trip");
    assert!(
        !taken.stop.load(std::sync::atomic::Ordering::Relaxed),
        "stop flag must round-trip with original value"
    );
    assert!(taken.handle.is_none(), "handle round-trip preserved None");
    assert!(
        take_deferred_probe().is_none(),
        "drained: subsequent take must return None"
    );
    // Last-writer-wins: a second stash replaces the first (flag values
    // are distinguishable: stop_a=false, stop_b=true).
    let stop_a = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    let stop_b = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true));
    stash_deferred_probe(stop_a, None);
    stash_deferred_probe(stop_b, None);
    let taken = take_deferred_probe().expect("stash present");
    assert!(
        taken.stop.load(std::sync::atomic::Ordering::Relaxed),
        "second stash must win — got stop_a value instead of stop_b"
    );
    // Leave the stash empty for whichever test runs next.
    let _ = take_deferred_probe();
}
}