use super::{
FnCreateProxyModule, ProxyAction, ProxyModule,
data::ProxyDataContext,
locator::{Locator, create_locator_with_migrate},
user::{UserBackend, build_user_backend},
};
use crate::{
auth::{jwt_auth_backend::JwtAuthBackend, jwt_validator::JwtValidator},
auto_external_ip,
call::{TransactionCookie, policy::FrequencyLimiter},
callrecord::{
CallRecordSender,
sipflow::{SipFlow, SipFlowBuilder},
},
config::{ProxyConfig, RtpConfig, SipFlowConfig},
proxy::{
FnCreateRouteInvite,
active_call_registry::ActiveProxyCallRegistry,
auth::AuthBackend,
call::{CallRouter, DialplanInspector},
cluster_event::{ClusterEventHub, ClusterEventModule},
locator::{DialogTargetLocator, LocatorEventSender, TransportInspectorLocator},
pre_auth_registry::PreAuthRegistry,
presence::PresenceManager,
},
sipflow::SipFlowBackend,
sipflow::backend::create_backend,
};
use anyhow::{Result, anyhow};
use rsipstack::sip::prelude::HeadersExt;
use rsipstack::sip::{Auth, Param, Transport};
use rsipstack::{
EndpointBuilder,
dialog::dialog_layer::DialogLayer,
transaction::{
Endpoint, TransactionReceiver,
endpoint::{EndpointOption, MessageInspector},
transaction::Transaction,
},
transport::{
TcpListenerConnection, TlsConfig, TlsListenerConnection, TransportLayer,
WebSocketListenerConnection, udp::UdpConnection,
},
};
use sea_orm::DatabaseConnection;
use std::{
collections::{HashMap, HashSet},
net::{IpAddr, SocketAddr},
sync::{
Arc, OnceLock,
atomic::{AtomicUsize, Ordering},
},
time::{Duration, Instant},
};
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
pub struct SipServerInner {
pub cancel_token: CancellationToken,
pub rtp_config: RtpConfig,
pub proxy_config: Arc<ProxyConfig>,
pub data_context: Arc<ProxyDataContext>,
pub database: Option<DatabaseConnection>,
pub user_backend: Box<dyn UserBackend>,
pub auth_backend: Vec<Box<dyn AuthBackend>>,
pub call_router: Option<Box<dyn CallRouter>>,
pub dialplan_inspectors: Vec<Box<dyn DialplanInspector>>,
pub locator: Arc<Box<dyn Locator>>,
pub callrecord_sender: Option<CallRecordSender>,
pub endpoint: Endpoint,
pub dialog_layer: Arc<DialogLayer>,
pub create_route_invites: Vec<FnCreateRouteInvite>,
pub ignore_out_of_dialog_request: bool,
pub locator_events: Option<LocatorEventSender>,
pub sipflow_config: Option<SipFlowConfig>,
pub sip_flow: Option<SipFlow>,
pub active_call_registry: Arc<ActiveProxyCallRegistry>,
pub frequency_limiter: Option<Arc<dyn FrequencyLimiter>>,
pub call_record_hooks: Arc<Vec<Box<dyn crate::callrecord::CallRecordHook>>>,
pub runnings_tx: Arc<AtomicUsize>,
pub storage: Option<crate::storage::Storage>,
pub presence_manager: Arc<PresenceManager>,
pub addon_registry: Option<Arc<crate::addons::registry::AddonRegistry>>,
pub rwi_gateway: Option<crate::rwi::RwiGatewayRef>,
pub ivr_trace: Option<Arc<crate::call::app::ivr::trace::IvrTraceCollector>>,
pub tls_listener: Option<rsipstack::transport::TlsListenerConnection>,
pub queue_manager: Arc<crate::call::runtime::QueueManager>,
pub conference_manager: Arc<crate::call::runtime::ConferenceManager>,
pub agent_registry: Option<Arc<dyn crate::call::app::agent_registry::AgentRegistry>>,
pub queue_location_enricher: Option<Arc<dyn crate::proxy::call::QueueLocationEnricher>>,
pub transfer_notify_subscribers:
Arc<tokio::sync::Mutex<Vec<crate::call::domain::ReferNotifyTx>>>,
pub cluster_event_hub: Option<Arc<ClusterEventHub>>,
pub cluster_peer_ips: Vec<IpAddr>,
pub media_policy: Arc<dyn crate::call::MediaPolicy>,
pub trunk_health: Option<crate::proxy::trunk_health::HealthStateMap>,
pub session_hooks: Arc<Vec<Arc<dyn crate::proxy::proxy_call::session_hooks::CallSessionHook>>>,
pub pre_auth_registry: Option<Arc<PreAuthRegistry>>,
pub contact_username: String,
pub rtc_cname: String,
pub media_engine: crate::media::engine::MediaEngine,
}
fn random_hex() -> String {
format!("{:016x}", rand::random::<u64>())
}
pub type SipServerRef = Arc<SipServerInner>;
#[derive(Clone)]
pub struct SipServer {
pub inner: SipServerRef,
modules: Arc<Vec<Box<dyn ProxyModule>>>,
}
pub struct SipServerBuilder {
rtp_config: Option<RtpConfig>,
config: Arc<ProxyConfig>,
cancel_token: Option<CancellationToken>,
user_backend: Option<Box<dyn UserBackend>>,
auth_backend: Vec<Box<dyn AuthBackend>>,
call_router: Option<Box<dyn CallRouter>>,
module_fns: HashMap<String, FnCreateProxyModule>,
locator: Option<Box<dyn Locator>>,
callrecord_sender: Option<CallRecordSender>,
message_inspectors: Vec<Box<dyn MessageInspector>>,
dialplan_inspectors: Vec<Box<dyn DialplanInspector>>,
create_route_invites: Vec<FnCreateRouteInvite>,
database: Option<DatabaseConnection>,
data_context: Option<Arc<ProxyDataContext>>,
ignore_out_of_dialog_request: bool,
locator_events: Option<LocatorEventSender>,
frequency_limiter: Option<Arc<dyn FrequencyLimiter>>,
call_record_hooks: Vec<Box<dyn crate::callrecord::CallRecordHook>>,
storage: Option<crate::storage::Storage>,
sipflow_config: Option<SipFlowConfig>,
sipflow_backend: Option<Arc<dyn SipFlowBackend>>,
no_bind: bool,
addon_registry: Option<Arc<crate::addons::registry::AddonRegistry>>,
rwi_gateway: Option<crate::rwi::RwiGatewayRef>,
ivr_trace: Option<Arc<crate::call::app::ivr::trace::IvrTraceCollector>>,
agent_registry: Option<Arc<dyn crate::call::app::agent_registry::AgentRegistry>>,
queue_location_enricher: Option<Arc<dyn crate::proxy::call::QueueLocationEnricher>>,
skip_migrate: bool,
cluster_peers: Vec<SocketAddr>,
media_policy: Option<Arc<dyn crate::call::MediaPolicy>>,
trunk_health: Option<crate::proxy::trunk_health::HealthStateMap>,
session_hooks: Vec<Arc<dyn crate::proxy::proxy_call::session_hooks::CallSessionHook>>,
}
impl SipServerBuilder {
pub fn new(config: Arc<ProxyConfig>) -> Self {
Self {
config,
rtp_config: None,
cancel_token: None,
user_backend: None,
auth_backend: Vec::new(),
call_router: None,
module_fns: HashMap::new(),
locator: None,
callrecord_sender: None,
message_inspectors: Vec::new(),
dialplan_inspectors: Vec::new(),
create_route_invites: Vec::new(),
database: None,
data_context: None,
ignore_out_of_dialog_request: true,
locator_events: None,
frequency_limiter: None,
call_record_hooks: Vec::new(),
storage: None,
sipflow_config: None,
sipflow_backend: None,
no_bind: false,
addon_registry: None,
rwi_gateway: None,
ivr_trace: None,
agent_registry: None,
queue_location_enricher: None,
skip_migrate: false,
cluster_peers: Vec::new(),
media_policy: None,
trunk_health: None,
session_hooks: Vec::new(),
}
}
pub fn with_trunk_health(mut self, health: crate::proxy::trunk_health::HealthStateMap) -> Self {
self.trunk_health = Some(health);
self
}
pub fn with_media_policy(mut self, policy: Arc<dyn crate::call::MediaPolicy>) -> Self {
self.media_policy = Some(policy);
self
}
pub fn with_cluster_peers(mut self, peers: Vec<SocketAddr>) -> Self {
self.cluster_peers = peers;
self
}
pub fn with_sipflow_config(mut self, config: Option<SipFlowConfig>) -> Self {
self.sipflow_config = config;
self
}
pub fn with_sipflow_backend(mut self, backend: Option<Arc<dyn SipFlowBackend>>) -> Self {
self.sipflow_backend = backend;
self
}
pub fn with_no_bind(mut self, no_bind: bool) -> Self {
self.no_bind = no_bind;
self
}
pub fn with_user_backend(mut self, user_backend: Box<dyn UserBackend>) -> Self {
self.user_backend = Some(user_backend);
self
}
pub fn with_ignore_out_of_dialog_request(mut self, ignore: bool) -> Self {
self.ignore_out_of_dialog_request = ignore;
self
}
pub fn with_auth_backend(mut self, auth_backend: Box<dyn AuthBackend>) -> Self {
self.auth_backend.push(auth_backend);
self
}
pub fn with_call_router(mut self, call_router: Box<dyn CallRouter>) -> Self {
self.call_router = Some(call_router);
self
}
pub fn with_dialplan_inspector(
mut self,
dialplan_inspector: Box<dyn DialplanInspector>,
) -> Self {
self.dialplan_inspectors.push(dialplan_inspector);
self
}
pub fn with_locator(mut self, locator: Box<dyn Locator>) -> Self {
self.locator = Some(locator);
self
}
pub fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
self.cancel_token = Some(cancel_token);
self
}
pub fn with_create_route_invite(mut self, f: FnCreateRouteInvite) -> Self {
self.create_route_invites.push(f);
self
}
pub fn register_module(mut self, name: &str, module_fn: FnCreateProxyModule) -> Self {
self.module_fns.insert(name.to_lowercase(), module_fn);
self
}
pub fn with_callrecord_sender(mut self, callrecord_sender: Option<CallRecordSender>) -> Self {
self.callrecord_sender = callrecord_sender;
self
}
pub fn with_message_inspector(mut self, inspector: Box<dyn MessageInspector>) -> Self {
self.message_inspectors.push(inspector);
self
}
pub fn with_rtp_config(mut self, config: RtpConfig) -> Self {
self.rtp_config = Some(config);
self
}
pub fn with_database_connection(mut self, db: DatabaseConnection) -> Self {
self.database = Some(db);
self
}
pub fn with_data_context(mut self, context: Arc<ProxyDataContext>) -> Self {
self.data_context = Some(context);
self
}
pub fn with_locator_events(mut self, locator_events: Option<LocatorEventSender>) -> Self {
self.locator_events = locator_events;
self
}
pub fn with_frequency_limiter(mut self, limiter: Arc<dyn FrequencyLimiter>) -> Self {
self.frequency_limiter = Some(limiter);
self
}
pub fn with_call_record_hooks(
mut self,
hooks: Vec<Box<dyn crate::callrecord::CallRecordHook>>,
) -> Self {
self.call_record_hooks = hooks;
self
}
pub fn with_storage(mut self, storage: crate::storage::Storage) -> Self {
self.storage = Some(storage);
self
}
pub fn with_addon_registry(
mut self,
registry: Option<Arc<crate::addons::registry::AddonRegistry>>,
) -> Self {
self.addon_registry = registry;
self
}
pub fn with_rwi_gateway(mut self, gateway: crate::rwi::RwiGatewayRef) -> Self {
self.rwi_gateway = Some(gateway);
self
}
pub fn with_agent_registry(
mut self,
registry: Arc<dyn crate::call::app::agent_registry::AgentRegistry>,
) -> Self {
self.agent_registry = Some(registry);
self
}
pub fn with_queue_location_enricher(
mut self,
enricher: Arc<dyn crate::proxy::call::QueueLocationEnricher>,
) -> Self {
self.queue_location_enricher = Some(enricher);
self
}
pub fn with_session_hook(
mut self,
hook: Arc<dyn crate::proxy::proxy_call::session_hooks::CallSessionHook>,
) -> Self {
self.session_hooks.push(hook);
self
}
pub fn with_skip_migrate(mut self, skip: bool) -> Self {
self.skip_migrate = skip;
self
}
pub async fn build(mut self) -> Result<SipServer> {
let user_backend = if let Some(backend) = self.user_backend {
backend
} else {
match build_user_backend(self.config.as_ref()).await {
Ok(backend) => backend,
Err(e) => {
warn!(
"failed to create user backend: {} {:?}",
e, &self.config.user_backends
);
return Err(e);
}
}
};
let mut auth_backend = self.auth_backend;
if let Some(ref jwt_cfg) = self.config.jwt_auth {
if jwt_cfg.enabled {
let validator = JwtValidator::new(jwt_cfg);
let local_ub = if jwt_cfg.check_local_user {
match build_user_backend(self.config.as_ref()).await {
Ok(ub) => Some(ub),
Err(e) => {
warn!("failed to create user backend for JWT auth: {}", e);
None
}
}
} else {
None
};
let jwt_backend =
JwtAuthBackend::new(validator, local_ub, jwt_cfg.sip_header_name.clone());
info!(
header = %jwt_cfg.sip_header_name,
check_local = jwt_cfg.check_local_user,
"JWT auth backend enabled"
);
auth_backend.push(Box::new(jwt_backend));
}
}
for backend_cfg in &self.config.user_backends {
if let crate::config::UserBackendConfig::Http {
url,
method,
username_field,
realm_field,
request_uri_field,
headers,
sip_headers,
token_header: Some(token_hdr),
http_timeout_ms,
http_retry_count,
http_retry_delay_ms,
token_cache_ttl_secs,
token_cache_size,
} = backend_cfg
{
let http_backend = crate::proxy::user_http::HttpUserBackend::new(
url,
method,
username_field,
realm_field,
request_uri_field,
headers,
sip_headers,
&Some(token_hdr.clone()),
http_timeout_ms,
http_retry_count,
http_retry_delay_ms,
);
let cache_ttl = Duration::from_secs(token_cache_ttl_secs.unwrap_or(0));
let cache_size = token_cache_size.unwrap_or(10000);
let token_backend = crate::auth::http_token_auth_backend::HttpTokenAuthBackend::new(
http_backend,
token_hdr.clone(),
cache_ttl,
cache_size,
);
info!(
header = %token_hdr,
cache_ttl_secs = token_cache_ttl_secs.unwrap_or(0),
"HTTP token auth backend enabled"
);
auth_backend.push(Box::new(token_backend));
}
}
let pre_auth_registry = if self.config.jwt_auth.as_ref().is_some_and(|c| c.enabled) {
Some(PreAuthRegistry::new())
} else {
None
};
let locator = if let Some(locator) = self.locator {
locator
} else {
match create_locator_with_migrate(&self.config.locator, !self.skip_migrate).await {
Ok(locator) => locator,
Err(e) => {
warn!("failed to create locator: {} {:?}", e, self.config.locator);
return Err(e);
}
}
};
let locator = Arc::new(locator);
let mut rtp_config = self.rtp_config.unwrap_or_default();
let cancel_token = self.cancel_token.unwrap_or_default();
let config = self.config.clone();
#[cfg(unix)]
log_rlimit_nofile();
let transport_layer = TransportLayer::new(cancel_token.clone());
let mut tls_listener_clone: Option<rsipstack::transport::TlsListenerConnection> = None;
let mut local_addrs: HashSet<SocketAddr> = HashSet::new();
if !self.no_bind {
let local_addr = config
.addr
.parse::<IpAddr>()
.map_err(|e| anyhow!("failed to parse local ip address: {}", e))?;
if rtp_config.external_ip.is_none() {
if let Some(ref url) = rtp_config.auto_external_ip {
match auto_external_ip::detect_external_ip(url).await {
Ok(ip) => {
warn!(
"auto_external_ip: detected {} from '{}'",
ip,
if url.is_empty() {
auto_external_ip::DEFAULT_AUTO_EXTERNAL_IP_URL
} else {
url
}
);
rtp_config.external_ip = Some(ip.to_string());
}
Err(e) => {
warn!("auto_external_ip: detection failed: {}", e);
}
}
}
}
let external_ip = match rtp_config.external_ip {
Some(ref s) => Some(
s.parse::<IpAddr>()
.map_err(|e| anyhow!("failed to parse external ip address {}: {}", s, e))?,
),
None => None,
};
if config.all_udp_ports().is_empty()
&& config.tcp_port.is_none()
&& config.tls_port.is_none()
&& config.ws_port.is_none()
{
return Err(anyhow::anyhow!(
"No port specified, please specify at least one port: udp, tcp, tls, ws"
));
}
for udp_port in config.all_udp_ports() {
let local_addr = SocketAddr::new(local_addr, udp_port);
let external_addr = external_ip
.as_ref()
.map(|ip| SocketAddr::new(*ip, udp_port));
let udp_conn = UdpConnection::create_connection(
local_addr,
external_addr,
Some(cancel_token.child_token()),
)
.await
.map_err(|e| {
anyhow!("Failed to create proxy UDP connection {} {}", local_addr, e)
})?;
info!("start proxy, udp port: {}", udp_conn.get_addr());
transport_layer.add_transport(udp_conn.into());
local_addrs.insert(local_addr);
}
if let Some(tcp_port) = config.tcp_port {
let local_addr = SocketAddr::new(local_addr, tcp_port);
let external_addr = external_ip
.as_ref()
.map(|ip| SocketAddr::new(*ip, tcp_port));
let tcp_conn = TcpListenerConnection::new(local_addr.into(), external_addr)
.await
.map_err(|e| anyhow!("Failed to create TCP connection: {}", e))?;
info!("start proxy, tcp port: {}", tcp_conn.get_addr());
transport_layer.add_transport(tcp_conn.into());
local_addrs.insert(local_addr);
}
if let Some(tls_port) = config.tls_port {
let local_addr = SocketAddr::new(local_addr, tls_port);
let external_addr = external_ip
.as_ref()
.map(|ip| SocketAddr::new(*ip, tls_port));
let cert_path = config
.ssl_certificate
.as_ref()
.ok_or_else(|| anyhow!("ssl_certificate is required for tls transport"))?;
let key_path = config
.ssl_private_key
.as_ref()
.ok_or_else(|| anyhow!("ssl_private_key is required for tls transport"))?;
let mut well_done = true;
if !std::path::Path::new(cert_path).exists() {
well_done = false;
warn!("ssl_certificate file does not exist: {}", cert_path);
}
if !std::path::Path::new(key_path).exists() {
well_done = false;
warn!("ssl_private_key file does not exist: {}", key_path);
}
if well_done {
match async {
let cert = tokio::fs::read(cert_path)
.await
.map_err(|e| anyhow!("failed to read cert: {}", e))?;
let key = tokio::fs::read(key_path)
.await
.map_err(|e| anyhow!("failed to read key: {}", e))?;
Ok::<_, anyhow::Error>((cert, key))
}
.await
{
Ok((cert_data, key_data)) => {
let tls_config = TlsConfig {
cert: Some(cert_data),
key: Some(key_data),
client_cert: None,
client_key: None,
ca_certs: None,
sni_hostname: None,
};
match TlsListenerConnection::new(
local_addr.into(),
external_addr,
tls_config,
)
.await
{
Ok(conn) => {
info!(
"start proxy, tls port: {} cert: {}, key: {}",
conn.get_addr(),
cert_path,
key_path
);
tls_listener_clone = Some(conn.clone());
transport_layer.add_transport(conn.into());
local_addrs.insert(local_addr);
}
Err(e) => {
warn!("failed to create TLS connection: {}", e);
}
};
}
Err(e) => {
warn!("failed to read TLS files: {}", e);
}
}
} else {
warn!("skip starting TLS transport due to missing certificate or key");
}
}
if let Some(ws_port) = config.ws_port {
let local_addr = SocketAddr::new(local_addr, ws_port);
let external_addr = external_ip.as_ref().map(|ip| SocketAddr::new(*ip, ws_port));
let ws_conn =
WebSocketListenerConnection::new(local_addr.into(), external_addr, false)
.await
.map_err(|e| anyhow!("Failed to create WS connection: {}", e))?;
info!("start proxy, ws port: {}", ws_conn.get_addr());
transport_layer.add_transport(ws_conn.into());
local_addrs.insert(local_addr);
}
}
let mut endpoint_builder = EndpointBuilder::new();
if let Some(ref user_agent) = config.useragent {
endpoint_builder.with_user_agent(user_agent.as_str());
}
let mut endpoint_option = EndpointOption {
callid_suffix: config.callid_suffix.clone(),
..Default::default()
};
if let Some(t1_timer) = config.t1_timer {
endpoint_option.t1 = Duration::from_millis(t1_timer);
}
if let Some(t1x64_timer) = config.t1x64_timer {
endpoint_option.t1x64 = Duration::from_millis(t1x64_timer);
}
let endpoint_local_addrs = transport_layer.get_addrs();
let mut endpoint_builder = endpoint_builder
.with_cancel_token(cancel_token.clone())
.with_option(endpoint_option)
.with_transport_layer(transport_layer);
let advertised_methods = Arc::new(OnceLock::new());
let mut inspectors: Vec<Box<dyn MessageInspector>> = self.message_inspectors;
if self.config.nat_fix {
inspectors.insert(0, Box::new(super::nat::NatInspector::new()));
}
inspectors.push(Box::new(
super::capability_headers::CapabilityHeadersInspector::new(advertised_methods.clone()),
));
let mut sip_flow = None;
let sipflow_backend = self.sipflow_backend.take().or_else(|| {
self.sipflow_config
.as_ref()
.and_then(|cfg| create_backend(cfg).ok())
.map(|b| Arc::from(b) as Arc<dyn SipFlowBackend>)
});
if let Some(backend) = sipflow_backend {
info!("Sipflow backend initialized");
let local_addr_strs: Vec<String> = endpoint_local_addrs
.iter()
.map(|a| a.addr.to_string())
.collect();
let sflow = SipFlowBuilder::new()
.with_backend(backend)
.with_local_addrs(local_addr_strs)
.build();
sip_flow = Some(sflow.clone());
inspectors.push(Box::new(sflow));
}
endpoint_builder =
endpoint_builder.with_inspector(
Box::new(CompositeMessageInspector { inspectors }) as Box<dyn MessageInspector>
);
let locator_events = self.locator_events.unwrap_or_else(|| {
let (tx, _) = tokio::sync::broadcast::channel(12);
tx
});
let locator_local_addrs = endpoint_local_addrs;
let cluster_enabled = !self.cluster_peers.is_empty();
endpoint_builder = endpoint_builder
.with_target_locator(DialogTargetLocator::new(
locator.clone(),
locator_local_addrs,
cluster_enabled,
))
.with_transport_inspector(TransportInspectorLocator::new(
locator.clone(),
locator_events.clone(),
));
let endpoint = endpoint_builder.build();
let mut call_router = self.call_router;
if call_router.is_none()
&& let Some(http_router_config) = &self.config.http_router
{
call_router = Some(Box::new(crate::proxy::routing::http::HttpCallRouter::new(
http_router_config.clone(),
rtp_config.clone(),
self.config.media_proxy,
)));
}
let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
let database = self.database.clone();
let data_context = if let Some(ref context) = self.data_context {
context.clone()
} else {
let dc = Arc::new(
ProxyDataContext::new(self.config.clone(), database.clone())
.await
.map_err(|err| anyhow!("failed to initialize proxy data context: {err}"))?,
);
self.data_context = Some(dc.clone());
dc
};
data_context
.trunk_registrar()
.set_endpoint(endpoint.inner.clone());
{
let trunks = data_context.trunks_snapshot();
data_context.trunk_registrar().reconcile(&trunks).await;
}
let active_call_registry = Arc::new(ActiveProxyCallRegistry::new());
let presence_manager = Arc::new(PresenceManager::new(database.clone()));
presence_manager.load_from_db().await.ok();
let cluster_peer_ips: Vec<IpAddr> = self.cluster_peers.iter().map(|p| p.ip()).collect();
let cluster_event_hub: Option<Arc<ClusterEventHub>> = if !self.cluster_peers.is_empty() {
let local_cluster_ip = rtp_config
.external_ip
.as_deref()
.unwrap_or(&config.addr)
.parse::<IpAddr>()
.map_err(|e| anyhow!("failed to parse cluster local ip address: {}", e))?;
let local_cluster_port = config.udp_port.unwrap_or(5060);
let local_cluster_addr = SocketAddr::new(local_cluster_ip, local_cluster_port);
Some(Arc::new(ClusterEventHub::new(
locator_events.clone(),
presence_manager.clone(),
endpoint.inner.clone(),
local_cluster_addr,
self.cluster_peers.clone(),
cancel_token.child_token(),
)))
} else {
None
};
let queue_manager = Arc::new(crate::call::runtime::QueueManager::new());
let conference_manager = Arc::new(crate::call::runtime::ConferenceManager::new());
let trunk_health_states: crate::proxy::trunk_health::HealthStateMap =
Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new()));
self.trunk_health = Some(trunk_health_states.clone());
let inner = Arc::new(SipServerInner {
rtp_config,
proxy_config: self.config.clone(),
cancel_token,
data_context,
database: database.clone(),
user_backend,
auth_backend,
call_router,
locator: locator.clone(),
callrecord_sender: self.callrecord_sender,
endpoint,
dialog_layer,
dialplan_inspectors: self.dialplan_inspectors,
create_route_invites: self.create_route_invites,
ignore_out_of_dialog_request: self.ignore_out_of_dialog_request,
locator_events: Some(locator_events),
sipflow_config: self.sipflow_config.clone(),
sip_flow,
active_call_registry,
frequency_limiter: self.frequency_limiter,
call_record_hooks: Arc::new(self.call_record_hooks),
runnings_tx: Arc::new(AtomicUsize::new(0)),
storage: self.storage,
presence_manager,
addon_registry: self.addon_registry,
rwi_gateway: self.rwi_gateway,
ivr_trace: self.ivr_trace,
tls_listener: tls_listener_clone,
queue_manager,
conference_manager,
agent_registry: self.agent_registry,
queue_location_enricher: self.queue_location_enricher,
transfer_notify_subscribers: Arc::new(tokio::sync::Mutex::new(Vec::new())),
cluster_event_hub,
cluster_peer_ips,
media_policy: self
.media_policy
.unwrap_or_else(|| Arc::new(crate::call::DefaultMediaPolicy)),
trunk_health: self.trunk_health.clone(),
session_hooks: Arc::new(self.session_hooks),
pre_auth_registry,
contact_username: self
.config
.contact_username
.clone()
.unwrap_or_else(random_hex),
rtc_cname: self.config.rtc_cname.clone().unwrap_or_else(random_hex),
media_engine: {
use crate::media::engine::{MediaEngine, MediaEngineConfig};
let (engine, handle) = MediaEngine::new(MediaEngineConfig {
command_channel_capacity: self.config.media_cmd_channel_capacity,
event_channel_capacity: self.config.media_event_channel_capacity,
});
let _task = engine.spawn(handle);
engine
},
});
let inner_weak = Arc::downgrade(&inner);
inner.locator.set_realm_checker(Arc::new(move |realm| {
let inner = inner_weak.clone();
let realm = realm.to_string();
Box::pin(async move {
if let Some(inner) = inner.upgrade() {
inner.is_same_realm(&realm).await
} else {
false
}
})
}));
let mut allow_methods = Vec::new();
let mut modules = Vec::new();
if let Some(hub) = inner.cluster_event_hub.as_ref() {
let mut module =
ClusterEventModule::create(hub.clone(), &self.cluster_peers, local_addrs.clone());
let _ = module.on_start().await; allow_methods.extend(module.allow_methods());
modules.push(module);
}
if let Some(load_modules) = self.config.modules.as_ref() {
let start_time = Instant::now();
for name in load_modules.iter() {
if let Some(module_fn) = self.module_fns.get(name) {
let module_start_time = Instant::now();
let mut module = match module_fn(inner.clone(), self.config.clone()) {
Ok(module) => module,
Err(e) => {
warn!("failed to create module {}: {}", name, e);
continue;
}
};
match module.on_start().await {
Ok(_) => {}
Err(e) => {
warn!("failed to start module {}: {}", name, e);
continue;
}
}
allow_methods.extend(module.allow_methods());
modules.push(module);
debug!(
"module {} loaded in {:?}",
name,
module_start_time.elapsed()
);
} else {
warn!("module {} not found", name);
}
}
let mut i = 0;
while i < allow_methods.len() {
let mut j = i + 1;
while j < allow_methods.len() {
if allow_methods[i] == allow_methods[j] {
allow_methods.remove(j);
} else {
j += 1;
}
}
i += 1;
}
info!(
"modules loaded in {:?} modules: {:?} allows: {}",
start_time.elapsed(),
modules.iter().map(|m| m.name()).collect::<Vec<_>>(),
allow_methods
.iter()
.map(|m| m.to_string())
.collect::<Vec<_>>()
.join(",")
);
}
if let Some(ref dc) = self.data_context {
let local_sip = format!(
"{}:{}",
self.config.addr,
self.config.udp_port.unwrap_or(5060),
);
let ep = inner.endpoint.inner.clone();
let dc = dc.clone();
crate::proxy::trunk_health::spawn_health_loop(
move || dc.trunks_snapshot(),
trunk_health_states,
ep,
local_sip,
30u64,
inner.cancel_token.clone(),
);
}
advertised_methods
.set(allow_methods.clone())
.map_err(|_| anyhow!("advertised SIP methods already initialized"))?;
inner.endpoint.inner.allows.lock().replace(allow_methods);
Ok(SipServer {
inner,
modules: Arc::new(modules),
})
}
}
impl SipServer {
pub fn get_tls_listener(&self) -> Option<rsipstack::transport::TlsListenerConnection> {
self.inner.tls_listener.clone()
}
pub async fn serve(&self) -> Result<()> {
let incoming = self.inner.endpoint.incoming_transactions()?;
let cancel_token = self.inner.cancel_token.clone();
if let Some(webhook_config) = &self.inner.proxy_config.locator_webhook
&& let Some(events) = &self.inner.locator_events
{
let rx = events.subscribe();
crate::utils::spawn(super::locator_webhook::handle_locator_webhook(
webhook_config.clone(),
rx,
));
}
let registry = self.inner.active_call_registry.clone();
let cleanup_cancel = cancel_token.clone();
crate::utils::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cleanup_cancel.cancelled() => break,
_ = interval.tick() => {
let removed = registry.cleanup_stale(std::time::Duration::from_secs(3600));
if removed > 0 {
tracing::warn!("Cleaned up {} stale registry entries", removed);
}
}
}
}
});
tokio::select! {
_ = cancel_token.cancelled() => {
info!("cancelled");
}
_ = self.inner.endpoint.serve() => {
info!("endpoint finished");
}
_ = self.handle_incoming(incoming) => {
info!("incoming transactions stopped");
}
};
for module in self.modules.iter() {
match module.on_stop().await {
Ok(_) => {}
Err(e) => {
warn!("failed to stop module {}: {}", module.name(), e);
}
}
}
info!("stopped");
Ok(())
}
pub fn stop(&self) {
self.inner.cancel_token.cancel();
}
pub fn get_inner(&self) -> SipServerRef {
self.inner.clone()
}
pub fn get_modules(&self) -> impl Iterator<Item = &Box<dyn ProxyModule>> {
self.modules.iter()
}
pub fn get_cancel_token(&self) -> CancellationToken {
self.inner.cancel_token.clone()
}
async fn handle_incoming(&self, mut incoming: TransactionReceiver) -> Result<()> {
let mut tx_count: u64 = 0;
while let Some(mut tx) = incoming.recv().await {
crate::metrics::transaction::received();
tx_count += 1;
if tx_count % 100 == 0 {
let stats = self.inner.endpoint.inner.get_stats();
crate::metrics::transaction::set_endpoint_running(stats.running_transactions);
crate::metrics::transaction::set_endpoint_finished(stats.finished_transactions);
crate::metrics::transaction::set_endpoint_waiting_ack(stats.waiting_ack);
crate::metrics::transaction::set_running(
self.inner.runnings_tx.load(Ordering::Relaxed),
);
}
let modules = self.modules.clone();
let token = tx
.connection
.as_ref()
.and_then(|c| c.cancel_token())
.unwrap_or_else(|| self.inner.cancel_token.clone())
.child_token();
let runnings_tx = self.inner.runnings_tx.clone();
if let Some(max_concurrency) = self.inner.proxy_config.max_concurrency
&& runnings_tx.load(Ordering::Relaxed) >= max_concurrency
{
info!(
key = %tx.key,
runnings = runnings_tx.load(Ordering::Relaxed),
"max concurrency reached, not process this transaction"
);
crate::metrics::transaction::rejected("max_concurrency");
tx.reply(rsipstack::sip::StatusCode::ServiceUnavailable)
.await
.ok();
continue;
}
if self.inner.ignore_out_of_dialog_request
&& matches!(
tx.original.method,
rsipstack::sip::Method::Options
| rsipstack::sip::method::Method::Info
| rsipstack::sip::method::Method::Refer
| rsipstack::sip::method::Method::Update
)
{
let to_tag = tx
.original
.to_header()
.and_then(|to| to.tag())
.ok()
.flatten();
if to_tag.is_none() {
let via_ip = crate::proxy::routing::extract_via_ip(&tx.original);
let via_ip_str = via_ip
.map(|ip| ip.to_string())
.unwrap_or_else(|| "unknown".to_string());
if tx.original.method == rsipstack::sip::Method::Options {
let is_trunk = if let Some(ref ip) = via_ip {
self.inner.data_context.find_trunk_by_ip(ip).await.is_some()
} else {
false
};
if is_trunk {
info!(key = %tx.key, via_ip = %via_ip_str, "responding 200 OK OPTIONS (trunk health probe)");
tx.reply(rsipstack::sip::StatusCode::OK).await.ok();
continue;
}
}
debug!(key = %tx.key, via_ip = %via_ip_str, "ignoring out-of-dialog {} request", tx.original.method);
continue;
}
}
crate::utils::spawn(async move {
runnings_tx.fetch_add(1, Ordering::Relaxed);
let start_time = Instant::now();
let cookie = TransactionCookie::from(&tx.key);
let guard = token.clone().drop_guard();
select! {
r = Self::process_transaction(token.clone(), modules, cookie.clone(), &mut tx) => {
let final_status = tx.last_response.as_ref().map(|r| r.status_code());
match r {
Ok(_) => {
debug!(key = %tx.key, ?final_status, "transaction processed in {:?}", start_time.elapsed());
},
Err(e) => {
warn!(key = %tx.key, ?final_status, "failed to process transaction: {} in {:?}", e, start_time.elapsed());
}
}
}
_ = token.cancelled() => {
info!(key = %tx.key, "transaction cancelled");
}
};
crate::metrics::transaction::latency_seconds(start_time.elapsed().as_secs_f64());
runnings_tx.fetch_sub(1, Ordering::Relaxed);
let is_mid_dialog = tx
.original
.to_header()
.ok()
.and_then(|h| h.tag().ok().flatten())
.is_some();
if !matches!(
tx.original.method,
rsipstack::sip::Method::Bye
| rsipstack::sip::Method::Cancel
| rsipstack::sip::Method::Ack
) && !is_mid_dialog
&& tx.last_response.is_none()
&& !cookie.is_spam()
{
tx.reply(rsipstack::sip::StatusCode::NotImplemented)
.await
.ok();
}
let _ = guard;
Ok::<(), anyhow::Error>(())
});
}
Ok(())
}
async fn process_transaction(
token: CancellationToken,
modules: Arc<Vec<Box<dyn ProxyModule>>>,
cookie: TransactionCookie,
tx: &mut Transaction,
) -> Result<()> {
for module in modules.iter() {
match module
.on_transaction_begin(token.clone(), tx, cookie.clone())
.await
{
Ok(action) => match action {
ProxyAction::Continue => {}
ProxyAction::Abort => break,
},
Err(e) => {
warn!(
key = %tx.key,
module = module.name(),
"failed to handle transaction: {}",
e
);
if tx.last_response.is_none() {
tx.reply(rsipstack::sip::StatusCode::ServerInternalError)
.await
.ok();
}
return Ok(());
}
}
}
for module in modules.iter() {
match module.on_transaction_end(tx).await {
Ok(_) => {}
Err(e) => {
warn!(key = %tx.key, "failed to handle transaction: {}", e);
}
}
}
Ok(())
}
}
impl Drop for SipServerInner {
fn drop(&mut self) {
self.cancel_token.cancel();
info!("SipServerInner dropped");
}
}
impl SipServerInner {
pub fn default_contact_uri(&self) -> Option<rsipstack::sip::Uri> {
let addr = self.endpoint.get_addrs().first()?.clone();
let mut params = Vec::new();
if let Some(transport) = addr.r#type
&& !matches!(transport, Transport::Udp)
{
params.push(Param::Transport(transport));
}
Some(rsipstack::sip::Uri {
scheme: addr.r#type.map(|t| t.sip_scheme()),
auth: Some(Auth {
user: self.contact_username.clone(),
password: None,
}),
host_with_port: addr.addr,
params,
..Default::default()
})
}
pub async fn is_same_realm(&self, callee_realm: &str) -> bool {
let (host, port) = if let Some(pos) = callee_realm.find(':') {
(
&callee_realm[..pos],
callee_realm[pos + 1..].parse::<u16>().ok(),
)
} else {
(callee_realm, None)
};
let is_my_port = |p: u16| {
self.proxy_config.udp_port == Some(p)
|| self.proxy_config.tcp_port == Some(p)
|| self.proxy_config.tls_port == Some(p)
|| self.proxy_config.ws_port == Some(p)
};
match host {
"localhost" | "127.0.0.1" | "::1" => port.map(is_my_port).unwrap_or(true),
_ => {
if let Some(external_ip) = self.rtp_config.external_ip.as_ref()
&& external_ip == host
{
return port.map(is_my_port).unwrap_or(true);
}
if let Some(realms) = self.proxy_config.realms.as_ref() {
for item in realms {
if item == callee_realm {
return true;
}
if item == host {
return port.map(is_my_port).unwrap_or(true);
}
}
}
let realms_empty = self
.proxy_config
.realms
.as_ref()
.map_or(true, |v| v.is_empty());
if self.endpoint.get_addrs().iter().any(|addr| {
let addr_host = addr.addr.host.to_string();
if addr_host == host {
port.map(|p| addr.addr.port == Some(p.into()))
.unwrap_or(true)
} else if realms_empty && (addr_host == "0.0.0.0" || addr_host == "::") {
port.map(|p| addr.addr.port == Some(p.into()))
.unwrap_or(true)
} else {
false
}
}) {
return true;
}
self.user_backend.is_same_realm(callee_realm).await
}
}
}
}
struct CompositeMessageInspector {
inspectors: Vec<Box<dyn MessageInspector>>,
}
impl MessageInspector for CompositeMessageInspector {
fn before_send(
&self,
mut msg: rsipstack::sip::SipMessage,
dest: Option<&rsipstack::transport::SipAddr>,
) -> rsipstack::sip::SipMessage {
for inspector in &self.inspectors {
msg = inspector.before_send(msg, dest);
}
msg
}
fn after_received(
&self,
mut msg: rsipstack::sip::SipMessage,
from: Option<&rsipstack::transport::SipAddr>,
) -> rsipstack::sip::SipMessage {
for inspector in &self.inspectors {
msg = inspector.after_received(msg, from);
}
msg
}
}
#[cfg(unix)]
fn log_rlimit_nofile() {
if let Ok(content) = std::fs::read_to_string("/proc/self/limits") {
for line in content.lines() {
if line.contains("open files") || line.contains("Max open files") {
info!("{line}");
return;
}
}
}
let count = std::fs::read_dir("/proc/self/fd")
.map(|d| d.count())
.unwrap_or(0);
info!("RLIMIT_NOFILE: current fd count ~{count}");
}