use futures::{Async, Stream};
use std::error;
use std::io::{self, Read};
use std::sync::mpsc as std_mpsc;
use std::thread;
use sync::mpsc as fibers_mpsc;
macro_rules! break_if_err {
($e:expr) => {
match $e {
Err(_) => break,
Ok(v) => v,
}
};
}
pub fn stdin() -> Stdin {
let (req_tx, req_rx) = std_mpsc::channel();
let (res_tx, res_rx) = fibers_mpsc::channel();
thread::spawn(move || {
while let Ok(x) = req_rx.recv() {
assert_eq!(x, 0);
let stdin = io::stdin();
let mut locked_stdin = stdin.lock();
let _ = break_if_err!(locked_stdin.read(&mut []));
break_if_err!(res_tx.send(Ok(Vec::new())));
let required_size = break_if_err!(req_rx.recv());
let mut buf = vec![0; required_size];
let result = locked_stdin.read(&mut buf).map(|read_size| {
buf.truncate(read_size);
buf
});
break_if_err!(res_tx.send(result));
}
});
Stdin {
lock_requested: false,
req_tx,
res_rx,
}
}
#[derive(Debug)]
pub struct Stdin {
lock_requested: bool,
req_tx: std_mpsc::Sender<usize>,
res_rx: fibers_mpsc::Receiver<io::Result<Vec<u8>>>,
}
impl Read for Stdin {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if !self.lock_requested {
self.lock_requested = true;
self.req_tx.send(0).map_err(into_io_error)?;
return Err(would_block());
}
match self.res_rx.poll().expect("unreachable") {
Async::NotReady => Err(would_block()),
Async::Ready(None) => Err(unexpected_eof()),
Async::Ready(Some(Err(e))) => Err(e),
Async::Ready(Some(Ok(_))) => {
self.lock_requested = false;
self.req_tx.send(buf.len()).map_err(into_io_error)?;
loop {
if let Async::Ready(result) = self.res_rx.poll().expect("unreachable") {
let result = if let Some(result) = result {
result
} else {
return Err(unexpected_eof());
};
return match result {
Err(e) => Err(e),
Ok(data) => {
let read_size = data.len();
buf[..read_size].copy_from_slice(&data[..]);
Ok(read_size)
}
};
}
}
}
}
}
}
fn would_block() -> io::Error {
io::Error::new(io::ErrorKind::WouldBlock, "I/O operation would block")
}
fn unexpected_eof() -> io::Error {
io::Error::new(
io::ErrorKind::UnexpectedEof,
"I/O thread unexpectedly terminated",
)
}
fn into_io_error<E: error::Error + Send + Sync + 'static>(error: E) -> io::Error {
io::Error::new(io::ErrorKind::Other, Box::new(error))
}