use std::io;
use std::os::unix::process::CommandExt;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::process::{Child, Command};
use tokio::time::{Instant, sleep};
#[cfg(feature = "stats")]
use crate::stats::ProcessGroupStats;
/// How often `graceful_shutdown` re-checks whether tracked processes have
/// exited (20 ms).
const POLL_INTERVAL: Duration = Duration::from_micros(20_000);
struct Tracked {
ids: Mutex<Vec<i32>>,
group: bool,
}
impl Tracked {
const fn new(group: bool) -> Self {
Tracked {
ids: Mutex::new(Vec::new()),
group,
}
}
fn exists(&self, id: i32) -> bool {
let probe = if self.group { -id } else { id };
if unsafe { libc::kill(probe, 0) } == 0 {
return true;
}
let err = std::io::Error::last_os_error().raw_os_error();
if err == Some(libc::EPERM) {
return true;
}
if self.group && err == Some(libc::ESRCH) {
if unsafe { libc::kill(id, 0) } == 0 {
return true;
}
return std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM);
}
false
}
fn track(&self, id: i32) {
if let Ok(mut ids) = self.ids.lock() {
ids.retain(|&id| self.exists(id));
if !ids.contains(&id) {
ids.push(id);
}
}
}
fn signal_all(&self, sig: i32) {
if let Ok(mut ids) = self.ids.lock() {
ids.retain(|&id| {
if !self.exists(id) {
return false; }
unsafe {
if self.group {
if libc::killpg(id, sig) == -1
&& io::Error::last_os_error().raw_os_error() == Some(libc::ESRCH)
{
libc::kill(id, sig);
}
} else {
libc::kill(id, sig);
}
}
true
});
}
}
fn any_alive(&self) -> bool {
self.ids
.lock()
.map(|ids| ids.iter().any(|&id| self.exists(id)))
.unwrap_or(false)
}
#[cfg(feature = "process-control")]
fn live_snapshot(&self) -> Vec<i32> {
match self.ids.lock() {
Ok(mut ids) => {
ids.retain(|&id| self.exists(id));
ids.clone()
}
Err(_) => Vec::new(),
}
}
#[cfg(feature = "stats")]
fn count_alive(&self) -> usize {
self.ids
.lock()
.map(|ids| ids.iter().filter(|&&id| self.exists(id)).count())
.unwrap_or(0)
}
}
/// Tracks every process spawned or adopted through this handle so they can be
/// signalled together and, unless told otherwise, SIGKILLed on drop.
pub(crate) struct ProcessGroup {
    /// Children that lead their own process group; signalled with `killpg`
    /// so their descendants in that group are reached too.
    groups: Tracked,
    /// Adopted children that could not be made group leaders and do not
    /// already lead a group; signalled individually.
    solos: Tracked,
    /// Set by a non-escalating `graceful_shutdown` to suppress the SIGKILL
    /// that dropping the handle would otherwise send.
    skip_drop_kill: AtomicBool,
}

impl ProcessGroup {
    /// Creates an empty tracker.
    pub(crate) fn new() -> Self {
        ProcessGroup {
            groups: Tracked::new(true),
            solos: Tracked::new(false),
            skip_drop_kill: AtomicBool::new(false),
        }
    }

    /// Spawns `cmd` and tracks the child as a process-group leader.
    ///
    /// Unless `opts.setsid` is set, the child is placed in a new process group
    /// whose id equals its pid (`process_group(0)`).
    ///
    /// # Errors
    /// Propagates any error from `Command::spawn`.
    pub(crate) fn spawn(
        &self,
        cmd: &mut Command,
        opts: &crate::sys::SpawnOptions,
    ) -> io::Result<Child> {
        // NOTE(review): with `opts.setsid` this function does not call setsid
        // itself; presumably the caller configured it on `cmd` — confirm.
        if !opts.setsid {
            cmd.as_std_mut().process_group(0);
        }
        let child = cmd.spawn()?;
        // `id()` is only expected to be `None` once the child has been reaped.
        if let Some(pid) = child.id() {
            self.groups.track(pid as i32);
        }
        Ok(child)
    }

