use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
/// One point-in-time capture of the whole host: per-thread scheduler /
/// memory / IO state, per-cgroup stats, system PSI, and sched_ext sysfs
/// state, plus summaries of how well the capture itself went.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct CtprofSnapshot {
    /// Wall-clock capture time, nanoseconds since the Unix epoch (0 if the
    /// system clock was before the epoch).
    pub captured_at_unix_ns: u64,
    /// Host context; only collected for full (syscall-enabled) captures.
    pub host: Option<crate::host_context::HostContext>,
    /// One entry per thread walked under `/proc`.
    pub threads: Vec<ThreadState>,
    /// Per-cgroup stats keyed by cgroup path.
    pub cgroup_stats: BTreeMap<String, CgroupStats>,
    /// Outcome summary of the jemalloc attach/probe pass, when it ran.
    pub probe_summary: Option<CtprofProbeSummary>,
    /// Outcome summary of `/proc` file parsing, when tallied.
    pub parse_summary: Option<CtprofParseSummary>,
    /// Outcome summary of taskstats netlink queries, when they ran.
    pub taskstats_summary: Option<crate::taskstats::TaskstatsSummary>,
    /// System-wide pressure-stall information.
    pub psi: Psi,
    /// sched_ext sysfs state, when available.
    pub sched_ext: Option<SchedExtSysfs>,
}
/// Public summary of the jemalloc attach/probe pass (serialized form of the
/// internal `ProbeSummary` tally).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct CtprofProbeSummary {
    /// Thread groups examined.
    pub tgids_walked: u64,
    /// Thread groups where jemalloc was detected and attach succeeded.
    pub jemalloc_detected: u64,
    /// Individual thread probes that succeeded.
    pub probed_ok: u64,
    /// Attach + probe failures (benign "nothing to probe" cases excluded).
    pub failed: u64,
    /// Most frequent failure tag, if any failures occurred.
    pub dominant_failure: Option<String>,
    /// True when ptrace permission errors account for at least half of the
    /// failures (suggests missing privileges rather than real errors).
    pub privilege_dominant: bool,
}
impl CtprofProbeSummary {
    /// Remediation hint for the operator when the failures look
    /// privilege-related (ptrace denied); `None` otherwise.
    pub fn remediation_hint(&self) -> Option<&'static str> {
        self.privilege_dominant.then_some(PTRACE_EPERM_HINT)
    }
}
/// Public summary of `/proc` parsing during the thread walk (serialized form
/// of the internal `ParseTally`).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct CtprofParseSummary {
    /// Threads walked.
    pub tids_walked: u64,
    /// Total committed read failures across all `/proc` file kinds.
    pub read_failures: u64,
    /// Read failures broken down by file kind (e.g. "schedstat", "io").
    pub read_failures_by_file: BTreeMap<String, u64>,
    /// File kind with the most failures, if any.
    pub dominant_read_failure: Option<String>,
    /// True when schedstat/io failures dominate — suggests the kernel lacks
    /// the relevant accounting config options (see `kernel_config_hint`).
    pub kernel_config_dominant: bool,
    /// Count of dotted numeric values that parsed as negative.
    pub negative_dotted_values: u64,
}
impl CtprofParseSummary {
    /// Kernel-config remediation hint when schedstat/io read failures
    /// dominate; `None` otherwise.
    pub fn kernel_config_hint(&self) -> Option<&'static str> {
        self.kernel_config_dominant.then_some(PARSE_KCONFIG_HINT)
    }
}
/// Hint emitted when schedstat/io read failures dominate the parse tally —
/// the usual cause is a kernel built without the accounting options below.
const PARSE_KCONFIG_HINT: &str = "hint: schedstat / io read failures dominate — \
kernel may be built without CONFIG_SCHEDSTATS \
and/or CONFIG_TASK_IO_ACCOUNTING";
fn default_state_char() -> char {
'~'
}
/// Per-thread sample assembled from `/proc/<tgid>/task/<tid>/{cgroup,stat,
/// status,schedstat,sched,io,smaps_rollup}`, plus optional jemalloc probe
/// counters and netlink taskstats delay accounting.
///
/// Sources that fail to read leave their fields at zero defaults; see
/// `capture_thread_at_with_tally` for exactly which reader fills which field.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct ThreadState {
    // --- identity & placement ---
    pub tid: u32,
    pub tgid: u32,
    /// Thread-group leader's comm.
    pub pcomm: String,
    /// This thread's comm.
    pub comm: String,
    pub cgroup: String,
    pub start_time_clock_ticks: u64,
    // --- scheduling configuration (from /proc stat / status / sched) ---
    pub policy: crate::metric_types::CategoricalString,
    pub nice: crate::metric_types::OrdinalI32,
    pub cpu_affinity: crate::metric_types::CpuSet,
    /// `processor` field from /proc stat.
    pub processor: crate::metric_types::OrdinalI32,
    /// Kernel state character from /proc status; '~' means "not captured".
    #[serde(default = "default_state_char")]
    pub state: char,
    /// sched_ext enabled flag from /proc sched.
    pub ext_enabled: bool,
    // --- /proc schedstat ---
    pub run_time_ns: crate::metric_types::MonotonicNs,
    pub wait_time_ns: crate::metric_types::MonotonicNs,
    pub timeslices: crate::metric_types::MonotonicCount,
    // --- /proc status context switches ---
    pub voluntary_csw: crate::metric_types::MonotonicCount,
    pub nonvoluntary_csw: crate::metric_types::MonotonicCount,
    // --- /proc sched scheduler statistics ---
    pub nr_wakeups: crate::metric_types::MonotonicCount,
    pub nr_wakeups_local: crate::metric_types::MonotonicCount,
    pub nr_wakeups_remote: crate::metric_types::MonotonicCount,
    pub nr_wakeups_sync: crate::metric_types::MonotonicCount,
    pub nr_wakeups_migrate: crate::metric_types::MonotonicCount,
    pub nr_wakeups_affine: crate::metric_types::MonotonicCount,
    pub nr_wakeups_affine_attempts: crate::metric_types::MonotonicCount,
    pub nr_migrations: crate::metric_types::MonotonicCount,
    pub nr_forced_migrations: crate::metric_types::MonotonicCount,
    pub nr_failed_migrations_affine: crate::metric_types::MonotonicCount,
    pub nr_failed_migrations_running: crate::metric_types::MonotonicCount,
    pub nr_failed_migrations_hot: crate::metric_types::MonotonicCount,
    pub wait_sum: crate::metric_types::MonotonicNs,
    pub wait_count: crate::metric_types::MonotonicCount,
    pub wait_max: crate::metric_types::PeakNs,
    /// Sleep minus block time (see the capture site for the subtraction).
    pub voluntary_sleep_ns: crate::metric_types::MonotonicNs,
    pub sleep_max: crate::metric_types::PeakNs,
    pub block_sum: crate::metric_types::MonotonicNs,
    pub block_max: crate::metric_types::PeakNs,
    pub iowait_sum: crate::metric_types::MonotonicNs,
    pub iowait_count: crate::metric_types::MonotonicCount,
    pub exec_max: crate::metric_types::PeakNs,
    pub slice_max: crate::metric_types::PeakNs,
    // --- jemalloc probe (zero unless the probe succeeded) ---
    pub allocated_bytes: crate::metric_types::Bytes,
    pub deallocated_bytes: crate::metric_types::Bytes,
    // --- /proc stat accounting ---
    pub minflt: crate::metric_types::MonotonicCount,
    pub majflt: crate::metric_types::MonotonicCount,
    pub utime_clock_ticks: crate::metric_types::ClockTicks,
    pub stime_clock_ticks: crate::metric_types::ClockTicks,
    pub priority: crate::metric_types::OrdinalI32,
    pub rt_priority: crate::metric_types::OrdinalU32,
    pub core_forceidle_sum: crate::metric_types::MonotonicNs,
    pub fair_slice_ns: crate::metric_types::GaugeNs,
    /// Only populated on the thread-group leader (tid == tgid); 0 otherwise.
    pub nr_threads: crate::metric_types::GaugeCount,
    /// smaps_rollup entries in kB (see `smaps_rollup_bytes` for bytes).
    pub smaps_rollup_kb: BTreeMap<String, u64>,
    // --- /proc io ---
    pub rchar: crate::metric_types::Bytes,
    pub wchar: crate::metric_types::Bytes,
    pub syscr: crate::metric_types::MonotonicCount,
    pub syscw: crate::metric_types::MonotonicCount,
    pub read_bytes: crate::metric_types::Bytes,
    pub write_bytes: crate::metric_types::Bytes,
    pub cancelled_write_bytes: crate::metric_types::Bytes,
    // --- taskstats delay accounting (count / total / max / min per class);
    //     zero unless `apply_delay_stats` ran for this thread ---
    pub cpu_delay_count: crate::metric_types::MonotonicCount,
    pub cpu_delay_total_ns: crate::metric_types::MonotonicNs,
    pub cpu_delay_max_ns: crate::metric_types::PeakNs,
    pub cpu_delay_min_ns: crate::metric_types::PeakNs,
    pub blkio_delay_count: crate::metric_types::MonotonicCount,
    pub blkio_delay_total_ns: crate::metric_types::MonotonicNs,
    pub blkio_delay_max_ns: crate::metric_types::PeakNs,
    pub blkio_delay_min_ns: crate::metric_types::PeakNs,
    pub swapin_delay_count: crate::metric_types::MonotonicCount,
    pub swapin_delay_total_ns: crate::metric_types::MonotonicNs,
    pub swapin_delay_max_ns: crate::metric_types::PeakNs,
    pub swapin_delay_min_ns: crate::metric_types::PeakNs,
    pub freepages_delay_count: crate::metric_types::MonotonicCount,
    pub freepages_delay_total_ns: crate::metric_types::MonotonicNs,
    pub freepages_delay_max_ns: crate::metric_types::PeakNs,
    pub freepages_delay_min_ns: crate::metric_types::PeakNs,
    pub thrashing_delay_count: crate::metric_types::MonotonicCount,
    pub thrashing_delay_total_ns: crate::metric_types::MonotonicNs,
    pub thrashing_delay_max_ns: crate::metric_types::PeakNs,
    pub thrashing_delay_min_ns: crate::metric_types::PeakNs,
    pub compact_delay_count: crate::metric_types::MonotonicCount,
    pub compact_delay_total_ns: crate::metric_types::MonotonicNs,
    pub compact_delay_max_ns: crate::metric_types::PeakNs,
    pub compact_delay_min_ns: crate::metric_types::PeakNs,
    pub wpcopy_delay_count: crate::metric_types::MonotonicCount,
    pub wpcopy_delay_total_ns: crate::metric_types::MonotonicNs,
    pub wpcopy_delay_max_ns: crate::metric_types::PeakNs,
    pub wpcopy_delay_min_ns: crate::metric_types::PeakNs,
    pub irq_delay_count: crate::metric_types::MonotonicCount,
    pub irq_delay_total_ns: crate::metric_types::MonotonicNs,
    pub irq_delay_max_ns: crate::metric_types::PeakNs,
    pub irq_delay_min_ns: crate::metric_types::PeakNs,
    // --- taskstats memory high-water marks ---
    pub hiwater_rss_bytes: crate::metric_types::PeakBytes,
    pub hiwater_vm_bytes: crate::metric_types::PeakBytes,
}
/// Manual `Default` impl: almost every field zero-defaults, but `state` must
/// match the serde default `'~'` (a derived `Default` would give `'\0'`).
impl Default for ThreadState {
    fn default() -> Self {
        Self {
            tid: 0,
            tgid: 0,
            pcomm: String::new(),
            comm: String::new(),
            cgroup: String::new(),
            start_time_clock_ticks: 0,
            policy: Default::default(),
            nice: crate::metric_types::OrdinalI32(0),
            cpu_affinity: Default::default(),
            processor: Default::default(),
            // Must agree with `default_state_char` used by serde.
            state: default_state_char(),
            ext_enabled: false,
            run_time_ns: Default::default(),
            wait_time_ns: Default::default(),
            timeslices: Default::default(),
            voluntary_csw: Default::default(),
            nonvoluntary_csw: Default::default(),
            nr_wakeups: Default::default(),
            nr_wakeups_local: Default::default(),
            nr_wakeups_remote: Default::default(),
            nr_wakeups_sync: Default::default(),
            nr_wakeups_migrate: Default::default(),
            nr_wakeups_affine: Default::default(),
            nr_wakeups_affine_attempts: Default::default(),
            nr_migrations: Default::default(),
            nr_forced_migrations: Default::default(),
            nr_failed_migrations_affine: Default::default(),
            nr_failed_migrations_running: Default::default(),
            nr_failed_migrations_hot: Default::default(),
            wait_sum: Default::default(),
            wait_count: Default::default(),
            wait_max: Default::default(),
            voluntary_sleep_ns: Default::default(),
            sleep_max: Default::default(),
            block_sum: Default::default(),
            block_max: Default::default(),
            iowait_sum: Default::default(),
            iowait_count: Default::default(),
            exec_max: Default::default(),
            slice_max: Default::default(),
            allocated_bytes: Default::default(),
            deallocated_bytes: Default::default(),
            minflt: Default::default(),
            majflt: Default::default(),
            utime_clock_ticks: Default::default(),
            stime_clock_ticks: Default::default(),
            priority: Default::default(),
            rt_priority: Default::default(),
            core_forceidle_sum: Default::default(),
            fair_slice_ns: Default::default(),
            nr_threads: Default::default(),
            smaps_rollup_kb: BTreeMap::new(),
            rchar: Default::default(),
            wchar: Default::default(),
            syscr: Default::default(),
            syscw: Default::default(),
            read_bytes: Default::default(),
            write_bytes: Default::default(),
            cancelled_write_bytes: Default::default(),
            cpu_delay_count: Default::default(),
            cpu_delay_total_ns: Default::default(),
            cpu_delay_max_ns: Default::default(),
            cpu_delay_min_ns: Default::default(),
            blkio_delay_count: Default::default(),
            blkio_delay_total_ns: Default::default(),
            blkio_delay_max_ns: Default::default(),
            blkio_delay_min_ns: Default::default(),
            swapin_delay_count: Default::default(),
            swapin_delay_total_ns: Default::default(),
            swapin_delay_max_ns: Default::default(),
            swapin_delay_min_ns: Default::default(),
            freepages_delay_count: Default::default(),
            freepages_delay_total_ns: Default::default(),
            freepages_delay_max_ns: Default::default(),
            freepages_delay_min_ns: Default::default(),
            thrashing_delay_count: Default::default(),
            thrashing_delay_total_ns: Default::default(),
            thrashing_delay_max_ns: Default::default(),
            thrashing_delay_min_ns: Default::default(),
            compact_delay_count: Default::default(),
            compact_delay_total_ns: Default::default(),
            compact_delay_max_ns: Default::default(),
            compact_delay_min_ns: Default::default(),
            wpcopy_delay_count: Default::default(),
            wpcopy_delay_total_ns: Default::default(),
            wpcopy_delay_max_ns: Default::default(),
            wpcopy_delay_min_ns: Default::default(),
            irq_delay_count: Default::default(),
            irq_delay_total_ns: Default::default(),
            irq_delay_max_ns: Default::default(),
            irq_delay_min_ns: Default::default(),
            hiwater_rss_bytes: Default::default(),
            hiwater_vm_bytes: Default::default(),
        }
    }
}
impl ThreadState {
    /// Copies every taskstats delay-accounting field (count / total / max /
    /// min for each delay class) plus the RSS/VM high-water marks from a
    /// netlink taskstats query result into this sample.
    pub(crate) fn apply_delay_stats(&mut self, ds: &crate::taskstats::DelayStats) {
        use crate::metric_types::{MonotonicCount, MonotonicNs, PeakBytes, PeakNs};
        self.cpu_delay_count = MonotonicCount(ds.cpu_count);
        self.cpu_delay_total_ns = MonotonicNs(ds.cpu_delay_total_ns);
        self.cpu_delay_max_ns = PeakNs(ds.cpu_delay_max_ns);
        self.cpu_delay_min_ns = PeakNs(ds.cpu_delay_min_ns);
        self.blkio_delay_count = MonotonicCount(ds.blkio_count);
        self.blkio_delay_total_ns = MonotonicNs(ds.blkio_delay_total_ns);
        self.blkio_delay_max_ns = PeakNs(ds.blkio_delay_max_ns);
        self.blkio_delay_min_ns = PeakNs(ds.blkio_delay_min_ns);
        self.swapin_delay_count = MonotonicCount(ds.swapin_count);
        self.swapin_delay_total_ns = MonotonicNs(ds.swapin_delay_total_ns);
        self.swapin_delay_max_ns = PeakNs(ds.swapin_delay_max_ns);
        self.swapin_delay_min_ns = PeakNs(ds.swapin_delay_min_ns);
        self.freepages_delay_count = MonotonicCount(ds.freepages_count);
        self.freepages_delay_total_ns = MonotonicNs(ds.freepages_delay_total_ns);
        self.freepages_delay_max_ns = PeakNs(ds.freepages_delay_max_ns);
        self.freepages_delay_min_ns = PeakNs(ds.freepages_delay_min_ns);
        self.thrashing_delay_count = MonotonicCount(ds.thrashing_count);
        self.thrashing_delay_total_ns = MonotonicNs(ds.thrashing_delay_total_ns);
        self.thrashing_delay_max_ns = PeakNs(ds.thrashing_delay_max_ns);
        self.thrashing_delay_min_ns = PeakNs(ds.thrashing_delay_min_ns);
        self.compact_delay_count = MonotonicCount(ds.compact_count);
        self.compact_delay_total_ns = MonotonicNs(ds.compact_delay_total_ns);
        self.compact_delay_max_ns = PeakNs(ds.compact_delay_max_ns);
        self.compact_delay_min_ns = PeakNs(ds.compact_delay_min_ns);
        self.wpcopy_delay_count = MonotonicCount(ds.wpcopy_count);
        self.wpcopy_delay_total_ns = MonotonicNs(ds.wpcopy_delay_total_ns);
        self.wpcopy_delay_max_ns = PeakNs(ds.wpcopy_delay_max_ns);
        self.wpcopy_delay_min_ns = PeakNs(ds.wpcopy_delay_min_ns);
        self.irq_delay_count = MonotonicCount(ds.irq_count);
        self.irq_delay_total_ns = MonotonicNs(ds.irq_delay_total_ns);
        self.irq_delay_max_ns = PeakNs(ds.irq_delay_max_ns);
        self.irq_delay_min_ns = PeakNs(ds.irq_delay_min_ns);
        self.hiwater_rss_bytes = PeakBytes(ds.hiwater_rss_bytes);
        self.hiwater_vm_bytes = PeakBytes(ds.hiwater_vm_bytes);
    }
    /// The `smaps_rollup` entries converted from kB to bytes (saturating on
    /// overflow), without cloning the keys.
    pub fn smaps_rollup_bytes(
        &self,
    ) -> impl Iterator<Item = (&String, crate::metric_types::Bytes)> {
        self.smaps_rollup_kb
            .iter()
            .map(|(k, v)| (k, crate::metric_types::Bytes(v.saturating_mul(1024))))
    }
}
/// Stats for one cgroup: cpu / memory / pids controllers plus PSI.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct CgroupStats {
    pub cpu: CgroupCpuStats,
    pub memory: CgroupMemoryStats,
    pub pids: CgroupPidsStats,
    pub psi: Psi,
}
/// cgroup v2 cpu controller stats. `None` for limit fields presumably means
/// "max"/unset — TODO confirm against the parser.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct CgroupCpuStats {
    pub usage_usec: u64,
    pub nr_throttled: u64,
    pub throttled_usec: u64,
    pub max_quota_us: Option<u64>,
    pub max_period_us: u64,
    pub weight: Option<u64>,
    pub weight_nice: Option<i32>,
}
/// cgroup v2 memory controller stats; `stat`/`events` hold the raw key→value
/// pairs from `memory.stat` / `memory.events`-style files.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct CgroupMemoryStats {
    pub current: u64,
    pub max: Option<u64>,
    pub high: Option<u64>,
    pub low: Option<u64>,
    pub min: Option<u64>,
    pub stat: BTreeMap<String, u64>,
    pub events: BTreeMap<String, u64>,
}
/// cgroup v2 pids controller stats (`pids.current` / `pids.max`).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct CgroupPidsStats {
    pub current: Option<u64>,
    pub max: Option<u64>,
}
/// One PSI line ("some" or "full"). The `avg*` fields store hundredths of a
/// percent as integers (see the `avg*_percent` accessors).
#[derive(Debug, Clone, Copy, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct PsiHalf {
    pub avg10: u16,
    pub avg60: u16,
    pub avg300: u16,
    pub total_usec: u64,
}
impl PsiHalf {
    /// 10-second average as a percentage (stored as hundredths of a percent).
    pub fn avg10_percent(&self) -> f64 {
        f64::from(self.avg10) / 100.0
    }
    /// 60-second average as a percentage.
    pub fn avg60_percent(&self) -> f64 {
        f64::from(self.avg60) / 100.0
    }
    /// 300-second average as a percentage.
    pub fn avg300_percent(&self) -> f64 {
        f64::from(self.avg300) / 100.0
    }
}
/// PSI for one resource: the "some" and "full" lines.
#[derive(Debug, Clone, Copy, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct PsiResource {
    pub some: PsiHalf,
    pub full: PsiHalf,
}
/// Pressure-stall information across all tracked resources.
#[derive(Debug, Clone, Copy, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct Psi {
    pub cpu: PsiResource,
    pub memory: PsiResource,
    pub io: PsiResource,
    pub irq: PsiResource,
}
/// sched_ext state read from sysfs.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct SchedExtSysfs {
    pub state: String,
    pub switch_all: u64,
    pub nr_rejected: u64,
    pub hotplug_seq: u64,
    pub enable_seq: u64,
}
mod parse;
use parse::*;
/// Test-only convenience wrapper around [`capture_thread_at_with_tally`]
/// that does not record parse failures.
#[cfg(test)]
fn capture_thread_at(
    proc_root: &Path,
    tgid: i32,
    tid: i32,
    pcomm: &str,
    comm: &str,
    use_syscall_affinity: bool,
) -> ThreadState {
    capture_thread_at_with_tally(
        proc_root,
        tgid,
        tid,
        pcomm,
        comm,
        use_syscall_affinity,
        &mut None,
    )
}
/// Builds a [`ThreadState`] for one tid by reading its `/proc` files,
/// recording read failures into `tally` when one is supplied.
///
/// Every source degrades to a zero default instead of erroring. Fields fed
/// by the jemalloc probe and taskstats are left zeroed here; callers fill
/// them in afterwards.
fn capture_thread_at_with_tally(
    proc_root: &Path,
    tgid: i32,
    tid: i32,
    pcomm: &str,
    comm: &str,
    use_syscall_affinity: bool,
    tally: &mut Option<&mut ParseTally>,
) -> ThreadState {
    let cgroup = read_cgroup_at_with_tally(proc_root, tgid, tid, tally).unwrap_or_default();
    let stat = read_stat_at_with_tally(proc_root, tgid, tid, tally);
    let (run_time_ns, wait_time_ns, timeslices) =
        read_schedstat_at_with_tally(proc_root, tgid, tid, tally);
    let io = read_io_at_with_tally(proc_root, tgid, tid, tally);
    let status = read_status_at_with_tally(proc_root, tgid, tid, tally);
    let sched = read_sched_at_with_tally(proc_root, tgid, tid, tally);
    let smaps_rollup_kb = read_smaps_rollup_at_with_tally(proc_root, tgid, tid, tally);
    // Prefer the live affinity query (presumably sched_getaffinity — confirm
    // in cpu_util) and fall back to the Cpus_allowed mask from /proc status.
    let cpu_affinity = if use_syscall_affinity {
        crate::cpu_util::read_affinity(tid)
            .or(status.cpus_allowed)
            .unwrap_or_default()
    } else {
        status.cpus_allowed.unwrap_or_default()
    };
    use crate::metric_types::{
        Bytes, CategoricalString, ClockTicks, CpuSet, GaugeCount, GaugeNs, MonotonicCount,
        MonotonicNs, OrdinalI32, OrdinalU32, PeakNs,
    };
    ThreadState {
        tid: tid as u32,
        tgid: tgid as u32,
        pcomm: pcomm.to_string(),
        comm: comm.to_string(),
        cgroup,
        start_time_clock_ticks: stat.start_time_clock_ticks.unwrap_or(0),
        policy: CategoricalString(stat.policy.map(policy_name).unwrap_or_default()),
        nice: OrdinalI32(stat.nice.unwrap_or(0)),
        cpu_affinity: CpuSet(cpu_affinity),
        processor: OrdinalI32(stat.processor.unwrap_or(0)),
        state: status.state.unwrap_or_else(default_state_char),
        ext_enabled: sched.ext_enabled.unwrap_or(false),
        run_time_ns: MonotonicNs(run_time_ns.unwrap_or(0)),
        wait_time_ns: MonotonicNs(wait_time_ns.unwrap_or(0)),
        timeslices: MonotonicCount(timeslices.unwrap_or(0)),
        voluntary_csw: MonotonicCount(status.voluntary_csw.unwrap_or(0)),
        nonvoluntary_csw: MonotonicCount(status.nonvoluntary_csw.unwrap_or(0)),
        nr_wakeups: MonotonicCount(sched.nr_wakeups.unwrap_or(0)),
        nr_wakeups_local: MonotonicCount(sched.nr_wakeups_local.unwrap_or(0)),
        nr_wakeups_remote: MonotonicCount(sched.nr_wakeups_remote.unwrap_or(0)),
        nr_wakeups_sync: MonotonicCount(sched.nr_wakeups_sync.unwrap_or(0)),
        nr_wakeups_migrate: MonotonicCount(sched.nr_wakeups_migrate.unwrap_or(0)),
        nr_wakeups_affine: MonotonicCount(sched.nr_wakeups_affine.unwrap_or(0)),
        nr_wakeups_affine_attempts: MonotonicCount(sched.nr_wakeups_affine_attempts.unwrap_or(0)),
        nr_migrations: MonotonicCount(sched.nr_migrations.unwrap_or(0)),
        nr_forced_migrations: MonotonicCount(sched.nr_forced_migrations.unwrap_or(0)),
        nr_failed_migrations_affine: MonotonicCount(sched.nr_failed_migrations_affine.unwrap_or(0)),
        nr_failed_migrations_running: MonotonicCount(
            sched.nr_failed_migrations_running.unwrap_or(0),
        ),
        nr_failed_migrations_hot: MonotonicCount(sched.nr_failed_migrations_hot.unwrap_or(0)),
        wait_sum: MonotonicNs(sched.wait_sum.unwrap_or(0)),
        wait_count: MonotonicCount(sched.wait_count.unwrap_or(0)),
        wait_max: PeakNs(sched.wait_max.unwrap_or(0)),
        // Voluntary sleep is sleep minus block time; requires both values,
        // otherwise reported as 0.
        voluntary_sleep_ns: MonotonicNs(match (sched.sleep_sum, sched.block_sum) {
            (Some(sleep), Some(block)) => sleep.saturating_sub(block),
            _ => 0,
        }),
        sleep_max: PeakNs(sched.sleep_max.unwrap_or(0)),
        block_sum: MonotonicNs(sched.block_sum.unwrap_or(0)),
        block_max: PeakNs(sched.block_max.unwrap_or(0)),
        iowait_sum: MonotonicNs(sched.iowait_sum.unwrap_or(0)),
        iowait_count: MonotonicCount(sched.iowait_count.unwrap_or(0)),
        exec_max: PeakNs(sched.exec_max.unwrap_or(0)),
        slice_max: PeakNs(sched.slice_max.unwrap_or(0)),
        // Filled in later from the jemalloc probe, when one attached.
        allocated_bytes: Bytes(0),
        deallocated_bytes: Bytes(0),
        minflt: MonotonicCount(stat.minflt.unwrap_or(0)),
        majflt: MonotonicCount(stat.majflt.unwrap_or(0)),
        utime_clock_ticks: ClockTicks(stat.utime_clock_ticks.unwrap_or(0)),
        stime_clock_ticks: ClockTicks(stat.stime_clock_ticks.unwrap_or(0)),
        priority: OrdinalI32(stat.priority.unwrap_or(0)),
        rt_priority: OrdinalU32(stat.rt_priority.unwrap_or(0)),
        core_forceidle_sum: MonotonicNs(sched.core_forceidle_sum.unwrap_or(0)),
        fair_slice_ns: GaugeNs(sched.fair_slice_ns.unwrap_or(0)),
        // Only the group leader carries the thread count; others report 0.
        nr_threads: GaugeCount(if tid == tgid {
            status.nr_threads.unwrap_or(0)
        } else {
            0
        }),
        smaps_rollup_kb,
        rchar: Bytes(io.rchar.unwrap_or(0)),
        wchar: Bytes(io.wchar.unwrap_or(0)),
        syscr: MonotonicCount(io.syscr.unwrap_or(0)),
        syscw: MonotonicCount(io.syscw.unwrap_or(0)),
        read_bytes: Bytes(io.read_bytes.unwrap_or(0)),
        write_bytes: Bytes(io.write_bytes.unwrap_or(0)),
        cancelled_write_bytes: Bytes(io.cancelled_write_bytes.unwrap_or(0)),
        // Delay-accounting and hiwater fields start zeroed; populated via
        // `apply_delay_stats` when a taskstats query succeeds.
        cpu_delay_count: MonotonicCount(0),
        cpu_delay_total_ns: MonotonicNs(0),
        cpu_delay_max_ns: PeakNs(0),
        cpu_delay_min_ns: PeakNs(0),
        blkio_delay_count: MonotonicCount(0),
        blkio_delay_total_ns: MonotonicNs(0),
        blkio_delay_max_ns: PeakNs(0),
        blkio_delay_min_ns: PeakNs(0),
        swapin_delay_count: MonotonicCount(0),
        swapin_delay_total_ns: MonotonicNs(0),
        swapin_delay_max_ns: PeakNs(0),
        swapin_delay_min_ns: PeakNs(0),
        freepages_delay_count: MonotonicCount(0),
        freepages_delay_total_ns: MonotonicNs(0),
        freepages_delay_max_ns: PeakNs(0),
        freepages_delay_min_ns: PeakNs(0),
        thrashing_delay_count: MonotonicCount(0),
        thrashing_delay_total_ns: MonotonicNs(0),
        thrashing_delay_max_ns: PeakNs(0),
        thrashing_delay_min_ns: PeakNs(0),
        compact_delay_count: MonotonicCount(0),
        compact_delay_total_ns: MonotonicNs(0),
        compact_delay_max_ns: PeakNs(0),
        compact_delay_min_ns: PeakNs(0),
        wpcopy_delay_count: MonotonicCount(0),
        wpcopy_delay_total_ns: MonotonicNs(0),
        wpcopy_delay_max_ns: PeakNs(0),
        wpcopy_delay_min_ns: PeakNs(0),
        irq_delay_count: MonotonicCount(0),
        irq_delay_total_ns: MonotonicNs(0),
        irq_delay_max_ns: PeakNs(0),
        irq_delay_min_ns: PeakNs(0),
        hiwater_rss_bytes: crate::metric_types::PeakBytes(0),
        hiwater_vm_bytes: crate::metric_types::PeakBytes(0),
    }
}
/// Test-only capture of a live thread from the real `/proc`, resolving the
/// thread comm itself and using the syscall affinity path.
#[cfg(test)]
fn capture_thread(tgid: i32, tid: i32, pcomm: &str) -> ThreadState {
    let proc_root = Path::new(DEFAULT_PROC_ROOT);
    let comm = read_thread_comm_at(proc_root, tgid, tid).unwrap_or_default();
    capture_thread_at(proc_root, tgid, tid, pcomm, &comm, true)
}
/// Internal tally of jemalloc attach/probe outcomes, keyed by `&'static`
/// error tags; converted to the public [`CtprofProbeSummary`] via
/// `to_public`.
#[derive(Debug, Default)]
struct ProbeSummary {
    // Thread groups examined (cache hits and panicked workers included).
    tgids_walked: u64,
    // Thread groups where jemalloc was detected and attach succeeded.
    jemalloc_detected: u64,
    // Individual thread probes that succeeded.
    probed_ok: u64,
    // Attach + probe failures, excluding benign "nothing to probe" tags.
    failed: u64,
    attach_tag_counts: BTreeMap<&'static str, u64>,
    probe_tag_counts: BTreeMap<&'static str, u64>,
}
impl ProbeSummary {
    /// Failure tag with the highest count across attach and probe failures.
    /// Attach tags that merely mean "nothing to probe" are excluded. Count
    /// ties break toward the lexicographically smallest tag, so the result
    /// is deterministic.
    fn dominant_tag(&self) -> Option<&'static str> {
        self.attach_tag_counts
            .iter()
            .filter(|(t, _)| !matches!(**t, "jemalloc-not-found" | "readlink-failure"))
            .chain(self.probe_tag_counts.iter())
            .max_by(|a, b| a.1.cmp(b.1).then_with(|| b.0.cmp(a.0)))
            .map(|(tag, _)| *tag)
    }
    /// True when ptrace attach/interrupt errors account for at least half of
    /// all failures — i.e. the run was most likely blocked by missing
    /// privileges rather than real probe errors.
    fn ptrace_dominates(&self) -> bool {
        let total_ptrace: u64 = self
            .probe_tag_counts
            .iter()
            .filter(|(t, _)| matches!(**t, "ptrace-seize" | "ptrace-interrupt"))
            .map(|(_, n)| *n)
            .sum();
        // saturating_mul: `* 2` would panic on overflow in debug builds for
        // pathological counter values; saturate instead.
        self.failed > 0 && total_ptrace.saturating_mul(2) >= self.failed
    }
    /// Snapshot into the public, string-keyed summary type.
    fn to_public(&self) -> CtprofProbeSummary {
        CtprofProbeSummary {
            tgids_walked: self.tgids_walked,
            jemalloc_detected: self.jemalloc_detected,
            probed_ok: self.probed_ok,
            failed: self.failed,
            dominant_failure: self.dominant_tag().map(|t| t.to_string()),
            privilege_dominant: self.ptrace_dominates(),
        }
    }
}
/// Internal tally of `/proc` read failures during the thread walk.
///
/// Failures are staged in the `pending_*` fields and only committed once the
/// tid is confirmed to still exist (the capture loop discards them for tids
/// that exited mid-walk). Converted to [`CtprofParseSummary`] via
/// `to_public`.
#[derive(Debug, Default)]
struct ParseTally {
    tids_walked: u64,
    // Committed failures, keyed by file kind (e.g. "schedstat", "io").
    failures_by_file: BTreeMap<&'static str, u64>,
    // Failures staged for the tid currently being walked.
    pending_failures: Vec<&'static str>,
    // Committed count of dotted numeric values that parsed as negative.
    negative_dotted_values: u64,
    pending_negative_dotted: u64,
}
impl ParseTally {
    /// Stage a read failure for `file_kind`; it only counts once the tid is
    /// confirmed live via `commit_pending`.
    fn record_failure(&mut self, file_kind: &'static str) {
        self.pending_failures.push(file_kind);
    }
    /// Stage one negative dotted-value observation (committed like failures).
    fn record_negative_dotted(&mut self) {
        self.pending_negative_dotted = self.pending_negative_dotted.saturating_add(1);
    }
    /// Fold every staged failure/observation into the permanent tallies.
    fn commit_pending(&mut self) {
        for kind in self.pending_failures.drain(..) {
            // Saturating add for consistency with the other counters here;
            // a plain `+= 1` would panic on overflow in debug builds.
            let count = self.failures_by_file.entry(kind).or_insert(0);
            *count = count.saturating_add(1);
        }
        self.negative_dotted_values = self
            .negative_dotted_values
            .saturating_add(self.pending_negative_dotted);
        self.pending_negative_dotted = 0;
    }
    /// Drop staged entries (used when the tid turned out to have exited).
    fn discard_pending(&mut self) {
        self.pending_failures.clear();
        self.pending_negative_dotted = 0;
    }
    fn total_failures(&self) -> u64 {
        self.failures_by_file.values().sum()
    }
    /// File kind with the most failures; count ties break toward the
    /// lexicographically smallest name, so the result is deterministic.
    fn dominant_file(&self) -> Option<&'static str> {
        self.failures_by_file
            .iter()
            .max_by(|a, b| a.1.cmp(b.1).then_with(|| b.0.cmp(a.0)))
            .map(|(tag, _)| *tag)
    }
    /// True when schedstat/io failures make up at least half of all read
    /// failures — the signature of schedstats / task IO accounting being
    /// compiled out of the kernel (see `PARSE_KCONFIG_HINT`).
    fn kernel_config_dominates(&self) -> bool {
        let total = self.total_failures();
        if total == 0 {
            return false;
        }
        let kconfig: u64 = self
            .failures_by_file
            .iter()
            .filter(|(t, _)| matches!(**t, "schedstat" | "io"))
            .map(|(_, n)| *n)
            .sum();
        // saturating_mul: `* 2` would panic on overflow in debug builds for
        // pathological counts; saturate instead.
        kconfig.saturating_mul(2) >= total
    }
    /// Snapshot into the public, string-keyed summary type.
    fn to_public(&self) -> CtprofParseSummary {
        CtprofParseSummary {
            tids_walked: self.tids_walked,
            read_failures: self.total_failures(),
            read_failures_by_file: self
                .failures_by_file
                .iter()
                .map(|(k, v)| ((*k).to_string(), *v))
                .collect(),
            dominant_read_failure: self.dominant_file().map(|t| t.to_string()),
            kernel_config_dominant: self.kernel_config_dominates(),
            negative_dotted_values: self.negative_dotted_values,
        }
    }
}
/// Remediation shown when ptrace permission errors dominate probe failures.
const PTRACE_EPERM_HINT: &str = "hint: re-run as root, or sudo setcap cap_sys_ptrace+eip $(which ktstr), or set kernel.yama.ptrace_scope=0";
/// Result of one jemalloc attach attempt on a thread group.
struct AttachOutcome {
    // Process comm, captured alongside the attempt for logging.
    pcomm: String,
    result: std::result::Result<
        crate::host_thread_probe::JemallocProbe,
        crate::host_thread_probe::AttachError,
    >,
}
/// Attach outcome cached per executable `(dev, inode)` so identical binaries
/// are only attach-probed once per capture.
#[derive(Clone)]
struct CachedAttachResult {
    // `Some` when attach succeeded and a probe is available.
    probe: Option<crate::host_thread_probe::JemallocProbe>,
    // Error tag when the attach failed or was skipped; `None` on success.
    failed_tag: Option<&'static str>,
}
/// Reads the process comm and attempts to attach the jemalloc probe to
/// `tgid`, bundling both into an [`AttachOutcome`].
///
/// Under `cfg(test)` a panic can be injected for a specific tgid (see
/// `PANIC_INJECT_TGID`) to exercise the worker-panic recovery path.
fn attach_probe_for_tgid_at(proc_root: &Path, tgid: i32) -> AttachOutcome {
    #[cfg(test)]
    {
        let injected = PANIC_INJECT_TGID.load(std::sync::atomic::Ordering::Acquire);
        if injected != 0 && injected == tgid {
            if PANIC_INJECT_NON_STRING.load(std::sync::atomic::Ordering::Acquire) {
                // Non-string payload: exercises the `<non-string panic
                // payload>` branch in the panic handler.
                std::panic::panic_any(0xDEADBEEFu64);
            }
            panic!("test: injected attach worker panic for tgid {tgid}");
        }
    }
    let pcomm = read_process_comm_at(proc_root, tgid).unwrap_or_default();
    let result = crate::host_thread_probe::attach_jemalloc_at(proc_root, tgid);
    AttachOutcome { pcomm, result }
}
/// Test hook: tgid for which `attach_probe_for_tgid_at` panics (0 = off).
#[cfg(test)]
static PANIC_INJECT_TGID: std::sync::atomic::AtomicI32 = std::sync::atomic::AtomicI32::new(0);
/// Test hook: when true, the injected panic carries a non-string payload.
#[cfg(test)]
static PANIC_INJECT_NON_STRING: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);
/// Folds one attach outcome into `summary`, logs it, and packages the result
/// for the per-executable cache.
///
/// "jemalloc-not-found" / "readlink-failure" are treated as benign skips:
/// they are tallied by tag but do not count toward `failed`.
fn record_attach_outcome(
    tgid: i32,
    outcome: AttachOutcome,
    summary: &mut ProbeSummary,
) -> CachedAttachResult {
    summary.tgids_walked += 1;
    let pcomm = outcome.pcomm;
    match outcome.result {
        Ok(probe) => {
            summary.jemalloc_detected += 1;
            tracing::debug!(tgid, %pcomm, "ctprof probe: jemalloc detected");
            CachedAttachResult {
                probe: Some(probe),
                failed_tag: None,
            }
        }
        Err(err) => {
            let tag = err.tag();
            *summary.attach_tag_counts.entry(tag).or_default() += 1;
            let benign = matches!(tag, "jemalloc-not-found" | "readlink-failure");
            if benign {
                tracing::debug!(tgid, %pcomm, tag, err = %err, "ctprof probe: attach skipped");
            } else {
                summary.failed += 1;
                tracing::warn!(tgid, %pcomm, tag, err = %err, "ctprof probe: attach failed");
            }
            CachedAttachResult {
                probe: None,
                failed_tag: Some(tag),
            }
        }
    }
}
/// Attach-then-record convenience: attempts a jemalloc attach on `tgid`,
/// tallies the outcome into `summary`, and returns the probe on success.
fn try_attach_probe_for_tgid_at(
    proc_root: &Path,
    tgid: i32,
    summary: &mut ProbeSummary,
) -> Option<crate::host_thread_probe::JemallocProbe> {
    record_attach_outcome(tgid, attach_probe_for_tgid_at(proc_root, tgid), summary).probe
}
/// Probes one thread's jemalloc counters and tallies the outcome.
///
/// Returns `(allocated_bytes, deallocated_bytes)`, or `(0, 0)` on failure.
/// Failures are logged at most once per tgid (via `failed_tgids_logged`) to
/// keep a noisy process from flooding the log.
fn probe_thread_recording(
    probe: &crate::host_thread_probe::JemallocProbe,
    tid: i32,
    tgid: i32,
    pcomm: &str,
    comm: &str,
    summary: &mut ProbeSummary,
    failed_tgids_logged: &mut std::collections::BTreeSet<i32>,
) -> (u64, u64) {
    let counters = match crate::host_thread_probe::probe_thread(probe, tid) {
        Ok(c) => c,
        Err(err) => {
            let tag = err.tag();
            *summary.probe_tag_counts.entry(tag).or_default() += 1;
            summary.failed += 1;
            if failed_tgids_logged.insert(tgid) {
                tracing::warn!(
                    tgid,
                    tid,
                    %pcomm,
                    %comm,
                    tag,
                    err = %err,
                    "ctprof probe: probe_thread failed",
                );
            }
            return (0, 0);
        }
    };
    summary.probed_ok += 1;
    (counters.allocated_bytes, counters.deallocated_bytes)
}
/// Logs a one-line summary of `/proc` parse results, appending the dominant
/// failing file kind and a kernel-config hint when one applies.
fn emit_parse_summary(tally: &ParseTally) {
    let tids_walked = tally.tids_walked;
    let read_failures = tally.total_failures();
    let dominant_clause = match tally.dominant_file() {
        Some(tag) => format!(" (dominant: {tag})"),
        None => String::new(),
    };
    let mut kconfig_clause = String::new();
    if tally.kernel_config_dominates() {
        kconfig_clause = format!("; {PARSE_KCONFIG_HINT}");
    }
    let negative_dotted = tally.negative_dotted_values;
    let mut negative_clause = String::new();
    if negative_dotted > 0 {
        negative_clause = format!(", {negative_dotted} negative-dotted values");
    }
    tracing::info!(
        "ctprof parse: {tids_walked} tids walked, \
        {read_failures} read failures{negative_clause}\
        {dominant_clause}{kconfig_clause}",
    );
}
/// Logs a one-line summary of jemalloc probe outcomes. When failures
/// occurred the dominant failure tag is appended, plus the ptrace privilege
/// hint if permission errors dominate.
fn emit_probe_summary(summary: &ProbeSummary) {
    let tgids_walked = summary.tgids_walked;
    let jemalloc_detected = summary.jemalloc_detected;
    let probed_ok = summary.probed_ok;
    let failed = summary.failed;
    if failed > 0 {
        // "?" should be unreachable: failed > 0 implies at least one tag.
        let dominant = summary.dominant_tag().unwrap_or("?");
        if summary.ptrace_dominates() {
            tracing::info!(
                "ctprof probe: {tgids_walked} tgids walked, \
                {jemalloc_detected} jemalloc detected, \
                {probed_ok} probed OK, {failed} failed \
                (dominant: {dominant}; {})",
                PTRACE_EPERM_HINT,
            );
        } else {
            tracing::info!(
                "ctprof probe: {tgids_walked} tgids walked, \
                {jemalloc_detected} jemalloc detected, \
                {probed_ok} probed OK, {failed} failed \
                (dominant: {dominant})",
            );
        }
    } else {
        tracing::info!(
            "ctprof probe: {tgids_walked} tgids walked, \
            {jemalloc_detected} jemalloc detected, \
            {probed_ok} probed OK, {failed} failed",
        );
    }
}
fn capture_with(
proc_root: &Path,
cgroup_root: &Path,
sys_root: &Path,
use_syscall_affinity: bool,
) -> CtprofSnapshot {
let captured_at_unix_ns = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
let host = if use_syscall_affinity {
Some(crate::host_context::collect_host_context())
} else {
None
};
let self_pid = std::process::id() as i32;
let mut threads: Vec<ThreadState> = Vec::new();
let mut failed_tgids_logged: std::collections::BTreeSet<i32> =
std::collections::BTreeSet::new();
let tgids = iter_tgids_at(proc_root);
let probe_cache: std::sync::Mutex<std::collections::HashMap<(u64, u64), CachedAttachResult>> =
std::sync::Mutex::new(std::collections::HashMap::new());
let summary_mutex = std::sync::Mutex::new(ProbeSummary::default());
let probe_map: std::collections::HashMap<i32, Option<crate::host_thread_probe::JemallocProbe>> =
if use_syscall_affinity {
use rayon::prelude::*;
let max_threads = {
let num_cpus = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
let load = std::fs::read_to_string(proc_root.join("loadavg"))
.ok()
.and_then(|s| s.split_whitespace().next()?.parse::<f64>().ok())
.unwrap_or(0.0);
let headroom = (num_cpus as f64 - load).max(1.0) as usize;
headroom.clamp(1, num_cpus / 2 + 1)
};
let pool_result = rayon::ThreadPoolBuilder::new()
.num_threads(max_threads)
.build();
let work = || {
tgids
.par_iter()
.copied()
.filter(|&tgid| tgid != self_pid)
.map(|tgid| {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let cache_key =
std::fs::metadata(proc_root.join(tgid.to_string()).join("exe"))
.ok()
.map(|m| {
use std::os::unix::fs::MetadataExt;
(m.dev(), m.ino())
});
if let Some(key) = cache_key {
let cached = probe_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.get(&key)
.cloned();
if let Some(cached_result) = cached {
let mut s =
summary_mutex.lock().unwrap_or_else(|e| e.into_inner());
s.tgids_walked += 1;
match &cached_result.failed_tag {
None => {
s.jemalloc_detected += 1;
tracing::debug!(
tgid,
"ctprof probe: cache hit (jemalloc)"
);
}
Some(tag) => {
*s.attach_tag_counts.entry(tag).or_insert(0) += 1;
if !matches!(
*tag,
"jemalloc-not-found" | "readlink-failure"
) {
s.failed += 1;
}
tracing::debug!(
tgid,
tag,
"ctprof probe: cache hit (prior failure)"
);
}
}
cached_result.probe
} else {
let outcome = attach_probe_for_tgid_at(proc_root, tgid);
let mut s =
summary_mutex.lock().unwrap_or_else(|e| e.into_inner());
let res = record_attach_outcome(tgid, outcome, &mut s);
drop(s);
let probe = res.probe.clone();
probe_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.insert(key, res);
probe
}
} else {
let outcome = attach_probe_for_tgid_at(proc_root, tgid);
let mut s = summary_mutex.lock().unwrap_or_else(|e| e.into_inner());
record_attach_outcome(tgid, outcome, &mut s).probe
}
}));
let probe = match result {
Ok(p) => p,
Err(panic_payload) => {
let panic_msg = panic_payload
.downcast_ref::<&str>()
.copied()
.or_else(|| {
panic_payload.downcast_ref::<String>().map(|s| s.as_str())
})
.unwrap_or("<non-string panic payload>");
let mut s = summary_mutex.lock().unwrap_or_else(|e| e.into_inner());
s.tgids_walked += 1;
*s.attach_tag_counts.entry("worker-panic").or_insert(0) += 1;
s.failed += 1;
tracing::error!(
tgid,
panic_msg,
"ctprof probe: attach worker panicked; tgid skipped",
);
None
}
};
(tgid, probe)
})
.collect()
};
match pool_result {
Ok(pool) => pool.install(work),
Err(e) => {
tracing::warn!(
error = %e,
max_threads,
"rayon ThreadPoolBuilder failed; falling back to global pool"
);
work()
}
}
} else {
std::collections::HashMap::new()
};
let mut summary = summary_mutex
.into_inner()
.unwrap_or_else(|e| e.into_inner());
let mut parse_tally = ParseTally::default();
let mut tally_opt: Option<&mut ParseTally> = if use_syscall_affinity {
Some(&mut parse_tally)
} else {
None
};
let taskstats_client = if use_syscall_affinity {
match crate::taskstats::TaskstatsClient::open() {
Ok(c) => Some(c),
Err(e) => {
tracing::warn!(
error = %e,
"ctprof taskstats: open failed; delay-accounting and memory-watermark \
fields will be zero. Ensure the kernel was built with CONFIG_TASKSTATS \
(plus CONFIG_TASK_DELAY_ACCT for delay fields and CONFIG_TASK_XACCT for \
hiwater fields), the process holds CAP_NET_ADMIN, and the kernel was \
booted with `delayacct=on` (or sysctl `kernel.task_delayacct=1`)"
);
None
}
}
} else {
None
};
let mut taskstats_tally: Option<crate::taskstats::TaskstatsSummary> = if use_syscall_affinity {
Some(crate::taskstats::TaskstatsSummary::default())
} else {
None
};
for tgid in &tgids {
let tgid = *tgid;
let pcomm = read_process_comm_at(proc_root, tgid).unwrap_or_default();
let probe: Option<&crate::host_thread_probe::JemallocProbe> = probe_map
.get(&tgid)
.and_then(|p: &Option<crate::host_thread_probe::JemallocProbe>| p.as_ref());
for tid in iter_task_ids_at(proc_root, tgid) {
if let Some(t) = tally_opt.as_mut() {
t.tids_walked += 1;
}
let comm = read_thread_comm_at(proc_root, tgid, tid).unwrap_or_default();
let (allocated_bytes, deallocated_bytes) = probe
.map(|p| {
probe_thread_recording(
p,
tid,
tgid,
&pcomm,
&comm,
&mut summary,
&mut failed_tgids_logged,
)
})
.unwrap_or((0, 0));
let mut t = capture_thread_at_with_tally(
proc_root,
tgid,
tid,
&pcomm,
&comm,
use_syscall_affinity,
&mut tally_opt,
);
t.allocated_bytes = crate::metric_types::Bytes(allocated_bytes);
t.deallocated_bytes = crate::metric_types::Bytes(deallocated_bytes);
if let Some(client) = taskstats_client.as_ref() {
let result = client.query_tid(tid as u32);
if let Some(tally) = taskstats_tally.as_mut() {
tally.record_result(&result);
}
if let Ok(ds) = result {
t.apply_delay_stats(&ds);
}
}
if t.comm.is_empty() && t.start_time_clock_ticks == 0 {
if let Some(t) = tally_opt.as_mut() {
t.discard_pending();
}
continue;
}
if let Some(t) = tally_opt.as_mut() {
t.commit_pending();
}
threads.push(t);
}
}
let probe_summary = if use_syscall_affinity {
emit_probe_summary(&summary);
Some(summary.to_public())
} else {
None
};
let parse_summary = if use_syscall_affinity {
emit_parse_summary(&parse_tally);
Some(parse_tally.to_public())
} else {
None
};
let mut cgroup_stats: BTreeMap<String, CgroupStats> = BTreeMap::new();
for t in &threads {
if !t.cgroup.is_empty() && !cgroup_stats.contains_key(&t.cgroup) {
cgroup_stats.insert(
t.cgroup.clone(),
read_cgroup_stats_at(cgroup_root, &t.cgroup),
);
}
}
let psi = read_host_psi_at(proc_root);
let sched_ext = read_sched_ext_sysfs_at(sys_root);
CtprofSnapshot {
captured_at_unix_ns,
host,
threads,
cgroup_stats,
probe_summary,
parse_summary,
taskstats_summary: taskstats_tally,
psi,
sched_ext,
}
}
/// Capture a host-wide snapshot using the default /proc, cgroup, and
/// sysfs roots, with the full (`use_syscall_affinity = true`) collection
/// path enabled.
pub fn capture() -> CtprofSnapshot {
    let proc_root = Path::new(DEFAULT_PROC_ROOT);
    let cgroup_root = Path::new(DEFAULT_CGROUP_ROOT);
    let sys_root = Path::new(DEFAULT_SYS_ROOT);
    capture_with(proc_root, cgroup_root, sys_root, true)
}
/// Capture a snapshot restricted to the threads of `pid`, using the
/// default /proc, cgroup, and sysfs roots, with the full
/// (`use_syscall_affinity = true`) collection path enabled.
pub fn capture_pid(pid: i32) -> CtprofSnapshot {
    let (proc_root, cgroup_root, sys_root) = (
        Path::new(DEFAULT_PROC_ROOT),
        Path::new(DEFAULT_CGROUP_ROOT),
        Path::new(DEFAULT_SYS_ROOT),
    );
    capture_pid_with(proc_root, cgroup_root, sys_root, pid, true)
}
/// Capture a [`CtprofSnapshot`] restricted to a single process: only the
/// threads of `pid` are walked, but host-level data (PSI, sched_ext
/// sysfs, and cgroup stats for the cgroups those threads occupy) is
/// still collected.
///
/// `proc_root` / `cgroup_root` / `sys_root` let callers (e.g. tests)
/// redirect the /proc, cgroup, and sysfs trees. When
/// `use_syscall_affinity` is false the expensive collection paths (host
/// context, jemalloc probe, taskstats, probe/parse summaries) are
/// skipped and the corresponding snapshot fields stay `None`.
fn capture_pid_with(
    proc_root: &Path,
    cgroup_root: &Path,
    sys_root: &Path,
    pid: i32,
    use_syscall_affinity: bool,
) -> CtprofSnapshot {
    // Wall-clock capture timestamp in ns since the Unix epoch; falls
    // back to 0 if the system clock reads before the epoch.
    let captured_at_unix_ns = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos() as u64)
        .unwrap_or(0);
    let host = if use_syscall_affinity {
        Some(crate::host_context::collect_host_context())
    } else {
        None
    };
    let self_pid = std::process::id() as i32;
    let pcomm = read_process_comm_at(proc_root, pid).unwrap_or_default();
    let mut summary = ProbeSummary::default();
    // Presumably lets probe_thread_recording log each failing tgid at
    // most once — TODO confirm against its implementation.
    let mut failed_tgids_logged: std::collections::BTreeSet<i32> =
        std::collections::BTreeSet::new();
    // Never attach the probe to our own process.
    let probe = if use_syscall_affinity && pid != self_pid {
        try_attach_probe_for_tgid_at(proc_root, pid, &mut summary)
    } else {
        None
    };
    let mut threads: Vec<ThreadState> = Vec::new();
    let mut parse_tally = ParseTally::default();
    // Threaded through as Option<&mut _> so the cheap path records no
    // parse statistics at all.
    let mut tally_opt: Option<&mut ParseTally> = if use_syscall_affinity {
        Some(&mut parse_tally)
    } else {
        None
    };
    // One taskstats client is opened for the whole walk; if open fails,
    // per-thread delay stats are simply never applied.
    let taskstats_client = if use_syscall_affinity {
        match crate::taskstats::TaskstatsClient::open() {
            Ok(c) => Some(c),
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "ctprof taskstats: open failed; delay-accounting and memory-watermark \
                     fields will be zero. Ensure the kernel was built with CONFIG_TASKSTATS \
                     (plus CONFIG_TASK_DELAY_ACCT for delay fields and CONFIG_TASK_XACCT for \
                     hiwater fields), the process holds CAP_NET_ADMIN, and the kernel was \
                     booted with `delayacct=on` (or sysctl `kernel.task_delayacct=1`)"
                );
                None
            }
        }
    } else {
        None
    };
    let mut taskstats_tally: Option<crate::taskstats::TaskstatsSummary> = if use_syscall_affinity {
        Some(crate::taskstats::TaskstatsSummary::default())
    } else {
        None
    };
    for tid in iter_task_ids_at(proc_root, pid) {
        if let Some(t) = tally_opt.as_mut() {
            t.tids_walked += 1;
        }
        let comm = read_thread_comm_at(proc_root, pid, tid).unwrap_or_default();
        // Per-thread allocation counters from the jemalloc probe;
        // (0, 0) when no probe is attached.
        let (allocated_bytes, deallocated_bytes) = probe
            .as_ref()
            .map(|p| {
                probe_thread_recording(
                    p,
                    tid,
                    pid,
                    &pcomm,
                    &comm,
                    &mut summary,
                    &mut failed_tgids_logged,
                )
            })
            .unwrap_or((0, 0));
        let mut t = capture_thread_at_with_tally(
            proc_root,
            pid,
            tid,
            &pcomm,
            &comm,
            use_syscall_affinity,
            &mut tally_opt,
        );
        t.allocated_bytes = crate::metric_types::Bytes(allocated_bytes);
        t.deallocated_bytes = crate::metric_types::Bytes(deallocated_bytes);
        if let Some(client) = taskstats_client.as_ref() {
            // Record the raw result (success or failure) in the tally
            // first, then apply delay stats only on success.
            let result = client.query_tid(tid as u32);
            if let Some(tally) = taskstats_tally.as_mut() {
                tally.record_result(&result);
            }
            if let Ok(ds) = result {
                t.apply_delay_stats(&ds);
            }
        }
        // Empty comm plus a zero start time is treated as an unreadable
        // thread (presumably one that exited mid-walk); its pending
        // parse tallies are dropped and the thread is not emitted.
        if t.comm.is_empty() && t.start_time_clock_ticks == 0 {
            if let Some(t) = tally_opt.as_mut() {
                t.discard_pending();
            }
            continue;
        }
        if let Some(t) = tally_opt.as_mut() {
            t.commit_pending();
        }
        threads.push(t);
    }
    let probe_summary = if use_syscall_affinity {
        emit_probe_summary(&summary);
        Some(summary.to_public())
    } else {
        None
    };
    let parse_summary = if use_syscall_affinity {
        emit_parse_summary(&parse_tally);
        Some(parse_tally.to_public())
    } else {
        None
    };
    // Read stats once per distinct cgroup seen across the walked threads.
    let mut cgroup_stats: BTreeMap<String, CgroupStats> = BTreeMap::new();
    for t in &threads {
        if !t.cgroup.is_empty() && !cgroup_stats.contains_key(&t.cgroup) {
            cgroup_stats.insert(
                t.cgroup.clone(),
                read_cgroup_stats_at(cgroup_root, &t.cgroup),
            );
        }
    }
    let psi = read_host_psi_at(proc_root);
    let sched_ext = read_sched_ext_sysfs_at(sys_root);
    CtprofSnapshot {
        captured_at_unix_ns,
        host,
        threads,
        cgroup_stats,
        probe_summary,
        parse_summary,
        taskstats_summary: taskstats_tally,
        psi,
        sched_ext,
    }
}
/// Capture a host-wide snapshot and write it to `path`.
///
/// # Errors
/// Returns an error (with the destination path attached as context) if
/// writing the snapshot fails.
pub fn capture_to(path: &Path) -> Result<()> {
    capture()
        .write(path)
        .with_context(|| format!("write ctprof snapshot to {}", path.display()))
}
#[cfg(test)]
mod tests_capture;
#[cfg(test)]
mod tests_cgroup;
#[cfg(test)]
mod tests_helpers;
#[cfg(test)]
mod tests_parse;
#[cfg(test)]
mod tests_parse_summary;
#[cfg(test)]
mod tests_probe;
#[cfg(test)]
mod tests_snapshot;
#[cfg(test)]
mod tests_thread_state;