#[macro_use]
extern crate log;
use std::io;
use std::io::prelude::*;
use std::io::{BufReader};
use std::fs::File;
use std::process::{ChildStdout, Command, Stdio, ExitStatus};
use std::sync::Mutex;
use std::ops::DerefMut;
use std::os::unix::io::FromRawFd;
use std::os::unix::io::AsRawFd;
use std::sync::mpsc;
/// Describes what should happen to one of a child process's output streams.
pub enum Output {
    /// Leave the stream unconfigured on the `Command`.
    Parent,
    /// Send the stream to the null device.
    Ignore,
    /// Send the stream to the given open file.
    ToFd(File),
}
/// Pairs the handling chosen for a child's stderr with the handling chosen
/// for its stdout.
pub struct DealWithOutput {
    /// What to do with the child's standard error.
    stderr: Output,
    /// What to do with the child's standard output.
    stdout: Output,
}
/// Creates a [`DealWithOutput`] in which both stderr and stdout are set to
/// [`Output::Parent`].
pub fn output() -> DealWithOutput {
    DealWithOutput {
        stderr: Output::Parent,
        stdout: Output::Parent,
    }
}
impl DealWithOutput {
    /// Replaces the stderr handling and returns `self` so calls can be chained.
    pub fn stderr(&mut self, stderr: Output) -> &mut Self {
        self.stderr = stderr;
        self
    }

    /// Replaces the stdout handling and returns `self` so calls can be chained.
    pub fn stdout(&mut self, stdout: Output) -> &mut Self {
        self.stdout = stdout;
        self
    }
}
/// Spawns the command described by `cmd_args`, streams all of `input` into
/// its standard input, and waits for it to exit.
///
/// The child's stderr and stdout are configured from `deal_with`.
///
/// # Errors
///
/// Returns an error if configuring the output streams, spawning the child,
/// copying `input` to the child, or waiting for the child fails. When the
/// copy fails, the child is still waited for before the copy error is
/// returned, so the process is reaped rather than left behind.
///
/// # Panics
///
/// Panics if the spawned child has no stdin handle, which should not happen
/// because stdin is always configured as a pipe here.
pub fn process_read_consumer<R: Read>(
    deal_with: &mut DealWithOutput,
    mut input: R,
    cmd_args: (String, Vec<String>),
) -> io::Result<ExitStatus> {
    let mut cmd = build_command(cmd_args);
    cmd.stdin(Stdio::piped());
    setup_stderr(&deal_with.stderr, &mut cmd)?;
    setup_stdout(&deal_with.stdout, &mut cmd)?;
    let mut process = cmd.spawn()?;

    let mut stdin = process.stdin.take().expect("impossible! no stdin");
    let copy_result = io::copy(&mut input, &mut stdin);
    // Close our end of the pipe so the child sees end-of-input; `wait` cannot
    // do this for us because the handle was taken out of `process`.
    drop(stdin);

    // Always wait, even when the copy failed: a `Child` that is dropped
    // without being waited on is never reaped.
    let wait_result = process.wait();

    // A copy failure takes precedence over whatever `wait` reported.
    copy_result?;
    wait_result
}
/// Spawns the command described by `cmd_args` and returns a [`ChildStream`]
/// for reading its standard output and later collecting its exit result.
///
/// * `stdin_opt` - when `Some`, the child's stdin is a pipe and a background
///   thread copies the reader into it; when `None`, stdin is not configured
///   on the `Command`.
/// * `stderr` - how the child's stderr is handled.
/// * `cmd_args` - the executable and its arguments.
///
/// A second background thread waits for the child. Both threads report over
/// one channel: the waiter sends the exit status or a `WaitError`, and the
/// stdin feeder sends a `StdinError` only if it fails or panics.
///
/// # Errors
///
/// Returns an error if configuring stderr or spawning the child fails.
///
/// # Panics
///
/// Panics if the spawned child lacks the stdout (or, when requested, stdin)
/// handle that was configured as a pipe.
pub fn process_as_reader<R>(
    stdin_opt: Option<R>,
    stderr: Output,
    cmd_args: (String, Vec<String>),
) -> io::Result<ChildStream>
where
    R: Read + Send + 'static,
{
    let mut cmd = build_command(cmd_args);
    cmd.stdout(Stdio::piped());
    if stdin_opt.is_some() {
        cmd.stdin(Stdio::piped());
    }
    setup_stderr(&stderr, &mut cmd)?;

    let mut child = cmd.spawn()?;
    let child_stdout = child.stdout.take().expect("impossible! no stdout");
    let (result_tx, result_rx) = mpsc::channel();

    if let Some(input) = stdin_opt {
        let mut child_stdin = child.stdin.take().expect("impossible! no stdin");
        // The reader sits behind a `Mutex` and is locked inside the worker
        // closure rather than being captured bare.
        let guarded_input = Mutex::new(input);
        let stdin_tx = result_tx.clone();
        let on_stdin_done = move |outcome| match outcome {
            // Outer `Err`: the feeder panicked. Inner `Err`: the copy failed.
            Err(err) | Ok(Err(err)) => {
                send_or_log_result(stdin_tx, Err(ProcessAsyncError::StdinError(err)))
            }
            Ok(Ok(_)) => {}
        };
        concurrent::spawn_catch_panic(on_stdin_done, move || {
            let mut source = guarded_input.lock().expect("error locking stdin");
            io::copy(source.deref_mut(), &mut child_stdin)?;
            Ok(())
        });
    }

    let on_wait_done = move |outcome| match outcome {
        // Outer `Err`: the waiter panicked. Inner `Err`: `wait` itself failed.
        Err(err) | Ok(Err(err)) => {
            send_or_log_result(result_tx, Err(ProcessAsyncError::WaitError(err)))
        }
        Ok(Ok(status)) => send_or_log_result(result_tx, Ok(status)),
    };
    concurrent::spawn_catch_panic(on_wait_done, move || child.wait());

    Ok(ChildStream {
        stdout: BufReader::new(child_stdout),
        wait_result: FutureExitResult::new(result_rx),
    })
}
/// Failures that can be reported when collecting the result of a child
/// process that is being driven from background threads.
#[derive(Debug)]
pub enum ProcessAsyncError {
    /// Receiving from the result channel failed.
    RecvError(mpsc::RecvError),
    /// Waiting for the child failed.
    WaitError(io::Error),
    /// Feeding the child's standard input failed.
    StdinError(io::Error),
    /// The child finished unsuccessfully; carries its exit code, if any.
    ExitStatusError(Option<i32>),
    /// The result was requested more than once.
    AlreadyResolvedError,
}

