1use crate::{parse_finish, parse_streaming, Paragraph, Streaming, SyntaxError};
2use alloc::vec::Vec;
3use core::{
4 fmt,
5 str::{from_utf8, Utf8Error},
6};
7
/// A source of raw bytes that [`BufParse`] can read its input from.
///
/// The single method has the same shape as `std::io::Read::read`, but the trait is declared
/// here so it does not depend on `std`. When the `std` feature is enabled, every type that
/// implements `std::io::Read` implements this trait through a blanket implementation.
pub trait BufParseInput {
    /// The error type returned when reading fails.
    type Error;

    /// Read bytes into `buf` and return how many bytes were written to its front.
    ///
    /// `BufParse` treats a return value of `Ok(0)` as "no more input available".
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>;
}
23
/// Blanket implementation that makes every `std::io::Read` type usable as parser input.
///
/// Only compiled when the `std` feature is enabled.
#[cfg(feature = "std")]
impl<R> BufParseInput for R
where
    R: std::io::Read + ?Sized,
{
    type Error = std::io::Error;

    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
        // Name the trait explicitly: this forwards to `std::io::Read::read` and is not a
        // recursive call into `BufParseInput::read`.
        std::io::Read::read(self, buf)
    }
}
32
/// An error produced while parsing buffered input with [`BufParse`].
///
/// The lifetime `'a` is the one carried by the wrapped [`SyntaxError`].
#[derive(Debug)]
pub enum BufParseError<'a> {
    /// The buffered bytes could not be decoded as UTF-8.
    InvalidUtf8(Utf8Error),
    /// The parser rejected the (valid UTF-8) input with a syntax error.
    InvalidSyntax(SyntaxError<'a>),
}
41
42impl fmt::Display for BufParseError<'_> {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 match self {
45 BufParseError::InvalidUtf8(err) => write!(f, "invalid utf-8 in input: {}", err),
46 BufParseError::InvalidSyntax(err) => write!(f, "invalid syntax: {}", err),
47 }
48 }
49}
50
51impl<'a> From<Utf8Error> for BufParseError<'a> {
52 fn from(err: Utf8Error) -> Self {
53 BufParseError::InvalidUtf8(err)
54 }
55}
56
57impl<'a> From<SyntaxError<'a>> for BufParseError<'a> {
58 fn from(err: SyntaxError<'a>) -> Self {
59 BufParseError::InvalidSyntax(err)
60 }
61}
62
/// `std::error::Error` integration, available only with the `std` feature.
#[cfg(feature = "std")]
impl std::error::Error for BufParseError<'_> {
    /// Exposes the wrapped `Utf8Error` as the underlying cause; syntax errors report no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match *self {
            Self::InvalidUtf8(ref cause) => Some(cause),
            // NOTE(review): presumably omitted because `SyntaxError<'a>` carries a lifetime and
            // so cannot be handed out as a `'static` trait object — confirm.
            Self::InvalidSyntax(_) => None,
        }
    }
}
72
/// A parser that reads its input in chunks from a [`BufParseInput`] and buffers it internally.
///
/// Typical use alternates between `try_next` and `buffer`: call `try_next` to get the next
/// paragraph, and whenever it reports `Streaming::Incomplete`, call `buffer` to read more input
/// before trying again. `try_next` returns `None` once everything has been parsed.
#[derive(Debug)]
pub struct BufParse<R> {
    /// Number of bytes requested from the input on every call to `buffer`.
    chunk_size: usize,
    /// Raw bytes read from the input that have not been discarded yet.
    buf: Vec<u8>,
    /// Offset into `buf` of the first byte that has not been consumed by the parser.
    pos: usize,
    /// The underlying input.
    read: R,
    /// Set once a read returned 0 bytes, i.e. the input has no more data.
    exhausted: bool,
}
107
108impl<R: BufParseInput> BufParse<R> {
109 pub fn new(read: R, chunk_size: usize) -> Self {
111 BufParse {
112 chunk_size,
113 buf: Vec::with_capacity(chunk_size),
114 pos: 0,
115 read,
116 exhausted: false,
117 }
118 }
119
120 pub fn buffer(&mut self) -> Result<(), R::Error> {
122 let size = self.chunk_size;
123
124 if self.buf.capacity() - self.buf.len() < size {
126 self.buf.drain(..self.pos);
127 self.pos = 0;
128 }
129
130 let end = self.buf.len();
131 self.buf.resize(end + size, 0);
132 let read = self.read.read(&mut self.buf[end..])?;
133 self.buf.truncate(end + read);
134
135 if read == 0 {
136 self.exhausted = true;
137 }
138
139 Ok(())
140 }
141
142 pub fn try_next(&mut self) -> Result<Option<Streaming<Paragraph>>, BufParseError> {
154 let input = self.as_longest_utf8(&self.buf)?;
155
156 match parse_streaming(input)? {
157 Streaming::Item((rest, paragraph)) => {
158 let parsed = input.len() - rest.len();
159 self.pos += parsed;
160 Ok(Some(Streaming::Item(paragraph)))
161 }
162 Streaming::Incomplete => {
163 if self.exhausted {
164 let input = self.as_utf8(&self.buf)?;
165 let result = parse_finish(input)?;
166 self.pos += input.len();
167 Ok(result.map(Streaming::Item))
168 } else {
169 Ok(Some(Streaming::Incomplete))
170 }
171 }
172 }
173 }
174
175 pub fn into_inner(self) -> R {
179 self.read
180 }
181
182 fn as_longest_utf8<'a>(&'_ self, buf: &'a [u8]) -> Result<&'a str, Utf8Error> {
184 self.as_utf8(buf).or_else(|err| match err.error_len() {
185 Some(_) => Err(err),
186 None => {
187 let valid = &buf[self.pos..self.pos + err.valid_up_to()];
188 from_utf8(valid)
189 }
190 })
191 }
192
193 fn as_utf8<'a>(&'_ self, buf: &'a [u8]) -> Result<&'a str, Utf8Error> {
195 from_utf8(&buf[self.pos..])
196 }
197}
198
// Unit tests driving `BufParse` with an in-memory input and various chunk sizes.
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::{
        string::{String, ToString},
        vec,
    };
    use assert_matches::assert_matches;
    use core::cmp::min;
    use indoc::indoc;

    /// In-memory `BufParseInput` over a byte slice, used to feed the parser in tests.
    #[derive(Debug, PartialEq, Eq, Clone)]
    struct Bytes<'a> {
        // The complete input that `read` hands out piece by piece.
        bytes: &'a [u8],
        // Offset of the next byte in `bytes` that has not been handed out yet.
        pos: usize,
    }

    impl<'a> Bytes<'a> {
        /// Wrap `bytes`, starting at its first byte.
        pub fn new(bytes: &'a [u8]) -> Self {
            Bytes { bytes, pos: 0 }
        }
    }

    impl<'a> BufParseInput for Bytes<'a> {
        // Reading from memory cannot fail.
        type Error = ();

        /// Copy as many of the remaining bytes as fit into `buf`; returns 0 once all bytes
        /// have been handed out.
        fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
            let to_read = min(self.bytes.len() - self.pos, buf.len());
            buf[..to_read].copy_from_slice(&self.bytes[self.pos..self.pos + to_read]);
            self.pos += to_read;
            Ok(to_read)
        }
    }

    /// Parse `input` completely, reading it in chunks of `chunk_size`, and return the
    /// `(name, value)` pairs of all fields of all paragraphs in order.
    ///
    /// Panics (via `unwrap`) on any parse or read error.
    fn parse_input(input: &[u8], chunk_size: usize) -> Vec<(String, String)> {
        let mut parser = BufParse::new(Bytes::new(input), chunk_size);
        let mut fields = vec![];
        while let Some(result) = parser.try_next().unwrap() {
            match result {
                Streaming::Item(paragraph) => {
                    fields.extend(
                        paragraph
                            .fields
                            .into_iter()
                            .map(|field| (field.name.to_string(), field.value)),
                    );
                }
                // Not enough buffered input yet: read the next chunk and try again.
                Streaming::Incomplete => parser.buffer().unwrap(),
            }
        }
        fields
    }

    // The chunk size is larger than the input, so a single read delivers everything.
    #[test]
    fn should_parse_input_in_a_single_chunk() {
        let result = parse_input(
            indoc!(
                "field: value
 another-field: value"
            )
            .as_bytes(),
            1000,
        );
        assert_eq!(
            result,
            vec![
                ("field".to_string(), "value".to_string()),
                ("another-field".to_string(), "value".to_string())
            ]
        );
    }

    // "12345:" is 6 bytes and every umlaut takes 2 bytes, so with a chunk size of 7 the chunk
    // boundaries fall in the middle of multi-byte characters.
    #[test]
    fn should_handle_partial_utf8_on_chunk_boundary() {
        let result = parse_input("12345:äöüöäüääöüäöäüöüöä".as_bytes(), 7);
        assert_eq!(
            result,
            vec![("12345".to_string(), "äöüöäüääöüäöäüöüöä".to_string())]
        );
    }

    // The first chunk of 8 bytes ends after the first byte of "ä". Once the first paragraph
    // has been consumed, the truncated character has to be located relative to a non-zero
    // parser position.
    #[test]
    fn should_handle_partial_utf8_after_advancing_position() {
        let result = parse_input("1:2\n\n3:äöü".as_bytes(), 8);
        assert_eq!(
            result,
            vec![
                ("1".to_string(), "2".to_string()),
                ("3".to_string(), "äöü".to_string()),
            ]
        );
    }

    // After the first read the parser cannot know that the input is complete; only the second
    // read, which yields 0 bytes, marks it as exhausted and lets the paragraph be finished.
    #[test]
    fn should_need_to_buffer_at_least_twice_for_nonempty_input() {
        let mut parse = BufParse::new(Bytes::new(b"a: b"), 100);
        parse.buffer().unwrap();
        assert_matches!(parse.try_next(), Ok(Some(Streaming::Incomplete)));
        parse.buffer().unwrap();
        assert_matches!(parse.try_next(), Ok(Some(Streaming::Item(_))));
        assert_matches!(parse.try_next(), Ok(None));
    }

    // Reaching the end of the input is a stable state: `try_next` keeps returning `None`.
    #[test]
    fn should_keep_returning_none_when_input_is_exhausted() {
        let mut parse = BufParse::new(Bytes::new(b""), 10);
        parse.buffer().unwrap();
        assert_matches!(parse.try_next(), Ok(None));
        assert_matches!(parse.try_next(), Ok(None));
        assert_matches!(parse.try_next(), Ok(None));
    }

    // `\xe2\x82\x28` is an invalid sequence (not merely a truncated one), so it is reported
    // right away, and again on every further attempt.
    #[test]
    fn should_fail_on_invalid_utf8_inside_chunk() {
        let mut parse = BufParse::new(Bytes::new(b"abc: a\xe2\x82\x28bcd efgh"), 100);
        parse.buffer().unwrap();
        assert_matches!(parse.try_next(), Err(BufParseError::InvalidUtf8(_)));
        assert_matches!(parse.try_next(), Err(BufParseError::InvalidUtf8(_)));
    }

    // The first 8-byte chunk ends right after `\xe2`, which looks like a truncated character
    // and is tolerated. The second chunk reveals that the sequence is invalid.
    #[test]
    fn should_fail_on_invalid_utf8_on_chunk_border() {
        let mut parse = BufParse::new(Bytes::new(b"abc: ab\xe2\x82\x28bcd efgh"), 8);
        parse.buffer().unwrap();
        assert_matches!(parse.try_next(), Ok(Some(Streaming::Incomplete)));
        parse.buffer().unwrap();
        assert_matches!(parse.try_next(), Err(BufParseError::InvalidUtf8(_)));
    }

    // An invalid sequence at the very end of the input is reported without waiting for more
    // input.
    #[test]
    fn should_fail_on_trailing_invalid_utf8() {
        let mut parse = BufParse::new(Bytes::new(b"abc: a\xe2\x82\x28"), 100);
        parse.buffer().unwrap();
        assert_matches!(parse.try_next(), Err(BufParseError::InvalidUtf8(_)));
    }

    // A truncated character at the end is tolerated while more input may still arrive, but
    // becomes an error once the input is exhausted.
    #[test]
    fn should_fail_on_trailing_partial_utf8() {
        let mut parse = BufParse::new(Bytes::new(b"abc: a\xe2\x82"), 100);
        parse.buffer().unwrap();
        assert_matches!(parse.try_next(), Ok(Some(Streaming::Incomplete)));
        parse.buffer().unwrap();
        assert_matches!(parse.try_next(), Err(BufParseError::InvalidUtf8(_)));
    }

    // `into_inner` hands back the input untouched when nothing has been read from it.
    #[test]
    fn should_return_inner() {
        let input = Bytes::new(b"abcd");
        let parse = BufParse::new(input.clone(), 100);
        let inner = parse.into_inner();
        assert_eq!(inner, input);
    }
}