Skip to main content

freezeout_server/
table.rs

1// Copyright (C) 2025 Vince Vasta
2// SPDX-License-Identifier: Apache-2.0
3
4//! Table implementation.
5use anyhow::Result;
6use log::{error, info};
7use std::{sync::Arc, time::Duration};
8use tokio::{
9    sync::{broadcast, mpsc, oneshot},
10    time,
11};
12
13use freezeout_core::{
14    crypto::{PeerId, SigningKey},
15    message::SignedMessage,
16    poker::{Chips, TableId},
17};
18
19use crate::db::Db;
20
21mod player;
22mod state;
23
24pub use state::TableJoinError;
25
26/// Table state shared by all players who joined the table.
27#[derive(Debug)]
28pub struct Table {
29    /// Channel for sending commands.
30    commands_tx: mpsc::Sender<TableCommand>,
31    /// This table id.
32    table_id: TableId,
33}
34
35/// A message sent to player connections.
36#[derive(Debug)]
37pub enum TableMessage {
38    /// Sends a message to a client.
39    Send(SignedMessage),
40    /// Tell the client to leave the table.
41    PlayerLeft,
42    /// Tell the client to introduce a delay between messages.
43    Throttle(Duration),
44    /// Close a client connection.
45    Close,
46}
47
48/// Command for the table task.
49#[derive(Debug)]
50enum TableCommand {
51    /// Join this table.
52    TryJoin {
53        player_id: PeerId,
54        nickname: String,
55        join_chips: Chips,
56        table_tx: mpsc::Sender<TableMessage>,
57        resp_tx: oneshot::Sender<Result<(), state::TableJoinError>>,
58    },
59    /// Query if a player can join the table.
60    PlayerCanJoin { resp_tx: oneshot::Sender<bool> },
61    /// Leave this table.
62    Leave(PeerId),
63    /// Handle a player message.
64    Message(SignedMessage),
65}
66
67impl Table {
68    /// Creates a new table that manages players and game state.
69    pub fn new(
70        seats: usize,
71        sk: Arc<SigningKey>,
72        db: Db,
73        shutdown_broadcast_rx: broadcast::Receiver<()>,
74        shutdown_complete_tx: mpsc::Sender<()>,
75    ) -> Self {
76        // There must be at least 2 seats.
77        assert!(seats > 1);
78
79        let (commands_tx, commands_rx) = mpsc::channel(128);
80
81        let table_id = TableId::new_id();
82
83        let mut task = TableTask {
84            table_id,
85            seats,
86            sk,
87            db,
88            commands_rx,
89            shutdown_broadcast_rx,
90            _shutdown_complete_tx: shutdown_complete_tx,
91        };
92
93        tokio::spawn(async move {
94            if let Err(err) = task.run().await {
95                error!("Table {} error {err}", task.table_id);
96            }
97
98            info!("Table task for table {} stopped", task.table_id);
99        });
100
101        Self {
102            commands_tx,
103            table_id,
104        }
105    }
106
107    /// Returns this table id.
108    pub fn table_id(&self) -> TableId {
109        self.table_id
110    }
111
112    /// Checks if a player can join the table.
113    pub async fn player_can_join(&self) -> bool {
114        let (resp_tx, resp_rx) = oneshot::channel();
115
116        let res = self
117            .commands_tx
118            .send(TableCommand::PlayerCanJoin { resp_tx })
119            .await
120            .is_ok();
121        res && resp_rx.await.unwrap_or(false)
122    }
123
124    /// A player tried to join this table, returns true if the player joined.
125    pub async fn try_join(
126        &self,
127        player_id: &PeerId,
128        nickname: &str,
129        join_chips: Chips,
130        table_tx: mpsc::Sender<TableMessage>,
131    ) -> Result<(), TableJoinError> {
132        let (resp_tx, resp_rx) = oneshot::channel();
133
134        self.commands_tx
135            .send(TableCommand::TryJoin {
136                player_id: player_id.clone(),
137                nickname: nickname.to_string(),
138                join_chips,
139                table_tx,
140                resp_tx,
141            })
142            .await
143            .map_err(|_| TableJoinError::Unknown)?;
144
145        resp_rx.await.map_err(|_| TableJoinError::Unknown)?
146    }
147
148    /// A player leaves the table.
149    pub async fn leave(&self, player_id: &PeerId) {
150        let _ = self
151            .commands_tx
152            .send(TableCommand::Leave(player_id.clone()))
153            .await;
154    }
155
156    /// Handle a message from a player.
157    pub async fn message(&self, msg: SignedMessage) {
158        let _ = self.commands_tx.send(TableCommand::Message(msg)).await;
159    }
160}
161
162struct TableTask {
163    /// This table identifie.
164    table_id: TableId,
165    /// Table seats.
166    seats: usize,
167    /// Table key.
168    sk: Arc<SigningKey>,
169    /// Game db.
170    db: Db,
171    /// Channel for receiving table commands.
172    commands_rx: mpsc::Receiver<TableCommand>,
173    /// Channel for listening shutdown notification.
174    shutdown_broadcast_rx: broadcast::Receiver<()>,
175    /// Sender that drops when this connection is done.
176    _shutdown_complete_tx: mpsc::Sender<()>,
177}
178
179impl TableTask {
180    async fn run(&mut self) -> Result<()> {
181        let mut state =
182            state::State::new(self.table_id, self.seats, self.sk.clone(), self.db.clone());
183        let mut ticks = time::interval(Duration::from_millis(500));
184
185        loop {
186            tokio::select! {
187                // Server is shutting down exit this handler.
188                _ = self.shutdown_broadcast_rx.recv() => break Ok(()),
189                _ = ticks.tick() => {
190                    state.tick().await;
191                }
192                // We have received a message from the client.
193                res = self.commands_rx.recv() => match res {
194                    Some(TableCommand::TryJoin{ player_id, nickname, join_chips, table_tx, resp_tx }) => {
195                        let res = state.try_join(&player_id, &nickname, join_chips, table_tx).await;
196                        let _ = resp_tx.send(res);
197                    }
198                    Some(TableCommand::PlayerCanJoin { resp_tx }) => {
199                        let res = state.player_can_join();
200                        let _ = resp_tx.send(res);
201                    }
202                    Some(TableCommand::Leave(peer_id)) => {
203                        state.leave(&peer_id).await;
204                    }
205                    Some(TableCommand::Message(msg)) => {
206                        state.message(msg).await;
207
208                    }
209                    None => break Ok(()),
210                },
211            }
212        }
213    }
214}