1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//! WebSocket handler for real-time event streaming.
//!
//! GET /api/v1/ws — upgrade to WebSocket, receive JSON events.
use axum::{
extract::{
ws::{Message, WebSocket},
State, WebSocketUpgrade,
},
response::Response,
};
use super::state::BancoState;
/// GET /api/v1/ws — WebSocket upgrade for real-time events.
pub async fn ws_handler(ws: WebSocketUpgrade, State(state): State<BancoState>) -> Response {
ws.on_upgrade(move |socket| handle_socket(socket, state))
}
/// Handle a connected WebSocket — forward events from the bus.
async fn handle_socket(mut socket: WebSocket, state: BancoState) {
let mut rx = state.events.subscribe();
// Send a welcome message
let welcome = serde_json::json!({
"type": "connected",
"data": {
"endpoints": 66,
"model_loaded": state.model.is_loaded(),
}
});
if socket.send(Message::Text(welcome.to_string())).await.is_err() {
return;
}
// Forward events until the client disconnects
loop {
tokio::select! {
// Event from bus → send to client
event = rx.recv() => {
match event {
Ok(json) => {
if socket.send(Message::Text(json)).await.is_err() {
break; // Client disconnected
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
// Client too slow — notify and continue
let lag_msg = serde_json::json!({
"type": "system_event",
"data": {"message": format!("Missed {n} events (slow consumer)")}
});
let _ = socket.send(Message::Text(lag_msg.to_string())).await;
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
// Client message (ping/pong handled by axum, we ignore text)
msg = socket.recv() => {
match msg {
Some(Ok(Message::Close(_))) | None => break,
Some(Ok(Message::Ping(data))) => {
let _ = socket.send(Message::Pong(data)).await;
}
_ => {} // Ignore other messages
}
}
}
}
}