1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
//! Read available data from file descriptors without blocking
//!
//! Useful for nonblocking reads from sockets, named pipes, and child stdout/stderr
//!
//! # Example
//!
//! ```no_run
//! use std::io::Read;
//! use std::process::{Command, Stdio};
//! use std::time::Duration;
//! use nonblock::NonBlockingReader;
//!
//! let mut child = Command::new("some-exectuable")
//!                         .stdout(Stdio::piped())
//!                         .spawn().unwrap();
//! let stdout = child.stdout.take().unwrap();
//! let mut noblock_stdout = NonBlockingReader::from_fd(stdout).unwrap();
//! while !noblock_stdout.is_eof() {
//!     let mut buf = String::new();
//!     noblock_stdout.read_available_to_string(&mut buf).unwrap();
//!     std::thread::sleep(Duration::from_secs(5));
//! }
extern crate libc;
use std::os::unix::io::{RawFd, AsRawFd};
use std::io::{self, Read, ErrorKind};
use libc::{F_GETFL, F_SETFL, fcntl, O_NONBLOCK};

/// Simple non-blocking wrapper for reader types that implement AsRawFd
pub struct NonBlockingReader<R: AsRawFd + Read> {
    eof: bool,
    reader: R,
}

impl<R: AsRawFd + Read> NonBlockingReader<R> {
    /// Initialize a NonBlockingReader from the reader's file descriptor.
    ///
    /// The reader will be managed internally,
    ///   and O_NONBLOCK will be set the file descriptor.
    pub fn from_fd(reader: R) -> io::Result<NonBlockingReader<R>> {
        let fd = reader.as_raw_fd();
        try!(set_blocking(fd, false));
        Ok(NonBlockingReader {
            reader: reader,
            eof: false,
        })
    }

    /// Consume this NonBlockingReader and return the blocking version
    ///   of the interanally managed reader.
    ///
    /// This will disable O_NONBLOCK on the file descriptor,
    ///   and any data read from the NonBlockingReader before calling `into_blocking`
    ///   will already have been consumed from the reader.
    pub fn into_blocking(self) -> io::Result<R> {
        let fd = self.reader.as_raw_fd();
        try!(set_blocking(fd, true));
        Ok(self.reader)
    }

    /// Indicates if EOF has been reached for the reader.
    ///
    /// Currently this defaults to false until one of the `read_available` methods is called,
    ///   but this may change in the future if I stumble on a compelling way
    ///   to check for EOF without consuming any of the internal reader.
    pub fn is_eof(&self) -> bool {
        self.eof
    }

    /// Reads any available data from the reader without blocking, placing them into `buf`.
    ///
    /// If successful, this function will return the total number of bytes read.
    ///  0 bytes read may indicate the EOF has been reached or that reading
    ///  would block because there is not any data immediately available.
    ///  Call `is_eof()` after this method to determine if EOF was reached.
    ///
    /// ## Errors
    ///
    /// If this function encounters an error of the kind ErrorKind::Interrupted
    ///   then the error is ignored and the operation will continue.
    ///   If it encounters ErrorKind::WouldBlock, then this function immediately returns
    ///   the total number of bytes read so far.
    ///
    /// If any other read error is encountered then this function immediately returns.
    ///   Any bytes which have already been read will be appended to buf.
    pub fn read_available(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
        let mut buf_len = 0;
        loop {
            let mut bytes = [0u8; 1024];
            match self.reader.read(&mut bytes[..]) {
                // EOF
                Ok(0) => {
                    self.eof = true;
                    break;
                }
                // Not EOF, but no more data currently available
                Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
                    self.eof = false;
                    break;
                }
                // Ignore interruptions, continue reading
                Err(ref err) if err.kind() == ErrorKind::Interrupted => {}
                // bytes available
                Ok(len) => {
                    buf_len += len;
                    buf.append(&mut bytes[0..(len)].to_owned())
                }
                // IO Error encountered
                Err(err) => {
                    return Err(err);
                }
            }
        }
        Ok(buf_len)
    }

    /// Reads any available data from the reader without blocking, placing them into `buf`.
    ///
    /// If successful, this function returns the number of bytes which were read and appended to buf.
    ///
    /// This function calls `read_available()` and attempts to convert the data read to a UTF-8 string.
    ///   If the any data read is parsed successfully as a UTF-8 string  it is appended to `buf`.
    ///   If it cannot be parsed as UTF-8, then `buf` will remain unmodified, returning `ErrorKind::InvalidData`
    ///   with the `FromUtf8Error` containing any data that was read.
    ///
    /// In theory, since this function only reads immediately available data,
    ///   There may not be any guarantee that the data immediately available ends
    ///   on a UTF-8 alignment, so it might be worth a bufferred wrapper
    ///   that manages the captures a final non-UTF-8 character and prepends it to the next call,
    ///   but in practice, this has worked as expected.
    pub fn read_available_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
        let mut byte_buf: Vec<u8> = Vec::with_capacity(1024);
        let res = self.read_available(&mut byte_buf);
        match String::from_utf8(byte_buf) {
            Ok(utf8_buf) => {
                // append any read data before returning the `read_available` result
                buf.push_str(&utf8_buf);
                res
            }
            Err(err) => {
                // check for read error before returning the UTF8 Error
                let _ = try!(res);
                Err(io::Error::new(ErrorKind::InvalidData, err))
            }
        }
    }
}


fn set_blocking(fd: RawFd, blocking: bool) -> io::Result<()> {
    let flags = unsafe { fcntl(fd, F_GETFL, 0) };
    if flags < 0 {
        return Err(io::Error::last_os_error());
    }

    let flags = if blocking {
        flags & !O_NONBLOCK
    } else {
        flags | O_NONBLOCK
    };
    let res = unsafe { fcntl(fd, F_SETFL, flags) };
    if res != 0 {
        return Err(io::Error::last_os_error());
    }

    Ok(())
}


#[cfg(test)]
mod tests {
    use super::NonBlockingReader;
    use std::sync::mpsc::channel;
    use std::net::{TcpListener, TcpStream};
    use std::thread;
    use std::io::Write;

    #[test]
    fn it_works() {
        let server = TcpListener::bind("127.0.0.1:34254").unwrap();
        let (tx, rx) = channel();

        thread::spawn(move || {
            let (stream, _) = server.accept().unwrap();
            tx.send(stream).unwrap();
        });

        let client = TcpStream::connect("127.0.0.1:34254").unwrap();;
        let mut stream = rx.recv().unwrap();

        let mut nonblocking = NonBlockingReader::from_fd(client).unwrap();
        let mut buf = Vec::new();

        assert_eq!(nonblocking.read_available(&mut buf).unwrap(), 0);
        assert_eq!(buf, b"");

        stream.write(b"foo").unwrap();
        assert_eq!(nonblocking.read_available(&mut buf).unwrap(), 3);
        assert_eq!(buf, b"foo");
    }
}