Skip to main content

deboa_extras/http/sse/io/
stream.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures::{ready, Stream};
7use hyper::body::Body;
8use hyper_body_utils::HttpBody;
9use pin_project_lite::pin_project;
10
11use crate::{
12    errors::{DeboaExtrasError, SSEError},
13    http::sse::event::ServerEvent,
14};
15
16pin_project! {
17    /// A data stream created from a [`Body`].
18    pub struct ServerEventStream{
19        #[pin]
20        stream: HttpBody,
21    }
22}
23
24impl ServerEventStream {
25    pub fn new(stream: HttpBody) -> Self {
26        Self { stream }
27    }
28}
29
30impl Stream for ServerEventStream {
31    type Item = Result<ServerEvent, DeboaExtrasError>;
32
33    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34        loop {
35            return match ready!(self
36                .as_mut()
37                .project()
38                .stream
39                .poll_frame(cx))
40            {
41                Some(Ok(frame)) => match frame.into_data() {
42                    Ok(bytes) => {
43                        let event = ServerEvent::parse(&bytes);
44                        match event {
45                            Ok(event) => Poll::Ready(Some(Ok(event))),
46                            Err(err) => Poll::Ready(Some(Err(err))),
47                        }
48                    }
49                    Err(_) => continue,
50                },
51                Some(Err(err)) => {
52                    Poll::Ready(Some(Err(DeboaExtrasError::SSE(SSEError::ReceiveEvent {
53                        message: err.to_string(),
54                    }))))
55                }
56                None => Poll::Ready(None),
57            };
58        }
59    }
60}