simd_csv/
zero_copy_reader.rs

1use std::io::Read;
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error};
6use crate::records::ZeroCopyByteRecord;
7
/// Configuration for constructing a [`ZeroCopyReader`].
///
/// Start from [`ZeroCopyReaderBuilder::new`] (or `with_capacity`), adjust the
/// settings with the chained setters, then call `from_reader`.
#[derive(Debug, Clone)]
pub struct ZeroCopyReaderBuilder {
    /// Byte handed to `CoreReader::new` as the field delimiter.
    delimiter: u8,
    /// Byte handed to `CoreReader::new` as the quote character.
    quote: u8,
    /// Optional capacity forwarded to `ScratchBuffer::with_optional_capacity`;
    /// `None` until `buffer_capacity` / `with_capacity` sets it.
    buffer_capacity: Option<usize>,
}
13
14impl Default for ZeroCopyReaderBuilder {
15    fn default() -> Self {
16        Self {
17            delimiter: b',',
18            quote: b'"',
19            buffer_capacity: None,
20        }
21    }
22}
23
24impl ZeroCopyReaderBuilder {
25    pub fn new() -> Self {
26        Self::default()
27    }
28
29    pub fn with_capacity(capacity: usize) -> Self {
30        let mut reader = Self::default();
31        reader.buffer_capacity(capacity);
32        reader
33    }
34
35    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
36        self.delimiter = delimiter;
37        self
38    }
39
40    pub fn quote(&mut self, quote: u8) -> &mut Self {
41        self.quote = quote;
42        self
43    }
44
45    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
46        self.buffer_capacity = Some(capacity);
47        self
48    }
49
50    pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
51        ZeroCopyReader {
52            buffer: ScratchBuffer::with_optional_capacity(self.buffer_capacity, reader),
53            inner: CoreReader::new(self.delimiter, self.quote),
54            field_count: None,
55            seps: Vec::new(),
56        }
57    }
58}
59
/// CSV reader over an `R` whose `read_byte_record` yields
/// [`ZeroCopyByteRecord`] values that borrow from the reader itself.
pub struct ZeroCopyReader<R> {
    /// Scratch buffer built from the underlying reader (and the optional
    /// capacity chosen on the builder).
    buffer: ScratchBuffer<R>,
    /// Core splitter, constructed from the configured delimiter and quote.
    inner: CoreReader,
    /// Field count of the first record read; `None` until a record has been
    /// seen. Later records are compared against it.
    field_count: Option<usize>,
    /// Separator positions for the record currently being read; cleared at
    /// the start of every `read_byte_record` call.
    seps: Vec<usize>,
}
66
67impl<R: Read> ZeroCopyReader<R> {
68    pub fn from_reader(reader: R) -> Self {
69        ZeroCopyReaderBuilder::new().from_reader(reader)
70    }
71
72    #[inline]
73    fn check_field_count(&mut self, written: usize) -> error::Result<()> {
74        match self.field_count {
75            Some(expected) => {
76                if written != expected {
77                    return Err(Error::unequal_lengths(expected, written));
78                }
79            }
80            None => {
81                self.field_count = Some(written);
82            }
83        }
84
85        Ok(())
86    }
87
88    pub fn strip_bom(&mut self) -> error::Result<()> {
89        self.buffer.strip_bom()?;
90        Ok(())
91    }
92
93    pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
94        use ReadResult::*;
95
96        self.buffer.reset();
97        self.seps.clear();
98
99        loop {
100            let seps_offset = self.buffer.saved().len();
101            let input = self.buffer.fill_buf()?;
102
103            let (result, pos) =
104                self.inner
105                    .split_record_and_find_separators(input, seps_offset, &mut self.seps);
106
107            match result {
108                End => {
109                    self.buffer.consume(pos);
110                    return Ok(None);
111                }
112                Cr | Lf => {
113                    self.buffer.consume(pos);
114                }
115                InputEmpty => {
116                    self.buffer.save();
117                }
118                Record => {
119                    self.check_field_count(self.seps.len() + 1)?;
120
121                    let record = ZeroCopyByteRecord::new(self.buffer.flush(pos), &self.seps);
122
123                    return Ok(Some(record));
124                }
125            };
126        }
127    }
128}
129
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Reads a multi-record document through a small (32-byte) buffer and
    /// checks that every cell comes back verbatim, quotes included, and that
    /// the blank line in the middle produces no record.
    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let expected_rows: [[&str; 3]; 7] = [
            ["name", "surname", "age"],
            [
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ],
            ["lucy", "rose", "\"67\""],
            ["jermaine", "jackson", "\"89\""],
            ["karine", "loucan", "\"52\""],
            ["rose", "\"glib\"", "12"],
            ["\"guillaume\"", "\"plique\"", "\"42\""],
        ];

        let expected: Vec<Vec<Vec<u8>>> = expected_rows
            .iter()
            .map(|row| row.iter().map(|cell| cell.as_bytes().to_vec()).collect())
            .collect();

        let mut builder = ZeroCopyReaderBuilder::new();
        builder.buffer_capacity(32);
        let mut reader = builder.from_reader(Cursor::new(csv));

        let mut records = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            let cells: Vec<_> = record.iter().map(|cell| cell.to_vec()).collect();
            records.push(cells);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    /// Single-column input where some rows are just an empty quoted cell and
    /// the last row has no trailing newline.
    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let expected: Vec<Vec<Vec<u8>>> = ["name", "\"\"", "lucy", "\"\""]
            .iter()
            .map(|row| vec![row.as_bytes().to_vec()])
            .collect();

        // Zero-copy reader with default settings
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));

        // Each record is compared through its full byte slice
        let mut records = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            let whole = record.as_slice().to_vec();
            records.push(vec![whole]);
        }

        assert_eq!(records, expected);

        Ok(())
    }
}