Skip to main content

rbp_server/hosting/
casino.rs

1use super::*;
2use rbp_auth::Lurker;
3use rbp_core::ID;
4use rbp_gameroom::*;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::Mutex;
8use tokio::sync::RwLock;
9use tokio::sync::mpsc::UnboundedReceiver;
10use tokio::sync::mpsc::UnboundedSender;
11use tokio_postgres::Client;
12
13type Tx = UnboundedSender<String>;
14type Rx = Arc<Mutex<UnboundedReceiver<String>>>;
15
16/// Manages active game rooms and their lifecycles.
17pub struct Casino {
18    db: Arc<Client>,
19    rooms: RwLock<HashMap<ID<Room>, RoomHandle>>,
20}
21
22impl Casino {
23    pub fn new(db: Arc<Client>) -> Self {
24        Self {
25            db,
26            rooms: RwLock::new(HashMap::new()),
27        }
28    }
29}
30
31impl Casino {
32    /// Opens a new room with HTTP client vs Fish CPU.
33    /// Spawns the room task (waits for start signal) and returns the room ID.
34    pub async fn start(self: &Arc<Self>) -> anyhow::Result<ID<Room>> {
35        let id = ID::default();
36        let channels = RoomHandle::pair(id);
37        let mut room = Room::new(id, 2, self.db.clone());
38        self.db.create_room(&room).await?;
39        self.rooms.write().await.insert(id, channels.handle);
40        room.sit(channels.client, Lurker::default());
41        room.sit(Fish, Lurker::default());
42        tokio::spawn(room.run(channels.start, channels.done_tx));
43        let casino = self.clone();
44        tokio::spawn(async move {
45            let _ = channels.done_rx.await;
46            let _ = casino.close(id).await;
47            log::info!("[casino] room {} cleaned up", id);
48        });
49        log::debug!("[casino] created room {}", id);
50        Ok(id)
51    }
52    /// Closes a room and removes it from the casino.
53    pub async fn close(&self, id: ID<Room>) -> anyhow::Result<()> {
54        self.rooms
55            .write()
56            .await
57            .remove(&id)
58            .map(|_| ())
59            .ok_or_else(|| anyhow::anyhow!("room not found"))
60    }
61    /// Gets channel endpoints and start signal for WebSocket bridging.
62    pub async fn channels(
63        &self,
64        id: ID<Room>,
65    ) -> anyhow::Result<(Tx, Rx, Option<tokio::sync::oneshot::Sender<()>>)> {
66        self.rooms
67            .write()
68            .await
69            .get_mut(&id)
70            .map(|h| (h.tx.clone(), h.rx.clone(), h.start.take()))
71            .ok_or_else(|| anyhow::anyhow!("room not found"))
72    }
73    /// Spawns WebSocket bridge between client and room channels.
74    /// Sends start signal to room when first client connects.
75    pub async fn bridge(
76        &self,
77        id: ID<Room>,
78        mut session: actix_ws::Session,
79        mut streams: actix_ws::MessageStream,
80    ) -> anyhow::Result<()> {
81        use futures::StreamExt;
82        let (tx, rx, start) = self.channels(id).await?;
83        session
84            .text(ServerMessage::connected(&id.to_string(), 0).to_json())
85            .await
86            .map_err(|e| anyhow::anyhow!("{}", e))?;
87        start.map(|s| s.send(()));
88        log::debug!("[bridge {}] connected", id);
89        actix_web::rt::spawn(async move {
90            'sesh: loop {
91                tokio::select! {
92                    biased;
93                    msg = async { rx.lock().await.recv().await } => match msg {
94                        Some(json) => if session.text(json).await.is_err() { break 'sesh },
95                        None => break 'sesh,
96                    },
97                    msg = streams.next() => match msg {
98                        Some(Ok(actix_ws::Message::Text(text))) => if tx.send(text.to_string()).is_err() { break 'sesh },
99                        Some(Ok(actix_ws::Message::Close(_))) => break 'sesh,
100                        Some(Err(_)) => break 'sesh,
101                        None => break 'sesh,
102                        _ => continue 'sesh,
103                    },
104                }
105            }
106            log::debug!("[bridge {}] disconnected", id);
107        });
108        Ok(())
109    }
110}