impl std::fmt::Display for ProcessAsyncError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match *self {
            ProcessAsyncError::RecvError(ref err) => {
                write!(f, "failed to receive the child's result: {}", err)
            }
            ProcessAsyncError::WaitError(ref err) => {
                write!(f, "failed to wait for the child: {}", err)
            }
            ProcessAsyncError::StdinError(ref err) => {
                write!(f, "failed to feed the child's stdin: {}", err)
            }
            ProcessAsyncError::ExitStatusError(Some(code)) => {
                write!(f, "child exited with status code {}", code)
            }
            ProcessAsyncError::ExitStatusError(None) => {
                write!(f, "child terminated without an exit code")
            }
            ProcessAsyncError::AlreadyResolvedError => {
                write!(f, "the child's result was already retrieved")
            }
        }
    }
}

impl std::error::Error for ProcessAsyncError {
    /// Exposes the wrapped error for the variants that carry one.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match *self {
            ProcessAsyncError::RecvError(ref err) => Some(err),
            ProcessAsyncError::WaitError(ref err) | ProcessAsyncError::StdinError(ref err) => {
                Some(err)
            }
            ProcessAsyncError::ExitStatusError(_) | ProcessAsyncError::AlreadyResolvedError => {
                None
            }
        }
    }
}
/// The message type carried by the result channel: the child's exit status,
/// or the error that prevented obtaining it.
// NOTE(review): the alias name is misspelled ("Proecess"); it is left as-is
// here because other items in this file refer to it by this name.
type ProecessAsyncResult = Result<ExitStatus, ProcessAsyncError>;
/// A one-shot handle on the eventual result of a child process, backed by a
/// channel receiver.
pub struct FutureExitResult {
    /// Channel on which the result is delivered.
    recv: mpsc::Receiver<ProecessAsyncResult>,
    /// Set once the result has been requested.
    already: bool,
}
impl FutureExitResult {
    /// Wraps `receiver` in a handle whose result has not been requested yet.
    fn new(receiver: mpsc::Receiver<ProecessAsyncResult>) -> Self {
        FutureExitResult {
            recv: receiver,
            already: false,
        }
    }

    /// Blocks until a message arrives on the channel and returns it.
    ///
    /// Only the first call receives; any later call returns
    /// `AlreadyResolvedError` without touching the channel. A failed receive
    /// is reported as `RecvError`.
    fn exit_status(&mut self) -> ProecessAsyncResult {
        if self.already {
            return Err(ProcessAsyncError::AlreadyResolvedError);
        }
        self.already = true;
        // `?` handles the receive failure; the received message is itself the
        // result to hand back.
        self.recv.recv().map_err(ProcessAsyncError::RecvError)?
    }

