use std::path::Path;
use std::time::Duration;
use crate::assert::AssertResult;
use super::args::{
extract_probe_stack_arg, extract_test_fn_arg, extract_work_type_arg, resolve_cgroup_root,
};
use super::entry::find_test;
use super::output::{extract_sched_ext_dump, print_assert_result};
use super::profraw::try_flush_profraw;
use super::runtime::{config_file_parts, verbose};
use super::{KtstrTestEntry, TopoOverride};
use crate::verifier::{
SCHED_OUTPUT_END, SCHED_OUTPUT_START, parse_sched_output, parse_sched_output_partial,
};
/// Placeholder passed as the `--ktstr-probe-stack=` value when no stack
/// functions could be extracted from the first VM; the auto-repro path logs
/// this case as "using BPF discovery in repro VM".
const DISCOVER_SENTINEL: &str = "__discover__";
/// Copies the forwarded environment variables found on the kernel command
/// line (`/proc/cmdline`) into this process's environment.
///
/// Does nothing when `/proc/cmdline` cannot be read.
pub(crate) fn propagate_rust_env_from_cmdline() {
    if let Ok(cmdline) = std::fs::read_to_string("/proc/cmdline") {
        parse_rust_env_from_cmdline(&cmdline)
            .into_iter()
            .for_each(|(name, value)| {
                // SAFETY: NOTE(review) — `set_var` is only sound while no other
                // thread reads or writes the process environment. This presumably
                // runs early, before any threads are spawned; confirm at the call site.
                unsafe { std::env::set_var(name, value) }
            });
    }
}
/// Scans a kernel command line for `NAME=value` tokens of the forwarded
/// variables and returns them as `(NAME, value)` pairs in command-line order.
///
/// Tokens are split on whitespace; a token must start with the exact
/// `NAME=` prefix to be picked up. The value may be empty.
fn parse_rust_env_from_cmdline(cmdline: &str) -> Vec<(&'static str, &str)> {
    // (prefix to match on the token, variable name to report)
    const FORWARDED: [(&str, &str); 3] = [
        ("RUST_BACKTRACE=", "RUST_BACKTRACE"),
        ("RUST_LOG=", "RUST_LOG"),
        ("KTSTR_SIDECAR_DIR=", "KTSTR_SIDECAR_DIR"),
    ];
    cmdline
        .split_whitespace()
        .filter_map(|word| {
            FORWARDED
                .iter()
                .find_map(|&(prefix, name)| word.strip_prefix(prefix).map(|value| (name, value)))
        })
        .collect()
}
/// Marker line printed on stdout immediately before the serialized probe payload.
pub(crate) const PROBE_OUTPUT_START: &str = "===PROBE_OUTPUT_START===";
/// Marker line printed on stdout immediately after the serialized probe payload.
pub(crate) const PROBE_OUTPUT_END: &str = "===PROBE_OUTPUT_END===";
/// Formats the last `n` lines of `text` under a `--- {header} ---` banner.
///
/// Returns `None` when `text` contains no lines at all. When `n` exceeds the
/// line count, every line is kept.
fn format_tail(text: &str, n: usize, header: &str) -> Option<String> {
    let total = text.lines().count();
    if total == 0 {
        return None;
    }
    let kept: Vec<&str> = text.lines().skip(total.saturating_sub(n)).collect();
    let body = kept.join("\n");
    Some(format!("--- {header} ---\n{}", body))
}
/// Reads the failure-dump JSON at `path` and renders it via its `Display`
/// implementation under a `--- repro VM failure dump ---` banner.
///
/// Returns `None` when the file cannot be read or
/// `FailureDumpReportAny::from_json` yields nothing.
fn render_failure_dump_file(path: &std::path::Path) -> Option<String> {
    use crate::monitor::dump::FailureDumpReportAny;
    use std::fmt::Write;
    let raw = std::fs::read_to_string(path).ok()?;
    FailureDumpReportAny::from_json(&raw).map(|any| {
        // Size the buffer from the raw JSON length as a starting estimate.
        let mut rendered = String::with_capacity(raw.len());
        rendered += "--- repro VM failure dump ---\n";
        // Writing into a String is not expected to fail; the result is ignored.
        let _ = write!(rendered, "{any}");
        rendered
    })
}
/// Produces the one-line "repro VM: ..." status summary for a repro run.
///
/// Checks are applied in priority order: timeout, scheduler-not-attached
/// lifecycle message, scheduler crash (crash message or `SchedulerDied`
/// lifecycle message), non-zero exit code, and finally a clean run.
/// `_output` is accepted but not consulted.
fn classify_repro_vm_status(
    timed_out: bool,
    has_crash_message: bool,
    _output: &str,
    exit_code: i32,
    guest_messages: Option<&crate::vmm::host_comms::BulkDrainResult>,
) -> String {
    if timed_out {
        return "repro VM: timed out".to_string();
    }
    if let Some(reason) = extract_not_attached_reason(guest_messages) {
        return format!("repro VM: scheduler did not attach ({reason}) (exit code {exit_code})",);
    }

    // A CRC-valid, non-empty lifecycle message whose first payload byte decodes
    // to `SchedulerDied` counts as a crash even without a crash message.
    let scheduler_died = guest_messages.is_some_and(|drain| {
        drain.entries.iter().any(|entry| {
            entry.msg_type == crate::vmm::wire::MSG_TYPE_LIFECYCLE
                && entry.crc_ok
                && !entry.payload.is_empty()
                && crate::vmm::wire::LifecyclePhase::from_wire(entry.payload[0])
                    == Some(crate::vmm::wire::LifecyclePhase::SchedulerDied)
        })
    });

    if !(has_crash_message || scheduler_died) {
        return if exit_code == 0 {
            "repro VM: scheduler ran normally (crash did not reproduce)".to_string()
        } else {
            format!("repro VM: exited abnormally (exit code {exit_code})")
        };
    }

    let exit_clause = match exit_code {
        -1 => "VM host reported no final exit status (the scheduler did not \
               deliver an exit signal before the VM ended)"
            .to_string(),
        code if code < 0 => format!("killed by signal ({exit_code})"),
        0 => "exited cleanly".to_string(),
        _ => format!("exited with non-zero status ({exit_code})"),
    };
    format!("repro VM: scheduler crashed — {exit_clause}")
}
/// Returns the reason text carried by the first `SchedulerNotAttached`
/// lifecycle message in `drain`.
///
/// Only CRC-valid lifecycle entries with a non-empty payload are considered.
/// The reason is the payload after its first byte, lossily decoded as UTF-8
/// and trimmed. Yields `None` when there is no drain, no such entry, or the
/// first such entry has an empty reason (later entries are not consulted).
fn extract_not_attached_reason(
    drain: Option<&crate::vmm::host_comms::BulkDrainResult>,
) -> Option<String> {
    use crate::vmm::wire::{LifecyclePhase, MSG_TYPE_LIFECYCLE};
    let first_match = drain?.entries.iter().find(|entry| {
        entry.msg_type == MSG_TYPE_LIFECYCLE
            && entry.crc_ok
            && !entry.payload.is_empty()
            && LifecyclePhase::from_wire(entry.payload[0])
                == Some(LifecyclePhase::SchedulerNotAttached)
    })?;
    let reason = String::from_utf8_lossy(&first_match.payload[1..])
        .trim()
        .to_string();
    (!reason.is_empty()).then_some(reason)
}
/// Re-runs `entry` in a second ("repro") VM with a `--ktstr-probe-stack`
/// argument and returns a formatted report of what that VM produced.
///
/// Stack function names are extracted from the scheduler section of
/// `first_vm_output` (logged here as COM2), falling back to `console_output`
/// (logged as COM1). When none are found, `DISCOVER_SENTINEL` is passed instead.
///
/// The report consists of the probe section (or, when absent, a one-line
/// status from `classify_repro_vm_status`), the repro VM duration, and the
/// tails of the scheduler log, sched_ext dump, failure dump and dmesg.
///
/// Returns `None` when the VM cannot be built or run, or when the repro VM
/// yielded neither a probe section nor any tail.
#[allow(clippy::too_many_arguments)]
pub(crate) fn attempt_auto_repro(
entry: &KtstrTestEntry,
kernel: &Path,
scheduler: Option<&Path>,
ktstr_bin: &Path,
first_vm_output: &str,
console_output: &str,
topo: Option<&TopoOverride>,
active_flags: &[String],
) -> Option<String> {
use crate::probe::stack::extract_stack_functions_all;
// Record whether the scheduler-output markers are present so a partial
// (START without END) capture can be reported below.
let has_sched_start = first_vm_output.contains(SCHED_OUTPUT_START);
let has_sched_end = first_vm_output.contains(SCHED_OUTPUT_END);
eprintln!(
"ktstr_test: auto-repro: COM2 length={} has_sched_start={has_sched_start} has_sched_end={has_sched_end}",
first_vm_output.len(),
);
// Prefer functions from the scheduler output; fall back to the console
// output when the scheduler output is missing or yields no functions.
let sched_output = parse_sched_output_partial(first_vm_output);
let stack_funcs = if let Some(sched) = sched_output {
let funcs = extract_stack_functions_all(sched);
if funcs.is_empty() {
if has_sched_start && !has_sched_end {
eprintln!(
"ktstr_test: auto-repro: no functions from partial COM2 (missing \
SCHED_OUTPUT_END), trying COM1",
);
} else {
eprintln!("ktstr_test: auto-repro: no functions from COM2, trying COM1");
}
extract_stack_functions_all(console_output)
} else {
if has_sched_start && !has_sched_end {
eprintln!(
"ktstr_test: auto-repro: extracted {} functions from partial COM2 \
(missing SCHED_OUTPUT_END)",
funcs.len(),
);
}
funcs
}
} else {
eprintln!("ktstr_test: auto-repro: no scheduler output on COM2, trying COM1");
extract_stack_functions_all(console_output)
};
// Build the probe argument: a comma-separated list of raw function names,
// or the discovery sentinel when nothing was extracted.
let func_names: Vec<String> = stack_funcs.iter().map(|f| f.raw_name.clone()).collect();
let probe_arg = if func_names.is_empty() {
eprintln!("ktstr_test: auto-repro: no stack functions, using BPF discovery in repro VM");
format!("--ktstr-probe-stack={DISCOVER_SENTINEL}")
} else {
eprintln!(
"ktstr_test: auto-repro: probing {} functions in second VM",
func_names.len()
);
format!("--ktstr-probe-stack={}", func_names.join(","))
};
// Arguments handed to the guest: run the same test function with probing.
let guest_args = vec![
"run".to_string(),
"--ktstr-test-fn".to_string(),
entry.name.to_string(),
probe_arg,
];
let cmdline_extra = super::runtime::build_cmdline_extra(entry);
let (vm_topology, memory_mb) = super::runtime::resolve_vm_topology(entry, topo);
let no_perf_mode = super::runtime::no_perf_mode_active();
let mut builder = super::runtime::build_vm_builder_base(
entry,
kernel,
ktstr_bin,
scheduler,
vm_topology,
memory_mb,
&cmdline_extra,
&guest_args,
no_perf_mode,
);
// The repro VM's failure dump goes to a per-test file in the sidecar
// directory; it is rendered into the report after the run.
let repro_dump_path =
super::sidecar::sidecar_dir().join(format!("{}.repro.failure-dump.json", entry.name));
builder = builder.failure_dump_path(&repro_dump_path);
builder = builder
.dual_snapshot(true)
.performance_mode(entry.performance_mode);
// Kernel-builtin schedulers carry their own enable/disable commands.
if let Some(crate::test_support::entry::SchedulerSpec::KernelBuiltin { enable, disable }) =
entry.scheduler.scheduler_binary()
{
builder = builder.sched_enable_cmds(enable);
builder = builder.sched_disable_cmds(disable);
}
// Merge order: default checks, then the scheduler's, then the entry's.
let merged_assert = crate::assert::Assert::default_checks()
.merge(entry.scheduler.assert())
.merge(&entry.assert);
if entry.scheduler.has_active_scheduling() {
builder = builder.monitor_thresholds(merged_assert.monitor_thresholds());
}
{
// Collect scheduler arguments and the files to include in the guest.
let mut args: Vec<String> = Vec::new();
let declarative_specs: Vec<std::path::PathBuf> = entry
.all_include_files()
.into_iter()
.map(std::path::PathBuf::from)
.collect();
// Resolution failures are logged and treated as "no declarative includes".
let mut resolved_includes: Vec<(String, std::path::PathBuf, &'static str)> =
if declarative_specs.is_empty() {
Vec::new()
} else {
match crate::cli::resolve_include_files(&declarative_specs) {
Ok(v) => v.into_iter().map(|(a, h)| (a, h, "declarative")).collect(),
Err(e) => {
eprintln!("ktstr_test: auto-repro: include_files resolve: {e:#}");
Vec::new()
}
}
};
// A scheduler config file is both included and passed via `--config`.
if let Some((archive_path, host_path, guest_path)) = config_file_parts(entry) {
resolved_includes.push((archive_path, host_path, "scheduler config_file"));
args.push("--config".to_string());
args.push(guest_path);
}
// Dedupe failures are logged; the VM is then built without include files.
match super::eval::dedupe_include_files(&resolved_includes) {
Ok(unioned) if !unioned.is_empty() => {
builder = builder.include_files(unioned);
}
Ok(_) => {}
Err(e) => {
eprintln!("ktstr_test: auto-repro: include_files dedupe: {e:#}");
}
}
super::runtime::append_base_sched_args(entry, &mut args);
// Append the arguments of every active flag the scheduler knows about.
for flag_name in active_flags {
if let Some(flag_args) = entry.scheduler.flag_args(flag_name) {
args.extend(flag_args.iter().map(|s| s.to_string()));
}
}
if !args.is_empty() {
builder = builder.sched_args(&args);
}
}
// Build and run the repro VM; either failure aborts the auto-repro.
let vm = match builder.build() {
Ok(vm) => vm,
Err(e) => {
eprintln!("ktstr_test: auto-repro: failed to build VM: {e:#}");
return None;
}
};
let repro_result = match vm.run() {
Ok(r) => r,
Err(e) => {
eprintln!("ktstr_test: auto-repro: VM run failed: {e:#}");
return None;
}
};
// The VM is dropped before its captured output is post-processed.
drop(vm);
if verbose() {
eprintln!(
"ktstr_test: auto-repro: COM1 stderr length={} COM2 stdout length={}",
repro_result.stderr.len(),
repro_result.output.len(),
);
for line in repro_result.stderr.lines() {
eprintln!(" repro-vm-com1: {line}");
}
// Echo stdout only from the first line mentioning the probe onwards.
let mut in_probe = false;
for line in repro_result.output.lines() {
if line.contains("ktstr_test: probe:") {
in_probe = true;
}
if in_probe {
eprintln!(" repro-vm-com2: {line}");
}
}
}
// Derive the kernel directory, using the result of
// `prefer_source_tree_for_dwarf` when it yields one; non-UTF-8 paths
// result in `None`.
let kernel_dir = crate::kernel_path::derive_kernel_dir(kernel)
.map(|dir| crate::cache::prefer_source_tree_for_dwarf(&dir).unwrap_or(dir))
.and_then(|p| p.to_str().map(String::from));
let kernel_dir_str = kernel_dir.as_deref();
let probe_payload_partial_path = super::sidecar::sidecar_dir()
.join(format!("{}.repro.probe-payload.partial.json", entry.name));
// Best-effort removal of any existing partial-payload file; errors are ignored.
let _ = std::fs::remove_file(&probe_payload_partial_path);
let probe_section = extract_probe_output(
&repro_result.output,
kernel_dir_str,
Some(probe_payload_partial_path.as_path()),
);
// Each tail is limited to this many trailing lines.
const REPRO_TAIL_LINES: usize = 40;
let sched_log_tail = parse_sched_output(&repro_result.output).and_then(|log| {
let collapsed = crate::verifier::collapse_cycles(log);
format_tail(&collapsed, REPRO_TAIL_LINES, "repro VM scheduler log")
});
let dump_tail = extract_sched_ext_dump(&repro_result.stderr)
.and_then(|dump| format_tail(&dump, REPRO_TAIL_LINES, "repro VM sched_ext dump"));
// The dmesg tail excludes lines containing "sched_ext_dump".
let dmesg_filtered: String = repro_result
.stderr
.lines()
.filter(|l| !l.contains("sched_ext_dump"))
.collect::<Vec<_>>()
.join("\n");
let dmesg_tail = format_tail(&dmesg_filtered, REPRO_TAIL_LINES, "repro VM dmesg");
let failure_dump_tail = render_failure_dump_file(&repro_dump_path);
// Keep only the tails that exist, in report order.
let tails: Vec<String> = [sched_log_tail, dump_tail, failure_dump_tail, dmesg_tail]
.into_iter()
.flatten()
.collect();
// Nothing to report at all.
if probe_section.is_none() && tails.is_empty() {
return None;
}
let has_probe = probe_section.is_some();
let mut out = probe_section.unwrap_or_default();
// Without a probe section, lead with a one-line status classification.
if !has_probe {
out.push_str(&classify_repro_vm_status(
repro_result.timed_out,
repro_result.crash_message.is_some(),
&repro_result.output,
repro_result.exit_code,
repro_result.guest_messages.as_ref(),
));
}
if !out.is_empty() {
out.push('\n');
}
out.push_str(&format!(
"repro VM duration: {:.1}s",
repro_result.duration.as_secs_f64(),
));
// Tails follow, each separated by a blank line.
for tail in &tails {
out.push_str("\n\n");
out.push_str(tail);
}
Some(out)
}
/// Extracts the probe payload delimited by `PROBE_OUTPUT_START` /
/// `PROBE_OUTPUT_END` from `output` and renders it as text.
///
/// The rendering is the diagnostics block (when the payload carries one)
/// followed by the formatted events (when there are any). Returns `None` when
/// the section is empty, the payload cannot be parsed or recovered, or the
/// payload has neither diagnostics nor events. `partial_dump_path` is
/// forwarded to `parse_probe_payload`.
pub(crate) fn extract_probe_output(
    output: &str,
    kernel_dir: Option<&str>,
    partial_dump_path: Option<&Path>,
) -> Option<String> {
    let section =
        crate::probe::output::extract_section(output, PROBE_OUTPUT_START, PROBE_OUTPUT_END);
    if section.is_empty() {
        return None;
    }
    let payload = parse_probe_payload(&section, partial_dump_path)?;

    let mut rendered = match payload.diagnostics {
        Some(ref diag) => format_probe_diagnostics(&diag.pipeline, &diag.skeleton),
        None => String::new(),
    };

    if !payload.events.is_empty() {
        let events_text = crate::probe::output::format_probe_events_with_bpf_locs(
            &payload.events,
            &payload.func_names,
            kernel_dir,
            &payload.bpf_source_locs,
            payload.nr_cpus,
            &payload.param_names,
            &payload.render_hints,
        );
        rendered.push_str(&events_text);
        return Some(rendered);
    }

    // No events: only worth returning when diagnostics produced some text.
    (!rendered.is_empty()).then_some(rendered)
}
/// Deserializes a probe payload, with best-effort recovery from truncation.
///
/// On a parse error the failure is logged to stderr and, when
/// `partial_dump_path` is given, the raw text is written there (write
/// failures are logged, not propagated). Malformed input yields `None`.
/// Truncated input (serde_json reports EOF) is passed to
/// `recover_partial_events`; if any events are recovered, a payload holding
/// only those events is returned with every other field empty.
fn parse_probe_payload(json: &str, partial_dump_path: Option<&Path>) -> Option<ProbeBytes> {
    let err = match serde_json::from_str::<ProbeBytes>(json) {
        Ok(payload) => return Some(payload),
        Err(err) => err,
    };

    let total_len = json.len();
    let truncated = err.is_eof();
    let category = if truncated { "truncated" } else { "malformed" };
    eprintln!(
        "ktstr_test: probe payload {category}: {err} \
         (line {}, column {}, total {total_len} bytes)",
        err.line(),
        err.column(),
    );

    // Preserve the raw bytes for offline inspection when a path was supplied.
    if let Some(path) = partial_dump_path {
        if let Err(write_err) = std::fs::write(path, json) {
            eprintln!(
                "ktstr_test: probe payload: failed to write raw bytes to {}: {write_err}",
                path.display(),
            );
        } else {
            eprintln!(
                "ktstr_test: probe payload: wrote raw bytes to {}",
                path.display(),
            );
        }
    }

    // Only truncation is worth attempting recovery for.
    if !truncated {
        return None;
    }
    let events = recover_partial_events(json);
    if events.is_empty() {
        return None;
    }
    eprintln!(
        "ktstr_test: probe payload: recovered {} event(s) from truncated input",
        events.len(),
    );
    Some(ProbeBytes {
        events,
        func_names: Vec::new(),
        bpf_source_locs: Default::default(),
        diagnostics: None,
        nr_cpus: None,
        param_names: Default::default(),
        render_hints: Default::default(),
    })
}
/// Salvages complete event objects from a truncated probe payload.
///
/// Locates the first `"events":` key and the `[` that follows it, then
/// deserializes consecutive balanced `{...}` objects until the array ends, an
/// object is incomplete, or an object fails to deserialize. Returns the
/// events parsed so far (possibly none).
fn recover_partial_events(json: &str) -> Vec<crate::probe::process::ProbeEvent> {
    let Some((_, after_key)) = json.split_once("\"events\":") else {
        return Vec::new();
    };
    let Some((_, mut rest)) = after_key.split_once('[') else {
        return Vec::new();
    };

    let mut recovered = Vec::new();
    loop {
        // Skip whitespace and at most one separating comma before the next object.
        rest = rest.trim_start();
        rest = rest.strip_prefix(',').map_or(rest, str::trim_start);
        // Anything other than an object start (including `]` or end of input) stops recovery.
        if !rest.starts_with('{') {
            break;
        }
        let Some(end) = find_balanced_object_end(rest) else {
            break;
        };
        let (chunk, tail) = rest.split_at(end);
        match serde_json::from_str::<crate::probe::process::ProbeEvent>(chunk) {
            Ok(ev) => recovered.push(ev),
            Err(_) => break,
        }
        rest = tail;
    }
    recovered
}
/// Returns the byte offset just past the `}` that closes the JSON object
/// starting at the first byte of `s`.
///
/// Braces inside string literals are ignored, and backslash escapes inside
/// strings are honoured. Returns `None` when `s` does not start with `{` or
/// the object is never closed.
fn find_balanced_object_end(s: &str) -> Option<usize> {
    if !s.starts_with('{') {
        return None;
    }
    let mut depth: u32 = 0;
    let mut in_string = false;
    let mut escaped = false;
    for (idx, byte) in s.bytes().enumerate() {
        match (in_string, escaped, byte) {
            // Inside a string: the byte after a backslash is consumed verbatim.
            (true, true, _) => escaped = false,
            (true, false, b'\\') => escaped = true,
            (true, false, b'"') => in_string = false,
            (true, false, _) => {}
            // Outside a string: track string starts and brace nesting.
            (false, _, b'"') => in_string = true,
            (false, _, b'{') => depth += 1,
            (false, _, b'}') => {
                // The first byte is `{`, so depth is at least 1 here.
                depth -= 1;
                if depth == 0 {
                    return Some(idx + 1);
                }
            }
            (false, _, _) => {}
        }
    }
    None
}
/// Renders the probe pipeline and skeleton diagnostics as a multi-line,
/// human-readable block headed by `--- probe pipeline ---`.
///
/// Some lines are conditional: unresolved kprobes appear only when there are
/// any, the fentry line only when there were fentry candidates, the panic
/// line only when the collection thread panicked, and the BPF counter lines
/// only when at least one counter is non-zero.
pub(crate) fn format_probe_diagnostics(
    pipeline: &PipelineDiagnostics,
    skeleton: &crate::probe::process::ProbeDiagnostics,
) -> String {
    let mut report = String::from("--- probe pipeline ---\n");
    report += &format!(
        " extracted: {} functions from crash backtrace\n",
        pipeline.stack_extracted,
    );

    // Functions that survived the traceability filter.
    let passed = (pipeline.stack_extracted as usize).saturating_sub(pipeline.filter_dropped.len());
    report += &if pipeline.filter_dropped.is_empty() {
        format!(" traceable: {passed} passed filter\n")
    } else {
        format!(
            " traceable: {passed} passed, {} dropped: {}\n",
            pipeline.filter_dropped.len(),
            pipeline.filter_dropped.join(", "),
        )
    };

    report += &format!(
        " bpf_discover: {} programs found\n",
        pipeline.bpf_discovered,
    );
    report += &format!(
        " after_expand: {} total probe targets\n",
        pipeline.total_after_expand,
    );

    report += &if skeleton.kprobe_attach_failed.is_empty() {
        format!(
            " kprobes: {} attached\n",
            skeleton.kprobe_attached,
        )
    } else {
        let failures = skeleton
            .kprobe_attach_failed
            .iter()
            .map(|(n, e)| format!("{n} ({e})"))
            .collect::<Vec<_>>();
        format!(
            " kprobes: {} attached, {} failed: {}\n",
            skeleton.kprobe_attached,
            skeleton.kprobe_attach_failed.len(),
            failures.join(", "),
        )
    };

    if !skeleton.kprobe_resolve_failed.is_empty() {
        report += &format!(
            " kprobe_miss: {} unresolved: {}\n",
            skeleton.kprobe_resolve_failed.len(),
            skeleton.kprobe_resolve_failed.join(", "),
        );
    }

    if skeleton.fentry_candidates > 0 {
        report += &if skeleton.fentry_attach_failed.is_empty() {
            format!(
                " fentry: {} attached\n",
                skeleton.fentry_attached,
            )
        } else {
            let failures = skeleton
                .fentry_attach_failed
                .iter()
                .map(|(n, e)| format!("{n} ({e})"))
                .collect::<Vec<_>>();
            format!(
                " fentry: {} attached, {} failed: {}\n",
                skeleton.fentry_attached,
                skeleton.fentry_attach_failed.len(),
                failures.join(", "),
            )
        };
    }

    // An attach error takes precedence over the fired / not-fired state.
    report += &match skeleton.trigger_attach_error {
        Some(ref err) => format!(" trigger: attach failed ({err})\n"),
        None => {
            let trigger_type: &str = &skeleton.trigger_type;
            let trigger_type = if trigger_type.is_empty() {
                "unknown"
            } else {
                trigger_type
            };
            format!(
                " trigger: {} ({})\n",
                if skeleton.trigger_fired {
                    "fired"
                } else {
                    "not fired"
                },
                trigger_type,
            )
        }
    };

    if let Some(ref panic_msg) = skeleton.host_thread_panic {
        report += &format!(
            " ERROR: probe-collection thread panicked: {panic_msg}\n"
        );
    }

    report += &format!(
        " probe_data: {} keys, {} unmatched IPs\n",
        skeleton.probe_data_keys, skeleton.probe_data_unmatched_ips,
    );
    report += &format!(
        " events: {} captured, {} after stitch\n",
        skeleton.events_before_stitch, skeleton.events_after_stitch,
    );

    let any_bpf_counter = skeleton.bpf_kprobe_fires > 0
        || skeleton.bpf_trigger_fires > 0
        || skeleton.bpf_meta_misses > 0;
    if any_bpf_counter {
        report += &format!(
            " bpf_counts: {} kprobe fires, {} trigger fires, {} meta misses\n",
            skeleton.bpf_kprobe_fires, skeleton.bpf_trigger_fires, skeleton.bpf_meta_misses,
        );
        if !skeleton.bpf_miss_ips.is_empty() {
            let ips: Vec<String> = skeleton
                .bpf_miss_ips
                .iter()
                .map(|ip| format!("0x{ip:x}"))
                .collect();
            report += &format!(" miss_ips: {}\n", ips.join(", "));
        }
    }
    report
}
/// Runs `maybe_dispatch_vm_test_with_args` on this process's command-line
/// arguments and returns its result unchanged.
pub(crate) fn maybe_dispatch_vm_test() -> Option<i32> {
    let argv = std::env::args().collect::<Vec<String>>();
    maybe_dispatch_vm_test_with_args(argv.as_slice())
}
/// Assembles the pieces needed to build a scenario context for `entry`:
/// the topology, the cgroup manager, the scheduler pid and the merged
/// assertion set.
///
/// The topology is read from the running system; if that fails, the failure
/// is logged and the entry's declared VM topology is used instead.
/// Assertions are merged in the order default checks, then the scheduler's,
/// then the entry's.
fn build_dispatch_ctx_parts(
    entry: &KtstrTestEntry,
    args: &[String],
) -> (
    crate::topology::TestTopology,
    crate::cgroup::CgroupManager,
    Option<libc::pid_t>,
    crate::assert::Assert,
) {
    let topology = crate::topology::TestTopology::from_system().unwrap_or_else(|e| {
        eprintln!("ktstr_test: topology from sysfs failed ({e}), using VM spec fallback");
        crate::topology::TestTopology::from_vm_topology(&entry.topology)
    });

    let root = resolve_cgroup_root(args);
    let cgroup_manager = crate::cgroup::CgroupManager::new(&root);

    let scheduler_pid = crate::vmm::rust_init::sched_pid();

    let defaults = crate::assert::Assert::default_checks();
    let with_scheduler = defaults.merge(entry.scheduler.assert());
    let merged = with_scheduler.merge(&entry.assert);

    (topology, cgroup_manager, scheduler_pid, merged)
}
/// Runs the test function named in `args`, optionally with a probe thread
/// attached, and returns the process exit code to use.
///
/// Returns `None` when `extract_test_fn_arg` finds no test-function argument,
/// `Some(1)` when the named test is unknown or the test fails or errors, and
/// `Some(0)` when the test passes.
///
/// When a probe stack argument is present, the probe functions are loaded,
/// filtered and expanded, a probe thread is spawned, and this function blocks
/// on the `probes_ready` latch before running the test. The probe payload is
/// emitted after the test result is printed.
pub(crate) fn maybe_dispatch_vm_test_with_args(args: &[String]) -> Option<i32> {
let name = extract_test_fn_arg(args)?;
let entry = match find_test(name) {
Some(e) => e,
None => {
eprintln!("ktstr_test: unknown test function '{name}'");
return Some(1);
}
};
let probe_stack = extract_probe_stack_arg(args);
// An unrecognised work-type name is reported (with a suggestion when one
// exists) and then treated as "no override".
let work_type_override = extract_work_type_arg(args).and_then(|s| {
crate::workload::WorkType::from_name(&s).or_else(|| {
match crate::workload::WorkType::suggest(&s) {
Some(canonical) => eprintln!(
"ktstr_test: unknown work type '{s}'; did you mean \
'{canonical}'? Valid types: {:?}",
crate::workload::WorkType::ALL_NAMES,
),
None => eprintln!(
"ktstr_test: unknown work type '{s}'. Valid types: {:?}",
crate::workload::WorkType::ALL_NAMES,
),
}
None
})
});
let pipeline = ProbePipeline::new();
let probe_stop = pipeline.stop.clone();
// Set up probing only when a probe stack was supplied; `None` results when
// no stack was given or nothing traceable remains.
let probe_handle: Option<ProbeHandle> = probe_stack.as_ref().and_then(|stack_input| {
use crate::probe::stack::load_probe_stack;
eprintln!("ktstr_test: probe: loading probe stack from --ktstr-probe-stack");
let mut pipe_diag = PipelineDiagnostics::default();
let raw_functions = load_probe_stack(stack_input);
pipe_diag.stack_extracted = raw_functions.len() as u32;
// Remember the pre-filter names so the dropped ones can be reported.
let pre_filter: Vec<String> = raw_functions.iter().map(|f| f.raw_name.clone()).collect();
let mut functions = crate::probe::stack::filter_traceable(raw_functions);
for name in &pre_filter {
if !functions.iter().any(|f| f.raw_name == *name) {
pipe_diag.filter_dropped.push(name.clone());
}
}
// Discover BPF symbols using the display names of the BPF functions
// already in the stack, and append whatever is found.
let stack_display_names: Vec<&str> = functions
.iter()
.filter(|f| f.is_bpf)
.map(|f| f.display_name.as_str())
.collect();
let bpf_syms = crate::probe::btf::discover_bpf_symbols(&stack_display_names);
pipe_diag.bpf_discovered = bpf_syms.len() as u32;
if !bpf_syms.is_empty() {
eprintln!(
"ktstr_test: probe: {} BPF symbols discovered",
bpf_syms.len()
);
functions.extend(bpf_syms);
}
let functions = crate::probe::stack::expand_bpf_to_kernel_callers(functions);
pipe_diag.total_after_expand = functions.len() as u32;
if functions.is_empty() {
eprintln!("ktstr_test: no traceable functions from --ktstr-probe-stack");
return None;
}
eprintln!(
"ktstr_test: probe: {} functions loaded, spawning probe thread",
functions.len()
);
// BTF information is gathered separately for kernel functions (by raw
// name) and for BPF functions that carry a program id.
let kernel_names: Vec<&str> = functions
.iter()
.filter(|f| !f.is_bpf)
.map(|f| f.raw_name.as_str())
.collect();
let mut btf_funcs = crate::probe::btf::parse_btf_functions(&kernel_names, None);
let bpf_btf_args: Vec<(&str, u32)> = functions
.iter()
.filter(|f| f.is_bpf)
.filter_map(|f| Some((f.display_name.as_str(), f.bpf_prog_id?)))
.collect();
if !bpf_btf_args.is_empty() {
btf_funcs.extend(crate::probe::btf::parse_bpf_btf_functions(&bpf_btf_args));
}
// Pair each function's position in the list with its display name.
let func_names: Vec<(u32, String)> = functions
.iter()
.enumerate()
.map(|(i, f)| (i as u32, f.display_name.clone()))
.collect();
let bpf_fds = crate::probe::process::open_bpf_prog_fds(&functions);
let pnames = crate::probe::output::build_param_names(&btf_funcs);
let rhints = crate::probe::output::build_render_hints(&btf_funcs);
// Clones moved into the probe thread; the originals stay with the handle.
let pnames_thread = pnames.clone();
let rhints_thread = rhints.clone();
let thread_pipeline = pipeline.clone();
let funcs = functions.clone();
let fn_names = func_names.clone();
let pd = pipe_diag.clone();
let handle = std::thread::spawn(move || {
use crate::probe::process::run_probe_skeleton;
let (events, diag, accumulated_fn_names) = run_probe_skeleton(
&funcs,
&btf_funcs,
&thread_pipeline.stop,
&bpf_fds,
&thread_pipeline.probes_ready,
None,
);
// Prefer the names accumulated by the skeleton; fall back to the
// list computed before the thread started.
let emit_fn_names = if accumulated_fn_names.is_empty() {
&fn_names
} else {
&accumulated_fn_names
};
emit_probe_payload(
events.as_deref().unwrap_or(&[]),
emit_fn_names,
&pd,
&diag,
&pnames_thread,
&rhints_thread,
);
// Signal that the payload has been printed so the joiner does not
// print it a second time.
thread_pipeline.output_done.set();
(events, diag, accumulated_fn_names)
});
// Block until the `probes_ready` latch handed to the skeleton is set.
pipeline.probes_ready.wait();
Some(ProbeHandle {
thread: handle,
func_names,
pipeline_diag: pipe_diag,
output_done: pipeline.output_done.clone(),
param_names: pnames,
render_hints: rhints,
})
});
let (topo, cgroups, sched_pid, merged_assert) = build_dispatch_ctx_parts(entry, args);
let ctx = crate::scenario::Ctx::builder(&cgroups, &topo)
.duration(entry.duration)
.workers_per_cgroup(entry.workers_per_cgroup as usize)
.sched_pid(sched_pid)
.settle(Duration::from_millis(500))
.work_type_override(work_type_override)
.assert(merged_assert)
.wait_for_map_write(!entry.bpf_map_write.is_empty())
.build();
// A test that returns an error is reported as a failed result whose only
// detail is the formatted error.
let result = match (entry.func)(&ctx) {
Ok(r) => r,
Err(e) => {
let r = AssertResult {
passed: false,
skipped: false,
details: vec![format!("{e:#}").into()],
stats: Default::default(),
measurements: std::collections::BTreeMap::new(),
};
publish_result_and_collect(&r, probe_stop, probe_handle);
return Some(1);
}
};
let exit_code = if result.passed { 0 } else { 1 };
publish_result_and_collect(&result, probe_stop, probe_handle);
Some(exit_code)
}
/// Value returned by a probe thread: the captured events (if any), the
/// skeleton diagnostics, and the `(index, name)` list accumulated by
/// `run_probe_skeleton`.
type ProbeThreadResult = (
Option<Vec<crate::probe::process::ProbeEvent>>,
crate::probe::process::ProbeDiagnostics,
Vec<(u32, String)>,
);
/// Everything needed to join a running probe thread and, if the thread did
/// not already do so, emit the probe payload on its behalf.
struct ProbeHandle {
/// Join handle of the probe thread.
thread: std::thread::JoinHandle<ProbeThreadResult>,
/// `(index, display name)` list computed before the thread was spawned;
/// used when the thread returns no accumulated names.
func_names: Vec<(u32, String)>,
/// Pipeline diagnostics gathered while preparing the probe functions.
pipeline_diag: PipelineDiagnostics,
/// Latch the probe thread sets after it has printed the payload.
output_done: std::sync::Arc<crate::sync::Latch>,
/// Output of `build_param_names` for the probed functions.
param_names: std::collections::HashMap<String, Vec<(String, String)>>,
/// Output of `build_render_hints` for the probed functions.
render_hints: std::collections::HashMap<String, crate::probe::btf::RenderHint>,
}
/// Shared synchronisation state between the dispatching thread and the probe
/// thread. Cloning shares the same underlying flag and latches.
#[derive(Clone, Default)]
pub(crate) struct ProbePipeline {
/// Stop flag handed to `run_probe_skeleton`; set to `true` when probe data
/// is collected.
pub stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
/// Set by the probe thread after it has printed the probe payload.
pub output_done: std::sync::Arc<crate::sync::Latch>,
/// Handed to `run_probe_skeleton`; the spawning thread waits on it before
/// running the test.
pub probes_ready: std::sync::Arc<crate::sync::Latch>,
}
impl ProbePipeline {
/// Creates a pipeline with every field at its `Default` value.
pub fn new() -> Self {
Self::default()
}
}
/// Counters describing how the probe function list was derived; serialized
/// as part of the probe payload and rendered by `format_probe_diagnostics`.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub(crate) struct PipelineDiagnostics {
/// Number of functions returned by `load_probe_stack`.
pub stack_extracted: u32,
/// Raw names of functions removed by `filter_traceable`.
pub filter_dropped: Vec<String>,
/// Number of symbols returned by `discover_bpf_symbols`.
pub bpf_discovered: u32,
/// Number of probe targets remaining after expansion (or, in phase A, the
/// number of kernel functions).
pub total_after_expand: u32,
}
/// State produced by `start_probe_phase_a` and consumed by
/// `maybe_dispatch_vm_test_with_phase_a`.
pub(crate) struct ProbePhaseAState {
/// Join handle of the probe thread started in phase A.
pub handle: std::thread::JoinHandle<ProbeThreadResult>,
/// Sender used to deliver the phase B input to the probe thread.
pub phase_b_tx: std::sync::mpsc::Sender<crate::probe::process::PhaseBInput>,
/// Stop flag and latches shared with the probe thread.
pub pipeline: ProbePipeline,
/// `(index, display name)` list of the kernel functions probed in phase A.
pub kernel_func_names: Vec<(u32, String)>,
/// Number of kernel functions probed in phase A; used as the function
/// index offset for phase B.
pub kernel_func_count: u32,
/// Pipeline diagnostics gathered during phase A.
pub pipe_diag: PipelineDiagnostics,
/// Output of `build_param_names` for the phase A functions.
pub param_names: std::collections::HashMap<String, Vec<(String, String)>>,
/// Output of `build_render_hints` for the phase A functions.
pub render_hints: std::collections::HashMap<String, crate::probe::btf::RenderHint>,
}
/// Starts phase A of probing: loads the probe stack named in `args`, keeps
/// only the non-BPF (kernel) functions, spawns the probe thread and waits on
/// the `probes_ready` latch.
///
/// Returns `None` when `extract_probe_stack_arg` finds no probe stack
/// argument. The returned state carries the channel sender through which the
/// phase B input is later delivered to the same thread.
pub(crate) fn start_probe_phase_a(args: &[String]) -> Option<ProbePhaseAState> {
use crate::probe::stack::{filter_traceable, load_probe_stack};
let stack_input = extract_probe_stack_arg(args)?;
eprintln!("ktstr_test: probe phase_a: loading kernel functions");
let mut pipe_diag = PipelineDiagnostics::default();
let raw_functions = load_probe_stack(&stack_input);
pipe_diag.stack_extracted = raw_functions.len() as u32;
// Remember the pre-filter names so the dropped ones can be reported.
let pre_filter: Vec<String> = raw_functions.iter().map(|f| f.raw_name.clone()).collect();
let functions = filter_traceable(raw_functions);
for name in &pre_filter {
if !functions.iter().any(|f| f.raw_name == *name) {
pipe_diag.filter_dropped.push(name.clone());
}
}
// Phase A handles kernel functions only; BPF functions are discarded here.
let kernel_functions: Vec<crate::probe::stack::StackFunction> =
functions.into_iter().filter(|f| !f.is_bpf).collect();
let kernel_names: Vec<&str> = kernel_functions
.iter()
.map(|f| f.raw_name.as_str())
.collect();
let btf_funcs = crate::probe::btf::parse_btf_functions(&kernel_names, None);
// Pair each kernel function's position with its display name.
let func_names: Vec<(u32, String)> = kernel_functions
.iter()
.enumerate()
.map(|(i, f)| (i as u32, f.display_name.clone()))
.collect();
pipe_diag.total_after_expand = kernel_functions.len() as u32;
// No BPF program fds are passed in phase A (the map is empty).
let bpf_fds = std::collections::HashMap::new(); let param_names = crate::probe::output::build_param_names(&btf_funcs);
let render_hints = crate::probe::output::build_render_hints(&btf_funcs);
let pipeline = ProbePipeline::new();
// The receiver is moved into the probe thread; the sender is returned so
// the phase B input can be sent later.
let (phase_b_tx, phase_b_rx) = std::sync::mpsc::channel();
// Clones moved into the probe thread; the originals are returned in the state.
let thread_pipeline = pipeline.clone();
let funcs = kernel_functions.clone();
let btf = btf_funcs.clone();
let fn_names = func_names.clone();
let pd = pipe_diag.clone();
let pnames = param_names.clone();
let rhints = render_hints.clone();
let handle = std::thread::spawn(move || {
let (events, diag, accumulated_fn_names) = crate::probe::process::run_probe_skeleton(
&funcs,
&btf,
&thread_pipeline.stop,
&bpf_fds,
&thread_pipeline.probes_ready,
Some(phase_b_rx),
);
// Prefer the names accumulated by the skeleton; fall back to the
// phase A list.
let emit_fn_names = if accumulated_fn_names.is_empty() {
&fn_names
} else {
&accumulated_fn_names
};
emit_probe_payload(
events.as_deref().unwrap_or(&[]),
emit_fn_names,
&pd,
&diag,
&pnames,
&rhints,
);
// Signal that the payload has been printed so the joiner does not
// print it a second time.
thread_pipeline.output_done.set();
(events, diag, accumulated_fn_names)
});
// Block until the `probes_ready` latch handed to the skeleton is set.
pipeline.probes_ready.wait();
eprintln!(
"ktstr_test: probe phase_a: {} kernel functions attached, waiting for Phase B",
kernel_functions.len(),
);
let kernel_func_count = kernel_functions.len() as u32;
Some(ProbePhaseAState {
handle,
phase_b_tx,
pipeline,
kernel_func_names: func_names,
kernel_func_count,
pipe_diag,
param_names,
render_hints,
})
}
/// Runs the test function named in `args` using a probe thread that was
/// already started by `start_probe_phase_a`, performing phase B (BPF symbol
/// discovery and hand-off to the probe thread) first.
///
/// Returns `None` when `extract_test_fn_arg` finds no test-function argument,
/// `Some(1)` when the named test is unknown or the test fails or errors, and
/// `Some(0)` when the test passes.
pub(crate) fn maybe_dispatch_vm_test_with_phase_a(
args: &[String],
pa: ProbePhaseAState,
) -> Option<i32> {
use crate::probe::btf::discover_bpf_symbols;
use crate::probe::stack::expand_bpf_to_kernel_callers;
let name = extract_test_fn_arg(args)?;
let entry = match find_test(name) {
Some(e) => e,
None => {
eprintln!("ktstr_test: unknown test function '{name}'");
return Some(1);
}
};
// An unrecognised work-type name is reported (with a suggestion when one
// exists) and then treated as "no override".
let work_type_override = extract_work_type_arg(args).and_then(|s| {
crate::workload::WorkType::from_name(&s).or_else(|| {
match crate::workload::WorkType::suggest(&s) {
Some(canonical) => eprintln!(
"ktstr_test: unknown work type '{s}'; did you mean \
'{canonical}'? Valid types: {:?}",
crate::workload::WorkType::ALL_NAMES,
),
None => eprintln!(
"ktstr_test: unknown work type '{s}'. Valid types: {:?}",
crate::workload::WorkType::ALL_NAMES,
),
}
None
})
});
// Take the phase A state apart so each piece can be moved independently.
let ProbePhaseAState {
handle: pa_handle,
phase_b_tx: pa_phase_b_tx,
pipeline: pa_pipeline,
kernel_func_names: pa_kernel_func_names,
kernel_func_count: pa_kernel_func_count,
pipe_diag: pa_pipe_diag,
param_names: pa_param_names,
render_hints: pa_render_hints,
} = pa;
eprintln!("ktstr_test: probe phase_b: discovering BPF symbols");
// Discovery is invoked with an empty list of stack display names.
let stack_display_names: Vec<&str> = Vec::new(); let bpf_syms = discover_bpf_symbols(&stack_display_names);
eprintln!(
"ktstr_test: probe phase_b: {} BPF symbols discovered",
bpf_syms.len()
);
if !bpf_syms.is_empty() {
let phase_b_functions = expand_bpf_to_kernel_callers(bpf_syms);
let bpf_fds = crate::probe::process::open_bpf_prog_fds(&phase_b_functions);
// BTF for BPF functions that carry a program id ...
let bpf_btf_args: Vec<(&str, u32)> = phase_b_functions
.iter()
.filter(|f| f.is_bpf)
.filter_map(|f| Some((f.display_name.as_str(), f.bpf_prog_id?)))
.collect();
let mut phase_b_btf = if !bpf_btf_args.is_empty() {
crate::probe::btf::parse_bpf_btf_functions(&bpf_btf_args)
} else {
Vec::new()
};
// ... plus BTF for the non-BPF functions added by the expansion.
let kernel_caller_names: Vec<&str> = phase_b_functions
.iter()
.filter(|f| !f.is_bpf)
.map(|f| f.raw_name.as_str())
.collect();
if !kernel_caller_names.is_empty() {
phase_b_btf.extend(crate::probe::btf::parse_btf_functions(
&kernel_caller_names,
None,
));
}
// `phase_b_done` is waited on below after the input has been sent.
let phase_b_done = std::sync::Arc::new(crate::sync::Latch::new());
let phase_b_done_clone = phase_b_done.clone();
let phase_b_input = crate::probe::process::PhaseBInput {
functions: phase_b_functions,
bpf_prog_fds: bpf_fds,
btf_funcs: phase_b_btf,
done: phase_b_done_clone,
// Phase B function indices start after the phase A kernel functions.
func_idx_offset: pa_kernel_func_count,
};
// A failed send is logged and the test still runs, without waiting.
if let Err(e) = pa_phase_b_tx.send(phase_b_input) {
eprintln!("ktstr_test: probe phase_b: failed to send: {e}");
} else {
phase_b_done.wait();
eprintln!("ktstr_test: probe phase_b: BPF fentry attached");
}
} else {
eprintln!("ktstr_test: probe phase_b: no BPF symbols, skipping fentry");
// Dropping the sender closes the channel to the probe thread.
drop(pa_phase_b_tx);
}
let (topo, cgroups, sched_pid, merged_assert) = build_dispatch_ctx_parts(entry, args);
let ctx = crate::scenario::Ctx::builder(&cgroups, &topo)
.duration(entry.duration)
.workers_per_cgroup(entry.workers_per_cgroup as usize)
.sched_pid(sched_pid)
.settle(std::time::Duration::from_millis(500))
.work_type_override(work_type_override)
.assert(merged_assert)
.wait_for_map_write(!entry.bpf_map_write.is_empty())
.build();
// Repackage the phase A thread as a `ProbeHandle` for result collection.
let stop = pa_pipeline.stop.clone();
let handle = ProbeHandle {
thread: pa_handle,
func_names: pa_kernel_func_names,
pipeline_diag: pa_pipe_diag,
output_done: pa_pipeline.output_done,
param_names: pa_param_names,
render_hints: pa_render_hints,
};
// A test that returns an error is reported as a failed result whose only
// detail is the formatted error.
let result = match (entry.func)(&ctx) {
Ok(r) => r,
Err(e) => {
let r = AssertResult {
passed: false,
skipped: false,
details: vec![format!("{e:#}").into()],
stats: Default::default(),
measurements: std::collections::BTreeMap::new(),
};
publish_result_and_collect(&r, stop, Some(handle));
return Some(1);
}
};
let exit_code = if result.passed { 0 } else { 1 };
publish_result_and_collect(&result, stop, Some(handle));
Some(exit_code)
}
/// Probe payload exchanged as JSON between `PROBE_OUTPUT_START` and
/// `PROBE_OUTPUT_END`: written by `emit_probe_payload` and read back by
/// `parse_probe_payload`.
#[derive(serde::Serialize, serde::Deserialize)]
pub(crate) struct ProbeBytes {
/// Captured probe events.
pub events: Vec<crate::probe::process::ProbeEvent>,
/// `(index, name)` list of the probed functions.
pub func_names: Vec<(u32, String)>,
/// Result of `resolve_bpf_source_locs` for the probed BPF programs.
pub bpf_source_locs: std::collections::HashMap<String, String>,
/// Pipeline and skeleton diagnostics; `None` for payloads recovered from
/// truncated input.
pub diagnostics: Option<ProbeBytesDiagnostics>,
/// Value of `get_nr_cpus()` at emit time, if available.
pub nr_cpus: Option<u32>,
/// Output of `build_param_names` for the probed functions.
pub param_names: std::collections::HashMap<String, Vec<(String, String)>>,
/// Output of `build_render_hints` for the probed functions.
pub render_hints: std::collections::HashMap<String, crate::probe::btf::RenderHint>,
}
/// Diagnostics bundled into a `ProbeBytes` payload.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub(crate) struct ProbeBytesDiagnostics {
/// How the probe function list was derived.
pub pipeline: PipelineDiagnostics,
/// Diagnostics returned by `run_probe_skeleton`.
pub skeleton: crate::probe::process::ProbeDiagnostics,
}
/// Serializes the probe results as a `ProbeBytes` JSON document and prints it
/// on stdout between the `PROBE_OUTPUT_START` and `PROBE_OUTPUT_END` marker
/// lines.
///
/// BPF source locations are resolved here: BPF symbols are discovered for the
/// names in `func_names`, and the program ids of those whose display name
/// matches are passed to `resolve_bpf_source_locs`.
///
/// If serialization fails, the markers are still printed (with nothing
/// between them) and the error is reported on stderr, so an empty probe
/// section can be told apart from a run that never reached this point.
fn emit_probe_payload(
    events: &[crate::probe::process::ProbeEvent],
    func_names: &[(u32, String)],
    pipeline_diag: &PipelineDiagnostics,
    skeleton_diag: &crate::probe::process::ProbeDiagnostics,
    param_names: &std::collections::HashMap<String, Vec<(String, String)>>,
    render_hints: &std::collections::HashMap<String, crate::probe::btf::RenderHint>,
) {
    let source_loc_names: Vec<&str> = func_names.iter().map(|(_, name)| name.as_str()).collect();
    let bpf_syms = crate::probe::btf::discover_bpf_symbols(&source_loc_names);
    // For each probed name, take the program id of the first discovered
    // symbol with the same display name (if it has one).
    let bpf_prog_ids: Vec<u32> = func_names
        .iter()
        .filter_map(|(_, name)| {
            bpf_syms
                .iter()
                .find(|s| s.display_name == *name)
                .and_then(|s| s.bpf_prog_id)
        })
        .collect();
    let bpf_source_locs = crate::probe::btf::resolve_bpf_source_locs(&bpf_prog_ids);
    let payload = ProbeBytes {
        events: events.to_vec(),
        func_names: func_names.to_vec(),
        bpf_source_locs,
        diagnostics: Some(ProbeBytesDiagnostics {
            pipeline: pipeline_diag.clone(),
            skeleton: skeleton_diag.clone(),
        }),
        nr_cpus: crate::probe::output::get_nr_cpus(),
        param_names: param_names.clone(),
        render_hints: render_hints.clone(),
    };
    println!("{PROBE_OUTPUT_START}");
    match serde_json::to_string(&payload) {
        Ok(json) => println!("{json}"),
        // Previously swallowed silently; surface it so the empty section is explained.
        Err(e) => eprintln!("ktstr_test: probe payload: failed to serialize: {e}"),
    }
    println!("{PROBE_OUTPUT_END}");
}
/// Publishes a finished test result and then gathers probe output.
///
/// Runs three steps in a fixed order: `try_flush_profraw()`,
/// `print_assert_result(result)`, and finally
/// `collect_and_print_probe_data(stop, handle)`.
fn publish_result_and_collect(
result: &AssertResult,
stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
handle: Option<ProbeHandle>,
) {
try_flush_profraw();
print_assert_result(result);
// Probe collection comes last; it is a no-op when `handle` is `None`.
collect_and_print_probe_data(stop, handle);
}
/// Signals the probe thread to stop, joins it, and emits the probe payload
/// unless the handle's `output_done` flag is already set.
///
/// Does nothing when `handle` is `None`. If the probe thread panicked, the
/// panic message is recorded in `ProbeDiagnostics::host_thread_panic` and the
/// payload is emitted with no events.
fn collect_and_print_probe_data(
    stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
    handle: Option<ProbeHandle>,
) {
    let ph = match handle {
        Some(ph) => ph,
        None => return,
    };
    stop.store(true, std::sync::atomic::Ordering::Release);
    let (events, skeleton_diag, accumulated_fn_names) = match ph.thread.join() {
        // A missing event list is treated the same as an empty one.
        Ok((maybe_events, diag, fnames)) => (maybe_events.unwrap_or_default(), diag, fnames),
        Err(payload) => {
            // Panic payloads are usually `&'static str` or `String`.
            let msg = payload
                .downcast_ref::<&'static str>()
                .map(|s| (*s).to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "<non-string panic>".to_string());
            let diag = crate::probe::process::ProbeDiagnostics {
                host_thread_panic: Some(msg),
                ..Default::default()
            };
            (Vec::new(), diag, Vec::new())
        }
    };
    if ph.output_done.is_set() {
        return;
    }
    // Prefer the names reported by the thread; fall back to the handle's list.
    let effective_fn_names = if !accumulated_fn_names.is_empty() {
        &accumulated_fn_names
    } else {
        &ph.func_names
    };
    emit_probe_payload(
        &events,
        effective_fn_names,
        &ph.pipeline_diag,
        &skeleton_diag,
        &ph.param_names,
        &ph.render_hints,
    );
}
#[cfg(test)]
mod tests {
use super::*;
/// A well-formed payload between the markers is extracted even when
/// surrounded by unrelated text; the rendering mentions the function name
/// and the field name.
#[test]
fn extract_probe_output_valid_json() {
    use crate::probe::process::ProbeEvent;
    let event = ProbeEvent {
        func_idx: 0,
        task_ptr: 1,
        ts: 100,
        args: [0; 6],
        fields: vec![("p:task_struct.pid".to_string(), 42)],
        kstack: vec![],
        str_val: None,
        ..Default::default()
    };
    let payload = ProbeBytes {
        events: vec![event],
        func_names: vec![(0, "schedule".to_string())],
        bpf_source_locs: Default::default(),
        diagnostics: None,
        nr_cpus: None,
        param_names: Default::default(),
        render_hints: Default::default(),
    };
    let json = serde_json::to_string(&payload).unwrap();
    let console = format!("noise\n{PROBE_OUTPUT_START}\n{json}\n{PROBE_OUTPUT_END}\nmore");
    let extracted = extract_probe_output(&console, None, None);
    assert!(extracted.is_some());
    let formatted = extracted.unwrap();
    assert!(
        formatted.contains("schedule"),
        "should contain func name: {formatted}"
    );
    assert!(
        formatted.contains("pid"),
        "should contain field name: {formatted}"
    );
}
/// Text without the probe markers yields no extracted output.
#[test]
fn extract_probe_output_missing() {
    let extracted = extract_probe_output("no markers", None, None);
    assert!(extracted.is_none());
}
/// A marker pair enclosing only a blank line yields no extracted output.
#[test]
fn extract_probe_output_empty() {
    let console = format!("{PROBE_OUTPUT_START}\n\n{PROBE_OUTPUT_END}");
    let extracted = extract_probe_output(&console, None, None);
    assert!(extracted.is_none());
}
/// Non-JSON text between the markers yields no extracted output.
#[test]
fn extract_probe_output_invalid_json() {
    let console = format!("{PROBE_OUTPUT_START}\nnot valid json\n{PROBE_OUTPUT_END}");
    let extracted = extract_probe_output(&console, None, None);
    assert!(extracted.is_none());
}
/// Events carrying decoded fields render the field names and values under
/// per-parameter type headers, and raw `arg0`-style arguments are not shown.
#[test]
fn extract_probe_output_enriched_fields() {
    use crate::probe::process::ProbeEvent;
    let schedule_event = ProbeEvent {
        func_idx: 0,
        task_ptr: 1,
        ts: 100,
        args: [0xDEAD, 0, 0, 0, 0, 0],
        fields: vec![
            ("prev:task_struct.pid".to_string(), 42),
            ("prev:task_struct.scx_flags".to_string(), 0x1c),
        ],
        kstack: vec![],
        str_val: None,
        ..Default::default()
    };
    let pick_event = ProbeEvent {
        func_idx: 1,
        task_ptr: 1,
        ts: 200,
        args: [0; 6],
        fields: vec![("rq:rq.cpu".to_string(), 3)],
        kstack: vec![],
        str_val: None,
        ..Default::default()
    };
    let payload = ProbeBytes {
        events: vec![schedule_event, pick_event],
        func_names: vec![
            (0, "schedule".to_string()),
            (1, "pick_task_scx".to_string()),
        ],
        bpf_source_locs: Default::default(),
        diagnostics: None,
        nr_cpus: None,
        param_names: Default::default(),
        render_hints: Default::default(),
    };
    let json = serde_json::to_string(&payload).unwrap();
    let console = format!("{PROBE_OUTPUT_START}\n{json}\n{PROBE_OUTPUT_END}");
    let formatted = extract_probe_output(&console, None, None).unwrap();

    // Field names and values.
    assert!(formatted.contains("pid"), "pid field: {formatted}");
    assert!(formatted.contains("42"), "pid value: {formatted}");
    assert!(
        formatted.contains("scx_flags"),
        "scx_flags field: {formatted}"
    );
    assert!(formatted.contains("cpu"), "cpu field: {formatted}");
    assert!(formatted.contains("3"), "cpu value: {formatted}");

    // Type headers derived from the field keys.
    assert!(
        formatted.contains("task_struct *prev"),
        "type header for task_struct: {formatted}"
    );
    assert!(
        formatted.contains("rq *rq"),
        "type header for rq: {formatted}"
    );
    assert!(
        !formatted.contains("arg0"),
        "raw args should not appear when fields exist: {formatted}"
    );

    // Function names.
    assert!(formatted.contains("schedule"), "func schedule: {formatted}");
    assert!(
        formatted.contains("pick_task_scx"),
        "func pick_task_scx: {formatted}"
    );
}
/// Serializes `payload` to JSON and returns at most the first `cut_after`
/// bytes of it.
///
/// `cut_after` is clamped to the JSON length. If the cut would land inside a
/// multi-byte UTF-8 character, it is moved back to the previous character
/// boundary so the slice cannot panic.
///
/// # Panics
///
/// Panics if `payload` cannot be serialized.
fn truncate_probe_json(payload: &ProbeBytes, cut_after: usize) -> String {
    let mut json = serde_json::to_string(payload).expect("serialize ProbeBytes");
    let mut end = cut_after.min(json.len());
    // Index 0 is always a boundary, so this loop terminates.
    while !json.is_char_boundary(end) {
        end -= 1;
    }
    json.truncate(end);
    json
}
/// A payload cut off partway through its third event (and with no END
/// marker) still renders the first two events, and the raw truncated text is
/// written to the partial-dump path.
#[test]
fn extract_probe_output_truncated_recovers_complete_events() {
use crate::probe::process::ProbeEvent;
let payload = ProbeBytes {
events: vec![
ProbeEvent {
func_idx: 0,
task_ptr: 0xa,
ts: 100,
args: [0; 6],
fields: vec![("p:task_struct.pid".to_string(), 11)],
kstack: vec![],
str_val: None,
..Default::default()
},
ProbeEvent {
func_idx: 1,
task_ptr: 0xb,
ts: 200,
args: [0; 6],
fields: vec![("p:task_struct.pid".to_string(), 22)],
kstack: vec![],
str_val: None,
..Default::default()
},
ProbeEvent {
func_idx: 2,
task_ptr: 0xc,
ts: 300,
args: [0; 6],
fields: vec![("p:task_struct.pid".to_string(), 33)],
kstack: vec![],
str_val: None,
..Default::default()
},
],
func_names: vec![
(0, "first".to_string()),
(1, "second".to_string()),
(2, "third".to_string()),
],
bpf_source_locs: Default::default(),
diagnostics: None,
nr_cpus: None,
param_names: Default::default(),
render_hints: Default::default(),
};
let full = serde_json::to_string(&payload).unwrap();
// Byte offset just past the opening `[` of the events array.
let events_start = full.find("\"events\":[").unwrap() + "\"events\":[".len();
// Scan the array and record the byte offset of each top-level `{`,
// tracking string and escape state so braces inside strings are ignored.
let mut depth: u32 = 0;
let mut in_string = false;
let mut escape = false;
let mut event_starts: Vec<usize> = Vec::new();
for (i, b) in full.bytes().enumerate().skip(events_start) {
if in_string {
if escape {
escape = false;
} else if b == b'\\' {
escape = true;
} else if b == b'"' {
in_string = false;
}
continue;
}
match b {
b'"' => in_string = true,
b'{' => {
if depth == 0 {
event_starts.push(i);
}
depth += 1;
}
b'}' => depth = depth.saturating_sub(1),
// A `]` at depth 0 closes the events array; stop scanning.
b']' if depth == 0 => break,
_ => {}
}
}
assert_eq!(
event_starts.len(),
3,
"test fixture should produce 3 events"
);
// Cut five bytes into the third event object so only two are complete.
let cut = event_starts[2] + 5;
let truncated = &full[..cut];
// No END marker is appended after the truncated text.
let output = format!("{PROBE_OUTPUT_START}\n{truncated}\n");
let dir = tempfile::tempdir().expect("tempdir");
let partial = dir.path().join("payload.partial.json");
let formatted = extract_probe_output(&output, None, Some(&partial))
.expect("recovery must surface partial events");
assert!(
formatted.contains("unknown"),
"recovered events should print under `unknown` func header (no func_names): {formatted}",
);
assert!(
formatted.contains("11"),
"first event's pid value should appear: {formatted}",
);
assert!(
formatted.contains("22"),
"second event's pid value should appear: {formatted}",
);
assert!(
!formatted.contains("33"),
"third (truncated) event's pid value must NOT appear: {formatted}",
);
assert!(
partial.exists(),
"raw truncated payload must be written to partial dump path",
);
let dumped = std::fs::read_to_string(&partial).expect("read partial dump");
// Compared after trimming surrounding whitespace on both sides.
assert_eq!(
dumped.trim(),
truncated.trim(),
"partial dump must contain the raw extracted JSON verbatim",
);
}
/// Truncating five bytes into the only event leaves nothing to recover: the
/// result is `None`, yet the raw text is still written to the dump path.
#[test]
fn extract_probe_output_truncated_no_complete_events_returns_none() {
    use crate::probe::process::ProbeEvent;
    let only_event = ProbeEvent {
        func_idx: 0,
        task_ptr: 0xa,
        ts: 100,
        args: [0; 6],
        fields: vec![("p:task_struct.pid".to_string(), 11)],
        kstack: vec![],
        str_val: None,
        ..Default::default()
    };
    let payload = ProbeBytes {
        events: vec![only_event],
        func_names: vec![(0, "first".to_string())],
        bpf_source_locs: Default::default(),
        diagnostics: None,
        nr_cpus: None,
        param_names: Default::default(),
        render_hints: Default::default(),
    };
    let full = serde_json::to_string(&payload).unwrap();
    let events_key = "\"events\":[";
    let events_open = full.find(events_key).unwrap() + events_key.len();
    let truncated = &full[..events_open + 5];
    let output = format!("{PROBE_OUTPUT_START}\n{truncated}\n");

    let scratch = tempfile::tempdir().expect("tempdir");
    let partial = scratch.path().join("payload.partial.json");
    let result = extract_probe_output(&output, None, Some(&partial));
    assert!(
        result.is_none(),
        "no complete events recoverable should yield None: {result:?}",
    );
    assert!(
        partial.exists(),
        "partial dump must be written even when recovery yields zero events",
    );
    let dumped = std::fs::read_to_string(&partial).expect("read partial dump");
    assert_eq!(dumped.trim(), truncated.trim());
}
/// A cut that lands inside the second event's long string value still lets
/// the first, complete event be recovered and rendered.
#[test]
fn extract_probe_output_truncated_string_value_recovers_prior() {
    use crate::probe::process::ProbeEvent;
    let complete_event = ProbeEvent {
        func_idx: 0,
        task_ptr: 0xa,
        ts: 100,
        args: [0; 6],
        fields: vec![("p:task_struct.pid".to_string(), 11)],
        kstack: vec![],
        str_val: None,
        ..Default::default()
    };
    let long_string_event = ProbeEvent {
        func_idx: 1,
        task_ptr: 0xb,
        ts: 200,
        args: [0; 6],
        fields: vec![],
        kstack: vec![],
        str_val: Some("a".repeat(200)),
        ..Default::default()
    };
    let payload = ProbeBytes {
        events: vec![complete_event, long_string_event],
        func_names: vec![(0, "first".to_string()), (1, "second".to_string())],
        bpf_source_locs: Default::default(),
        diagnostics: None,
        nr_cpus: None,
        param_names: Default::default(),
        render_hints: Default::default(),
    };
    let full = serde_json::to_string(&payload).unwrap();
    // Locate the run of `a`s and cut 50 bytes past its first ten characters.
    let needle = "aaaaaaaaaa";
    let needle_at = full
        .find(needle)
        .expect("fixture must contain the long string");
    let truncated = &full[..needle_at + needle.len() + 50];
    let output = format!("{PROBE_OUTPUT_START}\n{truncated}\n");
    let formatted = extract_probe_output(&output, None, None)
        .expect("recovery must surface the first complete event");
    assert!(
        formatted.contains("11"),
        "first event's pid must survive truncation in the second event: {formatted}",
    );
}
/// With no dump path supplied, a payload cut just after the second event's
/// opening brace still recovers and renders the first event.
#[test]
fn extract_probe_output_truncated_partial_dump_path_none_skips_write() {
    use crate::probe::process::ProbeEvent;
    let first_event = ProbeEvent {
        func_idx: 0,
        task_ptr: 0xa,
        ts: 100,
        args: [0; 6],
        fields: vec![("p:task_struct.pid".to_string(), 11)],
        kstack: vec![],
        str_val: None,
        ..Default::default()
    };
    let second_event = ProbeEvent {
        func_idx: 1,
        task_ptr: 0xb,
        ts: 200,
        args: [0; 6],
        fields: vec![],
        kstack: vec![],
        str_val: None,
        ..Default::default()
    };
    let payload = ProbeBytes {
        events: vec![first_event, second_event],
        func_names: vec![(0, "f0".to_string()), (1, "f1".to_string())],
        bpf_source_locs: Default::default(),
        diagnostics: None,
        nr_cpus: None,
        param_names: Default::default(),
        render_hints: Default::default(),
    };
    let full = serde_json::to_string(&payload).unwrap();
    // Keep everything up to and including the `},{` separator.
    let separator_at = full.find("},{").expect("two events present");
    let truncated = &full[..separator_at + 3];
    let output = format!("{PROBE_OUTPUT_START}\n{truncated}\n");
    let formatted = extract_probe_output(&output, None, None)
        .expect("recovery must yield the first event without a dump path");
    assert!(
        formatted.contains("11"),
        "first event recovered: {formatted}"
    );
}
/// Complete JSON parses with every event and function name intact.
#[test]
fn parse_probe_payload_strict_success() {
    use crate::probe::process::ProbeEvent;
    let event = ProbeEvent {
        func_idx: 0,
        task_ptr: 0xa,
        ts: 100,
        args: [0; 6],
        fields: vec![],
        kstack: vec![],
        str_val: None,
        ..Default::default()
    };
    let payload = ProbeBytes {
        events: vec![event],
        func_names: vec![(0, "schedule".to_string())],
        bpf_source_locs: Default::default(),
        diagnostics: None,
        nr_cpus: None,
        param_names: Default::default(),
        render_hints: Default::default(),
    };
    let encoded = serde_json::to_string(&payload).unwrap();
    let parsed = parse_probe_payload(&encoded, None).expect("strict parse must succeed");
    assert_eq!(parsed.events.len(), 1);
    assert_eq!(parsed.func_names.len(), 1);
}
/// JSON cut just before the first `}]` closes recovers the single event
/// (with empty names and no diagnostics) and writes the raw input, unchanged,
/// to the dump path.
#[test]
fn parse_probe_payload_eof_with_dump_path_writes_file_returns_recovered() {
    use crate::probe::process::ProbeEvent;
    let event = ProbeEvent {
        func_idx: 0,
        task_ptr: 0xa,
        ts: 100,
        args: [0; 6],
        fields: vec![],
        kstack: vec![],
        str_val: None,
        ..Default::default()
    };
    let payload = ProbeBytes {
        events: vec![event],
        func_names: vec![],
        bpf_source_locs: Default::default(),
        diagnostics: None,
        nr_cpus: None,
        param_names: Default::default(),
        render_hints: Default::default(),
    };
    let full = serde_json::to_string(&payload).unwrap();
    // Keep the `}` of the first `}]` and drop everything after it.
    let close_at = full.find("}]").expect("single-event array ends with `}]`");
    let truncated = &full[..close_at + 1];

    let scratch = tempfile::tempdir().expect("tempdir");
    let partial = scratch.path().join("dump.json");
    let parsed = parse_probe_payload(truncated, Some(&partial))
        .expect("EOF with one complete event must recover");
    assert_eq!(parsed.events.len(), 1);
    assert!(parsed.func_names.is_empty());
    assert!(parsed.diagnostics.is_none());
    assert!(partial.exists());
    let dumped = std::fs::read_to_string(&partial).unwrap();
    assert_eq!(dumped, truncated);
}
/// Input that is not JSON at all parses to `None`, and the raw input is
/// still written to the dump path.
#[test]
fn parse_probe_payload_non_eof_error_returns_none_and_dumps() {
    let scratch = tempfile::tempdir().expect("tempdir");
    let partial = scratch.path().join("dump.json");
    let parsed = parse_probe_payload("not json", Some(&partial));
    assert!(parsed.is_none());
    assert!(partial.exists());
    let dumped = std::fs::read_to_string(&partial).unwrap();
    assert_eq!(dumped, "not json");
}
/// Recovery of a truncated payload works without a dump path.
#[test]
fn parse_probe_payload_truncated_no_dump_path_still_recovers() {
    use crate::probe::process::ProbeEvent;
    let event = ProbeEvent {
        func_idx: 0,
        task_ptr: 1,
        ts: 100,
        args: [0; 6],
        fields: vec![],
        kstack: vec![],
        str_val: None,
        ..Default::default()
    };
    let payload = ProbeBytes {
        events: vec![event],
        func_names: vec![],
        bpf_source_locs: Default::default(),
        diagnostics: None,
        nr_cpus: None,
        param_names: Default::default(),
        render_hints: Default::default(),
    };
    let full = serde_json::to_string(&payload).unwrap();
    // Keep the `}` of the first `}]` and drop everything after it.
    let close_at = full.find("}]").unwrap();
    let truncated = &full[..close_at + 1];
    let parsed = parse_probe_payload(truncated, None).expect("recovery without dump path");
    assert_eq!(parsed.events.len(), 1);
}
/// Inputs with no `events` key (including the empty string) recover nothing.
#[test]
fn recover_partial_events_no_events_key_returns_empty() {
    for input in [r#"{"foo":1}"#, ""] {
        assert!(recover_partial_events(input).is_empty());
    }
}
/// An events array with no elements, closed or not, recovers nothing.
#[test]
fn recover_partial_events_empty_array() {
    for input in [r#"{"events":[]"#, r#"{"events":["#] {
        assert!(recover_partial_events(input).is_empty());
    }
}
/// Braces and escaped quotes inside a string value do not confuse recovery:
/// the event is recovered with its `str_val` intact.
#[test]
fn recover_partial_events_handles_braces_in_strings() {
    use crate::probe::process::ProbeEvent;
    let event = ProbeEvent {
        func_idx: 0,
        task_ptr: 1,
        ts: 100,
        args: [0; 6],
        fields: vec![],
        kstack: vec![],
        str_val: Some("contains {nested} and \\\"quoted\\\"".to_string()),
        ..Default::default()
    };
    let event_json = serde_json::to_string(&event).unwrap();
    let payload = format!(r#"{{"events":[{event_json}]}}"#);
    // Drop the array's closing `]` and everything after it.
    let array_close = payload.rfind(']').unwrap();
    let recovered = recover_partial_events(&payload[..array_close]);
    assert_eq!(
        recovered.len(),
        1,
        "one event should recover: {recovered:?}"
    );
    let recovered_str = recovered[0].str_val.as_deref();
    assert_eq!(
        recovered_str,
        Some("contains {nested} and \\\"quoted\\\""),
    );
}
/// An empty object ends at offset 2, with or without trailing text.
#[test]
fn find_balanced_object_end_simple_object() {
    for input in ["{}", "{}rest"] {
        assert_eq!(find_balanced_object_end(input), Some(2));
    }
}
/// Nested objects are balanced through to the outermost closing brace.
#[test]
fn find_balanced_object_end_nested_objects() {
    let nested = r#"{"a":{"b":{}}}"#;
    assert_eq!(find_balanced_object_end(nested), Some(14));
}
/// Braces inside a string value do not affect the balance.
#[test]
fn find_balanced_object_end_braces_in_strings_ignored() {
    let text = r#"{"x":"{{}}"}"#;
    assert_eq!(find_balanced_object_end(text), Some(12));
}
/// An escaped quote keeps the string open, so the `}` that follows it inside
/// the string is not counted; the object ends at the input's full length.
#[test]
fn find_balanced_object_end_escaped_quote_does_not_close_string() {
    let text = r#"{"x":"\"}"}"#;
    let end = find_balanced_object_end(text);
    assert_eq!(end, Some(text.len()));
}
/// An object missing its closing brace has no balanced end.
#[test]
fn find_balanced_object_end_truncated_returns_none() {
    let unclosed = r#"{"a":1"#;
    assert_eq!(find_balanced_object_end(unclosed), None);
}
/// Input that stops inside an unterminated string has no balanced end.
#[test]
fn find_balanced_object_end_truncated_in_string_returns_none() {
    let unclosed = r#"{"a":"hello"#;
    assert_eq!(find_balanced_object_end(unclosed), None);
}
/// Inputs that do not start with an object (array, empty, `null`) yield `None`.
#[test]
fn find_balanced_object_end_non_object_returns_none() {
    for input in ["[1,2]", "", "null"] {
        assert_eq!(find_balanced_object_end(input), None);
    }
}
/// The truncation helper returns the whole JSON when the cut exceeds its
/// length, and an empty string for a cut of zero.
#[test]
fn truncate_probe_json_clamps_to_full_length() {
    let payload = ProbeBytes {
        events: vec![],
        func_names: vec![],
        bpf_source_locs: Default::default(),
        diagnostics: None,
        nr_cpus: None,
        param_names: Default::default(),
        render_hints: Default::default(),
    };
    let full = serde_json::to_string(&payload).unwrap();
    let oversized_cut = full.len() + 1024;
    assert_eq!(truncate_probe_json(&payload, oversized_cut), full);
    assert_eq!(truncate_probe_json(&payload, 0), "");
}
/// Empty text produces no tail at all.
#[test]
fn format_tail_empty_text_returns_none() {
    let tail = format_tail("", 5, "scheduler");
    assert_eq!(tail, None);
}
/// When the text has fewer lines than requested, every line is kept.
#[test]
fn format_tail_fewer_lines_than_n_returns_all() {
    let tail = format_tail("one\ntwo\nthree", 10, "scheduler");
    assert_eq!(tail.as_deref(), Some("--- scheduler ---\none\ntwo\nthree"));
}
/// Only the last `n` lines appear under the header.
#[test]
fn format_tail_trims_to_last_n_lines() {
    let tail = format_tail("1\n2\n3\n4\n5", 3, "log");
    assert_eq!(tail.as_deref(), Some("--- log ---\n3\n4\n5"));
}
/// Requesting zero lines of non-empty text yields the header with an empty body.
#[test]
fn format_tail_zero_n_returns_empty_body_under_header() {
    let tail = format_tail("a\nb", 0, "hdr");
    assert_eq!(tail.as_deref(), Some("--- hdr ---\n"));
}
/// A blank line inside the selected window is kept as-is in the output.
#[test]
fn format_tail_preserves_trailing_blank_lines() {
    let tail = format_tail("a\n\nb", 3, "hdr");
    assert_eq!(tail.as_deref(), Some("--- hdr ---\na\n\nb"));
}
/// An empty command line yields no environment pairs.
#[test]
fn parse_rust_env_empty_cmdline_is_empty() {
    let parsed = parse_rust_env_from_cmdline("");
    assert!(parsed.is_empty());
}
/// Tokens that match none of the recognised prefixes are ignored.
#[test]
fn parse_rust_env_no_matches() {
    let parsed = parse_rust_env_from_cmdline("console=ttyS0 ro quiet");
    assert!(parsed.is_empty());
}
/// `RUST_BACKTRACE=` is picked out from among unrelated tokens.
#[test]
fn parse_rust_env_backtrace_only() {
    let expected = vec![("RUST_BACKTRACE", "1")];
    assert_eq!(
        parse_rust_env_from_cmdline("console=ttyS0 RUST_BACKTRACE=1 ro"),
        expected
    );
}
/// `RUST_LOG=` is picked out and other `key=value` tokens are ignored.
#[test]
fn parse_rust_env_log_only() {
    let expected = vec![("RUST_LOG", "debug")];
    assert_eq!(parse_rust_env_from_cmdline("RUST_LOG=debug other=x"), expected);
}
/// Both `RUST_BACKTRACE` and `RUST_LOG` are returned when both are present.
#[test]
fn parse_rust_env_both() {
    let expected = vec![("RUST_BACKTRACE", "full"), ("RUST_LOG", "trace")];
    let parsed = parse_rust_env_from_cmdline("RUST_BACKTRACE=full RUST_LOG=trace other=y");
    assert_eq!(parsed, expected);
}
/// Pairs come back in the order their tokens appear on the command line.
#[test]
fn parse_rust_env_preserves_token_order() {
    let expected = vec![("RUST_LOG", "info"), ("RUST_BACKTRACE", "1")];
    assert_eq!(
        parse_rust_env_from_cmdline("RUST_LOG=info RUST_BACKTRACE=1"),
        expected
    );
}
/// A recognised key with nothing after `=` yields an empty value.
#[test]
fn parse_rust_env_empty_value() {
    let expected = vec![("RUST_LOG", "")];
    assert_eq!(parse_rust_env_from_cmdline("RUST_LOG="), expected);
}
/// A token that merely contains a recognised key, but does not start with
/// it, is ignored.
#[test]
fn parse_rust_env_ignores_prefix_mismatch() {
    let parsed = parse_rust_env_from_cmdline("xRUST_LOG=x");
    assert!(parsed.is_empty());
}
/// `KTSTR_SIDECAR_DIR=` is recognised and its path value returned verbatim.
#[test]
fn parse_rust_env_sidecar_dir() {
    let cmdline = "console=ttyS0 KTSTR_SIDECAR_DIR=/host/target/ktstr/run-key ro";
    let expected = vec![("KTSTR_SIDECAR_DIR", "/host/target/ktstr/run-key")];
    assert_eq!(parse_rust_env_from_cmdline(cmdline), expected);
}
/// All three recognised keys are returned together, in token order.
#[test]
fn parse_rust_env_all_three_keys() {
    let cmdline = "RUST_LOG=info KTSTR_SIDECAR_DIR=/dir RUST_BACKTRACE=1";
    let expected = vec![
        ("RUST_LOG", "info"),
        ("KTSTR_SIDECAR_DIR", "/dir"),
        ("RUST_BACKTRACE", "1"),
    ];
    assert_eq!(parse_rust_env_from_cmdline(cmdline), expected);
}
/// Builds a drain result holding one CRC-valid lifecycle entry whose payload
/// is the phase's wire value followed by the bytes of `reason`.
fn lifecycle_drain(
    phase: crate::vmm::wire::LifecyclePhase,
    reason: &str,
) -> crate::vmm::host_comms::BulkDrainResult {
    let payload: Vec<u8> = std::iter::once(phase.wire_value())
        .chain(reason.bytes())
        .collect();
    let entry = crate::vmm::wire::ShmEntry {
        msg_type: crate::vmm::wire::MSG_TYPE_LIFECYCLE,
        payload,
        crc_ok: true,
    };
    crate::vmm::host_comms::BulkDrainResult {
        entries: vec![entry],
    }
}
/// A `SchedulerNotAttached` entry with reason `timeout` yields that reason.
#[test]
fn extract_not_attached_reason_timeout() {
    let phase = crate::vmm::wire::LifecyclePhase::SchedulerNotAttached;
    let drain = lifecycle_drain(phase, "timeout");
    let reason = extract_not_attached_reason(Some(&drain));
    assert_eq!(reason.as_deref(), Some("timeout"));
}
/// A multi-word reason is returned in full.
#[test]
fn extract_not_attached_reason_sysfs_absent() {
    let phase = crate::vmm::wire::LifecyclePhase::SchedulerNotAttached;
    let drain = lifecycle_drain(phase, "sched_ext sysfs absent");
    let reason = extract_not_attached_reason(Some(&drain));
    assert_eq!(reason.as_deref(), Some("sched_ext sysfs absent"));
}
/// Whitespace around the reason is stripped from the returned value.
#[test]
fn extract_not_attached_reason_trims_surrounding_whitespace() {
    let phase = crate::vmm::wire::LifecyclePhase::SchedulerNotAttached;
    let drain = lifecycle_drain(phase, " timeout ");
    let reason = extract_not_attached_reason(Some(&drain));
    assert_eq!(reason.as_deref(), Some("timeout"));
}
/// No drain at all, or a drain holding only a `SchedulerDied` entry, yields
/// no not-attached reason.
#[test]
fn extract_not_attached_reason_absent_returns_none() {
    let without_drain = extract_not_attached_reason(None);
    assert_eq!(without_drain, None);

    let died = lifecycle_drain(crate::vmm::wire::LifecyclePhase::SchedulerDied, "");
    let from_died = extract_not_attached_reason(Some(&died));
    assert_eq!(from_died, None);
}
/// A `SchedulerNotAttached` entry whose reason is empty or a single space
/// yields `None`.
#[test]
fn extract_not_attached_reason_empty_suffix_returns_none() {
    let drain = lifecycle_drain(crate::vmm::wire::LifecyclePhase::SchedulerNotAttached, "");
    let from_empty = extract_not_attached_reason(Some(&drain));
    assert_eq!(from_empty, None);

    let drain_ws = lifecycle_drain(crate::vmm::wire::LifecyclePhase::SchedulerNotAttached, " ");
    let from_whitespace = extract_not_attached_reason(Some(&drain_ws));
    assert_eq!(from_whitespace, None);
}
/// With two `SchedulerNotAttached` entries, the reason of the first is used.
#[test]
fn extract_not_attached_reason_first_match_wins() {
    // Pull the single entry out of a freshly built one-entry drain.
    let entry_with = |reason: &str| {
        lifecycle_drain(
            crate::vmm::wire::LifecyclePhase::SchedulerNotAttached,
            reason,
        )
        .entries
        .pop()
        .unwrap()
    };
    let drain = crate::vmm::host_comms::BulkDrainResult {
        entries: vec![entry_with("timeout"), entry_with("sched_ext sysfs absent")],
    };
    let reason = extract_not_attached_reason(Some(&drain));
    assert_eq!(reason.as_deref(), Some("timeout"));
}
/// An entry whose `crc_ok` flag is false is not used as a reason source.
#[test]
fn extract_not_attached_reason_skips_crc_bad() {
    let phase = crate::vmm::wire::LifecyclePhase::SchedulerNotAttached;
    let mut bad = lifecycle_drain(phase, "timeout");
    bad.entries[0].crc_ok = false;
    let reason = extract_not_attached_reason(Some(&bad));
    assert_eq!(reason, None);
}
/// Drain holding a single `SchedulerDied` lifecycle entry with an empty reason.
fn died_drain() -> crate::vmm::host_comms::BulkDrainResult {
    let phase = crate::vmm::wire::LifecyclePhase::SchedulerDied;
    lifecycle_drain(phase, "")
}
/// Drain holding a single `SchedulerNotAttached` lifecycle entry carrying `reason`.
fn not_attached_drain(reason: &str) -> crate::vmm::host_comms::BulkDrainResult {
    let phase = crate::vmm::wire::LifecyclePhase::SchedulerNotAttached;
    lifecycle_drain(phase, reason)
}
/// With both leading flags set, the status is "timed out" even though a
/// not-attached entry and a non-zero exit code are also present.
#[test]
fn classify_repro_vm_status_timeout_wins_over_other_signals() {
    let drain = not_attached_drain("timeout");
    let exit_code = 137;
    let status = classify_repro_vm_status(true, true, "", exit_code, Some(&drain));
    assert_eq!(status, "repro VM: timed out");
}
/// A not-attached entry's reason and the exit code both appear in the status.
#[test]
fn classify_repro_vm_status_not_attached_with_reason() {
    let drain = not_attached_drain("sched_ext sysfs absent");
    let expected = "repro VM: scheduler did not attach (sched_ext sysfs absent) (exit code 1)";
    let status = classify_repro_vm_status(false, false, "", 1, Some(&drain));
    assert_eq!(status, expected);
}
/// When the drain holds a died entry followed by a not-attached entry, the
/// not-attached status is reported.
#[test]
fn classify_repro_vm_status_not_attached_takes_precedence_over_crashed() {
    let mut drain = died_drain();
    let not_attached_entry = not_attached_drain("timeout").entries.pop().unwrap();
    drain.entries.push(not_attached_entry);
    let expected = "repro VM: scheduler did not attach (timeout) (exit code 1)";
    let status = classify_repro_vm_status(false, true, "", 1, Some(&drain));
    assert_eq!(status, expected);
}
/// A died entry with a positive exit code reports a crash with that status.
#[test]
fn classify_repro_vm_status_crashed_from_sentinel() {
    let drain = died_drain();
    let expected = "repro VM: scheduler crashed — exited with non-zero status (139)";
    let status = classify_repro_vm_status(false, false, "", 139, Some(&drain));
    assert_eq!(status, expected);
}
/// With the second flag set and no drain, the status still reports a crash.
#[test]
fn classify_repro_vm_status_crashed_from_crash_message() {
    let expected = "repro VM: scheduler crashed — exited with non-zero status (134)";
    let status = classify_repro_vm_status(false, true, "no sentinels here", 134, None);
    assert_eq!(status, expected);
}
/// A died entry with exit code 0 reports a crash that "exited cleanly".
#[test]
fn classify_repro_vm_status_crashed_from_sentinel_qemu_clean_exit() {
    let drain = died_drain();
    let expected = "repro VM: scheduler crashed — exited cleanly";
    let status = classify_repro_vm_status(false, false, "", 0, Some(&drain));
    assert_eq!(status, expected);
}
/// A died entry with exit code -9 reports a crash "killed by signal".
#[test]
fn classify_repro_vm_status_crashed_from_sentinel_killed_by_signal() {
    let drain = died_drain();
    let expected = "repro VM: scheduler crashed — killed by signal (-9)";
    let status = classify_repro_vm_status(false, false, "", -9, Some(&drain));
    assert_eq!(status, expected);
}
/// A died entry with exit code -1 reports that no final exit status was
/// available, and the wording avoids the internal terms checked below.
#[test]
fn classify_repro_vm_status_crashed_from_sentinel_vmm_exit_code_unset() {
    let drain = died_drain();
    let status = classify_repro_vm_status(false, false, "", -1, Some(&drain));
    let expected = "repro VM: scheduler crashed — VM host reported no final exit \
                    status (the scheduler did not deliver an exit signal before \
                    the VM ended)";
    assert_eq!(status, expected);

    // Internal identifiers must not leak into the user-facing text.
    assert!(
        !status.contains("BSP"),
        "user-facing status leaks BSP: {status}"
    );
    assert!(
        !status.contains("VmResult::exit_code"),
        "user-facing status leaks VmResult::exit_code: {status}",
    );
    assert!(
        !status.contains("MSG_TYPE_EXIT"),
        "user-facing status leaks MSG_TYPE_EXIT: {status}",
    );
}
/// A non-zero exit code with no drain and both flags clear is an abnormal exit.
#[test]
fn classify_repro_vm_status_abnormal_exit() {
    let expected = "repro VM: exited abnormally (exit code 2)";
    let status = classify_repro_vm_status(false, false, "clean output", 2, None);
    assert_eq!(status, expected);
}
/// Exit code 0 with no drain and both flags clear reports a normal run.
#[test]
fn classify_repro_vm_status_clean_run() {
    let expected = "repro VM: scheduler ran normally (crash did not reproduce)";
    let status = classify_repro_vm_status(false, false, "clean output", 0, None);
    assert_eq!(status, expected);
}
/// A not-attached entry with an empty reason is not reported as such; the
/// status falls through to the abnormal-exit wording.
#[test]
fn classify_repro_vm_status_malformed_not_attached_falls_through() {
    let drain = not_attached_drain("");
    let expected = "repro VM: exited abnormally (exit code 1)";
    let status = classify_repro_vm_status(false, false, "", 1, Some(&drain));
    assert_eq!(status, expected);
}
/// A path that does not exist renders as `None`.
///
/// The path lives inside a fresh private temporary directory, so it is
/// guaranteed absent without deleting anything from the shared system temp
/// directory and without racing other test processes.
#[test]
fn render_failure_dump_file_missing_returns_none() {
    let dir = tempfile::tempdir().expect("tempdir");
    let nonexistent = dir.path().join("ktstr-render-failure-dump-missing");
    assert!(render_failure_dump_file(&nonexistent).is_none());
}
/// A report tagged with the single schema renders under the repro header
/// with the empty-dump body text.
#[test]
fn render_failure_dump_file_single_schema() {
    use crate::monitor::dump::{FailureDumpReport, SCHEMA_SINGLE};
    let report = FailureDumpReport {
        schema: SCHEMA_SINGLE.to_string(),
        ..Default::default()
    };
    let encoded = serde_json::to_string(&report).expect("serialize single");
    let scratch = tempfile::NamedTempFile::new().expect("tempfile");
    std::fs::write(scratch.path(), encoded).expect("write tempfile");

    let rendered =
        render_failure_dump_file(scratch.path()).expect("single-schema must render Some");
    assert!(
        rendered.starts_with("--- repro VM failure dump ---"),
        "header missing: {rendered}"
    );
    assert!(
        rendered.contains("(empty failure dump)"),
        "single-schema body must come from FailureDumpReport Display: {rendered}"
    );
}
/// A report tagged with the dual schema renders under the repro header with
/// a body that names `DualFailureDumpReport`.
#[test]
fn render_failure_dump_file_dual_schema() {
    use crate::monitor::dump::{DualFailureDumpReport, FailureDumpReport, SCHEMA_DUAL};
    let dual = DualFailureDumpReport {
        schema: SCHEMA_DUAL.to_string(),
        early: None,
        late: FailureDumpReport::default(),
        early_max_age_jiffies: 0,
        early_threshold_jiffies: 0,
        early_skipped_reason: None,
    };
    let encoded = serde_json::to_string(&dual).expect("serialize dual");
    let scratch = tempfile::NamedTempFile::new().expect("tempfile");
    std::fs::write(scratch.path(), encoded).expect("write tempfile");

    let rendered =
        render_failure_dump_file(scratch.path()).expect("dual-schema must render Some");
    assert!(
        rendered.starts_with("--- repro VM failure dump ---"),
        "header missing: {rendered}"
    );
    assert!(
        rendered.contains("DualFailureDumpReport:"),
        "dual-schema body must come from DualFailureDumpReport Display: {rendered}"
    );
}
/// JSON with no `schema` key renders the same empty-dump body as the single
/// schema.
#[test]
fn render_failure_dump_file_absent_schema_defaults_to_single() {
    let scratch = tempfile::NamedTempFile::new().expect("tempfile");
    let contents = r#"{"maps":[],"vcpu_regs":[],"sdt_allocations":[]}"#;
    std::fs::write(scratch.path(), contents).expect("write tempfile");

    let rendered = render_failure_dump_file(scratch.path())
        .expect("absent-schema JSON must render as single");
    assert!(
        rendered.contains("(empty failure dump)"),
        "absent schema must default to single: {rendered}"
    );
}
/// A `schema` value other than the known ones renders as `None`.
#[test]
fn render_failure_dump_file_unknown_schema_returns_none() {
    let scratch = tempfile::NamedTempFile::new().expect("tempfile");
    let contents = r#"{"schema":"triple","maps":[],"vcpu_regs":[],"sdt_allocations":[]}"#;
    std::fs::write(scratch.path(), contents).expect("write tempfile");
    let rendered = render_failure_dump_file(scratch.path());
    assert!(rendered.is_none());
}
/// A file whose contents are not JSON renders as `None`.
#[test]
fn render_failure_dump_file_invalid_json_returns_none() {
    let scratch = tempfile::NamedTempFile::new().expect("tempfile");
    std::fs::write(scratch.path(), "not json").expect("write tempfile");
    let rendered = render_failure_dump_file(scratch.path());
    assert!(rendered.is_none());
}
}