use super::*;
use crate::config::RtcConfigurationBuilder;
use crate::transports::PacketReceiver;
use crate::transports::ice::upnp::{
DEFAULT_LEASE_DURATION, MAX_LEASE_DURATION, MIN_LEASE_DURATION, PortMapping, UpnpPortMapper,
};
use crate::{IceServer, IceTransportPolicy, RtcConfiguration};
use ::turn::{
auth::{AuthHandler, generate_auth_key},
relay::relay_static::RelayAddressGeneratorStatic,
server::{
Server,
config::{ConnConfig, ServerConfig},
},
};
use anyhow::Result;
use bytes::Bytes;
use futures::FutureExt;
use tokio::sync::broadcast;
use serial_test::serial;
use tokio::time::{Duration, timeout};
/// Result alias whose error type is `::turn::Error`; used as the return type of
/// `StaticAuthHandler::auth_handle` in this file.
type TurnResult<T> = std::result::Result<T, ::turn::Error>;
/// A `turn:` URI with an explicit port and a `transport=tcp` query is split into
/// kind, host, port and transport.
#[test]
fn parse_turn_uri() {
    let parsed = IceServerUri::parse("turn:example.com:3478?transport=tcp").unwrap();
    assert_eq!(parsed.kind, IceUriKind::Turn);
    assert_eq!(parsed.host, "example.com");
    assert_eq!(parsed.port, 3478);
    assert_eq!(parsed.transport, IceTransportProtocol::Tcp);
}
/// A transport built from the default configuration reports `Complete` gathering
/// shortly after its runner has been spawned.
#[tokio::test]
async fn builder_starts_gathering() {
    let config = RtcConfiguration::default();
    let (transport, runner) = IceTransportBuilder::new(config).build();
    tokio::spawn(runner);
    // Short pause before the gather state is inspected.
    tokio::time::sleep(Duration::from_millis(50)).await;
    let gather_state = transport.gather_state();
    assert!(matches!(gather_state, IceGathererState::Complete));
}
/// Gathering against the test server's STUN URL must produce at least one
/// server-reflexive candidate.
#[tokio::test]
async fn stun_probe_yields_server_reflexive_candidate() -> Result<()> {
    let mut turn_server = TestTurnServer::start().await?;
    let stun_entry = IceServer::new(vec![turn_server.stun_url()]);
    let mut config = RtcConfiguration::default();
    config.ice_servers.push(stun_entry);
    // The receiving halves are dropped right away; only the gatherer's own list is inspected.
    let (candidate_tx, _) = broadcast::channel(100);
    let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(config, candidate_tx, socket_tx);
    gatherer.gather().await?;
    let gathered = gatherer.local_candidates();
    let has_srflx = gathered
        .iter()
        .any(|c| c.typ == IceCandidateType::ServerReflexive);
    assert!(has_srflx);
    turn_server.stop().await?;
    Ok(())
}
/// A server-reflexive candidate obtained via STUN must carry a related address that is
/// neither missing nor unspecified (0.0.0.0), and whose IP matches one of the gathered
/// host candidates.
#[tokio::test]
async fn stun_candidate_raddr_is_not_unspecified() -> Result<()> {
    let mut turn_server = TestTurnServer::start().await?;
    let mut config = RtcConfiguration::default();
    config
        .ice_servers
        .push(IceServer::new(vec![turn_server.stun_url()]));
    let (tx, _) = broadcast::channel(100);
    let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(config, tx, socket_tx);
    gatherer.gather().await?;
    let candidates = gatherer.local_candidates();

    // Require the candidate to exist: if it were optional, the assertions below would be
    // skipped and the test would pass without checking anything.
    let candidate = candidates
        .iter()
        .find(|c| matches!(c.typ, IceCandidateType::ServerReflexive))
        .expect("STUN gathering should yield a server-reflexive candidate");
    let raddr = candidate
        .related_address
        .expect("STUN candidate should have a related_address");

    assert!(
        !raddr.ip().is_unspecified(),
        "STUN candidate raddr should not be unspecified (0.0.0.0), got: {}",
        raddr.ip()
    );

    let host_addresses: Vec<_> = candidates
        .iter()
        .filter(|c| matches!(c.typ, IceCandidateType::Host))
        .map(|c| c.address.ip())
        .collect();
    assert!(
        host_addresses.contains(&raddr.ip()),
        "STUN candidate raddr ({}) should match a host candidate address. Host addresses: {:?}",
        raddr.ip(),
        host_addresses
    );

    turn_server.stop().await?;
    Ok(())
}
/// Gathering against the test TURN server with the test credentials must produce at
/// least one relay candidate.
#[tokio::test]
#[serial]
async fn turn_probe_yields_relay_candidate() -> Result<()> {
    let mut turn_server = TestTurnServer::start().await?;
    let relay_entry =
        IceServer::new(vec![turn_server.turn_url()]).with_credential(TEST_USERNAME, TEST_PASSWORD);
    let mut config = RtcConfiguration::default();
    config.ice_servers.push(relay_entry);
    let (candidate_tx, _) = broadcast::channel(100);
    let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(config, candidate_tx, socket_tx);
    gatherer.gather().await?;
    let gathered = gatherer.local_candidates();
    let relay_found = gathered.iter().any(|c| c.typ == IceCandidateType::Relay);
    assert!(relay_found);
    turn_server.stop().await?;
    Ok(())
}
/// With `IceTransportPolicy::Relay`, every gathered candidate must be a relay candidate
/// even though a STUN server is configured alongside the TURN server.
#[tokio::test]
async fn policy_relay_only_gathers_relay_candidates() -> Result<()> {
    let mut turn_server = TestTurnServer::start().await?;
    let turn_entry =
        IceServer::new(vec![turn_server.turn_url()]).with_credential(TEST_USERNAME, TEST_PASSWORD);
    let stun_entry = IceServer::new(vec![turn_server.stun_url()]);

    let mut config = RtcConfiguration::default();
    config.ice_transport_policy = IceTransportPolicy::Relay;
    config.ice_servers.push(turn_entry);
    config.ice_servers.push(stun_entry);

    let (candidate_tx, _) = broadcast::channel(100);
    let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(config, candidate_tx, socket_tx);
    gatherer.gather().await?;

    let gathered = gatherer.local_candidates();
    // An empty list would satisfy the per-candidate check trivially, so rule it out first.
    assert!(!gathered.is_empty());
    gathered.into_iter().for_each(|candidate| {
        assert_eq!(
            candidate.typ,
            IceCandidateType::Relay,
            "Found non-relay candidate: {:?}",
            candidate
        );
    });

    turn_server.stop().await?;
    Ok(())
}
/// A `TurnClient` connected to the test server can allocate with the test credentials
/// and then create a permission for a peer address.
#[tokio::test]
#[serial]
async fn turn_client_can_create_permission() -> Result<()> {
    let mut turn_server = TestTurnServer::start().await?;
    let turn_url = turn_server.turn_url();
    let uri = IceServerUri::parse(&turn_url)?;
    let server = IceServer::new(vec![turn_url]).with_credential(TEST_USERNAME, TEST_PASSWORD);

    let client = TurnClient::connect(&uri, false).await?;
    client
        .allocate(TurnCredentials::from_server(&server)?)
        .await?;

    let peer: SocketAddr = "127.0.0.1:5000".parse().unwrap();
    client.create_permission(peer).await?;

    turn_server.stop().await?;
    Ok(())
}
/// Pair priority is role-independent for two host candidates built with the same
/// component, and differs by exactly one between roles for a relay/host pair.
#[test]
fn candidate_pair_priority_calculation() {
    let host_local = IceCandidate::host("127.0.0.1:1000".parse().unwrap(), 1);
    let host_remote = IceCandidate::host("127.0.0.1:2000".parse().unwrap(), 1);

    // Host/host pair: both roles must compute the same priority.
    let host_pair = IceCandidatePair::new(host_local, host_remote.clone());
    assert_eq!(
        host_pair.priority(IceRole::Controlling),
        host_pair.priority(IceRole::Controlled)
    );

    // Relay/host pair: the controlled-role priority is exactly one higher.
    let relay_local = IceCandidate::relay("127.0.0.1:1000".parse().unwrap(), 1, "udp");
    let mixed_pair = IceCandidatePair::new(relay_local, host_remote);
    let as_controlling = mixed_pair.priority(IceRole::Controlling);
    let as_controlled = mixed_pair.priority(IceRole::Controlled);
    assert!(as_controlled > as_controlling);
    assert_eq!(as_controlled - as_controlling, 1);
}
/// End-to-end test: a relay-only controlling transport (using the in-process TURN
/// server) connects to a controlled transport with the default configuration, and
/// application data is then exchanged in both directions over the selected sockets.
#[tokio::test]
#[serial]
async fn turn_connection_relay_to_host() -> Result<()> {
let mut turn_server = TestTurnServer::start().await?;
tokio::time::sleep(Duration::from_millis(100)).await;
// Transport 1: controlling role, relay policy, TURN server with test credentials.
let mut config1 = RtcConfiguration::default();
config1.ice_transport_policy = IceTransportPolicy::Relay;
config1.ice_servers.push(
IceServer::new(vec![turn_server.turn_url()]).with_credential(TEST_USERNAME, TEST_PASSWORD),
);
let (transport1, runner1) = IceTransportBuilder::new(config1)
.role(IceRole::Controlling)
.build();
tokio::spawn(runner1);
// Transport 2: controlled role, default configuration (no ICE servers).
let config2 = RtcConfiguration::default();
let (transport2, runner2) = IceTransportBuilder::new(config2)
.role(IceRole::Controlled)
.build();
tokio::spawn(runner2);
tokio::time::sleep(Duration::from_millis(100)).await;
let t1 = transport1.clone();
let t2 = transport2.clone();
let mut rx1 = t1.subscribe_candidates();
let mut rx2 = t2.subscribe_candidates();
// Hand each side the candidates the other side already has ...
for c in t1.local_candidates() {
t2.add_remote_candidate(c);
}
for c in t2.local_candidates() {
t1.add_remote_candidate(c);
}
// ... and keep forwarding whatever arrives on the candidate subscriptions afterwards.
tokio::spawn(async move {
while let Ok(c) = rx1.recv().await {
t2.add_remote_candidate(c);
}
});
tokio::spawn(async move {
while let Ok(c) = rx2.recv().await {
t1.add_remote_candidate(c);
}
});
let state1 = transport1.subscribe_state();
let state2 = transport2.subscribe_state();
transport1.start(transport2.local_parameters())?;
transport2.start(transport1.local_parameters())?;
// Resolves Ok once the watched state is `Connected`; Err on `Failed` or when the
// state channel closes.
let wait_connected = |mut state: watch::Receiver<IceTransportState>, name: &'static str| async move {
loop {
let s = *state.borrow_and_update();
if s == IceTransportState::Connected {
return Ok(());
}
if s == IceTransportState::Failed {
return Err(anyhow::anyhow!("Transport {} failed", name));
}
if state.changed().await.is_err() {
return Err(anyhow::anyhow!("Transport {} state channel closed", name));
}
}
};
// The outer Result here only reflects the 15 s timeouts; the per-transport results
// (`r1`, `r2`) are checked separately below.
let result = tokio::try_join!(
timeout(Duration::from_secs(15), wait_connected(state1, "1")),
timeout(Duration::from_secs(15), wait_connected(state2, "2"))
);
if let Err(e) = &result {
eprintln!("Connection failed: {:?}", e);
}
let (r1, r2) = result?;
r1?;
r2?;
// Transport 1 was restricted to relay, so its selected local candidate must be a relay.
let pair1 = transport1.get_selected_pair().await.unwrap();
assert_eq!(pair1.local.typ, IceCandidateType::Relay);
let (tx1, mut rx1_data) = tokio::sync::mpsc::channel(10);
let (tx2, mut rx2_data) = tokio::sync::mpsc::channel(10);
// Packet receiver that forwards every received packet into an mpsc channel.
struct TestReceiver(tokio::sync::mpsc::Sender<Bytes>);
#[async_trait::async_trait]
impl PacketReceiver for TestReceiver {
async fn receive(&self, packet: Bytes, _addr: SocketAddr) {
let _ = self.0.send(packet).await;
}
}
transport1
.set_data_receiver(Arc::new(TestReceiver(tx1)))
.await;
transport2
.set_data_receiver(Arc::new(TestReceiver(tx2)))
.await;
// Direction 1 -> 2.
let socket1 = transport1.get_selected_socket().await.unwrap();
let pair1 = transport1.get_selected_pair().await.unwrap();
let data = Bytes::from_static(b"hello from 1");
socket1.send_to(&data, pair1.remote.address).await?;
let received = timeout(Duration::from_secs(5), rx2_data.recv())
.await?
.unwrap();
assert_eq!(received, data);
// Direction 2 -> 1.
let socket2 = transport2.get_selected_socket().await.unwrap();
let pair2 = transport2.get_selected_pair().await.unwrap();
let data2 = Bytes::from_static(b"hello from 2");
socket2.send_to(&data2, pair2.remote.address).await?;
let received2 = timeout(Duration::from_secs(5), rx1_data.recv())
.await?
.unwrap();
assert_eq!(received2, data2);
turn_server.stop().await?;
Ok(())
}
/// With a 100 ms `ice_connection_timeout`, a transport whose state is forced to
/// `Connected` is expected to report `Failed` 1.2 s later.
#[tokio::test]
async fn test_ice_connection_timeout() -> Result<()> {
    let config = {
        let mut cfg = RtcConfiguration::default();
        cfg.ice_connection_timeout = Duration::from_millis(100);
        cfg
    };
    let (transport, runner) = IceTransportBuilder::new(config).build();
    tokio::spawn(runner);

    // Force the state directly through the internal state channel.
    let forced = transport.inner.state.send(IceTransportState::Connected);
    forced.unwrap();

    tokio::time::sleep(Duration::from_millis(1200)).await;
    let final_state = transport.state();
    assert_eq!(final_state, IceTransportState::Failed);
    Ok(())
}
/// Username accepted by the test TURN server's `StaticAuthHandler`.
const TEST_USERNAME: &str = "test";
/// Password the test TURN server uses to derive the auth key for `TEST_USERNAME`.
const TEST_PASSWORD: &str = "test";
/// Realm the test TURN server is configured with.
const TEST_REALM: &str = ".turn";
/// In-process TURN server listening on a loopback UDP port, used by the tests in this file.
struct TestTurnServer {
    /// The running server; `None` after `stop` has taken it.
    server: Option<Server>,
    /// Local address of the UDP socket handed to the server.
    addr: SocketAddr,
}

