use std::io;
use std::os::unix::process::CommandExt;
use std::sync::Mutex;
use std::time::Duration;
use tokio::process::{Child, Command};
#[cfg(feature = "stats")]
use crate::stats::ProcessGroupStats;
/// One tracked id plus the latch recording whether it was ever seen as a group.
struct Entry {
    /// Becomes `true` once a group probe for `id` has succeeded (or returned
    /// `EPERM`). From then on, a group-probe `ESRCH` is no longer retried
    /// against the bare pid.
    group_seen: bool,
    /// A process-group id (group tracking) or a plain pid (solo tracking).
    id: i32,
}
/// A collection of ids that are probed and signalled either as process groups
/// or as individual pids, depending on `group`.
struct Tracked {
    /// `true`: every id is a process-group id. It is probed with
    /// `kill(-id, 0)` and signalled with `killpg`. `false`: ids are plain pids.
    group: bool,
    /// The tracked entries. Dead ones are pruned lazily by `track`,
    /// `signal_all` and `live_snapshot`.
    ids: Mutex<Vec<Entry>>,
}
impl Tracked {
const fn new(group: bool) -> Self {
Tracked {
ids: Mutex::new(Vec::new()),
group,
}
}
fn probe_raw(&self, id: i32, group_seen: bool) -> (bool, bool) {
let probe = if self.group { -id } else { id };
if unsafe { libc::kill(probe, 0) } == 0 {
return (true, group_seen || self.group);
}
let err = std::io::Error::last_os_error().raw_os_error();
if err == Some(libc::EPERM) {
return (true, group_seen || self.group);
}
if self.group && !group_seen && err == Some(libc::ESRCH) {
if unsafe { libc::kill(id, 0) } == 0 {
return (true, false);
}
let alive = std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM);
return (alive, false);
}
(false, group_seen)
}
fn probe_entry(&self, entry: &mut Entry) -> bool {
let (alive, group_seen) = self.probe_raw(entry.id, entry.group_seen);
entry.group_seen = group_seen;
alive
}
#[cfg(feature = "process-control")]
fn contains(&self, id: i32) -> bool {
self.ids
.lock()
.unwrap_or_else(|e| e.into_inner())
.iter()
.any(|e| e.id == id)
}
fn track(&self, id: i32, group_seen: bool) {
let mut ids = self.ids.lock().unwrap_or_else(|e| e.into_inner());
ids.retain_mut(|e| self.probe_entry(e));
if !ids.iter().any(|e| e.id == id) {
ids.push(Entry { id, group_seen });
}
}
fn signal_all(&self, sig: i32) {
let mut ids = self.ids.lock().unwrap_or_else(|e| e.into_inner()); ids.retain_mut(|e| {
if !self.probe_entry(e) {
return false; }
let id = e.id;
unsafe {
if self.group {
if libc::killpg(id, sig) == -1
&& io::Error::last_os_error().raw_os_error() == Some(libc::ESRCH)
&& !e.group_seen
{
libc::kill(id, sig);
}
} else {
libc::kill(id, sig);
}
}
true
});
}
fn any_alive(&self) -> bool {
let mut ids = self.ids.lock().unwrap_or_else(|e| e.into_inner()); ids.iter_mut().any(|e| self.probe_entry(e))
}
#[cfg(feature = "process-control")]
fn live_snapshot(&self) -> Vec<i32> {
let mut ids = self.ids.lock().unwrap_or_else(|e| e.into_inner()); ids.retain_mut(|e| self.probe_entry(e));
ids.iter().map(|e| e.id).collect()
}
#[cfg(feature = "stats")]
fn count_alive(&self) -> usize {
let mut ids = self.ids.lock().unwrap_or_else(|e| e.into_inner()); let mut alive = 0;
for e in ids.iter_mut() {
if self.probe_entry(e) {
alive += 1;
}
}
alive
}
}
/// Tracks spawned and adopted children so they can be signalled together.
pub(crate) struct ProcessGroup {
    /// Children that could not be moved into their own group. They are
    /// tracked as individual pids.
    solos: Tracked,
    /// Children that lead their own process group. They are signalled via
    /// `killpg`.
    groups: Tracked,
    /// When set, `Drop` skips the final `SIGKILL` broadcast.
    skip_drop_kill: super::SkipDropKill,
}
impl ProcessGroup {
    /// Creates a group that tracks nothing yet.
    pub(crate) fn new() -> Self {
        ProcessGroup {
            groups: Tracked::new(true),
            solos: Tracked::new(false),
            skip_drop_kill: super::SkipDropKill::new(),
        }
    }

