use parking_lot::Mutex;
use std::net::{IpAddr, Ipv4Addr};
use std::time::Duration;
use async_trait::async_trait;
use super::natpmp::NatPmpMapper;
use super::upnp::UpnpMapper;
use super::{PortMapperClient, PortMapping, PortMappingError, Protocol};
/// Port mapper that multiplexes a NAT-PMP client and a UPnP client behind the
/// single [`PortMapperClient`] interface, remembering which protocol is
/// currently considered usable.
pub struct SequentialMapper {
/// NAT-PMP client; `None` when the mapper was built without a gateway address.
nat_pmp: Option<Box<dyn PortMapperClient>>,
/// UPnP client; always present.
upnp: Box<dyn PortMapperClient>,
/// Cached protocol chosen by the last successful probe or install fallback.
/// Starts as `None` and is reset to `None` when an install attempt fails.
active: Mutex<Option<Protocol>>,
}
impl SequentialMapper {
    /// Builds a mapper backed by the real NAT-PMP and UPnP clients.
    ///
    /// A NAT-PMP client is created only when `gateway` is `Some`; the UPnP
    /// client is always created from `local_ip`. The cached active protocol
    /// starts out empty.
    pub fn new(gateway: Option<Ipv4Addr>, local_ip: IpAddr) -> Self {
        let nat_pmp = match gateway {
            Some(gw) => {
                let client: Box<dyn PortMapperClient> = Box::new(NatPmpMapper::new(gw));
                Some(client)
            }
            None => None,
        };
        let upnp: Box<dyn PortMapperClient> = Box::new(UpnpMapper::new(local_ip));
        Self {
            nat_pmp,
            upnp,
            active: Mutex::new(None),
        }
    }

    /// Test-only constructor that injects arbitrary client implementations
    /// (e.g. mocks) instead of building the real NAT-PMP / UPnP clients.
    #[cfg(test)]
    pub(crate) fn new_with_clients(
        nat_pmp: Option<Box<dyn PortMapperClient>>,
        upnp: Box<dyn PortMapperClient>,
    ) -> Self {
        let active = Mutex::new(None);
        Self {
            nat_pmp,
            upnp,
            active,
        }
    }

    /// Returns a copy of the currently cached protocol, or `None` when no
    /// protocol is cached.
    pub fn active_protocol(&self) -> Option<Protocol> {
        let cached = self.active.lock();
        *cached
    }

    /// Overwrites the cached protocol; the lock is held only for the write.
    fn set_active(&self, protocol: Option<Protocol>) {
        let mut slot = self.active.lock();
        *slot = protocol;
    }
}
#[async_trait]
impl PortMapperClient for SequentialMapper {
    /// Probes NAT-PMP first (when a client exists), then UPnP, and caches the
    /// first protocol whose probe succeeds.
    ///
    /// # Errors
    ///
    /// Returns the UPnP probe error when neither protocol responds. In that
    /// case the cached protocol is cleared, so a failed re-probe never leaves
    /// a stale protocol behind for `active_protocol` / `install` to act on.
    async fn probe(&self) -> Result<(), PortMappingError> {
        if let Some(pmp) = &self.nat_pmp {
            if pmp.probe().await.is_ok() {
                self.set_active(Some(Protocol::NatPmp));
                return Ok(());
            }
        }
        match self.upnp.probe().await {
            Ok(()) => {
                self.set_active(Some(Protocol::Upnp));
                Ok(())
            }
            Err(err) => {
                // Both protocols failed: whatever was cached before is no
                // longer known to be good.
                self.set_active(None);
                Err(err)
            }
        }
    }

