sc2_proxy/
supervisor.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
//! Game supervisor, manages games and passes messages

#![allow(dead_code)]

use log::{debug, error, info, trace, warn};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::ErrorKind::WouldBlock;

use websocket::message::OwnedMessage;
use websocket::result::WebSocketError;

use protobuf::parse_from_bytes;
use protobuf::Message;
use sc2_proto::{self, sc2api::RequestJoinGame};

use crate::config::{Config, MatchmakingMode};
use crate::game::{spawn as spawn_game, FromSupervisor, GameLobby, Handle as GameHandle};
use crate::proxy::Client;
use crate::remote_control::Remote;

enum PlaylistAction {
    Respond(OwnedMessage),
    RespondQuit(OwnedMessage),
    JoinGame(sc2_proto::sc2api::RequestJoinGame),
    Kick,
}
impl PlaylistAction {
    pub fn respond(r: sc2_proto::sc2api::Response) -> Self {
        let m = OwnedMessage::Binary(r.write_to_bytes().expect("Invalid protobuf message"));
        PlaylistAction::Respond(m)
    }
    pub fn respond_quit(r: sc2_proto::sc2api::Response) -> Self {
        let m = OwnedMessage::Binary(r.write_to_bytes().expect("Invalid protobuf message"));
        PlaylistAction::RespondQuit(m)
    }
}

/// Unique identifier for lobby and running games
/// Game keeps same id from lobby creation until all clients leave the game
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct GameId(u64);
impl GameId {
    fn next(self) -> Self {
        Self(self.0 + 1)
    }
}

/// Supervisor manages a pool of games and client waiting for games
pub struct Supervisor {
    /// Configuration
    config: Config,
    /// Running games
    games: HashMap<GameId, GameHandle>,
    /// Games waiting for more players
    lobbies: HashMap<GameId, GameLobby>,
    /// Connections (in nonblocking mode) waiting for a game
    /// If a game join is requested is pending (with remote), then also contains that
    playlist: Vec<(Client, Option<RequestJoinGame>)>,
    /// Id counter to allocate next id
    id_counter: GameId,
}
impl Supervisor {
    /// Create new emty supervisor from config
    pub fn new(config: Config) -> Self {
        Self {
            config,
            games: HashMap::new(),
            lobbies: HashMap::new(),
            playlist: Vec::new(),
            id_counter: GameId(0),
        }
    }

    /// Create new lobby
    fn create_lobby(&mut self) -> GameId {
        if let Err(e) = self.config.check() {
            error!("Invalid configuration: {}", e);
            panic!("Invalid configuration");
        }

        let lobby = GameLobby::new(self.config.clone());
        let id = self.id_counter;
        debug_assert!(!self.lobbies.contains_key(&id));
        debug_assert!(!self.games.contains_key(&id));
        self.id_counter = self.id_counter.next();
        self.lobbies.insert(id, lobby);
        id
    }

    /// Add a new client socket to playlist
    pub fn add_client(&mut self, client: Client) {
        client.set_nonblocking(true).expect("Could not set nonblocking");
        self.playlist.push((client, None));
    }

    /// Remove client from playlist, closing the connection
    fn drop_client(&mut self, index: usize) {
        let (client, _) = &mut self.playlist[index];
        info!("Removing client {:?} from playlist", client.peer_addr().unwrap());
        client.shutdown().expect("Connection shutdown failed");
        self.playlist.remove(index);
    }

    /// Gets a client index by identifier (peer address for now) if any
    #[must_use]
    pub fn client_index_by_id(&mut self, client_id: String) -> Option<usize> {
        self.playlist
            .iter()
            .enumerate()
            .filter(|(_, (c, _))| c.peer_addr().expect("Could not get peer_addr").to_string() == client_id)
            .map(|(i, _)| i)
            .nth(0)
    }