    /// Spawns `cmd` and tracks the child as a process group.
    ///
    /// Unless `opts.setsid` is set, the child is put in a new process group
    /// whose id equals its pid (`process_group(0)`). That group is recorded as
    /// already seen.
    ///
    /// With `opts.setsid` the latch starts unset, so the liveness probe keeps
    /// its bare-pid fallback until the group is observed. NOTE(review): the
    /// setsid itself is not applied here — presumably by the caller or
    /// `crate::sys`; confirm.
    ///
    /// # Errors
    /// Propagates any error from `Command::spawn`.
    pub(crate) fn spawn(
        &self,
        cmd: &mut Command,
        opts: &crate::sys::SpawnOptions,
    ) -> io::Result<Child> {
        if !opts.setsid {
            cmd.as_std_mut().process_group(0);
        }
        let child = cmd.spawn()?;
        if let Some(pid) = child.id() {
            self.groups.track(pid as i32, !opts.setsid);
        }
        Ok(child)
    }

    /// Starts tracking an already-spawned `child`.
    ///
    /// The method tries to make the child the leader of its own process group:
    /// - On success, it is tracked as a group.
    /// - If `setpgid` reports `EACCES` (the child has already exec'd) or
    ///   `EPERM`, it is tracked as a lone pid. This is skipped if it is already
    ///   tracked as a group, which avoids double-tracking children from `spawn`.
    /// - `ESRCH` is ignored.
    ///
    /// # Errors
    /// - Fails if the child has no pid.
    /// - Fails if `setpgid` returns any other error.
    #[cfg(feature = "process-control")]
    pub(crate) fn adopt(&self, child: &Child) -> io::Result<()> {
        let pid = child
            .id()
            .ok_or_else(|| io::Error::other("child has no pid (already exited?)"))?
            as i32;
        // SAFETY: setpgid takes plain integers; no memory is shared with libc.
        if unsafe { libc::setpgid(pid, 0) } == 0 {
            self.groups.track(pid, true);
            return Ok(());
        }
        let err = io::Error::last_os_error();
        match err.raw_os_error() {
            // Not (or no longer) a child we can regroup: nothing to track.
            Some(libc::ESRCH) => Ok(()),
            // Already exec'd, or cannot change group: fall back to pid tracking.
            Some(libc::EACCES | libc::EPERM) => {
                if !self.groups.contains(pid) {
                    self.solos.track(pid, false);
                }
                Ok(())
            }
            _ => Err(err),
        }
    }

    /// Sends `SIGKILL` to every tracked group and lone pid.
    ///
    /// Best-effort: delivery failures are ignored and this always returns
    /// `Ok(())`.
    pub(crate) fn kill_all(&self) -> io::Result<()> {
        self.broadcast(libc::SIGKILL);
        Ok(())
    }

    /// Sends `sig` to every tracked group and lone pid. Best-effort; always
    /// returns `Ok(())`.
    #[cfg(feature = "process-control")]
    pub(crate) fn signal(&self, sig: i32) -> io::Result<()> {
        self.broadcast(sig);
        Ok(())
    }

