use super::e2e_test_server::E2eTestServer;
use super::rtp_utils::{RtpPacket, RtpReceiver, RtpSender, RtpStats, extract_media_endpoint};
use super::test_ua::{TestUa, TestUaEvent};
use crate::config::MediaProxyMode;
use anyhow::{Result, anyhow};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{info, warn};
pub struct RtpFlowTestConfig {
pub packet_count: usize,
pub payload_size: usize,
pub payload_type: u8,
pub ssrc: u32,
pub interval_ms: u64,
pub expected_loss_rate: f64,
}
impl Default for RtpFlowTestConfig {
fn default() -> Self {
Self {
packet_count: 100,
payload_size: 160, payload_type: 0, ssrc: 0x12345678,
interval_ms: 20, expected_loss_rate: 0.05, }
}
}
#[derive(Debug, Clone)]
pub struct RtpFlowTestResult {
pub packets_received: u64,
pub packet_loss_rate: f64,
pub seq_num_gaps: Vec<(u16, u16)>,
pub is_valid: bool,
pub errors: Vec<String>,
}
impl RtpFlowTestResult {
pub fn validate(&mut self, config: &RtpFlowTestConfig) {
if self.packet_loss_rate > config.expected_loss_rate {
self.errors.push(format!(
"Packet loss too high: {:.2}% > {:.2}%",
self.packet_loss_rate * 100.0,
config.expected_loss_rate * 100.0
));
self.is_valid = false;
}
if !self.seq_num_gaps.is_empty() {
warn!("Sequence gaps detected: {:?}", self.seq_num_gaps);
}
}
}
pub struct RtpE2eTest {
pub server: Arc<E2eTestServer>,
pub caller: Option<TestUa>,
pub callee: Option<TestUa>,
pub caller_rtp_sender: Option<RtpSender>,
pub caller_rtp_receiver: Option<RtpReceiver>,
pub callee_rtp_sender: Option<RtpSender>,
pub callee_rtp_receiver: Option<RtpReceiver>,
}
impl RtpE2eTest {
pub async fn new_with_mode(mode: MediaProxyMode) -> Result<Self> {
let server = Arc::new(E2eTestServer::start_with_mode(mode).await?);
Ok(Self {
server,
caller: None,
callee: None,
caller_rtp_sender: None,
caller_rtp_receiver: None,
callee_rtp_sender: None,
callee_rtp_receiver: None,
})
}
pub async fn setup_caller(&mut self, username: &str) -> Result<()> {
let ua = self.server.create_ua(username).await?;
let sender = RtpSender::bind().await?;
let receiver = RtpReceiver::bind(0).await?;
self.caller = Some(ua);
self.caller_rtp_sender = Some(sender);
self.caller_rtp_receiver = Some(receiver);
Ok(())
}
pub async fn setup_callee(&mut self, username: &str) -> Result<()> {
let ua = self.server.create_ua(username).await?;
let sender = RtpSender::bind().await?;
let receiver = RtpReceiver::bind(0).await?;
self.callee = Some(ua);
self.callee_rtp_sender = Some(sender);
self.callee_rtp_receiver = Some(receiver);
Ok(())
}
pub fn get_caller_rtp_port(&self) -> Option<u16> {
self.caller_rtp_receiver
.as_ref()
.and_then(|r| r.port().ok())
}
pub fn get_callee_rtp_port(&self) -> Option<u16> {
self.callee_rtp_receiver
.as_ref()
.and_then(|r| r.port().ok())
}
pub fn generate_sdp(ip: &str, port: u16, payload_type: u8, codec_name: &str) -> String {
let clock_rate = if codec_name == "opus" { 48000 } else { 8000 };
format!(
"v=0\r\n\
o=- {} {} IN IP4 {}\r\n\
s=-\r\n\
c=IN IP4 {}\r\n\
t=0 0\r\n\
m=audio {} RTP/AVP {} 101\r\n\
a=rtpmap:{} {}/{}\r\n\
a=rtpmap:101 telephone-event/8000\r\n\
a=sendrecv\r\n",
chrono::Utc::now().timestamp(),
chrono::Utc::now().timestamp() + 1,
ip,
ip,
port,
payload_type,
payload_type,
codec_name,
clock_rate
)
}
pub async fn execute_bidirectional_rtp_test(
&mut self,
config: RtpFlowTestConfig,
) -> Result<(RtpFlowTestResult, RtpFlowTestResult)> {
if let Some(ref receiver) = self.callee_rtp_receiver {
receiver.start_receiving();
}
if let Some(ref receiver) = self.caller_rtp_receiver {
receiver.start_receiving();
}
let callee_rtp_port = self
.get_callee_rtp_port()
.ok_or_else(|| anyhow!("Callee RTP port not available"))?;
let caller_rtp_port = self
.get_caller_rtp_port()
.ok_or_else(|| anyhow!("Caller RTP port not available"))?;
let caller_to_callee_packets = RtpPacket::create_sequence(
config.packet_count,
1000,
50000,
config.ssrc,
config.payload_type,
config.payload_size,
(config.interval_ms as u32) * 8, );
let callee_to_caller_packets = RtpPacket::create_sequence(
config.packet_count,
2000,
60000,
config.ssrc + 1,
config.payload_type,
config.payload_size,
(config.interval_ms as u32) * 8,
);
let callee_addr: SocketAddr = format!("127.0.0.1:{}", callee_rtp_port).parse()?;
let caller_addr: SocketAddr = format!("127.0.0.1:{}", caller_rtp_port).parse()?;
info!(
"Starting RTP flow test: caller:{} <-> callee:{}",
caller_rtp_port, callee_rtp_port
);
if let Some(ref sender) = self.caller_rtp_sender {
sender.start_sending(callee_addr, caller_to_callee_packets, config.interval_ms);
}
if let Some(ref sender) = self.callee_rtp_sender {
sender.start_sending(caller_addr, callee_to_caller_packets, config.interval_ms);
}
let test_duration =
Duration::from_millis(config.packet_count as u64 * config.interval_ms + 500);
sleep(test_duration).await;
if let Some(ref sender) = self.caller_rtp_sender {
sender.stop();
}
if let Some(ref sender) = self.callee_rtp_sender {
sender.stop();
}
sleep(Duration::from_millis(200)).await;
let caller_stats = if let Some(ref receiver) = self.caller_rtp_receiver {
receiver.get_stats().await
} else {
RtpStats::default()
};
let callee_stats = if let Some(ref receiver) = self.callee_rtp_receiver {
receiver.get_stats().await
} else {
RtpStats::default()
};
let mut caller_result = RtpFlowTestResult {
packets_received: caller_stats.packets_received,
packet_loss_rate: caller_stats.packet_loss_rate(),
seq_num_gaps: caller_stats.seq_num_gaps.clone(),
is_valid: true,
errors: Vec::new(),
};
caller_result.validate(&config);
let mut callee_result = RtpFlowTestResult {
packets_received: callee_stats.packets_received,
packet_loss_rate: callee_stats.packet_loss_rate(),
seq_num_gaps: callee_stats.seq_num_gaps.clone(),
is_valid: true,
errors: Vec::new(),
};
callee_result.validate(&config);
info!(
caller_received = caller_stats.packets_received,
callee_received = callee_stats.packets_received,
"RTP flow test completed"
);
Ok((caller_result, callee_result))
}
}
#[tokio::test]
async fn test_rtp_direct_flow_no_proxy() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let mut test = RtpE2eTest::new_with_mode(MediaProxyMode::None).await?;
test.setup_caller("alice").await?;
test.setup_callee("bob").await?;
sleep(Duration::from_millis(100)).await;
let caller_port = test.get_caller_rtp_port().unwrap();
let callee_port = test.get_callee_rtp_port().unwrap();
let caller_sdp = RtpE2eTest::generate_sdp("127.0.0.1", caller_port, 0, "PCMU");
let callee_sdp = RtpE2eTest::generate_sdp("127.0.0.1", callee_port, 0, "PCMU");
let caller = Arc::new(test.caller.take().unwrap());
let callee = test.callee.take().unwrap();
let caller_handle = crate::utils::spawn({
let c = caller.clone();
let sdp = caller_sdp.clone();
async move { c.make_call("bob", Some(sdp)).await }
});
for _ in 0..50 {
let events = callee.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
callee.answer_call(&id, Some(callee_sdp.clone())).await?;
info!("Call answered");
break;
}
}
sleep(Duration::from_millis(100)).await;
}
let _ = tokio::time::timeout(Duration::from_secs(5), caller_handle).await;
let config = RtpFlowTestConfig::default();
let (caller_result, callee_result) = test.execute_bidirectional_rtp_test(config).await?;
info!(
caller_received = caller_result.packets_received,
callee_received = callee_result.packets_received,
"RTP direct flow results"
);
assert!(
caller_result.packets_received > 0 || callee_result.packets_received > 0,
"At least some RTP packets should be received"
);
test.server.stop();
Ok(())
}
#[tokio::test]
async fn test_rtp_through_proxy() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let mut test = RtpE2eTest::new_with_mode(MediaProxyMode::All).await?;
test.setup_caller("alice").await?;
test.setup_callee("bob").await?;
if let Some(ref receiver) = test.callee_rtp_receiver {
receiver.start_receiving();
}
if let Some(ref receiver) = test.caller_rtp_receiver {
receiver.start_receiving();
}
let caller_port = test
.get_caller_rtp_port()
.ok_or_else(|| anyhow!("Caller RTP port not available"))?;
let callee_port = test
.get_callee_rtp_port()
.ok_or_else(|| anyhow!("Callee RTP port not available"))?;
sleep(Duration::from_millis(100)).await;
let caller_sdp = RtpE2eTest::generate_sdp("127.0.0.1", caller_port, 0, "PCMU");
let callee_sdp = RtpE2eTest::generate_sdp("127.0.0.1", callee_port, 0, "PCMU");
let caller = Arc::new(test.caller.take().unwrap());
let callee = test.callee.take().unwrap();
let caller_handle = crate::utils::spawn({
let c = caller.clone();
let sdp = caller_sdp.clone();
async move { c.make_call("bob", Some(sdp)).await }
});
let mut call_established = false;
let mut callee_received_offer_sdp: Option<String> = None;
let mut incoming_dialog_id = None;
for _ in 0..50 {
let events = callee.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, offer_sdp) = event {
callee_received_offer_sdp = offer_sdp;
incoming_dialog_id = Some(id.clone());
callee.answer_call(&id, Some(callee_sdp.clone())).await?;
call_established = true;
break;
}
}
if call_established {
break;
}
sleep(Duration::from_millis(100)).await;
}
assert!(call_established, "Call should be established in proxy mode");
let caller_dialog_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow!("Caller task timed out"))?
.map_err(|e| anyhow!("Caller task join error: {}", e))??;
let caller_answer_sdp = caller
.get_negotiated_answer_sdp(&caller_dialog_id)
.await
.ok_or_else(|| anyhow!("Missing negotiated answer SDP on caller side"))?;
let callee_offer_sdp = callee_received_offer_sdp
.ok_or_else(|| anyhow!("Missing incoming offer SDP on callee side"))?;
let callee_to_proxy = extract_media_endpoint(&callee_offer_sdp)
.ok_or_else(|| anyhow!("Failed to parse callee-side proxy media endpoint"))?;
let caller_to_proxy = extract_media_endpoint(&caller_answer_sdp)
.ok_or_else(|| anyhow!("Failed to parse caller-side proxy media endpoint"))?;
info!(
caller_to_proxy = %caller_to_proxy,
callee_to_proxy = %callee_to_proxy,
?incoming_dialog_id,
"Extracted proxy media endpoints"
);
let caller_packets = RtpPacket::create_sequence(60, 3000, 70000, 0xA1A1A1A1, 0, 160, 160);
let callee_packets = RtpPacket::create_sequence(60, 4000, 80000, 0xB2B2B2B2, 0, 160, 160);
if let Some(ref sender) = test.caller_rtp_sender {
sender.start_sending(caller_to_proxy, caller_packets, 20);
}
if let Some(ref sender) = test.callee_rtp_sender {
sender.start_sending(callee_to_proxy, callee_packets, 20);
}
sleep(Duration::from_secs(2)).await;
if let Some(ref sender) = test.caller_rtp_sender {
sender.stop();
}
if let Some(ref sender) = test.callee_rtp_sender {
sender.stop();
}
sleep(Duration::from_millis(200)).await;
let caller_stats = if let Some(ref receiver) = test.caller_rtp_receiver {
receiver.get_stats().await
} else {
RtpStats::default()
};
let callee_stats = if let Some(ref receiver) = test.callee_rtp_receiver {
receiver.get_stats().await
} else {
RtpStats::default()
};
info!(
caller_received = caller_stats.packets_received,
callee_received = callee_stats.packets_received,
caller_ssrcs = ?caller_stats.ssrcs,
callee_ssrcs = ?callee_stats.ssrcs,
"RTP through proxy stats"
);
assert!(
caller_stats.packets_received > 0,
"Caller should receive forwarded RTP through proxy"
);
assert!(
callee_stats.packets_received > 0,
"Callee should receive forwarded RTP through proxy"
);
info!("RTP through proxy test completed with real media verification");
test.server.stop();
Ok(())
}
#[tokio::test]
async fn test_rtp_packet_integrity() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let test_ssrc = 0xDEADBEEFu32;
let test_seq_start = 1000u16;
let packets = RtpPacket::create_sequence(
50,
test_seq_start,
50000,
test_ssrc,
0, 160,
160,
);
for (i, packet) in packets.iter().enumerate() {
let encoded = packet.encode();
let decoded = RtpPacket::decode(&encoded)?;
assert_eq!(decoded.version, 2, "RTP version should be 2");
assert_eq!(decoded.payload_type, 0, "Payload type should be 0 (PCMU)");
assert_eq!(
decoded.sequence_number,
test_seq_start + i as u16,
"Sequence number mismatch"
);
assert_eq!(decoded.ssrc, test_ssrc, "SSRC mismatch");
assert_eq!(
decoded.timestamp,
50000 + (i as u32) * 160,
"Timestamp mismatch"
);
assert_eq!(decoded.payload, packet.payload, "Payload mismatch");
}
info!("RTP packet integrity test passed");
Ok(())
}
#[tokio::test]
async fn test_rtp_sequence_validation() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let packets = RtpPacket::create_sequence(
100, 5000, 100000, 0x12345678, 0, 160, 160, );
let mut last_seq: Option<u16> = None;
let mut last_ts: Option<u32> = None;
for packet in &packets {
if let Some(last) = last_seq {
let expected = last.wrapping_add(1);
assert_eq!(
packet.sequence_number, expected,
"Sequence gap detected: expected {}, got {}",
expected, packet.sequence_number
);
}
last_seq = Some(packet.sequence_number);
if let Some(last) = last_ts {
let expected = last + 160;
assert_eq!(
packet.timestamp, expected,
"Timestamp jump detected: expected {}, got {}",
expected, packet.timestamp
);
}
last_ts = Some(packet.timestamp);
assert_eq!(packet.ssrc, 0x12345678, "SSRC should be constant");
}
info!("RTP sequence validation test passed");
Ok(())
}
#[tokio::test]
async fn test_rtp_high_packet_rate() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let _config = RtpFlowTestConfig {
packet_count: 200,
interval_ms: 10,
..Default::default()
};
let mut test = RtpE2eTest::new_with_mode(MediaProxyMode::Auto).await?;
test.setup_caller("alice").await?;
test.setup_callee("bob").await?;
info!("High packet rate RTP test completed");
Ok(())
}
#[tokio::test]
async fn test_rtp_various_payload_sizes() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
for payload_size in [80, 160, 240, 320] {
let packets = RtpPacket::create_sequence(
10,
1000,
50000,
0x12345678,
0,
payload_size,
payload_size as u32,
);
assert_eq!(packets.len(), 10);
for packet in &packets {
assert_eq!(packet.payload.len(), payload_size);
let encoded = packet.encode();
let decoded = RtpPacket::decode(&encoded)?;
assert_eq!(decoded.payload.len(), payload_size);
}
info!(payload_size, "Payload size test passed");
}
Ok(())
}
#[tokio::test]
async fn test_rtp_different_codecs() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let codecs = vec![(0, "PCMU", 160), (8, "PCMA", 160), (18, "G729", 20)];
for (pt, name, frame_size) in codecs {
let packets = RtpPacket::create_sequence(
10,
1000,
50000,
0x12345678,
pt,
frame_size,
frame_size as u32,
);
for packet in &packets {
assert_eq!(
packet.payload_type, pt,
"Payload type mismatch for {}",
name
);
let encoded = packet.encode();
let decoded = RtpPacket::decode(&encoded)?;
assert_eq!(
decoded.payload_type, pt,
"Payload type not preserved for {}",
name
);
}
info!(codec = name, payload_type = pt, "Codec test passed");
}
Ok(())
}
#[tokio::test]
async fn test_full_call_with_rtp_verification() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let mut test = RtpE2eTest::new_with_mode(MediaProxyMode::Auto).await?;
test.setup_caller("alice").await?;
test.setup_callee("bob").await?;
let caller_port = test.get_caller_rtp_port().unwrap();
let callee_port = test.get_callee_rtp_port().unwrap();
info!(caller_port, callee_port, "RTP ports allocated");
let caller_sdp = RtpE2eTest::generate_sdp("127.0.0.1", caller_port, 0, "PCMU");
let callee_sdp = RtpE2eTest::generate_sdp("127.0.0.1", callee_port, 0, "PCMU");
if let Some(ref receiver) = test.callee_rtp_receiver {
receiver.start_receiving();
}
if let Some(ref receiver) = test.caller_rtp_receiver {
receiver.start_receiving();
}
let caller = Arc::new(test.caller.take().unwrap());
let callee = test.callee.take().unwrap();
let caller_clone = caller.clone();
let caller_handle =
crate::utils::spawn(async move { caller_clone.make_call("bob", Some(caller_sdp)).await });
let mut call_established = false;
for _ in 0..50 {
let events = callee.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
callee.answer_call(&id, Some(callee_sdp.clone())).await?;
call_established = true;
info!("Call established");
break;
}
}
if call_established {
break;
}
sleep(Duration::from_millis(100)).await;
}
assert!(call_established, "Call should be established");
let _ = tokio::time::timeout(Duration::from_secs(5), caller_handle).await;
let callee_addr: SocketAddr = format!("127.0.0.1:{}", callee_port).parse()?;
let caller_addr: SocketAddr = format!("127.0.0.1:{}", caller_port).parse()?;
let caller_packets = RtpPacket::create_sequence(50, 1000, 50000, 0x11111111, 0, 160, 160);
let callee_packets = RtpPacket::create_sequence(50, 2000, 60000, 0x22222222, 0, 160, 160);
if let Some(ref sender) = test.caller_rtp_sender {
sender.start_sending(callee_addr, caller_packets, 20);
}
if let Some(ref sender) = test.callee_rtp_sender {
sender.start_sending(caller_addr, callee_packets, 20);
}
sleep(Duration::from_secs(2)).await;
if let Some(ref sender) = test.caller_rtp_sender {
sender.stop();
}
if let Some(ref sender) = test.callee_rtp_sender {
sender.stop();
}
sleep(Duration::from_millis(200)).await;
let caller_stats = if let Some(ref receiver) = test.caller_rtp_receiver {
receiver.get_stats().await
} else {
RtpStats::default()
};
let callee_stats = if let Some(ref receiver) = test.callee_rtp_receiver {
receiver.get_stats().await
} else {
RtpStats::default()
};
info!(
caller_received = caller_stats.packets_received,
caller_ssrcs = ?caller_stats.ssrcs,
callee_received = callee_stats.packets_received,
callee_ssrcs = ?callee_stats.ssrcs,
"RTP test results"
);
assert!(
caller_stats.packets_received > 0 || callee_stats.packets_received > 0,
"At least some RTP should be received"
);
info!("Full call with RTP verification test completed successfully");
test.server.stop();
Ok(())
}