impl TestTurnServer {
    /// Binds `127.0.0.1:0`, then starts a server on that socket using the test realm
    /// and the static test credentials.
    async fn start() -> Result<Self> {
        let listener = UdpSocket::bind("127.0.0.1:0").await?;
        let addr = listener.local_addr()?;

        let conn_config = ConnConfig {
            conn: Arc::new(listener),
            relay_addr_generator: Box::new(RelayAddressGeneratorStatic {
                relay_address: addr.ip(),
                address: "0.0.0.0".to_string(),
                net: Arc::new(webrtc_util::vnet::net::Net::new(None)),
            }),
        };

        let server = Server::new(ServerConfig {
            conn_configs: vec![conn_config],
            realm: TEST_REALM.to_string(),
            auth_handler: Arc::new(StaticAuthHandler::new(
                TEST_USERNAME.to_string(),
                TEST_PASSWORD.to_string(),
            )),
            channel_bind_timeout: Duration::from_secs(600),
            alloc_close_notify: None,
        })
        .await?;

        Ok(Self {
            server: Some(server),
            addr,
        })
    }

    /// `stun:` URL pointing at the server's listening address.
    fn stun_url(&self) -> String {
        format!("stun:{}", self.addr)
    }

    /// `turn:` URL pointing at the server's listening address.
    fn turn_url(&self) -> String {
        format!("turn:{}", self.addr)
    }

