// oxygengine_network_backend_desktop/lib.rs

1extern crate oxygengine_core as core;
2extern crate oxygengine_network as network;
3
/// Convenience module re-exporting every public item of this crate, so that
/// users can bring them all into scope with one glob import of the prelude.
pub mod prelude {
    pub use crate::*;
}
7
8#[cfg(test)]
9mod tests;
10
11use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
12use network::{
13    client::{ClientId, ClientState, MessageId},
14    server::{Server, ServerId, ServerState},
15};
16use std::{
17    collections::{HashMap, VecDeque},
18    io::{Cursor, Read, Write},
19    ops::Range,
20    sync::{Arc, RwLock},
21    thread::{spawn, JoinHandle},
22};
23use ws::{CloseCode, Handler, Handshake, Message, Result, Sender as WsSender, WebSocket};
24
25type MsgData = (ClientId, MessageId, Vec<u8>);
26
27#[derive(Clone)]
28struct Client {
29    id: ClientId,
30    ws: WsSender,
31    state: ClientState,
32    messages: VecDeque<MsgData>,
33}
34
35struct ClientHandler {
36    client: Arc<RwLock<Client>>,
37}
38
39impl Handler for ClientHandler {
40    fn on_open(&mut self, _: Handshake) -> Result<()> {
41        if let Ok(mut client) = self.client.write() {
42            client.state = ClientState::Open;
43        }
44        Ok(())
45    }
46
47    fn on_message(&mut self, msg: Message) -> Result<()> {
48        if let Message::Binary(msg) = msg {
49            let size = msg.len();
50            if size >= 8 {
51                let mut stream = Cursor::new(msg);
52                let id = stream.read_u32::<BigEndian>().unwrap();
53                let version = stream.read_u32::<BigEndian>().unwrap();
54                let mut data = Vec::with_capacity(size - 8);
55                stream.read_to_end(&mut data).unwrap();
56                if let Ok(mut client) = self.client.write() {
57                    let client_id = client.id;
58                    client
59                        .messages
60                        .push_back((client_id, MessageId::new(id, version), data));
61                }
62            }
63        }
64        Ok(())
65    }
66
67    fn on_close(&mut self, _: CloseCode, _: &str) {
68        if let Ok(mut client) = self.client.write() {
69            client.state = ClientState::Closed;
70        }
71    }
72}
73
74pub struct DesktopServer {
75    id: ServerId,
76    state: Arc<RwLock<ServerState>>,
77    clients: Arc<RwLock<HashMap<ClientId, Arc<RwLock<Client>>>>>,
78    clients_ids_cached: Vec<ClientId>,
79    messages: VecDeque<MsgData>,
80    ws: Arc<RwLock<Option<WsSender>>>,
81    thread: Option<JoinHandle<()>>,
82}
83
84impl Drop for DesktopServer {
85    fn drop(&mut self) {
86        self.cleanup();
87    }
88}
89
90impl DesktopServer {
91    fn cleanup(&mut self) {
92        {
93            if let Ok(mut state) = self.state.write() {
94                *state = ServerState::Closed;
95            }
96            if let Ok(mut ws) = self.ws.write() {
97                if let Some(ws) = ws.as_ref() {
98                    ws.shutdown().unwrap();
99                }
100                *ws = None;
101            }
102        }
103        if let Some(thread) = self.thread.take() {
104            thread.join().unwrap();
105        }
106    }
107}
108
109impl Server for DesktopServer {
110    fn open(url: &str) -> Option<Self> {
111        let url = url.to_owned();
112        let state = Arc::new(RwLock::new(ServerState::Starting));
113        let state2 = state.clone();
114        let clients = Arc::new(RwLock::new(HashMap::default()));
115        let clients2 = clients.clone();
116        let sender = Arc::new(RwLock::new(None));
117        let sender2 = sender.clone();
118        let thread = Some(spawn(move || {
119            let ws = WebSocket::new(|ws| {
120                let id = ClientId::default();
121                let client = Arc::new(RwLock::new(Client {
122                    id,
123                    ws,
124                    state: ClientState::Connecting,
125                    messages: VecDeque::new(),
126                }));
127                if let Ok(mut clients) = clients2.write() {
128                    clients.insert(id, client.clone());
129                }
130                ClientHandler { client }
131            })
132            .unwrap();
133            if let Ok(mut sender) = sender2.write() {
134                *sender = Some(ws.broadcaster());
135            }
136            if let Ok(mut state) = state2.write() {
137                *state = ServerState::Open;
138            }
139            ws.listen(&url).unwrap();
140            if let Ok(mut sender) = sender2.write() {
141                *sender = None;
142            }
143            if let Ok(mut state) = state2.write() {
144                *state = ServerState::Closed;
145            }
146        }));
147        Some(Self {
148            id: Default::default(),
149            state,
150            clients,
151            clients_ids_cached: vec![],
152            messages: Default::default(),
153            ws: sender,
154            thread,
155        })
156    }
157
158    fn close(mut self) -> Self {
159        self.cleanup();
160        self
161    }
162
163    fn id(&self) -> ServerId {
164        self.id
165    }
166
167    fn state(&self) -> ServerState {
168        if let Ok(state) = self.state.read() {
169            *state
170        } else {
171            ServerState::default()
172        }
173    }
174
175    fn clients(&self) -> &[ClientId] {
176        &self.clients_ids_cached
177    }
178
179    fn disconnect(&mut self, id: ClientId) {
180        if let Ok(mut clients) = self.clients.write() {
181            if let Some(client) = clients.get_mut(&id) {
182                if let Ok(client) = client.read() {
183                    drop(client.ws.close(CloseCode::Normal));
184                    drop(client.ws.shutdown());
185                }
186            }
187            clients.remove(&id);
188        }
189    }
190
191    fn disconnect_all(&mut self) {
192        if let Ok(mut clients) = self.clients.write() {
193            for client in clients.values() {
194                if let Ok(client) = client.read() {
195                    drop(client.ws.close(CloseCode::Normal));
196                    drop(client.ws.shutdown());
197                }
198            }
199            clients.clear();
200        }
201    }
202
203    fn send(&mut self, id: ClientId, msg_id: MessageId, data: &[u8]) -> Option<Range<usize>> {
204        if self.state() != ServerState::Open {
205            return None;
206        }
207
208        if let Ok(mut clients) = self.clients.write() {
209            if let Some(client) = clients.get_mut(&id) {
210                let size = data.len();
211                let mut stream = Cursor::new(Vec::<u8>::with_capacity(size + 8));
212                drop(stream.write_u32::<BigEndian>(msg_id.id()));
213                drop(stream.write_u32::<BigEndian>(msg_id.version()));
214                drop(stream.write(data));
215                let data = stream.into_inner();
216                if let Ok(client) = client.read() {
217                    if client.state == ClientState::Open
218                        && client.ws.send(Message::Binary(data)).is_ok()
219                    {
220                        return Some(0..size);
221                    }
222                }
223            }
224        }
225        None
226    }
227
228    fn send_all(&mut self, id: MessageId, data: &[u8]) {
229        if self.state() != ServerState::Open {
230            return;
231        }
232        let size = data.len();
233        let mut stream = Cursor::new(Vec::<u8>::with_capacity(size + 8));
234        drop(stream.write_u32::<BigEndian>(id.id()));
235        drop(stream.write_u32::<BigEndian>(id.version()));
236        drop(stream.write(data));
237        let data = stream.into_inner();
238        if let Ok(ws) = self.ws.read() {
239            if let Some(ws) = ws.as_ref() {
240                ws.send(Message::Binary(data)).unwrap();
241            }
242        }
243    }
244
245    fn read(&mut self) -> Option<MsgData> {
246        self.messages.pop_front()
247    }
248
249    fn read_all(&mut self) -> Vec<MsgData> {
250        let result = self.messages.iter().cloned().collect();
251        self.messages.clear();
252        result
253    }
254
255    fn process(&mut self) {
256        if let Ok(mut clients) = self.clients.write() {
257            for client in clients.values() {
258                if let Ok(mut client) = client.write() {
259                    self.messages.append(&mut client.messages);
260                }
261            }
262            clients.retain(|_, client| {
263                if let Ok(client) = client.read() {
264                    client.state != ClientState::Closed
265                } else {
266                    true
267                }
268            });
269            self.clients_ids_cached.clear();
270            for id in clients.keys() {
271                self.clients_ids_cached.push(*id);
272            }
273        }
274    }
275}