use core::borrow::Borrow;
use core::fmt;
use core::marker::PhantomData;
use alloc::vec::Vec;
use crate::error::SendError;
use crate::io::framed::FramedWrite;
use crate::io::sink::{AsyncSinkExt, SinkExt};
use crate::io::{AsyncWrite, Container, IntoWrite, Write};
use crate::serdes::Serializer;
use crate::statistics::{StatIO, Statistics};
mod config;
mod encoder;
pub use self::config::Config;
pub use self::encoder::Encoder;
pub struct Sender<T, W, S> {
_marker: PhantomData<fn() -> T>,
serializer: S,
framed: FramedWrite<StatIO<W>, Encoder>,
}
impl<T> Sender<T, (), ()> {
#[inline]
pub fn builder() -> Builder<T, (), ()> {
Builder::new()
}
}
#[cfg(feature = "bincode")]
impl<T, W> Sender<T, W, crate::serdes::Bincode> {
#[inline]
pub fn new(writer: impl IntoWrite<W>) -> Self {
Self::with_serializer(writer, crate::serdes::Bincode::new())
}
}
impl<T, W, S> Sender<T, W, S> {
#[inline]
pub fn with_serializer(
writer: impl IntoWrite<W>,
serializer: S,
) -> Self {
Sender::builder()
.writer(writer)
.serializer(serializer)
.build()
}
#[inline]
pub fn serializer(&self) -> &S {
&self.serializer
}
#[inline]
pub fn serializer_mut(&mut self) -> &mut S {
&mut self.serializer
}
#[inline]
pub fn config(&self) -> &Config {
self.framed.encoder().config()
}
#[inline]
#[cfg(feature = "statistics")]
pub fn statistics(&self) -> &Statistics {
&self.framed.writer().statistics
}
}
impl<T, W, S> Sender<T, W, S>
where
W: Container,
{
#[inline]
pub fn get(&self) -> &W::Inner {
self.framed.writer().inner.get_ref()
}
#[inline]
pub fn get_mut(&mut self) -> &mut W::Inner {
self.framed.writer_mut().inner.get_mut()
}
#[inline]
pub fn into_writer(self) -> W::Inner {
self.framed.into_writer().inner.into_inner()
}
}
impl<T, W, S> Sender<T, W, S>
where
S: Serializer<T>,
{
#[inline]
fn serialize_t<Io>(
&mut self,
t: &T,
) -> Result<Vec<u8>, SendError<S::Error, Io>> {
self.serializer.serialize(t).map_err(SendError::Serde)
}
}
impl<T, W, S> Sender<T, W, S>
where
W: AsyncWrite + Unpin,
S: Serializer<T>,
{
#[inline]
pub async fn send<D>(
&mut self,
data: D,
) -> Result<(), SendError<S::Error, W::Error>>
where
D: Borrow<T>,
{
self._send(data.borrow()).await
}
async fn _send(
&mut self,
data: &T,
) -> Result<(), SendError<S::Error, W::Error>> {
let payload = self.serialize_t(data)?;
self.framed.send(payload).await.map_err(SendError::from)?;
self.framed.writer_mut().statistics.inc_total_items();
Ok(())
}
}
impl<T, W, S> Sender<T, W, S>
where
W: Write,
S: Serializer<T>,
{
#[inline]
pub fn send_blocking<D>(
&mut self,
data: D,
) -> Result<(), SendError<S::Error, W::Error>>
where
D: Borrow<T>,
{
self._send_blocking(data.borrow())
}
fn _send_blocking(
&mut self,
data: &T,
) -> Result<(), SendError<S::Error, W::Error>> {
let payload = self.serialize_t(data)?;
self.framed.send(payload).map_err(SendError::from)?;
self.framed.writer_mut().statistics.inc_total_items();
Ok(())
}
}
impl<T, W, S> fmt::Debug for Sender<T, W, S>
where
W: fmt::Debug,
S: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Sender")
.field("writer", self.framed.writer())
.field("serializer", &self.serializer)
.field("config", &self.config())
.finish_non_exhaustive()
}
}
#[derive(Clone)]
#[must_use = "builders don't do anything unless you `.build()` them"]
pub struct Builder<T, W, S> {
_marker: PhantomData<fn() -> T>,
writer: W,
serializer: S,
config: Option<Config>,
}
impl<T> Builder<T, (), ()> {
#[inline]
pub fn new() -> Self {
Builder {
_marker: PhantomData,
serializer: (),
writer: (),
config: None,
}
}
}
impl<T> Default for Builder<T, (), ()> {
#[inline]
fn default() -> Self {
Self::new()
}
}
impl<T, S> Builder<T, (), S> {
#[inline]
pub fn writer<W>(
self,
writer: impl IntoWrite<W>,
) -> Builder<T, W, S> {
Builder {
_marker: PhantomData,
writer: writer.into_write(),
serializer: self.serializer,
config: self.config,
}
}
}
impl<T, W> Builder<T, W, ()> {
#[inline]
pub fn serializer<S>(self, serializer: S) -> Builder<T, W, S> {
Builder {
_marker: PhantomData,
writer: self.writer,
serializer,
config: self.config,
}
}
}
impl<T, W, S> Builder<T, W, S> {
#[inline]
pub fn config(mut self, config: Config) -> Self {
self.config = Some(config);
self
}
#[inline]
pub fn build(self) -> Sender<T, W, S> {
let Self { _marker, config, serializer, writer } = self;
Sender {
_marker: PhantomData,
serializer,
framed: FramedWrite::new(
StatIO::new(writer),
Encoder::with_config(config.unwrap_or_default()),
),
}
}
}
impl<T, W, S> fmt::Debug for Builder<T, W, S>
where
W: fmt::Debug,
S: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Builder")
.field("writer", &self.writer)
.field("serializer", &self.serializer)
.field("config", &self.config)
.finish()
}
}