simd_csv/
splitter.rs

1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error;
6use crate::utils::{trim_bom, trim_trailing_crlf};
7
/// Builds a [`Splitter`] with given configuration.
///
/// Every setter takes and returns `&mut Self` so calls can be chained before
/// finishing with [`SplitterBuilder::from_reader`].
#[derive(Debug, Clone)]
pub struct SplitterBuilder {
    /// Byte separating cells within a record.
    delimiter: u8,
    /// Byte used to quote cells.
    quote: u8,
    /// Requested capacity for the buffered reader; `None` when the caller did
    /// not ask for a specific capacity.
    buffer_capacity: Option<usize>,
    /// Whether the first record must be understood as a header.
    has_headers: bool,
}
15
16impl Default for SplitterBuilder {
17    fn default() -> Self {
18        Self {
19            delimiter: b',',
20            quote: b'"',
21            buffer_capacity: None,
22            has_headers: true,
23        }
24    }
25}
26
27impl SplitterBuilder {
28    /// Create a new [`SplitterBuilder`] with default configuration.
29    pub fn new() -> Self {
30        Self::default()
31    }
32
33    /// Create a new [`SplitterBuilder`] with provided `capacity`.
34    pub fn with_capacity(capacity: usize) -> Self {
35        let mut splitter = Self::default();
36        splitter.buffer_capacity(capacity);
37        splitter
38    }
39
40    /// Set the delimiter to be used by the created [`Splitter`].
41    ///
42    /// This delimiter must be a single byte.
43    ///
44    /// Will default to a comma.
45    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
46        self.delimiter = delimiter;
47        self
48    }
49
50    /// Set the quote char to be used by the created [`Splitter`].
51    ///
52    /// This char must be a single byte.
53    ///
54    /// Will default to a double quote.
55    pub fn quote(&mut self, quote: u8) -> &mut Self {
56        self.quote = quote;
57        self
58    }
59
60    /// Indicate whether first record must be understood as a header.
61    ///
62    /// Will default to `true`.
63    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
64        self.has_headers = yes;
65        self
66    }
67
68    /// Set the capacity of the created [`Splitter`]'s buffered reader.
69    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
70        self.buffer_capacity = Some(capacity);
71        self
72    }
73
74    /// Create a new [`Splitter`] using the provided reader implementing
75    /// [`std::io::Read`].
76    pub fn from_reader<R: Read>(&self, reader: R) -> Splitter<R> {
77        Splitter {
78            buffer: ScratchBuffer::with_optional_capacity(self.buffer_capacity, reader),
79            inner: CoreReader::new(self.delimiter, self.quote),
80            headers: Vec::new(),
81            has_read: false,
82            has_headers: self.has_headers,
83            must_reemit_headers: !self.has_headers,
84        }
85    }
86}
87
88/// An already configured CSV record splitter.
89///
90/// # Configuration
91///
92/// To configure a [`Splitter`], if you need a custom delimiter for instance of if
93/// you want to tweak the size of the inner buffer. Check out the
94/// [`SplitterBuilder`].
95#[derive(Debug)]
96pub struct Splitter<R> {
97    buffer: ScratchBuffer<R>,
98    inner: CoreReader,
99    headers: Vec<u8>,
100    has_read: bool,
101    has_headers: bool,
102    must_reemit_headers: bool,
103}
104
105impl<R: Read> Splitter<R> {
106    /// Create a new splitter with default configuration using the provided
107    /// reader implementing [`std::io::Read`].
108    pub fn from_reader(reader: R) -> Self {
109        SplitterBuilder::new().from_reader(reader)
110    }
111
112    /// Returns whether this reader has been configured to interpret the first
113    /// record as a header.
114    pub fn has_headers(&self) -> bool {
115        self.has_headers
116    }
117
118    /// Attempt to return a reference to this splitter's first record.
119    pub fn byte_headers(&mut self) -> error::Result<&[u8]> {
120        self.on_first_read()?;
121
122        Ok(&self.headers)
123    }
124
125    #[inline(always)]
126    fn on_first_read(&mut self) -> error::Result<()> {
127        if self.has_read {
128            return Ok(());
129        }
130
131        let input = self.buffer.fill_buf()?;
132        let bom_len = trim_bom(input);
133        self.buffer.consume(bom_len);
134
135        if let Some(record) = self.split_record_impl()? {
136            self.headers = record.to_vec();
137        } else {
138            self.must_reemit_headers = false;
139        }
140
141        self.has_read = true;
142
143        Ok(())
144    }
145
146    /// Consume the reader completely to count the number of records as fast as
147    /// possible.
148    pub fn count_records(&mut self) -> error::Result<u64> {
149        use ReadResult::*;
150
151        self.on_first_read()?;
152        self.buffer.reset();
153
154        let mut count: u64 = 0;
155
156        if self.must_reemit_headers {
157            count += 1;
158            self.must_reemit_headers = false;
159        }
160
161        loop {
162            let input = self.buffer.fill_buf()?;
163
164            let (result, pos) = self.inner.split_record(input);
165
166            self.buffer.consume(pos);
167
168            match result {
169                End => break,
170                InputEmpty | Cr | Lf => continue,
171                Record => {
172                    count += 1;
173                }
174            };
175        }
176
177        Ok(count)
178    }
179
180    fn split_record_impl(&mut self) -> error::Result<Option<&[u8]>> {
181        use ReadResult::*;
182
183        self.buffer.reset();
184
185        loop {
186            let input = self.buffer.fill_buf()?;
187
188            let (result, pos) = self.inner.split_record(input);
189
190            match result {
191                End => {
192                    self.buffer.consume(pos);
193                    return Ok(None);
194                }
195                Cr | Lf => {
196                    self.buffer.consume(pos);
197                }
198                InputEmpty => {
199                    self.buffer.save();
200                }
201                Record => {
202                    return Ok(Some(trim_trailing_crlf(self.buffer.flush(pos))));
203                }
204            };
205        }
206    }
207
208    /// Attempt to split the next CSV record and return an optional reference to
209    /// its byte slice.
210    ///
211    /// Returns `Ok(None)` when the reader is fully consumed.
212    pub fn split_record(&mut self) -> error::Result<Option<&[u8]>> {
213        self.on_first_read()?;
214
215        if self.must_reemit_headers {
216            self.must_reemit_headers = false;
217            return Ok(Some(&self.headers));
218        }
219
220        self.split_record_impl()
221    }
222
223    /// Attempt to split the next CSV record and return an optional byte offset
224    /// as well as a reference to its byte slice.
225    ///
226    /// Returns `Ok(None)` when the reader is fully consumed.
227    pub fn split_record_with_position(&mut self) -> error::Result<Option<(u64, &[u8])>> {
228        self.on_first_read()?;
229
230        let pos = self.position();
231
232        if self.must_reemit_headers {
233            self.must_reemit_headers = false;
234            return Ok(Some((pos, &self.headers)));
235        }
236
237        match self.split_record_impl() {
238            Ok(Some(record)) => Ok(Some((pos, record))),
239            Ok(None) => Ok(None),
240            Err(err) => Err(err),
241        }
242    }
243
244    /// Unwrap into an optional first record (only when the reader was
245    /// configured not to interpret the first record as a header, and when the
246    /// first record was pre-buffered but not yet reemitted), and the underlying
247    /// [`BufReader`].
248    pub fn into_bufreader(self) -> (Option<Vec<u8>>, BufReader<R>) {
249        (
250            self.must_reemit_headers.then_some(self.headers),
251            self.buffer.into_bufreader(),
252        )
253    }
254
255    /// Returns the current byte offset of the reader in the wrapped stream.
256    #[inline(always)]
257    pub fn position(&self) -> u64 {
258        if self.must_reemit_headers {
259            0
260        } else {
261            self.buffer.position()
262        }
263    }
264}
265
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Builds a headerless splitter over `data` with the given buffer
    /// `capacity` and cell `delimiter`.
    fn splitter_for(data: &str, capacity: usize, delimiter: u8) -> Splitter<Cursor<&str>> {
        let mut builder = SplitterBuilder::with_capacity(capacity);
        builder.has_headers(false).delimiter(delimiter);
        builder.from_reader(Cursor::new(data))
    }

    /// Counts records through `Splitter::count_records` using `delimiter`.
    fn count_records_with(data: &str, capacity: usize, delimiter: u8) -> u64 {
        splitter_for(data, capacity, delimiter)
            .count_records()
            .unwrap()
    }

    /// Counts records by repeatedly calling `Splitter::split_record` using
    /// `delimiter`.
    fn split_records_with(data: &str, capacity: usize, delimiter: u8) -> u64 {
        let mut splitter = splitter_for(data, capacity, delimiter);
        let mut count: u64 = 0;

        while splitter.split_record().unwrap().is_some() {
            count += 1;
        }

        count
    }

    /// Comma-delimited shorthand for [`count_records_with`].
    fn count_records(data: &str, capacity: usize) -> u64 {
        count_records_with(data, capacity, b',')
    }

    /// Comma-delimited shorthand for [`split_records_with`].
    fn split_records(data: &str, capacity: usize) -> u64 {
        split_records_with(data, capacity, b',')
    }

    #[test]
    fn test_count() {
        // Empty
        assert_eq!(count_records("", 1024), 0);

        // Single cells with various empty lines
        let tests = [
            "name\njohn\nlucy",
            "name\njohn\nlucy\n",
            "name\n\njohn\r\nlucy\n",
            "name\n\njohn\r\nlucy\n\n",
            "name\n\n\njohn\r\n\r\nlucy\n\n\n",
            "\nname\njohn\nlucy",
            "\n\nname\njohn\nlucy",
            "\r\n\r\nname\njohn\nlucy",
            "name\njohn\nlucy\r\n",
            "name\njohn\nlucy\r\n\r\n",
        ];

        // Tiny capacities force records to straddle buffer refills.
        for capacity in [32usize, 4, 3, 2, 1] {
            for test in &tests {
                assert_eq!(
                    count_records(test, capacity),
                    3,
                    "capacity={} string={:?}",
                    capacity,
                    test
                );
            }
        }

        // Multiple cells
        let data = "name,surname,age\njohn,landy,45\nlucy,rose,67";
        assert_eq!(count_records(data, 1024), 3);
        assert_eq!(split_records(data, 1024), 3);

        // Quoting
        for capacity in [1024usize, 32usize, 4, 3, 2, 1] {
            let data = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\r\n";

            assert_eq!(count_records(data, capacity), 5, "capacity={}", capacity);
            assert_eq!(split_records(data, capacity), 5, "capacity={}", capacity);
        }

        // Different separator: the splitter must actually be configured with
        // the tab delimiter for this case to exercise it.
        let data = "name\tsurname\tage\njohn\tlandy\t45\nlucy\trose\t67";
        assert_eq!(count_records_with(data, 1024, b'\t'), 3);
        assert_eq!(split_records_with(data, 1024, b'\t'), 3);
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        // Counting (default configuration: the first record is a header and
        // is therefore not part of the count)
        let mut reader = Splitter::from_reader(Cursor::new(data));

        assert_eq!(reader.count_records()?, 3);

        Ok(())
    }
}
353}