wasm_web_helpers/
websocket.rs1pub use gloo_utils::errors::JsError;
2pub use reqwasm::websocket::{futures::WebSocket, Message, WebSocketError};
3
4use futures_channel::mpsc;
5use futures_util::{stream::StreamExt, SinkExt};
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8pub enum ControlFlow {
9 Break,
10 Continue,
11}
12
13impl From<()> for ControlFlow {
14 fn from(_: ()) -> Self {
15 Self::Continue
16 }
17}
18
19#[derive(Clone)]
20pub struct WebSocketService {
21 sender: mpsc::UnboundedSender<Message>,
22}
23
24impl WebSocketService {
25 pub fn open<S, R>(
26 url: impl AsRef<str>,
27 send_callback: impl Fn(Result<(), WebSocketError>) -> S + 'static,
28 receive_callback: impl Fn(Result<Message, WebSocketError>) -> R + 'static,
29 close_send_callback: impl FnOnce() + 'static,
30 close_receive_callback: impl FnOnce() + 'static,
31 ) -> Result<Self, JsError>
32 where
33 S: Into<ControlFlow>,
34 R: Into<ControlFlow>,
35 {
36 let ws = WebSocket::open(url.as_ref())?;
37 let (mut sink, mut stream) = ws.split();
38 let (sender, mut receiver) = mpsc::unbounded();
39
40 wasm_bindgen_futures::spawn_local(async move {
41 while let Some(msg) = receiver.next().await {
42 if send_callback(sink.send(msg).await).into() == ControlFlow::Break {
43 break;
44 }
45 }
46 close_send_callback();
47 });
48
49 wasm_bindgen_futures::spawn_local(async move {
50 while let Some(msg) = stream.next().await {
51 if receive_callback(msg).into() == ControlFlow::Break {
52 break;
53 }
54 }
55 close_receive_callback();
56 });
57
58 Ok(Self { sender })
59 }
60
61 pub fn send(&mut self, msg: Message) -> Result<(), mpsc::TrySendError<Message>> {
62 self.sender.unbounded_send(msg)
63 }
64}