//! ax_io/buffered/bufwriter/mod.rs
//!
//! `BufWriter`: wraps a writer and buffers its output, modeled on
//! [`std::io::BufWriter`] but usable with or without the `alloc` feature.
1use core::{fmt, mem::ManuallyDrop, ptr};
2
3use crate::{DEFAULT_BUF_SIZE, Error, IntoInnerError, IoBufMut, Result, Seek, SeekFrom, Write};
4
// Backing storage for the write buffer:
// - with `alloc`: a growable heap `Vec`, sized at construction time;
// - without `alloc`: a fixed-capacity `heapless::Vec` of `DEFAULT_BUF_SIZE`
//   bytes with a `u16` length type (capacity is a compile-time constant).
#[cfg(feature = "alloc")]
type Buffer = alloc::vec::Vec<u8>;
#[cfg(not(feature = "alloc"))]
type Buffer = heapless::Vec<u8, DEFAULT_BUF_SIZE, u16>;
9
/// Wraps a writer and buffers its output.
///
/// See [std::io::BufWriter] for more details.
pub struct BufWriter<W: ?Sized + Write> {
    // The buffer. Bytes are appended at the back and drained from the front
    // when flushed into `inner`.
    buf: Buffer,
    // #30888: If the inner writer panics in a call to write, we don't want to
    // write the buffered data a second time in BufWriter's destructor. This
    // flag tells the Drop impl if it should skip the flush.
    panicked: bool,
    // The wrapped writer. Kept last so that `W: ?Sized` is permitted
    // (an unsized field must be the final one in a struct).
    inner: W,
}
22
impl<W: Write> BufWriter<W> {
    /// Creates a new `BufWriter<W>` with a default buffer capacity.
    pub fn new(inner: W) -> BufWriter<W> {
        // With `alloc`, reserve `DEFAULT_BUF_SIZE` up front; without it, the
        // heapless buffer already has that capacity baked into its type.
        #[cfg(feature = "alloc")]
        let buf = Buffer::with_capacity(DEFAULT_BUF_SIZE);
        #[cfg(not(feature = "alloc"))]
        let buf = Buffer::new();
        BufWriter {
            buf,
            panicked: false,
            inner,
        }
    }

