1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error, ErrorKind};
6use crate::records::{ByteRecord, ZeroCopyByteRecord};
7use crate::splitter::SplitterBuilder;
8use crate::utils::trim_bom;
9
/// Builder for [`ZeroCopyReader`], configuring the CSV dialect and buffering.
#[derive(Clone, Debug)]
pub struct ZeroCopyReaderBuilder {
    /// Byte separating fields within a record.
    delimiter: u8,
    /// Byte used to quote fields.
    quote: u8,
    /// Capacity handed to the reader's internal scratch buffer.
    buffer_capacity: usize,
    /// When true, records are not required to have the same field count as the first record.
    flexible: bool,
    /// When true, the first record is treated as a header row rather than data.
    has_headers: bool,
}
18
19impl Default for ZeroCopyReaderBuilder {
20 fn default() -> Self {
21 Self {
22 delimiter: b',',
23 quote: b'"',
24 buffer_capacity: 8192,
25 flexible: false,
26 has_headers: true,
27 }
28 }
29}
30
31impl ZeroCopyReaderBuilder {
32 pub fn new() -> Self {
33 Self::default()
34 }
35
36 pub fn with_capacity(capacity: usize) -> Self {
37 let mut reader = Self::default();
38 reader.buffer_capacity(capacity);
39 reader
40 }
41
42 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
43 self.delimiter = delimiter;
44 self
45 }
46
47 pub fn quote(&mut self, quote: u8) -> &mut Self {
48 self.quote = quote;
49 self
50 }
51
52 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
53 self.buffer_capacity = capacity;
54 self
55 }
56
57 pub fn flexible(&mut self, yes: bool) -> &mut Self {
58 self.flexible = yes;
59 self
60 }
61
62 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
63 self.has_headers = yes;
64 self
65 }
66
67 pub fn to_splitter_builder(&self) -> SplitterBuilder {
68 let mut splitter_builder = SplitterBuilder::new();
69
70 splitter_builder
71 .buffer_capacity(self.buffer_capacity)
72 .has_headers(self.has_headers)
73 .quote(self.quote)
74 .delimiter(self.delimiter);
75
76 splitter_builder
77 }
78
79 pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
80 ZeroCopyReader {
81 buffer: ScratchBuffer::with_capacity(self.buffer_capacity, reader),
82 inner: CoreReader::new(self.delimiter, self.quote),
83 byte_headers: ByteRecord::new(),
84 raw_headers: (Vec::new(), Vec::new()),
85 seps: Vec::new(),
86 flexible: self.flexible,
87 has_read: false,
88 must_reemit_headers: !self.has_headers,
89 has_headers: self.has_headers,
90 index: 0,
91 }
92 }
93}
94
/// Streaming CSV reader whose records borrow from an internal buffer instead of
/// being copied out.
pub struct ZeroCopyReader<R> {
    /// Buffer wrapped around the underlying reader `R`.
    buffer: ScratchBuffer<R>,
    /// Record splitter built from the configured delimiter and quote bytes.
    inner: CoreReader,
    /// Owned copy of the first record; returned by `byte_headers()`.
    byte_headers: ByteRecord,
    /// First record as (separator positions, raw bytes). Used to re-emit it and
    /// to derive the expected field count.
    raw_headers: (Vec<usize>, Vec<u8>),
    /// Separator positions of the record currently being read; cleared and
    /// reused on every read.
    seps: Vec<usize>,
    /// When set, field-count validation is skipped.
    flexible: bool,
    /// Set once the first record has been read and stored (or input found empty).
    has_read: bool,
    /// When set, the next `read_byte_record` returns the stored first record
    /// instead of reading new input.
    must_reemit_headers: bool,
    /// Whether the first record is treated as a header row.
    has_headers: bool,
    /// Number of records produced by the splitter so far, the first one
    /// included; reported in `UnequalLengths` errors.
    index: u64,
}
107
108impl<R: Read> ZeroCopyReader<R> {
109 pub fn from_reader(reader: R) -> Self {
110 ZeroCopyReaderBuilder::new().from_reader(reader)
111 }
112
113 #[inline]
114 fn check_field_count(&mut self, byte: u64, written: usize) -> error::Result<()> {
115 if self.flexible {
116 return Ok(());
117 }
118
119 let headers_len = self.raw_headers.0.len() + 1;
120
121 if self.has_read && written != headers_len {
122 return Err(Error::new(ErrorKind::UnequalLengths {
123 expected_len: headers_len,
124 len: written,
125 pos: Some((byte, self.index)),
126 }));
127 }
128
129 Ok(())
130 }
131
132 #[inline]
133 fn on_first_read(&mut self) -> error::Result<()> {
134 if self.has_read {
135 return Ok(());
136 }
137
138 let input = self.buffer.fill_buf()?;
140 let bom_len = trim_bom(input);
141 self.buffer.consume(bom_len);
142
143 let mut headers_seps = Vec::new();
145 let mut headers_slice = Vec::new();
146 let mut byte_headers = ByteRecord::new();
147
148 if let Some(headers) = self.read_byte_record_impl()? {
149 (headers_seps, headers_slice) = headers.to_parts();
150 byte_headers = headers.to_byte_record();
151 } else {
152 self.must_reemit_headers = false;
153 }
154
155 self.raw_headers = (headers_seps, headers_slice);
156 self.byte_headers = byte_headers;
157
158 self.has_read = true;
159
160 Ok(())
161 }
162
163 #[inline]
164 pub fn byte_headers(&mut self) -> error::Result<&ByteRecord> {
165 self.on_first_read()?;
166
167 Ok(&self.byte_headers)
168 }
169
170 #[inline]
171 pub fn has_headers(&self) -> bool {
172 self.has_headers
173 }
174
175 fn read_byte_record_impl(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
176 use ReadResult::*;
177
178 self.buffer.reset();
179 self.seps.clear();
180
181 let byte = self.position();
182
183 loop {
184 let seps_offset = self.buffer.saved().len();
185 let input = self.buffer.fill_buf()?;
186
187 let (result, pos) =
188 self.inner
189 .split_record_and_find_separators(input, seps_offset, &mut self.seps);
190
191 match result {
192 End => {
193 self.buffer.consume(pos);
194 return Ok(None);
195 }
196 Cr | Lf => {
197 self.buffer.consume(pos);
198 }
199 InputEmpty => {
200 self.buffer.save();
201 }
202 Record => {
203 self.index += 1;
204 self.check_field_count(byte, self.seps.len() + 1)?;
205
206 let record = ZeroCopyByteRecord::new(
207 self.buffer.flush(pos),
208 &self.seps,
209 self.inner.quote,
210 );
211
212 return Ok(Some(record));
213 }
214 };
215 }
216 }
217
218 #[inline(always)]
219 pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
220 self.on_first_read()?;
221
222 if self.must_reemit_headers {
223 self.must_reemit_headers = false;
224 return Ok(Some(ZeroCopyByteRecord::new(
225 &self.raw_headers.1,
226 &self.raw_headers.0,
227 self.inner.quote,
228 )));
229 }
230
231 self.read_byte_record_impl()
232 }
233
234 pub fn into_bufreader(self) -> BufReader<R> {
235 self.buffer.into_bufreader()
236 }
237
238 #[inline(always)]
239 pub fn position(&self) -> u64 {
240 if self.must_reemit_headers {
241 0
242 } else {
243 self.buffer.position()
244 }
245 }
246}
247
248#[cfg(test)]
249mod tests {
250 use std::io::Cursor;
251
252 use crate::brec;
253
254 use super::*;
255
256 impl<R: Read> ZeroCopyReader<R> {
257 fn from_reader_no_headers(reader: R) -> Self {
258 ZeroCopyReaderBuilder::new()
259 .has_headers(false)
260 .from_reader(reader)
261 }
262 }
263
264 #[test]
265 fn test_read_zero_copy_byte_record() -> error::Result<()> {
266 let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";
267
268 let mut reader = ZeroCopyReaderBuilder::with_capacity(32)
269 .has_headers(false)
270 .from_reader(Cursor::new(csv));
271 let mut records = Vec::new();
272
273 let expected = vec![
274 vec!["name", "surname", "age"],
275 vec![
276 "\"john\"",
277 "\"landy, the \"\"everlasting\"\" bastard\"",
278 "45",
279 ],
280 vec!["lucy", "rose", "\"67\""],
281 vec!["jermaine", "jackson", "\"89\""],
282 vec!["karine", "loucan", "\"52\""],
283 vec!["rose", "\"glib\"", "12"],
284 vec!["\"guillaume\"", "\"plique\"", "\"42\""],
285 ]
286 .into_iter()
287 .map(|record| {
288 record
289 .into_iter()
290 .map(|cell| cell.as_bytes().to_vec())
291 .collect::<Vec<_>>()
292 })
293 .collect::<Vec<_>>();
294
295 while let Some(record) = reader.read_byte_record()? {
296 records.push(record.iter().map(|cell| cell.to_vec()).collect::<Vec<_>>());
297 }
298
299 assert_eq!(records, expected);
300
301 Ok(())
302 }
303
304 #[test]
305 fn test_empty_row() -> error::Result<()> {
306 let data = "name\n\"\"\nlucy\n\"\"";
307
308 let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
310
311 let expected = vec![
312 vec!["name".as_bytes().to_vec()],
313 vec!["\"\"".as_bytes().to_vec()],
314 vec!["lucy".as_bytes().to_vec()],
315 vec!["\"\"".as_bytes().to_vec()],
316 ];
317
318 let mut records = Vec::new();
320
321 while let Some(record) = reader.read_byte_record()? {
322 records.push(vec![record.as_slice().to_vec()]);
323 }
324
325 assert_eq!(records, expected);
326
327 Ok(())
328 }
329
330 #[test]
331 fn test_byte_headers() -> error::Result<()> {
332 let data = b"name,surname\njohn,dandy";
333
334 let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
336 assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
337 assert_eq!(
338 reader.read_byte_record()?.unwrap().to_byte_record(),
339 brec!["john", "dandy"]
340 );
341
342 let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
344 assert_eq!(
345 reader.read_byte_record()?.unwrap().to_byte_record(),
346 brec!["john", "dandy"]
347 );
348 assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
349
350 let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
352 assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
353 assert_eq!(
354 reader.read_byte_record()?.unwrap().to_byte_record(),
355 brec!["name", "surname"]
356 );
357
358 let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(data));
360 assert_eq!(
361 reader.read_byte_record()?.unwrap().to_byte_record(),
362 brec!["name", "surname"]
363 );
364 assert_eq!(reader.byte_headers()?, &brec!["name", "surname"]);
365
366 let mut reader = ZeroCopyReader::from_reader(Cursor::new(b""));
368 assert_eq!(reader.byte_headers()?, &brec![]);
369 assert!(reader.read_byte_record()?.is_none());
370
371 let mut reader = ZeroCopyReader::from_reader_no_headers(Cursor::new(b""));
373 assert_eq!(reader.byte_headers()?, &brec![]);
374 assert!(reader.read_byte_record()?.is_none());
375
376 Ok(())
377 }
378}