    /// Installs a mapping for `internal_port` with lifetime `ttl` using the
    /// cached protocol, falling back to the other protocol on failure.
    ///
    /// Behaviour on failure of the cached protocol:
    /// - the cache is cleared;
    /// - cached NAT-PMP falls back to a direct UPnP install;
    /// - cached UPnP falls back to NAT-PMP, which is re-probed first and
    ///   skipped entirely when no NAT-PMP client exists or the probe fails;
    /// - a successful fallback re-points the cache at the fallback protocol.
    ///
    /// # Errors
    ///
    /// Returns [`PortMappingError::Unavailable`] when no protocol is cached
    /// (no successful probe yet, or a previous failure cleared it). When both
    /// the cached protocol and the fallback fail, the error from the cached
    /// protocol is returned and the fallback's error is discarded.
    async fn install(
        &self,
        internal_port: u16,
        ttl: Duration,
    ) -> Result<PortMapping, PortMappingError> {
        let Some(active) = self.active_protocol() else {
            return Err(PortMappingError::Unavailable);
        };

        let primary = match active {
            Protocol::NatPmp => {
                // The cache only ever points at NAT-PMP after a NAT-PMP client
                // answered; if that invariant is ever broken, degrade to an
                // error instead of panicking.
                let Some(pmp) = self.nat_pmp.as_ref() else {
                    self.set_active(None);
                    return Err(PortMappingError::Unavailable);
                };
                pmp.install(internal_port, ttl).await
            }
            Protocol::Upnp => self.upnp.install(internal_port, ttl).await,
        };
        let first_err = match primary {
            Ok(mapping) => return Ok(mapping),
            Err(err) => err,
        };

        // The cached protocol just failed; forget it until something succeeds.
        self.set_active(None);

        match active {
            // Cached NAT-PMP failed: try UPnP directly.
            Protocol::NatPmp => match self.upnp.install(internal_port, ttl).await {
                Ok(mapping) => {
                    self.set_active(Some(Protocol::Upnp));
                    Ok(mapping)
                }
                Err(_) => Err(first_err),
            },
            // Cached UPnP failed: NAT-PMP is only attempted after a fresh
            // successful probe.
            Protocol::Upnp => {
                let Some(pmp) = self.nat_pmp.as_ref() else {
                    return Err(first_err);
                };
                if pmp.probe().await.is_err() {
                    return Err(first_err);
                }
                match pmp.install(internal_port, ttl).await {
                    Ok(mapping) => {
                        self.set_active(Some(Protocol::NatPmp));
                        Ok(mapping)
                    }
                    Err(_) => Err(first_err),
                }
            }
        }
    }

    /// Renews `mapping` through the client matching `mapping.protocol`,
    /// independent of the cached active protocol.
    ///
    /// # Errors
    ///
    /// Returns [`PortMappingError::Unavailable`] for a NAT-PMP mapping when no
    /// NAT-PMP client exists; otherwise forwards the client's result.
    async fn renew(&self, mapping: &PortMapping) -> Result<PortMapping, PortMappingError> {
        match mapping.protocol {
            Protocol::NatPmp => match &self.nat_pmp {
                Some(pmp) => pmp.renew(mapping).await,
                None => Err(PortMappingError::Unavailable),
            },
            Protocol::Upnp => self.upnp.renew(mapping).await,
        }
    }

