use std::io::Read;
use std::path::PathBuf;
use std::process::{Child, Command, Stdio};
use std::time::{Duration, Instant};
use ktstr::metric_types::Bytes;
/// Filesystem path of the `ktstr-jemalloc-alloc-worker` binary, baked in at
/// compile time from the `CARGO_BIN_EXE_<name>` variable that Cargo provides.
const ALLOC_WORKER_BINARY: &str = env!("CARGO_BIN_EXE_ktstr-jemalloc-alloc-worker");

/// Byte count (16 MiB) handed to the worker as its single argument and used
/// as the lower bound the captured `allocated_bytes` must reach.
const KNOWN_BYTES: u64 = 16 << 20;

/// Head-room (4 MiB) tolerated above [`KNOWN_BYTES`] before the captured
/// `allocated_bytes` is considered implausibly large.
const MAX_SLOP: u64 = 4 << 20;

/// Upper bound on how long a test waits for the worker's ready marker.
const READY_TIMEOUT: Duration = Duration::from_secs(10);

/// Name of the environment variable through which the ready-marker path is
/// passed to the worker; re-exported from `ktstr::worker_ready` so the test
/// and the worker agree on the spelling.
const READY_MARKER_OVERRIDE_ENV: &str = ktstr::worker_ready::WORKER_READY_MARKER_OVERRIDE_ENV;
/// Scope guard around a spawned child process.
///
/// Dropping the guard kills the child and reaps it, so the process is torn
/// down on every exit path of a test, including an unwinding panic.
struct WorkerGuard {
    /// The guarded process; `None` once it has been taken out for teardown.
    child: Option<Child>,
}

