#![deny(missing_docs, rust_2018_idioms, unused, unused_crate_dependencies, unused_import_braces, unused_lifetimes, unused_qualifications, warnings)]
#![forbid(unsafe_code)]
use {
std::{
fmt,
future::Future,
io,
num::TryFromIntError,
pin::Pin,
string::FromUtf8Error,
sync::Arc,
},
derive_more::From,
tokio::io::{
AsyncRead,
AsyncWrite,
},
};
#[cfg(any(feature = "read-sync", feature = "write-sync"))] use std::io::prelude::*;
#[cfg(any(feature = "tokio-tungstenite", feature = "warp"))] use futures::{
Sink,
SinkExt as _,
stream::{
Stream,
TryStreamExt as _,
},
};
pub use async_proto_derive::Protocol;
#[doc(hidden)] pub use {
derive_more,
tokio,
};
mod impls;
/// The error type returned when reading (decoding) a value fails.
///
/// Every `read*` method of the `Protocol` trait reports failures through this enum.
/// Error sources that are not `Clone` themselves (such as `io::Error`) are stored behind an `Arc`,
/// which is what allows this enum to derive `Clone`.
#[derive(Debug, From, Clone)]
#[allow(missing_docs)]
pub enum ReadError {
/// An integer conversion failed; reported as a received buffer with more than `usize::MAX` elements.
BufSize(TryFromIntError),
/// A free-form error message, displayed verbatim.
Custom(String),
/// A message stream yielded no further item while a message was expected.
#[cfg(any(feature = "tokio-tungstenite", feature = "warp"))]
EndOfStream,
/// An infinite or NaN JSON number was received.
#[cfg(feature = "serde_json")]
FloatNotFinite,
/// An I/O error reported by the underlying stream.
Io(Arc<io::Error>),
/// An attempt was made to read a value of an empty type.
ReadNever,
/// An error reported by `tungstenite`.
#[cfg(feature = "tokio-tungstenite")]
Tungstenite(Arc<dep_tokio_tungstenite::tungstenite::Error>),
/// An enum discriminant that does not correspond to any known variant; the payload is the offending value.
///
/// `#[from(ignore)]` opts this variant out of the derived `From` conversions.
#[from(ignore)]
UnknownVariant(u8),
/// Received bytes were not valid UTF-8.
Utf8(FromUtf8Error),
/// An error reported by `warp`.
#[cfg(feature = "warp")]
Warp(Arc<dep_warp::Error>),
}
impl From<io::Error> for ReadError {
fn from(e: io::Error) -> ReadError {
ReadError::Io(Arc::new(e))
}
}
/// Converts a `tungstenite` error into [`ReadError::Tungstenite`], so `?` can be applied to websocket results directly.
#[cfg(feature = "tokio-tungstenite")]
impl From<dep_tokio_tungstenite::tungstenite::Error> for ReadError {
    fn from(source: dep_tokio_tungstenite::tungstenite::Error) -> Self {
        // Stored behind an `Arc` so that `ReadError` can stay `Clone`.
        let shared = Arc::new(source);
        Self::Tungstenite(shared)
    }
}
/// Converts a `warp` error into [`ReadError::Warp`], so `?` can be applied to `warp` websocket results directly.
#[cfg(feature = "warp")]
impl From<dep_warp::Error> for ReadError {
    fn from(source: dep_warp::Error) -> Self {
        // Stored behind an `Arc` so that `ReadError` can stay `Clone`.
        let shared = Arc::new(source);
        Self::Warp(shared)
    }
}
impl fmt::Display for ReadError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ReadError::BufSize(e) => write!(f, "received a buffer with more than usize::MAX elements: {}", e),
ReadError::Custom(msg) => msg.fmt(f),
#[cfg(any(feature = "tokio-tungstenite", feature = "warp"))]
ReadError::EndOfStream => write!(f, "reached end of stream"),
#[cfg(feature = "serde_json")]
ReadError::FloatNotFinite => write!(f, "received an infinite or NaN JSON number"),
ReadError::Io(e) => write!(f, "I/O error: {}", e),
ReadError::ReadNever => write!(f, "attempted to read an empty type"),
#[cfg(feature = "tokio-tungstenite")]
ReadError::Tungstenite(e) => write!(f, "tungstenite error: {}", e),
ReadError::UnknownVariant(n) => write!(f, "unknown enum variant: {}", n),
ReadError::Utf8(e) => e.fmt(f),
#[cfg(feature = "warp")]
ReadError::Warp(e) => write!(f, "warp error: {}", e),
}
}
}
/// The error type returned when writing (encoding) a value fails.
///
/// Every `write*` method of the `Protocol` trait reports failures through this enum.
/// Error sources that are not `Clone` themselves (such as `io::Error`) are stored behind an `Arc`,
/// which is what allows this enum to derive `Clone`.
#[derive(Debug, From, Clone)]
#[allow(missing_docs)]
pub enum WriteError {
/// An integer conversion failed; reported as an attempt to send a buffer with more than `u64::MAX` elements.
BufSize(TryFromIntError),
/// A free-form error message, displayed verbatim.
Custom(String),
/// An I/O error reported by the underlying sink.
Io(Arc<io::Error>),
/// An error reported by `tungstenite`.
#[cfg(feature = "tokio-tungstenite")]
Tungstenite(Arc<dep_tokio_tungstenite::tungstenite::Error>),
/// An error reported by `warp`.
#[cfg(feature = "warp")]
Warp(Arc<dep_warp::Error>),
}
impl From<io::Error> for WriteError {
fn from(e: io::Error) -> WriteError {
WriteError::Io(Arc::new(e))
}
}
/// Converts a `tungstenite` error into [`WriteError::Tungstenite`], so `?` can be applied to websocket results directly.
#[cfg(feature = "tokio-tungstenite")]
impl From<dep_tokio_tungstenite::tungstenite::Error> for WriteError {
    fn from(source: dep_tokio_tungstenite::tungstenite::Error) -> Self {
        // Stored behind an `Arc` so that `WriteError` can stay `Clone`.
        let shared = Arc::new(source);
        Self::Tungstenite(shared)
    }
}
/// Converts a `warp` error into [`WriteError::Warp`], so `?` can be applied to `warp` websocket results directly.
#[cfg(feature = "warp")]
impl From<dep_warp::Error> for WriteError {
    fn from(source: dep_warp::Error) -> Self {
        // Stored behind an `Arc` so that `WriteError` can stay `Clone`.
        let shared = Arc::new(source);
        Self::Warp(shared)
    }
}
impl fmt::Display for WriteError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
WriteError::BufSize(e) => write!(f, "tried to send a buffer with more than u64::MAX elements: {}", e),
WriteError::Custom(msg) => msg.fmt(f),
WriteError::Io(e) => write!(f, "I/O error: {}", e),
#[cfg(feature = "tokio-tungstenite")]
WriteError::Tungstenite(e) => write!(f, "tungstenite error: {}", e),
#[cfg(feature = "warp")]
WriteError::Warp(e) => write!(f, "warp error: {}", e),
}
}
}
/// A type whose values can be read from and written to byte streams.
///
/// `read` and `write` are the required asynchronous entry points, operating on `tokio`'s
/// `AsyncRead` and `AsyncWrite`. Depending on the enabled crate features, the trait additionally
/// offers blocking variants (`read-sync`, `write-sync`) and websocket convenience methods
/// (`tokio-tungstenite`, `warp`) that are provided in terms of `read` and `write`.
pub trait Protocol: Sized {
/// Reads a value of this type from an async stream.
///
/// Returns a boxed, `Send` future that borrows `stream` for `'a` and resolves to the decoded
/// value or a `ReadError`.
fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>>;
/// Writes this value to an async sink.
///
/// Returns a boxed, `Send` future that borrows both `self` and `sink` for `'a` and resolves to
/// `()` or a `WriteError`.
fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>;
/// Reads a value of this type from a synchronous (blocking) stream.
#[cfg(feature = "read-sync")]
fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError>;
/// Writes this value to a synchronous (blocking) sink.
#[cfg(feature = "write-sync")]
fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError>;
/// Reads a value of this type from a stream of `tungstenite` websocket messages.
///
/// Takes exactly one message from `stream` and decodes the value from that message's payload
/// using `read`. Bytes left in the payload after the value has been decoded are not inspected.
///
/// # Errors
///
/// * An error yielded by the stream is converted into a `ReadError`.
/// * `ReadError::EndOfStream` if the stream ends instead of yielding a message.
/// * Any error produced by `read` while decoding the payload.
#[cfg(feature = "tokio-tungstenite")]
fn read_ws<'a, R: Stream<Item = Result<dep_tokio_tungstenite::tungstenite::Message, dep_tokio_tungstenite::tungstenite::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
Box::pin(async move {
// `try_next` yields `Ok(None)` once the stream is exhausted; that is reported as `EndOfStream`.
let packet = stream.try_next().await?.ok_or(ReadError::EndOfStream)?;
// The payload is turned into a byte slice, which serves as the in-memory reader for `read`.
Self::read(&mut &*packet.into_data()).await
})
}
/// Writes this value to a `tungstenite` websocket sink as a single binary message.
///
/// The value is first encoded completely into an in-memory buffer using `write`; the buffer is
/// then sent as one binary message.
///
/// `Self: Sync` is required because the returned future holds `&self` across `.await` points and
/// must itself be `Send`.
///
/// # Errors
///
/// * Any error produced by `write` while encoding.
/// * An error reported by the sink, converted into a `WriteError`.
#[cfg(feature = "tokio-tungstenite")]
fn write_ws<'a, W: Sink<dep_tokio_tungstenite::tungstenite::Message, Error = dep_tokio_tungstenite::tungstenite::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
where Self: Sync {
Box::pin(async move {
// Encode into memory first so that the whole value goes out as one message.
let mut buf = Vec::default();
self.write(&mut buf).await?;
sink.send(dep_tokio_tungstenite::tungstenite::Message::binary(buf)).await?;
Ok(())
})
}
/// Reads a value of this type from a stream of `warp` websocket messages.
///
/// Takes exactly one message from `stream` and decodes the value from that message's bytes
/// using `read`. Bytes left in the message after the value has been decoded are not inspected.
///
/// # Errors
///
/// * An error yielded by the stream is converted into a `ReadError`.
/// * `ReadError::EndOfStream` if the stream ends instead of yielding a message.
/// * Any error produced by `read` while decoding the message.
#[cfg(feature = "warp")]
fn read_warp<'a, R: Stream<Item = Result<dep_warp::filters::ws::Message, dep_warp::Error>> + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
Box::pin(async move {
// `try_next` yields `Ok(None)` once the stream is exhausted; that is reported as `EndOfStream`.
let packet = stream.try_next().await?.ok_or(ReadError::EndOfStream)?;
// The message's byte slice serves as the in-memory reader for `read`.
Self::read(&mut packet.as_bytes()).await
})
}
/// Writes this value to a `warp` websocket sink as a single binary message.
///
/// The value is first encoded completely into an in-memory buffer using `write`; the buffer is
/// then sent as one binary message.
///
/// `Self: Sync` is required because the returned future holds `&self` across `.await` points and
/// must itself be `Send`.
///
/// # Errors
///
/// * Any error produced by `write` while encoding.
/// * An error reported by the sink, converted into a `WriteError`.
#[cfg(feature = "warp")]
fn write_warp<'a, W: Sink<dep_warp::filters::ws::Message, Error = dep_warp::Error> + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>>
where Self: Sync {
Box::pin(async move {
// Encode into memory first so that the whole value goes out as one message.
let mut buf = Vec::default();
self.write(&mut buf).await?;
sink.send(dep_warp::filters::ws::Message::binary(buf)).await?;
Ok(())
})
}
}