zero-trust-rps 0.0.5

Online Multiplayer Rock Paper Scissors
Documentation
use std::fmt::Display;

use uuid::Uuid;

use crate::{
    cli_utils::COMMIT_HASH,
    common::{
        connection::{GetNextMessageError, Reader, WriteMessageError, Writer},
        constants::CRATE_VERSION,
        message::{ClientMessage, RoomId, ServerMessage, User, UserState, PROTOCOL_VERSION},
        rps::{
            move_kind::UserMoveKind,
            move_validation::{validate_move, MoveValidationError},
        },
    },
    server::utils::send_room_update,
};

use super::{
    types::{ArcMut, Connections, RoomData, Rooms},
    utils::SendRoomUpdateError,
};

#[derive(thiserror::Error, Debug)]
pub enum ProcessError {
    #[error("{}", .0)]
    WriteMessageErr(#[from] WriteMessageError),
    #[error("{}", .0)]
    ReadMessageErr(#[from] GetNextMessageError),
    #[error("{}", .0)]
    ProcessErr(#[from] ProcessMessageError),
    #[error("{}", .0)]
    Other(String),
}

impl From<String> for ProcessError {
    fn from(value: String) -> Self {
        ProcessError::Other(value)
    }
}

pub(crate) async fn process<P: Display + Eq + std::hash::Hash + Clone, Conn: Reader, W: Writer>(
    peer: &P,
    mut stream: Conn,
    streams: Connections<P, W>,
    rooms: Rooms<P>,
) -> Result<(), ProcessError> {
    log::info!("Accepted from `{peer}`");

    streams
        .lock()
        .await
        .get_mut(peer)
        .ok_or_else(|| format!("Could not get connection for {peer}"))?
        .write_message(ServerMessage::Hello(
            PROTOCOL_VERSION,
            CRATE_VERSION.into(),
            COMMIT_HASH,
        ))
        .await?;

    let mut user = None;
    let mut room = None;

    while let Some(msg) = stream.get_next_message::<ClientMessage>().await? {
        log::debug!("{peer}: {msg:?}");
        let error = match process_message(msg, &mut user, &mut room, peer, &streams, &rooms).await {
            Ok(()) => None,
            Err(err) => match err {
                ProcessMessageError::WriteMsgErr(write_message_error)
                | ProcessMessageError::SendRoomUpdateErr(SendRoomUpdateError::WriteMessageError(
                    write_message_error,
                )) => {
                    return Err(write_message_error.into());
                }
                ProcessMessageError::Other(err)
                | ProcessMessageError::SendRoomUpdateErr(SendRoomUpdateError::Other(err)) => {
                    Some(err)
                }
                ProcessMessageError::InvalidMove(err) => Some(format!("{err}")), // TODO: typed response instead of string
            },
        };
        if let Some(error) = error {
            log::info!("handling message failed: {error}");
            streams
                .lock()
                .await
                .get_mut(peer)
                .ok_or_else(|| format!("Could not get connection for {peer}"))?
                .write_message(ServerMessage::Error(error))
                .await?;
        }
    }

    log::info!("Connection closed `{peer}`");
    Ok(())
}

#[derive(thiserror::Error, Debug)]
pub enum ProcessMessageError {
    #[error("{}", .0)]
    WriteMsgErr(#[from] WriteMessageError),
    #[error("{}", .0)]
    SendRoomUpdateErr(#[from] SendRoomUpdateError),
    #[error("Invalid move: {}", .0)]
    InvalidMove(#[from] MoveValidationError),
    #[error("{}", .0)]
    Other(String),
}

impl From<String> for ProcessMessageError {
    fn from(value: String) -> Self {
        ProcessMessageError::Other(value)
    }
}

async fn process_message<P: Display + Eq + std::hash::Hash + Clone, W: Writer>(
    message: ClientMessage,
    user_id: &mut Option<Uuid>,
    room: &mut Option<(RoomId, ArcMut<RoomData<P>>)>,
    peer: &P,
    streams: &Connections<P, W>,
    rooms: &Rooms<P>,
) -> Result<(), ProcessMessageError> {
    let user: Option<User> = match (user_id.as_ref(), room.as_ref()) {
        (Some(user_id), Some((_, room))) => room
            .lock()
            .await
            .users
            .values()
            .find(|u| u.id == *user_id)
            .cloned(),
        _ => None,
    };

    let move_kind = UserMoveKind::from_client_message(&message);

    let _: () = match message {
        ClientMessage::Ping { c } => {
            streams
                .lock()
                .await
                .get_mut(peer)
                .ok_or_else(|| {
                    ProcessMessageError::Other(format!("Could not get connection for {peer}"))
                })?
                .write_message(ServerMessage::Pong { c })
                .await?;
        }
        ClientMessage::Join {
            room_id,
            user: uuid,
        } => {
            if let Some((id, room)) = room {
                // remove from old room
                room.lock().await.users.remove(peer);
                send_room_update(*id, room, streams).await?;
            }

            let saved_room = rooms
                .lock()
                .await
                .entry(room_id)
                .or_insert_with(Default::default)
                .clone();

            {
                let mut room = saved_room.lock().await;

                validate_move(
                    uuid,
                    move_kind.expect("is move"),
                    room.users.iter(),
                    room.round.as_ref(),
                )?;

                room.users.insert(
                    peer.clone(),
                    User {
                        id: uuid,
                        state: UserState::InRoom,
                    },
                );
            };
            *room = Some((room_id, saved_room.clone()));
            *user_id = Some(uuid);
            log::info!("User {uuid} joined {room_id}");
            send_room_update(room_id, &saved_room, streams).await?;
        }
        ClientMessage::Play { value, round } => {
            round.validate()?;
            let mut u = user.ok_or_else(|| format!("{peer} not yet in room"))?;
            let (room_id, room) = room.as_ref().expect("should exist if user exists");
            {
                let mut room = room.lock().await;
                validate_move(
                    u.id,
                    move_kind.expect("is move"),
                    room.users.iter(),
                    room.round.as_ref(),
                )?;
                u.state = UserState::Played(value);
                if let Some(ref old_round) = room.round {
                    if *old_round != round {
                        return Err(ProcessMessageError::Other(format!(
                            "Expected round {old_round:?}"
                        )));
                    }
                } else {
                    room.round = Some(round);
                }
                room.users.insert(peer.clone(), u);
            }
            send_room_update(*room_id, room, streams).await?;
        }
        ClientMessage::ConfirmPlay(hash_with_data) => {
            let mut u = user.ok_or_else(|| format!("{peer} not yet in room"))?;
            if let UserState::Played(hash) = u.state {
                if hash_with_data.get_hash() != hash.as_ref() {
                    Err(format!(
                        "User played wrong hash {} expected {}",
                        hash_with_data.get_hash(),
                        hash.as_ref()
                    ))?
                }
            }
            let (room_id, room) = room.as_ref().expect("should exist if user exists");
            {
                let mut room = room.lock().await;
                validate_move(
                    u.id,
                    move_kind.expect("is move"),
                    room.users.iter(),
                    room.round.as_ref(),
                )?;
                hash_with_data.verify(room.round.as_ref().expect("round"))?;

                u.state = UserState::Confirmed(hash_with_data);
                room.users.insert(peer.clone(), u);
            };

            send_room_update(*room_id, room, streams).await?;
        }
        ClientMessage::RoundFinished { round_id } => {
            let mut u = user.ok_or_else(|| format!("{peer} not yet in room"))?;
            let (room_id, room) = room.as_ref().expect("should exist if user exists");
            {
                let mut room = room.lock().await;
                validate_move(
                    u.id,
                    move_kind.expect("is move"),
                    room.users.iter(),
                    room.round.as_ref(),
                )?;
                u.state = UserState::InRoom;
                room.users.insert(peer.clone(), u);
                if room.round.as_ref().map(|r| r.round_id) != Some(round_id) {
                    Err(format!(
                        "Invalid round id, expected: {:?}",
                        room.round.as_ref().map(|r| r.round_id)
                    ))?;
                }
                let round = room.round.as_ref().expect("round checked above");
                if room
                    .users
                    .values()
                    .filter(|u| round.users.contains(&u.id))
                    .all(|u| matches!(u.state, UserState::InRoom))
                {
                    room.round = None;
                }
            };

            send_room_update(*room_id, room, streams).await?;
        }
    };
    Ok(())
}