#![deny(missing_docs)]
/// A single event assembled from the fields of an event stream.
///
/// Every field starts out at its `Default` value (`None` / empty string) and
/// is only changed when the corresponding field line is seen.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Event {
/// Value of the most recent `event` field, if any was present.
pub event: Option<String>,
/// Payload built from the `data` fields; multiple `data` lines are joined
/// with `\n`.
pub data: String,
/// Value of the most recent `id` field, if any was present.
pub id: Option<String>,
/// Value of the `retry` field parsed as an unsigned integer, if one was
/// present and parsed successfully.
pub retry_ms: Option<u64>,
}
/// Incremental parser for a `text/event-stream` byte stream.
///
/// Feed arbitrary chunks of bytes to [`Parser::push`]; chunk boundaries may
/// fall anywhere, including in the middle of a line or between the two bytes
/// of a CRLF pair. Completed events are returned as soon as the blank line
/// that terminates them has been seen.
#[derive(Debug, Default)]
pub struct Parser {
    /// Bytes of the current line that has not been terminated yet.
    line_buf: Vec<u8>,
    /// Event being assembled from the fields seen since the last dispatch.
    /// While pending, every `data` line is stored with a trailing `\n`.
    pending: Event,
    /// Whether `pending` has received at least one accepted field.
    has_pending: bool,
    /// Whether the previous byte was a CR, in which case a directly
    /// following LF belongs to the same CRLF terminator.
    after_cr: bool,
}

impl Parser {
    /// Creates an empty parser with no buffered bytes and no pending event.
    pub fn new() -> Self {
        Self::default()
    }

    /// Feeds `bytes` into the parser and returns every event completed by them.
    ///
    /// Lines may be terminated by LF, CRLF or a lone CR. Bytes of an
    /// unterminated trailing line are kept and completed by a later call.
    pub fn push(&mut self, bytes: &[u8]) -> Vec<Event> {
        let mut out = Vec::new();
        for &b in bytes {
            // Remember CR across calls so a CRLF split over two chunks still
            // counts as a single line terminator.
            let follows_cr = std::mem::replace(&mut self.after_cr, b == b'\r');
            match b {
                // Second half of a CRLF pair: the line was already consumed.
                b'\n' if follows_cr => {}
                b'\n' | b'\r' => self.consume_line(&mut out),
                _ => self.line_buf.push(b),
            }
        }
        out
    }

    /// Returns the event assembled so far, if any, without waiting for the
    /// blank line that would normally dispatch it.
    ///
    /// Bytes of an unterminated line are not part of the returned event and
    /// stay buffered.
    pub fn flush(&mut self) -> Option<Event> {
        self.take_pending()
    }

    /// Takes the pending event, leaving the parser ready to assemble the next one.
    fn take_pending(&mut self) -> Option<Event> {
        if !std::mem::take(&mut self.has_pending) {
            return None;
        }
        let mut ev = std::mem::take(&mut self.pending);
        // Every `data` line was stored with a trailing LF; only the last one
        // is a terminator rather than a separator.
        if ev.data.ends_with('\n') {
            ev.data.pop();
        }
        Some(ev)
    }

    /// Handles the completed line in `line_buf`: a blank line dispatches the
    /// pending event, anything else is parsed as `name[:[ ]value]`.
    fn consume_line(&mut self, out: &mut Vec<Event>) {
        if self.line_buf.is_empty() {
            out.extend(self.take_pending());
            return;
        }
        // Move the buffer out so it can be read while `self` is mutated, then
        // hand it back cleared to reuse its allocation.
        let mut line = std::mem::take(&mut self.line_buf);
        let (name, value) = match line.iter().position(|&c| c == b':') {
            Some(colon) => {
                let rest = &line[colon + 1..];
                // Exactly one leading space after the colon is not part of the value.
                let value = match rest.split_first() {
                    Some((&b' ', tail)) => tail,
                    _ => rest,
                };
                (&line[..colon], value)
            }
            // A line without a colon is a field name with an empty value.
            None => (&line[..], &line[..0]),
        };
        self.apply_field(name, value);
        line.clear();
        self.line_buf = line;
    }

    /// Applies one field to the pending event.
    ///
    /// Comment lines (empty name), unknown field names and malformed values
    /// are ignored and do not mark an event as pending.
    fn apply_field(&mut self, name: &[u8], value: &[u8]) {
        let text = || String::from_utf8_lossy(value).into_owned();
        match name {
            b"event" => self.pending.event = Some(text()),
            b"data" => {
                self.pending.data.push_str(&String::from_utf8_lossy(value));
                self.pending.data.push('\n');
            }
            b"id" => {
                // An id containing NUL is ignored.
                if value.contains(&0) {
                    return;
                }
                self.pending.id = Some(text());
            }
            b"retry" => {
                // Only plain ASCII digits are accepted; anything else (sign,
                // whitespace, overflow) leaves a previously seen value alone.
                let digits_only = !value.is_empty() && value.iter().all(u8::is_ascii_digit);
                let parsed = std::str::from_utf8(value)
                    .ok()
                    .filter(|_| digits_only)
                    .and_then(|s| s.parse::<u64>().ok());
                match parsed {
                    Some(ms) => self.pending.retry_ms = Some(ms),
                    None => return,
                }
            }
            _ => return,
        }
        self.has_pending = true;
    }
}