oxygengine_network_backend_web/
lib.rs

1extern crate oxygengine_backend_web as backend;
2extern crate oxygengine_core as core;
3extern crate oxygengine_network as network;
4
5use backend::closure::WebClosure;
6use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
7use js_sys::*;
8use network::client::{Client, ClientId, ClientState, MessageId};
9use std::{
10    cell::{Cell, RefCell},
11    collections::VecDeque,
12    io::{Cursor, Write},
13    ops::Range,
14    rc::Rc,
15};
16use wasm_bindgen::{prelude::*, JsCast};
17use web_sys::*;
18
/// Convenience module that re-exports every public item of this crate, so
/// users can pull the whole API in with a single glob import of `prelude`.
pub mod prelude {
    pub use crate::*;
}
22
/// A received message: its identifier paired with the raw payload bytes
/// (the bytes that follow the 8-byte id/version header of a frame).
type MsgData = (MessageId, Vec<u8>);
24
/// Network client backed by a browser `WebSocket`.
///
/// Connection state, the history limit and the incoming message queue live in
/// `Rc` cells because they are shared between this struct and the event
/// callbacks registered on the socket.
pub struct WebClient {
    /// Underlying browser socket.
    socket: WebSocket,
    /// Client identifier; initialised to `ClientId::default()` when opened.
    id: ClientId,
    /// Maximum number of buffered incoming messages; `0` disables trimming.
    history_size: Rc<Cell<usize>>,
    /// Current connection state, updated by the open/close callbacks.
    state: Rc<Cell<ClientState>>,
    // The nested `Rc<RefCell<VecDeque<..>>>` is what this lint allowance was
    // added for. NOTE(review): possibly no longer needed now that the element
    // type is hidden behind the `MsgData` alias - confirm with clippy.
    #[allow(clippy::type_complexity)]
    /// Queue of received messages, oldest first.
    messages: Rc<RefCell<VecDeque<MsgData>>>,
    /// Handle for the callback registered as the socket's `onopen` handler.
    on_open_closure: WebClosure,
    /// Handle for the callback registered as the socket's `onclose` handler.
    on_close_closure: WebClosure,
    /// Handle for the callback registered as the socket's `onmessage` handler.
    on_message_closure: WebClosure,
}
36
// SAFETY / NOTE(review): `WebClient` holds `Rc`, `Cell` and `RefCell` values
// plus JS handles, none of which are thread-safe on their own. These impls are
// presumably sound only because the target (wasm in a browser) runs this code
// on a single thread - TODO confirm before enabling any threaded target.
unsafe impl Send for WebClient {}
unsafe impl Sync for WebClient {}
39
40impl WebClient {
41    pub fn history_size(&self) -> usize {
42        self.history_size.get()
43    }
44
45    pub fn set_history_size(&mut self, value: usize) {
46        self.history_size.set(value);
47    }
48}
49
50impl Client for WebClient {
51    fn open(url: &str) -> Option<Self> {
52        if let Ok(socket) = WebSocket::new(url) {
53            socket.set_binary_type(BinaryType::Arraybuffer);
54            let history_size = Rc::new(Cell::new(0));
55            let state = Rc::new(Cell::new(ClientState::Connecting));
56            let messages = Rc::new(RefCell::new(Default::default()));
57            let on_open_closure = {
58                let state2 = state.clone();
59                let closure = Closure::wrap(Box::new(move |_: Event| {
60                    state2.set(ClientState::Open);
61                }) as Box<dyn FnMut(_)>);
62                socket.set_onopen(Some(closure.as_ref().unchecked_ref()));
63                WebClosure::acquire(closure)
64            };
65            let on_close_closure = {
66                let state2 = state.clone();
67                let closure = Closure::wrap(Box::new(move |_: Event| {
68                    state2.set(ClientState::Closed);
69                }) as Box<dyn FnMut(_)>);
70                socket.set_onclose(Some(closure.as_ref().unchecked_ref()));
71                WebClosure::acquire(closure)
72            };
73            let on_message_closure = {
74                let history_size2 = history_size.clone();
75                let messages2 = messages.clone();
76                let closure = Closure::wrap(Box::new(move |event: MessageEvent| {
77                    let buff = event.data();
78                    if buff.is_instance_of::<ArrayBuffer>() {
79                        let typebuf: Uint8Array = Uint8Array::new(&buff);
80                        let mut body = vec![0; typebuf.length() as usize];
81                        typebuf.copy_to(&mut body[..]);
82                        let mut stream = Cursor::new(body);
83                        if let Ok(id) = stream.read_u32::<BigEndian>() {
84                            if let Ok(version) = stream.read_u32::<BigEndian>() {
85                                let id = MessageId::new(id, version);
86                                let data = stream.into_inner()[8..].to_vec();
87                                let messages: &mut VecDeque<_> = &mut messages2.borrow_mut();
88                                messages.push_back((id, data));
89                                let history_size = history_size2.get();
90                                if history_size > 0 {
91                                    while messages.len() > history_size {
92                                        messages.pop_front();
93                                    }
94                                }
95                            }
96                        }
97                    }
98                }) as Box<dyn FnMut(_)>);
99                socket.set_onmessage(Some(closure.as_ref().unchecked_ref()));
100                WebClosure::acquire(closure)
101            };
102            Some(Self {
103                socket,
104                id: Default::default(),
105                history_size,
106                state,
107                messages,
108                on_open_closure,
109                on_close_closure,
110                on_message_closure,
111            })
112        } else {
113            None
114        }
115    }
116
117    fn close(mut self) -> Self {
118        self.on_open_closure.release();
119        self.on_close_closure.release();
120        self.on_message_closure.release();
121        if self.state.get() != ClientState::Closed {
122            drop(self.socket.close());
123            self.state.set(ClientState::Closed);
124        }
125        self
126    }
127
128    fn id(&self) -> ClientId {
129        self.id
130    }
131
132    fn state(&self) -> ClientState {
133        self.state.get()
134    }
135
136    fn send(&mut self, id: MessageId, data: &[u8]) -> Option<Range<usize>> {
137        if self.state.get() == ClientState::Open {
138            let size = data.len();
139            let mut stream = Cursor::new(Vec::<u8>::with_capacity(size + 8));
140            drop(stream.write_u32::<BigEndian>(id.id()));
141            drop(stream.write_u32::<BigEndian>(id.version()));
142            drop(stream.write(data));
143            let data = stream.into_inner();
144            if self.socket.send_with_u8_array(&data).is_ok() {
145                return Some(0..size);
146            }
147        }
148        None
149    }
150
151    fn read(&mut self) -> Option<MsgData> {
152        self.messages.borrow_mut().pop_front()
153    }
154
155    fn read_all(&mut self) -> Vec<MsgData> {
156        let mut messages = self.messages.borrow_mut();
157        let result = messages.iter().cloned().collect();
158        messages.clear();
159        result
160    }
161}