use crate::{
callrecord::{
CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender,
DefaultCallRecordFormatter, noop_saver, recording_upload::RecordingUploadHook,
sipflow_upload::SipFlowUploadHook,
},
config::{ClusterConfig, Config, UserBackendConfig},
handler::middleware::clientaddr::ClientAddr,
models::call_record::DatabaseHook,
proxy::{
acl::AclModule,
auth::AuthModule,
call::CallModule,
presence::PresenceModule,
registrar::RegistrarModule,
server::{SipServer, SipServerBuilder},
ws::sip_ws_handler,
},
tls_reloader::TlsReloaderRegistry,
};
use anyhow::Result;
use axum::{
Json, Router,
extract::{State, WebSocketUpgrade},
http::StatusCode,
middleware,
response::{Html, IntoResponse, Response},
routing::get,
};
use axum_server::tls_rustls::RustlsConfig;
use chrono::{DateTime, Utc};
use sea_orm::DatabaseConnection;
use std::net::SocketAddr;
use std::sync::Arc;
use std::{
path::Path,
sync::atomic::{AtomicBool, AtomicU64},
};
use tokio::net::TcpListener;
use tokio::select;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tower_http::{
compression::CompressionLayer,
cors::{AllowOrigin, CorsLayer},
services::ServeDir,
};
use tracing::{info, warn};
use crate::proxy::active_call_registry::ActiveProxyCallRegistry;
pub struct CoreContext {
pub config: Arc<Config>,
pub db: DatabaseConnection,
pub token: CancellationToken,
pub callrecord_sender: Option<CallRecordSender>,
pub callrecord_stats: Option<Arc<crate::callrecord::CallRecordStats>>,
pub storage: crate::storage::Storage,
pub rwi_auth: Option<crate::rwi::RwiAuthRef>,
pub rwi_gateway: Option<crate::rwi::RwiGatewayRef>,
pub rwi_call_registry: Option<Arc<ActiveProxyCallRegistry>>,
}
pub struct AppStateInner {
pub core: Arc<CoreContext>,
pub sip_server: SipServer,
pub skip_migrate: bool,
pub total_calls: AtomicU64,
pub total_failed_calls: AtomicU64,
pub uptime: DateTime<Utc>,
pub config_loaded_at: DateTime<Utc>,
pub config_path: Option<String>,
pub reload_requested: AtomicBool,
pub addon_registry: Arc<crate::addons::registry::AddonRegistry>,
#[cfg(feature = "console")]
pub console: Option<Arc<crate::console::ConsoleState>>,
pub tls_reloader: Arc<RwLock<Option<Arc<TlsReloaderRegistry>>>>,
pub cluster_config: std::sync::RwLock<Option<ClusterConfig>>,
}
pub type AppState = Arc<AppStateInner>;
pub struct AppStateBuilder {
pub config: Option<Config>,
pub callrecord_sender: Option<CallRecordSender>,
pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
pub cancel_token: Option<CancellationToken>,
pub proxy_builder: Option<SipServerBuilder>,
pub config_loaded_at: Option<DateTime<Utc>>,
pub config_path: Option<String>,
pub skip_sip_bind: bool,
pub skip_migrate: bool,
}
impl AppStateInner {
pub fn update_cluster_config(&self, config: Option<ClusterConfig>) {
*self.cluster_config.write().unwrap() = config;
}
#[cfg(feature = "console")]
pub fn get_addon_state<T: crate::console::AddonState>(&self) -> Option<T> {
self.console.as_ref().and_then(|c| c.get_addon_state::<T>())
}
#[cfg(feature = "console")]
pub fn insert_addon_state<T: crate::console::AddonState>(&self, state: T) {
if let Some(ref console) = self.console {
console.insert_addon_state(state);
}
}
pub fn config(&self) -> &Arc<Config> {
&self.core.config
}
pub fn db(&self) -> &DatabaseConnection {
&self.core.db
}
pub fn token(&self) -> &CancellationToken {
&self.core.token
}
pub fn sip_server(&self) -> &SipServer {
&self.sip_server
}
pub fn skip_migrate(&self) -> bool {
self.skip_migrate
}
pub fn get_dump_events_file(&self, session_id: &str) -> String {
let sanitized_id = crate::utils::sanitize_id(session_id);
let recorder_root = self.config().recorder_path();
let root = Path::new(&recorder_root);
if !root.exists() {
match std::fs::create_dir_all(root) {
Ok(_) => {
info!("created dump events root: {}", root.to_string_lossy());
}
Err(e) => {
warn!(
"Failed to create dump events root: {} {}",
e,
root.to_string_lossy()
);
}
}
}
root.join(format!("{}.events.jsonl", sanitized_id))
.to_string_lossy()
.to_string()
}
pub fn get_recorder_file(&self, session_id: &str) -> String {
let sanitized_id = crate::utils::sanitize_id(session_id);
let recorder_root = self.config().recorder_path();
let root = Path::new(&recorder_root);
if !root.exists() {
match std::fs::create_dir_all(root) {
Ok(_) => {
info!("created recorder root: {}", root.to_string_lossy());
}
Err(e) => {
warn!(
"Failed to create recorder root: {} {}",
e,
root.to_string_lossy()
);
}
}
}
let mut recorder_file = root.join(sanitized_id);
recorder_file.set_extension("wav");
recorder_file.to_string_lossy().to_string()
}
}
impl Default for AppStateBuilder {
fn default() -> Self {
Self::new()
}
}
impl AppStateBuilder {
pub fn new() -> Self {
Self {
config: None,
callrecord_sender: None,
callrecord_formatter: None,
cancel_token: None,
proxy_builder: None,
config_loaded_at: None,
config_path: None,
skip_sip_bind: false,
skip_migrate: false,
}
}
pub fn with_skip_sip_bind(mut self) -> Self {
self.skip_sip_bind = true;
self
}
pub fn with_skip_migrate(mut self) -> Self {
self.skip_migrate = true;
self
}
pub fn with_config(mut self, mut config: Config) -> Self {
config.ensure_recording_defaults();
self.config = Some(config);
if self.config_loaded_at.is_none() {
self.config_loaded_at = Some(Utc::now());
}
self
}
pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
self.callrecord_sender = Some(sender);
self
}
pub fn with_proxy_builder(mut self, builder: SipServerBuilder) -> Self {
self.proxy_builder = Some(builder);
self
}
pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
self.cancel_token = Some(token);
self
}
pub fn with_config_metadata(mut self, path: Option<String>, loaded_at: DateTime<Utc>) -> Self {
self.config_path = path;
self.config_loaded_at = Some(loaded_at);
self
}
pub async fn build(self) -> Result<AppState> {
let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
let storage_config = config.storage.clone().unwrap_or_default();
let storage = crate::storage::Storage::new(&storage_config)?;
let token = self.cancel_token.unwrap_or_default();
let config_loaded_at = self.config_loaded_at.unwrap_or_else(Utc::now);
let config_path = self.config_path.clone();
let db_conn = if self.skip_migrate {
crate::models::connect_db(&config.database_url).await?
} else {
crate::models::create_db(&config.database_url).await?
};
let addon_registry = Arc::new(crate::addons::registry::AddonRegistry::new());
if !self.skip_migrate
&& let Err(e) = addon_registry.run_migrations(&db_conn).await
{
tracing::error!("Failed to run addon migrations: {}", e);
}
let sipflow_backend_arc: Option<Arc<dyn crate::sipflow::SipFlowBackend>> =
config.sipflow.as_ref().and_then(|cfg| {
crate::sipflow::backend::create_backend(cfg)
.map(|b| Arc::from(b) as Arc<dyn crate::sipflow::SipFlowBackend>)
.map_err(|e| warn!("Failed to create sipflow backend: {e}"))
.ok()
});
let sipflow_upload_config: Option<crate::config::SipFlowUploadConfig> =
config.sipflow.as_ref().and_then(|s| match s {
crate::config::SipFlowConfig::Local { upload, .. } => upload.clone(),
crate::config::SipFlowConfig::Remote { upload, .. } => upload.clone(),
});
let recording_upload_policy = config
.recording
.as_ref()
.filter(|policy| policy.uploads_recording())
.cloned();
let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
formatter
} else {
let formatter = if let Some(ref callrecord) = config.callrecord {
DefaultCallRecordFormatter::new_with_config(callrecord)
} else {
DefaultCallRecordFormatter::default()
};
Arc::new(formatter)
};
let mut callrecord_stats = None;
let mut callrecord_manager = None;
let callrecord_sender = if let Some(sender) = self.callrecord_sender {
Some(sender)
} else if config.callrecord.is_some()
|| sipflow_upload_config.is_some()
|| recording_upload_policy.is_some()
{
let mut builder = CallRecordManagerBuilder::new()
.with_cancel_token(token.child_token())
.with_formatter(callrecord_formatter.clone());
if let Some(ref callrecord) = config.callrecord {
builder = builder.with_config(callrecord.clone());
} else {
builder = builder
.with_config(crate::config::CallRecordConfig::default())
.with_saver(Arc::new(Box::new(noop_saver)));
}
if let (Some(backend), Some(upload_cfg)) =
(sipflow_backend_arc.as_ref(), sipflow_upload_config.as_ref())
{
builder = builder.with_hook(Box::new(SipFlowUploadHook {
backend: backend.clone(),
upload_config: upload_cfg.clone(),
db: Some(db_conn.clone()),
}));
}
if let Some(policy) = recording_upload_policy.as_ref() {
builder = builder.with_hook(Box::new(RecordingUploadHook::new(policy.clone())));
}
builder = builder.with_hook(Box::new(DatabaseHook {
db: db_conn.clone(),
}));
for hook in addon_registry.get_call_record_hooks(&config, &db_conn) {
builder = builder.with_hook(hook);
}
let manager = builder.build();
let sender = manager.sender.clone();
callrecord_stats = Some(manager.stats.clone());
callrecord_manager = Some(manager);
Some(sender)
} else {
None
};
#[cfg(feature = "console")]
let console_state = match config.console.clone() {
Some(console_config) => Some(
crate::console::ConsoleState::initialize(
callrecord_formatter,
db_conn.clone(),
console_config,
)
.await?,
),
None => None,
};
let mut core = Arc::new(CoreContext {
config: config.clone(),
db: db_conn.clone(),
token: token.clone(),
callrecord_sender: callrecord_sender.clone(),
callrecord_stats: callrecord_stats.clone(),
storage: storage.clone(),
rwi_auth: crate::rwi::create_rwi_auth(&config),
rwi_gateway: config.rwi.as_ref().map(|_| {
std::sync::Arc::new(tokio::sync::RwLock::new(crate::rwi::RwiGateway::new()))
}),
rwi_call_registry: None,
});
let sip_server = match self.proxy_builder {
Some(builder) => builder.build().await,
None => {
let mut proxy_config = config.proxy.clone();
for backend in proxy_config.user_backends.iter_mut() {
if let UserBackendConfig::Extension { database_url, .. } = backend
&& database_url.is_none()
{
*database_url = Some(config.database_url.clone());
}
}
if proxy_config.recording.is_none() {
proxy_config.recording = config.recording.clone();
}
proxy_config.ensure_recording_defaults();
let proxy_config = Arc::new(proxy_config);
let call_record_hooks = addon_registry.get_call_record_hooks(&config, &db_conn);
let cluster_peers: Vec<SocketAddr> = config
.cluster
.as_ref()
.map(|c| c.peers.as_slice())
.unwrap_or(&[])
.iter()
.filter_map(|p| format!("{}:{}", p.addr, p.sip_port).parse().ok())
.collect();
#[allow(unused_mut)]
let mut builder = SipServerBuilder::new(proxy_config.clone())
.with_cluster_peers(cluster_peers)
.with_cancel_token(core.token.child_token())
.with_callrecord_sender(core.callrecord_sender.clone())
.with_rtp_config(config.rtp_config())
.with_database_connection(core.db.clone())
.with_call_record_hooks(call_record_hooks)
.with_storage(core.storage.clone())
.with_sipflow_config(config.sipflow.clone())
.with_sipflow_backend(sipflow_backend_arc.clone())
.with_no_bind(self.skip_sip_bind)
.with_skip_migrate(self.skip_migrate)
.with_addon_registry(Some(addon_registry.clone()))
.register_module("acl", AclModule::create)
.register_module("auth", AuthModule::create)
.register_module("presence", PresenceModule::create)
.register_module("registrar", RegistrarModule::create)
.register_module("call", CallModule::create);
builder = addon_registry.apply_proxy_server_hooks(builder, core.clone());
if let Some(ref emg) = config.proxy.emergency {
builder = builder.with_dialplan_inspector(Box::new(
crate::proxy::emergency::EmergencyInspector::new(Some(emg.clone())),
));
}
builder = builder.with_dialplan_inspector(Box::new(
crate::proxy::number_pool::NumberPoolInspector::default(),
));
builder.build().await
}
}?;
if config.rwi.is_some() {
let registry = sip_server.inner.active_call_registry.clone();
core = Arc::new(CoreContext {
config: core.config.clone(),
db: core.db.clone(),
token: core.token.clone(),
callrecord_sender: core.callrecord_sender.clone(),
callrecord_stats: core.callrecord_stats.clone(),
storage: core.storage.clone(),
rwi_auth: core.rwi_auth.clone(),
rwi_gateway: core.rwi_gateway.clone(),
rwi_call_registry: Some(registry),
});
}
let app_state = Arc::new(AppStateInner {
core: core.clone(),
sip_server,
skip_migrate: self.skip_migrate,
total_calls: AtomicU64::new(0),
total_failed_calls: AtomicU64::new(0),
uptime: chrono::Utc::now(),
config_loaded_at,
config_path,
reload_requested: AtomicBool::new(false),
addon_registry: addon_registry.clone(),
#[cfg(feature = "console")]
console: console_state,
tls_reloader: Arc::new(RwLock::new(Some(Arc::new(TlsReloaderRegistry::new())))),
cluster_config: std::sync::RwLock::new(config.cluster.clone()),
});
if let Some(tls_listener) = app_state.sip_server().get_tls_listener() {
let reloader = Arc::new(crate::tls_reloader::RsipstackTlsReloader::new(tls_listener));
if let Some(registry_guard) = app_state.tls_reloader.read().await.as_ref() {
registry_guard.register_sip_tls(reloader).await;
}
}
if let Some(mut manager) = callrecord_manager {
tokio::spawn(async move {
manager.serve().await;
});
}
if let Some(webhook_config) = config.rwi_webhook.clone()
&& let Some(gateway_ref) = core.rwi_gateway.clone()
{
let webhook_tx = crate::rwi::webhook::start_rwi_webhook_handler(webhook_config);
let mut gw = gateway_ref.write().await;
gw.set_webhook_tx(webhook_tx);
}
if let Err(e) = addon_registry.initialize_all(app_state.clone()).await {
tracing::error!("Failed to initialize addons: {}", e);
}
#[cfg(feature = "commerce")]
{
let commercial_ids: Vec<String> = addon_registry
.list_addons(app_state.clone())
.into_iter()
.filter(|a| a.category == crate::addons::AddonCategory::Commercial)
.map(|a| a.id.clone())
.collect();
if !commercial_ids.is_empty() {
let license_cfg = app_state.config().licenses.clone();
let results =
crate::license::check_all_addon_licenses(&commercial_ids, &license_cfg).await;
if !results.is_empty() {
tracing::info!(
"License check at startup: {} addon(s) verified",
results.len()
);
crate::license::record_startup_results(results);
}
}
}
#[cfg(feature = "console")]
{
if let Some(ref console_state) = app_state.console {
crate::version::spawn_update_checker(db_conn.clone(), token.clone());
console_state.set_sip_server(Some(app_state.sip_server().get_inner()));
for (addon_id, locale_dir) in
app_state.addon_registry.get_locale_dirs(app_state.clone())
{
console_state
.i18n()
.register_addon_locales(&addon_id, locale_dir);
}
console_state.set_app_state(Some(Arc::downgrade(&app_state)));
}
}
Ok(app_state)
}
}
pub async fn run(state: AppState, mut router: Router) -> Result<()> {
let token = state.token().clone();
let addr: SocketAddr = state.config().http_addr.parse()?;
let listener = match TcpListener::bind(addr).await {
Ok(l) => l,
Err(e) => {
tracing::error!("Failed to bind to {}: {}", addr, e);
return Err(anyhow::anyhow!("Failed to bind to {}: {}", addr, e));
}
};
if let Some(ref ws_handler) = state.sip_server().inner.proxy_config.ws_handler {
info!(
"Registering WebSocket handler to sip server: {}",
ws_handler
);
let endpoint_ref = state.sip_server().inner.endpoint.inner.clone();
let token = token.clone();
router = router.route(
ws_handler,
axum::routing::get(
async move |client_ip: ClientAddr, ws: WebSocketUpgrade| -> Response {
let token = token.clone();
ws.protocols(["sip"]).on_upgrade(async move |socket| {
sip_ws_handler(token, client_ip, socket, endpoint_ref.clone()).await
})
},
),
);
}
let mut ssl_config = None;
if let (Some(cert), Some(key)) = (
&state.config().ssl_certificate,
&state.config().ssl_private_key,
) {
ssl_config = Some((cert.clone(), key.clone()));
} else {
let cert_dir = std::path::Path::new("config/certs");
if cert_dir.exists()
&& let Ok(entries) = std::fs::read_dir(cert_dir)
{
for entry in entries.flatten() {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("crt") {
let key_path = path.with_extension("key");
if key_path.exists() {
ssl_config = Some((
path.to_string_lossy().to_string(),
key_path.to_string_lossy().to_string(),
));
break;
}
}
}
}
}
let https_config = if let Some((cert, key)) = ssl_config {
match RustlsConfig::from_pem_file(&cert, &key).await {
Ok(c) => Some(c),
Err(e) => {
tracing::error!("Failed to load SSL certs: {}", e);
None
}
}
} else {
None
};
if let Some(ref config) = https_config {
let reloader = Arc::new(crate::tls_reloader::AxumRustlsReloader::new(Arc::new(
config.clone(),
)));
if let Some(registry_guard) = state.tls_reloader.read().await.as_ref() {
registry_guard.register_https(reloader).await;
}
}
let https_addr = if https_config.is_some() {
let addr_str = state
.config()
.https_addr
.as_deref()
.unwrap_or("0.0.0.0:8443");
match addr_str.parse::<SocketAddr>() {
Ok(a) => Some(a),
Err(e) => {
tracing::error!("Invalid HTTPS address {}: {}", addr_str, e);
None
}
}
} else {
None
};
if let Some(addr) = https_addr {
info!("HTTPS enabled on {}", addr);
}
let http_task = axum::serve(
listener,
router
.clone()
.into_make_service_with_connect_info::<SocketAddr>(),
);
let https_router = if https_addr.is_some() {
router.layer(middleware::from_fn(
|mut req: axum::extract::Request, next: axum::middleware::Next| async move {
req.headers_mut()
.insert("x-forwarded-proto", "https".parse().unwrap());
next.run(req).await
},
))
} else {
router
};
select! {
http_result = http_task => {
match http_result {
Ok(_) => info!("Server shut down gracefully"),
Err(e) => {
tracing::error!("Server error: {}", e);
return Err(anyhow::anyhow!("Server error: {}", e));
}
}
}
https_result = async {
if let (Some(config), Some(addr)) = (https_config, https_addr) {
axum_server::bind_rustls(addr, config)
.serve(https_router.into_make_service_with_connect_info::<SocketAddr>())
.await
} else {
std::future::pending().await
}
} => {
match https_result {
Ok(_) => info!("HTTPS Server shut down gracefully"),
Err(e) => {
tracing::error!("HTTPS Server error: {}", e);
return Err(anyhow::anyhow!("HTTPS Server error: {}", e));
}
}
}
prx_result = state.sip_server().serve() => {
if let Err(e) = prx_result {
tracing::error!("Sip server error: {}", e);
}
}
_ = token.cancelled() => {
info!("Application shutting down due to cancellation");
}
}
Ok(())
}
async fn iceservers_handler(State(state): State<AppState>) -> impl IntoResponse {
let ice_servers = state.config().ice_servers.clone().unwrap_or_default();
Json(ice_servers).into_response()
}
async fn phone_config_handler(State(state): State<AppState>) -> impl IntoResponse {
let proxy_cfg = &state.config().proxy;
let config = serde_json::json!({
"wsPath": proxy_cfg.ws_handler.clone().unwrap_or_else(|| "/ws".to_string()),
"iceServersPath": proxy_cfg.ice_servers_path.clone().unwrap_or_else(|| "/iceservers".to_string()),
"amiPath": proxy_cfg.ami_path.clone().unwrap_or_else(|| "/ami/v1".to_string()),
"staticPath": state.config().static_path(),
});
Json(config).into_response()
}
pub fn create_router(state: AppState) -> Router {
let mut router = Router::new();
let static_files_service = ServeDir::new("static");
if std::path::Path::new("static/index.html").exists() {
router = router.route(
"/",
get(|| async {
match tokio::fs::read_to_string("static/index.html").await {
Ok(content) => Html(content).into_response(),
Err(_) => StatusCode::NOT_FOUND.into_response(),
}
}),
);
}
let cors = CorsLayer::new()
.allow_origin(AllowOrigin::any())
.allow_methods([
axum::http::Method::GET,
axum::http::Method::POST,
axum::http::Method::PUT,
axum::http::Method::DELETE,
axum::http::Method::OPTIONS,
])
.allow_headers([
axum::http::header::CONTENT_TYPE,
axum::http::header::AUTHORIZATION,
axum::http::header::ACCEPT,
axum::http::header::ORIGIN,
]);
let proxy_cfg = &state.config().proxy;
let ice_servers_path = proxy_cfg
.ice_servers_path
.clone()
.unwrap_or_else(|| "/iceservers".to_string());
let static_path = state.config().static_path();
let call_routes = crate::handler::ami_router(state.clone()).with_state(state.clone());
#[allow(unused_mut)]
let mut router = router
.route(
"/api/config/phone",
get(phone_config_handler).with_state(state.clone()),
)
.route(
&ice_servers_path,
get(iceservers_handler).with_state(state.clone()),
)
.merge(state.addon_registry.get_routers(state.clone()))
.nest_service(&static_path, static_files_service)
.merge(call_routes)
.layer(cors);
if let (Some(auth), Some(gateway), Some(call_registry)) = (
state.core.rwi_auth.clone(),
state.core.rwi_gateway.clone(),
state.core.rwi_call_registry.clone(),
) {
let rwi_auth = auth;
let rwi_gateway = gateway.clone();
let rwi_call_registry = call_registry.clone();
let rwi_sip_server: Option<crate::proxy::server::SipServerRef> =
Some(state.sip_server().get_inner());
router = router.route(
"/rwi/v1",
axum::routing::get(
async move |client_addr: crate::handler::middleware::clientaddr::ClientAddr,
ws: axum::extract::ws::WebSocketUpgrade,
axum::extract::Query(params): axum::extract::Query<
std::collections::HashMap<String, String>,
>,
headers: axum::http::HeaderMap| {
use axum::Extension;
use axum::extract::Query;
crate::rwi::handler::rwi_ws_handler(
client_addr,
ws,
Query(params),
Extension(rwi_auth),
Extension(rwi_gateway),
Extension(rwi_call_registry),
Extension(rwi_sip_server),
headers,
)
.await
},
),
);
}
#[cfg(feature = "console")]
if let Some(console_state) = state.console.clone() {
router = router.merge(crate::console::router(console_state));
}
let access_log_skip_paths = Arc::new(state.config().http_access_skip_paths.clone());
router = router.layer(middleware::from_fn_with_state(
access_log_skip_paths,
crate::handler::middleware::request_log::log_requests,
));
if state.config().http_gzip {
router = router.layer(CompressionLayer::new().gzip(true));
}
router
}