use crate::error::BoxError;
use crate::webc::WebStream;
use futures::Stream;
use reqwest::RequestBuilder;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Stream adapter that reads raw text blocks from a [`WebStream`] and yields
/// them as [`Event`]s.
///
/// The first item produced is always [`Event::Open`]; every later item comes
/// from a block delivered by the inner stream.
pub struct EventSourceStream {
    /// Source of raw blocks; created in `new` with `"\n\n"` as its delimiter.
    inner: WebStream,
    /// `true` once `Event::Open` has been handed out, so it is emitted only once.
    opened: bool,
}
/// Items yielded by `EventSourceStream`.
#[derive(Debug)]
pub enum Event {
    /// Yielded exactly once, on the first poll of the stream.
    Open,
    /// A parsed block that carried at least one non-empty `data:` value.
    Message(Message),
}
/// Payload of one parsed event block.
#[derive(Debug)]
pub struct Message {
    /// Value of the block's last `event:` line, whitespace-trimmed;
    /// `"message"` when the block has no such line.
    pub event: String,
    /// Whitespace-trimmed values of the block's `data:` lines, joined with `'\n'`.
    pub data: String,
}
impl EventSourceStream {
    /// Builds an event stream on top of `reqwest_builder`.
    ///
    /// The builder is passed to `WebStream::new_with_delimiter` together with
    /// `"\n\n"` (a blank line) as the delimiter. The stream starts in the
    /// "not yet opened" state, so its first item will be `Event::Open`.
    pub fn new(reqwest_builder: RequestBuilder) -> Self {
        let inner = WebStream::new_with_delimiter(reqwest_builder, "\n\n");
        Self {
            inner,
            opened: false,
        }
    }
}
impl Stream for EventSourceStream {
type Item = Result<Event, BoxError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if !this.opened {
this.opened = true;
return Poll::Ready(Some(Ok(Event::Open)));
}
loop {
let nx = Pin::new(&mut this.inner).poll_next(cx);
match nx {
Poll::Ready(Some(Ok(raw_event))) => {
let mut event = "message".to_string();
let mut data = String::new();
for line in raw_event.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with(':') {
continue;
}
if let Some(e) = line.strip_prefix("event:") {
event = e.trim().to_string();
} else if let Some(d) = line.strip_prefix("data:") {
if !data.is_empty() {
data.push('\n');
}
data.push_str(d.trim());
}
}
if data.is_empty() {
continue;
}
return Poll::Ready(Some(Ok(Event::Message(Message { event, data }))));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(Err(e)));
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
}
}
}
}