wasm_ws/
meta.rs

1// Copyright (c) 2019-2022 Naja Melan
2// Copyright (c) 2023-2024 Yuki Kishimoto
3// Distributed under the MIT software license
4
5use std::fmt;
6use std::rc::Rc;
7
8use futures::StreamExt;
9use pharos::{Filter, Observable, Observe, ObserveConfig, PharErr, SharedPharos};
10use send_wrapper::SendWrapper;
11use wasm_bindgen::closure::Closure;
12use wasm_bindgen::{JsCast, UnwrapThrowExt};
13use web_sys::{BinaryType, CloseEvent as JsCloseEvt, DomException, WebSocket};
14
15use crate::{notify, CloseEvent, WsErr, WsEvent, WsState, WsStream};
16
/// The meta data related to a websocket. Allows access to the methods on the WebSocket API.
/// This is split from the `Stream`/`Sink` so you can pass the latter to a combinator whilst
/// continuing to use this API.
///
/// A `WsMeta` instance is observable through the [`pharos::Observable`](https://docs.rs/pharos/0.4.3/pharos/trait.Observable.html)
/// trait. The type of event is [WsEvent]. In the case of a Close event, there will be additional information included
/// as a [CloseEvent].
///
/// When you drop this, the connection does not get closed, however when you drop [WsStream] it does.
///
/// Most of the methods on this type directly map to the web API. For more documentation, check the
/// [MDN WebSocket documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket).
pub struct WsMeta {
    /// Reference-counted handle to the browser `WebSocket`. `connect` hands a clone of this same
    /// handle to the [WsStream] it returns, so both halves talk to one socket.
    ws: SendWrapper<Rc<WebSocket>>,
    /// Event source on which the [WsEvent]s of this connection are published and through which
    /// callers (and the `close*` methods) observe them.
    pharos: SharedPharos<WsEvent>,
}
33
34impl WsMeta {
35    const OPEN_CLOSE: Filter<WsEvent> =
36        Filter::Pointer(|evt: &WsEvent| evt.is_open() | evt.is_closed());
37
38    /// Connect to the server. The future will resolve when the connection has been established with a successful WebSocket
39    /// handshake.
40    pub async fn connect(url: impl AsRef<str>) -> Result<(Self, WsStream), WsErr> {
41        let ws = match WebSocket::new(url.as_ref()) {
42            Ok(ws) => SendWrapper::new(Rc::new(ws)),
43            Err(e) => {
44                let de: &DomException = e.unchecked_ref();
45                return match de.code() {
46                    DomException::SYNTAX_ERR => Err(WsErr::InvalidUrl {
47                        supplied: url.as_ref().to_string(),
48                    }),
49                    code => {
50                        if code == 0 {
51                            Err(WsErr::Other(
52                                e.as_string().unwrap_or_else(|| String::from("None")),
53                            ))
54                        } else {
55                            Err(WsErr::Dom(code))
56                        }
57                    }
58                };
59            }
60        };
61
62        // Create our pharos.
63        let mut pharos = SharedPharos::default();
64        let ph1 = pharos.clone();
65        let ph2 = pharos.clone();
66        let ph3 = pharos.clone();
67        let ph4 = pharos.clone();
68
69        // Setup our event listeners
70        let on_open = Closure::wrap(Box::new(move || {
71            // notify observers
72            //
73            notify(ph1.clone(), WsEvent::Open)
74        }) as Box<dyn FnMut()>);
75
76        // TODO: is there no information at all in an error?
77        //
78        #[allow(trivial_casts)]
79        //
80        let on_error = Closure::wrap(Box::new(move || {
81            // notify observers.
82            //
83            notify(ph2.clone(), WsEvent::Error)
84        }) as Box<dyn FnMut()>);
85
86        #[allow(trivial_casts)]
87        //
88        let on_close = Closure::wrap(Box::new(move |evt: JsCloseEvt| {
89            let c = WsEvent::Closed(CloseEvent {
90                code: evt.code(),
91                reason: evt.reason(),
92                was_clean: evt.was_clean(),
93            });
94
95            notify(ph3.clone(), c)
96        }) as Box<dyn FnMut(JsCloseEvt)>);
97
98        ws.set_onopen(Some(on_open.as_ref().unchecked_ref()));
99        ws.set_onclose(Some(on_close.as_ref().unchecked_ref()));
100        ws.set_onerror(Some(on_error.as_ref().unchecked_ref()));
101
102        // In case of future task cancellation the current task may be interrupted at an await, therefore not reaching
103        // the `WsStream` construction, whose `Drop` glue would have been responsible for unregistering the callbacks.
104        // We therefore use a guard to be responsible for unregistering the callbacks until the `WsStream` is
105        // constructed.
106        //
107        let guard = {
108            struct Guard<'lt> {
109                ws: &'lt WebSocket,
110            }
111
112            impl Drop for Guard<'_> {
113                fn drop(&mut self) {
114                    self.ws.set_onopen(None);
115                    self.ws.set_onclose(None);
116                    self.ws.set_onerror(None);
117
118                    // Check if connection is `OPEN`. Will cause a panic if is not `open`
119                    if let Ok(WsState::Open) = self.ws.ready_state().try_into() {
120                        let _ = self.ws.close();
121                    }
122
123                    println!(
124                        "WsMeta::connect future was dropped while connecting to: {}.",
125                        self.ws.url()
126                    );
127                }
128            }
129
130            Guard { ws: &ws }
131        };
132
133        // Listen to the events to figure out whether the connection opens successfully. We don't want to deal with
134        // the error event. Either a close event happens, in which case we want to recover the CloseEvent to return it
135        // to the user, or an Open event happens in which case we are happy campers.
136        //
137        let mut evts = pharos
138            .observe(Self::OPEN_CLOSE.into())
139            .await
140            .expect("we didn't close pharos");
141
142        // If the connection is closed, return error
143        //
144        if let Some(WsEvent::Closed(evt)) = evts.next().await {
145            return Err(WsErr::ConnectionFailed { event: evt });
146        }
147
148        // We have now passed all the `await` points in this function and so the `WsStream` construction is guaranteed
149        // so we let it take over the responsibility of unregistering the callbacks by disabling our guard.
150        //
151        std::mem::forget(guard);
152
153        // We don't handle Blob's
154        //
155        ws.set_binary_type(BinaryType::Arraybuffer);
156
157        Ok((
158            Self {
159                pharos,
160                ws: ws.clone(),
161            },
162            WsStream::new(
163                ws,
164                ph4,
165                SendWrapper::new(on_open),
166                SendWrapper::new(on_error),
167                SendWrapper::new(on_close),
168            ),
169        ))
170    }
171
172    /// Close the socket. The future will resolve once the socket's state has become `WsState::CLOSED`.
173    /// See: [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close)
174    pub async fn close(&self) -> Result<CloseEvent, WsErr> {
175        match self.ready_state() {
176            WsState::Closed => return Err(WsErr::ConnectionNotOpen),
177            WsState::Closing => {}
178            WsState::Open => {
179                let _ = self.ws.close();
180
181                // Notify Observers
182                notify(self.pharos.clone(), WsEvent::Closing)
183            }
184            WsState::Connecting => {
185                // Notify Observers
186                notify(self.pharos.clone(), WsEvent::Closing)
187            }
188        }
189
190        let mut evts = match self
191            .pharos
192            .observe_shared(Filter::Pointer(WsEvent::is_closed).into())
193            .await
194        {
195            Ok(events) => events,
196            Err(e) => unreachable!("{:?}", e), // only happens if we closed it.
197        };
198
199        // We promised the user a CloseEvent, so we don't have much choice but to unwrap this. In any case, the stream will
200        // never end and this will hang if the browser fails to send a close event.
201        //
202        let ce = evts.next().await.expect_throw("receive a close event");
203
204        if let WsEvent::Closed(e) = ce {
205            Ok(e)
206        } else {
207            unreachable!()
208        }
209    }
210
211    /// Close the socket. The future will resolve once the socket's state has become `WsState::CLOSED`.
212    /// See: [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close)
213    //
214    pub async fn close_code(&self, code: u16) -> Result<CloseEvent, WsErr> {
215        match self.ready_state() {
216            WsState::Closed => return Err(WsErr::ConnectionNotOpen),
217            WsState::Closing => {}
218
219            _ => {
220                match self.ws.close_with_code(code) {
221                    // Notify Observers
222                    Ok(_) => notify(self.pharos.clone(), WsEvent::Closing),
223
224                    Err(_) => {
225                        return Err(WsErr::InvalidCloseCode { supplied: code });
226                    }
227                }
228            }
229        }
230
231        let mut evts = match self
232            .pharos
233            .observe_shared(Filter::Pointer(WsEvent::is_closed).into())
234            .await
235        {
236            Ok(events) => events,
237            Err(e) => unreachable!("{:?}", e), // only happens if we closed it.
238        };
239
240        let ce = evts.next().await.expect_throw("receive a close event");
241
242        if let WsEvent::Closed(e) = ce {
243            Ok(e)
244        } else {
245            unreachable!()
246        }
247    }
248
249    /// Close the socket. The future will resolve once the socket's state has become `WsState::CLOSED`.
250    /// See: [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close)
251    //
252    pub async fn close_reason(
253        &self,
254        code: u16,
255        reason: impl AsRef<str>,
256    ) -> Result<CloseEvent, WsErr> {
257        match self.ready_state() {
258            WsState::Closed => return Err(WsErr::ConnectionNotOpen),
259            WsState::Closing => {}
260
261            _ => {
262                if reason.as_ref().len() > 123 {
263                    return Err(WsErr::ReasonStringToLong);
264                }
265
266                match self.ws.close_with_code_and_reason(code, reason.as_ref()) {
267                    // Notify Observers
268                    Ok(_) => notify(self.pharos.clone(), WsEvent::Closing),
269
270                    Err(_) => return Err(WsErr::InvalidCloseCode { supplied: code }),
271                }
272            }
273        }
274
275        let mut evts = match self
276            .pharos
277            .observe_shared(Filter::Pointer(WsEvent::is_closed).into())
278            .await
279        {
280            Ok(events) => events,
281            Err(e) => unreachable!("{:?}", e), // only happens if we closed it.
282        };
283
284        let ce = evts.next().await.expect_throw("receive a close event");
285
286        if let WsEvent::Closed(e) = ce {
287            Ok(e)
288        } else {
289            unreachable!()
290        }
291    }
292
293    /// Verify the [WsState] of the connection.
294    //
295    pub fn ready_state(&self) -> WsState {
296        self.ws
297            .ready_state()
298            .try_into()
299            // This can't throw unless the browser gives us an invalid ready state.
300            .expect_throw("Convert ready state from browser API")
301    }
302
    /// Access the wrapped [web_sys::WebSocket](https://docs.rs/web-sys/0.3.25/web_sys/struct.WebSocket.html) directly.
    ///
    /// _ws_stream_wasm_ tries to expose all useful functionality through an idiomatic rust API, so hopefully
    /// you won't need this, however if I missed something, you can.
    ///
    /// The returned reference borrows from `self`; it points at the same socket the other methods
    /// on this type operate on.
    ///
    /// ## Caveats
    /// If you call `set_onopen`, `set_onerror`, `set_onmessage` or `set_onclose` on this, you will overwrite
    /// the event listeners from `ws_stream_wasm`, and things will break.
    //
    pub fn wrapped(&self) -> &WebSocket {
        // Deref coercion peels the `SendWrapper` and `Rc` layers off the stored handle.
        &self.ws
    }
