Skip to main content

atomr_agents_coding_cli_harness_web/
sse.rs

1//! Server-Sent Events stream of normalized `CodingCliEvent`.
2
3use std::convert::Infallible;
4use std::time::Duration;
5
6use atomr_agents_coding_cli_core::CodingCliEvent;
7use axum::extract::State;
8use axum::response::sse::{Event, KeepAlive, Sse};
9use futures::stream::Stream;
10use tokio::sync::broadcast;
11use tokio_stream::wrappers::BroadcastStream;
12use tokio_stream::StreamExt;
13
14use crate::AppState;
15
16/// `GET /api/cli/runs/events` — SSE of every normalized event the
17/// harness broadcasts.
18pub async fn sse_events(
19    State(state): State<AppState>,
20) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
21    let rx: broadcast::Receiver<CodingCliEvent> = state.harness.event_sender().subscribe();
22    let stream = BroadcastStream::new(rx).filter_map(|res| match res {
23        Ok(ev) => {
24            let json = serde_json::to_string(&ev).unwrap_or_else(|_| "null".into());
25            Some(Ok(Event::default().event("coding_cli_event").data(json)))
26        }
27        Err(_lag) => None,
28    });
29    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
30}