use std::fmt::Display;
use uuid::Uuid;
use crate::{
cli_utils::COMMIT_HASH,
common::{
connection::{GetNextMessageError, Reader, WriteMessageError, Writer},
constants::CRATE_VERSION,
message::{ClientMessage, RoomId, ServerMessage, User, UserState, PROTOCOL_VERSION},
},
server::utils::send_room_update,
};
use super::{
types::{Connections, Rooms},
utils::SendRoomUpdateError,
};
#[derive(thiserror::Error, Debug)]
pub enum ProcessError {
#[error("{}", .0)]
WriteMessageErr(#[from] WriteMessageError),
#[error("{}", .0)]
ReadMessageErr(#[from] GetNextMessageError),
#[error("{}", .0)]
ProcessErr(#[from] ProcessMessageError),
#[error("{}", .0)]
Other(String),
}
impl From<String> for ProcessError {
fn from(value: String) -> Self {
ProcessError::Other(value)
}
}
pub(crate) async fn process<P: Display + Eq + std::hash::Hash + Clone, Conn: Reader, W: Writer>(
peer: &P,
mut stream: Conn,
streams: Connections<P, W>,
rooms: Rooms<P>,
) -> Result<(), ProcessError> {
log::info!("Accepted from `{peer}`");
streams
.lock()
.await
.get_mut(peer)
.ok_or_else(|| format!("Could not get connection for {peer}"))?
.write_message(ServerMessage::Hello(
PROTOCOL_VERSION,
CRATE_VERSION.into(),
COMMIT_HASH,
))
.await?;
let mut user = None;
let mut room = None;
while let Some(msg) = stream.get_next_message::<ClientMessage>().await? {
log::debug!("{peer}: {msg:?}");
let error = match process_message(msg, &mut user, &mut room, peer, &streams, &rooms).await {
Ok(()) => None,
Err(err) => match err {
ProcessMessageError::WriteMsgErr(write_message_error)
| ProcessMessageError::SendRoomUpdateErr(SendRoomUpdateError::WriteMessageError(
write_message_error,
)) => {
return Err(write_message_error.into());
}
ProcessMessageError::Other(err)
| ProcessMessageError::SendRoomUpdateErr(SendRoomUpdateError::Other(err)) => {
Some(err)
}
},
};
if let Some(error) = error {
log::info!("handling message failed: {error}");
streams
.lock()
.await
.get_mut(peer)
.ok_or_else(|| format!("Could not get connection for {peer}"))?
.write_message(ServerMessage::Error(error))
.await?;
}
}
log::info!("Connection closed `{peer}`");
Ok(())
}
#[derive(thiserror::Error, Debug)]
pub enum ProcessMessageError {
#[error("{}", .0)]
WriteMsgErr(#[from] WriteMessageError),
#[error("{}", .0)]
SendRoomUpdateErr(#[from] SendRoomUpdateError),
#[error("{}", .0)]
Other(String),
}
impl From<String> for ProcessMessageError {
fn from(value: String) -> Self {
ProcessMessageError::Other(value)
}
}
async fn process_message<P: Display + Eq + std::hash::Hash + Clone, W: Writer>(
message: ClientMessage,
user_id: &mut Option<Uuid>,
room: &mut Option<RoomId>,
peer: &P,
streams: &Connections<P, W>,
rooms: &Rooms<P>,
) -> Result<(), ProcessMessageError> {
let user: Option<User> = match (user_id.as_ref(), room.as_ref()) {
(Some(user_id), Some(room_id)) => {
let guard = rooms.lock().await;
if let Some(room) = guard.get(room_id) {
room.users.values().find(|u| u.id == *user_id).cloned()
} else {
None
}
}
_ => None,
};
let _: () = match message {
ClientMessage::Ping { c } => {
streams
.lock()
.await
.get_mut(peer)
.ok_or_else(|| {
ProcessMessageError::Other(format!("Could not get connection for {peer}"))
})?
.write_message(ServerMessage::Pong { c })
.await?;
}
ClientMessage::Join {
room_id,
user: uuid,
} => {
if let Some(old_room_id) = room {
if let Some(room) = rooms.lock().await.get_mut(old_room_id) {
room.users.remove(peer);
}
send_room_update(*old_room_id, rooms, streams).await?;
}
rooms
.lock()
.await
.entry(room_id)
.or_insert_with(Default::default)
.users
.insert(
peer.clone(),
User {
id: uuid,
state: UserState::InRoom,
},
);
*room = Some(room_id);
*user_id = Some(uuid);
log::info!("User {uuid} joined {room_id}");
send_room_update(room_id, rooms, streams).await?;
}
ClientMessage::Play { value, round } => {
round.validate()?;
let mut u = user.ok_or_else(|| format!("{peer} not yet in room"))?;
if !round.users.contains(&u.id) {
Err(format!("User {u:?} not in {round:?}"))?
}
match u.state {
UserState::InRoom => (),
UserState::Played(hash) => {
if hash != value {
Err("Tried to play different hash while in Played state".to_string())?;
}
return Ok(());
}
UserState::Confirmed(_) => {
Err("Tried to play while in Confirmed state".to_string())?
}
}
u.state = UserState::Played(value);
let room_id = room.expect("should exist if user exists");
let mut guard = rooms.lock().await;
let room = guard.entry(room_id).or_insert_with(Default::default);
if let Some(ref old_round) = room.round {
if *old_round != round {
return Err(ProcessMessageError::Other(format!(
"Expected round {old_round:?}"
)));
}
} else {
room.round = Some(round);
}
room.users.insert(peer.clone(), u);
drop(guard);
send_room_update(room_id, rooms, streams).await?;
}
ClientMessage::ConfirmPlay(hash_with_data) => {
let mut u = user.ok_or_else(|| format!("{peer} not yet in room"))?;
if let UserState::Played(hash) = u.state {
if hash_with_data.get_hash() != hash.as_ref() {
Err(format!(
"User played wrong hash {} expected {}",
hash_with_data.get_hash(),
hash.as_ref()
))?
}
} else {
Err(format!(
"User tried to confirm play when state was {:?}",
u.state
))?
}
let room_id = room.expect("should exist if user exists");
hash_with_data.verify(
rooms
.lock()
.await
.entry(room_id)
.or_insert_with(Default::default)
.round
.as_ref()
.expect("round"),
)?;
u.state = UserState::Confirmed(hash_with_data);
rooms
.lock()
.await
.entry(room_id)
.or_insert_with(Default::default)
.users
.insert(peer.clone(), u);
send_room_update(room_id, rooms, streams).await?;
}
ClientMessage::RoundFinished { round_id } => {
let mut u = user.ok_or_else(|| format!("{peer} not yet in room"))?;
if !matches!(u.state, UserState::Confirmed(_)) {
Err("not in confirmed state".to_string())?;
}
u.state = UserState::InRoom;
let room_id = room.expect("should exist if user exists");
let mut guard = rooms.lock().await;
let room = guard.entry(room_id).or_insert_with(Default::default);
room.users.insert(peer.clone(), u);
if room.round.as_ref().map(|r| r.round_id) != Some(round_id) {
Err(format!(
"Invalid round id, expected: {:?}",
room.round.as_ref().map(|r| r.round_id)
))?;
}
let round = room.round.as_ref().expect("round checked above");
if room
.users
.values()
.filter(|u| round.users.contains(&u.id))
.all(|u| matches!(u.state, UserState::InRoom))
{
room.round = None;
}
drop(guard);
send_room_update(room_id, rooms, streams).await?;
}
};
Ok(())
}