#![deny(missing_docs)]
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
use futures::future::{Either, Loop};
use futures::sink::Send;
use futures::sync::{mpsc, oneshot};
use std::io::{self, Error, ErrorKind, Read, Write};
use std::thread;
use tokio_core::reactor::Core;
type StdioPacket = Result<Vec<u8>, Error>;
struct StdinState<'a> {
first: bool,
buf: [u8; 512],
stdin_lock: io::StdinLock<'a>,
tx_stdin_data: Either<mpsc::Sender<StdioPacket>, Send<mpsc::Sender<StdioPacket>>>,
rx_stdin_stop: oneshot::Receiver<()>,
}
pub fn borrow_stdio<F, T>(f: F) -> Result<T, Error>
where F: FnOnce(StdinStream, StdoutSink) -> Result<T, Error>
{
let (tx_stdin_data, rx_stdin_data) = mpsc::channel(1);
let (tx_stdin_stop, rx_stdin_stop) = oneshot::channel::<()>();
thread::spawn(move || {
let mut core = Core::new().expect("couldn't create core");
let stdin = io::stdin();
let stdin_lock = stdin.lock();
let state = StdinState {
first: true,
buf: [0u8; 512],
stdin_lock: stdin_lock,
tx_stdin_data: Either::A(tx_stdin_data),
rx_stdin_stop: rx_stdin_stop,
};
let read_blocking = futures::future::loop_fn(state, |mut state| {
if state.first {
state.first = false;
return Ok(Loop::Continue(state));
}
if let Ok(Async::NotReady) = state.rx_stdin_stop.poll() {
} else {
return Ok(Loop::Break(()));
}
let tx_stdin_data = match state.tx_stdin_data {
Either::A(tx) => tx,
Either::B(mut send) => match send.poll() {
Ok(Async::NotReady) => {
state.tx_stdin_data = Either::B(send);
return Ok(Loop::Continue(state));
}
Ok(Async::Ready(tx)) => tx,
Err(e) => return Err(e),
},
};
let msg = match state.stdin_lock.read(&mut state.buf[..]) {
Ok(n) => Ok((&state.buf[..n]).to_owned()),
Err(e) => Err(e),
};
state.tx_stdin_data = Either::B(tx_stdin_data.send(msg));
Ok(Loop::Continue(state))
});
if let Err(e) = core.run(read_blocking) {
eprintln!("error reading stdin in background thread: {}", e);
}
});
let wrap_stdin = StdinStream(rx_stdin_data);
let (tx_stdout_cmd, rx_stdout_cmd) = mpsc::channel::<Option<Vec<u8>>>(8);
thread::spawn(move || {
let stdout = io::stdout();
let mut stdout_lock = stdout.lock();
let rx = rx_stdout_cmd.wait();
for item in rx {
match item {
Ok(None) => break,
Ok(Some(data)) => {
let _r = stdout_lock.write(&data);
let _r = stdout_lock.flush();
},
Err(_) => {},
};
}
});
let wrap_stdout = StdoutSink(tx_stdout_cmd.clone());
let r = f(wrap_stdin, wrap_stdout);
let _r = tx_stdin_stop.send(()); let _r = tx_stdout_cmd.send(None);
r
}
#[derive(Debug)]
pub struct StdinStream(mpsc::Receiver<StdioPacket>);
impl Stream for StdinStream {
type Item = Vec<u8>;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Vec<u8>>, Error> {
match self.0.poll() {
Err(()) => Err(ErrorKind::Other.into()),
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::Ready(Some(io_res))) => {
match io_res {
Ok(data) => {
if data.len() == 0 {
Ok(Async::Ready(None))
} else {
Ok(Async::Ready(Some(data)))
}
},
Err(e) => {
if e.kind() == ErrorKind::UnexpectedEof {
Ok(Async::Ready(None))
} else {
Err(e)
}
}
}
}
}
}
}
#[derive(Debug)]
pub struct StdoutSink(mpsc::Sender<Option<Vec<u8>>>);
impl Sink for StdoutSink {
type SinkItem = Vec<u8>;
type SinkError = Error;
fn start_send(&mut self, item: Vec<u8>) -> StartSend<Vec<u8>, Error> {
match self.0.start_send(Some(item)) {
Err(_) => Err(io::ErrorKind::Other.into()),
Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready),
Ok(AsyncSink::NotReady(o)) => Ok(AsyncSink::NotReady(o.unwrap())),
}
}
fn poll_complete(&mut self) -> Poll<(), Error> {
match self.0.poll_complete() {
Err(_) => Err(io::ErrorKind::Other.into()),
Ok(x) => Ok(x),
}
}
}