deboa_extras/http/sse/io/
stream.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures::{ready, Stream};
7use hyper::body::Body;
8use hyper_body_utils::HttpBody;
9use pin_project_lite::pin_project;
10
11use crate::{
12 errors::{DeboaExtrasError, SSEError},
13 http::sse::event::ServerEvent,
14};
15
16pin_project! {
17 pub struct ServerEventStream{
19 #[pin]
20 stream: HttpBody,
21 }
22}
23
24impl ServerEventStream {
25 pub fn new(stream: HttpBody) -> Self {
26 Self { stream }
27 }
28}
29
30impl Stream for ServerEventStream {
31 type Item = Result<ServerEvent, DeboaExtrasError>;
32
33 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34 loop {
35 return match ready!(self
36 .as_mut()
37 .project()
38 .stream
39 .poll_frame(cx))
40 {
41 Some(Ok(frame)) => match frame.into_data() {
42 Ok(bytes) => {
43 let event = ServerEvent::parse(&bytes);
44 match event {
45 Ok(event) => Poll::Ready(Some(Ok(event))),
46 Err(err) => Poll::Ready(Some(Err(err))),
47 }
48 }
49 Err(_) => continue,
50 },
51 Some(Err(err)) => {
52 Poll::Ready(Some(Err(DeboaExtrasError::SSE(SSEError::ReceiveEvent {
53 message: err.to_string(),
54 }))))
55 }
56 None => Poll::Ready(None),
57 };
58 }
59 }
60}