// simd_csv/zero_copy_reader.rs

1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ZeroCopyByteRecord};
7use crate::splitter::SplitterBuilder;
8use crate::utils::trim_bom;
9
/// Builds a [`ZeroCopyReader`] with given configuration.
#[derive(Clone, Debug)]
pub struct ZeroCopyReaderBuilder {
    /// Byte separating fields within a record.
    delimiter: u8,
    /// Byte used to quote fields.
    quote: u8,
    /// Capacity handed to the reader's internal scratch buffer.
    buffer_capacity: usize,
    /// Whether records may have a different number of fields than the first one.
    flexible: bool,
    /// Whether the first record is a header row rather than data.
    has_headers: bool,
}
19
20impl Default for ZeroCopyReaderBuilder {
21    fn default() -> Self {
22        Self {
23            delimiter: b',',
24            quote: b'"',
25            buffer_capacity: 8192,
26            flexible: false,
27            has_headers: true,
28        }
29    }
30}
31
32impl ZeroCopyReaderBuilder {
33    pub fn new() -> Self {
34        Self::default()
35    }
36
37    pub fn with_capacity(capacity: usize) -> Self {
38        let mut reader = Self::default();
39        reader.buffer_capacity(capacity);
40        reader
41    }
42
43    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
44        self.delimiter = delimiter;
45        self
46    }
47
48    pub fn quote(&mut self, quote: u8) -> &mut Self {
49        self.quote = quote;
50        self
51    }
52
53    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
54        self.buffer_capacity = capacity;
55        self
56    }
57
58    pub fn flexible(&mut self, yes: bool) -> &mut Self {
59        self.flexible = yes;
60        self
61    }
62
63    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
64        self.has_headers = yes;
65        self
66    }
67
68    pub fn to_splitter_builder(&self) -> SplitterBuilder {
69        let mut splitter_builder = SplitterBuilder::new();
70
71        splitter_builder
72            .buffer_capacity(self.buffer_capacity)
73            .has_headers(self.has_headers)
74            .quote(self.quote)
75            .delimiter(self.delimiter);
76
77        splitter_builder
78    }
79
80    pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
81        ZeroCopyReader {
82            buffer: ScratchBuffer::with_capacity(self.buffer_capacity, reader),
83            inner: CoreReader::new(self.delimiter, self.quote),
84            byte_headers: ByteRecord::new(),
85            raw_headers: (Vec::new(), Vec::new()),
86            seps: Vec::new(),
87            flexible: self.flexible,
88            has_read: false,
89            must_reemit_headers: !self.has_headers,
90            has_headers: self.has_headers,
91            index: 0,
92        }
93    }
94}
95
96/// An already configured zero-copy CSV reader.
97///
98/// # Configuration
99///
100/// To configure a [`ZeroCopyReader`], if you need a custom delimiter for
101/// instance of if you want to tweak the size of the inner buffer. Check out the
102/// [`ZeroCopyReaderBuilder`].
103pub struct ZeroCopyReader<R> {
104    buffer: ScratchBuffer<R>,
105    inner: CoreReader,
106    byte_headers: ByteRecord,
107    raw_headers: (Vec<usize>, Vec<u8>),
108    seps: Vec<usize>,
109    flexible: bool,
110    has_read: bool,
111    must_reemit_headers: bool,
112    has_headers: bool,
113    index: u64,
114}
115
116impl<R: Read> ZeroCopyReader<R> {
117    pub fn from_reader(reader: R) -> Self {
118        ZeroCopyReaderBuilder::new().from_reader(reader)
119    }
120
121    #[inline]
122    fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
123        if self.flexible {
124            return Ok(());
125        }
126
127        let headers_len = self.raw_headers.0.len() + 1;
128
129        if self.has_read && written != headers_len {
130            return Err(Error::new(ErrorKind::UnequalLengths {
131                expected_len: headers_len,
132                len: written,
133                pos: Some((byte, self.index)),
134            }));
135        }
136
137        Ok(())
138    }
139
140    #[inline]
141    fn on_first_read(&mut self) -> error::Result<()> {
142        if self.has_read {
143            return Ok(());
144        }
145
146        // Trimming BOM
147        let input = self.buffer.fill_buf()?;
148        let bom_len = trim_bom(input);
149        self.buffer.consume(bom_len);
150
151        // Reading headers
152        let mut headers_seps = Vec::new();
153        let mut headers_slice = Vec::new();
154        let mut byte_headers = ByteRecord::new();
155
156        if let Some(headers) = self.read_byte_record_impl()? {
157            (headers_seps, headers_slice) = headers.to_parts();
158            byte_headers = headers.to_byte_record();
159        } else {
160            self.must_reemit_headers = false;
161        }
162
163        self.raw_headers = (headers_seps, headers_slice);
164        self.byte_headers = byte_headers;
165
166        self.has_read = true;
167
168        Ok(())
169    }
170
171    #[inline]
172    pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
173        self.on_first_read()?;
174
175        Ok(&self.byte_headers)
176    }
177
178    /// Returns whether this reader has been configured to interpret the first
179    /// record as a header.
180    #[inline]
181    pub fn has_headers(&self) -> bool {
182        self.has_headers
183    }
184
185    fn read_byte_record_impl(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
186        use ReadResult::*;
187
188        self.buffer.reset();
189        self.seps.clear();
190
191        let byte = self.position();
192
193        loop {
194            let seps_offset = self.buffer.saved().len();
195            let input = self.buffer.fill_buf()?;
196
197            let (result, pos) =
198                self.inner
199                    .split_record_and_find_separators(input, seps_offset, &mut self.seps);
200
201            match result {
202                End => {
203                    self.buffer.consume(pos);
204                    return Ok(None);
205                }
206                Cr | Lf => {
207                    self.buffer.consume(pos);
208                }
209                InputEmpty => {
210                    self.buffer.save();
211                }
212                Record => {
213                    self.index += 1;
214                    self.check_field_count(byte, self.seps.len() + 1)?;
215
216                    let record = ZeroCopyByteRecord::new(
217                        self.buffer.flush(pos),
218                        &self.seps,
219                        self.inner.quote,
220                    );
221
222                    return Ok(Some(record));
223                }
224            };
225        }
226    }
227
228    #[inline(always)]
229    pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
230        self.on_first_read()?;
231
232        if self.must_reemit_headers {
233            self.must_reemit_headers = false;
234            return Ok(Some(ZeroCopyByteRecord::new(
235                &self.raw_headers.1,
236                &self.raw_headers.0,
237                self.inner.quote,
238            )));
239        }
240
241        self.read_byte_record_impl()
242    }
243
244    pub fn into_bufreader(self) -> (Option<ByteRecord>, BufReader<R>) {
245        (
246            self.must_reemit_headers.then_some(self.byte_headers),
247            self.buffer.into_bufreader(),
248        )
249    }
250
251    #[inline(always)]
252    pub fn position(&self) -> u64 {
253        if self.must_reemit_headers {
254            0
255        } else {
256            self.buffer.position()
257        }
258    }
259}
260
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    impl<R: Read> ZeroCopyReader<R> {
        /// Test-only shortcut for a default reader built with `has_headers(false)`.
        fn from_reader_no_headers(reader: R) -> Self {
            let mut builder = ZeroCopyReaderBuilder::new();
            builder.has_headers(false);
            builder.from_reader(reader)
        }
    }

    /// Reads the next record as an owned [`ByteRecord`], panicking at end of input.
    fn next_byte_record<R: Read>(reader: &mut ZeroCopyReader<R>) -> error::Result<ByteRecord> {
        Ok(reader.read_byte_record()?.unwrap().to_byte_record())
    }

    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        // Small capacity, presumably so that some records span several buffer fills.
        let mut builder = ZeroCopyReaderBuilder::with_capacity(32);
        builder.has_headers(false);
        let mut reader = builder.from_reader(Cursor::new(csv));

        // Fields are yielded raw: surrounding and doubled quotes are kept.
        let expected = [
            ["name", "surname", "age"],
            [
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ],
            ["lucy", "rose", "\"67\""],
            ["jermaine", "jackson", "\"89\""],
            ["karine", "loucan", "\"52\""],
            ["rose", "\"glib\"", "12"],
            ["\"guillaume\"", "\"plique\"", "\"42\""],
        ]
        .iter()
        .map(|row| {
            row.iter()
                .map(|cell| cell.as_bytes().to_vec())
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();

        let mut records = Vec::new();
        while let Some(record) = reader.read_byte_record()? {
            records.push(record.iter().map(|cell| cell.to_vec()).collect::<Vec<_>>());
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        // Zero-copy
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));

        // A quoted empty field forms a record of its own.
        let expected = ["name", "\"\"", "lucy", "\"\""]
            .iter()
            .map(|line| vec![line.as_bytes().to_vec()])
            .collect::<Vec<_>>();

        let mut records = Vec::new();
        while let Some(record) = reader.read_byte_record()? {
            records.push(vec![record.as_slice().to_vec()]);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // Headers, call before read
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(next_byte_record(&mut reader)?, brec!["john", "dandy"]);

        // Headers, call after read
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(next_byte_record(&mut reader)?, brec!["john", "dandy"]);
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // No headers, call before read
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(next_byte_record(&mut reader)?, brec!["name", "surname"]);

        // No headers, call after read
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(next_byte_record(&mut reader)?, brec!["name", "surname"]);
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Headers, empty
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.read_byte_record()?.is_none());

        // No headers, empty
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.read_byte_record()?.is_none());

        Ok(())
    }
}