use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
use async_signal::{Signal, Signals};
use event_listener::Event;
use futures_lite::{future, prelude::*};
use std::io;
use std::mem;
use std::sync::Mutex;
/// A `'static`-lifetime guard of an async `()` mutex.
///
/// NOTE(review): presumably the guard produced by `Reaper::lock` on a `'static` reaper and
/// handed to `Reaper::reap` — confirm against the callers outside this file.
pub(crate) type Lock = AsyncMutexGuard<'static, ()>;
/// Signal-driven reaper for child processes.
///
/// `reap` waits on `pipe`, notifies `sigchld`, and polls every entry of `zombies`;
/// `status` sleeps on `sigchld` between polls of a single child.
pub(crate) struct Reaper {
/// Event notified (to all listeners) by `reap` each time `pipe.wait()` completes.
sigchld: Event,
/// Children handed over by `ChildGuard::reap` that had not exited yet; `reap` polls them
/// with `try_wait` and drops each one once it no longer reports "still running".
zombies: Mutex<Vec<std::process::Child>>,
/// Source of the signal notifications that pace the `reap` loop.
pipe: Pipe,
/// Async mutex exposed through `lock`.
///
/// NOTE(review): presumably held by whichever task is currently running `reap`, so that
/// only one driver exists at a time — confirm against the callers.
driver_guard: AsyncMutex<()>,
}
impl Reaper {
    /// Builds a reaper with an empty zombie list and a fresh signal source.
    ///
    /// # Panics
    ///
    /// Panics with "cannot create SIGCHLD pipe" when `Pipe::new` fails.
    pub(crate) fn new() -> Self {
        let sigchld = Event::new();
        let zombies = Mutex::new(Vec::new());
        let pipe = Pipe::new().expect("cannot create SIGCHLD pipe");
        let driver_guard = AsyncMutex::new(());
        Self { sigchld, zombies, pipe, driver_guard }
    }

    /// Waits for the driver mutex and returns its guard.
    pub(crate) async fn lock(&self) -> AsyncMutexGuard<'_, ()> {
        self.driver_guard.lock().await
    }

    /// Runs the reaping loop forever.
    ///
    /// Each round waits for the next signal, wakes every listener of `sigchld`, and then
    /// polls all queued zombies. `_driver_guard` is never read; it is simply kept alive for
    /// as long as this future exists.
    ///
    /// # Panics
    ///
    /// Panics if the zombie list mutex is poisoned.
    pub(crate) async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
        // Number of zombies polled back-to-back before yielding to the executor.
        const POLLS_PER_YIELD: usize = 50;

        loop {
            self.pipe.wait().await;

            // Wake every task currently parked in `status`.
            self.sigchld.notify(usize::MAX);

            // Take the list out of the mutex so the lock is not held while polling.
            let mut pending = mem::take(&mut *self.zombies.lock().unwrap());
            let mut cursor = 0;
            'sweep: loop {
                for _ in 0..POLLS_PER_YIELD {
                    if cursor >= pending.len() {
                        break 'sweep;
                    }
                    match pending[cursor].try_wait() {
                        // Still running: keep it and look at the next entry.
                        Ok(None) => cursor += 1,
                        // Exited, or `try_wait` failed: drop the entry. `swap_remove` moves
                        // the last element into this slot, so `cursor` must not advance.
                        _ => {
                            // The child is only dropped once `try_wait` stopped reporting
                            // it as running, hence the lint is silenced here.
                            #[allow(clippy::zombie_processes)]
                            pending.swap_remove(cursor);
                        }
                    }
                }

                // A full batch was polled: let other tasks run, then pick up any zombies
                // that were queued in the meantime.
                future::yield_now().await;
                pending.append(&mut self.zombies.lock().unwrap());
            }

            // Return the still-running children to the shared list.
            self.zombies.lock().unwrap().append(&mut pending);
        }
    }

    /// Registers `child` with the signal source and wraps it in a [`ChildGuard`].
    ///
    /// # Errors
    ///
    /// Forwards any error returned by `Pipe::register`.
    pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
        self.pipe
            .register(&child)
            .map(|()| ChildGuard { inner: Some(child) })
    }

    /// Resolves with the exit status of `child`, sleeping on `sigchld` between polls.
    ///
    /// # Errors
    ///
    /// Returns the error of the first failing `try_wait` call.
    ///
    /// # Panics
    ///
    /// Panics if the `child` mutex is poisoned.
    pub(crate) async fn status(
        &'static self,
        child: &Mutex<crate::ChildGuard>,
    ) -> io::Result<std::process::ExitStatus> {
        loop {
            // Fast path: the child may already be done.
            let polled = child.lock().unwrap().get_mut().try_wait()?;
            if let Some(status) = polled {
                return Ok(status);
            }

            // Register interest before polling again, so a notification that fires between
            // the first poll and the registration is not missed.
            event_listener::listener!(self.sigchld => listener);

            let polled = child.lock().unwrap().get_mut().try_wait()?;
            if let Some(status) = polled {
                return Ok(status);
            }

            // Sleep until `reap` notifies the event.
            listener.await;
        }
    }

    /// Reports whether the zombie list currently holds any child.
    ///
    /// A poisoned lock is read through instead of causing a panic.
    pub(crate) fn has_zombies(&'static self) -> bool {
        let zombies = match self.zombies.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        !zombies.is_empty()
    }
}
/// Wrapper around a `std::process::Child` created by `Reaper::register`.
pub(crate) struct ChildGuard {
/// The wrapped child; set to `None` once `reap` has moved it into the reaper's zombie list.
inner: Option<std::process::Child>,
}
impl ChildGuard {
    /// Returns a mutable reference to the wrapped child.
    ///
    /// # Panics
    ///
    /// Panics if the child has already been moved out by [`ChildGuard::reap`].
    pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
        self.inner.as_mut().unwrap()
    }

    /// Hands the child over to `reaper` if it is still running.
    ///
    /// When `try_wait` reports anything other than "still running" (an exit status or an
    /// error), the child stays inside this guard and nothing is queued.
    ///
    /// # Panics
    ///
    /// Panics if the child was already moved out, or if the reaper's zombie list mutex is
    /// poisoned.
    pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
        let still_running = matches!(self.get_mut().try_wait(), Ok(None));
        if !still_running {
            return;
        }

        // Take the lock before moving the child out, so a poisoned lock leaves `inner` intact.
        let mut zombies = reaper.zombies.lock().unwrap();
        let child = self.inner.take().unwrap();
        zombies.push(child);
    }
}
/// Source of child-signal notifications, backed by an `async_signal::Signals` stream.
struct Pipe {
/// Signal stream; `Pipe::new` subscribes it to `Signal::Child` only.
signals: Signals,
}
impl Pipe {
    /// Creates a signal stream subscribed to `Signal::Child`.
    ///
    /// # Errors
    ///
    /// Forwards the error from `Signals::new`.
    fn new() -> io::Result<Pipe> {
        Signals::new(Some(Signal::Child)).map(|signals| Pipe { signals })
    }

    /// Waits for the next item of the signal stream; the item itself is discarded.
    async fn wait(&self) {
        // Polling goes through a shared reference to the stream, so `&self` is enough.
        let mut stream = &self.signals;
        stream.next().await;
    }

    /// Registers a child with this notification source.
    ///
    /// This implementation ignores `_child` and always returns `Ok(())`.
    fn register(&self, _child: &std::process::Child) -> io::Result<()> {
        Ok(())
    }
}