freezeout_server/
tables_pool.rs1use anyhow::Result;
6use log::error;
7use std::{collections::VecDeque, sync::Arc};
8use thiserror::Error;
9use tokio::sync::{Mutex, broadcast, mpsc};
10
11use freezeout_core::{
12 crypto::{PeerId, SigningKey},
13 poker::Chips,
14};
15
16use crate::{
17 db::Db,
18 table::{Table, TableJoinError, TableMessage},
19};
20
21#[derive(Error, Debug)]
23pub enum TablesPoolsError {
24 #[error("no tables left")]
26 NoTablesLeft,
27 #[error("player already joined")]
29 AlreadyJoined,
30}
31
32#[derive(Debug, Clone)]
34pub struct TablesPool(Arc<Mutex<Shared>>);
35
36#[derive(Debug)]
37struct Shared {
38 avail: VecDeque<Arc<Table>>,
39 full: VecDeque<Arc<Table>>,
40}
41
42impl TablesPool {
43 pub fn new(
45 tables: usize,
46 seats: usize,
47 sk: Arc<SigningKey>,
48 db: Db,
49 shutdown_broadcast_tx: &broadcast::Sender<()>,
50 shutdown_complete_tx: &mpsc::Sender<()>,
51 ) -> Self {
52 let avail = (0..tables)
53 .map(|_| {
54 Arc::new(Table::new(
55 seats,
56 sk.clone(),
57 db.clone(),
58 shutdown_broadcast_tx.subscribe(),
59 shutdown_complete_tx.clone(),
60 ))
61 })
62 .collect();
63
64 let state = Shared {
65 avail,
66 full: VecDeque::with_capacity(tables),
67 };
68
69 Self(Arc::new(Mutex::new(state)))
70 }
71
72 pub async fn join(
74 &self,
75 player_id: &PeerId,
76 nickname: &str,
77 join_chips: Chips,
78 table_tx: mpsc::Sender<TableMessage>,
79 ) -> Result<Arc<Table>, TablesPoolsError> {
80 let mut pool = self.0.lock().await;
81
82 if pool.avail.is_empty() {
84 for _ in 0..pool.full.len() {
85 if let Some(table) = pool.full.pop_front() {
86 if table.player_can_join().await {
87 pool.avail.push_back(table);
88 } else {
89 pool.full.push_back(table);
90 }
91 }
92 }
93 }
94
95 if let Some(table) = pool.avail.front() {
96 let res = table
97 .try_join(player_id, nickname, join_chips, table_tx.clone())
98 .await;
99 match res {
100 Err(TableJoinError::AlreadyJoined) => {
101 return Err(TablesPoolsError::AlreadyJoined);
102 }
103 Err(_) => {
104 return Err(TablesPoolsError::NoTablesLeft);
105 }
106 _ => {}
107 };
108
109 if !table.player_can_join().await {
111 let table = pool.avail.pop_front().unwrap();
112 pool.full.push_back(table.clone());
113 Ok(table)
114 } else {
115 Ok(table.clone())
116 }
117 } else {
118 Err(TablesPoolsError::NoTablesLeft)
119 }
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use super::*;
126 use freezeout_core::poker::TableId;
127
128 struct TestPool {
129 pool: TablesPool,
130 _shutdown_broadcast_tx: broadcast::Sender<()>,
131 _shutdown_complete_rx: mpsc::Receiver<()>,
132 }
133
134 impl TestPool {
135 fn new(n: usize) -> Self {
136 let sk = SigningKey::default();
137 let db = Db::open_in_memory().unwrap();
138 let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);
139 let (shutdown_broadcast_tx, _) = broadcast::channel(1);
140 let pool = TablesPool::new(
141 n,
142 2,
143 Arc::new(sk),
144 db,
145 &shutdown_broadcast_tx,
146 &shutdown_complete_tx,
147 );
148
149 Self {
150 pool,
151 _shutdown_broadcast_tx: shutdown_broadcast_tx,
152 _shutdown_complete_rx: shutdown_complete_rx,
153 }
154 }
155
156 async fn join(&self, p: &TestPlayer) -> Option<Arc<Table>> {
157 self.pool
158 .join(&p.peer_id, "nn", Chips::new(1_000_000), p.tx.clone())
159 .await
160 .ok()
161 }
162
163 async fn avail_ids(&self) -> Vec<TableId> {
164 let pool = self.pool.0.lock().await;
165 pool.avail.iter().map(|t| t.table_id()).collect()
166 }
167
168 async fn count_avail(&self) -> usize {
169 let pool = self.pool.0.lock().await;
170 pool.avail.len()
171 }
172
173 async fn full_ids(&self) -> Vec<TableId> {
174 let pool = self.pool.0.lock().await;
175 pool.full.iter().map(|t| t.table_id()).collect()
176 }
177
178 async fn count_full(&self) -> usize {
179 let pool = self.pool.0.lock().await;
180 pool.full.len()
181 }
182 }
183
184 struct TestPlayer {
185 tx: mpsc::Sender<TableMessage>,
186 _rx: mpsc::Receiver<TableMessage>,
187 peer_id: PeerId,
188 }
189
190 impl TestPlayer {
191 fn new() -> Self {
192 let sk = SigningKey::default();
193 let peer_id = sk.verifying_key().peer_id();
194 let (tx, rx) = mpsc::channel(64);
195 Self {
196 tx,
197 _rx: rx,
198 peer_id,
199 }
200 }
201 }
202
203 #[tokio::test]
204 async fn test_table_pool() {
205 let tp = TestPool::new(2);
206 let tids = tp.avail_ids().await;
207
208 let p1 = TestPlayer::new();
210 let t1 = tp.join(&p1).await.unwrap();
211 assert_eq!(t1.table_id(), tids[0]);
212
213 let p2 = TestPlayer::new();
215 let t1 = tp.join(&p2).await.unwrap();
216 assert_eq!(t1.table_id(), tids[0]);
217
218 let tids = tp.full_ids().await;
220 assert_eq!(t1.table_id(), tids[0]);
221
222 let tids = tp.avail_ids().await;
224 let t2 = tp.join(&p1).await.unwrap();
225 assert_eq!(t2.table_id(), tids[0]);
226
227 let t2 = tp.join(&p2).await.unwrap();
229 assert_eq!(t2.table_id(), tids[0]);
230
231 let p3 = TestPlayer::new();
233 assert!(tp.join(&p3).await.is_none());
234
235 t1.leave(&p2.peer_id).await;
239
240 let t2 = tp.join(&p1).await.unwrap();
243 let tids = tp.avail_ids().await;
244 assert_eq!(t2.table_id(), tids[0]);
245
246 let t2 = tp.join(&p2).await.unwrap();
248 assert_eq!(t2.table_id(), tids[0]);
249 }
250
251 #[tokio::test]
252 async fn test_big_pool() {
253 const N: usize = 1_000;
254 let tp = TestPool::new(N);
255
256 let mut players = Vec::with_capacity(N * 2);
258 for _ in 0..N * 2 {
259 let p = TestPlayer::new();
260 let t = tp.join(&p).await.unwrap();
261 players.push((p, t));
262 }
263
264 assert_eq!(tp.count_avail().await, 0);
265 assert_eq!(tp.count_full().await, N);
266
267 for (p, t) in players {
269 t.leave(&p.peer_id).await;
270 }
271
272 let p = TestPlayer::new();
274 tp.join(&p).await.unwrap();
275
276 assert_eq!(tp.count_avail().await, N);
277 assert_eq!(tp.count_full().await, 0);
278
279 let p = TestPlayer::new();
281 tp.join(&p).await.unwrap();
282 assert_eq!(tp.count_avail().await, N - 1);
283 assert_eq!(tp.count_full().await, 1);
284 }
285}