simd_csv/
reader.rs

1use std::io::{BufRead, BufReader, Read};
2
3use crate::core::{self, ReadResult};
4use crate::error::{self, Error};
5use crate::records::{ByteRecord, ByteRecordBuilder, ZeroCopyByteRecord};
6
/// CSV reader that pulls bytes from any [`Read`] source through a
/// [`BufReader`] and delegates record parsing to a [`core::Reader`].
pub struct BufferedReader<R> {
    /// Buffered view over the underlying source.
    buffer: BufReader<R>,
    /// Accumulates the bytes of a record that spans more than one buffer fill.
    scratch: Vec<u8>,
    /// Separator offsets filled in by `split_record_and_find_separators` for
    /// the record currently being returned as a zero-copy record.
    seps: Vec<usize>,
    /// Number of buffered bytes still to be consumed for a record that was
    /// handed out as a slice of `buffer`; consumed at the start of the next read.
    actual_buffer_position: Option<usize>,
    /// Low-level record parser.
    inner: core::Reader,
    /// Field count of the first record validated, against which later records
    /// are compared.
    field_count: Option<usize>,
}
15
16impl<R: Read> BufferedReader<R> {
17    pub fn new(reader: R, delimiter: u8, quote: u8) -> Self {
18        Self {
19            buffer: BufReader::new(reader),
20            scratch: Vec::new(),
21            seps: Vec::new(),
22            actual_buffer_position: None,
23            inner: core::Reader::new(delimiter, quote),
24            field_count: None,
25        }
26    }
27
28    pub fn with_capacity(capacity: usize, reader: R, delimiter: u8, quote: u8) -> Self {
29        Self {
30            buffer: BufReader::with_capacity(capacity, reader),
31            scratch: Vec::new(),
32            seps: Vec::new(),
33            actual_buffer_position: None,
34            inner: core::Reader::new(delimiter, quote),
35            field_count: None,
36        }
37    }
38
39    #[inline]
40    fn check_field_count(&mut self, written: usize) -> error::Result<()> {
41        match self.field_count {
42            Some(expected) => {
43                if written != expected {
44                    return Err(Error::unequal_lengths(expected, written));
45                }
46            }
47            None => {
48                self.field_count = Some(written);
49            }
50        }
51
52        Ok(())
53    }
54
55    pub fn strip_bom(&mut self) -> error::Result<()> {
56        let input = self.buffer.fill_buf()?;
57
58        if input.len() >= 3 && &input[..3] == b"\xef\xbb\xbf" {
59            self.buffer.consume(3);
60        }
61
62        Ok(())
63    }
64
65    pub fn first_byte_record(&mut self, consume: bool) -> error::Result<ByteRecord> {
66        use ReadResult::*;
67
68        let mut record = ByteRecord::new();
69        let mut record_builder = ByteRecordBuilder::wrap(&mut record);
70
71        let input = self.buffer.fill_buf()?;
72
73        let (result, pos) = self.inner.read_record(input, &mut record_builder);
74
75        match result {
76            End => Ok(ByteRecord::new()),
77
78            // TODO: we could expand the capacity of the buffer automagically here
79            // if this becomes an issue.
80            Cr | Lf | ReadResult::InputEmpty => Err(Error::invalid_headers()),
81            Record => {
82                if consume {
83                    self.buffer.consume(pos);
84                }
85
86                Ok(record)
87            }
88        }
89    }
90
91    pub fn count_records(&mut self) -> error::Result<u64> {
92        use ReadResult::*;
93
94        let mut count: u64 = 0;
95
96        loop {
97            let input = self.buffer.fill_buf()?;
98
99            let (result, pos) = self.inner.split_record(input);
100
101            self.buffer.consume(pos);
102
103            match result {
104                End => break,
105                InputEmpty | Cr | Lf => continue,
106                Record => {
107                    count += 1;
108                }
109            };
110        }
111
112        Ok(count)
113    }
114
115    pub fn split_record(&mut self) -> error::Result<Option<&[u8]>> {
116        use ReadResult::*;
117
118        self.scratch.clear();
119
120        if let Some(last_pos) = self.actual_buffer_position.take() {
121            self.buffer.consume(last_pos);
122        }
123
124        loop {
125            let input = self.buffer.fill_buf()?;
126
127            let (result, pos) = self.inner.split_record(input);
128
129            match result {
130                End => {
131                    self.buffer.consume(pos);
132                    return Ok(None);
133                }
134                Cr | Lf => {
135                    self.buffer.consume(pos);
136                }
137                InputEmpty => {
138                    self.scratch.extend_from_slice(input);
139                    self.buffer.consume(pos);
140                }
141                Record => {
142                    if self.scratch.is_empty() {
143                        self.actual_buffer_position = Some(pos);
144                        return Ok(Some(&self.buffer.buffer()[..pos]));
145                    } else {
146                        self.scratch.extend_from_slice(&input[..pos]);
147                        self.buffer.consume(pos);
148
149                        return Ok(Some(&self.scratch));
150                    }
151                }
152            };
153        }
154    }
155
156    pub fn read_zero_copy_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
157        use ReadResult::*;
158
159        self.scratch.clear();
160        self.seps.clear();
161
162        if let Some(last_pos) = self.actual_buffer_position.take() {
163            self.buffer.consume(last_pos);
164        }
165
166        loop {
167            let input = self.buffer.fill_buf()?;
168
169            let (result, pos) = self.inner.split_record_and_find_separators(
170                input,
171                self.scratch.len(),
172                &mut self.seps,
173            );
174
175            match result {
176                End => {
177                    self.buffer.consume(pos);
178                    return Ok(None);
179                }
180                Cr | Lf => {
181                    self.buffer.consume(pos);
182                }
183                InputEmpty => {
184                    self.scratch.extend_from_slice(input);
185                    self.buffer.consume(pos);
186                }
187                Record => {
188                    if self.scratch.is_empty() {
189                        self.check_field_count(self.seps.len() + 1)?;
190                        self.actual_buffer_position = Some(pos);
191                        return Ok(Some(ZeroCopyByteRecord::new(
192                            &self.buffer.buffer()[..pos],
193                            &self.seps,
194                        )));
195                    } else {
196                        self.scratch.extend_from_slice(&input[..pos]);
197                        self.buffer.consume(pos);
198                        self.check_field_count(self.seps.len() + 1)?;
199                        return Ok(Some(ZeroCopyByteRecord::new(&self.scratch, &self.seps)));
200                    }
201                }
202            };
203        }
204    }
205
206    pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
207        use ReadResult::*;
208
209        record.clear();
210
211        let mut record_builder = ByteRecordBuilder::wrap(record);
212
213        if let Some(last_pos) = self.actual_buffer_position.take() {
214            self.buffer.consume(last_pos);
215        }
216
217        loop {
218            let input = self.buffer.fill_buf()?;
219
220            let (result, pos) = self.inner.read_record(input, &mut record_builder);
221
222            self.buffer.consume(pos);
223
224            match result {
225                End => {
226                    return Ok(false);
227                }
228                Cr | Lf | InputEmpty => {
229                    continue;
230                }
231                Record => {
232                    self.check_field_count(record.len())?;
233                    return Ok(true);
234                }
235            };
236        }
237    }
238
239    pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
240        ByteRecordsIter {
241            reader: self,
242            record: ByteRecord::new(),
243        }
244    }
245
246    pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
247        ByteRecordsIntoIter {
248            reader: self,
249            record: ByteRecord::new(),
250        }
251    }
252}
253
254pub struct ByteRecordsIter<'r, R> {
255    reader: &'r mut BufferedReader<R>,
256    record: ByteRecord,
257}
258
259impl<'r, R: Read> Iterator for ByteRecordsIter<'r, R> {
260    type Item = error::Result<ByteRecord>;
261
262    fn next(&mut self) -> Option<Self::Item> {
263        // NOTE: cloning the record will not carry over excess capacity
264        // because the record only contains `Vec` currently.
265        match self.reader.read_byte_record(&mut self.record) {
266            Err(err) => Some(Err(err)),
267            Ok(true) => Some(Ok(self.record.clone())),
268            Ok(false) => None,
269        }
270    }
271}
272
273pub struct ByteRecordsIntoIter<R> {
274    reader: BufferedReader<R>,
275    record: ByteRecord,
276}
277
278impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
279    type Item = error::Result<ByteRecord>;
280
281    fn next(&mut self) -> Option<Self::Item> {
282        // NOTE: cloning the record will not carry over excess capacity
283        // because the record only contains `Vec` currently.
284        match self.reader.read_byte_record(&mut self.record) {
285            Err(err) => Some(Err(err)),
286            Ok(true) => Some(Ok(self.record.clone())),
287            Ok(false) => None,
288        }
289    }
290}
291
292// NOTE: a reader to be used when the whole data fits into memory or when using
293// memory maps.
294pub struct TotalReader<'b> {
295    inner: core::Reader,
296    bytes: &'b [u8],
297    pos: usize,
298}
299
300impl<'b> TotalReader<'b> {
301    pub fn new(delimiter: u8, quote: u8, bytes: &'b [u8]) -> Self {
302        Self {
303            inner: core::Reader::new(delimiter, quote),
304            bytes,
305            pos: 0,
306        }
307    }
308
309    pub fn count_records(&mut self) -> u64 {
310        use ReadResult::*;
311
312        let mut count: u64 = 0;
313
314        loop {
315            let (result, pos) = self.inner.split_record(&self.bytes[self.pos..]);
316
317            self.pos += pos;
318
319            match result {
320                End => break,
321                InputEmpty | Cr | Lf => continue,
322                Record => {
323                    count += 1;
324                }
325            };
326        }
327
328        count
329    }
330
331    pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
332        use ReadResult::*;
333
334        record.clear();
335
336        let mut record_builder = ByteRecordBuilder::wrap(record);
337
338        loop {
339            let (result, pos) = self
340                .inner
341                .read_record(&self.bytes[self.pos..], &mut record_builder);
342
343            self.pos += pos;
344
345            match result {
346                End => {
347                    return Ok(false);
348                }
349                Cr | Lf | InputEmpty => {
350                    continue;
351                }
352                Record => {
353                    return Ok(true);
354                }
355            };
356        }
357    }
358}
359
360#[cfg(test)]
361mod tests {
362    use std::io::Cursor;
363
364    use crate::brec;
365
366    use super::*;
367
368    fn count_records(data: &str, capacity: usize) -> u64 {
369        let mut splitter = BufferedReader::with_capacity(capacity, Cursor::new(data), b',', b'"');
370        splitter.count_records().unwrap()
371    }
372
373    #[test]
374    fn test_count() {
375        // Empty
376        assert_eq!(count_records("", 1024), 0);
377
378        // Single cells with various empty lines
379        let tests = vec![
380            "name\njohn\nlucy",
381            "name\njohn\nlucy\n",
382            "name\n\njohn\r\nlucy\n",
383            "name\n\njohn\r\nlucy\n\n",
384            "name\n\n\njohn\r\n\r\nlucy\n\n\n",
385            "\nname\njohn\nlucy",
386            "\n\nname\njohn\nlucy",
387            "\r\n\r\nname\njohn\nlucy",
388            "name\njohn\nlucy\r\n",
389            "name\njohn\nlucy\r\n\r\n",
390        ];
391
392        for capacity in [32usize, 4, 3, 2, 1] {
393            for test in tests.iter() {
394                assert_eq!(
395                    count_records(test, capacity),
396                    3,
397                    "capacity={} string={:?}",
398                    capacity,
399                    test
400                );
401            }
402        }
403
404        // Multiple cells
405        let data = "name,surname,age\njohn,landy,45\nlucy,rose,67";
406        assert_eq!(count_records(data, 1024), 3);
407
408        // Quoting
409        for capacity in [1024usize, 32usize, 4, 3, 2, 1] {
410            let data = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\r\n";
411
412            assert_eq!(count_records(data, capacity), 5, "capacity={}", capacity);
413        }
414
415        // Different separator
416        let data = "name\tsurname\tage\njohn\tlandy\t45\nlucy\trose\t67";
417        assert_eq!(count_records(data, 1024), 3);
418    }
419
420    #[test]
421    fn test_read_zero_copy_byte_record() -> error::Result<()> {
422        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";
423
424        let mut reader = BufferedReader::with_capacity(32, Cursor::new(csv), b',', b'"');
425        let mut records = Vec::new();
426
427        let expected = vec![
428            vec!["name", "surname", "age"],
429            vec![
430                "\"john\"",
431                "\"landy, the \"\"everlasting\"\" bastard\"",
432                "45",
433            ],
434            vec!["lucy", "rose", "\"67\""],
435            vec!["jermaine", "jackson", "\"89\""],
436            vec!["karine", "loucan", "\"52\""],
437            vec!["rose", "\"glib\"", "12"],
438            vec!["\"guillaume\"", "\"plique\"", "\"42\""],
439        ]
440        .into_iter()
441        .map(|record| {
442            record
443                .into_iter()
444                .map(|cell| cell.as_bytes().to_vec())
445                .collect::<Vec<_>>()
446        })
447        .collect::<Vec<_>>();
448
449        while let Some(record) = reader.read_zero_copy_byte_record()? {
450            records.push(record.iter().map(|cell| cell.to_vec()).collect::<Vec<_>>());
451        }
452
453        assert_eq!(records, expected);
454
455        Ok(())
456    }
457
458    #[test]
459    fn test_read_byte_record() -> error::Result<()> {
460        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";
461
462        let expected = vec![
463            brec!["name", "surname", "age"],
464            brec!["john", "landy, the \"everlasting\" bastard", "45"],
465            brec!["\"ok\"", "whatever", "dude"],
466            brec!["lucy", "rose", "67"],
467            brec!["jermaine", "jackson", "89"],
468            brec!["karine", "loucan", "52"],
469            brec!["rose", "glib", "12"],
470            brec!["guillaume", "plique", "42"],
471        ];
472
473        for capacity in [32usize, 4, 3, 2, 1] {
474            let mut reader = BufferedReader::with_capacity(capacity, Cursor::new(csv), b',', b'"');
475
476            assert_eq!(
477                reader.byte_records().collect::<Result<Vec<_>, _>>()?,
478                expected
479            );
480        }
481
482        Ok(())
483    }
484
485    #[test]
486    fn test_strip_bom() -> error::Result<()> {
487        let mut reader = BufferedReader::new(Cursor::new("name,surname,age"), b',', b'"');
488        reader.strip_bom()?;
489
490        assert_eq!(
491            reader.byte_records().next().unwrap()?,
492            brec!["name", "surname", "age"]
493        );
494
495        let mut reader =
496            BufferedReader::new(Cursor::new(b"\xef\xbb\xbfname,surname,age"), b',', b'"');
497        reader.strip_bom()?;
498
499        assert_eq!(
500            reader.byte_records().next().unwrap()?,
501            brec!["name", "surname", "age"]
502        );
503
504        Ok(())
505    }
506
507    #[test]
508    fn test_empty_row() -> error::Result<()> {
509        let data = "name\n\"\"\nlucy\n\"\"";
510
511        // Counting
512        let mut reader = BufferedReader::new(Cursor::new(data), b',', b'"');
513
514        assert_eq!(reader.count_records()?, 4);
515
516        // Zero-copy
517        let mut reader = BufferedReader::new(Cursor::new(data), b',', b'"');
518
519        let expected = vec![
520            vec!["name".as_bytes().to_vec()],
521            vec!["\"\"".as_bytes().to_vec()],
522            vec!["lucy".as_bytes().to_vec()],
523            vec!["\"\"".as_bytes().to_vec()],
524        ];
525
526        // Read
527        let mut records = Vec::new();
528
529        while let Some(record) = reader.read_zero_copy_byte_record()? {
530            records.push(vec![record.as_slice().to_vec()]);
531        }
532
533        assert_eq!(records, expected);
534
535        let reader = BufferedReader::new(Cursor::new(data), b',', b'"');
536
537        let expected = vec![brec!["name"], brec![""], brec!["lucy"], brec![""]];
538
539        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;
540
541        assert_eq!(records, expected);
542
543        Ok(())
544    }
545
546    #[test]
547    fn test_crlf() -> error::Result<()> {
548        let reader = BufferedReader::new(
549            Cursor::new("name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n"),
550            b',',
551            b'"',
552        );
553
554        let expected = vec![
555            brec!["name", "surname"],
556            brec!["lucy", "john"],
557            brec!["evan", "zhong"],
558            brec!["béatrice", "glougou"],
559        ];
560
561        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;
562
563        assert_eq!(records, expected);
564
565        Ok(())
566    }
567
568    #[test]
569    fn test_quote_always() -> error::Result<()> {
570        let reader = BufferedReader::new(
571            Cursor::new("\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\""),
572            b',',
573            b'"',
574        );
575
576        let expected = vec![
577            brec!["name", "surname"],
578            brec!["lucy", "rose"],
579            brec!["john", "mayhew"],
580        ];
581
582        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;
583
584        assert_eq!(records, expected);
585
586        Ok(())
587    }
588}