Skip to main content

simd_csv/
zero_copy_reader.rs

1use std::io::Read;
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::reader::ReaderBuilder;
7use crate::records::{ByteRecord, ZeroCopyByteRecord};
8use crate::splitter::SplitterBuilder;
9use crate::utils::trim_bom;
10
/// Builds a [`ZeroCopyReader`] with given configuration.
///
/// Every setting is a plain value, so the builder is cheap to clone and can
/// be reused to create several readers.
#[derive(Clone, Debug)]
pub struct ZeroCopyReaderBuilder {
    /// Single byte separating the fields of a record.
    delimiter: u8,
    /// Single byte used to quote fields.
    quote: u8,
    /// Capacity handed to the buffered reader wrapping the input.
    buffer_capacity: usize,
    /// Whether records may have a different number of fields than the first
    /// one.
    flexible: bool,
    /// Whether the first record must be understood as a header.
    has_headers: bool,
}
20
21impl Default for ZeroCopyReaderBuilder {
22    fn default() -> Self {
23        Self {
24            delimiter: b',',
25            quote: b'"',
26            buffer_capacity: 8192,
27            flexible: false,
28            has_headers: true,
29        }
30    }
31}
32
33impl ZeroCopyReaderBuilder {
34    /// Create a new [`ZeroCopyReaderBuilder`] with default configuration.
35    pub fn new() -> Self {
36        Self::default()
37    }
38
39    /// Create a new [`ZeroCopyReaderBuilder`] with provided `capacity`.
40    pub fn with_capacity(capacity: usize) -> Self {
41        let mut reader = Self::default();
42        reader.buffer_capacity(capacity);
43        reader
44    }
45
46    /// Set the delimiter to be used by the created [`ZeroCopyReader`].
47    ///
48    /// This delimiter must be a single byte.
49    ///
50    /// Will default to a comma.
51    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
52        self.delimiter = delimiter;
53        self
54    }
55
56    /// Set the quote char to be used by the created [`ZeroCopyReader`].
57    ///
58    /// This char must be a single byte.
59    ///
60    /// Will default to a double quote.
61    pub fn quote(&mut self, quote: u8) -> &mut Self {
62        self.quote = quote;
63        self
64    }
65
66    /// Set the capacity of the created [`ZeroCopyReader`]'s buffered reader.
67    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
68        self.buffer_capacity = capacity;
69        self
70    }
71
72    /// Indicate whether the created [`ZeroCopyReader`] should be "flexible",
73    /// i.e. whether it should allow reading records having different number of
74    /// fields than the first one.
75    ///
76    /// Will default to `false`.
77    pub fn flexible(&mut self, yes: bool) -> &mut Self {
78        self.flexible = yes;
79        self
80    }
81
82    /// Indicate whether first record must be understood as a header.
83    ///
84    /// Will default to `true`.
85    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
86        self.has_headers = yes;
87        self
88    }
89
90    /// Create a matching [`SplitterBuilder`] from this builder.
91    pub fn to_splitter_builder(&self) -> SplitterBuilder {
92        let mut splitter_builder = SplitterBuilder::new();
93
94        splitter_builder
95            .buffer_capacity(self.buffer_capacity)
96            .has_headers(self.has_headers)
97            .quote(self.quote)
98            .delimiter(self.delimiter);
99
100        splitter_builder
101    }
102
103    /// Create a matching [`ReaderBuilder`] from this builder.
104    pub fn to_reader_builder(&self) -> ReaderBuilder {
105        let mut reader_builder = ReaderBuilder::new();
106
107        reader_builder
108            .buffer_capacity(self.buffer_capacity)
109            .has_headers(self.has_headers)
110            .quote(self.quote)
111            .delimiter(self.delimiter);
112
113        reader_builder
114    }
115
116    /// Create a new [`ZeroCopyReader`] using the provided reader implementing
117    /// [`std::io::Read`].
118    pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
119        ZeroCopyReader {
120            buffer: ScratchBuffer::with_capacity(self.buffer_capacity, reader),
121            inner: CoreReader::new(self.delimiter, self.quote),
122            byte_headers: ByteRecord::new(),
123            raw_headers: (Vec::new(), Vec::new()),
124            seps: Vec::new(),
125            flexible: self.flexible,
126            has_read: false,
127            must_reemit_headers: !self.has_headers,
128            has_headers: self.has_headers,
129            index: 0,
130        }
131    }
132}
133
134/// An already configured zero-copy CSV reader.
135///
136/// # Configuration
137///
138/// To configure a [`ZeroCopyReader`], if you need a custom delimiter for
139/// instance of if you want to tweak the size of the inner buffer. Check out the
140/// [`ZeroCopyReaderBuilder`].
141pub struct ZeroCopyReader<R> {
142    buffer: ScratchBuffer<R>,
143    inner: CoreReader,
144    byte_headers: ByteRecord,
145    raw_headers: (Vec<usize>, Vec<u8>),
146    seps: Vec<usize>,
147    flexible: bool,
148    has_read: bool,
149    must_reemit_headers: bool,
150    has_headers: bool,
151    index: u64,
152}
153
154impl<R: Read> ZeroCopyReader<R> {
155    pub fn from_reader(reader: R) -> Self {
156        ZeroCopyReaderBuilder::new().from_reader(reader)
157    }
158
159    #[inline]
160    fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
161        if self.flexible {
162            return Ok(());
163        }
164
165        let headers_len = self.raw_headers.0.len() + 1;
166
167        if self.has_read && written != headers_len {
168            return Err(Error::new(ErrorKind::UnequalLengths {
169                expected_len: headers_len,
170                len: written,
171                pos: Some((byte, self.index)),
172            }));
173        }
174
175        Ok(())
176    }
177
178    #[inline]
179    fn on_first_read(&mut self) -> error::Result<()> {
180        if self.has_read {
181            return Ok(());
182        }
183
184        // Trimming BOM
185        let input = self.buffer.fill_buf()?;
186        let bom_len = trim_bom(input);
187        self.buffer.consume(bom_len);
188
189        // Reading headers
190        let mut headers_seps = Vec::new();
191        let mut headers_slice = Vec::new();
192        let mut byte_headers = ByteRecord::new();
193
194        if let Some(headers) = self.read_byte_record_impl()? {
195            (headers_seps, headers_slice) = headers.to_parts();
196            byte_headers = headers.to_byte_record();
197        } else {
198            self.must_reemit_headers = false;
199        }
200
201        self.raw_headers = (headers_seps, headers_slice);
202        self.byte_headers = byte_headers;
203
204        self.has_read = true;
205
206        Ok(())
207    }
208
209    /// Attempt to return a reference to this reader's first record.
210    #[inline]
211    pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
212        self.on_first_read()?;
213
214        Ok(&self.byte_headers)
215    }
216
217    /// Returns whether this reader has been configured to interpret the first
218    /// record as a header.
219    #[inline]
220    pub fn has_headers(&self) -> bool {
221        self.has_headers
222    }
223
224    fn read_byte_record_impl(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
225        use ReadResult::*;
226
227        self.buffer.reset();
228        self.seps.clear();
229
230        let byte = self.position();
231
232        loop {
233            let seps_offset = self.buffer.saved().len();
234            let input = self.buffer.fill_buf()?;
235
236            let (result, pos) =
237                self.inner
238                    .split_record_and_find_separators(input, seps_offset, &mut self.seps);
239
240            match result {
241                End => {
242                    self.buffer.consume(pos);
243                    return Ok(None);
244                }
245                Cr | Lf => {
246                    self.buffer.consume(pos);
247                }
248                InputEmpty => {
249                    self.buffer.save();
250                }
251                Record => {
252                    self.index += 1;
253                    self.check_field_count(byte, self.seps.len() + 1)?;
254
255                    let bytes = self.buffer.flush(pos);
256
257                    let record = ZeroCopyByteRecord::new(bytes, &self.seps, self.inner.quote);
258
259                    return Ok(Some(record));
260                }
261            };
262        }
263    }
264
265    #[inline(always)]
266    pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
267        self.on_first_read()?;
268
269        if self.must_reemit_headers {
270            self.must_reemit_headers = false;
271            return Ok(Some(ZeroCopyByteRecord::new(
272                &self.raw_headers.1,
273                &self.raw_headers.0,
274                self.inner.quote,
275            )));
276        }
277
278        self.read_byte_record_impl()
279    }
280
281    /// Returns the current byte offset of the reader in the wrapped stream.
282    #[inline(always)]
283    pub fn position(&self) -> u64 {
284        if self.must_reemit_headers {
285            0
286        } else {
287            self.buffer.position()
288        }
289    }
290}
291
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    impl<R: Read> ZeroCopyReader<R> {
        /// Test helper building a reader configured without headers.
        fn from_reader_no_headers(reader: R) -> Self {
            let mut builder = ZeroCopyReaderBuilder::new();
            builder.has_headers(false);

            builder.from_reader(reader)
        }
    }

    /// Converts textual cells into the owned byte cells compared by the tests.
    fn row(cells: &[&str]) -> Vec<Vec<u8>> {
        cells.iter().map(|cell| cell.as_bytes().to_vec()).collect()
    }

    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        // Cells are expected verbatim, quotes included; the blank line yields
        // no record.
        let expected = vec![
            row(&["name", "surname", "age"]),
            row(&[
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ]),
            row(&["lucy", "rose", "\"67\""]),
            row(&["jermaine", "jackson", "\"89\""]),
            row(&["karine", "loucan", "\"52\""]),
            row(&["rose", "\"glib\"", "12"]),
            row(&["\"guillaume\"", "\"plique\"", "\"42\""]),
        ];

        // A small buffer forces records to span several buffer refills.
        let mut builder = ZeroCopyReaderBuilder::with_capacity(32);
        builder.has_headers(false);
        let mut reader = builder.from_reader(Cursor::new(csv));

        let mut records = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            let cells: Vec<Vec<u8>> = record.iter().map(|cell| cell.to_vec()).collect();
            records.push(cells);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let expected: Vec<Vec<Vec<u8>>> = ["name", "\"\"", "lucy", "\"\""]
            .iter()
            .map(|&cell| row(&[cell]))
            .collect();

        // Zero-copy
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));

        // Read
        let mut records = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            records.push(vec![record.as_slice().to_vec()]);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data: &[u8] = b"name,surname\njohn,dandy";
        let empty: &[u8] = b"";

        let first_row = brec!["name", "surname"];
        let second_row = brec!["john", "dandy"];
        let no_row = brec![];

        // Headers, call before read
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &first_row);
        let record = reader.read_byte_record()?.unwrap().to_byte_record();
        assert_eq!(record, second_row);

        // Headers, call after read
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        let record = reader.read_byte_record()?.unwrap().to_byte_record();
        assert_eq!(record, second_row);
        assert_eq!(reader.byte_headers()?, &first_row);

        // No headers, call before read
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &first_row);
        let record = reader.read_byte_record()?.unwrap().to_byte_record();
        assert_eq!(record, first_row);

        // No headers, call after read
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        let record = reader.read_byte_record()?.unwrap().to_byte_record();
        assert_eq!(record, first_row);
        assert_eq!(reader.byte_headers()?, &first_row);

        // Headers, empty
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(empty));
        assert_eq!(reader.byte_headers()?, &no_row);
        assert!(reader.read_byte_record()?.is_none());

        // No headers, empty
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(empty));
        assert_eq!(reader.byte_headers()?, &no_row);
        assert!(reader.read_byte_record()?.is_none());

        Ok(())
    }
}
420}