collet 0.1.0

Relentless agentic coding orchestrator with zero-drop agent loops
Documentation
//! SSE (Server-Sent Events) endpoint for real-time agent event streaming.

use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

use axum::Router;
use axum::extract::State;
use axum::response::IntoResponse;
use axum::response::sse::{Event, KeepAlive, Sse};
use axum::routing::get;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::BroadcastStream;

use super::event::WebEvent;
use super::state::AppState;

pub fn router() -> Router<Arc<AppState>> {
    Router::new().route("/api/events", get(events_handler))
}

/// SSE endpoint: clients receive a real-time stream of `WebEvent`s.
///
/// Each event is JSON-serialised with the event type as the SSE `event` field.
/// A 15-second keep-alive prevents proxy timeouts.
async fn events_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
    let rx = state.event_bus.subscribe();
    let stream = BroadcastStream::new(rx).filter_map(
        |result: Result<WebEvent, tokio_stream::wrappers::errors::BroadcastStreamRecvError>| {
            match result {
                Ok(event) => {
                    let event_type = event.event_type();
                    match serde_json::to_string(&event) {
                        Ok(json) => Some(Ok::<_, Infallible>(
                            Event::default().event(event_type).data(json),
                        )),
                        Err(_) => None,
                    }
                }
                // Lagged — subscriber fell behind, skip lost events.
                Err(_) => None,
            }
        },
    );

    Sse::new(stream).keep_alive(
        KeepAlive::new()
            .interval(Duration::from_secs(15))
            .text("ping"),
    )
}

impl WebEvent {
    /// Returns the SSE event type name for this variant.
    pub fn event_type(&self) -> &'static str {
        match self {
            Self::Token { .. } => "token",
            Self::Response { .. } => "response",
            Self::ToolCall { .. } => "tool_call",
            Self::ToolResult { .. } => "tool_result",
            Self::FileModified { .. } => "file_modified",
            Self::Error { .. } => "error",
            Self::GuardStop { .. } => "guard_stop",
            Self::StreamRetry { .. } => "stream_retry",
            Self::Status { .. } => "status",
            Self::PhaseChange { .. } => "phase_change",
            Self::PlanReady { .. } => "plan_ready",
            Self::SwarmAgentStarted { .. } => "hive_agent_started",
            Self::SwarmAgentProgress { .. } => "hive_agent_progress",
            Self::SwarmAgentToolCall { .. } => "hive_agent_tool_call",
            Self::SwarmAgentToolResult { .. } => "hive_agent_tool_result",
            Self::SwarmAgentToken { .. } => "hive_agent_token",
            Self::SwarmAgentResponse { .. } => "hive_agent_response",
            Self::SwarmAgentDone { .. } => "hive_agent_done",
            Self::SwarmWorkerApproaching { .. } => "hive_worker_approaching",
            Self::SwarmConflict { .. } => "hive_conflict",
            Self::SwarmDone { .. } => "hive_done",
            Self::SwarmWorkersDispatched => "hive_workers_dispatched",
            Self::SwarmWorkerPaused { .. } => "hive_worker_paused",
            Self::SwarmWorkerResumed { .. } => "hive_worker_resumed",
            Self::PerformanceUpdate { .. } => "performance_update",
            Self::Done => "done",
        }
    }
}