pub mod gateway;
pub mod natpmp;
pub mod sequential;
pub mod upnp;
pub use sequential::{sequential_mapper_from_os, SequentialMapper};
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use tokio::sync::Notify;
use super::classify::NatClass;
use super::TraversalStats;
/// Port-mapping protocol a [`PortMapping`] was obtained through.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Protocol {
    /// NAT-PMP; labelled `"nat-pmp"` by [`Protocol::as_str`].
    NatPmp,
    /// UPnP; labelled `"upnp"` by [`Protocol::as_str`].
    Upnp,
}

impl Protocol {
    /// Returns the fixed, lowercase label for this protocol.
    ///
    /// The strings are `'static` literals, so the result can be stored or
    /// compared without allocating.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Upnp => "upnp",
            Self::NatPmp => "nat-pmp",
        }
    }
}
/// A port mapping as returned by [`PortMapperClient::install`] or
/// [`PortMapperClient::renew`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PortMapping {
/// External socket address of the mapping. `MappingSink` publishes this
/// value as the reflexive address.
pub external: std::net::SocketAddr,
/// Port on the local side of the mapping.
/// NOTE(review): presumably echoes the `internal_port` passed to
/// `install` — confirm against the client implementations.
pub internal_port: u16,
/// Lifetime of the mapping. `PortMapperTask::run` caps its renewal
/// interval at half of this value when that half is non-zero.
pub ttl: Duration,
/// Protocol the mapping was obtained through.
pub protocol: Protocol,
}
/// Errors returned by [`PortMapperClient`] operations.
///
/// The `Display` text of each variant is given by its `#[error(...)]`
/// attribute; [`PortMappingError::kind`] returns a payload-free tag.
#[derive(Debug, thiserror::Error, Clone)]
pub enum PortMappingError {
/// No mapper is available. `NullPortMapper` returns this for every
/// fallible operation, and `MockPortMapperClient` returns it when its
/// scripted queue is empty.
#[error("unavailable")]
Unavailable,
/// The operation timed out.
/// NOTE(review): exact trigger depends on the client implementation.
#[error("timeout")]
Timeout,
/// Transport-level failure; the payload is a free-form description that
/// is appended to the `Display` output.
#[error("transport: {0}")]
Transport(String),
/// The request was refused; the payload is a free-form description that
/// is appended to the `Display` output.
#[error("refused: {0}")]
Refused(String),
}
impl PortMappingError {
pub fn kind(&self) -> &'static str {
match self {
Self::Unavailable => "unavailable",
Self::Timeout => "timeout",
Self::Transport(_) => "transport",
Self::Refused(_) => "refused",
}
}
}
/// Asynchronous client for a port-mapping protocol.
///
/// `PortMapperTask::run` drives an implementation through
/// `probe` → `install` → repeated `renew` → `remove`.
#[async_trait]
pub trait PortMapperClient: Send + Sync {
/// Check whether the mapper can be used. `PortMapperTask::run` returns
/// without installing anything when this yields an error.
async fn probe(&self) -> Result<(), PortMappingError>;
/// Request a mapping for `internal_port` with the requested `ttl`,
/// returning the mapping that was established.
async fn install(
&self,
internal_port: u16,
ttl: Duration,
) -> Result<PortMapping, PortMappingError>;
/// Refresh `mapping` and return the mapping now in effect. Callers in
/// this file replace their stored mapping with the returned one, so the
/// result may differ from the input (for example a new external address).
async fn renew(&self, mapping: &PortMapping) -> Result<PortMapping, PortMappingError>;
/// Tear down `mapping`. There is no return value, so a failure cannot be
/// reported to the caller.
async fn remove(&self, mapping: &PortMapping);
}
/// Shared-state handles through which port-mapping events are published.
///
/// Every field is an `Arc`, so cloning the sink yields another handle to the
/// same underlying state.
#[derive(Clone)]
pub struct MappingSink {
// Counters / snapshot state updated on install, renewal and revoke.
stats: Arc<TraversalStats>,
// Published reflexive address; set to the mapping's external address on
// install/renewal and cleared on revoke.
reflex_addr: Arc<ArcSwapOption<std::net::SocketAddr>>,
// Current NAT class encoded via `NatClass::as_u8`.
nat_class: Arc<AtomicU8>,
// `true` while an installed mapping overrides the reflexive address; set by
// `apply_install`, cleared by `apply_revoke`.
reflex_override_active: Arc<AtomicBool>,
// Held while this sink writes `reflex_addr`, `nat_class` and the override
// flag. NOTE(review): presumably shared with other publishers of the same
// state — confirm at the construction site.
publish_mu: Arc<parking_lot::Mutex<()>>,
}
impl MappingSink {
pub fn new(
stats: Arc<TraversalStats>,
reflex_addr: Arc<ArcSwapOption<std::net::SocketAddr>>,
nat_class: Arc<AtomicU8>,
reflex_override_active: Arc<AtomicBool>,
publish_mu: Arc<parking_lot::Mutex<()>>,
) -> Self {
Self {
stats,
reflex_addr,
nat_class,
reflex_override_active,
publish_mu,
}
}
pub(crate) fn apply_install(&self, mapping: &PortMapping) {
self.stats.record_port_mapping_install(mapping.external);
let _g = self.publish_mu.lock();
self.reflex_addr.store(Some(Arc::new(mapping.external)));
self.nat_class
.store(NatClass::Open.as_u8(), Ordering::Release);
self.reflex_override_active.store(true, Ordering::Release);
}
pub(crate) fn apply_renewal(&self, mapping: &PortMapping) {
let prev = self.reflex_addr.load_full().map(|arc| *arc);
if prev != Some(mapping.external) {
let _g = self.publish_mu.lock();
self.reflex_addr.store(Some(Arc::new(mapping.external)));
self.stats.replace_port_mapping_external(mapping.external);
}
self.stats.record_port_mapping_renewal();
}
pub(crate) fn apply_revoke(&self) {
self.stats.record_port_mapping_revoke();
let _g = self.publish_mu.lock();
if self.reflex_override_active.swap(false, Ordering::AcqRel) {
self.reflex_addr.store(None);
self.nat_class
.store(NatClass::Unknown.as_u8(), Ordering::Release);
}
}
}
/// Stateless placeholder mapper.
///
/// Its `PortMapperClient` implementation reports
/// `PortMappingError::Unavailable` for `probe`, `install` and `renew`, and
/// does nothing on `remove`.
#[derive(Debug, Default)]
pub struct NullPortMapper;

