use super::e2e_test_server::E2eTestServer;
use super::test_ua::{TestUa, TestUaConfig, TestUaEvent};
use crate::config::MediaProxyMode;
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{info, warn};
#[tokio::test]
async fn test_webrtc_to_rtp_media_proxy_auto() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = E2eTestServer::start_with_mode(MediaProxyMode::Auto).await?;
let proxy_addr = server.proxy_addr;
let mut alice_ua = TestUa::new(TestUaConfig {
username: "alice".to_string(),
password: "password123".to_string(),
realm: "127.0.0.1".to_string(),
local_port: portpicker::pick_unused_port().unwrap_or(15061),
proxy_addr,
});
alice_ua.start().await?;
alice_ua.register().await?;
let alice = Arc::new(alice_ua);
let mut bob = TestUa::new(TestUaConfig {
username: "bob".to_string(),
password: "password456".to_string(),
realm: "127.0.0.1".to_string(),
local_port: portpicker::pick_unused_port().unwrap_or(15062),
proxy_addr,
});
bob.start().await?;
bob.register().await?;
let webrtc_sdp = "v=0\r\n\
o=- 123456 123456 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 12345 UDP/TLS/RTP/SAVPF 111\r\n\
a=rtpmap:111 opus/48000/2\r\n\
a=fingerprint:sha-256 00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00\r\n\
a=setup:actpass\r\n\
a=mid:0\r\n\
a=sendrecv\r\n\
a=rtcp-mux\r\n";
let caller_handle = tokio::spawn({
let a = alice.clone();
let sdp = webrtc_sdp.to_string();
async move { a.make_call("bob", Some(sdp)).await }
});
info!("Alice call initiated.");
let mut bob_dialog_id = None;
info!("Waiting for Bob to receive call...");
for i in 0..50 {
let events = bob.process_dialog_events().await.unwrap_or_default();
for event in events {
info!("Bob received event: {:?}", event);
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id = Some(id.clone());
bob.answer_call(&id, None).await?;
info!("Bob answered call");
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
if i % 10 == 0 {
info!("Still waiting for Bob...");
}
}
assert!(bob_dialog_id.is_some(), "Bob should receive incoming call");
match tokio::time::timeout(Duration::from_secs(5), caller_handle).await {
Ok(Ok(Ok(alice_dialog_id))) => {
info!("Call established successfully");
sleep(Duration::from_millis(200)).await;
alice.hangup(&alice_dialog_id).await.ok();
}
Ok(Ok(Err(e))) => {
warn!("Call failed: {:?}", e);
}
Ok(Err(e)) => {
warn!("Caller task panicked: {:?}", e);
}
Err(_) => {
warn!("Caller timed out after 5 seconds");
}
}
Ok(())
}
#[tokio::test]
async fn test_codec_negotiation_optimization() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = E2eTestServer::start_with_mode(MediaProxyMode::Auto).await?;
let proxy_addr = server.proxy_addr;
let mut alice_ua = TestUa::new(TestUaConfig {
username: "alice".to_string(),
password: "password123".to_string(),
realm: "127.0.0.1".to_string(),
local_port: portpicker::pick_unused_port().unwrap_or(15071),
proxy_addr,
});
alice_ua.start().await?;
alice_ua.register().await?;
let alice = Arc::new(alice_ua);
let mut bob = TestUa::new(TestUaConfig {
username: "bob".to_string(),
password: "password456".to_string(),
realm: "127.0.0.1".to_string(),
local_port: portpicker::pick_unused_port().unwrap_or(15072),
proxy_addr,
});
bob.start().await?;
bob.register().await?;
let multi_codec_sdp = "v=0\r\n\
o=- 123456 123456 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 12345 UDP/TLS/RTP/SAVPF 111 0\r\n\
a=rtpmap:111 opus/48000/2\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=fingerprint:sha-256 00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00\r\n\
a=setup:actpass\r\n\
a=mid:0\r\n\
a=sendrecv\r\n\
a=rtcp-mux\r\n";
let caller_handle = tokio::spawn({
let a = alice.clone();
let sdp = multi_codec_sdp.to_string();
async move { a.make_call("bob", Some(sdp)).await }
});
info!("Alice call initiated with multi-codec offer (Opus + PCMU)");
let mut bob_dialog_id = None;
for i in 0..50 {
let events = bob.process_dialog_events().await.unwrap_or_default();
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id = Some(id.clone());
let bob_pcmu_sdp = "v=0\r\n\
o=- 789012 789012 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 54321 RTP/AVP 0\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=sendrecv\r\n";
bob.answer_call(&id, Some(bob_pcmu_sdp.to_string())).await?;
info!("Bob answered with PCMU codec");
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
if i % 10 == 0 {
info!("Still waiting for Bob to receive call...");
}
}
assert!(bob_dialog_id.is_some(), "Bob should receive call");
match tokio::time::timeout(Duration::from_secs(5), caller_handle).await {
Ok(Ok(Ok(alice_dialog_id))) => {
info!("Call established successfully");
sleep(Duration::from_millis(200)).await;
alice.hangup(&alice_dialog_id).await.ok();
info!("Test completed - check logs for codec optimization messages");
}
Ok(Ok(Err(e))) => {
warn!("Call failed: {:?}", e);
}
Ok(Err(e)) => {
warn!("Caller task panicked: {:?}", e);
}
Err(_) => {
warn!("Caller timed out after 5 seconds");
}
}
Ok(())
}
/// Verifies that a WebRTC offer is rewritten into a plain RTP offer before it
/// reaches the callee.
///
/// Alice sends a DTLS/SAVPF offer with ICE credentials. When Bob's incoming
/// call carries an SDP, the test asserts that it no longer mentions `SAVPF`,
/// `fingerprint` or `ice-ufrag`, and that it contains `RTP/AVP`. Bob then
/// answers with a plain RTP SDP.
///
/// Hard failures: a setup or answer step returning an error, Bob never seeing
/// the call, or one of the SDP assertions failing. A missing SDP and the
/// outcome of Alice's call attempt are logged as warnings only.
#[tokio::test]
async fn test_webrtc_to_rtp_sdp_bridge() -> Result<()> {
    let _ = tracing_subscriber::fmt::try_init();

    // `server` is kept bound for the whole test so the proxy stays alive.
    let server = E2eTestServer::start_with_mode(MediaProxyMode::Auto).await?;
    let proxy_addr = server.proxy_addr;

    let mut alice_ua = TestUa::new(TestUaConfig {
        username: "alice".to_string(),
        password: "password123".to_string(),
        realm: "127.0.0.1".to_string(),
        local_port: portpicker::pick_unused_port().unwrap_or(15081),
        proxy_addr,
    });
    alice_ua.start().await?;
    alice_ua.register().await?;
    let alice = Arc::new(alice_ua);

    let mut bob = TestUa::new(TestUaConfig {
        username: "bob".to_string(),
        password: "password456".to_string(),
        realm: "127.0.0.1".to_string(),
        local_port: portpicker::pick_unused_port().unwrap_or(15082),
        proxy_addr,
    });
    bob.start().await?;
    bob.register().await?;

    // WebRTC-style offer: SAVPF transport, DTLS fingerprint and ICE
    // credentials are exactly the attributes the assertions below look for.
    let webrtc_sdp = "v=0\r\n\
        o=- 123456 123456 IN IP4 127.0.0.1\r\n\
        s=-\r\n\
        c=IN IP4 127.0.0.1\r\n\
        t=0 0\r\n\
        m=audio 12345 UDP/TLS/RTP/SAVPF 111 101\r\n\
        a=rtpmap:111 opus/48000/2\r\n\
        a=rtpmap:101 telephone-event/8000\r\n\
        a=fingerprint:sha-256 AA:BB:CC:DD:EE:FF:00:11:22:33:44:55:66:77:88:99:AA:BB:CC:DD:EE:FF:00:11:22:33:44:55:66:77:88:99\r\n\
        a=setup:actpass\r\n\
        a=ice-ufrag:alice\r\n\
        a=ice-pwd:alicepwd\r\n\
        a=mid:0\r\n\
        a=sendrecv\r\n\
        a=rtcp-mux\r\n";

    // Bob's plain RTP answer (PCMU + telephone-event).
    let bob_rtp_sdp = "v=0\r\n\
        o=- 789012 789012 IN IP4 127.0.0.1\r\n\
        s=-\r\n\
        c=IN IP4 127.0.0.1\r\n\
        t=0 0\r\n\
        m=audio 54321 RTP/AVP 0 101\r\n\
        a=rtpmap:0 PCMU/8000\r\n\
        a=rtpmap:101 telephone-event/8000\r\n\
        a=sendrecv\r\n";

    // The call runs in its own task so this task can drive Bob's events.
    let caller_handle = tokio::spawn({
        let a = alice.clone();
        let sdp = webrtc_sdp.to_string();
        async move { a.make_call("bob", Some(sdp)).await }
    });
    info!("Alice (WebRTC) initiated call to Bob (RTP)");

    // Poll Bob's dialog events up to 50 times, 100 ms apart.
    let mut bob_got_call = false;
    'poll: for i in 0..50 {
        let events = bob.process_dialog_events().await.unwrap_or_default();
        for event in events {
            if let TestUaEvent::IncomingCall(id, sdp) = event {
                bob_got_call = true;

                if let Some(sdp_str) = sdp.as_deref() {
                    info!("Bob received SDP:\n{}", sdp_str);
                    assert!(
                        !sdp_str.contains("SAVPF"),
                        "SAVPF should be removed from RTP SDP"
                    );
                    assert!(
                        !sdp_str.contains("fingerprint"),
                        "DTLS fingerprint should be removed"
                    );
                    assert!(
                        !sdp_str.contains("ice-ufrag"),
                        "ICE ufrag should be removed"
                    );
                    assert!(sdp_str.contains("RTP/AVP"), "Should use RTP/AVP protocol");
                    info!("✓ SDP bridge: WebRTC -> RTP conversion verified");
                } else {
                    // Without an SDP none of the bridge checks ran; say so
                    // explicitly instead of skipping them silently.
                    warn!("Bob's incoming call carried no SDP; WebRTC -> RTP conversion was not verified");
                }

                bob.answer_call(&id, Some(bob_rtp_sdp.to_string())).await?;
                info!("Bob answered with RTP SDP");
                break 'poll;
            }
        }
        sleep(Duration::from_millis(100)).await;
        if i % 10 == 0 {
            info!("Waiting for Bob to receive call...");
        }
    }
    assert!(bob_got_call, "Bob should receive the call");

    // Layers, outermost first: timeout -> task join -> make_call result.
    match tokio::time::timeout(Duration::from_secs(5), caller_handle).await {
        Ok(Ok(Ok(alice_dialog_id))) => {
            info!("Call established successfully: {}", alice_dialog_id);
            sleep(Duration::from_millis(200)).await;
            // Hangup is best-effort; its result is intentionally discarded.
            alice.hangup(&alice_dialog_id).await.ok();
            info!("✓ WebRTC -> RTP SDP bridge test passed");
        }
        Ok(Ok(Err(e))) => {
            warn!("Call failed: {:?}", e);
        }
        Ok(Err(e)) => {
            warn!("Caller task panicked: {:?}", e);
        }
        Err(_) => {
            warn!("Call timed out");
        }
    }

    Ok(())
}
#[tokio::test]
async fn test_rtp_to_webrtc_sdp_bridge() -> Result<()> {
use crate::call::SipUser;
use crate::config::MediaProxyMode;
use crate::proxy::tests::test_ua::{TestUa, TestUaConfig};
let _ = tracing_subscriber::fmt::try_init();
let port = portpicker::pick_unused_port().unwrap_or(15090);
let config = crate::config::ProxyConfig {
addr: "127.0.0.1".to_string(),
udp_port: Some(port),
tcp_port: None,
tls_port: None,
ws_port: None,
useragent: Some("RustPBX-Test/0.1.0".to_string()),
modules: Some(vec![
"auth".to_string(),
"registrar".to_string(),
"call".to_string(),
]),
media_proxy: MediaProxyMode::All, ..Default::default()
};
let proxy_addr = format!("127.0.0.1:{}", port).parse()?;
let user_backend = crate::proxy::user::MemoryUserBackend::new(None);
user_backend
.create_user(SipUser {
id: 1,
username: "alice".to_string(),
password: Some("password123".to_string()),
enabled: true,
realm: Some("127.0.0.1".to_string()),
is_support_webrtc: false, ..Default::default()
})
.await?;
user_backend
.create_user(SipUser {
id: 2,
username: "bob".to_string(),
password: Some("password456".to_string()),
enabled: true,
realm: Some("127.0.0.1".to_string()),
is_support_webrtc: true, ..Default::default()
})
.await?;
let locator = crate::proxy::locator::MemoryLocator::new();
let cancel_token = tokio_util::sync::CancellationToken::new();
let builder = super::test_helpers::register_standard_modules(
crate::proxy::server::SipServerBuilder::new(std::sync::Arc::new(config))
.with_user_backend(Box::new(user_backend))
.with_locator(Box::new(locator))
.with_cancel_token(cancel_token.clone()),
);
let server = std::sync::Arc::new(builder.build().await?);
let server_clone = server.clone();
tokio::spawn(async move {
if let Err(e) = server_clone.serve().await {
warn!("Proxy server error: {:?}", e);
}
});
sleep(Duration::from_millis(200)).await;
let mut alice = TestUa::new(TestUaConfig {
username: "alice".to_string(),
password: "password123".to_string(),
realm: "127.0.0.1".to_string(),
local_port: portpicker::pick_unused_port().unwrap_or(15091),
proxy_addr,
});
alice.start().await?;
alice.register().await?;
let alice = std::sync::Arc::new(alice);
let mut bob = TestUa::new(TestUaConfig {
username: "bob".to_string(),
password: "password456".to_string(),
realm: "127.0.0.1".to_string(),
local_port: portpicker::pick_unused_port().unwrap_or(15092),
proxy_addr,
});
bob.start().await?;
bob.register().await?;
let rtp_sdp = "v=0\r\n\
o=- 123456 123456 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 12345 RTP/AVP 0 101\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=rtpmap:101 telephone-event/8000\r\n\
a=sendrecv\r\n";
let caller_handle = tokio::spawn({
let a = alice.clone();
let sdp = rtp_sdp.to_string();
async move { a.make_call("bob", Some(sdp)).await }
});
info!("Alice (RTP) initiated call to Bob (WebRTC)");
let mut bob_dialog_id = None;
let mut _received_sdp: Option<String> = None;
for i in 0..50 {
let events = bob.process_dialog_events().await.unwrap_or_default();
for event in events {
if let TestUaEvent::IncomingCall(id, sdp) = event {
bob_dialog_id = Some(id.clone());
_received_sdp = sdp.clone();
if let Some(ref sdp_str) = sdp {
info!("Bob received SDP:\n{}", sdp_str);
info!("Note: RTP -> WebRTC bridge test depends on proper WebRTC signaling");
}
let bob_webrtc_sdp = "v=0\r\n\
o=- 789012 789012 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio 54321 UDP/TLS/RTP/SAVPF 111 101\r\n\
a=rtpmap:111 opus/48000/2\r\n\
a=rtpmap:101 telephone-event/8000\r\n\
a=fingerprint:sha-256 BB:CC:DD:EE:FF:00:11:22:33:44:55:66:77:88:99:AA:BB:CC:DD:EE:FF:00:11:22:33:44:55:66:77:88:99:AA\r\n\
a=setup:active\r\n\
a=ice-ufrag:bob\r\n\
a=ice-pwd:bobpwd\r\n\
a=mid:0\r\n\
a=sendrecv\r\n\
a=rtcp-mux\r\n";
bob.answer_call(&id, Some(bob_webrtc_sdp.to_string()))
.await?;
info!("Bob answered with WebRTC SDP");
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
if i % 10 == 0 {
info!("Waiting for Bob to receive call...");
}
}
assert!(bob_dialog_id.is_some(), "Bob should receive the call");
match tokio::time::timeout(Duration::from_secs(5), caller_handle).await {
Ok(Ok(Ok(alice_dialog_id))) => {
info!("Call established successfully: {}", alice_dialog_id);
sleep(Duration::from_millis(200)).await;
alice.hangup(&alice_dialog_id).await.ok();
info!("✓ RTP -> WebRTC SDP bridge test completed");
}
Ok(Ok(Err(e))) => {
warn!("Call failed: {:?}", e);
}
Ok(Err(e)) => {
warn!("Caller task panicked: {:?}", e);
}
Err(_) => {
warn!("Call timed out");
}
}
Ok(())
}