wasm_web_helpers/
websocket.rs

1pub use gloo_utils::errors::JsError;
2pub use reqwasm::websocket::{futures::WebSocket, Message, WebSocketError};
3
4use futures_channel::mpsc;
5use futures_util::{stream::StreamExt, SinkExt};
6
/// Decision a callback hands back to the loop that invoked it.
///
/// Callbacks that have nothing to decide can simply return `()`, which
/// converts into [`ControlFlow::Continue`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ControlFlow {
    /// Stop the loop that invoked the callback.
    Break,
    /// Keep the loop running and wait for the next item.
    Continue,
}
12
13impl From<()> for ControlFlow {
14    fn from(_: ()) -> Self {
15        Self::Continue
16    }
17}
18
/// Cloneable handle used to queue outgoing WebSocket messages.
///
/// Created by [`WebSocketService::open`]; every clone shares the same
/// underlying channel sender.
#[derive(Clone)]
pub struct WebSocketService {
    /// Sending half of the unbounded channel that the send task reads from.
    sender: mpsc::UnboundedSender<Message>,
}
23
24impl WebSocketService {
25    pub fn open<S, R>(
26        url: impl AsRef<str>,
27        send_callback: impl Fn(Result<(), WebSocketError>) -> S + 'static,
28        receive_callback: impl Fn(Result<Message, WebSocketError>) -> R + 'static,
29        close_send_callback: impl FnOnce() + 'static,
30        close_receive_callback: impl FnOnce() + 'static,
31    ) -> Result<Self, JsError>
32    where
33        S: Into<ControlFlow>,
34        R: Into<ControlFlow>,
35    {
36        let ws = WebSocket::open(url.as_ref())?;
37        let (mut sink, mut stream) = ws.split();
38        let (sender, mut receiver) = mpsc::unbounded();
39
40        wasm_bindgen_futures::spawn_local(async move {
41            while let Some(msg) = receiver.next().await {
42                if send_callback(sink.send(msg).await).into() == ControlFlow::Break {
43                    break;
44                }
45            }
46            close_send_callback();
47        });
48
49        wasm_bindgen_futures::spawn_local(async move {
50            while let Some(msg) = stream.next().await {
51                if receive_callback(msg).into() == ControlFlow::Break {
52                    break;
53                }
54            }
55            close_receive_callback();
56        });
57
58        Ok(Self { sender })
59    }
60
61    pub fn send(&mut self, msg: Message) -> Result<(), mpsc::TrySendError<Message>> {
62        self.sender.unbounded_send(msg)
63    }
64}