Skip to main content

reinhardt_websockets/
room.rs

1//! WebSocket room management with advanced features
2//!
3//! This module provides room-based WebSocket connection management,
4//! including client tracking, metadata storage, and targeted messaging.
5
6use crate::connection::{Message, WebSocketConnection, WebSocketError};
7use serde_json::Value;
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::RwLock;
12
13/// Error types for room operations
14#[derive(Debug, thiserror::Error)]
15pub enum RoomError {
16	#[error("Client not found")]
17	ClientNotFound(String),
18	#[error("Room not found")]
19	RoomNotFound(String),
20	#[error("Client already exists")]
21	ClientAlreadyExists(String),
22	#[error("WebSocket error")]
23	WebSocket(#[from] WebSocketError),
24	#[error("Metadata error")]
25	Metadata(String),
26}
27
28pub type RoomResult<T> = Result<T, RoomError>;
29
30/// Result of a broadcast operation that tracks individual send outcomes.
31///
32/// This provides detailed information about which clients received the message
33/// and which failed, enabling callers to take appropriate action (e.g., logging,
34/// notifying administrators, or retrying).
35///
36/// Dead connections that fail during broadcast are automatically removed from
37/// the room.
38#[derive(Debug)]
39pub struct BroadcastResult {
40	/// Client IDs that received the message successfully
41	pub successful: Vec<String>,
42	/// Client IDs that failed with their respective errors
43	pub failed: Vec<(String, WebSocketError)>,
44}
45
46impl BroadcastResult {
47	/// Returns `true` if all sends were successful (no failures).
48	pub fn is_complete_success(&self) -> bool {
49		self.failed.is_empty()
50	}
51
52	/// Returns `true` if at least one send was successful.
53	pub fn is_partial_success(&self) -> bool {
54		!self.successful.is_empty()
55	}
56
57	/// Returns `true` if all sends failed.
58	pub fn is_complete_failure(&self) -> bool {
59		self.successful.is_empty() && !self.failed.is_empty()
60	}
61
62	/// Returns the number of failed sends.
63	pub fn failure_count(&self) -> usize {
64		self.failed.len()
65	}
66
67	/// Returns the IDs of clients that failed to receive the message.
68	pub fn failed_client_ids(&self) -> Vec<&str> {
69		self.failed.iter().map(|(id, _)| id.as_str()).collect()
70	}
71}
72
73/// A WebSocket room that manages multiple client connections
74///
75/// # Examples
76///
77/// ```
78/// use reinhardt_websockets::room::Room;
79/// use reinhardt_websockets::WebSocketConnection;
80/// use tokio::sync::mpsc;
81/// use std::sync::Arc;
82///
83/// # tokio_test::block_on(async {
84/// let room = Room::new("chat_room".to_string());
85///
86/// let (tx, _rx) = mpsc::unbounded_channel();
87/// let client = Arc::new(WebSocketConnection::new("user1".to_string(), tx));
88///
89/// room.join("user1".to_string(), client).await.unwrap();
90/// assert_eq!(room.client_count().await, 1);
91/// # });
92/// ```
93pub struct Room {
94	id: String,
95	clients: Arc<RwLock<HashMap<String, Arc<WebSocketConnection>>>>,
96	metadata: Arc<RwLock<HashMap<String, Value>>>,
97}
98
99impl Room {
100	/// Create a new room with the given ID
101	///
102	/// # Examples
103	///
104	/// ```
105	/// use reinhardt_websockets::room::Room;
106	///
107	/// let room = Room::new("general".to_string());
108	/// assert_eq!(room.id(), "general");
109	/// ```
110	pub fn new(id: String) -> Self {
111		Self {
112			id,
113			clients: Arc::new(RwLock::new(HashMap::new())),
114			metadata: Arc::new(RwLock::new(HashMap::new())),
115		}
116	}
117
118	/// Get the room ID
119	///
120	/// # Examples
121	///
122	/// ```
123	/// use reinhardt_websockets::room::Room;
124	///
125	/// let room = Room::new("lobby".to_string());
126	/// assert_eq!(room.id(), "lobby");
127	/// ```
128	pub fn id(&self) -> &str {
129		&self.id
130	}
131
132	/// Add a client to the room
133	///
134	/// # Examples
135	///
136	/// ```
137	/// use reinhardt_websockets::room::Room;
138	/// use reinhardt_websockets::WebSocketConnection;
139	/// use tokio::sync::mpsc;
140	/// use std::sync::Arc;
141	///
142	/// # tokio_test::block_on(async {
143	/// let room = Room::new("chat".to_string());
144	/// let (tx, _rx) = mpsc::unbounded_channel();
145	/// let client = Arc::new(WebSocketConnection::new("alice".to_string(), tx));
146	///
147	/// room.join("alice".to_string(), client).await.unwrap();
148	/// assert!(room.has_client("alice").await);
149	/// # });
150	/// ```
151	pub async fn join(
152		&self,
153		client_id: String,
154		client: Arc<WebSocketConnection>,
155	) -> RoomResult<()> {
156		let mut clients = self.clients.write().await;
157
158		if clients.contains_key(&client_id) {
159			return Err(RoomError::ClientAlreadyExists(client_id));
160		}
161
162		clients.insert(client_id, client);
163		Ok(())
164	}
165
166	/// Remove a client from the room
167	///
168	/// # Examples
169	///
170	/// ```
171	/// use reinhardt_websockets::room::Room;
172	/// use reinhardt_websockets::WebSocketConnection;
173	/// use tokio::sync::mpsc;
174	/// use std::sync::Arc;
175	///
176	/// # tokio_test::block_on(async {
177	/// let room = Room::new("chat".to_string());
178	/// let (tx, _rx) = mpsc::unbounded_channel();
179	/// let client = Arc::new(WebSocketConnection::new("bob".to_string(), tx));
180	///
181	/// room.join("bob".to_string(), client).await.unwrap();
182	/// assert!(room.has_client("bob").await);
183	///
184	/// room.leave("bob").await.unwrap();
185	/// assert!(!room.has_client("bob").await);
186	/// # });
187	/// ```
188	pub async fn leave(&self, client_id: &str) -> RoomResult<()> {
189		let mut clients = self.clients.write().await;
190
191		clients
192			.remove(client_id)
193			.ok_or_else(|| RoomError::ClientNotFound(client_id.to_string()))?;
194
195		Ok(())
196	}
197
198	/// Broadcast a message to all clients in the room.
199	///
200	/// Returns a [`BroadcastResult`] that reports which clients received the
201	/// message and which failed. Dead connections that fail during broadcast
202	/// are automatically removed from the room.
203	///
204	/// # Examples
205	///
206	/// ```
207	/// use reinhardt_websockets::room::Room;
208	/// use reinhardt_websockets::{WebSocketConnection, Message};
209	/// use tokio::sync::mpsc;
210	/// use std::sync::Arc;
211	///
212	/// # tokio_test::block_on(async {
213	/// let room = Room::new("chat".to_string());
214	///
215	/// let (tx1, _rx1) = mpsc::unbounded_channel();
216	/// let (tx2, _rx2) = mpsc::unbounded_channel();
217	/// let client1 = Arc::new(WebSocketConnection::new("user1".to_string(), tx1));
218	/// let client2 = Arc::new(WebSocketConnection::new("user2".to_string(), tx2));
219	///
220	/// room.join("user1".to_string(), client1).await.unwrap();
221	/// room.join("user2".to_string(), client2).await.unwrap();
222	///
223	/// let msg = Message::text("Hello everyone!".to_string());
224	/// let result = room.broadcast(msg).await;
225	///
226	/// assert!(result.is_complete_success());
227	/// assert_eq!(result.successful.len(), 2);
228	/// # });
229	/// ```
230	pub async fn broadcast(&self, message: Message) -> BroadcastResult {
231		let clients = self.clients.read().await;
232
233		let mut successful = Vec::new();
234		let mut failed = Vec::new();
235
236		for (client_id, client) in clients.iter() {
237			match client.send(message.clone()).await {
238				Ok(()) => successful.push(client_id.clone()),
239				Err(e) => failed.push((client_id.clone(), e)),
240			}
241		}
242
243		// Drop read lock before acquiring write lock
244		drop(clients);
245
246		// Automatically remove dead connections from the room
247		if !failed.is_empty() {
248			let mut clients_write = self.clients.write().await;
249			for (client_id, _) in &failed {
250				clients_write.remove(client_id);
251			}
252		}
253
254		BroadcastResult { successful, failed }
255	}
256
257	/// Broadcasts a message to all clients with a per-client send timeout.
258	///
259	/// Slow consumers that do not accept the message within the given timeout
260	/// are treated as failed and automatically removed from the room, applying
261	/// backpressure to prevent slow receivers from blocking the entire broadcast.
262	///
263	/// # Arguments
264	///
265	/// * `message` - The message to broadcast
266	/// * `send_timeout` - Maximum time to wait for each client to accept the message
267	///
268	/// # Examples
269	///
270	/// ```
271	/// use reinhardt_websockets::room::Room;
272	/// use reinhardt_websockets::{WebSocketConnection, Message};
273	/// use tokio::sync::mpsc;
274	/// use std::sync::Arc;
275	/// use std::time::Duration;
276	///
277	/// # tokio_test::block_on(async {
278	/// let room = Room::new("chat".to_string());
279	///
280	/// let (tx, _rx) = mpsc::unbounded_channel();
281	/// let client = Arc::new(WebSocketConnection::new("user1".to_string(), tx));
282	///
283	/// room.join("user1".to_string(), client).await.unwrap();
284	///
285	/// let msg = Message::text("Hello!".to_string());
286	/// let result = room.broadcast_with_timeout(msg, Duration::from_secs(5)).await;
287	///
288	/// assert!(result.is_complete_success());
289	/// # });
290	/// ```
291	pub async fn broadcast_with_timeout(
292		&self,
293		message: Message,
294		send_timeout: Duration,
295	) -> BroadcastResult {
296		let clients = self.clients.read().await;
297
298		let mut successful = Vec::new();
299		let mut failed = Vec::new();
300
301		for (client_id, client) in clients.iter() {
302			let send_future = client.send(message.clone());
303			match tokio::time::timeout(send_timeout, send_future).await {
304				Ok(Ok(())) => successful.push(client_id.clone()),
305				Ok(Err(e)) => failed.push((client_id.clone(), e)),
306				Err(_elapsed) => {
307					failed.push((
308						client_id.clone(),
309						WebSocketError::SlowConsumer(send_timeout),
310					));
311				}
312			}
313		}
314
315		// Drop read lock before acquiring write lock
316		drop(clients);
317
318		// Automatically remove failed connections from the room
319		if !failed.is_empty() {
320			let mut clients_write = self.clients.write().await;
321			for (client_id, _) in &failed {
322				clients_write.remove(client_id);
323			}
324		}
325
326		BroadcastResult { successful, failed }
327	}
328
329	/// Send a message to a specific client
330	///
331	/// # Examples
332	///
333	/// ```
334	/// use reinhardt_websockets::room::Room;
335	/// use reinhardt_websockets::{WebSocketConnection, Message};
336	/// use tokio::sync::mpsc;
337	/// use std::sync::Arc;
338	///
339	/// # tokio_test::block_on(async {
340	/// let room = Room::new("private".to_string());
341	///
342	/// let (tx, mut rx) = mpsc::unbounded_channel();
343	/// let client = Arc::new(WebSocketConnection::new("charlie".to_string(), tx));
344	///
345	/// room.join("charlie".to_string(), client).await.unwrap();
346	///
347	/// let msg = Message::text("Private message".to_string());
348	/// room.send_to("charlie", msg).await.unwrap();
349	///
350	/// let received = rx.recv().await.unwrap();
351	/// match received {
352	///     Message::Text { data } => assert_eq!(data, "Private message"),
353	///     _ => panic!("Expected text message"),
354	/// }
355	/// # });
356	/// ```
357	pub async fn send_to(&self, client_id: &str, message: Message) -> RoomResult<()> {
358		let clients = self.clients.read().await;
359
360		let client = clients
361			.get(client_id)
362			.ok_or_else(|| RoomError::ClientNotFound(client_id.to_string()))?;
363
364		client.send(message).await?;
365
366		Ok(())
367	}
368
369	/// Get the number of clients in the room
370	///
371	/// # Examples
372	///
373	/// ```
374	/// use reinhardt_websockets::room::Room;
375	/// use reinhardt_websockets::WebSocketConnection;
376	/// use tokio::sync::mpsc;
377	/// use std::sync::Arc;
378	///
379	/// # tokio_test::block_on(async {
380	/// let room = Room::new("game".to_string());
381	/// assert_eq!(room.client_count().await, 0);
382	///
383	/// let (tx1, _rx1) = mpsc::unbounded_channel();
384	/// let (tx2, _rx2) = mpsc::unbounded_channel();
385	///
386	/// let client1 = Arc::new(WebSocketConnection::new("player1".to_string(), tx1));
387	/// let client2 = Arc::new(WebSocketConnection::new("player2".to_string(), tx2));
388	///
389	/// room.join("player1".to_string(), client1).await.unwrap();
390	/// room.join("player2".to_string(), client2).await.unwrap();
391	///
392	/// assert_eq!(room.client_count().await, 2);
393	/// # });
394	/// ```
395	pub async fn client_count(&self) -> usize {
396		let clients = self.clients.read().await;
397		clients.len()
398	}
399
400	/// Get all client IDs in the room
401	///
402	/// # Examples
403	///
404	/// ```
405	/// use reinhardt_websockets::room::Room;
406	/// use reinhardt_websockets::WebSocketConnection;
407	/// use tokio::sync::mpsc;
408	/// use std::sync::Arc;
409	///
410	/// # tokio_test::block_on(async {
411	/// let room = Room::new("meeting".to_string());
412	///
413	/// let (tx1, _rx1) = mpsc::unbounded_channel();
414	/// let (tx2, _rx2) = mpsc::unbounded_channel();
415	///
416	/// let client1 = Arc::new(WebSocketConnection::new("dave".to_string(), tx1));
417	/// let client2 = Arc::new(WebSocketConnection::new("eve".to_string(), tx2));
418	///
419	/// room.join("dave".to_string(), client1).await.unwrap();
420	/// room.join("eve".to_string(), client2).await.unwrap();
421	///
422	/// let ids = room.client_ids().await;
423	/// assert_eq!(ids.len(), 2);
424	/// assert!(ids.contains(&"dave".to_string()));
425	/// assert!(ids.contains(&"eve".to_string()));
426	/// # });
427	/// ```
428	pub async fn client_ids(&self) -> Vec<String> {
429		let clients = self.clients.read().await;
430		clients.keys().cloned().collect()
431	}
432
433	/// Check if a client is in the room
434	///
435	/// # Examples
436	///
437	/// ```
438	/// use reinhardt_websockets::room::Room;
439	/// use reinhardt_websockets::WebSocketConnection;
440	/// use tokio::sync::mpsc;
441	/// use std::sync::Arc;
442	///
443	/// # tokio_test::block_on(async {
444	/// let room = Room::new("support".to_string());
445	///
446	/// let (tx, _rx) = mpsc::unbounded_channel();
447	/// let client = Arc::new(WebSocketConnection::new("frank".to_string(), tx));
448	///
449	/// room.join("frank".to_string(), client).await.unwrap();
450	///
451	/// assert!(room.has_client("frank").await);
452	/// assert!(!room.has_client("grace").await);
453	/// # });
454	/// ```
455	pub async fn has_client(&self, client_id: &str) -> bool {
456		let clients = self.clients.read().await;
457		clients.contains_key(client_id)
458	}
459
460	/// Set metadata for the room
461	///
462	/// # Examples
463	///
464	/// ```
465	/// use reinhardt_websockets::room::Room;
466	/// use serde_json::json;
467	///
468	/// # tokio_test::block_on(async {
469	/// let room = Room::new("config".to_string());
470	///
471	/// room.set_metadata("max_users", json!(10)).await.unwrap();
472	/// room.set_metadata("topic", json!("General Discussion")).await.unwrap();
473	///
474	/// let max_users: i64 = room.get_metadata("max_users").await.unwrap().unwrap();
475	/// assert_eq!(max_users, 10);
476	/// # });
477	/// ```
478	pub async fn set_metadata<T: serde::Serialize>(&self, key: &str, value: T) -> RoomResult<()> {
479		let json_value =
480			serde_json::to_value(value).map_err(|e| RoomError::Metadata(e.to_string()))?;
481
482		let mut metadata = self.metadata.write().await;
483		metadata.insert(key.to_string(), json_value);
484
485		Ok(())
486	}
487
488	/// Get metadata from the room
489	///
490	/// # Examples
491	///
492	/// ```
493	/// use reinhardt_websockets::room::Room;
494	/// use serde_json::json;
495	///
496	/// # tokio_test::block_on(async {
497	/// let room = Room::new("data".to_string());
498	///
499	/// room.set_metadata("counter", json!(42)).await.unwrap();
500	///
501	/// let counter: i64 = room.get_metadata("counter").await.unwrap().unwrap();
502	/// assert_eq!(counter, 42);
503	///
504	/// let missing: Option<String> = room.get_metadata("nonexistent").await.unwrap();
505	/// assert!(missing.is_none());
506	/// # });
507	/// ```
508	pub async fn get_metadata<T>(&self, key: &str) -> RoomResult<Option<T>>
509	where
510		T: serde::de::DeserializeOwned,
511	{
512		let metadata = self.metadata.read().await;
513
514		metadata
515			.get(key)
516			.map(|v| serde_json::from_value(v.clone()))
517			.transpose()
518			.map_err(|e| RoomError::Metadata(e.to_string()))
519	}
520
521	/// Remove metadata from the room
522	///
523	/// # Examples
524	///
525	/// ```
526	/// use reinhardt_websockets::room::Room;
527	/// use serde_json::json;
528	///
529	/// # tokio_test::block_on(async {
530	/// let room = Room::new("temp".to_string());
531	///
532	/// room.set_metadata("temp_key", json!("temp_value")).await.unwrap();
533	/// assert!(room.get_metadata::<String>("temp_key").await.unwrap().is_some());
534	///
535	/// room.remove_metadata("temp_key").await;
536	/// assert!(room.get_metadata::<String>("temp_key").await.unwrap().is_none());
537	/// # });
538	/// ```
539	pub async fn remove_metadata(&self, key: &str) -> Option<Value> {
540		let mut metadata = self.metadata.write().await;
541		metadata.remove(key)
542	}
543
544	/// Clear all metadata
545	///
546	/// # Examples
547	///
548	/// ```
549	/// use reinhardt_websockets::room::Room;
550	/// use serde_json::json;
551	///
552	/// # tokio_test::block_on(async {
553	/// let room = Room::new("reset".to_string());
554	///
555	/// room.set_metadata("key1", json!("value1")).await.unwrap();
556	/// room.set_metadata("key2", json!("value2")).await.unwrap();
557	///
558	/// room.clear_metadata().await;
559	///
560	/// assert!(room.get_metadata::<String>("key1").await.unwrap().is_none());
561	/// assert!(room.get_metadata::<String>("key2").await.unwrap().is_none());
562	/// # });
563	/// ```
564	pub async fn clear_metadata(&self) {
565		let mut metadata = self.metadata.write().await;
566		metadata.clear();
567	}
568
569	/// Check if room is empty
570	///
571	/// # Examples
572	///
573	/// ```
574	/// use reinhardt_websockets::room::Room;
575	/// use reinhardt_websockets::WebSocketConnection;
576	/// use tokio::sync::mpsc;
577	/// use std::sync::Arc;
578	///
579	/// # tokio_test::block_on(async {
580	/// let room = Room::new("empty_check".to_string());
581	/// assert!(room.is_empty().await);
582	///
583	/// let (tx, _rx) = mpsc::unbounded_channel();
584	/// let client = Arc::new(WebSocketConnection::new("henry".to_string(), tx));
585	///
586	/// room.join("henry".to_string(), client).await.unwrap();
587	/// assert!(!room.is_empty().await);
588	/// # });
589	/// ```
590	pub async fn is_empty(&self) -> bool {
591		let clients = self.clients.read().await;
592		clients.is_empty()
593	}
594}
595
596/// Manages multiple WebSocket rooms
597///
598/// # Examples
599///
600/// ```
601/// use reinhardt_websockets::room::RoomManager;
602///
603/// # tokio_test::block_on(async {
604/// let manager = RoomManager::new();
605///
606/// let room = manager.create_room("lobby".to_string()).await;
607/// assert_eq!(room.id(), "lobby");
608/// assert_eq!(manager.room_count().await, 1);
609/// # });
610/// ```
611pub struct RoomManager {
612	rooms: Arc<RwLock<HashMap<String, Arc<Room>>>>,
613}
614
615impl RoomManager {
616	/// Create a new RoomManager
617	///
618	/// # Examples
619	///
620	/// ```
621	/// use reinhardt_websockets::room::RoomManager;
622	///
623	/// let manager = RoomManager::new();
624	/// # tokio_test::block_on(async {
625	/// assert_eq!(manager.room_count().await, 0);
626	/// # });
627	/// ```
628	pub fn new() -> Self {
629		Self {
630			rooms: Arc::new(RwLock::new(HashMap::new())),
631		}
632	}
633
634	/// Create a new room
635	///
636	/// # Examples
637	///
638	/// ```
639	/// use reinhardt_websockets::room::RoomManager;
640	///
641	/// # tokio_test::block_on(async {
642	/// let manager = RoomManager::new();
643	/// let room = manager.create_room("game_room".to_string()).await;
644	/// assert_eq!(room.id(), "game_room");
645	/// # });
646	/// ```
647	pub async fn create_room(&self, id: String) -> Arc<Room> {
648		let mut rooms = self.rooms.write().await;
649
650		let room = Arc::new(Room::new(id.clone()));
651		rooms.insert(id, room.clone());
652
653		room
654	}
655
656	/// Get an existing room
657	///
658	/// # Examples
659	///
660	/// ```
661	/// use reinhardt_websockets::room::RoomManager;
662	///
663	/// # tokio_test::block_on(async {
664	/// let manager = RoomManager::new();
665	/// manager.create_room("test".to_string()).await;
666	///
667	/// let room = manager.get_room("test").await;
668	/// assert!(room.is_some());
669	/// assert_eq!(room.unwrap().id(), "test");
670	/// # });
671	/// ```
672	pub async fn get_room(&self, id: &str) -> Option<Arc<Room>> {
673		let rooms = self.rooms.read().await;
674		rooms.get(id).cloned()
675	}
676
677	/// Get or create a room
678	///
679	/// This method uses a single write lock to avoid TOCTOU race conditions
680	/// that could occur with separate get and create operations.
681	///
682	/// # Examples
683	///
684	/// ```
685	/// use reinhardt_websockets::room::RoomManager;
686	///
687	/// # tokio_test::block_on(async {
688	/// let manager = RoomManager::new();
689	///
690	/// let room1 = manager.get_or_create_room("auto".to_string()).await;
691	/// let room2 = manager.get_or_create_room("auto".to_string()).await;
692	///
693	/// assert_eq!(room1.id(), room2.id());
694	/// # });
695	/// ```
696	pub async fn get_or_create_room(&self, id: String) -> Arc<Room> {
697		let mut rooms = self.rooms.write().await;
698
699		if let Some(room) = rooms.get(&id) {
700			return room.clone();
701		}
702
703		let room = Arc::new(Room::new(id.clone()));
704		rooms.insert(id, room.clone());
705		room
706	}
707
708	/// Delete a room
709	///
710	/// # Examples
711	///
712	/// ```
713	/// use reinhardt_websockets::room::RoomManager;
714	///
715	/// # tokio_test::block_on(async {
716	/// let manager = RoomManager::new();
717	/// manager.create_room("temporary".to_string()).await;
718	///
719	/// assert!(manager.get_room("temporary").await.is_some());
720	///
721	/// manager.delete_room("temporary").await.unwrap();
722	/// assert!(manager.get_room("temporary").await.is_none());
723	/// # });
724	/// ```
725	pub async fn delete_room(&self, id: &str) -> RoomResult<()> {
726		let mut rooms = self.rooms.write().await;
727
728		rooms
729			.remove(id)
730			.ok_or_else(|| RoomError::RoomNotFound(id.to_string()))?;
731
732		Ok(())
733	}
734
735	/// Get the number of rooms
736	///
737	/// # Examples
738	///
739	/// ```
740	/// use reinhardt_websockets::room::RoomManager;
741	///
742	/// # tokio_test::block_on(async {
743	/// let manager = RoomManager::new();
744	/// assert_eq!(manager.room_count().await, 0);
745	///
746	/// manager.create_room("room1".to_string()).await;
747	/// manager.create_room("room2".to_string()).await;
748	///
749	/// assert_eq!(manager.room_count().await, 2);
750	/// # });
751	/// ```
752	pub async fn room_count(&self) -> usize {
753		let rooms = self.rooms.read().await;
754		rooms.len()
755	}
756
757	/// Get all room IDs
758	///
759	/// # Examples
760	///
761	/// ```
762	/// use reinhardt_websockets::room::RoomManager;
763	///
764	/// # tokio_test::block_on(async {
765	/// let manager = RoomManager::new();
766	///
767	/// manager.create_room("alpha".to_string()).await;
768	/// manager.create_room("beta".to_string()).await;
769	///
770	/// let ids = manager.room_ids().await;
771	/// assert_eq!(ids.len(), 2);
772	/// assert!(ids.contains(&"alpha".to_string()));
773	/// assert!(ids.contains(&"beta".to_string()));
774	/// # });
775	/// ```
776	pub async fn room_ids(&self) -> Vec<String> {
777		let rooms = self.rooms.read().await;
778		rooms.keys().cloned().collect()
779	}
780
781	/// Check if a room exists
782	///
783	/// # Examples
784	///
785	/// ```
786	/// use reinhardt_websockets::room::RoomManager;
787	///
788	/// # tokio_test::block_on(async {
789	/// let manager = RoomManager::new();
790	/// manager.create_room("exists".to_string()).await;
791	///
792	/// assert!(manager.has_room("exists").await);
793	/// assert!(!manager.has_room("missing").await);
794	/// # });
795	/// ```
796	pub async fn has_room(&self, id: &str) -> bool {
797		let rooms = self.rooms.read().await;
798		rooms.contains_key(id)
799	}
800
801	/// Delete all empty rooms
802	///
803	/// # Examples
804	///
805	/// ```
806	/// use reinhardt_websockets::room::RoomManager;
807	/// use reinhardt_websockets::WebSocketConnection;
808	/// use tokio::sync::mpsc;
809	/// use std::sync::Arc;
810	///
811	/// # tokio_test::block_on(async {
812	/// let manager = RoomManager::new();
813	///
814	/// let empty_room = manager.create_room("empty".to_string()).await;
815	/// let occupied_room = manager.create_room("occupied".to_string()).await;
816	///
817	/// let (tx, _rx) = mpsc::unbounded_channel();
818	/// let client = Arc::new(WebSocketConnection::new("user".to_string(), tx));
819	/// occupied_room.join("user".to_string(), client).await.unwrap();
820	///
821	/// manager.cleanup_empty_rooms().await;
822	///
823	/// assert!(!manager.has_room("empty").await);
824	/// assert!(manager.has_room("occupied").await);
825	/// # });
826	/// ```
827	pub async fn cleanup_empty_rooms(&self) {
828		let mut rooms = self.rooms.write().await;
829		let empty_room_ids: Vec<String> = {
830			let mut empty_ids = Vec::new();
831			for (id, room) in rooms.iter() {
832				if room.is_empty().await {
833					empty_ids.push(id.clone());
834				}
835			}
836			empty_ids
837		};
838
839		for id in empty_room_ids {
840			rooms.remove(&id);
841		}
842	}
843
844	/// Add a client to a specific room
845	///
846	/// # Examples
847	///
848	/// ```
849	/// use reinhardt_websockets::room::RoomManager;
850	/// use reinhardt_websockets::WebSocketConnection;
851	/// use tokio::sync::mpsc;
852	/// use std::sync::Arc;
853	///
854	/// # tokio_test::block_on(async {
855	/// let manager = RoomManager::new();
856	/// manager.create_room("game".to_string()).await;
857	///
858	/// let (tx, _rx) = mpsc::unbounded_channel();
859	/// let conn = Arc::new(WebSocketConnection::new("player1".to_string(), tx));
860	///
861	/// manager.join_room("game".to_string(), conn).await.unwrap();
862	/// # });
863	/// ```
864	pub async fn join_room(
865		&self,
866		room_id: String,
867		connection: Arc<WebSocketConnection>,
868	) -> RoomResult<()> {
869		let room = self
870			.get_room(&room_id)
871			.await
872			.ok_or_else(|| RoomError::RoomNotFound(room_id.clone()))?;
873
874		let client_id = connection.id().to_string();
875		room.join(client_id, connection).await
876	}
877
878	/// Remove a client from a specific room
879	///
880	/// # Examples
881	///
882	/// ```
883	/// use reinhardt_websockets::room::RoomManager;
884	/// use reinhardt_websockets::WebSocketConnection;
885	/// use tokio::sync::mpsc;
886	/// use std::sync::Arc;
887	///
888	/// # tokio_test::block_on(async {
889	/// let manager = RoomManager::new();
890	/// manager.create_room("chat".to_string()).await;
891	///
892	/// let (tx, _rx) = mpsc::unbounded_channel();
893	/// let conn = Arc::new(WebSocketConnection::new("user1".to_string(), tx));
894	///
895	/// manager.join_room("chat".to_string(), conn).await.unwrap();
896	/// manager.leave_room("chat", "user1").await.unwrap();
897	/// # });
898	/// ```
899	pub async fn leave_room(&self, room_id: &str, user_id: &str) -> RoomResult<()> {
900		let room = self
901			.get_room(room_id)
902			.await
903			.ok_or_else(|| RoomError::RoomNotFound(room_id.to_string()))?;
904
905		room.leave(user_id).await
906	}
907
908	/// Get the number of clients in a specific room
909	///
910	/// # Examples
911	///
912	/// ```
913	/// use reinhardt_websockets::room::RoomManager;
914	/// use reinhardt_websockets::WebSocketConnection;
915	/// use tokio::sync::mpsc;
916	/// use std::sync::Arc;
917	///
918	/// # tokio_test::block_on(async {
919	/// let manager = RoomManager::new();
920	/// manager.create_room("lobby".to_string()).await;
921	///
922	/// let (tx, _rx) = mpsc::unbounded_channel();
923	/// let conn = Arc::new(WebSocketConnection::new("user1".to_string(), tx));
924	///
925	/// manager.join_room("lobby".to_string(), conn).await.unwrap();
926	/// assert_eq!(manager.get_room_size("lobby").await, 1);
927	/// # });
928	/// ```
929	pub async fn get_room_size(&self, room_id: &str) -> usize {
930		if let Some(room) = self.get_room(room_id).await {
931			room.client_count().await
932		} else {
933			0
934		}
935	}
936
937	/// Broadcast a message to all clients in a specific room.
938	///
939	/// Returns a [`BroadcastResult`] describing which clients received the
940	/// message and which failed. Returns [`RoomError::RoomNotFound`] if the
941	/// room does not exist.
942	///
943	/// # Examples
944	///
945	/// ```
946	/// use reinhardt_websockets::room::RoomManager;
947	/// use reinhardt_websockets::{WebSocketConnection, Message};
948	/// use tokio::sync::mpsc;
949	/// use std::sync::Arc;
950	///
951	/// # tokio_test::block_on(async {
952	/// let manager = RoomManager::new();
953	/// manager.create_room("announcement".to_string()).await;
954	///
955	/// let (tx, mut rx) = mpsc::unbounded_channel();
956	/// let conn = Arc::new(WebSocketConnection::new("listener".to_string(), tx));
957	///
958	/// manager.join_room("announcement".to_string(), conn).await.unwrap();
959	///
960	/// let msg = Message::text("Hello everyone!".to_string());
961	/// let result = manager.broadcast_to_room("announcement", msg).await.unwrap();
962	/// assert!(result.is_complete_success());
963	/// # });
964	/// ```
965	pub async fn broadcast_to_room(
966		&self,
967		room_id: &str,
968		message: Message,
969	) -> RoomResult<BroadcastResult> {
970		let room = self
971			.get_room(room_id)
972			.await
973			.ok_or_else(|| RoomError::RoomNotFound(room_id.to_string()))?;
974
975		Ok(room.broadcast(message).await)
976	}
977
978	/// Broadcasts a message to all clients in a room with a per-client timeout.
979	///
980	/// Slow consumers that do not accept the message within the given timeout
981	/// are treated as failed and removed from the room.
982	///
983	/// Returns [`RoomError::RoomNotFound`] if the room does not exist.
984	///
985	/// # Examples
986	///
987	/// ```
988	/// use reinhardt_websockets::room::RoomManager;
989	/// use reinhardt_websockets::{WebSocketConnection, Message};
990	/// use tokio::sync::mpsc;
991	/// use std::sync::Arc;
992	/// use std::time::Duration;
993	///
994	/// # tokio_test::block_on(async {
995	/// let manager = RoomManager::new();
996	/// manager.create_room("timeout_test".to_string()).await;
997	///
998	/// let (tx, _rx) = mpsc::unbounded_channel();
999	/// let conn = Arc::new(WebSocketConnection::new("user1".to_string(), tx));
1000	///
1001	/// manager.join_room("timeout_test".to_string(), conn).await.unwrap();
1002	///
1003	/// let msg = Message::text("Hello!".to_string());
1004	/// let result = manager
1005	///     .broadcast_to_room_with_timeout("timeout_test", msg, Duration::from_secs(5))
1006	///     .await
1007	///     .unwrap();
1008	/// assert!(result.is_complete_success());
1009	/// # });
1010	/// ```
1011	pub async fn broadcast_to_room_with_timeout(
1012		&self,
1013		room_id: &str,
1014		message: Message,
1015		send_timeout: Duration,
1016	) -> RoomResult<BroadcastResult> {
1017		let room = self
1018			.get_room(room_id)
1019			.await
1020			.ok_or_else(|| RoomError::RoomNotFound(room_id.to_string()))?;
1021
1022		Ok(room.broadcast_with_timeout(message, send_timeout).await)
1023	}
1024
1025	/// Broadcast a message to all clients in all rooms.
1026	///
1027	/// Returns a [`BroadcastResult`] aggregated across all rooms, reporting
1028	/// which clients received the message and which failed.
1029	///
1030	/// # Examples
1031	///
1032	/// ```
1033	/// use reinhardt_websockets::room::RoomManager;
1034	/// use reinhardt_websockets::{WebSocketConnection, Message};
1035	/// use tokio::sync::mpsc;
1036	/// use std::sync::Arc;
1037	///
1038	/// # tokio_test::block_on(async {
1039	/// let manager = RoomManager::new();
1040	/// manager.create_room("room1".to_string()).await;
1041	/// manager.create_room("room2".to_string()).await;
1042	///
1043	/// let (tx1, _rx1) = mpsc::unbounded_channel();
1044	/// let (tx2, _rx2) = mpsc::unbounded_channel();
1045	///
1046	/// let conn1 = Arc::new(WebSocketConnection::new("user1".to_string(), tx1));
1047	/// let conn2 = Arc::new(WebSocketConnection::new("user2".to_string(), tx2));
1048	///
1049	/// manager.join_room("room1".to_string(), conn1).await.unwrap();
1050	/// manager.join_room("room2".to_string(), conn2).await.unwrap();
1051	///
1052	/// let msg = Message::text("System message".to_string());
1053	/// let result = manager.broadcast_to_all(msg).await;
1054	/// assert!(result.is_complete_success());
1055	/// # });
1056	/// ```
1057	pub async fn broadcast_to_all(&self, message: Message) -> BroadcastResult {
1058		let rooms = self.rooms.read().await;
1059
1060		let mut successful = Vec::new();
1061		let mut failed = Vec::new();
1062
1063		for room in rooms.values() {
1064			let result = room.broadcast(message.clone()).await;
1065			successful.extend(result.successful);
1066			failed.extend(result.failed);
1067		}
1068
1069		BroadcastResult { successful, failed }
1070	}
1071
1072	/// Get all room IDs (alias for room_ids for compatibility)
1073	///
1074	/// # Examples
1075	///
1076	/// ```
1077	/// use reinhardt_websockets::room::RoomManager;
1078	///
1079	/// # tokio_test::block_on(async {
1080	/// let manager = RoomManager::new();
1081	/// manager.create_room("alpha".to_string()).await;
1082	/// manager.create_room("beta".to_string()).await;
1083	///
1084	/// let rooms = manager.get_all_rooms().await;
1085	/// assert_eq!(rooms.len(), 2);
1086	/// assert!(rooms.contains(&"alpha".to_string()));
1087	/// assert!(rooms.contains(&"beta".to_string()));
1088	/// # });
1089	/// ```
1090	pub async fn get_all_rooms(&self) -> Vec<String> {
1091		self.room_ids().await
1092	}
1093}
1094
1095impl Default for RoomManager {
1096	fn default() -> Self {
1097		Self::new()
1098	}
1099}
1100
1101#[cfg(test)]
1102mod tests {
1103	use super::*;
1104	use rstest::rstest;
1105	use tokio::sync::mpsc;
1106
1107	#[rstest]
1108	#[tokio::test]
1109	async fn test_room_new() {
1110		let room = Room::new("test_room".to_string());
1111		assert_eq!(room.id(), "test_room");
1112		assert_eq!(room.client_count().await, 0);
1113		assert!(room.is_empty().await);
1114	}
1115
1116	#[tokio::test]
1117	async fn test_room_join_client() {
1118		let room = Room::new("join_test".to_string());
1119		let (tx, _rx) = mpsc::unbounded_channel();
1120		let client = Arc::new(WebSocketConnection::new("client1".to_string(), tx));
1121
1122		room.join("client1".to_string(), client).await.unwrap();
1123		assert_eq!(room.client_count().await, 1);
1124		assert!(room.has_client("client1").await);
1125	}
1126
1127	#[tokio::test]
1128	async fn test_room_join_duplicate_client() {
1129		let room = Room::new("duplicate_test".to_string());
1130		let (tx1, _rx1) = mpsc::unbounded_channel();
1131		let (tx2, _rx2) = mpsc::unbounded_channel();
1132
1133		let client1 = Arc::new(WebSocketConnection::new("duplicate".to_string(), tx1));
1134		let client2 = Arc::new(WebSocketConnection::new("duplicate".to_string(), tx2));
1135
1136		room.join("duplicate".to_string(), client1).await.unwrap();
1137		let result = room.join("duplicate".to_string(), client2).await;
1138
1139		assert!(result.is_err());
1140		assert!(matches!(
1141			result.unwrap_err(),
1142			RoomError::ClientAlreadyExists(_)
1143		));
1144	}
1145
1146	#[tokio::test]
1147	async fn test_room_leave_client() {
1148		let room = Room::new("leave_test".to_string());
1149		let (tx, _rx) = mpsc::unbounded_channel();
1150		let client = Arc::new(WebSocketConnection::new("leaver".to_string(), tx));
1151
1152		room.join("leaver".to_string(), client).await.unwrap();
1153		assert!(room.has_client("leaver").await);
1154
1155		room.leave("leaver").await.unwrap();
1156		assert!(!room.has_client("leaver").await);
1157		assert_eq!(room.client_count().await, 0);
1158	}
1159
1160	#[tokio::test]
1161	async fn test_room_leave_nonexistent_client() {
1162		let room = Room::new("leave_error_test".to_string());
1163		let result = room.leave("nonexistent").await;
1164
1165		assert!(result.is_err());
1166		assert!(matches!(result.unwrap_err(), RoomError::ClientNotFound(_)));
1167	}
1168
1169	#[rstest]
1170	#[tokio::test]
1171	async fn test_room_broadcast() {
1172		// Arrange
1173		let room = Room::new("broadcast_test".to_string());
1174
1175		let (tx1, mut rx1) = mpsc::unbounded_channel();
1176		let (tx2, mut rx2) = mpsc::unbounded_channel();
1177		let (tx3, mut rx3) = mpsc::unbounded_channel();
1178
1179		let client1 = Arc::new(WebSocketConnection::new("user1".to_string(), tx1));
1180		let client2 = Arc::new(WebSocketConnection::new("user2".to_string(), tx2));
1181		let client3 = Arc::new(WebSocketConnection::new("user3".to_string(), tx3));
1182
1183		room.join("user1".to_string(), client1).await.unwrap();
1184		room.join("user2".to_string(), client2).await.unwrap();
1185		room.join("user3".to_string(), client3).await.unwrap();
1186
1187		// Act
1188		let msg = Message::text("Broadcast message".to_string());
1189		let result = room.broadcast(msg).await;
1190
1191		// Assert
1192		assert!(result.is_complete_success());
1193		assert_eq!(result.successful.len(), 3);
1194		assert_eq!(result.failure_count(), 0);
1195		assert!(matches!(rx1.try_recv(), Ok(Message::Text { .. })));
1196		assert!(matches!(rx2.try_recv(), Ok(Message::Text { .. })));
1197		assert!(matches!(rx3.try_recv(), Ok(Message::Text { .. })));
1198	}
1199
1200	#[tokio::test]
1201	async fn test_room_send_to_specific_client() {
1202		let room = Room::new("private_msg_test".to_string());
1203
1204		let (tx1, mut rx1) = mpsc::unbounded_channel();
1205		let (tx2, mut rx2) = mpsc::unbounded_channel();
1206
1207		let client1 = Arc::new(WebSocketConnection::new("target".to_string(), tx1));
1208		let client2 = Arc::new(WebSocketConnection::new("other".to_string(), tx2));
1209
1210		room.join("target".to_string(), client1).await.unwrap();
1211		room.join("other".to_string(), client2).await.unwrap();
1212
1213		let msg = Message::text("Private message".to_string());
1214		room.send_to("target", msg).await.unwrap();
1215
1216		assert!(matches!(rx1.try_recv(), Ok(Message::Text { .. })));
1217		assert!(rx2.try_recv().is_err());
1218	}
1219
1220	#[tokio::test]
1221	async fn test_room_send_to_nonexistent_client() {
1222		let room = Room::new("send_error_test".to_string());
1223		let msg = Message::text("Test".to_string());
1224		let result = room.send_to("nonexistent", msg).await;
1225
1226		assert!(result.is_err());
1227		assert!(matches!(result.unwrap_err(), RoomError::ClientNotFound(_)));
1228	}
1229
1230	#[tokio::test]
1231	async fn test_room_client_ids() {
1232		let room = Room::new("ids_test".to_string());
1233
1234		let (tx1, _rx1) = mpsc::unbounded_channel();
1235		let (tx2, _rx2) = mpsc::unbounded_channel();
1236
1237		let client1 = Arc::new(WebSocketConnection::new("alpha".to_string(), tx1));
1238		let client2 = Arc::new(WebSocketConnection::new("beta".to_string(), tx2));
1239
1240		room.join("alpha".to_string(), client1).await.unwrap();
1241		room.join("beta".to_string(), client2).await.unwrap();
1242
1243		let ids = room.client_ids().await;
1244		assert_eq!(ids.len(), 2);
1245		assert!(ids.contains(&"alpha".to_string()));
1246		assert!(ids.contains(&"beta".to_string()));
1247	}
1248
1249	#[tokio::test]
1250	async fn test_room_metadata_set_and_get() {
1251		use serde_json::json;
1252
1253		let room = Room::new("metadata_test".to_string());
1254
1255		room.set_metadata("max_users", json!(100)).await.unwrap();
1256		room.set_metadata("topic", json!("General Chat"))
1257			.await
1258			.unwrap();
1259
1260		let max_users: i64 = room.get_metadata("max_users").await.unwrap().unwrap();
1261		assert_eq!(max_users, 100);
1262
1263		let topic: String = room.get_metadata("topic").await.unwrap().unwrap();
1264		assert_eq!(topic, "General Chat");
1265	}
1266
1267	#[tokio::test]
1268	async fn test_room_metadata_get_nonexistent() {
1269		let room = Room::new("metadata_missing_test".to_string());
1270		let result: Option<String> = room.get_metadata("nonexistent").await.unwrap();
1271		assert!(result.is_none());
1272	}
1273
1274	#[tokio::test]
1275	async fn test_room_metadata_remove() {
1276		use serde_json::json;
1277
1278		let room = Room::new("metadata_remove_test".to_string());
1279
1280		room.set_metadata("temp", json!("value")).await.unwrap();
1281		assert!(room.get_metadata::<String>("temp").await.unwrap().is_some());
1282
1283		room.remove_metadata("temp").await;
1284		assert!(room.get_metadata::<String>("temp").await.unwrap().is_none());
1285	}
1286
1287	#[tokio::test]
1288	async fn test_room_metadata_clear() {
1289		use serde_json::json;
1290
1291		let room = Room::new("metadata_clear_test".to_string());
1292
1293		room.set_metadata("key1", json!("value1")).await.unwrap();
1294		room.set_metadata("key2", json!("value2")).await.unwrap();
1295
1296		room.clear_metadata().await;
1297
1298		assert!(room.get_metadata::<String>("key1").await.unwrap().is_none());
1299		assert!(room.get_metadata::<String>("key2").await.unwrap().is_none());
1300	}
1301
1302	#[tokio::test]
1303	async fn test_room_is_empty() {
1304		let room = Room::new("empty_test".to_string());
1305		assert!(room.is_empty().await);
1306
1307		let (tx, _rx) = mpsc::unbounded_channel();
1308		let client = Arc::new(WebSocketConnection::new("user".to_string(), tx));
1309
1310		room.join("user".to_string(), client).await.unwrap();
1311		assert!(!room.is_empty().await);
1312
1313		room.leave("user").await.unwrap();
1314		assert!(room.is_empty().await);
1315	}
1316
1317	#[tokio::test]
1318	async fn test_room_manager_new() {
1319		let manager = RoomManager::new();
1320		assert_eq!(manager.room_count().await, 0);
1321	}
1322
1323	#[tokio::test]
1324	async fn test_room_manager_create_room() {
1325		let manager = RoomManager::new();
1326		let room = manager.create_room("new_room".to_string()).await;
1327
1328		assert_eq!(room.id(), "new_room");
1329		assert_eq!(manager.room_count().await, 1);
1330	}
1331
1332	#[tokio::test]
1333	async fn test_room_manager_get_room() {
1334		let manager = RoomManager::new();
1335		manager.create_room("existing".to_string()).await;
1336
1337		let room = manager.get_room("existing").await;
1338		assert!(room.is_some());
1339		assert_eq!(room.unwrap().id(), "existing");
1340
1341		let missing = manager.get_room("missing").await;
1342		assert!(missing.is_none());
1343	}
1344
1345	#[tokio::test]
1346	async fn test_room_manager_get_or_create_room() {
1347		let manager = RoomManager::new();
1348
1349		let room1 = manager.get_or_create_room("auto_room".to_string()).await;
1350		assert_eq!(manager.room_count().await, 1);
1351
1352		let room2 = manager.get_or_create_room("auto_room".to_string()).await;
1353		assert_eq!(manager.room_count().await, 1);
1354
1355		assert_eq!(room1.id(), room2.id());
1356	}
1357
1358	#[tokio::test]
1359	async fn test_room_manager_delete_room() {
1360		let manager = RoomManager::new();
1361		manager.create_room("to_delete".to_string()).await;
1362
1363		assert!(manager.has_room("to_delete").await);
1364
1365		manager.delete_room("to_delete").await.unwrap();
1366		assert!(!manager.has_room("to_delete").await);
1367	}
1368
1369	#[tokio::test]
1370	async fn test_room_manager_delete_nonexistent_room() {
1371		let manager = RoomManager::new();
1372		let result = manager.delete_room("nonexistent").await;
1373
1374		assert!(result.is_err());
1375		assert!(matches!(result.unwrap_err(), RoomError::RoomNotFound(_)));
1376	}
1377
1378	#[tokio::test]
1379	async fn test_room_manager_room_ids() {
1380		let manager = RoomManager::new();
1381
1382		manager.create_room("room1".to_string()).await;
1383		manager.create_room("room2".to_string()).await;
1384		manager.create_room("room3".to_string()).await;
1385
1386		let ids = manager.room_ids().await;
1387		assert_eq!(ids.len(), 3);
1388		assert!(ids.contains(&"room1".to_string()));
1389		assert!(ids.contains(&"room2".to_string()));
1390		assert!(ids.contains(&"room3".to_string()));
1391	}
1392
1393	#[tokio::test]
1394	async fn test_room_manager_has_room() {
1395		let manager = RoomManager::new();
1396		manager.create_room("check".to_string()).await;
1397
1398		assert!(manager.has_room("check").await);
1399		assert!(!manager.has_room("missing").await);
1400	}
1401
1402	#[tokio::test]
1403	async fn test_room_manager_cleanup_empty_rooms() {
1404		let manager = RoomManager::new();
1405
1406		let _empty_room = manager.create_room("empty".to_string()).await;
1407		let occupied_room = manager.create_room("occupied".to_string()).await;
1408
1409		let (tx, _rx) = mpsc::unbounded_channel();
1410		let client = Arc::new(WebSocketConnection::new("user".to_string(), tx));
1411		occupied_room
1412			.join("user".to_string(), client)
1413			.await
1414			.unwrap();
1415
1416		manager.cleanup_empty_rooms().await;
1417
1418		assert!(!manager.has_room("empty").await);
1419		assert!(manager.has_room("occupied").await);
1420		assert_eq!(manager.room_count().await, 1);
1421	}
1422
1423	#[rstest]
1424	#[tokio::test]
1425	async fn test_broadcast_partial_failure_reports_failed_clients() {
1426		// Arrange
1427		let room = Room::new("partial_fail".to_string());
1428
1429		let (tx_alive, mut rx_alive) = mpsc::unbounded_channel();
1430		let (tx_dead, _rx_dead) = mpsc::unbounded_channel::<Message>();
1431
1432		let alive_client = Arc::new(WebSocketConnection::new("alive".to_string(), tx_alive));
1433		let dead_client = Arc::new(WebSocketConnection::new("dead".to_string(), tx_dead));
1434
1435		room.join("alive".to_string(), alive_client).await.unwrap();
1436		room.join("dead".to_string(), dead_client).await.unwrap();
1437
1438		// Simulate dead connection by dropping the receiver
1439		drop(_rx_dead);
1440
1441		// Act
1442		let msg = Message::text("test message".to_string());
1443		let result = room.broadcast(msg).await;
1444
1445		// Assert
1446		assert!(!result.is_complete_success());
1447		assert!(result.is_partial_success());
1448		assert!(!result.is_complete_failure());
1449		assert_eq!(result.successful.len(), 1);
1450		assert!(result.successful.contains(&"alive".to_string()));
1451		assert_eq!(result.failure_count(), 1);
1452		assert!(result.failed_client_ids().contains(&"dead"));
1453		assert!(matches!(rx_alive.try_recv(), Ok(Message::Text { .. })));
1454	}
1455
1456	#[rstest]
1457	#[tokio::test]
1458	async fn test_broadcast_partial_failure_removes_dead_connections() {
1459		// Arrange
1460		let room = Room::new("cleanup".to_string());
1461
1462		let (tx_alive, _rx_alive) = mpsc::unbounded_channel();
1463		let (tx_dead1, _rx_dead1) = mpsc::unbounded_channel::<Message>();
1464		let (tx_dead2, _rx_dead2) = mpsc::unbounded_channel::<Message>();
1465
1466		let alive = Arc::new(WebSocketConnection::new("alive".to_string(), tx_alive));
1467		let dead1 = Arc::new(WebSocketConnection::new("dead1".to_string(), tx_dead1));
1468		let dead2 = Arc::new(WebSocketConnection::new("dead2".to_string(), tx_dead2));
1469
1470		room.join("alive".to_string(), alive).await.unwrap();
1471		room.join("dead1".to_string(), dead1).await.unwrap();
1472		room.join("dead2".to_string(), dead2).await.unwrap();
1473		assert_eq!(room.client_count().await, 3);
1474
1475		// Simulate dead connections
1476		drop(_rx_dead1);
1477		drop(_rx_dead2);
1478
1479		// Act
1480		let msg = Message::text("cleanup test".to_string());
1481		let result = room.broadcast(msg).await;
1482
1483		// Assert - dead connections are automatically removed
1484		assert_eq!(result.failure_count(), 2);
1485		assert_eq!(room.client_count().await, 1);
1486		assert!(room.has_client("alive").await);
1487		assert!(!room.has_client("dead1").await);
1488		assert!(!room.has_client("dead2").await);
1489	}
1490
1491	#[rstest]
1492	#[tokio::test]
1493	async fn test_broadcast_complete_failure() {
1494		// Arrange
1495		let room = Room::new("all_dead".to_string());
1496
1497		let (tx1, _rx1) = mpsc::unbounded_channel::<Message>();
1498		let (tx2, _rx2) = mpsc::unbounded_channel::<Message>();
1499
1500		let client1 = Arc::new(WebSocketConnection::new("c1".to_string(), tx1));
1501		let client2 = Arc::new(WebSocketConnection::new("c2".to_string(), tx2));
1502
1503		room.join("c1".to_string(), client1).await.unwrap();
1504		room.join("c2".to_string(), client2).await.unwrap();
1505
1506		// Simulate all dead connections
1507		drop(_rx1);
1508		drop(_rx2);
1509
1510		// Act
1511		let msg = Message::text("nobody home".to_string());
1512		let result = room.broadcast(msg).await;
1513
1514		// Assert
1515		assert!(result.is_complete_failure());
1516		assert!(!result.is_partial_success());
1517		assert_eq!(result.failure_count(), 2);
1518		assert!(result.successful.is_empty());
1519		assert_eq!(room.client_count().await, 0);
1520	}
1521
1522	#[rstest]
1523	#[tokio::test]
1524	async fn test_broadcast_empty_room() {
1525		// Arrange
1526		let room = Room::new("empty_broadcast".to_string());
1527
1528		// Act
1529		let msg = Message::text("echo".to_string());
1530		let result = room.broadcast(msg).await;
1531
1532		// Assert
1533		assert!(result.is_complete_success());
1534		assert!(result.successful.is_empty());
1535		assert!(result.failed.is_empty());
1536	}
1537
1538	#[rstest]
1539	#[tokio::test]
1540	async fn test_broadcast_to_room_returns_broadcast_result() {
1541		// Arrange
1542		let manager = RoomManager::new();
1543		manager.create_room("results".to_string()).await;
1544
1545		let (tx1, _rx1) = mpsc::unbounded_channel();
1546		let (tx2, _rx2) = mpsc::unbounded_channel::<Message>();
1547		let conn1 = Arc::new(WebSocketConnection::new("ok".to_string(), tx1));
1548		let conn2 = Arc::new(WebSocketConnection::new("dead".to_string(), tx2));
1549
1550		manager
1551			.join_room("results".to_string(), conn1)
1552			.await
1553			.unwrap();
1554		manager
1555			.join_room("results".to_string(), conn2)
1556			.await
1557			.unwrap();
1558
1559		// Simulate dead connection
1560		drop(_rx2);
1561
1562		// Act
1563		let msg = Message::text("room broadcast".to_string());
1564		let result = manager.broadcast_to_room("results", msg).await.unwrap();
1565
1566		// Assert
1567		assert!(result.is_partial_success());
1568		assert_eq!(result.successful.len(), 1);
1569		assert_eq!(result.failure_count(), 1);
1570	}
1571
1572	#[rstest]
1573	#[tokio::test]
1574	async fn test_broadcast_to_all_aggregates_results() {
1575		// Arrange
1576		let manager = RoomManager::new();
1577		let room1 = manager.create_room("r1".to_string()).await;
1578		let room2 = manager.create_room("r2".to_string()).await;
1579
1580		let (tx1, _rx1) = mpsc::unbounded_channel();
1581		let (tx2, _rx2) = mpsc::unbounded_channel::<Message>();
1582		let conn1 = Arc::new(WebSocketConnection::new("ok".to_string(), tx1));
1583		let conn2 = Arc::new(WebSocketConnection::new("dead".to_string(), tx2));
1584
1585		room1.join("ok".to_string(), conn1).await.unwrap();
1586		room2.join("dead".to_string(), conn2).await.unwrap();
1587
1588		// Simulate dead connection in room2
1589		drop(_rx2);
1590
1591		// Act
1592		let msg = Message::text("global".to_string());
1593		let result = manager.broadcast_to_all(msg).await;
1594
1595		// Assert - aggregated results from both rooms
1596		assert!(result.is_partial_success());
1597		assert_eq!(result.successful.len(), 1);
1598		assert_eq!(result.failure_count(), 1);
1599		assert!(result.successful.contains(&"ok".to_string()));
1600		assert!(result.failed_client_ids().contains(&"dead"));
1601	}
1602
1603	#[rstest]
1604	#[tokio::test]
1605	async fn test_broadcast_with_timeout_succeeds_for_responsive_clients() {
1606		// Arrange
1607		let room = Room::new("timeout_ok".to_string());
1608
1609		let (tx1, mut rx1) = mpsc::unbounded_channel();
1610		let (tx2, mut rx2) = mpsc::unbounded_channel();
1611
1612		let client1 = Arc::new(WebSocketConnection::new("fast1".to_string(), tx1));
1613		let client2 = Arc::new(WebSocketConnection::new("fast2".to_string(), tx2));
1614
1615		room.join("fast1".to_string(), client1).await.unwrap();
1616		room.join("fast2".to_string(), client2).await.unwrap();
1617
1618		// Act
1619		let msg = Message::text("hello with timeout".to_string());
1620		let result = room
1621			.broadcast_with_timeout(msg, Duration::from_secs(5))
1622			.await;
1623
1624		// Assert
1625		assert!(result.is_complete_success());
1626		assert_eq!(result.successful.len(), 2);
1627		assert!(matches!(rx1.try_recv(), Ok(Message::Text { .. })));
1628		assert!(matches!(rx2.try_recv(), Ok(Message::Text { .. })));
1629	}
1630
1631	#[rstest]
1632	#[tokio::test]
1633	async fn test_broadcast_with_timeout_removes_dead_connections() {
1634		// Arrange
1635		let room = Room::new("timeout_dead".to_string());
1636
1637		let (tx_alive, _rx_alive) = mpsc::unbounded_channel();
1638		let (tx_dead, _rx_dead) = mpsc::unbounded_channel::<Message>();
1639
1640		let alive = Arc::new(WebSocketConnection::new("alive".to_string(), tx_alive));
1641		let dead = Arc::new(WebSocketConnection::new("dead".to_string(), tx_dead));
1642
1643		room.join("alive".to_string(), alive).await.unwrap();
1644		room.join("dead".to_string(), dead).await.unwrap();
1645
1646		// Simulate dead connection
1647		drop(_rx_dead);
1648
1649		// Act
1650		let msg = Message::text("test".to_string());
1651		let result = room
1652			.broadcast_with_timeout(msg, Duration::from_secs(5))
1653			.await;
1654
1655		// Assert
1656		assert!(result.is_partial_success());
1657		assert_eq!(result.successful.len(), 1);
1658		assert_eq!(result.failure_count(), 1);
1659		assert_eq!(room.client_count().await, 1);
1660		assert!(room.has_client("alive").await);
1661		assert!(!room.has_client("dead").await);
1662	}
1663
1664	#[rstest]
1665	#[tokio::test]
1666	async fn test_broadcast_with_timeout_empty_room() {
1667		// Arrange
1668		let room = Room::new("timeout_empty".to_string());
1669
1670		// Act
1671		let msg = Message::text("nobody".to_string());
1672		let result = room
1673			.broadcast_with_timeout(msg, Duration::from_secs(1))
1674			.await;
1675
1676		// Assert
1677		assert!(result.is_complete_success());
1678		assert!(result.successful.is_empty());
1679		assert!(result.failed.is_empty());
1680	}
1681
1682	#[rstest]
1683	#[tokio::test]
1684	async fn test_broadcast_to_room_with_timeout() {
1685		// Arrange
1686		let manager = RoomManager::new();
1687		manager.create_room("timeout_room".to_string()).await;
1688
1689		let (tx, _rx) = mpsc::unbounded_channel();
1690		let conn = Arc::new(WebSocketConnection::new("user1".to_string(), tx));
1691
1692		manager
1693			.join_room("timeout_room".to_string(), conn)
1694			.await
1695			.unwrap();
1696
1697		// Act
1698		let msg = Message::text("hello".to_string());
1699		let result = manager
1700			.broadcast_to_room_with_timeout("timeout_room", msg, Duration::from_secs(5))
1701			.await
1702			.unwrap();
1703
1704		// Assert
1705		assert!(result.is_complete_success());
1706		assert_eq!(result.successful.len(), 1);
1707	}
1708
1709	#[rstest]
1710	#[tokio::test]
1711	async fn test_broadcast_to_room_with_timeout_nonexistent_room() {
1712		// Arrange
1713		let manager = RoomManager::new();
1714
1715		// Act
1716		let msg = Message::text("hello".to_string());
1717		let result = manager
1718			.broadcast_to_room_with_timeout("missing", msg, Duration::from_secs(1))
1719			.await;
1720
1721		// Assert
1722		assert!(result.is_err());
1723		assert!(matches!(result.unwrap_err(), RoomError::RoomNotFound(_)));
1724	}
1725}