#![warn(missing_debug_implementations)]
#![deny(missing_docs)]
#![doc(html_root_url = "https://docs.rs/tokio-process/0.2")]
extern crate futures;
extern crate tokio_io;
extern crate tokio_reactor;
#[cfg(unix)]
#[macro_use]
extern crate lazy_static;
#[cfg(unix)]
#[macro_use]
extern crate log;
use std::io::{self, Read, Write};
use std::process::{Command, ExitStatus, Output, Stdio};
use crate::kill::Kill;
use futures::future::{ok, Either};
use futures::{Async, Future, IntoFuture, Poll};
use std::fmt;
use tokio_io::io::read_to_end;
use tokio_io::{AsyncRead, AsyncWrite, IoFuture};
use tokio_reactor::Handle;
#[path = "unix/mod.rs"]
#[cfg(unix)]
mod imp;
#[path = "windows.rs"]
#[cfg(windows)]
mod imp;
mod kill;
pub trait CommandExt {
fn spawn_async(&mut self) -> io::Result<Child> {
self.spawn_async_with_handle(&Handle::default())
}
fn spawn_async_with_handle(&mut self, handle: &Handle) -> io::Result<Child>;
fn status_async(&mut self) -> io::Result<StatusAsync> {
self.status_async_with_handle(&Handle::default())
}
fn status_async_with_handle(&mut self, handle: &Handle) -> io::Result<StatusAsync>;
fn output_async(&mut self) -> OutputAsync {
self.output_async_with_handle(&Handle::default())
}
fn output_async_with_handle(&mut self, handle: &Handle) -> OutputAsync;
}
struct SpawnedChild {
child: imp::Child,
stdin: Option<imp::ChildStdin>,
stdout: Option<imp::ChildStdout>,
stderr: Option<imp::ChildStderr>,
}
impl CommandExt for Command {
fn spawn_async_with_handle(&mut self, handle: &Handle) -> io::Result<Child> {
imp::spawn_child(self, handle).map(|spawned_child| Child {
child: ChildDropGuard::new(spawned_child.child),
stdin: spawned_child.stdin.map(|inner| ChildStdin { inner }),
stdout: spawned_child.stdout.map(|inner| ChildStdout { inner }),
stderr: spawned_child.stderr.map(|inner| ChildStderr { inner }),
})
}
fn status_async_with_handle(&mut self, handle: &Handle) -> io::Result<StatusAsync> {
self.spawn_async_with_handle(handle).map(|mut child| {
child.stdin.take();
child.stdout.take();
child.stderr.take();
StatusAsync { inner: child }
})
}
fn output_async_with_handle(&mut self, handle: &Handle) -> OutputAsync {
self.stdout(Stdio::piped());
self.stderr(Stdio::piped());
let inner = self
.spawn_async_with_handle(handle)
.into_future()
.and_then(Child::wait_with_output);
OutputAsync {
inner: Box::new(inner),
}
}
}
#[derive(Debug)]
struct ChildDropGuard<T: Kill> {
inner: T,
kill_on_drop: bool,
}
impl<T: Kill> ChildDropGuard<T> {
fn new(inner: T) -> Self {
Self {
inner,
kill_on_drop: true,
}
}
fn forget(&mut self) {
self.kill_on_drop = false;
}
}
impl<T: Kill> Kill for ChildDropGuard<T> {
fn kill(&mut self) -> io::Result<()> {
let ret = self.inner.kill();
if ret.is_ok() {
self.kill_on_drop = false;
}
ret
}
}
impl<T: Kill> Drop for ChildDropGuard<T> {
fn drop(&mut self) {
if self.kill_on_drop {
drop(self.kill());
}
}
}
impl<T: Future + Kill> Future for ChildDropGuard<T> {
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let ret = self.inner.poll();
if let Ok(Async::Ready(_)) = ret {
self.kill_on_drop = false;
}
ret
}
}
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Child {
child: ChildDropGuard<imp::Child>,
stdin: Option<ChildStdin>,
stdout: Option<ChildStdout>,
stderr: Option<ChildStderr>,
}
impl Child {
pub fn id(&self) -> u32 {
self.child.inner.id()
}
pub fn kill(&mut self) -> io::Result<()> {
self.child.kill()
}
pub fn stdin(&mut self) -> &mut Option<ChildStdin> {
&mut self.stdin
}
pub fn stdout(&mut self) -> &mut Option<ChildStdout> {
&mut self.stdout
}
pub fn stderr(&mut self) -> &mut Option<ChildStderr> {
&mut self.stderr
}
pub fn wait_with_output(mut self) -> WaitWithOutput {
drop(self.stdin().take());
let stdout = match self.stdout().take() {
Some(io) => Either::A(read_to_end(io, Vec::new()).map(|p| p.1)),
None => Either::B(ok(Vec::new())),
};
let stderr = match self.stderr().take() {
Some(io) => Either::A(read_to_end(io, Vec::new()).map(|p| p.1)),
None => Either::B(ok(Vec::new())),
};
WaitWithOutput {
inner: Box::new(
self.join3(stdout, stderr)
.map(|(status, stdout, stderr)| Output {
status,
stdout,
stderr,
}),
),
}
}
pub fn forget(mut self) {
self.child.forget();
}
}
impl Future for Child {
type Item = ExitStatus;
type Error = io::Error;
fn poll(&mut self) -> Poll<ExitStatus, io::Error> {
self.child.poll()
}
}
#[must_use = "futures do nothing unless polled"]
pub struct WaitWithOutput {
inner: IoFuture<Output>,
}
impl fmt::Debug for WaitWithOutput {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("WaitWithOutput")
.field("inner", &"..")
.finish()
}
}
impl Future for WaitWithOutput {
type Item = Output;
type Error = io::Error;
fn poll(&mut self) -> Poll<Output, io::Error> {
self.inner.poll()
}
}
#[doc(hidden)]
#[deprecated(note = "renamed to `StatusAsync`", since = "0.2.1")]
pub type StatusAsync2 = StatusAsync;
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct StatusAsync {
inner: Child,
}
impl Future for StatusAsync {
type Item = ExitStatus;
type Error = io::Error;
fn poll(&mut self) -> Poll<ExitStatus, io::Error> {
self.inner.poll()
}
}
#[must_use = "futures do nothing unless polled"]
pub struct OutputAsync {
inner: IoFuture<Output>,
}
impl fmt::Debug for OutputAsync {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("OutputAsync")
.field("inner", &"..")
.finish()
}
}
impl Future for OutputAsync {
type Item = Output;
type Error = io::Error;
fn poll(&mut self) -> Poll<Output, io::Error> {
self.inner.poll()
}
}
#[derive(Debug)]
pub struct ChildStdin {
inner: imp::ChildStdin,
}
#[derive(Debug)]
pub struct ChildStdout {
inner: imp::ChildStdout,
}
#[derive(Debug)]
pub struct ChildStderr {
inner: imp::ChildStderr,
}
impl Write for ChildStdin {
fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
self.inner.write(bytes)
}
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}
impl AsyncWrite for ChildStdin {
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.inner.shutdown()
}
}
impl Read for ChildStdout {
fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
self.inner.read(bytes)
}
}
impl AsyncRead for ChildStdout {}
impl Read for ChildStderr {
fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
self.inner.read(bytes)
}
}
impl AsyncRead for ChildStderr {}
#[cfg(unix)]
mod sys {
use super::{ChildStderr, ChildStdin, ChildStdout};
use std::os::unix::io::{AsRawFd, RawFd};
impl AsRawFd for ChildStdin {
fn as_raw_fd(&self) -> RawFd {
self.inner.get_ref().as_raw_fd()
}
}
impl AsRawFd for ChildStdout {
fn as_raw_fd(&self) -> RawFd {
self.inner.get_ref().as_raw_fd()
}
}
impl AsRawFd for ChildStderr {
fn as_raw_fd(&self) -> RawFd {
self.inner.get_ref().as_raw_fd()
}
}
}
#[cfg(windows)]
mod sys {
use super::{ChildStderr, ChildStdin, ChildStdout};
use std::os::windows::io::{AsRawHandle, RawHandle};
impl AsRawHandle for ChildStdin {
fn as_raw_handle(&self) -> RawHandle {
self.inner.get_ref().as_raw_handle()
}
}
impl AsRawHandle for ChildStdout {
fn as_raw_handle(&self) -> RawHandle {
self.inner.get_ref().as_raw_handle()
}
}
impl AsRawHandle for ChildStderr {
fn as_raw_handle(&self) -> RawHandle {
self.inner.get_ref().as_raw_handle()
}
}
}
#[cfg(test)]
mod test {
use super::ChildDropGuard;
use crate::kill::Kill;
use futures::{Async, Future, Poll};
use std::io;
struct Mock {
num_kills: usize,
num_polls: usize,
poll_result: Poll<(), ()>,
}
impl Mock {
fn new() -> Self {
Self::with_result(Ok(Async::NotReady))
}
fn with_result(result: Poll<(), ()>) -> Self {
Self {
num_kills: 0,
num_polls: 0,
poll_result: result,
}
}
}
impl Kill for Mock {
fn kill(&mut self) -> io::Result<()> {
self.num_kills += 1;
Ok(())
}
}
impl Future for Mock {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.num_polls += 1;
self.poll_result
}
}
#[test]
fn kills_on_drop() {
let mut mock = Mock::new();
{
let guard = ChildDropGuard::new(&mut mock);
drop(guard);
}
assert_eq!(1, mock.num_kills);
assert_eq!(0, mock.num_polls);
}
#[test]
fn no_kill_if_already_killed() {
let mut mock = Mock::new();
{
let mut guard = ChildDropGuard::new(&mut mock);
let _ = guard.kill();
drop(guard);
}
assert_eq!(1, mock.num_kills);
assert_eq!(0, mock.num_polls);
}
#[test]
fn no_kill_if_reaped() {
let mut mock_pending = Mock::with_result(Ok(Async::NotReady));
let mut mock_reaped = Mock::with_result(Ok(Async::Ready(())));
let mut mock_err = Mock::with_result(Err(()));
{
let mut guard = ChildDropGuard::new(&mut mock_pending);
let _ = guard.poll();
let mut guard = ChildDropGuard::new(&mut mock_reaped);
let _ = guard.poll();
let mut guard = ChildDropGuard::new(&mut mock_err);
let _ = guard.poll();
}
assert_eq!(1, mock_pending.num_kills);
assert_eq!(1, mock_pending.num_polls);
assert_eq!(0, mock_reaped.num_kills);
assert_eq!(1, mock_reaped.num_polls);
assert_eq!(1, mock_err.num_kills);
assert_eq!(1, mock_err.num_polls);
}
#[test]
fn no_kill_on_forget() {
let mut mock = Mock::new();
{
let mut guard = ChildDropGuard::new(&mut mock);
guard.forget();
drop(guard);
}
assert_eq!(0, mock.num_kills);
assert_eq!(0, mock.num_polls);
}
}