1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
use std::io::{self, Read};
use std::sync::mpsc;
use std::thread;
use std::thread::JoinHandle;

/// Wrap a blocking reader in a non-blocking [`AsyncReader`] backed by a dedicated thread.
///
/// A new thread is spawned which pulls `readable` one byte at a time and pushes every result
/// (a byte or an I/O error) into a bounded mpsc channel. The returned [`AsyncReader`] drains
/// that channel without ever blocking the calling thread.
///
/// `buf` is the capacity of the channel, i.e. how many pending results may be queued before the
/// reader thread has to wait for the consumer to catch up.
///
/// The reader thread finishes when `readable` is exhausted, or the next time it tries to queue
/// a result after the returned [`AsyncReader`] has been dropped.
pub fn read_async(readable: impl Read + Send + 'static, buf: usize) -> AsyncReader {
    let (tx, rx) = mpsc::sync_channel(buf);

    let worker = thread::spawn(move || {
        // `send` only fails once the receiving side is gone; `try_for_each` stops at that first
        // failure, and there is nobody left to report it to, so the result is discarded.
        let _ = readable.bytes().try_for_each(|item| tx.send(item));
    });

    AsyncReader {
        recv: rx,
        handle: worker,
    }
}

/// A non-blocking reader fed by a background thread.
///
/// This acts as any other stream, with the exception that reading from it won't block. Each
/// read copies only the bytes that are already queued, so the destination buffer may be filled
/// only partially (or not at all).
#[derive(Debug)]
pub struct AsyncReader {
    /// Receiving end of the channel that carries one read result per byte.
    recv: mpsc::Receiver<io::Result<u8>>,
    /// Handle of the thread that feeds the channel.
    handle: JoinHandle<()>,
}

#[cfg(feature = "tokio-io")]
use tokio_io::AsyncRead;
// With the `tokio-io` feature enabled, expose `AsyncReader` as a tokio `AsyncRead`. The impl
// body is empty, so only the methods the trait itself provides are used.
// NOTE(review): the `Read` impl returns `Ok(0)` rather than a `WouldBlock` error when nothing
// is queued; presumably `AsyncRead` consumers expect `WouldBlock` — confirm against tokio-io.
#[cfg(feature = "tokio-io")]
impl AsyncRead for AsyncReader {}
impl Read for AsyncReader {
    /// Copy the bytes that are already queued into `buf` without blocking.
    ///
    /// Returns `Ok(n)` with the number of bytes copied. `n` may be smaller than `buf.len()`;
    /// `Ok(0)` means that nothing was queued at the moment (or that `buf` is empty).
    ///
    /// # Errors
    ///
    /// * An I/O error forwarded from the underlying reader is returned as-is. Bytes copied
    ///   earlier in the same call are not reported in that case.
    /// * [`io::ErrorKind::BrokenPipe`] once the sending side is gone and every queued item has
    ///   been consumed.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut total = 0;

        for slot in buf.iter_mut() {
            match self.recv.try_recv() {
                Ok(Ok(b)) => {
                    *slot = b;
                    total += 1;
                }
                Ok(Err(e)) => return Err(e),
                Err(mpsc::TryRecvError::Empty) => break,
                Err(mpsc::TryRecvError::Disconnected) => {
                    // Bytes copied during this call must reach the caller first; the
                    // disconnect is reported by the next call, which starts with `total == 0`.
                    if total > 0 {
                        break;
                    }
                    warn!("Receiver disconnected");
                    return Err(io::Error::from(io::ErrorKind::BrokenPipe));
                }
            }
        }

        Ok(total)
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use std::io::Read;
    use std::time::Duration;

    /// In-memory test reader that hands out its bytes front to back.
    struct R(Vec<u8>);

    impl Read for R {
        /// Move up to `buf.len()` bytes from the front of the vector into `buf`.
        ///
        /// Returns the number of bytes moved, which is `Ok(0)` once the vector is empty.
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            // Clamp to what is left: `Vec::drain` panics if the range end exceeds the length.
            let count = buf.len().min(self.0.len());
            for (slot, byte) in buf.iter_mut().zip(self.0.drain(..count)) {
                *slot = byte;
            }
            Ok(count)
        }
    }

    /// Bytes produced by the wrapped reader come out of the `AsyncReader` in order.
    #[test]
    fn test_spawned_reader() {
        let r = R(vec![1u8, 2u8, 3u8]);
        let mut reader = read_async(r, 5);

        // The reader thread starts asynchronously and `read` never blocks, so poll until all
        // bytes have arrived instead of sleeping for a fixed time and hoping it was enough.
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        let mut received = Vec::new();
        let mut byte = [0u8; 1];
        while received.len() < 3 {
            match reader.read(&mut byte) {
                Ok(0) => {
                    assert!(
                        std::time::Instant::now() < deadline,
                        "timed out waiting for the reader thread"
                    );
                    thread::sleep(Duration::from_millis(1));
                }
                Ok(_) => received.push(byte[0]),
                Err(e) => panic!("unexpected read error: {}", e),
            }
        }

        assert_eq!(received, [1u8, 2u8, 3u8]);
    }
}