1use crate::state::AppState;
2use axum::{
3 extract::State,
4 response::sse::{Event, KeepAlive, Sse},
5};
6use futures_core::Stream;
7use std::{convert::Infallible, sync::Arc, time::Duration};
8use tokio::sync::broadcast;
9use tracing::warn;
10
11pub async fn sse_handler(
13 State(state): State<Arc<AppState>>,
14) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
15 let mut rx = state.tx.subscribe();
16
17 let stream = async_stream::stream! {
18 yield Ok(Event::default().data(r#"{"type":"connected"}"#));
20
21 loop {
22 match rx.recv().await {
23 Ok(data) => {
24 yield Ok(Event::default().data(data));
25 }
26 Err(broadcast::error::RecvError::Lagged(n)) => {
27 warn!("SSE client lagged by {n} messages");
28 }
29 Err(_) => break,
30 }
31 }
32 };
33
34 Sse::new(stream).keep_alive(
35 KeepAlive::new()
36 .interval(Duration::from_secs(15))
37 .text("ping"),
38 )
39}