use core::fmt;
use core::marker::PhantomData;
use crate::error::RecvError;
use crate::io::framed::FramedRead;
use crate::io::source::{AsyncSourceExt, Source};
use crate::io::{AsyncRead, Container, IntoRead, Read};
use crate::serdes::Deserializer;
use crate::statistics::{StatIO, Statistics};
mod config;
mod decoder;
pub use self::config::Config;
pub use self::decoder::Decoder;
/// Reads framed payloads from a reader of type `R` and turns each one into
/// a value of type `T` using a deserializer of type `D`.
pub struct Receiver<T, R, D> {
// Marker only: records the item type `T` without storing a `T`.
_marker: PhantomData<fn() -> T>,
// Deserializer applied to every received payload.
deserializer: D,
// Framed reader over the `StatIO`-wrapped reader, decoding with `Decoder`.
framed: FramedRead<StatIO<R>, Decoder>,
}
impl<T> Receiver<T, (), ()> {
    /// Returns a fresh [`Builder`] that has neither a reader nor a
    /// deserializer set yet.
    #[inline]
    pub fn builder() -> Builder<T, (), ()> {
        Builder::<T, (), ()>::new()
    }
}
#[cfg(feature = "bincode")]
impl<T, R> Receiver<T, R, crate::serdes::Bincode> {
    /// Creates a receiver over `reader` that deserializes with a newly
    /// constructed `Bincode` deserializer.
    ///
    /// Only compiled when the `bincode` feature is enabled.
    #[inline]
    pub fn new(reader: impl IntoRead<R>) -> Self {
        let deserializer = crate::serdes::Bincode::new();
        Self::with_deserializer(reader, deserializer)
    }
}
impl<T, R, D> Receiver<T, R, D> {
#[inline]
pub fn with_deserializer(
reader: impl IntoRead<R>,
deserializer: D,
) -> Self {
Receiver::builder()
.reader(reader)
.deserializer(deserializer)
.build()
}
#[inline]
pub fn deserializer(&self) -> &D {
&self.deserializer
}
#[inline]
pub fn deserializer_mut(&mut self) -> &mut D {
&mut self.deserializer
}
#[inline]
pub fn config(&self) -> &Config {
self.framed.decoder().config()
}
#[inline]
pub fn incoming(&mut self) -> Incoming<'_, T, R, D> {
Incoming { receiver: self }
}
#[inline]
#[cfg(feature = "statistics")]
pub fn statistics(&self) -> &Statistics {
&self.framed.reader().statistics
}
}
impl<T, R, D> Receiver<T, R, D>
where
    R: Container,
{
    /// Returns a shared reference to the value contained in the reader,
    /// as exposed by `Container::get_ref`.
    #[inline]
    pub fn get(&self) -> &R::Inner {
        let stat_io = self.framed.reader();
        stat_io.inner.get_ref()
    }

    /// Returns an exclusive reference to the value contained in the
    /// reader, as exposed by `Container::get_mut`.
    #[inline]
    pub fn get_mut(&mut self) -> &mut R::Inner {
        let stat_io = self.framed.reader_mut();
        stat_io.inner.get_mut()
    }

    /// Consumes the receiver and returns the value contained in the
    /// reader, as exposed by `Container::into_inner`.
    #[inline]
    pub fn into_reader(self) -> R::Inner {
        let stat_io = self.framed.into_reader();
        stat_io.inner.into_inner()
    }
}
impl<T, R, D> Receiver<T, R, D>
where
    R: AsyncRead + Unpin,
    D: Deserializer<T>,
{
    /// Asynchronously receives the next item.
    ///
    /// Awaits the next payload from the framed reader, increments the
    /// total-items counter and then deserializes the payload into a `T`.
    ///
    /// # Errors
    ///
    /// Returns the framed reader's error converted with `RecvError::from`
    /// if no payload could be obtained, or `RecvError::Serde` if the
    /// deserializer rejects the payload.
    pub async fn recv(
        &mut self,
    ) -> Result<T, RecvError<D::Error, R::Error>> {
        let frame = self.framed.next().await;
        let mut payload = frame.map_err(RecvError::from)?;

        // Only payloads that were read successfully are counted; the
        // counter is bumped before deserialization is attempted.
        let stat_io = self.framed.reader_mut();
        stat_io.statistics.inc_total_items();

        let decoded = self.deserializer.deserialize(&mut payload);
        decoded.map_err(RecvError::Serde)
    }
}
impl<T, R, D> Receiver<T, R, D>
where
    R: Read,
    D: Deserializer<T>,
{
    /// Receives the next item using the blocking `Read` interface.
    ///
    /// Obtains the next payload from the framed reader, increments the
    /// total-items counter and then deserializes the payload into a `T`.
    ///
    /// # Errors
    ///
    /// Returns the framed reader's error converted with `RecvError::from`
    /// if no payload could be obtained, or `RecvError::Serde` if the
    /// deserializer rejects the payload.
    #[inline]
    pub fn recv_blocking(
        &mut self,
    ) -> Result<T, RecvError<D::Error, R::Error>> {
        let frame = self.framed.next();
        let mut payload = frame.map_err(RecvError::from)?;

        // Only payloads that were read successfully are counted; the
        // counter is bumped before deserialization is attempted.
        let stat_io = self.framed.reader_mut();
        stat_io.statistics.inc_total_items();

        let decoded = self.deserializer.deserialize(&mut payload);
        decoded.map_err(RecvError::Serde)
    }
}
// No `T: Debug` bound: `T` is never stored, so only the reader and the
// deserializer have to be printable.
impl<T, R, D> fmt::Debug for Receiver<T, R, D>
where
    R: fmt::Debug,
    D: fmt::Debug,
{
    /// Prints the reader, the deserializer and the active config, and
    /// marks the output as non-exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Receiver");
        out.field("reader", self.framed.reader());
        out.field("deserializer", &self.deserializer);
        out.field("config", self.config());
        out.finish_non_exhaustive()
    }
}
#[derive(Debug)]
pub struct Incoming<'a, T, R, D> {
receiver: &'a mut Receiver<T, R, D>,
}
impl<T, R, D> Iterator for Incoming<'_, T, R, D>
where
    R: Read,
    D: Deserializer<T>,
{
    type Item = Result<T, RecvError<D::Error, R::Error>>;

    /// Performs one blocking receive and yields its result.
    ///
    /// This never returns `None`: every call produces `Some`, with
    /// failures surfaced as `Some(Err(..))`.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let outcome = self.receiver.recv_blocking();
        Some(outcome)
    }
}
impl<T, R, D> Incoming<'_, T, R, D>
where
    R: AsyncRead + Unpin,
    D: Deserializer<T>,
{
    /// Asynchronously receives the next item by delegating to the
    /// borrowed receiver's `recv`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `recv` returns.
    #[inline]
    pub async fn next_async(
        &mut self,
    ) -> Result<T, RecvError<D::Error, R::Error>> {
        let receiver = &mut *self.receiver;
        receiver.recv().await
    }
}
/// Step-by-step constructor for a [`Receiver`].
///
/// `R` and `D` start out as `()` and are replaced by the real reader and
/// deserializer types as they are supplied.
#[must_use = "builders don't do anything unless you `.build()` them"]
pub struct Builder<T, R, D> {
    // Marker only: records the item type `T` without storing a `T`.
    _marker: PhantomData<fn() -> T>,
    reader: R,
    deserializer: D,
    // `None` means "use the default config" when the receiver is built.
    config: Option<Config>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would add
// a `T: Clone` bound even though no `T` is stored, so builders for
// non-`Clone` item types could not be cloned.
impl<T, R, D> Clone for Builder<T, R, D>
where
    R: Clone,
    D: Clone,
{
    fn clone(&self) -> Self {
        Self {
            _marker: PhantomData,
            reader: self.reader.clone(),
            deserializer: self.deserializer.clone(),
            config: self.config.clone(),
        }
    }
}
impl<T> Builder<T, (), ()> {
#[inline]
pub fn new() -> Self {
Builder {
_marker: PhantomData,
reader: (),
deserializer: (),
config: None,
}
}
}
impl<T> Default for Builder<T, (), ()> {
#[inline]
fn default() -> Self {
Self::new()
}
}
impl<T, D> Builder<T, (), D> {
    /// Sets the reader, converting `reader` with `IntoRead::into_read`.
    ///
    /// Only callable while the reader slot is still `()`; the deserializer
    /// and config already set are carried over unchanged.
    #[inline]
    pub fn reader<R>(
        self,
        reader: impl IntoRead<R>,
    ) -> Builder<T, R, D> {
        let Self { deserializer, config, .. } = self;
        let reader = reader.into_read();
        Builder { _marker: PhantomData, reader, deserializer, config }
    }
}
impl<T, R> Builder<T, R, ()> {
    /// Sets the deserializer.
    ///
    /// Only callable while the deserializer slot is still `()`; the reader
    /// and config already set are carried over unchanged.
    #[inline]
    pub fn deserializer<D>(
        self,
        deserializer: D,
    ) -> Builder<T, R, D> {
        let Self { reader, config, .. } = self;
        Builder { _marker: PhantomData, reader, deserializer, config }
    }
}
impl<T, R, D> Builder<T, R, D> {
#[inline]
pub fn config(mut self, config: Config) -> Self {
self.config = Some(config);
self
}
#[inline]
pub fn build(self) -> Receiver<T, R, D> {
let Self { _marker, config, deserializer, reader } = self;
Receiver {
_marker: PhantomData,
deserializer,
framed: FramedRead::new(
StatIO::new(reader),
Decoder::with_config(config.unwrap_or_default()),
),
}
}
}
// No `T: Debug` bound: `T` is never stored, so only the reader and the
// deserializer have to be printable.
impl<T, R, D> fmt::Debug for Builder<T, R, D>
where
    R: fmt::Debug,
    D: fmt::Debug,
{
    /// Prints the reader, the deserializer and the optional config.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { reader, deserializer, config, .. } = self;
        let mut out = f.debug_struct("Builder");
        out.field("reader", reader);
        out.field("deserializer", deserializer);
        out.field("config", config);
        out.finish()
    }
}