use crate::client::{ClientConfig, ClientConnector};
use crate::socket::{CloseFrame, Message, RawMessage};
use std::pin::Pin;
use std::task::{Context, Poll};
use tungstenite::protocol::frame::coding::CloseCode as TungsteniteCloseCode;
fn utf8bytes_to_tungstenite(bytes: tokio_tungstenite_wasm::Utf8Bytes) -> tungstenite::Utf8Bytes {
bytes::Bytes::from(bytes)
.try_into()
.expect("bytes are already validated as utf8")
}
fn utf8bytes_to_ttw(bytes: tungstenite::Utf8Bytes) -> tokio_tungstenite_wasm::Utf8Bytes {
bytes::Bytes::from(bytes)
.try_into()
.expect("bytes are already validated as utf8")
}
impl From<tokio_tungstenite_wasm::CloseFrame> for CloseFrame {
fn from(frame: tokio_tungstenite_wasm::CloseFrame) -> Self {
Self {
code: Into::<TungsteniteCloseCode>::into(Into::<u16>::into(frame.code)).into(),
reason: utf8bytes_to_tungstenite(frame.reason),
}
}
}
impl From<CloseFrame> for tokio_tungstenite_wasm::CloseFrame {
fn from(frame: CloseFrame) -> Self {
Self {
code: Into::<u16>::into(Into::<TungsteniteCloseCode>::into(frame.code)).into(),
reason: utf8bytes_to_ttw(frame.reason),
}
}
}
impl From<RawMessage> for tokio_tungstenite_wasm::Message {
fn from(message: RawMessage) -> Self {
match message {
RawMessage::Text(text) => Self::Text(utf8bytes_to_ttw(text)),
RawMessage::Binary(bytes) => Self::Binary(bytes),
RawMessage::Ping(_) => Self::Close(Some(tokio_tungstenite_wasm::CloseFrame {
code: tokio_tungstenite_wasm::CloseCode::Abnormal,
reason: "raw pings not supported".into(),
})),
RawMessage::Pong(_) => Self::Close(Some(tokio_tungstenite_wasm::CloseFrame {
code: tokio_tungstenite_wasm::CloseCode::Abnormal,
reason: "raw pongs not supported".into(),
})),
RawMessage::Close(frame) => Self::Close(frame.map(CloseFrame::into)),
}
}
}
impl From<tokio_tungstenite_wasm::Message> for RawMessage {
fn from(message: tokio_tungstenite_wasm::Message) -> Self {
match message {
tokio_tungstenite_wasm::Message::Text(text) => {
Self::Text(utf8bytes_to_tungstenite(text))
}
tokio_tungstenite_wasm::Message::Binary(bytes) => Self::Binary(bytes),
tokio_tungstenite_wasm::Message::Close(frame) => {
Self::Close(frame.map(CloseFrame::from))
}
}
}
}
impl From<Message> for tokio_tungstenite_wasm::Message {
fn from(message: Message) -> Self {
match message {
Message::Text(text) => tokio_tungstenite_wasm::Message::Text(utf8bytes_to_ttw(text)),
Message::Binary(bytes) => tokio_tungstenite_wasm::Message::Binary(bytes),
Message::Close(frame) => {
tokio_tungstenite_wasm::Message::Close(frame.map(CloseFrame::into))
}
}
}
}
#[derive(Clone)]
pub struct ClientConnectorWasm {
handle: enfync::builtin::wasm::WASMHandle,
}
impl Default for ClientConnectorWasm {
fn default() -> Self {
let handle = enfync::builtin::wasm::WASMHandle::default();
Self { handle }
}
}
#[async_trait::async_trait]
impl ClientConnector for ClientConnectorWasm {
type Handle = enfync::builtin::wasm::WASMHandle;
type Message = tokio_tungstenite_wasm::Message;
type WSError = tokio_tungstenite_wasm::Error;
type Socket = WebSocketStreamProxy;
fn handle(&self) -> Self::Handle {
self.handle.clone()
}
async fn connect(&self, config: &ClientConfig) -> Result<Self::Socket, Self::WSError> {
if config.headers().len() > 0 {
panic!("client may not submit HTTP headers in WASM connection requests");
}
let request_url = config.connect_url();
let socket = wasm_client_connect(String::from(request_url)).await?;
Ok(socket)
}
}
pub struct WebSocketStreamProxy {
inner: fragile::Fragile<tokio_tungstenite_wasm::WebSocketStream>,
}
impl futures_util::Stream for WebSocketStreamProxy {
type Item = tokio_tungstenite_wasm::Result<tokio_tungstenite_wasm::Message>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let inner = self.inner.get_mut();
futures::pin_mut!(inner);
inner.poll_next(cx)
}
}
impl futures_util::Sink<tokio_tungstenite_wasm::Message> for WebSocketStreamProxy {
type Error = tokio_tungstenite_wasm::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let inner = self.inner.get_mut();
futures::pin_mut!(inner);
inner.poll_ready(cx)
}
fn start_send(
mut self: Pin<&mut Self>,
item: tokio_tungstenite_wasm::Message,
) -> Result<(), Self::Error> {
let inner = self.inner.get_mut();
futures::pin_mut!(inner);
inner.start_send(item)
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let inner = self.inner.get_mut();
futures::pin_mut!(inner);
inner.poll_close(cx)
}
}
async fn wasm_client_connect(
request_url: String,
) -> Result<WebSocketStreamProxy, tokio_tungstenite_wasm::Error> {
let (result_sender, result_receiver) = async_channel::bounded(1usize);
wasm_bindgen_futures::spawn_local(async move {
let result = tokio_tungstenite_wasm::connect(request_url.as_str()).await;
result_sender
.send_blocking(result.map(|websocket| fragile::Fragile::new(websocket)))
.unwrap();
});
let websocket = result_receiver
.recv()
.await
.unwrap_or(Err(tokio_tungstenite_wasm::Error::ConnectionClosed))?;
Ok(WebSocketStreamProxy { inner: websocket })
}