use axum::{
extract::{
State,
ws::{Message, WebSocket, WebSocketUpgrade},
},
response::IntoResponse,
};
use futures_util::{SinkExt, StreamExt};
use crate::types::AppState;
pub async fn ws_handler(ws: WebSocketUpgrade, State(state): State<AppState>) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_socket(socket, state))
}
async fn handle_socket(socket: WebSocket, state: AppState) {
state.ws_connections.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let (mut sender, mut receiver) = socket.split();
let mut rx = state.ws_tx.subscribe();
loop {
tokio::select! {
msg = rx.recv() => {
match msg {
Ok(text) => {
if sender.send(Message::Text(text.into())).await.is_err() {
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(_) => break,
}
}
client_msg = receiver.next() => {
match client_msg {
Some(Ok(Message::Close(_))) => break,
Some(Ok(_)) => {}
Some(Err(_)) | None => break,
}
}
}
}
state.ws_connections.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}