simd_csv/
zero_copy_reader.rs

1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ZeroCopyByteRecord};
7use crate::splitter::SplitterBuilder;
8use crate::utils::trim_bom;
9
/// Configuration used to build a [`ZeroCopyReader`].
///
/// The builder only stores plain settings, so it is cheap to clone and can be
/// printed for diagnostics.
#[derive(Debug, Clone)]
pub struct ZeroCopyReaderBuilder {
    /// Byte separating two fields of a record.
    delimiter: u8,
    /// Byte used to quote fields.
    quote: u8,
    /// Capacity requested for the reader's internal buffer.
    buffer_capacity: usize,
    /// When `true`, records are not required to have the same number of
    /// fields as the first record.
    flexible: bool,
    /// Whether the first record must be treated as a header row rather than
    /// emitted as a regular record.
    has_headers: bool,
}
18
19impl Default for ZeroCopyReaderBuilder {
20    fn default() -> Self {
21        Self {
22            delimiter: b',',
23            quote: b'"',
24            buffer_capacity: 8192,
25            flexible: false,
26            has_headers: true,
27        }
28    }
29}
30
31impl ZeroCopyReaderBuilder {
32    pub fn new() -> Self {
33        Self::default()
34    }
35
36    pub fn with_capacity(capacity: usize) -> Self {
37        let mut reader = Self::default();
38        reader.buffer_capacity(capacity);
39        reader
40    }
41
42    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
43        self.delimiter = delimiter;
44        self
45    }
46
47    pub fn quote(&mut self, quote: u8) -> &mut Self {
48        self.quote = quote;
49        self
50    }
51
52    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
53        self.buffer_capacity = capacity;
54        self
55    }
56
57    pub fn flexible(&mut self, yes: bool) -> &mut Self {
58        self.flexible = yes;
59        self
60    }
61
62    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
63        self.has_headers = yes;
64        self
65    }
66
67    pub fn to_splitter_builder(&self) -> SplitterBuilder {
68        let mut splitter_builder = SplitterBuilder::new();
69
70        splitter_builder
71            .buffer_capacity(self.buffer_capacity)
72            .has_headers(self.has_headers)
73            .quote(self.quote)
74            .delimiter(self.delimiter);
75
76        splitter_builder
77    }
78
79    pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
80        ZeroCopyReader {
81            buffer: ScratchBuffer::with_capacity(self.buffer_capacity, reader),
82            inner: CoreReader::new(self.delimiter, self.quote),
83            byte_headers: ByteRecord::new(),
84            raw_headers: (Vec::new(), Vec::new()),
85            seps: Vec::new(),
86            flexible: self.flexible,
87            has_read: false,
88            must_reemit_headers: !self.has_headers,
89            has_headers: self.has_headers,
90            index: 0,
91        }
92    }
93}
94
95pub struct ZeroCopyReader<R> {
96    buffer: ScratchBuffer<R>,
97    inner: CoreReader,
98    byte_headers: ByteRecord,
99    raw_headers: (Vec<usize>, Vec<u8>),
100    seps: Vec<usize>,
101    flexible: bool,
102    has_read: bool,
103    must_reemit_headers: bool,
104    has_headers: bool,
105    index: u64,
106}
107
108impl<R: Read> ZeroCopyReader<R> {
109    pub fn from_reader(reader: R) -> Self {
110        ZeroCopyReaderBuilder::new().from_reader(reader)
111    }
112
113    #[inline]
114    fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
115        if self.flexible {
116            return Ok(());
117        }
118
119        let headers_len = self.raw_headers.0.len() + 1;
120
121        if self.has_read && written != headers_len {
122            return Err(Error::new(ErrorKind::UnequalLengths {
123                expected_len: headers_len,
124                len: written,
125                pos: Some((byte, self.index)),
126            }));
127        }
128
129        Ok(())
130    }
131
132    #[inline]
133    fn on_first_read(&mut self) -> error::Result<()> {
134        if self.has_read {
135            return Ok(());
136        }
137
138        // Trimming BOM
139        let input = self.buffer.fill_buf()?;
140        let bom_len = trim_bom(input);
141        self.buffer.consume(bom_len);
142
143        // Reading headers
144        let mut headers_seps = Vec::new();
145        let mut headers_slice = Vec::new();
146        let mut byte_headers = ByteRecord::new();
147
148        if let Some(headers) = self.read_byte_record_impl()? {
149            (headers_seps, headers_slice) = headers.to_parts();
150            byte_headers = headers.to_byte_record();
151        } else {
152            self.must_reemit_headers = false;
153        }
154
155        self.raw_headers = (headers_seps, headers_slice);
156        self.byte_headers = byte_headers;
157
158        self.has_read = true;
159
160        Ok(())
161    }
162
163    #[inline]
164    pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
165        self.on_first_read()?;
166
167        Ok(&self.byte_headers)
168    }
169
170    #[inline]
171    pub fn has_headers(&self) -> bool {
172        self.has_headers
173    }
174
175    fn read_byte_record_impl(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
176        use ReadResult::*;
177
178        self.buffer.reset();
179        self.seps.clear();
180
181        let byte = self.position();
182
183        loop {
184            let seps_offset = self.buffer.saved().len();
185            let input = self.buffer.fill_buf()?;
186
187            let (result, pos) =
188                self.inner
189                    .split_record_and_find_separators(input, seps_offset, &mut self.seps);
190
191            match result {
192                End => {
193                    self.buffer.consume(pos);
194                    return Ok(None);
195                }
196                Cr | Lf => {
197                    self.buffer.consume(pos);
198                }
199                InputEmpty => {
200                    self.buffer.save();
201                }
202                Record => {
203                    self.index += 1;
204                    self.check_field_count(byte, self.seps.len() + 1)?;
205
206                    let record = ZeroCopyByteRecord::new(
207                        self.buffer.flush(pos),
208                        &self.seps,
209                        self.inner.quote,
210                    );
211
212                    return Ok(Some(record));
213                }
214            };
215        }
216    }
217
218    #[inline(always)]
219    pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
220        self.on_first_read()?;
221
222        if self.must_reemit_headers {
223            self.must_reemit_headers = false;
224            return Ok(Some(ZeroCopyByteRecord::new(
225                &self.raw_headers.1,
226                &self.raw_headers.0,
227                self.inner.quote,
228            )));
229        }
230
231        self.read_byte_record_impl()
232    }
233
234    pub fn into_bufreader(self) -> (Option<ByteRecord>, BufReader<R>) {
235        (
236            self.must_reemit_headers.then_some(self.byte_headers),
237            self.buffer.into_bufreader(),
238        )
239    }
240
241    #[inline(always)]
242    pub fn position(&self) -> u64 {
243        if self.must_reemit_headers {
244            0
245        } else {
246            self.buffer.position()
247        }
248    }
249}
250
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use crate::brec;

    use super::*;

    impl<R: Read> ZeroCopyReader<R> {
        /// Test-only shortcut building a default reader with headers disabled.
        fn from_reader_no_headers(reader: R) -> Self {
            let mut builder = ZeroCopyReaderBuilder::new();
            builder.has_headers(false);
            builder.from_reader(reader)
        }
    }

    /// Records are returned raw (quotes included), blank lines are skipped,
    /// and records spanning several buffer refills are reassembled.
    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let expected: Vec<Vec<Vec<u8>>> = [
            ["name", "surname", "age"],
            [
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ],
            ["lucy", "rose", "\"67\""],
            ["jermaine", "jackson", "\"89\""],
            ["karine", "loucan", "\"52\""],
            ["rose", "\"glib\"", "12"],
            ["\"guillaume\"", "\"plique\"", "\"42\""],
        ]
        .iter()
        .map(|row| row.iter().map(|cell| cell.as_bytes().to_vec()).collect())
        .collect();

        // A small buffer forces records to straddle several refills.
        let mut builder = ZeroCopyReaderBuilder::with_capacity(32);
        builder.has_headers(false);
        let mut reader = builder.from_reader(Cursor::new(csv));

        let mut records: Vec<Vec<Vec<u8>>> = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            records.push(record.iter().map(|cell| cell.to_vec()).collect());
        }

        assert_eq!(records, expected);

        Ok(())
    }

    /// A record made of a single empty quoted field is not mistaken for a
    /// blank line.
    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let expected: Vec<Vec<Vec<u8>>> = ["name", "\"\"", "lucy", "\"\""]
            .iter()
            .map(|row| vec![row.as_bytes().to_vec()])
            .collect();

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        let mut records: Vec<Vec<Vec<u8>>> = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            records.push(vec![record.as_slice().to_vec()]);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    /// `byte_headers` returns the first record whether it is called before or
    /// after the first read, with and without a header row.
    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        let header_row = brec!["name", "surname"];
        let first_data_row = brec!["john", "dandy"];
        let no_record = brec![];

        // With headers, asking for the headers before reading.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &header_row);
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            first_data_row
        );

        // With headers, asking for the headers after reading.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            first_data_row
        );
        assert_eq!(reader.byte_headers()?, &header_row);

        // Without headers, asking for the headers before reading.
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &header_row);
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            header_row
        );

        // Without headers, asking for the headers after reading.
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            header_row
        );
        assert_eq!(reader.byte_headers()?, &header_row);

        // With headers, empty input.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &no_record);
        assert!(reader.read_byte_record()?.is_none());

        // Without headers, empty input.
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &no_record);
        assert!(reader.read_byte_record()?.is_none());

        Ok(())
    }
}