simd_csv/
reader.rs

1use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
2
3use crate::buffer::BufReaderWithPosition;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ByteRecordBuilder};
7use crate::utils::{self, trim_bom};
8
/// Configurable factory for [`Reader`] and [`ReverseReader`] instances.
///
/// Start from [`ReaderBuilder::new`] (or [`ReaderBuilder::with_capacity`]),
/// adjust the settings through the chainable setters, then call
/// [`ReaderBuilder::from_reader`] or [`ReaderBuilder::reverse_from_reader`].
pub struct ReaderBuilder {
    /// Byte separating two fields of a record.
    delimiter: u8,
    /// Byte used to quote fields.
    quote: u8,
    /// Capacity handed to the buffered reader wrapping the input.
    buffer_capacity: usize,
    /// Whether records may have a different number of fields than the first
    /// one.
    flexible: bool,
    /// Whether the first record must be understood as a header.
    has_headers: bool,
}
17
18impl Default for ReaderBuilder {
19    fn default() -> Self {
20        Self {
21            delimiter: b',',
22            quote: b'"',
23            buffer_capacity: 8192,
24            flexible: false,
25            has_headers: true,
26        }
27    }
28}
29
30impl ReaderBuilder {
31    /// Create a new [`ReaderBuilder`] with default configuration.
32    pub fn new() -> Self {
33        Self::default()
34    }
35
36    /// Create a new [`ReaderBuilder`] with provided `capacity`.
37    pub fn with_capacity(capacity: usize) -> Self {
38        let mut reader = Self::default();
39        reader.buffer_capacity(capacity);
40        reader
41    }
42
43    /// Set the delimiter to be used by the created [`Reader`].
44    ///
45    /// This delimiter must be a single byte.
46    ///
47    /// Will default to a comma.
48    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
49        self.delimiter = delimiter;
50        self
51    }
52
53    /// Set the quote char to be used by the created [`Reader`].
54    ///
55    /// This char must be a single byte.
56    ///
57    /// Will default to a double quote.
58    pub fn quote(&mut self, quote: u8) -> &mut Self {
59        self.quote = quote;
60        self
61    }
62
63    /// Set the capacity of the created [`Reader`]'s buffered reader.
64    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
65        self.buffer_capacity = capacity;
66        self
67    }
68
69    /// Indicate whether the created [`Reader`] should be "flexible", i.e.
70    /// whether it should allow reading records having different number of
71    /// fields than the first one.
72    ///
73    /// Will default to `false`.
74    pub fn flexible(&mut self, yes: bool) -> &mut Self {
75        self.flexible = yes;
76        self
77    }
78
79    /// Indicate whether first record must be understood as a header.
80    ///
81    /// Will default to `true`.
82    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
83        self.has_headers = yes;
84        self
85    }
86
87    /// Create a new [`Reader`] using the provided reader implementing
88    /// [`std::io::Read`].
89    pub fn from_reader<R: Read>(&self, reader: R) -> Reader<R> {
90        Reader {
91            buffer: BufReaderWithPosition::with_capacity(self.buffer_capacity, reader),
92            inner: CoreReader::new(self.delimiter, self.quote),
93            flexible: self.flexible,
94            headers: ByteRecord::new(),
95            has_read: false,
96            must_reemit_headers: !self.has_headers,
97            has_headers: self.has_headers,
98            index: 0,
99        }
100    }
101
102    /// Create a new [`ReverseReader`] using the provided reader implementing
103    /// both [`std::io::Read`] and [`std::io::Seek`].
104    pub fn reverse_from_reader<R: Read + Seek>(
105        &self,
106        mut reader: R,
107    ) -> error::Result<ReverseReader<R>> {
108        let initial_pos = reader.stream_position()?;
109
110        let mut forward_reader = self.from_reader(reader);
111        let headers = forward_reader.byte_headers()?.clone();
112        let position_after_headers = forward_reader.position();
113
114        let mut reader = forward_reader.into_inner();
115
116        let file_len = reader.seek(SeekFrom::End(0))?;
117
118        let offset = if self.has_headers {
119            initial_pos + position_after_headers
120        } else {
121            initial_pos
122        };
123
124        let reverse_io_reader = utils::ReverseReader::new(reader, file_len, offset);
125
126        Ok(ReverseReader {
127            buffer: BufReader::with_capacity(self.buffer_capacity, reverse_io_reader),
128            inner: CoreReader::new(self.delimiter, self.quote),
129            flexible: self.flexible,
130            headers,
131        })
132    }
133}
134
135/// An already configured copying/unescaping CSV reader.
136///
137/// # Configuration
138///
139/// To configure a [`Reader`], if you need a custom delimiter for instance of if
140/// you want to tweak the size of the inner buffer. Check out the
141/// [`ReaderBuilder`].
142pub struct Reader<R> {
143    buffer: BufReaderWithPosition<R>,
144    inner: CoreReader,
145    flexible: bool,
146    headers: ByteRecord,
147    has_read: bool,
148    must_reemit_headers: bool,
149    has_headers: bool,
150    index: u64,
151}
152
153impl<R: Read> Reader<R> {
154    /// Create a new reader with default configuration using the provided reader
155    /// implementing [`std::io::Read`].
156    pub fn from_reader(reader: R) -> Self {
157        ReaderBuilder::new().from_reader(reader)
158    }
159
160    #[inline]
161    fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
162        if self.flexible {
163            return Ok(());
164        }
165
166        if self.has_read && written != self.headers.len() {
167            return Err(Error::new(ErrorKind::UnequalLengths {
168                expected_len: self.headers.len(),
169                len: written,
170                pos: Some((
171                    byte,
172                    self.index
173                        .saturating_sub(if self.has_headers { 1 } else { 0 }),
174                )),
175            }));
176        }
177
178        Ok(())
179    }
180
181    fn read_byte_record_impl(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
182        use ReadResult::*;
183
184        record.clear();
185
186        let mut record_builder = ByteRecordBuilder::wrap(record);
187        let byte = self.position();
188
189        loop {
190            let input = self.buffer.fill_buf()?;
191
192            let (result, pos) = self.inner.read_record(input, &mut record_builder);
193
194            self.buffer.consume(pos);
195
196            match result {
197                End => {
198                    return Ok(false);
199                }
200                Cr | Lf | InputEmpty => {
201                    continue;
202                }
203                Record => {
204                    self.index += 1;
205                    self.check_field_count(byte, record.len())?;
206                    return Ok(true);
207                }
208            };
209        }
210    }
211
212    #[inline]
213    fn on_first_read(&mut self) -> error::Result<()> {
214        if self.has_read {
215            return Ok(());
216        }
217
218        // Trimming BOM
219        let input = self.buffer.fill_buf()?;
220        let bom_len = trim_bom(input);
221        self.buffer.consume(bom_len);
222
223        // Reading headers
224        let mut headers = ByteRecord::new();
225
226        let has_data = self.read_byte_record_impl(&mut headers)?;
227
228        if !has_data {
229            self.must_reemit_headers = false;
230        }
231
232        self.headers = headers;
233        self.has_read = true;
234
235        Ok(())
236    }
237
238    /// Returns whether this reader has been configured to interpret the first
239    /// record as a header.
240    #[inline]
241    pub fn has_headers(&self) -> bool {
242        self.has_headers
243    }
244
245    /// Attempt to return a reference to this reader's first record.
246    #[inline]
247    pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
248        self.on_first_read()?;
249
250        Ok(&self.headers)
251    }
252
253    /// Attempt to read the next CSV record into a pre-allocated [`ByteRecord`].
254    ///
255    /// Returns a boolean indicating whether a record was actually read or if we
256    /// reached the end of the stream.
257    #[inline(always)]
258    pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
259        self.on_first_read()?;
260
261        if self.must_reemit_headers {
262            self.headers.clone_into(record);
263            self.must_reemit_headers = false;
264            return Ok(true);
265        }
266
267        self.read_byte_record_impl(record)
268    }
269
270    /// Return an iterator yielding [`ByteRecord`] structs.
271    pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
272        ByteRecordsIter {
273            reader: self,
274            record: ByteRecord::new(),
275        }
276    }
277
278    /// Transform the reader into an iterator yielding [`ByteRecord`] structs.
279    pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
280        ByteRecordsIntoIter {
281            reader: self,
282            record: ByteRecord::new(),
283        }
284    }
285
286    /// Get an immutable reference to the underlying reader.
287    pub fn get_ref(&self) -> &R {
288        self.buffer.get_ref()
289    }
290
291    /// Get a mutable reference to the underlying reader.
292    pub fn get_mut(&mut self) -> &mut R {
293        self.buffer.get_mut()
294    }
295
296    /// Unwrap into the underlying reader.
297    ///
298    /// **BEWARE**: any already buffered data will be lost!
299    pub fn into_inner(self) -> R {
300        self.buffer.into_inner().into_inner()
301    }
302
303    /// Unwrap into an optional first record (only when the reader was
304    /// configured not to interpret the first record as a header, and when the
305    /// first record was pre-buffered but not yet reemitted), and the underlying
306    /// [`BufReader`].
307    pub fn into_bufreader(self) -> (Option<ByteRecord>, BufReader<R>) {
308        (
309            self.must_reemit_headers.then_some(self.headers),
310            self.buffer.into_inner(),
311        )
312    }
313
314    /// Returns the current byte offset of the reader in the wrapped stream.
315    #[inline(always)]
316    pub fn position(&self) -> u64 {
317        if self.must_reemit_headers {
318            0
319        } else {
320            self.buffer.position()
321        }
322    }
323}
324
325pub struct ByteRecordsIter<'r, R> {
326    reader: &'r mut Reader<R>,
327    record: ByteRecord,
328}
329
330impl<R: Read> Iterator for ByteRecordsIter<'_, R> {
331    type Item = error::Result<ByteRecord>;
332
333    #[inline]
334    fn next(&mut self) -> Option<Self::Item> {
335        // NOTE: cloning the record will not carry over excess capacity
336        // because the record only contains `Vec` currently.
337        match self.reader.read_byte_record(&mut self.record) {
338            Err(err) => Some(Err(err)),
339            Ok(true) => Some(Ok(self.record.clone())),
340            Ok(false) => None,
341        }
342    }
343}
344
345pub struct ByteRecordsIntoIter<R> {
346    reader: Reader<R>,
347    record: ByteRecord,
348}
349
350impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
351    type Item = error::Result<ByteRecord>;
352
353    #[inline]
354    fn next(&mut self) -> Option<Self::Item> {
355        // NOTE: cloning the record will not carry over excess capacity
356        // because the record only contains `Vec` currently.
357        match self.reader.read_byte_record(&mut self.record) {
358            Err(err) => Some(Err(err)),
359            Ok(true) => Some(Ok(self.record.clone())),
360            Ok(false) => None,
361        }
362    }
363}
364
365/// An already configured reverse CSV reader.
366///
367/// # Configuration
368///
369/// To configure a [`ReverseReader`], if you need a custom delimiter for instance of if
370/// you want to tweak the size of the inner buffer. Check out the
371/// [`ReaderBuilder`].
372pub struct ReverseReader<R> {
373    inner: CoreReader,
374    buffer: BufReader<utils::ReverseReader<R>>,
375    flexible: bool,
376    headers: ByteRecord,
377}
378
379impl<R: Read + Seek> ReverseReader<R> {
380    /// Create a new reverse reader with default configuration using the
381    /// provided reader implementing both [`std::io::Read`] and
382    /// [`std::io::Seek`].
383    pub fn from_reader(reader: R) -> error::Result<Self> {
384        ReaderBuilder::new().reverse_from_reader(reader)
385    }
386
387    /// Attempt to return a reference to this reader's first record.
388    pub fn byte_headers(&self) -> &ByteRecord {
389        &self.headers
390    }
391
392    #[inline]
393    fn check_field_count(&mut self, written: usize) -> error::Result<()> {
394        if self.flexible {
395            return Ok(());
396        }
397
398        if written != self.headers.len() {
399            return Err(Error::new(ErrorKind::UnequalLengths {
400                expected_len: self.headers.len(),
401                len: written,
402                pos: None,
403            }));
404        }
405
406        Ok(())
407    }
408
409    /// Attempt to read the next CSV record into a pre-allocated [`ByteRecord`].
410    ///
411    /// Returns a boolean indicating whether a record was actually read or if we
412    /// reached the end of the stream.
413    pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
414        use ReadResult::*;
415
416        record.clear();
417
418        let mut record_builder = ByteRecordBuilder::wrap(record);
419
420        loop {
421            let input = self.buffer.fill_buf()?;
422
423            let (result, pos) = self.inner.read_record(input, &mut record_builder);
424
425            self.buffer.consume(pos);
426
427            match result {
428                End => {
429                    return Ok(false);
430                }
431                Cr | Lf | InputEmpty => {
432                    continue;
433                }
434                Record => {
435                    self.check_field_count(record.len())?;
436                    record.reverse();
437                    return Ok(true);
438                }
439            };
440        }
441    }
442
443    /// Return an iterator yielding [`ByteRecord`] structs.
444    pub fn byte_records(&mut self) -> ReverseByteRecordsIter<'_, R> {
445        ReverseByteRecordsIter {
446            reader: self,
447            record: ByteRecord::new(),
448        }
449    }
450
451    /// Transform the reader into an iterator yielding [`ByteRecord`] structs.
452    pub fn into_byte_records(self) -> ReverseByteRecordsIntoIter<R> {
453        ReverseByteRecordsIntoIter {
454            reader: self,
455            record: ByteRecord::new(),
456        }
457    }
458}
459
460pub struct ReverseByteRecordsIter<'r, R> {
461    reader: &'r mut ReverseReader<R>,
462    record: ByteRecord,
463}
464
465impl<R: Read + Seek> Iterator for ReverseByteRecordsIter<'_, R> {
466    type Item = error::Result<ByteRecord>;
467
468    #[inline]
469    fn next(&mut self) -> Option<Self::Item> {
470        // NOTE: cloning the record will not carry over excess capacity
471        // because the record only contains `Vec` currently.
472        match self.reader.read_byte_record(&mut self.record) {
473            Err(err) => Some(Err(err)),
474            Ok(true) => Some(Ok(self.record.clone())),
475            Ok(false) => None,
476        }
477    }
478}
479
480pub struct ReverseByteRecordsIntoIter<R> {
481    reader: ReverseReader<R>,
482    record: ByteRecord,
483}
484
485impl<R: Read + Seek> Iterator for ReverseByteRecordsIntoIter<R> {
486    type Item = error::Result<ByteRecord>;
487
488    #[inline]
489    fn next(&mut self) -> Option<Self::Item> {
490        // NOTE: cloning the record will not carry over excess capacity
491        // because the record only contains `Vec` currently.
492        match self.reader.read_byte_record(&mut self.record) {
493            Err(err) => Some(Err(err)),
494            Ok(true) => Some(Ok(self.record.clone())),
495            Ok(false) => None,
496        }
497    }
498}
499
500#[cfg(test)]
501mod tests {
502    use std::io::Cursor;
503
504    use super::*;
505
506    impl<R: Read> Reader<R> {
507        fn from_reader_no_headers(reader: R) -> Self {
508            ReaderBuilder::new().has_headers(false).from_reader(reader)
509        }
510    }
511
512    #[test]
513    fn test_read_byte_record() -> error::Result<()> {
514        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";
515
516        let expected = vec![
517            brec!["name", "surname", "age"],
518            brec!["john", "landy, the \"everlasting\" bastard", "45"],
519            brec!["\"ok\"", "whatever", "dude"],
520            brec!["lucy", "rose", "67"],
521            brec!["jermaine", "jackson", "89"],
522            brec!["karine", "loucan", "52"],
523            brec!["rose", "glib", "12"],
524            brec!["guillaume", "plique", "42"],
525        ];
526
527        for capacity in [32usize, 4, 3, 2, 1] {
528            let mut reader = ReaderBuilder::with_capacity(capacity)
529                .has_headers(false)
530                .from_reader(Cursor::new(csv));
531
532            assert_eq!(
533                reader.byte_records().collect::<Result<Vec<_>, _>>()?,
534                expected,
535            );
536        }
537
538        Ok(())
539    }
540
541    #[test]
542    fn test_strip_bom() -> error::Result<()> {
543        let mut reader = Reader::from_reader_no_headers(Cursor::new("name,surname,age"));
544
545        assert_eq!(
546            reader.byte_records().next().unwrap()?,
547            brec!["name", "surname", "age"]
548        );
549
550        let mut reader =
551            Reader::from_reader_no_headers(Cursor::new(b"\xef\xbb\xbfname,surname,age"));
552
553        assert_eq!(
554            reader.byte_records().next().unwrap()?,
555            brec!["name", "surname", "age"]
556        );
557
558        Ok(())
559    }
560
561    #[test]
562    fn test_empty_row() -> error::Result<()> {
563        let data = "name\n\"\"\nlucy\n\"\"";
564
565        // Read
566        let reader = Reader::from_reader_no_headers(Cursor::new(data));
567
568        let expected = vec![brec!["name"], brec![""], brec!["lucy"], brec![""]];
569
570        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;
571
572        assert_eq!(records, expected);
573
574        Ok(())
575    }
576
577    #[test]
578    fn test_crlf() -> error::Result<()> {
579        let reader = Reader::from_reader_no_headers(Cursor::new(
580            "name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n",
581        ));
582
583        let expected = vec![
584            brec!["name", "surname"],
585            brec!["lucy", "john"],
586            brec!["evan", "zhong"],
587            brec!["béatrice", "glougou"],
588        ];
589
590        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;
591
592        assert_eq!(records, expected);
593
594        Ok(())
595    }
596
597    #[test]
598    fn test_quote_always() -> error::Result<()> {
599        let reader = Reader::from_reader_no_headers(Cursor::new(
600            "\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\"",
601        ));
602
603        let expected = vec![
604            brec!["name", "surname"],
605            brec!["lucy", "rose"],
606            brec!["john", "mayhew"],
607        ];
608
609        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;
610
611        assert_eq!(records, expected);
612
613        Ok(())
614    }
615
616    #[test]
617    fn test_byte_headers() -> error::Result<()> {
618        let data = b"name,surname\njohn,dandy";
619
620        // Headers, call before read
621        let mut reader = Reader::from_reader(Cursor::new(data));
622        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
623        assert_eq!(
624            reader.byte_records().next().unwrap()?,
625            brec!["john", "dandy"]
626        );
627
628        // Headers, call after read
629        let mut reader = Reader::from_reader(Cursor::new(data));
630        assert_eq!(
631            reader.byte_records().next().unwrap()?,
632            brec!["john", "dandy"]
633        );
634        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
635
636        // No headers, call before read
637        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
638        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
639        assert_eq!(
640            reader.byte_records().next().unwrap()?,
641            brec!["name", "surname"]
642        );
643
644        // No headers, call after read
645        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
646        assert_eq!(
647            reader.byte_records().next().unwrap()?,
648            brec!["name", "surname"]
649        );
650        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
651
652        // Headers, empty
653        let mut reader = Reader::from_reader(Cursor::new(b""));
654        assert_eq!(reader.byte_headers()?, &brec![]);
655        assert!(reader.byte_records().next().is_none());
656
657        // No headers, empty
658        let mut reader = Reader::from_reader_no_headers(Cursor::new(b""));
659        assert_eq!(reader.byte_headers()?, &brec![]);
660        assert!(reader.byte_records().next().is_none());
661
662        Ok(())
663    }
664
665    #[test]
666    fn test_weirdness() -> error::Result<()> {
667        // Data after quotes, before next delimiter
668        let data =
669            b"name,surname\n\"test\"  \"wat\", ok\ntest \"wat\",ok  \ntest,\"whatever\"  ok\n\"test\"   there,\"ok\"\r\n";
670        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
671
672        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;
673
674        let expected = vec![
675            brec!["name", "surname"],
676            brec!["test  \"wat", " ok"],
677            brec!["test \"wat", "ok  "],
678            brec!["test", "whatever  ok"],
679            brec!["test   there", "ok"],
680        ];
681
682        assert_eq!(records, expected);
683
684        // let data = "aaa\"aaa,bbb";
685        // let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
686        // let record = reader.byte_records().next().unwrap().unwrap();
687
688        // assert_eq!(record, brec!["aaa\"aaa", "bbb"]);
689
690        let data = b"name,surname\n\r\rjohn,coucou";
691        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
692        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;
693
694        assert_eq!(
695            records,
696            vec![brec!["name", "surname"], brec!["john", "coucou"]]
697        );
698
699        Ok(())
700    }
701
702    #[test]
703    fn test_position() -> error::Result<()> {
704        let data = b"name,surname\njohnny,landis crue\nbabka,bob caterpillar\n";
705
706        let mut reader = Reader::from_reader(&data[..]);
707        let mut record = ByteRecord::new();
708
709        let mut positions = vec![reader.position()];
710
711        reader.byte_headers()?;
712
713        positions.push(reader.position());
714
715        while reader.read_byte_record(&mut record)? {
716            positions.push(reader.position());
717        }
718
719        assert_eq!(positions, vec![0, 13, 32, 54]);
720
721        let mut reader = ReaderBuilder::new()
722            .has_headers(false)
723            .from_reader(&data[..]);
724
725        reader.byte_headers()?;
726
727        assert_eq!(reader.position(), 0);
728
729        Ok(())
730    }
731
732    #[test]
733    fn test_reverse_reader() -> error::Result<()> {
734        let data = b"name,surname\njohn,landis\nbeatrice,babka\nevan,michalak";
735        let mut reader = ReverseReader::from_reader(Cursor::new(data))?;
736
737        assert_eq!(
738            reader.byte_records().collect::<Result<Vec<_>, _>>()?,
739            vec![
740                brec!["evan", "michalak"],
741                brec!["beatrice", "babka"],
742                brec!["john", "landis"]
743            ]
744        );
745
746        assert_eq!(reader.byte_headers(), &brec!["name", "surname"]);
747
748        Ok(())
749    }
750}