1use std::io::Read;
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error};
6use crate::records::{ByteRecord, ZeroCopyByteRecord};
7use crate::utils::trim_bom;
8
/// Configuration used to construct a [`ZeroCopyReader`].
///
/// Create one with `ZeroCopyReaderBuilder::new()` (or `Default::default()`), adjust it with
/// the chainable setters, then call `from_reader`. Deriving `Clone` lets one configuration
/// be reused for several readers, and `Debug` makes it printable in diagnostics.
#[derive(Debug, Clone)]
pub struct ZeroCopyReaderBuilder {
    /// Byte separating fields within a record (`b','` by default).
    delimiter: u8,
    /// Byte used to quote fields (`b'"'` by default).
    quote: u8,
    /// Capacity handed to the scratch buffer; `None` defers to the buffer's own default.
    buffer_capacity: Option<usize>,
    /// When `true`, records may have a different field count than the header row.
    flexible: bool,
    /// When `false`, the first row is still used as headers but is also yielded as a record.
    has_headers: bool,
}
16
17impl Default for ZeroCopyReaderBuilder {
18 fn default() -> Self {
19 Self {
20 delimiter: b',',
21 quote: b'"',
22 buffer_capacity: None,
23 flexible: false,
24 has_headers: true,
25 }
26 }
27}
28
29impl ZeroCopyReaderBuilder {
30 pub fn new() -> Self {
31 Self::default()
32 }
33
34 pub fn with_capacity(capacity: usize) -> Self {
35 let mut reader = Self::default();
36 reader.buffer_capacity(capacity);
37 reader
38 }
39
40 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
41 self.delimiter = delimiter;
42 self
43 }
44
45 pub fn quote(&mut self, quote: u8) -> &mut Self {
46 self.quote = quote;
47 self
48 }
49
50 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
51 self.buffer_capacity = Some(capacity);
52 self
53 }
54
55 pub fn flexible(&mut self, yes: bool) -> &mut Self {
56 self.flexible = yes;
57 self
58 }
59
60 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
61 self.has_headers = yes;
62 self
63 }
64
65 pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
66 ZeroCopyReader {
67 buffer: ScratchBuffer::with_optional_capacity(self.buffer_capacity, reader),
68 inner: CoreReader::new(self.delimiter, self.quote),
69 headers_seps: Vec::new(),
70 headers_slice: Vec::new(),
71 headers: ByteRecord::new(),
72 seps: Vec::new(),
73 flexible: self.flexible,
74 has_read: false,
75 must_reemit_headers: !self.has_headers,
76 }
77 }
78}
79
80pub struct ZeroCopyReader<R> {
81 buffer: ScratchBuffer<R>,
82 inner: CoreReader,
83 headers: ByteRecord,
84 headers_seps: Vec<usize>,
85 headers_slice: Vec<u8>,
86 seps: Vec<usize>,
87 flexible: bool,
88 has_read: bool,
89 must_reemit_headers: bool,
90}
91
92impl<R: Read> ZeroCopyReader<R> {
93 pub fn from_reader(reader: R) -> Self {
94 ZeroCopyReaderBuilder::new().from_reader(reader)
95 }
96
97 #[inline]
98 fn check_field_count(&mut self, written: usize) -> error::Result<()> {
99 if self.flexible {
100 return Ok(());
101 }
102
103 let headers_len = self.headers_seps.len() + 1;
104
105 if self.has_read && written != headers_len {
106 return Err(Error::unequal_lengths(headers_len, written));
107 }
108
109 Ok(())
110 }
111
112 #[inline]
113 fn on_first_read(&mut self) -> error::Result<()> {
114 if self.has_read {
115 return Ok(());
116 }
117
118 let input = self.buffer.fill_buf()?;
120 let bom_len = trim_bom(input);
121 self.buffer.consume(bom_len);
122
123 let mut headers_seps = Vec::new();
125 let mut headers_slice = Vec::new();
126 let mut byte_headers = ByteRecord::new();
127
128 if let Some(headers) = self.read_byte_record_impl()? {
129 (headers_seps, headers_slice) = headers.to_parts();
130 byte_headers = headers.to_byte_record();
131 } else {
132 self.must_reemit_headers = false;
133 }
134
135 self.headers_seps = headers_seps;
136 self.headers_slice = headers_slice;
137 self.headers = byte_headers;
138
139 self.has_read = true;
140
141 Ok(())
142 }
143
144 #[inline]
145 pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
146 self.on_first_read()?;
147
148 Ok(&self.headers)
149 }
150
151 fn read_byte_record_impl(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
152 use ReadResult::*;
153
154 self.buffer.reset();
155 self.seps.clear();
156
157 loop {
158 let seps_offset = self.buffer.saved().len();
159 let input = self.buffer.fill_buf()?;
160
161 let (result, pos) =
162 self.inner
163 .split_record_and_find_separators(input, seps_offset, &mut self.seps);
164
165 match result {
166 End => {
167 self.buffer.consume(pos);
168 return Ok(None);
169 }
170 Cr | Lf => {
171 self.buffer.consume(pos);
172 }
173 InputEmpty => {
174 self.buffer.save();
175 }
176 Record => {
177 self.check_field_count(self.seps.len() + 1)?;
178
179 let record = ZeroCopyByteRecord::new(
180 self.buffer.flush(pos),
181 &self.seps,
182 self.inner.quote,
183 );
184
185 return Ok(Some(record));
186 }
187 };
188 }
189 }
190
191 #[inline(always)]
192 pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
193 self.on_first_read()?;
194
195 if self.must_reemit_headers {
196 self.must_reemit_headers = false;
197 return Ok(Some(ZeroCopyByteRecord::new(
198 &self.headers_slice,
199 &self.headers_seps,
200 self.inner.quote,
201 )));
202 }
203
204 self.read_byte_record_impl()
205 }
206}
207
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use crate::brec;

    use super::*;

    impl<R: Read> ZeroCopyReader<R> {
        /// Test helper: a reader whose first row is data, not a header row.
        fn from_reader_no_headers(reader: R) -> Self {
            ZeroCopyReaderBuilder::new()
                .has_headers(false)
                .from_reader(reader)
        }
    }

    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        // Mixes quoted and unquoted cells, escaped quotes, a blank line and a trailing CRLF.
        // The small capacity (32) is presumably meant to make long records span refills.
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let mut reader = ZeroCopyReaderBuilder::with_capacity(32)
            .has_headers(false)
            .from_reader(Cursor::new(csv));
        let mut records = Vec::new();

        // `iter()` yields raw cells, so quotes are expected verbatim, and the blank line
        // produces no record.
        let expected = vec![
            vec!["name", "surname", "age"],
            vec![
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ],
            vec!["lucy", "rose", "\"67\""],
            vec!["jermaine", "jackson", "\"89\""],
            vec!["karine", "loucan", "\"52\""],
            vec!["rose", "\"glib\"", "12"],
            vec!["\"guillaume\"", "\"plique\"", "\"42\""],
        ]
        .into_iter()
        .map(|record| {
            record
                .into_iter()
                .map(|cell| cell.as_bytes().to_vec())
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();

        while let Some(record) = reader.read_byte_record()? {
            records.push(record.iter().map(|cell| cell.to_vec()).collect::<Vec<_>>());
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        // Rows consisting of a single empty quoted field are records, not blank lines.
        let data = "name\n\"\"\nlucy\n\"\"";

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));

        let expected = vec![
            vec!["name".as_bytes().to_vec()],
            vec!["\"\"".as_bytes().to_vec()],
            vec!["lucy".as_bytes().to_vec()],
            vec!["\"\"".as_bytes().to_vec()],
        ];

        let mut records = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            records.push(vec![record.as_slice().to_vec()]);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // Headers requested before / after the first record, with a header row.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["john", "dandy"]
        );

        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["john", "dandy"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Without a header row, the first row is both the headers and the first record.
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["name", "surname"]
        );

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["name", "surname"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Empty input: empty headers and no records, in both modes.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.read_byte_record()?.is_none());

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.read_byte_record()?.is_none());

        Ok(())
    }

    #[test]
    fn test_unequal_lengths() -> error::Result<()> {
        let data = "a,b\n1,2\n3\n";

        // Strict reader with a header row: a row narrower than the headers is an error.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["a", "b"]);
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["1", "2"]
        );
        assert!(reader.read_byte_record().is_err());

        // Strict reader without a header row: the first row still sets the expected width.
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["a", "b"]
        );
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["1", "2"]
        );
        assert!(reader.read_byte_record().is_err());

        Ok(())
    }

    #[test]
    fn test_flexible() -> error::Result<()> {
        let data = "a,b\n1,2\n3\n";

        // A flexible reader accepts rows whose width differs from the headers.
        let mut reader = ZeroCopyReaderBuilder::new()
            .flexible(true)
            .from_reader(Cursor::new(data));
        let mut records = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            records.push(record.to_byte_record());
        }

        assert_eq!(records, vec![brec!["1", "2"], brec!["3"]]);

        Ok(())
    }
}
338}