use std::collections::{HashMap, HashSet};
use std::fmt;
use std::sync::Arc;
use acton_reactive::prelude::*;
use axum::extract::ws::Message;
use chrono::{DateTime, Utc};
use tokio::sync::mpsc;
use super::config::RoomConfig;
use super::handler::ConnectionId;
use super::messages::{
BroadcastToRoom, ConnectionDisconnected, GetRoomInfo, JoinRoomRequest, LeaveRoomRequest,
RoomInfoResponse,
};
/// Identifier of a room, stored as an owned string.
///
/// Equality and hashing are those of the wrapped string, so a `RoomId` can be
/// used directly as a `HashMap` / `HashSet` key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RoomId(String);

impl RoomId {
    /// Builds a room id from anything convertible into a `String`.
    #[must_use]
    pub fn new(id: impl Into<String>) -> Self {
        let inner: String = id.into();
        RoomId(inner)
    }

    /// Borrows the id as a string slice.
    #[must_use]
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for RoomId {
    /// Writes the raw id text; width/alignment flags on the formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

impl From<String> for RoomId {
    /// Wraps an owned string without copying it.
    fn from(s: String) -> Self {
        RoomId(s)
    }
}

impl From<&str> for RoomId {
    /// Copies the slice into a new owned id.
    fn from(s: &str) -> Self {
        RoomId(s.to_owned())
    }
}
/// One connection's membership record inside a room.
#[derive(Debug, Clone)]
pub struct RoomMember {
    /// Connection this membership belongs to.
    pub connection_id: ConnectionId,
    /// Channel on which room broadcasts are delivered to the connection.
    pub sender: mpsc::Sender<Message>,
    /// User id when the member was created via [`RoomMember::authenticated`], otherwise `None`.
    pub user_id: Option<String>,
    /// Time at which this record was constructed.
    pub joined_at: DateTime<Utc>,
}

impl RoomMember {
    /// Creates a member with no associated user id, stamped with the current UTC time.
    #[must_use]
    pub fn new(connection_id: ConnectionId, sender: mpsc::Sender<Message>) -> Self {
        let joined_at = Utc::now();
        RoomMember {
            connection_id,
            sender,
            user_id: None,
            joined_at,
        }
    }

    /// Creates a member that carries `user_id`, stamped with the current UTC time.
    #[must_use]
    pub fn authenticated(
        connection_id: ConnectionId,
        sender: mpsc::Sender<Message>,
        user_id: String,
    ) -> Self {
        // Same construction as `new`, with only the user id filled in.
        RoomMember {
            user_id: Some(user_id),
            ..Self::new(connection_id, sender)
        }
    }
}
/// A room and the connections currently in it.
#[derive(Debug)]
pub struct Room {
    /// Identifier of this room.
    pub id: RoomId,
    /// Current members, keyed by their connection id.
    pub members: HashMap<ConnectionId, RoomMember>,
    /// Time the room was constructed.
    pub created_at: DateTime<Utc>,
    /// Time of the most recent [`Room::touch`] call (initially equal to `created_at`).
    pub last_activity: DateTime<Utc>,
    /// String key/value metadata; starts empty.
    pub metadata: HashMap<String, String>,
}

impl Room {
    /// Creates an empty room whose creation and last-activity times are both "now".
    #[must_use]
    pub fn new(id: RoomId) -> Self {
        let created_at = Utc::now();
        Room {
            id,
            members: HashMap::new(),
            created_at,
            last_activity: created_at,
            metadata: HashMap::new(),
        }
    }

    /// Number of members currently in the room.
    #[must_use]
    pub fn member_count(&self) -> usize {
        self.members.len()
    }

    /// Returns `true` when the room has no members.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.member_count() == 0
    }

    /// Records activity by setting `last_activity` to the current UTC time.
    pub fn touch(&mut self) {
        let now = Utc::now();
        self.last_activity = now;
    }
}
/// State owned by the room manager actor.
///
/// `Default` yields empty maps and limits of zero; `RoomManager::spawn`
/// overwrites both limits from its `RoomConfig` before the actor starts.
#[derive(Debug, Default)]
pub struct RoomManagerState {
/// Live rooms keyed by id. Handlers remove a room once its last member is gone.
rooms: HashMap<RoomId, Room>,
/// Reverse index: the rooms each connection has joined. Consulted on
/// disconnect to remove the connection from every room it is in.
connection_rooms: HashMap<ConnectionId, HashSet<RoomId>>,
/// Join requests are rejected once a room holds this many members.
max_members_per_room: usize,
/// Join requests are rejected once a connection is in this many rooms.
max_rooms_per_connection: usize,
}
/// Reference-counted actor handle that can be cloned and shared between tasks.
///
/// NOTE(review): presumably wraps the handle returned by `RoomManager::spawn`;
/// the alias is not used within this file — confirm at the call sites.
pub type SharedRoomManager = Arc<ActorHandle>;
pub struct RoomManager;
impl RoomManager {
pub async fn spawn(
runtime: &mut ActorRuntime,
config: RoomConfig,
) -> anyhow::Result<ActorHandle> {
let mut agent = runtime.new_actor::<RoomManagerState>();
agent.model.max_members_per_room = config.max_members;
agent.model.max_rooms_per_connection = config.max_rooms_per_connection;
agent.mutate_on::<JoinRoomRequest>(|agent, envelope| {
let request = envelope.message();
let room_id = request.room_id.clone();
let member = request.member.clone();
let connection_id = member.connection_id;
let connection_rooms = agent
.model
.connection_rooms
.entry(connection_id)
.or_default();
if connection_rooms.len() >= agent.model.max_rooms_per_connection {
tracing::warn!(
connection_id = %connection_id,
limit = agent.model.max_rooms_per_connection,
"Connection at max room limit"
);
return Reply::ready();
}
let room = agent
.model
.rooms
.entry(room_id.clone())
.or_insert_with(|| Room::new(room_id.clone()));
if room.members.len() >= agent.model.max_members_per_room {
tracing::warn!(
room_id = %room_id,
limit = agent.model.max_members_per_room,
"Room at max capacity"
);
return Reply::ready();
}
room.members.insert(connection_id, member);
room.touch();
connection_rooms.insert(room_id.clone());
tracing::info!(
room_id = %room_id,
connection_id = %connection_id,
member_count = room.members.len(),
"Member joined room"
);
Reply::ready()
});
agent.mutate_on::<LeaveRoomRequest>(|agent, envelope| {
let request = envelope.message();
let room_id = &request.room_id;
let connection_id = request.connection_id;
if let Some(room) = agent.model.rooms.get_mut(room_id) {
room.members.remove(&connection_id);
room.touch();
tracing::info!(
room_id = %room_id,
connection_id = %connection_id,
member_count = room.members.len(),
"Member left room"
);
if room.is_empty() {
agent.model.rooms.remove(room_id);
tracing::debug!(room_id = %room_id, "Empty room removed");
}
}
if let Some(rooms) = agent.model.connection_rooms.get_mut(&connection_id) {
rooms.remove(room_id);
}
Reply::ready()
});
agent.act_on::<BroadcastToRoom>(|agent, envelope| {
let request = envelope.message();
let room_id = &request.room_id;
let message = request.message.clone();
let exclude_sender = request.exclude_sender;
if let Some(room) = agent.model.rooms.get(room_id) {
let senders: Vec<_> = room
.members
.values()
.filter(|m| {
exclude_sender
.map(|id| m.connection_id != id)
.unwrap_or(true)
})
.map(|m| m.sender.clone())
.collect();
let member_count = senders.len();
let room_id_log = room_id.clone();
Reply::pending(async move {
let mut sent = 0;
for sender in senders {
if sender.send(message.clone()).await.is_ok() {
sent += 1;
}
}
tracing::debug!(
room_id = %room_id_log,
sent = sent,
total = member_count,
"Broadcast completed"
);
})
} else {
Reply::ready()
}
});
agent.mutate_on::<ConnectionDisconnected>(|agent, envelope| {
let connection_id = envelope.message().connection_id;
if let Some(room_ids) = agent.model.connection_rooms.remove(&connection_id) {
for room_id in room_ids {
if let Some(room) = agent.model.rooms.get_mut(&room_id) {
room.members.remove(&connection_id);
if room.is_empty() {
agent.model.rooms.remove(&room_id);
tracing::debug!(room_id = %room_id, "Empty room removed after disconnect");
}
}
}
}
tracing::debug!(
connection_id = %connection_id,
"Connection removed from all rooms"
);
Reply::ready()
});
agent.act_on::<GetRoomInfo>(|agent, envelope| {
let room_id = envelope.message().room_id.clone();
let reply = envelope.reply_envelope();
let response = if let Some(room) = agent.model.rooms.get(&room_id) {
RoomInfoResponse {
room_id,
member_count: room.member_count(),
exists: true,
}
} else {
RoomInfoResponse {
room_id,
member_count: 0,
exists: false,
}
};
Reply::pending(async move {
reply.send(response).await;
})
});
agent.after_start(|_agent| {
tracing::info!("WebSocket room manager started");
Reply::ready()
});
agent.before_stop(|agent| {
let room_count = agent.model.rooms.len();
let connection_count = agent.model.connection_rooms.len();
tracing::info!(
rooms = room_count,
connections = connection_count,
"WebSocket room manager shutting down"
);
Reply::ready()
});
let handle = agent.start().await;
Ok(handle)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A `&str` converts into a `RoomId` exposing the same text.
    #[test]
    fn test_room_id_from_string() {
        let id = RoomId::from("test-room");
        assert_eq!("test-room", id.as_str());
    }

    /// A freshly constructed room has no members.
    #[test]
    fn test_room_creation() {
        let room = Room::new(RoomId::from("test"));
        assert_eq!(room.member_count(), 0);
        assert!(room.is_empty());
    }

    /// `RoomMember::new` leaves the user id unset.
    #[tokio::test]
    async fn test_room_member_creation() {
        let (sender, _receiver) = mpsc::channel(32);
        let member = RoomMember::new(ConnectionId::new(), sender);
        assert!(member.user_id.is_none());
    }

    /// `RoomMember::authenticated` stores the supplied user id.
    #[tokio::test]
    async fn test_authenticated_member() {
        let (sender, _receiver) = mpsc::channel(32);
        let member =
            RoomMember::authenticated(ConnectionId::new(), sender, String::from("user123"));
        assert_eq!(member.user_id, Some(String::from("user123")));
    }
}