use super::cdr_capture::CdrExpectation;
use super::e2e_test_server::E2eTestServer;
use super::rtp_utils::{RtpPacket, extract_media_endpoint};
use super::test_ua::TestUaEvent;
use crate::callrecord::CallRecordHangupReason;
use crate::config::MediaProxyMode;
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{info, warn};
#[tokio::test]
async fn test_rtp_to_rtp_same_codec_no_transcoding() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::Auto).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let alice_sdp = "v=0\r\n\
o=- 123456 123456 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 12345 RTP/AVP 0 101\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=rtpmap:101 telephone-event/8000\r\n\
a=sendrecv\r\n"
.to_string();
let bob_sdp = "v=0\r\n\
o=- 789012 789012 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 54321 RTP/AVP 0 101\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=rtpmap:101 telephone-event/8000\r\n\
a=sendrecv\r\n"
.to_string();
let caller_handle = crate::utils::spawn({
let a = alice.clone();
let sdp = alice_sdp.clone();
async move { a.make_call("bob", Some(sdp)).await }
});
let mut bob_dialog_id = None;
let mut _answered_sdp = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id = Some(id.clone());
bob.answer_call(&id, Some(bob_sdp.clone())).await?;
_answered_sdp = Some(bob_sdp.clone());
info!("Bob answered call");
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
assert!(bob_dialog_id.is_some(), "Bob should receive the call");
let alice_dialog_id = match tokio::time::timeout(Duration::from_secs(5), caller_handle).await {
Ok(Ok(Ok(id))) => {
info!("Call established: {}", id);
Some(id)
}
Ok(Ok(Err(e))) => {
warn!("Call failed: {:?}", e);
None
}
_ => {
warn!("Call timed out");
None
}
};
assert!(alice_dialog_id.is_some(), "Call should be established");
let session_id = server.wait_for_active_call(Duration::from_secs(3)).await;
assert!(session_id.is_some(), "Call should be in registry");
sleep(Duration::from_millis(500)).await;
if let Some(ref id) = alice_dialog_id {
alice.hangup(id).await.ok();
}
sleep(Duration::from_millis(500)).await;
let call_id = format!("{}-alice-127.0.0.1", alice_dialog_id.unwrap());
let _cdr = server.cdr_capture.find_by_call_id(&call_id).await;
let all_records = server.cdr_capture.get_all_records().await;
assert!(
!all_records.is_empty(),
"Should have at least one CDR record"
);
let record = &all_records[0];
let expected = CdrExpectation::default()
.with_caller("alice")
.with_callee("bob")
.with_hangup_reason(CallRecordHangupReason::ByCaller)
.with_recording(false);
let result = super::cdr_capture::validate_cdr(record, &expected);
assert!(
result.is_valid,
"CDR validation failed: {:?}",
result.errors
);
server.stop();
info!("Test completed successfully");
Ok(())
}
#[tokio::test]
async fn test_webrtc_to_rtp_with_transcoding() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let webrtc_sdp = "v=0\r\n\
o=- 123456 123456 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 12345 UDP/TLS/RTP/SAVPF 111 101\r\n\
a=rtpmap:111 opus/48000/2\r\n\
a=rtpmap:101 telephone-event/8000\r\n\
a=fingerprint:sha-256 00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00\r\n\
a=setup:actpass\r\n\
a=mid:0\r\n\
a=sendrecv\r\n\
a=rtcp-mux\r\n".to_string();
let rtp_sdp = "v=0\r\n\
o=- 789012 789012 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 54321 RTP/AVP 0 101\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=rtpmap:101 telephone-event/8000\r\n\
a=sendrecv\r\n"
.to_string();
let caller_handle = crate::utils::spawn({
let a = alice.clone();
let sdp = webrtc_sdp.clone();
async move { a.make_call("bob", Some(sdp)).await }
});
let mut received_offer_sdp: Option<String> = None;
let mut bob_dialog_id = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, Some(sdp)) = event {
info!("Bob received INVITE with SDP:\n{}", sdp);
received_offer_sdp = Some(sdp);
bob_dialog_id = Some(id.clone());
bob.answer_call(&id, Some(rtp_sdp.clone())).await?;
info!("Bob answered with PCMU");
break;
} else if let TestUaEvent::IncomingCall(id, None) = event {
warn!("Bob received INVITE without SDP body");
bob_dialog_id = Some(id.clone());
bob.answer_call(&id, Some(rtp_sdp.clone())).await?;
break;
}
}
if received_offer_sdp.is_some() || bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
assert!(bob_dialog_id.is_some(), "Bob should receive the call");
if let Some(ref received_sdp) = received_offer_sdp {
info!("Analyzing SDP for WebRTC -> RTP bridging...");
let has_savpf = received_sdp.contains("UDP/TLS/RTP/SAVPF");
let has_opus = received_sdp.contains("opus/48000");
let has_fingerprint = received_sdp.contains("a=fingerprint:");
let has_pcmu = received_sdp.contains("PCMU/8000") || received_sdp.contains("a=rtpmap:0");
if has_savpf || has_opus || has_fingerprint {
info!("âš SDP NOT transformed - Bob received original WebRTC SDP:");
info!(
" - SAVPF protocol: {}",
if has_savpf { "present" } else { "removed" }
);
info!(
" - Opus codec: {}",
if has_opus { "present" } else { "removed" }
);
info!(
" - WebRTC fingerprint: {}",
if has_fingerprint {
"present"
} else {
"removed"
}
);
info!(
" - PCMU codec: {}",
if has_pcmu { "present" } else { "missing" }
);
info!("Note: Full SDP transformation may require MediaPeer activation");
} else {
info!("✓ SDP appears to be transformed for RTP");
info!(" - PCMU codec: present");
}
} else {
warn!("Could not analyze SDP - no SDP body received");
}
let _alice_dialog_id = match tokio::time::timeout(Duration::from_secs(5), caller_handle).await {
Ok(Ok(Ok(id))) => Some(id),
_ => None,
};
sleep(Duration::from_millis(500)).await;
let calls = server.get_active_calls();
if let Some(call) = calls.first() {
info!(session_id = %call.session_id, "Active call found in registry");
}
sleep(Duration::from_millis(500)).await;
let all_records = server.cdr_capture.get_all_records().await;
if !all_records.is_empty() {
info!("✓ CDR record generated");
} else {
warn!("âš CDR not yet generated (may need more time or specific config)");
}
server.stop();
info!("WebRTC to RTP test completed - full proxy flow verified");
Ok(())
}
#[tokio::test]
async fn test_forced_media_proxy_all_mode() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let alice_sdp = "v=0\r\n\
o=- 123456 123456 IN IP4 10.0.0.1\r\n\
s=-\r\n\
c=IN IP4 10.0.0.1\r\n\
t=0 0\r\n\
m=audio 12345 RTP/AVP 0\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=sendrecv\r\n"
.to_string();
let bob_sdp = "v=0\r\n\
o=- 789012 789012 IN IP4 10.0.0.2\r\n\
s=-\r\n\
c=IN IP4 10.0.0.2\r\n\
t=0 0\r\n\
m=audio 54321 RTP/AVP 0\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=sendrecv\r\n"
.to_string();
let caller_handle = crate::utils::spawn({
let a = alice.clone();
let sdp = alice_sdp.clone();
async move { a.make_call("bob", Some(sdp)).await }
});
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob.answer_call(&id, Some(bob_sdp.clone())).await?;
info!("Bob answered");
break;
}
}
sleep(Duration::from_millis(100)).await;
}
let _ = tokio::time::timeout(Duration::from_secs(5), caller_handle).await;
server.stop();
info!("Forced MediaProxy test completed");
Ok(())
}
#[tokio::test]
async fn test_caller_hangup_cdr() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start().await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let dummy_sdp = super::test_ua::create_test_sdp("127.0.0.1", 12345, false);
let caller_handle = crate::utils::spawn({
let a = alice.clone();
let sdp = dummy_sdp.clone();
async move { a.make_call("bob", Some(sdp)).await }
});
let mut bob_dialog_id = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id = Some(id.clone());
bob.answer_call(&id, Some(dummy_sdp.clone())).await?;
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let alice_dialog_id = match tokio::time::timeout(Duration::from_secs(5), caller_handle).await {
Ok(Ok(Ok(id))) => Some(id),
_ => None,
};
assert!(alice_dialog_id.is_some());
sleep(Duration::from_secs(2)).await;
alice.hangup(&alice_dialog_id.unwrap()).await?;
sleep(Duration::from_millis(500)).await;
let all_records = server.cdr_capture.get_all_records().await;
assert!(!all_records.is_empty(), "Should have CDR record");
let record = &all_records[0];
assert!(
matches!(record.hangup_reason, Some(CallRecordHangupReason::ByCaller)),
"Hangup reason should be ByCaller, got {:?}",
record.hangup_reason
);
let duration = (record.end_time - record.start_time).num_seconds();
assert!(
(1..=4).contains(&duration),
"Duration should be ~2 seconds, got {} seconds",
duration
);
let expected = CdrExpectation::default().with_recording(false);
let result = super::cdr_capture::validate_cdr(record, &expected);
assert!(
result.is_valid,
"CDR validation failed: {:?}",
result.errors
);
server.stop();
info!("Caller hangup CDR test completed");
Ok(())
}
#[tokio::test]
async fn test_rtp_data_integrity() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let test_ssrc = 0x12345678u32;
let test_pt = 0u8;
let packets = RtpPacket::create_sequence(
100, 1000, 50000, test_ssrc, test_pt, 160, 160, );
assert_eq!(packets.len(), 100);
for (i, packet) in packets.iter().enumerate() {
assert_eq!(packet.ssrc, test_ssrc);
assert_eq!(packet.payload_type, test_pt);
assert_eq!(packet.sequence_number, 1000 + i as u16);
assert_eq!(packet.timestamp, 50000 + (i as u32) * 160);
}
for packet in &packets {
let encoded = packet.encode();
let decoded = RtpPacket::decode(&encoded).unwrap();
assert_eq!(decoded.ssrc, packet.ssrc);
assert_eq!(decoded.sequence_number, packet.sequence_number);
assert_eq!(decoded.timestamp, packet.timestamp);
}
info!("RTP data integrity test completed");
Ok(())
}
#[tokio::test]
async fn test_media_proxy_none_mode() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::None).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let alice_sdp = super::test_ua::create_test_sdp("192.168.1.100", 10000, false);
let bob_sdp = super::test_ua::create_test_sdp("192.168.1.200", 20000, false);
let caller_handle = crate::utils::spawn({
let a = alice.clone();
let sdp = alice_sdp.clone();
async move { a.make_call("bob", Some(sdp)).await }
});
let mut bob_offer_sdp = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, offer) = event {
bob_offer_sdp = offer;
bob.answer_call(&id, Some(bob_sdp.clone())).await?;
break;
}
}
sleep(Duration::from_millis(100)).await;
}
let alice_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow::anyhow!("timeout"))??
.map_err(|e| anyhow::anyhow!("call: {}", e))?;
let bob_offer = bob_offer_sdp.ok_or_else(|| anyhow::anyhow!("No offer SDP on callee"))?;
let alice_answer = alice
.get_negotiated_answer_sdp(&alice_id)
.await
.ok_or_else(|| anyhow::anyhow!("No answer SDP on caller"))?;
assert_eq!(
extract_media_endpoint(&bob_offer),
extract_media_endpoint(&alice_sdp),
"None mode should pass caller SDP to callee without anchoring",
);
assert_eq!(
extract_media_endpoint(&alice_answer),
extract_media_endpoint(&bob_sdp),
"None mode should pass callee SDP to caller without anchoring",
);
server.stop();
info!("MediaProxy None mode test completed");
Ok(())
}
#[tokio::test]
async fn test_mid_dialog_passthrough_none_mode() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::None).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let initial_sdp = super::test_ua::create_test_sdp("127.0.0.1", 10000, false);
let answer_sdp = super::test_ua::create_test_sdp("127.0.0.1", 20000, false);
let caller_handle = crate::utils::spawn({
let a = alice.clone();
let sdp = initial_sdp.clone();
async move { a.make_call("bob", Some(sdp)).await }
});
let mut bob_dialog_id = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id = Some(id.clone());
bob.answer_call(&id, Some(answer_sdp.clone())).await?;
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let alice_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow::anyhow!("timeout"))??
.map_err(|e| anyhow::anyhow!("call: {}", e))?;
let bob_id = bob_dialog_id.ok_or_else(|| anyhow::anyhow!("Bob did not receive the call"))?;
let update_sdp = "v=0\r\n\
o=- 123456 789012 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 10000 RTP/AVP 0\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=sendonly\r\n"
.to_string();
let update_handle = crate::utils::spawn({
let a = alice.clone();
let id = alice_id.clone();
let sdp = update_sdp.clone();
async move { a.send_update(&id, Some(sdp)).await }
});
let mut saw_update = false;
for _ in 0..30 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::CallUpdated(id, method, Some(sdp)) = event {
if id == bob_id && method == rsipstack::sip::Method::Update {
assert_eq!(sdp, update_sdp, "UPDATE SDP should be relayed unchanged");
saw_update = true;
break;
}
}
}
if saw_update {
break;
}
sleep(Duration::from_millis(100)).await;
}
assert!(
saw_update,
"Bob should receive the relayed UPDATE in None mode"
);
let update_answer = tokio::time::timeout(Duration::from_secs(5), update_handle)
.await
.map_err(|_| anyhow::anyhow!("UPDATE timeout"))??
.map_err(|e| anyhow::anyhow!("UPDATE failed: {}", e))?;
assert_eq!(
update_answer,
Some(answer_sdp.clone()),
"Caller should receive callee SDP answer for UPDATE in None mode",
);
let reinvite_sdp = "v=0\r\n\
o=- 123456 789013 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 10000 RTP/AVP 0\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=sendrecv\r\n"
.to_string();
let reinvite_handle = crate::utils::spawn({
let a = alice.clone();
let id = alice_id.clone();
let sdp = reinvite_sdp.clone();
async move { a.send_reinvite(&id, Some(sdp)).await }
});
let mut saw_reinvite = false;
for _ in 0..30 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::CallUpdated(id, method, Some(sdp)) = event {
if id == bob_id && method == rsipstack::sip::Method::Invite {
assert_eq!(
sdp, reinvite_sdp,
"re-INVITE SDP should be relayed unchanged"
);
saw_reinvite = true;
break;
}
}
}
if saw_reinvite {
break;
}
sleep(Duration::from_millis(100)).await;
}
assert!(
saw_reinvite,
"Bob should receive the relayed re-INVITE in None mode",
);
let reinvite_answer = tokio::time::timeout(Duration::from_secs(5), reinvite_handle)
.await
.map_err(|_| anyhow::anyhow!("re-INVITE timeout"))??
.map_err(|e| anyhow::anyhow!("re-INVITE failed: {}", e))?;
assert_eq!(
reinvite_answer,
Some(answer_sdp.clone()),
"Caller should receive callee SDP answer for re-INVITE in None mode",
);
alice.hangup(&alice_id).await?;
server.stop();
Ok(())
}
#[tokio::test]
async fn test_callee_hangup_cdr() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start().await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let dummy_sdp = super::test_ua::create_test_sdp("127.0.0.1", 12345, false);
let caller_handle = crate::utils::spawn({
let a = alice.clone();
let sdp = dummy_sdp.clone();
async move { a.make_call("bob", Some(sdp)).await }
});
let mut bob_dialog_id = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id = Some(id.clone());
bob.answer_call(&id, Some(dummy_sdp.clone())).await?;
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let _ = tokio::time::timeout(Duration::from_secs(5), caller_handle).await;
sleep(Duration::from_millis(500)).await;
if let Some(ref id) = bob_dialog_id {
bob.hangup(id).await?;
}
sleep(Duration::from_millis(500)).await;
let all_records = server.cdr_capture.get_all_records().await;
if !all_records.is_empty() {
let record = &all_records[0];
info!(hangup_reason = ?record.hangup_reason, "CDR hangup reason");
let expected = CdrExpectation::default().with_recording(false);
let result = super::cdr_capture::validate_cdr(record, &expected);
assert!(
result.is_valid,
"CDR validation failed: {:?}",
result.errors
);
}
server.stop();
info!("Callee hangup CDR test completed");
Ok(())
}
#[tokio::test]
async fn test_multiple_calls_cdr() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start().await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
let charlie = server.create_ua("charlie").await?;
sleep(Duration::from_millis(100)).await;
let dummy_sdp = super::test_ua::create_test_sdp("127.0.0.1", 12345, false);
let handle1 = crate::utils::spawn({
let a = alice.clone();
let sdp = dummy_sdp.clone();
async move { a.make_call("bob", Some(sdp)).await }
});
let mut bob_dialog_id1 = None;
for _ in 0..30 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id1 = Some(id.clone());
bob.answer_call(&id, Some(dummy_sdp.clone())).await?;
break;
}
}
if bob_dialog_id1.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let _alice_dialog_id1 = match tokio::time::timeout(Duration::from_secs(3), handle1).await {
Ok(Ok(Ok(id))) => Some(id),
_ => None,
};
let handle2 = crate::utils::spawn({
let a = alice.clone();
let sdp = dummy_sdp.clone();
async move { a.make_call("charlie", Some(sdp)).await }
});
let mut charlie_dialog_id = None;
for _ in 0..30 {
let events = charlie.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
charlie_dialog_id = Some(id.clone());
charlie.answer_call(&id, Some(dummy_sdp.clone())).await?;
break;
}
}
if charlie_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let alice_dialog_id2 = match tokio::time::timeout(Duration::from_secs(3), handle2).await {
Ok(Ok(Ok(id))) => Some(id),
_ => None,
};
sleep(Duration::from_millis(300)).await;
if let Some(ref id) = bob_dialog_id1 {
bob.hangup(id).await?;
}
sleep(Duration::from_millis(200)).await;
if let Some(ref id) = alice_dialog_id2 {
alice.hangup(id).await?;
}
sleep(Duration::from_millis(800)).await;
let all_records = server.cdr_capture.get_all_records().await;
info!(record_count = all_records.len(), "CDR records captured");
assert!(
!all_records.is_empty(),
"Should have at least 1 CDR record, got {}",
all_records.len()
);
for (i, record) in all_records.iter().enumerate() {
info!(
index = i,
call_id = %record.call_id,
hangup_reason = ?record.hangup_reason,
"CDR record"
);
assert!(
matches!(
record.hangup_reason,
Some(CallRecordHangupReason::ByCaller)
| Some(CallRecordHangupReason::ByCallee)
| Some(CallRecordHangupReason::BySystem)
),
"CDR should have valid hangup reason"
);
let expected = CdrExpectation::default().with_recording(false);
let result = super::cdr_capture::validate_cdr(record, &expected);
assert!(
result.is_valid,
"CDR validation failed: {:?}",
result.errors
);
}
server.stop();
info!("Multiple calls CDR test completed successfully");
Ok(())
}
#[tokio::test]
async fn test_reinvite_hold_resume() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let initial_sdp = super::test_ua::create_test_sdp("127.0.0.1", 10000, false);
let answer_sdp = super::test_ua::create_test_sdp("127.0.0.1", 20000, false);
let caller_handle = crate::utils::spawn({
let a = alice.clone();
let sdp = initial_sdp.clone();
async move { a.make_call("bob", Some(sdp)).await }
});
let mut bob_dialog_id = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id = Some(id.clone());
bob.answer_call(&id, Some(answer_sdp.clone())).await?;
info!("Bob answered call");
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let alice_dialog_id = match tokio::time::timeout(Duration::from_secs(5), caller_handle).await {
Ok(Ok(Ok(id))) => {
info!("Call established: {}", id);
Some(id)
}
_ => None,
};
assert!(alice_dialog_id.is_some(), "Call should be established");
assert!(bob_dialog_id.is_some(), "Bob should have dialog");
let alice_id = alice_dialog_id.unwrap();
let _bob_id = bob_dialog_id.unwrap();
sleep(Duration::from_millis(300)).await;
info!("=== Testing call hold (re-INVITE with sendonly) ===");
let hold_sdp = "v=0\r\n\
o=- 123456 789012 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 10000 RTP/AVP 0\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=sendonly\r\n"
.to_string();
let hold_result = alice.send_reinvite(&alice_id, Some(hold_sdp)).await;
info!("Alice re-INVITE (hold) result: {:?}", hold_result);
assert!(hold_result.is_ok(), "Hold re-INVITE should succeed");
assert!(
hold_result.as_ref().unwrap().is_some(),
"Hold re-INVITE should return SDP answer"
);
sleep(Duration::from_millis(300)).await;
info!("=== Testing call resume (re-INVITE with sendrecv) ===");
let resume_sdp = "v=0\r\n\
o=- 123456 789013 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 10000 RTP/AVP 0\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=sendrecv\r\n"
.to_string();
let resume_result = alice.send_reinvite(&alice_id, Some(resume_sdp)).await;
info!("Alice re-INVITE (resume) result: {:?}", resume_result);
assert!(resume_result.is_ok(), "Resume re-INVITE should succeed");
assert!(
resume_result.as_ref().unwrap().is_some(),
"Resume re-INVITE should return SDP answer"
);
sleep(Duration::from_millis(300)).await;
alice.hangup(&alice_id).await.ok();
sleep(Duration::from_millis(500)).await;
let all_records = server.cdr_capture.get_all_records().await;
assert!(!all_records.is_empty(), "CDR should be generated");
if let Some(record) = all_records.first() {
info!(
duration_secs = (record.end_time - record.start_time).num_seconds(),
hangup_reason = ?record.hangup_reason,
"CDR record"
);
assert_eq!(
record.hangup_reason,
Some(CallRecordHangupReason::ByCaller),
"Hangup reason should be ByCaller"
);
let expected = CdrExpectation::default().with_recording(false);
let result = super::cdr_capture::validate_cdr(record, &expected);
assert!(
result.is_valid,
"CDR validation failed: {:?}",
result.errors
);
}
server.stop();
info!("re-INVITE hold/resume test completed successfully");
Ok(())
}
#[tokio::test]
async fn test_cancel_during_ringing() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start().await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let offer_sdp = super::test_ua::create_test_sdp("127.0.0.1", 10000, false);
let caller_handle = crate::utils::spawn({
let a = alice.clone();
let sdp = offer_sdp.clone();
async move { a.make_call("bob", Some(sdp)).await }
});
let mut bob_received_call = false;
let mut bob_dialog_id = None;
for _ in 0..30 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_received_call = true;
bob_dialog_id = Some(id);
info!("Bob received incoming call (ringing)");
break;
}
if let TestUaEvent::CallRinging(_) = event {
info!("Bob is ringing");
}
}
if bob_received_call {
break;
}
sleep(Duration::from_millis(100)).await;
}
assert!(bob_received_call, "Bob should receive the call");
sleep(Duration::from_millis(200)).await;
info!("Alice canceling the call...");
if let Some(ref id) = bob_dialog_id {
alice.hangup(id).await.ok();
}
sleep(Duration::from_millis(500)).await;
let call_result = tokio::time::timeout(Duration::from_secs(3), caller_handle).await;
info!("Call result after cancel: {:?}", call_result.is_ok());
sleep(Duration::from_millis(500)).await;
let all_records = server.cdr_capture.get_all_records().await;
if !all_records.is_empty() {
let record = &all_records[0];
info!(
status = %record.details.status,
hangup_reason = ?record.hangup_reason,
"CDR record for canceled call"
);
info!("✓ CDR generated for canceled call");
} else {
info!("âš No CDR generated for canceled call (may be expected for early cancel)");
}
server.stop();
info!("CANCEL during ringing test completed");
Ok(())
}
#[tokio::test]
async fn test_reinvite_codec_change() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let pcmu_sdp = "v=0\r\n\
o=- 123456 123456 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 10000 RTP/AVP 0\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=sendrecv\r\n"
.to_string();
let pcmu_answer = "v=0\r\n\
o=- 789012 789012 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 20000 RTP/AVP 0\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=sendrecv\r\n"
.to_string();
let caller_handle = crate::utils::spawn({
let a = alice.clone();
let sdp = pcmu_sdp.clone();
async move { a.make_call("bob", Some(sdp)).await }
});
let mut bob_dialog_id = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id = Some(id.clone());
bob.answer_call(&id, Some(pcmu_answer.clone())).await?;
info!("Bob answered with PCMU");
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let alice_dialog_id = match tokio::time::timeout(Duration::from_secs(5), caller_handle).await {
Ok(Ok(Ok(id))) => Some(id),
_ => None,
};
assert!(alice_dialog_id.is_some(), "Call should be established");
let alice_id = alice_dialog_id.unwrap();
let _bob_id = bob_dialog_id.unwrap();
sleep(Duration::from_millis(300)).await;
info!("=== Testing codec change (PCMU -> PCMA) ===");
let pcma_sdp = "v=0\r\n\
o=- 123456 789013 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 10000 RTP/AVP 8\r\n\
a=rtpmap:8 PCMA/8000\r\n\
a=sendrecv\r\n"
.to_string();
let codec_change_result = alice.send_reinvite(&alice_id, Some(pcma_sdp)).await;
info!("Codec change re-INVITE result: {:?}", codec_change_result);
assert!(
codec_change_result.is_ok(),
"Codec change re-INVITE should succeed"
);
assert!(
codec_change_result.as_ref().unwrap().is_some(),
"Codec change re-INVITE should return SDP answer"
);
sleep(Duration::from_millis(200)).await;
alice.hangup(&alice_id).await.ok();
sleep(Duration::from_millis(500)).await;
let all_records = server.cdr_capture.get_all_records().await;
assert!(!all_records.is_empty(), "CDR should be generated");
let expected = CdrExpectation::default().with_recording(false);
let result = super::cdr_capture::validate_cdr(&all_records[0], &expected);
assert!(
result.is_valid,
"CDR validation failed: {:?}",
result.errors
);
server.stop();
info!("re-INVITE codec change test completed");
Ok(())
}
#[tokio::test]
async fn test_auto_start_recording_creates_file() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
use crate::config::{MediaProxyMode, ProxyConfig, RecordingPolicy};
let record_dir = tempfile::tempdir()?;
let record_path = record_dir.path().to_string_lossy().to_string();
let proxy_config = ProxyConfig {
media_proxy: MediaProxyMode::All,
recording: Some(RecordingPolicy {
enabled: Some(true),
auto_start: Some(true),
path: Some(record_path.clone()),
..Default::default()
}),
..Default::default()
};
let server = Arc::new(E2eTestServer::start_with_config(proxy_config).await?);
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let pcmu_sdp = "v=0\r\n\
o=- 1 1 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\n\
m=audio 10000 RTP/AVP 0\r\na=rtpmap:0 PCMU/8000\r\na=sendrecv\r\n"
.to_string();
let bob_sdp = "v=0\r\n\
o=- 2 2 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\n\
m=audio 10002 RTP/AVP 0\r\na=rtpmap:0 PCMU/8000\r\na=sendrecv\r\n"
.to_string();
let caller_handle = crate::utils::spawn({
let a = alice.clone();
let sdp = pcmu_sdp.clone();
async move { a.make_call("bob", Some(sdp)).await }
});
let mut bob_dialog_id = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id = Some(id.clone());
bob.answer_call(&id, Some(bob_sdp.clone())).await?;
info!("Bob answered");
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let alice_dialog_id = match tokio::time::timeout(Duration::from_secs(5), caller_handle).await {
Ok(Ok(Ok(id))) => Some(id),
_ => None,
};
assert!(alice_dialog_id.is_some(), "Call should be established");
sleep(Duration::from_millis(800)).await;
alice.hangup(alice_dialog_id.as_ref().unwrap()).await.ok();
sleep(Duration::from_millis(500)).await;
let all_records = server.cdr_capture.get_all_records().await;
assert!(!all_records.is_empty(), "CDR should be generated");
let record = &all_records[0];
let expected = CdrExpectation::default().with_recording(true);
let result = super::cdr_capture::validate_cdr(record, &expected);
assert!(
result.is_valid,
"CDR validation failed: {:?}",
result.errors
);
let media = &record.recorder[0];
assert!(!media.path.is_empty(), "Recorder path should not be empty");
assert!(
media.path.starts_with(&record_path),
"Recorder path should be under the configured record dir"
);
assert!(
std::path::Path::new(&media.path).exists(),
"Recorder WAV file must exist on disk: {}",
media.path
);
server.stop();
info!("auto_start recording test completed: path={}", media.path);
Ok(())
}