Skip to main content

qubit_io/buffered/
buffered_byte_output.rs

1// =============================================================================
2//    Copyright (c) 2026 Haixing Hu.
3//
4//    SPDX-License-Identifier: Apache-2.0
5//
6//    Licensed under the Apache License, Version 2.0.
7// =============================================================================
8
9use std::io::{
10    Error,
11    ErrorKind,
12    Result,
13    Seek,
14    SeekFrom,
15    Write,
16};
17
18use crate::Buffer;
19use crate::WriteExt;
20use crate::buffered::DEFAULT_BUFFER_CAPACITY;
21
22/// Buffered byte output over a wrapped writer.
23///
24/// This type keeps a fixed-size byte buffer in front of an underlying writer so
25/// small byte writes can be accumulated before they are written to the I/O
26/// target. Large writes may bypass the buffer after pending buffered bytes
27/// have been flushed.
28///
29/// `BufferedByteOutput` is deliberately byte-oriented. It performs no binary
30/// encoding, text encoding, or record framing. Higher-level writers can either
31/// use the standard [`Write`] implementation or write directly into
32/// [`Self::spare_buffer_mut`] or [`Self::spare_raw_parts_mut`] and then call
33/// [`Self::advance`] or [`Self::advance_unchecked`] after validating the range
34/// they initialized. Callers that need to recover the wrapped writer should
35/// call [`Write::flush`] first, then use [`Self::into_parts`].
36#[derive(Debug)]
37pub struct BufferedByteOutput<W> {
38    inner: W,
39    buffer: Buffer<u8>,
40}
41
42impl<W> BufferedByteOutput<W> {
43    /// Creates a buffered byte output with the default capacity.
44    ///
45    /// # Parameters
46    ///
47    /// * `inner` - The writer that receives bytes when the internal buffer is
48    ///   flushed.
49    ///
50    /// # Returns
51    ///
52    /// A new buffered byte output using `DEFAULT_BUFFER_CAPACITY`.
53    #[inline(always)]
54    #[must_use]
55    pub fn new(inner: W) -> Self {
56        Self::with_capacity(inner, DEFAULT_BUFFER_CAPACITY)
57    }
58
59    /// Creates a buffered byte output with at least the requested capacity.
60    ///
61    /// # Parameters
62    ///
63    /// * `inner` - The writer that receives bytes when the internal buffer is
64    ///   flushed.
65    /// * `capacity` - The requested internal buffer capacity in bytes.
66    ///
67    /// # Returns
68    ///
69    /// A new buffered byte output whose actual buffer capacity is
70    /// `capacity.max(1)`.
71    #[inline(always)]
72    #[must_use]
73    pub fn with_capacity(inner: W, capacity: usize) -> Self {
74        Self {
75            inner,
76            buffer: Buffer::with_capacity(capacity),
77        }
78    }
79
80    /// Returns a shared reference to the wrapped writer.
81    ///
82    /// # Returns
83    ///
84    /// An immutable reference to the underlying writer.  Pending bytes may
85    /// still be present in the internal buffer and are not flushed by this
86    /// method.
87    #[inline(always)]
88    pub const fn inner(&self) -> &W {
89        &self.inner
90    }
91
92    /// Returns an exclusive reference to the wrapped writer.
93    ///
94    /// Pending bytes may still be present in the internal buffer and are not
95    /// flushed by this method.
96    ///
97    /// # Returns
98    ///
99    /// A mutable reference to the underlying writer.
100    #[inline(always)]
101    pub fn inner_mut(&mut self) -> &mut W {
102        &mut self.inner
103    }
104
105    /// Returns the internal buffer capacity.
106    ///
107    /// # Returns
108    ///
109    /// The total number of bytes that can be held by the internal buffer.
110    #[inline(always)]
111    #[must_use]
112    pub fn capacity(&self) -> usize {
113        self.buffer.capacity()
114    }
115
116    /// Returns the unused capacity in the internal buffer.
117    ///
118    /// # Returns
119    ///
120    /// The number of bytes that can still be appended to the internal buffer
121    /// before it must be flushed.
122    #[inline(always)]
123    #[must_use]
124    pub fn spare_capacity(&self) -> usize {
125        self.buffer.spare_capacity()
126    }
127
128    /// Returns the unused portion of the internal buffer.
129    ///
130    /// Callers may write initialized bytes into the returned slice and then
131    /// call [`Self::advance`] with the number of bytes written.
132    ///
133    /// # Returns
134    ///
135    /// A mutable slice over the spare buffer capacity.
136    #[inline(always)]
137    #[must_use]
138    pub fn spare_buffer_mut(&mut self) -> &mut [u8] {
139        let limit = self.buffer.limit();
140        &mut self.buffer.data_mut()[limit..]
141    }
142
143    /// Returns raw spare-buffer parts for hot-path callers.
144    ///
145    /// The returned slice is the full internal backing storage. `index` is the
146    /// start of the spare byte window, and `count` is the number of spare
147    /// bytes. Callers that need a slice can use `&mut buffer[index..index +
148    /// count]`; callers that already validated bounds can pass `buffer` and
149    /// `index` directly to indexed unchecked codecs.
150    ///
151    /// Mutating bytes outside `index..index + count` changes pending output
152    /// bytes and may corrupt the logical stream.
153    ///
154    /// # Returns
155    ///
156    /// The backing storage, the spare start index, and the spare byte count.
157    #[inline(always)]
158    #[must_use]
159    pub fn spare_raw_parts_mut(&mut self) -> (&mut [u8], usize, usize) {
160        let index = self.buffer.limit();
161        let count = self.buffer.spare_capacity();
162        (self.buffer.data_mut(), index, count)
163    }
164
165    /// Marks `count` bytes from [`Self::spare_buffer_mut`] as written.
166    ///
167    /// # Parameters
168    ///
169    /// * `count` - Number of bytes initialized by the caller.
170    ///
171    /// # Panics
172    ///
173    /// Panics when `count` exceeds [`Self::spare_capacity`].
174    #[inline(always)]
175    pub fn advance(&mut self, count: usize) {
176        assert!(
177            count <= self.spare_capacity(),
178            "cannot advance beyond spare output buffer"
179        );
180        // SAFETY: The assertion proves that `count` is within spare capacity.
181        unsafe {
182            self.buffer.advance_unchecked(count);
183        }
184    }
185
186    /// Marks spare bytes as written without checking bounds.
187    ///
188    /// # Parameters
189    ///
190    /// * `count` - Number of initialized spare bytes to make pending for
191    ///   output.
192    ///
193    /// # Safety
194    ///
195    /// The caller must guarantee that `count <= self.spare_capacity()` and
196    /// that the corresponding bytes returned by [`Self::spare_buffer_mut`]
197    /// have been initialized.
198    #[inline(always)]
199    pub unsafe fn advance_unchecked(&mut self, count: usize) {
200        // SAFETY: The caller guarantees that `count` is within spare capacity.
201        unsafe {
202            self.buffer.advance_unchecked(count);
203        }
204    }
205
206    /// Writes bytes into the internal buffer without checking spare capacity.
207    ///
208    /// # Parameters
209    ///
210    /// * `input` - The source bytes.
211    /// * `input_index` - The starting index in `input`.
212    /// * `count` - The number of bytes to copy.
213    ///
214    /// # Safety
215    ///
216    /// The caller must ensure that `input_index..input_index + count` is valid
217    /// in `input`, that `count <= self.spare_capacity()`, and that the copied
218    /// source range does not overlap with the destination range in the internal
219    /// buffer.
220    #[inline(always)]
221    unsafe fn write_to_buffer_unchecked(
222        &mut self,
223        input: &[u8],
224        input_index: usize,
225        count: usize,
226    ) {
227        // SAFETY: The caller upholds `Buffer::copy_from_unchecked` range and
228        // non-overlap requirements.
229        unsafe {
230            self.buffer.copy_from_unchecked(input, input_index, count);
231        }
232    }
233}
234
235impl<W> BufferedByteOutput<W>
236where
237    W: Write,
238{
239    /// Consumes this buffered output without flushing pending bytes.
240    ///
241    /// This method performs no I/O. Pending bytes that have been accepted into
242    /// the internal buffer but not written to the wrapped writer are returned
243    /// as the second tuple item.
244    ///
245    /// # Returns
246    ///
247    /// The wrapped writer and pending bytes in logical write order.
248    #[inline(always)]
249    #[must_use]
250    pub fn into_parts(self) -> (W, Vec<u8>) {
251        let pending = self.pending_slice().to_vec();
252        (self.inner, pending)
253    }
254
255    /// Ensures that at least `count` bytes are available in the spare buffer.
256    ///
257    /// # Parameters
258    ///
259    /// * `count` - Number of spare bytes required.
260    ///
261    /// # Errors
262    ///
263    /// Returns any non-interrupted I/O error produced while flushing buffered
264    /// bytes. Returns [`ErrorKind::InvalidInput`] if `count` exceeds the buffer
265    /// capacity. Returns [`ErrorKind::InvalidData`] if the wrapped writer
266    /// reports more bytes than the pending buffer range contained.
267    pub fn ensure_spare_capacity(&mut self, count: usize) -> Result<()> {
268        if count > self.buffer.capacity() {
269            return Err(Error::new(
270                ErrorKind::InvalidInput,
271                "requested spare capacity exceeds buffered output capacity",
272            ));
273        }
274        if self.spare_capacity() < count {
275            self.flush_buffer()?;
276        }
277        Ok(())
278    }
279
280    /// Writes all bytes through the internal buffer.
281    ///
282    /// Small inputs are appended to the internal buffer.  Inputs that do not
283    /// fit may flush the buffer first, and inputs at least as large as the
284    /// buffer may be written directly to the wrapped writer.
285    ///
286    /// # Parameters
287    ///
288    /// * `input` - The bytes to write.
289    ///
290    /// # Returns
291    ///
292    /// `Ok(())` after all bytes from `input` have been accepted.
293    ///
294    /// # Errors
295    ///
296    /// Returns any I/O error produced while flushing pending bytes or writing a
297    /// large input directly to the wrapped writer. Flush failures include
298    /// [`ErrorKind::WriteZero`] if the writer reports that zero bytes were
299    /// written before the buffer is drained, and [`ErrorKind::InvalidData`] if
300    /// it reports more bytes than the requested range contained.
301    #[inline]
302    fn write_all_buffered(&mut self, input: &[u8]) -> Result<()> {
303        if input.len() < self.spare_capacity() {
304            // SAFETY: The branch proves that the input fits in spare capacity.
305            unsafe {
306                self.write_to_buffer_unchecked(input, 0, input.len());
307            }
308            Ok(())
309        } else {
310            self.write_all_cold(input)
311        }
312    }
313
314    /// Handles slow-path raw writes that must flush or bypass the buffer.
315    ///
316    /// # Parameters
317    ///
318    /// * `input` - The bytes to write after the fast path determined that they
319    ///   do not fit comfortably in the current spare buffer capacity.
320    ///
321    /// # Returns
322    ///
323    /// `Ok(())` after all bytes from `input` have been accepted either by the
324    /// buffer or by the wrapped writer.
325    ///
326    /// # Errors
327    ///
328    /// Returns any I/O error produced while flushing pending bytes or writing a
329    /// large input directly to the wrapped writer. Flush failures include
330    /// [`ErrorKind::WriteZero`] if the writer reports that zero bytes were
331    /// written before the buffer is drained, and [`ErrorKind::InvalidData`] if
332    /// it reports more bytes than the requested range contained.
333    #[cold]
334    #[inline(never)]
335    fn write_all_cold(&mut self, input: &[u8]) -> Result<()> {
336        if input.len() > self.spare_capacity() {
337            self.flush_buffer()?;
338        }
339        if input.len() >= self.buffer.capacity() {
340            // SAFETY: The range covers the full source slice.
341            unsafe { self.write_all_inner_unchecked(input, 0, input.len()) }
342        } else {
343            // SAFETY: After the optional flush, any input smaller than the
344            // buffer capacity fits in the empty or sufficiently spare buffer.
345            unsafe {
346                self.write_to_buffer_unchecked(input, 0, input.len());
347            }
348            Ok(())
349        }
350    }
351
352    /// Handles slow-path raw writes for [`Write::write`] semantics.
353    ///
354    /// The method preserves `Write::write` behavior: it may accept fewer bytes
355    /// than the input length when the write is delegated directly to the
356    /// wrapped writer.
357    ///
358    /// # Parameters
359    ///
360    /// * `input` - The bytes to write after the fast path determined that they
361    ///   do not fit comfortably in the current spare buffer capacity.
362    ///
363    /// # Returns
364    ///
365    /// The number of bytes accepted.  Buffered writes return `input.len()`;
366    /// direct writes return the byte count reported by the wrapped writer.
367    ///
368    /// # Errors
369    ///
370    /// Returns any I/O error produced while flushing pending bytes or writing a
371    /// large input directly to the wrapped writer. Flush failures include
372    /// [`ErrorKind::WriteZero`] if the writer reports that zero bytes were
373    /// written before the buffer is drained, and [`ErrorKind::InvalidData`] if
374    /// it reports more bytes than the requested range contained.
375    #[cold]
376    #[inline(never)]
377    fn write_cold(&mut self, input: &[u8]) -> Result<usize> {
378        if input.len() > self.spare_capacity() {
379            self.flush_buffer()?;
380        }
381        if input.len() >= self.buffer.capacity() {
382            // SAFETY: The range covers the full source slice.
383            unsafe { self.write_inner_unchecked(input, 0, input.len()) }
384        } else {
385            // SAFETY: After the optional flush, any input smaller than the
386            // buffer capacity fits in the empty or sufficiently spare buffer.
387            unsafe {
388                self.write_to_buffer_unchecked(input, 0, input.len());
389            }
390            Ok(input.len())
391        }
392    }
393
394    /// Flushes buffered bytes to the wrapped writer.
395    ///
396    /// The method retries interrupted writes.  If an error occurs after some
397    /// bytes have been written, the already-written bytes are removed from the
398    /// front of the buffer and the unwritten suffix is kept for a later retry.
399    ///
400    /// # Returns
401    ///
402    /// `Ok(())` once all currently buffered bytes have been written to the
403    /// wrapped writer.
404    ///
405    /// # Errors
406    ///
407    /// Returns any non-interrupted I/O error produced by the wrapped writer.
408    /// Returns [`ErrorKind::WriteZero`] if the writer reports a zero-length
409    /// write before all buffered bytes are drained. Returns
410    /// [`ErrorKind::InvalidData`] if the writer reports more bytes than the
411    /// pending buffer range contained.
412    pub fn flush_buffer(&mut self) -> Result<()> {
413        while !self.buffer.is_empty() {
414            let position = self.buffer.position();
415            let available = self.buffer.available();
416            // SAFETY: `position..position + available` is the current readable
417            // range maintained by `Buffer`.
418            match unsafe {
419                self.inner.write_unchecked(
420                    self.buffer.data(),
421                    position,
422                    available,
423                )
424            } {
425                Ok(0) => {
426                    self.buffer.compact();
427                    return Err(Error::new(
428                        ErrorKind::WriteZero,
429                        "failed to write buffered data",
430                    ));
431                }
432                Ok(written) => {
433                    if let Err(error) = validate_write_count(written, available)
434                    {
435                        self.buffer.compact();
436                        return Err(error);
437                    }
438                    // SAFETY: The validated count is in `0..=available`.
439                    unsafe {
440                        self.buffer.consume_unchecked(written);
441                    }
442                }
443                Err(error) if error.kind() == ErrorKind::Interrupted => {}
444                Err(error) => {
445                    self.buffer.compact();
446                    return Err(error);
447                }
448            }
449        }
450        self.buffer.clear();
451        Ok(())
452    }
453
454    /// Flushes buffered bytes and then flushes the wrapped writer.
455    ///
456    /// # Returns
457    ///
458    /// `Ok(())` once pending buffered bytes have been written and the wrapped
459    /// writer's own flush operation succeeds.
460    ///
461    /// # Errors
462    ///
463    /// Returns any non-interrupted I/O error produced while flushing buffered
464    /// bytes, [`ErrorKind::WriteZero`] if the wrapped writer cannot make
465    /// progress while draining the buffer, [`ErrorKind::InvalidData`] if the
466    /// writer reports an impossible byte count, or any error returned by
467    /// [`Write::flush`] on the wrapped writer.
468    #[inline(always)]
469    fn flush_all(&mut self) -> Result<()> {
470        self.flush_buffer().and_then(|()| self.inner.flush())
471    }
472
473    /// Writes bytes from the input slice and reports the accepted byte count.
474    ///
475    /// This is the buffered implementation for [`Write::write`]-style callers.
476    /// Small inputs are appended to the buffer and reported as fully accepted;
477    /// large inputs may be delegated to the wrapped writer after pending bytes
478    /// are flushed.
479    ///
480    /// # Parameters
481    ///
482    /// * `input` - The bytes to write.
483    ///
484    /// # Returns
485    ///
486    /// The number of bytes accepted.  Buffered writes return `input.len()`;
487    /// direct writes return the byte count reported by the wrapped writer.
488    ///
489    /// # Errors
490    ///
491    /// Returns any I/O error produced while flushing pending bytes or writing a
492    /// large input directly to the wrapped writer. Flush failures include
493    /// [`ErrorKind::WriteZero`] if the writer reports that zero bytes were
494    /// written before the buffer is drained, and [`ErrorKind::InvalidData`] if
495    /// it reports more bytes than the requested range contained.
496    #[inline]
497    fn write_from(&mut self, input: &[u8]) -> Result<usize> {
498        if input.len() < self.spare_capacity() {
499            // SAFETY: The branch proves that the input fits in spare capacity.
500            unsafe {
501                self.write_to_buffer_unchecked(input, 0, input.len());
502            }
503            Ok(input.len())
504        } else {
505            self.write_cold(input)
506        }
507    }
508
509    /// Flushes pending bytes before seeking the wrapped writer.
510    ///
511    /// # Parameters
512    ///
513    /// * `position` - The target seek position passed to the wrapped writer.
514    ///
515    /// # Returns
516    ///
517    /// The new stream position reported by the wrapped writer.
518    ///
519    /// # Errors
520    ///
521    /// Returns any non-interrupted I/O error produced while flushing buffered
522    /// bytes, [`ErrorKind::WriteZero`] if the wrapped writer cannot make
523    /// progress while draining the buffer, [`ErrorKind::InvalidData`] if the
524    /// writer reports an impossible byte count, or any error returned by
525    /// [`Seek::seek`] on the wrapped writer.
526    #[inline(always)]
527    fn flush_then_seek(&mut self, position: SeekFrom) -> Result<u64>
528    where
529        W: Seek,
530    {
531        self.flush_buffer().and_then(|()| self.inner.seek(position))
532    }
533
534    /// Returns pending bytes currently stored in the internal buffer.
535    ///
536    /// # Returns
537    ///
538    /// A slice over bytes accepted by this output but not yet written to the
539    /// wrapped writer.
540    #[inline(always)]
541    fn pending_slice(&self) -> &[u8] {
542        &self.buffer.data()[self.buffer.position()..self.buffer.limit()]
543    }
544
545    /// Writes bytes to the wrapped writer and validates the reported count.
546    ///
547    /// # Parameters
548    ///
549    /// * `input` - Source storage.
550    /// * `input_index` - Start index inside `input`.
551    /// * `count` - Maximum number of bytes to write.
552    ///
553    /// # Returns
554    ///
555    /// The number of bytes accepted by the wrapped writer.
556    ///
557    /// # Errors
558    ///
559    /// Returns the wrapped writer's I/O error, or [`ErrorKind::InvalidData`]
560    /// if it reports a byte count larger than `count`.
561    ///
562    /// # Safety
563    ///
564    /// The caller must guarantee that `input_index..input_index + count` is a
565    /// valid range inside `input` and that the addition does not overflow.
566    #[inline(always)]
567    unsafe fn write_inner_unchecked(
568        &mut self,
569        input: &[u8],
570        input_index: usize,
571        count: usize,
572    ) -> Result<usize> {
573        // SAFETY: The caller guarantees the source range is valid.
574        let written =
575            unsafe { self.inner.write_unchecked(input, input_index, count) }?;
576        validate_write_count(written, count)?;
577        Ok(written)
578    }
579
580    /// Writes all bytes in an indexed source range to the wrapped writer.
581    ///
582    /// # Parameters
583    ///
584    /// * `input` - Source storage.
585    /// * `input_index` - Start index inside `input`.
586    /// * `count` - Number of bytes to write.
587    ///
588    /// # Errors
589    ///
590    /// Returns the wrapped writer's I/O error, [`ErrorKind::WriteZero`] if the
591    /// writer cannot make progress, or [`ErrorKind::InvalidData`] if it
592    /// reports an impossible byte count.
593    ///
594    /// # Safety
595    ///
596    /// The caller must guarantee that `input_index..input_index + count` is a
597    /// valid range inside `input` and that the addition does not overflow.
598    unsafe fn write_all_inner_unchecked(
599        &mut self,
600        input: &[u8],
601        input_index: usize,
602        count: usize,
603    ) -> Result<()> {
604        let mut written = 0;
605        while written < count {
606            let remaining = count - written;
607            // SAFETY: `written < count`, so this suffix remains inside the
608            // caller-validated source range.
609            match unsafe {
610                self.write_inner_unchecked(
611                    input,
612                    input_index + written,
613                    remaining,
614                )
615            } {
616                Ok(0) => {
617                    return Err(Error::new(
618                        ErrorKind::WriteZero,
619                        "failed to write whole buffer",
620                    ));
621                }
622                Ok(count) => written += count,
623                Err(error) if error.kind() == ErrorKind::Interrupted => {}
624                Err(error) => return Err(error),
625            }
626        }
627        Ok(())
628    }
629}
630
631impl<W> Write for BufferedByteOutput<W>
632where
633    W: Write,
634{
635    /// Writes bytes through the internal buffer.
636    #[inline(always)]
637    fn write(&mut self, buffer: &[u8]) -> Result<usize> {
638        self.write_from(buffer)
639    }
640
641    /// Writes all bytes through the internal buffer.
642    #[inline(always)]
643    fn write_all(&mut self, buffer: &[u8]) -> Result<()> {
644        self.write_all_buffered(buffer)
645    }
646
647    /// Flushes the internal buffer and then the wrapped writer.
648    #[inline(always)]
649    fn flush(&mut self) -> Result<()> {
650        self.flush_all()
651    }
652}
653
654impl<W> Seek for BufferedByteOutput<W>
655where
656    W: Write + Seek,
657{
658    /// Flushes pending bytes before seeking the wrapped writer.
659    #[inline(always)]
660    fn seek(&mut self, position: SeekFrom) -> Result<u64> {
661        self.flush_then_seek(position)
662    }
663}
664
665/// Validates a byte count returned by a wrapped writer.
666///
667/// # Parameters
668///
669/// * `written` - Byte count reported by the wrapped writer.
670/// * `requested` - Maximum byte count requested from the wrapped writer.
671///
672/// # Errors
673///
674/// Returns [`ErrorKind::InvalidData`] when the wrapped writer reports more
675/// bytes than the source range contained.
676#[inline(always)]
677fn validate_write_count(written: usize, requested: usize) -> Result<()> {
678    if written > requested {
679        return Err(Error::new(
680            ErrorKind::InvalidData,
681            format!(
682                "writer reported {written} bytes for a {requested}-byte buffer"
683            ),
684        ));
685    }
686    Ok(())
687}