simd_csv/
zero_copy_reader.rs

1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ZeroCopyByteRecord};
7use crate::splitter::SplitterBuilder;
8use crate::utils::trim_bom;
9
/// Configuration used to construct a [`ZeroCopyReader`].
///
/// Setters take and return `&mut Self`, so they can be chained before calling
/// `from_reader`.
#[derive(Clone, Debug)]
pub struct ZeroCopyReaderBuilder {
    /// Byte separating fields within a record (default `b','`).
    delimiter: u8,
    /// Byte used to quote fields (default `b'"'`).
    quote: u8,
    /// Capacity, in bytes, of the reader's scratch buffer (default 8192).
    buffer_capacity: usize,
    /// When `true`, records may differ in field count from the first row (default `false`).
    flexible: bool,
    /// When `true`, the first row is treated as headers rather than data (default `true`).
    has_headers: bool,
}

impl Default for ZeroCopyReaderBuilder {
    /// Comma-delimited, `"`-quoted input with a header row, strict field
    /// counts, and an 8192-byte buffer.
    fn default() -> Self {
        Self {
            delimiter: b',',
            quote: b'"',
            buffer_capacity: 8192,
            flexible: false,
            has_headers: true,
        }
    }
}
30
31impl ZeroCopyReaderBuilder {
32    pub fn new() -> Self {
33        Self::default()
34    }
35
36    pub fn with_capacity(capacity: usize) -> Self {
37        let mut reader = Self::default();
38        reader.buffer_capacity(capacity);
39        reader
40    }
41
42    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
43        self.delimiter = delimiter;
44        self
45    }
46
47    pub fn quote(&mut self, quote: u8) -> &mut Self {
48        self.quote = quote;
49        self
50    }
51
52    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
53        self.buffer_capacity = capacity;
54        self
55    }
56
57    pub fn flexible(&mut self, yes: bool) -> &mut Self {
58        self.flexible = yes;
59        self
60    }
61
62    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
63        self.has_headers = yes;
64        self
65    }
66
67    pub fn to_splitter_builder(&self) -> SplitterBuilder {
68        let mut splitter_builder = SplitterBuilder::new();
69
70        splitter_builder
71            .buffer_capacity(self.buffer_capacity)
72            .has_headers(self.has_headers)
73            .quote(self.quote)
74            .delimiter(self.delimiter);
75
76        splitter_builder
77    }
78
79    pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
80        ZeroCopyReader {
81            buffer: ScratchBuffer::with_capacity(self.buffer_capacity, reader),
82            inner: CoreReader::new(self.delimiter, self.quote),
83            byte_headers: ByteRecord::new(),
84            raw_headers: (Vec::new(), Vec::new()),
85            seps: Vec::new(),
86            flexible: self.flexible,
87            has_read: false,
88            must_reemit_headers: !self.has_headers,
89            has_headers: self.has_headers,
90            index: 0,
91        }
92    }
93}
94
95pub struct ZeroCopyReader<R> {
96    buffer: ScratchBuffer<R>,
97    inner: CoreReader,
98    byte_headers: ByteRecord,
99    raw_headers: (Vec<usize>, Vec<u8>),
100    seps: Vec<usize>,
101    flexible: bool,
102    has_read: bool,
103    must_reemit_headers: bool,
104    has_headers: bool,
105    index: u64,
106}
107
108impl<R: Read> ZeroCopyReader<R> {
109    pub fn from_reader(reader: R) -> Self {
110        ZeroCopyReaderBuilder::new().from_reader(reader)
111    }
112
113    #[inline]
114    fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
115        if self.flexible {
116            return Ok(());
117        }
118
119        let headers_len = self.raw_headers.0.len() + 1;
120
121        if self.has_read && written != headers_len {
122            return Err(Error::new(ErrorKind::UnequalLengths {
123                expected_len: headers_len,
124                len: written,
125                pos: Some((byte, self.index)),
126            }));
127        }
128
129        Ok(())
130    }
131
132    #[inline]
133    fn on_first_read(&mut self) -> error::Result<()> {
134        if self.has_read {
135            return Ok(());
136        }
137
138        // Trimming BOM
139        let input = self.buffer.fill_buf()?;
140        let bom_len = trim_bom(input);
141        self.buffer.consume(bom_len);
142
143        // Reading headers
144        let mut headers_seps = Vec::new();
145        let mut headers_slice = Vec::new();
146        let mut byte_headers = ByteRecord::new();
147
148        if let Some(headers) = self.read_byte_record_impl()? {
149            (headers_seps, headers_slice) = headers.to_parts();
150            byte_headers = headers.to_byte_record();
151        } else {
152            self.must_reemit_headers = false;
153        }
154
155        self.raw_headers = (headers_seps, headers_slice);
156        self.byte_headers = byte_headers;
157
158        self.has_read = true;
159
160        Ok(())
161    }
162
163    #[inline]
164    pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
165        self.on_first_read()?;
166
167        Ok(&self.byte_headers)
168    }
169
170    #[inline]
171    pub fn has_headers(&self) -> bool {
172        self.has_headers
173    }
174
175    fn read_byte_record_impl(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
176        use ReadResult::*;
177
178        self.buffer.reset();
179        self.seps.clear();
180
181        let byte = self.position();
182
183        loop {
184            let seps_offset = self.buffer.saved().len();
185            let input = self.buffer.fill_buf()?;
186
187            let (result, pos) =
188                self.inner
189                    .split_record_and_find_separators(input, seps_offset, &mut self.seps);
190
191            match result {
192                End => {
193                    self.buffer.consume(pos);
194                    return Ok(None);
195                }
196                Cr | Lf => {
197                    self.buffer.consume(pos);
198                }
199                InputEmpty => {
200                    self.buffer.save();
201                }
202                Record => {
203                    self.index += 1;
204                    self.check_field_count(byte, self.seps.len() + 1)?;
205
206                    let record = ZeroCopyByteRecord::new(
207                        self.buffer.flush(pos),
208                        &self.seps,
209                        self.inner.quote,
210                    );
211
212                    return Ok(Some(record));
213                }
214            };
215        }
216    }
217
218    #[inline(always)]
219    pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
220        self.on_first_read()?;
221
222        if self.must_reemit_headers {
223            self.must_reemit_headers = false;
224            return Ok(Some(ZeroCopyByteRecord::new(
225                &self.raw_headers.1,
226                &self.raw_headers.0,
227                self.inner.quote,
228            )));
229        }
230
231        self.read_byte_record_impl()
232    }
233
234    pub fn into_bufreader(self) -> BufReader<R> {
235        self.buffer.into_bufreader()
236    }
237
238    #[inline(always)]
239    pub fn position(&self) -> u64 {
240        if self.must_reemit_headers {
241            0
242        } else {
243            self.buffer.position()
244        }
245    }
246}
247
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use crate::brec;

    use super::*;

    impl<R: Read> ZeroCopyReader<R> {
        fn from_reader_no_headers(reader: R) -> Self {
            ZeroCopyReaderBuilder::new()
                .has_headers(false)
                .from_reader(reader)
        }
    }

    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let mut reader = ZeroCopyReaderBuilder::with_capacity(32)
            .has_headers(false)
            .from_reader(Cursor::new(csv));
        let mut records = Vec::new();

        let expected = vec![
            vec!["name", "surname", "age"],
            vec![
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ],
            vec!["lucy", "rose", "\"67\""],
            vec!["jermaine", "jackson", "\"89\""],
            vec!["karine", "loucan", "\"52\""],
            vec!["rose", "\"glib\"", "12"],
            vec!["\"guillaume\"", "\"plique\"", "\"42\""],
        ]
        .into_iter()
        .map(|record| {
            record
                .into_iter()
                .map(|cell| cell.as_bytes().to_vec())
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();

        while let Some(record) = reader.read_byte_record()? {
            records.push(record.iter().map(|cell| cell.to_vec()).collect::<Vec<_>>());
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        // Zero-copy
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));

        let expected = vec![
            vec!["name".as_bytes().to_vec()],
            vec!["\"\"".as_bytes().to_vec()],
            vec!["lucy".as_bytes().to_vec()],
            vec!["\"\"".as_bytes().to_vec()],
        ];

        // Read
        let mut records = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            records.push(vec![record.as_slice().to_vec()]);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // Headers, call before read
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["john", "dandy"]
        );

        // Headers, call after read
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["john", "dandy"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // No headers, call before read
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["name", "surname"]
        );

        // No headers, call after read
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["name", "surname"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Headers, empty
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.read_byte_record()?.is_none());

        // No headers, empty
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.read_byte_record()?.is_none());

        Ok(())
    }

    #[test]
    fn test_unequal_lengths() -> error::Result<()> {
        let data = "a,b\n1,2\n3\n";

        // Strict (default): a record narrower than the header row is rejected.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["a", "b"]);
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["1", "2"]
        );
        assert!(reader.read_byte_record().is_err());

        // Flexible: the same record is accepted as-is.
        let mut reader = ZeroCopyReaderBuilder::new()
            .flexible(true)
            .from_reader(Cursor::new(data));
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["1", "2"]
        );
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["3"]
        );
        assert!(reader.read_byte_record()?.is_none());

        Ok(())
    }

    #[test]
    fn test_bom_is_trimmed() -> error::Result<()> {
        let data = "\u{feff}name,surname\njohn,dandy";

        // Headers: the BOM must not leak into the first header cell.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["john", "dandy"]
        );

        // No headers: the reemitted first row must not contain the BOM either.
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["name", "surname"]
        );

        Ok(())
    }
}