use crate::{parse_finish, parse_streaming, Paragraph, Streaming, SyntaxError};
use alloc::vec::Vec;
use core::{
fmt,
str::{from_utf8, Utf8Error},
};
/// A source of raw bytes that [`BufParse`] can pull its input from.
pub trait BufParseInput {
/// Error produced when a read fails; [`BufParse::buffer`] returns it unchanged.
type Error;
/// Fills `buf` with input bytes and returns how many bytes were written.
///
/// [`BufParse::buffer`] treats a return value of `0` as the end of the input.
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>;
}
/// Lets every [`std::io::Read`] implementor (sized or not) act as a [`BufParseInput`].
#[cfg(feature = "std")]
impl<R: std::io::Read + ?Sized> BufParseInput for R {
    type Error = std::io::Error;

    /// Forwards directly to [`std::io::Read::read`].
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
        // Spell out the trait so the call is visibly the `io::Read` method and
        // not a recursive call into `BufParseInput::read`.
        <R as std::io::Read>::read(self, buf)
    }
}
/// Errors that [`BufParse::try_next`] can report.
#[derive(Debug)]
pub enum BufParseError<'a> {
/// The buffered bytes could not be decoded as UTF-8.
InvalidUtf8(Utf8Error),
/// The parser rejected the decoded text.
InvalidSyntax(SyntaxError<'a>),
}
impl fmt::Display for BufParseError<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
BufParseError::InvalidUtf8(err) => write!(f, "invalid utf-8 in input: {}", err),
BufParseError::InvalidSyntax(err) => write!(f, "invalid syntax: {}", err),
}
}
}
impl<'a> From<Utf8Error> for BufParseError<'a> {
fn from(err: Utf8Error) -> Self {
BufParseError::InvalidUtf8(err)
}
}
impl<'a> From<SyntaxError<'a>> for BufParseError<'a> {
fn from(err: SyntaxError<'a>) -> Self {
BufParseError::InvalidSyntax(err)
}
}
/// Standard error integration, available only with the `std` feature.
#[cfg(feature = "std")]
impl std::error::Error for BufParseError<'_> {
    /// Exposes the wrapped [`Utf8Error`] as the source; syntax errors report no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            // NOTE(review): presumably omitted because `SyntaxError<'a>` carries a
            // borrowed lifetime and so cannot satisfy the `'static` bound — confirm.
            Self::InvalidSyntax(_) => None,
            Self::InvalidUtf8(cause) => Some(cause),
        }
    }
}
/// Incremental parser that reads its input chunk by chunk from an `R`.
///
/// Call `buffer` to load more input and `try_next` to take paragraphs out of
/// what has been loaded so far.
#[derive(Debug)]
pub struct BufParse<R> {
/// Number of bytes requested from the input on each `buffer` call.
chunk_size: usize,
/// Bytes read from the input that have not been discarded yet.
buf: Vec<u8>,
/// Offset into `buf` of the first byte that has not been parsed yet.
pos: usize,
/// The underlying input.
read: R,
/// Set once a read returned zero bytes.
exhausted: bool,
}
impl<R: BufParseInput> BufParse<R> {
    /// Creates a parser that pulls from `read` in chunks of `chunk_size` bytes.
    ///
    /// Nothing is read here; call [`buffer`](Self::buffer) to load input. The
    /// internal buffer is allocated with room for one chunk up front. A
    /// `chunk_size` of zero hands the reader an empty slice on every call, so
    /// pick a positive value.
    pub fn new(read: R, chunk_size: usize) -> Self {
        BufParse {
            chunk_size,
            buf: Vec::with_capacity(chunk_size),
            pos: 0,
            read,
            exhausted: false,
        }
    }

    /// Reads up to `chunk_size` more bytes from the input into the internal buffer.
    ///
    /// When the spare capacity is smaller than one chunk, the bytes that
    /// [`try_next`](Self::try_next) has already consumed are dropped from the
    /// front of the buffer first. A read that yields zero bytes marks the
    /// input as exhausted.
    ///
    /// # Errors
    ///
    /// Returns the reader's error unchanged. The zero padding that was added
    /// to make room for the read is removed again, so no bytes that the reader
    /// did not report end up being parsed later.
    pub fn buffer(&mut self) -> Result<(), R::Error> {
        let size = self.chunk_size;

        // Reclaim the already-parsed prefix before growing the allocation.
        if self.buf.capacity() - self.buf.len() < size {
            self.buf.drain(..self.pos);
            self.pos = 0;
        }

        let end = self.buf.len();
        // Temporarily extend with zeroes so the reader has a slice to fill.
        self.buf.resize(end + size, 0);
        let outcome = self.read.read(&mut self.buf[end..]);
        let read = match outcome {
            Ok(read) => read,
            Err(err) => {
                // Without this the zero padding would stay in the buffer and
                // be treated as real input by the next `try_next` call.
                self.buf.truncate(end);
                return Err(err);
            }
        };
        // Keep only the bytes the reader actually produced.
        self.buf.truncate(end + read);

        if read == 0 {
            self.exhausted = true;
        }
        Ok(())
    }

