bufstream/
lib.rs

1// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution and at
3// http://rust-lang.org/COPYRIGHT.
4//
5// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8// option. This file may not be copied, modified, or distributed
9// except according to those terms.
10
11//! A crate for separately buffered streams.
12//!
13//! This crate provides a `BufStream` type which provides buffering of both the
14//! reading and writing halves of a `Read + Write` type. Each half is completely
15//! independently buffered of the other, which may not always be desired. For
16//! example `BufStream<File>` may have surprising semantics.
17//!
18//! # Usage
19//!
20//! ```toml
21//! [dependencies]
22//! bufstream = "0.1"
23//! ```
24//!
25//! ```no_run
26//! use std::io::prelude::*;
27//! use std::net::TcpStream;
28//! use bufstream::BufStream;
29//!
30//!
31//! let stream = TcpStream::connect("localhost:4000").unwrap();
32//! let mut buf = BufStream::new(stream);
33//! buf.read(&mut [0; 1024]).unwrap();
34//! buf.write(&[0; 1024]).unwrap();
35//! ```
36//!
37//! # Async I/O
38//!
39//! This crate optionally can support async I/O streams with the [Tokio stack] via
40//! the `tokio` feature of this crate:
41//!
42//! [Tokio stack]: https://tokio.rs/
43//!
44//! ```toml
45//! bufstream = { version = "0.2", features = ["tokio"] }
46//! ```
47//!
48//! All methods are internally capable of working with streams that may return
49//! [`ErrorKind::WouldBlock`] when they're not ready to perform the particular
50//! operation.
51//!
52//! [`ErrorKind::WouldBlock`]: https://doc.rust-lang.org/std/io/enum.ErrorKind.html
53//!
54//! Note that care needs to be taken when using these objects, however. The
55//! Tokio runtime, in particular, requires that data is fully flushed before
56//! dropping streams. For compatibility with blocking streams all streams are
57//! flushed/written when they are dropped, and this is not always a suitable
58//! time to perform I/O. If I/O streams are flushed before drop, however, then
59//! these operations will be a noop.
60
61#[cfg(feature = "tokio")] extern crate futures;
62#[cfg(feature = "tokio")] #[macro_use] extern crate tokio_io;
63
64use std::fmt;
65use std::io::prelude::*;
66use std::io::{self, BufReader, BufWriter};
67use std::error;
68
69#[cfg(feature = "tokio")] use futures::Poll;
70#[cfg(feature = "tokio")] use tokio_io::{AsyncRead, AsyncWrite};
71
72const DEFAULT_BUF_SIZE: usize = 8 * 1024;
73
74/// Wraps a Stream and buffers input and output to and from it.
75///
76/// It can be excessively inefficient to work directly with a `Read+Write`. For
77/// example, every call to `read` or `write` on `TcpStream` results in a system
78/// call. A `BufStream` keeps in memory buffers of data, making large,
79/// infrequent calls to `read` and `write` on the underlying `Read+Write`.
80///
81/// The output buffer will be written out when this stream is dropped.
82#[derive(Debug)]
83pub struct BufStream<S: Write> {
84    inner: BufReader<InternalBufWriter<S>>
85}
86
87/// An error returned by `into_inner` which combines an error that
88/// happened while writing out the buffer, and the buffered writer object
89/// which may be used to recover from the condition.
90#[derive(Debug)]
91pub struct IntoInnerError<W>(W, io::Error);
92
93impl<W> IntoInnerError<W> {
94    /// Returns the error which caused the call to `into_inner()` to fail.
95    ///
96    /// This error was returned when attempting to write the internal buffer.
97    pub fn error(&self) -> &io::Error { &self.1 }
98    /// Returns the buffered writer instance which generated the error.
99    ///
100    /// The returned object can be used for error recovery, such as
101    /// re-inspecting the buffer.
102    pub fn into_inner(self) -> W { self.0 }
103}
104
105impl<W> From<IntoInnerError<W>> for io::Error {
106    fn from(iie: IntoInnerError<W>) -> io::Error { iie.1 }
107}
108
109impl<W: fmt::Debug> error::Error for IntoInnerError<W> {
110    fn description(&self) -> &str {
111        error::Error::description(self.error())
112    }
113}
114
115impl<W> fmt::Display for IntoInnerError<W> {
116    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
117        self.error().fmt(f)
118    }
119}
120
121struct InternalBufWriter<W: Write>(Option<BufWriter<W>>);
122
123impl<W: Write> InternalBufWriter<W> {
124    fn get_ref(&self) -> &BufWriter<W> {
125        self.0.as_ref().unwrap()
126    }
127
128    fn get_mut(&mut self) -> &mut BufWriter<W> {
129        self.0.as_mut().unwrap()
130    }
131}
132
133impl<W: Read + Write> Read for InternalBufWriter<W> {
134    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
135        self.get_mut().get_mut().read(buf)
136    }
137}
138
139impl<W: Write + fmt::Debug> fmt::Debug for InternalBufWriter<W> {
140    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
141        self.get_ref().fmt(f)
142    }
143}
144
145impl<S: Read + Write> BufStream<S> {
146    /// Creates a new buffered stream with explicitly listed capacities for the
147    /// reader/writer buffer.
148    pub fn with_capacities(reader_cap: usize, writer_cap: usize, inner: S)
149                           -> BufStream<S> {
150        let writer = BufWriter::with_capacity(writer_cap, inner);
151        let internal_writer = InternalBufWriter(Some(writer));
152        let reader = BufReader::with_capacity(reader_cap, internal_writer);
153        BufStream { inner: reader }
154    }
155
156    /// Creates a new buffered stream with the default reader/writer buffer
157    /// capacities.
158    pub fn new(inner: S) -> BufStream<S> {
159        BufStream::with_capacities(DEFAULT_BUF_SIZE, DEFAULT_BUF_SIZE, inner)
160    }
161
162    /// Gets a reference to the underlying stream.
163    pub fn get_ref(&self) -> &S {
164        self.inner.get_ref().get_ref().get_ref()
165    }
166
167    /// Gets a mutable reference to the underlying stream.
168    ///
169    /// # Warning
170    ///
171    /// It is inadvisable to read directly from or write directly to the
172    /// underlying stream.
173    pub fn get_mut(&mut self) -> &mut S {
174        self.inner.get_mut().get_mut().get_mut()
175    }
176
177    /// Unwraps this `BufStream`, returning the underlying stream.
178    ///
179    /// The internal write buffer is written out before returning the stream.
180    /// Any leftover data in the read buffer is lost.
181    pub fn into_inner(mut self) -> Result<S, IntoInnerError<BufStream<S>>> {
182        let e = {
183            let InternalBufWriter(ref mut w) = *self.inner.get_mut();
184            let (e, w2) = match w.take().unwrap().into_inner() {
185                Ok(s) => return Ok(s),
186                Err(err) => {
187                    (io::Error::new(err.error().kind(), err.error().to_string()),
188                     err.into_inner())
189                }
190            };
191            *w = Some(w2);
192            e
193        };
194        Err(IntoInnerError(self, e))
195    }
196}
197
198impl<S: Read + Write> BufRead for BufStream<S> {
199    fn fill_buf(&mut self) -> io::Result<&[u8]> { self.inner.fill_buf() }
200    fn consume(&mut self, amt: usize) { self.inner.consume(amt) }
201    fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> io::Result<usize> {
202        self.inner.read_until(byte, buf)
203    }
204    fn read_line(&mut self, string: &mut String) -> io::Result<usize> {
205        self.inner.read_line(string)
206    }
207}
208
209impl<S: Read + Write> Read for BufStream<S> {
210    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
211        self.inner.read(buf)
212    }
213}
214
215impl<S: Read + Write> Write for BufStream<S> {
216    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
217        self.inner.get_mut().0.as_mut().unwrap().write(buf)
218    }
219    fn flush(&mut self) -> io::Result<()> {
220        self.inner.get_mut().0.as_mut().unwrap().flush()
221    }
222}
223
224#[cfg(feature = "tokio")]
225impl<S: AsyncRead + AsyncWrite> AsyncRead for BufStream<S> {}
226
227#[cfg(feature = "tokio")]
228impl<S: AsyncRead + AsyncWrite> AsyncWrite for BufStream<S> {
229    fn shutdown(&mut self) -> Poll<(), io::Error> {
230        try_nb!(self.flush());
231        let mut inner = self.inner.get_mut().0.as_mut().unwrap();
232        inner.shutdown()
233    }
234}
235
236#[cfg(test)]
237mod tests {
238    use std::io::prelude::*;
239    use std::io;
240
241    use super::BufStream;
242    // This is just here to make sure that we don't infinite loop in the
243    // newtype struct autoderef weirdness
244    #[test]
245    fn test_buffered_stream() {
246        struct S;
247
248        impl Write for S {
249            fn write(&mut self, b: &[u8]) -> io::Result<usize> { Ok(b.len()) }
250            fn flush(&mut self) -> io::Result<()> { Ok(()) }
251        }
252
253        impl Read for S {
254            fn read(&mut self, _: &mut [u8]) -> io::Result<usize> { Ok(0) }
255        }
256
257        let mut stream = BufStream::new(S);
258        assert_eq!(stream.read(&mut [0; 10]).unwrap(), 0);
259        stream.write(&[0; 10]).unwrap();
260        stream.flush().unwrap();
261    }
262}