use crate::{InboundUpgrade, OutboundUpgrade, ConnectedPoint};
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{error, fmt};
use std::time::Duration;
use tokio_io::{AsyncRead, AsyncWrite};
pub mod and_then;
pub mod boxed;
pub mod choice;
pub mod dummy;
pub mod map;
pub mod map_err;
pub mod memory;
pub mod timeout;
pub mod upgrade;
mod optional;
pub use self::choice::OrTransport;
pub use self::memory::MemoryTransport;
pub use self::optional::OptionalTransport;
pub use self::upgrade::Upgrade;
pub trait Transport {
type Output;
type Error: error::Error;
type Listener: Stream<Item = ListenerEvent<Self::ListenerUpgrade>, Error = Self::Error>;
type ListenerUpgrade: Future<Item = Self::Output, Error = Self::Error>;
type Dial: Future<Item = Self::Output, Error = Self::Error>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>>
where
Self: Sized;
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
where
Self: Sized;
fn boxed(self) -> boxed::Boxed<Self::Output, Self::Error>
where Self: Sized + Clone + Send + Sync + 'static,
Self::Dial: Send + 'static,
Self::Listener: Send + 'static,
Self::ListenerUpgrade: Send + 'static,
{
boxed::boxed(self)
}
fn map<F, O>(self, map: F) -> map::Map<Self, F>
where
Self: Sized,
F: FnOnce(Self::Output, ConnectedPoint) -> O + Clone
{
map::Map::new(self, map)
}
fn map_err<F, TNewErr>(self, map_err: F) -> map_err::MapErr<Self, F>
where
Self: Sized,
F: FnOnce(Self::Error) -> TNewErr + Clone
{
map_err::MapErr::new(self, map_err)
}
fn or_transport<T>(self, other: T) -> OrTransport<Self, T>
where
Self: Sized,
{
OrTransport::new(self, other)
}
fn with_upgrade<U, O, E>(self, upgrade: U) -> Upgrade<Self, U>
where
Self: Sized,
Self::Output: AsyncRead + AsyncWrite,
U: InboundUpgrade<Self::Output, Output = O, Error = E>,
U: OutboundUpgrade<Self::Output, Output = O, Error = E>
{
Upgrade::new(self, upgrade)
}
fn and_then<C, F, O>(self, upgrade: C) -> and_then::AndThen<Self, C>
where
Self: Sized,
C: FnOnce(Self::Output, ConnectedPoint) -> F + Clone,
F: IntoFuture<Item = O>
{
and_then::AndThen::new(self, upgrade)
}
fn with_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
where
Self: Sized,
{
timeout::TransportTimeout::new(self, timeout)
}
fn with_outbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
where
Self: Sized,
{
timeout::TransportTimeout::with_outgoing_timeout(self, timeout)
}
fn with_inbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
where
Self: Sized,
{
timeout::TransportTimeout::with_ingoing_timeout(self, timeout)
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum ListenerEvent<T> {
NewAddress(Multiaddr),
Upgrade {
upgrade: T,
listen_addr: Multiaddr,
remote_addr: Multiaddr
},
AddressExpired(Multiaddr)
}
impl<T> ListenerEvent<T> {
pub fn map<U>(self, f: impl FnOnce(T) -> U) -> ListenerEvent<U> {
match self {
ListenerEvent::Upgrade { upgrade, listen_addr, remote_addr } => {
ListenerEvent::Upgrade { upgrade: f(upgrade), listen_addr, remote_addr }
}
ListenerEvent::NewAddress(a) => ListenerEvent::NewAddress(a),
ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a)
}
}
pub fn is_upgrade(&self) -> bool {
if let ListenerEvent::Upgrade {..} = self {
true
} else {
false
}
}
pub fn into_upgrade(self) -> Option<(T, Multiaddr)> {
if let ListenerEvent::Upgrade { upgrade, remote_addr, .. } = self {
Some((upgrade, remote_addr))
} else {
None
}
}
pub fn is_new_address(&self) -> bool {
if let ListenerEvent::NewAddress(_) = self {
true
} else {
false
}
}
pub fn into_new_address(self) -> Option<Multiaddr> {
if let ListenerEvent::NewAddress(a) = self {
Some(a)
} else {
None
}
}
pub fn is_address_expired(&self) -> bool {
if let ListenerEvent::AddressExpired(_) = self {
true
} else {
false
}
}
pub fn into_address_expired(self) -> Option<Multiaddr> {
if let ListenerEvent::AddressExpired(a) = self {
Some(a)
} else {
None
}
}
}
#[derive(Debug, Clone)]
pub enum TransportError<TErr> {
MultiaddrNotSupported(Multiaddr),
Other(TErr),
}
impl<TErr> TransportError<TErr> {
pub fn map<TNewErr>(self, map: impl FnOnce(TErr) -> TNewErr) -> TransportError<TNewErr> {
match self {
TransportError::MultiaddrNotSupported(addr) => TransportError::MultiaddrNotSupported(addr),
TransportError::Other(err) => TransportError::Other(map(err)),
}
}
}
impl<TErr> fmt::Display for TransportError<TErr>
where TErr: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TransportError::MultiaddrNotSupported(addr) => write!(f, "Multiaddr is not supported: {}", addr),
TransportError::Other(err) => write!(f, "{}", err),
}
}
}
impl<TErr> error::Error for TransportError<TErr>
where TErr: error::Error + 'static,
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
TransportError::MultiaddrNotSupported(_) => None,
TransportError::Other(err) => Some(err),
}
}
}