    /// Removes `mapping` through the client matching `mapping.protocol`.
    ///
    /// A NAT-PMP mapping is silently ignored when no NAT-PMP client exists.
    async fn remove(&self, mapping: &PortMapping) {
        match mapping.protocol {
            Protocol::NatPmp => {
                if let Some(pmp) = &self.nat_pmp {
                    pmp.remove(mapping).await;
                }
            }
            Protocol::Upnp => self.upnp.remove(mapping).await,
        }
    }
}
/// Builds a [`SequentialMapper`] from what the operating system reports.
///
/// Looks up the default IPv4 gateway, then resolves a local IPv4 address for
/// it. When no gateway is known, `8.8.8.8` is used as the lookup target
/// instead and the resulting mapper is built without a gateway (so without a
/// NAT-PMP client). Returns `None` when no local address can be resolved.
pub async fn sequential_mapper_from_os() -> Option<SequentialMapper> {
    let gateway = super::gateway::default_ipv4_gateway();
    let probe_target = match gateway {
        Some(gw) => gw,
        None => Ipv4Addr::new(8, 8, 8, 8),
    };
    super::gateway::local_ipv4_for_gateway(probe_target)
        .await
        .map(|local_ip| SequentialMapper::new(gateway, local_ip))
}
// Unit tests for `SequentialMapper`: construction, behaviour before any probe,
// and the install fallback paths driven through `MockPortMapperClient`.
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::traversal::portmap::MockPortMapperClient;
use std::sync::Arc;
// Mapper built with the real clients, using loopback as both the gateway and
// the local address.
fn sample_sequencer() -> SequentialMapper {
SequentialMapper::new(Some(Ipv4Addr::LOCALHOST), IpAddr::V4(Ipv4Addr::LOCALHOST))
}
// A newly constructed mapper has nothing cached.
#[test]
fn fresh_sequencer_has_no_active_protocol() {
let seq = sample_sequencer();
assert!(seq.active_protocol().is_none());
}
// Without a gateway address no NAT-PMP client is created.
#[test]
fn new_without_gateway_skips_nat_pmp() {
let seq = SequentialMapper::new(None, IpAddr::V4(Ipv4Addr::LOCALHOST));
assert!(seq.nat_pmp.is_none());
}
// With a gateway address a NAT-PMP client is created.
#[test]
fn new_with_gateway_constructs_nat_pmp() {
let seq = sample_sequencer();
assert!(seq.nat_pmp.is_some());
}
// `install` refuses to run while no protocol is cached.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn install_before_probe_is_unavailable() {
let seq = sample_sequencer();
let res = seq.install(9001, Duration::from_secs(60)).await;
assert!(matches!(res, Err(PortMappingError::Unavailable)));
}
// Probing the real clients against loopback is expected to fail, leave the
// cache empty, and finish in under five seconds.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn probe_without_responders_returns_last_error() {
let seq = sample_sequencer();
let start = tokio::time::Instant::now();
let res = seq.probe().await;
let elapsed = start.elapsed();
assert!(res.is_err(), "no responders on loopback");
assert!(seq.active_protocol().is_none());
assert!(
elapsed < Duration::from_secs(5),
"both-protocol probe should bound by ~3 s; took {elapsed:?}",
);
}
// Canned mapping tagged with the given protocol, used as a mock install result.
fn sample_mapping(protocol: Protocol) -> PortMapping {
PortMapping {
external: "203.0.113.42:9001".parse().unwrap(),
internal_port: 9001,
ttl: Duration::from_secs(3600),
protocol,
}
}
// Sanity check that the mock returns a queued probe result.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mock_client_surface_remains_usable() {
let mock = MockPortMapperClient::new();
mock.queue_probe(Ok(()));
assert!(mock.probe().await.is_ok());
}
// NAT-PMP probes fine but refuses the install; UPnP install succeeds, so the
// call returns the UPnP mapping and the cache moves to UPnP.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn install_failure_on_cached_protocol_falls_back_to_other() {
let pmp = MockPortMapperClient::new();
pmp.queue_probe(Ok(()));
pmp.queue_install(Err(PortMappingError::Refused("nat-pmp policy".into())));
let upnp = MockPortMapperClient::new();
upnp.queue_install(Ok(sample_mapping(Protocol::Upnp)));
let seq = SequentialMapper::new_with_clients(Some(Box::new(pmp)), Box::new(upnp));
seq.probe().await.expect("probe should succeed on NAT-PMP");
assert_eq!(seq.active_protocol(), Some(Protocol::NatPmp));
let mapping = seq
.install(9001, Duration::from_secs(3600))
.await
.expect("install should fall back to UPnP when NAT-PMP refuses MAP");
assert_eq!(
mapping.protocol,
Protocol::Upnp,
"mapping should be tagged UPnP after fallback",
);
assert_eq!(
seq.active_protocol(),
Some(Protocol::Upnp),
"cache should repoint to UPnP after successful fallback",
);
}
// Both installs fail: the error from the cached protocol (NAT-PMP) is the one
// returned, and the cache ends up empty.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn install_failure_on_both_surfaces_original_error_and_clears_cache() {
let pmp = MockPortMapperClient::new();
pmp.queue_probe(Ok(()));
pmp.queue_install(Err(PortMappingError::Refused("nat-pmp policy".into())));
let upnp = MockPortMapperClient::new();
upnp.queue_install(Err(PortMappingError::Unavailable));
let seq = SequentialMapper::new_with_clients(Some(Box::new(pmp)), Box::new(upnp));
seq.probe().await.expect("probe");
let err = seq
.install(9001, Duration::from_secs(3600))
.await
.expect_err("both installs fail — result must be Err");
match err {
PortMappingError::Refused(msg) => assert!(
msg.contains("nat-pmp policy"),
"should surface the original NAT-PMP error, got {msg:?}",
),
other => panic!("expected original Refused error; got {other:?}"),
}
assert!(
seq.active_protocol().is_none(),
"cache must be cleared when both installs fail",
);
}
// The first queued NAT-PMP probe fails so the initial probe selects UPnP; the
// UPnP install then fails, and the fallback must consume the second NAT-PMP
// probe before installing over NAT-PMP.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fallback_to_nat_pmp_probes_before_installing() {
let pmp = Arc::new(MockPortMapperClient::new());
pmp.queue_probe(Err(PortMappingError::Unavailable));
pmp.queue_probe(Ok(()));
pmp.queue_install(Ok(sample_mapping(Protocol::NatPmp)));
let upnp = MockPortMapperClient::new();
upnp.queue_probe(Ok(()));
upnp.queue_install(Err(PortMappingError::Refused("upnp gateway busy".into())));
let seq = SequentialMapper::new_with_clients(Some(Box::new(pmp.clone())), Box::new(upnp));
seq.probe().await.expect("upnp probe should succeed");
assert_eq!(seq.active_protocol(), Some(Protocol::Upnp));
let mapping = seq
.install(9001, Duration::from_secs(3600))
.await
.expect("fallback to NAT-PMP must succeed");
assert_eq!(mapping.protocol, Protocol::NatPmp);
assert_eq!(seq.active_protocol(), Some(Protocol::NatPmp));
assert_eq!(
pmp.remaining_probes(),
0,
"fallback path must have consumed the queued NAT-PMP probe",
);
}
// Same setup, but the fallback NAT-PMP probe also fails: the original UPnP
// error is returned and the queued NAT-PMP install is left untouched.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fallback_nat_pmp_probe_failure_surfaces_original_upnp_error() {
let pmp = Arc::new(MockPortMapperClient::new());
pmp.queue_probe(Err(PortMappingError::Unavailable));
pmp.queue_probe(Err(PortMappingError::Transport("no responder".into())));
pmp.queue_install(Ok(sample_mapping(Protocol::NatPmp)));
let upnp = MockPortMapperClient::new();
upnp.queue_probe(Ok(()));
upnp.queue_install(Err(PortMappingError::Refused("upnp gateway busy".into())));
let seq = SequentialMapper::new_with_clients(Some(Box::new(pmp.clone())), Box::new(upnp));
seq.probe().await.expect("upnp probe");
let err = seq
.install(9001, Duration::from_secs(3600))
.await
.expect_err("fallback probe failure must short-circuit");
match err {
PortMappingError::Refused(msg) => assert!(
msg.contains("upnp gateway busy"),
"must surface the original UPnP error, got {msg:?}",
),
other => panic!("expected original UPnP Refused error, got {other:?}"),
}
assert_eq!(
pmp.remaining_installs(),
1,
"fallback install must NOT fire when the fallback probe fails",
);
}
// UPnP-only mapper (no NAT-PMP client): a failed UPnP install returns the UPnP
// error without panicking and clears the cache.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn install_failure_with_no_nat_pmp_fallback_surfaces_upnp_error() {
let upnp = MockPortMapperClient::new();
upnp.queue_probe(Ok(()));
upnp.queue_install(Err(PortMappingError::Unavailable));
let seq = SequentialMapper::new_with_clients(None, Box::new(upnp));
seq.probe().await.expect("upnp probe");
assert_eq!(seq.active_protocol(), Some(Protocol::Upnp));
let err = seq.install(9001, Duration::from_secs(3600)).await;
assert!(
matches!(err, Err(PortMappingError::Unavailable)),
"UPnP-only deployment with install failure should surface \
Unavailable without panicking on missing NAT-PMP client; got {err:?}",
);
assert!(
seq.active_protocol().is_none(),
"cache cleared on install failure regardless of fallback availability",
);
}
}