impl NullPortMapper {
    /// Create the (zero-sized) mapper; equivalent to `Default::default()`.
    pub fn new() -> Self {
        NullPortMapper
    }
}
#[async_trait]
impl PortMapperClient for NullPortMapper {
    /// Always fails with `Unavailable`.
    async fn probe(&self) -> Result<(), PortMappingError> {
        let unavailable = PortMappingError::Unavailable;
        Err(unavailable)
    }

    /// Always fails with `Unavailable`; both arguments are ignored.
    async fn install(
        &self,
        _internal_port: u16,
        _ttl: Duration,
    ) -> Result<PortMapping, PortMappingError> {
        let unavailable = PortMappingError::Unavailable;
        Err(unavailable)
    }

    /// Always fails with `Unavailable`; the mapping is ignored.
    async fn renew(&self, _mapping: &PortMapping) -> Result<PortMapping, PortMappingError> {
        let unavailable = PortMappingError::Unavailable;
        Err(unavailable)
    }

    /// No-op: this mapper never installs anything, so there is nothing to
    /// remove.
    async fn remove(&self, _mapping: &PortMapping) {}
}
/// Scriptable [`PortMapperClient`].
///
/// Each queue holds pre-programmed results that the matching trait method
/// hands out in FIFO order; once a queue is empty the method returns
/// `PortMappingError::Unavailable`.
#[derive(Debug)]
pub struct MockPortMapperClient {
// Results returned by successive `probe` calls.
probe_results: parking_lot::Mutex<std::collections::VecDeque<Result<(), PortMappingError>>>,
// Results returned by successive `install` calls.
install_results:
parking_lot::Mutex<std::collections::VecDeque<Result<PortMapping, PortMappingError>>>,
// Results returned by successive `renew` calls.
renew_results:
parking_lot::Mutex<std::collections::VecDeque<Result<PortMapping, PortMappingError>>>,
// Number of times `remove` has been called.
remove_calls: std::sync::atomic::AtomicU32,
}
impl MockPortMapperClient {
    /// Create a mock with empty result queues and a zeroed `remove` counter.
    pub fn new() -> Self {
        use std::collections::VecDeque;
        use std::sync::atomic::AtomicU32;

        Self {
            probe_results: parking_lot::Mutex::new(VecDeque::new()),
            install_results: parking_lot::Mutex::new(VecDeque::new()),
            renew_results: parking_lot::Mutex::new(VecDeque::new()),
            remove_calls: AtomicU32::new(0),
        }
    }