impl Drop for WorkerGuard {
    /// Kills and waits for the guarded child, if one is still held.
    fn drop(&mut self) {
        let Some(mut worker) = self.child.take() else {
            return;
        };
        // Errors are deliberately ignored: nothing useful can be done with
        // them while dropping.
        let _ = worker.kill();
        let _ = worker.wait();
    }
}
/// Probes whether this process is allowed to ptrace one of its own children.
///
/// A throwaway `sleep 1` is spawned with all standard streams pointed at
/// null, and a ptrace seize is attempted on it. The probe child is always
/// killed and reaped before returning.
///
/// Returns `true` only if the seize succeeded; returns `false` if the probe
/// child could not be spawned at all or if the seize was rejected.
fn ptrace_attach_allowed() -> bool {
    let mut probe = Command::new("sleep");
    probe
        .arg("1")
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    let Ok(mut child) = probe.spawn() else {
        return false;
    };

    let target = nix::unistd::Pid::from_raw(child.id() as i32);
    let attached =
        nix::sys::ptrace::seize(target, nix::sys::ptrace::Options::empty()).is_ok();
    if attached {
        // Best-effort release of the tracee; the outcome is not inspected.
        let _ = nix::sys::ptrace::detach(target, None);
    }
    // Non-blocking wait on the probe; its result is intentionally ignored.
    let _ = nix::sys::wait::waitpid(target, Some(nix::sys::wait::WaitPidFlag::WNOHANG));

    // Tear the probe down regardless of how the attach attempt went.
    let _ = child.kill();
    let _ = child.wait();
    attached
}
/// Launches the allocation worker binary.
///
/// * `bytes` — passed to the worker as its only command-line argument
///   (decimal string).
/// * `marker` — path exported to the worker through the
///   `READY_MARKER_OVERRIDE_ENV` environment variable.
///
/// Both `MALLOC_CONF` and `_RJEM_MALLOC_CONF` are set to
/// `background_thread:false` (presumably the plain and the `_rjem_`-prefixed
/// jemalloc spellings — confirm against the worker's allocator build).
/// stdin and stdout are null; stderr is piped so the caller can read it.
///
/// Returns the spawned [`Child`], or the I/O error from `spawn`.
fn spawn_worker(bytes: u64, marker: &PathBuf) -> std::io::Result<Child> {
    let mut worker = Command::new(ALLOC_WORKER_BINARY);
    worker.arg(bytes.to_string());
    worker.env(READY_MARKER_OVERRIDE_ENV, marker);
    for conf_var in ["MALLOC_CONF", "_RJEM_MALLOC_CONF"] {
        worker.env(conf_var, "background_thread:false");
    }
    worker
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::piped());
    worker.spawn()
}
/// Blocks until the worker's ready marker exists, the worker exits, or
/// `READY_TIMEOUT` elapses.
///
/// * `child` — the worker process; polled with `try_wait` so an early exit
///   is reported immediately instead of waiting out the timeout.
/// * `marker` — path whose existence signals readiness.
///
/// Returns `Ok(())` as soon as `marker` exists.
///
/// # Errors
///
/// * The worker exited before the marker appeared — the message carries the
///   exit status and whatever could be read from the worker's piped stderr.
/// * The marker did not appear within `READY_TIMEOUT`.
///
/// The marker and the exit status are always polled at least once, and once
/// more after the final sleep, so a marker that shows up during the last
/// polling interval (or while this thread was descheduled past the
/// deadline) is still honoured rather than reported as a timeout.
fn wait_for_ready(child: &mut Child, marker: &PathBuf) -> Result<(), String> {
    let deadline = Instant::now() + READY_TIMEOUT;
    loop {
        if marker.exists() {
            return Ok(());
        }
        if let Ok(Some(status)) = child.try_wait() {
            // The process is gone, so draining its stderr reaches EOF.
            let mut stderr = String::new();
            if let Some(mut pipe) = child.stderr.take() {
                let _ = pipe.read_to_string(&mut stderr);
            }
            return Err(format!(
                "worker exited early with status {:?}; stderr: {}",
                status, stderr
            ));
        }
        // Deadline is tested *after* polling so the last sleep is always
        // followed by one more look at the marker.
        if Instant::now() >= deadline {
            break;
        }
        std::thread::sleep(Duration::from_millis(20));
    }
    Err(format!(
        "ready marker {:?} did not appear within {:?}",
        marker, READY_TIMEOUT,
    ))
}
/// End-to-end check that `ktstr::ctprof::capture_pid` reports jemalloc
/// counters for a worker known to have allocated `KNOWN_BYTES`.
///
/// The test skips (with a message on stderr) when the ptrace probe fails.
/// Otherwise it spawns the worker, waits for its ready marker, captures a
/// snapshot of the worker pid and asserts on the largest per-thread
/// `allocated_bytes` / `deallocated_bytes` seen among the worker's threads:
///
/// * allocated is at least `KNOWN_BYTES`,
/// * allocated is at most `KNOWN_BYTES + MAX_SLOP`,
/// * deallocated is below `KNOWN_BYTES`.
#[test]
fn capture_populates_jemalloc_counters_for_alloc_worker() {
    if !ptrace_attach_allowed() {
        eprintln!(
            "ctprof_capture_jemalloc_wiring: skipping — \
             ptrace attach is denied by the kernel policy (likely \
             yama.ptrace_scope >= 1 with no parent relationship). \
             Set kernel.yama.ptrace_scope=0 or run the test from \
             the worker's parent process tree."
        );
        return;
    }

    // The temp dir outlives the guard (declared first, dropped last), so the
    // worker is killed before its marker directory is removed.
    let tmp = tempfile::TempDir::new().expect("tempdir for ready marker");
    let marker = tmp.path().join("ready");
    let mut guard = WorkerGuard {
        child: Some(
            spawn_worker(KNOWN_BYTES, &marker)
                .expect("alloc-worker should spawn (CARGO_BIN_EXE_ resolved at compile time)"),
        ),
    };
    let worker = guard.child.as_mut().expect("child handle present");
    let worker_pid = worker.id() as i32;
    wait_for_ready(worker, &marker)
        .unwrap_or_else(|msg| panic!("worker did not signal ready: {}", msg));

    let snap = ktstr::ctprof::capture_pid(worker_pid);

    // Keep only the snapshot entries that belong to the worker's thread group.
    let worker_tgid = worker_pid as u32;
    let worker_threads: Vec<_> = snap
        .threads
        .iter()
        .filter(|t| t.tgid == worker_tgid)
        .collect();
    assert!(
        !worker_threads.is_empty(),
        "capture_pid() did not see worker tgid={worker_pid} in \
         its /proc walk; total threads in snapshot: {}",
        snap.threads.len(),
    );

    // Judge the worker by the largest counter observed on any of its threads.
    let deallocated: u64 = worker_threads
        .iter()
        .map(|t| t.deallocated_bytes.0)
        .max()
        .expect("worker_threads non-empty per assert above");
    let allocated: u64 = worker_threads
        .iter()
        .map(|t| t.allocated_bytes.0)
        .max()
        .expect("worker_threads non-empty per assert above");

    assert!(
        allocated >= KNOWN_BYTES,
        "expected worker allocated_bytes >= {KNOWN_BYTES}, \
         got {allocated}; worker_pid={worker_pid}, threads in \
         worker tgid: {}. The capture pipeline's attach_jemalloc \
         either failed against the worker's ELF (DWARF missing, \
         arch mismatch, jemalloc-not-found) or the per-thread \
         ptrace step failed (check ptrace_scope / EPERM).",
        worker_threads.len(),
    );

    let ceiling = KNOWN_BYTES + MAX_SLOP;
    assert!(
        allocated <= ceiling,
        "worker allocated_bytes={allocated} exceeds known + slop \
         ({}); probe may be reading the wrong address or the \
         worker leaked extra allocations beyond the planted Vec",
        ceiling,
    );

    assert!(
        deallocated < KNOWN_BYTES,
        "worker deallocated_bytes={deallocated} >= KNOWN_BYTES \
         ({KNOWN_BYTES}); worker should not free its planted Vec \
         before kill",
    );
}
/// Capturing the test process's own pid must still list its threads, but
/// every one of them must report zero for both jemalloc counters.
#[test]
fn capture_pid_skips_self_attach_and_keeps_counters_zero() {
    let self_pid = std::process::id() as i32;
    let self_tgid = self_pid as u32;

    let snap = ktstr::ctprof::capture_pid(self_pid);
    let own_threads: Vec<_> = snap
        .threads
        .iter()
        .filter(|t| t.tgid == self_tgid)
        .collect();
    assert!(
        !own_threads.is_empty(),
        "capture_pid() did not see self tgid={self_pid}; \
         expected the test process's own tids in the /proc walk",
    );

    for t in own_threads.iter() {
        assert_eq!(
            t.allocated_bytes,
            Bytes(0),
            "self-pid threads must carry allocated_bytes=0 — the \
             pid==self_pid gate must keep attach_jemalloc from \
             running against the calling process; got {} on tid {}",
            t.allocated_bytes,
            t.tid,
        );
        assert_eq!(
            t.deallocated_bytes,
            Bytes(0),
            "self-pid threads must carry deallocated_bytes=0; \
             got {} on tid {}",
            t.deallocated_bytes,
            t.tid,
        );
    }
}
/// Capturing a process that is not the jemalloc worker (`sleep 3`) must
/// leave both jemalloc counters at zero while still filling in the
/// procfs-derived fields (`tgid`, `start_time_clock_ticks`, `policy`).
///
/// The test skips with a message on stderr when `sleep` cannot be spawned.
/// The child is owned by a [`WorkerGuard`], so it is killed and reaped even
/// if `capture_pid` panics; on the normal path the guard is dropped right
/// after the capture, before any assertion runs.
#[test]
fn capture_pid_against_non_jemalloc_target_keeps_counters_zero_but_populates_procfs() {
    let child = match Command::new("sleep")
        .arg("3")
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .spawn()
    {
        Ok(c) => c,
        Err(_) => {
            eprintln!("skipping — /bin/sleep unavailable");
            return;
        }
    };
    let pid = child.id() as i32;
    // From here on the guard guarantees kill + wait on every exit path.
    let guard = WorkerGuard { child: Some(child) };

    // Short pause before capturing the freshly spawned target.
    std::thread::sleep(Duration::from_millis(50));
    let snap = ktstr::ctprof::capture_pid(pid);
    // The snapshot is self-contained; the target is no longer needed.
    drop(guard);

    let target_threads: Vec<_> = snap
        .threads
        .iter()
        .filter(|t| t.tgid == pid as u32)
        .collect();
    assert!(
        !target_threads.is_empty(),
        "capture_pid did not see /bin/sleep tgid={pid} in its /proc walk; \
         total threads in snapshot: {}",
        snap.threads.len(),
    );
    for t in &target_threads {
        assert_eq!(
            t.allocated_bytes,
            Bytes(0),
            "non-jemalloc target must carry allocated_bytes=0 (attach \
             returned JemallocNotFound, capture absorbed into absent-\
             counter contract); got {} on tid {}",
            t.allocated_bytes,
            t.tid,
        );
        assert_eq!(
            t.deallocated_bytes,
            Bytes(0),
            "non-jemalloc target must carry deallocated_bytes=0; \
             got {} on tid {}",
            t.deallocated_bytes,
            t.tid,
        );
        assert_eq!(t.tgid, pid as u32);
        assert!(
            t.start_time_clock_ticks > 0,
            "/proc/<pid>/stat field 22 must populate for a live target",
        );
        assert!(
            !t.policy.0.is_empty(),
            "scheduling policy must populate from procfs even when the \
             jemalloc probe fails — proves the per-thread procfs path \
             does not depend on probe success",
        );
    }
}