private_poker/net/
server.rs

1use anyhow::{Error, bail};
2use log::{debug, error, info, warn};
3use mio::{
4    Events, Interest, Poll, Token, Waker,
5    net::{TcpListener, TcpStream},
6};
7use serde::{Deserialize, Serialize};
8use std::{
9    cmp::max,
10    collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
11    io,
12    net::SocketAddr,
13    sync::mpsc::{Receiver, Sender, channel},
14    thread,
15    time::{Duration, Instant},
16};
17
18use super::{
19    super::{
20        UserError,
21        entities::Vote,
22        game::{
23            GameEvent, GameSettings, GameStateManagement, PhaseDependentUserManagement,
24            PhaseIndependentUserManagement, PokerState,
25            entities::{Action, ActionChoices, GameView, Username},
26        },
27    },
28    messages::{ClientError, ClientMessage, ServerMessage, UserCommand, UserState},
29    utils::{read_prefixed, write_prefixed},
30};
31
32pub const DEFAULT_ACTION_TIMEOUT: Duration = Duration::from_secs(30);
33pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
34pub const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(1);
35pub const DEFAULT_STEP_TIMEOUT: Duration = Duration::from_secs(5);
36pub const MAX_NETWORK_EVENTS_PER_USER: usize = 6;
37pub const SERVER: Token = Token(0);
38pub const WAKER: Token = Token(1);
39
40/// A server message for communication between poker server threads. This
41/// message is never sent directly to poker clients, but fields within the
42/// underlying variants are.
43#[derive(Debug, Deserialize, Serialize)]
44enum ServerData {
45    /// An acknowledgement of a client message, signaling that the client's
46    /// command was successfully processed by the game thread.
47    Ack(ClientMessage),
48    /// An acknowledgement of a server game event, signaling that an internal
49    /// event was processed during a state transition or as a result of a user's
50    /// action.
51    Event(GameEvent),
52    /// Signaling a specific user made an error.
53    UserError {
54        username: Username,
55        error: UserError,
56    },
57    /// Signaling it's a specific user's turn.
58    TurnSignal {
59        username: Username,
60        action_choices: ActionChoices,
61    },
62    /// Game state represented as a string.
63    Status(String),
64    /// Mapping of usernames to their game views.
65    Views(HashMap<Username, GameView>),
66}
67
68fn token_to_string(token: &Token) -> String {
69    let id = token.0;
70    format!("token({id})")
71}
72
73pub struct ServerTimeouts {
74    pub action: Duration,
75    pub connect: Duration,
76    pub poll: Duration,
77    pub step: Duration,
78}
79
80impl Default for ServerTimeouts {
81    fn default() -> Self {
82        Self {
83            action: DEFAULT_ACTION_TIMEOUT,
84            connect: DEFAULT_CONNECT_TIMEOUT,
85            poll: DEFAULT_POLL_TIMEOUT,
86            step: DEFAULT_STEP_TIMEOUT,
87        }
88    }
89}
90
91#[derive(Default)]
92pub struct PokerConfig {
93    pub game_settings: GameSettings,
94    pub server_timeouts: ServerTimeouts,
95}
96
97impl From<GameSettings> for PokerConfig {
98    fn from(value: GameSettings) -> Self {
99        let server_timeouts = ServerTimeouts::default();
100        Self {
101            game_settings: value,
102            server_timeouts,
103        }
104    }
105}
106
107impl From<ServerTimeouts> for PokerConfig {
108    fn from(value: ServerTimeouts) -> Self {
109        let game_config = GameSettings::default();
110        Self {
111            game_settings: game_config,
112            server_timeouts: value,
113        }
114    }
115}
116
117struct UnconfirmedClient {
118    stream: TcpStream,
119    t: Instant,
120    timeout: Duration,
121}
122
123impl UnconfirmedClient {
124    pub fn new(stream: TcpStream) -> Self {
125        Self {
126            stream,
127            t: Instant::now(),
128            timeout: Duration::ZERO,
129        }
130    }
131}
132
133/// This manager enables a few mechanisms:
134///
135/// - Helps keep tokens bounded, recycling unused tokens for future
136///   connections.
137/// - Associates tokens and usernames with clients, making it easier
138///   to read from and write to clients based on those attributes.
139/// - Tracks client connection states, dividing clients that haven't
140///   sent their username, clients that have sent their username but
141///   their usernames haven't been confirmed by the poker game, and
142///   clients that have sent their usernames and those usernames have
143///   been confirmed by the poker game.
144struct TokenManager {
145    pub confirmed_tokens: BTreeMap<Token, TcpStream>,
146    confirmed_usernames_to_tokens: HashMap<Username, Token>,
147    recycled_tokens: BTreeSet<Token>,
148    token_association_timeout: Duration,
149    tokens_to_usernames: BTreeMap<Token, Username>,
150    unconfirmed_tokens: BTreeMap<Token, UnconfirmedClient>,
151    unconfirmed_usernames_to_tokens: HashMap<Username, Token>,
152}
153
154impl TokenManager {
155    /// Associate a token with a TCP stream. Since tokens are usually registered
156    /// with a poll, the typical workflow is:
157    ///
158    /// 1. Create a new token.
159    /// 2. Register the token and stream with the poll.
160    /// 3. Associate the token and stream with the token manager.
161    ///
162    /// This transfers ownership of the stream to the token manager, allowing
163    /// deallocation of the stream wheenver the token is recycled.
164    pub fn associate_token_and_stream(&mut self, token: Token, stream: TcpStream) {
165        let unconfirmed_client = UnconfirmedClient::new(stream);
166        self.unconfirmed_tokens.insert(token, unconfirmed_client);
167    }
168
169    /// Associate a token with a username. This should be called in response
170    /// to a client declaring a username. This will catch cases where the username
171    /// is already taken, and cases where the client took too long to declare
172    /// a username after its connection has already been accepted by the server.
173    pub fn associate_token_and_username(
174        &mut self,
175        token: Token,
176        username: &Username,
177    ) -> Result<(), ClientError> {
178        if self.tokens_to_usernames.contains_key(&token)
179            || self.unconfirmed_usernames_to_tokens.contains_key(username)
180            || self.confirmed_usernames_to_tokens.contains_key(username)
181        {
182            Err(ClientError::AlreadyAssociated)
183        } else if self.recycled_tokens.contains(&token) {
184            Err(ClientError::Expired)
185        } else {
186            self.tokens_to_usernames.insert(token, username.clone());
187            self.unconfirmed_usernames_to_tokens
188                .insert(username.clone(), token);
189            Ok(())
190        }
191    }
192
193    /// Confirm a token's declared username. This acknowledges that the poker
194    /// game accepted their username and relieves the token from potential
195    /// expiration.
196    pub fn confirm_username(&mut self, token: Token) -> Result<(), ClientError> {
197        let username = self
198            .tokens_to_usernames
199            .get(&token)
200            .ok_or(ClientError::Unassociated)?;
201
202        let (username, token) = self
203            .unconfirmed_usernames_to_tokens
204            .remove_entry(username)
205            .ok_or(ClientError::Unassociated)?;
206
207        let unconfirmed_client = self
208            .unconfirmed_tokens
209            .remove(&token)
210            .expect("an unconfirmed username should correspond to an unconfirmed token");
211
212        self.confirmed_tokens
213            .insert(token, unconfirmed_client.stream);
214        self.confirmed_usernames_to_tokens.insert(username, token);
215
216        Ok(())
217    }
218
219    pub fn get_confirmed_username_with_token(&self, token: Token) -> Result<Username, ClientError> {
220        self.confirmed_tokens
221            .get(&token)
222            .and_then(|_| self.tokens_to_usernames.get(&token))
223            .cloned()
224            .ok_or(ClientError::Unassociated)
225    }
226
227    pub fn get_mut_stream_with_token(
228        &mut self,
229        token: &Token,
230    ) -> Result<&mut TcpStream, ClientError> {
231        if let Some(unconfirmed) = self.unconfirmed_tokens.get_mut(token) {
232            return Ok(&mut unconfirmed.stream);
233        }
234        if let Some(stream) = self.confirmed_tokens.get_mut(token) {
235            return Ok(stream);
236        }
237        Err(ClientError::DoesNotExist)
238    }
239
240    pub fn get_token_with_username(&self, username: &Username) -> Result<Token, ClientError> {
241        self.unconfirmed_usernames_to_tokens
242            .get(username)
243            .or_else(|| self.confirmed_usernames_to_tokens.get(username))
244            .copied()
245            .ok_or(ClientError::Unassociated)
246    }
247
248    pub fn new(token_association_timeout: Duration) -> Self {
249        Self {
250            confirmed_tokens: BTreeMap::new(),
251            confirmed_usernames_to_tokens: HashMap::new(),
252            recycled_tokens: BTreeSet::new(),
253            token_association_timeout,
254            tokens_to_usernames: BTreeMap::new(),
255            unconfirmed_tokens: BTreeMap::new(),
256            unconfirmed_usernames_to_tokens: HashMap::new(),
257        }
258    }
259
260    /// Create a new token.
261    pub fn new_token(&mut self) -> Token {
262        self.recycled_tokens.pop_first().unwrap_or_else(|| {
263            let newest = match (
264                self.unconfirmed_tokens.last_key_value(),
265                self.confirmed_tokens.last_key_value(),
266            ) {
267                (Some((unconfirmed, _)), Some((confirmed, _))) => max(unconfirmed, confirmed),
268                (Some((unconfirmed, _)), None) => unconfirmed,
269                (None, Some((verified, _))) => verified,
270                (None, None) => &WAKER,
271            };
272            Token(newest.0 + 1)
273        })
274    }
275
276    /// Recycle tokens that've gone stale because the client has yet
277    /// to associate a username with itself before the association timeout.
278    pub fn recycle_expired_tokens(&mut self) -> VecDeque<(Token, TcpStream)> {
279        let mut tokens_to_recycle = VecDeque::new();
280        for (token, unknown_client) in self
281            .unconfirmed_tokens
282            .iter_mut()
283            .filter(|(token, _)| !self.tokens_to_usernames.contains_key(token))
284        {
285            let t = Instant::now();
286            let dt = t - unknown_client.t;
287            unknown_client.t = t;
288            unknown_client.timeout += dt;
289            if unknown_client.timeout >= self.token_association_timeout {
290                tokens_to_recycle.push_back(*token);
291            }
292        }
293        let mut recyclables = VecDeque::new();
294        for token in tokens_to_recycle {
295            let unconfirmed_client = self
296                .unconfirmed_tokens
297                .remove(&token)
298                .expect("an unassociated token should be unconfirmed");
299            self.recycled_tokens.insert(token);
300            recyclables.push_back((token, unconfirmed_client.stream));
301        }
302        recyclables
303    }
304
305    /// Manually recycle an individual token. Should be used when a client is dropped,
306    /// unfaithful, or when a user leaves the game.
307    pub fn recycle_token(&mut self, token: Token) -> Result<TcpStream, ClientError> {
308        if let Some(username) = self.tokens_to_usernames.remove(&token) {
309            self.unconfirmed_usernames_to_tokens.remove(&username);
310            self.confirmed_usernames_to_tokens.remove(&username);
311        }
312
313        let stream = self
314            .unconfirmed_tokens
315            .remove(&token)
316            .map(|u| u.stream)
317            .or_else(|| self.confirmed_tokens.remove(&token))
318            .ok_or(ClientError::DoesNotExist)?;
319
320        self.recycled_tokens.insert(token);
321        Ok(stream)
322    }
323}
324
325/// Server data sender and waker need to be triggered at the same time, so just
326/// wrap them in the same struct to minimize mistakes.
327struct ServerDataSender {
328    sender: Sender<ServerData>,
329    waker: Waker,
330}
331
332impl ServerDataSender {
333    fn send(&self, msg: ServerData) -> Result<(), Error> {
334        self.sender.send(msg)?;
335        self.waker.wake()?;
336        Ok(())
337    }
338}
339
340/// Run the poker server in two separate threads. The parent thread manages
341/// the poker game state while the child thread manages non-blocking networking
342/// IO.
343pub fn run(addr: SocketAddr, config: PokerConfig) -> Result<(), Error> {
344    let max_network_events = MAX_NETWORK_EVENTS_PER_USER * config.game_settings.max_users;
345
346    let (tx_client, rx_client): (Sender<ClientMessage>, Receiver<ClientMessage>) = channel();
347    let (tx_server, rx_server): (Sender<ServerData>, Receiver<ServerData>) = channel();
348
349    let mut poll = Poll::new()?;
350    let waker = Waker::new(poll.registry(), WAKER)?;
351
352    let server_data_sender = ServerDataSender {
353        sender: tx_server,
354        waker,
355    };
356
357    // This thread is where the actual networking happens for non-blocking IO.
358    // A server is bound to the address and manages connections to clients.
359    // Messages from the main thread are queued for each client/user
360    // connection.
361    thread::spawn(move || -> Result<(), Error> {
362        let mut events = Events::with_capacity(max_network_events);
363        let mut messages_to_process: HashMap<Token, VecDeque<ClientMessage>> = HashMap::new();
364        let mut messages_to_write: HashMap<Token, VecDeque<ServerMessage>> = HashMap::new();
365        let mut server = TcpListener::bind(addr)?;
366        let mut token_manager = TokenManager::new(config.server_timeouts.connect);
367        let mut tokens_to_remove: HashSet<Token> = HashSet::new();
368        let mut tokens_to_reregister: HashSet<Token> = HashSet::new();
369        poll.registry()
370            .register(&mut server, SERVER, Interest::READABLE)?;
371
372        loop {
373            if let Err(error) = poll.poll(&mut events, Some(config.server_timeouts.poll)) {
374                match error.kind() {
375                    io::ErrorKind::Interrupted => continue,
376                    _ => bail!(error),
377                }
378            }
379
380            for event in &events {
381                match event.token() {
382                    SERVER => loop {
383                        // Received an event for the TCP server socket, which
384                        // indicates we can accept a connection.
385                        let mut stream = match server.accept() {
386                            Ok((stream, _)) => stream,
387                            Err(error) => {
388                                match error.kind() {
389                                    // If we get a `WouldBlock` error we know our
390                                    // listener has no more incoming connections queued,
391                                    // so we can return to polling and wait for some
392                                    // more.
393                                    io::ErrorKind::WouldBlock => break,
394                                    // If it was any other kind of error, something went
395                                    // wrong and we should terminate.
396                                    _ => bail!(error),
397                                }
398                            }
399                        };
400
401                        let token = token_manager.new_token();
402                        poll.registry()
403                            .register(&mut stream, token, Interest::READABLE)?;
404                        token_manager.associate_token_and_stream(token, stream);
405                        let repr = token_to_string(&token);
406                        debug!("accepted new connection with {repr}");
407                    },
408                    WAKER => {
409                        // Drain server messages received from the parent thread so
410                        // they can be relayed to the respective clients.
411                        while let Ok(msg) = rx_server.try_recv() {
412                            match msg {
413                                // Acks are effectively successful responses to client
414                                // messages and are relayed to all clients.
415                                ServerData::Ack(msg) => {
416                                    // We only need to check this connect edge case because all other
417                                    // client commands can only go through to the parent thread if the
418                                    // client's username has already been confirmed by the parent
419                                    // thread.
420                                    if msg.command == UserCommand::Connect {
421                                        let disconnected = token_manager
422                                            .get_token_with_username(&msg.username)
423                                            .map_or(true, |token| {
424                                                token_manager.confirm_username(token).is_err()
425                                            });
426                                        // The client disconnected before the server could confirm their
427                                        // username even though the username was OK. A bit of an edge case,
428                                        // we need to notify the main thread that they disconnected. We'll
429                                        // still send out the acknowledgement to other clients saying that
430                                        // they were able to connect briefly.
431                                        if disconnected {
432                                            let msg = ClientMessage {
433                                                username: msg.username.clone(),
434                                                command: UserCommand::Disconnect,
435                                            };
436                                            tx_client.send(msg)?;
437                                        }
438                                    }
439                                    for token in token_manager.confirmed_tokens.keys() {
440                                        let msg = ServerMessage::Ack(msg.clone());
441                                        messages_to_write.entry(*token).or_default().push_back(msg);
442                                        tokens_to_reregister.insert(*token);
443                                    }
444                                }
445                                ServerData::Event(msg) => {
446                                    // Sanity check to make sure we no longer send messages to this
447                                    // user since they are no longer in the game.
448                                    match &msg {
449                                        GameEvent::Kicked(username)
450                                        | GameEvent::Removed(username) => {
451                                            if let Ok(token) =
452                                                token_manager.get_token_with_username(username)
453                                            {
454                                                messages_to_write.remove(&token);
455                                                if let Ok(mut stream) =
456                                                    token_manager.recycle_token(token)
457                                                {
458                                                    poll.registry().deregister(&mut stream)?;
459                                                }
460                                            }
461                                        }
462                                        _ => {}
463                                    }
464                                    for token in token_manager.confirmed_tokens.keys() {
465                                        let msg = ServerMessage::GameEvent(msg.clone());
466                                        messages_to_write.entry(*token).or_default().push_back(msg);
467                                        tokens_to_reregister.insert(*token);
468                                    }
469                                }
470                                // A response goes to a single client. We can safely ignore cases where a
471                                // client no longer exists to receive a response because the response
472                                // is meant just for the client.
473                                ServerData::TurnSignal {
474                                    username,
475                                    action_choices,
476                                } => {
477                                    if let Ok(token) =
478                                        token_manager.get_token_with_username(&username)
479                                    {
480                                        let msg = ServerMessage::TurnSignal(action_choices);
481                                        messages_to_write.entry(token).or_default().push_back(msg);
482                                        tokens_to_reregister.insert(token);
483                                    }
484                                }
485                                // A response goes to a single client. We can safely ignore cases where a
486                                // client no longer exists to receive a response because the response
487                                // is meant just for the client.
488                                ServerData::UserError { username, error } => {
489                                    if let Ok(token) =
490                                        token_manager.get_token_with_username(&username)
491                                    {
492                                        let msg = ServerMessage::UserError(error);
493                                        messages_to_write.entry(token).or_default().push_back(msg);
494                                        tokens_to_reregister.insert(token);
495                                    }
496                                }
497                                // Server status is a game status update to all clients.
498                                ServerData::Status(msg) => {
499                                    for token in token_manager.confirmed_tokens.keys() {
500                                        let msg = ServerMessage::Status(msg.clone());
501                                        messages_to_write.entry(*token).or_default().push_back(msg);
502                                        tokens_to_reregister.insert(*token);
503                                    }
504                                }
505                                // Views go to all clients. We can safely ignore cases where a client
506                                // no longer exists to receive a view because the view is specific
507                                // to the client.
508                                ServerData::Views(views) => {
509                                    for (username, view) in views {
510                                        if let Ok(token) =
511                                            token_manager.get_token_with_username(&username)
512                                        {
513                                            let msg = ServerMessage::GameView(view);
514                                            messages_to_write
515                                                .entry(token)
516                                                .or_default()
517                                                .push_back(msg);
518                                            tokens_to_reregister.insert(token);
519                                        }
520                                    }
521                                }
522                            }
523                        }
524                        for token in tokens_to_reregister.drain() {
525                            if let Ok(stream) = token_manager.get_mut_stream_with_token(&token) {
526                                poll.registry().reregister(
527                                    stream,
528                                    token,
529                                    Interest::READABLE | Interest::WRITABLE,
530                                )?;
531                            }
532                        }
533                    }
534                    // Only care about events associated with clients that are
535                    // still valid.
536                    token if !tokens_to_remove.contains(&token) => {
537                        // Maybe received an event for a TCP connection.
538                        if let Ok(stream) = token_manager.get_mut_stream_with_token(&token) {
539                            if event.is_writable()
540                                && let Some(messages) = messages_to_write.get_mut(&token)
541                            {
542                                // Need to handle the case where there's an unresponsive or
543                                // misbehaving client that doesn't let us write messages to
544                                // them. If their message queue reaches a certain size, queue
545                                // them for removal.
546                                if messages.len() >= max_network_events {
547                                    let repr = token_to_string(&token);
548                                    error!("{repr} has not been receiving and will be removed");
549                                    tokens_to_remove.insert(token);
550                                    continue;
551                                }
552                                while let Some(msg) = messages.pop_front() {
553                                    match write_prefixed::<ServerMessage, TcpStream>(stream, &msg) {
554                                        Ok(()) => {
555                                            // Client errors are strict and result in the removal of a connection.
556                                            if let ServerMessage::ClientError(_) = msg {
557                                                let repr = token_to_string(&token);
558                                                debug!("{repr}: {msg}");
559                                                tokens_to_remove.insert(token);
560                                                break;
561                                            }
562                                        }
563                                        Err(error) => {
564                                            match error.kind() {
565                                                // `write_prefixed` uses `write_all` under the hood, so we know
566                                                // that if any of these occur, then the connection was probably
567                                                // dropped at some point.
568                                                io::ErrorKind::BrokenPipe
569                                                | io::ErrorKind::ConnectionAborted
570                                                | io::ErrorKind::ConnectionReset
571                                                | io::ErrorKind::TimedOut
572                                                | io::ErrorKind::UnexpectedEof => {
573                                                    let repr = token_to_string(&token);
574                                                    debug!("{repr} connection dropped");
575                                                    tokens_to_remove.insert(token);
576                                                }
577                                                // Would block "errors" are the OS's way of saying that the
578                                                // connection is not actually ready to perform this I/O operation.
579                                                io::ErrorKind::WouldBlock => {
580                                                    // The message couldn't be sent, so we need to push it back
581                                                    // onto the queue so we don't accidentally forget about it.
582                                                    messages.push_front(msg);
583                                                }
584                                                // Retry writing in the case that the full message couldn't
585                                                // be written. This should be infrequent.
586                                                io::ErrorKind::WriteZero => {
587                                                    let repr = token_to_string(&token);
588                                                    debug!(
589                                                        "{repr} got a zero write, but will retry"
590                                                    );
591                                                    messages.push_front(msg);
592                                                    continue;
593                                                }
594                                                // Other errors we'll consider fatal.
595                                                _ => bail!(error),
596                                            }
597                                            poll.registry().reregister(
598                                                stream,
599                                                token,
600                                                Interest::READABLE,
601                                            )?;
602                                            break;
603                                        }
604                                    }
605                                }
606                            }
607
608                            if event.is_readable() {
609                                // We can (maybe) read from the connection.
610                                loop {
611                                    match read_prefixed::<ClientMessage, TcpStream>(stream) {
612                                        Ok(msg) => {
613                                            let messages =
614                                                messages_to_process.entry(token).or_default();
615                                            messages.push_back(msg);
616                                            if messages.len() >= MAX_NETWORK_EVENTS_PER_USER {
617                                                let repr = token_to_string(&token);
618                                                error!(
619                                                    "{repr} has been spamming and will be removed"
620                                                );
621                                                tokens_to_remove.insert(token);
622                                                break;
623                                            }
624                                        }
625                                        Err(error) => {
626                                            match error.kind() {
627                                                // `read_prefixed` uses `read_exact` under the hood, so we know
628                                                // that an Eof error means the connection was dropped.
629                                                io::ErrorKind::BrokenPipe
630                                                | io::ErrorKind::ConnectionAborted
631                                                | io::ErrorKind::ConnectionReset
632                                                | io::ErrorKind::InvalidData
633                                                | io::ErrorKind::TimedOut
634                                                | io::ErrorKind::UnexpectedEof => {
635                                                    let repr = token_to_string(&token);
636                                                    debug!("{repr}'s connection dropped");
637                                                    tokens_to_remove.insert(token);
638                                                }
639                                                // Would block "errors" are the OS's way of saying that the
640                                                // connection is not actually ready to perform this I/O operation.
641                                                io::ErrorKind::WouldBlock => {}
642                                                // Other errors we'll consider fatal.
643                                                _ => {
644                                                    bail!(error)
645                                                }
646                                            }
647                                            break;
648                                        }
649                                    }
650                                }
651                            }
652                        }
653                    }
654                    // The client is already queued for removal and so this event
655                    // will be ignored.
656                    _ => {}
657                }
658            }
659
660            // Process all the messages received from the clients.
661            for (token, msgs) in messages_to_process
662                .drain()
663                .filter(|(t, _)| !tokens_to_remove.contains(t))
664            {
665                for msg in msgs {
666                    let result = match msg.command {
667                        // Check if the client wasn't able to associate its token with a username
668                        // in time, or if that username is already taken.
669                        UserCommand::Connect => {
670                            token_manager.associate_token_and_username(token, &msg.username)
671                        }
672                        // Check if the client is being faithful and sending messages with
673                        // the correct username.
674                        _ => match token_manager.get_token_with_username(&msg.username) {
675                            Ok(associated_token) => {
676                                if token == associated_token {
677                                    Ok(())
678                                } else {
679                                    Err(ClientError::Unassociated)
680                                }
681                            }
682                            Err(error) => Err(error),
683                        },
684                    };
685                    let repr = token_to_string(&token);
686                    match result {
687                        Ok(()) => {
688                            debug!("{repr}: {msg}");
689                            tx_client.send(msg)?;
690                        }
691                        Err(error) => {
692                            debug!("{repr}: {error}");
693                            let msg = ServerMessage::ClientError(error);
694                            messages_to_write.entry(token).or_default().push_back(msg);
695                            tokens_to_reregister.insert(token);
696                        }
697                    }
698                }
699            }
700            // Make sure we allow writing errors back to the client.
701            for token in tokens_to_reregister.drain() {
702                if let Ok(stream) = token_manager.get_mut_stream_with_token(&token) {
703                    poll.registry().reregister(
704                        stream,
705                        token,
706                        Interest::READABLE | Interest::WRITABLE,
707                    )?;
708                }
709            }
710
711            // Recycle all tokens that need to be removed, deregistering their streams
712            // with the poll.
713            for token in tokens_to_remove.drain() {
714                let repr = token_to_string(&token);
715                debug!("{repr} is being removed");
716                if let Ok(username) = token_manager.get_confirmed_username_with_token(token) {
717                    let msg = ClientMessage {
718                        username,
719                        command: UserCommand::Disconnect,
720                    };
721                    tx_client.send(msg)?;
722                }
723                messages_to_write.remove(&token);
724                if let Ok(mut stream) = token_manager.recycle_token(token) {
725                    poll.registry().deregister(&mut stream)?;
726                }
727            }
728            for (token, mut stream) in token_manager.recycle_expired_tokens() {
729                let repr = token_to_string(&token);
730                debug!("{repr} expired");
731                messages_to_write.remove(&token);
732                poll.registry().deregister(&mut stream)?;
733            }
734        }
735    });
736
737    let mut state: PokerState = config.game_settings.into();
738    let mut status = state.to_string();
739    loop {
740        // Order is kind of key here. We get the status string before
741        // we step so we can inform users what's happening rather than
742        // what's going to happen in the future. This allows faster
743        // feedback from a user's perspective.
744        let repr = state.to_string();
745        // Only send new statuses to clients to avoid spam.
746        if status != repr {
747            info!("{repr}");
748            status = repr;
749            let msg = ServerData::Status(status.clone());
750            server_data_sender.send(msg)?;
751        }
752        state = state.step();
753
754        let views = state.get_views();
755        let msg = ServerData::Views(views);
756        server_data_sender.send(msg)?;
757
758        let mut next_action_username = state.get_next_action_username();
759        let mut timeout = config.server_timeouts.step;
760        'command: loop {
761            // Check if it's a user's turn. If so, send them a turn signal
762            // and increase the timeout to give them time to make their
763            // decision. We also keep track of their username so we
764            // can tell if they don't make a decision in time.
765            if let Some(username) = state.get_next_action_username()
766                && let Some(action_choices) = state.get_action_choices()
767            {
768                // Check if the username from the last turn is the same as the
769                // username from this turn. If so, we need to check if there
770                // was a timeout.
771                if let Some(ref last_username) = next_action_username {
772                    // If there's a timeout, then that means the user didn't
773                    // make a decision in time, and they have to fold.
774                    if timeout.as_secs() == 0 && &username == last_username {
775                        // Ack that they will fold (the poker state will
776                        // fold for them).
777                        warn!("{username} ran out of time and will be forced to fold");
778                        let command = UserCommand::TakeAction(Action::Fold);
779                        let msg = ServerData::Ack(ClientMessage {
780                            username: username.clone(),
781                            command,
782                        });
783                        server_data_sender.send(msg)?;
784
785                        // Force remove them so they don't disrupt future games.
786                        warn!("{username} will be removed at the end of the game");
787                        state.remove_user(&username)?;
788
789                        break 'command;
790                    }
791                    // Let all users know whose turn it is.
792                    let status = format!("it's {username}'s turn and they can {action_choices}");
793                    let msg = ServerData::Status(status.clone());
794                    server_data_sender.send(msg)?;
795
796                    // Let player know it's their turn.
797                    info!("{status}");
798                    let msg = ServerData::TurnSignal {
799                        username: username.clone(),
800                        action_choices,
801                    };
802                    server_data_sender.send(msg)?;
803
804                    next_action_username = Some(username);
805                    timeout = config.server_timeouts.action;
806                }
807            }
808            // If it's no one's turn and there's a timeout, then we must
809            // break to update the poker state.
810            else if timeout.as_secs() == 0 {
811                break 'command;
812            }
813
814            // Use the timeout duration to process events from the server's
815            // IO thread. This is really the main server loop.
816            while timeout.as_secs() > 0 {
817                // Drain game events before catching more user commands.
818                for event in state.drain_events() {
819                    info!("{event}");
820                    let msg = ServerData::Event(event);
821                    server_data_sender.send(msg)?;
822                }
823                let start = Instant::now();
824                if let Ok(mut msg) = rx_client.recv_timeout(timeout) {
825                    let result = match msg.command {
826                        UserCommand::ChangeState(ref new_user_state) => match new_user_state {
827                            UserState::Play => state.waitlist_user(&msg.username).map(|_| ()),
828                            UserState::Spectate => state.spectate_user(&msg.username).map(|_| ()),
829                        },
830                        UserCommand::Connect => state.new_user(&msg.username).map(|_| ()),
831                        UserCommand::Disconnect => state.remove_user(&msg.username).map(|_| ()),
832                        UserCommand::ShowHand => state.show_hand(&msg.username),
833                        UserCommand::StartGame => state.init_start(&msg.username),
834                        UserCommand::TakeAction(ref mut action) => state
835                            .take_action(&msg.username, action.clone())
836                            .map(|new_action| {
837                                timeout = Duration::ZERO;
838                                *action = new_action;
839                            }),
840                        UserCommand::CastVote(ref vote) => state
841                            .cast_vote(&msg.username, vote.clone())
842                            .and_then(|maybe_vote| {
843                                maybe_vote.map_or(Ok(()), |vote| match vote {
844                                    Vote::Kick(username) => state.kick_user(&username).map(|_| ()),
845                                    Vote::Reset(None) => {
846                                        state.reset_all_money();
847                                        Ok(())
848                                    }
849                                    Vote::Reset(Some(username)) => {
850                                        state.reset_user_money(&username).map(|_| ())
851                                    }
852                                })
853                            }),
854                    };
855
856                    // Get the result from a client's command. If their command
857                    // is OK, ack the command to all clients so they know what
858                    // happened. If their command is bad, send an error back to
859                    // the commanding client.
860                    match result {
861                        Ok(()) => {
862                            info!("{msg}");
863                            let msg = ServerData::Ack(msg);
864                            server_data_sender.send(msg)?;
865
866                            let msg = ServerData::Views(state.get_views());
867                            server_data_sender.send(msg)?;
868                        }
869                        Err(error) => {
870                            error!("{error}: {msg}");
871                            let msg = ServerData::UserError {
872                                username: msg.username,
873                                error,
874                            };
875                            server_data_sender.send(msg)?;
876                        }
877                    }
878                }
879                timeout = timeout.saturating_sub(start.elapsed());
880            }
881        }
882    }
883}
884
885#[cfg(test)]
886mod tests {
887    use std::time::Duration;
888
889    use mio::{
890        Token,
891        net::{TcpListener, TcpStream},
892    };
893
894    use crate::entities::Username;
895    use crate::net::messages::ClientError;
896
897    use super::TokenManager;
898
899    fn get_server() -> TcpListener {
900        let random_port_addr = "127.0.0.1:0".parse().unwrap();
901        TcpListener::bind(random_port_addr).unwrap()
902    }
903
904    fn get_stream(listener: &TcpListener) -> TcpStream {
905        let addr = listener.local_addr().unwrap();
906        TcpStream::connect(addr).unwrap();
907        let (stream, _) = listener.accept().unwrap();
908        stream
909    }
910
911    #[test]
912    fn confirm_username() {
913        let server = get_server();
914        let stream = get_stream(&server);
915        let mut token_manager = TokenManager::new(Duration::ZERO);
916
917        let token = token_manager.new_token();
918        token_manager.associate_token_and_stream(token, stream);
919
920        let username = Username::new("ognf");
921        assert_eq!(
922            token_manager.get_token_with_username(&username),
923            Err(ClientError::Unassociated)
924        );
925        assert_eq!(
926            token_manager.associate_token_and_username(token, &username),
927            Ok(())
928        );
929        assert_eq!(token_manager.get_token_with_username(&username), Ok(token));
930
931        assert_eq!(token_manager.confirm_username(token), Ok(()));
932        assert_eq!(
933            token_manager.get_confirmed_username_with_token(token),
934            Ok(username.clone())
935        );
936        assert_eq!(token_manager.get_token_with_username(&username), Ok(token));
937    }
938
939    #[test]
940    fn confirm_username_recycled_token() {
941        let server = get_server();
942        let stream = get_stream(&server);
943        let mut token_manager = TokenManager::new(Duration::ZERO);
944
945        let token = token_manager.new_token();
946        token_manager.associate_token_and_stream(token, stream);
947        token_manager.recycle_expired_tokens();
948
949        let username = Username::new("ognf");
950        assert_eq!(
951            token_manager.get_token_with_username(&username),
952            Err(ClientError::Unassociated)
953        );
954        assert_eq!(
955            token_manager.associate_token_and_username(token, &username),
956            Err(ClientError::Expired)
957        );
958    }
959
960    #[test]
961    fn recycle_expired_tokens() {
962        let server = get_server();
963        let stream1 = get_stream(&server);
964        let stream2 = get_stream(&server);
965        let stream3 = get_stream(&server);
966        let stream4 = get_stream(&server);
967        let mut token_manager = TokenManager::new(Duration::ZERO);
968
969        // Create a couple of tokens and immediately recycle them.
970        let token1 = token_manager.new_token();
971        token_manager.associate_token_and_stream(token1, stream1);
972        let token2 = token_manager.new_token();
973        token_manager.associate_token_and_stream(token2, stream2);
974        token_manager.recycle_expired_tokens();
975
976        // Tokens are immediately resused.
977        let token3 = token_manager.new_token();
978        token_manager.associate_token_and_stream(token1, stream3);
979        let token4 = token_manager.new_token();
980        token_manager.associate_token_and_stream(token2, stream4);
981        assert_eq!(token1, Token(2));
982        assert_eq!(token1, token3);
983        assert_eq!(token2, Token(3));
984        assert_eq!(token2, token4);
985    }
986
987    #[test]
988    fn recycle_token() {
989        let server = get_server();
990        let stream1 = get_stream(&server);
991        let stream2 = get_stream(&server);
992        let mut token_manager = TokenManager::new(Duration::ZERO);
993
994        let token1 = token_manager.new_token();
995        token_manager.associate_token_and_stream(token1, stream1);
996        let token2 = token_manager.new_token();
997        token_manager.associate_token_and_stream(token2, stream2);
998
999        let username = Username::new("ognf");
1000        assert_eq!(
1001            token_manager.associate_token_and_username(token1, &username),
1002            Ok(())
1003        );
1004        assert_eq!(
1005            token_manager.associate_token_and_username(token2, &username),
1006            Err(ClientError::AlreadyAssociated)
1007        );
1008        assert!(token_manager.recycle_token(token1).is_ok());
1009        assert_eq!(
1010            token_manager.associate_token_and_username(token2, &username),
1011            Ok(())
1012        );
1013        assert_eq!(token1, token_manager.new_token());
1014    }
1015}