    /// Append `result` to the queue served by `probe`.
    pub fn queue_probe(&self, result: Result<(), PortMappingError>) {
        let mut queue = self.probe_results.lock();
        queue.push_back(result);
    }

    /// Append `result` to the queue served by `install`.
    pub fn queue_install(&self, result: Result<PortMapping, PortMappingError>) {
        let mut queue = self.install_results.lock();
        queue.push_back(result);
    }

    /// Append `result` to the queue served by `renew`.
    pub fn queue_renew(&self, result: Result<PortMapping, PortMappingError>) {
        let mut queue = self.renew_results.lock();
        queue.push_back(result);
    }

    /// Number of `remove` calls observed so far.
    pub fn remove_call_count(&self) -> u32 {
        let counter = &self.remove_calls;
        counter.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Number of queued `probe` results not yet consumed.
    pub fn remaining_probes(&self) -> usize {
        let queue = self.probe_results.lock();
        queue.len()
    }

    /// Number of queued `install` results not yet consumed.
    pub fn remaining_installs(&self) -> usize {
        let queue = self.install_results.lock();
        queue.len()
    }
}
impl Default for MockPortMapperClient {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl PortMapperClient for MockPortMapperClient {
    /// Pop the next scripted probe result; `Unavailable` once the queue is
    /// empty.
    async fn probe(&self) -> Result<(), PortMappingError> {
        // The lock guard is a temporary of this statement, so it is released
        // before the result is inspected.
        let scripted = self.probe_results.lock().pop_front();
        match scripted {
            Some(result) => result,
            None => Err(PortMappingError::Unavailable),
        }
    }

    /// Pop the next scripted install result; `Unavailable` once the queue is
    /// empty. Both arguments are ignored.
    async fn install(
        &self,
        _internal_port: u16,
        _ttl: Duration,
    ) -> Result<PortMapping, PortMappingError> {
        let scripted = self.install_results.lock().pop_front();
        match scripted {
            Some(result) => result,
            None => Err(PortMappingError::Unavailable),
        }
    }

    /// Pop the next scripted renew result; `Unavailable` once the queue is
    /// empty. The mapping argument is ignored.
    async fn renew(&self, _mapping: &PortMapping) -> Result<PortMapping, PortMappingError> {
        let scripted = self.renew_results.lock().pop_front();
        match scripted {
            Some(result) => result,
            None => Err(PortMappingError::Unavailable),
        }
    }

    /// Count the call; the mapping argument is ignored.
    async fn remove(&self, _mapping: &PortMapping) {
        let counter = &self.remove_calls;
        counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }
}
/// TTL that `PortMapperTask::run` requests when installing a mapping
/// (3600 seconds).
pub const DEFAULT_TTL: Duration = Duration::from_secs(3600);
/// Number of consecutive failed renewal ticks after which
/// `PortMapperTask::run` stops renewing and tears the mapping down.
pub(crate) const RENEWAL_FAILURE_THRESHOLD: u32 = 3;
/// One-shot task that installs a port mapping, keeps it renewed and removes
/// it again; consumed by [`PortMapperTask::run`].
pub struct PortMapperTask {
// Protocol client used for probe / install / renew / remove.
client: Box<dyn PortMapperClient>,
// Destination for install / renewal / revoke events.
sink: MappingSink,
// Port passed to `PortMapperClient::install`.
internal_port: u16,
// Requested time between renewals; `run` substitutes one second for a zero
// value and never waits longer than half the mapping's TTL.
renewal_interval: Duration,
// Shutdown flag polled by `run` before each wait.
shutdown: Arc<AtomicBool>,
// Wakeup used to interrupt `run` while it waits for the next renewal tick.
shutdown_notify: Arc<Notify>,
}
impl PortMapperTask {
    /// Bundle the client, sink and shutdown handles into a task.
    ///
    /// Nothing happens until [`run`](Self::run) is awaited.
    ///
    /// * `internal_port` – port passed to `PortMapperClient::install`.
    /// * `renewal_interval` – requested time between renewals (see `run` for
    ///   how it is clamped).
    /// * `shutdown` / `shutdown_notify` – flag and wakeup used to stop the
    ///   task. Callers must set the flag *before* notifying.
    pub fn new(
        client: Box<dyn PortMapperClient>,
        sink: MappingSink,
        internal_port: u16,
        renewal_interval: Duration,
        shutdown: Arc<AtomicBool>,
        shutdown_notify: Arc<Notify>,
    ) -> Self {
        Self {
            client,
            sink,
            internal_port,
            renewal_interval,
            shutdown,
            shutdown_notify,
        }
    }

