// simd_csv/reader.rs

1use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
2
3use crate::buffer::BufReaderWithPosition;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ByteRecordBuilder};
7use crate::utils::{self, trim_bom};
8
/// Configures and builds CSV [`Reader`]s and [`ReverseReader`]s.
///
/// Defaults (see the [`Default`] impl): `,` delimiter, `"` quote, an
/// 8192-byte read buffer, strict record lengths and a header row.
#[derive(Debug, Clone)]
pub struct ReaderBuilder {
    /// Byte separating fields.
    delimiter: u8,
    /// Byte used to quote fields.
    quote: u8,
    /// Capacity, in bytes, of the internal read buffer.
    buffer_capacity: usize,
    /// When `true`, records may have a different number of fields than the
    /// first record instead of producing an error.
    flexible: bool,
    /// When `true`, the first record is a header row; when `false` it is also
    /// yielded as a regular data record.
    has_headers: bool,
}

impl Default for ReaderBuilder {
    fn default() -> Self {
        Self {
            delimiter: b',',
            quote: b'"',
            buffer_capacity: 8192,
            flexible: false,
            has_headers: true,
        }
    }
}
28
29impl ReaderBuilder {
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    pub fn with_capacity(capacity: usize) -> Self {
35        let mut reader = Self::default();
36        reader.buffer_capacity(capacity);
37        reader
38    }
39
40    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
41        self.delimiter = delimiter;
42        self
43    }
44
45    pub fn quote(&mut self, quote: u8) -> &mut Self {
46        self.quote = quote;
47        self
48    }
49
50    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
51        self.buffer_capacity = capacity;
52        self
53    }
54
55    pub fn flexible(&mut self, yes: bool) -> &mut Self {
56        self.flexible = yes;
57        self
58    }
59
60    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
61        self.has_headers = yes;
62        self
63    }
64
65    pub fn from_reader<R: Read>(&self, reader: R) -> Reader<R> {
66        Reader {
67            buffer: BufReaderWithPosition::with_capacity(self.buffer_capacity, reader),
68            inner: CoreReader::new(self.delimiter, self.quote),
69            flexible: self.flexible,
70            headers: ByteRecord::new(),
71            has_read: false,
72            must_reemit_headers: !self.has_headers,
73            has_headers: self.has_headers,
74            index: 0,
75        }
76    }
77
78    pub fn reverse_from_reader<R: Read + Seek>(
79        &self,
80        mut reader: R,
81    ) -> error::Result<ReverseReader<R>> {
82        let initial_pos = reader.stream_position()?;
83
84        let mut forward_reader = self.from_reader(reader);
85        let headers = forward_reader.byte_headers()?.clone();
86        let position_after_headers = forward_reader.position();
87
88        let mut reader = forward_reader.into_inner();
89
90        let file_len = reader.seek(SeekFrom::End(0))?;
91
92        let offset = if self.has_headers {
93            initial_pos + position_after_headers
94        } else {
95            initial_pos
96        };
97
98        let reverse_io_reader = utils::ReverseReader::new(reader, file_len, offset);
99
100        Ok(ReverseReader {
101            buffer: BufReader::with_capacity(self.buffer_capacity, reverse_io_reader),
102            inner: CoreReader::new(self.delimiter, self.quote),
103            flexible: self.flexible,
104            headers,
105        })
106    }
107}
108
109pub struct Reader<R> {
110    buffer: BufReaderWithPosition<R>,
111    inner: CoreReader,
112    flexible: bool,
113    headers: ByteRecord,
114    has_read: bool,
115    must_reemit_headers: bool,
116    has_headers: bool,
117    index: u64,
118}
119
120impl<R: Read> Reader<R> {
121    pub fn from_reader(reader: R) -> Self {
122        ReaderBuilder::new().from_reader(reader)
123    }
124
125    #[inline]
126    fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
127        if self.flexible {
128            return Ok(());
129        }
130
131        if self.has_read && written != self.headers.len() {
132            return Err(Error::new(ErrorKind::UnequalLengths {
133                expected_len: self.headers.len(),
134                len: written,
135                pos: Some((
136                    byte,
137                    self.index
138                        .saturating_sub(if self.has_headers { 1 } else { 0 }),
139                )),
140            }));
141        }
142
143        Ok(())
144    }
145
146    fn read_byte_record_impl(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
147        use ReadResult::*;
148
149        record.clear();
150
151        let mut record_builder = ByteRecordBuilder::wrap(record);
152        let byte = self.position();
153
154        loop {
155            let input = self.buffer.fill_buf()?;
156
157            let (result, pos) = self.inner.read_record(input, &mut record_builder);
158
159            self.buffer.consume(pos);
160
161            match result {
162                End => {
163                    return Ok(false);
164                }
165                Cr | Lf | InputEmpty => {
166                    continue;
167                }
168                Record => {
169                    self.index += 1;
170                    self.check_field_count(byte, record.len())?;
171                    return Ok(true);
172                }
173            };
174        }
175    }
176
177    #[inline]
178    fn on_first_read(&mut self) -> error::Result<()> {
179        if self.has_read {
180            return Ok(());
181        }
182
183        // Trimming BOM
184        let input = self.buffer.fill_buf()?;
185        let bom_len = trim_bom(input);
186        self.buffer.consume(bom_len);
187
188        // Reading headers
189        let mut headers = ByteRecord::new();
190
191        let has_data = self.read_byte_record_impl(&mut headers)?;
192
193        if !has_data {
194            self.must_reemit_headers = false;
195        }
196
197        self.headers = headers;
198        self.has_read = true;
199
200        Ok(())
201    }
202
203    #[inline]
204    pub fn has_headers(&self) -> bool {
205        self.has_headers
206    }
207
208    #[inline]
209    pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
210        self.on_first_read()?;
211
212        Ok(&self.headers)
213    }
214
215    #[inline(always)]
216    pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
217        self.on_first_read()?;
218
219        if self.must_reemit_headers {
220            self.headers.clone_into(record);
221            self.must_reemit_headers = false;
222            return Ok(true);
223        }
224
225        self.read_byte_record_impl(record)
226    }
227
228    pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
229        ByteRecordsIter {
230            reader: self,
231            record: ByteRecord::new(),
232        }
233    }
234
235    pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
236        ByteRecordsIntoIter {
237            reader: self,
238            record: ByteRecord::new(),
239        }
240    }
241
242    pub fn get_ref(&self) -> &R {
243        self.buffer.get_ref()
244    }
245
246    pub fn get_mut(&mut self) -> &mut R {
247        self.buffer.get_mut()
248    }
249
250    pub fn into_inner(self) -> R {
251        self.buffer.into_inner().into_inner()
252    }
253
254    pub fn into_bufreader(self) -> BufReader<R> {
255        self.buffer.into_inner()
256    }
257
258    #[inline(always)]
259    pub fn position(&self) -> u64 {
260        if self.must_reemit_headers {
261            0
262        } else {
263            self.buffer.position()
264        }
265    }
266}
267
268pub struct ByteRecordsIter<'r, R> {
269    reader: &'r mut Reader<R>,
270    record: ByteRecord,
271}
272
273impl<R: Read> Iterator for ByteRecordsIter<'_, R> {
274    type Item = error::Result<ByteRecord>;
275
276    #[inline]
277    fn next(&mut self) -> Option<Self::Item> {
278        // NOTE: cloning the record will not carry over excess capacity
279        // because the record only contains `Vec` currently.
280        match self.reader.read_byte_record(&mut self.record) {
281            Err(err) => Some(Err(err)),
282            Ok(true) => Some(Ok(self.record.clone())),
283            Ok(false) => None,
284        }
285    }
286}
287
288pub struct ByteRecordsIntoIter<R> {
289    reader: Reader<R>,
290    record: ByteRecord,
291}
292
293impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
294    type Item = error::Result<ByteRecord>;
295
296    #[inline]
297    fn next(&mut self) -> Option<Self::Item> {
298        // NOTE: cloning the record will not carry over excess capacity
299        // because the record only contains `Vec` currently.
300        match self.reader.read_byte_record(&mut self.record) {
301            Err(err) => Some(Err(err)),
302            Ok(true) => Some(Ok(self.record.clone())),
303            Ok(false) => None,
304        }
305    }
306}
307
308pub struct ReverseReader<R> {
309    inner: CoreReader,
310    buffer: BufReader<utils::ReverseReader<R>>,
311    flexible: bool,
312    headers: ByteRecord,
313}
314
315impl<R: Read + Seek> ReverseReader<R> {
316    pub fn from_reader(reader: R) -> error::Result<Self> {
317        ReaderBuilder::new().reverse_from_reader(reader)
318    }
319
320    pub fn byte_headers(&self) -> &ByteRecord {
321        &self.headers
322    }
323
324    #[inline]
325    fn check_field_count(&mut self, written: usize) -> error::Result<()> {
326        if self.flexible {
327            return Ok(());
328        }
329
330        if written != self.headers.len() {
331            return Err(Error::new(ErrorKind::UnequalLengths {
332                expected_len: self.headers.len(),
333                len: written,
334                pos: None,
335            }));
336        }
337
338        Ok(())
339    }
340
341    pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
342        use ReadResult::*;
343
344        record.clear();
345
346        let mut record_builder = ByteRecordBuilder::wrap(record);
347
348        loop {
349            let input = self.buffer.fill_buf()?;
350
351            let (result, pos) = self.inner.read_record(input, &mut record_builder);
352
353            self.buffer.consume(pos);
354
355            match result {
356                End => {
357                    return Ok(false);
358                }
359                Cr | Lf | InputEmpty => {
360                    continue;
361                }
362                Record => {
363                    self.check_field_count(record.len())?;
364                    record.reverse();
365                    return Ok(true);
366                }
367            };
368        }
369    }
370
371    pub fn byte_records(&mut self) -> ReverseByteRecordsIter<'_, R> {
372        ReverseByteRecordsIter {
373            reader: self,
374            record: ByteRecord::new(),
375        }
376    }
377
378    pub fn into_byte_records(self) -> ReverseByteRecordsIntoIter<R> {
379        ReverseByteRecordsIntoIter {
380            reader: self,
381            record: ByteRecord::new(),
382        }
383    }
384}
385
386pub struct ReverseByteRecordsIter<'r, R> {
387    reader: &'r mut ReverseReader<R>,
388    record: ByteRecord,
389}
390
391impl<R: Read + Seek> Iterator for ReverseByteRecordsIter<'_, R> {
392    type Item = error::Result<ByteRecord>;
393
394    #[inline]
395    fn next(&mut self) -> Option<Self::Item> {
396        // NOTE: cloning the record will not carry over excess capacity
397        // because the record only contains `Vec` currently.
398        match self.reader.read_byte_record(&mut self.record) {
399            Err(err) => Some(Err(err)),
400            Ok(true) => Some(Ok(self.record.clone())),
401            Ok(false) => None,
402        }
403    }
404}
405
406pub struct ReverseByteRecordsIntoIter<R> {
407    reader: ReverseReader<R>,
408    record: ByteRecord,
409}
410
411impl<R: Read + Seek> Iterator for ReverseByteRecordsIntoIter<R> {
412    type Item = error::Result<ByteRecord>;
413
414    #[inline]
415    fn next(&mut self) -> Option<Self::Item> {
416        // NOTE: cloning the record will not carry over excess capacity
417        // because the record only contains `Vec` currently.
418        match self.reader.read_byte_record(&mut self.record) {
419            Err(err) => Some(Err(err)),
420            Ok(true) => Some(Ok(self.record.clone())),
421            Ok(false) => None,
422        }
423    }
424}
425
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use crate::brec;

    use super::*;

    impl<R: Read> Reader<R> {
        /// Test helper: default settings, but without a header row.
        fn from_reader_no_headers(reader: R) -> Self {
            ReaderBuilder::new().has_headers(false).from_reader(reader)
        }
    }

    #[test]
    fn test_read_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let expected = vec![
            brec!["name", "surname", "age"],
            brec!["john", "landy, the \"everlasting\" bastard", "45"],
            brec!["\"ok\"", "whatever", "dude"],
            brec!["lucy", "rose", "67"],
            brec!["jermaine", "jackson", "89"],
            brec!["karine", "loucan", "52"],
            brec!["rose", "glib", "12"],
            brec!["guillaume", "plique", "42"],
        ];

        // Tiny capacities force records to straddle buffer refills.
        for capacity in [32usize, 4, 3, 2, 1] {
            let mut reader = ReaderBuilder::with_capacity(capacity)
                .has_headers(false)
                .from_reader(Cursor::new(csv));

            assert_eq!(
                reader.byte_records().collect::<Result<Vec<_>, _>>()?,
                expected,
            );
        }

        Ok(())
    }

    #[test]
    fn test_strip_bom() -> error::Result<()> {
        let mut reader = Reader::from_reader_no_headers(Cursor::new("name,surname,age"));

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        let mut reader =
            Reader::from_reader_no_headers(Cursor::new(b"\xef\xbb\xbfname,surname,age"));

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        Ok(())
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let reader = Reader::from_reader_no_headers(Cursor::new(data));

        let expected = vec![brec!["name"], brec![""], brec!["lucy"], brec![""]];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_crlf() -> error::Result<()> {
        let reader = Reader::from_reader_no_headers(Cursor::new(
            "name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n",
        ));

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "john"],
            brec!["evan", "zhong"],
            brec!["béatrice", "glougou"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_quote_always() -> error::Result<()> {
        let reader = Reader::from_reader_no_headers(Cursor::new(
            "\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\"",
        ));

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "rose"],
            brec!["john", "mayhew"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // Headers, call before read
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["john", "dandy"]
        );

        // Headers, call after read
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["john", "dandy"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // No headers, call before read
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname"]
        );

        // No headers, call after read
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Headers, empty
        let mut reader = Reader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        // No headers, empty
        let mut reader = Reader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        Ok(())
    }

    #[test]
    fn test_weirdness() -> error::Result<()> {
        // Data after quotes, before next delimiter
        let data =
            b"name,surname\n\"test\"  \"wat\", ok\ntest \"wat\",ok  \ntest,\"whatever\"  ok\n\"test\"   there,\"ok\"\r\n";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));

        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        let expected = vec![
            brec!["name", "surname"],
            brec!["test  \"wat", " ok"],
            brec!["test \"wat", "ok  "],
            brec!["test", "whatever  ok"],
            brec!["test   there", "ok"],
        ];

        assert_eq!(records, expected);

        // Stray carriage returns between records
        let data = b"name,surname\n\r\rjohn,coucou";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(
            records,
            vec![brec!["name", "surname"], brec!["john", "coucou"]]
        );

        Ok(())
    }

    // Previously kept as commented-out code; now compiled and runnable with
    // `cargo test -- --ignored`.
    #[test]
    #[ignore = "pending: quote inside an unquoted field"]
    fn test_quote_inside_unquoted_field() -> error::Result<()> {
        let data = "aaa\"aaa,bbb";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        let record = reader.byte_records().next().unwrap()?;

        assert_eq!(record, brec!["aaa\"aaa", "bbb"]);

        Ok(())
    }

    #[test]
    fn test_unequal_lengths() -> error::Result<()> {
        let data = "name,surname\njohn\n";

        // Strict (default): a record shorter than the headers is an error.
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert!(reader.byte_records().next().unwrap().is_err());

        // Flexible: the short record is yielded as-is.
        let mut reader = ReaderBuilder::new()
            .flexible(true)
            .from_reader(Cursor::new(data));
        assert_eq!(reader.byte_records().next().unwrap()?, brec!["john"]);

        Ok(())
    }

    #[test]
    fn test_position() -> error::Result<()> {
        let data = b"name,surname\njohnny,landis crue\nbabka,bob caterpillar\n";

        let mut reader = Reader::from_reader(&data[..]);
        let mut record = ByteRecord::new();

        let mut positions = vec![reader.position()];

        reader.byte_headers()?;

        positions.push(reader.position());

        while reader.read_byte_record(&mut record)? {
            positions.push(reader.position());
        }

        assert_eq!(positions, vec![0, 13, 32, 54]);

        let mut reader = ReaderBuilder::new()
            .has_headers(false)
            .from_reader(&data[..]);

        reader.byte_headers()?;

        assert_eq!(reader.position(), 0);

        Ok(())
    }

    #[test]
    fn test_reverse_reader() -> error::Result<()> {
        let data = b"name,surname\njohn,landis\nbeatrice,babka\nevan,michalak";
        let mut reader = ReverseReader::from_reader(Cursor::new(data))?;

        assert_eq!(
            reader.byte_records().collect::<Result<Vec<_>, _>>()?,
            vec![
                brec!["evan", "michalak"],
                brec!["beatrice", "babka"],
                brec!["john", "landis"]
            ]
        );

        assert_eq!(reader.byte_headers(), &brec!["name", "surname"]);

        Ok(())
    }
}