use std::borrow::Cow;
use std::path::PathBuf;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread::ThreadId;
use std::time::Duration;
use anyhow::{Context, Result, anyhow};
use crate::assert::{AssertDetail, AssertResult, DetailKind};
use crate::scenario::Ctx;
use crate::test_support::{
Metric, MetricCheck, OutputFormat, Payload, PayloadKind, PayloadMetrics, extract_metrics,
};
/// Process-wide source of payload indices, shared by every payload evaluated
/// in this process.
static PAYLOAD_INVOCATION_COUNTER: AtomicUsize = AtomicUsize::new(0);
/// Claims the next payload index.
///
/// `fetch_add` returns the value *before* the increment, so the first caller
/// receives 0 and each later call receives the previous value plus one.
/// Only uniqueness of the returned value is relied on here, hence `Relaxed`.
fn next_payload_index() -> usize {
    PAYLOAD_INVOCATION_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
/// Builder for a single payload invocation from a test body.
///
/// `PayloadRun::new` seeds it with the payload's `default_args` and
/// `default_checks`. The builder methods then append to or clear those
/// before `run` (spawn and block) or `spawn` (return a `PayloadHandle`).
pub struct PayloadRun<'a> {
    // Test context; its cgroup parent anchors `cgroup` when the run starts.
    ctx: &'a Ctx<'a>,
    // Static payload definition (binary, output format, metric hints).
    payload: &'static Payload,
    // Argument vector passed to the binary, in order.
    args: Vec<String>,
    // Checks evaluated against the payload's metrics / exit code.
    checks: Vec<MetricCheck>,
    // Optional cgroup name, relative to the test's cgroup parent.
    cgroup: Option<Cow<'static, str>>,
    // Optional wall-clock limit. Only honored by `run`; `spawn` ignores it.
    timeout: Option<Duration>,
}
impl std::fmt::Debug for PayloadRun<'_> {
    /// Summarizes the builder: payload name, argument/check counts, cgroup,
    /// and timeout. The borrowed test context is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("PayloadRun");
        out.field("payload", &self.payload.name);
        out.field("args_len", &self.args.len());
        out.field("checks_len", &self.checks.len());
        out.field("cgroup", &self.cgroup);
        out.field("timeout", &self.timeout);
        out.finish()
    }
}
impl<'a> PayloadRun<'a> {
    /// Creates a builder for `payload`.
    ///
    /// The builder starts with the payload's `default_args` and
    /// `default_checks`, no cgroup, and no timeout.
    pub(crate) fn new(ctx: &'a Ctx<'a>, payload: &'static Payload) -> Self {
        Self {
            ctx,
            payload,
            args: payload
                .default_args
                .iter()
                .map(ToString::to_string)
                .collect(),
            checks: payload.default_checks.to_vec(),
            cgroup: None,
            timeout: None,
        }
    }

    /// Appends one argument after those already collected.
    #[must_use = "builder methods consume self; bind the result"]
    pub fn arg(mut self, arg: impl Into<String>) -> Self {
        let owned: String = arg.into();
        self.args.push(owned);
        self
    }

    /// Appends every item of `args`, in iteration order.
    #[must_use = "builder methods consume self; bind the result"]
    pub fn args<I, S>(mut self, args: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        for item in args {
            self.args.push(item.into());
        }
        self
    }

    /// Discards every argument collected so far, payload defaults included.
    #[must_use = "builder methods consume self; bind the result"]
    pub fn clear_args(mut self) -> Self {
        self.args.truncate(0);
        self
    }

    /// Adds one check to evaluate against the payload's result.
    #[must_use = "builder methods consume self; bind the result"]
    pub fn check(mut self, c: MetricCheck) -> Self {
        self.checks.push(c);
        self
    }

    /// Discards every check collected so far, payload defaults included.
    #[must_use = "builder methods consume self; bind the result"]
    pub fn clear_checks(mut self) -> Self {
        self.checks.truncate(0);
        self
    }

    /// Places the payload in the named cgroup.
    ///
    /// The name is resolved relative to the test's cgroup parent at launch.
    #[must_use = "builder methods consume self; bind the result"]
    pub fn in_cgroup(mut self, name: impl Into<Cow<'static, str>>) -> Self {
        let name: Cow<'static, str> = name.into();
        self.cgroup = Some(name);
        self
    }

    /// Sets a wall-clock limit for `run`.
    ///
    /// When the limit passes, the payload is SIGKILLed and whatever output it
    /// produced is evaluated. `spawn` does not apply this limit.
    #[must_use = "builder methods consume self; bind the result"]
    pub fn timeout(mut self, duration: Duration) -> Self {
        self.timeout = Some(duration);
        self
    }

    /// Spawns the payload, blocks until it exits (or the timeout kills it),
    /// and evaluates its output against the collected checks.
    ///
    /// # Errors
    /// Fails for scheduler-kind payloads, invalid cgroup names, and
    /// spawn/wait/capture failures.
    pub fn run(self) -> Result<(AssertResult, PayloadMetrics)> {
        let Self {
            ctx,
            payload,
            args,
            checks,
            cgroup,
            timeout,
        } = self;
        let binary = payload_binary(payload)?;
        let cgroup_path = resolve_cgroup_path(ctx, cgroup.as_deref())?;
        let output = spawn_and_wait(
            binary,
            &args,
            cgroup_path.as_deref(),
            timeout,
            payload.uses_parent_pgrp,
        )
        .with_context(|| format!("spawn payload '{}'", payload.name))?;
        Ok(evaluate(payload, &checks, output))
    }

