// Copyright 2020 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under the MIT license <LICENSE-MIT
// http://opensource.org/licenses/MIT> or the Modified BSD license <LICENSE-BSD
// https://opensource.org/licenses/BSD-3-Clause>, at your option. This file may not be copied,
// modified, or distributed except according to those terms. Please review the Licences for the
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.
#[cfg(feature = "igd")]
use super::igd::{forward_port, IgdError};
use super::wire_msg::WireMsg;
use super::{
config::{Config, InternalConfig, RetryConfig},
connection_deduplicator::{ConnectionDeduplicator, DedupHandle},
connection_pool::{ConnId, ConnectionPool},
connections::{
listen_for_incoming_connections, listen_for_incoming_messages, Connection,
DisconnectionEvents, RecvStream, SendStream,
},
error::{
ClientEndpointError, ConnectionError, EndpointError, RecvError, RpcError, SendError,
SerializationError,
},
};
use backoff::{future::retry, ExponentialBackoff};
use bytes::Bytes;
use std::net::SocketAddr;
use tokio::sync::broadcast::{self, Sender};
use tokio::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender};
use tokio::time::{timeout, Duration};
use tracing::{debug, error, info, trace, warn};
/// Host name of the QUIC communication certificate used by peers. It is passed as the server
/// name for every outgoing connection attempt.
// FIXME: make it configurable
const CERT_SERVER_NAME: &str = "MaidSAFE.net";
// Maximum time to wait for the IGD request to forward a port before giving up.
#[cfg(feature = "igd")]
const PORT_FORWARD_TIMEOUT: Duration = Duration::from_secs(30);
// Maximum time to wait for the reply to an endpoint echo / verification / reachability query.
const ECHO_SERVICE_QUERY_TIMEOUT: Duration = Duration::from_secs(30);
/// Capacity of the bounded mpsc channels created by `Channels::new`.
const STANDARD_CHANNEL_SIZE: usize = 10000;
/// Receiving side of the channel on which incoming messages are delivered.
#[derive(Debug)]
pub struct IncomingMessages(pub(crate) MpscReceiver<(SocketAddr, Bytes)>);

impl IncomingMessages {
    /// Waits for the next incoming message and yields it together with the address of the peer
    /// that sent it.
    ///
    /// The result is exactly what the wrapped receiver's `recv` yields, so `None` is passed
    /// through unchanged.
    pub async fn next(&mut self) -> Option<(SocketAddr, Bytes)> {
        let Self(receiver) = self;
        receiver.recv().await
    }
}
/// Receiving side of the channel on which incoming connections are announced.
// `Debug` is derived for consistency with `IncomingMessages`, which wraps the same kind of
// receiver and already derives it.
#[derive(Debug)]
pub struct IncomingConnections(pub(crate) MpscReceiver<SocketAddr>);

