use bytes::Bytes;
use eventsource_stream::Eventsource;
use futures::stream::{Stream, StreamExt};
use crate::error::Error;
use crate::types::StreamEvent;
pub fn parse_sse_stream<S>(byte_stream: S) -> impl Stream<Item = Result<StreamEvent, Error>> + Send
where
S: Stream<Item = Result<Bytes, std::io::Error>> + Send + Unpin + 'static,
{
byte_stream.eventsource().filter_map(|result| async move {
match result {
Ok(event) => {
let data = event.data.trim();
if data.is_empty() {
return None;
}
match serde_json::from_str::<StreamEvent>(data) {
Ok(stream_event) => Some(Ok(stream_event)),
Err(e) => Some(Err(Error::Json(e))),
}
}
Err(e) => Some(Err(Error::stream(e))),
}
})
}