yew_hooks/hooks/
use_websocket.rs

1use std::{cell::RefCell, rc::Rc};
2
3use gloo::timers::callback::Timeout;
4use js_sys::Array;
5use wasm_bindgen::{prelude::*, JsCast, JsValue};
6use web_sys::{BinaryType, MessageEvent, WebSocket};
7use yew::prelude::*;
8
9use super::{use_mut_latest, use_state_ptr_eq, use_unmount, UseStatePtrEqHandle};
10
11pub use web_sys::CloseEvent;
12
13/// The current state of the `WebSocket` connection.
14#[derive(Debug, PartialEq, Eq, Clone)]
15pub enum UseWebSocketReadyState {
16    Connecting,
17    Open,
18    Closing,
19    Closed,
20}
21
22/// Options for `WebSocket`.
23#[derive(Default)]
24pub struct UseWebSocketOptions {
25    /// `WebSocket` connect callback.
26    pub onopen: Option<Box<dyn FnMut(Event)>>,
27    /// `WebSocket` message callback for text.
28    pub onmessage: Option<Box<dyn FnMut(String)>>,
29    /// `WebSocket` message callback for binary.
30    pub onmessage_bytes: Option<Box<dyn FnMut(Vec<u8>)>>,
31    /// `WebSocket` error callback.
32    pub onerror: Option<Box<dyn FnMut(Event)>>,
33    /// `WebSocket` close callback.
34    pub onclose: Option<Box<dyn FnMut(CloseEvent)>>,
35
36    /// Retry times. Defaults to `u32::MAX` for infinite retries.
37    pub reconnect_limit: Option<u32>,
38    /// Retry interval(ms). Defaults to 3000.
39    pub reconnect_interval: Option<u32>,
40    /// Manually starts connection
41    pub manual: Option<bool>,
42    /// Sub protocols
43    pub protocols: Option<Vec<String>>,
44}
45
46/// State handle for the [`use_websocket`] hook.
47pub struct UseWebSocketHandle {
48    /// The current state of the `WebSocket` connection.
49    pub ready_state: UseStateHandle<UseWebSocketReadyState>,
50    /// Latest text message received from `WebSocket`.
51    pub message: UseStatePtrEqHandle<Option<String>>,
52    /// Latest binary message received from `WebSocket`.
53    pub message_bytes: UseStatePtrEqHandle<Option<Vec<u8>>>,
54    /// The `WebSocket` instance.
55    pub ws: Rc<RefCell<Option<WebSocket>>>,
56
57    open: Rc<dyn Fn()>,
58    close: Rc<dyn Fn()>,
59    send: Rc<dyn Fn(String)>,
60    send_bytes: Rc<dyn Fn(Vec<u8>)>,
61}
62
63impl UseWebSocketHandle {
64    /// Connect `WebSocket` manually. If already connected, close the current one and reconnect.
65    pub fn open(&self) {
66        (self.open)();
67    }
68
69    /// Disconnect `WebSocket` manually.
70    pub fn close(&self) {
71        (self.close)();
72    }
73
74    /// Send text message to `WebSocket`.
75    pub fn send(&self, data: String) {
76        (self.send)(data);
77    }
78
79    /// Send binary message to `WebSocket`.
80    pub fn send_bytes(&self, data: Vec<u8>) {
81        (self.send_bytes)(data);
82    }
83}
84
85impl Clone for UseWebSocketHandle {
86    fn clone(&self) -> Self {
87        Self {
88            ready_state: self.ready_state.clone(),
89            message: self.message.clone(),
90            message_bytes: self.message_bytes.clone(),
91            ws: self.ws.clone(),
92
93            open: self.open.clone(),
94            close: self.close.clone(),
95            send: self.send.clone(),
96            send_bytes: self.send_bytes.clone(),
97        }
98    }
99}
100
101/// This hook communicates with `WebSocket`.
102///
103/// # Example
104///
105/// ```rust
106/// # use yew::prelude::*;
107/// #
108/// use yew_hooks::prelude::*;
109///
110/// #[function_component(UseWebSocket)]
111/// fn web_socket() -> Html {
112///     let history = use_list(vec![]);
113///
114///     let ws = use_websocket("wss://echo.websocket.events/".to_string());
115///     let onclick = {
116///         let ws = ws.clone();
117///         let history = history.clone();
118///         Callback::from(move |_| {
119///             let message = "Hello, world!".to_string();
120///             ws.send(message.clone());
121///             history.push(format!("[send]: {}", message));
122///         })
123///     };
124///     {
125///         let history = history.clone();
126///         let ws = ws.clone();
127///         // Receive message by depending on `ws.message`.
128///         use_effect_with(
129///             ws.message,
130///             move |message| {
131///                 if let Some(message) = &**message {
132///                     history.push(format!("[recv]: {}", message.clone()));
133///                 }
134///                 || ()
135///             },
136///         );
137///     }
138///
139///     html! {
140///         <>
141///             <p>
142///                 <button {onclick} disabled={*ws.ready_state != UseWebSocketReadyState::Open}>{ "Send" }</button>
143///             </p>
144///             <p>
145///                 <b>{ "Message history: " }</b>
146///             </p>
147///             {
148///                 for history.current().iter().map(|message| {
149///                     html! {
150///                         <p>{ message }</p>
151///                     }
152///                 })
153///             }
154///         </>
155///     }
156/// }
157/// ```
158#[hook]
159pub fn use_websocket(url: String) -> UseWebSocketHandle {
160    use_websocket_with_options(url, UseWebSocketOptions::default())
161}
162
163/// This hook communicates with `WebSocket` with options.
164///
165/// # Example
166///
167/// ```rust
168/// # use yew::prelude::*;
169/// #
170/// use yew_hooks::prelude::*;
171///
172/// #[function_component(UseWebSocket)]
173/// fn web_socket() -> Html {
174///     let history = use_list(vec![]);
175///
176///     let ws = {
177///         let history = history.clone();
178///         use_websocket_with_options(
179///             "wss://echo.websocket.events/".to_string(),
180///             UseWebSocketOptions {
181///                 // Receive message by callback `onmessage`.
182///                 onmessage: Some(Box::new(move |message| {
183///                     history.push(format!("[recv]: {}", message));
184///                 })),
185///                 manual: Some(true),
186///                 ..Default::default()
187///             },
188///         )
189///     };
190///     let onclick = {
191///         let ws = ws.clone();
192///         let history = history.clone();
193///         Callback::from(move |_| {
194///             let message = "Hello, world!".to_string();
195///             ws.send(message.clone());
196///             history.push(format!("[send]: {}", message));
197///         })
198///     };
199///     let onopen = {
200///         let ws = ws.clone();
201///         Callback::from(move |_| {
202///             ws.open();
203///         })
204///     };
205///
206///     html! {
207///         <>
208///             <p>
209///                 <button onclick={onopen} disabled={*ws.ready_state != UseWebSocketReadyState::Closed}>{ "Connect" }</button>
210///                 <button {onclick} disabled={*ws.ready_state != UseWebSocketReadyState::Open}>{ "Send with options" }</button>
211///             </p>
212///             <p>
213///                 <b>{ "Message history: " }</b>
214///             </p>
215///             {
216///                 for history.current().iter().map(|message| {
217///                     html! {
218///                         <p>{ message }</p>
219///                     }
220///                 })
221///             }
222///         </>
223///     }
224/// }
225/// ```
226#[hook]
227pub fn use_websocket_with_options(url: String, options: UseWebSocketOptions) -> UseWebSocketHandle {
228    let ready_state = use_state(|| UseWebSocketReadyState::Closed);
229    let message = use_state_ptr_eq(|| None);
230    let message_bytes = use_state_ptr_eq(|| None);
231    let ws = use_mut_ref(|| None);
232
233    let onopen_ref = use_mut_latest(options.onopen);
234    let onmessage_ref = use_mut_latest(options.onmessage);
235    let onmessage_bytes_ref = use_mut_latest(options.onmessage_bytes);
236    let onerror_ref = use_mut_latest(options.onerror);
237    let onclose_ref = use_mut_latest(options.onclose);
238    let reconnect_limit = options.reconnect_limit.unwrap_or(u32::MAX);
239    let reconnect_interval = options.reconnect_interval.unwrap_or(3 * 1000);
240    let manual = options.manual.unwrap_or(false);
241    let protocols = options.protocols;
242
243    let reconnect_times_ref = use_mut_ref(|| 0);
244    let reconnect_timer_ref = use_mut_ref(|| None);
245    let unmounted_ref = use_mut_ref(|| false);
246
247    let reconnect = use_mut_ref(|| None);
248    let connect_ws = use_mut_ref(|| None);
249
250    *reconnect.borrow_mut() = {
251        let ws = ws.clone();
252        let reconnect_times_ref = reconnect_times_ref.clone();
253        let reconnect_timer_ref = reconnect_timer_ref.clone();
254        let connect_ws = connect_ws.clone();
255        Some(Rc::new(move || {
256            if *reconnect_times_ref.borrow() < reconnect_limit
257                && ws
258                    .borrow()
259                    .as_ref()
260                    .is_some_and(|ws: &WebSocket| ws.ready_state() != WebSocket::OPEN)
261            {
262                let connect_ws = connect_ws.clone();
263                let reconnect_times_ref = reconnect_times_ref.clone();
264                *reconnect_timer_ref.borrow_mut() =
265                    Some(Timeout::new(reconnect_interval, move || {
266                        let connect_ws = {
267                            let connect_ws = connect_ws.borrow();
268                            let connect_ws: &Rc<dyn Fn()> = connect_ws.as_ref().unwrap();
269                            connect_ws.clone()
270                        };
271                        connect_ws();
272                        *reconnect_times_ref.borrow_mut() += 1;
273                    }));
274            }
275        }) as Rc<dyn Fn()>)
276    };
277
278    *connect_ws.borrow_mut() = {
279        let ws = ws.clone();
280        let ready_state = ready_state.clone();
281        let message = message.clone();
282        let message_bytes = message_bytes.clone();
283        let url = url.clone();
284        let reconnect = reconnect.clone();
285        let unmounted_ref = unmounted_ref.clone();
286        let onopen_ref = onopen_ref.clone();
287        let onmessage_ref = onmessage_ref.clone();
288        let onmessage_bytes_ref = onmessage_bytes_ref.clone();
289        let onerror_ref = onerror_ref.clone();
290        let onclose_ref = onclose_ref.clone();
291        let reconnect_timer_ref = reconnect_timer_ref.clone();
292
293        Some(Rc::new(move || {
294            *reconnect_timer_ref.borrow_mut() = None;
295
296            {
297                let web_socket: &mut Option<WebSocket> = &mut ws.borrow_mut();
298                if let Some(web_socket) = web_socket {
299                    let _ = web_socket.close();
300                }
301            }
302
303            let web_socket = {
304                protocols.as_ref().map_or_else(
305                    || WebSocket::new(&url).unwrap_throw(),
306                    |protocols| {
307                        let array = protocols
308                            .iter()
309                            .map(|p| JsValue::from(p.clone()))
310                            .collect::<Array>();
311                        WebSocket::new_with_str_sequence(&url, &JsValue::from(&array))
312                            .unwrap_throw()
313                    },
314                )
315            };
316            web_socket.set_binary_type(BinaryType::Arraybuffer);
317            ready_state.set(UseWebSocketReadyState::Connecting);
318
319            {
320                let unmounted_ref = unmounted_ref.clone();
321                let ready_state = ready_state.clone();
322                let onopen_ref = onopen_ref.clone();
323                let onopen_closure = Closure::wrap(Box::new(move |e: Event| {
324                    if *unmounted_ref.borrow() {
325                        return;
326                    }
327
328                    let onopen_ref = onopen_ref.current();
329                    let onopen = &mut *onopen_ref.borrow_mut();
330                    if let Some(onopen) = onopen {
331                        onopen(e);
332                    }
333                    ready_state.set(UseWebSocketReadyState::Open);
334                }) as Box<dyn FnMut(Event)>);
335                web_socket.set_onopen(Some(onopen_closure.as_ref().unchecked_ref()));
336                // Forget the closure to keep it alive
337                onopen_closure.forget();
338            }
339
340            {
341                let unmounted_ref = unmounted_ref.clone();
342                let message_bytes = message_bytes.clone();
343                let message = message.clone();
344                let onmessage_ref = onmessage_ref.clone();
345                let onmessage_bytes_ref = onmessage_bytes_ref.clone();
346                let onmessage_closure = Closure::wrap(Box::new(move |e: MessageEvent| {
347                    if *unmounted_ref.borrow() {
348                        return;
349                    }
350
351                    e.data().dyn_into::<js_sys::ArrayBuffer>().map_or_else(
352                        |_| {
353                            e.data().dyn_into::<js_sys::JsString>().map_or_else(
354                                |_| {
355                                    unreachable!("message event, received Unknown: {:?}", e.data());
356                                },
357                                |txt| {
358                                    let txt = String::from(&txt);
359                                    let onmessage_ref = onmessage_ref.current();
360                                    let onmessage = &mut *onmessage_ref.borrow_mut();
361                                    if let Some(onmessage) = onmessage {
362                                        let txt = txt.clone();
363                                        onmessage(txt);
364                                    }
365                                    message.set(Some(txt));
366                                },
367                            );
368                        },
369                        |array_buffer| {
370                            let array = js_sys::Uint8Array::new(&array_buffer);
371                            let array = array.to_vec();
372                            let onmessage_bytes_ref = onmessage_bytes_ref.current();
373                            let onmessage_bytes = &mut *onmessage_bytes_ref.borrow_mut();
374                            if let Some(onmessage_bytes) = onmessage_bytes {
375                                let array = array.clone();
376                                onmessage_bytes(array);
377                            }
378                            message_bytes.set(Some(array));
379                        },
380                    );
381                })
382                    as Box<dyn FnMut(MessageEvent)>);
383                web_socket.set_onmessage(Some(onmessage_closure.as_ref().unchecked_ref()));
384                onmessage_closure.forget();
385            }
386
387            {
388                let unmounted_ref = unmounted_ref.clone();
389                let ready_state = ready_state.clone();
390                let onerror_ref = onerror_ref.clone();
391                let reconnect = reconnect.clone();
392                let onerror_closure = Closure::wrap(Box::new(move |e: Event| {
393                    if *unmounted_ref.borrow() {
394                        return;
395                    }
396
397                    let reconnect: Rc<dyn Fn()> = { reconnect.borrow().as_ref().unwrap().clone() };
398                    reconnect();
399
400                    let onerror_ref = onerror_ref.current();
401                    let onerror = &mut *onerror_ref.borrow_mut();
402                    if let Some(onerror) = onerror {
403                        onerror(e);
404                    }
405                    ready_state.set(UseWebSocketReadyState::Closed);
406                }) as Box<dyn FnMut(Event)>);
407                web_socket.set_onerror(Some(onerror_closure.as_ref().unchecked_ref()));
408                onerror_closure.forget();
409            }
410
411            {
412                let unmounted_ref = unmounted_ref.clone();
413                let ready_state = ready_state.clone();
414                let onclose_ref = onclose_ref.clone();
415                let reconnect = reconnect.clone();
416                let onclose_closure = Closure::wrap(Box::new(move |e: CloseEvent| {
417                    if *unmounted_ref.borrow() {
418                        return;
419                    }
420
421                    let reconnect: Rc<dyn Fn()> = { reconnect.borrow().as_ref().unwrap().clone() };
422                    reconnect();
423
424                    let onclose_ref = onclose_ref.current();
425                    let onclose = &mut *onclose_ref.borrow_mut();
426                    if let Some(onclose) = onclose {
427                        onclose(e);
428                    }
429                    ready_state.set(UseWebSocketReadyState::Closed);
430                })
431                    as Box<dyn FnMut(CloseEvent)>);
432                web_socket.set_onclose(Some(onclose_closure.as_ref().unchecked_ref()));
433                onclose_closure.forget();
434            }
435
436            *ws.borrow_mut() = Some(web_socket);
437        }) as Rc<dyn Fn()>)
438    };
439
440    let send = {
441        let ready_state = ready_state.clone();
442        let ws = ws.clone();
443        Rc::new(move |data: String| {
444            if *ready_state == UseWebSocketReadyState::Open {
445                let web_socket: &mut Option<WebSocket> = &mut ws.borrow_mut();
446                if let Some(web_socket) = web_socket {
447                    let _ = web_socket.send_with_str(&data);
448                }
449            }
450        })
451    };
452
453    let send_bytes = {
454        let ready_state = ready_state.clone();
455        let ws = ws.clone();
456        Rc::new(move |data: Vec<u8>| {
457            if *ready_state == UseWebSocketReadyState::Open {
458                let web_socket: &mut Option<WebSocket> = &mut ws.borrow_mut();
459                if let Some(web_socket) = web_socket {
460                    let _ = web_socket.send_with_u8_array(&data);
461                }
462            }
463        })
464    };
465
466    let open = {
467        let reconnect_times_ref = reconnect_times_ref.clone();
468        let connect_ws = connect_ws.clone();
469        Rc::new(move || {
470            *reconnect_times_ref.borrow_mut() = 0;
471            let connect_ws: Rc<dyn Fn()> = { connect_ws.borrow().as_ref().unwrap().clone() };
472            connect_ws();
473        })
474    };
475
476    let close = {
477        let ws = ws.clone();
478        Rc::new(move || {
479            *reconnect_timer_ref.borrow_mut() = None;
480            *reconnect_times_ref.borrow_mut() = reconnect_limit;
481
482            let web_socket: &mut Option<WebSocket> = &mut ws.borrow_mut();
483            if let Some(web_socket) = web_socket {
484                let _ = web_socket.close();
485            }
486        })
487    };
488
489    {
490        let open = open.clone();
491        use_effect_with((url, manual), move |(_, manual)| {
492            if !*manual {
493                open();
494            }
495
496            || ()
497        });
498    }
499
500    {
501        let close = close.clone();
502        use_unmount(move || {
503            *unmounted_ref.borrow_mut() = true;
504            close();
505        });
506    }
507
508    UseWebSocketHandle {
509        ready_state,
510        message,
511        message_bytes,
512        ws,
513        open,
514        close,
515        send,
516        send_bytes,
517    }
518}