1use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
2
3use crate::buffer::BufReaderWithPosition;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ByteRecordBuilder};
7use crate::utils::{self, trim_bom};
8
9#[cfg(feature = "str")]
10use crate::records::StringRecord;
11
/// Configuration used to build a [`Reader`] or a [`ReverseReader`].
///
/// Start from [`ReaderBuilder::new`] (or `Default::default()`), adjust the
/// settings through the chained setter methods, then call one of the
/// `*from_reader` methods to obtain a reader.
///
/// The builder only holds plain values, so it is cheap to clone and can be
/// printed with `{:?}` for diagnostics.
#[derive(Debug, Clone)]
pub struct ReaderBuilder {
    /// Byte separating two fields (`b','` by default).
    delimiter: u8,
    /// Byte used to quote fields (`b'"'` by default).
    quote: u8,
    /// Capacity, in bytes, requested for the internal read buffer
    /// (`8192` by default).
    buffer_capacity: usize,
    /// When `true`, records are allowed to have a number of fields differing
    /// from the first record (`false` by default).
    flexible: bool,
    /// Whether the first record must be considered as a header row
    /// (`true` by default).
    has_headers: bool,
}
20
21impl Default for ReaderBuilder {
22 fn default() -> Self {
23 Self {
24 delimiter: b',',
25 quote: b'"',
26 buffer_capacity: 8192,
27 flexible: false,
28 has_headers: true,
29 }
30 }
31}
32
33impl ReaderBuilder {
34 pub fn new() -> Self {
36 Self::default()
37 }
38
39 pub fn with_capacity(capacity: usize) -> Self {
41 let mut reader = Self::default();
42 reader.buffer_capacity(capacity);
43 reader
44 }
45
46 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
52 self.delimiter = delimiter;
53 self
54 }
55
56 pub fn quote(&mut self, quote: u8) -> &mut Self {
62 self.quote = quote;
63 self
64 }
65
66 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
68 self.buffer_capacity = capacity;
69 self
70 }
71
72 pub fn flexible(&mut self, yes: bool) -> &mut Self {
78 self.flexible = yes;
79 self
80 }
81
82 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
86 self.has_headers = yes;
87 self
88 }
89
90 pub fn from_reader<R: Read>(&self, reader: R) -> Reader<R> {
93 Reader {
94 buffer: BufReaderWithPosition::with_capacity(self.buffer_capacity, reader),
95 inner: CoreReader::new(self.delimiter, self.quote),
96 flexible: self.flexible,
97 headers: ByteRecord::new(),
98 has_read: false,
99 must_reemit_headers: !self.has_headers,
100 has_headers: self.has_headers,
101 index: 0,
102 }
103 }
104
105 pub fn reverse_from_reader<R: Read + Seek>(
108 &self,
109 mut reader: R,
110 ) -> error::Result<ReverseReader<R>> {
111 let initial_pos = reader.stream_position()?;
112
113 let mut forward_reader = self.from_reader(reader);
114 let headers = forward_reader.byte_headers()?.clone();
115 let position_after_headers = forward_reader.position();
116
117 let mut reader = forward_reader.into_inner();
118
119 let file_len = reader.seek(SeekFrom::End(0))?;
120
121 let offset = if self.has_headers {
122 initial_pos + position_after_headers
123 } else {
124 initial_pos
125 };
126
127 let reverse_io_reader = utils::ReverseReader::new(reader, file_len, offset);
128
129 Ok(ReverseReader {
130 buffer: BufReader::with_capacity(self.buffer_capacity, reverse_io_reader),
131 inner: CoreReader::new(self.delimiter, self.quote),
132 flexible: self.flexible,
133 headers,
134 })
135 }
136}
137
/// A CSV reader consuming its input in forward direction, one record at a
/// time.
///
/// Instances are created through [`Reader::from_reader`] or
/// [`ReaderBuilder::from_reader`].
pub struct Reader<R> {
    /// Buffered input, also used to report the current position.
    buffer: BufReaderWithPosition<R>,
    /// Low-level record parser, configured with the delimiter and quote bytes.
    inner: CoreReader,
    /// When `true`, the field count of records is not checked.
    flexible: bool,
    /// First record read from the input (empty until the first read).
    headers: ByteRecord,
    /// Whether the first read (BOM trimming and first record) already happened.
    has_read: bool,
    /// Whether the first record must still be handed out as a regular record
    /// (initially `true` only when the reader has no header row).
    must_reemit_headers: bool,
    /// Whether the first record is considered a header row.
    has_headers: bool,
    /// Number of records parsed so far, first record included.
    index: u64,
}
155
156impl<R: Read> Reader<R> {
157 pub fn from_reader(reader: R) -> Self {
160 ReaderBuilder::new().from_reader(reader)
161 }
162
163 #[inline]
164 fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
165 if self.flexible {
166 return Ok(());
167 }
168
169 if self.has_read && written != self.headers.len() {
170 return Err(Error::new(ErrorKind::UnequalLengths {
171 expected_len: self.headers.len(),
172 len: written,
173 pos: Some((
174 byte,
175 self.index
176 .saturating_sub(if self.has_headers { 1 } else { 0 }),
177 )),
178 }));
179 }
180
181 Ok(())
182 }
183
184 fn read_byte_record_impl(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
185 use ReadResult::*;
186
187 record.clear();
188
189 let mut record_builder = ByteRecordBuilder::wrap(record);
190 let byte = self.position();
191
192 loop {
193 let input = self.buffer.fill_buf()?;
194
195 let (result, pos) = self.inner.read_record(input, &mut record_builder);
196
197 self.buffer.consume(pos);
198
199 match result {
200 End => {
201 return Ok(false);
202 }
203 Cr | Lf | InputEmpty => {
204 continue;
205 }
206 Record => {
207 self.index += 1;
208 self.check_field_count(byte, record.len())?;
209 return Ok(true);
210 }
211 };
212 }
213 }
214
215 #[inline]
216 fn on_first_read(&mut self) -> error::Result<()> {
217 if self.has_read {
218 return Ok(());
219 }
220
221 let input = self.buffer.fill_buf()?;
223 let bom_len = trim_bom(input);
224 self.buffer.consume(bom_len);
225
226 let mut headers = ByteRecord::new();
228
229 let has_data = self.read_byte_record_impl(&mut headers)?;
230
231 if !has_data {
232 self.must_reemit_headers = false;
233 }
234
235 self.headers = headers;
236 self.has_read = true;
237
238 Ok(())
239 }
240
241 #[inline]
244 pub fn has_headers(&self) -> bool {
245 self.has_headers
246 }
247
248 #[inline]
250 pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
251 self.on_first_read()?;
252
253 Ok(&self.headers)
254 }
255
256 #[inline(always)]
261 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
262 self.on_first_read()?;
263
264 if self.must_reemit_headers {
265 self.headers.clone_into(record);
266 self.must_reemit_headers = false;
267 return Ok(true);
268 }
269
270 self.read_byte_record_impl(record)
271 }
272
273 #[cfg(feature = "str")]
274 pub fn read_record(&mut self, record: &mut StringRecord) -> error::Result<bool> {
275 if self.read_byte_record(record.as_inner_mut())? {
276 if !record.validate_utf8() {
277 Err(Error::new(ErrorKind::Utf8Error))
278 } else {
279 Ok(true)
280 }
281 } else {
282 Ok(false)
283 }
284 }
285
286 pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
288 ByteRecordsIter {
289 reader: self,
290 record: ByteRecord::new(),
291 }
292 }
293
294 pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
296 ByteRecordsIntoIter {
297 reader: self,
298 record: ByteRecord::new(),
299 }
300 }
301
302 #[cfg(feature = "str")]
304 pub fn records(&mut self) -> StringRecordsIter<'_, R> {
305 StringRecordsIter {
306 reader: self,
307 record: StringRecord::new(),
308 }
309 }
310
311 #[cfg(feature = "str")]
313 pub fn into_records(self) -> StringRecordsIntoIter<R> {
314 StringRecordsIntoIter {
315 reader: self,
316 record: StringRecord::new(),
317 }
318 }
319
320 pub fn get_ref(&self) -> &R {
322 self.buffer.get_ref()
323 }
324
325 pub fn get_mut(&mut self) -> &mut R {
327 self.buffer.get_mut()
328 }
329
330 pub fn into_inner(self) -> R {
334 self.buffer.into_inner().into_inner()
335 }
336
337 #[inline(always)]
339 pub fn position(&self) -> u64 {
340 if self.must_reemit_headers {
341 0
342 } else {
343 self.buffer.position()
344 }
345 }
346}
347
348pub struct ByteRecordsIter<'r, R> {
349 reader: &'r mut Reader<R>,
350 record: ByteRecord,
351}
352
353impl<R: Read> Iterator for ByteRecordsIter<'_, R> {
354 type Item = error::Result<ByteRecord>;
355
356 #[inline]
357 fn next(&mut self) -> Option<Self::Item> {
358 match self.reader.read_byte_record(&mut self.record) {
361 Err(err) => Some(Err(err)),
362 Ok(true) => Some(Ok(self.record.clone())),
363 Ok(false) => None,
364 }
365 }
366}
367
368pub struct ByteRecordsIntoIter<R> {
369 reader: Reader<R>,
370 record: ByteRecord,
371}
372
373impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
374 type Item = error::Result<ByteRecord>;
375
376 #[inline]
377 fn next(&mut self) -> Option<Self::Item> {
378 match self.reader.read_byte_record(&mut self.record) {
381 Err(err) => Some(Err(err)),
382 Ok(true) => Some(Ok(self.record.clone())),
383 Ok(false) => None,
384 }
385 }
386}
387
/// Iterator over the string records of a borrowed [`Reader`].
///
/// Created by [`Reader::records`].
#[cfg(feature = "str")]
pub struct StringRecordsIter<'r, R> {
    /// Reader the records are pulled from.
    reader: &'r mut Reader<R>,
    /// Scratch record reused across reads; a clone is yielded each time.
    record: StringRecord,
}

