use leptos::prelude::*;
use leptos::web_sys::{ErrorEvent, EventSource, MessageEvent};
use serde::Deserialize;
use wasm_bindgen::closure::Closure;
use wasm_bindgen::JsCast;
#[derive(Debug, Clone, PartialEq)]
pub enum SseEvent {
StatsUpdated,
SessionCreated { id: String },
SessionUpdated { id: String },
ConfigChanged { scope: String },
AnalyticsUpdated,
LoadCompleted,
WatcherError { message: String },
}
#[derive(Debug, Deserialize)]
struct SessionEventData {
id: String,
}
#[derive(Debug, Deserialize)]
struct ConfigEventData {
scope: String,
}
#[derive(Debug, Deserialize)]
struct ErrorEventData {
message: String,
}
fn parse_sse_event(event_type: &str, data: &str) -> Option<SseEvent> {
match event_type {
"stats_updated" => Some(SseEvent::StatsUpdated),
"analytics_updated" => Some(SseEvent::AnalyticsUpdated),
"load_completed" => Some(SseEvent::LoadCompleted),
"session_created" => serde_json::from_str::<SessionEventData>(data)
.ok()
.map(|payload| SseEvent::SessionCreated { id: payload.id }),
"session_updated" => serde_json::from_str::<SessionEventData>(data)
.ok()
.map(|payload| SseEvent::SessionUpdated { id: payload.id }),
"config_changed" => serde_json::from_str::<ConfigEventData>(data)
.ok()
.map(|payload| SseEvent::ConfigChanged {
scope: payload.scope,
}),
"watcher_error" => serde_json::from_str::<ErrorEventData>(data)
.ok()
.map(|payload| SseEvent::WatcherError {
message: payload.message,
}),
_ => None,
}
}
pub fn use_sse() -> ReadSignal<Option<SseEvent>> {
let (event, set_event) = signal(None::<SseEvent>);
let (_connection_status, set_connection_status) = signal(ConnectionStatus::Connecting);
Effect::new(move |_| {
let event_source = match EventSource::new("/api/events") {
Ok(es) => es,
Err(e) => {
leptos::logging::error!("Failed to create EventSource: {:?}", e);
set_connection_status.set(ConnectionStatus::Error);
return;
}
};
set_connection_status.set(ConnectionStatus::Connected);
let es_error = event_source.clone();
let on_open = Closure::wrap(Box::new(move |_: web_sys::Event| {
leptos::logging::log!("SSE connection opened");
set_connection_status.set(ConnectionStatus::Connected);
}) as Box<dyn FnMut(_)>);
event_source.set_onopen(Some(on_open.as_ref().unchecked_ref()));
on_open.forget();
let on_error = Closure::wrap(Box::new(move |e: ErrorEvent| {
leptos::logging::warn!("SSE connection error: {:?}", e.message());
set_connection_status.set(ConnectionStatus::Reconnecting);
if es_error.ready_state() == EventSource::CLOSED {
leptos::logging::error!("SSE connection closed permanently");
set_connection_status.set(ConnectionStatus::Error);
}
}) as Box<dyn FnMut(_)>);
event_source.set_onerror(Some(on_error.as_ref().unchecked_ref()));
on_error.forget();
let event_types = vec![
"stats_updated",
"session_created",
"session_updated",
"config_changed",
"analytics_updated",
"load_completed",
"watcher_error",
];
for event_type in event_types {
let event_type_owned = event_type.to_string();
let callback = Closure::wrap(Box::new(move |event: MessageEvent| {
let data = event.data().as_string().unwrap_or_default();
if let Some(parsed_event) = parse_sse_event(&event_type_owned, &data) {
leptos::logging::log!("SSE event received: {:?}", parsed_event);
set_event.set(Some(parsed_event));
}
}) as Box<dyn FnMut(_)>);
event_source
.add_event_listener_with_callback(event_type, callback.as_ref().unchecked_ref())
.unwrap_or_else(|e| {
leptos::logging::error!("Failed to add listener for {}: {:?}", event_type, e);
});
callback.forget();
}
});
event
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ConnectionStatus {
Connecting,
Connected,
Reconnecting,
Error,
}
pub fn use_sse_status() -> ReadSignal<ConnectionStatus> {
let (status, _) = signal(ConnectionStatus::Connecting);
status
}