freezeout_server/
table.rs1use anyhow::Result;
6use log::{error, info};
7use std::{sync::Arc, time::Duration};
8use tokio::{
9 sync::{broadcast, mpsc, oneshot},
10 time,
11};
12
13use freezeout_core::{
14 crypto::{PeerId, SigningKey},
15 message::SignedMessage,
16 poker::{Chips, TableId},
17};
18
19use crate::db::Db;
20
21mod player;
22mod state;
23
24pub use state::TableJoinError;
25
/// Handle used to talk to a table's background task.
///
/// The handle does not hold any table state itself: every operation is turned
/// into a `TableCommand` and sent over `commands_tx` to the task spawned by
/// [`Table::new`].
#[derive(Debug)]
pub struct Table {
    /// Sending half of the command channel read by the table task.
    commands_tx: mpsc::Sender<TableCommand>,
    /// Identifier generated for this table when it was created.
    table_id: TableId,
}
34
/// Message type carried by the `table_tx` channel a player hands to the table
/// when joining (see [`Table::try_join`]).
///
/// NOTE(review): the variants are produced and consumed outside this file; the
/// descriptions below are inferred from their names and payloads only.
#[derive(Debug)]
pub enum TableMessage {
    /// Carries a signed message; presumably one to be delivered to the player.
    Send(SignedMessage),
    /// Presumably notifies that the player has left the table; confirm against the consumer.
    PlayerLeft,
    /// Carries a duration; presumably a delay to apply before handling further messages.
    Throttle(Duration),
    /// Presumably asks the receiver to close; confirm against the consumer.
    Close,
}
47
/// Command sent from a [`Table`] handle to its background task.
#[derive(Debug)]
enum TableCommand {
    /// Request to join the table; the task forwards the fields to the table
    /// state's `try_join` and sends the result back on `resp_tx`.
    TryJoin {
        /// Identifier of the joining player.
        player_id: PeerId,
        /// Nickname of the joining player.
        nickname: String,
        /// Chips passed along with the join request.
        join_chips: Chips,
        /// Channel handed to the table state for this player.
        table_tx: mpsc::Sender<TableMessage>,
        /// One-shot channel on which the join result is returned.
        resp_tx: oneshot::Sender<Result<(), state::TableJoinError>>,
    },
    /// Query answered with the table state's `player_can_join` value on `resp_tx`.
    PlayerCanJoin { resp_tx: oneshot::Sender<bool> },
    /// Forwarded to the table state's `leave` for the given player.
    Leave(PeerId),
    /// Forwarded to the table state's `message`.
    Message(SignedMessage),
}
66
67impl Table {
68 pub fn new(
70 seats: usize,
71 sk: Arc<SigningKey>,
72 db: Db,
73 shutdown_broadcast_rx: broadcast::Receiver<()>,
74 shutdown_complete_tx: mpsc::Sender<()>,
75 ) -> Self {
76 assert!(seats > 1);
78
79 let (commands_tx, commands_rx) = mpsc::channel(128);
80
81 let table_id = TableId::new_id();
82
83 let mut task = TableTask {
84 table_id,
85 seats,
86 sk,
87 db,
88 commands_rx,
89 shutdown_broadcast_rx,
90 _shutdown_complete_tx: shutdown_complete_tx,
91 };
92
93 tokio::spawn(async move {
94 if let Err(err) = task.run().await {
95 error!("Table {} error {err}", task.table_id);
96 }
97
98 info!("Table task for table {} stopped", task.table_id);
99 });
100
101 Self {
102 commands_tx,
103 table_id,
104 }
105 }
106
107 pub fn table_id(&self) -> TableId {
109 self.table_id
110 }
111
112 pub async fn player_can_join(&self) -> bool {
114 let (resp_tx, resp_rx) = oneshot::channel();
115
116 let res = self
117 .commands_tx
118 .send(TableCommand::PlayerCanJoin { resp_tx })
119 .await
120 .is_ok();
121 res && resp_rx.await.unwrap_or(false)
122 }
123
124 pub async fn try_join(
126 &self,
127 player_id: &PeerId,
128 nickname: &str,
129 join_chips: Chips,
130 table_tx: mpsc::Sender<TableMessage>,
131 ) -> Result<(), TableJoinError> {
132 let (resp_tx, resp_rx) = oneshot::channel();
133
134 self.commands_tx
135 .send(TableCommand::TryJoin {
136 player_id: player_id.clone(),
137 nickname: nickname.to_string(),
138 join_chips,
139 table_tx,
140 resp_tx,
141 })
142 .await
143 .map_err(|_| TableJoinError::Unknown)?;
144
145 resp_rx.await.map_err(|_| TableJoinError::Unknown)?
146 }
147
148 pub async fn leave(&self, player_id: &PeerId) {
150 let _ = self
151 .commands_tx
152 .send(TableCommand::Leave(player_id.clone()))
153 .await;
154 }
155
156 pub async fn message(&self, msg: SignedMessage) {
158 let _ = self.commands_tx.send(TableCommand::Message(msg)).await;
159 }
160}
161
/// State owned by the background task spawned in [`Table::new`].
struct TableTask {
    /// Identifier of the table this task serves.
    table_id: TableId,
    /// Number of seats passed to the table state.
    seats: usize,
    /// Signing key passed to the table state.
    sk: Arc<SigningKey>,
    /// Database handle passed to the table state.
    db: Db,
    /// Receiving half of the command channel fed by the [`Table`] handle.
    commands_rx: mpsc::Receiver<TableCommand>,
    /// Shutdown notification; the task loop exits as soon as `recv` on it resolves.
    shutdown_broadcast_rx: broadcast::Receiver<()>,
    /// Never read in this file; it is only kept alive for as long as the task.
    /// NOTE(review): presumably dropping it signals shutdown completion to the
    /// owner of the receiving half; confirm against the caller.
    _shutdown_complete_tx: mpsc::Sender<()>,
}
178
179impl TableTask {
180 async fn run(&mut self) -> Result<()> {
181 let mut state =
182 state::State::new(self.table_id, self.seats, self.sk.clone(), self.db.clone());
183 let mut ticks = time::interval(Duration::from_millis(500));
184
185 loop {
186 tokio::select! {
187 _ = self.shutdown_broadcast_rx.recv() => break Ok(()),
189 _ = ticks.tick() => {
190 state.tick().await;
191 }
192 res = self.commands_rx.recv() => match res {
194 Some(TableCommand::TryJoin{ player_id, nickname, join_chips, table_tx, resp_tx }) => {
195 let res = state.try_join(&player_id, &nickname, join_chips, table_tx).await;
196 let _ = resp_tx.send(res);
197 }
198 Some(TableCommand::PlayerCanJoin { resp_tx }) => {
199 let res = state.player_can_join();
200 let _ = resp_tx.send(res);
201 }
202 Some(TableCommand::Leave(peer_id)) => {
203 state.leave(&peer_id).await;
204 }
205 Some(TableCommand::Message(msg)) => {
206 state.message(msg).await;
207
208 }
209 None => break Ok(()),
210 },
211 }
212 }
213 }
214}