bufstream_fresh/
lib.rs

1// Copyright 2013 The Rust Project Developers.
2// Copyright 2023 Saul Hazledine.
3// See the COPYRIGHT file at the top-level directory of this
4// distribution.
5//
6// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
7// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
8// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
9// option. This file may not be copied, modified, or distributed
10// except according to those terms.
11
12//! A crate for separately buffered streams.
13//!
//! This crate provides a `BufStream` type which buffers both the reading and
//! writing halves of a `Read + Write` type. Each half is buffered completely
//! independently of the other, which may not always be desired. For example,
//! `BufStream<File>` may have surprising semantics.
18//!
19//! ```no_run
20//! use std::io::prelude::*;
21//! use std::net::TcpStream;
22//! use bufstream_fresh::BufStream;
23//!
24//!
25//! let stream = TcpStream::connect("localhost:4000").unwrap();
26//! let mut buf = BufStream::new(stream);
27//! buf.read(&mut [0; 1024]).unwrap();
28//! buf.write(&[0; 1024]).unwrap();
29//! ```
30//!
31use std::error::Error;
32use std::fmt;
33use std::io::prelude::*;
34use std::io::{self, BufReader, BufWriter, Seek, SeekFrom};
35
/// Capacity, in bytes, that `BufStream::new` gives to each of its two buffers.
const DEFAULT_BUF_SIZE: usize = 8_192;
37
38/// Wraps a Stream and buffers input and output to and from it.
39///
40/// It can be excessively inefficient to work directly with a `Read+Write`. For
41/// example, every call to `read` or `write` on `TcpStream` results in a system
42/// call. A `BufStream` keeps in memory buffers of data, making large,
43/// infrequent calls to `read` and `write` on the underlying `Read+Write`.
44///
45/// The output buffer will be written out when this stream is dropped.
46#[derive(Debug)]
47pub struct BufStream<S: Write> {
48    inner: BufReader<InternalBufWriter<S>>,
49}
50
/// The error returned by `into_inner` when the buffer cannot be written out.
///
/// It pairs the I/O error that happened while writing out the buffer with
/// the buffered writer object itself, which may be used to recover from the
/// condition.
#[derive(Debug)]
pub struct IntoInnerError<W>(W, io::Error);
56
57impl<W> IntoInnerError<W> {
58    /// Returns the error which caused the call to `into_inner()` to fail.
59    ///
60    /// This error was returned when attempting to write the internal buffer.
61    pub fn error(&self) -> &io::Error {
62        &self.1
63    }
64    /// Returns the buffered writer instance which generated the error.
65    ///
66    /// The returned object can be used for error recovery, such as
67    /// re-inspecting the buffer.
68    pub fn into_inner(self) -> W {
69        self.0
70    }
71}
72
73impl<W> Error for IntoInnerError<W> where W: fmt::Debug {}
74
75impl<W> From<IntoInnerError<W>> for io::Error {
76    fn from(iie: IntoInnerError<W>) -> io::Error {
77        iie.1
78    }
79}
80
81impl<W> fmt::Display for IntoInnerError<W> {
82    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83        self.error().fmt(f)
84    }
85}
86
/// Private wrapper around the buffered writer so that a `BufReader` can be
/// layered on top of it.
///
/// The `Option` lets `BufStream::into_inner` take the writer out by value.
struct InternalBufWriter<W>(Option<BufWriter<W>>)
where
    W: Write;
