Skip to main content

yewlish_fetch_utils/
web_socket.rs

1use std::{cell::RefCell, rc::Rc};
2
3use serde::Serialize;
4use wasm_bindgen::prelude::Closure;
5use wasm_bindgen::JsCast;
6use web_sys::{CloseEvent, ErrorEvent, Event, MessageEvent, WebSocket};
7use yew::Callback;
8
9use crate::{deserialize_response, FetchError};
10
11#[derive(Debug, PartialEq, Clone)]
12pub struct WebSocketSubscriber<T>
13where
14    T: for<'de> serde::Deserialize<'de> + Clone + PartialEq + 'static,
15{
16    pub onopen: Callback<Event>,
17    pub onmessage: Callback<(MessageEvent, T)>,
18    pub onerror: Callback<FetchError>,
19    pub onclose: Callback<CloseEvent>,
20}
21
22#[derive(Debug, PartialEq)]
23pub struct WebSocketWatcher<T>
24where
25    T: for<'de> serde::Deserialize<'de> + Clone + PartialEq + 'static,
26{
27    url: String,
28    ws: Option<WebSocket>,
29    subscribers: Rc<RefCell<Vec<WebSocketSubscriber<T>>>>,
30}
31
32impl<T> WebSocketWatcher<T>
33where
34    T: for<'de> serde::Deserialize<'de> + Clone + PartialEq + 'static,
35{
36    #[must_use]
37    pub fn new(url: String) -> Self {
38        Self {
39            url,
40            ws: None,
41            subscribers: Rc::new(RefCell::new(Vec::new())),
42        }
43    }
44
45    pub fn subscribe(&mut self, subscriber: WebSocketSubscriber<T>) -> Result<(), FetchError> {
46        (*self.subscribers).borrow_mut().push(subscriber);
47
48        if self.ws.is_none() {
49            self.ws = WebSocket::new(&self.url)
50                .map_err(|error| FetchError::NetworkError(format!("{error:?}")))?
51                .into();
52        }
53
54        let Some(ws) = self.ws.as_ref() else {
55            return Err(FetchError::NetworkError(
56                "WebSocket connection is not established".to_string(),
57            ));
58        };
59
60        let subscribers = self.subscribers.clone();
61
62        let onopen_callback = Closure::wrap(Box::new(move |event: Event| {
63            for subscriber in subscribers.borrow().iter() {
64                subscriber.onopen.emit(event.clone());
65            }
66        }) as Box<dyn FnMut(_)>);
67
68        ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
69        onopen_callback.forget();
70        let subscribers = self.subscribers.clone();
71
72        let onmessage_callback = Closure::wrap(Box::new(move |event: MessageEvent| {
73            match event.data().dyn_into::<web_sys::js_sys::JsString>() {
74                Ok(message) => match deserialize_response::<T>(String::from(message).as_str()) {
75                    Ok(res) => {
76                        for subscriber in subscribers.borrow().iter() {
77                            subscriber.onmessage.emit((event.clone(), res.clone()));
78                        }
79                    }
80                    Err(err) => {
81                        for subscriber in subscribers.borrow().iter() {
82                            subscriber.onerror.emit(err.clone());
83                        }
84                    }
85                },
86                Err(err) => {
87                    for subscriber in subscribers.borrow().iter() {
88                        subscriber
89                            .onerror
90                            .emit(FetchError::ResponseDeserializationError(format!("{err:?}")));
91                    }
92                }
93            }
94        }) as Box<dyn FnMut(_)>);
95
96        ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
97        onmessage_callback.forget();
98        let subscribers = self.subscribers.clone();
99
100        let onerror_callback = Closure::wrap(Box::new(move |event: ErrorEvent| {
101            let error = FetchError::NetworkError(event.message().to_string());
102
103            for subscriber in subscribers.borrow().iter() {
104                subscriber.onerror.emit(error.clone());
105            }
106        }) as Box<dyn FnMut(_)>);
107
108        ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
109        onerror_callback.forget();
110        let subscribers = self.subscribers.clone();
111
112        let onclose_callback = Closure::wrap(Box::new(move |event: CloseEvent| {
113            for subscriber in subscribers.borrow().iter() {
114                subscriber.onclose.emit(event.clone());
115            }
116        }) as Box<dyn FnMut(_)>);
117
118        ws.set_onclose(Some(onclose_callback.as_ref().unchecked_ref()));
119        onclose_callback.forget();
120
121        Ok(())
122    }
123
124    pub fn unsubscribe(&mut self, subscriber: &WebSocketSubscriber<T>) -> Result<(), FetchError> {
125        (*self.subscribers).borrow_mut().retain(|s| s != subscriber);
126
127        if self.subscribers.borrow().is_empty() {
128            let Some(ws) = self.ws.as_ref() else {
129                return Ok(());
130            };
131
132            if ws.ready_state() == WebSocket::OPEN {
133                ws.close()
134                    .map_err(|error| FetchError::NetworkError(format!("{error:?}")))?;
135            }
136        }
137
138        Ok(())
139    }
140
141    pub fn send<B>(&self, data: &B) -> Result<(), FetchError>
142    where
143        B: Serialize + Default + PartialEq,
144    {
145        let body_str = serde_json::to_string(&data)
146            .map_err(|error| FetchError::BodySerializationError(error.to_string()))?;
147
148        let Some(ws) = self.ws.as_ref() else {
149            return Err(FetchError::NetworkError(
150                "WebSocket connection is not established".to_string(),
151            ));
152        };
153
154        if ws.ready_state() != WebSocket::OPEN {
155            return Err(FetchError::NetworkError(
156                "WebSocket connection is not open".to_string(),
157            ));
158        }
159
160        ws.send_with_str(&body_str)
161            .map_err(|error| FetchError::NetworkError(format!("{error:?}")))?;
162
163        Ok(())
164    }
165
166    #[must_use]
167    pub fn get_ready_state(&self) -> Option<u16> {
168        let ws = self.ws.as_ref()?;
169        ws.ready_state().into()
170    }
171}
172
/// Two-state status flag that defaults to [`WsStatus::Closed`].
///
/// NOTE(review): not referenced elsewhere in this file; presumably mirrors the
/// state of a WebSocket connection for consumers — confirm against callers.
#[derive(Clone, Debug, Default, PartialEq)]
pub enum WsStatus {
    /// The "open" state.
    Open,
    /// The "closed" state; this is the default value.
    #[default]
    Closed,
}