    /// Drive the mapping lifecycle: probe → install → periodic renew →
    /// remove.
    ///
    /// Returns early, without touching the sink, when shutdown is already
    /// requested or when `probe` / `install` fails. After a successful
    /// install the mapping is published through the sink and renewed on a
    /// timer until one of the following happens:
    ///
    /// * shutdown is requested (flag set, optionally followed by
    ///   `notify_waiters` to interrupt the wait), or
    /// * `RENEWAL_FAILURE_THRESHOLD` consecutive renewal ticks fail. A tick
    ///   counts as failed only if both the renewal and one retry after
    ///   `RETRY_BACKOFF` fail.
    ///
    /// On exit from the renewal loop the mapping is removed via the client
    /// and revoked on the sink.
    pub async fn run(self) {
        // Pause between a failed renewal and its single in-tick retry.
        const RETRY_BACKOFF: Duration = Duration::from_millis(200);

        // Renewal period: the configured interval (one second if it is
        // zero), capped at half the mapping's TTL when that half is
        // non-zero. The result is never zero, which `tokio::time::interval`
        // requires.
        fn effective_interval(renewal_interval: Duration, ttl: Duration) -> Duration {
            let configured = if renewal_interval.is_zero() {
                Duration::from_secs(1)
            } else {
                renewal_interval
            };
            let half_ttl = ttl / 2;
            if !half_ttl.is_zero() && half_ttl < configured {
                half_ttl
            } else {
                configured
            }
        }

        if self.shutdown.load(Ordering::Acquire) {
            return;
        }
        if self.client.probe().await.is_err() {
            return;
        }
        let mut mapping = match self.client.install(self.internal_port, DEFAULT_TTL).await {
            Ok(m) => m,
            Err(_) => return,
        };
        self.sink.apply_install(&mapping);

        let mut ticker =
            tokio::time::interval(effective_interval(self.renewal_interval, mapping.ttl));
        // The first tick of a tokio interval completes immediately; consume
        // it so the first renewal happens one full period after install.
        ticker.tick().await;

        let mut consecutive_failures: u32 = 0;
        loop {
            // Create the `Notified` future BEFORE reading the flag. A
            // `Notified` receives `notify_waiters()` wakeups from the moment
            // it is created, so a shutdown that lands between the flag check
            // and the `select!` below is no longer lost (previously it went
            // unnoticed until the next renewal tick).
            let shutdown_signal = self.shutdown_notify.notified();
            if self.shutdown.load(Ordering::Acquire) {
                break;
            }
            tokio::select! {
                _ = shutdown_signal => break,
                _ = ticker.tick() => {
                    let outcome = match self.client.renew(&mapping).await {
                        Ok(next) => Ok(next),
                        Err(e) => {
                            // One retry per tick, after a short pause.
                            tokio::time::sleep(RETRY_BACKOFF).await;
                            if self.shutdown.load(Ordering::Acquire) {
                                break;
                            }
                            tracing::debug!(
                                error = %e,
                                "PortMapper renew: transient failure, retrying after {:?}",
                                RETRY_BACKOFF
                            );
                            self.client.renew(&mapping).await
                        }
                    };
                    match outcome {
                        Ok(next) => {
                            consecutive_failures = 0;
                            self.sink.apply_renewal(&next);
                            mapping = next;
                            // The renewed mapping may carry a different TTL;
                            // rebuild the ticker if the period changed.
                            let new_interval =
                                effective_interval(self.renewal_interval, mapping.ttl);
                            if new_interval != ticker.period() {
                                ticker = tokio::time::interval(new_interval);
                                // Skip the new interval's immediate first tick.
                                ticker.tick().await;
                            }
                        }
                        Err(_) => {
                            consecutive_failures = consecutive_failures.saturating_add(1);
                            if consecutive_failures >= RENEWAL_FAILURE_THRESHOLD {
                                break;
                            }
                        }
                    }
                }
            }
        }

        // Tear down: remove the mapping via the client, then withdraw what
        // the sink published.
        self.client.remove(&mapping).await;
        self.sink.apply_revoke();
    }
}
/// Forwarding impl so a shared `Arc<C>` can be used wherever a
/// [`PortMapperClient`] is expected; every method delegates to the inner `C`.
#[async_trait]
impl<C> PortMapperClient for Arc<C>
where
    C: PortMapperClient + ?Sized,
{
    /// Delegates to `C::probe`.
    async fn probe(&self) -> Result<(), PortMappingError> {
        (**self).probe().await
    }

    /// Delegates to `C::install`.
    async fn install(
        &self,
        internal_port: u16,
        ttl: Duration,
    ) -> Result<PortMapping, PortMappingError> {
        (**self).install(internal_port, ttl).await
    }

    /// Delegates to `C::renew`.
    async fn renew(&self, mapping: &PortMapping) -> Result<PortMapping, PortMappingError> {
        (**self).renew(mapping).await
    }

    /// Delegates to `C::remove`.
    async fn remove(&self, mapping: &PortMapping) {
        (**self).remove(mapping).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a sink over fresh shared state, returning the sink together
    /// with the stats and override-flag handles the tests inspect.
    fn sample_sink() -> (MappingSink, Arc<TraversalStats>, Arc<AtomicBool>) {
        let stats = Arc::new(TraversalStats::new());
        let reflex_addr = Arc::new(ArcSwapOption::empty());
        let nat_class = Arc::new(AtomicU8::new(NatClass::Unknown.as_u8()));
        let override_active = Arc::new(AtomicBool::new(false));
        let publish_mu = Arc::new(parking_lot::Mutex::new(()));
        let sink = MappingSink::new(
            Arc::clone(&stats),
            reflex_addr,
            nat_class,
            Arc::clone(&override_active),
            publish_mu,
        );
        (sink, stats, override_active)
    }

    /// A fixed NAT-PMP mapping used as the canonical test value.
    fn sample_mapping() -> PortMapping {
        PortMapping {
            external: "203.0.113.42:9001".parse().unwrap(),
            internal_port: 9001,
            ttl: Duration::from_secs(3600),
            protocol: Protocol::NatPmp,
        }
    }

    #[test]
    fn null_port_mapper_always_unavailable() {
        let mapper = NullPortMapper::new();
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            assert!(matches!(
                mapper.probe().await,
                Err(PortMappingError::Unavailable)
            ));
            assert!(matches!(
                mapper.install(9001, DEFAULT_TTL).await,
                Err(PortMappingError::Unavailable)
            ));
            let m = sample_mapping();
            assert!(matches!(
                mapper.renew(&m).await,
                Err(PortMappingError::Unavailable)
            ));
            mapper.remove(&m).await;
        });
    }

