use parking_lot::Mutex;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};
use tokio::net::UdpSocket;
use super::{PortMapperClient, PortMapping, PortMappingError, Protocol};
pub const NATPMP_PORT: u16 = 5351;
pub const NATPMP_VERSION: u8 = 0;
pub const OP_EXTERNAL_ADDRESS: u8 = 0;
pub const OP_MAP_UDP: u8 = 1;
pub const RESPONSE_OP_OFFSET: u8 = 128;
pub const NATPMP_DEADLINE: Duration = Duration::from_secs(1);
pub const EXTERNAL_REQUEST_LEN: usize = 2;
pub const MAP_REQUEST_LEN: usize = 12;
pub const EXTERNAL_RESPONSE_LEN: usize = 12;
pub const MAP_RESPONSE_LEN: usize = 16;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResultCode {
Success,
UnsupportedVersion,
NotAuthorized,
NetworkFailure,
OutOfResources,
UnsupportedOpcode,
Unknown(u16),
}
impl ResultCode {
pub fn from_u16(raw: u16) -> Self {
match raw {
0 => Self::Success,
1 => Self::UnsupportedVersion,
2 => Self::NotAuthorized,
3 => Self::NetworkFailure,
4 => Self::OutOfResources,
5 => Self::UnsupportedOpcode,
other => Self::Unknown(other),
}
}
pub fn as_str(&self) -> &'static str {
match self {
Self::Success => "success",
Self::UnsupportedVersion => "unsupported-version",
Self::NotAuthorized => "not-authorized",
Self::NetworkFailure => "network-failure",
Self::OutOfResources => "out-of-resources",
Self::UnsupportedOpcode => "unsupported-opcode",
Self::Unknown(_) => "unknown",
}
}
pub fn to_error(self) -> PortMappingError {
PortMappingError::Refused(self.as_str().to_string())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NatPmpRequest {
ExternalAddress,
MapUdp {
internal_port: u16,
external_port_hint: u16,
lifetime: u32,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NatPmpResponse {
ExternalAddress {
result: ResultCode,
epoch_seconds: u32,
external_ip: Ipv4Addr,
},
MapUdp {
result: ResultCode,
epoch_seconds: u32,
internal_port: u16,
mapped_port: u16,
lifetime: u32,
},
}
pub fn encode_request(req: &NatPmpRequest) -> Bytes {
match req {
NatPmpRequest::ExternalAddress => {
let mut buf = BytesMut::with_capacity(EXTERNAL_REQUEST_LEN);
buf.put_u8(NATPMP_VERSION);
buf.put_u8(OP_EXTERNAL_ADDRESS);
buf.freeze()
}
NatPmpRequest::MapUdp {
internal_port,
external_port_hint,
lifetime,
} => {
let mut buf = BytesMut::with_capacity(MAP_REQUEST_LEN);
buf.put_u8(NATPMP_VERSION);
buf.put_u8(OP_MAP_UDP);
buf.put_u16(0); buf.put_u16(*internal_port);
buf.put_u16(*external_port_hint);
buf.put_u32(*lifetime);
debug_assert_eq!(buf.len(), MAP_REQUEST_LEN);
buf.freeze()
}
}
}
pub fn decode_response(data: &[u8]) -> Option<NatPmpResponse> {
if data.len() < EXTERNAL_RESPONSE_LEN {
return None;
}
if data[0] != NATPMP_VERSION {
return None;
}
let raw_op = data[1];
if raw_op < RESPONSE_OP_OFFSET {
return None;
}
let op = raw_op - RESPONSE_OP_OFFSET;
let result = ResultCode::from_u16(u16::from_be_bytes([data[2], data[3]]));
let epoch_seconds = u32::from_be_bytes([data[4], data[5], data[6], data[7]]);
match op {
OP_EXTERNAL_ADDRESS => {
if data.len() < EXTERNAL_RESPONSE_LEN {
return None;
}
let external_ip = Ipv4Addr::new(data[8], data[9], data[10], data[11]);
Some(NatPmpResponse::ExternalAddress {
result,
epoch_seconds,
external_ip,
})
}
OP_MAP_UDP => {
if data.len() < MAP_RESPONSE_LEN {
return None;
}
let internal_port = u16::from_be_bytes([data[8], data[9]]);
let mapped_port = u16::from_be_bytes([data[10], data[11]]);
let lifetime = u32::from_be_bytes([data[12], data[13], data[14], data[15]]);
Some(NatPmpResponse::MapUdp {
result,
epoch_seconds,
internal_port,
mapped_port,
lifetime,
})
}
_ => None,
}
}
pub struct NatPmpMapper {
gateway: Ipv4Addr,
target_port: u16,
cached_external: Mutex<Option<Ipv4Addr>>,
}
impl NatPmpMapper {
pub fn new(gateway: Ipv4Addr) -> Self {
Self {
gateway,
target_port: NATPMP_PORT,
cached_external: Mutex::new(None),
}
}
#[cfg(test)]
pub(crate) fn new_for_test(gateway: Ipv4Addr, target_port: u16) -> Self {
Self {
gateway,
target_port,
cached_external: Mutex::new(None),
}
}
pub fn gateway(&self) -> Ipv4Addr {
self.gateway
}
fn cached_external(&self) -> Option<Ipv4Addr> {
*self.cached_external.lock()
}
fn set_cached_external(&self, ip: Ipv4Addr) {
*self.cached_external.lock() = Some(ip);
}
async fn round_trip(&self, request: Bytes) -> Result<Vec<u8>, PortMappingError> {
let sock = UdpSocket::bind("0.0.0.0:0")
.await
.map_err(|e| PortMappingError::Transport(e.to_string()))?;
let target = SocketAddr::new(IpAddr::V4(self.gateway), self.target_port);
sock.connect(target)
.await
.map_err(|e| PortMappingError::Transport(e.to_string()))?;
sock.send(&request)
.await
.map_err(|e| PortMappingError::Transport(e.to_string()))?;
let mut buf = [0u8; 64];
let n = match tokio::time::timeout(NATPMP_DEADLINE, sock.recv(&mut buf)).await {
Ok(Ok(n)) => n,
Ok(Err(e)) => return Err(PortMappingError::Transport(e.to_string())),
Err(_) => return Err(PortMappingError::Timeout),
};
Ok(buf[..n].to_vec())
}
}
#[async_trait]
impl PortMapperClient for NatPmpMapper {
async fn probe(&self) -> Result<(), PortMappingError> {
let bytes = self
.round_trip(encode_request(&NatPmpRequest::ExternalAddress))
.await?;
let resp = decode_response(&bytes)
.ok_or_else(|| PortMappingError::Transport("malformed NAT-PMP response".into()))?;
match resp {
NatPmpResponse::ExternalAddress {
result: ResultCode::Success,
external_ip,
..
} => {
self.set_cached_external(external_ip);
Ok(())
}
NatPmpResponse::ExternalAddress { result, .. } => Err(result.to_error()),
_ => Err(PortMappingError::Transport(
"unexpected NAT-PMP response opcode".into(),
)),
}
}
async fn install(
&self,
internal_port: u16,
ttl: Duration,
) -> Result<PortMapping, PortMappingError> {
if ttl.is_zero() {
return Err(PortMappingError::Transport(
"NAT-PMP install with ttl=0 would unmap; \
caller must supply a non-zero lifetime"
.into(),
));
}
let lifetime = ttl.as_secs().min(u32::MAX as u64) as u32;
let req = NatPmpRequest::MapUdp {
internal_port,
external_port_hint: internal_port,
lifetime,
};
let bytes = self.round_trip(encode_request(&req)).await?;
let resp = decode_response(&bytes)
.ok_or_else(|| PortMappingError::Transport("malformed NAT-PMP response".into()))?;
match resp {
NatPmpResponse::MapUdp {
result: ResultCode::Success,
mapped_port,
lifetime: granted,
..
} => {
let external_ip = self.cached_external().ok_or_else(|| {
PortMappingError::Transport(
"NAT-PMP install called before successful probe — \
external address cache empty, refusing to publish \
gateway's private IP as external"
.into(),
)
})?;
Ok(PortMapping {
external: SocketAddr::new(IpAddr::V4(external_ip), mapped_port),
internal_port,
ttl: Duration::from_secs(granted as u64),
protocol: Protocol::NatPmp,
})
}
NatPmpResponse::MapUdp { result, .. } => Err(result.to_error()),
_ => Err(PortMappingError::Transport(
"unexpected NAT-PMP response opcode".into(),
)),
}
}
async fn renew(&self, mapping: &PortMapping) -> Result<PortMapping, PortMappingError> {
self.install(mapping.internal_port, mapping.ttl).await
}
async fn remove(&self, mapping: &PortMapping) {
const REMOVE_DEADLINE: std::time::Duration = std::time::Duration::from_millis(200);
let req = NatPmpRequest::MapUdp {
internal_port: mapping.internal_port,
external_port_hint: 0,
lifetime: 0,
};
let sock = match UdpSocket::bind("0.0.0.0:0").await {
Ok(s) => s,
Err(e) => {
tracing::warn!(
internal_port = mapping.internal_port,
error = %e,
"NAT-PMP remove: failed to bind UDP socket — \
mapping not revoked, gateway holds it until TTL"
);
return;
}
};
let target = SocketAddr::new(IpAddr::V4(self.gateway), self.target_port);
if let Err(e) = sock.connect(target).await {
tracing::warn!(
internal_port = mapping.internal_port,
error = %e,
"NAT-PMP remove: connect to gateway failed — \
mapping not revoked, gateway holds it until TTL"
);
return;
}
let bytes = encode_request(&req);
if let Err(e) = sock.send(&bytes).await {
tracing::warn!(
internal_port = mapping.internal_port,
error = %e,
"NAT-PMP remove: send to gateway failed — \
mapping not revoked, gateway holds it until TTL"
);
return;
}
let mut buf = [0u8; 16];
match tokio::time::timeout(REMOVE_DEADLINE, sock.recv(&mut buf)).await {
Ok(Ok(n)) if n > 0 => {
if n >= 4 {
let result_code = u16::from_be_bytes([buf[2], buf[3]]);
if result_code != 0 {
tracing::warn!(
internal_port = mapping.internal_port,
result_code,
"NAT-PMP remove: gateway returned non-zero result code"
);
}
}
}
Ok(Ok(_)) => {
tracing::warn!(
internal_port = mapping.internal_port,
"NAT-PMP remove: empty response from gateway"
);
}
Ok(Err(e)) => {
tracing::warn!(
internal_port = mapping.internal_port,
error = %e,
"NAT-PMP remove: recv error"
);
}
Err(_) => {
tracing::warn!(
internal_port = mapping.internal_port,
"NAT-PMP remove: gateway did not ack within {}ms — \
mapping may still be live",
REMOVE_DEADLINE.as_millis()
);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn external_request_encodes_to_two_bytes() {
let bytes = encode_request(&NatPmpRequest::ExternalAddress);
assert_eq!(&bytes[..], &[NATPMP_VERSION, OP_EXTERNAL_ADDRESS]);
}
#[test]
fn map_udp_request_encodes_with_big_endian_fields() {
let req = NatPmpRequest::MapUdp {
internal_port: 9001,
external_port_hint: 9001,
lifetime: 3600,
};
let bytes = encode_request(&req);
assert_eq!(bytes.len(), MAP_REQUEST_LEN);
assert_eq!(bytes[0], NATPMP_VERSION);
assert_eq!(bytes[1], OP_MAP_UDP);
assert_eq!(&bytes[2..4], &[0, 0], "reserved must be zero");
assert_eq!(u16::from_be_bytes([bytes[4], bytes[5]]), 9001);
assert_eq!(u16::from_be_bytes([bytes[6], bytes[7]]), 9001);
assert_eq!(
u32::from_be_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]),
3600
);
}
#[test]
fn decode_external_address_response() {
let mut buf = Vec::with_capacity(EXTERNAL_RESPONSE_LEN);
buf.push(NATPMP_VERSION);
buf.push(OP_EXTERNAL_ADDRESS + RESPONSE_OP_OFFSET);
buf.extend_from_slice(&0u16.to_be_bytes()); buf.extend_from_slice(&12345u32.to_be_bytes());
buf.extend_from_slice(&[203, 0, 113, 7]);
match decode_response(&buf) {
Some(NatPmpResponse::ExternalAddress {
result: ResultCode::Success,
epoch_seconds: 12345,
external_ip,
}) => {
assert_eq!(external_ip, Ipv4Addr::new(203, 0, 113, 7));
}
other => panic!("expected ExternalAddress Success, got {other:?}"),
}
}
#[test]
fn decode_map_udp_response() {
let mut buf = Vec::with_capacity(MAP_RESPONSE_LEN);
buf.push(NATPMP_VERSION);
buf.push(OP_MAP_UDP + RESPONSE_OP_OFFSET);
buf.extend_from_slice(&0u16.to_be_bytes()); buf.extend_from_slice(&1234u32.to_be_bytes()); buf.extend_from_slice(&9001u16.to_be_bytes()); buf.extend_from_slice(&45678u16.to_be_bytes()); buf.extend_from_slice(&3600u32.to_be_bytes());
match decode_response(&buf) {
Some(NatPmpResponse::MapUdp {
result: ResultCode::Success,
internal_port: 9001,
mapped_port: 45678,
lifetime: 3600,
..
}) => {}
other => panic!("expected MapUdp Success, got {other:?}"),
}
}
#[tokio::test]
async fn install_rejects_zero_ttl_before_sending_wire_request() {
let gateway = Ipv4Addr::new(10, 0, 0, 1);
let mapper = NatPmpMapper::new(gateway);
let result = mapper.install(9001, Duration::ZERO).await;
match result {
Err(PortMappingError::Transport(msg)) => {
assert!(
msg.contains("ttl=0"),
"error message should mention ttl=0 (got: {msg})"
);
}
other => panic!(
"ttl=0 install must reject synchronously with Transport \
error, got {:?}",
other
),
}
}
#[test]
fn decode_result_code_variants() {
assert_eq!(ResultCode::from_u16(0), ResultCode::Success);
assert_eq!(ResultCode::from_u16(1), ResultCode::UnsupportedVersion);
assert_eq!(ResultCode::from_u16(2), ResultCode::NotAuthorized);
assert_eq!(ResultCode::from_u16(3), ResultCode::NetworkFailure);
assert_eq!(ResultCode::from_u16(4), ResultCode::OutOfResources);
assert_eq!(ResultCode::from_u16(5), ResultCode::UnsupportedOpcode);
assert_eq!(ResultCode::from_u16(42), ResultCode::Unknown(42));
}
#[test]
fn decode_rejects_wrong_version() {
let mut buf = vec![0u8; EXTERNAL_RESPONSE_LEN];
buf[0] = 2; buf[1] = OP_EXTERNAL_ADDRESS + RESPONSE_OP_OFFSET;
assert!(decode_response(&buf).is_none());
}
#[test]
fn decode_rejects_request_opcode() {
let mut buf = vec![0u8; EXTERNAL_RESPONSE_LEN];
buf[0] = NATPMP_VERSION;
buf[1] = OP_EXTERNAL_ADDRESS; assert!(decode_response(&buf).is_none());
}
#[test]
fn decode_rejects_truncated_response() {
let buf = vec![NATPMP_VERSION, OP_EXTERNAL_ADDRESS + RESPONSE_OP_OFFSET];
assert!(decode_response(&buf).is_none());
}
#[test]
fn decode_rejects_truncated_map_response() {
let mut buf = vec![0u8; 12];
buf[0] = NATPMP_VERSION;
buf[1] = OP_MAP_UDP + RESPONSE_OP_OFFSET;
assert!(decode_response(&buf).is_none());
}
#[test]
fn decode_rejects_unknown_opcode() {
let mut buf = vec![0u8; MAP_RESPONSE_LEN];
buf[0] = NATPMP_VERSION;
buf[1] = RESPONSE_OP_OFFSET + 42; assert!(decode_response(&buf).is_none());
}
#[test]
fn decode_response_never_panics_on_malformed_input() {
for len in 0..=32 {
let zeros = vec![0u8; len];
let _ = decode_response(&zeros);
let ones = vec![0xFFu8; len];
let _ = decode_response(&ones);
let ascending: Vec<u8> = (0..len).map(|i| i as u8).collect();
let _ = decode_response(&ascending);
let descending: Vec<u8> = (0..len).map(|i| (255 - i) as u8).collect();
let _ = decode_response(&descending);
}
let mut short_map = vec![0u8; EXTERNAL_RESPONSE_LEN];
short_map[0] = NATPMP_VERSION;
short_map[1] = OP_MAP_UDP + RESPONSE_OP_OFFSET;
assert!(decode_response(&short_map).is_none());
}
async fn spawn_mock_gateway<F>(respond: F) -> (u16, tokio::task::JoinHandle<()>)
where
F: Fn(&[u8]) -> Option<Vec<u8>> + Send + Sync + 'static,
{
let sock = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let port = sock.local_addr().unwrap().port();
let handle = tokio::spawn(async move {
let mut buf = [0u8; 64];
loop {
match sock.recv_from(&mut buf).await {
Ok((n, from)) => {
if let Some(reply) = respond(&buf[..n]) {
let _ = sock.send_to(&reply, from).await;
}
}
Err(_) => return,
}
}
});
(port, handle)
}
struct TestMapper {
gateway_port: u16,
}
impl TestMapper {
fn new(gateway_port: u16) -> Self {
Self { gateway_port }
}
async fn round_trip(&self, request: Bytes) -> Result<Vec<u8>, PortMappingError> {
let sock = UdpSocket::bind("127.0.0.1:0")
.await
.map_err(|e| PortMappingError::Transport(e.to_string()))?;
let target: SocketAddr = format!("127.0.0.1:{}", self.gateway_port).parse().unwrap();
sock.send_to(&request, target)
.await
.map_err(|e| PortMappingError::Transport(e.to_string()))?;
let mut buf = [0u8; 64];
let (n, _from) =
match tokio::time::timeout(NATPMP_DEADLINE, sock.recv_from(&mut buf)).await {
Ok(Ok(r)) => r,
Ok(Err(e)) => return Err(PortMappingError::Transport(e.to_string())),
Err(_) => return Err(PortMappingError::Timeout),
};
Ok(buf[..n].to_vec())
}
}
fn encode_external_success(ip: Ipv4Addr) -> Vec<u8> {
let mut buf = Vec::with_capacity(EXTERNAL_RESPONSE_LEN);
buf.push(NATPMP_VERSION);
buf.push(OP_EXTERNAL_ADDRESS + RESPONSE_OP_OFFSET);
buf.extend_from_slice(&0u16.to_be_bytes());
buf.extend_from_slice(&0u32.to_be_bytes());
buf.extend_from_slice(&ip.octets());
buf
}
fn encode_map_success(internal: u16, mapped: u16, lifetime: u32) -> Vec<u8> {
let mut buf = Vec::with_capacity(MAP_RESPONSE_LEN);
buf.push(NATPMP_VERSION);
buf.push(OP_MAP_UDP + RESPONSE_OP_OFFSET);
buf.extend_from_slice(&0u16.to_be_bytes());
buf.extend_from_slice(&0u32.to_be_bytes());
buf.extend_from_slice(&internal.to_be_bytes());
buf.extend_from_slice(&mapped.to_be_bytes());
buf.extend_from_slice(&lifetime.to_be_bytes());
buf
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn nat_pmp_mapper_probe_times_out_against_dead_gateway() {
let silent = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let silent_port = silent.local_addr().unwrap().port();
let mapper = NatPmpMapper::new_for_test(Ipv4Addr::LOCALHOST, silent_port);
assert_eq!(mapper.gateway(), Ipv4Addr::LOCALHOST);
let start = tokio::time::Instant::now();
let res = mapper.probe().await;
let elapsed = start.elapsed();
assert!(
matches!(res, Err(PortMappingError::Timeout)),
"expected Timeout, got {res:?}",
);
assert!(
elapsed >= Duration::from_millis(800) && elapsed < Duration::from_millis(2000),
"deadline should be ~1 s; got {elapsed:?}",
);
assert!(mapper.cached_external().is_none());
drop(silent);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn round_trip_times_out_when_gateway_silent() {
let sock = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let port = sock.local_addr().unwrap().port();
let mapper = TestMapper::new(port);
let start = tokio::time::Instant::now();
let res = mapper
.round_trip(encode_request(&NatPmpRequest::ExternalAddress))
.await;
let elapsed = start.elapsed();
assert!(matches!(res, Err(PortMappingError::Timeout)));
assert!(
elapsed >= Duration::from_millis(800) && elapsed < Duration::from_millis(2000),
"expected ~1 s timeout, got {elapsed:?}",
);
drop(sock);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn probe_against_mock_gateway_returns_success() {
let (port, gw) = spawn_mock_gateway(|req| {
let decoded = req.to_vec();
if decoded == vec![NATPMP_VERSION, OP_EXTERNAL_ADDRESS] {
Some(encode_external_success(Ipv4Addr::new(198, 51, 100, 7)))
} else {
None
}
})
.await;
let sock = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let target: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
sock.send_to(&encode_request(&NatPmpRequest::ExternalAddress), target)
.await
.unwrap();
let mut buf = [0u8; 64];
let (n, _) = tokio::time::timeout(Duration::from_secs(1), sock.recv_from(&mut buf))
.await
.expect("gateway response")
.unwrap();
let resp = decode_response(&buf[..n]).expect("decode");
match resp {
NatPmpResponse::ExternalAddress {
result: ResultCode::Success,
external_ip,
..
} => {
assert_eq!(external_ip, Ipv4Addr::new(198, 51, 100, 7));
}
other => panic!("unexpected response {other:?}"),
}
gw.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn install_against_mock_gateway_returns_mapping() {
let (port, gw) = spawn_mock_gateway(|req| match req {
r if r == [NATPMP_VERSION, OP_EXTERNAL_ADDRESS] => {
Some(encode_external_success(Ipv4Addr::new(198, 51, 100, 7)))
}
r if r.len() == MAP_REQUEST_LEN && r[0] == NATPMP_VERSION && r[1] == OP_MAP_UDP => {
let internal = u16::from_be_bytes([r[4], r[5]]);
Some(encode_map_success(internal, 54321, 3600))
}
_ => None,
})
.await;
let sock = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let target: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
sock.send_to(&encode_request(&NatPmpRequest::ExternalAddress), target)
.await
.unwrap();
let mut buf = [0u8; 64];
let (n, _) = tokio::time::timeout(Duration::from_secs(1), sock.recv_from(&mut buf))
.await
.unwrap()
.unwrap();
let probe_ip = match decode_response(&buf[..n]).unwrap() {
NatPmpResponse::ExternalAddress {
result: ResultCode::Success,
external_ip,
..
} => external_ip,
other => panic!("probe failure {other:?}"),
};
assert_eq!(probe_ip, Ipv4Addr::new(198, 51, 100, 7));
let install_req = NatPmpRequest::MapUdp {
internal_port: 9001,
external_port_hint: 9001,
lifetime: 3600,
};
sock.send_to(&encode_request(&install_req), target)
.await
.unwrap();
let (n, _) = tokio::time::timeout(Duration::from_secs(1), sock.recv_from(&mut buf))
.await
.unwrap()
.unwrap();
let mapping = match decode_response(&buf[..n]).unwrap() {
NatPmpResponse::MapUdp {
result: ResultCode::Success,
internal_port: 9001,
mapped_port: 54321,
lifetime: 3600,
..
} => (9001u16, 54321u16),
other => panic!("install failure {other:?}"),
};
assert_eq!(mapping, (9001, 54321));
gw.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn round_trip_rejects_response_from_non_gateway_source() {
let gateway_sock = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let gateway_port = gateway_sock.local_addr().unwrap().port();
let (src_tx, mut src_rx) = tokio::sync::mpsc::unbounded_channel::<SocketAddr>();
let gateway_handle = tokio::spawn(async move {
let mut buf = [0u8; 64];
while let Ok((_n, from)) = gateway_sock.recv_from(&mut buf).await {
let _ = src_tx.send(from);
}
});
let spoofer_sock = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let spoofer_port = spoofer_sock.local_addr().unwrap().port();
assert_ne!(
spoofer_port, gateway_port,
"spoofer must use a different port"
);
let spoofer_handle = tokio::spawn(async move {
let forged = encode_external_success(Ipv4Addr::new(203, 0, 113, 66));
while let Some(client_src) = src_rx.recv().await {
let _ = spoofer_sock.send_to(&forged, client_src).await;
}
});
let mapper = NatPmpMapper::new_for_test(Ipv4Addr::LOCALHOST, gateway_port);
let start = tokio::time::Instant::now();
let res = mapper.probe().await;
let elapsed = start.elapsed();
assert!(
matches!(res, Err(PortMappingError::Timeout)),
"probe must time out — spoofed response should not leak through kernel filter; \
got {res:?} after {elapsed:?}",
);
assert!(
elapsed >= Duration::from_millis(800),
"timeout fired too early ({elapsed:?}); spoof may have short-circuited the deadline",
);
assert!(
mapper.cached_external().is_none(),
"external-IP cache must stay empty — if a spoofed IP landed here, the mesh would \
advertise the attacker's address as its reflex",
);
gateway_handle.abort();
spoofer_handle.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remove_is_fire_and_forget_and_does_not_block_on_silent_gateway() {
let silent = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let gateway_port = silent.local_addr().unwrap().port();
let mapper = NatPmpMapper::new_for_test(Ipv4Addr::LOCALHOST, gateway_port);
let mapping = PortMapping {
external: "203.0.113.1:9001".parse().unwrap(),
internal_port: 9001,
ttl: Duration::from_secs(3600),
protocol: Protocol::NatPmp,
};
let start = tokio::time::Instant::now();
mapper.remove(&mapping).await;
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_millis(500),
"remove() blocked for {elapsed:?} — bounded recv regressed. \
On a silent gateway it must complete within REMOVE_DEADLINE \
(~200 ms) plus jitter, well under the original 1 s deadline",
);
drop(silent);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn install_without_prior_probe_refuses_rather_than_publishing_gateway_ip() {
let (port, gw) = spawn_mock_gateway(|req| {
if req.len() == MAP_REQUEST_LEN && req[0] == NATPMP_VERSION && req[1] == OP_MAP_UDP {
let internal = u16::from_be_bytes([req[4], req[5]]);
Some(encode_map_success(internal, internal, 3600))
} else {
None
}
})
.await;
let mapper = NatPmpMapper::new_for_test(Ipv4Addr::LOCALHOST, port);
assert!(
mapper.cached_external().is_none(),
"freshly-constructed mapper must have an empty external-IP cache",
);
let res = mapper.install(9001, Duration::from_secs(3600)).await;
match res {
Err(PortMappingError::Transport(msg)) => {
assert!(
msg.contains("install called before successful probe"),
"error detail must identify the precondition violation; got {msg:?}",
);
}
Ok(mapping) => panic!(
"install must NOT silently substitute the gateway LAN IP — \
got mapping with external={:?}. Pre-fix behavior would publish \
the gateway's private address to capability announcements",
mapping.external,
),
Err(other) => panic!("expected Transport(<precondition msg>); got {other:?}",),
}
assert!(mapper.cached_external().is_none());
gw.abort();
}
}