Skip to main content

simd_csv/
zero_copy_reader.rs

1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::reader::{ReaderBuilder};
7use crate::records::{ByteRecord, ZeroCopyByteRecord};
8use crate::splitter::SplitterBuilder;
9use crate::utils::trim_bom;
10
/// Builds a [`ZeroCopyReader`] with given configuration.
///
/// Start from [`ZeroCopyReaderBuilder::new`] (equivalent to
/// [`Default::default`]), adjust the configuration through the setter
/// methods, then call [`ZeroCopyReaderBuilder::from_reader`].
#[derive(Clone, Debug)]
pub struct ZeroCopyReaderBuilder {
    /// Single byte separating fields.
    delimiter: u8,
    /// Single byte used to quote fields.
    quote: u8,
    /// Capacity of the created reader's inner buffer.
    buffer_capacity: usize,
    /// Whether records may have a different number of fields than the first
    /// one.
    flexible: bool,
    /// Whether the first record must be understood as a header.
    has_headers: bool,
}

impl Default for ZeroCopyReaderBuilder {
    /// Comma delimiter, double-quote quoting, an 8192-byte buffer, strict
    /// field counts, and the first record treated as a header.
    fn default() -> Self {
        Self {
            delimiter: b',',
            quote: b'"',
            buffer_capacity: 8192,
            flexible: false,
            has_headers: true,
        }
    }
}
32
33impl ZeroCopyReaderBuilder {
34    /// Create a new [`ZeroCopyReaderBuilder`] with default configuration.
35    pub fn new() -> Self {
36        Self::default()
37    }
38
39    /// Create a new [`ZeroCopyReaderBuilder`] with provided `capacity`.
40    pub fn with_capacity(capacity: usize) -> Self {
41        let mut reader = Self::default();
42        reader.buffer_capacity(capacity);
43        reader
44    }
45
46    /// Set the delimiter to be used by the created [`ZeroCopyReader`].
47    ///
48    /// This delimiter must be a single byte.
49    ///
50    /// Will default to a comma.
51    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
52        self.delimiter = delimiter;
53        self
54    }
55
56    /// Set the quote char to be used by the created [`ZeroCopyReader`].
57    ///
58    /// This char must be a single byte.
59    ///
60    /// Will default to a double quote.
61    pub fn quote(&mut self, quote: u8) -> &mut Self {
62        self.quote = quote;
63        self
64    }
65
66    /// Set the capacity of the created [`ZeroCopyReader`]'s buffered reader.
67    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
68        self.buffer_capacity = capacity;
69        self
70    }
71
72    /// Indicate whether the created [`ZeroCopyReader`] should be "flexible",
73    /// i.e. whether it should allow reading records having different number of
74    /// fields than the first one.
75    ///
76    /// Will default to `false`.
77    pub fn flexible(&mut self, yes: bool) -> &mut Self {
78        self.flexible = yes;
79        self
80    }
81
82    /// Indicate whether first record must be understood as a header.
83    ///
84    /// Will default to `true`.
85    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
86        self.has_headers = yes;
87        self
88    }
89
90    /// Create a matching [`SplitterBuilder`] from this builder.
91    pub fn to_splitter_builder(&self) -> SplitterBuilder {
92        let mut splitter_builder = SplitterBuilder::new();
93
94        splitter_builder
95            .buffer_capacity(self.buffer_capacity)
96            .has_headers(self.has_headers)
97            .quote(self.quote)
98            .delimiter(self.delimiter);
99
100        splitter_builder
101    }
102
103    /// Create a matching [`ReaderBuilder`] from this builder.
104    pub fn to_reader_builder(&self) -> ReaderBuilder {
105        let mut reader_builder = ReaderBuilder::new();
106
107        reader_builder
108            .buffer_capacity(self.buffer_capacity)
109            .has_headers(self.has_headers)
110            .quote(self.quote)
111            .delimiter(self.delimiter);
112
113        reader_builder
114    }
115
116    /// Create a new [`ZeroCopyReader`] using the provided reader implementing
117    /// [`std::io::Read`].
118    pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
119        ZeroCopyReader {
120            buffer: ScratchBuffer::with_capacity(self.buffer_capacity, reader),
121            inner: CoreReader::new(self.delimiter, self.quote),
122            byte_headers: ByteRecord::new(),
123            raw_headers: (Vec::new(), Vec::new()),
124            seps: Vec::new(),
125            flexible: self.flexible,
126            has_read: false,
127            must_reemit_headers: !self.has_headers,
128            has_headers: self.has_headers,
129            index: 0,
130        }
131    }
132}
133
134/// An already configured zero-copy CSV reader.
135///
136/// # Configuration
137///
138/// To configure a [`ZeroCopyReader`], if you need a custom delimiter for
139/// instance of if you want to tweak the size of the inner buffer. Check out the
140/// [`ZeroCopyReaderBuilder`].
141pub struct ZeroCopyReader<R> {
142    buffer: ScratchBuffer<R>,
143    inner: CoreReader,
144    byte_headers: ByteRecord,
145    raw_headers: (Vec<usize>, Vec<u8>),
146    seps: Vec<usize>,
147    flexible: bool,
148    has_read: bool,
149    must_reemit_headers: bool,
150    has_headers: bool,
151    index: u64,
152}
153
154impl<R: Read> ZeroCopyReader<R> {
155    pub fn from_reader(reader: R) -> Self {
156        ZeroCopyReaderBuilder::new().from_reader(reader)
157    }
158
159    #[inline]
160    fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
161        if self.flexible {
162            return Ok(());
163        }
164
165        let headers_len = self.raw_headers.0.len() + 1;
166
167        if self.has_read && written != headers_len {
168            return Err(Error::new(ErrorKind::UnequalLengths {
169                expected_len: headers_len,
170                len: written,
171                pos: Some((byte, self.index)),
172            }));
173        }
174
175        Ok(())
176    }
177
178    #[inline]
179    fn on_first_read(&mut self) -> error::Result<()> {
180        if self.has_read {
181            return Ok(());
182        }
183
184        // Trimming BOM
185        let input = self.buffer.fill_buf()?;
186        let bom_len = trim_bom(input);
187        self.buffer.consume(bom_len);
188
189        // Reading headers
190        let mut headers_seps = Vec::new();
191        let mut headers_slice = Vec::new();
192        let mut byte_headers = ByteRecord::new();
193
194        if let Some(headers) = self.read_byte_record_impl()? {
195            (headers_seps, headers_slice) = headers.to_parts();
196            byte_headers = headers.to_byte_record();
197        } else {
198            self.must_reemit_headers = false;
199        }
200
201        self.raw_headers = (headers_seps, headers_slice);
202        self.byte_headers = byte_headers;
203
204        self.has_read = true;
205
206        Ok(())
207    }
208
209    /// Attempt to return a reference to this reader's first record.
210    #[inline]
211    pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
212        self.on_first_read()?;
213
214        Ok(&self.byte_headers)
215    }
216
217    /// Returns whether this reader has been configured to interpret the first
218    /// record as a header.
219    #[inline]
220    pub fn has_headers(&self) -> bool {
221        self.has_headers
222    }
223
224    fn read_byte_record_impl(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
225        use ReadResult::*;
226
227        self.buffer.reset();
228        self.seps.clear();
229
230        let byte = self.position();
231
232        loop {
233            let seps_offset = self.buffer.saved().len();
234            let input = self.buffer.fill_buf()?;
235
236            let (result, pos) =
237                self.inner
238                    .split_record_and_find_separators(input, seps_offset, &mut self.seps);
239
240            match result {
241                End => {
242                    self.buffer.consume(pos);
243                    return Ok(None);
244                }
245                Cr | Lf => {
246                    self.buffer.consume(pos);
247                }
248                InputEmpty => {
249                    self.buffer.save();
250                }
251                Record => {
252                    self.index += 1;
253                    self.check_field_count(byte, self.seps.len() + 1)?;
254
255                    let record = ZeroCopyByteRecord::new(
256                        self.buffer.flush(pos),
257                        &self.seps,
258                        self.inner.quote,
259                    );
260
261                    return Ok(Some(record));
262                }
263            };
264        }
265    }
266
267    #[inline(always)]
268    pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
269        self.on_first_read()?;
270
271        if self.must_reemit_headers {
272            self.must_reemit_headers = false;
273            return Ok(Some(ZeroCopyByteRecord::new(
274                &self.raw_headers.1,
275                &self.raw_headers.0,
276                self.inner.quote,
277            )));
278        }
279
280        self.read_byte_record_impl()
281    }
282
283    /// Unwrap into an optional first record (only when the reader was
284    /// configured not to interpret the first record as a header, and when the
285    /// first record was pre-buffered but not yet reemitted), and the underlying
286    /// [`BufReader`].
287    pub fn into_bufreader(self) -> (Option<ByteRecord>, BufReader<R>) {
288        (
289            self.must_reemit_headers.then_some(self.byte_headers),
290            self.buffer.into_bufreader(),
291        )
292    }
293
294    /// Returns the current byte offset of the reader in the wrapped stream.
295    #[inline(always)]
296    pub fn position(&self) -> u64 {
297        if self.must_reemit_headers {
298            0
299        } else {
300            self.buffer.position()
301        }
302    }
303}
304
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    impl<R: Read> ZeroCopyReader<R> {
        /// Test-only shorthand: a default reader that does not interpret the
        /// first record as a header.
        fn from_reader_no_headers(reader: R) -> Self {
            let mut builder = ZeroCopyReaderBuilder::new();
            builder.has_headers(false);
            builder.from_reader(reader)
        }
    }

    /// Turns rows of textual cells into rows of owned byte cells.
    fn byte_rows(rows: &[&[&str]]) -> Vec<Vec<Vec<u8>>> {
        rows.iter()
            .map(|row| row.iter().map(|cell| cell.as_bytes().to_vec()).collect())
            .collect()
    }

    /// Reads the next record (which must exist) as an owned [`ByteRecord`].
    fn next_owned<R: Read>(reader: &mut ZeroCopyReader<R>) -> error::Result<ByteRecord> {
        Ok(reader.read_byte_record()?.unwrap().to_byte_record())
    }

    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        // Small capacity, presumably so that records span several buffer fills.
        let mut builder = ZeroCopyReaderBuilder::with_capacity(32);
        builder.has_headers(false);
        let mut reader = builder.from_reader(Cursor::new(csv));

        let mut records = Vec::new();
        while let Some(record) = reader.read_byte_record()? {
            let cells: Vec<_> = record.iter().map(|cell| cell.to_vec()).collect();
            records.push(cells);
        }

        // Cells come back raw (quotes kept) and the blank line is skipped.
        let expected = byte_rows(&[
            &["name", "surname", "age"],
            &[
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ],
            &["lucy", "rose", "\"67\""],
            &["jermaine", "jackson", "\"89\""],
            &["karine", "loucan", "\"52\""],
            &["rose", "\"glib\"", "12"],
            &["\"guillaume\"", "\"plique\"", "\"42\""],
        ]);

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));

        let mut records = Vec::new();
        while let Some(record) = reader.read_byte_record()? {
            records.push(vec![record.as_slice().to_vec()]);
        }

        // A record consisting of a single empty quoted field is still a record.
        let expected = byte_rows(&[&["name"], &["\"\""], &["lucy"], &["\"\""]]);

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // With headers: `byte_headers` works before the first read...
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(next_owned(&mut reader)?, brec!["john", "dandy"]);

        // ...and after it.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(next_owned(&mut reader)?, brec!["john", "dandy"]);
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Without headers: the first record is also yielded by reads, whether
        // `byte_headers` is called before...
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(next_owned(&mut reader)?, brec!["name", "surname"]);

        // ...or after reading.
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(next_owned(&mut reader)?, brec!["name", "surname"]);
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Empty input, with headers.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.read_byte_record()?.is_none());

        // Empty input, without headers.
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.read_byte_record()?.is_none());

        Ok(())
    }
}