use super::cdr_capture::{CdrExpectation, validate_cdr};
use super::e2e_test_server::E2eTestServer;
use super::rtp_utils::{RtpPacket, RtpReceiver, RtpSender, RtpStats, extract_media_endpoint};
use super::test_helpers;
use super::test_ua::{TestUa, TestUaEvent};
use crate::callrecord::CallRecordHangupReason;
use crate::config::MediaProxyMode;
use anyhow::{Result, anyhow};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{info, warn};
use test_helpers::{build_sdp, pcma_sdp, pcmu_sdp};
struct MediaTestCtx {
server: Arc<E2eTestServer>,
caller_ua: TestUa,
callee_ua: TestUa,
caller_sender: RtpSender,
caller_receiver: RtpReceiver,
callee_sender: RtpSender,
callee_receiver: RtpReceiver,
}
impl MediaTestCtx {
async fn setup() -> Result<Self> {
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let caller_ua = server.create_ua("alice").await?;
let callee_ua = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let caller_sender = RtpSender::bind().await?;
let caller_receiver = RtpReceiver::bind(0).await?;
let callee_sender = RtpSender::bind().await?;
let callee_receiver = RtpReceiver::bind(0).await?;
Ok(Self {
server,
caller_ua,
callee_ua,
caller_sender,
caller_receiver,
callee_sender,
callee_receiver,
})
}
fn caller_rtp_port(&self) -> u16 {
self.caller_receiver.port().unwrap()
}
fn callee_rtp_port(&self) -> u16 {
self.callee_receiver.port().unwrap()
}
async fn establish_call(
&self,
caller_sdp: String,
callee_sdp: String,
) -> Result<(DialogId, DialogId, Option<String>)> {
let caller = Arc::new(self.caller_ua.clone());
let caller_handle =
tokio::spawn(async move { caller.make_call("bob", Some(caller_sdp)).await });
let mut callee_dialog_id = None;
let mut received_offer_sdp: Option<String> = None;
for _ in 0..50 {
let events = self.callee_ua.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, offer) = event {
callee_dialog_id = Some(id.clone());
received_offer_sdp = offer.clone();
self.callee_ua
.answer_call(&id, Some(callee_sdp.clone()))
.await?;
info!("Callee answered call");
break;
}
}
if callee_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let callee_id = callee_dialog_id.ok_or_else(|| anyhow!("Callee never received INVITE"))?;
let caller_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow!("Caller timed out waiting for call establishment"))?
.map_err(|e| anyhow!("Caller task join error: {}", e))?
.map_err(|e| anyhow!("Call failed: {}", e))?;
Ok((caller_id, callee_id, received_offer_sdp))
}
async fn exchange_rtp(
&self,
caller_target: SocketAddr,
callee_target: SocketAddr,
payload_type: u8,
payload_size: usize,
duration_ms: u64,
) -> Result<(RtpStats, RtpStats)> {
let caller_ssrc = 0xA1A1A1A1u32;
let callee_ssrc = 0xB2B2B2B2u32;
let packet_interval_ms: u64 = 20; let packet_count = (duration_ms / packet_interval_ms) as usize;
let ts_increment = if payload_type == 0 || payload_type == 8 {
160 } else {
960 };
let caller_packets = RtpPacket::create_sequence(
packet_count,
1000,
50000,
caller_ssrc,
payload_type,
payload_size,
ts_increment,
);
let callee_packets = RtpPacket::create_sequence(
packet_count,
2000,
60000,
callee_ssrc,
payload_type,
payload_size,
ts_increment,
);
self.caller_receiver.start_receiving();
self.callee_receiver.start_receiving();
self.caller_sender
.start_sending(callee_target, caller_packets, packet_interval_ms);
self.callee_sender
.start_sending(caller_target, callee_packets, packet_interval_ms);
sleep(Duration::from_millis(duration_ms + 500)).await;
self.caller_sender.stop();
self.callee_sender.stop();
sleep(Duration::from_millis(200)).await;
let caller_stats = self.caller_receiver.get_stats().await;
let callee_stats = self.callee_receiver.get_stats().await;
Ok((caller_stats, callee_stats))
}
async fn verify_cdr(&self, expectation: &CdrExpectation) -> Result<()> {
sleep(Duration::from_millis(800)).await;
let records = self.server.cdr_capture.get_all_records().await;
assert!(
!records.is_empty(),
"Expected at least one CDR record, got none"
);
let record = &records[0];
info!(
call_id = %record.call_id,
status = %record.details.status,
hangup_reason = ?record.hangup_reason,
caller = %record.caller,
callee = %record.callee,
"CDR record"
);
let result = validate_cdr(record, expectation);
if !result.is_valid {
for error in &result.errors {
warn!("CDR validation error: {}", error);
}
panic!("CDR validation failed:\n{}", result.errors.join("\n"));
}
Ok(())
}
fn cleanup(&self) {
self.caller_receiver.stop();
self.callee_receiver.stop();
self.server.stop();
}
}
use rsipstack::dialog::DialogId;
#[tokio::test]
async fn test_p2p_caller_hangup_rtp_and_cdr() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let ctx = MediaTestCtx::setup().await?;
let caller_port = ctx.caller_rtp_port();
let callee_port = ctx.callee_rtp_port();
let caller_sdp = pcmu_sdp("127.0.0.1", caller_port);
let callee_sdp = pcmu_sdp("127.0.0.1", callee_port);
let (caller_id, callee_id, callee_offer_sdp) =
ctx.establish_call(caller_sdp, callee_sdp).await?;
info!(
"Call established: caller={}, callee={}",
caller_id, callee_id
);
let caller_answer_sdp = ctx
.caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow!("No answer SDP on caller side"))?;
let callee_offer = callee_offer_sdp.ok_or_else(|| anyhow!("No offer SDP on callee side"))?;
let callee_target = extract_media_endpoint(&callee_offer)
.ok_or_else(|| anyhow!("Failed to parse callee-side proxy media endpoint"))?;
let caller_target = extract_media_endpoint(&caller_answer_sdp)
.ok_or_else(|| anyhow!("Failed to parse caller-side proxy media endpoint"))?;
info!(
"RTP targets: caller→{} callee→{}",
callee_target, caller_target
);
let (caller_stats, callee_stats) = ctx
.exchange_rtp(caller_target, callee_target, 0, 160, 2000)
.await?;
info!(
caller_received = caller_stats.packets_received,
callee_received = callee_stats.packets_received,
"RTP exchange complete"
);
assert!(
callee_stats.packets_received > 0,
"Callee should receive RTP from caller through proxy (got 0)"
);
assert!(
caller_stats.packets_received > 0,
"Caller should receive RTP from callee through proxy (got 0)"
);
assert!(
callee_stats.payload_types.contains(&0),
"Callee should receive PCMU (PT 0), got {:?}",
callee_stats.payload_types
);
assert!(
caller_stats.payload_types.contains(&0),
"Caller should receive PCMU (PT 0), got {:?}",
caller_stats.payload_types
);
let callee_loss = callee_stats.packet_loss_rate();
assert!(
callee_loss < 0.10,
"Callee packet loss too high: {:.1}% (> 10%)",
callee_loss * 100.0
);
let caller_loss = caller_stats.packet_loss_rate();
assert!(
caller_loss < 0.10,
"Caller packet loss too high: {:.1}% (> 10%)",
caller_loss * 100.0
);
ctx.caller_ua.hangup(&caller_id).await?;
ctx.verify_cdr(
&CdrExpectation::default()
.with_status("completed")
.with_hangup_reason(CallRecordHangupReason::ByCaller)
.with_caller("alice")
.with_callee("bob")
.with_duration_range(1, 6),
)
.await?;
info!("test_p2p_caller_hangup_rtp_and_cdr PASSED");
ctx.cleanup();
Ok(())
}
#[tokio::test]
async fn test_p2p_callee_hangup_rtp_and_cdr() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let ctx = MediaTestCtx::setup().await?;
let caller_port = ctx.caller_rtp_port();
let callee_port = ctx.callee_rtp_port();
let caller_sdp = pcmu_sdp("127.0.0.1", caller_port);
let callee_sdp = pcmu_sdp("127.0.0.1", callee_port);
let (caller_id, callee_id, callee_offer_sdp) =
ctx.establish_call(caller_sdp, callee_sdp).await?;
let caller_answer_sdp = ctx
.caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow!("No answer SDP on caller side"))?;
let callee_offer = callee_offer_sdp.ok_or_else(|| anyhow!("No offer SDP on callee side"))?;
let callee_target = extract_media_endpoint(&callee_offer)
.ok_or_else(|| anyhow!("Failed to parse callee-side proxy endpoint"))?;
let caller_target = extract_media_endpoint(&caller_answer_sdp)
.ok_or_else(|| anyhow!("Failed to parse caller-side proxy endpoint"))?;
let (caller_stats, callee_stats) = ctx
.exchange_rtp(caller_target, callee_target, 0, 160, 1500)
.await?;
assert!(
callee_stats.packets_received > 0,
"Callee should receive RTP"
);
assert!(
caller_stats.packets_received > 0,
"Caller should receive RTP"
);
ctx.callee_ua.hangup(&callee_id).await?;
ctx.verify_cdr(
&CdrExpectation::default()
.with_status("completed")
.with_hangup_reason(CallRecordHangupReason::ByCallee)
.with_caller("alice")
.with_callee("bob")
.with_duration_range(1, 6),
)
.await?;
info!("test_p2p_callee_hangup_rtp_and_cdr PASSED");
ctx.cleanup();
Ok(())
}
#[tokio::test]
async fn test_p2p_callee_reject_486_cdr() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let sdp = pcmu_sdp("127.0.0.1", 12345);
let alice_clone = alice.clone();
let sdp_clone = sdp.clone();
let caller_handle =
tokio::spawn(async move { alice_clone.make_call("bob", Some(sdp_clone)).await });
let mut bob_rejected = false;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob.reject_call_with_reason(&id, Some(486), Some("Busy Here".to_string()))
.await?;
bob_rejected = true;
info!("Bob rejected with 486");
break;
}
}
if bob_rejected {
break;
}
sleep(Duration::from_millis(100)).await;
}
assert!(
bob_rejected,
"Bob should have received and rejected the call"
);
let call_result = tokio::time::timeout(Duration::from_secs(5), caller_handle).await;
match call_result {
Ok(Ok(Err(e))) => {
let err_str = e.to_string();
assert!(
err_str.contains("486"),
"Alice should receive 486, but got: {}",
err_str
);
info!("Alice correctly received 486 Busy Here");
}
Ok(Ok(Ok(_))) => panic!("Call should have been rejected, not established"),
_ => panic!("Unexpected call result: {:?}", call_result),
}
sleep(Duration::from_millis(1500)).await;
let records = server.cdr_capture.get_all_records().await;
if !records.is_empty() {
let record = &records[0];
info!(
status = %record.details.status,
hangup_reason = ?record.hangup_reason,
status_code = record.status_code,
"CDR for rejected call"
);
assert!(
record.details.status == "failed",
"Expected status 'failed', got '{}'",
record.details.status
);
assert!(
matches!(
record.hangup_reason,
Some(CallRecordHangupReason::Rejected) | Some(CallRecordHangupReason::Failed)
),
"Expected Rejected or Failed, got {:?}",
record.hangup_reason
);
} else {
warn!("No CDR generated for rejected call — 486 passthrough verified at SIP level");
}
server.stop();
info!("test_p2p_callee_reject_486_cdr PASSED");
Ok(())
}
#[tokio::test]
async fn test_p2p_caller_cancel_cdr() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let sdp = pcmu_sdp("127.0.0.1", 12345);
let alice_clone = alice.clone();
let sdp_clone = sdp.clone();
let caller_handle =
tokio::spawn(async move { alice_clone.make_call("bob", Some(sdp_clone)).await });
let mut bob_dialog_id = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id = Some(id);
info!("Bob received INVITE (ringing)");
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
assert!(bob_dialog_id.is_some(), "Bob should receive INVITE");
sleep(Duration::from_millis(200)).await;
if let Some(ref id) = bob_dialog_id {
let _ = id; }
caller_handle.abort();
sleep(Duration::from_millis(1000)).await;
let records = server.cdr_capture.get_all_records().await;
if !records.is_empty() {
let record = &records[0];
info!(
status = %record.details.status,
hangup_reason = ?record.hangup_reason,
"CDR for canceled call"
);
assert!(
record.details.status == "failed" || record.details.status == "missed",
"Expected status 'failed' or 'missed', got '{}'",
record.details.status
);
} else {
warn!("No CDR generated for canceled call — may be expected for early cancel");
}
server.stop();
info!("test_p2p_caller_cancel_cdr PASSED");
Ok(())
}
#[tokio::test]
async fn test_p2p_pcmu_codec_through_proxy() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let ctx = MediaTestCtx::setup().await?;
let caller_port = ctx.caller_rtp_port();
let callee_port = ctx.callee_rtp_port();
let caller_sdp = build_sdp(
"127.0.0.1",
caller_port,
&[(0, "PCMU/8000"), (101, "telephone-event/8000")],
);
let callee_sdp = build_sdp(
"127.0.0.1",
callee_port,
&[(0, "PCMU/8000"), (101, "telephone-event/8000")],
);
let (caller_id, _callee_id, callee_offer_sdp) =
ctx.establish_call(caller_sdp, callee_sdp).await?;
let caller_answer_sdp = ctx
.caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow!("No answer SDP on caller side"))?;
let callee_offer = callee_offer_sdp.ok_or_else(|| anyhow!("No offer SDP on callee side"))?;
let callee_target = extract_media_endpoint(&callee_offer)
.ok_or_else(|| anyhow!("Failed to parse callee proxy endpoint"))?;
let caller_target = extract_media_endpoint(&caller_answer_sdp)
.ok_or_else(|| anyhow!("Failed to parse caller proxy endpoint"))?;
let (caller_stats, callee_stats) = ctx
.exchange_rtp(caller_target, callee_target, 0, 160, 2000)
.await?;
info!(
caller_received = caller_stats.packets_received,
caller_pts = ?caller_stats.payload_types,
caller_ssrcs = ?caller_stats.ssrcs,
callee_received = callee_stats.packets_received,
callee_pts = ?callee_stats.payload_types,
callee_ssrcs = ?callee_stats.ssrcs,
"PCMU codec test results"
);
assert!(
callee_stats.packets_received > 0,
"Callee should receive PCMU packets"
);
assert!(
caller_stats.packets_received > 0,
"Caller should receive PCMU packets"
);
assert!(
callee_stats.payload_types.contains(&0),
"Callee received wrong codec (expected PT 0 PCMU): {:?}",
callee_stats.payload_types
);
assert!(
caller_stats.payload_types.contains(&0),
"Caller received wrong codec (expected PT 0 PCMU): {:?}",
caller_stats.payload_types
);
assert!(
callee_stats.payload_types.iter().all(|pt| *pt == 0),
"Callee should only see PT 0, got: {:?}",
callee_stats.payload_types
);
ctx.caller_ua.hangup(&caller_id).await?;
ctx.verify_cdr(
&CdrExpectation::default()
.with_status("completed")
.with_hangup_reason(CallRecordHangupReason::ByCaller),
)
.await?;
info!("test_p2p_pcmu_codec_through_proxy PASSED");
ctx.cleanup();
Ok(())
}
#[tokio::test]
async fn test_p2p_pcma_codec_through_proxy() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let ctx = MediaTestCtx::setup().await?;
let caller_port = ctx.caller_rtp_port();
let callee_port = ctx.callee_rtp_port();
let caller_sdp = pcma_sdp("127.0.0.1", caller_port);
let callee_sdp = pcma_sdp("127.0.0.1", callee_port);
let (caller_id, _callee_id, callee_offer_sdp) =
ctx.establish_call(caller_sdp, callee_sdp).await?;
let caller_answer_sdp = ctx
.caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow!("No answer SDP on caller side"))?;
let callee_offer = callee_offer_sdp.ok_or_else(|| anyhow!("No offer SDP on callee side"))?;
let callee_target = extract_media_endpoint(&callee_offer)
.ok_or_else(|| anyhow!("Failed to parse callee proxy endpoint"))?;
let caller_target = extract_media_endpoint(&caller_answer_sdp)
.ok_or_else(|| anyhow!("Failed to parse caller proxy endpoint"))?;
let (caller_stats, callee_stats) = ctx
.exchange_rtp(caller_target, callee_target, 8, 160, 2000)
.await?;
info!(
caller_received = caller_stats.packets_received,
caller_pts = ?caller_stats.payload_types,
callee_received = callee_stats.packets_received,
callee_pts = ?callee_stats.payload_types,
"PCMA codec test results"
);
assert!(
callee_stats.packets_received > 0,
"Callee should receive PCMA packets"
);
assert!(
caller_stats.packets_received > 0,
"Caller should receive PCMA packets"
);
assert!(
callee_stats.payload_types.contains(&8),
"Callee received wrong codec (expected PT 8 PCMA): {:?}",
callee_stats.payload_types
);
assert!(
caller_stats.payload_types.contains(&8),
"Caller received wrong codec (expected PT 8 PCMA): {:?}",
caller_stats.payload_types
);
ctx.caller_ua.hangup(&caller_id).await?;
ctx.verify_cdr(
&CdrExpectation::default()
.with_status("completed")
.with_hangup_reason(CallRecordHangupReason::ByCaller),
)
.await?;
info!("test_p2p_pcma_codec_through_proxy PASSED");
ctx.cleanup();
Ok(())
}
#[tokio::test]
async fn test_p2p_no_answer_cdr() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let sdp = pcmu_sdp("127.0.0.1", 12345);
let alice_clone = alice.clone();
let caller_handle = tokio::spawn(async move {
tokio::time::timeout(
Duration::from_secs(3),
alice_clone.make_call("bob", Some(sdp)),
)
.await
});
let mut bob_received = false;
for _ in 0..30 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_received = true;
info!("Bob received INVITE but won't answer");
let _ = id;
}
}
if bob_received {
break;
}
sleep(Duration::from_millis(100)).await;
}
let call_result = tokio::time::timeout(Duration::from_secs(5), caller_handle).await;
info!("Call result after no answer: {:?}", call_result.is_ok());
sleep(Duration::from_millis(800)).await;
let records = server.cdr_capture.get_all_records().await;
if !records.is_empty() {
let record = &records[0];
info!(
status = %record.details.status,
hangup_reason = ?record.hangup_reason,
"CDR for no-answer call"
);
assert_ne!(
record.details.status, "completed",
"No-answer call should not have 'completed' status"
);
}
server.stop();
info!("test_p2p_no_answer_cdr PASSED");
Ok(())
}
#[tokio::test]
async fn test_p2p_two_concurrent_calls_rtp_cdr() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server1 = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let server2 = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice1 = Arc::new(server1.create_ua("alice").await?);
let bob1 = server1.create_ua("bob").await?;
let alice2 = Arc::new(server2.create_ua("alice").await?);
let bob2 = server2.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let sdp = pcmu_sdp("127.0.0.1", 12345);
let alice1_c = alice1.clone();
let sdp1 = sdp.clone();
let h1 = tokio::spawn(async move { alice1_c.make_call("bob", Some(sdp1)).await });
let mut bob1_id = None;
for _ in 0..50 {
let events = bob1.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob1_id = Some(id.clone());
bob1.answer_call(&id, Some(sdp.clone())).await?;
break;
}
}
if bob1_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let alice1_id = tokio::time::timeout(Duration::from_secs(5), h1)
.await
.map_err(|_| anyhow!("Call 1 timeout"))?
.map_err(|e| anyhow!("Call 1 join: {}", e))?
.map_err(|e| anyhow!("Call 1 failed: {}", e))?;
let alice2_c = alice2.clone();
let sdp2 = sdp.clone();
let h2 = tokio::spawn(async move { alice2_c.make_call("bob", Some(sdp2)).await });
let mut bob2_id = None;
for _ in 0..50 {
let events = bob2.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob2_id = Some(id.clone());
bob2.answer_call(&id, Some(sdp.clone())).await?;
break;
}
}
if bob2_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let _alice2_id = tokio::time::timeout(Duration::from_secs(5), h2)
.await
.map_err(|_| anyhow!("Call 2 timeout"))?
.map_err(|e| anyhow!("Call 2 join: {}", e))?
.map_err(|e| anyhow!("Call 2 failed: {}", e))?;
sleep(Duration::from_millis(500)).await;
alice1.hangup(&alice1_id).await?;
if let Some(ref id) = bob2_id {
bob2.hangup(id).await?;
}
sleep(Duration::from_millis(800)).await;
let records1 = server1.cdr_capture.get_all_records().await;
assert!(!records1.is_empty(), "Server 1 should have CDR");
let record1 = &records1[0];
assert_eq!(
record1.hangup_reason,
Some(CallRecordHangupReason::ByCaller),
"Call 1 should be ByCaller"
);
let records2 = server2.cdr_capture.get_all_records().await;
assert!(!records2.is_empty(), "Server 2 should have CDR");
let record2 = &records2[0];
assert_eq!(
record2.hangup_reason,
Some(CallRecordHangupReason::ByCallee),
"Call 2 should be ByCallee"
);
server1.stop();
server2.stop();
info!("test_p2p_two_concurrent_calls_rtp_cdr PASSED");
Ok(())
}
#[tokio::test]
async fn test_rtp_payload_integrity_through_proxy() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let ctx = MediaTestCtx::setup().await?;
let caller_port = ctx.caller_rtp_port();
let callee_port = ctx.callee_rtp_port();
let caller_sdp = pcmu_sdp("127.0.0.1", caller_port);
let callee_sdp = pcmu_sdp("127.0.0.1", callee_port);
let (caller_id, _callee_id, callee_offer_sdp) =
ctx.establish_call(caller_sdp, callee_sdp).await?;
let caller_answer_sdp = ctx
.caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow!("No answer SDP on caller side"))?;
let callee_offer = callee_offer_sdp.ok_or_else(|| anyhow!("No offer SDP on callee side"))?;
let callee_target = extract_media_endpoint(&callee_offer)
.ok_or_else(|| anyhow!("Failed to parse callee proxy endpoint"))?;
let caller_target = extract_media_endpoint(&caller_answer_sdp)
.ok_or_else(|| anyhow!("Failed to parse caller proxy endpoint"))?;
ctx.callee_receiver.start_receiving();
ctx.caller_receiver.start_receiving();
let mut packets = Vec::new();
for i in 0..50u16 {
let mut payload = vec![0u8; 160];
payload[0] = (i >> 8) as u8;
payload[1] = (i & 0xFF) as u8;
payload[2] = 0xDE;
payload[3] = 0xAD;
for (j, byte) in payload.iter_mut().enumerate().skip(4) {
*byte = ((i as u8).wrapping_add(j as u8)) ^ 0x55;
}
packets.push(RtpPacket::new(
0, 5000 + i,
100000 + (i as u32) * 160,
0xCAFEBABE,
payload,
));
}
let dummy_callee_packets = RtpPacket::create_sequence(50, 7000, 80000, 0xBBBBBBBB, 0, 160, 160);
ctx.callee_sender
.start_sending(caller_target, dummy_callee_packets, 20);
sleep(Duration::from_millis(200)).await;
ctx.caller_sender
.start_sending(callee_target, packets.clone(), 20);
sleep(Duration::from_millis(1500)).await;
ctx.caller_sender.stop();
ctx.callee_sender.stop();
let callee_stats = ctx.callee_receiver.get_stats().await;
info!(
received = callee_stats.packets_received,
pts = ?callee_stats.payload_types,
ssrcs = ?callee_stats.ssrcs,
"Payload integrity test results"
);
assert!(
callee_stats.packets_received > 0,
"Callee should receive RTP packets through proxy"
);
if !callee_stats.ssrcs.contains(&0xCAFEBABE) {
warn!(
"Proxy rewrote SSRC: expected 0xCAFEBABE, got {:?} (expected for B2BUA)",
callee_stats.ssrcs
);
}
assert!(
callee_stats.payload_types.contains(&0),
"Callee should see PT 0 (PCMU), got: {:?}",
callee_stats.payload_types
);
if !callee_stats.seq_num_gaps.is_empty() {
warn!(
"Sequence gaps through proxy: {:?}",
callee_stats.seq_num_gaps
);
}
ctx.caller_ua.hangup(&caller_id).await.ok();
ctx.cleanup();
info!("test_rtp_payload_integrity_through_proxy PASSED");
Ok(())
}
#[tokio::test]
async fn test_p2p_direct_media_none_mode() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::None).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
let alice_sender = RtpSender::bind().await?;
let alice_receiver = RtpReceiver::bind(0).await?;
let bob_sender = RtpSender::bind().await?;
let bob_receiver = RtpReceiver::bind(0).await?;
let alice_port = alice_receiver.port()?;
let bob_port = bob_receiver.port()?;
sleep(Duration::from_millis(100)).await;
let alice_sdp = pcmu_sdp("127.0.0.1", alice_port);
let bob_sdp = pcmu_sdp("127.0.0.1", bob_port);
let alice_clone = alice.clone();
let alice_sdp_clone = alice_sdp.clone();
let caller_handle =
tokio::spawn(async move { alice_clone.make_call("bob", Some(alice_sdp_clone)).await });
let mut bob_dialog_id = None;
let mut bob_offer_sdp = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, offer) = event {
bob_dialog_id = Some(id.clone());
bob_offer_sdp = offer;
bob.answer_call(&id, Some(bob_sdp.clone())).await?;
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let alice_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow!("timeout"))?
.map_err(|e| anyhow!("join: {}", e))?
.map_err(|e| anyhow!("call: {}", e))?;
let alice_answer = alice.get_negotiated_answer_sdp(&alice_id).await;
let bob_offer = bob_offer_sdp.unwrap_or_default();
let callee_target = extract_media_endpoint(&bob_offer)
.unwrap_or_else(|| format!("127.0.0.1:{}", bob_port).parse().unwrap());
let caller_target = alice_answer
.as_ref()
.and_then(|s| extract_media_endpoint(s))
.unwrap_or_else(|| format!("127.0.0.1:{}", alice_port).parse().unwrap());
alice_receiver.start_receiving();
bob_receiver.start_receiving();
let alice_packets = RtpPacket::create_sequence(50, 1000, 50000, 0xAAAA, 0, 160, 160);
let bob_packets = RtpPacket::create_sequence(50, 2000, 60000, 0xBBBB, 0, 160, 160);
alice_sender.start_sending(callee_target, alice_packets, 20);
bob_sender.start_sending(caller_target, bob_packets, 20);
sleep(Duration::from_millis(1500)).await;
alice_sender.stop();
bob_sender.stop();
let alice_stats = alice_receiver.get_stats().await;
let bob_stats = bob_receiver.get_stats().await;
info!(
alice_received = alice_stats.packets_received,
bob_received = bob_stats.packets_received,
"Direct media (None mode) results"
);
assert!(
alice_stats.packets_received > 0 || bob_stats.packets_received > 0,
"At least one side should receive RTP in None mode"
);
alice.hangup(&alice_id).await?;
sleep(Duration::from_millis(500)).await;
let records = server.cdr_capture.get_all_records().await;
assert!(!records.is_empty(), "Should have CDR");
let record = &records[0];
assert!(
matches!(record.hangup_reason, Some(CallRecordHangupReason::ByCaller)),
"Expected ByCaller, got {:?}",
record.hangup_reason
);
alice_receiver.stop();
bob_receiver.stop();
server.stop();
info!("test_p2p_direct_media_none_mode PASSED");
Ok(())
}
#[tokio::test]
async fn test_p2p_unidirectional_rtp_caller_only() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let ctx = MediaTestCtx::setup().await?;
let caller_port = ctx.caller_rtp_port();
let callee_port = ctx.callee_rtp_port();
let caller_sdp = pcmu_sdp("127.0.0.1", caller_port);
let callee_sdp = pcmu_sdp("127.0.0.1", callee_port);
let (caller_id, _callee_id, callee_offer_sdp) =
ctx.establish_call(caller_sdp, callee_sdp).await?;
info!("Call established: caller={}", caller_id);
let caller_answer_sdp = ctx
.caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow!("No answer SDP on caller side"))?;
let callee_offer = callee_offer_sdp.ok_or_else(|| anyhow!("No offer SDP on callee side"))?;
let callee_target = extract_media_endpoint(&callee_offer)
.ok_or_else(|| anyhow!("Failed to parse callee-side proxy media endpoint"))?;
let caller_target = extract_media_endpoint(&caller_answer_sdp)
.ok_or_else(|| anyhow!("Failed to parse caller-side proxy media endpoint"))?;
info!(
caller_port = caller_port,
callee_port = callee_port,
callee_target = %callee_target,
caller_target = %caller_target,
"RTP targets: caller→callee_proxy={}, callee→caller_proxy={}",
callee_target, caller_target
);
ctx.callee_receiver.start_receiving();
ctx.caller_receiver.start_receiving();
let caller_ssrc = 0xA1A1A1A1u32;
let packet_count = 100usize;
let caller_packets =
RtpPacket::create_sequence(packet_count, 1000, 50000, caller_ssrc, 0, 160, 160);
ctx.caller_sender
.start_sending(caller_target, caller_packets, 20);
sleep(Duration::from_millis(packet_count as u64 * 20 + 500)).await;
ctx.caller_sender.stop();
sleep(Duration::from_millis(200)).await;
let callee_stats = ctx.callee_receiver.get_stats().await;
let caller_stats = ctx.caller_receiver.get_stats().await;
info!(
callee_received = callee_stats.packets_received,
callee_pts = ?callee_stats.payload_types,
callee_ssrcs = ?callee_stats.ssrcs,
caller_received = caller_stats.packets_received,
"Unidirectional RTP test results (caller-only sending)"
);
assert!(
callee_stats.packets_received > 0,
"Callee should receive RTP from caller through proxy (got 0 packets) — \
unidirectional forwarding is broken"
);
assert!(
callee_stats.payload_types.contains(&0),
"Callee should receive PCMU (PT 0), got {:?}",
callee_stats.payload_types
);
assert_eq!(
caller_stats.packets_received, 0,
"Caller should NOT receive RTP when callee is silent"
);
ctx.caller_ua.hangup(&caller_id).await?;
ctx.verify_cdr(
&CdrExpectation::default()
.with_status("completed")
.with_hangup_reason(CallRecordHangupReason::ByCaller)
.with_caller("alice")
.with_callee("bob")
.with_duration_range(1, 10),
)
.await?;
info!("test_p2p_unidirectional_rtp_caller_only PASSED");
ctx.cleanup();
Ok(())
}
#[tokio::test]
async fn test_p2p_g722_codec_through_proxy() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let ctx = MediaTestCtx::setup().await?;
let caller_port = ctx.caller_rtp_port();
let callee_port = ctx.callee_rtp_port();
let caller_sdp = build_sdp(
"127.0.0.1",
caller_port,
&[(9, "G722/8000"), (101, "telephone-event/8000")],
);
let callee_sdp = build_sdp(
"127.0.0.1",
callee_port,
&[(9, "G722/8000"), (101, "telephone-event/8000")],
);
let (caller_id, _callee_id, callee_offer_sdp) =
ctx.establish_call(caller_sdp, callee_sdp).await?;
let caller_answer_sdp = ctx
.caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow!("No answer SDP on caller side"))?;
let callee_offer = callee_offer_sdp.ok_or_else(|| anyhow!("No offer SDP on callee side"))?;
let callee_target = extract_media_endpoint(&callee_offer)
.ok_or_else(|| anyhow!("Failed to parse callee proxy endpoint"))?;
let caller_target = extract_media_endpoint(&caller_answer_sdp)
.ok_or_else(|| anyhow!("Failed to parse caller proxy endpoint"))?;
let (caller_stats, callee_stats) = ctx.exchange_rtp(caller_target, callee_target, 9, 160, 2000).await?;
info!(
caller_received = caller_stats.packets_received,
caller_pts = ?caller_stats.payload_types,
callee_received = callee_stats.packets_received,
callee_pts = ?callee_stats.payload_types,
"G722 codec test results"
);
assert!(
callee_stats.packets_received > 0,
"Callee should receive G722 packets"
);
assert!(
caller_stats.packets_received > 0,
"Caller should receive G722 packets"
);
assert!(
callee_stats.payload_types.contains(&9),
"Callee received wrong codec (expected PT 9 G722): {:?}",
callee_stats.payload_types
);
assert!(
caller_stats.payload_types.contains(&9),
"Caller received wrong codec (expected PT 9 G722): {:?}",
caller_stats.payload_types
);
ctx.caller_ua.hangup(&caller_id).await?;
ctx.verify_cdr(
&CdrExpectation::default()
.with_status("completed")
.with_hangup_reason(CallRecordHangupReason::ByCaller),
)
.await?;
info!("test_p2p_g722_codec_through_proxy PASSED");
ctx.cleanup();
Ok(())
}
#[tokio::test]
async fn test_p2p_g729_codec_through_proxy() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let ctx = MediaTestCtx::setup().await?;
let caller_port = ctx.caller_rtp_port();
let callee_port = ctx.callee_rtp_port();
let caller_sdp = build_sdp(
"127.0.0.1",
caller_port,
&[
(18, "G729/8000"),
(0, "PCMU/8000"),
(101, "telephone-event/8000"),
],
);
let callee_sdp = build_sdp(
"127.0.0.1",
callee_port,
&[
(18, "G729/8000"),
(0, "PCMU/8000"),
(101, "telephone-event/8000"),
],
);
let (caller_id, _callee_id, callee_offer_sdp) =
ctx.establish_call(caller_sdp, callee_sdp).await?;
let caller_answer_sdp = ctx
.caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow!("No answer SDP on caller side"))?;
let callee_offer = callee_offer_sdp.ok_or_else(|| anyhow!("No offer SDP on callee side"))?;
let callee_target = extract_media_endpoint(&callee_offer)
.ok_or_else(|| anyhow!("Failed to parse callee proxy endpoint"))?;
let caller_target = extract_media_endpoint(&caller_answer_sdp)
.ok_or_else(|| anyhow!("Failed to parse caller proxy endpoint"))?;
let (caller_stats, callee_stats) = ctx
.exchange_rtp(caller_target, callee_target, 18, 20, 2000)
.await?;
info!(
caller_received = caller_stats.packets_received,
caller_pts = ?caller_stats.payload_types,
callee_received = callee_stats.packets_received,
callee_pts = ?callee_stats.payload_types,
"G729 codec test results"
);
assert!(
callee_stats.packets_received > 0,
"Callee should receive G729 packets"
);
assert!(
caller_stats.packets_received > 0,
"Caller should receive G729 packets"
);
assert!(
callee_stats.payload_types.contains(&18) || callee_stats.payload_types.contains(&0),
"Callee should receive G729(18) or PCMU(0), got: {:?}",
callee_stats.payload_types
);
assert!(
caller_stats.payload_types.contains(&18) || caller_stats.payload_types.contains(&0),
"Caller should receive G729(18) or PCMU(0), got: {:?}",
caller_stats.payload_types
);
ctx.caller_ua.hangup(&caller_id).await?;
ctx.verify_cdr(
&CdrExpectation::default()
.with_status("completed")
.with_hangup_reason(CallRecordHangupReason::ByCaller),
)
.await?;
info!("test_p2p_g729_codec_through_proxy PASSED");
ctx.cleanup();
Ok(())
}
#[tokio::test]
async fn test_p2p_multi_codec_offer_call_establishes() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let alice_receiver = RtpReceiver::bind(0).await?;
let bob_receiver = RtpReceiver::bind(0).await?;
let alice_sender = RtpSender::bind().await?;
let bob_sender = RtpSender::bind().await?;
let alice_port = alice_receiver.port()?;
let bob_port = bob_receiver.port()?;
let alice_sdp = build_sdp(
"127.0.0.1",
alice_port,
&[
(9, "G722/8000"),
(0, "PCMU/8000"),
(8, "PCMA/8000"),
(101, "telephone-event/8000"),
],
);
let bob_sdp = build_sdp(
"127.0.0.1",
bob_port,
&[
(0, "PCMU/8000"),
(9, "G722/8000"),
(8, "PCMA/8000"),
(101, "telephone-event/8000"),
],
);
let alice_clone = alice.clone();
let alice_sdp_clone = alice_sdp.clone();
let caller_handle =
tokio::spawn(async move { alice_clone.make_call("bob", Some(alice_sdp_clone)).await });
let mut bob_dialog_id = None;
let mut bob_offer_sdp = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, offer) = event {
bob_dialog_id = Some(id.clone());
bob_offer_sdp = offer;
bob.answer_call(&id, Some(bob_sdp.clone())).await?;
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let alice_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow!("timeout"))?
.map_err(|e| anyhow!("join: {}", e))?
.map_err(|e| anyhow!("call: {}", e))?;
let alice_answer = alice.get_negotiated_answer_sdp(&alice_id).await;
info!(
"Multi-codec: alice answer SDP:\n{:?}",
alice_answer.as_ref().map(|s| s.chars().take(300).collect::<String>())
);
info!(
"Multi-codec: bob offer SDP:\n{:?}",
bob_offer_sdp.as_ref().map(|s| s.chars().take(300).collect::<String>())
);
let alice_answer_sdp = alice_answer.unwrap_or_default();
let bob_offer_sdp = bob_offer_sdp.unwrap_or_default();
let callee_target = extract_media_endpoint(&bob_offer_sdp)
.unwrap_or_else(|| format!("127.0.0.1:{}", bob_port).parse().unwrap());
let caller_target = extract_media_endpoint(&alice_answer_sdp)
.unwrap_or_else(|| format!("127.0.0.1:{}", alice_port).parse().unwrap());
alice_receiver.start_receiving();
bob_receiver.start_receiving();
let alice_packets = RtpPacket::create_sequence(50, 1000, 50000, 0xAAAA, 0, 160, 160);
let bob_packets = RtpPacket::create_sequence(50, 2000, 60000, 0xBBBB, 0, 160, 160);
alice_sender.start_sending(callee_target, alice_packets, 20);
bob_sender.start_sending(caller_target, bob_packets, 20);
sleep(Duration::from_millis(1500)).await;
alice_sender.stop();
bob_sender.stop();
let alice_stats = alice_receiver.get_stats().await;
let bob_stats = bob_receiver.get_stats().await;
info!(
alice_received = alice_stats.packets_received,
alice_pts = ?alice_stats.payload_types,
bob_received = bob_stats.packets_received,
bob_pts = ?bob_stats.payload_types,
"Multi-codec test results"
);
assert!(
alice_stats.packets_received > 0 || bob_stats.packets_received > 0,
"Multi-codec call should have bidirectional RTP"
);
alice.hangup(&alice_id).await?;
sleep(Duration::from_millis(500)).await;
let records = server.cdr_capture.get_all_records().await;
assert!(!records.is_empty(), "Should have CDR");
assert_eq!(
records[0].details.status, "completed",
"Call should complete"
);
alice_receiver.stop();
bob_receiver.stop();
server.stop();
info!("test_p2p_multi_codec_offer_call_establishes PASSED");
Ok(())
}
#[tokio::test]
async fn test_p2p_g722_bidirectional_integrity() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let ctx = MediaTestCtx::setup().await?;
let caller_port = ctx.caller_rtp_port();
let callee_port = ctx.callee_rtp_port();
let g722_sdp = |ip: &str, port: u16| -> String {
build_sdp(ip, port, &[(9, "G722/8000"), (101, "telephone-event/8000")])
};
let caller_sdp = g722_sdp("127.0.0.1", caller_port);
let callee_sdp = g722_sdp("127.0.0.1", callee_port);
let (caller_id, _callee_id, callee_offer_sdp) =
ctx.establish_call(caller_sdp, callee_sdp).await?;
let caller_answer_sdp = ctx
.caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow!("No answer SDP on caller side"))?;
let callee_offer = callee_offer_sdp.ok_or_else(|| anyhow!("No offer SDP on callee side"))?;
let callee_target = extract_media_endpoint(&callee_offer)
.ok_or_else(|| anyhow!("Failed to parse callee proxy endpoint"))?;
let caller_target = extract_media_endpoint(&caller_answer_sdp)
.ok_or_else(|| anyhow!("Failed to parse caller proxy endpoint"))?;
let (caller_stats, callee_stats) = ctx
.exchange_rtp(caller_target, callee_target, 9, 160, 3000)
.await?;
let callee_loss = callee_stats.packet_loss_rate();
let caller_loss = caller_stats.packet_loss_rate();
info!(
caller_received = caller_stats.packets_received,
caller_loss = format!("{:.1}%", caller_loss * 100.0),
callee_received = callee_stats.packets_received,
callee_loss = format!("{:.1}%", callee_loss * 100.0),
"G722 bidirectional integrity results"
);
assert!(
callee_stats.packets_received > 50,
"Callee should receive substantial G722 packets (got {})",
callee_stats.packets_received
);
assert!(
caller_stats.packets_received > 50,
"Caller should receive substantial G722 packets (got {})",
caller_stats.packets_received
);
assert!(
callee_loss < 0.10,
"G722 callee packet loss should be < 10%, got {:.1}%",
callee_loss * 100.0
);
assert!(
caller_loss < 0.10,
"G722 caller packet loss should be < 10%, got {:.1}%",
caller_loss * 100.0
);
ctx.caller_ua.hangup(&caller_id).await?;
ctx.verify_cdr(
&CdrExpectation::default()
.with_status("completed")
.with_hangup_reason(CallRecordHangupReason::ByCaller),
)
.await?;
info!("test_p2p_g722_bidirectional_integrity PASSED");
ctx.cleanup();
Ok(())
}
#[tokio::test]
async fn test_p2p_opus_dynamic_pt_sdp_negotiation() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let bob_port = portpicker::pick_unused_port().unwrap();
let alice_sdp = build_sdp(
"127.0.0.1",
portpicker::pick_unused_port().unwrap(),
&[
(96, "opus/48000/2"),
(0, "PCMU/8000"),
(101, "telephone-event/8000"),
],
);
let bob_sdp = build_sdp(
"127.0.0.1",
bob_port,
&[
(96, "opus/48000/2"),
(0, "PCMU/8000"),
(101, "telephone-event/8000"),
],
);
let alice_clone = alice.clone();
let alice_sdp_clone = alice_sdp.clone();
let caller_handle =
tokio::spawn(async move { alice_clone.make_call("bob", Some(alice_sdp_clone)).await });
let mut bob_dialog_id = None;
let mut bob_offer_sdp = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, offer) = event {
bob_dialog_id = Some(id.clone());
bob_offer_sdp = offer;
bob.answer_call(&id, Some(bob_sdp.clone())).await?;
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let alice_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow!("timeout"))?
.map_err(|e| anyhow!("join: {}", e))?
.map_err(|e| anyhow!("call: {}", e))?;
let alice_answer = alice
.get_negotiated_answer_sdp(&alice_id)
.await
.ok_or_else(|| anyhow!("No answer SDP"))?;
let bob_offer = bob_offer_sdp.ok_or_else(|| anyhow!("No offer SDP"))?;
assert!(
alice_answer.contains("opus/48000") || alice_answer.contains("PCMU/8000"),
"Proxy answer must include at least one negotiated codec (opus or PCMU)"
);
assert!(
bob_offer.contains("PCMU/8000") || bob_offer.contains("opus/48000"),
"Proxy offer to callee must include negotiated codecs"
);
assert!(!alice_answer.is_empty(), "Caller answer SDP must not be empty");
assert!(!bob_offer.is_empty(), "Callee offer SDP must not be empty");
alice.hangup(&alice_id).await?;
sleep(Duration::from_millis(500)).await;
let records = server.cdr_capture.get_all_records().await;
assert!(!records.is_empty(), "Should have CDR");
server.stop();
info!("test_p2p_opus_dynamic_pt_sdp_negotiation PASSED");
Ok(())
}
#[tokio::test]
async fn test_sdp_answer_filters_to_offered_codecs() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let ctx = MediaTestCtx::setup().await?;
let caller_port = ctx.caller_rtp_port();
let callee_port = ctx.callee_rtp_port();
let caller_sdp = pcmu_sdp("127.0.0.1", caller_port);
let callee_sdp = pcmu_sdp("127.0.0.1", callee_port);
let (caller_id, _callee_id, callee_offer_sdp) =
ctx.establish_call(caller_sdp, callee_sdp).await?;
let caller_answer_sdp = ctx
.caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow!("No answer SDP on caller side"))?;
assert!(
caller_answer_sdp.contains("PCMU") || caller_answer_sdp.contains("rtpmap:0"),
"Answer must contain PCMU"
);
if let Some(callee_offer) = callee_offer_sdp {
info!("Callee offer SDP (from proxy):\n{}", callee_offer);
}
ctx.caller_ua.hangup(&caller_id).await.ok();
ctx.cleanup();
info!("test_sdp_answer_filters_to_offered_codecs PASSED");
Ok(())
}