use crate::transport::{Transport, TransportError};
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::error;
#[derive(Debug, Copy, Clone)]
pub struct MapErr<T, F> {
transport: T,
map: F,
}
impl<T, F> MapErr<T, F> {
#[inline]
pub(crate) fn new(transport: T, map: F) -> MapErr<T, F> {
MapErr { transport, map }
}
}
impl<T, F, TErr> Transport for MapErr<T, F>
where
T: Transport,
F: FnOnce(T::Error) -> TErr + Clone,
TErr: error::Error,
{
type Output = T::Output;
type Error = TErr;
type Listener = MapErrListener<T, F>;
type ListenerUpgrade = MapErrListenerUpgrade<T, F>;
type Dial = MapErrDial<T, F>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
let map = self.map;
match self.transport.listen_on(addr) {
Ok((stream, listen_addr)) => {
let stream = MapErrListener { inner: stream, map };
Ok((stream, listen_addr))
}
Err(err) => Err(err.map(move |err| map(err))),
}
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let map = self.map;
match self.transport.dial(addr) {
Ok(future) => Ok(MapErrDial { inner: future, map: Some(map) }),
Err(err) => Err(err.map(move |err| map(err))),
}
}
#[inline]
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.nat_traversal(server, observed)
}
}
pub struct MapErrListener<T, F>
where T: Transport {
inner: T::Listener,
map: F,
}
impl<T, F, TErr> Stream for MapErrListener<T, F>
where T: Transport,
F: FnOnce(T::Error) -> TErr + Clone,
TErr: error::Error,
{
type Item = (MapErrListenerUpgrade<T, F>, Multiaddr);
type Error = TErr;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.inner.poll() {
Ok(Async::Ready(Some((value, addr)))) => Ok(Async::Ready(
Some((MapErrListenerUpgrade { inner: value, map: Some(self.map.clone()) }, addr)))),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err((self.map.clone())(err)),
}
}
}
pub struct MapErrListenerUpgrade<T, F>
where T: Transport {
inner: T::ListenerUpgrade,
map: Option<F>,
}
impl<T, F, TErr> Future for MapErrListenerUpgrade<T, F>
where T: Transport,
F: FnOnce(T::Error) -> TErr,
{
type Item = T::Output;
type Error = TErr;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner.poll() {
Ok(Async::Ready(value)) => {
Ok(Async::Ready(value))
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => {
let map = self.map.take().expect("poll() called again after error");
Err(map(err))
}
}
}
}
pub struct MapErrDial<T, F>
where T: Transport
{
inner: T::Dial,
map: Option<F>,
}
impl<T, F, TErr> Future for MapErrDial<T, F>
where T: Transport,
F: FnOnce(T::Error) -> TErr,
{
type Item = T::Output;
type Error = TErr;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner.poll() {
Ok(Async::Ready(value)) => {
Ok(Async::Ready(value))
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => {
let map = self.map.take().expect("poll() called again after error");
Err(map(err))
}
}
}
}