    /// Tries to parse the next paragraph from the bytes buffered so far.
    ///
    /// Returns:
    /// - `Ok(Some(Streaming::Item(paragraph)))` when a paragraph was parsed;
    ///   the consumed bytes are skipped on the next call.
    /// - `Ok(Some(Streaming::Incomplete))` when the parser needs more input and
    ///   the input is not yet exhausted; call [`buffer`](Self::buffer) and retry.
    /// - Once the input is exhausted, whatever `parse_finish` yields for the
    ///   remaining text: a final item, or `Ok(None)` when nothing is left.
    ///
    /// # Errors
    ///
    /// [`BufParseError::InvalidUtf8`] if the unparsed bytes are not valid UTF-8
    /// (an incomplete trailing sequence is tolerated until the input is
    /// exhausted), and [`BufParseError::InvalidSyntax`] if the parser rejects
    /// the text.
    pub fn try_next(&mut self) -> Result<Option<Streaming<Paragraph>>, BufParseError> {
        let input = self.as_longest_utf8(&self.buf)?;
        match parse_streaming(input)? {
            Streaming::Item((rest, paragraph)) => {
                // Advance past exactly the text the parser consumed.
                let parsed = input.len() - rest.len();
                self.pos += parsed;
                Ok(Some(Streaming::Item(paragraph)))
            }
            Streaming::Incomplete => {
                if self.exhausted {
                    // No more bytes will arrive, so a cut-off UTF-8 sequence is
                    // now a hard error: decode strictly and finish parsing.
                    let input = self.as_utf8(&self.buf)?;
                    let result = parse_finish(input)?;
                    self.pos += input.len();
                    Ok(result.map(Streaming::Item))
                } else {
                    Ok(Some(Streaming::Incomplete))
                }
            }
        }
    }

    /// Consumes the parser and returns the wrapped input.
    ///
    /// Bytes that were buffered but not yet parsed are dropped with the parser.
    pub fn into_inner(self) -> R {
        self.read
    }

    /// Decodes the unparsed part of `buf`, tolerating a character cut off at the end.
    ///
    /// If strict decoding fails only because the data ends in the middle of a
    /// multi-byte sequence (`error_len()` is `None`), the valid prefix is
    /// returned. A sequence that is definitely invalid is reported as an error.
    fn as_longest_utf8<'a>(&'_ self, buf: &'a [u8]) -> Result<&'a str, Utf8Error> {
        self.as_utf8(buf).or_else(|err| match err.error_len() {
            Some(_) => Err(err),
            None => {
                let valid = &buf[self.pos..self.pos + err.valid_up_to()];
                from_utf8(valid)
            }
        })
    }

