use axum::extract::ws::{Message, WebSocket};
use axum::extract::{State, WebSocketUpgrade};
use axum::response::IntoResponse;
use axum::routing::get;
use axum::Router;
use tokio::sync::broadcast;
use crate::otel::types::TelemetryEvent;
use super::routes::DashboardState;
pub fn ws_router(state: DashboardState) -> Router {
Router::new()
.route("/ws", get(ws_handler))
.with_state(state)
}
async fn ws_handler(
ws: WebSocketUpgrade,
State(state): State<DashboardState>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_socket(socket, state.events_tx.subscribe()))
}
async fn handle_socket(mut socket: WebSocket, mut rx: broadcast::Receiver<TelemetryEvent>) {
loop {
tokio::select! {
result = rx.recv() => {
match result {
Ok(event) => {
let json = match serde_json::to_string(&event) {
Ok(j) => j,
Err(_) => continue,
};
if socket.send(Message::Text(json.into())).await.is_err() {
break;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::debug!("WebSocket client lagged by {} events", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
msg = socket.recv() => {
match msg {
Some(Ok(Message::Close(_))) | None => break,
Some(Ok(Message::Ping(data))) => {
#[allow(clippy::collapsible_match)]
if socket.send(Message::Pong(data)).await.is_err() {
break;
}
}
_ => {}
}
}
}
}
}