eventsource-stream 0.2.3

A basic building block for building an Eventsource from a Stream of bytes
Documentation
#[cfg(not(feature = "std"))]
use alloc::string::{FromUtf8Error, String, ToString};

#[cfg(feature = "std")]
use std::string::FromUtf8Error;

use crate::event::Event;
use crate::parser::{is_bom, is_lf, line, RawEventLine};
use crate::utf8_stream::{Utf8Stream, Utf8StreamError};
use core::fmt;
use core::pin::Pin;
use core::time::Duration;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use nom::error::Error as NomError;
use pin_project_lite::pin_project;

#[derive(Default, Debug)]
struct EventBuilder {
    event: Event,
    is_complete: bool,
}

impl EventBuilder {
    /// From the HTML spec
    ///
    /// -> If the field name is "event"
    ///    Set the event type buffer to field value.
    ///
    /// -> If the field name is "data"
    ///    Append the field value to the data buffer, then append a single U+000A LINE FEED (LF)
    ///    character to the data buffer.
    ///
    /// -> If the field name is "id"
    ///    If the field value does not contain U+0000 NULL, then set the last event ID buffer
    ///    to the field value. Otherwise, ignore the field.
    ///
    /// -> If the field name is "retry"
    ///    If the field value consists of only ASCII digits, then interpret the field value as
    ///    an integer in base ten, and set the event stream's reconnection time to that integer.
    ///    Otherwise, ignore the field.
    ///
    /// -> Otherwise
    ///    The field is ignored.
    fn add(&mut self, line: RawEventLine) {
        match line {
            RawEventLine::Field(field, val) => {
                let val = val.unwrap_or("");
                match field {
                    "event" => {
                        self.event.event = val.to_string();
                    }
                    "data" => {
                        self.event.data.push_str(val);
                        self.event.data.push('\u{000A}');
                    }
                    "id" => {
                        if !val.contains('\u{0000}') {
                            self.event.id = val.to_string()
                        }
                    }
                    "retry" => {
                        if let Ok(val) = val.parse::<u64>() {
                            self.event.retry = Some(Duration::from_millis(val))
                        }
                    }
                    _ => {}
                }
            }
            RawEventLine::Comment(_) => {}
            RawEventLine::Empty => self.is_complete = true,
        }
    }

    /// From the HTML spec
    ///
    /// 1. Set the last event ID string of the event source to the value of the last event ID
    /// buffer. The buffer does not get reset, so the last event ID string of the event source
    /// remains set to this value until the next time it is set by the server.
    /// 2. If the data buffer is an empty string, set the data buffer and the event type buffer
    /// to the empty string and return.
    /// 3. If the data buffer's last character is a U+000A LINE FEED (LF) character, then remove
    /// the last character from the data buffer.
    /// 4. Let event be the result of creating an event using MessageEvent, in the relevant Realm
    /// of the EventSource object.
    /// 5. Initialize event's type attribute to message, its data attribute to data, its origin
    /// attribute to the serialization of the origin of the event stream's final URL (i.e., the
    /// URL after redirects), and its lastEventId attribute to the last event ID string of the
    /// event source.
    /// 6. If the event type buffer has a value other than the empty string, change the type of
    /// the newly created event to equal the value of the event type buffer.
    /// 7. Set the data buffer and the event type buffer to the empty string.
    /// 8. Queue a task which, if the readyState attribute is set to a value other than CLOSED,
    /// dispatches the newly created event at the EventSource object.
    fn dispatch(&mut self) -> Option<Event> {
        let builder = core::mem::take(self);
        let mut event = builder.event;
        self.event.id = event.id.clone();

        if event.data.is_empty() {
            return None;
        }

        if is_lf(event.data.chars().next_back().unwrap()) {
            event.data.pop();
        }

        if event.event.is_empty() {
            event.event = "message".to_string();
        }

        Some(event)
    }
}

#[derive(Debug, Clone, Copy)]
pub enum EventStreamState {
    NotStarted,
    Started,
    Terminated,
}

impl EventStreamState {
    fn is_terminated(self) -> bool {
        matches!(self, Self::Terminated)
    }
    fn is_started(self) -> bool {
        matches!(self, Self::Started)
    }
}

pin_project! {
/// A Stream of events
pub struct EventStream<S> {
    #[pin]
    stream: Utf8Stream<S>,
    buffer: String,
    builder: EventBuilder,
    state: EventStreamState,
    last_event_id: String,
}
}

impl<S> EventStream<S> {
    /// Initialize the EventStream with a Stream
    pub fn new(stream: S) -> Self {
        Self {
            stream: Utf8Stream::new(stream),
            buffer: String::new(),
            builder: EventBuilder::default(),
            state: EventStreamState::NotStarted,
            last_event_id: String::new(),
        }
    }

