simd_csv/
reader.rs

1use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
2
3use crate::buffer::BufReaderWithPosition;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ByteRecordBuilder};
7use crate::utils::{self, trim_bom};
8
/// Builds a [`Reader`] (or a [`ReverseReader`]) with given configuration.
///
/// Options are set through the chainable setters and are only read when one
/// of the `*from_reader` constructors is called, so a single builder can be
/// used to create several readers.
pub struct ReaderBuilder {
    /// Delimiter byte handed to the core parser.
    delimiter: u8,
    /// Quote byte handed to the core parser.
    quote: u8,
    /// Capacity requested for the buffered reader wrapping the input.
    buffer_capacity: usize,
    /// When `true`, records are not checked against the header field count.
    flexible: bool,
    /// Whether the first record must be interpreted as a header.
    has_headers: bool,
}
17
18impl Default for ReaderBuilder {
19    fn default() -> Self {
20        Self {
21            delimiter: b',',
22            quote: b'"',
23            buffer_capacity: 8192,
24            flexible: false,
25            has_headers: true,
26        }
27    }
28}
29
30impl ReaderBuilder {
31    pub fn new() -> Self {
32        Self::default()
33    }
34
35    pub fn with_capacity(capacity: usize) -> Self {
36        let mut reader = Self::default();
37        reader.buffer_capacity(capacity);
38        reader
39    }
40
41    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
42        self.delimiter = delimiter;
43        self
44    }
45
46    pub fn quote(&mut self, quote: u8) -> &mut Self {
47        self.quote = quote;
48        self
49    }
50
51    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
52        self.buffer_capacity = capacity;
53        self
54    }
55
56    pub fn flexible(&mut self, yes: bool) -> &mut Self {
57        self.flexible = yes;
58        self
59    }
60
61    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
62        self.has_headers = yes;
63        self
64    }
65
66    pub fn from_reader<R: Read>(&self, reader: R) -> Reader<R> {
67        Reader {
68            buffer: BufReaderWithPosition::with_capacity(self.buffer_capacity, reader),
69            inner: CoreReader::new(self.delimiter, self.quote),
70            flexible: self.flexible,
71            headers: ByteRecord::new(),
72            has_read: false,
73            must_reemit_headers: !self.has_headers,
74            has_headers: self.has_headers,
75            index: 0,
76        }
77    }
78
79    pub fn reverse_from_reader<R: Read + Seek>(
80        &self,
81        mut reader: R,
82    ) -> error::Result<ReverseReader<R>> {
83        let initial_pos = reader.stream_position()?;
84
85        let mut forward_reader = self.from_reader(reader);
86        let headers = forward_reader.byte_headers()?.clone();
87        let position_after_headers = forward_reader.position();
88
89        let mut reader = forward_reader.into_inner();
90
91        let file_len = reader.seek(SeekFrom::End(0))?;
92
93        let offset = if self.has_headers {
94            initial_pos + position_after_headers
95        } else {
96            initial_pos
97        };
98
99        let reverse_io_reader = utils::ReverseReader::new(reader, file_len, offset);
100
101        Ok(ReverseReader {
102            buffer: BufReader::with_capacity(self.buffer_capacity, reverse_io_reader),
103            inner: CoreReader::new(self.delimiter, self.quote),
104            flexible: self.flexible,
105            headers,
106        })
107    }
108}
109
/// An already configured copying/unescaping CSV reader.
///
/// # Configuration
///
/// To configure a [`Reader`], for instance if you need a custom delimiter or
/// if you want to tweak the size of the inner buffer, check out the
/// [`ReaderBuilder`].
pub struct Reader<R> {
    /// Buffered input that also tracks the current byte position.
    buffer: BufReaderWithPosition<R>,
    /// Core parser, configured with the delimiter and quote bytes.
    inner: CoreReader,
    /// When `true`, records are not checked against the header field count.
    flexible: bool,
    /// First record of the input, empty until the first read happened.
    headers: ByteRecord,
    /// Whether the first read (BOM trimming and header parsing) completed.
    has_read: bool,
    /// Whether the stored headers must still be returned as a regular record
    /// by the next read (initially set when the input has no header row).
    must_reemit_headers: bool,
    /// Whether the first record is interpreted as a header.
    has_headers: bool,
    /// Number of records parsed so far, first record included.
    index: u64,
}
127
128impl<R: Read> Reader<R> {
129    /// Create a new reader with default configuration using the provided reader
130    /// implementing [`std::io::Read`].
131    pub fn from_reader(reader: R) -> Self {
132        ReaderBuilder::new().from_reader(reader)
133    }
134
135    #[inline]
136    fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
137        if self.flexible {
138            return Ok(());
139        }
140
141        if self.has_read && written != self.headers.len() {
142            return Err(Error::new(ErrorKind::UnequalLengths {
143                expected_len: self.headers.len(),
144                len: written,
145                pos: Some((
146                    byte,
147                    self.index
148                        .saturating_sub(if self.has_headers { 1 } else { 0 }),
149                )),
150            }));
151        }
152
153        Ok(())
154    }
155
156    fn read_byte_record_impl(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
157        use ReadResult::*;
158
159        record.clear();
160
161        let mut record_builder = ByteRecordBuilder::wrap(record);
162        let byte = self.position();
163
164        loop {
165            let input = self.buffer.fill_buf()?;
166
167            let (result, pos) = self.inner.read_record(input, &mut record_builder);
168
169            self.buffer.consume(pos);
170
171            match result {
172                End => {
173                    return Ok(false);
174                }
175                Cr | Lf | InputEmpty => {
176                    continue;
177                }
178                Record => {
179                    self.index += 1;
180                    self.check_field_count(byte, record.len())?;
181                    return Ok(true);
182                }
183            };
184        }
185    }
186
187    #[inline]
188    fn on_first_read(&mut self) -> error::Result<()> {
189        if self.has_read {
190            return Ok(());
191        }
192
193        // Trimming BOM
194        let input = self.buffer.fill_buf()?;
195        let bom_len = trim_bom(input);
196        self.buffer.consume(bom_len);
197
198        // Reading headers
199        let mut headers = ByteRecord::new();
200
201        let has_data = self.read_byte_record_impl(&mut headers)?;
202
203        if !has_data {
204            self.must_reemit_headers = false;
205        }
206
207        self.headers = headers;
208        self.has_read = true;
209
210        Ok(())
211    }
212
213    /// Returns whether this reader has been configured to interpret the first
214    /// record as a header.
215    #[inline]
216    pub fn has_headers(&self) -> bool {
217        self.has_headers
218    }
219
220    #[inline]
221    pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
222        self.on_first_read()?;
223
224        Ok(&self.headers)
225    }
226
227    #[inline(always)]
228    pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
229        self.on_first_read()?;
230
231        if self.must_reemit_headers {
232            self.headers.clone_into(record);
233            self.must_reemit_headers = false;
234            return Ok(true);
235        }
236
237        self.read_byte_record_impl(record)
238    }
239
240    pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
241        ByteRecordsIter {
242            reader: self,
243            record: ByteRecord::new(),
244        }
245    }
246
247    pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
248        ByteRecordsIntoIter {
249            reader: self,
250            record: ByteRecord::new(),
251        }
252    }
253
254    pub fn get_ref(&self) -> &R {
255        self.buffer.get_ref()
256    }
257
258    pub fn get_mut(&mut self) -> &mut R {
259        self.buffer.get_mut()
260    }
261
262    pub fn into_inner(self) -> R {
263        self.buffer.into_inner().into_inner()
264    }
265
266    pub fn into_bufreader(self) -> (Option<ByteRecord>, BufReader<R>) {
267        (
268            self.must_reemit_headers.then_some(self.headers),
269            self.buffer.into_inner(),
270        )
271    }
272
273    /// Returns the current byte offset of the reader in the wrapped stream.
274    #[inline(always)]
275    pub fn position(&self) -> u64 {
276        if self.must_reemit_headers {
277            0
278        } else {
279            self.buffer.position()
280        }
281    }
282}
283
/// Iterator over the records of a borrowed [`Reader`], created by
/// [`Reader::byte_records`].
pub struct ByteRecordsIter<'r, R> {
    /// Reader the records are pulled from.
    reader: &'r mut Reader<R>,
    /// Scratch record reused across reads and cloned for each item.
    record: ByteRecord,
}
288
289impl<R: Read> Iterator for ByteRecordsIter<'_, R> {
290    type Item = error::Result<ByteRecord>;
291
292    #[inline]
293    fn next(&mut self) -> Option<Self::Item> {
294        // NOTE: cloning the record will not carry over excess capacity
295        // because the record only contains `Vec` currently.
296        match self.reader.read_byte_record(&mut self.record) {
297            Err(err) => Some(Err(err)),
298            Ok(true) => Some(Ok(self.record.clone())),
299            Ok(false) => None,
300        }
301    }
302}
303
/// Iterator over the records of an owned [`Reader`], created by
/// [`Reader::into_byte_records`].
pub struct ByteRecordsIntoIter<R> {
    /// Reader the records are pulled from.
    reader: Reader<R>,
    /// Scratch record reused across reads and cloned for each item.
    record: ByteRecord,
}
308
309impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
310    type Item = error::Result<ByteRecord>;
311
312    #[inline]
313    fn next(&mut self) -> Option<Self::Item> {
314        // NOTE: cloning the record will not carry over excess capacity
315        // because the record only contains `Vec` currently.
316        match self.reader.read_byte_record(&mut self.record) {
317            Err(err) => Some(Err(err)),
318            Ok(true) => Some(Ok(self.record.clone())),
319            Ok(false) => None,
320        }
321    }
322}
323
/// An already configured reverse CSV reader.
///
/// # Configuration
///
/// To configure a [`ReverseReader`], for instance if you need a custom
/// delimiter or if you want to tweak the size of the inner buffer, check out
/// the [`ReaderBuilder`].
pub struct ReverseReader<R> {
    /// Core parser, configured with the delimiter and quote bytes.
    inner: CoreReader,
    /// Buffered view over the reversing adapter wrapping the input.
    buffer: BufReader<utils::ReverseReader<R>>,
    /// When `true`, records are not checked against the header field count.
    flexible: bool,
    /// First record of the input, read in forward direction at construction.
    headers: ByteRecord,
}
337
338impl<R: Read + Seek> ReverseReader<R> {
339    pub fn from_reader(reader: R) -> error::Result<Self> {
340        ReaderBuilder::new().reverse_from_reader(reader)
341    }
342
343    pub fn byte_headers(&self) -> &ByteRecord {
344        &self.headers
345    }
346
347    #[inline]
348    fn check_field_count(&mut self, written: usize) -> error::Result<()> {
349        if self.flexible {
350            return Ok(());
351        }
352
353        if written != self.headers.len() {
354            return Err(Error::new(ErrorKind::UnequalLengths {
355                expected_len: self.headers.len(),
356                len: written,
357                pos: None,
358            }));
359        }
360
361        Ok(())
362    }
363
364    pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
365        use ReadResult::*;
366
367        record.clear();
368
369        let mut record_builder = ByteRecordBuilder::wrap(record);
370
371        loop {
372            let input = self.buffer.fill_buf()?;
373
374            let (result, pos) = self.inner.read_record(input, &mut record_builder);
375
376            self.buffer.consume(pos);
377
378            match result {
379                End => {
380                    return Ok(false);
381                }
382                Cr | Lf | InputEmpty => {
383                    continue;
384                }
385                Record => {
386                    self.check_field_count(record.len())?;
387                    record.reverse();
388                    return Ok(true);
389                }
390            };
391        }
392    }
393
394    pub fn byte_records(&mut self) -> ReverseByteRecordsIter<'_, R> {
395        ReverseByteRecordsIter {
396            reader: self,
397            record: ByteRecord::new(),
398        }
399    }
400
401    pub fn into_byte_records(self) -> ReverseByteRecordsIntoIter<R> {
402        ReverseByteRecordsIntoIter {
403            reader: self,
404            record: ByteRecord::new(),
405        }
406    }
407}
408
/// Iterator over the records of a borrowed [`ReverseReader`], created by
/// [`ReverseReader::byte_records`].
pub struct ReverseByteRecordsIter<'r, R> {
    /// Reader the records are pulled from.
    reader: &'r mut ReverseReader<R>,
    /// Scratch record reused across reads and cloned for each item.
    record: ByteRecord,
}
413
414impl<R: Read + Seek> Iterator for ReverseByteRecordsIter<'_, R> {
415    type Item = error::Result<ByteRecord>;
416
417    #[inline]
418    fn next(&mut self) -> Option<Self::Item> {
419        // NOTE: cloning the record will not carry over excess capacity
420        // because the record only contains `Vec` currently.
421        match self.reader.read_byte_record(&mut self.record) {
422            Err(err) => Some(Err(err)),
423            Ok(true) => Some(Ok(self.record.clone())),
424            Ok(false) => None,
425        }
426    }
427}
428
/// Iterator over the records of an owned [`ReverseReader`], created by
/// [`ReverseReader::into_byte_records`].
pub struct ReverseByteRecordsIntoIter<R> {
    /// Reader the records are pulled from.
    reader: ReverseReader<R>,
    /// Scratch record reused across reads and cloned for each item.
    record: ByteRecord,
}
433
434impl<R: Read + Seek> Iterator for ReverseByteRecordsIntoIter<R> {
435    type Item = error::Result<ByteRecord>;
436
437    #[inline]
438    fn next(&mut self) -> Option<Self::Item> {
439        // NOTE: cloning the record will not carry over excess capacity
440        // because the record only contains `Vec` currently.
441        match self.reader.read_byte_record(&mut self.record) {
442            Err(err) => Some(Err(err)),
443            Ok(true) => Some(Ok(self.record.clone())),
444            Ok(false) => None,
445        }
446    }
447}
448
// Unit tests for the forward and reverse readers.
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    impl<R: Read> Reader<R> {
        /// Test helper: default configuration, but without a header row.
        fn from_reader_no_headers(reader: R) -> Self {
            ReaderBuilder::new().has_headers(false).from_reader(reader)
        }
    }

    /// Reads the same document with several buffer capacities, down to a
    /// single byte, and expects the same records every time.
    #[test]
    fn test_read_byte_record() -> error::Result<()> {
        // Mixes quoted fields, escaped quotes, a blank line and a final CRLF.
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        // The blank line is expected not to produce any record.
        let expected = vec![
            brec!["name", "surname", "age"],
            brec!["john", "landy, the \"everlasting\" bastard", "45"],
            brec!["\"ok\"", "whatever", "dude"],
            brec!["lucy", "rose", "67"],
            brec!["jermaine", "jackson", "89"],
            brec!["karine", "loucan", "52"],
            brec!["rose", "glib", "12"],
            brec!["guillaume", "plique", "42"],
        ];

        for capacity in [32usize, 4, 3, 2, 1] {
            let mut reader = ReaderBuilder::with_capacity(capacity)
                .has_headers(false)
                .from_reader(Cursor::new(csv));

            assert_eq!(
                reader.byte_records().collect::<Result<Vec<_>, _>>()?,
                expected,
            );
        }

        Ok(())
    }

    /// The first record must be the same with or without a leading UTF-8 BOM.
    #[test]
    fn test_strip_bom() -> error::Result<()> {
        // Without BOM
        let mut reader = Reader::from_reader_no_headers(Cursor::new("name,surname,age"));

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        // With BOM
        let mut reader =
            Reader::from_reader_no_headers(Cursor::new(b"\xef\xbb\xbfname,surname,age"));

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        Ok(())
    }

    /// Rows made of a single quoted empty field must yield a record holding
    /// one empty field, including on the last line without trailing newline.
    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        // Read
        let reader = Reader::from_reader_no_headers(Cursor::new(data));

        let expected = vec![brec!["name"], brec![""], brec!["lucy"], brec![""]];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    /// CRLF line endings must not leak into the fields.
    #[test]
    fn test_crlf() -> error::Result<()> {
        let reader = Reader::from_reader_no_headers(Cursor::new(
            "name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n",
        ));

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "john"],
            brec!["evan", "zhong"],
            brec!["béatrice", "glougou"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    /// Input where every field is quoted must be unquoted properly.
    #[test]
    fn test_quote_always() -> error::Result<()> {
        let reader = Reader::from_reader_no_headers(Cursor::new(
            "\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\"",
        ));

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "rose"],
            brec!["john", "mayhew"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    /// Headers must be available before or after the first read, and the
    /// first record must only be yielded as data when there is no header row.
    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // Headers, call before read
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["john", "dandy"]
        );

        // Headers, call after read
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["john", "dandy"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // No headers, call before read
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname"]
        );

        // No headers, call after read
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Headers, empty
        let mut reader = Reader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        // No headers, empty
        let mut reader = Reader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        Ok(())
    }

    /// Pins the records produced for malformed or unusual input.
    #[test]
    fn test_weirdness() -> error::Result<()> {
        // Data after quotes, before next delimiter
        let data =
            b"name,surname\n\"test\"  \"wat\", ok\ntest \"wat\",ok  \ntest,\"whatever\"  ok\n\"test\"   there,\"ok\"\r\n";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));

        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        let expected = vec![
            brec!["name", "surname"],
            brec!["test  \"wat", " ok"],
            brec!["test \"wat", "ok  "],
            brec!["test", "whatever  ok"],
            brec!["test   there", "ok"],
        ];

        assert_eq!(records, expected);

        // Disabled case: quote in the middle of an unquoted field.
        // let data = "aaa\"aaa,bbb";
        // let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        // let record = reader.byte_records().next().unwrap().unwrap();

        // assert_eq!(record, brec!["aaa\"aaa", "bbb"]);

        // Stray carriage returns between two records
        let data = b"name,surname\n\r\rjohn,coucou";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(
            records,
            vec![brec!["name", "surname"], brec!["john", "coucou"]]
        );

        Ok(())
    }

    /// Records the byte position before any read, after the headers and
    /// after each record.
    #[test]
    fn test_position() -> error::Result<()> {
        let data = b"name,surname\njohnny,landis crue\nbabka,bob caterpillar\n";

        let mut reader = Reader::from_reader(&data[..]);
        let mut record = ByteRecord::new();

        let mut positions = vec![reader.position()];

        reader.byte_headers()?;

        positions.push(reader.position());

        // The position is also recorded after the read reporting the end of
        // the input is NOT (the loop body only runs for actual records).
        while reader.read_byte_record(&mut record)? {
            positions.push(reader.position());
        }

        assert_eq!(positions, vec![0, 13, 32, 54]);

        // Without a header row, reading the headers must not move the
        // reported position since the first record is still to be yielded.
        let mut reader = ReaderBuilder::new()
            .has_headers(false)
            .from_reader(&data[..]);

        reader.byte_headers()?;

        assert_eq!(reader.position(), 0);

        Ok(())
    }

    /// Records must come out last to first, headers excluded, with the
    /// headers still available through `byte_headers`.
    #[test]
    fn test_reverse_reader() -> error::Result<()> {
        let data = b"name,surname\njohn,landis\nbeatrice,babka\nevan,michalak";
        let mut reader = ReverseReader::from_reader(Cursor::new(data))?;

        assert_eq!(
            reader.byte_records().collect::<Result<Vec<_>, _>>()?,
            vec![
                brec!["evan", "michalak"],
                brec!["beatrice", "babka"],
                brec!["john", "landis"]
            ]
        );

        assert_eq!(reader.byte_headers(), &brec!["name", "surname"]);

        Ok(())
    }
}