use anyhow::Result;
use log::{error, info};
use std::{sync::Arc, time::Duration};
use tokio::{
sync::{broadcast, mpsc, oneshot},
time,
};
use freezeout_core::{
crypto::{PeerId, SigningKey},
message::SignedMessage,
poker::{Chips, TableId},
};
use crate::db::Db;
mod player;
mod state;
use state::State;
#[derive(Debug)]
pub struct Table {
commands_tx: mpsc::Sender<TableCommand>,
table_id: TableId,
}
#[derive(Debug)]
pub enum TableMessage {
Send(SignedMessage),
PlayerLeft,
Throttle(Duration),
Close,
}
#[derive(Debug)]
enum TableCommand {
TryJoin {
player_id: PeerId,
nickname: String,
join_chips: Chips,
table_tx: mpsc::Sender<TableMessage>,
resp_tx: oneshot::Sender<Result<()>>,
},
HasGameStarted { resp_tx: oneshot::Sender<bool> },
IsEmpty { resp_tx: oneshot::Sender<bool> },
Leave(PeerId),
Message(SignedMessage),
}
impl Table {
pub fn new(
seats: usize,
sk: Arc<SigningKey>,
db: Db,
shutdown_broadcast_rx: broadcast::Receiver<()>,
shutdown_complete_tx: mpsc::Sender<()>,
) -> Self {
assert!(seats > 1);
let (commands_tx, commands_rx) = mpsc::channel(128);
let table_id = TableId::new_id();
let mut task = TableTask {
table_id,
seats,
sk,
db,
commands_rx,
shutdown_broadcast_rx,
_shutdown_complete_tx: shutdown_complete_tx,
};
tokio::spawn(async move {
if let Err(err) = task.run().await {
error!("Table {} error {err}", task.table_id);
}
info!("Table task for table {} stopped", task.table_id);
});
Self {
commands_tx,
table_id,
}
}
pub fn table_id(&self) -> TableId {
self.table_id
}
pub async fn has_game_started(&self) -> bool {
let (resp_tx, resp_rx) = oneshot::channel();
let res = self
.commands_tx
.send(TableCommand::HasGameStarted { resp_tx })
.await
.is_ok();
if !res {
false
} else {
resp_rx.await.unwrap_or(false)
}
}
pub async fn is_empty(&self) -> bool {
let (resp_tx, resp_rx) = oneshot::channel();
let res = self
.commands_tx
.send(TableCommand::IsEmpty { resp_tx })
.await
.is_ok();
if !res {
false
} else {
resp_rx.await.unwrap_or(false)
}
}
pub async fn try_join(
&self,
player_id: &PeerId,
nickname: &str,
join_chips: Chips,
table_tx: mpsc::Sender<TableMessage>,
) -> Result<()> {
let (resp_tx, resp_rx) = oneshot::channel();
self.commands_tx
.send(TableCommand::TryJoin {
player_id: player_id.clone(),
nickname: nickname.to_string(),
join_chips,
table_tx,
resp_tx,
})
.await?;
resp_rx.await?
}
pub async fn leave(&self, player_id: &PeerId) {
let _ = self
.commands_tx
.send(TableCommand::Leave(player_id.clone()))
.await;
}
pub async fn message(&self, msg: SignedMessage) {
let _ = self.commands_tx.send(TableCommand::Message(msg)).await;
}
}
struct TableTask {
table_id: TableId,
seats: usize,
sk: Arc<SigningKey>,
db: Db,
commands_rx: mpsc::Receiver<TableCommand>,
shutdown_broadcast_rx: broadcast::Receiver<()>,
_shutdown_complete_tx: mpsc::Sender<()>,
}
impl TableTask {
async fn run(&mut self) -> Result<()> {
let mut state = State::new(self.table_id, self.seats, self.sk.clone(), self.db.clone());
let mut ticks = time::interval(Duration::from_millis(500));
loop {
tokio::select! {
_ = self.shutdown_broadcast_rx.recv() => break Ok(()),
_ = ticks.tick() => {
state.tick().await;
}
res = self.commands_rx.recv() => match res {
Some(TableCommand::TryJoin{ player_id, nickname, join_chips, table_tx, resp_tx }) => {
let res = state.try_join(&player_id, &nickname, join_chips, table_tx).await;
let _ = resp_tx.send(res);
}
Some(TableCommand::HasGameStarted { resp_tx }) => {
let res = state.has_game_started();
let _ = resp_tx.send(res);
}
Some(TableCommand::IsEmpty { resp_tx }) => {
let res = state.is_empty();
let _ = resp_tx.send(res);
}
Some(TableCommand::Leave(peer_id)) => {
state.leave(&peer_id).await;
}
Some(TableCommand::Message(msg)) => {
state.message(msg).await;
}
None => break Ok(()),
},
}
}
}
}