    #[test]
    fn port_mapping_error_kind_stable() {
        assert_eq!(PortMappingError::Unavailable.kind(), "unavailable");
        assert_eq!(PortMappingError::Timeout.kind(), "timeout");
        assert_eq!(
            PortMappingError::Transport("socket died".into()).kind(),
            "transport"
        );
        assert_eq!(
            PortMappingError::Refused("NOT_AUTHORIZED".into()).kind(),
            "refused"
        );
    }

    #[test]
    fn protocol_as_str_stable() {
        assert_eq!(Protocol::NatPmp.as_str(), "nat-pmp");
        assert_eq!(Protocol::Upnp.as_str(), "upnp");
    }

    #[test]
    fn mapping_sink_apply_install_flips_override() {
        let (sink, stats, override_active) = sample_sink();
        let mapping = sample_mapping();
        sink.apply_install(&mapping);
        assert!(override_active.load(Ordering::Acquire));
        let snap = stats.snapshot();
        assert!(snap.port_mapping_active);
        assert_eq!(snap.port_mapping_external, Some(mapping.external));
        assert_eq!(snap.port_mapping_renewals, 0);
    }

    #[test]
    fn mapping_sink_apply_renewal_bumps_counter() {
        let (sink, stats, _) = sample_sink();
        let mapping = sample_mapping();
        sink.apply_install(&mapping);
        assert_eq!(stats.snapshot().port_mapping_renewals, 0);
        sink.apply_renewal(&mapping);
        sink.apply_renewal(&mapping);
        sink.apply_renewal(&mapping);
        assert_eq!(stats.snapshot().port_mapping_renewals, 3);
        assert_eq!(
            stats.snapshot().port_mapping_external,
            Some(mapping.external)
        );
    }

