mio 0.6.21

Lightweight non-blocking IO
Documentation
use {TryRead, TryWrite};
use std::mem;
use mio::*;
use std::io;
use mio::deprecated::{EventLoop, Handler};
use mio::deprecated::unix::{PipeReader, PipeWriter};
use std::process::{Command, Stdio, Child};


struct SubprocessClient {
    stdin: Option<PipeWriter>,
    stdout: Option<PipeReader>,
    stderr: Option<PipeReader>,
    stdin_token : Token,
    stdout_token : Token,
    stderr_token : Token,
    output : Vec<u8>,
    output_stderr : Vec<u8>,
    input : Vec<u8>,
    input_offset : usize,
    buf : [u8; 65536],
}


// Sends a message and expects to receive the same exact message, one at a time
impl SubprocessClient {
    fn new(stdin: Option<PipeWriter>, stdout : Option<PipeReader>, stderr : Option<PipeReader>, data : &[u8]) -> SubprocessClient {
        SubprocessClient {
            stdin: stdin,
            stdout: stdout,
            stderr: stderr,
            stdin_token : Token(0),
            stdout_token : Token(1),
            stderr_token : Token(2),
            output : Vec::<u8>::new(),
            output_stderr : Vec::<u8>::new(),
            buf : [0; 65536],
            input : data.to_vec(),
            input_offset : 0,
        }
    }

    fn readable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
        let mut eof = false;
        match self.stdout {
            None => unreachable!(),
            Some (ref mut stdout) => match stdout.try_read(&mut self.buf[..]) {
                Ok(None) => {
                }
                Ok(Some(r)) => {
                    if r == 0 {
                        eof = true;
                    } else {
                         self.output.extend(&self.buf[0..r]);
                    }
                }
                Err(e) => {
                    return Err(e);
                }
            }
        };
        if eof {
            drop(self.stdout.take());
            match self.stderr {
                None => event_loop.shutdown(),
                Some(_) => {},
            }
        }
        return Ok(());
    }

    fn readable_stderr(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
        let mut eof = false;
        match self.stderr {
            None => unreachable!(),
            Some(ref mut stderr) => match stderr.try_read(&mut self.buf[..]) {
                Ok(None) => {
                }
                Ok(Some(r)) => {
                    if r == 0 {
                        eof = true;
                    } else {
                        self.output_stderr.extend(&self.buf[0..r]);
                    }
                }
                Err(e) => {
                    return Err(e);
                }
            }
        };
        if eof {
                        drop(self.stderr.take());
                        match self.stdout {
                            None => event_loop.shutdown(),
                            Some(_) => {},
                        }
        }
        return Ok(());
    }

    fn writable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
        let mut ok = true;
        match self.stdin {
            None => unreachable!(),
            Some(ref mut stdin) => match stdin.try_write(&(&self.input)[self.input_offset..]) {
                Ok(None) => {
                },
                Ok(Some(r)) => {
                    if r == 0 {
                        ok = false;
                    } else {
                        self.input_offset += r;
                    }
                },
                Err(_) => {
                    ok = false;
                },
            }
        }
        if self.input_offset == self.input.len() || !ok {
            drop(self.stdin.take());
            match self.stderr {
                None => match self.stdout {
                            None => event_loop.shutdown(),
                            Some(_) => {},
                },
                Some(_) => {},
            }
        }
        return Ok(());
    }

}

impl Handler for SubprocessClient {
    type Timeout = usize;
    type Message = ();

    fn ready(&mut self, event_loop: &mut EventLoop<SubprocessClient>, token: Token,
             _: Ready) {
        if token == self.stderr_token {
            let _x = self.readable_stderr(event_loop);
        } else {
            let _x = self.readable(event_loop);
        }
        if token == self.stdin_token {
            let _y = self.writable(event_loop);
        }
    }
}




const TEST_DATA : [u8; 1024 * 4096] = [42; 1024 * 4096];
pub fn subprocess_communicate(mut process : Child, input : &[u8]) -> (Vec<u8>, Vec<u8>) {
    let mut event_loop = EventLoop::<SubprocessClient>::new().unwrap();
    let stdin : Option<PipeWriter>;
    let stdin_exists : bool;
    match process.stdin {
      None => stdin_exists = false,
      Some(_) => stdin_exists = true,
    }
    if stdin_exists {
        match PipeWriter::from_stdin(process.stdin.take().unwrap()) {
            Err(e) => panic!(e),
            Ok(pipe) => stdin = Some(pipe),
        }
    } else {
        stdin = None;
    }
    let stdout_exists : bool;
    let stdout : Option<PipeReader>;
    match process.stdout {
      None => stdout_exists = false,
      Some(_) => stdout_exists = true,
    }
    if stdout_exists {
        match PipeReader::from_stdout(process.stdout.take().unwrap()) {
            Err(e) => panic!(e),
            Ok(pipe) => stdout = Some(pipe),
        }
    } else {
        stdout = None;
    }
    let stderr_exists : bool;
    let stderr : Option<PipeReader>;
    match process.stderr {
      None => stderr_exists = false,
      Some(_) => stderr_exists = true,
    }
    if stderr_exists {
        match PipeReader::from_stderr(process.stderr.take().unwrap()) {
              Err(e) => panic!(e),
              Ok(pipe) => stderr = Some(pipe),
        }
    } else {
        stderr = None
    }

    let mut subprocess = SubprocessClient::new(stdin,
                                               stdout,
                                               stderr,
                                               input);
    match subprocess.stdout {
       Some(ref sub_stdout) => event_loop.register(sub_stdout, subprocess.stdout_token, Ready::readable(),
                                                   PollOpt::level()).unwrap(),
       None => {},
    }

    match subprocess.stderr {
        Some(ref sub_stderr) => event_loop.register(sub_stderr, subprocess.stderr_token, Ready::readable(),
                        PollOpt::level()).unwrap(),
        None => {},
    }

    // Connect to the server
    match subprocess.stdin {
        Some (ref sub_stdin) => event_loop.register(sub_stdin, subprocess.stdin_token, Ready::writable(),
                        PollOpt::level()).unwrap(),
         None => {},
    }

    // Start the event loop
    event_loop.run(&mut subprocess).unwrap();
    let _ = process.wait();

    let ret_stdout = mem::replace(&mut subprocess.output, Vec::<u8>::new());
    let ret_stderr = mem::replace(&mut subprocess.output_stderr, Vec::<u8>::new());
    return (ret_stdout, ret_stderr);
}

#[test]
fn test_subprocess_pipe() {
    let process =
           Command::new("/bin/cat")
           .stdin(Stdio::piped())
           .stdout(Stdio::piped())
           .stderr(Stdio::piped())
           .spawn().unwrap();
     let (ret_stdout, ret_stderr) = subprocess_communicate(process, &TEST_DATA[..]);
     assert_eq!(TEST_DATA.len(), ret_stdout.len());
     assert_eq!(0usize, ret_stderr.len());
     let mut i : usize = 0;
     for item in TEST_DATA.iter() {
         assert_eq!(*item, ret_stdout[i]);
         i += 1;
     }
}