use std::fmt::Display;
use uuid::Uuid;
use crate::{
cli_utils::COMMIT_HASH,
common::{
connection::{GetNextMessageError, Reader, WriteMessageError, Writer},
constants::CRATE_VERSION,
message::{ClientMessage, RoomId, ServerMessage, User, UserState, PROTOCOL_VERSION},
rps::{
move_kind::UserMoveKind,
move_validation::{validate_move, MoveValidationError},
},
},
server::utils::send_room_update,
};
use super::{
types::{ArcMut, Connections, RoomData, Rooms},
utils::SendRoomUpdateError,
};
#[derive(thiserror::Error, Debug)]
pub enum ProcessError {
#[error("{}", .0)]
WriteMessageErr(#[from] WriteMessageError),
#[error("{}", .0)]
ReadMessageErr(#[from] GetNextMessageError),
#[error("{}", .0)]
ProcessErr(#[from] ProcessMessageError),
#[error("{}", .0)]
Other(String),
}
impl From<String> for ProcessError {
fn from(value: String) -> Self {
ProcessError::Other(value)
}
}
/// Serves a single client connection until its message stream ends.
///
/// Sends a `ServerMessage::Hello` to `peer` through its entry in `streams`,
/// then reads `ClientMessage`s from `stream` one at a time and hands each to
/// [`process_message`].
///
/// A handler failure caused by a write error ends the connection; every other
/// handler failure is logged and reported back to the peer as a
/// `ServerMessage::Error`, after which the loop continues.
///
/// # Errors
///
/// Returns an error when
/// * `peer` has no entry in `streams`,
/// * writing to a connection fails, or
/// * reading the next message from `stream` fails.
pub(crate) async fn process<P: Display + Eq + std::hash::Hash + Clone, Conn: Reader, W: Writer>(
    peer: &P,
    mut stream: Conn,
    streams: Connections<P, W>,
    rooms: Rooms<P>,
) -> Result<(), ProcessError> {
    log::info!("Accepted from `{peer}`");

    // Greet the peer; the connection table stays locked only for this block.
    {
        let mut connections = streams.lock().await;
        let writer = connections
            .get_mut(peer)
            .ok_or_else(|| format!("Could not get connection for {peer}"))?;
        let hello = ServerMessage::Hello(PROTOCOL_VERSION, CRATE_VERSION.into(), COMMIT_HASH);
        writer.write_message(hello).await?;
    }

    // Per-connection session state, filled in by `process_message` on `Join`.
    let mut session_user = None;
    let mut session_room = None;

    while let Some(msg) = stream.get_next_message::<ClientMessage>().await? {
        log::debug!("{peer}: {msg:?}");

        let outcome = process_message(
            msg,
            &mut session_user,
            &mut session_room,
            peer,
            &streams,
            &rooms,
        )
        .await;

        let error = match outcome {
            Ok(()) => continue,
            // A failed write is fatal for this connection.
            Err(ProcessMessageError::WriteMsgErr(fatal))
            | Err(ProcessMessageError::SendRoomUpdateErr(
                SendRoomUpdateError::WriteMessageError(fatal),
            )) => return Err(fatal.into()),
            // Everything else is reported to the peer and the loop goes on.
            Err(ProcessMessageError::Other(text))
            | Err(ProcessMessageError::SendRoomUpdateErr(SendRoomUpdateError::Other(text))) => text,
            Err(ProcessMessageError::InvalidMove(invalid)) => invalid.to_string(),
        };

        log::info!("handling message failed: {error}");
        streams
            .lock()
            .await
            .get_mut(peer)
            .ok_or_else(|| format!("Could not get connection for {peer}"))?
            .write_message(ServerMessage::Error(error))
            .await?;
    }

    log::info!("Connection closed `{peer}`");
    Ok(())
}
#[derive(thiserror::Error, Debug)]
pub enum ProcessMessageError {
#[error("{}", .0)]
WriteMsgErr(#[from] WriteMessageError),
#[error("{}", .0)]
SendRoomUpdateErr(#[from] SendRoomUpdateError),
#[error("Invalid move: {}", .0)]
InvalidMove(#[from] MoveValidationError),
#[error("{}", .0)]
Other(String),
}
impl From<String> for ProcessMessageError {
fn from(value: String) -> Self {
ProcessMessageError::Other(value)
}
}
/// Handles one [`ClientMessage`] received from `peer`.
///
/// `user_id` and `room` are the session state of this connection: the id the
/// peer joined with and the room (id plus shared handle) it is currently in.
/// Both are updated by `Join`.
///
/// Per message:
/// * `Ping` – answers with `ServerMessage::Pong` carrying the same counter.
/// * `Join` – removes the peer from its current room (if any), clears the
///   session state, then validates and registers the peer in the requested
///   room, creating that room on first use.
/// * `Play` – records the played value and starts the round if none is
///   active; a round that differs from the active one is rejected.
/// * `ConfirmPlay` – checks the revealed data against the previously played
///   hash and the active round, then marks the user as confirmed.
/// * `RoundFinished` – puts the user back to `InRoom`; once every participant
///   of the round is `InRoom` again the round is cleared.
///
/// After every successful room change `send_room_update` is called for the
/// affected room.
///
/// # Errors
///
/// * [`ProcessMessageError::WriteMsgErr`] / [`ProcessMessageError::SendRoomUpdateErr`]
///   when writing to a connection or sending the room update fails.
/// * [`ProcessMessageError::InvalidMove`] when `validate_move` rejects the request.
/// * [`ProcessMessageError::Other`] for everything else (peer not in a room,
///   unknown connection, round mismatch, wrong hash, ...).
///
/// # Panics
///
/// Panics if `UserMoveKind::from_client_message` yields no move kind for a
/// `Join`, `Play`, `ConfirmPlay` or `RoundFinished` message.
async fn process_message<P: Display + Eq + std::hash::Hash + Clone, W: Writer>(
    message: ClientMessage,
    user_id: &mut Option<Uuid>,
    room: &mut Option<(RoomId, ArcMut<RoomData<P>>)>,
    peer: &P,
    streams: &Connections<P, W>,
    rooms: &Rooms<P>,
) -> Result<(), ProcessMessageError> {
    // Snapshot of this session's user as currently stored in its room.
    // `None` when the peer has not joined or is no longer listed there.
    let user: Option<User> = match (user_id.as_ref(), room.as_ref()) {
        (Some(user_id), Some((_, room))) => room
            .lock()
            .await
            .users
            .values()
            .find(|u| u.id == *user_id)
            .cloned(),
        _ => None,
    };
    let move_kind = UserMoveKind::from_client_message(&message);

    match message {
        ClientMessage::Ping { c } => {
            streams
                .lock()
                .await
                .get_mut(peer)
                .ok_or_else(|| {
                    ProcessMessageError::Other(format!("Could not get connection for {peer}"))
                })?
                .write_message(ServerMessage::Pong { c })
                .await?;
        }
        ClientMessage::Join {
            room_id,
            user: uuid,
        } => {
            // Leave the previous room first. The session state is cleared right
            // away so that a failure further down cannot leave this connection
            // pointing at a room the peer is no longer part of.
            if let Some((old_id, old_room)) = room.take() {
                *user_id = None;
                old_room.lock().await.users.remove(peer);
                send_room_update(old_id, &old_room, streams).await?;
            }

            // Fetch the requested room, creating it on first use. The lock on
            // the room table is released at the end of this statement.
            let saved_room = rooms.lock().await.entry(room_id).or_default().clone();
            {
                let mut guard = saved_room.lock().await;
                validate_move(
                    uuid,
                    move_kind.expect("is move"),
                    guard.users.iter(),
                    guard.round.as_ref(),
                )?;
                guard.users.insert(
                    peer.clone(),
                    User {
                        id: uuid,
                        state: UserState::InRoom,
                    },
                );
            }
            *room = Some((room_id, saved_room.clone()));
            *user_id = Some(uuid);
            log::info!("User {uuid} joined {room_id}");
            send_room_update(room_id, &saved_room, streams).await?;
        }
        ClientMessage::Play { value, round } => {
            round.validate()?;
            let mut u = user.ok_or_else(|| format!("{peer} not yet in room"))?;
            let (room_id, room) = room.as_ref().expect("should exist if user exists");
            {
                let mut guard = room.lock().await;
                validate_move(
                    u.id,
                    move_kind.expect("is move"),
                    guard.users.iter(),
                    guard.round.as_ref(),
                )?;
                // The first play of a round defines it; later plays must match.
                if let Some(ref old_round) = guard.round {
                    if *old_round != round {
                        return Err(ProcessMessageError::Other(format!(
                            "Expected round {old_round:?}"
                        )));
                    }
                } else {
                    guard.round = Some(round);
                }
                u.state = UserState::Played(value);
                guard.users.insert(peer.clone(), u);
            }
            send_room_update(*room_id, room, streams).await?;
        }
        ClientMessage::ConfirmPlay(hash_with_data) => {
            let mut u = user.ok_or_else(|| format!("{peer} not yet in room"))?;
            // The revealed data has to belong to the hash committed earlier.
            if let UserState::Played(hash) = u.state {
                if hash_with_data.get_hash() != hash.as_ref() {
                    return Err(ProcessMessageError::Other(format!(
                        "User played wrong hash {} expected {}",
                        hash_with_data.get_hash(),
                        hash.as_ref()
                    )));
                }
            }
            let (room_id, room) = room.as_ref().expect("should exist if user exists");
            {
                let mut guard = room.lock().await;
                validate_move(
                    u.id,
                    move_kind.expect("is move"),
                    guard.users.iter(),
                    guard.round.as_ref(),
                )?;
                // Report a missing round to the client instead of panicking on
                // state that depends on what clients sent before.
                let active_round = guard
                    .round
                    .as_ref()
                    .ok_or_else(|| String::from("No round in progress"))?;
                hash_with_data.verify(active_round)?;
                u.state = UserState::Confirmed(hash_with_data);
                guard.users.insert(peer.clone(), u);
            }
            send_room_update(*room_id, room, streams).await?;
        }
        ClientMessage::RoundFinished { round_id } => {
            let mut u = user.ok_or_else(|| format!("{peer} not yet in room"))?;
            let (room_id, room) = room.as_ref().expect("should exist if user exists");
            {
                let mut guard = room.lock().await;
                validate_move(
                    u.id,
                    move_kind.expect("is move"),
                    guard.users.iter(),
                    guard.round.as_ref(),
                )?;
                // Check the round id BEFORE touching shared state, so that a
                // rejected request leaves the room exactly as it was.
                let active_round_id = guard.round.as_ref().map(|r| r.round_id);
                if active_round_id != Some(round_id) {
                    return Err(ProcessMessageError::Other(format!(
                        "Invalid round id, expected: {:?}",
                        active_round_id
                    )));
                }
                u.state = UserState::InRoom;
                guard.users.insert(peer.clone(), u);
                // The round is over once every participant is back to `InRoom`.
                let round = guard.round.as_ref().expect("round checked above");
                if guard
                    .users
                    .values()
                    .filter(|u| round.users.contains(&u.id))
                    .all(|u| matches!(u.state, UserState::InRoom))
                {
                    guard.round = None;
                }
            }
            send_room_update(*room_id, room, streams).await?;
        }
    }
    Ok(())
}