simd_csv/
reader.rs

1use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
2
3use crate::buffer::BufReaderWithPosition;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ByteRecordBuilder};
7use crate::utils::{self, trim_bom};
8
/// Configures and builds a forward [`Reader`] or a [`ReverseReader`].
///
/// Setters take `&mut self` and return it so they can be chained, e.g.
/// `ReaderBuilder::new().has_headers(false).from_reader(input)`.
#[derive(Debug, Clone)]
pub struct ReaderBuilder {
    /// Byte separating fields (`,` by default).
    delimiter: u8,
    /// Byte used to quote fields (`"` by default).
    quote: u8,
    /// Capacity in bytes of the internal read buffer (8192 by default).
    buffer_capacity: usize,
    /// When `true`, records may have a different number of fields than the
    /// first row without raising an error (`false` by default).
    flexible: bool,
    /// Whether the first row is a header row rather than data (`true` by default).
    has_headers: bool,
}

impl Default for ReaderBuilder {
    fn default() -> Self {
        Self {
            delimiter: b',',
            quote: b'"',
            buffer_capacity: 8192,
            flexible: false,
            has_headers: true,
        }
    }
}
28
29impl ReaderBuilder {
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    pub fn with_capacity(capacity: usize) -> Self {
35        let mut reader = Self::default();
36        reader.buffer_capacity(capacity);
37        reader
38    }
39
40    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
41        self.delimiter = delimiter;
42        self
43    }
44
45    pub fn quote(&mut self, quote: u8) -> &mut Self {
46        self.quote = quote;
47        self
48    }
49
50    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
51        self.buffer_capacity = capacity;
52        self
53    }
54
55    pub fn flexible(&mut self, yes: bool) -> &mut Self {
56        self.flexible = yes;
57        self
58    }
59
60    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
61        self.has_headers = yes;
62        self
63    }
64
65    pub fn from_reader<R: Read>(&self, reader: R) -> Reader<R> {
66        Reader {
67            buffer: BufReaderWithPosition::with_capacity(self.buffer_capacity, reader),
68            inner: CoreReader::new(self.delimiter, self.quote),
69            flexible: self.flexible,
70            headers: ByteRecord::new(),
71            has_read: false,
72            must_reemit_headers: !self.has_headers,
73            has_headers: self.has_headers,
74            index: 0,
75        }
76    }
77
78    pub fn reverse_from_reader<R: Read + Seek>(
79        &self,
80        mut reader: R,
81    ) -> error::Result<ReverseReader<R>> {
82        let initial_pos = reader.stream_position()?;
83
84        let mut forward_reader = self.from_reader(reader);
85        let headers = forward_reader.byte_headers()?.clone();
86        let position_after_headers = forward_reader.position();
87
88        let mut reader = forward_reader.into_inner();
89
90        let file_len = reader.seek(SeekFrom::End(0))?;
91
92        let offset = if self.has_headers {
93            initial_pos + position_after_headers
94        } else {
95            initial_pos
96        };
97
98        let reverse_io_reader = utils::ReverseReader::new(reader, file_len, offset);
99
100        Ok(ReverseReader {
101            buffer: BufReader::with_capacity(self.buffer_capacity, reverse_io_reader),
102            inner: CoreReader::new(self.delimiter, self.quote),
103            flexible: self.flexible,
104            headers,
105        })
106    }
107}
108
109pub struct Reader<R> {
110    buffer: BufReaderWithPosition<R>,
111    inner: CoreReader,
112    flexible: bool,
113    headers: ByteRecord,
114    has_read: bool,
115    must_reemit_headers: bool,
116    has_headers: bool,
117    index: u64,
118}
119
120impl<R: Read> Reader<R> {
121    pub fn from_reader(reader: R) -> Self {
122        ReaderBuilder::new().from_reader(reader)
123    }
124
125    #[inline]
126    fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
127        if self.flexible {
128            return Ok(());
129        }
130
131        if self.has_read && written != self.headers.len() {
132            return Err(Error::new(ErrorKind::UnequalLengths {
133                expected_len: self.headers.len(),
134                len: written,
135                pos: Some((
136                    byte,
137                    self.index
138                        .saturating_sub(if self.has_headers { 1 } else { 0 }),
139                )),
140            }));
141        }
142
143        Ok(())
144    }
145
146    fn read_byte_record_impl(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
147        use ReadResult::*;
148
149        record.clear();
150
151        let mut record_builder = ByteRecordBuilder::wrap(record);
152        let byte = self.position();
153
154        loop {
155            let input = self.buffer.fill_buf()?;
156
157            let (result, pos) = self.inner.read_record(input, &mut record_builder);
158
159            self.buffer.consume(pos);
160
161            match result {
162                End => {
163                    return Ok(false);
164                }
165                Cr | Lf | InputEmpty => {
166                    continue;
167                }
168                Record => {
169                    self.index += 1;
170                    self.check_field_count(byte, record.len())?;
171                    return Ok(true);
172                }
173            };
174        }
175    }
176
177    #[inline]
178    fn on_first_read(&mut self) -> error::Result<()> {
179        if self.has_read {
180            return Ok(());
181        }
182
183        // Trimming BOM
184        let input = self.buffer.fill_buf()?;
185        let bom_len = trim_bom(input);
186        self.buffer.consume(bom_len);
187
188        // Reading headers
189        let mut headers = ByteRecord::new();
190
191        let has_data = self.read_byte_record_impl(&mut headers)?;
192
193        if !has_data {
194            self.must_reemit_headers = false;
195        }
196
197        self.headers = headers;
198        self.has_read = true;
199
200        Ok(())
201    }
202
203    #[inline]
204    pub fn has_headers(&self) -> bool {
205        self.has_headers
206    }
207
208    #[inline]
209    pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
210        self.on_first_read()?;
211
212        Ok(&self.headers)
213    }
214
215    #[inline(always)]
216    pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
217        self.on_first_read()?;
218
219        if self.must_reemit_headers {
220            self.headers.clone_into(record);
221            self.must_reemit_headers = false;
222            return Ok(true);
223        }
224
225        self.read_byte_record_impl(record)
226    }
227
228    pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
229        ByteRecordsIter {
230            reader: self,
231            record: ByteRecord::new(),
232        }
233    }
234
235    pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
236        ByteRecordsIntoIter {
237            reader: self,
238            record: ByteRecord::new(),
239        }
240    }
241
242    pub fn get_ref(&self) -> &R {
243        self.buffer.get_ref()
244    }
245
246    pub fn get_mut(&mut self) -> &mut R {
247        self.buffer.get_mut()
248    }
249
250    pub fn into_inner(self) -> R {
251        self.buffer.into_inner().into_inner()
252    }
253
254    pub fn into_bufreader(self) -> (Option<ByteRecord>, BufReader<R>) {
255        (
256            self.must_reemit_headers.then_some(self.headers),
257            self.buffer.into_inner(),
258        )
259    }
260
261    #[inline(always)]
262    pub fn position(&self) -> u64 {
263        if self.must_reemit_headers {
264            0
265        } else {
266            self.buffer.position()
267        }
268    }
269}
270
271pub struct ByteRecordsIter<'r, R> {
272    reader: &'r mut Reader<R>,
273    record: ByteRecord,
274}
275
276impl<R: Read> Iterator for ByteRecordsIter<'_, R> {
277    type Item = error::Result<ByteRecord>;
278
279    #[inline]
280    fn next(&mut self) -> Option<Self::Item> {
281        // NOTE: cloning the record will not carry over excess capacity
282        // because the record only contains `Vec` currently.
283        match self.reader.read_byte_record(&mut self.record) {
284            Err(err) => Some(Err(err)),
285            Ok(true) => Some(Ok(self.record.clone())),
286            Ok(false) => None,
287        }
288    }
289}
290
291pub struct ByteRecordsIntoIter<R> {
292    reader: Reader<R>,
293    record: ByteRecord,
294}
295
296impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
297    type Item = error::Result<ByteRecord>;
298
299    #[inline]
300    fn next(&mut self) -> Option<Self::Item> {
301        // NOTE: cloning the record will not carry over excess capacity
302        // because the record only contains `Vec` currently.
303        match self.reader.read_byte_record(&mut self.record) {
304            Err(err) => Some(Err(err)),
305            Ok(true) => Some(Ok(self.record.clone())),
306            Ok(false) => None,
307        }
308    }
309}
310
311pub struct ReverseReader<R> {
312    inner: CoreReader,
313    buffer: BufReader<utils::ReverseReader<R>>,
314    flexible: bool,
315    headers: ByteRecord,
316}
317
318impl<R: Read + Seek> ReverseReader<R> {
319    pub fn from_reader(reader: R) -> error::Result<Self> {
320        ReaderBuilder::new().reverse_from_reader(reader)
321    }
322
323    pub fn byte_headers(&self) -> &ByteRecord {
324        &self.headers
325    }
326
327    #[inline]
328    fn check_field_count(&mut self, written: usize) -> error::Result<()> {
329        if self.flexible {
330            return Ok(());
331        }
332
333        if written != self.headers.len() {
334            return Err(Error::new(ErrorKind::UnequalLengths {
335                expected_len: self.headers.len(),
336                len: written,
337                pos: None,
338            }));
339        }
340
341        Ok(())
342    }
343
344    pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
345        use ReadResult::*;
346
347        record.clear();
348
349        let mut record_builder = ByteRecordBuilder::wrap(record);
350
351        loop {
352            let input = self.buffer.fill_buf()?;
353
354            let (result, pos) = self.inner.read_record(input, &mut record_builder);
355
356            self.buffer.consume(pos);
357
358            match result {
359                End => {
360                    return Ok(false);
361                }
362                Cr | Lf | InputEmpty => {
363                    continue;
364                }
365                Record => {
366                    self.check_field_count(record.len())?;
367                    record.reverse();
368                    return Ok(true);
369                }
370            };
371        }
372    }
373
374    pub fn byte_records(&mut self) -> ReverseByteRecordsIter<'_, R> {
375        ReverseByteRecordsIter {
376            reader: self,
377            record: ByteRecord::new(),
378        }
379    }
380
381    pub fn into_byte_records(self) -> ReverseByteRecordsIntoIter<R> {
382        ReverseByteRecordsIntoIter {
383            reader: self,
384            record: ByteRecord::new(),
385        }
386    }
387}
388
389pub struct ReverseByteRecordsIter<'r, R> {
390    reader: &'r mut ReverseReader<R>,
391    record: ByteRecord,
392}
393
394impl<R: Read + Seek> Iterator for ReverseByteRecordsIter<'_, R> {
395    type Item = error::Result<ByteRecord>;
396
397    #[inline]
398    fn next(&mut self) -> Option<Self::Item> {
399        // NOTE: cloning the record will not carry over excess capacity
400        // because the record only contains `Vec` currently.
401        match self.reader.read_byte_record(&mut self.record) {
402            Err(err) => Some(Err(err)),
403            Ok(true) => Some(Ok(self.record.clone())),
404            Ok(false) => None,
405        }
406    }
407}
408
409pub struct ReverseByteRecordsIntoIter<R> {
410    reader: ReverseReader<R>,
411    record: ByteRecord,
412}
413
414impl<R: Read + Seek> Iterator for ReverseByteRecordsIntoIter<R> {
415    type Item = error::Result<ByteRecord>;
416
417    #[inline]
418    fn next(&mut self) -> Option<Self::Item> {
419        // NOTE: cloning the record will not carry over excess capacity
420        // because the record only contains `Vec` currently.
421        match self.reader.read_byte_record(&mut self.record) {
422            Err(err) => Some(Err(err)),
423            Ok(true) => Some(Ok(self.record.clone())),
424            Ok(false) => None,
425        }
426    }
427}
428
// Unit tests for the forward `Reader` and the `ReverseReader`.
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use crate::brec;

    use super::*;

    // Test-only constructor: the first row is treated as data, not headers.
    impl<R: Read> Reader<R> {
        fn from_reader_no_headers(reader: R) -> Self {
            ReaderBuilder::new().has_headers(false).from_reader(reader)
        }
    }

    // Quoted fields, doubled-quote escapes, an empty line and a trailing CRLF.
    // The input is parsed with buffer capacities down to one byte, so records
    // and quotes straddle buffer refills.
    #[test]
    fn test_read_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        // The empty line yields no record.
        let expected = vec![
            brec!["name", "surname", "age"],
            brec!["john", "landy, the \"everlasting\" bastard", "45"],
            brec!["\"ok\"", "whatever", "dude"],
            brec!["lucy", "rose", "67"],
            brec!["jermaine", "jackson", "89"],
            brec!["karine", "loucan", "52"],
            brec!["rose", "glib", "12"],
            brec!["guillaume", "plique", "42"],
        ];

        for capacity in [32usize, 4, 3, 2, 1] {
            let mut reader = ReaderBuilder::with_capacity(capacity)
                .has_headers(false)
                .from_reader(Cursor::new(csv));

            assert_eq!(
                reader.byte_records().collect::<Result<Vec<_>, _>>()?,
                expected,
            );
        }

        Ok(())
    }

    // Input with and without a leading UTF-8 BOM (EF BB BF) yields the same
    // first record.
    #[test]
    fn test_strip_bom() -> error::Result<()> {
        let mut reader = Reader::from_reader_no_headers(Cursor::new("name,surname,age"));

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        let mut reader =
            Reader::from_reader_no_headers(Cursor::new(b"\xef\xbb\xbfname,surname,age"));

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        Ok(())
    }

    // A quoted empty field (`""`) produces a record with one empty field,
    // including on the last line without a trailing newline.
    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        // Read
        let reader = Reader::from_reader_no_headers(Cursor::new(data));

        let expected = vec![brec!["name"], brec![""], brec!["lucy"], brec![""]];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // CRLF line endings and non-ASCII (UTF-8) bytes inside fields.
    #[test]
    fn test_crlf() -> error::Result<()> {
        let reader = Reader::from_reader_no_headers(Cursor::new(
            "name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n",
        ));

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "john"],
            brec!["evan", "zhong"],
            brec!["béatrice", "glougou"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // Every field quoted; the last line has no trailing newline.
    #[test]
    fn test_quote_always() -> error::Result<()> {
        let reader = Reader::from_reader_no_headers(Cursor::new(
            "\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\"",
        ));

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "rose"],
            brec!["john", "mayhew"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // `byte_headers` must return the first row regardless of whether it is
    // called before or after reading, with or without headers, and an empty
    // record for empty input.
    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // Headers, call before read
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["john", "dandy"]
        );

        // Headers, call after read
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["john", "dandy"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // No headers, call before read: the first row is still yielded.
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname"]
        );

        // No headers, call after read
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Headers, empty
        let mut reader = Reader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        // No headers, empty
        let mut reader = Reader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        Ok(())
    }

    // Malformed-but-accepted input: data after a closing quote is kept
    // verbatim in the field, and lone CRs between records are skipped.
    #[test]
    fn test_weirdness() -> error::Result<()> {
        // Data after quotes, before next delimiter
        let data =
            b"name,surname\n\"test\"  \"wat\", ok\ntest \"wat\",ok  \ntest,\"whatever\"  ok\n\"test\"   there,\"ok\"\r\n";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));

        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        let expected = vec![
            brec!["name", "surname"],
            brec!["test  \"wat", " ok"],
            brec!["test \"wat", "ok  "],
            brec!["test", "whatever  ok"],
            brec!["test   there", "ok"],
        ];

        assert_eq!(records, expected);

        // NOTE(review): disabled case — a stray quote inside an unquoted field.
        // Presumably not yet parsed as expected; confirm before re-enabling.
        // let data = "aaa\"aaa,bbb";
        // let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        // let record = reader.byte_records().next().unwrap().unwrap();

        // assert_eq!(record, brec!["aaa\"aaa", "bbb"]);

        let data = b"name,surname\n\r\rjohn,coucou";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(
            records,
            vec![brec!["name", "surname"], brec!["john", "coucou"]]
        );

        Ok(())
    }

    // `position()` tracks the byte offset after the headers and after each
    // record. Without headers it stays at 0 after `byte_headers`, because the
    // first row has not been yielded yet.
    #[test]
    fn test_position() -> error::Result<()> {
        let data = b"name,surname\njohnny,landis crue\nbabka,bob caterpillar\n";

        let mut reader = Reader::from_reader(&data[..]);
        let mut record = ByteRecord::new();

        let mut positions = vec![reader.position()];

        reader.byte_headers()?;

        positions.push(reader.position());

        while reader.read_byte_record(&mut record)? {
            positions.push(reader.position());
        }

        assert_eq!(positions, vec![0, 13, 32, 54]);

        let mut reader = ReaderBuilder::new()
            .has_headers(false)
            .from_reader(&data[..]);

        reader.byte_headers()?;

        assert_eq!(reader.position(), 0);

        Ok(())
    }

    // Records come back last-first, the header row is excluded from
    // iteration, and the headers remain available.
    #[test]
    fn test_reverse_reader() -> error::Result<()> {
        let data = b"name,surname\njohn,landis\nbeatrice,babka\nevan,michalak";
        let mut reader = ReverseReader::from_reader(Cursor::new(data))?;

        assert_eq!(
            reader.byte_records().collect::<Result<Vec<_>, _>>()?,
            vec![
                brec!["evan", "michalak"],
                brec!["beatrice", "babka"],
                brec!["john", "landis"]
            ]
        );

        assert_eq!(reader.byte_headers(), &brec!["name", "surname"]);

        Ok(())
    }
}