    /// Blocks for the child's result and converts an unsuccessful exit into
    /// an error.
    ///
    /// Returns the exit code when the status reports success; otherwise
    /// returns `ExitStatusError` carrying the exit code. Errors from
    /// obtaining the status are passed through unchanged.
    pub fn wait(&mut self) -> Result<Option<i32>, ProcessAsyncError> {
        let status = self.exit_status()?;
        let code = status.code();
        if !status.success() {
            return Err(ProcessAsyncError::ExitStatusError(code));
        }
        Ok(code)
    }
}
/// A running child process seen as a readable stream plus a handle on its
/// eventual exit result.
pub struct ChildStream {
    /// Buffered reader over the child's standard output.
    pub stdout: BufReader<ChildStdout>,
    /// Handle used to collect the child's exit result.
    pub wait_result: FutureExitResult,
}
impl ChildStream {
    /// Collects the child's exit result by delegating to
    /// [`FutureExitResult::wait`] on `wait_result`.
    pub fn wait(&mut self) -> Result<Option<i32>, ProcessAsyncError> {
        let pending = &mut self.wait_result;
        pending.wait()
    }
}
fn send_or_log_result<T>(sender: mpsc::Sender<Result<T, ProcessAsyncError>>, result: Result<T, ProcessAsyncError>){
match sender.send(result) {
Ok(_) => {}
Err(err) => error!("error sending done message: {}", err),
}
}
fn setup_stderr(deal_with_stderr: &Output, cmd: &mut Command) -> io::Result<()> {
match deal_with_stderr {
&Output::Parent => {}
&Output::Ignore => {
cmd.stderr(Stdio::null());
}
&Output::ToFd(ref file) => {
unsafe {
cmd.stderr(Stdio::from_raw_fd(file.as_raw_fd()));
}
}
}
Ok(())
}
fn setup_stdout(deal_with_stdout: &Output, cmd: &mut Command) -> io::Result<()> {
match deal_with_stdout {
&Output::Parent => {}
&Output::Ignore => {
cmd.stdout(Stdio::null());
}
&Output::ToFd(ref file) => {
unsafe {
cmd.stdout(Stdio::from_raw_fd(file.as_raw_fd()));
}
}
}
Ok(())
}
/// Builds a `Command` from an `(executable, arguments)` pair, passing the
/// arguments in the order given.
fn build_command(cmd_args: (String, Vec<String>)) -> Command {
    let (program, arguments) = cmd_args;
    let mut command = Command::new(program);
    command.args(&arguments);
    command
}
/// Helpers for running work on a background thread and reporting its
/// outcome, including a panic, to a completion callback.
mod concurrent {
    use std::any::Any;
    use std::io;
    use std::io::{Error, ErrorKind};
    use std::panic;
    use std::thread;
    use std::thread::JoinHandle;

    /// Converts a caught panic payload into an `io::Error` of kind `Other`.
    ///
    /// The error message is the payload itself when it is a `&'static str`
    /// or a `String`, and the fixed text `"Box<Any>"` for any other payload.
    pub fn caught_panic_to_io_error(err: Box<dyn Any + Send + 'static>) -> io::Error {
        let msg = if let Some(text) = err.downcast_ref::<&'static str>() {
            *text
        } else if let Some(text) = err.downcast_ref::<String>() {
            &text[..]
        } else {
            "Box<Any>"
        };
        Error::new(ErrorKind::Other, msg)
    }

    /// Runs `f` on a new thread and passes its outcome to `done` on that
    /// same thread.
    ///
    /// `done` receives `Ok(value)` when `f` returns `value`, or `Err(..)`
    /// built by [`caught_panic_to_io_error`] when `f` panics. Only `f` runs
    /// under `catch_unwind`; `done` is called outside it.
    pub fn spawn_catch_panic<Function, Returned, Finished>(
        done: Finished,
        f: Function,
    ) -> JoinHandle<()>
    where
        Function: FnOnce() -> Returned,
        Function: Send + 'static,
        Function: panic::UnwindSafe,
        Finished: FnOnce(io::Result<Returned>),
        Finished: Send + 'static,
    {
        thread::spawn(move || {
            let outcome = panic::catch_unwind(f).map_err(caught_panic_to_io_error);
            done(outcome)
        })
    }
}