Skip to main content

pylon_runtime/
rooms.rs

1use std::collections::HashMap;
2use std::sync::Mutex;
3use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
4
5use serde::{Deserialize, Serialize};
6
7// ---------------------------------------------------------------------------
8// Room events — the messages exchanged in rooms
9// ---------------------------------------------------------------------------
10
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
12#[serde(tag = "type", rename_all = "snake_case")]
13pub enum RoomEvent {
14    /// Someone joined the room.
15    Join {
16        room: String,
17        user_id: String,
18        #[serde(skip_serializing_if = "Option::is_none")]
19        data: Option<serde_json::Value>,
20    },
21    /// Someone left the room.
22    Leave { room: String, user_id: String },
23    /// Presence update (cursor position, typing indicator, custom data).
24    Presence {
25        room: String,
26        user_id: String,
27        data: serde_json::Value,
28    },
29    /// Arbitrary broadcast to a room.
30    Broadcast {
31        room: String,
32        #[serde(skip_serializing_if = "Option::is_none")]
33        sender: Option<String>,
34        topic: String,
35        data: serde_json::Value,
36    },
37    /// Room state snapshot (sent on join).
38    Snapshot { room: String, peers: Vec<PeerInfo> },
39}
40
41/// Info about a peer in a room.
42#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
43pub struct PeerInfo {
44    pub user_id: String,
45    pub data: serde_json::Value,
46    pub joined_at: String,
47}
48
49// ---------------------------------------------------------------------------
50// Room — a single room's state
51// ---------------------------------------------------------------------------
52
53#[derive(Debug, Clone)]
54struct RoomMember {
55    user_id: String,
56    data: serde_json::Value,
57    joined_at: String,
58    last_active: Instant,
59}
60
61#[derive(Debug)]
62#[allow(dead_code)]
63struct Room {
64    name: String,
65    members: HashMap<String, RoomMember>,
66    created_at: String,
67}
68
69impl Room {
70    fn new(name: &str) -> Self {
71        Self {
72            name: name.to_string(),
73            members: HashMap::new(),
74            created_at: now_iso(),
75        }
76    }
77
78    fn peer_infos(&self) -> Vec<PeerInfo> {
79        self.members
80            .values()
81            .map(|m| PeerInfo {
82                user_id: m.user_id.clone(),
83                data: m.data.clone(),
84                joined_at: m.joined_at.clone(),
85            })
86            .collect()
87    }
88}
89
90// ---------------------------------------------------------------------------
91// RoomManager — manages all rooms
92// ---------------------------------------------------------------------------
93
/// Error returned when a room operation fails.
///
/// Rendered by `Display` as `[CODE] message`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RoomError {
    /// Stable, machine-readable identifier of the failure (e.g. `ROOM_LIMIT_REACHED`).
    pub code: String,
    /// Human-readable explanation.
    pub message: String,
}

impl std::fmt::Display for RoomError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}] {}", self.code, self.message)
    }
}

