#![allow(unused_assignments)]
#![allow(unused_variables)]
use anyhow::Result;
use bytes::BytesMut;
use rtc::data_channel::RTCDataChannelState;
use rtc::peer_connection::configuration::RTCConfigurationBuilder;
use rtc::peer_connection::configuration::setting_engine::SettingEngine;
use rtc::peer_connection::event::{RTCDataChannelEvent, RTCPeerConnectionEvent};
use rtc::peer_connection::message::RTCMessage;
use rtc::peer_connection::state::RTCPeerConnectionState;
use rtc::peer_connection::transport::{
CandidateConfig, CandidateHostConfig, RTCDtlsRole, RTCDtlsTransportState, RTCIceCandidate,
RTCIceServer, RTCIceTransportState,
};
use rtc::peer_connection::{RTCPeerConnection, RTCPeerConnectionBuilder};
use rtc::sansio::Protocol;
use rtc::shared::{TaggedBytesMut, TransportContext, TransportProtocol};
use rtc::statistics::StatsSelector;
use rtc::statistics::report::RTCStatsReportEntry;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
const DEFAULT_TIMEOUT_DURATION: Duration = Duration::from_secs(30);
struct PeerRunner {
offer_pc: RTCPeerConnection,
answer_pc: RTCPeerConnection,
offer_socket: Arc<UdpSocket>,
answer_socket: Arc<UdpSocket>,
offer_local_addr: std::net::SocketAddr,
answer_local_addr: std::net::SocketAddr,
}
impl PeerRunner {
async fn new() -> Result<Self> {
let offer_socket = UdpSocket::bind("127.0.0.1:0").await?;
let offer_local_addr = offer_socket.local_addr()?;
let mut offer_setting_engine = SettingEngine::default();
offer_setting_engine.set_answering_dtls_role(RTCDtlsRole::Server)?;
let offer_config = RTCConfigurationBuilder::new()
.with_ice_servers(vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}])
.build();
let mut offer_pc = RTCPeerConnectionBuilder::new()
.with_configuration(offer_config)
.with_setting_engine(offer_setting_engine)
.build()?;
let offer_candidate = CandidateHostConfig {
base_config: CandidateConfig {
network: "udp".to_owned(),
address: offer_local_addr.ip().to_string(),
port: offer_local_addr.port(),
component: 1,
..Default::default()
},
..Default::default()
}
.new_candidate_host()?;
offer_pc.add_local_candidate(RTCIceCandidate::from(&offer_candidate).to_json()?)?;
let answer_socket = UdpSocket::bind("127.0.0.1:0").await?;
let answer_local_addr = answer_socket.local_addr()?;
let mut answer_setting_engine = SettingEngine::default();
answer_setting_engine.set_answering_dtls_role(RTCDtlsRole::Client)?;
let answer_config = RTCConfigurationBuilder::new()
.with_ice_servers(vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}])
.build();
let mut answer_pc = RTCPeerConnectionBuilder::new()
.with_configuration(answer_config)
.with_setting_engine(answer_setting_engine)
.build()?;
let answer_candidate = CandidateHostConfig {
base_config: CandidateConfig {
network: "udp".to_owned(),
address: answer_local_addr.ip().to_string(),
port: answer_local_addr.port(),
component: 1,
..Default::default()
},
..Default::default()
}
.new_candidate_host()?;
answer_pc.add_local_candidate(RTCIceCandidate::from(&answer_candidate).to_json()?)?;
Ok(Self {
offer_pc,
answer_pc,
offer_socket: Arc::new(offer_socket),
answer_socket: Arc::new(answer_socket),
offer_local_addr,
answer_local_addr,
})
}
}
#[tokio::test]
async fn test_data_channel_statistics_collection() -> Result<()> {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.is_test(true)
.try_init()
.ok();
log::info!("Starting data channel statistics test");
let mut runner = PeerRunner::new().await?;
let dc_label = "stats-test-channel";
runner.offer_pc.create_data_channel(dc_label, None)?;
log::info!("Created data channel: {}", dc_label);
let offer = runner.offer_pc.create_offer(None)?;
runner.offer_pc.set_local_description(offer.clone())?;
runner.answer_pc.set_remote_description(offer)?;
let answer = runner.answer_pc.create_answer(None)?;
runner.answer_pc.set_local_description(answer.clone())?;
runner.offer_pc.set_remote_description(answer)?;
let mut offer_connected = false;
let mut answer_connected = false;
let mut offer_dc_id = None;
let mut _answer_dc_id = None;
let messages_to_send = 5;
let mut offer_messages_sent = 0;
let mut answer_messages_received = 0;
let message_size = 100;
let mut offer_buf = vec![0u8; 2000];
let mut answer_buf = vec![0u8; 2000];
let start_time = Instant::now();
let test_timeout = Duration::from_secs(30);
while start_time.elapsed() < test_timeout {
while let Some(msg) = runner.offer_pc.poll_write() {
runner
.offer_socket
.send_to(&msg.message, msg.transport.peer_addr)
.await?;
}
while let Some(event) = runner.offer_pc.poll_event() {
match event {
RTCPeerConnectionEvent::OnConnectionStateChangeEvent(state) => {
if state == RTCPeerConnectionState::Connected {
offer_connected = true;
}
}
RTCPeerConnectionEvent::OnDataChannel(RTCDataChannelEvent::OnOpen(channel_id)) => {
offer_dc_id = Some(channel_id);
}
_ => {}
}
}
while let Some(message) = runner.offer_pc.poll_read() {
if let RTCMessage::DataChannelMessage(_, _) = message {
}
}
while let Some(msg) = runner.answer_pc.poll_write() {
runner
.answer_socket
.send_to(&msg.message, msg.transport.peer_addr)
.await?;
}
while let Some(event) = runner.answer_pc.poll_event() {
match event {
RTCPeerConnectionEvent::OnConnectionStateChangeEvent(state) => {
if state == RTCPeerConnectionState::Connected {
answer_connected = true;
}
}
RTCPeerConnectionEvent::OnDataChannel(RTCDataChannelEvent::OnOpen(channel_id)) => {
_answer_dc_id = Some(channel_id);
}
_ => {}
}
}
while let Some(message) = runner.answer_pc.poll_read() {
if let RTCMessage::DataChannelMessage(_, _) = message {
answer_messages_received += 1;
}
}
if offer_connected && offer_dc_id.is_some() && offer_messages_sent < messages_to_send {
if let Some(mut dc) = runner.offer_pc.data_channel(offer_dc_id.unwrap()) {
let msg = "x".repeat(message_size);
dc.send_text(msg)?;
offer_messages_sent += 1;
}
}
if answer_messages_received >= messages_to_send {
log::info!("All messages received, checking stats");
break;
}
let offer_timeout = runner
.offer_pc
.poll_timeout()
.unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION);
let answer_timeout = runner
.answer_pc
.poll_timeout()
.unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION);
let next_timeout = offer_timeout.min(answer_timeout);
let delay = next_timeout
.saturating_duration_since(Instant::now())
.min(Duration::from_millis(10));
if delay.is_zero() {
runner.offer_pc.handle_timeout(Instant::now()).ok();
runner.answer_pc.handle_timeout(Instant::now()).ok();
continue;
}
let sleep = tokio::time::sleep(delay);
tokio::pin!(sleep);
tokio::select! {
_ = sleep => {
runner.offer_pc.handle_timeout(Instant::now()).ok();
runner.answer_pc.handle_timeout(Instant::now()).ok();
}
Ok((n, peer_addr)) = runner.offer_socket.recv_from(&mut offer_buf) => {
runner.offer_pc.handle_read(TaggedBytesMut {
now: Instant::now(),
transport: TransportContext {
local_addr: runner.offer_local_addr,
peer_addr,
ecn: None,
transport_protocol: TransportProtocol::UDP,
},
message: BytesMut::from(&offer_buf[..n]),
}).ok();
}
Ok((n, peer_addr)) = runner.answer_socket.recv_from(&mut answer_buf) => {
runner.answer_pc.handle_read(TaggedBytesMut {
now: Instant::now(),
transport: TransportContext {
local_addr: runner.answer_local_addr,
peer_addr,
ecn: None,
transport_protocol: TransportProtocol::UDP,
},
message: BytesMut::from(&answer_buf[..n]),
}).ok();
}
}
}
let now = Instant::now();
let offer_stats = runner.offer_pc.get_stats(now, StatsSelector::None);
let answer_stats = runner.answer_pc.get_stats(now, StatsSelector::None);
log::info!("Offer peer stats report has {} entries", offer_stats.len());
for entry in offer_stats.iter() {
log::info!(" - {:?}: {}", entry.stats_type(), entry.id());
}
assert!(
offer_stats.peer_connection().is_some(),
"Offer should have peer connection stats"
);
let pc_stats = offer_stats.peer_connection().unwrap();
assert!(
pc_stats.data_channels_opened >= 1,
"Should have at least 1 data channel opened"
);
let dc_stats: Vec<_> = offer_stats.data_channels().collect();
assert!(!dc_stats.is_empty(), "Offer should have data channel stats");
let dc = dc_stats[0];
assert_eq!(dc.label, dc_label, "Data channel label should match");
assert_eq!(
dc.state,
RTCDataChannelState::Open,
"Data channel should be open"
);
assert_eq!(
dc.messages_sent, messages_to_send as u32,
"Messages sent count should match"
);
assert!(
dc.bytes_sent >= (messages_to_send * message_size) as u64,
"Bytes sent should be at least {} but was {}",
messages_to_send * message_size,
dc.bytes_sent
);
let answer_dc_stats: Vec<_> = answer_stats.data_channels().collect();
assert!(
!answer_dc_stats.is_empty(),
"Answer should have data channel stats"
);
let answer_dc = answer_dc_stats[0];
assert_eq!(
answer_dc.messages_received, messages_to_send as u32,
"Messages received count should match"
);
assert!(
answer_dc.bytes_received >= (messages_to_send * message_size) as u64,
"Bytes received should be at least {} but was {}",
messages_to_send * message_size,
answer_dc.bytes_received
);
runner.offer_pc.close()?;
runner.answer_pc.close()?;
log::info!("Data channel statistics test passed!");
Ok(())
}
#[tokio::test]
async fn test_transport_statistics_collection() -> Result<()> {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.is_test(true)
.try_init()
.ok();
log::info!("Starting transport statistics test");
let mut runner = PeerRunner::new().await?;
runner
.offer_pc
.create_data_channel("transport-test", None)?;
let offer = runner.offer_pc.create_offer(None)?;
runner.offer_pc.set_local_description(offer.clone())?;
runner.answer_pc.set_remote_description(offer)?;
let answer = runner.answer_pc.create_answer(None)?;
runner.answer_pc.set_local_description(answer.clone())?;
runner.offer_pc.set_remote_description(answer)?;
let mut offer_connected = false;
let mut answer_connected = false;
let mut offer_buf = vec![0u8; 2000];
let mut answer_buf = vec![0u8; 2000];
let start_time = Instant::now();
let test_timeout = Duration::from_secs(30);
while start_time.elapsed() < test_timeout && (!offer_connected || !answer_connected) {
while let Some(msg) = runner.offer_pc.poll_write() {
runner
.offer_socket
.send_to(&msg.message, msg.transport.peer_addr)
.await?;
}
while let Some(msg) = runner.answer_pc.poll_write() {
runner
.answer_socket
.send_to(&msg.message, msg.transport.peer_addr)
.await?;
}
while let Some(event) = runner.offer_pc.poll_event() {
if let RTCPeerConnectionEvent::OnConnectionStateChangeEvent(
RTCPeerConnectionState::Connected,
) = event
{
offer_connected = true;
}
}
while let Some(event) = runner.answer_pc.poll_event() {
if let RTCPeerConnectionEvent::OnConnectionStateChangeEvent(
RTCPeerConnectionState::Connected,
) = event
{
answer_connected = true;
}
}
while runner.offer_pc.poll_read().is_some() {}
while runner.answer_pc.poll_read().is_some() {}
let offer_timeout = runner
.offer_pc
.poll_timeout()
.unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION);
let answer_timeout = runner
.answer_pc
.poll_timeout()
.unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION);
let next_timeout = offer_timeout.min(answer_timeout);
let delay = next_timeout
.saturating_duration_since(Instant::now())
.min(Duration::from_millis(10));
if delay.is_zero() {
runner.offer_pc.handle_timeout(Instant::now()).ok();
runner.answer_pc.handle_timeout(Instant::now()).ok();
continue;
}
let sleep = tokio::time::sleep(delay);
tokio::pin!(sleep);
tokio::select! {
_ = sleep => {
runner.offer_pc.handle_timeout(Instant::now()).ok();
runner.answer_pc.handle_timeout(Instant::now()).ok();
}
Ok((n, peer_addr)) = runner.offer_socket.recv_from(&mut offer_buf) => {
runner.offer_pc.handle_read(TaggedBytesMut {
now: Instant::now(),
transport: TransportContext {
local_addr: runner.offer_local_addr,
peer_addr,
ecn: None,
transport_protocol: TransportProtocol::UDP,
},
message: BytesMut::from(&offer_buf[..n]),
}).ok();
}
Ok((n, peer_addr)) = runner.answer_socket.recv_from(&mut answer_buf) => {
runner.answer_pc.handle_read(TaggedBytesMut {
now: Instant::now(),
transport: TransportContext {
local_addr: runner.answer_local_addr,
peer_addr,
ecn: None,
transport_protocol: TransportProtocol::UDP,
},
message: BytesMut::from(&answer_buf[..n]),
}).ok();
}
}
}
assert!(offer_connected, "Offer peer should be connected");
assert!(answer_connected, "Answer peer should be connected");
let now = Instant::now();
let offer_stats = runner.offer_pc.get_stats(now, StatsSelector::None);
log::info!("Transport stats report has {} entries", offer_stats.len());
let transport = offer_stats.transport();
assert!(transport.is_some(), "Should have transport stats");
let transport = transport.unwrap();
log::info!("Transport stats:");
log::info!(" - ICE state: {:?}", transport.ice_state);
log::info!(" - DTLS state: {:?}", transport.dtls_state);
log::info!(" - Packets sent: {}", transport.packets_sent);
log::info!(" - Packets received: {}", transport.packets_received);
log::info!(" - Bytes sent: {}", transport.bytes_sent);
log::info!(" - Bytes received: {}", transport.bytes_received);
assert_eq!(
transport.ice_state,
RTCIceTransportState::Connected,
"ICE should be connected"
);
assert_eq!(
transport.dtls_state,
RTCDtlsTransportState::Connected,
"DTLS should be connected"
);
assert!(
transport.packets_sent > 0,
"Should have sent packets during connection"
);
assert!(
transport.packets_received > 0,
"Should have received packets during connection"
);
runner.offer_pc.close()?;
runner.answer_pc.close()?;
log::info!("Transport statistics test passed!");
Ok(())
}
#[tokio::test]
async fn test_stats_report_completeness() -> Result<()> {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.is_test(true)
.try_init()
.ok();
log::info!("Starting stats report completeness test");
let mut runner = PeerRunner::new().await?;
runner
.offer_pc
.create_data_channel("completeness-test", None)?;
let offer = runner.offer_pc.create_offer(None)?;
runner.offer_pc.set_local_description(offer.clone())?;
runner.answer_pc.set_remote_description(offer)?;
let answer = runner.answer_pc.create_answer(None)?;
runner.answer_pc.set_local_description(answer.clone())?;
runner.offer_pc.set_remote_description(answer)?;
let mut connected = false;
let mut dc_open = false;
let mut offer_buf = vec![0u8; 2000];
let mut answer_buf = vec![0u8; 2000];
let start_time = Instant::now();
let test_timeout = Duration::from_secs(30);
while start_time.elapsed() < test_timeout && !(connected && dc_open) {
while let Some(msg) = runner.offer_pc.poll_write() {
runner
.offer_socket
.send_to(&msg.message, msg.transport.peer_addr)
.await?;
}
while let Some(msg) = runner.answer_pc.poll_write() {
runner
.answer_socket
.send_to(&msg.message, msg.transport.peer_addr)
.await?;
}
while let Some(event) = runner.offer_pc.poll_event() {
match event {
RTCPeerConnectionEvent::OnConnectionStateChangeEvent(
RTCPeerConnectionState::Connected,
) => {
connected = true;
}
RTCPeerConnectionEvent::OnDataChannel(RTCDataChannelEvent::OnOpen(_)) => {
dc_open = true;
}
_ => {}
}
}
while runner.answer_pc.poll_event().is_some() {}
while runner.offer_pc.poll_read().is_some() {}
while runner.answer_pc.poll_read().is_some() {}
let offer_timeout = runner
.offer_pc
.poll_timeout()
.unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION);
let answer_timeout = runner
.answer_pc
.poll_timeout()
.unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION);
let next_timeout = offer_timeout.min(answer_timeout);
let delay = next_timeout
.saturating_duration_since(Instant::now())
.min(Duration::from_millis(10));
if delay.is_zero() {
runner.offer_pc.handle_timeout(Instant::now()).ok();
runner.answer_pc.handle_timeout(Instant::now()).ok();
continue;
}
let sleep = tokio::time::sleep(delay);
tokio::pin!(sleep);
tokio::select! {
_ = sleep => {
runner.offer_pc.handle_timeout(Instant::now()).ok();
runner.answer_pc.handle_timeout(Instant::now()).ok();
}
Ok((n, peer_addr)) = runner.offer_socket.recv_from(&mut offer_buf) => {
runner.offer_pc.handle_read(TaggedBytesMut {
now: Instant::now(),
transport: TransportContext {
local_addr: runner.offer_local_addr,
peer_addr,
ecn: None,
transport_protocol: TransportProtocol::UDP,
},
message: BytesMut::from(&offer_buf[..n]),
}).ok();
}
Ok((n, peer_addr)) = runner.answer_socket.recv_from(&mut answer_buf) => {
runner.answer_pc.handle_read(TaggedBytesMut {
now: Instant::now(),
transport: TransportContext {
local_addr: runner.answer_local_addr,
peer_addr,
ecn: None,
transport_protocol: TransportProtocol::UDP,
},
message: BytesMut::from(&answer_buf[..n]),
}).ok();
}
}
}
assert!(connected, "Should be connected");
assert!(dc_open, "Data channel should be open");
let now = Instant::now();
let stats = runner.offer_pc.get_stats(now, StatsSelector::None);
log::info!("Stats report completeness check:");
log::info!(" Total entries: {}", stats.len());
let mut has_peer_connection = false;
let mut has_transport = false;
let mut has_candidate_pair = false;
let mut has_data_channel = false;
for entry in stats.iter() {
log::info!(" - {:?}: {}", entry.stats_type(), entry.id());
assert!(!entry.id().is_empty(), "Stats entry should have an ID");
match entry {
RTCStatsReportEntry::PeerConnection(pc) => {
has_peer_connection = true;
assert_eq!(pc.stats.id, "RTCPeerConnection");
}
RTCStatsReportEntry::Transport(_) => {
has_transport = true;
}
RTCStatsReportEntry::IceCandidatePair(_) => {
has_candidate_pair = true;
}
RTCStatsReportEntry::DataChannel(dc) => {
has_data_channel = true;
assert!(!dc.label.is_empty(), "Data channel should have a label");
}
_ => {}
}
}
assert!(has_peer_connection, "Should have peer connection stats");
assert!(has_transport, "Should have transport stats");
assert!(
has_candidate_pair,
"Should have ICE candidate pair stats after connection"
);
assert!(has_data_channel, "Should have data channel stats");
assert!(
stats.len() >= 4,
"Should have at least 4 stats entries, got {}",
stats.len()
);
runner.offer_pc.close()?;
runner.answer_pc.close()?;
log::info!("Stats report completeness test passed!");
Ok(())
}
#[tokio::test]
async fn test_stats_json_serialization() -> Result<()> {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.is_test(true)
.try_init()
.ok();
log::info!("Starting stats JSON serialization test");
let mut runner = PeerRunner::new().await?;
runner.offer_pc.create_data_channel("json-test", None)?;
let offer = runner.offer_pc.create_offer(None)?;
runner.offer_pc.set_local_description(offer.clone())?;
runner.answer_pc.set_remote_description(offer)?;
let answer = runner.answer_pc.create_answer(None)?;
runner.answer_pc.set_local_description(answer.clone())?;
runner.offer_pc.set_remote_description(answer)?;
let mut offer_buf = vec![0u8; 2000];
let mut answer_buf = vec![0u8; 2000];
let mut connected = false;
let start_time = Instant::now();
let test_timeout = Duration::from_secs(30);
while start_time.elapsed() < test_timeout && !connected {
while let Some(msg) = runner.offer_pc.poll_write() {
runner
.offer_socket
.send_to(&msg.message, msg.transport.peer_addr)
.await?;
}
while let Some(msg) = runner.answer_pc.poll_write() {
runner
.answer_socket
.send_to(&msg.message, msg.transport.peer_addr)
.await?;
}
while let Some(event) = runner.offer_pc.poll_event() {
if let RTCPeerConnectionEvent::OnConnectionStateChangeEvent(
RTCPeerConnectionState::Connected,
) = event
{
connected = true;
}
}
while runner.answer_pc.poll_event().is_some() {}
while runner.offer_pc.poll_read().is_some() {}
while runner.answer_pc.poll_read().is_some() {}
let next_timeout = runner
.offer_pc
.poll_timeout()
.unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION)
.min(
runner
.answer_pc
.poll_timeout()
.unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION),
);
let delay = next_timeout
.saturating_duration_since(Instant::now())
.min(Duration::from_millis(10));
if delay.is_zero() {
runner.offer_pc.handle_timeout(Instant::now()).ok();
runner.answer_pc.handle_timeout(Instant::now()).ok();
continue;
}
let sleep = tokio::time::sleep(delay);
tokio::pin!(sleep);
tokio::select! {
_ = sleep => {
runner.offer_pc.handle_timeout(Instant::now()).ok();
runner.answer_pc.handle_timeout(Instant::now()).ok();
}
Ok((n, peer_addr)) = runner.offer_socket.recv_from(&mut offer_buf) => {
runner.offer_pc.handle_read(TaggedBytesMut {
now: Instant::now(),
transport: TransportContext {
local_addr: runner.offer_local_addr,
peer_addr,
ecn: None,
transport_protocol: TransportProtocol::UDP,
},
message: BytesMut::from(&offer_buf[..n]),
}).ok();
}
Ok((n, peer_addr)) = runner.answer_socket.recv_from(&mut answer_buf) => {
runner.answer_pc.handle_read(TaggedBytesMut {
now: Instant::now(),
transport: TransportContext {
local_addr: runner.answer_local_addr,
peer_addr,
ecn: None,
transport_protocol: TransportProtocol::UDP,
},
message: BytesMut::from(&answer_buf[..n]),
}).ok();
}
}
}
let now = Instant::now();
let stats = runner.offer_pc.get_stats(now, StatsSelector::None);
for entry in stats.iter() {
let json_result = match entry {
RTCStatsReportEntry::PeerConnection(s) => serde_json::to_string(s),
RTCStatsReportEntry::Transport(s) => serde_json::to_string(s),
RTCStatsReportEntry::IceCandidatePair(s) => serde_json::to_string(s),
RTCStatsReportEntry::LocalCandidate(s) => serde_json::to_string(s),
RTCStatsReportEntry::RemoteCandidate(s) => serde_json::to_string(s),
RTCStatsReportEntry::Certificate(s) => serde_json::to_string(s),
RTCStatsReportEntry::Codec(s) => serde_json::to_string(s),
RTCStatsReportEntry::DataChannel(s) => serde_json::to_string(s),
RTCStatsReportEntry::InboundRtp(s) => serde_json::to_string(s),
RTCStatsReportEntry::OutboundRtp(s) => serde_json::to_string(s),
RTCStatsReportEntry::RemoteInboundRtp(s) => serde_json::to_string(s),
RTCStatsReportEntry::RemoteOutboundRtp(s) => serde_json::to_string(s),
RTCStatsReportEntry::AudioSource(s) => serde_json::to_string(s),
RTCStatsReportEntry::VideoSource(s) => serde_json::to_string(s),
RTCStatsReportEntry::AudioPlayout(s) => serde_json::to_string(s),
};
assert!(
json_result.is_ok(),
"Failed to serialize {:?}: {:?}",
entry.stats_type(),
json_result.err()
);
let json = json_result.unwrap();
log::info!("{}: {}", entry.id(), json);
assert!(
json.contains("\"type\""),
"JSON should contain 'type' field: {}",
json
);
assert!(
json.contains("\"timestamp\""),
"JSON should contain 'timestamp' field: {}",
json
);
assert!(
json.contains("\"id\""),
"JSON should contain 'id' field: {}",
json
);
}
runner.offer_pc.close()?;
runner.answer_pc.close()?;
log::info!("Stats JSON serialization test passed!");
Ok(())
}
#[tokio::test]
async fn test_get_stats_selector_none_complete() -> Result<()> {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.is_test(true)
.try_init()
.ok();
log::info!("Starting get_stats selector None test");
let mut runner = PeerRunner::new().await?;
runner.offer_pc.create_data_channel("selector-test", None)?;
let offer = runner.offer_pc.create_offer(None)?;
runner.offer_pc.set_local_description(offer.clone())?;
runner.answer_pc.set_remote_description(offer)?;
let answer = runner.answer_pc.create_answer(None)?;
runner.answer_pc.set_local_description(answer.clone())?;
runner.offer_pc.set_remote_description(answer)?;
let mut connected = false;
let mut dc_open = false;
let mut offer_buf = vec![0u8; 2000];
let mut answer_buf = vec![0u8; 2000];
let start_time = Instant::now();
let test_timeout = Duration::from_secs(30);
while start_time.elapsed() < test_timeout && !(connected && dc_open) {
while let Some(msg) = runner.offer_pc.poll_write() {
runner
.offer_socket
.send_to(&msg.message, msg.transport.peer_addr)
.await?;
}
while let Some(msg) = runner.answer_pc.poll_write() {
runner
.answer_socket
.send_to(&msg.message, msg.transport.peer_addr)
.await?;
}
while let Some(event) = runner.offer_pc.poll_event() {
match event {
RTCPeerConnectionEvent::OnConnectionStateChangeEvent(
RTCPeerConnectionState::Connected,
) => {
connected = true;
}
RTCPeerConnectionEvent::OnDataChannel(RTCDataChannelEvent::OnOpen(_)) => {
dc_open = true;
}
_ => {}
}
}
while runner.answer_pc.poll_event().is_some() {}
while runner.offer_pc.poll_read().is_some() {}
while runner.answer_pc.poll_read().is_some() {}
let offer_timeout = runner
.offer_pc
.poll_timeout()
.unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION);
let answer_timeout = runner
.answer_pc
.poll_timeout()
.unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION);
let next_timeout = offer_timeout.min(answer_timeout);
let delay = next_timeout
.saturating_duration_since(Instant::now())
.min(Duration::from_millis(10));
if delay.is_zero() {
runner.offer_pc.handle_timeout(Instant::now()).ok();
runner.answer_pc.handle_timeout(Instant::now()).ok();
continue;
}
let sleep = tokio::time::sleep(delay);
tokio::pin!(sleep);
tokio::select! {
_ = sleep => {
runner.offer_pc.handle_timeout(Instant::now()).ok();
runner.answer_pc.handle_timeout(Instant::now()).ok();
}
Ok((n, peer_addr)) = runner.offer_socket.recv_from(&mut offer_buf) => {
runner.offer_pc.handle_read(TaggedBytesMut {
now: Instant::now(),
transport: TransportContext {
local_addr: runner.offer_local_addr,
peer_addr,
ecn: None,
transport_protocol: TransportProtocol::UDP,
},
message: BytesMut::from(&offer_buf[..n]),
}).ok();
}
Ok((n, peer_addr)) = runner.answer_socket.recv_from(&mut answer_buf) => {
runner.answer_pc.handle_read(TaggedBytesMut {
now: Instant::now(),
transport: TransportContext {
local_addr: runner.answer_local_addr,
peer_addr,
ecn: None,
transport_protocol: TransportProtocol::UDP,
},
message: BytesMut::from(&answer_buf[..n]),
}).ok();
}
}
}
assert!(connected, "Should be connected");
let now = Instant::now();
let report = runner.offer_pc.get_stats(now, StatsSelector::None);
assert!(
report.peer_connection().is_some(),
"Should have peer connection stats"
);
assert!(report.transport().is_some(), "Should have transport stats");
log::info!("Stats with None selector:");
log::info!(" Total entries: {}", report.len());
log::info!(" Data channels: {}", report.data_channels().count());
log::info!(" Candidate pairs: {}", report.candidate_pairs().count());
assert!(
report.data_channels().count() > 0,
"Should have data channel stats with None selector"
);
runner.offer_pc.close()?;
runner.answer_pc.close()?;
log::info!("get_stats selector None test passed!");
Ok(())
}
#[tokio::test]
async fn test_get_stats_selector_sender_filtering() -> Result<()> {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.is_test(true)
.try_init()
.ok();
log::info!("Starting get_stats selector Sender test");
let mut runner = PeerRunner::new().await?;
runner
.offer_pc
.create_data_channel("sender-filter-test", None)?;
let offer = runner.offer_pc.create_offer(None)?;
runner.offer_pc.set_local_description(offer.clone())?;
runner.answer_pc.set_remote_description(offer)?;
let answer = runner.answer_pc.create_answer(None)?;
runner.answer_pc.set_local_description(answer.clone())?;
runner.offer_pc.set_remote_description(answer)?;
let mut connected = false;
let mut offer_buf = vec![0u8; 2000];
let mut answer_buf = vec![0u8; 2000];
let start_time = Instant::now();
let test_timeout = Duration::from_secs(30);
while start_time.elapsed() < test_timeout && !connected {
while let Some(msg) = runner.offer_pc.poll_write() {
runner
.offer_socket
.send_to(&msg.message, msg.transport.peer_addr)
.await?;
}
while let Some(msg) = runner.answer_pc.poll_write() {
runner
.answer_socket
.send_to(&msg.message, msg.transport.peer_addr)
.await?;
}
while let Some(event) = runner.offer_pc.poll_event() {
if let RTCPeerConnectionEvent::OnConnectionStateChangeEvent(
RTCPeerConnectionState::Connected,
) = event
{
connected = true;
}
}
while runner.answer_pc.poll_event().is_some() {}
while runner.offer_pc.poll_read().is_some() {}
while runner.answer_pc.poll_read().is_some() {}
let offer_timeout = runner
.offer_pc
.poll_timeout()
.unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION);
let answer_timeout = runner
.answer_pc
.poll_timeout()
.unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION);
let next_timeout = offer_timeout.min(answer_timeout);
let delay = next_timeout
.saturating_duration_since(Instant::now())
.min(Duration::from_millis(10));
if delay.is_zero() {
runner.offer_pc.handle_timeout(Instant::now()).ok();
runner.answer_pc.handle_timeout(Instant::now()).ok();
continue;
}
let sleep = tokio::time::sleep(delay);
tokio::pin!(sleep);
tokio::select! {
_ = sleep => {
runner.offer_pc.handle_timeout(Instant::now()).ok();
runner.answer_pc.handle_timeout(Instant::now()).ok();
}
Ok((n, peer_addr)) = runner.offer_socket.recv_from(&mut offer_buf) => {
runner.offer_pc.handle_read(TaggedBytesMut {
now: Instant::now(),
transport: TransportContext {
local_addr: runner.offer_local_addr,
peer_addr,
ecn: None,
transport_protocol: TransportProtocol::UDP,
},
message: BytesMut::from(&offer_buf[..n]),
}).ok();
}
Ok((n, peer_addr)) = runner.answer_socket.recv_from(&mut answer_buf) => {
runner.answer_pc.handle_read(TaggedBytesMut {
now: Instant::now(),
transport: TransportContext {
local_addr: runner.answer_local_addr,
peer_addr,
ecn: None,
transport_protocol: TransportProtocol::UDP,
},
message: BytesMut::from(&answer_buf[..n]),
}).ok();
}
}
}
assert!(connected, "Should be connected");
let now = Instant::now();
let sender_id = runner.offer_pc.get_senders().next();
let report = if let Some(id) = sender_id {
runner.offer_pc.get_stats(now, StatsSelector::Sender(id))
} else {
log::info!("No senders available, testing empty sender filter behavior");
runner.offer_pc.get_stats(now, StatsSelector::None)
};
if sender_id.is_none() {
runner.offer_pc.close()?;
runner.answer_pc.close()?;
log::info!("get_stats selector Sender test passed (no senders to test)!");
return Ok(());
}
let report = runner
.offer_pc
.get_stats(now, StatsSelector::Sender(sender_id.unwrap()));
assert!(
report.peer_connection().is_none(),
"Sender selector should not include peer connection stats"
);
assert_eq!(
report.data_channels().count(),
0,
"Sender selector should not include data channel stats"
);
assert_eq!(
report.inbound_rtp_streams().count(),
0,
"Sender selector should not include inbound RTP streams"
);
log::info!("Stats with Sender(0) selector:");
log::info!(" Total entries: {}", report.len());
log::info!(
" Outbound RTP streams: {}",
report.outbound_rtp_streams().count()
);
if report.outbound_rtp_streams().count() > 0 {
assert!(
report.transport().is_some(),
"Sender with streams should include transport stats"
);
}
runner.offer_pc.close()?;
runner.answer_pc.close()?;
log::info!("get_stats selector Sender test passed!");
Ok(())
}
#[tokio::test]
async fn test_get_stats_selector_receiver_filtering() -> Result<()> {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.is_test(true)
.try_init()
.ok();
log::info!("Starting get_stats selector Receiver test");
let mut runner = PeerRunner::new().await?;
runner
.offer_pc
.create_data_channel("receiver-filter-test", None)?;
let offer = runner.offer_pc.create_offer(None)?;
runner.offer_pc.set_local_description(offer.clone())?;
runner.answer_pc.set_remote_description(offer)?;
let answer = runner.answer_pc.create_answer(None)?;
runner.answer_pc.set_local_description(answer.clone())?;
runner.offer_pc.set_remote_description(answer)?;
let mut connected = false;
let mut offer_buf = vec![0u8; 2000];
let mut answer_buf = vec![0u8; 2000];
let start_time = Instant::now();
let test_timeout = Duration::from_secs(30);
while start_time.elapsed() < test_timeout && !connected {
while let Some(msg) = runner.offer_pc.poll_write() {
runner
.offer_socket
.send_to(&msg.message, msg.transport.peer_addr)
.await?;
}
while let Some(msg) = runner.answer_pc.poll_write() {
runner
.answer_socket
.send_to(&msg.message, msg.transport.peer_addr)
.await?;
}
while let Some(event) = runner.offer_pc.poll_event() {
if let RTCPeerConnectionEvent::OnConnectionStateChangeEvent(
RTCPeerConnectionState::Connected,
) = event
{
connected = true;
}
}
while runner.answer_pc.poll_event().is_some() {}
while runner.offer_pc.poll_read().is_some() {}
while runner.answer_pc.poll_read().is_some() {}
let offer_timeout = runner
.offer_pc
.poll_timeout()
.unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION);
let answer_timeout = runner
.answer_pc
.poll_timeout()
.unwrap_or(Instant::now() + DEFAULT_TIMEOUT_DURATION);
let next_timeout = offer_timeout.min(answer_timeout);
let delay = next_timeout
.saturating_duration_since(Instant::now())
.min(Duration::from_millis(10));
if delay.is_zero() {
runner.offer_pc.handle_timeout(Instant::now()).ok();
runner.answer_pc.handle_timeout(Instant::now()).ok();
continue;
}
let sleep = tokio::time::sleep(delay);
tokio::pin!(sleep);
tokio::select! {
_ = sleep => {
runner.offer_pc.handle_timeout(Instant::now()).ok();
runner.answer_pc.handle_timeout(Instant::now()).ok();
}
Ok((n, peer_addr)) = runner.offer_socket.recv_from(&mut offer_buf) => {
runner.offer_pc.handle_read(TaggedBytesMut {
now: Instant::now(),
transport: TransportContext {
local_addr: runner.offer_local_addr,
peer_addr,
ecn: None,
transport_protocol: TransportProtocol::UDP,
},
message: BytesMut::from(&offer_buf[..n]),
}).ok();
}
Ok((n, peer_addr)) = runner.answer_socket.recv_from(&mut answer_buf) => {
runner.answer_pc.handle_read(TaggedBytesMut {
now: Instant::now(),
transport: TransportContext {
local_addr: runner.answer_local_addr,
peer_addr,
ecn: None,
transport_protocol: TransportProtocol::UDP,
},
message: BytesMut::from(&answer_buf[..n]),
}).ok();
}
}
}
assert!(connected, "Should be connected");
let now = Instant::now();
let receiver_id = runner.offer_pc.get_receivers().next();
if receiver_id.is_none() {
runner.offer_pc.close()?;
runner.answer_pc.close()?;
log::info!("get_stats selector Receiver test passed (no receivers to test)!");
return Ok(());
}
let report = runner
.offer_pc
.get_stats(now, StatsSelector::Receiver(receiver_id.unwrap()));
assert!(
report.peer_connection().is_none(),
"Receiver selector should not include peer connection stats"
);
assert_eq!(
report.data_channels().count(),
0,
"Receiver selector should not include data channel stats"
);
assert_eq!(
report.outbound_rtp_streams().count(),
0,
"Receiver selector should not include outbound RTP streams"
);
log::info!("Stats with Receiver(0) selector:");
log::info!(" Total entries: {}", report.len());
log::info!(
" Inbound RTP streams: {}",
report.inbound_rtp_streams().count()
);
if report.inbound_rtp_streams().count() > 0 {
assert!(
report.transport().is_some(),
"Receiver with streams should include transport stats"
);
}
runner.offer_pc.close()?;
runner.answer_pc.close()?;
log::info!("get_stats selector Receiver test passed!");
Ok(())
}
#[tokio::test]
async fn test_get_stats_selector_filtering_behavior() -> Result<()> {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.is_test(true)
.try_init()
.ok();
log::info!("Starting get_stats selector filtering behavior test");
let mut runner = PeerRunner::new().await?;
runner
.offer_pc
.create_data_channel("filter-behavior-test", None)?;
let offer = runner.offer_pc.create_offer(None)?;
runner.offer_pc.set_local_description(offer.clone())?;
runner.answer_pc.set_remote_description(offer)?;
let answer = runner.answer_pc.create_answer(None)?;
runner.answer_pc.set_local_description(answer.clone())?;
runner.offer_pc.set_remote_description(answer)?;
let now = Instant::now();
let complete_report = runner.offer_pc.get_stats(now, StatsSelector::None);
assert!(
complete_report.peer_connection().is_some(),
"None selector should include peer connection stats"
);
let senders: Vec<_> = runner.offer_pc.get_senders().collect();
log::info!("Found {} senders", senders.len());
for sender_id in &senders {
let sender_report = runner
.offer_pc
.get_stats(now, StatsSelector::Sender(*sender_id));
assert_eq!(
sender_report.data_channels().count(),
0,
"Sender filter should not include data channel stats"
);
assert!(
sender_report.peer_connection().is_none(),
"Sender filter should not include peer connection stats"
);
}
let receivers: Vec<_> = runner.offer_pc.get_receivers().collect();
log::info!("Found {} receivers", receivers.len());
for receiver_id in &receivers {
let receiver_report = runner
.offer_pc
.get_stats(now, StatsSelector::Receiver(*receiver_id));
assert_eq!(
receiver_report.data_channels().count(),
0,
"Receiver filter should not include data channel stats"
);
assert!(
receiver_report.peer_connection().is_none(),
"Receiver filter should not include peer connection stats"
);
}
runner.offer_pc.close()?;
runner.answer_pc.close()?;
log::info!("get_stats selector filtering behavior test passed!");
Ok(())
}
#[tokio::test]
async fn test_get_stats_selector_subset_property() -> Result<()> {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.is_test(true)
.try_init()
.ok();
log::info!("Starting get_stats selector subset test");
let mut runner = PeerRunner::new().await?;
runner.offer_pc.create_data_channel("subset-test", None)?;
let offer = runner.offer_pc.create_offer(None)?;
runner.offer_pc.set_local_description(offer.clone())?;
runner.answer_pc.set_remote_description(offer)?;
let answer = runner.answer_pc.create_answer(None)?;
runner.answer_pc.set_local_description(answer.clone())?;
runner.offer_pc.set_remote_description(answer)?;
let now = Instant::now();
let complete_report = runner.offer_pc.get_stats(now, StatsSelector::None);
let complete_count = complete_report.len();
let sender_ids: Vec<_> = runner.offer_pc.get_senders().collect();
let mut filtered_outbound_count = 0;
for sender_id in sender_ids {
let sender_report = runner
.offer_pc
.get_stats(now, StatsSelector::Sender(sender_id));
filtered_outbound_count += sender_report.outbound_rtp_streams().count();
}
let receiver_ids: Vec<_> = runner.offer_pc.get_receivers().collect();
let mut filtered_inbound_count = 0;
for receiver_id in receiver_ids {
let receiver_report = runner
.offer_pc
.get_stats(now, StatsSelector::Receiver(receiver_id));
filtered_inbound_count += receiver_report.inbound_rtp_streams().count();
}
let complete_outbound = complete_report.outbound_rtp_streams().count();
let complete_inbound = complete_report.inbound_rtp_streams().count();
assert_eq!(
filtered_outbound_count, complete_outbound,
"Sum of filtered outbound streams should match complete report"
);
assert_eq!(
filtered_inbound_count, complete_inbound,
"Sum of filtered inbound streams should match complete report"
);
log::info!(
"Complete: {} entries ({} outbound, {} inbound)",
complete_count,
complete_outbound,
complete_inbound
);
log::info!(
"Filtered: {} outbound, {} inbound",
filtered_outbound_count,
filtered_inbound_count
);
runner.offer_pc.close()?;
runner.answer_pc.close()?;
log::info!("get_stats selector subset test passed!");
Ok(())
}