Skip to main content

// deboa_extras/http/sse/io/stream.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures::{ready, Stream};
7use hyper::body::Body;
8use pin_project_lite::pin_project;
9
10use crate::{
11    errors::{DeboaExtrasError, SSEError},
12    http::sse::event::ServerEvent,
13};
14
15use deboa::response::DeboaBody;
16
17pin_project! {
18    /// A data stream created from a [`Body`].
19    #[derive(Debug)]
20    pub struct ServerEventStream{
21        #[pin]
22        stream: DeboaBody,
23    }
24}
25
26impl ServerEventStream {
27    pub fn new(stream: DeboaBody) -> Self {
28        Self { stream }
29    }
30}
31
32impl Stream for ServerEventStream {
33    type Item = Result<ServerEvent, DeboaExtrasError>;
34
35    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36        loop {
37            return match ready!(self
38                .as_mut()
39                .project()
40                .stream
41                .poll_frame(cx))
42            {
43                Some(Ok(frame)) => match frame.into_data() {
44                    Ok(bytes) => {
45                        let event = ServerEvent::parse(&bytes);
46                        match event {
47                            Ok(event) => Poll::Ready(Some(Ok(event))),
48                            Err(err) => Poll::Ready(Some(Err(err))),
49                        }
50                    }
51                    Err(_) => continue,
52                },
53                Some(Err(err)) => {
54                    Poll::Ready(Some(Err(DeboaExtrasError::SSE(SSEError::ReceiveEvent {
55                        message: err.to_string(),
56                    }))))
57                }
58                None => Poll::Ready(None),
59            };
60        }
61    }
62}