#![recursion_limit = "256"]
#[cfg(not(target_arch = "wasm32"))]
extern crate websocket;
#[cfg(target_arch = "wasm32")]
#[macro_use]
extern crate stdweb;
extern crate simple_error;
#[cfg(target_arch = "wasm32")]
extern crate wasm_bindgen;
use simple_error::*;
#[cfg(target_arch = "wasm32")]
use std::cell::RefCell;
use std::error::Error;
#[cfg(not(target_arch = "wasm32"))]
use std::io::ErrorKind;
#[cfg(not(target_arch = "wasm32"))]
use std::marker::PhantomData;
#[cfg(target_arch = "wasm32")]
use std::rc::Rc;
#[cfg(target_arch = "wasm32")]
use stdweb::web;
#[cfg(target_arch = "wasm32")]
use stdweb::Value;
#[cfg(not(target_arch = "wasm32"))]
use websocket::client::sync::Client;
#[cfg(not(target_arch = "wasm32"))]
use websocket::stream::sync::NetworkStream;
#[cfg(not(target_arch = "wasm32"))]
use websocket::*;
#[derive(Debug, Clone)]
pub enum SocketMessage {
Text(String),
Binary(Vec<u8>),
}
#[cfg(target_arch = "wasm32")]
struct SocketState {
queued: Rc<RefCell<Vec<SocketMessage>>>,
received: Rc<RefCell<Vec<SocketMessage>>>,
disconnected: Rc<RefCell<bool>>,
error: Rc<RefCell<bool>>,
}
#[cfg(target_arch = "wasm32")]
pub struct Socket {
js_obj: stdweb::Value,
state: SocketState,
}
#[cfg(target_arch = "wasm32")]
impl Socket {
pub fn new(url: String) -> Result<Socket, Box<dyn Error>> {
stdweb::initialize();
let state = SocketState {
queued: Rc::new(RefCell::new(vec![])),
received: Rc::new(RefCell::new(vec![])),
disconnected: Rc::new(RefCell::new(false)),
error: Rc::new(RefCell::new(false)),
};
let queued = state.queued.clone();
let received = state.received.clone();
let received2 = state.received.clone();
let disconnected = state.disconnected.clone();
let error = state.error.clone();
let get_queued = move || -> stdweb::Array {
let mut queued = queued.borrow_mut();
let queued: Vec<Value> = queued
.drain(..)
.map(|x| match x {
SocketMessage::Text(data) => Value::String(data),
SocketMessage::Binary(data) => {
let typed_array = web::TypedArray::from(&data[..]);
let data = typed_array.buffer();
stdweb::Value::Reference(stdweb::Reference::from(data))
}
}).collect();
stdweb::Array::from(queued)
};
let add_received_text = move |msg: String| {
let mut received = received.borrow_mut();
received.push(SocketMessage::Text(msg));
};
let add_received_binary = move |msg: web::TypedArray<u8>| {
let mut received = received2.borrow_mut();
received.push(SocketMessage::Binary(msg.to_vec()));
};
let set_disconnected = move || {
let mut disconnected = disconnected.borrow_mut();
*disconnected = true;
};
let set_error = move || {
let mut error = error.borrow_mut();
*error = true;
};
let js_obj = js! {
var socket = new WebSocket(@{url});
var get_queued = @{get_queued};
var add_received_text = @{add_received_text};
var add_received_binary = @{add_received_binary};
var set_disconnected = @{set_disconnected};
var set_error = @{set_error};
if (socket) {
socket.binaryType = "arraybuffer";
socket.onopen = function(e) {
var queued = get_queued();
for (var i = 0; i < queued.length; i++) {
socket.send(queued[i]);
}
get_queued.drop();
};
socket.onerror = function(e) {
console.log("Socket error: " + e);
set_error();
};
socket.onclose = function(e) {
console.log("Socket closed");
set_disconnected();
};
socket.onmessage = function(m) {
if (m.data instanceof ArrayBuffer) {
add_received_binary(new Uint8Array(m.data));
} else {
add_received_text(m.data);
}
};
return socket;
} else {
console.log("Unable to create socket.");
return null;
}
};
if js_obj == stdweb::Value::Null {
Err(Box::new(SimpleError::new(
"Unable to create js_obj for socket",
)))
} else {
Ok(Socket { js_obj, state })
}
}
pub fn send(&mut self, data: String) -> Result<(), Box<dyn Error>> {
let queued = self.state.queued.clone();
let ready = match js! {
var socket = @{&self.js_obj};
if (socket.readyState == 2 || socket.readyState == 3) {
console.log("Error: socket already closed!");
}
return socket.readyState === 1;
} {
Value::Bool(bool) => bool,
_ => panic!("invalid type"),
};
if ready {
js! {
var data = @{data};
var socket = @{&self.js_obj};
socket.send(data);
};
} else {
let mut queued = queued.borrow_mut();
queued.push(SocketMessage::Text(data));
}
Ok(())
}
pub fn send_binary(&mut self, data: Vec<u8>) -> Result<(), Box<dyn Error>> {
let queued = self.state.queued.clone();
let ready = match js! {
var socket = @{&self.js_obj};
if (socket.readyState == 2 || socket.readyState == 3) {
console.log("Error: socket already closed!");
}
return socket.readyState === 1;
} {
Value::Bool(bool) => bool,
_ => panic!("invalid type"),
};
if ready {
let typed_array = web::TypedArray::from(&data[..]);
let data = typed_array.buffer();
js! {
var data = @{data};
var socket = @{&self.js_obj};
socket.send(data);
};
} else {
let mut queued = queued.borrow_mut();
queued.push(SocketMessage::Binary(data));
}
Ok(())
}
pub fn recv_all(&mut self) -> Result<Vec<SocketMessage>, Box<dyn Error>> {
let disconnected = self.state.disconnected.borrow();
let error = self.state.error.borrow();
if *disconnected || *error {
Err(Box::new(SimpleError::new(
"Socket disconnected or there's been an error",
)))
} else {
let mut received = self.state.received.borrow_mut();
let res = received.drain(..).collect();
Ok(res)
}
}
}
#[cfg(target_arch = "wasm32")]
impl Drop for Socket {
fn drop(&mut self) {
js! {
var socket = @{&self.js_obj};
socket.close();
}
}
}
#[cfg(not(target_arch = "wasm32"))]
pub struct Socket {
client: Client<Box<dyn NetworkStream + Send>>,
not_send: PhantomData<*const ()>,
}
#[cfg(not(target_arch = "wasm32"))]
impl Socket {
pub fn new(url: String) -> Result<Socket, Box<dyn Error>> {
let client = ClientBuilder::new(&url)?.connect(None)?;
client.stream_ref().as_tcp().set_nodelay(true)?;
client.stream_ref().as_tcp().set_nonblocking(true)?;
Ok(Socket {
client,
not_send: PhantomData,
})
}
pub fn send(&mut self, data: String) -> Result<(), Box<dyn Error>> {
self.client
.send_message(&message::OwnedMessage::Text(data))?;
Ok(())
}
pub fn send_binary(&mut self, data: Vec<u8>) -> Result<(), Box<dyn Error>> {
self.client
.send_message(&message::OwnedMessage::Binary(data))?;
Ok(())
}
pub fn recv_all(&mut self) -> Result<Vec<SocketMessage>, Box<dyn Error>> {
let mut res = vec![];
loop {
match self.client.recv_message() {
Ok(message) => {
match message {
message::OwnedMessage::Text(msg) => res.push(SocketMessage::Text(msg)),
message::OwnedMessage::Binary(msg) => res.push(SocketMessage::Binary(msg)),
message::OwnedMessage::Ping(data) => {
self.client
.send_message(&message::OwnedMessage::Pong(data))
.unwrap();
}
message::OwnedMessage::Close(_) => {
return Err(Box::new(SimpleError::new(
"Socket disconnected or there's been an error",
)));
}
other => panic!("Unsupported message type: {:?}", other),
};
}
Err(err) => match err {
WebSocketError::IoError(ref err) if err.kind() == ErrorKind::WouldBlock => {
break;
}
_ => {
return Err(Box::new(SimpleError::new(
"Socket disconnected or there's been an error",
)));
}
},
}
}
Ok(res)
}
}