use anyhow::{Result, anyhow};
use ktstr::assert::{AssertDetail, AssertResult, DetailKind};
use ktstr::ktstr_test;
use ktstr::metric_types::Bytes;
use ktstr::scenario::Ctx;
use ktstr::scenario::payload_run::PayloadHandle;
use ktstr::test_support::{OutputFormat, Payload, PayloadKind};
use ktstr::worker_ready_wait::wait_for_worker_ready;
#[::ktstr::__private::ctor::ctor(crate_path = ::ktstr::__private::ctor)]
fn set_alloc_worker_binary_env_var() {
    // Cargo exports CARGO_BIN_EXE_<name> to integration tests at compile
    // time; forward it through the env var the harness reads to locate the
    // worker binary.
    let worker_binary = env!("CARGO_BIN_EXE_ktstr-jemalloc-alloc-worker");
    // SAFETY: set_var is unsafe because env mutation races with concurrent
    // readers; this runs from a ctor, presumably before the test harness
    // spawns any threads — TODO confirm nothing reads the env concurrently.
    unsafe {
        std::env::set_var("KTSTR_JEMALLOC_ALLOC_WORKER_BINARY", worker_binary);
    }
}
// Payload descriptor for the steady alloc worker: it plants one long-lived
// allocation whose size is appended per-test via `.arg(...)`.
// NOTE(review): `Payload::new`'s positional-parameter semantics are not
// visible in this file; the inline comments below are inferred — confirm
// against the declaration in `ktstr::test_support`.
static JEMALLOC_ALLOC_WORKER: Payload = Payload::new(
    "jemalloc_alloc_worker",
    // Runs the `ktstr-jemalloc-alloc-worker` binary built by Cargo.
    PayloadKind::Binary("ktstr-jemalloc-alloc-worker"),
    // The worker reports its outcome through its exit code.
    OutputFormat::ExitCode,
    &[], // no fixed CLI args — the byte count is supplied by each test
    &[],
    &[],
    &[],
    false,
    None,
    None,
);
// Same worker binary as JEMALLOC_ALLOC_WORKER but started with `--churn`,
// which presumably makes it allocate/free repeatedly instead of planting a
// single Vec — confirm against the worker binary's argument handling.
// NOTE(review): positional-parameter comments are inferred; confirm against
// `Payload::new` in `ktstr::test_support`.
static JEMALLOC_ALLOC_WORKER_CHURN: Payload = Payload::new(
    "jemalloc_alloc_worker_churn",
    PayloadKind::Binary("ktstr-jemalloc-alloc-worker"),
    OutputFormat::ExitCode,
    &["--churn"], // fixed flag selecting churn mode; byte count appended per-test
    &[],
    &[],
    &[],
    false,
    None,
    None,
);
/// Bytes the steady worker is told to allocate (16 MiB); the capture's
/// reported `allocated_bytes` must be at least this.
const KNOWN_BYTES: u64 = 16 * 1024 * 1024;
/// Tolerance above KNOWN_BYTES (4 MiB) for incidental runtime allocations
/// the worker makes beyond its planted Vec.
const MAX_SLOP: u64 = 4 * 1024 * 1024;
/// Byte count passed as the CLI argument to the churn-mode worker (1 MiB).
const CHURN_KNOWN_BYTES: u64 = 1024 * 1024;
/// How long to wait for a worker's ready marker before failing the spawn.
const READY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
/// kthreadd's tgid (2 on Linux) — used as an always-present kernel thread
/// to sanity-check that the /proc walk reaches kernel threads.
const KTHREADD_TGID: u32 = 2;
#[ktstr_test(llcs = 1, cores = 1, threads = 1)]
fn ctprof_capture_records_allocated_bytes_for_jemalloc_alloc_worker(
    ctx: &Ctx,
) -> Result<AssertResult> {
    // Spawn the worker with the known byte count as its only argument and
    // block until it signals that its planted allocation is live.
    let mut handle: PayloadHandle = ctx
        .payload(&JEMALLOC_ALLOC_WORKER)
        .arg(KNOWN_BYTES.to_string())
        .spawn()?;
    let worker_pid = handle
        .pid()
        .ok_or_else(|| anyhow!("alloc-worker handle has no pid (child already consumed)"))?;
    wait_for_worker_ready(
        &mut handle,
        worker_pid,
        READY_TIMEOUT,
        "alloc-worker",
        "2=bytes==0, 3=/proc/self/task thread count != 1, \
        4=ready-marker write failed, 5=argument parse failed, \
        6=/proc/self/task unreadable, 101=Rust panic, \
        negative=killed by signal",
    )?;
    // Capture while the worker is still alive; only then reap it.
    let snapshot = ktstr::ctprof::capture();
    let _ = handle.kill();
    let tracked: Vec<&ktstr::ctprof::ThreadState> = snapshot
        .threads
        .iter()
        .filter(|thread| thread.tgid == worker_pid)
        .collect();
    if tracked.is_empty() {
        return Ok(AssertResult::fail_msg(format!(
            "ctprof::capture() did not see worker tgid={worker_pid} in \
            its /proc walk; total threads in snapshot: {}",
            snapshot.threads.len(),
        )));
    }
    // Per-thread maxima of both jemalloc counters, taken in a single pass.
    // `tracked` is non-empty here, so folding from 0 is equivalent to the
    // unchecked `.max()` of each counter.
    let (allocated, deallocated) =
        tracked.iter().fold((0u64, 0u64), |(alloc, dealloc), thread| {
            (
                alloc.max(thread.allocated_bytes.0),
                dealloc.max(thread.deallocated_bytes.0),
            )
        });
    if allocated < KNOWN_BYTES {
        return Ok(AssertResult::fail_msg(format!(
            "worker (tgid={worker_pid}) allocated_bytes={allocated}, \
            expected >= {KNOWN_BYTES}; threads in worker tgid: {}. \
            Capture's attach_jemalloc either failed against the worker's \
            ELF (DWARF missing, jemalloc-not-found) or the per-thread \
            ptrace step failed (check ptrace_scope inside the guest).",
            tracked.len(),
        )));
    }
    if allocated > KNOWN_BYTES + MAX_SLOP {
        return Ok(AssertResult::fail_msg(format!(
            "worker allocated_bytes={allocated} exceeds known {KNOWN_BYTES} \
            + slop {MAX_SLOP}; capture may be reading the wrong address \
            or the worker leaked extra allocations beyond the planted Vec",
        )));
    }
    if deallocated >= KNOWN_BYTES {
        return Ok(AssertResult::fail_msg(format!(
            "worker deallocated_bytes={deallocated} >= KNOWN_BYTES \
            ({KNOWN_BYTES}); worker should not free its planted Vec \
            before kill",
        )));
    }
    // All gates passed — record the observed numbers for the test log.
    let mut result = AssertResult::pass();
    result.details.push(AssertDetail::new(
        DetailKind::Other,
        format!(
            "ctprof_capture_records_allocated_bytes: tgid={worker_pid}, \
            threads_in_tgid={}, allocated={allocated}, deallocated={deallocated}",
            tracked.len(),
        ),
    ));
    Ok(result)
}
#[ktstr_test(llcs = 1, cores = 1, threads = 1)]
fn ctprof_capture_completes_against_bare_guest(_ctx: &Ctx) -> Result<AssertResult> {
    // Capture with no payload running: only what the bare guest itself is
    // running (kernel threads, init, this test binary) should be visible.
    let snapshot = ktstr::ctprof::capture();
    if snapshot.threads.is_empty() {
        return Ok(AssertResult::fail_msg(
            "ctprof::capture() returned zero threads on a bare guest — \
            /proc walk produced no entries, indicating the capture layer \
            is not reading the guest's procfs successfully",
        ));
    }
    // kthreadd serves as an always-present sentinel for kernel threads.
    let kthreadd: Vec<&ktstr::ctprof::ThreadState> = snapshot
        .threads
        .iter()
        .filter(|thread| thread.tgid == KTHREADD_TGID)
        .collect();
    if kthreadd.is_empty() {
        return Ok(AssertResult::fail_msg(format!(
            "kthreadd (tgid={KTHREADD_TGID}) absent from snapshot; \
            total threads: {}, observed tgids preview: {}. \
            Either the guest kernel skipped tgid=2 or the capture \
            /proc walk filtered it out.",
            snapshot.threads.len(),
            tgids_dump(&snapshot),
        )));
    }
    // Kernel threads never touch the userspace heap, so both jemalloc
    // counters must read as zero for every kthreadd thread.
    for thread in &kthreadd {
        if thread.allocated_bytes != Bytes(0) {
            return Ok(AssertResult::fail_msg(format!(
                "kthreadd tid={} carries allocated_bytes={}; kernel \
                threads have no userspace heap, the absent-counter \
                contract requires this to be 0",
                thread.tid, thread.allocated_bytes,
            )));
        }
        if thread.deallocated_bytes != Bytes(0) {
            return Ok(AssertResult::fail_msg(format!(
                "kthreadd tid={} carries deallocated_bytes={}; expected 0",
                thread.tid, thread.deallocated_bytes,
            )));
        }
    }
    let mut result = AssertResult::pass();
    result.details.push(AssertDetail::new(
        DetailKind::Other,
        format!(
            "ctprof_capture_completes_against_bare_guest: \
            kthreadd_threads={}, total_threads={}",
            kthreadd.len(),
            snapshot.threads.len(),
        ),
    ));
    Ok(result)
}
/// Renders a short preview of the distinct tgids in `snap` (sorted, first 16)
/// plus the total distinct count, for use in failure messages.
fn tgids_dump(snap: &ktstr::ctprof::CtprofSnapshot) -> String {
    // BTreeSet both dedupes and sorts, so the preview is deterministic.
    let mut distinct = std::collections::BTreeSet::new();
    for thread in &snap.threads {
        distinct.insert(thread.tgid);
    }
    let total = distinct.len();
    let mut preview: Vec<u32> = Vec::new();
    for tgid in distinct {
        if preview.len() == 16 {
            break;
        }
        preview.push(tgid);
    }
    format!("{preview:?} (of {total} distinct tgids)")
}
#[ktstr_test(llcs = 1, cores = 2, threads = 2)]
fn ctprof_capture_against_churn_worker_does_not_panic(ctx: &Ctx) -> Result<AssertResult> {
    // Spawn the churn-mode worker and wait until it signals readiness.
    let mut worker: PayloadHandle = ctx
        .payload(&JEMALLOC_ALLOC_WORKER_CHURN)
        .arg(CHURN_KNOWN_BYTES.to_string())
        .spawn()?;
    let worker_pid = worker
        .pid()
        .ok_or_else(|| anyhow!("churn worker handle has no pid"))?;
    wait_for_worker_ready(
        &mut worker,
        worker_pid,
        READY_TIMEOUT,
        "churn alloc-worker",
        "2=bytes==0, 4=ready-marker write failed, 5=argument parse failed, \
        101=Rust panic, negative=killed by signal",
    )?;
    // Capture while the worker is churning, then reap it.
    let snap = ktstr::ctprof::capture();
    let _ = worker.kill();
    // FIX: check for an empty snapshot FIRST. Previously this check came
    // after the leader-presence gate, which made it unreachable: an empty
    // snapshot implied "no leader", so the leader-missing diagnostic fired
    // instead of the intended zero-threads one.
    if snap.threads.is_empty() {
        return Ok(AssertResult::fail_msg(
            "capture against churn worker returned zero threads — \
            the ESRCH race window appears to have aborted the \
            entire /proc walk rather than collapsing per-tid",
        ));
    }
    // Find the worker's leader (main) thread once; it is reused both as
    // the presence gate and as the source of the allocated-bytes detail
    // (the original scanned the snapshot twice for the same thread).
    let main_thread = match snap
        .threads
        .iter()
        .find(|t| t.tgid == worker_pid && t.tid == worker_pid)
    {
        Some(t) => t,
        None => {
            let worker_thread_count =
                snap.threads.iter().filter(|t| t.tgid == worker_pid).count();
            return Ok(AssertResult::fail_msg(format!(
                "capture saw {worker_thread_count} threads under tgid={worker_pid} \
                but none with tid={worker_pid} — the leader (main) thread \
                is missing from the snapshot. The leader is long-lived, so \
                its absence implies the capture pipeline filtered it out \
                during the per-tid walk (likely an ESRCH race between \
                iter_task_ids_at and the per-tid procfs reads, mis-\
                classifying the leader as a ghost thread)",
            )));
        }
    };
    let main_alloc: u64 = main_thread.allocated_bytes.0;
    let mut result = AssertResult::pass();
    result.details.push(AssertDetail::new(
        DetailKind::Other,
        format!(
            "ctprof_capture_against_churn_worker: tgid={worker_pid}, \
            total_threads={}, main_allocated_bytes={main_alloc}",
            snap.threads.len(),
        ),
    ));
    Ok(result)
}