1use futures_core::Stream;
2use std::{
3 error::Error as StdError,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use crate::{parser::Parser, Error, Event};
9
10pub struct Body<S> {
11 inner: S,
12
13 parser: Parser,
14}
15
16impl<S, B, E> Stream for Body<S>
17where
18 S: Stream<Item = Result<B, E>> + Unpin,
19 B: bytes::Buf,
20 E: StdError + Unpin,
21{
22 type Item = Result<Event, Error<E>>;
23
24 fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
25 loop {
33 match self.parser.next() {
34 Some(Ok(ev)) => return Poll::Ready(Some(Ok(ev))),
35 Some(Err(err)) => return Poll::Ready(Some(Err(Error::parser(err)))),
36 None => (),
37 }
38
39 match Pin::new(&mut self.inner).poll_next(ctx) {
40 Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(Error::inner(err)))),
41 Poll::Ready(None) => return Poll::Ready(None),
42 Poll::Pending => return Poll::Pending,
43 Poll::Ready(Some(Ok(bs))) => self.parser.put(bs),
44 }
45 }
46 }
47}
48
49impl<S, B, E> From<S> for Body<S>
50where
51 S: Stream<Item = Result<B, E>> + Unpin,
52 B: bytes::Buf,
53 E: StdError,
54{
55 fn from(inner: S) -> Self {
56 Self {
57 inner,
58 parser: Parser::default(),
59 }
60 }
61}