1use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
2
3use crate::buffer::BufReaderWithPosition;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ByteRecordBuilder};
7use crate::utils::{self, trim_bom};
8
/// Configuration from which a [`Reader`] or a [`ReverseReader`] is built.
///
/// Every field is plain data, so the type derives `Debug` (to make a
/// configuration loggable) and `Clone` (to let a configured builder be reused
/// as a template).
#[derive(Debug, Clone)]
pub struct ReaderBuilder {
    /// Byte handed to the core parser as the field delimiter.
    delimiter: u8,
    /// Byte handed to the core parser as the quote character.
    quote: u8,
    /// Capacity, in bytes, requested for the internal read buffer.
    buffer_capacity: usize,
    /// When `true`, built readers skip the field-count consistency check.
    flexible: bool,
    /// When `false`, built forward readers also emit the first record as a
    /// regular record instead of only exposing it as headers.
    has_headers: bool,
}
17
18impl Default for ReaderBuilder {
19 fn default() -> Self {
20 Self {
21 delimiter: b',',
22 quote: b'"',
23 buffer_capacity: 8192,
24 flexible: false,
25 has_headers: true,
26 }
27 }
28}
29
30impl ReaderBuilder {
31 pub fn new() -> Self {
33 Self::default()
34 }
35
36 pub fn with_capacity(capacity: usize) -> Self {
38 let mut reader = Self::default();
39 reader.buffer_capacity(capacity);
40 reader
41 }
42
43 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
49 self.delimiter = delimiter;
50 self
51 }
52
53 pub fn quote(&mut self, quote: u8) -> &mut Self {
59 self.quote = quote;
60 self
61 }
62
63 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
65 self.buffer_capacity = capacity;
66 self
67 }
68
69 pub fn flexible(&mut self, yes: bool) -> &mut Self {
75 self.flexible = yes;
76 self
77 }
78
79 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
83 self.has_headers = yes;
84 self
85 }
86
87 pub fn from_reader<R: Read>(&self, reader: R) -> Reader<R> {
90 Reader {
91 buffer: BufReaderWithPosition::with_capacity(self.buffer_capacity, reader),
92 inner: CoreReader::new(self.delimiter, self.quote),
93 flexible: self.flexible,
94 headers: ByteRecord::new(),
95 has_read: false,
96 must_reemit_headers: !self.has_headers,
97 has_headers: self.has_headers,
98 index: 0,
99 }
100 }
101
102 pub fn reverse_from_reader<R: Read + Seek>(
105 &self,
106 mut reader: R,
107 ) -> error::Result<ReverseReader<R>> {
108 let initial_pos = reader.stream_position()?;
109
110 let mut forward_reader = self.from_reader(reader);
111 let headers = forward_reader.byte_headers()?.clone();
112 let position_after_headers = forward_reader.position();
113
114 let mut reader = forward_reader.into_inner();
115
116 let file_len = reader.seek(SeekFrom::End(0))?;
117
118 let offset = if self.has_headers {
119 initial_pos + position_after_headers
120 } else {
121 initial_pos
122 };
123
124 let reverse_io_reader = utils::ReverseReader::new(reader, file_len, offset);
125
126 Ok(ReverseReader {
127 buffer: BufReader::with_capacity(self.buffer_capacity, reverse_io_reader),
128 inner: CoreReader::new(self.delimiter, self.quote),
129 flexible: self.flexible,
130 headers,
131 })
132 }
133}
134
/// Forward CSV reader yielding byte records from a source of type `R`.
pub struct Reader<R> {
    /// Buffered source which also reports a byte position.
    buffer: BufReaderWithPosition<R>,
    /// Core parser, created with the configured delimiter and quote bytes.
    inner: CoreReader,
    /// When `true`, the field-count check is skipped.
    flexible: bool,
    /// First record of the stream, stored on first read.
    headers: ByteRecord,
    /// Set once the first record has been read and stored as headers.
    has_read: bool,
    /// When `true`, the next call to `read_byte_record` hands out a copy of
    /// the headers instead of parsing a new record.
    must_reemit_headers: bool,
    /// Whether the reader was configured with a header row.
    has_headers: bool,
    /// Number of records parsed so far, the first record included.
    index: u64,
}
152
153impl<R: Read> Reader<R> {
154 pub fn from_reader(reader: R) -> Self {
157 ReaderBuilder::new().from_reader(reader)
158 }
159
160 #[inline]
161 fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
162 if self.flexible {
163 return Ok(());
164 }
165
166 if self.has_read && written != self.headers.len() {
167 return Err(Error::new(ErrorKind::UnequalLengths {
168 expected_len: self.headers.len(),
169 len: written,
170 pos: Some((
171 byte,
172 self.index
173 .saturating_sub(if self.has_headers { 1 } else { 0 }),
174 )),
175 }));
176 }
177
178 Ok(())
179 }
180
181 fn read_byte_record_impl(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
182 use ReadResult::*;
183
184 record.clear();
185
186 let mut record_builder = ByteRecordBuilder::wrap(record);
187 let byte = self.position();
188
189 loop {
190 let input = self.buffer.fill_buf()?;
191
192 let (result, pos) = self.inner.read_record(input, &mut record_builder);
193
194 self.buffer.consume(pos);
195
196 match result {
197 End => {
198 return Ok(false);
199 }
200 Cr | Lf | InputEmpty => {
201 continue;
202 }
203 Record => {
204 self.index += 1;
205 self.check_field_count(byte, record.len())?;
206 return Ok(true);
207 }
208 };
209 }
210 }
211
212 #[inline]
213 fn on_first_read(&mut self) -> error::Result<()> {
214 if self.has_read {
215 return Ok(());
216 }
217
218 let input = self.buffer.fill_buf()?;
220 let bom_len = trim_bom(input);
221 self.buffer.consume(bom_len);
222
223 let mut headers = ByteRecord::new();
225
226 let has_data = self.read_byte_record_impl(&mut headers)?;
227
228 if !has_data {
229 self.must_reemit_headers = false;
230 }
231
232 self.headers = headers;
233 self.has_read = true;
234
235 Ok(())
236 }
237
238 #[inline]
241 pub fn has_headers(&self) -> bool {
242 self.has_headers
243 }
244
245 #[inline]
247 pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
248 self.on_first_read()?;
249
250 Ok(&self.headers)
251 }
252
253 #[inline(always)]
258 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
259 self.on_first_read()?;
260
261 if self.must_reemit_headers {
262 self.headers.clone_into(record);
263 self.must_reemit_headers = false;
264 return Ok(true);
265 }
266
267 self.read_byte_record_impl(record)
268 }
269
270 pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
272 ByteRecordsIter {
273 reader: self,
274 record: ByteRecord::new(),
275 }
276 }
277
278 pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
280 ByteRecordsIntoIter {
281 reader: self,
282 record: ByteRecord::new(),
283 }
284 }
285
286 pub fn get_ref(&self) -> &R {
288 self.buffer.get_ref()
289 }
290
291 pub fn get_mut(&mut self) -> &mut R {
293 self.buffer.get_mut()
294 }
295
296 pub fn into_inner(self) -> R {
300 self.buffer.into_inner().into_inner()
301 }
302
303 pub fn into_bufreader(self) -> (Option<ByteRecord>, BufReader<R>) {
308 (
309 self.must_reemit_headers.then_some(self.headers),
310 self.buffer.into_inner(),
311 )
312 }
313
314 #[inline(always)]
316 pub fn position(&self) -> u64 {
317 if self.must_reemit_headers {
318 0
319 } else {
320 self.buffer.position()
321 }
322 }
323}
324
/// Iterator over the records of a mutably borrowed [`Reader`].
pub struct ByteRecordsIter<'r, R> {
    /// Reader the records are pulled from.
    reader: &'r mut Reader<R>,
    /// Scratch record filled on each step and cloned into the yielded item.
    record: ByteRecord,
}
329
330impl<R: Read> Iterator for ByteRecordsIter<'_, R> {
331 type Item = error::Result<ByteRecord>;
332
333 #[inline]
334 fn next(&mut self) -> Option<Self::Item> {
335 match self.reader.read_byte_record(&mut self.record) {
338 Err(err) => Some(Err(err)),
339 Ok(true) => Some(Ok(self.record.clone())),
340 Ok(false) => None,
341 }
342 }
343}
344
/// Iterator over the records of an owned [`Reader`].
pub struct ByteRecordsIntoIter<R> {
    /// Reader the records are pulled from.
    reader: Reader<R>,
    /// Scratch record filled on each step and cloned into the yielded item.
    record: ByteRecord,
}
349
350impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
351 type Item = error::Result<ByteRecord>;
352
353 #[inline]
354 fn next(&mut self) -> Option<Self::Item> {
355 match self.reader.read_byte_record(&mut self.record) {
358 Err(err) => Some(Err(err)),
359 Ok(true) => Some(Ok(self.record.clone())),
360 Ok(false) => None,
361 }
362 }
363}
364
/// CSV reader yielding the records of a seekable source from last to first.
pub struct ReverseReader<R> {
    /// Core parser, created with the configured delimiter and quote bytes.
    inner: CoreReader,
    /// Buffered view over the source wrapped in `utils::ReverseReader`.
    buffer: BufReader<utils::ReverseReader<R>>,
    /// When `true`, the field-count check is skipped.
    flexible: bool,
    /// First record of the source, captured when the reader was built.
    headers: ByteRecord,
}
378
379impl<R: Read + Seek> ReverseReader<R> {
380 pub fn from_reader(reader: R) -> error::Result<Self> {
384 ReaderBuilder::new().reverse_from_reader(reader)
385 }
386
387 pub fn byte_headers(&self) -> &ByteRecord {
389 &self.headers
390 }
391
392 #[inline]
393 fn check_field_count(&mut self, written: usize) -> error::Result<()> {
394 if self.flexible {
395 return Ok(());
396 }
397
398 if written != self.headers.len() {
399 return Err(Error::new(ErrorKind::UnequalLengths {
400 expected_len: self.headers.len(),
401 len: written,
402 pos: None,
403 }));
404 }
405
406 Ok(())
407 }
408
409 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
414 use ReadResult::*;
415
416 record.clear();
417
418 let mut record_builder = ByteRecordBuilder::wrap(record);
419
420 loop {
421 let input = self.buffer.fill_buf()?;
422
423 let (result, pos) = self.inner.read_record(input, &mut record_builder);
424
425 self.buffer.consume(pos);
426
427 match result {
428 End => {
429 return Ok(false);
430 }
431 Cr | Lf | InputEmpty => {
432 continue;
433 }
434 Record => {
435 self.check_field_count(record.len())?;
436 record.reverse();
437 return Ok(true);
438 }
439 };
440 }
441 }
442
443 pub fn byte_records(&mut self) -> ReverseByteRecordsIter<'_, R> {
445 ReverseByteRecordsIter {
446 reader: self,
447 record: ByteRecord::new(),
448 }
449 }
450
451 pub fn into_byte_records(self) -> ReverseByteRecordsIntoIter<R> {
453 ReverseByteRecordsIntoIter {
454 reader: self,
455 record: ByteRecord::new(),
456 }
457 }
458}
459
/// Iterator over the records of a mutably borrowed [`ReverseReader`].
pub struct ReverseByteRecordsIter<'r, R> {
    /// Reader the records are pulled from.
    reader: &'r mut ReverseReader<R>,
    /// Scratch record filled on each step and cloned into the yielded item.
    record: ByteRecord,
}
464
465impl<R: Read + Seek> Iterator for ReverseByteRecordsIter<'_, R> {
466 type Item = error::Result<ByteRecord>;
467
468 #[inline]
469 fn next(&mut self) -> Option<Self::Item> {
470 match self.reader.read_byte_record(&mut self.record) {
473 Err(err) => Some(Err(err)),
474 Ok(true) => Some(Ok(self.record.clone())),
475 Ok(false) => None,
476 }
477 }
478}
479
/// Iterator over the records of an owned [`ReverseReader`].
pub struct ReverseByteRecordsIntoIter<R> {
    /// Reader the records are pulled from.
    reader: ReverseReader<R>,
    /// Scratch record filled on each step and cloned into the yielded item.
    record: ByteRecord,
}
484
485impl<R: Read + Seek> Iterator for ReverseByteRecordsIntoIter<R> {
486 type Item = error::Result<ByteRecord>;
487
488 #[inline]
489 fn next(&mut self) -> Option<Self::Item> {
490 match self.reader.read_byte_record(&mut self.record) {
493 Err(err) => Some(Err(err)),
494 Ok(true) => Some(Ok(self.record.clone())),
495 Ok(false) => None,
496 }
497 }
498}
499
// Unit tests for the forward and reverse readers.
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    impl<R: Read> Reader<R> {
        // Test-only shortcut: a default reader configured without a header row.
        fn from_reader_no_headers(reader: R) -> Self {
            ReaderBuilder::new().has_headers(false).from_reader(reader)
        }
    }

    // Parses the same document with several buffer capacities, down to a
    // single byte, and expects identical records every time.
    #[test]
    fn test_read_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        // The blank line of the input is not expected to produce a record.
        let expected = vec![
            brec!["name", "surname", "age"],
            brec!["john", "landy, the \"everlasting\" bastard", "45"],
            brec!["\"ok\"", "whatever", "dude"],
            brec!["lucy", "rose", "67"],
            brec!["jermaine", "jackson", "89"],
            brec!["karine", "loucan", "52"],
            brec!["rose", "glib", "12"],
            brec!["guillaume", "plique", "42"],
        ];

        for capacity in [32usize, 4, 3, 2, 1] {
            let mut reader = ReaderBuilder::with_capacity(capacity)
                .has_headers(false)
                .from_reader(Cursor::new(csv));

            assert_eq!(
                reader.byte_records().collect::<Result<Vec<_>, _>>()?,
                expected,
            );
        }

        Ok(())
    }

    // The first record must be the same with or without a leading UTF-8 BOM.
    #[test]
    fn test_strip_bom() -> error::Result<()> {
        let mut reader = Reader::from_reader_no_headers(Cursor::new("name,surname,age"));

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        let mut reader =
            Reader::from_reader_no_headers(Cursor::new(b"\xef\xbb\xbfname,surname,age"));

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        Ok(())
    }

    // A line holding only an empty quoted field yields a record with one
    // empty field, including on the last line without a trailing newline.
    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let reader = Reader::from_reader_no_headers(Cursor::new(data));

        let expected = vec![brec!["name"], brec![""], brec!["lucy"], brec![""]];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // CRLF line endings must not leak into the fields.
    #[test]
    fn test_crlf() -> error::Result<()> {
        let reader = Reader::from_reader_no_headers(Cursor::new(
            "name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n",
        ));

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "john"],
            brec!["evan", "zhong"],
            brec!["béatrice", "glougou"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // Every field quoted, last line without a trailing newline.
    #[test]
    fn test_quote_always() -> error::Result<()> {
        let reader = Reader::from_reader_no_headers(Cursor::new(
            "\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\"",
        ));

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "rose"],
            brec!["john", "mayhew"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // Headers must be the same whether they are requested before or after
    // the first record is read, with and without a header row.
    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // With a header row: headers requested first.
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["john", "dandy"]
        );

        // With a header row: headers requested after the first record.
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["john", "dandy"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Without a header row: the first record is also yielded as data.
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname"]
        );

        // Without a header row: headers requested after the first record.
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Empty input with a header row: empty headers and no record.
        let mut reader = Reader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        // Empty input without a header row: same outcome.
        let mut reader = Reader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        Ok(())
    }

    // Pins down the output for malformed quoting and stray carriage returns.
    #[test]
    fn test_weirdness() -> error::Result<()> {
        // Quotes appearing around or inside otherwise unquoted content.
        let data =
            b"name,surname\n\"test\" \"wat\", ok\ntest \"wat\",ok \ntest,\"whatever\" ok\n\"test\" there,\"ok\"\r\n";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));

        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        let expected = vec![
            brec!["name", "surname"],
            brec!["test \"wat", " ok"],
            brec!["test \"wat", "ok "],
            brec!["test", "whatever ok"],
            brec!["test there", "ok"],
        ];

        assert_eq!(records, expected);

        // Carriage returns following a line feed are expected to be skipped.
        let data = b"name,surname\n\r\rjohn,coucou";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(
            records,
            vec![brec!["name", "surname"], brec!["john", "coucou"]]
        );

        Ok(())
    }

    // Records the position before any read, after the headers and after each
    // record.
    #[test]
    fn test_position() -> error::Result<()> {
        let data = b"name,surname\njohnny,landis crue\nbabka,bob caterpillar\n";

        let mut reader = Reader::from_reader(&data[..]);
        let mut record = ByteRecord::new();

        let mut positions = vec![reader.position()];

        reader.byte_headers()?;

        positions.push(reader.position());

        while reader.read_byte_record(&mut record)? {
            positions.push(reader.position());
        }

        assert_eq!(positions, vec![0, 13, 32, 54]);

        // Without a header row, the position stays at 0 after the headers
        // are read, because the first record has not been yielded yet.
        let mut reader = ReaderBuilder::new()
            .has_headers(false)
            .from_reader(&data[..]);

        reader.byte_headers()?;

        assert_eq!(reader.position(), 0);

        Ok(())
    }

    // Records come out last to first, the header row is not among them and
    // stays available through `byte_headers`.
    #[test]
    fn test_reverse_reader() -> error::Result<()> {
        let data = b"name,surname\njohn,landis\nbeatrice,babka\nevan,michalak";
        let mut reader = ReverseReader::from_reader(Cursor::new(data))?;

        assert_eq!(
            reader.byte_records().collect::<Result<Vec<_>, _>>()?,
            vec![
                brec!["evan", "michalak"],
                brec!["beatrice", "babka"],
                brec!["john", "landis"]
            ]
        );

        assert_eq!(reader.byte_headers(), &brec!["name", "surname"]);

        Ok(())
    }
}