use std::convert::Infallible;
use std::time::Duration;
use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::{BoxStream, Stream, StreamExt};
use serde::Serialize;
/// A transport-agnostic description of one server-sent event.
///
/// Values are assembled with the consuming builder methods on this type and
/// turned into an axum `Event` only when the stream is sent.
#[derive(Clone, Debug, Default)]
pub struct SseEvent {
    /// Optional event name; when `None`, no `event` field is set.
    event: Option<String>,
    /// Payload of the event; defaults to the empty string.
    data: String,
    /// Optional event id; when `None`, no `id` field is set.
    id: Option<String>,
    /// Optional retry value in milliseconds; when `None`, no `retry` field is set.
    retry: Option<u64>,
}
impl SseEvent {
    /// Sets the event payload, replacing any payload set earlier.
    ///
    /// The text is stored as given; line breaks are normalized when the event
    /// is converted for transport.
    #[must_use]
    pub fn data(mut self, data: impl Into<String>) -> Self {
        self.data = data.into();
        self
    }

    /// Serializes `value` as JSON and uses the result as the event payload,
    /// replacing any payload set earlier.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json::Error` produced when `value` cannot be
    /// serialized.
    pub fn json_data<T: Serialize>(mut self, value: T) -> Result<Self, serde_json::Error> {
        self.data = serde_json::to_string(&value)?;
        Ok(self)
    }

    /// Sets the event name.
    ///
    /// Carriage returns and line feeds in `name` are dropped when the event
    /// is converted for transport.
    #[must_use]
    pub fn event(mut self, name: impl Into<String>) -> Self {
        self.event = Some(name.into());
        self
    }

    /// Sets the event id.
    ///
    /// Carriage returns, line feeds and NUL characters in `id` are dropped
    /// when the event is converted for transport.
    #[must_use]
    pub fn id(mut self, id: impl Into<String>) -> Self {
        self.id = Some(id.into());
        self
    }

    /// Sets the retry value, in milliseconds.
    #[must_use]
    pub fn retry(mut self, millis: u64) -> Self {
        self.retry = Some(millis);
        self
    }

    /// Converts this description into an axum [`Event`].
    ///
    /// axum's `Event` setters panic on text that cannot be framed as a single
    /// SSE field (a carriage return in the data, any line break in the event
    /// name or id, a NUL in the id). The strings held here come from callers,
    /// so they are sanitized first instead of letting one bad value panic the
    /// task that is streaming the response.
    fn into_transport(self) -> Event {
        let Self {
            event,
            data,
            id,
            retry,
        } = self;

        let mut out = Event::default().data(Self::normalize_line_breaks(data));
        if let Some(name) = event {
            out = out.event(Self::strip_chars(name, |c| matches!(c, '\r' | '\n')));
        }
        if let Some(id) = id {
            out = out.id(Self::strip_chars(id, |c| matches!(c, '\r' | '\n' | '\0')));
        }
        if let Some(ms) = retry {
            out = out.retry(Duration::from_millis(ms));
        }
        out
    }

    /// Rewrites `\r\n` and lone `\r` as `\n`, leaving text without any
    /// carriage return untouched (and unallocated).
    fn normalize_line_breaks(text: String) -> String {
        if text.contains('\r') {
            // Order matters: collapse CRLF pairs first so they become one
            // line break rather than two.
            text.replace("\r\n", "\n").replace('\r', "\n")
        } else {
            text
        }
    }

    /// Removes, in place, every character for which `forbidden` returns true.
    fn strip_chars(mut text: String, forbidden: impl Fn(char) -> bool) -> String {
        text.retain(|c| !forbidden(c));
        text
    }
}
/// A boxed stream of [`SseEvent`]s paired with the keep-alive interval to use
/// when the stream is turned into a response.
pub struct SseStream {
    /// Type-erased source of events; its error type is `Infallible`.
    inner: BoxStream<'static, Result<SseEvent, Infallible>>,
    /// Interval handed to the keep-alive configuration of the response.
    keep_alive: Duration,
}
impl SseStream {
pub fn new<S>(stream: S) -> Self
where
S: Stream<Item = Result<SseEvent, Infallible>> + Send + 'static,
{
Self {
inner: stream.boxed(),
keep_alive: Duration::from_secs(15),
}
}
pub fn keep_alive(mut self, interval: Duration) -> Self {
self.keep_alive = interval;
self
}
}
impl axum::response::IntoResponse for SseStream {
    /// Builds the SSE response: every [`SseEvent`] yielded by the inner stream
    /// is converted to an axum `Event`, and keep-alive is enabled with the
    /// configured interval.
    fn into_response(self) -> axum::response::Response {
        let Self { inner, keep_alive } = self;
        let events = inner.map(|item| item.map(SseEvent::into_transport));
        let sse = Sse::new(events).keep_alive(KeepAlive::new().interval(keep_alive));
        // Fully-qualified call: implementing a trait does not bring it into
        // scope for method-call syntax, so this avoids relying on
        // `IntoResponse` being imported at the call site.
        axum::response::IntoResponse::into_response(sse)
    }
}