use axum::response::sse::{Event, KeepAlive, Sse};
use tokio_stream::StreamExt;
use tokio_stream::wrappers::BroadcastStream;
use tracing::{debug, info, trace};
use crate::protocol::JsonRpcResponse;
/// Serializes a JSON-RPC response and wraps it in an SSE [`Event`] named `message`.
///
/// The JSON text produced by `serde_json::to_string` becomes the event's data.
///
/// # Errors
///
/// Returns the [`serde_json::Error`] raised when `response` cannot be serialized.
// NOTE(review): no non-test caller is visible in this file, hence the lint
// suppression below — confirm whether the helper is still needed elsewhere.
#[allow(dead_code)]
pub fn response_to_event(response: &JsonRpcResponse) -> Result<Event, serde_json::Error> {
    serde_json::to_string(response)
        .map(|payload| Event::default().event("message").data(payload))
}
/// Guard that logs an `info` message when it is dropped.
///
/// `notification_stream` moves one of these into its stream closure, so the
/// log line is emitted when that stream (and therefore the guard) goes away.
struct StreamDropGuard {
    /// Session identifier recorded on the "stream closed" log line.
    session_id: String,
}

impl Drop for StreamDropGuard {
    /// Emits the "stream closed" log entry tagged with the session id.
    fn drop(&mut self) {
        let session_id = self.session_id.as_str();
        info!(%session_id, "SSE notification stream closed");
    }
}
pub fn notification_stream(
rx: tokio::sync::broadcast::Receiver<String>,
session_id: Option<String>,
) -> Sse<impl tokio_stream::Stream<Item = Result<Event, std::convert::Infallible>>> {
let guard = session_id.map(|id| StreamDropGuard { session_id: id });
let stream = BroadcastStream::new(rx).filter_map(move |result| {
let _ = &guard;
match result {
Ok(data) => {
trace!("SSE event emitted");
Some(Ok(Event::default().event("message").data(data)))
}
Err(_) => {
debug!("SSE lagged message skipped");
None
}
}
});
Sse::new(stream).keep_alive(KeepAlive::default())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A success response must convert into an SSE event without error.
    #[test]
    fn test_response_to_event_success() {
        let id = serde_json::json!(1);
        let result = serde_json::json!({"status": "ok"});
        let response = JsonRpcResponse::success(id, result);

        assert!(response_to_event(&response).is_ok());
    }

    /// An error response must convert into an SSE event without error as well.
    #[test]
    fn test_response_to_event_error() {
        let id = serde_json::json!(2);
        let failure = crate::protocol::JsonRpcError::internal_error("boom");
        let response = JsonRpcResponse::error(id, failure);

        assert!(response_to_event(&response).is_ok());
    }
}