    /// Sends `SIGSTOP` to everything tracked. Best-effort; always returns
    /// `Ok(())`.
    #[cfg(feature = "process-control")]
    pub(crate) fn suspend(&self) -> io::Result<()> {
        self.broadcast(libc::SIGSTOP);
        Ok(())
    }

    /// Sends `SIGCONT` to everything tracked. Best-effort; always returns
    /// `Ok(())`.
    #[cfg(feature = "process-control")]
    pub(crate) fn resume(&self) -> io::Result<()> {
        self.broadcast(libc::SIGCONT);
        Ok(())
    }

    /// Signals all groups first, then all lone pids.
    fn broadcast(&self, sig: i32) {
        self.groups.signal_all(sig);
        self.solos.signal_all(sig);
    }

    /// Returns `true` if any tracked group or lone pid still probes alive.
    fn any_alive(&self) -> bool {
        self.groups.any_alive() || self.solos.any_alive()
    }

    /// Returns the live tracked ids: group ids first, then lone pids.
    ///
    /// Dead entries are pruned as a side effect.
    #[cfg(feature = "process-control")]
    pub(crate) fn members(&self) -> Vec<i32> {
        let mut members = self.groups.live_snapshot();
        members.extend(self.solos.live_snapshot());
        members
    }

    /// Delegates to `super::graceful::run` with this group and its
    /// skip-drop-kill flag.
    ///
    /// `signal`, `timeout` and `escalate` are passed through unchanged.
    pub(crate) async fn graceful_shutdown(
        &self,
        signal: i32,
        timeout: Duration,
        escalate: bool,
    ) -> io::Result<()> {
        super::graceful::run(self, &self.skip_drop_kill, signal, timeout, escalate).await
    }

    /// Reports how many tracked groups/pids are alive.
    ///
    /// CPU time and peak memory are not collected here and are always `None`.
    #[cfg(feature = "stats")]
    pub(crate) fn stats(&self) -> io::Result<ProcessGroupStats> {
        Ok(ProcessGroupStats {
            active_process_count: self.groups.count_alive() + self.solos.count_alive(),
            total_cpu_time: None,
            peak_memory_bytes: None,
        })
    }
}
impl super::graceful::GracefulTarget for ProcessGroup {
    /// Forwards `signal` to every tracked group and lone pid.
    fn signal_all(&self, signal: i32) {
        self.broadcast(signal)
    }

    /// Drained means no tracked group or lone pid still probes alive.
    fn is_drained(&self) -> bool {
        let still_running = self.any_alive();
        !still_running
    }

