wasm_ws/meta.rs
1// Copyright (c) 2019-2022 Naja Melan
2// Copyright (c) 2023-2024 Yuki Kishimoto
3// Distributed under the MIT software license
4
5use std::fmt;
6use std::rc::Rc;
7
8use futures::StreamExt;
9use pharos::{Filter, Observable, Observe, ObserveConfig, PharErr, SharedPharos};
10use send_wrapper::SendWrapper;
11use wasm_bindgen::closure::Closure;
12use wasm_bindgen::{JsCast, UnwrapThrowExt};
13use web_sys::{BinaryType, CloseEvent as JsCloseEvt, DomException, WebSocket};
14
15use crate::{notify, CloseEvent, WsErr, WsEvent, WsState, WsStream};
16
17/// The meta data related to a websocket. Allows access to the methods on the WebSocket API.
18/// This is split from the `Stream`/`Sink` so you can pass the latter to a combinator whilst
19/// continuing to use this API.
20///
21/// A `WsMeta` instance is observable through the [`pharos::Observable`](https://docs.rs/pharos/0.4.3/pharos/trait.Observable.html)
22/// trait. The type of event is [WsEvent]. In the case of a Close event, there will be additional information included
23/// as a [CloseEvent].
24///
25/// When you drop this, the connection does not get closed, however when you drop [WsStream] it does.
26///
27/// Most of the methods on this type directly map to the web API. For more documentation, check the
28/// [MDN WebSocket documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket).
29pub struct WsMeta {
30 ws: SendWrapper<Rc<WebSocket>>,
31 pharos: SharedPharos<WsEvent>,
32}
33
34impl WsMeta {
35 const OPEN_CLOSE: Filter<WsEvent> =
36 Filter::Pointer(|evt: &WsEvent| evt.is_open() | evt.is_closed());
37
38 /// Connect to the server. The future will resolve when the connection has been established with a successful WebSocket
39 /// handshake.
40 pub async fn connect(url: impl AsRef<str>) -> Result<(Self, WsStream), WsErr> {
41 let ws = match WebSocket::new(url.as_ref()) {
42 Ok(ws) => SendWrapper::new(Rc::new(ws)),
43 Err(e) => {
44 let de: &DomException = e.unchecked_ref();
45 return match de.code() {
46 DomException::SYNTAX_ERR => Err(WsErr::InvalidUrl {
47 supplied: url.as_ref().to_string(),
48 }),
49 code => {
50 if code == 0 {
51 Err(WsErr::Other(
52 e.as_string().unwrap_or_else(|| String::from("None")),
53 ))
54 } else {
55 Err(WsErr::Dom(code))
56 }
57 }
58 };
59 }
60 };
61
62 // Create our pharos.
63 let mut pharos = SharedPharos::default();
64 let ph1 = pharos.clone();
65 let ph2 = pharos.clone();
66 let ph3 = pharos.clone();
67 let ph4 = pharos.clone();
68
69 // Setup our event listeners
70 let on_open = Closure::wrap(Box::new(move || {
71 // notify observers
72 //
73 notify(ph1.clone(), WsEvent::Open)
74 }) as Box<dyn FnMut()>);
75
76 // TODO: is there no information at all in an error?
77 //
78 #[allow(trivial_casts)]
79 //
80 let on_error = Closure::wrap(Box::new(move || {
81 // notify observers.
82 //
83 notify(ph2.clone(), WsEvent::Error)
84 }) as Box<dyn FnMut()>);
85
86 #[allow(trivial_casts)]
87 //
88 let on_close = Closure::wrap(Box::new(move |evt: JsCloseEvt| {
89 let c = WsEvent::Closed(CloseEvent {
90 code: evt.code(),
91 reason: evt.reason(),
92 was_clean: evt.was_clean(),
93 });
94
95 notify(ph3.clone(), c)
96 }) as Box<dyn FnMut(JsCloseEvt)>);
97
98 ws.set_onopen(Some(on_open.as_ref().unchecked_ref()));
99 ws.set_onclose(Some(on_close.as_ref().unchecked_ref()));
100 ws.set_onerror(Some(on_error.as_ref().unchecked_ref()));
101
102 // In case of future task cancellation the current task may be interrupted at an await, therefore not reaching
103 // the `WsStream` construction, whose `Drop` glue would have been responsible for unregistering the callbacks.
104 // We therefore use a guard to be responsible for unregistering the callbacks until the `WsStream` is
105 // constructed.
106 //
107 let guard = {
108 struct Guard<'lt> {
109 ws: &'lt WebSocket,
110 }
111
112 impl Drop for Guard<'_> {
113 fn drop(&mut self) {
114 self.ws.set_onopen(None);
115 self.ws.set_onclose(None);
116 self.ws.set_onerror(None);
117
118 // Check if connection is `OPEN`. Will cause a panic if is not `open`
119 if let Ok(WsState::Open) = self.ws.ready_state().try_into() {
120 let _ = self.ws.close();
121 }
122
123 println!(
124 "WsMeta::connect future was dropped while connecting to: {}.",
125 self.ws.url()
126 );
127 }
128 }
129
130 Guard { ws: &ws }
131 };
132
133 // Listen to the events to figure out whether the connection opens successfully. We don't want to deal with
134 // the error event. Either a close event happens, in which case we want to recover the CloseEvent to return it
135 // to the user, or an Open event happens in which case we are happy campers.
136 //
137 let mut evts = pharos
138 .observe(Self::OPEN_CLOSE.into())
139 .await
140 .expect("we didn't close pharos");
141
142 // If the connection is closed, return error
143 //
144 if let Some(WsEvent::Closed(evt)) = evts.next().await {
145 return Err(WsErr::ConnectionFailed { event: evt });
146 }
147
148 // We have now passed all the `await` points in this function and so the `WsStream` construction is guaranteed
149 // so we let it take over the responsibility of unregistering the callbacks by disabling our guard.
150 //
151 std::mem::forget(guard);
152
153 // We don't handle Blob's
154 //
155 ws.set_binary_type(BinaryType::Arraybuffer);
156
157 Ok((
158 Self {
159 pharos,
160 ws: ws.clone(),
161 },
162 WsStream::new(
163 ws,
164 ph4,
165 SendWrapper::new(on_open),
166 SendWrapper::new(on_error),
167 SendWrapper::new(on_close),
168 ),
169 ))
170 }
171
172 /// Close the socket. The future will resolve once the socket's state has become `WsState::CLOSED`.
173 /// See: [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close)
174 pub async fn close(&self) -> Result<CloseEvent, WsErr> {
175 match self.ready_state() {
176 WsState::Closed => return Err(WsErr::ConnectionNotOpen),
177 WsState::Closing => {}
178 WsState::Open => {
179 let _ = self.ws.close();
180
181 // Notify Observers
182 notify(self.pharos.clone(), WsEvent::Closing)
183 }
184 WsState::Connecting => {
185 // Notify Observers
186 notify(self.pharos.clone(), WsEvent::Closing)
187 }
188 }
189
190 let mut evts = match self
191 .pharos
192 .observe_shared(Filter::Pointer(WsEvent::is_closed).into())
193 .await
194 {
195 Ok(events) => events,
196 Err(e) => unreachable!("{:?}", e), // only happens if we closed it.
197 };
198
199 // We promised the user a CloseEvent, so we don't have much choice but to unwrap this. In any case, the stream will
200 // never end and this will hang if the browser fails to send a close event.
201 //
202 let ce = evts.next().await.expect_throw("receive a close event");
203
204 if let WsEvent::Closed(e) = ce {
205 Ok(e)
206 } else {
207 unreachable!()
208 }
209 }
210
211 /// Close the socket. The future will resolve once the socket's state has become `WsState::CLOSED`.
212 /// See: [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close)
213 //
214 pub async fn close_code(&self, code: u16) -> Result<CloseEvent, WsErr> {
215 match self.ready_state() {
216 WsState::Closed => return Err(WsErr::ConnectionNotOpen),
217 WsState::Closing => {}
218
219 _ => {
220 match self.ws.close_with_code(code) {
221 // Notify Observers
222 Ok(_) => notify(self.pharos.clone(), WsEvent::Closing),
223
224 Err(_) => {
225 return Err(WsErr::InvalidCloseCode { supplied: code });
226 }
227 }
228 }
229 }
230
231 let mut evts = match self
232 .pharos
233 .observe_shared(Filter::Pointer(WsEvent::is_closed).into())
234 .await
235 {
236 Ok(events) => events,
237 Err(e) => unreachable!("{:?}", e), // only happens if we closed it.
238 };
239
240 let ce = evts.next().await.expect_throw("receive a close event");
241
242 if let WsEvent::Closed(e) = ce {
243 Ok(e)
244 } else {
245 unreachable!()
246 }
247 }
248
249 /// Close the socket. The future will resolve once the socket's state has become `WsState::CLOSED`.
250 /// See: [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close)
251 //
252 pub async fn close_reason(
253 &self,
254 code: u16,
255 reason: impl AsRef<str>,
256 ) -> Result<CloseEvent, WsErr> {
257 match self.ready_state() {
258 WsState::Closed => return Err(WsErr::ConnectionNotOpen),
259 WsState::Closing => {}
260
261 _ => {
262 if reason.as_ref().len() > 123 {
263 return Err(WsErr::ReasonStringToLong);
264 }
265
266 match self.ws.close_with_code_and_reason(code, reason.as_ref()) {
267 // Notify Observers
268 Ok(_) => notify(self.pharos.clone(), WsEvent::Closing),
269
270 Err(_) => return Err(WsErr::InvalidCloseCode { supplied: code }),
271 }
272 }
273 }
274
275 let mut evts = match self
276 .pharos
277 .observe_shared(Filter::Pointer(WsEvent::is_closed).into())
278 .await
279 {
280 Ok(events) => events,
281 Err(e) => unreachable!("{:?}", e), // only happens if we closed it.
282 };
283
284 let ce = evts.next().await.expect_throw("receive a close event");
285
286 if let WsEvent::Closed(e) = ce {
287 Ok(e)
288 } else {
289 unreachable!()
290 }
291 }
292
293 /// Verify the [WsState] of the connection.
294 //
295 pub fn ready_state(&self) -> WsState {
296 self.ws
297 .ready_state()
298 .try_into()
299 // This can't throw unless the browser gives us an invalid ready state.
300 .expect_throw("Convert ready state from browser API")
301 }
302
303 /// Access the wrapped [web_sys::WebSocket](https://docs.rs/web-sys/0.3.25/web_sys/struct.WebSocket.html) directly.
304 ///
305 /// _ws_stream_wasm_ tries to expose all useful functionality through an idiomatic rust API, so hopefully
306 /// you won't need this, however if I missed something, you can.
307 ///
308 /// ## Caveats
309 /// If you call `set_onopen`, `set_onerror`, `set_onmessage` or `set_onclose` on this, you will overwrite
310 /// the event listeners from `ws_stream_wasm`, and things will break.
311 //
312 pub fn wrapped(&self) -> &WebSocket {
313 &self.ws
314 }
315
316 /// The number of bytes of data that have been queued but not yet transmitted to the network.
317 ///
318 /// **NOTE:** that this is the number of bytes buffered by the underlying platform WebSocket
319 /// implementation. It does not reflect any buffering performed by _ws_stream_wasm_.
320 //
321 pub fn buffered_amount(&self) -> u32 {
322 self.ws.buffered_amount()
323 }
324
325 /// The extensions selected by the server as negotiated during the connection.
326 ///
327 /// **NOTE**: This is an untested feature. The back-end server we use for testing (_tungstenite_)
328 /// does not support Extensions.
329 //
330 pub fn extensions(&self) -> String {
331 self.ws.extensions()
332 }
333
334 /// The name of the sub-protocol the server selected during the connection.
335 ///
336 /// This will be one of the strings specified in the protocols parameter when
337 /// creating this WsMeta instance.
338 //
339 pub fn protocol(&self) -> String {
340 self.ws.protocol()
341 }
342
343 /// Retrieve the address to which this socket is connected.
344 //
345 pub fn url(&self) -> String {
346 self.ws.url()
347 }
348}
349
350impl fmt::Debug for WsMeta {
351 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
352 write!(f, "WsMeta for connection: {}", self.url())
353 }
354}
355
356impl Observable<WsEvent> for WsMeta {
357 type Error = PharErr;
358
359 fn observe(&mut self, options: ObserveConfig<WsEvent>) -> Observe<'_, WsEvent, Self::Error> {
360 self.pharos.observe(options)
361 }
362}