use super::test_helpers;
use super::test_ua::{TestUa, TestUaConfig, TestUaEvent};
use crate::config::ProxyConfig;
use crate::proxy::{
locator::MemoryLocator,
server::{SipServer, SipServerBuilder},
user::MemoryUserBackend,
};
use anyhow::Result;
use rsipstack::dialog::DialogId;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
/// Returns the proxy configuration used by the tests in this module.
///
/// This is a thin wrapper that forwards `port` to
/// `test_helpers::test_proxy_config`.
fn create_test_proxy_config(port: u16) -> ProxyConfig {
    test_helpers::test_proxy_config(port)
}
/// Creates a [`TestUa`] for `username` and starts it.
///
/// The UA is configured with realm `"127.0.0.1"`, the given `local_port`
/// and `proxy_addr`, and a password chosen from a fixed table: `"bob"` and
/// `"charlie"` have their own passwords, every other name (including
/// `"alice"`) uses `"password123"`.
/// NOTE(review): these passwords presumably have to match the accounts
/// returned by `test_helpers::standard_test_users` — confirm there.
///
/// # Errors
/// Propagates the error returned by `TestUa::start`.
async fn create_test_ua(username: &str, proxy_addr: SocketAddr, local_port: u16) -> Result<TestUa> {
    let password = match username {
        "bob" => "password456",
        "charlie" => "password789",
        // "alice" and any unrecognised user share the same password.
        _ => "password123",
    };

    let mut ua = TestUa::new(TestUaConfig {
        username: username.to_owned(),
        password: password.to_owned(),
        realm: String::from("127.0.0.1"),
        local_port,
        proxy_addr,
    });
    ua.start().await?;
    Ok(ua)
}
/// Starts a proxy using the configuration from `create_test_proxy_config(port)`.
///
/// Returns the running server together with the cancellation token that was
/// handed to it; see `setup_proxy_and_users_with_config` for the details.
async fn setup_proxy_and_users(port: u16) -> (Arc<SipServer>, CancellationToken) {
    let config = Arc::new(create_test_proxy_config(port));
    setup_proxy_and_users_with_config(port, config).await
}
async fn setup_proxy_and_users_with_config(
_port: u16,
config: Arc<ProxyConfig>,
) -> (Arc<SipServer>, CancellationToken) {
let user_backend = MemoryUserBackend::new(None);
for user in test_helpers::standard_test_users() {
user_backend.create_user(user).await.unwrap();
}
let locator = MemoryLocator::new();
let cancel_token = CancellationToken::new();
let builder = test_helpers::register_standard_modules(
SipServerBuilder::new(config)
.with_user_backend(Box::new(user_backend))
.with_locator(Box::new(locator))
.with_cancel_token(cancel_token.clone()),
);
let server = Arc::new(builder.build().await.unwrap());
let server_clone = server.clone();
crate::utils::spawn(async move {
if let Err(e) = server_clone.serve().await {
warn!("Proxy server error: {:?}", e);
}
});
sleep(Duration::from_millis(200)).await;
(server, cancel_token)
}
async fn establish_call(
alice: &TestUa,
bob: &TestUa,
offer_sdp: &str,
bob_answer_sdp: &str,
) -> (DialogId, DialogId) {
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let alice_clone = alice.clone();
let offer = offer_sdp.to_string();
crate::utils::spawn(async move {
let res = alice_clone.make_call("bob", Some(offer)).await;
let _ = tx.send(res).await;
});
let mut bob_call_id = None;
for _ in 0..30 {
sleep(Duration::from_millis(100)).await;
if let Ok(events) = bob.process_dialog_events().await {
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob.answer_call(&id, Some(bob_answer_sdp.to_string()))
.await
.unwrap();
bob_call_id = Some(id);
break;
}
}
}
if bob_call_id.is_some() {
break;
}
}
let bob_call_id = bob_call_id.expect("Bob should receive incoming call");
let alice_call_id = rx.recv().await.unwrap().expect("Alice call should succeed");
sleep(Duration::from_millis(500)).await;
(alice_call_id, bob_call_id)
}
/// Sends a re-INVITE with an `a=sendonly` audio SDP from alice on an
/// established call and checks that:
/// * alice gets an SDP answer back that contains `PCMU/8000`, and
/// * bob observes a `CallUpdated` event whose method is INVITE.
///
/// NOTE(review): despite the name, only the hold re-INVITE is exercised
/// here; no follow-up "unhold" re-INVITE is sent.
#[tokio::test]
async fn test_reinvite_audio_hold_unhold() {
    let _ = tracing_subscriber::fmt::try_init();

    // Proxy.
    let port = portpicker::pick_unused_port().unwrap_or(15065);
    let (_server, cancel_token) = setup_proxy_and_users(port).await;
    let proxy_addr: SocketAddr = format!("127.0.0.1:{}", port).parse().unwrap();

    // User agents.
    let alice_port = portpicker::pick_unused_port().unwrap_or(25061);
    let bob_port = portpicker::pick_unused_port().unwrap_or(25062);
    let alice = create_test_ua("alice", proxy_addr, alice_port)
        .await
        .unwrap();
    let bob = create_test_ua("bob", proxy_addr, bob_port).await.unwrap();
    alice.register().await.unwrap();
    bob.register().await.unwrap();
    sleep(Duration::from_millis(200)).await;

    // Initial audio-only call.
    let offer_sdp = "v=0\r\no=- 123 456 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 10000 RTP/AVP 0\r\na=rtpmap:0 PCMU/8000\r\n";
    let answer_sdp = "v=0\r\no=- 456 789 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 20000 RTP/AVP 0\r\na=rtpmap:0 PCMU/8000\r\n";
    let (alice_call_id, _bob_call_id) = establish_call(&alice, &bob, offer_sdp, answer_sdp).await;

    // Watch bob's dialog events in the background (up to 50 polls) for the
    // forwarded re-INVITE.
    let bob_watcher = bob.clone();
    let bob_handle = crate::utils::spawn(async move {
        for _ in 0..50 {
            if let Ok(events) = bob_watcher.process_dialog_events().await {
                let saw_reinvite = events.iter().any(|event| {
                    matches!(
                        event,
                        TestUaEvent::CallUpdated(_, method, _)
                            if *method == rsipstack::sip::Method::Invite
                    )
                });
                if saw_reinvite {
                    return true;
                }
            }
            sleep(Duration::from_millis(100)).await;
        }
        false
    });

    // Hold: same session as the offer, new version, plus `a=sendonly`.
    let hold_sdp = "v=0\r\no=- 123 457 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 10000 RTP/AVP 0\r\na=rtpmap:0 PCMU/8000\r\na=sendonly\r\n";
    let alice_received_sdp = alice
        .send_reinvite(&alice_call_id, Some(hold_sdp.to_string()))
        .await
        .unwrap();
    assert!(
        alice_received_sdp.is_some(),
        "Alice should receive SDP answer"
    );
    info!(
        "Alice received SDP answer from Proxy: {:?}",
        alice_received_sdp
    );

    let bob_processed = bob_handle.await.unwrap();
    assert!(bob_processed, "Bob should receive forwarded re-INVITE");

    let received_sdp = alice_received_sdp.unwrap();
    assert!(
        received_sdp.contains("PCMU/8000"),
        "SDP answer should contain PCMU"
    );

    // Best-effort teardown: a failed hangup must not fail the test.
    alice.hangup(&alice_call_id).await.ok();
    cancel_token.cancel();
}
/// Establishes an audio-only (PCMU) call between alice and bob through the
/// proxy and then hangs up.
///
/// NOTE(review): despite the name, no re-INVITE is sent in this test; it
/// only verifies that the call can be set up (via the panics inside
/// `establish_call`) and torn down.
#[tokio::test]
async fn test_reinvite_audio_only_no_video() {
    let _ = tracing_subscriber::fmt::try_init();

    // Proxy.
    let port = portpicker::pick_unused_port().unwrap_or(15066);
    let (_server, cancel_token) = setup_proxy_and_users(port).await;
    let proxy_addr: SocketAddr = format!("127.0.0.1:{}", port).parse().unwrap();

    // User agents.
    let alice_port = portpicker::pick_unused_port().unwrap_or(25063);
    let bob_port = portpicker::pick_unused_port().unwrap_or(25064);
    let alice = create_test_ua("alice", proxy_addr, alice_port)
        .await
        .unwrap();
    let bob = create_test_ua("bob", proxy_addr, bob_port).await.unwrap();
    alice.register().await.unwrap();
    bob.register().await.unwrap();
    sleep(Duration::from_millis(200)).await;

    // Audio-only offer/answer; neither SDP carries an `m=video` section.
    let offer_sdp = "v=0\r\no=- 123 456 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 10000 RTP/AVP 0\r\na=rtpmap:0 PCMU/8000\r\n";
    let answer_sdp = "v=0\r\no=- 456 789 IN IP4 127.0.0.1\r\ns=-\r\nc=IN IP4 127.0.0.1\r\nt=0 0\r\nm=audio 20000 RTP/AVP 0\r\na=rtpmap:0 PCMU/8000\r\n";
    let (alice_call_id, _bob_call_id) = establish_call(&alice, &bob, offer_sdp, answer_sdp).await;

    // Best-effort teardown: a failed hangup must not fail the test.
    alice.hangup(&alice_call_id).await.ok();
    cancel_token.cancel();
}
/// Exercises adding video to an audio-only bridge without going through the
/// proxy.
///
/// The bridge is built with the audio capabilities derived from an
/// audio-only caller SDP. The test then checks that:
/// * `has_video()` is false before `add_video_track(96, 90000)` and true after, and
/// * the callee peer connection answers an audio+video offer with an SDP
///   that contains an `m=video` section.
#[tokio::test]
async fn test_reinvite_audio_to_video_add_via_bridge() {
    use crate::media::negotiate::MediaNegotiator;
    use rustrtc::sdp::{SdpType, SessionDescription};

    // Audio capabilities taken from an audio-only offer (PCMU + telephone-event).
    let caller_sdp = "v=0\r\no=- 1 1 IN IP4 127.0.0.1\r\ns=-\r\nt=0 0\r\nm=audio 10000 RTP/AVP 0 101\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:101 telephone-event/8000\r\n";
    let offered_codecs = MediaNegotiator::build_codec_list_from_offer(caller_sdp, &[]);
    let audio_caps: Vec<_> = offered_codecs
        .iter()
        .filter_map(|codec| codec.to_audio_capability())
        .collect();

    // Both legs of the bridge get the same audio capabilities.
    let builder = crate::media::bridge::BridgePeerBuilder::new("test-bridge-video".to_string())
        .with_caller_audio_capabilities(audio_caps.clone())
        .with_callee_audio_capabilities(audio_caps);
    let bridge = builder.build();
    bridge.setup_bridge().await.unwrap();

    assert!(!bridge.has_video().await, "No video before add_video_track");
    bridge.add_video_track(96, 90000).await.unwrap();
    assert!(
        bridge.has_video().await,
        "Both sides should have video after add_video_track"
    );

    // Feed an audio+video offer to the callee side and inspect its answer.
    let reinvite_offer = "v=0\r\no=- 123 456 IN IP4 127.0.0.1\r\ns=-\r\nt=0 0\r\nm=audio 10000 RTP/AVP 0\r\na=rtpmap:0 PCMU/8000\r\nm=video 10002 RTP/AVP 96\r\na=rtpmap:96 H264/90000\r\na=fmtp:96 packetization-mode=1\r\n";
    let callee_pc = bridge.callee_pc();
    let remote_offer = SessionDescription::parse(SdpType::Offer, reinvite_offer).unwrap();
    callee_pc.set_remote_description(remote_offer).await.unwrap();
    let local_answer = callee_pc.create_answer().await.unwrap();
    callee_pc.set_local_description(local_answer).unwrap();

    let answer_sdp = callee_pc.local_description().unwrap().to_sdp_string();
    assert!(
        answer_sdp.contains("m=video"),
        "Answer SDP must contain video when re-INVITE adds video: {}",
        answer_sdp
    );
    info!("Video re-INVITE bridge test passed");
}