Skip to main content

simd_csv/
peeker.rs

1use std::io::{Chain, Cursor, Read};
2
3use crate::buffer::ScratchBuffer;
4use crate::core::{CoreReader, ReadResult};
5use crate::error;
6use crate::records::{ByteRecord, ZeroCopyByteRecord};
7use crate::utils::trim_bom;
8
/// Builds a [`Peeker`] with given configuration.
///
/// The builder only stores plain configuration values, so it can be cheaply
/// cloned and printed for debugging purposes.
#[derive(Debug, Clone)]
pub struct PeekerBuilder {
    /// Byte used to separate fields (defaults to a comma).
    delimiter: u8,
    /// Byte used to quote fields (defaults to a double quote).
    quote: u8,
    /// Capacity handed to the created [`Peeker`]'s buffer.
    buffer_capacity: usize,
    /// Whether the first record must be understood as a header.
    has_headers: bool,
}
16
17impl Default for PeekerBuilder {
18    fn default() -> Self {
19        Self {
20            delimiter: b',',
21            quote: b'"',
22            buffer_capacity: 8192,
23            has_headers: true,
24        }
25    }
26}
27
28impl PeekerBuilder {
29    /// Create a new [`PeekerBuilder`] with default configuration.
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    /// Create a new [`PeekerBuilder`] with provided `capacity`.
35    pub fn with_capacity(capacity: usize) -> Self {
36        let mut reader = Self::default();
37        reader.buffer_capacity(capacity);
38        reader
39    }
40
41    /// Set the delimiter to be used by the created [`Peeker`].
42    ///
43    /// This delimiter must be a single byte.
44    ///
45    /// Will default to a comma.
46    pub fn delimiter(&mut self, delimiter: u8) -> &mut Self {
47        self.delimiter = delimiter;
48        self
49    }
50
51    /// Set the quote char to be used by the created [`Peeker`].
52    ///
53    /// This char must be a single byte.
54    ///
55    /// Will default to a double quote.
56    pub fn quote(&mut self, quote: u8) -> &mut Self {
57        self.quote = quote;
58        self
59    }
60
61    /// Set the capacity of the created [`Peeker`]'s buffer
62    pub fn buffer_capacity(&mut self, capacity: usize) -> &mut Self {
63        self.buffer_capacity = capacity;
64        self
65    }
66
67    /// Indicate whether first record must be understood as a header.
68    ///
69    /// Will default to `true`.
70    pub fn has_headers(&mut self, yes: bool) -> &mut Self {
71        self.has_headers = yes;
72        self
73    }
74
75    /// Create a new [`Peeker`] using the provided reader implementing
76    /// [`std::io::Read`].
77    pub fn from_reader<R: Read>(&self, reader: R) -> Peeker<R> {
78        Peeker {
79            buffer: ScratchBuffer::with_capacity(self.buffer_capacity, reader),
80            inner: CoreReader::new(self.delimiter, self.quote),
81            headers: ByteRecord::new(),
82            rest: Vec::new(),
83            has_headers: self.has_headers,
84            has_read: false,
85            has_crlf_newlines: false,
86            must_reemit_headers: !self.has_headers,
87        }
88    }
89}
90
/// A [`Read`] stream peeker that can be used to check a CSV file's header.
///
/// The first record is read lazily, on the first call to one of the peeking
/// methods, and is then kept around so it can be inspected any number of
/// times.
pub struct Peeker<R> {
    /// Buffer wrapping the underlying reader.
    buffer: ScratchBuffer<R>,
    /// Core reader, built from the configured delimiter and quote, used to
    /// split the first record.
    inner: CoreReader,
    /// Parsed form of the first record (empty until the first read).
    headers: ByteRecord,
    /// Raw bytes of the first record, as flushed from the buffer (empty until
    /// the first read).
    rest: Vec<u8>,
    /// Whether the first record must be understood as a header.
    has_headers: bool,
    /// Whether the first read has already been performed.
    has_read: bool,
    /// Whether the first record seemed to be terminated by a CRLF newline.
    has_crlf_newlines: bool,
    /// Whether `into_reader` must put the raw bytes of the first record back
    /// at the beginning of the returned stream. Starts as `!has_headers` and
    /// is reset when the first read finds no record.
    must_reemit_headers: bool,
}
102
103impl<R: Read> Peeker<R> {
104    /// Create a new peeker with default configuration using the provided reader
105    /// implementing [`std::io::Read`].
106    pub fn from_reader(reader: R) -> Self {
107        PeekerBuilder::new().from_reader(reader)
108    }
109
110    fn read_byte_record_impl(&mut self) -> error::Result<bool> {
111        use ReadResult::*;
112
113        let mut seps = vec![];
114
115        loop {
116            let seps_offset = self.buffer.saved().len();
117            let input = self.buffer.fill_buf()?;
118
119            let (result, pos) =
120                self.inner
121                    .split_record_and_find_separators(input, seps_offset, &mut seps);
122
123            match result {
124                End => {
125                    self.buffer.consume(pos);
126                    return Ok(false);
127                }
128                Cr | Lf => {
129                    self.buffer.consume(pos);
130                }
131                InputEmpty => {
132                    self.buffer.save();
133                }
134                Record => {
135                    let bytes = self.buffer.flush(pos);
136
137                    let record = ZeroCopyByteRecord::new(bytes, &seps, self.inner.quote);
138
139                    if bytes.len().saturating_sub(2) == record.as_slice().len() {
140                        self.has_crlf_newlines = true;
141                    }
142
143                    self.rest = bytes.to_vec();
144                    self.headers = record.to_byte_record();
145
146                    return Ok(true);
147                }
148            };
149        }
150    }
151
152    fn on_first_read(&mut self) -> error::Result<()> {
153        if self.has_read {
154            return Ok(());
155        }
156
157        // Trimming BOM
158        let input = self.buffer.fill_buf()?;
159        let bom_len = trim_bom(input);
160        self.buffer.consume(bom_len);
161
162        // Reading headers
163        let has_data = self.read_byte_record_impl()?;
164
165        if !has_data {
166            self.must_reemit_headers = false;
167        }
168
169        self.has_read = true;
170
171        Ok(())
172    }
173
174    /// Returns whether this peeker has been configured to interpret the first
175    /// record as a header.
176    #[inline]
177    pub fn has_headers(&self) -> bool {
178        self.has_headers
179    }
180
181    /// Returns whether this peeker seems to be reading from a stream having
182    /// CRLF newlines.
183    #[inline]
184    pub fn has_crlf_newlines(&mut self) -> error::Result<bool> {
185        self.on_first_read()?;
186
187        Ok(self.has_crlf_newlines)
188    }
189
190    /// Attempt to read the first record of the stream without consuming related
191    /// bytes.
192    pub fn peek_byte_record(&mut self) -> error::Result<&ByteRecord> {
193        self.on_first_read()?;
194
195        Ok(&self.headers)
196    }
197
198    /// Attempt to return the first record of the stream as bytes without consuming
199    /// them.
200    pub fn peek(&mut self) -> error::Result<&[u8]> {
201        self.on_first_read()?;
202
203        Ok(&self.rest)
204    }
205
206    pub fn into_reader(mut self) -> Chain<Cursor<Vec<u8>>, R> {
207        let bufreader = self.buffer.into_bufreader();
208
209        if !self.must_reemit_headers {
210            self.rest.clear();
211        }
212
213        self.rest.extend_from_slice(bufreader.buffer());
214
215        Cursor::new(self.rest).chain(bufreader.into_inner())
216    }
217}
218
#[cfg(test)]
mod tests {
    use super::*;

