oxygengine_network_backend_native/
client.rs

1use crate::utils::DoOnDrop;
2use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
3use network::client::{Client, ClientId, ClientState, MessageId};
4use std::{
5    collections::VecDeque,
6    io::{Cursor, ErrorKind, Read, Write},
7    net::{Shutdown, TcpStream},
8    ops::Range,
9    sync::{
10        atomic::{AtomicUsize, Ordering},
11        mpsc::{channel, Sender},
12        Arc, Mutex, RwLock,
13    },
14    thread::{sleep, Builder as ThreadBuilder, JoinHandle},
15    time::Duration,
16};
17
18const STREAM_SLEEP_MS: u64 = 10;
19
20type MsgData = (MessageId, Vec<u8>);
21
22pub struct NativeClient {
23    id: ClientId,
24    history_size: Arc<AtomicUsize>,
25    state: Arc<RwLock<ClientState>>,
26    messages: Arc<RwLock<VecDeque<MsgData>>>,
27    thread: Option<JoinHandle<()>>,
28    sender: Arc<Mutex<Sender<Vec<u8>>>>,
29}
30
31impl Drop for NativeClient {
32    fn drop(&mut self) {
33        self.cleanup();
34    }
35}
36
37impl NativeClient {
38    pub fn history_size(&self) -> usize {
39        self.history_size.load(Ordering::Relaxed)
40    }
41
42    pub fn set_history_size(&mut self, value: usize) {
43        self.history_size.store(value, Ordering::Relaxed);
44    }
45
46    fn cleanup(&mut self) {
47        if let Ok(mut state) = self.state.write() {
48            *state = ClientState::Closed;
49        }
50        if let Some(thread) = self.thread.take() {
51            thread.join().unwrap();
52        }
53    }
54
55    fn read_message(buffer: &[u8]) -> (MessageId, usize) {
56        let mut stream = Cursor::new(buffer);
57        let id = stream.read_u32::<BigEndian>().unwrap();
58        let version = stream.read_u32::<BigEndian>().unwrap();
59        let size = stream.read_u32::<BigEndian>().unwrap();
60        (MessageId::new(id, version), size as usize)
61    }
62}
63
64impl From<TcpStream> for NativeClient {
65    fn from(mut stream: TcpStream) -> Self {
66        let id = ClientId::default();
67        let url = stream.peer_addr().unwrap().to_string();
68        let state = Arc::new(RwLock::new(ClientState::Connecting));
69        let state2 = state.clone();
70        let history_size = Arc::new(AtomicUsize::new(0));
71        let history_size2 = history_size.clone();
72        let messages = Arc::new(RwLock::new(VecDeque::<MsgData>::default()));
73        let messages2 = messages.clone();
74        let (sender, receiver) = channel::<Vec<u8>>();
75        let thread = Some(
76            ThreadBuilder::new()
77                .name(format!("Client: {:?}", id))
78                .spawn(move || {
79                    let state3 = state2.clone();
80                    let _ = DoOnDrop::new(move || {
81                        if let Ok(mut state) = state3.write() {
82                            *state = ClientState::Closed;
83                        }
84                    });
85                    stream.set_nonblocking(true).unwrap_or_else(|_| {
86                        panic!(
87                            "Client {:?} cannot set non-blocking streaming on: {}",
88                            id, &url
89                        )
90                    });
91                    stream.set_nodelay(true).unwrap_or_else(|_| {
92                        panic!("Client {:?} cannot set no-delay streaming on: {}", id, &url,)
93                    });
94                    if let Ok(mut state) = state2.write() {
95                        *state = ClientState::Open;
96                    }
97                    let mut header = vec![0; 12];
98                    let mut left_to_read: Option<(MessageId, usize, Vec<u8>)> = None;
99                    'main: loop {
100                        if let Ok(state) = state2.read() {
101                            if *state == ClientState::Closed {
102                                break;
103                            }
104                        }
105                        loop {
106                            let reset = if let Some((lfr_msg, lfr_size, lfr_buff)) =
107                                &mut left_to_read
108                            {
109                                let mut buffer = vec![0; *lfr_size];
110                                match stream.read(&mut buffer) {
111                                    Ok(size) => {
112                                        lfr_buff.extend_from_slice(&buffer[0..size]);
113                                        if size >= *lfr_size {
114                                            if let Ok(mut messages) = messages2.write() {
115                                                messages.push_back((*lfr_msg, lfr_buff.clone()));
116                                            }
117                                            true
118                                        } else {
119                                            false
120                                        }
121                                    }
122                                    Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
123                                        break;
124                                    }
125                                    Err(ref e) if e.kind() == ErrorKind::UnexpectedEof => {
126                                        break 'main;
127                                    }
128                                    Err(e) => panic!(
129                                        "Client {:?} reading body {} got IO error: {}",
130                                        id, &url, e
131                                    ),
132                                }
133                            } else {
134                                match stream.read_exact(&mut header) {
135                                    Ok(()) => {
136                                        let (msg, size) = Self::read_message(&header);
137                                        if size > 0 {
138                                            left_to_read = Some((msg, size, vec![]));
139                                        } else if let Ok(mut messages) = messages2.write() {
140                                            messages.push_back((msg, vec![]));
141                                        }
142                                    }
143                                    Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
144                                        break;
145                                    }
146                                    Err(ref e) if e.kind() == ErrorKind::UnexpectedEof => {
147                                        break 'main;
148                                    }
149                                    Err(e) => panic!(
150                                        "Client {:?} reading header {} got IO error: {}",
151                                        id, &url, e
152                                    ),
153                                }
154                                false
155                            };
156                            if reset {
157                                left_to_read = None;
158                            }
159                        }
160                        {
161                            let history_size = history_size2.load(Ordering::Relaxed);
162                            if history_size > 0 {
163                                if let Ok(mut messages) = messages2.write() {
164                                    while messages.len() > history_size {
165                                        messages.pop_front();
166                                    }
167                                }
168                            }
169                        }
170                        while let Ok(data) = receiver.try_recv() {
171                            stream.write_all(&data).unwrap();
172                        }
173                        sleep(Duration::from_millis(STREAM_SLEEP_MS));
174                    }
175                    if let Ok(mut state) = state2.write() {
176                        *state = ClientState::Closed;
177                    }
178                })
179                .unwrap(),
180        );
181        Self {
182            id,
183            history_size,
184            state,
185            messages,
186            thread,
187            sender: Arc::new(Mutex::new(sender)),
188        }
189    }
190}
191
192impl Client for NativeClient {
193    fn open(url: &str) -> Option<Self> {
194        let id = ClientId::default();
195        let url = url.to_owned();
196        let state = Arc::new(RwLock::new(ClientState::Connecting));
197        let state2 = state.clone();
198        let history_size = Arc::new(AtomicUsize::new(0));
199        let history_size2 = history_size.clone();
200        let messages = Arc::new(RwLock::new(VecDeque::<MsgData>::default()));
201        let messages2 = messages.clone();
202        let (sender, receiver) = channel::<Vec<u8>>();
203        let thread = Some(
204            ThreadBuilder::new()
205                .name(format!("Client: {:?}", id))
206                .spawn(move || {
207                    let state3 = state2.clone();
208                    let _ = DoOnDrop::new(move || {
209                        if let Ok(mut state) = state3.write() {
210                            *state = ClientState::Closed;
211                        }
212                    });
213                    let mut stream = TcpStream::connect(&url)
214                        .unwrap_or_else(|_| panic!("Client {:?} cannot connect to: {}", id, &url));
215                    stream.set_nonblocking(true).unwrap_or_else(|_| {
216                        panic!(
217                            "Client {:?} cannot set non-blocking streaming on: {}",
218                            id, &url
219                        )
220                    });
221                    stream.set_nodelay(true).unwrap_or_else(|_| {
222                        panic!("Client {:?} cannot set no-delay streaming on: {}", id, &url,)
223                    });
224                    if let Ok(mut state) = state2.write() {
225                        *state = ClientState::Open;
226                    }
227                    let mut header = vec![0; 12];
228                    let mut left_to_read: Option<(MessageId, usize, Vec<u8>)> = None;
229                    'main: loop {
230                        if let Ok(state) = state2.read() {
231                            if *state == ClientState::Closed {
232                                break;
233                            }
234                        }
235                        loop {
236                            let reset = if let Some((lfr_msg, lfr_size, lfr_buff)) =
237                                &mut left_to_read
238                            {
239                                let mut buffer = vec![0; *lfr_size];
240                                match stream.read(&mut buffer) {
241                                    Ok(size) => {
242                                        lfr_buff.extend_from_slice(&buffer[0..size]);
243                                        if size >= *lfr_size {
244                                            if let Ok(mut messages) = messages2.write() {
245                                                messages.push_back((*lfr_msg, lfr_buff.clone()));
246                                            }
247                                            true
248                                        } else {
249                                            false
250                                        }
251                                    }
252                                    Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
253                                        break;
254                                    }
255                                    Err(ref e) if e.kind() == ErrorKind::UnexpectedEof => {
256                                        break 'main;
257                                    }
258                                    Err(e) => panic!(
259                                        "Client {:?} reading body {} got IO error: {}",
260                                        id, &url, e
261                                    ),
262                                }
263                            } else {
264                                match stream.read_exact(&mut header) {
265                                    Ok(()) => {
266                                        let (msg, size) = Self::read_message(&header);
267                                        if size > 0 {
268                                            left_to_read = Some((msg, size, vec![]));
269                                        } else if let Ok(mut messages) = messages2.write() {
270                                            messages.push_back((msg, vec![]));
271                                        }
272                                    }
273                                    Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
274                                        break;
275                                    }
276                                    Err(ref e) if e.kind() == ErrorKind::UnexpectedEof => {
277                                        break 'main;
278                                    }
279                                    Err(e) => panic!(
280                                        "Client {:?} reading header {} got IO error: {}",
281                                        id, &url, e
282                                    ),
283                                }
284                                false
285                            };
286                            if reset {
287                                left_to_read = None;
288                            }
289                        }
290                        {
291                            let history_size = history_size2.load(Ordering::Relaxed);
292                            if history_size > 0 {
293                                if let Ok(mut messages) = messages2.write() {
294                                    while messages.len() > history_size {
295                                        messages.pop_front();
296                                    }
297                                }
298                            }
299                        }
300                        while let Ok(data) = receiver.try_recv() {
301                            stream.write_all(&data).unwrap();
302                        }
303                        sleep(Duration::from_millis(STREAM_SLEEP_MS));
304                    }
305                    stream.shutdown(Shutdown::Both).unwrap();
306                    if let Ok(mut state) = state2.write() {
307                        *state = ClientState::Closed;
308                    }
309                })
310                .unwrap(),
311        );
312        Some(Self {
313            id,
314            history_size,
315            state,
316            messages,
317            thread,
318            sender: Arc::new(Mutex::new(sender)),
319        })
320    }
321
322    fn close(mut self) -> Self {
323        self.cleanup();
324        self
325    }
326
327    fn id(&self) -> ClientId {
328        self.id
329    }
330
331    fn state(&self) -> ClientState {
332        if let Ok(state) = self.state.read() {
333            *state
334        } else {
335            ClientState::default()
336        }
337    }
338
339    fn send(&mut self, id: MessageId, data: &[u8]) -> Option<Range<usize>> {
340        if self.state() == ClientState::Open {
341            let size = data.len();
342            let mut stream = Cursor::new(Vec::<u8>::with_capacity(size + 12));
343            drop(stream.write_u32::<BigEndian>(id.id()));
344            drop(stream.write_u32::<BigEndian>(id.version()));
345            drop(stream.write_u32::<BigEndian>(size as u32));
346            drop(stream.write(data));
347            let data = stream.into_inner();
348            if self.sender.lock().unwrap().send(data).is_ok() {
349                return Some(0..size);
350            }
351        }
352        None
353    }
354
355    fn read(&mut self) -> Option<MsgData> {
356        if let Ok(mut messages) = self.messages.write() {
357            messages.pop_front()
358        } else {
359            None
360        }
361    }
362
363    fn read_all(&mut self) -> Vec<MsgData> {
364        if let Ok(mut messages) = self.messages.write() {
365            messages.drain(..).collect()
366        } else {
367            vec![]
368        }
369    }
370}