    /// Set the last event ID of the stream. Useful for initializing the stream with a previous
    /// last event ID
    pub fn set_last_event_id(&mut self, id: impl Into<String>) {
        self.last_event_id = id.into();
    }

    /// Get the last event ID of the stream
    pub fn last_event_id(&self) -> &str {
        &self.last_event_id
    }
}

/// Error thrown while parsing an event line
#[derive(Debug, PartialEq)]
pub enum EventStreamError<E> {
    /// Source stream is not valid UTF8
    Utf8(FromUtf8Error),
    /// Source stream is not a valid EventStream
    Parser(NomError<String>),
    /// Underlying source stream error
    Transport(E),
}

impl<E> From<Utf8StreamError<E>> for EventStreamError<E> {
    fn from(err: Utf8StreamError<E>) -> Self {
        match err {
            Utf8StreamError::Utf8(err) => Self::Utf8(err),
            Utf8StreamError::Transport(err) => Self::Transport(err),
        }
    }
}

impl<E> From<NomError<&str>> for EventStreamError<E> {
    fn from(err: NomError<&str>) -> Self {
        EventStreamError::Parser(NomError::new(err.input.to_string(), err.code))
    }
}

impl<E> fmt::Display for EventStreamError<E>
where
    E: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Utf8(err) => f.write_fmt(format_args!("UTF8 error: {}", err)),
            Self::Parser(err) => f.write_fmt(format_args!("Parse error: {}", err)),
            Self::Transport(err) => f.write_fmt(format_args!("Transport error: {}", err)),
        }
    }
}

#[cfg(feature = "std")]
impl<E> std::error::Error for EventStreamError<E> where E: fmt::Display + fmt::Debug + Send + Sync {}

fn parse_event<E>(
    buffer: &mut String,
    builder: &mut EventBuilder,
) -> Result<Option<Event>, EventStreamError<E>> {
    if buffer.is_empty() {
        return Ok(None);
    }
    loop {
        match line(buffer.as_ref()) {
            Ok((rem, next_line)) => {
                builder.add(next_line);
                let consumed = buffer.len() - rem.len();
                let rem = buffer.split_off(consumed);
                *buffer = rem;
                if builder.is_complete {
                    if let Some(event) = builder.dispatch() {
                        return Ok(Some(event));
                    }
                }
            }
            Err(nom::Err::Incomplete(_)) => return Ok(None),
            Err(nom::Err::Error(err)) | Err(nom::Err::Failure(err)) => return Err(err.into()),
        }
    }
}

