simd_csv/
reader.rs

1use std::io::{BufRead, BufReader, Read};
2
3use memchr::{memchr, memchr2};
4
5use crate::error::{self, Error};
6use crate::records::{ByteRecord, ByteRecordBuilder, ZeroCopyByteRecord};
7use crate::searcher::Searcher;
8use crate::utils::trim_trailing_cr;
9
/// Outcome of one call to a `Reader` parsing method, returned together with
/// the number of input bytes the caller should consume.
#[derive(Debug)]
enum ReadResult {
    /// The whole (non-empty) input was scanned without reaching the end of
    /// the current record: more input is required.
    InputEmpty,
    /// The input starts with a `\r` byte while no record is in progress;
    /// the caller should skip that single byte.
    Cr,
    /// The input starts with a `\n` byte while no record is in progress;
    /// the caller should skip that single byte.
    Lf,
    /// The end of a record was reached (either a newline, or the end of the
    /// stream while a record was still in progress).
    Record,
    /// The input is empty and no record is in progress: nothing left to read.
    End,
}
18
/// Quoting state of the record splitter, kept across calls so that a record
/// may span several input chunks.
#[derive(Debug)]
enum ReadState {
    /// Outside of any quoted section.
    Unquoted,
    /// Inside a quoted section, looking for the next quote byte.
    Quoted,
    /// A quote byte was just seen while in `Quoted`: the next byte decides
    /// whether it was a doubled quote (back to `Quoted`) or the end of the
    /// quoted section (back to `Unquoted`).
    Quote,
}
25
// NOTE: funnily enough, knowing the delimiter is not required to split the records,
// but since we expose a single unified `struct` here, it is simpler to include it.
/// Low-level, chunk-oriented CSV state machine. It does not own any input:
/// callers feed it byte slices and act on the returned `ReadResult`.
struct Reader {
    /// Byte separating fields within a record.
    delimiter: u8,
    /// Byte opening and closing quoted sections.
    quote: u8,
    /// Current quoting state, persisted between calls.
    state: ReadState,
    /// `true` while no record is in progress (at the very beginning and right
    /// after a record was emitted); `false` once bytes of a new record were seen.
    record_was_read: bool,
    /// Searcher built for the delimiter, `\n` and quote bytes, used by the
    /// methods that need to locate field separators.
    searcher: Searcher,
}
35
36impl Reader {
37    fn new(delimiter: u8, quote: u8) -> Self {
38        Self {
39            delimiter,
40            quote,
41            state: ReadState::Unquoted,
42            // Must be true at the beginning to avoid counting one record for empty input
43            record_was_read: true,
44            searcher: Searcher::new(delimiter, b'\n', quote),
45        }
46    }
47
48    fn split_record(&mut self, input: &[u8]) -> (ReadResult, usize) {
49        use ReadState::*;
50
51        if input.is_empty() {
52            if !self.record_was_read {
53                self.record_was_read = true;
54                return (ReadResult::Record, 0);
55            }
56
57            return (ReadResult::End, 0);
58        }
59
60        if self.record_was_read {
61            if input[0] == b'\n' {
62                return (ReadResult::Lf, 1);
63            } else if input[0] == b'\r' {
64                return (ReadResult::Cr, 1);
65            }
66        }
67
68        self.record_was_read = false;
69
70        let mut pos: usize = 0;
71
72        while pos < input.len() {
73            match self.state {
74                Unquoted => {
75                    // Fast path for quoted field start
76                    if input[pos] == self.quote {
77                        self.state = Quoted;
78                        pos += 1;
79                        continue;
80                    }
81
82                    // Here we are moving to next quote or end of line
83                    if let Some(offset) = memchr2(b'\n', self.quote, &input[pos..]) {
84                        pos += offset;
85
86                        let byte = input[pos];
87
88                        pos += 1;
89
90                        if byte == b'\n' {
91                            self.record_was_read = true;
92                            return (ReadResult::Record, pos);
93                        }
94
95                        // Here, `byte` is guaranteed to be a quote
96                        self.state = Quoted;
97                    } else {
98                        break;
99                    }
100                }
101                Quoted => {
102                    // Here we moving to next quote
103                    if let Some(offset) = memchr(self.quote, &input[pos..]) {
104                        pos += offset + 1;
105                        self.state = Quote;
106                    } else {
107                        break;
108                    }
109                }
110                Quote => {
111                    let byte = input[pos];
112
113                    pos += 1;
114
115                    if byte == self.quote {
116                        self.state = Quoted;
117                    } else if byte == b'\n' {
118                        self.record_was_read = true;
119                        self.state = Unquoted;
120                        return (ReadResult::Record, pos);
121                    } else {
122                        self.state = Unquoted;
123                    }
124                }
125            }
126        }
127
128        (ReadResult::InputEmpty, input.len())
129    }
130
    /// Scans `input` for the end of the current record while recording the
    /// position of every field delimiter found outside of quoted sections.
    ///
    /// Each delimiter position is pushed to `seps` as `seps_offset` plus its
    /// index in `input`, so that callers accumulating a record over several
    /// chunks can pass the number of bytes already accumulated.
    ///
    /// Returns the outcome along with the number of bytes of `input` that were
    /// processed: `0` when `input` is empty, `1` for a skipped `Cr`/`Lf`, the
    /// offset just past the terminating `\n` for `Record`, and `input.len()`
    /// for `InputEmpty`.
    fn split_record_and_find_separators(
        &mut self,
        input: &[u8],
        seps_offset: usize,
        seps: &mut Vec<usize>,
    ) -> (ReadResult, usize) {
        use ReadState::*;

        // End of stream: emit the record in progress if there is one.
        if input.is_empty() {
            if !self.record_was_read {
                self.record_was_read = true;
                return (ReadResult::Record, 0);
            }

            return (ReadResult::End, 0);
        }

        // Between records, line-ending bytes are reported one at a time so the
        // caller can skip them.
        if self.record_was_read {
            if input[0] == b'\n' {
                return (ReadResult::Lf, 1);
            } else if input[0] == b'\r' {
                return (ReadResult::Cr, 1);
            }
        }

        self.record_was_read = false;

        let mut pos: usize = 0;

        while pos < input.len() {
            match self.state {
                Unquoted => {
                    // Fast path for quoted field start
                    if input[pos] == self.quote {
                        self.state = Quoted;
                        pos += 1;
                        continue;
                    }

                    // Here we are moving to next quote or end of line
                    // `last_offset` is one past the last byte reported by the
                    // searcher; it stays 0 when nothing was found.
                    let mut last_offset: usize = 0;

                    for offset in self.searcher.search(&input[pos..]) {
                        last_offset = offset + 1;

                        let byte = input[pos + offset];

                        if byte == self.delimiter {
                            seps.push(seps_offset + pos + offset);
                            continue;
                        }

                        if byte == b'\n' {
                            self.record_was_read = true;
                            return (ReadResult::Record, pos + last_offset);
                        }

                        // Here, `byte` is guaranteed to be a quote
                        self.state = Quoted;
                        break;
                    }

                    if last_offset > 0 {
                        pos += last_offset;
                    } else {
                        // Nothing of interest in the rest of the input
                        break;
                    }
                }
                Quoted => {
                    // Here we are moving to the next quote
                    if let Some(offset) = memchr(self.quote, &input[pos..]) {
                        pos += offset + 1;
                        self.state = Quote;
                    } else {
                        break;
                    }
                }
                Quote => {
                    // The byte following a quote tells whether the quote was
                    // doubled or closed the quoted section.
                    let byte = input[pos];

                    pos += 1;

                    if byte == self.quote {
                        self.state = Quoted;
                    } else if byte == self.delimiter {
                        // `pos` was already advanced past the delimiter
                        seps.push(seps_offset + pos - 1);
                        self.state = Unquoted;
                    } else if byte == b'\n' {
                        self.record_was_read = true;
                        self.state = Unquoted;
                        return (ReadResult::Record, pos);
                    } else {
                        self.state = Unquoted;
                    }
                }
            }
        }

        (ReadResult::InputEmpty, input.len())
    }
231
    /// Parses `input` up to the end of the current record, feeding field data
    /// to `record_builder` along the way.
    ///
    /// Returns the outcome along with the number of bytes of `input` that were
    /// processed: `0` when `input` is empty, `1` for a skipped `Cr`/`Lf`, the
    /// offset just past the terminating `\n` for `Record`, and `input.len()`
    /// for `InputEmpty`.
    ///
    /// NOTE(review): the exact contract of the `ByteRecordBuilder` methods used
    /// here (`finalize_field_preemptively`, `bump`, ...) is not visible from
    /// this file; comments about them below are assumptions to be confirmed.
    fn read_record(
        &mut self,
        input: &[u8],
        record_builder: &mut ByteRecordBuilder,
    ) -> (ReadResult, usize) {
        use ReadState::*;

        if input.is_empty() {
            if !self.record_was_read {
                self.record_was_read = true;

                // NOTE: this is required to handle streams not ending with a newline
                record_builder.finalize_field();
                return (ReadResult::Record, 0);
            }

            return (ReadResult::End, 0);
        }

        // Between records, line-ending bytes are reported one at a time so the
        // caller can skip them.
        if self.record_was_read {
            if input[0] == b'\n' {
                return (ReadResult::Lf, 1);
            } else if input[0] == b'\r' {
                return (ReadResult::Cr, 1);
            }
        }

        self.record_was_read = false;

        let mut pos: usize = 0;

        while pos < input.len() {
            match self.state {
                Unquoted => {
                    // Fast path for quoted field start
                    if input[pos] == self.quote {
                        self.state = Quoted;
                        pos += 1;
                        continue;
                    }

                    // Here we are moving to next quote or end of line
                    // `last_offset` is one past the last byte reported by the
                    // searcher; it stays 0 when nothing was found.
                    let mut last_offset: usize = 0;

                    for offset in self.searcher.search(&input[pos..]) {
                        last_offset = offset + 1;

                        let byte = input[pos + offset];

                        // NOTE: we don't copy here yet to avoid slowing down
                        // because of multiple tiny copies.
                        if byte == self.delimiter {
                            // presumably registers a field boundary `offset` bytes
                            // ahead of the data copied so far — TODO confirm
                            record_builder.finalize_field_preemptively(offset);
                            continue;
                        }

                        if byte == b'\n' {
                            // Copy everything up to the newline, minus a trailing `\r`
                            record_builder
                                .extend_from_slice(trim_trailing_cr(&input[pos..pos + offset]));
                            record_builder.finalize_field();
                            self.record_was_read = true;
                            return (ReadResult::Record, pos + last_offset);
                        }

                        // Here, `byte` is guaranteed to be a quote
                        self.state = Quoted;
                        record_builder.bump();
                        break;
                    }

                    if last_offset > 0 {
                        // Single bulk copy of everything scanned in this pass
                        record_builder.extend_from_slice(&input[pos..pos + last_offset]);
                        pos += last_offset
                    } else {
                        // Nothing of interest in the rest of the input
                        break;
                    }
                }
                Quoted => {
                    // Here we are moving to the next quote
                    if let Some(offset) = memchr(self.quote, &input[pos..]) {
                        // Copy the quoted content, leaving the quote itself out
                        record_builder.extend_from_slice(&input[pos..pos + offset]);
                        pos += offset + 1;
                        self.state = Quote;
                    } else {
                        break;
                    }
                }
                Quote => {
                    // The byte following a quote tells whether the quote was
                    // doubled or closed the quoted section.
                    let byte = input[pos];

                    pos += 1;

                    if byte == self.quote {
                        // Doubled quote: emit a single quote byte
                        self.state = Quoted;
                        record_builder.push_byte(byte);
                    } else if byte == self.delimiter {
                        record_builder.finalize_field();
                        self.state = Unquoted;
                    } else if byte == b'\n' {
                        self.record_was_read = true;
                        self.state = Unquoted;
                        record_builder.finalize_field();
                        return (ReadResult::Record, pos);
                    } else {
                        self.state = Unquoted;
                    }
                }
            }
        }

        // Copy whatever is left of the input (empty if the loop ran to the end)
        record_builder.extend_from_slice(&input[pos..]);

        (ReadResult::InputEmpty, input.len())
    }
346
347    // NOTE: this version of the method wraps the state machine logic within the
348    // SIMD iteration logic. Ironically it seems slower than the multiple-speed
349    // stop-and-go implementation above.
350    // Be advised that this code does not handle final \r correctly yet.
351    // fn read_record(&mut self, input: &[u8], record: &mut ByteRecord) -> (ReadResult, usize) {
352    //     use ReadState::*;
353
354    //     if input.is_empty() {
355    //         if !self.record_was_read {
356    //             self.record_was_read = true;
357    //             record.finalize_field();
358    //             return (ReadResult::Record, 0);
359    //         }
360
361    //         return (ReadResult::End, 0);
362    //     }
363
364    //     if self.record_was_read {
365    //         if input[0] == b'\n' {
366    //             return (ReadResult::Lf, 1);
367    //         } else if input[0] == b'\r' {
368    //             return (ReadResult::Cr, 1);
369    //         }
370    //     }
371
372    //     self.record_was_read = false;
373
374    //     let mut last_offset: Option<usize> = None;
375    //     let mut start: usize;
376
377    //     for offset in self.searcher.search(input) {
378    //         let byte = input[offset];
379
380    //         if let Quote = self.state {
381    //             if byte == self.quote {
382    //                 let was_previously_a_quote = match last_offset {
383    //                     None => true,
384    //                     Some(o) => o == offset - 1,
385    //                 };
386
387    //                 if was_previously_a_quote {
388    //                     self.state = Quoted;
389    //                     continue;
390    //                 } else {
391    //                     self.state = Unquoted;
392    //                 }
393    //             } else {
394    //                 self.state = Unquoted;
395    //             }
396    //         }
397
398    //         start = last_offset.map(|o| o + 1).unwrap_or(0);
399    //         last_offset = Some(offset);
400
401    //         match self.state {
402    //             Unquoted => {
403    //                 record.extend_from_slice(&input[start..offset]);
404
405    //                 last_offset = Some(offset);
406
407    //                 if byte == self.delimiter {
408    //                     record.finalize_field();
409    //                     continue;
410    //                 }
411
412    //                 if byte == b'\n' {
413    //                     record.finalize_field();
414    //                     self.record_was_read = true;
415    //                     return (ReadResult::Record, offset + 1);
416    //                 }
417
418    //                 // Here, `byte` is guaranteed to be a quote
419    //                 self.state = Quoted;
420    //             }
421    //             Quoted => {
422    //                 record.extend_from_slice(&input[start..offset]);
423
424    //                 if byte != self.quote {
425    //                     record.push_byte(byte);
426    //                     continue;
427    //                 }
428
429    //                 self.state = Quote;
430    //             }
431    //             _ => unreachable!(),
432    //         }
433    //     }
434
435    //     start = last_offset.map(|o| o + 1).unwrap_or(0);
436    //     record.extend_from_slice(&input[start..]);
437
438    //     (ReadResult::InputEmpty, input.len())
439    // }
440}
441
/// CSV reader pulling its input from any `Read` through a `BufReader`.
pub struct BufferedReader<R> {
    /// Buffered source the records are read from.
    buffer: BufReader<R>,
    /// Accumulates the bytes of a record spanning more than one buffer fill
    /// (used by the splitting and zero-copy methods).
    scratch: Vec<u8>,
    /// Offsets of the field separators of the current zero-copy record.
    seps: Vec<usize>,
    /// Number of buffered bytes lent to the caller by the last splitting or
    /// zero-copy call and not consumed yet; consumed on the next read.
    actual_buffer_position: Option<usize>,
    /// Underlying parsing state machine.
    inner: Reader,
    /// Number of fields of the first checked record, which every subsequent
    /// checked record must match.
    field_count: Option<usize>,
}
450
451impl<R: Read> BufferedReader<R> {
452    pub fn new(reader: R, delimiter: u8, quote: u8) -> Self {
453        Self {
454            buffer: BufReader::new(reader),
455            scratch: Vec::new(),
456            seps: Vec::new(),
457            actual_buffer_position: None,
458            inner: Reader::new(delimiter, quote),
459            field_count: None,
460        }
461    }
462
463    pub fn with_capacity(capacity: usize, reader: R, delimiter: u8, quote: u8) -> Self {
464        Self {
465            buffer: BufReader::with_capacity(capacity, reader),
466            scratch: Vec::new(),
467            seps: Vec::new(),
468            actual_buffer_position: None,
469            inner: Reader::new(delimiter, quote),
470            field_count: None,
471        }
472    }
473
474    #[inline]
475    fn check_field_count(&mut self, written: usize) -> error::Result<()> {
476        match self.field_count {
477            Some(expected) => {
478                if written != expected {
479                    return Err(Error::unequal_lengths(expected, written));
480                }
481            }
482            None => {
483                self.field_count = Some(written);
484            }
485        }
486
487        Ok(())
488    }
489
490    pub fn strip_bom(&mut self) -> error::Result<()> {
491        let input = self.buffer.fill_buf()?;
492
493        if input.len() >= 3 && &input[..3] == b"\xef\xbb\xbf" {
494            self.buffer.consume(3);
495        }
496
497        Ok(())
498    }
499
500    pub fn first_byte_record(&mut self, consume: bool) -> error::Result<ByteRecord> {
501        use ReadResult::*;
502
503        let mut record = ByteRecord::new();
504        let mut record_builder = ByteRecordBuilder::wrap(&mut record);
505
506        let input = self.buffer.fill_buf()?;
507
508        let (result, pos) = self.inner.read_record(input, &mut record_builder);
509
510        match result {
511            End => Ok(ByteRecord::new()),
512
513            // TODO: we could expand the capacity of the buffer automagically here
514            // if this becomes an issue.
515            Cr | Lf | ReadResult::InputEmpty => Err(Error::invalid_headers()),
516            Record => {
517                if consume {
518                    self.buffer.consume(pos);
519                }
520
521                Ok(record)
522            }
523        }
524    }
525
526    pub fn count_records(&mut self) -> error::Result<u64> {
527        use ReadResult::*;
528
529        let mut count: u64 = 0;
530
531        loop {
532            let input = self.buffer.fill_buf()?;
533
534            let (result, pos) = self.inner.split_record(input);
535
536            self.buffer.consume(pos);
537
538            match result {
539                End => break,
540                InputEmpty | Cr | Lf => continue,
541                Record => {
542                    count += 1;
543                }
544            };
545        }
546
547        Ok(count)
548    }
549
    /// Returns the raw bytes of the next record (terminating newline included
    /// when there is one), or `None` once the stream is exhausted.
    ///
    /// The returned slice borrows directly from the internal buffer when the
    /// record fits in the buffered bytes, and from the scratch space otherwise.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while filling the buffer.
    pub fn split_record(&mut self) -> error::Result<Option<&[u8]>> {
        use ReadResult::*;

        self.scratch.clear();

        // Release the bytes lent out by the previous call, if any
        if let Some(last_pos) = self.actual_buffer_position.take() {
            self.buffer.consume(last_pos);
        }

        loop {
            let input = self.buffer.fill_buf()?;

            let (result, pos) = self.inner.split_record(input);

            match result {
                End => {
                    self.buffer.consume(pos);
                    return Ok(None);
                }
                Cr | Lf => {
                    // Skipping empty lines
                    self.buffer.consume(pos);
                }
                InputEmpty => {
                    // The record continues past the buffered bytes: stash them
                    self.scratch.extend_from_slice(input);
                    self.buffer.consume(pos);
                }
                Record => {
                    if self.scratch.is_empty() {
                        // The whole record sits in the buffer: lend it out and
                        // defer consumption to the next call.
                        self.actual_buffer_position = Some(pos);
                        return Ok(Some(&self.buffer.buffer()[..pos]));
                    } else {
                        self.scratch.extend_from_slice(&input[..pos]);
                        self.buffer.consume(pos);

                        return Ok(Some(&self.scratch));
                    }
                }
            };
        }
    }
590
591    pub fn read_zero_copy_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
592        use ReadResult::*;
593
594        self.scratch.clear();
595        self.seps.clear();
596
597        if let Some(last_pos) = self.actual_buffer_position.take() {
598            self.buffer.consume(last_pos);
599        }
600
601        loop {
602            let input = self.buffer.fill_buf()?;
603
604            let (result, pos) = self.inner.split_record_and_find_separators(
605                input,
606                self.scratch.len(),
607                &mut self.seps,
608            );
609
610            match result {
611                End => {
612                    self.buffer.consume(pos);
613                    return Ok(None);
614                }
615                Cr | Lf => {
616                    self.buffer.consume(pos);
617                }
618                InputEmpty => {
619                    self.scratch.extend_from_slice(input);
620                    self.buffer.consume(pos);
621                }
622                Record => {
623                    if self.scratch.is_empty() {
624                        self.check_field_count(self.seps.len() + 1)?;
625                        self.actual_buffer_position = Some(pos);
626                        return Ok(Some(ZeroCopyByteRecord::new(
627                            &self.buffer.buffer()[..pos],
628                            &self.seps,
629                        )));
630                    } else {
631                        self.scratch.extend_from_slice(&input[..pos]);
632                        self.buffer.consume(pos);
633                        self.check_field_count(self.seps.len() + 1)?;
634                        return Ok(Some(ZeroCopyByteRecord::new(&self.scratch, &self.seps)));
635                    }
636                }
637            };
638        }
639    }
640
641    pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
642        use ReadResult::*;
643
644        record.clear();
645
646        let mut record_builder = ByteRecordBuilder::wrap(record);
647
648        if let Some(last_pos) = self.actual_buffer_position.take() {
649            self.buffer.consume(last_pos);
650        }
651
652        loop {
653            let input = self.buffer.fill_buf()?;
654
655            let (result, pos) = self.inner.read_record(input, &mut record_builder);
656
657            self.buffer.consume(pos);
658
659            match result {
660                End => {
661                    return Ok(false);
662                }
663                Cr | Lf | InputEmpty => {
664                    continue;
665                }
666                Record => {
667                    self.check_field_count(record.len())?;
668                    return Ok(true);
669                }
670            };
671        }
672    }
673
674    pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
675        ByteRecordsIter {
676            reader: self,
677            record: ByteRecord::new(),
678        }
679    }
680
681    pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
682        ByteRecordsIntoIter {
683            reader: self,
684            record: ByteRecord::new(),
685        }
686    }
687}
688
/// Iterator over the records of a borrowed `BufferedReader`, created by
/// `BufferedReader::byte_records`.
pub struct ByteRecordsIter<'r, R> {
    /// Reader the records are pulled from.
    reader: &'r mut BufferedReader<R>,
    /// Record reused for every read; each yielded item is a clone of it.
    record: ByteRecord,
}
693
694impl<'r, R: Read> Iterator for ByteRecordsIter<'r, R> {
695    type Item = error::Result<ByteRecord>;
696
697    fn next(&mut self) -> Option<Self::Item> {
698        // NOTE: cloning the record will not carry over excess capacity
699        // because the record only contains `Vec` currently.
700        match self.reader.read_byte_record(&mut self.record) {
701            Err(err) => Some(Err(err)),
702            Ok(true) => Some(Ok(self.record.clone())),
703            Ok(false) => None,
704        }
705    }
706}
707
/// Iterator over the records of an owned `BufferedReader`, created by
/// `BufferedReader::into_byte_records`.
pub struct ByteRecordsIntoIter<R> {
    /// Reader the records are pulled from.
    reader: BufferedReader<R>,
    /// Record reused for every read; each yielded item is a clone of it.
    record: ByteRecord,
}
712
713impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
714    type Item = error::Result<ByteRecord>;
715
716    fn next(&mut self) -> Option<Self::Item> {
717        // NOTE: cloning the record will not carry over excess capacity
718        // because the record only contains `Vec` currently.
719        match self.reader.read_byte_record(&mut self.record) {
720            Err(err) => Some(Err(err)),
721            Ok(true) => Some(Ok(self.record.clone())),
722            Ok(false) => None,
723        }
724    }
725}
726
// NOTE: a reader to be used when the whole data fits into memory or when using
// memory maps.
/// CSV reader working over a byte slice holding the whole input.
pub struct TotalReader<'b> {
    /// Underlying parsing state machine.
    inner: Reader,
    /// The whole input.
    bytes: &'b [u8],
    /// Offset in `bytes` of the first byte not processed yet.
    pos: usize,
}
734
735impl<'b> TotalReader<'b> {
736    pub fn new(delimiter: u8, quote: u8, bytes: &'b [u8]) -> Self {
737        Self {
738            inner: Reader::new(delimiter, quote),
739            bytes,
740            pos: 0,
741        }
742    }
743
744    pub fn count_records(&mut self) -> u64 {
745        use ReadResult::*;
746
747        let mut count: u64 = 0;
748
749        loop {
750            let (result, pos) = self.inner.split_record(&self.bytes[self.pos..]);
751
752            self.pos += pos;
753
754            match result {
755                End => break,
756                InputEmpty | Cr | Lf => continue,
757                Record => {
758                    count += 1;
759                }
760            };
761        }
762
763        count
764    }
765
766    pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
767        use ReadResult::*;
768
769        record.clear();
770
771        let mut record_builder = ByteRecordBuilder::wrap(record);
772
773        loop {
774            let (result, pos) = self
775                .inner
776                .read_record(&self.bytes[self.pos..], &mut record_builder);
777
778            self.pos += pos;
779
780            match result {
781                End => {
782                    return Ok(false);
783                }
784                Cr | Lf | InputEmpty => {
785                    continue;
786                }
787                Record => {
788                    return Ok(true);
789                }
790            };
791        }
792    }
793}
794
795#[cfg(test)]
796mod tests {
797    use std::io::Cursor;
798
799    use crate::brec;
800
801    use super::*;
802
803    fn count_records(data: &str, capacity: usize) -> u64 {
804        let mut splitter = BufferedReader::with_capacity(capacity, Cursor::new(data), b',', b'"');
805        splitter.count_records().unwrap()
806    }
807
808    #[test]
809    fn test_count() {
810        // Empty
811        assert_eq!(count_records("", 1024), 0);
812
813        // Single cells with various empty lines
814        let tests = vec![
815            "name\njohn\nlucy",
816            "name\njohn\nlucy\n",
817            "name\n\njohn\r\nlucy\n",
818            "name\n\njohn\r\nlucy\n\n",
819            "name\n\n\njohn\r\n\r\nlucy\n\n\n",
820            "\nname\njohn\nlucy",
821            "\n\nname\njohn\nlucy",
822            "\r\n\r\nname\njohn\nlucy",
823            "name\njohn\nlucy\r\n",
824            "name\njohn\nlucy\r\n\r\n",
825        ];
826
827        for capacity in [32usize, 4, 3, 2, 1] {
828            for test in tests.iter() {
829                assert_eq!(
830                    count_records(test, capacity),
831                    3,
832                    "capacity={} string={:?}",
833                    capacity,
834                    test
835                );
836            }
837        }
838
839        // Multiple cells
840        let data = "name,surname,age\njohn,landy,45\nlucy,rose,67";
841        assert_eq!(count_records(data, 1024), 3);
842
843        // Quoting
844        for capacity in [1024usize, 32usize, 4, 3, 2, 1] {
845            let data = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\r\n";
846
847            assert_eq!(count_records(data, capacity), 5, "capacity={}", capacity);
848        }
849
850        // Different separator
851        let data = "name\tsurname\tage\njohn\tlandy\t45\nlucy\trose\t67";
852        assert_eq!(count_records(data, 1024), 3);
853    }
854
855    #[test]
856    fn test_read_zero_copy_byte_record() -> error::Result<()> {
857        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";
858
859        let mut reader = BufferedReader::with_capacity(32, Cursor::new(csv), b',', b'"');
860        let mut records = Vec::new();
861
862        let expected = vec![
863            vec!["name", "surname", "age"],
864            vec![
865                "\"john\"",
866                "\"landy, the \"\"everlasting\"\" bastard\"",
867                "45",
868            ],
869            vec!["lucy", "rose", "\"67\""],
870            vec!["jermaine", "jackson", "\"89\""],
871            vec!["karine", "loucan", "\"52\""],
872            vec!["rose", "\"glib\"", "12"],
873            vec!["\"guillaume\"", "\"plique\"", "\"42\""],
874        ]
875        .into_iter()
876        .map(|record| {
877            record
878                .into_iter()
879                .map(|cell| cell.as_bytes().to_vec())
880                .collect::<Vec<_>>()
881        })
882        .collect::<Vec<_>>();
883
884        while let Some(record) = reader.read_zero_copy_byte_record()? {
885            records.push(record.iter().map(|cell| cell.to_vec()).collect::<Vec<_>>());
886        }
887
888        assert_eq!(records, expected);
889
890        Ok(())
891    }
892
893    #[test]
894    fn test_read_byte_record() -> error::Result<()> {
895        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";
896
897        let expected = vec![
898            brec!["name", "surname", "age"],
899            brec!["john", "landy, the \"everlasting\" bastard", "45"],
900            brec!["\"ok\"", "whatever", "dude"],
901            brec!["lucy", "rose", "67"],
902            brec!["jermaine", "jackson", "89"],
903            brec!["karine", "loucan", "52"],
904            brec!["rose", "glib", "12"],
905            brec!["guillaume", "plique", "42"],
906        ];
907
908        for capacity in [32usize, 4, 3, 2, 1] {
909            let mut reader = BufferedReader::with_capacity(capacity, Cursor::new(csv), b',', b'"');
910
911            assert_eq!(
912                reader.byte_records().collect::<Result<Vec<_>, _>>()?,
913                expected
914            );
915        }
916
917        Ok(())
918    }
919
920    #[test]
921    fn test_strip_bom() -> error::Result<()> {
922        let mut reader = BufferedReader::new(Cursor::new("name,surname,age"), b',', b'"');
923        reader.strip_bom()?;
924
925        assert_eq!(
926            reader.byte_records().next().unwrap()?,
927            brec!["name", "surname", "age"]
928        );
929
930        let mut reader =
931            BufferedReader::new(Cursor::new(b"\xef\xbb\xbfname,surname,age"), b',', b'"');
932        reader.strip_bom()?;
933
934        assert_eq!(
935            reader.byte_records().next().unwrap()?,
936            brec!["name", "surname", "age"]
937        );
938
939        Ok(())
940    }
941
942    #[test]
943    fn test_empty_row() -> error::Result<()> {
944        let data = "name\n\"\"\nlucy\n\"\"";
945
946        // Counting
947        let mut reader = BufferedReader::new(Cursor::new(data), b',', b'"');
948
949        assert_eq!(reader.count_records()?, 4);
950
951        // Zero-copy
952        let mut reader = BufferedReader::new(Cursor::new(data), b',', b'"');
953
954        let expected = vec![
955            vec!["name".as_bytes().to_vec()],
956            vec!["\"\"".as_bytes().to_vec()],
957            vec!["lucy".as_bytes().to_vec()],
958            vec!["\"\"".as_bytes().to_vec()],
959        ];
960
961        // Read
962        let mut records = Vec::new();
963
964        while let Some(record) = reader.read_zero_copy_byte_record()? {
965            records.push(vec![record.as_slice().to_vec()]);
966        }
967
968        assert_eq!(records, expected);
969
970        let reader = BufferedReader::new(Cursor::new(data), b',', b'"');
971
972        let expected = vec![brec!["name"], brec![""], brec!["lucy"], brec![""]];
973
974        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;
975
976        assert_eq!(records, expected);
977
978        Ok(())
979    }
980
981    #[test]
982    fn test_crlf() -> error::Result<()> {
983        let reader = BufferedReader::new(
984            Cursor::new("name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n"),
985            b',',
986            b'"',
987        );
988
989        let expected = vec![
990            brec!["name", "surname"],
991            brec!["lucy", "john"],
992            brec!["evan", "zhong"],
993            brec!["béatrice", "glougou"],
994        ];
995
996        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;
997
998        assert_eq!(records, expected);
999
1000        Ok(())
1001    }
1002
1003    #[test]
1004    fn test_quote_always() -> error::Result<()> {
1005        let reader = BufferedReader::new(
1006            Cursor::new("\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\""),
1007            b',',
1008            b'"',
1009        );
1010
1011        let expected = vec![
1012            brec!["name", "surname"],
1013            brec!["lucy", "rose"],
1014            brec!["john", "mayhew"],
1015        ];
1016
1017        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;
1018
1019        assert_eq!(records, expected);
1020
1021        Ok(())
1022    }
1023}