extern crate winapi;
extern crate mio_named_pipes;
use std::fmt;
use std::io;
use std::os::windows::prelude::*;
use std::os::windows::process::ExitStatusExt;
use std::process::{self, ExitStatus};
use futures::future::Fuse;
use futures::sync::oneshot;
use futures::{Future, Poll, Async} ;
use self::mio_named_pipes::NamedPipe;
use self::winapi::shared::minwindef::*;
use self::winapi::shared::winerror::*;
use self::winapi::um::handleapi::*;
use self::winapi::um::processthreadsapi::*;
use self::winapi::um::synchapi::*;
use self::winapi::um::threadpoollegacyapiset::*;
use self::winapi::um::winbase::*;
use self::winapi::um::winnt::*;
use tokio_reactor::{Handle, PollEvented};
/// Windows implementation of a child process whose exit can be polled.
///
/// Wraps a `std::process::Child` together with the state of an exit
/// notification that `poll_exit` registers the first time it finds the
/// process still running.
#[must_use = "futures do nothing unless polled"]
pub struct Child {
/// The underlying standard-library child process.
child: process::Child,
/// `None` until `poll_exit` has registered a wait on the process handle;
/// afterwards holds the receiver and registration for that wait.
waiting: Option<Waiting>,
}
impl fmt::Debug for Child {
    /// Formats the child as a `Child` struct showing its process id and the
    /// wrapped `std::process::Child`; the wait state is rendered as `".."`
    /// rather than being inspected.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let pid = self.id();
        let mut builder = f.debug_struct("Child");
        builder.field("pid", &pid);
        builder.field("child", &self.child);
        builder.field("waiting", &"..");
        builder.finish()
    }
}
/// State of an in-flight exit notification registered by `Child::poll_exit`.
///
/// Dropping this value unregisters the wait and frees the boxed sender that
/// `tx` points to (see the `Drop` impl).
struct Waiting {
/// Fused receiving half of the oneshot channel; `poll_exit` polls it to
/// learn that the wait callback has fired.
rx: Fuse<oneshot::Receiver<()>>,
/// Wait handle written by `RegisterWaitForSingleObject`; handed back to
/// `UnregisterWaitEx` on drop.
wait_object: HANDLE,
/// Raw pointer obtained from `Box::into_raw`, passed to the wait callback
/// as its context argument. The callback takes the sender out of the
/// `Option`; the box itself is reclaimed on drop.
tx: *mut Option<oneshot::Sender<()>>,
}
// `Waiting` holds a raw pointer (`tx`) and a `HANDLE`, so these marker traits
// have to be asserted by hand.
// NOTE(review): soundness appears to rely on `tx` only being dereferenced by
// the wait callback and only being freed in `Drop` after the wait has been
// unregistered — confirm before changing how either field is used.
unsafe impl Sync for Waiting {}
unsafe impl Send for Waiting {}
impl Child {
pub fn new(child: process::Child, _handle: &Handle) -> Child {
Child {
child: child,
waiting: None,
}
}
pub fn register_stdin(&mut self, handle: &Handle)
-> io::Result<Option<ChildStdin>> {
stdio(self.child.stdin.take(), handle)
}
pub fn register_stdout(&mut self, handle: &Handle)
-> io::Result<Option<ChildStdout>> {
stdio(self.child.stdout.take(), handle)
}
pub fn register_stderr(&mut self, handle: &Handle)
-> io::Result<Option<ChildStderr>> {
stdio(self.child.stderr.take(), handle)
}
pub fn id(&self) -> u32 {
self.child.id()
}
pub fn kill(&mut self) -> io::Result<()> {
self.child.kill()
}
pub fn poll_exit(&mut self) -> Poll<ExitStatus, io::Error> {
loop {
if let Some(ref mut w) = self.waiting {
match w.rx.poll().expect("should not be canceled") {
Async::Ready(()) => {}
Async::NotReady => return Ok(Async::NotReady),
}
let status = try!(try_wait(&self.child)).expect("not ready yet");
return Ok(status.into())
}
if let Some(e) = try!(try_wait(&self.child)) {
return Ok(e.into())
}
let (tx, rx) = oneshot::channel();
let ptr = Box::into_raw(Box::new(Some(tx)));
let mut wait_object = 0 as *mut _;
let rc = unsafe {
RegisterWaitForSingleObject(&mut wait_object,
self.child.as_raw_handle(),
Some(callback),
ptr as *mut _,
INFINITE,
WT_EXECUTEINWAITTHREAD |
WT_EXECUTEONLYONCE)
};
if rc == 0 {
let err = io::Error::last_os_error();
drop(unsafe { Box::from_raw(ptr) });
return Err(err)
}
self.waiting = Some(Waiting {
rx: rx.fuse(),
wait_object: wait_object,
tx: ptr,
});
}
}
}
impl Drop for Waiting {
    /// Unregisters the wait and then frees the boxed sender behind `tx`.
    ///
    /// # Panics
    ///
    /// Panics if `UnregisterWaitEx` reports failure; in that case the box is
    /// deliberately left allocated rather than freed.
    fn drop(&mut self) {
        // `INVALID_HANDLE_VALUE` is passed as the completion event; per the
        // Win32 documentation this makes the call wait for pending callbacks.
        let unregistered = unsafe {
            UnregisterWaitEx(self.wait_object, INVALID_HANDLE_VALUE)
        };
        if unregistered == 0 {
            panic!("failed to unregister: {}", io::Error::last_os_error());
        }
        // Only reclaim the allocation once the wait is gone.
        unsafe {
            drop(Box::from_raw(self.tx));
        }
    }
}
/// Wait callback handed to `RegisterWaitForSingleObject`.
///
/// `ptr` is the context pointer supplied at registration time: a
/// `*mut Option<oneshot::Sender<()>>` produced by `Box::into_raw`. The sender
/// is taken out of the slot and completed; the allocation itself is not freed
/// here.
///
/// An already-empty slot is ignored instead of unwrapped, so this function
/// can never panic across the `extern "system"` boundary. A send error (the
/// receiver having been dropped) is intentionally discarded.
///
/// # Safety
///
/// `ptr` must point to a live `Option<oneshot::Sender<()>>` that nothing else
/// is accessing while the callback runs.
unsafe extern "system" fn callback(ptr: PVOID,
                                   _timer_fired: BOOLEAN) {
    let slot = &mut *(ptr as *mut Option<oneshot::Sender<()>>);
    if let Some(tx) = slot.take() {
        let _ = tx.send(());
    }
}
pub fn try_wait(child: &process::Child) -> io::Result<Option<ExitStatus>> {
unsafe {
match WaitForSingleObject(child.as_raw_handle(), 0) {
WAIT_OBJECT_0 => {}
WAIT_TIMEOUT => return Ok(None),
_ => return Err(io::Error::last_os_error()),
}
let mut status = 0;
let rc = GetExitCodeProcess(child.as_raw_handle(), &mut status);
if rc == FALSE {
Err(io::Error::last_os_error())
} else {
Ok(Some(ExitStatus::from_raw(status)))
}
}
}
/// Handle type returned by `Child::register_stdin`: the child's stdin wrapped
/// as a `NamedPipe` inside a `PollEvented`.
pub type ChildStdin = PollEvented<NamedPipe>;
/// Handle type returned by `Child::register_stdout`: the child's stdout
/// wrapped as a `NamedPipe` inside a `PollEvented`.
pub type ChildStdout = PollEvented<NamedPipe>;
/// Handle type returned by `Child::register_stderr`: the child's stderr
/// wrapped as a `NamedPipe` inside a `PollEvented`.
pub type ChildStderr = PollEvented<NamedPipe>;
fn stdio<T>(option: Option<T>, handle: &Handle)
-> io::Result<Option<PollEvented<NamedPipe>>>
where T: IntoRawHandle,
{
let io = match option {
Some(io) => io,
None => return Ok(None),
};
let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) };
let io = try!(PollEvented::new_with_handle(pipe, handle));
Ok(Some(io))
}