// ax_io/read/mod.rs

1#[cfg(feature = "alloc")]
2use alloc::{string::String, vec::Vec};
3use core::io::BorrowedCursor;
4
5use crate::{Chain, Error, Result, Take};
6
7mod impls;
8
9/// Default [`Read::read_exact`] implementation.
10pub fn default_read_exact<R: Read + ?Sized>(this: &mut R, mut buf: &mut [u8]) -> Result<()> {
11    while !buf.is_empty() {
12        match this.read(buf) {
13            Ok(0) => break,
14            Ok(n) => {
15                buf = &mut buf[n..];
16            }
17            Err(e) if e.canonicalize() == Error::Interrupted => continue,
18            Err(e) => return Err(e),
19        }
20    }
21    if !buf.is_empty() {
22        Err(Error::UnexpectedEof)
23    } else {
24        Ok(())
25    }
26}
27
28/// Default [`Read::read_buf`] implementation.
29pub fn default_read_buf<F>(read: F, mut cursor: BorrowedCursor<'_>) -> Result<()>
30where
31    F: FnOnce(&mut [u8]) -> Result<usize>,
32{
33    #[cfg(borrowedbuf_init)]
34    {
35        let n = read(cursor.ensure_init().init_mut())?;
36        cursor.advance(n);
37    }
38    #[cfg(not(borrowedbuf_init))]
39    {
40        // SAFETY: We do not uninitialize any part of the buffer.
41        let n = read(unsafe { cursor.as_mut().write_filled(0) })?;
42        assert!(n <= cursor.capacity());
43        // SAFETY: We've initialized the entire buffer, and `read` can't make it uninitialized.
44        unsafe {
45            cursor.advance(n);
46        }
47    }
48    Ok(())
49}
50
51/// Default [`Read::read_buf_exact`] implementation.
52pub fn default_read_buf_exact<R: Read + ?Sized>(
53    this: &mut R,
54    mut cursor: BorrowedCursor<'_>,
55) -> Result<()> {
56    while cursor.capacity() > 0 {
57        let prev_written = cursor.written();
58        match this.read_buf(cursor.reborrow()) {
59            Ok(()) => {}
60            Err(e) if e.canonicalize() == Error::Interrupted => continue,
61            Err(e) => return Err(e),
62        }
63
64        if cursor.written() == prev_written {
65            return Err(Error::UnexpectedEof);
66        }
67    }
68
69    Ok(())
70}
71
/// Default [`Read::read_to_end`] implementation with optional size hint.
///
/// Appends every byte `r` yields until EOF (`Ok(0)`) to `buf` and returns the
/// number of bytes appended (`buf.len()` after minus `buf.len()` before). The
/// existing contents of `buf` are never touched. New data is only added with
/// `extend_from_slice` or written into the vector's spare capacity followed by
/// `set_len`.
///
/// `size_hint` affects how much is requested per read:
/// - `Some(s)` caps every read at `s + 1024` rounded up to a multiple of
///   `DEFAULT_BUF_SIZE` (falling back to `DEFAULT_BUF_SIZE` on overflow).
/// - `None` starts at `DEFAULT_BUF_SIZE` and grows the cap heuristically.
///
/// # Errors
///
/// - Reads failing with [`Error::Interrupted`] are retried.
/// - Any other error from `r` is returned. Bytes delivered by the same
///   `read_buf` call are committed to `buf` before the error is returned.
/// - Returns [`Error::NoMemory`] if `buf.try_reserve` fails.
#[cfg(feature = "alloc")]
pub fn default_read_to_end<R: Read + ?Sized>(
    r: &mut R,
    buf: &mut Vec<u8>,
    size_hint: Option<usize>,
) -> Result<usize> {
    use core::io::BorrowedBuf;

    use crate::DEFAULT_BUF_SIZE;

    let start_len = buf.len();
    let start_cap = buf.capacity();
    // Optionally limit the maximum bytes read on each iteration.
    // This adds an arbitrary fiddle factor to allow for more data than we expect.
    let mut max_read_size = size_hint
        .and_then(|s| {
            s.checked_add(1024)?
                .checked_next_multiple_of(DEFAULT_BUF_SIZE)
        })
        .unwrap_or(DEFAULT_BUF_SIZE);

    // Size of the on-stack buffer used for cheap "is there anything left?" reads.
    const PROBE_SIZE: usize = 32;

    /// Reads at most `PROBE_SIZE` bytes into a stack buffer and appends them to
    /// `buf`, retrying on `Interrupted`. Returns how many bytes were appended;
    /// `Ok(0)` means EOF.
    fn small_probe_read<R: Read + ?Sized>(r: &mut R, buf: &mut Vec<u8>) -> Result<usize> {
        let mut probe = [0u8; PROBE_SIZE];

        loop {
            match r.read(&mut probe) {
                Ok(n) => {
                    // there is no way to recover from allocation failure here
                    // because the data has already been read.
                    buf.extend_from_slice(&probe[..n]);
                    return Ok(n);
                }
                Err(e) if e.canonicalize() == Error::Interrupted => continue,
                Err(e) => return Err(e),
            }
        }
    }

    // With no (or a zero) size hint and little spare room, probe via the stack
    // buffer first so an empty source does not force `buf` to grow.
    if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE {
        let read = small_probe_read(r, buf)?;

        if read == 0 {
            return Ok(0);
        }
    }

    #[cfg(borrowedbuf_init)]
    let mut initialized = 0; // Extra initialized bytes from previous loop iteration
    #[cfg(borrowedbuf_init)]
    let mut consecutive_short_reads = 0;

    loop {
        if buf.len() == buf.capacity() && buf.capacity() == start_cap {
            // The buffer might be an exact fit. Let's read into a probe buffer
            // and see if it returns `Ok(0)`. If so, we've avoided an
            // unnecessary doubling of the capacity. But if not, append the
            // probe buffer to the primary buffer and let its capacity grow.
            let read = small_probe_read(r, buf)?;

            if read == 0 {
                return Ok(buf.len() - start_len);
            }
        }

        if buf.len() == buf.capacity() {
            // buf is full, need more space
            buf.try_reserve(PROBE_SIZE).map_err(|_| Error::NoMemory)?;
        }

        // Read directly into (a prefix of) the vector's spare capacity, at most
        // `max_read_size` bytes per iteration.
        let mut spare = buf.spare_capacity_mut();
        let buf_len = spare.len().min(max_read_size);
        spare = &mut spare[..buf_len];
        let mut read_buf: BorrowedBuf<'_> = spare.into();

        #[cfg(borrowedbuf_init)]
        // SAFETY: These bytes were initialized but not filled in the previous loop
        unsafe {
            read_buf.set_init(initialized);
        }

        let mut cursor = read_buf.unfilled();
        let result = loop {
            match r.read_buf(cursor.reborrow()) {
                Err(e) if e.canonicalize() == Error::Interrupted => continue,
                // Do not stop now in case of error: we might have received both data
                // and an error
                res => break res,
            }
        };

        #[cfg(borrowedbuf_init)]
        let unfilled_but_initialized = cursor.init_mut().len();
        let bytes_read = cursor.written();
        #[cfg(borrowedbuf_init)]
        let was_fully_initialized = read_buf.init_len() == buf_len;

        // SAFETY: BorrowedBuf's invariants mean this much memory is initialized.
        unsafe {
            let new_len = bytes_read + buf.len();
            buf.set_len(new_len);
        }

        // Now that all data is pushed to the vector, we can fail without data loss
        result?;

        // A successful read of zero bytes is EOF.
        if bytes_read == 0 {
            return Ok(buf.len() - start_len);
        }

        #[cfg(borrowedbuf_init)]
        if bytes_read < buf_len {
            consecutive_short_reads += 1;
        } else {
            consecutive_short_reads = 0;
        }

        #[cfg(borrowedbuf_init)]
        {
            // store how much was initialized but not filled
            initialized = unfilled_but_initialized;
        }

        // Use heuristics to determine the max read size if no initial size hint was provided
        if size_hint.is_none() {
            #[cfg(borrowedbuf_init)]
            // The reader is returning short reads but it doesn't call ensure_init().
            // In that case we no longer need to restrict read sizes to avoid
            // initialization costs.
            // When reading from disk we usually don't get any short reads except at EOF.
            // So we wait for at least 2 short reads before uncapping the read buffer;
            // this helps with the Windows issue.
            if !was_fully_initialized && consecutive_short_reads > 1 {
                max_read_size = usize::MAX;
            }

            // we have passed a larger buffer than previously and the
            // reader still hasn't returned a short read
            if buf_len >= max_read_size && bytes_read == buf_len {
                max_read_size = max_read_size.saturating_mul(2);
            }
        }
    }
}
218
/// Runs `f` on the raw bytes of `buf`, keeping what it appended only if those
/// new bytes are valid UTF-8.
///
/// Returns the result of `f`. If the bytes appended by `f` are not valid UTF-8:
/// - `buf` is truncated back to its original length;
/// - the result is `f`'s own error if `f` failed, otherwise
///   [`Error::IllegalBytes`].
///
/// If `f` panics and unwinds, `buf` is also restored to its original length.
///
/// # Safety
///
/// `f` must only *append* to the `Vec<u8>` it is given. It must not shrink the
/// vector below its original length or modify the bytes already present. Only
/// the appended suffix is UTF-8-checked, so breaking this contract can leave
/// `buf` holding invalid UTF-8 or cause an out-of-bounds `get_unchecked`.
#[cfg(feature = "alloc")]
pub(crate) unsafe fn append_to_string<F>(buf: &mut String, f: F) -> Result<usize>
where
    F: FnOnce(&mut Vec<u8>) -> Result<usize>,
{
    // Drop guard that resets `buf`'s length to `len`. It rolls back appended
    // bytes on panic or on invalid UTF-8, and commits them once `len` is bumped.
    struct Guard<'a> {
        buf: &'a mut Vec<u8>,
        len: usize,
    }

    impl Drop for Guard<'_> {
        fn drop(&mut self) {
            // SAFETY: `len` is either the original length (whose bytes were
            // already initialized) or the current length after a successful
            // UTF-8 check. Relies on the caller contract that `f` never shrinks
            // the vector below the original length.
            unsafe {
                self.buf.set_len(self.len);
            }
        }
    }

    let mut g = Guard {
        len: buf.len(),
        // SAFETY: the guard restores a length whose bytes are valid UTF-8 (the
        // original contents, possibly plus a validated suffix) before `buf` is
        // observed as a `String` again.
        buf: unsafe { buf.as_mut_vec() },
    };
    let ret = f(g.buf);

    // SAFETY: the caller promises to only append data to `buf`
    let appended = unsafe { g.buf.get_unchecked(g.len..) };
    if str::from_utf8(appended).is_err() {
        // `g.len` still holds the original length, so dropping the guard
        // discards the invalid suffix. An error from `f` takes precedence.
        ret.and(Err(Error::IllegalBytes))
    } else {
        // Commit: the guard keeps the full, validated length.
        g.len = g.buf.len();
        ret
    }
}
252
/// Default [`Read::read_to_string`] implementation with optional size hint.
///
/// Appends all bytes from `r` up to EOF to `buf` and returns how many bytes
/// were appended. The new bytes are UTF-8-validated by `append_to_string`. If
/// they are invalid, `buf` keeps only its original contents and an error is
/// returned.
#[cfg(feature = "alloc")]
pub fn default_read_to_string<R: Read + ?Sized>(
    r: &mut R,
    buf: &mut String,
    size_hint: Option<usize>,
) -> Result<usize> {
    // `r.read_to_end()` is intentionally avoided. It may be overridden, and an
    // arbitrary override could rewrite the whole vector rather than append to
    // it, which would defeat the append-only UTF-8 check. The hard-coded
    // `default_read_to_end` is known to only write past the existing data, so
    // only the new tail needs validating.
    let append_all = |bytes: &mut Vec<u8>| default_read_to_end(r, bytes, size_hint);
    // SAFETY: `append_all` only ever appends to the vector it is handed.
    unsafe { append_to_string(buf, append_all) }
}
271
272/// The `Read` trait allows for reading bytes from a source.
273///
274/// See [`std::io::Read`] for more details.
275pub trait Read {
276    /// Pull some bytes from this source into the specified buffer, returning
277    /// how many bytes were read.
278    fn read(&mut self, buf: &mut [u8]) -> Result<usize>;
279
280    /// Read the exact number of bytes required to fill `buf`.
281    fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
282        default_read_exact(self, buf)
283    }
284
285    /// Pull some bytes from this source into the specified buffer.
286    ///
287    /// This method makes it possible to return both data and an error but it is advised against.
288    fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<()> {
289        default_read_buf(|b| self.read(b), buf)
290    }
291
292    /// Reads the exact number of bytes required to fill `cursor`.
293    ///
294    /// If this function returns an error, all bytes read will be appended to `cursor`.
295    fn read_buf_exact(&mut self, cursor: BorrowedCursor<'_>) -> Result<()> {
296        default_read_buf_exact(self, cursor)
297    }
298
299    /// Read all bytes until EOF in this source, placing them into `buf`.
300    #[cfg(feature = "alloc")]
301    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
302        default_read_to_end(self, buf, None)
303    }
304
305    /// Read all bytes until EOF in this source, appending them to `buf`.
306    #[cfg(feature = "alloc")]
307    fn read_to_string(&mut self, buf: &mut String) -> Result<usize> {
308        default_read_to_string(self, buf, None)
309    }
310
311    /// Creates a "by reference" adapter for this instance of `Read`.
312    ///
313    /// The returned `adapter` also implements Read and will simply borrow this
314    /// current reader.
315    fn by_ref(&mut self) -> &mut Self
316    where
317        Self: Sized,
318    {
319        self
320    }
321
322    /// Creates an adapter which will chain this stream with another.
323    ///
324    /// The returned `Read` instance will first read all bytes from this object
325    /// until EOF is encountered. Afterwards the output is equivalent to the
326    /// output of `next`.
327    fn chain<R: Read>(self, next: R) -> Chain<Self, R>
328    where
329        Self: Sized,
330    {
331        Chain::new(self, next)
332    }
333
334    /// Creates an adapter which will read at most `limit` bytes from it.
335    ///
336    /// This function returns a new instance of `Read` which will read at most
337    /// `limit` bytes, after which it will always return EOF ([`Ok(0)`]). Any
338    /// read errors will not count towards the number of bytes read and future
339    /// calls to [`read()`] may succeed.
340    ///
341    /// [`Ok(0)`]: Ok
342    /// [`read()`]: Read::read
343    fn take(self, limit: u64) -> Take<Self>
344    where
345        Self: Sized,
346    {
347        Take::new(self, limit)
348    }
349}
350
/// Reads all bytes from a [reader][Read] into a new [`String`].
///
/// Convenience wrapper that allocates an empty `String`, fills it through
/// [`Read::read_to_string`] and hands it back. The byte count is discarded and
/// errors are forwarded unchanged.
///
/// See [`std::io::read_to_string`] for more details.
#[cfg(feature = "alloc")]
pub fn read_to_string<R: Read>(mut reader: R) -> Result<String> {
    let mut contents = String::new();
    reader.read_to_string(&mut contents).map(|_| contents)
}
362
363/// A `BufRead` is a type of `Read`er which has an internal buffer, allowing it
364/// to perform extra ways of reading.
365///
366/// See [`std::io::BufRead`] for more details.
367pub trait BufRead: Read {
368    /// Returns the contents of the internal buffer, filling it with more data, via `Read` methods,
369    /// if empty.
370    fn fill_buf(&mut self) -> Result<&[u8]>;
371
372    /// Marks the given `amount` of additional bytes from the internal buffer as having been read.
373    /// Subsequent calls to `read` only return bytes that have not been marked as read.
374    fn consume(&mut self, amount: usize);
375
376    /// Checks if there is any data left to be `read`.
377    fn has_data_left(&mut self) -> Result<bool> {
378        self.fill_buf().map(|b| !b.is_empty())
379    }
380
381    /// Skips all bytes until the delimiter `byte` or EOF is reached.
382    fn skip_until(&mut self, byte: u8) -> Result<usize> {
383        let mut read = 0;
384        loop {
385            let (done, used) = {
386                let available = self.fill_buf()?;
387                match memchr::memchr(byte, available) {
388                    Some(i) => (true, i + 1),
389                    None => (false, available.len()),
390                }
391            };
392            self.consume(used);
393            read += used;
394            if done || used == 0 {
395                return Ok(read);
396            }
397        }
398    }
399
400    /// Read all bytes into `buf` until the delimiter `byte` or EOF is reached.
401    #[cfg(feature = "alloc")]
402    fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> Result<usize> {
403        let mut read = 0;
404        loop {
405            let (done, used) = {
406                let available = self.fill_buf()?;
407                match memchr::memchr(byte, available) {
408                    Some(i) => {
409                        buf.extend_from_slice(&available[..=i]);
410                        (true, i + 1)
411                    }
412                    None => {
413                        buf.extend_from_slice(available);
414                        (false, available.len())
415                    }
416                }
417            };
418            self.consume(used);
419            read += used;
420            if done || used == 0 {
421                return Ok(read);
422            }
423        }
424    }
425
426    /// Read all bytes until a newline (the `0xA` byte) is reached, and append
427    /// them to the provided `String` buffer.
428    #[cfg(feature = "alloc")]
429    fn read_line(&mut self, buf: &mut String) -> Result<usize> {
430        unsafe { super::append_to_string(buf, |b| self.read_until(b'\n', b)) }
431    }
432
433    /// Returns an iterator over the contents of this reader split on the byte
434    /// `byte`.
435    #[cfg(feature = "alloc")]
436    fn split(self, byte: u8) -> Split<Self>
437    where
438        Self: Sized,
439    {
440        Split {
441            buf: self,
442            delim: byte,
443        }
444    }
445
446    /// Returns an iterator over the lines of this reader.
447    #[cfg(feature = "alloc")]
448    fn lines(self) -> Lines<Self>
449    where
450        Self: Sized,
451    {
452        Lines { buf: self }
453    }
454}
455
/// An iterator over the contents of an instance of `BufRead` split on a
/// particular byte.
///
/// Each item is one chunk with its trailing delimiter removed. A final chunk
/// that is not terminated by the delimiter is still yielded.
///
/// This struct is generally created by calling [`split`] on a `BufRead`.
/// Please see the documentation of [`split`] for more details.
///
/// [`split`]: BufRead::split
#[cfg(feature = "alloc")]
#[derive(Debug)]
pub struct Split<B> {
    /// The underlying buffered reader.
    buf: B,
    /// The byte separating consecutive items.
    delim: u8,
}