    /// Spawns the payload in the background and returns a handle for
    /// waiting on, killing, or polling it. Any `timeout` is not applied.
    ///
    /// # Errors
    /// Fails for scheduler-kind payloads, invalid cgroup names, and spawn
    /// failures.
    pub fn spawn(self) -> Result<PayloadHandle> {
        let Self {
            ctx,
            payload,
            args,
            checks,
            cgroup,
            ..
        } = self;
        let binary = payload_binary(payload)?;
        let cgroup_path = resolve_cgroup_path(ctx, cgroup.as_deref())?;
        let (child, sigchld) = spawn_child(
            binary,
            &args,
            cgroup_path.as_deref(),
            payload.uses_parent_pgrp,
        )
        .with_context(|| format!("spawn payload '{}'", payload.name))?;
        Ok(PayloadHandle {
            child: Some(child),
            payload,
            checks,
            _sigchld: sigchld,
        })
    }
}
/// Returns the executable name of a binary-kind payload.
///
/// # Errors
/// Fails for scheduler-kind payloads. The test framework launches those
/// itself, so they cannot be started from a test body.
fn payload_binary(payload: &Payload) -> Result<&'static str> {
    let binary = match payload.kind {
        PayloadKind::Binary(name) => name,
        PayloadKind::Scheduler(_) => anyhow::bail!(
            "ctx.payload({}) called on a scheduler-kind payload; \
             schedulers are launched by the test framework, not from \
             the test body. Use ctx.payload(&BINARY_PAYLOAD) instead.",
            payload.name,
        ),
    };
    Ok(binary)
}
fn evaluate(
payload: &Payload,
checks: &[MetricCheck],
output: SpawnOutput,
) -> (AssertResult, PayloadMetrics) {
if let OutputFormat::LlmExtract(hint) = payload.output {
return evaluate_llm_extract_deferred(
output,
hint,
payload.metrics,
payload.metric_bounds,
checks,
);
}
let stdout_result = extract_metrics(
&output.stdout,
&payload.output,
crate::test_support::MetricStream::Stdout,
);
let mut metrics = stdout_result.unwrap_or_default();
if metrics.is_empty() && !output.stderr.is_empty() {
let stderr_result = extract_metrics(
&output.stderr,
&payload.output,
crate::test_support::MetricStream::Stderr,
);
if let Ok(m) = stderr_result {
metrics = m;
}
}
resolve_polarities(&mut metrics, payload);
let payload_metrics = PayloadMetrics {
payload_index: next_payload_index(),
metrics,
exit_code: output.exit_code,
};
emit_payload_metrics(&payload_metrics);
let result = evaluate_checks(checks, &payload_metrics, &output.stderr);
(result, payload_metrics)
}
/// Forwards parsed payload metrics through `guest_comms`.
fn emit_payload_metrics(pm: &PayloadMetrics) {
    use crate::vmm::guest_comms::send_payload_metrics;
    send_payload_metrics(pm);
}
/// Forwards a raw (unparsed) payload output record through `guest_comms`.
fn emit_raw_payload_output(raw: &crate::test_support::RawPayloadOutput) {
    use crate::vmm::guest_comms::send_raw_payload_output;
    send_raw_payload_output(raw);
}
fn evaluate_llm_extract_deferred(
output: SpawnOutput,
hint: Option<&'static str>,
metric_hints: &'static [crate::test_support::MetricHint],
metric_bounds: Option<&'static crate::test_support::MetricBounds>,
checks: &[MetricCheck],
) -> (AssertResult, PayloadMetrics) {
let bad: Vec<String> = checks
.iter()
.filter(|c| !matches!(c, MetricCheck::ExitCodeEq(_)))
.map(|c| match c {
MetricCheck::Min { metric, value } => {
format!("Min {{ metric: {metric:?}, value: {value} }}")
}
MetricCheck::Max { metric, value } => {
format!("Max {{ metric: {metric:?}, value: {value} }}")
}
MetricCheck::Range { metric, lo, hi } => {
format!("Range {{ metric: {metric:?}, lo: {lo}, hi: {hi} }}")
}
MetricCheck::Exists(metric) => format!("Exists({metric:?})"),
MetricCheck::ExitCodeEq(_) => unreachable!(
"ExitCodeEq is filtered out of the bad list above; \
this arm is exhaustive-match coverage for the variant"
),
})
.collect();
assert!(
bad.is_empty(),
"metric-level .check() on LlmExtract payloads cannot be evaluated guest-side; \
declare these as default_checks on the Payload instead. Forbidden: [{}]",
bad.join(", "),
);
let exit_code = output.exit_code;
let payload_index = next_payload_index();
let raw = crate::test_support::RawPayloadOutput {
payload_index,
stdout: output.stdout,
stderr: output.stderr,
hint: hint.map(str::to_string),
metric_hints: metric_hints
.iter()
.map(crate::test_support::WireMetricHint::from)
.collect(),
metric_bounds: metric_bounds.copied(),
};
emit_raw_payload_output(&raw);
let payload_metrics = PayloadMetrics {
payload_index,
metrics: Vec::new(),
exit_code,
};
emit_payload_metrics(&payload_metrics);
let mut result = AssertResult::pass();
if let Some(detail) = exit_code_mismatch_detail(checks, exit_code, &raw.stderr) {
result.merge(AssertResult::fail(detail));
}
(result, payload_metrics)
}
/// Handle to a payload started in the background by `PayloadRun::spawn`.
///
/// Consume it with `wait`, `kill`, or a completing `try_wait` to obtain the
/// `(AssertResult, PayloadMetrics)` pair. Dropping an unconsumed handle
/// SIGKILLs the payload (see the `Drop` impl), and its metrics are not
/// recorded.
#[must_use = "dropping a PayloadHandle SIGKILLs the child's process group; call .wait() or .kill() explicitly"]
pub struct PayloadHandle {
    // `Some` until wait/kill/try_wait (or Drop) takes the child out.
    child: Option<std::process::Child>,
    // Static definition the handle was spawned from.
    payload: &'static Payload,
    // Checks evaluated when the handle is consumed.
    checks: Vec<MetricCheck>,
    // Keeps SIGCHLD at SIG_DFL while this handle lives. Presumably this stops
    // an inherited SIG_IGN from auto-reaping the child before it is waited
    // on — TODO confirm.
    _sigchld: SigchldScope,
}
impl std::fmt::Debug for PayloadHandle {
    /// Shows the payload name, whether the child is still held, and how many
    /// checks are pending. The child itself is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let child_alive = self.child.is_some();
        let checks_len = self.checks.len();
        f.debug_struct("PayloadHandle")
            .field("payload", &self.payload.name)
            .field("child_alive", &child_alive)
            .field("checks_len", &checks_len)
            .finish()
    }
}
impl PayloadHandle {
pub fn payload_name(&self) -> &'static str {
self.payload.name
}
pub fn pid(&self) -> Option<u32> {
self.child.as_ref().map(|c| c.id())
}
pub fn wait(mut self) -> Result<(AssertResult, PayloadMetrics)> {
let mut child = self
.child
.take()
.ok_or_else(|| already_consumed(self.payload))?;
if let Err(e) = child.wait() {
kill_payload_process_group(&child, self.payload.name, self.payload.uses_parent_pgrp);
let _ = child.wait();
return Err(e).with_context(|| format!("wait payload '{}'", self.payload.name));
}
kill_payload_process_group(&child, self.payload.name, self.payload.uses_parent_pgrp);
match wait_and_capture(&mut child) {
Ok(output) => Ok(evaluate(self.payload, &self.checks, output)),
Err(e) => {
let _ = child.wait();
Err(e).with_context(|| format!("wait payload '{}'", self.payload.name))
}
}
}
pub fn kill(mut self) -> Result<(AssertResult, PayloadMetrics)> {
let mut child = self
.child
.take()
.ok_or_else(|| already_consumed(self.payload))?;
kill_payload_process_group(&child, self.payload.name, self.payload.uses_parent_pgrp);
match wait_and_capture(&mut child) {
Ok(output) => Ok(evaluate(self.payload, &self.checks, output)),
Err(e) => {
let _ = child.wait();
Err(e).with_context(|| format!("reap killed payload '{}'", self.payload.name))
}
}
}
pub fn try_wait(&mut self) -> Result<Option<(AssertResult, PayloadMetrics)>> {
let child = self
.child
.as_mut()
.ok_or_else(|| already_consumed(self.payload))?;
match child.try_wait()? {
Some(_status) => {
let mut child = self
.child
.take()
.expect("child still present on terminal branch");
kill_payload_process_group(
&child,
self.payload.name,
self.payload.uses_parent_pgrp,
);
match wait_and_capture(&mut child) {
Ok(output) => Ok(Some(evaluate(self.payload, &self.checks, output))),
Err(e) => {
let _ = child.wait();
Err(e).with_context(|| format!("reap payload '{}'", self.payload.name))
}
}
}
None => Ok(None),
}
}
}
/// Error returned when a `PayloadHandle` is asked for its result after an
/// earlier `wait`, `kill`, or `try_wait` already produced it.
fn already_consumed(payload: &'static Payload) -> anyhow::Error {
    let name = payload.name;
    anyhow!(
        "PayloadHandle for '{name}' already consumed by a prior \
         wait/kill/try_wait call; each handle can only produce \
         one (AssertResult, PayloadMetrics) pair"
    )
}
impl Drop for PayloadHandle {
    /// Reclaims a payload that was never waited on or killed.
    ///
    /// The payload is SIGKILLed (with its process group, unless it shares
    /// the parent's), then reaped, and a warning on stderr notes that its
    /// metrics were lost. Nothing happens if the child was already taken.
    fn drop(&mut self) {
        let Some(mut abandoned) = self.child.take() else {
            return;
        };
        let name = self.payload.name;
        kill_payload_process_group(&abandoned, name, self.payload.uses_parent_pgrp);
        let _ = abandoned.wait();
        eprintln!(
            "ktstr: PayloadHandle for '{}' dropped without wait/kill — \
             process group SIGKILLed, metrics not recorded.",
            name,
        );
    }
}
/// Sends SIGKILL to a payload's process group (when it has its own) and to
/// its leader.
///
/// - When `uses_parent_pgrp` is false, `build_command` spawned the child
///   with `process_group(0)`, so its pid is also its process-group id.
///   `killpg` then reaches every descendant that stayed in that group.
/// - When it is true, the child's pid is not a process-group id, so only the
///   direct `kill` of the leader is sent.
///
/// Failures are logged, never returned. ESRCH (target already gone) is
/// logged at debug level; any other errno is logged as a warning. A pid that
/// is not a positive `pid_t` is logged as an error and nothing is signalled:
/// per kill(2), pid 0 or a negative pid would target a whole process group
/// instead of this child.
///
/// NOTE(review): `PayloadHandle::wait`/`try_wait` call this *after* the
/// leader has been reaped. At that point the direct `kill(pid)` targets a
/// pid the kernel may already have recycled for an unrelated process —
/// confirm whether that window matters.
fn kill_payload_process_group(
    child: &std::process::Child,
    payload_name: &str,
    uses_parent_pgrp: bool,
) {
    let raw_pid = child.id();
    let pgid = match libc::pid_t::try_from(raw_pid) {
        Ok(p) if p > 0 => p,
        Ok(p) => {
            tracing::error!(
                payload = payload_name,
                pid = p,
                "child pid is non-positive — cannot kill process group; \
                 skipping kill (kernel's pid_max invariant violated, \
                 no safe target)"
            );
            return;
        }
        Err(_) => {
            tracing::error!(
                payload = payload_name,
                pid = raw_pid,
                "child pid exceeds pid_t range — cannot kill process group; \
                 skipping kill (Linux pid_max is 2^22 so this is only \
                 reachable on a non-Linux target or a kernel with an \
                 extended pid space)"
            );
            return;
        }
    };
    let pid = nix::unistd::Pid::from_raw(pgid);
    // Group kill first (own group only), then the leader directly.
    if !uses_parent_pgrp {
        match nix::sys::signal::killpg(pid, nix::sys::signal::Signal::SIGKILL) {
            Ok(()) => {}
            Err(nix::errno::Errno::ESRCH) => {
                tracing::debug!(
                    payload = payload_name,
                    pgid,
                    "ESRCH — payload process group already reaped",
                );
            }
            Err(e) => {
                tracing::warn!(payload = payload_name, pgid, %e, "killpg failed for payload process group");
            }
        }
    }
    match nix::sys::signal::kill(pid, nix::sys::signal::Signal::SIGKILL) {
        Ok(()) => {}
        Err(nix::errno::Errno::ESRCH) => {
            tracing::debug!(
                payload = payload_name,
                pid = pgid,
                "ESRCH — payload leader already reaped",
            );
        }
        Err(e) => {
            tracing::warn!(payload = payload_name, pid = pgid, %e, "direct kill failed for payload leader");
        }
    }
}
/// Copies `polarity` and `unit` from the payload's metric hints onto each
/// parsed metric with the same name.
///
/// Metrics without a hint are left unchanged. If two hints share a name, the
/// later one wins.
fn resolve_polarities(metrics: &mut [Metric], payload: &Payload) {
    use std::collections::HashMap;

    if metrics.is_empty() || payload.metrics.is_empty() {
        return;
    }
    let by_name: HashMap<&str, &crate::test_support::MetricHint> = payload
        .metrics
        .iter()
        .map(|hint| (hint.name, hint))
        .collect();
    for metric in metrics.iter_mut() {
        let Some(hint) = by_name.get(metric.name.as_str()) else {
            continue;
        };
        metric.polarity = hint.polarity;
        metric.unit = hint.unit.to_string();
    }
}
/// Owned-hint counterpart of `resolve_polarities`, taking wire-format hints.
///
/// Copies `polarity` and `unit` from the hint with the matching name onto
/// each metric; unmatched metrics are left unchanged. Later duplicate hints
/// win.
pub(crate) fn resolve_polarities_owned(
    metrics: &mut [Metric],
    hints: &[crate::test_support::WireMetricHint],
) {
    use std::collections::HashMap;

    if metrics.is_empty() || hints.is_empty() {
        return;
    }
    let by_name: HashMap<&str, &crate::test_support::WireMetricHint> = hints
        .iter()
        .map(|hint| (hint.name.as_str(), hint))
        .collect();
    for metric in metrics.iter_mut() {
        let Some(hint) = by_name.get(metric.name.as_str()) else {
            continue;
        };
        metric.polarity = hint.polarity;
        metric.unit = hint.unit.clone();
    }
}
fn evaluate_checks(checks: &[MetricCheck], pm: &PayloadMetrics, stderr: &str) -> AssertResult {
let mut result = AssertResult::pass();
if let Some(detail) = exit_code_mismatch_detail(checks, pm.exit_code, stderr) {
result.merge(AssertResult::fail(detail));
return result;
}
for check in checks {
let detail = match check {
MetricCheck::Min { metric, value } => pm.get(metric).map_or_else(
|| Some(missing_metric(metric)),
|actual| {
if actual.is_nan() {
Some(nan_metric(metric))
} else if actual < *value {
Some(AssertDetail {
kind: DetailKind::Other,
message: format!("metric '{metric}' = {actual} below minimum {value}"),
})
} else {
None
}
},
),
MetricCheck::Max { metric, value } => pm.get(metric).map_or_else(
|| Some(missing_metric(metric)),
|actual| {
if actual.is_nan() {
Some(nan_metric(metric))
} else if actual > *value {
Some(AssertDetail {
kind: DetailKind::Other,
message: format!(
"metric '{metric}' = {actual} exceeds maximum {value}"
),
})
} else {
None
}
},
),
MetricCheck::Range { metric, lo, hi } => pm.get(metric).map_or_else(
|| Some(missing_metric(metric)),
|actual| {
if actual.is_nan() {
Some(nan_metric(metric))
} else if actual < *lo || actual > *hi {
Some(AssertDetail {
kind: DetailKind::Other,
message: format!("metric '{metric}' = {actual} outside [{lo}, {hi}]"),
})
} else {
None
}
},
),
MetricCheck::Exists(metric) => pm.get(metric).is_none().then(|| missing_metric(metric)),
MetricCheck::ExitCodeEq(_) => None, };
if let Some(d) = detail {
result.merge(AssertResult::fail(d));
}
}
result
}
/// Failure detail for a metric whose parsed value is NaN.
fn nan_metric(metric: &str) -> AssertDetail {
    other_kind_detail(format!("metric '{metric}' value is NaN"))
}
/// Failure detail for a metric absent from the payload's parsed output.
fn missing_metric(metric: &str) -> AssertDetail {
    other_kind_detail(format!("metric '{metric}' not found in payload output"))
}
/// Wraps `message` in an `AssertDetail` of kind `DetailKind::Other`.
fn other_kind_detail(message: String) -> AssertDetail {
    AssertDetail {
        kind: DetailKind::Other,
        message,
    }
}
/// Returns a failure detail for the first `ExitCodeEq` check whose expected
/// code differs from `actual_exit_code`. Returns `None` if every exit-code
/// check matches or there are none.
///
/// The message quotes the tail of `stderr` when it is non-blank.
fn exit_code_mismatch_detail(
    checks: &[MetricCheck],
    actual_exit_code: i32,
    stderr: &str,
) -> Option<AssertDetail> {
    let expected = checks.iter().find_map(|check| match check {
        MetricCheck::ExitCodeEq(code) if *code != actual_exit_code => Some(*code),
        _ => None,
    })?;
    Some(AssertDetail {
        kind: DetailKind::Other,
        message: format_exit_mismatch(actual_exit_code, expected, stderr),
    })
}
/// Maximum number of trailing stderr bytes quoted in an exit-code mismatch
/// message.
const STDERR_TAIL_BYTES: usize = 1024;
/// Builds the failure message for an exit-code mismatch.
///
/// If stderr has content once trailing whitespace is removed, its last
/// `STDERR_TAIL_BYTES` bytes are appended on a new line.
fn format_exit_mismatch(actual: i32, expected: i32, stderr: &str) -> String {
    let headline = format!("payload exited with code {actual}, expected {expected}");
    match stderr.trim_end() {
        "" => headline,
        trimmed => format!(
            "{headline}; stderr:\n{}",
            stderr_tail(trimmed, STDERR_TAIL_BYTES)
        ),
    }
}
/// Returns at most the last `max_bytes` bytes of `s`, prefixed with `...`
/// whenever anything was cut off.
///
/// A cut that would split a multi-byte UTF-8 character moves forward to the
/// next character boundary, so the kept tail can be shorter than `max_bytes`.
fn stderr_tail(s: &str, max_bytes: usize) -> String {
    if s.len() <= max_bytes {
        return s.to_owned();
    }
    let cut = (s.len() - max_bytes..=s.len())
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(s.len());
    format!("...{}", &s[cut..])
}
/// Captured result of a finished payload process.
struct SpawnOutput {
    // stdout decoded lossily as UTF-8, capped at MAX_CAPTURED_STREAM_BYTES.
    stdout: String,
    // stderr decoded lossily as UTF-8, capped at MAX_CAPTURED_STREAM_BYTES.
    stderr: String,
    // Process exit code; -1 when there is none (e.g. terminated by a signal).
    exit_code: i32,
}
/// Resolves a user-supplied cgroup name to a path under the test's cgroup
/// parent.
///
/// Leading `/` characters are stripped, so `"/workload"` and `"workload"`
/// are equivalent. Returns `Ok(None)` when `name` is `None` (no placement
/// requested).
///
/// # Errors
/// The name is rejected if:
/// - it contains a NUL byte;
/// - any component is `..`;
/// - it names no child cgroup at all. This covers `""`, `"/"`, and
///   `.`-only names such as `"."` or `"./"`, which would otherwise join to
///   the parent cgroup itself.
fn resolve_cgroup_path(ctx: &Ctx<'_>, name: Option<&str>) -> Result<Option<PathBuf>> {
    let Some(name) = name else {
        return Ok(None);
    };
    if name.as_bytes().contains(&0) {
        return Err(anyhow!("cgroup name '{name}' contains a NUL byte"));
    }
    let relative = std::path::Path::new(name.trim_start_matches('/'));
    let mut names_a_child = false;
    for component in relative.components() {
        match component {
            std::path::Component::ParentDir => {
                return Err(anyhow!(
                    "cgroup name '{name}' contains '..'; paths must stay within the \
                     test's cgroup parent"
                ));
            }
            std::path::Component::Normal(_) => names_a_child = true,
            // `.` (CurDir) adds nothing. RootDir/Prefix cannot appear once
            // leading slashes are stripped on Unix.
            _ => {}
        }
    }
    if !names_a_child {
        return Err(anyhow!(
            "cgroup name '{name}' is empty or resolves to the parent cgroup"
        ));
    }
    Ok(Some(ctx.cgroups.parent_path().join(relative)))
}
/// Builds the `Command` for a payload binary, with stdout and stderr piped.
///
/// Unless `uses_parent_pgrp` is set, the child is placed in a new process
/// group (`process_group(0)`), whose id equals the child's pid.
///
/// When `cgroup_path` is given, two close-on-exec pipes are allocated and a
/// `pre_exec` hook (`cgroup_sync_pre_exec`) is installed. In the forked
/// child, before exec, that hook:
/// 1. writes the child's pid to the notify pipe;
/// 2. blocks until a byte arrives on the release pipe.
///
/// The returned `CgroupSyncHandles` own the pipes and carry the
/// `<cgroup_path>/cgroup.procs` path, for the parent side of the handshake.
///
/// # Errors
/// Fails if either pipe cannot be allocated.
fn build_command(
    binary: &str,
    args: &[String],
    cgroup_path: Option<&std::path::Path>,
    uses_parent_pgrp: bool,
) -> Result<(std::process::Command, Option<CgroupSyncHandles>)> {
    use std::os::unix::process::CommandExt;
    use std::process::{Command, Stdio};
    let mut cmd = Command::new(binary);
    cmd.args(args).stdout(Stdio::piped()).stderr(Stdio::piped());
    if !uses_parent_pgrp {
        cmd.process_group(0);
    }
    if let Some(cgroup_path) = cgroup_path {
        let notify = PipePair::new().context("allocate cgroup sync pid-notify pipe")?;
        let release = PipePair::new().context("allocate cgroup sync release pipe")?;
        // Plain fd numbers are captured by the hook. The PipePairs that own
        // them travel in `handles`; NOTE(review): they must stay alive until
        // the spawn completes so the numbers remain valid in the child —
        // confirm at call sites.
        let notify_read_fd = notify.r_fd();
        let notify_write_fd = notify.w_fd();
        let release_read_fd = release.r_fd();
        let release_write_fd = release.w_fd();
        // SAFETY (review): the hook runs post-fork in the child.
        // `cgroup_sync_pre_exec` uses raw libc close/write/read/getpid calls
        // and non-allocating `io::Error` construction only.
        unsafe {
            cmd.pre_exec(move || {
                cgroup_sync_pre_exec(
                    notify_read_fd,
                    notify_write_fd,
                    release_read_fd,
                    release_write_fd,
                )
            });
        }
        let handles = CgroupSyncHandles {
            notify,
            release,
            cgroup_procs_path: cgroup_path.join("cgroup.procs"),
        };
        return Ok((cmd, Some(handles)));
    }
    Ok((cmd, None))
}
/// Both ends of an anonymous pipe created with `pipe2(O_CLOEXEC)`.
///
/// Each end is an `OwnedFd`, so it is closed when dropped.
struct PipePair {
    read_fd: std::os::fd::OwnedFd,
    write_fd: std::os::fd::OwnedFd,
}
impl PipePair {
    /// Creates a new close-on-exec pipe.
    ///
    /// # Errors
    /// Returns the OS error reported by `pipe2(2)`.
    fn new() -> std::io::Result<Self> {
        use std::os::fd::FromRawFd;
        let mut fds = [0i32; 2];
        // SAFETY: `fds` is a writable two-element c_int array, as pipe2 requires.
        let rc = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC) };
        if rc != 0 {
            return Err(std::io::Error::last_os_error());
        }
        // SAFETY: pipe2 succeeded, so both entries are fresh fds that nothing
        // else owns, and each is wrapped exactly once.
        let read_fd = unsafe { std::os::fd::OwnedFd::from_raw_fd(fds[0]) };
        let write_fd = unsafe { std::os::fd::OwnedFd::from_raw_fd(fds[1]) };
        Ok(Self { read_fd, write_fd })
    }
    /// Raw fd number of the read end; ownership stays with `self`.
    fn r_fd(&self) -> i32 {
        use std::os::fd::AsRawFd;
        self.read_fd.as_raw_fd()
    }
    /// Raw fd number of the write end; ownership stays with `self`.
    fn w_fd(&self) -> i32 {
        use std::os::fd::AsRawFd;
        self.write_fd.as_raw_fd()
    }
}
/// Parent-side state for the cgroup placement handshake.
///
/// Produced by `build_command` and consumed by `spawn_with_cgroup_sync`.
struct CgroupSyncHandles {
    // The child writes its pid here from `pre_exec`; the parent reads it.
    notify: PipePair,
    // The parent writes one byte here once the pid is in `cgroup.procs`; the
    // child blocks reading it before exec.
    release: PipePair,
    // `<cgroup>/cgroup.procs`, the file the child's pid is written to.
    cgroup_procs_path: PathBuf,
}
/// Child-side half of the cgroup placement handshake.
///
/// Runs inside the `pre_exec` hook installed by `build_command`, i.e. after
/// fork and before exec. It uses raw libc calls and does not allocate.
///
/// Steps:
/// 0. Close the child's copies of the parent-only ends (notify read, release
///    write).
/// 1. Write this process's pid (`pid_t`, little-endian) to the notify pipe,
///    retrying on EINTR and short writes; then close the notify write end.
/// 2. Block until one byte is read from the release pipe, retrying on EINTR;
///    then close the release read end and return so exec proceeds.
///
/// # Errors
/// Returning `Err` aborts the spawn. A zero-length write is reported as EIO.
/// EOF on the release pipe (the parent dropped its write end without
/// releasing the child) is reported as EPIPE.
fn cgroup_sync_pre_exec(
    notify_read_fd: libc::c_int,
    notify_write_fd: libc::c_int,
    release_read_fd: libc::c_int,
    release_write_fd: libc::c_int,
) -> std::io::Result<()> {
    // Step 0. Closing our copy of the release write end is what lets the read
    // in Step 2 observe EOF if the parent gives up.
    unsafe {
        libc::close(notify_read_fd);
        libc::close(release_write_fd);
    }
    // Step 1: report our pid to the parent.
    let pid = unsafe { libc::getpid() };
    let pid_bytes = pid.to_le_bytes();
    let mut written = 0usize;
    while written < pid_bytes.len() {
        let n = unsafe {
            libc::write(
                notify_write_fd,
                pid_bytes.as_ptr().add(written) as *const libc::c_void,
                pid_bytes.len() - written,
            )
        };
        if n < 0 {
            let err = std::io::Error::last_os_error();
            if err.raw_os_error() == Some(libc::EINTR) {
                continue;
            }
            return Err(err);
        }
        if n == 0 {
            return Err(std::io::Error::from_raw_os_error(libc::EIO));
        }
        written += n as usize;
    }
    unsafe {
        libc::close(notify_write_fd);
    }
    // Step 2: wait until the parent has moved us into the cgroup.
    let mut buf = [0u8; 1];
    let mut read_total = 0usize;
    while read_total < buf.len() {
        let n = unsafe {
            libc::read(
                release_read_fd,
                buf.as_mut_ptr().add(read_total) as *mut libc::c_void,
                buf.len() - read_total,
            )
        };
        if n < 0 {
            let err = std::io::Error::last_os_error();
            if err.raw_os_error() == Some(libc::EINTR) {
                continue;
            }
            return Err(err);
        }
        if n == 0 {
            return Err(std::io::Error::from_raw_os_error(libc::EPIPE));
        }
        read_total += n as usize;
    }
    unsafe {
        libc::close(release_read_fd);
    }
    Ok(())
}
fn spawn_with_cgroup_sync(handles: CgroupSyncHandles) -> Result<libc::pid_t> {
use std::io::{Read, Write};
let CgroupSyncHandles {
notify,
release,
cgroup_procs_path,
} = handles;
let PipePair {
read_fd: notify_r,
write_fd: notify_w,
} = notify;
{
let pfd_fd = std::os::fd::AsRawFd::as_raw_fd(¬ify_r);
let mut pfd = libc::pollfd {
fd: pfd_fd,
events: libc::POLLIN,
revents: 0,
};
let poll_ms: libc::c_int = 5_000;
let ready = unsafe { libc::poll(&mut pfd, 1, poll_ms) };
if ready < 0 {
let e = std::io::Error::last_os_error();
if e.raw_os_error() != Some(libc::EINTR) {
return Err(
anyhow::Error::new(e).context("poll(notify_r) for cgroup-sync pid-notify")
);
}
} else if ready == 0 {
anyhow::bail!(
"cgroup-sync notify pipe: no pid written by child within 5s. \
The child's pre_exec likely failed before Step 1 (possibly \
EBADF on `notify_write_fd` because the fd number was \
recycled by stdlib's internal pipe2). Check the spawn \
thread's error for the underlying cause."
);
}
}
let mut notify_file = std::fs::File::from(notify_r);
let mut pid_bytes = [0u8; 4];
notify_file
.read_exact(&mut pid_bytes)
.context("read child pid from cgroup-sync notify pipe")?;
drop(notify_file);
drop(notify_w);
let child_pid = libc::pid_t::from_le_bytes(pid_bytes);
anyhow::ensure!(
child_pid > 0,
"cgroup-sync notify pipe returned non-positive pid {child_pid}; \
the child's pre_exec hook sent a corrupted pid — fail the \
handshake rather than write a bad value to cgroup.procs"
);
let mut f = std::fs::OpenOptions::new()
.write(true)
.open(&cgroup_procs_path)
.with_context(|| {
format!(
"open cgroup.procs at {} for cgroup-sync placement",
cgroup_procs_path.display(),
)
})?;
let line = format!("{child_pid}\n");
f.write_all(line.as_bytes())
.with_context(|| format!("write pid {child_pid} to {}", cgroup_procs_path.display(),))?;
drop(f);
let PipePair {
read_fd: release_r,
write_fd: release_w,
} = release;
drop(release_r);
let mut release_file = std::fs::File::from(release_w);
release_file
.write_all(&[1u8])
.context("write release byte to cgroup-sync release pipe")?;
drop(release_file);
Ok(child_pid)
}
/// Spawns `cmd` on a helper thread while this thread runs the parent side of
/// the cgroup handshake (`spawn_with_cgroup_sync`).
///
/// The child blocks in its `pre_exec` hook until the handshake releases it.
/// Presumably `Command::spawn` does not return before the child execs,
/// hence the separate thread — TODO confirm.
///
/// A handshake error takes precedence over the spawn result.
fn drive_cgroup_handshake(
    mut cmd: std::process::Command,
    handles: CgroupSyncHandles,
    binary: &str,
) -> Result<std::process::Child> {
    let thread_binary = binary.to_owned();
    let spawner = std::thread::spawn(move || -> Result<std::process::Child> {
        cmd.spawn()
            .map_err(|e| spawn_error_context(e, &thread_binary))
    });
    let handshake = spawn_with_cgroup_sync(handles);
    let spawned = spawner
        .join()
        .map_err(|_| anyhow!("cgroup-sync spawn thread panicked"))?;
    handshake.and(spawned)
}
fn spawn_error_context(err: std::io::Error, binary: &str) -> anyhow::Error {
if err.kind() == std::io::ErrorKind::NotFound {
anyhow::Error::new(err).context(format!(
"spawn '{binary}': binary not found on guest PATH. \
Remediation: for CLI invocations (ktstr / cargo-ktstr \
shell, run, …), package the binary with `-i {binary}` \
/ `--include-files {binary}` so it lands on the guest \
PATH under `/include-files/`. For `#[ktstr_test]` \
entries, pre-install the binary in the base initramfs \
— the macro surface does not expose `-i`. If `{binary}` \
is a script, execve(2) ALSO returns ENOENT when the \
`#!` shebang names an interpreter missing from the \
guest (the error names the script but the missing \
file is the interpreter); package the interpreter \
the same way — `-i <interpreter>` for CLI, pre-install \
for `#[ktstr_test]`."
))
} else {
anyhow::Error::new(err).context(format!("spawn '{binary}'"))
}
}
/// Thread on which every `SigchldScope` must be created and dropped. It is
/// pinned by the first `SigchldScope::new` call.
static SIGCHLD_SCOPE_OWNER_THREAD: OnceLock<ThreadId> = OnceLock::new();

