use crate::state::AppState;
use axum::{
extract::{
State,
ws::{Message, WebSocket, WebSocketUpgrade},
},
response::Response,
};
use futures::{SinkExt, StreamExt};
use tracing::{debug, error, info};
pub async fn positions_ws(ws: WebSocketUpgrade, State(state): State<AppState>) -> Response {
ws.on_upgrade(|socket| handle_positions_ws(socket, state))
}
async fn handle_positions_ws(socket: WebSocket, state: AppState) {
let (mut sender, mut receiver) = socket.split();
let mut rx = state.subscribe_positions();
info!("Position WebSocket client connected");
let send_task = tokio::spawn(async move {
while let Ok(update) = rx.recv().await {
let msg = serde_json::to_string(&update).unwrap_or_default();
if sender.send(Message::Text(msg.into())).await.is_err() {
break;
}
}
});
let recv_task = tokio::spawn(async move {
while let Some(msg) = receiver.next().await {
match msg {
Ok(Message::Ping(_data)) => {
debug!("Received ping");
}
Ok(Message::Close(_)) => {
debug!("Client closed connection");
break;
}
Err(e) => {
error!(error = %e, "WebSocket error");
break;
}
_ => {}
}
}
});
tokio::select! {
_ = send_task => {},
_ = recv_task => {},
}
info!("Position WebSocket client disconnected");
}
pub async fn alerts_ws(ws: WebSocketUpgrade, State(state): State<AppState>) -> Response {
ws.on_upgrade(|socket| handle_alerts_ws(socket, state))
}
async fn handle_alerts_ws(socket: WebSocket, state: AppState) {
let (mut sender, mut receiver) = socket.split();
let mut rx = state.subscribe_alerts();
info!("Alerts WebSocket client connected");
let send_task = tokio::spawn(async move {
while let Ok(alert) = rx.recv().await {
let msg = serde_json::to_string(&alert).unwrap_or_default();
if sender.send(Message::Text(msg.into())).await.is_err() {
break;
}
}
});
let recv_task = tokio::spawn(async move {
while let Some(msg) = receiver.next().await {
match msg {
Ok(Message::Close(_)) => {
debug!("Client closed connection");
break;
}
Err(e) => {
error!(error = %e, "WebSocket error");
break;
}
_ => {}
}
}
});
tokio::select! {
_ = send_task => {},
_ = recv_task => {},
}
info!("Alerts WebSocket client disconnected");
}