    /// Join to game from playlist
    /// Íff game join fails, drops connection
    #[must_use]
    fn playlist_join_game(&mut self, index: usize, req: RequestJoinGame) -> Option<()> {
        let (client, old_req) = self.playlist.remove(index);

        if old_req != None {
            warn!("Client attempted to join a game twice (dropping connection)");
            return None;
        }

        client.set_nonblocking(false).expect("Could not set nonblocking");

        // TODO: Verify that InterfaceOptions are allowed

        match self.config.matchmaking.mode {
            MatchmakingMode::AgainstBuiltinAI => {
                let id = self.create_lobby();
                let mut lobby = self.lobbies.remove(&id).unwrap();
                lobby.join(client, req);
                lobby.add_computer(
                    self.config.matchmaking.cpu_race,
                    self.config.matchmaking.cpu_difficulty,
                );
                let game = lobby.start()?;
                self.games.insert(id, spawn_game(game));
            },
            MatchmakingMode::Pairs => {
                if let Some(&id) = self.lobbies.keys().nth(0) {
                    let mut lobby = self.lobbies.remove(&id).unwrap();
                    lobby.join(client, req);
                    let game = lobby.start()?;
                    self.games.insert(id, spawn_game(game));
                } else {
                    let id = self.create_lobby();
                    let lobby = self.lobbies.get_mut(&id).unwrap();
                    lobby.join(client, req);
                }
            },
            MatchmakingMode::RemoteController => {
                // Return client to playlist, the remote can handle this
                client.set_nonblocking(true).expect("Could not set nonblocking");
                self.playlist.push((client, Some(req)));
            },
            other => panic!("Unimplemented matchmaking mode {:?}", other),
        }

        Some(())
    }

    /// Process message from a client in the playlist
    fn process_playlist_message(&mut self, msg: OwnedMessage) -> PlaylistAction {
        match msg {
            OwnedMessage::Binary(bytes) => {
                let req = parse_from_bytes::<sc2_proto::sc2api::Request>(&bytes);
                debug!("Incoming playlist request: {:?}", req);

                match req {
                    Ok(ref m) if m.has_quit() => {
                        info!("Client quit");
                        let mut resp = sc2_proto::sc2api::Response::new();
                        let quit = sc2_proto::sc2api::ResponseQuit::new();
                        resp.set_quit(quit);
                        PlaylistAction::respond_quit(resp)
                    },
                    Ok(ref m) if m.has_ping() => {
                        trace!("Ping => Pong");
                        let mut resp = sc2_proto::sc2api::Response::new();
                        let pong = sc2_proto::sc2api::ResponsePing::new();
                        // TODO: Set pong fields, like game version?
                        resp.set_ping(pong);
                        PlaylistAction::respond(resp)
                    },
                    Ok(ref m) if m.has_join_game() => {
                        debug!("Game join");
                        PlaylistAction::JoinGame(m.get_join_game().clone())
                    },
                    Ok(other) => {
                        warn!("Unsupported message in playlist {:?}", other);
                        PlaylistAction::Kick
                    },
                    Err(err) => {
                        warn!("Invalid message {:?}", err);
                        PlaylistAction::Kick
                    },
                }
            },
            other => {
                warn!("Unsupported message type {:?}", other);
                PlaylistAction::Kick
            },
        }
    }

    /// Update clients in playlist to see if they join a game or disconnect
    pub fn update_playlist(&mut self) {
        for i in (0..self.playlist.len()).rev() {
            match self.playlist[i].0.recv_message() {
                Ok(msg) => match self.process_playlist_message(msg) {
                    PlaylistAction::Kick => self.drop_client(i),
                    PlaylistAction::Respond(resp) => {
                        self.playlist[i].0.send_message(&resp).expect("Could not respond");
                    },
                    PlaylistAction::RespondQuit(resp) => {
                        self.playlist[i].0.send_message(&resp).expect("Could not respond");
                        self.drop_client(i);
                    },
                    PlaylistAction::JoinGame(req) => {
                        let joinres = self.playlist_join_game(i, req);
                        if joinres == None {
                            warn!("Game creation / joining failed");
                        }
                    },
                },
                Err(WebSocketError::IoError(ref e)) if e.kind() == WouldBlock => {},
                Err(err) => {
                    warn!("Invalid message {:?}", err);
                    self.drop_client(i);
                },
            };
        }
    }

