use async_lock::Mutex;
use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use std::io::ErrorKind;
use ws_stream_wasm::*;
/// Adapter backed by a `ws_stream_wasm` WebSocket connection.
///
/// The connection is split into a write half and a read half; each half sits
/// behind its own async mutex, so it can be used through a shared `&self`.
pub struct WsAdapter {
/// Write half of the split WebSocket stream (accepts `WsMessage` items).
sender: Mutex<SplitSink<WsStream, WsMessage>>,
/// Read half of the split WebSocket stream.
recver: Mutex<SplitStream<WsStream>>,
}
impl WsAdapter {
    /// Opens a WebSocket connection to `address` and builds an adapter around it.
    ///
    /// The resulting stream is split into its sink and stream halves, each of
    /// which is stored behind its own mutex.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `WsMeta::connect` if the connection
    /// cannot be established.
    pub async fn connect<A: AsRef<str>>(address: A) -> anyhow::Result<Self> {
        // The meta handle is not stored; it stays bound until this function returns.
        let (_meta, io) = WsMeta::connect(address, None).await?;
        let (write_half, read_half) = io.split();

        let adapter = Self {
            sender: Mutex::new(write_half),
            recver: Mutex::new(read_half),
        };
        Ok(adapter)
    }
}
/// Upper bound, in bytes, on the payload of a single WebSocket message.
///
/// `send` splits a pack into chunks of at most this size, and `recv` treats
/// the first received message shorter than this as the end of a pack.
const MAX_PACK_SIZE: usize = 60 * 1024;
#[async_trait::async_trait]
impl crate::Adapter for WsAdapter {
    /// Sends `pack` as one logical message, split into binary WebSocket
    /// messages of at most `MAX_PACK_SIZE` bytes each.
    ///
    /// Framing contract (shared with `recv`): a pack ends with the first
    /// message whose payload is shorter than `MAX_PACK_SIZE`. When the pack
    /// length is an exact multiple of `MAX_PACK_SIZE` (including an empty
    /// pack), no chunk is short, so an empty binary message is appended as an
    /// explicit terminator. Without it the receiver would keep reading and
    /// merge the next pack into this one, or wait indefinitely.
    ///
    /// # Errors
    ///
    /// Returns the underlying sink error if any message fails to send.
    async fn send(&self, pack: Vec<u8>) -> anyhow::Result<()> {
        // The lock is held for the whole pack so that chunks belonging to
        // different `send` calls cannot interleave on the wire.
        let mut sender = self.sender.lock().await;
        for chunk in pack.chunks(MAX_PACK_SIZE) {
            sender.send(WsMessage::Binary(chunk.to_vec())).await?;
        }
        // Every chunk was full-sized (or there were none): emit the short
        // terminating message that `recv` relies on.
        if pack.len() % MAX_PACK_SIZE == 0 {
            sender.send(WsMessage::Binary(Vec::new())).await?;
        }
        Ok(())
    }

    /// Receives one logical message, concatenating consecutive WebSocket
    /// messages until one shorter than `MAX_PACK_SIZE` arrives.
    ///
    /// Text messages are accepted and contribute their UTF-8 bytes.
    ///
    /// # Errors
    ///
    /// Returns an `std::io::Error` of kind `ConnectionReset` if the stream
    /// ends before the pack is complete.
    async fn recv(&self) -> anyhow::Result<Vec<u8>> {
        let mut recver = self.recver.lock().await;
        let mut result: Vec<u8> = Vec::new();
        loop {
            let msg = recver
                .next()
                .await
                .ok_or_else(|| std::io::Error::from(ErrorKind::ConnectionReset))?;
            let data = match msg {
                WsMessage::Binary(bytes) => bytes,
                WsMessage::Text(text) => text.into_bytes(),
            };
            // A short (possibly empty) message marks the end of the pack.
            let is_last = data.len() < MAX_PACK_SIZE;
            if result.is_empty() {
                // Reuse the received buffer instead of copying it.
                result = data;
            } else {
                result.extend_from_slice(&data);
            }
            if is_last {
                break;
            }
        }
        Ok(result)
    }
}