use anyhow::Result;
use async_trait::async_trait;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::distributed::DistributedLock;
use crate::protocol::{PlayerId, RoomId};
use super::MessageCoordinator;
/// Room-level operations a coordinator must provide.
///
/// Every method is asynchronous and fallible, and implementors must be
/// `Send + Sync` so they can be shared across tasks.
#[async_trait]
pub trait RoomOperationCoordinatorTrait: Send + Sync {
    /// Transitions `room_id` to the lobby state.
    ///
    /// Returns a boolean on success; the in-memory implementation in this
    /// file always yields `true` when no error occurs.
    async fn transition_room_to_lobby(&self, room_id: &RoomId) -> Result<bool>;

    /// Coordinates handing authority over `room_id` to `new_authority`.
    ///
    /// Returns a boolean on success; the in-memory implementation in this
    /// file always yields `true` when no error occurs.
    async fn coordinate_authority_transfer(
        &self,
        room_id: &RoomId,
        new_authority: &PlayerId,
    ) -> Result<bool>;

    /// Executes the operation named `operation` for `room_id`.
    async fn execute_distributed_operation(&self, operation: &str, room_id: &RoomId) -> Result<()>;

    /// Handles a player's request to claim (`become_authority == true`) or
    /// give up (`become_authority == false`) authority over a room.
    ///
    /// Returns `(granted, reason)`: whether the request was granted and an
    /// optional human-readable explanation.
    async fn handle_authority_request(
        &self,
        room_id: &RoomId,
        player_id: &PlayerId,
        become_authority: bool,
    ) -> Result<(bool, Option<String>)>;

    /// Handles a "ready" signal from `player_id` in `room_id`.
    ///
    /// `app_id` is an optional application identifier; the in-memory
    /// implementation in this file ignores it.
    async fn handle_player_ready(
        &self,
        room_id: &RoomId,
        player_id: &PlayerId,
        app_id: Option<Uuid>,
    ) -> Result<()>;

