extern crate mio_named_pipes;
extern crate winapi;
use std::fmt;
use std::io;
use std::os::windows::prelude::*;
use std::os::windows::process::ExitStatusExt;
use std::process::{self, ExitStatus};
use std::ptr;
use self::mio_named_pipes::NamedPipe;
use self::winapi::shared::minwindef::*;
use self::winapi::shared::winerror::*;
use self::winapi::um::handleapi::*;
use self::winapi::um::processthreadsapi::*;
use self::winapi::um::synchapi::*;
use self::winapi::um::threadpoollegacyapiset::*;
use self::winapi::um::winbase::*;
use self::winapi::um::winnt::*;
use super::SpawnedChild;
use crate::kill::Kill;
use futures::future::Fuse;
use futures::sync::oneshot;
use futures::{Async, Future, Poll};
use tokio_reactor::{Handle, PollEvented};
/// A spawned child process whose exit can be awaited as a future.
///
/// Wraps a `std::process::Child`. Polling it (see the `Future` impl) yields the
/// process's `ExitStatus`.
#[must_use = "futures do nothing unless polled"]
pub struct Child {
// The underlying standard-library process handle.
child: process::Child,
// `None` until the first poll finds the process still running and registers
// a wait on its handle; holds the registration state afterwards.
waiting: Option<Waiting>,
}
/// Debug output shows the process id and the wrapped `process::Child`; the
/// wait-registration state is deliberately elided and printed as `".."`.
impl fmt::Debug for Child {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let pid = self.id();
        let mut builder = f.debug_struct("Child");
        builder.field("pid", &pid);
        builder.field("child", &self.child);
        // The raw handle/pointer inside `Waiting` is not useful to print.
        builder.field("waiting", &"..");
        builder.finish()
    }
}
/// State for an in-flight wait registration on the child's process handle.
///
/// Created by `Child::poll` after `RegisterWaitForSingleObject` succeeds, and
/// torn down (wait unregistered, sender allocation freed) in its `Drop` impl.
struct Waiting {
// Receiving half of the channel that the wait callback signals.
rx: Fuse<oneshot::Receiver<()>>,
// Wait handle filled in by `RegisterWaitForSingleObject`; passed to
// `UnregisterWaitEx` on drop.
wait_object: HANDLE,
// Heap allocation (from `Box::into_raw`) holding the sending half. The same
// pointer is handed to the wait callback as its context argument.
tx: *mut Option<oneshot::Sender<()>>,
}
// `Waiting` holds a raw pointer (`tx`) and a `HANDLE`, so these traits are
// asserted manually rather than derived automatically.
// NOTE(review): soundness appears to rely on `tx` being dereferenced only by
// the wait callback and freed only in `Drop` after the wait is unregistered;
// confirm nothing else touches it.
unsafe impl Sync for Waiting {}
unsafe impl Send for Waiting {}
/// Spawns `cmd` and wraps the resulting process and its stdio pipes.
///
/// Each of stdin/stdout/stderr that the spawned process exposes is taken out
/// of the `process::Child` and converted by `stdio`, using `handle`. The
/// returned `Child` starts with no wait registered.
///
/// # Errors
///
/// Returns the error from `Command::spawn`, or from converting any of the
/// three stdio handles.
pub(crate) fn spawn_child(cmd: &mut process::Command, handle: &Handle) -> io::Result<SpawnedChild> {
    let mut spawned = cmd.spawn()?;

    // Take ownership of each pipe (if any) before moving the process handle
    // into the wrapper.
    let stdin = stdio(spawned.stdin.take(), handle)?;
    let stdout = stdio(spawned.stdout.take(), handle)?;
    let stderr = stdio(spawned.stderr.take(), handle)?;

    let child = Child {
        child: spawned,
        waiting: None,
    };
    Ok(SpawnedChild {
        child,
        stdin,
        stdout,
        stderr,
    })
}
impl Child {
pub fn id(&self) -> u32 {
self.child.id()
}
}
impl Kill for Child {
fn kill(&mut self) -> io::Result<()> {
self.child.kill()
}
}
/// Resolves to the child's `ExitStatus` once the process has exited.
///
/// The first poll checks for an exit without blocking. If the process is still
/// running, a wait is registered on the process handle with
/// `RegisterWaitForSingleObject`; its callback signals a oneshot channel that
/// later polls observe.
impl Future for Child {
type Item = ExitStatus;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
// A wait is already registered: completion is signalled through `rx`.
if let Some(ref mut w) = self.waiting {
// The sender lives in the allocation owned by `Waiting`, which is only
// freed when `Waiting` is dropped, hence cancellation is treated as a bug.
match w.rx.poll().expect("should not be canceled") {
Async::Ready(()) => {}
Async::NotReady => return Ok(Async::NotReady),
}
// The callback fired, so the exit status is expected to be available.
let status = try_wait(&self.child)?.expect("not ready yet");
return Ok(status.into());
}
// No wait registered yet: see whether the process has already exited.
if let Some(e) = try_wait(&self.child)? {
return Ok(e.into());
}
// Still running. Box the sender and turn it into a raw pointer so it can be
// passed as the callback's context argument.
let (tx, rx) = oneshot::channel();
let ptr = Box::into_raw(Box::new(Some(tx)));
let mut wait_object = ptr::null_mut();
let rc = unsafe {
RegisterWaitForSingleObject(
&mut wait_object,
self.child.as_raw_handle(),
Some(callback),
ptr as *mut _,
// No timeout; the callback is requested to run only once.
INFINITE,
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
)
};
if rc == 0 {
// Capture the OS error before doing anything else, then reclaim the
// allocation since no `Waiting` will take ownership of it.
let err = io::Error::last_os_error();
drop(unsafe { Box::from_raw(ptr) });
return Err(err);
}
// From here on `Waiting` owns both the registration and the allocation.
// The loop then goes around again and polls `rx` via the branch above.
self.waiting = Some(Waiting {
rx: rx.fuse(),
wait_object,
tx: ptr,
});
}
}
}
/// Unregisters the wait and then frees the sender allocation.
///
/// # Panics
///
/// Panics if `UnregisterWaitEx` reports failure. In that case the allocation
/// behind `tx` is not freed, because the panic happens before `Box::from_raw`.
impl Drop for Waiting {
fn drop(&mut self) {
unsafe {
// NOTE(review): per the Win32 documentation, passing `INVALID_HANDLE_VALUE`
// as the completion event makes this call wait for pending callbacks to
// finish before returning; confirm against the `UnregisterWaitEx` reference.
let rc = UnregisterWaitEx(self.wait_object, INVALID_HANDLE_VALUE);
if rc == 0 {
panic!("failed to unregister: {}", io::Error::last_os_error());
}
// Only reclaim the allocation once the wait is unregistered, since the
// callback dereferences this same pointer.
drop(Box::from_raw(self.tx));
}
}
}
/// Wait callback registered by `Child::poll`.
///
/// `ptr` is the context pointer supplied at registration time, i.e. the
/// `*mut Option<oneshot::Sender<()>>` produced by `Box::into_raw`. The sender
/// is taken out of the slot and used to signal the receiving half; a failed
/// send is ignored.
///
/// # Panics
///
/// Panics if the slot is already empty.
unsafe extern "system" fn callback(ptr: PVOID, _timer_fired: BOOLEAN) {
    let slot: &mut Option<oneshot::Sender<()>> = &mut *(ptr as *mut _);
    let sender = slot.take().unwrap();
    // The receiver may already be gone; that is not an error here.
    let _ = sender.send(());
}
pub fn try_wait(child: &process::Child) -> io::Result<Option<ExitStatus>> {
unsafe {
match WaitForSingleObject(child.as_raw_handle(), 0) {
WAIT_OBJECT_0 => {}
WAIT_TIMEOUT => return Ok(None),
_ => return Err(io::Error::last_os_error()),
}
let mut status = 0;
let rc = GetExitCodeProcess(child.as_raw_handle(), &mut status);
if rc == FALSE {
Err(io::Error::last_os_error())
} else {
Ok(Some(ExitStatus::from_raw(status)))
}
}
}
/// Standard input pipe type for a spawned child: an event-driven named pipe.
pub type ChildStdin = PollEvented<NamedPipe>;
/// Standard output pipe type for a spawned child: an event-driven named pipe.
pub type ChildStdout = PollEvented<NamedPipe>;
/// Standard error pipe type for a spawned child: an event-driven named pipe.
pub type ChildStderr = PollEvented<NamedPipe>;
/// Converts an optional stdio handle into an optional `PollEvented<NamedPipe>`.
///
/// `None` is passed through as `Ok(None)`. Otherwise the value's raw handle is
/// wrapped in a `NamedPipe`, which is then registered using `handle`.
///
/// # Errors
///
/// Returns the error from `PollEvented::new_with_handle`.
fn stdio<T>(option: Option<T>, handle: &Handle) -> io::Result<Option<PollEvented<NamedPipe>>>
where
    T: IntoRawHandle,
{
    let raw = match option {
        None => return Ok(None),
        Some(io) => io.into_raw_handle(),
    };
    // Ownership of the raw handle moves into the `NamedPipe`.
    let pipe = unsafe { NamedPipe::from_raw_handle(raw) };
    PollEvented::new_with_handle(pipe, handle).map(Some)
}