nil-server 0.5.1

Multiplayer strategy game
Documentation
// Copyright (C) Call of Nil contributors
// SPDX-License-Identifier: AGPL-3.0-only

use axum::extract::ws::{Message, WebSocket};
use futures::sink::SinkExt;
use futures::stream::{SplitSink, SplitStream, StreamExt};
use nil_core::event::{EventTarget, Listener};
use nil_core::player::PlayerId;
use tokio::select;
use tokio::sync::broadcast::error::RecvError;
use tokio::task::{JoinHandle, spawn};

type Sender = SplitSink<WebSocket, Message>;
type Receiver = SplitStream<WebSocket>;

pub(crate) async fn handle_socket(socket: WebSocket, listener: Listener, player: PlayerId) {
  let (socket_tx, socket_rx) = socket.split();
  let mut tx_task = spawn_tx(socket_tx, listener, player);
  let mut rx_task = spawn_rx(socket_rx);

  select! {
    _ = (&mut tx_task) => rx_task.abort(),
    _ = (&mut rx_task) => tx_task.abort()
  }
}

fn spawn_tx(mut tx: Sender, mut listener: Listener, player: PlayerId) -> JoinHandle<()> {
  spawn(async move {
    loop {
      match listener.recv().await {
        Ok((bytes, target)) => {
          let send = match target {
            EventTarget::Broadcast => true,
            EventTarget::Player(id) if id == player => true,
            _ => false,
          };

          if send {
            let _ = tx.send(Message::Binary(bytes)).await;
          }
        }
        Err(RecvError::Closed) => break,
        Err(RecvError::Lagged(n)) => {
          tracing::warn!("Websocket listener for player {player} lagged by {n} messages");
        }
      }
    }
  })
}

fn spawn_rx(mut rx: Receiver) -> JoinHandle<()> {
  spawn(async move {
    while let Some(Ok(message)) = rx.next().await {
      if let Message::Close(..) = message {
        break;
      }
    }
  })
}