1use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
2
3use crate::buffer::BufReaderWithPosition;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ByteRecordBuilder};
7use crate::utils::{self, trim_bom};
8
/// Configuration for building a [`Reader`] or a [`ReverseReader`].
///
/// Obtain one with [`ReaderBuilder::new`] (or [`Default`]), tweak it through
/// the chained setters, then call [`ReaderBuilder::from_reader`] or
/// [`ReaderBuilder::reverse_from_reader`]. The builder is not consumed, so a
/// single configuration can produce several readers.
#[derive(Debug, Clone)]
pub struct ReaderBuilder {
    /// Byte separating fields within a record.
    delimiter: u8,
    /// Byte used to quote fields.
    quote: u8,
    /// Capacity of the internal read buffer.
    buffer_capacity: usize,
    /// When `true`, records are not required to have as many fields as the headers.
    flexible: bool,
    /// Whether the first record of the input is a header row.
    has_headers: bool,
}
16
17impl Default for ReaderBuilder {
18 fn default() -> Self {
19 Self {
20 delimiter: b',',
21 quote: b'"',
22 buffer_capacity: 8192,
23 flexible: false,
24 has_headers: true,
25 }
26 }
27}
28
29impl ReaderBuilder {
30 pub fn new() -> Self {
31 Self::default()
32 }
33
34 pub fn with_capacity(capacity: usize) -> Self {
35 let mut reader = Self::default();
36 reader.buffer_capacity(capacity);
37 reader
38 }
39
40 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
41 self.delimiter = delimiter;
42 self
43 }
44
45 pub fn quote(&mut self, quote: u8) -> &mut Self {
46 self.quote = quote;
47 self
48 }
49
50 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
51 self.buffer_capacity = capacity;
52 self
53 }
54
55 pub fn flexible(&mut self, yes: bool) -> &mut Self {
56 self.flexible = yes;
57 self
58 }
59
60 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
61 self.has_headers = yes;
62 self
63 }
64
65 pub fn from_reader<R: Read>(&self, reader: R) -> Reader<R> {
66 Reader {
67 buffer: BufReaderWithPosition::with_capacity(self.buffer_capacity, reader),
68 inner: CoreReader::new(self.delimiter, self.quote),
69 flexible: self.flexible,
70 headers: ByteRecord::new(),
71 has_read: false,
72 must_reemit_headers: !self.has_headers,
73 has_headers: self.has_headers,
74 index: 0,
75 }
76 }
77
78 pub fn reverse_from_reader<R: Read + Seek>(
79 &self,
80 mut reader: R,
81 ) -> error::Result<ReverseReader<R>> {
82 let initial_pos = reader.stream_position()?;
83
84 let mut forward_reader = self.from_reader(reader);
85 let headers = forward_reader.byte_headers()?.clone();
86 let position_after_headers = forward_reader.position();
87
88 let mut reader = forward_reader.into_inner();
89
90 let file_len = reader.seek(SeekFrom::End(0))?;
91
92 let offset = if self.has_headers {
93 initial_pos + position_after_headers
94 } else {
95 initial_pos
96 };
97
98 let reverse_io_reader = utils::ReverseReader::new(reader, file_len, offset);
99
100 Ok(ReverseReader {
101 buffer: BufReader::with_capacity(self.buffer_capacity, reverse_io_reader),
102 inner: CoreReader::new(self.delimiter, self.quote),
103 flexible: self.flexible,
104 headers,
105 })
106 }
107}
108
109pub struct Reader<R> {
110 buffer: BufReaderWithPosition<R>,
111 inner: CoreReader,
112 flexible: bool,
113 headers: ByteRecord,
114 has_read: bool,
115 must_reemit_headers: bool,
116 has_headers: bool,
117 index: u64,
118}
119
120impl<R: Read> Reader<R> {
121 pub fn from_reader(reader: R) -> Self {
122 ReaderBuilder::new().from_reader(reader)
123 }
124
125 #[inline]
126 fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
127 if self.flexible {
128 return Ok(());
129 }
130
131 if self.has_read && written != self.headers.len() {
132 return Err(Error::new(ErrorKind::UnequalLengths {
133 expected_len: self.headers.len(),
134 len: written,
135 pos: Some((
136 byte,
137 self.index
138 .saturating_sub(if self.has_headers { 1 } else { 0 }),
139 )),
140 }));
141 }
142
143 Ok(())
144 }
145
146 fn read_byte_record_impl(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
147 use ReadResult::*;
148
149 record.clear();
150
151 let mut record_builder = ByteRecordBuilder::wrap(record);
152 let byte = self.position();
153
154 loop {
155 let input = self.buffer.fill_buf()?;
156
157 let (result, pos) = self.inner.read_record(input, &mut record_builder);
158
159 self.buffer.consume(pos);
160
161 match result {
162 End => {
163 return Ok(false);
164 }
165 Cr | Lf | InputEmpty => {
166 continue;
167 }
168 Record => {
169 self.index += 1;
170 self.check_field_count(byte, record.len())?;
171 return Ok(true);
172 }
173 };
174 }
175 }
176
177 #[inline]
178 fn on_first_read(&mut self) -> error::Result<()> {
179 if self.has_read {
180 return Ok(());
181 }
182
183 let input = self.buffer.fill_buf()?;
185 let bom_len = trim_bom(input);
186 self.buffer.consume(bom_len);
187
188 let mut headers = ByteRecord::new();
190
191 let has_data = self.read_byte_record_impl(&mut headers)?;
192
193 if !has_data {
194 self.must_reemit_headers = false;
195 }
196
197 self.headers = headers;
198 self.has_read = true;
199
200 Ok(())
201 }
202
203 #[inline]
204 pub fn has_headers(&self) -> bool {
205 self.has_headers
206 }
207
208 #[inline]
209 pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
210 self.on_first_read()?;
211
212 Ok(&self.headers)
213 }
214
215 #[inline(always)]
216 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
217 self.on_first_read()?;
218
219 if self.must_reemit_headers {
220 self.headers.clone_into(record);
221 self.must_reemit_headers = false;
222 return Ok(true);
223 }
224
225 self.read_byte_record_impl(record)
226 }
227
228 pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
229 ByteRecordsIter {
230 reader: self,
231 record: ByteRecord::new(),
232 }
233 }
234
235 pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
236 ByteRecordsIntoIter {
237 reader: self,
238 record: ByteRecord::new(),
239 }
240 }
241
242 pub fn get_ref(&self) -> &R {
243 self.buffer.get_ref()
244 }
245
246 pub fn get_mut(&mut self) -> &mut R {
247 self.buffer.get_mut()
248 }
249
250 pub fn into_inner(self) -> R {
251 self.buffer.into_inner().into_inner()
252 }
253
254 pub fn into_bufreader(self) -> BufReader<R> {
255 self.buffer.into_inner()
256 }
257
258 #[inline(always)]
259 pub fn position(&self) -> u64 {
260 if self.must_reemit_headers {
261 0
262 } else {
263 self.buffer.position()
264 }
265 }
266}
267
268pub struct ByteRecordsIter<'r, R> {
269 reader: &'r mut Reader<R>,
270 record: ByteRecord,
271}
272
273impl<R: Read> Iterator for ByteRecordsIter<'_, R> {
274 type Item = error::Result<ByteRecord>;
275
276 #[inline]
277 fn next(&mut self) -> Option<Self::Item> {
278 match self.reader.read_byte_record(&mut self.record) {
281 Err(err) => Some(Err(err)),
282 Ok(true) => Some(Ok(self.record.clone())),
283 Ok(false) => None,
284 }
285 }
286}
287
288pub struct ByteRecordsIntoIter<R> {
289 reader: Reader<R>,
290 record: ByteRecord,
291}
292
293impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
294 type Item = error::Result<ByteRecord>;
295
296 #[inline]
297 fn next(&mut self) -> Option<Self::Item> {
298 match self.reader.read_byte_record(&mut self.record) {
301 Err(err) => Some(Err(err)),
302 Ok(true) => Some(Ok(self.record.clone())),
303 Ok(false) => None,
304 }
305 }
306}
307
308pub struct ReverseReader<R> {
309 inner: CoreReader,
310 buffer: BufReader<utils::ReverseReader<R>>,
311 flexible: bool,
312 headers: ByteRecord,
313}
314
315impl<R: Read + Seek> ReverseReader<R> {
316 pub fn from_reader(reader: R) -> error::Result<Self> {
317 ReaderBuilder::new().reverse_from_reader(reader)
318 }
319
320 pub fn byte_headers(&self) -> &ByteRecord {
321 &self.headers
322 }
323
324 #[inline]
325 fn check_field_count(&mut self, written: usize) -> error::Result<()> {
326 if self.flexible {
327 return Ok(());
328 }
329
330 if written != self.headers.len() {
331 return Err(Error::new(ErrorKind::UnequalLengths {
332 expected_len: self.headers.len(),
333 len: written,
334 pos: None,
335 }));
336 }
337
338 Ok(())
339 }
340
341 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
342 use ReadResult::*;
343
344 record.clear();
345
346 let mut record_builder = ByteRecordBuilder::wrap(record);
347
348 loop {
349 let input = self.buffer.fill_buf()?;
350
351 let (result, pos) = self.inner.read_record(input, &mut record_builder);
352
353 self.buffer.consume(pos);
354
355 match result {
356 End => {
357 return Ok(false);
358 }
359 Cr | Lf | InputEmpty => {
360 continue;
361 }
362 Record => {
363 self.check_field_count(record.len())?;
364 record.reverse();
365 return Ok(true);
366 }
367 };
368 }
369 }
370
371 pub fn byte_records(&mut self) -> ReverseByteRecordsIter<'_, R> {
372 ReverseByteRecordsIter {
373 reader: self,
374 record: ByteRecord::new(),
375 }
376 }
377
378 pub fn into_byte_records(self) -> ReverseByteRecordsIntoIter<R> {
379 ReverseByteRecordsIntoIter {
380 reader: self,
381 record: ByteRecord::new(),
382 }
383 }
384}
385
386pub struct ReverseByteRecordsIter<'r, R> {
387 reader: &'r mut ReverseReader<R>,
388 record: ByteRecord,
389}
390
391impl<R: Read + Seek> Iterator for ReverseByteRecordsIter<'_, R> {
392 type Item = error::Result<ByteRecord>;
393
394 #[inline]
395 fn next(&mut self) -> Option<Self::Item> {
396 match self.reader.read_byte_record(&mut self.record) {
399 Err(err) => Some(Err(err)),
400 Ok(true) => Some(Ok(self.record.clone())),
401 Ok(false) => None,
402 }
403 }
404}
405
406pub struct ReverseByteRecordsIntoIter<R> {
407 reader: ReverseReader<R>,
408 record: ByteRecord,
409}
410
411impl<R: Read + Seek> Iterator for ReverseByteRecordsIntoIter<R> {
412 type Item = error::Result<ByteRecord>;
413
414 #[inline]
415 fn next(&mut self) -> Option<Self::Item> {
416 match self.reader.read_byte_record(&mut self.record) {
419 Err(err) => Some(Err(err)),
420 Ok(true) => Some(Ok(self.record.clone())),
421 Ok(false) => None,
422 }
423 }
424}
425
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use crate::brec;

    use super::*;

    // Test-only convenience constructor: a reader that does not treat the
    // first row as headers (so it is also yielded as a regular record).
    impl<R: Read> Reader<R> {
        fn from_reader_no_headers(reader: R) -> Self {
            ReaderBuilder::new().has_headers(false).from_reader(reader)
        }
    }

    // Quoted fields, escaped quotes, an empty line and a trailing CRLF, parsed
    // with buffer capacities far smaller than a record so that parsing has to
    // resume across many buffer refills.
    #[test]
    fn test_read_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        // The blank line between "jermaine" and "karine" yields no record.
        let expected = vec![
            brec!["name", "surname", "age"],
            brec!["john", "landy, the \"everlasting\" bastard", "45"],
            brec!["\"ok\"", "whatever", "dude"],
            brec!["lucy", "rose", "67"],
            brec!["jermaine", "jackson", "89"],
            brec!["karine", "loucan", "52"],
            brec!["rose", "glib", "12"],
            brec!["guillaume", "plique", "42"],
        ];

        for capacity in [32usize, 4, 3, 2, 1] {
            let mut reader = ReaderBuilder::with_capacity(capacity)
                .has_headers(false)
                .from_reader(Cursor::new(csv));

            assert_eq!(
                reader.byte_records().collect::<Result<Vec<_>, _>>()?,
                expected,
            );
        }

        Ok(())
    }

    // A leading UTF-8 BOM must not end up in the first field.
    #[test]
    fn test_strip_bom() -> error::Result<()> {
        let mut reader = Reader::from_reader_no_headers(Cursor::new("name,surname,age"));

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        let mut reader =
            Reader::from_reader_no_headers(Cursor::new(b"\xef\xbb\xbfname,surname,age"));

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        Ok(())
    }

    // A quoted empty field (`""`) is a one-field record, not a skipped line,
    // including as the last line without a trailing newline.
    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let reader = Reader::from_reader_no_headers(Cursor::new(data));

        let expected = vec![brec!["name"], brec![""], brec!["lucy"], brec![""]];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // CRLF line endings, with a quoted field and non-ASCII UTF-8 content.
    #[test]
    fn test_crlf() -> error::Result<()> {
        let reader = Reader::from_reader_no_headers(Cursor::new(
            "name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n",
        ));

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "john"],
            brec!["evan", "zhong"],
            brec!["béatrice", "glougou"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // Every field quoted; the quotes are removed from the yielded values.
    #[test]
    fn test_quote_always() -> error::Result<()> {
        let reader = Reader::from_reader_no_headers(Cursor::new(
            "\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\"",
        ));

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "rose"],
            brec!["john", "mayhew"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // Headers are available whether requested before or after the first
    // record; without a header row they are also yielded as the first record;
    // empty input gives empty headers and no records.
    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["john", "dandy"]
        );

        let mut reader = Reader::from_reader(Cursor::new(data));
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["john", "dandy"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname"]
        );

        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        let mut reader = Reader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        let mut reader = Reader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.byte_records().next().is_none());

        Ok(())
    }

    // Pins the current handling of malformed quoting (text around or after
    // quoted sections) and of stray bare `\r` bytes between records.
    #[test]
    fn test_weirdness() -> error::Result<()> {
        let data =
            b"name,surname\n\"test\" \"wat\", ok\ntest \"wat\",ok \ntest,\"whatever\" ok\n\"test\" there,\"ok\"\r\n";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));

        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        let expected = vec![
            brec!["name", "surname"],
            brec!["test \"wat", " ok"],
            brec!["test \"wat", "ok "],
            brec!["test", "whatever ok"],
            brec!["test there", "ok"],
        ];

        assert_eq!(records, expected);

        let data = b"name,surname\n\r\rjohn,coucou";
        let mut reader = Reader::from_reader_no_headers(Cursor::new(data));
        let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(
            records,
            vec![brec!["name", "surname"], brec!["john", "coucou"]]
        );

        Ok(())
    }

    // `position()` reports the byte offset right after each record read
    // (13 = "name,surname\n"), and stays at 0 without a header row until the
    // first record has actually been returned.
    #[test]
    fn test_position() -> error::Result<()> {
        let data = b"name,surname\njohnny,landis crue\nbabka,bob caterpillar\n";

        let mut reader = Reader::from_reader(&data[..]);
        let mut record = ByteRecord::new();

        let mut positions = vec![reader.position()];

        reader.byte_headers()?;

        positions.push(reader.position());

        while reader.read_byte_record(&mut record)? {
            positions.push(reader.position());
        }

        assert_eq!(positions, vec![0, 13, 32, 54]);

        let mut reader = ReaderBuilder::new()
            .has_headers(false)
            .from_reader(&data[..]);

        reader.byte_headers()?;

        assert_eq!(reader.position(), 0);

        Ok(())
    }

    // Records come out last to first with their fields in normal order, the
    // header row is excluded, and the headers remain available afterwards.
    #[test]
    fn test_reverse_reader() -> error::Result<()> {
        let data = b"name,surname\njohn,landis\nbeatrice,babka\nevan,michalak";
        let mut reader = ReverseReader::from_reader(Cursor::new(data))?;

        assert_eq!(
            reader.byte_records().collect::<Result<Vec<_>, _>>()?,
            vec![
                brec!["evan", "michalak"],
                brec!["beatrice", "babka"],
                brec!["john", "landis"]
            ]
        );

        assert_eq!(reader.byte_headers(), &brec!["name", "surname"]);

        Ok(())
    }
}