simd_csv/
splitter.rs

1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error;
6use crate::utils::{trim_bom, trim_trailing_crlf};
7
/// Builds a [`Splitter`] with a given configuration.
#[derive(Debug, Clone)]
pub struct SplitterBuilder {
    /// Byte separating the cells of a record.
    delimiter: u8,
    /// Byte used to quote cells.
    quote: u8,
    /// Optional capacity for the splitter's inner buffer; `None` lets the
    /// buffer pick its own.
    buffer_capacity: Option<usize>,
    /// Whether the first record is to be interpreted as headers.
    has_headers: bool,
}
15
16impl Default for SplitterBuilder {
17    fn default() -> Self {
18        Self {
19            delimiter: b',',
20            quote: b'"',
21            buffer_capacity: None,
22            has_headers: true,
23        }
24    }
25}
26
27impl SplitterBuilder {
28    pub fn new() -> Self {
29        Self::default()
30    }
31
32    pub fn with_capacity(capacity: usize) -> Self {
33        let mut splitter = Self::default();
34        splitter.buffer_capacity(capacity);
35        splitter
36    }
37
38    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
39        self.delimiter = delimiter;
40        self
41    }
42
43    pub fn quote(&mut self, quote: u8) -> &mut Self {
44        self.quote = quote;
45        self
46    }
47
48    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
49        self.has_headers = yes;
50        self
51    }
52
53    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
54        self.buffer_capacity = Some(capacity);
55        self
56    }
57
58    pub fn from_reader<R: Read>(&self, reader: R) -> Splitter<R> {
59        Splitter {
60            buffer: ScratchBuffer::with_optional_capacity(self.buffer_capacity, reader),
61            inner: CoreReader::new(self.delimiter, self.quote),
62            headers: Vec::new(),
63            has_read: false,
64            has_headers: self.has_headers,
65            must_reemit_headers: !self.has_headers,
66        }
67    }
68}
69
70/// An already configured CSV record splitter.
71///
72/// # Configuration
73///
74/// To configure a [`Splitter`], if you need a custom delimiter for instance of if
75/// you want to tweak the size of the inner buffer. Check out the
76/// [`SplitterBuilder`].
77#[derive(Debug)]
78pub struct Splitter<R> {
79    buffer: ScratchBuffer<R>,
80    inner: CoreReader,
81    headers: Vec<u8>,
82    has_read: bool,
83    has_headers: bool,
84    must_reemit_headers: bool,
85}
86
87impl<R: Read> Splitter<R> {
88    pub fn from_reader(reader: R) -> Self {
89        SplitterBuilder::new().from_reader(reader)
90    }
91
92    /// Returns whether this reader has been configured to interpret the first
93    /// record as a header.
94    pub fn has_headers(&self) -> bool {
95        self.has_headers
96    }
97
98    pub fn byte_headers(&mut self) -> error::Result<&[u8]> {
99        self.on_first_read()?;
100
101        Ok(&self.headers)
102    }
103
104    #[inline(always)]
105    fn on_first_read(&mut self) -> error::Result<()> {
106        if self.has_read {
107            return Ok(());
108        }
109
110        let input = self.buffer.fill_buf()?;
111        let bom_len = trim_bom(input);
112        self.buffer.consume(bom_len);
113
114        if let Some(record) = self.split_record_impl()? {
115            self.headers = record.to_vec();
116        } else {
117            self.must_reemit_headers = false;
118        }
119
120        self.has_read = true;
121
122        Ok(())
123    }
124
125    pub fn count_records(&mut self) -> error::Result<u64> {
126        use ReadResult::*;
127
128        self.on_first_read()?;
129        self.buffer.reset();
130
131        let mut count: u64 = 0;
132
133        if self.must_reemit_headers {
134            count += 1;
135            self.must_reemit_headers = false;
136        }
137
138        loop {
139            let input = self.buffer.fill_buf()?;
140
141            let (result, pos) = self.inner.split_record(input);
142
143            self.buffer.consume(pos);
144
145            match result {
146                End => break,
147                InputEmpty | Cr | Lf => continue,
148                Record => {
149                    count += 1;
150                }
151            };
152        }
153
154        Ok(count)
155    }
156
157    pub fn split_record_impl(&mut self) -> error::Result<Option<&[u8]>> {
158        use ReadResult::*;
159
160        self.buffer.reset();
161
162        loop {
163            let input = self.buffer.fill_buf()?;
164
165            let (result, pos) = self.inner.split_record(input);
166
167            match result {
168                End => {
169                    self.buffer.consume(pos);
170                    return Ok(None);
171                }
172                Cr | Lf => {
173                    self.buffer.consume(pos);
174                }
175                InputEmpty => {
176                    self.buffer.save();
177                }
178                Record => {
179                    return Ok(Some(trim_trailing_crlf(self.buffer.flush(pos))));
180                }
181            };
182        }
183    }
184
185    pub fn split_record(&mut self) -> error::Result<Option<&[u8]>> {
186        self.on_first_read()?;
187
188        if self.must_reemit_headers {
189            self.must_reemit_headers = false;
190            return Ok(Some(&self.headers));
191        }
192
193        self.split_record_impl()
194    }
195
196    pub fn split_record_with_position(&mut self) -> error::Result<Option<(u64, &[u8])>> {
197        self.on_first_read()?;
198
199        let pos = self.position();
200
201        if self.must_reemit_headers {
202            self.must_reemit_headers = false;
203            return Ok(Some((pos, &self.headers)));
204        }
205
206        match self.split_record_impl() {
207            Ok(Some(record)) => Ok(Some((pos, record))),
208            Ok(None) => Ok(None),
209            Err(err) => Err(err),
210        }
211    }
212
213    pub fn into_bufreader(self) -> (Option<Vec<u8>>, BufReader<R>) {
214        (
215            self.must_reemit_headers.then_some(self.headers),
216            self.buffer.into_bufreader(),
217        )
218    }
219
220    #[inline(always)]
221    pub fn position(&self) -> u64 {
222        if self.must_reemit_headers {
223            0
224        } else {
225            self.buffer.position()
226        }
227    }
228}
229
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Builds a splitter over `data` that does not treat the first record as
    /// headers, using an inner buffer of the given capacity.
    fn headerless(data: &str, capacity: usize) -> Splitter<Cursor<&str>> {
        SplitterBuilder::with_capacity(capacity)
            .has_headers(false)
            .from_reader(Cursor::new(data))
    }

    /// Counts records through [`Splitter::count_records`].
    fn count_records(data: &str, capacity: usize) -> u64 {
        headerless(data, capacity).count_records().unwrap()
    }

    /// Counts records by repeatedly calling [`Splitter::split_record`].
    fn split_records(data: &str, capacity: usize) -> u64 {
        let mut splitter = headerless(data, capacity);
        let mut seen = 0;

        while splitter.split_record().unwrap().is_some() {
            seen += 1;
        }

        seen
    }

    #[test]
    fn test_count() {
        // Empty input holds no record.
        assert_eq!(count_records("", 1024), 0);

        // Single cells surrounded by various empty lines and CRLF endings.
        // Each input holds exactly three records: "name", "john" and "lucy".
        let single_cell_inputs = [
            "name\njohn\nlucy",
            "name\njohn\nlucy\n",
            "name\n\njohn\r\nlucy\n",
            "name\n\njohn\r\nlucy\n\n",
            "name\n\n\njohn\r\n\r\nlucy\n\n\n",
            "\nname\njohn\nlucy",
            "\n\nname\njohn\nlucy",
            "\r\n\r\nname\njohn\nlucy",
            "name\njohn\nlucy\r\n",
            "name\njohn\nlucy\r\n\r\n",
        ];

        for capacity in [32usize, 4, 3, 2, 1] {
            for input in single_cell_inputs {
                assert_eq!(
                    count_records(input, capacity),
                    3,
                    "capacity={} string={:?}",
                    capacity,
                    input
                );
            }
        }

        // Several cells per record.
        let multi_cell = "name,surname,age\njohn,landy,45\nlucy,rose,67";
        assert_eq!(count_records(multi_cell, 1024), 3);
        assert_eq!(split_records(multi_cell, 1024), 3);

        // Quoted cells with escaped quotes, a quoted delimiter, an empty line
        // and a trailing CRLF.
        let quoted = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\r\n";

        for capacity in [1024usize, 32usize, 4, 3, 2, 1] {
            assert_eq!(count_records(quoted, capacity), 5, "capacity={}", capacity);
            assert_eq!(split_records(quoted, capacity), 5, "capacity={}", capacity);
        }

        // Tab-separated data, read with the default delimiter: each line is
        // then a single cell, but the record count is unaffected.
        let tab_separated = "name\tsurname\tage\njohn\tlandy\t45\nlucy\trose\t67";
        assert_eq!(count_records(tab_separated, 1024), 3);
        assert_eq!(split_records(tab_separated, 1024), 3);
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        // The default configuration treats the first line as headers, so only
        // the three following rows are counted, including both `""` rows.
        let mut splitter = Splitter::from_reader(Cursor::new(data));
        assert_eq!(splitter.count_records()?, 3);

        Ok(())
    }
}