use std::task::Poll;
use bytes::Bytes;
use futures_util::{Stream, StreamExt, stream::BoxStream};
use tokio::time::Duration;
use super::Event;
use crate::{Body, IntoResponse, Response};
pub struct SSE {
stream: BoxStream<'static, Event>,
keep_alive: Option<Duration>,
}
impl SSE {
pub fn new(stream: impl Stream<Item = Event> + Send + 'static) -> Self {
Self {
stream: stream.boxed(),
keep_alive: None,
}
}
#[must_use]
pub fn keep_alive(self, duration: Duration) -> Self {
Self {
keep_alive: Some(duration),
..self
}
}
}
impl IntoResponse for SSE {
fn into_response(self) -> Response {
let mut stream = self
.stream
.map(|event| Ok::<_, std::io::Error>(Bytes::from(event.to_string())))
.boxed();
if let Some(duration) = self.keep_alive {
let comment = Bytes::from_static(b":\n\n");
let mut interval =
tokio::time::interval_at(tokio::time::Instant::now() + duration, duration);
stream = futures_util::stream::poll_fn(move |cx| {
if let Poll::Ready(msg) = stream.poll_next_unpin(cx) {
return Poll::Ready(msg);
}
interval.poll_tick(cx).map(|_| Some(Ok(comment.clone())))
})
.boxed();
}
Response::builder()
.content_type("text/event-stream")
.header("X-Accel-Buffering", "no")
.header("Cache-Control", "no-cache")
.body(Body::from_async_read(tokio_util::io::StreamReader::new(
stream,
)))
}
}