1use crate::connection::{Message, WebSocketConnection, WebSocketError};
7use serde_json::Value;
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::RwLock;
12
13#[derive(Debug, thiserror::Error)]
15pub enum RoomError {
16 #[error("Client not found")]
18 ClientNotFound(String),
19 #[error("Room not found")]
21 RoomNotFound(String),
22 #[error("Client already exists")]
24 ClientAlreadyExists(String),
25 #[error("WebSocket error")]
27 WebSocket(#[from] WebSocketError),
28 #[error("Metadata error")]
30 Metadata(String),
31}
32
33pub type RoomResult<T> = Result<T, RoomError>;
35
36#[derive(Debug)]
45pub struct BroadcastResult {
46 pub successful: Vec<String>,
48 pub failed: Vec<(String, WebSocketError)>,
50}
51
52impl BroadcastResult {
53 pub fn is_complete_success(&self) -> bool {
55 self.failed.is_empty()
56 }
57
58 pub fn is_partial_success(&self) -> bool {
60 !self.successful.is_empty()
61 }
62
63 pub fn is_complete_failure(&self) -> bool {
65 self.successful.is_empty() && !self.failed.is_empty()
66 }
67
68 pub fn failure_count(&self) -> usize {
70 self.failed.len()
71 }
72
73 pub fn failed_client_ids(&self) -> Vec<&str> {
75 self.failed.iter().map(|(id, _)| id.as_str()).collect()
76 }
77}
78
79pub struct Room {
100 id: String,
101 clients: Arc<RwLock<HashMap<String, Arc<WebSocketConnection>>>>,
102 metadata: Arc<RwLock<HashMap<String, Value>>>,
103}
104
105impl Room {
106 pub fn new(id: String) -> Self {
117 Self {
118 id,
119 clients: Arc::new(RwLock::new(HashMap::new())),
120 metadata: Arc::new(RwLock::new(HashMap::new())),
121 }
122 }
123
124 pub fn id(&self) -> &str {
135 &self.id
136 }
137
138 pub async fn join(
158 &self,
159 client_id: String,
160 client: Arc<WebSocketConnection>,
161 ) -> RoomResult<()> {
162 let mut clients = self.clients.write().await;
163
164 if clients.contains_key(&client_id) {
165 return Err(RoomError::ClientAlreadyExists(client_id));
166 }
167
168 clients.insert(client_id, client);
169 Ok(())
170 }
171
172 pub async fn leave(&self, client_id: &str) -> RoomResult<()> {
195 let mut clients = self.clients.write().await;
196
197 clients
198 .remove(client_id)
199 .ok_or_else(|| RoomError::ClientNotFound(client_id.to_string()))?;
200
201 Ok(())
202 }
203
204 pub async fn broadcast(&self, message: Message) -> BroadcastResult {
237 let clients = self.clients.read().await;
238
239 let mut successful = Vec::new();
240 let mut failed = Vec::new();
241
242 for (client_id, client) in clients.iter() {
243 match client.send(message.clone()).await {
244 Ok(()) => successful.push(client_id.clone()),
245 Err(e) => failed.push((client_id.clone(), e)),
246 }
247 }
248
249 drop(clients);
251
252 if !failed.is_empty() {
254 let mut clients_write = self.clients.write().await;
255 for (client_id, _) in &failed {
256 clients_write.remove(client_id);
257 }
258 }
259
260 BroadcastResult { successful, failed }
261 }
262
263 pub async fn broadcast_with_timeout(
298 &self,
299 message: Message,
300 send_timeout: Duration,
301 ) -> BroadcastResult {
302 let clients = self.clients.read().await;
303
304 let mut successful = Vec::new();
305 let mut failed = Vec::new();
306
307 for (client_id, client) in clients.iter() {
308 let send_future = client.send(message.clone());
309 match tokio::time::timeout(send_timeout, send_future).await {
310 Ok(Ok(())) => successful.push(client_id.clone()),
311 Ok(Err(e)) => failed.push((client_id.clone(), e)),
312 Err(_elapsed) => {
313 failed.push((
314 client_id.clone(),
315 WebSocketError::SlowConsumer(send_timeout),
316 ));
317 }
318 }
319 }
320
321 drop(clients);
323
324 if !failed.is_empty() {
326 let mut clients_write = self.clients.write().await;
327 for (client_id, _) in &failed {
328 clients_write.remove(client_id);
329 }
330 }
331
332 BroadcastResult { successful, failed }
333 }
334
335 pub async fn send_to(&self, client_id: &str, message: Message) -> RoomResult<()> {
364 let client = {
365 let clients = self.clients.read().await;
366 clients
367 .get(client_id)
368 .ok_or_else(|| RoomError::ClientNotFound(client_id.to_string()))?
369 .clone()
370 };
371
372 client.send(message).await?;
373
374 Ok(())
375 }
376
377 pub async fn client_count(&self) -> usize {
404 let clients = self.clients.read().await;
405 clients.len()
406 }
407
408 pub async fn client_ids(&self) -> Vec<String> {
437 let clients = self.clients.read().await;
438 clients.keys().cloned().collect()
439 }
440
441 pub async fn has_client(&self, client_id: &str) -> bool {
464 let clients = self.clients.read().await;
465 clients.contains_key(client_id)
466 }
467
468 pub async fn set_metadata<T: serde::Serialize>(&self, key: &str, value: T) -> RoomResult<()> {
487 let json_value =
488 serde_json::to_value(value).map_err(|e| RoomError::Metadata(e.to_string()))?;
489
490 let mut metadata = self.metadata.write().await;
491 metadata.insert(key.to_string(), json_value);
492
493 Ok(())
494 }
495
496 pub async fn get_metadata<T>(&self, key: &str) -> RoomResult<Option<T>>
517 where
518 T: serde::de::DeserializeOwned,
519 {
520 let metadata = self.metadata.read().await;
521
522 metadata
523 .get(key)
524 .map(|v| serde_json::from_value(v.clone()))
525 .transpose()
526 .map_err(|e| RoomError::Metadata(e.to_string()))
527 }
528
529 pub async fn remove_metadata(&self, key: &str) -> Option<Value> {
548 let mut metadata = self.metadata.write().await;
549 metadata.remove(key)
550 }
551
552 pub async fn clear_metadata(&self) {
573 let mut metadata = self.metadata.write().await;
574 metadata.clear();
575 }
576
577 pub async fn is_empty(&self) -> bool {
599 let clients = self.clients.read().await;
600 clients.is_empty()
601 }
602}
603
604pub struct RoomManager {
620 rooms: Arc<RwLock<HashMap<String, Arc<Room>>>>,
621}
622
623impl RoomManager {
624 pub fn new() -> Self {
637 Self {
638 rooms: Arc::new(RwLock::new(HashMap::new())),
639 }
640 }
641
642 pub async fn create_room(&self, id: String) -> Arc<Room> {
656 let mut rooms = self.rooms.write().await;
657
658 let room = Arc::new(Room::new(id.clone()));
659 rooms.insert(id, room.clone());
660
661 room
662 }
663
664 pub async fn get_room(&self, id: &str) -> Option<Arc<Room>> {
681 let rooms = self.rooms.read().await;
682 rooms.get(id).cloned()
683 }
684
685 pub async fn get_or_create_room(&self, id: String) -> Arc<Room> {
705 let mut rooms = self.rooms.write().await;
706
707 if let Some(room) = rooms.get(&id) {
708 return room.clone();
709 }
710
711 let room = Arc::new(Room::new(id.clone()));
712 rooms.insert(id, room.clone());
713 room
714 }
715
716 pub async fn delete_room(&self, id: &str) -> RoomResult<()> {
734 let mut rooms = self.rooms.write().await;
735
736 rooms
737 .remove(id)
738 .ok_or_else(|| RoomError::RoomNotFound(id.to_string()))?;
739
740 Ok(())
741 }
742
743 pub async fn room_count(&self) -> usize {
761 let rooms = self.rooms.read().await;
762 rooms.len()
763 }
764
765 pub async fn room_ids(&self) -> Vec<String> {
785 let rooms = self.rooms.read().await;
786 rooms.keys().cloned().collect()
787 }
788
789 pub async fn has_room(&self, id: &str) -> bool {
805 let rooms = self.rooms.read().await;
806 rooms.contains_key(id)
807 }
808
809 pub async fn cleanup_empty_rooms(&self) {
836 let mut rooms = self.rooms.write().await;
837 let empty_room_ids: Vec<String> = {
838 let mut empty_ids = Vec::new();
839 for (id, room) in rooms.iter() {
840 if room.is_empty().await {
841 empty_ids.push(id.clone());
842 }
843 }
844 empty_ids
845 };
846
847 for id in empty_room_ids {
848 rooms.remove(&id);
849 }
850 }
851
852 pub async fn join_room(
873 &self,
874 room_id: String,
875 connection: Arc<WebSocketConnection>,
876 ) -> RoomResult<()> {
877 let room = self
878 .get_room(&room_id)
879 .await
880 .ok_or_else(|| RoomError::RoomNotFound(room_id.clone()))?;
881
882 let client_id = connection.id().to_string();
883 room.join(client_id, connection).await
884 }
885
886 pub async fn leave_room(&self, room_id: &str, user_id: &str) -> RoomResult<()> {
908 let room = self
909 .get_room(room_id)
910 .await
911 .ok_or_else(|| RoomError::RoomNotFound(room_id.to_string()))?;
912
913 room.leave(user_id).await
914 }
915
916 pub async fn get_room_size(&self, room_id: &str) -> usize {
938 if let Some(room) = self.get_room(room_id).await {
939 room.client_count().await
940 } else {
941 0
942 }
943 }
944
945 pub async fn broadcast_to_room(
974 &self,
975 room_id: &str,
976 message: Message,
977 ) -> RoomResult<BroadcastResult> {
978 let room = self
979 .get_room(room_id)
980 .await
981 .ok_or_else(|| RoomError::RoomNotFound(room_id.to_string()))?;
982
983 Ok(room.broadcast(message).await)
984 }
985
986 pub async fn broadcast_to_room_with_timeout(
1020 &self,
1021 room_id: &str,
1022 message: Message,
1023 send_timeout: Duration,
1024 ) -> RoomResult<BroadcastResult> {
1025 let room = self
1026 .get_room(room_id)
1027 .await
1028 .ok_or_else(|| RoomError::RoomNotFound(room_id.to_string()))?;
1029
1030 Ok(room.broadcast_with_timeout(message, send_timeout).await)
1031 }
1032
1033 pub async fn broadcast_to_all(&self, message: Message) -> BroadcastResult {
1066 let rooms = self.rooms.read().await;
1067
1068 let mut successful = Vec::new();
1069 let mut failed = Vec::new();
1070
1071 for room in rooms.values() {
1072 let result = room.broadcast(message.clone()).await;
1073 successful.extend(result.successful);
1074 failed.extend(result.failed);
1075 }
1076
1077 BroadcastResult { successful, failed }
1078 }
1079
1080 pub async fn get_all_rooms(&self) -> Vec<String> {
1099 self.room_ids().await
1100 }
1101}
1102
1103impl Default for RoomManager {
1104 fn default() -> Self {
1105 Self::new()
1106 }
1107}
1108
1109#[cfg(test)]
1110mod tests {
1111 use super::*;
1112 use rstest::rstest;
1113 use tokio::sync::mpsc;
1114
1115 #[rstest]
1116 #[tokio::test]
1117 async fn test_room_new() {
1118 let room = Room::new("test_room".to_string());
1119 assert_eq!(room.id(), "test_room");
1120 assert_eq!(room.client_count().await, 0);
1121 assert!(room.is_empty().await);
1122 }
1123
1124 #[tokio::test]
1125 async fn test_room_join_client() {
1126 let room = Room::new("join_test".to_string());
1127 let (tx, _rx) = mpsc::unbounded_channel();
1128 let client = Arc::new(WebSocketConnection::new("client1".to_string(), tx));
1129
1130 room.join("client1".to_string(), client).await.unwrap();
1131 assert_eq!(room.client_count().await, 1);
1132 assert!(room.has_client("client1").await);
1133 }
1134
1135 #[tokio::test]
1136 async fn test_room_join_duplicate_client() {
1137 let room = Room::new("duplicate_test".to_string());
1138 let (tx1, _rx1) = mpsc::unbounded_channel();
1139 let (tx2, _rx2) = mpsc::unbounded_channel();
1140
1141 let client1 = Arc::new(WebSocketConnection::new("duplicate".to_string(), tx1));
1142 let client2 = Arc::new(WebSocketConnection::new("duplicate".to_string(), tx2));
1143
1144 room.join("duplicate".to_string(), client1).await.unwrap();
1145 let result = room.join("duplicate".to_string(), client2).await;
1146
1147 assert!(result.is_err());
1148 assert!(matches!(
1149 result.unwrap_err(),
1150 RoomError::ClientAlreadyExists(_)
1151 ));
1152 }
1153
1154 #[tokio::test]
1155 async fn test_room_leave_client() {
1156 let room = Room::new("leave_test".to_string());
1157 let (tx, _rx) = mpsc::unbounded_channel();
1158 let client = Arc::new(WebSocketConnection::new("leaver".to_string(), tx));
1159
1160 room.join("leaver".to_string(), client).await.unwrap();
1161 assert!(room.has_client("leaver").await);
1162
1163 room.leave("leaver").await.unwrap();
1164 assert!(!room.has_client("leaver").await);
1165 assert_eq!(room.client_count().await, 0);
1166 }
1167
1168 #[tokio::test]
1169 async fn test_room_leave_nonexistent_client() {
1170 let room = Room::new("leave_error_test".to_string());
1171 let result = room.leave("nonexistent").await;
1172
1173 assert!(result.is_err());
1174 assert!(matches!(result.unwrap_err(), RoomError::ClientNotFound(_)));
1175 }
1176
1177 #[rstest]
1178 #[tokio::test]
1179 async fn test_room_broadcast() {
1180 let room = Room::new("broadcast_test".to_string());
1182
1183 let (tx1, mut rx1) = mpsc::unbounded_channel();
1184 let (tx2, mut rx2) = mpsc::unbounded_channel();
1185 let (tx3, mut rx3) = mpsc::unbounded_channel();
1186
1187 let client1 = Arc::new(WebSocketConnection::new("user1".to_string(), tx1));
1188 let client2 = Arc::new(WebSocketConnection::new("user2".to_string(), tx2));
1189 let client3 = Arc::new(WebSocketConnection::new("user3".to_string(), tx3));
1190
1191 room.join("user1".to_string(), client1).await.unwrap();
1192 room.join("user2".to_string(), client2).await.unwrap();
1193 room.join("user3".to_string(), client3).await.unwrap();
1194
1195 let msg = Message::text("Broadcast message".to_string());
1197 let result = room.broadcast(msg).await;
1198
1199 assert!(result.is_complete_success());
1201 assert_eq!(result.successful.len(), 3);
1202 assert_eq!(result.failure_count(), 0);
1203 assert!(matches!(rx1.try_recv(), Ok(Message::Text { .. })));
1204 assert!(matches!(rx2.try_recv(), Ok(Message::Text { .. })));
1205 assert!(matches!(rx3.try_recv(), Ok(Message::Text { .. })));
1206 }
1207
1208 #[tokio::test]
1209 async fn test_room_send_to_specific_client() {
1210 let room = Room::new("private_msg_test".to_string());
1211
1212 let (tx1, mut rx1) = mpsc::unbounded_channel();
1213 let (tx2, mut rx2) = mpsc::unbounded_channel();
1214
1215 let client1 = Arc::new(WebSocketConnection::new("target".to_string(), tx1));
1216 let client2 = Arc::new(WebSocketConnection::new("other".to_string(), tx2));
1217
1218 room.join("target".to_string(), client1).await.unwrap();
1219 room.join("other".to_string(), client2).await.unwrap();
1220
1221 let msg = Message::text("Private message".to_string());
1222 room.send_to("target", msg).await.unwrap();
1223
1224 assert!(matches!(rx1.try_recv(), Ok(Message::Text { .. })));
1225 assert!(rx2.try_recv().is_err());
1226 }
1227
1228 #[tokio::test]
1229 async fn test_room_send_to_nonexistent_client() {
1230 let room = Room::new("send_error_test".to_string());
1231 let msg = Message::text("Test".to_string());
1232 let result = room.send_to("nonexistent", msg).await;
1233
1234 assert!(result.is_err());
1235 assert!(matches!(result.unwrap_err(), RoomError::ClientNotFound(_)));
1236 }
1237
1238 #[tokio::test]
1239 async fn test_room_client_ids() {
1240 let room = Room::new("ids_test".to_string());
1241
1242 let (tx1, _rx1) = mpsc::unbounded_channel();
1243 let (tx2, _rx2) = mpsc::unbounded_channel();
1244
1245 let client1 = Arc::new(WebSocketConnection::new("alpha".to_string(), tx1));
1246 let client2 = Arc::new(WebSocketConnection::new("beta".to_string(), tx2));
1247
1248 room.join("alpha".to_string(), client1).await.unwrap();
1249 room.join("beta".to_string(), client2).await.unwrap();
1250
1251 let ids = room.client_ids().await;
1252 assert_eq!(ids.len(), 2);
1253 assert!(ids.contains(&"alpha".to_string()));
1254 assert!(ids.contains(&"beta".to_string()));
1255 }
1256
1257 #[tokio::test]
1258 async fn test_room_metadata_set_and_get() {
1259 use serde_json::json;
1260
1261 let room = Room::new("metadata_test".to_string());
1262
1263 room.set_metadata("max_users", json!(100)).await.unwrap();
1264 room.set_metadata("topic", json!("General Chat"))
1265 .await
1266 .unwrap();
1267
1268 let max_users: i64 = room.get_metadata("max_users").await.unwrap().unwrap();
1269 assert_eq!(max_users, 100);
1270
1271 let topic: String = room.get_metadata("topic").await.unwrap().unwrap();
1272 assert_eq!(topic, "General Chat");
1273 }
1274
1275 #[tokio::test]
1276 async fn test_room_metadata_get_nonexistent() {
1277 let room = Room::new("metadata_missing_test".to_string());
1278 let result: Option<String> = room.get_metadata("nonexistent").await.unwrap();
1279 assert!(result.is_none());
1280 }
1281
1282 #[tokio::test]
1283 async fn test_room_metadata_remove() {
1284 use serde_json::json;
1285
1286 let room = Room::new("metadata_remove_test".to_string());
1287
1288 room.set_metadata("temp", json!("value")).await.unwrap();
1289 assert!(room.get_metadata::<String>("temp").await.unwrap().is_some());
1290
1291 room.remove_metadata("temp").await;
1292 assert!(room.get_metadata::<String>("temp").await.unwrap().is_none());
1293 }
1294
1295 #[tokio::test]
1296 async fn test_room_metadata_clear() {
1297 use serde_json::json;
1298
1299 let room = Room::new("metadata_clear_test".to_string());
1300
1301 room.set_metadata("key1", json!("value1")).await.unwrap();
1302 room.set_metadata("key2", json!("value2")).await.unwrap();
1303
1304 room.clear_metadata().await;
1305
1306 assert!(room.get_metadata::<String>("key1").await.unwrap().is_none());
1307 assert!(room.get_metadata::<String>("key2").await.unwrap().is_none());
1308 }
1309
1310 #[tokio::test]
1311 async fn test_room_is_empty() {
1312 let room = Room::new("empty_test".to_string());
1313 assert!(room.is_empty().await);
1314
1315 let (tx, _rx) = mpsc::unbounded_channel();
1316 let client = Arc::new(WebSocketConnection::new("user".to_string(), tx));
1317
1318 room.join("user".to_string(), client).await.unwrap();
1319 assert!(!room.is_empty().await);
1320
1321 room.leave("user").await.unwrap();
1322 assert!(room.is_empty().await);
1323 }
1324
1325 #[tokio::test]
1326 async fn test_room_manager_new() {
1327 let manager = RoomManager::new();
1328 assert_eq!(manager.room_count().await, 0);
1329 }
1330
1331 #[tokio::test]
1332 async fn test_room_manager_create_room() {
1333 let manager = RoomManager::new();
1334 let room = manager.create_room("new_room".to_string()).await;
1335
1336 assert_eq!(room.id(), "new_room");
1337 assert_eq!(manager.room_count().await, 1);
1338 }
1339
1340 #[tokio::test]
1341 async fn test_room_manager_get_room() {
1342 let manager = RoomManager::new();
1343 manager.create_room("existing".to_string()).await;
1344
1345 let room = manager.get_room("existing").await;
1346 assert!(room.is_some());
1347 assert_eq!(room.unwrap().id(), "existing");
1348
1349 let missing = manager.get_room("missing").await;
1350 assert!(missing.is_none());
1351 }
1352
1353 #[tokio::test]
1354 async fn test_room_manager_get_or_create_room() {
1355 let manager = RoomManager::new();
1356
1357 let room1 = manager.get_or_create_room("auto_room".to_string()).await;
1358 assert_eq!(manager.room_count().await, 1);
1359
1360 let room2 = manager.get_or_create_room("auto_room".to_string()).await;
1361 assert_eq!(manager.room_count().await, 1);
1362
1363 assert_eq!(room1.id(), room2.id());
1364 }
1365
1366 #[tokio::test]
1367 async fn test_room_manager_delete_room() {
1368 let manager = RoomManager::new();
1369 manager.create_room("to_delete".to_string()).await;
1370
1371 assert!(manager.has_room("to_delete").await);
1372
1373 manager.delete_room("to_delete").await.unwrap();
1374 assert!(!manager.has_room("to_delete").await);
1375 }
1376
1377 #[tokio::test]
1378 async fn test_room_manager_delete_nonexistent_room() {
1379 let manager = RoomManager::new();
1380 let result = manager.delete_room("nonexistent").await;
1381
1382 assert!(result.is_err());
1383 assert!(matches!(result.unwrap_err(), RoomError::RoomNotFound(_)));
1384 }
1385
1386 #[tokio::test]
1387 async fn test_room_manager_room_ids() {
1388 let manager = RoomManager::new();
1389
1390 manager.create_room("room1".to_string()).await;
1391 manager.create_room("room2".to_string()).await;
1392 manager.create_room("room3".to_string()).await;
1393
1394 let ids = manager.room_ids().await;
1395 assert_eq!(ids.len(), 3);
1396 assert!(ids.contains(&"room1".to_string()));
1397 assert!(ids.contains(&"room2".to_string()));
1398 assert!(ids.contains(&"room3".to_string()));
1399 }
1400
1401 #[tokio::test]
1402 async fn test_room_manager_has_room() {
1403 let manager = RoomManager::new();
1404 manager.create_room("check".to_string()).await;
1405
1406 assert!(manager.has_room("check").await);
1407 assert!(!manager.has_room("missing").await);
1408 }
1409
1410 #[tokio::test]
1411 async fn test_room_manager_cleanup_empty_rooms() {
1412 let manager = RoomManager::new();
1413
1414 let _empty_room = manager.create_room("empty".to_string()).await;
1415 let occupied_room = manager.create_room("occupied".to_string()).await;
1416
1417 let (tx, _rx) = mpsc::unbounded_channel();
1418 let client = Arc::new(WebSocketConnection::new("user".to_string(), tx));
1419 occupied_room
1420 .join("user".to_string(), client)
1421 .await
1422 .unwrap();
1423
1424 manager.cleanup_empty_rooms().await;
1425
1426 assert!(!manager.has_room("empty").await);
1427 assert!(manager.has_room("occupied").await);
1428 assert_eq!(manager.room_count().await, 1);
1429 }
1430
1431 #[rstest]
1432 #[tokio::test]
1433 async fn test_broadcast_partial_failure_reports_failed_clients() {
1434 let room = Room::new("partial_fail".to_string());
1436
1437 let (tx_alive, mut rx_alive) = mpsc::unbounded_channel();
1438 let (tx_dead, _rx_dead) = mpsc::unbounded_channel::<Message>();
1439
1440 let alive_client = Arc::new(WebSocketConnection::new("alive".to_string(), tx_alive));
1441 let dead_client = Arc::new(WebSocketConnection::new("dead".to_string(), tx_dead));
1442
1443 room.join("alive".to_string(), alive_client).await.unwrap();
1444 room.join("dead".to_string(), dead_client).await.unwrap();
1445
1446 drop(_rx_dead);
1448
1449 let msg = Message::text("test message".to_string());
1451 let result = room.broadcast(msg).await;
1452
1453 assert!(!result.is_complete_success());
1455 assert!(result.is_partial_success());
1456 assert!(!result.is_complete_failure());
1457 assert_eq!(result.successful.len(), 1);
1458 assert!(result.successful.contains(&"alive".to_string()));
1459 assert_eq!(result.failure_count(), 1);
1460 assert!(result.failed_client_ids().contains(&"dead"));
1461 assert!(matches!(rx_alive.try_recv(), Ok(Message::Text { .. })));
1462 }
1463
1464 #[rstest]
1465 #[tokio::test]
1466 async fn test_broadcast_partial_failure_removes_dead_connections() {
1467 let room = Room::new("cleanup".to_string());
1469
1470 let (tx_alive, _rx_alive) = mpsc::unbounded_channel();
1471 let (tx_dead1, _rx_dead1) = mpsc::unbounded_channel::<Message>();
1472 let (tx_dead2, _rx_dead2) = mpsc::unbounded_channel::<Message>();
1473
1474 let alive = Arc::new(WebSocketConnection::new("alive".to_string(), tx_alive));
1475 let dead1 = Arc::new(WebSocketConnection::new("dead1".to_string(), tx_dead1));
1476 let dead2 = Arc::new(WebSocketConnection::new("dead2".to_string(), tx_dead2));
1477
1478 room.join("alive".to_string(), alive).await.unwrap();
1479 room.join("dead1".to_string(), dead1).await.unwrap();
1480 room.join("dead2".to_string(), dead2).await.unwrap();
1481 assert_eq!(room.client_count().await, 3);
1482
1483 drop(_rx_dead1);
1485 drop(_rx_dead2);
1486
1487 let msg = Message::text("cleanup test".to_string());
1489 let result = room.broadcast(msg).await;
1490
1491 assert_eq!(result.failure_count(), 2);
1493 assert_eq!(room.client_count().await, 1);
1494 assert!(room.has_client("alive").await);
1495 assert!(!room.has_client("dead1").await);
1496 assert!(!room.has_client("dead2").await);
1497 }
1498
1499 #[rstest]
1500 #[tokio::test]
1501 async fn test_broadcast_complete_failure() {
1502 let room = Room::new("all_dead".to_string());
1504
1505 let (tx1, _rx1) = mpsc::unbounded_channel::<Message>();
1506 let (tx2, _rx2) = mpsc::unbounded_channel::<Message>();
1507
1508 let client1 = Arc::new(WebSocketConnection::new("c1".to_string(), tx1));
1509 let client2 = Arc::new(WebSocketConnection::new("c2".to_string(), tx2));
1510
1511 room.join("c1".to_string(), client1).await.unwrap();
1512 room.join("c2".to_string(), client2).await.unwrap();
1513
1514 drop(_rx1);
1516 drop(_rx2);
1517
1518 let msg = Message::text("nobody home".to_string());
1520 let result = room.broadcast(msg).await;
1521
1522 assert!(result.is_complete_failure());
1524 assert!(!result.is_partial_success());
1525 assert_eq!(result.failure_count(), 2);
1526 assert!(result.successful.is_empty());
1527 assert_eq!(room.client_count().await, 0);
1528 }
1529
1530 #[rstest]
1531 #[tokio::test]
1532 async fn test_broadcast_empty_room() {
1533 let room = Room::new("empty_broadcast".to_string());
1535
1536 let msg = Message::text("echo".to_string());
1538 let result = room.broadcast(msg).await;
1539
1540 assert!(result.is_complete_success());
1542 assert!(result.successful.is_empty());
1543 assert!(result.failed.is_empty());
1544 }
1545
1546 #[rstest]
1547 #[tokio::test]
1548 async fn test_broadcast_to_room_returns_broadcast_result() {
1549 let manager = RoomManager::new();
1551 manager.create_room("results".to_string()).await;
1552
1553 let (tx1, _rx1) = mpsc::unbounded_channel();
1554 let (tx2, _rx2) = mpsc::unbounded_channel::<Message>();
1555 let conn1 = Arc::new(WebSocketConnection::new("ok".to_string(), tx1));
1556 let conn2 = Arc::new(WebSocketConnection::new("dead".to_string(), tx2));
1557
1558 manager
1559 .join_room("results".to_string(), conn1)
1560 .await
1561 .unwrap();
1562 manager
1563 .join_room("results".to_string(), conn2)
1564 .await
1565 .unwrap();
1566
1567 drop(_rx2);
1569
1570 let msg = Message::text("room broadcast".to_string());
1572 let result = manager.broadcast_to_room("results", msg).await.unwrap();
1573
1574 assert!(result.is_partial_success());
1576 assert_eq!(result.successful.len(), 1);
1577 assert_eq!(result.failure_count(), 1);
1578 }
1579
1580 #[rstest]
1581 #[tokio::test]
1582 async fn test_broadcast_to_all_aggregates_results() {
1583 let manager = RoomManager::new();
1585 let room1 = manager.create_room("r1".to_string()).await;
1586 let room2 = manager.create_room("r2".to_string()).await;
1587
1588 let (tx1, _rx1) = mpsc::unbounded_channel();
1589 let (tx2, _rx2) = mpsc::unbounded_channel::<Message>();
1590 let conn1 = Arc::new(WebSocketConnection::new("ok".to_string(), tx1));
1591 let conn2 = Arc::new(WebSocketConnection::new("dead".to_string(), tx2));
1592
1593 room1.join("ok".to_string(), conn1).await.unwrap();
1594 room2.join("dead".to_string(), conn2).await.unwrap();
1595
1596 drop(_rx2);
1598
1599 let msg = Message::text("global".to_string());
1601 let result = manager.broadcast_to_all(msg).await;
1602
1603 assert!(result.is_partial_success());
1605 assert_eq!(result.successful.len(), 1);
1606 assert_eq!(result.failure_count(), 1);
1607 assert!(result.successful.contains(&"ok".to_string()));
1608 assert!(result.failed_client_ids().contains(&"dead"));
1609 }
1610
1611 #[rstest]
1612 #[tokio::test]
1613 async fn test_broadcast_with_timeout_succeeds_for_responsive_clients() {
1614 let room = Room::new("timeout_ok".to_string());
1616
1617 let (tx1, mut rx1) = mpsc::unbounded_channel();
1618 let (tx2, mut rx2) = mpsc::unbounded_channel();
1619
1620 let client1 = Arc::new(WebSocketConnection::new("fast1".to_string(), tx1));
1621 let client2 = Arc::new(WebSocketConnection::new("fast2".to_string(), tx2));
1622
1623 room.join("fast1".to_string(), client1).await.unwrap();
1624 room.join("fast2".to_string(), client2).await.unwrap();
1625
1626 let msg = Message::text("hello with timeout".to_string());
1628 let result = room
1629 .broadcast_with_timeout(msg, Duration::from_secs(5))
1630 .await;
1631
1632 assert!(result.is_complete_success());
1634 assert_eq!(result.successful.len(), 2);
1635 assert!(matches!(rx1.try_recv(), Ok(Message::Text { .. })));
1636 assert!(matches!(rx2.try_recv(), Ok(Message::Text { .. })));
1637 }
1638
1639 #[rstest]
1640 #[tokio::test]
1641 async fn test_broadcast_with_timeout_removes_dead_connections() {
1642 let room = Room::new("timeout_dead".to_string());
1644
1645 let (tx_alive, _rx_alive) = mpsc::unbounded_channel();
1646 let (tx_dead, _rx_dead) = mpsc::unbounded_channel::<Message>();
1647
1648 let alive = Arc::new(WebSocketConnection::new("alive".to_string(), tx_alive));
1649 let dead = Arc::new(WebSocketConnection::new("dead".to_string(), tx_dead));
1650
1651 room.join("alive".to_string(), alive).await.unwrap();
1652 room.join("dead".to_string(), dead).await.unwrap();
1653
1654 drop(_rx_dead);
1656
1657 let msg = Message::text("test".to_string());
1659 let result = room
1660 .broadcast_with_timeout(msg, Duration::from_secs(5))
1661 .await;
1662
1663 assert!(result.is_partial_success());
1665 assert_eq!(result.successful.len(), 1);
1666 assert_eq!(result.failure_count(), 1);
1667 assert_eq!(room.client_count().await, 1);
1668 assert!(room.has_client("alive").await);
1669 assert!(!room.has_client("dead").await);
1670 }
1671
1672 #[rstest]
1673 #[tokio::test]
1674 async fn test_broadcast_with_timeout_empty_room() {
1675 let room = Room::new("timeout_empty".to_string());
1677
1678 let msg = Message::text("nobody".to_string());
1680 let result = room
1681 .broadcast_with_timeout(msg, Duration::from_secs(1))
1682 .await;
1683
1684 assert!(result.is_complete_success());
1686 assert!(result.successful.is_empty());
1687 assert!(result.failed.is_empty());
1688 }
1689
1690 #[rstest]
1691 #[tokio::test]
1692 async fn test_broadcast_to_room_with_timeout() {
1693 let manager = RoomManager::new();
1695 manager.create_room("timeout_room".to_string()).await;
1696
1697 let (tx, _rx) = mpsc::unbounded_channel();
1698 let conn = Arc::new(WebSocketConnection::new("user1".to_string(), tx));
1699
1700 manager
1701 .join_room("timeout_room".to_string(), conn)
1702 .await
1703 .unwrap();
1704
1705 let msg = Message::text("hello".to_string());
1707 let result = manager
1708 .broadcast_to_room_with_timeout("timeout_room", msg, Duration::from_secs(5))
1709 .await
1710 .unwrap();
1711
1712 assert!(result.is_complete_success());
1714 assert_eq!(result.successful.len(), 1);
1715 }
1716
1717 #[rstest]
1718 #[tokio::test]
1719 async fn test_broadcast_to_room_with_timeout_nonexistent_room() {
1720 let manager = RoomManager::new();
1722
1723 let msg = Message::text("hello".to_string());
1725 let result = manager
1726 .broadcast_to_room_with_timeout("missing", msg, Duration::from_secs(1))
1727 .await;
1728
1729 assert!(result.is_err());
1731 assert!(matches!(result.unwrap_err(), RoomError::RoomNotFound(_)));
1732 }
1733}