use std::io;
use std::os::unix::process::CommandExt;
use std::sync::Mutex;
use std::time::Duration;
use tokio::process::{Child, Command};
use tokio::time::{Instant, sleep};
use crate::stats::ProcessGroupStats;
const POLL_INTERVAL: Duration = Duration::from_millis(20);
pub(crate) struct ProcessGroup {
pgids: Mutex<Vec<i32>>,
}
impl ProcessGroup {
pub(crate) fn new() -> Self {
ProcessGroup {
pgids: Mutex::new(Vec::new()),
}
}
pub(crate) fn spawn(&self, cmd: &mut Command) -> io::Result<Child> {
cmd.as_std_mut().process_group(0);
let child = cmd.spawn()?;
if let Some(pid) = child.id()
&& let Ok(mut g) = self.pgids.lock()
{
retain_live(&mut g);
g.push(pid as i32);
}
Ok(child)
}
pub(crate) fn adopt(&self, child: &Child) -> io::Result<()> {
let pid = child
.id()
.ok_or_else(|| io::Error::other("child has no pid (already exited?)"))?
as i32;
let rc = unsafe { libc::setpgid(pid, 0) };
if rc != 0 {
let err = io::Error::last_os_error();
let code = err.raw_os_error().unwrap_or(0);
if code != libc::ESRCH && code != libc::EPERM && code != libc::EACCES {
return Err(err);
}
}
if let Ok(mut g) = self.pgids.lock() {
retain_live(&mut g);
g.push(pid);
}
Ok(())
}
pub(crate) fn kill_all(&self) -> io::Result<()> {
signal_groups(&self.pgids, libc::SIGKILL);
Ok(())
}
pub(crate) async fn graceful_shutdown(
&self,
timeout: Duration,
escalate: bool,
) -> io::Result<()> {
signal_groups(&self.pgids, libc::SIGTERM);
let deadline = Instant::now() + timeout;
while groups_alive(&self.pgids) {
if Instant::now() >= deadline {
break;
}
sleep(POLL_INTERVAL).await;
}
if escalate && groups_alive(&self.pgids) {
signal_groups(&self.pgids, libc::SIGKILL);
}
Ok(())
}
pub(crate) fn stats(&self) -> io::Result<ProcessGroupStats> {
let active = match self.pgids.lock() {
Ok(g) => g
.iter()
.filter(|&&pgid| unsafe { libc::kill(-pgid, 0) == 0 })
.count(),
Err(_) => 0,
};
Ok(ProcessGroupStats {
active_process_count: active,
total_cpu_time: None,
peak_memory_bytes: None,
})
}
}
impl Drop for ProcessGroup {
fn drop(&mut self) {
signal_groups(&self.pgids, libc::SIGKILL);
}
}
fn signal_groups(pgids: &Mutex<Vec<i32>>, sig: i32) {
if let Ok(mut g) = pgids.lock() {
g.retain(|&pgid| {
if unsafe { libc::kill(-pgid, 0) } != 0 {
return false; }
unsafe { libc::killpg(pgid, sig) };
true
});
}
}
fn groups_alive(pgids: &Mutex<Vec<i32>>) -> bool {
let Ok(g) = pgids.lock() else {
return false;
};
g.iter().any(|&pgid| {
unsafe { libc::kill(-pgid, 0) == 0 }
})
}
fn retain_live(pgids: &mut Vec<i32>) {
pgids.retain(|&pgid| unsafe { libc::kill(-pgid, 0) == 0 });
}