use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::{Error, Message, MessageHeader};
pub(crate) mod stream;
pub use stream::StreamTransport;
#[cfg(feature = "tcp")]
pub use stream::TcpStreamInfo;
#[cfg(feature = "unix-stream")]
pub use stream::UnixStreamInfo;
pub(crate) mod unix;
pub use unix::UnixTransport;
#[cfg(feature = "unix-seqpacket")]
pub use unix::UnixSeqpacketInfo;
pub trait Transport: Send + 'static {
type Body: crate::Body;
type Info: Clone + Send + 'static;
type Config: Clone + Default + Send + Sync + 'static;
type ReadHalf: for<'a> ReadHalfType<'a, Body = Self::Body>;
type WriteHalf: for<'a> WriteHalfType<'a, Body = Self::Body>;
#[allow(clippy::needless_lifetimes)]
fn split<'a>(&'a mut self) -> (<Self::ReadHalf as ReadHalfType<'a>>::ReadHalf, <Self::WriteHalf as WriteHalfType<'a>>::WriteHalf);
fn info(&self) -> std::io::Result<Self::Info>;
}
#[derive(Debug)]
pub struct TransportError {
inner: Error,
is_fatal: bool,
}
impl TransportError {
fn new_fatal(inner: impl Into<Error>) -> Self {
Self {
inner: inner.into(),
is_fatal: true,
}
}
fn new_non_fatal(inner: impl Into<Error>) -> Self {
Self {
inner: inner.into(),
is_fatal: false,
}
}
pub fn inner(&self) -> &Error {
&self.inner
}
pub fn into_inner(self) -> Error {
self.inner
}
pub fn is_fatal(&self) -> bool {
self.is_fatal
}
}
impl std::error::Error for TransportError {}
impl std::fmt::Display for TransportError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.inner.fmt(f)
}
}
pub trait ReadHalfType<'a> {
type Body: crate::Body;
type ReadHalf: TransportReadHalf<Body = Self::Body>;
}
pub trait WriteHalfType<'a> {
type Body: crate::Body;
type WriteHalf: TransportWriteHalf<Body = Self::Body>;
}
pub trait TransportReadHalf: Send + Unpin {
type Body: crate::Body;
fn poll_read_msg(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<Message<Self::Body>, TransportError>>;
fn read_msg(&mut self) -> ReadMsg<Self>
where
Self: Unpin,
{
ReadMsg { inner: self }
}
}
pub trait TransportWriteHalf: Send + Unpin {
type Body: crate::Body;
fn poll_write_msg(self: Pin<&mut Self>, context: &mut Context, header: &MessageHeader, body: &Self::Body) -> Poll<Result<(), TransportError>>;
fn write_msg<'c>(&'c mut self, header: &'c MessageHeader, body: &'c Self::Body) -> WriteMsg<Self> {
WriteMsg { inner: self, header, body }
}
}
pub struct ReadMsg<'c, T>
where
T: TransportReadHalf + ?Sized,
{
inner: &'c mut T,
}
pub struct WriteMsg<'c, T>
where
T: TransportWriteHalf + ?Sized,
{
inner: &'c mut T,
header: &'c MessageHeader,
body: &'c T::Body,
}
impl<T> Future for ReadMsg<'_, T>
where
T: TransportReadHalf + ?Sized + Unpin,
{
type Output = Result<Message<T::Body>, TransportError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut *self.get_mut().inner).poll_read_msg(cx)
}
}
impl<T> Future for WriteMsg<'_, T>
where
T: TransportWriteHalf + ?Sized + Unpin,
{
type Output = Result<(), TransportError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let header = self.header;
let body = self.body;
Pin::new(&mut *self.get_mut().inner).poll_write_msg(cx, header, body)
}
}
impl<T> TransportReadHalf for &'_ mut T
where
T: TransportReadHalf + Unpin + ?Sized,
{
type Body = T::Body;
fn poll_read_msg(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<Message<Self::Body>, TransportError>> {
T::poll_read_msg(Pin::new(*self.get_mut()), context)
}
}
impl<T> TransportReadHalf for Box<T>
where
T: TransportReadHalf + Unpin + ?Sized,
{
type Body = T::Body;
fn poll_read_msg(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<Message<Self::Body>, TransportError>> {
T::poll_read_msg(Pin::new(&mut *self.get_mut()), context)
}
}
impl<P> TransportReadHalf for Pin<P>
where
P: std::ops::DerefMut + Send + Unpin,
P::Target: TransportReadHalf,
{
type Body = <P::Target as TransportReadHalf>::Body;
fn poll_read_msg(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<Message<Self::Body>, TransportError>> {
P::Target::poll_read_msg(Pin::new(&mut *self.get_mut()), context)
}
}
impl<T> TransportWriteHalf for &'_ mut T
where
T: TransportWriteHalf + Unpin + ?Sized,
{
type Body = T::Body;
fn poll_write_msg(
self: Pin<&mut Self>,
context: &mut Context,
header: &MessageHeader,
body: &Self::Body,
) -> Poll<Result<(), TransportError>> {
T::poll_write_msg(Pin::new(*self.get_mut()), context, header, body)
}
}
impl<T> TransportWriteHalf for Box<T>
where
T: TransportWriteHalf + Unpin + ?Sized,
{
type Body = T::Body;
fn poll_write_msg(
self: Pin<&mut Self>,
context: &mut Context,
header: &MessageHeader,
body: &Self::Body,
) -> Poll<Result<(), TransportError>> {
T::poll_write_msg(Pin::new(&mut *self.get_mut()), context, header, body)
}
}
impl<P> TransportWriteHalf for Pin<P>
where
P: std::ops::DerefMut + Send + Unpin,
P::Target: TransportWriteHalf,
{
type Body = <P::Target as TransportWriteHalf>::Body;
fn poll_write_msg(self: Pin<&mut Self>, context: &mut Context, header: &MessageHeader, body: &Self::Body) -> Poll<Result<(), TransportError>> {
P::Target::poll_write_msg(Pin::new(&mut *self.get_mut()), context, header, body)
}
}