use crate::prelude::*;
use crate::{Message, Result, traits::WebSocketSenderTrait};
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use tokio_tungstenite::{WebSocketStream, MaybeTlsStream};
use tokio::net::TcpStream;
use futures_util::stream::SplitSink;
use std::future::Future;
use std::sync::{Arc, Mutex};
use futures_util::SinkExt;
/// Cloneable handle to the sink half of a split WebSocket stream.
///
/// The sink lives behind an `Arc<Mutex<..>>`, so every clone of this handle
/// refers to the same underlying sink.
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct WebSocketSender {
    /// Shared, mutex-guarded sink of tungstenite messages.
    ///
    /// Excluded from the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    pub sender: Arc<
        Mutex<
            SplitSink<
                WebSocketStream<MaybeTlsStream<TcpStream>>,
                tokio_tungstenite::tungstenite::Message,
            >,
        >,
    >,
}
/// Builds a [`WebSocketSender`] around an already shared, mutex-guarded sink.
///
/// The `Arc` is moved into the new value as-is; no new sink is created.
impl
    From<
        Arc<
            Mutex<
                SplitSink<
                    WebSocketStream<MaybeTlsStream<TcpStream>>,
                    tokio_tungstenite::tungstenite::Message,
                >,
            >,
        >,
    > for WebSocketSender
{
    fn from(
        sender: Arc<
            Mutex<
                SplitSink<
                    WebSocketStream<MaybeTlsStream<TcpStream>>,
                    tokio_tungstenite::tungstenite::Message,
                >,
            >,
        >,
    ) -> Self {
        Self { sender }
    }
}
impl WebSocketSenderTrait for WebSocketSender {
fn send(&mut self, message: Message) -> impl Future<Output = Result<()>> {
async move {
let mut sender = self.sender.lock()
.map_err(|err| crate::error::Error::LockError(format!("Failed to lock sender: {:?}", err)))?;
match message {
Message::Text(text) => {
sender.send(text.into()).await.map_err(|error| crate::error::Error::SendError(error))
},
Message::Binary(binary) => {
sender.send(binary.into()).await.map_err(|error| crate::error::Error::SendError(error))
},
Message::Close(close) => {
let close_frame = close.map(|(code, reason)| CloseFrame {
code: CloseCode::from(code),
reason: reason.into(),
});
sender.send(tokio_tungstenite::tungstenite::Message::Close(close_frame.into())).await.map_err(|error| crate::error::Error::SendError(error))
}
}
}
}
fn close(&mut self, message: Option<(u16, String)>) -> impl Future<Output = Result<()>> {
async move {
self.send(Message::Close(message)).await
}
}
}