use std::sync::Arc;
use axum::extract::ws::{Message, WebSocket};
use super::ControlPanelState;
use crate::coding_agent::status::AgentConnectionStatus;
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type")]
pub enum WsEvent {
#[serde(rename = "connected")]
Connected { message: String },
#[serde(rename = "log")]
Log {
timestamp: String,
level: String,
message: String,
target: Option<String>,
},
#[serde(rename = "agent_state")]
AgentState { agent_id: String, state: String },
#[serde(rename = "dashboard")]
Dashboard {
uptime_secs: u64,
session_count: u64,
channel_count: usize,
},
#[serde(rename = "coding_agent_status")]
CodingAgentStatus {
agent_id: String,
previous_status: AgentConnectionStatus,
new_status: AgentConnectionStatus,
timestamp: String,
},
#[serde(rename = "coding_agent_task")]
CodingAgentTask {
agent_id: String,
task_id: String,
state: String,
timestamp: String,
},
#[serde(rename = "coding_agent_cost_warning")]
CodingAgentCostWarning {
agent_id: String,
current_cost_usd: f64,
cap_usd: f64,
timestamp: String,
},
}
pub(crate) async fn ws_events_handler(
axum::extract::State(state): axum::extract::State<Arc<ControlPanelState>>,
ws: axum::extract::WebSocketUpgrade,
) -> impl axum::response::IntoResponse {
ws.on_upgrade(move |socket| handle_ws_connection(socket, state))
}
async fn handle_ws_connection(mut socket: WebSocket, state: Arc<ControlPanelState>) {
let connected_msg = serde_json::to_string(&WsEvent::Connected {
message: "event stream active".to_string(),
})
.unwrap_or_default();
if socket
.send(Message::Text(connected_msg.into()))
.await
.is_err()
{
return;
}
let mut rx = state.ws_broadcast.subscribe();
let mut coding_agent_rx = state
.coding_agent_registry
.as_ref()
.map(|registry| registry.subscribe_status());
loop {
tokio::select! {
result = rx.recv() => {
match result {
Ok(event) => {
let json = match serde_json::to_string(&event) {
Ok(j) => j,
Err(_) => continue,
};
if socket.send(Message::Text(json.into())).await.is_err() {
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::debug!(skipped = n, "WebSocket client lagged, skipping messages");
continue;
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
}
}
result = async {
match coding_agent_rx.as_mut() {
Some(rx) => rx.recv().await,
None => std::future::pending().await,
}
} => {
match result {
Ok(status_event) => {
let ws_event = WsEvent::CodingAgentStatus {
agent_id: status_event.agent_id,
previous_status: status_event.previous_status,
new_status: status_event.new_status,
timestamp: status_event.timestamp.to_rfc3339(),
};
let json = match serde_json::to_string(&ws_event) {
Ok(j) => j,
Err(_) => continue,
};
if socket.send(Message::Text(json.into())).await.is_err() {
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::debug!(
skipped = n,
"Coding agent status receiver lagged, skipping events"
);
continue;
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
coding_agent_rx = None;
continue;
}
}
}
msg = socket.recv() => {
match msg {
Some(Ok(Message::Close(_))) | None => break,
Some(Err(_)) => break,
_ => {} }
}
}
}
}