simd_csv/
zero_copy_reader.rs

1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ZeroCopyByteRecord};
7use crate::splitter::SplitterBuilder;
8use crate::utils::trim_bom;
9
/// Builds a [`ZeroCopyReader`] with given configuration.
///
/// Start from [`ZeroCopyReaderBuilder::new`] (or [`Default`]), adjust the
/// configuration with the setter methods, then call
/// [`ZeroCopyReaderBuilder::from_reader`].
#[derive(Clone, Debug)]
pub struct ZeroCopyReaderBuilder {
    /// Byte separating fields (defaults to `b','`).
    delimiter: u8,
    /// Byte used to quote fields (defaults to `b'"'`).
    quote: u8,
    /// Capacity handed to the reader's internal buffer (defaults to 8192).
    buffer_capacity: usize,
    /// Whether records may have a different number of fields than the first
    /// one (defaults to `false`).
    flexible: bool,
    /// Whether the first record is a header row (defaults to `true`).
    has_headers: bool,
}
19
20impl Default for ZeroCopyReaderBuilder {
21    fn default() -> Self {
22        Self {
23            delimiter: b',',
24            quote: b'"',
25            buffer_capacity: 8192,
26            flexible: false,
27            has_headers: true,
28        }
29    }
30}
31
32impl ZeroCopyReaderBuilder {
33    /// Create a new [`ZeroCopyReaderBuilder`] with default configuration.
34    pub fn new() -> Self {
35        Self::default()
36    }
37
38    /// Create a new [`ZeroCopyReaderBuilder`] with provided `capacity`.
39    pub fn with_capacity(capacity: usize) -> Self {
40        let mut reader = Self::default();
41        reader.buffer_capacity(capacity);
42        reader
43    }
44
45    /// Set the delimiter to be used by the created [`ZeroCopyReader`].
46    ///
47    /// This delimiter must be a single byte.
48    ///
49    /// Will default to a comma.
50    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
51        self.delimiter = delimiter;
52        self
53    }
54
55    /// Set the quote char to be used by the created [`ZeroCopyReader`].
56    ///
57    /// This char must be a single byte.
58    ///
59    /// Will default to a double quote.
60    pub fn quote(&mut self, quote: u8) -> &mut Self {
61        self.quote = quote;
62        self
63    }
64
65    /// Set the capacity of the created [`ZeroCopyReader`]'s buffered reader.
66    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
67        self.buffer_capacity = capacity;
68        self
69    }
70
71    /// Indicate whether the created [`ZeroCopyReader`] should be "flexible",
72    /// i.e. whether it should allow reading records having different number of
73    /// fields than the first one.
74    ///
75    /// Will default to `false`.
76    pub fn flexible(&mut self, yes: bool) -> &mut Self {
77        self.flexible = yes;
78        self
79    }
80
81    /// Indicate whether first record must be understood as a header.
82    ///
83    /// Will default to `true`.
84    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
85        self.has_headers = yes;
86        self
87    }
88
89    /// Create a matching [`SplitterBuilder`] from this builder.
90    pub fn to_splitter_builder(&self) -> SplitterBuilder {
91        let mut splitter_builder = SplitterBuilder::new();
92
93        splitter_builder
94            .buffer_capacity(self.buffer_capacity)
95            .has_headers(self.has_headers)
96            .quote(self.quote)
97            .delimiter(self.delimiter);
98
99        splitter_builder
100    }
101
102    /// Create a new [`ZeroCopyReader`] using the provided reader implementing
103    /// [`std::io::Read`].
104    pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
105        ZeroCopyReader {
106            buffer: ScratchBuffer::with_capacity(self.buffer_capacity, reader),
107            inner: CoreReader::new(self.delimiter, self.quote),
108            byte_headers: ByteRecord::new(),
109            raw_headers: (Vec::new(), Vec::new()),
110            seps: Vec::new(),
111            flexible: self.flexible,
112            has_read: false,
113            must_reemit_headers: !self.has_headers,
114            has_headers: self.has_headers,
115            index: 0,
116        }
117    }
118}
119
120/// An already configured zero-copy CSV reader.
121///
122/// # Configuration
123///
124/// To configure a [`ZeroCopyReader`], if you need a custom delimiter for
125/// instance of if you want to tweak the size of the inner buffer. Check out the
126/// [`ZeroCopyReaderBuilder`].
127pub struct ZeroCopyReader<R> {
128    buffer: ScratchBuffer<R>,
129    inner: CoreReader,
130    byte_headers: ByteRecord,
131    raw_headers: (Vec<usize>, Vec<u8>),
132    seps: Vec<usize>,
133    flexible: bool,
134    has_read: bool,
135    must_reemit_headers: bool,
136    has_headers: bool,
137    index: u64,
138}
139
140impl<R: Read> ZeroCopyReader<R> {
141    pub fn from_reader(reader: R) -> Self {
142        ZeroCopyReaderBuilder::new().from_reader(reader)
143    }
144
145    #[inline]
146    fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
147        if self.flexible {
148            return Ok(());
149        }
150
151        let headers_len = self.raw_headers.0.len() + 1;
152
153        if self.has_read && written != headers_len {
154            return Err(Error::new(ErrorKind::UnequalLengths {
155                expected_len: headers_len,
156                len: written,
157                pos: Some((byte, self.index)),
158            }));
159        }
160
161        Ok(())
162    }
163
164    #[inline]
165    fn on_first_read(&mut self) -> error::Result<()> {
166        if self.has_read {
167            return Ok(());
168        }
169
170        // Trimming BOM
171        let input = self.buffer.fill_buf()?;
172        let bom_len = trim_bom(input);
173        self.buffer.consume(bom_len);
174
175        // Reading headers
176        let mut headers_seps = Vec::new();
177        let mut headers_slice = Vec::new();
178        let mut byte_headers = ByteRecord::new();
179
180        if let Some(headers) = self.read_byte_record_impl()? {
181            (headers_seps, headers_slice) = headers.to_parts();
182            byte_headers = headers.to_byte_record();
183        } else {
184            self.must_reemit_headers = false;
185        }
186
187        self.raw_headers = (headers_seps, headers_slice);
188        self.byte_headers = byte_headers;
189
190        self.has_read = true;
191
192        Ok(())
193    }
194
195    /// Attempt to return a reference to this reader's first record.
196    #[inline]
197    pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
198        self.on_first_read()?;
199
200        Ok(&self.byte_headers)
201    }
202
203    /// Returns whether this reader has been configured to interpret the first
204    /// record as a header.
205    #[inline]
206    pub fn has_headers(&self) -> bool {
207        self.has_headers
208    }
209
210    fn read_byte_record_impl(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
211        use ReadResult::*;
212
213        self.buffer.reset();
214        self.seps.clear();
215
216        let byte = self.position();
217
218        loop {
219            let seps_offset = self.buffer.saved().len();
220            let input = self.buffer.fill_buf()?;
221
222            let (result, pos) =
223                self.inner
224                    .split_record_and_find_separators(input, seps_offset, &mut self.seps);
225
226            match result {
227                End => {
228                    self.buffer.consume(pos);
229                    return Ok(None);
230                }
231                Cr | Lf => {
232                    self.buffer.consume(pos);
233                }
234                InputEmpty => {
235                    self.buffer.save();
236                }
237                Record => {
238                    self.index += 1;
239                    self.check_field_count(byte, self.seps.len() + 1)?;
240
241                    let record = ZeroCopyByteRecord::new(
242                        self.buffer.flush(pos),
243                        &self.seps,
244                        self.inner.quote,
245                    );
246
247                    return Ok(Some(record));
248                }
249            };
250        }
251    }
252
253    #[inline(always)]
254    pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
255        self.on_first_read()?;
256
257        if self.must_reemit_headers {
258            self.must_reemit_headers = false;
259            return Ok(Some(ZeroCopyByteRecord::new(
260                &self.raw_headers.1,
261                &self.raw_headers.0,
262                self.inner.quote,
263            )));
264        }
265
266        self.read_byte_record_impl()
267    }
268
269    /// Unwrap into an optional first record (only when the reader was
270    /// configured not to interpret the first record as a header, and when the
271    /// first record was pre-buffered but not yet reemitted), and the underlying
272    /// [`BufReader`].
273    pub fn into_bufreader(self) -> (Option<ByteRecord>, BufReader<R>) {
274        (
275            self.must_reemit_headers.then_some(self.byte_headers),
276            self.buffer.into_bufreader(),
277        )
278    }
279
280    /// Returns the current byte offset of the reader in the wrapped stream.
281    #[inline(always)]
282    pub fn position(&self) -> u64 {
283        if self.must_reemit_headers {
284            0
285        } else {
286            self.buffer.position()
287        }
288    }
289}
290
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    impl<R: Read> ZeroCopyReader<R> {
        /// Build a default reader that yields its first record as data.
        fn from_reader_no_headers(reader: R) -> Self {
            let mut builder = ZeroCopyReaderBuilder::new();
            builder.has_headers(false);
            builder.from_reader(reader)
        }
    }

    /// Turn rows of string cells into rows of owned byte cells.
    fn owned_rows(rows: Vec<Vec<&str>>) -> Vec<Vec<Vec<u8>>> {
        rows.into_iter()
            .map(|row| {
                row.into_iter()
                    .map(|cell| cell.as_bytes().to_vec())
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    /// Read the next record and copy it into an owned [`ByteRecord`].
    fn next_owned<R: Read>(reader: &mut ZeroCopyReader<R>) -> error::Result<Option<ByteRecord>> {
        Ok(reader
            .read_byte_record()?
            .map(|record| record.to_byte_record()))
    }

    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let mut builder = ZeroCopyReaderBuilder::with_capacity(32);
        builder.has_headers(false);
        let mut reader = builder.from_reader(Cursor::new(csv));

        // Zero-copy records expose raw cells: quotes and escapes are kept.
        let expected = owned_rows(vec![
            vec!["name", "surname", "age"],
            vec![
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ],
            vec!["lucy", "rose", "\"67\""],
            vec!["jermaine", "jackson", "\"89\""],
            vec!["karine", "loucan", "\"52\""],
            vec!["rose", "\"glib\"", "12"],
            vec!["\"guillaume\"", "\"plique\"", "\"42\""],
        ]);

        let mut records = Vec::new();
        while let Some(record) = reader.read_byte_record()? {
            let cells = record.iter().map(|cell| cell.to_vec()).collect::<Vec<_>>();
            records.push(cells);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));

        // A quoted empty cell is a genuine single-cell record.
        let expected = owned_rows(vec![
            vec!["name"],
            vec!["\"\""],
            vec!["lucy"],
            vec!["\"\""],
        ]);

        let mut records = Vec::new();
        while let Some(record) = reader.read_byte_record()? {
            records.push(vec![record.as_slice().to_vec()]);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // With headers, `byte_headers` may be queried before the first read...
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(next_owned(&mut reader)?.unwrap(), brec!["john", "dandy"]);

        // ...or after it.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(next_owned(&mut reader)?.unwrap(), brec!["john", "dandy"]);
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Without headers, the first record is both reported by
        // `byte_headers` and yielded as data, whatever the call order.
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(next_owned(&mut reader)?.unwrap(), brec!["name", "surname"]);

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(next_owned(&mut reader)?.unwrap(), brec!["name", "surname"]);
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Empty input gives empty headers and no records, in both modes.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(next_owned(&mut reader)?.is_none());

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(next_owned(&mut reader)?.is_none());

        Ok(())
    }
}