#[cfg(not(feature = "std"))]
use alloc::string::{FromUtf8Error, String, ToString};
#[cfg(feature = "std")]
use std::string::FromUtf8Error;
use crate::event::Event;
use crate::parser::{is_bom, is_lf, line, RawEventLine};
use crate::utf8_stream::{Utf8Stream, Utf8StreamError};
use core::fmt;
use core::pin::Pin;
use core::time::Duration;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use nom::error::Error as NomError;
use pin_project_lite::pin_project;
#[derive(Default, Debug)]
struct EventBuilder {
event: Event,
is_complete: bool,
}
impl EventBuilder {
fn add(&mut self, line: RawEventLine) {
match line {
RawEventLine::Field(field, val) => {
let val = val.unwrap_or("");
match field {
"event" => {
self.event.event = val.to_string();
}
"data" => {
self.event.data.push_str(val);
self.event.data.push('\u{000A}');
}
"id" => {
if !val.contains('\u{0000}') {
self.event.id = val.to_string()
}
}
"retry" => {
if let Ok(val) = val.parse::<u64>() {
self.event.retry = Some(Duration::from_millis(val))
}
}
_ => {}
}
}
RawEventLine::Comment(_) => {}
RawEventLine::Empty => self.is_complete = true,
}
}
fn dispatch(&mut self) -> Option<Event> {
let builder = core::mem::take(self);
let mut event = builder.event;
self.event.id = event.id.clone();
if event.data.is_empty() {
return None;
}
if is_lf(event.data.chars().next_back().unwrap()) {
event.data.pop();
}
if event.event.is_empty() {
event.event = "message".to_string();
}
Some(event)
}
}
#[derive(Debug, Clone, Copy)]
pub enum EventStreamState {
NotStarted,
Started,
Terminated,
}
impl EventStreamState {
fn is_terminated(self) -> bool {
matches!(self, Self::Terminated)
}
fn is_started(self) -> bool {
matches!(self, Self::Started)
}
}
pin_project! {
pub struct EventStream<S> {
#[pin]
stream: Utf8Stream<S>,
buffer: String,
builder: EventBuilder,
state: EventStreamState,
last_event_id: String,
}
}
impl<S> EventStream<S> {
pub fn new(stream: S) -> Self {
Self {
stream: Utf8Stream::new(stream),
buffer: String::new(),
builder: EventBuilder::default(),
state: EventStreamState::NotStarted,
last_event_id: String::new(),
}
}
pub fn set_last_event_id(&mut self, id: impl Into<String>) {
self.last_event_id = id.into();
}
pub fn last_event_id(&self) -> &str {
&self.last_event_id
}
}
#[derive(Debug, PartialEq)]
pub enum EventStreamError<E> {
Utf8(FromUtf8Error),
Parser(NomError<String>),
Transport(E),
}
impl<E> From<Utf8StreamError<E>> for EventStreamError<E> {
fn from(err: Utf8StreamError<E>) -> Self {
match err {
Utf8StreamError::Utf8(err) => Self::Utf8(err),
Utf8StreamError::Transport(err) => Self::Transport(err),
}
}
}
impl<E> From<NomError<&str>> for EventStreamError<E> {
fn from(err: NomError<&str>) -> Self {
EventStreamError::Parser(NomError::new(err.input.to_string(), err.code))
}
}
impl<E> fmt::Display for EventStreamError<E>
where
E: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Utf8(err) => f.write_fmt(format_args!("UTF8 error: {}", err)),
Self::Parser(err) => f.write_fmt(format_args!("Parse error: {}", err)),
Self::Transport(err) => f.write_fmt(format_args!("Transport error: {}", err)),
}
}
}
#[cfg(feature = "std")]
impl<E> std::error::Error for EventStreamError<E> where E: fmt::Display + fmt::Debug + Send + Sync {}
fn parse_event<E>(
buffer: &mut String,
builder: &mut EventBuilder,
) -> Result<Option<Event>, EventStreamError<E>> {
if buffer.is_empty() {
return Ok(None);
}
loop {
match line(buffer.as_ref()) {
Ok((rem, next_line)) => {
builder.add(next_line);
let consumed = buffer.len() - rem.len();
let rem = buffer.split_off(consumed);
*buffer = rem;
if builder.is_complete {
if let Some(event) = builder.dispatch() {
return Ok(Some(event));
}
}
}
Err(nom::Err::Incomplete(_)) => return Ok(None),
Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => return Err(err.into()),
}
}
}
impl<S, B, E> Stream for EventStream<S>
where
S: Stream<Item = Result<B, E>>,
B: AsRef<[u8]>,
{
type Item = Result<Event, EventStreamError<E>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut this = self.project();
match parse_event(this.buffer, this.builder) {
Ok(Some(event)) => {
*this.last_event_id = event.id.clone();
return Poll::Ready(Some(Ok(event)));
}
Err(err) => return Poll::Ready(Some(Err(err))),
_ => {}
}
if this.state.is_terminated() {
return Poll::Ready(None);
}
loop {
match this.stream.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(string))) => {
if string.is_empty() {
continue;
}
let slice = if this.state.is_started() {
&string
} else {
*this.state = EventStreamState::Started;
if is_bom(string.chars().next().unwrap()) {
&string[1..]
} else {
&string
}
};
this.buffer.push_str(slice);
match parse_event(this.buffer, this.builder) {
Ok(Some(event)) => {
*this.last_event_id = event.id.clone();
return Poll::Ready(Some(Ok(event)));
}
Err(err) => return Poll::Ready(Some(Err(err))),
_ => {}
}
}
Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
Poll::Ready(None) => {
*this.state = EventStreamState::Terminated;
return Poll::Ready(None);
}
Poll::Pending => return Poll::Pending,
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::prelude::*;
#[tokio::test]
async fn valid_data_fields() {
assert_eq!(
EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
"data: Hello, world!\n\n"
)]))
.try_collect::<Vec<_>>()
.await
.unwrap(),
vec![Event {
event: "message".to_string(),
data: "Hello, world!".to_string(),
..Default::default()
}]
);
assert_eq!(
EventStream::new(futures::stream::iter(vec![
Ok::<_, ()>("data: Hello,"),
Ok::<_, ()>(" world!\n\n")
]))
.try_collect::<Vec<_>>()
.await
.unwrap(),
vec![Event {
event: "message".to_string(),
data: "Hello, world!".to_string(),
..Default::default()
}]
);
assert_eq!(
EventStream::new(futures::stream::iter(vec![
Ok::<_, ()>("data: Hello,"),
Ok::<_, ()>(""),
Ok::<_, ()>(" world!\n\n")
]))
.try_collect::<Vec<_>>()
.await
.unwrap(),
vec![Event {
event: "message".to_string(),
data: "Hello, world!".to_string(),
..Default::default()
}]
);
assert_eq!(
EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
"data: Hello, world!\n"
)]))
.try_collect::<Vec<_>>()
.await
.unwrap(),
vec![]
);
assert_eq!(
EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
"data: Hello,\ndata: world!\n\n"
)]))
.try_collect::<Vec<_>>()
.await
.unwrap(),
vec![Event {
event: "message".to_string(),
data: "Hello,\nworld!".to_string(),
..Default::default()
}]
);
assert_eq!(
EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
"data: Hello,\n\ndata: world!\n\n"
)]))
.try_collect::<Vec<_>>()
.await
.unwrap(),
vec![
Event {
event: "message".to_string(),
data: "Hello,".to_string(),
..Default::default()
},
Event {
event: "message".to_string(),
data: "world!".to_string(),
..Default::default()
}
]
);
}
#[tokio::test]
async fn spec_examples() {
assert_eq!(
EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
"data: This is the first message.
data: This is the second message, it
data: has two lines.
data: This is the third message.
"
)]))
.try_collect::<Vec<_>>()
.await
.unwrap(),
vec![
Event {
event: "message".to_string(),
data: "This is the first message.".to_string(),
..Default::default()
},
Event {
event: "message".to_string(),
data: "This is the second message, it\nhas two lines.".to_string(),
..Default::default()
},
Event {
event: "message".to_string(),
data: "This is the third message.".to_string(),
..Default::default()
}
]
);
assert_eq!(
EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
"event: add
data: 73857293
event: remove
data: 2153
event: add
data: 113411
"
)]))
.try_collect::<Vec<_>>()
.await
.unwrap(),
vec![
Event {
event: "add".to_string(),
data: "73857293".to_string(),
..Default::default()
},
Event {
event: "remove".to_string(),
data: "2153".to_string(),
..Default::default()
},
Event {
event: "add".to_string(),
data: "113411".to_string(),
..Default::default()
}
]
);
assert_eq!(
EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
"data: YHOO
data: +2
data: 10
"
)]))
.try_collect::<Vec<_>>()
.await
.unwrap(),
vec![Event {
event: "message".to_string(),
data: "YHOO\n+2\n10".to_string(),
..Default::default()
},]
);
assert_eq!(
EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
": test stream
data: first event
id: 1
data:second event
id
data: third event
"
)]))
.try_collect::<Vec<_>>()
.await
.unwrap(),
vec![
Event {
event: "message".to_string(),
id: "1".to_string(),
data: "first event".to_string(),
..Default::default()
},
Event {
event: "message".to_string(),
data: "second event".to_string(),
..Default::default()
},
Event {
event: "message".to_string(),
data: " third event".to_string(),
..Default::default()
}
]
);
assert_eq!(
EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
"data
data
data
data:
"
)]))
.try_collect::<Vec<_>>()
.await
.unwrap(),
vec![
Event {
event: "message".to_string(),
data: "".to_string(),
..Default::default()
},
Event {
event: "message".to_string(),
data: "\n".to_string(),
..Default::default()
},
]
);
assert_eq!(
EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
"data:test
data: test
"
)]))
.try_collect::<Vec<_>>()
.await
.unwrap(),
vec![
Event {
event: "message".to_string(),
data: "test".to_string(),
..Default::default()
},
Event {
event: "message".to_string(),
data: "test".to_string(),
..Default::default()
},
]
);
}
}