blizzard_server/server/
controller.rs1use std::io::{BufRead, BufReader, Error, Write};
7use std::net::{TcpListener, TcpStream};
8use std::str;
9use std::sync::mpsc::{Receiver, Sender};
10use std::sync::{mpsc, Arc, Mutex};
11use std::thread;
12use std::time::Duration;
13
14use serde::de::DeserializeOwned;
15use serde::Serialize;
16
17use blizzard_engine::core::network_application::Application;
18use blizzard_engine::game::Game;
19use blizzard_id::Uid;
20
21use crate::game::Player;
22use crate::server::connector::Connector;
23
24pub struct Controller {
29 port: i32,
31 connector: Arc<Mutex<Connector>>,
32 players: Vec<Player>,
33 max_players: i32,
34}
35
36impl Controller {
37 fn new(port: i32, max_players: i32, connector: Arc<Mutex<Connector>>) -> Controller {
39 Controller {
40 players: vec![],
41 max_players,
42 port,
43 connector,
44 }
45 }
46
47 pub fn open_game_port<'de, T: Game<K, I>, K, I, M>(
53 port: i32,
54 max_players: i32,
55 connector: Arc<Mutex<Connector>>,
56 handle_input: &'static (dyn Fn(Receiver<(M, usize)>, Arc<Mutex<I>>) -> I + Sync),
57 mut app: Application<T, K, I>,
58 send_data_rate: i32,
59 ) where
60 T: Send + 'static,
61 K: Send + Serialize + 'static,
62 I: Send + Copy,
63 M: Send + DeserializeOwned,
64 {
65 let id = port;
67
68 let (tx, rx) = mpsc::channel();
70
71 let shared_state = Arc::clone(&app.shared_state);
73
74 let controller = Arc::new(Mutex::new(Controller::new(port, max_players, connector)));
76
77 let port = format!("0.0.0.0:{}", port);
79 println!("Opening game in port {}", port);
80
81 let builder = thread::Builder::new().name(format!("App-thread-{}", id));
83 builder
84 .spawn(move || {
85 app.start(rx, handle_input);
86 })
87 .expect("Could not create thread");
88
89 let listener = TcpListener::bind(port).expect("Could not bind");
91
92 for stream in listener.incoming() {
93 match stream {
94 Err(e) => {
95 eprintln!("failed: {}", e)
96 }
97 Ok(mut stream) => {
98 let (could_join, player_id) = controller.lock().unwrap().add_player();
100
101 if could_join {
102 let controller = Arc::clone(&controller);
104
105 controller
107 .lock()
108 .unwrap()
109 .connector
110 .lock()
111 .unwrap()
112 .add_player();
113
114 let sender = tx.clone();
116 let shared_state = Arc::clone(&shared_state);
117
118 let builder = thread::Builder::new()
120 .name(format!("Game-{}-player-{}", id, player_id));
121 builder
122 .spawn(move || {
123 Controller::handle_player_connection::<K, M>(
124 stream,
125 controller,
126 player_id,
127 sender,
128 shared_state,
129 send_data_rate,
130 )
131 .unwrap_or_else(|error| eprintln!("{:?}", error));
132 })
133 .expect("Could not create thread");
134 } else {
135 stream.write("Could not join".as_bytes()).unwrap();
136 }
137 }
138 }
139 }
140 }
141
142 pub fn handle_player_connection<'de, K, M>(
144 stream: TcpStream,
145 game: Arc<Mutex<Controller>>,
146 id: usize,
147 sender: Sender<(M, usize)>,
148 shared_state: Arc<Mutex<K>>,
149 send_data_rate: i32,
150 ) -> Result<(), Error>
151 where
152 K: Send + Serialize + 'static,
153 M: Send + DeserializeOwned + 'static,
154 {
155 println!("Connecting player {} to game", id);
156 let mut stream_clone = stream.try_clone().unwrap();
157 let sender = sender.clone();
158
159 let drop_thread = Arc::new(Mutex::new(false));
161 let drop_copy = Arc::clone(&drop_thread);
162
163 thread::spawn(move || {
165 let drop = drop_copy;
166 loop {
167 let mut buffer: Vec<u8> = Vec::new();
168 let mut reader = BufReader::new(&stream);
169 reader
170 .read_until(b'\n', &mut buffer)
171 .expect("Could not read into buffer");
172 let bytes_read = buffer.len();
173
174 let mut game = game.lock().unwrap();
176 let player_index = game.players.iter().position(|p| p.id == id).unwrap();
177
178 if bytes_read == 0 {
180 game.connector.lock().unwrap().remove_player();
182 game.remove_player(player_index);
183
184 *drop.lock().unwrap() = true;
186 break;
187 }
188
189 let json = str::from_utf8(&buffer).unwrap();
191 let signal: M = serde_json::from_str(&json.trim()).unwrap();
192
193 match sender.send((signal, id)) {
194 Ok(_) => {}
195 Err(_) => println!("Could not send signal to app."),
196 }
197 }
198 });
199
200 thread::spawn(move || {
202 let sleep_time: u64 = (1000 / send_data_rate) as u64;
205
206 loop {
208 thread::sleep(Duration::from_millis(sleep_time));
209
210 let reduced_state = shared_state.lock().unwrap();
212 let serialized = serde_json::to_string(&*reduced_state);
213
214 if let Ok(s) = serialized {
216 let s = s + "\n";
217 match stream_clone.write(s.as_bytes()) {
218 Ok(_) => {}
219 Err(_) => println!("Could not send data to client."),
220 }
221 }
222
223 if *drop_thread.lock().unwrap() {
225 break;
226 }
227 }
228 });
229
230 return Ok(());
231 }
232
233 pub fn add_player(&mut self) -> (bool, usize) {
235 if self.players.len() < self.max_players as usize {
236 let id = Uid::new_numerical(4) as usize;
237 let new_player = Player::new(id);
238 self.players.push(new_player);
239 return (true, id);
240 }
241 (false, 0)
242 }
243
244 pub fn remove_player(&mut self, index: usize) -> bool {
246 if self.players.len() == 0 {
247 return false;
248 }
249 self.players.remove(index);
250 true
251 }
252}