use crate::{
call::Location,
proxy::locator::{DialogTargetLocator, Locator},
proxy::locator_db::DbLocator,
};
use rsipstack::sip::{HostWithPort, Scheme};
use rsipstack::transport::SipAddr;
use std::sync::Arc;
use std::time::Instant;
#[tokio::test]
async fn test_db_locator() {
let aor = rsipstack::sip::Uri {
scheme: Some(Scheme::Sip),
auth: Some(rsipstack::sip::Auth {
user: "alice".to_string(),
password: None,
}),
host_with_port: HostWithPort::try_from("rustpbx.com").unwrap(),
params: vec![],
headers: vec![],
};
let host_with_port = HostWithPort::try_from("127.0.0.1:5060").unwrap();
let destination = SipAddr {
r#type: Some(rsipstack::sip::transport::Transport::Udp),
addr: host_with_port,
};
let location = Location {
aor: aor.clone(),
expires: 3600,
destination: Some(destination.clone()),
last_modified: Some(Instant::now()),
..Default::default()
};
let locator = DbLocator::new("sqlite::memory:".to_string()).await.unwrap();
locator
.register("alice", Some("rustpbx.com"), location)
.await
.unwrap();
let locations = locator
.lookup(&"sip:alice@rustpbx.com".try_into().expect("invalid uri"))
.await
.unwrap();
assert_eq!(locations.len(), 1);
assert_eq!(locations[0].aor.to_string(), aor.to_string());
assert_eq!(locations[0].expires, 3600);
match &locations[0].destination {
Some(SipAddr { r#type, addr }) => {
assert_eq!(r#type, &Some(rsipstack::sip::transport::Transport::Udp));
match &addr.host {
rsipstack::sip::Host::IpAddr(ip) => {
assert_eq!(ip.to_string(), "127.0.0.1");
}
_ => panic!("Expected IP address"),
}
assert_eq!(addr.port.as_ref().unwrap().value().to_owned(), 5060);
}
None => panic!("Expected destination to be Some"),
}
locator
.unregister("alice", Some("rustpbx.com"))
.await
.unwrap();
let result = locator
.lookup(&"sip:alice@rustpbx.com".try_into().expect("invalid uri"))
.await;
if let Ok(v) = result {
assert!(v.is_empty(), "Expected no locations after unregister")
}
}
#[tokio::test]
async fn test_db_locator_with_custom_table() {
let locator = DbLocator::new("sqlite::memory:".to_string()).await.unwrap();
let aor = rsipstack::sip::Uri {
scheme: Some(Scheme::Sip),
auth: Some(rsipstack::sip::Auth {
user: "bob".to_string(),
password: None,
}),
host_with_port: HostWithPort::try_from("rustpbx.com:5080").unwrap(),
params: vec![],
headers: vec![],
};
let host_with_port = HostWithPort::try_from("192.168.1.1:5080").unwrap();
let destination = SipAddr {
r#type: Some(rsipstack::sip::transport::Transport::Tcp),
addr: host_with_port,
};
let location = Location {
aor: aor.clone(),
expires: 1800,
destination: Some(destination.clone()),
last_modified: Some(Instant::now()),
..Default::default()
};
locator
.register("bob", Some("rustpbx.com"), location)
.await
.unwrap();
let locations = locator
.lookup(&"sip:bob@rustpbx.com:5080".try_into().expect("invalid uri"))
.await
.unwrap();
assert_eq!(locations.len(), 1);
assert_eq!(locations[0].aor.to_string(), aor.to_string());
assert_eq!(locations[0].expires, 1800);
match &locations[0].destination {
Some(SipAddr { r#type, addr }) => {
assert_eq!(r#type, &Some(rsipstack::sip::transport::Transport::Tcp));
match &addr.host {
rsipstack::sip::Host::IpAddr(ip) => {
assert_eq!(ip.to_string(), "192.168.1.1");
}
_ => panic!("Expected IP address"),
}
assert_eq!(addr.port.as_ref().unwrap().value().to_owned(), 5080);
}
None => panic!("Expected destination to be Some"),
}
locator
.unregister("bob", Some("rustpbx.com"))
.await
.unwrap();
let result = locator
.lookup(&"sip:bob@rustpbx.com".try_into().expect("invalid uri"))
.await;
if let Ok(v) = result {
assert!(v.is_empty(), "Expected no locations after unregister")
}
}
#[tokio::test]
async fn test_db_locator_multiple_lookups() {
let locator = DbLocator::new("sqlite::memory:".to_string()).await.unwrap();
let aor1 = rsipstack::sip::Uri {
scheme: Some(Scheme::Sip),
auth: Some(rsipstack::sip::Auth {
user: "carol".to_string(),
password: None,
}),
host_with_port: HostWithPort::try_from("rustpbx.com").unwrap(),
params: vec![],
headers: vec![],
};
let host_with_port1 = HostWithPort::try_from("127.0.0.1:5060").unwrap();
let destination1 = SipAddr {
r#type: Some(rsipstack::sip::transport::Transport::Udp),
addr: host_with_port1,
};
let location1 = Location {
aor: aor1.clone(),
expires: 3600,
destination: Some(destination1.clone()),
last_modified: Some(Instant::now()),
..Default::default()
};
locator
.register("carol", Some("rustpbx.com"), location1)
.await
.unwrap();
let locations = locator
.lookup(&"sip:carol@rustpbx.com".try_into().expect("invalid uri"))
.await
.unwrap();
assert_eq!(locations.len(), 1);
let aor2 = rsipstack::sip::Uri {
scheme: Some(Scheme::Sip),
auth: Some(rsipstack::sip::Auth {
user: "carol".to_string(),
password: None,
}),
host_with_port: HostWithPort::try_from("otherrustpbx.com:5080").unwrap(),
params: vec![],
headers: vec![],
};
let host_with_port2 = HostWithPort::try_from("192.168.1.10:5080").unwrap();
let destination2 = SipAddr {
r#type: Some(rsipstack::sip::transport::Transport::Tcp),
addr: host_with_port2,
};
let location2 = Location {
aor: aor2.clone(),
expires: 1800,
destination: Some(destination2.clone()),
last_modified: Some(Instant::now()),
..Default::default()
};
locator
.register("carol", Some("otherrustpbx.com"), location2)
.await
.unwrap();
let locations1 = locator
.lookup(&"sip:carol@rustpbx.com".try_into().expect("invalid uri"))
.await
.unwrap();
assert_eq!(locations1.len(), 1);
assert_eq!(locations1[0].aor.to_string(), aor1.to_string());
let locations2 = locator
.lookup(
&"sip:carol@otherrustpbx.com:5080"
.try_into()
.expect("invalid uri"),
)
.await
.unwrap();
assert_eq!(locations2.len(), 1);
assert_eq!(locations2[0].aor.to_string(), aor2.to_string());
locator
.unregister("carol", Some("rustpbx.com"))
.await
.unwrap();
let result1 = locator
.lookup(&"sip:carol@rustpbx.com".try_into().expect("invalid uri"))
.await;
if let Ok(v) = result1 {
assert!(
v.is_empty(),
"Expected no locations after unregister for realm rustpbx.com"
)
}
let locations2 = locator
.lookup(
&"sip:carol@otherrustpbx.com:5080"
.try_into()
.expect("invalid uri"),
)
.await
.unwrap();
assert_eq!(locations2.len(), 1);
assert_eq!(locations2[0].aor.to_string(), aor2.to_string());
}
#[tokio::test]
async fn test_db_locator_localhost_alias() {
let locator = DbLocator::new("sqlite::memory:".to_string()).await.unwrap();
let aor = rsipstack::sip::Uri {
scheme: Some(Scheme::Sip),
auth: Some(rsipstack::sip::Auth {
user: "dave".to_string(),
password: None,
}),
host_with_port: HostWithPort::try_from("192.168.3.181").unwrap(),
params: vec![],
headers: vec![],
};
let destination = SipAddr {
r#type: Some(rsipstack::sip::transport::Transport::Udp),
addr: HostWithPort::try_from("192.168.3.181:5060").unwrap(),
};
locator
.register(
"dave",
Some("192.168.3.181"),
Location {
aor: aor.clone(),
expires: 3600,
destination: Some(destination),
last_modified: Some(Instant::now()),
..Default::default()
},
)
.await
.unwrap();
let locations = locator
.lookup(&"sip:dave@localhost".try_into().expect("invalid uri"))
.await
.unwrap();
assert_eq!(locations.len(), 1);
assert_eq!(locations[0].aor.to_string(), aor.to_string());
}
#[tokio::test]
async fn test_db_locator_home_proxy_crud() {
let locator = DbLocator::new("sqlite::memory:".to_string()).await.unwrap();
let aor = rsipstack::sip::Uri {
scheme: Some(Scheme::Sip),
auth: Some(rsipstack::sip::Auth {
user: "alice".to_string(),
password: None,
}),
host_with_port: HostWithPort::try_from("rustpbx.com").unwrap(),
params: vec![],
headers: vec![],
};
let destination = SipAddr {
r#type: Some(rsipstack::sip::transport::Transport::Udp),
addr: HostWithPort::try_from("192.168.1.10:5060").unwrap(),
};
let home_proxy = SipAddr {
r#type: Some(rsipstack::sip::transport::Transport::Tcp),
addr: HostWithPort::try_from("10.0.0.1:5060").unwrap(),
};
locator
.register(
"alice",
Some("rustpbx.com"),
Location {
aor: aor.clone(),
expires: 3600,
destination: Some(destination.clone()),
home_proxy: Some(home_proxy.clone()),
last_modified: Some(Instant::now()),
..Default::default()
},
)
.await
.unwrap();
let locations = locator
.lookup(&"sip:alice@rustpbx.com".try_into().expect("invalid uri"))
.await
.unwrap();
assert_eq!(locations.len(), 1);
assert_eq!(locations[0].destination, Some(destination.clone()));
assert_eq!(locations[0].home_proxy, Some(home_proxy));
let new_home_proxy = SipAddr {
r#type: Some(rsipstack::sip::transport::Transport::Udp),
addr: HostWithPort::try_from("10.0.0.2:5060").unwrap(),
};
locator
.register(
"alice",
Some("rustpbx.com"),
Location {
aor: aor.clone(),
expires: 3600,
destination: Some(destination.clone()),
home_proxy: Some(new_home_proxy.clone()),
last_modified: Some(Instant::now()),
..Default::default()
},
)
.await
.unwrap();
let locations = locator
.lookup(&"sip:alice@rustpbx.com".try_into().expect("invalid uri"))
.await
.unwrap();
assert_eq!(locations.len(), 1);
assert_eq!(locations[0].home_proxy, Some(new_home_proxy));
locator
.unregister("alice", Some("rustpbx.com"))
.await
.unwrap();
let result = locator
.lookup(&"sip:alice@rustpbx.com".try_into().expect("invalid uri"))
.await;
if let Ok(v) = result {
assert!(v.is_empty(), "Expected no locations after unregister");
}
}
#[tokio::test]
async fn test_db_locator_rewrites_legacy_registered_aor_contact_to_canonical_aor() {
let locator = DbLocator::new("sqlite::memory:".to_string()).await.unwrap();
let contact_uri: rsipstack::sip::Uri = "sip:lp@172.25.52.29:51003;transport=UDP"
.try_into()
.unwrap();
let destination = SipAddr {
r#type: Some(rsipstack::sip::transport::Transport::Udp),
addr: HostWithPort::try_from("172.25.52.29:51003").unwrap(),
};
let home_proxy = SipAddr {
r#type: Some(rsipstack::sip::transport::Transport::Udp),
addr: HostWithPort::try_from("10.145.213.70:8060").unwrap(),
};
locator
.register(
"lp",
Some("localhost"),
Location {
aor: contact_uri.clone(),
registered_aor: Some(contact_uri),
destination: Some(destination),
home_proxy: Some(home_proxy),
expires: 3600,
last_modified: Some(Instant::now()),
..Default::default()
},
)
.await
.unwrap();
let results = locator
.lookup(&"sip:lp@localhost".try_into().unwrap())
.await
.unwrap();
assert_eq!(results.len(), 1);
let canonical = results[0].registered_aor.as_ref().unwrap().to_string();
assert_eq!(canonical, "sip:lp@localhost");
}
#[tokio::test]
async fn test_db_locator_lookup_by_domain_when_registered_with_ip() {
let locator = DbLocator::new("sqlite::memory:".to_string()).await.unwrap();
locator.set_realm_checker(Arc::new(|realm: &str| {
let realm = realm.to_string();
Box::pin(async move {
realm == "kefutest.xiaojukeji.com" || crate::proxy::locator::is_local_realm(&realm)
})
}));
let contact_aor: rsipstack::sip::Uri = "sip:bp@172.28.47.170:57491".try_into().unwrap();
let destination = SipAddr {
r#type: Some(rsipstack::sip::transport::Transport::Udp),
addr: HostWithPort::try_from("172.28.47.170:57491").unwrap(),
};
locator
.register(
"bp",
Some("172.28.47.170"),
Location {
aor: contact_aor.clone(),
destination: Some(destination),
expires: 3600,
last_modified: Some(Instant::now()),
..Default::default()
},
)
.await
.unwrap();
let results = locator
.lookup(&"sip:bp@kefutest.xiaojukeji.com".try_into().unwrap())
.await
.unwrap();
assert_eq!(
results.len(),
1,
"Should find record when querying by domain after registering with IP"
);
assert_eq!(results[0].aor.to_string(), contact_aor.to_string());
let empty = locator
.lookup(&"sip:wp@kefutest.xiaojukeji.com".try_into().unwrap())
.await
.unwrap();
assert!(empty.is_empty(), "Different user should not match");
}
#[tokio::test]
async fn test_db_locator_lookup_with_invalid_host() {
let locator = DbLocator::new("sqlite::memory:".to_string()).await.unwrap();
let aor = rsipstack::sip::Uri {
scheme: Some(Scheme::Sip),
auth: Some(rsipstack::sip::Auth {
user: "jssip_user".to_string(),
password: None,
}),
host_with_port: HostWithPort::try_from("pbx.real-domain.com").unwrap(),
params: vec![],
headers: vec![],
};
let destination = SipAddr {
r#type: Some(rsipstack::sip::transport::Transport::Ws),
addr: HostWithPort::try_from("192.168.1.200:5060").unwrap(),
};
locator
.register(
"jssip_user",
Some("pbx.real-domain.com"),
Location {
aor: aor.clone(),
expires: 3600,
destination: Some(destination.clone()),
transport: Some(rsipstack::sip::transport::Transport::Ws),
last_modified: Some(Instant::now()),
..Default::default()
},
)
.await
.unwrap();
let lookup_uri: rsipstack::sip::Uri = "sip:jssip_user@abcdef123456.invalid;transport=ws"
.try_into()
.unwrap();
let locations = locator.lookup(&lookup_uri).await.unwrap();
assert_eq!(
locations.len(),
1,
"should find registration via .invalid lookup"
);
assert_eq!(
locations[0].destination,
Some(destination),
"destination should match the WebSocket connection"
);
}
#[tokio::test]
async fn test_db_locator_unregister_with_address_protects_fresh_registration() {
let locator = DbLocator::new("sqlite::memory:".to_string()).await.unwrap();
let destination = SipAddr {
r#type: Some(rsipstack::sip::transport::Transport::Wss),
addr: HostWithPort::try_from("198.51.100.10:51234").unwrap(),
};
locator
.register(
"webphone",
Some("rustpbx.com"),
Location {
aor: "sip:webphone@abc.invalid;transport=ws".try_into().unwrap(),
expires: 3600,
destination: Some(destination.clone()),
transport: Some(rsipstack::sip::transport::Transport::Wss),
last_modified: Some(Instant::now()),
..Default::default()
},
)
.await
.unwrap();
let removed = locator.unregister_with_address(&destination).await.unwrap();
assert!(
removed.is_none(),
"fresh registration (< grace window) must not be removed by unregister_with_address"
);
let locations = locator
.lookup(&"sip:webphone@abc.invalid;transport=ws".try_into().unwrap())
.await
.unwrap();
assert_eq!(
locations.len(),
1,
"registration must survive a stale close event within the grace window"
);
}
fn make_webrtc_location(contact: &str, home_proxy: SipAddr, ws_destination: SipAddr) -> Location {
Location {
aor: contact.try_into().unwrap(),
expires: 3600,
destination: Some(ws_destination),
home_proxy: Some(home_proxy),
registered_aor: Some("sip:bob@rustpbx.com".try_into().unwrap()),
transport: Some(rsipstack::sip::transport::Transport::Wss),
supports_webrtc: true,
last_modified: Some(Instant::now()),
..Default::default()
}
}
fn sip_addr(host: &str, transport: rsipstack::sip::transport::Transport) -> SipAddr {
SipAddr {
r#type: Some(transport),
addr: HostWithPort::try_from(host).unwrap(),
}
}
#[tokio::test]
async fn test_cross_cluster_invalid_bye_routes_to_remote_home_proxy() {
let locator: Arc<Box<dyn Locator>> = Arc::new(Box::new(
DbLocator::new("sqlite::memory:".to_string()).await.unwrap(),
));
let node_a = sip_addr("10.0.0.5:5060", rsipstack::sip::transport::Transport::Udp);
let ws_a = sip_addr(
"198.51.100.10:51234",
rsipstack::sip::transport::Transport::Wss,
);
let contact_a: rsipstack::sip::Uri = "sip:bob@7s8f2k.invalid;transport=ws".try_into().unwrap();
locator
.register(
"bob",
Some("rustpbx.com"),
make_webrtc_location(
"sip:bob@7s8f2k.invalid;transport=ws",
node_a.clone(),
ws_a.clone(),
),
)
.await
.unwrap();
let node_b = sip_addr("10.0.0.6:5060", rsipstack::sip::transport::Transport::Udp);
let dtl = DialogTargetLocator::new(locator.clone(), vec![node_b], true);
let result = dtl.locate(&contact_a).await.unwrap();
assert_eq!(
result, node_a,
"cross-node BYE with .invalid contact must route to remote home_proxy"
);
}
#[tokio::test]
async fn test_cross_cluster_invalid_bye_delivers_to_local_ws() {
let locator: Arc<Box<dyn Locator>> = Arc::new(Box::new(
DbLocator::new("sqlite::memory:".to_string()).await.unwrap(),
));
let node_a = sip_addr("10.0.0.5:5060", rsipstack::sip::transport::Transport::Udp);
let ws_a = sip_addr(
"198.51.100.10:51234",
rsipstack::sip::transport::Transport::Wss,
);
let contact_a: rsipstack::sip::Uri = "sip:bob@7s8f2k.invalid;transport=ws".try_into().unwrap();
locator
.register(
"bob",
Some("rustpbx.com"),
make_webrtc_location(
"sip:bob@7s8f2k.invalid;transport=ws",
node_a.clone(),
ws_a.clone(),
),
)
.await
.unwrap();
let dtl = DialogTargetLocator::new(locator.clone(), vec![node_a], true);
let result = dtl.locate(&contact_a).await.unwrap();
assert_eq!(
result, ws_a,
"home-node BYE with .invalid contact must deliver to local WS destination"
);
}
#[tokio::test]
async fn test_cross_cluster_multi_node_same_user_routes_correctly() {
let locator: Arc<Box<dyn Locator>> = Arc::new(Box::new(
DbLocator::new("sqlite::memory:".to_string()).await.unwrap(),
));
let node_a = sip_addr("10.0.0.5:5060", rsipstack::sip::transport::Transport::Udp);
let node_b = sip_addr("10.0.0.6:5060", rsipstack::sip::transport::Transport::Udp);
let ws_a = sip_addr(
"198.51.100.10:51234",
rsipstack::sip::transport::Transport::Wss,
);
let ws_b = sip_addr(
"198.51.100.20:57890",
rsipstack::sip::transport::Transport::Wss,
);
let contact_a: rsipstack::sip::Uri = "sip:bob@aaa123.invalid;transport=ws".try_into().unwrap();
let contact_b: rsipstack::sip::Uri = "sip:bob@bbb456.invalid;transport=ws".try_into().unwrap();
locator
.register(
"bob",
Some("rustpbx.com"),
make_webrtc_location(
"sip:bob@aaa123.invalid;transport=ws",
node_a.clone(),
ws_a.clone(),
),
)
.await
.unwrap();
locator
.register(
"bob",
Some("rustpbx.com"),
make_webrtc_location(
"sip:bob@bbb456.invalid;transport=ws",
node_b.clone(),
ws_b.clone(),
),
)
.await
.unwrap();
let dtl_a = DialogTargetLocator::new(locator.clone(), vec![node_a.clone()], true);
assert_eq!(
dtl_a.locate(&contact_a).await.unwrap(),
ws_a,
"Node A → own .invalid contact must reach local WS"
);
assert_eq!(
dtl_a.locate(&contact_b).await.unwrap(),
node_b,
"Node A → Node B's .invalid contact must route to Node B home_proxy"
);
let dtl_b = DialogTargetLocator::new(locator.clone(), vec![node_b.clone()], true);
assert_eq!(
dtl_b.locate(&contact_b).await.unwrap(),
ws_b,
"Node B → own .invalid contact must reach local WS"
);
assert_eq!(
dtl_b.locate(&contact_a).await.unwrap(),
node_a,
"Node B → Node A's .invalid contact must route to Node A home_proxy"
);
}