    #[test]
    fn mapping_sink_apply_renewal_propagates_new_external() {
        use crate::adapter::net::traversal::classify::NatClass;
        let (sink, stats, override_active) = sample_sink();
        let initial = sample_mapping();
        let initial_external = initial.external;
        sink.apply_install(&initial);
        assert_eq!(
            stats.snapshot().port_mapping_external,
            Some(initial_external)
        );
        // Simulate the router handing back a different external address on
        // renewal.
        let new_external: std::net::SocketAddr = "203.0.113.200:55555".parse().unwrap();
        assert_ne!(new_external, initial_external, "test precondition");
        let rebind = PortMapping {
            external: new_external,
            internal_port: initial.internal_port,
            ttl: initial.ttl,
            protocol: initial.protocol,
        };
        sink.apply_renewal(&rebind);
        let snap = stats.snapshot();
        assert_eq!(
            snap.port_mapping_external,
            Some(new_external),
            "stats must carry the new mapped external, not the stale one",
        );
        assert_eq!(
            snap.port_mapping_renewals, 1,
            "renewal counter should still bump",
        );
        assert!(
            override_active.load(Ordering::Acquire),
            "override flag stays set across a rebind — only the address moves",
        );
        let nc = sink.nat_class.load(Ordering::Acquire);
        assert_eq!(NatClass::from_u8(nc), NatClass::Open);
    }

    #[test]
    fn mapping_sink_apply_renewal_skips_republish_when_external_unchanged() {
        let (sink, stats, _) = sample_sink();
        let mapping = sample_mapping();
        sink.apply_install(&mapping);
        let before = sink.reflex_addr.load_full().unwrap();
        sink.apply_renewal(&mapping);
        sink.apply_renewal(&mapping);
        let after = sink.reflex_addr.load_full().unwrap();
        // Pointer equality proves no new `Arc` was stored.
        assert!(
            Arc::ptr_eq(&before, &after),
            "unchanged external must not re-store the reflex Arc; got a new pointer",
        );
        assert_eq!(stats.snapshot().port_mapping_renewals, 2);
    }

    #[test]
    fn mapping_sink_apply_revoke_clears_override() {
        let (sink, stats, override_active) = sample_sink();
        sink.apply_install(&sample_mapping());
        assert!(override_active.load(Ordering::Acquire));
        sink.apply_revoke();
        assert!(!override_active.load(Ordering::Acquire));
        let snap = stats.snapshot();
        assert!(!snap.port_mapping_active);
        assert_eq!(snap.port_mapping_external, None);
    }

