1use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
2
3use crate::buffer::BufReaderWithPosition;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ByteRecordBuilder, StringRecord};
7use crate::utils::{self, trim_bom};
8
/// Configuration used to build a [`Reader`] or a [`ReverseReader`].
///
/// Setters take `&mut self` and return `&mut Self`, so calls can be chained.
/// Deriving `Clone` lets one configured builder serve as a template, and
/// `Debug` makes the configuration printable in logs and test failures.
#[derive(Debug, Clone)]
pub struct ReaderBuilder {
    /// Byte separating fields within a record.
    delimiter: u8,
    /// Byte used to quote fields.
    quote: u8,
    /// Capacity, in bytes, of the internal read buffer.
    buffer_capacity: usize,
    /// When `true`, records may have a different number of fields than the first record.
    flexible: bool,
    /// When `true`, the first record is a header row rather than data.
    has_headers: bool,
}
17
18impl Default for ReaderBuilder {
19 fn default() -> Self {
20 Self {
21 delimiter: b',',
22 quote: b'"',
23 buffer_capacity: 8192,
24 flexible: false,
25 has_headers: true,
26 }
27 }
28}
29
30impl ReaderBuilder {
31 pub fn new() -> Self {
33 Self::default()
34 }
35
36 pub fn with_capacity(capacity: usize) -> Self {
38 let mut reader = Self::default();
39 reader.buffer_capacity(capacity);
40 reader
41 }
42
43 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
49 self.delimiter = delimiter;
50 self
51 }
52
53 pub fn quote(&mut self, quote: u8) -> &mut Self {
59 self.quote = quote;
60 self
61 }
62
63 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
65 self.buffer_capacity = capacity;
66 self
67 }
68
69 pub fn flexible(&mut self, yes: bool) -> &mut Self {
75 self.flexible = yes;
76 self
77 }
78
79 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
83 self.has_headers = yes;
84 self
85 }
86
87 pub fn from_reader<R: Read>(&self, reader: R) -> Reader<R> {
90 Reader {
91 buffer: BufReaderWithPosition::with_capacity(self.buffer_capacity, reader),
92 inner: CoreReader::new(self.delimiter, self.quote),
93 flexible: self.flexible,
94 headers: ByteRecord::new(),
95 has_read: false,
96 must_reemit_headers: !self.has_headers,
97 has_headers: self.has_headers,
98 index: 0,
99 }
100 }
101
102 pub fn reverse_from_reader<R: Read + Seek>(
105 &self,
106 mut reader: R,
107 ) -> error::Result<ReverseReader<R>> {
108 let initial_pos = reader.stream_position()?;
109
110 let mut forward_reader = self.from_reader(reader);
111 let headers = forward_reader.byte_headers()?.clone();
112 let position_after_headers = forward_reader.position();
113
114 let mut reader = forward_reader.into_inner();
115
116 let file_len = reader.seek(SeekFrom::End(0))?;
117
118 let offset = if self.has_headers {
119 initial_pos + position_after_headers
120 } else {
121 initial_pos
122 };
123
124 let reverse_io_reader = utils::ReverseReader::new(reader, file_len, offset);
125
126 Ok(ReverseReader {
127 buffer: BufReader::with_capacity(self.buffer_capacity, reverse_io_reader),
128 inner: CoreReader::new(self.delimiter, self.quote),
129 flexible: self.flexible,
130 headers,
131 })
132 }
133}
134
135pub struct Reader<R> {
143 buffer: BufReaderWithPosition<R>,
144 inner: CoreReader,
145 flexible: bool,
146 headers: ByteRecord,
147 has_read: bool,
148 must_reemit_headers: bool,
149 has_headers: bool,
150 index: u64,
151}
152
153impl<R: Read> Reader<R> {
154 pub fn from_reader(reader: R) -> Self {
157 ReaderBuilder::new().from_reader(reader)
158 }
159
160 #[inline]
161 fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
162 if self.flexible {
163 return Ok(());
164 }
165
166 if self.has_read && written != self.headers.len() {
167 return Err(Error::new(ErrorKind::UnequalLengths {
168 expected_len: self.headers.len(),
169 len: written,
170 pos: Some((
171 byte,
172 self.index
173 .saturating_sub(if self.has_headers { 1 } else { 0 }),
174 )),
175 }));
176 }
177
178 Ok(())
179 }
180
181 fn read_byte_record_impl(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
182 use ReadResult::*;
183
184 record.clear();
185
186 let mut record_builder = ByteRecordBuilder::wrap(record);
187 let byte = self.position();
188
189 loop {
190 let input = self.buffer.fill_buf()?;
191
192 let (result, pos) = self.inner.read_record(input, &mut record_builder);
193
194 self.buffer.consume(pos);
195
196 match result {
197 End => {
198 return Ok(false);
199 }
200 Cr | Lf | InputEmpty => {
201 continue;
202 }
203 Record => {
204 self.index += 1;
205 self.check_field_count(byte, record.len())?;
206 return Ok(true);
207 }
208 };
209 }
210 }
211
212 #[inline]
213 fn on_first_read(&mut self) -> error::Result<()> {
214 if self.has_read {
215 return Ok(());
216 }
217
218 let input = self.buffer.fill_buf()?;
220 let bom_len = trim_bom(input);
221 self.buffer.consume(bom_len);
222
223 let mut headers = ByteRecord::new();
225
226 let has_data = self.read_byte_record_impl(&mut headers)?;
227
228 if !has_data {
229 self.must_reemit_headers = false;
230 }
231
232 self.headers = headers;
233 self.has_read = true;
234
235 Ok(())
236 }
237
238 #[inline]
241 pub fn has_headers(&self) -> bool {
242 self.has_headers
243 }
244
245 #[inline]
247 pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
248 self.on_first_read()?;
249
250 Ok(&self.headers)
251 }
252
253 #[inline(always)]
258 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
259 self.on_first_read()?;
260
261 if self.must_reemit_headers {
262 self.headers.clone_into(record);
263 self.must_reemit_headers = false;
264 return Ok(true);
265 }
266
267 self.read_byte_record_impl(record)
268 }
269
270 pub fn read_record(&mut self, record: &mut StringRecord) -> error::Result<bool> {
271 if self.read_byte_record(record.as_inner_mut())? {
272 if !record.validate_utf8() {
273 Err(Error::new(ErrorKind::Utf8Error))
274 } else {
275 Ok(true)
276 }
277 } else {
278 Ok(false)
279 }
280 }
281
282 pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
284 ByteRecordsIter {
285 reader: self,
286 record: ByteRecord::new(),
287 }
288 }
289
290 pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
292 ByteRecordsIntoIter {
293 reader: self,
294 record: ByteRecord::new(),
295 }
296 }
297
298 pub fn records(&mut self) -> StringRecordsIter<'_, R> {
300 StringRecordsIter {
301 reader: self,
302 record: StringRecord::new(),
303 }
304 }
305
306 pub fn into_records(self) -> StringRecordsIntoIter<R> {
308 StringRecordsIntoIter {
309 reader: self,
310 record: StringRecord::new(),
311 }
312 }
313
314 pub fn get_ref(&self) -> &R {
316 self.buffer.get_ref()
317 }
318
319 pub fn get_mut(&mut self) -> &mut R {
321 self.buffer.get_mut()
322 }
323
324 pub fn into_inner(self) -> R {
328 self.buffer.into_inner().into_inner()
329 }
330
331 pub fn into_bufreader(self) -> (Option<ByteRecord>, BufReader<R>) {
336 (
337 self.must_reemit_headers.then_some(self.headers),
338 self.buffer.into_inner(),
339 )
340 }
341
342 #[inline(always)]
344 pub fn position(&self) -> u64 {
345 if self.must_reemit_headers {
346 0
347 } else {
348 self.buffer.position()
349 }
350 }
351}
352
353pub struct ByteRecordsIter<'r, R> {
354 reader: &'r mut Reader<R>,
355 record: ByteRecord,
356}
357
358impl<R: Read> Iterator for ByteRecordsIter<'_, R> {
359 type Item = error::Result<ByteRecord>;
360
361 #[inline]
362 fn next(&mut self) -> Option<Self::Item> {
363 match self.reader.read_byte_record(&mut self.record) {
366 Err(err) => Some(Err(err)),
367 Ok(true) => Some(Ok(self.record.clone())),
368 Ok(false) => None,
369 }
370 }
371}
372
373pub struct ByteRecordsIntoIter<R> {
374 reader: Reader<R>,
375 record: ByteRecord,
376}
377
378impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
379 type Item = error::Result<ByteRecord>;
380
381 #[inline]
382 fn next(&mut self) -> Option<Self::Item> {
383 match self.reader.read_byte_record(&mut self.record) {
386 Err(err) => Some(Err(err)),
387 Ok(true) => Some(Ok(self.record.clone())),
388 Ok(false) => None,
389 }
390 }
391}
392
393pub struct StringRecordsIter<'r, R> {
394 reader: &'r mut Reader<R>,
395 record: StringRecord,
396}
397
398impl<R: Read> Iterator for StringRecordsIter<'_, R> {
399 type Item = error::Result<StringRecord>;
400
401 #[inline]
402 fn next(&mut self) -> Option<Self::Item> {
403 match self.reader.read_record(&mut self.record) {
406 Err(err) => Some(Err(err)),
407 Ok(true) => Some(Ok(self.record.clone())),
408 Ok(false) => None,
409 }
410 }
411}
412
413pub struct StringRecordsIntoIter<R> {
414 reader: Reader<R>,
415 record: StringRecord,
416}
417
418impl<R: Read> Iterator for StringRecordsIntoIter<R> {
419 type Item = error::Result<StringRecord>;
420
421 #[inline]
422 fn next(&mut self) -> Option<Self::Item> {
423 match self.reader.read_record(&mut self.record) {
426 Err(err) => Some(Err(err)),
427 Ok(true) => Some(Ok(self.record.clone())),
428 Ok(false) => None,
429 }
430 }
431}
432
433pub struct ReverseReader<R> {
441 inner: CoreReader,
442 buffer: BufReader<utils::ReverseReader<R>>,
443 flexible: bool,
444 headers: ByteRecord,
445}
446
447impl<R: Read + Seek> ReverseReader<R> {
448 pub fn from_reader(reader: R) -> error::Result<Self> {
452 ReaderBuilder::new().reverse_from_reader(reader)
453 }
454
455 pub fn byte_headers(&self) -> &ByteRecord {
457 &self.headers
458 }
459
460 #[inline]
461 fn check_field_count(&mut self, written: usize) -> error::Result<()> {
462 if self.flexible {
463 return Ok(());
464 }
465
466 if written != self.headers.len() {
467 return Err(Error::new(ErrorKind::UnequalLengths {
468 expected_len: self.headers.len(),
469 len: written,
470 pos: None,
471 }));
472 }
473
474 Ok(())
475 }
476
477 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
482 use ReadResult::*;
483
484 record.clear();
485
486 let mut record_builder = ByteRecordBuilder::wrap(record);
487
488 loop {
489 let input = self.buffer.fill_buf()?;
490
491 let (result, pos) = self.inner.read_record(input, &mut record_builder);
492
493 self.buffer.consume(pos);
494
495 match result {
496 End => {
497 return Ok(false);
498 }
499 Cr | Lf | InputEmpty => {
500 continue;
501 }
502 Record => {
503 self.check_field_count(record.len())?;
504 record.reverse();
505 return Ok(true);
506 }
507 };
508 }
509 }
510
511 pub fn byte_records(&mut self) -> ReverseByteRecordsIter<'_, R> {
513 ReverseByteRecordsIter {
514 reader: self,
515 record: ByteRecord::new(),
516 }
517 }
518
519 pub fn into_byte_records(self) -> ReverseByteRecordsIntoIter<R> {
521 ReverseByteRecordsIntoIter {
522 reader: self,
523 record: ByteRecord::new(),
524 }
525 }
526}
527
528pub struct ReverseByteRecordsIter<'r, R> {
529 reader: &'r mut ReverseReader<R>,
530 record: ByteRecord,
531}
532
533impl<R: Read + Seek> Iterator for ReverseByteRecordsIter<'_, R> {
534 type Item = error::Result<ByteRecord>;
535
536 #[inline]
537 fn next(&mut self) -> Option<Self::Item> {
538 match self.reader.read_byte_record(&mut self.record) {
541 Err(err) => Some(Err(err)),
542 Ok(true) => Some(Ok(self.record.clone())),
543 Ok(false) => None,
544 }
545 }
546}
547
548pub struct ReverseByteRecordsIntoIter<R> {
549 reader: ReverseReader<R>,
550 record: ByteRecord,
551}
552
553impl<R: Read + Seek> Iterator for ReverseByteRecordsIntoIter<R> {
554 type Item = error::Result<ByteRecord>;
555
556 #[inline]
557 fn next(&mut self) -> Option<Self::Item> {
558 match self.reader.read_byte_record(&mut self.record) {
561 Err(err) => Some(Err(err)),
562 Ok(true) => Some(Ok(self.record.clone())),
563 Ok(false) => None,
564 }
565 }
566}
567
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    impl<R: Read> Reader<R> {
        fn from_reader_no_headers(reader: R) -> Self {
            ReaderBuilder::new().has_headers(false).from_reader(reader)
        }
    }

    /// Buffer capacities to exercise, down to a single byte so that tokens
    /// get split across buffer refills.
    const CAPACITIES: [usize; 5] = [32, 4, 3, 2, 1];

    /// Drains `reader`, returning every remaining byte record or the first error.
    fn all_byte_records<R: Read>(reader: &mut Reader<R>) -> error::Result<Vec<ByteRecord>> {
        reader.byte_records().collect()
    }

    /// Builds a headerless reader over `data` with the given buffer capacity.
    fn headerless_with_capacity<T: AsRef<[u8]>>(data: T, capacity: usize) -> Reader<Cursor<T>> {
        ReaderBuilder::with_capacity(capacity)
            .has_headers(false)
            .from_reader(Cursor::new(data))
    }

    #[test]
    fn test_read_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let expected = vec![
            brec!["name", "surname", "age"],
            brec!["john", "landy, the \"everlasting\" bastard", "45"],
            brec!["\"ok\"", "whatever", "dude"],
            brec!["lucy", "rose", "67"],
            brec!["jermaine", "jackson", "89"],
            brec!["karine", "loucan", "52"],
            brec!["rose", "glib", "12"],
            brec!["guillaume", "plique", "42"],
        ];

        for capacity in CAPACITIES {
            let mut reader = headerless_with_capacity(csv, capacity);
            assert_eq!(all_byte_records(&mut reader)?, expected);
        }

        Ok(())
    }

    #[test]
    fn test_read_record() -> error::Result<()> {
        let csv =
            "french,chinese\nReine-Mère de l'Ouest,西王母\nEmpereur du Pic de l'Est,东华帝君\r\n";

        let expected = vec![
            srec!["french", "chinese"],
            srec!["Reine-Mère de l'Ouest", "西王母"],
            srec!["Empereur du Pic de l'Est", "东华帝君"],
        ];

        for capacity in CAPACITIES {
            let mut reader = headerless_with_capacity(csv, capacity);
            let records = reader.records().collect::<Result<Vec<_>, _>>()?;
            assert_eq!(records, expected);
        }

        Ok(())
    }

    #[test]
    fn test_strip_bom() -> error::Result<()> {
        // The same row, once plain and once prefixed with a UTF-8 BOM.
        let inputs: [&[u8]; 2] = [b"name,surname,age", b"\xef\xbb\xbfname,surname,age"];

        for input in inputs {
            let mut reader = Reader::from_reader_no_headers(Cursor::new(input));
            let first = reader.byte_records().next().unwrap()?;
            assert_eq!(first, brec!["name", "surname", "age"]);
        }

        Ok(())
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        let reader = Reader::from_reader_no_headers(Cursor::new("name\n\"\"\nlucy\n\"\""));

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(
            records,
            vec![brec!["name"], brec![""], brec!["lucy"], brec![""]]
        );

        Ok(())
    }

    #[test]
    fn test_crlf() -> error::Result<()> {
        let csv = "name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n";
        let reader = Reader::from_reader_no_headers(Cursor::new(csv));

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(
            records,
            vec![
                brec!["name", "surname"],
                brec!["lucy", "john"],
                brec!["evan", "zhong"],
                brec!["béatrice", "glougou"],
            ]
        );

        Ok(())
    }

    #[test]
    fn test_quote_always() -> error::Result<()> {
        let csv = "\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\"";
        let reader = Reader::from_reader_no_headers(Cursor::new(csv));

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(
            records,
            vec![
                brec!["name", "surname"],
                brec!["lucy", "rose"],
                brec!["john", "mayhew"],
            ]
        );

        Ok(())
    }

    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";
        let headers = brec!["name", "surname"];

        // Headers requested first, then the first data record.
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &headers);
        assert_eq!(reader.byte_records().next().unwrap()?, brec!["john", "dandy"]);

        // Data record first; headers are still available afterwards.
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_records().next().unwrap()?, brec!["john", "dandy"]);
        assert_eq!(reader.byte_headers()?, &headers);

        // Headerless: the first row is both the "headers" and the first record.
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &headers);
        assert_eq!(reader.byte_records().next().unwrap()?, headers);

        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_records().next().unwrap()?, headers);
        assert_eq!(reader.byte_headers()?, &headers);

        // Empty input yields empty headers and no records, in both modes.
        let mut reader = Reader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        let mut reader = Reader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        Ok(())
    }

    #[test]
    fn test_weirdness() -> error::Result<()> {
        let data =
            b"name,surname\n\"test\" \"wat\", ok\ntest \"wat\",ok \ntest,\"whatever\" ok\n\"test\" there,\"ok\"\r\n";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));

        assert_eq!(
            all_byte_records(&mut reader)?,
            vec![
                brec!["name", "surname"],
                brec!["test \"wat", " ok"],
                brec!["test \"wat", "ok "],
                brec!["test", "whatever ok"],
                brec!["test there", "ok"],
            ]
        );

        // Stray carriage returns between rows are skipped.
        let data = b"name,surname\n\r\rjohn,coucou";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));

        assert_eq!(
            all_byte_records(&mut reader)?,
            vec![brec!["name", "surname"], brec!["john", "coucou"]]
        );

        Ok(())
    }

    #[test]
    fn test_position() -> error::Result<()> {
        let data = b"name,surname\njohnny,landis crue\nbabka,bob caterpillar\n";

        let mut reader = Reader::from_reader(&data[..]);
        let mut record = ByteRecord::new();

        let mut positions = vec![reader.position()];
        reader.byte_headers()?;
        positions.push(reader.position());

        while reader.read_byte_record(&mut record)? {
            positions.push(reader.position());
        }

        assert_eq!(positions, vec![0, 13, 32, 54]);

        // Headerless: the first row is still pending, so the position stays at 0.
        let mut reader = Reader::from_reader_no_headers(&data[..]);
        reader.byte_headers()?;
        assert_eq!(reader.position(), 0);

        Ok(())
    }

    #[test]
    fn test_reverse_reader() -> error::Result<()> {
        let data = b"name,surname\njohn,landis\nbeatrice,babka\nevan,michalak";
        let mut reader = ReverseReader::from_reader(Cursor::new(data))?;

        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(
            records,
            vec![
                brec!["evan", "michalak"],
                brec!["beatrice", "babka"],
                brec!["john", "landis"],
            ]
        );
        assert_eq!(reader.byte_headers(), &brec!["name", "surname"]);

        Ok(())
    }

    #[test]
    fn test_weird_sequence() -> error::Result<()> {
        let data = b"\r\r`\"\",\n,`\"\r\",\n";
        let mut reader = ReaderBuilder::new()
            .flexible(true)
            .has_headers(false)
            .from_reader(&data[..]);
        let mut record = ByteRecord::new();

        reader.read_byte_record(&mut record)?;
        assert_eq!(record, brec!["`\"", ""]);

        reader.read_byte_record(&mut record)?;
        assert_eq!(record, brec!["", "\"\r", ""]);

        Ok(())
    }

    #[test]
    fn test_quoted_final_cr() -> error::Result<()> {
        let csv = b"name,surname\n\"test\",\"\r\"\njohn,landis";

        let expected = vec![
            brec!["name", "surname"],
            brec!["test", "\r"],
            brec!["john", "landis"],
        ];

        for capacity in CAPACITIES {
            let mut reader = headerless_with_capacity(csv, capacity);
            assert_eq!(all_byte_records(&mut reader)?, expected);
        }

        Ok(())
    }
}