use super::cdr_capture::CdrCapture;
use super::rtp_utils::{RtpReceiver, RtpSender};
use super::test_helpers;
use super::test_ua::{TestUa, TestUaConfig};
use crate::call::user::SipUser;
use crate::config::{MediaProxyMode, ProxyConfig};
use crate::proxy::{
active_call_registry::ActiveProxyCallRegistry,
locator::MemoryLocator,
server::{SipServerBuilder, SipServerRef},
user::MemoryUserBackend,
};
use anyhow::Result;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
/// Handle to an in-process SIP proxy started for end-to-end tests.
///
/// Both `stop` and the `Drop` impl cancel `cancel_token`; the spawned serve
/// task selects on that same token and exits when it fires.
pub struct E2eTestServer {
/// Port picked for the proxy when it was started.
pub port: u16,
/// `127.0.0.1:<port>`; handed to every `TestUa` built by `create_ua`.
pub proxy_addr: SocketAddr,
/// Value returned by `get_inner()` on the built server.
pub server_ref: SipServerRef,
/// Capture side of the pair returned by `CdrCapture::new()`; the matching
/// sender is given to the server builder as its call-record sender.
pub cdr_capture: CdrCapture,
/// Clone of `server_ref.active_call_registry`; queried by `get_active_calls`.
pub registry: Arc<ActiveProxyCallRegistry>,
/// Media proxy mode taken from the configuration the server was started with.
pub media_proxy_mode: MediaProxyMode,
// Cancelled by `stop` and on drop.
cancel_token: CancellationToken,
// Join handle of the spawned serve task; stored here but never awaited in
// the code visible in this file.
_server_handle: Option<tokio::task::JoinHandle<()>>,
}
impl E2eTestServer {
    /// Proxy port used when `portpicker` cannot find a free one.
    const FALLBACK_PROXY_PORT: u16 = 15060;
    /// Local UA port used when `portpicker` cannot find a free one.
    const FALLBACK_UA_PORT: u16 = 25000;
    /// Fixed pause after spawning the serve task before the handle is returned.
    const STARTUP_GRACE: Duration = Duration::from_millis(200);
    /// Pause between registry polls in [`Self::wait_for_active_call`].
    const ACTIVE_CALL_POLL_INTERVAL: Duration = Duration::from_millis(100);

    /// Starts a proxy built from `test_helpers::test_proxy_config` with the
    /// given media proxy `mode`, `ensure_user = Some(false)` and latching
    /// disabled.
    ///
    /// # Errors
    /// Propagates failures from creating the standard test users or from
    /// building the server.
    pub async fn start_with_mode(mode: MediaProxyMode) -> Result<Self> {
        let port = portpicker::pick_unused_port().unwrap_or(Self::FALLBACK_PROXY_PORT);

        let mut proxy_config = test_helpers::test_proxy_config(port);
        proxy_config.media_proxy = mode;
        proxy_config.ensure_user = Some(false);
        proxy_config.enable_latching = false;

        let server = Self::launch(port, proxy_config).await?;
        info!(port, ?mode, "E2E test server started");
        Ok(server)
    }

    /// Starts a proxy from a caller-supplied configuration.
    ///
    /// The listening address, all transport ports, the user agent string and
    /// the module list are overwritten with the values from
    /// `test_helpers::test_proxy_config` for a freshly picked port, and
    /// `ensure_user` is forced to `Some(false)`. Every other field of
    /// `proxy_config` (including `enable_latching`) is used as given.
    ///
    /// # Errors
    /// Propagates failures from creating the standard test users or from
    /// building the server.
    pub async fn start_with_config(mut proxy_config: ProxyConfig) -> Result<Self> {
        let port = portpicker::pick_unused_port().unwrap_or(Self::FALLBACK_PROXY_PORT);

        let base = test_helpers::test_proxy_config(port);
        proxy_config.addr = base.addr;
        proxy_config.udp_port = base.udp_port;
        proxy_config.tcp_port = base.tcp_port;
        proxy_config.tls_port = base.tls_port;
        proxy_config.ws_port = base.ws_port;
        proxy_config.useragent = base.useragent;
        proxy_config.modules = base.modules;
        proxy_config.ensure_user = Some(false);

        let server = Self::launch(port, proxy_config).await?;
        let mode = server.media_proxy_mode;
        info!(port, ?mode, "E2E test server started with custom config");
        Ok(server)
    }