impl<S, B, E> Stream for EventStream<S>
where
    S: Stream<Item = Result<B, E>>,
    B: AsRef<[u8]>,
{
    type Item = Result<Event, EventStreamError<E>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        match parse_event(this.buffer, this.builder) {
            Ok(Some(event)) => {
                *this.last_event_id = event.id.clone();
                return Poll::Ready(Some(Ok(event)));
            }
            Err(err) => return Poll::Ready(Some(Err(err))),
            _ => {}
        }

        if this.state.is_terminated() {
            return Poll::Ready(None);
        }

        loop {
            match this.stream.as_mut().poll_next(cx) {
                Poll::Ready(Some(Ok(string))) => {
                    if string.is_empty() {
                        continue;
                    }

                    let slice = if this.state.is_started() {
                        &string
                    } else {
                        *this.state = EventStreamState::Started;
                        if is_bom(string.chars().next().unwrap()) {
                            &string[1..]
                        } else {
                            &string
                        }
                    };
                    this.buffer.push_str(slice);

                    match parse_event(this.buffer, this.builder) {
                        Ok(Some(event)) => {
                            *this.last_event_id = event.id.clone();
                            return Poll::Ready(Some(Ok(event)));
                        }
                        Err(err) => return Poll::Ready(Some(Err(err))),
                        _ => {}
                    }
                }
                Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
                Poll::Ready(None) => {
                    *this.state = EventStreamState::Terminated;
                    return Poll::Ready(None);
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::prelude::*;

    #[tokio::test]
    async fn valid_data_fields() {
        assert_eq!(
            EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
                "data: Hello, world!\n\n"
            )]))
            .try_collect::<Vec<_>>()
            .await
            .unwrap(),
            vec![Event {
                event: "message".to_string(),
                data: "Hello, world!".to_string(),
                ..Default::default()
            }]
        );
        assert_eq!(
            EventStream::new(futures::stream::iter(vec![
                Ok::<_, ()>("data: Hello,"),
                Ok::<_, ()>(" world!\n\n")
            ]))
            .try_collect::<Vec<_>>()
            .await
            .unwrap(),
            vec![Event {
                event: "message".to_string(),
                data: "Hello, world!".to_string(),
                ..Default::default()
            }]
        );
        assert_eq!(
            EventStream::new(futures::stream::iter(vec![
                Ok::<_, ()>("data: Hello,"),
                Ok::<_, ()>(""),
                Ok::<_, ()>(" world!\n\n")
            ]))
            .try_collect::<Vec<_>>()
            .await
            .unwrap(),
            vec![Event {
                event: "message".to_string(),
                data: "Hello, world!".to_string(),
                ..Default::default()
            }]
        );
        assert_eq!(
            EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
                "data: Hello, world!\n"
            )]))
            .try_collect::<Vec<_>>()
            .await
            .unwrap(),
            vec![]
        );
        assert_eq!(
            EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
                "data: Hello,\ndata: world!\n\n"
            )]))
            .try_collect::<Vec<_>>()
            .await
            .unwrap(),
            vec![Event {
                event: "message".to_string(),
                data: "Hello,\nworld!".to_string(),
                ..Default::default()
            }]
        );
        assert_eq!(
            EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
                "data: Hello,\n\ndata: world!\n\n"
            )]))
            .try_collect::<Vec<_>>()
            .await
            .unwrap(),
            vec![
                Event {
                    event: "message".to_string(),
                    data: "Hello,".to_string(),
                    ..Default::default()
                },
                Event {
                    event: "message".to_string(),
                    data: "world!".to_string(),
                    ..Default::default()
                }
            ]
        );
    }

    #[tokio::test]
    async fn spec_examples() {
        assert_eq!(
            EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
                "data: This is the first message.

data: This is the second message, it
data: has two lines.

data: This is the third message.

"
            )]))
            .try_collect::<Vec<_>>()
            .await
            .unwrap(),
            vec![
                Event {
                    event: "message".to_string(),
                    data: "This is the first message.".to_string(),
                    ..Default::default()
                },
                Event {
                    event: "message".to_string(),
                    data: "This is the second message, it\nhas two lines.".to_string(),
                    ..Default::default()
                },
                Event {
                    event: "message".to_string(),
                    data: "This is the third message.".to_string(),
                    ..Default::default()
                }
            ]
        );
        assert_eq!(
            EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
                "event: add
data: 73857293

event: remove
data: 2153

event: add
data: 113411

"
            )]))
            .try_collect::<Vec<_>>()
            .await
            .unwrap(),
            vec![
                Event {
                    event: "add".to_string(),
                    data: "73857293".to_string(),
                    ..Default::default()
                },
                Event {
                    event: "remove".to_string(),
                    data: "2153".to_string(),
                    ..Default::default()
                },
                Event {
                    event: "add".to_string(),
                    data: "113411".to_string(),
                    ..Default::default()
                }
            ]
        );
        assert_eq!(
            EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
                "data: YHOO
data: +2
data: 10

"
            )]))
            .try_collect::<Vec<_>>()
            .await
            .unwrap(),
            vec![Event {
                event: "message".to_string(),
                data: "YHOO\n+2\n10".to_string(),
                ..Default::default()
            },]
        );
        assert_eq!(
            EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
                ": test stream

data: first event
id: 1

data:second event
id

data:  third event

"
            )]))
            .try_collect::<Vec<_>>()
            .await
            .unwrap(),
            vec![
                Event {
                    event: "message".to_string(),
                    id: "1".to_string(),
                    data: "first event".to_string(),
                    ..Default::default()
                },
                Event {
                    event: "message".to_string(),
                    data: "second event".to_string(),
                    ..Default::default()
                },
                Event {
                    event: "message".to_string(),
                    data: " third event".to_string(),
                    ..Default::default()
                }
            ]
        );
        assert_eq!(
            EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
                "data

data
data

data:
"
            )]))
            .try_collect::<Vec<_>>()
            .await
            .unwrap(),
            vec![
                Event {
                    event: "message".to_string(),
                    data: "".to_string(),
                    ..Default::default()
                },
                Event {
                    event: "message".to_string(),
                    data: "\n".to_string(),
                    ..Default::default()
                },
            ]
        );
        assert_eq!(
            EventStream::new(futures::stream::iter(vec![Ok::<_, ()>(
                "data:test

data: test

"
            )]))
            .try_collect::<Vec<_>>()
            .await
            .unwrap(),
            vec![
                Event {
                    event: "message".to_string(),
                    data: "test".to_string(),
                    ..Default::default()
                },
                Event {
                    event: "message".to_string(),
                    data: "test".to_string(),
                    ..Default::default()
                },
            ]
        );
    }
}