// Process-wide allocator for the probe binary itself: every Rust heap
// allocation made by this process goes through jemalloc (tikv-jemallocator).
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::fs;
use std::io::IoSliceMut;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, Ordering};
use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use gimli::{AttributeValue, EndianSlice, LittleEndian, Reader, Unit};
use goblin::elf::Elf;
use nix::sys::ptrace;
use nix::sys::ptrace::Options;
#[cfg(target_arch = "x86_64")]
use nix::sys::ptrace::regset::NT_PRSTATUS;
use nix::sys::signal::{SaFlags, SigAction, SigHandler, SigSet, Signal, sigaction, signal};
use nix::sys::uio::{RemoteIoVec, process_vm_readv};
use nix::sys::wait::{WaitStatus, waitpid};
use nix::unistd::Pid;
use serde::Serialize;
/// Version number of the probe's JSON output schema.
/// NOTE(review): presumably written into `ProbeOutput::schema_version` at the
/// construction site (outside this view) — confirm.
const SCHEMA_VERSION: u32 = 2;
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0` instead of an error.
fn now_unix_sec() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
/// This process's own pid, converted to `pid_t`.
///
/// # Panics
/// Only if the pid does not fit in `pid_t`, which cannot happen on Linux
/// (`pid_max` is capped at 2^22).
fn self_pid() -> i32 {
    let own_pid: u32 = std::process::id();
    libc::pid_t::try_from(own_pid).expect("Linux pid_max <= 2^22 so pid fits in pid_t")
}
/// Render an optional thread name as a ` comm=<name>` suffix for diagnostic
/// lines. Returns an empty string when no name is known.
fn format_comm_suffix(comm: Option<&str>) -> String {
    match comm {
        Some(name) => format!(" comm={name}"),
        None => String::new(),
    }
}
// Architecture-specific constants plus the ptrace read of a stopped thread's
// thread pointer, which feeds the TLS address math. Only x86_64 and aarch64
// are supported; any other target fails to build via `compile_error!` below.
mod arch {
use super::*;
#[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
compile_error!(
"ktstr-jemalloc-probe supports only x86_64 and aarch64 targets; \
ptrace is same-arch and the TLS address math is arch-specific \
(Variant II on x86_64, Variant I on aarch64)"
);
/// ELF `e_machine` value a target binary must carry to be probed by this build.
#[cfg(target_arch = "x86_64")]
pub const EXPECTED_E_MACHINE: u16 = goblin::elf::header::EM_X86_64;
#[cfg(target_arch = "aarch64")]
pub const EXPECTED_E_MACHINE: u16 = goblin::elf::header::EM_AARCH64;
/// Name of the probe's own architecture, used in error messages.
#[cfg(target_arch = "x86_64")]
pub const ARCH_NAME: &str = "x86_64";
#[cfg(target_arch = "aarch64")]
pub const ARCH_NAME: &str = "aarch64";
/// Name of the register set `read_thread_pointer_ptrace` reads, used in error messages.
#[cfg(target_arch = "x86_64")]
pub const REGSET_NAME: &str = "NT_PRSTATUS";
#[cfg(target_arch = "aarch64")]
pub const REGSET_NAME: &str = "NT_ARM_TLS";
/// Note type of the aarch64 TLS register set; passed as the `addr` argument of
/// PTRACE_GETREGSET below.
#[cfg(target_arch = "aarch64")]
pub const NT_ARM_TLS: libc::c_int = 0x401;
/// Read the thread pointer of a ptrace-stopped thread.
///
/// x86_64: returns `fs_base` from the general-purpose register set (`NT_PRSTATUS`).
#[cfg(target_arch = "x86_64")]
pub fn read_thread_pointer_ptrace(pid: Pid) -> std::result::Result<u64, nix::errno::Errno> {
let regs = ptrace::getregset::<NT_PRSTATUS>(pid)?;
Ok(regs.fs_base)
}
/// Read the thread pointer of a ptrace-stopped thread.
///
/// aarch64: reads the `NT_ARM_TLS` register set via raw PTRACE_GETREGSET into a
/// single `u64` (TPIDR_EL0).
///
/// Errors: the ptrace errno on failure; `EIO` when the kernel reports having
/// written fewer than 8 bytes into the iovec.
#[cfg(target_arch = "aarch64")]
pub fn read_thread_pointer_ptrace(pid: Pid) -> std::result::Result<u64, nix::errno::Errno> {
let mut tpidr: u64 = 0;
// PTRACE_GETREGSET writes through this iovec and shrinks `iov_len` to the
// number of bytes actually filled (checked after the call).
let mut iov = libc::iovec {
iov_base: (&mut tpidr as *mut u64).cast::<libc::c_void>(),
iov_len: std::mem::size_of::<u64>(),
};
// SAFETY: `iov` describes `tpidr`, a live, writable u64 on this stack frame
// that outlives the call, and `iov_len` is exactly its size, so the kernel
// cannot write past it.
let res = unsafe {
libc::ptrace(
libc::PTRACE_GETREGSET,
pid.as_raw(),
NT_ARM_TLS as libc::c_long,
&mut iov as *mut libc::iovec,
)
};
if res == -1 {
return Err(nix::errno::Errno::last());
}
// A short write means `tpidr` was not (fully) filled in.
if iov.iov_len < std::mem::size_of::<u64>() {
return Err(nix::errno::Errno::EIO);
}
Ok(tpidr)
}
}
// Accepted names for jemalloc's thread-specific-data TLS variable: unprefixed,
// `je_`-prefixed, and the `_rjem_je_` prefix. `find_tsd_tls` tries them in this
// order within each symbol table.
const TSD_TLS_SYMBOL_NAMES: &[&str] = &["tsd_tls", "je_tsd_tls", "_rjem_je_tsd_tls"];
// DWARF structure name whose member offsets are resolved.
const TSD_STRUCT_NAME: &str = "tsd_s";
// Prefix that jemalloc prepends to `tsd_s` member names. It is a macro (rather
// than only a const) so it can be spliced into the `concat!` field names below
// at compile time.
macro_rules! tsd_mangle_prefix {
() => {
"cant_access_tsd_items_directly_use_a_getter_or_setter_"
};
}
// The prefix as a plain string, for error messages.
const TSD_MANGLE_PREFIX: &str = tsd_mangle_prefix!();
// Fully mangled DWARF member names of the two counters read by the probe.
const ALLOCATED_FIELD: &str = concat!(tsd_mangle_prefix!(), "thread_allocated");
const DEALLOCATED_FIELD: &str = concat!(tsd_mangle_prefix!(), "thread_deallocated");
// Command-line interface of the probe. Plain `//` comments are used on purpose:
// `///` doc comments on clap-derive items would become `--help` text.
// Cross-flag rules (--snapshots vs --interval-ms) live in
// `Cli::validate_sampling_flags`, not in clap.
#[derive(Parser, Debug)]
#[command(
name = "ktstr-jemalloc-probe",
version = env!("CARGO_PKG_VERSION"),
about = "Read per-thread jemalloc allocated/deallocated byte counters from a running process",
after_help = "\
EXAMPLES:
Single snapshot against a running pid:
ktstr-jemalloc-probe --pid 12345 --json
Multi-snapshot sampling — 5 snapshots at 200ms each (= 1s total):
ktstr-jemalloc-probe --pid 12345 --snapshots 5 --interval-ms 200 --json
Time-bounded run — take up to 10 snapshots at 500ms, self-abort after 3s:
ktstr-jemalloc-probe --pid 12345 --snapshots 10 --interval-ms 500 \\
--abort-after-ms 3000 --json
Enrich an existing ktstr sidecar with probe metrics:
ktstr-jemalloc-probe --pid 12345 --sidecar \\
target/ktstr/{kernel}-{project_commit}/{test}-{hash}.ktstr.json\
",
long_about = "Reads jemalloc's per-thread `thread_allocated` / `thread_deallocated` TLS \
counters out of a running process via ptrace + process_vm_readv. Counters are \
cumulative from thread creation — attaching late does not miss prior \
allocations. Requires CAP_SYS_PTRACE, root, or same-uid / descendant \
relationship under YAMA (kernel.yama.ptrace_scope). Supports Linux x86_64 \
and aarch64 (same-arch only) targets with a statically-linked jemalloc and \
DWARF debuginfo reachable from the target ELF — either inline on the binary \
carrying the jemalloc TLS symbol or as a paired external debug file \
discovered via .gnu_debuglink / NT_GNU_BUILD_ID (distro -dbg / -debuginfo \
packages under /usr/lib/debug).\n\n\
The `--enable-stats` jemalloc build flag is NOT required: `thread.allocated` / \
`thread.deallocated` use jemalloc's `CTL_RO_NL_GEN` (ungated) and the fast/slow \
path writes are unconditional.\n\n\
Sampling mode: pass `--snapshots N --interval-ms MS` to take N snapshots \
separated by MS milliseconds. Symbol resolution and tid enumeration run \
once; each snapshot attach/detaches per tid and threads are released during \
the inter-snapshot sleep so the workload is not held stopped across the \
run. The JSON output always carries a `snapshots` array — single snapshot \
is an array of length 1.\n\n\
Sidecar enrichment: pass `--sidecar PATH` to append probe metrics into an \
existing ktstr sidecar file. The file MUST exist — run the test first to \
generate it, then re-invoke with `--sidecar`.\n\n\
CI deadline: pass `--abort-after-ms MS` to self-abort after MS \
milliseconds, producing a partial ProbeOutput with interrupted: true \
instead of hanging indefinitely on a wedged snapshot loop."
)]
struct Cli {
// Target process id; clap rejects values below 1.
#[arg(long, value_parser = clap::value_parser!(i32).range(1..))]
pid: i32,
// JSON output switch; the consumer of this flag is outside this view.
#[arg(long)]
json: bool,
// Existing ktstr sidecar file to append probe metrics into (must already exist).
#[arg(long, value_name = "PATH")]
sidecar: Option<PathBuf>,
// Number of snapshots to take; 1..=100_000, default 1.
#[arg(
long,
default_value_t = 1,
value_parser = clap::value_parser!(u32).range(1..=100_000),
value_name = "N",
)]
snapshots: u32,
// Wait between snapshots in ms (1..=3_600_000); required iff --snapshots > 1.
#[arg(
long,
value_parser = clap::value_parser!(u64).range(1..=3_600_000),
value_name = "MS",
)]
interval_ms: Option<u64>,
// Self-abort deadline in ms (1..=3_600_000); on expiry a partial result with
// `interrupted: true` is produced (per the long_about text).
#[arg(
long,
value_parser = clap::value_parser!(u64).range(1..=3_600_000),
value_name = "MS",
)]
abort_after_ms: Option<u64>,
}
impl Cli {
    /// Cross-check `--snapshots` against `--interval-ms`.
    ///
    /// # Errors
    /// - more than one snapshot was requested without `--interval-ms`;
    /// - exactly one snapshot was requested together with `--interval-ms`.
    fn validate_sampling_flags(&self) -> Result<()> {
        match (self.snapshots, self.interval_ms) {
            (count, None) if count > 1 => bail!(
                "--snapshots {} requires --interval-ms <MS>; multi-snapshot sampling \
                 needs an explicit inter-snapshot wait",
                count,
            ),
            (1, Some(_)) => bail!(
                "--interval-ms is only meaningful with --snapshots > 1; omit --interval-ms \
                 for a single-snapshot run",
            ),
            _ => Ok(()),
        }
    }
}
/// Top-level JSON document emitted by the probe. Field order here is the
/// serialized field order.
#[derive(Debug, Serialize)]
struct ProbeOutput {
/// Output schema version.
schema_version: u32,
/// Target process id.
pid: i32,
/// Version string of the probe tool.
tool_version: &'static str,
/// Wall-clock start of the run, seconds since the Unix epoch.
started_at_unix_sec: u64,
/// Inter-snapshot interval; omitted from JSON when `None` (single-snapshot runs).
#[serde(skip_serializing_if = "Option::is_none")]
interval_ms: Option<u64>,
/// True when the run stopped early (cancellation / deadline) and the
/// snapshot list may be partial.
interrupted: bool,
/// One entry per snapshot taken; a single-snapshot run still uses a list.
snapshots: Vec<Snapshot>,
}
/// One pass over the target's threads.
#[derive(Debug, Serialize)]
struct Snapshot {
/// Wall-clock time at the start of this pass, seconds since the Unix epoch.
timestamp_unix_sec: u64,
/// Monotonic time since the run started, in nanoseconds.
elapsed_since_start_ns: u64,
/// Per-thread outcome, in the order the tids were probed.
threads: Vec<ThreadResult>,
}
/// Result of probing one thread.
///
/// `#[serde(untagged)]`: neither variant writes a tag. Consumers tell them
/// apart by which fields are present (`allocated_bytes` / `deallocated_bytes`
/// vs `error` / `error_kind`).
#[derive(Debug, Serialize)]
#[serde(untagged)]
enum ThreadResult {
/// Counters read successfully.
Ok {
tid: i32,
/// Thread name from `/proc/<pid>/task/<tid>/comm`; omitted if unavailable.
#[serde(skip_serializing_if = "Option::is_none")]
comm: Option<String>,
/// Start time from `/proc/<pid>/task/<tid>/stat`; omitted if unavailable.
#[serde(skip_serializing_if = "Option::is_none")]
start_time_jiffies: Option<u64>,
allocated_bytes: u64,
deallocated_bytes: u64,
},
/// Probing this thread failed; other threads are unaffected.
Err {
tid: i32,
#[serde(skip_serializing_if = "Option::is_none")]
comm: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
start_time_jiffies: Option<u64>,
/// Human-readable error chain.
error: String,
/// Machine-readable failure stage.
error_kind: ThreadErrorKind,
},
}
/// Stage at which probing a single thread failed. Serialized in snake_case
/// (e.g. `ptrace_seize`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, strum::EnumIter)]
#[serde(rename_all = "snake_case")]
enum ThreadErrorKind {
/// `PTRACE_SEIZE` failed (e.g. EPERM).
PtraceSeize,
/// `PTRACE_INTERRUPT` failed after a successful seize.
PtraceInterrupt,
/// `waitpid` failed or reported a status other than a stop.
Waitpid,
/// Reading the thread-pointer register set failed.
GetRegset,
/// `process_vm_readv` failed or returned fewer bytes than requested.
ProcessVmReadv,
/// TLS address computation underflowed or overflowed.
TlsArithmetic,
}
impl std::fmt::Display for ThreadErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let token = match self {
Self::PtraceSeize => "ptrace_seize",
Self::PtraceInterrupt => "ptrace_interrupt",
Self::Waitpid => "waitpid",
Self::GetRegset => "get_regset",
Self::ProcessVmReadv => "process_vm_readv",
Self::TlsArithmetic => "tls_arithmetic",
};
f.write_str(token)
}
}
/// Location of jemalloc's TSD TLS variable inside a target ELF, plus the
/// `PT_TLS` layout needed to turn a thread pointer into its address.
#[derive(Debug, Clone)]
pub(crate) struct TsdTlsSymbol {
/// ELF file in which the symbol was found.
pub elf_path: PathBuf,
/// Symbol value; used by `compute_tls_address` as the offset inside the TLS image.
pub st_value: u64,
/// `PT_TLS.p_memsz` rounded up to `p_align`.
pub tls_image_aligned_size: u64,
/// `PT_TLS.p_align`, clamped to at least 1.
pub p_align: u64,
/// ELF header `e_machine` of `elf_path`.
pub e_machine: u16,
}
/// Locate jemalloc's TSD TLS symbol in `elf` and capture the TLS layout.
///
/// `.symtab` is searched before `.dynsym`; within each table the candidate
/// names are tried in `TSD_TLS_SYMBOL_NAMES` order, and the first hit wins.
///
/// # Errors
/// The ELF has no usable `PT_TLS` segment, or none of the candidate names
/// appears in either symbol table.
pub(crate) fn find_tsd_tls(elf: &Elf<'_>, elf_path: &Path) -> Result<TsdTlsSymbol> {
    let e_machine = elf.header.e_machine;
    // The TLS layout is validated first, so a binary without static TLS is
    // rejected even before the name search.
    let (tls_image_aligned_size, p_align) = extract_pt_tls_layout(elf)?;

    let symbol_tables = [(&elf.syms, &elf.strtab), (&elf.dynsyms, &elf.dynstrtab)];
    let st_value = symbol_tables
        .into_iter()
        .find_map(|(syms, strs)| {
            TSD_TLS_SYMBOL_NAMES
                .iter()
                .find_map(|name| find_symbol_by_name(syms, strs, name))
        })
        .ok_or_else(|| {
            anyhow!(
                "jemalloc TLS symbol ({}) not found in .symtab or .dynsym of {}",
                TSD_TLS_SYMBOL_NAMES.join(" / "),
                elf_path.display(),
            )
        })?;

    Ok(TsdTlsSymbol {
        elf_path: elf_path.to_path_buf(),
        st_value,
        tls_image_aligned_size,
        p_align,
        e_machine,
    })
}
fn find_symbol_by_name(
syms: &goblin::elf::Symtab<'_>,
strs: &goblin::strtab::Strtab<'_>,
needle: &str,
) -> Option<u64> {
for sym in syms.iter() {
if let Some(name) = strs.get_at(sym.st_name)
&& name == needle
{
return Some(sym.st_value);
}
}
None
}
/// Round `value` up to the next multiple of `align`.
///
/// `align` is expected to be a power of two; `0` is treated as `1`.
/// Returns `None` when the rounding would overflow `u64`.
fn round_up_pow2(value: u64, align: u64) -> Option<u64> {
    let mask = align.max(1) - 1;
    let bumped = value.checked_add(mask)?;
    Some(bumped & !mask)
}
fn extract_pt_tls_layout(elf: &Elf<'_>) -> Result<(u64, u64)> {
let tls_hdr = elf
.program_headers
.iter()
.find(|ph| ph.p_type == goblin::elf::program_header::PT_TLS)
.ok_or_else(|| anyhow!("ELF has no PT_TLS segment — target does not use static TLS"))?;
debug_assert!(
tls_hdr.p_align == 0 || tls_hdr.p_align.is_power_of_two(),
"PT_TLS.p_align must be 0 or a power of two, got {}",
tls_hdr.p_align,
);
let align = tls_hdr.p_align.max(1);
let rounded = round_up_pow2(tls_hdr.p_memsz, align)
.ok_or_else(|| anyhow!("PT_TLS size arithmetic overflow"))?;
Ok((rounded, align))
}
/// Byte offsets of jemalloc's two per-thread counters inside `tsd_s`, as
/// resolved from DWARF. Built only through [`CounterOffsets::new`], which
/// validates the layout.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct CounterOffsets {
    // Offset of the mangled `thread_allocated` member.
    thread_allocated: u64,
    // Offset of the mangled `thread_deallocated` member.
    thread_deallocated: u64,
}
impl CounterOffsets {
    /// Width of each counter in bytes; the probe decodes both as `u64`.
    const COUNTER_WIDTH: u64 = 8;

