use std::sync::Arc;
use tokio::sync::mpsc;
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use crate::error::RealtimeError;
use crate::types::RealtimeConfig;
pub(crate) struct WsSink {
ws: web_sys::WebSocket,
}
pub(crate) struct WsRead {
rx: mpsc::UnboundedReceiver<Result<WsMessage, RealtimeError>>,
_closures: Arc<WsClosures>,
}
struct WsClosures {
_on_message: Closure<dyn FnMut(web_sys::MessageEvent)>,
_on_error: Closure<dyn FnMut(web_sys::ErrorEvent)>,
_on_close: Closure<dyn FnMut(web_sys::CloseEvent)>,
}
pub(crate) enum WsMessage {
Text(String),
Close,
#[allow(dead_code)]
Ping(Vec<u8>),
}
pub(crate) async fn connect_ws(
_config: &RealtimeConfig,
ws_url: &str,
) -> Result<(WsSink, WsRead), RealtimeError> {
let ws = web_sys::WebSocket::new(ws_url)
.map_err(|e| RealtimeError::WebSocket(format!("Failed to create WebSocket: {:?}", e)))?;
ws.set_binary_type(web_sys::BinaryType::Arraybuffer);
let (tx, rx) = mpsc::unbounded_channel();
let tx_msg = tx.clone();
let on_message = Closure::<dyn FnMut(_)>::new(move |event: web_sys::MessageEvent| {
if let Ok(text) = event.data().dyn_into::<js_sys::JsString>() {
let _ = tx_msg.send(Ok(WsMessage::Text(String::from(text))));
}
});
ws.set_onmessage(Some(on_message.as_ref().unchecked_ref()));
let tx_err = tx.clone();
let on_error = Closure::<dyn FnMut(_)>::new(move |_event: web_sys::ErrorEvent| {
let _ = tx_err.send(Err(RealtimeError::WebSocket("WebSocket error".to_string())));
});
ws.set_onerror(Some(on_error.as_ref().unchecked_ref()));
let tx_close = tx;
let on_close = Closure::<dyn FnMut(_)>::new(move |_event: web_sys::CloseEvent| {
let _ = tx_close.send(Ok(WsMessage::Close));
});
ws.set_onclose(Some(on_close.as_ref().unchecked_ref()));
let (open_tx, open_rx) = tokio::sync::oneshot::channel::<Result<(), RealtimeError>>();
let on_open = Closure::once(move || {
let _ = open_tx.send(Ok(()));
});
ws.set_onopen(Some(on_open.as_ref().unchecked_ref()));
let open_result = supabase_client_core::platform::timeout(
std::time::Duration::from_secs(10),
async {
open_rx.await.unwrap_or(Err(RealtimeError::ConnectionClosed))
},
).await;
match open_result {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(e),
Err(_) => return Err(RealtimeError::WebSocket("WebSocket connection timed out".to_string())),
}
let closures = Arc::new(WsClosures {
_on_message: on_message,
_on_error: on_error,
_on_close: on_close,
});
Ok((
WsSink { ws },
WsRead { rx, _closures: closures },
))
}
pub(crate) async fn send_text(sink: &mut WsSink, text: String) -> Result<(), RealtimeError> {
sink.ws
.send_with_str(&text)
.map_err(|e| RealtimeError::WebSocket(format!("Send failed: {:?}", e)))?;
Ok(())
}
pub(crate) async fn send_close(sink: &mut WsSink) -> Result<(), RealtimeError> {
let _ = sink.ws.close();
Ok(())
}
pub(crate) async fn recv_message(read: &mut WsRead) -> Option<Result<WsMessage, RealtimeError>> {
read.rx.recv().await
}
pub(crate) async fn send_pong(_sink: &mut WsSink, _data: Vec<u8>) -> Result<(), RealtimeError> {
Ok(())
}