    /// Update game handles to see if they are still running
    pub fn update_games(&mut self) {
        let mut games_over = Vec::new();
        for (id, game) in self.games.iter_mut() {
            if game.check() {
                games_over.push(id.clone());
            }
        }

        for id in games_over {
            match self.games.remove(&id).unwrap().collect_result() {
                Ok((result, players)) => {
                    // Return players to playlist
                    for p in players.into_iter() {
                        // TODO: process reuse
                        self.add_client(p.extract_client());
                    }

                    info!("Game result: {:?}", result);
                },
                Err(msg) => {
                    error!("Game thread panicked with: {:?}", msg);
                },
            }
        }
    }

    /// Update game handles to see if they are still running
    /// Returns true if a request was processed
    #[must_use]
    pub fn update_remote(&mut self, remote: &mut Remote) -> RemoteUpdateStatus {
        use crate::remote_control::message::*;

        if let Some(msg) = remote.try_recv() {
            match msg {
                Request::Quit => {
                    remote.send(Response::Quit);
                    return RemoteUpdateStatus::Quit;
                },
                Request::Ping(v) => remote.send(Response::Ping(v)),
                Request::GetConfig => {
                    remote.send(Response::GetConfig(self.config.clone()));
                },
                Request::SetConfig(config) => {
                    self.config = config.clone();
                    remote.send(Response::SetConfig(config));
                },
                Request::GetPlaylist => {
                    remote.send(Response::GetPlaylist(
                        self.playlist
                            .iter()
                            .map(|(c, r)| {
                                (
                                    c.peer_addr().expect("Could not get peer_addr").to_string(),
                                    r.is_some(),
                                )
                            })
                            .collect(),
                    ));
                },
                Request::CreateLobby => {
                    let game_id = self.create_lobby();
                    remote.send(Response::CreateLobby(game_id));
                },
                Request::AddToLobby(game_id, client_id) => {
                    if let Some(index) = self.client_index_by_id(client_id) {
                        let (client, req_opt) = self.playlist.remove(index);
                        if let Some(req) = req_opt {
                            if let Some(lobby) = self.lobbies.get_mut(&game_id) {
                                client.set_nonblocking(false).expect("Could not set nonblocking");
                                lobby.join(client, req);
                                remote.send(Response::AddToLobby);
                            } else {
                                remote.send(Response::Error("No such game".to_owned()));
                                // Client connection dropped here
                            }
                        } else {
                            remote.send(Response::Error("Client not ready".to_owned()));
                            // Client connection dropped here
                        }
                    } else {
                        remote.send(Response::Error("No such client".to_owned()));
                    }
                },
                Request::StartGame(game_id) => {
                    if let Some(lobby) = self.lobbies.remove(&game_id) {
                        if !lobby.is_valid() {
                            remote.send(Response::Error("The lobby is empty".to_owned()));
                        } else if let Some(game) = lobby.start() {
                            self.games.insert(game_id, spawn_game(game));
                            remote.send(Response::StartGame);
                        } else {
                            remote.send(Response::Error("Game start failed".to_owned()));
                            // TODO: Connections are dropped here
                            // maybe they should be returned to the playlist instead
                        }
                    } else {
                        remote.send(Response::Error("No such game".to_owned()));
                    }
                },
                _ => remote.send(Response::Error("Unsupported".to_owned())),
            };
            RemoteUpdateStatus::Processed
        } else {
            RemoteUpdateStatus::NoAction
        }
    }

    /// Destroys the supervisor, ending all games,
    /// and closing all connections and threads
    pub fn close(self) {
        debug!("Closing supervisor");

        // Tell all games to quit
        for (_id, mut game) in self.games.into_iter() {
            game.send(FromSupervisor::Quit);
        }

        // Destroy all lobbies
        for (_id, lobby) in self.lobbies.into_iter() {
            lobby.close();
        }

        // Close all gamelist connections by drop
    }
}

/// Return type of Supervisor.update_remote
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RemoteUpdateStatus {
    /// Server quit requested
    Quit,
    /// A request was processed
    Processed,
    /// No action was taken
    NoAction,
}