flnet_wasm/
web_socket_client.rs

1use std::sync::{Arc};
2use futures::lock::Mutex;
3
4use async_trait::async_trait;
5use flnet::websocket::{WSClientInput, WSClientOutput, WSClientMessage};
6use flmodules::broker::{Broker, Destination, Subsystem, SubsystemListener};
7use thiserror::Error;
8use wasm_bindgen::{prelude::Closure, JsCast, JsValue};
9use web_sys::{ErrorEvent, MessageEvent, WebSocket};
10
11pub struct WebSocketClient {
12    ws: Arc<Mutex<WebSocket>>,
13}
14
15unsafe impl Send for WebSocketClient {}
16
17#[derive(Error, Debug)]
18pub enum WSClientError {
19    #[error("While connecting {0}")]
20    Connection(String),
21    #[error(transparent)]
22    Broker(#[from] flmodules::broker::BrokerError),
23}
24
25impl WebSocketClient {
26    pub async fn connect(url: &str) -> Result<Broker<WSClientMessage>, WSClientError> {
27        log::info!("connecting to: {}", url);
28        let ws = WebSocket::new(url).map_err(|e| WSClientError::Connection(format!("{:?}", e)))?;
29        let mut wsw = WebSocketClient {
30            ws: Arc::new(Mutex::new(ws)),
31        };
32        let mut broker = wsw.attach_callbacks().await;
33        broker
34            .add_subsystem(Subsystem::Handler(Box::new(wsw)))
35            .await?;
36        Ok(broker)
37    }
38
39    async fn attach_callbacks(&mut self) -> Broker<WSClientMessage> {
40        let broker = Broker::new();
41        let ws = self.ws.lock().await;
42
43        // create callback
44        let broker_clone = broker.clone();
45        let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
46            if let Ok(txt) = e.data().dyn_into::<js_sys::JsString>() {
47                let txt_str = txt.as_string().unwrap();
48                broker_clone.emit_msg_wasm(WSClientOutput::Message(txt_str).into());
49            } else {
50                log::warn!("message event, received Unknown: {:?}", e);
51            }
52        }) as Box<dyn FnMut(MessageEvent)>);
53        // set message event handler on WebSocket
54        ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
55        // forget the callback to keep it alive
56        onmessage_callback.forget();
57
58        let broker_clone = broker.clone();
59        let onerror_callback = Closure::wrap(Box::new(move |e: ErrorEvent| {
60            log::error!("error event: {:?}", e);
61            broker_clone.emit_msg_wasm(WSClientMessage::Output(WSClientOutput::Error(
62                e.as_string().unwrap_or("not an error string".into()),
63            )));
64        }) as Box<dyn FnMut(ErrorEvent)>);
65        ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
66        onerror_callback.forget();
67
68        let broker_clone = broker.clone();
69        let onopen_callback = Closure::wrap(Box::new(move |_| {
70            broker_clone.emit_msg_wasm(WSClientMessage::Output(WSClientOutput::Connected));
71        }) as Box<dyn FnMut(JsValue)>);
72        ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
73        onopen_callback.forget();
74
75        broker
76    }
77}
78
79#[async_trait(?Send)]
80impl SubsystemListener<WSClientMessage> for WebSocketClient {
81    async fn messages(
82        &mut self,
83        msgs: Vec<WSClientMessage>,
84    ) -> Vec<(Destination, WSClientMessage)> {
85        if let Some(ws) = self.ws.try_lock() {
86            for msg in msgs {
87                if let WSClientMessage::Input(msg_in) = msg {
88                    match msg_in {
89                        WSClientInput::Message(msg) => {
90                            if ws.ready_state() != WebSocket::OPEN {
91                                log::error!("WebSocket is not open");
92                                return vec![];
93                            }
94                            ws.send_with_str(&msg)
95                                .err()
96                                .map(|e| log::error!("Error sending message: {:?}", e));
97                        }
98                        WSClientInput::Disconnect => {
99                            // ws.disconnect();
100                        }
101                    }
102                }
103            }
104        }
105        vec![]
106    }
107}