impl IncomingConnections {
    /// Waits until there is an incoming connection and yields the address of the connecting
    /// peer.
    ///
    /// The result is exactly what the wrapped receiver's `recv` yields, so `None` is passed
    /// through unchanged.
    pub async fn next(&mut self) -> Option<SocketAddr> {
        self.0.recv().await
    }
}
/// Endpoint instance which can be used to create connections to peers,
/// and listen to incoming messages from other peers.
///
/// The type parameter `I` is the connection ID type used to key entries in the connection pool.
#[derive(Clone)]
pub struct Endpoint<I: ConnId> {
// Address reported by the QUIC endpoint after binding.
local_addr: SocketAddr,
// Public address chosen by `resolve_public_addr`; stays `None` until that has run
// (`new_client` never calls it).
public_addr: Option<SocketAddr>,
// The underlying quinn endpoint used to open outgoing connections.
quic_endpoint: quinn::Endpoint,
// Sender handed to the incoming-message listener of every connection this endpoint opens.
message_tx: MpscSender<(SocketAddr, Bytes)>,
// Sender handed to the same listeners for reporting disconnections.
disconnection_tx: MpscSender<SocketAddr>,
// Validated configuration (client/server config, retry policy, external address overrides).
config: InternalConfig,
// Broadcast sender signalled by `close`.
termination_tx: Sender<()>,
// Pool of established connections, looked up by peer address or connection ID.
connection_pool: ConnectionPool<I>,
// Lets concurrent connection attempts to the same address share a single attempt.
connection_deduplicator: ConnectionDeduplicator,
}
// Hand-written `Debug`: only the local address and the config are printed, and the QUIC
// endpoint is replaced by a placeholder string.
impl<I: ConnId> std::fmt::Debug for Endpoint<I> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Endpoint");
        let _ = repr
            .field("local_addr", &self.local_addr)
            .field("quic_endpoint", &"<endpoint omitted>")
            .field("config", &self.config);
        repr.finish()
    }
}
impl<I: ConnId> Endpoint<I> {
/// Create a peer endpoint at the given address.
///
/// A peer endpoint, unlike a [client](Self::new_client) endpoint, can receive incoming
/// connections.
///
/// On success this returns, in order: the endpoint, the channel of incoming connections, the
/// channel of incoming messages, the channel of disconnection events, and the address of the
/// contact that was bootstrapped against (if any).
///
/// # Bootstrapping
///
/// When given a non-empty list of `contacts`, this will attempt to 'bootstrap' against them.
/// This involves connecting to all the contacts concurrently and selecting the first
/// successfully connected peer (if any), whose `SocketAddr` will be returned.
///
/// If bootstrapping is successful, the connected peer will be used to perform a reachability
/// check to validate that this endpoint can be reached at its
/// [`public_addr`](Self::public_addr).
///
/// **Note:** if no contacts are given, the [`public_addr`](Self::public_addr) of the endpoint
/// will not have been validated to be reachable by anyone
///
/// # Port forwarding (UPnP)
///
/// If configured (via `config.forward_port`), an external port mapping will be set up (using
/// the IGD UPnP protocol). The established external port will be reflected in
/// [`public_addr`](Self::public_addr), and the lease will be renewed automatically every
/// `config.upnp_lease_duration`. This step is only compiled in with the `igd` feature.
///
/// # Errors
///
/// Fails if the configuration is rejected, the endpoint cannot be built, the echo request to
/// the contact fails, port forwarding fails or times out, the verification request fails, or
/// the contact reports that `public_addr` is not reachable. Failing to connect to any contact
/// is not an error by itself: the contact is then `None` and no verification takes place.
pub async fn new(
local_addr: impl Into<SocketAddr>,
contacts: &[SocketAddr],
config: Config,
) -> Result<
(
Self,
IncomingConnections,
IncomingMessages,
DisconnectionEvents,
Option<SocketAddr>,
),
EndpointError,
> {
let config = InternalConfig::try_from_config(config)?;
let mut builder = quinn::Endpoint::builder();
// Give the builder the server config; `new_client` skips this step.
let _ = builder.listen(config.server.clone());
let (mut endpoint, quic_incoming, channels) =
Self::build_endpoint(local_addr.into(), config, builder)?;
// `None` when `contacts` is empty or none of them could be connected to.
let contact = endpoint.connect_to_any(contacts).await;
// Also stores the result in `endpoint.public_addr`.
let public_addr = endpoint.resolve_public_addr(contact).await?;
#[cfg(feature = "igd")]
if endpoint.config.forward_port {
// The first `?` handles the mapped timeout error, the second the port-forwarding result.
timeout(
PORT_FORWARD_TIMEOUT,
forward_port(
public_addr.port(),
endpoint.local_addr(),
endpoint.config.upnp_lease_duration,
channels.termination.1,
),
)
.await
.map_err(|_| IgdError::TimedOut)??;
}
// Incoming connections are handled from here on, i.e. before the verification request below
// is sent. NOTE(review): presumably the contact verifies by connecting back to
// `public_addr` - confirm against the verification handler.
listen_for_incoming_connections(
quic_incoming,
endpoint.connection_pool.clone(),
channels.message.0.clone(),
channels.connection.0,
channels.disconnection.0.clone(),
endpoint.clone(),
);
// Only possible when bootstrapping produced a contact to ask.
if let Some(peer) = contact {
let valid = endpoint
.endpoint_verification(peer, public_addr)
.await
.map_err(|error| EndpointError::EndpointVerification { peer, error })?;
if !valid {
return Err(EndpointError::Unreachable { public_addr });
}
}
Ok((
endpoint,
IncomingConnections(channels.connection.1),
IncomingMessages(channels.message.1),
DisconnectionEvents(channels.disconnection.1),
contact,
))
}
/// Create a client endpoint at the given address.
///
/// The endpoint is built from a plain `quinn::Endpoint::builder()` with no server
/// configuration, and the stream of incoming QUIC connections is discarded, so a client
/// endpoint does not accept incoming connections and does not need to be publicly reachable.
/// It can still communicate over outgoing connections and receive incoming streams, since QUIC
/// allows for either side of a connection to initiate streams.
///
/// On success this returns the endpoint together with the channels of incoming messages and of
/// disconnection events.
pub fn new_client(
    local_addr: impl Into<SocketAddr>,
    config: Config,
) -> Result<(Self, IncomingMessages, DisconnectionEvents), ClientEndpointError> {
    let internal_config = InternalConfig::try_from_config(config)?;
    let bind_addr: SocketAddr = local_addr.into();
    let builder = quinn::Endpoint::builder();
    let (endpoint, _, channels) = Self::build_endpoint(bind_addr, internal_config, builder)?;

    let incoming_messages = IncomingMessages(channels.message.1);
    let disconnections = DisconnectionEvents(channels.disconnection.1);
    Ok((endpoint, incoming_messages, disconnections))
}
// Private helper shared by `new` and `new_client`: binds the QUIC endpoint, creates the
// channels and assembles the `Endpoint` value. `public_addr` starts out as `None`.
fn build_endpoint(
    local_addr: SocketAddr,
    config: InternalConfig,
    builder: quinn::EndpointBuilder,
) -> Result<(Self, quinn::Incoming, Channels), quinn::EndpointError> {
    let (quic_endpoint, quic_incoming) = builder.bind(&local_addr)?;

    // Store the address the bound endpoint reports rather than the one that was requested.
    let bound_addr = match quic_endpoint.local_addr() {
        Ok(addr) => addr,
        Err(io_error) => return Err(quinn::EndpointError::Socket(io_error)),
    };

    let channels = Channels::new();
    // The endpoint keeps clones of the senders; the originals are returned inside `channels`.
    let message_tx = channels.message.0.clone();
    let disconnection_tx = channels.disconnection.0.clone();
    let termination_tx = channels.termination.0.clone();

    let endpoint = Self {
        local_addr: bound_addr,
        public_addr: None,
        quic_endpoint,
        message_tx,
        disconnection_tx,
        config,
        termination_tx,
        connection_pool: ConnectionPool::new(),
        connection_deduplicator: ConnectionDeduplicator::new(),
    };

    Ok((endpoint, quic_incoming, channels))
}
/// Get the local address of the endpoint, i.e. the address the underlying QUIC endpoint
/// reported after it was bound.
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
/// Get the public address of the endpoint.
///
/// Falls back to [`local_addr`](Self::local_addr) when no public address has been resolved.
pub fn public_addr(&self) -> SocketAddr {
    match self.public_addr {
        Some(resolved) => resolved,
        None => self.local_addr,
    }
}
/// Removes every connection to the given peer from the connection pool and closes each of
/// them with error code `0` and an empty reason.
pub async fn disconnect_from(&self, peer_addr: &SocketAddr) {
    let removed = self.connection_pool.remove(peer_addr).await;
    for conn in removed.iter() {
        conn.close(0u8.into(), b"");
    }
}
/// Ensures there is a connection to the given peer.
///
/// **Note:** this method is intended for use when it's necessary to connect to a specific peer.
/// See [`connect_to_any`](Self::connect_to_any) if you just need a connection with any of a set
/// of peers.
///
/// This method does not hand out a connection handle. Use
/// [`send_message`](Self::send_message) or
/// [`open_bidirectional_stream`](Self::open_bidirectional_stream) to communicate with the peer
/// afterwards.
///
/// # Connection pooling
///
/// Connections are stored in an internal pool and reused if possible. If the pool already
/// holds a connection to `node_addr`, nothing new is opened. Otherwise a new connection is
/// attempted (failures are retried with exponential back-off according to
/// `config.retry_config`), inserted into the pool, and registered with the incoming-message
/// listener using the endpoint's message and disconnection senders.
///
/// Concurrent calls for the same address share a single connection attempt.
///
/// # Errors
///
/// Returns the `ConnectionError` of the failed attempt when no connection could be
/// established.
pub async fn connect_to(&self, node_addr: &SocketAddr) -> Result<(), ConnectionError> {
    // Only success or failure matters here; the connection handle is dropped right away.
    self.get_or_connect_to(node_addr)
        .await
        .map(|_connection| ())
}
/// Connect to any of the given peers.
///
/// Often in peer-to-peer networks, it's sufficient to communicate to any node on the network,
/// rather than having to connect to specific nodes. This method starts a connection attempt to
/// every peer in `peer_addrs` and returns the remote address of the first attempt that
/// succeeds. Each attempt goes through the connection pool and the configured retry policy
/// (see [`connect_to`](Self::connect_to)).
///
/// Returns `None` when `peer_addrs` is empty or when every attempt failed. In the latter case
/// the last error is logged.
pub async fn connect_to_any(&self, peer_addrs: &[SocketAddr]) -> Option<SocketAddr> {
    trace!("Connecting to any of {:?}", peer_addrs);

    // `select_ok` must not be handed an empty set of futures, so bail out early.
    if peer_addrs.is_empty() {
        return None;
    }

    // One boxed connection attempt per candidate; the first to succeed wins.
    let attempts = peer_addrs
        .iter()
        .map(|peer| Box::pin(self.get_or_connect_to(peer)));
    let outcome = futures::future::select_ok(attempts).await;

    if let Err(error) = &outcome {
        error!("Failed to bootstrap to the network, last error: {}", error);
    }

    outcome
        .ok()
        .map(|(connection, _)| connection.remote_address())
}
/// Check whether a peer can be reached by opening a brand new connection to it (bypassing the
/// connection pool), sending an endpoint echo request over a bi-directional stream and waiting
/// up to `ECHO_SERVICE_QUERY_TIMEOUT` for the reply.
///
/// # Errors
///
/// Fails if the connection or stream cannot be opened, the request cannot be sent, receiving
/// the reply fails, or the wait times out.
///
/// NOTE(review): a reply of an unexpected type, or no reply at all (`None`), is only logged at
/// `info` level and still yields `Ok(())` - confirm this leniency is intended.
pub async fn is_reachable(&self, peer_addr: &SocketAddr) -> Result<(), RpcError> {
    trace!("Checking is reachable");

    // avoid the connection pool
    let quinn::NewConnection { connection, .. } = self.new_connection(peer_addr).await?;
    let (raw_send, raw_recv) = connection.open_bi().await?;
    let mut send_stream = SendStream::new(raw_send);
    let mut recv_stream = RecvStream::new(raw_recv);

    send_stream.send(WireMsg::EndpointEchoReq).await?;

    // First `?`: the timeout elapsed. Second `?`: receiving the reply failed.
    let reply = timeout(ECHO_SERVICE_QUERY_TIMEOUT, recv_stream.next_wire_msg()).await??;
    match reply {
        Some(WireMsg::EndpointEchoResp(_)) => {}
        Some(other) => {
            info!(
                "Unexpected message type when verifying reachability: {}",
                &other
            );
        }
        None => {
            info!(
                "Peer {} did not reply when verifying reachability",
                peer_addr
            );
        }
    }

    Ok(())
}
/// Get the connection ID of the pooled connection with the provided socket address, or `None`
/// if the pool holds no connection for that address.
pub async fn get_connection_id(&self, addr: &SocketAddr) -> Option<I> {
    let (_, remover) = self.connection_pool.get_by_addr(addr).await?;
    Some(remover.id())
}
/// Get the remote `SocketAddr` of the pooled connection with the given connection ID, or
/// `None` if the pool holds no connection with that ID.
///
/// Despite its name, the `addr` parameter is the connection ID.
pub async fn get_socket_addr_by_id(&self, addr: &I) -> Option<SocketAddr> {
    let (_, remover) = self.connection_pool.get_by_id(addr).await?;
    Some(*remover.remote_addr())
}
/// Open a bi-directional stream with a given peer.
///
/// A pooled connection to `peer_addr` is used if there is one; otherwise a new connection is
/// established first.
///
/// Priority default is 0. Both lower and higher can be passed in.
pub async fn open_bidirectional_stream(
    &self,
    peer_addr: &SocketAddr,
    priority: i32,
) -> Result<(SendStream, RecvStream), ConnectionError> {
    let peer_connection = self.get_or_connect_to(peer_addr).await?;
    let streams = peer_connection.open_bi(priority).await?;
    Ok(streams)
}
/// Send a message to a peer over an existing connection, using the endpoint's configured
/// retry policy.
///
/// This is [`try_send_message_with`](Self::try_send_message_with) with no custom retry
/// config. No new connection is ever opened.
///
/// Priority default is 0. Both lower and higher can be passed in.
///
/// # Errors
///
/// If a connection with `dest` exists in the pool but the message fails to send,
/// `Err(Some(_))` will be returned. If there's no connection with `dest` in the pool, then
/// `Err(None)` will be returned.
pub async fn try_send_message(
&self,
msg: Bytes,
dest: &SocketAddr,
priority: i32,
) -> Result<(), Option<SendError>> {
self.try_send_message_with(msg, dest, priority, None).await
}
/// Send a message to a peer over an existing connection. No new connection is ever opened.
///
/// Priority default is 0. Both lower and higher can be passed in.
///
/// `retries` optionally overrides the retry policy for this send; when it is `None` the
/// endpoint's configured policy is used.
///
/// # Errors
///
/// If a connection with `dest` exists in the pool but the message fails to send,
/// `Err(Some(_))` will be returned. If there's no connection with `dest` in the pool, then
/// `Err(None)` will be returned.
pub async fn try_send_message_with(
    &self,
    msg: Bytes,
    dest: &SocketAddr,
    priority: i32,
    retries: Option<RetryConfig>,
) -> Result<(), Option<SendError>> {
    // Without a pooled connection there is nothing to try.
    let (conn, guard) = match self.connection_pool.get_by_addr(dest).await {
        Some(pooled) => pooled,
        None => return Err(None),
    };
    trace!("Connection exists in the connection pool: {}", dest);

    let connection = Connection::new(conn, guard);
    let policy = match retries {
        Some(custom) => custom,
        None => self.config.retry_config,
    };

    // The message is cloned for every attempt; the final `?` wraps the error in `Some`.
    Self::retry(policy, || async {
        Ok(connection.send_uni(msg.clone(), priority).await?)
    })
    .await?;

    Ok(())
}
/// Sends a message to a peer, using the endpoint's configured retry policy.
///
/// This is [`send_message_with`](Self::send_message_with) with no custom retry config: a
/// pooled connection to `dest` is used if there is one, otherwise a new connection is created
/// and the message is sent over it.
/// Priority default is 0. Both lower and higher can be passed in.
pub async fn send_message(
&self,
msg: Bytes,
dest: &SocketAddr,
priority: i32,
) -> Result<(), SendError> {
self.send_message_with(msg, dest, priority, None).await
}
/// Sends a message to a peer. A pooled connection to `dest` is used if there is one,
/// otherwise a new connection is created first and the message is sent over it.
///
/// Priority default is 0. Both lower and higher can be passed in.
///
/// `retries` optionally overrides the retry policy for the send; when it is `None` the
/// endpoint's configured policy is used.
///
/// NOTE(review): every retry reuses the connection obtained at the start of the call. Whether
/// a broken pooled connection is evicted before that lookup is decided outside this function.
pub async fn send_message_with(
    &self,
    msg: Bytes,
    dest: &SocketAddr,
    priority: i32,
    retries: Option<RetryConfig>,
) -> Result<(), SendError> {
    let connection = self.get_or_connect_to(dest).await?;
    let policy = match retries {
        Some(custom) => custom,
        None => self.config.retry_config,
    };

    // The message is cloned for every attempt.
    Self::retry(policy, || async {
        Ok(connection.send_uni(msg.clone(), priority).await?)
    })
    .await?;

    Ok(())
}
/// Close all the connections of this endpoint immediately and stop accepting new connections.
///
/// A termination signal is broadcast first, then the QUIC endpoint is closed with error code
/// `0` and an empty reason.
pub fn close(&self) {
// The send result is deliberately ignored; closing proceeds regardless.
let _ = self.termination_tx.send(());
self.quic_endpoint.close(0_u32.into(), b"")
}
/// Get a connection from the pool, or create one, for the given `addr`.
///
/// Concurrent calls for the same address are deduplicated: only one caller performs the
/// connection attempt, and the others wait for its outcome. A newly established connection is
/// inserted into the pool and handed to `listen_for_incoming_messages` before it is returned.
///
/// # Errors
///
/// Returns the `ConnectionError` of the failed attempt, whether it was made by this call or
/// by the concurrent call this one was waiting on.
pub(crate) async fn get_or_connect_to(
&self,
addr: &SocketAddr,
) -> Result<Connection<I>, ConnectionError> {
// Loop until the connection is found in the pool, another caller's attempt fails, or this
// caller is elected to perform the attempt itself.
let completion = loop {
if let Some((conn, remover)) = self.connection_pool.get_by_addr(addr).await {
trace!("We are already connected to this peer: {}", addr);
return Ok(Connection::new(conn, remover));
}
// Check if a connect attempt to this address is already in progress.
match self.connection_deduplicator.query(addr).await {
DedupHandle::Dup(Ok(())) => continue, // the connection should now be in the pool
DedupHandle::Dup(Err(error)) => return Err(error), // cannot connect
// No attempt is in progress: this caller makes it and reports the outcome via `completion`.
DedupHandle::New(completion) => break completion,
}
};
match self.new_connection(addr).await {
Ok(new_connection) => {
trace!("Successfully connected to peer: {}", addr);
let connection = new_connection.connection;
// The connection ID is generated from the remote address.
let id = ConnId::generate(&connection.remote_address());
let remover = self
.connection_pool
.insert(id, connection.remote_address(), connection.clone())
.await;
// Hand the connection's incoming streams to the message listener, along with the
// endpoint's message and disconnection senders.
listen_for_incoming_messages(
new_connection.uni_streams,
new_connection.bi_streams,
remover.clone(),
self.message_tx.clone(),
self.disconnection_tx.clone(),
self.clone(),
);
// Report success only after the pool insert, so that waiters who `continue` find the
// connection in the pool.
let _ = completion.complete(Ok(()));
Ok(Connection::new(connection, remover))
}
Err(error) => {
// Pass the failure on to any waiting callers as well.
let _ = completion.complete(Err(error.clone()));
Err(error)
}
}
}
/// Attempt a connection to a node_addr.
///
/// All failures are retried with exponential back-off. This doesn't use the connection pool, it
/// will always try to open a new connection.
async fn new_connection(
&self,
node_addr: &SocketAddr,
) -> Result<quinn::NewConnection, ConnectionError> {
Self::retry(self.config.retry_config, || async {
trace!("Attempting to connect to {:?}", node_addr);
let connecting = match self.quic_endpoint.connect_with(
self.config.client.clone(),
node_addr,
CERT_SERVER_NAME,
) {
Ok(conn) => Ok(conn),
Err(error) => {
warn!("Connection attempt failed due to {:?}", error);
Err(ConnectionError::from(error))
}
}?;
let new_conn = match connecting.await {
Ok(new_conn) => {
debug!("okay was had");
Ok(new_conn)
}
Err(error) => {
error!("some error: {:?}", error);
Err(ConnectionError::from(error))
}
}?;
Ok(new_conn)
})
.await
}
// set an appropriate public address based on `config` and a reachability check.
async fn resolve_public_addr(
&mut self,
contact: Option<SocketAddr>,
) -> Result<SocketAddr, EndpointError> {
let mut public_addr = self.local_addr;
// get the IP seen for us by our contact
let visible_addr = if let Some(peer) = contact {
Some(
self.endpoint_echo(peer)
.await
.map_err(|error| EndpointError::EndpointEcho { peer, error })?,
)
} else {
None
};
if let Some(external_ip) = self.config.external_ip {
// set the public IP based on config
public_addr.set_ip(external_ip);
if let Some(visible_addr) = visible_addr {
// if we set a different external IP than peers can see, we will have a bad time
if visible_addr.ip() != external_ip {
warn!(
"Configured external IP ({}) does not match that seen by peers ({})",
external_ip,
visible_addr.ip()
);
}
}
} else if let Some(visible_addr) = visible_addr {
// set the public IP based on that seen by the peer
public_addr.set_ip(visible_addr.ip());
} else {
// we have no good source for public IP, leave it as the local IP and warn
warn!(
"Could not determine better public IP than local IP ({})",
public_addr.ip()
);
}
if let Some(external_port) = self.config.external_port {
// set the public port based on config
public_addr.set_port(external_port);
if let Some(visible_addr) = visible_addr {
// if we set a different external IP than peers can see, we will have a bad time
if visible_addr.port() != external_port {
warn!(
"Configured external port ({}) does not match that seen by peers ({})",
external_port,
visible_addr.port()
);
}
}
} else if let Some(visible_addr) = visible_addr {
// set the public port based on that seen by the peer
public_addr.set_port(visible_addr.port());
} else {
// we have no good source for public port, leave it as the local port and warn
warn!(
"Could not determine better public port than local port ({})",
public_addr.port()
);
}
self.public_addr = Some(public_addr);
// Return the address so callers can avoid the optionality of `self.public_addr`
Ok(public_addr)
}
/// Perform the endpoint echo RPC with the given peer.
async fn endpoint_echo(&self, peer: SocketAddr) -> Result<SocketAddr, RpcError> {
let (mut send, mut recv) = self.open_bidirectional_stream(&peer, 0).await?;
send.send(WireMsg::EndpointEchoReq).await?;
match timeout(ECHO_SERVICE_QUERY_TIMEOUT, recv.next_wire_msg()).await?? {
Some(WireMsg::EndpointEchoResp(addr)) => Ok(addr),
msg => Err(RecvError::Serialization(SerializationError::unexpected(msg)).into()),
}
}
/// Perform the endpoint verification RPC with the given peer.
async fn endpoint_verification(
&self,
peer: SocketAddr,
public_addr: SocketAddr,
) -> Result<bool, RpcError> {
let (mut send, mut recv) = self.open_bidirectional_stream(&peer, 0).await?;
send.send(WireMsg::EndpointVerificationReq(public_addr))
.await?;
match timeout(ECHO_SERVICE_QUERY_TIMEOUT, recv.next_wire_msg()).await?? {
Some(WireMsg::EndpointVerificationResp(valid)) => Ok(valid),
msg => Err(RecvError::Serialization(SerializationError::unexpected(msg)).into()),
}
}
// Run `op` under an exponential back-off policy built from `cfg`. Fields of the policy that
// `cfg` does not cover keep their `ExponentialBackoff` defaults.
fn retry<R, E, Op, Fut>(cfg: RetryConfig, op: Op) -> impl futures::Future<Output = Result<R, E>>
where
    Op: FnMut() -> Fut,
    Fut: futures::Future<Output = Result<R, backoff::Error<E>>>,
{
    let policy = ExponentialBackoff {
        max_elapsed_time: Some(cfg.retrying_max_elapsed_time),
        max_interval: cfg.max_retry_interval,
        multiplier: cfg.retry_delay_multiplier,
        randomization_factor: cfg.retry_delay_rand_factor,
        initial_interval: cfg.initial_retry_interval,
        ..Default::default()
    };
    // Unqualified `retry` is the imported `backoff::future::retry`, not this associated fn.
    retry(policy, op)
}
}
// Item type of the message channel: a socket address paired with the message bytes.
type Msg = (SocketAddr, Bytes);