    /// Validate and build a pair of counter offsets.
    ///
    /// # Errors
    /// - `thread_allocated` does not strictly precede `thread_deallocated`. The
    ///   probe reads both counters in one contiguous read starting at
    ///   `thread_allocated`.
    /// - The end of the `thread_deallocated` counter would overflow `u64`. With
    ///   corrupt DWARF this would make `combined_read_span` overflow (a panic in
    ///   debug builds, a wrapped span in release builds).
    pub fn new(thread_allocated: u64, thread_deallocated: u64) -> Result<Self> {
        if thread_allocated >= thread_deallocated {
            bail!(
                "unexpected tsd_s layout: thread_allocated ({thread_allocated}) \
                 must precede thread_deallocated ({thread_deallocated}) per \
                 jemalloc TSD_DATA_FAST ordering",
            );
        }
        if thread_deallocated.checked_add(Self::COUNTER_WIDTH).is_none() {
            bail!(
                "unexpected tsd_s layout: thread_deallocated offset \
                 ({thread_deallocated}) leaves no room for an 8-byte counter",
            );
        }
        Ok(Self {
            thread_allocated,
            thread_deallocated,
        })
    }

    /// Number of bytes covering both counters in a single read: from the start
    /// of `thread_allocated` through the end of the 8-byte `thread_deallocated`.
    /// Cannot overflow for values accepted by `new`.
    pub fn combined_read_span(&self) -> u64 {
        self.thread_deallocated + Self::COUNTER_WIDTH - self.thread_allocated
    }
}
/// Resolve the `tsd_s` counter offsets for the ELF at `elf_path`.
///
/// Lookup order:
/// 1. If the ELF itself has a non-empty `.debug_info`, parse it directly.
/// 2. Otherwise derive external-debuginfo candidates from `.gnu_debuglink`
///    and/or the GNU build-id (see `candidate_debuginfo_paths`). Try them in
///    order, skipping unreadable files and, when a debuglink is present,
///    files whose CRC32 does not match it.
///
/// The FIRST candidate that is readable and CRC-acceptable decides the
/// result: a DWARF error from it is returned as-is, and later candidates are
/// not tried.
///
/// # Errors
/// The file cannot be read or parsed, there is no pointer to external
/// debuginfo, no candidate was usable (the message lists every attempt), or
/// DWARF resolution fails.
pub(crate) fn resolve_field_offsets(elf_path: &Path) -> Result<CounterOffsets> {
let data = fs::read(elf_path)
.with_context(|| format!("re-read {} for DWARF inspection", elf_path.display()))?;
let elf = Elf::parse(&data).with_context(|| format!("parse ELF {}", elf_path.display()))?;
// Inline debuginfo takes precedence over any external file.
if section_is_populated(&elf, &data, ".debug_info") {
return resolve_field_offsets_from_bytes(&data, elf_path);
}
let debuglink = read_gnu_debuglink(&elf, &data);
let build_id = read_build_id(&elf, &data);
let debuglink_name = debuglink.as_ref().map(|(n, _)| n.as_str());
let build_id_hex = build_id.as_deref();
let candidates = candidate_debuginfo_paths(elf_path, debuglink_name, build_id_hex);
if candidates.is_empty() {
anyhow::bail!(
"{} has no populated .debug_info and carries neither a \
.gnu_debuglink section nor an NT_GNU_BUILD_ID note — there \
is no pointer to external debuginfo. Rebuild the target \
with `-g`, ship a paired `.debug` file, or install the \
distro's -dbg / -debuginfo package.",
elf_path.display(),
);
}
// Per-candidate failure reasons, reported if nothing succeeds.
let mut tried: Vec<String> = Vec::new();
for candidate in &candidates {
let debug_data = match fs::read(candidate) {
Ok(d) => d,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
tried.push(format!("{} (not found)", candidate.display()));
continue;
}
Err(e) => {
tried.push(format!("{}: {e}", candidate.display()));
continue;
}
};
// The CRC is checked only when a debuglink supplied one; candidates
// reached purely through the build-id path are accepted as-is if no
// debuglink exists.
if let Some((_, expected_crc)) = debuglink.as_ref() {
let actual = crc32fast::hash(&debug_data);
if actual != *expected_crc {
tried.push(format!(
"{} (CRC mismatch: expected {:#010x}, got {:#010x})",
candidate.display(),
expected_crc,
actual,
));
continue;
}
}
return resolve_field_offsets_from_bytes(&debug_data, candidate);
}
anyhow::bail!(
"{} is stripped; searched for external debuginfo via \
debuglink={debuglink_name:?} build_id={build_id_hex:?} but \
no candidate was readable or CRC-matched. Tried: {}",
elf_path.display(),
tried.join("; "),
);
}
/// Walk the DWARF in `data` (an ELF image read from `source_path`) and resolve
/// the `tsd_s` counter member offsets.
///
/// DWARF sections are sliced directly out of the file image:
/// - an absent section, or one without a file range, is handed to gimli as empty;
/// - a section whose header points past the end of `data` (truncated or corrupt
///   file) is reported as an error instead of panicking on an out-of-bounds slice.
///
/// Units are scanned in order. Each counter keeps the first offset found, and
/// the walk stops once both are known.
///
/// # Errors
/// ELF/DWARF parse failures, an out-of-bounds section, either counter missing,
/// or the pair failing `CounterOffsets::new` validation.
fn resolve_field_offsets_from_bytes(data: &[u8], source_path: &Path) -> Result<CounterOffsets> {
    let elf = Elf::parse(data).with_context(|| format!("parse ELF {}", source_path.display()))?;
    let load_section = |id: gimli::SectionId| -> Result<Cow<'_, [u8]>> {
        let name = id.name();
        for sh in &elf.section_headers {
            if let Some(section_name) = elf.shdr_strtab.get_at(sh.sh_name)
                && section_name == name
            {
                let range = sh.file_range().unwrap_or(0..0);
                // Checked slicing: section headers come from an untrusted file.
                let bytes = data.get(range.clone()).ok_or_else(|| {
                    anyhow!(
                        "section {name} file range {range:?} extends past the end of {} \
                         ({} bytes) — file is truncated or its section headers are corrupt",
                        source_path.display(),
                        data.len(),
                    )
                })?;
                return Ok(Cow::Borrowed(bytes));
            }
        }
        Ok(Cow::Borrowed(&[]))
    };
    let dwarf_sections = gimli::DwarfSections::load(load_section)?;
    let dwarf = dwarf_sections.borrow(|bytes| EndianSlice::new(bytes, LittleEndian));
    let mut allocated: Option<u64> = None;
    let mut deallocated: Option<u64> = None;
    let mut units = dwarf.units();
    while let Some(header) = units.next()? {
        let unit = dwarf.unit(header)?;
        if let Some((a, d)) = struct_member_offsets_in_unit(&dwarf, &unit)? {
            // First unit to report a counter wins; later units cannot override it.
            if let Some(v) = a {
                allocated.get_or_insert(v);
            }
            if let Some(v) = d {
                deallocated.get_or_insert(v);
            }
            if allocated.is_some() && deallocated.is_some() {
                break;
            }
        }
    }
    let allocated = allocated.ok_or_else(|| {
        anyhow!(
            "DWARF walk of {} did not find field '{}' in struct '{}' — \
             target was built without -g, the jemalloc version renamed the field, \
             or the TSD_MANGLE prefix ('{}') drifted",
            source_path.display(),
            ALLOCATED_FIELD,
            TSD_STRUCT_NAME,
            TSD_MANGLE_PREFIX,
        )
    })?;
    let deallocated = deallocated.ok_or_else(|| {
        anyhow!(
            "DWARF walk of {} did not find field '{}' in struct '{}'",
            source_path.display(),
            DEALLOCATED_FIELD,
            TSD_STRUCT_NAME,
        )
    })?;
    CounterOffsets::new(allocated, deallocated)
}
fn section_is_populated(elf: &Elf, data: &[u8], name: &str) -> bool {
for sh in &elf.section_headers {
if let Some(n) = elf.shdr_strtab.get_at(sh.sh_name)
&& n == name
{
let range = sh.file_range().unwrap_or(0..0);
let len = data.get(range).map(<[u8]>::len).unwrap_or(0);
return len > 0;
}
}
false
}
/// Parse the `.gnu_debuglink` section. Its layout is a NUL-terminated debug-file
/// name, zero padding up to a 4-byte boundary, then a 4-byte CRC32 of that file.
///
/// Returns `(file_name, crc)`. Returns `None` if:
/// - the section is missing;
/// - the name is unterminated or not UTF-8;
/// - the CRC would lie past the end of the section.
///
/// The CRC is decoded little-endian, which assumes a little-endian target.
fn read_gnu_debuglink(elf: &Elf, data: &[u8]) -> Option<(String, u32)> {
    let section = find_section_slice(elf, data, ".gnu_debuglink")?;
    let nul = section.iter().position(|&b| b == 0)?;
    let name = std::str::from_utf8(&section[..nul]).ok()?.to_owned();
    // The CRC starts at the first 4-byte boundary after the terminating NUL.
    let crc_offset = (nul + 1).next_multiple_of(4);
    let crc_bytes: [u8; 4] = section.get(crc_offset..crc_offset + 4)?.try_into().ok()?;
    Some((name, u32::from_le_bytes(crc_bytes)))
}
/// Return the ELF's GNU build-id as lowercase hex. Returns `None` when no
/// `SHT_NOTE` section carries an `NT_GNU_BUILD_ID` note owned by "GNU".
///
/// The second argument of goblin's `iter_note_sections` filters by *section*
/// name (e.g. `.note.gnu.build-id`), not by note owner. The previous
/// `Some("GNU")` therefore matched no section and the lookup always failed.
/// All note sections are now scanned and the owner is checked per note.
fn read_build_id(elf: &Elf, data: &[u8]) -> Option<String> {
    use std::fmt::Write as _;
    let notes = elf.iter_note_sections(data, None)?;
    let build_id = notes.flatten().find(|note| {
        // Tolerate an owner name reported with or without its trailing NUL.
        note.name.trim_end_matches('\0') == "GNU"
            && note.n_type == goblin::elf::note::NT_GNU_BUILD_ID
    })?;
    let mut hex = String::with_capacity(build_id.desc.len() * 2);
    for b in build_id.desc {
        let _ = write!(hex, "{b:02x}");
    }
    Some(hex)
}
/// List locations where external debuginfo for `target_path` may live, in
/// search order:
/// 1. `/usr/lib/debug/.build-id/<first 2 hex>/<rest>.debug` (needs a build-id
///    of at least 2 hex characters);
/// 2. `<dir>/<debuglink>`;
/// 3. `<dir>/.debug/<debuglink>`;
/// 4. `/usr/lib/debug/<dir>/<debuglink>` (only when `<dir>` is absolute).
///
/// Entries 2-4 need both a debuglink name and a parent directory. No path is
/// checked for existence here.
fn candidate_debuginfo_paths(
    target_path: &Path,
    debuglink_name: Option<&str>,
    build_id_hex: Option<&str>,
) -> Vec<PathBuf> {
    let mut candidates = Vec::new();

    if let Some(hex) = build_id_hex.filter(|h| h.len() >= 2) {
        let (prefix, rest) = hex.split_at(2);
        candidates.push(PathBuf::from(format!(
            "/usr/lib/debug/.build-id/{prefix}/{rest}.debug"
        )));
    }

    let (Some(link), Some(dir)) = (debuglink_name, target_path.parent()) else {
        return candidates;
    };
    candidates.push(dir.join(link));
    candidates.push(dir.join(".debug").join(link));
    if dir.is_absolute() {
        let relative = dir.strip_prefix("/").unwrap_or(dir);
        candidates.push(Path::new("/usr/lib/debug").join(relative).join(link));
    }
    candidates
}
fn find_section_slice<'a>(elf: &Elf, data: &'a [u8], name: &str) -> Option<&'a [u8]> {
for sh in &elf.section_headers {
if let Some(n) = elf.shdr_strtab.get_at(sh.sh_name)
&& n == name
{
let range = sh.file_range().unwrap_or(0..0);
return data.get(range);
}
}
None
}
#[allow(clippy::type_complexity)]
fn struct_member_offsets_in_unit<R: Reader>(
dwarf: &gimli::Dwarf<R>,
unit: &Unit<R>,
) -> Result<Option<(Option<u64>, Option<u64>)>> {
let mut entries = unit.entries();
while let Some((_, entry)) = entries.next_dfs()? {
if entry.tag() != gimli::DW_TAG_structure_type {
continue;
}
let name = match entry.attr_value(gimli::DW_AT_name)? {
Some(v) => v,
None => continue,
};
let name_str = dwarf.attr_string(unit, name)?;
if name_str.to_slice()?.as_ref() != TSD_STRUCT_NAME.as_bytes() {
continue;
}
let mut allocated: Option<u64> = None;
let mut deallocated: Option<u64> = None;
let mut depth = 1;
while let Some((delta, child)) = entries.next_dfs()? {
depth += delta;
if depth <= 0 {
break;
}
if depth != 2 {
continue;
}
if child.tag() != gimli::DW_TAG_member {
continue;
}
let child_name = match child.attr_value(gimli::DW_AT_name)? {
Some(v) => v,
None => continue,
};
let child_name_str = dwarf.attr_string(unit, child_name)?;
let bytes = child_name_str.to_slice()?;
let as_str = bytes.as_ref();
let is_allocated = as_str == ALLOCATED_FIELD.as_bytes();
let is_deallocated = as_str == DEALLOCATED_FIELD.as_bytes();
if !is_allocated && !is_deallocated {
continue;
}
let offset = member_offset(child.attr_value(gimli::DW_AT_data_member_location)?)?;
if is_allocated && allocated.is_none() {
allocated = offset;
}
if is_deallocated && deallocated.is_none() {
deallocated = offset;
}
if allocated.is_some() && deallocated.is_some() {
return Ok(Some((allocated, deallocated)));
}
}
return Ok(Some((allocated, deallocated)));
}
Ok(None)
}
/// Decode a `DW_AT_data_member_location` value as a constant byte offset.
///
/// - An absent attribute yields `Ok(None)`.
/// - Unsigned constants, and non-negative signed constants, yield `Ok(Some(offset))`.
///
/// # Errors
/// Any other form (e.g. a DWARF location expression, or a negative constant).
fn member_offset<R: Reader>(attr: Option<AttributeValue<R>>) -> Result<Option<u64>> {
    let attr = match attr {
        Some(value) => value,
        None => return Ok(None),
    };
    let offset = match attr {
        AttributeValue::Udata(v) | AttributeValue::Data8(v) => v,
        AttributeValue::Data1(v) => u64::from(v),
        AttributeValue::Data2(v) => u64::from(v),
        AttributeValue::Data4(v) => u64::from(v),
        AttributeValue::Sdata(v) if v >= 0 => v as u64,
        other => {
            return Err(anyhow!(
                "unexpected DW_AT_data_member_location form: {:?} — \
                 DWARF expression forms are not supported for field-offset resolution in v1",
                other
            ));
        }
    };
    Ok(Some(offset))
}
#[allow(dead_code)] pub(crate) const TCB_SIZE_AARCH64: u64 = 16;
#[allow(dead_code)] pub(crate) fn compute_tls_address_variant_ii(
fs_base: u64,
tls_image_aligned_size: u64,
st_value: u64,
field_offset: u64,
) -> Result<u64> {
let image_base = fs_base.checked_sub(tls_image_aligned_size).ok_or_else(|| {
anyhow!(
"fs_base ({fs_base:#x}) is below the aligned TLS image size \
({tls_image_aligned_size:#x}); target likely has no static \
TLS initialized yet"
)
})?;
image_base
.checked_add(st_value)
.and_then(|v| v.checked_add(field_offset))
.ok_or_else(|| anyhow!("TLS address arithmetic overflow"))
}
#[allow(dead_code)] pub(crate) fn compute_tls_address_variant_i(
tpidr_el0: u64,
p_align: u64,
st_value: u64,
field_offset: u64,
) -> Result<u64> {
let image_offset = round_up_pow2(TCB_SIZE_AARCH64, p_align).ok_or_else(|| {
anyhow!(
"TLS image offset overflow: tcb={} align={p_align:#x}",
TCB_SIZE_AARCH64,
)
})?;
tpidr_el0
.checked_add(image_offset)
.and_then(|v| v.checked_add(st_value))
.and_then(|v| v.checked_add(field_offset))
.ok_or_else(|| anyhow!("TLS address arithmetic overflow"))
}
/// Compute the address of a TLS field for the build's architecture.
///
/// x86_64 uses TLS Variant II and needs only the aligned image size;
/// `p_align` is unused.
#[cfg(target_arch = "x86_64")]
pub(crate) fn compute_tls_address(
tp: u64,
tls_image_aligned_size: u64,
_p_align: u64,
st_value: u64,
field_offset: u64,
) -> Result<u64> {
compute_tls_address_variant_ii(tp, tls_image_aligned_size, st_value, field_offset)
}
/// Compute the address of a TLS field for the build's architecture.
///
/// aarch64 uses TLS Variant I and needs only the segment alignment; the
/// aligned image size is unused.
#[cfg(target_arch = "aarch64")]
pub(crate) fn compute_tls_address(
tp: u64,
_tls_image_aligned_size: u64,
p_align: u64,
st_value: u64,
field_offset: u64,
) -> Result<u64> {
compute_tls_address_variant_i(tp, p_align, st_value, field_offset)
}
/// List the thread ids under `/proc/<pid>/task`, sorted ascending.
///
/// Entries that fail to read, or whose names are not integers, are skipped
/// silently.
///
/// # Errors
/// `/proc/<pid>/task` itself cannot be opened.
pub(crate) fn iter_task_ids(pid: i32) -> Result<Vec<i32>> {
    let path = format!("/proc/{pid}/task");
    let mut tids = Vec::new();
    for dirent in fs::read_dir(&path).with_context(|| format!("read_dir {path}"))? {
        let Ok(dirent) = dirent else { continue };
        if let Some(tid) = dirent.file_name().to_str().and_then(|s| s.parse::<i32>().ok()) {
            tids.push(tid);
        }
    }
    tids.sort_unstable();
    Ok(tids)
}
/// Find the ELF in the target's executable mappings that carries jemalloc's
/// TSD TLS symbol, then resolve the counter offsets from its DWARF.
///
/// Mappings are visited in `/proc/<pid>/maps` order, each distinct path once.
/// Files that cannot be read or parsed as ELF are skipped silently.
///
/// The FIRST ELF that yields a symbol decides the outcome:
/// - if it is not the main executable (`/proc/<pid>/exe`), the result is
///   `JemallocInDso`;
/// - if its `e_machine` differs from the probe's, the result is `ArchMismatch`;
/// - otherwise its DWARF is resolved.
///
/// NOTE(review): a DSO carrying the symbol that is mapped before the executable
/// would produce `JemallocInDso` even if the executable also carries it —
/// confirm this ordering is acceptable.
///
/// # Errors
/// A `FatalError` tagged with the failing stage. `JemallocNotFound` includes
/// the last per-ELF `find_tsd_tls` error as context.
pub(crate) fn find_jemalloc_via_maps(
pid: i32,
) -> std::result::Result<(TsdTlsSymbol, CounterOffsets), FatalError> {
let exe_link = format!("/proc/{pid}/exe");
let exe_path = fs::read_link(&exe_link).map_err(|e| {
FatalError::readlink_failure(anyhow::Error::from(e).context(format!(
"readlink {exe_link} (need it to gate static-TLS match)"
)))
})?;
let maps_path = format!("/proc/{pid}/maps");
let contents = fs::read_to_string(&maps_path).map_err(|e| {
FatalError::maps_read_failure(anyhow::Error::from(e).context(format!("read {maps_path}")))
})?;
// A file can back several executable mappings; inspect each path only once.
let mut seen: BTreeSet<PathBuf> = BTreeSet::new();
let mut last_symbol_err: Option<anyhow::Error> = None;
for line in contents.lines() {
let Some(path) = parse_maps_elf_path(line) else {
continue;
};
if !seen.insert(path.clone()) {
continue;
}
let data = match fs::read(&path) {
Ok(d) => d,
Err(_) => continue,
};
let elf = match Elf::parse(&data) {
Ok(e) => e,
Err(_) => continue,
};
let symbol = match find_tsd_tls(&elf, &path) {
Ok(s) => s,
Err(e) => {
last_symbol_err = Some(e);
continue;
}
};
// Static-TLS address math is only valid for the main executable's TLS block.
if path != exe_path {
return Err(FatalError::jemalloc_in_dso(anyhow!(
"jemalloc TLS symbol found in {} but static-TLS probe requires \
the match be in the main executable ({}); dynamic-TLS lookups \
in shared objects are not supported in v1. Remediation: relink \
the target to embed jemalloc statically (e.g. build against \
tikv-jemallocator-sys rather than a system libjemalloc.so), or \
wait for a future DTV-walking probe variant.",
path.display(),
exe_path.display(),
)));
}
if symbol.e_machine != arch::EXPECTED_E_MACHINE {
return Err(FatalError::arch_mismatch(anyhow!(
"probe is {}-only; target ELF {} is {} (e_machine={:#x}). \
Obtain or build a probe matching the target's architecture \
(ptrace is same-arch only — the probe and its target must \
share the same machine type).",
arch::ARCH_NAME,
symbol.elf_path.display(),
e_machine_name(symbol.e_machine),
symbol.e_machine,
)));
}
let offsets = resolve_field_offsets(&path).map_err(FatalError::dwarf_parse_failure)?;
return Ok((symbol, offsets));
}
let context = last_symbol_err
.map(|e| format!(" — last per-ELF error: {e}"))
.unwrap_or_default();
Err(FatalError::jemalloc_not_found(anyhow!(
"jemalloc TLS symbol ({}) not found in any r-x mapping under {}. \
Remediation: confirm the target is linked against a supported \
jemalloc build (tikv-jemallocator-sys, or a je_/unprefixed \
libjemalloc), rebuild against one of the accepted name prefixes, \
or extend TSD_TLS_SYMBOL_NAMES if you are carrying a new \
prefix.{}",
TSD_TLS_SYMBOL_NAMES.join(" / "),
maps_path,
context,
)))
}
pub(crate) fn e_machine_name(e_machine: u16) -> &'static str {
use goblin::elf::header::{EM_386, EM_AARCH64, EM_PPC64, EM_RISCV, EM_S390, EM_X86_64};
match e_machine {
EM_X86_64 => "x86_64",
EM_AARCH64 => "aarch64",
EM_386 => "i386",
EM_RISCV => "riscv",
EM_PPC64 => "ppc64",
EM_S390 => "s390",
_ => "unknown",
}
}
/// Extract the backing file path from one `/proc/<pid>/maps` line. Only
/// executable mappings backed by an absolute path are returned.
///
/// The first five columns (range, perms, offset, dev, inode) are separated by
/// single spaces. After them comes optional padding, then the pathname
/// verbatim to the end of the line. The pathname is therefore taken as the rest
/// of the line, not the next whitespace-delimited token. This keeps a path
/// containing spaces intact, including the kernel's ` (deleted)` suffix for
/// unlinked files. Truncating it could point at a different file that now
/// exists under the shortened name.
///
/// Returns `None` for non-executable mappings, anonymous mappings,
/// pseudo-paths such as `[vdso]`, and malformed lines.
fn parse_maps_elf_path(line: &str) -> Option<PathBuf> {
    let mut columns = line.splitn(6, ' ');
    let _range = columns.next()?;
    let perms = columns.next()?;
    if !perms.contains('x') {
        return None;
    }
    let _offset = columns.next()?;
    let _dev = columns.next()?;
    let _inode = columns.next()?;
    // Remainder of the line: alignment padding followed by the pathname.
    let path = columns.next()?.trim_start();
    if !path.starts_with('/') {
        return None;
    }
    Some(PathBuf::from(path))
}
/// Raw values of one thread's jemalloc counters, as read from its TSD.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct ThreadCounters {
/// Value of `thread_allocated` (bytes, per the field naming).
pub allocated_bytes: u64,
/// Value of `thread_deallocated` (bytes, per the field naming).
pub deallocated_bytes: u64,
}
/// Tids currently ptrace-attached by this process. `ScopeDetach` inserts and
/// removes entries; `detach_all_attached` drains whatever is left.
static ATTACHED: OnceLock<Mutex<BTreeSet<i32>>> = OnceLock::new();
/// Set by the SIGINT/SIGTERM handler; polled by the snapshot and sleep loops
/// to stop early.
static CLEANUP_REQUESTED: AtomicBool = AtomicBool::new(false);
/// Lazily initialised accessor for `ATTACHED`.
fn attached() -> &'static Mutex<BTreeSet<i32>> {
ATTACHED.get_or_init(|| Mutex::new(BTreeSet::new()))
}
/// Lock `ATTACHED`, recovering the guard if a previous holder panicked.
/// Cleanup must still be able to see the tid set after a panic.
fn attached_lock() -> std::sync::MutexGuard<'static, BTreeSet<i32>> {
attached().lock().unwrap_or_else(|e| e.into_inner())
}
/// SIGINT/SIGTERM handler: only stores to an atomic, which is
/// async-signal-safe. The main loops notice the flag and detach/stop.
extern "C" fn on_sigint(_sig: i32) {
CLEANUP_REQUESTED.store(true, Ordering::SeqCst);
}
/// SIGALRM handler that does nothing on purpose. Catching the signal replaces
/// the default action (process termination). The handler is installed without
/// SA_RESTART, so a blocking syscall in progress presumably returns EINTR
/// instead. NOTE(review): confirm the alarm's arming site (outside this view).
extern "C" fn on_sigalrm(_sig: i32) {
}
/// Install the cleanup (SIGINT/SIGTERM) and SIGALRM handlers. Installation
/// failures are ignored.
///
/// NOTE(review): `signal()` for SIGINT/SIGTERM uses libc semantics (glibc
/// installs with SA_RESTART), unlike the explicit no-SA_RESTART `sigaction` for
/// SIGALRM. A pending `waitpid` would be restarted rather than interrupted by
/// Ctrl-C — confirm this is intended.
fn install_cleanup_handler() {
for sig in [Signal::SIGINT, Signal::SIGTERM] {
// SAFETY: the handler only performs an atomic store, which is
// async-signal-safe.
unsafe {
let _ = signal(sig, SigHandler::Handler(on_sigint));
}
}
let action = SigAction::new(
SigHandler::Handler(on_sigalrm),
SaFlags::empty(),
SigSet::empty(),
);
// SAFETY: the handler body is empty, so it is trivially async-signal-safe.
unsafe {
let _ = sigaction(Signal::SIGALRM, &action);
}
}
/// Failure while probing a single thread: a machine-readable stage plus the
/// human-readable cause. It becomes a `ThreadResult::Err` entry.
struct ThreadProbeError {
kind: ThreadErrorKind,
source: anyhow::Error,
}
/// Constructors, one per failure site, so messages stay uniform.
impl ThreadProbeError {
fn new(kind: ThreadErrorKind, source: anyhow::Error) -> Self {
Self { kind, source }
}
/// `PTRACE_SEIZE` failed. EPERM gets an actionable permissions message.
fn ptrace_seize(tid: i32, e: nix::errno::Errno) -> Self {
let source = if e == nix::errno::Errno::EPERM {
anyhow!(
"ptrace(PTRACE_SEIZE) on tid {tid}: permission denied (EPERM). \
Grant access with one of: (1) run as root, (2) setcap \
cap_sys_ptrace+ep ktstr-jemalloc-probe, (3) run under the \
same uid as target, (4) set /proc/sys/kernel/yama/ptrace_scope=0 \
(requires root; affects system-wide ptrace policy)."
)
} else {
anyhow!("ptrace(PTRACE_SEIZE) on tid {tid}: {e}")
};
Self::new(ThreadErrorKind::PtraceSeize, source)
}
/// `PTRACE_INTERRUPT` failed after a successful seize.
fn ptrace_interrupt(tid: i32, e: nix::errno::Errno) -> Self {
Self::new(
ThreadErrorKind::PtraceInterrupt,
anyhow!("ptrace(PTRACE_INTERRUPT) on tid {tid}: {e}"),
)
}
/// `waitpid` returned, but not with a stop status.
fn waitpid_unexpected(tid: i32, status: WaitStatus) -> Self {
Self::new(
ThreadErrorKind::Waitpid,
anyhow!("waitpid on tid {tid} returned unexpected status: {status:?}"),
)
}
/// `waitpid` itself failed.
fn waitpid_err(tid: i32, e: nix::errno::Errno) -> Self {
Self::new(
ThreadErrorKind::Waitpid,
anyhow!("waitpid on tid {tid}: {e}"),
)
}
/// Reading the thread-pointer register set failed.
fn getregset(tid: i32, e: nix::errno::Errno) -> Self {
Self::new(
ThreadErrorKind::GetRegset,
anyhow!(
"ptrace(PTRACE_GETREGSET, {}) on tid {tid}: {e}",
arch::REGSET_NAME,
),
)
}
/// TLS address computation failed; `source` already carries the message.
fn tls_arithmetic(source: anyhow::Error) -> Self {
Self::new(ThreadErrorKind::TlsArithmetic, source)
}
/// `process_vm_readv` failed at `addr`.
fn process_vm_readv_err(tid: i32, addr: u64, e: nix::errno::Errno) -> Self {
Self::new(
ThreadErrorKind::ProcessVmReadv,
anyhow!("process_vm_readv on tid {tid} at {addr:#x}: {e}"),
)
}
/// `process_vm_readv` returned `n` bytes instead of `expected`.
fn process_vm_readv_short(tid: i32, n: usize, expected: u64) -> Self {
Self::new(
ThreadErrorKind::ProcessVmReadv,
anyhow!("short process_vm_readv on tid {tid}: got {n} bytes, expected {expected}"),
)
}
}
/// Read one thread's jemalloc counters.
///
/// - With `cached_thread_pointer = None`, the thread is seized, interrupted,
///   waited on until stopped, and its thread pointer is read via ptrace.
/// - With `Some(tp)`, ptrace is skipped entirely and `tp` is trusted.
///
/// The counters are then read with one `process_vm_readv` spanning both fields.
///
/// Returns the counters plus the thread pointer used, so the caller can cache it.
///
/// NOTE(review): on the cached path the thread keeps running. The two counters
/// are copied in one call but not atomically with respect to the thread's own
/// updates.
fn probe_single_thread(
tid: i32,
symbol: &TsdTlsSymbol,
offsets: &CounterOffsets,
cached_thread_pointer: Option<u64>,
) -> std::result::Result<(ThreadCounters, u64), ThreadProbeError> {
let pid = Pid::from_raw(tid);
// `_attached_guard` is a named binding (not `_`), so the `ScopeDetach` lives
// until this function returns. The thread is detached only after the read,
// or on any early `?` return below.
let (thread_pointer, _attached_guard) = match cached_thread_pointer {
Some(tp) => (tp, None),
None => {
ptrace::seize(pid, Options::empty())
.map_err(|e| ThreadProbeError::ptrace_seize(tid, e))?;
// Create the guard immediately after a successful seize so every later
// failure path detaches.
let guard = ScopeDetach(tid);
attached_lock().insert(tid);
ptrace::interrupt(pid).map_err(|e| ThreadProbeError::ptrace_interrupt(tid, e))?;
// A seized thread stopped by PTRACE_INTERRUPT may report either as a
// plain stop or as a ptrace event stop; both are accepted.
match waitpid(pid, None) {
Ok(WaitStatus::Stopped(_, _) | WaitStatus::PtraceEvent(_, _, _)) => {}
Ok(other) => return Err(ThreadProbeError::waitpid_unexpected(tid, other)),
Err(e) => return Err(ThreadProbeError::waitpid_err(tid, e)),
}
let tp = arch::read_thread_pointer_ptrace(pid)
.map_err(|e| ThreadProbeError::getregset(tid, e))?;
(tp, Some(guard))
}
};
// Address of `thread_allocated`; the read below starts there and extends
// through `thread_deallocated`.
let addr = compute_tls_address(
thread_pointer,
symbol.tls_image_aligned_size,
symbol.p_align,
symbol.st_value,
offsets.thread_allocated,
)
.map_err(ThreadProbeError::tls_arithmetic)?;
let span = offsets.combined_read_span();
debug_assert!(
addr % 8 == 0,
"process_vm_readv remote base must be 8-byte aligned (jemalloc \
tsd_s.thread_allocated is a u64); got addr={addr:#x}",
);
let mut buf = vec![0u8; span as usize];
let remote = RemoteIoVec {
base: addr as usize,
len: span as usize,
};
let mut local = [IoSliceMut::new(&mut buf)];
let n = process_vm_readv(pid, &mut local, &[remote])
.map_err(|e| ThreadProbeError::process_vm_readv_err(tid, addr, e))?;
// A partial read would leave trailing zeros that look like real counters.
if n != span as usize {
return Err(ThreadProbeError::process_vm_readv_short(tid, n, span));
}
// Both counters are decoded little-endian from their offsets within the
// buffer (thread_allocated at 0).
let allocated = u64::from_le_bytes(buf[0..8].try_into().unwrap());
let dealloc_offset = (offsets.thread_deallocated - offsets.thread_allocated) as usize;
let deallocated =
u64::from_le_bytes(buf[dealloc_offset..dealloc_offset + 8].try_into().unwrap());
Ok((
ThreadCounters {
allocated_bytes: allocated,
deallocated_bytes: deallocated,
},
thread_pointer,
))
}
/// Read the thread name from `/proc/<pid>/task/<tid>/comm`, with surrounding
/// whitespace trimmed. Returns `None` when the file is unreadable or the name
/// is empty after trimming.
fn read_thread_comm(pid: i32, tid: i32) -> Option<String> {
    let contents = fs::read_to_string(format!("/proc/{pid}/task/{tid}/comm")).ok()?;
    Some(contents.trim())
        .filter(|name| !name.is_empty())
        .map(str::to_string)
}
/// Thread start time from `/proc/<pid>/task/<tid>/stat` (field 22,
/// `starttime`). Returns `None` when the file is unreadable or malformed.
fn read_thread_start_time(pid: i32, tid: i32) -> Option<u64> {
    fs::read_to_string(format!("/proc/{pid}/task/{tid}/stat"))
        .ok()
        .and_then(|raw| parse_start_time_from_stat(&raw))
}
/// Extract field 22 (`starttime`) from the first line of a `stat` file.
///
/// Parsing starts after the *last* `)`, because field 2 (`comm`) is
/// parenthesised and may itself contain spaces or `)`. The first token after
/// it is field 3, so `starttime` is the 20th token.
fn parse_start_time_from_stat(raw: &str) -> Option<u64> {
    let first_line = raw.lines().next()?;
    let after_comm = first_line.get(first_line.rfind(')')? + 1..)?;
    after_comm
        .split_ascii_whitespace()
        .nth(19)?
        .parse::<u64>()
        .ok()
}
/// Drop guard for one ptrace-attached tid. On drop it detaches (errors ignored)
/// and removes the tid from the `ATTACHED` set.
struct ScopeDetach(i32);
impl Drop for ScopeDetach {
fn drop(&mut self) {
let pid = Pid::from_raw(self.0);
// Best-effort: the thread may already have exited.
let _ = ptrace::detach(pid, None);
attached_lock().remove(&self.0);
}
}
/// Detach from every tid still recorded as attached and clear each record.
///
/// The tid list is copied out first, so the lock is not held across the
/// ptrace calls. Detach failures are ignored (best-effort cleanup).
fn detach_all_attached() {
    let pending: Vec<i32> = {
        let set = attached_lock();
        set.iter().copied().collect()
    };
    for tid in pending {
        let _ = ptrace::detach(Pid::from_raw(tid), None);
        attached_lock().remove(&tid);
    }
}
/// Overall result of a probe run.
/// NOTE(review): how each variant maps to exit status is outside this view.
enum RunOutcome {
/// At least one thread was read successfully.
Ok(ProbeOutput),
/// Output was produced, but no thread yielded counters (see `all_failed`).
AllFailed(ProbeOutput),
/// The run could not proceed at all.
Fatal(FatalError),
}
/// Category of a run-aborting failure. `tag()` gives its stable kebab-case label.
#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::EnumIter)]
enum FatalKind {
PidMissing,
ExeIdentityChanged,
JemallocNotFound,
JemallocInDso,
ReadlinkFailure,
MapsReadFailure,
DwarfParseFailure,
ArchMismatch,
SelfProbeRejected,
TidEnumerationFailure,
Other,
}
impl FatalKind {
/// Kebab-case label for this kind. NOTE(review): presumably surfaced in
/// diagnostics / machine-readable output — confirm at the call site before
/// renaming any label.
fn tag(self) -> &'static str {
match self {
Self::PidMissing => "pid-missing",
Self::ExeIdentityChanged => "exe-identity-changed",
Self::JemallocNotFound => "jemalloc-not-found",
Self::JemallocInDso => "jemalloc-in-dso",
Self::ReadlinkFailure => "readlink-failure",
Self::MapsReadFailure => "maps-read-failure",
Self::DwarfParseFailure => "dwarf-parse-failure",
Self::ArchMismatch => "arch-mismatch",
Self::SelfProbeRejected => "self-probe-rejected",
Self::TidEnumerationFailure => "tid-enumeration-failure",
Self::Other => "other",
}
}
}
/// Run-aborting failure: a category plus the underlying error chain.
struct FatalError {
kind: FatalKind,
error: anyhow::Error,
}
/// One constructor per `FatalKind`, so call sites read as the failure they report.
impl FatalError {
fn new(kind: FatalKind, error: anyhow::Error) -> Self {
Self { kind, error }
}
fn pid_missing(error: anyhow::Error) -> Self {
Self::new(FatalKind::PidMissing, error)
}
fn exe_identity_changed(error: anyhow::Error) -> Self {
Self::new(FatalKind::ExeIdentityChanged, error)
}
fn jemalloc_not_found(error: anyhow::Error) -> Self {
Self::new(FatalKind::JemallocNotFound, error)
}
fn jemalloc_in_dso(error: anyhow::Error) -> Self {
Self::new(FatalKind::JemallocInDso, error)
}
fn readlink_failure(error: anyhow::Error) -> Self {
Self::new(FatalKind::ReadlinkFailure, error)
}
fn maps_read_failure(error: anyhow::Error) -> Self {
Self::new(FatalKind::MapsReadFailure, error)
}
fn dwarf_parse_failure(error: anyhow::Error) -> Self {
Self::new(FatalKind::DwarfParseFailure, error)
}
fn arch_mismatch(error: anyhow::Error) -> Self {
Self::new(FatalKind::ArchMismatch, error)
}
fn self_probe_rejected(error: anyhow::Error) -> Self {
Self::new(FatalKind::SelfProbeRejected, error)
}
fn tid_enumeration_failure(error: anyhow::Error) -> Self {
Self::new(FatalKind::TidEnumerationFailure, error)
}
}
/// Granularity, in milliseconds, at which `sleep_with_cancel` re-checks the
/// cleanup flag.
const CANCEL_POLL_TICK_MS: u64 = 10;
/// Sleep for `total_ms`, waking every `CANCEL_POLL_TICK_MS` to check
/// `CLEANUP_REQUESTED`.
///
/// Returns `true` if cleanup was requested (possibly before any sleeping), or
/// `false` once the full duration has elapsed. If the deadline is not
/// representable, it collapses to "now".
fn sleep_with_cancel(total_ms: u64) -> bool {
    use std::time::{Duration, Instant};
    let tick = Duration::from_millis(CANCEL_POLL_TICK_MS);
    let begin = Instant::now();
    let deadline = begin
        .checked_add(Duration::from_millis(total_ms))
        .unwrap_or(begin);
    while !CLEANUP_REQUESTED.load(Ordering::SeqCst) {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return false;
        }
        std::thread::sleep(remaining.min(tick));
    }
    true
}
/// Probes every tid in `tids` once and collects the results into a [`Snapshot`].
///
/// `tp_cache` maps tid to the value `probe_single_thread` returned last time
/// (presumably the thread pointer / TLS base; confirm against
/// `probe_single_thread`). Entries for tids not in `tids` are evicted first.
/// Successful probes then refresh their entry.
///
/// If `CLEANUP_REQUESTED` becomes set mid-walk, `detach_all_attached()` is
/// called and the walk stops. The partial snapshot is returned together with
/// `true`; otherwise the flag is `false`.
fn take_snapshot(
    pid: i32,
    symbol: &TsdTlsSymbol,
    offsets: &CounterOffsets,
    tids: &[i32],
    run_start: std::time::Instant,
    tp_cache: &mut std::collections::HashMap<i32, u64>,
) -> (Snapshot, bool) {
    let timestamp_unix_sec = now_unix_sec();
    let elapsed_since_start_ns = run_start.elapsed().as_nanos() as u64;

    // Drop cached entries for threads that are no longer listed.
    let present: BTreeSet<i32> = tids.iter().copied().collect();
    tp_cache.retain(|tid, _| present.contains(tid));

    let mut results = Vec::with_capacity(tids.len());
    let mut cancelled = false;
    for &tid in tids {
        if CLEANUP_REQUESTED.load(Ordering::SeqCst) {
            detach_all_attached();
            cancelled = true;
            break;
        }
        let comm = read_thread_comm(pid, tid);
        let start_time_jiffies = read_thread_start_time(pid, tid);
        let outcome = probe_single_thread(tid, symbol, offsets, tp_cache.get(&tid).copied());
        let entry = match outcome {
            Ok((counters, tp)) => {
                tp_cache.insert(tid, tp);
                ThreadResult::Ok {
                    tid,
                    comm,
                    start_time_jiffies,
                    allocated_bytes: counters.allocated_bytes,
                    deallocated_bytes: counters.deallocated_bytes,
                }
            }
            Err(failure) => ThreadResult::Err {
                tid,
                comm,
                start_time_jiffies,
                error: format!("{:#}", failure.source),
                error_kind: failure.kind,
            },
        };
        results.push(entry);
    }

    let snapshot = Snapshot {
        timestamp_unix_sec,
        elapsed_since_start_ns,
        threads: results,
    };
    (snapshot, cancelled)
}
/// Reports whether a snapshot contains no successful thread probe.
///
/// An empty slice counts as all-failed, because there is nothing usable.
fn all_failed(threads: &[ThreadResult]) -> bool {
    !threads
        .iter()
        .any(|t| matches!(t, ThreadResult::Ok { .. }))
}
/// Re-stats `/proc/<pid>/exe` and checks it still matches `baseline`.
///
/// `context` is spliced into the mismatch message to say when the check ran.
/// A stat failure is propagated unchanged. A dev/ino mismatch produces an
/// error explaining that previously computed TLS offsets are no longer valid.
fn ensure_exe_identity_unchanged(
    pid: i32,
    baseline: &ExeIdentity,
    context: &'static str,
) -> std::result::Result<(), anyhow::Error> {
    let current = ExeIdentity::capture(pid)?;
    if current == *baseline {
        return Ok(());
    }
    Err(anyhow!(
        "target pid {pid} /proc/<pid>/exe changed {context} \
         (captured dev={:#x} ino={}, now dev={:#x} ino={}); \
         target execve'd or pid recycled, TLS offsets invalid",
        baseline.dev,
        baseline.ino,
        current.dev,
        current.ino,
    ))
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ExeIdentity {
dev: u64,
ino: u64,
}
impl ExeIdentity {
fn capture(pid: i32) -> Result<Self> {
use std::os::unix::fs::MetadataExt;
let path = format!("/proc/{pid}/exe");
let md = fs::metadata(&path).with_context(|| format!("stat {path}"))?;
Ok(Self {
dev: md.dev(),
ino: md.ino(),
})
}
}
fn run(cli: &Cli) -> RunOutcome {
let started_at_unix_sec = now_unix_sec();
let run_start = std::time::Instant::now();
let pid = cli.pid;
let self_pid = self_pid();
if pid == self_pid {
return RunOutcome::Fatal(FatalError::self_probe_rejected(anyhow!(
"refusing to probe self (pid {pid} == ktstr-jemalloc-probe's own pid). \
ptrace(PTRACE_SEIZE) rejects self-attach — a process cannot trace \
itself. Run the probe from a separate process against the target's pid."
)));
}
if !Path::new(&format!("/proc/{pid}")).exists() {
return RunOutcome::Fatal(FatalError::pid_missing(anyhow!("pid {pid} does not exist")));
}
let exe_identity = match ExeIdentity::capture(pid) {
Ok(v) => v,
Err(e) => return RunOutcome::Fatal(FatalError::pid_missing(e)),
};
let (symbol, offsets) = match find_jemalloc_via_maps(pid) {
Ok(v) => v,
Err(e) => return RunOutcome::Fatal(e),
};
if let Err(e) = ensure_exe_identity_unchanged(pid, &exe_identity, "during ELF/DWARF parse") {
return RunOutcome::Fatal(FatalError::exe_identity_changed(e));
}
let snapshot_count = cli.snapshots as usize;
let mut snapshots: Vec<Snapshot> = Vec::with_capacity(snapshot_count);
let mut interrupted = false;
let mut tp_cache: std::collections::HashMap<i32, u64> = std::collections::HashMap::new();
for i in 0..cli.snapshots {
if CLEANUP_REQUESTED.load(Ordering::SeqCst) {
interrupted = true;
break;
}
if i > 0
&& let Err(e) = ensure_exe_identity_unchanged(pid, &exe_identity, "between snapshots")
{
return RunOutcome::Fatal(FatalError::exe_identity_changed(e));
}
let tids = match iter_task_ids(pid) {
Ok(v) => v,
Err(e) => return RunOutcome::Fatal(FatalError::tid_enumeration_failure(e)),
};
let (snap, snap_interrupted) =
take_snapshot(pid, &symbol, &offsets, &tids, run_start, &mut tp_cache);
snapshots.push(snap);
if snap_interrupted {
interrupted = true;
break;
}
if i + 1 < cli.snapshots {
let interval_ms = cli
.interval_ms
.expect("interval_ms guaranteed Some for snapshots > 1 by validate_sampling_flags");
if sleep_with_cancel(interval_ms) {
interrupted = true;
break;
}
}
}
let out = ProbeOutput {
schema_version: SCHEMA_VERSION,
pid,
tool_version: env!("CARGO_PKG_VERSION"),
started_at_unix_sec,
interval_ms: cli.interval_ms,
interrupted,
snapshots,
};
if out.snapshots.is_empty() {
RunOutcome::Ok(out)
} else if multi_snapshot_all_failed(&out.snapshots) {
RunOutcome::AllFailed(out)
} else {
RunOutcome::Ok(out)
}
}
/// True when no snapshot contains a successful thread probe.
///
/// A single usable snapshot disqualifies. An empty slice is vacuously true.
fn multi_snapshot_all_failed(snapshots: &[Snapshot]) -> bool {
    !snapshots.iter().any(|snap| !all_failed(&snap.threads))
}
/// Prints one thread's result in text mode.
///
/// Successful probes go to stdout as `tid=… allocated_bytes=… deallocated_bytes=…`.
/// Failures go to stderr as a `warning:` line tagged with the error kind.
fn print_thread_result(t: &ThreadResult) {
    match t {
        ThreadResult::Ok {
            tid,
            comm,
            allocated_bytes,
            deallocated_bytes,
            ..
        } => println!(
            "tid={tid}{} allocated_bytes={allocated_bytes} deallocated_bytes={deallocated_bytes}",
            format_comm_suffix(comm.as_deref()),
        ),
        ThreadResult::Err {
            tid,
            comm,
            error,
            error_kind,
            ..
        } => eprintln!(
            "warning: tid {tid}{} [{error_kind}]: {error}",
            format_comm_suffix(comm.as_deref()),
        ),
    }
}
fn print_output(cli: &Cli, out: &ProbeOutput) -> Result<()> {
if cli.json {
let s = serde_json::to_string_pretty(out)?;
println!("{s}");
} else {
println!("pid={} tool_version={}", out.pid, out.tool_version);
let total = out.snapshots.len();
for (i, snap) in out.snapshots.iter().enumerate() {
println!(
"--- snapshot {n}/{total} @ {ts}s ---",
n = i + 1,
ts = snap.timestamp_unix_sec,
);
for t in &snap.threads {
print_thread_result(t);
}
}
}
Ok(())
}
/// Namespace prepended to every metric name this probe appends to a sidecar.
const SIDECAR_METRIC_PREFIX: &str = "jemalloc_probe";

/// Annotates per-thread byte counters with their unit and polarity.
///
/// Any metric whose name ends in `.allocated_bytes` or `.deallocated_bytes`
/// is marked `LowerBetter` with unit `"bytes"`. All other metrics are left
/// untouched.
fn apply_probe_metric_hints(m: &mut ktstr::test_support::Metric) {
    use ktstr::test_support::Polarity;
    let is_byte_counter = [".allocated_bytes", ".deallocated_bytes"]
        .iter()
        .any(|&suffix| m.name.ends_with(suffix));
    if !is_byte_counter {
        return;
    }
    m.polarity = Polarity::LowerBetter;
    m.unit = "bytes".to_string();
}
/// Converts a `ProbeOutput` into a `PayloadMetrics` entry for the sidecar.
///
/// Every JSON leaf of the serialized output becomes a metric, produced by
/// `walk_json_leaves`. Each name is prefixed with `SIDECAR_METRIC_PREFIX`
/// and passed through `apply_probe_metric_hints`.
fn synthesize_payload_metrics(
    out: &ProbeOutput,
    exit_code: i32,
    payload_index: usize,
) -> Result<ktstr::test_support::PayloadMetrics> {
    use ktstr::test_support::{MetricSource, MetricStream, PayloadMetrics, walk_json_leaves};
    let json = serde_json::to_value(out)
        .context("serialize ProbeOutput to serde_json::Value for sidecar append")?;
    let mut metrics = walk_json_leaves(&json, MetricSource::Json, MetricStream::Stdout);
    metrics.iter_mut().for_each(|metric| {
        metric.name = format!("{SIDECAR_METRIC_PREFIX}.{}", metric.name);
        apply_probe_metric_hints(metric);
    });
    Ok(PayloadMetrics {
        payload_index,
        metrics,
        exit_code,
    })
}
fn append_probe_output_to_sidecar(path: &Path, out: &ProbeOutput, exit_code: i32) -> Result<()> {
use ktstr::test_support::SidecarResult;
use rustix::fs::{FlockOperation, Mode, OFlags, flock, open};
let lock_path = path.with_extension({
let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
if ext.is_empty() {
"lock".to_string()
} else {
format!("{ext}.lock")
}
});
let lock_fd = open(
&lock_path,
OFlags::CREATE | OFlags::RDWR | OFlags::CLOEXEC,
Mode::from_raw_mode(0o600),
)
.with_context(|| format!("open lock file {}", lock_path.display()))?;
const FLOCK_BUDGET: std::time::Duration = std::time::Duration::from_secs(30);
const FLOCK_RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_millis(50);
let deadline = std::time::Instant::now() + FLOCK_BUDGET;
loop {
if CLEANUP_REQUESTED.load(Ordering::SeqCst) {
bail!(
"sidecar append aborted by probe deadline (SIGINT / SIGTERM / --abort-after-ms) \
while waiting on flock(LOCK_EX) on {}",
lock_path.display(),
);
}
match flock(&lock_fd, FlockOperation::NonBlockingLockExclusive) {
Ok(()) => break,
Err(rustix::io::Errno::WOULDBLOCK) if std::time::Instant::now() < deadline => {
std::thread::sleep(FLOCK_RETRY_INTERVAL);
continue;
}
Err(rustix::io::Errno::WOULDBLOCK) => bail!(
"flock(LOCK_EX) on {} timed out after {:?} — another \
--sidecar writer holds the lock. Run `lslocks | grep {}` \
or `fuser {}` to identify the flock holder; if it is a \
wedged probe, kill it and re-run. This bounded wait \
replaces the old unbounded LOCK_EX that could hang CI \
indefinitely.",
lock_path.display(),
FLOCK_BUDGET,
lock_path.display(),
lock_path.display(),
),
Err(e) => {
return Err(anyhow::Error::from(e).context(format!(
"flock(LOCK_EX, non-blocking) on {}",
lock_path.display(),
)));
}
}
}
let existing = match fs::read_to_string(path) {
Ok(s) => s,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => bail!(
"sidecar file not found at {}; run the test first to \
generate it, then re-invoke with --sidecar",
path.display(),
),
Err(e) => return Err(anyhow::Error::from(e).context(format!("read {}", path.display()))),
};
let mut sidecar: SidecarResult = serde_json::from_str(&existing).with_context(|| {
format!(
"parse {} as SidecarResult — sidecar may be from an incompatible \
schema version (pre-1.0 policy: regenerate, do not migrate)",
path.display(),
)
})?;
let payload_metrics = synthesize_payload_metrics(out, exit_code, sidecar.metrics.len())?;
sidecar.metrics.push(payload_metrics);
let serialized = serde_json::to_string_pretty(&sidecar)
.context("re-serialize SidecarResult after appending probe metrics")?;
let dir = path
.parent()
.ok_or_else(|| anyhow!("sidecar path {} has no parent directory", path.display()))?;
let mut tmp = tempfile::NamedTempFile::new_in(dir)
.with_context(|| format!("create staging file in {}", dir.display()))?;
std::io::Write::write_all(tmp.as_file_mut(), serialized.as_bytes())
.with_context(|| format!("write staging file in {}", dir.display()))?;
tmp.as_file()
.sync_all()
.with_context(|| format!("fsync staging file in {}", dir.display()))?;
tmp.persist(path)
.with_context(|| format!("atomic rename staging file into {}", path.display()))?;
let parent_dir = path.parent().unwrap_or(Path::new("."));
match std::fs::File::open(parent_dir) {
Ok(parent) => {
if let Err(e) = parent.sync_all() {
tracing::warn!(
dir = %parent_dir.display(),
err = %format!("{e:#}"),
"jemalloc_probe: parent-directory fsync failed after \
rename; the renamed sidecar is visible in the VFS but a \
concurrent crash could drop the directory-entry update \
from durable storage",
);
}
}
Err(e) => tracing::warn!(
dir = %parent_dir.display(),
err = %format!("{e:#}"),
"jemalloc_probe: could not open parent directory for fsync; \
the rename already committed but the directory entry has no \
explicit durability guarantee",
),
}
drop(lock_fd);
Ok(())
}
fn main() {
ktstr::cli::restore_sigpipe_default();
let _ = tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn")),
)
.with_writer(std::io::stderr)
.try_init();
install_cleanup_handler();
let cli = Cli::parse();
if let Err(e) = cli.validate_sampling_flags() {
eprintln!("error: {e:#}");
std::process::exit(2);
}
if let Some(path) = cli.sidecar.as_deref()
&& !path.exists()
{
eprintln!(
"error: sidecar file not found at {}; run the test \
first to generate it, then re-invoke with --sidecar",
path.display(),
);
std::process::exit(2);
}
if let Some(ms) = cli.abort_after_ms {
let main_tid = unsafe { libc::syscall(libc::SYS_gettid) } as libc::pid_t;
let main_pid = std::process::id() as libc::pid_t;
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(ms));
eprintln!("ktstr-probe-deadline: abort after {ms}ms");
CLEANUP_REQUESTED.store(true, Ordering::SeqCst);
unsafe {
libc::syscall(libc::SYS_tgkill, main_pid, main_tid, libc::SIGALRM);
}
});
}
match run(&cli) {
RunOutcome::Ok(out) => {
if let Err(e) = print_output(&cli, &out) {
eprintln!("error writing output: {e:#}");
std::process::exit(1);
}
if let Some(path) = cli.sidecar.as_deref()
&& let Err(e) = append_probe_output_to_sidecar(path, &out, 0)
{
eprintln!("sidecar append failed (exit 3): {}: {e:#}", path.display());
std::process::exit(3);
}
}
RunOutcome::AllFailed(out) => {
let is_multi = cli.snapshots > 1;
let marker = if is_multi { "multi" } else { "single" };
if let Err(e) = print_output(&cli, &out) {
eprintln!("error writing output: {e:#}");
}
if let Some(path) = cli.sidecar.as_deref()
&& let Err(e) = append_probe_output_to_sidecar(path, &out, 1)
{
eprintln!("error appending to sidecar {}: {e:#}", path.display());
}
eprintln!("ktstr-probe-all-failed: {marker}");
eprintln!(
"error: all threads failed probe{}",
if is_multi { " in every snapshot" } else { "" },
);
detach_all_attached();
std::process::exit(1);
}
RunOutcome::Fatal(fatal) => {
eprintln!("ktstr-probe-fatal: {}", fatal.kind.tag());
eprintln!("error: {:#}", fatal.error);
detach_all_attached();
std::process::exit(1);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn compute_tls_address_variant_ii_example() {
let fs_base = 0x7f12_3456_7000;
let aligned = 512; let st_value = 0x100; let field = 264; let addr = compute_tls_address_variant_ii(fs_base, aligned, st_value, field).unwrap();
assert_eq!(addr, 0x7f12_3456_7008);
}
#[test]
fn compute_tls_address_boundary_tp_equals_image_size() {
let addr =
compute_tls_address_variant_ii( 4096, 4096, 0, 0).unwrap();
assert_eq!(addr, 0);
}
#[test]
fn compute_tls_address_underflow_errors() {
let err = compute_tls_address_variant_ii(4096, 8192, 0, 0).unwrap_err();
assert!(
format!("{err}").contains("below the aligned TLS image size"),
"got: {err}",
);
}
#[test]
fn compute_tls_address_variant_i_example() {
let tpidr = 0x7f12_3456_7000;
let p_align = 16;
let st_value = 0x100;
let field = 264;
let addr = compute_tls_address_variant_i(tpidr, p_align, st_value, field).unwrap();
assert_eq!(addr, 0x7f12_3456_7218);
}
#[test]
fn compute_tls_address_variant_i_high_alignment() {
let addr = compute_tls_address_variant_i(0x1000, 64, 0, 0).unwrap();
assert_eq!(addr, 0x1040);
}
#[test]
fn compute_tls_address_variant_i_tcb_sized_alignment() {
let addr = compute_tls_address_variant_i(0x1000, TCB_SIZE_AARCH64, 0, 0).unwrap();
assert_eq!(addr, 0x1010);
}
#[test]
fn compute_tls_address_variant_i_sub_tcb_alignment() {
let addr = compute_tls_address_variant_i(0x1000, 8, 0, 0).unwrap();
assert_eq!(addr, 0x1010);
}
#[test]
fn compute_tls_address_variant_i_zero_align_clamped() {
let addr = compute_tls_address_variant_i(0x1000, 0, 0, 0).unwrap();
assert_eq!(addr, 0x1010);
}
#[test]
fn compute_tls_address_variant_i_overflow_errors() {
let err = compute_tls_address_variant_i(u64::MAX - 10, 16, 0x100, 0).unwrap_err();
assert!(
format!("{err}").contains("TLS address arithmetic overflow"),
"got: {err}",
);
}
#[test]
fn compute_tls_address_variant_i_image_offset_overflow_errors() {
let err = compute_tls_address_variant_i(0x1000, u64::MAX, 0, 0).unwrap_err();
assert!(
format!("{err}").contains("TLS image offset overflow"),
"expected image-offset overflow, got: {err}",
);
}
#[test]
fn compute_tls_address_dispatches_by_target_arch() {
let got = compute_tls_address(4096, 4096, 16, 0, 0).unwrap();
#[cfg(target_arch = "x86_64")]
assert_eq!(got, 0, "x86_64 must dispatch to Variant II");
#[cfg(target_arch = "aarch64")]
assert_eq!(got, 4112, "aarch64 must dispatch to Variant I");
}
#[test]
fn compute_tls_address_dispatches_positionally_distinct() {
let got = compute_tls_address(13_000_009, 1009, 64, 307, 83).unwrap();
#[cfg(target_arch = "x86_64")]
assert_eq!(got, 12_999_390, "x86_64 Variant II formula");
#[cfg(target_arch = "aarch64")]
assert_eq!(got, 13_000_463, "aarch64 Variant I formula");
}
#[test]
fn extract_pt_tls_layout_on_real_elf() {
let exe = std::env::current_exe().expect("current_exe");
let data = std::fs::read(&exe).expect("read current_exe");
let elf = goblin::elf::Elf::parse(&data).expect("parse current_exe");
let (rounded, align) =
extract_pt_tls_layout(&elf).expect("probe test binary must carry PT_TLS");
assert!(
align.is_power_of_two(),
"p_align {align} must be a power of two",
);
assert!(
rounded >= align,
"aligned_size {rounded} must be >= align {align}"
);
assert!(
rounded % align == 0,
"aligned_size {rounded} must be a multiple of align {align}",
);
}
#[test]
fn counter_offsets_combined_span_covers_both() {
let o = CounterOffsets::new(264, 280).unwrap();
let span = o.combined_read_span();
assert_eq!(span, 24, "8 (allocated) + 8 (fast_event) + 8 (deallocated)");
}
#[test]
fn counter_offsets_combined_span_adjacent() {
let o = CounterOffsets::new(100, 108).unwrap();
let span = o.combined_read_span();
assert_eq!(span, 16);
}
#[test]
fn counter_offsets_reject_reversed_order() {
let err = CounterOffsets::new(280, 264).unwrap_err();
assert!(
format!("{err}").contains("unexpected tsd_s layout"),
"got: {err}",
);
}
#[test]
fn counter_offsets_reject_equal_offsets() {
assert!(CounterOffsets::new(100, 100).is_err());
}
#[test]
fn e_machine_name_common_arches() {
use goblin::elf::header::{EM_386, EM_AARCH64, EM_X86_64};
assert_eq!(e_machine_name(EM_X86_64), "x86_64");
assert_eq!(e_machine_name(EM_AARCH64), "aarch64");
assert_eq!(e_machine_name(EM_386), "i386");
assert_eq!(e_machine_name(0xbeef), "unknown");
}
#[test]
fn parse_maps_elf_path_accepts_rx_only() {
let line = "5580e0001000-5580e0002000 r-xp 00000000 fd:01 12345 /usr/bin/ktstr";
assert_eq!(
parse_maps_elf_path(line),
Some(PathBuf::from("/usr/bin/ktstr"))
);
}
#[test]
fn parse_maps_elf_path_rejects_non_executable() {
let line = "5580e0002000-5580e0003000 rw-p 00001000 fd:01 12345 /usr/bin/ktstr";
assert!(parse_maps_elf_path(line).is_none());
}
#[test]
fn parse_maps_elf_path_rejects_anon_mapping() {
let line = "7f1234567000-7f1234568000 rw-p 00000000 00:00 0 ";
assert!(parse_maps_elf_path(line).is_none());
}
#[test]
fn parse_maps_elf_path_rejects_pseudo_paths() {
let line = "7ffc12345000-7ffc12367000 rw-p 00000000 00:00 0 [stack]";
assert!(parse_maps_elf_path(line).is_none());
}
#[test]
fn find_symbol_by_name_nothing_found() {
let tab: goblin::elf::Symtab<'_> = Default::default();
let strs = goblin::strtab::Strtab::default();
assert!(find_symbol_by_name(&tab, &strs, "tsd_tls").is_none());
}
#[test]
fn thread_result_json_shape() {
let ok = ThreadResult::Ok {
tid: 42,
comm: Some("worker-0".to_string()),
start_time_jiffies: None,
allocated_bytes: 1024,
deallocated_bytes: 512,
};
let ok_no_comm = ThreadResult::Ok {
tid: 44,
comm: None,
start_time_jiffies: None,
allocated_bytes: 2048,
deallocated_bytes: 1024,
};
let err = ThreadResult::Err {
tid: 43,
comm: None,
start_time_jiffies: None,
error: "thread exited before probe".to_string(),
error_kind: ThreadErrorKind::Waitpid,
};
let out = ProbeOutput {
schema_version: SCHEMA_VERSION,
pid: 100,
tool_version: "0.0.0",
started_at_unix_sec: 1_700_000_000,
interval_ms: None,
interrupted: false,
snapshots: vec![Snapshot {
timestamp_unix_sec: 1_700_000_000,
elapsed_since_start_ns: 0,
threads: vec![ok, ok_no_comm, err],
}],
};
let s = serde_json::to_string(&out).unwrap();
assert!(s.contains("\"schema_version\":2"));
assert!(s.contains("\"pid\":100"));
assert!(s.contains("\"tool_version\":\"0.0.0\""));
assert!(s.contains("\"started_at_unix_sec\":1700000000"));
assert!(s.contains("\"timestamp_unix_sec\":1700000000"));
assert!(s.contains("\"interrupted\":false"));
assert!(s.contains("\"snapshots\":["));
assert!(s.contains("\"allocated_bytes\":1024"));
assert!(s.contains("\"deallocated_bytes\":512"));
assert!(s.contains("\"allocated_bytes\":2048"));
assert!(s.contains("\"deallocated_bytes\":1024"));
assert!(s.contains("\"comm\":\"worker-0\""));
assert!(s.contains("\"error\":\"thread exited before probe\""));
assert!(s.contains("\"error_kind\":\"waitpid\""));
assert!(s.contains("\"tid\":42"));
assert!(s.contains("\"tid\":43"));
assert!(s.contains("\"tid\":44"));
assert!(!s.contains("\"comm\":null"));
assert!(!s.contains("\"interval_ms\":null"));
}
fn expected_error_kind_token(k: ThreadErrorKind) -> &'static str {
match k {
ThreadErrorKind::PtraceSeize => "ptrace_seize",
ThreadErrorKind::PtraceInterrupt => "ptrace_interrupt",
ThreadErrorKind::Waitpid => "waitpid",
ThreadErrorKind::GetRegset => "get_regset",
ThreadErrorKind::ProcessVmReadv => "process_vm_readv",
ThreadErrorKind::TlsArithmetic => "tls_arithmetic",
}
}
#[test]
fn thread_error_kind_snake_case_serialization() {
use strum::IntoEnumIterator;
for k in ThreadErrorKind::iter() {
let s = serde_json::to_string(&k).unwrap();
assert_eq!(
s,
format!("\"{}\"", expected_error_kind_token(k)),
"variant {k:?}",
);
}
}
#[test]
fn iter_task_ids_self() {
let pid = self_pid();
let tids = iter_task_ids(pid).expect("self/task must be readable");
assert!(!tids.is_empty());
assert!(tids.windows(2).all(|w| w[0] <= w[1]), "tids must be sorted");
}
#[test]
fn pt_tls_round_up_arithmetic() {
fn round_up(memsz: u64, align: u64) -> u64 {
let align = align.max(1);
(memsz + (align - 1)) & !(align - 1)
}
assert_eq!(round_up(500, 16), 512);
assert_eq!(round_up(512, 16), 512);
assert_eq!(round_up(513, 16), 528);
assert_eq!(round_up(0, 1), 0);
}
#[test]
fn thread_error_kind_display_matches_serde_token() {
use strum::IntoEnumIterator;
for k in ThreadErrorKind::iter() {
let expected = expected_error_kind_token(k);
let json = serde_json::to_string(&k).unwrap();
let serde_token = json.trim_matches('"');
let display_token = format!("{k}");
assert_eq!(serde_token, expected, "serde token for {k:?}");
assert_eq!(display_token, expected, "Display token for {k:?}");
}
}
#[test]
fn run_rejects_self_probe() {
let cli = Cli {
pid: self_pid(),
json: false,
snapshots: 1,
interval_ms: None,
sidecar: None,
abort_after_ms: None,
};
match run(&cli) {
RunOutcome::Fatal(fatal) => {
let msg = format!("{:#}", fatal.error);
assert!(
msg.contains("refusing to probe self"),
"expected self-probe rejection wording, got: {msg}",
);
}
other => panic!(
"expected Fatal for pid==self_pid, got variant: {}",
match other {
RunOutcome::Ok(_) => "Ok",
RunOutcome::AllFailed(_) => "AllFailed",
RunOutcome::Fatal(_) => unreachable!(),
},
),
}
}
#[test]
fn run_accepts_non_self_pid() {
let mut child = std::process::Command::new("sleep")
.arg("30")
.spawn()
.expect("spawn sleep for non-self pid acceptance test");
let child_pid =
libc::pid_t::try_from(child.id()).expect("Linux pid_max <= 2^22 so pid fits in pid_t");
let self_pid = self_pid();
assert_ne!(
child_pid, self_pid,
"spawned child pid must differ from parent for this test to be meaningful",
);
let cli = Cli {
pid: child_pid,
json: false,
snapshots: 1,
interval_ms: None,
sidecar: None,
abort_after_ms: None,
};
let outcome = run(&cli);
let _ = child.kill();
let _ = child.wait();
if let RunOutcome::Fatal(fatal) = outcome {
let msg = format!("{:#}", fatal.error);
assert!(
!msg.contains("refusing to probe self"),
"self-probe gate must NOT fire for non-self pid {child_pid} (self={self_pid}), got: {msg}",
);
}
}
#[test]
fn ptrace_seize_eperm_renders_operator_hint() {
let err = ThreadProbeError::ptrace_seize(42, nix::errno::Errno::EPERM);
assert_eq!(err.kind, ThreadErrorKind::PtraceSeize);
let msg = format!("{}", err.source);
assert!(msg.contains("tid 42"), "got: {msg}");
assert!(msg.contains("permission denied"), "got: {msg}");
assert!(msg.contains("(1) run as root"), "got: {msg}");
assert!(msg.contains("(2) setcap"), "got: {msg}");
assert!(msg.contains("(3) run under the"), "got: {msg}");
assert!(
msg.contains("(4) set /proc/sys/kernel/yama/ptrace_scope=0"),
"got: {msg}"
);
}
#[test]
fn ptrace_seize_non_eperm_uses_generic_rendering() {
let err = ThreadProbeError::ptrace_seize(42, nix::errno::Errno::ESRCH);
assert_eq!(err.kind, ThreadErrorKind::PtraceSeize);
let msg = format!("{}", err.source);
assert!(msg.contains("ptrace(PTRACE_SEIZE) on tid 42"), "got: {msg}");
assert!(!msg.contains("permission denied"), "got: {msg}");
assert!(!msg.contains("yama"), "got: {msg}");
}
#[test]
fn ptrace_interrupt_formats_tid_and_errno() {
let err = ThreadProbeError::ptrace_interrupt(17, nix::errno::Errno::ESRCH);
assert_eq!(err.kind, ThreadErrorKind::PtraceInterrupt);
let msg = format!("{}", err.source);
assert!(
msg.contains("ptrace(PTRACE_INTERRUPT) on tid 17"),
"got: {msg}"
);
}
#[test]
fn waitpid_unexpected_records_status_debug() {
let status = WaitStatus::Exited(Pid::from_raw(99), 7);
let err = ThreadProbeError::waitpid_unexpected(99, status);
assert_eq!(err.kind, ThreadErrorKind::Waitpid);
let msg = format!("{}", err.source);
assert!(msg.contains("waitpid on tid 99"), "got: {msg}");
assert!(msg.contains("unexpected status"), "got: {msg}");
assert!(msg.contains("Exited"), "got: {msg}");
}
#[test]
fn waitpid_err_formats_tid_and_errno() {
let err = ThreadProbeError::waitpid_err(55, nix::errno::Errno::ECHILD);
assert_eq!(err.kind, ThreadErrorKind::Waitpid);
let msg = format!("{}", err.source);
assert!(msg.contains("waitpid on tid 55"), "got: {msg}");
}
#[test]
fn getregset_formats_tid_and_errno() {
let err = ThreadProbeError::getregset(88, nix::errno::Errno::ESRCH);
assert_eq!(err.kind, ThreadErrorKind::GetRegset);
let msg = format!("{}", err.source);
assert!(msg.contains("PTRACE_GETREGSET"), "got: {msg}");
assert!(
msg.contains(arch::REGSET_NAME),
"expected regset name {}, got: {msg}",
arch::REGSET_NAME,
);
assert!(msg.contains("tid 88"), "got: {msg}");
}
#[test]
fn tls_arithmetic_passes_through_source() {
let source = anyhow!("computed TLS address underflowed for fs_base=0x1000");
let err = ThreadProbeError::tls_arithmetic(source);
assert_eq!(err.kind, ThreadErrorKind::TlsArithmetic);
let msg = format!("{}", err.source);
assert!(msg.contains("underflowed"), "got: {msg}");
}
#[test]
fn process_vm_readv_err_renders_address_hex() {
let err =
ThreadProbeError::process_vm_readv_err(123, 0xdeadbeef, nix::errno::Errno::EFAULT);
assert_eq!(err.kind, ThreadErrorKind::ProcessVmReadv);
let msg = format!("{}", err.source);
assert!(msg.contains("tid 123"), "got: {msg}");
assert!(msg.contains("0xdeadbeef"), "got: {msg}");
}
#[test]
fn process_vm_readv_short_records_got_and_expected() {
let err = ThreadProbeError::process_vm_readv_short(200, 12, 24);
assert_eq!(err.kind, ThreadErrorKind::ProcessVmReadv);
let msg = format!("{}", err.source);
assert!(
msg.contains("short process_vm_readv on tid 200"),
"got: {msg}"
);
assert!(msg.contains("got 12 bytes"), "got: {msg}");
assert!(msg.contains("expected 24"), "got: {msg}");
}
#[test]
fn cli_default_sampling_count_is_one() {
let cli = Cli::try_parse_from(["ktstr-jemalloc-probe", "--pid", "42"]).unwrap();
assert_eq!(cli.snapshots, 1);
assert!(cli.interval_ms.is_none());
assert!(cli.validate_sampling_flags().is_ok());
}
#[test]
fn cli_explicit_count_one_without_interval_accepted() {
let cli = Cli::try_parse_from(["ktstr-jemalloc-probe", "--pid", "42", "--snapshots", "1"])
.unwrap();
assert_eq!(cli.snapshots, 1);
assert!(cli.interval_ms.is_none());
assert!(cli.validate_sampling_flags().is_ok());
}
#[test]
fn cli_multi_snapshot_accepts_count_and_interval() {
let cli = Cli::try_parse_from([
"ktstr-jemalloc-probe",
"--pid",
"42",
"--snapshots",
"3",
"--interval-ms",
"50",
])
.unwrap();
assert_eq!(cli.snapshots, 3);
assert_eq!(cli.interval_ms, Some(50));
assert!(cli.validate_sampling_flags().is_ok());
}
#[test]
fn cli_count_zero_rejected() {
let err = Cli::try_parse_from(["ktstr-jemalloc-probe", "--pid", "42", "--snapshots", "0"])
.unwrap_err();
let msg = format!("{err}");
assert!(
msg.contains("0 is not in") || msg.contains("invalid value"),
"expected clap range-rejection message, got: {msg}",
);
}
#[test]
fn cli_snapshots_upper_bound_rejected() {
let err = Cli::try_parse_from([
"ktstr-jemalloc-probe",
"--pid",
"42",
"--snapshots",
"100001",
])
.unwrap_err();
let msg = format!("{err}");
assert!(
msg.contains("not in") || msg.contains("invalid value"),
"expected clap range-rejection message, got: {msg}",
);
}
#[test]
fn cli_interval_zero_rejected() {
let err = Cli::try_parse_from([
"ktstr-jemalloc-probe",
"--pid",
"42",
"--snapshots",
"2",
"--interval-ms",
"0",
])
.unwrap_err();
let msg = format!("{err}");
assert!(
msg.contains("0 is not in") || msg.contains("invalid value"),
"expected clap range-rejection message, got: {msg}",
);
}
#[test]
fn cli_abort_after_ms_defaults_none() {
let cli = Cli::try_parse_from(["ktstr-jemalloc-probe", "--pid", "42"]).unwrap();
assert!(cli.abort_after_ms.is_none());
}
#[test]
fn cli_abort_after_ms_accepts_positive_value() {
let cli = Cli::try_parse_from([
"ktstr-jemalloc-probe",
"--pid",
"42",
"--abort-after-ms",
"500",
])
.unwrap();
assert_eq!(cli.abort_after_ms, Some(500));
}
#[test]
fn cli_abort_after_ms_lower_boundary_accepted() {
let cli = Cli::try_parse_from([
"ktstr-jemalloc-probe",
"--pid",
"42",
"--abort-after-ms",
"1",
])
.unwrap();
assert_eq!(cli.abort_after_ms, Some(1));
}
#[test]
fn cli_abort_after_ms_upper_boundary_accepted() {
let cli = Cli::try_parse_from([
"ktstr-jemalloc-probe",
"--pid",
"42",
"--abort-after-ms",
"3600000",
])
.unwrap();
assert_eq!(cli.abort_after_ms, Some(3_600_000));
}
#[test]
fn cli_abort_after_ms_zero_rejected() {
let err = Cli::try_parse_from([
"ktstr-jemalloc-probe",
"--pid",
"42",
"--abort-after-ms",
"0",
])
.unwrap_err();
let msg = format!("{err}");
assert!(
msg.contains("0 is not in") || msg.contains("invalid value"),
"expected clap range-rejection message, got: {msg}",
);
}
#[test]
fn cli_abort_after_ms_upper_bound_rejected() {
let err = Cli::try_parse_from([
"ktstr-jemalloc-probe",
"--pid",
"42",
"--abort-after-ms",
"3600001",
])
.unwrap_err();
let msg = format!("{err}");
assert!(
msg.contains("not in") || msg.contains("invalid value"),
"expected clap range-rejection message, got: {msg}",
);
}
#[test]
fn sleep_with_cancel_observes_deadline_thread_flip() {
CLEANUP_REQUESTED.store(false, Ordering::SeqCst);
let start = std::time::Instant::now();
let flipper = std::thread::spawn(|| {
std::thread::sleep(std::time::Duration::from_millis(50));
CLEANUP_REQUESTED.store(true, Ordering::SeqCst);
});
let cancelled = sleep_with_cancel(10_000);
let elapsed = start.elapsed();
CLEANUP_REQUESTED.store(false, Ordering::SeqCst);
let _ = flipper.join();
assert!(
cancelled,
"deadline thread set the flag at 50ms; sleep must report cancelled",
);
assert!(
elapsed < std::time::Duration::from_millis(500),
"sleep should return within a poll tick of the flag flip; got {elapsed:?}",
);
assert!(
elapsed >= std::time::Duration::from_millis(30),
"sleep returned before the deadline thread could flip the flag; got {elapsed:?}",
);
}
#[test]
fn cli_interval_upper_bound_rejected() {
let err = Cli::try_parse_from([
"ktstr-jemalloc-probe",
"--pid",
"42",
"--snapshots",
"2",
"--interval-ms",
"3600001",
])
.unwrap_err();
let msg = format!("{err}");
assert!(
msg.contains("not in") || msg.contains("invalid value"),
"expected clap range-rejection message, got: {msg}",
);
}
#[test]
fn cli_pid_zero_rejected() {
let err = Cli::try_parse_from(["ktstr-jemalloc-probe", "--pid", "0"]).unwrap_err();
let msg = format!("{err}");
assert!(
msg.contains("0 is not in") || msg.contains("invalid value"),
"expected clap range-rejection message, got: {msg}",
);
}
#[test]
fn cli_pid_negative_rejected() {
let err = Cli::try_parse_from(["ktstr-jemalloc-probe", "--pid=-1"]).unwrap_err();
let msg = format!("{err}");
assert!(
msg.contains("not in") || msg.contains("invalid value"),
"expected clap range-rejection message, got: {msg}",
);
}
#[test]
fn cli_count_greater_than_one_requires_interval() {
let cli = Cli::try_parse_from(["ktstr-jemalloc-probe", "--pid", "42", "--snapshots", "3"])
.unwrap();
let err = cli.validate_sampling_flags().unwrap_err();
let msg = format!("{err:#}");
assert!(msg.contains("requires --interval-ms"), "got: {msg}");
}
#[test]
fn cli_interval_requires_count_greater_than_one() {
let cli = Cli::try_parse_from([
"ktstr-jemalloc-probe",
"--pid",
"42",
"--interval-ms",
"100",
])
.unwrap();
let err = cli.validate_sampling_flags().unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("only meaningful with --snapshots > 1"),
"got: {msg}",
);
}
#[test]
fn sleep_with_cancel_completes_without_flag_set() {
    // Clear the cancellation flag so the sleep has to run to its deadline.
    // NOTE(review): CLEANUP_REQUESTED is a process-global that other tests in
    // this module set to true; under the parallel test harness this test may
    // observe a concurrent set. Confirm flag-touching tests are serialized.
    CLEANUP_REQUESTED.store(false, Ordering::SeqCst);
    let t0 = std::time::Instant::now();
    let was_cancelled = sleep_with_cancel(25);
    let waited = t0.elapsed();
    assert!(
        !was_cancelled,
        "sleep should not report cancellation when flag stays clear"
    );
    // Allow a few ms of slack below the requested 25 ms.
    let floor = std::time::Duration::from_millis(20);
    assert!(
        waited >= floor,
        "sleep returned too fast: {elapsed:?}",
        elapsed = waited,
    );
}
#[test]
fn sleep_with_cancel_observes_pre_set_flag() {
    /// Clears CLEANUP_REQUESTED when dropped. The flag is therefore reset on
    /// every exit path, including a panic inside `sleep_with_cancel`, and a set
    /// flag cannot leak into other tests that read it.
    struct FlagGuard;
    impl Drop for FlagGuard {
        fn drop(&mut self) {
            CLEANUP_REQUESTED.store(false, Ordering::SeqCst);
        }
    }
    // Raise the flag before sleeping: the very first poll must see it.
    CLEANUP_REQUESTED.store(true, Ordering::SeqCst);
    let guard = FlagGuard;
    let start = std::time::Instant::now();
    let cancelled = sleep_with_cancel(10_000);
    let elapsed = start.elapsed();
    // Reset the flag before asserting, so a failing assert cannot leave it set.
    drop(guard);
    assert!(cancelled, "pre-set flag must cause immediate cancel");
    // 500 ms is far below the 10 s requested sleep, so passing proves the
    // cancel path short-circuited rather than running to the deadline.
    assert!(
        elapsed < std::time::Duration::from_millis(500),
        "cancel path should return within a poll tick, got: {elapsed:?}",
    );
}
#[test]
fn multi_snapshot_all_failed_classification() {
    // Per-thread outcome factories: a PTRACE_SEIZE failure and a successful read.
    let failed = || ThreadResult::Err {
        tid: 1,
        comm: None,
        start_time_jiffies: None,
        error: "e".into(),
        error_kind: ThreadErrorKind::PtraceSeize,
    };
    let succeeded = || ThreadResult::Ok {
        tid: 2,
        comm: None,
        start_time_jiffies: None,
        allocated_bytes: 10,
        deallocated_bytes: 0,
    };
    // Wraps a thread list in a snapshot whose timing fields are irrelevant here.
    let snapshot_of = |threads: Vec<ThreadResult>| Snapshot {
        timestamp_unix_sec: 1_700_000_000,
        elapsed_since_start_ns: 0,
        threads,
    };

    // Every thread in every snapshot failed.
    let every_thread_failed = vec![
        snapshot_of(vec![failed(), failed()]),
        snapshot_of(vec![failed()]),
        snapshot_of(vec![failed(), failed(), failed()]),
    ];
    assert!(
        multi_snapshot_all_failed(&every_thread_failed),
        "every snapshot all-Err must classify as MultiAllFailed",
    );

    // One success anywhere breaks the all-failed classification.
    let one_success = vec![
        snapshot_of(vec![failed()]),
        snapshot_of(vec![failed(), succeeded()]),
        snapshot_of(vec![failed()]),
    ];
    assert!(
        !multi_snapshot_all_failed(&one_success),
        "a single Ok in any snapshot must disqualify MultiAllFailed",
    );

    // Snapshots that captured no threads at all also count as all-failed.
    let no_threads = vec![snapshot_of(Vec::new()), snapshot_of(Vec::new())];
    assert!(
        multi_snapshot_all_failed(&no_threads),
        "every snapshot's threads empty must classify as MultiAllFailed",
    );

    // So does an empty snapshot list.
    let empty_snapshots: &[Snapshot] = &[];
    assert!(multi_snapshot_all_failed(empty_snapshots));
}
#[test]
fn all_failed_classification() {
    // With no threads there is nothing that succeeded.
    assert!(all_failed(&[]), "empty threads vec is all-failed");

    // Builds a PTRACE_SEIZE failure for the given tid.
    let seize_failure = |tid| ThreadResult::Err {
        tid,
        comm: None,
        start_time_jiffies: None,
        error: "e".into(),
        error_kind: ThreadErrorKind::PtraceSeize,
    };

    let only_err = vec![seize_failure(1)];
    assert!(all_failed(&only_err));

    // A single Ok thread disqualifies the all-failed classification.
    let mixed = vec![
        seize_failure(1),
        ThreadResult::Ok {
            tid: 2,
            comm: None,
            start_time_jiffies: None,
            allocated_bytes: 10,
            deallocated_bytes: 0,
        },
    ];
    assert!(!all_failed(&mixed));
}
#[test]
fn multi_snapshot_output_json_shape() {
    // Two snapshots of the same thread, one second apart, with growing
    // allocation counters.
    let out = ProbeOutput {
        schema_version: SCHEMA_VERSION,
        pid: 777,
        tool_version: "0.0.0",
        started_at_unix_sec: 1_699_999_999,
        interval_ms: Some(50),
        interrupted: false,
        snapshots: vec![
            Snapshot {
                timestamp_unix_sec: 1_700_000_000,
                elapsed_since_start_ns: 0,
                threads: vec![ThreadResult::Ok {
                    tid: 42,
                    comm: Some("worker".to_string()),
                    start_time_jiffies: None,
                    allocated_bytes: 1024,
                    deallocated_bytes: 0,
                }],
            },
            Snapshot {
                timestamp_unix_sec: 1_700_000_001,
                elapsed_since_start_ns: 0,
                threads: vec![ThreadResult::Ok {
                    tid: 42,
                    comm: Some("worker".to_string()),
                    start_time_jiffies: None,
                    allocated_bytes: 2048,
                    deallocated_bytes: 0,
                }],
            },
        ],
    };
    let s = serde_json::to_string(&out).unwrap();
    // Literal wire-format checks on the compact serialization.
    assert!(s.contains("\"schema_version\":2"));
    assert!(s.contains("\"pid\":777"));
    assert!(s.contains("\"started_at_unix_sec\":1699999999"));
    assert!(s.contains("\"interval_ms\":50"));
    assert!(s.contains("\"interrupted\":false"));
    assert!(s.contains("\"snapshots\":["));
    assert!(s.contains("\"timestamp_unix_sec\":1700000000"));
    assert!(s.contains("\"timestamp_unix_sec\":1700000001"));
    assert!(s.contains("\"allocated_bytes\":1024"));
    assert!(s.contains("\"allocated_bytes\":2048"));
    // Structural checks: per-snapshot fields live only inside `snapshots`.
    let v: serde_json::Value = serde_json::from_str(&s).unwrap();
    assert!(
        v.get("timestamp_unix_sec").is_none(),
        "top-level timestamp_unix_sec must not appear on ProbeOutput: {s}",
    );
    assert!(
        v.get("threads").is_none(),
        "top-level threads must not appear on ProbeOutput: {s}",
    );
    assert!(
        v.get("snapshots").is_some(),
        "snapshots array required: {s}"
    );
    assert!(v.get("started_at_unix_sec").is_some());
    assert!(v.get("interval_ms").is_some());
    assert!(v.get("interrupted").is_some());
    // The substring checks above would still pass if entries were duplicated
    // or reordered, so pin the array's length and capture order explicitly.
    let snaps = v
        .get("snapshots")
        .and_then(serde_json::Value::as_array)
        .expect("snapshots must be a JSON array");
    let stamps: Vec<u64> = snaps
        .iter()
        .filter_map(|snap| snap.get("timestamp_unix_sec").and_then(serde_json::Value::as_u64))
        .collect();
    assert_eq!(
        stamps,
        [1_700_000_000, 1_700_000_001],
        "snapshots must serialize exactly once each, in capture order: {s}",
    );
}
#[test]
fn single_snapshot_output_omits_interval_ms() {
    // A one-shot run carries no interval, so interval_ms must be skipped entirely.
    let lone_snapshot = Snapshot {
        timestamp_unix_sec: 1_700_000_000,
        elapsed_since_start_ns: 0,
        threads: vec![ThreadResult::Ok {
            tid: 99,
            comm: None,
            start_time_jiffies: None,
            allocated_bytes: 10,
            deallocated_bytes: 0,
        }],
    };
    let out = ProbeOutput {
        schema_version: SCHEMA_VERSION,
        pid: 555,
        tool_version: "0.0.0",
        started_at_unix_sec: 1_700_000_000,
        interval_ms: None,
        interrupted: false,
        snapshots: vec![lone_snapshot],
    };
    let json = serde_json::to_string(&out).unwrap();
    // Textual check: the key must not appear anywhere in the document.
    assert!(
        !json.contains("\"interval_ms\""),
        "interval_ms must be omitted when None: {json}"
    );
    // Structural check on the parsed value.
    let v: serde_json::Value = serde_json::from_str(&json).unwrap();
    assert!(v.get("interval_ms").is_none());
    let snapshot_count = v["snapshots"].as_array().unwrap().len();
    assert_eq!(
        snapshot_count,
        1,
        "single-snapshot must emit snapshots of length 1"
    );
}
#[test]
fn exe_identity_stable_within_run() {
    // Two back-to-back captures of our own exe identity must agree.
    let pid = self_pid();
    let capture = || ExeIdentity::capture(pid).expect("stat /proc/self/exe");
    let (first, second) = (capture(), capture());
    assert_eq!(
        first, second,
        "ExeIdentity must be stable across back-to-back captures"
    );
}
#[test]
fn take_snapshot_interrupted_flag_truncates_threads_vec() {
    /// Clears CLEANUP_REQUESTED when dropped. The flag is therefore reset on
    /// every exit path, including a panic inside `take_snapshot`, and a set
    /// flag cannot leak into other tests that read it.
    struct FlagGuard;
    impl Drop for FlagGuard {
        fn drop(&mut self) {
            CLEANUP_REQUESTED.store(false, Ordering::SeqCst);
        }
    }
    // Request cleanup before the snapshot starts, so it should stop before the
    // first per-tid iteration and probe no thread at all.
    CLEANUP_REQUESTED.store(true, Ordering::SeqCst);
    let guard = FlagGuard;
    // Placeholder symbol/offsets: they are never used because no tid is probed.
    let symbol = TsdTlsSymbol {
        elf_path: std::path::PathBuf::from("/dummy"),
        st_value: 0,
        tls_image_aligned_size: 0,
        p_align: 8,
        e_machine: arch::EXPECTED_E_MACHINE,
    };
    let offsets = CounterOffsets::new(0, 8).expect("0 < 8 satisfies layout invariant");
    let tids = vec![1, 2, 3];
    let run_start = std::time::Instant::now();
    let mut tp_cache = std::collections::HashMap::new();
    let (snap, interrupted) = take_snapshot(
        self_pid(),
        &symbol,
        &offsets,
        &tids,
        run_start,
        &mut tp_cache,
    );
    // Reset the flag before asserting, so a failing assert cannot leave it set.
    drop(guard);
    assert!(interrupted, "pre-set flag must surface interrupted=true");
    assert!(
        snap.threads.is_empty(),
        "truncated snapshot must carry no thread entries when flag is set \
         before the first per-tid iteration; got {} entries",
        snap.threads.len(),
    );
    assert!(
        snap.elapsed_since_start_ns < 1_000_000_000,
        "elapsed_since_start_ns must be populated sub-second on an \
         immediately-interrupted snapshot; got {} ns",
        snap.elapsed_since_start_ns,
    );
}
#[test]
fn take_snapshot_flag_clear_completes_normally() {
    // With the cleanup flag clear and no tids to visit, the snapshot has
    // nothing to probe and must not be marked interrupted.
    CLEANUP_REQUESTED.store(false, Ordering::SeqCst);
    let symbol = TsdTlsSymbol {
        elf_path: std::path::PathBuf::from("/dummy"),
        st_value: 0,
        tls_image_aligned_size: 0,
        p_align: 8,
        e_machine: arch::EXPECTED_E_MACHINE,
    };
    let offsets = CounterOffsets::new(0, 8).expect("0 < 8 satisfies layout invariant");
    let no_tids: Vec<i32> = Vec::new();
    let mut tp_cache = std::collections::HashMap::new();
    let (snap, interrupted) = take_snapshot(
        self_pid(),
        &symbol,
        &offsets,
        &no_tids,
        std::time::Instant::now(),
        &mut tp_cache,
    );
    assert!(
        !interrupted,
        "clear flag + empty tids must not mark interrupted"
    );
    assert!(snap.threads.is_empty());
}
#[test]
fn ensure_exe_identity_unchanged_ok_on_match() {
    // A baseline captured from the live process must validate against itself.
    let me = self_pid();
    let snapshot = ExeIdentity::capture(me).expect("stat /proc/self/exe");
    let verdict = ensure_exe_identity_unchanged(me, &snapshot, "test context");
    verdict.expect("identical baseline must pass");
}
#[test]
fn ensure_exe_identity_unchanged_errs_on_mismatch() {
    let pid = self_pid();
    // Synthetic (dev, ino) pair that should not match this process's exe.
    let baseline = ExeIdentity {
        dev: 0xDEAD_BEEF_DEAD_BEEF,
        ino: 0xCAFE_BABE_CAFE_BABE,
    };
    let err = ensure_exe_identity_unchanged(pid, &baseline, "in unit test")
        .expect_err("synthetic mismatch must produce Err");
    let msg = format!("{err}");
    assert!(
        msg.contains("changed in unit test"),
        "error must carry the context string; got: {msg}",
    );
    // The hex digits may be rendered in either case ({:#x} or {:#X}). Compare
    // case-insensitively rather than listing spellings; the old fallback
    // `0xdeadbeefDEADBEEF` could never be produced by a hex formatter.
    assert!(
        msg.to_ascii_lowercase().contains("dev=0xdeadbeefdeadbeef"),
        "error must carry the baseline dev in hex; got: {msg}",
    );
    assert!(
        msg.contains("TLS offsets invalid"),
        "error must carry the downstream consequence so operators \
         know the probe is bailing before reading garbage; got: {msg}",
    );
}
#[test]
fn ensure_exe_identity_unchanged_error_wraps_into_run_outcome_fatal() {
    let pid = self_pid();
    // An all-zero identity is not expected to match the live exe.
    let bogus = ExeIdentity { dev: 0, ino: 0 };
    let err = ensure_exe_identity_unchanged(pid, &bogus, "between snapshots")
        .expect_err("synthetic mismatch");
    // Wrapping must keep both the classification and the original context.
    let RunOutcome::Fatal(fatal) = RunOutcome::Fatal(FatalError::exe_identity_changed(err))
    else {
        panic!("expected RunOutcome::Fatal");
    };
    assert_eq!(fatal.kind, FatalKind::ExeIdentityChanged);
    let msg = fatal.error.to_string();
    assert!(msg.contains("between snapshots"));
}
#[test]
fn interrupted_true_serializes_as_json_true() {
    // An interrupted run holding a single snapshot with no threads.
    let empty_snapshot = Snapshot {
        timestamp_unix_sec: 1_700_000_000,
        elapsed_since_start_ns: 0,
        threads: Vec::new(),
    };
    let out = ProbeOutput {
        schema_version: SCHEMA_VERSION,
        pid: 321,
        tool_version: "0.0.0",
        started_at_unix_sec: 1_700_000_000,
        interval_ms: Some(100),
        interrupted: true,
        snapshots: vec![empty_snapshot],
    };
    let json = serde_json::to_string(&out).unwrap();
    // The flag must be a JSON boolean, not a string or number.
    assert!(
        json.contains("\"interrupted\":true"),
        "expected `\"interrupted\":true` literal, got: {json}",
    );
    let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
    let flag = parsed.get("interrupted").and_then(serde_json::Value::as_bool);
    assert_eq!(flag, Some(true));
}
/// Returns the pretty-printed JSON of a minimal, passing `SidecarResult` for
/// the sidecar-append tests. Identity fields are fixed, every optional field
/// is unset and every collection is empty.
fn minimal_sidecar_json() -> String {
    let sidecar = ktstr::test_support::SidecarResult {
        // Identity of the (fake) test run.
        test_name: "t".to_string(),
        topology: "1n1l1c1t".to_string(),
        scheduler: "eevdf".to_string(),
        work_type: "CpuSpin".to_string(),
        // Outcome.
        passed: true,
        skipped: false,
        stats: ktstr::assert::ScenarioStats::default(),
        // Collections start empty so appended entries are easy to count.
        metrics: Vec::new(),
        stimulus_events: Vec::new(),
        active_flags: Vec::new(),
        verifier_stats: Vec::new(),
        sysctls: Vec::new(),
        kargs: Vec::new(),
        // Plain strings left blank.
        timestamp: String::new(),
        run_id: String::new(),
        // Optional metadata left unset.
        scheduler_commit: None,
        project_commit: None,
        payload: None,
        monitor: None,
        kvm_stats: None,
        kernel_version: None,
        kernel_commit: None,
        host: None,
        cleanup_duration_ms: None,
        run_source: None,
    };
    serde_json::to_string_pretty(&sidecar).unwrap()
}
/// Builds a single-snapshot `ProbeOutput` for pid 42. It holds one successful
/// thread, `worker`, with 1024 bytes allocated and 512 deallocated, and has no
/// interval and no interruption.
fn probe_output_fixture() -> ProbeOutput {
    let worker = ThreadResult::Ok {
        tid: 42,
        comm: Some("worker".to_string()),
        start_time_jiffies: None,
        allocated_bytes: 1024,
        deallocated_bytes: 512,
    };
    let snapshot = Snapshot {
        timestamp_unix_sec: 1_700_000_000,
        elapsed_since_start_ns: 0,
        threads: vec![worker],
    };
    ProbeOutput {
        schema_version: SCHEMA_VERSION,
        pid: 42,
        tool_version: "0.0.0",
        started_at_unix_sec: 1_700_000_000,
        interval_ms: None,
        interrupted: false,
        snapshots: vec![snapshot],
    }
}
#[test]
fn sidecar_append_happy_path() {
    /// First metric whose name ends with `suffix`, if any.
    fn metric_ending_with<'a>(
        metrics: &'a [ktstr::test_support::Metric],
        suffix: &str,
    ) -> Option<&'a ktstr::test_support::Metric> {
        metrics.iter().find(|m| m.name.ends_with(suffix))
    }

    let dir = tempfile::tempdir().expect("tempdir");
    let path = dir.path().join("t-0000000000000000.ktstr.json");
    std::fs::write(&path, minimal_sidecar_json()).unwrap();
    append_probe_output_to_sidecar(&path, &probe_output_fixture(), 0)
        .expect("append happy path");

    // The sidecar's own fields must survive the append unchanged.
    let sc: ktstr::test_support::SidecarResult =
        serde_json::from_str(&std::fs::read_to_string(&path).unwrap())
            .expect("sidecar re-parse");
    assert_eq!(sc.test_name, "t");
    assert_eq!(sc.topology, "1n1l1c1t");
    assert_eq!(sc.scheduler, "eevdf");
    assert!(sc.passed);
    assert!(!sc.skipped);
    assert_eq!(sc.metrics.len(), 1, "one appended PayloadMetrics");

    // Every appended metric is namespaced under the probe prefix.
    let pm = &sc.metrics[0];
    assert_eq!(pm.exit_code, 0);
    let prefix = format!("{SIDECAR_METRIC_PREFIX}.");
    for m in pm.metrics.iter() {
        assert!(
            m.name.starts_with(&prefix),
            "metric name {} missing probe prefix",
            m.name,
        );
    }

    // Byte counters get a unit and a polarity hint; tid gets neither.
    let alloc = metric_ending_with(&pm.metrics, ".allocated_bytes")
        .expect("allocated_bytes metric in appended entry");
    assert_eq!(alloc.value, 1024.0);
    assert_eq!(alloc.unit, "bytes");
    assert!(matches!(
        alloc.polarity,
        ktstr::test_support::Polarity::LowerBetter,
    ));
    let tid =
        metric_ending_with(&pm.metrics, ".tid").expect("tid metric in appended entry");
    assert!(matches!(
        tid.polarity,
        ktstr::test_support::Polarity::Unknown,
    ));
    assert_eq!(tid.unit, "");

    // The append leaves its lock file next to the sidecar...
    let lock_path = path.with_extension("json.lock");
    assert!(
        lock_path.exists(),
        "expected lock file at {}",
        lock_path.display(),
    );
    // ...but no leftover staging temp files.
    let looks_like_staging = |p: &std::path::Path| {
        let ext_is_tmp = p.extension().and_then(|x| x.to_str()) == Some("tmp");
        let name_has_tmp = p
            .file_name()
            .and_then(|n| n.to_str())
            .is_some_and(|n| n.contains(".tmp"));
        ext_is_tmp || name_has_tmp
    };
    let orphans: Vec<std::path::PathBuf> = std::fs::read_dir(dir.path())
        .unwrap()
        .flatten()
        .map(|entry| entry.path())
        .filter(|p| looks_like_staging(p.as_path()))
        .collect();
    assert!(
        orphans.is_empty(),
        "expected no staging tmp files after successful append, got: {orphans:?}",
    );
}
#[test]
fn sidecar_append_stacks_across_invocations() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("t.ktstr.json");
    std::fs::write(&path, minimal_sidecar_json()).unwrap();
    let out = probe_output_fixture();
    // Two appends with distinct exit codes, so each entry is identifiable.
    for exit_code in [0, 1] {
        append_probe_output_to_sidecar(&path, &out, exit_code).unwrap();
    }
    let raw = std::fs::read_to_string(&path).unwrap();
    let sc: ktstr::test_support::SidecarResult = serde_json::from_str(&raw).unwrap();
    // Appends accumulate in call order rather than replacing each other.
    assert_eq!(sc.metrics.len(), 2, "both appends retained");
    assert_eq!(sc.metrics[0].exit_code, 0);
    assert_eq!(sc.metrics[1].exit_code, 1);
    let prefix = format!("{SIDECAR_METRIC_PREFIX}.");
    for (i, pm) in sc.metrics.iter().enumerate() {
        for m in pm.metrics.iter() {
            assert!(
                m.name.starts_with(&prefix),
                "append {i} metric {} missing probe prefix",
                m.name,
            );
        }
    }
}
#[test]
fn sidecar_append_preserves_prepopulated_metrics() {
    use ktstr::test_support::{Metric, MetricSource, MetricStream, PayloadMetrics, Polarity};
    // Builds a PayloadMetrics holding a single JSON/stdout metric and exit code 0.
    let single_metric = |payload_index, name: &str, value, polarity, unit: &str| PayloadMetrics {
        payload_index,
        metrics: vec![Metric {
            name: name.to_string(),
            value,
            polarity,
            unit: unit.to_string(),
            source: MetricSource::Json,
            stream: MetricStream::Stdout,
        }],
        exit_code: 0,
    };

    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("t.ktstr.json");
    // Seed the sidecar with two entries from other payloads.
    let mut sc: ktstr::test_support::SidecarResult =
        serde_json::from_str(&minimal_sidecar_json()).unwrap();
    sc.metrics.extend([
        single_metric(0, "primary.bogo_ops", 12345.0, Polarity::HigherBetter, "ops"),
        single_metric(1, "secondary.latency_us", 42.0, Polarity::LowerBetter, "us"),
    ]);
    std::fs::write(&path, serde_json::to_string_pretty(&sc).unwrap()).unwrap();

    append_probe_output_to_sidecar(&path, &probe_output_fixture(), 0).unwrap();

    let after: ktstr::test_support::SidecarResult =
        serde_json::from_str(&std::fs::read_to_string(&path).unwrap()).unwrap();
    assert_eq!(
        after.metrics.len(),
        3,
        "expected 2 pre-existing + 1 appended"
    );
    // Pre-existing entries keep their position and content.
    assert_eq!(after.metrics[0].metrics[0].name, "primary.bogo_ops");
    assert_eq!(after.metrics[0].metrics[0].value, 12345.0);
    assert_eq!(after.metrics[1].metrics[0].name, "secondary.latency_us");
    assert_eq!(after.metrics[1].metrics[0].value, 42.0);
    // The probe's entry lands last.
    let appended = &after.metrics[2];
    let prefix = format!("{SIDECAR_METRIC_PREFIX}.");
    for m in appended.metrics.iter() {
        assert!(
            m.name.starts_with(&prefix),
            "last entry's metric {} missing probe prefix",
            m.name,
        );
    }
}
#[test]
fn sidecar_append_missing_file_errors() {
    let dir = tempfile::tempdir().unwrap();
    let missing = dir.path().join("does-not-exist.ktstr.json");
    let err = append_probe_output_to_sidecar(&missing, &probe_output_fixture(), 0).unwrap_err();
    let msg = format!("{err:#}");
    // Each required fragment, paired with the description used when it is absent.
    let expectations = [
        ("sidecar file not found", "missing-file wording"),
        ("run the test first", "operator-actionable hint"),
        ("--sidecar", "flag name in hint"),
    ];
    for (needle, what) in expectations {
        assert!(msg.contains(needle), "expected {what}, got: {msg}");
    }
}
#[test]
fn sidecar_append_malformed_json_errors() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("bad.ktstr.json");
    std::fs::write(&path, "{ this is not valid json }").unwrap();
    let err = append_probe_output_to_sidecar(&path, &probe_output_fixture(), 0).unwrap_err();
    let msg = format!("{err:#}");
    // Each required fragment, paired with the description used when it is absent.
    let expectations = [
        ("parse", "parse-failure context"),
        ("regenerate", "pre-1.0 regenerate-policy hint"),
    ];
    for (needle, what) in expectations {
        assert!(msg.contains(needle), "expected {what}, got: {msg}");
    }
}
#[test]
fn sidecar_append_bails_when_cleanup_requested_preflock() {
    /// Resets CLEANUP_REQUESTED at scope exit (including panics), so the flag
    /// never leaks into other tests.
    struct FlagGuard;
    impl Drop for FlagGuard {
        fn drop(&mut self) {
            CLEANUP_REQUESTED.store(false, Ordering::SeqCst);
        }
    }
    let _reset_on_exit = FlagGuard;

    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("pre-flock-bail.ktstr.json");
    std::fs::write(&path, minimal_sidecar_json()).unwrap();
    let out = probe_output_fixture();

    // Raise the flag only after setup, so the append itself is what observes it.
    CLEANUP_REQUESTED.store(true, Ordering::SeqCst);
    let outcome = append_probe_output_to_sidecar(&path, &out, 0);
    let msg = format!(
        "{:#}",
        outcome.expect_err("flock retry loop must bail when CLEANUP_REQUESTED is set")
    );
    assert!(
        msg.contains("aborted by probe deadline"),
        "expected deadline-abort bail message, got: {msg}",
    );
    assert!(
        msg.contains("flock(LOCK_EX)"),
        "bail message must name the flock phase so operators know which \
         retry loop fired; got: {msg}",
    );
    // Bailing before the lock is taken means the file was never rewritten.
    assert_eq!(
        std::fs::read_to_string(&path).unwrap(),
        minimal_sidecar_json(),
        "sidecar contents must be unchanged when the flock gate fires",
    );
}
#[test]
fn apply_probe_metric_hints_classifies_byte_counters() {
    use ktstr::test_support::{Metric, MetricSource, MetricStream, Polarity};
    // Every case starts from an unclassified metric (Unknown polarity, no unit).
    let unclassified = |name: &str, value: f64| Metric {
        name: name.to_string(),
        value,
        polarity: Polarity::Unknown,
        unit: String::new(),
        source: MetricSource::Json,
        stream: MetricStream::Stdout,
    };

    // Exact `.allocated_bytes` / `.deallocated_bytes` leaves are byte counters.
    let mut alloc =
        unclassified("jemalloc_probe.snapshots.0.threads.0.allocated_bytes", 1024.0);
    apply_probe_metric_hints(&mut alloc);
    assert!(matches!(alloc.polarity, Polarity::LowerBetter));
    assert_eq!(alloc.unit, "bytes");

    let mut dealloc =
        unclassified("jemalloc_probe.snapshots.0.threads.0.deallocated_bytes", 512.0);
    apply_probe_metric_hints(&mut dealloc);
    assert!(matches!(dealloc.polarity, Polarity::LowerBetter));
    assert_eq!(dealloc.unit, "bytes");

    // A tid is an identifier, not a counter: left untouched.
    let mut tid = unclassified("jemalloc_probe.snapshots.0.threads.0.tid", 42.0);
    apply_probe_metric_hints(&mut tid);
    assert!(matches!(tid.polarity, Polarity::Unknown));
    assert_eq!(tid.unit, "");

    // Names that merely start with a counter name must not match the hint.
    let mut extra =
        unclassified("jemalloc_probe.snapshots.0.threads.0.allocated_bytes_extra", 999.0);
    apply_probe_metric_hints(&mut extra);
    assert!(
        matches!(extra.polarity, Polarity::Unknown),
        "name ending in _extra must not match the byte-counter hint",
    );
    assert_eq!(extra.unit, "");

    let mut dextra = unclassified("jemalloc_probe.deallocated_bytes_something", 0.0);
    apply_probe_metric_hints(&mut dextra);
    assert!(matches!(dextra.polarity, Polarity::Unknown));
    assert_eq!(dextra.unit, "");
}
#[test]
fn synthesize_payload_metrics_handles_ok_and_err_threads() {
    // One successful and one failed thread in a single snapshot.
    let ok_worker = ThreadResult::Ok {
        tid: 42,
        comm: Some("ok-worker".to_string()),
        start_time_jiffies: None,
        allocated_bytes: 2048,
        deallocated_bytes: 128,
    };
    let err_worker = ThreadResult::Err {
        tid: 99,
        comm: Some("err-worker".to_string()),
        start_time_jiffies: None,
        error: "ptrace(PTRACE_SEIZE): ESRCH".to_string(),
        error_kind: ThreadErrorKind::PtraceSeize,
    };
    let out = ProbeOutput {
        schema_version: SCHEMA_VERSION,
        pid: 1234,
        tool_version: "0.0.0",
        started_at_unix_sec: 1_700_000_000,
        interval_ms: None,
        interrupted: false,
        snapshots: vec![Snapshot {
            timestamp_unix_sec: 1_700_000_000,
            elapsed_since_start_ns: 0,
            threads: vec![ok_worker, err_worker],
        }],
    };

    let pm = synthesize_payload_metrics(&out, 7, 0).expect("synthesize");
    assert_eq!(pm.exit_code, 7, "exit_code flows through");
    assert_eq!(pm.payload_index, 0, "payload_index flows through");

    // Every synthesized metric is namespaced under the probe prefix.
    let prefix = format!("{SIDECAR_METRIC_PREFIX}.");
    for m in pm.metrics.iter() {
        assert!(
            m.name.starts_with(&prefix),
            "metric {} missing probe prefix",
            m.name,
        );
    }
    // String-valued leaves (error text and its kind) must not appear as metrics.
    for m in pm.metrics.iter() {
        assert!(
            !m.name.ends_with(".error"),
            "string `error` leaf must not surface, got: {}",
            m.name,
        );
        assert!(
            !m.name.ends_with(".error_kind"),
            "string `error_kind` leaf must not surface, got: {}",
            m.name,
        );
    }

    // Both threads contribute their tid, whatever their outcome.
    let tid_values: Vec<f64> = pm
        .metrics
        .iter()
        .filter_map(|m| m.name.ends_with(".tid").then_some(m.value))
        .collect();
    assert!(
        tid_values.contains(&42.0),
        "Ok thread's tid=42 must surface, got: {tid_values:?}",
    );
    assert!(
        tid_values.contains(&99.0),
        "Err thread's tid=99 must surface, got: {tid_values:?}",
    );

    // Only the Ok thread carries byte counters.
    let alloc_count = pm
        .metrics
        .iter()
        .filter(|m| m.name.ends_with(".allocated_bytes"))
        .count();
    assert_eq!(alloc_count, 1, "one Ok thread emits one allocated_bytes");
}
#[test]
fn round_up_pow2_boundary_matrix() {
    // (value, alignment, expected). Covers zero inputs, the u64::MAX overflow
    // edge, and values just below, at and just above an alignment boundary.
    let cases = [
        (0, 0, Some(0)),
        (0, 1, Some(0)),
        (u64::MAX, 1, Some(u64::MAX)),
        (u64::MAX, 2, None),
        (7, 8, Some(8)),
        (8, 8, Some(8)),
        (9, 8, Some(16)),
    ];
    for (value, align, expected) in cases {
        assert_eq!(round_up_pow2(value, align), expected);
    }
}
#[test]
fn fatal_kind_tag_strings_pinned() {
    // Tags are part of the ktstr-probe-fatal wire format; pin each one verbatim.
    let pinned = [
        (FatalKind::PidMissing, "pid-missing"),
        (FatalKind::ExeIdentityChanged, "exe-identity-changed"),
        (FatalKind::JemallocNotFound, "jemalloc-not-found"),
        (FatalKind::JemallocInDso, "jemalloc-in-dso"),
        (FatalKind::ReadlinkFailure, "readlink-failure"),
        (FatalKind::MapsReadFailure, "maps-read-failure"),
        (FatalKind::DwarfParseFailure, "dwarf-parse-failure"),
        (FatalKind::ArchMismatch, "arch-mismatch"),
        (FatalKind::SelfProbeRejected, "self-probe-rejected"),
        (FatalKind::TidEnumerationFailure, "tid-enumeration-failure"),
        (FatalKind::Other, "other"),
    ];
    for (kind, expected_tag) in pinned {
        assert_eq!(kind.tag(), expected_tag);
    }
}
#[test]
fn fatal_kind_exhaustiveness_compile_time_guard() {
    use strum::IntoEnumIterator;
    // Characters allowed in a wire tag: lowercase kebab-case.
    let is_kebab_char = |c: char| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-';
    let mut seen = 0usize;
    for kind in FatalKind::iter() {
        let tag = kind.tag();
        assert!(
            !tag.is_empty(),
            "FatalKind::{kind:?}.tag() returned empty string"
        );
        assert!(
            tag.chars().all(is_kebab_char),
            "FatalKind::{kind:?}.tag() = {tag:?} must be lowercase-kebab-case \
             (only [a-z0-9-]) per the ktstr-probe-fatal: wire contract",
        );
        // Exhaustive match with no wildcard arm: adding a variant stops this
        // from compiling until it is listed here, which prompts the tag-pin update.
        match kind {
            FatalKind::PidMissing
            | FatalKind::ExeIdentityChanged
            | FatalKind::JemallocNotFound
            | FatalKind::JemallocInDso
            | FatalKind::ReadlinkFailure
            | FatalKind::MapsReadFailure
            | FatalKind::DwarfParseFailure
            | FatalKind::ArchMismatch
            | FatalKind::SelfProbeRejected
            | FatalKind::TidEnumerationFailure
            | FatalKind::Other => {}
        }
        seen += 1;
    }
    assert_eq!(
        seen, 11,
        "FatalKind::iter() must yield exactly the eleven variants pinned in \
         `fatal_kind_tag_strings_pinned`; drift means either strum::EnumIter \
         is broken or a new variant was added without updating the tag-string \
         pin — fix by adding the new tag literal to `fatal_kind_tag_strings_pinned` \
         and bumping this expected count",
    );
}
#[test]
fn probe_single_thread_fast_path_skips_ptrace() {
    use std::process::Command;
    // Spawn and fully reap a short-lived child, so its tid no longer names a
    // live task (barring pid reuse) and any read against it must fail.
    let mut child = Command::new("/bin/true").spawn().expect("spawn /bin/true");
    // Checked conversion, matching self_pid(): never truncate a pid silently.
    let dead_tid =
        i32::try_from(child.id()).expect("Linux pid_max <= 2^22 so pid fits in i32");
    // Reaping must succeed; otherwise the tid might still be a live or zombie
    // task and the test would not be exercising a dead target.
    child.wait().expect("reap /bin/true so its tid is dead");
    let symbol = TsdTlsSymbol {
        elf_path: std::path::PathBuf::from("/nonexistent"),
        st_value: 0,
        tls_image_aligned_size: 4096,
        p_align: 8,
        e_machine: arch::EXPECTED_E_MACHINE,
    };
    let offsets = CounterOffsets::new(0, 8).expect("valid offset pair");
    // A cached thread pointer selects the fast path, which should go straight
    // to process_vm_readv without touching ptrace.
    let fake_tp: u64 = 0x1000;
    let result = probe_single_thread(dead_tid, &symbol, &offsets, Some(fake_tp));
    let err = result.expect_err("dead pid + Some(tp) must fail");
    assert_eq!(
        err.kind,
        ThreadErrorKind::ProcessVmReadv,
        "fast path must reach process_vm_readv, not bail at ptrace \
         — seeing {kind:?} means the cached-tp arm is issuing ptrace \
         when it shouldn't",
        kind = err.kind,
    );
}
#[test]
fn start_time_parser_handles_parens_in_comm() {
    // The comm "a)b(c)" itself contains parentheses. The parser must still
    // find the real end of the comm field (presumably the last ')').
    // After the state field, 18 filler fields precede starttime (field 22).
    let filler: String = (0..18).map(|i| format!(" {i}")).collect();
    let line = format!("1234 (a)b(c)) S{filler} 987654321 rest of line ignored");
    assert_eq!(parse_start_time_from_stat(&line), Some(987654321));
}
#[test]
fn start_time_parser_empty_input_returns_none() {
    // An empty stat line has no fields to parse.
    let blank = String::new();
    assert_eq!(parse_start_time_from_stat(&blank), None);
}
#[test]
fn start_time_parser_no_close_paren_returns_none() {
    // Without a closing ')' the comm field cannot be delimited.
    let no_parens = "1234 comm_without_parens S 0 0 0 0";
    assert_eq!(parse_start_time_from_stat(no_parens), None,);
}
#[test]
fn start_time_parser_nothing_after_close_paren_returns_none() {
    // The line ends right after the comm field.
    let truncated = "1234 (comm)";
    assert_eq!(parse_start_time_from_stat(truncated), None);
}
#[test]
fn start_time_parser_too_few_fields_returns_none() {
    // The line ends well before field 22 (starttime).
    let short_line = "1234 (comm) S 1 2 3 4 5 6 7 8 9";
    assert_eq!(parse_start_time_from_stat(short_line), None,);
}
#[test]
fn start_time_parser_non_numeric_field_22_returns_none() {
    // Field 22 is present but is not an integer.
    let filler: String = (0..18).map(|i| format!(" {i}")).collect();
    let line = format!("1234 (comm) S{filler} not_a_number trailing garbage");
    assert_eq!(parse_start_time_from_stat(&line), None);
}
#[test]
fn read_build_id_on_real_elf_returns_lowercase_hex() {
    let exe = std::env::current_exe().expect("current_exe");
    let data = std::fs::read(&exe).expect("read current_exe");
    let elf = goblin::elf::Elf::parse(&data).expect("parse current_exe");
    let hex = match read_build_id(&elf, &data) {
        Some(hex) => hex,
        None => {
            // Without an NT_GNU_BUILD_ID note there is nothing positive to check.
            eprintln!(
                "ktstr_test: SKIP read_build_id_on_real_elf_returns_lowercase_hex — \
                 current_exe ({}) carries no NT_GNU_BUILD_ID note; the host's linker \
                 (or a RUSTFLAGS override) does not emit one. Positive-path \
                 invariants are only testable when the note exists; negative-path \
                 coverage is in candidate_debuginfo_paths_* tests.",
                exe.display(),
            );
            return;
        }
    };
    assert!(!hex.is_empty(), "build-id hex must be non-empty");
    assert_eq!(
        hex,
        hex.to_ascii_lowercase(),
        "build-id must be rendered in lowercase hex per the probe's \
         /usr/lib/debug/.build-id/<xx>/<rest>.debug lookup convention",
    );
    let is_lower_hex_digit =
        |c: char| c.is_ascii_hexdigit() && (c.is_ascii_digit() || c.is_ascii_lowercase());
    assert!(
        hex.chars().all(is_lower_hex_digit),
        "build-id must contain only ASCII hex digits [0-9a-f]; got {hex:?}",
    );
}
#[test]
fn read_gnu_debuglink_on_inline_debug_elf_returns_none() {
    // The test binary carries its debug info inline, so it should have no
    // .gnu_debuglink pointer to a separate debug file.
    let exe = std::env::current_exe().expect("current_exe");
    let bytes = std::fs::read(&exe).expect("read current_exe");
    let parsed = goblin::elf::Elf::parse(&bytes).expect("parse current_exe");
    let debuglink = read_gnu_debuglink(&parsed, &bytes);
    assert!(
        debuglink.is_none(),
        "test binary has inline .debug_info; .gnu_debuglink \
         section must be absent and the parser must return None",
    );
}
#[test]
fn candidate_debuginfo_paths_build_id_first_then_debuglink_then_debug_dir_then_lib_debug() {
    let paths = candidate_debuginfo_paths(
        Path::new("/usr/bin/ktstr-test-target"),
        Some("ktstr-test-target.debug"),
        Some("abcdef0123456789"),
    );
    assert_eq!(
        paths.len(),
        4,
        "with both debuglink + build-id and an absolute target path, \
         the helper must emit 4 candidates: build-id first, then \
         parent + parent/.debug + /usr/lib/debug-rooted; got {paths:?}",
    );
    assert_eq!(
        paths[0],
        PathBuf::from("/usr/lib/debug/.build-id/ab/cdef0123456789.debug"),
        "build-id candidate must split after the first two hex chars \
         (the distro convention documented in the helper's doc block)",
    );
    // The remaining candidates, in lookup order: beside the target, in its
    // `.debug` subdirectory, and mirrored under /usr/lib/debug.
    let debuglink_candidates = [
        "/usr/bin/ktstr-test-target.debug",
        "/usr/bin/.debug/ktstr-test-target.debug",
        "/usr/lib/debug/usr/bin/ktstr-test-target.debug",
    ];
    for (got, want) in paths[1..].iter().zip(debuglink_candidates) {
        assert_eq!(*got, PathBuf::from(want));
    }
}
#[test]
fn candidate_debuginfo_paths_returns_empty_when_no_hints() {
    // Neither a debuglink name nor a build-id is given: there is nothing to search for.
    let paths = candidate_debuginfo_paths(Path::new("/usr/bin/ktstr-test-target"), None, None);
    assert!(
        paths.is_empty(),
        "no debuglink and no build-id means no candidates; \
         got {paths:?}",
    );
}
#[test]
fn candidate_debuginfo_paths_skips_short_build_id() {
    // A one-character build-id cannot be split into <xx>/<rest>.
    let paths = candidate_debuginfo_paths(
        Path::new("/usr/bin/ktstr-test-target"),
        Some("ktstr-test-target.debug"),
        Some("a"),
    );
    assert_eq!(
        paths.len(),
        3,
        "short build-id must be skipped; debuglink paths still emit; \
         got {paths:?}",
    );
    let first = &paths[0];
    assert!(
        !first.to_string_lossy().contains("/.build-id/"),
        "first candidate must be a debuglink path, not a build-id \
         path with a degenerate split; got {:?}",
        first,
    );
}
#[test]
fn candidate_debuginfo_paths_relative_target_skips_lib_debug_root() {
    // A relative target cannot be mirrored under /usr/lib/debug.
    let paths = candidate_debuginfo_paths(
        Path::new("./ktstr-test-target"),
        Some("ktstr-test-target.debug"),
        Some("deadbeef12345678"),
    );
    assert_eq!(
        paths.len(),
        3,
        "relative target must skip lib-debug root; got {paths:?}"
    );
    // Any /usr/lib/debug path that appears must be the build-id candidate.
    let only_build_id_under_lib_debug = paths.iter().all(|p| {
        !p.starts_with("/usr/lib/debug") || p.to_string_lossy().contains(".build-id")
    });
    assert!(
        only_build_id_under_lib_debug,
        "no /usr/lib/debug-rooted debuglink candidate when target \
         parent is relative; got {paths:?}",
    );
}
#[test]
fn test_elf_has_populated_debug_info_section_and_stt_func_symbols() {
    use goblin::elf::sym;
    // Sanity-check the premises the other ELF tests rely on.
    let exe = std::env::current_exe().expect("current_exe");
    let bytes = std::fs::read(&exe).expect("read current_exe");
    let parsed = goblin::elf::Elf::parse(&bytes).expect("parse current_exe");
    assert!(
        section_is_populated(&parsed, &bytes, ".debug_info"),
        "test binary must carry a populated .debug_info section; \
         if this fails, the debuglink-discovery tests above are \
         exercising the wrong code path",
    );
    let has_function_symbol = parsed
        .syms
        .iter()
        .any(|s| s.st_type() == sym::STT_FUNC);
    assert!(
        has_function_symbol,
        "test binary must carry at least one STT_FUNC symbol in \
         .symtab; a fully-stripped binary would have zero and \
         silently invalidate symbol-resolution pin tests",
    );
}
#[test]
fn candidate_debuginfo_paths_build_id_only() {
    // With no debuglink, only the build-id lookup path is produced.
    let paths = candidate_debuginfo_paths(
        Path::new("/usr/bin/ktstr-test-target"),
        None,
        Some("abcdef0123456789"),
    );
    assert_eq!(
        paths.len(),
        1,
        "build-id alone must emit exactly one candidate; got {paths:?}",
    );
    let expected = PathBuf::from("/usr/lib/debug/.build-id/ab/cdef0123456789.debug");
    assert_eq!(paths[0], expected,);
}
#[test]
fn candidate_debuginfo_paths_debuglink_only() {
    let paths = candidate_debuginfo_paths(
        Path::new("/usr/bin/ktstr-test-target"),
        Some("ktstr-test-target.debug"),
        None,
    );
    assert_eq!(
        paths.len(),
        3,
        "debuglink alone on an absolute target must emit exactly three \
         candidates (parent, parent/.debug, /usr/lib/debug + strip-root); \
         got {paths:?}",
    );
    // Lookup order: beside the target, its .debug subdir, the /usr/lib/debug mirror.
    let expected = [
        "/usr/bin/ktstr-test-target.debug",
        "/usr/bin/.debug/ktstr-test-target.debug",
        "/usr/lib/debug/usr/bin/ktstr-test-target.debug",
    ];
    for (got, want) in paths.iter().zip(expected) {
        assert_eq!(*got, PathBuf::from(want));
    }
    let no_build_id = paths
        .iter()
        .all(|p| !p.to_string_lossy().contains(".build-id"));
    assert!(
        no_build_id,
        "no build-id candidate must appear when build_id hint is None; \
         got {paths:?}",
    );
}
#[test]
fn candidate_debuginfo_paths_build_id_exactly_two_chars() {
    // Two characters is the minimum the build-id split accepts.
    let paths = candidate_debuginfo_paths(Path::new("/usr/bin/ktstr-test-target"), None, Some("ab"));
    assert_eq!(
        paths.len(),
        1,
        "2-char build-id must be accepted (>= 2 gate) and produce one \
         candidate; got {paths:?}",
    );
    let degenerate = PathBuf::from("/usr/lib/debug/.build-id/ab/.debug");
    assert_eq!(
        paths[0],
        degenerate,
        "2-char build-id splits into prefix=\"ab\", rest=\"\", producing \
         a degenerate-but-well-formed /usr/lib/debug/.build-id/ab/.debug \
         path (empty hex body between the subdir and the .debug suffix)",
    );
}
#[test]
fn candidate_debuginfo_paths_no_parent_skips_debuglink() {
    // "/" has no parent directory, so no debuglink candidate can be formed.
    let root = Path::new("/");

    let with_build_id =
        candidate_debuginfo_paths(root, Some("orphan.debug"), Some("abcdef0123456789"));
    assert_eq!(
        with_build_id.len(),
        1,
        "root-path target with no parent must skip debuglink candidates; \
         build-id candidate still emits; got {paths:?}",
        paths = with_build_id,
    );
    assert_eq!(
        with_build_id[0],
        PathBuf::from("/usr/lib/debug/.build-id/ab/cdef0123456789.debug"),
    );

    let debuglink_only = candidate_debuginfo_paths(root, Some("orphan.debug"), None);
    assert!(
        debuglink_only.is_empty(),
        "debuglink-only with no parent must produce zero candidates; \
         got {paths:?}",
        paths = debuglink_only,
    );
}
#[test]
fn candidate_debuginfo_paths_empty_build_id_skipped() {
    // An empty build-id string must be treated like an absent one.
    let paths = candidate_debuginfo_paths(
        Path::new("/usr/bin/ktstr-test-target"),
        Some("ktstr-test-target.debug"),
        Some(""),
    );
    assert_eq!(
        paths.len(),
        3,
        "empty build-id must be skipped; debuglink paths still emit \
         (3 on an absolute target); got {paths:?}",
    );
    let no_build_id = paths
        .iter()
        .all(|p| !p.to_string_lossy().contains(".build-id"));
    assert!(
        no_build_id,
        "no build-id candidate must appear when hint is an empty string; \
         got {paths:?}",
    );
}
}