1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error;
6use crate::utils::{trim_bom, trim_trailing_crlf};
7
/// Configuration used to create a [`Splitter`].
///
/// Settings are changed through the chaining setter methods and are only
/// read when `from_reader` is called, so one builder can create several
/// splitters.
#[derive(Debug, Clone)]
pub struct SplitterBuilder {
    /// Delimiter byte handed to the core reader (defaults to `b','`).
    delimiter: u8,
    /// Quote byte handed to the core reader (defaults to `b'"'`).
    quote: u8,
    /// Requested scratch-buffer capacity; `None` leaves the choice to the
    /// buffer implementation.
    buffer_capacity: Option<usize>,
    /// Whether the first record of the input is a header row. When `false`,
    /// the first record is handed back to the caller as a regular record.
    has_headers: bool,
}
15
16impl Default for SplitterBuilder {
17 fn default() -> Self {
18 Self {
19 delimiter: b',',
20 quote: b'"',
21 buffer_capacity: None,
22 has_headers: true,
23 }
24 }
25}
26
27impl SplitterBuilder {
28 pub fn new() -> Self {
29 Self::default()
30 }
31
32 pub fn with_capacity(capacity: usize) -> Self {
33 let mut splitter = Self::default();
34 splitter.buffer_capacity(capacity);
35 splitter
36 }
37
38 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
39 self.delimiter = delimiter;
40 self
41 }
42
43 pub fn quote(&mut self, quote: u8) -> &mut Self {
44 self.quote = quote;
45 self
46 }
47
48 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
49 self.has_headers = yes;
50 self
51 }
52
53 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
54 self.buffer_capacity = Some(capacity);
55 self
56 }
57
58 pub fn from_reader<R: Read>(&self, reader: R) -> Splitter<R> {
59 Splitter {
60 buffer: ScratchBuffer::with_optional_capacity(self.buffer_capacity, reader),
61 inner: CoreReader::new(self.delimiter, self.quote),
62 headers: Vec::new(),
63 has_read: false,
64 has_headers: self.has_headers,
65 must_reemit_headers: !self.has_headers,
66 }
67 }
68}
69
/// Splits the bytes of a reader into raw records, one record at a time.
///
/// Instances are created with `Splitter::from_reader` or through a
/// `SplitterBuilder`.
#[derive(Debug)]
pub struct Splitter<R> {
    /// Scratch buffer wrapping the underlying reader.
    buffer: ScratchBuffer<R>,
    /// Core reader built from the configured delimiter and quote bytes; it
    /// locates record boundaries in the buffered input.
    inner: CoreReader,
    /// Bytes of the first record of the input, captured on the first read.
    /// Stays empty when the input contains no record.
    headers: Vec<u8>,
    /// Set once the first-read step (leading-bytes trim + first record
    /// capture) has run.
    has_read: bool,
    /// Whether the input was declared to start with a header row.
    has_headers: bool,
    /// When `true`, the captured first record still has to be handed out as
    /// a regular record. Starts as `!has_headers` and is cleared once the
    /// record has been re-emitted (or when the input holds no record).
    must_reemit_headers: bool,
}
86
87impl<R: Read> Splitter<R> {
88 pub fn from_reader(reader: R) -> Self {
89 SplitterBuilder::new().from_reader(reader)
90 }
91
92 pub fn has_headers(&self) -> bool {
95 self.has_headers
96 }
97
98 pub fn byte_headers(&mut self) -> error::Result<&[u8]> {
99 self.on_first_read()?;
100
101 Ok(&self.headers)
102 }
103
104 #[inline(always)]
105 fn on_first_read(&mut self) -> error::Result<()> {
106 if self.has_read {
107 return Ok(());
108 }
109
110 let input = self.buffer.fill_buf()?;
111 let bom_len = trim_bom(input);
112 self.buffer.consume(bom_len);
113
114 if let Some(record) = self.split_record_impl()? {
115 self.headers = record.to_vec();
116 } else {
117 self.must_reemit_headers = false;
118 }
119
120 self.has_read = true;
121
122 Ok(())
123 }
124
125 pub fn count_records(&mut self) -> error::Result<u64> {
126 use ReadResult::*;
127
128 self.on_first_read()?;
129 self.buffer.reset();
130
131 let mut count: u64 = 0;
132
133 if self.must_reemit_headers {
134 count += 1;
135 self.must_reemit_headers = false;
136 }
137
138 loop {
139 let input = self.buffer.fill_buf()?;
140
141 let (result, pos) = self.inner.split_record(input);
142
143 self.buffer.consume(pos);
144
145 match result {
146 End => break,
147 InputEmpty | Cr | Lf => continue,
148 Record => {
149 count += 1;
150 }
151 };
152 }
153
154 Ok(count)
155 }
156
157 pub fn split_record_impl(&mut self) -> error::Result<Option<&[u8]>> {
158 use ReadResult::*;
159
160 self.buffer.reset();
161
162 loop {
163 let input = self.buffer.fill_buf()?;
164
165 let (result, pos) = self.inner.split_record(input);
166
167 match result {
168 End => {
169 self.buffer.consume(pos);
170 return Ok(None);
171 }
172 Cr | Lf => {
173 self.buffer.consume(pos);
174 }
175 InputEmpty => {
176 self.buffer.save();
177 }
178 Record => {
179 return Ok(Some(trim_trailing_crlf(self.buffer.flush(pos))));
180 }
181 };
182 }
183 }
184
185 pub fn split_record(&mut self) -> error::Result<Option<&[u8]>> {
186 self.on_first_read()?;
187
188 if self.must_reemit_headers {
189 self.must_reemit_headers = false;
190 return Ok(Some(&self.headers));
191 }
192
193 self.split_record_impl()
194 }
195
196 pub fn split_record_with_position(&mut self) -> error::Result<Option<(u64, &[u8])>> {
197 self.on_first_read()?;
198
199 let pos = self.position();
200
201 if self.must_reemit_headers {
202 self.must_reemit_headers = false;
203 return Ok(Some((pos, &self.headers)));
204 }
205
206 match self.split_record_impl() {
207 Ok(Some(record)) => Ok(Some((pos, record))),
208 Ok(None) => Ok(None),
209 Err(err) => Err(err),
210 }
211 }
212
213 pub fn into_bufreader(self) -> (Option<Vec<u8>>, BufReader<R>) {
214 (
215 self.must_reemit_headers.then_some(self.headers),
216 self.buffer.into_bufreader(),
217 )
218 }
219
220 #[inline(always)]
221 pub fn position(&self) -> u64 {
222 if self.must_reemit_headers {
223 0
224 } else {
225 self.buffer.position()
226 }
227 }
228}
229
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Builds a headerless splitter over `data` with the given buffer capacity.
    fn headerless_splitter(data: &str, capacity: usize) -> Splitter<Cursor<&str>> {
        let mut builder = SplitterBuilder::with_capacity(capacity);
        builder.has_headers(false);
        builder.from_reader(Cursor::new(data))
    }

    /// Number of records according to `Splitter::count_records`.
    fn count_records(data: &str, capacity: usize) -> u64 {
        headerless_splitter(data, capacity).count_records().unwrap()
    }

    /// Number of records obtained by draining `Splitter::split_record`.
    fn split_records(data: &str, capacity: usize) -> u64 {
        let mut splitter = headerless_splitter(data, capacity);
        let mut seen: u64 = 0;

        while splitter.split_record().unwrap().is_some() {
            seen += 1;
        }

        seen
    }

    #[test]
    fn test_count() {
        // Empty input
        assert_eq!(count_records("", 1024), 0);

        // Single column, with assorted blank lines and line endings
        let cases = [
            "name\njohn\nlucy",
            "name\njohn\nlucy\n",
            "name\n\njohn\r\nlucy\n",
            "name\n\njohn\r\nlucy\n\n",
            "name\n\n\njohn\r\n\r\nlucy\n\n\n",
            "\nname\njohn\nlucy",
            "\n\nname\njohn\nlucy",
            "\r\n\r\nname\njohn\nlucy",
            "name\njohn\nlucy\r\n",
            "name\njohn\nlucy\r\n\r\n",
        ];

        for capacity in [32usize, 4, 3, 2, 1] {
            for case in cases.iter() {
                assert_eq!(
                    count_records(case, capacity),
                    3,
                    "capacity={} string={:?}",
                    capacity,
                    case
                );
            }
        }

        // Several columns
        let multi_column = "name,surname,age\njohn,landy,45\nlucy,rose,67";
        assert_eq!(count_records(multi_column, 1024), 3);
        assert_eq!(split_records(multi_column, 1024), 3);

        // Quoted cells, across buffer capacities
        let quoted = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\r\n";

        for capacity in [1024usize, 32usize, 4, 3, 2, 1] {
            assert_eq!(count_records(quoted, capacity), 5, "capacity={}", capacity);
            assert_eq!(split_records(quoted, capacity), 5, "capacity={}", capacity);
        }

        // Tab characters inside the data
        let tabbed = "name\tsurname\tage\njohn\tlandy\t45\nlucy\trose\t67";
        assert_eq!(count_records(tabbed, 1024), 3);
        assert_eq!(split_records(tabbed, 1024), 3);
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        // Rows made of an empty quoted cell still count as records; the
        // default splitter treats the first row as headers.
        let data = "name\n\"\"\nlucy\n\"\"";
        let mut reader = Splitter::from_reader(Cursor::new(data));

        assert_eq!(reader.count_records()?, 3);

        Ok(())
    }
}