simd_csv/
reader.rs

1use std::io::{BufRead, BufReader, Read};
2
3use crate::core::{CoreReader, ReadResult};
4use crate::error::{self, Error};
5use crate::ext::StripBom;
6use crate::records::{ByteRecord, ByteRecordBuilder};
7
/// Configuration used to build a [`Reader`].
///
/// Every field is private and is changed through the chainable setter of the
/// same name on the builder. The type is `Clone` and `Debug` so a
/// configuration can be duplicated, stored, and inspected while debugging.
#[derive(Debug, Clone)]
pub struct ReaderBuilder {
    /// Byte handed to the core parser as the field delimiter.
    delimiter: u8,
    /// Byte handed to the core parser as the quote character.
    quote: u8,
    /// Requested capacity of the internal `BufReader`; `None` leaves the
    /// choice to `BufReader::new`.
    buffer_capacity: Option<usize>,
    /// Requested leniency regarding records whose number of fields differs.
    flexible: bool,
}
14
15impl Default for ReaderBuilder {
16    fn default() -> Self {
17        Self {
18            delimiter: b',',
19            quote: b'"',
20            buffer_capacity: None,
21            flexible: false,
22        }
23    }
24}
25
26impl ReaderBuilder {
27    pub fn new() -> Self {
28        Self::default()
29    }
30
31    pub fn with_capacity(capacity: usize) -> Self {
32        let mut reader = Self::default();
33        reader.buffer_capacity(capacity);
34        reader
35    }
36
37    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
38        self.delimiter = delimiter;
39        self
40    }
41
42    pub fn quote(&mut self, quote: u8) -> &mut Self {
43        self.quote = quote;
44        self
45    }
46
47    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
48        self.buffer_capacity = Some(capacity);
49        self
50    }
51
52    pub fn flexible(&mut self, yes: bool) -> &mut Self {
53        self.flexible = yes;
54        self
55    }
56
57    fn bufreader<R: Read>(&self, reader: R) -> BufReader<R> {
58        match self.buffer_capacity {
59            None => BufReader::new(reader),
60            Some(capacity) => BufReader::with_capacity(capacity, reader),
61        }
62    }
63
64    pub fn from_reader<R: Read>(&self, reader: R) -> Reader<R> {
65        Reader {
66            buffer: self.bufreader(reader),
67            inner: CoreReader::new(self.delimiter, self.quote),
68            field_count: None,
69            flexible: false,
70        }
71    }
72}
73
74pub struct Reader<R> {
75    buffer: BufReader<R>,
76    inner: CoreReader,
77    field_count: Option<usize>,
78    flexible: bool,
79}
80
81impl<R: Read> Reader<R> {
82    pub fn from_reader(reader: R) -> Self {
83        ReaderBuilder::new().from_reader(reader)
84    }
85
86    #[inline]
87    fn check_field_count(&mut self, written: usize) -> error::Result<()> {
88        if self.flexible {
89            return Ok(());
90        }
91
92        match self.field_count {
93            Some(expected) => {
94                if written != expected {
95                    return Err(Error::unequal_lengths(expected, written));
96                }
97            }
98            None => {
99                self.field_count = Some(written);
100            }
101        }
102
103        Ok(())
104    }
105
106    pub fn strip_bom(&mut self) -> error::Result<()> {
107        self.buffer.strip_bom()?;
108        Ok(())
109    }
110
111    pub fn peek_byte_record(&mut self, consume: bool) -> error::Result<ByteRecord> {
112        use ReadResult::*;
113
114        let mut record = ByteRecord::new();
115        let mut record_builder = ByteRecordBuilder::wrap(&mut record);
116
117        let input = self.buffer.fill_buf()?;
118
119        let (result, pos) = self.inner.read_record(input, &mut record_builder);
120
121        match result {
122            End => Ok(ByteRecord::new()),
123
124            // TODO: we could expand the capacity of the buffer automagically here
125            // if this becomes an issue.
126            Cr | Lf | ReadResult::InputEmpty => Err(Error::invalid_headers()),
127            Record => {
128                if consume {
129                    self.buffer.consume(pos);
130                }
131
132                Ok(record)
133            }
134        }
135    }
136
137    pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
138        use ReadResult::*;
139
140        record.clear();
141
142        let mut record_builder = ByteRecordBuilder::wrap(record);
143
144        loop {
145            let input = self.buffer.fill_buf()?;
146
147            let (result, pos) = self.inner.read_record(input, &mut record_builder);
148
149            self.buffer.consume(pos);
150
151            match result {
152                End => {
153                    return Ok(false);
154                }
155                Cr | Lf | InputEmpty => {
156                    continue;
157                }
158                Record => {
159                    self.check_field_count(record.len())?;
160                    return Ok(true);
161                }
162            };
163        }
164    }
165
166    pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
167        ByteRecordsIter {
168            reader: self,
169            record: ByteRecord::new(),
170        }
171    }
172
173    pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
174        ByteRecordsIntoIter {
175            reader: self,
176            record: ByteRecord::new(),
177        }
178    }
179}
180
181pub struct ByteRecordsIter<'r, R> {
182    reader: &'r mut Reader<R>,
183    record: ByteRecord,
184}
185
186impl<'r, R: Read> Iterator for ByteRecordsIter<'r, R> {
187    type Item = error::Result<ByteRecord>;
188
189    fn next(&mut self) -> Option<Self::Item> {
190        // NOTE: cloning the record will not carry over excess capacity
191        // because the record only contains `Vec` currently.
192        match self.reader.read_byte_record(&mut self.record) {
193            Err(err) => Some(Err(err)),
194            Ok(true) => Some(Ok(self.record.clone())),
195            Ok(false) => None,
196        }
197    }
198}
199
200pub struct ByteRecordsIntoIter<R> {
201    reader: Reader<R>,
202    record: ByteRecord,
203}
204
205impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
206    type Item = error::Result<ByteRecord>;
207
208    fn next(&mut self) -> Option<Self::Item> {
209        // NOTE: cloning the record will not carry over excess capacity
210        // because the record only contains `Vec` currently.
211        match self.reader.read_byte_record(&mut self.record) {
212            Err(err) => Some(Err(err)),
213            Ok(true) => Some(Ok(self.record.clone())),
214            Ok(false) => None,
215        }
216    }
217}
218
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use crate::brec;

    use super::*;

    /// Drains an owned reader into a vector, stopping at the first error.
    fn read_all<R: Read>(reader: Reader<R>) -> error::Result<Vec<ByteRecord>> {
        reader.into_byte_records().collect()
    }

    /// Mixed quoting, escaped quotes, a blank line and a trailing CRLF must
    /// parse identically whatever the buffer capacity.
    #[test]
    fn test_read_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let expected = vec![
            brec!["name", "surname", "age"],
            brec!["john", "landy, the \"everlasting\" bastard", "45"],
            brec!["\"ok\"", "whatever", "dude"],
            brec!["lucy", "rose", "67"],
            brec!["jermaine", "jackson", "89"],
            brec!["karine", "loucan", "52"],
            brec!["rose", "glib", "12"],
            brec!["guillaume", "plique", "42"],
        ];

        for capacity in [32usize, 4, 3, 2, 1] {
            let builder = ReaderBuilder::with_capacity(capacity);
            let mut reader = builder.from_reader(Cursor::new(csv));

            let records = reader.byte_records().collect::<Result<Vec<_>, _>>()?;

            assert_eq!(records, expected);
        }

        Ok(())
    }

    /// The first record must come out the same with or without a leading
    /// byte order mark.
    #[test]
    fn test_strip_bom() -> error::Result<()> {
        let inputs: [&[u8]; 2] = [
            "name,surname,age".as_bytes(),
            b"\xef\xbb\xbfname,surname,age",
        ];

        for input in inputs {
            let mut reader = Reader::from_reader(Cursor::new(input));
            reader.strip_bom()?;

            let first = reader.byte_records().next().unwrap()?;

            assert_eq!(first, brec!["name", "surname", "age"]);
        }

        Ok(())
    }

    /// Rows made of a single empty quoted field are records, not blank lines.
    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let records = read_all(Reader::from_reader(Cursor::new(data)))?;

        assert_eq!(
            records,
            vec![brec!["name"], brec![""], brec!["lucy"], brec![""]]
        );

        Ok(())
    }

    /// CRLF line endings, including after a quoted field and multi-byte text.
    #[test]
    fn test_crlf() -> error::Result<()> {
        let data = "name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n";

        let records = read_all(Reader::from_reader(Cursor::new(data)))?;

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "john"],
            brec!["evan", "zhong"],
            brec!["béatrice", "glougou"],
        ];

        assert_eq!(records, expected);

        Ok(())
    }

    /// Every field quoted, without a final line ending.
    #[test]
    fn test_quote_always() -> error::Result<()> {
        let data = "\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\"";

        let records = read_all(Reader::from_reader(Cursor::new(data)))?;

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "rose"],
            brec!["john", "mayhew"],
        ];

        assert_eq!(records, expected);

        Ok(())
    }
}