use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
/// One full capture: every thread's state plus host-level and per-cgroup
/// aggregates, serialized to zstd-compressed JSON (see `load` / `write`).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct CtprofSnapshot {
    /// Capture timestamp in nanoseconds since the Unix epoch.
    pub captured_at_unix_ns: u64,
    pub host: Option<crate::host_context::HostContext>,
    /// One entry per sampled TID.
    pub threads: Vec<ThreadState>,
    /// Keyed by cgroup path (BTreeMap keeps JSON output deterministic).
    pub cgroup_stats: BTreeMap<String, CgroupStats>,
    pub probe_summary: Option<CtprofProbeSummary>,
    pub parse_summary: Option<CtprofParseSummary>,
    pub taskstats_summary: Option<crate::taskstats::TaskstatsSummary>,
    /// Host-wide pressure-stall information from `/proc/pressure/*`.
    pub psi: Psi,
    /// Present only when `/sys/kernel/sched_ext` exists.
    pub sched_ext: Option<SchedExtSysfs>,
}
/// Aggregate outcome of per-process probing (jemalloc detection etc.).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct CtprofProbeSummary {
    pub tgids_walked: u64,
    // NOTE(review): counter names suggest per-TGID jemalloc detection and
    // probe success/failure totals — confirm against the producer code.
    pub jemalloc_detected: u64,
    pub probed_ok: u64,
    pub failed: u64,
    /// Most common failure reason, if any failures occurred.
    pub dominant_failure: Option<String>,
    /// True when failures were mostly privilege-related; drives
    /// `remediation_hint`.
    pub privilege_dominant: bool,
}
impl CtprofProbeSummary {
    /// Returns an operator-facing remediation hint when probe failures were
    /// dominated by insufficient privileges; `None` otherwise.
    pub fn remediation_hint(&self) -> Option<&'static str> {
        self.privilege_dominant.then_some(PTRACE_EPERM_HINT)
    }
}
/// Aggregate outcome of walking and parsing per-thread `/proc` files.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct CtprofParseSummary {
    pub tids_walked: u64,
    /// Total failed file reads across all tracked files.
    pub read_failures: u64,
    /// Failure count per leaf file name ("stat", "schedstat", "io", ...).
    pub read_failures_by_file: BTreeMap<String, u64>,
    pub dominant_read_failure: Option<String>,
    /// True when the failure pattern points at missing kernel config;
    /// drives `kernel_config_hint`.
    pub kernel_config_dominant: bool,
    /// Count of negative dotted values seen in `/proc/.../sched`
    /// (tallied by the sched parser and dropped from results).
    pub negative_dotted_values: u64,
}
impl CtprofParseSummary {
    /// Returns a hint about likely-missing kernel config options when the
    /// read-failure pattern points that way; `None` otherwise.
    pub fn kernel_config_hint(&self) -> Option<&'static str> {
        self.kernel_config_dominant.then_some(PARSE_KCONFIG_HINT)
    }
}
/// Hint surfaced by [`CtprofParseSummary::kernel_config_hint`] when
/// schedstat / io read failures dominate the parse-failure tally.
const PARSE_KCONFIG_HINT: &str = "hint: schedstat / io read failures dominate — \
kernel may be built without CONFIG_SCHEDSTATS \
and/or CONFIG_TASK_IO_ACCOUNTING";
/// Serde default for `ThreadState::state`: '~' marks a state that was not
/// present in the serialized input.
fn default_state_char() -> char {
    '~'
}
/// One sampled thread: identity plus counters gathered from the thread's
/// `/proc` files and, when available, taskstats delay accounting
/// (see `apply_delay_stats`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct ThreadState {
    // --- Identity and placement ---
    pub tid: u32,
    pub tgid: u32,
    // NOTE(review): presumably the thread-group leader's comm vs. this
    // thread's comm — confirm against the fill code.
    pub pcomm: String,
    pub comm: String,
    /// cgroup-v2 path (the "0::" line of /proc/.../cgroup).
    pub cgroup: String,
    pub start_time_clock_ticks: u64,
    pub policy: crate::metric_types::CategoricalString,
    pub nice: crate::metric_types::OrdinalI32,
    pub cpu_affinity: crate::metric_types::CpuSet,
    pub processor: crate::metric_types::OrdinalI32,
    // '~' when the serialized snapshot predates this field.
    #[serde(default = "default_state_char")]
    pub state: char,
    /// sched_ext enablement as reported by the "ext.enabled" sched key.
    pub ext_enabled: bool,
    // --- Scheduler runtime / wakeup / migration counters ---
    pub run_time_ns: crate::metric_types::MonotonicNs,
    pub wait_time_ns: crate::metric_types::MonotonicNs,
    pub timeslices: crate::metric_types::MonotonicCount,
    pub voluntary_csw: crate::metric_types::MonotonicCount,
    pub nonvoluntary_csw: crate::metric_types::MonotonicCount,
    pub nr_wakeups: crate::metric_types::MonotonicCount,
    pub nr_wakeups_local: crate::metric_types::MonotonicCount,
    pub nr_wakeups_remote: crate::metric_types::MonotonicCount,
    pub nr_wakeups_sync: crate::metric_types::MonotonicCount,
    pub nr_wakeups_migrate: crate::metric_types::MonotonicCount,
    pub nr_wakeups_affine: crate::metric_types::MonotonicCount,
    pub nr_wakeups_affine_attempts: crate::metric_types::MonotonicCount,
    pub nr_migrations: crate::metric_types::MonotonicCount,
    pub nr_forced_migrations: crate::metric_types::MonotonicCount,
    pub nr_failed_migrations_affine: crate::metric_types::MonotonicCount,
    pub nr_failed_migrations_running: crate::metric_types::MonotonicCount,
    pub nr_failed_migrations_hot: crate::metric_types::MonotonicCount,
    // --- Wait / sleep / block / iowait statistics (dotted ms values
    // converted to ns by the sched parser) ---
    pub wait_sum: crate::metric_types::MonotonicNs,
    pub wait_count: crate::metric_types::MonotonicCount,
    pub wait_max: crate::metric_types::PeakNs,
    pub voluntary_sleep_ns: crate::metric_types::MonotonicNs,
    pub sleep_max: crate::metric_types::PeakNs,
    pub block_sum: crate::metric_types::MonotonicNs,
    pub block_max: crate::metric_types::PeakNs,
    pub iowait_sum: crate::metric_types::MonotonicNs,
    pub iowait_count: crate::metric_types::MonotonicCount,
    pub exec_max: crate::metric_types::PeakNs,
    pub slice_max: crate::metric_types::PeakNs,
    // --- Memory allocation (jemalloc probe) ---
    pub allocated_bytes: crate::metric_types::Bytes,
    pub deallocated_bytes: crate::metric_types::Bytes,
    // --- /proc stat fields ---
    pub minflt: crate::metric_types::MonotonicCount,
    pub majflt: crate::metric_types::MonotonicCount,
    pub utime_clock_ticks: crate::metric_types::ClockTicks,
    pub stime_clock_ticks: crate::metric_types::ClockTicks,
    pub priority: crate::metric_types::OrdinalI32,
    pub rt_priority: crate::metric_types::OrdinalU32,
    pub core_forceidle_sum: crate::metric_types::MonotonicNs,
    pub fair_slice_ns: crate::metric_types::GaugeNs,
    pub nr_threads: crate::metric_types::GaugeCount,
    /// smaps_rollup counters in kB, keyed by field name ("Rss", "Pss", ...);
    /// only populated for the group leader (tid == tgid).
    pub smaps_rollup_kb: BTreeMap<String, u64>,
    // --- /proc io fields ---
    pub rchar: crate::metric_types::Bytes,
    pub wchar: crate::metric_types::Bytes,
    pub syscr: crate::metric_types::MonotonicCount,
    pub syscw: crate::metric_types::MonotonicCount,
    pub read_bytes: crate::metric_types::Bytes,
    pub write_bytes: crate::metric_types::Bytes,
    pub cancelled_write_bytes: crate::metric_types::Bytes,
    // --- Taskstats delay accounting (filled by `apply_delay_stats`) ---
    pub cpu_delay_count: crate::metric_types::MonotonicCount,
    pub cpu_delay_total_ns: crate::metric_types::MonotonicNs,
    pub cpu_delay_max_ns: crate::metric_types::PeakNs,
    pub cpu_delay_min_ns: crate::metric_types::PeakNs,
    pub blkio_delay_count: crate::metric_types::MonotonicCount,
    pub blkio_delay_total_ns: crate::metric_types::MonotonicNs,
    pub blkio_delay_max_ns: crate::metric_types::PeakNs,
    pub blkio_delay_min_ns: crate::metric_types::PeakNs,
    pub swapin_delay_count: crate::metric_types::MonotonicCount,
    pub swapin_delay_total_ns: crate::metric_types::MonotonicNs,
    pub swapin_delay_max_ns: crate::metric_types::PeakNs,
    pub swapin_delay_min_ns: crate::metric_types::PeakNs,
    pub freepages_delay_count: crate::metric_types::MonotonicCount,
    pub freepages_delay_total_ns: crate::metric_types::MonotonicNs,
    pub freepages_delay_max_ns: crate::metric_types::PeakNs,
    pub freepages_delay_min_ns: crate::metric_types::PeakNs,
    pub thrashing_delay_count: crate::metric_types::MonotonicCount,
    pub thrashing_delay_total_ns: crate::metric_types::MonotonicNs,
    pub thrashing_delay_max_ns: crate::metric_types::PeakNs,
    pub thrashing_delay_min_ns: crate::metric_types::PeakNs,
    pub compact_delay_count: crate::metric_types::MonotonicCount,
    pub compact_delay_total_ns: crate::metric_types::MonotonicNs,
    pub compact_delay_max_ns: crate::metric_types::PeakNs,
    pub compact_delay_min_ns: crate::metric_types::PeakNs,
    pub wpcopy_delay_count: crate::metric_types::MonotonicCount,
    pub wpcopy_delay_total_ns: crate::metric_types::MonotonicNs,
    pub wpcopy_delay_max_ns: crate::metric_types::PeakNs,
    pub wpcopy_delay_min_ns: crate::metric_types::PeakNs,
    pub irq_delay_count: crate::metric_types::MonotonicCount,
    pub irq_delay_total_ns: crate::metric_types::MonotonicNs,
    pub irq_delay_max_ns: crate::metric_types::PeakNs,
    pub irq_delay_min_ns: crate::metric_types::PeakNs,
    pub hiwater_rss_bytes: crate::metric_types::PeakBytes,
    pub hiwater_vm_bytes: crate::metric_types::PeakBytes,
}
impl Default for ThreadState {
    /// All fields take their type's default; `state` uses the same '~'
    /// placeholder as the serde default so freshly-built and deserialized
    /// values agree. (A derived Default would use '\0' for `state`, hence
    /// the manual impl.)
    fn default() -> Self {
        Self {
            tid: 0,
            tgid: 0,
            pcomm: String::new(),
            comm: String::new(),
            cgroup: String::new(),
            start_time_clock_ticks: 0,
            policy: Default::default(),
            nice: Default::default(),
            cpu_affinity: Default::default(),
            processor: Default::default(),
            state: default_state_char(),
            ext_enabled: false,
            run_time_ns: Default::default(),
            wait_time_ns: Default::default(),
            timeslices: Default::default(),
            voluntary_csw: Default::default(),
            nonvoluntary_csw: Default::default(),
            nr_wakeups: Default::default(),
            nr_wakeups_local: Default::default(),
            nr_wakeups_remote: Default::default(),
            nr_wakeups_sync: Default::default(),
            nr_wakeups_migrate: Default::default(),
            nr_wakeups_affine: Default::default(),
            nr_wakeups_affine_attempts: Default::default(),
            nr_migrations: Default::default(),
            nr_forced_migrations: Default::default(),
            nr_failed_migrations_affine: Default::default(),
            nr_failed_migrations_running: Default::default(),
            nr_failed_migrations_hot: Default::default(),
            wait_sum: Default::default(),
            wait_count: Default::default(),
            wait_max: Default::default(),
            voluntary_sleep_ns: Default::default(),
            sleep_max: Default::default(),
            block_sum: Default::default(),
            block_max: Default::default(),
            iowait_sum: Default::default(),
            iowait_count: Default::default(),
            exec_max: Default::default(),
            slice_max: Default::default(),
            allocated_bytes: Default::default(),
            deallocated_bytes: Default::default(),
            minflt: Default::default(),
            majflt: Default::default(),
            utime_clock_ticks: Default::default(),
            stime_clock_ticks: Default::default(),
            priority: Default::default(),
            rt_priority: Default::default(),
            core_forceidle_sum: Default::default(),
            fair_slice_ns: Default::default(),
            nr_threads: Default::default(),
            smaps_rollup_kb: BTreeMap::new(),
            rchar: Default::default(),
            wchar: Default::default(),
            syscr: Default::default(),
            syscw: Default::default(),
            read_bytes: Default::default(),
            write_bytes: Default::default(),
            cancelled_write_bytes: Default::default(),
            cpu_delay_count: Default::default(),
            cpu_delay_total_ns: Default::default(),
            cpu_delay_max_ns: Default::default(),
            cpu_delay_min_ns: Default::default(),
            blkio_delay_count: Default::default(),
            blkio_delay_total_ns: Default::default(),
            blkio_delay_max_ns: Default::default(),
            blkio_delay_min_ns: Default::default(),
            swapin_delay_count: Default::default(),
            swapin_delay_total_ns: Default::default(),
            swapin_delay_max_ns: Default::default(),
            swapin_delay_min_ns: Default::default(),
            freepages_delay_count: Default::default(),
            freepages_delay_total_ns: Default::default(),
            freepages_delay_max_ns: Default::default(),
            freepages_delay_min_ns: Default::default(),
            thrashing_delay_count: Default::default(),
            thrashing_delay_total_ns: Default::default(),
            thrashing_delay_max_ns: Default::default(),
            thrashing_delay_min_ns: Default::default(),
            compact_delay_count: Default::default(),
            compact_delay_total_ns: Default::default(),
            compact_delay_max_ns: Default::default(),
            compact_delay_min_ns: Default::default(),
            wpcopy_delay_count: Default::default(),
            wpcopy_delay_total_ns: Default::default(),
            wpcopy_delay_max_ns: Default::default(),
            wpcopy_delay_min_ns: Default::default(),
            irq_delay_count: Default::default(),
            irq_delay_total_ns: Default::default(),
            irq_delay_max_ns: Default::default(),
            irq_delay_min_ns: Default::default(),
            hiwater_rss_bytes: Default::default(),
            hiwater_vm_bytes: Default::default(),
        }
    }
}
impl ThreadState {
    /// Copies taskstats delay-accounting counters and the RSS/VM high-water
    /// marks from `ds` into the corresponding snapshot fields, wrapping each
    /// raw value in its metric newtype.
    pub(crate) fn apply_delay_stats(&mut self, ds: &crate::taskstats::DelayStats) {
        use crate::metric_types::{MonotonicCount, MonotonicNs, PeakBytes, PeakNs};
        self.cpu_delay_count = MonotonicCount(ds.cpu_count);
        self.cpu_delay_total_ns = MonotonicNs(ds.cpu_delay_total_ns);
        self.cpu_delay_max_ns = PeakNs(ds.cpu_delay_max_ns);
        self.cpu_delay_min_ns = PeakNs(ds.cpu_delay_min_ns);
        self.blkio_delay_count = MonotonicCount(ds.blkio_count);
        self.blkio_delay_total_ns = MonotonicNs(ds.blkio_delay_total_ns);
        self.blkio_delay_max_ns = PeakNs(ds.blkio_delay_max_ns);
        self.blkio_delay_min_ns = PeakNs(ds.blkio_delay_min_ns);
        self.swapin_delay_count = MonotonicCount(ds.swapin_count);
        self.swapin_delay_total_ns = MonotonicNs(ds.swapin_delay_total_ns);
        self.swapin_delay_max_ns = PeakNs(ds.swapin_delay_max_ns);
        self.swapin_delay_min_ns = PeakNs(ds.swapin_delay_min_ns);
        self.freepages_delay_count = MonotonicCount(ds.freepages_count);
        self.freepages_delay_total_ns = MonotonicNs(ds.freepages_delay_total_ns);
        self.freepages_delay_max_ns = PeakNs(ds.freepages_delay_max_ns);
        self.freepages_delay_min_ns = PeakNs(ds.freepages_delay_min_ns);
        self.thrashing_delay_count = MonotonicCount(ds.thrashing_count);
        self.thrashing_delay_total_ns = MonotonicNs(ds.thrashing_delay_total_ns);
        self.thrashing_delay_max_ns = PeakNs(ds.thrashing_delay_max_ns);
        self.thrashing_delay_min_ns = PeakNs(ds.thrashing_delay_min_ns);
        self.compact_delay_count = MonotonicCount(ds.compact_count);
        self.compact_delay_total_ns = MonotonicNs(ds.compact_delay_total_ns);
        self.compact_delay_max_ns = PeakNs(ds.compact_delay_max_ns);
        self.compact_delay_min_ns = PeakNs(ds.compact_delay_min_ns);
        self.wpcopy_delay_count = MonotonicCount(ds.wpcopy_count);
        self.wpcopy_delay_total_ns = MonotonicNs(ds.wpcopy_delay_total_ns);
        self.wpcopy_delay_max_ns = PeakNs(ds.wpcopy_delay_max_ns);
        self.wpcopy_delay_min_ns = PeakNs(ds.wpcopy_delay_min_ns);
        self.irq_delay_count = MonotonicCount(ds.irq_count);
        self.irq_delay_total_ns = MonotonicNs(ds.irq_delay_total_ns);
        self.irq_delay_max_ns = PeakNs(ds.irq_delay_max_ns);
        self.irq_delay_min_ns = PeakNs(ds.irq_delay_min_ns);
        self.hiwater_rss_bytes = PeakBytes(ds.hiwater_rss_bytes);
        self.hiwater_vm_bytes = PeakBytes(ds.hiwater_vm_bytes);
    }
    /// Iterates `smaps_rollup_kb`, converting each kB value to bytes with a
    /// saturating multiply by 1024.
    pub fn smaps_rollup_bytes(
        &self,
    ) -> impl Iterator<Item = (&String, crate::metric_types::Bytes)> {
        self.smaps_rollup_kb
            .iter()
            .map(|(k, v)| (k, crate::metric_types::Bytes(v.saturating_mul(1024))))
    }
}
/// Per-cgroup stats read from the cgroup-v2 filesystem.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct CgroupStats {
    pub cpu: CgroupCpuStats,
    pub memory: CgroupMemoryStats,
    pub pids: CgroupPidsStats,
    /// From the cgroup's `*.pressure` files.
    pub psi: Psi,
}
/// From `cpu.stat`, `cpu.max`, `cpu.weight`, and `cpu.weight.nice`.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct CgroupCpuStats {
    pub usage_usec: u64,
    pub nr_throttled: u64,
    pub throttled_usec: u64,
    /// `None` means the quota is "max" (unlimited).
    pub max_quota_us: Option<u64>,
    pub max_period_us: u64,
    /// `None` when `cpu.weight` is absent/unreadable.
    pub weight: Option<u64>,
    pub weight_nice: Option<i32>,
}
/// From the cgroup's `memory.*` files.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct CgroupMemoryStats {
    pub current: u64,
    /// `None` means "max" (no limit); same for `high`.
    pub max: Option<u64>,
    pub high: Option<u64>,
    /// Floor values: "max" in the file maps to `Some(u64::MAX)` here.
    pub low: Option<u64>,
    pub min: Option<u64>,
    /// Key/value counters from `memory.stat`.
    pub stat: BTreeMap<String, u64>,
    // NOTE(review): presumably key/value counters from `memory.events` —
    // the fill code is outside this view; confirm.
    pub events: BTreeMap<String, u64>,
}
/// From the cgroup's `pids.*` files (values `None` when absent/unreadable).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct CgroupPidsStats {
    pub current: Option<u64>,
    pub max: Option<u64>,
}
/// One line ("some" or "full") of a PSI pressure file.
///
/// The rolling averages are stored in hundredths of a percent so the struct
/// stays integer-only; use the `*_percent` accessors for the float values.
#[derive(Debug, Clone, Copy, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct PsiHalf {
    pub avg10: u16,
    pub avg60: u16,
    pub avg300: u16,
    /// The `total=` field: cumulative stall time in microseconds.
    pub total_usec: u64,
}
impl PsiHalf {
    /// `avg10` as a percentage (stored in hundredths of a percent).
    pub fn avg10_percent(&self) -> f64 {
        f64::from(self.avg10) / 100.0
    }
    /// `avg60` as a percentage (stored in hundredths of a percent).
    pub fn avg60_percent(&self) -> f64 {
        f64::from(self.avg60) / 100.0
    }
    /// `avg300` as a percentage (stored in hundredths of a percent).
    pub fn avg300_percent(&self) -> f64 {
        f64::from(self.avg300) / 100.0
    }
}
/// Both halves ("some" and "full" lines) of one PSI pressure file.
#[derive(Debug, Clone, Copy, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct PsiResource {
    pub some: PsiHalf,
    pub full: PsiHalf,
}
/// PSI for all four tracked resources (host `/proc/pressure/*` or a
/// cgroup's `*.pressure` files).
#[derive(Debug, Clone, Copy, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct Psi {
    pub cpu: PsiResource,
    pub memory: PsiResource,
    pub io: PsiResource,
    pub irq: PsiResource,
}
/// Values read from `/sys/kernel/sched_ext/*` (one file per field).
/// Numeric fields fall back to 0 and `state` to "" on read/parse failure.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct SchedExtSysfs {
    pub state: String,
    pub switch_all: u64,
    pub nr_rejected: u64,
    pub hotplug_seq: u64,
    pub enable_seq: u64,
}
/// Parses one PSI pressure file, e.g.
/// `some avg10=0.00 avg60=0.00 avg300=0.00 total=0`.
/// Unknown line prefixes and malformed `key=value` tokens are skipped.
fn parse_psi(raw: &str) -> PsiResource {
    let mut res = PsiResource::default();
    for line in raw.lines() {
        let mut fields = line.split_whitespace();
        let target = match fields.next() {
            Some("some") => &mut res.some,
            Some("full") => &mut res.full,
            _ => continue,
        };
        for field in fields {
            if let Some((name, raw_value)) = field.split_once('=') {
                match name {
                    "avg10" => target.avg10 = parse_centi_percent(raw_value),
                    "avg60" => target.avg60 = parse_centi_percent(raw_value),
                    "avg300" => target.avg300 = parse_centi_percent(raw_value),
                    "total" => target.total_usec = raw_value.parse::<u64>().unwrap_or(0),
                    _ => {}
                }
            }
        }
    }
    res
}
/// Converts a decimal percentage string ("12.34") to hundredths of a
/// percent (1234), saturating at `u16::MAX`. Malformed input yields 0.
fn parse_centi_percent(s: &str) -> u16 {
    let (whole_str, frac_str) = s.split_once('.').unwrap_or((s, ""));
    let Ok(whole) = whole_str.parse::<u32>() else {
        return 0;
    };
    let frac = match frac_str.is_empty() {
        true => 0,
        false => {
            // Right-pad (or truncate) the fraction to exactly two digits so
            // ".5" means 50 and ".234" means 23.
            let two_digits: String = frac_str
                .chars()
                .chain(std::iter::repeat('0'))
                .take(2)
                .collect();
            two_digits.parse::<u32>().unwrap_or(0)
        }
    };
    whole
        .saturating_mul(100)
        .saturating_add(frac)
        .try_into()
        .unwrap_or(u16::MAX)
}
/// Reads host-wide PSI from `<proc_root>/pressure/{cpu,memory,io,irq}`.
fn read_host_psi_at(proc_root: &Path) -> Psi {
    let read = |name: &str| read_psi_file_at(&proc_root.join("pressure").join(name));
    Psi {
        cpu: read("cpu"),
        memory: read("memory"),
        io: read("io"),
        irq: read("irq"),
    }
}
/// Reads `<sys_root>/kernel/sched_ext/*`; `None` when the directory is
/// absent (sched_ext not available). Individual file failures degrade to
/// field defaults rather than failing the whole read.
fn read_sched_ext_sysfs_at(sys_root: &Path) -> Option<SchedExtSysfs> {
    let dir = sys_root.join("kernel").join("sched_ext");
    if !dir.exists() {
        return None;
    }
    let state = fs::read_to_string(dir.join("state"))
        .map(|s| s.trim().to_string())
        .unwrap_or_default();
    Some(SchedExtSysfs {
        state,
        switch_all: read_sysfs_u64(&dir.join("switch_all")),
        nr_rejected: read_sysfs_u64(&dir.join("nr_rejected")),
        hotplug_seq: read_sysfs_u64(&dir.join("hotplug_seq")),
        enable_seq: read_sysfs_u64(&dir.join("enable_seq")),
    })
}
/// Reads a single-value sysfs file as `u64`; read or parse failure → 0.
fn read_sysfs_u64(path: &Path) -> u64 {
    match fs::read_to_string(path) {
        Ok(contents) => contents.trim().parse::<u64>().unwrap_or(0),
        Err(_) => 0,
    }
}
/// Reads the `*.pressure` files of the cgroup at `path` (a cgroup-v2 path
/// such as "/system.slice/foo") under `cgroup_root`.
fn read_cgroup_psi_at(cgroup_root: &Path, path: &str) -> Psi {
    let rel = path.strip_prefix('/').unwrap_or(path);
    // "" / "/" refer to the root cgroup itself.
    let dir = match rel.is_empty() {
        true => cgroup_root.to_path_buf(),
        false => cgroup_root.join(rel),
    };
    let read = |leaf: &str| read_psi_file_at(&dir.join(leaf));
    Psi {
        cpu: read("cpu.pressure"),
        memory: read("memory.pressure"),
        io: read("io.pressure"),
        irq: read("irq.pressure"),
    }
}
fn read_psi_file_at(path: &Path) -> PsiResource {
fs::read_to_string(path)
.ok()
.as_deref()
.map(parse_psi)
.unwrap_or_default()
}
impl CtprofSnapshot {
    /// Loads a snapshot: read `path`, zstd-decompress under the
    /// decompression-bomb cap, then parse the JSON payload.
    pub fn load(path: &std::path::Path) -> anyhow::Result<Self> {
        use anyhow::Context;
        let compressed = std::fs::read(path)
            .with_context(|| format!("read ctprof snapshot from {}", path.display()))?;
        let json = decompress_capped(&compressed, MAX_DECOMPRESSED_SNAPSHOT_BYTES)
            .with_context(|| format!("zstd decompress ctprof snapshot {}", path.display()))?;
        serde_json::from_slice(&json).with_context(|| {
            format!(
                "parse ctprof snapshot JSON from {} (did the capture format change?)",
                path.display(),
            )
        })
    }
    /// Serializes the snapshot to JSON, zstd-compresses it (level 3), and
    /// writes the result to `path`.
    pub fn write(&self, path: &std::path::Path) -> anyhow::Result<()> {
        use anyhow::Context;
        let json = serde_json::to_vec(self).context("serialize ctprof snapshot to JSON")?;
        let compressed =
            zstd::encode_all(json.as_slice(), 3).context("zstd compress ctprof snapshot")?;
        std::fs::write(path, compressed)
            .with_context(|| format!("write ctprof snapshot to {}", path.display()))?;
        Ok(())
    }
}
/// Zstd-decompresses `bytes`, refusing payloads larger than
/// `max_decompressed` (decompression-bomb guard).
fn decompress_capped(bytes: &[u8], max_decompressed: u64) -> anyhow::Result<Vec<u8>> {
    use std::io::Read;
    let mut out = Vec::new();
    // Read at most cap+1 bytes: one byte past the limit is enough to detect
    // an oversized payload without buffering the whole thing.
    zstd::stream::read::Decoder::new(bytes)?
        .take(max_decompressed.saturating_add(1))
        .read_to_end(&mut out)?;
    anyhow::ensure!(
        out.len() as u64 <= max_decompressed,
        "zstd-decompressed payload exceeds the {}-byte cap (decompression-bomb guard)",
        max_decompressed,
    );
    Ok(out)
}
/// File extension for serialized snapshots (zstd-compressed JSON).
pub const SNAPSHOT_EXTENSION: &str = "ctprof.zst";
/// Cap enforced by `decompress_capped` when loading snapshots (256 MiB).
pub const MAX_DECOMPRESSED_SNAPSHOT_BYTES: u64 = 256 * 1024 * 1024;
// Default filesystem roots; the `*_at` helpers take the root as a parameter
// so tests can point them elsewhere.
pub const DEFAULT_PROC_ROOT: &str = "/proc";
pub const DEFAULT_CGROUP_ROOT: &str = "/sys/fs/cgroup";
pub const DEFAULT_SYS_ROOT: &str = "/sys";
/// Builds `<proc_root>/<tgid>/task/<tid>/<leaf>`.
fn task_file(proc_root: &Path, tgid: i32, tid: i32, leaf: &str) -> PathBuf {
    let mut path = proc_root.join(tgid.to_string());
    path.push("task");
    path.push(tid.to_string());
    path.push(leaf);
    path
}
/// Builds `<proc_root>/<tgid>/<leaf>`.
fn proc_file(proc_root: &Path, tgid: i32, leaf: &str) -> PathBuf {
    let mut path = proc_root.join(tgid.to_string());
    path.push(leaf);
    path
}
/// Maps a scheduler policy number to its symbolic name.
///
/// SCHED_DEADLINE (6) and SCHED_EXT (7) are matched numerically because
/// libc does not export constants for them.
fn policy_name(policy: i32) -> String {
    let known = match policy {
        libc::SCHED_OTHER => "SCHED_OTHER",
        libc::SCHED_FIFO => "SCHED_FIFO",
        libc::SCHED_RR => "SCHED_RR",
        libc::SCHED_BATCH => "SCHED_BATCH",
        libc::SCHED_IDLE => "SCHED_IDLE",
        6 => "SCHED_DEADLINE",
        7 => "SCHED_EXT",
        other => return format!("SCHED_UNKNOWN({other})"),
    };
    known.to_string()
}
/// Lists numeric (> 0) entries of `proc_root`, sorted ascending. An
/// unreadable directory yields an empty list.
fn iter_tgids_at(proc_root: &Path) -> Vec<i32> {
    let mut tgids = Vec::new();
    if let Ok(entries) = fs::read_dir(proc_root) {
        for entry in entries.flatten() {
            let parsed = entry.file_name().to_str().and_then(|s| s.parse::<i32>().ok());
            if let Some(pid) = parsed {
                if pid > 0 {
                    tgids.push(pid);
                }
            }
        }
    }
    tgids.sort_unstable();
    tgids
}
/// Lists numeric (> 0) entries of `<proc_root>/<tgid>/task`, sorted
/// ascending. An unreadable directory yields an empty list.
fn iter_task_ids_at(proc_root: &Path, tgid: i32) -> Vec<i32> {
    let task_dir = proc_root.join(tgid.to_string()).join("task");
    let mut tids = Vec::new();
    if let Ok(entries) = fs::read_dir(&task_dir) {
        for entry in entries.flatten() {
            let parsed = entry.file_name().to_str().and_then(|s| s.parse::<i32>().ok());
            if let Some(tid) = parsed {
                if tid > 0 {
                    tids.push(tid);
                }
            }
        }
    }
    tids.sort_unstable();
    tids
}
/// Reads `<proc_root>/<tgid>/comm`, trimmed; `None` on read failure or
/// when the trimmed content is empty.
fn read_process_comm_at(proc_root: &Path, tgid: i32) -> Option<String> {
    let raw = fs::read_to_string(proc_file(proc_root, tgid, "comm")).ok()?;
    let comm = raw.trim();
    (!comm.is_empty()).then(|| comm.to_string())
}
/// Reads `<proc_root>/<tgid>/task/<tid>/comm`, trimmed; `None` on read
/// failure or when the trimmed content is empty.
fn read_thread_comm_at(proc_root: &Path, tgid: i32, tid: i32) -> Option<String> {
    let raw = fs::read_to_string(task_file(proc_root, tgid, tid, "comm")).ok()?;
    let comm = raw.trim();
    (!comm.is_empty()).then(|| comm.to_string())
}
/// The subset of `/proc/.../stat` fields we keep; each is `None` when the
/// field was absent or unparseable.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct StatFields {
    minflt: Option<u64>,
    majflt: Option<u64>,
    utime_clock_ticks: Option<u64>,
    stime_clock_ticks: Option<u64>,
    priority: Option<i32>,
    nice: Option<i32>,
    start_time_clock_ticks: Option<u64>,
    processor: Option<i32>,
    rt_priority: Option<u32>,
    policy: Option<i32>,
}
/// Parses the first line of `/proc/.../stat`.
///
/// The comm field may itself contain spaces and ')' characters, so the
/// fields are taken from after the *last* ')'. Indices below are relative
/// to that tail: stat field N maps to index N-3 (state is field 3, index 0).
fn parse_stat(raw: &str) -> StatFields {
    let Some(first_line) = raw.lines().next() else {
        return StatFields::default();
    };
    let tail = match first_line.rfind(')').and_then(|i| first_line.get(i + 1..)) {
        Some(t) => t,
        None => return StatFields::default(),
    };
    let fields: Vec<&str> = tail.split_ascii_whitespace().collect();
    // Generic checked fetch-and-parse; the target type comes from the
    // struct field it is assigned to.
    fn num<T: std::str::FromStr>(fields: &[&str], idx: usize) -> Option<T> {
        fields.get(idx).and_then(|s| s.parse::<T>().ok())
    }
    StatFields {
        minflt: num(&fields, 7),
        majflt: num(&fields, 9),
        utime_clock_ticks: num(&fields, 11),
        stime_clock_ticks: num(&fields, 12),
        priority: num(&fields, 15),
        nice: num(&fields, 16),
        start_time_clock_ticks: num(&fields, 19),
        processor: num(&fields, 36),
        rt_priority: num(&fields, 37),
        policy: num(&fields, 38),
    }
}
/// Reads and parses the thread's `stat` file; a failed read records a
/// "stat" failure in `tally` (when present) and yields defaults.
fn read_stat_at_with_tally(
    proc_root: &Path,
    tgid: i32,
    tid: i32,
    tally: &mut Option<&mut ParseTally>,
) -> StatFields {
    let Ok(raw) = fs::read_to_string(task_file(proc_root, tgid, tid, "stat")) else {
        if let Some(t) = tally.as_mut() {
            t.record_failure("stat");
        }
        return StatFields::default();
    };
    parse_stat(&raw)
}
/// Parses the first line of `/proc/.../schedstat`:
/// `(run_time_ns, wait_time_ns, timeslices)`, each `None` if missing or
/// unparseable.
fn parse_schedstat(raw: &str) -> (Option<u64>, Option<u64>, Option<u64>) {
    let mut fields = raw
        .lines()
        .next()
        .unwrap_or("")
        .split_ascii_whitespace()
        .map(|s| s.parse::<u64>().ok());
    (
        fields.next().flatten(),
        fields.next().flatten(),
        fields.next().flatten(),
    )
}
/// Reads and parses the thread's `schedstat` file; a failed read records a
/// "schedstat" failure in `tally` (when present) and yields all-`None`.
fn read_schedstat_at_with_tally(
    proc_root: &Path,
    tgid: i32,
    tid: i32,
    tally: &mut Option<&mut ParseTally>,
) -> (Option<u64>, Option<u64>, Option<u64>) {
    let Ok(raw) = fs::read_to_string(task_file(proc_root, tgid, tid, "schedstat")) else {
        if let Some(t) = tally.as_mut() {
            t.record_failure("schedstat");
        }
        return (None, None, None);
    };
    parse_schedstat(&raw)
}
/// Counters parsed from `/proc/.../io`; each is `None` when the key was
/// absent or its value unparseable.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct IoFields {
    rchar: Option<u64>,
    wchar: Option<u64>,
    syscr: Option<u64>,
    syscw: Option<u64>,
    read_bytes: Option<u64>,
    write_bytes: Option<u64>,
    cancelled_write_bytes: Option<u64>,
}
/// Parses `/proc/.../io` ("key: value" lines). Unknown keys are ignored.
fn parse_io(raw: &str) -> IoFields {
    let mut fields = IoFields::default();
    for line in raw.lines() {
        let Some((name, rest)) = line.split_once(':') else {
            continue;
        };
        let value = rest.trim().parse::<u64>().ok();
        let slot = match name.trim() {
            "rchar" => &mut fields.rchar,
            "wchar" => &mut fields.wchar,
            "syscr" => &mut fields.syscr,
            "syscw" => &mut fields.syscw,
            "read_bytes" => &mut fields.read_bytes,
            "write_bytes" => &mut fields.write_bytes,
            "cancelled_write_bytes" => &mut fields.cancelled_write_bytes,
            _ => continue,
        };
        *slot = value;
    }
    fields
}
/// Reads and parses the thread's `io` file; a failed read records an "io"
/// failure in `tally` (when present) and yields defaults.
fn read_io_at_with_tally(
    proc_root: &Path,
    tgid: i32,
    tid: i32,
    tally: &mut Option<&mut ParseTally>,
) -> IoFields {
    let Ok(raw) = fs::read_to_string(task_file(proc_root, tgid, tid, "io")) else {
        if let Some(t) = tally.as_mut() {
            t.record_failure("io");
        }
        return IoFields::default();
    };
    parse_io(&raw)
}
/// Fields parsed from `/proc/.../status`; each is `None` when the key was
/// absent or its value unparseable.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct StatusFields {
    voluntary_csw: Option<u64>,
    nonvoluntary_csw: Option<u64>,
    /// First character of the "State" value (e.g. 'R', 'S', 'D').
    state: Option<char>,
    /// Expanded "Cpus_allowed_list" (parsed by `cpu_util::parse_cpu_list`).
    cpus_allowed: Option<Vec<u32>>,
    nr_threads: Option<u64>,
}
/// Parses `/proc/.../status` ("Key:\tvalue" lines). Unknown keys are
/// ignored.
fn parse_status(raw: &str) -> StatusFields {
    let mut fields = StatusFields::default();
    for line in raw.lines() {
        let Some((name, rest)) = line.split_once(':') else {
            continue;
        };
        let value = rest.trim();
        match name.trim() {
            // "State" looks like "S (sleeping)"; keep just the letter.
            "State" => fields.state = value.chars().next(),
            "voluntary_ctxt_switches" => fields.voluntary_csw = value.parse::<u64>().ok(),
            "nonvoluntary_ctxt_switches" => {
                fields.nonvoluntary_csw = value.parse::<u64>().ok();
            }
            "Cpus_allowed_list" => {
                fields.cpus_allowed = crate::cpu_util::parse_cpu_list(value);
            }
            "Threads" => fields.nr_threads = value.parse::<u64>().ok(),
            _ => {}
        }
    }
    fields
}
/// Reads and parses the thread's `status` file; a failed read records a
/// "status" failure in `tally` (when present) and yields defaults.
fn read_status_at_with_tally(
    proc_root: &Path,
    tgid: i32,
    tid: i32,
    tally: &mut Option<&mut ParseTally>,
) -> StatusFields {
    let Ok(raw) = fs::read_to_string(task_file(proc_root, tgid, tid, "status")) else {
        if let Some(t) = tally.as_mut() {
            t.record_failure("status");
        }
        return StatusFields::default();
    };
    parse_status(&raw)
}
/// Test-only convenience wrapper that reads a thread's cgroup path while
/// discarding parse-failure accounting.
#[cfg(test)]
fn read_cgroup_at(proc_root: &Path, tgid: i32, tid: i32) -> Option<String> {
    read_cgroup_at_with_tally(proc_root, tgid, tid, &mut None)
}
/// Reads the thread's `cgroup` file and extracts the cgroup-v2 path; a
/// failed read records a "cgroup" failure in `tally` (when present).
fn read_cgroup_at_with_tally(
    proc_root: &Path,
    tgid: i32,
    tid: i32,
    tally: &mut Option<&mut ParseTally>,
) -> Option<String> {
    let Ok(raw) = fs::read_to_string(task_file(proc_root, tgid, tid, "cgroup")) else {
        if let Some(t) = tally.as_mut() {
            t.record_failure("cgroup");
        }
        return None;
    };
    parse_cgroup_v2(&raw)
}
/// Extracts the cgroup-v2 path from a `/proc/.../cgroup` file: the first
/// "0::" line with a non-empty trimmed remainder. `None` when no such line
/// exists (e.g. v1-only content).
fn parse_cgroup_v2(raw: &str) -> Option<String> {
    raw.lines()
        .filter_map(|line| line.strip_prefix("0::"))
        .map(str::trim)
        .find(|path| !path.is_empty())
        .map(str::to_string)
}
/// Fields parsed from `/proc/.../sched`; each is `None` when the key was
/// absent, unparseable, or (for dotted values) negative. Dotted values are
/// converted from ms.fraction to nanoseconds by the parser.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct SchedFields {
    nr_wakeups: Option<u64>,
    nr_wakeups_local: Option<u64>,
    nr_wakeups_remote: Option<u64>,
    nr_wakeups_sync: Option<u64>,
    nr_wakeups_migrate: Option<u64>,
    nr_wakeups_affine: Option<u64>,
    nr_wakeups_affine_attempts: Option<u64>,
    nr_migrations: Option<u64>,
    nr_forced_migrations: Option<u64>,
    nr_failed_migrations_affine: Option<u64>,
    nr_failed_migrations_running: Option<u64>,
    nr_failed_migrations_hot: Option<u64>,
    wait_sum: Option<u64>,
    wait_count: Option<u64>,
    wait_max: Option<u64>,
    sleep_sum: Option<u64>,
    sleep_max: Option<u64>,
    block_sum: Option<u64>,
    block_max: Option<u64>,
    iowait_sum: Option<u64>,
    iowait_count: Option<u64>,
    exec_max: Option<u64>,
    slice_max: Option<u64>,
    core_forceidle_sum: Option<u64>,
    fair_slice_ns: Option<u64>,
    /// From the "ext.enabled" key (non-zero → true).
    ext_enabled: Option<bool>,
}
/// Why a dotted sched value could not be converted to nanoseconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ParseDottedNs {
    /// Value was negative (tallied separately by the caller).
    Negative,
    /// Value was not a number, or overflowed u64 nanoseconds.
    Malformed,
}
/// Converts a `/proc/.../sched` dotted value ("ms.fraction", e.g.
/// "123.456789") to nanoseconds; a plain integer is taken as nanoseconds
/// already.
fn parsed_ns_from_dotted(value: &str) -> Result<u64, ParseDottedNs> {
    let Some((ms_str, ns_str)) = value.split_once('.') else {
        // No dot: already a nanosecond count.
        let trimmed = value.trim();
        if trimmed.starts_with('-') {
            return Err(ParseDottedNs::Negative);
        }
        return trimmed.parse::<u64>().map_err(|_| ParseDottedNs::Malformed);
    };
    let ms_trimmed = ms_str.trim();
    if ms_trimmed.starts_with('-') || ns_str.trim_start().starts_with('-') {
        return Err(ParseDottedNs::Negative);
    }
    let ms = ms_trimmed.parse::<u64>().map_err(|_| ParseDottedNs::Malformed)?;
    // Keep at most six fractional digits and right-pad to exactly six, so
    // "1.5" means 1 ms + 500_000 ns.
    let frac: String = ns_str.chars().take(6).collect();
    let ns = format!("{frac:0<6}")
        .parse::<u64>()
        .map_err(|_| ParseDottedNs::Malformed)?;
    ms.checked_mul(1_000_000)
        .and_then(|total| total.checked_add(ns))
        .ok_or(ParseDottedNs::Malformed)
}
/// Parses `/proc/.../sched` ("key : value" lines).
///
/// Keys are matched on their last dot-separated component (prefix-agnostic,
/// e.g. "se.statistics.wait_sum" matches "wait_sum"). Dotted values are
/// ms.fraction converted to nanoseconds; negative dotted values are dropped
/// and counted in `tally` (when present).
fn parse_sched(raw: &str, tally: &mut Option<&mut ParseTally>) -> SchedFields {
    let mut out = SchedFields::default();
    let mut parse_dotted = |value: &str| -> Option<u64> {
        match parsed_ns_from_dotted(value) {
            Ok(v) => Some(v),
            Err(ParseDottedNs::Negative) => {
                if let Some(t) = tally.as_mut() {
                    t.record_negative_dotted();
                }
                None
            }
            Err(ParseDottedNs::Malformed) => None,
        }
    };
    for line in raw.lines() {
        let Some((key, value)) = line.split_once(':') else {
            continue;
        };
        let key = key.trim();
        let value = value.trim();
        let parsed_u64 = || value.parse::<u64>().ok();
        // Matched on the full key, before suffix matching below.
        if key == "ext.enabled" {
            out.ext_enabled = value.parse::<u64>().ok().map(|n| n != 0);
            continue;
        }
        let short = key.rsplit('.').next().unwrap_or(key);
        match short {
            "nr_wakeups" => out.nr_wakeups = parsed_u64(),
            "nr_wakeups_local" => out.nr_wakeups_local = parsed_u64(),
            "nr_wakeups_remote" => out.nr_wakeups_remote = parsed_u64(),
            "nr_wakeups_sync" => out.nr_wakeups_sync = parsed_u64(),
            "nr_wakeups_migrate" => out.nr_wakeups_migrate = parsed_u64(),
            "nr_wakeups_affine" => out.nr_wakeups_affine = parsed_u64(),
            "nr_wakeups_affine_attempts" => out.nr_wakeups_affine_attempts = parsed_u64(),
            "nr_migrations" => out.nr_migrations = parsed_u64(),
            "nr_forced_migrations" => out.nr_forced_migrations = parsed_u64(),
            "nr_failed_migrations_affine" => out.nr_failed_migrations_affine = parsed_u64(),
            "nr_failed_migrations_running" => out.nr_failed_migrations_running = parsed_u64(),
            "nr_failed_migrations_hot" => out.nr_failed_migrations_hot = parsed_u64(),
            "wait_sum" => out.wait_sum = parse_dotted(value),
            "wait_count" => out.wait_count = parsed_u64(),
            "wait_max" => out.wait_max = parse_dotted(value),
            "sum_sleep_runtime" => out.sleep_sum = parse_dotted(value),
            "sleep_max" => out.sleep_max = parse_dotted(value),
            "sum_block_runtime" => out.block_sum = parse_dotted(value),
            "block_max" => out.block_max = parse_dotted(value),
            "iowait_sum" => out.iowait_sum = parse_dotted(value),
            "iowait_count" => out.iowait_count = parsed_u64(),
            "exec_max" => out.exec_max = parse_dotted(value),
            "slice_max" => out.slice_max = parse_dotted(value),
            "core_forceidle_sum" => out.core_forceidle_sum = parse_dotted(value),
            "slice" => out.fair_slice_ns = parsed_u64(),
            _ => {}
        }
    }
    out
}
/// Reads and parses the thread's `sched` file; a failed read records a
/// "sched" failure in `tally` (when present) and yields defaults.
fn read_sched_at_with_tally(
    proc_root: &Path,
    tgid: i32,
    tid: i32,
    tally: &mut Option<&mut ParseTally>,
) -> SchedFields {
    let Ok(raw) = fs::read_to_string(task_file(proc_root, tgid, tid, "sched")) else {
        if let Some(t) = tally.as_mut() {
            t.record_failure("sched");
        }
        return SchedFields::default();
    };
    parse_sched(&raw, tally)
}
/// Parses `/proc/.../smaps_rollup` into `field name -> kB value`.
///
/// The address-range header line and non-numeric fields (e.g. VmFlags) are
/// skipped: real keys are single words with no whitespace or '-'.
fn parse_smaps_rollup(raw: &str) -> BTreeMap<String, u64> {
    let mut counters = BTreeMap::new();
    for line in raw.lines() {
        let Some((key, rest)) = line.split_once(':') else {
            continue;
        };
        let key = key.trim();
        if key.contains(char::is_whitespace) || key.contains('-') {
            continue;
        }
        let parsed = rest
            .split_ascii_whitespace()
            .next()
            .and_then(|n| n.parse::<u64>().ok());
        if let Some(kb) = parsed {
            counters.insert(key.to_string(), kb);
        }
    }
    counters
}
/// Reads and parses `smaps_rollup`, but only for the thread-group leader
/// (tid == tgid); other threads get an empty map without touching the
/// filesystem. A failed read records a "smaps_rollup" failure in `tally`.
fn read_smaps_rollup_at_with_tally(
    proc_root: &Path,
    tgid: i32,
    tid: i32,
    tally: &mut Option<&mut ParseTally>,
) -> BTreeMap<String, u64> {
    if tid != tgid {
        return BTreeMap::new();
    }
    let Ok(raw) = fs::read_to_string(task_file(proc_root, tgid, tid, "smaps_rollup")) else {
        if let Some(t) = tally.as_mut() {
            t.record_failure("smaps_rollup");
        }
        return BTreeMap::new();
    };
    parse_smaps_rollup(&raw)
}
/// Parses cgroup `cpu.stat`: `(usage_usec, nr_throttled, throttled_usec)`,
/// each `None` if the key is missing or its value unparseable.
fn parse_cpu_stat(raw: &str) -> (Option<u64>, Option<u64>, Option<u64>) {
    let mut usage_usec = None;
    let mut nr_throttled = None;
    let mut throttled_usec = None;
    for line in raw.lines() {
        let mut cols = line.split_ascii_whitespace();
        let (Some(key), Some(value)) = (cols.next(), cols.next()) else {
            continue;
        };
        let slot = match key {
            "usage_usec" => &mut usage_usec,
            "nr_throttled" => &mut nr_throttled,
            "throttled_usec" => &mut throttled_usec,
            _ => continue,
        };
        *slot = value.parse::<u64>().ok();
    }
    (usage_usec, nr_throttled, throttled_usec)
}
/// Parses a "key value" counter file (e.g. cgroup `memory.stat`) into a
/// map; lines with a missing or non-numeric value are skipped.
fn parse_kv_counters(raw: &str) -> BTreeMap<String, u64> {
    raw.lines()
        .filter_map(|line| {
            let mut cols = line.split_ascii_whitespace();
            let key = cols.next()?;
            let value = cols.next()?.parse::<u64>().ok()?;
            Some((key.to_string(), value))
        })
        .collect()
}
/// Parses a cgroup limit file where "max" means unlimited: "max" → `None`,
/// a number → `Some(n)`, anything else → `None`.
fn parse_max_or_u64(raw: &str) -> Option<u64> {
    match raw.trim() {
        "max" => None,
        other => other.parse::<u64>().ok(),
    }
}
/// Parses a cgroup floor file (memory.low / memory.min) where "max" maps
/// to `Some(u64::MAX)`; unparseable values map to `None`.
fn parse_floor_value(raw: &str) -> Option<u64> {
    match raw.trim() {
        "max" => Some(u64::MAX),
        other => other.parse::<u64>().ok(),
    }
}
/// Splits a cgroup-v2 `cpu.max` payload (`"<quota> <period>"`) into
/// `(quota_us, period_us)`.
///
/// A `max`, missing, or unparsable quota becomes `None` (unlimited); a
/// missing or unparsable period falls back to the 100 ms default.
fn parse_cpu_max(raw: &str) -> (Option<u64>, u64) {
    let mut tokens = raw.split_ascii_whitespace();
    let quota = tokens.next().and_then(parse_max_or_u64_str);
    let period = tokens
        .next()
        .and_then(|tok| tok.parse::<u64>().ok())
        .unwrap_or(CPU_MAX_DEFAULT_PERIOD_US);
    (quota, period)
}
/// Token-level variant of [`parse_max_or_u64`]: `max` -> `None`
/// ("no limit"); anything else must parse as `u64`.
fn parse_max_or_u64_str(s: &str) -> Option<u64> {
    match s {
        "max" => None,
        n => n.parse::<u64>().ok(),
    }
}
/// Default CFS bandwidth period for `cpu.max` (100 ms, in microseconds) —
/// the kernel's documented default — used when the period field is absent
/// or malformed.
const CPU_MAX_DEFAULT_PERIOD_US: u64 = 100_000;
/// Collects cpu/memory/pids/psi statistics for one cgroup path, resolved
/// relative to `cgroup_root`.
///
/// Every control file is read best-effort: a missing or unreadable file
/// simply leaves the corresponding field at its default/`None` value.
fn read_cgroup_stats_at(cgroup_root: &Path, path: &str) -> CgroupStats {
    // "/foo" and "foo" both resolve under cgroup_root; "" and "/" resolve
    // to the root directory itself.
    let relative = path.strip_prefix('/').unwrap_or(path);
    let dir = if relative.is_empty() {
        cgroup_root.to_path_buf()
    } else {
        cgroup_root.join(relative)
    };
    // Shared best-effort reader for every control file below.
    let read = |name: &str| fs::read_to_string(dir.join(name)).ok();
    let (usage, throttled, throttled_usec) = read("cpu.stat")
        .as_deref()
        .map(parse_cpu_stat)
        .unwrap_or((None, None, None));
    let (max_quota_us, max_period_us) = read("cpu.max")
        .as_deref()
        .map(parse_cpu_max)
        .unwrap_or((None, CPU_MAX_DEFAULT_PERIOD_US));
    let weight = read("cpu.weight").and_then(|s| s.trim().parse::<u64>().ok());
    let weight_nice = read("cpu.weight.nice").and_then(|s| s.trim().parse::<i32>().ok());
    let memory_current = read("memory.current")
        .and_then(|s| s.trim().parse::<u64>().ok())
        .unwrap_or(0);
    // Limits treat "max" as absent; floors saturate "max" to u64::MAX.
    let memory_max = read("memory.max").as_deref().and_then(parse_max_or_u64);
    let memory_high = read("memory.high").as_deref().and_then(parse_max_or_u64);
    let memory_low = read("memory.low").as_deref().and_then(parse_floor_value);
    let memory_min = read("memory.min").as_deref().and_then(parse_floor_value);
    let memory_stat = read("memory.stat")
        .as_deref()
        .map(parse_kv_counters)
        .unwrap_or_default();
    let memory_events = read("memory.events")
        .as_deref()
        .map(parse_kv_counters)
        .unwrap_or_default();
    let pids_current = read("pids.current").and_then(|s| s.trim().parse::<u64>().ok());
    let pids_max = read("pids.max").as_deref().and_then(parse_max_or_u64);
    CgroupStats {
        cpu: CgroupCpuStats {
            usage_usec: usage.unwrap_or(0),
            nr_throttled: throttled.unwrap_or(0),
            throttled_usec: throttled_usec.unwrap_or(0),
            max_quota_us,
            max_period_us,
            weight,
            weight_nice,
        },
        memory: CgroupMemoryStats {
            current: memory_current,
            max: memory_max,
            high: memory_high,
            low: memory_low,
            min: memory_min,
            stat: memory_stat,
            events: memory_events,
        },
        pids: CgroupPidsStats {
            current: pids_current,
            max: pids_max,
        },
        psi: read_cgroup_psi_at(cgroup_root, path),
    }
}
/// Test-only wrapper around [`capture_thread_at_with_tally`] that captures
/// a single thread without a `ParseTally` (read failures are dropped).
#[cfg(test)]
fn capture_thread_at(
    proc_root: &Path,
    tgid: i32,
    tid: i32,
    pcomm: &str,
    comm: &str,
    use_syscall_affinity: bool,
) -> ThreadState {
    capture_thread_at_with_tally(
        proc_root,
        tgid,
        tid,
        pcomm,
        comm,
        use_syscall_affinity,
        &mut None,
    )
}
/// Captures one thread's full [`ThreadState`] from its per-task `/proc`
/// files (`cgroup`, `stat`, `schedstat`, `io`, `status`, `sched`,
/// `smaps_rollup`).
///
/// Read failures are staged into `tally` (when present) by the individual
/// `read_*_with_tally` helpers; any field whose source could not be read
/// falls back to zero / default. Allocation counters and delay-accounting
/// fields are left at zero here — the caller fills them in afterwards from
/// the jemalloc probe and taskstats respectively.
fn capture_thread_at_with_tally(
    proc_root: &Path,
    tgid: i32,
    tid: i32,
    pcomm: &str,
    comm: &str,
    use_syscall_affinity: bool,
    tally: &mut Option<&mut ParseTally>,
) -> ThreadState {
    let cgroup = read_cgroup_at_with_tally(proc_root, tgid, tid, tally).unwrap_or_default();
    let stat = read_stat_at_with_tally(proc_root, tgid, tid, tally);
    let (run_time_ns, wait_time_ns, timeslices) =
        read_schedstat_at_with_tally(proc_root, tgid, tid, tally);
    let io = read_io_at_with_tally(proc_root, tgid, tid, tally);
    let status = read_status_at_with_tally(proc_root, tgid, tid, tally);
    let sched = read_sched_at_with_tally(proc_root, tgid, tid, tally);
    let smaps_rollup_kb = read_smaps_rollup_at_with_tally(proc_root, tgid, tid, tally);
    // Prefer the live affinity lookup when enabled, falling back to the
    // Cpus_allowed value parsed from the status file.
    let cpu_affinity = if use_syscall_affinity {
        crate::cpu_util::read_affinity(tid)
            .or(status.cpus_allowed)
            .unwrap_or_default()
    } else {
        status.cpus_allowed.unwrap_or_default()
    };
    use crate::metric_types::{
        Bytes, CategoricalString, ClockTicks, CpuSet, GaugeCount, GaugeNs, MonotonicCount,
        MonotonicNs, OrdinalI32, OrdinalU32, PeakNs,
    };
    ThreadState {
        // Identity / placement.
        tid: tid as u32,
        tgid: tgid as u32,
        pcomm: pcomm.to_string(),
        comm: comm.to_string(),
        cgroup,
        start_time_clock_ticks: stat.start_time_clock_ticks.unwrap_or(0),
        policy: CategoricalString(stat.policy.map(policy_name).unwrap_or_default()),
        nice: OrdinalI32(stat.nice.unwrap_or(0)),
        cpu_affinity: CpuSet(cpu_affinity),
        processor: OrdinalI32(stat.processor.unwrap_or(0)),
        // '~' sentinel (via default_state_char) when status had no state.
        state: status.state.unwrap_or_else(default_state_char),
        ext_enabled: sched.ext_enabled.unwrap_or(false),
        // Scheduler counters from schedstat / status / sched.
        run_time_ns: MonotonicNs(run_time_ns.unwrap_or(0)),
        wait_time_ns: MonotonicNs(wait_time_ns.unwrap_or(0)),
        timeslices: MonotonicCount(timeslices.unwrap_or(0)),
        voluntary_csw: MonotonicCount(status.voluntary_csw.unwrap_or(0)),
        nonvoluntary_csw: MonotonicCount(status.nonvoluntary_csw.unwrap_or(0)),
        nr_wakeups: MonotonicCount(sched.nr_wakeups.unwrap_or(0)),
        nr_wakeups_local: MonotonicCount(sched.nr_wakeups_local.unwrap_or(0)),
        nr_wakeups_remote: MonotonicCount(sched.nr_wakeups_remote.unwrap_or(0)),
        nr_wakeups_sync: MonotonicCount(sched.nr_wakeups_sync.unwrap_or(0)),
        nr_wakeups_migrate: MonotonicCount(sched.nr_wakeups_migrate.unwrap_or(0)),
        nr_wakeups_affine: MonotonicCount(sched.nr_wakeups_affine.unwrap_or(0)),
        nr_wakeups_affine_attempts: MonotonicCount(sched.nr_wakeups_affine_attempts.unwrap_or(0)),
        nr_migrations: MonotonicCount(sched.nr_migrations.unwrap_or(0)),
        nr_forced_migrations: MonotonicCount(sched.nr_forced_migrations.unwrap_or(0)),
        nr_failed_migrations_affine: MonotonicCount(sched.nr_failed_migrations_affine.unwrap_or(0)),
        nr_failed_migrations_running: MonotonicCount(
            sched.nr_failed_migrations_running.unwrap_or(0),
        ),
        nr_failed_migrations_hot: MonotonicCount(sched.nr_failed_migrations_hot.unwrap_or(0)),
        wait_sum: MonotonicNs(sched.wait_sum.unwrap_or(0)),
        wait_count: MonotonicCount(sched.wait_count.unwrap_or(0)),
        wait_max: PeakNs(sched.wait_max.unwrap_or(0)),
        // sleep_sum includes blocked time; subtract block_sum (saturating)
        // to isolate voluntary sleep.
        voluntary_sleep_ns: MonotonicNs(
            sched
                .sleep_sum
                .unwrap_or(0)
                .saturating_sub(sched.block_sum.unwrap_or(0)),
        ),
        sleep_max: PeakNs(sched.sleep_max.unwrap_or(0)),
        block_sum: MonotonicNs(sched.block_sum.unwrap_or(0)),
        block_max: PeakNs(sched.block_max.unwrap_or(0)),
        iowait_sum: MonotonicNs(sched.iowait_sum.unwrap_or(0)),
        iowait_count: MonotonicCount(sched.iowait_count.unwrap_or(0)),
        exec_max: PeakNs(sched.exec_max.unwrap_or(0)),
        slice_max: PeakNs(sched.slice_max.unwrap_or(0)),
        // Filled in later by the jemalloc probe in the capture loop.
        allocated_bytes: Bytes(0),
        deallocated_bytes: Bytes(0),
        minflt: MonotonicCount(stat.minflt.unwrap_or(0)),
        majflt: MonotonicCount(stat.majflt.unwrap_or(0)),
        utime_clock_ticks: ClockTicks(stat.utime_clock_ticks.unwrap_or(0)),
        stime_clock_ticks: ClockTicks(stat.stime_clock_ticks.unwrap_or(0)),
        priority: OrdinalI32(stat.priority.unwrap_or(0)),
        rt_priority: OrdinalU32(stat.rt_priority.unwrap_or(0)),
        core_forceidle_sum: MonotonicNs(sched.core_forceidle_sum.unwrap_or(0)),
        fair_slice_ns: GaugeNs(sched.fair_slice_ns.unwrap_or(0)),
        // Only meaningful on the group leader; other tids report 0.
        nr_threads: GaugeCount(if tid == tgid {
            status.nr_threads.unwrap_or(0)
        } else {
            0
        }),
        smaps_rollup_kb,
        rchar: Bytes(io.rchar.unwrap_or(0)),
        wchar: Bytes(io.wchar.unwrap_or(0)),
        syscr: MonotonicCount(io.syscr.unwrap_or(0)),
        syscw: MonotonicCount(io.syscw.unwrap_or(0)),
        read_bytes: Bytes(io.read_bytes.unwrap_or(0)),
        write_bytes: Bytes(io.write_bytes.unwrap_or(0)),
        cancelled_write_bytes: Bytes(io.cancelled_write_bytes.unwrap_or(0)),
        // Delay-accounting and hiwater fields start at zero; the caller
        // overwrites them via apply_delay_stats when taskstats succeeds.
        cpu_delay_count: MonotonicCount(0),
        cpu_delay_total_ns: MonotonicNs(0),
        cpu_delay_max_ns: PeakNs(0),
        cpu_delay_min_ns: PeakNs(0),
        blkio_delay_count: MonotonicCount(0),
        blkio_delay_total_ns: MonotonicNs(0),
        blkio_delay_max_ns: PeakNs(0),
        blkio_delay_min_ns: PeakNs(0),
        swapin_delay_count: MonotonicCount(0),
        swapin_delay_total_ns: MonotonicNs(0),
        swapin_delay_max_ns: PeakNs(0),
        swapin_delay_min_ns: PeakNs(0),
        freepages_delay_count: MonotonicCount(0),
        freepages_delay_total_ns: MonotonicNs(0),
        freepages_delay_max_ns: PeakNs(0),
        freepages_delay_min_ns: PeakNs(0),
        thrashing_delay_count: MonotonicCount(0),
        thrashing_delay_total_ns: MonotonicNs(0),
        thrashing_delay_max_ns: PeakNs(0),
        thrashing_delay_min_ns: PeakNs(0),
        compact_delay_count: MonotonicCount(0),
        compact_delay_total_ns: MonotonicNs(0),
        compact_delay_max_ns: PeakNs(0),
        compact_delay_min_ns: PeakNs(0),
        wpcopy_delay_count: MonotonicCount(0),
        wpcopy_delay_total_ns: MonotonicNs(0),
        wpcopy_delay_max_ns: PeakNs(0),
        wpcopy_delay_min_ns: PeakNs(0),
        irq_delay_count: MonotonicCount(0),
        irq_delay_total_ns: MonotonicNs(0),
        irq_delay_max_ns: PeakNs(0),
        irq_delay_min_ns: PeakNs(0),
        hiwater_rss_bytes: crate::metric_types::PeakBytes(0),
        hiwater_vm_bytes: crate::metric_types::PeakBytes(0),
    }
}
/// Test-only capture of a single live thread from the default proc root,
/// resolving `comm` from the task's own comm file first.
#[cfg(test)]
fn capture_thread(tgid: i32, tid: i32, pcomm: &str) -> ThreadState {
    let proc_root = Path::new(DEFAULT_PROC_ROOT);
    let comm = read_thread_comm_at(proc_root, tgid, tid).unwrap_or_default();
    capture_thread_at(proc_root, tgid, tid, pcomm, &comm, true)
}
/// Internal accumulator for jemalloc attach/probe outcomes across one
/// capture pass; converted to the public `CtprofProbeSummary` wire form by
/// `to_public`.
#[derive(Debug, Default)]
struct ProbeSummary {
    // Thread-group ids considered for probing (including cache hits).
    tgids_walked: u64,
    // Attaches that found jemalloc (including cached verdicts).
    jemalloc_detected: u64,
    // Successful per-thread counter probes.
    probed_ok: u64,
    // Hard failures: attach errors (other than the benign "not jemalloc"
    // tags), per-thread probe errors, and worker panics.
    failed: u64,
    // Failure-tag histograms, split by stage: attach vs. per-thread probe.
    attach_tag_counts: BTreeMap<&'static str, u64>,
    probe_tag_counts: BTreeMap<&'static str, u64>,
}
impl ProbeSummary {
    /// The failure tag with the highest count across both stages; on a
    /// count tie the lexically smallest tag wins. The benign attach tags
    /// ("jemalloc-not-found", "readlink-failure") are excluded because
    /// they describe processes that simply are not probe targets.
    fn dominant_tag(&self) -> Option<&'static str> {
        let real_attach_failures = self
            .attach_tag_counts
            .iter()
            .filter(|(t, _)| !matches!(**t, "jemalloc-not-found" | "readlink-failure"));
        real_attach_failures
            .chain(self.probe_tag_counts.iter())
            .max_by(|a, b| a.1.cmp(b.1).then_with(|| b.0.cmp(a.0)))
            .map(|(tag, _)| *tag)
    }
    /// True when ptrace-stage errors account for at least half of all
    /// failures — the signature of a privilege problem rather than a
    /// per-process quirk.
    fn ptrace_dominates(&self) -> bool {
        if self.failed == 0 {
            return false;
        }
        let ptrace_failures: u64 = self
            .probe_tag_counts
            .iter()
            .filter_map(|(t, n)| matches!(*t, "ptrace-seize" | "ptrace-interrupt").then_some(*n))
            .sum();
        ptrace_failures * 2 >= self.failed
    }
    /// Snapshot of the counters in public wire form.
    fn to_public(&self) -> CtprofProbeSummary {
        CtprofProbeSummary {
            tgids_walked: self.tgids_walked,
            jemalloc_detected: self.jemalloc_detected,
            probed_ok: self.probed_ok,
            failed: self.failed,
            dominant_failure: self.dominant_tag().map(|t| t.to_string()),
            privilege_dominant: self.ptrace_dominates(),
        }
    }
}
/// Accumulator for `/proc` read/parse anomalies during a capture pass.
///
/// Failures for the thread currently being captured are first staged in
/// the `pending_*` fields; `commit_pending` folds them into the committed
/// totals once the thread is kept, while `discard_pending` drops them for
/// threads that are rejected (e.g. appear to have exited mid-capture).
#[derive(Debug, Default)]
struct ParseTally {
    // Threads visited, whether their staged failures were committed or not.
    tids_walked: u64,
    // Committed failures keyed by /proc file kind ("schedstat", "io", ...).
    failures_by_file: BTreeMap<&'static str, u64>,
    // Staged failures for the in-flight thread.
    pending_failures: Vec<&'static str>,
    // Committed count of dotted values that parsed negative.
    negative_dotted_values: u64,
    // Staged negative-dotted count for the in-flight thread.
    pending_negative_dotted: u64,
}
impl ParseTally {
fn record_failure(&mut self, file_kind: &'static str) {
self.pending_failures.push(file_kind);
}
fn record_negative_dotted(&mut self) {
self.pending_negative_dotted = self.pending_negative_dotted.saturating_add(1);
}
fn commit_pending(&mut self) {
for kind in self.pending_failures.drain(..) {
*self.failures_by_file.entry(kind).or_insert(0) += 1;
}
self.negative_dotted_values = self
.negative_dotted_values
.saturating_add(self.pending_negative_dotted);
self.pending_negative_dotted = 0;
}
fn discard_pending(&mut self) {
self.pending_failures.clear();
self.pending_negative_dotted = 0;
}
fn total_failures(&self) -> u64 {
self.failures_by_file.values().sum()
}
fn dominant_file(&self) -> Option<&'static str> {
self.failures_by_file
.iter()
.max_by(|a, b| a.1.cmp(b.1).then_with(|| b.0.cmp(a.0)))
.map(|(tag, _)| *tag)
}
fn kernel_config_dominates(&self) -> bool {
let total = self.total_failures();
if total == 0 {
return false;
}
let kconfig: u64 = self
.failures_by_file
.iter()
.filter(|(t, _)| matches!(**t, "schedstat" | "io"))
.map(|(_, n)| *n)
.sum();
kconfig * 2 >= total
}
fn to_public(&self) -> CtprofParseSummary {
let read_failures = self.total_failures();
let mut by_file = BTreeMap::new();
for (k, v) in &self.failures_by_file {
by_file.insert((*k).to_string(), *v);
}
CtprofParseSummary {
tids_walked: self.tids_walked,
read_failures,
read_failures_by_file: by_file,
dominant_read_failure: self.dominant_file().map(|t| t.to_string()),
kernel_config_dominant: self.kernel_config_dominates(),
negative_dotted_values: self.negative_dotted_values,
}
}
}
/// Remediation guidance appended to logs/summaries when ptrace failures
/// dominate: attaching needs root, CAP_SYS_PTRACE, or a relaxed
/// yama ptrace_scope.
const PTRACE_EPERM_HINT: &str = "hint: re-run as root, or sudo setcap cap_sys_ptrace+eip $(which ktstr), or set kernel.yama.ptrace_scope=0";
/// Result of one attach attempt: the target's process comm (captured up
/// front so failures can still be logged with a name) plus either a usable
/// jemalloc probe or the attach error.
struct AttachOutcome {
    pcomm: String,
    result: std::result::Result<
        crate::host_thread_probe::JemallocProbe,
        crate::host_thread_probe::AttachError,
    >,
}
/// Attempts to attach a jemalloc probe to one thread group.
///
/// Test builds may inject a panic for a specific tgid (via
/// `PANIC_INJECT_TGID`) to exercise the worker panic-recovery path in
/// `capture_with`; the injection check runs before any real work.
fn attach_probe_for_tgid_at(proc_root: &Path, tgid: i32) -> AttachOutcome {
    #[cfg(test)]
    {
        let injected = PANIC_INJECT_TGID.load(std::sync::atomic::Ordering::Acquire);
        if injected != 0 && injected == tgid {
            panic!("test: injected attach worker panic for tgid {tgid}");
        }
    }
    // Resolve comm first so a failed attach can still be logged by name.
    let pcomm = read_process_comm_at(proc_root, tgid).unwrap_or_default();
    let result = crate::host_thread_probe::attach_jemalloc_at(proc_root, tgid);
    AttachOutcome { pcomm, result }
}
// Test hook: when set to a nonzero tgid, `attach_probe_for_tgid_at` panics
// for that tgid so tests can exercise attach-worker panic recovery.
#[cfg(test)]
static PANIC_INJECT_TGID: std::sync::atomic::AtomicI32 = std::sync::atomic::AtomicI32::new(0);
/// Folds one attach outcome into `summary`, returning the probe on
/// success.
///
/// "jemalloc-not-found" and "readlink-failure" mark processes that simply
/// are not probe targets; they are tallied by tag but not counted as
/// failures (and logged at debug rather than warn).
fn record_attach_outcome(
    tgid: i32,
    outcome: AttachOutcome,
    summary: &mut ProbeSummary,
) -> Option<crate::host_thread_probe::JemallocProbe> {
    summary.tgids_walked += 1;
    let AttachOutcome { pcomm, result } = outcome;
    let err = match result {
        Ok(probe) => {
            summary.jemalloc_detected += 1;
            tracing::debug!(tgid, %pcomm, "ctprof probe: jemalloc detected");
            return Some(probe);
        }
        Err(err) => err,
    };
    let tag = err.tag();
    *summary.attach_tag_counts.entry(tag).or_default() += 1;
    if matches!(tag, "jemalloc-not-found" | "readlink-failure") {
        tracing::debug!(tgid, %pcomm, tag, err = %err, "ctprof probe: attach skipped");
    } else {
        summary.failed += 1;
        tracing::warn!(tgid, %pcomm, tag, err = %err, "ctprof probe: attach failed");
    }
    None
}
/// Convenience wrapper: attach to `tgid` and record the outcome in
/// `summary` in a single step.
fn try_attach_probe_for_tgid_at(
    proc_root: &Path,
    tgid: i32,
    summary: &mut ProbeSummary,
) -> Option<crate::host_thread_probe::JemallocProbe> {
    record_attach_outcome(tgid, attach_probe_for_tgid_at(proc_root, tgid), summary)
}
/// Probes one thread's jemalloc allocation counters, recording the outcome
/// in `summary`.
///
/// Returns `(allocated_bytes, deallocated_bytes)`, or zeros on failure.
/// The failure warning is emitted at most once per tgid, gated by
/// `failed_tgids_logged`.
fn probe_thread_recording(
    probe: &crate::host_thread_probe::JemallocProbe,
    tid: i32,
    tgid: i32,
    pcomm: &str,
    comm: &str,
    summary: &mut ProbeSummary,
    failed_tgids_logged: &mut std::collections::BTreeSet<i32>,
) -> (u64, u64) {
    let err = match crate::host_thread_probe::probe_thread(probe, tid) {
        Ok(counters) => {
            summary.probed_ok += 1;
            return (counters.allocated_bytes, counters.deallocated_bytes);
        }
        Err(err) => err,
    };
    let tag = err.tag();
    *summary.probe_tag_counts.entry(tag).or_default() += 1;
    summary.failed += 1;
    // Only the first failing tid of each tgid produces a warning.
    if failed_tgids_logged.insert(tgid) {
        tracing::warn!(
            tgid,
            tid,
            %pcomm,
            %comm,
            tag,
            err = %err,
            "ctprof probe: probe_thread failed",
        );
    }
    (0, 0)
}
/// Logs a one-line summary of parse anomalies for the capture pass,
/// appending optional negative-dotted / dominant-file / kernel-config
/// clauses only when they apply.
fn emit_parse_summary(tally: &ParseTally) {
    let tids_walked = tally.tids_walked;
    let read_failures = tally.total_failures();
    let mut detail = String::new();
    if tally.negative_dotted_values > 0 {
        let negative_dotted = tally.negative_dotted_values;
        detail.push_str(&format!(", {negative_dotted} negative-dotted values"));
    }
    if let Some(tag) = tally.dominant_file() {
        detail.push_str(&format!(" (dominant: {tag})"));
    }
    if tally.kernel_config_dominates() {
        detail.push_str(&format!("; {PARSE_KCONFIG_HINT}"));
    }
    tracing::info!(
        "ctprof parse: {tids_walked} tids walked, \
        {read_failures} read failures{detail}",
    );
}
fn emit_probe_summary(summary: &ProbeSummary) {
let tgids_walked = summary.tgids_walked;
let jemalloc_detected = summary.jemalloc_detected;
let probed_ok = summary.probed_ok;
let failed = summary.failed;
if failed > 0 {
let dominant = summary.dominant_tag().unwrap_or("?");
if summary.ptrace_dominates() {
tracing::info!(
"ctprof probe: {tgids_walked} tgids walked, \
{jemalloc_detected} jemalloc detected, \
{probed_ok} probed OK, {failed} failed \
(dominant: {dominant}; {})",
PTRACE_EPERM_HINT,
);
} else {
tracing::info!(
"ctprof probe: {tgids_walked} tgids walked, \
{jemalloc_detected} jemalloc detected, \
{probed_ok} probed OK, {failed} failed \
(dominant: {dominant})",
);
}
} else {
tracing::info!(
"ctprof probe: {tgids_walked} tgids walked, \
{jemalloc_detected} jemalloc detected, \
{probed_ok} probed OK, {failed} failed",
);
}
}
fn capture_with(
proc_root: &Path,
cgroup_root: &Path,
sys_root: &Path,
use_syscall_affinity: bool,
) -> CtprofSnapshot {
let captured_at_unix_ns = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
let host = if use_syscall_affinity {
Some(crate::host_context::collect_host_context())
} else {
None
};
let self_pid = std::process::id() as i32;
let mut threads: Vec<ThreadState> = Vec::new();
let mut failed_tgids_logged: std::collections::BTreeSet<i32> =
std::collections::BTreeSet::new();
let tgids = iter_tgids_at(proc_root);
let probe_cache: std::sync::Mutex<
std::collections::HashMap<(u64, u64), Option<crate::host_thread_probe::JemallocProbe>>,
> = std::sync::Mutex::new(std::collections::HashMap::new());
let summary_mutex = std::sync::Mutex::new(ProbeSummary::default());
let probe_map: std::collections::HashMap<i32, Option<crate::host_thread_probe::JemallocProbe>> =
if use_syscall_affinity {
use rayon::prelude::*;
let max_threads = {
let num_cpus = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
let load = std::fs::read_to_string(proc_root.join("loadavg"))
.ok()
.and_then(|s| s.split_whitespace().next()?.parse::<f64>().ok())
.unwrap_or(0.0);
let headroom = (num_cpus as f64 - load).max(1.0) as usize;
headroom.clamp(1, num_cpus / 2 + 1)
};
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(max_threads)
.build()
.unwrap();
pool.install(|| tgids
.par_iter()
.copied()
.filter(|&tgid| tgid != self_pid)
.map(|tgid| {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let cache_key = std::fs::metadata(
proc_root.join(tgid.to_string()).join("exe"),
)
.ok()
.map(|m| {
use std::os::unix::fs::MetadataExt;
(m.dev(), m.ino())
});
if let Some(key) = cache_key {
let cached = probe_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.get(&key)
.cloned();
if let Some(cached_result) = cached {
let mut s = summary_mutex
.lock()
.unwrap_or_else(|e| e.into_inner());
s.tgids_walked += 1;
if cached_result.is_some() {
s.jemalloc_detected += 1;
tracing::debug!(tgid, "ctprof probe: cache hit (jemalloc)");
} else {
tracing::debug!(tgid, "ctprof probe: cache hit (not jemalloc or prior failure)");
}
cached_result
} else {
let outcome = attach_probe_for_tgid_at(proc_root, tgid);
let mut s = summary_mutex
.lock()
.unwrap_or_else(|e| e.into_inner());
let res = record_attach_outcome(tgid, outcome, &mut s);
drop(s);
probe_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.insert(key, res.clone());
res
}
} else {
let outcome = attach_probe_for_tgid_at(proc_root, tgid);
let mut s = summary_mutex
.lock()
.unwrap_or_else(|e| e.into_inner());
record_attach_outcome(tgid, outcome, &mut s)
}
}));
let probe = match result {
Ok(p) => p,
Err(panic_payload) => {
let panic_msg = panic_payload
.downcast_ref::<&str>()
.copied()
.or_else(|| {
panic_payload
.downcast_ref::<String>()
.map(|s| s.as_str())
})
.unwrap_or("<non-string panic payload>");
let mut s = summary_mutex
.lock()
.unwrap_or_else(|e| e.into_inner());
s.tgids_walked += 1;
*s.attach_tag_counts.entry("worker-panic").or_insert(0) += 1;
s.failed += 1;
tracing::error!(
tgid,
panic_msg,
"ctprof probe: attach worker panicked; tgid skipped",
);
None
}
};
(tgid, probe)
})
.collect()
)
} else {
std::collections::HashMap::new()
};
let mut summary = summary_mutex.into_inner().unwrap();
let mut parse_tally = ParseTally::default();
let mut tally_opt: Option<&mut ParseTally> = if use_syscall_affinity {
Some(&mut parse_tally)
} else {
None
};
let taskstats_client = if use_syscall_affinity {
match crate::taskstats::TaskstatsClient::open() {
Ok(c) => Some(c),
Err(e) => {
tracing::warn!(
error = %e,
"ctprof taskstats: open failed; delay-accounting and memory-watermark \
fields will be zero. Ensure the kernel was built with CONFIG_TASKSTATS \
(plus CONFIG_TASK_DELAY_ACCT for delay fields and CONFIG_TASK_XACCT for \
hiwater fields), the process holds CAP_NET_ADMIN, and the kernel was \
booted with `delayacct=on` (or sysctl `kernel.task_delayacct=1`)"
);
None
}
}
} else {
None
};
let mut taskstats_tally: Option<crate::taskstats::TaskstatsSummary> = if use_syscall_affinity {
Some(crate::taskstats::TaskstatsSummary::default())
} else {
None
};
for tgid in &tgids {
let tgid = *tgid;
let pcomm = read_process_comm_at(proc_root, tgid).unwrap_or_default();
let probe: Option<&crate::host_thread_probe::JemallocProbe> = probe_map
.get(&tgid)
.and_then(|p: &Option<crate::host_thread_probe::JemallocProbe>| p.as_ref());
for tid in iter_task_ids_at(proc_root, tgid) {
if let Some(t) = tally_opt.as_mut() {
t.tids_walked += 1;
}
let comm = read_thread_comm_at(proc_root, tgid, tid).unwrap_or_default();
let (allocated_bytes, deallocated_bytes) = probe
.map(|p| {
probe_thread_recording(
p,
tid,
tgid,
&pcomm,
&comm,
&mut summary,
&mut failed_tgids_logged,
)
})
.unwrap_or((0, 0));
let mut t = capture_thread_at_with_tally(
proc_root,
tgid,
tid,
&pcomm,
&comm,
use_syscall_affinity,
&mut tally_opt,
);
t.allocated_bytes = crate::metric_types::Bytes(allocated_bytes);
t.deallocated_bytes = crate::metric_types::Bytes(deallocated_bytes);
if let Some(client) = taskstats_client.as_ref() {
let result = client.query_tid(tid as u32);
if let Some(tally) = taskstats_tally.as_mut() {
tally.record_result(&result);
}
if let Ok(ds) = result {
t.apply_delay_stats(&ds);
}
}
if t.comm.is_empty() && t.start_time_clock_ticks == 0 {
if let Some(t) = tally_opt.as_mut() {
t.discard_pending();
}
continue;
}
if let Some(t) = tally_opt.as_mut() {
t.commit_pending();
}
threads.push(t);
}
}
let probe_summary = if use_syscall_affinity {
emit_probe_summary(&summary);
Some(summary.to_public())
} else {
None
};
let parse_summary = if use_syscall_affinity {
emit_parse_summary(&parse_tally);
Some(parse_tally.to_public())
} else {
None
};
let mut cgroup_stats: BTreeMap<String, CgroupStats> = BTreeMap::new();
for t in &threads {
if !t.cgroup.is_empty() && !cgroup_stats.contains_key(&t.cgroup) {
cgroup_stats.insert(
t.cgroup.clone(),
read_cgroup_stats_at(cgroup_root, &t.cgroup),
);
}
}
let psi = read_host_psi_at(proc_root);
let sched_ext = read_sched_ext_sysfs_at(sys_root);
CtprofSnapshot {
captured_at_unix_ns,
host,
threads,
cgroup_stats,
probe_summary,
parse_summary,
taskstats_summary: taskstats_tally,
psi,
sched_ext,
}
}
/// Captures a full system snapshot from the default proc/cgroup/sys roots
/// with all optional collectors (host context, jemalloc probe, taskstats,
/// parse tally) enabled.
pub fn capture() -> CtprofSnapshot {
    capture_with(
        Path::new(DEFAULT_PROC_ROOT),
        Path::new(DEFAULT_CGROUP_ROOT),
        Path::new(DEFAULT_SYS_ROOT),
        true,
    )
}
/// Captures a snapshot restricted to a single process (all tasks of
/// `pid`), using the default roots with all collectors enabled.
pub fn capture_pid(pid: i32) -> CtprofSnapshot {
    capture_pid_with(
        Path::new(DEFAULT_PROC_ROOT),
        Path::new(DEFAULT_CGROUP_ROOT),
        Path::new(DEFAULT_SYS_ROOT),
        pid,
        true,
    )
}
/// Captures a snapshot limited to the tasks of a single process.
///
/// Mirrors `capture_with` minus the parallel all-tgid attach pass: at most
/// one jemalloc attach is attempted — for `pid` itself, and never for the
/// capturing process. `use_syscall_affinity = false` selects the same
/// reduced hermetic mode as in `capture_with`.
fn capture_pid_with(
    proc_root: &Path,
    cgroup_root: &Path,
    sys_root: &Path,
    pid: i32,
    use_syscall_affinity: bool,
) -> CtprofSnapshot {
    let captured_at_unix_ns = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos() as u64)
        .unwrap_or(0);
    let host = if use_syscall_affinity {
        Some(crate::host_context::collect_host_context())
    } else {
        None
    };
    let self_pid = std::process::id() as i32;
    let pcomm = read_process_comm_at(proc_root, pid).unwrap_or_default();
    let mut summary = ProbeSummary::default();
    let mut failed_tgids_logged: std::collections::BTreeSet<i32> =
        std::collections::BTreeSet::new();
    // Never attach to the capturing process itself.
    let probe = if use_syscall_affinity && pid != self_pid {
        try_attach_probe_for_tgid_at(proc_root, pid, &mut summary)
    } else {
        None
    };
    let mut threads: Vec<ThreadState> = Vec::new();
    let mut parse_tally = ParseTally::default();
    let mut tally_opt: Option<&mut ParseTally> = if use_syscall_affinity {
        Some(&mut parse_tally)
    } else {
        None
    };
    let taskstats_client = if use_syscall_affinity {
        match crate::taskstats::TaskstatsClient::open() {
            Ok(c) => Some(c),
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "ctprof taskstats: open failed; delay-accounting and memory-watermark \
                    fields will be zero. Ensure the kernel was built with CONFIG_TASKSTATS \
                    (plus CONFIG_TASK_DELAY_ACCT for delay fields and CONFIG_TASK_XACCT for \
                    hiwater fields), the process holds CAP_NET_ADMIN, and the kernel was \
                    booted with `delayacct=on` (or sysctl `kernel.task_delayacct=1`)"
                );
                None
            }
        }
    } else {
        None
    };
    let mut taskstats_tally: Option<crate::taskstats::TaskstatsSummary> = if use_syscall_affinity {
        Some(crate::taskstats::TaskstatsSummary::default())
    } else {
        None
    };
    // Capture pass over every tid of the target process.
    for tid in iter_task_ids_at(proc_root, pid) {
        if let Some(t) = tally_opt.as_mut() {
            t.tids_walked += 1;
        }
        let comm = read_thread_comm_at(proc_root, pid, tid).unwrap_or_default();
        let (allocated_bytes, deallocated_bytes) = probe
            .as_ref()
            .map(|p| {
                probe_thread_recording(
                    p,
                    tid,
                    pid,
                    &pcomm,
                    &comm,
                    &mut summary,
                    &mut failed_tgids_logged,
                )
            })
            .unwrap_or((0, 0));
        let mut t = capture_thread_at_with_tally(
            proc_root,
            pid,
            tid,
            &pcomm,
            &comm,
            use_syscall_affinity,
            &mut tally_opt,
        );
        t.allocated_bytes = crate::metric_types::Bytes(allocated_bytes);
        t.deallocated_bytes = crate::metric_types::Bytes(deallocated_bytes);
        if let Some(client) = taskstats_client.as_ref() {
            let result = client.query_tid(tid as u32);
            if let Some(tally) = taskstats_tally.as_mut() {
                tally.record_result(&result);
            }
            if let Ok(ds) = result {
                t.apply_delay_stats(&ds);
            }
        }
        // Empty comm plus zero start time: the task most likely exited
        // mid-capture — drop it along with its staged parse failures.
        if t.comm.is_empty() && t.start_time_clock_ticks == 0 {
            if let Some(t) = tally_opt.as_mut() {
                t.discard_pending();
            }
            continue;
        }
        if let Some(t) = tally_opt.as_mut() {
            t.commit_pending();
        }
        threads.push(t);
    }
    let probe_summary = if use_syscall_affinity {
        emit_probe_summary(&summary);
        Some(summary.to_public())
    } else {
        None
    };
    let parse_summary = if use_syscall_affinity {
        emit_parse_summary(&parse_tally);
        Some(parse_tally.to_public())
    } else {
        None
    };
    // Cgroup stats are read once per distinct cgroup across all threads.
    let mut cgroup_stats: BTreeMap<String, CgroupStats> = BTreeMap::new();
    for t in &threads {
        if !t.cgroup.is_empty() && !cgroup_stats.contains_key(&t.cgroup) {
            cgroup_stats.insert(
                t.cgroup.clone(),
                read_cgroup_stats_at(cgroup_root, &t.cgroup),
            );
        }
    }
    let psi = read_host_psi_at(proc_root);
    let sched_ext = read_sched_ext_sysfs_at(sys_root);
    CtprofSnapshot {
        captured_at_unix_ns,
        host,
        threads,
        cgroup_stats,
        probe_summary,
        parse_summary,
        taskstats_summary: taskstats_tally,
        psi,
        sched_ext,
    }
}
/// Captures a live system snapshot and writes it to `path`, attaching the
/// destination path to the error context on failure.
pub fn capture_to(path: &Path) -> Result<()> {
    capture()
        .write(path)
        .with_context(|| format!("write ctprof snapshot to {}", path.display()))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::metric_types::{
Bytes, CategoricalString, CpuSet, MonotonicCount, MonotonicNs, OrdinalI32,
};
use tracing_test::traced_test;
/// Builds a `ThreadState` with just enough fields populated for round-trip
/// and aggregation tests; everything else comes from `ThreadState::default()`.
fn thread(pcomm: &str, comm: &str, run_time_ns: u64) -> ThreadState {
    ThreadState {
        tid: 1,
        tgid: 1,
        pcomm: pcomm.into(),
        comm: comm.into(),
        cgroup: "/".into(),
        start_time_clock_ticks: 0,
        policy: CategoricalString("SCHED_OTHER".into()),
        nice: OrdinalI32(0),
        cpu_affinity: CpuSet(vec![0, 1]),
        run_time_ns: MonotonicNs(run_time_ns),
        ..ThreadState::default()
    }
}
// Pins the custom Default impl: `state` must be the '~' sentinel rather
// than char's bare default '\0'.
#[test]
fn default_threadstate_state_is_sentinel_tilde() {
    let t = ThreadState::default();
    assert_eq!(
        t.state, '~',
        "ThreadState::default().state must be '~' (the \
        absent-value sentinel chosen to lex-sort AFTER \
        every real kernel state letter), not '\\0' (the \
        bare char Default); see field doc on \
        ThreadState::state"
    );
}
// A mode aggregation over one defaulted ('~') and one real ('R') state
// must elect the real kernel state on the 1-vs-1 tie.
#[test]
fn mode_tiebreak_against_default_state_picks_real_letter() {
    use crate::ctprof_compare::{AggRule, Aggregated, aggregate};
    let default_thread = ThreadState::default();
    let real_thread = ThreadState {
        state: 'R',
        ..ThreadState::default()
    };
    let agg = aggregate(
        AggRule::ModeChar(|t| t.state),
        &[&default_thread, &real_thread],
    );
    match agg {
        Aggregated::Mode { value, .. } => assert_eq!(
            value, "R",
            "Mode tiebreak between '~' (default sentinel) \
            and 'R' (real kernel state) must elect 'R'; \
            got {value:?}"
        ),
        other => panic!("expected Mode, got {other:?}"),
    }
}
// The wire format carries raw JSON primitives for every metric; this pins
// that each one deserializes into its dedicated metric_types newtype
// (MonotonicNs, Bytes, PeakNs, Gauge*, Ordinal*, CpuSet, ...).
#[test]
fn wire_format_identity_raw_primitives_deserialize_into_wrapped_thread_state() {
    let json = r#"{
    "tid": 1234,
    "tgid": 1234,
    "pcomm": "demo",
    "comm": "demo-w",
    "cgroup": "/app",
    "start_time_clock_ticks": 555000,
    "policy": "SCHED_OTHER",
    "nice": -5,
    "cpu_affinity": [0, 1, 2, 3],
    "processor": 7,
    "state": "R",
    "ext_enabled": false,
    "run_time_ns": 1000000,
    "wait_time_ns": 0,
    "timeslices": 50,
    "voluntary_csw": 100,
    "nonvoluntary_csw": 25,
    "nr_wakeups": 200,
    "nr_wakeups_local": 80,
    "nr_wakeups_remote": 30,
    "nr_wakeups_sync": 10,
    "nr_wakeups_migrate": 5,
    "nr_wakeups_affine": 60,
    "nr_wakeups_affine_attempts": 100,
    "nr_migrations": 8,
    "nr_forced_migrations": 1,
    "nr_failed_migrations_affine": 0,
    "nr_failed_migrations_running": 0,
    "nr_failed_migrations_hot": 0,
    "wait_sum": 5000000,
    "wait_count": 15,
    "wait_max": 250000,
    "voluntary_sleep_ns": 3200000,
    "sleep_max": 180000,
    "block_sum": 1100000,
    "block_max": 60000,
    "iowait_sum": 77000,
    "iowait_count": 18,
    "exec_max": 90000,
    "slice_max": 400000,
    "allocated_bytes": 16777216,
    "deallocated_bytes": 8388608,
    "minflt": 7777,
    "majflt": 8888,
    "utime_clock_ticks": 10,
    "stime_clock_ticks": 11,
    "priority": 25,
    "rt_priority": 99,
    "core_forceidle_sum": 0,
    "fair_slice_ns": 250000,
    "nr_threads": 4,
    "smaps_rollup_kb": {},
    "rchar": 100,
    "wchar": 200,
    "syscr": 10,
    "syscw": 20,
    "read_bytes": 4096,
    "write_bytes": 8192,
    "cancelled_write_bytes": 1024,
    "cpu_delay_count": 0,
    "cpu_delay_total_ns": 0,
    "cpu_delay_max_ns": 0,
    "cpu_delay_min_ns": 0,
    "blkio_delay_count": 0,
    "blkio_delay_total_ns": 0,
    "blkio_delay_max_ns": 0,
    "blkio_delay_min_ns": 0,
    "swapin_delay_count": 0,
    "swapin_delay_total_ns": 0,
    "swapin_delay_max_ns": 0,
    "swapin_delay_min_ns": 0,
    "freepages_delay_count": 0,
    "freepages_delay_total_ns": 0,
    "freepages_delay_max_ns": 0,
    "freepages_delay_min_ns": 0,
    "thrashing_delay_count": 0,
    "thrashing_delay_total_ns": 0,
    "thrashing_delay_max_ns": 0,
    "thrashing_delay_min_ns": 0,
    "compact_delay_count": 0,
    "compact_delay_total_ns": 0,
    "compact_delay_max_ns": 0,
    "compact_delay_min_ns": 0,
    "wpcopy_delay_count": 0,
    "wpcopy_delay_total_ns": 0,
    "wpcopy_delay_max_ns": 0,
    "wpcopy_delay_min_ns": 0,
    "irq_delay_count": 0,
    "irq_delay_total_ns": 0,
    "irq_delay_max_ns": 0,
    "irq_delay_min_ns": 0,
    "hiwater_rss_bytes": 0,
    "hiwater_vm_bytes": 0
    }"#;
    let t: ThreadState = serde_json::from_str(json).expect("deserialize");
    assert_eq!(t.run_time_ns, crate::metric_types::MonotonicNs(1_000_000));
    assert_eq!(t.timeslices, crate::metric_types::MonotonicCount(50));
    assert_eq!(t.utime_clock_ticks, crate::metric_types::ClockTicks(10));
    assert_eq!(t.allocated_bytes, crate::metric_types::Bytes(16_777_216));
    assert_eq!(
        t.cancelled_write_bytes,
        crate::metric_types::Bytes(1024),
        "cancelled_write_bytes round-trips through the JSON \
        wire format alongside the other Bytes-typed fields",
    );
    assert_eq!(t.wait_max, crate::metric_types::PeakNs(250_000));
    assert_eq!(t.fair_slice_ns, crate::metric_types::GaugeNs(250_000));
    assert_eq!(t.nr_threads, crate::metric_types::GaugeCount(4));
    assert_eq!(t.nice, crate::metric_types::OrdinalI32(-5));
    assert_eq!(t.rt_priority, crate::metric_types::OrdinalU32(99));
    assert_eq!(
        t.policy,
        crate::metric_types::CategoricalString::from("SCHED_OTHER")
    );
    assert_eq!(
        t.cpu_affinity,
        crate::metric_types::CpuSet(vec![0, 1, 2, 3])
    );
}
// Compile-time pin: changing the declared type of `nr_threads` away from
// GaugeCount breaks this test at build time.
#[test]
fn nr_threads_field_pinned_to_gauge_count() {
    let t = ThreadState::default();
    let _: crate::metric_types::GaugeCount = t.nr_threads;
}
// Compile-time pin: `cancelled_write_bytes` must stay Bytes-typed.
#[test]
fn cancelled_write_bytes_field_pinned_to_bytes() {
    let t = ThreadState::default();
    let _: crate::metric_types::Bytes = t.cancelled_write_bytes;
}
// Full snapshot round-trip: write() then load() through the zstd-compressed
// JSON on-disk format must preserve timestamps, threads, and cgroup stats.
#[test]
fn snapshot_roundtrip_through_zstd_json() {
    let snap = CtprofSnapshot {
        captured_at_unix_ns: 42,
        host: None,
        threads: vec![
            thread("proc_a", "worker_0", 1_000_000),
            thread("proc_a", "worker_1", 2_000_000),
        ],
        cgroup_stats: BTreeMap::from([("/".into(), {
            let mut cs = CgroupStats::default();
            cs.cpu.usage_usec = 500;
            cs.memory.current = 1 << 20;
            cs
        })]),
        probe_summary: None,
        parse_summary: None,
        taskstats_summary: None,
        psi: Psi::default(),
        sched_ext: None,
    };
    let tmp = tempfile::NamedTempFile::new().unwrap();
    snap.write(tmp.path()).unwrap();
    let back = CtprofSnapshot::load(tmp.path()).unwrap();
    assert_eq!(back.captured_at_unix_ns, 42);
    assert_eq!(back.threads.len(), 2);
    assert_eq!(
        back.threads[1].run_time_ns,
        crate::metric_types::MonotonicNs(2_000_000),
    );
    assert_eq!(back.cgroup_stats["/"].cpu.usage_usec, 500);
}
// A file that is not zstd-compressed must fail load() with a zstd error in
// the context chain.
#[test]
fn load_rejects_non_zstd_payload() {
    let tmp = tempfile::NamedTempFile::new().unwrap();
    std::fs::write(tmp.path(), b"{\"not\": \"zstd\"}").unwrap();
    let err = CtprofSnapshot::load(tmp.path()).unwrap_err();
    let msg = format!("{err:?}");
    assert!(
        msg.contains("zstd"),
        "expected zstd error in context chain, got: {msg}",
    );
}
#[test]
fn load_rejects_zstd_of_garbage_json() {
    // A valid zstd frame wrapping non-JSON must fail at the parse stage.
    let tmp = tempfile::NamedTempFile::new().unwrap();
    std::fs::write(tmp.path(), zstd::encode_all(&b"not json"[..], 3).unwrap()).unwrap();
    let msg = format!("{:?}", CtprofSnapshot::load(tmp.path()).unwrap_err());
    assert!(
        msg.contains("parse ctprof"),
        "expected parse error in context chain, got: {msg}",
    );
}
#[test]
fn decompress_capped_rejects_decompression_bomb() {
    // 8 KiB of zeros compresses tiny but inflates well past a 1 KiB cap.
    let payload = vec![0u8; 8192];
    let compressed = zstd::encode_all(payload.as_slice(), 3).unwrap();
    let err = super::decompress_capped(&compressed, 1024u64).unwrap_err();
    let msg = format!("{err:?}");
    assert!(
        msg.contains("decompression-bomb guard"),
        "expected decompression-bomb guard error, got: {msg}",
    );
}
#[test]
fn decompress_capped_accepts_payload_at_cap_boundary() {
    // A cap exactly equal to the decompressed size must still succeed.
    let payload = b"hello world".to_vec();
    let cap = payload.len() as u64;
    let compressed = zstd::encode_all(payload.as_slice(), 3).unwrap();
    let out = super::decompress_capped(&compressed, cap).unwrap();
    assert_eq!(
        out, payload,
        "payload exactly at the cap must round-trip — \
         cap is inclusive (`>` not `>=`)",
    );
}
#[test]
fn parse_stat_robust_against_paren_in_comm() {
    // A `)` inside comm must not confuse the comm-boundary split;
    // tail token 19 is still starttime.
    let tail: String = (0..20).map(|i| format!("{i} ")).collect();
    let line = format!("1234 (weird)name) {tail}");
    assert_eq!(parse_stat(&line).start_time_clock_ticks, Some(19));
}
#[test]
fn parse_stat_extracts_all_known_fields() {
    // Tail tokens equal their own indices, so each assertion pins a
    // field's position in the post-comm tail.
    let tail: String = (0..=38).map(|i| format!("{i} ")).collect();
    let f = parse_stat(&format!("1 (n) {tail}"));
    assert_eq!(f.minflt, Some(7));
    assert_eq!(f.majflt, Some(9));
    assert_eq!(f.utime_clock_ticks, Some(11));
    assert_eq!(f.stime_clock_ticks, Some(12));
    assert_eq!(f.nice, Some(16));
    assert_eq!(f.start_time_clock_ticks, Some(19));
    assert_eq!(f.processor, Some(36));
    assert_eq!(f.policy, Some(38));
}
#[test]
fn parse_stat_short_line_drops_missing_fields() {
    // Only tail tokens 0..=7 present: minflt (index 7) is the last field
    // that can be read; everything beyond it is None.
    let f = parse_stat("1 (n) 0 1 2 3 4 5 6 7");
    assert_eq!(f.minflt, Some(7));
    assert_eq!(f.majflt, None);
    assert_eq!(f.utime_clock_ticks, None);
    assert_eq!(f.stime_clock_ticks, None);
    assert_eq!(f.nice, None);
    assert_eq!(f.start_time_clock_ticks, None);
    assert_eq!(f.processor, None);
    assert_eq!(f.policy, None);
}
#[test]
fn parse_stat_processor_accepts_negative() {
    // Tokens 0..35 as themselves, then -1 at the processor slot plus pads.
    let mut line = (0..36).fold(String::from("1 (n) "), |mut acc, i| {
        acc.push_str(&format!("{i} "));
        acc
    });
    line.push_str("-1 0 0 ");
    let f = parse_stat(&line);
    assert_eq!(
        f.processor,
        Some(-1),
        "negative tokens must flow through as Some(-1) — pins \
         the get_i32 vs get_u64 type choice, not kernel emit \
         behavior (which never emits negative)",
    );
}
#[test]
fn parse_schedstat_three_fields() {
    // All three schedstat fields parse in order.
    let parsed = parse_schedstat("12345 67890 42\n");
    assert_eq!(parsed, (Some(12345), Some(67890), Some(42)));
}
#[test]
fn parse_schedstat_missing_fields_drop_individually() {
    // A one-token line keeps the first slot and drops the other two.
    let (first, second, third) = parse_schedstat("12345\n");
    assert_eq!(first, Some(12345));
    assert_eq!(second, None);
    assert_eq!(third, None);
}
#[test]
fn parse_io_extracts_all_seven_fields() {
    // Each /proc io key (values 1..=7) lands in its matching field.
    let raw = "rchar: 1\n\
               wchar: 2\n\
               syscr: 3\n\
               syscw: 4\n\
               read_bytes: 5\n\
               write_bytes: 6\n\
               cancelled_write_bytes: 7\n";
    let f = parse_io(raw);
    assert_eq!(
        (f.rchar, f.wchar, f.syscr, f.syscw),
        (Some(1), Some(2), Some(3), Some(4))
    );
    assert_eq!(
        (f.read_bytes, f.write_bytes, f.cancelled_write_bytes),
        (Some(5), Some(6), Some(7))
    );
}
#[test]
fn parse_status_extracts_csw_and_affinity() {
    let raw = "Name:\tbash\n\
               State:\tS (sleeping)\n\
               Cpus_allowed_list:\t0-3,5\n\
               voluntary_ctxt_switches:\t100\n\
               nonvoluntary_ctxt_switches:\t5\n";
    let f = parse_status(raw);
    // Both context-switch counters parse from their own lines.
    assert_eq!((f.voluntary_csw, f.nonvoluntary_csw), (Some(100), Some(5)));
    assert_eq!(
        f.state,
        Some('S'),
        "first non-whitespace char of `State:` value is the \
         single-letter code (R/S/D/T/t/X/Z/P/I)",
    );
    // The `0-3,5` cpulist expands to explicit CPU ids.
    assert_eq!(f.cpus_allowed.as_deref(), Some(&[0u32, 1, 2, 3, 5][..]));
}
#[test]
fn parse_status_accepts_every_kernel_state_code() {
    // Every single-letter state code must round-trip unchanged.
    for code in "RSDTtXZPI".chars() {
        let raw = format!("State:\t{code} (label)\n");
        assert_eq!(parse_status(&raw).state, Some(code));
    }
}
#[test]
fn parse_status_absent_state_line_yields_none() {
    // No `State:` line at all → state stays None rather than defaulting.
    assert_eq!(parse_status("voluntary_ctxt_switches:\t1\n").state, None);
}
#[test]
fn parse_psi_extracts_some_and_full_halves() {
    let raw = "some avg10=18.59 avg60=24.31 avg300=20.49 total=78097519837\n\
               full avg10=0.00 avg60=0.00 avg300=0.00 total=0\n";
    let r = parse_psi(raw);
    // Averages land as centi-percent integers; totals stay as-is.
    assert_eq!(
        (r.some.avg10, r.some.avg60, r.some.avg300),
        (1859, 2431, 2049)
    );
    assert_eq!(r.some.total_usec, 78_097_519_837);
    assert_eq!((r.full.avg10, r.full.avg60, r.full.avg300), (0, 0, 0));
    assert_eq!(r.full.total_usec, 0);
}
#[test]
fn parse_psi_irq_full_only_leaves_some_at_zero() {
    // irq-style input has only a `full` line; the `some` half must stay
    // at its zero default.
    let r = parse_psi("full avg10=1.09 avg60=1.08 avg300=1.46 total=80506377366\n");
    assert_eq!((r.full.avg10, r.full.avg60, r.full.avg300), (109, 108, 146));
    assert_eq!(r.full.total_usec, 80_506_377_366);
    assert_eq!((r.some.avg10, r.some.avg60, r.some.avg300), (0, 0, 0));
    assert_eq!(r.some.total_usec, 0);
}
#[test]
fn parse_psi_empty_input_yields_default() {
    // Empty input collapses to the all-zero record.
    let r = parse_psi("");
    assert_eq!(r.some.avg10, 0);
    assert_eq!(r.full.total_usec, 0);
}
#[test]
fn parse_psi_malformed_value_defaults_to_zero() {
    // Each malformed token degrades to zero in isolation.
    let r = parse_psi("some avg10=NaN avg60=0.50 avg300=- total=abc\n");
    assert_eq!(r.some.avg10, 0, "NaN parses to zero");
    assert_eq!(r.some.avg60, 50, "well-formed neighbor still parses");
    assert_eq!(r.some.avg300, 0, "lone dash parses to zero");
    assert_eq!(r.some.total_usec, 0, "non-numeric total parses to zero");
}
#[test]
fn parse_psi_full_saturation_maps_to_10000() {
    // 100.00% maps to exactly 10_000 centi-percent on all three windows.
    let r = parse_psi("some avg10=100.00 avg60=100.00 avg300=100.00 total=42\n");
    for avg in [r.some.avg10, r.some.avg60, r.some.avg300] {
        assert_eq!(avg, 10_000);
    }
    assert_eq!(r.some.total_usec, 42);
}
#[test]
fn parse_psi_unknown_keys_ignored() {
    // Forward-compat: unrecognized keys must not disturb known ones.
    let r = parse_psi("some avg10=1.00 avg600=99.99 future_field=42 total=10\n");
    assert_eq!((r.some.avg10, r.some.total_usec), (100, 10));
}
#[test]
fn parse_centi_percent_zero_pads_short_fraction() {
    // Whole numbers and two-digit fractions: (input, centi-percent).
    for (input, want) in [
        ("0", 0),
        ("42", 4200),
        ("18.59", 1859),
        ("3.", 300),
        ("100.99", 10099),
    ] {
        assert_eq!(parse_centi_percent(input), want);
    }
    // Short fractions are right-padded; long fractions are truncated.
    assert_eq!(parse_centi_percent("1.5"), 150, "1.5 must read as 1.50%");
    assert_eq!(parse_centi_percent("0.7"), 70, "0.7 must read as 0.70%");
    assert_eq!(
        parse_centi_percent("1.501"),
        150,
        "1.501 truncates to 1.50%"
    );
}
#[test]
fn read_host_psi_at_populates_all_four_resources() {
    // Stage pressure files for all four host resources, then verify each
    // half of each record lands in the matching struct field.
    let tmp = tempfile::TempDir::new().unwrap();
    let pressure = tmp.path().join("pressure");
    std::fs::create_dir_all(&pressure).unwrap();
    let stage = |name: &str, body: &str| std::fs::write(pressure.join(name), body).unwrap();
    stage(
        "cpu",
        "some avg10=1.00 avg60=2.00 avg300=3.00 total=100\n\
         full avg10=0.00 avg60=0.00 avg300=0.00 total=0\n",
    );
    stage(
        "memory",
        "some avg10=4.50 avg60=5.50 avg300=6.50 total=200\n\
         full avg10=7.50 avg60=8.50 avg300=9.50 total=150\n",
    );
    stage(
        "io",
        "some avg10=10.10 avg60=20.20 avg300=30.30 total=300\n\
         full avg10=40.40 avg60=50.50 avg300=60.60 total=250\n",
    );
    stage("irq", "full avg10=0.50 avg60=0.60 avg300=0.70 total=80\n");
    let psi = read_host_psi_at(tmp.path());
    assert_eq!(psi.cpu.some.avg10, 100);
    assert_eq!(psi.cpu.some.avg60, 200);
    assert_eq!(psi.cpu.some.avg300, 300);
    assert_eq!(psi.cpu.some.total_usec, 100);
    assert_eq!(psi.cpu.full.avg10, 0);
    assert_eq!(psi.cpu.full.total_usec, 0);
    assert_eq!(psi.memory.some.avg10, 450);
    assert_eq!(psi.memory.full.avg10, 750);
    assert_eq!(psi.memory.some.total_usec, 200);
    assert_eq!(psi.memory.full.total_usec, 150);
    assert_eq!(psi.io.some.avg10, 1010);
    assert_eq!(psi.io.full.avg300, 6060);
    assert_eq!(psi.io.some.total_usec, 300);
    assert_eq!(psi.irq.full.avg10, 50);
    assert_eq!(psi.irq.full.avg60, 60);
    assert_eq!(psi.irq.full.avg300, 70);
    assert_eq!(psi.irq.full.total_usec, 80);
    assert_eq!(psi.irq.some.avg10, 0);
    assert_eq!(psi.irq.some.total_usec, 0);
}
#[test]
fn read_host_psi_at_missing_files_yield_default() {
    let tmp = tempfile::TempDir::new().unwrap();
    // Phase 1: no `pressure/` directory at all → every resource record
    // stays at its zero default.
    let psi = read_host_psi_at(tmp.path());
    assert_eq!(psi.cpu.some.avg10, 0);
    assert_eq!(psi.memory.full.total_usec, 0);
    assert_eq!(psi.io.some.avg300, 0);
    assert_eq!(psi.irq.full.avg60, 0);
    // Phase 2: stage only the cpu file — it parses, while the still-absent
    // memory/io/irq records keep their defaults.
    let pressure = tmp.path().join("pressure");
    std::fs::create_dir_all(&pressure).unwrap();
    std::fs::write(
        pressure.join("cpu"),
        "some avg10=12.34 avg60=0 avg300=0 total=0\n\
         full avg10=0 avg60=0 avg300=0 total=0\n",
    )
    .unwrap();
    let psi = read_host_psi_at(tmp.path());
    assert_eq!(psi.cpu.some.avg10, 1234);
    assert_eq!(psi.memory.some.avg10, 0);
    assert_eq!(psi.io.full.total_usec, 0);
    assert_eq!(psi.irq.full.avg10, 0);
}
#[test]
fn read_cgroup_psi_at_uses_resource_dot_pressure_naming() {
    // cgroup PSI lives in `<resource>.pressure` files inside the cgroup dir
    // (unlike the host's `pressure/<resource>` layout).
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let cg_dir = cgroup_root.path().join("app");
    std::fs::create_dir_all(&cg_dir).unwrap();
    let stage = |name: &str, body: &str| std::fs::write(cg_dir.join(name), body).unwrap();
    stage(
        "cpu.pressure",
        "some avg10=11.11 avg60=0 avg300=0 total=42\n\
         full avg10=0 avg60=0 avg300=0 total=0\n",
    );
    stage(
        "memory.pressure",
        "some avg10=0 avg60=0 avg300=0 total=0\n\
         full avg10=22.22 avg60=0 avg300=0 total=999\n",
    );
    stage("irq.pressure", "full avg10=33.33 avg60=0 avg300=0 total=7\n");
    let psi = read_cgroup_psi_at(cgroup_root.path(), "/app");
    assert_eq!(psi.cpu.some.avg10, 1111);
    assert_eq!(psi.cpu.some.total_usec, 42);
    assert_eq!(psi.memory.full.avg10, 2222);
    assert_eq!(psi.memory.full.total_usec, 999);
    assert_eq!(psi.io.some.avg10, 0, "absent io.pressure → default zero");
    assert_eq!(psi.io.full.total_usec, 0);
    assert_eq!(psi.irq.full.avg10, 3333);
    assert_eq!(psi.irq.some.avg10, 0, "irq is full-only");
}
#[test]
fn parse_kv_counters_handles_well_formed_and_malformed_lines() {
    // Well-formed memory.stat-style input: every pair lands in the map.
    let raw = "anon 12812288\n\
               file 12623872\n\
               pgfault 18\n\
               pgmajfault 4\n\
               workingset_refault_anon 0\n\
               workingset_refault_file 27198\n";
    let m = parse_kv_counters(raw);
    for (key, want) in [
        ("anon", 12_812_288),
        ("file", 12_623_872),
        ("pgfault", 18),
        ("pgmajfault", 4),
        ("workingset_refault_anon", 0),
        ("workingset_refault_file", 27_198),
    ] {
        assert_eq!(m.get(key), Some(&want));
    }
    assert_eq!(m.len(), 6);
    assert!(parse_kv_counters("").is_empty());
    // Malformed lines (missing value, negative, non-numeric, blank) are
    // skipped without aborting the rest of the parse.
    let raw = "good 42\n\
               bad_no_value\n\
               bad_negative -5\n\
               bad_text foo\n\
               \n\
               recover 7\n";
    let m = parse_kv_counters(raw);
    assert_eq!(m.get("good"), Some(&42));
    assert_eq!(m.get("recover"), Some(&7));
    assert_eq!(m.len(), 2, "malformed lines must not pollute the map");
}
#[test]
fn parse_smaps_rollup_extracts_kb_values_and_skips_header() {
    let raw = "55796dced000-7ffe1f875000 ---p 00000000 00:00 0 [rollup]\n\
               Rss: 2080 kB\n\
               Pss: 209 kB\n\
               Pss_Dirty: 136 kB\n\
               Pss_Anon: 136 kB\n\
               Anonymous: 136 kB\n\
               Swap: 0 kB\n\
               SwapPss: 0 kB\n\
               Locked: 0 kB\n";
    let m = parse_smaps_rollup(raw);
    assert_eq!(m.get("Rss"), Some(&2080), "Rss kB stripped to integer");
    // Remaining keys checked table-style.
    for (key, want) in [
        ("Pss", 209),
        ("Pss_Dirty", 136),
        ("Pss_Anon", 136),
        ("Anonymous", 136),
        ("Swap", 0),
        ("SwapPss", 0),
        ("Locked", 0),
    ] {
        assert_eq!(m.get(key), Some(&want));
    }
    assert_eq!(
        m.len(),
        8,
        "[rollup] header line is silently elided (no `:` separator)",
    );
}
#[test]
fn parse_smaps_rollup_empty_input_yields_empty_map() {
    // No input lines → no keys in the map.
    let m = parse_smaps_rollup("");
    assert!(m.is_empty());
}
#[test]
fn parse_smaps_rollup_malformed_value_silently_dropped() {
    let raw = "Rss: 100 kB\n\
               BogusKey: not_a_number kB\n\
               Pss: 50 kB\n";
    let m = parse_smaps_rollup(raw);
    // The unparseable middle line vanishes; its neighbors are unaffected.
    assert_eq!(m.len(), 2);
    assert!(
        !m.contains_key("BogusKey"),
        "non-u64 value silently dropped"
    );
    assert_eq!(m.get("Rss"), Some(&100));
    assert_eq!(m.get("Pss"), Some(&50), "well-formed neighbor still parses");
}
#[test]
fn parse_smaps_rollup_skips_real_kernel_header_with_device_colon() {
    // The header contains `00:00` — a colon that must not be mistaken
    // for a key/value separator.
    let m = parse_smaps_rollup(
        "55796dced000-7ffe1f875000 ---p 00000000 00:00 0 [rollup]\n\
         Rss: 2080 kB\n\
         Pss: 209 kB\n",
    );
    assert_eq!((m.get("Rss"), m.get("Pss")), (Some(&2080), Some(&209)));
    assert_eq!(
        m.len(),
        2,
        "header line with `00:00` device pair must not produce a junk key; got {m:?}",
    );
}
#[test]
fn read_sched_ext_sysfs_at_populates_all_five_attrs() {
    // Stage every sched_ext sysfs attribute and confirm each is read.
    let sys_root = tempfile::TempDir::new().unwrap();
    let scx_dir = sys_root.path().join("kernel").join("sched_ext");
    std::fs::create_dir_all(&scx_dir).unwrap();
    for (attr, value) in [
        ("state", "enabled\n"),
        ("switch_all", "1\n"),
        ("nr_rejected", "42\n"),
        ("hotplug_seq", "315\n"),
        ("enable_seq", "7\n"),
    ] {
        std::fs::write(scx_dir.join(attr), value).unwrap();
    }
    let scx = read_sched_ext_sysfs_at(sys_root.path())
        .expect("populated sched_ext directory must yield Some");
    assert_eq!(scx.state, "enabled");
    assert_eq!(scx.switch_all, 1);
    assert_eq!(scx.nr_rejected, 42);
    assert_eq!(scx.hotplug_seq, 315);
    assert_eq!(scx.enable_seq, 7);
}
#[test]
fn read_sched_ext_sysfs_at_absent_directory_yields_none() {
    // No kernel/sched_ext directory → None, not a defaulted struct.
    let sys_root = tempfile::TempDir::new().unwrap();
    let scx = read_sched_ext_sysfs_at(sys_root.path());
    assert!(scx.is_none());
}
#[test]
fn read_sched_ext_sysfs_at_partial_files_default_zero() {
    let sys_root = tempfile::TempDir::new().unwrap();
    let scx_dir = sys_root.path().join("kernel").join("sched_ext");
    std::fs::create_dir_all(&scx_dir).unwrap();
    // Only two of the five attribute files exist.
    for (attr, value) in [("state", "disabled\n"), ("nr_rejected", "100\n")] {
        std::fs::write(scx_dir.join(attr), value).unwrap();
    }
    let scx =
        read_sched_ext_sysfs_at(sys_root.path()).expect("directory exists → returns Some");
    assert_eq!(scx.state, "disabled");
    assert_eq!(scx.nr_rejected, 100);
    assert_eq!(scx.switch_all, 0, "absent file → default 0");
    assert_eq!(scx.hotplug_seq, 0);
    assert_eq!(scx.enable_seq, 0);
}
#[test]
fn read_smaps_rollup_at_with_tally_dedups_to_leader_only() {
    let proc_root = tempfile::TempDir::new().unwrap();
    let (tgid, leader_tid, follower_tid) = (4242, 4242, 4243);
    // Helper: stage <root>/<tgid>/task/<tid>/smaps_rollup.
    let stage = |tid: i32, body: &str| {
        let dir = proc_root
            .path()
            .join(tgid.to_string())
            .join("task")
            .join(tid.to_string());
        std::fs::create_dir_all(&dir).unwrap();
        std::fs::write(dir.join("smaps_rollup"), body).unwrap();
    };
    stage(
        leader_tid,
        "Rss: 2048 kB\n\
         Pss: 512 kB\n",
    );
    stage(follower_tid, "Rss: 9999 kB\nPoison: 1 kB\n");
    // The leader tid (== tgid) reads its own rollup …
    let m = read_smaps_rollup_at_with_tally(proc_root.path(), tgid, leader_tid, &mut None);
    assert_eq!(m.get("Rss"), Some(&2048));
    assert_eq!(m.get("Pss"), Some(&512));
    assert_eq!(m.len(), 2);
    // … while non-leader tids are deduped away; the poison fixture must
    // never be read.
    let m = read_smaps_rollup_at_with_tally(proc_root.path(), tgid, follower_tid, &mut None);
    assert!(
        m.is_empty(),
        "follower thread must short-circuit to empty map; got {m:?}"
    );
}
#[test]
fn read_smaps_rollup_at_with_tally_absent_file_yields_empty_map() {
    // Leader task dir exists but carries no smaps_rollup file.
    let proc_root = tempfile::TempDir::new().unwrap();
    let tgid = 4242;
    let leader_tid = 4242;
    std::fs::create_dir_all(
        proc_root
            .path()
            .join(tgid.to_string())
            .join("task")
            .join(leader_tid.to_string()),
    )
    .unwrap();
    let m = read_smaps_rollup_at_with_tally(proc_root.path(), tgid, leader_tid, &mut None);
    assert!(m.is_empty(), "absent file → empty map; got {m:?}");
}
#[test]
fn parse_max_or_u64_distinguishes_max_from_concrete_value() {
    assert_eq!(parse_max_or_u64("max"), None, "literal max → no limit");
    assert_eq!(
        parse_max_or_u64("max\n"),
        None,
        "trailing newline tolerated"
    );
    // Concrete values parse through, including 0 and a near-u64::MAX value.
    assert_eq!(
        parse_max_or_u64("9223372036854771712"),
        Some(9_223_372_036_854_771_712)
    );
    assert_eq!(parse_max_or_u64("0"), Some(0));
    assert_eq!(parse_max_or_u64(""), None, "empty input → no limit");
    assert_eq!(parse_max_or_u64(" "), None, "whitespace-only → no limit");
    // Anything unparseable as u64 also means no limit.
    for garbage in ["not_a_number", "-1"] {
        assert_eq!(parse_max_or_u64(garbage), None);
    }
}
#[test]
fn parse_floor_value_treats_max_as_full_protection() {
    // Floors invert the limit convention: `max` means maximum protection.
    for input in ["max", "max\n"] {
        assert_eq!(
            parse_floor_value(input),
            Some(u64::MAX),
            "literal max → maximum protection (NOT no floor)"
        );
    }
    assert_eq!(parse_floor_value("0"), Some(0), "zero → no protection");
    assert_eq!(parse_floor_value("1073741824"), Some(1_073_741_824));
    assert_eq!(parse_floor_value(""), None, "empty → absent file");
    assert_eq!(parse_floor_value("not_a_number"), None);
}
#[test]
fn parse_cpu_max_handles_quota_period_pairs() {
    // (input, expected (quota, period)). `max` quota maps to None; a
    // missing or unparseable period falls back to 100_000.
    let cases = [
        ("50000 100000", (Some(50_000), 100_000)),
        ("max 100000", (None, 100_000)),
        ("25000 50000", (Some(25_000), 50_000)),
        ("50000", (Some(50_000), 100_000)),
        ("", (None, 100_000)),
        ("50000 garbage", (Some(50_000), 100_000)),
        ("max 100000\n", (None, 100_000)),
    ];
    for (input, want) in cases {
        assert_eq!(parse_cpu_max(input), want);
    }
}
#[test]
fn read_cgroup_stats_at_populates_nested_controllers_end_to_end() {
    // Stage a full cgroup v2 controller file set and verify each nested
    // field of the result is populated from the right file.
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let cg_dir = cgroup_root.path().join("app");
    std::fs::create_dir_all(&cg_dir).unwrap();
    let files = [
        (
            "cpu.stat",
            "usage_usec 12345\nnr_throttled 7\nthrottled_usec 8\n",
        ),
        ("cpu.max", "50000 100000\n"),
        ("cpu.weight", "200\n"),
        ("cpu.weight.nice", "-5\n"),
        ("memory.current", "9999\n"),
        ("memory.max", "max\n"),
        ("memory.high", "1073741824\n"),
        ("memory.low", "0\n"),
        ("memory.min", "0\n"),
        ("memory.stat", "anon 100\nfile 200\npgfault 18\nslab 50\n"),
        ("memory.events", "low 0\nhigh 1\nmax 0\noom 0\noom_kill 0\n"),
        ("pids.current", "42\n"),
        ("pids.max", "1024\n"),
    ];
    for (name, body) in files {
        std::fs::write(cg_dir.join(name), body).unwrap();
    }
    let stats = read_cgroup_stats_at(cgroup_root.path(), "/app");
    assert_eq!(stats.cpu.usage_usec, 12_345);
    assert_eq!(stats.cpu.nr_throttled, 7);
    assert_eq!(stats.cpu.throttled_usec, 8);
    assert_eq!(stats.cpu.max_quota_us, Some(50_000));
    assert_eq!(stats.cpu.max_period_us, 100_000);
    assert_eq!(stats.cpu.weight, Some(200));
    assert_eq!(stats.cpu.weight_nice, Some(-5));
    assert_eq!(stats.memory.current, 9999);
    assert_eq!(stats.memory.max, None, "literal max → no limit");
    assert_eq!(stats.memory.high, Some(1_073_741_824));
    assert_eq!(stats.memory.low, Some(0));
    assert_eq!(stats.memory.min, Some(0));
    assert_eq!(stats.memory.stat.get("anon"), Some(&100));
    assert_eq!(stats.memory.stat.get("file"), Some(&200));
    assert_eq!(stats.memory.stat.get("pgfault"), Some(&18));
    assert_eq!(stats.memory.stat.get("slab"), Some(&50));
    assert_eq!(stats.memory.events.get("oom_kill"), Some(&0));
    assert_eq!(stats.memory.events.get("high"), Some(&1));
    assert_eq!(stats.pids.current, Some(42));
    assert_eq!(stats.pids.max, Some(1024));
}
#[test]
fn read_cgroup_stats_at_root_cgroup_collapses_to_defaults() {
    // An empty cgroup dir: every controller file is absent, so every
    // field lands at its default.
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let stats = read_cgroup_stats_at(cgroup_root.path(), "/");
    assert_eq!(stats.cpu.usage_usec, 0);
    assert_eq!(stats.cpu.max_quota_us, None);
    assert_eq!(
        stats.cpu.max_period_us, CPU_MAX_DEFAULT_PERIOD_US,
        "absent cpu.max → period defaults to kernel default"
    );
    assert_eq!(stats.cpu.weight, None);
    assert_eq!(stats.memory.current, 0);
    assert_eq!((stats.memory.max, stats.memory.high), (None, None));
    assert!(stats.memory.stat.is_empty());
    assert!(stats.memory.events.is_empty());
    assert_eq!((stats.pids.current, stats.pids.max), (None, None));
}
#[test]
fn parse_cgroup_v2_picks_unified_hierarchy() {
    // Only the `0::` (unified v2) line counts; v1 controller lines are
    // skipped regardless of position.
    let raw = "12:cpuset:/legacy/cpuset/path\n\
               0::/unified/path\n\
               5:freezer:/legacy/freezer\n";
    assert_eq!(parse_cgroup_v2(raw), Some("/unified/path".to_string()));
}
#[test]
fn parse_cgroup_v2_none_when_only_legacy_present() {
    // A v1-only cgroup file has no `0::` line, so no unified path exists.
    assert_eq!(parse_cgroup_v2("12:cpuset:/legacy/path\n"), None);
}
#[test]
fn parse_sched_accepts_prefixed_and_bare_keys() {
    // Keys appear both with and without the `se.statistics.` prefix in
    // this fixture; both spellings must resolve to the same fields.
    let raw = "se.statistics.nr_wakeups : 1000\n\
               se.nr_migrations : 42\n\
               se.statistics.nr_wakeups_local : 600\n\
               se.statistics.wait_sum : 12345.678\n";
    let f = parse_sched(raw, &mut None);
    assert_eq!(f.nr_wakeups, Some(1000));
    assert_eq!(f.nr_migrations, Some(42));
    assert_eq!(f.nr_wakeups_local, Some(600));
    // 12345.678 scales by 1e6 → 12_345_678_000 (fractional input becomes
    // an integer ns count).
    assert_eq!(f.wait_sum, Some(12_345_678_000));
}
#[test]
fn parse_cpu_stat_space_separated_format() {
    // Only three of the six keys are surfaced by parse_cpu_stat.
    let raw = "usage_usec 1234\n\
               user_usec 1000\n\
               system_usec 234\n\
               nr_periods 10\n\
               nr_throttled 2\n\
               throttled_usec 500\n";
    let parsed = parse_cpu_stat(raw);
    assert_eq!(parsed, (Some(1234), Some(2), Some(500)));
}
#[test]
fn policy_name_known_and_unknown() {
    // Known policies map to canonical names; 6 and 7 are passed as bare
    // integers (DEADLINE / EXT) in this fixture.
    for (policy, want) in [
        (libc::SCHED_OTHER, "SCHED_OTHER"),
        (libc::SCHED_FIFO, "SCHED_FIFO"),
        (libc::SCHED_RR, "SCHED_RR"),
        (libc::SCHED_BATCH, "SCHED_BATCH"),
        (libc::SCHED_IDLE, "SCHED_IDLE"),
        (6, "SCHED_DEADLINE"),
        (7, "SCHED_EXT"),
    ] {
        assert_eq!(policy_name(policy), want);
    }
    assert_eq!(policy_name(99), "SCHED_UNKNOWN(99)");
}
#[test]
fn iter_tgids_includes_self() {
    // Walking the real /proc must find the current process's tgid.
    let pid = std::process::id() as i32;
    let tgids = iter_tgids_at(Path::new(DEFAULT_PROC_ROOT));
    assert!(tgids.contains(&pid), "self pid {pid} not in /proc walk");
}
#[test]
fn iter_task_ids_self_returns_at_least_main_tid() {
    // The main thread's tid equals the pid and must always be listed.
    let pid = std::process::id() as i32;
    assert!(
        iter_task_ids_at(Path::new(DEFAULT_PROC_ROOT), pid).contains(&pid),
        "main tid {pid} absent from /proc/self/task"
    );
}
#[test]
fn read_process_comm_for_self_is_populated() {
    // Our own /proc/<pid>/comm must be readable and non-empty.
    let comm = read_process_comm_at(Path::new(DEFAULT_PROC_ROOT), std::process::id() as i32)
        .expect("self comm must be readable");
    assert!(!comm.is_empty());
}
#[test]
fn capture_thread_self_populates_identity() {
    // Capturing our own main thread fills in all identity fields.
    let pid = std::process::id() as i32;
    let t = capture_thread(pid, pid, "testproc");
    assert_eq!((t.tid, t.tgid), (pid as u32, pid as u32));
    assert_eq!(t.pcomm, "testproc");
    assert!(!t.comm.is_empty());
    assert!(t.start_time_clock_ticks > 0);
    assert!(!t.policy.0.is_empty());
}
#[test]
fn capture_produces_non_empty_snapshot() {
    // A live capture must include at least the current process's threads.
    let pid = std::process::id() as i32;
    let snap = capture_pid(pid);
    assert!(!snap.threads.is_empty());
    let has_own_tgid = snap.threads.iter().any(|t| t.tgid == pid as u32);
    assert!(has_own_tgid, "own tgid missing from capture");
}
#[test]
fn snapshot_extension_is_stable() {
    // The on-disk extension is an external contract; pin the literal.
    let ext = SNAPSHOT_EXTENSION;
    assert_eq!(ext, "ctprof.zst");
}
#[test]
fn parse_io_empty_input_yields_all_none() {
    // Empty io content degrades to the all-default struct.
    assert_eq!(parse_io(""), IoFields::default());
}
#[test]
fn parse_io_malformed_value_drops_only_that_field() {
    // A bad value poisons only its own line; neighbors still parse.
    let f = parse_io(
        "rchar: 100\n\
         wchar: not-a-number\n\
         syscr: 3\n",
    );
    assert_eq!(f.rchar, Some(100));
    assert_eq!(f.wchar, None, "malformed value drops to None");
    assert_eq!(f.syscr, Some(3));
}
#[test]
fn parse_stat_empty_and_no_paren_return_default() {
    // Degenerate inputs must all collapse to StatFields::default()
    // without panicking.
    assert_eq!(parse_stat(""), StatFields::default());
    assert_eq!(
        parse_stat("garbage line with no close paren 1 2 3"),
        StatFields::default(),
        "line without `)` must return Default, not panic on \
         out-of-bounds indexing",
    );
    assert_eq!(
        parse_stat(" \n"),
        StatFields::default(),
        "whitespace-only input must also land at Default",
    );
}
#[test]
fn parse_stat_multi_line_input_uses_only_first_line() {
    // The second line carries poison values (999) that must never be read.
    let first = (0..=38).fold(String::from("1 (proc) "), |mut acc, i| {
        acc.push_str(&format!("{i} "));
        acc
    });
    let second = "2 (other) 999 999 999 999 999 999 999 999 999 999 \
                  999 999 999 999 999 999 999 999 999 999 999 999 999\n";
    let f = parse_stat(&format!("{first}\n{second}"));
    assert_eq!(f.nice, Some(16));
    assert_eq!(f.start_time_clock_ticks, Some(19));
    assert_eq!(f.policy, Some(38));
}
#[test]
fn parse_schedstat_extra_fields_and_invalid_tokens() {
    // A fourth token is ignored.
    assert_eq!(parse_schedstat("1 2 3 4\n"), (Some(1), Some(2), Some(3)));
    // An unparseable middle token drops only its own slot.
    let triple = parse_schedstat("1 invalid 3\n");
    assert_eq!(triple, (Some(1), None, Some(3)));
    // Empty input drops all three.
    assert_eq!(parse_schedstat(""), (None, None, None));
}
#[test]
fn policy_name_negative_integer_renders_unknown() {
    // Any out-of-range value renders as SCHED_UNKNOWN(<n>), down to
    // i32::MIN.
    for bad in [-1, i32::MIN] {
        assert_eq!(policy_name(bad), format!("SCHED_UNKNOWN({bad})"));
    }
}
#[test]
fn parse_cpu_stat_empty_and_keyonly_lines_yield_none() {
    // No input and a key without a value both yield an all-None triple.
    for degenerate in ["", "usage_usec\n"] {
        assert_eq!(parse_cpu_stat(degenerate), (None, None, None));
    }
}
#[test]
fn parse_status_partial_and_malformed_fields_isolate_correctly() {
    // Only voluntary_ctxt_switches present → the other fields stay None.
    let f = parse_status(
        "Name:\tfoo\n\
         voluntary_ctxt_switches:\t9\n",
    );
    assert_eq!(f.voluntary_csw, Some(9));
    assert_eq!(f.nonvoluntary_csw, None);
    assert_eq!(f.cpus_allowed, None);
    // A backwards range (`5-3`) is a malformed cpulist.
    let f = parse_status(
        "Cpus_allowed_list:\t5-3\n\
         voluntary_ctxt_switches:\t1\n",
    );
    assert_eq!(f.voluntary_csw, Some(1));
    assert_eq!(
        f.cpus_allowed, None,
        "malformed cpulist must route parse_cpu_list's None \
         into the StatusFields field — not collapse to empty vec",
    );
}
#[test]
fn parse_cgroup_v2_empty_path_and_multiple_unified_lines() {
    // An empty (or whitespace-only) unified path yields None.
    assert_eq!(parse_cgroup_v2("0::\n"), None);
    assert_eq!(parse_cgroup_v2("0:: \n"), None);
    // With duplicate `0::` lines, the first one wins.
    assert_eq!(
        parse_cgroup_v2("0::/first\n0::/second\n"),
        Some("/first".to_string())
    );
}
#[test]
fn read_thread_comm_at_whitespace_only_returns_none() {
    // A comm file containing only whitespace is treated like a missing
    // one: both lookups return None.
    let tmp = tempfile::TempDir::new().unwrap();
    let (tgid, tid) = (1, 1);
    let task_dir = tmp
        .path()
        .join(tgid.to_string())
        .join("task")
        .join(tid.to_string());
    std::fs::create_dir_all(&task_dir).unwrap();
    std::fs::write(task_dir.join("comm"), " \n").unwrap();
    assert_eq!(read_thread_comm_at(tmp.path(), tgid, tid), None);
    assert_eq!(read_thread_comm_at(tmp.path(), tgid, 9999), None);
}
/// Stage a fake /proc subtree — `<root>/<tgid>/task/<tid>/` — with every
/// per-thread file this module's capture path reads (comm, stat,
/// schedstat, status, io, sched, cgroup).
fn stage_synthetic_proc(root: &Path, tgid: i32, tid: i32, pcomm: &str, comm: &str) {
    use std::fs;
    let tgid_dir = root.join(tgid.to_string());
    let task_dir = tgid_dir.join("task").join(tid.to_string());
    fs::create_dir_all(&task_dir).unwrap();
    fs::write(tgid_dir.join("comm"), format!("{pcomm}\n")).unwrap();
    // All remaining files live under the task dir.
    let put = |name: &str, body: &str| fs::write(task_dir.join(name), body).unwrap();
    put("comm", &format!("{comm}\n"));
    // stat: the comm field itself contains parens to exercise the
    // rightmost-paren split; nice=-10, starttime=555555, policy=0.
    put(
        "stat",
        &format!(
            "{tid} (proc (with) parens) R 1 2 3 4 5 6 \
             7777 0 8888 0 10 11 12 13 14 {nice} 1 0 \
             {starttime} 100 200 300 400 500 600 700 800 \
             900 1000 1100 1200 1300 1400 1500 1600 1700 1800 {policy}\n",
            tid = tid,
            nice = -10_i32,
            starttime = 555_555u64,
            policy = 0,
        ),
    );
    put("schedstat", "1000000 200000 50\n");
    put(
        "status",
        "Name:\tfoo\n\
         State:\tR (running)\n\
         voluntary_ctxt_switches:\t42\n\
         nonvoluntary_ctxt_switches:\t7\n\
         Cpus_allowed_list:\t0-3\n",
    );
    put(
        "io",
        "rchar: 100\n\
         wchar: 200\n\
         syscr: 10\n\
         syscw: 20\n\
         read_bytes: 4096\n\
         write_bytes: 8192\n\
         cancelled_write_bytes: 512\n",
    );
    // sched mixes `se.statistics.`-prefixed and bare keys, mirroring the
    // variation parse_sched must accept.
    put(
        "sched",
        "\
         se.statistics.nr_wakeups : 11\n\
         se.statistics.nr_wakeups_local : 8\n\
         se.statistics.nr_wakeups_remote : 3\n\
         se.statistics.nr_wakeups_sync : 2\n\
         se.statistics.nr_wakeups_migrate : 1\n\
         se.statistics.nr_wakeups_idle : 4\n\
         se.statistics.nr_wakeups_affine : 12\n\
         se.statistics.nr_wakeups_affine_attempts : 20\n\
         nr_migrations : 9\n\
         se.statistics.nr_migrations_cold : 5\n\
         se.statistics.nr_forced_migrations : 7\n\
         se.statistics.nr_failed_migrations_affine : 1\n\
         se.statistics.nr_failed_migrations_running : 2\n\
         se.statistics.nr_failed_migrations_hot : 3\n\
         wait_sum : 5000.25\n\
         wait_count : 15\n\
         se.statistics.wait_max : 250.5\n\
         sum_sleep_runtime : 3200.50\n\
         se.statistics.sleep_max : 180.25\n\
         sum_block_runtime : 1100.75\n\
         se.statistics.block_max : 60.75\n\
         iowait_sum : 77.0\n\
         iowait_count : 18\n\
         se.statistics.exec_max : 90.0\n\
         se.statistics.slice_max : 400.5\n\
         ext.enabled : 1\n",
    );
    put("cgroup", "0::/ktstr.slice/worker0\n");
}
#[test]
fn capture_with_filters_ghost_threads_with_empty_comm_and_zero_start() {
    // A "ghost" tid has a task directory but none of the per-thread
    // files; it must be filtered out of the capture entirely.
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let cgroup_tmp = tempfile::TempDir::new().unwrap();
    let sys_tmp = tempfile::TempDir::new().unwrap();
    let (tgid, live_tid, ghost_tid): (i32, i32, i32) = (42, 101, 202);
    stage_synthetic_proc(proc_tmp.path(), tgid, live_tid, "pcomm-proc", "live-thread");
    std::fs::create_dir_all(
        proc_tmp
            .path()
            .join(tgid.to_string())
            .join("task")
            .join(ghost_tid.to_string()),
    )
    .unwrap();
    let snap = capture_with(proc_tmp.path(), cgroup_tmp.path(), sys_tmp.path(), false);
    assert_eq!(
        snap.threads.len(),
        1,
        "ghost tid with empty comm + zero start must be filtered; \
         got threads: {:?}",
        snap.threads
            .iter()
            .map(|t| (t.tid, &t.comm))
            .collect::<Vec<_>>(),
    );
    assert_eq!(snap.threads[0].tid, live_tid as u32);
    assert_eq!(snap.threads[0].comm, "live-thread");
}
// Golden-path assembly test: stage one fully-populated synthetic tid and
// assert every ThreadState field against the exact value the
// stage_synthetic_proc fixture encodes (stat tail positions, status and
// io passthrough, PN_SCHEDSTAT ms.ns reconstruction, and the no-probe
// allocator counters).
#[test]
fn capture_with_synthetic_tree_assembles_thread_state() {
let proc_tmp = tempfile::TempDir::new().unwrap();
let cgroup_tmp = tempfile::TempDir::new().unwrap();
let sys_tmp = tempfile::TempDir::new().unwrap();
let tgid: i32 = 42;
let tid: i32 = 101;
stage_synthetic_proc(proc_tmp.path(), tgid, tid, "pcomm-proc", "worker-thread");
let snap = capture_with(proc_tmp.path(), cgroup_tmp.path(), sys_tmp.path(), false);
assert_eq!(snap.threads.len(), 1, "synthetic proc has one tid");
let t = &snap.threads[0];
// Identity fields come straight from the staged fixture.
assert_eq!(t.tid, tid as u32);
assert_eq!(t.tgid, tgid as u32);
assert_eq!(t.pcomm, "pcomm-proc");
assert_eq!(t.comm, "worker-thread");
assert_eq!(t.cgroup, "/ktstr.slice/worker0");
use crate::metric_types::{
Bytes, CategoricalString, ClockTicks, CpuSet, MonotonicCount, MonotonicNs, OrdinalI32,
PeakNs,
};
// Fields parsed out of /proc/<tid>/stat (positional tokens).
assert_eq!(t.nice, OrdinalI32(-10));
assert_eq!(t.start_time_clock_ticks, 555_555);
assert_eq!(t.policy, CategoricalString::from("SCHED_OTHER"));
assert_eq!(t.minflt, MonotonicCount(7777));
assert_eq!(t.majflt, MonotonicCount(8888));
assert_eq!(
t.utime_clock_ticks,
ClockTicks(10),
"tail[11] of stat fixture lands at utime_clock_ticks",
);
assert_eq!(
t.stime_clock_ticks,
ClockTicks(11),
"tail[12] of stat fixture lands at stime_clock_ticks",
);
assert_eq!(
t.processor,
OrdinalI32(1700),
"tail[36] of stat fixture (the 17th post-starttime \
token, value 100*17=1700) lands at processor",
);
// Fields parsed out of /proc/<tid>/schedstat.
assert_eq!(t.run_time_ns, MonotonicNs(1_000_000));
assert_eq!(t.wait_time_ns, MonotonicNs(200_000));
assert_eq!(t.timeslices, MonotonicCount(50));
// Fields parsed out of /proc/<tid>/status.
assert_eq!(
t.state, 'R',
"first non-whitespace char of `State:\tR (running)` is \
the single-letter code R",
);
assert_eq!(t.voluntary_csw, MonotonicCount(42));
assert_eq!(t.nonvoluntary_csw, MonotonicCount(7));
assert_eq!(t.cpu_affinity, CpuSet(vec![0, 1, 2, 3]));
// Fields parsed out of /proc/<tid>/io.
assert_eq!(t.rchar, Bytes(100));
assert_eq!(t.wchar, Bytes(200));
assert_eq!(t.syscr, MonotonicCount(10));
assert_eq!(t.syscw, MonotonicCount(20));
assert_eq!(t.read_bytes, Bytes(4096));
assert_eq!(t.write_bytes, Bytes(8192));
assert_eq!(
t.cancelled_write_bytes,
Bytes(512),
"cancelled_write_bytes round-trips from the 7th line of \
/proc/<tid>/io",
);
// Fields parsed out of /proc/<tid>/sched (wakeup/migration counters,
// then the PN_SCHEDSTAT dotted ms.ns values reconstructed to ns).
assert_eq!(t.nr_wakeups, MonotonicCount(11));
assert_eq!(t.nr_wakeups_local, MonotonicCount(8));
assert_eq!(t.nr_wakeups_remote, MonotonicCount(3));
assert_eq!(t.nr_wakeups_sync, MonotonicCount(2));
assert_eq!(t.nr_wakeups_migrate, MonotonicCount(1));
assert_eq!(t.nr_wakeups_affine, MonotonicCount(12));
assert_eq!(
t.nr_wakeups_affine_attempts,
MonotonicCount(20),
"denominator for the affine-wake success ratio \
(nr_wakeups_affine / nr_wakeups_affine_attempts = 12/20)",
);
assert_eq!(t.nr_migrations, MonotonicCount(9));
assert_eq!(t.nr_forced_migrations, MonotonicCount(7));
assert_eq!(t.nr_failed_migrations_affine, MonotonicCount(1));
assert_eq!(t.nr_failed_migrations_running, MonotonicCount(2));
assert_eq!(t.nr_failed_migrations_hot, MonotonicCount(3));
assert_eq!(
t.wait_sum,
MonotonicNs(5_000_250_000),
"PN_SCHEDSTAT 5000.25 reconstructs to 5_000_250_000 ns \
(5000ms + 250_000ns)",
);
assert_eq!(t.wait_count, MonotonicCount(15));
assert_eq!(
t.wait_max,
PeakNs(250_500_000),
"PN_SCHEDSTAT 250.5 reconstructs to 250_500_000 ns",
);
assert_eq!(
t.voluntary_sleep_ns,
MonotonicNs(2_099_750_000),
"voluntary_sleep_ns = sum_sleep_runtime (3_200_500_000) \
minus sum_block_runtime (1_100_750_000) = \
2_099_750_000 ns; capture-side normalization strips \
the kernel's sleep/block double-count",
);
assert_eq!(
t.sleep_max,
PeakNs(180_250_000),
"PN_SCHEDSTAT 180.25 reconstructs to 180_250_000 ns",
);
assert_eq!(
t.block_sum,
MonotonicNs(1_100_750_000),
"PN_SCHEDSTAT 1100.75 reconstructs to 1_100_750_000 ns; \
block_sum is populated from the kernel's `sum_block_runtime` key",
);
assert_eq!(
t.block_max,
PeakNs(60_750_000),
"PN_SCHEDSTAT 60.75 reconstructs to 60_750_000 ns",
);
assert_eq!(
t.iowait_sum,
MonotonicNs(77_000_000),
"PN_SCHEDSTAT 77.0 reconstructs to 77_000_000 ns",
);
assert_eq!(t.iowait_count, MonotonicCount(18));
assert_eq!(
t.exec_max,
PeakNs(90_000_000),
"PN_SCHEDSTAT 90.0 reconstructs to 90_000_000 ns",
);
assert_eq!(
t.slice_max,
PeakNs(400_500_000),
"PN_SCHEDSTAT 400.5 reconstructs to 400_500_000 ns",
);
assert!(
t.ext_enabled,
"ext.enabled = 1 round-trips through the full-key gate \
to ThreadState::ext_enabled true",
);
// Allocator counters: this capture path must not attach a probe, so
// both stay at the absent-counter zero.
assert_eq!(
t.allocated_bytes,
Bytes(0),
"synthetic-tree capture must not probe — allocated_bytes \
collapses to absent-counter zero",
);
assert_eq!(
t.deallocated_bytes,
Bytes(0),
"synthetic-tree capture must not probe — deallocated_bytes \
collapses to absent-counter zero",
);
}
#[test]
fn capture_with_empty_proc_root_produces_empty_snapshot() {
    // No numeric tgid directories at all — only a loadavg file — so the
    // walk finds nothing and the snapshot carries zero threads.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    std::fs::write(proc_root.path().join("loadavg"), "0.0 0.0 0.0 1/1 1\n").unwrap();
    let snap = capture_with(proc_root.path(), cgroup_root.path(), sys_root.path(), true);
    assert!(
        snap.threads.is_empty(),
        "empty proc_root must produce empty snapshot; got {} threads",
        snap.threads.len(),
    );
}
#[test]
fn capture_with_inode_cache_collapses_duplicate_binaries() {
    // Two tgids whose `exe` symlinks resolve to the same file exercise
    // the binary-dedup cache; the cache-hit branch must not invent data.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    std::fs::write(proc_root.path().join("loadavg"), "0.0 0.0 0.0 1/1 1\n").unwrap();
    let shared_exe = proc_root.path().join("shared-exe");
    std::fs::write(&shared_exe, b"\x7fELFsynthetic\n").unwrap();
    for tgid in [4242, 4243] {
        stage_synthetic_proc(proc_root.path(), tgid, tgid + 1, "shared-pcomm", "shared-comm");
        let exe_link = proc_root.path().join(tgid.to_string()).join("exe");
        std::os::unix::fs::symlink(&shared_exe, &exe_link).unwrap();
    }
    let snap = capture_with(proc_root.path(), cgroup_root.path(), sys_root.path(), true);
    assert_eq!(
        snap.threads.len(),
        2,
        "both staged threads must land in the snapshot",
    );
    for thread in &snap.threads {
        assert_eq!(
            thread.allocated_bytes,
            Bytes(0),
            "synthetic /proc has no maps; attach fails, allocated_bytes \
collapses to absent-counter zero — cache-hit branch must not \
fabricate a non-zero counter",
        );
    }
}
#[test]
fn capture_with_nonexistent_proc_root_produces_empty_snapshot() {
    // Point both the proc and sys roots at a path that was never
    // created; ENOENT must collapse to an empty walk, not an error.
    let scratch = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let missing = scratch.path().join("does-not-exist");
    let snap = capture_with(&missing, cgroup_root.path(), &missing, false);
    assert!(
        snap.threads.is_empty(),
        "nonexistent proc_root must produce empty snapshot; got \
{} threads — iter_tgids_at must collapse ENOENT to empty",
        snap.threads.len(),
    );
}
#[test]
fn capture_with_tgid_missing_task_dir_yields_no_threads_for_that_tgid() {
    // tgid 4243 exists only as a bare directory with no `task/` subdir;
    // it must contribute nothing while the fully staged tgid still lands.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    let live_tgid: i32 = 4242;
    let live_tid: i32 = 101;
    stage_synthetic_proc(proc_root.path(), live_tgid, live_tid, "live-pcomm", "live-comm");
    let bare_tgid: i32 = 4243;
    std::fs::create_dir_all(proc_root.path().join(bare_tgid.to_string())).unwrap();
    let snap = capture_with(proc_root.path(), cgroup_root.path(), sys_root.path(), false);
    assert_eq!(
        snap.threads.len(),
        1,
        "tgid 4243 has no `task/` subdir → contributes zero threads; \
only live tgid 4242's tid should land. got {} threads, expected 1",
        snap.threads.len(),
    );
    assert_eq!(snap.threads[0].tgid, live_tgid as u32);
    assert_eq!(snap.threads[0].tid, live_tid as u32);
}
#[test]
fn capture_with_non_numeric_proc_entries_are_filtered() {
    // Non-numeric names plus the never-valid `0` and `-1` must all be
    // rejected by the tgid walk before any per-tgid reads happen.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    let live_tgid: i32 = 5151;
    let live_tid: i32 = 5152;
    stage_synthetic_proc(proc_root.path(), live_tgid, live_tid, "real", "real-thread");
    for junk in ["self", "thread-self", "sys", "version", "12abc", "abc", "0", "-1"] {
        std::fs::create_dir_all(proc_root.path().join(junk)).unwrap();
    }
    assert_eq!(
        iter_tgids_at(proc_root.path()),
        vec![live_tgid],
        "iter_tgids_at must return only the real numeric tgid; \
non-numeric and `0`/`-1` entries must be filtered by \
parse::<i32>().ok() + `> 0` predicates",
    );
    let snap = capture_with(proc_root.path(), cgroup_root.path(), sys_root.path(), false);
    assert_eq!(
        snap.threads.len(),
        1,
        "non-numeric proc_root entries (`self`, `12abc`, etc.) and \
`0`/`-1` must be filtered by iter_tgids_at; got {} threads, \
expected 1 (only the real tgid {live_tgid})",
        snap.threads.len(),
    );
    assert_eq!(snap.threads[0].tgid, live_tgid as u32);
}
#[test]
fn capture_pid_with_nonexistent_pid_produces_empty_snapshot() {
    // Per-pid capture against a pid with no /proc entry: the task walk
    // hits ENOENT and must yield an empty snapshot rather than an error.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    let snap = capture_pid_with(
        proc_root.path(),
        cgroup_root.path(),
        sys_root.path(),
        99999,
        false,
    );
    assert!(
        snap.threads.is_empty(),
        "capture_pid_with against nonexistent pid must produce empty \
snapshot; got {} threads — iter_task_ids_at must collapse \
ENOENT to empty",
        snap.threads.len(),
    );
}
#[test]
fn capture_with_corrupt_stat_file_zeroes_stat_fields_only() {
    // Overwrite only /proc/<tid>/stat with garbage: stat-derived fields
    // fall back to defaults while status- and io-derived fields survive.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 6161;
    let tid: i32 = 6162;
    stage_synthetic_proc(proc_root.path(), tgid, tid, "p", "live");
    let stat_path = proc_root.path().join(format!("{tgid}/task/{tid}/stat"));
    std::fs::write(&stat_path, "garbage no parens here\n").unwrap();
    let snap = capture_with(proc_root.path(), cgroup_root.path(), sys_root.path(), false);
    assert_eq!(
        snap.threads.len(),
        1,
        "corrupt stat does not block thread landing — comm + status \
+ io still populate; ghost filter only fires when comm AND \
start_time are both empty/zero. got {} threads",
        snap.threads.len(),
    );
    let t = &snap.threads[0];
    use crate::metric_types::{Bytes, CategoricalString, ClockTicks, MonotonicCount, OrdinalI32};
    assert_eq!(
        t.start_time_clock_ticks, 0,
        "corrupt stat → start_time_clock_ticks default 0; got {}",
        t.start_time_clock_ticks
    );
    assert_eq!(
        t.nice,
        OrdinalI32(0),
        "corrupt stat → nice default 0; got {}",
        t.nice.0,
    );
    assert_eq!(
        t.policy,
        CategoricalString::from(""),
        "corrupt stat → policy default empty; got {:?}",
        t.policy
    );
    assert_eq!(t.utime_clock_ticks, ClockTicks(0));
    assert_eq!(t.stime_clock_ticks, ClockTicks(0));
    assert_eq!(t.processor, OrdinalI32(0));
    assert_eq!(
        t.voluntary_csw,
        MonotonicCount(42),
        "status file is intact → voluntary_csw still populates"
    );
    assert_eq!(
        t.rchar,
        Bytes(100),
        "io file is intact → rchar still populates"
    );
}
#[test]
fn capture_with_missing_schedstat_zeroes_schedstat_fields() {
    // Delete /proc/<tid>/schedstat after staging: the three schedstat
    // metrics default to zero while stat-derived fields are untouched.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 7171;
    let tid: i32 = 7172;
    stage_synthetic_proc(proc_root.path(), tgid, tid, "p", "live");
    let schedstat_path = proc_root.path().join(format!("{tgid}/task/{tid}/schedstat"));
    std::fs::remove_file(&schedstat_path).unwrap();
    let snap = capture_with(proc_root.path(), cgroup_root.path(), sys_root.path(), false);
    assert_eq!(
        snap.threads.len(),
        1,
        "thread still lands with schedstat absent"
    );
    let t = &snap.threads[0];
    use crate::metric_types::{MonotonicCount, MonotonicNs};
    assert_eq!(
        t.run_time_ns,
        MonotonicNs(0),
        "missing schedstat → run_time_ns default 0; got {}",
        t.run_time_ns.0
    );
    assert_eq!(t.wait_time_ns, MonotonicNs(0));
    assert_eq!(t.timeslices, MonotonicCount(0));
    assert_eq!(t.start_time_clock_ticks, 555_555);
}
#[test]
fn capture_with_corrupt_status_zeroes_status_fields_and_empty_affinity() {
    // Replace /proc/<tid>/status with colon-free garbage: csw counters
    // default to zero, state collapses to '~', and affinity stays an
    // empty set instead of inheriting the calling process's mask.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 8181;
    let tid: i32 = 8182;
    stage_synthetic_proc(proc_root.path(), tgid, tid, "p", "live");
    let status_path = proc_root.path().join(format!("{tgid}/task/{tid}/status"));
    std::fs::write(&status_path, "totally malformed garbage no colons here\n").unwrap();
    let snap = capture_with(proc_root.path(), cgroup_root.path(), sys_root.path(), false);
    assert_eq!(snap.threads.len(), 1);
    let t = &snap.threads[0];
    use crate::metric_types::MonotonicCount;
    assert_eq!(
        t.voluntary_csw,
        MonotonicCount(0),
        "corrupt status → voluntary_csw default 0; got {}",
        t.voluntary_csw.0
    );
    assert_eq!(t.nonvoluntary_csw, MonotonicCount(0));
    assert_eq!(
        t.state, '~',
        "corrupt status → state collapses to '~' (capture-time \
unwrap_or_else(default_state_char)); got {:?}",
        t.state
    );
    assert!(
        t.cpu_affinity.0.is_empty(),
        "use_syscall_affinity=false + corrupt status → cpu_affinity \
must be empty Vec, NOT inherit caller's real affinity; got {:?}",
        t.cpu_affinity,
    );
}
#[test]
fn capture_with_missing_io_zeroes_io_fields() {
    // Delete /proc/<tid>/io: every io-derived counter defaults to zero
    // while stat-derived fields remain intact.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 9191;
    let tid: i32 = 9192;
    stage_synthetic_proc(proc_root.path(), tgid, tid, "p", "live");
    let io_path = proc_root.path().join(format!("{tgid}/task/{tid}/io"));
    std::fs::remove_file(&io_path).unwrap();
    let snap = capture_with(proc_root.path(), cgroup_root.path(), sys_root.path(), false);
    assert_eq!(snap.threads.len(), 1);
    let t = &snap.threads[0];
    use crate::metric_types::{Bytes, MonotonicCount};
    assert_eq!(
        t.rchar,
        Bytes(0),
        "missing io → rchar default 0; got {}",
        t.rchar.0,
    );
    assert_eq!(t.wchar, Bytes(0));
    assert_eq!(t.syscr, MonotonicCount(0));
    assert_eq!(t.syscw, MonotonicCount(0));
    assert_eq!(t.read_bytes, Bytes(0));
    assert_eq!(t.write_bytes, Bytes(0));
    assert_eq!(t.cancelled_write_bytes, Bytes(0));
    assert_eq!(t.start_time_clock_ticks, 555_555);
}
#[test]
fn capture_with_missing_sched_zeroes_sched_fields() {
    // Delete /proc/<tid>/sched: every sched-derived metric defaults to
    // zero and ext_enabled stays false.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 1010;
    let tid: i32 = 1011;
    stage_synthetic_proc(proc_root.path(), tgid, tid, "p", "live");
    let sched_path = proc_root.path().join(format!("{tgid}/task/{tid}/sched"));
    std::fs::remove_file(&sched_path).unwrap();
    let snap = capture_with(proc_root.path(), cgroup_root.path(), sys_root.path(), false);
    assert_eq!(snap.threads.len(), 1);
    let t = &snap.threads[0];
    use crate::metric_types::{MonotonicCount, MonotonicNs, PeakNs};
    assert_eq!(
        t.nr_wakeups,
        MonotonicCount(0),
        "missing sched → nr_wakeups default 0; got {}",
        t.nr_wakeups.0,
    );
    assert_eq!(t.nr_migrations, MonotonicCount(0));
    assert_eq!(t.wait_sum, MonotonicNs(0));
    assert_eq!(t.wait_max, PeakNs(0));
    assert_eq!(t.voluntary_sleep_ns, MonotonicNs(0));
    assert_eq!(t.block_sum, MonotonicNs(0));
    assert_eq!(t.iowait_sum, MonotonicNs(0));
    assert_eq!(t.exec_max, PeakNs(0));
    assert_eq!(t.slice_max, PeakNs(0));
    assert!(
        !t.ext_enabled,
        "missing sched → ext.enabled key absent → ext_enabled false; \
got {}",
        t.ext_enabled
    );
}
#[test]
fn capture_with_partial_mid_capture_race_lands_zero_thread() {
    // Simulate a tid vanishing mid-capture: every per-tid file except
    // comm is gone. The thread still lands (comm intact) with all
    // metrics zeroed, and the empty cgroup skips stats enrichment.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 1212;
    let tid: i32 = 1213;
    stage_synthetic_proc(proc_root.path(), tgid, tid, "racy-pcomm", "racy-comm");
    let task_dir = proc_root.path().join(format!("{tgid}/task/{tid}"));
    for f in ["stat", "schedstat", "status", "io", "sched", "cgroup"] {
        std::fs::remove_file(task_dir.join(f)).unwrap();
    }
    let snap = capture_with(proc_root.path(), cgroup_root.path(), sys_root.path(), false);
    assert_eq!(snap.threads.len(), 1, "comm intact → thread still lands");
    let t = &snap.threads[0];
    use crate::metric_types::{Bytes, MonotonicCount, MonotonicNs};
    assert_eq!(t.comm, "racy-comm", "comm survives the racy partial reads");
    assert_eq!(t.start_time_clock_ticks, 0);
    assert_eq!(t.nr_wakeups, MonotonicCount(0));
    assert_eq!(t.run_time_ns, MonotonicNs(0));
    assert_eq!(t.voluntary_csw, MonotonicCount(0));
    assert_eq!(t.rchar, Bytes(0));
    assert_eq!(t.minflt, MonotonicCount(0));
    assert_eq!(t.cgroup, "");
    assert!(
        snap.cgroup_stats.is_empty(),
        "all threads have empty cgroup → enrichment loop skips → \
cgroup_stats stays empty",
    );
}
#[test]
fn capture_pid_with_filters_ghost_threads() {
    // The per-pid capture path must apply the same ghost filter as the
    // full walk: a bare tid directory with no files must not land.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 1313;
    let live_tid: i32 = 1314;
    let ghost_tid: i32 = 1315;
    stage_synthetic_proc(proc_root.path(), tgid, live_tid, "p", "live");
    let ghost_dir = proc_root.path().join(format!("{tgid}/task/{ghost_tid}"));
    std::fs::create_dir_all(&ghost_dir).unwrap();
    let snap = capture_pid_with(
        proc_root.path(),
        cgroup_root.path(),
        sys_root.path(),
        tgid,
        false,
    );
    assert_eq!(
        snap.threads.len(),
        1,
        "capture_pid_with must filter ghost tid {ghost_tid}; got {} \
threads, expected 1 (only live tid {live_tid})",
        snap.threads.len(),
    );
    assert_eq!(snap.threads[0].tid, live_tid as u32);
}
#[test]
fn capture_with_malformed_cpus_allowed_list_yields_empty_affinity() {
    // A descending range `5-3` is rejected wholesale by the cpulist
    // parser; the rest of the same status file must still parse.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 1414;
    let tid: i32 = 1415;
    stage_synthetic_proc(proc_root.path(), tgid, tid, "p", "live");
    let status_path = proc_root.path().join(format!("{tgid}/task/{tid}/status"));
    let status = "Name:\tfoo\n\
State:\tR (running)\n\
voluntary_ctxt_switches:\t1\n\
nonvoluntary_ctxt_switches:\t1\n\
Cpus_allowed_list:\t5-3\n";
    std::fs::write(&status_path, status).unwrap();
    let snap = capture_with(proc_root.path(), cgroup_root.path(), sys_root.path(), false);
    assert_eq!(snap.threads.len(), 1);
    let t = &snap.threads[0];
    use crate::metric_types::MonotonicCount;
    assert!(
        t.cpu_affinity.0.is_empty(),
        "malformed Cpus_allowed_list `5-3` → parse_cpu_list returns \
None → cpu_affinity defaults to empty Vec (NOT a partial \
range, NOT the caller's affinity); got {:?}",
        t.cpu_affinity,
    );
    assert_eq!(
        t.voluntary_csw,
        MonotonicCount(1),
        "malformed cpulist must NOT corrupt csw fields on the same \
status file — per-arm Option isolation"
    );
}
#[test]
fn capture_with_huge_cpu_range_in_status_yields_empty_affinity() {
    // A range whose expansion would exceed the cap must be rejected in
    // full — never partially expanded — without disturbing csw parsing.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 1515;
    let tid: i32 = 1516;
    stage_synthetic_proc(proc_root.path(), tgid, tid, "p", "live");
    let status_path = proc_root.path().join(format!("{tgid}/task/{tid}/status"));
    let status = "Cpus_allowed_list:\t0-4294967295\n\
voluntary_ctxt_switches:\t1\n\
nonvoluntary_ctxt_switches:\t1\n";
    std::fs::write(&status_path, status).unwrap();
    let snap = capture_with(proc_root.path(), cgroup_root.path(), sys_root.path(), false);
    assert_eq!(snap.threads.len(), 1);
    let t = &snap.threads[0];
    use crate::metric_types::MonotonicCount;
    assert!(
        t.cpu_affinity.0.is_empty(),
        "huge cpulist range `0-4294967295` exceeds the 64 Ki \
expansion cap → parse_cpu_list returns None → cpu_affinity \
empty (NOT a 4-billion-element Vec, NOT a partial range); \
got {} elements",
        t.cpu_affinity.0.len(),
    );
    assert_eq!(
        t.voluntary_csw,
        MonotonicCount(1),
        "huge cpulist rejection must not break csw parsing on the \
same status file — per-arm Option isolation"
    );
}
#[test]
fn capture_with_non_numeric_task_entries_are_filtered() {
    // Non-numeric, zero, and negative names inside `task/` must all be
    // rejected before any per-tid file reads happen.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    let live_tgid: i32 = 8181;
    let live_tid: i32 = 8182;
    stage_synthetic_proc(proc_root.path(), live_tgid, live_tid, "real", "real-thread");
    let task_dir = proc_root.path().join(live_tgid.to_string()).join("task");
    for junk in ["status", "self", "12abc", "abc", "0", "-1"] {
        std::fs::create_dir_all(task_dir.join(junk)).unwrap();
    }
    assert_eq!(
        iter_task_ids_at(proc_root.path(), live_tgid),
        vec![live_tid],
        "iter_task_ids_at must return only the real numeric tid; \
non-numeric and `0`/`-1` entries must be filtered by \
parse::<i32>().ok() + `> 0` predicates",
    );
    let snap = capture_with(proc_root.path(), cgroup_root.path(), sys_root.path(), false);
    assert_eq!(
        snap.threads.len(),
        1,
        "non-numeric `task/` entries must be filtered by \
iter_task_ids_at; got {} threads, expected 1",
        snap.threads.len(),
    );
    assert_eq!(snap.threads[0].tid, live_tid as u32);
}
#[test]
fn capture_with_v1_only_cgroup_yields_empty_cgroup_string() {
    // A cgroup file with only v1 controller lines (no `0::` entry)
    // leaves the thread's cgroup string empty, and the empty string must
    // not seed an entry in cgroup_stats.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 9191;
    let tid: i32 = 9192;
    stage_synthetic_proc(proc_root.path(), tgid, tid, "p", "live");
    let cgroup_path = proc_root.path().join(format!("{tgid}/task/{tid}/cgroup"));
    let v1_only = "12:cpuset:/legacy/cpuset/path\n\
5:freezer:/legacy/freezer\n\
3:blkio:/\n";
    std::fs::write(&cgroup_path, v1_only).unwrap();
    let snap = capture_with(proc_root.path(), cgroup_root.path(), sys_root.path(), false);
    assert_eq!(
        snap.threads.len(),
        1,
        "v1-only cgroup does not block thread landing — comm + \
start_time are intact, ghost filter does not fire; \
got {} threads",
        snap.threads.len(),
    );
    let t = &snap.threads[0];
    assert_eq!(
        t.cgroup, "",
        "v1-only cgroup file → parse_cgroup_v2 returns None → \
ThreadState.cgroup defaults to empty; got {:?}",
        t.cgroup,
    );
    assert!(
        !snap.cgroup_stats.contains_key(""),
        "empty-cgroup thread must NOT seed an empty-key entry in \
cgroup_stats — the enrichment loop's `!is_empty()` guard \
pins the skip; got keys: {:?}",
        snap.cgroup_stats.keys().collect::<Vec<_>>(),
    );
}
#[test]
fn capture_to_returns_err_on_unwritable_path() {
    // The target lives under a directory that was never created, so the
    // write must fail and the anyhow chain must name the target path.
    let scratch = tempfile::TempDir::new().unwrap();
    let target = scratch.path().join("missing-dir").join("snap.ctprof.zst");
    let err = capture_to(&target).unwrap_err();
    let chain = format!("{err:#}");
    assert!(
        chain.contains(target.to_string_lossy().as_ref()),
        "error chain must name the unwritable target path; got: {chain}",
    );
}
#[test]
fn capture_with_stale_cgroup_path_yields_all_zero_stats() {
    // The thread's cgroup string names a path with no backing directory
    // under cgroup_root, so the seeded stats entry must be all zeros.
    let proc_root = tempfile::TempDir::new().unwrap();
    let cgroup_root = tempfile::TempDir::new().unwrap();
    let sys_root = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 7373;
    let tid: i32 = 7374;
    stage_synthetic_proc(proc_root.path(), tgid, tid, "p", "live");
    let snap = capture_with(proc_root.path(), cgroup_root.path(), sys_root.path(), false);
    assert_eq!(snap.threads.len(), 1);
    let stats = snap
        .cgroup_stats
        .get("/ktstr.slice/worker0")
        .expect("non-empty cgroup string must seed the stats map");
    assert_eq!(stats.cpu.usage_usec, 0, "stale cgroup → cpu_usage_usec 0");
    assert_eq!(stats.cpu.nr_throttled, 0, "stale cgroup → nr_throttled 0");
    assert_eq!(
        stats.cpu.throttled_usec, 0,
        "stale cgroup → throttled_usec 0"
    );
    assert_eq!(stats.memory.current, 0, "stale cgroup → memory_current 0");
}
#[test]
fn read_cgroup_at_v1_only_cgroup_returns_none() {
    // Without a `0::` line there is no v2 entry to read; a missing
    // cgroup file must likewise collapse to None rather than error.
    let tmp = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 4242;
    let tid: i32 = 4243;
    let task_dir = tmp.path().join(format!("{tgid}/task/{tid}"));
    std::fs::create_dir_all(&task_dir).unwrap();
    let v1_only = "12:cpuset:/legacy/cpuset/path\n\
5:freezer:/legacy/freezer\n";
    std::fs::write(task_dir.join("cgroup"), v1_only).unwrap();
    assert_eq!(
        read_cgroup_at(tmp.path(), tgid, tid),
        None,
        "v1-only cgroup file → read_cgroup_at returns None (no 0:: line)",
    );
    assert_eq!(
        read_cgroup_at(tmp.path(), tgid, 9999),
        None,
        "missing cgroup file → read_cgroup_at returns None",
    );
}
#[test]
fn parse_cgroup_v2_root_only_path_returns_slash() {
    // The root cgroup's `0::/` line maps to "/" even with trailing
    // whitespace or surrounding v1 controller lines.
    assert_eq!(parse_cgroup_v2("0::/\n"), Some("/".to_string()));
    assert_eq!(parse_cgroup_v2("0::/ \n"), Some("/".to_string()));
    let mixed = "12:cpuset:/legacy/path\n0::/\n5:freezer:/legacy\n";
    assert_eq!(parse_cgroup_v2(mixed), Some("/".to_string()));
}
/// Test helper: create `<root>/<relative>/cpu.stat` holding `contents`,
/// stripping the cgroup path's leading '/' so it nests under `root`.
fn write_cpu_stat(root: &Path, relative: &str, contents: &str) {
    let target = root.join(relative.trim_start_matches('/'));
    std::fs::create_dir_all(&target).unwrap();
    std::fs::write(target.join("cpu.stat"), contents).unwrap();
}
/// Test helper: create `<root>/<relative>/memory.current` holding
/// `contents`, stripping the cgroup path's leading '/'.
fn write_memory_current(root: &Path, relative: &str, contents: &str) {
    let target = root.join(relative.trim_start_matches('/'));
    std::fs::create_dir_all(&target).unwrap();
    std::fs::write(target.join("memory.current"), contents).unwrap();
}
#[test]
fn read_cgroup_stats_at_both_files_populate_all_fields() {
    // Happy path: both cpu.stat and memory.current are staged, so every
    // field reads through with its staged value.
    let tmp = tempfile::TempDir::new().unwrap();
    write_cpu_stat(
        tmp.path(),
        "worker",
        "usage_usec 12345\nnr_throttled 7\nthrottled_usec 8\n",
    );
    write_memory_current(tmp.path(), "worker", "9999\n");
    let stats = read_cgroup_stats_at(tmp.path(), "/worker");
    assert_eq!(stats.cpu.usage_usec, 12345);
    assert_eq!(stats.cpu.nr_throttled, 7);
    assert_eq!(stats.cpu.throttled_usec, 8);
    assert_eq!(stats.memory.current, 9999);
}
#[test]
fn read_cgroup_stats_at_cpu_stat_only_memory_defaults_zero() {
    // Only cpu.stat staged: cpu fields read through while the absent
    // memory.current must collapse to zero.
    let tmp = tempfile::TempDir::new().unwrap();
    write_cpu_stat(
        tmp.path(),
        "cpu-only",
        "usage_usec 500\nnr_throttled 0\nthrottled_usec 0\n",
    );
    let stats = read_cgroup_stats_at(tmp.path(), "/cpu-only");
    assert_eq!(stats.cpu.usage_usec, 500);
    assert_eq!(stats.cpu.nr_throttled, 0);
    assert_eq!(stats.cpu.throttled_usec, 0);
    assert_eq!(
        stats.memory.current, 0,
        "missing memory.current must collapse to 0, not None",
    );
}
#[test]
fn read_cgroup_stats_at_memory_only_cpu_defaults_zero() {
    // Mirror case: only memory.current staged; every cpu field is zero.
    let tmp = tempfile::TempDir::new().unwrap();
    write_memory_current(tmp.path(), "mem-only", "2048\n");
    let stats = read_cgroup_stats_at(tmp.path(), "/mem-only");
    assert_eq!(stats.cpu.usage_usec, 0);
    assert_eq!(stats.cpu.nr_throttled, 0);
    assert_eq!(stats.cpu.throttled_usec, 0);
    assert_eq!(stats.memory.current, 2048);
}
#[test]
fn read_cgroup_stats_at_both_files_missing_all_zero() {
    // The cgroup directory exists but holds neither file: all zeros.
    let tmp = tempfile::TempDir::new().unwrap();
    std::fs::create_dir_all(tmp.path().join("empty-cg")).unwrap();
    let stats = read_cgroup_stats_at(tmp.path(), "/empty-cg");
    assert_eq!(stats.cpu.usage_usec, 0);
    assert_eq!(stats.cpu.nr_throttled, 0);
    assert_eq!(stats.cpu.throttled_usec, 0);
    assert_eq!(stats.memory.current, 0);
}
#[test]
fn read_cgroup_stats_at_cpu_stat_missing_key_defaults_field_zero() {
    // cpu.stat present but without the nr_throttled key: only that one
    // field defaults; the present keys still read through.
    let tmp = tempfile::TempDir::new().unwrap();
    write_cpu_stat(tmp.path(), "partial", "usage_usec 999\nthrottled_usec 111\n");
    let stats = read_cgroup_stats_at(tmp.path(), "/partial");
    assert_eq!(stats.cpu.usage_usec, 999);
    assert_eq!(stats.cpu.nr_throttled, 0, "absent key collapses to 0");
    assert_eq!(stats.cpu.throttled_usec, 111);
}
// Pins the full key→field map of parse_sched: each recognized
// `se.statistics.*` / bare key lands in its SchedFields slot, and
// whole-number values in this fixture pass through as-is (the dotted
// reconstruction is pinned separately).
#[test]
fn parse_sched_populates_all_known_fields() {
let raw = "\
se.statistics.nr_wakeups : 11\n\
se.statistics.nr_wakeups_sync : 2\n\
se.statistics.nr_wakeups_local : 8\n\
se.statistics.nr_wakeups_migrate : 1\n\
se.statistics.nr_wakeups_remote : 3\n\
se.statistics.nr_wakeups_idle : 4\n\
se.statistics.nr_wakeups_affine : 12\n\
se.statistics.nr_wakeups_affine_attempts : 20\n\
nr_migrations : 9\n\
se.statistics.nr_migrations_cold : 5\n\
se.statistics.nr_forced_migrations : 7\n\
se.statistics.nr_failed_migrations_affine : 1\n\
se.statistics.nr_failed_migrations_running : 2\n\
se.statistics.nr_failed_migrations_hot : 3\n\
wait_sum : 500\n\
wait_count : 15\n\
se.statistics.wait_max : 250\n\
sum_sleep_runtime : 320\n\
se.statistics.sleep_max : 180\n\
sum_block_runtime : 110\n\
se.statistics.block_max : 60\n\
iowait_sum : 77\n\
iowait_count : 18\n\
se.statistics.exec_max : 90\n\
se.statistics.slice_max : 400\n\
ext.enabled : 1\n";
// No tally attached — this test only checks the field mapping.
let s = parse_sched(raw, &mut None);
assert_eq!(s.nr_wakeups, Some(11));
assert_eq!(s.nr_wakeups_local, Some(8));
assert_eq!(s.nr_wakeups_remote, Some(3));
assert_eq!(s.nr_wakeups_sync, Some(2));
assert_eq!(s.nr_wakeups_migrate, Some(1));
assert_eq!(s.nr_wakeups_affine, Some(12));
assert_eq!(s.nr_wakeups_affine_attempts, Some(20));
assert_eq!(s.nr_migrations, Some(9));
assert_eq!(s.nr_forced_migrations, Some(7));
assert_eq!(s.nr_failed_migrations_affine, Some(1));
assert_eq!(s.nr_failed_migrations_running, Some(2));
assert_eq!(s.nr_failed_migrations_hot, Some(3));
assert_eq!(s.wait_sum, Some(500));
assert_eq!(s.wait_count, Some(15));
assert_eq!(s.wait_max, Some(250));
assert_eq!(
s.sleep_sum,
Some(320),
"sleep_sum (raw kernel sum_sleep_runtime) reads through \
SchedFields; the capture site subtracts block_sum to \
produce ThreadState::voluntary_sleep_ns",
);
assert_eq!(s.sleep_max, Some(180));
assert_eq!(
s.block_sum,
Some(110),
"block_sum reads the kernel's `sum_block_runtime` key",
);
assert_eq!(s.block_max, Some(60));
assert_eq!(s.iowait_sum, Some(77));
assert_eq!(s.iowait_count, Some(18));
assert_eq!(s.exec_max, Some(90));
assert_eq!(s.slice_max, Some(400));
assert_eq!(
s.ext_enabled,
Some(true),
"ext.enabled = 1 → Some(true) — full-key match required \
because rsplit('.') would yield `enabled` and collide \
with any future field of that name",
);
}
#[test]
fn parse_sched_ext_enabled_zero_and_absent() {
    // `ext.enabled : 0` reads Some(false); a sched dump without the key
    // leaves the field None.
    let zero = parse_sched("ext.enabled : 0\n", &mut None);
    assert_eq!(zero.ext_enabled, Some(false));
    let absent = parse_sched("nr_wakeups : 1\n", &mut None);
    assert_eq!(absent.ext_enabled, None);
}
#[test]
fn parse_sched_ext_enabled_no_collision_via_rsplit() {
    // Full-key matching: `foo.enabled` must not be mistaken for
    // `ext.enabled` via a suffix-only comparison.
    let parsed = parse_sched("foo.enabled : 1\n", &mut None);
    assert_eq!(parsed.ext_enabled, None);
}
#[test]
fn parse_sched_fractional_fields_reconstruct_ns() {
    // Dotted SPLIT_NS values reconstruct to whole nanoseconds from
    // their ms-and-fraction parts.
    let raw = "\
wait_sum : 1234.5\n\
sum_sleep_runtime : 678.9\n\
sum_block_runtime : 42.1\n\
iowait_sum : 7.999\n";
    let parsed = parse_sched(raw, &mut None);
    assert_eq!(parsed.wait_sum, Some(1_234_500_000));
    assert_eq!(parsed.sleep_sum, Some(678_900_000));
    assert_eq!(parsed.block_sum, Some(42_100_000));
    assert_eq!(parsed.iowait_sum, Some(7_999_000));
}
#[test]
fn parse_sched_negative_value_returns_none() {
    // No tally attached: a negative dotted value simply fails to parse
    // and the field reads None.
    let parsed = parse_sched("wait_sum : -5.0\n", &mut None);
    assert_eq!(
        parsed.wait_sum, None,
        "negative ms part fails u64 parse → None; downstream \
unwrap_or(0) collapses this to absent-counter zero",
    );
}
#[test]
fn parse_sched_negative_value_records_into_tally() {
    // Two negative dotted lines bump the tally without changing the
    // per-field None outcome; the positive neighbor parses normally.
    let raw = "wait_sum : -5.0\n\
sum_sleep_runtime : 12.5\n\
sum_block_runtime : -10.0\n";
    let mut tally = ParseTally::default();
    let mut tally_opt: Option<&mut ParseTally> = Some(&mut tally);
    let parsed = parse_sched(raw, &mut tally_opt);
    assert_eq!(
        parsed.wait_sum, None,
        "negative wait_sum still reads None — the tally records \
but does not change the per-field outcome",
    );
    assert_eq!(
        parsed.sleep_sum,
        Some(12_500_000),
        "non-negative neighbor still parses normally",
    );
    assert_eq!(parsed.block_sum, None, "negative block_sum reads None");
    tally_opt.as_mut().unwrap().commit_pending();
    let summary = tally.to_public();
    assert_eq!(
        summary.negative_dotted_values, 2,
        "two negative dotted lines bumped the per-snapshot \
negative_dotted_values counter; non-negative neighbor \
did not contribute",
    );
}
#[test]
fn parse_tally_negative_dotted_discard_pending_unwinds_bumps() {
    // A parse whose tid is later filtered must not leave its pending
    // bump behind once discarded.
    let mut tally = ParseTally::default();
    let mut tally_opt: Option<&mut ParseTally> = Some(&mut tally);
    let _ = parse_sched("wait_sum : -5.0\n", &mut tally_opt);
    tally_opt.as_mut().unwrap().discard_pending();
    let summary = tally.to_public();
    assert_eq!(
        summary.negative_dotted_values, 0,
        "discard_pending must unwind the negative-dotted \
pending bump so a ghost-filtered tid does not \
pollute the per-snapshot tally",
    );
}
#[test]
fn parse_tally_negative_dotted_accumulates_across_commits() {
    // Commits from successive tids must add into the tally, never
    // overwrite each other.
    let first = "wait_sum : -1.0\n";
    let second = "wait_sum : -2.0\n\
sleep_max : -3.0\n";
    let mut tally = ParseTally::default();
    let mut tally_opt: Option<&mut ParseTally> = Some(&mut tally);
    let _ = parse_sched(first, &mut tally_opt);
    tally_opt.as_mut().unwrap().commit_pending();
    let _ = parse_sched(second, &mut tally_opt);
    tally_opt.as_mut().unwrap().commit_pending();
    let summary = tally.to_public();
    assert_eq!(
        summary.negative_dotted_values, 3,
        "1 commit + 2 commit = 3 total — multi-tid commits \
must add, not overwrite. got {}",
        summary.negative_dotted_values,
    );
}
#[test]
fn parse_tally_negative_dotted_zero_for_positive_only_input() {
    // Positive and zero values — dotted or bare — must never bump the
    // negative-dotted counter.
    let raw = "wait_sum : 100.5\n\
sum_sleep_runtime : 200\n\
sum_block_runtime : 0.999\n\
wait_max : 0\n\
exec_max : 7\n";
    let mut tally = ParseTally::default();
    let mut tally_opt: Option<&mut ParseTally> = Some(&mut tally);
    let _ = parse_sched(raw, &mut tally_opt);
    tally_opt.as_mut().unwrap().commit_pending();
    let summary = tally.to_public();
    assert_eq!(
        summary.negative_dotted_values, 0,
        "all-positive dotted input must not bump the \
negative-dotted tally; got {}",
        summary.negative_dotted_values,
    );
}
#[test]
fn parsed_ns_from_dotted_sub_millisecond_negative_detected() {
    // SPLIT_NS-style rendering can place the minus sign after the dot for
    // sub-millisecond negatives ("0.-000500"); both the unit parser and the
    // full parse_sched path must classify that shape as Negative.
    assert_eq!(
        parsed_ns_from_dotted("0.-000500"),
        Err(ParseDottedNs::Negative),
        "0.-NNN shape (sub-ms negative SPLIT_NS) MUST route \
         through Negative — most schedstat negatives land \
         sub-millisecond and would otherwise slip through",
    );
    assert_eq!(
        parsed_ns_from_dotted("0.-1"),
        Err(ParseDottedNs::Negative),
        "single-digit sub-ms negative shape detected",
    );
    let raw = "wait_sum : 0.-000500\n\
               sleep_max : 0.-1\n";
    let mut tally = ParseTally::default();
    let mut tally_opt: Option<&mut ParseTally> = Some(&mut tally);
    let s = parse_sched(raw, &mut tally_opt);
    assert_eq!(
        s.wait_sum, None,
        "sub-ms negative wait_sum collapses to None",
    );
    assert_eq!(
        s.sleep_max, None,
        "sub-ms negative sleep_max collapses to None",
    );
    tally_opt.as_mut().unwrap().commit_pending();
    let summary = tally.to_public();
    assert_eq!(
        summary.negative_dotted_values, 2,
        "two sub-ms negatives both bump the tally — pins \
         that the integer-only detection is NOT enough on \
         its own",
    );
}
#[test]
fn parsed_ns_from_dotted_negative_bare_branch_records() {
    // Classification matrix for the unit parser: negatives (bare or dotted)
    // → Negative; non-numeric / empty → Malformed (must NOT bump the negative
    // tally); positives → Ok, with dotted values scaled ms → ns.
    assert_eq!(
        parsed_ns_from_dotted("-5"),
        Err(ParseDottedNs::Negative),
        "bare-integer negative routes through Negative",
    );
    assert_eq!(
        parsed_ns_from_dotted("-5.0"),
        Err(ParseDottedNs::Negative),
        "dotted negative routes through Negative",
    );
    assert_eq!(
        parsed_ns_from_dotted("garbage"),
        Err(ParseDottedNs::Malformed),
        "non-numeric input routes through Malformed, not \
         Negative — the tally must NOT bump on garbage",
    );
    assert_eq!(
        parsed_ns_from_dotted("garbage.5"),
        Err(ParseDottedNs::Malformed),
        "non-numeric integer part with fractional routes \
         through Malformed",
    );
    assert_eq!(
        parsed_ns_from_dotted(""),
        Err(ParseDottedNs::Malformed),
        "empty input routes through Malformed",
    );
    assert_eq!(
        parsed_ns_from_dotted("5"),
        Ok(5),
        "bare positive integer parses",
    );
    assert_eq!(
        parsed_ns_from_dotted("5.500"),
        Ok(5_500_000),
        "positive dotted parses normally",
    );
}
#[test]
fn parse_sched_bare_key_names_populate_same_fields() {
    // Keys may appear bare (no dotted prefix); parse_sched keys off the last
    // dot-separated component, so bare names must land in the same fields as
    // their prefixed forms, through both the integer and dotted-ns paths.
    let raw = "\
        nr_wakeups : 11\n\
        nr_wakeups_local : 8\n\
        nr_wakeups_remote : 3\n\
        nr_wakeups_sync : 2\n\
        nr_wakeups_migrate : 1\n\
        nr_migrations : 42\n\
        wait_max : 999.5\n";
    let s = parse_sched(raw, &mut None);
    assert_eq!(s.nr_wakeups, Some(11));
    assert_eq!(s.nr_wakeups_local, Some(8));
    assert_eq!(s.nr_wakeups_remote, Some(3));
    assert_eq!(s.nr_wakeups_sync, Some(2));
    assert_eq!(s.nr_wakeups_migrate, Some(1));
    assert_eq!(
        s.nr_migrations,
        Some(42),
        "bare-key `nr_migrations` must populate via \
         rsplit('.').next() returning the whole no-dot string",
    );
    assert_eq!(
        s.wait_max,
        Some(999_500_000),
        "bare-key `wait_max` must populate via the \
         parsed_ns_from_dotted path; 999.5 → 999_500_000 ns",
    );
}
#[test]
fn parse_sched_alternative_prefix_populates_same_fields() {
    // Any dotted prefix (not just the usual one) is stripped down to the
    // final component before field dispatch.
    let raw = "\
        stats.nr_wakeups : 42\n\
        some.other.prefix.nr_migrations : 9\n";
    let s = parse_sched(raw, &mut None);
    assert_eq!(s.nr_wakeups, Some(42));
    assert_eq!(s.nr_migrations, Some(9));
}
#[test]
fn parse_sched_unknown_keys_are_ignored() {
    // Unknown keys (e.g. stats added by newer kernels) are skipped without
    // disturbing the neighboring known keys.
    let raw = "\
        nr_wakeups : 11\n\
        fictional_new_kernel_stat : 9999\n\
        nr_migrations : 9\n";
    let s = parse_sched(raw, &mut None);
    assert_eq!(s.nr_wakeups, Some(11));
    assert_eq!(s.nr_migrations, Some(9));
}
/// Build a `ProbeSummary` with the given failure count and per-tag buckets;
/// every other counter keeps its default value.
fn make_summary(
    failed: u64,
    attach: &[(&'static str, u64)],
    probe: &[(&'static str, u64)],
) -> ProbeSummary {
    let attach_tag_counts = attach.iter().copied().collect();
    let probe_tag_counts = probe.iter().copied().collect();
    ProbeSummary {
        failed,
        attach_tag_counts,
        probe_tag_counts,
        ..ProbeSummary::default()
    }
}
#[test]
fn probe_summary_dominant_tag_picks_highest_count() {
    // 5× dwarf-parse-failure vs 1× ptrace-seize: plurality wins.
    let summary = make_summary(6, &[("dwarf-parse-failure", 5)], &[("ptrace-seize", 1)]);
    let dominant = summary.dominant_tag();
    assert_eq!(dominant, Some("dwarf-parse-failure"));
}
#[test]
fn probe_summary_dominant_tag_filters_non_actionable_attach_tags() {
    // jemalloc-not-found and readlink-failure are expected/non-actionable
    // outcomes; dominant_tag() must exclude them no matter how large their
    // counts are, individually or in aggregate.
    let s = make_summary(101, &[("jemalloc-not-found", 100)], &[("ptrace-seize", 1)]);
    assert_eq!(
        s.dominant_tag(),
        Some("ptrace-seize"),
        "jemalloc-not-found must be filtered out even at \
         100x the count of an actionable tag",
    );
    let s = make_summary(101, &[("readlink-failure", 100)], &[("get-regset", 1)]);
    assert_eq!(
        s.dominant_tag(),
        Some("get-regset"),
        "readlink-failure must be filtered out even at \
         100x the count of an actionable tag",
    );
    let s = make_summary(
        201,
        &[("jemalloc-not-found", 100), ("readlink-failure", 100)],
        &[("waitpid", 1)],
    );
    assert_eq!(
        s.dominant_tag(),
        Some("waitpid"),
        "both filtered attach tags together must NOT push their \
         aggregate above an actionable probe tag",
    );
    let s = make_summary(5, &[("jemalloc-not-found", 5)], &[]);
    assert_eq!(
        s.dominant_tag(),
        None,
        "only-filtered-tags case must produce None, not the \
         filtered tag itself",
    );
}
// NOTE(review): the name says "reverse alphabetically" but the asserted
// winner at equal counts (dwarf-parse-failure over ptrace-seize) is the
// alphabetically-EARLIER tag — confirm against dominant_tag()'s tie-break
// implementation; the name may describe the iteration order rather than
// the observable result.
#[test]
fn probe_summary_dominant_tag_breaks_ties_reverse_alphabetically() {
    let s = make_summary(4, &[("ptrace-seize", 2)], &[("dwarf-parse-failure", 2)]);
    assert_eq!(s.dominant_tag(), Some("dwarf-parse-failure"));
}
#[test]
fn probe_summary_ptrace_dominates_when_half_of_failures() {
    // Exactly 50% ptrace share is inclusive: 3/6 dominates.
    let s = make_summary(6, &[], &[("ptrace-seize", 3), ("waitpid", 3)]);
    assert!(s.ptrace_dominates());
}
#[test]
fn probe_summary_ptrace_does_not_dominate_when_below_half() {
    // 2/6 is below the 50% threshold.
    let s = make_summary(6, &[], &[("ptrace-seize", 2), ("waitpid", 4)]);
    assert!(!s.ptrace_dominates());
}
#[test]
fn probe_summary_no_failures_no_dominant_tag() {
    // The all-default summary has zero failures: no dominance, no tag.
    let s = ProbeSummary::default();
    assert!(!s.ptrace_dominates());
    assert_eq!(s.dominant_tag(), None);
}
#[test]
fn ptrace_eperm_hint_uses_which_ktstr() {
    // Pins the remediation text: portable binary lookup plus the two
    // privilege knobs (capability and yama scope).
    assert!(
        PTRACE_EPERM_HINT.contains("$(which ktstr)"),
        "EPERM hint must use $(which ktstr) for portability, got: {PTRACE_EPERM_HINT}",
    );
    assert!(PTRACE_EPERM_HINT.contains("cap_sys_ptrace"));
    assert!(PTRACE_EPERM_HINT.contains("yama.ptrace_scope"));
}
#[test]
fn to_public_carries_counters_and_dominant_tag() {
    // to_public() projects the raw counters verbatim and materializes
    // dominant_tag() as an owned String.
    let mut s = make_summary(3, &[("dwarf-parse-failure", 2)], &[("ptrace-seize", 1)]);
    s.tgids_walked = 10;
    s.jemalloc_detected = 5;
    s.probed_ok = 4;
    let public = s.to_public();
    assert_eq!(public.tgids_walked, 10);
    assert_eq!(public.jemalloc_detected, 5);
    assert_eq!(public.probed_ok, 4);
    assert_eq!(public.failed, 3);
    assert_eq!(
        public.dominant_failure.as_deref(),
        Some("dwarf-parse-failure"),
        "dominant_tag picks the highest-count actionable tag, \
         projected as an owned String",
    );
    assert!(
        !public.privilege_dominant,
        "ptrace 1/3 < 50% → privilege_dominant false",
    );
}
#[test]
fn to_public_dominant_failure_is_none_when_no_failures() {
    // failed == 0 means both dominant_failure and privilege_dominant stay
    // unset, even with a large (filtered) attach-tag bucket.
    let s = make_summary(0, &[("jemalloc-not-found", 12)], &[]);
    let public = s.to_public();
    assert_eq!(public.failed, 0);
    assert!(
        public.dominant_failure.is_none(),
        "no actionable failures means dominant_failure is None; \
         got {:?}",
        public.dominant_failure,
    );
    assert!(
        !public.privilege_dominant,
        "no failures means privilege_dominant is false",
    );
}
#[test]
fn to_public_privilege_dominant_when_ptrace_crosses_threshold() {
    // The 50% threshold is inclusive: 4/4 and 2/4 flip the flag, 1/4 does not.
    let s = make_summary(4, &[], &[("ptrace-seize", 4)]);
    let public = s.to_public();
    assert_eq!(public.failed, 4);
    assert!(
        public.privilege_dominant,
        "ptrace 4/4 ≥ 50% → privilege_dominant true",
    );
    let s = make_summary(4, &[("dwarf-parse-failure", 2)], &[("ptrace-seize", 2)]);
    let public = s.to_public();
    assert!(
        public.privilege_dominant,
        "ptrace 2/4 = 50% boundary → privilege_dominant true (>= threshold)",
    );
    let s = make_summary(4, &[("dwarf-parse-failure", 3)], &[("ptrace-seize", 1)]);
    let public = s.to_public();
    assert!(
        !public.privilege_dominant,
        "ptrace 1/4 < 50% → privilege_dominant false",
    );
}
#[test]
fn to_public_privilege_dominant_ptrace_interrupt_and_edge_cases() {
    // ptrace-interrupt counts toward the ptrace share alongside ptrace-seize;
    // the two projected fields (privilege_dominant, dominant_failure) must be
    // independently derivable in both directions.
    let s = make_summary(2, &[], &[("ptrace-interrupt", 2)]);
    let public = s.to_public();
    assert!(
        public.privilege_dominant,
        "ptrace-interrupt 2/2 ≥ 50% → privilege_dominant true \
         (matches! arm covers ptrace-interrupt as well as ptrace-seize)",
    );
    let s = make_summary(
        4,
        &[("dwarf-parse-failure", 2)],
        &[("ptrace-seize", 1), ("ptrace-interrupt", 1)],
    );
    let public = s.to_public();
    assert!(
        public.privilege_dominant,
        "summed ptrace 2/4 ≥ 50% → privilege_dominant true",
    );
    assert_eq!(
        public.dominant_failure.as_deref(),
        Some("dwarf-parse-failure"),
        "dominant_failure names the non-ptrace tag that won the \
         single-tag plurality while privilege_dominant is true — \
         proves the two fields are independent",
    );
    let s = make_summary(1, &[], &[("ptrace-seize", 1)]);
    let public = s.to_public();
    assert!(
        public.privilege_dominant,
        "ptrace 1/1 ≥ 50% → privilege_dominant true at the \
         smallest-failed boundary",
    );
    let s = make_summary(1, &[("dwarf-parse-failure", 1)], &[]);
    let public = s.to_public();
    assert!(
        !public.privilege_dominant,
        "no ptrace tags with failed == 1 → privilege_dominant \
         false (total_ptrace == 0 keeps the gate closed)",
    );
    assert!(
        !CtprofProbeSummary::default().privilege_dominant,
        "CtprofProbeSummary::default().privilege_dominant \
         must be false",
    );
    let s = make_summary(
        10,
        &[("dwarf-parse-failure", 3), ("jemalloc-in-dso", 3)],
        &[("ptrace-seize", 4)],
    );
    let public = s.to_public();
    assert!(
        !public.privilege_dominant,
        "ptrace 4/10 < 50% → privilege_dominant false",
    );
    assert_eq!(
        public.dominant_failure.as_deref(),
        Some("ptrace-seize"),
        "dominant_failure names a ptrace tag while privilege_dominant \
         is false — converse of the independence claim",
    );
}
#[test]
fn remediation_hint_returns_some_iff_privilege_dominant() {
    // remediation_hint() is a pure projection of the privilege_dominant flag
    // (see CtprofProbeSummary::remediation_hint at the top of this file).
    let ps = CtprofProbeSummary {
        privilege_dominant: true,
        ..Default::default()
    };
    assert_eq!(
        ps.remediation_hint(),
        Some(PTRACE_EPERM_HINT),
        "privilege_dominant=true must surface the same hint text \
         the tracing summary prints",
    );
    let ps = CtprofProbeSummary::default();
    assert!(
        !ps.privilege_dominant,
        "default privilege_dominant must be false (sanity)",
    );
    assert_eq!(
        ps.remediation_hint(),
        None,
        "privilege_dominant=false → remediation_hint returns None",
    );
}
// The four #[traced_test] tests below assert on captured tracing output via
// logs_contain(); they pin the rendered shape of the probe-summary log line
// and the gating of its optional clauses.
#[traced_test]
#[test]
fn summary_emits_clean_line_when_no_failures() {
    // Zero failures: the base line renders but both the dominant-tag clause
    // and the EPERM hint are suppressed.
    let summary = make_summary(0, &[("jemalloc-not-found", 12)], &[]);
    emit_probe_summary(&summary);
    assert!(logs_contain("ctprof probe:"));
    assert!(logs_contain("0 tgids walked"));
    assert!(logs_contain("0 failed"));
    assert!(
        !logs_contain("(dominant:"),
        "no failures means the dominant-tag clause is omitted",
    );
    assert!(
        !logs_contain("hint:"),
        "no failures means the EPERM hint is omitted",
    );
}
#[traced_test]
#[test]
fn summary_emits_privilege_hint_when_ptrace_dominates() {
    // All failures are ptrace-seize: both the dominant clause and the full
    // privilege remediation hint must render.
    let summary = ProbeSummary {
        tgids_walked: 4,
        jemalloc_detected: 2,
        probed_ok: 0,
        failed: 4,
        attach_tag_counts: BTreeMap::new(),
        probe_tag_counts: [("ptrace-seize", 4u64)].into_iter().collect(),
    };
    emit_probe_summary(&summary);
    assert!(logs_contain("(dominant: ptrace-seize"));
    assert!(logs_contain("hint:"));
    assert!(logs_contain("$(which ktstr)"));
    assert!(logs_contain("cap_sys_ptrace"));
    assert!(logs_contain("yama.ptrace_scope"));
}
#[traced_test]
#[test]
fn summary_emits_privilege_hint_when_ptrace_interrupt_dominates() {
    // Same as above but via the ptrace-interrupt tag — both ptrace tags must
    // trigger the privilege hint.
    let summary = ProbeSummary {
        tgids_walked: 4,
        jemalloc_detected: 2,
        probed_ok: 0,
        failed: 4,
        attach_tag_counts: BTreeMap::new(),
        probe_tag_counts: [("ptrace-interrupt", 4u64)].into_iter().collect(),
    };
    emit_probe_summary(&summary);
    assert!(logs_contain("(dominant: ptrace-interrupt"));
    assert!(logs_contain("hint:"));
    assert!(logs_contain("$(which ktstr)"));
    assert!(logs_contain("cap_sys_ptrace"));
    assert!(logs_contain("yama.ptrace_scope"));
}
#[traced_test]
#[test]
fn summary_omits_privilege_hint_when_debuginfo_failures_lead() {
    // Dominant tag renders (dwarf-parse-failure leads 4:1) but the hint is
    // reserved for privilege failures.
    let summary = ProbeSummary {
        tgids_walked: 5,
        jemalloc_detected: 3,
        probed_ok: 0,
        failed: 5,
        attach_tag_counts: [("dwarf-parse-failure", 4u64)].into_iter().collect(),
        probe_tag_counts: [("ptrace-seize", 1u64)].into_iter().collect(),
    };
    emit_probe_summary(&summary);
    assert!(logs_contain("(dominant: dwarf-parse-failure"));
    assert!(
        !logs_contain("hint:"),
        "DWARF-dominated failures must NOT trigger the privilege \
         hint — only privilege failures earn the privilege remediation",
    );
}
#[traced_test]
#[test]
fn parse_summary_emits_clean_line_when_no_failures() {
    // Default tally: base line renders; dominant clause, kconfig hint, and
    // negative-dotted clause are all suppressed.
    let tally = ParseTally::default();
    emit_parse_summary(&tally);
    assert!(logs_contain("ctprof parse:"));
    assert!(logs_contain("0 tids walked"));
    assert!(logs_contain("0 read failures"));
    assert!(
        !logs_contain("(dominant:"),
        "no failures means the dominant clause is omitted",
    );
    assert!(
        !logs_contain("hint:"),
        "no failures means the kconfig hint is omitted",
    );
    assert!(
        !logs_contain("negative-dotted"),
        "zero negative-dotted values means the negative \
         clause is omitted",
    );
}
#[traced_test]
#[test]
fn parse_summary_emits_negative_dotted_clause_when_present() {
    // Three recorded + committed negative-dotted bumps surface as a count in
    // the log line, independent of the read-failure clause.
    let mut tally = ParseTally {
        tids_walked: 5,
        ..ParseTally::default()
    };
    tally.record_negative_dotted();
    tally.record_negative_dotted();
    tally.record_negative_dotted();
    tally.commit_pending();
    emit_parse_summary(&tally);
    assert!(
        logs_contain("3 negative-dotted values"),
        "negative-dotted clause must surface the count when \
         the tally is non-zero — the operator-visibility \
         motivation depends on this rendering",
    );
    assert!(logs_contain("0 read failures"));
}
#[traced_test]
#[test]
fn parse_summary_emits_kconfig_hint_when_dominant() {
    // schedstat + io failures are both kconfig-attributable (100% share here),
    // so the dominant clause and the CONFIG_* hint must both render.
    let mut tally = ParseTally {
        tids_walked: 100,
        ..ParseTally::default()
    };
    for _ in 0..60 {
        tally.record_failure("schedstat");
    }
    for _ in 0..40 {
        tally.record_failure("io");
    }
    tally.commit_pending();
    emit_parse_summary(&tally);
    assert!(logs_contain("(dominant: schedstat)"));
    assert!(logs_contain("hint:"));
    assert!(logs_contain("CONFIG_SCHEDSTATS"));
    assert!(logs_contain("CONFIG_TASK_IO_ACCOUNTING"));
}
#[traced_test]
#[test]
fn try_attach_probe_for_tgid_at_warns_on_pid_missing() {
    // pid 0 never exists under /proc: the attach must fail with the
    // pid-missing tag, count as a failure, and emit the warn-level event.
    let mut summary = ProbeSummary::default();
    let probe = try_attach_probe_for_tgid_at(Path::new(DEFAULT_PROC_ROOT), 0, &mut summary);
    assert!(probe.is_none(), "pid 0 must not produce a probe");
    assert!(logs_contain("attach failed"));
    assert!(logs_contain("pid-missing"));
    assert_eq!(summary.failed, 1);
    assert_eq!(summary.jemalloc_detected, 0);
    assert_eq!(summary.tgids_walked, 1);
    assert_eq!(
        summary.attach_tag_counts.get("pid-missing").copied(),
        Some(1),
        "PidMissing tag must increment its bucket",
    );
}
#[traced_test]
#[test]
fn try_attach_probe_for_tgid_at_debugs_on_non_jemalloc_target() {
    // Spawn a real short-lived `sleep` process (skipping if unavailable) and
    // verify the jemalloc-not-found outcome: tagged, NOT counted as a
    // failure, and logged at debug ("attach skipped"), not warn.
    let mut child = match std::process::Command::new("sleep")
        .arg("3")
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .spawn()
    {
        Ok(c) => c,
        Err(_) => {
            eprintln!("skipping — /bin/sleep unavailable");
            return;
        }
    };
    let pid = child.id() as i32;
    let exe_link = std::path::PathBuf::from(format!("/proc/{pid}/exe"));
    // Poll until the kernel surfaces /proc/<pid>/exe for the fresh child;
    // bounded at 1s so a stalled kernel fails loudly instead of hanging.
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(1);
    while std::fs::read_link(&exe_link).is_err() {
        if std::time::Instant::now() >= deadline {
            let _ = child.kill();
            let _ = child.wait();
            panic!(
                "/proc/{pid}/exe did not become readable within 1s — \
                 kernel did not surface the freshly-forked child's exe \
                 symlink in time, the test cannot proceed"
            );
        }
        std::thread::sleep(std::time::Duration::from_millis(1));
    }
    let mut summary = ProbeSummary::default();
    let probe = try_attach_probe_for_tgid_at(Path::new(DEFAULT_PROC_ROOT), pid, &mut summary);
    // Reap the child before asserting so a failure doesn't leak a process.
    let _ = child.kill();
    let _ = child.wait();
    assert!(probe.is_none(), "sleep is not jemalloc-linked");
    assert_eq!(summary.tgids_walked, 1);
    assert_eq!(summary.jemalloc_detected, 0);
    assert_eq!(
        summary.failed, 0,
        "jemalloc-not-found must NOT count as failure — it's the \
         expected outcome for the bulk of system processes",
    );
    assert_eq!(
        summary.attach_tag_counts.get("jemalloc-not-found").copied(),
        Some(1),
    );
    assert!(
        logs_contain("attach skipped"),
        "JemallocNotFound must emit the debug 'attach skipped' \
         event so log filters can route it separately from \
         actionable warnings",
    );
    assert!(
        !logs_contain("attach failed"),
        "jemalloc-not-found must NOT emit the warn 'attach failed' \
         event — that level is reserved for actionable warnings",
    );
}
/// Stage a minimal synthetic /proc tree under `root` for one tgid/tid pair:
/// writes comm, stat, schedstat, status, io, sched, and cgroup so that
/// capture_thread_at_with_tally can walk the tid without read failures.
///
/// Panics (via unwrap) on any filesystem error — acceptable for test setup.
fn stage_minimal_proc_for_parse(root: &Path, tgid: i32, tid: i32) {
    // NOTE: the redundant function-local `use std::fs;` was removed — the
    // module already imports std::fs at file scope.
    let tgid_dir = root.join(tgid.to_string());
    let task_dir = tgid_dir.join("task").join(tid.to_string());
    fs::create_dir_all(&task_dir).unwrap();
    fs::write(tgid_dir.join("comm"), "p\n").unwrap();
    fs::write(task_dir.join("comm"), "live\n").unwrap();
    // Field 22 (starttime, proc(5)) is the nonzero 555555 below, which keeps
    // the ghost filter (empty comm AND zero start_time) from firing.
    let stat_line = format!(
        "{tid} (live) R 1 2 3 4 5 6 7 0 8 0 10 11 12 13 14 0 1 0 \
         555555 100 200 300 400 500 600 700 800 900 1000 1100 \
         1200 1300 1400 1500 1600 1700 1800 0\n"
    );
    fs::write(task_dir.join("stat"), stat_line).unwrap();
    fs::write(task_dir.join("schedstat"), "0 0 0\n").unwrap();
    fs::write(
        task_dir.join("status"),
        "voluntary_ctxt_switches:\t0\n\
         nonvoluntary_ctxt_switches:\t0\n",
    )
    .unwrap();
    fs::write(task_dir.join("io"), "rchar: 0\n").unwrap();
    fs::write(task_dir.join("sched"), "").unwrap();
    fs::write(task_dir.join("cgroup"), "0::/\n").unwrap();
}
#[test]
fn parse_summary_records_schedstat_failure() {
    // Delete exactly one staged file (schedstat) and verify the failure is
    // recorded in its own bucket only, with no spillover to stat/io.
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 5050;
    let tid: i32 = 5051;
    stage_minimal_proc_for_parse(proc_tmp.path(), tgid, tid);
    std::fs::remove_file(
        proc_tmp
            .path()
            .join(tgid.to_string())
            .join("task")
            .join(tid.to_string())
            .join("schedstat"),
    )
    .unwrap();
    let mut tally = ParseTally::default();
    let mut tally_opt: Option<&mut ParseTally> = Some(&mut tally);
    tally_opt.as_mut().unwrap().tids_walked += 1;
    let _ = capture_thread_at_with_tally(
        proc_tmp.path(),
        tgid,
        tid,
        "p",
        "live",
        false,
        &mut tally_opt,
    );
    tally_opt.as_mut().unwrap().commit_pending();
    let summary = tally.to_public();
    assert_eq!(summary.tids_walked, 1);
    assert_eq!(summary.read_failures, 1);
    assert_eq!(summary.read_failures_by_file.get("schedstat"), Some(&1));
    assert!(!summary.read_failures_by_file.contains_key("stat"));
    assert!(!summary.read_failures_by_file.contains_key("io"));
}
#[test]
fn parse_summary_records_io_failure() {
    // Same as above but for the io file: its bucket must record the failure.
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 5060;
    let tid: i32 = 5061;
    stage_minimal_proc_for_parse(proc_tmp.path(), tgid, tid);
    std::fs::remove_file(
        proc_tmp
            .path()
            .join(tgid.to_string())
            .join("task")
            .join(tid.to_string())
            .join("io"),
    )
    .unwrap();
    let mut tally = ParseTally::default();
    let mut tally_opt: Option<&mut ParseTally> = Some(&mut tally);
    tally_opt.as_mut().unwrap().tids_walked += 1;
    let _ = capture_thread_at_with_tally(
        proc_tmp.path(),
        tgid,
        tid,
        "p",
        "live",
        false,
        &mut tally_opt,
    );
    tally_opt.as_mut().unwrap().commit_pending();
    let summary = tally.to_public();
    assert_eq!(summary.read_failures_by_file.get("io"), Some(&1));
}
#[test]
fn parse_summary_clean_proc_yields_empty_map() {
    // A fully staged procfs tree must commit zero failures: empty map, no
    // dominant tag, kconfig gate closed.
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 5070;
    let tid: i32 = 5071;
    stage_synthetic_proc(proc_tmp.path(), tgid, tid, "p", "live");
    let mut tally = ParseTally::default();
    let mut tally_opt: Option<&mut ParseTally> = Some(&mut tally);
    tally_opt.as_mut().unwrap().tids_walked += 1;
    let _ = capture_thread_at_with_tally(
        proc_tmp.path(),
        tgid,
        tid,
        "p",
        "live",
        false,
        &mut tally_opt,
    );
    tally_opt.as_mut().unwrap().commit_pending();
    let summary = tally.to_public();
    assert_eq!(summary.tids_walked, 1);
    assert_eq!(summary.read_failures, 0);
    assert!(
        summary.read_failures_by_file.is_empty(),
        "clean procfs must yield an empty map, got {:?}",
        summary.read_failures_by_file,
    );
    assert!(summary.dominant_read_failure.is_none());
    assert!(!summary.kernel_config_dominant);
}
#[test]
fn parse_summary_excludes_ghost_filtered_tids() {
    // Stage only a bare task dir (every per-tid file missing) so the capture
    // yields a ghost (empty comm, zero start_time); the discard_pending arm
    // must then unwind every pending read failure.
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 5080;
    let tid: i32 = 5081;
    let task_dir = proc_tmp
        .path()
        .join(tgid.to_string())
        .join("task")
        .join(tid.to_string());
    std::fs::create_dir_all(&task_dir).unwrap();
    let mut tally = ParseTally::default();
    let mut tally_opt: Option<&mut ParseTally> = Some(&mut tally);
    tally_opt.as_mut().unwrap().tids_walked += 1;
    let t =
        capture_thread_at_with_tally(proc_tmp.path(), tgid, tid, "", "", false, &mut tally_opt);
    // Mirrors the production ghost-filter branch: discard on ghost, commit
    // otherwise.
    if t.comm.is_empty() && t.start_time_clock_ticks == 0 {
        tally_opt.as_mut().unwrap().discard_pending();
    } else {
        tally_opt.as_mut().unwrap().commit_pending();
    }
    let summary = tally.to_public();
    assert_eq!(
        summary.read_failures, 0,
        "ghost-filtered tid must NOT contribute to read_failures; \
         got {} failures (the discard_pending unwind is broken)",
        summary.read_failures,
    );
    assert!(summary.read_failures_by_file.is_empty());
    assert_eq!(summary.tids_walked, 1);
}
#[test]
fn parse_summary_serde_round_trip() {
    // CtprofParseSummary (declared at the top of this file with serde
    // derives) must round-trip every field through JSON, including the
    // newer negative_dotted_values counter.
    let mut by_file = BTreeMap::new();
    by_file.insert("schedstat".to_string(), 100);
    by_file.insert("io".to_string(), 50);
    let summary = CtprofParseSummary {
        tids_walked: 1000,
        read_failures: 150,
        read_failures_by_file: by_file,
        dominant_read_failure: Some("schedstat".to_string()),
        kernel_config_dominant: true,
        negative_dotted_values: 7,
    };
    let json = serde_json::to_string(&summary).unwrap();
    let back: CtprofParseSummary = serde_json::from_str(&json).unwrap();
    assert_eq!(back.tids_walked, 1000);
    assert_eq!(back.read_failures, 150);
    assert_eq!(back.read_failures_by_file.get("schedstat"), Some(&100));
    assert_eq!(back.read_failures_by_file.get("io"), Some(&50));
    assert_eq!(back.dominant_read_failure.as_deref(), Some("schedstat"));
    assert!(back.kernel_config_dominant);
    assert_eq!(
        back.negative_dotted_values, 7,
        "negative_dotted_values surfaces in the public surface \
         and round-trips through JSON",
    );
}
#[test]
fn parse_summary_dominant_picks_max_file_kind() {
    // Plurality winner among file-kind buckets; ties resolve to the
    // alphabetically-earlier tag (io beats status at 3:3).
    let mut tally = ParseTally::default();
    for _ in 0..10 {
        tally.record_failure("schedstat");
    }
    for _ in 0..5 {
        tally.record_failure("io");
    }
    for _ in 0..5 {
        tally.record_failure("status");
    }
    tally.commit_pending();
    let summary = tally.to_public();
    assert_eq!(summary.dominant_read_failure.as_deref(), Some("schedstat"));
    let mut tally2 = ParseTally::default();
    for _ in 0..3 {
        tally2.record_failure("io");
    }
    for _ in 0..3 {
        tally2.record_failure("status");
    }
    tally2.commit_pending();
    let summary2 = tally2.to_public();
    assert_eq!(
        summary2.dominant_read_failure.as_deref(),
        Some("io"),
        "tie must resolve to alphabetically-earlier tag — \
         `io` beats `status`",
    );
}
#[test]
fn parse_summary_kernel_config_hint_gate() {
    // The kconfig gate is share-based and inclusive at 50%: 5/10 flips it,
    // 1/10 does not, and the zero-failure default stays closed.
    let mut tally = ParseTally::default();
    for _ in 0..5 {
        tally.record_failure("schedstat");
    }
    for _ in 0..5 {
        tally.record_failure("status");
    }
    tally.commit_pending();
    let summary = tally.to_public();
    assert!(
        summary.kernel_config_dominant,
        "50% kconfig share must hit the gate (>= 50% boundary inclusive)",
    );
    assert!(summary.kernel_config_hint().is_some());
    let mut tally2 = ParseTally::default();
    tally2.record_failure("schedstat");
    for _ in 0..9 {
        tally2.record_failure("status");
    }
    tally2.commit_pending();
    let summary2 = tally2.to_public();
    assert!(!summary2.kernel_config_dominant);
    assert!(summary2.kernel_config_hint().is_none());
    let summary3 = ParseTally::default().to_public();
    assert!(!summary3.kernel_config_dominant);
    assert!(summary3.kernel_config_hint().is_none());
}
#[test]
fn parse_summary_dominant_none_when_zero_failures() {
    // A default tally projects to zero failures and therefore no dominant tag.
    let public = ParseTally::default().to_public();
    assert!(public.dominant_read_failure.is_none());
    assert_eq!(public.read_failures, 0);
}
#[test]
fn capture_with_synthetic_tree_yields_no_parse_summary() {
    // parse_summary is gated on use_syscall_affinity=true; the false path
    // must leave it None.
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let cgroup_tmp = tempfile::TempDir::new().unwrap();
    let sys_tmp = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 5090;
    let tid: i32 = 5091;
    stage_synthetic_proc(proc_tmp.path(), tgid, tid, "p", "live");
    let snap = capture_with(proc_tmp.path(), cgroup_tmp.path(), sys_tmp.path(), false);
    assert!(
        snap.parse_summary.is_none(),
        "use_syscall_affinity=false must skip parse_summary; \
         got Some — production-gate discipline is broken",
    );
}
#[test]
fn capture_with_phase1_loadavg_missing_does_not_panic() {
    // An entirely empty proc root (no loadavg at all) must produce an empty
    // snapshot instead of panicking.
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let cgroup_tmp = tempfile::TempDir::new().unwrap();
    let sys_tmp = tempfile::TempDir::new().unwrap();
    let snap = capture_with(proc_tmp.path(), cgroup_tmp.path(), sys_tmp.path(), true);
    assert!(
        snap.threads.is_empty(),
        "missing loadavg + empty proc_root → empty snapshot, \
         got {} threads",
        snap.threads.len(),
    );
}
#[test]
fn capture_with_phase1_loadavg_malformed_does_not_panic() {
    // A present-but-unparseable loadavg must also degrade gracefully.
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let cgroup_tmp = tempfile::TempDir::new().unwrap();
    let sys_tmp = tempfile::TempDir::new().unwrap();
    std::fs::write(proc_tmp.path().join("loadavg"), "not_a_number\n").unwrap();
    let snap = capture_with(proc_tmp.path(), cgroup_tmp.path(), sys_tmp.path(), true);
    assert!(
        snap.threads.is_empty(),
        "malformed loadavg → 0.0 default, empty proc_root → empty \
         snapshot; got {} threads",
        snap.threads.len(),
    );
}
#[test]
fn capture_with_non_utf8_comm_treated_as_absent() {
    // Overwrite the staged comm with invalid UTF-8 bytes: the read folds to
    // an empty string, but the thread survives because start_time is intact
    // (ghost filter requires BOTH empty comm AND zero start_time).
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let cgroup_tmp = tempfile::TempDir::new().unwrap();
    let sys_tmp = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 6161;
    let tid: i32 = 6162;
    stage_synthetic_proc(proc_tmp.path(), tgid, tid, "p", "live");
    let comm_path = proc_tmp
        .path()
        .join(tgid.to_string())
        .join("task")
        .join(tid.to_string())
        .join("comm");
    std::fs::write(&comm_path, [0xFF, 0xFE]).unwrap();
    let snap = capture_with(proc_tmp.path(), cgroup_tmp.path(), sys_tmp.path(), false);
    assert_eq!(
        snap.threads.len(),
        1,
        "non-UTF-8 comm folds to empty; ghost filter does NOT \
         fire because start_time is intact; thread still lands. \
         got {} threads",
        snap.threads.len(),
    );
    assert_eq!(
        snap.threads[0].comm, "",
        "non-UTF-8 comm must collapse to empty (read_to_string \
         returns Err on invalid UTF-8)",
    );
    assert_ne!(
        snap.threads[0].start_time_clock_ticks, 0,
        "start_time must be intact for the ghost filter NOT to fire",
    );
}
#[test]
fn capture_with_cgroup_path_traversal_yields_zero_stats() {
    // A hostile "../" cgroup string round-trips verbatim into ThreadState
    // and seeds a stats entry, but resolves to no directory under the
    // cgroup root — so the stats stay all-zero (no escape).
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let cgroup_tmp = tempfile::TempDir::new().unwrap();
    let sys_tmp = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 6262;
    let tid: i32 = 6263;
    stage_synthetic_proc(proc_tmp.path(), tgid, tid, "p", "live");
    let cgroup_path = proc_tmp
        .path()
        .join(tgid.to_string())
        .join("task")
        .join(tid.to_string())
        .join("cgroup");
    std::fs::write(&cgroup_path, "0::/../escape\n").unwrap();
    let snap = capture_with(proc_tmp.path(), cgroup_tmp.path(), sys_tmp.path(), false);
    assert_eq!(snap.threads.len(), 1);
    assert_eq!(
        snap.threads[0].cgroup, "/../escape",
        "traversal string round-trips verbatim through ThreadState.cgroup",
    );
    let stats = snap
        .cgroup_stats
        .get("/../escape")
        .expect("non-empty cgroup string must seed the stats map");
    assert_eq!(
        stats.cpu.usage_usec, 0,
        "no matching cgroup dir under cgroup_root → all-zero stats; \
         a traversal that escaped the cgroup_root would have \
         non-zero values from the parent directory",
    );
}
#[test]
fn capture_with_empty_cpus_allowed_yields_empty_affinity() {
    // An empty Cpus_allowed_list value must parse to an empty affinity set
    // without breaking the csw parsing on the same status file.
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let cgroup_tmp = tempfile::TempDir::new().unwrap();
    let sys_tmp = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 6363;
    let tid: i32 = 6364;
    stage_synthetic_proc(proc_tmp.path(), tgid, tid, "p", "live");
    let status_path = proc_tmp
        .path()
        .join(tgid.to_string())
        .join("task")
        .join(tid.to_string())
        .join("status");
    let status = "Cpus_allowed_list:\t\n\
                  voluntary_ctxt_switches:\t1\n\
                  nonvoluntary_ctxt_switches:\t1\n";
    std::fs::write(&status_path, status).unwrap();
    let snap = capture_with(proc_tmp.path(), cgroup_tmp.path(), sys_tmp.path(), false);
    assert_eq!(snap.threads.len(), 1);
    let t = &snap.threads[0];
    assert!(
        t.cpu_affinity.0.is_empty(),
        "empty Cpus_allowed_list value → parse_cpu_list returns \
         None at the empty-input guard → cpu_affinity empty; \
         got {} elements",
        t.cpu_affinity.0.len(),
    );
    assert_eq!(
        t.voluntary_csw,
        MonotonicCount(1),
        "empty cpulist must not break csw parsing on the same \
         status file",
    );
}
#[test]
fn capture_with_empty_comm_nonzero_start_time_keeps_thread() {
    // The ghost filter is an AND: empty comm alone, with nonzero start_time,
    // must NOT drop the thread.
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let cgroup_tmp = tempfile::TempDir::new().unwrap();
    let sys_tmp = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 6464;
    let tid: i32 = 6465;
    stage_synthetic_proc(proc_tmp.path(), tgid, tid, "p", "live");
    let comm_path = proc_tmp
        .path()
        .join(tgid.to_string())
        .join("task")
        .join(tid.to_string())
        .join("comm");
    std::fs::write(&comm_path, " \n").unwrap();
    let snap = capture_with(proc_tmp.path(), cgroup_tmp.path(), sys_tmp.path(), false);
    assert_eq!(
        snap.threads.len(),
        1,
        "empty comm + nonzero start_time MUST NOT fire ghost filter \
         (AND-semantics requires both empty); got {} threads",
        snap.threads.len(),
    );
    let t = &snap.threads[0];
    assert_eq!(t.comm, "", "empty-comm thread surfaces with empty comm");
    assert_ne!(
        t.start_time_clock_ticks, 0,
        "start_time must be non-zero so the AND-clause has a `false` half",
    );
}
#[test]
fn parse_summary_all_ghosts_yields_nonzero_tids_walked_zero_failures() {
    // Stage n bare task dirs (every per-tid file missing → all ghosts):
    // tids_walked counts every walk attempt, but all pending failures must
    // unwind, leaving the summary otherwise empty.
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let cgroup_tmp = tempfile::TempDir::new().unwrap();
    let sys_tmp = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 7070;
    let n: u64 = 4;
    let tgid_dir = proc_tmp.path().join(tgid.to_string());
    for k in 0..n {
        let tid = (tgid as u64 + 1 + k) as i32;
        std::fs::create_dir_all(tgid_dir.join("task").join(tid.to_string())).unwrap();
    }
    std::fs::write(proc_tmp.path().join("loadavg"), "0.10 0.05 0.01 1/1 1\n").unwrap();
    let snap = capture_with(proc_tmp.path(), cgroup_tmp.path(), sys_tmp.path(), true);
    assert!(
        snap.threads.is_empty(),
        "every tid is ghost-filtered → threads must be empty, got {}",
        snap.threads.len(),
    );
    let summary = snap
        .parse_summary
        .expect("use_syscall_affinity=true must populate parse_summary");
    assert_eq!(
        summary.tids_walked, n,
        "tids_walked counts every walk attempt, not committed reads — \
         got {}, want {n}",
        summary.tids_walked,
    );
    assert_eq!(
        summary.read_failures, 0,
        "ghost-filtered tids' failures unwind via discard_pending — \
         got {} failures, want 0",
        summary.read_failures,
    );
    assert!(
        summary.read_failures_by_file.is_empty(),
        "no failure bucket survives the ghost-filter unwind, got {:?}",
        summary.read_failures_by_file,
    );
    assert!(
        summary.dominant_read_failure.is_none(),
        "zero failures → dominant_read_failure is None, got {:?}",
        summary.dominant_read_failure,
    );
    assert!(
        !summary.kernel_config_dominant,
        "zero failures → kernel_config_dominant is false, got true",
    );
}
#[test]
fn parse_summary_kernel_config_token_list_pinned() {
    // Pins exactly which file tags count as kconfig-attributable: schedstat
    // and io flip the gate solo; stat/status/sched/cgroup must not.
    let kconfig_tokens: &[&'static str] = &["schedstat", "io"];
    for tag in kconfig_tokens {
        let mut tally = ParseTally::default();
        tally.record_failure(tag);
        tally.commit_pending();
        let summary = tally.to_public();
        assert!(
            summary.kernel_config_dominant,
            "solo `{tag}` failure must flip kernel_config_dominant true \
             (kconfig share = 100%); got false — token dropped from the \
             kconfig set",
        );
    }
    let non_kconfig_tokens: &[&'static str] = &["stat", "status", "sched", "cgroup"];
    for tag in non_kconfig_tokens {
        let mut tally = ParseTally::default();
        tally.record_failure(tag);
        tally.commit_pending();
        let summary = tally.to_public();
        assert!(
            !summary.kernel_config_dominant,
            "solo `{tag}` failure must keep kernel_config_dominant false \
             (kconfig share = 0%); got true — token incorrectly added to \
             the kconfig set",
        );
    }
}
#[test]
fn parse_summary_aggregates_across_multiple_tids() {
    // Two tids in one tgid, each with a different single missing file
    // (tid_a: io, tid_b: schedstat); per-file buckets and the total must
    // aggregate across the two commit cycles.
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 7080;
    let tid_a: i32 = 7081;
    let tid_b: i32 = 7082;
    stage_minimal_proc_for_parse(proc_tmp.path(), tgid, tid_a);
    // tid_b is staged by hand with the same minimal layout as the helper.
    let tgid_dir = proc_tmp.path().join(tgid.to_string());
    let task_b = tgid_dir.join("task").join(tid_b.to_string());
    std::fs::create_dir_all(&task_b).unwrap();
    std::fs::write(task_b.join("comm"), "live\n").unwrap();
    let stat_line = format!(
        "{tid_b} (live) R 1 2 3 4 5 6 7 0 8 0 10 11 12 13 14 0 1 0 \
         555555 100 200 300 400 500 600 700 800 900 1000 1100 \
         1200 1300 1400 1500 1600 1700 1800 0\n"
    );
    std::fs::write(task_b.join("stat"), stat_line).unwrap();
    std::fs::write(task_b.join("schedstat"), "0 0 0\n").unwrap();
    std::fs::write(
        task_b.join("status"),
        "voluntary_ctxt_switches:\t0\n\
         nonvoluntary_ctxt_switches:\t0\n",
    )
    .unwrap();
    std::fs::write(task_b.join("io"), "rchar: 0\n").unwrap();
    std::fs::write(task_b.join("sched"), "").unwrap();
    std::fs::write(task_b.join("cgroup"), "0::/\n").unwrap();
    // Knock out one distinct file per tid.
    std::fs::remove_file(tgid_dir.join("task").join(tid_a.to_string()).join("io")).unwrap();
    std::fs::remove_file(task_b.join("schedstat")).unwrap();
    let mut tally = ParseTally::default();
    let mut tally_opt: Option<&mut ParseTally> = Some(&mut tally);
    for tid in [tid_a, tid_b] {
        tally_opt.as_mut().unwrap().tids_walked += 1;
        let _ = capture_thread_at_with_tally(
            proc_tmp.path(),
            tgid,
            tid,
            "p",
            "live",
            false,
            &mut tally_opt,
        );
        tally_opt.as_mut().unwrap().commit_pending();
    }
    let summary = tally.to_public();
    assert_eq!(summary.tids_walked, 2);
    assert_eq!(
        summary.read_failures, 2,
        "two tids, one failure each → 2 total; got {}",
        summary.read_failures,
    );
    assert_eq!(
        summary.read_failures_by_file.get("io"),
        Some(&1),
        "tid_a missing io → io bucket = 1; got {:?}",
        summary.read_failures_by_file.get("io"),
    );
    assert_eq!(
        summary.read_failures_by_file.get("schedstat"),
        Some(&1),
        "tid_b missing schedstat → schedstat bucket = 1; got {:?}",
        summary.read_failures_by_file.get("schedstat"),
    );
}
#[test]
fn parse_summary_records_cgroup_failure() {
    // A tid whose `cgroup` file is absent must land one failure in the
    // per-file "cgroup" bucket of the parse summary.
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let tgid: i32 = 7090;
    let tid: i32 = 7091;
    stage_minimal_proc_for_parse(proc_tmp.path(), tgid, tid);
    // Delete the staged cgroup file so the parser hits a read failure on it.
    let cgroup_path = proc_tmp
        .path()
        .join(tgid.to_string())
        .join("task")
        .join(tid.to_string())
        .join("cgroup");
    std::fs::remove_file(cgroup_path).unwrap();
    let mut tally = ParseTally::default();
    let mut tally_opt: Option<&mut ParseTally> = Some(&mut tally);
    tally_opt.as_mut().unwrap().tids_walked += 1;
    let _ = capture_thread_at_with_tally(
        proc_tmp.path(),
        tgid,
        tid,
        "p",
        "live",
        false,
        &mut tally_opt,
    );
    tally_opt.as_mut().unwrap().commit_pending();
    let summary = tally.to_public();
    let cgroup_bucket = summary.read_failures_by_file.get("cgroup");
    assert_eq!(
        cgroup_bucket,
        Some(&1),
        "missing cgroup file → cgroup bucket = 1; got {:?}",
        cgroup_bucket,
    );
}
#[test]
fn capture_with_production_gate_populates_parse_summary() {
    // With the production gate enabled (use_syscall_affinity=true), the
    // assembled snapshot must carry a parse_summary.
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let cgroup_tmp = tempfile::TempDir::new().unwrap();
    let sys_tmp = tempfile::TempDir::new().unwrap();
    let (tgid, tid): (i32, i32) = (7100, 7101);
    stage_synthetic_proc(proc_tmp.path(), tgid, tid, "p", "live");
    std::fs::write(proc_tmp.path().join("loadavg"), "0.10 0.05 0.01 1/1 1\n").unwrap();
    let snap = capture_with(proc_tmp.path(), cgroup_tmp.path(), sys_tmp.path(), true);
    assert!(
        snap.parse_summary.is_some(),
        "use_syscall_affinity=true must populate parse_summary on \
the assembled snapshot — production-gate wiring is broken",
    );
}
#[test]
fn capture_with_non_utf8_pcomm_treated_as_absent() {
    // A tgid-level comm holding invalid UTF-8 must neither abort the
    // capture nor drop the thread; the pcomm simply collapses to "".
    let proc_tmp = tempfile::TempDir::new().unwrap();
    let cgroup_tmp = tempfile::TempDir::new().unwrap();
    let sys_tmp = tempfile::TempDir::new().unwrap();
    let (tgid, tid): (i32, i32) = (7110, 7111);
    stage_synthetic_proc(proc_tmp.path(), tgid, tid, "p", "live");
    // Overwrite the staged comm with bytes that cannot decode as UTF-8.
    std::fs::write(
        proc_tmp.path().join(tgid.to_string()).join("comm"),
        [0xFF, 0xFE],
    )
    .unwrap();
    let snap = capture_with(proc_tmp.path(), cgroup_tmp.path(), sys_tmp.path(), false);
    assert_eq!(
        snap.threads.len(),
        1,
        "non-UTF-8 pcomm must not break the capture — the thread still \
lands; got {} threads",
        snap.threads.len(),
    );
    assert_eq!(
        snap.threads[0].pcomm, "",
        "non-UTF-8 pcomm collapses to empty (read_to_string returns Err \
on invalid UTF-8 and unwrap_or_default → \"\")",
    );
}
#[test]
fn capture_with_rayon_worker_panic_is_caught_and_surfaced() {
// PANIC_INJECT_TGID is a process-global injection point, so arm/disarm must
// not overlap across concurrent runs. The fn-local lock serializes them;
// into_inner() recovers the guard even if a previous holder panicked and
// poisoned the mutex (a failed earlier run must not wedge later ones).
static PANIC_INJECT_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
let _guard = PANIC_INJECT_TEST_LOCK
.lock()
.unwrap_or_else(|e| e.into_inner());
let proc_tmp = tempfile::TempDir::new().unwrap();
let cgroup_tmp = tempfile::TempDir::new().unwrap();
let sys_tmp = tempfile::TempDir::new().unwrap();
std::fs::write(proc_tmp.path().join("loadavg"), "0.0 0.0 0.0 1/1 1\n").unwrap();
// Two staged tgids: one untouched survivor, one armed (via the atomic
// below) to panic inside its rayon worker during capture.
let survivor_tgid: i32 = 99000;
let survivor_tid: i32 = 99002;
let panic_tgid: i32 = 99001;
let panic_tid: i32 = 99003;
stage_synthetic_proc(
proc_tmp.path(),
survivor_tgid,
survivor_tid,
"ok-pcomm",
"ok-comm",
);
stage_synthetic_proc(
proc_tmp.path(),
panic_tgid,
panic_tid,
"panic-pcomm",
"panic-comm",
);
// Swap in a no-op panic hook so the injected worker panic does not spam
// test output; the saved hook is restored right after the capture.
let saved_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(|_info| {}));
// Arm the injection for panic_tgid, capture under the production gate,
// then disarm. NOTE(review): the Release stores presumably pair with an
// Acquire load at the injection site in the worker — confirm there.
PANIC_INJECT_TGID.store(panic_tgid, std::sync::atomic::Ordering::Release);
let snap = capture_with(proc_tmp.path(), cgroup_tmp.path(), sys_tmp.path(), true);
PANIC_INJECT_TGID.store(0, std::sync::atomic::Ordering::Release);
std::panic::set_hook(saved_hook);
// The injected panic must not abort the overall walk: threads from both
// staged tgids still appear in the snapshot.
assert_eq!(
snap.threads.len(),
2,
"rayon worker panic must not block phase 2 — both staged tgids \
walk their threads; got {} threads",
snap.threads.len(),
);
let summary = snap
.probe_summary
.expect("use_syscall_affinity=true must populate probe_summary");
// The caught panic must be surfaced as a counted probe failure …
assert!(
summary.failed >= 1,
"worker-panic must count as a failure; got failed={}",
summary.failed,
);
// … and as the dominant failure tag (rationale in the message below).
assert_eq!(
summary.dominant_failure.as_deref(),
Some("worker-panic"),
"worker-panic is the only ACTIONABLE failure tag in this \
scenario. The survivor's synthetic /proc has no `exe` \
symlink, so attach short-circuits with `readlink-failure` \
— the dominant-tag comparator filters that benign tag out \
(same `matches!` arm `record_attach_outcome` uses to log it \
at debug rather than warn), leaving worker-panic as the \
sole candidate. A regression that demoted worker-panic \
out of the dominant set, or that miscounted the panic, \
would fail here. Got {:?}",
summary.dominant_failure,
);
}
}