use futures::{SinkExt, StreamExt};
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::Message;
use super::WsError;
pub struct WsSender {
inner: Box<
dyn futures::Sink<Message, Error = tokio_tungstenite::tungstenite::Error> + Unpin + Send,
>,
}
/// Opaque `Debug` output: the boxed sink trait object carries no `Debug`
/// bound, so only the type name is printed and the contents are elided.
impl std::fmt::Debug for WsSender {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("WsSender");
        builder.finish_non_exhaustive()
    }
}
/// Receiving half of a WebSocket connection.
///
/// Holds a boxed, type-erased stream that yields either a [`Message`] or a
/// tungstenite error for each item.
pub struct WsReceiver {
    inner: Box<
        dyn futures::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
            + Send
            + Unpin,
    >,
}
/// Opaque `Debug` output: the boxed stream trait object carries no `Debug`
/// bound, so only the type name is printed and the contents are elided.
impl std::fmt::Debug for WsReceiver {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("WsReceiver");
        builder.finish_non_exhaustive()
    }
}
impl WsSender {
pub async fn send(&mut self, text: String) -> Result<(), WsError> {
self.inner
.send(Message::Text(text.into()))
.await
.map_err(|e| WsError::Send(e.to_string()))
}
pub async fn close(&mut self) -> Result<(), WsError> {
self.inner
.send(Message::Close(None))
.await
.map_err(|e| WsError::Send(e.to_string()))
}
}
impl WsReceiver {
    /// Waits for the next text message from the peer.
    ///
    /// Returns:
    /// - `Some(Ok(text))` for a text message,
    /// - `Some(Err(WsError::Receive(..)))` if the stream yields an error,
    /// - `None` when a close message arrives or the stream ends.
    ///
    /// Any other successfully received message (neither text nor close) is
    /// skipped and the wait continues.
    pub async fn recv(&mut self) -> Option<Result<String, WsError>> {
        while let Some(incoming) = self.inner.next().await {
            match incoming {
                Ok(Message::Text(payload)) => return Some(Ok(payload.to_string())),
                // A close message is reported the same way as end-of-stream.
                Ok(Message::Close(_)) => break,
                // Not text and not close: ignore and keep polling.
                Ok(_) => {}
                Err(err) => return Some(Err(WsError::Receive(err.to_string()))),
            }
        }
        None
    }
}
/// Opens a WebSocket connection to `url` and returns its sending and
/// receiving halves.
///
/// Each `(name, value)` pair in `headers` is parsed and inserted into the
/// handshake request's header map before connecting. The handshake response
/// is discarded.
///
/// # Errors
///
/// Returns [`WsError::Connection`] when:
/// - `url` cannot be converted into a client request,
/// - a header name or header value fails to parse (previously such headers
///   were silently dropped, which could hide e.g. a missing auth header), or
/// - the connection attempt itself fails.
pub async fn connect(
    url: &str,
    headers: &[(&str, &str)],
) -> Result<(WsSender, WsReceiver), WsError> {
    use tokio_tungstenite::tungstenite::http::header::{HeaderName, HeaderValue};

    let mut request = url
        .into_client_request()
        .map_err(|e| WsError::Connection(e.to_string()))?;

    for &(name, value) in headers {
        let header_name = name.parse::<HeaderName>().map_err(|e| {
            WsError::Connection(format!("invalid header name {:?}: {}", name, e))
        })?;
        // The value is deliberately left out of the error text: it may be a
        // credential.
        let header_value = value.parse::<HeaderValue>().map_err(|e| {
            WsError::Connection(format!("invalid value for header {:?}: {}", name, e))
        })?;
        request.headers_mut().insert(header_name, header_value);
    }

    let (ws_stream, _) = tokio_tungstenite::connect_async(request)
        .await
        .map_err(|e| WsError::Connection(e.to_string()))?;

    // Split the duplex stream so sending and receiving can be owned separately.
    let (sink, stream) = ws_stream.split();
    Ok((
        WsSender {
            inner: Box::new(sink),
        },
        WsReceiver {
            inner: Box::new(stream),
        },
    ))
}