use std::path::Path;
use std::time::{Duration, Instant};
use crate::assert::AssertResult;
use super::args::{
extract_probe_stack_arg, extract_test_fn_arg, extract_work_type_arg, resolve_cgroup_root,
};
use super::entry::find_test;
use super::output::{extract_sched_ext_dump, print_assert_result};
use super::profraw::try_flush_profraw;
use super::runtime::{config_content_parts, config_file_parts, verbose};
use super::{KtstrTestEntry, TopoOverride};
use crate::verifier::{
SCHED_OUTPUT_END, SCHED_OUTPUT_START, parse_sched_output, parse_sched_output_partial,
};
/// Value placed after `--ktstr-probe-stack=` by `attempt_auto_repro` when no
/// stack functions could be extracted from the primary VM's output.
const DISCOVER_SENTINEL: &str = "__discover__";
/// Copies the variables recognised by `parse_rust_env_from_cmdline` from
/// `/proc/cmdline` into this process's environment.
///
/// Silently does nothing when `/proc/cmdline` cannot be read.
pub(crate) fn propagate_rust_env_from_cmdline() {
    let cmdline = match std::fs::read_to_string("/proc/cmdline") {
        Ok(text) => text,
        Err(_) => return,
    };
    parse_rust_env_from_cmdline(&cmdline)
        .into_iter()
        .for_each(|(name, value)| {
            // NOTE(review): mutating the environment is only sound while no
            // other thread reads it — presumably this runs early in startup;
            // confirm against the caller.
            unsafe { std::env::set_var(name, value) };
        });
}
/// Scans a kernel command line for `RUST_BACKTRACE=`, `RUST_LOG=` and
/// `<KTSTR_SIDECAR_DIR_ENV>=` tokens.
///
/// Returns `(variable name, value)` pairs in the order the tokens appear.
/// Tokens are whitespace-separated; the value is everything after the first
/// matching prefix and may be empty.
fn parse_rust_env_from_cmdline(cmdline: &str) -> Vec<(&'static str, &str)> {
    let sidecar_prefix = format!("{}=", crate::KTSTR_SIDECAR_DIR_ENV);
    cmdline
        .split_whitespace()
        .filter_map(|token| {
            // Prefixes are tried in a fixed order; the first match wins.
            token
                .strip_prefix("RUST_BACKTRACE=")
                .map(|val| ("RUST_BACKTRACE", val))
                .or_else(|| token.strip_prefix("RUST_LOG=").map(|val| ("RUST_LOG", val)))
                .or_else(|| {
                    token
                        .strip_prefix(sidecar_prefix.as_str())
                        .map(|val| (crate::KTSTR_SIDECAR_DIR_ENV, val))
                })
        })
        .collect()
}
/// Opening delimiter of the probe payload section; `extract_probe_output`
/// passes it to `crate::probe::output::extract_section`.
pub(crate) const PROBE_OUTPUT_START: &str = "===PROBE_OUTPUT_START===";
/// Closing delimiter paired with `PROBE_OUTPUT_START`.
pub(crate) const PROBE_OUTPUT_END: &str = "===PROBE_OUTPUT_END===";
/// Renders the last `n` lines of `text` under a `--- {header} ---` banner.
///
/// Returns `None` when `text` contains no lines at all. When `text` has fewer
/// than `n` lines, every line is kept.
fn format_tail(text: &str, n: usize, header: &str) -> Option<String> {
    let total = text.lines().count();
    if total == 0 {
        return None;
    }
    let skipped = total.saturating_sub(n);
    let kept: Vec<&str> = text.lines().skip(skipped).collect();
    Some(format!("--- {header} ---\n{}", kept.join("\n")))
}
/// Substring that marks sched_ext dump lines; `render_dmesg_tail` drops every
/// stderr line containing it.
pub(crate) const SCHED_EXT_DUMP_MARKER: &str = "sched_ext_dump:";
/// Builds the "repro VM dmesg" section from the repro VM's stderr.
///
/// Lines containing `SCHED_EXT_DUMP_MARKER` are removed first. If what remains
/// is classified as empty/corrupt by `classify_dmesg_corruption`, a diagnostic
/// line is emitted instead of a tail; otherwise the last `tail_lines` lines
/// are shown.
fn render_dmesg_tail(stderr: &str, tail_lines: usize) -> String {
    let mut kept: Vec<&str> = Vec::new();
    let mut dropped_dump_lines = false;
    for line in stderr.lines() {
        if line.contains(SCHED_EXT_DUMP_MARKER) {
            dropped_dump_lines = true;
        } else {
            kept.push(line);
        }
    }
    let filtered = kept.join("\n");

    match classify_dmesg_corruption(&filtered) {
        // Nothing readable is left, but only because the dump lines were
        // filtered out: point the reader at the dump section instead.
        Some(_) if dropped_dump_lines => "--- repro VM dmesg ---\n(no kernel printk other than \
            sched_ext_dump — full dump in section above)"
            .to_string(),
        Some(diag) => format!("--- repro VM dmesg ---\n{diag}"),
        None => format_tail(&filtered, tail_lines, "repro VM dmesg")
            .unwrap_or_else(|| "--- repro VM dmesg ---\n(unavailable)".to_string()),
    }
}
/// Decides whether `text` holds any readable content.
///
/// Whitespace is ignored. Returns `None` as soon as a character is found that
/// is neither U+FFFD nor a control character. Otherwise returns a diagnostic:
/// the "empty" message when nothing but whitespace was present, the "corrupt"
/// message when only replacement/control characters were present.
fn classify_dmesg_corruption(text: &str) -> Option<&'static str> {
    const EMPTY: &str = "empty (scheduler crashed before kernel printk reached the UART buffer)";
    const CORRUPT: &str = "corrupt or no readable text (UART buffer uninitialized or \
        trimmed — scheduler likely crashed before any kernel printk)";

    let mut visible = text.chars().filter(|c| !c.is_whitespace()).peekable();
    if visible.peek().is_none() {
        return Some(EMPTY);
    }
    if visible.all(|c| c == '\u{fffd}' || c.is_control()) {
        Some(CORRUPT)
    } else {
        None
    }
}
/// Reads the failure dump JSON at `path` and renders it under a
/// `--- repro VM failure dump ---` banner.
///
/// Returns `None` when the file cannot be read or
/// `FailureDumpReportAny::from_json` rejects its contents.
fn render_failure_dump_file(path: &std::path::Path) -> Option<String> {
    use crate::monitor::dump::FailureDumpReportAny;
    use std::fmt::Write;

    let raw = std::fs::read_to_string(path).ok()?;
    FailureDumpReportAny::from_json(&raw).map(|report| {
        let mut rendered = String::with_capacity(raw.len());
        rendered.push_str("--- repro VM failure dump ---\n");
        // A formatting failure is ignored; whatever was written is returned.
        let _ = write!(rendered, "{report}");
        rendered
    })
}
/// Prefixes the repro verdict with a warning banner when the primary VM never
/// reached the workload.
///
/// When `primary_reached_workload` is true the verdict is returned unchanged;
/// otherwise the banner is followed by a newline and the verdict.
fn label_repro_verdict_when_workload_not_reached(
    primary_reached_workload: bool,
    repro_verdict: &str,
) -> String {
    if primary_reached_workload {
        return repro_verdict.to_owned();
    }
    let mut labeled = String::from(
        "PRIMARY DID NOT REACH WORKLOAD — auto-repro is not \
         load-bearing (the primary VM's failure prevented the bug \
         from being exercised, so the repro's verdict below should \
         not be read as evidence about bug reproducibility — the \
         bug was never exercised by either run)\n",
    );
    labeled.push_str(repro_verdict);
    labeled
}
/// Produces the one-line verdict describing how the repro VM ended.
///
/// Checks are applied in priority order: timeout, a "scheduler not attached"
/// lifecycle message carrying a reason, a scheduler crash (crash message or a
/// `SchedulerDied` lifecycle message), a non-zero exit code, and finally a
/// normal run.
fn classify_repro_vm_status(
    timed_out: bool,
    has_crash_message: bool,
    exit_code: i32,
    guest_messages: Option<&crate::vmm::host_comms::BulkDrainResult>,
) -> String {
    if timed_out {
        return "repro VM: timed out".to_string();
    }
    if let Some(reason) = extract_not_attached_reason(guest_messages) {
        return format!("repro VM: scheduler did not attach ({reason}) (exit code {exit_code})");
    }

    // Only CRC-valid, non-empty lifecycle messages are considered.
    let scheduler_died = guest_messages.is_some_and(|drain| {
        drain.entries.iter().any(|entry| {
            entry.msg_type == crate::vmm::wire::MSG_TYPE_LIFECYCLE
                && entry.crc_ok
                && !entry.payload.is_empty()
                && crate::vmm::wire::LifecyclePhase::from_wire(entry.payload[0])
                    == Some(crate::vmm::wire::LifecyclePhase::SchedulerDied)
        })
    });

    if !(has_crash_message || scheduler_died) {
        return if exit_code != 0 {
            format!("repro VM: exited abnormally (exit code {exit_code})")
        } else {
            "repro VM: scheduler ran normally (crash did not reproduce)".to_string()
        };
    }

    let exit_clause = match exit_code {
        -1 => "VM host reported no final exit status (the scheduler did not \
               deliver an exit signal before the VM ended)"
            .to_string(),
        code if code < 0 => format!("killed by signal ({code})"),
        0 => "exited cleanly".to_string(),
        code => format!("exited with non-zero status ({code})"),
    };
    format!("repro VM: scheduler crashed — {exit_clause}")
}
/// Extracts the reason text from the first valid `SchedulerNotAttached`
/// lifecycle message in `drain`.
///
/// A message is valid when it is a lifecycle message, its CRC is ok and its
/// payload is non-empty. The reason is the payload after the first byte,
/// decoded lossily as UTF-8 and trimmed. Returns `None` when there is no
/// drain, no such message, or the first such message has an empty reason.
fn extract_not_attached_reason(
    drain: Option<&crate::vmm::host_comms::BulkDrainResult>,
) -> Option<String> {
    use crate::vmm::wire::{LifecyclePhase, MSG_TYPE_LIFECYCLE};

    let not_attached = drain?.entries.iter().find(|entry| {
        entry.msg_type == MSG_TYPE_LIFECYCLE
            && entry.crc_ok
            && !entry.payload.is_empty()
            && LifecyclePhase::from_wire(entry.payload[0])
                == Some(LifecyclePhase::SchedulerNotAttached)
    })?;

    let reason = String::from_utf8_lossy(&not_attached.payload[1..])
        .trim()
        .to_string();
    (!reason.is_empty()).then_some(reason)
}
/// Writes wprof trace payloads received from the repro VM into the sidecar
/// directory as `<entry.name>.repro.wprof.pb`.
///
/// Only CRC-valid, non-empty `WprofTrace` messages are written. Failures to
/// create the directory or write the file are reported on stderr and do not
/// abort the caller.
fn write_auto_repro_sidecar_artifacts(
    entry: &KtstrTestEntry,
    repro_result: &crate::vmm::result::VmResult,
) {
    let drain = match repro_result.guest_messages.as_ref() {
        Some(drain) => drain,
        None => return,
    };

    let traces = drain.entries.iter().filter(|msg| {
        matches!(
            crate::vmm::wire::MsgType::from_wire(msg.msg_type),
            Some(crate::vmm::wire::MsgType::WprofTrace)
        ) && msg.crc_ok
            && !msg.payload.is_empty()
    });

    for trace in traces {
        let wprof_path = crate::test_support::sidecar::sidecar_dir()
            .join(format!("{}.repro.wprof.pb", entry.name));
        let parent_dir = wprof_path
            .parent()
            .expect("sidecar_dir join always has parent");
        if let Err(e) = std::fs::create_dir_all(parent_dir) {
            eprintln!("ktstr_test: auto-repro: create sidecar dir for wprof trace: {e}");
            continue;
        }
        if let Err(e) = std::fs::write(&wprof_path, &trace.payload) {
            eprintln!(
                "ktstr_test: auto-repro: write wprof trace to {}: {e}",
                wprof_path.display(),
            );
        }
    }
}
/// Assembles the VM builder used for the auto-repro run.
///
/// Returns the configured builder together with the path that was registered
/// as the repro VM's failure dump file. Within the visible code the only
/// `None` return is the `wprof`-feature path, when
/// `attach_wprof_if_requested` fails.
///
/// * `entry` - test entry whose scheduler, asserts and include files drive
///   the configuration.
/// * `kernel`, `scheduler`, `ktstr_bin`, `guest_args` - forwarded to
///   `build_vm_builder_base`.
/// * `topo` - optional topology override passed to `resolve_vm_topology`.
fn build_repro_vm_builder(
entry: &KtstrTestEntry,
kernel: &Path,
scheduler: Option<&Path>,
ktstr_bin: &Path,
topo: Option<&TopoOverride>,
guest_args: &[String],
) -> Option<(crate::vmm::KtstrVmBuilder, std::path::PathBuf)> {
let cmdline_extra = super::runtime::build_cmdline_extra(entry);
let (vm_topology, memory_mib) = super::runtime::resolve_vm_topology(entry, topo);
let no_perf_mode = super::runtime::no_perf_mode_for_entry(entry);
// Resolve each staged scheduler to (name, host path, args). Entries that
// resolve to no host path are ignored; resolution errors are logged and the
// entry is skipped.
let mut resolved_staged: Vec<(String, std::path::PathBuf, Vec<String>)> = Vec::new();
for staged in entry.staged_schedulers {
match super::eval::resolve_scheduler(&staged.binary) {
Ok((Some(host_path), _src)) => {
resolved_staged.push((
staged.name.to_string(),
host_path,
staged.sched_args.iter().map(|s| s.to_string()).collect(),
));
}
Ok((None, _)) => {} Err(e) => {
tracing::warn!(
staged_name = %staged.name,
error = %e,
"auto-repro: failed to resolve staged scheduler binary; skipping (Op::AttachScheduler / Op::ReplaceScheduler against this staged entry will fail at dispatch time in the repro VM)"
);
}
}
}
let mut builder = super::runtime::build_vm_builder_base(
entry,
kernel,
ktstr_bin,
scheduler,
&resolved_staged,
vm_topology,
memory_mib,
&cmdline_extra,
guest_args,
no_perf_mode,
);
// The failure dump goes into the sidecar directory under a repro-specific
// file name; the same path is returned to the caller.
let repro_dump_path =
super::sidecar::sidecar_dir().join(format!("{}.repro.failure-dump.json", entry.name));
builder = builder.failure_dump_path(&repro_dump_path);
builder = builder
.dual_snapshot(true)
.performance_mode(entry.performance_mode);
// With the `wprof` feature enabled, a failed attach is reported on stderr
// and aborts the repro by returning `None`.
#[cfg(feature = "wprof")]
{
builder = match crate::test_support::runtime::attach_wprof_if_requested(
builder,
entry,
"auto-repro",
) {
Ok(b) => b,
Err(e) => {
eprintln!("ktstr_test: {e:#}");
return None;
}
};
}
// Kernel-builtin schedulers carry enable/disable command lists instead of a
// binary.
if let crate::test_support::entry::SchedulerSpec::KernelBuiltin { enable, disable } =
&entry.scheduler.binary
{
builder = builder.sched_enable_cmds(enable);
builder = builder.sched_disable_cmds(disable);
}
// Merge order: default checks, then scheduler-level asserts, then the
// entry's own asserts. Thresholds are applied only for BPF schedulers.
let merged_assert = crate::assert::Assert::default_checks()
.merge(&entry.scheduler.assert)
.merge(&entry.assert);
if entry.scheduler.has_bpf_scheduler() {
builder = builder.monitor_thresholds(merged_assert.monitor_thresholds());
}
// Collect scheduler arguments and the files to include in the guest.
{
let mut args: Vec<String> = Vec::new();
let declarative_specs: Vec<std::path::PathBuf> = entry
.all_include_files()
.into_iter()
.map(std::path::PathBuf::from)
.collect();
// Each include is tagged with a label naming where it came from. A resolve
// failure is reported and the declarative includes are dropped.
let mut resolved_includes: Vec<(String, std::path::PathBuf, &'static str)> =
if declarative_specs.is_empty() {
Vec::new()
} else {
match crate::cli::resolve_include_files(&declarative_specs) {
Ok(v) => v.into_iter().map(|(a, h)| (a, h, "declarative")).collect(),
Err(e) => {
eprintln!("ktstr_test: auto-repro: include_files resolve: {e:#}");
Vec::new()
}
}
};
// A scheduler config file is included and passed via `--config <guest path>`.
if let Some((archive_path, host_path, guest_path)) = config_file_parts(entry) {
resolved_includes.push((archive_path, host_path, "scheduler config_file"));
args.push("--config".to_string());
args.push(guest_path);
}
// Inline config content contributes its own include and argument list.
if let Some((archive_path, host_path, _guest_path, cfg_args)) = config_content_parts(entry)
{
resolved_includes.push((archive_path, host_path, "inline config_content"));
args.extend(cfg_args);
}
// A dedupe failure is reported and no include files are attached.
match super::eval::dedupe_include_files(&resolved_includes) {
Ok(unioned) if !unioned.is_empty() => {
builder = builder.include_files(unioned);
}
Ok(_) => {}
Err(e) => {
eprintln!("ktstr_test: auto-repro: include_files dedupe: {e:#}");
}
}
super::runtime::append_base_sched_args(entry, &mut args);
if !args.is_empty() {
builder = builder.sched_args(&args);
}
}
Some((builder, repro_dump_path))
}
/// Re-runs a failed test in a second ("repro") VM and returns the formatted
/// report.
///
/// Stack functions are extracted from the primary run's output and passed to
/// the repro VM via `--ktstr-probe-stack`, unless the primary exit kind was a
/// stall, in which case no probe argument is passed.
///
/// * `first_vm_output` - primary VM output searched for the scheduler output
///   section (the log messages call this COM2).
/// * `console_output` - fallback source for stack functions (called COM1 in
///   the log messages).
/// * `primary_exit_kind` - compared against `EXIT_ERROR_STALL`.
/// * `primary_reached_workload` - forwarded to `format_repro_output`.
///
/// Returns `None` when the builder cannot be assembled, the VM fails to build
/// or run, or `format_repro_output` returns `None`.
#[allow(clippy::too_many_arguments)]
pub(crate) fn attempt_auto_repro(
entry: &KtstrTestEntry,
kernel: &Path,
scheduler: Option<&Path>,
ktstr_bin: &Path,
first_vm_output: &str,
console_output: &str,
topo: Option<&TopoOverride>,
primary_exit_kind: Option<u64>,
primary_reached_workload: bool,
) -> Option<String> {
use crate::probe::stack::extract_stack_functions_all;
// Start of the wall-clock measurement reported in the "auto_repro: total"
// trace events.
let auto_repro_start = Instant::now();
let has_sched_start = first_vm_output.contains(SCHED_OUTPUT_START);
let has_sched_end = first_vm_output.contains(SCHED_OUTPUT_END);
eprintln!(
"ktstr_test: auto-repro: COM2 length={} has_sched_start={has_sched_start} has_sched_end={has_sched_end}",
first_vm_output.len(),
);
// Prefer functions found in the scheduler output; fall back to the console
// output when the scheduler section is missing or yields nothing.
let sched_output = parse_sched_output_partial(first_vm_output);
let stack_funcs = if let Some(sched) = sched_output {
let funcs = extract_stack_functions_all(sched);
if funcs.is_empty() {
if has_sched_start && !has_sched_end {
eprintln!(
"ktstr_test: auto-repro: no functions from partial COM2 (missing \
SCHED_OUTPUT_END), trying COM1",
);
} else {
eprintln!("ktstr_test: auto-repro: no functions from COM2, trying COM1");
}
extract_stack_functions_all(console_output)
} else {
if has_sched_start && !has_sched_end {
eprintln!(
"ktstr_test: auto-repro: extracted {} functions from partial COM2 \
(missing SCHED_OUTPUT_END)",
funcs.len(),
);
}
funcs
}
} else {
eprintln!("ktstr_test: auto-repro: no scheduler output on COM2, trying COM1");
extract_stack_functions_all(console_output)
};
let func_names: Vec<String> = stack_funcs.iter().map(|f| f.raw_name.clone()).collect();
let is_stall = primary_exit_kind == Some(crate::probe::scx_defs::EXIT_ERROR_STALL);
// Guest command line: always run the same test function by name.
let mut guest_args = vec![
"run".to_string(),
"--ktstr-test-fn".to_string(),
entry.name.to_string(),
];
// For non-stall exits add the probe stack: either the discovery sentinel or
// the comma-separated function names.
if !is_stall {
let probe_arg = if func_names.is_empty() {
eprintln!(
"ktstr_test: auto-repro: no stack functions, using BPF discovery in repro VM"
);
format!("--ktstr-probe-stack={DISCOVER_SENTINEL}")
} else {
eprintln!(
"ktstr_test: auto-repro: probing {} functions in second VM",
func_names.len()
);
format!("--ktstr-probe-stack={}", func_names.join(","))
};
guest_args.push(probe_arg);
} else {
eprintln!("ktstr_test: auto-repro: stall exit — skipping probe attachment");
}
let (builder, repro_dump_path) =
build_repro_vm_builder(entry, kernel, scheduler, ktstr_bin, topo, &guest_args)?;
// Build failures are reported on stderr and recorded with outcome
// "build_failed".
let build_start = Instant::now();
let vm = match builder.build() {
Ok(vm) => vm,
Err(e) => {
eprintln!("ktstr_test: auto-repro: failed to build VM: {e:#}");
tracing::info!(
elapsed_ms = auto_repro_start.elapsed().as_millis() as u64,
outcome = "build_failed",
"auto_repro: total",
);
return None;
}
};
tracing::info!(
elapsed_ms = build_start.elapsed().as_millis() as u64,
"auto_repro: vm_build",
);
// Run failures are reported on stderr and recorded with outcome
// "run_failed".
let run_start = Instant::now();
let repro_result = match vm.run() {
Ok(r) => r,
Err(e) => {
eprintln!("ktstr_test: auto-repro: VM run failed: {e:#}");
tracing::info!(
elapsed_ms = auto_repro_start.elapsed().as_millis() as u64,
outcome = "run_failed",
"auto_repro: total",
);
return None;
}
};
tracing::info!(
elapsed_ms = run_start.elapsed().as_millis() as u64,
guest_duration_ms = repro_result.duration.as_millis() as u64,
"auto_repro: vm_run",
);
// The VM is dropped explicitly before the report is formatted.
drop(vm);
format_repro_output(
entry,
&repro_result,
is_stall,
kernel,
primary_reached_workload,
auto_repro_start,
&repro_dump_path,
)
}
/// Turns the repro VM's result into the text report returned by
/// `attempt_auto_repro`.
///
/// The report starts with the probe section when one was extracted, otherwise
/// with the verdict from `classify_repro_vm_status` (labelled via
/// `label_repro_verdict_when_workload_not_reached`). It is followed by the VM
/// duration and then the tail sections in this order: scheduler log,
/// sched_ext dump, failure dump, dmesg.
///
/// Side effects: writes sidecar artifacts, removes any stale partial probe
/// payload file, and, when `verbose()` is set, echoes the repro VM output to
/// stderr.
#[allow(clippy::too_many_arguments)]
fn format_repro_output(
entry: &KtstrTestEntry,
repro_result: &crate::vmm::result::VmResult,
is_stall: bool,
kernel: &Path,
primary_reached_workload: bool,
auto_repro_start: Instant,
repro_dump_path: &Path,
) -> Option<String> {
write_auto_repro_sidecar_artifacts(entry, repro_result);
if verbose() {
eprintln!(
"ktstr_test: auto-repro: COM1 stderr length={} COM2 stdout length={}",
repro_result.stderr.len(),
repro_result.output.len(),
);
for line in repro_result.stderr.lines() {
eprintln!("  repro-vm-com1: {line}");
}
// Output lines are echoed only from the first line containing
// "ktstr_test: probe:" onwards.
let mut in_probe = false;
for line in repro_result.output.lines() {
if line.contains("ktstr_test: probe:") {
in_probe = true;
}
if in_probe {
eprintln!("  repro-vm-com2: {line}");
}
}
}
// Kernel directory handed to the probe formatter; when
// `prefer_source_tree_for_dwarf` returns a path it replaces the derived one.
// Dropped if the path is not valid UTF-8.
let kernel_dir = crate::kernel_path::derive_kernel_dir(kernel)
.map(|dir| crate::cache::prefer_source_tree_for_dwarf(&dir).unwrap_or(dir))
.and_then(|p| p.to_str().map(String::from));
let kernel_dir_str = kernel_dir.as_deref();
let probe_payload_partial_path = super::sidecar::sidecar_dir()
.join(format!("{}.repro.probe-payload.partial.json", entry.name));
// Remove any partial payload file already at this path; the result is
// ignored.
let _ = std::fs::remove_file(&probe_payload_partial_path);
let probe_section = if is_stall {
tracing::debug!(
"auto-repro: suppressing chain-to-failure for stall exit \
(no causal task — probe events are always empty after stitch)",
);
None
} else {
extract_probe_output(
&repro_result.output,
kernel_dir_str,
Some(probe_payload_partial_path.as_path()),
)
};
const REPRO_TAIL_LINES: usize = 40;
let sched_log_tail = parse_sched_output(&repro_result.output).and_then(|log| {
let collapsed = crate::verifier::collapse_cycles(log);
format_tail(&collapsed, REPRO_TAIL_LINES, "repro VM scheduler log")
});
let dump_tail = extract_sched_ext_dump(&repro_result.stderr)
.and_then(|dump| format_tail(&dump, REPRO_TAIL_LINES, "repro VM sched_ext dump"));
let dmesg_tail = Some(render_dmesg_tail(&repro_result.stderr, REPRO_TAIL_LINES));
let failure_dump_tail = render_failure_dump_file(repro_dump_path);
let tails: Vec<String> = [sched_log_tail, dump_tail, failure_dump_tail, dmesg_tail]
.into_iter()
.flatten()
.collect();
// NOTE(review): `dmesg_tail` is always `Some`, so `tails` is never empty and
// this "no_data" branch looks unreachable — confirm whether it is still
// needed.
if probe_section.is_none() && tails.is_empty() {
tracing::info!(
elapsed_ms = auto_repro_start.elapsed().as_millis() as u64,
outcome = "no_data",
"auto_repro: total",
);
return None;
}
let has_probe = probe_section.is_some();
let mut out = probe_section.unwrap_or_default();
// The verdict line is emitted only when there is no probe section.
if !has_probe {
let verdict = classify_repro_vm_status(
repro_result.timed_out,
repro_result.crash_message.is_some(),
repro_result.exit_code,
repro_result.guest_messages.as_ref(),
);
out.push_str(&label_repro_verdict_when_workload_not_reached(
primary_reached_workload,
&verdict,
));
}
if !out.is_empty() {
out.push('\n');
}
out.push_str(&format!(
"repro VM duration: {:.1}s",
repro_result.duration.as_secs_f64(),
));
// Each tail section is separated from the preceding text by a blank line.
for tail in &tails {
out.push_str("\n\n");
out.push_str(tail);
}
tracing::info!(
elapsed_ms = auto_repro_start.elapsed().as_millis() as u64,
outcome = if has_probe {
"probe_data"
} else {
"tails_only"
},
"auto_repro: total",
);
Some(out)
}
/// Extracts and renders the probe payload embedded in `output`.
///
/// The payload is the text between `PROBE_OUTPUT_START` and
/// `PROBE_OUTPUT_END`. The rendering is the diagnostics block (when present)
/// followed by the formatted events (when any). Returns `None` when the
/// section is empty, the payload cannot be parsed or recovered, or there is
/// nothing to render.
///
/// `partial_dump_path` is forwarded to `parse_probe_payload`, which writes the
/// raw payload there on a parse failure.
pub(crate) fn extract_probe_output(
    output: &str,
    kernel_dir: Option<&str>,
    partial_dump_path: Option<&Path>,
) -> Option<String> {
    let json = crate::probe::output::extract_section(output, PROBE_OUTPUT_START, PROBE_OUTPUT_END);
    if json.is_empty() {
        return None;
    }
    let payload = parse_probe_payload(&json, partial_dump_path)?;

    let mut rendered = match payload.diagnostics.as_ref() {
        Some(diag) => format_probe_diagnostics(&diag.pipeline, &diag.skeleton),
        None => String::new(),
    };

    if !payload.events.is_empty() {
        let events_text = crate::probe::output::format_probe_events_with_bpf_locs(
            &payload.events,
            &payload.func_names,
            kernel_dir,
            &payload.bpf_source_locs,
            payload.nr_cpus,
            &payload.param_names,
            &payload.render_hints,
        );
        rendered.push_str(&events_text);
        return Some(rendered);
    }

    // No events: keep the diagnostics if there are any.
    (!rendered.is_empty()).then_some(rendered)
}
/// Parses the probe payload JSON, recovering what it can from truncated input.
///
/// On a parse error the category ("truncated" for EOF errors, "malformed"
/// otherwise) and position are reported on stderr, and the raw text is
/// written to `partial_dump_path` when one is given. Only truncated input is
/// then salvaged: complete events are recovered via `recover_partial_events`
/// and returned in a payload whose other fields are empty/default. Returns
/// `None` for malformed input or when no event could be recovered.
fn parse_probe_payload(json: &str, partial_dump_path: Option<&Path>) -> Option<ProbeBytes> {
    let err = match serde_json::from_str::<ProbeBytes>(json) {
        Ok(payload) => return Some(payload),
        Err(err) => err,
    };

    let truncated = err.is_eof();
    let category = if truncated { "truncated" } else { "malformed" };
    let total_len = json.len();
    eprintln!(
        "ktstr_test: probe payload {category}: {err} \
         (line {}, column {}, total {total_len} bytes)",
        err.line(),
        err.column(),
    );

    if let Some(path) = partial_dump_path {
        if let Err(write_err) = std::fs::write(path, json) {
            eprintln!(
                "ktstr_test: probe payload: failed to write raw bytes to {}: {write_err}",
                path.display(),
            );
        } else {
            eprintln!(
                "ktstr_test: probe payload: wrote raw bytes to {}",
                path.display(),
            );
        }
    }

    if !truncated {
        return None;
    }
    let events = recover_partial_events(json);
    if events.is_empty() {
        return None;
    }
    eprintln!(
        "ktstr_test: probe payload: recovered {} event(s) from truncated input",
        events.len(),
    );
    Some(ProbeBytes {
        events,
        func_names: Vec::new(),
        bpf_source_locs: Default::default(),
        diagnostics: None,
        nr_cpus: None,
        param_names: Default::default(),
        render_hints: Default::default(),
    })
}
/// Salvages the complete leading objects of the `"events"` array from a
/// truncated JSON payload.
///
/// Scanning starts after the first `[` following the first `"events":` key and
/// stops at the first element that is not a balanced `{…}` object or fails to
/// deserialize as a `ProbeEvent`. Returns the events recovered so far,
/// possibly none.
fn recover_partial_events(json: &str) -> Vec<crate::probe::process::ProbeEvent> {
    const EVENTS_KEY: &str = "\"events\":";

    let mut recovered = Vec::new();
    let array_body = json
        .find(EVENTS_KEY)
        .map(|key_idx| &json[key_idx + EVENTS_KEY.len()..])
        .and_then(|tail| tail.find('[').map(|open| &tail[open + 1..]));
    let Some(mut rest) = array_body else {
        return recovered;
    };

    loop {
        rest = rest.trim_start();
        if let Some(after_comma) = rest.strip_prefix(',') {
            rest = after_comma.trim_start();
        }
        // Covers end of input, the closing `]`, and any non-object element.
        if !rest.starts_with('{') {
            break;
        }
        let Some(end) = find_balanced_object_end(rest) else {
            break;
        };
        match serde_json::from_str::<crate::probe::process::ProbeEvent>(&rest[..end]) {
            Ok(event) => recovered.push(event),
            Err(_) => break,
        }
        rest = &rest[end..];
    }
    recovered
}
/// Finds the end of the JSON object that starts at the beginning of `s`.
///
/// Returns the byte offset one past the `}` that closes the leading `{`, or
/// `None` when `s` does not start with `{` or the object is never closed.
/// Braces inside string literals are ignored, and a backslash inside a string
/// escapes the following byte.
fn find_balanced_object_end(s: &str) -> Option<usize> {
    if !s.starts_with('{') {
        return None;
    }
    let mut depth: u32 = 0;
    let mut inside_string = false;
    let mut escaped = false;
    for (idx, byte) in s.bytes().enumerate() {
        if inside_string {
            match byte {
                _ if escaped => escaped = false,
                b'\\' => escaped = true,
                b'"' => inside_string = false,
                _ => {}
            }
            continue;
        }
        match byte {
            b'"' => inside_string = true,
            b'{' => depth += 1,
            // `depth` is at least 1 here: the first byte is `{` and the scan
            // returns as soon as the outermost object closes.
            b'}' if depth == 1 => return Some(idx + 1),
            b'}' => depth -= 1,
            _ => {}
        }
    }
    None
}
/// Explains why every captured probe event was dropped by the stitch step.
///
/// The explanation is chosen from the trigger fire count and, when the trigger
/// did fire, from the exit kind snapshot. Known exit kinds use static text; an
/// unrecognized kind yields an owned string that includes its numeric value.
fn stitch_drop_cause(
    skeleton: &crate::probe::process::ProbeDiagnostics,
) -> std::borrow::Cow<'static, str> {
    use crate::probe::scx_defs::{EXIT_ERROR, EXIT_ERROR_BPF, EXIT_ERROR_STALL};

    if skeleton.bpf_trigger_fires == 0 {
        return std::borrow::Cow::Borrowed(
            "trigger never fired (timing race or scheduler clean-exited; \
             no error-class sched_ext_exit observed)",
        );
    }

    let exit_kind = skeleton.bpf_exit_kind_snap as u64;
    let explanation: &'static str = match exit_kind {
        EXIT_ERROR_STALL => {
            "trigger fired with kind=STALL (no causal task; pre-trigger events \
             suppressed because watchdog-context exit lacks a current task)"
        }
        EXIT_ERROR => {
            "trigger fired with kind=ERROR (no current task at exit time; \
             pre-trigger events suppressed because generic ERROR can fire \
             from kworker context where `current` is not the causal task)"
        }
        EXIT_ERROR_BPF => {
            "trigger fired with kind=BPF_ERROR but stitch found no matching \
             task_ptr (suspected ID mismatch or func_idx_offset bug — file a ticket)"
        }
        other => {
            return std::borrow::Cow::Owned(format!(
                "trigger fired but exit kind {other} is unrecognized; \
                 pre-trigger events suppressed because no causal task \
                 was identified (map value via include/linux/sched/ext.h)"
            ));
        }
    };
    std::borrow::Cow::Borrowed(explanation)
}
/// Renders the "probe pipeline" diagnostics block.
///
/// The block is a banner followed by one line per pipeline stage: extraction,
/// traceability filter, BPF discovery, expansion, kprobe/fentry attachment,
/// trigger state, probe data and event counts, plus BPF counters when any is
/// non-zero. Every line, including the last, ends with a newline.
pub(crate) fn format_probe_diagnostics(
    pipeline: &PipelineDiagnostics,
    skeleton: &crate::probe::process::ProbeDiagnostics,
) -> String {
    // Lines are collected without their terminator and joined at the end.
    let mut lines: Vec<String> = Vec::new();
    lines.push("--- probe pipeline ---".to_string());
    lines.push(format!(
        "  extracted:    {} functions from crash backtrace",
        pipeline.stack_extracted,
    ));

    let dropped = &pipeline.filter_dropped;
    let passed = (pipeline.stack_extracted as usize).saturating_sub(dropped.len());
    lines.push(if dropped.is_empty() {
        format!("  traceable:    {passed} passed filter")
    } else {
        format!(
            "  traceable:    {passed} passed, {} dropped: {}",
            dropped.len(),
            dropped.join(", "),
        )
    });

    lines.push(format!(
        "  bpf_discover: {} programs found",
        pipeline.bpf_discovered,
    ));
    lines.push(format!(
        "  after_expand: {} total probe targets",
        pipeline.total_after_expand,
    ));

    lines.push(if skeleton.kprobe_attach_failed.is_empty() {
        format!("  kprobes:      {} attached", skeleton.kprobe_attached)
    } else {
        let failures: Vec<String> = skeleton
            .kprobe_attach_failed
            .iter()
            .map(|(n, e)| format!("{n} ({e})"))
            .collect();
        format!(
            "  kprobes:      {} attached, {} failed: {}",
            skeleton.kprobe_attached,
            skeleton.kprobe_attach_failed.len(),
            failures.join(", "),
        )
    });

    if !skeleton.kprobe_resolve_failed.is_empty() {
        lines.push(format!(
            "  kprobe_miss:  {} unresolved: {}",
            skeleton.kprobe_resolve_failed.len(),
            skeleton.kprobe_resolve_failed.join(", "),
        ));
    }

    // The fentry line is shown only when there were fentry candidates.
    if skeleton.fentry_candidates > 0 {
        lines.push(if skeleton.fentry_attach_failed.is_empty() {
            format!("  fentry:       {} attached", skeleton.fentry_attached)
        } else {
            let failures: Vec<String> = skeleton
                .fentry_attach_failed
                .iter()
                .map(|(n, e)| format!("{n} ({e})"))
                .collect();
            format!(
                "  fentry:       {} attached, {} failed: {}",
                skeleton.fentry_attached,
                skeleton.fentry_attach_failed.len(),
                failures.join(", "),
            )
        });
    }

    let trigger_type = if skeleton.trigger_type.is_empty() {
        "unknown"
    } else {
        &skeleton.trigger_type
    };
    lines.push(match skeleton.trigger_attach_error {
        Some(ref err) => format!("  trigger:      attach failed ({err})"),
        None => {
            let state = if skeleton.trigger_fired {
                "fired"
            } else {
                "not fired"
            };
            format!("  trigger:      {} ({})", state, trigger_type)
        }
    });

    if let Some(ref panic_msg) = skeleton.host_thread_panic {
        lines.push(format!(
            "  ERROR: probe-collection thread panicked: {panic_msg}"
        ));
    }

    lines.push(format!(
        "  probe_data:   {} keys, {} unmatched IPs",
        skeleton.probe_data_keys, skeleton.probe_data_unmatched_ips,
    ));

    let mut events_line = format!(
        "  events:       {} captured, {} after stitch",
        skeleton.events_before_stitch, skeleton.events_after_stitch,
    );
    if skeleton.events_before_stitch > 0 && skeleton.events_after_stitch == 0 {
        // Everything captured was dropped by stitching: say why.
        events_line.push_str(" — ");
        events_line.push_str(&stitch_drop_cause(skeleton));
    } else if skeleton.stitch_fallback_used {
        events_line.push_str(" — trigger absent, grouped by task_ptr frequency (best-effort)");
    }
    lines.push(events_line);

    let any_bpf_counter = skeleton.bpf_kprobe_fires > 0
        || skeleton.bpf_trigger_fires > 0
        || skeleton.bpf_meta_misses > 0;
    if any_bpf_counter {
        lines.push(format!(
            "  bpf_counts:   {} kprobe fires, {} trigger fires, {} meta misses",
            skeleton.bpf_kprobe_fires, skeleton.bpf_trigger_fires, skeleton.bpf_meta_misses,
        ));
        if !skeleton.bpf_miss_ips.is_empty() {
            let ips: Vec<String> = skeleton
                .bpf_miss_ips
                .iter()
                .map(|ip| format!("0x{ip:x}"))
                .collect();
            lines.push(format!("  miss_ips:     {}", ips.join(", ")));
        }
    }

    let mut out = lines.join("\n");
    out.push('\n');
    out
}
/// Dispatches a VM test based on this process's command-line arguments.
///
/// Thin wrapper that collects `std::env::args()` and defers to
/// `maybe_dispatch_vm_test_with_args`.
pub(crate) fn maybe_dispatch_vm_test() -> Option<i32> {
    let argv = std::env::args().collect::<Vec<String>>();
    maybe_dispatch_vm_test_with_args(argv.as_slice())
}
/// Gathers the pieces needed to build the scenario context for `entry`.
///
/// Returns, in order: the test topology (read from the system, falling back
/// to the entry's VM topology on error), a cgroup manager rooted at the
/// cgroup root resolved from `args`, the scheduler pid if any, and the
/// assert set produced by merging default checks with the scheduler's and the
/// entry's asserts.
fn build_dispatch_ctx_parts(
    entry: &KtstrTestEntry,
    args: &[String],
) -> (
    crate::topology::TestTopology,
    crate::cgroup::CgroupManager,
    Option<libc::pid_t>,
    crate::assert::Assert,
) {
    let topo = crate::topology::TestTopology::from_system().unwrap_or_else(|e| {
        eprintln!("ktstr_test: topology from sysfs failed ({e}), using VM spec fallback");
        crate::topology::TestTopology::from_vm_topology(&entry.topology)
    });

    let cgroup_root = resolve_cgroup_root(args);
    let cgroups = crate::cgroup::CgroupManager::new(&cgroup_root);

    let sched_pid = crate::vmm::rust_init::sched_pid();

    let defaults = crate::assert::Assert::default_checks();
    let merged_assert = defaults.merge(&entry.scheduler.assert).merge(&entry.assert);

    (topo, cgroups, sched_pid, merged_assert)
}
/// Runs the test function named by `--ktstr-test-fn` in `args`, if present.
///
/// Returns `None` when no test function was requested, `Some(1)` when the
/// name is unknown or the test function returns an error, and otherwise
/// `Some` of the exit code derived from the test's result. The result is
/// always published via `publish_result_and_collect` once the test function
/// has run.
pub(crate) fn maybe_dispatch_vm_test_with_args(args: &[String]) -> Option<i32> {
    let Some(name) = extract_test_fn_arg(args) else {
        tracing::debug!("ktstr-init: no --ktstr-test-fn in args, skipping dispatch");
        return None;
    };
    let Some(entry) = find_test(name) else {
        eprintln!("ktstr_test: unknown test function '{name}'");
        return Some(1);
    };

    let probe_stack = extract_probe_stack_arg(args);

    // An unrecognized work type is reported on stderr and then ignored.
    let work_type_override = match extract_work_type_arg(args) {
        None => None,
        Some(s) => {
            let parsed = crate::workload::WorkType::from_name(&s);
            if parsed.is_none() {
                match crate::workload::WorkType::suggest(&s) {
                    Some(canonical) => eprintln!(
                        "ktstr_test: unknown work type '{s}'; did you mean \
                         '{canonical}'? Valid types: {:?}",
                        crate::workload::WorkType::ALL_NAMES,
                    ),
                    None => eprintln!(
                        "ktstr_test: unknown work type '{s}'. Valid types: {:?}",
                        crate::workload::WorkType::ALL_NAMES,
                    ),
                }
            }
            parsed
        }
    };

    // Probes are set up before the scenario context is built and the test runs.
    let pipeline = ProbePipeline::new();
    let probe_stop = pipeline.stop.clone();
    let probe_handle: Option<ProbeHandle> = match probe_stack.as_ref() {
        Some(stack_input) => setup_probe_handle(stack_input, &pipeline),
        None => None,
    };

    let (topo, cgroups, sched_pid, merged_assert) = build_dispatch_ctx_parts(entry, args);
    let ctx = crate::scenario::Ctx::builder(&cgroups, &topo)
        .duration(entry.duration)
        .sched_pid(sched_pid)
        .settle(Duration::ZERO)
        .work_type_override(work_type_override)
        .assert(merged_assert)
        .wait_for_map_write(!entry.bpf_map_write.is_empty())
        .entry_name(entry.name)
        .build();

    if crate::vmm::guest_comms::is_guest() {
        crate::vmm::guest_comms::send_scenario_start();
    }

    // An error from the test function becomes a failing result with exit code 1.
    let (result, exit_code) = match (entry.func)(&ctx) {
        Ok(r) => {
            let code = exit_code_for_result(&r);
            (r, code)
        }
        Err(e) => (AssertResult::fail_msg(format!("{e:#}")), 1),
    };
    publish_result_and_collect(&result, probe_stop, probe_handle);
    Some(exit_code)
}
/// Loads the probe stack, spawns the probe thread and waits until it signals
/// readiness.
///
/// `stack_input` is the value handed to `load_probe_stack`. `pipeline`
/// supplies the stop flag and the `probes_ready` / `output_done` latches
/// shared with the spawned thread.
///
/// Returns `None` when no functions remain after filtering and expansion;
/// otherwise a `ProbeHandle` owning the thread's join handle and the metadata
/// computed here.
fn setup_probe_handle(stack_input: &str, pipeline: &ProbePipeline) -> Option<ProbeHandle> {
use crate::probe::stack::load_probe_stack;
eprintln!("ktstr_test: probe: loading probe stack from --ktstr-probe-stack");
let mut pipe_diag = PipelineDiagnostics::default();
let raw_functions = load_probe_stack(stack_input);
pipe_diag.stack_extracted = raw_functions.len() as u32;
// Remember the names before filtering so the dropped ones can be reported.
let pre_filter: Vec<String> = raw_functions.iter().map(|f| f.raw_name.clone()).collect();
let mut functions = crate::probe::stack::filter_traceable(raw_functions);
for name in &pre_filter {
if !functions.iter().any(|f| f.raw_name == *name) {
pipe_diag.filter_dropped.push(name.clone());
}
}
// BPF symbol discovery is given the display names of the BPF functions in
// the stack.
let stack_display_names: Vec<&str> = functions
.iter()
.filter(|f| f.is_bpf)
.map(|f| f.display_name.as_str())
.collect();
let bpf_syms = crate::probe::btf::discover_bpf_symbols(&stack_display_names);
pipe_diag.bpf_discovered = bpf_syms.len() as u32;
if !bpf_syms.is_empty() {
eprintln!(
"ktstr_test: probe: {} BPF symbols discovered",
bpf_syms.len()
);
functions.extend(bpf_syms);
}
let functions = crate::probe::stack::expand_bpf_to_kernel_callers(functions);
pipe_diag.total_after_expand = functions.len() as u32;
if functions.is_empty() {
eprintln!("ktstr_test: no traceable functions from --ktstr-probe-stack");
return None;
}
eprintln!(
"ktstr_test: probe: {} functions loaded, spawning probe thread",
functions.len()
);
// BTF info is parsed separately for kernel functions (by raw name) and for
// BPF functions that have a program id (by display name).
let kernel_names: Vec<&str> = functions
.iter()
.filter(|f| !f.is_bpf)
.map(|f| f.raw_name.as_str())
.collect();
let mut btf_funcs = crate::probe::btf::parse_btf_functions(&kernel_names, None);
let bpf_btf_args: Vec<(&str, u32)> = functions
.iter()
.filter(|f| f.is_bpf)
.filter_map(|f| Some((f.display_name.as_str(), f.bpf_prog_id?)))
.collect();
if !bpf_btf_args.is_empty() {
btf_funcs.extend(crate::probe::btf::parse_bpf_btf_functions(&bpf_btf_args));
}
// Index-to-display-name table, in the order of `functions`.
let func_names: Vec<(u32, String)> = functions
.iter()
.enumerate()
.map(|(i, f)| (i as u32, f.display_name.clone()))
.collect();
let bpf_fds = crate::probe::process::open_bpf_prog_fds(&functions);
let pnames = crate::probe::output::build_param_names(&btf_funcs);
let rhints = crate::probe::output::build_render_hints(&btf_funcs);
// Clones moved into the probe thread; the originals go into the returned
// `ProbeHandle`.
let pnames_thread = pnames.clone();
let rhints_thread = rhints.clone();
let thread_pipeline = pipeline.clone();
let funcs = functions.clone();
let fn_names = func_names.clone();
let pd = pipe_diag.clone();
let handle = std::thread::spawn(move || {
use crate::probe::process::run_probe_skeleton;
let (events, diag, accumulated_fn_names) = run_probe_skeleton(
&funcs,
&btf_funcs,
&thread_pipeline.stop,
&bpf_fds,
&thread_pipeline.probes_ready,
None,
);
// Prefer the names accumulated by the skeleton; fall back to the table
// built before the thread started.
let emit_fn_names = if accumulated_fn_names.is_empty() {
&fn_names
} else {
&accumulated_fn_names
};
emit_probe_payload(
events.as_deref().unwrap_or(&[]),
emit_fn_names,
&pd,
&diag,
&pnames_thread,
&rhints_thread,
);
// Signal that the payload has been emitted.
thread_pipeline.output_done.set();
(events, diag, accumulated_fn_names)
});
// Block until `probes_ready` is signalled. NOTE(review): presumably set
// inside `run_probe_skeleton`, which receives the latch — confirm it is
// also set on its failure paths, otherwise this waits forever.
pipeline.probes_ready.wait();
Some(ProbeHandle {
thread: handle,
func_names,
pipeline_diag: pipe_diag,
output_done: pipeline.output_done.clone(),
param_names: pnames,
render_hints: rhints,
})
}
/// Value returned by the probe thread: the events (if any), the skeleton's
/// diagnostics, and the accumulated `(index, function name)` table, as
/// produced by `run_probe_skeleton`.
type ProbeThreadResult = (
Option<Vec<crate::probe::process::ProbeEvent>>,
crate::probe::process::ProbeDiagnostics,
Vec<(u32, String)>,
);
/// Handle to a running probe thread plus the metadata computed while setting
/// it up; built by `setup_probe_handle`.
struct ProbeHandle {
/// Join handle of the spawned probe thread.
thread: std::thread::JoinHandle<ProbeThreadResult>,
/// `(index, display name)` for each probed function.
func_names: Vec<(u32, String)>,
/// Counters gathered while preparing the probe stack.
pipeline_diag: PipelineDiagnostics,
/// Latch the probe thread sets after emitting its payload.
output_done: std::sync::Arc<crate::sync::Latch>,
/// Result of `build_param_names` for the parsed BTF functions.
param_names: std::collections::HashMap<String, Vec<(String, String)>>,
/// Result of `build_render_hints` for the parsed BTF functions.
render_hints: std::collections::HashMap<String, crate::probe::btf::RenderHint>,
}
/// Synchronization primitives shared between the dispatching thread and the
/// probe thread. Cloning shares the same underlying `Arc`s.
#[derive(Clone, Default)]
pub(crate) struct ProbePipeline {
/// Stop flag handed to `run_probe_skeleton`.
pub stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
/// Set by the probe thread once the probe payload has been emitted.
pub output_done: std::sync::Arc<crate::sync::Latch>,
/// Handed to `run_probe_skeleton`; the spawning thread waits on it before
/// continuing.
pub probes_ready: std::sync::Arc<crate::sync::Latch>,
}
impl ProbePipeline {
/// Creates a pipeline with all fields at their `Default` values.
pub fn new() -> Self {
Self::default()
}
}
/// Counters describing how the probe stack was narrowed and expanded before
/// probes were attached; rendered by `format_probe_diagnostics`.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub(crate) struct PipelineDiagnostics {
/// Number of functions returned by `load_probe_stack`.
pub stack_extracted: u32,
/// Raw names that were present before `filter_traceable` and absent after.
pub filter_dropped: Vec<String>,
/// Number of symbols returned by BPF symbol discovery.
pub bpf_discovered: u32,
/// Number of probe targets after expansion.
pub total_after_expand: u32,
}
/// State returned by `start_probe_phase_a`.
///
/// NOTE(review): the construction site is outside the visible source; the
/// field notes below are inferred from the field types and the visible part
/// of `start_probe_phase_a` — confirm against the full function.
pub(crate) struct ProbePhaseAState {
/// Join handle of the probe thread.
pub handle: std::thread::JoinHandle<ProbeThreadResult>,
/// Sending half of the channel whose receiver is given to the probe thread.
pub phase_b_tx: std::sync::mpsc::Sender<crate::probe::process::PhaseBInput>,
/// Stop flag and latches shared with the probe thread.
pub pipeline: ProbePipeline,
/// Presumably the `(index, display name)` table of the kernel functions.
pub kernel_func_names: Vec<(u32, String)>,
/// Presumably the number of kernel functions — TODO confirm.
pub kernel_func_count: u32,
/// Counters gathered while preparing the probe stack.
pub pipe_diag: PipelineDiagnostics,
/// Presumably the result of `build_param_names`.
pub param_names: std::collections::HashMap<String, Vec<(String, String)>>,
/// Presumably the result of `build_render_hints`.
pub render_hints: std::collections::HashMap<String, crate::probe::btf::RenderHint>,
}
/// Starts Phase A of the probe pipeline: attaching to kernel functions only.
///
/// Returns `None` when `args` contain no probe-stack argument. Otherwise:
///
/// 1. loads the probe stack and filters it with `filter_traceable`, recording
///    the pre-filter count and the dropped raw names in the diagnostics;
/// 2. keeps the non-BPF functions and parses their BTF information;
/// 3. spawns the probe thread, which runs `run_probe_skeleton`, emits the
///    probe payload when the skeleton returns, and then sets `output_done`;
/// 4. waits on the `probes_ready` latch before returning.
///
/// The returned [`ProbePhaseAState`] carries the sender for the Phase B input
/// together with everything needed to later join the thread.
pub(crate) fn start_probe_phase_a(args: &[String]) -> Option<ProbePhaseAState> {
    use crate::probe::stack::{filter_traceable, load_probe_stack};

    let stack_input = extract_probe_stack_arg(args)?;
    let phase_a_start = Instant::now();
    eprintln!("ktstr_test: probe phase_a: loading kernel functions");

    let mut pipe_diag = PipelineDiagnostics::default();
    let raw_functions = load_probe_stack(&stack_input);
    pipe_diag.stack_extracted = raw_functions.len() as u32;
    let pre_filter: Vec<String> = raw_functions.iter().map(|f| f.raw_name.clone()).collect();
    let functions = filter_traceable(raw_functions);
    {
        // One hash lookup per original name instead of rescanning the
        // filtered list for each of them. Order and duplicates of the
        // dropped names are preserved.
        let surviving: std::collections::HashSet<&str> =
            functions.iter().map(|f| f.raw_name.as_str()).collect();
        pipe_diag.filter_dropped.extend(
            pre_filter
                .into_iter()
                .filter(|name| !surviving.contains(name.as_str())),
        );
    }

    let kernel_functions: Vec<crate::probe::stack::StackFunction> =
        functions.into_iter().filter(|f| !f.is_bpf).collect();
    // Captured up front so the vector itself can be moved into the thread.
    let kernel_func_len = kernel_functions.len();
    let btf_funcs = {
        let kernel_names: Vec<&str> = kernel_functions
            .iter()
            .map(|f| f.raw_name.as_str())
            .collect();
        crate::probe::btf::parse_btf_functions(&kernel_names, None)
    };
    let func_names: Vec<(u32, String)> = kernel_functions
        .iter()
        .enumerate()
        .map(|(i, f)| (i as u32, f.display_name.clone()))
        .collect();
    pipe_diag.total_after_expand = kernel_func_len as u32;

    // Phase A attaches kernel functions only, so no BPF program fds exist yet.
    let bpf_fds = std::collections::HashMap::new();
    let param_names = crate::probe::output::build_param_names(&btf_funcs);
    let render_hints = crate::probe::output::build_render_hints(&btf_funcs);

    let pipeline = ProbePipeline::new();
    let (phase_b_tx, phase_b_rx) = std::sync::mpsc::channel();

    // These values are needed both by the thread and by the returned state,
    // so the thread gets its own copies. `kernel_functions` and `btf_funcs`
    // are not needed here afterwards and are moved into the thread instead.
    let thread_pipeline = pipeline.clone();
    let fn_names = func_names.clone();
    let pd = pipe_diag.clone();
    let pnames = param_names.clone();
    let rhints = render_hints.clone();
    let handle = std::thread::spawn(move || {
        let (events, diag, accumulated_fn_names) = crate::probe::process::run_probe_skeleton(
            &kernel_functions,
            &btf_funcs,
            &thread_pipeline.stop,
            &bpf_fds,
            &thread_pipeline.probes_ready,
            Some(phase_b_rx),
        );
        // Prefer the list returned by the skeleton; fall back to the
        // Phase A kernel list when it is empty.
        let emit_fn_names = if accumulated_fn_names.is_empty() {
            &fn_names
        } else {
            &accumulated_fn_names
        };
        emit_probe_payload(
            events.as_deref().unwrap_or(&[]),
            emit_fn_names,
            &pd,
            &diag,
            &pnames,
            &rhints,
        );
        thread_pipeline.output_done.set();
        (events, diag, accumulated_fn_names)
    });

    pipeline.probes_ready.wait();
    tracing::info!(
        elapsed_ms = phase_a_start.elapsed().as_millis() as u64,
        kernel_functions = kernel_func_len,
        "auto_repro: phase_a_attach",
    );
    eprintln!(
        "ktstr_test: probe phase_a: {} kernel functions attached, waiting for Phase B",
        kernel_func_len,
    );

    Some(ProbePhaseAState {
        handle,
        phase_b_tx,
        pipeline,
        kernel_func_names: func_names,
        kernel_func_count: kernel_func_len as u32,
        pipe_diag,
        param_names,
        render_hints,
    })
}
/// Runs the test named in `args` with the Phase A probes already attached,
/// performing Phase B (BPF symbol discovery and attachment) first.
///
/// Returns:
/// - `None` when `args` name no test function (nothing is dispatched);
/// - `Some(1)` when the named test is unknown or the test function returns
///   an error;
/// - `Some(code)` from `exit_code_for_result` otherwise.
///
/// An unrecognized work-type argument is reported on stderr and then ignored.
pub(crate) fn maybe_dispatch_vm_test_with_phase_a(
    args: &[String],
    pa: ProbePhaseAState,
) -> Option<i32> {
    use crate::probe::btf::discover_bpf_symbols;

    let Some(name) = extract_test_fn_arg(args) else {
        tracing::debug!("ktstr-init: no --ktstr-test-fn in args, skipping dispatch");
        return None;
    };
    let Some(entry) = find_test(name) else {
        eprintln!("ktstr_test: unknown test function '{name}'");
        return Some(1);
    };

    let work_type_override = extract_work_type_arg(args).and_then(|s| {
        let resolved = crate::workload::WorkType::from_name(&s);
        if resolved.is_none() {
            // Unknown name: explain on stderr, then carry on without an override.
            match crate::workload::WorkType::suggest(&s) {
                Some(canonical) => eprintln!(
                    "ktstr_test: unknown work type '{s}'; did you mean \
                     '{canonical}'? Valid types: {:?}",
                    crate::workload::WorkType::ALL_NAMES,
                ),
                None => eprintln!(
                    "ktstr_test: unknown work type '{s}'. Valid types: {:?}",
                    crate::workload::WorkType::ALL_NAMES,
                ),
            }
        }
        resolved
    });

    let ProbePhaseAState {
        handle: probe_thread,
        phase_b_tx,
        pipeline,
        kernel_func_names,
        kernel_func_count,
        pipe_diag: mut diag,
        param_names,
        render_hints,
    } = pa;

    eprintln!("ktstr_test: probe phase_b: discovering BPF symbols");
    let discover_start = Instant::now();
    let stack_display_names: Vec<&str> = Vec::new();
    let bpf_syms = discover_bpf_symbols(&stack_display_names);
    let discovered = bpf_syms.len();

    if discovered == 0 {
        let sched_alive = match crate::vmm::rust_init::sched_pid() {
            // SAFETY: `kill` with signal 0 sends no signal; it only reports
            // whether the pid can be signalled.
            Some(pid) => unsafe { libc::kill(pid, 0) == 0 },
            None => false,
        };
        if sched_alive {
            tracing::warn!(
                "phase_b: bpf_discover returned 0 programs while scheduler is \
                 still alive — verify ProgInfoIter access permissions or BTF \
                 (this is the unexpected case; the auto-repro pipeline is now \
                 attached to no BPF struct_ops callbacks)"
            );
        } else {
            tracing::info!(
                "phase_b: bpf_discover returned 0 programs — scheduler exited \
                 before the discovery window (expected for fast-crash paths)"
            );
        }
    }

    diag.bpf_discovered = discovered as u32;
    tracing::info!(
        elapsed_ms = discover_start.elapsed().as_millis() as u64,
        bpf_syms = discovered,
        "auto_repro: phase_b_discover",
    );
    eprintln!(
        "ktstr_test: probe phase_b: {} BPF symbols discovered",
        discovered
    );
    run_phase_b_attach(bpf_syms, phase_b_tx, kernel_func_count, &mut diag);

    let (topo, cgroups, sched_pid, merged_assert) = build_dispatch_ctx_parts(entry, args);
    let ctx = crate::scenario::Ctx::builder(&cgroups, &topo)
        .duration(entry.duration)
        .sched_pid(sched_pid)
        .settle(Duration::ZERO)
        .work_type_override(work_type_override)
        .assert(merged_assert)
        .wait_for_map_write(!entry.bpf_map_write.is_empty())
        .entry_name(entry.name)
        .build();

    let stop = pipeline.stop.clone();
    let handle = ProbeHandle {
        thread: probe_thread,
        func_names: kernel_func_names,
        pipeline_diag: diag,
        output_done: pipeline.output_done,
        param_names,
        render_hints,
    };

    if crate::vmm::guest_comms::is_guest() {
        crate::vmm::guest_comms::send_scenario_start();
    }

    // A test-function error becomes a failing result with exit code 1; both
    // outcomes are then published through the same path.
    let (result, exit_code) = match (entry.func)(&ctx) {
        Ok(r) => {
            let code = exit_code_for_result(&r);
            (r, code)
        }
        Err(e) => (AssertResult::fail_msg(format!("{e:#}")), 1),
    };
    publish_result_and_collect(&result, stop, Some(handle));
    Some(exit_code)
}
/// Sends the Phase B work (BPF programs plus their kernel callers) to the
/// probe thread and waits until the thread signals completion.
///
/// - `bpf_syms`: symbols found by BPF discovery; when empty, the sender is
///   dropped and nothing else happens.
/// - `phase_b_tx`: channel to the probe thread.
/// - `kernel_func_count`: index offset applied to Phase B functions.
/// - `pipe_diag`: `total_after_expand` is increased (saturating) by the
///   number of expanded functions.
///
/// A failed send is reported on stderr and not waited on.
fn run_phase_b_attach(
    bpf_syms: Vec<crate::probe::stack::StackFunction>,
    phase_b_tx: std::sync::mpsc::Sender<crate::probe::process::PhaseBInput>,
    kernel_func_count: u32,
    pipe_diag: &mut PipelineDiagnostics,
) {
    use crate::probe::stack::expand_bpf_to_kernel_callers;

    if bpf_syms.is_empty() {
        eprintln!("ktstr_test: probe phase_b: no BPF symbols, skipping fentry");
        drop(phase_b_tx);
        return;
    }

    let phase_b_functions = expand_bpf_to_kernel_callers(bpf_syms);
    let expanded_count = phase_b_functions.len();
    pipe_diag.total_after_expand = pipe_diag
        .total_after_expand
        .saturating_add(expanded_count as u32);
    let bpf_fds = crate::probe::process::open_bpf_prog_fds(&phase_b_functions);

    // Split the expanded list in one pass: BPF programs that carry a program
    // id on one side, kernel callers on the other.
    let mut bpf_btf_args: Vec<(&str, u32)> = Vec::new();
    let mut kernel_caller_names: Vec<&str> = Vec::new();
    for f in phase_b_functions.iter() {
        if f.is_bpf {
            if let Some(prog_id) = f.bpf_prog_id {
                bpf_btf_args.push((f.display_name.as_str(), prog_id));
            }
        } else {
            kernel_caller_names.push(f.raw_name.as_str());
        }
    }

    let mut phase_b_btf = if bpf_btf_args.is_empty() {
        Vec::new()
    } else {
        crate::probe::btf::parse_bpf_btf_functions(&bpf_btf_args)
    };
    if !kernel_caller_names.is_empty() {
        let kernel_btf = crate::probe::btf::parse_btf_functions(&kernel_caller_names, None);
        phase_b_btf.extend(kernel_btf);
    }

    let phase_b_done = std::sync::Arc::new(crate::sync::Latch::new());
    let phase_b_input = crate::probe::process::PhaseBInput {
        functions: phase_b_functions,
        bpf_prog_fds: bpf_fds,
        btf_funcs: phase_b_btf,
        done: phase_b_done.clone(),
        func_idx_offset: kernel_func_count,
    };

    let attach_start = Instant::now();
    match phase_b_tx.send(phase_b_input) {
        Ok(()) => {
            phase_b_done.wait();
            tracing::info!(
                elapsed_ms = attach_start.elapsed().as_millis() as u64,
                phase_b_functions = expanded_count,
                "auto_repro: phase_b_attach",
            );
            eprintln!("ktstr_test: probe phase_b: BPF fentry attached");
        }
        Err(e) => eprintln!("ktstr_test: probe phase_b: failed to send: {e}"),
    }
}
/// Probe payload that `emit_probe_payload` serializes to JSON and prints
/// between the `PROBE_OUTPUT_START` / `PROBE_OUTPUT_END` marker lines.
#[derive(serde::Serialize, serde::Deserialize)]
pub(crate) struct ProbeBytes {
    /// Captured probe events.
    pub events: Vec<crate::probe::process::ProbeEvent>,
    /// `(index, name)` pairs for the probed functions.
    pub func_names: Vec<(u32, String)>,
    /// Result of `resolve_bpf_source_locs`; left empty when there are no events.
    pub bpf_source_locs: std::collections::HashMap<String, String>,
    /// Pipeline and skeleton diagnostics; `emit_probe_payload` always fills this in.
    pub diagnostics: Option<ProbeBytesDiagnostics>,
    /// Value of `get_nr_cpus()` at emission time.
    pub nr_cpus: Option<u32>,
    /// Output of `build_param_names`, passed through unchanged.
    pub param_names: std::collections::HashMap<String, Vec<(String, String)>>,
    /// Output of `build_render_hints`, passed through unchanged.
    pub render_hints: std::collections::HashMap<String, crate::probe::btf::RenderHint>,
}
/// Diagnostics bundled into a [`ProbeBytes`] payload.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub(crate) struct ProbeBytesDiagnostics {
    /// Diagnostics from building the list of probed functions.
    pub pipeline: PipelineDiagnostics,
    /// Diagnostics reported by the probe skeleton (or synthesized when the
    /// probe thread panicked).
    pub skeleton: crate::probe::process::ProbeDiagnostics,
}
/// Serializes the probe results as JSON and prints them to stdout, framed by
/// the `PROBE_OUTPUT_START` and `PROBE_OUTPUT_END` marker lines.
///
/// - `events`: captured probe events; when empty, BPF source-location
///   resolution is skipped and `bpf_source_locs` is left empty.
/// - `func_names`: `(index, name)` pairs of the probed functions.
/// - `pipeline_diag` / `skeleton_diag`: diagnostics embedded in the payload.
/// - `param_names` / `render_hints`: passed through into the payload.
///
/// The markers and the JSON line are written with a single `println!`, so
/// output printed concurrently by other threads cannot land between the
/// markers. If serialization fails, the two markers are still printed (with
/// nothing between them) and the error is reported on stderr.
fn emit_probe_payload(
    events: &[crate::probe::process::ProbeEvent],
    func_names: &[(u32, String)],
    pipeline_diag: &PipelineDiagnostics,
    skeleton_diag: &crate::probe::process::ProbeDiagnostics,
    param_names: &std::collections::HashMap<String, Vec<(String, String)>>,
    render_hints: &std::collections::HashMap<String, crate::probe::btf::RenderHint>,
) {
    let bpf_source_locs = if events.is_empty() {
        std::collections::HashMap::new()
    } else {
        let source_loc_names: Vec<&str> =
            func_names.iter().map(|(_, name)| name.as_str()).collect();
        let bpf_syms = crate::probe::btf::discover_bpf_symbols(&source_loc_names);
        // For each probed function, take the program id of the first
        // discovered symbol with the same display name, if it has one.
        let bpf_prog_ids: Vec<u32> = func_names
            .iter()
            .filter_map(|(_, name)| {
                bpf_syms
                    .iter()
                    .find(|s| s.display_name == *name)
                    .and_then(|s| s.bpf_prog_id)
            })
            .collect();
        crate::probe::btf::resolve_bpf_source_locs(&bpf_prog_ids)
    };

    let payload = ProbeBytes {
        events: events.to_vec(),
        func_names: func_names.to_vec(),
        bpf_source_locs,
        diagnostics: Some(ProbeBytesDiagnostics {
            pipeline: pipeline_diag.clone(),
            skeleton: skeleton_diag.clone(),
        }),
        nr_cpus: crate::probe::output::get_nr_cpus(),
        param_names: param_names.clone(),
        render_hints: render_hints.clone(),
    };

    // Build the whole frame first so it reaches stdout in one `println!`.
    let framed = match serde_json::to_string(&payload) {
        Ok(json) => format!("{PROBE_OUTPUT_START}\n{json}\n{PROBE_OUTPUT_END}"),
        Err(e) => {
            eprintln!("ktstr_test: probe payload serialization failed: {e}");
            format!("{PROBE_OUTPUT_START}\n{PROBE_OUTPUT_END}")
        }
    };
    println!("{framed}");
}
/// Arguments for a probe collection that has been postponed; stored until
/// `finalize_probe_after_unwind` picks them up.
struct DeferredProbe {
    /// Stop flag to raise when the collection finally runs.
    stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
    /// Probe thread handle, if probes were started.
    handle: Option<ProbeHandle>,
}
/// Single slot holding the postponed probe collection, if any.
/// Written by `stash_deferred_probe`, emptied by `take_deferred_probe`.
static DEFERRED_PROBE_COLLECT: std::sync::Mutex<Option<DeferredProbe>> =
    std::sync::Mutex::new(None);
