Skip to main content

reinhardt_websockets/
room.rs

1//! WebSocket room management with advanced features
2//!
3//! This module provides room-based WebSocket connection management,
4//! including client tracking, metadata storage, and targeted messaging.
5
6use crate::connection::{Message, WebSocketConnection, WebSocketError};
7use serde_json::Value;
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::RwLock;
12
13/// Error types for room operations
14#[derive(Debug, thiserror::Error)]
15pub enum RoomError {
16	/// The specified client was not found in the room.
17	#[error("Client not found")]
18	ClientNotFound(String),
19	/// The specified room does not exist.
20	#[error("Room not found")]
21	RoomNotFound(String),
22	/// A client with the same ID already exists in the room.
23	#[error("Client already exists")]
24	ClientAlreadyExists(String),
25	/// An underlying WebSocket error occurred.
26	#[error("WebSocket error")]
27	WebSocket(#[from] WebSocketError),
28	/// An error related to room or client metadata.
29	#[error("Metadata error")]
30	Metadata(String),
31}
32
33/// A specialized `Result` type for room operations.
34pub type RoomResult<T> = Result<T, RoomError>;
35
36/// Result of a broadcast operation that tracks individual send outcomes.
37///
38/// This provides detailed information about which clients received the message
39/// and which failed, enabling callers to take appropriate action (e.g., logging,
40/// notifying administrators, or retrying).
41///
42/// Dead connections that fail during broadcast are automatically removed from
43/// the room.
44#[derive(Debug)]
45pub struct BroadcastResult {
46	/// Client IDs that received the message successfully
47	pub successful: Vec<String>,
48	/// Client IDs that failed with their respective errors
49	pub failed: Vec<(String, WebSocketError)>,
50}
51
52impl BroadcastResult {
53	/// Returns `true` if all sends were successful (no failures).
54	pub fn is_complete_success(&self) -> bool {
55		self.failed.is_empty()
56	}
57
58	/// Returns `true` if at least one send was successful.
59	pub fn is_partial_success(&self) -> bool {
60		!self.successful.is_empty()
61	}
62
63	/// Returns `true` if all sends failed.
64	pub fn is_complete_failure(&self) -> bool {
65		self.successful.is_empty() && !self.failed.is_empty()
66	}
67
68	/// Returns the number of failed sends.
69	pub fn failure_count(&self) -> usize {
70		self.failed.len()
71	}
72
73	/// Returns the IDs of clients that failed to receive the message.
74	pub fn failed_client_ids(&self) -> Vec<&str> {
75		self.failed.iter().map(|(id, _)| id.as_str()).collect()
76	}
77}
78
79/// A WebSocket room that manages multiple client connections
80///
81/// # Examples
82///
83/// ```
84/// use reinhardt_websockets::room::Room;
85/// use reinhardt_websockets::WebSocketConnection;
86/// use tokio::sync::mpsc;
87/// use std::sync::Arc;
88///
89/// # tokio_test::block_on(async {
90/// let room = Room::new("chat_room".to_string());
91///
92/// let (tx, _rx) = mpsc::unbounded_channel();
93/// let client = Arc::new(WebSocketConnection::new("user1".to_string(), tx));
94///
95/// room.join("user1".to_string(), client).await.unwrap();
96/// assert_eq!(room.client_count().await, 1);
97/// # });
98/// ```
99pub struct Room {
100	id: String,
101	clients: Arc<RwLock<HashMap<String, Arc<WebSocketConnection>>>>,
102	metadata: Arc<RwLock<HashMap<String, Value>>>,
103}
104
105impl Room {
106	/// Create a new room with the given ID
107	///
108	/// # Examples
109	///
110	/// ```
111	/// use reinhardt_websockets::room::Room;
112	///
113	/// let room = Room::new("general".to_string());
114	/// assert_eq!(room.id(), "general");
115	/// ```
116	pub fn new(id: String) -> Self {
117		Self {
118			id,
119			clients: Arc::new(RwLock::new(HashMap::new())),
120			metadata: Arc::new(RwLock::new(HashMap::new())),
121		}
122	}
123
124	/// Get the room ID
125	///
126	/// # Examples
127	///
128	/// ```
129	/// use reinhardt_websockets::room::Room;
130	///
131	/// let room = Room::new("lobby".to_string());
132	/// assert_eq!(room.id(), "lobby");
133	/// ```
134	pub fn id(&self) -> &str {
135		&self.id
136	}
137
138	/// Add a client to the room
139	///
140	/// # Examples
141	///
142	/// ```
143	/// use reinhardt_websockets::room::Room;
144	/// use reinhardt_websockets::WebSocketConnection;
145	/// use tokio::sync::mpsc;
146	/// use std::sync::Arc;
147	///
148	/// # tokio_test::block_on(async {
149	/// let room = Room::new("chat".to_string());
150	/// let (tx, _rx) = mpsc::unbounded_channel();
151	/// let client = Arc::new(WebSocketConnection::new("alice".to_string(), tx));
152	///
153	/// room.join("alice".to_string(), client).await.unwrap();
154	/// assert!(room.has_client("alice").await);
155	/// # });
156	/// ```
157	pub async fn join(
158		&self,
159		client_id: String,
160		client: Arc<WebSocketConnection>,
161	) -> RoomResult<()> {
162		let mut clients = self.clients.write().await;
163
164		if clients.contains_key(&client_id) {
165			return Err(RoomError::ClientAlreadyExists(client_id));
166		}
167
168		clients.insert(client_id, client);
169		Ok(())
170	}
171
172	/// Remove a client from the room
173	///
174	/// # Examples
175	///
176	/// ```
177	/// use reinhardt_websockets::room::Room;
178	/// use reinhardt_websockets::WebSocketConnection;
179	/// use tokio::sync::mpsc;
180	/// use std::sync::Arc;
181	///
182	/// # tokio_test::block_on(async {
183	/// let room = Room::new("chat".to_string());
184	/// let (tx, _rx) = mpsc::unbounded_channel();
185	/// let client = Arc::new(WebSocketConnection::new("bob".to_string(), tx));
186	///
187	/// room.join("bob".to_string(), client).await.unwrap();
188	/// assert!(room.has_client("bob").await);
189	///
190	/// room.leave("bob").await.unwrap();
191	/// assert!(!room.has_client("bob").await);
192	/// # });
193	/// ```
194	pub async fn leave(&self, client_id: &str) -> RoomResult<()> {
195		let mut clients = self.clients.write().await;
196
197		clients
198			.remove(client_id)
199			.ok_or_else(|| RoomError::ClientNotFound(client_id.to_string()))?;
200
201		Ok(())
202	}
203
204	/// Broadcast a message to all clients in the room.
205	///
206	/// Returns a [`BroadcastResult`] that reports which clients received the
207	/// message and which failed. Dead connections that fail during broadcast
208	/// are automatically removed from the room.
209	///
210	/// # Examples
211	///
212	/// ```
213	/// use reinhardt_websockets::room::Room;
214	/// use reinhardt_websockets::{WebSocketConnection, Message};
215	/// use tokio::sync::mpsc;
216	/// use std::sync::Arc;
217	///
218	/// # tokio_test::block_on(async {
219	/// let room = Room::new("chat".to_string());
220	///
221	/// let (tx1, _rx1) = mpsc::unbounded_channel();
222	/// let (tx2, _rx2) = mpsc::unbounded_channel();
223	/// let client1 = Arc::new(WebSocketConnection::new("user1".to_string(), tx1));
224	/// let client2 = Arc::new(WebSocketConnection::new("user2".to_string(), tx2));
225	///
226	/// room.join("user1".to_string(), client1).await.unwrap();
227	/// room.join("user2".to_string(), client2).await.unwrap();
228	///
229	/// let msg = Message::text("Hello everyone!".to_string());
230	/// let result = room.broadcast(msg).await;
231	///
232	/// assert!(result.is_complete_success());
233	/// assert_eq!(result.successful.len(), 2);
234	/// # });
235	/// ```
236	pub async fn broadcast(&self, message: Message) -> BroadcastResult {
237		let clients = self.clients.read().await;
238
239		let mut successful = Vec::new();
240		let mut failed = Vec::new();
241
242		for (client_id, client) in clients.iter() {
243			match client.send(message.clone()).await {
244				Ok(()) => successful.push(client_id.clone()),
245				Err(e) => failed.push((client_id.clone(), e)),
246			}
247		}
248
249		// Drop read lock before acquiring write lock
250		drop(clients);
251
252		// Automatically remove dead connections from the room
253		if !failed.is_empty() {
254			let mut clients_write = self.clients.write().await;
255			for (client_id, _) in &failed {
256				clients_write.remove(client_id);
257			}
258		}
259
260		BroadcastResult { successful, failed }
261	}
262
263	/// Broadcasts a message to all clients with a per-client send timeout.
264	///
265	/// Slow consumers that do not accept the message within the given timeout
266	/// are treated as failed and automatically removed from the room, applying
267	/// backpressure to prevent slow receivers from blocking the entire broadcast.
268	///
269	/// # Arguments
270	///
271	/// * `message` - The message to broadcast
272	/// * `send_timeout` - Maximum time to wait for each client to accept the message
273	///
274	/// # Examples
275	///
276	/// ```
277	/// use reinhardt_websockets::room::Room;
278	/// use reinhardt_websockets::{WebSocketConnection, Message};
279	/// use tokio::sync::mpsc;
280	/// use std::sync::Arc;
281	/// use std::time::Duration;
282	///
283	/// # tokio_test::block_on(async {
284	/// let room = Room::new("chat".to_string());
285	///
286	/// let (tx, _rx) = mpsc::unbounded_channel();
287	/// let client = Arc::new(WebSocketConnection::new("user1".to_string(), tx));
288	///
289	/// room.join("user1".to_string(), client).await.unwrap();
290	///
291	/// let msg = Message::text("Hello!".to_string());
292	/// let result = room.broadcast_with_timeout(msg, Duration::from_secs(5)).await;
293	///
294	/// assert!(result.is_complete_success());
295	/// # });
296	/// ```
297	pub async fn broadcast_with_timeout(
298		&self,
299		message: Message,
300		send_timeout: Duration,
301	) -> BroadcastResult {
302		let clients = self.clients.read().await;
303
304		let mut successful = Vec::new();
305		let mut failed = Vec::new();
306
307		for (client_id, client) in clients.iter() {
308			let send_future = client.send(message.clone());
309			match tokio::time::timeout(send_timeout, send_future).await {
310				Ok(Ok(())) => successful.push(client_id.clone()),
311				Ok(Err(e)) => failed.push((client_id.clone(), e)),
312				Err(_elapsed) => {
313					failed.push((
314						client_id.clone(),
315						WebSocketError::SlowConsumer(send_timeout),
316					));
317				}
318			}
319		}
320
321		// Drop read lock before acquiring write lock
322		drop(clients);
323
324		// Automatically remove failed connections from the room
325		if !failed.is_empty() {
326			let mut clients_write = self.clients.write().await;
327			for (client_id, _) in &failed {
328				clients_write.remove(client_id);
329			}
330		}
331
332		BroadcastResult { successful, failed }
333	}
334
335	/// Send a message to a specific client
336	///
337	/// # Examples
338	///
339	/// ```
340	/// use reinhardt_websockets::room::Room;
341	/// use reinhardt_websockets::{WebSocketConnection, Message};
342	/// use tokio::sync::mpsc;
343	/// use std::sync::Arc;
344	///
345	/// # tokio_test::block_on(async {
346	/// let room = Room::new("private".to_string());
347	///
348	/// let (tx, mut rx) = mpsc::unbounded_channel();
349	/// let client = Arc::new(WebSocketConnection::new("charlie".to_string(), tx));
350	///
351	/// room.join("charlie".to_string(), client).await.unwrap();
352	///
353	/// let msg = Message::text("Private message".to_string());
354	/// room.send_to("charlie", msg).await.unwrap();
355	///
356	/// let received = rx.recv().await.unwrap();
357	/// match received {
358	///     Message::Text { data } => assert_eq!(data, "Private message"),
359	///     _ => panic!("Expected text message"),
360	/// }
361	/// # });
362	/// ```
363	pub async fn send_to(&self, client_id: &str, message: Message) -> RoomResult<()> {
364		let client = {
365			let clients = self.clients.read().await;
366			clients
367				.get(client_id)
368				.ok_or_else(|| RoomError::ClientNotFound(client_id.to_string()))?
369				.clone()
370		};
371
372		client.send(message).await?;
373
374		Ok(())
375	}
376
377	/// Get the number of clients in the room
378	///
379	/// # Examples
380	///
381	/// ```
382	/// use reinhardt_websockets::room::Room;
383	/// use reinhardt_websockets::WebSocketConnection;
384	/// use tokio::sync::mpsc;
385	/// use std::sync::Arc;
386	///
387	/// # tokio_test::block_on(async {
388	/// let room = Room::new("game".to_string());
389	/// assert_eq!(room.client_count().await, 0);
390	///
391	/// let (tx1, _rx1) = mpsc::unbounded_channel();
392	/// let (tx2, _rx2) = mpsc::unbounded_channel();
393	///
394	/// let client1 = Arc::new(WebSocketConnection::new("player1".to_string(), tx1));
395	/// let client2 = Arc::new(WebSocketConnection::new("player2".to_string(), tx2));
396	///
397	/// room.join("player1".to_string(), client1).await.unwrap();
398	/// room.join("player2".to_string(), client2).await.unwrap();
399	///
400	/// assert_eq!(room.client_count().await, 2);
401	/// # });
402	/// ```
403	pub async fn client_count(&self) -> usize {
404		let clients = self.clients.read().await;
405		clients.len()
406	}
407
408	/// Get all client IDs in the room
409	///
410	/// # Examples
411	///
412	/// ```
413	/// use reinhardt_websockets::room::Room;
414	/// use reinhardt_websockets::WebSocketConnection;
415	/// use tokio::sync::mpsc;
416	/// use std::sync::Arc;
417	///
418	/// # tokio_test::block_on(async {
419	/// let room = Room::new("meeting".to_string());
420	///
421	/// let (tx1, _rx1) = mpsc::unbounded_channel();
422	/// let (tx2, _rx2) = mpsc::unbounded_channel();
423	///
424	/// let client1 = Arc::new(WebSocketConnection::new("dave".to_string(), tx1));
425	/// let client2 = Arc::new(WebSocketConnection::new("eve".to_string(), tx2));
426	///
427	/// room.join("dave".to_string(), client1).await.unwrap();
428	/// room.join("eve".to_string(), client2).await.unwrap();
429	///
430	/// let ids = room.client_ids().await;
431	/// assert_eq!(ids.len(), 2);
432	/// assert!(ids.contains(&"dave".to_string()));
433	/// assert!(ids.contains(&"eve".to_string()));
434	/// # });
435	/// ```
436	pub async fn client_ids(&self) -> Vec<String> {
437		let clients = self.clients.read().await;
438		clients.keys().cloned().collect()
439	}
440
441	/// Check if a client is in the room
442	///
443	/// # Examples
444	///
445	/// ```
446	/// use reinhardt_websockets::room::Room;
447	/// use reinhardt_websockets::WebSocketConnection;
448	/// use tokio::sync::mpsc;
449	/// use std::sync::Arc;
450	///
451	/// # tokio_test::block_on(async {
452	/// let room = Room::new("support".to_string());
453	///
454	/// let (tx, _rx) = mpsc::unbounded_channel();
455	/// let client = Arc::new(WebSocketConnection::new("frank".to_string(), tx));
456	///
457	/// room.join("frank".to_string(), client).await.unwrap();
458	///
459	/// assert!(room.has_client("frank").await);
460	/// assert!(!room.has_client("grace").await);
461	/// # });
462	/// ```
463	pub async fn has_client(&self, client_id: &str) -> bool {
464		let clients = self.clients.read().await;
465		clients.contains_key(client_id)
466	}
467
468	/// Set metadata for the room
469	///
470	/// # Examples
471	///
472	/// ```
473	/// use reinhardt_websockets::room::Room;
474	/// use serde_json::json;
475	///
476	/// # tokio_test::block_on(async {
477	/// let room = Room::new("config".to_string());
478	///
479	/// room.set_metadata("max_users", json!(10)).await.unwrap();
480	/// room.set_metadata("topic", json!("General Discussion")).await.unwrap();
481	///
482	/// let max_users: i64 = room.get_metadata("max_users").await.unwrap().unwrap();
483	/// assert_eq!(max_users, 10);
484	/// # });
485	/// ```
486	pub async fn set_metadata<T: serde::Serialize>(&self, key: &str, value: T) -> RoomResult<()> {
487		let json_value =
488			serde_json::to_value(value).map_err(|e| RoomError::Metadata(e.to_string()))?;
489
490		let mut metadata = self.metadata.write().await;
491		metadata.insert(key.to_string(), json_value);
492
493		Ok(())
494	}
495
496	/// Get metadata from the room
497	///
498	/// # Examples
499	///
500	/// ```
501	/// use reinhardt_websockets::room::Room;
502	/// use serde_json::json;
503	///
504	/// # tokio_test::block_on(async {
505	/// let room = Room::new("data".to_string());
506	///
507	/// room.set_metadata("counter", json!(42)).await.unwrap();
508	///
509	/// let counter: i64 = room.get_metadata("counter").await.unwrap().unwrap();
510	/// assert_eq!(counter, 42);
511	///
512	/// let missing: Option<String> = room.get_metadata("nonexistent").await.unwrap();
513	/// assert!(missing.is_none());
514	/// # });
515	/// ```
516	pub async fn get_metadata<T>(&self, key: &str) -> RoomResult<Option<T>>
517	where
518		T: serde::de::DeserializeOwned,
519	{
520		let metadata = self.metadata.read().await;
521
522		metadata
523			.get(key)
524			.map(|v| serde_json::from_value(v.clone()))
525			.transpose()
526			.map_err(|e| RoomError::Metadata(e.to_string()))
527	}
528
529	/// Remove metadata from the room
530	///
531	/// # Examples
532	///
533	/// ```
534	/// use reinhardt_websockets::room::Room;
535	/// use serde_json::json;
536	///
537	/// # tokio_test::block_on(async {
538	/// let room = Room::new("temp".to_string());
539	///
540	/// room.set_metadata("temp_key", json!("temp_value")).await.unwrap();
541	/// assert!(room.get_metadata::<String>("temp_key").await.unwrap().is_some());
542	///
543	/// room.remove_metadata("temp_key").await;
544	/// assert!(room.get_metadata::<String>("temp_key").await.unwrap().is_none());
545	/// # });
546	/// ```
547	pub async fn remove_metadata(&self, key: &str) -> Option<Value> {
548		let mut metadata = self.metadata.write().await;
549		metadata.remove(key)
550	}
551
552	/// Clear all metadata
553	///
554	/// # Examples
555	///
556	/// ```
557	/// use reinhardt_websockets::room::Room;
558	/// use serde_json::json;
559	///
560	/// # tokio_test::block_on(async {
561	/// let room = Room::new("reset".to_string());
562	///
563	/// room.set_metadata("key1", json!("value1")).await.unwrap();
564	/// room.set_metadata("key2", json!("value2")).await.unwrap();
565	///
566	/// room.clear_metadata().await;
567	///
568	/// assert!(room.get_metadata::<String>("key1").await.unwrap().is_none());
569	/// assert!(room.get_metadata::<String>("key2").await.unwrap().is_none());
570	/// # });
571	/// ```
572	pub async fn clear_metadata(&self) {
573		let mut metadata = self.metadata.write().await;
574		metadata.clear();
575	}
576
577	/// Check if room is empty
578	///
579	/// # Examples
580	///
581	/// ```
582	/// use reinhardt_websockets::room::Room;
583	/// use reinhardt_websockets::WebSocketConnection;
584	/// use tokio::sync::mpsc;
585	/// use std::sync::Arc;
586	///
587	/// # tokio_test::block_on(async {
588	/// let room = Room::new("empty_check".to_string());
589	/// assert!(room.is_empty().await);
590	///
591	/// let (tx, _rx) = mpsc::unbounded_channel();
592	/// let client = Arc::new(WebSocketConnection::new("henry".to_string(), tx));
593	///
594	/// room.join("henry".to_string(), client).await.unwrap();
595	/// assert!(!room.is_empty().await);
596	/// # });
597	/// ```
598	pub async fn is_empty(&self) -> bool {
599		let clients = self.clients.read().await;
600		clients.is_empty()
601	}
602}
603
604/// Manages multiple WebSocket rooms
605///
606/// # Examples
607///
608/// ```
609/// use reinhardt_websockets::room::RoomManager;
610///
611/// # tokio_test::block_on(async {
612/// let manager = RoomManager::new();
613///
614/// let room = manager.create_room("lobby".to_string()).await;
615/// assert_eq!(room.id(), "lobby");
616/// assert_eq!(manager.room_count().await, 1);
617/// # });
618/// ```
619pub struct RoomManager {
620	rooms: Arc<RwLock<HashMap<String, Arc<Room>>>>,
621}
622
623impl RoomManager {
624	/// Create a new RoomManager
625	///
626	/// # Examples
627	///
628	/// ```
629	/// use reinhardt_websockets::room::RoomManager;
630	///
631	/// let manager = RoomManager::new();
632	/// # tokio_test::block_on(async {
633	/// assert_eq!(manager.room_count().await, 0);
634	/// # });
635	/// ```
636	pub fn new() -> Self {
637		Self {
638			rooms: Arc::new(RwLock::new(HashMap::new())),
639		}
640	}
641
642	/// Create a new room
643	///
644	/// # Examples
645	///
646	/// ```
647	/// use reinhardt_websockets::room::RoomManager;
648	///
649	/// # tokio_test::block_on(async {
650	/// let manager = RoomManager::new();
651	/// let room = manager.create_room("game_room".to_string()).await;
652	/// assert_eq!(room.id(), "game_room");
653	/// # });
654	/// ```
655	pub async fn create_room(&self, id: String) -> Arc<Room> {
656		let mut rooms = self.rooms.write().await;
657
658		let room = Arc::new(Room::new(id.clone()));
659		rooms.insert(id, room.clone());
660
661		room
662	}
663
664	/// Get an existing room
665	///
666	/// # Examples
667	///
668	/// ```
669	/// use reinhardt_websockets::room::RoomManager;
670	///
671	/// # tokio_test::block_on(async {
672	/// let manager = RoomManager::new();
673	/// manager.create_room("test".to_string()).await;
674	///
675	/// let room = manager.get_room("test").await;
676	/// assert!(room.is_some());
677	/// assert_eq!(room.unwrap().id(), "test");
678	/// # });
679	/// ```
680	pub async fn get_room(&self, id: &str) -> Option<Arc<Room>> {
681		let rooms = self.rooms.read().await;
682		rooms.get(id).cloned()
683	}
684
685	/// Get or create a room
686	///
687	/// This method uses a single write lock to avoid TOCTOU race conditions
688	/// that could occur with separate get and create operations.
689	///
690	/// # Examples
691	///
692	/// ```
693	/// use reinhardt_websockets::room::RoomManager;
694	///
695	/// # tokio_test::block_on(async {
696	/// let manager = RoomManager::new();
697	///
698	/// let room1 = manager.get_or_create_room("auto".to_string()).await;
699	/// let room2 = manager.get_or_create_room("auto".to_string()).await;
700	///
701	/// assert_eq!(room1.id(), room2.id());
702	/// # });
703	/// ```
704	pub async fn get_or_create_room(&self, id: String) -> Arc<Room> {
705		let mut rooms = self.rooms.write().await;
706
707		if let Some(room) = rooms.get(&id) {
708			return room.clone();
709		}
710
711		let room = Arc::new(Room::new(id.clone()));
712		rooms.insert(id, room.clone());
713		room
714	}
715
716	/// Delete a room
717	///
718	/// # Examples
719	///
720	/// ```
721	/// use reinhardt_websockets::room::RoomManager;
722	///
723	/// # tokio_test::block_on(async {
724	/// let manager = RoomManager::new();
725	/// manager.create_room("temporary".to_string()).await;
726	///
727	/// assert!(manager.get_room("temporary").await.is_some());
728	///
729	/// manager.delete_room("temporary").await.unwrap();
730	/// assert!(manager.get_room("temporary").await.is_none());
731	/// # });
732	/// ```
733	pub async fn delete_room(&self, id: &str) -> RoomResult<()> {
734		let mut rooms = self.rooms.write().await;
735
736		rooms
737			.remove(id)
738			.ok_or_else(|| RoomError::RoomNotFound(id.to_string()))?;
739
740		Ok(())
741	}
742
743	/// Get the number of rooms
744	///
745	/// # Examples
746	///
747	/// ```
748	/// use reinhardt_websockets::room::RoomManager;
749	///
750	/// # tokio_test::block_on(async {
751	/// let manager = RoomManager::new();
752	/// assert_eq!(manager.room_count().await, 0);
753	///
754	/// manager.create_room("room1".to_string()).await;
755	/// manager.create_room("room2".to_string()).await;
756	///
757	/// assert_eq!(manager.room_count().await, 2);
758	/// # });
759	/// ```
760	pub async fn room_count(&self) -> usize {
761		let rooms = self.rooms.read().await;
762		rooms.len()
763	}
764
765	/// Get all room IDs
766	///
767	/// # Examples
768	///
769	/// ```
770	/// use reinhardt_websockets::room::RoomManager;
771	///
772	/// # tokio_test::block_on(async {
773	/// let manager = RoomManager::new();
774	///
775	/// manager.create_room("alpha".to_string()).await;
776	/// manager.create_room("beta".to_string()).await;
777	///
778	/// let ids = manager.room_ids().await;
779	/// assert_eq!(ids.len(), 2);
780	/// assert!(ids.contains(&"alpha".to_string()));
781	/// assert!(ids.contains(&"beta".to_string()));
782	/// # });
783	/// ```
784	pub async fn room_ids(&self) -> Vec<String> {
785		let rooms = self.rooms.read().await;
786		rooms.keys().cloned().collect()
787	}
788
789	/// Check if a room exists
790	///
791	/// # Examples
792	///
793	/// ```
794	/// use reinhardt_websockets::room::RoomManager;
795	///
796	/// # tokio_test::block_on(async {
797	/// let manager = RoomManager::new();
798	/// manager.create_room("exists".to_string()).await;
799	///
800	/// assert!(manager.has_room("exists").await);
801	/// assert!(!manager.has_room("missing").await);
802	/// # });
803	/// ```
804	pub async fn has_room(&self, id: &str) -> bool {
805		let rooms = self.rooms.read().await;
806		rooms.contains_key(id)
807	}
808
809	/// Delete all empty rooms
810	///
811	/// # Examples
812	///
813	/// ```
814	/// use reinhardt_websockets::room::RoomManager;
815	/// use reinhardt_websockets::WebSocketConnection;
816	/// use tokio::sync::mpsc;
817	/// use std::sync::Arc;
818	///
819	/// # tokio_test::block_on(async {
820	/// let manager = RoomManager::new();
821	///
822	/// let empty_room = manager.create_room("empty".to_string()).await;
823	/// let occupied_room = manager.create_room("occupied".to_string()).await;
824	///
825	/// let (tx, _rx) = mpsc::unbounded_channel();
826	/// let client = Arc::new(WebSocketConnection::new("user".to_string(), tx));
827	/// occupied_room.join("user".to_string(), client).await.unwrap();
828	///
829	/// manager.cleanup_empty_rooms().await;
830	///
831	/// assert!(!manager.has_room("empty").await);
832	/// assert!(manager.has_room("occupied").await);
833	/// # });
834	/// ```
835	pub async fn cleanup_empty_rooms(&self) {
836		let mut rooms = self.rooms.write().await;
837		let empty_room_ids: Vec<String> = {
838			let mut empty_ids = Vec::new();
839			for (id, room) in rooms.iter() {
840				if room.is_empty().await {
841					empty_ids.push(id.clone());
842				}
843			}
844			empty_ids
845		};
846
847		for id in empty_room_ids {
848			rooms.remove(&id);
849		}
850	}
851
852	/// Add a client to a specific room
853	///
854	/// # Examples
855	///
856	/// ```
857	/// use reinhardt_websockets::room::RoomManager;
858	/// use reinhardt_websockets::WebSocketConnection;
859	/// use tokio::sync::mpsc;
860	/// use std::sync::Arc;
861	///
862	/// # tokio_test::block_on(async {
863	/// let manager = RoomManager::new();
864	/// manager.create_room("game".to_string()).await;
865	///
866	/// let (tx, _rx) = mpsc::unbounded_channel();
867	/// let conn = Arc::new(WebSocketConnection::new("player1".to_string(), tx));
868	///
869	/// manager.join_room("game".to_string(), conn).await.unwrap();
870	/// # });
871	/// ```
872	pub async fn join_room(
873		&self,
874		room_id: String,
875		connection: Arc<WebSocketConnection>,
876	) -> RoomResult<()> {
877		let room = self
878			.get_room(&room_id)
879			.await
880			.ok_or_else(|| RoomError::RoomNotFound(room_id.clone()))?;
881
882		let client_id = connection.id().to_string();
883		room.join(client_id, connection).await
884	}
885
886	/// Remove a client from a specific room
887	///
888	/// # Examples
889	///
890	/// ```
891	/// use reinhardt_websockets::room::RoomManager;
892	/// use reinhardt_websockets::WebSocketConnection;
893	/// use tokio::sync::mpsc;
894	/// use std::sync::Arc;
895	///
896	/// # tokio_test::block_on(async {
897	/// let manager = RoomManager::new();
898	/// manager.create_room("chat".to_string()).await;
899	///
900	/// let (tx, _rx) = mpsc::unbounded_channel();
901	/// let conn = Arc::new(WebSocketConnection::new("user1".to_string(), tx));
902	///
903	/// manager.join_room("chat".to_string(), conn).await.unwrap();
904	/// manager.leave_room("chat", "user1").await.unwrap();
905	/// # });
906	/// ```
907	pub async fn leave_room(&self, room_id: &str, user_id: &str) -> RoomResult<()> {
908		let room = self
909			.get_room(room_id)
910			.await
911			.ok_or_else(|| RoomError::RoomNotFound(room_id.to_string()))?;
912
913		room.leave(user_id).await
914	}
915
916	/// Get the number of clients in a specific room
917	///
918	/// # Examples
919	///
920	/// ```
921	/// use reinhardt_websockets::room::RoomManager;
922	/// use reinhardt_websockets::WebSocketConnection;
923	/// use tokio::sync::mpsc;
924	/// use std::sync::Arc;
925	///
926	/// # tokio_test::block_on(async {
927	/// let manager = RoomManager::new();
928	/// manager.create_room("lobby".to_string()).await;
929	///
930	/// let (tx, _rx) = mpsc::unbounded_channel();
931	/// let conn = Arc::new(WebSocketConnection::new("user1".to_string(), tx));
932	///
933	/// manager.join_room("lobby".to_string(), conn).await.unwrap();
934	/// assert_eq!(manager.get_room_size("lobby").await, 1);
935	/// # });
936	/// ```
937	pub async fn get_room_size(&self, room_id: &str) -> usize {
938		if let Some(room) = self.get_room(room_id).await {
939			room.client_count().await
940		} else {
941			0
942		}
943	}
944
945	/// Broadcast a message to all clients in a specific room.
946	///
947	/// Returns a [`BroadcastResult`] describing which clients received the
948	/// message and which failed. Returns [`RoomError::RoomNotFound`] if the
949	/// room does not exist.
950	///
951	/// # Examples
952	///
953	/// ```
954	/// use reinhardt_websockets::room::RoomManager;
955	/// use reinhardt_websockets::{WebSocketConnection, Message};
956	/// use tokio::sync::mpsc;
957	/// use std::sync::Arc;
958	///
959	/// # tokio_test::block_on(async {
960	/// let manager = RoomManager::new();
961	/// manager.create_room("announcement".to_string()).await;
962	///
963	/// let (tx, mut rx) = mpsc::unbounded_channel();
964	/// let conn = Arc::new(WebSocketConnection::new("listener".to_string(), tx));
965	///
966	/// manager.join_room("announcement".to_string(), conn).await.unwrap();
967	///
968	/// let msg = Message::text("Hello everyone!".to_string());
969	/// let result = manager.broadcast_to_room("announcement", msg).await.unwrap();
970	/// assert!(result.is_complete_success());
971	/// # });
972	/// ```
973	pub async fn broadcast_to_room(
974		&self,
975		room_id: &str,
976		message: Message,
977	) -> RoomResult<BroadcastResult> {
978		let room = self
979			.get_room(room_id)
980			.await
981			.ok_or_else(|| RoomError::RoomNotFound(room_id.to_string()))?;
982
983		Ok(room.broadcast(message).await)
984	}
985
986	/// Broadcasts a message to all clients in a room with a per-client timeout.
987	///
988	/// Slow consumers that do not accept the message within the given timeout
989	/// are treated as failed and removed from the room.
990	///
991	/// Returns [`RoomError::RoomNotFound`] if the room does not exist.
992	///
993	/// # Examples
994	///
995	/// ```
996	/// use reinhardt_websockets::room::RoomManager;
997	/// use reinhardt_websockets::{WebSocketConnection, Message};
998	/// use tokio::sync::mpsc;
999	/// use std::sync::Arc;
1000	/// use std::time::Duration;
1001	///
1002	/// # tokio_test::block_on(async {
1003	/// let manager = RoomManager::new();
1004	/// manager.create_room("timeout_test".to_string()).await;
1005	///
1006	/// let (tx, _rx) = mpsc::unbounded_channel();
1007	/// let conn = Arc::new(WebSocketConnection::new("user1".to_string(), tx));
1008	///
1009	/// manager.join_room("timeout_test".to_string(), conn).await.unwrap();
1010	///
1011	/// let msg = Message::text("Hello!".to_string());
1012	/// let result = manager
1013	///     .broadcast_to_room_with_timeout("timeout_test", msg, Duration::from_secs(5))
1014	///     .await
1015	///     .unwrap();
1016	/// assert!(result.is_complete_success());
1017	/// # });
1018	/// ```
1019	pub async fn broadcast_to_room_with_timeout(
1020		&self,
1021		room_id: &str,
1022		message: Message,
1023		send_timeout: Duration,
1024	) -> RoomResult<BroadcastResult> {
1025		let room = self
1026			.get_room(room_id)
1027			.await
1028			.ok_or_else(|| RoomError::RoomNotFound(room_id.to_string()))?;
1029
1030		Ok(room.broadcast_with_timeout(message, send_timeout).await)
1031	}
1032
1033	/// Broadcast a message to all clients in all rooms.
1034	///
1035	/// Returns a [`BroadcastResult`] aggregated across all rooms, reporting
1036	/// which clients received the message and which failed.
1037	///
1038	/// # Examples
1039	///
1040	/// ```
1041	/// use reinhardt_websockets::room::RoomManager;
1042	/// use reinhardt_websockets::{WebSocketConnection, Message};
1043	/// use tokio::sync::mpsc;
1044	/// use std::sync::Arc;
1045	///
1046	/// # tokio_test::block_on(async {
1047	/// let manager = RoomManager::new();
1048	/// manager.create_room("room1".to_string()).await;
1049	/// manager.create_room("room2".to_string()).await;
1050	///
1051	/// let (tx1, _rx1) = mpsc::unbounded_channel();
1052	/// let (tx2, _rx2) = mpsc::unbounded_channel();
1053	///
1054	/// let conn1 = Arc::new(WebSocketConnection::new("user1".to_string(), tx1));
1055	/// let conn2 = Arc::new(WebSocketConnection::new("user2".to_string(), tx2));
1056	///
1057	/// manager.join_room("room1".to_string(), conn1).await.unwrap();
1058	/// manager.join_room("room2".to_string(), conn2).await.unwrap();
1059	///
1060	/// let msg = Message::text("System message".to_string());
1061	/// let result = manager.broadcast_to_all(msg).await;
1062	/// assert!(result.is_complete_success());
1063	/// # });
1064	/// ```
1065	pub async fn broadcast_to_all(&self, message: Message) -> BroadcastResult {
1066		let rooms = self.rooms.read().await;
1067
1068		let mut successful = Vec::new();
1069		let mut failed = Vec::new();
1070
1071		for room in rooms.values() {
1072			let result = room.broadcast(message.clone()).await;
1073			successful.extend(result.successful);
1074			failed.extend(result.failed);
1075		}
1076
1077		BroadcastResult { successful, failed }
1078	}
1079
1080	/// Get all room IDs (alias for room_ids for compatibility)
1081	///
1082	/// # Examples
1083	///
1084	/// ```
1085	/// use reinhardt_websockets::room::RoomManager;
1086	///
1087	/// # tokio_test::block_on(async {
1088	/// let manager = RoomManager::new();
1089	/// manager.create_room("alpha".to_string()).await;
1090	/// manager.create_room("beta".to_string()).await;
1091	///
1092	/// let rooms = manager.get_all_rooms().await;
1093	/// assert_eq!(rooms.len(), 2);
1094	/// assert!(rooms.contains(&"alpha".to_string()));
1095	/// assert!(rooms.contains(&"beta".to_string()));
1096	/// # });
1097	/// ```
1098	pub async fn get_all_rooms(&self) -> Vec<String> {
1099		self.room_ids().await
1100	}
1101}
1102
1103impl Default for RoomManager {
1104	fn default() -> Self {
1105		Self::new()
1106	}
1107}
1108
1109#[cfg(test)]
1110mod tests {
1111	use super::*;
1112	use rstest::rstest;
1113	use tokio::sync::mpsc;
1114
1115	#[rstest]
1116	#[tokio::test]
1117	async fn test_room_new() {
1118		let room = Room::new("test_room".to_string());
1119		assert_eq!(room.id(), "test_room");
1120		assert_eq!(room.client_count().await, 0);
1121		assert!(room.is_empty().await);
1122	}
1123
1124	#[tokio::test]
1125	async fn test_room_join_client() {
1126		let room = Room::new("join_test".to_string());
1127		let (tx, _rx) = mpsc::unbounded_channel();
1128		let client = Arc::new(WebSocketConnection::new("client1".to_string(), tx));
1129
1130		room.join("client1".to_string(), client).await.unwrap();
1131		assert_eq!(room.client_count().await, 1);
1132		assert!(room.has_client("client1").await);
1133	}
1134
1135	#[tokio::test]
1136	async fn test_room_join_duplicate_client() {
1137		let room = Room::new("duplicate_test".to_string());
1138		let (tx1, _rx1) = mpsc::unbounded_channel();
1139		let (tx2, _rx2) = mpsc::unbounded_channel();
1140
1141		let client1 = Arc::new(WebSocketConnection::new("duplicate".to_string(), tx1));
1142		let client2 = Arc::new(WebSocketConnection::new("duplicate".to_string(), tx2));
1143
1144		room.join("duplicate".to_string(), client1).await.unwrap();
1145		let result = room.join("duplicate".to_string(), client2).await;
1146
1147		assert!(result.is_err());
1148		assert!(matches!(
1149			result.unwrap_err(),
1150			RoomError::ClientAlreadyExists(_)
1151		));
1152	}
1153
1154	#[tokio::test]
1155	async fn test_room_leave_client() {
1156		let room = Room::new("leave_test".to_string());
1157		let (tx, _rx) = mpsc::unbounded_channel();
1158		let client = Arc::new(WebSocketConnection::new("leaver".to_string(), tx));
1159
1160		room.join("leaver".to_string(), client).await.unwrap();
1161		assert!(room.has_client("leaver").await);
1162
1163		room.leave("leaver").await.unwrap();
1164		assert!(!room.has_client("leaver").await);
1165		assert_eq!(room.client_count().await, 0);
1166	}
1167
1168	#[tokio::test]
1169	async fn test_room_leave_nonexistent_client() {
1170		let room = Room::new("leave_error_test".to_string());
1171		let result = room.leave("nonexistent").await;
1172
1173		assert!(result.is_err());
1174		assert!(matches!(result.unwrap_err(), RoomError::ClientNotFound(_)));
1175	}
1176
1177	#[rstest]
1178	#[tokio::test]
1179	async fn test_room_broadcast() {
1180		// Arrange
1181		let room = Room::new("broadcast_test".to_string());
1182
1183		let (tx1, mut rx1) = mpsc::unbounded_channel();
1184		let (tx2, mut rx2) = mpsc::unbounded_channel();
1185		let (tx3, mut rx3) = mpsc::unbounded_channel();
1186
1187		let client1 = Arc::new(WebSocketConnection::new("user1".to_string(), tx1));
1188		let client2 = Arc::new(WebSocketConnection::new("user2".to_string(), tx2));
1189		let client3 = Arc::new(WebSocketConnection::new("user3".to_string(), tx3));
1190
1191		room.join("user1".to_string(), client1).await.unwrap();
1192		room.join("user2".to_string(), client2).await.unwrap();
1193		room.join("user3".to_string(), client3).await.unwrap();
1194
1195		// Act
1196		let msg = Message::text("Broadcast message".to_string());
1197		let result = room.broadcast(msg).await;
1198
1199		// Assert
1200		assert!(result.is_complete_success());
1201		assert_eq!(result.successful.len(), 3);
1202		assert_eq!(result.failure_count(), 0);
1203		assert!(matches!(rx1.try_recv(), Ok(Message::Text { .. })));
1204		assert!(matches!(rx2.try_recv(), Ok(Message::Text { .. })));
1205		assert!(matches!(rx3.try_recv(), Ok(Message::Text { .. })));
1206	}
1207
1208	#[tokio::test]
1209	async fn test_room_send_to_specific_client() {
1210		let room = Room::new("private_msg_test".to_string());
1211
1212		let (tx1, mut rx1) = mpsc::unbounded_channel();
1213		let (tx2, mut rx2) = mpsc::unbounded_channel();
1214
1215		let client1 = Arc::new(WebSocketConnection::new("target".to_string(), tx1));
1216		let client2 = Arc::new(WebSocketConnection::new("other".to_string(), tx2));
1217
1218		room.join("target".to_string(), client1).await.unwrap();
1219		room.join("other".to_string(), client2).await.unwrap();
1220
1221		let msg = Message::text("Private message".to_string());
1222		room.send_to("target", msg).await.unwrap();
1223
1224		assert!(matches!(rx1.try_recv(), Ok(Message::Text { .. })));
1225		assert!(rx2.try_recv().is_err());
1226	}
1227
1228	#[tokio::test]
1229	async fn test_room_send_to_nonexistent_client() {
1230		let room = Room::new("send_error_test".to_string());
1231		let msg = Message::text("Test".to_string());
1232		let result = room.send_to("nonexistent", msg).await;
1233
1234		assert!(result.is_err());
1235		assert!(matches!(result.unwrap_err(), RoomError::ClientNotFound(_)));
1236	}
1237
1238	#[tokio::test]
1239	async fn test_room_client_ids() {
1240		let room = Room::new("ids_test".to_string());
1241
1242		let (tx1, _rx1) = mpsc::unbounded_channel();
1243		let (tx2, _rx2) = mpsc::unbounded_channel();
1244
1245		let client1 = Arc::new(WebSocketConnection::new("alpha".to_string(), tx1));
1246		let client2 = Arc::new(WebSocketConnection::new("beta".to_string(), tx2));
1247
1248		room.join("alpha".to_string(), client1).await.unwrap();
1249		room.join("beta".to_string(), client2).await.unwrap();
1250
1251		let ids = room.client_ids().await;
1252		assert_eq!(ids.len(), 2);
1253		assert!(ids.contains(&"alpha".to_string()));
1254		assert!(ids.contains(&"beta".to_string()));
1255	}
1256
1257	#[tokio::test]
1258	async fn test_room_metadata_set_and_get() {
1259		use serde_json::json;
1260
1261		let room = Room::new("metadata_test".to_string());
1262
1263		room.set_metadata("max_users", json!(100)).await.unwrap();
1264		room.set_metadata("topic", json!("General Chat"))
1265			.await
1266			.unwrap();
1267
1268		let max_users: i64 = room.get_metadata("max_users").await.unwrap().unwrap();
1269		assert_eq!(max_users, 100);
1270
1271		let topic: String = room.get_metadata("topic").await.unwrap().unwrap();
1272		assert_eq!(topic, "General Chat");
1273	}
1274
1275	#[tokio::test]
1276	async fn test_room_metadata_get_nonexistent() {
1277		let room = Room::new("metadata_missing_test".to_string());
1278		let result: Option<String> = room.get_metadata("nonexistent").await.unwrap();
1279		assert!(result.is_none());
1280	}
1281
1282	#[tokio::test]
1283	async fn test_room_metadata_remove() {
1284		use serde_json::json;
1285
1286		let room = Room::new("metadata_remove_test".to_string());
1287
1288		room.set_metadata("temp", json!("value")).await.unwrap();
1289		assert!(room.get_metadata::<String>("temp").await.unwrap().is_some());
1290
1291		room.remove_metadata("temp").await;
1292		assert!(room.get_metadata::<String>("temp").await.unwrap().is_none());
1293	}
1294
1295	#[tokio::test]
1296	async fn test_room_metadata_clear() {
1297		use serde_json::json;
1298
1299		let room = Room::new("metadata_clear_test".to_string());
1300
1301		room.set_metadata("key1", json!("value1")).await.unwrap();
1302		room.set_metadata("key2", json!("value2")).await.unwrap();
1303
1304		room.clear_metadata().await;
1305
1306		assert!(room.get_metadata::<String>("key1").await.unwrap().is_none());
1307		assert!(room.get_metadata::<String>("key2").await.unwrap().is_none());
1308	}
1309
1310	#[tokio::test]
1311	async fn test_room_is_empty() {
1312		let room = Room::new("empty_test".to_string());
1313		assert!(room.is_empty().await);
1314
1315		let (tx, _rx) = mpsc::unbounded_channel();
1316		let client = Arc::new(WebSocketConnection::new("user".to_string(), tx));
1317
1318		room.join("user".to_string(), client).await.unwrap();
1319		assert!(!room.is_empty().await);
1320
1321		room.leave("user").await.unwrap();
1322		assert!(room.is_empty().await);
1323	}
1324
1325	#[tokio::test]
1326	async fn test_room_manager_new() {
1327		let manager = RoomManager::new();
1328		assert_eq!(manager.room_count().await, 0);
1329	}
1330
1331	#[tokio::test]
1332	async fn test_room_manager_create_room() {
1333		let manager = RoomManager::new();
1334		let room = manager.create_room("new_room".to_string()).await;
1335
1336		assert_eq!(room.id(), "new_room");
1337		assert_eq!(manager.room_count().await, 1);
1338	}
1339
1340	#[tokio::test]
1341	async fn test_room_manager_get_room() {
1342		let manager = RoomManager::new();
1343		manager.create_room("existing".to_string()).await;
1344
1345		let room = manager.get_room("existing").await;
1346		assert!(room.is_some());
1347		assert_eq!(room.unwrap().id(), "existing");
1348
1349		let missing = manager.get_room("missing").await;
1350		assert!(missing.is_none());
1351	}
1352
1353	#[tokio::test]
1354	async fn test_room_manager_get_or_create_room() {
1355		let manager = RoomManager::new();
1356
1357		let room1 = manager.get_or_create_room("auto_room".to_string()).await;
1358		assert_eq!(manager.room_count().await, 1);
1359
1360		let room2 = manager.get_or_create_room("auto_room".to_string()).await;
1361		assert_eq!(manager.room_count().await, 1);
1362
1363		assert_eq!(room1.id(), room2.id());
1364	}
1365
1366	#[tokio::test]
1367	async fn test_room_manager_delete_room() {
1368		let manager = RoomManager::new();
1369		manager.create_room("to_delete".to_string()).await;
1370
1371		assert!(manager.has_room("to_delete").await);
1372
1373		manager.delete_room("to_delete").await.unwrap();
1374		assert!(!manager.has_room("to_delete").await);
1375	}
1376
1377	#[tokio::test]
1378	async fn test_room_manager_delete_nonexistent_room() {
1379		let manager = RoomManager::new();
1380		let result = manager.delete_room("nonexistent").await;
1381
1382		assert!(result.is_err());
1383		assert!(matches!(result.unwrap_err(), RoomError::RoomNotFound(_)));
1384	}
1385
1386	#[tokio::test]
1387	async fn test_room_manager_room_ids() {
1388		let manager = RoomManager::new();
1389
1390		manager.create_room("room1".to_string()).await;
1391		manager.create_room("room2".to_string()).await;
1392		manager.create_room("room3".to_string()).await;
1393
1394		let ids = manager.room_ids().await;
1395		assert_eq!(ids.len(), 3);
1396		assert!(ids.contains(&"room1".to_string()));
1397		assert!(ids.contains(&"room2".to_string()));
1398		assert!(ids.contains(&"room3".to_string()));
1399	}
1400
1401	#[tokio::test]
1402	async fn test_room_manager_has_room() {
1403		let manager = RoomManager::new();
1404		manager.create_room("check".to_string()).await;
1405
1406		assert!(manager.has_room("check").await);
1407		assert!(!manager.has_room("missing").await);
1408	}
1409
1410	#[tokio::test]
1411	async fn test_room_manager_cleanup_empty_rooms() {
1412		let manager = RoomManager::new();
1413
1414		let _empty_room = manager.create_room("empty".to_string()).await;
1415		let occupied_room = manager.create_room("occupied".to_string()).await;
1416
1417		let (tx, _rx) = mpsc::unbounded_channel();
1418		let client = Arc::new(WebSocketConnection::new("user".to_string(), tx));
1419		occupied_room
1420			.join("user".to_string(), client)
1421			.await
1422			.unwrap();
1423
1424		manager.cleanup_empty_rooms().await;
1425
1426		assert!(!manager.has_room("empty").await);
1427		assert!(manager.has_room("occupied").await);
1428		assert_eq!(manager.room_count().await, 1);
1429	}
1430
1431	#[rstest]
1432	#[tokio::test]
1433	async fn test_broadcast_partial_failure_reports_failed_clients() {
1434		// Arrange
1435		let room = Room::new("partial_fail".to_string());
1436
1437		let (tx_alive, mut rx_alive) = mpsc::unbounded_channel();
1438		let (tx_dead, _rx_dead) = mpsc::unbounded_channel::<Message>();
1439
1440		let alive_client = Arc::new(WebSocketConnection::new("alive".to_string(), tx_alive));
1441		let dead_client = Arc::new(WebSocketConnection::new("dead".to_string(), tx_dead));
1442
1443		room.join("alive".to_string(), alive_client).await.unwrap();
1444		room.join("dead".to_string(), dead_client).await.unwrap();
1445
1446		// Simulate dead connection by dropping the receiver
1447		drop(_rx_dead);
1448
1449		// Act
1450		let msg = Message::text("test message".to_string());
1451		let result = room.broadcast(msg).await;
1452
1453		// Assert
1454		assert!(!result.is_complete_success());
1455		assert!(result.is_partial_success());
1456		assert!(!result.is_complete_failure());
1457		assert_eq!(result.successful.len(), 1);
1458		assert!(result.successful.contains(&"alive".to_string()));
1459		assert_eq!(result.failure_count(), 1);
1460		assert!(result.failed_client_ids().contains(&"dead"));
1461		assert!(matches!(rx_alive.try_recv(), Ok(Message::Text { .. })));
1462	}
1463
1464	#[rstest]
1465	#[tokio::test]
1466	async fn test_broadcast_partial_failure_removes_dead_connections() {
1467		// Arrange
1468		let room = Room::new("cleanup".to_string());
1469
1470		let (tx_alive, _rx_alive) = mpsc::unbounded_channel();
1471		let (tx_dead1, _rx_dead1) = mpsc::unbounded_channel::<Message>();
1472		let (tx_dead2, _rx_dead2) = mpsc::unbounded_channel::<Message>();
1473
1474		let alive = Arc::new(WebSocketConnection::new("alive".to_string(), tx_alive));
1475		let dead1 = Arc::new(WebSocketConnection::new("dead1".to_string(), tx_dead1));
1476		let dead2 = Arc::new(WebSocketConnection::new("dead2".to_string(), tx_dead2));
1477
1478		room.join("alive".to_string(), alive).await.unwrap();
1479		room.join("dead1".to_string(), dead1).await.unwrap();
1480		room.join("dead2".to_string(), dead2).await.unwrap();
1481		assert_eq!(room.client_count().await, 3);
1482
1483		// Simulate dead connections
1484		drop(_rx_dead1);
1485		drop(_rx_dead2);
1486
1487		// Act
1488		let msg = Message::text("cleanup test".to_string());
1489		let result = room.broadcast(msg).await;
1490
1491		// Assert - dead connections are automatically removed
1492		assert_eq!(result.failure_count(), 2);
1493		assert_eq!(room.client_count().await, 1);
1494		assert!(room.has_client("alive").await);
1495		assert!(!room.has_client("dead1").await);
1496		assert!(!room.has_client("dead2").await);
1497	}
1498
1499	#[rstest]
1500	#[tokio::test]
1501	async fn test_broadcast_complete_failure() {
1502		// Arrange
1503		let room = Room::new("all_dead".to_string());
1504
1505		let (tx1, _rx1) = mpsc::unbounded_channel::<Message>();
1506		let (tx2, _rx2) = mpsc::unbounded_channel::<Message>();
1507
1508		let client1 = Arc::new(WebSocketConnection::new("c1".to_string(), tx1));
1509		let client2 = Arc::new(WebSocketConnection::new("c2".to_string(), tx2));
1510
1511		room.join("c1".to_string(), client1).await.unwrap();
1512		room.join("c2".to_string(), client2).await.unwrap();
1513
1514		// Simulate all dead connections
1515		drop(_rx1);
1516		drop(_rx2);
1517
1518		// Act
1519		let msg = Message::text("nobody home".to_string());
1520		let result = room.broadcast(msg).await;
1521
1522		// Assert
1523		assert!(result.is_complete_failure());
1524		assert!(!result.is_partial_success());
1525		assert_eq!(result.failure_count(), 2);
1526		assert!(result.successful.is_empty());
1527		assert_eq!(room.client_count().await, 0);
1528	}
1529
1530	#[rstest]
1531	#[tokio::test]
1532	async fn test_broadcast_empty_room() {
1533		// Arrange
1534		let room = Room::new("empty_broadcast".to_string());
1535
1536		// Act
1537		let msg = Message::text("echo".to_string());
1538		let result = room.broadcast(msg).await;
1539
1540		// Assert
1541		assert!(result.is_complete_success());
1542		assert!(result.successful.is_empty());
1543		assert!(result.failed.is_empty());
1544	}
1545
1546	#[rstest]
1547	#[tokio::test]
1548	async fn test_broadcast_to_room_returns_broadcast_result() {
1549		// Arrange
1550		let manager = RoomManager::new();
1551		manager.create_room("results".to_string()).await;
1552
1553		let (tx1, _rx1) = mpsc::unbounded_channel();
1554		let (tx2, _rx2) = mpsc::unbounded_channel::<Message>();
1555		let conn1 = Arc::new(WebSocketConnection::new("ok".to_string(), tx1));
1556		let conn2 = Arc::new(WebSocketConnection::new("dead".to_string(), tx2));
1557
1558		manager
1559			.join_room("results".to_string(), conn1)
1560			.await
1561			.unwrap();
1562		manager
1563			.join_room("results".to_string(), conn2)
1564			.await
1565			.unwrap();
1566
1567		// Simulate dead connection
1568		drop(_rx2);
1569
1570		// Act
1571		let msg = Message::text("room broadcast".to_string());
1572		let result = manager.broadcast_to_room("results", msg).await.unwrap();
1573
1574		// Assert
1575		assert!(result.is_partial_success());
1576		assert_eq!(result.successful.len(), 1);
1577		assert_eq!(result.failure_count(), 1);
1578	}
1579
1580	#[rstest]
1581	#[tokio::test]
1582	async fn test_broadcast_to_all_aggregates_results() {
1583		// Arrange
1584		let manager = RoomManager::new();
1585		let room1 = manager.create_room("r1".to_string()).await;
1586		let room2 = manager.create_room("r2".to_string()).await;
1587
1588		let (tx1, _rx1) = mpsc::unbounded_channel();
1589		let (tx2, _rx2) = mpsc::unbounded_channel::<Message>();
1590		let conn1 = Arc::new(WebSocketConnection::new("ok".to_string(), tx1));
1591		let conn2 = Arc::new(WebSocketConnection::new("dead".to_string(), tx2));
1592
1593		room1.join("ok".to_string(), conn1).await.unwrap();
1594		room2.join("dead".to_string(), conn2).await.unwrap();
1595
1596		// Simulate dead connection in room2
1597		drop(_rx2);
1598
1599		// Act
1600		let msg = Message::text("global".to_string());
1601		let result = manager.broadcast_to_all(msg).await;
1602
1603		// Assert - aggregated results from both rooms
1604		assert!(result.is_partial_success());
1605		assert_eq!(result.successful.len(), 1);
1606		assert_eq!(result.failure_count(), 1);
1607		assert!(result.successful.contains(&"ok".to_string()));
1608		assert!(result.failed_client_ids().contains(&"dead"));
1609	}
1610
1611	#[rstest]
1612	#[tokio::test]
1613	async fn test_broadcast_with_timeout_succeeds_for_responsive_clients() {
1614		// Arrange
1615		let room = Room::new("timeout_ok".to_string());
1616
1617		let (tx1, mut rx1) = mpsc::unbounded_channel();
1618		let (tx2, mut rx2) = mpsc::unbounded_channel();
1619
1620		let client1 = Arc::new(WebSocketConnection::new("fast1".to_string(), tx1));
1621		let client2 = Arc::new(WebSocketConnection::new("fast2".to_string(), tx2));
1622
1623		room.join("fast1".to_string(), client1).await.unwrap();
1624		room.join("fast2".to_string(), client2).await.unwrap();
1625
1626		// Act
1627		let msg = Message::text("hello with timeout".to_string());
1628		let result = room
1629			.broadcast_with_timeout(msg, Duration::from_secs(5))
1630			.await;
1631
1632		// Assert
1633		assert!(result.is_complete_success());
1634		assert_eq!(result.successful.len(), 2);
1635		assert!(matches!(rx1.try_recv(), Ok(Message::Text { .. })));
1636		assert!(matches!(rx2.try_recv(), Ok(Message::Text { .. })));
1637	}
1638
1639	#[rstest]
1640	#[tokio::test]
1641	async fn test_broadcast_with_timeout_removes_dead_connections() {
1642		// Arrange
1643		let room = Room::new("timeout_dead".to_string());
1644
1645		let (tx_alive, _rx_alive) = mpsc::unbounded_channel();
1646		let (tx_dead, _rx_dead) = mpsc::unbounded_channel::<Message>();
1647
1648		let alive = Arc::new(WebSocketConnection::new("alive".to_string(), tx_alive));
1649		let dead = Arc::new(WebSocketConnection::new("dead".to_string(), tx_dead));
1650
1651		room.join("alive".to_string(), alive).await.unwrap();
1652		room.join("dead".to_string(), dead).await.unwrap();
1653
1654		// Simulate dead connection
1655		drop(_rx_dead);
1656
1657		// Act
1658		let msg = Message::text("test".to_string());
1659		let result = room
1660			.broadcast_with_timeout(msg, Duration::from_secs(5))
1661			.await;
1662
1663		// Assert
1664		assert!(result.is_partial_success());
1665		assert_eq!(result.successful.len(), 1);
1666		assert_eq!(result.failure_count(), 1);
1667		assert_eq!(room.client_count().await, 1);
1668		assert!(room.has_client("alive").await);
1669		assert!(!room.has_client("dead").await);
1670	}
1671
1672	#[rstest]
1673	#[tokio::test]
1674	async fn test_broadcast_with_timeout_empty_room() {
1675		// Arrange
1676		let room = Room::new("timeout_empty".to_string());
1677
1678		// Act
1679		let msg = Message::text("nobody".to_string());
1680		let result = room
1681			.broadcast_with_timeout(msg, Duration::from_secs(1))
1682			.await;
1683
1684		// Assert
1685		assert!(result.is_complete_success());
1686		assert!(result.successful.is_empty());
1687		assert!(result.failed.is_empty());
1688	}
1689
1690	#[rstest]
1691	#[tokio::test]
1692	async fn test_broadcast_to_room_with_timeout() {
1693		// Arrange
1694		let manager = RoomManager::new();
1695		manager.create_room("timeout_room".to_string()).await;
1696
1697		let (tx, _rx) = mpsc::unbounded_channel();
1698		let conn = Arc::new(WebSocketConnection::new("user1".to_string(), tx));
1699
1700		manager
1701			.join_room("timeout_room".to_string(), conn)
1702			.await
1703			.unwrap();
1704
1705		// Act
1706		let msg = Message::text("hello".to_string());
1707		let result = manager
1708			.broadcast_to_room_with_timeout("timeout_room", msg, Duration::from_secs(5))
1709			.await
1710			.unwrap();
1711
1712		// Assert
1713		assert!(result.is_complete_success());
1714		assert_eq!(result.successful.len(), 1);
1715	}
1716
1717	#[rstest]
1718	#[tokio::test]
1719	async fn test_broadcast_to_room_with_timeout_nonexistent_room() {
1720		// Arrange
1721		let manager = RoomManager::new();
1722
1723		// Act
1724		let msg = Message::text("hello".to_string());
1725		let result = manager
1726			.broadcast_to_room_with_timeout("missing", msg, Duration::from_secs(1))
1727			.await;
1728
1729		// Assert
1730		assert!(result.is_err());
1731		assert!(matches!(result.unwrap_err(), RoomError::RoomNotFound(_)));
1732	}
1733}