use crate::{nodes::raw_swarm::ConnectedPoint, transport::Transport, transport::TransportError};
use futures::{prelude::*, try_ready};
use multiaddr::Multiaddr;
#[derive(Debug, Copy, Clone)]
pub struct Map<T, F> { transport: T, fun: F }
impl<T, F> Map<T, F> {
#[inline]
pub(crate) fn new(transport: T, fun: F) -> Self {
Map { transport, fun }
}
}
impl<T, F, D> Transport for Map<T, F>
where
T: Transport,
F: FnOnce(T::Output, ConnectedPoint) -> D + Clone
{
type Output = D;
type Error = T::Error;
type Listener = MapStream<T::Listener, F>;
type ListenerUpgrade = MapFuture<T::ListenerUpgrade, F>;
type Dial = MapFuture<T::Dial, F>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
let (stream, listen_addr) = self.transport.listen_on(addr)?;
let stream = MapStream {
stream,
listen_addr: listen_addr.clone(),
fun: self.fun
};
Ok((stream, listen_addr))
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let future = self.transport.dial(addr.clone())?;
let p = ConnectedPoint::Dialer { address: addr };
Ok(MapFuture {
inner: future,
args: Some((self.fun, p))
})
}
#[inline]
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.nat_traversal(server, observed)
}
}
#[derive(Clone, Debug)]
pub struct MapStream<T, F> { stream: T, listen_addr: Multiaddr, fun: F }
impl<T, F, A, B, X> Stream for MapStream<T, F>
where
T: Stream<Item = (X, Multiaddr)>,
X: Future<Item = A>,
F: FnOnce(A, ConnectedPoint) -> B + Clone
{
type Item = (MapFuture<X, F>, Multiaddr);
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.stream.poll()? {
Async::Ready(Some((future, addr))) => {
let f = self.fun.clone();
let p = ConnectedPoint::Listener {
listen_addr: self.listen_addr.clone(),
send_back_addr: addr.clone()
};
let future = MapFuture {
inner: future,
args: Some((f, p))
};
Ok(Async::Ready(Some((future, addr))))
}
Async::Ready(None) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady)
}
}
}
#[derive(Clone, Debug)]
pub struct MapFuture<T, F> {
inner: T,
args: Option<(F, ConnectedPoint)>
}
impl<T, A, F, B> Future for MapFuture<T, F>
where
T: Future<Item = A>,
F: FnOnce(A, ConnectedPoint) -> B
{
type Item = B;
type Error = T::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let item = try_ready!(self.inner.poll());
let (f, a) = self.args.take().expect("MapFuture has already finished.");
Ok(Async::Ready(f(item, a)))
}
}