use alloc::collections::VecDeque;
use alloc::vec::Vec;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_core::Stream;
use crate::sse::{SseEvent, SseParser};
/// Adapter that turns a stream of byte chunks into a stream of [`SseEvent`]s.
///
/// `S` is the wrapped stream, `B` is the chunk type it yields (anything
/// viewable as `&[u8]`), and `E` is the wrapped stream's error type, which is
/// passed through unchanged.
pub struct SseStream<S, B, E>
where
    B: AsRef<[u8]>,
    S: Stream<Item = Result<B, E>>,
{
    /// The wrapped chunk stream.
    inner: S,
    /// Incremental parser that is fed every chunk pulled from `inner`.
    parser: SseParser,
    /// Parsed events waiting to be handed out, oldest first.
    ready: VecDeque<SseEvent>,
    /// Reusable output buffer passed to the parser on each call.
    scratch: Vec<SseEvent>,
    /// Set once `inner` has reported end-of-stream; `inner` is not polled again.
    done: bool,
}
impl<S, B, E> SseStream<S, B, E>
where
    B: AsRef<[u8]>,
    S: Stream<Item = Result<B, E>>,
{
    /// Wraps `inner` with a fresh parser and empty event buffers.
    ///
    /// Nothing is read from `inner` until the returned stream is polled.
    pub fn new(inner: S) -> Self {
        let parser = SseParser::new();
        let ready = VecDeque::new();
        let scratch = Vec::new();
        Self {
            inner,
            parser,
            ready,
            scratch,
            done: false,
        }
    }
}
impl<S, B, E> Stream for SseStream<S, B, E>
where
    S: Stream<Item = Result<B, E>> + Unpin,
    B: AsRef<[u8]>,
{
    /// One parsed event, or an error forwarded verbatim from the inner stream.
    type Item = Result<SseEvent, E>;

    /// Yields the next buffered event, pulling and parsing more chunks from
    /// the inner stream as needed.
    ///
    /// * Buffered events are always drained before the inner stream is polled.
    /// * An `Err` from the inner stream is returned as-is; the adapter stays
    ///   usable and will poll the inner stream again on the next call.
    /// * When the inner stream ends, the parser is finalised and any events it
    ///   writes out at that point are yielded before `None` is returned.
    ///   After that the inner stream is never polled again.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            if let Some(ev) = this.ready.pop_front() {
                return Poll::Ready(Some(Ok(ev)));
            }
            if this.done {
                return Poll::Ready(None);
            }
            match Pin::new(&mut this.inner).poll_next(cx) {
                Poll::Ready(Some(Ok(chunk))) => {
                    this.scratch.clear();
                    this.parser.feed(chunk.as_ref(), &mut this.scratch);
                    // A chunk may complete zero events, so loop and poll again
                    // rather than returning.
                    this.ready.extend(this.scratch.drain(..));
                }
                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                Poll::Ready(None) => {
                    // Mark completion first so the exhausted inner stream is
                    // never polled again.
                    this.done = true;
                    this.scratch.clear();
                    this.parser.finish(&mut this.scratch);
                    // Queue whatever `finish` produced instead of discarding
                    // it; the next loop iteration yields those events and then
                    // reports `None` once the queue is empty.
                    this.ready.extend(this.scratch.drain(..));
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}