flnet_wasm/
web_socket_client.rs1use std::sync::{Arc};
2use futures::lock::Mutex;
3
4use async_trait::async_trait;
5use flnet::websocket::{WSClientInput, WSClientOutput, WSClientMessage};
6use flmodules::broker::{Broker, Destination, Subsystem, SubsystemListener};
7use thiserror::Error;
8use wasm_bindgen::{prelude::Closure, JsCast, JsValue};
9use web_sys::{ErrorEvent, MessageEvent, WebSocket};
10
11pub struct WebSocketClient {
12 ws: Arc<Mutex<WebSocket>>,
13}
14
15unsafe impl Send for WebSocketClient {}
16
17#[derive(Error, Debug)]
18pub enum WSClientError {
19 #[error("While connecting {0}")]
20 Connection(String),
21 #[error(transparent)]
22 Broker(#[from] flmodules::broker::BrokerError),
23}
24
25impl WebSocketClient {
26 pub async fn connect(url: &str) -> Result<Broker<WSClientMessage>, WSClientError> {
27 log::info!("connecting to: {}", url);
28 let ws = WebSocket::new(url).map_err(|e| WSClientError::Connection(format!("{:?}", e)))?;
29 let mut wsw = WebSocketClient {
30 ws: Arc::new(Mutex::new(ws)),
31 };
32 let mut broker = wsw.attach_callbacks().await;
33 broker
34 .add_subsystem(Subsystem::Handler(Box::new(wsw)))
35 .await?;
36 Ok(broker)
37 }
38
39 async fn attach_callbacks(&mut self) -> Broker<WSClientMessage> {
40 let broker = Broker::new();
41 let ws = self.ws.lock().await;
42
43 let broker_clone = broker.clone();
45 let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
46 if let Ok(txt) = e.data().dyn_into::<js_sys::JsString>() {
47 let txt_str = txt.as_string().unwrap();
48 broker_clone.emit_msg_wasm(WSClientOutput::Message(txt_str).into());
49 } else {
50 log::warn!("message event, received Unknown: {:?}", e);
51 }
52 }) as Box<dyn FnMut(MessageEvent)>);
53 ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
55 onmessage_callback.forget();
57
58 let broker_clone = broker.clone();
59 let onerror_callback = Closure::wrap(Box::new(move |e: ErrorEvent| {
60 log::error!("error event: {:?}", e);
61 broker_clone.emit_msg_wasm(WSClientMessage::Output(WSClientOutput::Error(
62 e.as_string().unwrap_or("not an error string".into()),
63 )));
64 }) as Box<dyn FnMut(ErrorEvent)>);
65 ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
66 onerror_callback.forget();
67
68 let broker_clone = broker.clone();
69 let onopen_callback = Closure::wrap(Box::new(move |_| {
70 broker_clone.emit_msg_wasm(WSClientMessage::Output(WSClientOutput::Connected));
71 }) as Box<dyn FnMut(JsValue)>);
72 ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
73 onopen_callback.forget();
74
75 broker
76 }
77}
78
79#[async_trait(?Send)]
80impl SubsystemListener<WSClientMessage> for WebSocketClient {
81 async fn messages(
82 &mut self,
83 msgs: Vec<WSClientMessage>,
84 ) -> Vec<(Destination, WSClientMessage)> {
85 if let Some(ws) = self.ws.try_lock() {
86 for msg in msgs {
87 if let WSClientMessage::Input(msg_in) = msg {
88 match msg_in {
89 WSClientInput::Message(msg) => {
90 if ws.ready_state() != WebSocket::OPEN {
91 log::error!("WebSocket is not open");
92 return vec![];
93 }
94 ws.send_with_str(&msg)
95 .err()
96 .map(|e| log::error!("Error sending message: {:?}", e));
97 }
98 WSClientInput::Disconnect => {
99 }
101 }
102 }
103 }
104 }
105 vec![]
106 }
107}