simd_csv/
line_reader.rs

1use memchr::memchr;
2
3use std::io::{self, BufReader, Read};
4
5use crate::buffer::ScratchBuffer;
6use crate::utils::trim_trailing_crlf;
7
8/// A zero-copy & optimized line reader.
9///
10/// This reader recognizes both `LF` & `CRLF` line terminators, but not single
11/// `CR`.
12pub struct LineReader<R> {
13    inner: ScratchBuffer<R>,
14}
15
16impl<R: Read> LineReader<R> {
17    /// Create a new reader with default using the provided reader implementing
18    /// [`std::io::Read`].
19    ///
20    /// Avoid providing a buffered reader because buffering will be handled for
21    /// you by the [`LineReader`].
22    pub fn from_reader(inner: R) -> Self {
23        Self {
24            inner: ScratchBuffer::new(inner),
25        }
26    }
27
28    /// Create a new reader with provided buffer capacity and using the provided
29    /// reader implementing [`std::io::Read`].
30    ///
31    /// Avoid providing a buffered reader because buffering will be handled for
32    /// you by the [`LineReader`].
33    pub fn with_capacity(capacity: usize, inner: R) -> Self {
34        Self {
35            inner: ScratchBuffer::with_capacity(capacity, inner),
36        }
37    }
38
39    /// Consume the reader to count the number of lines as fast as possible.
40    pub fn count_lines(&mut self) -> io::Result<u64> {
41        let mut count: u64 = 0;
42        let mut current_is_empty = true;
43
44        loop {
45            let input = self.inner.fill_buf()?;
46            let len = input.len();
47
48            if len == 0 {
49                if !current_is_empty {
50                    count += 1;
51                }
52
53                return Ok(count);
54            }
55
56            match memchr(b'\n', input) {
57                None => {
58                    self.inner.consume(len);
59                    current_is_empty = false;
60                }
61                Some(pos) => {
62                    count += 1;
63                    self.inner.consume(pos + 1);
64                    current_is_empty = true;
65                }
66            };
67        }
68    }
69
70    /// Attempt to read the next line from underlying reader.
71    ///
72    /// Will return `None` if the end of stream was reached.
73    pub fn read_line(&mut self) -> io::Result<Option<&[u8]>> {
74        self.inner.reset();
75
76        loop {
77            let input = self.inner.fill_buf()?;
78            let len = input.len();
79
80            if len == 0 {
81                if self.inner.has_something_saved() {
82                    return Ok(Some(trim_trailing_crlf(self.inner.saved())));
83                }
84
85                return Ok(None);
86            }
87
88            match memchr(b'\n', input) {
89                None => {
90                    self.inner.save();
91                }
92                Some(pos) => {
93                    let bytes = self.inner.flush(pos + 1);
94                    return Ok(Some(trim_trailing_crlf(bytes)));
95                }
96            };
97        }
98    }
99
100    /// Return the current byte offset of the reader.
101    #[inline(always)]
102    pub fn position(&self) -> u64 {
103        self.inner.position()
104    }
105
106    /// Return the underlying [`BufReader`].
107    #[inline(always)]
108    pub fn into_bufreader(self) -> BufReader<R> {
109        self.inner.into_bufreader()
110    }
111
112    /// Return the underlying reader.
113    ///
114    /// **BEWARE**: Already buffered data will be lost!
115    #[inline(always)]
116    pub fn into_inner(self) -> R {
117        self.inner.into_bufreader().into_inner()
118    }
119}
120
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Raw inputs paired with the lines `read_line` must yield for them.
    const CASES: &[(&[u8], &[&[u8]])] = &[
        (b"", &[]),
        (b"test", &[b"test"]),
        (
            b"hello\nwhatever\r\nbye!",
            &[b"hello", b"whatever", b"bye!"],
        ),
        (
            b"hello\nwhatever\nbye!\n",
            &[b"hello", b"whatever", b"bye!"],
        ),
        (
            b"hello\nwhatever\r\nbye!\n\n\r\n\n",
            &[b"hello", b"whatever", b"bye!", b"", b"", b""],
        ),
    ];

    /// Drain `reader` through `read_line`, copying every yielded line.
    fn collect_lines<R: Read>(reader: &mut LineReader<R>) -> io::Result<Vec<Vec<u8>>> {
        let mut lines = Vec::new();

        while let Some(line) = reader.read_line()? {
            lines.push(line.to_vec());
        }

        Ok(lines)
    }

    #[test]
    fn test_read_line() -> io::Result<()> {
        for &(data, expected) in CASES {
            let mut reader = LineReader::from_reader(Cursor::new(data));
            assert_eq!(collect_lines(&mut reader)?, expected);

            let mut reader = LineReader::from_reader(Cursor::new(data));
            assert_eq!(reader.count_lines()?, expected.len() as u64);
        }

        Ok(())
    }

    #[test]
    fn test_small_buffers() -> io::Result<()> {
        // Tiny capacities force lines, and `\r\n` pairs, to straddle buffer
        // refills, exercising the save/flush path of `read_line` and the
        // cross-buffer bookkeeping of `count_lines`.
        for capacity in [1, 2, 3, 5, 8] {
            for &(data, expected) in CASES {
                let mut reader = LineReader::with_capacity(capacity, Cursor::new(data));
                assert_eq!(
                    collect_lines(&mut reader)?,
                    expected,
                    "read_line with capacity={}",
                    capacity
                );

                let mut reader = LineReader::with_capacity(capacity, Cursor::new(data));
                assert_eq!(
                    reader.count_lines()?,
                    expected.len() as u64,
                    "count_lines with capacity={}",
                    capacity
                );
            }
        }

        Ok(())
    }
}
164}