Skip to main content

qubit_io/util/
streams.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::cmp::Ordering;
11use std::io::{
12    Error,
13    ErrorKind,
14    Read,
15    Result,
16    Write,
17    copy,
18};
19
20use crate::ReadExt;
21
/// Size in bytes (16 KiB) of the scratch buffer through which the bounded
/// copy helpers move data from a reader to a writer.
const COPY_BUFFER_SIZE: usize = 16 * 1024;

/// Size in bytes (16 KiB) of each per-stream scratch buffer filled while
/// comparing the contents of two readers.
const COMPARE_BUFFER_SIZE: usize = 16 * 1024;
27
/// Namespace for stream utilities.
///
/// `Streams` has no variants, so a value of this type can never be created;
/// it exists only to group associated functions that operate on one or more
/// [`Read`] or [`Write`] values. None of those functions explicitly closes or
/// flushes the streams handed to it; where a function delegates to the
/// standard library, the standard library's documented behavior applies.
///
/// # Examples
/// ```
/// use qubit_io::Streams;
/// use std::io::Cursor;
///
/// let mut input = Cursor::new(b"abcdef".to_vec());
/// let mut output = Vec::new();
///
/// let copied = Streams::copy_at_most(&mut input, &mut output, 4)?;
///
/// assert_eq!(4, copied);
/// assert_eq!(b"abcd", output.as_slice());
/// # Ok::<(), std::io::Error>(())
/// ```
pub enum Streams {}
50
51impl Streams {
52    /// Copies all remaining bytes from `reader` to `writer`.
53    ///
54    /// This is a namespace-style wrapper around [`std::io::copy`]. It preserves
55    /// the standard-library behavior, including platform-specific optimized
56    /// copy paths when available.
57    ///
58    /// # Parameters
59    /// - `reader`: Source reader.
60    /// - `writer`: Destination writer.
61    ///
62    /// # Returns
63    /// The number of bytes copied.
64    ///
65    /// # Errors
66    /// Returns the first read or write error reported by the underlying
67    /// streams, using the same error behavior as [`std::io::copy`].
68    #[inline]
69    pub fn copy<R, W>(reader: &mut R, writer: &mut W) -> Result<u64>
70    where
71        R: Read + ?Sized,
72        W: Write + ?Sized,
73    {
74        copy(reader, writer)
75    }
76
77    /// Copies at most `max_bytes` bytes from `reader` to `writer`.
78    ///
79    /// This method stops successfully when either EOF is reached or
80    /// `max_bytes` bytes have been copied. It does not close or flush either
81    /// stream.
82    ///
83    /// # Parameters
84    /// - `reader`: Source reader.
85    /// - `writer`: Destination writer.
86    /// - `max_bytes`: Maximum number of bytes to copy.
87    ///
88    /// # Returns
89    /// The number of bytes copied.
90    ///
91    /// # Errors
92    /// Returns the first non-interrupted read error or write error reported by
93    /// the underlying streams. Interrupted reads are retried.
94    #[inline]
95    pub fn copy_at_most<R, W>(reader: &mut R, writer: &mut W, max_bytes: u64) -> Result<u64>
96    where
97        R: Read + ?Sized,
98        W: Write + ?Sized,
99    {
100        let mut reader = reader;
101        let mut writer = writer;
102        copy_at_most_impl(&mut reader, &mut writer, max_bytes)
103    }
104
105    /// Copies the remaining input if its total length is at most `max_bytes`.
106    ///
107    /// This method copies from the current reader position until EOF. If EOF is
108    /// not reached within `max_bytes` bytes, it returns
109    /// [`std::io::ErrorKind::InvalidData`]. Detecting oversized input consumes
110    /// one excess byte from `reader`; that excess byte is not written to
111    /// `writer`.
112    ///
113    /// # Parameters
114    /// - `reader`: Source reader.
115    /// - `writer`: Destination writer.
116    /// - `max_bytes`: Maximum accepted number of bytes in the remaining input.
117    ///
118    /// # Returns
119    /// The number of bytes copied when EOF is reached within the limit.
120    ///
121    /// # Errors
122    /// Returns [`std::io::ErrorKind::InvalidData`] when the remaining input is
123    /// longer than `max_bytes`. Returns the first non-interrupted read error or
124    /// write error reported by the underlying streams. Interrupted reads are
125    /// retried.
126    #[inline]
127    pub fn copy_to_end_limited<R, W>(reader: &mut R, writer: &mut W, max_bytes: u64) -> Result<u64>
128    where
129        R: Read + ?Sized,
130        W: Write + ?Sized,
131    {
132        let mut reader = reader;
133        let mut writer = writer;
134        copy_to_end_limited_impl(&mut reader, &mut writer, max_bytes)
135    }
136
137    /// Tests whether two readable streams have equal remaining contents.
138    ///
139    /// The comparison starts at each reader's current position and consumes
140    /// both streams until a difference or EOF is found.
141    ///
142    /// # Parameters
143    /// - `left`: First stream.
144    /// - `right`: Second stream.
145    ///
146    /// # Returns
147    /// `true` when both streams produce the same bytes until EOF.
148    ///
149    /// # Errors
150    /// Returns the first read error reported by either stream.
151    #[inline]
152    pub fn content_eq(left: &mut dyn Read, right: &mut dyn Read) -> Result<bool> {
153        Ok(Self::compare_content(left, right)? == Ordering::Equal)
154    }
155
156    /// Lexicographically compares the remaining contents of two readable
157    /// streams.
158    ///
159    /// The comparison starts at each reader's current position and consumes
160    /// both streams until a difference or EOF is found.
161    ///
162    /// # Parameters
163    /// - `left`: First stream.
164    /// - `right`: Second stream.
165    ///
166    /// # Returns
167    /// The lexicographic ordering of the remaining bytes.
168    ///
169    /// # Errors
170    /// Returns the first read error reported by either stream.
171    pub fn compare_content(left: &mut dyn Read, right: &mut dyn Read) -> Result<Ordering> {
172        let mut left_buffer = [0; COMPARE_BUFFER_SIZE];
173        let mut right_buffer = [0; COMPARE_BUFFER_SIZE];
174        loop {
175            let left_count = left.read_exact_or_eof(&mut left_buffer)?;
176            let right_count = right.read_exact_or_eof(&mut right_buffer)?;
177            let n = left_count.min(right_count);
178            for index in 0..n {
179                match left_buffer[index].cmp(&right_buffer[index]) {
180                    Ordering::Equal => {}
181                    ordering => return Ok(ordering),
182                }
183            }
184            match left_count.cmp(&right_count) {
185                Ordering::Equal if left_count == 0 => return Ok(Ordering::Equal),
186                Ordering::Equal => {}
187                ordering => return Ok(ordering),
188            }
189        }
190    }
191}
192
193/// Copies at most `max_bytes` bytes using trait-object I/O endpoints.
194///
195/// # Parameters
196/// - `reader`: Source reader.
197/// - `writer`: Destination writer.
198/// - `max_bytes`: Maximum number of bytes to copy.
199///
200/// # Returns
201/// The number of bytes copied.
202///
203/// # Errors
204/// Returns the first non-interrupted read error or write error reported by the
205/// underlying streams. Interrupted reads are retried.
206fn copy_at_most_impl(reader: &mut dyn Read, writer: &mut dyn Write, max_bytes: u64) -> Result<u64> {
207    let mut buffer = [0; COPY_BUFFER_SIZE];
208    let mut remaining = max_bytes;
209    let mut copied = 0;
210    while remaining > 0 {
211        let requested = remaining.min(COPY_BUFFER_SIZE as u64) as usize;
212        match reader.read(&mut buffer[..requested]) {
213            Ok(0) => break,
214            Ok(count) => {
215                writer.write_all(&buffer[..count])?;
216                let count = count as u64;
217                remaining -= count;
218                copied += count;
219            }
220            Err(error) => {
221                if error.kind() == ErrorKind::Interrupted {
222                    continue;
223                }
224                return Err(error);
225            }
226        }
227    }
228    Ok(copied)
229}
230
231/// Copies the remaining input through trait-object endpoints when it fits.
232///
233/// # Parameters
234/// - `reader`: Source reader.
235/// - `writer`: Destination writer.
236/// - `max_bytes`: Maximum accepted number of bytes in the remaining input.
237///
238/// # Returns
239/// The number of bytes copied when EOF is reached within the limit.
240///
241/// # Errors
242/// Returns [`ErrorKind::InvalidData`] when the remaining input is longer than
243/// `max_bytes`. Returns the first non-interrupted read error or write error
244/// reported by the underlying streams. Interrupted reads are retried.
245fn copy_to_end_limited_impl(
246    reader: &mut dyn Read,
247    writer: &mut dyn Write,
248    max_bytes: u64,
249) -> Result<u64> {
250    let copied = copy_at_most_impl(reader, writer, max_bytes)?;
251    if copied < max_bytes {
252        return Ok(copied);
253    }
254    if has_more_input(reader)? {
255        return Err(Error::new(
256            ErrorKind::InvalidData,
257            format!("input exceeds maximum length of {max_bytes} bytes"),
258        ));
259    }
260    Ok(copied)
261}
262
/// Probes `reader` for at least one more byte.
///
/// The probe performs a one-byte read, so a byte that is found is consumed
/// from `reader` and discarded.
///
/// # Parameters
/// - `reader`: Source reader to probe.
///
/// # Returns
/// `true` when one extra byte was read, or `false` when EOF was reached.
///
/// # Errors
/// Returns the first non-interrupted read error reported by `reader`.
/// Interrupted reads are retried.
fn has_more_input(reader: &mut dyn Read) -> Result<bool> {
    let mut probe = [0];
    loop {
        match reader.read(&mut probe) {
            // A zero-length read means EOF; anything else means data remained.
            Ok(read) => return Ok(read != 0),
            Err(error) if error.kind() == ErrorKind::Interrupted => continue,
            Err(error) => return Err(error),
        }
    }
}
287}