1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::reader::{ReaderBuilder};
7use crate::records::{ByteRecord, ZeroCopyByteRecord};
8use crate::splitter::SplitterBuilder;
9use crate::utils::trim_bom;
10
/// Builder used to configure and create a [`ZeroCopyReader`].
///
/// It controls the delimiter and quote bytes, the capacity of the reader's
/// internal buffer, whether the first record is treated as headers and
/// whether records are allowed to have a varying number of fields.
#[derive(Clone, Debug)]
pub struct ZeroCopyReaderBuilder {
    /// Byte separating fields within a record.
    delimiter: u8,
    /// Byte used as the quote character.
    quote: u8,
    /// Capacity handed to the reader's internal `ScratchBuffer`.
    buffer_capacity: usize,
    /// When `true`, records are not required to have as many fields as the
    /// first record of the stream.
    flexible: bool,
    /// When `true`, the first record is treated as headers rather than data.
    has_headers: bool,
}
20
21impl Default for ZeroCopyReaderBuilder {
22 fn default() -> Self {
23 Self {
24 delimiter: b',',
25 quote: b'"',
26 buffer_capacity: 8192,
27 flexible: false,
28 has_headers: true,
29 }
30 }
31}
32
33impl ZeroCopyReaderBuilder {
34 pub fn new() -> Self {
36 Self::default()
37 }
38
39 pub fn with_capacity(capacity: usize) -> Self {
41 let mut reader = Self::default();
42 reader.buffer_capacity(capacity);
43 reader
44 }
45
46 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
52 self.delimiter = delimiter;
53 self
54 }
55
56 pub fn quote(&mut self, quote: u8) -> &mut Self {
62 self.quote = quote;
63 self
64 }
65
66 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
68 self.buffer_capacity = capacity;
69 self
70 }
71
72 pub fn flexible(&mut self, yes: bool) -> &mut Self {
78 self.flexible = yes;
79 self
80 }
81
82 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
86 self.has_headers = yes;
87 self
88 }
89
90 pub fn to_splitter_builder(&self) -> SplitterBuilder {
92 let mut splitter_builder = SplitterBuilder::new();
93
94 splitter_builder
95 .buffer_capacity(self.buffer_capacity)
96 .has_headers(self.has_headers)
97 .quote(self.quote)
98 .delimiter(self.delimiter);
99
100 splitter_builder
101 }
102
103 pub fn to_reader_builder(&self) -> ReaderBuilder {
105 let mut reader_builder = ReaderBuilder::new();
106
107 reader_builder
108 .buffer_capacity(self.buffer_capacity)
109 .has_headers(self.has_headers)
110 .quote(self.quote)
111 .delimiter(self.delimiter);
112
113 reader_builder
114 }
115
116 pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
119 ZeroCopyReader {
120 buffer: ScratchBuffer::with_capacity(self.buffer_capacity, reader),
121 inner: CoreReader::new(self.delimiter, self.quote),
122 byte_headers: ByteRecord::new(),
123 raw_headers: (Vec::new(), Vec::new()),
124 seps: Vec::new(),
125 flexible: self.flexible,
126 has_read: false,
127 must_reemit_headers: !self.has_headers,
128 has_headers: self.has_headers,
129 index: 0,
130 }
131 }
132}
133
134pub struct ZeroCopyReader<R> {
142 buffer: ScratchBuffer<R>,
143 inner: CoreReader,
144 byte_headers: ByteRecord,
145 raw_headers: (Vec<usize>, Vec<u8>),
146 seps: Vec<usize>,
147 flexible: bool,
148 has_read: bool,
149 must_reemit_headers: bool,
150 has_headers: bool,
151 index: u64,
152}
153
154impl<R: Read> ZeroCopyReader<R> {
155 pub fn from_reader(reader: R) -> Self {
156 ZeroCopyReaderBuilder::new().from_reader(reader)
157 }
158
159 #[inline]
160 fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
161 if self.flexible {
162 return Ok(());
163 }
164
165 let headers_len = self.raw_headers.0.len() + 1;
166
167 if self.has_read && written != headers_len {
168 return Err(Error::new(ErrorKind::UnequalLengths {
169 expected_len: headers_len,
170 len: written,
171 pos: Some((byte, self.index)),
172 }));
173 }
174
175 Ok(())
176 }
177
178 #[inline]
179 fn on_first_read(&mut self) -> error::Result<()> {
180 if self.has_read {
181 return Ok(());
182 }
183
184 let input = self.buffer.fill_buf()?;
186 let bom_len = trim_bom(input);
187 self.buffer.consume(bom_len);
188
189 let mut headers_seps = Vec::new();
191 let mut headers_slice = Vec::new();
192 let mut byte_headers = ByteRecord::new();
193
194 if let Some(headers) = self.read_byte_record_impl()? {
195 (headers_seps, headers_slice) = headers.to_parts();
196 byte_headers = headers.to_byte_record();
197 } else {
198 self.must_reemit_headers = false;
199 }
200
201 self.raw_headers = (headers_seps, headers_slice);
202 self.byte_headers = byte_headers;
203
204 self.has_read = true;
205
206 Ok(())
207 }
208
209 #[inline]
211 pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
212 self.on_first_read()?;
213
214 Ok(&self.byte_headers)
215 }
216
217 #[inline]
220 pub fn has_headers(&self) -> bool {
221 self.has_headers
222 }
223
224 fn read_byte_record_impl(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
225 use ReadResult::*;
226
227 self.buffer.reset();
228 self.seps.clear();
229
230 let byte = self.position();
231
232 loop {
233 let seps_offset = self.buffer.saved().len();
234 let input = self.buffer.fill_buf()?;
235
236 let (result, pos) =
237 self.inner
238 .split_record_and_find_separators(input, seps_offset, &mut self.seps);
239
240 match result {
241 End => {
242 self.buffer.consume(pos);
243 return Ok(None);
244 }
245 Cr | Lf => {
246 self.buffer.consume(pos);
247 }
248 InputEmpty => {
249 self.buffer.save();
250 }
251 Record => {
252 self.index += 1;
253 self.check_field_count(byte, self.seps.len() + 1)?;
254
255 let record = ZeroCopyByteRecord::new(
256 self.buffer.flush(pos),
257 &self.seps,
258 self.inner.quote,
259 );
260
261 return Ok(Some(record));
262 }
263 };
264 }
265 }
266
267 #[inline(always)]
268 pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
269 self.on_first_read()?;
270
271 if self.must_reemit_headers {
272 self.must_reemit_headers = false;
273 return Ok(Some(ZeroCopyByteRecord::new(
274 &self.raw_headers.1,
275 &self.raw_headers.0,
276 self.inner.quote,
277 )));
278 }
279
280 self.read_byte_record_impl()
281 }
282
283 pub fn into_bufreader(self) -> (Option<ByteRecord>, BufReader<R>) {
288 (
289 self.must_reemit_headers.then_some(self.byte_headers),
290 self.buffer.into_bufreader(),
291 )
292 }
293
294 #[inline(always)]
296 pub fn position(&self) -> u64 {
297 if self.must_reemit_headers {
298 0
299 } else {
300 self.buffer.position()
301 }
302 }
303}
304
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    impl<R: Read> ZeroCopyReader<R> {
        /// Test-only shorthand for a reader treating the first row as data.
        fn from_reader_no_headers(reader: R) -> Self {
            let mut builder = ZeroCopyReaderBuilder::new();
            builder.has_headers(false);
            builder.from_reader(reader)
        }
    }

    /// Converts rows of string cells into rows of owned byte cells.
    fn to_byte_rows(rows: &[&[&str]]) -> Vec<Vec<Vec<u8>>> {
        rows.iter()
            .map(|row| row.iter().map(|cell| cell.as_bytes().to_vec()).collect())
            .collect()
    }

    /// Reads the next record and returns an owned copy of it.
    fn next_owned<R: Read>(reader: &mut ZeroCopyReader<R>) -> error::Result<ByteRecord> {
        Ok(reader.read_byte_record()?.unwrap().to_byte_record())
    }

    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        // A tiny buffer forces records to straddle refills.
        let mut builder = ZeroCopyReaderBuilder::with_capacity(32);
        builder.has_headers(false);
        let mut reader = builder.from_reader(Cursor::new(csv));

        let expected = to_byte_rows(&[
            &["name", "surname", "age"],
            &[
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ],
            &["lucy", "rose", "\"67\""],
            &["jermaine", "jackson", "\"89\""],
            &["karine", "loucan", "\"52\""],
            &["rose", "\"glib\"", "12"],
            &["\"guillaume\"", "\"plique\"", "\"42\""],
        ]);

        let mut records = Vec::new();
        while let Some(record) = reader.read_byte_record()? {
            let cells: Vec<Vec<u8>> = record.iter().map(|cell| cell.to_vec()).collect();
            records.push(cells);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));

        let expected = to_byte_rows(&[&["name"], &["\"\""], &["lucy"], &["\"\""]]);

        let mut records = Vec::new();
        while let Some(record) = reader.read_byte_record()? {
            records.push(vec![record.as_slice().to_vec()]);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // With headers: headers and the first data row are independent,
        // whichever is requested first.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(next_owned(&mut reader)?, brec!["john", "dandy"]);

        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(next_owned(&mut reader)?, brec!["john", "dandy"]);
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Without headers: the first row is both the headers and the first
        // data record.
        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(next_owned(&mut reader)?, brec!["name", "surname"]);

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(next_owned(&mut reader)?, brec!["name", "surname"]);
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        // Empty input yields empty headers and no records in both modes.
        for has_headers in [true, false] {
            let mut reader = ZeroCopyReaderBuilder::new()
                .has_headers(has_headers)
                .from_reader(Cursor::new(b""));
            assert_eq!(reader.byte_headers()?, &brec![]);
            assert!(reader.read_byte_record()?.is_none());
        }

        Ok(())
    }
}