use std::io;
use std::os::unix::process::CommandExt;
use std::sync::Mutex;
use std::time::Duration;
use tokio::process::{Child, Command};
use tokio::time::{Instant, sleep};
#[cfg(feature = "stats")]
use crate::stats::ProcessGroupStats;
/// Delay between successive liveness checks while
/// `ProcessGroup::graceful_shutdown` waits for tracked processes to exit.
const POLL_INTERVAL: Duration = Duration::from_millis(20);
/// A mutex-guarded list of ids that are either all process-group ids or all
/// individual pids, as selected by `group`.
struct Tracked {
// Ids recorded so far. Entries are pruned lazily: several methods drop ids
// whose target no longer answers a signal-0 probe.
ids: Mutex<Vec<i32>>,
// `true` when `ids` holds process-group ids (probed and signalled
// group-wide), `false` when it holds single pids.
group: bool,
}
impl Tracked {
const fn new(group: bool) -> Self {
Tracked {
ids: Mutex::new(Vec::new()),
group,
}
}
fn exists(&self, id: i32) -> bool {
let probe = if self.group { -id } else { id };
if unsafe { libc::kill(probe, 0) } == 0 {
return true;
}
std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
}
fn track(&self, id: i32) {
if let Ok(mut ids) = self.ids.lock() {
ids.retain(|&id| self.exists(id));
if !ids.contains(&id) {
ids.push(id);
}
}
}
fn signal_all(&self, sig: i32) {
if let Ok(mut ids) = self.ids.lock() {
ids.retain(|&id| {
if !self.exists(id) {
return false; }
unsafe {
if self.group {
libc::killpg(id, sig);
} else {
libc::kill(id, sig);
}
}
true
});
}
}
fn any_alive(&self) -> bool {
self.ids
.lock()
.map(|ids| ids.iter().any(|&id| self.exists(id)))
.unwrap_or(false)
}
#[cfg(feature = "process-control")]
fn live_snapshot(&self) -> Vec<i32> {
match self.ids.lock() {
Ok(mut ids) => {
ids.retain(|&id| self.exists(id));
ids.clone()
}
Err(_) => Vec::new(),
}
}
#[cfg(feature = "stats")]
fn count_alive(&self) -> usize {
self.ids
.lock()
.map(|ids| ids.iter().filter(|&&id| self.exists(id)).count())
.unwrap_or(0)
}
}
/// Tracks child processes so they can be signalled together. Everything still
/// tracked is sent `SIGKILL` when the value is dropped.
pub(crate) struct ProcessGroup {
// Ids of children that lead their own process group; these are probed and
// signalled group-wide.
groups: Tracked,
// Pids of adopted children that could not be moved into their own group;
// these are signalled individually.
solos: Tracked,
}
impl ProcessGroup {
    /// Creates a process group with nothing tracked yet.
    pub(crate) fn new() -> Self {
        ProcessGroup {
            groups: Tracked::new(true),
            solos: Tracked::new(false),
        }
    }

    /// Spawns `cmd` and records the child as a process-group leader.
    ///
    /// Unless `opts.setsid` is set, the command is configured with
    /// `process_group(0)` so the child starts in a fresh group whose id is its
    /// own pid.
    ///
    /// NOTE(review): when `opts.setsid` is set, nothing here changes the
    /// child's group; the child is presumably made a session (and therefore
    /// group) leader by setup done elsewhere. Confirm against the caller.
    ///
    /// # Errors
    /// Returns the error from `Command::spawn` if the child cannot be started.
    pub(crate) fn spawn(
        &self,
        cmd: &mut Command,
        opts: &crate::sys::SpawnOptions,
    ) -> io::Result<Child> {
        if !opts.setsid {
            cmd.as_std_mut().process_group(0);
        }
        let child = cmd.spawn()?;
        // `id()` is `None` once the child has been reaped; a pid that does not
        // fit in `i32` is skipped instead of being wrapped into a bogus target.
        if let Some(pgid) = child.id().and_then(|pid| i32::try_from(pid).ok()) {
            self.groups.track(pgid);
        }
        Ok(child)
    }

