Skip to main content

ccs_proxy/api/
stream.rs

1use crate::AppState;
2use crate::capture::CaptureEvent;
3use axum::extract::State;
4use axum::response::sse::{Event, KeepAlive, Sse};
5use futures::stream::Stream;
6use std::convert::Infallible;
7use tokio_stream::StreamExt;
8use tokio_stream::wrappers::BroadcastStream;
9
10pub async fn stream(
11    State(state): State<AppState>,
12) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
13    let rx = state.events.subscribe();
14    let sse_stream = BroadcastStream::new(rx).filter_map(|res| match res {
15        Ok(ev) => Some(Ok(event_to_sse(ev))),
16        Err(_lagged) => None, // dropped events: skip silently
17    });
18    Sse::new(sse_stream).keep_alive(KeepAlive::default())
19}
20
21fn event_to_sse(ev: CaptureEvent) -> Event {
22    let (name, data) = match &ev {
23        CaptureEvent::RequestStarted { .. } => (
24            "request_started",
25            serde_json::to_string(&ev).unwrap_or_default(),
26        ),
27        CaptureEvent::RequestCompleted { .. } => (
28            "request_completed",
29            serde_json::to_string(&ev).unwrap_or_default(),
30        ),
31    };
32    Event::default().event(name).data(data)
33}