1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ZeroCopyByteRecord};
7use crate::splitter::SplitterBuilder;
8use crate::utils::trim_bom;
9
/// Configuration used to build a [`ZeroCopyReader`].
///
/// Start from [`ZeroCopyReaderBuilder::new`] or [`ZeroCopyReaderBuilder::with_capacity`],
/// adjust it with the setter methods, then call [`ZeroCopyReaderBuilder::from_reader`].
#[derive(Clone, Debug)]
pub struct ZeroCopyReaderBuilder {
    /// Byte separating fields within a record.
    delimiter: u8,
    /// Byte used to quote fields.
    quote: u8,
    /// Capacity handed to the reader's internal scratch buffer.
    buffer_capacity: usize,
    /// When `true`, records are not required to have as many fields as the header record.
    flexible: bool,
    /// Whether the first record is treated as a header row rather than as data.
    has_headers: bool,
}
19
20impl Default for ZeroCopyReaderBuilder {
21 fn default() -> Self {
22 Self {
23 delimiter: b',',
24 quote: b'"',
25 buffer_capacity: 8192,
26 flexible: false,
27 has_headers: true,
28 }
29 }
30}
31
32impl ZeroCopyReaderBuilder {
33 pub fn new() -> Self {
35 Self::default()
36 }
37
38 pub fn with_capacity(capacity: usize) -> Self {
40 let mut reader = Self::default();
41 reader.buffer_capacity(capacity);
42 reader
43 }
44
45 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
51 self.delimiter = delimiter;
52 self
53 }
54
55 pub fn quote(&mut self, quote: u8) -> &mut Self {
61 self.quote = quote;
62 self
63 }
64
65 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
67 self.buffer_capacity = capacity;
68 self
69 }
70
71 pub fn flexible(&mut self, yes: bool) -> &mut Self {
77 self.flexible = yes;
78 self
79 }
80
81 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
85 self.has_headers = yes;
86 self
87 }
88
89 pub fn to_splitter_builder(&self) -> SplitterBuilder {
91 let mut splitter_builder = SplitterBuilder::new();
92
93 splitter_builder
94 .buffer_capacity(self.buffer_capacity)
95 .has_headers(self.has_headers)
96 .quote(self.quote)
97 .delimiter(self.delimiter);
98
99 splitter_builder
100 }
101
102 pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
105 ZeroCopyReader {
106 buffer: ScratchBuffer::with_capacity(self.buffer_capacity, reader),
107 inner: CoreReader::new(self.delimiter, self.quote),
108 byte_headers: ByteRecord::new(),
109 raw_headers: (Vec::new(), Vec::new()),
110 seps: Vec::new(),
111 flexible: self.flexible,
112 has_read: false,
113 must_reemit_headers: !self.has_headers,
114 has_headers: self.has_headers,
115 index: 0,
116 }
117 }
118}
119
120pub struct ZeroCopyReader<R> {
128 buffer: ScratchBuffer<R>,
129 inner: CoreReader,
130 byte_headers: ByteRecord,
131 raw_headers: (Vec<usize>, Vec<u8>),
132 seps: Vec<usize>,
133 flexible: bool,
134 has_read: bool,
135 must_reemit_headers: bool,
136 has_headers: bool,
137 index: u64,
138}
139
140impl<R: Read> ZeroCopyReader<R> {
141 pub fn from_reader(reader: R) -> Self {
142 ZeroCopyReaderBuilder::new().from_reader(reader)
143 }
144
145 #[inline]
146 fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
147 if self.flexible {
148 return Ok(());
149 }
150
151 let headers_len = self.raw_headers.0.len() + 1;
152
153 if self.has_read && written != headers_len {
154 return Err(Error::new(ErrorKind::UnequalLengths {
155 expected_len: headers_len,
156 len: written,
157 pos: Some((byte, self.index)),
158 }));
159 }
160
161 Ok(())
162 }
163
164 #[inline]
165 fn on_first_read(&mut self) -> error::Result<()> {
166 if self.has_read {
167 return Ok(());
168 }
169
170 let input = self.buffer.fill_buf()?;
172 let bom_len = trim_bom(input);
173 self.buffer.consume(bom_len);
174
175 let mut headers_seps = Vec::new();
177 let mut headers_slice = Vec::new();
178 let mut byte_headers = ByteRecord::new();
179
180 if let Some(headers) = self.read_byte_record_impl()? {
181 (headers_seps, headers_slice) = headers.to_parts();
182 byte_headers = headers.to_byte_record();
183 } else {
184 self.must_reemit_headers = false;
185 }
186
187 self.raw_headers = (headers_seps, headers_slice);
188 self.byte_headers = byte_headers;
189
190 self.has_read = true;
191
192 Ok(())
193 }
194
195 #[inline]
197 pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
198 self.on_first_read()?;
199
200 Ok(&self.byte_headers)
201 }
202
203 #[inline]
206 pub fn has_headers(&self) -> bool {
207 self.has_headers
208 }
209
210 fn read_byte_record_impl(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
211 use ReadResult::*;
212
213 self.buffer.reset();
214 self.seps.clear();
215
216 let byte = self.position();
217
218 loop {
219 let seps_offset = self.buffer.saved().len();
220 let input = self.buffer.fill_buf()?;
221
222 let (result, pos) =
223 self.inner
224 .split_record_and_find_separators(input, seps_offset, &mut self.seps);
225
226 match result {
227 End => {
228 self.buffer.consume(pos);
229 return Ok(None);
230 }
231 Cr | Lf => {
232 self.buffer.consume(pos);
233 }
234 InputEmpty => {
235 self.buffer.save();
236 }
237 Record => {
238 self.index += 1;
239 self.check_field_count(byte, self.seps.len() + 1)?;
240
241 let record = ZeroCopyByteRecord::new(
242 self.buffer.flush(pos),
243 &self.seps,
244 self.inner.quote,
245 );
246
247 return Ok(Some(record));
248 }
249 };
250 }
251 }
252
253 #[inline(always)]
254 pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
255 self.on_first_read()?;
256
257 if self.must_reemit_headers {
258 self.must_reemit_headers = false;
259 return Ok(Some(ZeroCopyByteRecord::new(
260 &self.raw_headers.1,
261 &self.raw_headers.0,
262 self.inner.quote,
263 )));
264 }
265
266 self.read_byte_record_impl()
267 }
268
269 pub fn into_bufreader(self) -> (Option<ByteRecord>, BufReader<R>) {
274 (
275 self.must_reemit_headers.then_some(self.byte_headers),
276 self.buffer.into_bufreader(),
277 )
278 }
279
280 #[inline(always)]
282 pub fn position(&self) -> u64 {
283 if self.must_reemit_headers {
284 0
285 } else {
286 self.buffer.position()
287 }
288 }
289}
290
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    impl<R: Read> ZeroCopyReader<R> {
        /// Test helper: a reader that treats the first row as ordinary data.
        fn from_reader_no_headers(reader: R) -> Self {
            let mut builder = ZeroCopyReaderBuilder::new();
            builder.has_headers(false);
            builder.from_reader(reader)
        }
    }

    /// Turns borrowed string rows into the owned byte rows the comparisons use.
    fn owned_rows(rows: &[&[&str]]) -> Vec<Vec<Vec<u8>>> {
        rows.iter()
            .map(|row| row.iter().map(|cell| cell.as_bytes().to_vec()).collect())
            .collect()
    }

    /// Reads the next record (which must exist) and copies it out as a `ByteRecord`.
    fn next_owned<R: Read>(reader: &mut ZeroCopyReader<R>) -> error::Result<ByteRecord> {
        Ok(reader.read_byte_record()?.unwrap().to_byte_record())
    }

    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        // A 32-byte buffer is shorter than some records, exercising the refill path.
        let mut reader = ZeroCopyReaderBuilder::with_capacity(32)
            .has_headers(false)
            .from_reader(Cursor::new(csv));

        let rows: &[&[&str]] = &[
            &["name", "surname", "age"],
            &[
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ],
            &["lucy", "rose", "\"67\""],
            &["jermaine", "jackson", "\"89\""],
            &["karine", "loucan", "\"52\""],
            &["rose", "\"glib\"", "12"],
            &["\"guillaume\"", "\"plique\"", "\"42\""],
        ];
        let expected = owned_rows(rows);

        let mut records = Vec::new();
        while let Some(record) = reader.read_byte_record()? {
            let cells = record.iter().map(|cell| cell.to_vec()).collect::<Vec<_>>();
            records.push(cells);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));

        let expected: Vec<Vec<Vec<u8>>> = ["name", "\"\"", "lucy", "\"\""]
            .iter()
            .map(|cell| vec![cell.as_bytes().to_vec()])
            .collect();

        let mut records = Vec::new();
        while let Some(record) = reader.read_byte_record()? {
            records.push(vec![record.as_slice().to_vec()]);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // With headers, in both call orders.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(next_owned(&mut reader)?, brec!["john", "dandy"]);

        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(next_owned(&mut reader)?, brec!["john", "dandy"]);
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Without headers the first row is both the headers and the first record.
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(next_owned(&mut reader)?, brec!["name", "surname"]);

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(next_owned(&mut reader)?, brec!["name", "surname"]);
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Empty input: empty headers, no records.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.read_byte_record()?.is_none());

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.read_byte_record()?.is_none());

        Ok(())
    }
}