yew_hooks/hooks/
use_websocket.rs1use std::{cell::RefCell, rc::Rc};
2
3use gloo::timers::callback::Timeout;
4use js_sys::Array;
5use wasm_bindgen::{prelude::*, JsCast, JsValue};
6use web_sys::{BinaryType, MessageEvent, WebSocket};
7use yew::prelude::*;
8
9use super::{use_mut_latest, use_state_ptr_eq, use_unmount, UseStatePtrEqHandle};
10
11pub use web_sys::CloseEvent;
12
/// Connection state reported by the websocket hook through
/// `UseWebSocketHandle::ready_state`.
///
/// The enum is fieldless, so it is `Copy`: values can be passed and compared
/// without cloning.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum UseWebSocketReadyState {
    /// A socket has been created and its `open` event has not fired yet.
    Connecting,
    /// The socket's `open` event has fired.
    Open,
    /// Counterpart of the browser's `CLOSING` state.
    Closing,
    /// No connection: the initial state, and the state set after an `error`
    /// or `close` event.
    Closed,
}
21
/// Options accepted by `use_websocket_with_options`.
///
/// Every field is optional; `Default` leaves all of them `None`, in which
/// case the hook falls back to the defaults described on each field.
#[derive(Default)]
pub struct UseWebSocketOptions {
    /// Called with the event when the socket's `open` event fires.
    pub onopen: Option<Box<dyn FnMut(Event)>>,
    /// Called with the text of every message whose data is a JS string.
    pub onmessage: Option<Box<dyn FnMut(String)>>,
    /// Called with the bytes of every message whose data is an `ArrayBuffer`.
    pub onmessage_bytes: Option<Box<dyn FnMut(Vec<u8>)>>,
    /// Called with the event when the socket's `error` event fires.
    pub onerror: Option<Box<dyn FnMut(Event)>>,
    /// Called with the `CloseEvent` when the socket's `close` event fires.
    pub onclose: Option<Box<dyn FnMut(CloseEvent)>>,

    /// Upper bound on automatic reconnect attempts; `u32::MAX` when `None`.
    pub reconnect_limit: Option<u32>,
    /// Delay handed to the reconnect `Timeout`; `3 * 1000` when `None`
    /// (presumably milliseconds — confirm against `gloo`'s `Timeout::new`).
    pub reconnect_interval: Option<u32>,
    /// When `Some(true)` the hook does not connect on its own and `open()`
    /// has to be called explicitly; treated as `false` when `None`.
    pub manual: Option<bool>,
    /// Sub-protocols passed to `WebSocket::new_with_str_sequence`; when
    /// `None` the socket is created with plain `WebSocket::new`.
    pub protocols: Option<Vec<String>>,
}
45
/// Handle returned by `use_websocket` / `use_websocket_with_options`.
///
/// The public fields expose the hook's state; the private callbacks are
/// reached through the `open`, `close`, `send` and `send_bytes` methods.
pub struct UseWebSocketHandle {
    /// Current connection state as tracked by the hook.
    pub ready_state: UseStateHandle<UseWebSocketReadyState>,
    /// Most recent text message, `None` until one has been received.
    pub message: UseStatePtrEqHandle<Option<String>>,
    /// Most recent binary message, `None` until one has been received.
    pub message_bytes: UseStatePtrEqHandle<Option<Vec<u8>>>,
    /// Shared cell holding the underlying `WebSocket`, `None` until the first
    /// connection attempt.
    pub ws: Rc<RefCell<Option<WebSocket>>>,

