simd_csv/
zero_copy_reader.rs

1use std::io::Read;
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error::{self, Error};
6use crate::records::ZeroCopyByteRecord;
7
/// Configuration used to build a [`ZeroCopyReader`].
///
/// All fields are plain values, so the builder derives `Debug` and `Clone`;
/// a configured builder can be logged or duplicated before building readers.
#[derive(Debug, Clone)]
pub struct ZeroCopyReaderBuilder {
    /// Byte passed to the core reader as the field delimiter (`b','` by default).
    delimiter: u8,
    /// Byte passed to the core reader as the quote character (`b'"'` by default).
    quote: u8,
    /// Requested buffer capacity; `None` leaves the choice to the buffer implementation.
    buffer_capacity: Option<usize>,
    /// When `true`, the built reader skips its per-record field-count check.
    flexible: bool,
}
14
15impl Default for ZeroCopyReaderBuilder {
16    fn default() -> Self {
17        Self {
18            delimiter: b',',
19            quote: b'"',
20            buffer_capacity: None,
21            flexible: false,
22        }
23    }
24}
25
26impl ZeroCopyReaderBuilder {
27    pub fn new() -> Self {
28        Self::default()
29    }
30
31    pub fn with_capacity(capacity: usize) -> Self {
32        let mut reader = Self::default();
33        reader.buffer_capacity(capacity);
34        reader
35    }
36
37    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
38        self.delimiter = delimiter;
39        self
40    }
41
42    pub fn quote(&mut self, quote: u8) -> &mut Self {
43        self.quote = quote;
44        self
45    }
46
47    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
48        self.buffer_capacity = Some(capacity);
49        self
50    }
51
52    pub fn flexible(&mut self, yes: bool) -> &mut Self {
53        self.flexible = yes;
54        self
55    }
56
57    pub fn from_reader<R: Read>(&self, reader: R) -> ZeroCopyReader<R> {
58        ZeroCopyReader {
59            buffer: ScratchBuffer::with_optional_capacity(self.buffer_capacity, reader),
60            inner: CoreReader::new(self.delimiter, self.quote),
61            field_count: None,
62            seps: Vec::new(),
63            flexible: self.flexible,
64        }
65    }
66}
67
68pub struct ZeroCopyReader<R> {
69    buffer: ScratchBuffer<R>,
70    inner: CoreReader,
71    field_count: Option<usize>,
72    seps: Vec<usize>,
73    flexible: bool,
74}
75
76impl<R: Read> ZeroCopyReader<R> {
77    pub fn from_reader(reader: R) -> Self {
78        ZeroCopyReaderBuilder::new().from_reader(reader)
79    }
80
81    #[inline]
82    fn check_field_count(&mut self, written: usize) -> error::Result<()> {
83        if self.flexible {
84            return Ok(());
85        }
86
87        match self.field_count {
88            Some(expected) => {
89                if written != expected {
90                    return Err(Error::unequal_lengths(expected, written));
91                }
92            }
93            None => {
94                self.field_count = Some(written);
95            }
96        }
97
98        Ok(())
99    }
100
101    pub fn strip_bom(&mut self) -> error::Result<()> {
102        self.buffer.strip_bom()?;
103        Ok(())
104    }
105
106    pub fn read_byte_record(&mut self) -> error::Result<Option<ZeroCopyByteRecord<'_>>> {
107        use ReadResult::*;
108
109        self.buffer.reset();
110        self.seps.clear();
111
112        loop {
113            let seps_offset = self.buffer.saved().len();
114            let input = self.buffer.fill_buf()?;
115
116            let (result, pos) =
117                self.inner
118                    .split_record_and_find_separators(input, seps_offset, &mut self.seps);
119
120            match result {
121                End => {
122                    self.buffer.consume(pos);
123                    return Ok(None);
124                }
125                Cr | Lf => {
126                    self.buffer.consume(pos);
127                }
128                InputEmpty => {
129                    self.buffer.save();
130                }
131                Record => {
132                    self.check_field_count(self.seps.len() + 1)?;
133
134                    let record = ZeroCopyByteRecord::new(self.buffer.flush(pos), &self.seps);
135
136                    return Ok(Some(record));
137                }
138            };
139        }
140    }
141}
142
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Converts rows of string cells into the owned byte form compared by the tests.
    fn owned_rows(rows: &[&[&str]]) -> Vec<Vec<Vec<u8>>> {
        rows.iter()
            .map(|row| row.iter().map(|cell| cell.as_bytes().to_vec()).collect())
            .collect()
    }

    /// Reads a multi-record CSV through a 32-byte buffer and checks that each
    /// cell comes back with its raw bytes (quotes included) and that the
    /// blank line in the input produces no record.
    #[test]
    fn test_read_zero_copy_byte_record() -> error::Result<()> {
        let csv = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\nrose,\"glib\",12\n\"guillaume\",\"plique\",\"42\"\r\n";

        let expected = owned_rows(&[
            &["name", "surname", "age"],
            &[
                "\"john\"",
                "\"landy, the \"\"everlasting\"\" bastard\"",
                "45",
            ],
            &["lucy", "rose", "\"67\""],
            &["jermaine", "jackson", "\"89\""],
            &["karine", "loucan", "\"52\""],
            &["rose", "\"glib\"", "12"],
            &["\"guillaume\"", "\"plique\"", "\"42\""],
        ]);

        let mut reader = ZeroCopyReaderBuilder::with_capacity(32).from_reader(Cursor::new(csv));
        let mut records = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            let cells = record.iter().map(|bytes| bytes.to_vec()).collect::<Vec<_>>();
            records.push(cells);
        }

        assert_eq!(records, expected);

        Ok(())
    }

    /// Checks single-column input where some rows are just `""`, including a
    /// final row with no trailing newline.
    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let expected = owned_rows(&[&["name"], &["\"\""], &["lucy"], &["\"\""]]);

        // Zero-copy reader with the default configuration.
        let mut reader = ZeroCopyReader::from_reader(Cursor::new(data));
        let mut records = Vec::new();

        while let Some(record) = reader.read_byte_record()? {
            // Each record is one column wide, so the whole record slice is the cell.
            let whole = record.as_slice().to_vec();
            records.push(vec![whole]);
        }

        assert_eq!(records, expected);

        Ok(())
    }
}