#[cfg(feature = "alloc")]
impl<B: BufRead> Iterator for Split<B> {
    type Item = Result<Vec<u8>>;

    /// Reads the next chunk via [`BufRead::read_until`].
    ///
    /// Returns `None` once a read yields zero bytes, and `Some(Err(_))` if the
    /// read fails.
    fn next(&mut self) -> Option<Result<Vec<u8>>> {
        let mut chunk = Vec::new();
        let count = match self.buf.read_until(self.delim, &mut chunk) {
            Err(e) => return Some(Err(e)),
            Ok(count) => count,
        };
        if count == 0 {
            return None;
        }
        let last = chunk.len() - 1;
        if chunk[last] == self.delim {
            chunk.pop();
        }
        Some(Ok(chunk))
    }
}
488
/// An iterator over the lines of an instance of `BufRead`.
///
/// Each item is one line with a trailing `"\n"` (or `"\r\n"`) removed.
///
/// This struct is generally created by calling [`lines`] on a `BufRead`.
/// Please see the documentation of [`lines`] for more details.
///
/// [`lines`]: BufRead::lines
#[cfg(feature = "alloc")]
#[derive(Debug)]
pub struct Lines<B> {
    /// The underlying buffered reader.
    buf: B,
}

#[cfg(feature = "alloc")]
impl<B: BufRead> Iterator for Lines<B> {
    type Item = Result<String>;

    /// Reads the next line via [`BufRead::read_line`].
    ///
    /// Returns `None` once a read yields zero bytes, and `Some(Err(_))` if the
    /// read fails.
    fn next(&mut self) -> Option<Result<String>> {
        let mut line = String::new();
        let count = match self.buf.read_line(&mut line) {
            Err(e) => return Some(Err(e)),
            Ok(count) => count,
        };
        if count == 0 {
            return None;
        }
        // Strip a trailing '\n' and, only when that was present, a '\r' before it.
        if line.ends_with('\n') {
            let mut keep = line.len() - 1;
            if line[..keep].ends_with('\r') {
                keep -= 1;
            }
            line.truncate(keep);
        }
        Some(Ok(line))
    }
}