use anyhow::{Result, anyhow};
use ktstr::assert::{AssertDetail, AssertResult, DetailKind};
use ktstr::ktstr_test;
use ktstr::scenario::Ctx;
use ktstr::scenario::payload_run::PayloadHandle;
use ktstr::test_support::{MetricCheck, OutputFormat, Payload, PayloadKind, PayloadMetrics};
use ktstr::worker_ready_wait::wait_for_worker_ready;
/// Exports the locations of the two helper binaries used by the tests in this
/// file through process environment variables.
///
/// Each path is baked in at compile time with `env!` from Cargo's
/// `CARGO_BIN_EXE_<name>` variable and published under a `KTSTR_JEMALLOC_*`
/// name. The function is registered through ktstr's re-exported `ctor`
/// attribute rather than being called explicitly.
#[::ktstr::__private::ctor::ctor(crate_path = ::ktstr::__private::ctor)]
fn set_probe_binary_env_var() {
    // (environment variable name, compile-time path of the binary)
    let exports = [
        (
            "KTSTR_JEMALLOC_PROBE_BINARY",
            env!("CARGO_BIN_EXE_ktstr-jemalloc-probe"),
        ),
        (
            "KTSTR_JEMALLOC_ALLOC_WORKER_BINARY",
            env!("CARGO_BIN_EXE_ktstr-jemalloc-alloc-worker"),
        ),
    ];
    for (name, path) in exports {
        // SAFETY: NOTE(review): presumably sound because a ctor runs during
        // process start-up, before any other thread can be reading or writing
        // the environment — confirm no other ctor spawns threads first.
        unsafe {
            std::env::set_var(name, path);
        }
    }
}
/// Payload definition for the `ktstr-jemalloc-probe` binary with JSON output
/// and an exit-code check.
///
/// Used by the tests that expect the probe to succeed: the run carries a
/// `MetricCheck::ExitCodeEq(0)` check. The remaining positional arguments are
/// left empty / `false` / `None`; their meaning is defined by `Payload::new`,
/// which is not visible in this file.
static JEMALLOC_PROBE: Payload = Payload::new(
"jemalloc_probe",
PayloadKind::Binary("ktstr-jemalloc-probe"),
OutputFormat::Json,
&[],
// The only difference from `JEMALLOC_PROBE_NO_EXIT_CHECK`: require exit code 0.
&[MetricCheck::ExitCodeEq(0)],
&[],
&[],
false,
None,
None,
);
/// Payload definition for the same `ktstr-jemalloc-probe` binary as
/// `JEMALLOC_PROBE`, but with no `MetricCheck` attached.
///
/// Used by the tests that inspect `exit_code` themselves (the nonexistent-pid
/// test expects exit code 1; the thread-churn test classifies negative and
/// non-zero codes separately), so no built-in exit-code check is declared.
static JEMALLOC_PROBE_NO_EXIT_CHECK: Payload = Payload::new(
"jemalloc_probe_no_exit_check",
PayloadKind::Binary("ktstr-jemalloc-probe"),
OutputFormat::Json,
&[],
// Deliberately empty: callers examine the exit code by hand.
&[],
&[],
&[],
false,
None,
None,
);
/// Payload definition for the `ktstr-jemalloc-alloc-worker` binary, the target
/// process the probe is pointed at.
///
/// Declared with `OutputFormat::ExitCode` and no default arguments or checks;
/// tests append the byte count to allocate as a command-line argument when
/// spawning it.
static JEMALLOC_ALLOC_WORKER: Payload = Payload::new(
"jemalloc_alloc_worker",
PayloadKind::Binary("ktstr-jemalloc-alloc-worker"),
OutputFormat::ExitCode,
&[],
&[],
&[],
&[],
false,
None,
None,
);
/// Payload definition for the `ktstr-jemalloc-alloc-worker` binary with
/// `--churn` pre-supplied.
///
/// Identical to `JEMALLOC_ALLOC_WORKER` apart from the payload name and the
/// `--churn` entry in the fourth argument. Used by the thread-churn test.
static JEMALLOC_ALLOC_WORKER_CHURN: Payload = Payload::new(
"jemalloc_alloc_worker_churn",
PayloadKind::Binary("ktstr-jemalloc-alloc-worker"),
OutputFormat::ExitCode,
// NOTE(review): presumably makes the worker continuously create and retire
// threads — confirm against the worker binary's argument handling.
&["--churn"],
&[],
&[],
&[],
false,
None,
None,
);
use ktstr::test_support::{
MAX_SCAN_INDEX, ThreadLookup, find_metric_u64, flat_metrics_dump, has_metric, lookup_thread,
snapshot_count, snapshot_worker_allocated, thread_count,
};
/// Verifies that the probe, attached to a separately spawned worker process,
/// reports the allocation the worker was asked to make.
///
/// Steps:
/// 1. Spawn the alloc worker with `KNOWN_BYTES` as its argument and wait up to
///    five seconds for it to signal readiness.
/// 2. Run the probe with `--pid <worker> --json`, then kill the worker before
///    looking at the probe's outcome.
/// 3. Require at least one thread entry, locate the entry whose tid equals the
///    worker pid, and check that `allocated_bytes` lies in
///    `[KNOWN_BYTES, KNOWN_BYTES + MAX_SLOP]` and that `deallocated_bytes`
///    (when present) is below `DEALLOC_CAP`.
///
/// Structural problems in the probe output (missing tid, missing counter,
/// scan cap reached) are returned as `Err`; counter values outside the
/// expected window are returned as a failing `AssertResult`.
#[ktstr_test(llcs = 1, cores = 1, threads = 1)]
fn jemalloc_probe_external_target_observes_known_allocation(ctx: &Ctx) -> Result<AssertResult> {
    // Byte count handed to the worker as its sole argument.
    const KNOWN_BYTES: u64 = 16 * 1024 * 1024;
    // Headroom tolerated above `KNOWN_BYTES` before the reading is rejected.
    const MAX_SLOP: u64 = 4 * 1024 * 1024;
    // `deallocated_bytes` at or above this value fails the test.
    const DEALLOC_CAP: u64 = 1024 * 1024;

    let mut worker: PayloadHandle = ctx
        .payload(&JEMALLOC_ALLOC_WORKER)
        .arg(KNOWN_BYTES.to_string())
        .spawn()?;
    let Some(worker_pid) = worker.pid() else {
        return Err(anyhow!(
            "worker PayloadHandle has no pid (child already consumed)"
        ));
    };

    let ready_timeout = std::time::Duration::from_secs(5);
    wait_for_worker_ready(
        &mut worker,
        worker_pid,
        ready_timeout,
        "worker",
        "2=bytes==0, 3=/proc/self/task thread count != 1, \
         4=ready-marker write failed, 5=argument parse failed, \
         6=/proc/self/task unreadable, 101=Rust panic, \
         negative=killed by signal",
    )?;

    // Hold on to the probe result so the worker is killed whether or not the
    // probe run succeeded; only afterwards is the result propagated.
    let probe_run = ctx
        .payload(&JEMALLOC_PROBE)
        .arg("--pid")
        .arg(worker_pid.to_string())
        .arg("--json")
        .run();
    let _ = worker.kill();
    let (_assert, metrics) = probe_run?;

    let n_threads = thread_count(&metrics);
    if n_threads < 1 {
        return Ok(AssertResult::fail_msg(format!(
            "probe saw n_threads={n_threads} for worker pid={worker_pid}; \
             probe must emit at least one thread entry — bailed before \
             per-thread iteration or filtered out every tid"
        )));
    }

    // The worker's thread entry is looked up under a tid equal to its pid.
    let worker_tid = worker_pid as i32;
    let counters = match lookup_thread(&metrics, worker_tid) {
        ThreadLookup::ExceedsCap => Err(anyhow!(
            "probe JSON emitted at least {cap} contiguous snapshots.0.threads.N.tid \
             entries without matching tid={worker_tid}; scan hit the safety cap before \
             reaching the array terminator. n_threads={n_threads}. Either the target \
             is unexpectedly wide (raise the cap if legitimate) or the flat-metric \
             schema changed and the terminator convention no longer holds.",
            cap = MAX_SCAN_INDEX,
        )),
        ThreadLookup::TidAbsent => Err(anyhow!(
            "probe JSON has no snapshots.0.threads.N.tid == {worker_tid} entry despite \
             n_threads={n_threads} — the probe emitted some tids but none \
             matched worker_pid, tid-identity is broken. Flat metrics: {:?}",
            flat_metrics_dump(&metrics),
        )),
        ThreadLookup::MissingAllocatedBytes => Err(anyhow!(
            "probe JSON has threads entry for tid={worker_tid} but no \
             allocated_bytes (n_threads={n_threads}); probe likely emitted \
             an error record in place of the counter fields"
        )),
        ThreadLookup::Found {
            allocated_bytes,
            deallocated_bytes,
        } => Ok((allocated_bytes, deallocated_bytes)),
    };
    let (allocated, deallocated) = counters?;

    let upper_bound = KNOWN_BYTES + MAX_SLOP;
    if allocated < KNOWN_BYTES {
        return Ok(AssertResult::fail_msg(format!(
            "worker (tid={worker_tid}) allocated_bytes={allocated}, expected >= {KNOWN_BYTES}"
        )));
    }
    if allocated > upper_bound {
        return Ok(AssertResult::fail_msg(format!(
            "worker (tid={worker_tid}) allocated_bytes={allocated} exceeds known={KNOWN_BYTES} \
             + slop={MAX_SLOP}; probe may be reading the wrong address"
        )));
    }

    // An absent deallocation counter is accepted; only a present value at or
    // above the cap fails.
    if let Some(d) = deallocated {
        if d >= DEALLOC_CAP {
            return Ok(AssertResult::fail_msg(format!(
                "worker (tid={worker_tid}) deallocated_bytes={d} exceeds cap={DEALLOC_CAP}; \
                 worker should hold its Vec until kill — unexpected free implied"
            )));
        }
    }

    // Passing result carries the observed thread count as an informational detail.
    let note = format!(
        "jemalloc_probe_external_target: n_threads={n_threads} for \
         worker pid={worker_pid} (expected 1 for single-threaded worker; \
         >1 indicates jemalloc or a future dep spawned a helper thread)"
    );
    let mut verdict = AssertResult::pass();
    verdict
        .details
        .push(AssertDetail::new(DetailKind::Other, note));
    Ok(verdict)
}
/// Verifies the probe's behaviour when pointed at a pid that is expected not
/// to exist.
///
/// The probe is run through the payload without an exit-code check so that
/// this test can inspect the code itself. It passes only when the exit code is
/// exactly 1 and the probe produced no metrics at all; either deviation yields
/// a failing `AssertResult` describing what was observed.
#[ktstr_test(llcs = 1, cores = 1, threads = 1)]
fn jemalloc_probe_fatal_on_nonexistent_pid(ctx: &Ctx) -> Result<AssertResult> {
    // NOTE(review): chosen on the assumption that no live process has this pid.
    let fake_pid: i32 = 999_999_999;

    let (_assert, metrics) = ctx
        .payload(&JEMALLOC_PROBE_NO_EXIT_CHECK)
        .arg("--pid")
        .arg(fake_pid.to_string())
        .arg("--json")
        .run()?;

    let exit_code = metrics.exit_code;
    if exit_code != 1 {
        return Ok(AssertResult::fail_msg(format!(
            "probe exit_code={} against nonexistent pid {fake_pid}; \
             expected 1 (RunOutcome::Fatal arm). Negative = signal-kill \
             crash; 0 = unexpected success; other = unknown failure mode",
            exit_code,
        )));
    }

    // Correct exit code and nothing emitted: the expected outcome.
    if metrics.metrics.is_empty() {
        return Ok(AssertResult::pass());
    }

    // Exit code was right but metrics were still emitted; list them in the failure.
    let names: Vec<&str> = metrics.metrics.iter().map(|m| m.name.as_str()).collect();
    Ok(AssertResult::fail_msg(format!(
        "probe against nonexistent pid {fake_pid} emitted {} metric(s) \
         {names:?}; Fatal arm should exit via stderr before \
         print_output() populates ProbeOutput, leaving the metric \
         list empty",
        metrics.metrics.len(),
    )))
}
/// Runs the probe repeatedly against a worker started with `--churn` and
/// checks that the probe never crashes or bails out before iterating threads.
///
/// Steps:
/// 1. Spawn the churn worker with `KNOWN_BYTES` as its argument and wait up to
///    five seconds for readiness.
/// 2. Invoke the probe `INVOCATIONS` times with `--pid <worker> --json`,
///    using the payload without an exit-code check. A negative exit code
///    (death by signal) or any other non-zero exit code fails the test
///    immediately.
/// 3. For each successful invocation, record whether more than one thread was
///    reported and whether any thread entry has a `tid` but no
///    `allocated_bytes` (counted at most once per invocation).
/// 4. Fail if no invocation ever saw more than one thread; otherwise pass,
///    attaching a detail that says how many invocations contained such an
///    incomplete thread entry.
///
/// The worker is killed on every exit path after readiness, including when a
/// probe invocation itself returns an error.
#[ktstr_test(llcs = 1, cores = 2, threads = 2)]
fn jemalloc_probe_survives_thread_churn(ctx: &Ctx) -> Result<AssertResult> {
    // Byte count handed to the churn worker as its argument.
    const KNOWN_BYTES: u64 = 1024 * 1024;
    // Number of back-to-back probe runs against the live worker.
    const INVOCATIONS: usize = 10;

    let mut worker: PayloadHandle = ctx
        .payload(&JEMALLOC_ALLOC_WORKER_CHURN)
        .arg(KNOWN_BYTES.to_string())
        .spawn()?;
    let worker_pid = worker
        .pid()
        .ok_or_else(|| anyhow!("churn worker handle has no pid"))?;
    wait_for_worker_ready(
        &mut worker,
        worker_pid,
        std::time::Duration::from_secs(5),
        "churn worker",
        "2=bytes==0, 4=ready-marker write failed, \
         5=argument parse failed, 101=Rust panic, negative=killed by signal",
    )?;

    let mut any_multi_thread_seen = false;
    let mut error_invocations: u32 = 0;
    for i in 0..INVOCATIONS {
        let run_outcome = ctx
            .payload(&JEMALLOC_PROBE_NO_EXIT_CHECK)
            .arg("--pid")
            .arg(worker_pid.to_string())
            .arg("--json")
            .run();
        // Propagating the error straight away would skip the cleanup that
        // every other exit path performs, so stop the worker first.
        if run_outcome.is_err() {
            let _ = worker.kill();
        }
        let (_assert, metrics) = run_outcome?;

        if metrics.exit_code < 0 {
            let _ = worker.kill();
            return Ok(AssertResult::fail_msg(format!(
                "invocation {i}: probe died by signal (exit_code={}); \
                 ESRCH race should surface as ThreadResult::Err, not crash",
                metrics.exit_code
            )));
        }
        if metrics.exit_code != 0 {
            let _ = worker.kill();
            return Ok(AssertResult::fail_msg(format!(
                "invocation {i}: probe exit_code={} — fatal error before per-thread loop; \
                 ESRCH stress test requires the probe to enter the tid iteration",
                metrics.exit_code,
            )));
        }

        if thread_count(&metrics) > 1 {
            any_multi_thread_seen = true;
        }

        // Walk the contiguous run of thread entries (bounded by
        // MAX_SCAN_INDEX) and stop at the first one that has a tid but no
        // allocated_bytes; such an invocation is counted once.
        let saw_error_entry = (0..MAX_SCAN_INDEX)
            .take_while(|j| has_metric(&metrics, &format!("snapshots.0.threads.{j}.tid")))
            .any(|j| {
                !has_metric(
                    &metrics,
                    &format!("snapshots.0.threads.{j}.allocated_bytes"),
                )
            });
        if saw_error_entry {
            error_invocations += 1;
        }
    }
    let _ = worker.kill();

    if !any_multi_thread_seen {
        return Ok(AssertResult::fail_msg(format!(
            "none of {INVOCATIONS} probe invocations saw more than one thread — \
             churn worker may not be producing tids fast enough to race the probe, \
             or readdir(/proc/<pid>/task) is not observing the churn"
        )));
    }

    // Whether the error path was hit is informational only: it is reported as
    // a detail on a passing result, never as a failure.
    let message = if error_invocations > 0 {
        format!(
            "{error_invocations} of {INVOCATIONS} probe invocations observed \
             ThreadResult::Err entries — ESRCH race window confirmed exercised"
        )
    } else {
        format!(
            "0 of {INVOCATIONS} invocations observed ThreadResult::Err entries — \
             race window may not have been exercised (multi-thread view was \
             visible, but no tid died mid-probe)"
        )
    };
    let mut result = AssertResult::pass();
    result
        .details
        .push(AssertDetail::new(DetailKind::Other, message));
    Ok(result)
}
/// Looks up the `timestamp_unix_sec` metric of snapshot number `snap_idx`.
///
/// Builds the flat metric key `snapshots.<snap_idx>.timestamp_unix_sec` and
/// returns whatever `find_metric_u64` yields for it (`None` when the lookup
/// finds nothing).
fn snapshot_timestamp(metrics: &PayloadMetrics, snap_idx: usize) -> Option<u64> {
    let key = format!("snapshots.{snap_idx}.timestamp_unix_sec");
    find_metric_u64(metrics, &key)
}
/// Verifies the probe's multi-snapshot mode against a parked worker.
///
/// Steps:
/// 1. Spawn the alloc worker with `KNOWN_BYTES` and wait up to five seconds
///    for readiness.
/// 2. Run the probe with `--snapshots SNAPSHOTS --interval-ms INTERVAL_MS`,
///    then kill the worker before looking at the probe's outcome.
/// 3. Require exactly `SNAPSHOTS` snapshots, and collect each snapshot's
///    timestamp and the worker thread's `allocated_bytes`.
/// 4. Check, in this order: timestamps never decrease; allocations never
///    decrease; every allocation lies in
///    `[KNOWN_BYTES, KNOWN_BYTES + MAX_SLOP]`; the first snapshot's timestamp
///    is not earlier than the top-level `started_at_unix_sec`.
///
/// Missing fields in the probe output are returned as `Err`; value
/// violations are returned as a failing `AssertResult`.
#[ktstr_test(llcs = 1, cores = 1, threads = 1)]
fn jemalloc_probe_multi_snapshot_monotone(ctx: &Ctx) -> Result<AssertResult> {
    // Byte count handed to the worker as its sole argument.
    const KNOWN_BYTES: u64 = 16 * 1024 * 1024;
    // Value passed to the probe's `--snapshots` flag.
    const SNAPSHOTS: usize = 3;
    // Value passed to the probe's `--interval-ms` flag.
    const INTERVAL_MS: u64 = 50;
    // Headroom tolerated above `KNOWN_BYTES` before a reading is rejected.
    const MAX_SLOP: u64 = 4 * 1024 * 1024;

    let mut worker: PayloadHandle = ctx
        .payload(&JEMALLOC_ALLOC_WORKER)
        .arg(KNOWN_BYTES.to_string())
        .spawn()?;
    let Some(worker_pid) = worker.pid() else {
        return Err(anyhow!(
            "worker PayloadHandle has no pid (child already consumed)"
        ));
    };

    let ready_timeout = std::time::Duration::from_secs(5);
    wait_for_worker_ready(
        &mut worker,
        worker_pid,
        ready_timeout,
        "worker",
        "2=bytes==0, 3=/proc/self/task thread count != 1, \
         4=ready-marker write failed, 5=argument parse failed, \
         6=/proc/self/task unreadable, 101=Rust panic, \
         negative=killed by signal",
    )?;

    // Hold on to the probe result so the worker is killed whether or not the
    // probe run succeeded; only afterwards is the result propagated.
    let probe_run = ctx
        .payload(&JEMALLOC_PROBE)
        .arg("--pid")
        .arg(worker_pid.to_string())
        .arg("--json")
        .arg("--snapshots")
        .arg(SNAPSHOTS.to_string())
        .arg("--interval-ms")
        .arg(INTERVAL_MS.to_string())
        .run();
    let _ = worker.kill();
    let (_assert, metrics) = probe_run?;

    let n_snaps = snapshot_count(&metrics);
    if n_snaps != SNAPSHOTS {
        return Ok(AssertResult::fail_msg(format!(
            "multi-snapshot probe emitted {n_snaps} snapshots, expected {SNAPSHOTS}; \
             flat metrics: {:?}",
            flat_metrics_dump(&metrics),
        )));
    }

    // The worker's thread entry is looked up under a tid equal to its pid.
    let worker_tid = worker_pid as i32;
    let mut timestamps: Vec<u64> = Vec::with_capacity(SNAPSHOTS);
    let mut allocations: Vec<u64> = Vec::with_capacity(SNAPSHOTS);
    for i in 0..SNAPSHOTS {
        let Some(ts) = snapshot_timestamp(&metrics, i) else {
            return Err(anyhow!(
                "snapshots.{i}.timestamp_unix_sec missing from probe output"
            ));
        };
        timestamps.push(ts);

        let looked_up = match snapshot_worker_allocated(&metrics, i, worker_tid) {
            ThreadLookup::ExceedsCap => Err(anyhow!(
                "snapshots.{i} emitted at least {cap} contiguous tid \
                 entries without matching tid={worker_tid}; scan hit cap \
                 before terminator. Raise MAX_SCAN_INDEX if the \
                 target is legitimately this wide.",
                cap = MAX_SCAN_INDEX,
            )),
            ThreadLookup::TidAbsent => Err(anyhow!(
                "worker tid {worker_tid} absent from snapshots.{i}; flat \
                 metrics: {:?}",
                flat_metrics_dump(&metrics),
            )),
            ThreadLookup::MissingAllocatedBytes => Err(anyhow!(
                "worker tid {worker_tid} present in snapshots.{i} but no \
                 allocated_bytes sibling — probe emitted an error record \
                 in place of the counter for this snapshot"
            )),
            ThreadLookup::Found {
                allocated_bytes, ..
            } => Ok(allocated_bytes),
        };
        allocations.push(looked_up?);
    }

    // Adjacent-pair checks: window `prev` covers snapshots `prev` and `prev + 1`.
    for (prev, pair) in timestamps.windows(2).enumerate() {
        let i = prev + 1;
        if pair[1] < pair[0] {
            return Ok(AssertResult::fail_msg(format!(
                "snapshot {i} timestamp {} is less than snapshot {} timestamp {}; \
                 CLOCK_REALTIME went backwards across snapshots",
                pair[1], prev, pair[0],
            )));
        }
    }
    for (prev, pair) in allocations.windows(2).enumerate() {
        let i = prev + 1;
        if pair[1] < pair[0] {
            return Ok(AssertResult::fail_msg(format!(
                "snapshot {i} allocated_bytes={} < snapshot {} allocated_bytes={}; \
                 jemalloc cumulative counter must not decrease for a parked worker \
                 that holds its Vec",
                pair[1], prev, pair[0],
            )));
        }
    }

    // Every snapshot must individually fall inside the expected window.
    let upper_bound = KNOWN_BYTES + MAX_SLOP;
    for (i, &a) in allocations.iter().enumerate() {
        if a < KNOWN_BYTES {
            return Ok(AssertResult::fail_msg(format!(
                "snapshot {i} allocated_bytes={a} is below known {KNOWN_BYTES} — \
                 probe may be reading the wrong address or the counter was not \
                 yet propagated to the TSD slot",
            )));
        }
        if a > upper_bound {
            return Ok(AssertResult::fail_msg(format!(
                "snapshot {i} allocated_bytes={a} exceeds known={KNOWN_BYTES} \
                 + slop={MAX_SLOP}; probe may be reading the wrong address \
                 or the worker leaked extra allocations between snapshots",
            )));
        }
    }

    let Some(started_at) = find_metric_u64(&metrics, "started_at_unix_sec") else {
        return Err(anyhow!(
            "top-level started_at_unix_sec missing from probe output; \
             flat metrics: {:?}",
            flat_metrics_dump(&metrics),
        ));
    };
    let first_ts = timestamps[0];
    if first_ts < started_at {
        return Ok(AssertResult::fail_msg(format!(
            "snapshots.0.timestamp_unix_sec={} < top-level started_at_unix_sec={}; \
             started_at must precede every snapshot timestamp",
            first_ts, started_at,
        )));
    }

    Ok(AssertResult::pass())
}