    /// Creates a new `BufWriter<W>` with at least the specified buffer capacity.
    ///
    /// Only available with `alloc`: the heapless buffer's capacity is fixed
    /// at compile time and cannot be chosen per instance.
    #[cfg(feature = "alloc")]
    pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
        BufWriter {
            buf: Buffer::with_capacity(capacity),
            panicked: false,
            inner,
        }
    }

    /// Unwraps this `BufWriter<W>`, returning the underlying writer.
    ///
    /// The buffer is written out before returning the writer.
    ///
    /// # Errors
    ///
    /// An [`Err`] will be returned if an error occurs while flushing the buffer.
    // Without `alloc` the error carries the whole (large) BufWriter by value,
    // hence the clippy::result_large_err allowance.
    #[cfg_attr(not(feature = "alloc"), allow(clippy::result_large_err))]
    pub fn into_inner(mut self) -> core::result::Result<W, IntoInnerError<BufWriter<W>>> {
        match self.flush_buf() {
            // On flush failure, hand `self` back to the caller inside the
            // error so the writer and buffered data are not lost.
            Err(e) => Err(IntoInnerError::new(self, e)),
            Ok(()) => Ok(self.into_parts().0),
        }
    }

    /// Disassembles this `BufWriter<W>`, returning the underlying writer, and any buffered but
    /// unwritten data.
    ///
    /// If the underlying writer panicked, it is not known what portion of the data was written.
    /// In this case, we return `WriterPanicked` for the buffered data (from which the buffer
    /// contents can still be recovered).
    ///
    /// `into_parts` makes no attempt to flush data and cannot fail.
    pub fn into_parts(self) -> (W, core::result::Result<Buffer, WriterPanicked>) {
        // Wrap in ManuallyDrop so BufWriter's Drop (which would flush) never
        // runs; we are deliberately taking the pieces apart by hand.
        let mut this = ManuallyDrop::new(self);
        // Move the buffer out, leaving an empty one behind in `this`.
        let buf = core::mem::take(&mut this.buf);
        let buf = if !this.panicked {
            Ok(buf)
        } else {
            Err(WriterPanicked { buf })
        };

        // SAFETY: double-drops are prevented by putting `this` in a ManuallyDrop that is never
        // dropped
        let inner = unsafe { ptr::read(&this.inner) };

        (inner, buf)
    }
}
86
/// Error returned for the buffered data from `BufWriter::into_parts`, when the underlying
/// writer has previously panicked.  Contains the (possibly partly written) buffered data.
pub struct WriterPanicked {
    // The buffered bytes at the time of the panic; an unknown prefix of them
    // may already have reached the underlying writer.
    buf: Buffer,
}
92
93impl WriterPanicked {
94    /// Returns the perhaps-unwritten data.  Some of this data may have been written by the
95    /// panicking call(s) to the underlying writer, so simply writing it again is not a good idea.
96    #[must_use = "`self` will be dropped if the result is not used"]
97    pub fn into_inner(self) -> Buffer {
98        self.buf
99    }
100}
101
102impl fmt::Display for WriterPanicked {
103    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104        "BufWriter inner writer panicked, what data remains unwritten is not known".fmt(f)
105    }
106}
107
108impl fmt::Debug for WriterPanicked {
109    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110        f.debug_struct("WriterPanicked")
111            .field(
112                "buffer",
113                &format_args!("{}/{}", self.buf.len(), self.buf.capacity()),
114            )
115            .finish()
116    }
117}
118
impl<W: ?Sized + Write> BufWriter<W> {
    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.inner
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.inner
    }

    /// Returns a reference to the internally buffered data.
    pub fn buffer(&self) -> &[u8] {
        self.buf.as_slice()
    }

    /// Returns the number of bytes the internal buffer can hold without flushing.
    pub fn capacity(&self) -> usize {
        self.buf.capacity()
    }

    /// Crate-internal mutable access to the raw buffer.
    pub(crate) fn buffer_mut(&mut self) -> &mut Buffer {
        &mut self.buf
    }

    /// Send data in our local buffer into the inner writer, looping as
    /// necessary until either it's all been sent or an error occurs.
    ///
    /// Because all the data in the buffer has been reported to our owner as
    /// "successfully written" (by returning nonzero success values from
    /// `write`), any 0-length writes from `inner` must be reported as i/o
    /// errors from this method.
    pub(crate) fn flush_buf(&mut self) -> Result<()> {
        /// Helper struct to ensure the buffer is updated after all the writes
        /// are complete. It tracks the number of written bytes and drains them
        /// all from the front of the buffer when dropped.
        struct BufGuard<'a> {
            buffer: &'a mut Buffer,
            written: usize,
        }

        impl<'a> BufGuard<'a> {
            fn new(buffer: &'a mut Buffer) -> Self {
                Self { buffer, written: 0 }
            }

            /// The unwritten part of the buffer
            fn remaining(&self) -> &[u8] {
                &self.buffer.as_slice()[self.written..]
            }

            /// Flag some bytes as removed from the front of the buffer
            fn consume(&mut self, amt: usize) {
                self.written += amt;
            }

            /// true if all of the bytes have been written
            fn done(&self) -> bool {
                self.written >= self.buffer.len()
            }
        }

        impl Drop for BufGuard<'_> {
            // Runs on every exit path (including early returns via `?` and
            // panics from `inner.write`), so bytes the inner writer accepted
            // are never re-sent by a later flush.
            fn drop(&mut self) {
                if self.written > 0 {
                    self.buffer.drain(..self.written);
                }
            }
        }

        let mut guard = BufGuard::new(&mut self.buf);
        while !guard.done() {
            // Bracket the inner write with the panicked flag so Drop can
            // tell whether `inner.write` unwound (see struct field docs).
            self.panicked = true;
            let r = self.inner.write(guard.remaining());
            self.panicked = false;

            match r {
                Ok(0) => {
                    return Err(Error::WriteZero);
                }
                Ok(n) => guard.consume(n),
                // Interrupted writes are transient: retry the same bytes.
                Err(ref e) if e.canonicalize() == Error::Interrupted => {}
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }

    /// Number of additional bytes the buffer can accept before a flush is needed.
    fn spare_capacity(&self) -> usize {
        self.buf.capacity() - self.buf.len()
    }

    // SAFETY: Requires `buf.len() <= self.buf.capacity() - self.buf.len()`,
    // i.e., that input buffer length is less than or equal to spare capacity.
    #[inline]
    unsafe fn write_to_buffer_unchecked(&mut self, buf: &[u8]) {
        debug_assert!(buf.len() <= self.spare_capacity());
        let old_len = self.buf.len();
        let buf_len = buf.len();
        let src = buf.as_ptr();
        unsafe {
            // Append `buf` after the current contents, then publish the new
            // length; `copy_nonoverlapping` is sound because `src` is a
            // caller-provided slice distinct from our buffer's spare tail.
            let dst = self.buf.as_mut_ptr().add(old_len);
            core::ptr::copy_nonoverlapping(src, dst, buf_len);
            self.buf.set_len(old_len + buf_len);
        }
    }

    /// Buffer some data without flushing it, regardless of the size of the
    /// data. Writes as much as possible without exceeding capacity. Returns
    /// the number of bytes written.
    pub(crate) fn write_to_buf(&mut self, buf: &[u8]) -> usize {
        let available = self.spare_capacity();
        let amt_to_buffer = available.min(buf.len());

        // SAFETY: `amt_to_buffer` is <= buffer's spare capacity by construction.
        unsafe {
            self.write_to_buffer_unchecked(&buf[..amt_to_buffer]);
        }

        amt_to_buffer
    }

    // Ensure this function does not get inlined into `write`, so that it
    // remains inlineable and its common path remains as short as possible.
    // If this function ends up being called frequently relative to `write`,
    // it's likely a sign that the client is using an improperly sized buffer
    // or their write patterns are somewhat pathological.
    #[cold]
    #[inline(never)]
    fn write_cold(&mut self, buf: &[u8]) -> Result<usize> {
        if buf.len() > self.spare_capacity() {
            self.flush_buf()?;
        }

        // Why not len > capacity? To avoid a needless trip through the buffer when the input
        // exactly fills it. We'd just need to flush it to the underlying writer anyway.
        if buf.len() >= self.buf.capacity() {
            self.panicked = true;
            let r = self.get_mut().write(buf);
            self.panicked = false;
            r
        } else {
            // Write to the buffer. In this case, we write to the buffer even if it fills it
            // exactly. Doing otherwise would mean flushing the buffer, then writing this
            // input to the inner writer, which in many cases would be a worse strategy.

            // SAFETY: There was either enough spare capacity already, or there wasn't and we
            // flushed the buffer to ensure that there is. In the latter case, we know that there
            // is because flushing ensured that our entire buffer is spare capacity, and we entered
            // this block because the input buffer length is less than that capacity. In either
            // case, it's safe to write the input buffer to our buffer.
            unsafe {
                self.write_to_buffer_unchecked(buf);
            }

            Ok(buf.len())
        }
    }

    // Ensure this function does not get inlined into `write_all`, so that it
    // remains inlineable and its common path remains as short as possible.
    // If this function ends up being called frequently relative to `write_all`,
    // it's likely a sign that the client is using an improperly sized buffer
    // or their write patterns are somewhat pathological.
    #[cold]
    #[inline(never)]
    fn write_all_cold(&mut self, buf: &[u8]) -> Result<()> {
        // Normally, `write_all` just calls `write` in a loop. We can do better
        // by calling `self.get_mut().write_all()` directly, which avoids
        // round trips through the buffer in the event of a series of partial
        // writes in some circumstances.

        if buf.len() > self.spare_capacity() {
            self.flush_buf()?;
        }

        // Why not len > capacity? To avoid a needless trip through the buffer when the input
        // exactly fills it. We'd just need to flush it to the underlying writer anyway.
        if buf.len() >= self.buf.capacity() {
            self.panicked = true;
            let r = self.get_mut().write_all(buf);
            self.panicked = false;
            r
        } else {
            // Write to the buffer. In this case, we write to the buffer even if it fills it
            // exactly. Doing otherwise would mean flushing the buffer, then writing this
            // input to the inner writer, which in many cases would be a worse strategy.

            // SAFETY: There was either enough spare capacity already, or there wasn't and we
            // flushed the buffer to ensure that there is. In the latter case, we know that there
            // is because flushing ensured that our entire buffer is spare capacity, and we entered
            // this block because the input buffer length is less than that capacity. In either
            // case, it's safe to write the input buffer to our buffer.
            unsafe {
                self.write_to_buffer_unchecked(buf);
            }

            Ok(())
        }
    }
}
322
323impl<W: ?Sized + Write + fmt::Debug> fmt::Debug for BufWriter<W> {
324    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
325        fmt.debug_struct("BufWriter")
326            .field("writer", &&self.inner)
327            .field(
328                "buffer",
329                &format_args!("{}/{}", self.buf.len(), self.buf.capacity()),
330            )
331            .finish()
332    }
333}
334
335impl<W: ?Sized + Write> Write for BufWriter<W> {
336    #[inline]
337    fn write(&mut self, buf: &[u8]) -> Result<usize> {
338        // Use < instead of <= to avoid a needless trip through the buffer in some cases.
339        // See `write_cold` for details.
340        if buf.len() < self.spare_capacity() {
341            // SAFETY: safe by above conditional.
342            unsafe {
343                self.write_to_buffer_unchecked(buf);
344            }
345
346            Ok(buf.len())
347        } else {
348            self.write_cold(buf)
349        }
350    }
351
352    #[inline]
353    fn write_all(&mut self, buf: &[u8]) -> Result<()> {
354        // Use < instead of <= to avoid a needless trip through the buffer in some cases.
355        // See `write_all_cold` for details.
356        if buf.len() < self.spare_capacity() {
357            // SAFETY: safe by above conditional.
358            unsafe {
359                self.write_to_buffer_unchecked(buf);
360            }
361
362            Ok(())
363        } else {
364            self.write_all_cold(buf)
365        }
366    }
367
368    fn flush(&mut self) -> Result<()> {
369        self.flush_buf().and_then(|()| self.get_mut().flush())
370    }
371}
372
373impl<W: ?Sized + Write + Seek> Seek for BufWriter<W> {
374    /// Seek to the offset, in bytes, in the underlying writer.
375    ///
376    /// Seeking always writes out the internal buffer before seeking.
377    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
378        self.flush_buf()?;
379        self.get_mut().seek(pos)
380    }
381}
382
383impl<W: ?Sized + Write> Drop for BufWriter<W> {
384    fn drop(&mut self) {
385        if !self.panicked {
386            // dtors should not panic, so we ignore a failed flush
387            let _r = self.flush_buf();
388        }
389    }
390}
391
392impl<W: ?Sized + Write + IoBufMut> IoBufMut for BufWriter<W> {
393    #[inline]
394    fn remaining_mut(&self) -> usize {
395        self.inner.remaining_mut().saturating_sub(self.buf.len())
396    }
397}