dioxus_websocket_hooks/
lib.rs

1use std::{rc::Rc, sync::Arc, time::Duration};
2
3use async_std::sync::RwLock;
4use dioxus::prelude::*;
5use futures::{
6    stream::{SplitSink, SplitStream},
7    SinkExt, StreamExt,
8};
9use reqwasm::websocket::{futures::WebSocket, Message};
10use serde::{Deserialize, Serialize};
11use wasm_bindgen::JsValue;
12use wasm_bindgen_futures::spawn_local;
13
14pub struct DioxusWs {
15    url: String,
16    sender: Arc<RwLock<SplitSink<WebSocket, Message>>>,
17    receiver: Arc<RwLock<SplitStream<WebSocket>>>,
18    is_open: Arc<RwLock<bool>>,
19}
20
21impl DioxusWs {
22    pub fn new(url: &str) -> DioxusWs {
23        let ws = WebSocket::open(url).unwrap();
24
25        let (sender, receiver) = ws.split();
26        let sender = Arc::new(RwLock::new(sender));
27        let receiver = Arc::new(RwLock::new(receiver));
28
29        DioxusWs {
30            url: url.to_string(),
31            sender,
32            receiver,
33            is_open: Arc::new(RwLock::new(false)),
34        }
35    }
36
37    /// Sends a reqwasm Message
38    pub fn send(&self, msg: Message) {
39        let sender = self.sender.clone();
40        let is_open = self.is_open.clone();
41
42        spawn_local(async move {
43            let is_open = *is_open.read().await;
44
45            if is_open {
46                let mut sender = sender.write().await;
47                sender.send(msg).await.ok();
48            }
49        });
50    }
51
52    pub fn set_open(&self, open: bool) {
53        let is_open = self.is_open.clone();
54        let sender = self.sender.clone();
55
56        spawn_local(async move {
57            let mut is_open = is_open.write().await;
58            *is_open = open;
59
60            let mut sender = sender.write().await;
61            sender.close().await.ok();
62        });
63    }
64
65    /// Sends a plaintext string
66    pub fn send_text(&self, text: String) {
67        let msg = Message::Text(text);
68        self.send(msg);
69    }
70
71    /// Sends data that implements Serialize as JSON
72    pub fn send_json<T: Serialize>(&self, value: &T) {
73        let json = serde_json::to_string(value).unwrap();
74        let msg = Message::Text(json);
75        self.send(msg);
76    }
77
78    pub async fn reconnect(&self) {
79        let ws = WebSocket::open(&self.url).unwrap();
80
81        let (sender, receiver) = ws.split();
82
83        {
84            let mut self_sender = self.sender.write().await;
85            *self_sender = sender;
86        }
87
88        {
89            let mut self_receiver = self.receiver.write().await;
90            *self_receiver = receiver;
91        }
92    }
93}
94
95fn log_err(s: &str) {
96    web_sys::console::error_1(&JsValue::from_str(s));
97}
98
99/// Provide websocket context with a handler for incoming reqwasm Messages
100pub fn use_ws_context_provider(cx: &ScopeState, url: &str, handler: impl Fn(Message) + 'static) {
101    let handler = Rc::new(handler);
102
103    cx.use_hook(|_| {
104        let ws = cx.provide_context(DioxusWs::new(url));
105        let receiver = ws.receiver.clone();
106
107        cx.push_future(async move {
108            loop {
109                let mut err = None;
110
111                {
112                    let mut receiver = receiver.write().await;
113                    while let Some(msg) = receiver.next().await {
114                        match msg {
115                            Ok(msg) => {
116                                ws.set_open(true);
117                                handler(msg)
118                            },
119                            Err(e) => {
120                                err = Some(e);
121                            }
122                        }
123                    }
124                }
125
126                if let Some(err) = err {
127                    ws.set_open(false);
128
129                    log_err(&format!(
130                        "Error while trying to receive message over websocket, reconnecting in 1s...\n{:?}", err
131                    ));
132
133                    async_std::task::sleep(Duration::from_millis(1000)).await;
134
135                    ws.reconnect().await;
136                }
137            }
138        })
139    });
140}
141
142/// Provide websocket context with a handler for incoming plaintext messages
143pub fn use_ws_context_provider_text(
144    cx: &ScopeState,
145    url: &str,
146    handler: impl Fn(String) + 'static,
147) {
148    let handler = move |msg| {
149        if let Message::Text(text) = msg {
150            handler(text)
151        }
152    };
153
154    use_ws_context_provider(cx, url, handler)
155}
156
157/// Provide websocket context with a handler for incoming JSON messages.
158/// Note that the message type T must implement Deserialize.
159pub fn use_ws_context_provider_json<T>(cx: &ScopeState, url: &str, handler: impl Fn(T) + 'static)
160where
161    T: for<'de> Deserialize<'de>,
162{
163    let handler = move |msg| match msg {
164        Message::Text(text) => {
165            let json = serde_json::from_str::<T>(&text);
166
167            match json {
168                Ok(json) => handler(json),
169                Err(e) => log_err(&format!(
170                    "Error while deserializing websocket response: {}",
171                    e
172                )),
173            }
174        }
175        Message::Bytes(_) => {}
176    };
177
178    use_ws_context_provider(cx, url, handler)
179}
180
181/// Consumes WebSocket context. Useful for sending messages over the WebSocket
182/// connection.
183///
184/// NOTE: Currently the server is expected to send a message when the connection
185/// opens. You will not be able to send websocket messages from the client
186/// before a message has been received from the server. This is a limitation
187/// in the current reconnection logic.
188pub fn use_ws_context(cx: &ScopeState) -> Rc<DioxusWs> {
189    cx.consume_context::<DioxusWs>().unwrap()
190}