extern crate libc;
extern crate tokio_signal;
use std::io;
use std::os::unix::prelude::*;
use std::process::{self, ExitStatus};
use futures::future::FlattenStream;
use futures::{Future, Poll, Async, Stream};
use mio::unix::{EventedFd, UnixReady};
use mio::{PollOpt, Ready, Token};
use mio::event::Evented;
use mio;
use self::tokio_signal::unix::Signal;
use std::fmt;
use tokio_io::IoFuture;
use tokio_core::reactor::{Handle, PollEvented};
/// A spawned child process whose exit is observed by polling rather than by
/// blocking: `poll_exit` pairs a non-blocking `waitpid` with a stream of
/// `SIGCHLD` notifications.
#[must_use = "futures do nothing unless polled"]
pub struct Child {
/// The underlying standard-library process handle.
inner: process::Child,
/// Set to `true` once a `waitpid` call has returned this child's exit status.
reaped: bool,
/// Stream of `SIGCHLD` notifications, created via `Signal::new` in `Child::new`.
sigchld: FlattenStream<IoFuture<Signal>>,
}
impl fmt::Debug for Child {
    /// Formats the child as a `Child { .. }` struct showing its pid, the
    /// wrapped process handle and the `reaped` flag. The signal stream is
    /// rendered as the placeholder string `".."`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let pid = self.inner.id();
        let mut out = f.debug_struct("Child");
        out.field("pid", &pid);
        out.field("inner", &self.inner);
        out.field("reaped", &self.reaped);
        out.field("sigchld", &"..");
        out.finish()
    }
}
impl Child {
pub fn new(inner: process::Child, handle: &Handle) -> Child {
Child {
inner: inner,
reaped: false,
sigchld: Signal::new(libc::SIGCHLD, handle).flatten_stream(),
}
}
pub fn register_stdin(&mut self, handle: &Handle)
-> io::Result<Option<ChildStdin>> {
stdio(self.inner.stdin.take(), handle)
}
pub fn register_stdout(&mut self, handle: &Handle)
-> io::Result<Option<ChildStdout>> {
stdio(self.inner.stdout.take(), handle)
}
pub fn register_stderr(&mut self, handle: &Handle)
-> io::Result<Option<ChildStderr>> {
stdio(self.inner.stderr.take(), handle)
}
pub fn id(&self) -> u32 {
self.inner.id()
}
pub fn kill(&mut self) -> io::Result<()> {
if !self.reaped {
self.inner.kill()?;
let _ = self.try_wait(true);
}
Ok(())
}
pub fn poll_exit(&mut self) -> Poll<ExitStatus, io::Error> {
loop {
if let Some(e) = try!(self.try_wait(false)) {
return Ok(e.into())
}
if try!(self.sigchld.poll()).is_not_ready() {
return Ok(Async::NotReady)
}
}
}
fn try_wait(&mut self, block_on_wait: bool) -> io::Result<Option<ExitStatus>> {
assert!(!self.reaped);
let exit = try!(try_wait_process(self.id() as libc::pid_t, block_on_wait));
if let Some(_) = exit {
self.reaped = true;
}
Ok(exit)
}
}
/// Calls `waitpid` for `id`, retrying whenever the call is interrupted.
///
/// With `block_on_wait == false` the call is made with `WNOHANG`; otherwise
/// no flags are passed.
///
/// Returns `Ok(None)` when `waitpid` reports `0`, `Ok(Some(status))` when it
/// reports a pid, and the OS error for any failure other than an interrupted
/// call.
///
/// # Panics
///
/// Panics if `waitpid` reports a pid different from `id`.
fn try_wait_process(id: libc::pid_t, block_on_wait: bool) -> io::Result<Option<ExitStatus>> {
    let wait_flags = match block_on_wait {
        true => 0,
        false => libc::WNOHANG,
    };
    let mut status = 0;
    loop {
        let pid = unsafe { libc::waitpid(id, &mut status, wait_flags) };
        if pid == 0 {
            return Ok(None);
        }
        if pid > 0 {
            assert_eq!(pid, id);
            return Ok(Some(ExitStatus::from_raw(status)));
        }
        // Negative return: read errno immediately, before anything else can
        // overwrite it, and retry only on an interrupted call.
        let err = io::Error::last_os_error();
        if err.kind() != io::ErrorKind::Interrupted {
            return Err(err);
        }
    }
}
/// Thin newtype around an I/O handle.
///
/// `Read` and `Write` are forwarded unchanged to the wrapped value.
#[derive(Debug)]
pub struct Fd<T>(T);

impl<T> io::Read for Fd<T>
where
    T: io::Read,
{
    /// Reads into `buf` by delegating to the wrapped value.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let Fd(ref mut inner) = *self;
        inner.read(buf)
    }
}

impl<T> io::Write for Fd<T>
where
    T: io::Write,
{
    /// Writes `buf` by delegating to the wrapped value.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let Fd(ref mut inner) = *self;
        inner.write(buf)
    }

    /// Flushes the wrapped value.
    fn flush(&mut self) -> io::Result<()> {
        let Fd(ref mut inner) = *self;
        inner.flush()
    }
}
/// The child's stdin handle, wrapped in `Fd` and `PollEvented`.
pub type ChildStdin = PollEvented<Fd<process::ChildStdin>>;
/// The child's stdout handle, wrapped in `Fd` and `PollEvented`.
pub type ChildStdout = PollEvented<Fd<process::ChildStdout>>;
/// The child's stderr handle, wrapped in `Fd` and `PollEvented`.
pub type ChildStderr = PollEvented<Fd<process::ChildStderr>>;
impl<T> Evented for Fd<T>
where
    T: AsRawFd,
{
    /// Registers the wrapped descriptor with `poll`, always adding the
    /// hang-up readiness to the requested `interest`.
    fn register(
        &self,
        poll: &mio::Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        let raw = self.0.as_raw_fd();
        let wanted = interest | UnixReady::hup();
        EventedFd(&raw).register(poll, token, wanted, opts)
    }

    /// Re-registers the wrapped descriptor with `poll`, always adding the
    /// hang-up readiness to the requested `interest`.
    fn reregister(
        &self,
        poll: &mio::Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        let raw = self.0.as_raw_fd();
        let wanted = interest | UnixReady::hup();
        EventedFd(&raw).reregister(poll, token, wanted, opts)
    }

    /// Removes the wrapped descriptor from `poll`.
    fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
        let raw = self.0.as_raw_fd();
        EventedFd(&raw).deregister(poll)
    }
}
fn stdio<T>(option: Option<T>, handle: &Handle)
-> io::Result<Option<PollEvented<Fd<T>>>>
where T: AsRawFd
{
let io = match option {
Some(io) => io,
None => return Ok(None),
};
unsafe {
let fd = io.as_raw_fd();
let r = libc::fcntl(fd, libc::F_GETFL);
if r == -1 {
return Err(io::Error::last_os_error())
}
let r = libc::fcntl(fd, libc::F_SETFL, r | libc::O_NONBLOCK);
if r == -1 {
return Err(io::Error::last_os_error())
}
}
let io = try!(PollEvented::new(Fd(io), handle));
Ok(Some(io))
}