simd_csv/
reader.rs

1use std::io::{BufRead, BufReader, Read};
2
3use crate::core::{CoreReader, ReadResult};
4use crate::error::{self, Error};
5use crate::ext::StripBom;
6use crate::records::{ByteRecord, ByteRecordBuilder};
7
/// Configuration used to create a [`Reader`].
///
/// Holds the field delimiter, the quote byte and an optional capacity for the
/// internal read buffer. The type is `Debug` and `Clone` so that a
/// configuration can be logged and reused.
#[derive(Debug, Clone)]
pub struct ReaderBuilder {
    /// Byte separating two fields of a record.
    delimiter: u8,
    /// Byte used to quote fields.
    quote: u8,
    /// Requested capacity of the internal buffer, in bytes. `None` means the
    /// default capacity of `BufReader` is used.
    buffer_capacity: Option<usize>,
}
13
14impl Default for ReaderBuilder {
15    fn default() -> Self {
16        Self {
17            delimiter: b',',
18            quote: b'"',
19            buffer_capacity: None,
20        }
21    }
22}
23
24impl ReaderBuilder {
25    pub fn new() -> Self {
26        Self::default()
27    }
28
29    pub fn with_capacity(capacity: usize) -> Self {
30        let mut reader = Self::default();
31        reader.buffer_capacity(capacity);
32        reader
33    }
34
35    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
36        self.delimiter = delimiter;
37        self
38    }
39
40    pub fn quote(&mut self, quote: u8) -> &mut Self {
41        self.quote = quote;
42        self
43    }
44
45    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
46        self.buffer_capacity = Some(capacity);
47        self
48    }
49
50    fn bufreader<R: Read>(&self, reader: R) -> BufReader<R> {
51        match self.buffer_capacity {
52            None => BufReader::new(reader),
53            Some(capacity) => BufReader::with_capacity(capacity, reader),
54        }
55    }
56
57    pub fn from_reader<R: Read>(&self, reader: R) -> Reader<R> {
58        Reader {
59            buffer: self.bufreader(reader),
60            inner: CoreReader::new(self.delimiter, self.quote),
61            field_count: None,
62        }
63    }
64}
65
66pub struct Reader<R> {
67    buffer: BufReader<R>,
68    inner: CoreReader,
69    field_count: Option<usize>,
70}
71
72impl<R: Read> Reader<R> {
73    pub fn from_reader(reader: R) -> Self {
74        ReaderBuilder::new().from_reader(reader)
75    }
76
77    #[inline]
78    fn check_field_count(&mut self, written: usize) -> error::Result<()> {
79        match self.field_count {
80            Some(expected) => {
81                if written != expected {
82                    return Err(Error::unequal_lengths(expected, written));
83                }
84            }
85            None => {
86                self.field_count = Some(written);
87            }
88        }
89
90        Ok(())
91    }
92
93    pub fn strip_bom(&mut self) -> error::Result<()> {
94        self.buffer.strip_bom()?;
95        Ok(())
96    }
97
98    pub fn first_byte_record(&mut self, consume: bool) -> error::Result<ByteRecord> {
99        use ReadResult::*;
100
101        let mut record = ByteRecord::new();
102        let mut record_builder = ByteRecordBuilder::wrap(&mut record);
103
104        let input = self.buffer.fill_buf()?;
105
106        let (result, pos) = self.inner.read_record(input, &mut record_builder);
107
108        match result {
109            End => Ok(ByteRecord::new()),
110
111            // TODO: we could expand the capacity of the buffer automagically here
112            // if this becomes an issue.
113            Cr | Lf | ReadResult::InputEmpty => Err(Error::invalid_headers()),
114            Record => {
115                if consume {
116                    self.buffer.consume(pos);
117                }
118
119                Ok(record)
120            }
121        }
122    }
123
124    pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
125        use ReadResult::*;
126
127        record.clear();
128
129        let mut record_builder = ByteRecordBuilder::wrap(record);
130
131        loop {
132            let input = self.buffer.fill_buf()?;
133
134            let (result, pos) = self.inner.read_record(input, &mut record_builder);
135
136            self.buffer.consume(pos);
137
138            match result {
139                End => {
140                    return Ok(false);
141                }
142                Cr | Lf | InputEmpty => {
143                    continue;
144                }
145                Record => {
146                    self.check_field_count(record.len())?;
147                    return Ok(true);
148                }
149            };
150        }
151    }
152
153    pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
154        ByteRecordsIter {
155            reader: self,
156            record: ByteRecord::new(),
157        }
158    }
159
160    pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
161        ByteRecordsIntoIter {
162            reader: self,
163            record: ByteRecord::new(),
164        }
165    }
166}
167
168pub struct ByteRecordsIter<'r, R> {
169    reader: &'r mut Reader<R>,
170    record: ByteRecord,
171}
172
173impl<'r, R: Read> Iterator for ByteRecordsIter<'r, R> {
174    type Item = error::Result<ByteRecord>;
175
176    fn next(&mut self) -> Option<Self::Item> {
177        // NOTE: cloning the record will not carry over excess capacity
178        // because the record only contains `Vec` currently.
179        match self.reader.read_byte_record(&mut self.record) {
180            Err(err) => Some(Err(err)),
181            Ok(true) => Some(Ok(self.record.clone())),
182            Ok(false) => None,
183        }
184    }
185}
186
187pub struct ByteRecordsIntoIter<R> {
188    reader: Reader<R>,
189    record: ByteRecord,
190}
191
192impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
193    type Item = error::Result<ByteRecord>;
194
195    fn next(&mut self) -> Option<Self::Item> {
196        // NOTE: cloning the record will not carry over excess capacity
197        // because the record only contains `Vec` currently.
198        match self.reader.read_byte_record(&mut self.record) {
199            Err(err) => Some(Err(err)),
200            Ok(true) => Some(Ok(self.record.clone())),
201            Ok(false) => None,
202        }
203    }
204}
205
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use crate::brec;

    use super::*;

    // The same document must decode to the same records whatever the buffer
    // capacity is, including capacities far smaller than a single record.
    // The blank line in the input is not expected to produce a record.
    #[test]
    fn test_read_byte_record() -> error::Result<()> {
        let data = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let expected = vec![
            brec!["name", "surname", "age"],
            brec!["john", "landy, the \"everlasting\" bastard", "45"],
            brec!["\"ok\"", "whatever", "dude"],
            brec!["lucy", "rose", "67"],
            brec!["jermaine", "jackson", "89"],
            brec!["karine", "loucan", "52"],
            brec!["rose", "glib", "12"],
            brec!["guillaume", "plique", "42"],
        ];

        for capacity in [32usize, 4, 3, 2, 1] {
            let mut reader = ReaderBuilder::with_capacity(capacity).from_reader(Cursor::new(data));
            let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

            assert_eq!(records, expected);
        }

        Ok(())
    }

    // Stripping the BOM must be a no-op without a BOM and must remove the
    // UTF-8 BOM when there is one.
    #[test]
    fn test_strip_bom() -> error::Result<()> {
        let inputs: [&[u8]; 2] = [b"name,surname,age", b"\xef\xbb\xbfname,surname,age"];

        for input in inputs {
            let mut reader = Reader::from_reader(Cursor::new(input));
            reader.strip_bom()?;

            let first = reader.byte_records().next().unwrap()?;

            assert_eq!(first, brec!["name", "surname", "age"]);
        }

        Ok(())
    }

    // A quoted empty field is a record of its own, even as the very last
    // bytes of an input lacking a trailing newline.
    #[test]
    fn test_empty_row() -> error::Result<()> {
        let expected = vec![brec!["name"], brec![""], brec!["lucy"], brec![""]];

        let input = "name\n\"\"\nlucy\n\"\"";
        let records = Reader::from_reader(Cursor::new(input))
            .into_byte_records()
            .collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // CRLF line endings must not leak into the decoded fields.
    #[test]
    fn test_crlf() -> error::Result<()> {
        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "john"],
            brec!["evan", "zhong"],
            brec!["béatrice", "glougou"],
        ];

        let input = "name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n";
        let records = Reader::from_reader(Cursor::new(input))
            .into_byte_records()
            .collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // Input where every single field is quoted.
    #[test]
    fn test_quote_always() -> error::Result<()> {
        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "rose"],
            brec!["john", "mayhew"],
        ];

        let input = "\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\"";
        let records = Reader::from_reader(Cursor::new(input))
            .into_byte_records()
            .collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }
}