    /// Starts tracking an already-spawned `child`.
    ///
    /// The child is moved into its own process group when possible. If
    /// `setpgid` refuses but the child already leads its own group, that group
    /// is tracked. Otherwise only the pid itself is tracked.
    ///
    /// # Errors
    /// Fails if the child has no pid, or if `setpgid` fails with an errno
    /// other than `ESRCH`, `EACCES` or `EPERM`.
    #[cfg(feature = "process-control")]
    pub(crate) fn adopt(&self, child: &Child) -> io::Result<()> {
        let pid = child
            .id()
            .ok_or_else(|| io::Error::other("child has no pid (already exited?)"))?
            as i32;
        // SAFETY: setpgid(2) takes only integer arguments.
        if unsafe { libc::setpgid(pid, 0) } == 0 {
            self.groups.track(pid);
            return Ok(());
        }
        let err = io::Error::last_os_error();
        match err.raw_os_error() {
            // Not (or no longer) a child of ours: nothing to track.
            Some(libc::ESRCH) => Ok(()),
            // setpgid refuses once the child has exec'd (EACCES) or when it is
            // e.g. a session leader or in another session (EPERM). The child
            // may nevertheless already lead its own group (setsid or
            // process_group(0)); track that group so descendants are covered.
            Some(libc::EACCES | libc::EPERM) => {
                // SAFETY: getpgid(2) takes only an integer argument.
                if unsafe { libc::getpgid(pid) } == pid {
                    self.groups.track(pid);
                } else {
                    self.solos.track(pid);
                }
                Ok(())
            }
            _ => Err(err),
        }
    }

    /// Sends SIGKILL to every tracked process. Always returns `Ok(())`.
    pub(crate) fn kill_all(&self) -> io::Result<()> {
        self.broadcast(libc::SIGKILL);
        Ok(())
    }

    /// Sends `sig` to every tracked process. Always returns `Ok(())`.
    #[cfg(feature = "process-control")]
    pub(crate) fn signal(&self, sig: i32) -> io::Result<()> {
        self.broadcast(sig);
        Ok(())
    }

    /// Stops every tracked process with SIGSTOP. Always returns `Ok(())`.
    #[cfg(feature = "process-control")]
    pub(crate) fn suspend(&self) -> io::Result<()> {
        self.broadcast(libc::SIGSTOP);
        Ok(())
    }

    /// Continues every tracked process with SIGCONT. Always returns `Ok(())`.
    #[cfg(feature = "process-control")]
    pub(crate) fn resume(&self) -> io::Result<()> {
        self.broadcast(libc::SIGCONT);
        Ok(())
    }

    /// Sends `sig` to both tracked groups and tracked individual pids.
    fn broadcast(&self, sig: i32) {
        self.groups.signal_all(sig);
        self.solos.signal_all(sig);
    }

    /// Returns whether any tracked group or pid still exists.
    fn any_alive(&self) -> bool {
        self.groups.any_alive() || self.solos.any_alive()
    }

    /// Whether `sig` should be followed by SIGCONT so stopped processes act on it.
    ///
    /// Not for 0 (probe only), SIGKILL (acts on stopped processes anyway),
    /// SIGCONT itself, or stop signals (continuing would undo them).
    fn needs_sigcont(sig: i32) -> bool {
        !matches!(
            sig,
            0 | libc::SIGKILL
                | libc::SIGCONT
                | libc::SIGSTOP
                | libc::SIGTSTP
                | libc::SIGTTIN
                | libc::SIGTTOU
        )
    }

    /// Returns the ids of tracked groups and pids that still exist, groups first.
    #[cfg(feature = "process-control")]
    pub(crate) fn members(&self) -> Vec<i32> {
        let mut members = self.groups.live_snapshot();
        members.extend(self.solos.live_snapshot());
        members
    }

    /// Sends `signal` to every tracked process and waits up to `timeout` for
    /// all of them to exit.
    ///
    /// `signal` is followed by SIGCONT (see `needs_sigcont`) so that processes
    /// stopped by `suspend` can act on it. A pending signal other than SIGKILL
    /// is not acted upon by a stopped process until it is continued.
    ///
    /// If processes remain at the deadline and `escalate` is true, they are
    /// SIGKILLed. If `escalate` is false, survivors are left running and the
    /// drop-time SIGKILL is disabled for this handle.
    ///
    /// A `timeout` too large to add to the current instant waits without limit.
    /// Always returns `Ok(())`.
    pub(crate) async fn graceful_shutdown(
        &self,
        signal: i32,
        timeout: Duration,
        escalate: bool,
    ) -> io::Result<()> {
        self.broadcast(signal);
        if Self::needs_sigcont(signal) {
            // Harmless for running processes unless they handle SIGCONT.
            self.broadcast(libc::SIGCONT);
        }
        // `None` when `timeout` is not representable (e.g. `Duration::MAX`):
        // wait for exit instead of panicking on `Instant` overflow.
        let deadline = Instant::now().checked_add(timeout);
        // NOTE(review): kill(2) probes also succeed for zombies, so a direct
        // child that exited but whose `Child` was not awaited presumably still
        // counts as alive here until it is reaped.
        while self.any_alive() {
            let pause = match deadline {
                Some(deadline) => {
                    let remaining = deadline.saturating_duration_since(Instant::now());
                    if remaining.is_zero() {
                        break;
                    }
                    // Never sleep past the deadline.
                    remaining.min(POLL_INTERVAL)
                }
                None => POLL_INTERVAL,
            };
            sleep(pause).await;
        }
        if escalate && self.any_alive() {
            self.broadcast(libc::SIGKILL);
        } else if !escalate {
            self.skip_drop_kill.store(true, Ordering::Relaxed);
        }
        Ok(())
    }