    /// Consumes `peeker` and collects every byte its reader still yields.
    fn drain<R: Read>(peeker: Peeker<R>) -> error::Result<Vec<u8>> {
        let mut bytes = Vec::new();
        peeker.into_reader().read_to_end(&mut bytes)?;
        Ok(bytes)
    }

    #[test]
    fn test_peeker() -> error::Result<()> {
        // Headers with LF newlines: the header line is not re-emitted.
        let mut lf_with_headers =
            Peeker::from_reader(&b"name,surname\nhello,world\njohn,lucy"[..]);

        assert_eq!(
            lf_with_headers.peek_byte_record()?,
            &brec!["name", "surname"]
        );
        assert!(!lf_with_headers.has_crlf_newlines()?);
        assert_eq!(drain(lf_with_headers)?, b"hello,world\njohn,lucy");

        // Headers with CRLF newlines: the header line is not re-emitted.
        let mut crlf_with_headers =
            Peeker::from_reader(&b"name,surname\r\nhello,world\r\njohn,lucy"[..]);

        assert_eq!(
            crlf_with_headers.peek_byte_record()?,
            &brec!["name", "surname"]
        );
        assert!(crlf_with_headers.has_crlf_newlines()?);
        assert_eq!(drain(crlf_with_headers)?, b"hello,world\r\njohn,lucy");

        // No headers with LF newlines: the first record is given back.
        let mut lf_without_headers = PeekerBuilder::new()
            .has_headers(false)
            .from_reader(&b"bonjour,le monde\nhello,world\njohn,lucy"[..]);

        assert_eq!(
            lf_without_headers.peek_byte_record()?,
            &brec!["bonjour", "le monde"]
        );
        assert!(!lf_without_headers.has_crlf_newlines()?);
        assert_eq!(
            drain(lf_without_headers)?,
            b"bonjour,le monde\nhello,world\njohn,lucy"
        );

        // No headers with CRLF newlines: the first record is given back.
        let mut crlf_without_headers = PeekerBuilder::new()
            .has_headers(false)
            .from_reader(&b"bonjour,le monde\r\nhello,world\r\njohn,lucy"[..]);

        assert_eq!(
            crlf_without_headers.peek_byte_record()?,
            &brec!["bonjour", "le monde"]
        );
        assert!(crlf_without_headers.has_crlf_newlines()?);
        assert_eq!(
            drain(crlf_without_headers)?,
            b"bonjour,le monde\r\nhello,world\r\njohn,lucy"
        );

        Ok(())
    }
}