use super::error::Error;
use super::igd::forward_port;
use super::wire_msg::WireMsg;
use super::{
api::DEFAULT_UPNP_LEASE_DURATION_SEC,
connection_deduplicator::ConnectionDeduplicator,
connection_pool::ConnectionPool,
connections::{
listen_for_incoming_connections, listen_for_incoming_messages, Connection, RecvStream,
SendStream,
},
error::Result,
Config,
};
use bytes::Bytes;
use log::{debug, error, info, trace, warn};
use std::{net::SocketAddr, time::Duration};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::time::timeout;
const CERT_SERVER_NAME: &str = "MaidSAFE.net";
const PORT_FORWARD_TIMEOUT: u64 = 30;
pub struct IncomingMessages(pub(crate) UnboundedReceiver<(SocketAddr, Bytes)>);
impl IncomingMessages {
pub async fn next(&mut self) -> Option<(SocketAddr, Bytes)> {
self.0.recv().await
}
}
pub struct IncomingConnections(pub(crate) UnboundedReceiver<SocketAddr>);
impl IncomingConnections {
pub async fn next(&mut self) -> Option<SocketAddr> {
self.0.recv().await
}
}
pub struct DisconnectionEvents(pub(crate) UnboundedReceiver<SocketAddr>);
impl DisconnectionEvents {
pub async fn next(&mut self) -> Option<SocketAddr> {
self.0.recv().await
}
}
#[derive(Clone)]
pub struct Endpoint {
local_addr: SocketAddr,
public_addr: Option<SocketAddr>,
quic_endpoint: quinn::Endpoint,
message_tx: UnboundedSender<(SocketAddr, Bytes)>,
disconnection_tx: UnboundedSender<SocketAddr>,
client_cfg: quinn::ClientConfig,
bootstrap_nodes: Vec<SocketAddr>,
qp2p_config: Config,
connection_pool: ConnectionPool,
connection_deduplicator: ConnectionDeduplicator,
}
impl std::fmt::Debug for Endpoint {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Endpoint")
.field("local_addr", &self.local_addr)
.field("quic_endpoint", &"<endpoint omitted>")
.field("client_cfg", &self.client_cfg)
.finish()
}
}
impl Endpoint {
pub(crate) async fn new(
quic_endpoint: quinn::Endpoint,
quic_incoming: quinn::Incoming,
client_cfg: quinn::ClientConfig,
bootstrap_nodes: Vec<SocketAddr>,
qp2p_config: Config,
) -> Result<(
Self,
IncomingConnections,
IncomingMessages,
DisconnectionEvents,
)> {
let local_addr = quic_endpoint.local_addr()?;
let public_addr = match (qp2p_config.external_ip, qp2p_config.external_port) {
(Some(ip), Some(port)) => Some(SocketAddr::new(ip, port)),
_ => None,
};
let connection_pool = ConnectionPool::new();
let (message_tx, message_rx) = mpsc::unbounded_channel();
let (connection_tx, connection_rx) = mpsc::unbounded_channel();
let (disconnection_tx, disconnection_rx) = mpsc::unbounded_channel();
let mut endpoint = Self {
local_addr,
public_addr,
quic_endpoint,
message_tx: message_tx.clone(),
disconnection_tx: disconnection_tx.clone(),
client_cfg,
bootstrap_nodes,
qp2p_config,
connection_pool: connection_pool.clone(),
connection_deduplicator: ConnectionDeduplicator::new(),
};
if let Some(addr) = endpoint.public_addr {
if let Some(contact) = endpoint.bootstrap_nodes.get(0) {
info!("Verifying provided public IP address");
endpoint.connect_to(contact).await?;
let connection = endpoint
.get_connection(&contact)
.ok_or(Error::MissingConnection)?;
let (mut send, mut recv) = connection.open_bi().await?;
send.send(WireMsg::EndpointVerificationReq(addr)).await?;
let response = WireMsg::read_from_stream(&mut recv.quinn_recv_stream).await?;
match response {
WireMsg::EndpointVerficationResp(valid) => {
if valid {
info!("Endpoint verification successful! {} is reachable.", addr);
} else {
error!("Endpoint verification failed! {} is not reachable.", addr);
return Err(Error::IncorrectPublicAddress);
}
}
other => {
error!(
"Unexpected message when verifying public endpoint: {}",
other
);
return Err(Error::UnexpectedMessageType(other));
}
}
} else {
warn!("Public IP address not verified since bootstrap contacts are empty");
}
} else {
endpoint.public_addr = Some(endpoint.fetch_public_address().await?);
}
listen_for_incoming_connections(
quic_incoming,
connection_pool,
message_tx,
connection_tx,
disconnection_tx,
);
Ok((
endpoint,
IncomingConnections(connection_rx),
IncomingMessages(message_rx),
DisconnectionEvents(disconnection_rx),
))
}
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
pub fn socket_addr(&self) -> SocketAddr {
self.public_addr.unwrap_or(self.local_addr)
}
async fn fetch_public_address(&mut self) -> Result<SocketAddr> {
if self.local_addr.ip().is_loopback() {
return Ok(self.local_addr);
}
if let Some(socket_addr) = self.public_addr {
return Ok(socket_addr);
}
let mut addr = None;
if self.qp2p_config.forward_port {
match timeout(
Duration::from_secs(PORT_FORWARD_TIMEOUT),
forward_port(
self.local_addr,
self.qp2p_config
.upnp_lease_duration
.unwrap_or(DEFAULT_UPNP_LEASE_DURATION_SEC),
),
)
.await
{
Ok(res) => match res {
Ok(public_sa) => {
debug!("IGD success: {:?}", SocketAddr::V4(public_sa));
addr = Some(SocketAddr::V4(public_sa));
}
Err(e) => {
info!("IGD request failed: {} - {:?}", e, e);
return Err(Error::IgdNotSupported);
}
},
Err(e) => {
info!("IGD request timeout: {:?}", e);
return Err(Error::IgdNotSupported);
}
}
}
match timeout(Duration::from_secs(30), self.query_ip_echo_service()).await {
Ok(res) => match res {
Ok(echo_res) => match addr {
None => {
addr = Some(echo_res);
}
Some(address) => {
info!("Got response from echo service: {:?}, but IGD has already provided our external address: {:?}", echo_res, address);
}
},
Err(err) => {
info!("Could not contact echo service: {} - {:?}", err, err);
}
},
Err(e) => info!("Echo service timed out: {:?}", e),
}
addr.map_or(Err(Error::NoEchoServiceResponse), |socket_addr| {
self.public_addr = Some(socket_addr);
Ok(socket_addr)
})
}
pub fn disconnect_from(&mut self, peer_addr: &SocketAddr) -> Result<()> {
self.connection_pool
.remove(peer_addr)
.iter()
.for_each(|conn| {
conn.close(0u8.into(), b"");
});
Ok(())
}
pub async fn connect_to(&self, node_addr: &SocketAddr) -> Result<()> {
if self.connection_pool.has(node_addr) {
trace!("We are already connected to this peer: {}", node_addr);
}
match self.connection_deduplicator.query(node_addr).await {
Some(Ok(())) => return Ok(()),
Some(Err(error)) => return Err(error.into()),
None => {}
}
let connecting = match self.quic_endpoint.connect_with(
self.client_cfg.clone(),
node_addr,
CERT_SERVER_NAME,
) {
Ok(connecting) => connecting,
Err(error) => {
self.connection_deduplicator
.complete(node_addr, Err(error.clone().into()))
.await;
return Err(error.into());
}
};
let new_conn = match connecting.await {
Ok(new_conn) => new_conn,
Err(error) => {
self.connection_deduplicator
.complete(node_addr, Err(error.clone().into()))
.await;
return Err(error.into());
}
};
trace!("Successfully connected to peer: {}", node_addr);
let guard = self
.connection_pool
.insert(*node_addr, new_conn.connection.clone());
listen_for_incoming_messages(
new_conn.uni_streams,
new_conn.bi_streams,
guard,
self.message_tx.clone(),
self.disconnection_tx.clone(),
);
self.connection_deduplicator
.complete(node_addr, Ok(()))
.await;
Ok(())
}
pub(crate) fn get_connection(&self, peer_addr: &SocketAddr) -> Option<Connection> {
if let Some((conn, guard)) = self.connection_pool.get(peer_addr) {
trace!("Connection exists in the connection pool: {}", peer_addr);
Some(Connection::new(conn, guard))
} else {
None
}
}
pub async fn open_bidirectional_stream(
&self,
peer_addr: &SocketAddr,
) -> Result<(SendStream, RecvStream)> {
self.connect_to(peer_addr).await?;
let connection = self
.get_connection(peer_addr)
.ok_or(Error::MissingConnection)?;
connection.open_bi().await
}
pub async fn send_message(&self, msg: Bytes, dest: &SocketAddr) -> Result<()> {
let connection = self.get_connection(dest).ok_or(Error::MissingConnection)?;
connection.send_uni(msg).await?;
Ok(())
}
pub fn close(&self) {
self.quic_endpoint.close(0_u32.into(), b"")
}
async fn query_ip_echo_service(&self) -> Result<SocketAddr> {
if self.bootstrap_nodes.is_empty() {
return Err(Error::NoEchoServerEndpointDefined);
}
let mut tasks = Vec::default();
for node in self.bootstrap_nodes.iter().cloned() {
debug!("Connecting to {:?}", &node);
self.connect_to(&node).await?;
let connection = self.get_connection(&node).ok_or(Error::MissingConnection)?;
let task_handle = tokio::spawn(async move {
let (mut send_stream, mut recv_stream) = connection.open_bi().await?;
send_stream.send(WireMsg::EndpointEchoReq).await?;
match WireMsg::read_from_stream(&mut recv_stream.quinn_recv_stream).await {
Ok(WireMsg::EndpointEchoResp(socket_addr)) => Ok(socket_addr),
Ok(msg) => Err(Error::UnexpectedMessageType(msg)),
Err(err) => Err(err),
}
});
tasks.push(task_handle);
}
let (result, _) = futures::future::select_ok(tasks).await.map_err(|err| {
log::error!("Failed to contact echo service: {}", err);
Error::EchoServiceFailure(err.to_string())
})?;
result
}
pub(crate) fn bootstrap_nodes(&self) -> &[SocketAddr] {
&self.bootstrap_nodes
}
}