use std::marker::PhantomData;
use futures_util::stream::StreamExt;
use futures_util::SinkExt;
use tokio_tungstenite::tungstenite;
use crate::codec::WsMessage;
use crate::connection::{BoxFuture, ErasedSink, ErasedStream, WsConnection};
use crate::WsEndpoint;
pub type Connection<E> = WsConnection<<E as WsEndpoint>::ClientMsg, <E as WsEndpoint>::ServerMsg>;
#[derive(Debug)]
pub struct ConnectError(tungstenite::Error);
impl std::fmt::Display for ConnectError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "WebSocket connect error: {}", self.0)
}
}
impl std::error::Error for ConnectError {}
pub async fn connect<E: WsEndpoint>(base_url: &str) -> Result<Connection<E>, ConnectError> {
let url = format!("{}{}", base_url.trim_end_matches('/'), E::PATH);
connect_to_url::<E>(&url).await
}
pub async fn connect_to_url<E: WsEndpoint>(url: &str) -> Result<Connection<E>, ConnectError> {
let (ws_stream, _response) = tokio_tungstenite::connect_async(url)
.await
.map_err(ConnectError)?;
let (sink, stream) = ws_stream.split();
Ok(WsConnection {
sink: Box::new(TungsteniteSink(sink)),
stream: Box::new(TungsteniteStream(stream)),
_types: PhantomData,
})
}
type WsStream =
tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
struct TungsteniteSink(futures_util::stream::SplitSink<WsStream, tungstenite::Message>);
impl ErasedSink for TungsteniteSink {
fn send(&mut self, msg: WsMessage) -> BoxFuture<'_, Result<(), ()>> {
Box::pin(async move {
let tung_msg = match msg {
WsMessage::Text(t) => tungstenite::Message::Text(t.into()),
WsMessage::Binary(b) => tungstenite::Message::Binary(b.into()),
};
self.0.send(tung_msg).await.map_err(|_| ())
})
}
fn close(&mut self) -> BoxFuture<'_, Result<(), ()>> {
Box::pin(async move {
self.0
.send(tungstenite::Message::Close(None))
.await
.map_err(|_| ())
})
}
}
struct TungsteniteStream(futures_util::stream::SplitStream<WsStream>);
impl ErasedStream for TungsteniteStream {
fn next(&mut self) -> BoxFuture<'_, Option<Result<WsMessage, ()>>> {
Box::pin(async move {
loop {
match self.0.next().await {
None => return None,
Some(Err(_)) => return Some(Err(())),
Some(Ok(msg)) => match msg {
tungstenite::Message::Text(t) => {
return Some(Ok(WsMessage::Text(t.to_string())));
}
tungstenite::Message::Binary(b) => {
return Some(Ok(WsMessage::Binary(b.to_vec())));
}
tungstenite::Message::Close(_) => return None,
tungstenite::Message::Ping(_)
| tungstenite::Message::Pong(_)
| tungstenite::Message::Frame(_) => continue,
},
}
}
})
}
}