private_poker/net/server.rs
1use anyhow::{Error, bail};
2use log::{debug, error, info, warn};
3use mio::{
4 Events, Interest, Poll, Token, Waker,
5 net::{TcpListener, TcpStream},
6};
7use serde::{Deserialize, Serialize};
8use std::{
9 cmp::max,
10 collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
11 io,
12 net::SocketAddr,
13 sync::mpsc::{Receiver, Sender, channel},
14 thread,
15 time::{Duration, Instant},
16};
17
18use super::{
19 super::{
20 UserError,
21 entities::Vote,
22 game::{
23 GameEvent, GameSettings, GameStateManagement, PhaseDependentUserManagement,
24 PhaseIndependentUserManagement, PokerState,
25 entities::{Action, ActionChoices, GameView, Username},
26 },
27 },
28 messages::{ClientError, ClientMessage, ServerMessage, UserCommand, UserState},
29 utils::{read_prefixed, write_prefixed},
30};
31
32pub const DEFAULT_ACTION_TIMEOUT: Duration = Duration::from_secs(30);
33pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
34pub const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(1);
35pub const DEFAULT_STEP_TIMEOUT: Duration = Duration::from_secs(5);
36pub const MAX_NETWORK_EVENTS_PER_USER: usize = 6;
37pub const SERVER: Token = Token(0);
38pub const WAKER: Token = Token(1);
39
40/// A server message for communication between poker server threads. This
41/// message is never sent directly to poker clients, but fields within the
42/// underlying variants are.
43#[derive(Debug, Deserialize, Serialize)]
44enum ServerData {
45 /// An acknowledgement of a client message, signaling that the client's
46 /// command was successfully processed by the game thread.
47 Ack(ClientMessage),
48 /// An acknowledgement of a server game event, signaling that an internal
49 /// event was processed during a state transition or as a result of a user's
50 /// action.
51 Event(GameEvent),
52 /// Signaling a specific user made an error.
53 UserError {
54 username: Username,
55 error: UserError,
56 },
57 /// Signaling it's a specific user's turn.
58 TurnSignal {
59 username: Username,
60 action_choices: ActionChoices,
61 },
62 /// Game state represented as a string.
63 Status(String),
64 /// Mapping of usernames to their game views.
65 Views(HashMap<Username, GameView>),
66}
67
68fn token_to_string(token: &Token) -> String {
69 let id = token.0;
70 format!("token({id})")
71}
72
73pub struct ServerTimeouts {
74 pub action: Duration,
75 pub connect: Duration,
76 pub poll: Duration,
77 pub step: Duration,
78}
79
80impl Default for ServerTimeouts {
81 fn default() -> Self {
82 Self {
83 action: DEFAULT_ACTION_TIMEOUT,
84 connect: DEFAULT_CONNECT_TIMEOUT,
85 poll: DEFAULT_POLL_TIMEOUT,
86 step: DEFAULT_STEP_TIMEOUT,
87 }
88 }
89}
90
91#[derive(Default)]
92pub struct PokerConfig {
93 pub game_settings: GameSettings,
94 pub server_timeouts: ServerTimeouts,
95}
96
97impl From<GameSettings> for PokerConfig {
98 fn from(value: GameSettings) -> Self {
99 let server_timeouts = ServerTimeouts::default();
100 Self {
101 game_settings: value,
102 server_timeouts,
103 }
104 }
105}
106
107impl From<ServerTimeouts> for PokerConfig {
108 fn from(value: ServerTimeouts) -> Self {
109 let game_config = GameSettings::default();
110 Self {
111 game_settings: game_config,
112 server_timeouts: value,
113 }
114 }
115}
116
117struct UnconfirmedClient {
118 stream: TcpStream,
119 t: Instant,
120 timeout: Duration,
121}
122
123impl UnconfirmedClient {
124 pub fn new(stream: TcpStream) -> Self {
125 Self {
126 stream,
127 t: Instant::now(),
128 timeout: Duration::ZERO,
129 }
130 }
131}
132
133/// This manager enables a few mechanisms:
134///
135/// - Helps keep tokens bounded, recycling unused tokens for future
136/// connections.
137/// - Associates tokens and usernames with clients, making it easier
138/// to read from and write to clients based on those attributes.
139/// - Tracks client connection states, dividing clients that haven't
140/// sent their username, clients that have sent their username but
141/// their usernames haven't been confirmed by the poker game, and
142/// clients that have sent their usernames and those usernames have
143/// been confirmed by the poker game.
144struct TokenManager {
145 pub confirmed_tokens: BTreeMap<Token, TcpStream>,
146 confirmed_usernames_to_tokens: HashMap<Username, Token>,
147 recycled_tokens: BTreeSet<Token>,
148 token_association_timeout: Duration,
149 tokens_to_usernames: BTreeMap<Token, Username>,
150 unconfirmed_tokens: BTreeMap<Token, UnconfirmedClient>,
151 unconfirmed_usernames_to_tokens: HashMap<Username, Token>,
152}
153
154impl TokenManager {
155 /// Associate a token with a TCP stream. Since tokens are usually registered
156 /// with a poll, the typical workflow is:
157 ///
158 /// 1. Create a new token.
159 /// 2. Register the token and stream with the poll.
160 /// 3. Associate the token and stream with the token manager.
161 ///
162 /// This transfers ownership of the stream to the token manager, allowing
163 /// deallocation of the stream wheenver the token is recycled.
164 pub fn associate_token_and_stream(&mut self, token: Token, stream: TcpStream) {
165 let unconfirmed_client = UnconfirmedClient::new(stream);
166 self.unconfirmed_tokens.insert(token, unconfirmed_client);
167 }
168
169 /// Associate a token with a username. This should be called in response
170 /// to a client declaring a username. This will catch cases where the username
171 /// is already taken, and cases where the client took too long to declare
172 /// a username after its connection has already been accepted by the server.
173 pub fn associate_token_and_username(
174 &mut self,
175 token: Token,
176 username: &Username,
177 ) -> Result<(), ClientError> {
178 if self.tokens_to_usernames.contains_key(&token)
179 || self.unconfirmed_usernames_to_tokens.contains_key(username)
180 || self.confirmed_usernames_to_tokens.contains_key(username)
181 {
182 Err(ClientError::AlreadyAssociated)
183 } else if self.recycled_tokens.contains(&token) {
184 Err(ClientError::Expired)
185 } else {
186 self.tokens_to_usernames.insert(token, username.clone());
187 self.unconfirmed_usernames_to_tokens
188 .insert(username.clone(), token);
189 Ok(())
190 }
191 }
192
193 /// Confirm a token's declared username. This acknowledges that the poker
194 /// game accepted their username and relieves the token from potential
195 /// expiration.
196 pub fn confirm_username(&mut self, token: Token) -> Result<(), ClientError> {
197 let username = self
198 .tokens_to_usernames
199 .get(&token)
200 .ok_or(ClientError::Unassociated)?;
201
202 let (username, token) = self
203 .unconfirmed_usernames_to_tokens
204 .remove_entry(username)
205 .ok_or(ClientError::Unassociated)?;
206
207 let unconfirmed_client = self
208 .unconfirmed_tokens
209 .remove(&token)
210 .expect("an unconfirmed username should correspond to an unconfirmed token");
211
212 self.confirmed_tokens
213 .insert(token, unconfirmed_client.stream);
214 self.confirmed_usernames_to_tokens.insert(username, token);
215
216 Ok(())
217 }
218
219 pub fn get_confirmed_username_with_token(&self, token: Token) -> Result<Username, ClientError> {
220 self.confirmed_tokens
221 .get(&token)
222 .and_then(|_| self.tokens_to_usernames.get(&token))
223 .cloned()
224 .ok_or(ClientError::Unassociated)
225 }
226
227 pub fn get_mut_stream_with_token(
228 &mut self,
229 token: &Token,
230 ) -> Result<&mut TcpStream, ClientError> {
231 if let Some(unconfirmed) = self.unconfirmed_tokens.get_mut(token) {
232 return Ok(&mut unconfirmed.stream);
233 }
234 if let Some(stream) = self.confirmed_tokens.get_mut(token) {
235 return Ok(stream);
236 }
237 Err(ClientError::DoesNotExist)
238 }
239
240 pub fn get_token_with_username(&self, username: &Username) -> Result<Token, ClientError> {
241 self.unconfirmed_usernames_to_tokens
242 .get(username)
243 .or_else(|| self.confirmed_usernames_to_tokens.get(username))
244 .copied()
245 .ok_or(ClientError::Unassociated)
246 }
247
248 pub fn new(token_association_timeout: Duration) -> Self {
249 Self {
250 confirmed_tokens: BTreeMap::new(),
251 confirmed_usernames_to_tokens: HashMap::new(),
252 recycled_tokens: BTreeSet::new(),
253 token_association_timeout,
254 tokens_to_usernames: BTreeMap::new(),
255 unconfirmed_tokens: BTreeMap::new(),
256 unconfirmed_usernames_to_tokens: HashMap::new(),
257 }
258 }
259
260 /// Create a new token.
261 pub fn new_token(&mut self) -> Token {
262 self.recycled_tokens.pop_first().unwrap_or_else(|| {
263 let newest = match (
264 self.unconfirmed_tokens.last_key_value(),
265 self.confirmed_tokens.last_key_value(),
266 ) {
267 (Some((unconfirmed, _)), Some((confirmed, _))) => max(unconfirmed, confirmed),
268 (Some((unconfirmed, _)), None) => unconfirmed,
269 (None, Some((verified, _))) => verified,
270 (None, None) => &WAKER,
271 };
272 Token(newest.0 + 1)
273 })
274 }
275
276 /// Recycle tokens that've gone stale because the client has yet
277 /// to associate a username with itself before the association timeout.
278 pub fn recycle_expired_tokens(&mut self) -> VecDeque<(Token, TcpStream)> {
279 let mut tokens_to_recycle = VecDeque::new();
280 for (token, unknown_client) in self
281 .unconfirmed_tokens
282 .iter_mut()
283 .filter(|(token, _)| !self.tokens_to_usernames.contains_key(token))
284 {
285 let t = Instant::now();
286 let dt = t - unknown_client.t;
287 unknown_client.t = t;
288 unknown_client.timeout += dt;
289 if unknown_client.timeout >= self.token_association_timeout {
290 tokens_to_recycle.push_back(*token);
291 }
292 }
293 let mut recyclables = VecDeque::new();
294 for token in tokens_to_recycle {
295 let unconfirmed_client = self
296 .unconfirmed_tokens
297 .remove(&token)
298 .expect("an unassociated token should be unconfirmed");
299 self.recycled_tokens.insert(token);
300 recyclables.push_back((token, unconfirmed_client.stream));
301 }
302 recyclables
303 }
304
305 /// Manually recycle an individual token. Should be used when a client is dropped,
306 /// unfaithful, or when a user leaves the game.
307 pub fn recycle_token(&mut self, token: Token) -> Result<TcpStream, ClientError> {
308 if let Some(username) = self.tokens_to_usernames.remove(&token) {
309 self.unconfirmed_usernames_to_tokens.remove(&username);
310 self.confirmed_usernames_to_tokens.remove(&username);
311 }
312
313 let stream = self
314 .unconfirmed_tokens
315 .remove(&token)
316 .map(|u| u.stream)
317 .or_else(|| self.confirmed_tokens.remove(&token))
318 .ok_or(ClientError::DoesNotExist)?;
319
320 self.recycled_tokens.insert(token);
321 Ok(stream)
322 }
323}
324
325/// Server data sender and waker need to be triggered at the same time, so just
326/// wrap them in the same struct to minimize mistakes.
327struct ServerDataSender {
328 sender: Sender<ServerData>,
329 waker: Waker,
330}
331
332impl ServerDataSender {
333 fn send(&self, msg: ServerData) -> Result<(), Error> {
334 self.sender.send(msg)?;
335 self.waker.wake()?;
336 Ok(())
337 }
338}
339
340/// Run the poker server in two separate threads. The parent thread manages
341/// the poker game state while the child thread manages non-blocking networking
342/// IO.
343pub fn run(addr: SocketAddr, config: PokerConfig) -> Result<(), Error> {
344 let max_network_events = MAX_NETWORK_EVENTS_PER_USER * config.game_settings.max_users;
345
346 let (tx_client, rx_client): (Sender<ClientMessage>, Receiver<ClientMessage>) = channel();
347 let (tx_server, rx_server): (Sender<ServerData>, Receiver<ServerData>) = channel();
348
349 let mut poll = Poll::new()?;
350 let waker = Waker::new(poll.registry(), WAKER)?;
351
352 let server_data_sender = ServerDataSender {
353 sender: tx_server,
354 waker,
355 };
356
357 // This thread is where the actual networking happens for non-blocking IO.
358 // A server is bound to the address and manages connections to clients.
359 // Messages from the main thread are queued for each client/user
360 // connection.
361 thread::spawn(move || -> Result<(), Error> {
362 let mut events = Events::with_capacity(max_network_events);
363 let mut messages_to_process: HashMap<Token, VecDeque<ClientMessage>> = HashMap::new();
364 let mut messages_to_write: HashMap<Token, VecDeque<ServerMessage>> = HashMap::new();
365 let mut server = TcpListener::bind(addr)?;
366 let mut token_manager = TokenManager::new(config.server_timeouts.connect);
367 let mut tokens_to_remove: HashSet<Token> = HashSet::new();
368 let mut tokens_to_reregister: HashSet<Token> = HashSet::new();
369 poll.registry()
370 .register(&mut server, SERVER, Interest::READABLE)?;
371
372 loop {
373 if let Err(error) = poll.poll(&mut events, Some(config.server_timeouts.poll)) {
374 match error.kind() {
375 io::ErrorKind::Interrupted => continue,
376 _ => bail!(error),
377 }
378 }
379
380 for event in &events {
381 match event.token() {
382 SERVER => loop {
383 // Received an event for the TCP server socket, which
384 // indicates we can accept a connection.
385 let mut stream = match server.accept() {
386 Ok((stream, _)) => stream,
387 Err(error) => {
388 match error.kind() {
389 // If we get a `WouldBlock` error we know our
390 // listener has no more incoming connections queued,
391 // so we can return to polling and wait for some
392 // more.
393 io::ErrorKind::WouldBlock => break,
394 // If it was any other kind of error, something went
395 // wrong and we should terminate.
396 _ => bail!(error),
397 }
398 }
399 };
400
401 let token = token_manager.new_token();
402 poll.registry()
403 .register(&mut stream, token, Interest::READABLE)?;
404 token_manager.associate_token_and_stream(token, stream);
405 let repr = token_to_string(&token);
406 debug!("accepted new connection with {repr}");
407 },
408 WAKER => {
409 // Drain server messages received from the parent thread so
410 // they can be relayed to the respective clients.
411 while let Ok(msg) = rx_server.try_recv() {
412 match msg {
413 // Acks are effectively successful responses to client
414 // messages and are relayed to all clients.
415 ServerData::Ack(msg) => {
416 // We only need to check this connect edge case because all other
417 // client commands can only go through to the parent thread if the
418 // client's username has already been confirmed by the parent
419 // thread.
420 if msg.command == UserCommand::Connect {
421 let disconnected = token_manager
422 .get_token_with_username(&msg.username)
423 .map_or(true, |token| {
424 token_manager.confirm_username(token).is_err()
425 });
426 // The client disconnected before the server could confirm their
427 // username even though the username was OK. A bit of an edge case,
428 // we need to notify the main thread that they disconnected. We'll
429 // still send out the acknowledgement to other clients saying that
430 // they were able to connect briefly.
431 if disconnected {
432 let msg = ClientMessage {
433 username: msg.username.clone(),
434 command: UserCommand::Disconnect,
435 };
436 tx_client.send(msg)?;
437 }
438 }
439 for token in token_manager.confirmed_tokens.keys() {
440 let msg = ServerMessage::Ack(msg.clone());
441 messages_to_write.entry(*token).or_default().push_back(msg);
442 tokens_to_reregister.insert(*token);
443 }
444 }
445 ServerData::Event(msg) => {
446 // Sanity check to make sure we no longer send messages to this
447 // user since they are no longer in the game.
448 match &msg {
449 GameEvent::Kicked(username)
450 | GameEvent::Removed(username) => {
451 if let Ok(token) =
452 token_manager.get_token_with_username(username)
453 {
454 messages_to_write.remove(&token);
455 if let Ok(mut stream) =
456 token_manager.recycle_token(token)
457 {
458 poll.registry().deregister(&mut stream)?;
459 }
460 }
461 }
462 _ => {}
463 }
464 for token in token_manager.confirmed_tokens.keys() {
465 let msg = ServerMessage::GameEvent(msg.clone());
466 messages_to_write.entry(*token).or_default().push_back(msg);
467 tokens_to_reregister.insert(*token);
468 }
469 }
470 // A response goes to a single client. We can safely ignore cases where a
471 // client no longer exists to receive a response because the response
472 // is meant just for the client.
473 ServerData::TurnSignal {
474 username,
475 action_choices,
476 } => {
477 if let Ok(token) =
478 token_manager.get_token_with_username(&username)
479 {
480 let msg = ServerMessage::TurnSignal(action_choices);
481 messages_to_write.entry(token).or_default().push_back(msg);
482 tokens_to_reregister.insert(token);
483 }
484 }
485 // A response goes to a single client. We can safely ignore cases where a
486 // client no longer exists to receive a response because the response
487 // is meant just for the client.
488 ServerData::UserError { username, error } => {
489 if let Ok(token) =
490 token_manager.get_token_with_username(&username)
491 {
492 let msg = ServerMessage::UserError(error);
493 messages_to_write.entry(token).or_default().push_back(msg);
494 tokens_to_reregister.insert(token);
495 }
496 }
497 // Server status is a game status update to all clients.
498 ServerData::Status(msg) => {
499 for token in token_manager.confirmed_tokens.keys() {
500 let msg = ServerMessage::Status(msg.clone());
501 messages_to_write.entry(*token).or_default().push_back(msg);
502 tokens_to_reregister.insert(*token);
503 }
504 }
505 // Views go to all clients. We can safely ignore cases where a client
506 // no longer exists to receive a view because the view is specific
507 // to the client.
508 ServerData::Views(views) => {
509 for (username, view) in views {
510 if let Ok(token) =
511 token_manager.get_token_with_username(&username)
512 {
513 let msg = ServerMessage::GameView(view);
514 messages_to_write
515 .entry(token)
516 .or_default()
517 .push_back(msg);
518 tokens_to_reregister.insert(token);
519 }
520 }
521 }
522 }
523 }
524 for token in tokens_to_reregister.drain() {
525 if let Ok(stream) = token_manager.get_mut_stream_with_token(&token) {
526 poll.registry().reregister(
527 stream,
528 token,
529 Interest::READABLE | Interest::WRITABLE,
530 )?;
531 }
532 }
533 }
534 // Only care about events associated with clients that are
535 // still valid.
536 token if !tokens_to_remove.contains(&token) => {
537 // Maybe received an event for a TCP connection.
538 if let Ok(stream) = token_manager.get_mut_stream_with_token(&token) {
539 if event.is_writable()
540 && let Some(messages) = messages_to_write.get_mut(&token)
541 {
542 // Need to handle the case where there's an unresponsive or
543 // misbehaving client that doesn't let us write messages to
544 // them. If their message queue reaches a certain size, queue
545 // them for removal.
546 if messages.len() >= max_network_events {
547 let repr = token_to_string(&token);
548 error!("{repr} has not been receiving and will be removed");
549 tokens_to_remove.insert(token);
550 continue;
551 }
552 while let Some(msg) = messages.pop_front() {
553 match write_prefixed::<ServerMessage, TcpStream>(stream, &msg) {
554 Ok(()) => {
555 // Client errors are strict and result in the removal of a connection.
556 if let ServerMessage::ClientError(_) = msg {
557 let repr = token_to_string(&token);
558 debug!("{repr}: {msg}");
559 tokens_to_remove.insert(token);
560 break;
561 }
562 }
563 Err(error) => {
564 match error.kind() {
565 // `write_prefixed` uses `write_all` under the hood, so we know
566 // that if any of these occur, then the connection was probably
567 // dropped at some point.
568 io::ErrorKind::BrokenPipe
569 | io::ErrorKind::ConnectionAborted
570 | io::ErrorKind::ConnectionReset
571 | io::ErrorKind::TimedOut
572 | io::ErrorKind::UnexpectedEof => {
573 let repr = token_to_string(&token);
574 debug!("{repr} connection dropped");
575 tokens_to_remove.insert(token);
576 }
577 // Would block "errors" are the OS's way of saying that the
578 // connection is not actually ready to perform this I/O operation.
579 io::ErrorKind::WouldBlock => {
580 // The message couldn't be sent, so we need to push it back
581 // onto the queue so we don't accidentally forget about it.
582 messages.push_front(msg);
583 }
584 // Retry writing in the case that the full message couldn't
585 // be written. This should be infrequent.
586 io::ErrorKind::WriteZero => {
587 let repr = token_to_string(&token);
588 debug!(
589 "{repr} got a zero write, but will retry"
590 );
591 messages.push_front(msg);
592 continue;
593 }
594 // Other errors we'll consider fatal.
595 _ => bail!(error),
596 }
597 poll.registry().reregister(
598 stream,
599 token,
600 Interest::READABLE,
601 )?;
602 break;
603 }
604 }
605 }
606 }
607
608 if event.is_readable() {
609 // We can (maybe) read from the connection.
610 loop {
611 match read_prefixed::<ClientMessage, TcpStream>(stream) {
612 Ok(msg) => {
613 let messages =
614 messages_to_process.entry(token).or_default();
615 messages.push_back(msg);
616 if messages.len() >= MAX_NETWORK_EVENTS_PER_USER {
617 let repr = token_to_string(&token);
618 error!(
619 "{repr} has been spamming and will be removed"
620 );
621 tokens_to_remove.insert(token);
622 break;
623 }
624 }
625 Err(error) => {
626 match error.kind() {
627 // `read_prefixed` uses `read_exact` under the hood, so we know
628 // that an Eof error means the connection was dropped.
629 io::ErrorKind::BrokenPipe
630 | io::ErrorKind::ConnectionAborted
631 | io::ErrorKind::ConnectionReset
632 | io::ErrorKind::InvalidData
633 | io::ErrorKind::TimedOut
634 | io::ErrorKind::UnexpectedEof => {
635 let repr = token_to_string(&token);
636 debug!("{repr}'s connection dropped");
637 tokens_to_remove.insert(token);
638 }
639 // Would block "errors" are the OS's way of saying that the
640 // connection is not actually ready to perform this I/O operation.
641 io::ErrorKind::WouldBlock => {}
642 // Other errors we'll consider fatal.
643 _ => {
644 bail!(error)
645 }
646 }
647 break;
648 }
649 }
650 }
651 }
652 }
653 }
654 // The client is already queued for removal and so this event
655 // will be ignored.
656 _ => {}
657 }
658 }
659
660 // Process all the messages received from the clients.
661 for (token, msgs) in messages_to_process
662 .drain()
663 .filter(|(t, _)| !tokens_to_remove.contains(t))
664 {
665 for msg in msgs {
666 let result = match msg.command {
667 // Check if the client wasn't able to associate its token with a username
668 // in time, or if that username is already taken.
669 UserCommand::Connect => {
670 token_manager.associate_token_and_username(token, &msg.username)
671 }
672 // Check if the client is being faithful and sending messages with
673 // the correct username.
674 _ => match token_manager.get_token_with_username(&msg.username) {
675 Ok(associated_token) => {
676 if token == associated_token {
677 Ok(())
678 } else {
679 Err(ClientError::Unassociated)
680 }
681 }
682 Err(error) => Err(error),
683 },
684 };
685 let repr = token_to_string(&token);
686 match result {
687 Ok(()) => {
688 debug!("{repr}: {msg}");
689 tx_client.send(msg)?;
690 }
691 Err(error) => {
692 debug!("{repr}: {error}");
693 let msg = ServerMessage::ClientError(error);
694 messages_to_write.entry(token).or_default().push_back(msg);
695 tokens_to_reregister.insert(token);
696 }
697 }
698 }
699 }
700 // Make sure we allow writing errors back to the client.
701 for token in tokens_to_reregister.drain() {
702 if let Ok(stream) = token_manager.get_mut_stream_with_token(&token) {
703 poll.registry().reregister(
704 stream,
705 token,
706 Interest::READABLE | Interest::WRITABLE,
707 )?;
708 }
709 }
710
711 // Recycle all tokens that need to be removed, deregistering their streams
712 // with the poll.
713 for token in tokens_to_remove.drain() {
714 let repr = token_to_string(&token);
715 debug!("{repr} is being removed");
716 if let Ok(username) = token_manager.get_confirmed_username_with_token(token) {
717 let msg = ClientMessage {
718 username,
719 command: UserCommand::Disconnect,
720 };
721 tx_client.send(msg)?;
722 }
723 messages_to_write.remove(&token);
724 if let Ok(mut stream) = token_manager.recycle_token(token) {
725 poll.registry().deregister(&mut stream)?;
726 }
727 }
728 for (token, mut stream) in token_manager.recycle_expired_tokens() {
729 let repr = token_to_string(&token);
730 debug!("{repr} expired");
731 messages_to_write.remove(&token);
732 poll.registry().deregister(&mut stream)?;
733 }
734 }
735 });
736
737 let mut state: PokerState = config.game_settings.into();
738 let mut status = state.to_string();
739 loop {
740 // Order is kind of key here. We get the status string before
741 // we step so we can inform users what's happening rather than
742 // what's going to happen in the future. This allows faster
743 // feedback from a user's perspective.
744 let repr = state.to_string();
745 // Only send new statuses to clients to avoid spam.
746 if status != repr {
747 info!("{repr}");
748 status = repr;
749 let msg = ServerData::Status(status.clone());
750 server_data_sender.send(msg)?;
751 }
752 state = state.step();
753
754 let views = state.get_views();
755 let msg = ServerData::Views(views);
756 server_data_sender.send(msg)?;
757
758 let mut next_action_username = state.get_next_action_username();
759 let mut timeout = config.server_timeouts.step;
760 'command: loop {
761 // Check if it's a user's turn. If so, send them a turn signal
762 // and increase the timeout to give them time to make their
763 // decision. We also keep track of their username so we
764 // can tell if they don't make a decision in time.
765 if let Some(username) = state.get_next_action_username()
766 && let Some(action_choices) = state.get_action_choices()
767 {
768 // Check if the username from the last turn is the same as the
769 // username from this turn. If so, we need to check if there
770 // was a timeout.
771 if let Some(ref last_username) = next_action_username {
772 // If there's a timeout, then that means the user didn't
773 // make a decision in time, and they have to fold.
774 if timeout.as_secs() == 0 && &username == last_username {
775 // Ack that they will fold (the poker state will
776 // fold for them).
777 warn!("{username} ran out of time and will be forced to fold");
778 let command = UserCommand::TakeAction(Action::Fold);
779 let msg = ServerData::Ack(ClientMessage {
780 username: username.clone(),
781 command,
782 });
783 server_data_sender.send(msg)?;
784
785 // Force remove them so they don't disrupt future games.
786 warn!("{username} will be removed at the end of the game");
787 state.remove_user(&username)?;
788
789 break 'command;
790 }
791 // Let all users know whose turn it is.
792 let status = format!("it's {username}'s turn and they can {action_choices}");
793 let msg = ServerData::Status(status.clone());
794 server_data_sender.send(msg)?;
795
796 // Let player know it's their turn.
797 info!("{status}");
798 let msg = ServerData::TurnSignal {
799 username: username.clone(),
800 action_choices,
801 };
802 server_data_sender.send(msg)?;
803
804 next_action_username = Some(username);
805 timeout = config.server_timeouts.action;
806 }
807 }
808 // If it's no one's turn and there's a timeout, then we must
809 // break to update the poker state.
810 else if timeout.as_secs() == 0 {
811 break 'command;
812 }
813
814 // Use the timeout duration to process events from the server's
815 // IO thread. This is really the main server loop.
816 while timeout.as_secs() > 0 {
817 // Drain game events before catching more user commands.
818 for event in state.drain_events() {
819 info!("{event}");
820 let msg = ServerData::Event(event);
821 server_data_sender.send(msg)?;
822 }
823 let start = Instant::now();
824 if let Ok(mut msg) = rx_client.recv_timeout(timeout) {
825 let result = match msg.command {
826 UserCommand::ChangeState(ref new_user_state) => match new_user_state {
827 UserState::Play => state.waitlist_user(&msg.username).map(|_| ()),
828 UserState::Spectate => state.spectate_user(&msg.username).map(|_| ()),
829 },
830 UserCommand::Connect => state.new_user(&msg.username).map(|_| ()),
831 UserCommand::Disconnect => state.remove_user(&msg.username).map(|_| ()),
832 UserCommand::ShowHand => state.show_hand(&msg.username),
833 UserCommand::StartGame => state.init_start(&msg.username),
834 UserCommand::TakeAction(ref mut action) => state
835 .take_action(&msg.username, action.clone())
836 .map(|new_action| {
837 timeout = Duration::ZERO;
838 *action = new_action;
839 }),
840 UserCommand::CastVote(ref vote) => state
841 .cast_vote(&msg.username, vote.clone())
842 .and_then(|maybe_vote| {
843 maybe_vote.map_or(Ok(()), |vote| match vote {
844 Vote::Kick(username) => state.kick_user(&username).map(|_| ()),
845 Vote::Reset(None) => {
846 state.reset_all_money();
847 Ok(())
848 }
849 Vote::Reset(Some(username)) => {
850 state.reset_user_money(&username).map(|_| ())
851 }
852 })
853 }),
854 };
855
856 // Get the result from a client's command. If their command
857 // is OK, ack the command to all clients so they know what
858 // happened. If their command is bad, send an error back to
859 // the commanding client.
860 match result {
861 Ok(()) => {
862 info!("{msg}");
863 let msg = ServerData::Ack(msg);
864 server_data_sender.send(msg)?;
865
866 let msg = ServerData::Views(state.get_views());
867 server_data_sender.send(msg)?;
868 }
869 Err(error) => {
870 error!("{error}: {msg}");
871 let msg = ServerData::UserError {
872 username: msg.username,
873 error,
874 };
875 server_data_sender.send(msg)?;
876 }
877 }
878 }
879 timeout = timeout.saturating_sub(start.elapsed());
880 }
881 }
882 }
883}
884
885#[cfg(test)]
886mod tests {
887 use std::time::Duration;
888
889 use mio::{
890 Token,
891 net::{TcpListener, TcpStream},
892 };
893
894 use crate::entities::Username;
895 use crate::net::messages::ClientError;
896
897 use super::TokenManager;
898
899 fn get_server() -> TcpListener {
900 let random_port_addr = "127.0.0.1:0".parse().unwrap();
901 TcpListener::bind(random_port_addr).unwrap()
902 }
903
904 fn get_stream(listener: &TcpListener) -> TcpStream {
905 let addr = listener.local_addr().unwrap();
906 TcpStream::connect(addr).unwrap();
907 let (stream, _) = listener.accept().unwrap();
908 stream
909 }
910
911 #[test]
912 fn confirm_username() {
913 let server = get_server();
914 let stream = get_stream(&server);
915 let mut token_manager = TokenManager::new(Duration::ZERO);
916
917 let token = token_manager.new_token();
918 token_manager.associate_token_and_stream(token, stream);
919
920 let username = Username::new("ognf");
921 assert_eq!(
922 token_manager.get_token_with_username(&username),
923 Err(ClientError::Unassociated)
924 );
925 assert_eq!(
926 token_manager.associate_token_and_username(token, &username),
927 Ok(())
928 );
929 assert_eq!(token_manager.get_token_with_username(&username), Ok(token));
930
931 assert_eq!(token_manager.confirm_username(token), Ok(()));
932 assert_eq!(
933 token_manager.get_confirmed_username_with_token(token),
934 Ok(username.clone())
935 );
936 assert_eq!(token_manager.get_token_with_username(&username), Ok(token));
937 }
938
939 #[test]
940 fn confirm_username_recycled_token() {
941 let server = get_server();
942 let stream = get_stream(&server);
943 let mut token_manager = TokenManager::new(Duration::ZERO);
944
945 let token = token_manager.new_token();
946 token_manager.associate_token_and_stream(token, stream);
947 token_manager.recycle_expired_tokens();
948
949 let username = Username::new("ognf");
950 assert_eq!(
951 token_manager.get_token_with_username(&username),
952 Err(ClientError::Unassociated)
953 );
954 assert_eq!(
955 token_manager.associate_token_and_username(token, &username),
956 Err(ClientError::Expired)
957 );
958 }
959
960 #[test]
961 fn recycle_expired_tokens() {
962 let server = get_server();
963 let stream1 = get_stream(&server);
964 let stream2 = get_stream(&server);
965 let stream3 = get_stream(&server);
966 let stream4 = get_stream(&server);
967 let mut token_manager = TokenManager::new(Duration::ZERO);
968
969 // Create a couple of tokens and immediately recycle them.
970 let token1 = token_manager.new_token();
971 token_manager.associate_token_and_stream(token1, stream1);
972 let token2 = token_manager.new_token();
973 token_manager.associate_token_and_stream(token2, stream2);
974 token_manager.recycle_expired_tokens();
975
976 // Tokens are immediately resused.
977 let token3 = token_manager.new_token();
978 token_manager.associate_token_and_stream(token1, stream3);
979 let token4 = token_manager.new_token();
980 token_manager.associate_token_and_stream(token2, stream4);
981 assert_eq!(token1, Token(2));
982 assert_eq!(token1, token3);
983 assert_eq!(token2, Token(3));
984 assert_eq!(token2, token4);
985 }
986
987 #[test]
988 fn recycle_token() {
989 let server = get_server();
990 let stream1 = get_stream(&server);
991 let stream2 = get_stream(&server);
992 let mut token_manager = TokenManager::new(Duration::ZERO);
993
994 let token1 = token_manager.new_token();
995 token_manager.associate_token_and_stream(token1, stream1);
996 let token2 = token_manager.new_token();
997 token_manager.associate_token_and_stream(token2, stream2);
998
999 let username = Username::new("ognf");
1000 assert_eq!(
1001 token_manager.associate_token_and_username(token1, &username),
1002 Ok(())
1003 );
1004 assert_eq!(
1005 token_manager.associate_token_and_username(token2, &username),
1006 Err(ClientError::AlreadyAssociated)
1007 );
1008 assert!(token_manager.recycle_token(token1).is_ok());
1009 assert_eq!(
1010 token_manager.associate_token_and_username(token2, &username),
1011 Ok(())
1012 );
1013 assert_eq!(token1, token_manager.new_token());
1014 }
1015}