#![cfg(test)]
#![allow(unused_imports)]
use super::super::affinity::*;
use super::super::config::*;
use super::super::types::*;
use super::super::worker::*;
use super::testing::*;
use super::*;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
/// A Custom closure that forks a long-sleeping grandchild must not leave that
/// grandchild running once `stop_and_collect` has returned. Per the test name,
/// teardown is expected to reach the grandchild through the worker's process
/// group; that mechanism lives outside this file.
#[test]
fn stop_and_collect_reaps_custom_grandchild_via_process_group() {
    require_grandchild_sleep_binary();

    let cfg = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityIntent::Inherit,
        work_type: WorkType::custom("grandchild_sleep", forks_grandchild_sleep_fn),
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).unwrap();
    let worker = handle.worker_pids()[0];

    // Drop any stale pidfile from an earlier run so the read below only sees
    // what this worker writes; the guard deletes the file when the test ends.
    let pid_path = grandchild_pidfile_path(worker);
    let _ = std::fs::remove_file(&pid_path);
    let _cleanup_guard = PidfileCleanup(vec![pid_path.clone()]);

    handle.start();
    let gpid = read_grandchild_gpid_from_pidfile(worker, &pid_path);

    // `kill` with no signal delivers nothing; it only probes that the pid exists.
    let alive = nix::sys::signal::kill(nix::unistd::Pid::from_raw(gpid), None).is_ok();
    assert!(alive, "grandchild {gpid} must be alive before stop_and_collect",);

    let _collected = handle.stop_and_collect();
    let grace = Duration::from_secs(5);
    assert_grandchild_reaped_within(gpid, grace, "stop_and_collect");
}
/// Multi-worker variant: each of `N` workers forks its own sleeping
/// grandchild, and every one of those grandchildren must be gone after a
/// single `stop_and_collect` call on the shared handle.
#[test]
fn stop_and_collect_reaps_grandchildren_from_multiple_workers() {
    require_grandchild_sleep_binary();
    const N: usize = 3;

    let cfg = WorkloadConfig {
        num_workers: N,
        affinity: AffinityIntent::Inherit,
        work_type: WorkType::custom("grandchild_sleep", forks_grandchild_sleep_fn),
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).unwrap();

    // Validate the pid list up front: each pidfile path below is derived from
    // a worker pid, so a short or duplicated list would undermine the test.
    let worker_pids = handle.worker_pids();
    assert_eq!(
        worker_pids.len(),
        N,
        "WorkloadHandle::worker_pids should report {N} workers",
    );
    let distinct: BTreeSet<libc::pid_t> = worker_pids.iter().copied().collect();
    assert_eq!(
        distinct.len(),
        worker_pids.len(),
        "WorkloadHandle::worker_pids returned duplicates: {worker_pids:?}",
    );

    // One pidfile per worker; clear leftovers first, then arm the guard that
    // removes them all at end of test.
    let mut pidfiles: Vec<std::path::PathBuf> = Vec::with_capacity(worker_pids.len());
    for &pid in worker_pids.iter() {
        pidfiles.push(grandchild_pidfile_path(pid));
    }
    for stale in &pidfiles {
        let _ = std::fs::remove_file(stale);
    }
    let _cleanup_guard = PidfileCleanup(pidfiles.clone());

    handle.start();
    let mut gpids: Vec<libc::pid_t> = Vec::with_capacity(pidfiles.len());
    for (&worker, path) in worker_pids.iter().zip(&pidfiles) {
        gpids.push(read_grandchild_gpid_from_pidfile(worker, path));
    }

    // `kill` with no signal is an existence probe only.
    for &gpid in &gpids {
        let alive = nix::sys::signal::kill(nix::unistd::Pid::from_raw(gpid), None).is_ok();
        assert!(alive, "grandchild {gpid} must be alive before stop_and_collect",);
    }

    let _collected = handle.stop_and_collect();
    let grace = Duration::from_secs(5);
    for &gpid in &gpids {
        assert_grandchild_reaped_within(gpid, grace, "stop_and_collect (multi-worker)");
    }
}
/// Panic path: the Custom closure forks a grandchild and then panics.
///
/// The test requires three things:
/// - the grandchild is still reaped;
/// - `stop_and_collect` returns exactly one report for the single worker;
/// - that report looks like a sentinel (zero `work_units`, `completed ==
///   false`) with exit info reflecting the crash.
#[test]
fn stop_and_collect_reaps_grandchild_from_panicking_custom_closure() {
    require_grandchild_sleep_binary();

    let cfg = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityIntent::Inherit,
        work_type: WorkType::custom("grandchild_panic", forks_grandchild_and_panics_fn),
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).unwrap();
    let worker = handle.worker_pids()[0];

    // Fresh pidfile for this run; the guard cleans it up at end of test.
    let pid_path = grandchild_pidfile_path(worker);
    let _ = std::fs::remove_file(&pid_path);
    let _cleanup_guard = PidfileCleanup(vec![pid_path.clone()]);

    handle.start();
    let gpid = read_grandchild_gpid_from_pidfile(worker, &pid_path);

    // Existence probe only; no signal is delivered.
    let alive = nix::sys::signal::kill(nix::unistd::Pid::from_raw(gpid), None).is_ok();
    assert!(alive, "grandchild {gpid} must be alive before stop_and_collect",);

    let reports = handle.stop_and_collect();
    assert_grandchild_reaped_within(
        gpid,
        Duration::from_secs(5),
        "stop_and_collect (panic-path)",
    );

    assert_eq!(reports.len(), 1, "one worker spawned");
    let report = &reports[0];

    // The worker never produced its own report, so every counter must still
    // be at its zero/default value.
    assert_eq!(
        report.work_units, 0,
        "sentinel must be zeroed; non-zero work_units would mean \
         a worker-authored report leaked through the JSON-parse \
         branch despite the panic",
    );
    assert!(
        !report.completed,
        "sentinel must carry completed=false so downstream \
         consumers distinguish '0 iterations by design / fast \
         exit' (completed=true) from '0 iterations because the \
         worker crashed before producing a report' (this case); \
         `..WorkerReport::default()` gives the bool-default \
         `false` at the sentinel construction site in \
         `stop_and_collect`",
    );

    // Which exit shape appears depends on the build's panic strategy:
    // SIGABRT under panic=abort, exit status 1 under panic=unwind with a
    // catch_unwind around the closure.
    match &report.exit_info {
        Some(WorkerExitInfo::Exited(1)) => {}
        Some(WorkerExitInfo::Signaled(sig)) if *sig == libc::SIGABRT => {}
        other => panic!(
            "expected sentinel with Signaled(SIGABRT) (panic=abort) \
             or Exited(1) (panic=unwind + catch_unwind) for a \
             panicking Custom closure; got {other:?}",
        ),
    }
}
/// Drop path: a grandchild forked by a Custom closure must be reaped when the
/// handle is simply dropped, with no `stop_and_collect` call beforehand.
#[test]
fn drop_reaps_custom_grandchild_via_process_group() {
    require_grandchild_sleep_binary();

    let cfg = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityIntent::Inherit,
        work_type: WorkType::custom("grandchild_sleep", forks_grandchild_sleep_fn),
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).unwrap();
    let worker = handle.worker_pids()[0];

    // The guard is bound after the handle. It therefore outlives the explicit
    // drop below and removes the pidfile only at the very end of the test.
    let pid_path = grandchild_pidfile_path(worker);
    let _ = std::fs::remove_file(&pid_path);
    let _cleanup_guard = PidfileCleanup(vec![pid_path.clone()]);

    handle.start();
    let gpid = read_grandchild_gpid_from_pidfile(worker, &pid_path);

    // Existence probe only; no signal is delivered.
    let alive = nix::sys::signal::kill(nix::unistd::Pid::from_raw(gpid), None).is_ok();
    assert!(alive, "grandchild {gpid} must be alive before Drop",);

    // Tear down through `Drop` alone.
    drop(handle);
    assert_grandchild_reaped_within(
        gpid,
        Duration::from_secs(5),
        "handle Drop (no stop_and_collect)",
    );
}
/// Graceful path: the Custom closure forks a grandchild and then, per its
/// name, returns cleanly.
///
/// Two checks follow:
/// - `stop_and_collect` must finish inside a 2s budget. The assertion message
///   attributes a ~5s result to StillAlive escalation instead of the graceful
///   branch.
/// - The grandchild must still be reaped afterwards.
#[test]
fn stop_and_collect_reaps_grandchild_from_graceful_custom_closure() {
    require_grandchild_sleep_binary();

    let cfg = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityIntent::Inherit,
        work_type: WorkType::custom("grandchild_graceful", forks_grandchild_and_exits_cleanly_fn),
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).unwrap();
    let worker = handle.worker_pids()[0];

    // Fresh pidfile for this run; the guard cleans it up at end of test.
    let pid_path = grandchild_pidfile_path(worker);
    let _ = std::fs::remove_file(&pid_path);
    let _cleanup_guard = PidfileCleanup(vec![pid_path.clone()]);

    handle.start();
    let gpid = read_grandchild_gpid_from_pidfile(worker, &pid_path);

    // Existence probe only; no signal is delivered.
    let alive = nix::sys::signal::kill(nix::unistd::Pid::from_raw(gpid), None).is_ok();
    assert!(alive, "grandchild {gpid} must be alive before stop_and_collect",);

    // Time only the collection call itself.
    let started = Instant::now();
    let _collected = handle.stop_and_collect();
    let elapsed = started.elapsed();

    let budget = Duration::from_secs(2);
    assert!(
        elapsed < budget,
        "stop_and_collect must hit the graceful-exit branch \
         (<2s), not StillAlive escalation (~5s). elapsed={elapsed:?} \
         — a value near the 5s deadline means SIGUSR1 failed to \
         reach the worker or wait_for_deadline did not observe \
         STOP in time",
    );
    assert_grandchild_reaped_within(
        gpid,
        Duration::from_secs(5),
        "stop_and_collect (graceful-exit)",
    );
}