qubit_io/util/streams.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::cmp::Ordering;
11use std::io::{
12 Error,
13 ErrorKind,
14 Read,
15 Result,
16 Write,
17 copy,
18};
19
20use crate::ReadExt;
21
/// Length in bytes (16 KiB) of the stack buffer used by the bounded copy
/// helpers such as `copy_at_most_impl`.
const COPY_BUFFER_SIZE: usize = 16 * 1024;

/// Length in bytes (16 KiB) of each per-stream chunk buffer used when
/// comparing the contents of two streams.
const COMPARE_BUFFER_SIZE: usize = 16 * 1024;
27
/// Namespace for stream-level helpers.
///
/// `Streams` has no variants and therefore can never be constructed; it only
/// groups associated functions that operate on one or more [`Read`] or
/// [`Write`] values. None of these functions close or flush the streams they
/// are given, except where the wrapped standard-library operation documents
/// otherwise.
///
/// # Examples
/// ```
/// use qubit_io::Streams;
/// use std::io::Cursor;
///
/// let mut source = Cursor::new(b"abcdef".to_vec());
/// let mut sink = Vec::new();
///
/// let copied = Streams::copy_at_most(&mut source, &mut sink, 4)?;
///
/// assert_eq!(4, copied);
/// assert_eq!(b"abcd", sink.as_slice());
/// # Ok::<(), std::io::Error>(())
/// ```
pub enum Streams {}
50
51impl Streams {
52 /// Copies all remaining bytes from `reader` to `writer`.
53 ///
54 /// This is a namespace-style wrapper around [`std::io::copy`]. It preserves
55 /// the standard-library behavior, including platform-specific optimized
56 /// copy paths when available.
57 ///
58 /// # Parameters
59 /// - `reader`: Source reader.
60 /// - `writer`: Destination writer.
61 ///
62 /// # Returns
63 /// The number of bytes copied.
64 ///
65 /// # Errors
66 /// Returns the first read or write error reported by the underlying
67 /// streams, using the same error behavior as [`std::io::copy`].
68 #[inline]
69 pub fn copy<R, W>(reader: &mut R, writer: &mut W) -> Result<u64>
70 where
71 R: Read + ?Sized,
72 W: Write + ?Sized,
73 {
74 copy(reader, writer)
75 }
76
77 /// Copies at most `max_bytes` bytes from `reader` to `writer`.
78 ///
79 /// This method stops successfully when either EOF is reached or
80 /// `max_bytes` bytes have been copied. It does not close or flush either
81 /// stream.
82 ///
83 /// # Parameters
84 /// - `reader`: Source reader.
85 /// - `writer`: Destination writer.
86 /// - `max_bytes`: Maximum number of bytes to copy.
87 ///
88 /// # Returns
89 /// The number of bytes copied.
90 ///
91 /// # Errors
92 /// Returns the first non-interrupted read error or write error reported by
93 /// the underlying streams. Interrupted reads are retried.
94 #[inline]
95 pub fn copy_at_most<R, W>(reader: &mut R, writer: &mut W, max_bytes: u64) -> Result<u64>
96 where
97 R: Read + ?Sized,
98 W: Write + ?Sized,
99 {
100 let mut reader = reader;
101 let mut writer = writer;
102 copy_at_most_impl(&mut reader, &mut writer, max_bytes)
103 }
104
105 /// Copies the remaining input if its total length is at most `max_bytes`.
106 ///
107 /// This method copies from the current reader position until EOF. If EOF is
108 /// not reached within `max_bytes` bytes, it returns
109 /// [`std::io::ErrorKind::InvalidData`]. Detecting oversized input consumes
110 /// one excess byte from `reader`; that excess byte is not written to
111 /// `writer`.
112 ///
113 /// # Parameters
114 /// - `reader`: Source reader.
115 /// - `writer`: Destination writer.
116 /// - `max_bytes`: Maximum accepted number of bytes in the remaining input.
117 ///
118 /// # Returns
119 /// The number of bytes copied when EOF is reached within the limit.
120 ///
121 /// # Errors
122 /// Returns [`std::io::ErrorKind::InvalidData`] when the remaining input is
123 /// longer than `max_bytes`. Returns the first non-interrupted read error or
124 /// write error reported by the underlying streams. Interrupted reads are
125 /// retried.
126 #[inline]
127 pub fn copy_to_end_limited<R, W>(reader: &mut R, writer: &mut W, max_bytes: u64) -> Result<u64>
128 where
129 R: Read + ?Sized,
130 W: Write + ?Sized,
131 {
132 let mut reader = reader;
133 let mut writer = writer;
134 copy_to_end_limited_impl(&mut reader, &mut writer, max_bytes)
135 }
136
137 /// Tests whether two readable streams have equal remaining contents.
138 ///
139 /// The comparison starts at each reader's current position and consumes
140 /// both streams until a difference or EOF is found.
141 ///
142 /// # Parameters
143 /// - `left`: First stream.
144 /// - `right`: Second stream.
145 ///
146 /// # Returns
147 /// `true` when both streams produce the same bytes until EOF.
148 ///
149 /// # Errors
150 /// Returns the first read error reported by either stream.
151 #[inline]
152 pub fn content_eq(left: &mut dyn Read, right: &mut dyn Read) -> Result<bool> {
153 Ok(Self::compare_content(left, right)? == Ordering::Equal)
154 }
155
156 /// Lexicographically compares the remaining contents of two readable
157 /// streams.
158 ///
159 /// The comparison starts at each reader's current position and consumes
160 /// both streams until a difference or EOF is found.
161 ///
162 /// # Parameters
163 /// - `left`: First stream.
164 /// - `right`: Second stream.
165 ///
166 /// # Returns
167 /// The lexicographic ordering of the remaining bytes.
168 ///
169 /// # Errors
170 /// Returns the first read error reported by either stream.
171 pub fn compare_content(left: &mut dyn Read, right: &mut dyn Read) -> Result<Ordering> {
172 let mut left_buffer = [0; COMPARE_BUFFER_SIZE];
173 let mut right_buffer = [0; COMPARE_BUFFER_SIZE];
174 loop {
175 let left_count = left.read_exact_or_eof(&mut left_buffer)?;
176 let right_count = right.read_exact_or_eof(&mut right_buffer)?;
177 let n = left_count.min(right_count);
178 for index in 0..n {
179 match left_buffer[index].cmp(&right_buffer[index]) {
180 Ordering::Equal => {}
181 ordering => return Ok(ordering),
182 }
183 }
184 match left_count.cmp(&right_count) {
185 Ordering::Equal if left_count == 0 => return Ok(Ordering::Equal),
186 Ordering::Equal => {}
187 ordering => return Ok(ordering),
188 }
189 }
190 }
191}
192
193/// Copies at most `max_bytes` bytes using trait-object I/O endpoints.
194///
195/// # Parameters
196/// - `reader`: Source reader.
197/// - `writer`: Destination writer.
198/// - `max_bytes`: Maximum number of bytes to copy.
199///
200/// # Returns
201/// The number of bytes copied.
202///
203/// # Errors
204/// Returns the first non-interrupted read error or write error reported by the
205/// underlying streams. Interrupted reads are retried.
206fn copy_at_most_impl(reader: &mut dyn Read, writer: &mut dyn Write, max_bytes: u64) -> Result<u64> {
207 let mut buffer = [0; COPY_BUFFER_SIZE];
208 let mut remaining = max_bytes;
209 let mut copied = 0;
210 while remaining > 0 {
211 let requested = remaining.min(COPY_BUFFER_SIZE as u64) as usize;
212 match reader.read(&mut buffer[..requested]) {
213 Ok(0) => break,
214 Ok(count) => {
215 writer.write_all(&buffer[..count])?;
216 let count = count as u64;
217 remaining -= count;
218 copied += count;
219 }
220 Err(error) => {
221 if error.kind() == ErrorKind::Interrupted {
222 continue;
223 }
224 return Err(error);
225 }
226 }
227 }
228 Ok(copied)
229}
230
231/// Copies the remaining input through trait-object endpoints when it fits.
232///
233/// # Parameters
234/// - `reader`: Source reader.
235/// - `writer`: Destination writer.
236/// - `max_bytes`: Maximum accepted number of bytes in the remaining input.
237///
238/// # Returns
239/// The number of bytes copied when EOF is reached within the limit.
240///
241/// # Errors
242/// Returns [`ErrorKind::InvalidData`] when the remaining input is longer than
243/// `max_bytes`. Returns the first non-interrupted read error or write error
244/// reported by the underlying streams. Interrupted reads are retried.
245fn copy_to_end_limited_impl(
246 reader: &mut dyn Read,
247 writer: &mut dyn Write,
248 max_bytes: u64,
249) -> Result<u64> {
250 let copied = copy_at_most_impl(reader, writer, max_bytes)?;
251 if copied < max_bytes {
252 return Ok(copied);
253 }
254 if has_more_input(reader)? {
255 return Err(Error::new(
256 ErrorKind::InvalidData,
257 format!("input exceeds maximum length of {max_bytes} bytes"),
258 ));
259 }
260 Ok(copied)
261}
262
/// Probes `reader` for at least one more byte of input.
///
/// Exactly one byte is requested; if it arrives it is consumed and discarded.
/// Interrupted reads are retried until the reader yields data, EOF, or some
/// other error.
///
/// # Parameters
/// - `reader`: Source reader to probe.
///
/// # Returns
/// `true` when one extra byte was read, or `false` when EOF was reached.
///
/// # Errors
/// Returns the first non-interrupted read error reported by `reader`.
fn has_more_input(reader: &mut dyn Read) -> Result<bool> {
    let mut probe = [0u8; 1];
    loop {
        match reader.read(&mut probe) {
            Err(error) if error.kind() == ErrorKind::Interrupted => continue,
            outcome => return outcome.map(|count| count != 0),
        }
    }
}