#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
#![forbid(unsafe_code)]
#![cfg_attr(docsrs, feature(doc_cfg))]
use {
std::{
fmt,
future::Future,
io,
num::TryFromIntError,
pin::Pin,
string::FromUtf8Error,
sync::Arc,
},
derive_more::From,
tokio::io::{
AsyncRead,
AsyncWrite,
},
};
#[cfg(any(feature = "read-sync", feature = "write-sync"))] use std::io::prelude::*;
#[cfg(any(feature = "tokio-tungstenite", feature = "warp"))] use futures::{
Sink,
SinkExt as _,
stream::{
Stream,
TryStreamExt as _,
},
};
pub use async_proto_derive::Protocol;
#[doc(hidden)] pub use { derive_more,
tokio,
};
mod impls;
#[cfg_attr(not(feature = "read-sync"), doc = "The error returned from the [`read`](Protocol::read) method.")]
#[cfg_attr(feature = "read-sync", doc = "The error returned from the [`read`](Protocol::read) and [`read_sync`](Protocol::read_sync) methods.")]
#[derive(Debug, From, Clone)]
#[allow(missing_docs)]
pub enum ReadError {
/// An integer conversion failed while handling a length; displayed as “received a buffer with more than usize::MAX elements”.
BufSize(TryFromIntError),
/// A free-form error message, which is displayed verbatim.
Custom(String),
/// The end of the stream was reached before a value could be read.
EndOfStream,
/// An infinite or NaN JSON number was received.
#[cfg(feature = "serde_json")]
#[cfg_attr(docsrs, doc(cfg(feature = "serde_json")))]
FloatNotFinite,
/// An I/O error, shared behind an [`Arc`] (presumably so that this enum can derive [`Clone`]).
Io(Arc<io::Error>),
/// An attempt was made to read a value of an empty type.
ReadNever,
/// An error reported by tungstenite, shared behind an [`Arc`].
#[cfg(feature = "tokio-tungstenite")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite")))]
Tungstenite(Arc<dep_tokio_tungstenite::tungstenite::Error>),
/// An unknown enum variant was received; the payload is the unrecognized `u8` value.
// `#[from(ignore)]` keeps the `From` derive from generating a conversion from a bare integer.
#[from(ignore)]
UnknownVariant8(u8),
/// An unknown enum variant was received; the payload is the unrecognized `u16` value.
#[from(ignore)]
UnknownVariant16(u16),
/// An unknown enum variant was received; the payload is the unrecognized `u32` value.
#[from(ignore)]
UnknownVariant32(u32),
/// An unknown enum variant was received; the payload is the unrecognized `u64` value.
#[from(ignore)]
UnknownVariant64(u64),
/// Converting received bytes into a [`String`] failed.
Utf8(FromUtf8Error),
/// An error reported by warp, shared behind an [`Arc`].
#[cfg(feature = "warp")]
#[cfg_attr(docsrs, doc(cfg(feature = "warp")))]
Warp(Arc<dep_warp::Error>),
}
impl From<io::Error> for ReadError {
fn from(e: io::Error) -> ReadError {
ReadError::Io(Arc::new(e))
}
}
/// Wraps a tungstenite error in an [`Arc`] and stores it as [`ReadError::Tungstenite`].
#[cfg(feature = "tokio-tungstenite")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite")))]
impl From<dep_tokio_tungstenite::tungstenite::Error> for ReadError {
    fn from(err: dep_tokio_tungstenite::tungstenite::Error) -> Self {
        let shared = Arc::new(err);
        Self::Tungstenite(shared)
    }
}
/// Wraps a warp error in an [`Arc`] and stores it as [`ReadError::Warp`].
#[cfg(feature = "warp")]
#[cfg_attr(docsrs, doc(cfg(feature = "warp")))]
impl From<dep_warp::Error> for ReadError {
    fn from(err: dep_warp::Error) -> Self {
        let shared = Arc::new(err);
        Self::Warp(shared)
    }
}
impl fmt::Display for ReadError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ReadError::BufSize(e) => write!(f, "received a buffer with more than usize::MAX elements: {}", e),
ReadError::Custom(msg) => msg.fmt(f),
ReadError::EndOfStream => write!(f, "reached end of stream"),
#[cfg(feature = "serde_json")]
ReadError::FloatNotFinite => write!(f, "received an infinite or NaN JSON number"),
ReadError::Io(e) => write!(f, "I/O error: {}", e),
ReadError::ReadNever => write!(f, "attempted to read an empty type"),
#[cfg(feature = "tokio-tungstenite")]
ReadError::Tungstenite(e) => write!(f, "tungstenite error: {}", e),
ReadError::UnknownVariant8(n) => write!(f, "unknown enum variant: {}", n),
ReadError::UnknownVariant16(n) => write!(f, "unknown enum variant: {}", n),
ReadError::UnknownVariant32(n) => write!(f, "unknown enum variant: {}", n),
ReadError::UnknownVariant64(n) => write!(f, "unknown enum variant: {}", n),
ReadError::Utf8(e) => e.fmt(f),
#[cfg(feature = "warp")]
ReadError::Warp(e) => write!(f, "warp error: {}", e),
}
}
}
/// The error returned from the [`write`](Protocol::write) method and the other `write_*` methods of [`Protocol`].
#[derive(Debug, From, Clone)]
#[allow(missing_docs)]
pub enum WriteError {
/// An integer conversion failed while handling a length; displayed as “tried to send a buffer with more than u64::MAX elements”.
BufSize(TryFromIntError),
/// A free-form error message, which is displayed verbatim.
Custom(String),
/// An I/O error, shared behind an [`Arc`] (presumably so that this enum can derive [`Clone`]).
Io(Arc<io::Error>),
/// An error reported by tungstenite, shared behind an [`Arc`].
#[cfg(feature = "tokio-tungstenite")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite")))]
Tungstenite(Arc<dep_tokio_tungstenite::tungstenite::Error>),
/// An error reported by warp, shared behind an [`Arc`].
#[cfg(feature = "warp")]
#[cfg_attr(docsrs, doc(cfg(feature = "warp")))]
Warp(Arc<dep_warp::Error>),
}
impl From<io::Error> for WriteError {
fn from(e: io::Error) -> WriteError {
WriteError::Io(Arc::new(e))
}
}
/// Wraps a tungstenite error in an [`Arc`] and stores it as [`WriteError::Tungstenite`].
#[cfg(feature = "tokio-tungstenite")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite")))]
impl From<dep_tokio_tungstenite::tungstenite::Error> for WriteError {
    fn from(err: dep_tokio_tungstenite::tungstenite::Error) -> Self {
        let shared = Arc::new(err);
        Self::Tungstenite(shared)
    }
}
/// Wraps a warp error in an [`Arc`] and stores it as [`WriteError::Warp`].
#[cfg(feature = "warp")]
#[cfg_attr(docsrs, doc(cfg(feature = "warp")))]
impl From<dep_warp::Error> for WriteError {
    fn from(err: dep_warp::Error) -> Self {
        let shared = Arc::new(err);
        Self::Warp(shared)
    }
}
impl fmt::Display for WriteError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
WriteError::BufSize(e) => write!(f, "tried to send a buffer with more than u64::MAX elements: {}", e),
WriteError::Custom(msg) => msg.fmt(f),
WriteError::Io(e) => write!(f, "I/O error: {}", e),
#[cfg(feature = "tokio-tungstenite")]
WriteError::Tungstenite(e) => write!(f, "tungstenite error: {}", e),
#[cfg(feature = "warp")]
WriteError::Warp(e) => write!(f, "warp error: {}", e),
}
}
}
/// A type that can be read from a byte stream and written to a byte sink.
///
/// [`read`](Protocol::read) and [`write`](Protocol::write) are the asynchronous entry points and must always be implemented.
/// When the `read-sync` or `write-sync` crate feature is enabled, the blocking `read_sync` or `write_sync` method must be implemented as well.
/// All other methods have default implementations built on top of these.
pub trait Protocol: Sized {
    /// Reads a value of this type from an async `stream`.
    ///
    /// The returned future resolves to the value that was read, or to a [`ReadError`] describing why reading failed.
    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
    /// Writes this value to an async `sink`.
    ///
    /// The returned future resolves once the value has been written, or to a [`WriteError`] describing why writing failed.
    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
    /// Reads a value of this type from a sync `stream`.
    #[cfg(feature = "read-sync")]
    #[cfg_attr(docsrs, doc(cfg(feature = "read-sync")))]
    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
    /// Writes this value to a sync `sink`.
    #[cfg(feature = "write-sync")]
    #[cfg_attr(docsrs, doc(cfg(feature = "write-sync")))]
    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
    /// Attempts to read a value from a non-blocking sync `stream` without losing partially received data.
    ///
    /// Bytes taken from `stream` are accumulated in `buf`. Once `buf` starts with a complete value, that value is returned and the bytes it occupied are removed from `buf`; any bytes following it stay in `buf` for the next call. The same `buf` should therefore be passed to every call made for a given stream.
    ///
    /// Returns `Ok(None)` if `stream` reports [`io::ErrorKind::WouldBlock`] before a complete value has been buffered.
    ///
    /// # Errors
    ///
    /// * [`ReadError::EndOfStream`] if `stream` reports a read of 0 bytes before a complete value has been buffered.
    /// * Any error from [`read_sync`](Protocol::read_sync), except an I/O error of kind [`io::ErrorKind::UnexpectedEof`], which is treated as “more data needed”.
    /// * Any I/O error from `stream` other than `WouldBlock`.
    #[cfg(feature = "read-sync")]
    #[cfg_attr(docsrs, doc(cfg(feature = "read-sync")))]
    fn try_read(stream: &mut impl Read, buf: &mut Vec<u8>) -> Result<Option<Self>, ReadError> {
        // Scratch space for a single `read` call; a stack array avoids a heap allocation on every call.
        let mut temp_buf = [0u8; 8];
        loop {
            // Try to decode from what is buffered so far, before touching the stream:
            // a previous call may already have left a complete value in `buf`.
            // Reading from a byte slice advances it, so afterwards `pending` is the unconsumed tail of `buf`.
            let mut pending = &**buf;
            match Self::read_sync(&mut pending) {
                Ok(value) => {
                    let remaining = pending.len();
                    // Discard exactly the bytes consumed by the decoded value, keeping the tail.
                    buf.drain(..buf.len() - remaining);
                    return Ok(Some(value))
                }
                // The buffered data ends in the middle of a value: fall through and fetch more.
                Err(ReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {}
                Err(e) => return Err(e),
            }
            match stream.read(&mut temp_buf) {
                Ok(0) => return Err(ReadError::EndOfStream),
                Ok(n) => buf.extend_from_slice(&temp_buf[..n]),
                // Nothing more is available right now; the partial data stays in `buf`.
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
                Err(e) => return Err(e.into()),
            }
        }
    }
    /// Takes the next message from a tungstenite websocket `stream` and reads a value of this type from its data.
    ///
    /// # Errors
    ///
    /// Resolves to [`ReadError::EndOfStream`] if `stream` yields no further message, to the stream's own error if it fails, and otherwise to whatever [`read`](Protocol::read) reports for the message data.
    #[cfg(feature = "tokio-tungstenite")]
    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite")))]
    fn read_ws<'a, R: Stream<Item = Result<dep_tokio_tungstenite::tungstenite::Message, dep_tokio_tungstenite::tungstenite::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
        Box::pin(async move {
            let packet = stream.try_next().await?.ok_or(ReadError::EndOfStream)?;
            Self::read(&mut &*packet.into_data()).await
        })
    }
    /// Writes this value to an in-memory buffer and sends that buffer to a tungstenite websocket `sink` as a single binary message.
    #[cfg(feature = "tokio-tungstenite")]
    #[cfg_attr(docsrs, doc(cfg(feature = "tokio-tungstenite")))]
    fn write_ws<'a, W: Sink<dep_tokio_tungstenite::tungstenite::Message, Error = dep_tokio_tungstenite::tungstenite::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
    where Self: Sync {
        Box::pin(async move {
            let mut buf = Vec::default();
            self.write(&mut buf).await?;
            sink.send(dep_tokio_tungstenite::tungstenite::Message::binary(buf)).await?;
            Ok(())
        })
    }
    /// Takes the next message from a warp websocket `stream` and reads a value of this type from its bytes.
    ///
    /// # Errors
    ///
    /// Resolves to [`ReadError::EndOfStream`] if `stream` yields no further message, to the stream's own error if it fails, and otherwise to whatever [`read`](Protocol::read) reports for the message bytes.
    #[cfg(feature = "warp")]
    #[cfg_attr(docsrs, doc(cfg(feature = "warp")))]
    fn read_warp<'a, R: Stream<Item = Result<dep_warp::filters::ws::Message, dep_warp::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
        Box::pin(async move {
            let packet = stream.try_next().await?.ok_or(ReadError::EndOfStream)?;
            Self::read(&mut packet.as_bytes()).await
        })
    }
    /// Writes this value to an in-memory buffer and sends that buffer to a warp websocket `sink` as a single binary message.
    #[cfg(feature = "warp")]
    #[cfg_attr(docsrs, doc(cfg(feature = "warp")))]
    fn write_warp<'a, W: Sink<dep_warp::filters::ws::Message, Error = dep_warp::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
    where Self: Sync {
        Box::pin(async move {
            let mut buf = Vec::default();
            self.write(&mut buf).await?;
            sink.send(dep_warp::filters::ws::Message::binary(buf)).await?;
            Ok(())
        })
    }
}