vmap/io/
buffer.rs

1use super::{Ring, SeqRead, SeqWrite};
2use crate::Result;
3
4use std::{
5    fmt,
6    io::{self, BufRead, ErrorKind, Read, Write},
7    ops::{Deref, DerefMut},
8};
9
10/// The `BufReader` adds buffering to any reader using a specialized buffer.
11///
12/// This is very similar `std::io::BufReader`, but it uses a [`Ring`] for the
13/// internal buffer, and it provides a configurable low water mark.
14///
15/// # Examples
16///
17/// ```
18/// use vmap::io::BufReader;
19/// # use std::io::prelude::*;
20/// # use std::net::{TcpListener, TcpStream};
21///
22/// # fn main() -> std::io::Result<()> {
23/// # let srv = TcpListener::bind("127.0.0.1:0")?;
24/// let sock = TcpStream::connect(srv.local_addr().unwrap())?;
25/// # let (mut cli, _addr) = srv.accept()?;
26/// let mut buf = BufReader::new(sock, 4000).expect("failed to create buffer");
27/// # cli.write_all(b"hello\nworld\n")?;
28/// let mut line = String::new();
29/// let len = buf.read_line(&mut line)?;
30/// assert_eq!(line, "hello\n");
31/// # Ok(())
32/// # }
33/// ```
34pub struct BufReader<R> {
35    buf: Ring,
36    inner: R,
37    lowat: usize,
38}
39
40impl<R: Read> BufReader<R> {
41    /// Creates a new `BufReader`.
42    pub fn new(inner: R, capacity: usize) -> Result<Self> {
43        Ok(Self {
44            buf: Ring::new(capacity)?,
45            inner,
46            lowat: 0,
47        })
48    }
49
50    /// Get the low-water level.
51    #[inline]
52    pub fn lowat(&self) -> usize {
53        self.lowat
54    }
55
56    /// Set the low-water level.
57    ///
58    /// When the internal buffer content length drops to this level or below, a
59    /// subsequent call to `fill_buffer()` will request more from the inner reader.
60    ///
61    /// If it desired for `fill_buffer()` to always request a `read()`, you
62    /// may use:
63    ///
64    /// ```
65    /// # use vmap::io::BufReader;
66    /// # fn main() -> std::io::Result<()> {
67    /// let mut buf = BufReader::new(std::io::stdin(), 4096)?;
68    /// buf.set_lowat(usize::MAX);
69    /// # Ok(())
70    /// # }
71    /// ```
72    #[inline]
73    pub fn set_lowat(&mut self, val: usize) {
74        self.lowat = val
75    }
76
77    /// Gets a reference to the underlying reader.
78    #[inline]
79    pub fn get_ref(&self) -> &R {
80        &self.inner
81    }
82
83    /// Gets a mutable reference to the underlying reader.
84    #[inline]
85    pub fn get_mut(&mut self) -> &mut R {
86        &mut self.inner
87    }
88
89    /// Returns a reference to the internally buffered data.
90    #[inline]
91    pub fn buffer(&self) -> &[u8] {
92        self.buf.as_read_slice(std::usize::MAX)
93    }
94
95    /// Unwraps this `BufReader`, returning the underlying reader.
96    pub fn into_inner(self) -> R {
97        self.inner
98    }
99}
100
101impl<R: Read> Deref for BufReader<R> {
102    type Target = R;
103
104    #[inline]
105    fn deref(&self) -> &Self::Target {
106        self.get_ref()
107    }
108}
109
110impl<R: Read> DerefMut for BufReader<R> {
111    #[inline]
112    fn deref_mut(&mut self) -> &mut Self::Target {
113        self.get_mut()
114    }
115}
116
117impl<R> AsRef<R> for BufReader<R>
118where
119    R: Read,
120    <BufReader<R> as Deref>::Target: AsRef<R>,
121{
122    fn as_ref(&self) -> &R {
123        self.deref()
124    }
125}
126
127impl<R> AsMut<R> for BufReader<R>
128where
129    R: Read,
130    <BufReader<R> as Deref>::Target: AsMut<R>,
131{
132    fn as_mut(&mut self) -> &mut R {
133        self.deref_mut()
134    }
135}
136
137impl<R: Read> Read for BufReader<R> {
138    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
139        // If the reader has been dequeued and the destination buffer is larger
140        // than the internal buffer, then read directly into the destination.
141        if self.buf.read_len() == 0 && buf.len() >= self.buf.write_capacity() {
142            return self.inner.read(buf);
143        }
144        let nread = {
145            let mut rem = self.fill_buf()?;
146            rem.read(buf)?
147        };
148        self.consume(nread);
149        Ok(nread)
150    }
151}
152
153impl<R: Read + Write> Write for BufReader<R> {
154    #[inline]
155    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
156        self.inner.write(buf)
157    }
158
159    #[inline]
160    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
161        self.inner.write_vectored(bufs)
162    }
163
164    #[inline]
165    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
166        self.inner.write_all(buf)
167    }
168
169    #[inline]
170    fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> {
171        self.inner.write_fmt(fmt)
172    }
173
174    #[inline]
175    fn flush(&mut self) -> io::Result<()> {
176        self.inner.flush()
177    }
178}
179
180impl<R: Read> BufRead for BufReader<R> {
181    fn fill_buf(&mut self) -> io::Result<&[u8]> {
182        if self.buf.read_len() <= self.lowat {
183            let n = self.inner.read(self.buf.as_write_slice(std::usize::MAX))?;
184            self.buf.feed(n);
185        }
186        Ok(self.buffer())
187    }
188
189    fn consume(&mut self, amt: usize) {
190        self.buf.consume(amt);
191    }
192}
193
194/// The `BufWriter` adds buffering to any writer using a specialized buffer.
195///
196/// This is very similar `std::io::BufWriter`, but it uses a [`Ring`] for the
197/// internal the buffer.
198///
199/// # Examples
200///
201/// ```
202/// use vmap::io::{BufReader, BufWriter};
203/// # use std::io::prelude::*;
204/// # use std::net::{TcpListener, TcpStream};
205///
206/// # fn main() -> std::io::Result<()> {
207/// # let srv = TcpListener::bind("127.0.0.1:0")?;
208/// let recv = TcpStream::connect(srv.local_addr().unwrap())?;
209/// let send = /* accepted socked */
210/// # srv.accept()?.0;
211///
212/// let mut wr = BufWriter::new(send, 4000).unwrap();
213/// wr.write_all(b"hello\nworld\n")?;
214/// wr.flush()?;
215///
216/// let mut rd = BufReader::new(recv, 4000).unwrap();
217/// let mut line = String::new();
218/// let len = rd.read_line(&mut line)?;
219/// assert_eq!(line, "hello\n");
220/// # Ok(())
221/// # }
222/// ```
223pub struct BufWriter<W: Write> {
224    buf: Ring,
225    inner: W,
226    panicked: bool,
227}
228
229impl<W: Write> BufWriter<W> {
230    /// Creates a new `BufWriter`.
231    pub fn new(inner: W, capacity: usize) -> Result<Self> {
232        Ok(Self::from_parts(inner, Ring::new(capacity)?))
233    }
234
235    /// Creates a new `BufWriter` using an allocated, and possibly populated,
236    /// [`Ring`] instance. Consider calling [`Ring::clear()`] prior if the
237    /// contents of the ring should be discarded.
238    pub fn from_parts(inner: W, buf: Ring) -> Self {
239        Self {
240            buf,
241            inner,
242            panicked: false,
243        }
244    }
245
246    /// Gets a reference to the underlying writer.
247    #[inline]
248    pub fn get_ref(&self) -> &W {
249        &self.inner
250    }
251
252    /// Gets a mutable reference to the underlying writer.
253    #[inline]
254    pub fn get_mut(&mut self) -> &mut W {
255        &mut self.inner
256    }
257
258    /// Unwraps this `BufWriter`, returning the underlying writer.
259    ///
260    /// On `Err`, the result is a tuple combining the error that occurred while
261    /// flusing the buffer, and the buffer object.
262    ///
263    /// # Examples
264    ///
265    /// ```
266    /// use std::io::{self, Write, ErrorKind};
267    /// use vmap::io::BufWriter;
268    ///
269    /// struct ErringWriter(usize);
270    /// impl Write for ErringWriter {
271    ///   fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
272    ///     // eventually fails with BrokenPipe
273    /// #   match self.0.min(buf.len()) {
274    /// #     0 => Err(ErrorKind::BrokenPipe.into()),
275    /// #     n => { self.0 -= n; Ok(n) },
276    /// #   }
277    ///   }
278    ///   fn flush(&mut self) -> io::Result<()> { Ok(()) }
279    /// }
280    ///
281    /// # fn main() -> vmap::Result<()> {
282    /// let mut stream = BufWriter::new(ErringWriter(6), 4096)?;
283    /// stream.write_all(b"hello\nworld\n")?;
284    ///
285    /// // flush the buffer and get the original stream back
286    /// let stream = match stream.into_inner() {
287    ///     Ok(s) => s,
288    ///     Err(e) => {
289    ///         assert_eq!(e.error().kind(), ErrorKind::BrokenPipe);
290    ///
291    ///         // You can forcefully obtain the stream, however it is in an
292    ///         // failing state.
293    ///         let (recovered_writer, ring) = e.into_inner().into_parts();
294    ///         assert_eq!(ring.unwrap().as_ref(), b"world\n");
295    ///         recovered_writer
296    ///     }
297    /// };
298    /// # Ok(())
299    /// # }
300    /// ```
301    pub fn into_inner(mut self) -> std::result::Result<W, IntoInnerError<W>> {
302        match self.flush_buf() {
303            Err(e) => Err(IntoInnerError(self, e)),
304            Ok(()) => Ok(self.into_parts().0),
305        }
306    }
307
308    /// Disassembles this [`BufWriter`] into the underlying writer and the [`Ring`]
309    /// used for buffering, containing any buffered but unwritted data.
310    ///
311    /// If the underlying writer panicked during the previous write, the [`Ring`]
312    /// will be wrapped in a [`WriterPanicked`] error. In this case, the content
313    /// still buffered within the [`Ring`] may or may not have been written.
314    ///
315    /// # Example
316    ///
317    /// ```
318    /// use std::io::{self, Write};
319    /// use std::panic::{catch_unwind, AssertUnwindSafe};
320    /// use vmap::io::BufWriter;
321    ///
322    /// struct PanickingWriter;
323    /// impl Write for PanickingWriter {
324    ///   fn write(&mut self, buf: &[u8]) -> io::Result<usize> { panic!() }
325    ///   fn flush(&mut self) -> io::Result<()> { panic!() }
326    /// }
327    ///
328    /// # fn main() -> vmap::Result<()> {
329    /// let mut stream = BufWriter::new(PanickingWriter, 4096)?;
330    /// stream.write_all(b"testing")?;
331    /// let result = catch_unwind(AssertUnwindSafe(|| {
332    ///     stream.flush().unwrap()
333    /// }));
334    /// assert!(result.is_err());
335    /// let (recovered_writer, ring) = stream.into_parts();
336    /// assert!(matches!(recovered_writer, PanickingWriter));
337    /// assert_eq!(ring.unwrap_err().into_inner().as_ref(), b"testing");
338    /// # Ok(())
339    /// # }
340    /// ```
341    pub fn into_parts(self) -> (W, std::result::Result<Ring, WriterPanicked>) {
342        // SAFETY: forget(self) prevents double dropping inner and buf.
343        let inner = unsafe { std::ptr::read(&self.inner) };
344        let buf = unsafe { std::ptr::read(&self.buf) };
345        let buf = if self.panicked {
346            Err(WriterPanicked(buf))
347        } else {
348            Ok(buf)
349        };
350
351        std::mem::forget(self);
352
353        (inner, buf)
354    }
355
356    fn flush_buf(&mut self) -> io::Result<()> {
357        loop {
358            if self.buf.is_empty() {
359                break Ok(());
360            }
361
362            self.panicked = true;
363            let r = self.inner.write(self.buf.as_read_slice(std::usize::MAX));
364            self.panicked = false;
365
366            match r {
367                Ok(0) => {
368                    break Err(ErrorKind::WriteZero.into());
369                }
370                Ok(n) => self.buf.consume(n),
371                Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
372                Err(e) => break Err(e),
373            }
374        }
375    }
376}
377
378impl<W: Write> Deref for BufWriter<W> {
379    type Target = W;
380
381    #[inline]
382    fn deref(&self) -> &Self::Target {
383        self.get_ref()
384    }
385}
386
387impl<W: Write> DerefMut for BufWriter<W> {
388    #[inline]
389    fn deref_mut(&mut self) -> &mut Self::Target {
390        self.get_mut()
391    }
392}
393
394impl<W> AsRef<W> for BufWriter<W>
395where
396    W: Write,
397    <BufWriter<W> as Deref>::Target: AsRef<W>,
398{
399    fn as_ref(&self) -> &W {
400        self.deref()
401    }
402}
403
404impl<W> AsMut<W> for BufWriter<W>
405where
406    W: Write,
407    <BufWriter<W> as Deref>::Target: AsMut<W>,
408{
409    fn as_mut(&mut self) -> &mut W {
410        self.deref_mut()
411    }
412}
413
414impl<W: Write> Drop for BufWriter<W> {
415    fn drop(&mut self) {
416        if !self.panicked {
417            let _r = self.flush_buf();
418        }
419    }
420}
421
422impl<W: Write> Write for BufWriter<W> {
423    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
424        if buf.len() > self.buf.write_len() {
425            self.flush_buf()?;
426        }
427        if buf.len() >= self.buf.write_len() {
428            self.panicked = true;
429            let r = self.inner.write(buf);
430            self.panicked = false;
431            r
432        } else {
433            self.buf.write(buf)
434        }
435    }
436
437    fn flush(&mut self) -> io::Result<()> {
438        self.flush_buf().and_then(|()| self.get_mut().flush())
439    }
440}
441
442impl<W: Write + Read> Read for BufWriter<W> {
443    #[inline]
444    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
445        self.inner.read(buf)
446    }
447
448    #[inline]
449    fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
450        self.inner.read_vectored(bufs)
451    }
452
453    #[inline]
454    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
455        self.inner.read_to_end(buf)
456    }
457
458    #[inline]
459    fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
460        self.inner.read_to_string(buf)
461    }
462
463    #[inline]
464    fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
465        self.inner.read_exact(buf)
466    }
467}
468
469/// An error returned by [`BufWriter::into_inner`] which combines an error that
470/// happened while writing out the buffer, and the buffered writer object
471/// which may be used to recover from the condition.
472pub struct IntoInnerError<W: Write>(BufWriter<W>, io::Error);
473
474impl<W: Write> IntoInnerError<W> {
475    /// Returns the error which caused the call to [`BufWriter::into_inner()`]
476    /// to fail.
477    pub fn error(&self) -> &io::Error {
478        &self.1
479    }
480
481    /// Consumes the [`IntoInnerError`] and returns the buffered writer which
482    /// received the error.
483    pub fn into_inner(self) -> BufWriter<W> {
484        self.0
485    }
486
487    /// Consumes the [`IntoInnerError`] and returns the error which caused the call to
488    /// [`BufWriter::into_inner()`] to fail. Unlike `error`, this can be used to
489    /// obtain ownership of the underlying error.
490    pub fn into_error(self) -> io::Error {
491        self.1
492    }
493
494    /// Consumes the [`IntoInnerError`] and returns the error which caused the call to
495    /// [`BufWriter::into_inner()`] to fail, and the underlying writer.
496    pub fn into_parts(self) -> (io::Error, BufWriter<W>) {
497        (self.1, self.0)
498    }
499}
500
501/// Error returned for the buffered data from `BufWriter::into_parts` when the underlying
502/// writer has previously panicked. The contents of the buffer may be partially written.
503pub struct WriterPanicked(Ring);
504
505impl WriterPanicked {
506    /// Returns the [`Ring`] with possibly unwritten data.
507    pub fn into_inner(self) -> Ring {
508        self.0
509    }
510
511    const DESCRIPTION: &'static str = "writer panicked, unwritten data may remain";
512}
513
514impl fmt::Display for WriterPanicked {
515    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
516        write!(f, "{}", Self::DESCRIPTION)
517    }
518}
519
520impl fmt::Debug for WriterPanicked {
521    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
522        f.debug_struct("WriterPanicked")
523            .field(
524                "buffer",
525                &format_args!("{}/{}", self.0.write_len(), self.0.write_capacity()),
526            )
527            .finish()
528    }
529}