oxygengine_network_backend_native/
client.rs1use crate::utils::DoOnDrop;
2use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
3use network::client::{Client, ClientId, ClientState, MessageId};
4use std::{
5 collections::VecDeque,
6 io::{Cursor, ErrorKind, Read, Write},
7 net::{Shutdown, TcpStream},
8 ops::Range,
9 sync::{
10 atomic::{AtomicUsize, Ordering},
11 mpsc::{channel, Sender},
12 Arc, Mutex, RwLock,
13 },
14 thread::{sleep, Builder as ThreadBuilder, JoinHandle},
15 time::Duration,
16};
17
18const STREAM_SLEEP_MS: u64 = 10;
19
20type MsgData = (MessageId, Vec<u8>);
21
22pub struct NativeClient {
23 id: ClientId,
24 history_size: Arc<AtomicUsize>,
25 state: Arc<RwLock<ClientState>>,
26 messages: Arc<RwLock<VecDeque<MsgData>>>,
27 thread: Option<JoinHandle<()>>,
28 sender: Arc<Mutex<Sender<Vec<u8>>>>,
29}
30
31impl Drop for NativeClient {
32 fn drop(&mut self) {
33 self.cleanup();
34 }
35}
36
37impl NativeClient {
38 pub fn history_size(&self) -> usize {
39 self.history_size.load(Ordering::Relaxed)
40 }
41
42 pub fn set_history_size(&mut self, value: usize) {
43 self.history_size.store(value, Ordering::Relaxed);
44 }
45
46 fn cleanup(&mut self) {
47 if let Ok(mut state) = self.state.write() {
48 *state = ClientState::Closed;
49 }
50 if let Some(thread) = self.thread.take() {
51 thread.join().unwrap();
52 }
53 }
54
55 fn read_message(buffer: &[u8]) -> (MessageId, usize) {
56 let mut stream = Cursor::new(buffer);
57 let id = stream.read_u32::<BigEndian>().unwrap();
58 let version = stream.read_u32::<BigEndian>().unwrap();
59 let size = stream.read_u32::<BigEndian>().unwrap();
60 (MessageId::new(id, version), size as usize)
61 }
62}
63
64impl From<TcpStream> for NativeClient {
65 fn from(mut stream: TcpStream) -> Self {
66 let id = ClientId::default();
67 let url = stream.peer_addr().unwrap().to_string();
68 let state = Arc::new(RwLock::new(ClientState::Connecting));
69 let state2 = state.clone();
70 let history_size = Arc::new(AtomicUsize::new(0));
71 let history_size2 = history_size.clone();
72 let messages = Arc::new(RwLock::new(VecDeque::<MsgData>::default()));
73 let messages2 = messages.clone();
74 let (sender, receiver) = channel::<Vec<u8>>();
75 let thread = Some(
76 ThreadBuilder::new()
77 .name(format!("Client: {:?}", id))
78 .spawn(move || {
79 let state3 = state2.clone();
80 let _ = DoOnDrop::new(move || {
81 if let Ok(mut state) = state3.write() {
82 *state = ClientState::Closed;
83 }
84 });
85 stream.set_nonblocking(true).unwrap_or_else(|_| {
86 panic!(
87 "Client {:?} cannot set non-blocking streaming on: {}",
88 id, &url
89 )
90 });
91 stream.set_nodelay(true).unwrap_or_else(|_| {
92 panic!("Client {:?} cannot set no-delay streaming on: {}", id, &url,)
93 });
94 if let Ok(mut state) = state2.write() {
95 *state = ClientState::Open;
96 }
97 let mut header = vec![0; 12];
98 let mut left_to_read: Option<(MessageId, usize, Vec<u8>)> = None;
99 'main: loop {
100 if let Ok(state) = state2.read() {
101 if *state == ClientState::Closed {
102 break;
103 }
104 }
105 loop {
106 let reset = if let Some((lfr_msg, lfr_size, lfr_buff)) =
107 &mut left_to_read
108 {
109 let mut buffer = vec![0; *lfr_size];
110 match stream.read(&mut buffer) {
111 Ok(size) => {
112 lfr_buff.extend_from_slice(&buffer[0..size]);
113 if size >= *lfr_size {
114 if let Ok(mut messages) = messages2.write() {
115 messages.push_back((*lfr_msg, lfr_buff.clone()));
116 }
117 true
118 } else {
119 false
120 }
121 }
122 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
123 break;
124 }
125 Err(ref e) if e.kind() == ErrorKind::UnexpectedEof => {
126 break 'main;
127 }
128 Err(e) => panic!(
129 "Client {:?} reading body {} got IO error: {}",
130 id, &url, e
131 ),
132 }
133 } else {
134 match stream.read_exact(&mut header) {
135 Ok(()) => {
136 let (msg, size) = Self::read_message(&header);
137 if size > 0 {
138 left_to_read = Some((msg, size, vec![]));
139 } else if let Ok(mut messages) = messages2.write() {
140 messages.push_back((msg, vec![]));
141 }
142 }
143 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
144 break;
145 }
146 Err(ref e) if e.kind() == ErrorKind::UnexpectedEof => {
147 break 'main;
148 }
149 Err(e) => panic!(
150 "Client {:?} reading header {} got IO error: {}",
151 id, &url, e
152 ),
153 }
154 false
155 };
156 if reset {
157 left_to_read = None;
158 }
159 }
160 {
161 let history_size = history_size2.load(Ordering::Relaxed);
162 if history_size > 0 {
163 if let Ok(mut messages) = messages2.write() {
164 while messages.len() > history_size {
165 messages.pop_front();
166 }
167 }
168 }
169 }
170 while let Ok(data) = receiver.try_recv() {
171 stream.write_all(&data).unwrap();
172 }
173 sleep(Duration::from_millis(STREAM_SLEEP_MS));
174 }
175 if let Ok(mut state) = state2.write() {
176 *state = ClientState::Closed;
177 }
178 })
179 .unwrap(),
180 );
181 Self {
182 id,
183 history_size,
184 state,
185 messages,
186 thread,
187 sender: Arc::new(Mutex::new(sender)),
188 }
189 }
190}
191
192impl Client for NativeClient {
193 fn open(url: &str) -> Option<Self> {
194 let id = ClientId::default();
195 let url = url.to_owned();
196 let state = Arc::new(RwLock::new(ClientState::Connecting));
197 let state2 = state.clone();
198 let history_size = Arc::new(AtomicUsize::new(0));
199 let history_size2 = history_size.clone();
200 let messages = Arc::new(RwLock::new(VecDeque::<MsgData>::default()));
201 let messages2 = messages.clone();
202 let (sender, receiver) = channel::<Vec<u8>>();
203 let thread = Some(
204 ThreadBuilder::new()
205 .name(format!("Client: {:?}", id))
206 .spawn(move || {
207 let state3 = state2.clone();
208 let _ = DoOnDrop::new(move || {
209 if let Ok(mut state) = state3.write() {
210 *state = ClientState::Closed;
211 }
212 });
213 let mut stream = TcpStream::connect(&url)
214 .unwrap_or_else(|_| panic!("Client {:?} cannot connect to: {}", id, &url));
215 stream.set_nonblocking(true).unwrap_or_else(|_| {
216 panic!(
217 "Client {:?} cannot set non-blocking streaming on: {}",
218 id, &url
219 )
220 });
221 stream.set_nodelay(true).unwrap_or_else(|_| {
222 panic!("Client {:?} cannot set no-delay streaming on: {}", id, &url,)
223 });
224 if let Ok(mut state) = state2.write() {
225 *state = ClientState::Open;
226 }
227 let mut header = vec![0; 12];
228 let mut left_to_read: Option<(MessageId, usize, Vec<u8>)> = None;
229 'main: loop {
230 if let Ok(state) = state2.read() {
231 if *state == ClientState::Closed {
232 break;
233 }
234 }
235 loop {
236 let reset = if let Some((lfr_msg, lfr_size, lfr_buff)) =
237 &mut left_to_read
238 {
239 let mut buffer = vec![0; *lfr_size];
240 match stream.read(&mut buffer) {
241 Ok(size) => {
242 lfr_buff.extend_from_slice(&buffer[0..size]);
243 if size >= *lfr_size {
244 if let Ok(mut messages) = messages2.write() {
245 messages.push_back((*lfr_msg, lfr_buff.clone()));
246 }
247 true
248 } else {
249 false
250 }
251 }
252 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
253 break;
254 }
255 Err(ref e) if e.kind() == ErrorKind::UnexpectedEof => {
256 break 'main;
257 }
258 Err(e) => panic!(
259 "Client {:?} reading body {} got IO error: {}",
260 id, &url, e
261 ),
262 }
263 } else {
264 match stream.read_exact(&mut header) {
265 Ok(()) => {
266 let (msg, size) = Self::read_message(&header);
267 if size > 0 {
268 left_to_read = Some((msg, size, vec![]));
269 } else if let Ok(mut messages) = messages2.write() {
270 messages.push_back((msg, vec![]));
271 }
272 }
273 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
274 break;
275 }
276 Err(ref e) if e.kind() == ErrorKind::UnexpectedEof => {
277 break 'main;
278 }
279 Err(e) => panic!(
280 "Client {:?} reading header {} got IO error: {}",
281 id, &url, e
282 ),
283 }
284 false
285 };
286 if reset {
287 left_to_read = None;
288 }
289 }
290 {
291 let history_size = history_size2.load(Ordering::Relaxed);
292 if history_size > 0 {
293 if let Ok(mut messages) = messages2.write() {
294 while messages.len() > history_size {
295 messages.pop_front();
296 }
297 }
298 }
299 }
300 while let Ok(data) = receiver.try_recv() {
301 stream.write_all(&data).unwrap();
302 }
303 sleep(Duration::from_millis(STREAM_SLEEP_MS));
304 }
305 stream.shutdown(Shutdown::Both).unwrap();
306 if let Ok(mut state) = state2.write() {
307 *state = ClientState::Closed;
308 }
309 })
310 .unwrap(),
311 );
312 Some(Self {
313 id,
314 history_size,
315 state,
316 messages,
317 thread,
318 sender: Arc::new(Mutex::new(sender)),
319 })
320 }
321
322 fn close(mut self) -> Self {
323 self.cleanup();
324 self
325 }
326
327 fn id(&self) -> ClientId {
328 self.id
329 }
330
331 fn state(&self) -> ClientState {
332 if let Ok(state) = self.state.read() {
333 *state
334 } else {
335 ClientState::default()
336 }
337 }
338
339 fn send(&mut self, id: MessageId, data: &[u8]) -> Option<Range<usize>> {
340 if self.state() == ClientState::Open {
341 let size = data.len();
342 let mut stream = Cursor::new(Vec::<u8>::with_capacity(size + 12));
343 drop(stream.write_u32::<BigEndian>(id.id()));
344 drop(stream.write_u32::<BigEndian>(id.version()));
345 drop(stream.write_u32::<BigEndian>(size as u32));
346 drop(stream.write(data));
347 let data = stream.into_inner();
348 if self.sender.lock().unwrap().send(data).is_ok() {
349 return Some(0..size);
350 }
351 }
352 None
353 }
354
355 fn read(&mut self) -> Option<MsgData> {
356 if let Ok(mut messages) = self.messages.write() {
357 messages.pop_front()
358 } else {
359 None
360 }
361 }
362
363 fn read_all(&mut self) -> Vec<MsgData> {
364 if let Ok(mut messages) = self.messages.write() {
365 messages.drain(..).collect()
366 } else {
367 vec![]
368 }
369 }
370}