1use std::collections::HashMap;
2use std::sync::Mutex;
3use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
4
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
12#[serde(tag = "type", rename_all = "snake_case")]
13pub enum RoomEvent {
14 Join {
16 room: String,
17 user_id: String,
18 #[serde(skip_serializing_if = "Option::is_none")]
19 data: Option<serde_json::Value>,
20 },
21 Leave { room: String, user_id: String },
23 Presence {
25 room: String,
26 user_id: String,
27 data: serde_json::Value,
28 },
29 Broadcast {
31 room: String,
32 #[serde(skip_serializing_if = "Option::is_none")]
33 sender: Option<String>,
34 topic: String,
35 data: serde_json::Value,
36 },
37 Snapshot { room: String, peers: Vec<PeerInfo> },
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
43pub struct PeerInfo {
44 pub user_id: String,
45 pub data: serde_json::Value,
46 pub joined_at: String,
47}
48
49#[derive(Debug, Clone)]
54struct RoomMember {
55 user_id: String,
56 data: serde_json::Value,
57 joined_at: String,
58 last_active: Instant,
59}
60
61#[derive(Debug)]
62#[allow(dead_code)]
63struct Room {
64 name: String,
65 members: HashMap<String, RoomMember>,
66 created_at: String,
67}
68
69impl Room {
70 fn new(name: &str) -> Self {
71 Self {
72 name: name.to_string(),
73 members: HashMap::new(),
74 created_at: now_iso(),
75 }
76 }
77
78 fn peer_infos(&self) -> Vec<PeerInfo> {
79 self.members
80 .values()
81 .map(|m| PeerInfo {
82 user_id: m.user_id.clone(),
83 data: m.data.clone(),
84 joined_at: m.joined_at.clone(),
85 })
86 .collect()
87 }
88}
89
/// Error returned by room operations that can be refused.
#[derive(Debug, Clone)]
pub struct RoomError {
    /// Stable, machine-readable error code (for example `ROOM_LIMIT_REACHED`).
    pub code: String,
    /// Human-readable explanation of the failure.
    pub message: String,
}

impl std::fmt::Display for RoomError {
    /// Renders the error as `[CODE] message`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}] {}", self.code, self.message)
    }
}

