yewlish_fetch_utils/
web_socket.rs1use std::{cell::RefCell, rc::Rc};
2
3use serde::Serialize;
4use wasm_bindgen::prelude::Closure;
5use wasm_bindgen::JsCast;
6use web_sys::{CloseEvent, ErrorEvent, Event, MessageEvent, WebSocket};
7use yew::Callback;
8
9use crate::{deserialize_response, FetchError};
10
11#[derive(Debug, PartialEq, Clone)]
12pub struct WebSocketSubscriber<T>
13where
14 T: for<'de> serde::Deserialize<'de> + Clone + PartialEq + 'static,
15{
16 pub onopen: Callback<Event>,
17 pub onmessage: Callback<(MessageEvent, T)>,
18 pub onerror: Callback<FetchError>,
19 pub onclose: Callback<CloseEvent>,
20}
21
22#[derive(Debug, PartialEq)]
23pub struct WebSocketWatcher<T>
24where
25 T: for<'de> serde::Deserialize<'de> + Clone + PartialEq + 'static,
26{
27 url: String,
28 ws: Option<WebSocket>,
29 subscribers: Rc<RefCell<Vec<WebSocketSubscriber<T>>>>,
30}
31
32impl<T> WebSocketWatcher<T>
33where
34 T: for<'de> serde::Deserialize<'de> + Clone + PartialEq + 'static,
35{
36 #[must_use]
37 pub fn new(url: String) -> Self {
38 Self {
39 url,
40 ws: None,
41 subscribers: Rc::new(RefCell::new(Vec::new())),
42 }
43 }
44
45 pub fn subscribe(&mut self, subscriber: WebSocketSubscriber<T>) -> Result<(), FetchError> {
46 (*self.subscribers).borrow_mut().push(subscriber);
47
48 if self.ws.is_none() {
49 self.ws = WebSocket::new(&self.url)
50 .map_err(|error| FetchError::NetworkError(format!("{error:?}")))?
51 .into();
52 }
53
54 let Some(ws) = self.ws.as_ref() else {
55 return Err(FetchError::NetworkError(
56 "WebSocket connection is not established".to_string(),
57 ));
58 };
59
60 let subscribers = self.subscribers.clone();
61
62 let onopen_callback = Closure::wrap(Box::new(move |event: Event| {
63 for subscriber in subscribers.borrow().iter() {
64 subscriber.onopen.emit(event.clone());
65 }
66 }) as Box<dyn FnMut(_)>);
67
68 ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
69 onopen_callback.forget();
70 let subscribers = self.subscribers.clone();
71
72 let onmessage_callback = Closure::wrap(Box::new(move |event: MessageEvent| {
73 match event.data().dyn_into::<web_sys::js_sys::JsString>() {
74 Ok(message) => match deserialize_response::<T>(String::from(message).as_str()) {
75 Ok(res) => {
76 for subscriber in subscribers.borrow().iter() {
77 subscriber.onmessage.emit((event.clone(), res.clone()));
78 }
79 }
80 Err(err) => {
81 for subscriber in subscribers.borrow().iter() {
82 subscriber.onerror.emit(err.clone());
83 }
84 }
85 },
86 Err(err) => {
87 for subscriber in subscribers.borrow().iter() {
88 subscriber
89 .onerror
90 .emit(FetchError::ResponseDeserializationError(format!("{err:?}")));
91 }
92 }
93 }
94 }) as Box<dyn FnMut(_)>);
95
96 ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
97 onmessage_callback.forget();
98 let subscribers = self.subscribers.clone();
99
100 let onerror_callback = Closure::wrap(Box::new(move |event: ErrorEvent| {
101 let error = FetchError::NetworkError(event.message().to_string());
102
103 for subscriber in subscribers.borrow().iter() {
104 subscriber.onerror.emit(error.clone());
105 }
106 }) as Box<dyn FnMut(_)>);
107
108 ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
109 onerror_callback.forget();
110 let subscribers = self.subscribers.clone();
111
112 let onclose_callback = Closure::wrap(Box::new(move |event: CloseEvent| {
113 for subscriber in subscribers.borrow().iter() {
114 subscriber.onclose.emit(event.clone());
115 }
116 }) as Box<dyn FnMut(_)>);
117
118 ws.set_onclose(Some(onclose_callback.as_ref().unchecked_ref()));
119 onclose_callback.forget();
120
121 Ok(())
122 }
123
124 pub fn unsubscribe(&mut self, subscriber: &WebSocketSubscriber<T>) -> Result<(), FetchError> {
125 (*self.subscribers).borrow_mut().retain(|s| s != subscriber);
126
127 if self.subscribers.borrow().is_empty() {
128 let Some(ws) = self.ws.as_ref() else {
129 return Ok(());
130 };
131
132 if ws.ready_state() == WebSocket::OPEN {
133 ws.close()
134 .map_err(|error| FetchError::NetworkError(format!("{error:?}")))?;
135 }
136 }
137
138 Ok(())
139 }
140
141 pub fn send<B>(&self, data: &B) -> Result<(), FetchError>
142 where
143 B: Serialize + Default + PartialEq,
144 {
145 let body_str = serde_json::to_string(&data)
146 .map_err(|error| FetchError::BodySerializationError(error.to_string()))?;
147
148 let Some(ws) = self.ws.as_ref() else {
149 return Err(FetchError::NetworkError(
150 "WebSocket connection is not established".to_string(),
151 ));
152 };
153
154 if ws.ready_state() != WebSocket::OPEN {
155 return Err(FetchError::NetworkError(
156 "WebSocket connection is not open".to_string(),
157 ));
158 }
159
160 ws.send_with_str(&body_str)
161 .map_err(|error| FetchError::NetworkError(format!("{error:?}")))?;
162
163 Ok(())
164 }
165
166 #[must_use]
167 pub fn get_ready_state(&self) -> Option<u16> {
168 let ws = self.ws.as_ref()?;
169 ws.ready_state().into()
170 }
171}
172
/// Two-state WebSocket status flag.
///
/// [`WsStatus::Closed`] is the default value.
#[derive(Clone, Debug, Default, PartialEq)]
pub enum WsStatus {
    /// The connection is open.
    Open,
    /// The connection is closed; this is the `Default` variant.
    #[default]
    Closed,
}