315
    /// The number of bytes of data that have been queued but not yet transmitted to the network.
    ///
    /// The value is read straight from the underlying socket on every call.
    ///
    /// **NOTE:** that this is the number of bytes buffered by the underlying platform WebSocket
    /// implementation. It does not reflect any buffering performed by _ws_stream_wasm_.
    //
    pub fn buffered_amount(&self) -> u32 {
        self.ws.buffered_amount()
    }
324
    /// The extensions selected by the server as negotiated during the connection, as reported by
    /// the underlying socket.
    ///
    /// **NOTE**: This is an untested feature. The back-end server we use for testing (_tungstenite_)
    /// does not support Extensions.
    //
    pub fn extensions(&self) -> String {
        self.ws.extensions()
    }
333
    /// The name of the sub-protocol the server selected during the connection, as reported by the
    /// underlying socket.
    ///
    /// NOTE(review): the `connect` function in this file creates the socket from a URL only and
    /// passes no sub-protocols, so presumably this is always empty for sockets created through
    /// it — confirm against the WebSocket specification.
    //
    pub fn protocol(&self) -> String {
        self.ws.protocol()
    }
342
    /// Retrieve the address to which this socket is connected, as reported by the underlying
    /// socket. A fresh `String` is returned on every call.
    //
    pub fn url(&self) -> String {
        self.ws.url()
    }
348}
349
350impl fmt::Debug for WsMeta {
351    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
352        write!(f, "WsMeta for connection: {}", self.url())
353    }
354}
355
/// Lets callers subscribe to the [WsEvent]s of this connection.
impl Observable<WsEvent> for WsMeta {
    /// Error type returned when a subscription request fails.
    type Error = PharErr;

    /// Subscribe to events using `options` (e.g. a filter); the request is forwarded unchanged
    /// to the internal pharos.
    fn observe(&mut self, options: ObserveConfig<WsEvent>) -> Observe<'_, WsEvent, Self::Error> {
        self.pharos.observe(options)
    }
}
362}