88
89impl<W: Write> InternalBufWriter<W> {
90    fn get_ref(&self) -> &BufWriter<W> {
91        self.0.as_ref().unwrap()
92    }
93
94    fn get_mut(&mut self) -> &mut BufWriter<W> {
95        self.0.as_mut().unwrap()
96    }
97}
98
99impl<W: Read + Write> Read for InternalBufWriter<W> {
100    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
101        self.get_mut().get_mut().read(buf)
102    }
103}
104
105impl<W: Write + fmt::Debug> fmt::Debug for InternalBufWriter<W> {
106    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
107        self.get_ref().fmt(f)
108    }
109}
110
111impl<S: Seek + Write> Seek for InternalBufWriter<S> {
112    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
113        self.get_mut().get_mut().seek(pos)
114    }
115}
116
117impl<S: Read + Write> BufStream<S> {
118    /// Creates a new buffered stream with explicitly listed capacities for the
119    /// reader/writer buffer.
120    pub fn with_capacities(reader_cap: usize, writer_cap: usize, inner: S) -> BufStream<S> {
121        let writer = BufWriter::with_capacity(writer_cap, inner);
122        let internal_writer = InternalBufWriter(Some(writer));
123        let reader = BufReader::with_capacity(reader_cap, internal_writer);
124        BufStream { inner: reader }
125    }
126
127    /// Creates a new buffered stream with the default reader/writer buffer
128    /// capacities.
129    pub fn new(inner: S) -> BufStream<S> {
130        BufStream::with_capacities(DEFAULT_BUF_SIZE, DEFAULT_BUF_SIZE, inner)
131    }
132
133    /// Gets a reference to the underlying stream.
134    pub fn get_ref(&self) -> &S {
135        self.inner.get_ref().get_ref().get_ref()
136    }
137
138    /// Gets a mutable reference to the underlying stream.
139    ///
140    /// # Warning
141    ///
142    /// It is inadvisable to read directly from or write directly to the
143    /// underlying stream.
144    pub fn get_mut(&mut self) -> &mut S {
145        self.inner.get_mut().get_mut().get_mut()
146    }
147
148    /// Unwraps this `BufStream`, returning the underlying stream.
149    ///
150    /// The internal write buffer is written out before returning the stream.
151    /// Any leftover data in the read buffer is lost.
152    pub fn into_inner(mut self) -> Result<S, IntoInnerError<BufStream<S>>> {
153        let e = {
154            let InternalBufWriter(ref mut w) = *self.inner.get_mut();
155            let (e, w2) = match w.take().unwrap().into_inner() {
156                Ok(s) => return Ok(s),
157                Err(err) => (
158                    io::Error::new(err.error().kind(), err.error().to_string()),
159                    err.into_inner(),
160                ),
161            };
162            *w = Some(w2);
163            e
164        };
165        Err(IntoInnerError(self, e))
166    }
167}
168
169impl<S: Read + Write> BufRead for BufStream<S> {
170    fn fill_buf(&mut self) -> io::Result<&[u8]> {
171        self.inner.fill_buf()
172    }
173    fn consume(&mut self, amt: usize) {
174        self.inner.consume(amt)
175    }
176    fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> io::Result<usize> {
177        self.inner.read_until(byte, buf)
178    }
179    fn read_line(&mut self, string: &mut String) -> io::Result<usize> {
180        self.inner.read_line(string)
181    }
182}
183
184impl<S: Read + Write> Read for BufStream<S> {
185    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
186        self.inner.read(buf)
187    }
188}
189
190impl<S: Read + Write> Write for BufStream<S> {
191    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
192        self.inner.get_mut().0.as_mut().unwrap().write(buf)
193    }
194    fn flush(&mut self) -> io::Result<()> {
195        self.inner.get_mut().0.as_mut().unwrap().flush()
196    }
197}
198
199impl<S: Seek + Write> Seek for BufStream<S> {
200    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
201        self.inner.seek(pos)
202    }
203}
204
#[cfg(test)]
mod tests {
    use std::io::{self, Read, Write};

    use super::BufStream;

    /// Stream stub: reads always report end-of-file, writes accept everything.
    struct NullStream;

    impl Read for NullStream {
        fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
            Ok(0)
        }
    }

    impl Write for NullStream {
        fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
            Ok(bytes.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    // This is just here to make sure that we don't infinite loop in the
    // newtype struct autoderef weirdness
    #[test]
    fn test_buffered_stream() {
        let mut stream = BufStream::new(NullStream);

        let mut scratch = [0; 10];
        assert_eq!(stream.read(&mut scratch).unwrap(), 0);

        let payload = [0; 10];
        stream.write(&payload).unwrap();
        stream.flush().unwrap();
    }
}