// Implementing the standard error trait lets callers propagate `RoomError`
// with `?` into `Box<dyn Error>` (or similar wrappers) instead of converting
// it by hand. There is no underlying cause, so the default `source()` applies.
impl std::error::Error for RoomError {}
106
107pub struct RoomManager {
113 rooms: Mutex<HashMap<String, Room>>,
114 idle_timeout: Duration,
116 max_rooms: usize,
118}
119
/// Room cap used by `RoomManager::new`, which takes no explicit limit.
const DEFAULT_MAX_ROOMS: usize = 10_000;
122
123impl RoomManager {
124 pub fn new(idle_timeout_secs: u64) -> Self {
125 Self {
126 rooms: Mutex::new(HashMap::new()),
127 idle_timeout: Duration::from_secs(idle_timeout_secs),
128 max_rooms: DEFAULT_MAX_ROOMS,
129 }
130 }
131
132 pub fn with_max_rooms(idle_timeout_secs: u64, max_rooms: usize) -> Self {
134 Self {
135 rooms: Mutex::new(HashMap::new()),
136 idle_timeout: Duration::from_secs(idle_timeout_secs),
137 max_rooms,
138 }
139 }
140
141 pub fn join(
145 &self,
146 room: &str,
147 user_id: &str,
148 data: Option<serde_json::Value>,
149 ) -> Result<(RoomEvent, RoomEvent), RoomError> {
150 let mut rooms = self.rooms.lock().unwrap();
151
152 let room_exists = rooms.contains_key(room);
154 if !room_exists && rooms.len() >= self.max_rooms {
155 return Err(RoomError {
156 code: "ROOM_LIMIT_REACHED".to_string(),
157 message: format!(
158 "Maximum number of rooms ({}) reached. Cannot create new room.",
159 self.max_rooms
160 ),
161 });
162 }
163
164 let room_state = rooms
165 .entry(room.to_string())
166 .or_insert_with(|| Room::new(room));
167
168 let member = RoomMember {
169 user_id: user_id.to_string(),
170 data: data.clone().unwrap_or(serde_json::Value::Null),
171 joined_at: now_iso(),
172 last_active: Instant::now(),
173 };
174
175 let snapshot = RoomEvent::Snapshot {
177 room: room.to_string(),
178 peers: room_state.peer_infos(),
179 };
180
181 room_state.members.insert(user_id.to_string(), member);
182
183 let join_event = RoomEvent::Join {
184 room: room.to_string(),
185 user_id: user_id.to_string(),
186 data,
187 };
188
189 Ok((snapshot, join_event))
190 }
191
192 pub fn leave(&self, room: &str, user_id: &str) -> Option<RoomEvent> {
195 let mut rooms = self.rooms.lock().unwrap();
196 let room_state = rooms.get_mut(room)?;
197 room_state.members.remove(user_id)?;
198
199 let event = RoomEvent::Leave {
200 room: room.to_string(),
201 user_id: user_id.to_string(),
202 };
203
204 if room_state.members.is_empty() {
206 rooms.remove(room);
207 }
208
209 Some(event)
210 }
211
212 pub fn set_presence(
215 &self,
216 room: &str,
217 user_id: &str,
218 data: serde_json::Value,
219 ) -> Option<RoomEvent> {
220 let mut rooms = self.rooms.lock().unwrap();
221 let room_state = rooms.get_mut(room)?;
222 let member = room_state.members.get_mut(user_id)?;
223
224 member.data = data.clone();
225 member.last_active = Instant::now();
226
227 Some(RoomEvent::Presence {
228 room: room.to_string(),
229 user_id: user_id.to_string(),
230 data,
231 })
232 }
233
234 pub fn get_presence(&self, room: &str, user_id: &str) -> Option<serde_json::Value> {
236 let rooms = self.rooms.lock().unwrap();
237 rooms
238 .get(room)?
239 .members
240 .get(user_id)
241 .map(|m| m.data.clone())
242 }
243
244 pub fn broadcast(
247 &self,
248 room: &str,
249 sender: Option<&str>,
250 topic: &str,
251 data: serde_json::Value,
252 ) -> Option<RoomEvent> {
253 let rooms = self.rooms.lock().unwrap();
254 if !rooms.contains_key(room) {
255 return None;
256 }
257
258 Some(RoomEvent::Broadcast {
259 room: room.to_string(),
260 sender: sender.map(|s| s.to_string()),
261 topic: topic.to_string(),
262 data,
263 })
264 }
265
266 pub fn members(&self, room: &str) -> Vec<PeerInfo> {
268 let rooms = self.rooms.lock().unwrap();
269 rooms.get(room).map(|r| r.peer_infos()).unwrap_or_default()
270 }
271
272 pub fn list_rooms(&self) -> Vec<String> {
274 let rooms = self.rooms.lock().unwrap();
275 rooms.keys().cloned().collect()
276 }
277
278 pub fn is_in_room(&self, room: &str, user_id: &str) -> bool {
280 let rooms = self.rooms.lock().unwrap();
281 rooms
282 .get(room)
283 .map(|r| r.members.contains_key(user_id))
284 .unwrap_or(false)
285 }
286
287 pub fn room_size(&self, room: &str) -> usize {
289 let rooms = self.rooms.lock().unwrap();
290 rooms.get(room).map(|r| r.members.len()).unwrap_or(0)
291 }
292
293 pub fn disconnect(&self, user_id: &str) -> Vec<RoomEvent> {
295 let mut rooms = self.rooms.lock().unwrap();
296 let mut events = Vec::new();
297 let mut empty_rooms = Vec::new();
298
299 for (room_name, room_state) in rooms.iter_mut() {
300 if room_state.members.remove(user_id).is_some() {
301 events.push(RoomEvent::Leave {
302 room: room_name.clone(),
303 user_id: user_id.to_string(),
304 });
305 if room_state.members.is_empty() {
306 empty_rooms.push(room_name.clone());
307 }
308 }
309 }
310
311 for name in empty_rooms {
312 rooms.remove(&name);
313 }
314
315 events
316 }
317
318 pub fn cleanup_idle(&self) -> Vec<RoomEvent> {
320 let now = Instant::now();
321 let timeout = self.idle_timeout;
322 let mut rooms = self.rooms.lock().unwrap();
323 let mut events = Vec::new();
324 let mut empty_rooms = Vec::new();
325
326 for (room_name, room_state) in rooms.iter_mut() {
327 let idle_users: Vec<String> = room_state
328 .members
329 .iter()
330 .filter(|(_, m)| now.duration_since(m.last_active) >= timeout)
331 .map(|(uid, _)| uid.clone())
332 .collect();
333
334 for uid in idle_users {
335 room_state.members.remove(&uid);
336 events.push(RoomEvent::Leave {
337 room: room_name.clone(),
338 user_id: uid,
339 });
340 }
341
342 if room_state.members.is_empty() {
343 empty_rooms.push(room_name.clone());
344 }
345 }
346
347 for name in empty_rooms {
348 rooms.remove(&name);
349 }
350
351 events
352 }
353
354 pub fn user_rooms(&self, user_id: &str) -> Vec<String> {
356 let rooms = self.rooms.lock().unwrap();
357 rooms
358 .iter()
359 .filter(|(_, r)| r.members.contains_key(user_id))
360 .map(|(name, _)| name.clone())
361 .collect()
362 }
363}
364
/// Returns the current wall-clock time as whole seconds since the Unix
/// epoch followed by a literal `Z`.
///
/// Despite the name this is not a strict ISO 8601 timestamp. If the system
/// clock reads earlier than the epoch, the result is `0Z`.
fn now_iso() -> String {
    let secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        // Clock set before 1970: fall back to zero rather than fail.
        Err(_) => 0,
    };

    let mut stamp = secs.to_string();
    stamp.push('Z');
    stamp
}
372
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashSet;

    /// Manager used by most tests: 60-second idle timeout, default room cap.
    fn manager() -> RoomManager {
        RoomManager::new(60)
    }

    /// Manager capped at two rooms, with `room1` and `room2` already occupied.
    fn full_manager() -> RoomManager {
        let mgr = RoomManager::with_max_rooms(60, 2);
        mgr.join("room1", "alice", None).unwrap();
        mgr.join("room2", "bob", None).unwrap();
        mgr
    }

    // The first join creates the room and yields an empty snapshot.
    #[test]
    fn join_creates_room() {
        let mgr = manager();
        assert!(mgr.list_rooms().is_empty());

        let (snapshot, join) = mgr.join("lobby", "alice", None).unwrap();

        let snapshot_is_empty =
            matches!(snapshot, RoomEvent::Snapshot { ref peers, .. } if peers.is_empty());
        assert!(snapshot_is_empty);
        let joined_as_alice =
            matches!(join, RoomEvent::Join { ref user_id, .. } if user_id == "alice");
        assert!(joined_as_alice);

        assert_eq!(mgr.list_rooms(), vec!["lobby"]);
        assert_eq!(mgr.room_size("lobby"), 1);
    }

    // A later joiner sees the earlier member, including their payload.
    #[test]
    fn join_returns_existing_peers_in_snapshot() {
        let mgr = manager();
        mgr.join("lobby", "alice", Some(json!({"color": "red"})))
            .unwrap();

        let (snapshot, _) = mgr.join("lobby", "bob", None).unwrap();
        match snapshot {
            RoomEvent::Snapshot { peers, .. } => {
                assert_eq!(peers.len(), 1);
                assert_eq!(peers[0].user_id, "alice");
                assert_eq!(peers[0].data, json!({"color": "red"}));
            }
            _ => panic!("expected Snapshot"),
        }
    }

    #[test]
    fn leave_removes_member() {
        let mgr = manager();
        for user in ["alice", "bob"] {
            mgr.join("lobby", user, None).unwrap();
        }

        assert!(mgr.leave("lobby", "alice").is_some());

        assert!(!mgr.is_in_room("lobby", "alice"));
        assert!(mgr.is_in_room("lobby", "bob"));
        assert_eq!(mgr.room_size("lobby"), 1);
    }

    #[test]
    fn leave_last_member_removes_room() {
        let mgr = manager();
        mgr.join("lobby", "alice", None).unwrap();

        mgr.leave("lobby", "alice");

        assert!(mgr.list_rooms().is_empty());
        assert_eq!(mgr.room_size("lobby"), 0);
    }

    #[test]
    fn leave_nonexistent_returns_none() {
        assert!(manager().leave("lobby", "alice").is_none());
    }

    #[test]
    fn set_and_get_presence() {
        let mgr = manager();
        mgr.join("doc:123", "alice", None).unwrap();
        let cursor = json!({"cursor": {"x": 10, "y": 20}});

        let event = mgr.set_presence("doc:123", "alice", cursor.clone());
        assert!(event.is_some());

        let stored = mgr.get_presence("doc:123", "alice").unwrap();
        assert_eq!(stored, cursor);
    }

    #[test]
    fn presence_not_in_room_returns_none() {
        let mgr = manager();
        assert!(mgr.set_presence("lobby", "alice", json!({})).is_none());
        assert!(mgr.get_presence("lobby", "alice").is_none());
    }

    #[test]
    fn broadcast_to_room() {
        let mgr = manager();
        mgr.join("lobby", "alice", None).unwrap();

        let event = mgr.broadcast("lobby", Some("alice"), "typing", json!({"active": true}));
        assert!(event.is_some());

        match event {
            Some(RoomEvent::Broadcast {
                topic,
                sender,
                data,
                ..
            }) => {
                assert_eq!(topic, "typing");
                assert_eq!(sender, Some("alice".to_string()));
                assert_eq!(data, json!({"active": true}));
            }
            _ => panic!("expected Broadcast"),
        }
    }

    #[test]
    fn broadcast_to_nonexistent_room() {
        let event = manager().broadcast("ghost", None, "ping", json!({}));
        assert!(event.is_none());
    }

    #[test]
    fn members_list() {
        let mgr = manager();
        mgr.join("lobby", "alice", Some(json!({"role": "admin"})))
            .unwrap();
        mgr.join("lobby", "bob", None).unwrap();

        let members = mgr.members("lobby");
        assert_eq!(members.len(), 2);

        let ids: HashSet<&str> = members.iter().map(|m| m.user_id.as_str()).collect();
        assert!(ids.contains("alice"));
        assert!(ids.contains("bob"));
    }

    #[test]
    fn disconnect_removes_from_all_rooms() {
        let mgr = manager();
        mgr.join("lobby", "alice", None).unwrap();
        mgr.join("kitchen", "alice", None).unwrap();
        mgr.join("lobby", "bob", None).unwrap();

        let events = mgr.disconnect("alice");

        assert_eq!(events.len(), 2);
        assert!(!mgr.is_in_room("lobby", "alice"));
        assert!(!mgr.is_in_room("kitchen", "alice"));
        assert!(mgr.is_in_room("lobby", "bob"));
        // The kitchen only held alice, so it must be gone entirely.
        assert!(mgr
            .list_rooms()
            .iter()
            .all(|name| name.as_str() != "kitchen"));
    }

    #[test]
    fn cleanup_idle_members() {
        // A zero timeout makes every member idle immediately.
        let mgr = RoomManager::new(0);
        mgr.join("lobby", "alice", None).unwrap();
        mgr.join("lobby", "bob", None).unwrap();

        let events = mgr.cleanup_idle();

        assert_eq!(events.len(), 2);
        assert!(mgr.list_rooms().is_empty());
    }

    #[test]
    fn user_rooms() {
        let mgr = manager();
        mgr.join("lobby", "alice", None).unwrap();
        mgr.join("kitchen", "alice", None).unwrap();
        mgr.join("lobby", "bob", None).unwrap();

        let mut alice_rooms = mgr.user_rooms("alice");
        alice_rooms.sort();
        assert_eq!(alice_rooms, vec!["kitchen", "lobby"]);

        assert_eq!(mgr.user_rooms("bob"), vec!["lobby"]);
        assert!(mgr.user_rooms("nobody").is_empty());
    }

    #[test]
    fn entity_scoped_room() {
        let mgr = manager();
        mgr.join("Todo:t1", "alice", Some(json!({"editing": true})))
            .unwrap();
        mgr.join("Todo:t1", "bob", Some(json!({"viewing": true})))
            .unwrap();

        assert_eq!(mgr.room_size("Todo:t1"), 2);

        mgr.set_presence("Todo:t1", "alice", json!({"cursor": 42}));
        let stored = mgr.get_presence("Todo:t1", "alice").unwrap();
        assert_eq!(stored, json!({"cursor": 42}));
    }

    #[test]
    fn rejoin_updates_data() {
        let mgr = manager();
        for version in [1, 2] {
            mgr.join("lobby", "alice", Some(json!({"v": version})))
                .unwrap();
        }

        assert_eq!(mgr.room_size("lobby"), 1);
        let members = mgr.members("lobby");
        assert_eq!(members[0].data, json!({"v": 2}));
    }

    #[test]
    fn room_event_serialization() {
        let event = RoomEvent::Join {
            room: "lobby".into(),
            user_id: "alice".into(),
            data: Some(json!({"color": "red"})),
        };

        let encoded = serde_json::to_string(&event).unwrap();
        assert!(encoded.contains("\"type\":\"join\""));
        assert!(encoded.contains("\"room\":\"lobby\""));

        let decoded: RoomEvent = serde_json::from_str(&encoded).unwrap();
        assert_eq!(event, decoded);
    }

    #[test]
    fn broadcast_event_serialization() {
        let event = RoomEvent::Broadcast {
            room: "lobby".into(),
            sender: None,
            topic: "system".into(),
            data: json!({"msg": "hello"}),
        };

        let encoded = serde_json::to_string(&event).unwrap();
        assert!(encoded.contains("\"type\":\"broadcast\""));
        // A missing sender must be omitted, not serialized as null.
        assert!(!encoded.contains("\"sender\""));

        let decoded: RoomEvent = serde_json::from_str(&encoded).unwrap();
        assert_eq!(event, decoded);
    }

    #[test]
    fn max_rooms_enforced() {
        let mgr = full_manager();

        let result = mgr.join("room3", "charlie", None);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.code, "ROOM_LIMIT_REACHED");
        assert!(err.message.contains("2"));
    }

    #[test]
    fn joining_existing_room_at_limit_succeeds() {
        let mgr = full_manager();

        let result = mgr.join("room1", "charlie", None);

        assert!(result.is_ok());
        assert_eq!(mgr.room_size("room1"), 2);
    }

    #[test]
    fn room_limit_freed_after_leave() {
        let mgr = full_manager();
        assert!(mgr.join("room3", "charlie", None).is_err());

        // Emptying room2 deletes it, which frees a slot under the cap.
        mgr.leave("room2", "bob");

        assert!(mgr.join("room3", "charlie", None).is_ok());
    }

    #[test]
    fn default_max_rooms_is_10000() {
        assert_eq!(manager().max_rooms, DEFAULT_MAX_ROOMS);
    }
}