1use std::io::Read;
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::reader::ReaderBuilder;
7use crate::records::{ByteRecord, ZeroCopyByteRecord};
8use crate::splitter::SplitterBuilder;
9use crate::utils::trim_bom;
10
/// Configuration used to build a [`ZeroCopyReader`].
///
/// The builder only stores plain values, so it is cheap to clone and can be
/// printed with `{:?}` for diagnostics.
#[derive(Clone, Debug)]
pub struct ZeroCopyReaderBuilder {
    /// Delimiter byte handed to the underlying `CoreReader`.
    delimiter: u8,
    /// Quote byte handed to the underlying `CoreReader`.
    quote: u8,
    /// Capacity requested for the reader's `ScratchBuffer`.
    buffer_capacity: usize,
    /// When `true`, records are not required to have the same number of
    /// fields as the first record.
    flexible: bool,
    /// Whether the first record is treated as a header row. When `false`, the
    /// first record is also yielded as a regular record.
    has_headers: bool,
}
20
21impl Default for ZeroCopyReaderBuilder {
22 fn default() -> Self {
23 Self {
24 delimiter: b',',
25 quote: b'"',
26 buffer_capacity: 8192,
27 flexible: false,
28 has_headers: true,
29 }
30 }
31}
32
33impl ZeroCopyReaderBuilder {
34 pub fn new() -> Self {
36 Self::default()
37 }
38
39 pub fn with_capacity(capacity: usize) -> Self {
41 let mut reader = Self::default();
42 reader.buffer_capacity(capacity);
43 reader
44 }
45
46 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
52 self.delimiter = delimiter;
53 self
54 }
55
56 pub fn quote(&mut self, quote: u8) -> &mut Self {
62 self.quote = quote;
63 self
64 }
65
66 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
68 self.buffer_capacity = capacity;
69 self
70 }
71
72 pub fn flexible(&mut self, yes: bool) -> &mut Self {
78 self.flexible = yes;
79 self
80 }
81
82 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
86 self.has_headers = yes;
87 self
88 }
89
90 pub fn to_splitter_builder(&self) -> SplitterBuilder {
92 let mut splitter_builder = SplitterBuilder::new();
93
94 splitter_builder
95 .buffer_capacity(self.buffer_capacity)
96 .has_headers(self.has_headers)
97 .quote(self.quote)
98 .delimiter(self.delimiter);
99
100 splitter_builder
101 }
102
103 pub fn to_reader_builder(&self) -> ReaderBuilder {
105 let mut reader_builder = ReaderBuilder::new();
106
107 reader_builder
108 .buffer_capacity(self.buffer_capacity)
109 .has_headers(self.has_headers)
110 .quote(self.quote)
111 .delimiter(self.delimiter);
112
113 reader_builder
114 }
115
116 pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
119 ZeroCopyReader {
120 buffer: ScratchBuffer::with_capacity(self.buffer_capacity, reader),
121 inner: CoreReader::new(self.delimiter, self.quote),
122 byte_headers: ByteRecord::new(),
123 raw_headers: (Vec::new(), Vec::new()),
124 seps: Vec::new(),
125 flexible: self.flexible,
126 has_read: false,
127 must_reemit_headers: !self.has_headers,
128 has_headers: self.has_headers,
129 index: 0,
130 }
131 }
132}
133
134pub struct ZeroCopyReader<R> {
142 buffer: ScratchBuffer<R>,
143 inner: CoreReader,
144 byte_headers: ByteRecord,
145 raw_headers: (Vec<usize>, Vec<u8>),
146 seps: Vec<usize>,
147 flexible: bool,
148 has_read: bool,
149 must_reemit_headers: bool,
150 has_headers: bool,
151 index: u64,
152}
153
154impl<R: Read> ZeroCopyReader<R> {
155 pub fn from_reader(reader: R) -> Self {
156 ZeroCopyReaderBuilder::new().from_reader(reader)
157 }
158
159 #[inline]
160 fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
161 if self.flexible {
162 return Ok(());
163 }
164
165 let headers_len = self.raw_headers.0.len() + 1;
166
167 if self.has_read && written != headers_len {
168 return Err(Error::new(ErrorKind::UnequalLengths {
169 expected_len: headers_len,
170 len: written,
171 pos: Some((byte, self.index)),
172 }));
173 }
174
175 Ok(())
176 }
177
178 #[inline]
179 fn on_first_read(&mut self) -> error::Result<()> {
180 if self.has_read {
181 return Ok(());
182 }
183
184 let input = self.buffer.fill_buf()?;
186 let bom_len = trim_bom(input);
187 self.buffer.consume(bom_len);
188
189 let mut headers_seps = Vec::new();
191 let mut headers_slice = Vec::new();
192 let mut byte_headers = ByteRecord::new();
193
194 if let Some(headers) = self.read_byte_record_impl()? {
195 (headers_seps, headers_slice) = headers.to_parts();
196 byte_headers = headers.to_byte_record();
197 } else {
198 self.must_reemit_headers = false;
199 }
200
201 self.raw_headers = (headers_seps, headers_slice);
202 self.byte_headers = byte_headers;
203
204 self.has_read = true;
205
206 Ok(())
207 }
208
209 #[inline]
211 pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
212 self.on_first_read()?;
213
214 Ok(&self.byte_headers)
215 }
216
217 #[inline]
220 pub fn has_headers(&self) -> bool {
221 self.has_headers
222 }
223
224 fn read_byte_record_impl(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
225 use ReadResult::*;
226
227 self.buffer.reset();
228 self.seps.clear();
229
230 let byte = self.position();
231
232 loop {
233 let seps_offset = self.buffer.saved().len();
234 let input = self.buffer.fill_buf()?;
235
236 let (result, pos) =
237 self.inner
238 .split_record_and_find_separators(input, seps_offset, &mut self.seps);
239
240 match result {
241 End => {
242 self.buffer.consume(pos);
243 return Ok(None);
244 }
245 Cr | Lf => {
246 self.buffer.consume(pos);
247 }
248 InputEmpty => {
249 self.buffer.save();
250 }
251 Record => {
252 self.index += 1;
253 self.check_field_count(byte, self.seps.len() + 1)?;
254
255 let bytes = self.buffer.flush(pos);
256
257 let record = ZeroCopyByteRecord::new(bytes, &self.seps, self.inner.quote);
258
259 return Ok(Some(record));
260 }
261 };
262 }
263 }
264
265 #[inline(always)]
266 pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
267 self.on_first_read()?;
268
269 if self.must_reemit_headers {
270 self.must_reemit_headers = false;
271 return Ok(Some(ZeroCopyByteRecord::new(
272 &self.raw_headers.1,
273 &self.raw_headers.0,
274 self.inner.quote,
275 )));
276 }
277
278 self.read_byte_record_impl()
279 }
280
281 #[inline(always)]
283 pub fn position(&self) -> u64 {
284 if self.must_reemit_headers {
285 0
286 } else {
287 self.buffer.position()
288 }
289 }
290}
291
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    impl<R: Read> ZeroCopyReader<R> {
        /// Test-only shortcut: default settings, but without a header row.
        fn from_reader_no_headers(reader: R) -> Self {
            let mut builder = ZeroCopyReaderBuilder::new();
            builder.has_headers(false);
            builder.from_reader(reader)
        }
    }

    /// Records are yielded with their quoting untouched, across buffer
    /// refills (capacity of 32) and skipping the empty line.
    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let expected_rows = [
            vec!["name", "surname", "age"],
            vec![
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ],
            vec!["lucy", "rose", "\"67\""],
            vec!["jermaine", "jackson", "\"89\""],
            vec!["karine", "loucan", "\"52\""],
            vec!["rose", "\"glib\"", "12"],
            vec!["\"guillaume\"", "\"plique\"", "\"42\""],
        ];

        let expected = expected_rows
            .iter()
            .map(|row| {
                row.iter()
                    .map(|cell| cell.as_bytes().to_vec())
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        let mut builder = ZeroCopyReaderBuilder::with_capacity(32);
        builder.has_headers(false);
        let mut reader = builder.from_reader(Cursor::new(csv));

        let mut records = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            let cells = record.iter().map(|cell| cell.to_vec()).collect::<Vec<_>>();
            records.push(cells);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    /// Rows made of a single empty quoted cell are yielded, including the
    /// last one which has no trailing newline.
    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let expected = ["name", "\"\"", "lucy", "\"\""]
            .iter()
            .map(|row| vec![row.as_bytes().to_vec()])
            .collect::<Vec<_>>();

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        let mut records = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            let whole_row = record.as_slice().to_vec();
            records.push(vec![whole_row]);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    /// Headers are available before or after reading records, with and
    /// without a header row, and on empty input.
    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // Header row: headers requested first, then the first data record.
        {
            let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
            assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
            assert_eq!(
                reader.read_byte_record()?.unwrap().to_byte_record(),
                brec!["john", "dandy"]
            );
        }

        // Header row: first data record read before asking for headers.
        {
            let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
            assert_eq!(
                reader.read_byte_record()?.unwrap().to_byte_record(),
                brec!["john", "dandy"]
            );
            assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        }

        // No header row: the first record is both the headers and a record.
        {
            let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
            assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
            assert_eq!(
                reader.read_byte_record()?.unwrap().to_byte_record(),
                brec!["name", "surname"]
            );
        }

        // No header row: same outcome when the record is read first.
        {
            let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
            assert_eq!(
                reader.read_byte_record()?.unwrap().to_byte_record(),
                brec!["name", "surname"]
            );
            assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        }

        // Empty input with a header row.
        {
            let mut reader = ZeroCopyReader::from_reader(Cursor::new(b""));
            assert_eq!(reader.byte_headers()?, &brec![]);
            assert!(reader.read_byte_record()?.is_none());
        }

        // Empty input without a header row.
        {
            let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(b""));
            assert_eq!(reader.byte_headers()?, &brec![]);
            assert!(reader.read_byte_record()?.is_none());
        }

        Ok(())
    }
}