#![cfg(test)]
#![allow(unused_imports)]
use super::super::affinity::*;
use super::super::config::*;
use super::super::types::*;
use super::super::worker::*;
use super::testing::*;
use super::*;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
/// Resolve the thread-group id (Tgid) of `tid` by parsing
/// `/proc/<tid>/status`. Panics if the thread has exited or the file
/// lacks a parseable `Tgid:` line — acceptable in tests where the
/// target thread is known to be alive.
fn read_tgid(tid: libc::pid_t) -> libc::pid_t {
    let status = std::fs::read_to_string(format!("/proc/{tid}/status"))
        .expect("/proc/<tid>/status must be readable for live thread");
    status
        .lines()
        .find_map(|l| l.strip_prefix("Tgid:"))
        .expect("/proc/<tid>/status must include Tgid line")
        .trim()
        .parse()
        .expect("Tgid must be a parseable pid_t")
}
/// Read `/proc/<pid>/comm` for a live task, stripping the trailing
/// newline the kernel appends. Panics if the task is gone.
fn read_comm(pid: libc::pid_t) -> String {
    let path = format!("/proc/{pid}/comm");
    let contents = std::fs::read_to_string(path)
        .expect("/proc/<pid>/comm must be readable for live task");
    contents.trim_end_matches('\n').to_owned()
}
/// Build a SpinWait `WorkSpec` with `workers` threads and the given
/// process-level comm (`pcomm`); shared fixture for the pcomm tests.
fn pcomm_spec(workers: usize, pcomm: &'static str) -> WorkSpec {
    let base = WorkSpec::default().work_type(WorkType::SpinWait);
    base.workers(workers).pcomm(pcomm)
}
/// Enumerate `/proc/<leader_pid>/task` and return every thread id in
/// the leader's thread group as a sorted set. Panics on any read or
/// parse failure — the leader is expected to be alive.
fn read_task_tids(leader_pid: libc::pid_t) -> BTreeSet<libc::pid_t> {
    std::fs::read_dir(format!("/proc/{leader_pid}/task"))
        .expect("/proc/<leader>/task must be readable while leader is alive")
        .map(|entry| {
            let entry = entry.expect("task directory entry must read cleanly");
            entry
                .file_name()
                .to_str()
                .expect("/proc task entry names are ASCII tids")
                .parse()
                .expect("/proc task entry must parse as pid_t")
        })
        .collect()
}
// A pcomm group's container child is a single group leader: every
// worker runs as a thread of that leader (one shared Tgid) and the
// leader's /proc comm equals the requested pcomm.
#[test]
fn pcomm_container_sets_group_leader_comm() {
    let specs = vec![pcomm_spec(2, "leader")];
    let mut handle = WorkloadHandle::spawn_pcomm_cgroup("leader", None, None, &specs)
        .expect("pcomm spawn must succeed");
    handle.start();
    // Give the container time to spin up its worker threads.
    std::thread::sleep(Duration::from_millis(200));
    let pids = handle.worker_pids();
    assert_eq!(
        pids.len(),
        1,
        "pcomm spawn registers a single container child (the leader); \
         got {} entries",
        pids.len(),
    );
    let leader_pid = pids[0];
    // Snapshot /proc state while the group is still running.
    let leader_comm = read_comm(leader_pid);
    let task_tids = read_task_tids(leader_pid);
    let tgids: BTreeSet<libc::pid_t> =
        task_tids.iter().map(|tid| read_tgid(*tid)).collect();
    // Stop first so an assertion failure cannot leak the container.
    let reports = handle.stop_and_collect();
    assert_eq!(
        reports.len(),
        2,
        "pcomm group must produce exactly 2 reports; got {}",
        reports.len(),
    );
    assert_eq!(
        tgids.len(),
        1,
        "all pcomm-group threads must share a single Tgid (the leader); \
         observed Tgids: {tgids:?}",
    );
    assert_eq!(
        *tgids.iter().next().unwrap(),
        leader_pid,
        "shared Tgid must equal the container leader pid",
    );
    assert_eq!(
        leader_comm, "leader",
        "/proc/<leader>/comm must equal pcomm; got {leader_comm:?}",
    );
}
// Per-thread comm overrides: worker threads carry their own comm
// while the group leader keeps the pcomm.
#[test]
fn pcomm_per_thread_comm_distinct_from_group_leader() {
    let spec = WorkSpec::default()
        .work_type(WorkType::SpinWait)
        .workers(2)
        .pcomm("leader")
        .comm("worker");
    let works = vec![spec];
    let mut h = WorkloadHandle::spawn_pcomm_cgroup("leader", None, None, &works)
        .expect("pcomm + comm spawn must succeed");
    h.start();
    // Allow the workers to spawn and rename themselves.
    std::thread::sleep(Duration::from_millis(200));
    let pids = h.worker_pids();
    assert_eq!(pids.len(), 1);
    let leader_pid = pids[0];
    let leader_comm = read_comm(leader_pid);
    // Collect (tid, comm) for every non-leader thread in the group.
    let worker_comms: Vec<(libc::pid_t, String)> = read_task_tids(leader_pid)
        .into_iter()
        .filter(|tid| *tid != leader_pid)
        .map(|tid| (tid, read_comm(tid)))
        .collect();
    // Stop before asserting so failures cannot leak the container.
    let reports = h.stop_and_collect();
    assert_eq!(reports.len(), 2);
    assert_eq!(
        leader_comm, "leader",
        "/proc/<leader>/comm must equal pcomm",
    );
    assert_eq!(
        worker_comms.len(),
        2,
        "expected 2 worker thread tids besides the leader; got {worker_comms:?}",
    );
    for (tid, tcomm) in &worker_comms {
        assert_eq!(
            tcomm, "worker",
            "/proc/<tid>/comm for worker tid={tid} must equal per-thread \
             comm 'worker'; got {tcomm:?}",
        );
    }
}
// Two pcomm groups spawned concurrently must remain fully isolated:
// distinct leader pids, each group's threads collapsing to its own
// Tgid, and each leader carrying its own pcomm.
#[test]
fn multiple_pcomm_groups_have_distinct_containers() {
    let works_a = vec![pcomm_spec(2, "group_a")];
    let works_b = vec![pcomm_spec(2, "group_b")];
    let mut h_a = WorkloadHandle::spawn_pcomm_cgroup("group_a", None, None, &works_a)
        .expect("group_a pcomm spawn must succeed");
    let mut h_b = WorkloadHandle::spawn_pcomm_cgroup("group_b", None, None, &works_b)
        .expect("group_b pcomm spawn must succeed");
    h_a.start();
    h_b.start();
    // Let both containers spin up their worker threads before probing /proc.
    std::thread::sleep(Duration::from_millis(200));
    let pids_a = h_a.worker_pids();
    let pids_b = h_b.worker_pids();
    assert_eq!(pids_a.len(), 1);
    assert_eq!(pids_b.len(), 1);
    let leader_a = pids_a[0];
    let leader_b = pids_b[0];
    // Snapshot comm and per-thread Tgids while both groups are still running.
    let comm_a = read_comm(leader_a);
    let comm_b = read_comm(leader_b);
    let tids_a = read_task_tids(leader_a);
    let tids_b = read_task_tids(leader_b);
    let mut tgids_a: BTreeSet<libc::pid_t> = BTreeSet::new();
    for tid in &tids_a {
        tgids_a.insert(read_tgid(*tid));
    }
    let mut tgids_b: BTreeSet<libc::pid_t> = BTreeSet::new();
    for tid in &tids_b {
        tgids_b.insert(read_tgid(*tid));
    }
    // Stop both groups before asserting so a failed assert cannot leak them.
    let reports_a = h_a.stop_and_collect();
    let reports_b = h_b.stop_and_collect();
    assert_eq!(reports_a.len(), 2, "group_a must produce 2 reports");
    assert_eq!(reports_b.len(), 2, "group_b must produce 2 reports");
    assert_ne!(
        leader_a, leader_b,
        "the two pcomm groups must have distinct leaders; both observed pid={leader_a}",
    );
    assert_eq!(
        tgids_a.len(),
        1,
        "group_a tgids must collapse to one; observed {tgids_a:?}",
    );
    assert_eq!(
        *tgids_a.iter().next().unwrap(),
        leader_a,
        "group_a shared Tgid must equal leader_a",
    );
    assert_eq!(
        tgids_b.len(),
        1,
        "group_b tgids must collapse to one; observed {tgids_b:?}",
    );
    assert_eq!(
        *tgids_b.iter().next().unwrap(),
        leader_b,
        "group_b shared Tgid must equal leader_b",
    );
    assert_eq!(comm_a, "group_a");
    assert_eq!(comm_b, "group_b");
}
// stop_and_collect must return every per-worker report from the
// leader, each completed with nonzero work, and collecting must not
// leak file descriptors in the parent.
#[test]
fn pcomm_stop_and_collect_returns_all_reports() {
    // Count currently open fds in this process (best effort).
    let count_open_fds = || {
        std::fs::read_dir("/proc/self/fd")
            .map(|d| d.count())
            .unwrap_or(0)
    };
    let works = vec![pcomm_spec(4, "multi")];
    let baseline_fds = count_open_fds();
    let mut h = WorkloadHandle::spawn_pcomm_cgroup("multi", None, None, &works)
        .expect("pcomm spawn must succeed");
    h.start();
    std::thread::sleep(Duration::from_millis(200));
    let reports = h.stop_and_collect();
    assert_eq!(
        reports.len(),
        4,
        "stop_and_collect must return all N reports from the leader; \
         got {}",
        reports.len(),
    );
    for r in &reports {
        assert!(
            r.completed,
            "pcomm worker tid={} report must be completed=true; \
             completed=false indicates a sentinel from a missing \
             or unparseable per-thread report",
            r.tid,
        );
        assert!(
            r.work_units > 0,
            "pcomm worker tid={} did no work; work_units=0",
            r.tid,
        );
    }
    // +1 slack: enumerating /proc/self/fd itself holds one open fd.
    let after_fds = count_open_fds();
    assert!(
        after_fds <= baseline_fds + 1,
        "fd leak: baseline={baseline_fds}, after collect={after_fds}",
    );
}
// The kernel truncates task comms to TASK_COMM_LEN-1 = 15 bytes; a
// longer pcomm must show up truncated to its leading 15 bytes.
#[test]
fn pcomm_kernel_truncates_to_15_bytes() {
    const LONG_NAME: &str = "this_is_a_very_long_name";
    assert!(
        LONG_NAME.len() > 15,
        "test fixture must exceed TASK_COMM_LEN-1"
    );
    let works = vec![pcomm_spec(1, LONG_NAME)];
    let mut h = WorkloadHandle::spawn_pcomm_cgroup(LONG_NAME, None, None, &works)
        .expect("pcomm spawn must succeed");
    h.start();
    std::thread::sleep(Duration::from_millis(200));
    let pids = h.worker_pids();
    assert_eq!(pids.len(), 1);
    // Read the kernel-visible comm of the leader while it runs.
    let observed = read_comm(pids[0]);
    let reports = h.stop_and_collect();
    assert_eq!(reports.len(), 1);
    assert_eq!(
        observed.len(),
        15,
        "kernel must truncate pcomm to 15 bytes (TASK_COMM_LEN-1=15); \
         observed length {} for {observed:?}",
        observed.len(),
    );
    assert_eq!(
        observed,
        &LONG_NAME[..15],
        "truncated comm must be the leading 15 bytes of pcomm input",
    );
}
// A WorkSpec with 0 workers is legal: the spawn succeeds, yields no
// reports, and the short-circuit path must not leak file descriptors.
#[test]
fn pcomm_zero_workers_no_container_spawn() {
    let works = vec![pcomm_spec(0, "empty")];
    // Count open fds before spawn so any leak from the 0-worker path shows up.
    let baseline_fds = std::fs::read_dir("/proc/self/fd")
        .map(|d| d.count())
        .unwrap_or(0);
    let h = WorkloadHandle::spawn_pcomm_cgroup("empty", None, None, &works)
        .expect("pcomm with 0 workers must spawn cleanly");
    let reports = h.stop_and_collect();
    assert!(
        reports.is_empty(),
        "pcomm group with 0 workers must produce no reports; got {}",
        reports.len(),
    );
    let after_fds = std::fs::read_dir("/proc/self/fd")
        .map(|d| d.count())
        .unwrap_or(0);
    // +1 slack: enumerating /proc/self/fd itself holds one open fd.
    assert!(
        after_fds <= baseline_fds + 1,
        "0-worker pcomm path must not leak fds; baseline={baseline_fds}, after={after_fds}",
    );
}
// Dropping the handle must reap the container: the leader pid must no
// longer exist afterwards.
#[test]
fn pcomm_handle_drop_reaps_container() {
    let specs = vec![pcomm_spec(2, "dropme")];
    let mut handle = WorkloadHandle::spawn_pcomm_cgroup("dropme", None, None, &specs)
        .expect("pcomm spawn must succeed");
    handle.start();
    std::thread::sleep(Duration::from_millis(100));
    let pids = handle.worker_pids();
    assert!(
        !pids.is_empty(),
        "pcomm handle must report worker pids after start",
    );
    let first_pid = pids[0];
    assert!(
        first_pid > 0,
        "first worker pid must be published post-start; got {first_pid}",
    );
    let leader_pid = first_pid;
    drop(handle);
    // kill with signal None (sig 0) probes for existence without
    // delivering anything; Ok means the pid still exists.
    let target = nix::unistd::Pid::from_raw(leader_pid);
    let alive = nix::sys::signal::kill(target, None).is_ok();
    assert!(!alive, "leader pid {leader_pid} must be dead after Drop",);
}
// Several WorkSpecs sharing one pcomm coalesce into a single leader
// process; reports stay attributable to their WorkSpec via group_idx.
#[test]
fn pcomm_multiple_workspecs_coalesce_into_one_leader() {
    let spec_two = WorkSpec::default()
        .work_type(WorkType::SpinWait)
        .workers(2)
        .pcomm("shared");
    let spec_one = WorkSpec::default()
        .work_type(WorkType::SpinWait)
        .workers(1)
        .pcomm("shared");
    let works = vec![spec_two, spec_one];
    let mut h = WorkloadHandle::spawn_pcomm_cgroup("shared", None, None, &works)
        .expect("multi-WorkSpec pcomm spawn must succeed");
    h.start();
    std::thread::sleep(Duration::from_millis(200));
    let pids = h.worker_pids();
    assert_eq!(pids.len(), 1);
    let leader_pid = pids[0];
    // Every thread in the container should report the leader's Tgid.
    let tgids: BTreeSet<libc::pid_t> = read_task_tids(leader_pid)
        .iter()
        .map(|tid| read_tgid(*tid))
        .collect();
    let reports = h.stop_and_collect();
    assert_eq!(reports.len(), 3, "2 + 1 workers across 2 WorkSpecs");
    assert_eq!(
        tgids.len(),
        1,
        "every thread shares the single leader's Tgid; observed {tgids:?}",
    );
    assert_eq!(
        *tgids.iter().next().unwrap(),
        leader_pid,
        "shared Tgid must equal the container leader pid",
    );
    // Attribution: each report carries the index of its originating WorkSpec.
    let group0 = reports.iter().filter(|r| r.group_idx == 0).count();
    let group1 = reports.iter().filter(|r| r.group_idx == 1).count();
    assert_eq!(group0, 2, "WorkSpec[0] contributes 2 reports");
    assert_eq!(group1, 1, "WorkSpec[1] contributes 1 report");
}
// ForkExit work is incompatible with a shared pcomm container; the
// combination must be rejected at spawn time with a named diagnostic.
#[test]
fn pcomm_rejects_fork_exit() {
    let works = vec![
        WorkSpec::default()
            .work_type(WorkType::ForkExit)
            .workers(1)
            .pcomm("leader"),
    ];
    let Err(err) = WorkloadHandle::spawn_pcomm_cgroup("leader", None, None, &works) else {
        panic!("pcomm + ForkExit must reject at spawn_pcomm_cgroup");
    };
    // Alternate {:#} rendering so wrapped error chains are included.
    let msg = format!("{err:#}");
    assert!(
        msg.contains("incompatible with WorkType::ForkExit"),
        "diagnostic must name the ForkExit incompatibility, got: {msg}",
    );
}
// CgroupChurn work migrates tasks between cgroups, which cannot
// coexist with a shared pcomm container; spawn must reject it with a
// named diagnostic.
#[test]
fn pcomm_rejects_cgroup_churn() {
    let churn = WorkType::CgroupChurn {
        groups: 2,
        cycle_ms: 10,
    };
    let works = vec![
        WorkSpec::default()
            .work_type(churn)
            .workers(1)
            .pcomm("leader"),
    ];
    let Err(err) = WorkloadHandle::spawn_pcomm_cgroup("leader", None, None, &works) else {
        panic!("pcomm + CgroupChurn must reject at spawn_pcomm_cgroup");
    };
    // Alternate {:#} rendering so wrapped error chains are included.
    let msg = format!("{err:#}");
    assert!(
        msg.contains("incompatible with WorkType::CgroupChurn"),
        "diagnostic must name the CgroupChurn incompatibility, got: {msg}",
    );
}