bufstream/lib.rs
1// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution and at
3// http://rust-lang.org/COPYRIGHT.
4//
5// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8// option. This file may not be copied, modified, or distributed
9// except according to those terms.
10
11//! A crate for separately buffered streams.
12//!
13//! This crate provides a `BufStream` type which provides buffering of both the
14//! reading and writing halves of a `Read + Write` type. Each half is completely
15//! independently buffered of the other, which may not always be desired. For
16//! example `BufStream<File>` may have surprising semantics.
17//!
18//! # Usage
19//!
20//! ```toml
21//! [dependencies]
22//! bufstream = "0.1"
23//! ```
24//!
25//! ```no_run
26//! use std::io::prelude::*;
27//! use std::net::TcpStream;
28//! use bufstream::BufStream;
29//!
30//!
31//! let stream = TcpStream::connect("localhost:4000").unwrap();
32//! let mut buf = BufStream::new(stream);
33//! buf.read(&mut [0; 1024]).unwrap();
34//! buf.write(&[0; 1024]).unwrap();
35//! ```
36//!
37//! # Async I/O
38//!
39//! This crate optionally can support async I/O streams with the [Tokio stack] via
40//! the `tokio` feature of this crate:
41//!
42//! [Tokio stack]: https://tokio.rs/
43//!
44//! ```toml
45//! bufstream = { version = "0.2", features = ["tokio"] }
46//! ```
47//!
48//! All methods are internally capable of working with streams that may return
49//! [`ErrorKind::WouldBlock`] when they're not ready to perform the particular
50//! operation.
51//!
52//! [`ErrorKind::WouldBlock`]: https://doc.rust-lang.org/std/io/enum.ErrorKind.html
53//!
54//! Note that care needs to be taken when using these objects, however. The
55//! Tokio runtime, in particular, requires that data is fully flushed before
56//! dropping streams. For compatibility with blocking streams all streams are
57//! flushed/written when they are dropped, and this is not always a suitable
58//! time to perform I/O. If I/O streams are flushed before drop, however, then
59//! these operations will be a noop.
60
61#[cfg(feature = "tokio")] extern crate futures;
62#[cfg(feature = "tokio")] #[macro_use] extern crate tokio_io;
63
64use std::fmt;
65use std::io::prelude::*;
66use std::io::{self, BufReader, BufWriter};
67use std::error;
68
69#[cfg(feature = "tokio")] use futures::Poll;
70#[cfg(feature = "tokio")] use tokio_io::{AsyncRead, AsyncWrite};
71
72const DEFAULT_BUF_SIZE: usize = 8 * 1024;
73
74/// Wraps a Stream and buffers input and output to and from it.
75///
76/// It can be excessively inefficient to work directly with a `Read+Write`. For
77/// example, every call to `read` or `write` on `TcpStream` results in a system
78/// call. A `BufStream` keeps in memory buffers of data, making large,
79/// infrequent calls to `read` and `write` on the underlying `Read+Write`.
80///
81/// The output buffer will be written out when this stream is dropped.
82#[derive(Debug)]
83pub struct BufStream<S: Write> {
84 inner: BufReader<InternalBufWriter<S>>
85}
86
87/// An error returned by `into_inner` which combines an error that
88/// happened while writing out the buffer, and the buffered writer object
89/// which may be used to recover from the condition.
90#[derive(Debug)]
91pub struct IntoInnerError<W>(W, io::Error);
92
93impl<W> IntoInnerError<W> {
94 /// Returns the error which caused the call to `into_inner()` to fail.
95 ///
96 /// This error was returned when attempting to write the internal buffer.
97 pub fn error(&self) -> &io::Error { &self.1 }
98 /// Returns the buffered writer instance which generated the error.
99 ///
100 /// The returned object can be used for error recovery, such as
101 /// re-inspecting the buffer.
102 pub fn into_inner(self) -> W { self.0 }
103}
104
105impl<W> From<IntoInnerError<W>> for io::Error {
106 fn from(iie: IntoInnerError<W>) -> io::Error { iie.1 }
107}
108
109impl<W: fmt::Debug> error::Error for IntoInnerError<W> {
110 fn description(&self) -> &str {
111 error::Error::description(self.error())
112 }
113}
114
115impl<W> fmt::Display for IntoInnerError<W> {
116 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
117 self.error().fmt(f)
118 }
119}
120
121struct InternalBufWriter<W: Write>(Option<BufWriter<W>>);
122
123impl<W: Write> InternalBufWriter<W> {
124 fn get_ref(&self) -> &BufWriter<W> {
125 self.0.as_ref().unwrap()
126 }
127
128 fn get_mut(&mut self) -> &mut BufWriter<W> {
129 self.0.as_mut().unwrap()
130 }
131}
132
133impl<W: Read + Write> Read for InternalBufWriter<W> {
134 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
135 self.get_mut().get_mut().read(buf)
136 }
137}
138
139impl<W: Write + fmt::Debug> fmt::Debug for InternalBufWriter<W> {
140 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
141 self.get_ref().fmt(f)
142 }
143}
144
145impl<S: Read + Write> BufStream<S> {
146 /// Creates a new buffered stream with explicitly listed capacities for the
147 /// reader/writer buffer.
148 pub fn with_capacities(reader_cap: usize, writer_cap: usize, inner: S)
149 -> BufStream<S> {
150 let writer = BufWriter::with_capacity(writer_cap, inner);
151 let internal_writer = InternalBufWriter(Some(writer));
152 let reader = BufReader::with_capacity(reader_cap, internal_writer);
153 BufStream { inner: reader }
154 }
155
156 /// Creates a new buffered stream with the default reader/writer buffer
157 /// capacities.
158 pub fn new(inner: S) -> BufStream<S> {
159 BufStream::with_capacities(DEFAULT_BUF_SIZE, DEFAULT_BUF_SIZE, inner)
160 }
161
162 /// Gets a reference to the underlying stream.
163 pub fn get_ref(&self) -> &S {
164 self.inner.get_ref().get_ref().get_ref()
165 }
166
167 /// Gets a mutable reference to the underlying stream.
168 ///
169 /// # Warning
170 ///
171 /// It is inadvisable to read directly from or write directly to the
172 /// underlying stream.
173 pub fn get_mut(&mut self) -> &mut S {
174 self.inner.get_mut().get_mut().get_mut()
175 }
176
177 /// Unwraps this `BufStream`, returning the underlying stream.
178 ///
179 /// The internal write buffer is written out before returning the stream.
180 /// Any leftover data in the read buffer is lost.
181 pub fn into_inner(mut self) -> Result<S, IntoInnerError<BufStream<S>>> {
182 let e = {
183 let InternalBufWriter(ref mut w) = *self.inner.get_mut();
184 let (e, w2) = match w.take().unwrap().into_inner() {
185 Ok(s) => return Ok(s),
186 Err(err) => {
187 (io::Error::new(err.error().kind(), err.error().to_string()),
188 err.into_inner())
189 }
190 };
191 *w = Some(w2);
192 e
193 };
194 Err(IntoInnerError(self, e))
195 }
196}
197
198impl<S: Read + Write> BufRead for BufStream<S> {
199 fn fill_buf(&mut self) -> io::Result<&[u8]> { self.inner.fill_buf() }
200 fn consume(&mut self, amt: usize) { self.inner.consume(amt) }
201 fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> io::Result<usize> {
202 self.inner.read_until(byte, buf)
203 }
204 fn read_line(&mut self, string: &mut String) -> io::Result<usize> {
205 self.inner.read_line(string)
206 }
207}
208
209impl<S: Read + Write> Read for BufStream<S> {
210 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
211 self.inner.read(buf)
212 }
213}
214
215impl<S: Read + Write> Write for BufStream<S> {
216 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
217 self.inner.get_mut().0.as_mut().unwrap().write(buf)
218 }
219 fn flush(&mut self) -> io::Result<()> {
220 self.inner.get_mut().0.as_mut().unwrap().flush()
221 }
222}
223
224#[cfg(feature = "tokio")]
225impl<S: AsyncRead + AsyncWrite> AsyncRead for BufStream<S> {}
226
227#[cfg(feature = "tokio")]
228impl<S: AsyncRead + AsyncWrite> AsyncWrite for BufStream<S> {
229 fn shutdown(&mut self) -> Poll<(), io::Error> {
230 try_nb!(self.flush());
231 let mut inner = self.inner.get_mut().0.as_mut().unwrap();
232 inner.shutdown()
233 }
234}
235
236#[cfg(test)]
237mod tests {
238 use std::io::prelude::*;
239 use std::io;
240
241 use super::BufStream;
242 // This is just here to make sure that we don't infinite loop in the
243 // newtype struct autoderef weirdness
244 #[test]
245 fn test_buffered_stream() {
246 struct S;
247
248 impl Write for S {
249 fn write(&mut self, b: &[u8]) -> io::Result<usize> { Ok(b.len()) }
250 fn flush(&mut self) -> io::Result<()> { Ok(()) }
251 }
252
253 impl Read for S {
254 fn read(&mut self, _: &mut [u8]) -> io::Result<usize> { Ok(0) }
255 }
256
257 let mut stream = BufStream::new(S);
258 assert_eq!(stream.read(&mut [0; 10]).unwrap(), 0);
259 stream.write(&[0; 10]).unwrap();
260 stream.flush().unwrap();
261 }
262}