1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ZeroCopyByteRecord};
7use crate::splitter::SplitterBuilder;
8use crate::utils::trim_bom;
9
/// Configuration used to create a [`ZeroCopyReader`].
///
/// Every option has a default (see the `Default` implementation) and can be
/// overridden through the chainable setters before calling `from_reader`.
///
/// The type is `Debug` so a configuration can be logged or shown in assertion
/// messages, and `Clone` so one configuration can be reused for several readers.
#[derive(Debug, Clone)]
pub struct ZeroCopyReaderBuilder {
    /// Byte separating two fields of a record.
    delimiter: u8,
    /// Byte used to quote fields.
    quote: u8,
    /// Capacity handed to the scratch buffer wrapping the underlying reader.
    buffer_capacity: usize,
    /// When `true`, records may have a different number of fields than the header row.
    flexible: bool,
    /// When `true`, the first row is treated as headers and not yielded as a record.
    has_headers: bool,
}
18
19impl Default for ZeroCopyReaderBuilder {
20 fn default() -> Self {
21 Self {
22 delimiter: b',',
23 quote: b'"',
24 buffer_capacity: 8192,
25 flexible: false,
26 has_headers: true,
27 }
28 }
29}
30
31impl ZeroCopyReaderBuilder {
32 pub fn new() -> Self {
33 Self::default()
34 }
35
36 pub fn with_capacity(capacity: usize) -> Self {
37 let mut reader = Self::default();
38 reader.buffer_capacity(capacity);
39 reader
40 }
41
42 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
43 self.delimiter = delimiter;
44 self
45 }
46
47 pub fn quote(&mut self, quote: u8) -> &mut Self {
48 self.quote = quote;
49 self
50 }
51
52 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
53 self.buffer_capacity = capacity;
54 self
55 }
56
57 pub fn flexible(&mut self, yes: bool) -> &mut Self {
58 self.flexible = yes;
59 self
60 }
61
62 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
63 self.has_headers = yes;
64 self
65 }
66
67 pub fn to_splitter_builder(&self) -> SplitterBuilder {
68 let mut splitter_builder = SplitterBuilder::new();
69
70 splitter_builder
71 .buffer_capacity(self.buffer_capacity)
72 .has_headers(self.has_headers)
73 .quote(self.quote)
74 .delimiter(self.delimiter);
75
76 splitter_builder
77 }
78
79 pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
80 ZeroCopyReader {
81 buffer: ScratchBuffer::with_capacity(self.buffer_capacity, reader),
82 inner: CoreReader::new(self.delimiter, self.quote),
83 byte_headers: ByteRecord::new(),
84 raw_headers: (Vec::new(), Vec::new()),
85 seps: Vec::new(),
86 flexible: self.flexible,
87 has_read: false,
88 must_reemit_headers: !self.has_headers,
89 has_headers: self.has_headers,
90 index: 0,
91 }
92 }
93}
94
95pub struct ZeroCopyReader<R> {
96 buffer: ScratchBuffer<R>,
97 inner: CoreReader,
98 byte_headers: ByteRecord,
99 raw_headers: (Vec<usize>, Vec<u8>),
100 seps: Vec<usize>,
101 flexible: bool,
102 has_read: bool,
103 must_reemit_headers: bool,
104 has_headers: bool,
105 index: u64,
106}
107
108impl<R: Read> ZeroCopyReader<R> {
109 pub fn from_reader(reader: R) -> Self {
110 ZeroCopyReaderBuilder::new().from_reader(reader)
111 }
112
113 #[inline]
114 fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
115 if self.flexible {
116 return Ok(());
117 }
118
119 let headers_len = self.raw_headers.0.len() + 1;
120
121 if self.has_read && written != headers_len {
122 return Err(Error::new(ErrorKind::UnequalLengths {
123 expected_len: headers_len,
124 len: written,
125 pos: Some((byte, self.index)),
126 }));
127 }
128
129 Ok(())
130 }
131
132 #[inline]
133 fn on_first_read(&mut self) -> error::Result<()> {
134 if self.has_read {
135 return Ok(());
136 }
137
138 let input = self.buffer.fill_buf()?;
140 let bom_len = trim_bom(input);
141 self.buffer.consume(bom_len);
142
143 let mut headers_seps = Vec::new();
145 let mut headers_slice = Vec::new();
146 let mut byte_headers = ByteRecord::new();
147
148 if let Some(headers) = self.read_byte_record_impl()? {
149 (headers_seps, headers_slice) = headers.to_parts();
150 byte_headers = headers.to_byte_record();
151 } else {
152 self.must_reemit_headers = false;
153 }
154
155 self.raw_headers = (headers_seps, headers_slice);
156 self.byte_headers = byte_headers;
157
158 self.has_read = true;
159
160 Ok(())
161 }
162
163 #[inline]
164 pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
165 self.on_first_read()?;
166
167 Ok(&self.byte_headers)
168 }
169
170 #[inline]
171 pub fn has_headers(&self) -> bool {
172 self.has_headers
173 }
174
175 fn read_byte_record_impl(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
176 use ReadResult::*;
177
178 self.buffer.reset();
179 self.seps.clear();
180
181 let byte = self.position();
182
183 loop {
184 let seps_offset = self.buffer.saved().len();
185 let input = self.buffer.fill_buf()?;
186
187 let (result, pos) =
188 self.inner
189 .split_record_and_find_separators(input, seps_offset, &mut self.seps);
190
191 match result {
192 End => {
193 self.buffer.consume(pos);
194 return Ok(None);
195 }
196 Cr | Lf => {
197 self.buffer.consume(pos);
198 }
199 InputEmpty => {
200 self.buffer.save();
201 }
202 Record => {
203 self.index += 1;
204 self.check_field_count(byte, self.seps.len() + 1)?;
205
206 let record = ZeroCopyByteRecord::new(
207 self.buffer.flush(pos),
208 &self.seps,
209 self.inner.quote,
210 );
211
212 return Ok(Some(record));
213 }
214 };
215 }
216 }
217
218 #[inline(always)]
219 pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
220 self.on_first_read()?;
221
222 if self.must_reemit_headers {
223 self.must_reemit_headers = false;
224 return Ok(Some(ZeroCopyByteRecord::new(
225 &self.raw_headers.1,
226 &self.raw_headers.0,
227 self.inner.quote,
228 )));
229 }
230
231 self.read_byte_record_impl()
232 }
233
234 pub fn into_bufreader(self) -> (Option<ByteRecord>, BufReader<R>) {
235 (
236 self.must_reemit_headers.then_some(self.byte_headers),
237 self.buffer.into_bufreader(),
238 )
239 }
240
241 #[inline(always)]
242 pub fn position(&self) -> u64 {
243 if self.must_reemit_headers {
244 0
245 } else {
246 self.buffer.position()
247 }
248 }
249}
250
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use crate::brec;

    use super::*;

    impl<R: Read> ZeroCopyReader<R> {
        /// Test-only shortcut: default configuration with `has_headers(false)`.
        fn from_reader_no_headers(reader: R) -> Self {
            let mut builder = ZeroCopyReaderBuilder::new();
            builder.has_headers(false);
            builder.from_reader(reader)
        }
    }

    /// Reads a small file through a 32-byte buffer and checks that cells are
    /// returned raw (quotes kept) and that the blank line yields no record.
    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let expected: Vec<Vec<Vec<u8>>> = [
            ["name", "surname", "age"],
            [
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ],
            ["lucy", "rose", "\"67\""],
            ["jermaine", "jackson", "\"89\""],
            ["karine", "loucan", "\"52\""],
            ["rose", "\"glib\"", "12"],
            ["\"guillaume\"", "\"plique\"", "\"42\""],
        ]
        .iter()
        .map(|row| {
            row.iter()
                .map(|cell| cell.as_bytes().to_vec())
                .collect::<Vec<_>>()
        })
        .collect();

        let mut reader = ZeroCopyReaderBuilder::with_capacity(32)
            .has_headers(false)
            .from_reader(Cursor::new(csv));

        let mut records = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            let cells = record.iter().map(|cell| cell.to_vec()).collect::<Vec<_>>();
            records.push(cells);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    /// Rows made of a single empty quoted cell must still be yielded as records.
    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let expected = ["name", "\"\"", "lucy", "\"\""]
            .iter()
            .map(|row| vec![row.as_bytes().to_vec()])
            .collect::<Vec<_>>();

        let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
        let mut records = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            records.push(vec![record.as_slice().to_vec()]);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    /// Headers are available before or after reading records, with or
    /// without a header row, and on empty input.
    #[test]
    fn test_byte_headers() -> error::Result<()> {
        let data = b"name,surname\njohn,dandy";

        // With headers: headers requested first, then the first data record.
        {
            let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));

            assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
            assert_eq!(
                reader.read_byte_record()?.unwrap().to_byte_record(),
                brec!["john", "dandy"]
            );
        }

        // With headers: record read first, headers requested afterwards.
        {
            let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));

            assert_eq!(
                reader.read_byte_record()?.unwrap().to_byte_record(),
                brec!["john", "dandy"]
            );
            assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        }

        // Without headers: the first row is both the headers and the first record.
        {
            let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));

            assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
            assert_eq!(
                reader.read_byte_record()?.unwrap().to_byte_record(),
                brec!["name", "surname"]
            );
        }

        // Without headers: record read first, headers requested afterwards.
        {
            let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));

            assert_eq!(
                reader.read_byte_record()?.unwrap().to_byte_record(),
                brec!["name", "surname"]
            );
            assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
        }

        // Empty input, with headers.
        {
            let mut reader = ZeroCopyReader::from_reader(Cursor::new(b""));

            assert_eq!(reader.byte_headers()?, &brec![]);
            assert!(reader.read_byte_record()?.is_none());
        }

        // Empty input, without headers.
        {
            let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(b""));

            assert_eq!(reader.byte_headers()?, &brec![]);
            assert!(reader.read_byte_record()?.is_none());
        }

        Ok(())
    }
}