1use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
2
3use crate::buffer::BufReaderWithPosition;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ByteRecordBuilder};
7use crate::utils::{self, trim_bom};
8
/// Configuration used to create a [`Reader`] or a [`ReverseReader`].
///
/// A builder starts from the values given by its [`Default`] implementation
/// and is then adjusted through its chainable setter methods.
#[derive(Debug, Clone)]
pub struct ReaderBuilder {
    /// Byte separating two fields of a record.
    delimiter: u8,
    /// Byte used to quote fields.
    quote: u8,
    /// Capacity of the buffer wrapped around the underlying reader.
    buffer_capacity: usize,
    /// When `true`, records are not required to have as many fields as the
    /// first record of the input.
    flexible: bool,
    /// When `true`, the first record is a header row and is not yielded as a
    /// regular record.
    has_headers: bool,
}
17
18impl Default for ReaderBuilder {
19 fn default() -> Self {
20 Self {
21 delimiter: b',',
22 quote: b'"',
23 buffer_capacity: 8192,
24 flexible: false,
25 has_headers: true,
26 }
27 }
28}
29
30impl ReaderBuilder {
31 pub fn new() -> Self {
32 Self::default()
33 }
34
35 pub fn with_capacity(capacity: usize) -> Self {
36 let mut reader = Self::default();
37 reader.buffer_capacity(capacity);
38 reader
39 }
40
41 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
42 self.delimiter = delimiter;
43 self
44 }
45
46 pub fn quote(&mut self, quote: u8) -> &mut Self {
47 self.quote = quote;
48 self
49 }
50
51 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
52 self.buffer_capacity = capacity;
53 self
54 }
55
56 pub fn flexible(&mut self, yes: bool) -> &mut Self {
57 self.flexible = yes;
58 self
59 }
60
61 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
62 self.has_headers = yes;
63 self
64 }
65
66 pub fn from_reader<R: Read>(&self, reader: R) -> Reader<R> {
67 Reader {
68 buffer: BufReaderWithPosition::with_capacity(self.buffer_capacity, reader),
69 inner: CoreReader::new(self.delimiter, self.quote),
70 flexible: self.flexible,
71 headers: ByteRecord::new(),
72 has_read: false,
73 must_reemit_headers: !self.has_headers,
74 has_headers: self.has_headers,
75 index: 0,
76 }
77 }
78
79 pub fn reverse_from_reader<R: Read + Seek>(
80 &self,
81 mut reader: R,
82 ) -> error::Result<ReverseReader<R>> {
83 let initial_pos = reader.stream_position()?;
84
85 let mut forward_reader = self.from_reader(reader);
86 let headers = forward_reader.byte_headers()?.clone();
87 let position_after_headers = forward_reader.position();
88
89 let mut reader = forward_reader.into_inner();
90
91 let file_len = reader.seek(SeekFrom::End(0))?;
92
93 let offset = if self.has_headers {
94 initial_pos + position_after_headers
95 } else {
96 initial_pos
97 };
98
99 let reverse_io_reader = utils::ReverseReader::new(reader, file_len, offset);
100
101 Ok(ReverseReader {
102 buffer: BufReader::with_capacity(self.buffer_capacity, reverse_io_reader),
103 inner: CoreReader::new(self.delimiter, self.quote),
104 flexible: self.flexible,
105 headers,
106 })
107 }
108}
109
110pub struct Reader<R> {
118 buffer: BufReaderWithPosition<R>,
119 inner: CoreReader,
120 flexible: bool,
121 headers: ByteRecord,
122 has_read: bool,
123 must_reemit_headers: bool,
124 has_headers: bool,
125 index: u64,
126}
127
128impl<R: Read> Reader<R> {
129 pub fn from_reader(reader: R) -> Self {
132 ReaderBuilder::new().from_reader(reader)
133 }
134
135 #[inline]
136 fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
137 if self.flexible {
138 return Ok(());
139 }
140
141 if self.has_read && written != self.headers.len() {
142 return Err(Error::new(ErrorKind::UnequalLengths {
143 expected_len: self.headers.len(),
144 len: written,
145 pos: Some((
146 byte,
147 self.index
148 .saturating_sub(if self.has_headers { 1 } else { 0 }),
149 )),
150 }));
151 }
152
153 Ok(())
154 }
155
156 fn read_byte_record_impl(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
157 use ReadResult::*;
158
159 record.clear();
160
161 let mut record_builder = ByteRecordBuilder::wrap(record);
162 let byte = self.position();
163
164 loop {
165 let input = self.buffer.fill_buf()?;
166
167 let (result, pos) = self.inner.read_record(input, &mut record_builder);
168
169 self.buffer.consume(pos);
170
171 match result {
172 End => {
173 return Ok(false);
174 }
175 Cr | Lf | InputEmpty => {
176 continue;
177 }
178 Record => {
179 self.index += 1;
180 self.check_field_count(byte, record.len())?;
181 return Ok(true);
182 }
183 };
184 }
185 }
186
187 #[inline]
188 fn on_first_read(&mut self) -> error::Result<()> {
189 if self.has_read {
190 return Ok(());
191 }
192
193 let input = self.buffer.fill_buf()?;
195 let bom_len = trim_bom(input);
196 self.buffer.consume(bom_len);
197
198 let mut headers = ByteRecord::new();
200
201 let has_data = self.read_byte_record_impl(&mut headers)?;
202
203 if !has_data {
204 self.must_reemit_headers = false;
205 }
206
207 self.headers = headers;
208 self.has_read = true;
209
210 Ok(())
211 }
212
213 #[inline]
216 pub fn has_headers(&self) -> bool {
217 self.has_headers
218 }
219
220 #[inline]
221 pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
222 self.on_first_read()?;
223
224 Ok(&self.headers)
225 }
226
227 #[inline(always)]
228 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
229 self.on_first_read()?;
230
231 if self.must_reemit_headers {
232 self.headers.clone_into(record);
233 self.must_reemit_headers = false;
234 return Ok(true);
235 }
236
237 self.read_byte_record_impl(record)
238 }
239
240 pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
241 ByteRecordsIter {
242 reader: self,
243 record: ByteRecord::new(),
244 }
245 }
246
247 pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
248 ByteRecordsIntoIter {
249 reader: self,
250 record: ByteRecord::new(),
251 }
252 }
253
254 pub fn get_ref(&self) -> &R {
255 self.buffer.get_ref()
256 }
257
258 pub fn get_mut(&mut self) -> &mut R {
259 self.buffer.get_mut()
260 }
261
262 pub fn into_inner(self) -> R {
263 self.buffer.into_inner().into_inner()
264 }
265
266 pub fn into_bufreader(self) -> (Option<ByteRecord>, BufReader<R>) {
267 (
268 self.must_reemit_headers.then_some(self.headers),
269 self.buffer.into_inner(),
270 )
271 }
272
273 #[inline(always)]
275 pub fn position(&self) -> u64 {
276 if self.must_reemit_headers {
277 0
278 } else {
279 self.buffer.position()
280 }
281 }
282}
283
284pub struct ByteRecordsIter<'r, R> {
285 reader: &'r mut Reader<R>,
286 record: ByteRecord,
287}
288
289impl<R: Read> Iterator for ByteRecordsIter<'_, R> {
290 type Item = error::Result<ByteRecord>;
291
292 #[inline]
293 fn next(&mut self) -> Option<Self::Item> {
294 match self.reader.read_byte_record(&mut self.record) {
297 Err(err) => Some(Err(err)),
298 Ok(true) => Some(Ok(self.record.clone())),
299 Ok(false) => None,
300 }
301 }
302}
303
304pub struct ByteRecordsIntoIter<R> {
305 reader: Reader<R>,
306 record: ByteRecord,
307}
308
309impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
310 type Item = error::Result<ByteRecord>;
311
312 #[inline]
313 fn next(&mut self) -> Option<Self::Item> {
314 match self.reader.read_byte_record(&mut self.record) {
317 Err(err) => Some(Err(err)),
318 Ok(true) => Some(Ok(self.record.clone())),
319 Ok(false) => None,
320 }
321 }
322}
323
324pub struct ReverseReader<R> {
332 inner: CoreReader,
333 buffer: BufReader<utils::ReverseReader<R>>,
334 flexible: bool,
335 headers: ByteRecord,
336}
337
338impl<R: Read + Seek> ReverseReader<R> {
339 pub fn from_reader(reader: R) -> error::Result<Self> {
340 ReaderBuilder::new().reverse_from_reader(reader)
341 }
342
343 pub fn byte_headers(&self) -> &ByteRecord {
344 &self.headers
345 }
346
347 #[inline]
348 fn check_field_count(&mut self, written: usize) -> error::Result<()> {
349 if self.flexible {
350 return Ok(());
351 }
352
353 if written != self.headers.len() {
354 return Err(Error::new(ErrorKind::UnequalLengths {
355 expected_len: self.headers.len(),
356 len: written,
357 pos: None,
358 }));
359 }
360
361 Ok(())
362 }
363
364 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
365 use ReadResult::*;
366
367 record.clear();
368
369 let mut record_builder = ByteRecordBuilder::wrap(record);
370
371 loop {
372 let input = self.buffer.fill_buf()?;
373
374 let (result, pos) = self.inner.read_record(input, &mut record_builder);
375
376 self.buffer.consume(pos);
377
378 match result {
379 End => {
380 return Ok(false);
381 }
382 Cr | Lf | InputEmpty => {
383 continue;
384 }
385 Record => {
386 self.check_field_count(record.len())?;
387 record.reverse();
388 return Ok(true);
389 }
390 };
391 }
392 }
393
394 pub fn byte_records(&mut self) -> ReverseByteRecordsIter<'_, R> {
395 ReverseByteRecordsIter {
396 reader: self,
397 record: ByteRecord::new(),
398 }
399 }
400
401 pub fn into_byte_records(self) -> ReverseByteRecordsIntoIter<R> {
402 ReverseByteRecordsIntoIter {
403 reader: self,
404 record: ByteRecord::new(),
405 }
406 }
407}
408
409pub struct ReverseByteRecordsIter<'r, R> {
410 reader: &'r mut ReverseReader<R>,
411 record: ByteRecord,
412}
413
414impl<R: Read + Seek> Iterator for ReverseByteRecordsIter<'_, R> {
415 type Item = error::Result<ByteRecord>;
416
417 #[inline]
418 fn next(&mut self) -> Option<Self::Item> {
419 match self.reader.read_byte_record(&mut self.record) {
422 Err(err) => Some(Err(err)),
423 Ok(true) => Some(Ok(self.record.clone())),
424 Ok(false) => None,
425 }
426 }
427}
428
429pub struct ReverseByteRecordsIntoIter<R> {
430 reader: ReverseReader<R>,
431 record: ByteRecord,
432}
433
434impl<R: Read + Seek> Iterator for ReverseByteRecordsIntoIter<R> {
435 type Item = error::Result<ByteRecord>;
436
437 #[inline]
438 fn next(&mut self) -> Option<Self::Item> {
439 match self.reader.read_byte_record(&mut self.record) {
442 Err(err) => Some(Err(err)),
443 Ok(true) => Some(Ok(self.record.clone())),
444 Ok(false) => None,
445 }
446 }
447}
448
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    impl<R: Read> Reader<R> {
        /// Test helper building a reader whose first row is regular data.
        fn from_reader_no_headers(reader: R) -> Self {
            let mut builder = ReaderBuilder::new();
            builder.has_headers(false);
            builder.from_reader(reader)
        }
    }

    /// Records must come out identical whatever the buffer capacity.
    #[test]
    fn test_read_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let expected = vec![
            brec!["name", "surname", "age"],
            brec!["john", "landy, the \"everlasting\" bastard", "45"],
            brec!["\"ok\"", "whatever", "dude"],
            brec!["lucy", "rose", "67"],
            brec!["jermaine", "jackson", "89"],
            brec!["karine", "loucan", "52"],
            brec!["rose", "glib", "12"],
            brec!["guillaume", "plique", "42"],
        ];

        for capacity in [32usize, 4, 3, 2, 1] {
            let mut builder = ReaderBuilder::with_capacity(capacity);
            builder.has_headers(false);

            let mut reader = builder.from_reader(Cursor::new(csv));
            let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

            assert_eq!(records, expected);
        }

        Ok(())
    }

    /// A leading byte order mark must not leak into the first field.
    #[test]
    fn test_strip_bom() -> error::Result<()> {
        let mut without_bom = Reader::from_reader_no_headers(Cursor::new("name,surname,age"));
        let first = without_bom.byte_records().next().unwrap()?;

        assert_eq!(first, brec!["name", "surname", "age"]);

        let mut with_bom =
            Reader::from_reader_no_headers(Cursor::new(b"\xef\xbb\xbfname,surname,age"));
        let first = with_bom.byte_records().next().unwrap()?;

        assert_eq!(first, brec!["name", "surname", "age"]);

        Ok(())
    }

    /// Quoted empty fields are records of their own.
    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let records = Reader::from_reader_no_headers(Cursor::new(data))
            .into_byte_records()
            .collect::<Result<Vec<_>, _>>()?;

        let expected = vec![brec!["name"], brec![""], brec!["lucy"], brec![""]];

        assert_eq!(records, expected);

        Ok(())
    }

    /// CRLF line endings are handled like LF ones.
    #[test]
    fn test_crlf() -> error::Result<()> {
        let input = Cursor::new(
            "name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n",
        );

        let records = Reader::from_reader_no_headers(input)
            .into_byte_records()
            .collect::<Result<Vec<_>, _>>()?;

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "john"],
            brec!["evan", "zhong"],
            brec!["béatrice", "glougou"],
        ];

        assert_eq!(records, expected);

        Ok(())
    }

    /// Files quoting every field are unquoted properly.
    #[test]
    fn test_quote_always() -> error::Result<()> {
        let input = Cursor::new("\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\"");

        let records = Reader::from_reader_no_headers(input)
            .into_byte_records()
            .collect::<Result<Vec<_>, _>>()?;

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "rose"],
            brec!["john", "mayhew"],
        ];

        assert_eq!(records, expected);

        Ok(())
    }

    /// Headers and records can be requested in any order, with or without a
    /// header row, including on empty input.
    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // Header row, headers requested first.
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        let first = reader.byte_records().next().unwrap()?;
        assert_eq!(first, brec!["john", "dandy"]);

        // Header row, records requested first.
        let mut reader = Reader::from_reader(Cursor::new(data));
        let first = reader.byte_records().next().unwrap()?;
        assert_eq!(first, brec!["john", "dandy"]);
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // No header row, headers requested first.
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        let first = reader.byte_records().next().unwrap()?;
        assert_eq!(first, brec!["name", "surname"]);

        // No header row, records requested first.
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        let first = reader.byte_records().next().unwrap()?;
        assert_eq!(first, brec!["name", "surname"]);
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Empty input, header row.
        let mut reader = Reader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        // Empty input, no header row.
        let mut reader = Reader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        Ok(())
    }

    /// Malformed quoting and stray carriage returns are tolerated.
    #[test]
    fn test_weirdness() -> error::Result<()> {
        let data =
            b"name,surname\n\"test\" \"wat\", ok\ntest \"wat\",ok \ntest,\"whatever\" ok\n\"test\" there,\"ok\"\r\n";

        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        let expected = vec![
            brec!["name", "surname"],
            brec!["test \"wat", " ok"],
            brec!["test \"wat", "ok "],
            brec!["test", "whatever ok"],
            brec!["test there", "ok"],
        ];

        assert_eq!(records, expected);

        let data = b"name,surname\n\r\rjohn,coucou";

        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        let expected = vec![brec!["name", "surname"], brec!["john", "coucou"]];

        assert_eq!(records, expected);

        Ok(())
    }

    /// Positions are reported after each record; a pending first record
    /// keeps the position at zero.
    #[test]
    fn test_position() -> error::Result<()> {
        let data = b"name,surname\njohnny,landis crue\nbabka,bob caterpillar\n";

        let mut reader = Reader::from_reader(&data[..]);
        let mut record = ByteRecord::new();
        let mut positions = Vec::new();

        positions.push(reader.position());

        reader.byte_headers()?;
        positions.push(reader.position());

        while reader.read_byte_record(&mut record)? {
            positions.push(reader.position());
        }

        assert_eq!(positions, vec![0, 13, 32, 54]);

        let mut builder = ReaderBuilder::new();
        builder.has_headers(false);

        let mut reader = builder.from_reader(&data[..]);
        reader.byte_headers()?;

        assert_eq!(reader.position(), 0);

        Ok(())
    }

    /// Records come out last to first, and the headers stay available.
    #[test]
    fn test_reverse_reader() -> error::Result<()> {
        let data = b"name,surname\njohn,landis\nbeatrice,babka\nevan,michalak";

        let mut reader = ReverseReader::from_reader(Cursor::new(data))?;
        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        let expected = vec![
            brec!["evan", "michalak"],
            brec!["beatrice", "babka"],
            brec!["john", "landis"],
        ];

        assert_eq!(records, expected);
        assert_eq!(reader.byte_headers(), &brec!["name", "surname"]);

        Ok(())
    }
}