deboa_extras/http/sse/io/
stream.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures::{ready, Stream};
7use hyper::body::Body;
8use pin_project_lite::pin_project;
9
10use crate::{
11 errors::{DeboaExtrasError, SSEError},
12 http::sse::event::ServerEvent,
13};
14
15use deboa::response::DeboaBody;
16
17pin_project! {
18 #[derive(Debug)]
20 pub struct ServerEventStream{
21 #[pin]
22 stream: DeboaBody,
23 }
24}
25
26impl ServerEventStream {
27 pub fn new(stream: DeboaBody) -> Self {
28 Self { stream }
29 }
30}
31
32impl Stream for ServerEventStream {
33 type Item = Result<ServerEvent, DeboaExtrasError>;
34
35 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36 loop {
37 return match ready!(self
38 .as_mut()
39 .project()
40 .stream
41 .poll_frame(cx))
42 {
43 Some(Ok(frame)) => match frame.into_data() {
44 Ok(bytes) => {
45 let event = ServerEvent::parse(&bytes);
46 match event {
47 Ok(event) => Poll::Ready(Some(Ok(event))),
48 Err(err) => Poll::Ready(Some(Err(err))),
49 }
50 }
51 Err(_) => continue,
52 },
53 Some(Err(err)) => {
54 Poll::Ready(Some(Err(DeboaExtrasError::SSE(SSEError::ReceiveEvent {
55 message: err.to_string(),
56 }))))
57 }
58 None => Poll::Ready(None),
59 };
60 }
61 }
62}