mod sender;
mod receiver;
use futures_util::StreamExt;
pub use sender::*;
pub use receiver::*;
use crate::{prelude::*, traits::WebSocketTrait};
use crate::websocket::Result;
use tokio_tungstenite::connect_async;
pub type BackendError = tokio_tungstenite::tungstenite::Error;
use std::future::Future;
use std::sync::{Arc, Mutex};
use crate::message::Message;
use std::convert::TryFrom;
impl TryFrom<tokio_tungstenite::tungstenite::Message> for Message {
type Error = crate::error::Error;
fn try_from(message: tokio_tungstenite::tungstenite::Message) -> std::result::Result<Self, Self::Error> {
match message {
tokio_tungstenite::tungstenite::Message::Binary(binary) => {
Ok(Message::Binary(binary.into()))
},
tokio_tungstenite::tungstenite::Message::Text(text) => {
Ok(Message::Text(text.as_str().into()))
},
tokio_tungstenite::tungstenite::Message::Close(close) => {
let close = close.map(|close| (close.code.into(), close.reason.to_string()));
Ok(Message::Close(close))
},
other => {
Err(crate::error::Error::UnsupportedMessageType(format!("Unsupported message type: {:#?}", other)))
}
}
}
}
#[derive(Derivative)]
#[derivative(Debug)]
pub struct WebSocket;
impl WebSocketTrait for WebSocket {
type Sender = WebSocketSender;
type Receiver = WebSocketReceiver;
fn new(url: &str) -> impl Future<Output = Result<(WebSocketSender, WebSocketReceiver)>> {
async move {
connect_async(url)
.await
.map(|(socket, _response)| {
let (sender, receiver) = socket.split();
let sender = Arc::new(Mutex::new(sender));
let sender_clone = sender.clone();
(WebSocketSender::from(sender), WebSocketReceiver::from((receiver, sender_clone)))
})
.map_err(|error| crate::error::Error::ConnectionError(error))
}
}
}