Skip to main content

freezeout_server/
tables_pool.rs

// Copyright (C) 2025 Vince Vasta
// SPDX-License-Identifier: Apache-2.0

//! Tables pool.
5use anyhow::Result;
6use log::error;
7use std::{collections::VecDeque, sync::Arc};
8use thiserror::Error;
9use tokio::sync::{Mutex, broadcast, mpsc};
10
11use freezeout_core::{
12    crypto::{PeerId, SigningKey},
13    poker::Chips,
14};
15
16use crate::{
17    db::Db,
18    table::{Table, TableJoinError, TableMessage},
19};
20
21/// An error from table join operations.
22#[derive(Error, Debug)]
23pub enum TablesPoolsError {
24    /// All tables are busy.
25    #[error("no tables left")]
26    NoTablesLeft,
27    /// The player has already joined the table.
28    #[error("player already joined")]
29    AlreadyJoined,
30}
31
32/// A pool of tables players can join.
33#[derive(Debug, Clone)]
34pub struct TablesPool(Arc<Mutex<Shared>>);
35
36#[derive(Debug)]
37struct Shared {
38    avail: VecDeque<Arc<Table>>,
39    full: VecDeque<Arc<Table>>,
40}
41
42impl TablesPool {
43    /// Creates a new table pool.
44    pub fn new(
45        tables: usize,
46        seats: usize,
47        sk: Arc<SigningKey>,
48        db: Db,
49        shutdown_broadcast_tx: &broadcast::Sender<()>,
50        shutdown_complete_tx: &mpsc::Sender<()>,
51    ) -> Self {
52        let avail = (0..tables)
53            .map(|_| {
54                Arc::new(Table::new(
55                    seats,
56                    sk.clone(),
57                    db.clone(),
58                    shutdown_broadcast_tx.subscribe(),
59                    shutdown_complete_tx.clone(),
60                ))
61            })
62            .collect();
63
64        let state = Shared {
65            avail,
66            full: VecDeque::with_capacity(tables),
67        };
68
69        Self(Arc::new(Mutex::new(state)))
70    }
71
72    /// Try to join a table in the pool.
73    pub async fn join(
74        &self,
75        player_id: &PeerId,
76        nickname: &str,
77        join_chips: Chips,
78        table_tx: mpsc::Sender<TableMessage>,
79    ) -> Result<Arc<Table>, TablesPoolsError> {
80        let mut pool = self.0.lock().await;
81
82        // If there are no available tables try to find them.
83        if pool.avail.is_empty() {
84            for _ in 0..pool.full.len() {
85                if let Some(table) = pool.full.pop_front() {
86                    if table.player_can_join().await {
87                        pool.avail.push_back(table);
88                    } else {
89                        pool.full.push_back(table);
90                    }
91                }
92            }
93        }
94
95        if let Some(table) = pool.avail.front() {
96            let res = table
97                .try_join(player_id, nickname, join_chips, table_tx.clone())
98                .await;
99            match res {
100                Err(TableJoinError::AlreadyJoined) => {
101                    return Err(TablesPoolsError::AlreadyJoined);
102                }
103                Err(_) => {
104                    return Err(TablesPoolsError::NoTablesLeft);
105                }
106                _ => {}
107            };
108
109            // If no other player can join the table move it to the full queue.
110            if !table.player_can_join().await {
111                let table = pool.avail.pop_front().unwrap();
112                pool.full.push_back(table.clone());
113                Ok(table)
114            } else {
115                Ok(table.clone())
116            }
117        } else {
118            Err(TablesPoolsError::NoTablesLeft)
119        }
120    }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use freezeout_core::poker::TableId;

    /// Collects the ids of the tables in `queue`, front to back.
    fn ids_of(queue: &VecDeque<Arc<Table>>) -> Vec<TableId> {
        queue.iter().map(|table| table.table_id()).collect()
    }

    /// A pool of two-seat tables together with the channel ends the tables
    /// are wired to, kept alive for the duration of a test.
    struct TestPool {
        pool: TablesPool,
        _shutdown_broadcast_tx: broadcast::Sender<()>,
        _shutdown_complete_rx: mpsc::Receiver<()>,
    }

    impl TestPool {
        /// Builds a pool with `n` tables of two seats each on an in-memory
        /// database.
        fn new(n: usize) -> Self {
            let (shutdown_broadcast_tx, _) = broadcast::channel(1);
            let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);

            let pool = TablesPool::new(
                n,
                2,
                Arc::new(SigningKey::default()),
                Db::open_in_memory().unwrap(),
                &shutdown_broadcast_tx,
                &shutdown_complete_tx,
            );

            Self {
                pool,
                _shutdown_broadcast_tx: shutdown_broadcast_tx,
                _shutdown_complete_rx: shutdown_complete_rx,
            }
        }

        /// Joins the pool as player `p`, returning `None` on any error.
        async fn join(&self, p: &TestPlayer) -> Option<Arc<Table>> {
            let joined = self
                .pool
                .join(&p.peer_id, "nn", Chips::new(1_000_000), p.tx.clone())
                .await;
            joined.ok()
        }

        /// Ids of the tables in the available queue, front to back.
        async fn avail_ids(&self) -> Vec<TableId> {
            let shared = self.pool.0.lock().await;
            ids_of(&shared.avail)
        }

        /// Number of tables in the available queue.
        async fn count_avail(&self) -> usize {
            self.pool.0.lock().await.avail.len()
        }

        /// Ids of the tables in the full queue, front to back.
        async fn full_ids(&self) -> Vec<TableId> {
            let shared = self.pool.0.lock().await;
            ids_of(&shared.full)
        }

        /// Number of tables in the full queue.
        async fn count_full(&self) -> usize {
            self.pool.0.lock().await.full.len()
        }
    }

    /// A player identity with the channel a table uses to reach it.
    struct TestPlayer {
        tx: mpsc::Sender<TableMessage>,
        _rx: mpsc::Receiver<TableMessage>,
        peer_id: PeerId,
    }

    impl TestPlayer {
        /// Creates a player with a fresh default signing key and a channel
        /// whose receiver is kept alive alongside the sender.
        fn new() -> Self {
            let (tx, _rx) = mpsc::channel(64);
            let peer_id = SigningKey::default().verifying_key().peer_id();
            Self { tx, _rx, peer_id }
        }
    }

    #[tokio::test]
    async fn test_table_pool() {
        let tp = TestPool::new(2);
        let initial_avail = tp.avail_ids().await;

        // Player 1 joins the table at the front of the available queue.
        let p1 = TestPlayer::new();
        let first = tp.join(&p1).await.unwrap();
        assert_eq!(first.table_id(), initial_avail[0]);

        // Player 2 joins the same table.
        let p2 = TestPlayer::new();
        let first = tp.join(&p2).await.unwrap();
        assert_eq!(first.table_id(), initial_avail[0]);

        // The table is now full, so it should have moved to the full queue.
        let full = tp.full_ids().await;
        assert_eq!(first.table_id(), full[0]);

        // Player 1 joins the second table, which should now be at the front
        // of the available queue.
        let avail = tp.avail_ids().await;
        let second = tp.join(&p1).await.unwrap();
        assert_eq!(second.table_id(), avail[0]);

        // Player 2 joins the second table too.
        let second = tp.join(&p2).await.unwrap();
        assert_eq!(second.table_id(), avail[0]);

        // Player 3 tries to join but there are no tables left.
        let p3 = TestPlayer::new();
        assert!(tp.join(&p3).await.is_none());

        // Player 2 leaves the first table, which becomes ready again because
        // with one player left the game ends (2 seats per table). The table
        // moves back to the available queue only when a player tries to join.
        first.leave(&p2.peer_id).await;

        // Player 1 joins again; note that it is the join operation that moves
        // tables between the queues, so the ids are read after the join.
        let rejoined = tp.join(&p1).await.unwrap();
        let avail = tp.avail_ids().await;
        assert_eq!(rejoined.table_id(), avail[0]);

        // Player 2 joins the same table.
        let rejoined = tp.join(&p2).await.unwrap();
        assert_eq!(rejoined.table_id(), avail[0]);
    }

    #[tokio::test]
    async fn test_big_pool() {
        const N: usize = 1_000;
        let tp = TestPool::new(N);

        // With two seats per table, 2 * N players should all find a seat.
        let mut seated = Vec::with_capacity(N * 2);
        for _ in 0..N * 2 {
            let player = TestPlayer::new();
            let table = tp.join(&player).await.unwrap();
            seated.push((player, table));
        }

        assert_eq!(tp.count_avail().await, 0);
        assert_eq!(tp.count_full().await, N);

        // Every player leaves their table.
        for (player, table) in seated {
            table.leave(&player.peer_id).await;
        }

        // A single join triggers the rescan that moves all the tables back to
        // the available queue.
        let newcomer = TestPlayer::new();
        tp.join(&newcomer).await.unwrap();

        assert_eq!(tp.count_avail().await, N);
        assert_eq!(tp.count_full().await, 0);

        // A second player fills the first table, which moves to the full
        // queue.
        let newcomer = TestPlayer::new();
        tp.join(&newcomer).await.unwrap();
        assert_eq!(tp.count_avail().await, N - 1);
        assert_eq!(tp.count_full().await, 1);
    }
}