use axum::extract::ws::{Message, WebSocket};
use futures::sink::SinkExt;
use futures::stream::{SplitSink, SplitStream, StreamExt};
use nil_core::event::{EventTarget, Listener};
use nil_core::player::PlayerId;
use tokio::select;
use tokio::sync::broadcast::error::RecvError;
use tokio::task::{JoinHandle, spawn};
/// Write half of a split [`WebSocket`]: a sink that accepts outgoing [`Message`]s.
type Sender = SplitSink<WebSocket, Message>;
/// Read half of a split [`WebSocket`]: a stream of incoming frames.
type Receiver = SplitStream<WebSocket>;
/// Drives a single WebSocket connection for `player`.
///
/// The socket is split into its write and read halves, and each half is handed
/// to its own spawned task: one forwarding events from `listener` to the
/// client, the other consuming frames sent by the client. The function returns
/// as soon as either task finishes, aborting the other one so that neither
/// half outlives the connection.
pub(crate) async fn handle_socket(socket: WebSocket, listener: Listener, player: PlayerId) {
    let (sink, stream) = socket.split();

    let mut outbound = spawn_tx(sink, listener, player);
    let mut inbound = spawn_rx(stream);

    // Borrow the handles mutably so the one that loses the race is still
    // available to be aborted in the winning branch.
    select! {
        _ = &mut outbound => {
            inbound.abort();
        }
        _ = &mut inbound => {
            outbound.abort();
        }
    }
}
/// Spawns the task that forwards events from `listener` to the client.
///
/// Each received event carries its payload bytes and an [`EventTarget`]. The
/// payload is sent as a binary WebSocket message when the event is a broadcast
/// or is addressed to `player`; any other target is skipped.
///
/// The task ends when:
/// - the listener reports [`RecvError::Closed`], or
/// - writing to the socket fails, since the connection is no longer usable and
///   further sends would only fail again.
///
/// A lagged receiver (missed events) is tolerated and the loop keeps going.
fn spawn_tx(mut tx: Sender, mut listener: Listener, player: PlayerId) -> JoinHandle<()> {
    spawn(async move {
        loop {
            match listener.recv().await {
                Ok((bytes, target)) => {
                    let is_recipient = match target {
                        EventTarget::Broadcast => true,
                        EventTarget::Player(id) if id == player => true,
                        _ => false,
                    };

                    // A failed send means the socket is gone; stop instead of
                    // spinning on every subsequent event.
                    if is_recipient && tx.send(Message::Binary(bytes)).await.is_err() {
                        break;
                    }
                }
                Err(RecvError::Closed) => break,
                // Some events were dropped because this receiver fell behind;
                // keep delivering whatever comes next.
                Err(RecvError::Lagged(..)) => {}
            }
        }
    })
}
/// Spawns the task that drains frames sent by the client.
///
/// Incoming frames are read and discarded; the task exists to notice when the
/// connection ends. It finishes when the stream is exhausted, when reading a
/// frame yields an error, or when a [`Message::Close`] frame arrives.
fn spawn_rx(mut rx: Receiver) -> JoinHandle<()> {
    spawn(async move {
        loop {
            match rx.next().await {
                // Any other successfully received frame is ignored.
                Some(Ok(frame)) if !matches!(frame, Message::Close(..)) => {}
                // Close frame, receive error, or end of stream.
                _ => break,
            }
        }
    })
}