wasm_ws/stream.rs
1// Copyright (c) 2019-2022 Naja Melan
2// Copyright (c) 2023-2024 Yuki Kishimoto
3// Distributed under the MIT software license
4
5use std::cell::RefCell;
6use std::collections::VecDeque;
7use std::convert::{TryFrom, TryInto};
8use std::fmt;
9use std::future::Future;
10use std::pin::Pin;
11use std::rc::Rc;
12use std::task::{Context, Poll, Waker};
13
14use async_io_stream::IoStream;
15use futures::prelude::{Sink, Stream};
16use futures::{ready, FutureExt, StreamExt};
17use pharos::{Filter, Observable, SharedPharos};
18use send_wrapper::SendWrapper;
19use wasm_bindgen::closure::Closure;
20use wasm_bindgen::JsCast;
21use wasm_bindgen_futures::spawn_local;
22use web_sys::{CloseEvent as JsCloseEvt, WebSocket, *};
23
24use crate::{notify, WsErr, WsEvent, WsMessage, WsState, WsStreamIo};
25
26/// A futures 0.3 Sink/Stream of [WsMessage]. Created with [WsMeta::connect](crate::WsMeta::connect).
27///
28/// ## Closing the connection
29///
30/// When this is dropped, the connection closes, but you should favor calling one of the close
31/// methods on [WsMeta](crate::WsMeta), which allow you to set a proper close code and reason.
32///
33/// Since this implements [`Sink`], it has to have a close method. This method will call the
34/// web api [`WebSocket.close`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close)
35/// without parameters. Eg. a default value of `1005` will be assumed for the close code. The
36/// situation is the same when dropping without calling close.
37///
38/// **Warning**: This object holds the callbacks needed to receive events from the browser.
39/// If you drop it before the close event was emitted, you will no longer receive events. Thus,
40/// observers will never receive a `Close` event. Drop will issue a `Closing` event and this
41/// will be the very last event observers receive. The the stream will end if `WsMeta` is also dropped.
42///
43/// See the [integration tests](https://github.com/najamelan/ws_stream_wasm/blob/release/tests/futures_codec.rs)
44/// if you need an example.
45pub struct WsStream {
46 ws: SendWrapper<Rc<WebSocket>>,
47
48 // The queue of received messages
49 queue: SendWrapper<Rc<RefCell<VecDeque<WsMessage>>>>,
50
51 // Last waker of task that wants to read incoming messages to be woken up on a new message
52 waker: SendWrapper<Rc<RefCell<Option<Waker>>>>,
53
54 // Last waker of task that wants to write to the Sink
55 sink_waker: SendWrapper<Rc<RefCell<Option<Waker>>>>,
56
57 // A pointer to the pharos of WsMeta for when we need to listen to events
58 pharos: SharedPharos<WsEvent>,
59
60 // The callback closures.
61 _on_open: SendWrapper<Closure<dyn FnMut()>>,
62 _on_error: SendWrapper<Closure<dyn FnMut()>>,
63 _on_close: SendWrapper<Closure<dyn FnMut(JsCloseEvt)>>,
64 _on_mesg: SendWrapper<Closure<dyn FnMut(MessageEvent)>>,
65
66 // This allows us to store a future to poll when Sink::poll_close is called
67 closer: Option<SendWrapper<Pin<Box<dyn Future<Output = ()> + Send>>>>,
68}
69
70impl WsStream {
71 /// Create a new WsStream.
72 //
73 pub(crate) fn new(
74 ws: SendWrapper<Rc<WebSocket>>,
75 pharos: SharedPharos<WsEvent>,
76 on_open: SendWrapper<Closure<dyn FnMut()>>,
77 on_error: SendWrapper<Closure<dyn FnMut()>>,
78 on_close: SendWrapper<Closure<dyn FnMut(JsCloseEvt)>>,
79 ) -> Self {
80 let waker: SendWrapper<Rc<RefCell<Option<Waker>>>> =
81 SendWrapper::new(Rc::new(RefCell::new(None)));
82 let sink_waker: SendWrapper<Rc<RefCell<Option<Waker>>>> =
83 SendWrapper::new(Rc::new(RefCell::new(None)));
84
85 let queue = SendWrapper::new(Rc::new(RefCell::new(VecDeque::new())));
86 let q2 = queue.clone();
87 let w2 = waker.clone();
88 let ph2 = pharos.clone();
89
90 // Send the incoming ws messages to the WsMeta object
91 //
92 #[allow(trivial_casts)]
93 //
94 let on_mesg = Closure::wrap(Box::new(move |msg_evt: MessageEvent| {
95 match WsMessage::try_from(msg_evt) {
96 Ok(msg) => q2.borrow_mut().push_back(msg),
97 Err(err) => notify(ph2.clone(), WsEvent::WsErr(err)),
98 }
99
100 if let Some(w) = w2.borrow_mut().take() {
101 w.wake()
102 }
103 }) as Box<dyn FnMut(MessageEvent)>);
104
105 // Install callback
106 //
107 ws.set_onmessage(Some(on_mesg.as_ref().unchecked_ref()));
108
109 // When the connection closes, we need to verify if there are any tasks
110 // waiting on poll_next. We need to wake them up.
111 //
112 let ph = pharos.clone();
113 let wake = waker.clone();
114 let swake = sink_waker.clone();
115
116 let wake_on_close = async move {
117 let mut rx;
118
119 // Scope to avoid borrowing across await point.
120 //
121 {
122 match ph
123 .observe_shared(Filter::Pointer(WsEvent::is_closed).into())
124 .await
125 {
126 Ok(events) => rx = events,
127 Err(e) => unreachable!("{:?}", e), // only happens if we closed it.
128 }
129 }
130
131 rx.next().await;
132
133 if let Some(w) = &*wake.borrow() {
134 w.wake_by_ref();
135 }
136
137 if let Some(w) = &*swake.borrow() {
138 w.wake_by_ref();
139 }
140 };
141
142 spawn_local(wake_on_close);
143
144 Self {
145 ws,
146 queue,
147 waker,
148 sink_waker,
149 pharos,
150 closer: None,
151 _on_mesg: SendWrapper::new(on_mesg),
152 _on_open: on_open,
153 _on_error: on_error,
154 _on_close: on_close,
155 }
156 }
157
158 /// Verify the [WsState] of the connection.
159 pub fn ready_state(&self) -> Result<WsState, WsErr> {
160 self.ws.ready_state().try_into()
161 }
162
163 /// Access the wrapped [web_sys::WebSocket](https://docs.rs/web-sys/0.3.25/web_sys/struct.WebSocket.html) directly.
164 ///
165 /// _ws_stream_wasm_ tries to expose all useful functionality through an idiomatic rust API, so hopefully
166 /// you won't need this, however if I missed something, you can.
167 ///
168 /// ## Caveats
169 /// If you call `set_onopen`, `set_onerror`, `set_onmessage` or `set_onclose` on this, you will overwrite
170 /// the event listeners from `ws_stream_wasm`, and things will break.
171 //
172 pub fn wrapped(&self) -> &WebSocket {
173 &self.ws
174 }
175
176 /// Wrap this object in [`IoStream`]. `IoStream` implements `AsyncRead`/`AsyncWrite`/`AsyncBufRead`.
177 /// **Beware**: that this will transparenty include text messages as bytes.
178 //
179 pub fn into_io(self) -> IoStream<WsStreamIo, Vec<u8>> {
180 IoStream::new(WsStreamIo::new(self))
181 }
182}
183
184impl fmt::Debug for WsStream {
185 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186 write!(f, "WsStream for connection: {}", self.ws.url())
187 }
188}
189
190impl Drop for WsStream {
191 // We don't block here, just tell the browser to close the connection and move on.
192 //
193 fn drop(&mut self) {
194 match self.ready_state() {
195 Ok(WsState::Closing) | Ok(WsState::Closed) => {}
196 Ok(WsState::Open) => {
197 // This can't fail. Only exceptions are related to invalid
198 // close codes and reason strings to long.
199 let _ = self.ws.close();
200
201 // Notify Observers. This event is not emitted by the websocket API.
202 notify(self.pharos.clone(), WsEvent::Closing)
203 }
204 Ok(WsState::Connecting) => {
205 // Notify Observers. This event is not emitted by the websocket API.
206 notify(self.pharos.clone(), WsEvent::Closing)
207 }
208 Err(_) => {}
209 }
210
211 self.ws.set_onmessage(None);
212 self.ws.set_onerror(None);
213 self.ws.set_onopen(None);
214 self.ws.set_onclose(None);
215 }
216}
217
218impl Stream for WsStream {
219 type Item = Result<WsMessage, WsErr>;
220
221 // Using `Result<T, E>` to keep same format of `tungstenite` code
222
223 // Currently requires an unfortunate copy from Js memory to WASM memory. Hopefully one
224 // day we will be able to receive the MessageEvt directly in WASM.
225 //
226 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
227 // Once the queue is empty, check the state of the connection.
228 // When it is closing or closed, no more messages will arrive, so
229 // return Poll::Ready( None )
230 //
231 if self.queue.borrow().is_empty() {
232 *self.waker.borrow_mut() = Some(cx.waker().clone());
233
234 match self.ready_state() {
235 Ok(WsState::Open) | Ok(WsState::Connecting) => Poll::Pending,
236 _ => None.into(),
237 }
238 } else {
239 // As long as there is things in the queue, just keep reading
240 self.queue.borrow_mut().pop_front().map(Ok).into()
241 }
242 }
243}
244
245impl Sink<WsMessage> for WsStream {
246 type Error = WsErr;
247
248 // Web API does not really seem to let us check for readiness, other than the connection state.
249 //
250 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
251 match self.ready_state()? {
252 WsState::Connecting => {
253 *self.sink_waker.borrow_mut() = Some(cx.waker().clone());
254
255 Poll::Pending
256 }
257 WsState::Open => Ok(()).into(),
258 _ => Err(WsErr::ConnectionNotOpen).into(),
259 }
260 }
261
262 fn start_send(self: Pin<&mut Self>, item: WsMessage) -> Result<(), Self::Error> {
263 match self.ready_state()? {
264 WsState::Open => {
265 // The send method can return 2 errors:
266 // - unpaired surrogates in UTF (we shouldn't get those in rust strings)
267 // - connection is already closed.
268 //
269 // So if this returns an error, we will return ConnectionNotOpen. In principle
270 // we just checked that it's open, but this guarantees correctness.
271 //
272 match item {
273 WsMessage::Binary(d) => self
274 .ws
275 .send_with_u8_array(&d)
276 .map_err(|_| WsErr::ConnectionNotOpen)?,
277 WsMessage::Text(s) => self
278 .ws
279 .send_with_str(&s)
280 .map_err(|_| WsErr::ConnectionNotOpen)?,
281 }
282
283 Ok(())
284 }
285
286 // Connecting, Closing or Closed
287 _ => Err(WsErr::ConnectionNotOpen),
288 }
289 }
290
291 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
292 Ok(()).into()
293 }
294
295 // TODO: find a simpler implementation, notably this needs to spawn a future.
296 // this can be done by creating a custom future. If we are going to implement
297 // events with pharos, that's probably a good time to re-evaluate this.
298 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
299 let state = self.ready_state()?;
300
301 // First close the inner connection
302 if state == WsState::Open {
303 let _ = self.ws.close();
304 notify(self.pharos.clone(), WsEvent::Closing);
305 }
306
307 // Check whether it's closed
308 match state {
309 WsState::Closed => Ok(()).into(),
310 _ => {
311 // Create a future that will resolve with the close event, so we can poll it.
312 if self.closer.is_none() {
313 let mut ph = self.pharos.clone();
314
315 let closer = async move {
316 let mut rx =
317 match ph.observe(Filter::Pointer(WsEvent::is_closed).into()).await {
318 Ok(events) => events,
319 Err(e) => unreachable!("{:?}", e), // only happens if we closed it.
320 };
321
322 rx.next().await;
323 };
324
325 self.closer = Some(SendWrapper::new(closer.boxed()));
326 }
327
328 if let Some(c) = self.closer.as_mut() {
329 ready!(c.as_mut().poll(cx));
330 }
331
332 Ok(()).into()
333 }
334 }
335 }
336}