1use std::io::Read;
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error;
6use crate::utils::{trim_bom, trim_trailing_crlf};
7
/// Configuration holder used to build [`Splitter`] instances.
///
/// The `Default` implementation selects `,` as delimiter, `"` as quote and
/// leaves the buffer capacity unset.
#[derive(Debug, Clone)]
pub struct SplitterBuilder {
    /// Delimiter byte handed to the underlying `CoreReader`.
    delimiter: u8,
    /// Quote byte handed to the underlying `CoreReader`.
    quote: u8,
    /// Optional buffer capacity, forwarded as-is to
    /// `ScratchBuffer::with_optional_capacity`.
    buffer_capacity: Option<usize>,
}
13
14impl Default for SplitterBuilder {
15 fn default() -> Self {
16 Self {
17 delimiter: b',',
18 quote: b'"',
19 buffer_capacity: None,
20 }
21 }
22}
23
24impl SplitterBuilder {
25 pub fn new() -> Self {
26 Self::default()
27 }
28
29 pub fn with_capacity(capacity: usize) -> Self {
30 let mut splitter = Self::default();
31 splitter.buffer_capacity(capacity);
32 splitter
33 }
34
35 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
36 self.delimiter = delimiter;
37 self
38 }
39
40 pub fn quote(&mut self, quote: u8) -> &mut Self {
41 self.quote = quote;
42 self
43 }
44
45 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
46 self.buffer_capacity = Some(capacity);
47 self
48 }
49
50 pub fn from_reader<R: Read>(&self, reader: R) -> Splitter<R> {
51 Splitter {
52 buffer: ScratchBuffer::with_optional_capacity(self.buffer_capacity, reader),
53 inner: CoreReader::new(self.delimiter, self.quote),
54 has_read: false,
55 }
56 }
57}
58
59pub struct Splitter<R> {
60 buffer: ScratchBuffer<R>,
61 inner: CoreReader,
62 has_read: bool,
63}
64
65impl<R: Read> Splitter<R> {
66 pub fn from_reader(reader: R) -> Self {
67 SplitterBuilder::new().from_reader(reader)
68 }
69
70 #[inline(always)]
71 fn on_first_read(&mut self) -> error::Result<()> {
72 if self.has_read {
73 return Ok(());
74 }
75
76 let input = self.buffer.fill_buf()?;
77 let bom_len = trim_bom(input);
78 self.buffer.consume(bom_len);
79 self.has_read = true;
80
81 Ok(())
82 }
83
84 pub fn count_records(&mut self) -> error::Result<u64> {
85 use ReadResult::*;
86
87 self.on_first_read()?;
88
89 let mut count: u64 = 0;
90
91 loop {
92 let input = self.buffer.fill_buf()?;
93
94 let (result, pos) = self.inner.split_record(input);
95
96 self.buffer.consume(pos);
97
98 match result {
99 End => break,
100 InputEmpty | Cr | Lf => continue,
101 Record => {
102 count += 1;
103 }
104 };
105 }
106
107 Ok(count)
108 }
109
110 pub fn split_record(&mut self) -> error::Result<Option<&[u8]>> {
111 use ReadResult::*;
112
113 self.on_first_read()?;
114
115 self.buffer.reset();
116
117 loop {
118 let input = self.buffer.fill_buf()?;
119
120 let (result, pos) = self.inner.split_record(input);
121
122 match result {
123 End => {
124 self.buffer.consume(pos);
125 return Ok(None);
126 }
127 Cr | Lf => {
128 self.buffer.consume(pos);
129 }
130 InputEmpty => {
131 self.buffer.save();
132 }
133 Record => {
134 return Ok(Some(trim_trailing_crlf(self.buffer.flush(pos))));
135 }
136 };
137 }
138 }
139}
140
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Counts the records of `data` with `Splitter::count_records`, using a
    /// buffer of `capacity` bytes.
    fn count_records(data: &str, capacity: usize) -> u64 {
        let mut splitter = SplitterBuilder::with_capacity(capacity).from_reader(Cursor::new(data));
        splitter.count_records().unwrap()
    }

    /// Counts the records of `data` by calling `Splitter::split_record` until
    /// it yields `None`, using a buffer of `capacity` bytes.
    fn split_records(data: &str, capacity: usize) -> u64 {
        let mut splitter = SplitterBuilder::with_capacity(capacity).from_reader(Cursor::new(data));
        let mut count: u64 = 0;

        while splitter.split_record().unwrap().is_some() {
            count += 1;
        }

        count
    }

    #[test]
    fn test_count() {
        assert_eq!(count_records("", 1024), 0);

        // Every fixture holds exactly three records, surrounded by varying
        // amounts of empty lines and mixed line endings.
        let tests = [
            "name\njohn\nlucy",
            "name\njohn\nlucy\n",
            "name\n\njohn\r\nlucy\n",
            "name\n\njohn\r\nlucy\n\n",
            "name\n\n\njohn\r\n\r\nlucy\n\n\n",
            "\nname\njohn\nlucy",
            "\n\nname\njohn\nlucy",
            "\r\n\r\nname\njohn\nlucy",
            "name\njohn\nlucy\r\n",
            "name\njohn\nlucy\r\n\r\n",
        ];

        // Tiny capacities force records to straddle buffer refills.
        for capacity in [32usize, 4, 3, 2, 1] {
            for test in &tests {
                assert_eq!(
                    count_records(test, capacity),
                    3,
                    "capacity={} string={:?}",
                    capacity,
                    test
                );
            }
        }

        let data = "name,surname,age\njohn,landy,45\nlucy,rose,67";
        assert_eq!(count_records(data, 1024), 3);
        assert_eq!(split_records(data, 1024), 3);

        // Quoted cells containing delimiters and escaped quotes.
        let data = "name,surname,age\n\"john\",\"landy, the \"\"everlasting\"\" bastard\",45\nlucy,rose,\"67\"\njermaine,jackson,\"89\"\n\nkarine,loucan,\"52\"\r\n";

        for capacity in [1024usize, 32usize, 4, 3, 2, 1] {
            assert_eq!(count_records(data, capacity), 5, "capacity={}", capacity);
            assert_eq!(split_records(data, capacity), 5, "capacity={}", capacity);
        }

        let data = "name\tsurname\tage\njohn\tlandy\t45\nlucy\trose\t67";
        assert_eq!(count_records(data, 1024), 3);
        assert_eq!(split_records(data, 1024), 3);
    }

    #[test]
    fn test_empty_row() -> error::Result<()> {
        // Rows made of an empty quoted cell must still count as records.
        let data = "name\n\"\"\nlucy\n\"\"";

        let mut reader = Splitter::from_reader(Cursor::new(data));

        assert_eq!(reader.count_records()?, 4);

        Ok(())
    }
}