#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
#![forbid(unsafe_code)]
#![cfg_attr(docsrs, feature(doc_cfg))]
use {
std::{
future::Future,
io::{
self,
prelude::*,
},
pin::Pin,
},
tokio::io::{
AsyncRead,
AsyncWrite,
},
};
#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite029"))] use {
std::{
iter,
mem,
},
fallible_collections::FallibleVec,
futures::{
Sink,
SinkExt as _,
future::{
self,
Either,
},
stream::{
self,
Stream,
StreamExt as _,
TryStreamExt as _,
},
},
};
#[cfg(feature = "tokio-tungstenite021")] use tokio_tungstenite021::tungstenite as tungstenite021;
#[cfg(feature = "tokio-tungstenite024")] use tokio_tungstenite024::tungstenite as tungstenite024;
#[cfg(feature = "tokio-tungstenite029")] use tokio_tungstenite029::tungstenite as tungstenite029;
pub use {
async_proto_derive::{
Protocol,
bitflags,
},
crate::error::*,
};
#[doc(hidden)] pub use tokio;
mod error;
mod impls;
#[cfg(any(feature = "tokio-tungstenite021", feature = "tokio-tungstenite024", feature = "tokio-tungstenite029"))] const WS_MAX_MESSAGE_SIZE: usize = 16777216;
pub trait Protocol: Sized {
fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
fn read_owned<R: AsyncRead + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
Box::pin(async move {
let value = Self::read(&mut stream).await?;
Ok((stream, value))
})
}
fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
let mut temp_buf = vec![0; 8];
loop {
let mut slice = &mut &**buf;
match Self::read_sync(&mut slice) {
Ok(value) => {
let value_len = slice.len();
buf.drain(..buf.len() - value_len);
return Ok(Some(value))
}
Err(ReadError { kind: ReadErrorKind::Io(e), .. }) if e.kind() == io::ErrorKind::UnexpectedEof => {}
Err(e) => return Err(e),
}
match stream.read(&mut temp_buf) {
Ok(0) => return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::EndOfStream,
}),
Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
Err(e) => return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
}),
}
}
}
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
fn read_ws021<'a, R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
Box::pin(async move {
let packet = stream.try_next().await.map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?.ok_or_else(|| ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::EndOfStream,
})?;
match packet {
tungstenite021::Message::Text(data) => match data.chars().next() {
Some('m') => {
let len = data[1..].parse::<usize>().map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
while buf.len() < len {
let packet = stream.try_next().await.map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?.ok_or_else(|| ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::EndOfStream,
})?;
if let tungstenite021::Message::Binary(data) = packet {
buf.extend_from_slice(&data);
} else {
return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind021(packet),
})
}
}
Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})
}
_ => Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::WebSocketTextMessage024(data),
}),
},
tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
}),
_ => Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind021(packet),
}),
}
})
}
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
fn read_ws024<'a, R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
Box::pin(async move {
let packet = stream.try_next().await.map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?.ok_or_else(|| ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::EndOfStream,
})?;
match packet {
tungstenite024::Message::Text(data) => match data.chars().next() {
Some('m') => {
let len = data[1..].parse::<usize>().map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
while buf.len() < len {
let packet = stream.try_next().await.map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?.ok_or_else(|| ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::EndOfStream,
})?;
if let tungstenite024::Message::Binary(data) = packet {
buf.extend_from_slice(&data);
} else {
return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind024(packet),
})
}
}
Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})
}
_ => Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::WebSocketTextMessage024(data),
}),
},
tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
}),
_ => Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind024(packet),
}),
}
})
}
#[cfg(feature = "tokio-tungstenite029")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite029")))]
fn read_ws029<'a, R: Stream<Item = Result<tungstenite029::Message, tungstenite029::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
Box::pin(async move {
let packet = stream.try_next().await.map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?.ok_or_else(|| ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::EndOfStream,
})?;
match packet {
tungstenite029::Message::Text(data) => match data.chars().next() {
Some('m') => {
let len = data[1..].parse::<usize>().map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
while buf.len() < len {
let packet = stream.try_next().await.map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?.ok_or_else(|| ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::EndOfStream,
})?;
if let tungstenite029::Message::Binary(data) = packet {
buf.extend_from_slice(&data);
} else {
return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind029(packet),
})
}
}
Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})
}
_ => Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::WebSocketTextMessage029(data),
}),
},
tungstenite029::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
}),
_ => Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind029(packet),
}),
}
})
}
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
fn write_ws021<'a, W: Sink<tungstenite021::Message, Error = tungstenite021::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
where Self: Sync {
Box::pin(async move {
let mut buf = Vec::default();
self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})?;
if buf.len() <= WS_MAX_MESSAGE_SIZE {
sink.send(tungstenite021::Message::binary(buf)).await.map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
} else {
sink.send(tungstenite021::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
sink.send(tungstenite021::Message::binary(chunk)).await.map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
}
}
Ok(())
})
}
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
fn write_ws024<'a, W: Sink<tungstenite024::Message, Error = tungstenite024::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
where Self: Sync {
Box::pin(async move {
let mut buf = Vec::default();
self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})?;
if buf.len() <= WS_MAX_MESSAGE_SIZE {
sink.send(tungstenite024::Message::binary(buf)).await.map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
} else {
sink.send(tungstenite024::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
sink.send(tungstenite024::Message::binary(chunk)).await.map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
}
}
Ok(())
})
}
#[cfg(feature = "tokio-tungstenite029")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite029")))]
fn write_ws029<'a, W: Sink<tungstenite029::Message, Error = tungstenite029::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
where Self: Sync {
Box::pin(async move {
let mut buf = Vec::default();
self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})?;
if buf.len() <= WS_MAX_MESSAGE_SIZE {
sink.send(tungstenite029::Message::binary(buf)).await.map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
} else {
sink.send(tungstenite029::Message::text(format!("m{}", buf.len()))).await.map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
sink.send(tungstenite029::Message::binary(tungstenite029::Bytes::copy_from_slice(chunk))).await.map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
}
}
Ok(())
})
}
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
fn read_ws_sync021(websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
let packet = websocket.read().map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
match packet {
tungstenite021::Message::Text(data) => match data.chars().next() {
Some('m') => {
let len = data[1..].parse::<usize>().map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
while buf.len() < len {
let packet = websocket.read().map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
if let tungstenite021::Message::Binary(data) = packet {
buf.extend_from_slice(&data);
} else {
return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind021(packet),
})
}
}
Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})
}
_ => return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::WebSocketTextMessage024(data),
}),
},
tungstenite021::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
}),
_ => Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind021(packet),
}),
}
}
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
fn read_ws_sync024(websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
let packet = websocket.read().map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
match packet {
tungstenite024::Message::Text(data) => match data.chars().next() {
Some('m') => {
let len = data[1..].parse::<usize>().map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
while buf.len() < len {
let packet = websocket.read().map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
if let tungstenite024::Message::Binary(data) = packet {
buf.extend_from_slice(&data);
} else {
return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind024(packet),
})
}
}
Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})
}
_ => return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::WebSocketTextMessage024(data),
}),
},
tungstenite024::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
}),
_ => Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind024(packet),
}),
}
}
#[cfg(feature = "tokio-tungstenite029")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite029")))]
fn read_ws_sync029(websocket: &mut tungstenite029::WebSocket<impl Read + Write>) -> Result<Self, ReadError> {
let packet = websocket.read().map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
match packet {
tungstenite029::Message::Text(data) => match data.chars().next() {
Some('m') => {
let len = data[1..].parse::<usize>().map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
let mut buf = <Vec<_> as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
while buf.len() < len {
let packet = websocket.read().map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
if let tungstenite029::Message::Binary(data) = packet {
buf.extend_from_slice(&data);
} else {
return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind029(packet),
})
}
}
Self::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})
}
_ => return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::WebSocketTextMessage029(data),
}),
},
tungstenite029::Message::Binary(data) => Self::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
}),
_ => Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind029(packet),
}),
}
}
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
fn write_ws_sync021(&self, websocket: &mut tungstenite021::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
let mut buf = Vec::default();
self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})?;
if buf.len() <= WS_MAX_MESSAGE_SIZE {
websocket.send(tungstenite021::Message::binary(buf)).map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
} else {
websocket.send(tungstenite021::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
websocket.send(tungstenite021::Message::binary(chunk)).map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
}
}
websocket.flush().map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
Ok(())
}
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
fn write_ws_sync024(&self, websocket: &mut tungstenite024::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
let mut buf = Vec::default();
self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})?;
if buf.len() <= WS_MAX_MESSAGE_SIZE {
websocket.send(tungstenite024::Message::binary(buf)).map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
} else {
websocket.send(tungstenite024::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
websocket.send(tungstenite024::Message::binary(chunk)).map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
}
}
websocket.flush().map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
Ok(())
}
#[cfg(feature = "tokio-tungstenite029")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite029")))]
fn write_ws_sync029(&self, websocket: &mut tungstenite029::WebSocket<impl Read + Write>) -> Result<(), WriteError> {
let mut buf = Vec::default();
self.write_sync(&mut buf).map_err(|WriteError { context, kind }| WriteError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})?;
if buf.len() <= WS_MAX_MESSAGE_SIZE {
websocket.send(tungstenite029::Message::binary(buf)).map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
} else {
websocket.send(tungstenite029::Message::text(format!("m{}", buf.len()))).map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
for chunk in buf.chunks(WS_MAX_MESSAGE_SIZE) {
websocket.send(tungstenite029::Message::binary(tungstenite029::Bytes::copy_from_slice(chunk))).map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
}
}
websocket.flush().map_err(|e| WriteError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
Ok(())
}
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
fn read_ws_owned021<R: Stream<Item = Result<tungstenite021::Message, tungstenite021::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
Box::pin(async move {
let value = Self::read_ws021(&mut stream).await?;
Ok((stream, value))
})
}
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
fn read_ws_owned024<R: Stream<Item = Result<tungstenite024::Message, tungstenite024::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
Box::pin(async move {
let value = Self::read_ws024(&mut stream).await?;
Ok((stream, value))
})
}
#[cfg(feature = "tokio-tungstenite029")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite029")))]
fn read_ws_owned029<R: Stream<Item = Result<tungstenite029::Message, tungstenite029::Error>> + Unpin + Send + 'static>(mut stream: R) -> Pin<Box<dyn Future<Output = Result<(R, Self), ReadError>> + Send>> {
Box::pin(async move {
let value = Self::read_ws029(&mut stream).await?;
Ok((stream, value))
})
}
}
pub trait LengthPrefixed: Protocol {
fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError>;
fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError>;
}
#[cfg(feature = "tokio-tungstenite021")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite021")))]
pub async fn websocket021<R: Protocol, W: Protocol>(request: impl tungstenite021::client::IntoClientRequest + Unpin) -> tungstenite021::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
let (sock, _) = tokio_tungstenite021::connect_async(request).await?;
let (sink, stream) = sock.split();
Ok((
sink.sink_map_err(|e| WriteError {
context: ErrorContext::WebSocketSink,
kind: e.into(),
}).with_flat_map::<W, _, _>(|msg| {
let mut buf = Vec::default();
match msg.write_sync(&mut buf) {
Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
Either::Left(stream::once(future::ready(tungstenite021::Message::binary(buf))))
} else {
Either::Right(stream::iter(
iter::once(tungstenite021::Message::text(format!("m{}", buf.len())))
.chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite021::Message::binary))
.collect::<Vec<_>>()
))
}.map(Ok)),
Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
}))),
}
}),
stream.scan(None, |state, res| {
fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite021::Result<tungstenite021::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
let packet = res.map_err(|e| ReadError {
context: ErrorContext::WebSocketStream,
kind: e.into(),
})?;
Ok(if let Some((len, buf)) = state {
if let tungstenite021::Message::Binary(data) = packet {
buf.extend_from_slice(&data);
} else {
return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind021(packet),
})
}
if buf.len() >= *len {
let buf = mem::take(buf);
*state = None;
Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})?)))
} else {
Either::Left(stream::empty())
}
} else {
match packet {
tungstenite021::Message::Text(data) => match data.chars().next() {
Some('m') => {
let len = data[1..].parse::<usize>().map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
*state = Some((len, buf));
Either::Left(stream::empty())
}
_ => return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::WebSocketTextMessage024(data),
}),
},
tungstenite021::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})?))),
_ => return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind021(packet),
}),
}
})
}
future::ready(Some(scanner(state, res)))
}).try_flatten(),
))
}
#[cfg(feature = "tokio-tungstenite024")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite024")))]
pub async fn websocket024<R: Protocol, W: Protocol>(request: impl tungstenite024::client::IntoClientRequest + Unpin) -> tungstenite024::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
let (sock, _) = tokio_tungstenite024::connect_async(request).await?;
let (sink, stream) = sock.split();
Ok((
sink.sink_map_err(|e| WriteError {
context: ErrorContext::WebSocketSink,
kind: e.into(),
}).with_flat_map::<W, _, _>(|msg| {
let mut buf = Vec::default();
match msg.write_sync(&mut buf) {
Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
Either::Left(stream::once(future::ready(tungstenite024::Message::binary(buf))))
} else {
Either::Right(stream::iter(
iter::once(tungstenite024::Message::text(format!("m{}", buf.len())))
.chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(tungstenite024::Message::binary))
.collect::<Vec<_>>()
))
}.map(Ok)),
Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
}))),
}
}),
stream.scan(None, |state, res| {
fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite024::Result<tungstenite024::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
let packet = res.map_err(|e| ReadError {
context: ErrorContext::WebSocketStream,
kind: e.into(),
})?;
Ok(if let Some((len, buf)) = state {
if let tungstenite024::Message::Binary(data) = packet {
buf.extend_from_slice(&data);
} else {
return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind024(packet),
})
}
if buf.len() >= *len {
let buf = mem::take(buf);
*state = None;
Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})?)))
} else {
Either::Left(stream::empty())
}
} else {
match packet {
tungstenite024::Message::Text(data) => match data.chars().next() {
Some('m') => {
let len = data[1..].parse::<usize>().map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
*state = Some((len, buf));
Either::Left(stream::empty())
}
_ => return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::WebSocketTextMessage024(data),
}),
},
tungstenite024::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})?))),
_ => return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind024(packet),
}),
}
})
}
future::ready(Some(scanner(state, res)))
}).try_flatten(),
))
}
#[cfg(feature = "tokio-tungstenite029")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite029")))]
pub async fn websocket029<R: Protocol, W: Protocol>(request: impl tungstenite029::client::IntoClientRequest + Unpin) -> tungstenite029::Result<(impl Sink<W, Error = WriteError>, impl Stream<Item = Result<R, ReadError>>)> {
let (sock, _) = tokio_tungstenite029::connect_async(request).await?;
let (sink, stream) = sock.split();
Ok((
sink.sink_map_err(|e| WriteError {
context: ErrorContext::WebSocketSink,
kind: e.into(),
}).with_flat_map::<W, _, _>(|msg| {
let mut buf = Vec::default();
match msg.write_sync(&mut buf) {
Ok(()) => Either::Left(if buf.len() <= WS_MAX_MESSAGE_SIZE {
Either::Left(stream::once(future::ready(tungstenite029::Message::binary(buf))))
} else {
Either::Right(stream::iter(
iter::once(tungstenite029::Message::text(format!("m{}", buf.len())))
.chain(buf.chunks(WS_MAX_MESSAGE_SIZE).map(|chunk| tungstenite029::Message::binary(tungstenite029::Bytes::copy_from_slice(chunk))))
.collect::<Vec<_>>()
))
}.map(Ok)),
Err(WriteError { context, kind }) => Either::Right(stream::once(future::err(WriteError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
}))),
}
}),
stream.scan(None, |state, res| {
fn scanner<R: Protocol>(state: &mut Option<(usize, Vec<u8>)>, res: tungstenite029::Result<tungstenite029::Message>) -> Result<impl Stream<Item = Result<R, ReadError>> + use<R>, ReadError> {
let packet = res.map_err(|e| ReadError {
context: ErrorContext::WebSocketStream,
kind: e.into(),
})?;
Ok(if let Some((len, buf)) = state {
if let tungstenite029::Message::Binary(data) = packet {
buf.extend_from_slice(&data);
} else {
return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind029(packet),
})
}
if buf.len() >= *len {
let buf = mem::take(buf);
*state = None;
Either::Right(stream::once(future::ok(R::read_sync(&mut &*buf).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})?)))
} else {
Either::Left(stream::empty())
}
} else {
match packet {
tungstenite029::Message::Text(data) => match data.chars().next() {
Some('m') => {
let len = data[1..].parse::<usize>().map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
let buf = FallibleVec::try_with_capacity(len).map_err(|e| ReadError {
context: ErrorContext::DefaultImpl,
kind: e.into(),
})?;
*state = Some((len, buf));
Either::Left(stream::empty())
}
_ => return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::WebSocketTextMessage029(data),
}),
},
tungstenite029::Message::Binary(data) => Either::Right(stream::once(future::ok(R::read_sync(&mut &*data).map_err(|ReadError { context, kind }| ReadError {
context: ErrorContext::WebSocket {
source: Box::new(context),
},
kind,
})?))),
_ => return Err(ReadError {
context: ErrorContext::DefaultImpl,
kind: ReadErrorKind::MessageKind029(packet),
}),
}
})
}
future::ready(Some(scanner(state, res)))
}).try_flatten(),
))
}