use crate::transports::ice::IceCandidate;
use anyhow::{Result, anyhow};
use igd::aio::Gateway;
use igd::PortMappingProtocol;
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::timeout;
use tracing::{debug, trace, warn};
pub const DEFAULT_LEASE_DURATION: u32 = 3600;
pub const MIN_LEASE_DURATION: u32 = 300;
pub const MAX_LEASE_DURATION: u32 = 86400;
pub const DEFAULT_UPNP_DISCOVERY_TIMEOUT: Duration = Duration::from_secs(2);
#[derive(Debug, Clone)]
pub struct PortMapping {
pub external_port: u16,
pub internal_addr: SocketAddr,
pub lease_duration: u32,
pub description: String,
pub created_at: std::time::Instant,
}
impl PortMapping {
pub fn is_expired_or_stale(&self) -> bool {
let elapsed = self.created_at.elapsed().as_secs() as u32;
elapsed + 60 >= self.lease_duration
}
pub fn remaining_lifetime(&self) -> u32 {
let elapsed = self.created_at.elapsed().as_secs() as u32;
self.lease_duration.saturating_sub(elapsed)
}
}
#[derive(Debug, Clone)]
pub struct UpnpPortMapper {
gateway: Option<Gateway>,
mappings: Arc<Mutex<HashMap<u16, PortMapping>>>,
pub local_addr: SocketAddr,
pub default_lease_duration: u32,
enabled: bool,
}
impl UpnpPortMapper {
pub fn new(local_addr: SocketAddr) -> Self {
Self {
gateway: None,
mappings: Arc::new(Mutex::new(HashMap::new())),
local_addr,
default_lease_duration: DEFAULT_LEASE_DURATION,
enabled: true,
}
}
pub fn with_lease_duration(local_addr: SocketAddr, lease_duration: u32) -> Self {
let lease_duration = lease_duration.clamp(MIN_LEASE_DURATION, MAX_LEASE_DURATION);
Self {
gateway: None,
mappings: Arc::new(Mutex::new(HashMap::new())),
local_addr,
default_lease_duration: lease_duration,
enabled: true,
}
}
pub fn disable(&mut self) {
self.enabled = false;
self.gateway = None;
}
pub fn enable(&mut self) {
self.enabled = true;
}
pub fn is_enabled(&self) -> bool {
self.enabled
}
pub async fn discover(&mut self) -> Result<()> {
self.discover_with_timeout(DEFAULT_UPNP_DISCOVERY_TIMEOUT).await
}
pub async fn discover_with_timeout(
&mut self,
timeout_duration: Duration,
) -> Result<()> {
if !self.enabled {
return Err(anyhow!("UPnP is disabled"));
}
if self.local_addr.ip().is_loopback() {
return Err(anyhow!("Cannot map loopback address"));
}
trace!(
"Starting UPnP gateway discovery (timeout: {:?})",
timeout_duration
);
let gateway = timeout(timeout_duration, igd::aio::search_gateway(Default::default()))
.await
.map_err(|_| {
anyhow!(
"UPnP gateway discovery timed out after {:?}",
timeout_duration
)
})?
.map_err(|e| anyhow!("UPnP gateway discovery failed: {}", e))?;
debug!("Found UPnP gateway");
self.gateway = Some(gateway);
Ok(())
}
pub fn has_gateway(&self) -> bool {
self.gateway.is_some()
}
pub async fn get_external_ip(&self) -> Result<Ipv4Addr> {
let gateway = self
.gateway
.as_ref()
.ok_or_else(|| anyhow!("No UPnP gateway available"))?;
let ip = gateway
.get_external_ip()
.await
.map_err(|e| anyhow!("Failed to get external IP: {}", e))?;
Ok(ip)
}
pub async fn add_mapping(&self, external_port: u16) -> Result<SocketAddr> {
if !self.enabled {
return Err(anyhow!("UPnP is disabled"));
}
let gateway = self
.gateway
.as_ref()
.ok_or_else(|| anyhow!("No UPnP gateway available, call discover() first"))?;
let external_ip = self.get_external_ip().await?;
let requested_port = if external_port == 0 {
self.local_addr.port()
} else {
external_port
};
let description = format!("rustrtc-{}", self.local_addr.port());
let local_ip = match self.local_addr.ip() {
IpAddr::V4(ip) => ip,
IpAddr::V6(_) => return Err(anyhow!("IPv6 not supported for UPnP IGD")),
};
trace!(
"Adding UPnP port mapping: {}:{} -> {}:{}",
external_ip,
requested_port,
local_ip,
self.local_addr.port()
);
let internal_sock_addr = SocketAddrV4::new(local_ip, self.local_addr.port());
match gateway
.add_port(
PortMappingProtocol::UDP,
requested_port,
internal_sock_addr,
self.default_lease_duration,
&description,
)
.await
{
Ok(()) => {
let external_addr = SocketAddr::new(IpAddr::V4(external_ip), requested_port);
let mapping = PortMapping {
external_port: requested_port,
internal_addr: self.local_addr,
lease_duration: self.default_lease_duration,
description,
created_at: std::time::Instant::now(),
};
self.mappings.lock().await.insert(requested_port, mapping);
debug!(
"UPnP port mapping added: {} -> {}",
external_addr, self.local_addr
);
Ok(external_addr)
}
Err(e) => {
if external_port != 0 && requested_port != 0 {
warn!(
"Port {} is taken, trying random port: {}",
requested_port, e
);
self.add_mapping_random_port(gateway, external_ip, local_ip).await
} else {
Err(anyhow!("Failed to add UPnP port mapping: {}", e))
}
}
}
}
async fn add_mapping_random_port(
&self,
gateway: &Gateway,
external_ip: Ipv4Addr,
local_ip: Ipv4Addr,
) -> Result<SocketAddr> {
for port in 10000..=65535u16 {
let description = format!("rustrtc-{}", self.local_addr.port());
let internal_sock_addr = SocketAddrV4::new(local_ip, self.local_addr.port());
match gateway
.add_port(
PortMappingProtocol::UDP,
port,
internal_sock_addr,
self.default_lease_duration,
&description,
)
.await
{
Ok(()) => {
let external_addr = SocketAddr::new(IpAddr::V4(external_ip), port);
let mapping = PortMapping {
external_port: port,
internal_addr: self.local_addr,
lease_duration: self.default_lease_duration,
description,
created_at: std::time::Instant::now(),
};
self.mappings.lock().await.insert(port, mapping);
debug!(
"UPnP port mapping added (random port): {} -> {}",
external_addr, self.local_addr
);
return Ok(external_addr);
}
Err(_) => continue,
}
}
Err(anyhow!("Failed to find available port for UPnP mapping"))
}
pub async fn remove_mapping(&self, external_port: u16) -> Result<()> {
let gateway = match &self.gateway {
Some(g) => g,
None => {
self.mappings.lock().await.remove(&external_port);
return Ok(());
}
};
gateway
.remove_port(PortMappingProtocol::UDP, external_port)
.await
.map_err(|e| anyhow!("Failed to remove UPnP mapping: {}", e))?;
self.mappings.lock().await.remove(&external_port);
debug!("UPnP port mapping removed: {}", external_port);
Ok(())
}
pub async fn cleanup(&self) -> Result<()> {
let mappings = self.mappings.lock().await.clone();
let mut last_error = None;
for (port, _) in mappings {
if let Err(e) = self.remove_mapping(port).await {
warn!("Failed to remove UPnP mapping for port {}: {}", port, e);
last_error = Some(e);
}
}
match last_error {
Some(e) => Err(e),
None => Ok(()),
}
}
pub async fn mapping_count(&self) -> usize {
self.mappings.lock().await.len()
}
pub async fn has_mapping(&self, external_port: u16) -> bool {
self.mappings.lock().await.contains_key(&external_port)
}
pub async fn get_mappings(&self) -> HashMap<u16, PortMapping> {
self.mappings.lock().await.clone()
}
pub async fn renew_mapping(&self, external_port: u16) -> Result<bool> {
let needs_renewal = {
let mappings = self.mappings.lock().await;
match mappings.get(&external_port) {
Some(mapping) if mapping.is_expired_or_stale() => true,
Some(_) => return Ok(false), None => return Ok(false), }
};
if !needs_renewal {
return Ok(false);
}
let _ = self.remove_mapping(external_port).await;
self.add_mapping(external_port).await?;
debug!("Renewed UPnP mapping for port {}", external_port);
Ok(true)
}
pub async fn renew_all_stale(&self) -> Result<usize> {
let ports_to_renew: Vec<u16> = {
let mappings = self.mappings.lock().await;
mappings
.values()
.filter(|m| m.is_expired_or_stale())
.map(|m| m.external_port)
.collect()
};
let mut renewed = 0;
for port in ports_to_renew {
if self.renew_mapping(port).await? {
renewed += 1;
}
}
Ok(renewed)
}
pub async fn create_candidate(&self) -> Result<IceCandidate> {
let mappings = self.mappings.lock().await;
let mapping = mappings
.values()
.next()
.ok_or_else(|| anyhow!("No UPnP mappings available"))?;
let external_addr = SocketAddr::new(
IpAddr::V4(self.get_external_ip().await?),
mapping.external_port,
);
Ok(IceCandidate::server_reflexive(
mapping.internal_addr,
external_addr,
1, ))
}
}
pub async fn try_create_upnp_candidate(local_addr: SocketAddr) -> Option<IceCandidate> {
if local_addr.ip().is_loopback() {
return None;
}
let mut mapper = UpnpPortMapper::new(local_addr);
if let Err(e) = mapper.discover().await {
trace!("UPnP discovery failed for {}: {}", local_addr, e);
return None;
}
let external_addr = match mapper.add_mapping(0).await {
Ok(addr) => addr,
Err(e) => {
debug!("UPnP mapping failed for {}: {}", local_addr, e);
return None;
}
};
let candidate = IceCandidate::server_reflexive(local_addr, external_addr, 1);
debug!(
"Created UPnP candidate: {} -> {}",
local_addr, external_addr
);
Some(candidate)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_port_mapping_expiry() {
let mapping = PortMapping {
external_port: 12345,
internal_addr: "192.168.1.100:5000".parse().unwrap(),
lease_duration: 70,
description: "test".to_string(),
created_at: std::time::Instant::now(),
};
assert!(!mapping.is_expired_or_stale());
let remaining = mapping.remaining_lifetime();
assert!(remaining >= 69 && remaining <= 70);
}
#[test]
fn test_port_mapping_remaining_lifetime() {
let mapping = PortMapping {
external_port: 12345,
internal_addr: "192.168.1.100:5000".parse().unwrap(),
lease_duration: 60,
description: "test".to_string(),
created_at: std::time::Instant::now(),
};
let remaining = mapping.remaining_lifetime();
assert!(remaining > 55 && remaining <= 60);
std::thread::sleep(std::time::Duration::from_millis(100));
let new_remaining = mapping.remaining_lifetime();
assert!(new_remaining <= remaining, "remaining={}, new_remaining={}", remaining, new_remaining);
}
#[test]
fn test_upnp_mapper_creation() {
let addr: SocketAddr = "192.168.1.100:5000".parse().unwrap();
let mapper = UpnpPortMapper::new(addr);
assert!(mapper.is_enabled());
assert!(!mapper.has_gateway());
assert_eq!(mapper.local_addr, addr);
}
#[test]
fn test_upnp_mapper_disable_enable() {
let addr: SocketAddr = "192.168.1.100:5000".parse().unwrap();
let mut mapper = UpnpPortMapper::new(addr);
assert!(mapper.is_enabled());
mapper.disable();
assert!(!mapper.is_enabled());
assert!(mapper.gateway.is_none());
mapper.enable();
assert!(mapper.is_enabled());
}
#[test]
fn test_upnp_mapper_custom_lease() {
let addr: SocketAddr = "192.168.1.100:5000".parse().unwrap();
let mapper = UpnpPortMapper::with_lease_duration(addr, 100);
assert_eq!(mapper.default_lease_duration, MIN_LEASE_DURATION);
let mapper = UpnpPortMapper::with_lease_duration(addr, 100000);
assert_eq!(mapper.default_lease_duration, MAX_LEASE_DURATION);
let mapper = UpnpPortMapper::with_lease_duration(addr, 1800);
assert_eq!(mapper.default_lease_duration, 1800);
}
#[tokio::test]
async fn test_upnp_mapper_loopback_rejection() {
let addr: SocketAddr = "127.0.0.1:5000".parse().unwrap();
let mut mapper = UpnpPortMapper::new(addr);
let result = mapper.discover().await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("loopback"));
}
#[tokio::test]
async fn test_upnp_mapper_disabled() {
let addr: SocketAddr = "192.168.1.100:5000".parse().unwrap();
let mut mapper = UpnpPortMapper::new(addr);
mapper.disable();
let result = mapper.discover().await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("disabled"));
}
#[tokio::test]
async fn test_upnp_mapper_no_gateway() {
let addr: SocketAddr = "192.168.1.100:5000".parse().unwrap();
let mapper = UpnpPortMapper::new(addr);
let result = mapper.add_mapping(12345).await;
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("No UPnP gateway"));
}
#[test]
fn test_try_create_upnp_candidate_loopback() {
let rt = tokio::runtime::Runtime::new().unwrap();
let result = rt.block_on(async {
let addr: SocketAddr = "127.0.0.1:5000".parse().unwrap();
try_create_upnp_candidate(addr).await
});
assert!(result.is_none());
}
#[tokio::test]
async fn test_upnp_mapper_clone() {
let addr: SocketAddr = "192.168.1.100:5000".parse().unwrap();
let mapper = UpnpPortMapper::new(addr);
let cloned = mapper.clone();
assert_eq!(cloned.local_addr, addr);
assert!(cloned.is_enabled());
assert!(!cloned.has_gateway());
}
#[test]
fn test_mapping_constants() {
assert!(MIN_LEASE_DURATION > 0);
assert!(MAX_LEASE_DURATION > MIN_LEASE_DURATION);
assert!(DEFAULT_LEASE_DURATION >= MIN_LEASE_DURATION);
assert!(DEFAULT_LEASE_DURATION <= MAX_LEASE_DURATION);
}
}