use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use axum::Router;
use axum::extract::State;
use axum::response::IntoResponse;
use axum::response::sse::{Event, KeepAlive, Sse};
use axum::routing::get;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::BroadcastStream;
use super::event::WebEvent;
use super::state::AppState;
pub fn router() -> Router<Arc<AppState>> {
Router::new().route("/api/events", get(events_handler))
}
async fn events_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let rx = state.event_bus.subscribe();
let stream = BroadcastStream::new(rx).filter_map(
|result: Result<WebEvent, tokio_stream::wrappers::errors::BroadcastStreamRecvError>| {
match result {
Ok(event) => {
let event_type = event.event_type();
match serde_json::to_string(&event) {
Ok(json) => Some(Ok::<_, Infallible>(
Event::default().event(event_type).data(json),
)),
Err(_) => None,
}
}
Err(_) => None,
}
},
);
Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(15))
.text("ping"),
)
}
impl WebEvent {
pub fn event_type(&self) -> &'static str {
match self {
Self::Token { .. } => "token",
Self::Response { .. } => "response",
Self::ToolCall { .. } => "tool_call",
Self::ToolResult { .. } => "tool_result",
Self::FileModified { .. } => "file_modified",
Self::Error { .. } => "error",
Self::GuardStop { .. } => "guard_stop",
Self::StreamRetry { .. } => "stream_retry",
Self::Status { .. } => "status",
Self::PhaseChange { .. } => "phase_change",
Self::PlanReady { .. } => "plan_ready",
Self::SwarmAgentStarted { .. } => "hive_agent_started",
Self::SwarmAgentProgress { .. } => "hive_agent_progress",
Self::SwarmAgentToolCall { .. } => "hive_agent_tool_call",
Self::SwarmAgentToolResult { .. } => "hive_agent_tool_result",
Self::SwarmAgentToken { .. } => "hive_agent_token",
Self::SwarmAgentResponse { .. } => "hive_agent_response",
Self::SwarmAgentDone { .. } => "hive_agent_done",
Self::SwarmWorkerApproaching { .. } => "hive_worker_approaching",
Self::SwarmConflict { .. } => "hive_conflict",
Self::SwarmDone { .. } => "hive_done",
Self::SwarmWorkersDispatched => "hive_workers_dispatched",
Self::SwarmWorkerPaused { .. } => "hive_worker_paused",
Self::SwarmWorkerResumed { .. } => "hive_worker_resumed",
Self::PerformanceUpdate { .. } => "performance_update",
Self::Done => "done",
}
}
}