use super::test_ua::TestUaEvent;
use crate::config::MediaProxyMode;
use crate::rwi::gateway::{EventCacheEntry, RwiGateway};
use anyhow::Result;
use parking_lot::RwLock;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time::sleep;
use tracing::info;
struct DnEventCapture {
events: Vec<String>,
_rx: broadcast::Receiver<EventCacheEntry>,
}
impl DnEventCapture {
fn collect(&mut self) {
while let Ok(entry) = self._rx.try_recv() {
let flat = &entry.event;
if flat.event_type.contains("dn") || flat.event_type.contains("state") {
self.events.push(flat.event_type.to_string());
}
}
}
fn has_event(&self, name: &str) -> bool {
self.events.iter().any(|n| n == name)
}
}
fn setup_gateway_with_capture() -> (
Arc<RwLock<RwiGateway>>,
broadcast::Receiver<EventCacheEntry>,
) {
let (tx, rx) = broadcast::channel::<EventCacheEntry>(1000);
let mut gw = RwiGateway::new();
gw.set_webhook_tx(tx);
(Arc::new(RwLock::new(gw)), rx)
}
fn pcmu_sdp(port: u16) -> String {
format!(
"v=0\r\n\
o=- 12345 12345 IN IP4 127.0.0.1\r\n\
s=-\r\n\
c=IN IP4 127.0.0.1\r\n\
t=0 0\r\n\
m=audio {port} RTP/AVP 0 101\r\n\
a=rtpmap:0 PCMU/8000\r\n\
a=rtpmap:101 telephone-event/8000\r\n\
a=sendrecv\r\n"
)
}
#[tokio::test]
#[ignore = "DnStateChanged removed; use agent_state_changed instead"]
async fn test_dn_events_register_and_call_flow() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let (gw, rx) = setup_gateway_with_capture();
let mut proxy_config = crate::config::ProxyConfig::default();
proxy_config.media_proxy = MediaProxyMode::Auto;
proxy_config.addr = "127.0.0.1".to_string();
let port = portpicker::pick_unused_port().unwrap_or(15060);
let base = super::test_helpers::test_proxy_config(port);
proxy_config.addr = base.addr;
proxy_config.udp_port = base.udp_port;
proxy_config.tcp_port = base.tcp_port;
proxy_config.tls_port = base.tls_port;
proxy_config.ws_port = base.ws_port;
proxy_config.useragent = base.useragent;
proxy_config.modules = base.modules;
proxy_config.ensure_user = Some(false);
proxy_config.enable_latching = false;
let config = Arc::new(proxy_config);
let user_backend = crate::proxy::user::MemoryUserBackend::new(None);
for user in super::test_helpers::standard_test_users() {
user_backend.create_user(user).await?;
}
let locator = crate::proxy::locator::MemoryLocator::new();
let cancel_token = tokio_util::sync::CancellationToken::new();
use crate::proxy::server::SipServerBuilder;
let (_cdr_capture, cdr_sender) = super::cdr_capture::CdrCapture::new();
let builder = super::test_helpers::register_standard_modules(
SipServerBuilder::new(config)
.with_user_backend(Box::new(user_backend))
.with_locator(Box::new(locator))
.with_cancel_token(cancel_token.clone())
.with_callrecord_sender(Some(cdr_sender))
.with_rwi_gateway(gw.clone()),
);
let server = Arc::new(builder.build().await?);
let _server_ref = server.get_inner();
let cancel_token_clone = cancel_token.clone();
let _server_handle = crate::utils::spawn(async move {
tokio::select! {
_ = cancel_token_clone.cancelled() => {}
result = server.serve() => {
if let Err(e) = result {
tracing::warn!("E2E test server error: {:?}", e);
}
}
}
});
sleep(Duration::from_millis(300)).await;
let proxy_addr = format!("127.0.0.1:{}", port).parse()?;
let alice_port = portpicker::pick_unused_port().unwrap_or(25000);
let alice_config = super::test_ua::TestUaConfig {
username: "alice".to_string(),
password: "password123".to_string(),
realm: "127.0.0.1".to_string(),
local_port: alice_port,
proxy_addr,
};
let mut alice = super::test_ua::TestUa::new(alice_config);
alice.start().await?;
alice.register().await?;
let bob_port = portpicker::pick_unused_port().unwrap_or(25001);
let bob_config = super::test_ua::TestUaConfig {
username: "bob".to_string(),
password: "password456".to_string(),
realm: "127.0.0.1".to_string(),
local_port: bob_port,
proxy_addr,
};
let mut bob = super::test_ua::TestUa::new(bob_config);
bob.start().await?;
bob.register().await?;
sleep(Duration::from_millis(300)).await;
let mut capture = DnEventCapture {
events: vec![],
_rx: rx,
};
capture.collect();
assert!(
capture.has_event("REGISTERED"),
"REGISTERED event should be emitted after registration"
);
let alice_sdp = pcmu_sdp(portpicker::pick_unused_port().unwrap_or(30000));
let bob_sdp = pcmu_sdp(portpicker::pick_unused_port().unwrap_or(30001));
let caller_handle = {
let a = alice.clone();
let sdp = alice_sdp.clone();
crate::utils::spawn(async move { a.make_call("bob", Some(sdp)).await })
};
sleep(Duration::from_millis(200)).await;
let mut bob_dialog_id = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id = Some(id.clone());
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
assert!(bob_dialog_id.is_some(), "Bob should receive the call");
capture.collect();
assert!(
capture.has_event("DIALING"),
"DIALING event should be emitted when INVITE is sent"
);
if let Some(ref id) = bob_dialog_id {
bob.send_ringing(id, None).await?;
}
sleep(Duration::from_millis(200)).await;
capture.collect();
assert!(
capture.has_event("RINGING"),
"RINGING event should be emitted when callee rings"
);
if let Some(ref id) = bob_dialog_id {
bob.answer_call(id, Some(bob_sdp.clone())).await?;
}
let _alice_dialog_id = match tokio::time::timeout(Duration::from_secs(5), caller_handle).await {
Ok(Ok(Ok(id))) => {
info!("Call established: {}", id);
Some(id)
}
other => {
info!("Caller result: {:?}", other);
None
}
};
sleep(Duration::from_millis(300)).await;
capture.collect();
assert!(
capture.has_event("ESTABLISHED"),
"ESTABLISHED event should be emitted when call is answered"
);
if let Some(ref id) = bob_dialog_id {
bob.hangup(id).await?;
}
sleep(Duration::from_millis(500)).await;
capture.collect();
assert!(
capture.has_event("ONHOOK"),
"ONHOOK event should be emitted when a party hangs up"
);
assert!(
capture.has_event("RELEASED"),
"RELEASED event should be emitted when call is released"
);
cancel_token.cancel();
sleep(Duration::from_millis(100)).await;
Ok(())
}
#[tokio::test]
#[ignore = "DnStateChanged removed; use agent_state_changed instead"]
async fn test_dn_events_abandoned_on_reject() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let (gw, rx) = setup_gateway_with_capture();
let port = portpicker::pick_unused_port().unwrap_or(15060);
let mut proxy_config = super::test_helpers::test_proxy_config(port);
proxy_config.media_proxy = MediaProxyMode::Auto;
proxy_config.ensure_user = Some(false);
proxy_config.enable_latching = false;
let config = Arc::new(proxy_config);
let user_backend = crate::proxy::user::MemoryUserBackend::new(None);
for user in super::test_helpers::standard_test_users() {
user_backend.create_user(user).await?;
}
let locator = crate::proxy::locator::MemoryLocator::new();
let cancel_token = tokio_util::sync::CancellationToken::new();
use crate::proxy::server::SipServerBuilder;
let (_cdr_capture, cdr_sender) = super::cdr_capture::CdrCapture::new();
let builder = super::test_helpers::register_standard_modules(
SipServerBuilder::new(config)
.with_user_backend(Box::new(user_backend))
.with_locator(Box::new(locator))
.with_cancel_token(cancel_token.clone())
.with_callrecord_sender(Some(cdr_sender))
.with_rwi_gateway(gw.clone()),
);
let server = Arc::new(builder.build().await?);
let _server_ref = server.get_inner();
let cancel_token_clone = cancel_token.clone();
let _server_handle = crate::utils::spawn(async move {
tokio::select! {
_ = cancel_token_clone.cancelled() => {}
result = server.serve() => {
if let Err(e) = result {
tracing::warn!("E2E test server error: {:?}", e);
}
}
}
});
sleep(Duration::from_millis(300)).await;
let proxy_addr = format!("127.0.0.1:{}", port).parse()?;
let alice_port = portpicker::pick_unused_port().unwrap_or(25010);
let alice_config = super::test_ua::TestUaConfig {
username: "alice".to_string(),
password: "password123".to_string(),
realm: "127.0.0.1".to_string(),
local_port: alice_port,
proxy_addr,
};
let mut alice = super::test_ua::TestUa::new(alice_config);
alice.start().await?;
alice.register().await?;
let bob_port = portpicker::pick_unused_port().unwrap_or(25011);
let bob_config = super::test_ua::TestUaConfig {
username: "bob".to_string(),
password: "password456".to_string(),
realm: "127.0.0.1".to_string(),
local_port: bob_port,
proxy_addr,
};
let mut bob = super::test_ua::TestUa::new(bob_config);
bob.start().await?;
bob.register().await?;
sleep(Duration::from_millis(300)).await;
let alice_sdp = pcmu_sdp(portpicker::pick_unused_port().unwrap_or(30010));
let _caller_handle = {
let a = alice.clone();
let sdp = alice_sdp.clone();
crate::utils::spawn(async move { a.make_call("bob", Some(sdp)).await })
};
sleep(Duration::from_millis(200)).await;
let mut bob_dialog_id = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, _) = event {
bob_dialog_id = Some(id.clone());
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
assert!(bob_dialog_id.is_some(), "Bob should receive the call");
if let Some(ref id) = bob_dialog_id {
bob.reject_call_with_reason(id, Some(486), Some("Busy Here".to_string()))
.await?;
}
sleep(Duration::from_millis(500)).await;
let mut capture = DnEventCapture {
events: vec![],
_rx: rx,
};
capture.collect();
assert!(
capture.has_event("ABANDONED"),
"ABANDONED event should be emitted when callee rejects (486)"
);
cancel_token.cancel();
sleep(Duration::from_millis(100)).await;
Ok(())
}