pub use decoder::*;
pub use encoder::*;
pub use error::*;
pub use message::*;
mod decoder;
mod encoder;
mod error;
mod message;
use core::future::Future;
const PAYLOAD_LEN_SIZE: usize = 4;
const MAX_MESSAGES_PER_BATCH: usize = 255;
const BATCH_OVERHEAD: usize = 1 + 1 + PAYLOAD_LEN_SIZE;
const LABEL_OVERHEAD: usize = 1 + 1;
const MAX_PLAIN_MESSAGE_HEADER_SIZE: usize = 1 + 5;
#[cfg(feature = "encryption")]
const ENCRYPTED_MESSAGE_HEADER_SIZE: usize = 1 + 1 + PAYLOAD_LEN_SIZE;
#[cfg(any(
feature = "snappy",
feature = "zstd",
feature = "lz4",
feature = "brotli"
))]
const COMPRESSED_MESSAGE_HEADER_SIZE: usize = 1 + 2 + PAYLOAD_LEN_SIZE + PAYLOAD_LEN_SIZE;
#[cfg(any(
feature = "crc32",
feature = "xxhash32",
feature = "xxhash64",
feature = "xxhash3",
feature = "murmur3",
))]
const CHECKSUMED_MESSAGE_HEADER_SIZE: usize = 1 + 1 + PAYLOAD_LEN_SIZE;
const COMPOOUND_MESSAGE_TAG: u8 = 1;
const PING_MESSAGE_TAG: u8 = 2;
const INDIRECT_PING_MESSAGE_TAG: u8 = 3;
const ACK_MESSAGE_TAG: u8 = 4;
const SUSPECT_MESSAGE_TAG: u8 = 5;
const ALIVE_MESSAGE_TAG: u8 = 6;
const DEAD_MESSAGE_TAG: u8 = 7;
const PUSH_PULL_MESSAGE_TAG: u8 = 8;
const USER_DATA_MESSAGE_TAG: u8 = 9;
const NACK_MESSAGE_TAG: u8 = 10;
const ERROR_RESPONSE_MESSAGE_TAG: u8 = 11;
const LABELED_MESSAGE_TAG: u8 = 12;
const CHECKSUMED_MESSAGE_TAG: u8 = 13;
const COMPRESSED_MESSAGE_TAG: u8 = 14;
const ENCRYPTED_MESSAGE_TAG: u8 = 15;
#[inline]
const fn is_plain_message_tag(tag: u8) -> bool {
matches!(
tag,
PING_MESSAGE_TAG
| INDIRECT_PING_MESSAGE_TAG
| ACK_MESSAGE_TAG
| SUSPECT_MESSAGE_TAG
| ALIVE_MESSAGE_TAG
| DEAD_MESSAGE_TAG
| PUSH_PULL_MESSAGE_TAG
| USER_DATA_MESSAGE_TAG
| NACK_MESSAGE_TAG
| ERROR_RESPONSE_MESSAGE_TAG
)
}
pub trait ProtoReader: Send + Sync {
fn read(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<usize>> + Send;
fn read_exact(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<()>> + Send;
fn peek(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<usize>> + Send;
fn peek_exact(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<()>> + Send;
}
impl<T: ProtoReader> ProtoReader for &mut T {
fn read(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<usize>> + Send {
(**self).read(buf)
}
fn read_exact(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<()>> + Send {
(**self).read_exact(buf)
}
fn peek(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<usize>> + Send {
(**self).peek(buf)
}
fn peek_exact(&mut self, buf: &mut [u8]) -> impl Future<Output = std::io::Result<()>> + Send {
(**self).peek_exact(buf)
}
}
const _: () = {
use futures_util::io::AsyncRead;
use peekable::future::AsyncPeekable;
impl<T: AsyncRead + Send + Sync + Unpin> ProtoReader for AsyncPeekable<T> {
async fn peek(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
AsyncPeekable::peek(self, buf).await
}
async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
futures_util::io::AsyncReadExt::read_exact(self, buf).await
}
async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
futures_util::io::AsyncReadExt::read(self, buf).await
}
async fn peek_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
AsyncPeekable::peek_exact(self, buf).await
}
}
};
pub trait ProtoWriter: Send + Sync {
fn close(&mut self) -> impl Future<Output = std::io::Result<()>> + Send;
fn write_all(&mut self, payload: &[u8]) -> impl Future<Output = std::io::Result<()>> + Send;
fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> + Send;
}
impl<W> ProtoWriter for W
where
W: futures_util::AsyncWrite + Send + Sync + Unpin,
{
async fn close(&mut self) -> std::io::Result<()> {
futures_util::io::AsyncWriteExt::close(self).await
}
async fn write_all(&mut self, payload: &[u8]) -> std::io::Result<()> {
futures_util::io::AsyncWriteExt::write_all(self, payload).await
}
async fn flush(&mut self) -> std::io::Result<()> {
futures_util::io::AsyncWriteExt::flush(self).await
}
}
#[cfg(feature = "encryption")]
struct AeadBuffer<'a> {
buf: &'a mut [u8],
len: usize,
}
#[cfg(feature = "encryption")]
const _: () = {
impl<'a> AeadBuffer<'a> {
#[inline]
const fn new(buf: &'a mut [u8], len: usize) -> Self {
Self { buf, len }
}
}
impl AsRef<[u8]> for AeadBuffer<'_> {
fn as_ref(&self) -> &[u8] {
&self.buf[..self.len]
}
}
impl AsMut<[u8]> for AeadBuffer<'_> {
fn as_mut(&mut self) -> &mut [u8] {
&mut self.buf[..self.len]
}
}
impl aead::Buffer for AeadBuffer<'_> {
fn extend_from_slice(&mut self, other: &[u8]) -> aead::Result<()> {
if self.len >= self.buf.len() {
return Err(aead::Error);
}
self.buf[self.len..self.len + other.len()].copy_from_slice(other);
self.len += other.len();
Ok(())
}
fn truncate(&mut self, len: usize) {
if len >= self.len {
return;
}
self.buf[len..self.len].fill(0);
self.len = len;
}
}
};