    // Callbacks built by the hook; invoked via the inherent methods.
    open: Rc<dyn Fn()>,
    close: Rc<dyn Fn()>,
    send: Rc<dyn Fn(String)>,
    send_bytes: Rc<dyn Fn(Vec<u8>)>,
}
62
63impl UseWebSocketHandle {
64 pub fn open(&self) {
66 (self.open)();
67 }
68
69 pub fn close(&self) {
71 (self.close)();
72 }
73
74 pub fn send(&self, data: String) {
76 (self.send)(data);
77 }
78
79 pub fn send_bytes(&self, data: Vec<u8>) {
81 (self.send_bytes)(data);
82 }
83}
84
85impl Clone for UseWebSocketHandle {
86 fn clone(&self) -> Self {
87 Self {
88 ready_state: self.ready_state.clone(),
89 message: self.message.clone(),
90 message_bytes: self.message_bytes.clone(),
91 ws: self.ws.clone(),
92
93 open: self.open.clone(),
94 close: self.close.clone(),
95 send: self.send.clone(),
96 send_bytes: self.send_bytes.clone(),
97 }
98 }
99}
100
101#[hook]
159pub fn use_websocket(url: String) -> UseWebSocketHandle {
160 use_websocket_with_options(url, UseWebSocketOptions::default())
161}
162
/// Websocket hook configured through `UseWebSocketOptions`.
///
/// Returns a `UseWebSocketHandle` exposing the tracked ready state, the most
/// recent text and binary messages, the shared socket cell and the
/// `open` / `close` / `send` / `send_bytes` callbacks.
///
/// Unless `options.manual` is `Some(true)`, a connection is opened from an
/// effect keyed on `(url, manual)`. After an `error` or `close` event a
/// reconnect is scheduled while fewer than `reconnect_limit` attempts have
/// been made and the stored socket is not open. On unmount the socket is
/// closed and event handlers become no-ops.
///
/// # Panics
///
/// The connect closure panics (via `unwrap_throw`) if the browser rejects
/// socket construction, and the message handler hits `unreachable!` if a
/// message is neither an `ArrayBuffer` nor a JS string.
#[hook]
pub fn use_websocket_with_options(url: String, options: UseWebSocketOptions) -> UseWebSocketHandle {
    // State exposed on the returned handle; the connection starts out `Closed`.
    let ready_state = use_state(|| UseWebSocketReadyState::Closed);
    let message = use_state_ptr_eq(|| None);
    let message_bytes = use_state_ptr_eq(|| None);
    let ws = use_mut_ref(|| None);

    // User callbacks go through `use_mut_latest`; the event handlers below read
    // them back with `.current()` when an event actually fires.
    let onopen_ref = use_mut_latest(options.onopen);
    let onmessage_ref = use_mut_latest(options.onmessage);
    let onmessage_bytes_ref = use_mut_latest(options.onmessage_bytes);
    let onerror_ref = use_mut_latest(options.onerror);
    let onclose_ref = use_mut_latest(options.onclose);
    // Defaults for the options left unset.
    let reconnect_limit = options.reconnect_limit.unwrap_or(u32::MAX);
    let reconnect_interval = options.reconnect_interval.unwrap_or(3 * 1000);
    let manual = options.manual.unwrap_or(false);
    let protocols = options.protocols;

    // Bookkeeping shared between the closures below.
    let reconnect_times_ref = use_mut_ref(|| 0);
    let reconnect_timer_ref = use_mut_ref(|| None);
    let unmounted_ref = use_mut_ref(|| false);

    // `reconnect` and `connect_ws` call each other, so each lives in a shared
    // cell that is (re)filled below on every invocation of this hook.
    let reconnect = use_mut_ref(|| None);
    let connect_ws = use_mut_ref(|| None);

    *reconnect.borrow_mut() = {
        let ws = ws.clone();
        let reconnect_times_ref = reconnect_times_ref.clone();
        let reconnect_timer_ref = reconnect_timer_ref.clone();
        let connect_ws = connect_ws.clone();
        Some(Rc::new(move || {
            // Only retry while attempts remain and a socket exists that is not open.
            if *reconnect_times_ref.borrow() < reconnect_limit
                && ws
                    .borrow()
                    .as_ref()
                    .is_some_and(|ws: &WebSocket| ws.ready_state() != WebSocket::OPEN)
            {
                let connect_ws = connect_ws.clone();
                let reconnect_times_ref = reconnect_times_ref.clone();
                // Storing the new timer replaces (and drops) any previously stored one.
                *reconnect_timer_ref.borrow_mut() =
                    Some(Timeout::new(reconnect_interval, move || {
                        // Clone the closure out so the `RefCell` borrow ends before it runs.
                        let connect_ws = {
                            let connect_ws = connect_ws.borrow();
                            let connect_ws: &Rc<dyn Fn()> = connect_ws.as_ref().unwrap();
                            connect_ws.clone()
                        };
                        connect_ws();
                        *reconnect_times_ref.borrow_mut() += 1;
                    }));
            }
        }) as Rc<dyn Fn()>)
    };

    *connect_ws.borrow_mut() = {
        let ws = ws.clone();
        let ready_state = ready_state.clone();
        let message = message.clone();
        let message_bytes = message_bytes.clone();
        let url = url.clone();
        let reconnect = reconnect.clone();
        let unmounted_ref = unmounted_ref.clone();
        let onopen_ref = onopen_ref.clone();
        let onmessage_ref = onmessage_ref.clone();
        let onmessage_bytes_ref = onmessage_bytes_ref.clone();
        let onerror_ref = onerror_ref.clone();
        let onclose_ref = onclose_ref.clone();
        let reconnect_timer_ref = reconnect_timer_ref.clone();

        Some(Rc::new(move || {
            // Drop any stored reconnect timer (NOTE(review): gloo's `Timeout`
            // is expected to cancel on drop — confirm).
            *reconnect_timer_ref.borrow_mut() = None;

            // Close the previous socket, if any; the result of `close` is ignored.
            // NOTE(review): the previous socket keeps its handlers, so its
            // `close` event will still invoke `reconnect` — confirm this is intended.
            {
                let web_socket: &mut Option<WebSocket> = &mut ws.borrow_mut();
                if let Some(web_socket) = web_socket {
                    let _ = web_socket.close();
                }
            }

            // Create the socket, with sub-protocols when they were supplied.
            let web_socket = {
                protocols.as_ref().map_or_else(
                    || WebSocket::new(&url).unwrap_throw(),
                    |protocols| {
                        let array = protocols
                            .iter()
                            .map(|p| JsValue::from(p.clone()))
                            .collect::<Array>();
                        WebSocket::new_with_str_sequence(&url, &JsValue::from(&array))
                            .unwrap_throw()
                    },
                )
            };
            // Binary frames are requested as `ArrayBuffer`, which the message
            // handler below relies on.
            web_socket.set_binary_type(BinaryType::Arraybuffer);
            ready_state.set(UseWebSocketReadyState::Connecting);

            // `open` handler: user callback first, then the state update.
            {
                let unmounted_ref = unmounted_ref.clone();
                let ready_state = ready_state.clone();
                let onopen_ref = onopen_ref.clone();
                let onopen_closure = Closure::wrap(Box::new(move |e: Event| {
                    // Ignore events that arrive after unmount.
                    if *unmounted_ref.borrow() {
                        return;
                    }

                    let onopen_ref = onopen_ref.current();
                    let onopen = &mut *onopen_ref.borrow_mut();
                    if let Some(onopen) = onopen {
                        onopen(e);
                    }
                    ready_state.set(UseWebSocketReadyState::Open);
                }) as Box<dyn FnMut(Event)>);
                web_socket.set_onopen(Some(onopen_closure.as_ref().unchecked_ref()));
                // Keep the closure alive for JS. NOTE(review): per wasm-bindgen
                // docs `forget` leaks it, so each connect leaks one set of
                // handlers — confirm this is acceptable.
                onopen_closure.forget();
            }

            // `message` handler: binary payloads go to `message_bytes`, text to `message`.
            {
                let unmounted_ref = unmounted_ref.clone();
                let message_bytes = message_bytes.clone();
                let message = message.clone();
                let onmessage_ref = onmessage_ref.clone();
                let onmessage_bytes_ref = onmessage_bytes_ref.clone();
                let onmessage_closure = Closure::wrap(Box::new(move |e: MessageEvent| {
                    if *unmounted_ref.borrow() {
                        return;
                    }

                    // Try `ArrayBuffer` first; the first closure of each
                    // `map_or_else` is the failed-cast path.
                    e.data().dyn_into::<js_sys::ArrayBuffer>().map_or_else(
                        |_| {
                            e.data().dyn_into::<js_sys::JsString>().map_or_else(
                                |_| {
                                    unreachable!("message event, received Unknown: {:?}", e.data());
                                },
                                |txt| {
                                    let txt = String::from(&txt);
                                    let onmessage_ref = onmessage_ref.current();
                                    let onmessage = &mut *onmessage_ref.borrow_mut();
                                    if let Some(onmessage) = onmessage {
                                        // The callback gets its own copy; the
                                        // original goes into the state below.
                                        let txt = txt.clone();
                                        onmessage(txt);
                                    }
                                    message.set(Some(txt));
                                },
                            );
                        },
                        |array_buffer| {
                            // Copy the JS buffer into a Rust `Vec<u8>`.
                            let array = js_sys::Uint8Array::new(&array_buffer);
                            let array = array.to_vec();
                            let onmessage_bytes_ref = onmessage_bytes_ref.current();
                            let onmessage_bytes = &mut *onmessage_bytes_ref.borrow_mut();
                            if let Some(onmessage_bytes) = onmessage_bytes {
                                let array = array.clone();
                                onmessage_bytes(array);
                            }
                            message_bytes.set(Some(array));
                        },
                    );
                })
                    as Box<dyn FnMut(MessageEvent)>);
                web_socket.set_onmessage(Some(onmessage_closure.as_ref().unchecked_ref()));
                onmessage_closure.forget();
            }

            // `error` handler: schedule a reconnect, notify the user, mark `Closed`.
            {
                let unmounted_ref = unmounted_ref.clone();
                let ready_state = ready_state.clone();
                let onerror_ref = onerror_ref.clone();
                let reconnect = reconnect.clone();
                let onerror_closure = Closure::wrap(Box::new(move |e: Event| {
                    if *unmounted_ref.borrow() {
                        return;
                    }

                    // Clone the closure out so the `RefCell` borrow ends before it runs.
                    let reconnect: Rc<dyn Fn()> = { reconnect.borrow().as_ref().unwrap().clone() };
                    reconnect();

                    let onerror_ref = onerror_ref.current();
                    let onerror = &mut *onerror_ref.borrow_mut();
                    if let Some(onerror) = onerror {
                        onerror(e);
                    }
                    ready_state.set(UseWebSocketReadyState::Closed);
                }) as Box<dyn FnMut(Event)>);
                web_socket.set_onerror(Some(onerror_closure.as_ref().unchecked_ref()));
                onerror_closure.forget();
            }

            // `close` handler: same sequence as the `error` handler.
            {
                let unmounted_ref = unmounted_ref.clone();
                let ready_state = ready_state.clone();
                let onclose_ref = onclose_ref.clone();
                let reconnect = reconnect.clone();
                let onclose_closure = Closure::wrap(Box::new(move |e: CloseEvent| {
                    if *unmounted_ref.borrow() {
                        return;
                    }

                    let reconnect: Rc<dyn Fn()> = { reconnect.borrow().as_ref().unwrap().clone() };
                    reconnect();

                    let onclose_ref = onclose_ref.current();
                    let onclose = &mut *onclose_ref.borrow_mut();
                    if let Some(onclose) = onclose {
                        onclose(e);
                    }
                    ready_state.set(UseWebSocketReadyState::Closed);
                })
                    as Box<dyn FnMut(CloseEvent)>);
                web_socket.set_onclose(Some(onclose_closure.as_ref().unchecked_ref()));
                onclose_closure.forget();
            }

            // Publish the new socket only after all handlers are attached.
            *ws.borrow_mut() = Some(web_socket);
        }) as Rc<dyn Fn()>)
    };

    // Sends a text message when the tracked state is `Open`; otherwise a silent
    // no-op. Send errors are ignored.
    // NOTE(review): `ready_state` is the handle cloned during this render, so the
    // check presumably reflects the state as of that render — confirm.
    let send = {
        let ready_state = ready_state.clone();
        let ws = ws.clone();
        Rc::new(move |data: String| {
            if *ready_state == UseWebSocketReadyState::Open {
                let web_socket: &mut Option<WebSocket> = &mut ws.borrow_mut();
                if let Some(web_socket) = web_socket {
                    let _ = web_socket.send_with_str(&data);
                }
            }
        })
    };

    // Binary counterpart of `send`, with the same `Open` guard and ignored errors.
    let send_bytes = {
        let ready_state = ready_state.clone();
        let ws = ws.clone();
        Rc::new(move |data: Vec<u8>| {
            if *ready_state == UseWebSocketReadyState::Open {
                let web_socket: &mut Option<WebSocket> = &mut ws.borrow_mut();
                if let Some(web_socket) = web_socket {
                    let _ = web_socket.send_with_u8_array(&data);
                }
            }
        })
    };

    // Explicit (re)connect: resets the attempt counter, then connects.
    let open = {
        let reconnect_times_ref = reconnect_times_ref.clone();
        let connect_ws = connect_ws.clone();
        Rc::new(move || {
            *reconnect_times_ref.borrow_mut() = 0;
            let connect_ws: Rc<dyn Fn()> = { connect_ws.borrow().as_ref().unwrap().clone() };
            connect_ws();
        })
    };

    // Explicit close: drops any stored timer and sets the counter to the limit
    // so the `reconnect` guard fails, then closes the socket.
    let close = {
        let ws = ws.clone();
        Rc::new(move || {
            *reconnect_timer_ref.borrow_mut() = None;
            *reconnect_times_ref.borrow_mut() = reconnect_limit;

            let web_socket: &mut Option<WebSocket> = &mut ws.borrow_mut();
            if let Some(web_socket) = web_socket {
                let _ = web_socket.close();
            }
        })
    };

    // Auto-connect via an effect keyed on `(url, manual)`, unless `manual` is set.
    {
        let open = open.clone();
        use_effect_with((url, manual), move |(_, manual)| {
            if !*manual {
                open();
            }

            // No cleanup here; teardown happens in `use_unmount` below.
            || ()
        });
    }

    // On unmount: flag first so handlers bail out, then close the socket.
    {
        let close = close.clone();
        use_unmount(move || {
            *unmounted_ref.borrow_mut() = true;
            close();
        });
    }

    UseWebSocketHandle {
        ready_state,
        message,
        message_bytes,
        ws,
        open,
        close,
        send,
        send_bytes,
    }
}