use crate::{
ConnectedPoint,
transport::{Transport, TransportError, ListenerEvent}
};
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{pin::Pin, task::Context, task::Poll};
/// Wrapper that pairs an inner transport with a mapping function.
///
/// Values are built through `Map::new`; the `Transport` implementation for
/// this type passes each successful output of `transport`, together with the
/// `ConnectedPoint` of the connection, to `fun`.
#[derive(Clone, Copy, Debug)]
pub struct Map<T, F> {
    /// The wrapped transport whose output is being mapped.
    transport: T,
    /// The function applied to every output of `transport`.
    fun: F,
}
impl<T, F> Map<T, F> {
pub(crate) fn new(transport: T, fun: F) -> Self {
Map { transport, fun }
}
}
impl<T, F, D> Transport for Map<T, F>
where
T: Transport,
F: FnOnce(T::Output, ConnectedPoint) -> D + Clone
{
type Output = D;
type Error = T::Error;
type Listener = MapStream<T::Listener, F>;
type ListenerUpgrade = MapFuture<T::ListenerUpgrade, F>;
type Dial = MapFuture<T::Dial, F>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
let stream = self.transport.listen_on(addr)?;
Ok(MapStream { stream, fun: self.fun })
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let future = self.transport.dial(addr.clone())?;
let p = ConnectedPoint::Dialer { address: addr };
Ok(MapFuture { inner: future, args: Some((self.fun, p)) })
}
}
/// Listener stream wrapper used by `Map`.
///
/// `stream` is the wrapped listener (pinned through `pin_project`), and `fun`
/// is the mapping function; the `Stream` implementation clones `fun` into a
/// `MapFuture` for every `ListenerEvent::Upgrade` it yields.
#[pin_project::pin_project]
#[derive(Clone, Debug)]
pub struct MapStream<T, F> { #[pin] stream: T, fun: F }
impl<T, F, A, B, X, E> Stream for MapStream<T, F>
where
T: TryStream<Ok = ListenerEvent<X, E>, Error = E>,
X: TryFuture<Ok = A>,
F: FnOnce(A, ConnectedPoint) -> B + Clone
{
type Item = Result<ListenerEvent<MapFuture<X, F>, E>, E>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
match TryStream::try_poll_next(this.stream, cx) {
Poll::Ready(Some(Ok(event))) => {
let event = match event {
ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } => {
let point = ConnectedPoint::Listener {
local_addr: local_addr.clone(),
send_back_addr: remote_addr.clone()
};
ListenerEvent::Upgrade {
upgrade: MapFuture {
inner: upgrade,
args: Some((this.fun.clone(), point))
},
local_addr,
remote_addr
}
}
ListenerEvent::NewAddress(a) => ListenerEvent::NewAddress(a),
ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a),
ListenerEvent::Error(e) => ListenerEvent::Error(e),
};
Poll::Ready(Some(Ok(event)))
}
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending
}
}
}
/// Future wrapper used by `Map` for both dialing and listener upgrades.
///
/// When `inner` resolves successfully, the `Future` implementation takes the
/// function and `ConnectedPoint` out of `args` and calls the function with
/// the resolved value.
#[pin_project::pin_project]
#[derive(Clone, Debug)]
pub struct MapFuture<T, F> {
// The wrapped future, pinned through `pin_project`.
#[pin]
inner: T,
// Mapping function and endpoint; set to `None` once they have been consumed
// by a successful poll.
args: Option<(F, ConnectedPoint)>
}
/// Resolves to the result of applying the stored function to the successful
/// output of the wrapped future; errors of the wrapped future pass through.
impl<T, A, F, B> Future for MapFuture<T, F>
where
    T: TryFuture<Ok = A>,
    F: FnOnce(A, ConnectedPoint) -> B,
{
    type Output = Result<B, T::Error>;

    /// Polls the wrapped future once.
    ///
    /// # Panics
    ///
    /// Panics if the wrapped future yields a successful value after the
    /// stored function has already been consumed by an earlier poll.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let projected = self.project();
        match TryFuture::try_poll(projected.inner, cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Ready(Ok(value)) => {
                // `fun` is `FnOnce`, so it is moved out of the option and can
                // only ever be called a single time.
                let (fun, endpoint) = projected
                    .args
                    .take()
                    .expect("MapFuture has already finished.");
                Poll::Ready(Ok(fun(value, endpoint)))
            }
        }
    }
}