use crate::Error;
use lwk_common::Stream;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::js_sys;
use web_sys::{BinaryType, Event, MessageEvent, WebSocket};
pub struct WebSocketSerial {
websocket: WebSocket,
receive_buffer: Arc<Mutex<VecDeque<u8>>>,
text_mode: bool,
}
impl WebSocketSerial {
pub async fn new(url: &str) -> Result<Self, Error> {
Self::new_with_protocol(url, None, false).await
}
pub async fn new_binary(url: &str) -> Result<Self, Error> {
Self::new_with_protocol(url, None, false).await
}
pub async fn new_text(url: &str) -> Result<Self, Error> {
Self::new_with_protocol(url, None, true).await
}
pub async fn new_with_protocol(
url: &str,
protocol: Option<&str>,
text_mode: bool,
) -> Result<Self, Error> {
let websocket = if let Some(protocol) = protocol {
let protocol_array = js_sys::Array::new();
protocol_array.push(&JsValue::from_str(protocol));
WebSocket::new_with_str_sequence(url, &protocol_array).map_err(Error::JsVal)?
} else {
WebSocket::new(url).map_err(Error::JsVal)?
};
websocket.set_binary_type(BinaryType::Arraybuffer);
let receive_buffer = Arc::new(Mutex::new(VecDeque::new()));
let buffer_clone = receive_buffer.clone();
let onmessage_callback = if text_mode {
Closure::wrap(Box::new(move |e: MessageEvent| {
if let Ok(text) = e.data().dyn_into::<js_sys::JsString>() {
let text_str: String = text.into();
let data = text_str.as_bytes();
if let Ok(mut buffer) = buffer_clone.lock() {
buffer.extend(data);
}
}
}) as Box<dyn FnMut(_)>)
} else {
Closure::wrap(Box::new(move |e: MessageEvent| {
if let Ok(array_buffer) = e.data().dyn_into::<web_sys::js_sys::ArrayBuffer>() {
let uint8_array = web_sys::js_sys::Uint8Array::new(&array_buffer);
let data = uint8_array.to_vec();
if let Ok(mut buffer) = buffer_clone.lock() {
buffer.extend(data);
}
}
}) as Box<dyn FnMut(_)>)
};
websocket.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
onmessage_callback.forget();
let connection_promise = web_sys::js_sys::Promise::new(&mut |resolve, _reject| {
let onopen_callback = Closure::wrap(Box::new(move |_event: Event| {
resolve.call0(&JsValue::NULL).unwrap();
}) as Box<dyn FnMut(Event)>);
websocket.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
onopen_callback.forget();
});
wasm_bindgen_futures::JsFuture::from(connection_promise)
.await
.map_err(Error::JsVal)?;
Ok(Self {
websocket,
receive_buffer,
text_mode,
})
}
pub async fn new_wamp(url: &str) -> Result<Self, Error> {
Self::new_with_protocol(url, Some("wamp.2.json"), true).await
}
pub async fn send_text(&self, text: &str) -> Result<(), lwk_jade::Error> {
self.websocket
.send_with_str(text)
.map_err(|e| lwk_jade::Error::Generic(format!("WebSocket send error: {:?}", e)))?;
Ok(())
}
pub fn websocket(&self) -> &WebSocket {
&self.websocket
}
}
impl Stream for WebSocketSerial {
type Error = lwk_jade::Error;
async fn read(&self, buf: &mut [u8]) -> Result<usize, lwk_jade::Error> {
loop {
{
let mut buffer = self
.receive_buffer
.lock()
.map_err(|e| lwk_jade::Error::Generic(format!("Mutex error: {}", e)))?;
if !buffer.is_empty() {
let read_len = std::cmp::min(buf.len(), buffer.len());
for i in 0..read_len {
buf[i] = buffer.pop_front().unwrap();
}
return Ok(read_len);
}
}
let promise = web_sys::js_sys::Promise::new(&mut |resolve, _reject| {
web_sys::window()
.unwrap()
.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 10)
.unwrap();
});
wasm_bindgen_futures::JsFuture::from(promise)
.await
.map_err(|e| lwk_jade::Error::Generic(format!("Timeout error: {:?}", e)))?;
}
}
async fn write(&self, buf: &[u8]) -> Result<(), lwk_jade::Error> {
if self.text_mode {
let text = std::str::from_utf8(buf)
.map_err(|e| lwk_jade::Error::Generic(format!("Invalid UTF-8: {}", e)))?;
self.websocket
.send_with_str(text)
.map_err(|e| lwk_jade::Error::Generic(format!("WebSocket send error: {:?}", e)))?;
} else {
let uint8_array = web_sys::js_sys::Uint8Array::new_with_length(buf.len() as u32);
uint8_array.copy_from(buf);
self.websocket
.send_with_array_buffer(&uint8_array.buffer())
.map_err(|e| lwk_jade::Error::Generic(format!("WebSocket send error: {:?}", e)))?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use wasm_bindgen_test::*;
wasm_bindgen_test_configure!(run_in_browser);
#[wasm_bindgen_test]
async fn test_blockstream_green_websocket_connection() {
console_error_panic_hook::set_once();
web_sys::console::log_1(&"Connecting to WebSocket...".into());
let ws_serial =
WebSocketSerial::new_wamp("wss://green-liquid-mainnet.blockstream.com/v2/ws/")
.await
.expect("Failed to connect to WebSocket");
web_sys::console::log_1(&"Connected! Sending HELLO message...".into());
let hello_message = r#"[1, "realm1", {"roles": {"caller": {"features": {}}}}]"#;
ws_serial
.send_text(hello_message)
.await
.expect("Failed to send HELLO message");
web_sys::console::log_1(&"HELLO message sent, waiting for response...".into());
let mut response_buffer = vec![0u8; 4096];
let mut attempts = 0;
let max_attempts = 100;
let bytes_read = loop {
match ws_serial.read(&mut response_buffer).await {
Ok(bytes) if bytes > 0 => break bytes,
Ok(_) => {
attempts += 1;
if attempts >= max_attempts {
panic!("Timeout waiting for WELCOME message");
}
}
Err(e) => panic!("Failed to read WELCOME message: {:?}", e),
}
};
web_sys::console::log_1(&format!("Received {} bytes", bytes_read).into());
let response_str = String::from_utf8_lossy(&response_buffer[..bytes_read]);
web_sys::console::log_1(&format!("Received response: {}", response_str).into());
let response_json: serde_json::Value =
serde_json::from_str(&response_str).expect("Failed to parse response as JSON");
if let serde_json::Value::Array(ref arr) = response_json {
assert!(
arr.len() >= 3,
"WELCOME message should have at least 3 elements"
);
assert_eq!(
arr[0], 2,
"First element should be 2 (WELCOME message type)"
);
assert!(
arr[1].is_number(),
"Second element should be session ID (number)"
);
assert!(arr[2].is_object(), "Third element should be details object");
let details = &arr[2];
assert!(details["realm"].is_string(), "Details should contain realm");
assert!(
details["authid"].is_string(),
"Details should contain authid"
);
assert!(
details["authrole"].is_string(),
"Details should contain authrole"
);
assert!(details["roles"].is_object(), "Details should contain roles");
} else {
panic!("Response should be a JSON array");
}
web_sys::console::log_1(&"WebSocket connection test completed successfully!".into());
}
}