#[cfg(feature = "str")]
impl<R: Read> Iterator for StringRecordsIter<'_, R> {
    type Item = error::Result<StringRecord>;

    /// Yields the next record, an error (including invalid UTF-8), or
    /// `None` once the input is exhausted.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.reader
            .read_record(&mut self.record)
            .map(|has_record| has_record.then(|| self.record.clone()))
            .transpose()
    }
}
409
/// Iterator over the string records of an owned [`Reader`].
///
/// Created by [`Reader::into_records`].
#[cfg(feature = "str")]
pub struct StringRecordsIntoIter<R> {
    /// Reader the records are pulled from.
    reader: Reader<R>,
    /// Scratch record reused across reads; a clone is yielded each time.
    record: StringRecord,
}

#[cfg(feature = "str")]
impl<R: Read> Iterator for StringRecordsIntoIter<R> {
    type Item = error::Result<StringRecord>;

    /// Yields the next record, an error (including invalid UTF-8), or
    /// `None` once the input is exhausted.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.reader
            .read_record(&mut self.record)
            .map(|has_record| has_record.then(|| self.record.clone()))
            .transpose()
    }
}
431
/// A CSV reader yielding the records of a seekable input from last to first.
///
/// Instances are created through [`ReverseReader::from_reader`] or
/// [`ReaderBuilder::reverse_from_reader`]; the first record of the input is
/// read upfront and kept as headers.
pub struct ReverseReader<R> {
    /// Low-level record parser, configured with the delimiter and quote bytes.
    inner: CoreReader,
    /// Buffered access to the input wrapped in a reversing adapter.
    buffer: BufReader<utils::ReverseReader<R>>,
    /// When `true`, the field count of records is not checked.
    flexible: bool,
    /// First record of the input, read in forward direction at construction.
    headers: ByteRecord,
}
445
446impl<R: Read + Seek> ReverseReader<R> {
447 pub fn from_reader(reader: R) -> error::Result<Self> {
451 ReaderBuilder::new().reverse_from_reader(reader)
452 }
453
454 pub fn byte_headers(&self) -> &ByteRecord {
456 &self.headers
457 }
458
459 #[inline]
460 fn check_field_count(&mut self, written: usize) -> error::Result<()> {
461 if self.flexible {
462 return Ok(());
463 }
464
465 if written != self.headers.len() {
466 return Err(Error::new(ErrorKind::UnequalLengths {
467 expected_len: self.headers.len(),
468 len: written,
469 pos: None,
470 }));
471 }
472
473 Ok(())
474 }
475
476 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
481 use ReadResult::*;
482
483 record.clear();
484
485 let mut record_builder = ByteRecordBuilder::wrap(record);
486
487 loop {
488 let input = self.buffer.fill_buf()?;
489
490 let (result, pos) = self.inner.read_record(input, &mut record_builder);
491
492 self.buffer.consume(pos);
493
494 match result {
495 End => {
496 return Ok(false);
497 }
498 Cr | Lf | InputEmpty => {
499 continue;
500 }
501 Record => {
502 self.check_field_count(record.len())?;
503 record.reverse();
504 return Ok(true);
505 }
506 };
507 }
508 }
509
510 pub fn byte_records(&mut self) -> ReverseByteRecordsIter<'_, R> {
512 ReverseByteRecordsIter {
513 reader: self,
514 record: ByteRecord::new(),
515 }
516 }
517
518 pub fn into_byte_records(self) -> ReverseByteRecordsIntoIter<R> {
520 ReverseByteRecordsIntoIter {
521 reader: self,
522 record: ByteRecord::new(),
523 }
524 }
525}
526
527pub struct ReverseByteRecordsIter<'r, R> {
528 reader: &'r mut ReverseReader<R>,
529 record: ByteRecord,
530}
531
532impl<R: Read + Seek> Iterator for ReverseByteRecordsIter<'_, R> {
533 type Item = error::Result<ByteRecord>;
534
535 #[inline]
536 fn next(&mut self) -> Option<Self::Item> {
537 match self.reader.read_byte_record(&mut self.record) {
540 Err(err) => Some(Err(err)),
541 Ok(true) => Some(Ok(self.record.clone())),
542 Ok(false) => None,
543 }
544 }
545}
546
547pub struct ReverseByteRecordsIntoIter<R> {
548 reader: ReverseReader<R>,
549 record: ByteRecord,
550}
551
552impl<R: Read + Seek> Iterator for ReverseByteRecordsIntoIter<R> {
553 type Item = error::Result<ByteRecord>;
554
555 #[inline]
556 fn next(&mut self) -> Option<Self::Item> {
557 match self.reader.read_byte_record(&mut self.record) {
560 Err(err) => Some(Err(err)),
561 Ok(true) => Some(Ok(self.record.clone())),
562 Ok(false) => None,
563 }
564 }
565}
566
// Unit tests for the forward and reverse readers.
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    impl<R: Read> Reader<R> {
        /// Test-only shortcut building a reader without header row.
        fn from_reader_no_headers(reader: R) -> Self {
            ReaderBuilder::new().has_headers(false).from_reader(reader)
        }
    }

    /// Quoted fields, escaped quotes and blank lines are parsed the same way
    /// whatever the buffer capacity.
    #[test]
    fn test_read_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let expected = vec![
            brec!["name", "surname", "age"],
            brec!["john", "landy, the \"everlasting\" bastard", "45"],
            brec!["\"ok\"", "whatever", "dude"],
            brec!["lucy", "rose", "67"],
            brec!["jermaine", "jackson", "89"],
            brec!["karine", "loucan", "52"],
            brec!["rose", "glib", "12"],
            brec!["guillaume", "plique", "42"],
        ];

        // Tiny capacities force records to straddle several buffer refills.
        for capacity in [32usize, 4, 3, 2, 1] {
            let mut reader = ReaderBuilder::with_capacity(capacity)
                .has_headers(false)
                .from_reader(Cursor::new(csv));

            assert_eq!(
                reader.byte_records().collect::<Result<Vec<_>, _>>()?,
                expected,
            );
        }

        Ok(())
    }

    /// Multi-byte UTF-8 content is read as string records whatever the
    /// buffer capacity.
    #[test]
    #[cfg(feature = "str")]
    fn test_read_record() -> error::Result<()> {
        let csv =
            "french,chinese\nReine-Mère de l'Ouest,西王母\nEmpereur du Pic de l'Est,东华帝君\r\n";

        let expected = vec![
            srec!["french", "chinese"],
            srec!["Reine-Mère de l'Ouest", "西王母"],
            srec!["Empereur du Pic de l'Est", "东华帝君"],
        ];

        for capacity in [32usize, 4, 3, 2, 1] {
            let mut reader = ReaderBuilder::with_capacity(capacity)
                .has_headers(false)
                .from_reader(Cursor::new(csv));

            assert_eq!(reader.records().collect::<Result<Vec<_>, _>>()?, expected,);
        }

        Ok(())
    }

    /// A leading UTF-8 BOM does not end up in the first field.
    #[test]
    fn test_strip_bom() -> error::Result<()> {
        // Without BOM.
        let mut reader = Reader::from_reader_no_headers(Cursor::new("name,surname,age"));

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        // With BOM.
        let mut reader =
            Reader::from_reader_no_headers(Cursor::new(b"\xef\xbb\xbfname,surname,age"));

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        Ok(())
    }

    /// A row made of a single quoted empty field is kept as a record, even
    /// as last row without trailing newline.
    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let reader = Reader::from_reader_no_headers(Cursor::new(data));

        let expected = vec![brec!["name"], brec![""], brec!["lucy"], brec![""]];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    /// CRLF line endings are handled.
    #[test]
    fn test_crlf() -> error::Result<()> {
        let reader = Reader::from_reader_no_headers(Cursor::new(
            "name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n",
        ));

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "john"],
            brec!["evan", "zhong"],
            brec!["béatrice", "glougou"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    /// Input where every field is quoted.
    #[test]
    fn test_quote_always() -> error::Result<()> {
        let reader = Reader::from_reader_no_headers(Cursor::new(
            "\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\"",
        ));

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "rose"],
            brec!["john", "mayhew"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    /// Headers are available before or after reading records, with or
    /// without header row, and on empty input.
    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // Headers, accessed before reading records.
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["john", "dandy"]
        );

        // Headers, accessed after reading records.
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["john", "dandy"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // No headers, accessed before reading records: the first row is
        // both the headers and the first record.
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname"]
        );

        // No headers, accessed after reading records.
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Empty input, with headers.
        let mut reader = Reader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        // Empty input, without headers.
        let mut reader = Reader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        Ok(())
    }

    /// Pins the output for malformed quoting and stray carriage returns.
    #[test]
    fn test_weirdness() -> error::Result<()> {
        // Quotes appearing around or inside otherwise unquoted content.
        let data =
            b"name,surname\n\"test\" \"wat\", ok\ntest \"wat\",ok \ntest,\"whatever\" ok\n\"test\" there,\"ok\"\r\n";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));

        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        let expected = vec![
            brec!["name", "surname"],
            brec!["test \"wat", " ok"],
            brec!["test \"wat", "ok "],
            brec!["test", "whatever ok"],
            brec!["test there", "ok"],
        ];

        assert_eq!(records, expected);

        // Lone carriage returns between two rows are skipped.
        let data = b"name,surname\n\r\rjohn,coucou";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(
            records,
            vec![brec!["name", "surname"], brec!["john", "coucou"]]
        );

        Ok(())
    }

    /// The reported position advances past each record read.
    #[test]
    fn test_position() -> error::Result<()> {
        let data = b"name,surname\njohnny,landis crue\nbabka,bob caterpillar\n";

        let mut reader = Reader::from_reader(&data[..]);
        let mut record = ByteRecord::new();

        // Position before anything was read.
        let mut positions = vec![reader.position()];

        reader.byte_headers()?;

        // Position after the header row.
        positions.push(reader.position());

        // Position after each record, and after the final read returning false.
        while reader.read_byte_record(&mut record)? {
            positions.push(reader.position());
        }

        assert_eq!(positions, vec![0, 13, 32, 54]);

        // Without header row, reading the headers does not move the position.
        let mut reader = ReaderBuilder::new()
            .has_headers(false)
            .from_reader(&data[..]);

        reader.byte_headers()?;

        assert_eq!(reader.position(), 0);

        Ok(())
    }

    /// Records come out last to first, the header row being excluded.
    #[test]
    fn test_reverse_reader() -> error::Result<()> {
        let data = b"name,surname\njohn,landis\nbeatrice,babka\nevan,michalak";
        let mut reader = ReverseReader::from_reader(Cursor::new(data))?;

        assert_eq!(
            reader.byte_records().collect::<Result<Vec<_>, _>>()?,
            vec![
                brec!["evan", "michalak"],
                brec!["beatrice", "babka"],
                brec!["john", "landis"]
            ]
        );

        assert_eq!(reader.byte_headers(), &brec!["name", "surname"]);

        Ok(())
    }

    /// Same as `test_reverse_reader`, with CRLF line endings.
    #[test]
    fn test_reverse_reader_crlf() -> error::Result<()> {
        let data = b"name,surname\r\njohn,landis\r\nbeatrice,babka\r\nevan,michalak";
        let mut reader = ReverseReader::from_reader(Cursor::new(data))?;

        assert_eq!(
            reader.byte_records().collect::<Result<Vec<_>, _>>()?,
            vec![
                brec!["evan", "michalak"],
                brec!["beatrice", "babka"],
                brec!["john", "landis"]
            ]
        );

        assert_eq!(reader.byte_headers(), &brec!["name", "surname"]);

        Ok(())
    }

    /// Pins the output for an input mixing carriage returns, backticks and
    /// quotes, read with a flexible reader.
    #[test]
    fn test_weird_sequence() -> error::Result<()> {
        let data = b"\r\r`\"\",\n,`\"\r\",\n";
        let mut record = ByteRecord::new();
        let mut reader = ReaderBuilder::new()
            .flexible(true)
            .has_headers(false)
            .from_reader(&data[..]);

        reader.read_byte_record(&mut record)?;
        assert_eq!(record, brec!["`\"", ""]);

        reader.read_byte_record(&mut record)?;

        assert_eq!(record, brec!["", "\"\r", ""]);

        Ok(())
    }

    /// A carriage return ending a quoted field is kept as data, whatever
    /// the buffer capacity.
    #[test]
    fn test_quoted_final_cr() -> error::Result<()> {
        let csv = b"name,surname\n\"test\",\"\r\"\njohn,landis";

        let expected = vec![
            brec!["name", "surname"],
            brec!["test", "\r"],
            brec!["john", "landis"],
        ];

        for capacity in [32usize, 4, 3, 2, 1] {
            let mut reader = ReaderBuilder::with_capacity(capacity)
                .has_headers(false)
                .from_reader(Cursor::new(csv));

            assert_eq!(
                reader.byte_records().collect::<Result<Vec<_>, _>>()?,
                expected,
            );
        }

        Ok(())
    }

}