simd_csv/
line_reader.rs

1use memchr::memchr;
2
3use std::io::{self, BufReader, Read};
4
5use crate::buffer::ScratchBuffer;
6use crate::utils::trim_trailing_crlf;
7
8/// A zero-copy & optimized line reader.
9pub struct LineReader<R> {
10    inner: ScratchBuffer<R>,
11}
12
13impl<R: Read> LineReader<R> {
14    pub fn new(inner: R) -> Self {
15        Self {
16            inner: ScratchBuffer::new(inner),
17        }
18    }
19
20    pub fn with_capacity(capacity: usize, inner: R) -> Self {
21        Self {
22            inner: ScratchBuffer::with_capacity(capacity, inner),
23        }
24    }
25
26    pub fn count_lines(&mut self) -> io::Result<u64> {
27        let mut count: u64 = 0;
28        let mut current_is_empty = true;
29
30        loop {
31            let input = self.inner.fill_buf()?;
32            let len = input.len();
33
34            if len == 0 {
35                if !current_is_empty {
36                    count += 1;
37                }
38
39                return Ok(count);
40            }
41
42            match memchr(b'\n', input) {
43                None => {
44                    self.inner.consume(len);
45                    current_is_empty = false;
46                }
47                Some(pos) => {
48                    count += 1;
49                    self.inner.consume(pos + 1);
50                    current_is_empty = true;
51                }
52            };
53        }
54    }
55
56    pub fn read_line(&mut self) -> io::Result<Option<&[u8]>> {
57        self.inner.reset();
58
59        loop {
60            let input = self.inner.fill_buf()?;
61            let len = input.len();
62
63            if len == 0 {
64                if self.inner.has_something_saved() {
65                    return Ok(Some(trim_trailing_crlf(self.inner.saved())));
66                }
67
68                return Ok(None);
69            }
70
71            match memchr(b'\n', input) {
72                None => {
73                    self.inner.save();
74                }
75                Some(pos) => {
76                    let bytes = self.inner.flush(pos + 1);
77                    return Ok(Some(trim_trailing_crlf(bytes)));
78                }
79            };
80        }
81    }
82
83    #[inline(always)]
84    pub fn position(&self) -> u64 {
85        self.inner.position()
86    }
87
88    #[inline(always)]
89    pub fn into_bufreader(self) -> BufReader<R> {
90        self.inner.into_bufreader()
91    }
92
93    #[inline(always)]
94    pub fn into_inner(self) -> R {
95        self.inner.into_bufreader().into_inner()
96    }
97}
98
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Drains `reader` through `read_line`, copying every line it yields.
    fn collect_lines<R: Read>(reader: &mut LineReader<R>) -> io::Result<Vec<Vec<u8>>> {
        let mut collected = Vec::new();

        while let Some(line) = reader.read_line()? {
            collected.push(line.to_vec());
        }

        Ok(collected)
    }

    /// Checks `read_line` against the expected lines for each input, and that
    /// `count_lines` on a fresh reader agrees with the number of those lines.
    #[test]
    fn test_read_line() -> io::Result<()> {
        // Pairs of (raw input, lines expected once terminators are stripped).
        let cases: &[(&[u8], Vec<&[u8]>)] = &[
            (b"", vec![]),
            (b"test", vec![b"test"]),
            (
                b"hello\nwhatever\r\nbye!",
                vec![b"hello", b"whatever", b"bye!"],
            ),
            (
                b"hello\nwhatever\nbye!\n",
                vec![b"hello", b"whatever", b"bye!"],
            ),
            (
                b"hello\nwhatever\r\nbye!\n\n\r\n\n",
                vec![b"hello", b"whatever", b"bye!", b"", b"", b""],
            ),
        ];

        for (input, expected) in cases {
            let mut line_reader = LineReader::new(Cursor::new(input));
            let lines = collect_lines(&mut line_reader)?;

            assert_eq!(lines, *expected);

            // Counting needs its own reader because the first one is drained.
            let mut counting_reader = LineReader::new(Cursor::new(input));
            let counted = counting_reader.count_lines()?;

            assert_eq!(counted, expected.len() as u64);
        }

        Ok(())
    }
}
142}