use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
use event_listener::Event;
use futures_lite::future;
use std::io;
use std::mem;
use std::sync::Mutex;
pub(crate) type Lock = AsyncMutexGuard<'static, ()>;
pub(crate) struct Reaper {
sigchld: Event,
zombies: Mutex<Vec<std::process::Child>>,
pipe: Pipe,
driver_guard: AsyncMutex<()>,
}
impl Reaper {
pub(crate) fn new() -> Self {
Reaper {
sigchld: Event::new(),
zombies: Mutex::new(Vec::new()),
pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
driver_guard: AsyncMutex::new(()),
}
}
pub(crate) async fn lock(&self) -> AsyncMutexGuard<'_, ()> {
self.driver_guard.lock().await
}
pub(crate) async fn reap(&'static self, _driver_guard: async_lock::MutexGuard<'_, ()>) -> ! {
loop {
self.pipe.wait().await;
self.sigchld.notify(usize::MAX);
let mut zombies = mem::take(&mut *self.zombies.lock().unwrap());
let mut i = 0;
'reap_zombies: loop {
for _ in 0..50 {
if i >= zombies.len() {
break 'reap_zombies;
}
if let Ok(None) = zombies[i].try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
}
}
future::yield_now().await;
zombies.append(&mut self.zombies.lock().unwrap());
}
self.zombies.lock().unwrap().append(&mut zombies);
}
}
pub(crate) fn register(&'static self, child: std::process::Child) -> io::Result<ChildGuard> {
self.pipe.register(&child)?;
Ok(ChildGuard { inner: Some(child) })
}
pub(crate) async fn status(
&'static self,
child: &Mutex<crate::ChildGuard>,
) -> io::Result<std::process::ExitStatus> {
loop {
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
return Ok(status);
}
event_listener::listener!(self.sigchld => listener);
if let Some(status) = child.lock().unwrap().get_mut().try_wait()? {
return Ok(status);
}
listener.await;
}
}
pub(crate) fn has_zombies(&'static self) -> bool {
!self
.zombies
.lock()
.unwrap_or_else(|x| x.into_inner())
.is_empty()
}
}
pub(crate) struct ChildGuard {
inner: Option<std::process::Child>,
}
impl ChildGuard {
pub(crate) fn get_mut(&mut self) -> &mut std::process::Child {
self.inner.as_mut().unwrap()
}
pub(crate) fn reap(&mut self, reaper: &'static Reaper) {
if let Ok(None) = self.get_mut().try_wait() {
reaper
.zombies
.lock()
.unwrap()
.push(self.inner.take().unwrap());
}
}
}
cfg_if::cfg_if! {
if #[cfg(windows)] {
use async_channel::{Sender, Receiver, bounded};
use std::ffi::c_void;
use std::os::windows::io::AsRawHandle;
use std::ptr;
use windows_sys::Win32::{
Foundation::{BOOLEAN, HANDLE},
System::Threading::{
RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
},
};
struct Pipe {
sender: Sender<()>,
receiver: Receiver<()>,
}
impl Pipe {
fn new() -> io::Result<Pipe> {
let (sender, receiver) = bounded(1);
Ok(Pipe {
sender,
receiver
})
}
async fn wait(&self) {
self.receiver.recv().await.ok();
}
fn register(&self, child: &std::process::Child) -> io::Result<()> {
#[allow(clippy::infallible_destructuring_match)]
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
let reaper = match &crate::Reaper::get().sys {
super::Reaper::Signal(reaper) => reaper,
};
reaper.pipe.sender.try_send(()).ok();
}
let mut wait_object = ptr::null_mut();
let ret = unsafe {
RegisterWaitForSingleObject(
&mut wait_object,
child.as_raw_handle() as HANDLE,
Some(callback),
std::ptr::null_mut(),
INFINITE,
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
)
};
if ret == 0 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
}
} else if #[cfg(unix)] {
use async_signal::{Signal, Signals};
use futures_lite::prelude::*;
struct Pipe {
signals: Signals,
}
impl Pipe {
fn new() -> io::Result<Pipe> {
Ok(Pipe {
signals: Signals::new(Some(Signal::Child))?,
})
}
async fn wait(&self) {
(&self.signals).next().await;
}
fn register(&self, _child: &std::process::Child) -> io::Result<()> {
Ok(())
}
}
}
}