use std::cell::RefCell;
use std::collections::VecDeque;
use std::convert::{TryFrom, TryInto};
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};
use async_io_stream::IoStream;
use futures::prelude::{Sink, Stream};
use futures::{ready, FutureExt, StreamExt};
use pharos::{Filter, Observable, SharedPharos};
use send_wrapper::SendWrapper;
use wasm_bindgen::closure::Closure;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::spawn_local;
use web_sys::{CloseEvent as JsCloseEvt, WebSocket, *};
use crate::{notify, WsErr, WsEvent, WsMessage, WsState, WsStreamIo};
pub struct WsStream {
ws: SendWrapper<Rc<WebSocket>>,
queue: SendWrapper<Rc<RefCell<VecDeque<WsMessage>>>>,
waker: SendWrapper<Rc<RefCell<Option<Waker>>>>,
sink_waker: SendWrapper<Rc<RefCell<Option<Waker>>>>,
pharos: SharedPharos<WsEvent>,
_on_open: SendWrapper<Closure<dyn FnMut()>>,
_on_error: SendWrapper<Closure<dyn FnMut()>>,
_on_close: SendWrapper<Closure<dyn FnMut(JsCloseEvt)>>,
_on_mesg: SendWrapper<Closure<dyn FnMut(MessageEvent)>>,
closer: Option<SendWrapper<Pin<Box<dyn Future<Output = ()> + Send>>>>,
}
impl WsStream {
pub(crate) fn new(
ws: SendWrapper<Rc<WebSocket>>,
pharos: SharedPharos<WsEvent>,
on_open: SendWrapper<Closure<dyn FnMut()>>,
on_error: SendWrapper<Closure<dyn FnMut()>>,
on_close: SendWrapper<Closure<dyn FnMut(JsCloseEvt)>>,
) -> Self {
let waker: SendWrapper<Rc<RefCell<Option<Waker>>>> =
SendWrapper::new(Rc::new(RefCell::new(None)));
let sink_waker: SendWrapper<Rc<RefCell<Option<Waker>>>> =
SendWrapper::new(Rc::new(RefCell::new(None)));
let queue = SendWrapper::new(Rc::new(RefCell::new(VecDeque::new())));
let q2 = queue.clone();
let w2 = waker.clone();
let ph2 = pharos.clone();
#[allow(trivial_casts)]
let on_mesg = Closure::wrap(Box::new(move |msg_evt: MessageEvent| {
match WsMessage::try_from(msg_evt) {
Ok(msg) => q2.borrow_mut().push_back(msg),
Err(err) => notify(ph2.clone(), WsEvent::WsErr(err)),
}
if let Some(w) = w2.borrow_mut().take() {
w.wake()
}
}) as Box<dyn FnMut(MessageEvent)>);
ws.set_onmessage(Some(on_mesg.as_ref().unchecked_ref()));
let ph = pharos.clone();
let wake = waker.clone();
let swake = sink_waker.clone();
let wake_on_close = async move {
let mut rx;
{
match ph
.observe_shared(Filter::Pointer(WsEvent::is_closed).into())
.await
{
Ok(events) => rx = events,
Err(e) => unreachable!("{:?}", e), }
}
rx.next().await;
if let Some(w) = &*wake.borrow() {
w.wake_by_ref();
}
if let Some(w) = &*swake.borrow() {
w.wake_by_ref();
}
};
spawn_local(wake_on_close);
Self {
ws,
queue,
waker,
sink_waker,
pharos,
closer: None,
_on_mesg: SendWrapper::new(on_mesg),
_on_open: on_open,
_on_error: on_error,
_on_close: on_close,
}
}
pub fn ready_state(&self) -> Result<WsState, WsErr> {
self.ws.ready_state().try_into()
}
pub fn wrapped(&self) -> &WebSocket {
&self.ws
}
pub fn into_io(self) -> IoStream<WsStreamIo, Vec<u8>> {
IoStream::new(WsStreamIo::new(self))
}
}
impl fmt::Debug for WsStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WsStream for connection: {}", self.ws.url())
}
}
impl Drop for WsStream {
fn drop(&mut self) {
match self.ready_state() {
Ok(WsState::Closing) | Ok(WsState::Closed) => {}
Ok(WsState::Open) => {
let _ = self.ws.close();
notify(self.pharos.clone(), WsEvent::Closing)
}
Ok(WsState::Connecting) => {
notify(self.pharos.clone(), WsEvent::Closing)
}
Err(_) => {}
}
self.ws.set_onmessage(None);
self.ws.set_onerror(None);
self.ws.set_onopen(None);
self.ws.set_onclose(None);
}
}
impl Stream for WsStream {
type Item = WsMessage;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.queue.borrow().is_empty() {
*self.waker.borrow_mut() = Some(cx.waker().clone());
match self.ready_state() {
Ok(WsState::Open) | Ok(WsState::Connecting) => Poll::Pending,
_ => None.into(),
}
}
else {
self.queue.borrow_mut().pop_front().into()
}
}
}
impl Sink<WsMessage> for WsStream {
type Error = WsErr;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.ready_state()? {
WsState::Connecting => {
*self.sink_waker.borrow_mut() = Some(cx.waker().clone());
Poll::Pending
}
WsState::Open => Ok(()).into(),
_ => Err(WsErr::ConnectionNotOpen).into(),
}
}
fn start_send(self: Pin<&mut Self>, item: WsMessage) -> Result<(), Self::Error> {
match self.ready_state()? {
WsState::Open => {
match item {
WsMessage::Binary(d) => self
.ws
.send_with_u8_array(&d)
.map_err(|_| WsErr::ConnectionNotOpen)?,
WsMessage::Text(s) => self
.ws
.send_with_str(&s)
.map_err(|_| WsErr::ConnectionNotOpen)?,
}
Ok(())
}
_ => Err(WsErr::ConnectionNotOpen),
}
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let state = self.ready_state()?;
if state == WsState::Open {
let _ = self.ws.close();
notify(self.pharos.clone(), WsEvent::Closing);
}
match state {
WsState::Closed => Ok(()).into(),
_ => {
if self.closer.is_none() {
let mut ph = self.pharos.clone();
let closer = async move {
let mut rx =
match ph.observe(Filter::Pointer(WsEvent::is_closed).into()).await {
Ok(events) => events,
Err(e) => unreachable!("{:?}", e), };
rx.next().await;
};
self.closer = Some(SendWrapper::new(closer.boxed()));
}
if let Some(c) = self.closer.as_mut() {
ready!(c.as_mut().poll(cx));
}
Ok(()).into()
}
}
}
}