use super::*;
use crate::config::RtcConfigurationBuilder;
use crate::transports::PacketReceiver;
use crate::transports::ice::upnp::{
DEFAULT_LEASE_DURATION, MAX_LEASE_DURATION, MIN_LEASE_DURATION, PortMapping, UpnpPortMapper,
};
use crate::{IceServer, IceTransportPolicy, RtcConfiguration};
use ::turn::{
auth::{AuthHandler, generate_auth_key},
relay::relay_static::RelayAddressGeneratorStatic,
server::{
Server,
config::{ConnConfig, ServerConfig},
},
};
use anyhow::Result;
use bytes::Bytes;
use futures::FutureExt;
use tokio::sync::broadcast;
use serial_test::serial;
use tokio::time::{Duration, timeout};
type TurnResult<T> = std::result::Result<T, ::turn::Error>;
#[test]
fn parse_turn_uri() {
    // A TURN URI with an explicit transport parameter must populate every
    // field of the parsed representation.
    let parsed = IceServerUri::parse("turn:example.com:3478?transport=tcp").unwrap();
    assert_eq!(parsed.kind, IceUriKind::Turn);
    assert_eq!(parsed.host, "example.com");
    assert_eq!(parsed.port, 3478);
    assert_eq!(parsed.transport, IceTransportProtocol::Tcp);
}
#[tokio::test]
async fn builder_starts_gathering() {
    // Building a transport with the default configuration should kick off
    // candidate gathering automatically; give the runner a moment to finish.
    let (transport, runner) = IceTransportBuilder::new(RtcConfiguration::default()).build();
    tokio::spawn(runner);
    tokio::time::sleep(Duration::from_millis(50)).await;
    let state = transport.gather_state();
    assert!(matches!(state, IceGathererState::Complete));
}
#[tokio::test]
async fn stun_probe_yields_server_reflexive_candidate() -> Result<()> {
let mut turn_server = TestTurnServer::start().await?;
let mut config = RtcConfiguration::default();
config
.ice_servers
.push(IceServer::new(vec![turn_server.stun_url()]));
let (tx, _) = broadcast::channel(100);
let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
let gatherer = IceGatherer::new(config, tx, socket_tx);
gatherer.gather().await?;
let candidates = gatherer.local_candidates();
assert!(
candidates
.iter()
.any(|c| matches!(c.typ, IceCandidateType::ServerReflexive))
);
turn_server.stop().await?;
Ok(())
}
#[tokio::test]
async fn stun_candidate_raddr_is_not_unspecified() -> Result<()> {
let mut turn_server = TestTurnServer::start().await?;
let mut config = RtcConfiguration::default();
config
.ice_servers
.push(IceServer::new(vec![turn_server.stun_url()]));
let (tx, _) = broadcast::channel(100);
let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
let gatherer = IceGatherer::new(config, tx, socket_tx);
gatherer.gather().await?;
let candidates = gatherer.local_candidates();
let srflx = candidates
.iter()
.find(|c| matches!(c.typ, IceCandidateType::ServerReflexive));
if let Some(candidate) = srflx {
if let Some(raddr) = candidate.related_address {
assert!(
!raddr.ip().is_unspecified(),
"STUN candidate raddr should not be unspecified (0.0.0.0), got: {}",
raddr.ip()
);
let host_addresses: Vec<_> = candidates
.iter()
.filter(|c| matches!(c.typ, IceCandidateType::Host))
.map(|c| c.address.ip())
.collect();
assert!(
host_addresses.contains(&raddr.ip()),
"STUN candidate raddr ({}) should match a host candidate address. Host addresses: {:?}",
raddr.ip(),
host_addresses
);
} else {
panic!("STUN candidate should have a related_address");
}
}
turn_server.stop().await?;
Ok(())
}
#[tokio::test]
#[serial]
async fn turn_probe_yields_relay_candidate() -> Result<()> {
    // Allocating on the in-process TURN server must surface a relay candidate.
    let mut turn_server = TestTurnServer::start().await?;
    let mut config = RtcConfiguration::default();
    let turn_entry =
        IceServer::new(vec![turn_server.turn_url()]).with_credential(TEST_USERNAME, TEST_PASSWORD);
    config.ice_servers.push(turn_entry);
    let (event_tx, _) = broadcast::channel(100);
    let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(config, event_tx, socket_tx);
    gatherer.gather().await?;
    let has_relay = gatherer
        .local_candidates()
        .iter()
        .any(|c| matches!(c.typ, IceCandidateType::Relay));
    assert!(has_relay);
    turn_server.stop().await?;
    Ok(())
}
#[tokio::test]
async fn policy_relay_only_gathers_relay_candidates() -> Result<()> {
let mut turn_server = TestTurnServer::start().await?;
let mut config = RtcConfiguration::default();
config.ice_transport_policy = IceTransportPolicy::Relay;
config.ice_servers.push(
IceServer::new(vec![turn_server.turn_url()]).with_credential(TEST_USERNAME, TEST_PASSWORD),
);
config
.ice_servers
.push(IceServer::new(vec![turn_server.stun_url()]));
let (tx, _) = broadcast::channel(100);
let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
let gatherer = IceGatherer::new(config, tx, socket_tx);
gatherer.gather().await?;
let candidates = gatherer.local_candidates();
assert!(!candidates.is_empty());
for c in candidates {
assert_eq!(
c.typ,
IceCandidateType::Relay,
"Found non-relay candidate: {:?}",
c
);
}
turn_server.stop().await?;
Ok(())
}
#[tokio::test]
#[serial]
async fn turn_client_can_create_permission() -> Result<()> {
    // After a successful allocation, a CreatePermission request for an
    // arbitrary peer address must be accepted by the server.
    let mut turn_server = TestTurnServer::start().await?;
    let uri = IceServerUri::parse(&turn_server.turn_url())?;
    let server =
        IceServer::new(vec![turn_server.turn_url()]).with_credential(TEST_USERNAME, TEST_PASSWORD);
    let creds = TurnCredentials::from_server(&server)?;
    let client = TurnClient::connect(&uri, false).await?;
    client.allocate(creds).await?;
    let peer: SocketAddr = "127.0.0.1:5000".parse().unwrap();
    client.create_permission(peer).await?;
    turn_server.stop().await?;
    Ok(())
}
#[test]
fn candidate_pair_priority_calculation() {
    // RFC 8445 §6.1.2.3 pair priority is symmetric in min/max of the two
    // candidate priorities, so an equal-priority pair must yield the same
    // value for both roles.
    let local = IceCandidate::host("127.0.0.1:1000".parse().unwrap(), 1);
    let remote = IceCandidate::host("127.0.0.1:2000".parse().unwrap(), 1);
    let symmetric = IceCandidatePair::new(local.clone(), remote.clone());
    assert_eq!(
        symmetric.priority(IceRole::Controlling),
        symmetric.priority(IceRole::Controlled)
    );
    // With unequal candidate priorities (relay < host), swapping roles only
    // flips the tie-break bit, so the two views differ by exactly one.
    let local_relay = IceCandidate::relay("127.0.0.1:1000".parse().unwrap(), 1, "udp");
    let mixed = IceCandidatePair::new(local_relay, remote);
    let prio_controlling = mixed.priority(IceRole::Controlling);
    let prio_controlled = mixed.priority(IceRole::Controlled);
    assert!(prio_controlled > prio_controlling);
    assert_eq!(prio_controlled - prio_controlling, 1);
}
#[tokio::test]
#[serial]
async fn turn_connection_relay_to_host() -> Result<()> {
    // End-to-end check: a relay-only agent (using the in-process TURN server)
    // must connect to a plain host-candidate agent, and data must flow in
    // both directions through the relay.
    let mut turn_server = TestTurnServer::start().await?;
    tokio::time::sleep(Duration::from_millis(100)).await;
    // Agent 1: relay-only policy, controlling role.
    let mut config1 = RtcConfiguration::default();
    config1.ice_transport_policy = IceTransportPolicy::Relay;
    config1.ice_servers.push(
        IceServer::new(vec![turn_server.turn_url()]).with_credential(TEST_USERNAME, TEST_PASSWORD),
    );
    let (transport1, runner1) = IceTransportBuilder::new(config1)
        .role(IceRole::Controlling)
        .build();
    tokio::spawn(runner1);
    // Agent 2: default config (host candidates), controlled role.
    let config2 = RtcConfiguration::default();
    let (transport2, runner2) = IceTransportBuilder::new(config2)
        .role(IceRole::Controlled)
        .build();
    tokio::spawn(runner2);
    tokio::time::sleep(Duration::from_millis(100)).await;
    let t1 = transport1.clone();
    let t2 = transport2.clone();
    // Cross-wire candidates: the already-gathered snapshot now, plus any
    // trickled later via the broadcast subscriptions below.
    let mut rx1 = t1.subscribe_candidates();
    let mut rx2 = t2.subscribe_candidates();
    for c in t1.local_candidates() {
        t2.add_remote_candidate(c);
    }
    for c in t2.local_candidates() {
        t1.add_remote_candidate(c);
    }
    tokio::spawn(async move {
        while let Ok(c) = rx1.recv().await {
            t2.add_remote_candidate(c);
        }
    });
    tokio::spawn(async move {
        while let Ok(c) = rx2.recv().await {
            t1.add_remote_candidate(c);
        }
    });
    let state1 = transport1.subscribe_state();
    let state2 = transport2.subscribe_state();
    transport1.start(transport2.local_parameters())?;
    transport2.start(transport1.local_parameters())?;
    // Wait for Connected; error out on Failed or a closed state channel.
    let wait_connected = |mut state: watch::Receiver<IceTransportState>, name: &'static str| async move {
        loop {
            let s = *state.borrow_and_update();
            if s == IceTransportState::Connected {
                return Ok(());
            }
            if s == IceTransportState::Failed {
                return Err(anyhow::anyhow!("Transport {} failed", name));
            }
            if state.changed().await.is_err() {
                return Err(anyhow::anyhow!("Transport {} state channel closed", name));
            }
        }
    };
    let result = tokio::try_join!(
        timeout(Duration::from_secs(15), wait_connected(state1, "1")),
        timeout(Duration::from_secs(15), wait_connected(state2, "2"))
    );
    if let Err(e) = &result {
        eprintln!("Connection failed: {:?}", e);
    }
    let (r1, r2) = result?;
    r1?;
    r2?;
    // The relay-only agent must have selected a relay local candidate.
    let pair1 = transport1.get_selected_pair().await.unwrap();
    assert_eq!(pair1.local.typ, IceCandidateType::Relay);
    // Channel-backed receivers to observe delivered application packets.
    let (tx1, mut rx1_data) = tokio::sync::mpsc::channel(10);
    let (tx2, mut rx2_data) = tokio::sync::mpsc::channel(10);
    struct TestReceiver(tokio::sync::mpsc::Sender<Bytes>);
    #[async_trait::async_trait]
    impl PacketReceiver for TestReceiver {
        async fn receive(&self, packet: Bytes, _addr: SocketAddr) {
            let _ = self.0.send(packet).await;
        }
    }
    transport1
        .set_data_receiver(Arc::new(TestReceiver(tx1)))
        .await;
    transport2
        .set_data_receiver(Arc::new(TestReceiver(tx2)))
        .await;
    // Relay -> host direction.
    let socket1 = transport1.get_selected_socket().await.unwrap();
    let pair1 = transport1.get_selected_pair().await.unwrap();
    let data = Bytes::from_static(b"hello from 1");
    socket1.send_to(&data, pair1.remote.address).await?;
    let received = timeout(Duration::from_secs(5), rx2_data.recv())
        .await?
        .unwrap();
    assert_eq!(received, data);
    // Host -> relay direction.
    let socket2 = transport2.get_selected_socket().await.unwrap();
    let pair2 = transport2.get_selected_pair().await.unwrap();
    let data2 = Bytes::from_static(b"hello from 2");
    socket2.send_to(&data2, pair2.remote.address).await?;
    let received2 = timeout(Duration::from_secs(5), rx1_data.recv())
        .await?
        .unwrap();
    assert_eq!(received2, data2);
    turn_server.stop().await?;
    Ok(())
}
#[tokio::test]
async fn test_ice_connection_timeout() -> Result<()> {
    // Force the state to Connected by hand, then verify that a 100 ms
    // ice_connection_timeout eventually drives the transport to Failed.
    let mut config = RtcConfiguration::default();
    config.ice_connection_timeout = Duration::from_millis(100);
    let (ice, driver) = IceTransportBuilder::new(config).build();
    tokio::spawn(driver);
    ice.inner
        .state
        .send(IceTransportState::Connected)
        .unwrap();
    // Sleep well past the timeout before sampling the state.
    tokio::time::sleep(Duration::from_millis(1200)).await;
    assert_eq!(ice.state(), IceTransportState::Failed);
    Ok(())
}
// Credentials accepted by the in-process TURN server below.
const TEST_USERNAME: &str = "test";
const TEST_PASSWORD: &str = "test";
// Realm handed to the TURN server; arbitrary test data.
const TEST_REALM: &str = ".turn";
/// In-process TURN/STUN server used by the tests in this module.
struct TestTurnServer {
    // `Some` while running; taken out by `stop()` so stop is idempotent.
    server: Option<Server>,
    // UDP listen address (loopback, ephemeral port).
    addr: SocketAddr,
}
impl TestTurnServer {
    /// Binds a UDP socket on an ephemeral loopback port and starts an
    /// in-process TURN server on it.
    async fn start() -> Result<Self> {
        let socket = UdpSocket::bind("127.0.0.1:0").await?;
        let addr = socket.local_addr()?;
        let conn = Arc::new(socket);
        // Relays are allocated on the same (loopback) IP; the "0.0.0.0"
        // listen address lets the generator bind any free port.
        let relay_addr_generator = Box::new(RelayAddressGeneratorStatic {
            relay_address: addr.ip(),
            address: "0.0.0.0".to_string(),
            net: Arc::new(webrtc_util::vnet::net::Net::new(None)),
        });
        let auth_handler = Arc::new(StaticAuthHandler::new(
            TEST_USERNAME.to_string(),
            TEST_PASSWORD.to_string(),
        ));
        let config = ServerConfig {
            conn_configs: vec![ConnConfig {
                conn,
                relay_addr_generator,
            }],
            realm: TEST_REALM.to_string(),
            auth_handler,
            channel_bind_timeout: Duration::from_secs(600),
            alloc_close_notify: None,
        };
        let server = Server::new(config).await?;
        Ok(Self {
            server: Some(server),
            addr,
        })
    }
    /// STUN URL (`stun:ip:port`) pointing at this server.
    fn stun_url(&self) -> String {
        format!("stun:{}", self.addr)
    }
    /// TURN URL (`turn:ip:port`) pointing at this server.
    fn turn_url(&self) -> String {
        format!("turn:{}", self.addr)
    }
    /// Shuts the server down; safe to call more than once.
    async fn stop(&mut self) -> Result<()> {
        if let Some(server) = self.server.take() {
            server.close().await?;
        }
        Ok(())
    }
}
/// Minimal TURN auth handler that accepts a single fixed username/password.
struct StaticAuthHandler {
    username: String,
    password: String,
}
impl StaticAuthHandler {
    fn new(username: String, password: String) -> Self {
        Self { username, password }
    }
}
impl AuthHandler for StaticAuthHandler {
fn auth_handle(
&self,
username: &str,
realm: &str,
_src_addr: SocketAddr,
) -> TurnResult<Vec<u8>> {
if username != self.username {
return Err(::turn::Error::ErrNoSuchUser);
}
Ok(generate_auth_key(username, realm, &self.password))
}
}
#[test]
fn ice_candidate_foundation_compliance() {
    // Foundations must be ASCII alphanumeric — in particular no ':' leaking
    // in from a socket-address Display impl.
    let addr: SocketAddr = "127.0.0.1:5000".parse().unwrap();
    let host = IceCandidate::host(addr, 1);
    assert!(!host.foundation.contains(':'));
    assert!(host.foundation.chars().all(|c| c.is_ascii_alphanumeric()));
    // The foundation must appear verbatim as the first token of the SDP line.
    let sdp = host.to_sdp();
    assert!(sdp.contains(" typ host"));
    let parts: Vec<&str> = sdp.split_whitespace().collect();
    let foundation = parts[0];
    assert_eq!(foundation, host.foundation);
    let mapped: SocketAddr = "1.2.3.4:5000".parse().unwrap();
    let srflx = IceCandidate::server_reflexive(addr, mapped, 1);
    assert!(!srflx.foundation.contains(':'));
    assert!(srflx.foundation.chars().all(|c| c.is_ascii_alphanumeric()));
    // Same base address and type => same foundation, even for a different
    // mapped address.
    let srflx2 = IceCandidate::server_reflexive(addr, "1.2.3.5:6000".parse().unwrap(), 1);
    assert_eq!(srflx.foundation, srflx2.foundation);
    // Different base address => different foundation.
    let addr2: SocketAddr = "192.168.0.1:5000".parse().unwrap();
    let srflx3 = IceCandidate::server_reflexive(addr2, mapped, 1);
    assert_ne!(srflx.foundation, srflx3.foundation);
    let relay = IceCandidate::relay(mapped, 1, "udp");
    assert!(!relay.foundation.contains(':'));
    // Candidates of different types sharing a base must not collide.
    let host_same_addr = IceCandidate::host(addr, 1);
    let srflx_same_base = IceCandidate::server_reflexive(addr, mapped, 1);
    assert_ne!(host_same_addr.foundation, srflx_same_base.foundation);
}
#[tokio::test]
#[serial]
async fn test_ice_lite_stun_response() -> Result<()> {
    use crate::TransportMode;
    // An ICE-lite agent must answer an inbound STUN Binding Request with a
    // success response that echoes the transaction ID and carries
    // XOR-MAPPED-ADDRESS, and must record the sender as a peer-reflexive
    // candidate.
    let mut config = RtcConfiguration::default();
    config.transport_mode = TransportMode::Rtp;
    config.enable_ice_lite = true;
    config.bind_ip = Some("127.0.0.1".to_string());
    let (ice_lite, runner) = IceTransport::new(config);
    tokio::spawn(runner);
    let local_addr = ice_lite.setup_direct_rtp_offer().await?;
    tokio::time::sleep(Duration::from_millis(100)).await;
    let _local_params = ice_lite.local_parameters();
    let remote_params = IceParameters::new("remote_ufrag", "remote_pwd_12345");
    ice_lite.set_remote_parameters(remote_params.clone());
    ice_lite.set_role(IceRole::Controlled);
    // Hand-rolled "remote agent": a bare UDP socket sending raw STUN.
    let remote_socket = UdpSocket::bind("127.0.0.1:0").await?;
    let remote_addr = remote_socket.local_addr()?;
    let tx_id = crate::transports::ice::stun::random_bytes::<12>();
    let binding_request = StunMessage::binding_request(tx_id, Some("ice-lite-test"));
    let request_bytes = binding_request.encode(None, false)?;
    println!(
        "Sending STUN Binding Request from {} to ICE-lite agent at {}",
        remote_addr, local_addr
    );
    let mut buf = [0u8; 1500];
    // Retry up to three times with a 2 s receive timeout each to tolerate
    // UDP loss on a loaded test host.
    let (len, response_from) = {
        let mut result = None;
        for _ in 0..3 {
            remote_socket.send_to(&request_bytes, local_addr).await?;
            match tokio::time::timeout(Duration::from_secs(2), remote_socket.recv_from(&mut buf))
                .await
            {
                Ok(Ok(recv_result)) => {
                    result = Some(Ok(recv_result));
                    break;
                }
                Ok(Err(e)) => {
                    result = Some(Err(anyhow::anyhow!("Socket recv error: {}", e)));
                }
                Err(_) => {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            }
        }
        result.ok_or_else(|| anyhow::anyhow!("Should receive STUN response within 5 seconds"))??
    };
    println!(
        "Received STUN response from {}, {} bytes",
        response_from, len
    );
    assert_eq!(
        response_from, local_addr,
        "Response should come from ICE-lite local address"
    );
    // Validate class, method, transaction ID and XOR-MAPPED-ADDRESS.
    let decoded_response = StunMessage::decode(&buf[..len])?;
    assert_eq!(
        decoded_response.class,
        crate::transports::ice::stun::StunClass::SuccessResponse
    );
    assert_eq!(
        decoded_response.method,
        crate::transports::ice::stun::StunMethod::Binding
    );
    assert_eq!(
        decoded_response.transaction_id, tx_id,
        "Transaction ID should match request"
    );
    assert!(
        decoded_response.xor_mapped_address.is_some(),
        "STUN response should contain XOR-MAPPED-ADDRESS"
    );
    let mapped_addr = decoded_response.xor_mapped_address.unwrap();
    assert_eq!(
        mapped_addr, remote_addr,
        "XOR-MAPPED-ADDRESS should reflect remote agent's address"
    );
    println!("✓ ICE-lite correctly responded to STUN binding request");
    println!("✓ Response contains correct transaction ID and XOR-MAPPED-ADDRESS");
    // The previously-unknown sender should now be a peer-reflexive candidate.
    let candidates = ice_lite.remote_candidates();
    let prflx_candidates: Vec<_> = candidates
        .iter()
        .filter(|c| c.typ == IceCandidateType::PeerReflexive && c.address == remote_addr)
        .collect();
    assert!(
        !prflx_candidates.is_empty(),
        "Remote address should be added as peer-reflexive candidate"
    );
    println!(
        "✓ Peer-reflexive candidate discovered for remote address {}",
        remote_addr
    );
    Ok(())
}
#[tokio::test]
#[serial]
async fn test_ice_lite_connectivity_establishment() -> Result<()> {
    use crate::TransportMode;
    // Full ICE (controlling) against ICE-lite (controlled): both must reach
    // Connected and application data must flow in both directions.
    let mut lite_config = RtcConfiguration::default();
    lite_config.transport_mode = TransportMode::Rtp;
    lite_config.enable_ice_lite = true;
    lite_config.bind_ip = Some("127.0.0.1".to_string());
    let (ice_lite, lite_runner) = IceTransport::new(lite_config);
    tokio::spawn(lite_runner);
    let full_config = RtcConfiguration::default();
    let (ice_full, full_runner) = IceTransportBuilder::new(full_config)
        .role(IceRole::Controlling)
        .build();
    tokio::spawn(full_runner);
    let _lite_addr = ice_lite.setup_direct_rtp_offer().await?;
    let lite_params = ice_lite.local_parameters();
    let full_params = ice_full.local_parameters();
    ice_lite.set_remote_parameters(full_params.clone());
    ice_lite.set_role(IceRole::Controlled);
    let lite_candidates = ice_lite.local_candidates();
    assert!(
        !lite_candidates.is_empty(),
        "ICE-lite should have local candidates"
    );
    for candidate in lite_candidates {
        ice_full.add_remote_candidate(candidate);
    }
    ice_full.start(lite_params.clone())?;
    tokio::time::sleep(Duration::from_millis(100)).await;
    // Point the lite agent directly at the full agent's host candidate.
    let full_candidates = ice_full.local_candidates();
    let full_host_candidate = full_candidates
        .iter()
        .find(|c| c.typ == IceCandidateType::Host)
        .expect("Full ICE agent should have host candidate")
        .clone();
    ice_lite.complete_direct_rtp(full_host_candidate.address);
    ice_lite.add_remote_candidate(full_host_candidate);
    let lite_state = ice_lite.subscribe_state();
    let full_state = ice_full.subscribe_state();
    // Poll-style wait: sample the watch value every 100 ms for up to 5 s.
    async fn wait_connected(
        mut state: watch::Receiver<IceTransportState>,
        name: &str,
    ) -> Result<()> {
        for _ in 0..50 {
            let current_state = *state.borrow();
            if current_state == IceTransportState::Connected {
                println!("{} transport connected", name);
                return Ok(());
            }
            if current_state == IceTransportState::Failed {
                return Err(anyhow::anyhow!("{} transport failed", name));
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
            // Drain a pending change notification without blocking.
            let _ = state.changed().now_or_never();
        }
        Err(anyhow::anyhow!(
            "{} transport did not connect within timeout",
            name
        ))
    }
    tokio::try_join!(
        wait_connected(lite_state, "ICE-lite"),
        wait_connected(full_state, "Full ICE")
    )?;
    let lite_pair = ice_lite.get_selected_pair().await.unwrap();
    let full_pair = ice_full.get_selected_pair().await.unwrap();
    println!(
        "ICE-lite selected pair: {} -> {}",
        lite_pair.local.address, lite_pair.remote.address
    );
    println!(
        "Full ICE selected pair: {} -> {}",
        full_pair.local.address, full_pair.remote.address
    );
    let (lite_tx, mut lite_rx) = tokio::sync::mpsc::channel(10);
    let (full_tx, mut full_rx) = tokio::sync::mpsc::channel(10);
    struct DataReceiver(tokio::sync::mpsc::Sender<Bytes>);
    #[async_trait::async_trait]
    impl PacketReceiver for DataReceiver {
        async fn receive(&self, packet: Bytes, _addr: SocketAddr) {
            // Only forward packets whose first byte is >= 2 — presumably
            // filtering out STUN traffic; TODO confirm the demux rule.
            if !packet.is_empty() && packet[0] >= 2 {
                let _ = self.0.send(packet).await;
            }
        }
    }
    ice_lite
        .set_data_receiver(Arc::new(DataReceiver(lite_tx)))
        .await;
    ice_full
        .set_data_receiver(Arc::new(DataReceiver(full_tx)))
        .await;
    // Full -> lite direction.
    let full_socket = ice_full.get_selected_socket().await.unwrap();
    let test_data = Bytes::from_static(b"Hello from full ICE agent");
    full_socket
        .send_to(&test_data, full_pair.remote.address)
        .await?;
    let received_by_lite = timeout(Duration::from_secs(5), lite_rx.recv())
        .await?
        .ok_or_else(|| anyhow::anyhow!("ICE-lite did not receive data"))?;
    assert_eq!(received_by_lite, test_data);
    // Lite -> full direction.
    let lite_socket = ice_lite.get_selected_socket().await.unwrap();
    let response_data = Bytes::from_static(b"Hello from ICE-lite agent");
    lite_socket
        .send_to(&response_data, lite_pair.remote.address)
        .await?;
    let received_by_full = timeout(Duration::from_secs(5), full_rx.recv())
        .await?
        .ok_or_else(|| anyhow::anyhow!("Full ICE agent did not receive data"))?;
    assert_eq!(received_by_full, response_data);
    println!("✓ ICE-lite successfully established connectivity with full ICE agent");
    println!("✓ Bidirectional data flow verified");
    Ok(())
}
#[test]
fn test_nomination_timeout_larger_than_stun_timeout() {
    // The defaults must leave room for several STUN retransmissions before
    // the nomination deadline expires.
    let defaults = RtcConfiguration::default();
    assert!(
        defaults.nomination_timeout > defaults.stun_timeout,
        "nomination_timeout ({:?}) must be > stun_timeout ({:?}) to allow more retransmissions",
        defaults.nomination_timeout,
        defaults.stun_timeout,
    );
}
#[test]
fn test_nomination_timeout_builder() {
    // The builder must honor a custom nomination timeout while leaving the
    // default stun_timeout (5 s) untouched.
    // RtcConfigurationBuilder is already imported at module level, so the
    // previous function-local `use` was redundant and has been removed.
    let custom = std::time::Duration::from_secs(20);
    let config = RtcConfigurationBuilder::new()
        .nomination_timeout(custom)
        .build();
    assert_eq!(config.nomination_timeout, custom);
    assert_eq!(config.stun_timeout, std::time::Duration::from_secs(5));
}
/// Spins up a Controlling/Controlled transport pair, cross-wires their
/// candidates (initial snapshot plus later trickled ones via background
/// tasks), and starts both sides with each other's parameters.
async fn setup_host_pair(
    controlling_config: RtcConfiguration,
    controlled_config: RtcConfiguration,
) -> (IceTransport, IceTransport) {
    let (controlling, runner_c) = IceTransportBuilder::new(controlling_config)
        .role(IceRole::Controlling)
        .build();
    tokio::spawn(runner_c);
    let (controlled, runner_d) = IceTransportBuilder::new(controlled_config)
        .role(IceRole::Controlled)
        .build();
    tokio::spawn(runner_d);
    // Exchange the candidates gathered so far, in both directions.
    for c in controlling.local_candidates() {
        controlled.add_remote_candidate(c);
    }
    for c in controlled.local_candidates() {
        controlling.add_remote_candidate(c);
    }
    // Forward any candidates trickled after this point.
    let ctrl_clone = controlling.clone();
    let ctrd_clone = controlled.clone();
    let mut rx_ctrl = controlling.subscribe_candidates();
    let mut rx_ctrd = controlled.subscribe_candidates();
    tokio::spawn(async move {
        while let Ok(c) = rx_ctrl.recv().await {
            ctrd_clone.add_remote_candidate(c);
        }
    });
    tokio::spawn(async move {
        while let Ok(c) = rx_ctrd.recv().await {
            ctrl_clone.add_remote_candidate(c);
        }
    });
    controlling
        .start(controlled.local_parameters())
        .expect("controlling.start");
    controlled
        .start(controlling.local_parameters())
        .expect("controlled.start");
    (controlling, controlled)
}
/// Watches an ICE state channel until a terminal state or the deadline.
/// Returns `true` on Connected/Completed, `false` on Failed, channel
/// closure, or timeout.
async fn wait_ice_connected(
    mut state_rx: watch::Receiver<IceTransportState>,
    deadline: Duration,
) -> bool {
    let watcher = async move {
        loop {
            match *state_rx.borrow_and_update() {
                IceTransportState::Connected | IceTransportState::Completed => break true,
                IceTransportState::Failed => break false,
                _ => {}
            }
            if state_rx.changed().await.is_err() {
                break false;
            }
        }
    };
    timeout(deadline, watcher).await.unwrap_or(false)
}
#[tokio::test]
#[serial]
async fn test_nomination_complete_fires_on_connection() -> Result<()> {
    // After both agents reach Connected, the nomination_complete watch
    // channel must resolve to Some(true) on both sides.
    let config1 = RtcConfiguration::default();
    let config2 = RtcConfiguration::default();
    let (controlling, controlled) = setup_host_pair(config1, config2).await;
    let mut ctrl_nomination_rx = controlling.subscribe_nomination_complete();
    let mut ctrd_nomination_rx = controlled.subscribe_nomination_complete();
    let ctrl_state = controlling.subscribe_state();
    let ctrd_state = controlled.subscribe_state();
    let (ok1, ok2) = tokio::join!(
        wait_ice_connected(ctrl_state, Duration::from_secs(10)),
        wait_ice_connected(ctrd_state, Duration::from_secs(10)),
    );
    assert!(ok1, "Controlling agent failed to reach Connected");
    assert!(ok2, "Controlled agent failed to reach Connected");
    // Read the current value if already set, otherwise await one change.
    let ctrl_result = timeout(Duration::from_secs(5), async {
        if ctrl_nomination_rx.borrow().is_some() {
            return *ctrl_nomination_rx.borrow();
        }
        ctrl_nomination_rx.changed().await.ok()?;
        *ctrl_nomination_rx.borrow()
    })
    .await
    .expect("nomination_complete timed out on controlling side");
    assert_eq!(
        ctrl_result,
        Some(true),
        "Controlling nomination should succeed (Some(true))"
    );
    let ctrd_result = timeout(Duration::from_secs(5), async {
        if ctrd_nomination_rx.borrow().is_some() {
            return *ctrd_nomination_rx.borrow();
        }
        ctrd_nomination_rx.changed().await.ok()?;
        *ctrd_nomination_rx.borrow()
    })
    .await
    .expect("nomination_complete timed out on controlled side");
    assert_eq!(
        ctrd_result,
        Some(true),
        "Controlled nomination should be Some(true) (after receiving USE-CANDIDATE)"
    );
    Ok(())
}
#[tokio::test]
async fn test_nomination_uses_nomination_timeout_not_stun_timeout() -> Result<()> {
    // Configure a huge stun_timeout and a tiny nomination_timeout, then run
    // a nominating binding check against a black-hole address: it must give
    // up after the nomination_timeout, not the stun_timeout.
    let mut cfg = RtcConfiguration::default();
    cfg.stun_timeout = Duration::from_secs(30);
    cfg.nomination_timeout = Duration::from_millis(200);
    let (transport, runner) = IceTransportBuilder::new(cfg).build();
    tokio::spawn(runner);
    let local_candidate = IceCandidate::host("127.0.0.1:0".parse().unwrap(), 1);
    let remote_candidate = IceCandidate::host("127.0.0.1:1".parse().unwrap(), 1);
    let pair = IceCandidatePair::new(local_candidate, remote_candidate);
    *transport.inner.role.lock() = IceRole::Controlling;
    let remote_params = IceParameters::new("dummy_ufrag", "dummy_password_1234567890");
    transport.set_remote_parameters(remote_params);
    let mut nomination_rx = transport.subscribe_nomination_complete();
    let inner_clone = transport.inner.clone();
    let pair_clone = pair.clone();
    tokio::spawn(async move {
        // The trailing `true` requests nomination (USE-CANDIDATE).
        let outcome = perform_binding_check(
            &pair_clone.local,
            &pair_clone.remote,
            &inner_clone,
            IceRole::Controlling,
            true,
        )
        .await;
        let _ = inner_clone.nomination_complete.send(Some(outcome.is_ok()));
    });
    let start = std::time::Instant::now();
    let result = timeout(Duration::from_secs(5), async {
        if nomination_rx.borrow().is_some() {
            return *nomination_rx.borrow();
        }
        nomination_rx.changed().await.ok()?;
        *nomination_rx.borrow()
    })
    .await
    .expect("nomination_complete should fire within 5 s");
    let elapsed = start.elapsed();
    assert_eq!(
        result,
        Some(false),
        "Nomination to a black-hole address should fail"
    );
    assert!(
        elapsed < Duration::from_secs(5),
        "Nomination should have timed out using nomination_timeout (200 ms), not stun_timeout (30 s); elapsed: {:?}",
        elapsed
    );
    assert!(
        elapsed < Duration::from_secs(2),
        "Elapsed ({:?}) should be close to nomination_timeout (200 ms), not stun_timeout (30 s)",
        elapsed
    );
    Ok(())
}
#[tokio::test]
#[serial]
async fn test_nomination_succeeds_under_moderate_packet_loss() -> Result<()> {
    // RAII guard restoring the previous global packet-loss rate even if an
    // assert below panics.
    struct ScopeGuard {
        prev: u32,
    }
    impl Drop for ScopeGuard {
        fn drop(&mut self) {
            PACKET_LOSS_RATE.store(self.prev, Ordering::SeqCst);
        }
    }
    // 3000 — per the assertion messages below this is ~30% simulated loss
    // (presumably a per-10000 rate; TODO confirm the scale).
    let _guard = ScopeGuard {
        prev: PACKET_LOSS_RATE.swap(3000, Ordering::SeqCst),
    };
    // Run the body in an inner block so the guard drops after it completes.
    let result: Result<()> = async {
        // nomination_timeout deliberately exceeds stun_timeout so there is
        // room for retransmissions under loss.
        let mut config1 = RtcConfiguration::default();
        let mut config2 = RtcConfiguration::default();
        config1.nomination_timeout = Duration::from_secs(15);
        config1.stun_timeout = Duration::from_secs(5);
        config2.nomination_timeout = Duration::from_secs(15);
        config2.stun_timeout = Duration::from_secs(5);
        let (controlling, controlled) = setup_host_pair(config1, config2).await;
        let mut ctrl_nom_rx = controlling.subscribe_nomination_complete();
        let ctrl_state = controlling.subscribe_state();
        let ctrd_state = controlled.subscribe_state();
        let (ok1, ok2) = tokio::join!(
            wait_ice_connected(ctrl_state, Duration::from_secs(20)),
            wait_ice_connected(ctrd_state, Duration::from_secs(20)),
        );
        assert!(ok1, "Controlling agent failed to connect under packet loss");
        assert!(ok2, "Controlled agent failed to connect under packet loss");
        // Read the current nomination value or await the next change.
        let nom_result = timeout(Duration::from_secs(20), async {
            if ctrl_nom_rx.borrow().is_some() {
                return *ctrl_nom_rx.borrow();
            }
            ctrl_nom_rx.changed().await.ok()?;
            *ctrl_nom_rx.borrow()
        })
        .await
        .expect("nomination_complete should fire within 20 s even under 30% loss");
        assert_eq!(
            nom_result,
            Some(true),
            "Nomination should succeed under 30% packet loss with nomination_timeout > stun_timeout"
        );
        Ok(())
    }
    .await;
    result
}
#[test]
fn test_base_address_returns_related_address_for_host_candidate() {
    // A host candidate advertising an external (NAT-mapped) address keeps
    // its real local socket address in related_address; base_address() must
    // prefer that local address.
    let local_addr: SocketAddr = "192.168.1.100:54321".parse().unwrap();
    let external_addr: SocketAddr = "203.0.113.5:54321".parse().unwrap();
    let mut candidate = IceCandidate::host(external_addr, 1);
    candidate.related_address = Some(local_addr);
    assert_eq!(
        candidate.address, external_addr,
        "address should be the external IP"
    );
    assert_eq!(
        candidate.base_address(),
        local_addr,
        "base_address() should return related_address for host candidate with external IP"
    );
}
#[test]
fn test_base_address_returns_address_when_no_related_address() {
    // Without a related_address, base_address() falls back to the
    // candidate's own address.
    let bound: SocketAddr = "192.168.1.100:54321".parse().unwrap();
    let candidate = IceCandidate::host(bound, 1);
    assert_eq!(
        candidate.base_address(),
        bound,
        "base_address() should return address when related_address is None"
    );
}
#[tokio::test]
#[serial]
async fn test_ice_connection_with_external_ip() -> Result<()> {
    // With external_ip configured, non-loopback host candidates advertise
    // the external address while keeping the real bind address in
    // related_address — and the pair must still connect.
    let mut config1 = RtcConfiguration::default();
    config1.external_ip = Some("203.0.113.10".to_string());
    let mut config2 = RtcConfiguration::default();
    config2.external_ip = Some("203.0.113.20".to_string());
    let (controlling, controlled) = setup_host_pair(config1, config2).await;
    let ctrl_candidates = controlling.local_candidates();
    let non_loopback_candidate = ctrl_candidates
        .iter()
        .find(|c| !c.address.ip().is_loopback());
    // Loopback-only hosts gather no such candidate; skip the checks then.
    if let Some(cand) = non_loopback_candidate {
        assert!(
            cand.related_address.is_some(),
            "Host candidate should have related_address when external_ip is configured"
        );
        assert_ne!(
            cand.address.ip(),
            cand.base_address().ip(),
            "Candidate address (external) should differ from base_address (local)"
        );
    }
    let ctrl_state = controlling.subscribe_state();
    let ctrd_state = controlled.subscribe_state();
    let (ok1, ok2) = tokio::join!(
        wait_ice_connected(ctrl_state, Duration::from_secs(10)),
        wait_ice_connected(ctrd_state, Duration::from_secs(10)),
    );
    assert!(
        ok1,
        "Controlling agent failed to reach Connected with external_ip"
    );
    assert!(
        ok2,
        "Controlled agent failed to reach Connected with external_ip"
    );
    let selected_pair = controlling.get_selected_pair().await;
    assert!(
        selected_pair.is_some(),
        "Controlling agent should have a selected pair"
    );
    let selected_pair = controlled.get_selected_pair().await;
    assert!(
        selected_pair.is_some(),
        "Controlled agent should have a selected pair"
    );
    Ok(())
}
#[tokio::test]
#[serial]
async fn test_nomination_with_external_ip() -> Result<()> {
    // Nomination should still complete even when both sides advertise
    // unroutable TEST-NET external IPs.
    let mut config1 = RtcConfiguration::default();
    config1.external_ip = Some("203.0.113.10".to_string());
    let mut config2 = RtcConfiguration::default();
    config2.external_ip = Some("203.0.113.20".to_string());
    let (controlling, controlled) = setup_host_pair(config1, config2).await;
    let mut ctrl_nom_rx = controlling.subscribe_nomination_complete();
    let mut ctrd_nom_rx = controlled.subscribe_nomination_complete();
    let ctrl_state = controlling.subscribe_state();
    let ctrd_state = controlled.subscribe_state();
    let (ok1, ok2) = tokio::join!(
        wait_ice_connected(ctrl_state, Duration::from_secs(10)),
        wait_ice_connected(ctrd_state, Duration::from_secs(10)),
    );
    assert!(ok1, "Controlling agent failed to connect");
    assert!(ok2, "Controlled agent failed to connect");
    // Read the current nomination value or await the next change.
    let ctrl_nom = timeout(Duration::from_secs(15), async {
        if ctrl_nom_rx.borrow().is_some() {
            return *ctrl_nom_rx.borrow();
        }
        ctrl_nom_rx.changed().await.ok()?;
        *ctrl_nom_rx.borrow()
    })
    .await
    .expect("Controlling nomination_complete should fire");
    let ctrd_nom = timeout(Duration::from_secs(5), async {
        if ctrd_nom_rx.borrow().is_some() {
            return *ctrd_nom_rx.borrow();
        }
        ctrd_nom_rx.changed().await.ok()?;
        *ctrd_nom_rx.borrow()
    })
    .await
    .expect("Controlled nomination_complete should fire");
    assert_eq!(
        ctrd_nom,
        Some(true),
        "Controlled side should signal nomination_complete immediately"
    );
    // The controlling side only needs to resolve to Some(_); the success
    // value may depend on routing of the external addresses.
    assert!(
        ctrl_nom.is_some(),
        "Controlling nomination_complete should fire (got {:?})",
        ctrl_nom
    );
    Ok(())
}
#[tokio::test]
#[serial]
async fn test_ice_connection_without_external_ip() -> Result<()> {
    // Baseline: without external_ip, each host candidate is its own base,
    // and the pair connects with valid selected pairs on both sides.
    let config1 = RtcConfiguration::default();
    let config2 = RtcConfiguration::default();
    let (controlling, controlled) = setup_host_pair(config1, config2).await;
    let ctrl_candidates = controlling.local_candidates();
    for cand in &ctrl_candidates {
        if cand.typ == IceCandidateType::Host {
            if let Some(related) = cand.related_address {
                assert_eq!(
                    related, cand.address,
                    "Without external_ip, related_address should equal address"
                );
            }
            assert_eq!(
                cand.base_address(),
                cand.address,
                "Without external_ip, base_address() should equal address"
            );
        }
    }
    let ctrl_state = controlling.subscribe_state();
    let ctrd_state = controlled.subscribe_state();
    let (ok1, ok2) = tokio::join!(
        wait_ice_connected(ctrl_state, Duration::from_secs(10)),
        wait_ice_connected(ctrd_state, Duration::from_secs(10)),
    );
    assert!(
        ok1,
        "Controlling agent failed to reach Connected without external_ip"
    );
    assert!(
        ok2,
        "Controlled agent failed to reach Connected without external_ip"
    );
    let ctrl_pair = controlling.get_selected_pair().await;
    assert!(
        ctrl_pair.is_some(),
        "Controlling agent should have a selected pair"
    );
    let pair = ctrl_pair.unwrap();
    assert!(
        pair.local.address.port() > 0,
        "Local address should have valid port"
    );
    assert!(
        pair.remote.address.port() > 0,
        "Remote address should have valid port"
    );
    let ctrd_pair = controlled.get_selected_pair().await;
    assert!(
        ctrd_pair.is_some(),
        "Controlled agent should have a selected pair"
    );
    Ok(())
}
/// Nomination timing under socket contention: even with aggressively short
/// nomination/STUN timeouts, both sides' `nomination_complete` watch
/// channels must fire shortly after ICE reaches Connected — they must
/// never hang.
#[tokio::test]
#[serial]
async fn test_nomination_delayed_by_dtls_socket_contention() -> Result<()> {
    let mut config1 = RtcConfiguration::default();
    let mut config2 = RtcConfiguration::default();
    // Tight timeouts so a stuck nomination shows up within the test's
    // observation windows instead of masking the bug.
    config1.nomination_timeout = Duration::from_millis(500);
    config1.stun_timeout = Duration::from_millis(200);
    config2.nomination_timeout = Duration::from_millis(500);
    config2.stun_timeout = Duration::from_millis(200);
    let (controlling, controlled) = setup_host_pair(config1, config2).await;
    let ctrl_state = controlling.subscribe_state();
    let ctrd_state = controlled.subscribe_state();
    // Subscribe before waiting for connection so no signal can be missed.
    let mut ctrl_nom_rx = controlling.subscribe_nomination_complete();
    let mut ctrd_nom_rx = controlled.subscribe_nomination_complete();
    let (ok1, ok2) = tokio::join!(
        wait_ice_connected(ctrl_state, Duration::from_secs(10)),
        wait_ice_connected(ctrd_state, Duration::from_secs(10)),
    );
    assert!(ok1, "Controlling ICE failed to connect");
    assert!(ok2, "Controlled ICE failed to connect");
    let ice_connected_at = std::time::Instant::now();
    // Watch-channel read pattern: return the current value if already set,
    // otherwise await exactly one change. Note borrow() is never held
    // across an await.
    let ctrd_nom = timeout(Duration::from_millis(600), async {
        if ctrd_nom_rx.borrow().is_some() {
            return *ctrd_nom_rx.borrow();
        }
        ctrd_nom_rx.changed().await.ok()?;
        *ctrd_nom_rx.borrow()
    })
    .await;
    assert!(
        ctrd_nom.is_ok(),
        "Controlled side nomination_complete should fire after receiving USE-CANDIDATE (within 600ms), \
         but timed out — this means the controlled side never received USE-CANDIDATE"
    );
    assert_eq!(
        ctrd_nom.unwrap(),
        Some(true),
        "Controlled side should signal nomination success after receiving USE-CANDIDATE"
    );
    // Same pattern for the controlling side, which resolves nomination
    // itself (success or timeout) rather than via USE-CANDIDATE.
    let ctrl_nom = timeout(Duration::from_millis(600), async {
        if ctrl_nom_rx.borrow().is_some() {
            return *ctrl_nom_rx.borrow();
        }
        ctrl_nom_rx.changed().await.ok()?;
        *ctrl_nom_rx.borrow()
    })
    .await;
    let elapsed = ice_connected_at.elapsed();
    assert!(
        ctrl_nom.is_ok(),
        "Controlling side nomination_complete should fire within nomination_timeout (500ms + margin), \
         elapsed={:?}. If this fails it means nomination is stuck indefinitely.",
        elapsed
    );
    let nom_result = ctrl_nom.unwrap();
    // Either Some(true) or Some(false) is acceptable; None would mean the
    // watch channel closed without ever signalling.
    assert!(
        nom_result.is_some(),
        "nomination_complete must be Some(_), got None after {:?}",
        elapsed
    );
    Ok(())
}
/// Happy-path TURN refresh: with a live TURN server, an allocation that
/// has a permission and a channel binding, and a relay pair selected,
/// `run_turn_refresh` should run its refresh cycle without error.
#[tokio::test]
#[serial]
async fn test_turn_refresh_succeeds_normally() -> Result<()> {
    let mut turn_server = TestTurnServer::start().await?;
    let mut config = RtcConfiguration::default();
    // Relay-only policy forces gathering to allocate through TURN.
    config.ice_transport_policy = IceTransportPolicy::Relay;
    config.ice_servers.push(
        IceServer::new(vec![turn_server.turn_url()]).with_credential(TEST_USERNAME, TEST_PASSWORD),
    );
    let (tx, _) = broadcast::channel(100);
    let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(config.clone(), tx, socket_tx);
    gatherer.gather().await?;
    let candidates = gatherer.local_candidates();
    let relay = candidates
        .iter()
        .find(|c| c.typ == IceCandidateType::Relay)
        .expect("should have relay candidate")
        .clone();
    // Grab the TurnClient that backs the relay candidate's allocation.
    let client = {
        let clients = gatherer.turn_clients.lock();
        clients
            .get(&relay.address)
            .cloned()
            .expect("should have TurnClient for relay")
    };
    // Give the allocation a permission and a channel so the refresh loop
    // has real state to maintain.
    let peer: SocketAddr = "127.0.0.1:12345".parse().unwrap();
    client.create_permission(peer).await?;
    client.add_channel(peer, 0x4000).await;
    let (transport, runner) = IceTransport::new(config);
    tokio::spawn(runner);
    // Hand-wire the gathered TURN clients and candidates into the fresh
    // transport so run_turn_refresh sees them.
    {
        let mut clients = transport.inner.gatherer.turn_clients.lock();
        for (addr, c) in gatherer.turn_clients.lock().iter() {
            clients.insert(*addr, c.clone());
        }
    }
    transport
        .inner
        .gatherer
        .local_candidates
        .lock()
        .extend(candidates);
    // Pretend the relay pair was nominated and the transport is Connected
    // so the refresh loop considers the allocation active.
    let remote = IceCandidate::host("127.0.0.1:9999".parse().unwrap(), 1);
    let pair = IceCandidatePair::new(relay.clone(), remote);
    *transport.inner.selected_pair.lock() = Some(pair);
    let _ = transport.inner.state.send(IceTransportState::Connected);
    IceTransportRunner::run_turn_refresh(&transport.inner).await;
    turn_server.stop().await?;
    Ok(())
}
/// After `update_nonce`, freshly built refresh packets must carry the new
/// nonce value.
#[tokio::test]
#[serial]
async fn test_turn_client_update_nonce_takes_effect() -> Result<()> {
    let mut turn_server = TestTurnServer::start().await?;
    let turn_uri = IceServerUri::parse(&turn_server.turn_url())?;
    let ice_server =
        IceServer::new(vec![turn_server.turn_url()]).with_credential(TEST_USERNAME, TEST_PASSWORD);
    let client = TurnClient::connect(&turn_uri, false).await?;
    client
        .allocate(TurnCredentials::from_server(&ice_server)?)
        .await?;
    // Install a brand-new realm/nonce pair on the client...
    client
        .update_nonce("new-realm".to_string(), "new-nonce-xyz".to_string())
        .await;
    // ...then create a refresh packet and decode it to inspect the nonce.
    let (packet, _tx_id) = client.create_refresh_packet().await?;
    let msg = StunMessage::decode(&packet)?;
    assert!(
        msg.nonce.as_deref() == Some("new-nonce-xyz"),
        "Refresh packet should contain the updated nonce; got {:?}",
        msg.nonce
    );
    turn_server.stop().await?;
    Ok(())
}
/// Stale-nonce recovery (TURN error 438 path): after deliberately
/// installing a stale nonce, the first ChannelBind must be rejected with
/// a 4xx response carrying a fresh realm/nonce; retrying with that fresh
/// nonce must then succeed.
#[tokio::test]
#[serial]
async fn test_turn_refresh_retries_on_stale_nonce() -> Result<()> {
    let mut turn_server = TestTurnServer::start().await?;
    let uri = IceServerUri::parse(&turn_server.turn_url())?;
    let server =
        IceServer::new(vec![turn_server.turn_url()]).with_credential(TEST_USERNAME, TEST_PASSWORD);
    let client = Arc::new(TurnClient::connect(&uri, false).await?);
    let creds = TurnCredentials::from_server(&server)?;
    client.allocate(creds).await?;
    let peer: SocketAddr = "127.0.0.1:12346".parse().unwrap();
    client.create_permission(peer).await?;
    // Poison the client's cached nonce so the next request is rejected.
    client
        .update_nonce(TEST_REALM.to_string(), "stale-nonce-AAAA".to_string())
        .await;
    let (bytes1, tx_id1) = client.create_channel_rebind_packet(peer, 0x4000).await?;
    client.send(&bytes1).await?;
    let mut buf = [0u8; 1500];
    let len = client.recv(&mut buf).await?;
    let resp1 = StunMessage::decode(&buf[..len])?;
    let _ = tx_id1;
    // 400..=438 covers the auth-related errors incl. 438 Stale Nonce.
    assert!(
        matches!(resp1.error_code, Some(400..=438)),
        "Expected 4xx error for stale nonce, got {:?}",
        resp1.error_code
    );
    // The error response carries the fresh realm/nonce; adopt them as a
    // real client would.
    if let (Some(realm), Some(nonce)) = (resp1.realm.clone(), resp1.nonce.clone()) {
        client.update_nonce(realm, nonce).await;
    }
    // Retry the same ChannelBind with the refreshed credentials.
    let (bytes2, _tx_id2) = client.create_channel_rebind_packet(peer, 0x4000).await?;
    client.send(&bytes2).await?;
    let len2 = client.recv(&mut buf).await?;
    let resp2 = StunMessage::decode(&buf[..len2])?;
    assert_eq!(
        resp2.class,
        StunClass::SuccessResponse,
        "Second ChannelBind with fresh nonce should succeed, got error={:?}",
        resp2.error_code
    );
    turn_server.stop().await?;
    Ok(())
}
/// `run_turn_refresh` must terminate (not hang) when the TURN server has
/// gone away after the allocation was made.
#[tokio::test]
#[serial]
async fn test_turn_refresh_tolerates_server_unreachable() -> Result<()> {
    let mut turn_server = TestTurnServer::start().await?;
    let mut config = RtcConfiguration::default();
    config.ice_transport_policy = IceTransportPolicy::Relay;
    config.ice_servers.push(
        IceServer::new(vec![turn_server.turn_url()]).with_credential(TEST_USERNAME, TEST_PASSWORD),
    );
    let (tx, _) = broadcast::channel(100);
    let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(config.clone(), tx, socket_tx);
    gatherer.gather().await?;
    let candidates = gatherer.local_candidates();
    let relay = candidates
        .iter()
        .find(|c| c.typ == IceCandidateType::Relay)
        .expect("should have relay candidate")
        .clone();
    // Kill the server before the refresh loop runs — from here on every
    // refresh attempt will fail to get a response.
    turn_server.stop().await?;
    let (transport, runner) = IceTransport::new(config);
    tokio::spawn(runner);
    // Hand-wire gathered TURN clients/candidates into the transport, as
    // in test_turn_refresh_succeeds_normally.
    {
        let mut clients = transport.inner.gatherer.turn_clients.lock();
        for (addr, c) in gatherer.turn_clients.lock().iter() {
            clients.insert(*addr, c.clone());
        }
    }
    transport
        .inner
        .gatherer
        .local_candidates
        .lock()
        .extend(candidates);
    let remote = IceCandidate::host("127.0.0.1:9999".parse().unwrap(), 1);
    let pair = IceCandidatePair::new(relay, remote);
    *transport.inner.selected_pair.lock() = Some(pair);
    let _ = transport.inner.state.send(IceTransportState::Connected);
    // The refresh routine must give up and return within the 25s budget
    // instead of retrying forever.
    let result = timeout(
        Duration::from_secs(25),
        IceTransportRunner::run_turn_refresh(&transport.inner),
    )
    .await;
    assert!(
        result.is_ok(),
        "run_turn_refresh should complete even when server is unreachable"
    );
    Ok(())
}
/// Nominating an unreachable host must fail fast: bounded by
/// `nomination_timeout` (500ms), not by the much larger `stun_timeout`,
/// and it must report `Some(false)` on the nomination watch channel.
#[tokio::test]
async fn test_nomination_fails_immediately_on_host_unreachable() -> Result<()> {
    let mut config = RtcConfiguration::default();
    // Deliberately huge STUN timeout: if nomination waited on it, the
    // 1.5s observation window below would expire and the test would fail.
    config.stun_timeout = Duration::from_secs(30);
    config.nomination_timeout = Duration::from_millis(500);
    let (transport, runner) = IceTransportBuilder::new(config).build();
    tokio::spawn(runner);
    // Ports 0 and 1 on loopback serve as effectively unreachable targets.
    let local_candidate = IceCandidate::host("127.0.0.1:0".parse().unwrap(), 1);
    let remote_candidate = IceCandidate::host("127.0.0.1:1".parse().unwrap(), 1);
    *transport.inner.role.lock() = IceRole::Controlling;
    transport.set_remote_parameters(IceParameters::new(
        "testufrag",
        "testpassword_long_enough_1234",
    ));
    let mut nom_rx = transport.subscribe_nomination_complete();
    let inner_clone = transport.inner.clone();
    let local_clone = local_candidate.clone();
    let remote_clone = remote_candidate.clone();
    // Run the nominating binding check directly and translate its outcome
    // into the nomination_complete signal, as the runner would.
    tokio::spawn(async move {
        let result = perform_binding_check(
            &local_clone,
            &remote_clone,
            &inner_clone,
            IceRole::Controlling,
            true,
        )
        .await;
        let signal = if result.is_ok() {
            Some(true)
        } else {
            Some(false)
        };
        let _ = inner_clone.nomination_complete.send(signal);
    });
    let start = std::time::Instant::now();
    // Standard watch-channel read: take current value or await one change.
    let result = timeout(Duration::from_millis(1500), async {
        if nom_rx.borrow().is_some() {
            return *nom_rx.borrow();
        }
        nom_rx.changed().await.ok()?;
        *nom_rx.borrow()
    })
    .await;
    let elapsed = start.elapsed();
    assert!(
        result.is_ok(),
        "nomination_complete should fire after nomination_timeout (500ms) when host is \
         unreachable, but timed out after {:?}",
        elapsed
    );
    let nom_value = result.unwrap();
    assert_eq!(
        nom_value,
        Some(false),
        "Nomination to unreachable address should produce Some(false), got {:?}",
        nom_value
    );
    Ok(())
}
/// With a 1ms `nomination_timeout` (virtually guaranteeing that formal
/// nomination times out), ICE must still select a pair on both sides and
/// the selected sockets must still carry application data — i.e. a DTLS
/// handshake could proceed despite the nomination timeout.
#[tokio::test]
async fn test_dtls_proceeds_after_nomination_timeout() -> Result<()> {
    let mut config1 = RtcConfiguration::default();
    let mut config2 = RtcConfiguration::default();
    // 1ms makes nomination time out essentially immediately.
    config1.nomination_timeout = Duration::from_millis(1);
    config1.stun_timeout = Duration::from_secs(5);
    config2.nomination_timeout = Duration::from_millis(1);
    config2.stun_timeout = Duration::from_secs(5);
    let (controlling, controlled) = setup_host_pair(config1, config2).await;
    let ctrl_state = controlling.subscribe_state();
    let ctrd_state = controlled.subscribe_state();
    let mut ctrl_nom_rx = controlling.subscribe_nomination_complete();
    let (ok1, ok2) = tokio::join!(
        wait_ice_connected(ctrl_state, Duration::from_secs(10)),
        wait_ice_connected(ctrd_state, Duration::from_secs(10)),
    );
    assert!(ok1, "Controlling ICE failed to connect");
    assert!(ok2, "Controlled ICE failed to connect");
    // Capture the nomination outcome purely for diagnostics below; both
    // Some(_) and a timeout are acceptable here.
    let nom = timeout(Duration::from_millis(200), async {
        if ctrl_nom_rx.borrow().is_some() {
            return *ctrl_nom_rx.borrow();
        }
        ctrl_nom_rx.changed().await.ok()?;
        *ctrl_nom_rx.borrow()
    })
    .await;
    let ctrl_pair = controlling.get_selected_pair().await;
    assert!(
        ctrl_pair.is_some(),
        "Even when nomination times out, ICE selected pair should exist. nom={:?}",
        nom
    );
    let ctrd_pair = controlled.get_selected_pair().await;
    assert!(
        ctrd_pair.is_some(),
        "Controlled side should have a selected pair even when controlling nomination times out"
    );
    // Channel-backed PacketReceiver so we can observe delivered payloads.
    let (tx1, rx1) = tokio::sync::mpsc::unbounded_channel::<bytes::Bytes>();
    let (tx2, mut rx2) = tokio::sync::mpsc::unbounded_channel::<bytes::Bytes>();
    struct Chan(tokio::sync::mpsc::UnboundedSender<bytes::Bytes>);
    #[async_trait::async_trait]
    impl PacketReceiver for Chan {
        async fn receive(&self, packet: bytes::Bytes, _addr: std::net::SocketAddr) {
            let _ = self.0.send(packet);
        }
    }
    controlling
        .inner
        .data_receiver
        .lock()
        .replace(Arc::new(Chan(tx1)));
    controlled
        .inner
        .data_receiver
        .lock()
        .replace(Arc::new(Chan(tx2)));
    // Leading 0xff byte keeps the payload from looking like STUN, so it is
    // routed to the data receiver rather than the STUN handler.
    let test_payload = bytes::Bytes::from_static(b"\xffhello-after-nomination-timeout");
    let ctrl_socket_rx = controlling.subscribe_selected_socket();
    let ctrd_socket_rx = controlled.subscribe_selected_socket();
    // Wait (bounded) for each side's selected socket to be published.
    let ctrl_sock = timeout(Duration::from_secs(3), async {
        let mut rx = ctrl_socket_rx;
        loop {
            if rx.borrow().is_some() {
                return rx.borrow().clone();
            }
            if rx.changed().await.is_err() {
                return None;
            }
        }
    })
    .await
    .ok()
    .flatten();
    let ctrd_sock = timeout(Duration::from_secs(3), async {
        let mut rx = ctrd_socket_rx;
        loop {
            if rx.borrow().is_some() {
                return rx.borrow().clone();
            }
            if rx.changed().await.is_err() {
                return None;
            }
        }
    })
    .await
    .ok()
    .flatten();
    // Best-effort data-path check: send from the controlling side's socket
    // to the selected remote and, if delivery happens, verify the bytes.
    if let (Some(sock), Some(ctrl_pair)) = (ctrl_sock, controlling.get_selected_pair().await) {
        let _ = sock.send_to(&test_payload, ctrl_pair.remote.address).await;
        let received = timeout(Duration::from_secs(2), rx2.recv()).await;
        if let Ok(Some(pkt)) = received {
            assert_eq!(
                &pkt[..],
                &test_payload[..],
                "Received payload mismatch after nomination timeout"
            );
        }
        // Keep the reverse-direction handles alive for the duration.
        let _ = ctrd_sock;
        let _ = rx1;
    }
    Ok(())
}
/// Stress test: under heavy simulated packet loss, nomination may succeed
/// or fail, but `nomination_complete` must always resolve to `Some(_)` —
/// it must never hang and never close the channel unsignalled.
#[tokio::test]
#[serial]
async fn test_nomination_race_under_high_packet_loss() -> Result<()> {
    // RAII guard restores the previous loss rate even if an assert panics.
    struct ScopeGuard {
        prev: u32,
    }
    impl Drop for ScopeGuard {
        fn drop(&mut self) {
            PACKET_LOSS_RATE.store(self.prev, Ordering::SeqCst);
        }
    }
    // 8000 corresponds to the "80% packet loss" referenced in the assert
    // messages below — presumably a parts-per-10000 scale; confirm against
    // the PACKET_LOSS_RATE definition.
    let _guard = ScopeGuard {
        prev: PACKET_LOSS_RATE.swap(8000, Ordering::SeqCst),
    };
    let mut config1 = RtcConfiguration::default();
    let mut config2 = RtcConfiguration::default();
    // Generous timeouts: retransmissions need room to work under loss.
    config1.nomination_timeout = Duration::from_secs(3);
    config1.stun_timeout = Duration::from_secs(1);
    config2.nomination_timeout = Duration::from_secs(3);
    config2.stun_timeout = Duration::from_secs(1);
    let (controlling, controlled) = setup_host_pair(config1, config2).await;
    let ctrl_state = controlling.subscribe_state();
    let ctrd_state = controlled.subscribe_state();
    let mut ctrl_nom_rx = controlling.subscribe_nomination_complete();
    let (ok1, ok2) = tokio::join!(
        wait_ice_connected(ctrl_state, Duration::from_secs(15)),
        wait_ice_connected(ctrd_state, Duration::from_secs(15)),
    );
    // Connectivity itself may legitimately fail at this loss rate; the
    // invariant under test only applies once connected.
    if !ok1 || !ok2 {
        return Ok(());
    }
    let nom_result = timeout(Duration::from_secs(5), async {
        if ctrl_nom_rx.borrow().is_some() {
            return *ctrl_nom_rx.borrow();
        }
        ctrl_nom_rx.changed().await.ok()?;
        *ctrl_nom_rx.borrow()
    })
    .await;
    assert!(
        nom_result.is_ok(),
        "Under 80% packet loss, nomination_complete must still fire (Some(true) or Some(false)), \
         but it timed out (hung indefinitely). This reproduces the log issue where the connection \
         gets stuck waiting for nomination."
    );
    let nom = nom_result.unwrap();
    assert!(
        nom.is_some(),
        "nomination_complete value must be Some(_), got None. \
         This means the watch channel was closed unexpectedly."
    );
    println!(
        "High packet loss nomination result: {:?} (either is acceptable, None is not)",
        nom
    );
    Ok(())
}
/// Sanity-check the UPnP lease-duration constants: positive, correctly
/// ordered, and a default of exactly one hour inside the allowed range.
#[test]
fn test_upnp_default_constants() {
    assert!(0 < MIN_LEASE_DURATION);
    assert!(MIN_LEASE_DURATION < MAX_LEASE_DURATION);
    // The default must sit inside [min, max] and equal one hour.
    assert!((MIN_LEASE_DURATION..=MAX_LEASE_DURATION).contains(&DEFAULT_LEASE_DURATION));
    assert_eq!(DEFAULT_LEASE_DURATION, 3600);
}
/// A freshly created mapping with a 70-second lease is neither expired
/// nor stale, and reports (almost) the full lease as remaining.
#[test]
fn test_port_mapping_expiry() {
    let mapping = PortMapping {
        external_port: 12345,
        internal_addr: "192.168.1.100:5000".parse().unwrap(),
        lease_duration: 70,
        description: "test".to_string(),
        created_at: std::time::Instant::now(),
    };
    assert!(!mapping.is_expired_or_stale());
    // Allow one second of slack for clock granularity between creation
    // and the lifetime query.
    let left = mapping.remaining_lifetime();
    assert!((69..=70).contains(&left));
}
/// `remaining_lifetime` starts near the full lease and only counts down.
#[test]
fn test_port_mapping_remaining_lifetime() {
    let mapping = PortMapping {
        external_port: 12345,
        internal_addr: "192.168.1.100:5000".parse().unwrap(),
        lease_duration: 60,
        description: "test".to_string(),
        created_at: std::time::Instant::now(),
    };
    let before = mapping.remaining_lifetime();
    assert!(before > 55 && before <= 60);
    // After a short wait the reported lifetime must not have grown.
    std::thread::sleep(std::time::Duration::from_millis(100));
    let after = mapping.remaining_lifetime();
    assert!(after <= before, "new_remaining should not increase");
}
/// A newly constructed mapper starts enabled and with no gateway found.
#[test]
fn test_upnp_mapper_creation() {
    let local: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let mapper = UpnpPortMapper::new(local);
    assert!(mapper.is_enabled());
    assert!(!mapper.has_gateway());
}
/// `with_lease_duration` clamps out-of-range requests into the allowed
/// [MIN, MAX] window and passes in-range values through unchanged.
#[test]
fn test_upnp_mapper_custom_lease() {
    let local: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    // Below the minimum: clamped up.
    assert_eq!(
        UpnpPortMapper::with_lease_duration(local, 100).default_lease_duration,
        MIN_LEASE_DURATION
    );
    // Above the maximum: clamped down.
    assert_eq!(
        UpnpPortMapper::with_lease_duration(local, 100000).default_lease_duration,
        MAX_LEASE_DURATION
    );
    // Within range: kept as-is.
    assert_eq!(
        UpnpPortMapper::with_lease_duration(local, 1800).default_lease_duration,
        1800
    );
}
/// `disable`/`enable` toggle the mapper's enabled flag both ways.
#[test]
fn test_upnp_mapper_disable_enable() {
    let local: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let mut mapper = UpnpPortMapper::new(local);
    // Fresh mapper: enabled, no gateway discovered yet.
    assert!(mapper.is_enabled());
    assert!(!mapper.has_gateway());
    // Toggle off...
    mapper.disable();
    assert!(!mapper.is_enabled());
    // ...and back on.
    mapper.enable();
    assert!(mapper.is_enabled());
}
/// Gateway discovery must refuse to run from a loopback local address.
#[tokio::test]
async fn test_upnp_mapper_loopback_rejection() {
    let loopback: SocketAddr = "127.0.0.1:5000".parse().unwrap();
    let mut mapper = UpnpPortMapper::new(loopback);
    let result = mapper.discover().await;
    assert!(result.is_err());
    // The error message should name the loopback rejection explicitly.
    let message = result.err().map(|e| e.to_string()).unwrap_or_default();
    assert!(message.contains("loopback"));
}
/// Discovery on a disabled mapper must fail with a "disabled" error.
#[tokio::test]
async fn test_upnp_mapper_disabled() {
    let local: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let mut mapper = UpnpPortMapper::new(local);
    mapper.disable();
    let result = mapper.discover().await;
    assert!(result.is_err());
    let message = result.err().map(|e| e.to_string()).unwrap_or_default();
    assert!(message.contains("disabled"));
}
/// Adding a mapping before any gateway was discovered must error out.
#[tokio::test]
async fn test_upnp_mapper_no_gateway() {
    let local: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let mapper = UpnpPortMapper::new(local);
    let result = mapper.add_mapping(12345).await;
    assert!(result.is_err());
    let message = result.err().map(|e| e.to_string()).unwrap_or_default();
    assert!(message.contains("No UPnP gateway"));
}
/// Cloning a mapper copies its local address and enabled/gateway flags.
#[tokio::test]
async fn test_upnp_mapper_clone() {
    let local: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let original = UpnpPortMapper::new(local);
    let copy = original.clone();
    assert_eq!(copy.local_addr, local);
    assert!(copy.is_enabled());
    assert!(!copy.has_gateway());
}
/// UPnP is opt-in: disabled by default, with a one-hour default lease.
#[test]
fn test_config_upnp_defaults() {
    let defaults = RtcConfiguration::default();
    assert_eq!(
        defaults.upnp_lease_duration, 3600,
        "Default lease should be 1 hour"
    );
    assert!(!defaults.enable_upnp, "UPnP should be disabled by default");
}
/// The builder's UPnP setters are reflected in the built configuration.
#[test]
fn test_config_builder_upnp_methods() {
    let built = RtcConfigurationBuilder::new()
        .enable_upnp(false)
        .upnp_lease_duration(7200)
        .build();
    assert_eq!(built.upnp_lease_duration, 7200);
    assert!(!built.enable_upnp);
}
/// With a Relay-only transport policy, no UPnP port mappings should be
/// created even when UPnP is explicitly enabled.
#[tokio::test]
async fn test_upnp_disabled_when_relay_policy() {
    let mut config = RtcConfiguration::default();
    config.ice_transport_policy = IceTransportPolicy::Relay;
    config.enable_upnp = true;
    let (event_tx, _) = broadcast::channel(100);
    let (sock_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(config, event_tx, sock_tx);
    gatherer.gather().await.unwrap();
    assert!(
        gatherer.upnp_mappers.lock().is_empty(),
        "UPnP should not be used in Relay-only mode"
    );
}
/// With `enable_upnp` off, gathering still yields host candidates but
/// must not create any UPnP mappers.
#[tokio::test]
async fn test_upnp_disabled_in_config() {
    let mut config = RtcConfiguration::default();
    config.enable_upnp = false;
    let (event_tx, _) = broadcast::channel(100);
    let (sock_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(config, event_tx, sock_tx);
    gatherer.gather().await.unwrap();
    let found_host = gatherer
        .local_candidates()
        .iter()
        .any(|c| c.typ == IceCandidateType::Host);
    assert!(found_host, "Should have host candidates");
    assert!(
        gatherer.upnp_mappers.lock().is_empty(),
        "Should not have UPnP mappers when disabled"
    );
}
/// `cleanup_upnp_mappings` removes every registered mapper.
#[tokio::test]
async fn test_upnp_cleanup_mappings() {
    let (event_tx, _) = broadcast::channel(100);
    let (sock_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(RtcConfiguration::default(), event_tx, sock_tx);
    assert_eq!(gatherer.upnp_mappers.lock().len(), 0);
    // Register one mapper by hand (no real gateway involved).
    let local: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    gatherer.upnp_mappers.lock().push(UpnpPortMapper::new(local));
    assert_eq!(gatherer.upnp_mappers.lock().len(), 1);
    gatherer.cleanup_upnp_mappings().await;
    assert_eq!(gatherer.upnp_mappers.lock().len(), 0);
}
/// After gathering, no UPnP mapper may hold an IPv6 local address.
#[tokio::test]
async fn test_upnp_ipv6_rejection() {
    // Sanity check that IPv6 loopback parses as expected.
    let v6: SocketAddr = "[::1]:5000".parse().unwrap();
    assert!(v6.ip().is_loopback());
    let (event_tx, _) = broadcast::channel(100);
    let (sock_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(RtcConfiguration::default(), event_tx, sock_tx);
    gatherer.gather().await.unwrap();
    assert!(
        gatherer
            .upnp_mappers
            .lock()
            .iter()
            .all(|m| !m.local_addr.is_ipv6()),
        "UPnP mappers should not have IPv6 addresses"
    );
}
/// Even if UPnP discovery fails (no gateway in the test environment),
/// gathering as a whole must succeed and still produce host candidates.
#[tokio::test]
async fn test_upnp_gathering_graceful_errors() {
    let mut config = RtcConfiguration::default();
    config.enable_upnp = true;
    config.upnp_lease_duration = 1800;
    let (event_tx, _) = broadcast::channel(100);
    let (sock_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(config, event_tx, sock_tx);
    assert!(
        gatherer.gather().await.is_ok(),
        "Gather should succeed even if UPnP fails"
    );
    let found_host = gatherer
        .local_candidates()
        .iter()
        .any(|c| c.typ == IceCandidateType::Host);
    assert!(found_host, "Should have host candidates even if UPnP fails");
}
/// The UPnP mapper type and lease constants are re-exported at the crate
/// root; this compiles only if the re-export paths exist.
#[test]
fn test_upnp_library_exports() {
    use crate::UpnpPortMapper;
    use crate::{DEFAULT_LEASE_DURATION, MAX_LEASE_DURATION, MIN_LEASE_DURATION};
    // Touch each constant so the compiler resolves every export.
    let _ = (DEFAULT_LEASE_DURATION, MIN_LEASE_DURATION, MAX_LEASE_DURATION);
    let local: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let _mapper = UpnpPortMapper::new(local);
}
/// In direct-RTP mode with UPnP off, the offer yields a host candidate
/// bound to the reported local port and creates no UPnP mappers.
#[tokio::test]
async fn test_rtp_mode_upnp_disabled() {
    use crate::TransportMode;
    let mut config = RtcConfiguration::default();
    config.transport_mode = TransportMode::Rtp;
    config.enable_upnp = false;
    let (transport, _runner) = IceTransport::new(config);
    let local_addr = transport.setup_direct_rtp_offer().await.unwrap();
    let candidates = transport.local_candidates();
    assert!(!candidates.is_empty());
    // The host candidate must advertise the same port the offer bound.
    let host = candidates
        .iter()
        .find(|c| c.typ == IceCandidateType::Host)
        .expect("expected a host candidate");
    assert_eq!(host.address.port(), local_addr.port());
    assert!(transport.inner.gatherer.upnp_mappers.lock().is_empty());
}
/// Direct-RTP setup must tolerate UPnP failures (e.g. no gateway present)
/// and still produce candidates.
#[tokio::test]
async fn test_rtp_mode_upnp_enabled_graceful() {
    use crate::TransportMode;
    let mut config = RtcConfiguration::default();
    config.transport_mode = TransportMode::Rtp;
    config.enable_upnp = true;
    let (transport, _runner) = IceTransport::new(config);
    assert!(
        transport.setup_direct_rtp_offer().await.is_ok(),
        "setup_direct_rtp_offer should succeed even if UPnP fails"
    );
    assert!(!transport.local_candidates().is_empty());
}
/// Full two-agent ICE exchange: after connectivity is established, the
/// controlled side must have a selected pair — i.e. the controlling
/// side's first USE-CANDIDATE nominated a pair.
#[tokio::test]
async fn use_candidate_nominates_first_pair() -> Result<()> {
    let (t1, r1) = IceTransportBuilder::new(RtcConfiguration::default())
        .role(IceRole::Controlling)
        .build();
    let (t2, r2) = IceTransportBuilder::new(RtcConfiguration::default())
        .role(IceRole::Controlled)
        .build();
    tokio::spawn(r1);
    tokio::spawn(r2);
    // Exchange the candidates gathered so far in both directions.
    for c in t1.local_candidates() {
        t2.add_remote_candidate(c);
    }
    for c in t2.local_candidates() {
        t1.add_remote_candidate(c);
    }
    let t1c = t1.clone();
    let t2c = t2.clone();
    let mut cand_rx1 = t1.subscribe_candidates();
    let mut cand_rx2 = t2.subscribe_candidates();
    // Forward trickled candidates to the peer as they appear.
    tokio::spawn(async move {
        while let Ok(c) = cand_rx1.recv().await {
            t2c.add_remote_candidate(c);
        }
    });
    tokio::spawn(async move {
        while let Ok(c) = cand_rx2.recv().await {
            t1c.add_remote_candidate(c);
        }
    });
    // Exchange ICE parameters (ufrag/password) and start both agents.
    t1.start(t2.local_parameters())?;
    t2.start(t1.local_parameters())?;
    // Poll a state watch channel until it reads Connected or closes.
    let wait_connected = |mut rx: watch::Receiver<IceTransportState>| async move {
        loop {
            if *rx.borrow_and_update() == IceTransportState::Connected {
                return Ok::<_, anyhow::Error>(());
            }
            if rx.changed().await.is_err() {
                anyhow::bail!("state channel closed");
            }
        }
    };
    timeout(
        Duration::from_secs(10),
        futures::future::try_join(
            wait_connected(t1.subscribe_state()),
            wait_connected(t2.subscribe_state()),
        ),
    )
    .await
    .context("timed out waiting for ICE connection")??;
    assert!(
        t2.get_selected_pair().await.is_some(),
        "controlled side must have a selected pair after the first USE-CANDIDATE"
    );
    Ok(())
}
/// Re-nomination guard: once the controlled side has a nominated pair, a
/// USE-CANDIDATE binding request arriving from a *different* address must
/// not replace the selected pair.
#[tokio::test]
async fn use_candidate_no_renomination_after_nomination() -> Result<()> {
    let (t1, r1) = IceTransportBuilder::new(RtcConfiguration::default())
        .role(IceRole::Controlling)
        .build();
    let (t2, r2) = IceTransportBuilder::new(RtcConfiguration::default())
        .role(IceRole::Controlled)
        .build();
    tokio::spawn(r1);
    tokio::spawn(r2);
    // Exchange already-gathered candidates in both directions.
    for c in t1.local_candidates() {
        t2.add_remote_candidate(c);
    }
    for c in t2.local_candidates() {
        t1.add_remote_candidate(c);
    }
    let t1c = t1.clone();
    let t2c = t2.clone();
    let mut cand_rx1 = t1.subscribe_candidates();
    let mut cand_rx2 = t2.subscribe_candidates();
    // Forward trickled candidates to the peer as they appear.
    tokio::spawn(async move {
        while let Ok(c) = cand_rx1.recv().await {
            t2c.add_remote_candidate(c);
        }
    });
    tokio::spawn(async move {
        while let Ok(c) = cand_rx2.recv().await {
            t1c.add_remote_candidate(c);
        }
    });
    t1.start(t2.local_parameters())?;
    t2.start(t1.local_parameters())?;
    let wait_connected = |mut rx: watch::Receiver<IceTransportState>| async move {
        loop {
            if *rx.borrow_and_update() == IceTransportState::Connected {
                return Ok::<_, anyhow::Error>(());
            }
            if rx.changed().await.is_err() {
                anyhow::bail!("state channel closed");
            }
        }
    };
    timeout(
        Duration::from_secs(10),
        futures::future::try_join(
            wait_connected(t1.subscribe_state()),
            wait_connected(t2.subscribe_state()),
        ),
    )
    .await
    .context("timed out waiting for ICE connection")??;
    let nominated_pair = t2
        .get_selected_pair()
        .await
        .expect("controlled side must have a selected pair after nomination");
    // Mark the current selected-pair value as seen so has_changed() below
    // detects only updates that happen after this point.
    let mut pair_rx = t2.subscribe_selected_pair();
    let _ = pair_rx.borrow_and_update();
    // A fresh socket plays the part of an attacker/late peer on an
    // address distinct from the nominated remote.
    let second_socket = UdpSocket::bind("127.0.0.1:0").await?;
    let second_addr = second_socket.local_addr()?;
    assert_ne!(
        second_addr, nominated_pair.remote.address,
        "second socket must be a distinct address"
    );
    t2.add_remote_candidate(IceCandidate::host(second_addr, 1));
    // Inject a raw STUN binding request carrying USE-CANDIDATE at the
    // controlled side's base address.
    let controlled_addr = nominated_pair.local.base_address();
    let tx_id = random_bytes::<12>();
    let mut msg = StunMessage::binding_request(tx_id, None);
    msg.attributes.push(StunAttribute::UseCandidate);
    let bytes = msg.encode(None, false)?;
    second_socket.send_to(&bytes, controlled_addr).await?;
    // Give the controlled side time to (mis)process the packet.
    tokio::time::sleep(Duration::from_millis(200)).await;
    assert!(
        !pair_rx.has_changed().unwrap_or(true),
        "selected_pair must NOT be updated by a USE-CANDIDATE after initial nomination \
         (re-nomination guard is missing or broken)"
    );
    let final_pair = t2
        .get_selected_pair()
        .await
        .expect("selected pair should still be present");
    assert_eq!(
        nominated_pair.remote.address, final_pair.remote.address,
        "remote address of selected pair must not change after subsequent USE-CANDIDATE keepalives"
    );
    Ok(())
}