volans-core 0.2.0

Core trait and struct for Volans networking framework
Documentation
use crate::{ConnectedPoint, Listener, ListenerEvent, Multiaddr, Transport, TransportError};
use futures::TryFuture;
use std::{
    marker::PhantomData,
    pin::Pin,
    task::{Context, Poll},
};

#[derive(Debug, Copy, Clone)]
pub struct Map<T, F> {
    transport: T,
    map: F,
}

impl<T, TMap> Map<T, TMap> {
    pub(crate) fn new(transport: T, map: TMap) -> Self {
        Map { transport, map }
    }

    pub fn inner(&self) -> &T {
        &self.transport
    }
}

impl<T, D, TMap> Transport for Map<T, TMap>
where
    T: Transport,
    TMap: FnOnce(T::Output, ConnectedPoint) -> D + Clone,
{
    type Output = D;
    type Error = T::Error;
    type Dial = MapFuture<T::Dial, TMap>;
    type Incoming = MapFuture<T::Incoming, TMap>;
    type Listener = MapListener<T, TMap>;

    fn dial(&self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
        match self.transport.dial(addr.clone()) {
            Ok(dial) => Ok(MapFuture {
                inner: dial,
                args: Some((self.map.clone(), ConnectedPoint::Dialer { addr })),
            }),
            Err(err) => Err(err),
        }
    }

    fn listen(&self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
        match self.transport.listen(addr) {
            Ok(listener) => Ok(MapListener {
                inner: listener,
                fun: self.map.clone(),
                _phantom: PhantomData,
            }),
            Err(err) => Err(err),
        }
    }
}

#[pin_project::pin_project]
#[derive(Clone, Debug)]
pub struct MapListener<T, TMap>
where
    T: Transport,
{
    #[pin]
    inner: T::Listener,
    fun: TMap,
    _phantom: PhantomData<T>,
}

impl<D, T, TMap> Listener for MapListener<T, TMap>
where
    T: Transport,
    TMap: FnOnce(T::Output, ConnectedPoint) -> D + Clone,
{
    type Output = D;
    type Error = T::Error;
    type Upgrade = MapFuture<T::Incoming, TMap>;

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.inner.poll_close(cx)
    }

    fn poll_event(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<ListenerEvent<Self::Upgrade, Self::Error>> {
        let this = self.project();
        this.inner.poll_event(cx).map(|event| match event {
            ListenerEvent::Incoming {
                local_addr,
                remote_addr,
                upgrade,
            } => ListenerEvent::Incoming {
                local_addr: local_addr.clone(),
                remote_addr: remote_addr.clone(),
                upgrade: MapFuture {
                    inner: upgrade,
                    args: Some((
                        this.fun.clone(),
                        ConnectedPoint::Listener {
                            local_addr,
                            remote_addr,
                        },
                    )),
                },
            },
            ListenerEvent::Closed(cause) => ListenerEvent::Closed(cause),
            ListenerEvent::NewAddress(addr) => ListenerEvent::NewAddress(addr),
            ListenerEvent::AddressExpired(addr) => ListenerEvent::AddressExpired(addr),
            ListenerEvent::Error(err) => ListenerEvent::Error(err),
        })
    }
}

#[pin_project::pin_project]
#[derive(Debug)]
pub struct MapFuture<T, F> {
    #[pin]
    inner: T,
    args: Option<(F, ConnectedPoint)>,
}

impl<T, A, F, B> Future for MapFuture<T, F>
where
    T: TryFuture<Ok = A>,
    F: FnOnce(A, ConnectedPoint) -> B,
{
    type Output = Result<B, T::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let item = match TryFuture::try_poll(this.inner, cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(Ok(v)) => v,
            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
        };
        let (f, a) = this.args.take().expect("MapFuture has already finished.");
        Poll::Ready(Ok(f(item, a)))
    }
}