ws_stream_wasm 0.6.1

A convenience library for using websockets in WASM
Documentation
use crate::{ import::*, * };


/// A futures 0.3 Sink/Stream of [WsMessage]. Created with [WsMeta::connect](crate::WsMeta::connect).
///
/// ## Closing the connection
///
/// When this is dropped, the connection closes, but you should favor calling one of the close
/// methods on [WsMeta](crate::WsMeta), which allow you to set a proper close code and reason.
///
/// Since this implements [`Sink`], it has to have a close method. This method will call the
/// web api [`WebSocket.close`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close)
/// without parameters. Eg. a default value of `1005` will be assumed for the close code. The
/// situation is the same when dropping without calling close.
///
/// **Warning**: This object holds the callbacks needed to receive events from the browser.
/// If you drop it before the close event was emitted, you will no longer receive events. Thus,
/// observers will never receive a `Close` event. Drop will issue a `Closing` event and this
/// will be the very last event observers receive. The the stream will end if `WsMeta` is also dropped.
///
/// See the [integration tests](https://github.com/najamelan/ws_stream_wasm/blob/master/tests/futures_codec.rs)
/// if you need an example.
///
//
pub struct WsStream
{
	ws: SendWrapper< Rc< WebSocket > >,

	// The queue of received messages
	//
	queue: SendWrapper< Rc<RefCell< VecDeque<WsMessage> >> >,

	// Last waker of task that wants to read incoming messages to be woken up on a new message
	//
	waker: SendWrapper< Rc<RefCell< Option<Waker> >> >,

	// Last waker of task that wants to write to the Sink
	//
	sink_waker: SendWrapper< Rc<RefCell< Option<Waker> >> >,

	// A pointer to the pharos of WsMeta for when we need to listen to events
	//
	pharos: SendWrapper< Rc<RefCell< Pharos<WsEvent> >> >,

	// The callback closures.
	//
	_on_open : SendWrapper< Closure< dyn FnMut()               > >,
	_on_error: SendWrapper< Closure< dyn FnMut()               > >,
	_on_close: SendWrapper< Closure< dyn FnMut( JsCloseEvt   ) > >,
	_on_mesg : SendWrapper< Closure< dyn FnMut( MessageEvent ) > >,

	// This allows us to store a future to poll when Sink::poll_close is called
	//
	closer: Option< Events<WsEvent> >,
}


impl WsStream
{
	/// Create a new WsStream.
	//
	pub(crate) fn new
	(
		ws      : SendWrapper< Rc<WebSocket>                        > ,
		pharos  : SendWrapper< Rc<RefCell< Pharos<WsEvent> >>       > ,
		on_open : SendWrapper< Closure< dyn FnMut()               > > ,
		on_error: SendWrapper< Closure< dyn FnMut()               > > ,
		on_close: SendWrapper< Closure< dyn FnMut( JsCloseEvt   ) > > ,

	) -> Self

	{
		let waker     : SendWrapper< Rc<RefCell<Option<Waker>>> > = SendWrapper::new( Rc::new( RefCell::new( None )) );
		let sink_waker: SendWrapper< Rc<RefCell<Option<Waker>>> > = SendWrapper::new( Rc::new( RefCell::new( None )) );

		let queue = SendWrapper::new( Rc::new( RefCell::new( VecDeque::new() ) ) );
		let q2    = queue.clone();
		let w2    = waker.clone();
		let ph2   = pharos.clone();


		// Send the incoming ws messages to the WsMeta object
		//
		#[ allow( trivial_casts ) ]
		//
		let on_mesg = Closure::wrap( Box::new( move |msg_evt: MessageEvent|
		{
			match WsMessage::try_from( msg_evt )
			{
				Ok (msg) => q2.borrow_mut().push_back( msg ),
				Err(err) => notify( ph2.clone(), WsEvent::WsErr( err ) ),
			}

			if let Some( w ) = w2.borrow_mut().take()
			{
				w.wake()
			}

		}) as Box< dyn FnMut( MessageEvent ) > );


		// Install callback
		//
		ws.set_onmessage  ( Some( on_mesg.as_ref().unchecked_ref() ) );


		// When the connection closes, we need to verify if there are any tasks
		// waiting on poll_next. We need to wake them up.
		//
		let ph    = pharos    .clone();
		let wake  = waker     .clone();
		let swake = sink_waker.clone();

		let wake_on_close = async move
		{
			let mut rx;

			// Scope to avoid borrowing across await point.
			//
			{
				match ph.borrow_mut().observe( Filter::Pointer( WsEvent::is_closed ).into() )
				{
					Ok(events) => rx = events               ,
					Err(e)     => unreachable!( "{:?}", e ) , // only happens if we closed it.
				}
			}

			rx.next().await;

			if let Some(w) = &*wake.borrow()
			{
				w.wake_by_ref();
			}

			if let Some(w) = &*swake.borrow()
			{
				w.wake_by_ref();
			}
		};

		spawn_local( wake_on_close );


		Self
		{
			ws                                      ,
			queue                                   ,
			waker                                   ,
			sink_waker                              ,
			pharos                                  ,
			closer    : None                        ,
			_on_mesg  : SendWrapper::new( on_mesg ) ,
			_on_open  : on_open                     ,
			_on_error : on_error                    ,
			_on_close : on_close                    ,
		}
	}



	/// Verify the [WsState] of the connection.
	//
	pub fn ready_state( &self ) -> WsState
	{
		self.ws.ready_state().try_into()

			// This can't throw unless the browser gives us an invalid ready state
			//
			.expect_throw( "Convert ready state from browser API" )
	}



