1use std::io::{Chain, Cursor, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error;
6use crate::records::{ByteRecord, ZeroCopyByteRecord};
7use crate::utils::trim_bom;
8
/// Configuration used to build a [`Peeker`].
///
/// The builder only holds plain values, so it is cheap to clone and can be
/// printed for diagnostics.
#[derive(Debug, Clone)]
pub struct PeekerBuilder {
    /// Field delimiter byte handed to the underlying record splitter.
    delimiter: u8,
    /// Quote byte handed to the underlying record splitter.
    quote: u8,
    /// Capacity requested for the read buffer (presumably in bytes; see
    /// `ScratchBuffer::with_capacity`).
    buffer_capacity: usize,
    /// Whether the first record of the input is considered a header row.
    has_headers: bool,
}
16
17impl Default for PeekerBuilder {
18 fn default() -> Self {
19 Self {
20 delimiter: b',',
21 quote: b'"',
22 buffer_capacity: 8192,
23 has_headers: true,
24 }
25 }
26}
27
28impl PeekerBuilder {
29 pub fn new() -> Self {
31 Self::default()
32 }
33
34 pub fn with_capacity(capacity: usize) -> Self {
36 let mut reader = Self::default();
37 reader.buffer_capacity(capacity);
38 reader
39 }
40
41 pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
47 self.delimiter = delimiter;
48 self
49 }
50
51 pub fn quote(&mut self, quote: u8) -> &mut Self {
57 self.quote = quote;
58 self
59 }
60
61 pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
63 self.buffer_capacity = capacity;
64 self
65 }
66
67 pub fn has_headers(&mut self, yes: bool) -> &mut Self {
71 self.has_headers = yes;
72 self
73 }
74
75 pub fn from_reader<R: Read>(&self, reader: R) -> Peeker<R> {
78 Peeker {
79 buffer: ScratchBuffer::with_capacity(self.buffer_capacity, reader),
80 inner: CoreReader::new(self.delimiter, self.quote),
81 headers: ByteRecord::new(),
82 rest: Vec::new(),
83 has_headers: self.has_headers,
84 has_read: false,
85 has_crlf_newlines: false,
86 must_reemit_headers: !self.has_headers,
87 }
88 }
89}
90
91pub struct Peeker<R> {
93 buffer: ScratchBuffer<R>,
94 inner: CoreReader,
95 headers: ByteRecord,
96 rest: Vec<u8>,
97 has_headers: bool,
98 has_read: bool,
99 has_crlf_newlines: bool,
100 must_reemit_headers: bool,
101}
102
103impl<R: Read> Peeker<R> {
104 pub fn from_reader(reader: R) -> Self {
107 PeekerBuilder::new().from_reader(reader)
108 }
109
110 fn read_byte_record_impl(&mut self) -> error::Result<bool> {
111 use ReadResult::*;
112
113 let mut seps = vec![];
114
115 loop {
116 let seps_offset = self.buffer.saved().len();
117 let input = self.buffer.fill_buf()?;
118
119 let (result, pos) =
120 self.inner
121 .split_record_and_find_separators(input, seps_offset, &mut seps);
122
123 match result {
124 End => {
125 self.buffer.consume(pos);
126 return Ok(false);
127 }
128 Cr | Lf => {
129 self.buffer.consume(pos);
130 }
131 InputEmpty => {
132 self.buffer.save();
133 }
134 Record => {
135 let bytes = self.buffer.flush(pos);
136
137 let record = ZeroCopyByteRecord::new(bytes, &seps, self.inner.quote);
138
139 if bytes.len().saturating_sub(2) == record.as_slice().len() {
140 self.has_crlf_newlines = true;
141 }
142
143 self.rest = bytes.to_vec();
144 self.headers = record.to_byte_record();
145
146 return Ok(true);
147 }
148 };
149 }
150 }
151
152 fn on_first_read(&mut self) -> error::Result<()> {
153 if self.has_read {
154 return Ok(());
155 }
156
157 let input = self.buffer.fill_buf()?;
159 let bom_len = trim_bom(input);
160 self.buffer.consume(bom_len);
161
162 let has_data = self.read_byte_record_impl()?;
164
165 if !has_data {
166 self.must_reemit_headers = false;
167 }
168
169 self.has_read = true;
170
171 Ok(())
172 }
173
174 #[inline]
177 pub fn has_headers(&self) -> bool {
178 self.has_headers
179 }
180
181 #[inline]
184 pub fn has_crlf_newlines(&mut self) -> error::Result<bool> {
185 self.on_first_read()?;
186
187 Ok(self.has_crlf_newlines)
188 }
189
190 pub fn peek_byte_record(&mut self) -> error::Result<&ByteRecord> {
193 self.on_first_read()?;
194
195 Ok(&self.headers)
196 }
197
198 pub fn peek(&mut self) -> error::Result<&[u8]> {
201 self.on_first_read()?;
202
203 Ok(&self.rest)
204 }
205
206 pub fn into_reader(mut self) -> Chain<Cursor<Vec<u8>>, R> {
207 let bufreader = self.buffer.into_bufreader();
208
209 if !self.must_reemit_headers {
210 self.rest.clear();
211 }
212
213 self.rest.extend_from_slice(bufreader.buffer());
214
215 Cursor::new(self.rest).chain(bufreader.into_inner())
216 }
217}
218
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks, in order: the peeked first record, the CRLF flag, and the
    /// bytes yielded by the reader handed back by the peeker.
    fn check<R: Read>(
        mut peeker: Peeker<R>,
        expected_first: &ByteRecord,
        expected_crlf: bool,
        expected_remaining: &[u8],
    ) -> error::Result<()> {
        assert_eq!(peeker.peek_byte_record()?, expected_first);
        assert_eq!(peeker.has_crlf_newlines()?, expected_crlf);

        let mut remaining: Vec<u8> = Vec::new();
        peeker.into_reader().read_to_end(&mut remaining)?;
        assert_eq!(remaining.as_slice(), expected_remaining);

        Ok(())
    }

    #[test]
    fn test_peeker() -> error::Result<()> {
        let header_row = brec!["name", "surname"];
        let first_data_row = brec!["bonjour", "le monde"];

        // With headers, LF newlines: the header row is not replayed.
        check(
            Peeker::from_reader(&b"name,surname\nhello,world\njohn,lucy"[..]),
            &header_row,
            false,
            b"hello,world\njohn,lucy",
        )?;

        // With headers, CRLF newlines.
        check(
            Peeker::from_reader(&b"name,surname\r\nhello,world\r\njohn,lucy"[..]),
            &header_row,
            true,
            b"hello,world\r\njohn,lucy",
        )?;

        // Without headers, LF newlines: the first record is replayed.
        check(
            PeekerBuilder::new()
                .has_headers(false)
                .from_reader(&b"bonjour,le monde\nhello,world\njohn,lucy"[..]),
            &first_data_row,
            false,
            b"bonjour,le monde\nhello,world\njohn,lucy",
        )?;

        // Without headers, CRLF newlines.
        check(
            PeekerBuilder::new()
                .has_headers(false)
                .from_reader(&b"bonjour,le monde\r\nhello,world\r\njohn,lucy"[..]),
            &first_data_row,
            true,
            b"bonjour,le monde\r\nhello,world\r\njohn,lucy",
        )?;

        Ok(())
    }
}