#![warn(missing_docs)]
#![warn(missing_copy_implementations, missing_debug_implementations)]
#![warn(unused_qualifications, unused_results)]
#![warn(future_incompatible)]
#![warn(unused)]
#![forbid(broken_intra_doc_links)]
use std::{future::Future, pin::Pin};
use dialectic::{
backend::{self, By, Choice, Mut, Receive, Ref, Transmit, Val},
Chan,
};
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite};
mod error;
pub use error::*;
#[doc(no_inline)]
pub use tokio_util::codec;
pub trait Serializer {
type Error;
type Output;
fn serialize<T: Serialize>(&mut self, item: &T) -> Result<Self::Output, Self::Error>;
}
pub trait Deserializer<Input> {
type Error;
fn deserialize<T: for<'a> Deserialize<'a>>(&mut self, src: &Input) -> Result<T, Self::Error>;
}
pub fn symmetrical<F, E, W, R>(
format: F,
encoding: E,
writer: W,
reader: R,
) -> (Sender<F, E, W>, Receiver<F, E, R>)
where
F: Serializer + Deserializer<<E as Decoder>::Item> + Clone,
E: Encoder<<F as Serializer>::Output> + Decoder + Clone,
W: AsyncWrite,
R: AsyncRead,
{
(
Sender::new(format.clone(), encoding.clone(), writer),
Receiver::new(format, encoding, reader),
)
}
pub type SymmetricalChan<S, F, E, W, R> = Chan<S, Sender<F, E, W>, Receiver<F, E, R>>;
pub fn symmetrical_with_capacity<F, E, W, R>(
format: F,
encoding: E,
writer: W,
reader: R,
capacity: usize,
) -> (Sender<F, E, W>, Receiver<F, E, R>)
where
F: Serializer + Deserializer<<E as Decoder>::Item> + Clone,
E: Encoder<<F as Serializer>::Output> + Decoder + Clone,
W: AsyncWrite,
R: AsyncRead,
{
(
Sender::new(format.clone(), encoding.clone(), writer),
Receiver::with_capacity(format, encoding, reader, capacity),
)
}
#[derive(Debug)]
pub struct Sender<F, E, W> {
serializer: F,
framed_write: FramedWrite<W, E>,
}
impl<F: Serializer, E: Encoder<F::Output>, W: AsyncWrite> Sender<F, E, W> {
pub fn new(serializer: F, encoder: E, writer: W) -> Self {
Sender {
serializer,
framed_write: FramedWrite::new(writer, encoder),
}
}
}
impl<F, E, W> backend::Transmitter for Sender<F, E, W>
where
F: Serializer + Unpin + Send,
F::Output: Send,
F::Error: Send,
E: Encoder<F::Output> + Send,
W: AsyncWrite + Unpin + Send,
{
type Error = SendError<F, E>;
fn send_choice<'async_lifetime, const LENGTH: usize>(
&'async_lifetime mut self,
choice: Choice<LENGTH>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>> {
Box::pin(async move {
let serialized = self
.serializer
.serialize(&choice)
.map_err(SendError::Serialize)?;
self.framed_write
.send(serialized)
.await
.map_err(SendError::Encode)?;
Ok(())
})
}
}
impl<T, F, E, W> Transmit<T, Val> for Sender<F, E, W>
where
T: Serialize + Send + 'static,
F: Serializer + Unpin + Send,
F::Output: Send,
F::Error: Send,
E: Encoder<F::Output> + Send,
W: AsyncWrite + Unpin + Send,
{
fn send<'a, 'async_lifetime>(
&'async_lifetime mut self,
message: <T as By<'a, Val>>::Type,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
where
'a: 'async_lifetime,
{
Box::pin(async move {
let serialized = self
.serializer
.serialize(&message)
.map_err(SendError::Serialize)?;
self.framed_write
.send(serialized)
.await
.map_err(SendError::Encode)?;
Ok(())
})
}
}
impl<T, F, E, W> Transmit<T, Ref> for Sender<F, E, W>
where
T: Serialize + Sync,
F: Serializer + Unpin + Send,
F::Output: Send,
F::Error: Send,
E: Encoder<F::Output> + Send,
W: AsyncWrite + Unpin + Send,
{
fn send<'a, 'async_lifetime>(
&'async_lifetime mut self,
message: <T as By<'a, Ref>>::Type,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
where
'a: 'async_lifetime,
{
Box::pin(async move {
let serialized = self
.serializer
.serialize(message)
.map_err(SendError::Serialize)?;
self.framed_write
.send(serialized)
.await
.map_err(SendError::Encode)?;
Ok(())
})
}
}
impl<T, F, E, W> Transmit<T, Mut> for Sender<F, E, W>
where
T: Serialize + Sync,
F: Serializer + Unpin + Send,
F::Output: Send,
F::Error: Send,
E: Encoder<F::Output> + Send,
W: AsyncWrite + Unpin + Send,
{
fn send<'a, 'async_lifetime>(
&'async_lifetime mut self,
message: <T as By<'a, Mut>>::Type,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
where
'a: 'async_lifetime,
{
<Self as Transmit<T, Ref>>::send(self, &*message)
}
}
#[derive(Debug)]
pub struct Receiver<F, D, R> {
deserializer: F,
framed_read: FramedRead<R, D>,
}
impl<F: Deserializer<D::Item>, D: Decoder, R: AsyncRead> Receiver<F, D, R> {
pub fn new(deserializer: F, decoder: D, reader: R) -> Self {
Receiver {
deserializer,
framed_read: FramedRead::new(reader, decoder),
}
}
pub fn with_capacity(deserializer: F, decoder: D, reader: R, capacity: usize) -> Self {
Receiver {
deserializer,
framed_read: FramedRead::with_capacity(reader, decoder, capacity),
}
}
}
impl<F, D, R> backend::Receiver for Receiver<F, D, R>
where
F: Deserializer<D::Item> + Unpin + Send,
D: Decoder + Send,
R: AsyncRead + Unpin + Send,
{
type Error = RecvError<F, D>;
fn recv_choice<'async_lifetime, const LENGTH: usize>(
&'async_lifetime mut self,
) -> Pin<Box<dyn Future<Output = Result<Choice<LENGTH>, Self::Error>> + Send + 'async_lifetime>>
{
<Self as backend::Receive<Choice<LENGTH>>>::recv(self)
}
}
impl<T, F, D, R> Receive<T> for Receiver<F, D, R>
where
T: for<'a> Deserialize<'a>,
F: Deserializer<D::Item> + Unpin + Send,
D: Decoder + Send,
R: AsyncRead + Unpin + Send,
{
fn recv<'async_lifetime>(
&'async_lifetime mut self,
) -> Pin<Box<dyn Future<Output = Result<T, Self::Error>> + Send + 'async_lifetime>> {
Box::pin(async move {
let unframed = self
.framed_read
.next()
.await
.ok_or(RecvError::Closed)?
.map_err(RecvError::Decode)?;
self.deserializer
.deserialize(&unframed)
.map_err(RecvError::Deserialize)
})
}
}