use core::str::Utf8Error;
use bytes::{Buf, Bytes, BytesMut};
use bytes_utils::Str;
use super::constants::{CR, LF};
#[derive(Debug, Clone, Copy)]
pub(crate) enum RawEventLine<'a> {
Comment,
Field {
field_name: &'a [u8],
field_value: Option<&'a [u8]>,
},
Empty,
}
#[derive(Debug, Clone)]
pub(crate) enum RawEventLineOwned {
Comment,
Empty,
Field {
field_name: Bytes,
field_value: Option<Bytes>,
},
}
#[derive(Debug, Clone, Copy)]
pub(crate) enum FieldName {
Event,
Data,
Id,
Retry,
Ignored,
}
#[derive(Debug, Clone)]
pub(crate) enum ValidatedEventLine {
Comment,
Empty,
Field {
field_name: FieldName,
field_value: Option<Str>,
},
}
fn validate_bytes(val: Bytes) -> Result<Str, Utf8Error> {
match str::from_utf8(val.as_ref()) {
Ok(_) => {
Ok(unsafe { Str::from_inner_unchecked(val) })
}
Err(e) => Err(e),
}
}
impl RawEventLineOwned {
pub(crate) fn validate(self) -> Result<ValidatedEventLine, core::str::Utf8Error> {
match self {
Self::Comment => Ok(ValidatedEventLine::Comment),
Self::Empty => Ok(ValidatedEventLine::Empty),
Self::Field {
field_name,
field_value,
} => {
let field_name = match field_name.as_ref() {
b"event" => FieldName::Event,
b"data" => FieldName::Data,
b"id" => FieldName::Id,
b"retry" => FieldName::Retry,
_ => FieldName::Ignored,
};
let field_value = match field_value {
Some(b) => Some(validate_bytes(b)?),
None => None,
};
Ok(ValidatedEventLine::Field {
field_name,
field_value,
})
}
}
}
}
fn find_eol(bytes: &[u8]) -> Option<(usize, usize)> {
let first_match = memchr::memchr2(CR, LF, bytes)?;
match bytes[first_match] {
LF => Some((first_match, first_match + 1)),
CR => {
if first_match + 1 >= bytes.len() {
return None; }
if bytes[first_match + 1] == LF {
Some((first_match, first_match + 2))
} else {
Some((first_match, first_match + 1))
}
}
_ => unreachable!(),
}
}
fn read_line(bytes: &[u8]) -> RawEventLine<'_> {
match memchr::memchr(b':', bytes) {
Some(colon_pos) => {
if colon_pos == 0 {
RawEventLine::Comment
} else {
let value = &bytes[colon_pos + 1..];
let value = match value {
[b' ', rest @ ..] => rest,
_ => value,
};
RawEventLine::Field {
field_name: &bytes[..colon_pos],
field_value: Some(value),
}
}
}
None => {
if bytes.is_empty() {
RawEventLine::Empty
} else {
RawEventLine::Field {
field_name: bytes,
field_value: None,
}
}
}
}
}
pub(crate) fn parse_line_from_buffer(buffer: &mut BytesMut) -> Option<RawEventLineOwned> {
let (line_end, rem_start) = find_eol(buffer)?;
let line = buffer.split_to(line_end).freeze();
buffer.advance(rem_start - line_end);
match read_line(&line) {
RawEventLine::Field {
field_name,
field_value,
} => {
let field_name = line.slice_ref(field_name);
let field_value = field_value.map(|field_value| line.slice_ref(field_value));
Some(RawEventLineOwned::Field {
field_name,
field_value,
})
}
RawEventLine::Comment => Some(RawEventLineOwned::Comment),
RawEventLine::Empty => Some(RawEventLineOwned::Empty),
}
}