thread_local! {
    // Number of `SigchldScope`s currently alive on the owner thread.
    static SIGCHLD_SCOPE_DEPTH: std::cell::Cell<usize> = const { std::cell::Cell::new(0) };
    // SIGCHLD disposition in effect when the outermost live scope was
    // created; restored when the last live scope is dropped.
    static SIGCHLD_SAVED_DISPOSITION: std::cell::Cell<libc::sighandler_t> =
        const { std::cell::Cell::new(libc::SIG_DFL) };
}

/// Guard that holds SIGCHLD at `SIG_DFL` while at least one guard is alive.
///
/// Guards may nest and may be dropped in any order, e.g. two
/// `PayloadHandle`s waited on in spawn order. The disposition in effect
/// before the first live guard is saved once and restored only when the last
/// live guard drops. Saving and restoring per guard would leave SIGCHLD at
/// `SIG_DFL` permanently after a non-LIFO drop sequence.
///
/// `libc::signal` is process-wide and not thread-safe. Guards are therefore
/// pinned to one thread (checked at runtime) and are `!Send`/`!Sync` via the
/// raw-pointer `PhantomData`.
struct SigchldScope {
    _not_send: std::marker::PhantomData<*const ()>,
}

impl SigchldScope {
    /// Forces SIGCHLD to `SIG_DFL` and registers a new live guard.
    ///
    /// # Panics
    /// Panics if called on a thread other than the one that created the
    /// first guard in this process.
    fn new() -> Self {
        let tid = std::thread::current().id();
        let pinned = SIGCHLD_SCOPE_OWNER_THREAD.get_or_init(|| tid);
        if *pinned != tid {
            panic!(
                "SigchldScope constructed on a different thread than the first \
                 owner (pinned thread id={pinned:?}, this thread's id={tid:?}). \
                 libc::signal is not thread-safe; cross-thread installs race on \
                 the process-wide SIGCHLD disposition."
            );
        }
        // SAFETY: called on the single pinned thread, with a valid signal
        // number and the SIG_DFL disposition. SIG_DFL is re-asserted for
        // nested guards too.
        let prev = unsafe { libc::signal(libc::SIGCHLD, libc::SIG_DFL) };
        SIGCHLD_SCOPE_DEPTH.with(|depth| {
            if depth.get() == 0 {
                // Outermost live guard: remember what to restore at the end.
                SIGCHLD_SAVED_DISPOSITION.with(|saved| saved.set(prev));
            }
            depth.set(depth.get() + 1);
        });
        SigchldScope {
            _not_send: std::marker::PhantomData,
        }
    }
}