    /// Starts a proxy with `MediaProxyMode::Auto`.
    pub async fn start() -> Result<Self> {
        Self::start_with_mode(MediaProxyMode::Auto).await
    }

    /// Shared bootstrap for the `start_*` entry points: seeds the in-memory
    /// user backend, builds the server, spawns its serve loop and waits the
    /// start-up grace period before returning the handle.
    async fn launch(port: u16, proxy_config: ProxyConfig) -> Result<Self> {
        let proxy_addr = SocketAddr::from(([127, 0, 0, 1], port));
        // Read the mode before the config is moved into the `Arc`.
        let media_proxy_mode = proxy_config.media_proxy;
        let config = Arc::new(proxy_config);

        let (cdr_capture, cdr_sender) = CdrCapture::new();

        let user_backend = MemoryUserBackend::new(None);
        for user in test_helpers::standard_test_users() {
            user_backend.create_user(user).await?;
        }
        let locator = MemoryLocator::new();

        let cancel_token = CancellationToken::new();
        let builder = test_helpers::register_standard_modules(
            SipServerBuilder::new(config)
                .with_user_backend(Box::new(user_backend))
                .with_locator(Box::new(locator))
                .with_cancel_token(cancel_token.clone())
                .with_callrecord_sender(Some(cdr_sender)),
        );
        let server = Arc::new(builder.build().await?);
        let server_ref = server.get_inner();
        let registry = server_ref.active_call_registry.clone();

        // The serve loop runs until it returns on its own or the token is
        // cancelled (by `stop` or by dropping the handle).
        let serve_cancel = cancel_token.clone();
        let serve_task = tokio::spawn(async move {
            tokio::select! {
                _ = serve_cancel.cancelled() => {
                    info!("E2E test server cancelled");
                }
                result = server.serve() => {
                    if let Err(e) = result {
                        warn!("E2E test server error: {:?}", e);
                    }
                }
            }
        });

        // Fixed pause so the spawned task has a chance to run before the
        // caller starts creating UAs; readiness itself is not checked.
        sleep(Self::STARTUP_GRACE).await;

        Ok(Self {
            port,
            proxy_addr,
            server_ref,
            cdr_capture,
            registry,
            media_proxy_mode,
            cancel_token,
            _server_handle: Some(serve_task),
        })
    }

    /// Creates a `TestUa` for `username`, starts it and registers it with
    /// this proxy.
    ///
    /// `alice`, `bob` and `charlie` get their dedicated passwords; any other
    /// name uses `"password"`. The realm is always `127.0.0.1`.
    ///
    /// # Errors
    /// Propagates failures from `TestUa::start` and `TestUa::register`.
    pub async fn create_ua(&self, username: &str) -> Result<TestUa> {
        let password = match username {
            "alice" => "password123",
            "bob" => "password456",
            "charlie" => "password789",
            _ => "password",
        };
        let local_port = portpicker::pick_unused_port().unwrap_or(Self::FALLBACK_UA_PORT);

        let mut ua = TestUa::new(TestUaConfig {
            username: username.to_string(),
            password: password.to_string(),
            realm: "127.0.0.1".to_string(),
            local_port,
            proxy_addr: self.proxy_addr,
        });
        ua.start().await?;
        ua.register().await?;
        info!(username, port = local_port, "TestUa created and registered");
        Ok(ua)
    }

    /// Returns what the registry's `list_recent(100)` reports.
    pub fn get_active_calls(
        &self,
    ) -> Vec<crate::proxy::active_call_registry::ActiveProxyCallEntry> {
        self.registry.list_recent(100)
    }

    /// Polls the registry every 100 ms until it lists at least one call, and
    /// returns the `session_id` of the first listed entry.
    ///
    /// Returns `None` once `timeout` has elapsed. The elapsed time is checked
    /// before each poll, so a zero `timeout` returns `None` without polling.
    pub async fn wait_for_active_call(&self, timeout: Duration) -> Option<String> {
        let started = tokio::time::Instant::now();
        while started.elapsed() < timeout {
            let calls = self.get_active_calls();
            if let Some(call) = calls.first() {
                return Some(call.session_id.clone());
            }
            sleep(Self::ACTIVE_CALL_POLL_INTERVAL).await;
        }
        None
    }

