oxygengine_network_backend_desktop/
lib.rs1extern crate oxygengine_core as core;
2extern crate oxygengine_network as network;
3
4pub mod prelude {
5 pub use crate::*;
6}
7
8#[cfg(test)]
9mod tests;
10
11use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
12use network::{
13 client::{ClientId, ClientState, MessageId},
14 server::{Server, ServerId, ServerState},
15};
16use std::{
17 collections::{HashMap, VecDeque},
18 io::{Cursor, Read, Write},
19 ops::Range,
20 sync::{Arc, RwLock},
21 thread::{spawn, JoinHandle},
22};
23use ws::{CloseCode, Handler, Handshake, Message, Result, Sender as WsSender, WebSocket};
24
25type MsgData = (ClientId, MessageId, Vec<u8>);
26
27#[derive(Clone)]
28struct Client {
29 id: ClientId,
30 ws: WsSender,
31 state: ClientState,
32 messages: VecDeque<MsgData>,
33}
34
35struct ClientHandler {
36 client: Arc<RwLock<Client>>,
37}
38
39impl Handler for ClientHandler {
40 fn on_open(&mut self, _: Handshake) -> Result<()> {
41 if let Ok(mut client) = self.client.write() {
42 client.state = ClientState::Open;
43 }
44 Ok(())
45 }
46
47 fn on_message(&mut self, msg: Message) -> Result<()> {
48 if let Message::Binary(msg) = msg {
49 let size = msg.len();
50 if size >= 8 {
51 let mut stream = Cursor::new(msg);
52 let id = stream.read_u32::<BigEndian>().unwrap();
53 let version = stream.read_u32::<BigEndian>().unwrap();
54 let mut data = Vec::with_capacity(size - 8);
55 stream.read_to_end(&mut data).unwrap();
56 if let Ok(mut client) = self.client.write() {
57 let client_id = client.id;
58 client
59 .messages
60 .push_back((client_id, MessageId::new(id, version), data));
61 }
62 }
63 }
64 Ok(())
65 }
66
67 fn on_close(&mut self, _: CloseCode, _: &str) {
68 if let Ok(mut client) = self.client.write() {
69 client.state = ClientState::Closed;
70 }
71 }
72}
73
74pub struct DesktopServer {
75 id: ServerId,
76 state: Arc<RwLock<ServerState>>,
77 clients: Arc<RwLock<HashMap<ClientId, Arc<RwLock<Client>>>>>,
78 clients_ids_cached: Vec<ClientId>,
79 messages: VecDeque<MsgData>,
80 ws: Arc<RwLock<Option<WsSender>>>,
81 thread: Option<JoinHandle<()>>,
82}
83
84impl Drop for DesktopServer {
85 fn drop(&mut self) {
86 self.cleanup();
87 }
88}
89
90impl DesktopServer {
91 fn cleanup(&mut self) {
92 {
93 if let Ok(mut state) = self.state.write() {
94 *state = ServerState::Closed;
95 }
96 if let Ok(mut ws) = self.ws.write() {
97 if let Some(ws) = ws.as_ref() {
98 ws.shutdown().unwrap();
99 }
100 *ws = None;
101 }
102 }
103 if let Some(thread) = self.thread.take() {
104 thread.join().unwrap();
105 }
106 }
107}
108
109impl Server for DesktopServer {
110 fn open(url: &str) -> Option<Self> {
111 let url = url.to_owned();
112 let state = Arc::new(RwLock::new(ServerState::Starting));
113 let state2 = state.clone();
114 let clients = Arc::new(RwLock::new(HashMap::default()));
115 let clients2 = clients.clone();
116 let sender = Arc::new(RwLock::new(None));
117 let sender2 = sender.clone();
118 let thread = Some(spawn(move || {
119 let ws = WebSocket::new(|ws| {
120 let id = ClientId::default();
121 let client = Arc::new(RwLock::new(Client {
122 id,
123 ws,
124 state: ClientState::Connecting,
125 messages: VecDeque::new(),
126 }));
127 if let Ok(mut clients) = clients2.write() {
128 clients.insert(id, client.clone());
129 }
130 ClientHandler { client }
131 })
132 .unwrap();
133 if let Ok(mut sender) = sender2.write() {
134 *sender = Some(ws.broadcaster());
135 }
136 if let Ok(mut state) = state2.write() {
137 *state = ServerState::Open;
138 }
139 ws.listen(&url).unwrap();
140 if let Ok(mut sender) = sender2.write() {
141 *sender = None;
142 }
143 if let Ok(mut state) = state2.write() {
144 *state = ServerState::Closed;
145 }
146 }));
147 Some(Self {
148 id: Default::default(),
149 state,
150 clients,
151 clients_ids_cached: vec![],
152 messages: Default::default(),
153 ws: sender,
154 thread,
155 })
156 }
157
158 fn close(mut self) -> Self {
159 self.cleanup();
160 self
161 }
162
163 fn id(&self) -> ServerId {
164 self.id
165 }
166
167 fn state(&self) -> ServerState {
168 if let Ok(state) = self.state.read() {
169 *state
170 } else {
171 ServerState::default()
172 }
173 }
174
175 fn clients(&self) -> &[ClientId] {
176 &self.clients_ids_cached
177 }
178
179 fn disconnect(&mut self, id: ClientId) {
180 if let Ok(mut clients) = self.clients.write() {
181 if let Some(client) = clients.get_mut(&id) {
182 if let Ok(client) = client.read() {
183 drop(client.ws.close(CloseCode::Normal));
184 drop(client.ws.shutdown());
185 }
186 }
187 clients.remove(&id);
188 }
189 }
190
191 fn disconnect_all(&mut self) {
192 if let Ok(mut clients) = self.clients.write() {
193 for client in clients.values() {
194 if let Ok(client) = client.read() {
195 drop(client.ws.close(CloseCode::Normal));
196 drop(client.ws.shutdown());
197 }
198 }
199 clients.clear();
200 }
201 }
202
203 fn send(&mut self, id: ClientId, msg_id: MessageId, data: &[u8]) -> Option<Range<usize>> {
204 if self.state() != ServerState::Open {
205 return None;
206 }
207
208 if let Ok(mut clients) = self.clients.write() {
209 if let Some(client) = clients.get_mut(&id) {
210 let size = data.len();
211 let mut stream = Cursor::new(Vec::<u8>::with_capacity(size + 8));
212 drop(stream.write_u32::<BigEndian>(msg_id.id()));
213 drop(stream.write_u32::<BigEndian>(msg_id.version()));
214 drop(stream.write(data));
215 let data = stream.into_inner();
216 if let Ok(client) = client.read() {
217 if client.state == ClientState::Open
218 && client.ws.send(Message::Binary(data)).is_ok()
219 {
220 return Some(0..size);
221 }
222 }
223 }
224 }
225 None
226 }
227
228 fn send_all(&mut self, id: MessageId, data: &[u8]) {
229 if self.state() != ServerState::Open {
230 return;
231 }
232 let size = data.len();
233 let mut stream = Cursor::new(Vec::<u8>::with_capacity(size + 8));
234 drop(stream.write_u32::<BigEndian>(id.id()));
235 drop(stream.write_u32::<BigEndian>(id.version()));
236 drop(stream.write(data));
237 let data = stream.into_inner();
238 if let Ok(ws) = self.ws.read() {
239 if let Some(ws) = ws.as_ref() {
240 ws.send(Message::Binary(data)).unwrap();
241 }
242 }
243 }
244
245 fn read(&mut self) -> Option<MsgData> {
246 self.messages.pop_front()
247 }
248
249 fn read_all(&mut self) -> Vec<MsgData> {
250 let result = self.messages.iter().cloned().collect();
251 self.messages.clear();
252 result
253 }
254
255 fn process(&mut self) {
256 if let Ok(mut clients) = self.clients.write() {
257 for client in clients.values() {
258 if let Ok(mut client) = client.write() {
259 self.messages.append(&mut client.messages);
260 }
261 }
262 clients.retain(|_, client| {
263 if let Ok(client) = client.read() {
264 client.state != ClientState::Closed
265 } else {
266 true
267 }
268 });
269 self.clients_ids_cached.clear();
270 for id in clients.keys() {
271 self.clients_ids_cached.push(*id);
272 }
273 }
274 }
275}