composition/server/
mod.rs

1/// Internal messaging for the server.
2pub mod messages;
/// The network client struct, kept in its own file.
4pub mod net;
5
6use crate::entity::player::Player;
7use log::*;
8use messages::*;
9use net::*;
10use std::sync::mpsc::{self, Receiver, TryRecvError};
11use tokio::net::{TcpListener, ToSocketAddrs};
12
13/// The struct containing all the data and running all the updates.
14pub struct Server {
15    network_clients: Vec<NetworkClient>,
16    network_receiver: Receiver<NetworkClient>,
17    message_receiver: Receiver<ServerboundMessage>,
18    // message_sender: Bus<BroadcastMessage>,
19    pub players: Vec<Player>,
20}
21impl Server {
22    pub fn new<A: 'static + ToSocketAddrs + Send>(addr: A) -> Server {
23        let (network_client_tx, network_client_rx) = mpsc::channel();
24        let (serverbound_message_tx, serverbound_message_rx) = mpsc::channel();
25        // let mut broadcast_message_tx = Bus::new(1_000); // Hold up to 1,000 messages in the queue.
26        tokio::task::spawn(async move {
27            let listener = TcpListener::bind(addr)
28                .await
29                .expect("Could not bind to TCP socket");
30            let mut id = 0;
31            loop {
32                let (stream, _) = listener
33                    .accept()
34                    .await
35                    .expect("Network receiver disconnected");
36                network_client_tx
37                    .send(NetworkClient::new(
38                        stream,
39                        id as u128,
40                        serverbound_message_tx.clone(),
41                        // broadcast_message_tx.add_rx(),
42                    ))
43                    .expect("Network receiver disconnected");
44                id += 1;
45            }
46        });
47        info!("Network server started!");
48        Server {
49            network_receiver: network_client_rx,
50            network_clients: vec![],
51            message_receiver: serverbound_message_rx,
52            players: vec![],
53        }
54    }
55
56    /// Shut down the server.
57    ///
58    /// Disconnects all clients.
59    pub async fn shutdown(&mut self) {
60        info!(
61            "Server shutting down. Uptime: {:?}",
62            crate::START_TIME.elapsed()
63        );
64        self.broadcast_message(BroadcastMessage::Disconnect(
65            "The server is shutting down".into(),
66        ))
67        .await;
68    }
69
70    /// Update the network server.
71    ///
72    /// Update each client in `self.network_clients`.
73    async fn update_network(&mut self) -> tokio::io::Result<()> {
74        // Read new clients from the network.
75        loop {
76            match self.network_receiver.try_recv() {
77                Ok(client) => {
78                    info!(
79                        "Got client at {}",
80                        client.stream.peer_addr().expect("Could not get peer addr")
81                    );
82                    self.network_clients.push(client)
83                }
84                Err(TryRecvError::Empty) => break,
85                Err(TryRecvError::Disconnected) => panic!("Network sender disconnected"),
86            }
87        }
88        // Count the number of players in the Play state.
89        let num_players = self.network_clients.iter().fold(0, |acc, nc| {
90            if nc.state == NetworkClientState::Play {
91                acc + 1
92            } else {
93                acc
94            }
95        });
96        // Update each client, disconnecting those with errors.
97        for client in self.network_clients.iter_mut() {
98            if client.update(num_players).await.is_err() {
99                client.force_disconnect();
100            }
101        }
102        // Remove disconnected clients.
103        self.network_clients
104            .retain(|nc| nc.state != NetworkClientState::Disconnected);
105        // Read new messages from the clients.
106        loop {
107            match self.message_receiver.try_recv() {
108                Ok(message) => match message {
109                    ServerboundMessage::Chat(msg) => {
110                        self.broadcast_message(BroadcastMessage::Chat(msg)).await;
111                    }
112                    ServerboundMessage::PlayerJoin(_uuid, username) => {
113                        self.broadcast_message(BroadcastMessage::Chat(format!(
114                            "Welcome {} to the server!",
115                            username
116                        )))
117                        .await;
118                    }
119                },
120                Err(TryRecvError::Empty) => break,
121                Err(TryRecvError::Disconnected) => panic!("Message sender disconnected"),
122            }
123        }
124        Ok(())
125    }
126
127    pub async fn broadcast_message(&mut self, message: BroadcastMessage) {
128        let mut v = Vec::new();
129        for client in self.network_clients.iter_mut() {
130            v.push(client.handle_broadcast_message(message.clone()));
131        }
132        futures::future::join_all(v).await;
133    }
134
135    /// Update the game server.
136    ///
137    /// Start by updating the network.
138    pub async fn update(&mut self) -> tokio::io::Result<()> {
139        self.update_network().await?;
140        Ok(())
141    }
142}