use crate::call::user::SipUser;
use crate::config::ProxyConfig;
use crate::proxy::acl::AclModule;
use crate::proxy::auth::AuthModule;
use crate::proxy::call::CallModule;
use crate::proxy::registrar::RegistrarModule;
use crate::proxy::server::SipServerBuilder;
use crate::proxy::tests::common::{
create_auth_request, create_register_request, create_test_request, create_transaction,
};
use crate::proxy::user::MemoryUserBackend;
use rsipstack::sip::{
headers::{ContentType, typed::To},
prelude::*,
};
use rsipstack::transaction::transaction::Transaction;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
/// Smoke test that builds a proxy with the `acl`, `auth`, `registrar` and
/// `call` modules registered, then runs the ban, registration and call helper
/// flows in a spawned client task and collects their verdicts over an mpsc
/// channel.
///
/// The built proxy is only held for the duration of the test; the helper
/// flows run their transactions through `process_transaction_locally`.
///
/// # Panics
///
/// Panics if the user cannot be created, the proxy cannot be built, no message
/// arrives within 10 seconds, the client task panics, or any of the three
/// verdicts is not reported as `true`.
#[tokio::test]
async fn test_proxy_full_flow() {
    let config = ProxyConfig {
        addr: "127.0.0.1".to_string(),
        udp_port: Some(6061),
        modules: Some(vec![
            "acl".to_string(),
            "auth".to_string(),
            "registrar".to_string(),
            "call".to_string(),
        ]),
        ..Default::default()
    };

    let user_backend = Box::new(MemoryUserBackend::new(None));
    let test_user = SipUser {
        id: 1,
        username: "testuser".to_string(),
        password: Some("testpassword".to_string()),
        enabled: true,
        realm: Some("rustpbx.com".to_string()),
        ..Default::default()
    };
    user_backend
        .create_user(test_user)
        .await
        .expect("Failed to create test user");

    let token = CancellationToken::new();
    let proxy_builder = SipServerBuilder::new(Arc::new(config))
        .with_cancel_token(token.clone())
        .with_user_backend(user_backend)
        .register_module("acl", AclModule::create)
        .register_module("auth", AuthModule::create)
        .register_module("registrar", RegistrarModule::create)
        .register_module("call", CallModule::create);
    let _proxy = proxy_builder.build().await.expect("Failed to build proxy");

    let (tx, mut rx) = mpsc::channel::<String>(10);
    let client_handle = tokio::spawn({
        let token_clone = token.clone();
        // Move the sender into the task instead of cloning it: once the task
        // ends (normally or by panicking) every sender is gone, so `recv()`
        // yields `None` right away instead of stalling until the timeout.
        let client_tx = tx;
        async move {
            println!("Testing flows...");
            let ban_verified = test_ban_module("testuser", "rustpbx.com", client_tx.clone()).await;
            let registration_verified = test_registration_module(
                "testuser",
                "rustpbx.com",
                "testpassword",
                client_tx.clone(),
            )
            .await;
            let call_verified =
                test_call_module("caller", "testuser", "rustpbx.com", client_tx.clone()).await;

            let reports = [
                format!("ban_verified:{}", ban_verified),
                format!("registration_verified:{}", registration_verified),
                format!("call_verified:{}", call_verified),
                "test_completed".to_string(),
            ];
            for report in reports {
                client_tx.send(report).await.unwrap();
            }
            token_clone.cancel();
        }
    });

    let mut ban_verified = false;
    let mut registration_verified = false;
    let mut call_verified = false;
    // Each message gets its own 10 second budget; progress messages sent by
    // the helpers are printed and otherwise ignored.
    while let Some(msg) = timeout(Duration::from_secs(10), rx.recv())
        .await
        .expect("Test timeout")
    {
        println!("Received message: {}", msg);
        if msg.starts_with("ban_verified:true") {
            ban_verified = true;
        } else if msg.starts_with("registration_verified:true") {
            registration_verified = true;
        } else if msg.starts_with("call_verified:true") {
            call_verified = true;
        } else if msg == "test_completed" {
            break;
        }
    }

    // Surface a panic inside the client task instead of silently discarding it.
    client_handle.await.expect("Client task panicked");

    assert!(ban_verified, "Ban module verification failed");
    assert!(
        registration_verified,
        "Registration module verification failed"
    );
    assert!(call_verified, "Call module verification failed");
    println!("All modules verified successfully!");
}
/// Drives the "ban" scenario: three REGISTER attempts built with the password
/// `"wrongpassword"`, each followed by a 50 ms pause, and then one more such
/// attempt without a pause.
///
/// Every request is wrapped in a transaction and handed to
/// `process_transaction_locally`. A progress message is sent on `tx` when the
/// scenario is done.
///
/// Returns `true` unconditionally.
/// NOTE(review): the responses recorded on the transactions are not inspected
/// here — confirm whether that is intended.
///
/// # Panics
///
/// Panics if the receiving side of `tx` has been closed.
async fn test_ban_module(username: &str, realm: &str, tx: mpsc::Sender<String>) -> bool {
    println!("Testing ban functionality...");

    for attempt in 1..=3 {
        let bad_register = create_auth_request(
            rsipstack::sip::Method::Register,
            username,
            realm,
            "wrongpassword",
        );
        let (mut transaction, _endpoint_inner) = create_transaction(bad_register).await;
        let _ = process_transaction_locally(&mut transaction).await;
        println!("Sent failed auth attempt {}", attempt);
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    // One further attempt after the three counted failures.
    let final_register = create_auth_request(
        rsipstack::sip::Method::Register,
        username,
        realm,
        "wrongpassword",
    );
    let (mut final_transaction, _endpoint_inner) = create_transaction(final_register).await;
    let _ = process_transaction_locally(&mut final_transaction).await;

    tx.send("Ban test completed".to_string()).await.unwrap();
    true
}
/// Drives the registration scenario in two steps: a REGISTER built by
/// `create_register_request` with an expiry argument of `Some(3600)`, followed
/// by a REGISTER built by `create_auth_request` with the supplied `password`.
///
/// Both requests are wrapped in transactions and handed to
/// `process_transaction_locally`. A progress message is sent on `tx`
/// afterwards.
///
/// Returns `true` unconditionally.
/// NOTE(review): the responses recorded on the transactions are not inspected
/// here — confirm whether that is intended.
///
/// # Panics
///
/// Panics if the receiving side of `tx` has been closed.
async fn test_registration_module(
    username: &str,
    realm: &str,
    password: &str,
    tx: mpsc::Sender<String>,
) -> bool {
    println!("Testing registration module...");

    // Step 1: initial REGISTER.
    let initial_register = create_register_request(username, realm, Some(3600));
    let (mut initial_transaction, _endpoint_inner) = create_transaction(initial_register).await;
    let _ = process_transaction_locally(&mut initial_transaction).await;

    // Step 2: REGISTER built with the caller-supplied password.
    let authed_register =
        create_auth_request(rsipstack::sip::Method::Register, username, realm, password);
    let (mut authed_transaction, _endpoint_inner) = create_transaction(authed_register).await;
    let _ = process_transaction_locally(&mut authed_transaction).await;

    let progress = "Registration test completed".to_string();
    tx.send(progress).await.unwrap();
    true
}
/// Drives the call scenario: builds an INVITE from `from_user` to `to_user`
/// via `create_invite_request`, wraps it in a transaction and hands it to
/// `process_transaction_locally`. A progress message is sent on `tx`
/// afterwards.
///
/// Returns `true` unconditionally.
/// NOTE(review): the response recorded on the transaction is not inspected
/// here — confirm whether that is intended.
///
/// # Panics
///
/// Panics if the receiving side of `tx` has been closed.
async fn test_call_module(
    from_user: &str,
    to_user: &str,
    realm: &str,
    tx: mpsc::Sender<String>,
) -> bool {
    println!("Testing call module...");

    let invite = create_invite_request(from_user, to_user, realm);
    let (mut invite_transaction, _endpoint_inner) = create_transaction(invite).await;
    let _ = process_transaction_locally(&mut invite_transaction).await;

    let progress = "Call test completed".to_string();
    tx.send(progress).await.unwrap();
    true
}
/// Local stand-in for request processing: after a 50 ms pause it stores a
/// canned response in `tx.last_response`, chosen from the original request.
///
/// * REGISTER with an `Authorization` header -> `200 OK`
/// * REGISTER without one -> `401 Unauthorized`
/// * INVITE -> `407 Proxy Authentication Required`
/// * any other method -> `last_response` is left untouched
///
/// The response copies the request's headers and has an empty body.
/// Always returns `Ok(())`.
async fn process_transaction_locally(tx: &mut Transaction) -> Result<(), String> {
    tokio::time::sleep(Duration::from_millis(50)).await;

    // Decide on the status first so the response is built in one place.
    let status_code = if tx.original.method == rsipstack::sip::Method::Register {
        let carries_credentials = tx
            .original
            .headers()
            .iter()
            .any(|header| matches!(header, rsipstack::sip::Header::Authorization(_)));
        if carries_credentials {
            rsipstack::sip::StatusCode::OK
        } else {
            rsipstack::sip::StatusCode::Unauthorized
        }
    } else if tx.original.method == rsipstack::sip::Method::Invite {
        rsipstack::sip::StatusCode::ProxyAuthenticationRequired
    } else {
        // Other methods get no canned response.
        return Ok(());
    };

    tx.last_response = Some(rsipstack::sip::Response {
        version: rsipstack::sip::Version::V2,
        status_code,
        headers: tx.original.headers().to_owned(),
        body: vec![],
    });
    Ok(())
}
/// Builds an INVITE from `from_user` addressed to `sip:<to_user>@<realm>:5060`.
///
/// Starts from `create_test_request`, then points both the request URI and the
/// `To` header at the callee, appends a `Content-Type: application/sdp` header
/// and attaches a fixed SDP offer as the body.
///
/// # Panics
///
/// Panics if `realm` cannot be parsed as a host.
fn create_invite_request(from_user: &str, to_user: &str, realm: &str) -> rsipstack::sip::Request {
    // Fixed SDP offer used as the INVITE body.
    const SDP_OFFER: &str = "v=0\r\n\
        o=- 1234567890 1234567890 IN IP4 192.168.1.100\r\n\
        s=Call\r\n\
        c=IN IP4 192.168.1.100\r\n\
        t=0 0\r\n\
        m=audio 49170 RTP/AVP 0 8 97\r\n\
        a=rtpmap:0 PCMU/8000\r\n\
        a=rtpmap:8 PCMA/8000\r\n\
        a=rtpmap:97 iLBC/8000\r\n\
        a=sendrecv\r\n";

    let mut invite =
        create_test_request(rsipstack::sip::Method::Invite, from_user, None, realm, None);

    let callee_uri = rsipstack::sip::Uri {
        scheme: Some(rsipstack::sip::Scheme::Sip),
        auth: Some(rsipstack::sip::Auth {
            user: to_user.to_string(),
            password: None,
        }),
        host_with_port: rsipstack::sip::HostWithPort {
            host: realm.parse().unwrap(),
            port: Some(5060.into()),
        },
        params: vec![],
        headers: vec![],
    };
    let callee_header = To {
        display_name: None,
        uri: callee_uri.clone(),
        params: vec![],
    };
    invite.uri = callee_uri;

    // Swap any existing To header for the callee's, then declare the body type.
    invite
        .headers
        .retain(|header| !matches!(header, rsipstack::sip::Header::To(_)));
    invite.headers.push(callee_header.into());
    invite
        .headers
        .push(ContentType::new("application/sdp".to_string()).into());

    invite.body = SDP_OFFER.as_bytes().to_vec();
    invite
}