    /// Cancels the server's cancellation token, which ends the serve task.
    pub fn stop(&self) {
        self.cancel_token.cancel();
    }
}
/// Dropping the handle has the same effect as calling `stop`: the
/// cancellation token shared with the serve task is cancelled.
impl Drop for E2eTestServer {
    fn drop(&mut self) {
        self.stop();
    }
}
/// A `TestUa` bundled with optional RTP receive and send helpers.
pub struct E2eTestUa {
/// The wrapped user agent.
pub ua: TestUa,
/// RTP receiver; `new_with_rtp` always sets this to `Some`.
pub rtp_receiver: Option<RtpReceiver>,
/// RTP sender; `None` until `setup_sender` succeeds.
pub rtp_sender: Option<RtpSender>,
/// Port reported by the receiver at construction time.
/// `get_sdp_with_rtp_port` falls back to 5004 when this is `None`.
pub rtp_port: Option<u16>,
}
impl E2eTestUa {
pub async fn new_with_rtp(ua: TestUa) -> Result<Self> {
let rtp_receiver = RtpReceiver::bind(0).await?;
let rtp_port = rtp_receiver.port()?;
Ok(Self {
ua,
rtp_receiver: Some(rtp_receiver),
rtp_sender: None,
rtp_port: Some(rtp_port),
})
}
pub fn start_receiving(&mut self) -> Result<()> {
if let Some(ref receiver) = self.rtp_receiver {
receiver.start_receiving();
info!("RTP receiver started");
}
Ok(())
}
pub async fn setup_sender(&mut self) -> Result<()> {
self.rtp_sender = Some(RtpSender::bind().await?);
Ok(())
}
pub fn get_sdp_with_rtp_port(&self, base_sdp: &str) -> String {
let port = self.rtp_port.unwrap_or(5004);
base_sdp
.replace(&format!("m=audio {} ", 5004), &format!("m=audio {} ", port))
.replace(
&format!("m=audio {} ", 12345),
&format!("m=audio {} ", port),
)
}
pub async fn get_rtp_stats(&self) -> Option<super::rtp_utils::RtpStats> {
if let Some(ref receiver) = self.rtp_receiver {
Some(receiver.get_stats().await)
} else {
None
}
}
pub async fn send_rtp_to(
&self,
target: SocketAddr,
packets: Vec<super::rtp_utils::RtpPacket>,
interval_ms: u64,
) -> Result<()> {
if let Some(ref sender) = self.rtp_sender {
sender.send_sequence(target, packets, interval_ms).await?;
}
Ok(())
}
}
/// Builder-style holder for a two-party call scenario on one test server.
///
/// `with_caller` and `with_callee` fill the optional UA slots by calling
/// `create_ua` on `server`. Nothing in the code visible here reads the slots
/// back yet.
pub struct CallScenario {
// Server that creates and registers the UAs.
server: Arc<E2eTestServer>,
// UA set by `with_caller`; `None` until then.
caller: Option<TestUa>,
// UA set by `with_callee`; `None` until then.
callee: Option<TestUa>,
}
impl CallScenario {
    /// Creates a scenario bound to `server` with neither party set.
    pub fn new(server: Arc<E2eTestServer>) -> Self {
        let (caller, callee) = (None, None);
        Self {
            server,
            caller,
            callee,
        }
    }

    /// Creates and registers a UA for `username` and stores it as the caller.
    ///
    /// # Errors
    /// Propagates failures from `E2eTestServer::create_ua`.
    pub async fn with_caller(mut self, username: &str) -> Result<Self> {
        let ua = self.server.create_ua(username).await?;
        self.caller = Some(ua);
        Ok(self)
    }

    /// Creates and registers a UA for `username` and stores it as the callee.
    ///
    /// # Errors
    /// Propagates failures from `E2eTestServer::create_ua`.
    pub async fn with_callee(mut self, username: &str) -> Result<Self> {
        let ua = self.server.create_ua(username).await?;
        self.callee = Some(ua);
        Ok(self)
    }

    /// Stub: performs no call and always yields an empty string.
    pub async fn execute(&mut self) -> Result<&str> {
        let outcome = "";
        Ok(outcome)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default server starts successfully and reports a non-zero port.
    #[tokio::test]
    async fn test_e2e_server_start() {
        let startup = E2eTestServer::start().await;
        assert!(startup.is_ok());

        let running = startup.unwrap();
        assert!(running.port > 0);
        running.stop();
    }

    /// A UA for the standard user `alice` can be created against a running
    /// server.
    #[tokio::test]
    async fn test_create_ua() {
        let running = E2eTestServer::start().await.unwrap();

        let alice = running.create_ua("alice").await;
        assert!(alice.is_ok());
        running.stop();
    }
}