    #[test]
    fn mapping_sink_revoke_is_idempotent() {
        let (sink, _, override_active) = sample_sink();
        // Revoke without a prior install must be harmless.
        sink.apply_revoke();
        assert!(!override_active.load(Ordering::Acquire));
        sink.apply_install(&sample_mapping());
        sink.apply_revoke();
        sink.apply_revoke();
        assert!(!override_active.load(Ordering::Acquire));
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn task_exits_on_probe_failure_without_side_effects() {
        let (sink, stats, override_active) = sample_sink();
        // Empty probe queue: the mock answers `Unavailable`.
        let mock = Arc::new(MockPortMapperClient::new());
        let task = PortMapperTask::new(
            Box::new(Arc::clone(&mock)),
            sink,
            9001,
            Duration::from_millis(50),
            Arc::new(AtomicBool::new(false)),
            Arc::new(Notify::new()),
        );
        task.run().await;
        assert!(!override_active.load(Ordering::Acquire));
        assert!(!stats.snapshot().port_mapping_active);
        assert_eq!(mock.remove_call_count(), 0);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn task_installs_on_probe_and_install_success() {
        let (sink, stats, override_active) = sample_sink();
        let mock = Arc::new(MockPortMapperClient::new());
        mock.queue_probe(Ok(()));
        mock.queue_install(Ok(sample_mapping()));
        let shutdown = Arc::new(AtomicBool::new(false));
        let notify = Arc::new(Notify::new());
        let task = PortMapperTask::new(
            Box::new(Arc::clone(&mock)),
            sink,
            9001,
            Duration::from_millis(20),
            Arc::clone(&shutdown),
            Arc::clone(&notify),
        );
        let handle = tokio::spawn(task.run());
        let mut installed = false;
        for _ in 0..50 {
            if stats.snapshot().port_mapping_active {
                installed = true;
                break;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        assert!(installed, "install should have fired");
        assert!(override_active.load(Ordering::Acquire));
        assert_eq!(
            stats.snapshot().port_mapping_external,
            Some(sample_mapping().external),
        );
        // No renew results are queued, so every renewal fails and the task
        // gives up on its own.
        let task_done = tokio::time::timeout(Duration::from_secs(1), handle).await;
        assert!(
            task_done.is_ok(),
            "task should exit after 3 renewal failures"
        );
        assert!(!override_active.load(Ordering::Acquire));
        assert!(!stats.snapshot().port_mapping_active);
        assert_eq!(mock.remove_call_count(), 1);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn task_survives_transient_renewal_failure() {
        let (sink, stats, override_active) = sample_sink();
        let mock = Arc::new(MockPortMapperClient::new());
        mock.queue_probe(Ok(()));
        mock.queue_install(Ok(sample_mapping()));
        // First tick: renewal and its retry both fail. Second tick succeeds.
        mock.queue_renew(Err(PortMappingError::Timeout));
        mock.queue_renew(Err(PortMappingError::Timeout));
        mock.queue_renew(Ok(sample_mapping()));
        let shutdown = Arc::new(AtomicBool::new(false));
        let notify = Arc::new(Notify::new());
        let task = PortMapperTask::new(
            Box::new(Arc::clone(&mock)),
            sink,
            9001,
            Duration::from_millis(20),
            Arc::clone(&shutdown),
            Arc::clone(&notify),
        );
        let handle = tokio::spawn(task.run());
        let mut renewed = false;
        for _ in 0..200 {
            if stats.snapshot().port_mapping_renewals >= 1 {
                renewed = true;
                break;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        assert!(
            renewed,
            "renewal counter should bump after 2 transient failures → success",
        );
        assert!(override_active.load(Ordering::Acquire));
        let _ = tokio::time::timeout(Duration::from_secs(2), handle).await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn task_exits_cleanly_on_shutdown() {
        let (sink, stats, override_active) = sample_sink();
        let mock = Arc::new(MockPortMapperClient::new());
        mock.queue_probe(Ok(()));
        mock.queue_install(Ok(sample_mapping()));
        for _ in 0..100 {
            mock.queue_renew(Ok(sample_mapping()));
        }
        let shutdown = Arc::new(AtomicBool::new(false));
        let notify = Arc::new(Notify::new());
        let task = PortMapperTask::new(
            Box::new(Arc::clone(&mock)),
            sink,
            9001,
            Duration::from_millis(30),
            Arc::clone(&shutdown),
            Arc::clone(&notify),
        );
        let handle = tokio::spawn(task.run());
        let mut installed = false;
        for _ in 0..50 {
            if stats.snapshot().port_mapping_active {
                installed = true;
                break;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        assert!(installed);
        // Flag first, then wake: the task re-checks the flag on every loop.
        shutdown.store(true, Ordering::Release);
        notify.notify_waiters();
        let exit = tokio::time::timeout(Duration::from_secs(1), handle).await;
        assert!(exit.is_ok(), "task should exit within 1 s of shutdown");
        assert_eq!(mock.remove_call_count(), 1);
        assert!(!override_active.load(Ordering::Acquire));
        let snap = stats.snapshot();
        assert!(!snap.port_mapping_active);
        assert_eq!(snap.port_mapping_external, None);
    }

    #[test]
    fn stats_snapshot_includes_port_mapping_fields() {
        let stats = Arc::new(TraversalStats::new());
        let snap = stats.snapshot();
        assert!(!snap.port_mapping_active);
        assert_eq!(snap.port_mapping_external, None);
        assert_eq!(snap.port_mapping_renewals, 0);
    }
}