1use crate::connection::{Message, WebSocketConnection, WebSocketError};
7use serde_json::Value;
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::RwLock;
12
13#[derive(Debug, thiserror::Error)]
15pub enum RoomError {
16 #[error("Client not found")]
17 ClientNotFound(String),
18 #[error("Room not found")]
19 RoomNotFound(String),
20 #[error("Client already exists")]
21 ClientAlreadyExists(String),
22 #[error("WebSocket error")]
23 WebSocket(#[from] WebSocketError),
24 #[error("Metadata error")]
25 Metadata(String),
26}
27
/// Shorthand for a `Result` whose error type is [`RoomError`].
pub type RoomResult<T> = Result<T, RoomError>;
29
30#[derive(Debug)]
39pub struct BroadcastResult {
40 pub successful: Vec<String>,
42 pub failed: Vec<(String, WebSocketError)>,
44}
45
46impl BroadcastResult {
47 pub fn is_complete_success(&self) -> bool {
49 self.failed.is_empty()
50 }
51
52 pub fn is_partial_success(&self) -> bool {
54 !self.successful.is_empty()
55 }
56
57 pub fn is_complete_failure(&self) -> bool {
59 self.successful.is_empty() && !self.failed.is_empty()
60 }
61
62 pub fn failure_count(&self) -> usize {
64 self.failed.len()
65 }
66
67 pub fn failed_client_ids(&self) -> Vec<&str> {
69 self.failed.iter().map(|(id, _)| id.as_str()).collect()
70 }
71}
72
/// A named group of WebSocket clients with attached JSON metadata.
pub struct Room {
    /// Identifier supplied at construction time.
    id: String,
    /// Member connections keyed by client ID, guarded by an async read/write lock.
    clients: Arc<RwLock<HashMap<String, Arc<WebSocketConnection>>>>,
    /// Arbitrary key/value metadata stored as JSON values.
    metadata: Arc<RwLock<HashMap<String, Value>>>,
}
98
99impl Room {
100 pub fn new(id: String) -> Self {
111 Self {
112 id,
113 clients: Arc::new(RwLock::new(HashMap::new())),
114 metadata: Arc::new(RwLock::new(HashMap::new())),
115 }
116 }
117
118 pub fn id(&self) -> &str {
129 &self.id
130 }
131
132 pub async fn join(
152 &self,
153 client_id: String,
154 client: Arc<WebSocketConnection>,
155 ) -> RoomResult<()> {
156 let mut clients = self.clients.write().await;
157
158 if clients.contains_key(&client_id) {
159 return Err(RoomError::ClientAlreadyExists(client_id));
160 }
161
162 clients.insert(client_id, client);
163 Ok(())
164 }
165
166 pub async fn leave(&self, client_id: &str) -> RoomResult<()> {
189 let mut clients = self.clients.write().await;
190
191 clients
192 .remove(client_id)
193 .ok_or_else(|| RoomError::ClientNotFound(client_id.to_string()))?;
194
195 Ok(())
196 }
197
198 pub async fn broadcast(&self, message: Message) -> BroadcastResult {
231 let clients = self.clients.read().await;
232
233 let mut successful = Vec::new();
234 let mut failed = Vec::new();
235
236 for (client_id, client) in clients.iter() {
237 match client.send(message.clone()).await {
238 Ok(()) => successful.push(client_id.clone()),
239 Err(e) => failed.push((client_id.clone(), e)),
240 }
241 }
242
243 drop(clients);
245
246 if !failed.is_empty() {
248 let mut clients_write = self.clients.write().await;
249 for (client_id, _) in &failed {
250 clients_write.remove(client_id);
251 }
252 }
253
254 BroadcastResult { successful, failed }
255 }
256
257 pub async fn broadcast_with_timeout(
292 &self,
293 message: Message,
294 send_timeout: Duration,
295 ) -> BroadcastResult {
296 let clients = self.clients.read().await;
297
298 let mut successful = Vec::new();
299 let mut failed = Vec::new();
300
301 for (client_id, client) in clients.iter() {
302 let send_future = client.send(message.clone());
303 match tokio::time::timeout(send_timeout, send_future).await {
304 Ok(Ok(())) => successful.push(client_id.clone()),
305 Ok(Err(e)) => failed.push((client_id.clone(), e)),
306 Err(_elapsed) => {
307 failed.push((
308 client_id.clone(),
309 WebSocketError::SlowConsumer(send_timeout),
310 ));
311 }
312 }
313 }
314
315 drop(clients);
317
318 if !failed.is_empty() {
320 let mut clients_write = self.clients.write().await;
321 for (client_id, _) in &failed {
322 clients_write.remove(client_id);
323 }
324 }
325
326 BroadcastResult { successful, failed }
327 }
328
329 pub async fn send_to(&self, client_id: &str, message: Message) -> RoomResult<()> {
358 let clients = self.clients.read().await;
359
360 let client = clients
361 .get(client_id)
362 .ok_or_else(|| RoomError::ClientNotFound(client_id.to_string()))?;
363
364 client.send(message).await?;
365
366 Ok(())
367 }
368
369 pub async fn client_count(&self) -> usize {
396 let clients = self.clients.read().await;
397 clients.len()
398 }
399
400 pub async fn client_ids(&self) -> Vec<String> {
429 let clients = self.clients.read().await;
430 clients.keys().cloned().collect()
431 }
432
433 pub async fn has_client(&self, client_id: &str) -> bool {
456 let clients = self.clients.read().await;
457 clients.contains_key(client_id)
458 }
459
460 pub async fn set_metadata<T: serde::Serialize>(&self, key: &str, value: T) -> RoomResult<()> {
479 let json_value =
480 serde_json::to_value(value).map_err(|e| RoomError::Metadata(e.to_string()))?;
481
482 let mut metadata = self.metadata.write().await;
483 metadata.insert(key.to_string(), json_value);
484
485 Ok(())
486 }
487
488 pub async fn get_metadata<T>(&self, key: &str) -> RoomResult<Option<T>>
509 where
510 T: serde::de::DeserializeOwned,
511 {
512 let metadata = self.metadata.read().await;
513
514 metadata
515 .get(key)
516 .map(|v| serde_json::from_value(v.clone()))
517 .transpose()
518 .map_err(|e| RoomError::Metadata(e.to_string()))
519 }
520
521 pub async fn remove_metadata(&self, key: &str) -> Option<Value> {
540 let mut metadata = self.metadata.write().await;
541 metadata.remove(key)
542 }
543
544 pub async fn clear_metadata(&self) {
565 let mut metadata = self.metadata.write().await;
566 metadata.clear();
567 }
568
569 pub async fn is_empty(&self) -> bool {
591 let clients = self.clients.read().await;
592 clients.is_empty()
593 }
594}
595
/// Registry of [`Room`]s keyed by room ID.
pub struct RoomManager {
    /// Shared handles to all rooms, guarded by an async read/write lock.
    rooms: Arc<RwLock<HashMap<String, Arc<Room>>>>,
}
614
615impl RoomManager {
616 pub fn new() -> Self {
629 Self {
630 rooms: Arc::new(RwLock::new(HashMap::new())),
631 }
632 }
633
634 pub async fn create_room(&self, id: String) -> Arc<Room> {
648 let mut rooms = self.rooms.write().await;
649
650 let room = Arc::new(Room::new(id.clone()));
651 rooms.insert(id, room.clone());
652
653 room
654 }
655
656 pub async fn get_room(&self, id: &str) -> Option<Arc<Room>> {
673 let rooms = self.rooms.read().await;
674 rooms.get(id).cloned()
675 }
676
677 pub async fn get_or_create_room(&self, id: String) -> Arc<Room> {
697 let mut rooms = self.rooms.write().await;
698
699 if let Some(room) = rooms.get(&id) {
700 return room.clone();
701 }
702
703 let room = Arc::new(Room::new(id.clone()));
704 rooms.insert(id, room.clone());
705 room
706 }
707
708 pub async fn delete_room(&self, id: &str) -> RoomResult<()> {
726 let mut rooms = self.rooms.write().await;
727
728 rooms
729 .remove(id)
730 .ok_or_else(|| RoomError::RoomNotFound(id.to_string()))?;
731
732 Ok(())
733 }
734
735 pub async fn room_count(&self) -> usize {
753 let rooms = self.rooms.read().await;
754 rooms.len()
755 }
756
757 pub async fn room_ids(&self) -> Vec<String> {
777 let rooms = self.rooms.read().await;
778 rooms.keys().cloned().collect()
779 }
780
781 pub async fn has_room(&self, id: &str) -> bool {
797 let rooms = self.rooms.read().await;
798 rooms.contains_key(id)
799 }
800
801 pub async fn cleanup_empty_rooms(&self) {
828 let mut rooms = self.rooms.write().await;
829 let empty_room_ids: Vec<String> = {
830 let mut empty_ids = Vec::new();
831 for (id, room) in rooms.iter() {
832 if room.is_empty().await {
833 empty_ids.push(id.clone());
834 }
835 }
836 empty_ids
837 };
838
839 for id in empty_room_ids {
840 rooms.remove(&id);
841 }
842 }
843
844 pub async fn join_room(
865 &self,
866 room_id: String,
867 connection: Arc<WebSocketConnection>,
868 ) -> RoomResult<()> {
869 let room = self
870 .get_room(&room_id)
871 .await
872 .ok_or_else(|| RoomError::RoomNotFound(room_id.clone()))?;
873
874 let client_id = connection.id().to_string();
875 room.join(client_id, connection).await
876 }
877
878 pub async fn leave_room(&self, room_id: &str, user_id: &str) -> RoomResult<()> {
900 let room = self
901 .get_room(room_id)
902 .await
903 .ok_or_else(|| RoomError::RoomNotFound(room_id.to_string()))?;
904
905 room.leave(user_id).await
906 }
907
908 pub async fn get_room_size(&self, room_id: &str) -> usize {
930 if let Some(room) = self.get_room(room_id).await {
931 room.client_count().await
932 } else {
933 0
934 }
935 }
936
937 pub async fn broadcast_to_room(
966 &self,
967 room_id: &str,
968 message: Message,
969 ) -> RoomResult<BroadcastResult> {
970 let room = self
971 .get_room(room_id)
972 .await
973 .ok_or_else(|| RoomError::RoomNotFound(room_id.to_string()))?;
974
975 Ok(room.broadcast(message).await)
976 }
977
978 pub async fn broadcast_to_room_with_timeout(
1012 &self,
1013 room_id: &str,
1014 message: Message,
1015 send_timeout: Duration,
1016 ) -> RoomResult<BroadcastResult> {
1017 let room = self
1018 .get_room(room_id)
1019 .await
1020 .ok_or_else(|| RoomError::RoomNotFound(room_id.to_string()))?;
1021
1022 Ok(room.broadcast_with_timeout(message, send_timeout).await)
1023 }
1024
1025 pub async fn broadcast_to_all(&self, message: Message) -> BroadcastResult {
1058 let rooms = self.rooms.read().await;
1059
1060 let mut successful = Vec::new();
1061 let mut failed = Vec::new();
1062
1063 for room in rooms.values() {
1064 let result = room.broadcast(message.clone()).await;
1065 successful.extend(result.successful);
1066 failed.extend(result.failed);
1067 }
1068
1069 BroadcastResult { successful, failed }
1070 }
1071
    /// Returns the IDs of all registered rooms; an alias for [`RoomManager::room_ids`].
    pub async fn get_all_rooms(&self) -> Vec<String> {
        self.room_ids().await
    }
1093}
1094
/// `Default` yields the same empty manager as [`RoomManager::new`].
impl Default for RoomManager {
    fn default() -> Self {
        Self::new()
    }
}
1100
#[cfg(test)]
mod tests {
    use super::*;
    use rstest::rstest;
    use serde_json::json;
    use tokio::sync::mpsc;

    /// Builds a connection backed by an unbounded channel and returns it
    /// together with the receiving end. Keep the receiver alive for as long as
    /// sends are expected to succeed.
    fn live_client(id: &str) -> (Arc<WebSocketConnection>, mpsc::UnboundedReceiver<Message>) {
        let (tx, rx) = mpsc::unbounded_channel();
        (Arc::new(WebSocketConnection::new(id.to_string(), tx)), rx)
    }

    /// Builds a connection whose receiver has already been dropped, so that
    /// sending to it fails.
    fn dead_client(id: &str) -> Arc<WebSocketConnection> {
        let (client, rx) = live_client(id);
        drop(rx);
        client
    }

    #[rstest]
    #[tokio::test]
    async fn test_room_new() {
        let room = Room::new("test_room".to_string());
        assert_eq!(room.id(), "test_room");
        assert_eq!(room.client_count().await, 0);
        assert!(room.is_empty().await);
    }

    #[tokio::test]
    async fn test_room_join_client() {
        let room = Room::new("join_test".to_string());
        let (client, _rx) = live_client("client1");

        room.join("client1".to_string(), client).await.unwrap();
        assert_eq!(room.client_count().await, 1);
        assert!(room.has_client("client1").await);
    }

    #[tokio::test]
    async fn test_room_join_duplicate_client() {
        let room = Room::new("duplicate_test".to_string());
        let (client1, _rx1) = live_client("duplicate");
        let (client2, _rx2) = live_client("duplicate");

        room.join("duplicate".to_string(), client1).await.unwrap();
        let result = room.join("duplicate".to_string(), client2).await;

        assert!(matches!(result, Err(RoomError::ClientAlreadyExists(_))));
    }

    #[tokio::test]
    async fn test_room_leave_client() {
        let room = Room::new("leave_test".to_string());
        let (client, _rx) = live_client("leaver");

        room.join("leaver".to_string(), client).await.unwrap();
        assert!(room.has_client("leaver").await);

        room.leave("leaver").await.unwrap();
        assert!(!room.has_client("leaver").await);
        assert_eq!(room.client_count().await, 0);
    }

    #[tokio::test]
    async fn test_room_leave_nonexistent_client() {
        let room = Room::new("leave_error_test".to_string());
        let result = room.leave("nonexistent").await;

        assert!(matches!(result, Err(RoomError::ClientNotFound(_))));
    }

    #[rstest]
    #[tokio::test]
    async fn test_room_broadcast() {
        let room = Room::new("broadcast_test".to_string());

        let (client1, mut rx1) = live_client("user1");
        let (client2, mut rx2) = live_client("user2");
        let (client3, mut rx3) = live_client("user3");

        room.join("user1".to_string(), client1).await.unwrap();
        room.join("user2".to_string(), client2).await.unwrap();
        room.join("user3".to_string(), client3).await.unwrap();

        let msg = Message::text("Broadcast message".to_string());
        let result = room.broadcast(msg).await;

        assert!(result.is_complete_success());
        assert_eq!(result.successful.len(), 3);
        assert_eq!(result.failure_count(), 0);
        assert!(matches!(rx1.try_recv(), Ok(Message::Text { .. })));
        assert!(matches!(rx2.try_recv(), Ok(Message::Text { .. })));
        assert!(matches!(rx3.try_recv(), Ok(Message::Text { .. })));
    }

    #[tokio::test]
    async fn test_room_send_to_specific_client() {
        let room = Room::new("private_msg_test".to_string());

        let (target, mut rx_target) = live_client("target");
        let (other, mut rx_other) = live_client("other");

        room.join("target".to_string(), target).await.unwrap();
        room.join("other".to_string(), other).await.unwrap();

        let msg = Message::text("Private message".to_string());
        room.send_to("target", msg).await.unwrap();

        assert!(matches!(rx_target.try_recv(), Ok(Message::Text { .. })));
        assert!(rx_other.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_room_send_to_nonexistent_client() {
        let room = Room::new("send_error_test".to_string());
        let msg = Message::text("Test".to_string());
        let result = room.send_to("nonexistent", msg).await;

        assert!(matches!(result, Err(RoomError::ClientNotFound(_))));
    }

    #[tokio::test]
    async fn test_room_client_ids() {
        let room = Room::new("ids_test".to_string());

        let (alpha, _rx_alpha) = live_client("alpha");
        let (beta, _rx_beta) = live_client("beta");

        room.join("alpha".to_string(), alpha).await.unwrap();
        room.join("beta".to_string(), beta).await.unwrap();

        let ids = room.client_ids().await;
        assert_eq!(ids.len(), 2);
        assert!(ids.contains(&"alpha".to_string()));
        assert!(ids.contains(&"beta".to_string()));
    }

    #[tokio::test]
    async fn test_room_metadata_set_and_get() {
        let room = Room::new("metadata_test".to_string());

        room.set_metadata("max_users", json!(100)).await.unwrap();
        room.set_metadata("topic", json!("General Chat"))
            .await
            .unwrap();

        let max_users: i64 = room.get_metadata("max_users").await.unwrap().unwrap();
        assert_eq!(max_users, 100);

        let topic: String = room.get_metadata("topic").await.unwrap().unwrap();
        assert_eq!(topic, "General Chat");
    }

    #[tokio::test]
    async fn test_room_metadata_get_nonexistent() {
        let room = Room::new("metadata_missing_test".to_string());
        let result: Option<String> = room.get_metadata("nonexistent").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_room_metadata_remove() {
        let room = Room::new("metadata_remove_test".to_string());

        room.set_metadata("temp", json!("value")).await.unwrap();
        assert!(room.get_metadata::<String>("temp").await.unwrap().is_some());

        room.remove_metadata("temp").await;
        assert!(room.get_metadata::<String>("temp").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_room_metadata_clear() {
        let room = Room::new("metadata_clear_test".to_string());

        room.set_metadata("key1", json!("value1")).await.unwrap();
        room.set_metadata("key2", json!("value2")).await.unwrap();

        room.clear_metadata().await;

        assert!(room.get_metadata::<String>("key1").await.unwrap().is_none());
        assert!(room.get_metadata::<String>("key2").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_room_is_empty() {
        let room = Room::new("empty_test".to_string());
        assert!(room.is_empty().await);

        let (client, _rx) = live_client("user");

        room.join("user".to_string(), client).await.unwrap();
        assert!(!room.is_empty().await);

        room.leave("user").await.unwrap();
        assert!(room.is_empty().await);
    }

    #[tokio::test]
    async fn test_room_manager_new() {
        let manager = RoomManager::new();
        assert_eq!(manager.room_count().await, 0);
    }

    #[tokio::test]
    async fn test_room_manager_create_room() {
        let manager = RoomManager::new();
        let room = manager.create_room("new_room".to_string()).await;

        assert_eq!(room.id(), "new_room");
        assert_eq!(manager.room_count().await, 1);
    }

    #[tokio::test]
    async fn test_room_manager_get_room() {
        let manager = RoomManager::new();
        manager.create_room("existing".to_string()).await;

        let room = manager.get_room("existing").await;
        assert!(room.is_some());
        assert_eq!(room.unwrap().id(), "existing");

        let missing = manager.get_room("missing").await;
        assert!(missing.is_none());
    }

    #[tokio::test]
    async fn test_room_manager_get_or_create_room() {
        let manager = RoomManager::new();

        let room1 = manager.get_or_create_room("auto_room".to_string()).await;
        assert_eq!(manager.room_count().await, 1);

        let room2 = manager.get_or_create_room("auto_room".to_string()).await;
        assert_eq!(manager.room_count().await, 1);

        assert_eq!(room1.id(), room2.id());
        // Equal IDs alone would also hold for a replaced room; require the same instance.
        assert!(Arc::ptr_eq(&room1, &room2));
    }

    #[tokio::test]
    async fn test_room_manager_delete_room() {
        let manager = RoomManager::new();
        manager.create_room("to_delete".to_string()).await;

        assert!(manager.has_room("to_delete").await);

        manager.delete_room("to_delete").await.unwrap();
        assert!(!manager.has_room("to_delete").await);
    }

    #[tokio::test]
    async fn test_room_manager_delete_nonexistent_room() {
        let manager = RoomManager::new();
        let result = manager.delete_room("nonexistent").await;

        assert!(matches!(result, Err(RoomError::RoomNotFound(_))));
    }

    #[tokio::test]
    async fn test_room_manager_room_ids() {
        let manager = RoomManager::new();

        manager.create_room("room1".to_string()).await;
        manager.create_room("room2".to_string()).await;
        manager.create_room("room3".to_string()).await;

        let ids = manager.room_ids().await;
        assert_eq!(ids.len(), 3);
        assert!(ids.contains(&"room1".to_string()));
        assert!(ids.contains(&"room2".to_string()));
        assert!(ids.contains(&"room3".to_string()));
    }

    #[tokio::test]
    async fn test_room_manager_has_room() {
        let manager = RoomManager::new();
        manager.create_room("check".to_string()).await;

        assert!(manager.has_room("check").await);
        assert!(!manager.has_room("missing").await);
    }

    #[tokio::test]
    async fn test_room_manager_cleanup_empty_rooms() {
        let manager = RoomManager::new();

        let _empty_room = manager.create_room("empty".to_string()).await;
        let occupied_room = manager.create_room("occupied".to_string()).await;

        let (client, _rx) = live_client("user");
        occupied_room
            .join("user".to_string(), client)
            .await
            .unwrap();

        manager.cleanup_empty_rooms().await;

        assert!(!manager.has_room("empty").await);
        assert!(manager.has_room("occupied").await);
        assert_eq!(manager.room_count().await, 1);
    }

    #[rstest]
    #[tokio::test]
    async fn test_broadcast_partial_failure_reports_failed_clients() {
        let room = Room::new("partial_fail".to_string());

        let (alive, mut rx_alive) = live_client("alive");
        room.join("alive".to_string(), alive).await.unwrap();
        room.join("dead".to_string(), dead_client("dead"))
            .await
            .unwrap();

        let msg = Message::text("test message".to_string());
        let result = room.broadcast(msg).await;

        assert!(!result.is_complete_success());
        assert!(result.is_partial_success());
        assert!(!result.is_complete_failure());
        assert_eq!(result.successful.len(), 1);
        assert!(result.successful.contains(&"alive".to_string()));
        assert_eq!(result.failure_count(), 1);
        assert!(result.failed_client_ids().contains(&"dead"));
        assert!(matches!(rx_alive.try_recv(), Ok(Message::Text { .. })));
    }

    #[rstest]
    #[tokio::test]
    async fn test_broadcast_partial_failure_removes_dead_connections() {
        let room = Room::new("cleanup".to_string());

        let (alive, _rx_alive) = live_client("alive");
        room.join("alive".to_string(), alive).await.unwrap();
        room.join("dead1".to_string(), dead_client("dead1"))
            .await
            .unwrap();
        room.join("dead2".to_string(), dead_client("dead2"))
            .await
            .unwrap();
        assert_eq!(room.client_count().await, 3);

        let msg = Message::text("cleanup test".to_string());
        let result = room.broadcast(msg).await;

        assert_eq!(result.failure_count(), 2);
        assert_eq!(room.client_count().await, 1);
        assert!(room.has_client("alive").await);
        assert!(!room.has_client("dead1").await);
        assert!(!room.has_client("dead2").await);
    }

    #[rstest]
    #[tokio::test]
    async fn test_broadcast_complete_failure() {
        let room = Room::new("all_dead".to_string());

        room.join("c1".to_string(), dead_client("c1")).await.unwrap();
        room.join("c2".to_string(), dead_client("c2")).await.unwrap();

        let msg = Message::text("nobody home".to_string());
        let result = room.broadcast(msg).await;

        assert!(result.is_complete_failure());
        assert!(!result.is_partial_success());
        assert_eq!(result.failure_count(), 2);
        assert!(result.successful.is_empty());
        assert_eq!(room.client_count().await, 0);
    }

    #[rstest]
    #[tokio::test]
    async fn test_broadcast_empty_room() {
        let room = Room::new("empty_broadcast".to_string());

        let msg = Message::text("echo".to_string());
        let result = room.broadcast(msg).await;

        assert!(result.is_complete_success());
        assert!(result.successful.is_empty());
        assert!(result.failed.is_empty());
    }

    #[rstest]
    #[tokio::test]
    async fn test_broadcast_to_room_returns_broadcast_result() {
        let manager = RoomManager::new();
        manager.create_room("results".to_string()).await;

        let (ok_conn, _rx_ok) = live_client("ok");

        manager
            .join_room("results".to_string(), ok_conn)
            .await
            .unwrap();
        manager
            .join_room("results".to_string(), dead_client("dead"))
            .await
            .unwrap();

        let msg = Message::text("room broadcast".to_string());
        let result = manager.broadcast_to_room("results", msg).await.unwrap();

        assert!(result.is_partial_success());
        assert_eq!(result.successful.len(), 1);
        assert_eq!(result.failure_count(), 1);
    }

    #[rstest]
    #[tokio::test]
    async fn test_broadcast_to_all_aggregates_results() {
        let manager = RoomManager::new();
        let room1 = manager.create_room("r1".to_string()).await;
        let room2 = manager.create_room("r2".to_string()).await;

        let (ok_conn, _rx_ok) = live_client("ok");

        room1.join("ok".to_string(), ok_conn).await.unwrap();
        room2
            .join("dead".to_string(), dead_client("dead"))
            .await
            .unwrap();

        let msg = Message::text("global".to_string());
        let result = manager.broadcast_to_all(msg).await;

        assert!(result.is_partial_success());
        assert_eq!(result.successful.len(), 1);
        assert_eq!(result.failure_count(), 1);
        assert!(result.successful.contains(&"ok".to_string()));
        assert!(result.failed_client_ids().contains(&"dead"));
    }

    #[rstest]
    #[tokio::test]
    async fn test_broadcast_with_timeout_succeeds_for_responsive_clients() {
        let room = Room::new("timeout_ok".to_string());

        let (client1, mut rx1) = live_client("fast1");
        let (client2, mut rx2) = live_client("fast2");

        room.join("fast1".to_string(), client1).await.unwrap();
        room.join("fast2".to_string(), client2).await.unwrap();

        let msg = Message::text("hello with timeout".to_string());
        let result = room
            .broadcast_with_timeout(msg, Duration::from_secs(5))
            .await;

        assert!(result.is_complete_success());
        assert_eq!(result.successful.len(), 2);
        assert!(matches!(rx1.try_recv(), Ok(Message::Text { .. })));
        assert!(matches!(rx2.try_recv(), Ok(Message::Text { .. })));
    }

    #[rstest]
    #[tokio::test]
    async fn test_broadcast_with_timeout_removes_dead_connections() {
        let room = Room::new("timeout_dead".to_string());

        let (alive, _rx_alive) = live_client("alive");
        room.join("alive".to_string(), alive).await.unwrap();
        room.join("dead".to_string(), dead_client("dead"))
            .await
            .unwrap();

        let msg = Message::text("test".to_string());
        let result = room
            .broadcast_with_timeout(msg, Duration::from_secs(5))
            .await;

        assert!(result.is_partial_success());
        assert_eq!(result.successful.len(), 1);
        assert_eq!(result.failure_count(), 1);
        assert_eq!(room.client_count().await, 1);
        assert!(room.has_client("alive").await);
        assert!(!room.has_client("dead").await);
    }

    #[rstest]
    #[tokio::test]
    async fn test_broadcast_with_timeout_empty_room() {
        let room = Room::new("timeout_empty".to_string());

        let msg = Message::text("nobody".to_string());
        let result = room
            .broadcast_with_timeout(msg, Duration::from_secs(1))
            .await;

        assert!(result.is_complete_success());
        assert!(result.successful.is_empty());
        assert!(result.failed.is_empty());
    }

    #[rstest]
    #[tokio::test]
    async fn test_broadcast_to_room_with_timeout() {
        let manager = RoomManager::new();
        manager.create_room("timeout_room".to_string()).await;

        let (conn, _rx) = live_client("user1");

        manager
            .join_room("timeout_room".to_string(), conn)
            .await
            .unwrap();

        let msg = Message::text("hello".to_string());
        let result = manager
            .broadcast_to_room_with_timeout("timeout_room", msg, Duration::from_secs(5))
            .await
            .unwrap();

        assert!(result.is_complete_success());
        assert_eq!(result.successful.len(), 1);
    }

    #[rstest]
    #[tokio::test]
    async fn test_broadcast_to_room_with_timeout_nonexistent_room() {
        let manager = RoomManager::new();

        let msg = Message::text("hello".to_string());
        let result = manager
            .broadcast_to_room_with_timeout("missing", msg, Duration::from_secs(1))
            .await;

        assert!(matches!(result, Err(RoomError::RoomNotFound(_))));
    }
}