hassium_network_backend_web/
lib.rs

1extern crate byteorder;
2extern crate hassium_core as core;
3extern crate hassium_network as network;
4
5use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
6use js_sys::{ArrayBuffer, Uint8Array};
7use network::client::{Client, ClientID, ClientState, MessageID};
8use std::{
9    cell::{Cell, RefCell},
10    collections::VecDeque,
11    io::{Cursor, Write},
12    ops::Range,
13    rc::Rc,
14};
15use wasm_bindgen::{prelude::*, JsCast};
16use web_sys::*;
17
/// Convenience module re-exporting every public item of the crate root, so
/// that users can pull everything in with a single glob import.
pub mod prelude {
    // This module is declared in the crate root, so `super` is the crate root.
    pub use super::*;
}
21
22type MsgData = (MessageID, Vec<u8>);
23
24#[derive(Clone)]
25pub struct WebClient {
26    socket: WebSocket,
27    id: ClientID,
28    history_size: Rc<Cell<usize>>,
29    state: Rc<Cell<ClientState>>,
30    #[allow(clippy::type_complexity)]
31    messages: Rc<RefCell<VecDeque<MsgData>>>,
32}
33
34unsafe impl Send for WebClient {}
35unsafe impl Sync for WebClient {}
36
37impl WebClient {
38    pub fn history_size(&self) -> usize {
39        self.history_size.get()
40    }
41
42    pub fn set_history_size(&mut self, value: usize) {
43        self.history_size.set(value);
44    }
45}
46
47impl Client for WebClient {
48    fn open(url: &str) -> Option<Self> {
49        if let Ok(socket) = WebSocket::new(url) {
50            socket.set_binary_type(BinaryType::Arraybuffer);
51            let history_size = Rc::new(Cell::new(0));
52            let state = Rc::new(Cell::new(ClientState::Connecting));
53            let messages = Rc::new(RefCell::new(Default::default()));
54            {
55                let state2 = state.clone();
56                let closure = Closure::wrap(Box::new(move |_: Event| {
57                    state2.set(ClientState::Open);
58                }) as Box<dyn FnMut(_)>);
59                socket.set_onopen(Some(closure.as_ref().unchecked_ref()));
60                closure.forget();
61            }
62            {
63                let state2 = state.clone();
64                let closure = Closure::wrap(Box::new(move |_: Event| {
65                    state2.set(ClientState::Closed);
66                }) as Box<dyn FnMut(_)>);
67                socket.set_onclose(Some(closure.as_ref().unchecked_ref()));
68                closure.forget();
69            }
70            {
71                let history_size2 = history_size.clone();
72                let messages2 = messages.clone();
73                let closure = Closure::wrap(Box::new(move |event: MessageEvent| {
74                    let buff = event.data();
75                    if buff.is_instance_of::<ArrayBuffer>() {
76                        let typebuf: Uint8Array = Uint8Array::new(&buff);
77                        let mut body = vec![0; typebuf.length() as usize];
78                        typebuf.copy_to(&mut body[..]);
79                        let mut stream = Cursor::new(body);
80                        if let Ok(id) = stream.read_u32::<BigEndian>() {
81                            if let Ok(version) = stream.read_u32::<BigEndian>() {
82                                let id = MessageID::new(id, version);
83                                let data = stream.into_inner()[8..].to_vec();
84                                let messages: &mut VecDeque<_> = &mut messages2.borrow_mut();
85                                messages.push_back((id, data));
86                                let history_size = history_size2.get();
87                                if history_size > 0 {
88                                    while messages.len() > history_size {
89                                        messages.pop_front();
90                                    }
91                                }
92                            }
93                        }
94                    }
95                }) as Box<dyn FnMut(_)>);
96                socket.set_onmessage(Some(closure.as_ref().unchecked_ref()));
97                closure.forget();
98            }
99            Some(Self {
100                socket,
101                id: Default::default(),
102                history_size,
103                state,
104                messages,
105            })
106        } else {
107            None
108        }
109    }
110
111    fn close(self) -> Self {
112        // TODO: unbind closures from socket.
113        if self.state.get() != ClientState::Closed {
114            drop(self.socket.close());
115            self.state.set(ClientState::Closed);
116        }
117        self
118    }
119
120    fn id(&self) -> ClientID {
121        self.id
122    }
123
124    fn state(&self) -> ClientState {
125        self.state.get()
126    }
127
128    fn send(&mut self, id: MessageID, data: &[u8]) -> Option<Range<usize>> {
129        if self.state.get() == ClientState::Open {
130            let size = data.len();
131            let mut stream = Cursor::new(Vec::<u8>::with_capacity(size + 8));
132            drop(stream.write_u32::<BigEndian>(id.id()));
133            drop(stream.write_u32::<BigEndian>(id.version()));
134            drop(stream.write(data));
135            let mut data = stream.into_inner();
136            if self.socket.send_with_u8_array(&mut data).is_ok() {
137                return Some(0..size);
138            }
139        }
140        None
141    }
142
143    fn read(&mut self) -> Option<MsgData> {
144        self.messages.borrow_mut().pop_front()
145    }
146
147    fn read_all(&mut self) -> Vec<MsgData> {
148        let mut messages = self.messages.borrow_mut();
149        let result = messages.iter().cloned().collect();
150        messages.clear();
151        result
152    }
153}