simd_csv/
zero_copy_reader.rs

1use std::io::Read;
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error};
6use crate::records::{ByteRecord, ZeroCopyByteRecord};
7use crate::utils::trim_bom;
8
/// Builder configuring how a [`ZeroCopyReader`] parses its input.
///
/// Obtain one with [`ZeroCopyReaderBuilder::new`] (or `Default::default()`),
/// adjust it with the chainable setters, then call `from_reader` to build a
/// reader. The builder is only borrowed by `from_reader`, so it can be reused.
#[derive(Debug, Clone)]
pub struct ZeroCopyReaderBuilder {
    /// Byte separating fields (default: `,`).
    delimiter: u8,
    /// Byte used to quote fields (default: `"`).
    quote: u8,
    /// Capacity handed to the internal scratch buffer; `None` (the default)
    /// defers to `ScratchBuffer::with_optional_capacity`'s own choice.
    buffer_capacity: Option<usize>,
    /// When `true`, records may have a different number of fields than the
    /// first record (default: `false`).
    flexible: bool,
    /// Whether the first record is a header row (default: `true`). When
    /// `false`, the first record is still exposed as the headers but is also
    /// yielded as data.
    has_headers: bool,
}
16
17impl Default for ZeroCopyReaderBuilder {
18    fn default() -> Self {
19        Self {
20            delimiter: b',',
21            quote: b'"',
22            buffer_capacity: None,
23            flexible: false,
24            has_headers: true,
25        }
26    }
27}
28
29impl ZeroCopyReaderBuilder {
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    pub fn with_capacity(capacity: usize) -> Self {
35        let mut reader = Self::default();
36        reader.buffer_capacity(capacity);
37        reader
38    }
39
40    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
41        self.delimiter = delimiter;
42        self
43    }
44
45    pub fn quote(&mut self, quote: u8) -> &mut Self {
46        self.quote = quote;
47        self
48    }
49
50    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
51        self.buffer_capacity = Some(capacity);
52        self
53    }
54
55    pub fn flexible(&mut self, yes: bool) -> &mut Self {
56        self.flexible = yes;
57        self
58    }
59
60    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
61        self.has_headers = yes;
62        self
63    }
64
65    pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
66        ZeroCopyReader {
67            buffer: ScratchBuffer::with_optional_capacity(self.buffer_capacity, reader),
68            inner: CoreReader::new(self.delimiter, self.quote),
69            headers_seps: Vec::new(),
70            headers_slice: Vec::new(),
71            headers: ByteRecord::new(),
72            seps: Vec::new(),
73            flexible: self.flexible,
74            has_read: false,
75            must_reemit_headers: !self.has_headers,
76        }
77    }
78}
79
80pub struct ZeroCopyReader<R> {
81    buffer: ScratchBuffer<R>,
82    inner: CoreReader,
83    headers: ByteRecord,
84    headers_seps: Vec<usize>,
85    headers_slice: Vec<u8>,
86    seps: Vec<usize>,
87    flexible: bool,
88    has_read: bool,
89    must_reemit_headers: bool,
90}
91
92impl<R: Read> ZeroCopyReader<R> {
93    pub fn from_reader(reader: R) -> Self {
94        ZeroCopyReaderBuilder::new().from_reader(reader)
95    }
96
97    #[inline]
98    fn check_field_count(&mut self, written: usize) -> error::Result<()> {
99        if self.flexible {
100            return Ok(());
101        }
102
103        let headers_len = self.headers_seps.len() + 1;
104
105        if self.has_read && written != headers_len {
106            return Err(Error::unequal_lengths(headers_len, written));
107        }
108
109        Ok(())
110    }
111
112    #[inline]
113    fn on_first_read(&mut self) -> error::Result<()> {
114        if self.has_read {
115            return Ok(());
116        }
117
118        // Trimming BOM
119        let input = self.buffer.fill_buf()?;
120        let bom_len = trim_bom(input);
121        self.buffer.consume(bom_len);
122
123        // Reading headers
124        let mut headers_seps = Vec::new();
125        let mut headers_slice = Vec::new();
126        let mut byte_headers = ByteRecord::new();
127
128        if let Some(headers) = self.read_byte_record_impl()? {
129            (headers_seps, headers_slice) = headers.to_parts();
130            byte_headers = headers.to_byte_record();
131        } else {
132            self.must_reemit_headers = false;
133        }
134
135        self.headers_seps = headers_seps;
136        self.headers_slice = headers_slice;
137        self.headers = byte_headers;
138
139        self.has_read = true;
140
141        Ok(())
142    }
143
144    #[inline]
145    pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
146        self.on_first_read()?;
147
148        Ok(&self.headers)
149    }
150
151    fn read_byte_record_impl(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
152        use ReadResult::*;
153
154        self.buffer.reset();
155        self.seps.clear();
156
157        loop {
158            let seps_offset = self.buffer.saved().len();
159            let input = self.buffer.fill_buf()?;
160
161            let (result, pos) =
162                self.inner
163                    .split_record_and_find_separators(input, seps_offset, &mut self.seps);
164
165            match result {
166                End => {
167                    self.buffer.consume(pos);
168                    return Ok(None);
169                }
170                Cr | Lf => {
171                    self.buffer.consume(pos);
172                }
173                InputEmpty => {
174                    self.buffer.save();
175                }
176                Record => {
177                    self.check_field_count(self.seps.len() + 1)?;
178
179                    let record = ZeroCopyByteRecord::new(
180                        self.buffer.flush(pos),
181                        &self.seps,
182                        self.inner.quote,
183                    );
184
185                    return Ok(Some(record));
186                }
187            };
188        }
189    }
190
191    #[inline(always)]
192    pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
193        self.on_first_read()?;
194
195        if self.must_reemit_headers {
196            self.must_reemit_headers = false;
197            return Ok(Some(ZeroCopyByteRecord::new(
198                &self.headers_slice,
199                &self.headers_seps,
200                self.inner.quote,
201            )));
202        }
203
204        self.read_byte_record_impl()
205    }
206}
207
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use crate::brec;

    use super::*;

    impl<R: Read> ZeroCopyReader<R> {
        /// Builds a default reader that treats the first record as data.
        fn from_reader_no_headers(reader: R) -> Self {
            let mut builder = ZeroCopyReaderBuilder::new();
            builder.has_headers(false);
            builder.from_reader(reader)
        }
    }

    /// Converts a table of string cells into the byte form the reader yields.
    fn byte_rows(rows: Vec<Vec<&str>>) -> Vec<Vec<Vec<u8>>> {
        rows.into_iter()
            .map(|row| {
                row.into_iter()
                    .map(|cell| cell.as_bytes().to_vec())
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        // Quoted cells, doubled quotes, a blank line and a trailing CRLF, read
        // through a reader built with a small buffer capacity (32).
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let mut reader = ZeroCopyReaderBuilder::with_capacity(32)
            .has_headers(false)
            .from_reader(Cursor::new(csv));

        // Cells come back verbatim (quotes included) and the blank line is skipped.
        let expected = byte_rows(vec![
            vec!["name", "surname", "age"],
            vec![
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ],
            vec!["lucy", "rose", "\"67\""],
            vec!["jermaine", "jackson", "\"89\""],
            vec!["karine", "loucan", "\"52\""],
            vec!["rose", "\"glib\"", "12"],
            vec!["\"guillaume\"", "\"plique\"", "\"42\""],
        ]);

        let mut actual = Vec::new();
        while let Some(record) = reader.read_byte_record()? {
            let cells = record.iter().map(|cell| cell.to_vec()).collect::<Vec<_>>();
            actual.push(cells);
        }

        assert_eq!(actual, expected);

        Ok(())
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        // Rows made of a single quoted empty cell must not be skipped like
        // blank lines.
        let data = "name\n\"\"\nlucy\n\"\"";

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));

        let expected = byte_rows(vec![
            vec!["name"],
            vec!["\"\""],
            vec!["lucy"],
            vec!["\"\""],
        ]);

        let mut actual = Vec::new();
        while let Some(record) = reader.read_byte_record()? {
            actual.push(vec![record.as_slice().to_vec()]);
        }

        assert_eq!(actual, expected);

        Ok(())
    }

    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // With headers: the header row is never yielded as data, whether
        // `byte_headers` is called before or after the first read.
        let mut rdr = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(rdr.byte_headers()?, &brec!["name", "surname"]);
        let first = rdr.read_byte_record()?.unwrap().to_byte_record();
        assert_eq!(first, brec!["john", "dandy"]);

        let mut rdr = ZeroCopyReader::from_reader(Cursor::new(data));
        let first = rdr.read_byte_record()?.unwrap().to_byte_record();
        assert_eq!(first, brec!["john", "dandy"]);
        assert_eq!(rdr.byte_headers()?, &brec!["name", "surname"]);

        // Without headers: the first row is both the headers and the first
        // record, in either call order.
        let mut rdr = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(rdr.byte_headers()?, &brec!["name", "surname"]);
        let first = rdr.read_byte_record()?.unwrap().to_byte_record();
        assert_eq!(first, brec!["name", "surname"]);

        let mut rdr = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        let first = rdr.read_byte_record()?.unwrap().to_byte_record();
        assert_eq!(first, brec!["name", "surname"]);
        assert_eq!(rdr.byte_headers()?, &brec!["name", "surname"]);

        // Empty input: empty headers and no records, with or without headers.
        let mut rdr = ZeroCopyReader::from_reader(Cursor::new(b""));
        assert_eq!(rdr.byte_headers()?, &brec![]);
        assert!(rdr.read_byte_record()?.is_none());

        let mut rdr = ZeroCopyReader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(rdr.byte_headers()?, &brec![]);
        assert!(rdr.read_byte_record()?.is_none());

        Ok(())
    }
}