use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{
error::Error,
fmt,
pin::Pin,
task::{Context, Poll},
};
pub mod and_then;
pub mod choice;
pub mod dummy;
pub mod map;
pub mod map_err;
pub mod memory;
pub mod timeout;
pub mod upgrade;
mod boxed;
mod optional;
use crate::ConnectedPoint;
pub use self::boxed::Boxed;
pub use self::choice::OrTransport;
pub use self::memory::MemoryTransport;
pub use self::optional::OptionalTransport;
pub use self::upgrade::Upgrade;
/// Abstraction for dialing and listening on `Multiaddr` addresses.
///
/// Implementors supply the associated types and the required methods
/// (`listen_on`, `remove_listener`, `dial`, `dial_as_listener`, `poll` and
/// `address_translation`). The remaining methods are provided adapters that
/// consume `self` and wrap it in a combinator type from one of the submodules.
pub trait Transport {
/// Value yielded when a `Dial` or `ListenerUpgrade` future resolves successfully.
type Output;
/// Error produced by the `Dial` and `ListenerUpgrade` futures, carried by
/// `TransportError::Other`, and reported in listener events from `poll`.
type Error: Error;
/// Future delivered with `TransportEvent::Incoming`; resolves to
/// `Result<Self::Output, Self::Error>`.
type ListenerUpgrade: Future<Output = Result<Self::Output, Self::Error>>;
/// Future returned by `dial` and `dial_as_listener`; resolves to
/// `Result<Self::Output, Self::Error>`.
type Dial: Future<Output = Result<Self::Output, Self::Error>>;
/// Requests listening on `addr`.
///
/// On success returns a `ListenerId`, the same identifier type that
/// `remove_listener` accepts and that every `TransportEvent` variant carries.
///
/// # Errors
///
/// Returns a `TransportError` when the request is rejected.
// NOTE(review): presumably the resulting addresses and connections are
// reported asynchronously through `poll` — confirm against implementations.
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>>;
/// Removes the listener identified by `id`.
// NOTE(review): the `bool` presumably reports whether such a listener
// existed — confirm against implementations.
fn remove_listener(&mut self, id: ListenerId) -> bool;
/// Starts dialing `addr`, returning a future that resolves to
/// `Self::Output` or `Self::Error`.
///
/// # Errors
///
/// Returns a `TransportError` when no dial future can be created.
fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>;
/// Variant of `dial` with the same parameters and return type.
///
/// # Errors
///
/// Returns a `TransportError` when no dial future can be created.
// NOTE(review): the name suggests the local node takes the listener role on
// the resulting connection — confirm the intended semantics.
fn dial_as_listener(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>>;
/// Polls the transport for its next `TransportEvent`.
///
/// Returns `Poll::Ready(event)` when an event is available; otherwise
/// `Poll::Pending`.
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>>;
/// Optionally derives a `Multiaddr` from a `listen` address and an
/// `observed` address; `None` when no address is produced.
// NOTE(review): presumably used to turn an address observed by a remote into
// an externally reachable listen address — confirm.
fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
/// Wraps this transport via `boxed::boxed`, yielding a `Boxed` that is
/// generic only over `Self::Output`.
///
/// Requires the transport and its futures to be `Send + 'static` and the
/// error to be `Send + Sync`.
fn boxed(self) -> boxed::Boxed<Self::Output>
where
Self: Sized + Send + Unpin + 'static,
Self::Dial: Send + 'static,
Self::ListenerUpgrade: Send + 'static,
Self::Error: Send + Sync,
{
boxed::boxed(self)
}
/// Wraps this transport in a `map::Map` built from `self` and `f`, where
/// `f` takes the transport's output together with a `ConnectedPoint` and
/// returns a new output `O`.
fn map<F, O>(self, f: F) -> map::Map<Self, F>
where
Self: Sized,
F: FnOnce(Self::Output, ConnectedPoint) -> O,
{
map::Map::new(self, f)
}
/// Wraps this transport in a `map_err::MapErr` built from `self` and `f`,
/// where `f` converts `Self::Error` into a new error type `E`.
fn map_err<F, E>(self, f: F) -> map_err::MapErr<Self, F>
where
Self: Sized,
F: FnOnce(Self::Error) -> E,
{
map_err::MapErr::new(self, f)
}
/// Combines this transport with `other` into an `OrTransport`.
// NOTE(review): presumably `other` is tried when `self` does not support an
// address — confirm in the `choice` module.
fn or_transport<U>(self, other: U) -> OrTransport<Self, U>
where
Self: Sized,
U: Transport,
<U as Transport>::Error: 'static,
{
OrTransport::new(self, other)
}
/// Wraps this transport in an `and_then::AndThen` built from `self` and
/// `f`, where `f` takes the transport's output together with a
/// `ConnectedPoint` and returns a fallible future (`TryFuture`) with
/// success type `O`.
fn and_then<C, F, O>(self, f: C) -> and_then::AndThen<Self, C>
where
Self: Sized,
C: FnOnce(Self::Output, ConnectedPoint) -> F,
F: TryFuture<Ok = O>,
<F as TryFuture>::Error: Error + 'static,
{
and_then::AndThen::new(self, f)
}
/// Starts an `upgrade::Builder` for this transport with the given
/// `upgrade::Version`.
fn upgrade(self, version: upgrade::Version) -> upgrade::Builder<Self>
where
Self: Sized,
Self::Error: 'static,
{
upgrade::Builder::new(self, version)
}
}
/// Opaque identifier of a listener.
///
/// Returned by `Transport::listen_on`, accepted by
/// `Transport::remove_listener`, and carried by every `TransportEvent`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct ListenerId(u64);

impl ListenerId {
    /// Creates a `ListenerId` that differs from every other id created in
    /// this process.
    ///
    /// Ids come from a process-wide monotonic counter rather than a random
    /// number, so two calls can never yield the same id (short of the
    /// counter wrapping around).
    pub fn new() -> Self {
        // `AtomicUsize` is available on every target with atomics, unlike
        // `AtomicU64`, which is missing on some 32-bit platforms.
        static NEXT_LISTENER_ID: std::sync::atomic::AtomicUsize =
            std::sync::atomic::AtomicUsize::new(1);

        // Only the uniqueness of the fetched value matters; no other memory is
        // published through this counter, so `Relaxed` is sufficient.
        let raw = NEXT_LISTENER_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        // Widening cast: `usize` is at most 64 bits on all supported targets.
        ListenerId(raw as u64)
    }
}

impl Default for ListenerId {
    /// Equivalent to [`ListenerId::new`]: every default value is a fresh id.
    fn default() -> Self {
        Self::new()
    }
}
/// Event produced by `Transport::poll`.
///
/// `TUpgr` is the type of the upgrade delivered with an incoming connection
/// and `TErr` the error type reported by listeners. Every variant carries
/// the `ListenerId` of the listener it refers to.
pub enum TransportEvent<TUpgr, TErr> {
/// A listener reports the address `listen_addr`.
// NOTE(review): presumably emitted when the listener starts listening on
// that address — confirm against implementations.
NewAddress {
/// Listener the address belongs to.
listener_id: ListenerId,
/// The reported address.
listen_addr: Multiaddr,
},
/// A listener reports that `listen_addr` has expired.
AddressExpired {
/// Listener the address belonged to.
listener_id: ListenerId,
/// The expired address.
listen_addr: Multiaddr,
},
/// An incoming connection on a listener.
Incoming {
/// Listener that produced the connection.
listener_id: ListenerId,
/// The upgrade value for this connection (a `Transport::ListenerUpgrade`
/// future when produced by `Transport::poll`).
upgrade: TUpgr,
/// Local address associated with the connection.
local_addr: Multiaddr,
/// Address associated with the remote side.
// NOTE(review): presumably the address to send data back to the
// remote — confirm.
send_back_addr: Multiaddr,
},
/// A listener closed.
ListenerClosed {
/// Listener that closed.
listener_id: ListenerId,
/// Why the listener closed: `Ok(())` or the error `TErr`.
// NOTE(review): presumably `Ok(())` denotes an orderly shutdown —
// confirm.
reason: Result<(), TErr>,
},
/// A listener reported an error.
// NOTE(review): presumably non-fatal, since closing has its own variant —
// confirm.
ListenerError {
/// Listener that reported the error.
listener_id: ListenerId,
/// The reported error.
error: TErr,
},
}
impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {
pub fn map_upgrade<U>(self, map: impl FnOnce(TUpgr) -> U) -> TransportEvent<U, TErr> {
match self {
TransportEvent::Incoming {
listener_id,
upgrade,
local_addr,
send_back_addr,
} => TransportEvent::Incoming {
listener_id,
upgrade: map(upgrade),
local_addr,
send_back_addr,
},
TransportEvent::NewAddress {
listen_addr,
listener_id,
} => TransportEvent::NewAddress {
listen_addr,
listener_id,
},
TransportEvent::AddressExpired {
listen_addr,
listener_id,
} => TransportEvent::AddressExpired {
listen_addr,
listener_id,
},
TransportEvent::ListenerError { listener_id, error } => {
TransportEvent::ListenerError { listener_id, error }
}
TransportEvent::ListenerClosed {
listener_id,
reason,
} => TransportEvent::ListenerClosed {
listener_id,
reason,
},
}
}
pub fn map_err<E>(self, map_err: impl FnOnce(TErr) -> E) -> TransportEvent<TUpgr, E> {
match self {
TransportEvent::Incoming {
listener_id,
upgrade,
local_addr,
send_back_addr,
} => TransportEvent::Incoming {
listener_id,
upgrade,
local_addr,
send_back_addr,
},
TransportEvent::NewAddress {
listen_addr,
listener_id,
} => TransportEvent::NewAddress {
listen_addr,
listener_id,
},
TransportEvent::AddressExpired {
listen_addr,
listener_id,
} => TransportEvent::AddressExpired {
listen_addr,
listener_id,
},
TransportEvent::ListenerError { listener_id, error } => TransportEvent::ListenerError {
listener_id,
error: map_err(error),
},
TransportEvent::ListenerClosed {
listener_id,
reason,
} => TransportEvent::ListenerClosed {
listener_id,
reason: reason.map_err(map_err),
},
}
}
pub fn is_upgrade(&self) -> bool {
matches!(self, TransportEvent::Incoming { .. })
}
pub fn into_incoming(self) -> Option<(TUpgr, Multiaddr)> {
if let TransportEvent::Incoming {
upgrade,
send_back_addr,
..
} = self
{
Some((upgrade, send_back_addr))
} else {
None
}
}
pub fn is_new_address(&self) -> bool {
matches!(self, TransportEvent::NewAddress { .. })
}
pub fn into_new_address(self) -> Option<Multiaddr> {
if let TransportEvent::NewAddress { listen_addr, .. } = self {
Some(listen_addr)
} else {
None
}
}
pub fn is_address_expired(&self) -> bool {
matches!(self, TransportEvent::AddressExpired { .. })
}
pub fn into_address_expired(self) -> Option<Multiaddr> {
if let TransportEvent::AddressExpired { listen_addr, .. } = self {
Some(listen_addr)
} else {
None
}
}
pub fn is_listener_error(&self) -> bool {
matches!(self, TransportEvent::ListenerError { .. })
}
pub fn into_listener_error(self) -> Option<TErr> {
if let TransportEvent::ListenerError { error, .. } = self {
Some(error)
} else {
None
}
}
}
/// Manual `Debug` implementation so that the upgrade type `TUpgr` does not
/// have to implement `Debug`; the `upgrade` field of `Incoming` is omitted
/// from the output.
impl<TUpgr, TErr: fmt::Debug> fmt::Debug for TransportEvent<TUpgr, TErr> {
    /// Formats the event as `TransportEvent::<Variant> { .. }`, using the
    /// variant's own name and listing every field except `upgrade`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        match self {
            TransportEvent::NewAddress {
                listener_id,
                listen_addr,
            } => f
                .debug_struct("TransportEvent::NewAddress")
                .field("listener_id", listener_id)
                .field("listen_addr", listen_addr)
                .finish(),
            TransportEvent::AddressExpired {
                listener_id,
                listen_addr,
            } => f
                .debug_struct("TransportEvent::AddressExpired")
                .field("listener_id", listener_id)
                .field("listen_addr", listen_addr)
                .finish(),
            TransportEvent::Incoming {
                listener_id,
                local_addr,
                send_back_addr,
                // Deliberately skipped: `TUpgr` is not required to be `Debug`.
                upgrade: _,
            } => f
                .debug_struct("TransportEvent::Incoming")
                .field("listener_id", listener_id)
                .field("local_addr", local_addr)
                .field("send_back_addr", send_back_addr)
                .finish(),
            TransportEvent::ListenerClosed {
                listener_id,
                reason,
            } => f
                // Use the real variant name so log output can be matched
                // against the enum definition.
                .debug_struct("TransportEvent::ListenerClosed")
                .field("listener_id", listener_id)
                .field("reason", reason)
                .finish(),
            TransportEvent::ListenerError { listener_id, error } => f
                .debug_struct("TransportEvent::ListenerError")
                .field("listener_id", listener_id)
                .field("error", error)
                .finish(),
        }
    }
}
/// Error returned by `Transport::listen_on`, `Transport::dial` and
/// `Transport::dial_as_listener`.
///
/// `TErr` is the transport-specific error type.
#[derive(Debug, Clone)]
pub enum TransportError<TErr> {
/// The given `Multiaddr` is not supported; the address is carried along
/// and included in the `Display` message.
MultiaddrNotSupported(Multiaddr),
/// Any other error, wrapping the transport-specific `TErr`. When `TErr`
/// implements `Error`, it is exposed through `Error::source`.
Other(TErr),
}
impl<TErr> TransportError<TErr> {
    /// Converts the wrapped error of the `Other` variant with `map`.
    ///
    /// `MultiaddrNotSupported` is carried over unchanged and `map` is not
    /// invoked for it.
    pub fn map<TNewErr>(self, map: impl FnOnce(TErr) -> TNewErr) -> TransportError<TNewErr> {
        match self {
            Self::Other(inner) => {
                let converted = map(inner);
                TransportError::Other(converted)
            }
            Self::MultiaddrNotSupported(unsupported) => {
                TransportError::MultiaddrNotSupported(unsupported)
            }
        }
    }
}
impl<TErr> fmt::Display for TransportError<TErr>
where
TErr: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TransportError::MultiaddrNotSupported(addr) => {
write!(f, "Multiaddr is not supported: {}", addr)
}
TransportError::Other(_) => Ok(()),
}
}
}
impl<TErr> Error for TransportError<TErr>
where
TErr: Error + 'static,
{
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
TransportError::MultiaddrNotSupported(_) => None,
TransportError::Other(err) => Some(err),
}
}
}