    /// Escalates to `SIGKILL` for everything tracked. Always `Ok(())`.
    fn hard_kill(&self) -> io::Result<()> {
        self.kill_all()
    }
}
impl Drop for ProcessGroup {
    /// Kills everything still tracked, unless the skip-drop-kill flag is set.
    ///
    /// The flag is shared with the graceful-shutdown path via
    /// `graceful_shutdown`.
    fn drop(&mut self) {
        if self.skip_drop_kill.is_set() {
            return;
        }
        self.broadcast(libc::SIGKILL);
    }
}
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tokio::process::Command;
    use super::*;

    /// With `escalate = false`, a child that ignores the graceful signal must
    /// survive both the graceful shutdown and dropping the group.
    #[tokio::test]
    #[ignore = "spawns a real subprocess"]
    async fn escalate_false_does_not_kill_survivors() {
        let pg = ProcessGroup::new();
        let opts = crate::sys::SpawnOptions::default();
        let mut cmd = Command::new("sh");
        cmd.arg("-c").arg("trap '' TERM; while :; do :; done");
        cmd.kill_on_drop(true);
        let mut child = pg.spawn(&mut cmd, &opts).unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        pg.graceful_shutdown(libc::SIGTERM, Duration::from_millis(100), false)
            .await
            .unwrap();
        drop(pg);
        // Give a wrongly delivered SIGKILL time to land, then ask for the exit
        // status. `kill(pid, 0)` must not be used here: a killed but unreaped
        // child is a zombie that still "exists", so that probe could never
        // detect the regression.
        tokio::time::sleep(Duration::from_millis(50)).await;
        let alive = matches!(child.try_wait(), Ok(None));
        // `start_kill` errors harmlessly once the child has been reaped. A raw
        // kill(pid, ..) could instead hit a recycled pid.
        let _ = child.start_kill();
        let _ = child.wait().await;
        assert!(alive, "child must survive when escalate_to_kill=false");
    }

    /// Before the group latch is set, a live pid that does not lead its own
    /// group must still probe as alive via the bare-pid fallback.
    #[tokio::test]
    #[ignore = "spawns a real subprocess"]
    async fn esrch_on_group_probe_does_not_prune_a_live_pid() {
        let tracked = Tracked::new(true);
        let mut child = Command::new("sh")
            .arg("-c")
            .arg("sleep 60")
            .kill_on_drop(true)
            .spawn()
            .unwrap();
        let pid = child.id().unwrap() as i32;
        let group_ok = unsafe { libc::kill(-pid, 0) } == 0;
        let pid_ok = unsafe { libc::kill(pid, 0) } == 0;
        // If a group with this id happens to exist, the scenario under test
        // cannot be set up; bail out.
        if group_ok {
            let _ = unsafe { libc::kill(pid, libc::SIGKILL) };
            let _ = child.wait().await;
            return;
        }
        assert!(pid_ok, "spawned child must be alive");
        let exists = tracked.probe_raw(pid, false).0;
        let _ = unsafe { libc::kill(pid, libc::SIGKILL) };
        let _ = child.wait().await;
        assert!(
            exists,
            "a process that exists as a pid but not as a group leader \
must be considered alive (L6 fallback, pre-latch)"
        );
    }

    /// Once `group_seen` is latched, the bare-pid fallback is disabled.
    #[tokio::test]
    #[ignore = "spawns a real subprocess"]
    async fn group_seen_latch_disables_l6_fallback() {
        let tracked = Tracked::new(true);
        let mut child = Command::new("sh")
            .arg("-c")
            .arg("sleep 60")
            .kill_on_drop(true)
            .spawn()
            .unwrap();
        let pid = child.id().unwrap() as i32;
        if unsafe { libc::kill(-pid, 0) } == 0 {
            let _ = unsafe { libc::kill(pid, libc::SIGKILL) };
            let _ = child.wait().await;
            return;
        }
        assert!(
            tracked.probe_raw(pid, false).0,
            "pre-latch: L6 keeps a live pid"
        );
        assert!(
            !tracked.probe_raw(pid, true).0,
            "post-latch: L6 disabled — a not-a-group-leader pid is treated as gone (B5)"
        );
        let _ = unsafe { libc::kill(pid, libc::SIGKILL) };
        let _ = child.wait().await;
    }

    /// Adopting a child that `spawn` already tracks must not add a second entry.
    #[cfg(feature = "process-control")]
    #[tokio::test]
    #[ignore = "spawns a real subprocess"]
    async fn adopt_of_an_already_spawned_child_does_not_double_track() {
        let pg = ProcessGroup::new();
        let opts = crate::sys::SpawnOptions::default();
        let mut cmd = Command::new("sh");
        cmd.arg("-c").arg("sleep 60");
        cmd.kill_on_drop(true);
        let mut child = pg.spawn(&mut cmd, &opts).unwrap();
        let pid = child.id().unwrap() as i32;
        pg.adopt(&child).unwrap();
        let members = pg.members();
        assert_eq!(
            members.iter().filter(|&&m| m == pid).count(),
            1,
            "an already-spawned child must be tracked once, not double-tracked"
        );
        drop(pg);
        // The child has not been waited on yet, so its pid cannot be recycled.
        let _ = unsafe { libc::kill(pid, libc::SIGKILL) };
        let _ = child.wait().await;
    }
}