vmap/io/buffer.rs
1use super::{Ring, SeqRead, SeqWrite};
2use crate::Result;
3
4use std::{
5 fmt,
6 io::{self, BufRead, ErrorKind, Read, Write},
7 ops::{Deref, DerefMut},
8};
9
10/// The `BufReader` adds buffering to any reader using a specialized buffer.
11///
12/// This is very similar `std::io::BufReader`, but it uses a [`Ring`] for the
13/// internal buffer, and it provides a configurable low water mark.
14///
15/// # Examples
16///
17/// ```
18/// use vmap::io::BufReader;
19/// # use std::io::prelude::*;
20/// # use std::net::{TcpListener, TcpStream};
21///
22/// # fn main() -> std::io::Result<()> {
23/// # let srv = TcpListener::bind("127.0.0.1:0")?;
24/// let sock = TcpStream::connect(srv.local_addr().unwrap())?;
25/// # let (mut cli, _addr) = srv.accept()?;
26/// let mut buf = BufReader::new(sock, 4000).expect("failed to create buffer");
27/// # cli.write_all(b"hello\nworld\n")?;
28/// let mut line = String::new();
29/// let len = buf.read_line(&mut line)?;
30/// assert_eq!(line, "hello\n");
31/// # Ok(())
32/// # }
33/// ```
34pub struct BufReader<R> {
35 buf: Ring,
36 inner: R,
37 lowat: usize,
38}
39
40impl<R: Read> BufReader<R> {
41 /// Creates a new `BufReader`.
42 pub fn new(inner: R, capacity: usize) -> Result<Self> {
43 Ok(Self {
44 buf: Ring::new(capacity)?,
45 inner,
46 lowat: 0,
47 })
48 }
49
50 /// Get the low-water level.
51 #[inline]
52 pub fn lowat(&self) -> usize {
53 self.lowat
54 }
55
56 /// Set the low-water level.
57 ///
58 /// When the internal buffer content length drops to this level or below, a
59 /// subsequent call to `fill_buffer()` will request more from the inner reader.
60 ///
61 /// If it desired for `fill_buffer()` to always request a `read()`, you
62 /// may use:
63 ///
64 /// ```
65 /// # use vmap::io::BufReader;
66 /// # fn main() -> std::io::Result<()> {
67 /// let mut buf = BufReader::new(std::io::stdin(), 4096)?;
68 /// buf.set_lowat(usize::MAX);
69 /// # Ok(())
70 /// # }
71 /// ```
72 #[inline]
73 pub fn set_lowat(&mut self, val: usize) {
74 self.lowat = val
75 }
76
77 /// Gets a reference to the underlying reader.
78 #[inline]
79 pub fn get_ref(&self) -> &R {
80 &self.inner
81 }
82
83 /// Gets a mutable reference to the underlying reader.
84 #[inline]
85 pub fn get_mut(&mut self) -> &mut R {
86 &mut self.inner
87 }
88
89 /// Returns a reference to the internally buffered data.
90 #[inline]
91 pub fn buffer(&self) -> &[u8] {
92 self.buf.as_read_slice(std::usize::MAX)
93 }
94
95 /// Unwraps this `BufReader`, returning the underlying reader.
96 pub fn into_inner(self) -> R {
97 self.inner
98 }
99}
100
101impl<R: Read> Deref for BufReader<R> {
102 type Target = R;
103
104 #[inline]
105 fn deref(&self) -> &Self::Target {
106 self.get_ref()
107 }
108}
109
110impl<R: Read> DerefMut for BufReader<R> {
111 #[inline]
112 fn deref_mut(&mut self) -> &mut Self::Target {
113 self.get_mut()
114 }
115}
116
117impl<R> AsRef<R> for BufReader<R>
118where
119 R: Read,
120 <BufReader<R> as Deref>::Target: AsRef<R>,
121{
122 fn as_ref(&self) -> &R {
123 self.deref()
124 }
125}
126
127impl<R> AsMut<R> for BufReader<R>
128where
129 R: Read,
130 <BufReader<R> as Deref>::Target: AsMut<R>,
131{
132 fn as_mut(&mut self) -> &mut R {
133 self.deref_mut()
134 }
135}
136
137impl<R: Read> Read for BufReader<R> {
138 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
139 // If the reader has been dequeued and the destination buffer is larger
140 // than the internal buffer, then read directly into the destination.
141 if self.buf.read_len() == 0 && buf.len() >= self.buf.write_capacity() {
142 return self.inner.read(buf);
143 }
144 let nread = {
145 let mut rem = self.fill_buf()?;
146 rem.read(buf)?
147 };
148 self.consume(nread);
149 Ok(nread)
150 }
151}
152
153impl<R: Read + Write> Write for BufReader<R> {
154 #[inline]
155 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
156 self.inner.write(buf)
157 }
158
159 #[inline]
160 fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
161 self.inner.write_vectored(bufs)
162 }
163
164 #[inline]
165 fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
166 self.inner.write_all(buf)
167 }
168
169 #[inline]
170 fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> {
171 self.inner.write_fmt(fmt)
172 }
173
174 #[inline]
175 fn flush(&mut self) -> io::Result<()> {
176 self.inner.flush()
177 }
178}
179
180impl<R: Read> BufRead for BufReader<R> {
181 fn fill_buf(&mut self) -> io::Result<&[u8]> {
182 if self.buf.read_len() <= self.lowat {
183 let n = self.inner.read(self.buf.as_write_slice(std::usize::MAX))?;
184 self.buf.feed(n);
185 }
186 Ok(self.buffer())
187 }
188
189 fn consume(&mut self, amt: usize) {
190 self.buf.consume(amt);
191 }
192}
193
194/// The `BufWriter` adds buffering to any writer using a specialized buffer.
195///
196/// This is very similar `std::io::BufWriter`, but it uses a [`Ring`] for the
197/// internal the buffer.
198///
199/// # Examples
200///
201/// ```
202/// use vmap::io::{BufReader, BufWriter};
203/// # use std::io::prelude::*;
204/// # use std::net::{TcpListener, TcpStream};
205///
206/// # fn main() -> std::io::Result<()> {
207/// # let srv = TcpListener::bind("127.0.0.1:0")?;
208/// let recv = TcpStream::connect(srv.local_addr().unwrap())?;
209/// let send = /* accepted socked */
210/// # srv.accept()?.0;
211///
212/// let mut wr = BufWriter::new(send, 4000).unwrap();
213/// wr.write_all(b"hello\nworld\n")?;
214/// wr.flush()?;
215///
216/// let mut rd = BufReader::new(recv, 4000).unwrap();
217/// let mut line = String::new();
218/// let len = rd.read_line(&mut line)?;
219/// assert_eq!(line, "hello\n");
220/// # Ok(())
221/// # }
222/// ```
223pub struct BufWriter<W: Write> {
224 buf: Ring,
225 inner: W,
226 panicked: bool,
227}
228
229impl<W: Write> BufWriter<W> {
230 /// Creates a new `BufWriter`.
231 pub fn new(inner: W, capacity: usize) -> Result<Self> {
232 Ok(Self::from_parts(inner, Ring::new(capacity)?))
233 }
234
235 /// Creates a new `BufWriter` using an allocated, and possibly populated,
236 /// [`Ring`] instance. Consider calling [`Ring::clear()`] prior if the
237 /// contents of the ring should be discarded.
238 pub fn from_parts(inner: W, buf: Ring) -> Self {
239 Self {
240 buf,
241 inner,
242 panicked: false,
243 }
244 }
245
246 /// Gets a reference to the underlying writer.
247 #[inline]
248 pub fn get_ref(&self) -> &W {
249 &self.inner
250 }
251
252 /// Gets a mutable reference to the underlying writer.
253 #[inline]
254 pub fn get_mut(&mut self) -> &mut W {
255 &mut self.inner
256 }
257
258 /// Unwraps this `BufWriter`, returning the underlying writer.
259 ///
260 /// On `Err`, the result is a tuple combining the error that occurred while
261 /// flusing the buffer, and the buffer object.
262 ///
263 /// # Examples
264 ///
265 /// ```
266 /// use std::io::{self, Write, ErrorKind};
267 /// use vmap::io::BufWriter;
268 ///
269 /// struct ErringWriter(usize);
270 /// impl Write for ErringWriter {
271 /// fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
272 /// // eventually fails with BrokenPipe
273 /// # match self.0.min(buf.len()) {
274 /// # 0 => Err(ErrorKind::BrokenPipe.into()),
275 /// # n => { self.0 -= n; Ok(n) },
276 /// # }
277 /// }
278 /// fn flush(&mut self) -> io::Result<()> { Ok(()) }
279 /// }
280 ///
281 /// # fn main() -> vmap::Result<()> {
282 /// let mut stream = BufWriter::new(ErringWriter(6), 4096)?;
283 /// stream.write_all(b"hello\nworld\n")?;
284 ///
285 /// // flush the buffer and get the original stream back
286 /// let stream = match stream.into_inner() {
287 /// Ok(s) => s,
288 /// Err(e) => {
289 /// assert_eq!(e.error().kind(), ErrorKind::BrokenPipe);
290 ///
291 /// // You can forcefully obtain the stream, however it is in an
292 /// // failing state.
293 /// let (recovered_writer, ring) = e.into_inner().into_parts();
294 /// assert_eq!(ring.unwrap().as_ref(), b"world\n");
295 /// recovered_writer
296 /// }
297 /// };
298 /// # Ok(())
299 /// # }
300 /// ```
301 pub fn into_inner(mut self) -> std::result::Result<W, IntoInnerError<W>> {
302 match self.flush_buf() {
303 Err(e) => Err(IntoInnerError(self, e)),
304 Ok(()) => Ok(self.into_parts().0),
305 }
306 }
307
308 /// Disassembles this [`BufWriter`] into the underlying writer and the [`Ring`]
309 /// used for buffering, containing any buffered but unwritted data.
310 ///
311 /// If the underlying writer panicked during the previous write, the [`Ring`]
312 /// will be wrapped in a [`WriterPanicked`] error. In this case, the content
313 /// still buffered within the [`Ring`] may or may not have been written.
314 ///
315 /// # Example
316 ///
317 /// ```
318 /// use std::io::{self, Write};
319 /// use std::panic::{catch_unwind, AssertUnwindSafe};
320 /// use vmap::io::BufWriter;
321 ///
322 /// struct PanickingWriter;
323 /// impl Write for PanickingWriter {
324 /// fn write(&mut self, buf: &[u8]) -> io::Result<usize> { panic!() }
325 /// fn flush(&mut self) -> io::Result<()> { panic!() }
326 /// }
327 ///
328 /// # fn main() -> vmap::Result<()> {
329 /// let mut stream = BufWriter::new(PanickingWriter, 4096)?;
330 /// stream.write_all(b"testing")?;
331 /// let result = catch_unwind(AssertUnwindSafe(|| {
332 /// stream.flush().unwrap()
333 /// }));
334 /// assert!(result.is_err());
335 /// let (recovered_writer, ring) = stream.into_parts();
336 /// assert!(matches!(recovered_writer, PanickingWriter));
337 /// assert_eq!(ring.unwrap_err().into_inner().as_ref(), b"testing");
338 /// # Ok(())
339 /// # }
340 /// ```
341 pub fn into_parts(self) -> (W, std::result::Result<Ring, WriterPanicked>) {
342 // SAFETY: forget(self) prevents double dropping inner and buf.
343 let inner = unsafe { std::ptr::read(&self.inner) };
344 let buf = unsafe { std::ptr::read(&self.buf) };
345 let buf = if self.panicked {
346 Err(WriterPanicked(buf))
347 } else {
348 Ok(buf)
349 };
350
351 std::mem::forget(self);
352
353 (inner, buf)
354 }
355
356 fn flush_buf(&mut self) -> io::Result<()> {
357 loop {
358 if self.buf.is_empty() {
359 break Ok(());
360 }
361
362 self.panicked = true;
363 let r = self.inner.write(self.buf.as_read_slice(std::usize::MAX));
364 self.panicked = false;
365
366 match r {
367 Ok(0) => {
368 break Err(ErrorKind::WriteZero.into());
369 }
370 Ok(n) => self.buf.consume(n),
371 Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
372 Err(e) => break Err(e),
373 }
374 }
375 }
376}
377
378impl<W: Write> Deref for BufWriter<W> {
379 type Target = W;
380
381 #[inline]
382 fn deref(&self) -> &Self::Target {
383 self.get_ref()
384 }
385}
386
387impl<W: Write> DerefMut for BufWriter<W> {
388 #[inline]
389 fn deref_mut(&mut self) -> &mut Self::Target {
390 self.get_mut()
391 }
392}
393
394impl<W> AsRef<W> for BufWriter<W>
395where
396 W: Write,
397 <BufWriter<W> as Deref>::Target: AsRef<W>,
398{
399 fn as_ref(&self) -> &W {
400 self.deref()
401 }
402}
403
404impl<W> AsMut<W> for BufWriter<W>
405where
406 W: Write,
407 <BufWriter<W> as Deref>::Target: AsMut<W>,
408{
409 fn as_mut(&mut self) -> &mut W {
410 self.deref_mut()
411 }
412}
413
414impl<W: Write> Drop for BufWriter<W> {
415 fn drop(&mut self) {
416 if !self.panicked {
417 let _r = self.flush_buf();
418 }
419 }
420}
421
422impl<W: Write> Write for BufWriter<W> {
423 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
424 if buf.len() > self.buf.write_len() {
425 self.flush_buf()?;
426 }
427 if buf.len() >= self.buf.write_len() {
428 self.panicked = true;
429 let r = self.inner.write(buf);
430 self.panicked = false;
431 r
432 } else {
433 self.buf.write(buf)
434 }
435 }
436
437 fn flush(&mut self) -> io::Result<()> {
438 self.flush_buf().and_then(|()| self.get_mut().flush())
439 }
440}
441
442impl<W: Write + Read> Read for BufWriter<W> {
443 #[inline]
444 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
445 self.inner.read(buf)
446 }
447
448 #[inline]
449 fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
450 self.inner.read_vectored(bufs)
451 }
452
453 #[inline]
454 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
455 self.inner.read_to_end(buf)
456 }
457
458 #[inline]
459 fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
460 self.inner.read_to_string(buf)
461 }
462
463 #[inline]
464 fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
465 self.inner.read_exact(buf)
466 }
467}
468
469/// An error returned by [`BufWriter::into_inner`] which combines an error that
470/// happened while writing out the buffer, and the buffered writer object
471/// which may be used to recover from the condition.
472pub struct IntoInnerError<W: Write>(BufWriter<W>, io::Error);
473
474impl<W: Write> IntoInnerError<W> {
475 /// Returns the error which caused the call to [`BufWriter::into_inner()`]
476 /// to fail.
477 pub fn error(&self) -> &io::Error {
478 &self.1
479 }
480
481 /// Consumes the [`IntoInnerError`] and returns the buffered writer which
482 /// received the error.
483 pub fn into_inner(self) -> BufWriter<W> {
484 self.0
485 }
486
487 /// Consumes the [`IntoInnerError`] and returns the error which caused the call to
488 /// [`BufWriter::into_inner()`] to fail. Unlike `error`, this can be used to
489 /// obtain ownership of the underlying error.
490 pub fn into_error(self) -> io::Error {
491 self.1
492 }
493
494 /// Consumes the [`IntoInnerError`] and returns the error which caused the call to
495 /// [`BufWriter::into_inner()`] to fail, and the underlying writer.
496 pub fn into_parts(self) -> (io::Error, BufWriter<W>) {
497 (self.1, self.0)
498 }
499}
500
501/// Error returned for the buffered data from `BufWriter::into_parts` when the underlying
502/// writer has previously panicked. The contents of the buffer may be partially written.
503pub struct WriterPanicked(Ring);
504
505impl WriterPanicked {
506 /// Returns the [`Ring`] with possibly unwritten data.
507 pub fn into_inner(self) -> Ring {
508 self.0
509 }
510
511 const DESCRIPTION: &'static str = "writer panicked, unwritten data may remain";
512}
513
514impl fmt::Display for WriterPanicked {
515 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
516 write!(f, "{}", Self::DESCRIPTION)
517 }
518}
519
520impl fmt::Debug for WriterPanicked {
521 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
522 f.debug_struct("WriterPanicked")
523 .field(
524 "buffer",
525 &format_args!("{}/{}", self.0.write_len(), self.0.write_capacity()),
526 )
527 .finish()
528 }
529}