    /// Discards any ready-state tracked for `room_id`.
    async fn clear_ready_players(&self, room_id: &RoomId) -> Result<()>;
}
/// [`RoomOperationCoordinatorTrait`] implementation that keeps per-room
/// ready-state in process memory.
pub struct InMemoryRoomOperationCoordinator {
    /// Delivers messages to individual players and to whole rooms.
    coordinator: Arc<dyn MessageCoordinator>,
    /// Lock service acquired at the start of the lock-taking operations.
    distributed_lock: Arc<dyn DistributedLock>,
    /// Source of room data, room players and authority decisions.
    database: Arc<dyn crate::database::GameDatabase>,
    /// Players currently marked ready, grouped by room.
    ready_players: Arc<RwLock<HashMap<RoomId, HashSet<PlayerId>>>>,
}
impl InMemoryRoomOperationCoordinator {
    /// Builds a coordinator from its collaborators, starting with no
    /// ready-state recorded for any room.
    pub fn new(
        coordinator: Arc<dyn MessageCoordinator>,
        distributed_lock: Arc<dyn DistributedLock>,
        database: Arc<dyn crate::database::GameDatabase>,
    ) -> Self {
        // The ready-state table begins empty; rooms are added lazily.
        let ready_players = Arc::new(RwLock::new(HashMap::new()));

        Self {
            coordinator,
            distributed_lock,
            database,
            ready_players,
        }
    }
}
#[async_trait]
impl RoomOperationCoordinatorTrait for InMemoryRoomOperationCoordinator {
async fn transition_room_to_lobby(&self, room_id: &RoomId) -> Result<bool> {
let lock_key = format!("room_lobby_transition:{room_id}");
let _lock_handle = self
.distributed_lock
.acquire(&lock_key, Duration::from_secs(10))
.await?;
let message = crate::protocol::ServerMessage::LobbyStateChanged {
lobby_state: crate::protocol::LobbyState::Lobby,
ready_players: Vec::new(),
all_ready: false,
};
self.coordinator
.broadcast_to_room(room_id, Arc::new(message))
.await?;
tracing::info!(%room_id, "Room transitioned to lobby state (in-memory)");
Ok(true)
}
async fn coordinate_authority_transfer(
&self,
room_id: &RoomId,
new_authority: &PlayerId,
) -> Result<bool> {
let lock_key = format!("authority_transfer:{room_id}:{new_authority}");
let _lock_handle = self
.distributed_lock
.acquire(&lock_key, Duration::from_secs(5))
.await?;
let message = crate::protocol::ServerMessage::AuthorityChanged {
authority_player: Some(*new_authority),
you_are_authority: false, };
self.coordinator
.broadcast_to_room(room_id, Arc::new(message))
.await?;
tracing::info!(%room_id, %new_authority, "Authority transferred (in-memory)");
Ok(true)
}
/// Acquires the `distributed_op:{room_id}:{operation}` lock (5-second
/// duration) and logs that the operation was executed.
///
/// The in-memory variant performs no work beyond taking the lock and
/// logging; a failure to acquire the lock is propagated.
async fn execute_distributed_operation(&self, operation: &str, room_id: &RoomId) -> Result<()> {
    let key = format!("distributed_op:{room_id}:{operation}");
    // Named binding keeps the handle alive until the function returns.
    let _operation_guard = self
        .distributed_lock
        .acquire(&key, Duration::from_secs(5))
        .await?;

    tracing::info!(%room_id, %operation, "Executed distributed operation (in-memory)");
    Ok(())
}
async fn handle_authority_request(
&self,
room_id: &RoomId,
player_id: &PlayerId,
become_authority: bool,
) -> Result<(bool, Option<String>)> {
let lock_key = format!("room_authority:{room_id}");
let _lock_handle = self
.distributed_lock
.acquire(&lock_key, Duration::from_secs(5))
.await?;
tracing::info!(%room_id, %player_id, %become_authority, "InMemory: Processing authority request");
let result = self
.database
.request_room_authority(room_id, player_id, become_authority)
.await;
match result {
Ok((granted, reason)) => {
if granted {
let new_authority = if become_authority {
Some(*player_id)
} else {
None
};
tracing::info!(%room_id, %player_id, "Sending AuthorityResponse to player");
let response_message = crate::protocol::ServerMessage::AuthorityResponse {
granted,
reason: reason.clone(),
error_code: if granted {
None
} else {
Some(crate::protocol::ErrorCode::AuthorityDenied)
},
};
if let Err(e) = self
.coordinator
.send_to_player(player_id, Arc::new(response_message))
.await
{
tracing::error!("Failed to send authority response to player: {}", e);
}
tracing::info!(%room_id, "Sending customized AuthorityChanged messages");
let room = match self.database.get_room_by_id(room_id).await {
Ok(Some(room)) => room,
Ok(None) => {
tracing::error!(%room_id, "Room not found when handling authority change");
return Ok((granted, reason));
}
Err(e) => {
tracing::error!(%room_id, "Failed to get room: {}", e);
return Ok((granted, reason));
}
};
for room_player_id in room.players.keys() {
let is_authority = become_authority && room_player_id == player_id;
let auth_message = crate::protocol::ServerMessage::AuthorityChanged {
authority_player: new_authority,
you_are_authority: is_authority,
};
tracing::info!(
%room_id,
%room_player_id,
%is_authority,
"Sending customized AuthorityChanged message"
);
if let Err(e) = self
.coordinator
.send_to_player(room_player_id, Arc::new(auth_message))
.await
{
tracing::error!(%room_player_id, "Failed to send authority change: {}", e);
}
}
tracing::info!(%room_id, %player_id, %become_authority, "Authority request granted (in-memory)");
} else {
let response_message = crate::protocol::ServerMessage::AuthorityResponse {
granted,
reason: reason.clone(),
error_code: if granted {
None
} else {
Some(crate::protocol::ErrorCode::AuthorityDenied)
},
};
if let Err(e) = self
.coordinator
.send_to_player(player_id, Arc::new(response_message))
.await
{
tracing::error!(
"Failed to send authority denial response to player: {}",
e
);
}
tracing::info!(%room_id, %player_id, %become_authority, ?reason, "Authority request denied (in-memory)");
}
Ok((granted, reason))
}
Err(e) => {
tracing::error!(%room_id, %player_id, %become_authority, "Authority request failed: {}", e);
let response_message = crate::protocol::ServerMessage::AuthorityResponse {
granted: false,
reason: Some("Storage error".to_string()),
error_code: Some(crate::protocol::ErrorCode::StorageError),
};
if let Err(send_err) = self
.coordinator
.send_to_player(player_id, Arc::new(response_message))
.await
{
tracing::error!("Failed to send error response to player: {}", send_err);
}
Ok((false, Some("Storage error".to_string())))
}
}
}
async fn handle_player_ready(
&self,
room_id: &RoomId,
player_id: &PlayerId,
_app_id: Option<Uuid>,
) -> Result<()> {
let lock_key = format!("room_ready_state:{room_id}");
let _lock_handle = self
.distributed_lock
.acquire(&lock_key, Duration::from_secs(5))
.await?;
let room = match self.database.get_room_by_id(room_id).await {
Ok(Some(room)) => room,
Ok(None) => {
return Err(anyhow::anyhow!("Room not found"));
}
Err(e) => {
return Err(anyhow::anyhow!("Failed to get room: {e}"));
}
};
if !room.should_enter_lobby() && room.lobby_state != crate::protocol::LobbyState::Lobby {
return Err(anyhow::anyhow!("Player ready failed: room may not be in lobby state. Current state: {:?}, player count: {}/{}", room.lobby_state, room.players.len(), room.max_players));
}
let mut ready_map = self.ready_players.write().await;
let room_ready_players = ready_map.entry(*room_id).or_insert_with(HashSet::new);
let was_ready = room_ready_players.contains(player_id);
if was_ready {
room_ready_players.remove(player_id);
} else {
room_ready_players.insert(*player_id);
}
let ready_players_vec: Vec<PlayerId> = room_ready_players.iter().copied().collect();
let room_players = match self.database.get_room_players(room_id).await {
Ok(players) => players,
Err(e) => {
tracing::error!("Failed to get room players: {}", e);
Vec::new()
}
};
let all_ready = !room_players.is_empty() && ready_players_vec.len() == room_players.len();
drop(ready_map);
let message = crate::protocol::ServerMessage::LobbyStateChanged {
lobby_state: crate::protocol::LobbyState::Lobby,
ready_players: ready_players_vec.clone(),
all_ready,
};
self.coordinator
.broadcast_to_room(room_id, Arc::new(message))
.await?;
if all_ready {
let peer_connections: Vec<crate::protocol::PeerConnectionInfo> = room_players
.into_iter()
.map(|player| crate::protocol::PeerConnectionInfo {
player_id: player.id,
player_name: player.name,
is_authority: player.is_authority,
relay_type: room.relay_type.clone(),
connection_info: player.connection_info,
})
.collect();
let game_start_message =
Arc::new(crate::protocol::ServerMessage::GameStarting { peer_connections });
self.coordinator
.broadcast_to_room(room_id, game_start_message)
.await?;
let mut ready_map = self.ready_players.write().await;
ready_map.remove(room_id);
}
tracing::info!(%room_id, %player_id, ready = !was_ready, "Player ready state toggled (in-memory)");
Ok(())
}
/// Forgets all ready-state recorded for `room_id`.
///
/// Removing a room that has no recorded state is not an error; the method
/// always returns `Ok(())`.
async fn clear_ready_players(&self, room_id: &RoomId) -> Result<()> {
    let mut ready_by_room = self.ready_players.write().await;
    // The removed set (if any) is simply discarded.
    let _ = ready_by_room.remove(room_id);

    tracing::info!(%room_id, "Cleared ready players from coordinator (in-memory)");
    Ok(())
}
}