use std::path::Path;
use std::time::Duration;
use crate::assert::AssertResult;
use super::args::{
extract_probe_stack_arg, extract_test_fn_arg, extract_work_type_arg, resolve_cgroup_root,
};
use super::entry::find_test;
use super::output::{extract_sched_ext_dump, print_assert_result};
use super::profraw::try_flush_profraw;
use super::runtime::{config_file_parts, verbose};
use super::{KtstrTestEntry, TopoOverride};
use crate::verifier::{SCHED_OUTPUT_END, SCHED_OUTPUT_START, parse_sched_output};
/// Value placed after `--ktstr-probe-stack=` by `attempt_auto_repro` when no
/// stack functions could be extracted from the first VM's output.
const DISCOVER_SENTINEL: &str = "__discover__";
/// Mirrors the `RUST_BACKTRACE=` / `RUST_LOG=` tokens found in
/// `/proc/cmdline` into this process's environment.
///
/// Does nothing when `/proc/cmdline` cannot be read.
pub(crate) fn propagate_rust_env_from_cmdline() {
    if let Ok(cmdline) = std::fs::read_to_string("/proc/cmdline") {
        parse_rust_env_from_cmdline(&cmdline)
            .into_iter()
            .for_each(|(key, val)| {
                // NOTE(review): mutating the environment is only sound while no
                // other thread reads or writes it — confirm this is called
                // before any thread is spawned.
                unsafe { std::env::set_var(key, val) }
            });
    }
}
/// Extracts `RUST_BACKTRACE=<val>` and `RUST_LOG=<val>` tokens from a
/// whitespace-separated kernel command line.
///
/// Returns `(variable name, value)` pairs in the order the tokens appear.
/// The value is everything after the first `=` and may be empty. Tokens whose
/// key is not exactly one of the two recognised names are ignored.
fn parse_rust_env_from_cmdline(cmdline: &str) -> Vec<(&'static str, &str)> {
    const KEYS: [&str; 2] = ["RUST_BACKTRACE", "RUST_LOG"];
    cmdline
        .split_whitespace()
        .filter_map(|token| {
            // Splitting at the first `=` yields the same key/value as
            // stripping a `KEY=` prefix, because the keys contain no `=`.
            let (name, val) = token.split_once('=')?;
            KEYS.iter()
                .copied()
                .find(|known| *known == name)
                .map(|known| (known, val))
        })
        .collect()
}
/// Marker line printed on stdout immediately before the serialized probe
/// payload (see `emit_probe_payload`).
pub(crate) const PROBE_OUTPUT_START: &str = "===PROBE_OUTPUT_START===";
/// Marker line printed on stdout immediately after the serialized probe
/// payload (see `emit_probe_payload`).
pub(crate) const PROBE_OUTPUT_END: &str = "===PROBE_OUTPUT_END===";
/// Formats the last `n` lines of `text` beneath a `--- {header} ---` line.
///
/// Returns `None` when `text` has no lines at all. When `text` has fewer than
/// `n` lines, every line is kept; when `n` is zero the result is the header
/// line followed by an empty body.
fn format_tail(text: &str, n: usize, header: &str) -> Option<String> {
    let total = text.lines().count();
    if total == 0 {
        return None;
    }
    let tail: Vec<&str> = text.lines().skip(total.saturating_sub(n)).collect();
    Some(format!("--- {header} ---\n{}", tail.join("\n")))
}
/// Produces the one-line `repro VM: ...` status for a repro run.
///
/// Checks are applied in priority order:
/// 1. `timed_out`;
/// 2. a "scheduler not attached" reason found in `output`;
/// 3. a crash (`has_crash_message`, or the scheduler-died sentinel in
///    `output`), described further by `exit_code`;
/// 4. any other non-zero `exit_code`;
/// 5. otherwise the scheduler ran normally and the crash did not reproduce.
fn classify_repro_vm_status(
    timed_out: bool,
    has_crash_message: bool,
    output: &str,
    exit_code: i32,
) -> String {
    if timed_out {
        return String::from("repro VM: timed out");
    }
    if let Some(reason) = extract_not_attached_reason(output) {
        return format!("repro VM: scheduler did not attach ({reason}) (exit code {exit_code})");
    }

    let crashed = has_crash_message || output.contains(super::SENTINEL_SCHEDULER_DIED);
    if !crashed {
        return match exit_code {
            0 => String::from("repro VM: scheduler ran normally (crash did not reproduce)"),
            code => format!("repro VM: exited abnormally (exit code {code})"),
        };
    }

    let exit_clause = match exit_code {
        // -1 is handled separately from the other negative codes.
        -1 => String::from(
            "VM host reported no final exit status (the scheduler did not \
deliver an exit signal before the VM ended)",
        ),
        code if code < 0 => format!("killed by signal ({code})"),
        0 => String::from("exited cleanly"),
        code => format!("exited with non-zero status ({code})"),
    };
    format!("repro VM: scheduler crashed — {exit_clause}")
}
fn extract_not_attached_reason(output: &str) -> Option<&str> {
let line = output
.lines()
.find(|l| l.contains(super::SENTINEL_SCHEDULER_NOT_ATTACHED))?;
let idx = line.find(super::SENTINEL_SCHEDULER_NOT_ATTACHED)?;
let after = &line[idx + super::SENTINEL_SCHEDULER_NOT_ATTACHED.len()..];
let reason = after.strip_prefix(':')?.trim();
if reason.is_empty() {
return None;
}
Some(reason)
}
/// Re-runs the test in a second ("repro") VM with stack probing enabled and
/// builds a human-readable report from that run.
///
/// Probe targets are the function names extracted from the scheduler output
/// section of `first_vm_output`, falling back to `console_output` when that
/// section is missing or yields no functions. When neither source yields a
/// function, `DISCOVER_SENTINEL` is passed as the probe stack instead.
///
/// Returns `None` when the VM cannot be built or run, or when the run
/// produced neither a probe section nor any log tail. Otherwise the report
/// holds the probe section (or, when there is none, the one-line status from
/// `classify_repro_vm_status`), the VM duration, and up to
/// `REPRO_TAIL_LINES` trailing lines of each available log.
pub(crate) fn attempt_auto_repro(
entry: &KtstrTestEntry,
kernel: &Path,
scheduler: Option<&Path>,
ktstr_bin: &Path,
first_vm_output: &str,
console_output: &str,
topo: Option<&TopoOverride>,
) -> Option<String> {
use crate::probe::stack::extract_stack_functions_all;
eprintln!(
"ktstr_test: auto-repro: COM2 length={} has_sched_start={} has_sched_end={}",
first_vm_output.len(),
first_vm_output.contains(SCHED_OUTPUT_START),
first_vm_output.contains(SCHED_OUTPUT_END),
);
// Prefer functions from the scheduler output section; fall back to the
// console output when the section is absent or yields nothing.
let sched_output = parse_sched_output(first_vm_output);
let stack_funcs = if let Some(sched) = sched_output {
let funcs = extract_stack_functions_all(sched);
if funcs.is_empty() {
eprintln!("ktstr_test: auto-repro: no functions from COM2, trying COM1");
extract_stack_functions_all(console_output)
} else {
funcs
}
} else {
eprintln!("ktstr_test: auto-repro: no scheduler output on COM2, trying COM1");
extract_stack_functions_all(console_output)
};
let func_names: Vec<String> = stack_funcs.iter().map(|f| f.raw_name.clone()).collect();
// With no extracted functions, pass the discovery sentinel as the probe
// stack instead of a comma-separated function list.
let probe_arg = if func_names.is_empty() {
eprintln!("ktstr_test: auto-repro: no stack functions, using BPF discovery in repro VM");
format!("--ktstr-probe-stack={DISCOVER_SENTINEL}")
} else {
eprintln!(
"ktstr_test: auto-repro: probing {} functions in second VM",
func_names.len()
);
format!("--ktstr-probe-stack={}", func_names.join(","))
};
// Guest command line: run the same test function with the probe stack argument.
let guest_args = vec![
"run".to_string(),
"--ktstr-test-fn".to_string(),
entry.name.to_string(),
probe_arg,
];
let cmdline_extra = super::runtime::build_cmdline_extra(entry);
let (vm_topology, memory_mb) = super::runtime::resolve_vm_topology(entry, topo);
// Only the presence of KTSTR_NO_PERF_MODE matters, not its value.
let no_perf_mode = std::env::var("KTSTR_NO_PERF_MODE").is_ok();
let mut builder = super::runtime::build_vm_builder_base(
entry,
kernel,
ktstr_bin,
scheduler,
vm_topology,
memory_mb,
&cmdline_extra,
&guest_args,
no_perf_mode,
);
// Scheduler arguments: optional `--config <guest path>` (with the config
// file included in the VM) followed by the entry's base scheduler args.
{
let mut args: Vec<String> = Vec::new();
if let Some((archive_path, host_path, guest_path)) = config_file_parts(entry) {
builder = builder.include_files(vec![(archive_path, host_path)]);
args.push("--config".to_string());
args.push(guest_path);
}
super::runtime::append_base_sched_args(entry, &mut args);
if !args.is_empty() {
builder = builder.sched_args(&args);
}
}
// Build and run failures are logged and reported as "no repro output".
let vm = match builder.build() {
Ok(vm) => vm,
Err(e) => {
eprintln!("ktstr_test: auto-repro: failed to build VM: {e:#}");
return None;
}
};
let repro_result = match vm.run() {
Ok(r) => r,
Err(e) => {
eprintln!("ktstr_test: auto-repro: VM run failed: {e:#}");
return None;
}
};
if verbose() {
eprintln!(
"ktstr_test: auto-repro: COM1 stderr length={} COM2 stdout length={}",
repro_result.stderr.len(),
repro_result.output.len(),
);
for line in repro_result.stderr.lines() {
eprintln!(" repro-vm-com1: {line}");
}
// Echo the repro VM's stdout only from the first probe log line onwards.
let mut in_probe = false;
for line in repro_result.output.lines() {
if line.contains("ktstr_test: probe:") {
in_probe = true;
}
if in_probe {
eprintln!(" repro-vm-com2: {line}");
}
}
}
// Kernel directory handed to the probe formatter; `None` when it cannot be
// derived from `kernel` or the path cannot be converted to a `&str`.
let kernel_dir = crate::kernel_path::derive_kernel_dir(kernel)
.map(|dir| crate::cache::prefer_source_tree_for_dwarf(&dir).unwrap_or(dir))
.and_then(|p| p.to_str().map(String::from));
let kernel_dir_str = kernel_dir.as_deref();
let probe_section = extract_probe_output(&repro_result.output, kernel_dir_str);
// Maximum number of trailing lines kept from each log.
const REPRO_TAIL_LINES: usize = 40;
let sched_log_tail = parse_sched_output(&repro_result.output).and_then(|log| {
let collapsed = crate::verifier::collapse_cycles(log);
format_tail(&collapsed, REPRO_TAIL_LINES, "repro VM scheduler log")
});
let dump_tail = extract_sched_ext_dump(&repro_result.stderr)
.and_then(|dump| format_tail(&dump, REPRO_TAIL_LINES, "repro VM sched_ext dump"));
// The dmesg tail excludes every line containing "sched_ext_dump".
let dmesg_filtered: String = repro_result
.stderr
.lines()
.filter(|l| !l.contains("sched_ext_dump"))
.collect::<Vec<_>>()
.join("\n");
let dmesg_tail = format_tail(&dmesg_filtered, REPRO_TAIL_LINES, "repro VM dmesg");
let tails: Vec<String> = [sched_log_tail, dump_tail, dmesg_tail]
.into_iter()
.flatten()
.collect();
// Nothing to report at all.
if probe_section.is_none() && tails.is_empty() {
return None;
}
let has_probe = probe_section.is_some();
let mut out = probe_section.unwrap_or_default();
// Without probe output, lead with a one-line classification of the run.
if !has_probe {
out.push_str(&classify_repro_vm_status(
repro_result.timed_out,
repro_result.crash_message.is_some(),
&repro_result.output,
repro_result.exit_code,
));
}
if !out.is_empty() {
out.push('\n');
}
out.push_str(&format!(
"repro VM duration: {:.1}s",
repro_result.duration.as_secs_f64(),
));
// Each tail is separated from the preceding text by a blank line.
for tail in &tails {
out.push_str("\n\n");
out.push_str(tail);
}
Some(out)
}
/// Extracts the probe payload delimited by `PROBE_OUTPUT_START` /
/// `PROBE_OUTPUT_END` from `output` and renders it as text.
///
/// The rendered text is the diagnostics section (when the payload carries
/// one) followed by the formatted probe events (when there are any).
/// `kernel_dir` is forwarded to the event formatter.
///
/// Returns `None` when the delimited section is empty, when it fails to
/// deserialize (the error is logged to stderr), or when the payload has no
/// events and no diagnostics text.
pub(crate) fn extract_probe_output(output: &str, kernel_dir: Option<&str>) -> Option<String> {
    let json = crate::probe::output::extract_section(output, PROBE_OUTPUT_START, PROBE_OUTPUT_END);
    if json.is_empty() {
        return None;
    }

    let payload = serde_json::from_str::<ProbeBytes>(&json)
        .map_err(|e| eprintln!("ktstr_test: probe payload deserialize failed: {e}"))
        .ok()?;

    let mut report = payload
        .diagnostics
        .as_ref()
        .map(|diag| format_probe_diagnostics(&diag.pipeline, &diag.skeleton))
        .unwrap_or_default();

    if payload.events.is_empty() {
        // Diagnostics alone are still worth reporting; nothing at all is not.
        return (!report.is_empty()).then_some(report);
    }

    let rendered_events = crate::probe::output::format_probe_events_with_bpf_locs(
        &payload.events,
        &payload.func_names,
        kernel_dir,
        &payload.bpf_source_locs,
        payload.nr_cpus,
        &payload.param_names,
        &payload.render_hints,
    );
    report.push_str(&rendered_events);
    Some(report)
}
/// Renders probe pipeline and skeleton diagnostics as a line-oriented text
/// section headed by `--- probe pipeline ---`.
///
/// * `pipeline` — counters gathered while the probe target list was built.
/// * `skeleton` — counters and failure lists reported by the probe skeleton.
///
/// Lines are emitted in a fixed order: extraction and filter counts, BPF
/// discovery, expansion total, kprobe attach results, unresolved kprobes
/// (only if any), fentry results (only if there were candidates), trigger
/// status, probe data, event counts, and BPF-side counters (only if any of
/// them is non-zero). Every line ends with `\n`.
pub(crate) fn format_probe_diagnostics(
    pipeline: &PipelineDiagnostics,
    skeleton: &crate::probe::process::ProbeDiagnostics,
) -> String {
    let mut out = String::new();
    out.push_str("--- probe pipeline ---\n");
    out.push_str(&format!(
        " extracted: {} functions from crash backtrace\n",
        pipeline.stack_extracted,
    ));
    // The diagnostics may come from a deserialized payload, so the two values
    // are not guaranteed to be consistent. Saturate rather than underflow,
    // which would panic in debug builds and wrap to a bogus count in release.
    let passed =
        (pipeline.stack_extracted as usize).saturating_sub(pipeline.filter_dropped.len());
    if pipeline.filter_dropped.is_empty() {
        out.push_str(&format!(" traceable: {passed} passed filter\n"));
    } else {
        out.push_str(&format!(
            " traceable: {passed} passed, {} dropped: {}\n",
            pipeline.filter_dropped.len(),
            pipeline.filter_dropped.join(", "),
        ));
    }
    out.push_str(&format!(
        " bpf_discover: {} programs found\n",
        pipeline.bpf_discovered,
    ));
    out.push_str(&format!(
        " after_expand: {} total probe targets\n",
        pipeline.total_after_expand,
    ));

    // kprobe attach summary, listing each failure as `name (error)`.
    if skeleton.kprobe_attach_failed.is_empty() {
        out.push_str(&format!(
            " kprobes: {} attached\n",
            skeleton.kprobe_attached,
        ));
    } else {
        out.push_str(&format!(
            " kprobes: {} attached, {} failed: {}\n",
            skeleton.kprobe_attached,
            skeleton.kprobe_attach_failed.len(),
            skeleton
                .kprobe_attach_failed
                .iter()
                .map(|(n, e)| format!("{n} ({e})"))
                .collect::<Vec<_>>()
                .join(", "),
        ));
    }
    if !skeleton.kprobe_resolve_failed.is_empty() {
        out.push_str(&format!(
            " kprobe_miss: {} unresolved: {}\n",
            skeleton.kprobe_resolve_failed.len(),
            skeleton.kprobe_resolve_failed.join(", "),
        ));
    }

    // fentry summary is omitted entirely when there were no candidates.
    if skeleton.fentry_candidates > 0 {
        if skeleton.fentry_attach_failed.is_empty() {
            out.push_str(&format!(
                " fentry: {} attached\n",
                skeleton.fentry_attached,
            ));
        } else {
            out.push_str(&format!(
                " fentry: {} attached, {} failed: {}\n",
                skeleton.fentry_attached,
                skeleton.fentry_attach_failed.len(),
                skeleton
                    .fentry_attach_failed
                    .iter()
                    .map(|(n, e)| format!("{n} ({e})"))
                    .collect::<Vec<_>>()
                    .join(", "),
            ));
        }
    }

    // An attach error takes precedence over the fired / not fired status.
    let trigger_type = if skeleton.trigger_type.is_empty() {
        "unknown"
    } else {
        &skeleton.trigger_type
    };
    if let Some(ref err) = skeleton.trigger_attach_error {
        out.push_str(&format!(" trigger: attach failed ({err})\n"));
    } else {
        out.push_str(&format!(
            " trigger: {} ({})\n",
            if skeleton.trigger_fired {
                "fired"
            } else {
                "not fired"
            },
            trigger_type,
        ));
    }
    out.push_str(&format!(
        " probe_data: {} keys, {} unmatched IPs\n",
        skeleton.probe_data_keys, skeleton.probe_data_unmatched_ips,
    ));
    out.push_str(&format!(
        " events: {} captured, {} after stitch\n",
        skeleton.events_before_stitch, skeleton.events_after_stitch,
    ));

    // BPF-side counters are shown only when at least one is non-zero.
    if skeleton.bpf_kprobe_fires > 0
        || skeleton.bpf_trigger_fires > 0
        || skeleton.bpf_meta_misses > 0
    {
        out.push_str(&format!(
            " bpf_counts: {} kprobe fires, {} trigger fires, {} meta misses\n",
            skeleton.bpf_kprobe_fires, skeleton.bpf_trigger_fires, skeleton.bpf_meta_misses,
        ));
        if !skeleton.bpf_miss_ips.is_empty() {
            let ips: Vec<String> = skeleton
                .bpf_miss_ips
                .iter()
                .map(|ip| format!("0x{ip:x}"))
                .collect();
            out.push_str(&format!(" miss_ips: {}\n", ips.join(", ")));
        }
    }
    out
}
/// Dispatches an in-VM test using this process's command-line arguments.
///
/// Thin wrapper that collects `std::env::args()` and forwards to
/// `maybe_dispatch_vm_test_with_args`; see that function for the meaning of
/// the return value.
pub(crate) fn maybe_dispatch_vm_test() -> Option<i32> {
    maybe_dispatch_vm_test_with_args(&std::env::args().collect::<Vec<String>>())
}
fn build_dispatch_ctx_parts(
entry: &KtstrTestEntry,
args: &[String],
) -> (
crate::topology::TestTopology,
crate::cgroup::CgroupManager,
Option<libc::pid_t>,
crate::assert::Assert,
) {
let topo = match crate::topology::TestTopology::from_system() {
Ok(sys) => sys,
Err(e) => {
eprintln!("ktstr_test: topology from sysfs failed ({e}), using VM spec fallback");
crate::topology::TestTopology::from_vm_topology(&entry.topology)
}
};
let cgroup_root = resolve_cgroup_root(args);
let cgroups = crate::cgroup::CgroupManager::new(&cgroup_root);
if let Err(e) = cgroups.setup(false) {
eprintln!("ktstr_test: cgroup setup failed: {e}");
}
let sched_pid = std::env::var("SCHED_PID")
.ok()
.and_then(|s| s.parse::<libc::pid_t>().ok())
.filter(|&pid| pid != 0);
let merged_assert = crate::assert::Assert::default_checks()
.merge(entry.scheduler.assert())
.merge(&entry.assert);
(topo, cgroups, sched_pid, merged_assert)
}
/// Runs the test function named in `args`, optionally with a probe thread
/// attached, and prints its result.
///
/// Returns `None` when `extract_test_fn_arg` finds no test name in `args`.
/// Returns `Some(1)` when the name is unknown, when the test function
/// returns an error, or when its result did not pass; `Some(0)` when the
/// result passed.
///
/// When a probe stack argument is present and yields at least one function,
/// a thread running `run_probe_skeleton` is spawned before the test starts;
/// its output is emitted after the test via `publish_result_and_collect`.
pub(crate) fn maybe_dispatch_vm_test_with_args(args: &[String]) -> Option<i32> {
let name = extract_test_fn_arg(args)?;
let entry = match find_test(name) {
Some(e) => e,
None => {
eprintln!("ktstr_test: unknown test function '{name}'");
return Some(1);
}
};
let probe_stack = extract_probe_stack_arg(args);
// An unrecognised work type is reported on stderr (with a suggestion when
// one is available) and then ignored: the override becomes `None`.
let work_type_override = extract_work_type_arg(args).and_then(|s| {
crate::workload::WorkType::from_name(&s).or_else(|| {
match crate::workload::WorkType::suggest(&s) {
Some(canonical) => eprintln!(
"ktstr_test: unknown work type '{s}'; did you mean \
'{canonical}'? Valid types: {:?}",
crate::workload::WorkType::ALL_NAMES,
),
None => eprintln!(
"ktstr_test: unknown work type '{s}'. Valid types: {:?}",
crate::workload::WorkType::ALL_NAMES,
),
}
None
})
});
let pipeline = ProbePipeline::new();
let probe_stop = pipeline.stop.clone();
// `None` when no probe stack was given or nothing in it remains after
// filtering, discovery, and expansion.
let probe_handle: Option<ProbeHandle> = probe_stack.as_ref().and_then(|stack_input| {
use crate::probe::stack::load_probe_stack;
eprintln!("ktstr_test: probe: loading probe stack from --ktstr-probe-stack");
let mut pipe_diag = PipelineDiagnostics::default();
let raw_functions = load_probe_stack(stack_input);
pipe_diag.stack_extracted = raw_functions.len() as u32;
// Remember the names before filtering so dropped ones can be reported.
let pre_filter: Vec<String> = raw_functions.iter().map(|f| f.raw_name.clone()).collect();
let mut functions = crate::probe::stack::filter_traceable(raw_functions);
for name in &pre_filter {
if !functions.iter().any(|f| f.raw_name == *name) {
pipe_diag.filter_dropped.push(name.clone());
}
}
// Discover BPF symbols using the display names of the BPF functions on the stack.
let stack_display_names: Vec<&str> = functions
.iter()
.filter(|f| f.is_bpf)
.map(|f| f.display_name.as_str())
.collect();
let bpf_syms = crate::probe::btf::discover_bpf_symbols(&stack_display_names);
pipe_diag.bpf_discovered = bpf_syms.len() as u32;
if !bpf_syms.is_empty() {
eprintln!(
"ktstr_test: probe: {} BPF symbols discovered",
bpf_syms.len()
);
functions.extend(bpf_syms);
}
let functions = crate::probe::stack::expand_bpf_to_kernel_callers(functions);
pipe_diag.total_after_expand = functions.len() as u32;
if functions.is_empty() {
eprintln!("ktstr_test: no traceable functions from --ktstr-probe-stack");
return None;
}
eprintln!(
"ktstr_test: probe: {} functions loaded, spawning probe thread",
functions.len()
);
// BTF information: kernel functions by raw name, BPF functions by
// (display name, program id) for those that have a program id.
let kernel_names: Vec<&str> = functions
.iter()
.filter(|f| !f.is_bpf)
.map(|f| f.raw_name.as_str())
.collect();
let mut btf_funcs = crate::probe::btf::parse_btf_functions(&kernel_names, None);
let bpf_btf_args: Vec<(&str, u32)> = functions
.iter()
.filter(|f| f.is_bpf)
.filter_map(|f| Some((f.display_name.as_str(), f.bpf_prog_id?)))
.collect();
if !bpf_btf_args.is_empty() {
btf_funcs.extend(crate::probe::btf::parse_bpf_btf_functions(&bpf_btf_args));
}
// (index, display name) pairs, indexed by position in `functions`.
let func_names: Vec<(u32, String)> = functions
.iter()
.enumerate()
.map(|(i, f)| (i as u32, f.display_name.clone()))
.collect();
let bpf_fds = crate::probe::process::open_bpf_prog_fds(&functions);
let pnames = crate::probe::output::build_param_names(&btf_funcs);
let rhints = crate::probe::output::build_render_hints(&btf_funcs);
// Clones moved into the probe thread; the originals go into the returned `ProbeHandle`.
let pnames_thread = pnames.clone();
let rhints_thread = rhints.clone();
let thread_pipeline = pipeline.clone();
let funcs = functions.clone();
let fn_names = func_names.clone();
let pd = pipe_diag.clone();
let handle = std::thread::spawn(move || {
use crate::probe::process::run_probe_skeleton;
let (events, diag, accumulated_fn_names) = run_probe_skeleton(
&funcs,
&btf_funcs,
&thread_pipeline.stop,
&bpf_fds,
&thread_pipeline.probes_ready,
None,
);
// Prefer the names reported by the skeleton; fall back to the
// pre-computed list when it reported none.
let emit_fn_names = if accumulated_fn_names.is_empty() {
&fn_names
} else {
&accumulated_fn_names
};
emit_probe_payload(
events.as_deref().unwrap_or(&[]),
emit_fn_names,
&pd,
&diag,
&pnames_thread,
&rhints_thread,
);
// Tell the collecting side that the payload has already been printed.
thread_pipeline
.output_done
.store(true, std::sync::atomic::Ordering::Release);
(events, diag, accumulated_fn_names)
});
// Block on the latch handed to `run_probe_skeleton` before continuing.
// NOTE(review): presumably released once probes are attached — confirm.
pipeline.probes_ready.wait();
Some(ProbeHandle {
thread: handle,
func_names,
pipeline_diag: pipe_diag,
output_done: pipeline.output_done.clone(),
param_names: pnames,
render_hints: rhints,
})
});
let (topo, cgroups, sched_pid, merged_assert) = build_dispatch_ctx_parts(entry, args);
let ctx = crate::scenario::Ctx::builder(&cgroups, &topo)
.duration(entry.duration)
.workers_per_cgroup(entry.workers_per_cgroup as usize)
.sched_pid(sched_pid)
.settle(Duration::from_millis(500))
.work_type_override(work_type_override)
.assert(merged_assert)
.wait_for_map_write(!entry.bpf_map_write.is_empty())
.build();
// A test function error is converted into a failed result whose single
// detail is the error's alternate (`{:#}`) rendering.
let result = match (entry.func)(&ctx) {
Ok(r) => r,
Err(e) => {
let r = AssertResult {
passed: false,
skipped: false,
details: vec![format!("{e:#}").into()],
stats: Default::default(),
};
publish_result_and_collect(&r, probe_stop, probe_handle);
return Some(1);
}
};
let exit_code = if result.passed { 0 } else { 1 };
publish_result_and_collect(&result, probe_stop, probe_handle);
Some(exit_code)
}
/// Value returned by a probe thread: the captured events (if any), the
/// skeleton diagnostics, and the `(index, name)` function list reported by
/// `run_probe_skeleton`.
type ProbeThreadResult = (
Option<Vec<crate::probe::process::ProbeEvent>>,
crate::probe::process::ProbeDiagnostics,
Vec<(u32, String)>,
);
/// Everything the dispatching thread keeps about a running probe thread so
/// it can join it and, if needed, emit the probe payload itself.
struct ProbeHandle {
/// Join handle of the probe thread.
thread: std::thread::JoinHandle<ProbeThreadResult>,
/// Fallback `(index, display name)` list, used when the thread reports no names.
func_names: Vec<(u32, String)>,
/// Pipeline diagnostics gathered while the probe target list was built.
pipeline_diag: PipelineDiagnostics,
/// Set by the probe thread once it has printed the payload.
output_done: std::sync::Arc<std::sync::atomic::AtomicBool>,
/// Output of `build_param_names` for the probed functions.
param_names: std::collections::HashMap<String, Vec<(String, String)>>,
/// Output of `build_render_hints` for the probed functions.
render_hints: std::collections::HashMap<String, crate::probe::btf::RenderHint>,
}
/// Shared synchronisation state between the dispatching thread and a probe
/// thread. Cloning shares the same underlying flags and latch.
#[derive(Clone, Default)]
pub(crate) struct ProbePipeline {
/// Passed to `run_probe_skeleton`; set to `true` by
/// `collect_and_print_probe_data` before it joins the probe thread.
pub stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
/// Set to `true` by the probe thread after it has emitted the payload.
pub output_done: std::sync::Arc<std::sync::atomic::AtomicBool>,
/// Latch passed to `run_probe_skeleton`; the spawning code waits on it
/// before continuing.
pub probes_ready: std::sync::Arc<crate::sync::Latch>,
}
impl ProbePipeline {
/// Creates a pipeline with all fields at their `Default` values.
pub fn new() -> Self {
Self::default()
}
}
/// Counters recorded while the list of probe targets is assembled; carried
/// in the probe payload and rendered by `format_probe_diagnostics`.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub(crate) struct PipelineDiagnostics {
/// Number of functions returned by `load_probe_stack`.
pub stack_extracted: u32,
/// Raw names of functions that were removed by `filter_traceable`.
pub filter_dropped: Vec<String>,
/// Number of symbols returned by `discover_bpf_symbols`.
pub bpf_discovered: u32,
/// Number of probe targets after expansion (in Phase A: the kernel function count).
pub total_after_expand: u32,
}
/// State produced by `start_probe_phase_a` and consumed by
/// `maybe_dispatch_vm_test_with_phase_a`.
pub(crate) struct ProbePhaseAState {
/// Join handle of the probe thread started in Phase A.
pub handle: std::thread::JoinHandle<ProbeThreadResult>,
/// Sender used to hand the Phase B input to the probe thread.
pub phase_b_tx: std::sync::mpsc::Sender<crate::probe::process::PhaseBInput>,
/// Stop flag, output-done flag, and readiness latch shared with the thread.
pub pipeline: ProbePipeline,
/// `(index, display name)` for each kernel function probed in Phase A.
pub kernel_func_names: Vec<(u32, String)>,
/// Number of kernel functions probed in Phase A; used as the Phase B
/// function index offset.
pub kernel_func_count: u32,
/// Pipeline diagnostics gathered during Phase A.
pub pipe_diag: PipelineDiagnostics,
/// Output of `build_param_names` for the Phase A functions.
pub param_names: std::collections::HashMap<String, Vec<(String, String)>>,
/// Output of `build_render_hints` for the Phase A functions.
pub render_hints: std::collections::HashMap<String, crate::probe::btf::RenderHint>,
}
/// Phase A of probing: starts a probe thread for the kernel (non-BPF)
/// functions named by the probe stack argument in `args`.
///
/// Returns `None` when `args` carries no probe stack argument. Otherwise it
/// spawns a thread running `run_probe_skeleton` with a channel on which
/// Phase B input can later be delivered, waits on the readiness latch, and
/// returns the state needed by `maybe_dispatch_vm_test_with_phase_a`.
pub(crate) fn start_probe_phase_a(args: &[String]) -> Option<ProbePhaseAState> {
use crate::probe::stack::{filter_traceable, load_probe_stack};
let stack_input = extract_probe_stack_arg(args)?;
eprintln!("ktstr_test: probe phase_a: loading kernel functions");
let mut pipe_diag = PipelineDiagnostics::default();
let raw_functions = load_probe_stack(&stack_input);
pipe_diag.stack_extracted = raw_functions.len() as u32;
// Remember the names before filtering so dropped ones can be reported.
let pre_filter: Vec<String> = raw_functions.iter().map(|f| f.raw_name.clone()).collect();
let functions = filter_traceable(raw_functions);
for name in &pre_filter {
if !functions.iter().any(|f| f.raw_name == *name) {
pipe_diag.filter_dropped.push(name.clone());
}
}
// Phase A handles kernel functions only; BPF entries are discarded here.
let kernel_functions: Vec<crate::probe::stack::StackFunction> =
functions.into_iter().filter(|f| !f.is_bpf).collect();
let kernel_names: Vec<&str> = kernel_functions
.iter()
.map(|f| f.raw_name.as_str())
.collect();
let btf_funcs = crate::probe::btf::parse_btf_functions(&kernel_names, None);
// (index, display name) pairs, indexed by position in `kernel_functions`.
let func_names: Vec<(u32, String)> = kernel_functions
.iter()
.enumerate()
.map(|(i, f)| (i as u32, f.display_name.clone()))
.collect();
pipe_diag.total_after_expand = kernel_functions.len() as u32;
// No BPF program fds are opened in Phase A: the map passed to the skeleton is empty.
let bpf_fds = std::collections::HashMap::new(); let param_names = crate::probe::output::build_param_names(&btf_funcs);
let render_hints = crate::probe::output::build_render_hints(&btf_funcs);
let pipeline = ProbePipeline::new();
// Channel over which the Phase B input is later sent to the probe thread.
let (phase_b_tx, phase_b_rx) = std::sync::mpsc::channel();
// Clones moved into the probe thread; the originals go into the returned state.
let thread_pipeline = pipeline.clone();
let funcs = kernel_functions.clone();
let btf = btf_funcs.clone();
let fn_names = func_names.clone();
let pd = pipe_diag.clone();
let pnames = param_names.clone();
let rhints = render_hints.clone();
let handle = std::thread::spawn(move || {
let (events, diag, accumulated_fn_names) = crate::probe::process::run_probe_skeleton(
&funcs,
&btf,
&thread_pipeline.stop,
&bpf_fds,
&thread_pipeline.probes_ready,
Some(phase_b_rx),
);
// Prefer the names reported by the skeleton; fall back to the
// Phase A list when it reported none.
let emit_fn_names = if accumulated_fn_names.is_empty() {
&fn_names
} else {
&accumulated_fn_names
};
emit_probe_payload(
events.as_deref().unwrap_or(&[]),
emit_fn_names,
&pd,
&diag,
&pnames,
&rhints,
);
// Tell the collecting side that the payload has already been printed.
thread_pipeline
.output_done
.store(true, std::sync::atomic::Ordering::Release);
(events, diag, accumulated_fn_names)
});
// Block on the latch handed to `run_probe_skeleton` before reporting.
pipeline.probes_ready.wait();
eprintln!(
"ktstr_test: probe phase_a: {} kernel functions attached, waiting for Phase B",
kernel_functions.len(),
);
let kernel_func_count = kernel_functions.len() as u32;
Some(ProbePhaseAState {
handle,
phase_b_tx,
pipeline,
kernel_func_names: func_names,
kernel_func_count,
pipe_diag,
param_names,
render_hints,
})
}
/// Runs the test function named in `args` with a probe thread that was
/// already started by `start_probe_phase_a`, after performing Phase B
/// (BPF symbol discovery and hand-off to that thread).
///
/// Returns `None` when `extract_test_fn_arg` finds no test name in `args`.
/// Returns `Some(1)` when the name is unknown, when the test function
/// returns an error, or when its result did not pass; `Some(0)` when the
/// result passed.
pub(crate) fn maybe_dispatch_vm_test_with_phase_a(
args: &[String],
pa: ProbePhaseAState,
) -> Option<i32> {
use crate::probe::btf::discover_bpf_symbols;
use crate::probe::stack::expand_bpf_to_kernel_callers;
let name = extract_test_fn_arg(args)?;
let entry = match find_test(name) {
Some(e) => e,
None => {
eprintln!("ktstr_test: unknown test function '{name}'");
return Some(1);
}
};
// An unrecognised work type is reported on stderr (with a suggestion when
// one is available) and then ignored: the override becomes `None`.
let work_type_override = extract_work_type_arg(args).and_then(|s| {
crate::workload::WorkType::from_name(&s).or_else(|| {
match crate::workload::WorkType::suggest(&s) {
Some(canonical) => eprintln!(
"ktstr_test: unknown work type '{s}'; did you mean \
'{canonical}'? Valid types: {:?}",
crate::workload::WorkType::ALL_NAMES,
),
None => eprintln!(
"ktstr_test: unknown work type '{s}'. Valid types: {:?}",
crate::workload::WorkType::ALL_NAMES,
),
}
None
})
});
let ProbePhaseAState {
handle: pa_handle,
phase_b_tx: pa_phase_b_tx,
pipeline: pa_pipeline,
kernel_func_names: pa_kernel_func_names,
kernel_func_count: pa_kernel_func_count,
pipe_diag: pa_pipe_diag,
param_names: pa_param_names,
render_hints: pa_render_hints,
} = pa;
eprintln!("ktstr_test: probe phase_b: discovering BPF symbols");
// Discovery is called with an empty name list here.
// NOTE(review): presumably this means "discover everything" — confirm
// against `discover_bpf_symbols`.
let stack_display_names: Vec<&str> = Vec::new(); let bpf_syms = discover_bpf_symbols(&stack_display_names);
eprintln!(
"ktstr_test: probe phase_b: {} BPF symbols discovered",
bpf_syms.len()
);
if !bpf_syms.is_empty() {
let phase_b_functions = expand_bpf_to_kernel_callers(bpf_syms);
let bpf_fds = crate::probe::process::open_bpf_prog_fds(&phase_b_functions);
// BTF information: BPF functions by (display name, program id) for those
// with a program id, then the kernel callers by raw name.
let bpf_btf_args: Vec<(&str, u32)> = phase_b_functions
.iter()
.filter(|f| f.is_bpf)
.filter_map(|f| Some((f.display_name.as_str(), f.bpf_prog_id?)))
.collect();
let mut phase_b_btf = if !bpf_btf_args.is_empty() {
crate::probe::btf::parse_bpf_btf_functions(&bpf_btf_args)
} else {
Vec::new()
};
let kernel_caller_names: Vec<&str> = phase_b_functions
.iter()
.filter(|f| !f.is_bpf)
.map(|f| f.raw_name.as_str())
.collect();
if !kernel_caller_names.is_empty() {
phase_b_btf.extend(crate::probe::btf::parse_btf_functions(
&kernel_caller_names,
None,
));
}
// Latch included in the Phase B input; this thread waits on it after a
// successful send.
let phase_b_done = std::sync::Arc::new(crate::sync::Latch::new());
let phase_b_done_clone = phase_b_done.clone();
let phase_b_input = crate::probe::process::PhaseBInput {
functions: phase_b_functions,
bpf_prog_fds: bpf_fds,
btf_funcs: phase_b_btf,
done: phase_b_done_clone,
// Phase B function indices start after the Phase A kernel functions.
func_idx_offset: pa_kernel_func_count,
};
// A failed send is logged and the test still runs.
if let Err(e) = pa_phase_b_tx.send(phase_b_input) {
eprintln!("ktstr_test: probe phase_b: failed to send: {e}");
} else {
phase_b_done.wait();
eprintln!("ktstr_test: probe phase_b: BPF fentry attached");
}
} else {
eprintln!("ktstr_test: probe phase_b: no BPF symbols, skipping fentry");
// Close the channel so the receiving side can observe that no Phase B
// input will arrive.
drop(pa_phase_b_tx);
}
let (topo, cgroups, sched_pid, merged_assert) = build_dispatch_ctx_parts(entry, args);
let ctx = crate::scenario::Ctx::builder(&cgroups, &topo)
.duration(entry.duration)
.workers_per_cgroup(entry.workers_per_cgroup as usize)
.sched_pid(sched_pid)
.settle(std::time::Duration::from_millis(500))
.work_type_override(work_type_override)
.assert(merged_assert)
.wait_for_map_write(!entry.bpf_map_write.is_empty())
.build();
// Repackage the Phase A state as a `ProbeHandle` for result collection.
let stop = pa_pipeline.stop.clone();
let handle = ProbeHandle {
thread: pa_handle,
func_names: pa_kernel_func_names,
pipeline_diag: pa_pipe_diag,
output_done: pa_pipeline.output_done,
param_names: pa_param_names,
render_hints: pa_render_hints,
};
// A test function error is converted into a failed result whose single
// detail is the error's alternate (`{:#}`) rendering.
let result = match (entry.func)(&ctx) {
Ok(r) => r,
Err(e) => {
let r = AssertResult {
passed: false,
skipped: false,
details: vec![format!("{e:#}").into()],
stats: Default::default(),
};
publish_result_and_collect(&r, stop, Some(handle));
return Some(1);
}
};
let exit_code = if result.passed { 0 } else { 1 };
publish_result_and_collect(&result, stop, Some(handle));
Some(exit_code)
}
/// Probe payload serialized as JSON between `PROBE_OUTPUT_START` and
/// `PROBE_OUTPUT_END` by `emit_probe_payload`, and parsed back by
/// `extract_probe_output`.
#[derive(serde::Serialize, serde::Deserialize)]
pub(crate) struct ProbeBytes {
/// Captured probe events.
pub events: Vec<crate::probe::process::ProbeEvent>,
/// `(index, name)` pairs for the probed functions.
pub func_names: Vec<(u32, String)>,
/// Output of `resolve_bpf_source_locs` for the probed BPF programs.
pub bpf_source_locs: std::collections::HashMap<String, String>,
/// Pipeline and skeleton diagnostics, when available.
pub diagnostics: Option<ProbeBytesDiagnostics>,
/// Value of `get_nr_cpus()` at the time the payload was built.
pub nr_cpus: Option<u32>,
/// Output of `build_param_names` for the probed functions.
pub param_names: std::collections::HashMap<String, Vec<(String, String)>>,
/// Output of `build_render_hints` for the probed functions.
pub render_hints: std::collections::HashMap<String, crate::probe::btf::RenderHint>,
}
/// Diagnostics bundled into a `ProbeBytes` payload.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub(crate) struct ProbeBytesDiagnostics {
/// Counters recorded while the probe target list was built.
pub pipeline: PipelineDiagnostics,
/// Diagnostics returned by `run_probe_skeleton`.
pub skeleton: crate::probe::process::ProbeDiagnostics,
}
/// Serializes the probe results as JSON and prints them on stdout between
/// the `PROBE_OUTPUT_START` and `PROBE_OUTPUT_END` marker lines.
///
/// * `events` — captured probe events.
/// * `func_names` — `(index, name)` pairs for the probed functions; the
///   names are also used to look up BPF program ids for source locations.
/// * `pipeline_diag` / `skeleton_diag` — diagnostics bundled into the payload.
/// * `param_names` / `render_hints` — formatting metadata bundled into the payload.
///
/// If serialization fails, the error is logged to stderr and the two marker
/// lines are still printed with nothing between them.
fn emit_probe_payload(
    events: &[crate::probe::process::ProbeEvent],
    func_names: &[(u32, String)],
    pipeline_diag: &PipelineDiagnostics,
    skeleton_diag: &crate::probe::process::ProbeDiagnostics,
    param_names: &std::collections::HashMap<String, Vec<(String, String)>>,
    render_hints: &std::collections::HashMap<String, crate::probe::btf::RenderHint>,
) {
    // Resolve source locations for every probed function that matches a
    // discovered BPF symbol with a known program id.
    let source_loc_names: Vec<&str> = func_names.iter().map(|(_, name)| name.as_str()).collect();
    let bpf_syms = crate::probe::btf::discover_bpf_symbols(&source_loc_names);
    let bpf_prog_ids: Vec<u32> = func_names
        .iter()
        .filter_map(|(_, name)| {
            bpf_syms
                .iter()
                .find(|s| s.display_name == *name)
                .and_then(|s| s.bpf_prog_id)
        })
        .collect();
    let bpf_source_locs = crate::probe::btf::resolve_bpf_source_locs(&bpf_prog_ids);

    let payload = ProbeBytes {
        events: events.to_vec(),
        func_names: func_names.to_vec(),
        bpf_source_locs,
        diagnostics: Some(ProbeBytesDiagnostics {
            pipeline: pipeline_diag.clone(),
            skeleton: skeleton_diag.clone(),
        }),
        nr_cpus: crate::probe::output::get_nr_cpus(),
        param_names: param_names.clone(),
        render_hints: render_hints.clone(),
    };

    // Each branch prints the whole section with one `println!`, which holds
    // the stdout lock for the entire write, so output from other threads
    // cannot land between the markers.
    match serde_json::to_string(&payload) {
        Ok(json) => println!("{PROBE_OUTPUT_START}\n{json}\n{PROBE_OUTPUT_END}"),
        Err(e) => {
            // Previously swallowed silently, leaving an unexplained empty section.
            eprintln!("ktstr_test: probe payload serialize failed: {e}");
            println!("{PROBE_OUTPUT_START}\n{PROBE_OUTPUT_END}");
        }
    }
}
/// Finishes a test run: calls `try_flush_profraw`, prints `result`, then
/// stops and collects the probe thread (if any).
///
/// * `result` — the outcome to print via `print_assert_result`.
/// * `stop` — stop flag shared with the probe thread.
/// * `handle` — the probe thread's handle, or `None` when no probe ran.
fn publish_result_and_collect(
result: &AssertResult,
stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
handle: Option<ProbeHandle>,
) {
try_flush_profraw();
print_assert_result(result);
collect_and_print_probe_data(stop, handle);
}
/// Signals the probe thread to stop, joins it, and prints the probe payload
/// unless the thread already did so.
///
/// Does nothing — and leaves `stop` untouched — when `handle` is `None`.
/// If the probe thread panicked, or returned no events, an empty event list
/// is used. The function names reported by the thread take precedence over
/// the ones stored in the handle whenever the thread reported any.
fn collect_and_print_probe_data(
    stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
    handle: Option<ProbeHandle>,
) {
    use std::sync::atomic::Ordering;

    if let Some(ph) = handle {
        stop.store(true, Ordering::Release);

        let (events, skeleton_diag, accumulated_fn_names) = ph
            .thread
            .join()
            .map(|(events, diag, names)| (events.unwrap_or_default(), diag, names))
            .unwrap_or_else(|_| {
                (
                    Vec::new(),
                    crate::probe::process::ProbeDiagnostics::default(),
                    Vec::new(),
                )
            });

        // The probe thread sets this flag after printing the payload itself.
        if ph.output_done.load(Ordering::Acquire) {
            return;
        }

        let effective_fn_names = if accumulated_fn_names.is_empty() {
            &ph.func_names
        } else {
            &accumulated_fn_names
        };
        emit_probe_payload(
            &events,
            effective_fn_names,
            &ph.pipeline_diag,
            &skeleton_diag,
            &ph.param_names,
            &ph.render_hints,
        );
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn extract_probe_output_valid_json() {
use crate::probe::process::ProbeEvent;
let payload = ProbeBytes {
events: vec![ProbeEvent {
func_idx: 0,
task_ptr: 1,
ts: 100,
args: [0; 6],
fields: vec![("p:task_struct.pid".to_string(), 42)],
kstack: vec![],
str_val: None,
..Default::default()
}],
func_names: vec![(0, "schedule".to_string())],
bpf_source_locs: Default::default(),
diagnostics: None,
nr_cpus: None,
param_names: Default::default(),
render_hints: Default::default(),
};
let json = serde_json::to_string(&payload).unwrap();
let output = format!("noise\n{PROBE_OUTPUT_START}\n{json}\n{PROBE_OUTPUT_END}\nmore");
let parsed = extract_probe_output(&output, None);
assert!(parsed.is_some());
let formatted = parsed.unwrap();
assert!(
formatted.contains("schedule"),
"should contain func name: {formatted}"
);
assert!(
formatted.contains("pid"),
"should contain field name: {formatted}"
);
}
#[test]
fn extract_probe_output_missing() {
assert!(extract_probe_output("no markers", None).is_none());
}
#[test]
fn extract_probe_output_empty() {
let output = format!("{PROBE_OUTPUT_START}\n\n{PROBE_OUTPUT_END}");
assert!(extract_probe_output(&output, None).is_none());
}
#[test]
fn extract_probe_output_invalid_json() {
let output = format!("{PROBE_OUTPUT_START}\nnot valid json\n{PROBE_OUTPUT_END}");
assert!(extract_probe_output(&output, None).is_none());
}
#[test]
fn extract_probe_output_enriched_fields() {
use crate::probe::process::ProbeEvent;
let payload = ProbeBytes {
events: vec![
ProbeEvent {
func_idx: 0,
task_ptr: 1,
ts: 100,
args: [0xDEAD, 0, 0, 0, 0, 0],
fields: vec![
("prev:task_struct.pid".to_string(), 42),
("prev:task_struct.scx_flags".to_string(), 0x1c),
],
kstack: vec![],
str_val: None,
..Default::default()
},
ProbeEvent {
func_idx: 1,
task_ptr: 1,
ts: 200,
args: [0; 6],
fields: vec![("rq:rq.cpu".to_string(), 3)],
kstack: vec![],
str_val: None,
..Default::default()
},
],
func_names: vec![
(0, "schedule".to_string()),
(1, "pick_task_scx".to_string()),
],
bpf_source_locs: Default::default(),
diagnostics: None,
nr_cpus: None,
param_names: Default::default(),
render_hints: Default::default(),
};
let json = serde_json::to_string(&payload).unwrap();
let output = format!("{PROBE_OUTPUT_START}\n{json}\n{PROBE_OUTPUT_END}");
let formatted = extract_probe_output(&output, None).unwrap();
assert!(formatted.contains("pid"), "pid field: {formatted}");
assert!(formatted.contains("42"), "pid value: {formatted}");
assert!(
formatted.contains("scx_flags"),
"scx_flags field: {formatted}"
);
assert!(formatted.contains("cpu"), "cpu field: {formatted}");
assert!(formatted.contains("3"), "cpu value: {formatted}");
assert!(
formatted.contains("task_struct *prev"),
"type header for task_struct: {formatted}"
);
assert!(
formatted.contains("rq *rq"),
"type header for rq: {formatted}"
);
assert!(
!formatted.contains("arg0"),
"raw args should not appear when fields exist: {formatted}"
);
assert!(formatted.contains("schedule"), "func schedule: {formatted}");
assert!(
formatted.contains("pick_task_scx"),
"func pick_task_scx: {formatted}"
);
}
#[test]
fn format_tail_empty_text_returns_none() {
assert_eq!(format_tail("", 5, "scheduler"), None);
}
#[test]
fn format_tail_fewer_lines_than_n_returns_all() {
let out = format_tail("one\ntwo\nthree", 10, "scheduler").unwrap();
assert_eq!(out, "--- scheduler ---\none\ntwo\nthree");
}
#[test]
fn format_tail_trims_to_last_n_lines() {
let out = format_tail("1\n2\n3\n4\n5", 3, "log").unwrap();
assert_eq!(out, "--- log ---\n3\n4\n5");
}
#[test]
fn format_tail_zero_n_returns_empty_body_under_header() {
let out = format_tail("a\nb", 0, "hdr").unwrap();
assert_eq!(out, "--- hdr ---\n");
}
#[test]
fn format_tail_preserves_trailing_blank_lines() {
let out = format_tail("a\n\nb", 3, "hdr").unwrap();
assert_eq!(out, "--- hdr ---\na\n\nb");
}
#[test]
fn parse_rust_env_empty_cmdline_is_empty() {
assert!(parse_rust_env_from_cmdline("").is_empty());
}
#[test]
fn parse_rust_env_no_matches() {
assert!(parse_rust_env_from_cmdline("console=ttyS0 ro quiet").is_empty());
}
/// A lone `RUST_BACKTRACE=` token is picked out from between unrelated
/// parameters.
#[test]
fn parse_rust_env_backtrace_only() {
    let cmdline = "console=ttyS0 RUST_BACKTRACE=1 ro";
    let expected = vec![("RUST_BACKTRACE", "1")];
    assert_eq!(parse_rust_env_from_cmdline(cmdline), expected);
}
/// A lone `RUST_LOG=` token is picked out and the unrelated `other=x` is
/// dropped.
#[test]
fn parse_rust_env_log_only() {
    let cmdline = "RUST_LOG=debug other=x";
    let expected = vec![("RUST_LOG", "debug")];
    assert_eq!(parse_rust_env_from_cmdline(cmdline), expected);
}
/// Both recognised variables are returned when both appear on the command
/// line.
#[test]
fn parse_rust_env_both() {
    let cmdline = "RUST_BACKTRACE=full RUST_LOG=trace other=y";
    let expected = vec![("RUST_BACKTRACE", "full"), ("RUST_LOG", "trace")];
    assert_eq!(parse_rust_env_from_cmdline(cmdline), expected);
}
/// Entries come back in the order their tokens appear on the command line,
/// not in a fixed variable order.
#[test]
fn parse_rust_env_preserves_token_order() {
    let cmdline = "RUST_LOG=info RUST_BACKTRACE=1";
    let expected = vec![("RUST_LOG", "info"), ("RUST_BACKTRACE", "1")];
    assert_eq!(parse_rust_env_from_cmdline(cmdline), expected);
}
/// A recognised key with nothing after `=` is reported with an empty value
/// rather than being skipped.
#[test]
fn parse_rust_env_empty_value() {
    let expected = vec![("RUST_LOG", "")];
    assert_eq!(parse_rust_env_from_cmdline("RUST_LOG="), expected);
}
/// The key must start the token: `xRUST_LOG=x` merely contains `RUST_LOG=`
/// and is therefore ignored.
#[test]
fn parse_rust_env_ignores_prefix_mismatch() {
    let parsed = parse_rust_env_from_cmdline("xRUST_LOG=x");
    assert!(parsed.is_empty());
}
/// The reason after `SCHEDULER_NOT_ATTACHED: ` is found even when the marker
/// line sits between unrelated lines.
#[test]
fn extract_not_attached_reason_timeout() {
    let reason = extract_not_attached_reason("noise\nSCHEDULER_NOT_ATTACHED: timeout\nmore");
    assert_eq!(reason, Some("timeout"));
}
/// A multi-word reason is returned whole, with its interior spaces intact.
#[test]
fn extract_not_attached_reason_sysfs_absent() {
    let reason = extract_not_attached_reason("SCHEDULER_NOT_ATTACHED: sched_ext sysfs absent");
    assert_eq!(reason, Some("sched_ext sysfs absent"));
}
/// Spaces between the reason and the end of its line are not part of the
/// returned reason.
#[test]
fn extract_not_attached_reason_trims_trailing_whitespace() {
    let reason = extract_not_attached_reason("SCHEDULER_NOT_ATTACHED: timeout \n");
    assert_eq!(reason, Some("timeout"));
}
/// Output without the not-attached marker yields `None`, both for empty
/// output and for output that only carries other markers.
#[test]
fn extract_not_attached_reason_absent_returns_none() {
    for output in ["", "SCHEDULER_DIED\nKTSTR_EXIT=1"] {
        assert_eq!(extract_not_attached_reason(output), None);
    }
}
/// The bare marker with no `:` after it does not count as a match.
#[test]
fn extract_not_attached_reason_without_colon_returns_none() {
    let reason = extract_not_attached_reason("SCHEDULER_NOT_ATTACHED\nKTSTR_EXIT=1");
    assert_eq!(reason, None);
}
/// A marker followed by nothing, or by whitespace only, carries no reason
/// and yields `None`.
#[test]
fn extract_not_attached_reason_empty_suffix_returns_none() {
    let reasonless = ["SCHEDULER_NOT_ATTACHED:\n", "SCHEDULER_NOT_ATTACHED: \n"];
    for output in reasonless {
        assert_eq!(extract_not_attached_reason(output), None);
    }
}
/// With two marker lines in the output, the reason from the earlier one is
/// reported.
#[test]
fn extract_not_attached_reason_first_match_wins() {
    let two_markers =
        "SCHEDULER_NOT_ATTACHED: timeout\nSCHEDULER_NOT_ATTACHED: sched_ext sysfs absent";
    let reason = extract_not_attached_reason(two_markers);
    assert_eq!(reason, Some("timeout"));
}
/// A timeout is reported even when a crash message is flagged and the output
/// carries both the not-attached and `SCHEDULER_DIED` markers.
#[test]
fn classify_repro_vm_status_timeout_wins_over_other_signals() {
    let timed_out = true;
    let has_crash_message = true;
    let output = "SCHEDULER_NOT_ATTACHED: timeout\nSCHEDULER_DIED";
    let status = classify_repro_vm_status(timed_out, has_crash_message, output, 137);
    assert_eq!(status, "repro VM: timed out");
}
/// A not-attached marker in the output is surfaced together with its reason
/// and the exit code.
#[test]
fn classify_repro_vm_status_not_attached_with_reason() {
    let output = "noise\nSCHEDULER_NOT_ATTACHED: sched_ext sysfs absent\nKTSTR_EXIT=1";
    let expected = "repro VM: scheduler did not attach (sched_ext sysfs absent) (exit code 1)";
    let status = classify_repro_vm_status(false, false, output, 1);
    assert_eq!(status, expected);
}
/// When the output has both crash signals and a not-attached marker, the
/// not-attached classification is the one reported.
#[test]
fn classify_repro_vm_status_not_attached_takes_precedence_over_crashed() {
    let has_crash_message = true;
    let output = "SCHEDULER_DIED\nSCHEDULER_NOT_ATTACHED: timeout";
    let expected = "repro VM: scheduler did not attach (timeout) (exit code 1)";
    let status = classify_repro_vm_status(false, has_crash_message, output, 1);
    assert_eq!(status, expected);
}
/// The `SCHEDULER_DIED` line alone, with no crash message flagged, is enough
/// to classify the run as a crash; a positive exit code is reported as a
/// non-zero status.
#[test]
fn classify_repro_vm_status_crashed_from_sentinel() {
    let expected = "repro VM: scheduler crashed — exited with non-zero status (139)";
    let status = classify_repro_vm_status(false, false, "SCHEDULER_DIED\n", 139);
    assert_eq!(status, expected);
}
/// The crash-message flag alone, with no marker in the output, is enough to
/// classify the run as a crash.
#[test]
fn classify_repro_vm_status_crashed_from_crash_message() {
    let has_crash_message = true;
    let expected = "repro VM: scheduler crashed — exited with non-zero status (134)";
    let status = classify_repro_vm_status(false, has_crash_message, "no sentinels here", 134);
    assert_eq!(status, expected);
}
/// Exit code 0 alongside the `SCHEDULER_DIED` line is still a crash, worded
/// as a clean exit.
#[test]
fn classify_repro_vm_status_crashed_from_sentinel_qemu_clean_exit() {
    let exit_code = 0;
    let status = classify_repro_vm_status(false, false, "SCHEDULER_DIED\n", exit_code);
    assert_eq!(status, "repro VM: scheduler crashed — exited cleanly");
}
/// An exit code of -9 alongside the `SCHEDULER_DIED` line is worded as
/// death by signal.
#[test]
fn classify_repro_vm_status_crashed_from_sentinel_killed_by_signal() {
    let exit_code = -9;
    let expected = "repro VM: scheduler crashed — killed by signal (-9)";
    let status = classify_repro_vm_status(false, false, "SCHEDULER_DIED\n", exit_code);
    assert_eq!(status, expected);
}
/// Exit code -1 alongside the `SCHEDULER_DIED` line gets the "no final exit
/// status" wording, and that wording must stay free of internal
/// implementation terms.
#[test]
fn classify_repro_vm_status_crashed_from_sentinel_vmm_exit_code_unset() {
    let expected = "repro VM: scheduler crashed — VM host reported no final exit \
                    status (the scheduler did not deliver an exit signal before \
                    the VM ended)";
    let status = classify_repro_vm_status(false, false, "SCHEDULER_DIED\n", -1);
    assert_eq!(status, expected);

    // Internal identifiers that must never show up in the user-facing text.
    for leaked in ["BSP", "VmResult::exit_code", "MSG_TYPE_EXIT"] {
        assert!(
            !status.contains(leaked),
            "user-facing status leaks {leaked}: {status}"
        );
    }
}
/// A non-zero exit code with no crash or not-attached signal is reported as
/// an abnormal exit.
#[test]
fn classify_repro_vm_status_abnormal_exit() {
    let exit_code = 2;
    let status = classify_repro_vm_status(false, false, "clean output", exit_code);
    assert_eq!(status, "repro VM: exited abnormally (exit code 2)");
}
/// Exit code 0 with no crash or not-attached signal means the crash did not
/// reproduce.
#[test]
fn classify_repro_vm_status_clean_run() {
    let expected = "repro VM: scheduler ran normally (crash did not reproduce)";
    let status = classify_repro_vm_status(false, false, "clean output", 0);
    assert_eq!(status, expected);
}
/// A not-attached marker without a `: reason` suffix is not treated as
/// not-attached; the run is classified by its exit code instead.
#[test]
fn classify_repro_vm_status_malformed_not_attached_falls_through() {
    let malformed = "SCHEDULER_NOT_ATTACHED\nKTSTR_EXIT=1";
    let status = classify_repro_vm_status(false, false, malformed, 1);
    assert_eq!(status, "repro VM: exited abnormally (exit code 1)");
}
}