use crate::StateChange;
use super::Changes;
const MAX_EVENT_SIZE: usize = 1024 * 1024;
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum EventType {
Ping,
State,
}
impl Default for EventType {
fn default() -> Self {
Self::State
}
}
#[derive(Default, Debug)]
pub struct Event {
pub event: EventType,
pub id: Vec<u8>,
pub data: Vec<u8>,
}
#[derive(Debug, Copy, Clone)]
enum EventParserState {
Init,
Comment,
Field,
Value,
}
impl Default for EventParserState {
fn default() -> Self {
Self::Init
}
}
#[derive(Default, Debug)]
pub struct EventParser {
state: EventParserState,
field: Vec<u8>,
value: Vec<u8>,
bytes: Option<Vec<u8>>,
pos: usize,
result: Event,
}
impl EventParser {
pub fn push_bytes(&mut self, bytes: Vec<u8>) {
self.bytes = Some(bytes);
}
pub fn needs_bytes(&self) -> bool {
self.bytes.is_none()
}
pub fn filter_state(&mut self) -> Option<crate::Result<Changes>> {
#[allow(clippy::while_let_on_iterator)]
while let Some(event) = self.next() {
match event {
Ok(Event {
event: EventType::State,
data,
id,
..
}) => {
return match serde_json::from_slice::<StateChange>(&data) {
Ok(state_change) => Some(Ok(Changes {
id: if !id.is_empty() {
Some(String::from_utf8(id).unwrap_or_default())
} else {
None
},
changes: state_change.changed,
})),
Err(err) => Some(Err(err.into())),
};
}
Ok(Event {
event: EventType::Ping,
#[cfg(feature = "debug")]
id,
..
}) => {
#[cfg(feature = "debug")]
use std::iter::FromIterator;
#[cfg(feature = "debug")]
return Some(Ok(Changes {
id: if !id.is_empty() {
Some(String::from_utf8(id).unwrap_or_default())
} else {
None
},
changes: ahash::AHashMap::from_iter([(
"ping".to_string(),
ahash::AHashMap::new(),
)]),
}));
}
Err(err) => return Some(Err(err)),
}
}
None
}
}
impl Iterator for EventParser {
type Item = crate::Result<Event>;
fn next(&mut self) -> Option<Self::Item> {
let bytes = self.bytes.as_ref()?;
for byte in bytes.get(self.pos..)? {
self.pos += 1;
match self.state {
EventParserState::Init => match byte {
b':' => {
self.state = EventParserState::Comment;
}
b'\r' | b' ' => (),
b'\n' => {
return Some(Ok(std::mem::take(&mut self.result)));
}
_ => {
self.state = EventParserState::Field;
self.field.push(*byte);
}
},
EventParserState::Comment => {
if *byte == b'\n' {
self.state = EventParserState::Init;
}
}
EventParserState::Field => match byte {
b'\r' => (),
b'\n' => {
self.state = EventParserState::Init;
self.field.clear();
}
b':' => {
self.state = EventParserState::Value;
}
_ => {
if self.field.len() >= MAX_EVENT_SIZE {
return Some(Err(crate::Error::Internal(
"EventSource response is too long.".to_string(),
)));
}
self.field.push(*byte);
}
},
EventParserState::Value => match byte {
b'\r' => (),
b' ' if self.value.is_empty() => (),
b'\n' => {
self.state = EventParserState::Init;
match &self.field[..] {
b"id" => {
self.result.id.extend_from_slice(&self.value);
}
b"data" => {
self.result.data.extend_from_slice(&self.value);
}
b"event" => {
if self.value == b"ping" {
self.result.event = EventType::Ping;
} else {
self.result.event = EventType::State;
}
}
_ => {
}
}
self.field.clear();
self.value.clear();
}
_ => {
if (self.field.len() + self.value.len()) >= MAX_EVENT_SIZE {
return Some(Err(crate::Error::Internal(
"EventSource response is too long.".to_string(),
)));
}
self.value.push(*byte);
}
},
}
}
self.bytes = None;
self.pos = 0;
None
}
}
#[cfg(test)]
mod tests {
use super::{Event, EventType};
#[derive(Debug, PartialEq, Eq)]
struct EventString {
event: EventType,
id: String,
data: String,
}
impl From<Event> for EventString {
fn from(event: Event) -> Self {
Self {
event: event.event,
id: String::from_utf8(event.id).unwrap(),
data: String::from_utf8(event.data).unwrap(),
}
}
}
#[test]
fn parse() {
let mut parser = super::EventParser::default();
let mut results = Vec::new();
for frame in [
Vec::from("event: state\nid: 0\ndata: test\n\n"),
Vec::from("event: ping\nid:123\ndata: ping pa"),
Vec::from("yload"),
Vec::from("\n\n"),
Vec::from(":comment\n\n"),
Vec::from("data: YHOO\n"),
Vec::from("data: +2\n"),
Vec::from("data: 10\n\n"),
Vec::from(": test stream\n"),
Vec::from("data: first event\n"),
Vec::from("id: 1\n\n"),
Vec::from("data:second event\n"),
Vec::from("id\n\n"),
Vec::from("data: third event\n\n"),
Vec::from("data:hello\n\ndata: world\n\n"),
] {
parser.push_bytes(frame);
#[allow(clippy::while_let_on_iterator)]
while let Some(event) = parser.next() {
results.push(EventString::from(event.unwrap()));
}
}
assert_eq!(
results,
vec![
EventString {
event: EventType::State,
id: "0".to_string(),
data: "test".to_string()
},
EventString {
event: EventType::Ping,
id: "123".to_string(),
data: "ping payload".to_string()
},
EventString {
event: EventType::State,
id: "".to_string(),
data: "".to_string()
},
EventString {
event: EventType::State,
id: "".to_string(),
data: "YHOO+210".to_string()
},
EventString {
event: EventType::State,
id: "1".to_string(),
data: "first event".to_string()
},
EventString {
event: EventType::State,
id: "".to_string(),
data: "second event".to_string()
},
EventString {
event: EventType::State,
id: "".to_string(),
data: "third event".to_string()
},
EventString {
event: EventType::State,
id: "".to_string(),
data: "hello".to_string()
},
EventString {
event: EventType::State,
id: "".to_string(),
data: "world".to_string()
}
]
);
}
}