Skip to main content

qubit_io/buffered/
buffered_byte_input.rs

// =============================================================================
//    Copyright (c) 2026 Haixing Hu.
//
//    SPDX-License-Identifier: Apache-2.0
//
//    Licensed under the Apache License, Version 2.0.
// =============================================================================
8
9use std::io::{
10    BufRead,
11    Error,
12    ErrorKind,
13    Read,
14    Result,
15    Seek,
16    SeekFrom,
17};
18
19use crate::Buffer;
20use crate::ReadExt;
21use crate::buffered::DEFAULT_BUFFER_CAPACITY;
22
23/// Buffered byte input over a wrapped reader.
24///
25/// This type owns a wrapped input object and an internal byte buffer. It keeps
26/// unread bytes in `buffer[position..limit]` so callers can inspect or consume
27/// the current byte window before refilling it.
28///
29/// `BufferedByteInput` is deliberately byte-oriented. It performs no binary
30/// decoding, text decoding, or record parsing; higher-level stream adapters can
31/// build those concerns on top of [`Self::unread_slice`],
32/// [`Self::unread_raw_parts`], [`Self::ensure_available`], and
33/// [`Self::read_into_unchecked`]. The type also implements [`BufRead`] for
34/// callers that want the standard buffered-read interface.
35#[derive(Debug)]
36pub struct BufferedByteInput<R> {
37    inner: R,
38    buffer: Buffer<u8>,
39}
40
41impl<R> BufferedByteInput<R> {
42    /// Creates a buffered byte input with the default capacity.
43    ///
44    /// # Arguments
45    ///
46    /// * `inner` - The input object wrapped by this buffer.
47    ///
48    /// # Returns
49    ///
50    /// A new buffered byte input whose internal buffer has at least
51    /// `DEFAULT_BUFFER_CAPACITY` bytes.
52    #[inline(always)]
53    #[must_use]
54    pub fn new(inner: R) -> Self {
55        Self::with_capacity(inner, DEFAULT_BUFFER_CAPACITY)
56    }
57
58    /// Creates a buffered byte input with at least the requested capacity.
59    ///
60    /// The actual capacity is raised to `1` when the requested value is `0`.
61    ///
62    /// # Arguments
63    ///
64    /// * `inner` - The input object wrapped by this buffer.
65    /// * `capacity` - The requested internal buffer capacity, in bytes.
66    ///
67    /// # Returns
68    ///
69    /// A new buffered byte input whose internal buffer capacity is
70    /// `capacity.max(1)`.
71    #[inline]
72    #[must_use]
73    pub fn with_capacity(inner: R, capacity: usize) -> Self {
74        Self {
75            inner,
76            buffer: Buffer::with_capacity(capacity),
77        }
78    }
79
80    /// Returns a shared reference to the wrapped input object.
81    ///
82    /// # Returns
83    ///
84    /// A shared reference to the inner input object.
85    #[inline(always)]
86    pub const fn inner(&self) -> &R {
87        &self.inner
88    }
89
90    /// Returns an exclusive reference to the wrapped input object.
91    ///
92    /// Mutating the wrapped object directly may invalidate assumptions about
93    /// bytes already buffered by this value.
94    ///
95    /// # Returns
96    ///
97    /// An exclusive reference to the wrapped input object.
98    #[inline(always)]
99    pub fn inner_mut(&mut self) -> &mut R {
100        &mut self.inner
101    }
102
103    /// Consumes this buffered input and returns the wrapped input object plus
104    /// unread bytes.
105    ///
106    /// This method performs no I/O. Bytes that have already been read from the
107    /// wrapped input but not consumed by this buffered input are returned as
108    /// the second tuple item.
109    ///
110    /// # Returns
111    ///
112    /// The wrapped input object and a vector containing the unread buffered
113    /// bytes in logical read order.
114    #[inline(always)]
115    #[must_use]
116    pub fn into_parts(self) -> (R, Vec<u8>) {
117        let unread = self.unread_slice().to_vec();
118        (self.inner, unread)
119    }
120
121    /// Returns the internal buffer capacity.
122    ///
123    /// # Returns
124    ///
125    /// The total number of bytes that can be held by the internal buffer.
126    #[inline(always)]
127    #[must_use]
128    pub fn capacity(&self) -> usize {
129        self.buffer.capacity()
130    }
131
132    /// Returns the number of unread bytes currently buffered.
133    ///
134    /// # Returns
135    ///
136    /// The length of `buffer[position..limit]`, in bytes.
137    #[inline(always)]
138    #[must_use]
139    pub fn available(&self) -> usize {
140        self.buffer.available()
141    }
142
143    /// Returns the currently buffered unread bytes.
144    ///
145    /// # Returns
146    ///
147    /// The unread range `buffer[position..limit]`.
148    #[inline(always)]
149    #[must_use]
150    pub fn unread_slice(&self) -> &[u8] {
151        &self.buffer.data()[self.buffer.position()..self.buffer.limit()]
152    }
153
154    /// Returns raw unread-buffer parts for hot-path callers.
155    ///
156    /// The returned slice is the full internal backing storage. `index` is the
157    /// start of the unread byte window, and `count` is the number of unread
158    /// bytes. Callers that need a slice can use `&buffer[index..index +
159    /// count]`; callers that already validated bounds can pass `buffer` and
160    /// `index` directly to indexed unchecked codecs.
161    ///
162    /// # Returns
163    ///
164    /// The backing storage, the unread start index, and the unread byte count.
165    #[inline(always)]
166    #[must_use]
167    pub fn unread_raw_parts(&self) -> (&[u8], usize, usize) {
168        (
169            self.buffer.data(),
170            self.buffer.position(),
171            self.buffer.available(),
172        )
173    }
174
175    /// Advances the unread cursor by `count` bytes.
176    ///
177    /// # Parameters
178    ///
179    /// * `count` - Number of currently unread bytes to consume.
180    ///
181    /// # Panics
182    ///
183    /// Panics when `count` exceeds [`Self::available`].
184    #[inline(always)]
185    pub fn consume(&mut self, count: usize) {
186        assert!(
187            count <= self.available(),
188            "cannot consume beyond buffered input"
189        );
190        // SAFETY: The assertion proves that `count` is within the readable
191        // input window.
192        unsafe {
193            self.buffer.consume_unchecked(count);
194        }
195    }
196
197    /// Advances the unread cursor without checking bounds.
198    ///
199    /// # Parameters
200    ///
201    /// * `count` - Number of currently unread bytes to consume.
202    ///
203    /// # Safety
204    ///
205    /// The caller must guarantee that `count <= self.available()`.
206    #[inline(always)]
207    pub unsafe fn consume_unchecked(&mut self, count: usize) {
208        // SAFETY: The caller guarantees that `count` is within the readable
209        // input window.
210        unsafe {
211            self.buffer.consume_unchecked(count);
212        }
213    }
214
215    /// Returns the unused capacity at the end of the buffer.
216    ///
217    /// # Returns
218    ///
219    /// The number of writable bytes in `buffer[limit..]`.
220    #[inline(always)]
221    fn tail_capacity(&self) -> usize {
222        self.buffer.spare_capacity()
223    }
224
225    /// Invalidates all buffered bytes.
226    ///
227    /// After this call, the buffer is considered empty and subsequent reads
228    /// will refill it from the wrapped reader.
229    #[inline(always)]
230    fn discard_buffer(&mut self) {
231        self.buffer.clear();
232    }
233
234    /// Moves unread bytes to the front of the buffer.
235    ///
236    /// This preserves the unread range while reclaiming tail capacity for
237    /// future reads. If there are no unread bytes, the buffer is discarded.
238    #[inline(always)]
239    fn backshift(&mut self) {
240        self.buffer.compact();
241    }
242}
243
244impl<R> BufferedByteInput<R>
245where
246    R: Read,
247{
248    /// Appends one more chunk from the wrapped reader to the internal buffer.
249    ///
250    /// This method reads into `buffer[limit..]` and advances `limit` by the
251    /// number of bytes read. It retries automatically when the wrapped reader
252    /// returns [`ErrorKind::Interrupted`].
253    ///
254    /// # Returns
255    ///
256    /// `Ok(true)` if at least one byte was appended, or `Ok(false)` if the
257    /// wrapped reader reached EOF.
258    ///
259    /// # Errors
260    ///
261    /// Returns any non-interrupted I/O error produced by the wrapped reader.
262    /// Returns [`ErrorKind::InvalidData`] if the wrapped reader reports more
263    /// bytes than the spare buffer range could hold.
264    fn read_more(&mut self) -> Result<bool> {
265        let count = self.tail_capacity();
266        debug_assert!(count > 0, "buffer has no tail capacity");
267        loop {
268            let limit = self.buffer.limit();
269            // SAFETY: `limit` is always within `buffer`, and `count` is the
270            // remaining capacity from `limit` to the end of `buffer`.
271            match unsafe {
272                self.inner
273                    .read_unchecked(self.buffer.data_mut(), limit, count)
274            } {
275                Ok(0) => return Ok(false),
276                Ok(read) => {
277                    validate_read_count(read, count)?;
278                    // SAFETY: `read_unchecked` returns a count in
279                    // `0..=count`, and `count` was the spare capacity.
280                    unsafe {
281                        self.buffer.advance_unchecked(read);
282                    }
283                    return Ok(true);
284                }
285                Err(error) if error.kind() == ErrorKind::Interrupted => {
286                    continue;
287                }
288                Err(error) => return Err(error),
289            }
290        }
291    }
292
293    /// Refills the internal buffer after preserving unread bytes.
294    ///
295    /// Consumed bytes may be discarded, and unread bytes may be moved to the
296    /// front of the buffer before the wrapped reader is called.
297    ///
298    /// # Returns
299    ///
300    /// `Ok(true)` if at least one byte was appended, or `Ok(false)` at EOF.
301    ///
302    /// # Errors
303    ///
304    /// Returns any non-interrupted I/O error produced by the wrapped reader.
305    pub fn fill_more(&mut self) -> Result<bool> {
306        if self.available() == 0 {
307            self.discard_buffer();
308        } else if self.tail_capacity() == 0 {
309            self.backshift();
310        }
311        self.read_more()
312    }
313
314    /// Refills the buffer until at least `count` unread bytes are available.
315    ///
316    /// This method may discard consumed bytes or move unread bytes to the front
317    /// of the buffer before reading more data. It stops as soon as the unread
318    /// window reaches `count` bytes or the wrapped reader reaches EOF.
319    ///
320    /// # Parameters
321    ///
322    /// * `count` - Minimum number of unread bytes required.
323    ///
324    /// # Returns
325    ///
326    /// `Ok(true)` if at least `count` unread bytes are buffered. `Ok(false)`
327    /// means EOF was reached before the requested byte count became available.
328    ///
329    /// # Errors
330    ///
331    /// Returns [`ErrorKind::InvalidInput`] when `count` exceeds the internal
332    /// buffer capacity. Returns [`ErrorKind::InvalidData`] if the wrapped
333    /// reader reports more bytes than the spare buffer range could hold.
334    /// Returns any non-interrupted I/O error produced by the wrapped reader
335    /// while refilling the buffer.
336    #[inline]
337    pub fn fill_until(&mut self, count: usize) -> Result<bool> {
338        if count > self.capacity() {
339            return Err(Error::new(
340                ErrorKind::InvalidInput,
341                "requested available bytes exceed buffered input capacity",
342            ));
343        }
344        while self.available() < count {
345            let available = self.available();
346            if available == 0 {
347                self.discard_buffer();
348            } else {
349                let missing = count - available;
350                if self.tail_capacity() < missing {
351                    self.backshift();
352                }
353            }
354            if !self.read_more()? {
355                return Ok(false);
356            }
357        }
358        Ok(true)
359    }
360
361    /// Ensures that at least `count` unread bytes are available.
362    ///
363    /// Unlike [`Self::fill_until`], this method treats EOF before the requested
364    /// byte count as [`ErrorKind::UnexpectedEof`]. Any partial bytes buffered
365    /// before EOF are consumed so callers observe the same logical position as
366    /// a failed exact read.
367    ///
368    /// # Parameters
369    ///
370    /// * `count` - Minimum number of unread bytes required.
371    ///
372    /// # Errors
373    ///
374    /// Returns [`ErrorKind::UnexpectedEof`] if EOF is reached before `count`
375    /// bytes are available. Returns [`ErrorKind::InvalidInput`] when `count`
376    /// exceeds the internal buffer capacity. Returns [`ErrorKind::InvalidData`]
377    /// if the wrapped reader reports more bytes than the spare buffer range
378    /// could hold. Returns any non-interrupted I/O error produced by the
379    /// wrapped reader while refilling the buffer.
380    #[inline]
381    pub fn ensure_available(&mut self, count: usize) -> Result<()> {
382        if self.fill_until(count)? {
383            return Ok(());
384        }
385        let available = self.available();
386        // SAFETY: `available` is the current readable byte count.
387        unsafe {
388            self.consume_unchecked(available);
389        }
390        Err(Error::new(
391            ErrorKind::UnexpectedEof,
392            "failed to fill whole buffer",
393        ))
394    }
395
396    /// Reads bytes through the internal buffer into an indexed output range.
397    ///
398    /// If the internal buffer is empty and `count` is at least as large as the
399    /// internal buffer capacity, the read is delegated directly to the wrapped
400    /// reader to avoid an unnecessary copy. Otherwise, bytes are served from
401    /// the internal buffer.
402    ///
403    /// # Arguments
404    ///
405    /// * `output` - Destination storage that receives bytes.
406    /// * `output_index` - Start index inside `output`.
407    /// * `count` - Maximum number of bytes to read.
408    ///
409    /// # Returns
410    ///
411    /// The number of bytes written into `output[output_index..output_index +
412    /// count]`. A return value of `0` means that `count` was zero or EOF was
413    /// reached before any bytes were read.
414    ///
415    /// # Errors
416    ///
417    /// Returns any I/O error produced by the wrapped reader. Returns
418    /// [`ErrorKind::InvalidData`] if the wrapped reader reports more bytes
419    /// than the requested destination range could hold. Interrupted reads are
420    /// retried when the method refills the internal buffer through
421    /// `read_more`; direct delegated reads follow the wrapped reader's own
422    /// [`Read::read`] behavior.
423    ///
424    /// # Safety
425    ///
426    /// The caller must guarantee that `output_index..output_index + count` is
427    /// a valid range inside `output` and that the addition does not overflow.
428    #[inline(always)]
429    pub unsafe fn read_into_unchecked(
430        &mut self,
431        output: &mut [u8],
432        output_index: usize,
433        count: usize,
434    ) -> Result<usize> {
435        debug_assert!(
436            output_index
437                .checked_add(count)
438                .is_some_and(|end| end <= output.len()),
439            "unchecked read output range exceeds destination buffer"
440        );
441        if count == 0 {
442            return Ok(0);
443        }
444        if self.available() == 0 {
445            self.discard_buffer();
446            if count >= self.buffer.capacity() {
447                // SAFETY: The caller guarantees that the target range is valid.
448                let read = unsafe {
449                    self.inner.read_unchecked(output, output_index, count)
450                }?;
451                validate_read_count(read, count)?;
452                return Ok(read);
453            }
454            if !self.read_more()? {
455                return Ok(0);
456            }
457        }
458        let read_count = count.min(self.available());
459        // SAFETY: `read_count` is bounded by the caller-provided output range
460        // and the available input range.
461        unsafe {
462            self.buffer
463                .copy_to_unchecked(output, output_index, read_count);
464        }
465        Ok(read_count)
466    }
467
468    /// Seeks the wrapped reader and discards buffered bytes after success.
469    ///
470    /// For [`SeekFrom::Current`], the offset is adjusted by the number of
471    /// unread bytes already buffered, so seeking is relative to the logical
472    /// position observed by callers of this buffered input.
473    ///
474    /// # Arguments
475    ///
476    /// * `position` - The target seek position.
477    ///
478    /// # Returns
479    ///
480    /// The new absolute stream position reported by the wrapped reader.
481    ///
482    /// # Errors
483    ///
484    /// Returns [`ErrorKind::InvalidInput`] if a [`SeekFrom::Current`] offset
485    /// cannot be adjusted by the unread buffered byte count. Returns any seek
486    /// error produced by the wrapped reader.
487    fn seek_logical(&mut self, position: SeekFrom) -> Result<u64>
488    where
489        R: Seek,
490    {
491        let position = match position {
492            SeekFrom::Current(offset) => {
493                // `buffer` is a `Vec<u8>`, whose maximum allocation size fits
494                // in `isize`; that always fits in `i64`.
495                let unread = self.available() as i64;
496                let adjusted = offset.checked_sub(unread).ok_or_else(|| {
497                    Error::new(
498                        ErrorKind::InvalidInput,
499                        "current seek offset underflows after buffered adjustment",
500                    )
501                })?;
502                self.inner.seek(SeekFrom::Current(adjusted))
503            }
504            other => self.inner.seek(other),
505        }?;
506        self.discard_buffer();
507        Ok(position)
508    }
509}
510
511impl<R> Read for BufferedByteInput<R>
512where
513    R: Read,
514{
515    /// Reads bytes through the internal buffer.
516    ///
517    /// # Arguments
518    ///
519    /// * `output` - Destination slice that receives the bytes read.
520    ///
521    /// # Returns
522    ///
523    /// The number of bytes written to `output`.
524    ///
525    /// # Errors
526    ///
527    /// Returns any I/O error produced by the wrapped reader.
528    #[inline(always)]
529    fn read(&mut self, output: &mut [u8]) -> Result<usize> {
530        // SAFETY: The full output slice is a valid writable range.
531        unsafe { self.read_into_unchecked(output, 0, output.len()) }
532    }
533}
534
535impl<R> BufRead for BufferedByteInput<R>
536where
537    R: Read,
538{
539    /// Returns the currently buffered unread bytes, refilling when empty.
540    #[inline]
541    fn fill_buf(&mut self) -> Result<&[u8]> {
542        if self.available() == 0 {
543            self.discard_buffer();
544            if !self.read_more()? {
545                return Ok(&[]);
546            }
547        }
548        Ok(self.unread_slice())
549    }
550
551    /// Consumes `amount` bytes from the unread byte window.
552    #[inline(always)]
553    fn consume(&mut self, amount: usize) {
554        BufferedByteInput::consume(self, amount);
555    }
556}
557
558impl<R> Seek for BufferedByteInput<R>
559where
560    R: Read + Seek,
561{
562    /// Seeks the wrapped reader and discards buffered bytes after success.
563    #[inline(always)]
564    fn seek(&mut self, position: SeekFrom) -> Result<u64> {
565        self.seek_logical(position)
566    }
567}
568
/// Checks that a wrapped reader did not claim more bytes than it was offered.
///
/// Readers are contractually bound to return a count no larger than the
/// destination they were given; a larger value would make later buffer
/// bookkeeping step outside the valid range.
///
/// # Parameters
///
/// * `read` - Byte count the wrapped reader says it produced.
/// * `requested` - Size of the destination range handed to the reader.
///
/// # Errors
///
/// Returns [`ErrorKind::InvalidData`] when `read` is greater than
/// `requested`.
#[inline(always)]
fn validate_read_count(read: usize, requested: usize) -> Result<()> {
    if read <= requested {
        Ok(())
    } else {
        let message =
            format!("reader reported {read} bytes for a {requested}-byte buffer");
        Err(Error::new(ErrorKind::InvalidData, message))
    }
}
587            ),
588        ));
589    }
590    Ok(())
591}