Skip to main content

flow_server/
sse.rs

1use crate::state::AppState;
2use axum::{
3    extract::State,
4    response::sse::{Event, KeepAlive, Sse},
5};
6use futures_core::Stream;
7use std::{convert::Infallible, sync::Arc, time::Duration};
8use tokio::sync::broadcast;
9use tracing::warn;
10
11/// GET /api/events — SSE endpoint for live updates
12pub async fn sse_handler(
13    State(state): State<Arc<AppState>>,
14) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
15    let mut rx = state.tx.subscribe();
16
17    let stream = async_stream::stream! {
18        // Send initial connected event
19        yield Ok(Event::default().data(r#"{"type":"connected"}"#));
20
21        loop {
22            match rx.recv().await {
23                Ok(data) => {
24                    yield Ok(Event::default().data(data));
25                }
26                Err(broadcast::error::RecvError::Lagged(n)) => {
27                    warn!("SSE client lagged by {n} messages");
28                }
29                Err(_) => break,
30            }
31        }
32    };
33
34    Sse::new(stream).keep_alive(
35        KeepAlive::new()
36            .interval(Duration::from_secs(15))
37            .text("ping"),
38    )
39}