1use crate::AppState;
2use crate::capture::CaptureEvent;
3use axum::extract::State;
4use axum::response::sse::{Event, KeepAlive, Sse};
5use futures::stream::Stream;
6use std::convert::Infallible;
7use tokio_stream::StreamExt;
8use tokio_stream::wrappers::BroadcastStream;
9
10pub async fn stream(
11 State(state): State<AppState>,
12) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
13 let rx = state.events.subscribe();
14 let sse_stream = BroadcastStream::new(rx).filter_map(|res| match res {
15 Ok(ev) => Some(Ok(event_to_sse(ev))),
16 Err(_lagged) => None, });
18 Sse::new(sse_stream).keep_alive(KeepAlive::default())
19}
20
21fn event_to_sse(ev: CaptureEvent) -> Event {
22 let (name, data) = match &ev {
23 CaptureEvent::RequestStarted { .. } => (
24 "request_started",
25 serde_json::to_string(&ev).unwrap_or_default(),
26 ),
27 CaptureEvent::RequestCompleted { .. } => (
28 "request_completed",
29 serde_json::to_string(&ev).unwrap_or_default(),
30 ),
31 };
32 Event::default().event(name).data(data)
33}