// A private helper struct bundling the (sender, receiver) pairs created for an endpoint.
struct Channels {
    connection: (MpscSender<SocketAddr>, MpscReceiver<SocketAddr>),
    message: (MpscSender<Msg>, MpscReceiver<Msg>),
    disconnection: (MpscSender<SocketAddr>, MpscReceiver<SocketAddr>),
    termination: (Sender<()>, broadcast::Receiver<()>),
}

impl Channels {
    // Create all channels: the mpsc channels are bounded by `STANDARD_CHANNEL_SIZE`, and the
    // termination broadcast channel has a capacity of one.
    fn new() -> Self {
        let connection = mpsc::channel(STANDARD_CHANNEL_SIZE);
        let message = mpsc::channel(STANDARD_CHANNEL_SIZE);
        let disconnection = mpsc::channel(STANDARD_CHANNEL_SIZE);
        let termination = broadcast::channel(1);
        Self {
            connection,
            message,
            disconnection,
            termination,
        }
    }
}
// Tests for how `Endpoint::new` derives `public_addr` from the config when no contacts are
// given (so there is no peer-observed address to fall back on).
#[cfg(test)]
mod tests {
use super::Endpoint;
use crate::{tests::local_addr, Config};
use color_eyre::eyre::Result;
use std::net::SocketAddr;
// Neither external IP nor port configured: the public address equals the local address.
#[tokio::test]
async fn new_without_external_addr() -> Result<()> {
let (endpoint, _, _, _, _) = Endpoint::<[u8; 32]>::new(
local_addr(),
&[],
Config {
external_ip: None,
external_port: None,
..Default::default()
},
)
.await?;
assert_eq!(endpoint.public_addr(), endpoint.local_addr());
Ok(())
}
// Only the external IP configured: the configured IP is combined with the local port.
#[tokio::test]
async fn new_with_external_ip() -> Result<()> {
let (endpoint, _, _, _, _) = Endpoint::<[u8; 32]>::new(
local_addr(),
&[],
Config {
external_ip: Some([123u8, 123, 123, 123].into()),
external_port: None,
..Default::default()
},
)
.await?;
assert_eq!(
endpoint.public_addr(),
SocketAddr::new([123u8, 123, 123, 123].into(), endpoint.local_addr().port())
);
Ok(())
}
// Only the external port configured: the local IP is combined with the configured port.
#[tokio::test]
async fn new_with_external_port() -> Result<()> {
let (endpoint, _, _, _, _) = Endpoint::<[u8; 32]>::new(
local_addr(),
&[],
Config {
external_ip: None,
external_port: Some(123),
..Default::default()
},
)
.await?;
assert_eq!(
endpoint.public_addr(),
SocketAddr::new(endpoint.local_addr().ip(), 123)
);
Ok(())
}
// Both configured: the public address is exactly the configured IP and port.
#[tokio::test]
async fn new_with_external_addr() -> Result<()> {
let (endpoint, _, _, _, _) = Endpoint::<[u8; 32]>::new(
local_addr(),
&[],
Config {
external_ip: Some([123u8, 123, 123, 123].into()),
external_port: Some(123),
..Default::default()
},
)
.await?;
assert_eq!(endpoint.public_addr(), "123.123.123.123:123".parse()?);
Ok(())
}
}