1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ZeroCopyByteRecord};
7use crate::splitter::SplitterBuilder;
8use crate::utils::trim_bom;
9
/// Configuration for building a [`ZeroCopyReader`].
///
/// Settings are changed through the chainable `&mut self` setters and are
/// applied when `from_reader` is called.
#[derive(Clone, Debug)]
pub struct ZeroCopyReaderBuilder {
    /// Byte separating fields (`,` by default).
    delimiter: u8,
    /// Byte used to quote fields (`"` by default).
    quote: u8,
    /// Capacity handed to the reader's internal scratch buffer.
    buffer_capacity: usize,
    /// When `true`, records are not required to have as many fields as the
    /// headers.
    flexible: bool,
    /// Whether the first record is a header row rather than data.
    has_headers: bool,
}
19
20impl Default for ZeroCopyReaderBuilder {
21 fn default() -> Self {
22 Self {
23 delimiter: b',',
24 quote: b'"',
25 buffer_capacity: 8192,
26 flexible: false,
27 has_headers: true,
28 }
29 }
30}
31
32impl ZeroCopyReaderBuilder {
33 pub fn new() -> Self {
34 Self::default()
35 }
36
37 pub fn with_capacity(capacity: usize) -> Self {
38 let mut reader = Self::default();
39 reader.buffer_capacity(capacity);
40 reader
41 }
42
43 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
44 self.delimiter = delimiter;
45 self
46 }
47
48 pub fn quote(&mut self, quote: u8) -> &mut Self {
49 self.quote = quote;
50 self
51 }
52
53 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
54 self.buffer_capacity = capacity;
55 self
56 }
57
58 pub fn flexible(&mut self, yes: bool) -> &mut Self {
59 self.flexible = yes;
60 self
61 }
62
63 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
64 self.has_headers = yes;
65 self
66 }
67
68 pub fn to_splitter_builder(&self) -> SplitterBuilder {
69 let mut splitter_builder = SplitterBuilder::new();
70
71 splitter_builder
72 .buffer_capacity(self.buffer_capacity)
73 .has_headers(self.has_headers)
74 .quote(self.quote)
75 .delimiter(self.delimiter);
76
77 splitter_builder
78 }
79
80 pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
81 ZeroCopyReader {
82 buffer: ScratchBuffer::with_capacity(self.buffer_capacity, reader),
83 inner: CoreReader::new(self.delimiter, self.quote),
84 byte_headers: ByteRecord::new(),
85 raw_headers: (Vec::new(), Vec::new()),
86 seps: Vec::new(),
87 flexible: self.flexible,
88 has_read: false,
89 must_reemit_headers: !self.has_headers,
90 has_headers: self.has_headers,
91 index: 0,
92 }
93 }
94}
95
96pub struct ZeroCopyReader<R> {
104 buffer: ScratchBuffer<R>,
105 inner: CoreReader,
106 byte_headers: ByteRecord,
107 raw_headers: (Vec<usize>, Vec<u8>),
108 seps: Vec<usize>,
109 flexible: bool,
110 has_read: bool,
111 must_reemit_headers: bool,
112 has_headers: bool,
113 index: u64,
114}
115
116impl<R: Read> ZeroCopyReader<R> {
117 pub fn from_reader(reader: R) -> Self {
118 ZeroCopyReaderBuilder::new().from_reader(reader)
119 }
120
121 #[inline]
122 fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
123 if self.flexible {
124 return Ok(());
125 }
126
127 let headers_len = self.raw_headers.0.len() + 1;
128
129 if self.has_read && written != headers_len {
130 return Err(Error::new(ErrorKind::UnequalLengths {
131 expected_len: headers_len,
132 len: written,
133 pos: Some((byte, self.index)),
134 }));
135 }
136
137 Ok(())
138 }
139
140 #[inline]
141 fn on_first_read(&mut self) -> error::Result<()> {
142 if self.has_read {
143 return Ok(());
144 }
145
146 let input = self.buffer.fill_buf()?;
148 let bom_len = trim_bom(input);
149 self.buffer.consume(bom_len);
150
151 let mut headers_seps = Vec::new();
153 let mut headers_slice = Vec::new();
154 let mut byte_headers = ByteRecord::new();
155
156 if let Some(headers) = self.read_byte_record_impl()? {
157 (headers_seps, headers_slice) = headers.to_parts();
158 byte_headers = headers.to_byte_record();
159 } else {
160 self.must_reemit_headers = false;
161 }
162
163 self.raw_headers = (headers_seps, headers_slice);
164 self.byte_headers = byte_headers;
165
166 self.has_read = true;
167
168 Ok(())
169 }
170
171 #[inline]
172 pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
173 self.on_first_read()?;
174
175 Ok(&self.byte_headers)
176 }
177
178 #[inline]
181 pub fn has_headers(&self) -> bool {
182 self.has_headers
183 }
184
185 fn read_byte_record_impl(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
186 use ReadResult::*;
187
188 self.buffer.reset();
189 self.seps.clear();
190
191 let byte = self.position();
192
193 loop {
194 let seps_offset = self.buffer.saved().len();
195 let input = self.buffer.fill_buf()?;
196
197 let (result, pos) =
198 self.inner
199 .split_record_and_find_separators(input, seps_offset, &mut self.seps);
200
201 match result {
202 End => {
203 self.buffer.consume(pos);
204 return Ok(None);
205 }
206 Cr | Lf => {
207 self.buffer.consume(pos);
208 }
209 InputEmpty => {
210 self.buffer.save();
211 }
212 Record => {
213 self.index += 1;
214 self.check_field_count(byte, self.seps.len() + 1)?;
215
216 let record = ZeroCopyByteRecord::new(
217 self.buffer.flush(pos),
218 &self.seps,
219 self.inner.quote,
220 );
221
222 return Ok(Some(record));
223 }
224 };
225 }
226 }
227
228 #[inline(always)]
229 pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
230 self.on_first_read()?;
231
232 if self.must_reemit_headers {
233 self.must_reemit_headers = false;
234 return Ok(Some(ZeroCopyByteRecord::new(
235 &self.raw_headers.1,
236 &self.raw_headers.0,
237 self.inner.quote,
238 )));
239 }
240
241 self.read_byte_record_impl()
242 }
243
244 pub fn into_bufreader(self) -> (Option<ByteRecord>, BufReader<R>) {
245 (
246 self.must_reemit_headers.then_some(self.byte_headers),
247 self.buffer.into_bufreader(),
248 )
249 }
250
251 #[inline(always)]
252 pub fn position(&self) -> u64 {
253 if self.must_reemit_headers {
254 0
255 } else {
256 self.buffer.position()
257 }
258 }
259}
260
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    impl<R: Read> ZeroCopyReader<R> {
        /// Test helper: a default reader that treats the first record as data.
        fn from_reader_no_headers(reader: R) -> Self {
            ZeroCopyReaderBuilder::new()
                .has_headers(false)
                .from_reader(reader)
        }
    }

    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let mut reader = ZeroCopyReaderBuilder::with_capacity(32)
            .has_headers(false)
            .from_reader(Cursor::new(csv));
        let mut records = Vec::new();

        let expected = vec![
            vec!["name", "surname", "age"],
            vec![
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ],
            vec!["lucy", "rose", "\"67\""],
            vec!["jermaine", "jackson", "\"89\""],
            vec!["karine", "loucan", "\"52\""],
            vec!["rose", "\"glib\"", "12"],
            vec!["\"guillaume\"", "\"plique\"", "\"42\""],
        ]
        .into_iter()
        .map(|record| {
            record
                .into_iter()
                .map(|cell| cell.as_bytes().to_vec())
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();

        while let Some(record) = reader.read_byte_record()? {
            records.push(record.iter().map(|cell| cell.to_vec()).collect::<Vec<_>>());
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));

        let expected = vec![
            vec!["name".as_bytes().to_vec()],
            vec!["\"\"".as_bytes().to_vec()],
            vec!["lucy".as_bytes().to_vec()],
            vec!["\"\"".as_bytes().to_vec()],
        ];

        let mut records = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            records.push(vec![record.as_slice().to_vec()]);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["john", "dandy"]
        );

        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["john", "dandy"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["name", "surname"]
        );

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["name", "surname"]
        );
        assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);

        let mut reader = ZeroCopyReader::from_reader(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.read_byte_record()?.is_none());

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(b""));
        assert_eq!(reader.byte_headers()?, &brec![]);
        assert!(reader.read_byte_record()?.is_none());

        Ok(())
    }

    /// A record with more fields than the headers is rejected by default and
    /// accepted by a flexible reader.
    #[test]
    fn test_field_count_validation() -> error::Result<()> {
        let data = b"a,b\n1,2,3\n";

        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["a", "b"]);
        assert!(reader.read_byte_record().is_err());

        let mut reader = ZeroCopyReaderBuilder::new()
            .flexible(true)
            .from_reader(Cursor::new(data));
        assert_eq!(reader.byte_headers()?, &brec!["a", "b"]);
        assert_eq!(
            reader.read_byte_record()?.unwrap().to_byte_record(),
            brec!["1", "2", "3"]
        );
        assert!(reader.read_byte_record()?.is_none());

        Ok(())
    }
}