    /// Strictly decodes everything in `buf` from the current parse position onward.
    fn as_utf8<'a>(&'_ self, buf: &'a [u8]) -> Result<&'a str, Utf8Error> {
        from_utf8(&buf[self.pos..])
    }
}
// Unit tests for `BufParse`, driven by an in-memory reader.
#[cfg(test)]
mod tests {
use super::*;
use alloc::{
string::{String, ToString},
vec,
};
use assert_matches::assert_matches;
use core::cmp::min;
use indoc::indoc;
// In-memory `BufParseInput` that hands out a byte slice front to back.
#[derive(Debug, PartialEq, Eq, Clone)]
struct Bytes<'a> {
// The complete input.
bytes: &'a [u8],
// Number of bytes already handed out by `read`.
pos: usize,
}
impl<'a> Bytes<'a> {
// Starts reading at the beginning of `bytes`.
pub fn new(bytes: &'a [u8]) -> Self {
Bytes { bytes, pos: 0 }
}
}
impl<'a> BufParseInput for Bytes<'a> {
type Error = ();
// Copies as many remaining bytes as fit into `buf`; always returns `Ok`.
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
let to_read = min(self.bytes.len() - self.pos, buf.len());
buf[..to_read].copy_from_slice(&self.bytes[self.pos..self.pos + to_read]);
self.pos += to_read;
Ok(to_read)
}
}
// Runs a `BufParse` over `input` until it returns `None`, calling `buffer`
// whenever the parser reports `Incomplete`, and collects every field as a
// `(name, value)` pair. Panics (via `unwrap`) on any parse error.
fn parse_input(input: &[u8], chunk_size: usize) -> Vec<(String, String)> {
let mut parser = BufParse::new(Bytes::new(input), chunk_size);
let mut fields = vec![];
while let Some(result) = parser.try_next().unwrap() {
match result {
Streaming::Item(paragraph) => {
fields.extend(
paragraph
.fields
.into_iter()
.map(|field| (field.name.to_string(), field.value)),
);
}
Streaming::Incomplete => parser.buffer().unwrap(),
}
}
fields
}
// The chunk size (1000) is larger than the whole input.
#[test]
fn should_parse_input_in_a_single_chunk() {
let result = parse_input(
indoc!(
"field: value
another-field: value"
)
.as_bytes(),
1000,
);
assert_eq!(
result,
vec![
("field".to_string(), "value".to_string()),
("another-field".to_string(), "value".to_string())
]
);
}
// "12345:" is six bytes and each umlaut is two, so a chunk size of 7 cuts the
// first "ä" in half at the end of the first read.
#[test]
fn should_handle_partial_utf8_on_chunk_boundary() {
let result = parse_input("12345:äöüöäüääöüäöäüöüöä".as_bytes(), 7);
assert_eq!(
result,
vec![("12345".to_string(), "äöüöäüääöüäöäüöüöä".to_string())]
);
}
// "1:2\n\n3:" is seven bytes, so with a chunk size of 8 the first read ends on
// the first byte of "ä". Here the first paragraph precedes the split character
// inside that same chunk.
#[test]
fn should_handle_partial_utf8_after_advancing_position() {
let result = parse_input("1:2\n\n3:äöü".as_bytes(), 8);
assert_eq!(
result,
vec![
("1".to_string(), "2".to_string()),
("3".to_string(), "äöü".to_string()),
]
);
}
// The first `buffer` call loads all four bytes, yet `try_next` still reports
// `Incomplete`. Only the second `buffer` call, which reads zero bytes and marks
// the input as exhausted, lets the final paragraph be returned.
#[test]
fn should_need_to_buffer_at_least_twice_for_nonempty_input() {
let mut parse = BufParse::new(Bytes::new(b"a: b"), 100);
parse.buffer().unwrap();
assert_matches!(parse.try_next(), Ok(Some(Streaming::Incomplete)));
parse.buffer().unwrap();
assert_matches!(parse.try_next(), Ok(Some(Streaming::Item(_))));
assert_matches!(parse.try_next(), Ok(None));
}
// With empty input a single `buffer` call exhausts the reader; every following
// `try_next` call must report `None`.
#[test]
fn should_keep_returning_none_when_input_is_exhausted() {
let mut parse = BufParse::new(Bytes::new(b""), 10);
parse.buffer().unwrap();
assert_matches!(parse.try_next(), Ok(None));
assert_matches!(parse.try_next(), Ok(None));
assert_matches!(parse.try_next(), Ok(None));
}
// `\xe2\x82\x28` is not valid UTF-8 (`\x28` is not a continuation byte). The
// whole input fits in one chunk and the error is reported on repeated calls.
#[test]
fn should_fail_on_invalid_utf8_inside_chunk() {
let mut parse = BufParse::new(Bytes::new(b"abc: a\xe2\x82\x28bcd efgh"), 100);
parse.buffer().unwrap();
assert_matches!(parse.try_next(), Err(BufParseError::InvalidUtf8(_)));
assert_matches!(parse.try_next(), Err(BufParseError::InvalidUtf8(_)));
}
// With a chunk size of 8 the first read ends on `\xe2`, which on its own only
// looks like a cut-off character (`Incomplete`). The second read delivers the
// rest of the sequence, which then fails to decode.
#[test]
fn should_fail_on_invalid_utf8_on_chunk_border() {
let mut parse = BufParse::new(Bytes::new(b"abc: ab\xe2\x82\x28bcd efgh"), 8);
parse.buffer().unwrap();
assert_matches!(parse.try_next(), Ok(Some(Streaming::Incomplete)));
parse.buffer().unwrap();
assert_matches!(parse.try_next(), Err(BufParseError::InvalidUtf8(_)));
}
// The invalid sequence sits at the very end of the input and is reported on the
// first `try_next` call, before the input is known to be exhausted.
#[test]
fn should_fail_on_trailing_invalid_utf8() {
let mut parse = BufParse::new(Bytes::new(b"abc: a\xe2\x82\x28"), 100);
parse.buffer().unwrap();
assert_matches!(parse.try_next(), Err(BufParseError::InvalidUtf8(_)));
}
// The input ends in the middle of a multi-byte sequence. This is tolerated
// (`Incomplete`) until a zero-byte read marks the input as exhausted, after
// which it is an error.
#[test]
fn should_fail_on_trailing_partial_utf8() {
let mut parse = BufParse::new(Bytes::new(b"abc: a\xe2\x82"), 100);
parse.buffer().unwrap();
assert_matches!(parse.try_next(), Ok(Some(Streaming::Incomplete)));
parse.buffer().unwrap();
assert_matches!(parse.try_next(), Err(BufParseError::InvalidUtf8(_)));
}
// `into_inner` hands back the reader untouched when nothing has been buffered.
#[test]
fn should_return_inner() {
let input = Bytes::new(b"abcd");
let parse = BufParse::new(input.clone(), 100);
let inner = parse.into_inner();
assert_eq!(inner, input);
}
}