// Implementing the standard error trait lets callers box this as `dyn Error` and
// propagate it with `?` into `Box<dyn Error>`-style error types.
impl std::error::Error for RoomError {}
106
107/// Manages named rooms with membership, presence, and broadcasting.
108///
109/// Rooms are created lazily on first join and removed when the last member leaves.
110/// Each room tracks its members, their ephemeral state (typing, cursors, custom data),
111/// and supports topic-scoped broadcasting.
112pub struct RoomManager {
113    rooms: Mutex<HashMap<String, Room>>,
114    /// How long before an inactive member is considered gone.
115    idle_timeout: Duration,
116    /// Maximum number of rooms that can exist simultaneously.
117    max_rooms: usize,
118}
119
120/// Default maximum number of concurrent rooms.
121const DEFAULT_MAX_ROOMS: usize = 10_000;
122
123impl RoomManager {
124    pub fn new(idle_timeout_secs: u64) -> Self {
125        Self {
126            rooms: Mutex::new(HashMap::new()),
127            idle_timeout: Duration::from_secs(idle_timeout_secs),
128            max_rooms: DEFAULT_MAX_ROOMS,
129        }
130    }
131
132    /// Create a RoomManager with a custom maximum room limit.
133    pub fn with_max_rooms(idle_timeout_secs: u64, max_rooms: usize) -> Self {
134        Self {
135            rooms: Mutex::new(HashMap::new()),
136            idle_timeout: Duration::from_secs(idle_timeout_secs),
137            max_rooms,
138        }
139    }
140
141    /// Join a room. Creates the room if it doesn't exist.
142    /// Returns a snapshot of current peers (before this join) and the join event,
143    /// or an error if the room limit has been reached and this would create a new room.
144    pub fn join(
145        &self,
146        room: &str,
147        user_id: &str,
148        data: Option<serde_json::Value>,
149    ) -> Result<(RoomEvent, RoomEvent), RoomError> {
150        let mut rooms = self.rooms.lock().unwrap();
151
152        // Check if this join would create a new room and if we're at the limit.
153        let room_exists = rooms.contains_key(room);
154        if !room_exists && rooms.len() >= self.max_rooms {
155            return Err(RoomError {
156                code: "ROOM_LIMIT_REACHED".to_string(),
157                message: format!(
158                    "Maximum number of rooms ({}) reached. Cannot create new room.",
159                    self.max_rooms
160                ),
161            });
162        }
163
164        let room_state = rooms
165            .entry(room.to_string())
166            .or_insert_with(|| Room::new(room));
167
168        let member = RoomMember {
169            user_id: user_id.to_string(),
170            data: data.clone().unwrap_or(serde_json::Value::Null),
171            joined_at: now_iso(),
172            last_active: Instant::now(),
173        };
174
175        // Snapshot of existing peers (before the new member joins).
176        let snapshot = RoomEvent::Snapshot {
177            room: room.to_string(),
178            peers: room_state.peer_infos(),
179        };
180
181        room_state.members.insert(user_id.to_string(), member);
182
183        let join_event = RoomEvent::Join {
184            room: room.to_string(),
185            user_id: user_id.to_string(),
186            data,
187        };
188
189        Ok((snapshot, join_event))
190    }
191
192    /// Leave a room. Removes the room if it becomes empty.
193    /// Returns the leave event, or None if the user wasn't in the room.
194    pub fn leave(&self, room: &str, user_id: &str) -> Option<RoomEvent> {
195        let mut rooms = self.rooms.lock().unwrap();
196        let room_state = rooms.get_mut(room)?;
197        room_state.members.remove(user_id)?;
198
199        let event = RoomEvent::Leave {
200            room: room.to_string(),
201            user_id: user_id.to_string(),
202        };
203
204        // Clean up empty rooms.
205        if room_state.members.is_empty() {
206            rooms.remove(room);
207        }
208
209        Some(event)
210    }
211
212    /// Update a member's ephemeral state (typing indicator, cursor position, etc.).
213    /// Returns a presence event, or None if not in the room.
214    pub fn set_presence(
215        &self,
216        room: &str,
217        user_id: &str,
218        data: serde_json::Value,
219    ) -> Option<RoomEvent> {
220        let mut rooms = self.rooms.lock().unwrap();
221        let room_state = rooms.get_mut(room)?;
222        let member = room_state.members.get_mut(user_id)?;
223
224        member.data = data.clone();
225        member.last_active = Instant::now();
226
227        Some(RoomEvent::Presence {
228            room: room.to_string(),
229            user_id: user_id.to_string(),
230            data,
231        })
232    }
233
234    /// Get a member's current ephemeral data.
235    pub fn get_presence(&self, room: &str, user_id: &str) -> Option<serde_json::Value> {
236        let rooms = self.rooms.lock().unwrap();
237        rooms
238            .get(room)?
239            .members
240            .get(user_id)
241            .map(|m| m.data.clone())
242    }
243
244    /// Broadcast an arbitrary event to a room.
245    /// Returns the broadcast event, or None if the room doesn't exist.
246    pub fn broadcast(
247        &self,
248        room: &str,
249        sender: Option<&str>,
250        topic: &str,
251        data: serde_json::Value,
252    ) -> Option<RoomEvent> {
253        let rooms = self.rooms.lock().unwrap();
254        if !rooms.contains_key(room) {
255            return None;
256        }
257
258        Some(RoomEvent::Broadcast {
259            room: room.to_string(),
260            sender: sender.map(|s| s.to_string()),
261            topic: topic.to_string(),
262            data,
263        })
264    }
265
266    /// List all members currently in a room.
267    pub fn members(&self, room: &str) -> Vec<PeerInfo> {
268        let rooms = self.rooms.lock().unwrap();
269        rooms.get(room).map(|r| r.peer_infos()).unwrap_or_default()
270    }
271
272    /// List all active room names.
273    pub fn list_rooms(&self) -> Vec<String> {
274        let rooms = self.rooms.lock().unwrap();
275        rooms.keys().cloned().collect()
276    }
277
278    /// Check if a user is in a specific room.
279    pub fn is_in_room(&self, room: &str, user_id: &str) -> bool {
280        let rooms = self.rooms.lock().unwrap();
281        rooms
282            .get(room)
283            .map(|r| r.members.contains_key(user_id))
284            .unwrap_or(false)
285    }
286
287    /// Get the number of members in a room.
288    pub fn room_size(&self, room: &str) -> usize {
289        let rooms = self.rooms.lock().unwrap();
290        rooms.get(room).map(|r| r.members.len()).unwrap_or(0)
291    }
292
293    /// Remove a user from ALL rooms they're in. Returns leave events.
294    pub fn disconnect(&self, user_id: &str) -> Vec<RoomEvent> {
295        let mut rooms = self.rooms.lock().unwrap();
296        let mut events = Vec::new();
297        let mut empty_rooms = Vec::new();
298
299        for (room_name, room_state) in rooms.iter_mut() {
300            if room_state.members.remove(user_id).is_some() {
301                events.push(RoomEvent::Leave {
302                    room: room_name.clone(),
303                    user_id: user_id.to_string(),
304                });
305                if room_state.members.is_empty() {
306                    empty_rooms.push(room_name.clone());
307                }
308            }
309        }
310
311        for name in empty_rooms {
312            rooms.remove(&name);
313        }
314
315        events
316    }
317
318    /// Remove idle members from all rooms. Returns leave events for each removed member.
319    pub fn cleanup_idle(&self) -> Vec<RoomEvent> {
320        let now = Instant::now();
321        let timeout = self.idle_timeout;
322        let mut rooms = self.rooms.lock().unwrap();
323        let mut events = Vec::new();
324        let mut empty_rooms = Vec::new();
325
326        for (room_name, room_state) in rooms.iter_mut() {
327            let idle_users: Vec<String> = room_state
328                .members
329                .iter()
330                .filter(|(_, m)| now.duration_since(m.last_active) >= timeout)
331                .map(|(uid, _)| uid.clone())
332                .collect();
333
334            for uid in idle_users {
335                room_state.members.remove(&uid);
336                events.push(RoomEvent::Leave {
337                    room: room_name.clone(),
338                    user_id: uid,
339                });
340            }
341
342            if room_state.members.is_empty() {
343                empty_rooms.push(room_name.clone());
344            }
345        }
346
347        for name in empty_rooms {
348            rooms.remove(&name);
349        }
350
351        events
352    }
353
354    /// Get all rooms a user is currently in.
355    pub fn user_rooms(&self, user_id: &str) -> Vec<String> {
356        let rooms = self.rooms.lock().unwrap();
357        rooms
358            .iter()
359            .filter(|(_, r)| r.members.contains_key(user_id))
360            .map(|(name, _)| name.clone())
361            .collect()
362    }
363}
364
/// Return the current UTC time as an ISO 8601 timestamp, e.g. `2023-11-14T22:13:20Z`.
///
/// A system clock set before the Unix epoch is reported as the epoch itself.
fn now_iso() -> String {
    let epoch_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    format_iso_utc(epoch_secs)
}

