use crate::port_mapper::{upnp, PortMapperConfig};
use futures::FutureExt;
use natpmp::{new_tokio_natpmp, NatpmpAsync, Protocol, Response};
use tokio::{
net::UdpSocket,
sync::{mpsc, oneshot},
};
use std::{future::Future, net::Ipv4Addr, time::Duration};
const LOG_TARGET: &str = "emissary-util::port-mapper::nat-pmp";
const RESPONSE_TIMEOUT: Duration = Duration::from_secs(5);
const NUM_RETRIES: usize = 3usize;
const PORT_MAPPING_LIFETIME: u32 = 60 * 60;
const ADDRESS_REFRESH_TIMER: Duration = Duration::from_secs(5 * 60);
pub struct PortMapper {
address_tx: mpsc::Sender<Ipv4Addr>,
config: PortMapperConfig,
ntcp2_port: Option<u16>,
shutdown_rx: oneshot::Receiver<oneshot::Sender<()>>,
ssu2_port: Option<u16>,
}
impl PortMapper {
pub fn new(
config: PortMapperConfig,
ntcp2_port: Option<u16>,
ssu2_port: Option<u16>,
address_tx: mpsc::Sender<Ipv4Addr>,
shutdown_rx: oneshot::Receiver<oneshot::Sender<()>>,
) -> Self {
Self {
address_tx,
config,
ntcp2_port,
shutdown_rx,
ssu2_port,
}
}
async fn with_retries_and_timeout<T>(
mut future: impl Future<Output = natpmp::Result<T>> + Unpin,
) -> Result<T, ()> {
for _ in 0..NUM_RETRIES {
match tokio::time::timeout(RESPONSE_TIMEOUT, &mut future).await {
Err(_) => tracing::debug!(
target: LOG_TARGET,
"operation timed out",
),
Ok(Err(error)) => tracing::debug!(
target: LOG_TARGET,
?error,
"operation failed",
),
Ok(Ok(res)) => return Ok(res),
}
}
Err(())
}
fn try_switch_to_upnp(self) {
if !self.config.upnp {
tracing::warn!(
target: LOG_TARGET,
"nat-pmp failed and upnp not enabled, shutting down port mapper",
);
return;
}
tracing::warn!(
target: LOG_TARGET,
"nat-pmp failed, switching to upnp",
);
tokio::spawn(
upnp::PortMapper::new(
self.config,
self.ntcp2_port,
self.ssu2_port,
self.address_tx,
self.shutdown_rx,
)
.run(),
);
}
async fn try_map_ntcp2(&self, client: &NatpmpAsync<UdpSocket>) -> Result<Option<Response>, ()> {
let Some(ntcp2_port) = self.ntcp2_port else {
return Ok(None);
};
tracing::trace!(
target: LOG_TARGET,
?ntcp2_port,
"map ntcp2 port",
);
Self::with_retries_and_timeout(
async {
client
.send_port_mapping_request(
Protocol::TCP,
ntcp2_port,
ntcp2_port,
PORT_MAPPING_LIFETIME,
)
.await?;
client.read_response_or_retry().await
}
.boxed(),
)
.await
.map(Some)
}
async fn try_map_ssu2(&self, client: &NatpmpAsync<UdpSocket>) -> Result<Option<Response>, ()> {
let Some(ssu2_port) = self.ssu2_port else {
return Ok(None);
};
tracing::trace!(
target: LOG_TARGET,
?ssu2_port,
"map ssu2 port",
);
Self::with_retries_and_timeout(
async {
client
.send_port_mapping_request(
Protocol::UDP,
ssu2_port,
ssu2_port,
PORT_MAPPING_LIFETIME,
)
.await?;
client.read_response_or_retry().await
}
.boxed(),
)
.await
.map(Some)
}
async fn try_get_external_address(
client: &mut NatpmpAsync<UdpSocket>,
) -> Result<Option<Ipv4Addr>, ()> {
Self::with_retries_and_timeout(
async {
client.send_public_address_request().await?;
client.read_response_or_retry().await
}
.boxed(),
)
.await
.map(|result| match result {
Response::Gateway(response) => Some(*response.public_address()),
response => {
tracing::warn!(
target: LOG_TARGET,
?response,
"ignoring unexpected response",
);
None
}
})
}
pub async fn run(mut self) {
let Ok(mut client) = Self::with_retries_and_timeout(new_tokio_natpmp().boxed()).await
else {
return self.try_switch_to_upnp();
};
match self.try_map_ntcp2(&client).await {
Ok(None) => {}
Err(()) => return self.try_switch_to_upnp(),
Ok(Some(Response::TCP(_))) => tracing::debug!(
target: LOG_TARGET,
"ntcp2 port mapped",
),
Ok(Some(Response::UDP(_))) => tracing::debug!(
target: LOG_TARGET,
"ssu2 port mapped",
),
Ok(Some(response)) => tracing::warn!(
target: LOG_TARGET,
?response,
"ignoring unexpected response",
),
}
match self.try_map_ssu2(&client).await {
Ok(None) => {}
Err(()) => return self.try_switch_to_upnp(),
Ok(Some(Response::UDP(_))) => tracing::debug!(
target: LOG_TARGET,
"ssu2 port mapped",
),
Ok(Some(response)) => tracing::warn!(
target: LOG_TARGET,
?response,
"ignoring unexpected response",
),
}
let mut external_address = match Self::try_get_external_address(&mut client).await {
Err(()) => return self.try_switch_to_upnp(),
Ok(None) => return self.try_switch_to_upnp(),
Ok(Some(address)) => {
let _ = self.address_tx.send(address).await;
address
}
};
let mut external_address_timer = Box::pin(tokio::time::sleep(ADDRESS_REFRESH_TIMER));
let mut port_mapping_timer = Box::pin(tokio::time::sleep(Duration::from_secs(
(PORT_MAPPING_LIFETIME - 10) as u64,
)));
loop {
tokio::select! {
event = &mut self.shutdown_rx => match event {
Ok(tx) => {
tracing::info!(
target: LOG_TARGET,
"shutting down nat-pmp port manager",
);
let _ = tx.send(());
return;
}
Err(_) => return,
},
_ = &mut external_address_timer => {
if let Ok(Some(address)) = Self::try_get_external_address(&mut client).await {
if address != external_address {
tracing::info!(
target: LOG_TARGET,
new_address = ?address,
previous_address = ?external_address,
"new external address discovered",
);
let _ = self.address_tx.send(address).await;
external_address = address;
}
};
external_address_timer = Box::pin(tokio::time::sleep(ADDRESS_REFRESH_TIMER));
}
_ = &mut port_mapping_timer => {
match self.try_map_ntcp2(&client).await {
Ok(Some(Response::TCP(_))) => tracing::debug!(
target: LOG_TARGET,
"ntcp2 port remapped",
),
Ok(Some(response)) => tracing::warn!(
target: LOG_TARGET,
?response,
"ignoring unexpected response",
),
_ => {}
}
match self.try_map_ssu2(&client).await {
Ok(Some(Response::UDP(_))) => tracing::debug!(
target: LOG_TARGET,
"ssu2 port remapped",
),
Ok(Some(response)) => tracing::warn!(
target: LOG_TARGET,
?response,
"ignoring unexpected response",
),
_ => {}
}
port_mapping_timer = Box::pin(tokio::time::sleep(Duration::from_secs(
(PORT_MAPPING_LIFETIME - 10) as u64,
)));
}
}
}
}
}