Skip to main content

qubit_io/buffered/
buffered_input.rs

1// =============================================================================
2//    Copyright (c) 2026 Haixing Hu.
3//
4//    SPDX-License-Identifier: Apache-2.0
5//
6//    Licensed under the Apache License, Version 2.0.
7// =============================================================================
8
9use std::io::{
10    BufRead,
11    Error,
12    ErrorKind,
13    Read,
14    Result,
15    Seek,
16    SeekFrom,
17};
18
19use crate::buffered::DEFAULT_BUFFER_CAPACITY;
20use crate::{
21    Buffer,
22    Input,
23};
24
25/// Buffered unit input over a wrapped input source.
26///
27/// This type owns a wrapped input object and an internal unit buffer. It keeps
28/// unread units in `buffer[position..limit]` so callers can inspect or consume
29/// the current unit window before refilling it.
30///
31/// `BufferedInput` is deliberately unit-oriented. It performs no binary
32/// decoding, text decoding, or record parsing; higher-level stream adapters can
33/// build those concerns on top of [`Self::unread_slice`],
34/// [`Self::unread_raw_parts`], [`Self::ensure_available`], and
35/// [`Self::read_into_unchecked`]. The type also implements [`BufRead`] for
36/// callers that want the standard buffered-read interface.
37#[derive(Debug)]
38pub struct BufferedInput<I>
39where
40    I: Input,
41    I::Item: Copy + Default,
42{
43    inner: I,
44    buffer: Buffer<I::Item>,
45}
46
47impl<I> BufferedInput<I>
48where
49    I: Input,
50    I::Item: Copy + Default,
51{
52    /// Creates a buffered unit input with the default capacity.
53    ///
54    /// # Arguments
55    ///
56    /// * `inner` - The input object wrapped by this buffer.
57    ///
58    /// # Returns
59    ///
60    /// A new buffered unit input whose internal buffer has at least
61    /// `DEFAULT_BUFFER_CAPACITY` units.
62    #[inline(always)]
63    #[must_use]
64    pub fn new(inner: I) -> Self {
65        Self::with_capacity(inner, DEFAULT_BUFFER_CAPACITY)
66    }
67
68    /// Creates a buffered unit input with at least the requested capacity.
69    ///
70    /// The actual capacity is raised to `1` when the requested value is `0`.
71    ///
72    /// # Arguments
73    ///
74    /// * `inner` - The input object wrapped by this buffer.
75    /// * `capacity` - The requested internal buffer capacity, in units.
76    ///
77    /// # Returns
78    ///
79    /// A new buffered unit input whose internal buffer capacity is
80    /// `capacity.max(1)`.
81    #[inline]
82    #[must_use]
83    pub fn with_capacity(inner: I, capacity: usize) -> Self {
84        Self {
85            inner,
86            buffer: Buffer::with_capacity(capacity),
87        }
88    }
89
90    /// Returns a shared reference to the wrapped input object.
91    ///
92    /// # Returns
93    ///
94    /// A shared reference to the inner input object.
95    #[inline(always)]
96    pub const fn inner(&self) -> &I {
97        &self.inner
98    }
99
100    /// Returns an exclusive reference to the wrapped input object.
101    ///
102    /// Mutating the wrapped object directly may invalidate assumptions about
103    /// units already buffered by this value.
104    ///
105    /// # Returns
106    ///
107    /// An exclusive reference to the wrapped input object.
108    #[inline(always)]
109    pub fn inner_mut(&mut self) -> &mut I {
110        &mut self.inner
111    }
112
113    /// Consumes this buffered input and returns the wrapped input object plus
114    /// unread bytes.
115    ///
116    /// This method performs no I/O. Units that have already been read from the
117    /// wrapped input but not consumed by this buffered input are returned as
118    /// the second tuple item.
119    ///
120    /// # Returns
121    ///
122    /// The wrapped input object and a vector containing the unread buffered
123    /// units in logical read order.
124    #[inline(always)]
125    #[must_use]
126    pub fn into_parts(self) -> (I, Vec<I::Item>) {
127        let unread = self.unread_slice().to_vec();
128        (self.inner, unread)
129    }
130
131    /// Returns the internal buffer capacity.
132    ///
133    /// # Returns
134    ///
135    /// The total number of units that can be held by the internal buffer.
136    #[inline(always)]
137    #[must_use]
138    pub fn capacity(&self) -> usize {
139        self.buffer.capacity()
140    }
141
142    /// Returns the number of unread units currently buffered.
143    ///
144    /// # Returns
145    ///
146    /// The length of `buffer[position..limit]`, in units.
147    #[inline(always)]
148    #[must_use]
149    pub fn available(&self) -> usize {
150        self.buffer.available()
151    }
152
153    /// Returns the currently buffered unread units.
154    ///
155    /// # Returns
156    ///
157    /// The unread range `buffer[position..limit]`.
158    #[inline(always)]
159    #[must_use]
160    pub fn unread_slice(&self) -> &[I::Item] {
161        &self.buffer.data()[self.buffer.position()..self.buffer.limit()]
162    }
163
164    /// Returns raw unread-buffer parts for hot-path callers.
165    ///
166    /// The returned slice is the full internal backing storage. `index` is the
167    /// start of the unread unit window, and `count` is the number of unread
168    /// units. Callers that need a slice can use `&buffer[index..index +
169    /// count]`; callers that already validated bounds can pass `buffer` and
170    /// `index` directly to indexed unchecked codecs.
171    ///
172    /// # Returns
173    ///
174    /// The backing storage, the unread start index, and the unread unit count.
175    #[inline(always)]
176    #[must_use]
177    pub fn unread_raw_parts(&self) -> (&[I::Item], usize, usize) {
178        (
179            self.buffer.data(),
180            self.buffer.position(),
181            self.buffer.available(),
182        )
183    }
184
185    /// Advances the unread cursor by `count` units.
186    ///
187    /// # Parameters
188    ///
189    /// * `count` - Number of currently unread units to consume.
190    ///
191    /// # Panics
192    ///
193    /// Panics when `count` exceeds [`Self::available`].
194    #[inline(always)]
195    pub fn consume(&mut self, count: usize) {
196        assert!(
197            count <= self.available(),
198            "cannot consume beyond buffered input"
199        );
200        // SAFETY: The assertion proves that `count` is within the readable
201        // input window.
202        unsafe {
203            self.buffer.consume_unchecked(count);
204        }
205    }
206
207    /// Advances the unread cursor without checking bounds.
208    ///
209    /// # Parameters
210    ///
211    /// * `count` - Number of currently unread bytes to consume.
212    ///
213    /// # Safety
214    ///
215    /// The caller must guarantee that `count <= self.available()`.
216    #[inline(always)]
217    pub unsafe fn consume_unchecked(&mut self, count: usize) {
218        // SAFETY: The caller guarantees that `count` is within the readable
219        // input window.
220        unsafe {
221            self.buffer.consume_unchecked(count);
222        }
223    }
224
225    /// Returns the unused capacity at the end of the buffer.
226    ///
227    /// # Returns
228    ///
229    /// The number of writable bytes in `buffer[limit..]`.
230    #[inline(always)]
231    fn tail_capacity(&self) -> usize {
232        self.buffer.spare_capacity()
233    }
234
235    /// Invalidates all buffered units.
236    ///
237    /// After this call, the buffer is considered empty and subsequent reads
238    /// will refill it from the wrapped input.
239    #[inline(always)]
240    fn discard_buffer(&mut self) {
241        self.buffer.clear();
242    }
243
244    /// Moves unread units to the front of the buffer.
245    ///
246    /// This preserves the unread range while reclaiming tail capacity for
247    /// future reads. If there are no unread bytes, the buffer is discarded.
248    #[inline(always)]
249    fn backshift(&mut self) {
250        self.buffer.compact();
251    }
252}
253
254impl<I> BufferedInput<I>
255where
256    I: Input,
257    I::Item: Copy + Default,
258{
259    /// Appends one more chunk from the wrapped reader to the internal buffer.
260    ///
261    /// This method reads into `buffer[limit..]` and advances `limit` by the
262    /// number of bytes read. It retries automatically when the wrapped reader
263    /// returns [`ErrorKind::Interrupted`].
264    ///
265    /// # Returns
266    ///
267    /// `Ok(true)` if at least one byte was appended, or `Ok(false)` if the
268    /// wrapped reader reached EOF.
269    ///
270    /// # Errors
271    ///
272    /// Returns any non-interrupted I/O error produced by the wrapped reader.
273    /// Returns [`ErrorKind::InvalidData`] if the wrapped reader reports more
274    /// bytes than the spare buffer range could hold.
275    fn read_more(&mut self) -> Result<bool> {
276        let count = self.tail_capacity();
277        debug_assert!(count > 0, "buffer has no tail capacity");
278        loop {
279            let limit = self.buffer.limit();
280            // SAFETY: `limit` is always within `buffer`, and `count` is the
281            // remaining capacity from `limit` to the end of `buffer`.
282            match unsafe {
283                self.inner
284                    .read_unchecked(self.buffer.data_mut(), limit, count)
285            } {
286                Ok(0) => return Ok(false),
287                Ok(read) => {
288                    validate_read_count(read, count)?;
289                    // SAFETY: `read_unchecked` returns a count in
290                    // `0..=count`, and `count` was the spare capacity.
291                    unsafe {
292                        self.buffer.advance_unchecked(read);
293                    }
294                    return Ok(true);
295                }
296                Err(error) if error.kind() == ErrorKind::Interrupted => {
297                    continue;
298                }
299                Err(error) => return Err(error),
300            }
301        }
302    }
303
304    /// Refills the internal buffer after preserving unread bytes.
305    ///
306    /// Consumed bytes may be discarded, and unread bytes may be moved to the
307    /// front of the buffer before the wrapped reader is called.
308    ///
309    /// # Returns
310    ///
311    /// `Ok(true)` if at least one byte was appended, or `Ok(false)` at EOF.
312    ///
313    /// # Errors
314    ///
315    /// Returns any non-interrupted I/O error produced by the wrapped reader.
316    pub fn fill_more(&mut self) -> Result<bool> {
317        if self.available() == 0 {
318            self.discard_buffer();
319        } else if self.tail_capacity() == 0 {
320            self.backshift();
321        }
322        self.read_more()
323    }
324
325    /// Refills the buffer until at least `count` unread bytes are available.
326    ///
327    /// This method may discard consumed bytes or move unread bytes to the front
328    /// of the buffer before reading more data. It stops as soon as the unread
329    /// window reaches `count` bytes or the wrapped reader reaches EOF.
330    ///
331    /// # Parameters
332    ///
333    /// * `count` - Minimum number of unread bytes required.
334    ///
335    /// # Returns
336    ///
337    /// `Ok(true)` if at least `count` unread bytes are buffered. `Ok(false)`
338    /// means EOF was reached before the requested byte count became available.
339    ///
340    /// # Errors
341    ///
342    /// Returns [`ErrorKind::InvalidInput`] when `count` exceeds the internal
343    /// buffer capacity. Returns [`ErrorKind::InvalidData`] if the wrapped
344    /// reader reports more bytes than the spare buffer range could hold.
345    /// Returns any non-interrupted I/O error produced by the wrapped reader
346    /// while refilling the buffer.
347    #[inline]
348    pub fn fill_until(&mut self, count: usize) -> Result<bool> {
349        if count > self.capacity() {
350            return Err(Error::new(
351                ErrorKind::InvalidInput,
352                "requested available bytes exceed buffered input capacity",
353            ));
354        }
355        while self.available() < count {
356            let available = self.available();
357            if available == 0 {
358                self.discard_buffer();
359            } else {
360                let missing = count - available;
361                if self.tail_capacity() < missing {
362                    self.backshift();
363                }
364            }
365            if !self.read_more()? {
366                return Ok(false);
367            }
368        }
369        Ok(true)
370    }
371
372    /// Ensures that at least `count` unread bytes are available.
373    ///
374    /// Unlike [`Self::fill_until`], this method treats EOF before the requested
375    /// byte count as [`ErrorKind::UnexpectedEof`]. Any partial bytes buffered
376    /// before EOF are consumed so callers observe the same logical position as
377    /// a failed exact read.
378    ///
379    /// # Parameters
380    ///
381    /// * `count` - Minimum number of unread bytes required.
382    ///
383    /// # Errors
384    ///
385    /// Returns [`ErrorKind::UnexpectedEof`] if EOF is reached before `count`
386    /// bytes are available. Returns [`ErrorKind::InvalidInput`] when `count`
387    /// exceeds the internal buffer capacity. Returns [`ErrorKind::InvalidData`]
388    /// if the wrapped reader reports more bytes than the spare buffer range
389    /// could hold. Returns any non-interrupted I/O error produced by the
390    /// wrapped reader while refilling the buffer.
391    #[inline]
392    pub fn ensure_available(&mut self, count: usize) -> Result<()> {
393        if self.fill_until(count)? {
394            return Ok(());
395        }
396        let available = self.available();
397        // SAFETY: `available` is the current readable byte count.
398        unsafe {
399            self.consume_unchecked(available);
400        }
401        Err(Error::new(
402            ErrorKind::UnexpectedEof,
403            "failed to fill whole buffer",
404        ))
405    }
406
407    /// Reads bytes through the internal buffer into an indexed output range.
408    ///
409    /// If the internal buffer is empty and `count` is at least as large as the
410    /// internal buffer capacity, the read is delegated directly to the wrapped
411    /// reader to avoid an unnecessary copy. Otherwise, bytes are served from
412    /// the internal buffer.
413    ///
414    /// # Arguments
415    ///
416    /// * `output` - Destination storage that receives bytes.
417    /// * `output_index` - Start index inside `output`.
418    /// * `count` - Maximum number of bytes to read.
419    ///
420    /// # Returns
421    ///
422    /// The number of bytes written into `output[output_index..output_index +
423    /// count]`. A return value of `0` means that `count` was zero or EOF was
424    /// reached before any bytes were read.
425    ///
426    /// # Errors
427    ///
428    /// Returns any I/O error produced by the wrapped reader. Returns
429    /// [`ErrorKind::InvalidData`] if the wrapped reader reports more bytes
430    /// than the requested destination range could hold. Interrupted reads are
431    /// retried when the method refills the internal buffer through
432    /// `read_more`; direct delegated reads follow the wrapped reader's own
433    /// [`Read::read`] behavior.
434    ///
435    /// # Safety
436    ///
437    /// The caller must guarantee that `output_index..output_index + count` is
438    /// a valid range inside `output` and that the addition does not overflow.
439    #[inline(always)]
440    pub unsafe fn read_into_unchecked(
441        &mut self,
442        output: &mut [I::Item],
443        output_index: usize,
444        count: usize,
445    ) -> Result<usize> {
446        debug_assert!(
447            output_index
448                .checked_add(count)
449                .is_some_and(|end| end <= output.len()),
450            "unchecked read output range exceeds destination buffer"
451        );
452        if count == 0 {
453            return Ok(0);
454        }
455        if self.available() == 0 {
456            self.discard_buffer();
457            if count >= self.buffer.capacity() {
458                // SAFETY: The caller guarantees that the target range is valid.
459                let read = unsafe {
460                    self.inner.read_unchecked(output, output_index, count)
461                }?;
462                validate_read_count(read, count)?;
463                return Ok(read);
464            }
465            if !self.read_more()? {
466                return Ok(0);
467            }
468        }
469        let read_count = count.min(self.available());
470        // SAFETY: `read_count` is bounded by the caller-provided output range
471        // and the available input range.
472        unsafe {
473            self.buffer
474                .copy_to_unchecked(output, output_index, read_count);
475        }
476        Ok(read_count)
477    }
478}
479
480impl<I> BufferedInput<I>
481where
482    I: Input<Item = u8>,
483{
484    /// Seeks the wrapped reader and discards buffered bytes after success.
485    ///
486    /// For [`SeekFrom::Current`], the offset is adjusted by the number of
487    /// unread bytes already buffered, so seeking is relative to the logical
488    /// position observed by callers of this buffered input.
489    ///
490    /// # Arguments
491    ///
492    /// * `position` - The target seek position.
493    ///
494    /// # Returns
495    ///
496    /// The new absolute stream position reported by the wrapped reader.
497    ///
498    /// # Errors
499    ///
500    /// Returns [`ErrorKind::InvalidInput`] if a [`SeekFrom::Current`] offset
501    /// cannot be adjusted by the unread buffered byte count. Returns any seek
502    /// error produced by the wrapped reader.
503    fn seek_logical(&mut self, position: SeekFrom) -> Result<u64>
504    where
505        I: Seek,
506    {
507        let position = match position {
508            SeekFrom::Current(offset) => {
509                // `buffer` is a `Vec<u8>`, whose maximum allocation size fits
510                // in `isize`; that always fits in `i64`.
511                let unread = self.available() as i64;
512                let adjusted = offset.checked_sub(unread).ok_or_else(|| {
513                    Error::new(
514                        ErrorKind::InvalidInput,
515                        "current seek offset underflows after buffered adjustment",
516                    )
517                })?;
518                self.inner.seek(SeekFrom::Current(adjusted))
519            }
520            other => self.inner.seek(other),
521        }?;
522        self.discard_buffer();
523        Ok(position)
524    }
525}
526
527impl<I> Read for BufferedInput<I>
528where
529    I: Input<Item = u8>,
530{
531    /// Reads bytes through the internal buffer.
532    ///
533    /// # Arguments
534    ///
535    /// * `output` - Destination slice that receives the bytes read.
536    ///
537    /// # Returns
538    ///
539    /// The number of bytes written to `output`.
540    ///
541    /// # Errors
542    ///
543    /// Returns any I/O error produced by the wrapped reader.
544    #[inline(always)]
545    fn read(&mut self, output: &mut [u8]) -> Result<usize> {
546        // SAFETY: The full output slice is a valid writable range.
547        unsafe { self.read_into_unchecked(output, 0, output.len()) }
548    }
549}
550
551impl<I> BufRead for BufferedInput<I>
552where
553    I: Input<Item = u8>,
554{
555    /// Returns the currently buffered unread bytes, refilling when empty.
556    #[inline]
557    fn fill_buf(&mut self) -> Result<&[u8]> {
558        if self.available() == 0 {
559            self.discard_buffer();
560            if !self.read_more()? {
561                return Ok(&[]);
562            }
563        }
564        Ok(self.unread_slice())
565    }
566
567    /// Consumes `amount` bytes from the unread byte window.
568    #[inline(always)]
569    fn consume(&mut self, amount: usize) {
570        BufferedInput::consume(self, amount);
571    }
572}
573
574impl<I> Seek for BufferedInput<I>
575where
576    I: Input<Item = u8> + Seek,
577{
578    /// Seeks the wrapped reader and discards buffered bytes after success.
579    #[inline(always)]
580    fn seek(&mut self, position: SeekFrom) -> Result<u64> {
581        self.seek_logical(position)
582    }
583}
584
/// Checks that a wrapped reader did not report more than it was asked for.
///
/// # Arguments
///
/// * `read` - Count the wrapped reader claims to have produced.
/// * `requested` - Upper bound that was passed to the wrapped reader.
///
/// # Errors
///
/// Returns [`ErrorKind::InvalidData`] when `read` is greater than
/// `requested`; the message includes both numbers.
#[inline(always)]
fn validate_read_count(read: usize, requested: usize) -> Result<()> {
    if read <= requested {
        return Ok(());
    }
    let message =
        format!("reader reported {read} bytes for a {requested}-byte buffer");
    Err(Error::new(ErrorKind::InvalidData, message))
}