use super::listeners::McpListenerRegistry;
use axum::http::HeaderMap;
use axum::response::{
IntoResponse, Response,
sse::{Event, KeepAlive, Sse},
};
use futures::stream::StreamExt;
use objectiveai_sdk::client_objectiveai_mcp::McpKind;
use objectiveai_sdk::client_objectiveai_mcp::client_request::McpListChangedKind;
use std::convert::Infallible;
use std::time::Duration;
use tokio_stream::wrappers::BroadcastStream;
const SSE_KEEP_ALIVE: Duration = Duration::from_secs(15);
pub async fn handle_get_sse(
response_id: String,
mcp_kind: McpKind,
listeners: McpListenerRegistry,
_headers: HeaderMap,
) -> Response {
let rx = listeners.subscribe(&response_id, &mcp_kind);
struct GcGuard {
listeners: McpListenerRegistry,
response_id: String,
mcp_kind: McpKind,
}
impl Drop for GcGuard {
fn drop(&mut self) {
self.listeners.gc(&self.response_id, &self.mcp_kind);
}
}
let gc = GcGuard {
listeners,
response_id,
mcp_kind,
};
let stream = BroadcastStream::new(rx).filter_map(move |item: Result<
McpListChangedKind,
tokio_stream::wrappers::errors::BroadcastStreamRecvError,
>| {
let _ = &gc;
async move {
let kind = item.ok()?;
let value = serde_json::json!({
"jsonrpc": "2.0",
"method": kind.method(),
});
let json = serde_json::to_string(&value).ok()?;
Some(Ok::<_, Infallible>(Event::default().data(json)))
}
});
Sse::new(stream)
.keep_alive(KeepAlive::new().interval(SSE_KEEP_ALIVE))
.into_response()
}