rbp_server/hosting/
casino.rs1use super::*;
2use rbp_auth::Lurker;
3use rbp_core::ID;
4use rbp_gameroom::*;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::Mutex;
8use tokio::sync::RwLock;
9use tokio::sync::mpsc::UnboundedReceiver;
10use tokio::sync::mpsc::UnboundedSender;
11use tokio_postgres::Client;
12
13type Tx = UnboundedSender<String>;
14type Rx = Arc<Mutex<UnboundedReceiver<String>>>;
15
16pub struct Casino {
18 db: Arc<Client>,
19 rooms: RwLock<HashMap<ID<Room>, RoomHandle>>,
20}
21
22impl Casino {
23 pub fn new(db: Arc<Client>) -> Self {
24 Self {
25 db,
26 rooms: RwLock::new(HashMap::new()),
27 }
28 }
29}
30
31impl Casino {
32 pub async fn start(self: &Arc<Self>) -> anyhow::Result<ID<Room>> {
35 let id = ID::default();
36 let channels = RoomHandle::pair(id);
37 let mut room = Room::new(id, 2, self.db.clone());
38 self.db.create_room(&room).await?;
39 self.rooms.write().await.insert(id, channels.handle);
40 room.sit(channels.client, Lurker::default());
41 room.sit(Fish, Lurker::default());
42 tokio::spawn(room.run(channels.start, channels.done_tx));
43 let casino = self.clone();
44 tokio::spawn(async move {
45 let _ = channels.done_rx.await;
46 let _ = casino.close(id).await;
47 log::info!("[casino] room {} cleaned up", id);
48 });
49 log::debug!("[casino] created room {}", id);
50 Ok(id)
51 }
52 pub async fn close(&self, id: ID<Room>) -> anyhow::Result<()> {
54 self.rooms
55 .write()
56 .await
57 .remove(&id)
58 .map(|_| ())
59 .ok_or_else(|| anyhow::anyhow!("room not found"))
60 }
61 pub async fn channels(
63 &self,
64 id: ID<Room>,
65 ) -> anyhow::Result<(Tx, Rx, Option<tokio::sync::oneshot::Sender<()>>)> {
66 self.rooms
67 .write()
68 .await
69 .get_mut(&id)
70 .map(|h| (h.tx.clone(), h.rx.clone(), h.start.take()))
71 .ok_or_else(|| anyhow::anyhow!("room not found"))
72 }
73 pub async fn bridge(
76 &self,
77 id: ID<Room>,
78 mut session: actix_ws::Session,
79 mut streams: actix_ws::MessageStream,
80 ) -> anyhow::Result<()> {
81 use futures::StreamExt;
82 let (tx, rx, start) = self.channels(id).await?;
83 session
84 .text(ServerMessage::connected(&id.to_string(), 0).to_json())
85 .await
86 .map_err(|e| anyhow::anyhow!("{}", e))?;
87 start.map(|s| s.send(()));
88 log::debug!("[bridge {}] connected", id);
89 actix_web::rt::spawn(async move {
90 'sesh: loop {
91 tokio::select! {
92 biased;
93 msg = async { rx.lock().await.recv().await } => match msg {
94 Some(json) => if session.text(json).await.is_err() { break 'sesh },
95 None => break 'sesh,
96 },
97 msg = streams.next() => match msg {
98 Some(Ok(actix_ws::Message::Text(text))) => if tx.send(text.to_string()).is_err() { break 'sesh },
99 Some(Ok(actix_ws::Message::Close(_))) => break 'sesh,
100 Some(Err(_)) => break 'sesh,
101 None => break 'sesh,
102 _ => continue 'sesh,
103 },
104 }
105 }
106 log::debug!("[bridge {}] disconnected", id);
107 });
108 Ok(())
109 }
110}