1use std::io::{BufRead, BufReader, Read};
2
3use crate::core::{self, ReadResult};
4use crate::error::{self, Error};
5use crate::records::{ByteRecord, ByteRecordBuilder, ZeroCopyByteRecord};
6
7pub struct BufferedReader<R> {
8 buffer: BufReader<R>,
9 scratch: Vec<u8>,
10 seps: Vec<usize>,
11 actual_buffer_position: Option<usize>,
12 inner: core::Reader,
13 field_count: Option<usize>,
14}
15
16impl<R: Read> BufferedReader<R> {
17 pub fn new(reader: R, delimiter: u8, quote: u8) -> Self {
18 Self {
19 buffer: BufReader::new(reader),
20 scratch: Vec::new(),
21 seps: Vec::new(),
22 actual_buffer_position: None,
23 inner: core::Reader::new(delimiter, quote),
24 field_count: None,
25 }
26 }
27
28 pub fn with_capacity(capacity: usize, reader: R, delimiter: u8, quote: u8) -> Self {
29 Self {
30 buffer: BufReader::with_capacity(capacity, reader),
31 scratch: Vec::new(),
32 seps: Vec::new(),
33 actual_buffer_position: None,
34 inner: core::Reader::new(delimiter, quote),
35 field_count: None,
36 }
37 }
38
39 #[inline]
40 fn check_field_count(&mut self, written: usize) -> error::Result<()> {
41 match self.field_count {
42 Some(expected) => {
43 if written != expected {
44 return Err(Error::unequal_lengths(expected, written));
45 }
46 }
47 None => {
48 self.field_count = Some(written);
49 }
50 }
51
52 Ok(())
53 }
54
55 pub fn strip_bom(&mut self) -> error::Result<()> {
56 let input = self.buffer.fill_buf()?;
57
58 if input.len() >= 3 && &input[..3] == b"\xef\xbb\xbf" {
59 self.buffer.consume(3);
60 }
61
62 Ok(())
63 }
64
65 pub fn first_byte_record(&mut self, consume: bool) -> error::Result<ByteRecord> {
66 use ReadResult::*;
67
68 let mut record = ByteRecord::new();
69 let mut record_builder = ByteRecordBuilder::wrap(&mut record);
70
71 let input = self.buffer.fill_buf()?;
72
73 let (result, pos) = self.inner.read_record(input, &mut record_builder);
74
75 match result {
76 End => Ok(ByteRecord::new()),
77
78 Cr | Lf | ReadResult::InputEmpty => Err(Error::invalid_headers()),
81 Record => {
82 if consume {
83 self.buffer.consume(pos);
84 }
85
86 Ok(record)
87 }
88 }
89 }
90
91 pub fn count_records(&mut self) -> error::Result<u64> {
92 use ReadResult::*;
93
94 let mut count: u64 = 0;
95
96 loop {
97 let input = self.buffer.fill_buf()?;
98
99 let (result, pos) = self.inner.split_record(input);
100
101 self.buffer.consume(pos);
102
103 match result {
104 End => break,
105 InputEmpty | Cr | Lf => continue,
106 Record => {
107 count += 1;
108 }
109 };
110 }
111
112 Ok(count)
113 }
114
115 pub fn split_record(&mut self) -> error::Result<Option<&[u8]>> {
116 use ReadResult::*;
117
118 self.scratch.clear();
119
120 if let Some(last_pos) = self.actual_buffer_position.take() {
121 self.buffer.consume(last_pos);
122 }
123
124 loop {
125 let input = self.buffer.fill_buf()?;
126
127 let (result, pos) = self.inner.split_record(input);
128
129 match result {
130 End => {
131 self.buffer.consume(pos);
132 return Ok(None);
133 }
134 Cr | Lf => {
135 self.buffer.consume(pos);
136 }
137 InputEmpty => {
138 self.scratch.extend_from_slice(input);
139 self.buffer.consume(pos);
140 }
141 Record => {
142 if self.scratch.is_empty() {
143 self.actual_buffer_position = Some(pos);
144 return Ok(Some(&self.buffer.buffer()[..pos]));
145 } else {
146 self.scratch.extend_from_slice(&input[..pos]);
147 self.buffer.consume(pos);
148
149 return Ok(Some(&self.scratch));
150 }
151 }
152 };
153 }
154 }
155
156 pub fn read_zero_copy_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
157 use ReadResult::*;
158
159 self.scratch.clear();
160 self.seps.clear();
161
162 if let Some(last_pos) = self.actual_buffer_position.take() {
163 self.buffer.consume(last_pos);
164 }
165
166 loop {
167 let input = self.buffer.fill_buf()?;
168
169 let (result, pos) = self.inner.split_record_and_find_separators(
170 input,
171 self.scratch.len(),
172 &mut self.seps,
173 );
174
175 match result {
176 End => {
177 self.buffer.consume(pos);
178 return Ok(None);
179 }
180 Cr | Lf => {
181 self.buffer.consume(pos);
182 }
183 InputEmpty => {
184 self.scratch.extend_from_slice(input);
185 self.buffer.consume(pos);
186 }
187 Record => {
188 if self.scratch.is_empty() {
189 self.check_field_count(self.seps.len() + 1)?;
190 self.actual_buffer_position = Some(pos);
191 return Ok(Some(ZeroCopyByteRecord::new(
192 &self.buffer.buffer()[..pos],
193 &self.seps,
194 )));
195 } else {
196 self.scratch.extend_from_slice(&input[..pos]);
197 self.buffer.consume(pos);
198 self.check_field_count(self.seps.len() + 1)?;
199 return Ok(Some(ZeroCopyByteRecord::new(&self.scratch, &self.seps)));
200 }
201 }
202 };
203 }
204 }
205
206 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
207 use ReadResult::*;
208
209 record.clear();
210
211 let mut record_builder = ByteRecordBuilder::wrap(record);
212
213 if let Some(last_pos) = self.actual_buffer_position.take() {
214 self.buffer.consume(last_pos);
215 }
216
217 loop {
218 let input = self.buffer.fill_buf()?;
219
220 let (result, pos) = self.inner.read_record(input, &mut record_builder);
221
222 self.buffer.consume(pos);
223
224 match result {
225 End => {
226 return Ok(false);
227 }
228 Cr | Lf | InputEmpty => {
229 continue;
230 }
231 Record => {
232 self.check_field_count(record.len())?;
233 return Ok(true);
234 }
235 };
236 }
237 }
238
239 pub fn byte_records(&mut self) -> ByteRecordsIter<'_, R> {
240 ByteRecordsIter {
241 reader: self,
242 record: ByteRecord::new(),
243 }
244 }
245
246 pub fn into_byte_records(self) -> ByteRecordsIntoIter<R> {
247 ByteRecordsIntoIter {
248 reader: self,
249 record: ByteRecord::new(),
250 }
251 }
252}
253
254pub struct ByteRecordsIter<'r, R> {
255 reader: &'r mut BufferedReader<R>,
256 record: ByteRecord,
257}
258
259impl<'r, R: Read> Iterator for ByteRecordsIter<'r, R> {
260 type Item = error::Result<ByteRecord>;
261
262 fn next(&mut self) -> Option<Self::Item> {
263 match self.reader.read_byte_record(&mut self.record) {
266 Err(err) => Some(Err(err)),
267 Ok(true) => Some(Ok(self.record.clone())),
268 Ok(false) => None,
269 }
270 }
271}
272
273pub struct ByteRecordsIntoIter<R> {
274 reader: BufferedReader<R>,
275 record: ByteRecord,
276}
277
278impl<R: Read> Iterator for ByteRecordsIntoIter<R> {
279 type Item = error::Result<ByteRecord>;
280
281 fn next(&mut self) -> Option<Self::Item> {
282 match self.reader.read_byte_record(&mut self.record) {
285 Err(err) => Some(Err(err)),
286 Ok(true) => Some(Ok(self.record.clone())),
287 Ok(false) => None,
288 }
289 }
290}
291
292pub struct TotalReader<'b> {
295 inner: core::Reader,
296 bytes: &'b [u8],
297 pos: usize,
298}
299
300impl<'b> TotalReader<'b> {
301 pub fn new(delimiter: u8, quote: u8, bytes: &'b [u8]) -> Self {
302 Self {
303 inner: core::Reader::new(delimiter, quote),
304 bytes,
305 pos: 0,
306 }
307 }
308
309 pub fn count_records(&mut self) -> u64 {
310 use ReadResult::*;
311
312 let mut count: u64 = 0;
313
314 loop {
315 let (result, pos) = self.inner.split_record(&self.bytes[self.pos..]);
316
317 self.pos += pos;
318
319 match result {
320 End => break,
321 InputEmpty | Cr | Lf => continue,
322 Record => {
323 count += 1;
324 }
325 };
326 }
327
328 count
329 }
330
331 pub fn read_byte_record(&mut self, record: &mut ByteRecord) -> error::Result<bool> {
332 use ReadResult::*;
333
334 record.clear();
335
336 let mut record_builder = ByteRecordBuilder::wrap(record);
337
338 loop {
339 let (result, pos) = self
340 .inner
341 .read_record(&self.bytes[self.pos..], &mut record_builder);
342
343 self.pos += pos;
344
345 match result {
346 End => {
347 return Ok(false);
348 }
349 Cr | Lf | InputEmpty => {
350 continue;
351 }
352 Record => {
353 return Ok(true);
354 }
355 };
356 }
357 }
358}
359
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use crate::brec;

    use super::*;

    // Counts the records of `data` with a `BufferedReader` whose internal
    // buffer holds `capacity` bytes; panics on error.
    fn count_records(data: &str, capacity: usize) -> u64 {
        let mut splitter = BufferedReader::with_capacity(capacity, Cursor::new(data), b',', b'"');
        splitter.count_records().unwrap()
    }

    // Record counting must ignore leading, trailing and repeated blank lines
    // (LF or CRLF) and handle quoted fields, whatever the buffer capacity.
    #[test]
    fn test_count() {
        assert_eq!(count_records("", 1024), 0);

        let tests = vec![
            "name\njohn\nlucy",
            "name\njohn\nlucy\n",
            "name\n\njohn\r\nlucy\n",
            "name\n\njohn\r\nlucy\n\n",
            "name\n\n\njohn\r\n\r\nlucy\n\n\n",
            "\nname\njohn\nlucy",
            "\n\nname\njohn\nlucy",
            "\r\n\r\nname\njohn\nlucy",
            "name\njohn\nlucy\r\n",
            "name\njohn\nlucy\r\n\r\n",
        ];

        // Tiny capacities force records (and CRLF pairs) to straddle buffer refills.
        for capacity in [32usize, 4, 3, 2, 1] {
            for test in tests.iter() {
                assert_eq!(
                    count_records(test, capacity),
                    3,
                    "capacity={} string={:?}",
                    capacity,
                    test
                );
            }
        }

        let data = "name,surname,age\njohn,landy,45\nlucy,rose,67";
        assert_eq!(count_records(data, 1024), 3);

        for capacity in [1024usize, 32usize, 4, 3, 2, 1] {
            // Quoted fields contain the delimiter and doubled quotes; a blank
            // line sits between two records and is not counted.
            let data = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\r\n";

            assert_eq!(count_records(data, capacity), 5, "capacity={}", capacity);
        }

        // The record count does not depend on the delimiter: tab-separated
        // data still yields 3 records with the reader configured for ','.
        let data = "name\tsurname\tage\njohn\tlandy\t45\nlucy\trose\t67";
        assert_eq!(count_records(data, 1024), 3);
    }

    // Zero-copy records expose raw field bytes: surrounding quotes are kept
    // and doubled quotes are not unescaped. A 32-byte buffer makes some
    // records span refills, exercising the scratch-buffer path.
    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let mut reader = BufferedReader::with_capacity(32, Cursor::new(csv), b',', b'"');
        let mut records = Vec::new();

        let expected = vec![
            vec!["name", "surname", "age"],
            vec![
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ],
            vec!["lucy", "rose", "\"67\""],
            vec!["jermaine", "jackson", "\"89\""],
            vec!["karine", "loucan", "\"52\""],
            vec!["rose", "\"glib\"", "12"],
            vec!["\"guillaume\"", "\"plique\"", "\"42\""],
        ]
        .into_iter()
        .map(|record| {
            record
                .into_iter()
                .map(|cell| cell.as_bytes().to_vec())
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();

        while let Some(record) = reader.read_zero_copy_byte_record()? {
            records.push(record.iter().map(|cell| cell.to_vec()).collect::<Vec<_>>());
        }

        assert_eq!(records, expected);

        Ok(())
    }

    // Fully parsed records strip the enclosing quotes and unescape doubled
    // quotes, at every buffer capacity down to a single byte.
    #[test]
    fn test_read_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\n\"\"\"ok\"\"\",whatever,dude\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let expected = vec![
            brec!["name", "surname", "age"],
            brec!["john", "landy, the \"everlasting\" bastard", "45"],
            brec!["\"ok\"", "whatever", "dude"],
            brec!["lucy", "rose", "67"],
            brec!["jermaine", "jackson", "89"],
            brec!["karine", "loucan", "52"],
            brec!["rose", "glib", "12"],
            brec!["guillaume", "plique", "42"],
        ];

        for capacity in [32usize, 4, 3, 2, 1] {
            let mut reader = BufferedReader::with_capacity(capacity, Cursor::new(csv), b',', b'"');

            assert_eq!(
                reader.byte_records().collect::<Result<Vec<_>, _>>()?,
                expected
            );
        }

        Ok(())
    }

    // `strip_bom` must be a no-op without a BOM and must remove a leading
    // UTF-8 BOM when one is present.
    #[test]
    fn test_strip_bom() -> error::Result<()> {
        let mut reader = BufferedReader::new(Cursor::new("name,surname,age"), b',', b'"');
        reader.strip_bom()?;

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        let mut reader =
            BufferedReader::new(Cursor::new(b"\xef\xbb\xbfname,surname,age"), b',', b'"');
        reader.strip_bom()?;

        assert_eq!(
            reader.byte_records().next().unwrap()?,
            brec!["name", "surname", "age"]
        );

        Ok(())
    }

    // A line holding only a quoted empty field (`""`) is a record with one
    // empty field, not a blank line to skip, for every reading method.
    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let mut reader = BufferedReader::new(Cursor::new(data), b',', b'"');

        assert_eq!(reader.count_records()?, 4);

        let mut reader = BufferedReader::new(Cursor::new(data), b',', b'"');

        let expected = vec![
            vec!["name".as_bytes().to_vec()],
            vec!["\"\"".as_bytes().to_vec()],
            vec!["lucy".as_bytes().to_vec()],
            vec!["\"\"".as_bytes().to_vec()],
        ];

        let mut records = Vec::new();

        while let Some(record) = reader.read_zero_copy_byte_record()? {
            records.push(vec![record.as_slice().to_vec()]);
        }

        assert_eq!(records, expected);

        let reader = BufferedReader::new(Cursor::new(data), b',', b'"');

        let expected = vec![brec!["name"], brec![""], brec!["lucy"], brec![""]];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // CRLF terminators must not leak into the last field, and non-ASCII
    // (multi-byte UTF-8) field content must pass through unchanged.
    #[test]
    fn test_crlf() -> error::Result<()> {
        let reader = BufferedReader::new(
            Cursor::new("name,surname\r\nlucy,\"john\"\r\nevan,zhong\r\nbéatrice,glougou\r\n"),
            b',',
            b'"',
        );

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "john"],
            brec!["evan", "zhong"],
            brec!["béatrice", "glougou"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }

    // Input where every field is quoted, with no trailing newline on the
    // last record.
    #[test]
    fn test_quote_always() -> error::Result<()> {
        let reader = BufferedReader::new(
            Cursor::new("\"name\",\"surname\"\n\"lucy\",\"rose\"\n\"john\",\"mayhew\""),
            b',',
            b'"',
        );

        let expected = vec![
            brec!["name", "surname"],
            brec!["lucy", "rose"],
            brec!["john", "mayhew"],
        ];

        let records = reader.into_byte_records().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(records, expected);

        Ok(())
    }
}
588}