dioxus_websocket_hooks/
lib.rs1use std::{rc::Rc, sync::Arc, time::Duration};
2
3use async_std::sync::RwLock;
4use dioxus::prelude::*;
5use futures::{
6 stream::{SplitSink, SplitStream},
7 SinkExt, StreamExt,
8};
9use reqwasm::websocket::{futures::WebSocket, Message};
10use serde::{Deserialize, Serialize};
11use wasm_bindgen::JsValue;
12use wasm_bindgen_futures::spawn_local;
13
14pub struct DioxusWs {
15 url: String,
16 sender: Arc<RwLock<SplitSink<WebSocket, Message>>>,
17 receiver: Arc<RwLock<SplitStream<WebSocket>>>,
18 is_open: Arc<RwLock<bool>>,
19}
20
21impl DioxusWs {
22 pub fn new(url: &str) -> DioxusWs {
23 let ws = WebSocket::open(url).unwrap();
24
25 let (sender, receiver) = ws.split();
26 let sender = Arc::new(RwLock::new(sender));
27 let receiver = Arc::new(RwLock::new(receiver));
28
29 DioxusWs {
30 url: url.to_string(),
31 sender,
32 receiver,
33 is_open: Arc::new(RwLock::new(false)),
34 }
35 }
36
37 pub fn send(&self, msg: Message) {
39 let sender = self.sender.clone();
40 let is_open = self.is_open.clone();
41
42 spawn_local(async move {
43 let is_open = *is_open.read().await;
44
45 if is_open {
46 let mut sender = sender.write().await;
47 sender.send(msg).await.ok();
48 }
49 });
50 }
51
52 pub fn set_open(&self, open: bool) {
53 let is_open = self.is_open.clone();
54 let sender = self.sender.clone();
55
56 spawn_local(async move {
57 let mut is_open = is_open.write().await;
58 *is_open = open;
59
60 let mut sender = sender.write().await;
61 sender.close().await.ok();
62 });
63 }
64
65 pub fn send_text(&self, text: String) {
67 let msg = Message::Text(text);
68 self.send(msg);
69 }
70
71 pub fn send_json<T: Serialize>(&self, value: &T) {
73 let json = serde_json::to_string(value).unwrap();
74 let msg = Message::Text(json);
75 self.send(msg);
76 }
77
78 pub async fn reconnect(&self) {
79 let ws = WebSocket::open(&self.url).unwrap();
80
81 let (sender, receiver) = ws.split();
82
83 {
84 let mut self_sender = self.sender.write().await;
85 *self_sender = sender;
86 }
87
88 {
89 let mut self_receiver = self.receiver.write().await;
90 *self_receiver = receiver;
91 }
92 }
93}
94
95fn log_err(s: &str) {
96 web_sys::console::error_1(&JsValue::from_str(s));
97}
98
99pub fn use_ws_context_provider(cx: &ScopeState, url: &str, handler: impl Fn(Message) + 'static) {
101 let handler = Rc::new(handler);
102
103 cx.use_hook(|_| {
104 let ws = cx.provide_context(DioxusWs::new(url));
105 let receiver = ws.receiver.clone();
106
107 cx.push_future(async move {
108 loop {
109 let mut err = None;
110
111 {
112 let mut receiver = receiver.write().await;
113 while let Some(msg) = receiver.next().await {
114 match msg {
115 Ok(msg) => {
116 ws.set_open(true);
117 handler(msg)
118 },
119 Err(e) => {
120 err = Some(e);
121 }
122 }
123 }
124 }
125
126 if let Some(err) = err {
127 ws.set_open(false);
128
129 log_err(&format!(
130 "Error while trying to receive message over websocket, reconnecting in 1s...\n{:?}", err
131 ));
132
133 async_std::task::sleep(Duration::from_millis(1000)).await;
134
135 ws.reconnect().await;
136 }
137 }
138 })
139 });
140}
141
142pub fn use_ws_context_provider_text(
144 cx: &ScopeState,
145 url: &str,
146 handler: impl Fn(String) + 'static,
147) {
148 let handler = move |msg| {
149 if let Message::Text(text) = msg {
150 handler(text)
151 }
152 };
153
154 use_ws_context_provider(cx, url, handler)
155}
156
157pub fn use_ws_context_provider_json<T>(cx: &ScopeState, url: &str, handler: impl Fn(T) + 'static)
160where
161 T: for<'de> Deserialize<'de>,
162{
163 let handler = move |msg| match msg {
164 Message::Text(text) => {
165 let json = serde_json::from_str::<T>(&text);
166
167 match json {
168 Ok(json) => handler(json),
169 Err(e) => log_err(&format!(
170 "Error while deserializing websocket response: {}",
171 e
172 )),
173 }
174 }
175 Message::Bytes(_) => {}
176 };
177
178 use_ws_context_provider(cx, url, handler)
179}
180
181pub fn use_ws_context(cx: &ScopeState) -> Rc<DioxusWs> {
189 cx.consume_context::<DioxusWs>().unwrap()
190}