use std::convert::Infallible;
use bytes::Bytes;
use bytes::BytesMut;
use http::StatusCode;
use http::header;
use http_body_util::StreamBody;
use futures_util::Stream;
use futures_util::StreamExt;
use crate::body::TakoBody;
use crate::responder::Responder;
use crate::types::Response;
const PREFIX: &[u8] = b"data: ";
const SUFFIX: &[u8] = b"\n\n";
const PS_LEN: usize = PREFIX.len() + SUFFIX.len();
#[doc(alias = "sse")]
#[doc(alias = "eventsource")]
pub struct Sse<S>
where
S: Stream<Item = Bytes> + Send + 'static,
{
pub stream: S,
}
impl<S> Sse<S>
where
S: Stream<Item = Bytes> + Send + 'static,
{
pub fn new(stream: S) -> Self {
Self { stream }
}
}
impl<S> Responder for Sse<S>
where
S: Stream<Item = Bytes> + Send + 'static,
{
fn into_response(self) -> Response {
let stream = self.stream.map(|msg| {
let mut buf = BytesMut::with_capacity(PS_LEN + msg.len());
buf.extend_from_slice(PREFIX);
buf.extend_from_slice(&msg);
buf.extend_from_slice(SUFFIX);
Ok::<_, Infallible>(http_body::Frame::data(Bytes::from(buf)))
});
http::Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "text/event-stream")
.header(header::CACHE_CONTROL, "no-cache")
.header(header::CONNECTION, "keep-alive")
.body(TakoBody::new(StreamBody::new(stream)))
.expect("valid SSE response")
}
}