	/// Access the wrapped [web_sys::WebSocket](https://docs.rs/web-sys/0.3.25/web_sys/struct.WebSocket.html) directly.
	///
	/// _ws_stream_wasm_ tries to expose all useful functionality through an idiomatic rust API, so hopefully
	/// you won't need this, however if I missed something, you can.
	///
	/// ## Caveats
	/// If you call `set_onopen`, `set_onerror`, `set_onmessage` or `set_onclose` on this, you will overwrite
	/// the event listeners from `ws_stream_wasm`, and things will break.
	//
	pub fn wrapped( &self ) -> &WebSocket
	{
		&self.ws
	}


	/// Wrap this object in [`IoStream`]. `IoStream` implements `AsyncRead`/`AsyncWrite`/`AsyncBufRead`.
	/// **Beware**: that this will transparenty include text messages as bytes.
	//
	pub fn into_io( self ) -> IoStream< WsStreamIo, Vec<u8> >
	{
		IoStream::new( WsStreamIo::new( self ) )
	}
}



impl fmt::Debug for WsStream
{
	fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result
	{
		write!( f, "WsStream for connection: {}", self.ws.url() )
	}
}



impl Drop for WsStream
{
	// We don't block here, just tell the browser to close the connection and move on.
	//
	fn drop( &mut self )
	{
		match self.ready_state()
		{
			WsState::Closing | WsState::Closed => {}

			_ =>
			{
				// This can't fail. Only exceptions are related to invalid
				// close codes and reason strings to long.
				//
				self.ws.close().expect( "WsStream::drop - close ws socket" );


				// Notify Observers. This event is not emitted by the websocket API.
				//
				notify( self.pharos.clone(), WsEvent::Closing )
			}
		}

		self.ws.set_onmessage( None );
		self.ws.set_onerror  ( None );
		self.ws.set_onopen   ( None );
		self.ws.set_onclose  ( None );
	}
}



impl Stream for WsStream
{
	type Item = WsMessage;

	// Currently requires an unfortunate copy from Js memory to WASM memory. Hopefully one
	// day we will be able to receive the MessageEvt directly in WASM.
	//
	fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Option< Self::Item >>
	{
		// Once the queue is empty, check the state of the connection.
		// When it is closing or closed, no more messages will arrive, so
		// return Poll::Ready( None )
		//
		if self.queue.borrow().is_empty()
		{
			*self.waker.borrow_mut() = Some( cx.waker().clone() );

			match self.ready_state()
			{
				WsState::Open | WsState::Connecting => Poll::Pending ,
				_                                   => None.into()   ,
			}
		}

		// As long as there is things in the queue, just keep reading
		//
		else { self.queue.borrow_mut().pop_front().into() }
	}
}



impl Sink<WsMessage> for WsStream
{
	type Error = WsErr;


	// Web API does not really seem to let us check for readiness, other than the connection state.
	//
	fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>
	{
		match self.ready_state()
		{
			WsState::Connecting =>
			{
				*self.sink_waker.borrow_mut() = Some( cx.waker().clone() );

				Poll::Pending
			}

			WsState::Open => Ok(()).into(),
			_             => Err( WsErr::ConnectionNotOpen ).into(),
		}
	}


	fn start_send( self: Pin<&mut Self>, item: WsMessage ) -> Result<(), Self::Error>
	{
		match self.ready_state()
		{
			WsState::Open =>
			{
				// The send method can return 2 errors:
				// - unpaired surrogates in UTF (we shouldn't get those in rust strings)
				// - connection is already closed.
				//
				// So if this returns an error, we will return ConnectionNotOpen. In principle
				// we just checked that it's open, but this guarantees correctness.
				//
				match item
				{
					WsMessage::Binary( d ) => self.ws.send_with_u8_array( &d ).map_err( |_| WsErr::ConnectionNotOpen )? ,
					WsMessage::Text  ( s ) => self.ws.send_with_str     ( &s ).map_err( |_| WsErr::ConnectionNotOpen )? ,
				}

				Ok(())
			},


			// Connecting, Closing or Closed
			//
			_ => Err( WsErr::ConnectionNotOpen ),
		}
	}



	fn poll_flush( self: Pin<&mut Self>, _: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>
	{
		Ok(()).into()
	}



	// TODO: find a simpler implementation, notably this needs to spawn a future.
	//       this can be done by creating a custom future. If we are going to implement
	//       events with pharos, that's probably a good time to re-evaluate this.
	//
	fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>
	{
		let state = self.ready_state();


		// First close the inner connection
		//
		if state == WsState::Connecting
		|| state == WsState::Open
		{
			// Can't fail
			//
			self.ws.close().unwrap_throw();

			notify( self.pharos.clone(), WsEvent::Closing );
		}


		// Check whether it's closed
		//
		match state
		{
			WsState::Closed => Ok(()).into(),

			_ =>
			{
				// Create a future that will resolve with the close event, so we can
				// poll it.
				//
				if self.closer.is_none()
				{
					let rx = match self.pharos.borrow_mut().observe( Filter::Pointer( WsEvent::is_closed ).into() )
					{
						Ok(events) => events                    ,
						Err(e)     => unreachable!( "{:?}", e ) , // only happens if we closed it.
					};

					self.closer = Some( rx );
				}


				let _ = ready!( Pin::new( &mut self.closer.as_mut().unwrap() ).poll_next(cx) );

				Ok(()).into()
			}
		}
	}
}