/// Stores `stop` and `handle` in the deferred-probe slot, replacing whatever
/// was stashed before.
///
/// # Panics
/// Panics if the slot's mutex is poisoned.
fn stash_deferred_probe(
    stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
    handle: Option<ProbeHandle>,
) {
    let deferred = DeferredProbe { stop, handle };
    let mut slot = DEFERRED_PROBE_COLLECT.lock().unwrap();
    *slot = Some(deferred);
}
/// Removes and returns the stashed deferred probe, leaving the slot empty.
/// Returns `None` when nothing was stashed.
///
/// # Panics
/// Panics if the slot's mutex is poisoned.
fn take_deferred_probe() -> Option<DeferredProbe> {
    let mut slot = DEFERRED_PROBE_COLLECT.lock().unwrap();
    std::mem::take(&mut *slot)
}
/// Waits up to `timeout` for the sched_ext state file to read `disabled`.
/// See `wait_for_sched_disabled_at` for the return-value contract.
fn wait_for_sched_disabled(timeout: std::time::Duration) -> bool {
    const SCHED_EXT_STATE_PATH: &str = "/sys/kernel/sched_ext/state";
    wait_for_sched_disabled_at(SCHED_EXT_STATE_PATH, timeout)
}
/// Polls the file at `path` until its trimmed contents equal `disabled`.
///
/// Returns `true` as soon as the file reads `disabled`. Returns `false` when
/// the file cannot be read (checked on every poll) or when `timeout` elapses
/// first. The file is always read at least once, so a zero timeout performs
/// exactly one check. Polls every 50 ms.
///
/// A `timeout` so large that the deadline cannot be represented as an
/// `Instant` (e.g. `Duration::MAX`) is treated as "no deadline" instead of
/// panicking on the addition.
fn wait_for_sched_disabled_at(path: &str, timeout: std::time::Duration) -> bool {
    const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(50);

    // `Instant + Duration` panics on overflow; `checked_add` yields `None`,
    // which below simply means the deadline is never reached.
    let deadline = std::time::Instant::now().checked_add(timeout);
    loop {
        match std::fs::read_to_string(path) {
            Ok(state) if state.trim() == "disabled" => return true,
            Ok(_) => {}
            // An unreadable state file ends the wait immediately.
            Err(_) => return false,
        }
        if deadline.is_some_and(|d| std::time::Instant::now() >= d) {
            return false;
        }
        std::thread::sleep(POLL_INTERVAL);
    }
}
pub(crate) fn finalize_probe_after_unwind() {
let Some(deferred) = take_deferred_probe() else {
return;
};
if deferred.handle.is_some() {
let _ = wait_for_sched_disabled(std::time::Duration::from_secs(5));
}
collect_and_print_probe_data(deferred.stop, deferred.handle);
}
/// Maps an assertion result to a process exit code:
/// `0` for pass, `2` for inconclusive, `1` for anything else.
/// A result that is both pass and inconclusive maps to `0`.
fn exit_code_for_result(result: &AssertResult) -> i32 {
    if result.is_pass() {
        return 0;
    }
    if result.is_inconclusive() { 2 } else { 1 }
}
/// Publishes the test result and then deals with the probe data.
///
/// Flushes profraw data and prints `result` first. Inside a guest the probe
/// collection is stashed for `finalize_probe_after_unwind`; otherwise it is
/// collected and printed right away.
fn publish_result_and_collect(
    result: &AssertResult,
    stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
    handle: Option<ProbeHandle>,
) {
    try_flush_profraw();
    print_assert_result(result);

    if crate::vmm::guest_comms::is_guest() {
        stash_deferred_probe(stop, handle);
        return;
    }
    collect_and_print_probe_data(stop, handle);
}
fn collect_and_print_probe_data(
stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
handle: Option<ProbeHandle>,
) {
let Some(ph) = handle else {
return;
};
stop.store(true, std::sync::atomic::Ordering::Release);
let (events, skeleton_diag, accumulated_fn_names) = match ph.thread.join() {
Ok((Some(events), diag, fnames)) => (events, diag, fnames),
Ok((None, diag, fnames)) => (Vec::new(), diag, fnames),
Err(payload) => {
let msg = if let Some(s) = payload.downcast_ref::<&'static str>() {
(*s).to_string()
} else if let Some(s) = payload.downcast_ref::<String>() {
s.clone()
} else {
"<non-string panic>".to_string()
};
let diag = crate::probe::process::ProbeDiagnostics {
host_thread_panic: Some(msg),
..Default::default()
};
(Vec::new(), diag, Vec::new())
}
};
let effective_fn_names = if accumulated_fn_names.is_empty() {
&ph.func_names
} else {
&accumulated_fn_names
};
if !ph.output_done.is_set() {
emit_probe_payload(
&events,
effective_fn_names,
&ph.pipeline_diag,
&skeleton_diag,
&ph.param_names,
&ph.render_hints,
);
}
}
// Unit tests for this module live in `probe_tests.rs` and are only compiled
// for test builds.
#[cfg(test)]
#[path = "probe_tests.rs"]
mod tests;