use super::e2e_test_server::E2eTestServer;
use super::rtp_utils::{RtpPacket, RtpReceiver, RtpSender, RtpStats, extract_media_endpoint};
use super::test_helpers;
use super::test_ua::TestUaEvent;
use crate::callrecord::CallRecordHangupReason;
use crate::config::MediaProxyMode;
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{info, warn};
use test_helpers::{pcma_sdp, pcmu_sdp};
struct EstablishedCall {
caller_id: rsipstack::dialog::DialogId,
callee_id: rsipstack::dialog::DialogId,
caller_target: std::net::SocketAddr,
callee_target: std::net::SocketAddr,
}
async fn establish_call(
server: &E2eTestServer,
caller: &str,
callee: &str,
caller_rtp_port: u16,
callee_rtp_port: u16,
) -> Result<(
EstablishedCall,
super::test_ua::TestUa,
super::test_ua::TestUa,
)> {
let caller_ua = Arc::new(server.create_ua(caller).await?);
let callee_ua = server.create_ua(callee).await?;
sleep(Duration::from_millis(100)).await;
let caller_sdp = pcmu_sdp("127.0.0.1", caller_rtp_port);
let callee_sdp = pcmu_sdp("127.0.0.1", callee_rtp_port);
let caller_clone = caller_ua.clone();
let callee_str = callee.to_string();
let caller_handle =
crate::utils::spawn(
async move { caller_clone.make_call(&callee_str, Some(caller_sdp)).await },
);
let mut callee_dialog_id = None;
let mut callee_offer_sdp: Option<String> = None;
for _ in 0..50 {
let events = callee_ua.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, offer) = event {
callee_dialog_id = Some(id.clone());
callee_offer_sdp = offer;
callee_ua.answer_call(&id, Some(callee_sdp.clone())).await?;
break;
}
}
if callee_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let callee_id =
callee_dialog_id.ok_or_else(|| anyhow::anyhow!("Callee never received INVITE"))?;
let caller_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow::anyhow!("Caller timed out"))?
.map_err(|e| anyhow::anyhow!("Join error: {}", e))?
.map_err(|e| anyhow::anyhow!("Call error: {}", e))?;
let caller_answer = caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow::anyhow!("No answer SDP on caller"))?;
let callee_offer = callee_offer_sdp.ok_or_else(|| anyhow::anyhow!("No offer SDP on callee"))?;
let callee_target = extract_media_endpoint(&callee_offer)
.ok_or_else(|| anyhow::anyhow!("Failed to parse callee proxy endpoint"))?;
let caller_target = extract_media_endpoint(&caller_answer)
.ok_or_else(|| anyhow::anyhow!("Failed to parse caller proxy endpoint"))?;
Ok((
EstablishedCall {
caller_id,
callee_id,
caller_target,
callee_target,
},
Arc::try_unwrap(caller_ua).unwrap_or_else(|_| panic!("caller_ua still has refs")),
callee_ua,
))
}
#[allow(clippy::too_many_arguments)]
async fn exchange_rtp(
caller_sender: &RtpSender,
callee_sender: &RtpSender,
caller_receiver: &RtpReceiver,
callee_receiver: &RtpReceiver,
caller_target: std::net::SocketAddr,
callee_target: std::net::SocketAddr,
payload_type: u8,
duration_ms: u64,
) -> Result<(RtpStats, RtpStats)> {
caller_receiver.start_receiving();
callee_receiver.start_receiving();
let packet_count = (duration_ms / 20) as usize;
let caller_ssrc = 0xA1A1A1A1u32;
let callee_ssrc = 0xB2B2B2B2u32;
let caller_packets = RtpPacket::create_sequence(
packet_count,
1000,
50000,
caller_ssrc,
payload_type,
160,
160,
);
let callee_packets = RtpPacket::create_sequence(
packet_count,
2000,
60000,
callee_ssrc,
payload_type,
160,
160,
);
caller_sender.start_sending(callee_target, caller_packets, 20);
callee_sender.start_sending(caller_target, callee_packets, 20);
sleep(Duration::from_millis(duration_ms + 500)).await;
caller_sender.stop();
callee_sender.stop();
sleep(Duration::from_millis(200)).await;
let caller_stats = caller_receiver.get_stats().await;
let callee_stats = callee_receiver.get_stats().await;
Ok((caller_stats, callee_stats))
}
async fn wait_for_cdr(server: &E2eTestServer, timeout_ms: u64) -> Result<()> {
sleep(Duration::from_millis(timeout_ms)).await;
let records = server.cdr_capture.get_all_records().await;
assert!(!records.is_empty(), "Should have at least one CDR record");
let record = &records[0];
info!(
call_id = %record.call_id,
status = %record.details.status,
direction = %record.details.direction,
hangup_reason = ?record.hangup_reason,
caller = %record.caller,
callee = %record.callee,
sip_trunk_id = ?record.details.sip_trunk_id,
sip_gateway = ?record.details.sip_gateway,
"CDR record"
);
Ok(())
}
async fn send_dtmf_and_wait(
sender: Arc<super::test_ua::TestUa>,
dialog_id: rsipstack::dialog::DialogId,
receiver: &super::test_ua::TestUa,
expected_digit: &str,
timeout: Duration,
) -> Result<bool> {
let digit_for_send = expected_digit.to_string();
let send_handle =
crate::utils::spawn(
async move { sender.send_dtmf_info(&dialog_id, &digit_for_send).await },
);
let deadline = tokio::time::Instant::now() + timeout;
let mut received = false;
while tokio::time::Instant::now() < deadline {
let events = receiver.process_dialog_events().await?;
for event in events {
if let TestUaEvent::DtmfInfo(_, digit) = event
&& digit == expected_digit
{
received = true;
break;
}
}
if received {
break;
}
sleep(Duration::from_millis(50)).await;
}
let send_result = tokio::time::timeout(Duration::from_secs(5), send_handle)
.await
.map_err(|_| anyhow::anyhow!("SIP INFO send timed out"))?;
send_result.map_err(|e| anyhow::anyhow!("SIP INFO send task failed: {}", e))??;
Ok(received)
}
#[tokio::test]
async fn test_trunk_b2bua_inbound_caller_hangup_rtp_cdr() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let caller_receiver = RtpReceiver::bind(0).await?;
let callee_receiver = RtpReceiver::bind(0).await?;
let caller_sender = RtpSender::bind().await?;
let callee_sender = RtpSender::bind().await?;
let caller_port = caller_receiver.port()?;
let callee_port = callee_receiver.port()?;
let (call, caller_ua, _callee_ua) =
establish_call(&server, "alice", "bob", caller_port, callee_port).await?;
info!("Wholesale inbound call established");
let (caller_stats, callee_stats) = exchange_rtp(
&caller_sender,
&callee_sender,
&caller_receiver,
&callee_receiver,
call.caller_target,
call.callee_target,
0,
2000,
)
.await?;
info!(
caller_received = caller_stats.packets_received,
callee_received = callee_stats.packets_received,
"Wholesale RTP results"
);
assert!(
callee_stats.packets_received > 0,
"Callee should receive RTP from caller"
);
assert!(
caller_stats.packets_received > 0,
"Caller should receive RTP from callee"
);
assert!(
callee_stats.payload_types.contains(&0),
"Callee should see PCMU (PT 0), got {:?}",
callee_stats.payload_types
);
assert!(
caller_stats.payload_types.contains(&0),
"Caller should see PCMU (PT 0), got {:?}",
caller_stats.payload_types
);
assert!(
callee_stats.packet_loss_rate() < 0.10,
"Callee packet loss too high: {:.1}%",
callee_stats.packet_loss_rate() * 100.0
);
caller_ua.hangup(&call.caller_id).await?;
wait_for_cdr(&server, 800).await?;
let records = server.cdr_capture.get_all_records().await;
let record = &records[0];
assert_eq!(record.details.status, "completed");
assert!(
matches!(record.hangup_reason, Some(CallRecordHangupReason::ByCaller)),
"Expected ByCaller, got {:?}",
record.hangup_reason
);
caller_receiver.stop();
callee_receiver.stop();
server.stop();
info!("test_trunk_b2bua_inbound_caller_hangup_rtp_cdr PASSED");
Ok(())
}
#[tokio::test]
async fn test_trunk_b2bua_inbound_user_hangup_rtp_cdr() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let caller_receiver = RtpReceiver::bind(0).await?;
let callee_receiver = RtpReceiver::bind(0).await?;
let caller_sender = RtpSender::bind().await?;
let callee_sender = RtpSender::bind().await?;
let caller_port = caller_receiver.port()?;
let callee_port = callee_receiver.port()?;
let (call, _caller_ua, callee_ua) =
establish_call(&server, "alice", "bob", caller_port, callee_port).await?;
let (caller_stats, callee_stats) = exchange_rtp(
&caller_sender,
&callee_sender,
&caller_receiver,
&callee_receiver,
call.caller_target,
call.callee_target,
0,
1500,
)
.await?;
assert!(
callee_stats.packets_received > 0,
"Callee should receive RTP"
);
assert!(
caller_stats.packets_received > 0,
"Caller should receive RTP"
);
callee_ua.hangup(&call.callee_id).await?;
wait_for_cdr(&server, 800).await?;
let records = server.cdr_capture.get_all_records().await;
let record = &records[0];
assert_eq!(record.details.status, "completed");
assert!(
matches!(record.hangup_reason, Some(CallRecordHangupReason::ByCallee)),
"Expected ByCallee, got {:?}",
record.hangup_reason
);
caller_receiver.stop();
callee_receiver.stop();
server.stop();
info!("test_trunk_b2bua_inbound_user_hangup_rtp_cdr PASSED");
Ok(())
}
#[tokio::test]
async fn test_trunk_b2bua_reject_486_cdr() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = Arc::new(server.create_ua("bob").await?);
sleep(Duration::from_millis(100)).await;
let sdp = pcmu_sdp("127.0.0.1", 12345);
let alice_clone = alice.clone();
let sdp_clone = sdp.clone();
let caller_handle =
crate::utils::spawn(async move { alice_clone.make_call("bob", Some(sdp_clone)).await });
let mut bob_rejected = false;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob.reject_call_with_reason(&id, Some(486), Some("Busy Here".to_string()))
.await?;
bob_rejected = true;
info!("Bob rejected with 486 Busy Here");
break;
}
}
if bob_rejected {
break;
}
sleep(Duration::from_millis(100)).await;
}
assert!(bob_rejected, "Bob should receive the call");
let call_result = tokio::time::timeout(Duration::from_secs(5), caller_handle).await;
match call_result {
Ok(Ok(Err(e))) => {
let err_str = e.to_string();
assert!(
err_str.contains("486"),
"Alice should get 486, got: {}",
err_str
);
}
Ok(Ok(Ok(_))) => panic!("Call should have been rejected"),
_ => {}
}
sleep(Duration::from_millis(1500)).await;
let records = server.cdr_capture.get_all_records().await;
if !records.is_empty() {
let record = &records[0];
info!(
status = %record.details.status,
hangup_reason = ?record.hangup_reason,
"Wholesale reject CDR"
);
assert_eq!(record.details.status, "failed");
assert!(
matches!(
record.hangup_reason,
Some(CallRecordHangupReason::Rejected) | Some(CallRecordHangupReason::Failed)
),
"Expected Rejected or Failed, got {:?}",
record.hangup_reason
);
} else {
warn!("No CDR for rejected wholesale call — 486 passthrough verified at SIP level");
}
server.stop();
info!("test_trunk_b2bua_reject_486_cdr PASSED");
Ok(())
}
#[tokio::test]
async fn test_trunk_b2bua_pcma_rtp_cdr() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let caller_receiver = RtpReceiver::bind(0).await?;
let callee_receiver = RtpReceiver::bind(0).await?;
let caller_sender = RtpSender::bind().await?;
let callee_sender = RtpSender::bind().await?;
let caller_port = caller_receiver.port()?;
let callee_port = callee_receiver.port()?;
let caller_ua = Arc::new(server.create_ua("alice").await?);
let callee_ua = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let caller_sdp = pcma_sdp("127.0.0.1", caller_port);
let callee_sdp = pcma_sdp("127.0.0.1", callee_port);
let caller_clone = caller_ua.clone();
let caller_handle =
crate::utils::spawn(async move { caller_clone.make_call("bob", Some(caller_sdp)).await });
let mut callee_dialog_id = None;
let mut callee_offer_sdp: Option<String> = None;
for _ in 0..50 {
let events = callee_ua.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, offer) = event {
callee_dialog_id = Some(id.clone());
callee_offer_sdp = offer;
callee_ua.answer_call(&id, Some(callee_sdp.clone())).await?;
break;
}
}
if callee_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let _callee_id = callee_dialog_id.ok_or_else(|| anyhow::anyhow!("No INVITE"))?;
let caller_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow::anyhow!("timeout"))?
.map_err(|e| anyhow::anyhow!("join: {}", e))?
.map_err(|e| anyhow::anyhow!("call: {}", e))?;
let caller_answer = caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow::anyhow!("No answer SDP"))?;
let callee_offer = callee_offer_sdp.ok_or_else(|| anyhow::anyhow!("No offer SDP"))?;
let callee_target = extract_media_endpoint(&callee_offer)
.ok_or_else(|| anyhow::anyhow!("No callee endpoint"))?;
let caller_target = extract_media_endpoint(&caller_answer)
.ok_or_else(|| anyhow::anyhow!("No caller endpoint"))?;
let (caller_stats, callee_stats) = exchange_rtp(
&caller_sender,
&callee_sender,
&caller_receiver,
&callee_receiver,
caller_target,
callee_target,
8,
2000,
)
.await?;
info!(
caller_received = caller_stats.packets_received,
caller_pts = ?caller_stats.payload_types,
callee_received = callee_stats.packets_received,
callee_pts = ?callee_stats.payload_types,
"PCMA wholesale results"
);
assert!(
callee_stats.packets_received > 0,
"Callee should receive PCMA RTP"
);
assert!(
caller_stats.packets_received > 0,
"Caller should receive PCMA RTP"
);
assert!(
callee_stats.payload_types.contains(&8),
"Callee should see PCMA (PT 8), got {:?}",
callee_stats.payload_types
);
assert!(
caller_stats.payload_types.contains(&8),
"Caller should see PCMA (PT 8), got {:?}",
caller_stats.payload_types
);
caller_ua.hangup(&caller_id).await?;
wait_for_cdr(&server, 800).await?;
let records = server.cdr_capture.get_all_records().await;
let record = &records[0];
assert_eq!(record.details.status, "completed");
assert!(matches!(
record.hangup_reason,
Some(CallRecordHangupReason::ByCaller)
));
caller_receiver.stop();
callee_receiver.stop();
server.stop();
info!("test_trunk_b2bua_pcma_rtp_cdr PASSED");
Ok(())
}
#[tokio::test]
async fn test_trunk_b2bua_cdr_duration_accuracy() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let sdp = pcmu_sdp("127.0.0.1", 12345);
let alice_clone = alice.clone();
let sdp_clone = sdp.clone();
let caller_handle =
crate::utils::spawn(async move { alice_clone.make_call("bob", Some(sdp_clone)).await });
let mut bob_dialog_id = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id = Some(id.clone());
bob.answer_call(&id, Some(sdp.clone())).await?;
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let _bob_id = bob_dialog_id.ok_or_else(|| anyhow::anyhow!("No INVITE"))?;
let alice_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow::anyhow!("timeout"))?
.map_err(|e| anyhow::anyhow!("join: {}", e))?
.map_err(|e| anyhow::anyhow!("call: {}", e))?;
sleep(Duration::from_secs(2)).await;
alice.hangup(&alice_id).await?;
sleep(Duration::from_millis(800)).await;
let records = server.cdr_capture.get_all_records().await;
assert!(!records.is_empty(), "Should have CDR");
let record = &records[0];
let duration_secs = (record.end_time - record.start_time).num_seconds();
info!(duration_secs, status = %record.details.status, "CDR duration");
assert!(
(1..=5).contains(&duration_secs),
"Duration should be ~2s, got {}s",
duration_secs
);
assert_eq!(record.details.status, "completed");
assert!(matches!(
record.hangup_reason,
Some(CallRecordHangupReason::ByCaller)
));
server.stop();
info!("test_trunk_b2bua_cdr_duration_accuracy PASSED");
Ok(())
}
#[tokio::test]
async fn test_trunk_b2bua_rtp_payload_integrity() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let caller_receiver = RtpReceiver::bind(0).await?;
let callee_receiver = RtpReceiver::bind(0).await?;
let caller_sender = RtpSender::bind().await?;
let callee_sender = RtpSender::bind().await?;
let caller_port = caller_receiver.port()?;
let callee_port = callee_receiver.port()?;
let caller_ua = Arc::new(server.create_ua("alice").await?);
let callee_ua = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let caller_sdp = pcmu_sdp("127.0.0.1", caller_port);
let callee_sdp = pcmu_sdp("127.0.0.1", callee_port);
let caller_clone = caller_ua.clone();
let caller_handle =
crate::utils::spawn(async move { caller_clone.make_call("bob", Some(caller_sdp)).await });
let mut callee_dialog_id = None;
let mut callee_offer_sdp: Option<String> = None;
for _ in 0..50 {
let events = callee_ua.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, offer) = event {
callee_dialog_id = Some(id.clone());
callee_offer_sdp = offer;
callee_ua.answer_call(&id, Some(callee_sdp.clone())).await?;
break;
}
}
if callee_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let _callee_id = callee_dialog_id.ok_or_else(|| anyhow::anyhow!("No INVITE"))?;
let caller_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow::anyhow!("timeout"))?
.map_err(|e| anyhow::anyhow!("join: {}", e))?
.map_err(|e| anyhow::anyhow!("call: {}", e))?;
let caller_answer = caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow::anyhow!("No answer SDP"))?;
let callee_offer = callee_offer_sdp.ok_or_else(|| anyhow::anyhow!("No offer SDP"))?;
let callee_target = extract_media_endpoint(&callee_offer)
.ok_or_else(|| anyhow::anyhow!("No callee endpoint"))?;
let caller_target = extract_media_endpoint(&caller_answer)
.ok_or_else(|| anyhow::anyhow!("No caller endpoint"))?;
caller_receiver.start_receiving();
callee_receiver.start_receiving();
let dummy_packets = RtpPacket::create_sequence(50, 7000, 80000, 0xBBBBBBBB, 0, 160, 160);
callee_sender.start_sending(caller_target, dummy_packets, 20);
sleep(Duration::from_millis(200)).await;
let mut test_packets = Vec::new();
for i in 0..50u16 {
let mut payload = vec![0u8; 160];
payload[0] = (i >> 8) as u8;
payload[1] = (i & 0xFF) as u8;
payload[2] = 0xDE;
payload[3] = 0xAD;
for (j, byte) in payload.iter_mut().enumerate().skip(4) {
*byte = ((i as u8).wrapping_add(j as u8)) ^ 0x55;
}
test_packets.push(RtpPacket::new(
0,
5000 + i,
100000 + (i as u32) * 160,
0xCAFEBABE,
payload,
));
}
caller_sender.start_sending(callee_target, test_packets, 20);
sleep(Duration::from_millis(1500)).await;
caller_sender.stop();
callee_sender.stop();
sleep(Duration::from_millis(200)).await;
let callee_stats = callee_receiver.get_stats().await;
info!(
received = callee_stats.packets_received,
pts = ?callee_stats.payload_types,
ssrcs = ?callee_stats.ssrcs,
"Payload integrity results"
);
assert!(
callee_stats.packets_received > 0,
"Callee should receive RTP through proxy"
);
assert!(
callee_stats.payload_types.contains(&0),
"Callee should see PT 0 (PCMU), got {:?}",
callee_stats.payload_types
);
if !callee_stats.ssrcs.contains(&0xCAFEBABE) {
warn!(
"Proxy rewrote SSRC: expected 0xCAFEBABE, got {:?} (expected for B2BUA)",
callee_stats.ssrcs
);
}
caller_ua.hangup(&caller_id).await.ok();
wait_for_cdr(&server, 800).await?;
let records = server.cdr_capture.get_all_records().await;
let record = &records[0];
assert_eq!(record.details.status, "completed");
caller_receiver.stop();
callee_receiver.stop();
server.stop();
info!("test_trunk_b2bua_rtp_payload_integrity PASSED");
Ok(())
}
#[tokio::test]
async fn test_trunk_b2bua_two_concurrent_calls() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server1 = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let server2 = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice1 = Arc::new(server1.create_ua("alice").await?);
let bob1 = server1.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let sdp = pcmu_sdp("127.0.0.1", 12345);
let alice1_c = alice1.clone();
let sdp1 = sdp.clone();
let h1 = crate::utils::spawn(async move { alice1_c.make_call("bob", Some(sdp1)).await });
let mut bob1_id = None;
for _ in 0..50 {
let events = bob1.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob1_id = Some(id.clone());
bob1.answer_call(&id, Some(sdp.clone())).await?;
break;
}
}
if bob1_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let alice1_id = tokio::time::timeout(Duration::from_secs(5), h1)
.await
.map_err(|_| anyhow::anyhow!("Call 1 timeout"))?
.map_err(|e| anyhow::anyhow!("Call 1 join: {}", e))?
.map_err(|e| anyhow::anyhow!("Call 1 failed: {}", e))?;
let alice2 = Arc::new(server2.create_ua("alice").await?);
let bob2 = server2.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let alice2_c = alice2.clone();
let sdp2 = sdp.clone();
let h2 = crate::utils::spawn(async move { alice2_c.make_call("bob", Some(sdp2)).await });
let mut bob2_id = None;
for _ in 0..50 {
let events = bob2.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob2_id = Some(id.clone());
bob2.answer_call(&id, Some(sdp.clone())).await?;
break;
}
}
if bob2_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let _alice2_id = tokio::time::timeout(Duration::from_secs(5), h2)
.await
.map_err(|_| anyhow::anyhow!("Call 2 timeout"))?
.map_err(|e| anyhow::anyhow!("Call 2 join: {}", e))?
.map_err(|e| anyhow::anyhow!("Call 2 failed: {}", e))?;
sleep(Duration::from_millis(300)).await;
alice1.hangup(&alice1_id).await?;
if let Some(ref id) = bob2_id {
bob2.hangup(id).await?;
}
sleep(Duration::from_millis(800)).await;
let records1 = server1.cdr_capture.get_all_records().await;
assert!(!records1.is_empty(), "Server 1 should have CDR");
assert_eq!(
records1[0].hangup_reason,
Some(CallRecordHangupReason::ByCaller),
"Call 1 should be ByCaller"
);
let records2 = server2.cdr_capture.get_all_records().await;
assert!(!records2.is_empty(), "Server 2 should have CDR");
assert_eq!(
records2[0].hangup_reason,
Some(CallRecordHangupReason::ByCallee),
"Call 2 should be ByCallee"
);
server1.stop();
server2.stop();
info!("test_trunk_b2bua_two_concurrent_calls PASSED");
Ok(())
}
#[tokio::test]
async fn test_trunk_b2bua_no_answer() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = Arc::new(server.create_ua("bob").await?);
sleep(Duration::from_millis(100)).await;
let sdp = pcmu_sdp("127.0.0.1", 12345);
let alice_clone = alice.clone();
let sdp_clone = sdp.clone();
let caller_handle = crate::utils::spawn(async move {
tokio::time::timeout(
Duration::from_secs(3),
alice_clone.make_call("bob", Some(sdp_clone)),
)
.await
});
let mut bob_received = false;
for _ in 0..30 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_received = true;
info!("Bob received INVITE but won't answer");
let _ = id;
}
}
if bob_received {
break;
}
sleep(Duration::from_millis(100)).await;
}
let _ = tokio::time::timeout(Duration::from_secs(5), caller_handle).await;
sleep(Duration::from_millis(800)).await;
let records = server.cdr_capture.get_all_records().await;
if !records.is_empty() {
let record = &records[0];
info!(status = %record.details.status, "No-answer CDR");
assert_ne!(record.details.status, "completed");
}
server.stop();
info!("test_trunk_b2bua_no_answer PASSED");
Ok(())
}
#[tokio::test]
async fn test_trunk_b2bua_dtmf_info_passthrough() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let sdp = pcmu_sdp("127.0.0.1", 12345);
let alice_clone = alice.clone();
let sdp_clone = sdp.clone();
let caller_handle =
crate::utils::spawn(async move { alice_clone.make_call("bob", Some(sdp_clone)).await });
let mut bob_dialog_id = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id = Some(id.clone());
bob.answer_call(&id, Some(sdp.clone())).await?;
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let _bob_id = bob_dialog_id.ok_or_else(|| anyhow::anyhow!("No INVITE"))?;
let alice_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow::anyhow!("timeout"))?
.map_err(|e| anyhow::anyhow!("join: {}", e))?
.map_err(|e| anyhow::anyhow!("call: {}", e))?;
let digits = ["1", "2", "3", "#"];
let mut received_digits: Vec<String> = Vec::new();
for digit in &digits {
if send_dtmf_and_wait(
alice.clone(),
alice_id.clone(),
&bob,
digit,
Duration::from_secs(3),
)
.await?
{
info!(digit, "Bob received DTMF INFO");
received_digits.push((*digit).to_string());
}
}
info!(
sent = ?digits,
received = ?received_digits,
"DTMF passthrough results"
);
assert!(
!received_digits.is_empty(),
"Bob should have received at least one DTMF INFO through the proxy"
);
alice.hangup(&alice_id).await?;
sleep(Duration::from_millis(800)).await;
let records = server.cdr_capture.get_all_records().await;
if !records.is_empty() {
assert_eq!(records[0].details.status, "completed");
}
server.stop();
info!("test_trunk_b2bua_dtmf_info_passthrough PASSED");
Ok(())
}
#[tokio::test]
async fn test_trunk_b2bua_early_media_183() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let caller_receiver = RtpReceiver::bind(0).await?;
let callee_receiver = RtpReceiver::bind(0).await?;
let caller_sender = RtpSender::bind().await?;
let callee_sender = RtpSender::bind().await?;
let caller_port = caller_receiver.port()?;
let callee_port = callee_receiver.port()?;
let caller_ua = Arc::new(server.create_ua("alice").await?);
let callee_ua = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let caller_sdp = pcmu_sdp("127.0.0.1", caller_port);
let callee_sdp = pcmu_sdp("127.0.0.1", callee_port);
let caller_clone = caller_ua.clone();
let caller_handle =
crate::utils::spawn(async move { caller_clone.make_call("bob", Some(caller_sdp)).await });
let mut callee_dialog_id = None;
let mut callee_offer_sdp: Option<String> = None;
for _ in 0..50 {
let events = callee_ua.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, offer) = event {
callee_dialog_id = Some(id.clone());
callee_offer_sdp = offer;
callee_ua.answer_call(&id, Some(callee_sdp.clone())).await?;
break;
}
}
if callee_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let _callee_id = callee_dialog_id.ok_or_else(|| anyhow::anyhow!("No INVITE"))?;
let caller_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow::anyhow!("timeout"))?
.map_err(|e| anyhow::anyhow!("join: {}", e))?
.map_err(|e| anyhow::anyhow!("call: {}", e))?;
let caller_answer = caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow::anyhow!("No answer SDP"))?;
let callee_offer = callee_offer_sdp.ok_or_else(|| anyhow::anyhow!("No offer SDP"))?;
let callee_target = extract_media_endpoint(&callee_offer)
.ok_or_else(|| anyhow::anyhow!("No callee endpoint"))?;
let caller_target = extract_media_endpoint(&caller_answer)
.ok_or_else(|| anyhow::anyhow!("No caller endpoint"))?;
let (caller_stats, callee_stats) = exchange_rtp(
&caller_sender,
&callee_sender,
&caller_receiver,
&callee_receiver,
caller_target,
callee_target,
0,
1500,
)
.await?;
assert!(
callee_stats.packets_received > 0,
"Callee should receive RTP"
);
assert!(
caller_stats.packets_received > 0,
"Caller should receive RTP"
);
caller_ua.hangup(&caller_id).await?;
wait_for_cdr(&server, 800).await?;
let records = server.cdr_capture.get_all_records().await;
assert_eq!(records[0].details.status, "completed");
caller_receiver.stop();
callee_receiver.stop();
server.stop();
info!("test_trunk_b2bua_early_media_183 PASSED");
Ok(())
}
#[tokio::test]
async fn test_trunk_b2bua_basic_call_cdr_roundtrip() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let sdp = pcmu_sdp("127.0.0.1", 12345);
let alice_clone = alice.clone();
let sdp_clone = sdp.clone();
let caller_handle =
crate::utils::spawn(async move { alice_clone.make_call("bob", Some(sdp_clone)).await });
let mut bob_dialog_id = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id = Some(id.clone());
bob.answer_call(&id, Some(sdp.clone())).await?;
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let _bob_id = bob_dialog_id.ok_or_else(|| anyhow::anyhow!("No INVITE"))?;
let alice_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow::anyhow!("timeout"))?
.map_err(|e| anyhow::anyhow!("join: {}", e))?
.map_err(|e| anyhow::anyhow!("call: {}", e))?;
sleep(Duration::from_millis(500)).await;
alice.hangup(&alice_id).await?;
sleep(Duration::from_millis(800)).await;
let records = server.cdr_capture.get_all_records().await;
assert!(!records.is_empty(), "Should have CDR");
assert_eq!(records[0].details.status, "completed");
server.stop();
info!("test_trunk_b2bua_options_keepalive PASSED");
Ok(())
}
#[tokio::test]
async fn test_trunk_b2bua_mid_call_reinvite() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let caller_receiver = RtpReceiver::bind(0).await?;
let callee_receiver = RtpReceiver::bind(0).await?;
let caller_sender = RtpSender::bind().await?;
let callee_sender = RtpSender::bind().await?;
let caller_port = caller_receiver.port()?;
let callee_port = callee_receiver.port()?;
let caller_ua = Arc::new(server.create_ua("alice").await?);
let callee_ua = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let caller_sdp = pcmu_sdp("127.0.0.1", caller_port);
let callee_sdp = pcmu_sdp("127.0.0.1", callee_port);
let caller_clone = caller_ua.clone();
let caller_handle =
crate::utils::spawn(async move { caller_clone.make_call("bob", Some(caller_sdp)).await });
let mut callee_dialog_id = None;
let mut callee_offer_sdp: Option<String> = None;
for _ in 0..50 {
let events = callee_ua.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, offer) = event {
callee_dialog_id = Some(id.clone());
callee_offer_sdp = offer;
callee_ua.answer_call(&id, Some(callee_sdp.clone())).await?;
break;
}
}
if callee_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let _callee_id = callee_dialog_id.ok_or_else(|| anyhow::anyhow!("No INVITE"))?;
let caller_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow::anyhow!("timeout"))?
.map_err(|e| anyhow::anyhow!("join: {}", e))?
.map_err(|e| anyhow::anyhow!("call: {}", e))?;
let caller_answer = caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow::anyhow!("No answer SDP"))?;
let callee_offer = callee_offer_sdp.ok_or_else(|| anyhow::anyhow!("No offer SDP"))?;
let callee_target = extract_media_endpoint(&callee_offer)
.ok_or_else(|| anyhow::anyhow!("No callee endpoint"))?;
let caller_target = extract_media_endpoint(&caller_answer)
.ok_or_else(|| anyhow::anyhow!("No caller endpoint"))?;
let (_caller_stats, callee_stats) = exchange_rtp(
&caller_sender,
&callee_sender,
&caller_receiver,
&callee_receiver,
caller_target,
callee_target,
0,
1000,
)
.await?;
assert!(
callee_stats.packets_received > 0,
"Callee should receive RTP before re-INVITE"
);
let pcma_sdp_offer = pcma_sdp("127.0.0.1", caller_port);
let reinvite_result = caller_ua
.send_reinvite(&caller_id, Some(pcma_sdp_offer))
.await;
if let Ok(Some(_answer)) = reinvite_result {
info!("re-INVITE succeeded, codec changed to PCMA");
let (_caller_stats2, callee_stats2) = exchange_rtp(
&caller_sender,
&callee_sender,
&caller_receiver,
&callee_receiver,
caller_target,
callee_target,
8,
1000,
)
.await?;
assert!(
callee_stats2.packets_received > 0 || callee_stats.packets_received > 0,
"Callee should receive RTP after re-INVITE"
);
} else {
info!(
"re-INVITE was not completed (proxy may not support mid-call codec change in this mode)"
);
}
caller_ua.hangup(&caller_id).await?;
wait_for_cdr(&server, 800).await?;
let records = server.cdr_capture.get_all_records().await;
assert_eq!(records[0].details.status, "completed");
caller_receiver.stop();
callee_receiver.stop();
server.stop();
info!("test_trunk_b2bua_mid_call_reinvite PASSED");
Ok(())
}