use crate::Service;
use async_lock::Mutex;
use std::{net::SocketAddr, sync::Arc};
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpListener, TcpStream,
};
use ws_tool::{
codec::{AsyncFrameCodec, AsyncFrameRecv, AsyncFrameSend, FrameConfig},
frame::OpCode,
http::Request,
*,
};
pub struct WsAdapter {
pub peer_addr: Option<SocketAddr>,
pub request: Option<Request<()>>,
sender: Mutex<AsyncFrameSend<OwnedWriteHalf>>,
recver: Mutex<AsyncFrameRecv<OwnedReadHalf>>,
}
impl WsAdapter {
pub async fn bind_accept(addr: SocketAddr) -> anyhow::Result<Self> {
let listener = TcpListener::bind(addr).await?;
Self::accept(&listener).await
}
pub async fn accept(listener: &TcpListener) -> anyhow::Result<Self> {
let (stream, addr) = listener.accept().await?;
let (request, server) =
ServerBuilder::async_accept(stream, codec::default_handshake_handler, |req, stream| {
let config = FrameConfig {
mask_send_frame: false,
..Default::default()
};
Ok((req, AsyncFrameCodec::new_with(stream, config)))
})
.await?;
let (r, s) = server.split();
Ok(Self {
peer_addr: Some(addr),
request: Some(request),
sender: Mutex::new(s),
recver: Mutex::new(r),
})
}
pub async fn accept_from_stream(stream: TcpStream) -> anyhow::Result<Self> {
let peer_addr = stream.peer_addr().ok();
let (request, server) =
ServerBuilder::async_accept(stream, codec::default_handshake_handler, |req, stream| {
let config = FrameConfig {
mask_send_frame: false,
..Default::default()
};
Ok((req, AsyncFrameCodec::new_with(stream, config)))
})
.await?;
let (r, s) = server.split();
Ok(Self {
peer_addr,
request: Some(request),
sender: Mutex::new(s),
recver: Mutex::new(r),
})
}
pub async fn connect<A: ToString>(address: A) -> anyhow::Result<Self> {
let client = ClientBuilder::new()
.async_connect(address.to_string().parse()?, AsyncFrameCodec::check_fn)
.await?;
let (r, s) = client.split();
Ok(Self {
peer_addr: None,
request: None,
sender: Mutex::new(s),
recver: Mutex::new(r),
})
}
}
const MAX_PACK_SIZE: usize = 60 * 1024;
#[async_trait::async_trait]
impl crate::Adapter for WsAdapter {
async fn send(&self, pack: Vec<u8>) -> anyhow::Result<()> {
let mut sender = self.sender.lock().await;
for c in pack.chunks(MAX_PACK_SIZE) {
sender.send(OpCode::Binary, c).await?;
}
Ok(())
}
async fn recv(&self) -> anyhow::Result<Vec<u8>> {
let mut recver = self.recver.lock().await;
let mut result = vec![];
loop {
let msg = recver.receive().await?;
if msg.code().is_close() {
return Err(crate::error::Disconnect.into());
}
let msg = msg.payload();
result.extend_from_slice(msg.as_ref());
if msg.as_ref().len() < MAX_PACK_SIZE {
break;
}
}
Ok(result)
}
}
impl crate::session::Session {
pub fn ws_adapter(&self) -> Option<Arc<WsAdapter>> {
self.downcast_adapter()
}
pub async fn ws_connect<A: ToString + AsRef<str>, S: Service>(
address: A,
service: S,
) -> anyhow::Result<Self> {
let a = WsAdapter::connect(address).await?;
Ok(Self::from(a, service))
}
#[cfg(not(target_arch = "wasm32"))]
pub async fn ws_accept<A: tokio::net::ToSocketAddrs, S: Service>(
address: A,
service: S,
) -> anyhow::Result<Self> {
let a = WsAdapter::accept(&tokio::net::TcpListener::bind(address).await?).await?;
Ok(Self::from(a, service))
}
}