// simd_csv/splitter.rs

1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error;
6use crate::utils::{trim_bom, trim_trailing_crlf};
7
/// Configuration used to build a [`Splitter`].
///
/// All fields are plain values, so the builder is cheap to clone and can be
/// reused to create several splitters with the same settings.
#[derive(Debug, Clone)]
pub struct SplitterBuilder {
    /// Byte separating cells within a record (passed to `CoreReader::new`).
    delimiter: u8,
    /// Byte used as the quote character (passed to `CoreReader::new`).
    quote: u8,
    /// Capacity for the internal buffer; `None` defers to the buffer's own
    /// default (`ScratchBuffer::with_optional_capacity`).
    buffer_capacity: Option<usize>,
    /// Whether the first record is a header row rather than data.
    has_headers: bool,
}
14
15impl Default for SplitterBuilder {
16    fn default() -> Self {
17        Self {
18            delimiter: b',',
19            quote: b'"',
20            buffer_capacity: None,
21            has_headers: true,
22        }
23    }
24}
25
26impl SplitterBuilder {
27    pub fn new() -> Self {
28        Self::default()
29    }
30
31    pub fn with_capacity(capacity: usize) -> Self {
32        let mut splitter = Self::default();
33        splitter.buffer_capacity(capacity);
34        splitter
35    }
36
37    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
38        self.delimiter = delimiter;
39        self
40    }
41
42    pub fn quote(&mut self, quote: u8) -> &mut Self {
43        self.quote = quote;
44        self
45    }
46
47    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
48        self.has_headers = yes;
49        self
50    }
51
52    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
53        self.buffer_capacity = Some(capacity);
54        self
55    }
56
57    pub fn from_reader<R: Read>(&self, reader: R) -> Splitter<R> {
58        Splitter {
59            buffer: ScratchBuffer::with_optional_capacity(self.buffer_capacity, reader),
60            inner: CoreReader::new(self.delimiter, self.quote),
61            headers: Vec::new(),
62            has_read: false,
63            has_headers: self.has_headers,
64            must_reemit_headers: !self.has_headers,
65        }
66    }
67}
68
/// Splits delimiter-separated input read from `R` into raw records.
///
/// Built by [`SplitterBuilder::from_reader`] or [`Splitter::from_reader`].
#[derive(Debug)]
pub struct Splitter<R> {
    // Buffered input. It also appears to keep the bytes of a record that spans
    // several buffer fills (see `save`/`flush` in `split_record_impl`).
    // NOTE(review): inferred from usage; confirm against `ScratchBuffer`.
    buffer: ScratchBuffer<R>,
    // Record-boundary scanner, configured with the delimiter and quote bytes.
    inner: CoreReader,
    // Copy of the first record, filled by `on_first_read`.
    headers: Vec<u8>,
    // Whether `on_first_read` has completed.
    has_read: bool,
    // Whether the first record is a header row rather than data.
    has_headers: bool,
    // True while the first record, read eagerly into `headers`, must still be
    // yielded as data. Starts as `!has_headers`. Cleared once that record is
    // yielded, or when the input turns out to be empty.
    must_reemit_headers: bool,
}
78
79impl<R: Read> Splitter<R> {
80    pub fn from_reader(reader: R) -> Self {
81        SplitterBuilder::new().from_reader(reader)
82    }
83
84    pub fn has_headers(&self) -> bool {
85        self.has_headers
86    }
87
88    pub fn byte_headers(&mut self) -> error::Result<&[u8]> {
89        self.on_first_read()?;
90
91        Ok(&self.headers)
92    }
93
94    #[inline(always)]
95    fn on_first_read(&mut self) -> error::Result<()> {
96        if self.has_read {
97            return Ok(());
98        }
99
100        let input = self.buffer.fill_buf()?;
101        let bom_len = trim_bom(input);
102        self.buffer.consume(bom_len);
103
104        if let Some(record) = self.split_record_impl()? {
105            self.headers = record.to_vec();
106        } else {
107            self.must_reemit_headers = false;
108        }
109
110        self.has_read = true;
111
112        Ok(())
113    }
114
115    pub fn count_records(&mut self) -> error::Result<u64> {
116        use ReadResult::*;
117
118        self.on_first_read()?;
119        self.buffer.reset();
120
121        let mut count: u64 = 0;
122
123        if self.must_reemit_headers {
124            count += 1;
125            self.must_reemit_headers = false;
126        }
127
128        loop {
129            let input = self.buffer.fill_buf()?;
130
131            let (result, pos) = self.inner.split_record(input);
132
133            self.buffer.consume(pos);
134
135            match result {
136                End => break,
137                InputEmpty | Cr | Lf => continue,
138                Record => {
139                    count += 1;
140                }
141            };
142        }
143
144        Ok(count)
145    }
146
147    pub fn split_record_impl(&mut self) -> error::Result<Option<&[u8]>> {
148        use ReadResult::*;
149
150        self.buffer.reset();
151
152        loop {
153            let input = self.buffer.fill_buf()?;
154
155            let (result, pos) = self.inner.split_record(input);
156
157            match result {
158                End => {
159                    self.buffer.consume(pos);
160                    return Ok(None);
161                }
162                Cr | Lf => {
163                    self.buffer.consume(pos);
164                }
165                InputEmpty => {
166                    self.buffer.save();
167                }
168                Record => {
169                    return Ok(Some(trim_trailing_crlf(self.buffer.flush(pos))));
170                }
171            };
172        }
173    }
174
175    pub fn split_record(&mut self) -> error::Result<Option<&[u8]>> {
176        self.on_first_read()?;
177
178        if self.must_reemit_headers {
179            self.must_reemit_headers = false;
180            return Ok(Some(&self.headers));
181        }
182
183        self.split_record_impl()
184    }
185
186    pub fn split_record_with_position(&mut self) -> error::Result<Option<(u64, &[u8])>> {
187        self.on_first_read()?;
188
189        let pos = self.position();
190
191        if self.must_reemit_headers {
192            self.must_reemit_headers = false;
193            return Ok(Some((pos, &self.headers)));
194        }
195
196        match self.split_record_impl() {
197            Ok(Some(record)) => Ok(Some((pos, record))),
198            Ok(None) => Ok(None),
199            Err(err) => Err(err),
200        }
201    }
202
203    pub fn into_bufreader(self) -> (Option<Vec<u8>>, BufReader<R>) {
204        (
205            self.must_reemit_headers.then_some(self.headers),
206            self.buffer.into_bufreader(),
207        )
208    }
209
210    #[inline(always)]
211    pub fn position(&self) -> u64 {
212        if self.must_reemit_headers {
213            0
214        } else {
215            self.buffer.position()
216        }
217    }
218}
219
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Counts records via `count_records`, treating every line as data.
    fn count_records(data: &str, capacity: usize) -> u64 {
        let mut splitter = SplitterBuilder::with_capacity(capacity)
            .has_headers(false)
            .from_reader(Cursor::new(data));
        splitter.count_records().unwrap()
    }

    /// Counts records by iterating `split_record`, treating every line as data.
    fn split_records(data: &str, capacity: usize) -> u64 {
        let mut splitter = SplitterBuilder::with_capacity(capacity)
            .has_headers(false)
            .from_reader(Cursor::new(data));
        let mut count: u64 = 0;

        while splitter.split_record().unwrap().is_some() {
            count += 1;
        }

        count
    }

    #[test]
    fn test_count() {
        // Empty
        assert_eq!(count_records("", 1024), 0);

        // Single cells with various empty lines
        let tests = [
            "name\njohn\nlucy",
            "name\njohn\nlucy\n",
            "name\n\njohn\r\nlucy\n",
            "name\n\njohn\r\nlucy\n\n",
            "name\n\n\njohn\r\n\r\nlucy\n\n\n",
            "\nname\njohn\nlucy",
            "\n\nname\njohn\nlucy",
            "\r\n\r\nname\njohn\nlucy",
            "name\njohn\nlucy\r\n",
            "name\njohn\nlucy\r\n\r\n",
        ];

        // Small capacities force records to span several buffer fills.
        for capacity in [32usize, 4, 3, 2, 1] {
            for test in tests.iter() {
                assert_eq!(
                    count_records(test, capacity),
                    3,
                    "capacity={} string={:?}",
                    capacity,
                    test
                );
            }
        }

        // Multiple cells
        let data = "name,surname,age\njohn,landy,45\nlucy,rose,67";
        assert_eq!(count_records(data, 1024), 3);
        assert_eq!(split_records(data, 1024), 3);

        // Quoting
        for capacity in [1024usize, 32usize, 4, 3, 2, 1] {
            let data = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\r\n";

            assert_eq!(count_records(data, capacity), 5, "capacity={}", capacity);
            assert_eq!(split_records(data, capacity), 5, "capacity={}", capacity);
        }

        // Different separator
        let data = "name\tsurname\tage\njohn\tlandy\t45\nlucy\trose\t67";
        assert_eq!(count_records(data, 1024), 3);
        assert_eq!(split_records(data, 1024), 3);
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        // Counting
        let mut reader = Splitter::from_reader(Cursor::new(data));

        assert_eq!(reader.count_records()?, 3);

        // Splitting must agree with counting (header row excluded).
        let mut reader = Splitter::from_reader(Cursor::new(data));
        let mut count: u64 = 0;

        while reader.split_record()?.is_some() {
            count += 1;
        }

        assert_eq!(count, 3);

        Ok(())
    }
}