blizzard_server/server/
controller.rs

1//! # Controller
2//! The controller is in charge of opening the game ports and handling client connections to games.
3//! It opens threads per client.
4//! Each client has a receiver and a sender thread.
5
6use std::io::{BufRead, BufReader, Error, Write};
7use std::net::{TcpListener, TcpStream};
8use std::str;
9use std::sync::mpsc::{Receiver, Sender};
10use std::sync::{mpsc, Arc, Mutex};
11use std::thread;
12use std::time::Duration;
13
14use serde::de::DeserializeOwned;
15use serde::Serialize;
16
17use blizzard_engine::core::network_application::Application;
18use blizzard_engine::game::Game;
19use blizzard_id::Uid;
20
21use crate::game::Player;
22use crate::server::connector::Connector;
23
24/// # Functionality:
25/// * Connection controller
26/// * Provides information to connector
27/// * Handles client connections to the game
28pub struct Controller {
29    // Waiting for future use
30    port: i32,
31    connector: Arc<Mutex<Connector>>,
32    players: Vec<Player>,
33    max_players: i32,
34}
35
36impl Controller {
37    /// Create a new controller from a connector
38    fn new(port: i32, max_players: i32, connector: Arc<Mutex<Connector>>) -> Controller {
39        Controller {
40            players: vec![],
41            max_players,
42            port,
43            connector,
44        }
45    }
46
47    /// What it does:
48    /// * Runs a game (application).
49    /// * Creates messaging channel between controller and app.
50    /// * Creates a shared state to share between app and client.
51    /// * Opens a port for game.
52    pub fn open_game_port<'de, T: Game<K, I>, K, I, M>(
53        port: i32,
54        max_players: i32,
55        connector: Arc<Mutex<Connector>>,
56        handle_input: &'static (dyn Fn(Receiver<(M, usize)>, Arc<Mutex<I>>) -> I + Sync),
57        mut app: Application<T, K, I>,
58        send_data_rate: i32,
59    ) where
60        T: Send + 'static,
61        K: Send + Serialize + 'static,
62        I: Send + Copy,
63        M: Send + DeserializeOwned,
64    {
65        // Store port id
66        let id = port;
67
68        // Create message channel
69        let (tx, rx) = mpsc::channel();
70
71        // Get shared state between controller and app
72        let shared_state = Arc::clone(&app.shared_state);
73
74        // Create controller mutex with reference counter
75        let controller = Arc::new(Mutex::new(Controller::new(port, max_players, connector)));
76
77        // Format port
78        let port = format!("0.0.0.0:{}", port);
79        println!("Opening game in port {}", port);
80
81        // Start app / game in a new thread
82        let builder = thread::Builder::new().name(format!("App-thread-{}", id));
83        builder
84            .spawn(move || {
85                app.start(rx, handle_input);
86            })
87            .expect("Could not create thread");
88
89        // Create tcp listener
90        let listener = TcpListener::bind(port).expect("Could not bind");
91
92        for stream in listener.incoming() {
93            match stream {
94                Err(e) => {
95                    eprintln!("failed: {}", e)
96                }
97                Ok(mut stream) => {
98                    // Push a new player to the game
99                    let (could_join, player_id) = controller.lock().unwrap().add_player();
100
101                    if could_join {
102                        // Clone the controller
103                        let controller = Arc::clone(&controller);
104
105                        // Update game wrapper player count
106                        controller
107                            .lock()
108                            .unwrap()
109                            .connector
110                            .lock()
111                            .unwrap()
112                            .add_player();
113
114                        // Create concurrency clones
115                        let sender = tx.clone();
116                        let shared_state = Arc::clone(&shared_state);
117
118                        // Spawn thread and move thread and controller
119                        let builder = thread::Builder::new()
120                            .name(format!("Game-{}-player-{}", id, player_id));
121                        builder
122                            .spawn(move || {
123                                Controller::handle_player_connection::<K, M>(
124                                    stream,
125                                    controller,
126                                    player_id,
127                                    sender,
128                                    shared_state,
129                                    send_data_rate,
130                                )
131                                .unwrap_or_else(|error| eprintln!("{:?}", error));
132                            })
133                            .expect("Could not create thread");
134                    } else {
135                        stream.write("Could not join".as_bytes()).unwrap();
136                    }
137                }
138            }
139        }
140    }
141
142    /// Handles player writing and reading
143    pub fn handle_player_connection<'de, K, M>(
144        stream: TcpStream,
145        game: Arc<Mutex<Controller>>,
146        id: usize,
147        sender: Sender<(M, usize)>,
148        shared_state: Arc<Mutex<K>>,
149        send_data_rate: i32,
150    ) -> Result<(), Error>
151    where
152        K: Send + Serialize + 'static,
153        M: Send + DeserializeOwned + 'static,
154    {
155        println!("Connecting player {} to game", id);
156        let mut stream_clone = stream.try_clone().unwrap();
157        let sender = sender.clone();
158
159        // Defines bool for dropping the thread on disconnection
160        let drop_thread = Arc::new(Mutex::new(false));
161        let drop_copy = Arc::clone(&drop_thread);
162
163        // Stream receiver: Read from client
164        thread::spawn(move || {
165            let drop = drop_copy;
166            loop {
167                let mut buffer: Vec<u8> = Vec::new();
168                let mut reader = BufReader::new(&stream);
169                reader
170                    .read_until(b'\n', &mut buffer)
171                    .expect("Could not read into buffer");
172                let bytes_read = buffer.len();
173
174                // On stream input, get lock and aquire player id
175                let mut game = game.lock().unwrap();
176                let player_index = game.players.iter().position(|p| p.id == id).unwrap();
177
178                // If no bytes end connection
179                if bytes_read == 0 {
180                    // Remove player
181                    game.connector.lock().unwrap().remove_player();
182                    game.remove_player(player_index);
183
184                    // Mark thread for dropping
185                    *drop.lock().unwrap() = true;
186                    break;
187                }
188
189                // Parse message and send to app
190                let json = str::from_utf8(&buffer).unwrap();
191                let signal: M = serde_json::from_str(&json.trim()).unwrap();
192
193                match sender.send((signal, id)) {
194                    Ok(_) => {}
195                    Err(_) => println!("Could not send signal to app."),
196                }
197            }
198        });
199
200        // Stream sender: write to client
201        thread::spawn(move || {
202            // 1000 / millis = frames per sec
203            // millis = 1000 / frames_per_sec
204            let sleep_time: u64 = (1000 / send_data_rate) as u64;
205
206            // Client event loop
207            loop {
208                thread::sleep(Duration::from_millis(sleep_time));
209
210                // On stream input, aquire shared state lock
211                let reduced_state = shared_state.lock().unwrap();
212                let serialized = serde_json::to_string(&*reduced_state);
213
214                // Send state to client
215                if let Ok(s) = serialized {
216                    let s = s + "\n";
217                    match stream_clone.write(s.as_bytes()) {
218                        Ok(_) => {}
219                        Err(_) => println!("Could not send data to client."),
220                    }
221                }
222
223                // Drop thread when client disconnects
224                if *drop_thread.lock().unwrap() {
225                    break;
226                }
227            }
228        });
229
230        return Ok(());
231    }
232
233    /// Add a player to the game
234    pub fn add_player(&mut self) -> (bool, usize) {
235        if self.players.len() < self.max_players as usize {
236            let id = Uid::new_numerical(4) as usize;
237            let new_player = Player::new(id);
238            self.players.push(new_player);
239            return (true, id);
240        }
241        (false, 0)
242    }
243
244    /// Remove a player from the game
245    pub fn remove_player(&mut self, index: usize) -> bool {
246        if self.players.len() == 0 {
247            return false;
248        }
249        self.players.remove(index);
250        true
251    }
252}