oxygengine_network_backend_web/
lib.rs1extern crate oxygengine_backend_web as backend;
2extern crate oxygengine_core as core;
3extern crate oxygengine_network as network;
4
5use backend::closure::WebClosure;
6use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
7use js_sys::*;
8use network::client::{Client, ClientId, ClientState, MessageId};
9use std::{
10 cell::{Cell, RefCell},
11 collections::VecDeque,
12 io::{Cursor, Write},
13 ops::Range,
14 rc::Rc,
15};
16use wasm_bindgen::{prelude::*, JsCast};
17use web_sys::*;
18
19pub mod prelude {
20 pub use crate::*;
21}
22
23type MsgData = (MessageId, Vec<u8>);
24
25pub struct WebClient {
26 socket: WebSocket,
27 id: ClientId,
28 history_size: Rc<Cell<usize>>,
29 state: Rc<Cell<ClientState>>,
30 #[allow(clippy::type_complexity)]
31 messages: Rc<RefCell<VecDeque<MsgData>>>,
32 on_open_closure: WebClosure,
33 on_close_closure: WebClosure,
34 on_message_closure: WebClosure,
35}
36
37unsafe impl Send for WebClient {}
38unsafe impl Sync for WebClient {}
39
40impl WebClient {
41 pub fn history_size(&self) -> usize {
42 self.history_size.get()
43 }
44
45 pub fn set_history_size(&mut self, value: usize) {
46 self.history_size.set(value);
47 }
48}
49
50impl Client for WebClient {
51 fn open(url: &str) -> Option<Self> {
52 if let Ok(socket) = WebSocket::new(url) {
53 socket.set_binary_type(BinaryType::Arraybuffer);
54 let history_size = Rc::new(Cell::new(0));
55 let state = Rc::new(Cell::new(ClientState::Connecting));
56 let messages = Rc::new(RefCell::new(Default::default()));
57 let on_open_closure = {
58 let state2 = state.clone();
59 let closure = Closure::wrap(Box::new(move |_: Event| {
60 state2.set(ClientState::Open);
61 }) as Box<dyn FnMut(_)>);
62 socket.set_onopen(Some(closure.as_ref().unchecked_ref()));
63 WebClosure::acquire(closure)
64 };
65 let on_close_closure = {
66 let state2 = state.clone();
67 let closure = Closure::wrap(Box::new(move |_: Event| {
68 state2.set(ClientState::Closed);
69 }) as Box<dyn FnMut(_)>);
70 socket.set_onclose(Some(closure.as_ref().unchecked_ref()));
71 WebClosure::acquire(closure)
72 };
73 let on_message_closure = {
74 let history_size2 = history_size.clone();
75 let messages2 = messages.clone();
76 let closure = Closure::wrap(Box::new(move |event: MessageEvent| {
77 let buff = event.data();
78 if buff.is_instance_of::<ArrayBuffer>() {
79 let typebuf: Uint8Array = Uint8Array::new(&buff);
80 let mut body = vec![0; typebuf.length() as usize];
81 typebuf.copy_to(&mut body[..]);
82 let mut stream = Cursor::new(body);
83 if let Ok(id) = stream.read_u32::<BigEndian>() {
84 if let Ok(version) = stream.read_u32::<BigEndian>() {
85 let id = MessageId::new(id, version);
86 let data = stream.into_inner()[8..].to_vec();
87 let messages: &mut VecDeque<_> = &mut messages2.borrow_mut();
88 messages.push_back((id, data));
89 let history_size = history_size2.get();
90 if history_size > 0 {
91 while messages.len() > history_size {
92 messages.pop_front();
93 }
94 }
95 }
96 }
97 }
98 }) as Box<dyn FnMut(_)>);
99 socket.set_onmessage(Some(closure.as_ref().unchecked_ref()));
100 WebClosure::acquire(closure)
101 };
102 Some(Self {
103 socket,
104 id: Default::default(),
105 history_size,
106 state,
107 messages,
108 on_open_closure,
109 on_close_closure,
110 on_message_closure,
111 })
112 } else {
113 None
114 }
115 }
116
117 fn close(mut self) -> Self {
118 self.on_open_closure.release();
119 self.on_close_closure.release();
120 self.on_message_closure.release();
121 if self.state.get() != ClientState::Closed {
122 drop(self.socket.close());
123 self.state.set(ClientState::Closed);
124 }
125 self
126 }
127
128 fn id(&self) -> ClientId {
129 self.id
130 }
131
132 fn state(&self) -> ClientState {
133 self.state.get()
134 }
135
136 fn send(&mut self, id: MessageId, data: &[u8]) -> Option<Range<usize>> {
137 if self.state.get() == ClientState::Open {
138 let size = data.len();
139 let mut stream = Cursor::new(Vec::<u8>::with_capacity(size + 8));
140 drop(stream.write_u32::<BigEndian>(id.id()));
141 drop(stream.write_u32::<BigEndian>(id.version()));
142 drop(stream.write(data));
143 let data = stream.into_inner();
144 if self.socket.send_with_u8_array(&data).is_ok() {
145 return Some(0..size);
146 }
147 }
148 None
149 }
150
151 fn read(&mut self) -> Option<MsgData> {
152 self.messages.borrow_mut().pop_front()
153 }
154
155 fn read_all(&mut self) -> Vec<MsgData> {
156 let mut messages = self.messages.borrow_mut();
157 let result = messages.iter().cloned().collect();
158 messages.clear();
159 result
160 }
161}