impl Drop for SigchldScope {
    /// Unregisters this guard. The saved disposition is restored once no
    /// live guard remains.
    fn drop(&mut self) {
        let pinned = SIGCHLD_SCOPE_OWNER_THREAD
            .get()
            .expect("SIGCHLD_SCOPE_OWNER_THREAD must be initialized — set by SigchldScope::new");
        assert_eq!(
            *pinned,
            std::thread::current().id(),
            "SigchldScope dropped on a different thread than the pinned owner \
             (pinned={pinned:?}, this thread={:?}). libc::signal is not \
             thread-safe and the construct-side `!Send` marker should have \
             made this impossible at compile time — investigate any \
             `unsafe impl Send for SigchldScope` that bypassed it.",
            std::thread::current().id(),
        );
        let remaining = SIGCHLD_SCOPE_DEPTH.with(|depth| {
            let left = depth.get().saturating_sub(1);
            depth.set(left);
            left
        });
        if remaining == 0 {
            let prev = SIGCHLD_SAVED_DISPOSITION.with(|saved| saved.get());
            // SAFETY: same pinned thread; `prev` was returned by `signal`
            // for SIGCHLD when the outermost guard was created.
            unsafe {
                libc::signal(libc::SIGCHLD, prev);
            }
        }
    }
}
// Compile-time assertion that `SigchldScope` is neither `Send` nor `Sync`.
//
// `AmbiguousIfImpl<()>` is implemented for every type. If `SigchldScope`
// were `Send` (or `Sync`), the `InvalidSend` (or `InvalidSync`) impl would
// apply as well. The inferred `_` in the path below would then be ambiguous
// and this constant would fail to compile. With neither auto trait, only the
// `()` impl matches and inference succeeds.
const _: fn() = || {
    trait AmbiguousIfImpl<A> {
        fn some_item() {}
    }
    impl<T: ?Sized> AmbiguousIfImpl<()> for T {}
    #[allow(dead_code)]
    struct InvalidSend;
    impl<T: ?Sized + Send> AmbiguousIfImpl<InvalidSend> for T {}
    #[allow(dead_code)]
    struct InvalidSync;
    impl<T: ?Sized + Sync> AmbiguousIfImpl<InvalidSync> for T {}
    let _ = <SigchldScope as AmbiguousIfImpl<_>>::some_item;
};
/// Spawns `binary` with `args`, optionally inside `cgroup_path`, and blocks
/// until it exits, returning its captured output.
///
/// - With `timeout`, waiting is delegated to `wait_with_deadline`, which
///   SIGKILLs the payload once the limit passes.
/// - Without it, the call waits indefinitely. If capturing fails, the
///   payload is killed and reaped before the error is returned.
///
/// SIGCHLD is held at `SIG_DFL` for the whole call.
fn spawn_and_wait(
    binary: &str,
    args: &[String],
    cgroup_path: Option<&std::path::Path>,
    timeout: Option<Duration>,
    uses_parent_pgrp: bool,
) -> Result<SpawnOutput> {
    let _sigchld = SigchldScope::new();
    let (mut cmd, sync_handles) = build_command(binary, args, cgroup_path, uses_parent_pgrp)?;
    let mut child = if let Some(handles) = sync_handles {
        drive_cgroup_handshake(cmd, handles, binary)?
    } else {
        cmd.spawn().map_err(|e| spawn_error_context(e, binary))?
    };
    if let Some(limit) = timeout {
        return wait_with_deadline(&mut child, limit, binary, uses_parent_pgrp);
    }
    wait_and_capture(&mut child).or_else(|e| {
        kill_payload_process_group(&child, binary, uses_parent_pgrp);
        let _ = child.wait();
        Err(e)
    })
}
fn wait_with_deadline(
child: &mut std::process::Child,
timeout: Duration,
payload_name: &str,
uses_parent_pgrp: bool,
) -> Result<SpawnOutput> {
use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags, EpollTimeout};
use std::os::fd::{AsFd, FromRawFd, OwnedFd};
let deadline = std::time::Instant::now() + timeout;
let pid =
libc::pid_t::try_from(child.id()).expect("child pid fits in pid_t (Linux pid_max <= 2^22)");
let pidfd_raw = unsafe { libc::syscall(libc::SYS_pidfd_open, pid, 0i32) };
if pidfd_raw < 0 {
return Err(std::io::Error::last_os_error()).with_context(|| format!("pidfd_open({pid})"));
}
let pidfd: OwnedFd = unsafe { OwnedFd::from_raw_fd(pidfd_raw as i32) };
let epoll = Epoll::new(EpollCreateFlags::EPOLL_CLOEXEC)
.with_context(|| "epoll_create1 for pidfd wait")?;
let event = EpollEvent::new(EpollFlags::EPOLLIN, 0);
epoll
.add(pidfd.as_fd(), event)
.with_context(|| "epoll_ctl ADD pidfd")?;
let mut events = [EpollEvent::empty()];
loop {
if child
.try_wait()
.with_context(|| "try_wait child")?
.is_some()
{
return match wait_and_capture(child) {
Ok(out) => Ok(out),
Err(e) => {
kill_payload_process_group(child, payload_name, uses_parent_pgrp);
let _ = child.wait();
Err(e)
}
};
}
let remaining = deadline.saturating_duration_since(std::time::Instant::now());
if remaining.is_zero() {
kill_payload_process_group(child, payload_name, uses_parent_pgrp);
return match wait_and_capture(child) {
Ok(out) => Ok(out),
Err(e) => {
let _ = child.wait();
Err(e).with_context(|| format!("drain after timeout of {timeout:?}"))
}
};
}
let ms_u32 = u32::try_from(remaining.as_millis()).unwrap_or(u32::MAX);
let ms_u32 = std::cmp::min(ms_u32, i32::MAX as u32);
let timeout_param =
EpollTimeout::try_from(ms_u32).with_context(|| "epoll timeout conversion")?;
match epoll.wait(&mut events, timeout_param) {
Ok(_) => {
}
Err(nix::errno::Errno::EINTR) => {
}
Err(e) => {
return Err(anyhow::anyhow!("epoll_wait: {e}"));
}
}
}
}
/// Spawns `binary` for a background `PayloadHandle`, optionally inside
/// `cgroup_path`.
///
/// Returns the child together with the `SigchldScope` that must outlive it.
/// On error the scope is dropped and the previous SIGCHLD disposition is
/// restored.
fn spawn_child(
    binary: &str,
    args: &[String],
    cgroup_path: Option<&std::path::Path>,
    uses_parent_pgrp: bool,
) -> Result<(std::process::Child, SigchldScope)> {
    let sigchld_guard = SigchldScope::new();
    let (mut cmd, sync_handles) = build_command(binary, args, cgroup_path, uses_parent_pgrp)?;
    let child = if let Some(handles) = sync_handles {
        drive_cgroup_handshake(cmd, handles, binary)?
    } else {
        cmd.spawn().map_err(|e| spawn_error_context(e, binary))?
    };
    Ok((child, sigchld_guard))
}
pub(crate) const MAX_CAPTURED_STREAM_BYTES: u64 = 16 * 1024 * 1024;
fn wait_and_capture(child: &mut std::process::Child) -> Result<SpawnOutput> {
let stdout_handle = child.stdout.take().map(|out| {
std::thread::spawn(move || -> std::io::Result<(String, bool)> {
drain_capped(out, "stdout")
})
});
let stderr_handle = child.stderr.take().map(|err| {
std::thread::spawn(move || -> std::io::Result<(String, bool)> {
drain_capped(err, "stderr")
})
});
let status = child.wait().with_context(|| "wait child")?;
let (stdout, _stdout_truncated) = match stdout_handle {
Some(h) => h.join().unwrap().with_context(|| "read child stdout")?,
None => (String::new(), false),
};
let (stderr, _stderr_truncated) = match stderr_handle {
Some(h) => h.join().unwrap().with_context(|| "read child stderr")?,
None => (String::new(), false),
};
Ok(SpawnOutput {
stdout,
stderr,
exit_code: status.code().unwrap_or(-1),
})
}
/// Reads `src` to EOF, keeping at most `MAX_CAPTURED_STREAM_BYTES` bytes.
///
/// Returns the kept bytes decoded as UTF-8 (invalid sequences replaced with
/// U+FFFD) and whether anything was discarded.
///
/// When the cap is exceeded, a warning is printed to stderr and logged. The
/// rest of the stream is still read and thrown away: stopping early would
/// leave the writer blocked on a full pipe, so the child could never exit.
///
/// # Errors
/// Returns the I/O error if reading the kept prefix fails. Errors while
/// discarding the tail are ignored, because the prefix is already in hand.
fn drain_capped(
    mut src: impl std::io::Read,
    label: &'static str,
) -> std::io::Result<(String, bool)> {
    use std::io::Read;
    let mut raw: Vec<u8> = Vec::new();
    // Read one byte past the cap so a stream of exactly the cap size is not
    // reported as truncated.
    let n = src
        .by_ref()
        .take(MAX_CAPTURED_STREAM_BYTES + 1)
        .read_to_end(&mut raw)?;
    let truncated = n as u64 > MAX_CAPTURED_STREAM_BYTES;
    if truncated {
        raw.truncate(MAX_CAPTURED_STREAM_BYTES as usize);
        eprintln!(
            "ktstr: payload {label} exceeded {MAX_CAPTURED_STREAM_BYTES} bytes; tail discarded"
        );
        tracing::warn!(
            stream = label,
            cap_bytes = MAX_CAPTURED_STREAM_BYTES,
            "payload {label} exceeded capture cap; tail discarded",
        );
        // Keep the pipe flowing until the writer closes it (best-effort).
        let _ = std::io::copy(&mut src, &mut std::io::sink());
    }
    Ok((String::from_utf8_lossy(&raw).into_owned(), truncated))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cgroup::CgroupManager;
use crate::test_support::{MetricSource, MetricStream, OutputFormat, Polarity, Scheduler};
use crate::topology::TestTopology;
fn make_ctx<'a>(
cgroups: &'a CgroupManager,
topo: &'a TestTopology,
) -> crate::scenario::Ctx<'a> {
crate::scenario::Ctx::builder(cgroups, topo).build()
}
const FIO_BINARY: Payload = Payload {
name: "fio",
kind: PayloadKind::Binary("fio"),
output: OutputFormat::Json,
default_args: &["--output-format=json"],
default_checks: &[],
metrics: &[],
include_files: &[],
uses_parent_pgrp: false,
known_flags: None,
metric_bounds: None,
};
const EEVDF_SCHED_PAYLOAD: Payload = Payload {
name: "eevdf",
kind: PayloadKind::Scheduler(&Scheduler::EEVDF),
output: OutputFormat::ExitCode,
default_args: &[],
default_checks: &[],
metrics: &[],
include_files: &[],
uses_parent_pgrp: false,
known_flags: None,
metric_bounds: None,
};
#[test]
fn builder_inherits_default_args() {
let cgroups = CgroupManager::new("/nonexistent");
let topo = TestTopology::synthetic(4, 1);
let ctx = make_ctx(&cgroups, &topo);
let run = PayloadRun::new(&ctx, &FIO_BINARY);
assert_eq!(run.args, vec!["--output-format=json"]);
}
#[test]
fn arg_appends() {
let cgroups = CgroupManager::new("/nonexistent");
let topo = TestTopology::synthetic(4, 1);
let ctx = make_ctx(&cgroups, &topo);
let run = PayloadRun::new(&ctx, &FIO_BINARY)
.arg("--runtime=30")
.arg("job.fio");
assert_eq!(
run.args,
vec!["--output-format=json", "--runtime=30", "job.fio"]
);
}
#[test]
fn clear_args_wipes_defaults() {
let cgroups = CgroupManager::new("/nonexistent");
let topo = TestTopology::synthetic(4, 1);
let ctx = make_ctx(&cgroups, &topo);
let run = PayloadRun::new(&ctx, &FIO_BINARY)
.clear_args()
.arg("--custom");
assert_eq!(run.args, vec!["--custom"]);
}
#[test]
fn args_method_bulk_appends() {
let cgroups = CgroupManager::new("/nonexistent");
let topo = TestTopology::synthetic(4, 1);
let ctx = make_ctx(&cgroups, &topo);
let run = PayloadRun::new(&ctx, &FIO_BINARY).args(["--a", "--b", "--c"]);
assert_eq!(run.args, vec!["--output-format=json", "--a", "--b", "--c"]);
}
#[test]
fn check_and_clear_checks() {
let cgroups = CgroupManager::new("/nonexistent");
let topo = TestTopology::synthetic(4, 1);
let ctx = make_ctx(&cgroups, &topo);
let run = PayloadRun::new(&ctx, &FIO_BINARY)
.check(MetricCheck::min("iops", 1000.0))
.check(MetricCheck::max("latency", 500.0));
assert_eq!(run.checks.len(), 2);
let cleared = PayloadRun::new(&ctx, &FIO_BINARY)
.clear_checks()
.check(MetricCheck::exit_code_eq(0));
assert_eq!(cleared.checks.len(), 1);
}
#[test]
fn in_cgroup_stores_name() {
let cgroups = CgroupManager::new("/nonexistent");
let topo = TestTopology::synthetic(4, 1);
let ctx = make_ctx(&cgroups, &topo);
let run = PayloadRun::new(&ctx, &FIO_BINARY).in_cgroup("fio_cg");
assert_eq!(run.cgroup.as_deref(), Some("fio_cg"));
}
#[test]
fn resolve_cgroup_path_strips_leading_slash_and_joins() {
    let cgroup_mgr = CgroupManager::new("/sys/fs/cgroup/test-parent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    let expected = std::path::PathBuf::from("/sys/fs/cgroup/test-parent/workload");

    // A leading '/' is stripped before the name is joined onto the parent...
    let with_slash = resolve_cgroup_path(&ctx, Some("/workload"))
        .expect("valid cgroup name")
        .expect("Some(path)");
    assert_eq!(with_slash, expected);

    // ...so the slash-less spelling resolves to exactly the same path.
    let without_slash = resolve_cgroup_path(&ctx, Some("workload"))
        .expect("valid")
        .expect("Some");
    assert_eq!(with_slash, without_slash);
}
#[test]
fn resolve_cgroup_path_rejects_parent_dir() {
    let cgroup_mgr = CgroupManager::new("/sys/fs/cgroup/test-parent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    // A '..' component must be refused, and the error text must mention it.
    let rendered = format!(
        "{:#}",
        resolve_cgroup_path(&ctx, Some("../escape")).expect_err("'..' must be rejected")
    );
    assert!(rendered.contains(".."), "err: {rendered}");
}
#[test]
fn resolve_cgroup_path_rejects_nul_byte() {
    let cgroup_mgr = CgroupManager::new("/sys/fs/cgroup/test-parent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    // An embedded NUL byte must be refused, and the error text must say so.
    let rendered = format!(
        "{:#}",
        resolve_cgroup_path(&ctx, Some("bad\0name")).expect_err("NUL must be rejected")
    );
    assert!(rendered.contains("NUL"), "err: {rendered}");
}
#[test]
fn resolve_cgroup_path_rejects_empty_after_strip() {
    let cgroup_mgr = CgroupManager::new("/sys/fs/cgroup/test-parent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    // A bare "/" (empty once the leading slash is stripped) and a literal ""
    // must both be refused with an "empty" diagnostic.
    for (name, why) in [
        ("/", "slash-only must be rejected"),
        ("", "empty must be rejected"),
    ] {
        let rendered = format!("{:#}", resolve_cgroup_path(&ctx, Some(name)).expect_err(why));
        assert!(rendered.contains("empty"), "err: {rendered}");
    }
}
#[test]
fn resolve_cgroup_path_none_passes_through() {
    let cgroup_mgr = CgroupManager::new("/sys/fs/cgroup/test-parent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    // No cgroup requested -> Ok(None), not an error.
    let resolved = resolve_cgroup_path(&ctx, None).unwrap();
    assert!(resolved.is_none());
}
#[test]
fn run_rejects_scheduler_kind() {
    let cgroup_mgr = CgroupManager::new("/nonexistent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    // A scheduler-kind payload cannot be executed through `run()`.
    let outcome = PayloadRun::new(&ctx, &EEVDF_SCHED_PAYLOAD).run();
    let err = outcome.unwrap_err();
    let rendered = format!("{err:#}");
    assert!(rendered.contains("scheduler-kind"), "err: {rendered}");
}
#[test]
fn evaluate_checks_passes_when_no_checks() {
    let no_metrics = PayloadMetrics {
        exit_code: 0,
        metrics: vec![],
        payload_index: 0,
    };
    // An empty check list is vacuously satisfied.
    let result = evaluate_checks(&[], &no_metrics, "");
    assert!(result.passed);
}
#[test]
fn evaluate_checks_exit_code_mismatch_fails_fast() {
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: Vec::new(),
        exit_code: 42,
    };
    // The exit-code failure must short-circuit: the later `min` check on the
    // absent "iops" metric must not contribute a second detail.
    let checks = [
        MetricCheck::exit_code_eq(0),
        MetricCheck::min("iops", 100.0),
    ];
    let result = evaluate_checks(&checks, &pm, "");
    assert!(!result.passed);
    assert_eq!(result.details.len(), 1);
    let first = &result.details[0];
    assert!(
        first.message.contains("exited with code 42"),
        "details: {:?}",
        result.details
    );
}
#[test]
fn evaluate_checks_exit_code_mismatch_surfaces_stderr() {
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: Vec::new(),
        exit_code: 1,
    };
    let stderr = "fatal: config missing\n";
    let result = evaluate_checks(&[MetricCheck::exit_code_eq(0)], &pm, stderr);
    assert!(!result.passed);
    // The failing detail must quote the stderr tail under a "stderr:" label.
    let msg = &result.details[0].message;
    assert!(
        msg.contains("fatal: config missing"),
        "stderr tail must appear in detail: {:?}",
        result.details,
    );
    assert!(
        msg.contains("stderr:"),
        "detail must label the stderr block: {:?}",
        result.details,
    );
}
#[test]
fn evaluate_checks_exit_code_mismatch_without_stderr_stays_terse() {
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: Vec::new(),
        exit_code: 1,
    };
    let result = evaluate_checks(&[MetricCheck::exit_code_eq(0)], &pm, "");
    assert!(!result.passed);
    // With no stderr captured, no empty "stderr:" block may be appended.
    let labelled = result.details[0].message.contains("stderr:");
    assert!(
        !labelled,
        "empty stderr must not produce a stderr: block: {:?}",
        result.details,
    );
}
#[test]
fn evaluate_checks_exit_code_eq_negative_one_matches_signal_death() {
    // Per the test name, -1 is the exit code used to represent signal death.
    let killed = PayloadMetrics {
        payload_index: 0,
        metrics: Vec::new(),
        exit_code: -1,
    };
    let result = evaluate_checks(&[MetricCheck::exit_code_eq(-1)], &killed, "");
    assert!(
        result.passed,
        "exit_code_eq(-1) must pass when exit_code == -1: {:?}",
        result.details,
    );
}
#[test]
fn evaluate_checks_exit_code_eq_negative_one_fails_on_clean_exit() {
    let clean = PayloadMetrics {
        payload_index: 0,
        metrics: Vec::new(),
        exit_code: 0,
    };
    let result = evaluate_checks(&[MetricCheck::exit_code_eq(-1)], &clean, "");
    assert!(!result.passed);
    // The mismatch detail must name both the observed and the expected code.
    let msg = &result.details[0].message;
    for (needle, which) in [("exited with code 0", "actual"), ("-1", "expected")] {
        assert!(
            msg.contains(needle),
            "mismatch detail must cite the {which} exit code, got: {msg}"
        );
    }
}
#[test]
#[should_panic(expected = "lo must be <= hi")]
fn evaluate_checks_range_reversed_bounds_panics_at_construction() {
    // Reversed bounds are rejected when the check is built, not when it is evaluated.
    let (lo, hi) = (100.0, 50.0);
    let _ = MetricCheck::range("iops", lo, hi);
}
#[test]
fn stderr_tail_truncates_long_input() {
    let oversized = "A".repeat(STDERR_TAIL_BYTES + 500);
    let tail = stderr_tail(&oversized, STDERR_TAIL_BYTES);
    // Truncated output is a "..." marker followed by exactly STDERR_TAIL_BYTES
    // bytes; with pure ASCII input no boundary snapping can shorten it.
    assert!(tail.starts_with("..."));
    assert_eq!(tail.len(), 3 + STDERR_TAIL_BYTES);
}
#[test]
fn stderr_tail_preserves_short_input() {
    // Input under the limit is returned unchanged, with no "..." marker.
    let input = "short error";
    assert_eq!(stderr_tail(input, STDERR_TAIL_BYTES), input);
}
#[test]
fn stderr_tail_snaps_forward_across_multibyte_char_boundary() {
    // Layout: bytes 0..10 = 'A', bytes 10..12 = 'é' (2-byte UTF-8), bytes 12..22 = 'B'.
    // `"A".repeat` avoids allocating a throwaway `String` just to repeat it.
    let mut s = "A".repeat(10);
    s.push('é');
    s.push_str(&"B".repeat(10));
    assert_eq!(s.len(), 22, "fixture layout changed");
    // Keeping the last 11 bytes would start at byte 11, the interior byte of
    // 'é'; the cut must snap forward to the next char boundary (the first 'B').
    let tail = stderr_tail(&s, 11);
    assert!(tail.starts_with("..."));
    assert!(
        tail[3..].starts_with('B'),
        "expected snap-forward past 'é', got: {tail:?}"
    );
}
#[test]
fn stderr_tail_preserves_multibyte_char_at_exact_boundary() {
    // Layout: bytes 0..10 = 'A', bytes 10..12 = 'é' (2-byte UTF-8), bytes 12..22 = 'B'.
    let mut s = "A".repeat(10);
    s.push('é');
    s.push_str(&"B".repeat(10));
    assert_eq!(s.len(), 22, "fixture layout changed");
    // Keeping the last 12 bytes starts exactly on 'é' (byte 10), which is
    // already a char boundary, so 'é' must be kept rather than skipped.
    let tail = stderr_tail(&s, 12);
    assert!(tail.starts_with("..."));
    assert!(
        tail.contains('é'),
        "boundary-aligned multibyte char must survive the tail, got: {tail:?}"
    );
}
#[test]
fn stderr_tail_output_is_always_valid_utf8() {
    // 10 ASCII bytes, a 3-byte CJK char, then 10 more ASCII bytes. Sweeping
    // every `max` puts the cut inside '好' for two of the values. A split char
    // would panic on slicing, but the test also checks what is kept.
    let s = "xxxxxxxxxx好yyyyyyyyyy";
    for max in 1..=s.len() {
        let tail = stderr_tail(s, max);
        // Whatever follows the optional "..." marker must be a genuine suffix
        // of the input, no longer than `max` bytes.
        let body = tail.strip_prefix("...").unwrap_or(&tail[..]);
        assert!(
            s.ends_with(body),
            "max={max}: {tail:?} is not a suffix of the input"
        );
        assert!(body.len() <= max, "max={max}: kept {} bytes", body.len());
    }
}
#[test]
fn stderr_tail_snaps_forward_at_production_threshold() {
    // Fixture: 100 'A', then 'é' (2 bytes at offsets 100..102), then
    // STDERR_TAIL_BYTES - 1 'B'. The total is STDERR_TAIL_BYTES + 101, so the
    // last STDERR_TAIL_BYTES bytes begin at offset 101, the interior byte of
    // 'é'. Deriving the count from the constant keeps the cut on that byte
    // even if the constant is retuned. A hard-coded count only works for one
    // specific value of STDERR_TAIL_BYTES.
    let mut s = "A".repeat(100);
    s.push('é');
    s.push_str(&"B".repeat(STDERR_TAIL_BYTES - 1));
    assert!(
        s.len() > STDERR_TAIL_BYTES,
        "fixture must exceed STDERR_TAIL_BYTES to exercise the truncation path",
    );
    let tail = stderr_tail(&s, STDERR_TAIL_BYTES);
    assert!(tail.starts_with("..."));
    assert!(
        tail[3..].starts_with('B'),
        "expected snap-forward past 'é' interior byte at >1 KiB, got prefix: {:?}",
        &tail[..20.min(tail.len())],
    );
}
#[test]
fn stderr_tail_preserves_multibyte_at_production_boundary() {
    // Fixture: 100 'A', then 'é' (2 bytes at offsets 100..102), then
    // STDERR_TAIL_BYTES - 2 'B'. The total is STDERR_TAIL_BYTES + 100, so the
    // last STDERR_TAIL_BYTES bytes begin exactly at offset 100, the first byte
    // of 'é'. Deriving the count from the constant keeps that alignment even
    // if the constant is retuned.
    let mut s = "A".repeat(100);
    s.push('é');
    s.push_str(&"B".repeat(STDERR_TAIL_BYTES - 2));
    assert!(
        s.len() > STDERR_TAIL_BYTES,
        "fixture must exceed STDERR_TAIL_BYTES to exercise the truncation path",
    );
    let tail = stderr_tail(&s, STDERR_TAIL_BYTES);
    assert!(tail.starts_with("..."));
    assert!(
        tail.contains('é'),
        "boundary-aligned 'é' at the >1 KiB truncation offset must survive, got prefix: {:?}",
        &tail[..40.min(tail.len())],
    );
}
#[test]
fn evaluate_checks_missing_metric_fails_loudly() {
    let empty = PayloadMetrics {
        payload_index: 0,
        metrics: Vec::new(),
        exit_code: 0,
    };
    // A threshold check on a metric that was never emitted must fail, not pass silently.
    let result = evaluate_checks(&[MetricCheck::min("iops", 100.0)], &empty, "");
    assert!(!result.passed);
    assert!(
        result.details[0].message.contains("not found"),
        "details: {:?}",
        result.details
    );
}
#[test]
fn evaluate_checks_min_below_threshold_fails() {
    let iops = Metric {
        name: "iops".into(),
        value: 50.0,
        polarity: Polarity::HigherBetter,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![iops],
        exit_code: 0,
    };
    // 50 < 100, so the `min` check fails with a "below minimum" detail.
    let outcome = evaluate_checks(&[MetricCheck::min("iops", 100.0)], &pm, "");
    assert!(!outcome.passed);
    assert!(outcome.details[0].message.contains("below minimum"));
}
#[test]
fn evaluate_checks_max_above_threshold_fails() {
    let latency = Metric {
        name: "lat".into(),
        value: 1000.0,
        polarity: Polarity::LowerBetter,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![latency],
        exit_code: 0,
    };
    // 1000 > 500, so the `max` check fails with an "exceeds maximum" detail.
    let outcome = evaluate_checks(&[MetricCheck::max("lat", 500.0)], &pm, "");
    assert!(!outcome.passed);
    assert!(outcome.details[0].message.contains("exceeds maximum"));
}
#[test]
fn evaluate_checks_range_out_of_bounds_fails() {
    let cpu = Metric {
        name: "cpu".into(),
        value: 150.0,
        polarity: Polarity::Unknown,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![cpu],
        exit_code: 0,
    };
    // 150 lies outside [0, 100].
    let outcome = evaluate_checks(&[MetricCheck::range("cpu", 0.0, 100.0)], &pm, "");
    assert!(!outcome.passed);
    assert!(outcome.details[0].message.contains("outside"));
}
#[test]
fn evaluate_checks_min_nan_fails_with_nan_message() {
    let nan_iops = Metric {
        name: "iops".into(),
        value: f64::NAN,
        polarity: Polarity::HigherBetter,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![nan_iops],
        exit_code: 0,
    };
    // Every comparison with NaN is false, so a naive `value < min` test would
    // let NaN through; the check must fail explicitly instead.
    let outcome = evaluate_checks(&[MetricCheck::min("iops", 100.0)], &pm, "");
    assert!(!outcome.passed, "NaN value must fail Min check");
    assert!(
        outcome.details[0].message.contains("value is NaN"),
        "NaN failure must surface the dedicated message: {:?}",
        outcome.details
    );
}
#[test]
fn evaluate_checks_max_nan_fails_with_nan_message() {
    let nan_lat = Metric {
        name: "lat".into(),
        value: f64::NAN,
        polarity: Polarity::LowerBetter,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![nan_lat],
        exit_code: 0,
    };
    // NaN must fail a `max` check with the dedicated NaN message.
    let outcome = evaluate_checks(&[MetricCheck::max("lat", 500.0)], &pm, "");
    assert!(!outcome.passed, "NaN value must fail Max check");
    assert!(
        outcome.details[0].message.contains("value is NaN"),
        "NaN failure must surface the dedicated message: {:?}",
        outcome.details
    );
}
#[test]
fn evaluate_checks_range_nan_fails_with_nan_message() {
    let nan_cpu = Metric {
        name: "cpu".into(),
        value: f64::NAN,
        polarity: Polarity::Unknown,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![nan_cpu],
        exit_code: 0,
    };
    // NaN must fail a `range` check with the dedicated NaN message.
    let outcome = evaluate_checks(&[MetricCheck::range("cpu", 0.0, 100.0)], &pm, "");
    assert!(!outcome.passed, "NaN value must fail Range check");
    assert!(
        outcome.details[0].message.contains("value is NaN"),
        "NaN failure must surface the dedicated message: {:?}",
        outcome.details
    );
}
#[test]
fn evaluate_checks_exists_missing_fails() {
    let empty = PayloadMetrics {
        exit_code: 0,
        metrics: Vec::new(),
        payload_index: 0,
    };
    // `exists` on a metric that was never emitted must fail.
    let outcome = evaluate_checks(&[MetricCheck::exists("thing")], &empty, "");
    assert!(!outcome.passed);
}
#[test]
fn evaluate_checks_all_pass_returns_pass() {
    let iops = Metric {
        name: "iops".into(),
        value: 5000.0,
        polarity: Polarity::HigherBetter,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![iops],
        exit_code: 0,
    };
    // Clean exit, iops above the minimum and present: every check is satisfied.
    let checks = [
        MetricCheck::exit_code_eq(0),
        MetricCheck::min("iops", 1000.0),
        MetricCheck::exists("iops"),
    ];
    let outcome = evaluate_checks(&checks, &pm, "");
    assert!(outcome.passed);
}
#[test]
fn evaluate_checks_duplicate_min_on_same_metric_both_evaluated() {
    let iops = Metric {
        name: "iops".into(),
        value: 100.0,
        polarity: Polarity::HigherBetter,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![iops],
        exit_code: 0,
    };
    // The first min (50) passes and the second (200) fails. Both must be
    // evaluated, and only the failure produces a detail.
    let checks = [
        MetricCheck::min("iops", 50.0),
        MetricCheck::min("iops", 200.0),
    ];
    let outcome = evaluate_checks(&checks, &pm, "");
    assert!(!outcome.passed, "second min must fail");
    assert_eq!(outcome.details.len(), 1, "only the failing check emits a detail");
    assert!(
        outcome.details[0].message.contains("below minimum 200"),
        "failing check must cite its threshold: {:?}",
        outcome.details,
    );
}
#[test]
fn evaluate_checks_conflicting_checks_on_same_metric_both_report() {
    // iops = 75 violates both `min 100` and `max 50`. Neither failure may mask
    // the other, so each must produce its own detail.
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![Metric {
            name: "iops".to_string(),
            value: 75.0,
            polarity: Polarity::HigherBetter,
            unit: String::new(),
            source: MetricSource::Json,
            stream: MetricStream::Stdout,
        }],
        exit_code: 0,
    };
    let r = evaluate_checks(
        &[
            MetricCheck::min("iops", 100.0), // 75 < 100 -> fails
            MetricCheck::max("iops", 50.0),  // 75 > 50  -> fails
        ],
        &pm,
        "",
    );
    assert!(!r.passed);
    assert_eq!(
        r.details.len(),
        2,
        "both conflicting checks must each emit a detail: {:?}",
        r.details,
    );
    // Counting details alone would accept two copies of the same failure;
    // confirm that each kind is actually reported.
    assert!(
        r.details.iter().any(|d| d.message.contains("below minimum")),
        "min failure must be reported: {:?}",
        r.details,
    );
    assert!(
        r.details.iter().any(|d| d.message.contains("exceeds maximum")),
        "max failure must be reported: {:?}",
        r.details,
    );
}
#[test]
fn evaluate_checks_exists_passes_for_zero_value_metric() {
    let errors = Metric {
        name: "errors".into(),
        value: 0.0,
        polarity: Polarity::LowerBetter,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![errors],
        exit_code: 0,
    };
    // `exists` tests presence only; a value of 0.0 must not count as absent.
    let verdict = evaluate_checks(&[MetricCheck::exists("errors")], &pm, "");
    assert!(
        verdict.passed,
        "exists('errors') must pass when metric is 0.0: {:?}",
        verdict.details,
    );
}
#[test]
fn evaluate_checks_exists_passes_for_negative_zero() {
    let drift = Metric {
        name: "drift".into(),
        value: -0.0,
        polarity: Polarity::Unknown,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![drift],
        exit_code: 0,
    };
    // -0.0 is a present value too.
    let verdict = evaluate_checks(&[MetricCheck::exists("drift")], &pm, "");
    assert!(verdict.passed);
}
#[test]
fn payload_run_debug_renders_identity_fields() {
    let cgroup_mgr = CgroupManager::new("/nonexistent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    let run = PayloadRun::new(&ctx, &TRUE_BIN)
        .args(["--foo", "--bar"])
        .check(MetricCheck::exit_code_eq(0))
        .in_cgroup("workers");
    let rendered = format!("{run:?}");

    // (substring that must appear, label used in the failure message)
    let expectations = [
        ("PayloadRun", "prefix"),
        ("payload:", "payload field"),
        ("true_bin", "payload name"),
        ("args_len", "args_len field"),
        ("checks_len", "checks_len field"),
        ("cgroup:", "cgroup field"),
        ("args_len: 2", "computed args_len"),
        ("checks_len: 1", "computed checks_len"),
        ("workers", "cgroup value"),
    ];
    for (needle, label) in expectations {
        assert!(rendered.contains(needle), "{label}: {rendered}");
    }
    // The hand-written Debug impl deliberately omits the borrowed Ctx.
    assert!(
        !rendered.contains("Ctx {"),
        "Ctx should not appear in PayloadRun Debug: {rendered}"
    );
}
#[test]
fn payload_run_debug_renders_defaults() {
    let cgroups = CgroupManager::new("/nonexistent");
    let topo = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroups, &topo);
    // A freshly built run for a bare binary payload has no args or checks, and
    // no cgroup or timeout set.
    let run = PayloadRun::new(&ctx, &TRUE_BIN);
    let s = format!("{run:?}");
    assert!(s.contains("args_len: 0"), "default args_len: {s}");
    assert!(s.contains("checks_len: 0"), "default checks_len: {s}");
    assert!(s.contains("cgroup: None"), "default cgroup: {s}");
    // The Debug impl also renders `timeout`, which `new` leaves unset.
    assert!(s.contains("timeout: None"), "default timeout: {s}");
}
#[test]
fn resolve_polarities_applies_hints() {
    // Payload whose only hint marks "iops" as higher-is-better with unit "iops".
    const HINTED: Payload = Payload {
        name: "p",
        kind: PayloadKind::Binary("p"),
        output: OutputFormat::Json,
        default_args: &[],
        default_checks: &[],
        metrics: &[crate::test_support::MetricHint {
            name: "iops",
            polarity: Polarity::HigherBetter,
            unit: "iops",
        }],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    let unresolved = Metric {
        name: "iops".into(),
        value: 100.0,
        polarity: Polarity::Unknown,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let mut metrics = vec![unresolved];
    resolve_polarities(&mut metrics, &HINTED);
    let resolved = &metrics[0];
    assert_eq!(resolved.polarity, Polarity::HigherBetter);
    assert_eq!(resolved.unit, "iops");
}
// Payload wrapping `/bin/true`; the spawn tests in this module expect it to exit with code 0.
const TRUE_BIN: Payload = Payload::binary("true_bin", "/bin/true");
// Payload wrapping `/bin/false`; the spawn tests in this module expect a non-zero exit code.
const FALSE_BIN: Payload = Payload::binary("false_bin", "/bin/false");
#[test]
fn spawn_rejects_scheduler_kind() {
    let cgroup_mgr = CgroupManager::new("/nonexistent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    // `spawn()` applies the same scheduler-kind guard as `run()`.
    let outcome = PayloadRun::new(&ctx, &EEVDF_SCHED_PAYLOAD).spawn();
    let err = outcome.unwrap_err();
    let rendered = format!("{err:#}");
    assert!(rendered.contains("scheduler-kind"), "err: {rendered}");
}
#[test]
fn spawn_then_wait_returns_result_and_metrics() {
    let cgroup_mgr = CgroupManager::new("/nonexistent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    let running = PayloadRun::new(&ctx, &TRUE_BIN)
        .spawn()
        .expect("spawn /bin/true");
    // Blocking wait yields both the check verdict and the collected metrics.
    let (verdict, pm) = running.wait().expect("wait");
    assert!(verdict.passed);
    assert_eq!(pm.exit_code, 0);
}
#[test]
fn spawn_then_kill_returns_collected_output() {
    // Runs `sleep 60`, so it is still alive when the test kills it.
    const SLEEPER: Payload = Payload {
        name: "sleeper",
        kind: PayloadKind::Binary("/bin/sleep"),
        output: crate::test_support::OutputFormat::ExitCode,
        default_args: &["60"],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    let cgroup_mgr = CgroupManager::new("/nonexistent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    let running = PayloadRun::new(&ctx, &SLEEPER)
        .spawn()
        .expect("spawn sleep");
    let (_verdict, pm) = running.kill().expect("kill+collect");
    // A killed process must not report a clean exit.
    assert_ne!(pm.exit_code, 0);
}
#[test]
fn spawn_try_wait_returns_none_while_running() {
    // Runs `sleep 60`, so it is certainly still running when polled.
    const SLEEPER: Payload = Payload {
        name: "sleeper3",
        kind: PayloadKind::Binary("/bin/sleep"),
        output: crate::test_support::OutputFormat::ExitCode,
        default_args: &["60"],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    let cgroup_mgr = CgroupManager::new("/nonexistent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    let mut running = PayloadRun::new(&ctx, &SLEEPER)
        .spawn()
        .expect("spawn sleep");
    let still_running = running.try_wait().expect("try_wait").is_none();
    assert!(still_running);
    // Best-effort cleanup so the sleeper does not outlive the test.
    let _ = running.kill();
}
#[test]
fn spawn_try_wait_returns_some_after_exit() {
    let cgroup_mgr = CgroupManager::new("/nonexistent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    let mut running = PayloadRun::new(&ctx, &TRUE_BIN)
        .spawn()
        .expect("spawn /bin/true");
    // Poll up to 100 times, sleeping 10ms after each miss, until the exit is observed.
    let observed = (0..100).find_map(|_| {
        let polled = running.try_wait().expect("try_wait");
        if polled.is_none() {
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
        polled
    });
    let (verdict, pm) = observed.expect("try_wait eventually returns Some");
    assert!(verdict.passed);
    assert_eq!(pm.exit_code, 0);
}
#[test]
fn spawn_false_binary_produces_failing_exit_code() {
    let cgroup_mgr = CgroupManager::new("/nonexistent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    let running = PayloadRun::new(&ctx, &FALSE_BIN)
        .spawn()
        .expect("spawn /bin/false");
    // `/bin/false` exits non-zero, and that code is propagated into the metrics.
    let (_verdict, pm) = running.wait().expect("wait");
    assert_ne!(pm.exit_code, 0);
}
#[test]
fn resolve_polarities_leaves_unhinted_alone() {
    let untouched = Metric {
        name: "no_hint".into(),
        value: 1.0,
        polarity: Polarity::Unknown,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let mut metrics = vec![untouched];
    // FIO_BINARY has no hint named "no_hint", so the metric must keep its
    // Unknown polarity and empty unit.
    resolve_polarities(&mut metrics, &FIO_BINARY);
    let m = &metrics[0];
    assert_eq!(m.polarity, Polarity::Unknown);
    assert_eq!(m.unit, "");
}
#[test]
fn evaluate_checks_three_failing_checks_produce_three_details() {
    // Builds a JSON/stdout metric with an empty unit.
    let metric = |name: &str, value: f64, polarity: Polarity| Metric {
        name: name.to_string(),
        value,
        polarity,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let pm = PayloadMetrics {
        payload_index: 0,
        metrics: vec![
            metric("iops", 10.0, Polarity::HigherBetter),
            metric("lat", 900.0, Polarity::LowerBetter),
            metric("cpu", 200.0, Polarity::Unknown),
        ],
        exit_code: 0,
    };
    // The exit-code check passes; each of the three metric checks fails.
    let checks = [
        MetricCheck::exit_code_eq(0),
        MetricCheck::min("iops", 1000.0),
        MetricCheck::max("lat", 100.0),
        MetricCheck::range("cpu", 0.0, 100.0),
    ];
    let outcome = evaluate_checks(&checks, &pm, "");
    assert!(!outcome.passed);
    assert_eq!(
        outcome.details.len(),
        3,
        "expected one detail per failed metric check, got: {:?}",
        outcome.details,
    );
    for name in ["iops", "lat", "cpu"] {
        assert!(outcome.details.iter().any(|d| d.message.contains(name)));
    }
}
#[test]
fn arg_then_clear_args_then_arg_yields_only_the_final_arg() {
    let cgroup_mgr = CgroupManager::new("/nonexistent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    let run = PayloadRun::new(&ctx, &FIO_BINARY).arg("--x");
    // clear_args drops the defaults and the "--x" appended above.
    let run = run.clear_args();
    let run = run.arg("--y");
    assert_eq!(run.args, vec!["--y"]);
}
#[test]
fn default_checks_are_inherited_by_new_builder() {
    // Payload that ships two default checks: a clean exit, and iops >= 500.
    const CHECKED: Payload = Payload {
        name: "checked",
        kind: PayloadKind::Binary("checked"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[
            MetricCheck::exit_code_eq(0),
            MetricCheck::min("iops", 500.0),
        ],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    let cgroup_mgr = CgroupManager::new("/nonexistent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);

    // A fresh builder starts with the defaults, in declaration order.
    let fresh = PayloadRun::new(&ctx, &CHECKED);
    assert_eq!(fresh.checks.len(), 2);
    let (first, second) = (&fresh.checks[0], &fresh.checks[1]);
    assert!(matches!(first, MetricCheck::ExitCodeEq(0)));
    assert!(matches!(
        second,
        MetricCheck::Min { value, .. } if *value == 500.0,
    ));

    // `.check` appends after the defaults; `clear_checks` removes them.
    let appended = PayloadRun::new(&ctx, &CHECKED).check(MetricCheck::exists("latency"));
    assert_eq!(appended.checks.len(), 3);
    let cleared = PayloadRun::new(&ctx, &CHECKED).clear_checks();
    assert!(cleared.checks.is_empty());
}
#[test]
fn in_cgroup_accepts_static_str_zero_alloc() {
    let cgroup_mgr = CgroupManager::new("/nonexistent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    let run = PayloadRun::new(&ctx, &FIO_BINARY).in_cgroup("workload");
    // A `&'static str` must be stored borrowed, with no allocation.
    let Some(Cow::Borrowed(name)) = &run.cgroup else {
        panic!(
            "expected Cow::Borrowed for &'static str input, got {:?}",
            run.cgroup
        );
    };
    assert_eq!(*name, "workload");
}
#[test]
fn in_cgroup_accepts_owned_string() {
    let cgroup_mgr = CgroupManager::new("/nonexistent");
    let topology = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroup_mgr, &topology);
    let dynamic_name = String::from("dynamic");
    let run = PayloadRun::new(&ctx, &FIO_BINARY).in_cgroup(dynamic_name);
    // An owned `String` is moved in as `Cow::Owned`.
    let Some(Cow::Owned(name)) = &run.cgroup else {
        panic!("expected Cow::Owned for String input, got {:?}", run.cgroup);
    };
    assert_eq!(name, "dynamic");
}
#[test]
fn payload_metrics_shm_payload_json_round_trip() {
    let original = PayloadMetrics {
        payload_index: 0,
        metrics: vec![
            Metric {
                name: "jobs.0.read.iops".into(),
                value: 12345.0,
                polarity: Polarity::HigherBetter,
                unit: "iops".into(),
                source: MetricSource::Json,
                stream: MetricStream::Stdout,
            },
            Metric {
                name: "lat_ns".into(),
                value: 500.0,
                polarity: Polarity::LowerBetter,
                unit: "ns".into(),
                source: MetricSource::LlmExtract,
                stream: MetricStream::Stdout,
            },
        ],
        exit_code: 0,
    };
    // Encode to JSON bytes and decode again; every compared field must survive.
    let wire = serde_json::to_vec(&original).expect("serialize PayloadMetrics");
    let decoded: PayloadMetrics =
        serde_json::from_slice(&wire).expect("decode PayloadMetrics from JSON bytes");
    assert_eq!(decoded.exit_code, original.exit_code);
    assert_eq!(decoded.metrics.len(), original.metrics.len());
    for (got, want) in decoded.metrics.iter().zip(&original.metrics) {
        assert_eq!(got.name, want.name);
        assert_eq!(got.value, want.value);
        assert_eq!(got.polarity, want.polarity);
        assert_eq!(got.unit, want.unit);
        assert_eq!(got.source, want.source);
    }
}
#[test]
fn resolve_polarities_applies_hints_by_name_lookup() {
    use crate::test_support::{Metric, MetricHint, MetricSource, MetricStream, Polarity};
    // Two hints; the metric list below also contains one name with no hint.
    static PAYLOAD: crate::test_support::Payload = crate::test_support::Payload {
        name: "hinted",
        kind: crate::test_support::PayloadKind::Binary("x"),
        output: crate::test_support::OutputFormat::Json,
        default_args: &[],
        default_checks: &[],
        metrics: &[
            MetricHint {
                name: "lat_ns",
                polarity: Polarity::LowerBetter,
                unit: "ns",
            },
            MetricHint {
                name: "iops",
                polarity: Polarity::HigherBetter,
                unit: "iops",
            },
        ],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    // Every metric starts Unknown and unitless; hint order differs from
    // metric order, so matching must be by name, not by position.
    let blank = |name: &str, value: f64| Metric {
        name: name.into(),
        value,
        polarity: Polarity::Unknown,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let mut ms = vec![blank("iops", 1.0), blank("unhinted", 2.0), blank("lat_ns", 3.0)];
    resolve_polarities(&mut ms, &PAYLOAD);
    let expected = [
        (Polarity::HigherBetter, "iops"),
        (Polarity::Unknown, ""),
        (Polarity::LowerBetter, "ns"),
    ];
    for (i, (polarity, unit)) in expected.iter().enumerate() {
        assert_eq!(&ms[i].polarity, polarity);
        assert_eq!(ms[i].unit, *unit);
    }
}
#[test]
fn resolve_polarities_duplicate_hint_names_last_wins() {
    use crate::test_support::{Metric, MetricHint, MetricSource, MetricStream, Polarity};
    // Two hints share the name "iops"; the second must override the first.
    static PAYLOAD: crate::test_support::Payload = crate::test_support::Payload {
        name: "dup_hints",
        kind: crate::test_support::PayloadKind::Binary("x"),
        output: crate::test_support::OutputFormat::Json,
        default_args: &[],
        default_checks: &[],
        metrics: &[
            MetricHint {
                name: "iops",
                polarity: Polarity::HigherBetter,
                unit: "iops",
            },
            MetricHint {
                name: "iops",
                polarity: Polarity::LowerBetter,
                unit: "overridden",
            },
        ],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    let unresolved = Metric {
        name: "iops".into(),
        value: 1.0,
        polarity: Polarity::Unknown,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let mut ms = vec![unresolved];
    resolve_polarities(&mut ms, &PAYLOAD);
    let m = &ms[0];
    assert_eq!(m.polarity, Polarity::LowerBetter);
    assert_eq!(m.unit, "overridden");
}
#[test]
fn resolve_polarities_duplicate_metric_names_each_gets_hint() {
    use crate::test_support::{Metric, MetricHint, MetricSource, MetricStream, Polarity};
    static PAYLOAD: crate::test_support::Payload = crate::test_support::Payload {
        name: "dup_metrics",
        kind: crate::test_support::PayloadKind::Binary("x"),
        output: crate::test_support::OutputFormat::Json,
        default_args: &[],
        default_checks: &[],
        metrics: &[MetricHint {
            name: "iops",
            polarity: Polarity::HigherBetter,
            unit: "iops",
        }],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    // Two metrics share the name "iops" (values 1.0 and 2.0); the single hint
    // must apply to both, not just the first match.
    let mut ms: Vec<Metric> = (1..=2)
        .map(|v| Metric {
            name: "iops".into(),
            value: f64::from(v),
            polarity: Polarity::Unknown,
            unit: String::new(),
            source: MetricSource::Json,
            stream: MetricStream::Stdout,
        })
        .collect();
    resolve_polarities(&mut ms, &PAYLOAD);
    ms.iter().for_each(|m| {
        assert_eq!(m.polarity, Polarity::HigherBetter);
        assert_eq!(m.unit, "iops");
    });
}
#[test]
fn resolve_polarities_empty_inputs_are_noop() {
    use crate::test_support::{Metric, MetricSource, MetricStream, Polarity};
    static NO_HINTS: crate::test_support::Payload = crate::test_support::Payload {
        name: "no_hints",
        kind: crate::test_support::PayloadKind::Binary("x"),
        output: crate::test_support::OutputFormat::Json,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    // With no hints, an existing metric is left untouched.
    let anything = Metric {
        name: "anything".into(),
        value: 1.0,
        polarity: Polarity::Unknown,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };
    let mut single = vec![anything];
    resolve_polarities(&mut single, &NO_HINTS);
    assert_eq!(single[0].polarity, Polarity::Unknown);
    assert_eq!(single[0].unit, "");

    // With no metrics, resolution is a no-op.
    let mut none: Vec<Metric> = Vec::new();
    resolve_polarities(&mut none, &NO_HINTS);
    assert!(none.is_empty());
}
#[test]
fn resolve_polarities_owned_applies_hints() {
    use crate::test_support::{Metric, MetricSource, MetricStream, Polarity, WireMetricHint};
    let hint = |name: &str, polarity: Polarity, unit: &str| WireMetricHint {
        name: name.to_string(),
        polarity,
        unit: unit.to_string(),
    };
    let hints = vec![
        hint("iops", Polarity::HigherBetter, "iops"),
        hint("lat_ns", Polarity::LowerBetter, "ns"),
    ];
    // LLM-extracted metrics all start Unknown and unitless.
    let blank = |name: &str, value: f64| Metric {
        name: name.into(),
        value,
        polarity: Polarity::Unknown,
        unit: String::new(),
        source: MetricSource::LlmExtract,
        stream: MetricStream::Stdout,
    };
    let mut ms = vec![blank("iops", 1.0), blank("unhinted", 2.0), blank("lat_ns", 3.0)];
    resolve_polarities_owned(&mut ms, &hints);
    let expected = [
        (Polarity::HigherBetter, "iops"),
        (Polarity::Unknown, ""),
        (Polarity::LowerBetter, "ns"),
    ];
    for (i, (polarity, unit)) in expected.iter().enumerate() {
        assert_eq!(&ms[i].polarity, polarity);
        assert_eq!(ms[i].unit, *unit);
    }
}
#[test]
fn resolve_polarities_owned_duplicate_hint_names_last_wins() {
    use crate::test_support::{Metric, MetricSource, MetricStream, Polarity, WireMetricHint};
    let hint = |polarity: Polarity, unit: &str| WireMetricHint {
        name: "iops".to_string(),
        polarity,
        unit: unit.to_string(),
    };
    // Both hints target "iops"; the later one must win.
    let hints = vec![
        hint(Polarity::HigherBetter, "iops"),
        hint(Polarity::LowerBetter, "overridden"),
    ];
    let unresolved = Metric {
        name: "iops".into(),
        value: 1.0,
        polarity: Polarity::Unknown,
        unit: String::new(),
        source: MetricSource::LlmExtract,
        stream: MetricStream::Stdout,
    };
    let mut ms = vec![unresolved];
    resolve_polarities_owned(&mut ms, &hints);
    let m = &ms[0];
    assert_eq!(m.polarity, Polarity::LowerBetter);
    assert_eq!(m.unit, "overridden");
}
#[test]
fn resolve_polarities_owned_duplicate_metric_names_each_gets_hint() {
    use crate::test_support::{Metric, MetricSource, MetricStream, Polarity, WireMetricHint};
    let hints = vec![WireMetricHint {
        name: "iops".to_string(),
        polarity: Polarity::HigherBetter,
        unit: "iops".to_string(),
    }];
    // Two same-named metrics (values 1.0 and 2.0) must both pick up the hint.
    let mut ms: Vec<Metric> = (1..=2)
        .map(|v| Metric {
            name: "iops".into(),
            value: f64::from(v),
            polarity: Polarity::Unknown,
            unit: String::new(),
            source: MetricSource::LlmExtract,
            stream: MetricStream::Stdout,
        })
        .collect();
    resolve_polarities_owned(&mut ms, &hints);
    ms.iter().for_each(|m| {
        assert_eq!(m.polarity, Polarity::HigherBetter);
        assert_eq!(m.unit, "iops");
    });
}
#[test]
fn resolve_polarities_owned_empty_inputs_are_noop() {
    use crate::test_support::{Metric, MetricSource, MetricStream, Polarity};

    // Empty hint list: an existing metric stays Unknown and unitless.
    let no_hints: Vec<crate::test_support::WireMetricHint> = Vec::new();
    let anything = Metric {
        name: "anything".into(),
        value: 1.0,
        polarity: Polarity::Unknown,
        unit: String::new(),
        source: MetricSource::LlmExtract,
        stream: MetricStream::Stdout,
    };
    let mut single = vec![anything];
    resolve_polarities_owned(&mut single, &no_hints);
    assert_eq!(single[0].polarity, Polarity::Unknown);
    assert_eq!(single[0].unit, "");

    // Empty metric list: hints have nothing to act on.
    let hints = vec![crate::test_support::WireMetricHint {
        name: "iops".to_string(),
        polarity: Polarity::HigherBetter,
        unit: "iops".to_string(),
    }];
    let mut none: Vec<Metric> = Vec::new();
    resolve_polarities_owned(&mut none, &hints);
    assert!(none.is_empty());
}
#[test]
fn resolve_polarities_owned_matches_from_metric_hint_conversion() {
    use crate::test_support::{
        Metric, MetricHint, MetricSource, MetricStream, Polarity, WireMetricHint,
    };
    static STATIC_HINTS: &[MetricHint] = &[
        MetricHint {
            name: "iops",
            polarity: Polarity::HigherBetter,
            unit: "iops",
        },
        MetricHint {
            name: "lat_ns",
            polarity: Polarity::LowerBetter,
            unit: "ns",
        },
    ];
    // Hints converted from the static form via `From` must resolve exactly
    // as the static table describes.
    let converted: Vec<WireMetricHint> = STATIC_HINTS.iter().map(WireMetricHint::from).collect();
    let blank = |name: &str| Metric {
        name: name.into(),
        value: 1.0,
        polarity: Polarity::Unknown,
        unit: String::new(),
        source: MetricSource::LlmExtract,
        stream: MetricStream::Stdout,
    };
    let mut ms = vec![blank("iops"), blank("lat_ns")];
    resolve_polarities_owned(&mut ms, &converted);
    let (iops, lat) = (&ms[0], &ms[1]);
    assert_eq!(iops.polarity, Polarity::HigherBetter);
    assert_eq!(iops.unit, "iops");
    assert_eq!(lat.polarity, Polarity::LowerBetter);
    assert_eq!(lat.unit, "ns");
}
#[test]
fn emit_payload_metrics_no_panic_without_shm() {
    // In a plain test process (presumably no SHM region is set up), emitting
    // must not panic.
    let empty = PayloadMetrics {
        exit_code: 0,
        metrics: Vec::new(),
        payload_index: 0,
    };
    emit_payload_metrics(&empty);
}
#[test]
fn try_wait_after_terminal_returns_err() {
    let cgroups = CgroupManager::new("/nonexistent");
    let topo = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroups, &topo);
    let mut handle = PayloadRun::new(&ctx, &TRUE_BIN)
        .spawn()
        .expect("spawn /bin/true");
    // Poll (up to ~1s) until try_wait reports the exit; that first Some(..)
    // consumes the handle's result.
    let mut reaped = false;
    for _ in 0..100 {
        if handle.try_wait().expect("try_wait").is_some() {
            reaped = true;
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
    }
    // Without this check, a slow exit would show up below as a misleading
    // "must be Err" failure, since the handle would not be consumed yet.
    assert!(reaped, "/bin/true did not exit within ~1s of polling");
    let err = handle
        .try_wait()
        .expect_err("second try_wait on consumed handle must be Err");
    let msg = format!("{err:#}");
    assert!(
        msg.contains("already consumed") && msg.contains("true_bin"),
        "error must name the handle + misuse, got: {msg}"
    );
}
#[test]
fn wait_after_try_wait_consumed_returns_err() {
    let cgroups = CgroupManager::new("/nonexistent");
    let topo = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroups, &topo);
    let mut handle = PayloadRun::new(&ctx, &TRUE_BIN)
        .spawn()
        .expect("spawn /bin/true");
    // Poll (up to ~1s) until try_wait reports the exit, which consumes the result.
    let mut reaped = false;
    for _ in 0..100 {
        if handle.try_wait().expect("try_wait").is_some() {
            reaped = true;
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
    }
    // If the exit was never observed, `wait()` below would legitimately
    // succeed and the failure would misleadingly blame the consumed-handle
    // check. Fail here with the real cause instead.
    assert!(reaped, "/bin/true did not exit within ~1s of polling");
    let err = handle
        .wait()
        .expect_err("wait() on consumed handle must return Err");
    let msg = format!("{err:#}");
    assert!(
        msg.contains("already consumed") && msg.contains("true_bin"),
        "error must name the handle + misuse, got: {msg}"
    );
}
#[test]
fn kill_after_try_wait_consumed_returns_err() {
    let cgroups = CgroupManager::new("/nonexistent");
    let topo = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroups, &topo);
    let mut handle = PayloadRun::new(&ctx, &TRUE_BIN)
        .spawn()
        .expect("spawn /bin/true");
    // Consume the handle through try_wait first. Asserting that the poll
    // actually saw the exit keeps a slow host from calling `kill()` on a
    // still-live handle. That call would succeed and surface as a
    // misleading expect_err failure.
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
    let exited = loop {
        if handle.try_wait().expect("try_wait").is_some() {
            break true;
        }
        if std::time::Instant::now() >= deadline {
            break false;
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
    };
    assert!(
        exited,
        "/bin/true did not exit within 10 s of spawn; the consumed-handle \
         path was never reached",
    );
    let err = handle
        .kill()
        .expect_err("kill() on consumed handle must return Err");
    let msg = format!("{err:#}");
    assert!(
        msg.contains("already consumed") && msg.contains("true_bin"),
        "error must name the handle + misuse, got: {msg}"
    );
}
/// JSON-output payload fixture for the `evaluate` stream-selection tests.
/// It declares no default args, checks, metrics, or include files.
const JSON_PAYLOAD: Payload = Payload {
    name: "json_payload",
    kind: PayloadKind::Binary("json_payload"),
    output: OutputFormat::Json,
    uses_parent_pgrp: false,
    known_flags: None,
    metric_bounds: None,
    default_args: &[],
    default_checks: &[],
    metrics: &[],
    include_files: &[],
};
#[test]
fn evaluate_prefers_stdout_when_stdout_yields_metrics() {
    // stderr carries unrelated prose; once stdout parses, only stdout counts.
    let (_, pm) = evaluate(
        &JSON_PAYLOAD,
        &[],
        SpawnOutput {
            exit_code: 0,
            stdout: String::from(r#"{"iops": 500}"#),
            stderr: String::from("unrelated banner: open fd error (ignore)"),
        },
    );
    assert_eq!(pm.metrics.len(), 1, "stdout JSON must win");
    let only = &pm.metrics[0];
    assert_eq!(only.name, "iops");
    assert_eq!(only.value, 500.0);
}
#[test]
fn evaluate_falls_back_to_stderr_when_stdout_empty() {
    // Nothing on stdout: the JSON on stderr is the only source.
    let (_, pm) = evaluate(
        &JSON_PAYLOAD,
        &[],
        SpawnOutput {
            exit_code: 0,
            stdout: String::new(),
            stderr: String::from(r#"{"latency_ns": 1234}"#),
        },
    );
    assert_eq!(pm.metrics.len(), 1, "stderr fallback must fire");
    let only = &pm.metrics[0];
    assert_eq!(only.name, "latency_ns");
    assert_eq!(only.value, 1234.0);
}
#[test]
fn stdout_primary_tags_metrics_with_metric_stream_stdout() {
    use crate::test_support::MetricStream;
    // Every case puts a decoy `iops` of 9999999 on stderr. All stdout values
    // are far smaller, so the value check also proves which stream won.
    let cases: [(&str, &str); 2] = [
        ("single-key", r#"{"iops": 4242}"#),
        ("multi-key", r#"{"iops": 1000, "latency_us": 42, "runs": 3}"#),
    ];
    for (label, stdout_json) in cases {
        let spawn_out = SpawnOutput {
            stdout: stdout_json.to_string(),
            stderr: r#"{"iops": 9999999}"#.to_string(),
            exit_code: 0,
        };
        let (_, pm) = evaluate(&JSON_PAYLOAD, &[], spawn_out);
        assert!(
            !pm.metrics.is_empty(),
            "[{label}] stdout-primary must produce metrics",
        );
        pm.metrics.iter().for_each(|m| {
            assert_eq!(
                m.stream,
                MetricStream::Stdout,
                "[{label}] stdout-extracted metric `{name}` must \
                 carry MetricStream::Stdout; got stream={stream:?}. \
                 A regression that mis-tagged stdout-sourced \
                 metrics as Stderr (or merged the streams) would \
                 trip here — the stderr-fallback sibling test \
                 covers the inverse direction.",
                name = m.name,
                stream = m.stream,
            );
        });
        let Some(iops) = pm.metrics.iter().find(|m| m.name == "iops") else {
            panic!("iops metric must be present");
        };
        assert!(
            iops.value < 9_000_000.0,
            "[{label}] iops value {val} must come from stdout \
             (< 9M) not stderr (9999999); a value from stderr \
             would prove the test's stream tag is accidentally \
             correct because the merge went the wrong way",
            val = iops.value,
        );
    }
}
#[test]
fn stderr_fallback_tags_metrics_with_metric_stream_stderr() {
    use crate::test_support::MetricStream;
    // Three stdout shapes that yield no metrics, so each must fall back to
    // the single `iops` on stderr and tag it as stderr-sourced.
    let cases: [(&str, &str); 3] = [
        ("empty-stdout", ""),
        (
            "prose-stdout",
            "no json here, just prose from a banner line\n",
        ),
        (
            "valid-json-no-numeric-leaves-stdout",
            r#"{"status": "ok", "ready": true, "note": null}"#,
        ),
    ];
    for (label, stdout_text) in cases {
        let spawn_out = SpawnOutput {
            stdout: stdout_text.to_string(),
            stderr: r#"{"iops": 4242}"#.to_string(),
            exit_code: 0,
        };
        let (_, pm) = evaluate(&JSON_PAYLOAD, &[], spawn_out);
        assert_eq!(
            pm.metrics.len(),
            1,
            "[{label}] stderr fallback must produce exactly one metric",
        );
        let first = &pm.metrics[0];
        assert_eq!(
            first.stream,
            MetricStream::Stderr,
            "[{label}] fallback-extracted metric must carry MetricStream::Stderr \
             so downstream review tooling can distinguish stream origin; \
             got stream={:?}",
            first.stream,
        );
    }
}
#[test]
fn evaluate_falls_back_to_stderr_when_stdout_yields_no_metrics() {
    // Prose on stdout yields nothing, so stderr's JSON must be used.
    let spawn_out = SpawnOutput {
        exit_code: 0,
        stderr: r#"{"throughput": 42}"#.to_string(),
        stdout: "no json here, just prose from a banner line\n".to_string(),
    };
    let (_, pm) = evaluate(&JSON_PAYLOAD, &[], spawn_out);
    assert_eq!(
        pm.metrics.len(),
        1,
        "stderr fallback must fire on empty result"
    );
    let fallback = &pm.metrics[0];
    assert_eq!(fallback.name, "throughput");
    assert_eq!(fallback.value, 42.0);
}
#[test]
fn evaluate_falls_back_when_stdout_json_has_no_numeric_leaves() {
    // stdout is valid JSON but every leaf is a string/bool/null.
    let spawn_out = SpawnOutput {
        exit_code: 0,
        stderr: r#"{"iops": 9001}"#.to_string(),
        stdout: r#"{"status": "ok", "ready": true, "note": null}"#.to_string(),
    };
    let (_, pm) = evaluate(&JSON_PAYLOAD, &[], spawn_out);
    assert_eq!(
        pm.metrics.len(),
        1,
        "stderr fallback must fire when stdout parses but has \
         no numeric leaves; got metrics: {:?}",
        pm.metrics,
    );
    let only = &pm.metrics[0];
    assert_eq!(only.name, "iops");
    assert_eq!(only.value, 9001.0);
    let leaked = pm
        .metrics
        .iter()
        .find(|m| matches!(m.name.as_str(), "status" | "ready" | "note"));
    if let Some(m) = leaked {
        panic!("non-numeric stdout leaf {:?} leaked into metrics", m.name);
    }
}
#[test]
fn evaluate_returns_empty_when_both_streams_have_no_numeric_leaves() {
    // Both streams parse as JSON, but neither carries a number.
    let (_, pm) = evaluate(
        &JSON_PAYLOAD,
        &[],
        SpawnOutput {
            exit_code: 0,
            stdout: String::from(r#"{"phase": "warmup"}"#),
            stderr: String::from(r#"{"phase": "shutdown"}"#),
        },
    );
    assert!(
        pm.metrics.is_empty(),
        "both-streams-non-numeric must produce no metrics; \
         got: {:?}",
        pm.metrics,
    );
}
#[test]
fn evaluate_returns_empty_metrics_on_empty_stdout_and_stderr() {
    // Silent, successful run: no metrics, exit code passed through.
    let silent = SpawnOutput {
        exit_code: 0,
        stdout: String::new(),
        stderr: String::new(),
    };
    let (_, pm) = evaluate(&JSON_PAYLOAD, &[], silent);
    assert!(pm.metrics.is_empty(), "both-empty must produce no metrics");
    assert_eq!(pm.exit_code, 0);
}
#[cfg(unix)]
#[test]
fn kill_reaps_fork_descendants_via_process_group() {
    let cgroups = CgroupManager::new("/nonexistent");
    let topo = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroups, &topo);
    // `sh` backgrounds one sleeper and execs into a second, so the payload's
    // process group holds two long-lived members that kill() must reach.
    const MULTI_SLEEPER: Payload = Payload {
        name: "multi_sleeper",
        kind: PayloadKind::Binary("/bin/sh"),
        output: crate::test_support::OutputFormat::ExitCode,
        default_args: &["-c", "sleep 60 & exec sleep 60"],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    let handle = PayloadRun::new(&ctx, &MULTI_SLEEPER)
        .spawn()
        .expect("spawn multi-sleeper");
    // The test treats the child's pid as its pgid; presumably the spawn path
    // makes non-opt-out payloads their own group leader (TODO confirm).
    let pgid = libc::pid_t::try_from(handle.pid().expect("child still present"))
        .expect("child pid fits in pid_t");
    let (_, _) = handle.kill().expect("kill+reap");
    // Start the 30 s probe window only after kill() returns, as the
    // drop-path test does, so a slow kill+reap cannot shrink it.
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
    loop {
        // SAFETY: signal 0 performs only the existence/permission check;
        // nothing is delivered to the group.
        let rc = unsafe { libc::killpg(pgid, 0) };
        if rc != 0 {
            let err = std::io::Error::last_os_error();
            assert_eq!(
                err.raw_os_error(),
                Some(libc::ESRCH),
                "unexpected errno from killpg probe: {err}",
            );
            break;
        }
        if std::time::Instant::now() >= deadline {
            panic!("process group {pgid} still alive after kill+reap");
        }
        std::thread::sleep(std::time::Duration::from_millis(20));
    }
}
#[cfg(unix)]
#[test]
fn drop_kills_fork_descendants_via_process_group() {
    let cgroups = CgroupManager::new("/nonexistent");
    let topo = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroups, &topo);
    // Same two-sleeper shape as the kill() test, but torn down via Drop.
    const MULTI_SLEEPER: Payload = Payload {
        name: "multi_sleeper_drop",
        kind: PayloadKind::Binary("/bin/sh"),
        output: crate::test_support::OutputFormat::ExitCode,
        default_args: &["-c", "sleep 60 & exec sleep 60"],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
        default_checks: &[],
        metrics: &[],
        include_files: &[],
    };
    let handle = PayloadRun::new(&ctx, &MULTI_SLEEPER)
        .spawn()
        .expect("spawn multi-sleeper");
    let pgid = libc::pid_t::try_from(handle.pid().expect("child still present"))
        .expect("child pid fits in pid_t");
    drop(handle);
    // Poll with a signal-0 probe until the group is gone (ESRCH) or 30 s pass.
    let probe_deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
    // SAFETY: killpg with signal 0 only checks existence/permission.
    while unsafe { libc::killpg(pgid, 0) } == 0 {
        if std::time::Instant::now() >= probe_deadline {
            panic!(
                "process group {pgid} still alive 30 s after \
                 PayloadHandle drop — Drop-path killpg sweep \
                 failed to reach every member",
            );
        }
        std::thread::sleep(std::time::Duration::from_millis(20));
    }
    let err = std::io::Error::last_os_error();
    assert_eq!(
        err.raw_os_error(),
        Some(libc::ESRCH),
        "unexpected errno from killpg probe after drop: {err}",
    );
}
#[cfg(unix)]
#[test]
fn payload_uses_parent_pgrp_opts_out_of_process_group() {
    let cgroups = CgroupManager::new("/nonexistent");
    let topo = TestTopology::synthetic(4, 1);
    let ctx = make_ctx(&cgroups, &topo);
    // A single sleeper that opts out of its own process group.
    const PARENT_PGRP_SLEEPER: Payload = Payload {
        name: "parent_pgrp_sleeper",
        kind: PayloadKind::Binary("/bin/sleep"),
        output: crate::test_support::OutputFormat::ExitCode,
        default_args: &["60"],
        uses_parent_pgrp: true,
        known_flags: None,
        metric_bounds: None,
        default_checks: &[],
        metrics: &[],
        include_files: &[],
    };
    let handle = PayloadRun::new(&ctx, &PARENT_PGRP_SLEEPER)
        .spawn()
        .expect("spawn opt-out sleeper");
    let child_pid = libc::pid_t::try_from(handle.pid().expect("child alive"))
        .expect("child pid fits in pid_t");
    // SAFETY: getpgid only reads process-group ids; 0 means "this process".
    let (parent_pgid, child_pgid) =
        unsafe { (libc::getpgid(0), libc::getpgid(child_pid)) };
    assert!(child_pgid > 0, "getpgid(child) failed: {child_pgid}");
    assert_eq!(
        child_pgid, parent_pgid,
        "uses_parent_pgrp=true payload must inherit the \
         parent's pgid (child_pgid={child_pgid}, \
         parent_pgid={parent_pgid}); a mismatch means \
         `build_command` still called `process_group(0)` \
         despite the opt-out",
    );
    let _ = handle.kill().expect("kill opt-out sleeper");
}
#[cfg(unix)]
#[test]
fn wait_with_deadline_timeout_kills_process_group() {
    use std::os::unix::process::CommandExt;
    use std::process::{Command, Stdio};
    use std::time::{Duration, Instant};
    // `process_group(0)` makes the shell the leader of a fresh group, so its
    // pid is also the pgid that the timeout branch must sweep.
    let mut child = Command::new("/bin/sh")
        .args(["-c", "sleep 60 & exec sleep 60"])
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .process_group(0)
        .spawn()
        .expect("spawn multi-sleeper");
    let pgid = libc::pid_t::try_from(child.id()).expect("child pid fits in pid_t");
    let started = Instant::now();
    let out = wait_with_deadline(
        &mut child,
        Duration::from_millis(500),
        "multi_sleeper_timeout",
        false,
    )
    .expect("wait_with_deadline returns Ok on timeout");
    let elapsed = started.elapsed();
    assert!(
        elapsed >= Duration::from_millis(400),
        "wait_with_deadline returned after only {elapsed:?}; \
         deadline was 500 ms — check the epoll loop is honoring \
         the timeout rather than unblocking on an unrelated event",
    );
    assert_eq!(out.exit_code, -1);
    // Poll with a signal-0 probe until the group is gone (ESRCH) or 30 s pass.
    let probe_deadline = Instant::now() + Duration::from_secs(30);
    // SAFETY: killpg with signal 0 only checks existence/permission.
    while unsafe { libc::killpg(pgid, 0) } == 0 {
        if Instant::now() >= probe_deadline {
            panic!(
                "process group {pgid} still alive 30 s after \
                 wait_with_deadline timeout fired — killpg sweep \
                 in the timeout branch failed to reach every \
                 member",
            );
        }
        std::thread::sleep(Duration::from_millis(20));
    }
    let err = std::io::Error::last_os_error();
    assert_eq!(
        err.raw_os_error(),
        Some(libc::ESRCH),
        "unexpected errno from killpg probe after \
         timeout: {err}",
    );
}
#[test]
fn spawn_error_context_enoent_attaches_remediation() {
    // ENOENT is the "binary not installed" case and gets remediation hints.
    let enoent = std::io::Error::from_raw_os_error(libc::ENOENT);
    assert_eq!(enoent.kind(), std::io::ErrorKind::NotFound);
    let rendered = format!("{:#}", super::spawn_error_context(enoent, "fio"));
    assert!(rendered.contains("spawn 'fio'"), "got: {rendered}");
    assert!(
        rendered.contains("not found on guest PATH"),
        "ENOENT branch must name the PATH miss: {rendered}"
    );
    let names_include_flag =
        rendered.contains("-i fio") || rendered.contains("--include-files fio");
    assert!(
        names_include_flag,
        "ENOENT branch must name the `-i <binary>` remediation: {rendered}"
    );
    assert!(
        rendered.contains("#[ktstr_test]"),
        "ENOENT branch must name the ktstr_test pre-install remediation: {rendered}"
    );
}
#[test]
fn spawn_error_context_non_enoent_keeps_minimal_context() {
    // EACCES is a different failure; none of the ENOENT hints may appear.
    let eacces = std::io::Error::from_raw_os_error(libc::EACCES);
    assert_ne!(eacces.kind(), std::io::ErrorKind::NotFound);
    let rendered = format!("{:#}", super::spawn_error_context(eacces, "fio"));
    assert!(rendered.contains("spawn 'fio'"), "got: {rendered}");
    let forbidden = [
        ("-i fio", "non-ENOENT must not leak the `-i` remediation"),
        (
            "--include-files",
            "non-ENOENT must not leak the --include-files remediation",
        ),
        (
            "#[ktstr_test]",
            "non-ENOENT must not leak the ktstr_test remediation",
        ),
        (
            "not found on guest PATH",
            "non-ENOENT must not claim 'not found on PATH'",
        ),
    ];
    for (needle, why) in forbidden {
        assert!(!rendered.contains(needle), "{why}: {rendered}");
    }
}
#[test]
fn build_command_without_cgroup_returns_no_sync_handles() {
    // Without a cgroup path there is nothing to hand off, so no handles.
    let (_cmd, sync) = super::build_command("/bin/true", &[], None, false).unwrap();
    assert!(
        sync.is_none(),
        "no cgroup_path ⇒ no sync handles — got Some(_)",
    );
}
#[test]
fn build_command_with_cgroup_returns_sync_handles() {
    // The path does not exist: build_command must not validate it itself.
    let fake_cg = std::path::PathBuf::from("/nonexistent/fake-cgroup");
    let (_cmd, maybe_handles) = super::build_command("/bin/true", &[], Some(&fake_cg), false)
        .expect("build_command must defer cgroup-path validation to sync");
    let Some(handles) = maybe_handles else {
        panic!("cgroup path ⇒ handles");
    };
    let expected_procs = fake_cg.join("cgroup.procs");
    assert_eq!(
        handles.cgroup_procs_path, expected_procs,
        "handles must carry <cg>/cgroup.procs verbatim",
    );
    assert!(handles.notify.r_fd() >= 0);
    assert!(handles.notify.w_fd() >= 0);
    assert!(handles.release.r_fd() >= 0);
    assert!(handles.release.w_fd() >= 0);
}
#[test]
fn pipe_pair_allocates_fresh_pipe_on_each_call() {
    use std::io::{Read, Write};
    let a = super::PipePair::new().unwrap();
    let b = super::PipePair::new().unwrap();
    // Two live pairs must not share any descriptor.
    assert_ne!(a.r_fd(), b.r_fd());
    assert_ne!(a.w_fd(), b.w_fd());
    // Round-trip one byte through `a` using dup'd descriptors. `try_clone`
    // yields independently owned fds, so dropping these `File`s closes only
    // the duplicates. If an `unwrap` below panics, unwinding cannot
    // double-close `a`'s own fds. With parallel test threads, a double
    // close could hit an unrelated, reused descriptor.
    let mut writer = std::fs::File::from(a.write_fd.try_clone().expect("dup pipe write end"));
    writer.write_all(&[42u8]).unwrap();
    let mut reader = std::fs::File::from(a.read_fd.try_clone().expect("dup pipe read end"));
    let mut buf = [0u8; 1];
    reader.read_exact(&mut buf).unwrap();
    assert_eq!(buf[0], 42);
    drop(b.read_fd);
    drop(b.write_fd);
}
#[test]
fn spawn_with_cgroup_sync_writes_pid_and_releases_child() {
    use std::io::Read;
    // A regular file stands in for cgroup.procs so the write can be inspected.
    let tmp_dir = std::env::temp_dir().join(format!(
        "ktstr-cgroup-sync-test-{}",
        std::process::id()
    ));
    std::fs::create_dir_all(&tmp_dir).unwrap();
    let procs_path = tmp_dir.join("cgroup.procs");
    std::fs::write(&procs_path, b"").unwrap();
    // Split both pipes. The simulated child thread takes ownership of its
    // ends (notify write, release read) by moving the `OwnedFd`s in. That
    // replaces raw-fd aliasing plus `mem::forget` with plain moves.
    let super::PipePair {
        read_fd: notify_r,
        write_fd: notify_w,
    } = super::PipePair::new().unwrap();
    let super::PipePair {
        read_fd: release_r,
        write_fd: release_w,
    } = super::PipePair::new().unwrap();
    let child_pid: libc::pid_t = 99999;
    let child_thread = std::thread::spawn(move || {
        use std::io::Write;
        let mut w = std::fs::File::from(notify_w);
        w.write_all(&child_pid.to_le_bytes()).unwrap();
        drop(w);
        let mut r = std::fs::File::from(release_r);
        let mut buf = [0u8; 1];
        r.read_exact(&mut buf).unwrap();
        assert_eq!(buf[0], 1, "release byte must be 1");
        drop(r);
    });
    // The child-side slots in the handles get /dev/null placeholders. They
    // are opened through std so an open failure is reported, rather than
    // wrapping an unchecked -1 in an `OwnedFd`.
    let handles = super::CgroupSyncHandles {
        notify: super::PipePair {
            read_fd: notify_r,
            write_fd: std::fs::OpenOptions::new()
                .write(true)
                .open("/dev/null")
                .expect("open /dev/null for writing")
                .into(),
        },
        release: super::PipePair {
            read_fd: std::fs::File::open("/dev/null")
                .expect("open /dev/null for reading")
                .into(),
            write_fd: release_w,
        },
        cgroup_procs_path: procs_path.clone(),
    };
    let returned_pid = super::spawn_with_cgroup_sync(handles).unwrap();
    assert_eq!(
        returned_pid, child_pid,
        "spawn_with_cgroup_sync must return the pid it read \
         from the notify pipe",
    );
    child_thread
        .join()
        .expect("child thread completes after release");
    let written = std::fs::read_to_string(&procs_path).unwrap();
    assert_eq!(
        written,
        format!("{child_pid}\n"),
        "spawn_with_cgroup_sync must write <pid>\\n to cgroup.procs; \
         got {written:?}",
    );
    let _ = std::fs::remove_file(&procs_path);
    let _ = std::fs::remove_dir(&tmp_dir);
}
#[test]
fn spawn_with_cgroup_sync_errors_on_missing_cgroup_procs_path() {
    let missing_path =
        std::path::PathBuf::from("/nonexistent/dir/that/does/not/exist/cgroup.procs");
    // Split both pipes. The simulated child thread takes ownership of its
    // ends (notify write, release read) by moving the `OwnedFd`s in. That
    // replaces raw-fd aliasing plus `mem::forget` with plain moves.
    let super::PipePair {
        read_fd: notify_r,
        write_fd: notify_w,
    } = super::PipePair::new().unwrap();
    let super::PipePair {
        read_fd: release_r,
        write_fd: release_w,
    } = super::PipePair::new().unwrap();
    let child_pid: libc::pid_t = 12345;
    // The child reports its pid, then expects EOF on the release pipe once
    // the parent abandons the handshake and drops the write end.
    let child_thread = std::thread::spawn(move || -> std::io::Error {
        use std::io::{Read, Write};
        let mut w = std::fs::File::from(notify_w);
        let _ = w.write_all(&child_pid.to_le_bytes());
        drop(w);
        let mut r = std::fs::File::from(release_r);
        let mut buf = [0u8; 1];
        let err = r.read_exact(&mut buf).unwrap_err();
        drop(r);
        err
    });
    // The child-side slots in the handles get /dev/null placeholders. They
    // are opened through std so an open failure is reported, rather than
    // wrapping an unchecked -1 in an `OwnedFd`.
    let handles = super::CgroupSyncHandles {
        notify: super::PipePair {
            read_fd: notify_r,
            write_fd: std::fs::OpenOptions::new()
                .write(true)
                .open("/dev/null")
                .expect("open /dev/null for writing")
                .into(),
        },
        release: super::PipePair {
            read_fd: std::fs::File::open("/dev/null")
                .expect("open /dev/null for reading")
                .into(),
            write_fd: release_w,
        },
        cgroup_procs_path: missing_path.clone(),
    };
    let err = super::spawn_with_cgroup_sync(handles).unwrap_err();
    let rendered = format!("{err:#}");
    assert!(
        rendered.contains("open cgroup.procs"),
        "error must name the open step: {rendered}",
    );
    assert!(
        rendered.contains("/nonexistent/dir/that/does/not/exist"),
        "error must name the failing path: {rendered}",
    );
    let child_err = child_thread.join().expect("child thread returns");
    assert_eq!(
        child_err.kind(),
        std::io::ErrorKind::UnexpectedEof,
        "child's release read must hit EOF when parent abandons sync; got {child_err}",
    );
}
#[test]
fn drive_cgroup_handshake_does_not_deadlock_on_failing_cgroup_write() {
    use std::sync::mpsc;
    let missing_cgroup =
        std::path::PathBuf::from("/nonexistent/ktstr-cgroup-sync-deadlock-guard");
    // The handshake runs on a worker thread, so a deadlock regression shows
    // up as a bounded recv_timeout failure instead of hanging the test binary.
    let (tx, rx) = mpsc::channel::<anyhow::Result<()>>();
    let worker = std::thread::spawn(move || {
        let (cmd, handles) =
            super::build_command("/bin/true", &[], Some(&missing_cgroup), false)
                .expect("build_command");
        let handles = handles.expect("handles present when cgroup_path is Some");
        let result = super::drive_cgroup_handshake(cmd, handles, "/bin/true");
        let err = result.expect_err("handshake against nonexistent cgroup.procs must Err");
        let rendered = format!("{err:#}");
        // Matching "cgroup.procs" also covers the "open cgroup.procs" wording.
        assert!(
            rendered.contains("cgroup.procs"),
            "handshake error must name the failing step: {rendered}",
        );
        let _ = tx.send(Ok(()));
    });
    let deadline = std::time::Duration::from_secs(10);
    match rx.recv_timeout(deadline) {
        Ok(Ok(())) => {
            worker
                .join()
                .expect("worker thread completes without panic");
        }
        Ok(Err(e)) => panic!("worker thread reported error: {e:#}"),
        Err(mpsc::RecvTimeoutError::Timeout) => panic!(
            "drive_cgroup_handshake did not return within \
             {deadline:?} — cross-fork inherited-fd deadlock \
             has regressed. The child's pre_exec is almost \
             certainly blocking on `read(release_read_fd)` \
             because it still holds its own inherited copy of \
             `release_write_fd` open; Step 0 of \
             `cgroup_sync_pre_exec` must `close()` both \
             `notify_read_fd` and `release_write_fd` BEFORE \
             the release-read block, otherwise the kernel \
             never delivers EOF when the parent drops its \
             write end.",
        ),
        // The worker drops `tx` without sending only when it panics (a
        // failed expect/assert). Re-raise that panic so the real failure
        // reaches the test report instead of a generic message.
        Err(mpsc::RecvTimeoutError::Disconnected) => match worker.join() {
            Err(payload) => std::panic::resume_unwind(payload),
            Ok(()) => panic!("worker thread disconnected without reporting"),
        },
    }
}
/// `LlmExtract` fixture with no hint, used by the deferral-arm tests of
/// `evaluate`. It declares no default args, checks, metrics, or include files.
const LLM_EXTRACT_PAYLOAD: Payload = Payload {
    name: "llm_payload",
    kind: PayloadKind::Binary("llm_payload"),
    output: OutputFormat::LlmExtract(None),
    uses_parent_pgrp: false,
    known_flags: None,
    metric_bounds: None,
    default_args: &[],
    default_checks: &[],
    metrics: &[],
    include_files: &[],
};
/// `LlmExtract` fixture carrying a hint string, used to check that a hint
/// does not change the guest-side deferral behavior of `evaluate`.
const LLM_EXTRACT_HINT_PAYLOAD: Payload = Payload {
    name: "llm_hint_payload",
    kind: PayloadKind::Binary("llm_hint_payload"),
    output: OutputFormat::LlmExtract(Some("focus on iops")),
    uses_parent_pgrp: false,
    known_flags: None,
    metric_bounds: None,
    default_args: &[],
    default_checks: &[],
    metrics: &[],
    include_files: &[],
};
#[test]
fn evaluate_llm_extract_does_not_extract_from_stdout() {
    // stdout carries perfectly extractable JSON; the deferral arm must ignore it.
    let spawn_out = SpawnOutput {
        exit_code: 0,
        stderr: String::new(),
        stdout: String::from(r#"{"iops": 4242, "latency": 10}"#),
    };
    let (assert_result, pm) = evaluate(&LLM_EXTRACT_PAYLOAD, &[], spawn_out);
    assert!(
        pm.metrics.is_empty(),
        "guest evaluate() on an LlmExtract payload must NOT call extract_metrics; \
         metrics MUST be empty even when stdout carries extractable JSON. \
         Got metrics: {:?}",
        pm.metrics,
    );
    assert_eq!(
        pm.exit_code, 0,
        "exit_code must still propagate through the deferral arm",
    );
    assert!(
        assert_result.passed,
        "no checks declared and no exit-code mismatch — verdict must pass; \
         got {assert_result:?}",
    );
}
#[test]
fn evaluate_llm_extract_does_not_extract_from_stderr_either() {
    // JSON only on stderr; the deferral arm must not fall back to it.
    let (_, pm) = evaluate(
        &LLM_EXTRACT_PAYLOAD,
        &[],
        SpawnOutput {
            exit_code: 0,
            stdout: String::new(),
            stderr: String::from(r#"{"latency": 42, "rps": 999}"#),
        },
    );
    assert!(
        pm.metrics.is_empty(),
        "LlmExtract deferral arm must not run extract_metrics on stderr either; \
         both streams ride raw to the host. Got metrics: {:?}",
        pm.metrics,
    );
}
#[test]
fn evaluate_llm_extract_propagates_exit_code() {
    // Success, generic failure, an arbitrary code, and a SIGKILL-style 137.
    let codes = [0, 1, 42, 137];
    for code in codes {
        let silent_run = SpawnOutput {
            exit_code: code,
            stdout: String::new(),
            stderr: String::new(),
        };
        let (_, pm) = evaluate(&LLM_EXTRACT_PAYLOAD, &[], silent_run);
        assert_eq!(
            pm.exit_code, code,
            "deferral arm must propagate exit_code unchanged; expected {code}",
        );
        assert!(
            pm.metrics.is_empty(),
            "deferral arm must keep metrics empty even on non-zero exit",
        );
    }
}
#[test]
fn evaluate_llm_extract_with_hint_returns_empty_metrics() {
    // Both streams carry JSON; a hint must not re-enable guest extraction.
    let (_, pm) = evaluate(
        &LLM_EXTRACT_HINT_PAYLOAD,
        &[],
        SpawnOutput {
            exit_code: 0,
            stdout: String::from(r#"{"iops": 100, "latency": 5}"#),
            stderr: String::from(r#"{"alt": 99}"#),
        },
    );
    assert!(
        pm.metrics.is_empty(),
        "LlmExtract(Some(hint)) must skip extraction same as LlmExtract(None); \
         got metrics: {:?}",
        pm.metrics,
    );
}
#[test]
fn evaluate_llm_extract_honors_exit_code_eq_check() {
    // Runs the deferral arm with an empty stdout and one `exit_code_eq(0)` check.
    let run = |exit_code, stderr: &str| {
        evaluate(
            &LLM_EXTRACT_PAYLOAD,
            &[MetricCheck::exit_code_eq(0)],
            SpawnOutput {
                stdout: String::new(),
                stderr: stderr.to_string(),
                exit_code,
            },
        )
    };
    // Matching exit code: the check passes and no metrics are produced.
    let (assert_result, pm) = run(0, "");
    assert!(
        assert_result.passed,
        "matching ExitCodeEq must pass on LlmExtract deferral arm; got {assert_result:?}",
    );
    assert!(pm.metrics.is_empty());
    // Mismatching exit code: the verdict fails and carries detail.
    let (assert_result, _) = run(1, "stderr lives in the failure detail");
    assert!(
        !assert_result.passed,
        "mismatching ExitCodeEq must produce a failing AssertResult on the deferral arm",
    );
    assert!(
        !assert_result.details.is_empty(),
        "exit-code mismatch must surface at least one AssertDetail; got: {assert_result:?}",
    );
}
#[test]
#[cfg(panic = "unwind")]
fn evaluate_llm_extract_panics_on_metric_level_runtime_check() {
    let quiet_exit = SpawnOutput {
        exit_code: 0,
        stdout: String::new(),
        stderr: String::new(),
    };
    // A metric-level check on an LlmExtract payload is expected to panic
    // inside `evaluate`; catch it so the message can be inspected.
    let caught = std::panic::catch_unwind(move || {
        evaluate(
            &LLM_EXTRACT_PAYLOAD,
            &[MetricCheck::min("iops", 1.0)],
            quiet_exit,
        )
    });
    let payload = caught.expect_err("metric-level check on LlmExtract must panic");
    // Panic payloads are `&'static str` for literal messages and `String`
    // for formatted ones; anything else yields an empty message.
    let msg = payload
        .downcast_ref::<&'static str>()
        .map(|s| (*s).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_default();
    assert!(
        msg.contains("metric-level .check()") || msg.contains("LlmExtract"),
        "panic message must surface the developer-error guidance; got: {msg}",
    );
}
#[test]
fn sigchld_scope_lifo_drop_restores_initial_disposition() {
    /// Reads the live SIGCHLD disposition without modifying it.
    ///
    /// `sigaction` with a null `act` pointer is a pure query. A
    /// `signal(SIG_DFL)` + `signal(old)` probe would instead briefly install
    /// SIG_DFL, racing any concurrently running test that touches the
    /// process-wide SIGCHLD disposition. Its restore would also re-install the
    /// handler with `signal()`'s own flags rather than the original
    /// `sa_flags`/`sa_mask`.
    fn read_sigchld() -> libc::sighandler_t {
        // SAFETY: an all-zero `sigaction` is a valid out-parameter; the null
        // `act` pointer makes the call read-only.
        let mut current: libc::sigaction = unsafe { std::mem::zeroed() };
        let rc = unsafe { libc::sigaction(libc::SIGCHLD, std::ptr::null(), &mut current) };
        assert_eq!(
            rc,
            0,
            "sigaction(SIGCHLD) query failed: {}",
            std::io::Error::last_os_error()
        );
        current.sa_sigaction
    }
    let initial = read_sigchld();
    let outer = SigchldScope::new();
    let inner = SigchldScope::new();
    drop(inner);
    assert_eq!(
        read_sigchld(),
        libc::SIG_DFL,
        "after inner drop, live disposition must be SIG_DFL — \
         inner.prev was outer's SIG_DFL install, not initial",
    );
    drop(outer);
    assert_eq!(
        read_sigchld(),
        initial,
        "after outer drop, live disposition must equal initial \
         ({initial:#x}); a non-LIFO drop would leave SIG_DFL \
         ({:#x}) leaking into the process",
        libc::SIG_DFL,
    );
}
#[test]
fn payload_handle_drop_restores_sigchld_disposition() {
    /// Reads the live SIGCHLD disposition without modifying it.
    ///
    /// `sigaction` with a null `act` pointer is a pure query. A
    /// `signal(SIG_DFL)` + `signal(old)` probe would instead briefly install
    /// SIG_DFL, racing any concurrently running test that touches the
    /// process-wide SIGCHLD disposition. Its restore would also re-install the
    /// handler with `signal()`'s own flags rather than the original
    /// `sa_flags`/`sa_mask`.
    fn read_sigchld() -> libc::sighandler_t {
        // SAFETY: an all-zero `sigaction` is a valid out-parameter; the null
        // `act` pointer makes the call read-only.
        let mut current: libc::sigaction = unsafe { std::mem::zeroed() };
        let rc = unsafe { libc::sigaction(libc::SIGCHLD, std::ptr::null(), &mut current) };
        assert_eq!(
            rc,
            0,
            "sigaction(SIGCHLD) query failed: {}",
            std::io::Error::last_os_error()
        );
        current.sa_sigaction
    }
    let initial = read_sigchld();
    // Build a handle with no child so dropping it exercises only the
    // `_sigchld` field's restore.
    let handle = PayloadHandle {
        child: None,
        payload: &TRUE_BIN,
        checks: Vec::new(),
        _sigchld: SigchldScope::new(),
    };
    assert_eq!(
        read_sigchld(),
        libc::SIG_DFL,
        "SigchldScope::new should have installed SIG_DFL while \
         the handle is alive",
    );
    drop(handle);
    assert_eq!(
        read_sigchld(),
        initial,
        "after dropping a real PayloadHandle, live SIGCHLD \
         disposition must equal initial ({initial:#x}); if the \
         handle's `_sigchld` field was removed, renamed, or \
         retyped, the disposition stays at SIG_DFL ({:#x})",
        libc::SIG_DFL,
    );
}
}