use crate::v2::client::RelayedConnection;
use crate::v2::RequestId;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::future::{ready, BoxFuture, FutureExt, Ready};
use futures::ready;
use futures::sink::SinkExt;
use futures::stream::SelectAll;
use futures::stream::{Stream, StreamExt};
use libp2p_core::multiaddr::{Multiaddr, Protocol};
use libp2p_core::transport::{ListenerId, TransportError, TransportEvent};
use libp2p_core::{PeerId, Transport};
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use thiserror::Error;
pub struct ClientTransport {
to_behaviour: mpsc::Sender<TransportToBehaviourMsg>,
pending_to_behaviour: VecDeque<TransportToBehaviourMsg>,
listeners: SelectAll<RelayListener>,
}
impl ClientTransport {
pub(crate) fn new() -> (Self, mpsc::Receiver<TransportToBehaviourMsg>) {
let (to_behaviour, from_transport) = mpsc::channel(0);
let transport = ClientTransport {
to_behaviour,
pending_to_behaviour: VecDeque::new(),
listeners: SelectAll::new(),
};
(transport, from_transport)
}
}
impl Transport for ClientTransport {
type Output = RelayedConnection;
type Error = RelayError;
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
type Dial = RelayedDial;
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
let (relay_peer_id, relay_addr) = match parse_relayed_multiaddr(addr)? {
RelayedMultiaddr {
relay_peer_id: None,
relay_addr: _,
..
} => return Err(RelayError::MissingDstPeerId.into()),
RelayedMultiaddr {
relay_peer_id: _,
relay_addr: None,
..
} => return Err(RelayError::MissingRelayAddr.into()),
RelayedMultiaddr {
relay_peer_id: Some(peer_id),
relay_addr: Some(addr),
..
} => (peer_id, addr),
};
let (to_listener, from_behaviour) = mpsc::channel(0);
self.pending_to_behaviour
.push_back(TransportToBehaviourMsg::ListenReq {
relay_peer_id,
relay_addr,
to_listener,
});
let listener_id = ListenerId::new();
let listener = RelayListener {
listener_id,
queued_events: Default::default(),
from_behaviour,
is_closed: false,
};
self.listeners.push(listener);
Ok(listener_id)
}
fn remove_listener(&mut self, id: ListenerId) -> bool {
if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
listener.close(Ok(()));
true
} else {
false
}
}
fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let RelayedMultiaddr {
relay_peer_id,
relay_addr,
dst_peer_id,
dst_addr,
} = parse_relayed_multiaddr(addr)?;
let relay_peer_id = relay_peer_id.ok_or(RelayError::MissingRelayPeerId)?;
let relay_addr = relay_addr.ok_or(RelayError::MissingRelayAddr)?;
let dst_peer_id = dst_peer_id.ok_or(RelayError::MissingDstPeerId)?;
let mut to_behaviour = self.to_behaviour.clone();
Ok(async move {
let (tx, rx) = oneshot::channel();
to_behaviour
.send(TransportToBehaviourMsg::DialReq {
request_id: RequestId::new(),
relay_addr,
relay_peer_id,
dst_addr,
dst_peer_id,
send_back: tx,
})
.await?;
let stream = rx.await?.map_err(|()| RelayError::Connect)?;
Ok(stream)
}
.boxed())
}
fn dial_as_listener(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>>
where
Self: Sized,
{
Err(TransportError::MultiaddrNotSupported(addr))
}
fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
None
}
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>>
where
Self: Sized,
{
loop {
if !self.pending_to_behaviour.is_empty() {
match self.to_behaviour.poll_ready(cx) {
Poll::Ready(Ok(())) => {
let msg = self
.pending_to_behaviour
.pop_front()
.expect("Called !is_empty().");
let _ = self.to_behaviour.start_send(msg);
continue;
}
Poll::Ready(Err(_)) => unreachable!("Receiver is never dropped."),
Poll::Pending => {}
}
}
match self.listeners.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => return Poll::Ready(event),
_ => return Poll::Pending,
}
}
}
}
#[derive(Default)]
struct RelayedMultiaddr {
relay_peer_id: Option<PeerId>,
relay_addr: Option<Multiaddr>,
dst_peer_id: Option<PeerId>,
dst_addr: Option<Multiaddr>,
}
fn parse_relayed_multiaddr(
addr: Multiaddr,
) -> Result<RelayedMultiaddr, TransportError<RelayError>> {
if !addr.iter().any(|p| matches!(p, Protocol::P2pCircuit)) {
return Err(TransportError::MultiaddrNotSupported(addr));
}
let mut relayed_multiaddr = RelayedMultiaddr::default();
let mut before_circuit = true;
for protocol in addr.into_iter() {
match protocol {
Protocol::P2pCircuit => {
if before_circuit {
before_circuit = false;
} else {
return Err(RelayError::MultipleCircuitRelayProtocolsUnsupported.into());
}
}
Protocol::P2p(hash) => {
let peer_id = PeerId::from_multihash(hash).map_err(|_| RelayError::InvalidHash)?;
if before_circuit {
if relayed_multiaddr.relay_peer_id.is_some() {
return Err(RelayError::MalformedMultiaddr.into());
}
relayed_multiaddr.relay_peer_id = Some(peer_id)
} else {
if relayed_multiaddr.dst_peer_id.is_some() {
return Err(RelayError::MalformedMultiaddr.into());
}
relayed_multiaddr.dst_peer_id = Some(peer_id)
}
}
p => {
if before_circuit {
relayed_multiaddr
.relay_addr
.get_or_insert(Multiaddr::empty())
.push(p);
} else {
relayed_multiaddr
.dst_addr
.get_or_insert(Multiaddr::empty())
.push(p);
}
}
}
}
Ok(relayed_multiaddr)
}
pub struct RelayListener {
listener_id: ListenerId,
queued_events: VecDeque<<Self as Stream>::Item>,
from_behaviour: mpsc::Receiver<ToListenerMsg>,
is_closed: bool,
}
impl RelayListener {
fn close(&mut self, reason: Result<(), RelayError>) {
self.queued_events
.push_back(TransportEvent::ListenerClosed {
listener_id: self.listener_id,
reason,
});
self.is_closed = true;
}
}
impl Stream for RelayListener {
type Item = TransportEvent<<ClientTransport as Transport>::ListenerUpgrade, RelayError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(Some(event));
}
if self.is_closed {
return Poll::Ready(None);
}
let msg = match ready!(self.from_behaviour.poll_next_unpin(cx)) {
Some(msg) => msg,
None => {
self.close(Ok(()));
continue;
}
};
match msg {
ToListenerMsg::Reservation(Ok(Reservation { addrs })) => {
debug_assert!(
self.queued_events.is_empty(),
"Assert empty due to previous `pop_front` attempt."
);
self.queued_events = addrs
.into_iter()
.map(|listen_addr| TransportEvent::NewAddress {
listener_id: self.listener_id,
listen_addr,
})
.collect();
}
ToListenerMsg::IncomingRelayedConnection {
stream,
src_peer_id,
relay_addr,
relay_peer_id: _,
} => {
let listener_id = self.listener_id;
self.queued_events.push_back(TransportEvent::Incoming {
upgrade: ready(Ok(stream)),
listener_id,
local_addr: relay_addr.with(Protocol::P2pCircuit),
send_back_addr: Protocol::P2p(src_peer_id.into()).into(),
})
}
ToListenerMsg::Reservation(Err(())) => self.close(Err(RelayError::Reservation)),
};
}
}
}
pub type RelayedDial = BoxFuture<'static, Result<RelayedConnection, RelayError>>;
#[derive(Debug, Error)]
pub enum RelayError {
#[error("Missing relay peer id.")]
MissingRelayPeerId,
#[error("Missing relay address.")]
MissingRelayAddr,
#[error("Missing destination peer id.")]
MissingDstPeerId,
#[error("Invalid peer id hash.")]
InvalidHash,
#[error("Failed to send message to relay behaviour: {0:?}")]
SendingMessageToBehaviour(#[from] mpsc::SendError),
#[error("Response from behaviour was canceled")]
ResponseFromBehaviourCanceled(#[from] oneshot::Canceled),
#[error(
"Address contains multiple circuit relay protocols (`p2p-circuit`) which is not supported."
)]
MultipleCircuitRelayProtocolsUnsupported,
#[error("One of the provided multiaddresses is malformed.")]
MalformedMultiaddr,
#[error("Failed to get Reservation.")]
Reservation,
#[error("Failed to connect to destination.")]
Connect,
}
impl From<RelayError> for TransportError<RelayError> {
fn from(error: RelayError) -> Self {
TransportError::Other(error)
}
}
pub enum TransportToBehaviourMsg {
DialReq {
request_id: RequestId,
relay_addr: Multiaddr,
relay_peer_id: PeerId,
dst_addr: Option<Multiaddr>,
dst_peer_id: PeerId,
send_back: oneshot::Sender<Result<RelayedConnection, ()>>,
},
ListenReq {
relay_peer_id: PeerId,
relay_addr: Multiaddr,
to_listener: mpsc::Sender<ToListenerMsg>,
},
}
#[allow(clippy::large_enum_variant)]
pub enum ToListenerMsg {
Reservation(Result<Reservation, ()>),
IncomingRelayedConnection {
stream: RelayedConnection,
src_peer_id: PeerId,
relay_peer_id: PeerId,
relay_addr: Multiaddr,
},
}
pub struct Reservation {
pub(crate) addrs: Vec<Multiaddr>,
}