simd_csv/
line_buffer.rs

1use memchr::memchr;
2
3use std::io::{self, BufRead, BufReader, Read};
4
5use crate::utils::trim_trailing_cr;
6
/// Line reader layered on a [`BufReader`] that hands out lines as borrowed byte slices.
///
/// Lines fully contained in the read buffer are returned without copying; lines whose
/// bytes span several buffer refills are assembled in `scratch` instead.
#[derive(Debug)]
pub struct LineBuffer<R> {
    /// Buffered wrapper around the underlying reader.
    buffer: BufReader<R>,
    /// Accumulates a line whose bytes span more than one `fill_buf` call.
    scratch: Vec<u8>,
    /// Number of bytes of `buffer` still owed to a line that was returned without
    /// copying; they are consumed at the start of the next `read_line` call.
    actual_buffer_position: Option<usize>,
}
12
13impl<R: Read> LineBuffer<R> {
14    pub fn new(inner: R) -> Self {
15        Self {
16            buffer: BufReader::new(inner),
17            scratch: Vec::new(),
18            actual_buffer_position: None,
19        }
20    }
21
22    pub fn with_capacity(capacity: usize, inner: R) -> Self {
23        Self {
24            buffer: BufReader::with_capacity(capacity, inner),
25            scratch: Vec::with_capacity(capacity),
26            actual_buffer_position: None,
27        }
28    }
29
30    pub fn count_lines(&mut self) -> io::Result<u64> {
31        let mut count: u64 = 0;
32        let mut current_is_empty = true;
33
34        loop {
35            let input = self.buffer.fill_buf()?;
36            let len = input.len();
37
38            if len == 0 {
39                if !current_is_empty {
40                    count += 1;
41                }
42
43                return Ok(count);
44            }
45
46            match memchr(b'\n', input) {
47                None => {
48                    self.buffer.consume(len);
49                    current_is_empty = false;
50                }
51                Some(pos) => {
52                    count += 1;
53                    self.buffer.consume(pos + 1);
54                    current_is_empty = true;
55                }
56            };
57        }
58    }
59
60    pub fn read_line(&mut self) -> io::Result<Option<&[u8]>> {
61        self.scratch.clear();
62
63        if let Some(last_pos) = self.actual_buffer_position.take() {
64            self.buffer.consume(last_pos);
65        }
66
67        loop {
68            let input = self.buffer.fill_buf()?;
69            let len = input.len();
70
71            if len == 0 {
72                if !self.scratch.is_empty() {
73                    return Ok(Some(trim_trailing_cr(&self.scratch)));
74                }
75
76                return Ok(None);
77            }
78
79            match memchr(b'\n', input) {
80                None => {
81                    self.scratch.extend_from_slice(input);
82                    self.buffer.consume(len);
83                }
84                Some(pos) => {
85                    if self.scratch.is_empty() {
86                        self.actual_buffer_position = Some(pos + 1);
87                        return Ok(Some(trim_trailing_cr(&self.buffer.buffer()[..pos])));
88                    } else {
89                        self.scratch.extend_from_slice(&input[..pos]);
90                        self.buffer.consume(pos + 1);
91
92                        return Ok(Some(trim_trailing_cr(&self.scratch)));
93                    }
94                }
95            };
96        }
97    }
98}
99
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Builds a reader over `data` with the default buffer size when `capacity` is
    /// `None`, and with `LineBuffer::with_capacity` otherwise.
    fn reader(data: &[u8], capacity: Option<usize>) -> LineBuffer<Cursor<&[u8]>> {
        match capacity {
            Some(capacity) => LineBuffer::with_capacity(capacity, Cursor::new(data)),
            None => LineBuffer::new(Cursor::new(data)),
        }
    }

    #[test]
    fn test_read_line() -> io::Result<()> {
        let tests: &[(&[u8], Vec<&[u8]>)] = &[
            (b"", vec![]),
            (b"test", vec![b"test"]),
            (b"\n", vec![b""]),
            (b"\r\n", vec![b""]),
            (
                b"hello\nwhatever\r\nbye!",
                vec![b"hello", b"whatever", b"bye!"],
            ),
            (
                b"hello\nwhatever\nbye!\n",
                vec![b"hello", b"whatever", b"bye!"],
            ),
            (
                b"hello\nwhatever\r\nbye!\n\n\r\n\n",
                vec![b"hello", b"whatever", b"bye!", b"", b"", b""],
            ),
        ];

        // Tiny buffers force lines to straddle several `fill_buf` calls, exercising the
        // scratch path; the default buffer exercises the zero-copy path.
        let capacities = [None, Some(1), Some(2), Some(3)];

        for (data, expected) in tests {
            for &capacity in &capacities {
                let mut line_reader = reader(data, capacity);
                let mut lines = Vec::new();

                while let Some(line) = line_reader.read_line()? {
                    lines.push(line.to_vec());
                }

                assert_eq!(lines, *expected, "input {:?}, capacity {:?}", data, capacity);

                assert_eq!(
                    reader(data, capacity).count_lines()?,
                    expected.len() as u64,
                    "input {:?}, capacity {:?}",
                    data,
                    capacity
                );
            }
        }

        Ok(())
    }
}