/// Format whole seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ` (UTC).
fn format_iso_utc(epoch_secs: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;

    let days = epoch_secs / SECS_PER_DAY;
    let secs_of_day = epoch_secs % SECS_PER_DAY;
    let hour = secs_of_day / 3_600;
    let minute = (secs_of_day % 3_600) / 60;
    let second = secs_of_day % 60;

    // Days-to-civil-date conversion (Howard Hinnant's `civil_from_days`), restricted to
    // dates at or after the epoch. Counting from 0000-03-01 puts the leap day at the
    // end of the shifted year, which keeps the arithmetic branch-free.
    let shifted = days + 719_468;
    let era = shifted / 146_097; // 400-year cycles
    let day_of_era = shifted % 146_097; // [0, 146096]
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let shifted_month = (5 * day_of_year + 2) / 153; // 0 = March ... 11 = February
    let day = day_of_year - (153 * shifted_month + 2) / 5 + 1;
    let month = if shifted_month < 10 {
        shifted_month + 3
    } else {
        shifted_month - 9
    };
    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}
372
373// ---------------------------------------------------------------------------
374// Tests
375// ---------------------------------------------------------------------------
376
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Manager with a 60-second idle timeout and the default room limit.
    fn manager() -> RoomManager {
        RoomManager::new(60)
    }

    /// Manager limited to two rooms, already holding `room1` (alice) and `room2` (bob).
    fn manager_at_limit() -> RoomManager {
        let mgr = RoomManager::with_max_rooms(60, 2);
        mgr.join("room1", "alice", None).unwrap();
        mgr.join("room2", "bob", None).unwrap();
        mgr
    }

    // --- Membership ---

    #[test]
    fn join_creates_room() {
        let mgr = manager();
        assert!(mgr.list_rooms().is_empty());

        let (snapshot, join) = mgr.join("lobby", "alice", None).unwrap();

        // Alice is the first member, so there is nobody to report in the snapshot.
        match snapshot {
            RoomEvent::Snapshot { peers, .. } => assert!(peers.is_empty()),
            _ => panic!("expected Snapshot"),
        }
        match join {
            RoomEvent::Join { user_id, .. } => assert_eq!(user_id, "alice"),
            _ => panic!("expected Join"),
        }
        assert_eq!(mgr.list_rooms(), vec!["lobby"]);
        assert_eq!(mgr.room_size("lobby"), 1);
    }

    #[test]
    fn join_returns_existing_peers_in_snapshot() {
        let mgr = manager();
        let alice_data = serde_json::json!({"color": "red"});
        mgr.join("lobby", "alice", Some(alice_data.clone())).unwrap();

        let (snapshot, _) = mgr.join("lobby", "bob", None).unwrap();
        match snapshot {
            RoomEvent::Snapshot { peers, .. } => {
                assert_eq!(peers.len(), 1);
                assert_eq!(peers[0].user_id, "alice");
                assert_eq!(peers[0].data, alice_data);
            }
            _ => panic!("expected Snapshot"),
        }
    }

    #[test]
    fn leave_removes_member() {
        let mgr = manager();
        for user in ["alice", "bob"] {
            mgr.join("lobby", user, None).unwrap();
        }

        assert!(mgr.leave("lobby", "alice").is_some());
        assert!(!mgr.is_in_room("lobby", "alice"));
        assert!(mgr.is_in_room("lobby", "bob"));
        assert_eq!(mgr.room_size("lobby"), 1);
    }

    #[test]
    fn leave_last_member_removes_room() {
        let mgr = manager();
        mgr.join("lobby", "alice", None).unwrap();

        mgr.leave("lobby", "alice");

        assert!(mgr.list_rooms().is_empty());
        assert_eq!(mgr.room_size("lobby"), 0);
    }

    #[test]
    fn leave_nonexistent_returns_none() {
        assert!(manager().leave("lobby", "alice").is_none());
    }

    // --- Presence ---

    #[test]
    fn set_and_get_presence() {
        let mgr = manager();
        mgr.join("doc:123", "alice", None).unwrap();
        let cursor = serde_json::json!({"cursor": {"x": 10, "y": 20}});

        let event = mgr.set_presence("doc:123", "alice", cursor.clone());
        assert!(event.is_some());

        let stored = mgr.get_presence("doc:123", "alice").unwrap();
        assert_eq!(stored, cursor);
    }

    #[test]
    fn presence_not_in_room_returns_none() {
        let mgr = manager();
        let updated = mgr.set_presence("lobby", "alice", serde_json::json!({}));
        assert!(updated.is_none());
        assert!(mgr.get_presence("lobby", "alice").is_none());
    }

    // --- Broadcast ---

    #[test]
    fn broadcast_to_room() {
        let mgr = manager();
        mgr.join("lobby", "alice", None).unwrap();

        let event = mgr.broadcast(
            "lobby",
            Some("alice"),
            "typing",
            serde_json::json!({"active": true}),
        );
        assert!(event.is_some());

        match event {
            Some(RoomEvent::Broadcast {
                topic,
                sender,
                data,
                ..
            }) => {
                assert_eq!(topic, "typing");
                assert_eq!(sender, Some("alice".to_string()));
                assert_eq!(data, serde_json::json!({"active": true}));
            }
            _ => panic!("expected Broadcast"),
        }
    }

    #[test]
    fn broadcast_to_nonexistent_room() {
        let event = manager().broadcast("ghost", None, "ping", serde_json::json!({}));
        assert!(event.is_none());
    }

    // --- Queries and bulk removal ---

    #[test]
    fn members_list() {
        let mgr = manager();
        mgr.join("lobby", "alice", Some(serde_json::json!({"role": "admin"})))
            .unwrap();
        mgr.join("lobby", "bob", None).unwrap();

        let members = mgr.members("lobby");
        assert_eq!(members.len(), 2);

        let ids: HashSet<String> = members.into_iter().map(|peer| peer.user_id).collect();
        assert!(ids.contains("alice"));
        assert!(ids.contains("bob"));
    }

    #[test]
    fn disconnect_removes_from_all_rooms() {
        let mgr = manager();
        mgr.join("lobby", "alice", None).unwrap();
        mgr.join("kitchen", "alice", None).unwrap();
        mgr.join("lobby", "bob", None).unwrap();

        let events = mgr.disconnect("alice");

        assert_eq!(events.len(), 2);
        assert!(!mgr.is_in_room("lobby", "alice"));
        assert!(!mgr.is_in_room("kitchen", "alice"));
        assert!(mgr.is_in_room("lobby", "bob"));
        // Alice was the only member of the kitchen, so the room itself is gone.
        assert!(!mgr.list_rooms().contains(&"kitchen".to_string()));
    }

    #[test]
    fn cleanup_idle_members() {
        // A zero timeout makes every member idle immediately.
        let mgr = RoomManager::new(0);
        mgr.join("lobby", "alice", None).unwrap();
        mgr.join("lobby", "bob", None).unwrap();

        let events = mgr.cleanup_idle();

        assert_eq!(events.len(), 2);
        assert!(mgr.list_rooms().is_empty());
    }

    #[test]
    fn user_rooms() {
        let mgr = manager();
        mgr.join("lobby", "alice", None).unwrap();
        mgr.join("kitchen", "alice", None).unwrap();
        mgr.join("lobby", "bob", None).unwrap();

        let mut alice_rooms = mgr.user_rooms("alice");
        alice_rooms.sort();
        assert_eq!(alice_rooms, vec!["kitchen", "lobby"]);
        assert_eq!(mgr.user_rooms("bob"), vec!["lobby"]);
        assert!(mgr.user_rooms("nobody").is_empty());
    }

    #[test]
    fn entity_scoped_room() {
        let mgr = manager();
        mgr.join(
            "Todo:t1",
            "alice",
            Some(serde_json::json!({"editing": true})),
        )
        .unwrap();
        mgr.join("Todo:t1", "bob", Some(serde_json::json!({"viewing": true})))
            .unwrap();
        assert_eq!(mgr.room_size("Todo:t1"), 2);

        mgr.set_presence("Todo:t1", "alice", serde_json::json!({"cursor": 42}));

        let stored = mgr.get_presence("Todo:t1", "alice").unwrap();
        assert_eq!(stored, serde_json::json!({"cursor": 42}));
    }

    #[test]
    fn rejoin_updates_data() {
        let mgr = manager();
        for version in [1, 2] {
            mgr.join("lobby", "alice", Some(serde_json::json!({"v": version})))
                .unwrap();
        }

        // The second join replaces the first entry rather than adding another one.
        assert_eq!(mgr.room_size("lobby"), 1);
        let members = mgr.members("lobby");
        assert_eq!(members[0].data, serde_json::json!({"v": 2}));
    }

    // --- Serialization ---

    #[test]
    fn room_event_serialization() {
        let event = RoomEvent::Join {
            room: "lobby".into(),
            user_id: "alice".into(),
            data: Some(serde_json::json!({"color": "red"})),
        };

        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"type\":\"join\""));
        assert!(json.contains("\"room\":\"lobby\""));

        let parsed: RoomEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(event, parsed);
    }

    #[test]
    fn broadcast_event_serialization() {
        let event = RoomEvent::Broadcast {
            room: "lobby".into(),
            sender: None,
            topic: "system".into(),
            data: serde_json::json!({"msg": "hello"}),
        };

        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"type\":\"broadcast\""));
        // A `None` sender is skipped entirely rather than written as `null`.
        assert!(!json.contains("\"sender\""));

        let parsed: RoomEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(event, parsed);
    }

    // --- Room limit tests ---

    #[test]
    fn max_rooms_enforced() {
        let mgr = manager_at_limit();

        // A third distinct room exceeds the limit of two.
        let result = mgr.join("room3", "charlie", None);
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert_eq!(err.code, "ROOM_LIMIT_REACHED");
        assert!(err.message.contains("2"));
    }

    #[test]
    fn joining_existing_room_at_limit_succeeds() {
        let mgr = manager_at_limit();

        // No new room is needed, so the limit does not apply.
        let result = mgr.join("room1", "charlie", None);
        assert!(result.is_ok());
        assert_eq!(mgr.room_size("room1"), 2);
    }

    #[test]
    fn room_limit_freed_after_leave() {
        let mgr = manager_at_limit();
        assert!(mgr.join("room3", "charlie", None).is_err());

        // The last member leaving removes room2 and frees a slot.
        mgr.leave("room2", "bob");

        assert!(mgr.join("room3", "charlie", None).is_ok());
    }

    #[test]
    fn default_max_rooms_is_10000() {
        let mgr = manager();
        assert_eq!(mgr.max_rooms, DEFAULT_MAX_ROOMS);
    }
}