use async_trait::async_trait;
use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::{Error, Message};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tracing::error;
use crate::transport::error::TransportError;
/// Abstraction over a channel that exchanges opaque byte messages.
///
/// Implementors must be `Send`. Both operations are asynchronous and report
/// failures as [`TransportError`].
#[async_trait]
pub(crate) trait CableDataChannel: Send {
    /// Sends `message` over the channel.
    ///
    /// # Errors
    ///
    /// Returns a [`TransportError`] if the message could not be sent.
    async fn send(&mut self, message: &[u8]) -> Result<(), TransportError>;

    /// Waits for the next message from the channel.
    ///
    /// Returns `Ok(Some(bytes))` when a message was received. `Ok(None)` is
    /// what the WebSocket implementation in this file returns once the
    /// connection has been closed.
    ///
    /// # Errors
    ///
    /// Returns a [`TransportError`] if receiving failed.
    async fn recv(&mut self) -> Result<Option<Vec<u8>>, TransportError>;
}
/// A data channel that wraps a WebSocket stream running over a
/// (possibly TLS-wrapped) TCP connection.
pub(crate) struct WebSocketDataChannel {
    /// The underlying WebSocket stream used for all sends and receives.
    stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
}
impl WebSocketDataChannel {
pub(crate) fn new(stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
Self { stream }
}
}
#[async_trait]
impl CableDataChannel for WebSocketDataChannel {
async fn send(&mut self, message: &[u8]) -> Result<(), TransportError> {
self.stream
.send(Message::Binary(message.to_vec().into()))
.await
.map_err(|e| {
error!(?e, "Failed to send WebSocket message");
match e {
Error::Io(io) => TransportError::IoError(io.kind()),
_ => TransportError::ConnectionFailed,
}
})
}
async fn recv(&mut self) -> Result<Option<Vec<u8>>, TransportError> {
loop {
match self.stream.next().await {
Some(Ok(Message::Binary(data))) => return Ok(Some(data.into())),
Some(Ok(Message::Ping(_) | Message::Pong(_))) => continue,
Some(Ok(Message::Close(_))) | None | Some(Err(Error::ConnectionClosed)) => {
return Ok(None)
}
Some(Ok(other)) => {
error!(?other, "Unexpected WebSocket message type");
return Err(TransportError::ConnectionFailed);
}
Some(Err(Error::Io(e))) => {
error!(?e, "Failed to read WebSocket message");
return Err(TransportError::IoError(e.kind()));
}
Some(Err(e)) => {
error!(?e, "Failed to read WebSocket message");
return Err(TransportError::ConnectionFailed);
}
}
}
}
}