    /// Reports the number of tracked groups/pids that still exist; CPU and
    /// memory figures are not collected (`None`).
    #[cfg(feature = "stats")]
    pub(crate) fn stats(&self) -> io::Result<ProcessGroupStats> {
        Ok(ProcessGroupStats {
            active_process_count: self.groups.count_alive() + self.solos.count_alive(),
            total_cpu_time: None,
            peak_memory_bytes: None,
        })
    }
}
impl Drop for ProcessGroup {
    /// Best-effort cleanup: SIGKILLs every tracked process unless a
    /// non-escalating `graceful_shutdown` asked for survivors to be left alone.
    fn drop(&mut self) {
        // Exclusive access here, so the flag can be read without an atomic op.
        let leave_running = *self.skip_drop_kill.get_mut();
        if leave_running {
            return;
        }
        self.broadcast(libc::SIGKILL);
    }
}
// Tests that spawn real `sh` subprocesses. Both are `#[ignore]`d and must be
// run explicitly (e.g. `cargo test -- --ignored`).
#[cfg(test)]
mod tests {
use std::time::Duration;
use tokio::process::Command;
use super::*;
/// Checks that `graceful_shutdown(.., escalate = false)` leaves a process that
/// ignores SIGTERM running, and that dropping the `ProcessGroup` afterwards
/// does not SIGKILL it either.
#[tokio::test]
#[ignore = "spawns a real subprocess"]
async fn escalate_false_does_not_kill_survivors() {
let pg = ProcessGroup::new();
let opts = crate::sys::SpawnOptions::default();
let mut cmd = Command::new("sh");
// SIGTERM is ignored and the loop uses only the `:` builtin, so the shell
// process itself is the one that must survive.
cmd.arg("-c").arg("trap '' TERM; while :; do :; done");
cmd.kill_on_drop(true);
let mut child = pg.spawn(&mut cmd, &opts).unwrap();
let pid = child.id().unwrap() as i32;
// Presumably gives the shell time to install the trap before SIGTERM is sent.
tokio::time::sleep(Duration::from_millis(50)).await;
pg.graceful_shutdown(libc::SIGTERM, Duration::from_millis(100), false)
.await
.unwrap();
drop(pg);
// Sample liveness first, then kill and reap the child ourselves before
// asserting on the sampled value.
let alive = unsafe { libc::kill(pid, 0) } == 0;
let _ = unsafe { libc::kill(pid, libc::SIGKILL) };
let _ = child.wait().await;
assert!(alive, "child must survive when escalate_to_kill=false");
}
/// Checks that `Tracked::exists` in group mode falls back to probing the bare
/// pid when the group probe fails, so a live process that does not lead a
/// process group is not treated as dead.
#[tokio::test]
#[ignore = "spawns a real subprocess"]
async fn esrch_on_group_probe_does_not_prune_a_live_pid() {
let tracked = Tracked::new(true);
// Spawned without `process_group(0)`, so the child is not expected to lead
// a group named by its own pid.
let mut child = Command::new("sh")
.arg("-c")
.arg("sleep 60")
.kill_on_drop(true)
.spawn()
.unwrap();
let pid = child.id().unwrap() as i32;
let group_ok = unsafe { libc::kill(-pid, 0) } == 0;
let pid_ok = unsafe { libc::kill(pid, 0) } == 0;
// If a group with id == pid does exist, the scenario cannot be set up:
// clean up and return without asserting.
if group_ok {
let _ = unsafe { libc::kill(pid, libc::SIGKILL) };
let _ = child.wait().await;
return;
}
assert!(pid_ok, "spawned child must be alive");
let exists = tracked.exists(pid);
// Kill and reap before the final assertion.
let _ = unsafe { libc::kill(pid, libc::SIGKILL) };
let _ = child.wait().await;
assert!(
exists,
"a process that exists as a pid but not as a group leader \
must be considered alive by exists()"
);
}
}