    /// Starts tracking a child that was spawned elsewhere.
    ///
    /// Tries to move the child into its own process group. On success it is
    /// tracked group-wide. If the kernel refuses with `EACCES` or `EPERM`, the
    /// child is tracked by pid only. `ESRCH` (no such process) is treated as
    /// success with nothing to track.
    ///
    /// # Errors
    /// Returns an error if the child has no pid, if the pid does not fit in
    /// `i32`, or if `setpgid` fails for any other reason.
    #[cfg(feature = "process-control")]
    pub(crate) fn adopt(&self, child: &Child) -> io::Result<()> {
        let raw = child
            .id()
            .ok_or_else(|| io::Error::other("child has no pid (already exited?)"))?;
        let pid =
            i32::try_from(raw).map_err(|_| io::Error::other("child pid does not fit in pid_t"))?;
        // SAFETY: `setpgid` takes two plain integers and has no memory-safety
        // preconditions.
        if unsafe { libc::setpgid(pid, 0) } == 0 {
            self.groups.track(pid);
            return Ok(());
        }
        // Capture errno immediately, before any other call can overwrite it.
        let err = io::Error::last_os_error();
        match err.raw_os_error() {
            Some(libc::ESRCH) => Ok(()),
            Some(libc::EACCES) | Some(libc::EPERM) => {
                self.solos.track(pid);
                Ok(())
            }
            _ => Err(err),
        }
    }

    /// Sends `SIGKILL` to everything tracked. Always returns `Ok(())`;
    /// delivery failures are not reported.
    pub(crate) fn kill_all(&self) -> io::Result<()> {
        self.broadcast(libc::SIGKILL);
        Ok(())
    }

    /// Sends `sig` to everything tracked. Always returns `Ok(())`; delivery
    /// failures are not reported.
    #[cfg(feature = "process-control")]
    pub(crate) fn signal(&self, sig: i32) -> io::Result<()> {
        self.broadcast(sig);
        Ok(())
    }

    /// Sends `SIGSTOP` to everything tracked. Always returns `Ok(())`.
    #[cfg(feature = "process-control")]
    pub(crate) fn suspend(&self) -> io::Result<()> {
        self.broadcast(libc::SIGSTOP);
        Ok(())
    }

    /// Sends `SIGCONT` to everything tracked. Always returns `Ok(())`.
    #[cfg(feature = "process-control")]
    pub(crate) fn resume(&self) -> io::Result<()> {
        self.broadcast(libc::SIGCONT);
        Ok(())
    }

    /// Delivers `sig` to the group entries first, then to the solo entries.
    fn broadcast(&self, sig: i32) {
        self.groups.signal_all(sig);
        self.solos.signal_all(sig);
    }

    /// Returns `true` while any tracked group or solo process still exists.
    fn any_alive(&self) -> bool {
        self.groups.any_alive() || self.solos.any_alive()
    }

    /// Returns the ids that are still alive: group ids first, then solo pids.
    #[cfg(feature = "process-control")]
    pub(crate) fn members(&self) -> Vec<i32> {
        let mut members = self.groups.live_snapshot();
        members.extend(self.solos.live_snapshot());
        members
    }

    /// Sends `SIGTERM`, then polls until nothing tracked is alive or `timeout`
    /// has elapsed. If `escalate` is set and something is still alive after
    /// the wait, sends `SIGKILL`. Always returns `Ok(())`.
    ///
    /// A `timeout` too large to be represented as a deadline (for example
    /// `Duration::MAX`) is treated as "no deadline" instead of panicking.
    ///
    /// NOTE(review): liveness is a signal-0 probe, which presumably still
    /// succeeds for a child that has exited but has not been reaped. Confirm
    /// that callers reap children, otherwise the wait runs to the deadline.
    pub(crate) async fn graceful_shutdown(
        &self,
        timeout: Duration,
        escalate: bool,
    ) -> io::Result<()> {
        self.broadcast(libc::SIGTERM);
        // `None` means `now + timeout` overflowed: wait without a deadline.
        let deadline = Instant::now().checked_add(timeout);
        while self.any_alive() {
            let nap = match deadline {
                Some(deadline) => {
                    let remaining = deadline.saturating_duration_since(Instant::now());
                    if remaining.is_zero() {
                        break;
                    }
                    // Never sleep past the deadline.
                    remaining.min(POLL_INTERVAL)
                }
                None => POLL_INTERVAL,
            };
            sleep(nap).await;
        }
        if escalate && self.any_alive() {
            self.broadcast(libc::SIGKILL);
        }
        Ok(())
    }

    /// Reports how many tracked ids are still alive. CPU time and peak memory
    /// are not collected and are returned as `None`.
    #[cfg(feature = "stats")]
    pub(crate) fn stats(&self) -> io::Result<ProcessGroupStats> {
        Ok(ProcessGroupStats {
            active_process_count: self.groups.count_alive() + self.solos.count_alive(),
            total_cpu_time: None,
            peak_memory_bytes: None,
        })
    }
}
impl Drop for ProcessGroup {
    /// Sends `SIGKILL` to everything still tracked: group entries first, then
    /// solo entries. Delivery failures are ignored.
    fn drop(&mut self) {
        for registry in [&self.groups, &self.solos] {
            registry.signal_all(libc::SIGKILL);
        }
    }
}