1use std::io::{BufReader, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error;
6use crate::utils::{trim_bom, trim_trailing_crlf};
7
/// Configuration used to build a [`Splitter`].
///
/// Derives `Debug` and `Clone` so that a configuration can be inspected and
/// duplicated, consistently with [`Splitter`] which is itself `Debug`.
#[derive(Debug, Clone)]
pub struct SplitterBuilder {
    /// Byte separating the cells of a record.
    delimiter: u8,
    /// Byte used to quote cells.
    quote: u8,
    /// Requested capacity of the internal buffer, `None` when left unset.
    buffer_capacity: Option<usize>,
    /// Whether the first record of the stream is a header row.
    has_headers: bool,
}
14
15impl Default for SplitterBuilder {
16 fn default() -> Self {
17 Self {
18 delimiter: b',',
19 quote: b'"',
20 buffer_capacity: None,
21 has_headers: true,
22 }
23 }
24}
25
26impl SplitterBuilder {
27 pub fn new() -> Self {
28 Self::default()
29 }
30
31 pub fn with_capacity(capacity: usize) -> Self {
32 let mut splitter = Self::default();
33 splitter.buffer_capacity(capacity);
34 splitter
35 }
36
37 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
38 self.delimiter = delimiter;
39 self
40 }
41
42 pub fn quote(&mut self, quote: u8) -> &mut Self {
43 self.quote = quote;
44 self
45 }
46
47 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
48 self.has_headers = yes;
49 self
50 }
51
52 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
53 self.buffer_capacity = Some(capacity);
54 self
55 }
56
57 pub fn from_reader<R: Read>(&self, reader: R) -> Splitter<R> {
58 Splitter {
59 buffer: ScratchBuffer::with_optional_capacity(self.buffer_capacity, reader),
60 inner: CoreReader::new(self.delimiter, self.quote),
61 headers: Vec::new(),
62 has_read: false,
63 has_headers: self.has_headers,
64 must_reemit_headers: !self.has_headers,
65 }
66 }
67}
68
/// Splits a byte stream into raw records, without splitting records into cells.
#[derive(Debug)]
pub struct Splitter<R> {
    /// Buffer wrapping the underlying reader.
    buffer: ScratchBuffer<R>,
    /// Low-level splitter, built from the configured delimiter and quote bytes.
    inner: CoreReader,
    /// Bytes of the first record of the stream, captured on first read
    /// (empty until then, or if the stream holds no record).
    headers: Vec<u8>,
    /// Whether the first record has already been read.
    has_read: bool,
    /// Whether the first record is a header row.
    has_headers: bool,
    /// Whether the captured first record must still be returned (or counted)
    /// as a regular record; starts as `!has_headers`.
    must_reemit_headers: bool,
}
78
79impl<R: Read> Splitter<R> {
80 pub fn from_reader(reader: R) -> Self {
81 SplitterBuilder::new().from_reader(reader)
82 }
83
84 pub fn has_headers(&self) -> bool {
85 self.has_headers
86 }
87
88 pub fn byte_headers(&mut self) -> error::Result<&[u8]> {
89 self.on_first_read()?;
90
91 Ok(&self.headers)
92 }
93
94 #[inline(always)]
95 fn on_first_read(&mut self) -> error::Result<()> {
96 if self.has_read {
97 return Ok(());
98 }
99
100 let input = self.buffer.fill_buf()?;
101 let bom_len = trim_bom(input);
102 self.buffer.consume(bom_len);
103
104 if let Some(record) = self.split_record_impl()? {
105 self.headers = record.to_vec();
106 } else {
107 self.must_reemit_headers = false;
108 }
109
110 self.has_read = true;
111
112 Ok(())
113 }
114
115 pub fn count_records(&mut self) -> error::Result<u64> {
116 use ReadResult::*;
117
118 self.on_first_read()?;
119 self.buffer.reset();
120
121 let mut count: u64 = 0;
122
123 if self.must_reemit_headers {
124 count += 1;
125 self.must_reemit_headers = false;
126 }
127
128 loop {
129 let input = self.buffer.fill_buf()?;
130
131 let (result, pos) = self.inner.split_record(input);
132
133 self.buffer.consume(pos);
134
135 match result {
136 End => break,
137 InputEmpty | Cr | Lf => continue,
138 Record => {
139 count += 1;
140 }
141 };
142 }
143
144 Ok(count)
145 }
146
147 pub fn split_record_impl(&mut self) -> error::Result<Option<&[u8]>> {
148 use ReadResult::*;
149
150 self.buffer.reset();
151
152 loop {
153 let input = self.buffer.fill_buf()?;
154
155 let (result, pos) = self.inner.split_record(input);
156
157 match result {
158 End => {
159 self.buffer.consume(pos);
160 return Ok(None);
161 }
162 Cr | Lf => {
163 self.buffer.consume(pos);
164 }
165 InputEmpty => {
166 self.buffer.save();
167 }
168 Record => {
169 return Ok(Some(trim_trailing_crlf(self.buffer.flush(pos))));
170 }
171 };
172 }
173 }
174
175 pub fn split_record(&mut self) -> error::Result<Option<&[u8]>> {
176 self.on_first_read()?;
177
178 if self.must_reemit_headers {
179 self.must_reemit_headers = false;
180 return Ok(Some(&self.headers));
181 }
182
183 self.split_record_impl()
184 }
185
186 pub fn split_record_with_position(&mut self) -> error::Result<Option<(u64, &[u8])>> {
187 self.on_first_read()?;
188
189 let pos = self.position();
190
191 if self.must_reemit_headers {
192 self.must_reemit_headers = false;
193 return Ok(Some((pos, &self.headers)));
194 }
195
196 match self.split_record_impl() {
197 Ok(Some(record)) => Ok(Some((pos, record))),
198 Ok(None) => Ok(None),
199 Err(err) => Err(err),
200 }
201 }
202
203 pub fn into_bufreader(self) -> (Option<Vec<u8>>, BufReader<R>) {
204 (
205 self.must_reemit_headers.then_some(self.headers),
206 self.buffer.into_bufreader(),
207 )
208 }
209
210 #[inline(always)]
211 pub fn position(&self) -> u64 {
212 if self.must_reemit_headers {
213 0
214 } else {
215 self.buffer.position()
216 }
217 }
218}
219
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Counts the records of `data` through `Splitter::count_records`, without
    /// header row and with the given buffer capacity.
    fn count_records(data: &str, capacity: usize) -> u64 {
        let mut splitter = SplitterBuilder::with_capacity(capacity)
            .has_headers(false)
            .from_reader(Cursor::new(data));
        splitter.count_records().unwrap()
    }

    /// Counts the records of `data` by repeatedly calling
    /// `Splitter::split_record`, without header row and with the given buffer
    /// capacity.
    fn split_records(data: &str, capacity: usize) -> u64 {
        let mut splitter = SplitterBuilder::with_capacity(capacity)
            .has_headers(false)
            .from_reader(Cursor::new(data));
        let mut count: u64 = 0;

        while splitter.split_record().unwrap().is_some() {
            count += 1;
        }

        count
    }

    #[test]
    fn test_count() {
        // Empty input
        assert_eq!(count_records("", 1024), 0);

        // Single-cell records with various blank lines and line endings
        let tests = [
            "name\njohn\nlucy",
            "name\njohn\nlucy\n",
            "name\n\njohn\r\nlucy\n",
            "name\n\njohn\r\nlucy\n\n",
            "name\n\n\njohn\r\n\r\nlucy\n\n\n",
            "\nname\njohn\nlucy",
            "\n\nname\njohn\nlucy",
            "\r\n\r\nname\njohn\nlucy",
            "name\njohn\nlucy\r\n",
            "name\njohn\nlucy\r\n\r\n",
        ];

        for capacity in [32usize, 4, 3, 2, 1] {
            for test in tests.iter() {
                assert_eq!(
                    count_records(test, capacity),
                    3,
                    "capacity={} string={:?}",
                    capacity,
                    test
                );
            }
        }

        // Multiple cells per record
        let data = "name,surname,age\njohn,landy,45\nlucy,rose,67";
        assert_eq!(count_records(data, 1024), 3);
        assert_eq!(split_records(data, 1024), 3);

        // Quoted cells, across several buffer capacities
        for capacity in [1024usize, 32usize, 4, 3, 2, 1] {
            let data = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\r\n";

            assert_eq!(count_records(data, capacity), 5, "capacity={}", capacity);
            assert_eq!(split_records(data, capacity), 5, "capacity={}", capacity);
        }

        // Tabs in the data while the delimiter is still the default comma
        let data = "name\tsurname\tage\njohn\tlandy\t45\nlucy\trose\t67";
        assert_eq!(count_records(data, 1024), 3);
        assert_eq!(split_records(data, 1024), 3);
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        let data = "name\n\"\"\nlucy\n\"\"";

        // Default configuration: the first record is a header row and is not counted.
        let mut reader = Splitter::from_reader(Cursor::new(data));

        assert_eq!(reader.count_records()?, 3);

        Ok(())
    }
}