    /// Closes the server if it is still held; calling it again is a no-op.
    async fn stop(&mut self) -> Result<()> {
        match self.server.take() {
            Some(server) => {
                server.close().await?;
                Ok(())
            }
            None => Ok(()),
        }
    }
}
struct StaticAuthHandler {
username: String,
password: String,
}
impl StaticAuthHandler {
fn new(username: String, password: String) -> Self {
Self { username, password }
}
}
impl AuthHandler for StaticAuthHandler {
fn auth_handle(
&self,
username: &str,
realm: &str,
_src_addr: SocketAddr,
) -> TurnResult<Vec<u8>> {
if username != self.username {
return Err(::turn::Error::ErrNoSuchUser);
}
Ok(generate_auth_key(username, realm, &self.password))
}
}
/// Foundations must not contain ':' (and be ASCII alphanumeric for host/srflx), must be
/// the first token of the SDP line, must depend on the base address but not on the
/// mapped address, and must differ between host and server-reflexive candidates.
#[test]
fn ice_candidate_foundation_compliance() {
    let base: SocketAddr = "127.0.0.1:5000".parse().unwrap();
    let mapped: SocketAddr = "1.2.3.4:5000".parse().unwrap();

    // Host candidate: character set and position in the SDP line.
    let host = IceCandidate::host(base, 1);
    assert!(!host.foundation.contains(':'));
    assert!(host.foundation.chars().all(|ch| ch.is_ascii_alphanumeric()));
    let sdp = host.to_sdp();
    assert!(sdp.contains(" typ host"));
    let first_token = sdp.split_whitespace().next().unwrap();
    assert_eq!(first_token, host.foundation);

    // Server-reflexive candidate: character set.
    let srflx = IceCandidate::server_reflexive(base, mapped, 1);
    assert!(!srflx.foundation.contains(':'));
    assert!(srflx.foundation.chars().all(|ch| ch.is_ascii_alphanumeric()));

    // Same base, different mapped address: same foundation.
    let other_mapped = IceCandidate::server_reflexive(base, "1.2.3.5:6000".parse().unwrap(), 1);
    assert_eq!(srflx.foundation, other_mapped.foundation);

    // Different base, same mapped address: different foundation.
    let other_base: SocketAddr = "192.168.0.1:5000".parse().unwrap();
    let from_other_base = IceCandidate::server_reflexive(other_base, mapped, 1);
    assert_ne!(srflx.foundation, from_other_base.foundation);

    // Relay candidate: no ':' in the foundation.
    let relay = IceCandidate::relay(mapped, 1, "udp");
    assert!(!relay.foundation.contains(':'));

    // Host and srflx candidates sharing a base must not share a foundation.
    let host_again = IceCandidate::host(base, 1);
    let srflx_again = IceCandidate::server_reflexive(base, mapped, 1);
    assert_ne!(host_again.foundation, srflx_again.foundation);
}
/// Sends a STUN Binding Request from a plain UDP socket to an ICE-lite transport and
/// checks that a Binding success response comes back with the same transaction id and an
/// XOR-MAPPED-ADDRESS equal to the sender's address, and that the sender then shows up
/// as a peer-reflexive remote candidate.
#[tokio::test]
#[serial]
async fn test_ice_lite_stun_response() -> Result<()> {
use crate::TransportMode;
// ICE-lite transport in RTP mode, bound to loopback.
let mut config = RtcConfiguration::default();
config.transport_mode = TransportMode::Rtp;
config.enable_ice_lite = true;
config.bind_ip = Some("127.0.0.1".to_string());
let (ice_lite, runner) = IceTransport::new(config);
tokio::spawn(runner);
let local_addr = ice_lite.setup_direct_rtp_offer().await?;
tokio::time::sleep(Duration::from_millis(100)).await;
let _local_params = ice_lite.local_parameters();
let remote_params = IceParameters::new("remote_ufrag", "remote_pwd_12345");
ice_lite.set_remote_parameters(remote_params.clone());
ice_lite.set_role(IceRole::Controlled);
// Plain UDP socket standing in for the remote agent.
let remote_socket = UdpSocket::bind("127.0.0.1:0").await?;
let remote_addr = remote_socket.local_addr()?;
let tx_id = crate::transports::ice::stun::random_bytes::<12>();
let binding_request = StunMessage::binding_request(tx_id, Some("ice-lite-test"));
let request_bytes = binding_request.encode(None, false)?;
println!(
"Sending STUN Binding Request from {} to ICE-lite agent at {}",
remote_addr, local_addr
);
let mut buf = [0u8; 1500];
// Up to three attempts: send the request, then wait at most 2 s for a reply.
let (len, response_from) = {
let mut result = None;
for _ in 0..3 {
remote_socket.send_to(&request_bytes, local_addr).await?;
match tokio::time::timeout(Duration::from_secs(2), remote_socket.recv_from(&mut buf))
.await
{
Ok(Ok(recv_result)) => {
result = Some(Ok(recv_result));
break;
}
// NOTE(review): a socket error is recorded but the loop keeps retrying; the error
// is only surfaced if no later attempt succeeds.
Ok(Err(e)) => {
result = Some(Err(anyhow::anyhow!("Socket recv error: {}", e)));
}
// Timed out: pause briefly before the next attempt.
Err(_) => {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
// NOTE(review): the message says 5 seconds, but the loop above allows 3 attempts of
// 2 s each (plus 100 ms pauses) — confirm which is intended.
result.ok_or_else(|| anyhow::anyhow!("Should receive STUN response within 5 seconds"))??
};
println!(
"Received STUN response from {}, {} bytes",
response_from, len
);
assert_eq!(
response_from, local_addr,
"Response should come from ICE-lite local address"
);
let decoded_response = StunMessage::decode(&buf[..len])?;
assert_eq!(
decoded_response.class,
crate::transports::ice::stun::StunClass::SuccessResponse
);
assert_eq!(
decoded_response.method,
crate::transports::ice::stun::StunMethod::Binding
);
assert_eq!(
decoded_response.transaction_id, tx_id,
"Transaction ID should match request"
);
assert!(
decoded_response.xor_mapped_address.is_some(),
"STUN response should contain XOR-MAPPED-ADDRESS"
);
// The unwrap cannot fail: presence was asserted just above.
let mapped_addr = decoded_response.xor_mapped_address.unwrap();
assert_eq!(
mapped_addr, remote_addr,
"XOR-MAPPED-ADDRESS should reflect remote agent's address"
);
println!("✓ ICE-lite correctly responded to STUN binding request");
println!("✓ Response contains correct transaction ID and XOR-MAPPED-ADDRESS");
// The sender's address must now be listed as a peer-reflexive remote candidate.
let candidates = ice_lite.remote_candidates();
let prflx_candidates: Vec<_> = candidates
.iter()
.filter(|c| c.typ == IceCandidateType::PeerReflexive && c.address == remote_addr)
.collect();
assert!(
!prflx_candidates.is_empty(),
"Remote address should be added as peer-reflexive candidate"
);
println!(
"✓ Peer-reflexive candidate discovered for remote address {}",
remote_addr
);
Ok(())
}
/// Connects a full ICE agent (controlling) to an ICE-lite agent (controlled), waits for
/// both to report `Connected`, and then verifies that data sent over the selected
/// sockets arrives in both directions.
#[tokio::test]
#[serial]
async fn test_ice_lite_connectivity_establishment() -> Result<()> {
use crate::TransportMode;
// ICE-lite side: RTP mode, bound to loopback.
let mut lite_config = RtcConfiguration::default();
lite_config.transport_mode = TransportMode::Rtp;
lite_config.enable_ice_lite = true;
lite_config.bind_ip = Some("127.0.0.1".to_string());
let (ice_lite, lite_runner) = IceTransport::new(lite_config);
tokio::spawn(lite_runner);
// Full ICE side: default configuration, controlling role.
let full_config = RtcConfiguration::default();
let (ice_full, full_runner) = IceTransportBuilder::new(full_config)
.role(IceRole::Controlling)
.build();
tokio::spawn(full_runner);
let _lite_addr = ice_lite.setup_direct_rtp_offer().await?;
let lite_params = ice_lite.local_parameters();
let full_params = ice_full.local_parameters();
ice_lite.set_remote_parameters(full_params.clone());
ice_lite.set_role(IceRole::Controlled);
let lite_candidates = ice_lite.local_candidates();
assert!(
!lite_candidates.is_empty(),
"ICE-lite should have local candidates"
);
for candidate in lite_candidates {
ice_full.add_remote_candidate(candidate);
}
ice_full.start(lite_params.clone())?;
tokio::time::sleep(Duration::from_millis(100)).await;
// Give the ICE-lite side one host candidate of the full agent as its remote.
let full_candidates = ice_full.local_candidates();
let full_host_candidate = full_candidates
.iter()
.find(|c| c.typ == IceCandidateType::Host)
.expect("Full ICE agent should have host candidate")
.clone();
ice_lite.complete_direct_rtp(full_host_candidate.address);
ice_lite.add_remote_candidate(full_host_candidate);
let lite_state = ice_lite.subscribe_state();
let full_state = ice_full.subscribe_state();
// Polls the watched state up to 50 times at 100 ms intervals; Ok on `Connected`,
// Err on `Failed` or once the attempts are exhausted.
async fn wait_connected(
mut state: watch::Receiver<IceTransportState>,
name: &str,
) -> Result<()> {
for _ in 0..50 {
let current_state = *state.borrow();
if current_state == IceTransportState::Connected {
println!("{} transport connected", name);
return Ok(());
}
if current_state == IceTransportState::Failed {
return Err(anyhow::anyhow!("{} transport failed", name));
}
tokio::time::sleep(Duration::from_millis(100)).await;
// Non-blocking poll of `changed()`; the outcome is deliberately ignored.
let _ = state.changed().now_or_never();
}
Err(anyhow::anyhow!(
"{} transport did not connect within timeout",
name
))
}
tokio::try_join!(
wait_connected(lite_state, "ICE-lite"),
wait_connected(full_state, "Full ICE")
)?;
let lite_pair = ice_lite.get_selected_pair().await.unwrap();
let full_pair = ice_full.get_selected_pair().await.unwrap();
println!(
"ICE-lite selected pair: {} -> {}",
lite_pair.local.address, lite_pair.remote.address
);
println!(
"Full ICE selected pair: {} -> {}",
full_pair.local.address, full_pair.remote.address
);
let (lite_tx, mut lite_rx) = tokio::sync::mpsc::channel(10);
let (full_tx, mut full_rx) = tokio::sync::mpsc::channel(10);
// Packet receiver that forwards only non-empty packets whose first byte is >= 2.
// NOTE(review): presumably this filters out STUN traffic — confirm.
struct DataReceiver(tokio::sync::mpsc::Sender<Bytes>);
#[async_trait::async_trait]
impl PacketReceiver for DataReceiver {
async fn receive(&self, packet: Bytes, _addr: SocketAddr) {
if !packet.is_empty() && packet[0] >= 2 {
let _ = self.0.send(packet).await;
}
}
}
ice_lite
.set_data_receiver(Arc::new(DataReceiver(lite_tx)))
.await;
ice_full
.set_data_receiver(Arc::new(DataReceiver(full_tx)))
.await;
// Direction full -> lite.
let full_socket = ice_full.get_selected_socket().await.unwrap();
let test_data = Bytes::from_static(b"Hello from full ICE agent");
full_socket
.send_to(&test_data, full_pair.remote.address)
.await?;
let received_by_lite = timeout(Duration::from_secs(5), lite_rx.recv())
.await?
.ok_or_else(|| anyhow::anyhow!("ICE-lite did not receive data"))?;
assert_eq!(received_by_lite, test_data);
// Direction lite -> full.
let lite_socket = ice_lite.get_selected_socket().await.unwrap();
let response_data = Bytes::from_static(b"Hello from ICE-lite agent");
lite_socket
.send_to(&response_data, lite_pair.remote.address)
.await?;
let received_by_full = timeout(Duration::from_secs(5), full_rx.recv())
.await?
.ok_or_else(|| anyhow::anyhow!("Full ICE agent did not receive data"))?;
assert_eq!(received_by_full, response_data);
println!("✓ ICE-lite successfully established connectivity with full ICE agent");
println!("✓ Bidirectional data flow verified");
Ok(())
}
/// The default configuration must have `nomination_timeout` strictly greater than
/// `stun_timeout`.
#[test]
fn test_nomination_timeout_larger_than_stun_timeout() {
    let defaults = RtcConfiguration::default();
    let nomination = defaults.nomination_timeout;
    let stun = defaults.stun_timeout;
    assert!(
        nomination > stun,
        "nomination_timeout ({:?}) must be > stun_timeout ({:?}) to allow more retransmissions",
        nomination,
        stun,
    );
}
/// The builder's `nomination_timeout` setter is reflected in the built configuration
/// while `stun_timeout` stays at 5 s.
#[test]
fn test_nomination_timeout_builder() {
    use crate::config::RtcConfigurationBuilder;
    let requested = Duration::from_secs(20);
    let built = RtcConfigurationBuilder::new()
        .nomination_timeout(requested)
        .build();
    assert_eq!(built.nomination_timeout, requested);
    assert_eq!(built.stun_timeout, Duration::from_secs(5));
}
/// Builds a controlling and a controlled transport from the given configurations,
/// spawns their runners, cross-feeds their candidates (those already present and those
/// arriving later on the candidate subscriptions) and starts both with the other's
/// local parameters.
///
/// Returns `(controlling, controlled)`. Panics if either `start` call fails.
async fn setup_host_pair(
    controlling_config: RtcConfiguration,
    controlled_config: RtcConfiguration,
) -> (IceTransport, IceTransport) {
    let (controlling, controlling_runner) = IceTransportBuilder::new(controlling_config)
        .role(IceRole::Controlling)
        .build();
    tokio::spawn(controlling_runner);

    let (controlled, controlled_runner) = IceTransportBuilder::new(controlled_config)
        .role(IceRole::Controlled)
        .build();
    tokio::spawn(controlled_runner);

    // Candidates that already exist on each side.
    for candidate in controlling.local_candidates() {
        controlled.add_remote_candidate(candidate);
    }
    for candidate in controlled.local_candidates() {
        controlling.add_remote_candidate(candidate);
    }

    // Candidates announced from here on are forwarded by two background tasks.
    let to_controlling = controlling.clone();
    let to_controlled = controlled.clone();
    let mut from_controlling = controlling.subscribe_candidates();
    let mut from_controlled = controlled.subscribe_candidates();
    tokio::spawn(async move {
        while let Ok(candidate) = from_controlling.recv().await {
            to_controlled.add_remote_candidate(candidate);
        }
    });
    tokio::spawn(async move {
        while let Ok(candidate) = from_controlled.recv().await {
            to_controlling.add_remote_candidate(candidate);
        }
    });

    let controlled_params = controlled.local_parameters();
    controlling
        .start(controlled_params)
        .expect("controlling.start");
    let controlling_params = controlling.local_parameters();
    controlled
        .start(controlling_params)
        .expect("controlled.start");

    (controlling, controlled)
}
/// Waits up to `deadline` for the watched state to become `Connected` or `Completed`.
///
/// Returns `true` on success; `false` if the state becomes `Failed`, the channel
/// closes, or the deadline elapses first.
async fn wait_ice_connected(
    mut state_rx: watch::Receiver<IceTransportState>,
    deadline: Duration,
) -> bool {
    let watch_state = async move {
        loop {
            let current = *state_rx.borrow_and_update();
            if matches!(
                current,
                IceTransportState::Connected | IceTransportState::Completed
            ) {
                return true;
            }
            if current == IceTransportState::Failed {
                return false;
            }
            // Any other state: wait for the next change; a closed channel ends the wait.
            if state_rx.changed().await.is_err() {
                return false;
            }
        }
    };
    match timeout(deadline, watch_state).await {
        Ok(connected) => connected,
        Err(_) => false,
    }
}
/// After two default-configured agents connect, both the controlling and the
/// controlled side must publish `Some(true)` on their nomination-complete channels.
#[tokio::test]
#[serial]
async fn test_nomination_complete_fires_on_connection() -> Result<()> {
    let (controlling, controlled) =
        setup_host_pair(RtcConfiguration::default(), RtcConfiguration::default()).await;
    let mut ctrl_nomination_rx = controlling.subscribe_nomination_complete();
    let mut ctrd_nomination_rx = controlled.subscribe_nomination_complete();

    let wait_limit = Duration::from_secs(10);
    let (ok1, ok2) = tokio::join!(
        wait_ice_connected(controlling.subscribe_state(), wait_limit),
        wait_ice_connected(controlled.subscribe_state(), wait_limit),
    );
    assert!(ok1, "Controlling agent failed to reach Connected");
    assert!(ok2, "Controlled agent failed to reach Connected");

    // Controlling side: take the value if already published, otherwise wait for one change.
    let ctrl_result = timeout(Duration::from_secs(5), async {
        if ctrl_nomination_rx.borrow().is_none() {
            ctrl_nomination_rx.changed().await.ok()?;
        }
        *ctrl_nomination_rx.borrow()
    })
    .await
    .expect("nomination_complete timed out on controlling side");
    assert_eq!(
        ctrl_result,
        Some(true),
        "Controlling nomination should succeed (Some(true))"
    );

    // Controlled side: same procedure.
    let ctrd_result = timeout(Duration::from_secs(5), async {
        if ctrd_nomination_rx.borrow().is_none() {
            ctrd_nomination_rx.changed().await.ok()?;
        }
        *ctrd_nomination_rx.borrow()
    })
    .await
    .expect("nomination_complete timed out on controlled side");
    assert_eq!(
        ctrd_result,
        Some(true),
        "Controlled nomination should be Some(true) (after receiving USE-CANDIDATE)"
    );
    Ok(())
}
/// With `stun_timeout` = 30 s and `nomination_timeout` = 200 ms, a nominating binding
/// check must fail well within 2 s, i.e. it must be bounded by the nomination timeout
/// rather than the STUN timeout.
#[tokio::test]
async fn test_nomination_uses_nomination_timeout_not_stun_timeout() -> Result<()> {
    let mut config = RtcConfiguration::default();
    config.stun_timeout = Duration::from_secs(30);
    config.nomination_timeout = Duration::from_millis(200);
    let (transport, runner) = IceTransportBuilder::new(config).build();
    tokio::spawn(runner);

    let pair = IceCandidatePair::new(
        IceCandidate::host("127.0.0.1:0".parse().unwrap(), 1),
        IceCandidate::host("127.0.0.1:1".parse().unwrap(), 1),
    );
    *transport.inner.role.lock() = IceRole::Controlling;
    transport.set_remote_parameters(IceParameters::new(
        "dummy_ufrag",
        "dummy_password_1234567890",
    ));

    let mut nomination_rx = transport.subscribe_nomination_complete();
    let inner = transport.inner.clone();
    // Run the check in the background and publish its outcome on the nomination channel.
    tokio::spawn(async move {
        let check = perform_binding_check(
            &pair.local,
            &pair.remote,
            &inner,
            IceRole::Controlling,
            true,
        )
        .await;
        let _ = inner.nomination_complete.send(Some(check.is_ok()));
    });

    let started = std::time::Instant::now();
    let outcome = timeout(Duration::from_secs(5), async {
        if nomination_rx.borrow().is_none() {
            nomination_rx.changed().await.ok()?;
        }
        *nomination_rx.borrow()
    })
    .await
    .expect("nomination_complete should fire within 5 s");
    let elapsed = started.elapsed();

    assert_eq!(
        outcome,
        Some(false),
        "Nomination to a black-hole address should fail"
    );
    assert!(
        elapsed < Duration::from_secs(5),
        "Nomination should have timed out using nomination_timeout (200 ms), not stun_timeout (30 s); elapsed: {:?}",
        elapsed
    );
    assert!(
        elapsed < Duration::from_secs(2),
        "Elapsed ({:?}) should be close to nomination_timeout (200 ms), not stun_timeout (30 s)",
        elapsed
    );
    Ok(())
}
/// With `PACKET_LOSS_RATE` set to 3000 for the duration of the test, two agents using
/// a 15 s nomination timeout and a 5 s STUN timeout must still connect, and the
/// controlling side must report a successful nomination.
#[tokio::test]
#[serial]
async fn test_nomination_succeeds_under_moderate_packet_loss() -> Result<()> {
    /// Restores the previous `PACKET_LOSS_RATE` when dropped, including on panic.
    struct RestoreLossRate {
        prev: u32,
    }
    impl Drop for RestoreLossRate {
        fn drop(&mut self) {
            PACKET_LOSS_RATE.store(self.prev, Ordering::SeqCst);
        }
    }
    // NOTE(review): 3000 presumably means 30% (the assertion messages say "30%") — confirm unit.
    let _guard = RestoreLossRate {
        prev: PACKET_LOSS_RATE.swap(3000, Ordering::SeqCst),
    };

    let lossy_config = || {
        let mut cfg = RtcConfiguration::default();
        cfg.nomination_timeout = Duration::from_secs(15);
        cfg.stun_timeout = Duration::from_secs(5);
        cfg
    };
    let (controlling, controlled) = setup_host_pair(lossy_config(), lossy_config()).await;
    let mut ctrl_nom_rx = controlling.subscribe_nomination_complete();

    let wait_limit = Duration::from_secs(20);
    let (ok1, ok2) = tokio::join!(
        wait_ice_connected(controlling.subscribe_state(), wait_limit),
        wait_ice_connected(controlled.subscribe_state(), wait_limit),
    );
    assert!(ok1, "Controlling agent failed to connect under packet loss");
    assert!(ok2, "Controlled agent failed to connect under packet loss");

    let nom_result = timeout(Duration::from_secs(20), async {
        if ctrl_nom_rx.borrow().is_none() {
            ctrl_nom_rx.changed().await.ok()?;
        }
        *ctrl_nom_rx.borrow()
    })
    .await
    .expect("nomination_complete should fire within 20 s even under 30% loss");
    assert_eq!(
        nom_result,
        Some(true),
        "Nomination should succeed under 30% packet loss with nomination_timeout > stun_timeout"
    );
    Ok(())
}
/// For a host candidate whose `related_address` is set, `base_address()` returns the
/// related address while `address` keeps the value it was constructed with.
#[test]
fn test_base_address_returns_related_address_for_host_candidate() {
    let external_addr: SocketAddr = "203.0.113.5:54321".parse().unwrap();
    let local_addr: SocketAddr = "192.168.1.100:54321".parse().unwrap();

    let candidate = {
        let mut host = IceCandidate::host(external_addr, 1);
        host.related_address = Some(local_addr);
        host
    };

    let base = candidate.base_address();
    assert_eq!(
        base,
        local_addr,
        "base_address() should return related_address for host candidate with external IP"
    );
    assert_eq!(
        candidate.address, external_addr,
        "address should be the external IP"
    );
}
/// For a freshly constructed host candidate, `base_address()` equals the address it
/// was constructed with.
#[test]
fn test_base_address_returns_address_when_no_related_address() {
    let expected: SocketAddr = "192.168.1.100:54321".parse().unwrap();
    let base = IceCandidate::host(expected, 1).base_address();
    assert_eq!(
        base,
        expected,
        "base_address() should return address when related_address is None"
    );
}
#[tokio::test]
#[serial]
async fn test_ice_connection_with_external_ip() -> Result<()> {
let mut config1 = RtcConfiguration::default();
config1.external_ip = Some("203.0.113.10".to_string());
let mut config2 = RtcConfiguration::default();
config2.external_ip = Some("203.0.113.20".to_string());
let (controlling, controlled) = setup_host_pair(config1, config2).await;
let ctrl_candidates = controlling.local_candidates();
let non_loopback_candidate = ctrl_candidates
.iter()
.find(|c| !c.address.ip().is_loopback());
if let Some(cand) = non_loopback_candidate {
assert!(
cand.related_address.is_some(),
"Host candidate should have related_address when external_ip is configured"
);
assert_ne!(
cand.address.ip(),
cand.base_address().ip(),
"Candidate address (external) should differ from base_address (local)"
);
}
let ctrl_state = controlling.subscribe_state();
let ctrd_state = controlled.subscribe_state();
let (ok1, ok2) = tokio::join!(
wait_ice_connected(ctrl_state, Duration::from_secs(10)),
wait_ice_connected(ctrd_state, Duration::from_secs(10)),
);
assert!(
ok1,
"Controlling agent failed to reach Connected with external_ip"
);
assert!(
ok2,
"Controlled agent failed to reach Connected with external_ip"
);
let selected_pair = controlling.get_selected_pair().await;
assert!(
selected_pair.is_some(),
"Controlling agent should have a selected pair"
);
let selected_pair = controlled.get_selected_pair().await;
assert!(
selected_pair.is_some(),
"Controlled agent should have a selected pair"
);
Ok(())
}
/// With `external_ip` configured on both agents, the controlled side must publish
/// `Some(true)` on its nomination channel and the controlling side must publish some
/// result (success or failure).
#[tokio::test]
#[serial]
async fn test_nomination_with_external_ip() -> Result<()> {
    let with_external_ip = |ip: &str| {
        let mut cfg = RtcConfiguration::default();
        cfg.external_ip = Some(ip.to_string());
        cfg
    };
    let (controlling, controlled) = setup_host_pair(
        with_external_ip("203.0.113.10"),
        with_external_ip("203.0.113.20"),
    )
    .await;
    let mut ctrl_nom_rx = controlling.subscribe_nomination_complete();
    let mut ctrd_nom_rx = controlled.subscribe_nomination_complete();

    let wait_limit = Duration::from_secs(10);
    let (ok1, ok2) = tokio::join!(
        wait_ice_connected(controlling.subscribe_state(), wait_limit),
        wait_ice_connected(controlled.subscribe_state(), wait_limit),
    );
    assert!(ok1, "Controlling agent failed to connect");
    assert!(ok2, "Controlled agent failed to connect");

    // Collect both results first (controlling, then controlled), then assert.
    let ctrl_nom = timeout(Duration::from_secs(15), async {
        if ctrl_nom_rx.borrow().is_none() {
            ctrl_nom_rx.changed().await.ok()?;
        }
        *ctrl_nom_rx.borrow()
    })
    .await
    .expect("Controlling nomination_complete should fire");
    let ctrd_nom = timeout(Duration::from_secs(5), async {
        if ctrd_nom_rx.borrow().is_none() {
            ctrd_nom_rx.changed().await.ok()?;
        }
        *ctrd_nom_rx.borrow()
    })
    .await
    .expect("Controlled nomination_complete should fire");

    assert_eq!(
        ctrd_nom,
        Some(true),
        "Controlled side should signal nomination_complete immediately"
    );
    assert!(
        ctrl_nom.is_some(),
        "Controlling nomination_complete should fire (got {:?})",
        ctrl_nom
    );
    Ok(())
}
/// Connects two default-configured agents (no `external_ip`) over host candidates.
///
/// Before waiting for connectivity, every local host candidate of the controlling agent is
/// checked: its `related_address` (when present) and its `base_address()` must both equal
/// its `address`. Once both agents are `Connected`, each must expose a selected pair, and
/// the controlling pair must carry non-zero local and remote ports.
#[tokio::test]
#[serial]
async fn test_ice_connection_without_external_ip() -> Result<()> {
    let (controlling, controlled) =
        setup_host_pair(RtcConfiguration::default(), RtcConfiguration::default()).await;

    // Host candidates must advertise their own address everywhere.
    let local = controlling.local_candidates();
    for host in local.iter().filter(|c| c.typ == IceCandidateType::Host) {
        if let Some(related) = host.related_address {
            assert_eq!(
                related, host.address,
                "Without external_ip, related_address should equal address"
            );
        }
        assert_eq!(
            host.base_address(),
            host.address,
            "Without external_ip, base_address() should equal address"
        );
    }

    let (ctrl_connected, ctrd_connected) = tokio::join!(
        wait_ice_connected(controlling.subscribe_state(), Duration::from_secs(10)),
        wait_ice_connected(controlled.subscribe_state(), Duration::from_secs(10)),
    );
    assert!(
        ctrl_connected,
        "Controlling agent failed to reach Connected without external_ip"
    );
    assert!(
        ctrd_connected,
        "Controlled agent failed to reach Connected without external_ip"
    );

    let Some(selected) = controlling.get_selected_pair().await else {
        panic!("Controlling agent should have a selected pair");
    };
    assert!(
        selected.local.address.port() > 0,
        "Local address should have valid port"
    );
    assert!(
        selected.remote.address.port() > 0,
        "Remote address should have valid port"
    );

    assert!(
        controlled.get_selected_pair().await.is_some(),
        "Controlled agent should have a selected pair"
    );
    Ok(())
}
/// Checks that nomination completes promptly on both sides once ICE is connected.
///
/// Both agents run with a 500 ms `nomination_timeout` and a 200 ms `stun_timeout`. After
/// both reach `Connected`, the controlled side must report `Some(true)` within 600 ms and
/// the controlling side must then report some outcome (`Some(_)`) within a further 600 ms.
#[tokio::test]
#[serial]
async fn test_nomination_delayed_by_dtls_socket_contention() -> Result<()> {
    let short_timeouts = || {
        let mut cfg = RtcConfiguration::default();
        cfg.nomination_timeout = Duration::from_millis(500);
        cfg.stun_timeout = Duration::from_millis(200);
        cfg
    };
    let (controlling, controlled) = setup_host_pair(short_timeouts(), short_timeouts()).await;

    // Subscribe to everything up front so no transition is missed while waiting.
    let controlling_state = controlling.subscribe_state();
    let controlled_state = controlled.subscribe_state();
    let mut controlling_nomination = controlling.subscribe_nomination_complete();
    let mut controlled_nomination = controlled.subscribe_nomination_complete();

    let (ctrl_connected, ctrd_connected) = tokio::join!(
        wait_ice_connected(controlling_state, Duration::from_secs(10)),
        wait_ice_connected(controlled_state, Duration::from_secs(10)),
    );
    assert!(ctrl_connected, "Controlling ICE failed to connect");
    assert!(ctrd_connected, "Controlled ICE failed to connect");

    let connected_at = std::time::Instant::now();
    let nomination_wait = Duration::from_millis(600);

    let controlled_outcome = timeout(nomination_wait, async {
        let seen = *controlled_nomination.borrow();
        if seen.is_some() {
            return seen;
        }
        controlled_nomination.changed().await.ok()?;
        *controlled_nomination.borrow()
    })
    .await;
    assert!(
        controlled_outcome.is_ok(),
        "Controlled side nomination_complete should fire after receiving USE-CANDIDATE (within 600ms), \
         but timed out — this means the controlled side never received USE-CANDIDATE"
    );
    assert_eq!(
        controlled_outcome.unwrap(),
        Some(true),
        "Controlled side should signal nomination success after receiving USE-CANDIDATE"
    );

    let controlling_outcome = timeout(nomination_wait, async {
        let seen = *controlling_nomination.borrow();
        if seen.is_some() {
            return seen;
        }
        controlling_nomination.changed().await.ok()?;
        *controlling_nomination.borrow()
    })
    .await;
    let elapsed = connected_at.elapsed();
    assert!(
        controlling_outcome.is_ok(),
        "Controlling side nomination_complete should fire within nomination_timeout (500ms + margin), \
         elapsed={:?}. If this fails it means nomination is stuck indefinitely.",
        elapsed
    );
    // Either success or failure is acceptable here; only "no outcome" is an error.
    assert!(
        controlling_outcome.unwrap().is_some(),
        "nomination_complete must be Some(_), got None after {:?}",
        elapsed
    );
    Ok(())
}
#[tokio::test]
#[serial]
async fn test_turn_refresh_succeeds_normally() -> Result<()> {
let mut turn_server = TestTurnServer::start().await?;
let mut config = RtcConfiguration::default();
config.ice_transport_policy = IceTransportPolicy::Relay;
config.ice_servers.push(
IceServer::new(vec![turn_server.turn_url()]).with_credential(TEST_USERNAME, TEST_PASSWORD),
);
let (tx, _) = broadcast::channel(100);
let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
let gatherer = IceGatherer::new(config.clone(), tx, socket_tx);
gatherer.gather().await?;
let candidates = gatherer.local_candidates();
let relay = candidates
.iter()
.find(|c| c.typ == IceCandidateType::Relay)
.expect("should have relay candidate")
.clone();
let client = {
let clients = gatherer.turn_clients.lock();
clients
.get(&relay.address)
.cloned()
.expect("should have TurnClient for relay")
};
let peer: SocketAddr = "127.0.0.1:12345".parse().unwrap();
client.create_permission(peer).await?;
client.add_channel(peer, 0x4000).await;
let (transport, runner) = IceTransport::new(config);
tokio::spawn(runner);
{
let mut clients = transport.inner.gatherer.turn_clients.lock();
for (addr, c) in gatherer.turn_clients.lock().iter() {
clients.insert(*addr, c.clone());
}
}
transport
.inner
.gatherer
.local_candidates
.lock()
.extend(candidates);
let remote = IceCandidate::host("127.0.0.1:9999".parse().unwrap(), 1);
let pair = IceCandidatePair::new(relay.clone(), remote);
*transport.inner.selected_pair.lock() = Some(pair);
let _ = transport.inner.state.send(IceTransportState::Connected);
IceTransportRunner::run_turn_refresh(&transport.inner).await;
turn_server.stop().await?;
Ok(())
}
/// After `update_nonce`, the next refresh packet built by the client must carry the new nonce.
///
/// The client allocates against the test TURN server first; the refresh packet is only
/// built and decoded locally, it is not sent.
#[tokio::test]
#[serial]
async fn test_turn_client_update_nonce_takes_effect() -> Result<()> {
    let mut turn_server = TestTurnServer::start().await?;
    let turn_url = turn_server.turn_url();
    let uri = IceServerUri::parse(&turn_url)?;
    let ice_server =
        IceServer::new(vec![turn_server.turn_url()]).with_credential(TEST_USERNAME, TEST_PASSWORD);

    let client = TurnClient::connect(&uri, false).await?;
    client
        .allocate(TurnCredentials::from_server(&ice_server)?)
        .await?;

    client
        .update_nonce(String::from("new-realm"), String::from("new-nonce-xyz"))
        .await;

    let (packet, _tx_id) = client.create_refresh_packet().await?;
    let refresh = StunMessage::decode(&packet)?;
    assert!(
        refresh.nonce.as_deref() == Some("new-nonce-xyz"),
        "Refresh packet should contain the updated nonce; got {:?}",
        refresh.nonce
    );

    turn_server.stop().await?;
    Ok(())
}
/// Exercises the stale-nonce recovery sequence by hand against the test TURN server.
///
/// 1. Allocate, create a permission, then overwrite the client's nonce with a bogus value.
/// 2. Send a ChannelBind built with that nonce and expect an error code in `400..=438`.
/// 3. Adopt the realm/nonce from the error response (when both are present), rebuild the
///    ChannelBind and expect a success response.
#[tokio::test]
#[serial]
async fn test_turn_refresh_retries_on_stale_nonce() -> Result<()> {
    let mut turn_server = TestTurnServer::start().await?;
    let turn_url = turn_server.turn_url();
    let uri = IceServerUri::parse(&turn_url)?;
    let ice_server =
        IceServer::new(vec![turn_server.turn_url()]).with_credential(TEST_USERNAME, TEST_PASSWORD);

    let client = Arc::new(TurnClient::connect(&uri, false).await?);
    client
        .allocate(TurnCredentials::from_server(&ice_server)?)
        .await?;

    let peer: SocketAddr = "127.0.0.1:12346".parse().unwrap();
    client.create_permission(peer).await?;

    // Poison the nonce so the next authenticated request is rejected.
    client
        .update_nonce(TEST_REALM.to_string(), "stale-nonce-AAAA".to_string())
        .await;

    let (stale_request, _stale_tx_id) = client.create_channel_rebind_packet(peer, 0x4000).await?;
    client.send(&stale_request).await?;
    let mut buf = [0u8; 1500];
    let stale_len = client.recv(&mut buf).await?;
    let rejection = StunMessage::decode(&buf[..stale_len])?;
    assert!(
        matches!(rejection.error_code, Some(400..=438)),
        "Expected 4xx error for stale nonce, got {:?}",
        rejection.error_code
    );

    // Pick up the fresh realm/nonce advertised in the error response.
    if let Some((realm, nonce)) = rejection.realm.clone().zip(rejection.nonce.clone()) {
        client.update_nonce(realm, nonce).await;
    }

    let (fresh_request, _fresh_tx_id) = client.create_channel_rebind_packet(peer, 0x4000).await?;
    client.send(&fresh_request).await?;
    let fresh_len = client.recv(&mut buf).await?;
    let acceptance = StunMessage::decode(&buf[..fresh_len])?;
    assert_eq!(
        acceptance.class,
        StunClass::SuccessResponse,
        "Second ChannelBind with fresh nonce should succeed, got error={:?}",
        acceptance.error_code
    );

    turn_server.stop().await?;
    Ok(())
}
/// A TURN refresh pass must still return when the TURN server has gone away.
///
/// The relay candidate is gathered while the server is up; the server is then stopped
/// before the refresh runs. The refresh is given 25 seconds to complete.
#[tokio::test]
#[serial]
async fn test_turn_refresh_tolerates_server_unreachable() -> Result<()> {
    let mut turn_server = TestTurnServer::start().await?;
    let mut config = RtcConfiguration::default();
    config.ice_transport_policy = IceTransportPolicy::Relay;
    config.ice_servers.push(
        IceServer::new(vec![turn_server.turn_url()]).with_credential(TEST_USERNAME, TEST_PASSWORD),
    );

    let (event_tx, _) = broadcast::channel(100);
    let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(config.clone(), event_tx, socket_tx);
    gatherer.gather().await?;

    let gathered = gatherer.local_candidates();
    let relay_candidate = gathered
        .iter()
        .find(|c| c.typ == IceCandidateType::Relay)
        .expect("should have relay candidate")
        .clone();

    // Take the server down before anything tries to refresh the allocation.
    turn_server.stop().await?;

    let (transport, runner) = IceTransport::new(config);
    tokio::spawn(runner);

    // Transplant the gathered TURN clients and candidates into the new transport.
    {
        let mut transport_clients = transport.inner.gatherer.turn_clients.lock();
        for (relay_addr, turn_client) in gatherer.turn_clients.lock().iter() {
            transport_clients.insert(*relay_addr, turn_client.clone());
        }
    }
    transport
        .inner
        .gatherer
        .local_candidates
        .lock()
        .extend(gathered);

    let remote_candidate = IceCandidate::host("127.0.0.1:9999".parse().unwrap(), 1);
    *transport.inner.selected_pair.lock() =
        Some(IceCandidatePair::new(relay_candidate, remote_candidate));
    let _ = transport.inner.state.send(IceTransportState::Connected);

    let refresh = timeout(
        Duration::from_secs(25),
        IceTransportRunner::run_turn_refresh(&transport.inner),
    )
    .await;
    assert!(
        refresh.is_ok(),
        "run_turn_refresh should complete even when server is unreachable"
    );
    Ok(())
}
/// A nominating binding check towards `127.0.0.1:1` must give up well before `stun_timeout`.
///
/// `stun_timeout` is 30 s and `nomination_timeout` is 500 ms. The test spawns
/// `perform_binding_check` itself and publishes the outcome on `nomination_complete`, so
/// what is measured is how long that check takes: the signal must arrive within 1.5 s and
/// must be `Some(false)`.
#[tokio::test]
async fn test_nomination_fails_immediately_on_host_unreachable() -> Result<()> {
    let mut config = RtcConfiguration::default();
    config.stun_timeout = Duration::from_secs(30);
    config.nomination_timeout = Duration::from_millis(500);
    let (transport, runner) = IceTransportBuilder::new(config).build();
    tokio::spawn(runner);

    let local = IceCandidate::host("127.0.0.1:0".parse().unwrap(), 1);
    let remote = IceCandidate::host("127.0.0.1:1".parse().unwrap(), 1);
    *transport.inner.role.lock() = IceRole::Controlling;
    transport.set_remote_parameters(IceParameters::new(
        "testufrag",
        "testpassword_long_enough_1234",
    ));

    let mut nom_rx = transport.subscribe_nomination_complete();
    let inner = transport.inner.clone();
    tokio::spawn(async move {
        let check =
            perform_binding_check(&local, &remote, &inner, IceRole::Controlling, true).await;
        // Publish success/failure of the check as the nomination outcome.
        let _ = inner.nomination_complete.send(Some(check.is_ok()));
    });

    let started = std::time::Instant::now();
    let outcome = timeout(Duration::from_millis(1500), async {
        let seen = *nom_rx.borrow();
        if seen.is_some() {
            return seen;
        }
        nom_rx.changed().await.ok()?;
        *nom_rx.borrow()
    })
    .await;
    let elapsed = started.elapsed();

    assert!(
        outcome.is_ok(),
        "nomination_complete should fire after nomination_timeout (500ms) when host is \
         unreachable, but timed out after {:?}",
        elapsed
    );
    let nom_value = outcome.unwrap();
    assert_eq!(
        nom_value,
        Some(false),
        "Nomination to unreachable address should produce Some(false), got {:?}",
        nom_value
    );
    Ok(())
}
#[tokio::test]
async fn test_dtls_proceeds_after_nomination_timeout() -> Result<()> {
let mut config1 = RtcConfiguration::default();
let mut config2 = RtcConfiguration::default();
config1.nomination_timeout = Duration::from_millis(1);
config1.stun_timeout = Duration::from_secs(5);
config2.nomination_timeout = Duration::from_millis(1);
config2.stun_timeout = Duration::from_secs(5);
let (controlling, controlled) = setup_host_pair(config1, config2).await;
let ctrl_state = controlling.subscribe_state();
let ctrd_state = controlled.subscribe_state();
let mut ctrl_nom_rx = controlling.subscribe_nomination_complete();
let (ok1, ok2) = tokio::join!(
wait_ice_connected(ctrl_state, Duration::from_secs(10)),
wait_ice_connected(ctrd_state, Duration::from_secs(10)),
);
assert!(ok1, "Controlling ICE failed to connect");
assert!(ok2, "Controlled ICE failed to connect");
let nom = timeout(Duration::from_millis(200), async {
if ctrl_nom_rx.borrow().is_some() {
return *ctrl_nom_rx.borrow();
}
ctrl_nom_rx.changed().await.ok()?;
*ctrl_nom_rx.borrow()
})
.await;
let ctrl_pair = controlling.get_selected_pair().await;
assert!(
ctrl_pair.is_some(),
"Even when nomination times out, ICE selected pair should exist. nom={:?}",
nom
);
let ctrd_pair = controlled.get_selected_pair().await;
assert!(
ctrd_pair.is_some(),
"Controlled side should have a selected pair even when controlling nomination times out"
);
let (tx1, rx1) = tokio::sync::mpsc::unbounded_channel::<bytes::Bytes>();
let (tx2, mut rx2) = tokio::sync::mpsc::unbounded_channel::<bytes::Bytes>();
struct Chan(tokio::sync::mpsc::UnboundedSender<bytes::Bytes>);
#[async_trait::async_trait]
impl PacketReceiver for Chan {
async fn receive(&self, packet: bytes::Bytes, _addr: std::net::SocketAddr) {
let _ = self.0.send(packet);
}
}
controlling
.inner
.data_receiver
.lock()
.replace(Arc::new(Chan(tx1)));
controlled
.inner
.data_receiver
.lock()
.replace(Arc::new(Chan(tx2)));
let test_payload = bytes::Bytes::from_static(b"\xffhello-after-nomination-timeout");
let ctrl_socket_rx = controlling.subscribe_selected_socket();
let ctrd_socket_rx = controlled.subscribe_selected_socket();
let ctrl_sock = timeout(Duration::from_secs(3), async {
let mut rx = ctrl_socket_rx;
loop {
if rx.borrow().is_some() {
return rx.borrow().clone();
}
if rx.changed().await.is_err() {
return None;
}
}
})
.await
.ok()
.flatten();
let ctrd_sock = timeout(Duration::from_secs(3), async {
let mut rx = ctrd_socket_rx;
loop {
if rx.borrow().is_some() {
return rx.borrow().clone();
}
if rx.changed().await.is_err() {
return None;
}
}
})
.await
.ok()
.flatten();
if let (Some(sock), Some(ctrl_pair)) = (ctrl_sock, controlling.get_selected_pair().await) {
let _ = sock.send_to(&test_payload, ctrl_pair.remote.address).await;
let received = timeout(Duration::from_secs(2), rx2.recv()).await;
if let Ok(Some(pkt)) = received {
assert_eq!(
&pkt[..],
&test_payload[..],
"Received payload mismatch after nomination timeout"
);
}
let _ = ctrd_sock;
let _ = rx1;
}
Ok(())
}
/// With a 10 µs `nomination_timeout`, the controlling side must still report an outcome.
///
/// After ICE connects, the nomination signal must fire within 30 s. `Some(true)` is
/// accepted as-is; on `Some(false)` the test waits (up to 30 s) for the state to become
/// `Failed` or for the state channel to close. In every case a selected pair must still
/// be present at the end.
#[tokio::test]
#[serial]
async fn test_nomination_fallback_all_pairs_fail() -> Result<()> {
    let mut config1 = RtcConfiguration::default();
    config1.nomination_timeout = Duration::from_micros(10);
    config1.stun_timeout = Duration::from_secs(5);
    let mut config2 = RtcConfiguration::default();
    config2.nomination_timeout = Duration::from_micros(10);
    config2.stun_timeout = Duration::from_secs(5);
    let (controlling, _controlled) = setup_host_pair(config1, config2).await;

    let mut state_rx = controlling.subscribe_state();
    let mut nomination_rx = controlling.subscribe_nomination_complete();
    assert!(
        wait_ice_connected(state_rx.clone(), Duration::from_secs(10)).await,
        "ICE should connect"
    );

    // Wake on either channel: a nomination change ends the wait immediately, a state
    // change only triggers a re-check of the nomination value.
    let nom_fired = timeout(Duration::from_secs(30), async {
        while nomination_rx.borrow().is_none() {
            tokio::select! {
                _ = nomination_rx.changed() => break,
                _ = state_rx.changed() => {}
            }
        }
    })
    .await;
    assert!(
        nom_fired.is_ok(),
        "nomination_complete must fire (Some(true) or Some(false)), \
         it should never hang"
    );

    let outcome = *nomination_rx.borrow();
    match outcome {
        None => {
            panic!("nomination_complete value should be Some(_), got None");
        }
        Some(true) => {
            debug!("Nomination succeeded (fast path) – all good");
        }
        Some(false) => {
            debug!("Nomination failed – verifying ICE transitions to Failed");
            let failed = timeout(Duration::from_secs(30), async {
                while *state_rx.borrow_and_update() != IceTransportState::Failed {
                    if state_rx.changed().await.is_err() {
                        break;
                    }
                }
            })
            .await;
            assert!(
                failed.is_ok(),
                "ICE should transition to Failed after all pairs fail nomination"
            );
        }
    }

    assert!(
        controlling.get_selected_pair().await.is_some(),
        "selected pair should exist even after nomination outcome"
    );
    Ok(())
}
/// With a zero `nomination_timeout` on both agents, the controlled side must still be nominated.
///
/// Both agents must reach `Connected`; the controlled side's nomination signal must then
/// become `Some(true)` within 10 s and a selected pair must be present.
#[tokio::test]
#[serial]
async fn test_nomination_fallback_controlled_side_works() -> Result<()> {
    let zero_nomination_timeout = || {
        let mut cfg = RtcConfiguration::default();
        cfg.nomination_timeout = Duration::ZERO;
        cfg.stun_timeout = Duration::from_secs(5);
        cfg
    };
    let (controlling, controlled) =
        setup_host_pair(zero_nomination_timeout(), zero_nomination_timeout()).await;

    let controlling_state = controlling.subscribe_state();
    let controlled_state = controlled.subscribe_state();
    let mut controlled_nomination = controlled.subscribe_nomination_complete();

    assert!(
        wait_ice_connected(controlling_state, Duration::from_secs(10)).await,
        "Controlling ICE should connect"
    );
    assert!(
        wait_ice_connected(controlled_state, Duration::from_secs(10)).await,
        "Controlled ICE should connect"
    );

    let nomination = timeout(Duration::from_secs(10), async {
        let seen = *controlled_nomination.borrow();
        if seen.is_some() {
            return seen;
        }
        controlled_nomination.changed().await.ok()?;
        *controlled_nomination.borrow()
    })
    .await;
    assert_eq!(
        nomination,
        Ok(Some(true)),
        "Controlled side should receive USE-CANDIDATE and set nomination_complete = Some(true)"
    );

    assert!(
        controlled.get_selected_pair().await.is_some(),
        "Controlled side should have a selected pair"
    );
    Ok(())
}
/// Under a `PACKET_LOSS_RATE` of 8000, nomination must still produce an outcome.
///
/// The previous loss rate is restored when the test ends (including on panic) by a drop
/// guard. If either agent fails to connect within 15 s the test returns early without
/// failing. Otherwise the controlling nomination signal must fire within 5 s with
/// `Some(_)`; both `Some(true)` and `Some(false)` are accepted.
#[tokio::test]
#[serial]
async fn test_nomination_race_under_high_packet_loss() -> Result<()> {
    /// Restores the saved packet-loss rate on drop.
    struct RestoreLossRate {
        saved: u32,
    }
    impl Drop for RestoreLossRate {
        fn drop(&mut self) {
            PACKET_LOSS_RATE.store(self.saved, Ordering::SeqCst);
        }
    }
    let _restore = RestoreLossRate {
        saved: PACKET_LOSS_RATE.swap(8000, Ordering::SeqCst),
    };

    let lossy_config = || {
        let mut cfg = RtcConfiguration::default();
        cfg.nomination_timeout = Duration::from_secs(3);
        cfg.stun_timeout = Duration::from_secs(1);
        cfg
    };
    let (controlling, controlled) = setup_host_pair(lossy_config(), lossy_config()).await;

    let controlling_state = controlling.subscribe_state();
    let controlled_state = controlled.subscribe_state();
    let mut nomination_rx = controlling.subscribe_nomination_complete();

    let (ctrl_connected, ctrd_connected) = tokio::join!(
        wait_ice_connected(controlling_state, Duration::from_secs(15)),
        wait_ice_connected(controlled_state, Duration::from_secs(15)),
    );
    // Failing to connect at all under this loss rate is tolerated.
    if !(ctrl_connected && ctrd_connected) {
        return Ok(());
    }

    let nom_result = timeout(Duration::from_secs(5), async {
        let seen = *nomination_rx.borrow();
        if seen.is_some() {
            return seen;
        }
        nomination_rx.changed().await.ok()?;
        *nomination_rx.borrow()
    })
    .await;
    assert!(
        nom_result.is_ok(),
        "Under 80% packet loss, nomination_complete must still fire (Some(true) or Some(false)), \
         but it timed out (hung indefinitely). This reproduces the log issue where the connection \
         gets stuck waiting for nomination."
    );

    let nom = nom_result.unwrap();
    assert!(
        nom.is_some(),
        "nomination_complete value must be Some(_), got None. \
         This means the watch channel was closed unexpectedly."
    );
    println!(
        "High packet loss nomination result: {:?} (either is acceptable, None is not)",
        nom
    );
    Ok(())
}
/// Sanity-checks the UPnP lease constants: positive minimum, minimum below maximum,
/// default inside `[MIN, MAX]`, and a default of exactly 3600.
#[test]
fn test_upnp_default_constants() {
    assert!(MIN_LEASE_DURATION > 0);
    assert!(MIN_LEASE_DURATION < MAX_LEASE_DURATION);
    assert!(MIN_LEASE_DURATION <= DEFAULT_LEASE_DURATION);
    assert!(MAX_LEASE_DURATION >= DEFAULT_LEASE_DURATION);
    assert_eq!(DEFAULT_LEASE_DURATION, 3600);
}
/// A mapping created just now with a lease of 70 must not be expired or stale, and its
/// remaining lifetime must be 69 or 70.
#[test]
fn test_port_mapping_expiry() {
    let fresh = PortMapping {
        external_port: 12345,
        internal_addr: "192.168.1.100:5000".parse().unwrap(),
        lease_duration: 70,
        description: "test".to_string(),
        created_at: std::time::Instant::now(),
    };

    assert!(!fresh.is_expired_or_stale());

    let remaining = fresh.remaining_lifetime();
    assert!((69..=70).contains(&remaining));
}
/// The remaining lifetime of a fresh lease of 60 starts in `56..=60` and never grows.
#[test]
fn test_port_mapping_remaining_lifetime() {
    let lease = PortMapping {
        external_port: 12345,
        internal_addr: "192.168.1.100:5000".parse().unwrap(),
        lease_duration: 60,
        description: "test".to_string(),
        created_at: std::time::Instant::now(),
    };

    let before = lease.remaining_lifetime();
    assert!((56..=60).contains(&before));

    // Let a little wall-clock time pass, then sample again.
    std::thread::sleep(std::time::Duration::from_millis(100));
    let after = lease.remaining_lifetime();
    assert!(after <= before, "new_remaining should not increase");
}
/// A freshly constructed mapper is enabled and has no gateway yet.
#[test]
fn test_upnp_mapper_creation() {
    let local: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let fresh = UpnpPortMapper::new(local);

    assert!(fresh.is_enabled());
    assert!(!fresh.has_gateway());
}
/// `with_lease_duration` clamps the requested lease into `[MIN, MAX]`:
/// 100 becomes the minimum, 100000 becomes the maximum, 1800 is kept as-is.
#[test]
fn test_upnp_mapper_custom_lease() {
    let addr: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let effective_lease =
        |requested| UpnpPortMapper::with_lease_duration(addr, requested).default_lease_duration;

    assert_eq!(effective_lease(100), MIN_LEASE_DURATION);
    assert_eq!(effective_lease(100000), MAX_LEASE_DURATION);
    assert_eq!(effective_lease(1800), 1800);
}
/// `disable()` and `enable()` toggle the flag reported by `is_enabled()`.
#[test]
fn test_upnp_mapper_disable_enable() {
    let local: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let mut toggled = UpnpPortMapper::new(local);

    // Initial state: enabled, no gateway discovered.
    assert!(toggled.is_enabled());
    assert!(!toggled.has_gateway());

    toggled.disable();
    assert!(!toggled.is_enabled());

    toggled.enable();
    assert!(toggled.is_enabled());
}
/// Discovery on a mapper bound to `127.0.0.1` fails with an error mentioning "loopback".
#[tokio::test]
async fn test_upnp_mapper_loopback_rejection() {
    let loopback: SocketAddr = "127.0.0.1:5000".parse().unwrap();
    let mut mapper = UpnpPortMapper::new(loopback);

    let outcome = mapper.discover().await;
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("loopback"));
}
/// Discovery on a disabled mapper fails with an error mentioning "disabled".
#[tokio::test]
async fn test_upnp_mapper_disabled() {
    let local: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let mut mapper = UpnpPortMapper::new(local);
    mapper.disable();

    let outcome = mapper.discover().await;
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("disabled"));
}
/// Adding a mapping before any discovery fails with an error mentioning "No UPnP gateway".
#[tokio::test]
async fn test_upnp_mapper_no_gateway() {
    let local: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let undiscovered = UpnpPortMapper::new(local);

    let outcome = undiscovered.add_mapping(12345).await;
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("No UPnP gateway"));
}
/// A clone of a fresh mapper keeps the local address, stays enabled and has no gateway.
#[tokio::test]
async fn test_upnp_mapper_clone() {
    let local: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let original = UpnpPortMapper::new(local);
    let copy = original.clone();

    assert!(!copy.has_gateway());
    assert_eq!(copy.local_addr, local);
    assert!(copy.is_enabled());
}
/// The default configuration has UPnP off and a lease duration of 3600.
#[test]
fn test_config_upnp_defaults() {
    let defaults = RtcConfiguration::default();

    assert!(!defaults.enable_upnp, "UPnP should be disabled by default");
    assert_eq!(
        defaults.upnp_lease_duration, 3600,
        "Default lease should be 1 hour"
    );
}
/// The UPnP builder setters must be reflected in the built configuration.
///
/// `enable_upnp` is exercised with both values. `RtcConfiguration::default()` has UPnP
/// disabled, so checking only `enable_upnp(false)` could not tell a working setter from
/// one that does nothing.
#[test]
fn test_config_builder_upnp_methods() {
    let enabled = RtcConfigurationBuilder::new()
        .enable_upnp(true)
        .upnp_lease_duration(7200)
        .build();
    assert!(enabled.enable_upnp);
    assert_eq!(enabled.upnp_lease_duration, 7200);

    let disabled = RtcConfigurationBuilder::new()
        .enable_upnp(false)
        .upnp_lease_duration(7200)
        .build();
    assert!(!disabled.enable_upnp);
    assert_eq!(disabled.upnp_lease_duration, 7200);
}
/// With the relay-only transport policy, gathering must not create any UPnP mapper even
/// when `enable_upnp` is set.
#[tokio::test]
async fn test_upnp_disabled_when_relay_policy() {
    let mut relay_only = RtcConfiguration::default();
    relay_only.ice_transport_policy = IceTransportPolicy::Relay;
    relay_only.enable_upnp = true;

    let (event_tx, _) = broadcast::channel(100);
    let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(relay_only, event_tx, socket_tx);
    gatherer.gather().await.unwrap();

    assert!(
        gatherer.upnp_mappers.lock().is_empty(),
        "UPnP should not be used in Relay-only mode"
    );
}
/// With `enable_upnp = false`, gathering yields host candidates and no UPnP mappers.
#[tokio::test]
async fn test_upnp_disabled_in_config() {
    let mut no_upnp = RtcConfiguration::default();
    no_upnp.enable_upnp = false;

    let (event_tx, _) = broadcast::channel(100);
    let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(no_upnp, event_tx, socket_tx);
    gatherer.gather().await.unwrap();

    let found_host = gatherer
        .local_candidates()
        .iter()
        .any(|c| c.typ == IceCandidateType::Host);
    assert!(found_host, "Should have host candidates");

    assert!(
        gatherer.upnp_mappers.lock().is_empty(),
        "Should not have UPnP mappers when disabled"
    );
}
/// `cleanup_upnp_mappings` empties the gatherer's mapper list.
///
/// One mapper is pushed by hand (no gathering, no discovery) and must be gone after cleanup.
#[tokio::test]
async fn test_upnp_cleanup_mappings() {
    let (event_tx, _) = broadcast::channel(100);
    let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(RtcConfiguration::default(), event_tx, socket_tx);
    assert_eq!(gatherer.upnp_mappers.lock().len(), 0);

    let local: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    gatherer.upnp_mappers.lock().push(UpnpPortMapper::new(local));
    assert_eq!(gatherer.upnp_mappers.lock().len(), 1);

    gatherer.cleanup_upnp_mappings().await;
    assert_eq!(gatherer.upnp_mappers.lock().len(), 0);
}
/// After gathering with the default configuration, no UPnP mapper may have an IPv6
/// local address.
// NOTE(review): the `[::1]:5000` address is only checked for being loopback; it is never
// passed to the gatherer or to a mapper, so that check is independent of the rest.
#[tokio::test]
async fn test_upnp_ipv6_rejection() {
    let v6_loopback: SocketAddr = "[::1]:5000".parse().unwrap();
    assert!(v6_loopback.ip().is_loopback());

    let (event_tx, _) = broadcast::channel(100);
    let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(RtcConfiguration::default(), event_tx, socket_tx);
    gatherer.gather().await.unwrap();

    let mappers = gatherer.upnp_mappers.lock();
    assert!(
        mappers.iter().all(|m| !m.local_addr.is_ipv6()),
        "UPnP mappers should not have IPv6 addresses"
    );
}
/// With UPnP enabled (lease 1800), gathering must succeed and still yield host candidates,
/// regardless of whether UPnP itself works in the test environment.
#[tokio::test]
async fn test_upnp_gathering_graceful_errors() {
    let mut with_upnp = RtcConfiguration::default();
    with_upnp.enable_upnp = true;
    with_upnp.upnp_lease_duration = 1800;

    let (event_tx, _) = broadcast::channel(100);
    let (socket_tx, _) = tokio::sync::mpsc::unbounded_channel();
    let gatherer = IceGatherer::new(with_upnp, event_tx, socket_tx);

    let gathered = gatherer.gather().await;
    assert!(gathered.is_ok(), "Gather should succeed even if UPnP fails");

    let found_host = gatherer
        .local_candidates()
        .iter()
        .any(|c| c.typ == IceCandidateType::Host);
    assert!(found_host, "Should have host candidates even if UPnP fails");
}
/// Compile-time check that the UPnP mapper and lease constants are re-exported from the
/// crate root; the values themselves are not inspected.
#[test]
fn test_upnp_library_exports() {
    use crate::{DEFAULT_LEASE_DURATION, MAX_LEASE_DURATION, MIN_LEASE_DURATION, UpnpPortMapper};

    let _ = (
        DEFAULT_LEASE_DURATION,
        MIN_LEASE_DURATION,
        MAX_LEASE_DURATION,
    );

    let local: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let _mapper = UpnpPortMapper::new(local);
}
/// In RTP transport mode with UPnP off, `setup_direct_rtp_offer` must produce a host
/// candidate on the returned port and create no UPnP mappers.
#[tokio::test]
async fn test_rtp_mode_upnp_disabled() {
    use crate::TransportMode;

    let mut rtp_config = RtcConfiguration::default();
    rtp_config.transport_mode = TransportMode::Rtp;
    rtp_config.enable_upnp = false;
    let (transport, _runner) = IceTransport::new(rtp_config);

    let bound = transport.setup_direct_rtp_offer().await.unwrap();

    let locals = transport.local_candidates();
    assert!(!locals.is_empty());
    let host = locals
        .iter()
        .find(|c| c.typ == IceCandidateType::Host)
        .unwrap();
    assert_eq!(host.address.port(), bound.port());

    assert!(transport.inner.gatherer.upnp_mappers.lock().is_empty());
}
/// In RTP transport mode with UPnP on, `setup_direct_rtp_offer` must still succeed and
/// leave at least one local candidate.
#[tokio::test]
async fn test_rtp_mode_upnp_enabled_graceful() {
    use crate::TransportMode;

    let mut rtp_config = RtcConfiguration::default();
    rtp_config.transport_mode = TransportMode::Rtp;
    rtp_config.enable_upnp = true;
    let (transport, _runner) = IceTransport::new(rtp_config);

    let offer = transport.setup_direct_rtp_offer().await;
    assert!(
        offer.is_ok(),
        "setup_direct_rtp_offer should succeed even if UPnP fails"
    );

    assert!(!transport.local_candidates().is_empty());
}
#[tokio::test]
async fn use_candidate_nominates_first_pair() -> Result<()> {
let (t1, r1) = IceTransportBuilder::new(RtcConfiguration::default())
.role(IceRole::Controlling)
.build();
let (t2, r2) = IceTransportBuilder::new(RtcConfiguration::default())
.role(IceRole::Controlled)
.build();
tokio::spawn(r1);
tokio::spawn(r2);
for c in t1.local_candidates() {
t2.add_remote_candidate(c);
}
for c in t2.local_candidates() {
t1.add_remote_candidate(c);
}
let t1c = t1.clone();
let t2c = t2.clone();
let mut cand_rx1 = t1.subscribe_candidates();
let mut cand_rx2 = t2.subscribe_candidates();
tokio::spawn(async move {
while let Ok(c) = cand_rx1.recv().await {
t2c.add_remote_candidate(c);
}
});
tokio::spawn(async move {
while let Ok(c) = cand_rx2.recv().await {
t1c.add_remote_candidate(c);
}
});
t1.start(t2.local_parameters())?;
t2.start(t1.local_parameters())?;
let wait_connected = |mut rx: watch::Receiver<IceTransportState>| async move {
loop {
if *rx.borrow_and_update() == IceTransportState::Connected {
return Ok::<_, anyhow::Error>(());
}
if rx.changed().await.is_err() {
anyhow::bail!("state channel closed");
}
}
};
timeout(
Duration::from_secs(10),
futures::future::try_join(
wait_connected(t1.subscribe_state()),
wait_connected(t2.subscribe_state()),
),
)
.await
.context("timed out waiting for ICE connection")??;
assert!(
t2.get_selected_pair().await.is_some(),
"controlled side must have a selected pair after the first USE-CANDIDATE"
);
Ok(())
}
#[tokio::test]
async fn use_candidate_no_renomination_after_nomination() -> Result<()> {
let (t1, r1) = IceTransportBuilder::new(RtcConfiguration::default())
.role(IceRole::Controlling)
.build();
let (t2, r2) = IceTransportBuilder::new(RtcConfiguration::default())
.role(IceRole::Controlled)
.build();
tokio::spawn(r1);
tokio::spawn(r2);
for c in t1.local_candidates() {
t2.add_remote_candidate(c);
}
for c in t2.local_candidates() {
t1.add_remote_candidate(c);
}
let t1c = t1.clone();
let t2c = t2.clone();
let mut cand_rx1 = t1.subscribe_candidates();
let mut cand_rx2 = t2.subscribe_candidates();
tokio::spawn(async move {
while let Ok(c) = cand_rx1.recv().await {
t2c.add_remote_candidate(c);
}
});
tokio::spawn(async move {
while let Ok(c) = cand_rx2.recv().await {
t1c.add_remote_candidate(c);
}
});
t1.start(t2.local_parameters())?;
t2.start(t1.local_parameters())?;
let wait_connected = |mut rx: watch::Receiver<IceTransportState>| async move {
loop {
if *rx.borrow_and_update() == IceTransportState::Connected {
return Ok::<_, anyhow::Error>(());
}
if rx.changed().await.is_err() {
anyhow::bail!("state channel closed");
}
}
};
timeout(
Duration::from_secs(10),
futures::future::try_join(
wait_connected(t1.subscribe_state()),
wait_connected(t2.subscribe_state()),
),
)
.await
.context("timed out waiting for ICE connection")??;
let nominated_pair = t2
.get_selected_pair()
.await
.expect("controlled side must have a selected pair after nomination");
let mut pair_rx = t2.subscribe_selected_pair();
let _ = pair_rx.borrow_and_update();
let second_socket = UdpSocket::bind("127.0.0.1:0").await?;
let second_addr = second_socket.local_addr()?;
assert_ne!(
second_addr, nominated_pair.remote.address,
"second socket must be a distinct address"
);
t2.add_remote_candidate(IceCandidate::host(second_addr, 1));
let controlled_addr = nominated_pair.local.base_address();
let tx_id = random_bytes::<12>();
let mut msg = StunMessage::binding_request(tx_id, None);
msg.attributes.push(StunAttribute::UseCandidate);
let bytes = msg.encode(None, false)?;
second_socket.send_to(&bytes, controlled_addr).await?;
tokio::time::sleep(Duration::from_millis(200)).await;
assert!(
!pair_rx.has_changed().unwrap_or(true),
"selected_pair must NOT be updated by a USE-CANDIDATE after initial nomination \
(re-nomination guard is missing or broken)"
);
let final_pair = t2
.get_selected_pair()
.await
.expect("selected pair should still be present");
assert_eq!(
nominated_pair.remote.address, final_pair.remote.address,
"remote address of selected pair must not change after subsequent USE-CANDIDATE keepalives"
);
Ok(())
}
#[tokio::test]
async fn test_buffered_dtls_packets_delivered_when_dtls_receiver_registered_first() {
use super::conn::IceConn;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let config = RtcConfiguration::default();
let (ctrl, _ctrd) = setup_host_pair(config.clone(), config.clone()).await;
let ctrl_state = ctrl.subscribe_state();
assert!(
wait_ice_connected(ctrl_state, Duration::from_secs(10)).await,
"ICE failed to connect"
);
let dtls_packet = vec![
0x16, 0xfe, 0xfd, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, ];
let fake_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9999);
{
let mut buf = ctrl.inner.buffered_packets.lock();
buf.push_back((dtls_packet.clone(), fake_addr));
buf.push_back((dtls_packet.clone(), fake_addr));
}
assert_eq!(
ctrl.inner.buffered_packets.lock().len(),
2,
"Should have 2 buffered DTLS packets before set_data_receiver"
);
let selected_pair = ctrl
.get_selected_pair()
.await
.expect("Should have selected pair after ICE connected");
let socket_rx = ctrl.subscribe_selected_socket();
let ice_conn = IceConn::new(socket_rx, selected_pair.remote.address, None);
let (dtls_tx, mut dtls_rx) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
struct DtlsRecorder(tokio::sync::mpsc::UnboundedSender<Bytes>);
#[async_trait::async_trait]
impl PacketReceiver for DtlsRecorder {
async fn receive(&self, packet: Bytes, _addr: SocketAddr) {
let _ = self.0.send(packet);
}
}
ice_conn.set_dtls_receiver(Arc::new(DtlsRecorder(dtls_tx)));
ctrl.set_data_receiver(ice_conn.clone()).await;
for i in 0..2 {
let received = tokio::time::timeout(Duration::from_secs(1), dtls_rx.recv()).await;
assert!(
received.is_ok(),
"Buffered DTLS packet #{} was NOT delivered — set_data_receiver flushed before dtls_receiver was registered (the bug!)",
i + 1
);
if let Ok(Some(pkt)) = received {
assert_eq!(
pkt[0],
0x16,
"Packet #{} should be a DTLS handshake record",
i + 1
);
}
}
{
let mut buf = ctrl.inner.buffered_packets.lock();
buf.push_back((dtls_packet.clone(), fake_addr));
buf.push_back((dtls_packet.clone(), fake_addr));
}
let ice_conn2 = IceConn::new(
ctrl.subscribe_selected_socket(),
selected_pair.remote.address,
None,
);
let (lost_tx, mut lost_rx) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
ctrl.set_data_receiver(ice_conn2.clone()).await;
let lost = tokio::time::timeout(Duration::from_millis(200), lost_rx.recv()).await;
assert!(
lost.is_err() || lost.ok().flatten().is_none(),
"Without dtls_receiver registered, buffered DTLS packets MUST be dropped (demonstrates the bug)"
);
drop((lost_tx, lost_rx));
}
/// Serializes TCP host candidates to SDP and parses them back, checking that the
/// `tcptype` attribute survives the round trip for the `Passive`, `Active` and `So`
/// variants of `TcpType`.
#[test]
fn test_ice_tcp_candidate_sdp_roundtrip() {
    let address: SocketAddr = "192.168.1.100:3478".parse().unwrap();

    // Passive: inspect the serialized text first, then every parsed field.
    let passive_sdp = IceCandidate::host_tcp(address, 1, TcpType::Passive).to_sdp();
    let expected_fragments = [
        ("tcptype passive", "SDP should contain tcptype passive"),
        ("tcp", "SDP transport should be tcp"),
        ("host", "SDP type should be host"),
    ];
    for (fragment, failure) in expected_fragments {
        assert!(passive_sdp.contains(fragment), "{}", failure);
    }
    let reparsed_passive = IceCandidate::from_sdp(&passive_sdp).unwrap();
    assert_eq!(reparsed_passive.transport, "tcp");
    assert_eq!(reparsed_passive.tcp_type, Some(TcpType::Passive));
    assert_eq!(reparsed_passive.address, address);
    assert_eq!(reparsed_passive.typ, IceCandidateType::Host);

    // Active: the attribute must appear in the text and come back unchanged.
    let active_sdp = IceCandidate::host_tcp(address, 1, TcpType::Active).to_sdp();
    assert!(active_sdp.contains("tcptype active"));
    let reparsed_active = IceCandidate::from_sdp(&active_sdp).unwrap();
    assert_eq!(reparsed_active.tcp_type, Some(TcpType::Active));

    // So: only the parsed value is checked.
    let so_sdp = IceCandidate::host_tcp(address, 1, TcpType::So).to_sdp();
    let reparsed_so = IceCandidate::from_sdp(&so_sdp).unwrap();
    assert_eq!(reparsed_so.tcp_type, Some(TcpType::So));
}
/// Checks the relative priorities of host candidates built for the same address and
/// component: TCP `Passive` > TCP `Active` > TCP `So`, and a UDP host candidate has
/// the same priority as the passive TCP one.
#[test]
fn test_ice_tcp_priority_ordering() {
    let address: SocketAddr = "192.168.1.100:3478".parse().unwrap();
    let tcp_passive = IceCandidate::host_tcp(address, 1, TcpType::Passive);
    let tcp_active = IceCandidate::host_tcp(address, 1, TcpType::Active);
    let tcp_so = IceCandidate::host_tcp(address, 1, TcpType::So);
    let udp_host = IceCandidate::host(address, 1);

    // Strict ordering between the three TCP types.
    assert!(
        tcp_passive.priority > tcp_active.priority,
        "Passive TCP priority ({}) should be > Active ({})",
        tcp_passive.priority,
        tcp_active.priority
    );
    assert!(
        tcp_active.priority > tcp_so.priority,
        "Active TCP priority ({}) should be > SO ({})",
        tcp_active.priority,
        tcp_so.priority
    );

    // UDP host and passive TCP host are expected to tie.
    assert_eq!(
        udp_host.priority, tcp_passive.priority,
        "UDP and TCP passive host candidates should have same priority"
    );
}
/// Builds a controlling and a controlled transport with ICE-TCP enabled, exchanges
/// only their TCP candidates, and checks that every same-transport local/remote pair
/// on the controlling side yields a non-zero pair priority.
#[tokio::test]
#[serial]
async fn test_ice_tcp_pair_formation() -> Result<()> {
    // Both agents use the default configuration with the TCP policy switched on.
    let tcp_enabled_config = || {
        let mut cfg = RtcConfiguration::default();
        cfg.ice_tcp_policy = crate::config::IceTcpPolicy::Enabled;
        cfg
    };

    let (t1, r1) = IceTransportBuilder::new(tcp_enabled_config())
        .role(IceRole::Controlling)
        .build();
    tokio::spawn(r1);
    let (t2, r2) = IceTransportBuilder::new(tcp_enabled_config())
        .role(IceRole::Controlled)
        .build();
    tokio::spawn(r2);

    // Give each side up to two seconds to finish gathering; the helper discards the
    // outcome, so the assertions below are what actually detect a failure.
    let mut gathering1 = t1.subscribe_gathering_state();
    let mut gathering2 = t2.subscribe_gathering_state();
    wait_ice_connected_or_timeout(
        &mut gathering1,
        Duration::from_secs(2),
        IceGathererState::Complete,
    )
    .await;
    wait_ice_connected_or_timeout(
        &mut gathering2,
        Duration::from_secs(2),
        IceGathererState::Complete,
    )
    .await;

    let locals1 = t1.local_candidates();
    let locals2 = t2.local_candidates();
    assert!(
        locals1.iter().any(|c| c.transport == "tcp"),
        "t1 should have TCP candidates"
    );
    assert!(
        locals2.iter().any(|c| c.transport == "tcp"),
        "t2 should have TCP candidates"
    );

    // Cross-feed only the TCP candidates so the remote lists are TCP-only.
    locals1
        .iter()
        .filter(|c| c.transport == "tcp")
        .cloned()
        .for_each(|c| {
            t2.add_remote_candidate(c);
        });
    locals2
        .iter()
        .filter(|c| c.transport == "tcp")
        .cloned()
        .for_each(|c| {
            t1.add_remote_candidate(c);
        });

    let remote1 = t1.remote_candidates();
    let remote2 = t2.remote_candidates();
    assert!(!remote1.is_empty(), "t1 should have remote TCP candidates");
    assert!(!remote2.is_empty(), "t2 should have remote TCP candidates");
    assert!(
        remote1.iter().all(|c| c.transport == "tcp"),
        "t1 remotes should all be TCP"
    );
    assert!(
        remote2.iter().all(|c| c.transport == "tcp"),
        "t2 remotes should all be TCP"
    );

    // Every local/remote combination sharing a transport must form a usable pair.
    for local in &locals1 {
        for remote in remote1.iter().filter(|r| local.transport == r.transport) {
            let pair = IceCandidatePair::new(local.clone(), remote.clone());
            assert!(
                pair.priority(IceRole::Controlling) > 0,
                "TCP pair priority should be > 0"
            );
        }
    }
    Ok(())
}
/// Waits until `rx` observes `target`, the watch sender is dropped, or `deadline`
/// elapses, whichever happens first.
///
/// Despite the name, this watches an `IceGathererState` channel. The outcome is
/// discarded, so callers have to verify the resulting state themselves.
async fn wait_ice_connected_or_timeout(
    rx: &mut watch::Receiver<IceGathererState>,
    deadline: Duration,
    target: IceGathererState,
) {
    let reached_or_closed = async {
        // The borrow guard is a temporary of the loop condition, so it is released
        // before `changed()` is awaited.
        while *rx.borrow_and_update() != target {
            if rx.changed().await.is_err() {
                // Sender dropped: no further state changes can arrive.
                break;
            }
        }
    };
    let _ = tokio::time::timeout(deadline, reached_or_closed).await;
}
#[tokio::test]
#[serial]
async fn test_ice_tcp_disabled_gathers_udp_only() -> Result<()> {
let config = RtcConfiguration::default();
assert_eq!(config.ice_tcp_policy, crate::config::IceTcpPolicy::Disabled);
let (transport, runner) = IceTransportBuilder::new(config).build();
tokio::spawn(runner);
tokio::time::sleep(Duration::from_millis(100)).await;
for candidate in transport.local_candidates() {
assert_eq!(
candidate.transport, "udp",
"With TCP disabled, all candidates should be UDP, got {:?} at {}",
candidate.transport, candidate.address
);
assert!(candidate.tcp_type.is_none(), "TCP candidates should not exist when TCP is disabled");
}
Ok(())
}
#[tokio::test]
#[serial]
async fn test_ice_tcp_gathers_tcp_candidates() -> Result<()> {
let mut config = RtcConfiguration::default();
config.ice_tcp_policy = crate::config::IceTcpPolicy::Enabled;
let (transport, runner) = IceTransportBuilder::new(config).build();
tokio::spawn(runner);
tokio::time::sleep(Duration::from_millis(100)).await;
let candidates = transport.local_candidates();
let has_udp = candidates.iter().any(|c| c.transport == "udp");
let has_tcp = candidates.iter().any(|c| c.transport == "tcp");
assert!(has_udp, "Should have UDP candidates");
assert!(has_tcp, "Should have TCP candidates when TCP is enabled");
for candidate in &candidates {
if candidate.transport == "tcp" {
assert!(candidate.tcp_type.is_some(), "TCP candidate should have tcptype set");
assert_eq!(
candidate.tcp_type,
Some(TcpType::Passive),
"Default TCP candidate should be passive"
);
}
}
Ok(())
}
/// Checks that `frame_stun_for_tcp` prepends a two-byte big-endian length to the
/// payload, including for an empty payload.
#[test]
fn test_frame_stun_for_tcp() {
    let payload = b"hello stun";
    let framed = frame_stun_for_tcp(payload);
    assert_eq!(framed.len(), 2 + payload.len());

    // First two bytes: big-endian payload length; the rest: the payload itself.
    let (len_prefix, body) = framed.split_at(2);
    let declared_len = u16::from_be_bytes([len_prefix[0], len_prefix[1]]);
    assert_eq!(declared_len as usize, payload.len());
    assert_eq!(body, payload);

    // An empty payload still gets a (zero) length prefix.
    let framed_empty = frame_stun_for_tcp(b"");
    assert_eq!(framed_empty.len(), 2);
    assert_eq!(framed_empty[0], 0);
    assert_eq!(framed_empty[1], 0);
}
#[tokio::test]
async fn test_tcp_write_all_loopback() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let server_addr = listener.local_addr().unwrap();
let client = tokio::net::TcpStream::connect(server_addr).await.unwrap();
let (client_read, client_write) = client.into_split();
let client_write = Arc::new(tokio::sync::Mutex::new(client_write));
let (mut server, _) = listener.accept().await.unwrap();
let payload = b"tcp_write_all test payload";
tcp_write_all(&client_write, payload).await.unwrap();
use tokio::io::AsyncReadExt;
let mut buf = vec![0u8; payload.len()];
let s = &mut server;
s.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, payload);
drop(client_read);
}
#[tokio::test]
async fn test_tcp_write_all_large_data() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let server_addr = listener.local_addr().unwrap();
let client = tokio::net::TcpStream::connect(server_addr).await.unwrap();
let (client_read, client_write) = client.into_split();
let client_write = Arc::new(tokio::sync::Mutex::new(client_write));
let (mut server, _) = listener.accept().await.unwrap();
let payload = vec![0xABu8; 1024 * 1024];
tcp_write_all(&client_write, &payload).await.unwrap();
use tokio::io::AsyncReadExt;
let mut buf = vec![0u8; payload.len()];
let s = &mut server;
s.read_exact(&mut buf).await.unwrap();
assert_eq!(buf.len(), payload.len());
assert_eq!(buf[0], 0xAB);
assert_eq!(buf[payload.len() - 1], 0xAB);
drop(client_read);
}
/// Connects a controlling and a controlled transport that gather no UDP host
/// candidates and have ICE-TCP enabled, then checks that the selected pair and the
/// selected socket are TCP and that a send on that socket succeeds.
///
/// The controlled side is restricted to TCP ports 20000..=20010.
#[tokio::test]
#[serial]
async fn test_ice_tcp_end_to_end_connectivity() -> Result<()> {
    // Shared base configuration: UDP host gathering off, TCP policy on.
    let tcp_only = || {
        let mut cfg = RtcConfiguration::default();
        cfg.ice_gather_udp_hosts = false;
        cfg.ice_tcp_policy = crate::config::IceTcpPolicy::Enabled;
        cfg
    };
    let mut controlled_cfg = tcp_only();
    controlled_cfg.tcp_port_range_start = Some(20_000);
    controlled_cfg.tcp_port_range_end = Some(20_010);
    let controlling_cfg = tcp_only();

    let (controlling, controlling_runner) = IceTransportBuilder::new(controlling_cfg)
        .role(IceRole::Controlling)
        .build();
    tokio::spawn(controlling_runner);
    let (controlled, controlled_runner) = IceTransportBuilder::new(controlled_cfg)
        .role(IceRole::Controlled)
        .build();
    tokio::spawn(controlled_runner);

    // Let both sides gather before snapshotting their candidates.
    tokio::time::sleep(Duration::from_millis(500)).await;

    let controlling_locals = controlling.local_candidates();
    let controlled_locals = controlled.local_candidates();
    assert!(
        !controlling_locals.is_empty(),
        "Controlling should have local candidates"
    );
    assert!(
        !controlled_locals.is_empty(),
        "Controlled should have local candidates"
    );
    assert!(
        controlling_locals.iter().any(|c| c.transport == "tcp"),
        "Controlling should have TCP candidates"
    );
    assert!(
        controlled_locals.iter().any(|c| c.transport == "tcp"),
        "Controlled should have TCP candidates"
    );

    // Hand each side the other's snapshot.
    for candidate in &controlling_locals {
        controlled.add_remote_candidate(candidate.clone());
    }
    for candidate in &controlled_locals {
        controlling.add_remote_candidate(candidate.clone());
    }

    // Forward any candidate announced after the snapshot to the opposite side.
    let to_controlling = controlling.clone();
    let to_controlled = controlled.clone();
    let mut from_controlling = controlling.subscribe_candidates();
    let mut from_controlled = controlled.subscribe_candidates();
    tokio::spawn(async move {
        while let Ok(candidate) = from_controlling.recv().await {
            to_controlled.add_remote_candidate(candidate);
        }
    });
    tokio::spawn(async move {
        while let Ok(candidate) = from_controlled.recv().await {
            to_controlling.add_remote_candidate(candidate);
        }
    });

    // Both parameter sets are read before either side is started.
    let controlled_params = controlled.local_parameters();
    let controlling_params = controlling.local_parameters();
    controlling
        .start(controlled_params)
        .expect("controlling.start");
    controlled
        .start(controlling_params)
        .expect("controlled.start");

    let controlling_state = controlling.subscribe_state();
    let controlled_state = controlled.subscribe_state();
    let controlling_connected =
        wait_ice_connected(controlling_state, Duration::from_secs(15)).await;
    let controlled_connected =
        wait_ice_connected(controlled_state, Duration::from_secs(15)).await;
    assert!(
        controlling_connected,
        "Controlling side should connect over TCP"
    );
    assert!(
        controlled_connected,
        "Controlled side should connect over TCP"
    );

    let pair = controlling
        .get_selected_pair()
        .await
        .expect("Should have a selected pair");
    assert_eq!(
        pair.local.transport, "tcp",
        "Selected pair should use TCP transport, got {}",
        pair.local.transport
    );

    let wrapper = controlling
        .get_selected_socket()
        .await
        .expect("Should have a selected socket");
    assert!(
        matches!(wrapper, IceSocketWrapper::TcpStream(_, _, _)),
        "Selected socket should be TcpStream"
    );

    // Only the send is checked; nothing here reads the data on the peer.
    let test_data = b"test-data-over-tcp";
    wrapper
        .send_to(test_data, pair.remote.address)
        .await
        .unwrap();
    Ok(())
}
/// Connects two transports that gather no UDP host candidates and have ICE-TCP
/// enabled, checks that both select a TCP pair and a TCP stream socket, and sends one
/// message from each side over its selected socket.
///
/// The controlled side is restricted to TCP ports 20011..=20020.
///
/// NOTE(review): only the success of each `send_to` is asserted; the test does not
/// verify that either message is received by the peer.
#[tokio::test]
#[serial]
async fn test_ice_tcp_data_flow_bidirectional() -> Result<()> {
    // Shared base configuration: UDP host gathering off, TCP policy on.
    let tcp_only = || {
        let mut cfg = RtcConfiguration::default();
        cfg.ice_gather_udp_hosts = false;
        cfg.ice_tcp_policy = crate::config::IceTcpPolicy::Enabled;
        cfg
    };
    let mut controlled_cfg = tcp_only();
    controlled_cfg.tcp_port_range_start = Some(20_011);
    controlled_cfg.tcp_port_range_end = Some(20_020);
    let controlling_cfg = tcp_only();

    let (controlling, controlling_runner) = IceTransportBuilder::new(controlling_cfg)
        .role(IceRole::Controlling)
        .build();
    tokio::spawn(controlling_runner);
    let (controlled, controlled_runner) = IceTransportBuilder::new(controlled_cfg)
        .role(IceRole::Controlled)
        .build();
    tokio::spawn(controlled_runner);

    // Let both sides gather before exchanging candidates.
    tokio::time::sleep(Duration::from_millis(500)).await;

    for candidate in controlling.local_candidates() {
        controlled.add_remote_candidate(candidate);
    }
    for candidate in controlled.local_candidates() {
        controlling.add_remote_candidate(candidate);
    }

    // Forward any candidate announced later to the opposite side.
    let to_controlling = controlling.clone();
    let to_controlled = controlled.clone();
    let mut from_controlling = controlling.subscribe_candidates();
    let mut from_controlled = controlled.subscribe_candidates();
    tokio::spawn(async move {
        while let Ok(candidate) = from_controlling.recv().await {
            to_controlled.add_remote_candidate(candidate);
        }
    });
    tokio::spawn(async move {
        while let Ok(candidate) = from_controlled.recv().await {
            to_controlling.add_remote_candidate(candidate);
        }
    });

    controlling
        .start(controlled.local_parameters())
        .expect("controlling.start");
    controlled
        .start(controlling.local_parameters())
        .expect("controlled.start");

    let controlling_connected =
        wait_ice_connected(controlling.subscribe_state(), Duration::from_secs(15)).await;
    let controlled_connected =
        wait_ice_connected(controlled.subscribe_state(), Duration::from_secs(15)).await;
    assert!(controlling_connected, "Controlling should connect over TCP");
    assert!(controlled_connected, "Controlled should connect over TCP");

    // Both sides must have settled on a TCP pair...
    let controlling_pair = controlling.get_selected_pair().await.unwrap();
    let controlled_pair = controlled.get_selected_pair().await.unwrap();
    assert_eq!(controlling_pair.local.transport, "tcp");
    assert_eq!(controlled_pair.local.transport, "tcp");

    // ...backed by a TCP stream socket.
    let controlling_socket = controlling.get_selected_socket().await.unwrap();
    let controlled_socket = controlled.get_selected_socket().await.unwrap();
    assert!(matches!(
        controlling_socket,
        IceSocketWrapper::TcpStream(_, _, _)
    ));
    assert!(matches!(
        controlled_socket,
        IceSocketWrapper::TcpStream(_, _, _)
    ));

    // One message in each direction.
    let from_controlling_msg = b"msg-from-controlling";
    controlling_socket
        .send_to(from_controlling_msg, controlling_pair.remote.address)
        .await
        .unwrap();
    let from_controlled_msg = b"msg-from-controlled";
    controlled_socket
        .send_to(from_controlled_msg, controlled_pair.remote.address)
        .await
        .unwrap();
    Ok(())
}