1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error;
6use crate::utils::{trim_bom, trim_trailing_crlf};
7
/// Configuration for building a [`Splitter`].
///
/// The defaults are a `,` delimiter, a `"` quote byte, no explicit buffer
/// capacity, and a header row expected as the first record.
#[derive(Debug, Clone)]
pub struct SplitterBuilder {
    /// Byte separating fields within a record.
    delimiter: u8,
    /// Byte used to quote fields.
    quote: u8,
    /// Explicit buffer capacity, or `None` to let the buffer pick its own.
    buffer_capacity: Option<usize>,
    /// Whether the first record is a header row.
    has_headers: bool,
}

impl Default for SplitterBuilder {
    fn default() -> Self {
        Self {
            delimiter: b',',
            quote: b'"',
            buffer_capacity: None,
            has_headers: true,
        }
    }
}
26
27impl SplitterBuilder {
28 pub fn new() -> Self {
30 Self::default()
31 }
32
33 pub fn with_capacity(capacity: usize) -> Self {
35 let mut splitter = Self::default();
36 splitter.buffer_capacity(capacity);
37 splitter
38 }
39
40 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
46 self.delimiter = delimiter;
47 self
48 }
49
50 pub fn quote(&mut self, quote: u8) -> &mut Self {
56 self.quote = quote;
57 self
58 }
59
60 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
64 self.has_headers = yes;
65 self
66 }
67
68 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
70 self.buffer_capacity = Some(capacity);
71 self
72 }
73
74 pub fn from_reader<R: Read>(&self, reader: R) -> Splitter<R> {
77 Splitter {
78 buffer: ScratchBuffer::with_optional_capacity(self.buffer_capacity, reader),
79 inner: CoreReader::new(self.delimiter, self.quote),
80 headers: Vec::new(),
81 has_read: false,
82 has_headers: self.has_headers,
83 must_reemit_headers: !self.has_headers,
84 }
85 }
86}
87
88#[derive(Debug)]
96pub struct Splitter<R> {
97 buffer: ScratchBuffer<R>,
98 inner: CoreReader,
99 headers: Vec<u8>,
100 has_read: bool,
101 has_headers: bool,
102 must_reemit_headers: bool,
103}
104
105impl<R: Read> Splitter<R> {
106 pub fn from_reader(reader: R) -> Self {
109 SplitterBuilder::new().from_reader(reader)
110 }
111
112 pub fn has_headers(&self) -> bool {
115 self.has_headers
116 }
117
118 pub fn byte_headers(&mut self) -> error::Result<&[u8]> {
120 self.on_first_read()?;
121
122 Ok(&self.headers)
123 }
124
125 #[inline(always)]
126 fn on_first_read(&mut self) -> error::Result<()> {
127 if self.has_read {
128 return Ok(());
129 }
130
131 let input = self.buffer.fill_buf()?;
132 let bom_len = trim_bom(input);
133 self.buffer.consume(bom_len);
134
135 if let Some(record) = self.split_record_impl()? {
136 self.headers = record.to_vec();
137 } else {
138 self.must_reemit_headers = false;
139 }
140
141 self.has_read = true;
142
143 Ok(())
144 }
145
146 pub fn count_records(&mut self) -> error::Result<u64> {
149 use ReadResult::*;
150
151 self.on_first_read()?;
152 self.buffer.reset();
153
154 let mut count: u64 = 0;
155
156 if self.must_reemit_headers {
157 count += 1;
158 self.must_reemit_headers = false;
159 }
160
161 loop {
162 let input = self.buffer.fill_buf()?;
163
164 let (result, pos) = self.inner.split_record(input);
165
166 self.buffer.consume(pos);
167
168 match result {
169 End => break,
170 InputEmpty | Cr | Lf => continue,
171 Record => {
172 count += 1;
173 }
174 };
175 }
176
177 Ok(count)
178 }
179
180 fn split_record_impl(&mut self) -> error::Result<Option<&[u8]>> {
181 use ReadResult::*;
182
183 self.buffer.reset();
184
185 loop {
186 let input = self.buffer.fill_buf()?;
187
188 let (result, pos) = self.inner.split_record(input);
189
190 match result {
191 End => {
192 self.buffer.consume(pos);
193 return Ok(None);
194 }
195 Cr | Lf => {
196 self.buffer.consume(pos);
197 }
198 InputEmpty => {
199 self.buffer.save();
200 }
201 Record => {
202 return Ok(Some(trim_trailing_crlf(self.buffer.flush(pos))));
203 }
204 };
205 }
206 }
207
208 pub fn split_record(&mut self) -> error::Result<Option<&[u8]>> {
213 self.on_first_read()?;
214
215 if self.must_reemit_headers {
216 self.must_reemit_headers = false;
217 return Ok(Some(&self.headers));
218 }
219
220 self.split_record_impl()
221 }
222
223 pub fn split_record_with_position(&mut self) -> error::Result<Option<(u64, &[u8])>> {
228 self.on_first_read()?;
229
230 let pos = self.position();
231
232 if self.must_reemit_headers {
233 self.must_reemit_headers = false;
234 return Ok(Some((pos, &self.headers)));
235 }
236
237 match self.split_record_impl() {
238 Ok(Some(record)) => Ok(Some((pos, record))),
239 Ok(None) => Ok(None),
240 Err(err) => Err(err),
241 }
242 }
243
244 pub fn into_bufreader(self) -> (Option<Vec<u8>>, BufReader<R>) {
249 (
250 self.must_reemit_headers.then_some(self.headers),
251 self.buffer.into_bufreader(),
252 )
253 }
254
255 #[inline(always)]
257 pub fn position(&self) -> u64 {
258 if self.must_reemit_headers {
259 0
260 } else {
261 self.buffer.position()
262 }
263 }
264}
265
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Counts the records in `data` via `count_records`, with no header row.
    fn count_records(data: &str, capacity: usize) -> u64 {
        let mut splitter = SplitterBuilder::with_capacity(capacity)
            .has_headers(false)
            .from_reader(Cursor::new(data));
        splitter.count_records().unwrap()
    }

    /// Counts the records in `data` by calling `split_record` until exhaustion.
    fn split_records(data: &str, capacity: usize) -> u64 {
        let mut splitter = SplitterBuilder::with_capacity(capacity)
            .has_headers(false)
            .from_reader(Cursor::new(data));
        let mut count: u64 = 0;

        while splitter.split_record().unwrap().is_some() {
            count += 1;
        }

        count
    }

    #[test]
    fn test_count() {
        assert_eq!(count_records("", 1024), 0);

        // Blank lines and stray line terminators must not produce records.
        let tests = [
            "name\njohn\nlucy",
            "name\njohn\nlucy\n",
            "name\n\njohn\r\nlucy\n",
            "name\n\njohn\r\nlucy\n\n",
            "name\n\n\njohn\r\n\r\nlucy\n\n\n",
            "\nname\njohn\nlucy",
            "\n\nname\njohn\nlucy",
            "\r\n\r\nname\njohn\nlucy",
            "name\njohn\nlucy\r\n",
            "name\njohn\nlucy\r\n\r\n",
        ];

        // Tiny capacities force records to straddle buffer refills.
        for capacity in [32usize, 4, 3, 2, 1] {
            for test in tests.iter() {
                assert_eq!(
                    count_records(test, capacity),
                    3,
                    "capacity={} string={:?}",
                    capacity,
                    test
                );
            }
        }

        let data = "name,surname,age\njohn,landy,45\nlucy,rose,67";
        assert_eq!(count_records(data, 1024), 3);
        assert_eq!(split_records(data, 1024), 3);

        for capacity in [1024usize, 32usize, 4, 3, 2, 1] {
            let data = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\r\n";

            assert_eq!(count_records(data, capacity), 5, "capacity={}", capacity);
            assert_eq!(split_records(data, capacity), 5, "capacity={}", capacity);
        }

        let data = "name\tsurname\tage\njohn\tlandy\t45\nlucy\trose\t67";
        assert_eq!(count_records(data, 1024), 3);
        assert_eq!(split_records(data, 1024), 3);
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        let mut reader = Splitter::from_reader(Cursor::new(data));

        assert_eq!(reader.count_records()?, 3);

        Ok(())
    }

    #[test]
    fn test_headers_reemitted_without_header_row() -> error::Result<()> {
        let mut splitter = SplitterBuilder::new()
            .has_headers(false)
            .from_reader(Cursor::new("name\njohn\nlucy"));

        let headers = splitter.byte_headers()?.to_vec();

        // The first record is pending re-emission, so the position is pinned to 0.
        assert_eq!(splitter.position(), 0);
        assert_eq!(splitter.split_record()?.map(<[u8]>::to_vec), Some(headers));
        assert_eq!(splitter.count_records()?, 2);

        Ok(())
    }
}