1use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
2
3use crate::buffer::BufReaderWithPosition;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ByteRecordBuilder};
7use crate::utils::{self, trim_bom};
8
/// Configuration used to build CSV readers.
///
/// The options stored here are copied into every reader created through
/// `from_reader` or `reverse_from_reader`, so a single builder can be reused
/// (or cloned) to create several readers.
#[derive(Debug, Clone)]
pub struct ReaderBuilder {
    /// Byte separating two fields (`b','` by default).
    delimiter: u8,
    /// Byte used to quote fields (`b'"'` by default).
    quote: u8,
    /// Capacity, in bytes, of the read buffer (8192 by default).
    buffer_capacity: usize,
    /// When `true`, records are allowed to have a different number of fields
    /// than the first record (`false` by default).
    flexible: bool,
    /// When `true`, the first record is a header row and is not yielded as
    /// data (`true` by default).
    has_headers: bool,
}

impl Default for ReaderBuilder {
    /// Comma-delimited, double-quote-quoted, strict field counts, with a
    /// header row and an 8 KiB read buffer.
    fn default() -> Self {
        Self {
            delimiter: b',',
            quote: b'"',
            buffer_capacity: 8192,
            flexible: false,
            has_headers: true,
        }
    }
}
28
29impl ReaderBuilder {
30 pub fn new() -> Self {
31 Self::default()
32 }
33
34 pub fn with_capacity(capacity: usize) -> Self {
35 let mut reader = Self::default();
36 reader.buffer_capacity(capacity);
37 reader
38 }
39
40 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
41 self.delimiter = delimiter;
42 self
43 }
44
45 pub fn quote(&mut self, quote: u8) -> &mut Self {
46 self.quote = quote;
47 self
48 }
49
50 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
51 self.buffer_capacity = capacity;
52 self
53 }
54
55 pub fn flexible(&mut self, yes: bool) -> &mut Self {
56 self.flexible = yes;
57 self
58 }
59
60 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
61 self.has_headers = yes;
62 self
63 }
64
65 pub fn from_reader<R: Read>(&self, reader: R) -> Reader<R> {
66 Reader {
67 buffer: BufReaderWithPosition::with_capacity(self.buffer_capacity, reader),
68 inner: CoreReader::new(self.delimiter, self.quote),
69 flexible: self.flexible,
70 headers: ByteRecord::new(),
71 has_read: false,
72 must_reemit_headers: !self.has_headers,
73 has_headers: self.has_headers,
74 index: 0,
75 }
76 }
77
78 pub fn reverse_from_reader<R: Read + Seek>(
79 &self,
80 mut reader: R,
81 ) -> error::Result<ReverseReader<R>> {
82 let initial_pos = reader.stream_position()?;
83
84 let mut forward_reader = self.from_reader(reader);
85 let headers = forward_reader.byte_headers()?.clone();
86 let position_after_headers = forward_reader.position();
87
88 let mut reader = forward_reader.into_inner();
89
90 let file_len = reader.seek(SeekFrom::End(0))?;
91
92 let offset = if self.has_headers {
93 initial_pos + position_after_headers
94 } else {
95 initial_pos
96 };
97
98 let reverse_io_reader = utils::ReverseReader::new(reader, file_len, offset);
99
100 Ok(ReverseReader {
101 buffer: BufReader::with_capacity(self.buffer_capacity, reverse_io_reader),
102 inner: CoreReader::new(self.delimiter, self.quote),
103 flexible: self.flexible,
104 headers,
105 })
106 }
107}
108
/// Forward CSV reader yielding [`ByteRecord`]s from any `Read` source.
pub struct Reader<R> {
    /// Buffered source that also reports how many bytes were consumed.
    buffer: BufReaderWithPosition<R>,
    /// Low-level record parser configured with the delimiter and quote bytes.
    inner: CoreReader,
    /// When `true`, field counts are never checked against the first record.
    flexible: bool,
    /// First record of the input; empty until the first read happened.
    headers: ByteRecord,
    /// Whether the first record has already been read and stored in `headers`.
    has_read: bool,
    /// Whether the stored first record still has to be yielded as data
    /// (initially `true` when the reader has no header row).
    must_reemit_headers: bool,
    /// Whether the first record is a header row.
    has_headers: bool,
    /// Number of records parsed so far, the first record included.
    index: u64,
}
119
120impl<R: Read> Reader<R> {
121 pub fn from_reader(reader: R) -> Self {
122 ReaderBuilder::new().from_reader(reader)
123 }
124
125 #[inline]
126 fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
127 if self.flexible {
128 return Ok(());
129 }
130
131 if self.has_read && written != self.headers.len() {
132 return Err(Error::new(ErrorKind::UnequalLengths {
133 expected_len: self.headers.len(),
134 len: written,
135 pos: Some((
136 byte,
137 self.index
138 .saturating_sub(if self.has_headers { 1 } else { 0 }),
139 )),
140 }));
141 }
142
143 Ok(())
144 }
145
146 fn read_byte_record_impl(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
147 use ReadResult::*;
148
149 record.clear();
150
151 let mut record_builder = ByteRecordBuilder::wrap(record);
152 let byte = self.position();
153
154 loop {
155 let input = self.buffer.fill_buf()?;
156
157 let (result, pos) = self.inner.read_record(input, &mut record_builder);
158
159 self.buffer.consume(pos);
160
161 match result {
162 End => {
163 return Ok(false);
164 }
165 Cr | Lf | InputEmpty => {
166 continue;
167 }
168 Record => {
169 self.index += 1;
170 self.check_field_count(byte, record.len())?;
171 return Ok(true);
172 }
173 };
174 }
175 }
176
177 #[inline]
178 fn on_first_read(&mut self) -> error::Result<()> {
179 if self.has_read {
180 return Ok(());
181 }
182
183 let input = self.buffer.fill_buf()?;
185 let bom_len = trim_bom(input);
186 self.buffer.consume(bom_len);
187
188 let mut headers = ByteRecord::new();
190
191 let has_data = self.read_byte_record_impl(&mut headers)?;
192
193 if !has_data {
194 self.must_reemit_headers = false;
195 }
196
197 self.headers = headers;
198 self.has_read = true;
199
200 Ok(())
201 }
202
203 #[inline]
204 pub fn has_headers(&self) -> bool {
205 self.has_headers
206 }
207
208 #[inline]
209 pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
210 self.on_first_read()?;
211
212 Ok(&self.headers)
213 }
214
215 #[inline(always)]
216 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
217 self.on_first_read()?;
218
219 if self.must_reemit_headers {
220 self.headers.clone_into(record);
221 self.must_reemit_headers = false;
222 return Ok(true);
223 }
224
225 self.read_byte_record_impl(record)
226 }
227
228 pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
229 ByteRecordsIter {
230 reader: self,
231 record: ByteRecord::new(),
232 }
233 }
234
235 pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
236 ByteRecordsIntoIter {
237 reader: self,
238 record: ByteRecord::new(),
239 }
240 }
241
242 pub fn get_ref(&self) -> &R {
243 self.buffer.get_ref()
244 }
245
246 pub fn get_mut(&mut self) -> &mut R {
247 self.buffer.get_mut()
248 }
249
250 pub fn into_inner(self) -> R {
251 self.buffer.into_inner().into_inner()
252 }
253
254 pub fn into_bufreader(self) -> (Option<ByteRecord>, BufReader<R>) {
255 (
256 self.must_reemit_headers.then_some(self.headers),
257 self.buffer.into_inner(),
258 )
259 }
260
261 #[inline(always)]
262 pub fn position(&self) -> u64 {
263 if self.must_reemit_headers {
264 0
265 } else {
266 self.buffer.position()
267 }
268 }
269}
270
271pub struct ByteRecordsIter<'r, R> {
272 reader: &'r mut Reader<R>,
273 record: ByteRecord,
274}
275
276impl<R: Read> Iterator for ByteRecordsIter<'_, R> {
277 type Item = error::Result<ByteRecord>;
278
279 #[inline]
280 fn next(&mut self) -> Option<Self::Item> {
281 match self.reader.read_byte_record(&mut self.record) {
284 Err(err) => Some(Err(err)),
285 Ok(true) => Some(Ok(self.record.clone())),
286 Ok(false) => None,
287 }
288 }
289}
290
291pub struct ByteRecordsIntoIter<R> {
292 reader: Reader<R>,
293 record: ByteRecord,
294}
295
296impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
297 type Item = error::Result<ByteRecord>;
298
299 #[inline]
300 fn next(&mut self) -> Option<Self::Item> {
301 match self.reader.read_byte_record(&mut self.record) {
304 Err(err) => Some(Err(err)),
305 Ok(true) => Some(Ok(self.record.clone())),
306 Ok(false) => None,
307 }
308 }
309}
310
/// CSV reader yielding the records of a seekable source from last to first.
pub struct ReverseReader<R> {
    /// Low-level record parser configured with the delimiter and quote bytes.
    inner: CoreReader,
    /// Buffered view over the source as exposed by `utils::ReverseReader`.
    buffer: BufReader<utils::ReverseReader<R>>,
    /// When `true`, field counts are never checked against the headers.
    flexible: bool,
    /// First record of the source, read going forward when the reader was built.
    headers: ByteRecord,
}
317
318impl<R: Read + Seek> ReverseReader<R> {
319 pub fn from_reader(reader: R) -> error::Result<Self> {
320 ReaderBuilder::new().reverse_from_reader(reader)
321 }
322
323 pub fn byte_headers(&self) -> &ByteRecord {
324 &self.headers
325 }
326
327 #[inline]
328 fn check_field_count(&mut self, written: usize) -> error::Result<()> {
329 if self.flexible {
330 return Ok(());
331 }
332
333 if written != self.headers.len() {
334 return Err(Error::new(ErrorKind::UnequalLengths {
335 expected_len: self.headers.len(),
336 len: written,
337 pos: None,
338 }));
339 }
340
341 Ok(())
342 }
343
344 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
345 use ReadResult::*;
346
347 record.clear();
348
349 let mut record_builder = ByteRecordBuilder::wrap(record);
350
351 loop {
352 let input = self.buffer.fill_buf()?;
353
354 let (result, pos) = self.inner.read_record(input, &mut record_builder);
355
356 self.buffer.consume(pos);
357
358 match result {
359 End => {
360 return Ok(false);
361 }
362 Cr | Lf | InputEmpty => {
363 continue;
364 }
365 Record => {
366 self.check_field_count(record.len())?;
367 record.reverse();
368 return Ok(true);
369 }
370 };
371 }
372 }
373
374 pub fn byte_records(&mut self) -> ReverseByteRecordsIter<'_, R> {
375 ReverseByteRecordsIter {
376 reader: self,
377 record: ByteRecord::new(),
378 }
379 }
380
381 pub fn into_byte_records(self) -> ReverseByteRecordsIntoIter<R> {
382 ReverseByteRecordsIntoIter {
383 reader: self,
384 record: ByteRecord::new(),
385 }
386 }
387}
388
389pub struct ReverseByteRecordsIter<'r, R> {
390 reader: &'r mut ReverseReader<R>,
391 record: ByteRecord,
392}
393
394impl<R: Read + Seek> Iterator for ReverseByteRecordsIter<'_, R> {
395 type Item = error::Result<ByteRecord>;
396
397 #[inline]
398 fn next(&mut self) -> Option<Self::Item> {
399 match self.reader.read_byte_record(&mut self.record) {
402 Err(err) => Some(Err(err)),
403 Ok(true) => Some(Ok(self.record.clone())),
404 Ok(false) => None,
405 }
406 }
407}
408
409pub struct ReverseByteRecordsIntoIter<R> {
410 reader: ReverseReader<R>,
411 record: ByteRecord,
412}
413
414impl<R: Read + Seek> Iterator for ReverseByteRecordsIntoIter<R> {
415 type Item = error::Result<ByteRecord>;
416
417 #[inline]
418 fn next(&mut self) -> Option<Self::Item> {
419 match self.reader.read_byte_record(&mut self.record) {
422 Err(err) => Some(Err(err)),
423 Ok(true) => Some(Ok(self.record.clone())),
424 Ok(false) => None,
425 }
426 }
427}
428
// Unit tests for the forward and reverse readers.
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use crate::brec;

    use super::*;

    impl<R: Read> Reader<R> {
        // Test helper: default settings, except that the first record is data.
        fn from_reader_no_headers(reader: R) -> Self {
            ReaderBuilder::new().has_headers(false).from_reader(reader)
        }
    }

    // Quoted fields, escaped quotes, an empty line and a final CRLF.
    #[test]
    fn test_read_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let expected = vec![
            brec!["name", "surname", "age"],
            brec!["john", "landy, the \"everlasting\" bastard", "45"],
            brec!["\"ok\"", "whatever", "dude"],
            brec!["lucy", "rose", "67"],
            brec!["jermaine", "jackson", "89"],
            brec!["karine", "loucan", "52"],
            brec!["rose", "glib", "12"],
            brec!["guillaume", "plique", "42"],
        ];

        // Shrink the buffer down to one byte so records span several refills.
        for capacity in [32usize, 4, 3, 2, 1] {
            let mut reader = ReaderBuilder::with_capacity(capacity)
                .has_headers(false)
                .from_reader(Cursor::new(csv));

            assert_eq!(
                reader.byte_records().collect::<Result<Vec<_>, _>>()?,
                expected,
            );
        }

        Ok(())
    }

    // The first record is the same with or without a leading UTF-8 BOM.
    #[test]
    fn test_strip_bom() -> error::Result<()> {
        let mut reader = Reader::from_reader_no_headers(Cursor::new("name,surname,age"));

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        let mut reader =
            Reader::from_reader_no_headers(Cursor::new(b"\xef\xbb\xbfname,surname,age"));

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        Ok(())
    }

    // A quoted empty string is a record holding one empty field.
    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let reader = Reader::from_reader_no_headers(Cursor::new(data));

        let expected = vec![brec!["name"], brec![""], brec!["lucy"], brec![""]];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // CRLF line endings, including non-ASCII field content.
    #[test]
    fn test_crlf() -> error::Result<()> {
        let reader = Reader::from_reader_no_headers(Cursor::new(
            "name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n",
        ));

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "john"],
            brec!["evan", "zhong"],
            brec!["béatrice", "glougou"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // Every field quoted, no trailing newline.
    #[test]
    fn test_quote_always() -> error::Result<()> {
        let reader = Reader::from_reader_no_headers(Cursor::new(
            "\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\"",
        ));

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "rose"],
            brec!["john", "mayhew"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // Headers are available before or after reading records, in both modes.
    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // With headers: headers first, then records.
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["john", "dandy"]
        );

        // With headers: records first, then headers.
        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["john", "dandy"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Without headers: the first record is both the headers and the first data record.
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname"]
        );

        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Empty input: empty headers and no record, in both modes.
        let mut reader = Reader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        let mut reader = Reader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        Ok(())
    }

    // Pins the current output for malformed quoting and stray carriage returns.
    #[test]
    fn test_weirdness() -> error::Result<()> {
        let data =
            b"name,surname\n\"test\" \"wat\", ok\ntest \"wat\",ok \ntest,\"whatever\" ok\n\"test\" there,\"ok\"\r\n";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));

        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        let expected = vec![
            brec!["name", "surname"],
            brec!["test \"wat", " ok"],
            brec!["test \"wat", "ok "],
            brec!["test", "whatever ok"],
            brec!["test there", "ok"],
        ];

        assert_eq!(records, expected);

        // Carriage returns between two records do not produce extra records.
        let data = b"name,surname\n\r\rjohn,coucou";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(
            records,
            vec![brec!["name", "surname"], brec!["john", "coucou"]]
        );

        Ok(())
    }

    // Positions are byte offsets taken after each record has been consumed.
    #[test]
    fn test_position() -> error::Result<()> {
        let data = b"name,surname\njohnny,landis crue\nbabka,bob caterpillar\n";

        let mut reader = Reader::from_reader(&data[..]);
        let mut record = ByteRecord::new();

        let mut positions = vec![reader.position()];

        reader.byte_headers()?;

        positions.push(reader.position());

        while reader.read_byte_record(&mut record)? {
            positions.push(reader.position());
        }

        // 0 before reading, then the offsets after the 13-, 19- and 22-byte lines.
        assert_eq!(positions, vec![0, 13, 32, 54]);

        let mut reader = ReaderBuilder::new()
            .has_headers(false)
            .from_reader(&data[..]);

        reader.byte_headers()?;

        // Without headers, the first record is still to be yielded: position stays 0.
        assert_eq!(reader.position(), 0);

        Ok(())
    }

    // Records come out last to first; the header row is not yielded.
    #[test]
    fn test_reverse_reader() -> error::Result<()> {
        let data = b"name,surname\njohn,landis\nbeatrice,babka\nevan,michalak";
        let mut reader = ReverseReader::from_reader(Cursor::new(data))?;

        assert_eq!(
            reader.byte_records().collect::<Result<Vec<_>, _>>()?,
            vec![
                brec!["evan", "michalak"],
                brec!["beatrice", "babka"],
                brec!["john", "landis"]
            ]
        );

        assert_eq!(reader.byte_headers(), &brec!["name", "surname"]);

        Ok(())
    }
}