sse_agent/
body.rs

1use futures_core::Stream;
2use std::{
3    error::Error as StdError,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use crate::{parser::Parser, Error, Event};
9
10pub struct Body<S> {
11    inner: S,
12
13    parser: Parser,
14}
15
16impl<S, B, E> Stream for Body<S>
17where
18    S: Stream<Item = Result<B, E>> + Unpin,
19    B: bytes::Buf,
20    E: StdError + Unpin,
21{
22    type Item = Result<Event, Error<E>>;
23
24    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
25        // Whenever the parser cannot yet produce an Event. We want to poll the underlying
26        // Stream.
27        //
28        // However, if we ready Ready(Some(Ok(bs))) from inner stream we also want to parse.
29        //
30        // This is probably not the nicest code, but for now, let's always start by
31        // trying to parse.
32        loop {
33            match self.parser.next() {
34                Some(Ok(ev)) => return Poll::Ready(Some(Ok(ev))),
35                Some(Err(err)) => return Poll::Ready(Some(Err(Error::parser(err)))),
36                None => (),
37            }
38
39            match Pin::new(&mut self.inner).poll_next(ctx) {
40                Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(Error::inner(err)))),
41                Poll::Ready(None) => return Poll::Ready(None),
42                Poll::Pending => return Poll::Pending,
43                Poll::Ready(Some(Ok(bs))) => self.parser.put(bs),
44            }
45        }
46    }
47}
48
49impl<S, B, E> From<S> for Body<S>
50where
51    S: Stream<Item = Result<B, E>> + Unpin,
52    B: bytes::Buf,
53    E: StdError,
54{
55    fn from(inner: S) -> Self {
56        Self {
57            inner,
58            parser: Parser::default(),
59        }
60    }
61}