use crate::app::AppStateInner;
use crate::config::{
CallRecordConfig, CallRecordStorageConfig, Config, HttpRouterConfig, LocatorWebhookConfig,
ProxyConfig, UserBackendConfig,
};
#[cfg(feature = "commerce")]
use crate::console::ReloadTarget;
use crate::console::config_helpers::{
ensure_table_mut, find_or_404, get_config_path, json_error, load_document,
parse_config_from_str, persist_document,
};
use crate::console::handlers::forms;
use crate::console::{ConsoleState, middleware::AuthRequired};
use crate::models::department::{
ActiveModel as DepartmentActiveModel, Column as DepartmentColumn, Entity as DepartmentEntity,
};
use crate::models::rbac::{
ActiveModel as RoleActiveModel, Column as RoleColumn, Entity as RoleEntity, role_permission,
user_role,
};
use crate::models::user::{
ActiveModel as UserActiveModel, Column as UserColumn, Entity as UserEntity, Model as UserModel,
};
use crate::rwi::auth::RwiConfig;
use argon2::Argon2;
use argon2::password_hash::rand_core::OsRng;
use argon2::password_hash::{PasswordHasher, SaltString};
use axum::extract::{Path as AxumPath, Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::sse::{KeepAlive, Sse};
use axum::response::{IntoResponse, Response};
use axum::routing::{get, patch, post};
use axum::{Json, Router};
use chrono::{DateTime, Duration, Utc};
use futures::stream;
use sea_orm::sea_query::Condition;
use sea_orm::{
ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter,
QueryOrder,
};
use serde::{Deserialize, Serialize};
use serde_json::{Value as JsonValue, json};
use std::collections::VecDeque;
use std::convert::Infallible;
use std::fs::File;
use std::io::{self, BufRead, BufReader, Seek, SeekFrom};
use std::sync::Arc;
use std::time::Duration as StdDuration;
use tokio::time;
use toml_edit::{Array, DocumentMut, Item, Table, Value, value};
use tracing::warn;
/// Filter fields accepted by the department list query (`query_departments`).
#[derive(Debug, Clone, Deserialize, Default)]
struct QueryDepartmentFilters {
/// Free-text keyword. After trimming, a non-empty value is matched as `%q%`
/// (SQL `LIKE`) against the department name, display label and slug.
pub q: Option<String>,
}
/// Filter fields accepted by the user list query (`query_users`).
#[derive(Debug, Clone, Deserialize, Default)]
struct QueryUserFilters {
/// Free-text keyword. After trimming, a non-empty value is matched as `%q%`
/// (SQL `LIKE`) against the user's email and username.
pub q: Option<String>,
/// When `Some(true)`, only active users are returned. `Some(false)` and `None`
/// apply no filter at all (they do not select inactive users).
pub active: Option<bool>,
}
/// Query-string parameters for a log request.
///
/// NOTE(review): presumably consumed by `fetch_recent_logs`
/// (`/settings/logs/recent`); the handler is not visible here — confirm.
#[derive(Debug, Clone, Deserialize, Default)]
struct LogRecentQuery {
/// Optional cap on the amount of log data returned (presumably a line count — verify
/// against the handler).
pub limit: Option<usize>,
}
/// Query-string parameters for a log request that resumes from a position.
///
/// NOTE(review): presumably consumed by `follow_logs` (`/settings/logs/follow`);
/// the handler is not visible here — confirm.
#[derive(Debug, Clone, Deserialize, Default)]
struct LogFollowQuery {
/// Optional resume position (presumably a byte offset into the log file — verify).
pub position: Option<u64>,
/// Optional cap on the amount of log data returned per request.
pub limit: Option<usize>,
}
/// Query-string parameters for a streaming log request.
///
/// NOTE(review): presumably consumed by `stream_logs` (`/settings/logs/stream`);
/// the handler is not visible here — confirm.
#[derive(Debug, Clone, Deserialize, Default)]
struct LogStreamQuery {
/// Optional start position (presumably a byte offset into the log file — verify).
pub position: Option<u64>,
/// Optional cap on the amount of log data returned.
pub limit: Option<usize>,
}
/// JSON body accepted by `create_department` and `update_department`.
///
/// Both handlers write every field on each request, so an omitted optional field
/// is stored as whatever `normalize_opt_string(None)` yields (or `None` for
/// `metadata`) rather than leaving the previous value untouched.
#[derive(Debug, Clone, Deserialize)]
struct DepartmentPayload {
/// Department name; must be non-empty after trimming or the request is rejected
/// with `400 Bad Request`.
pub name: String,
/// Optional display label, passed through `normalize_opt_string` before storage.
pub display_label: Option<String>,
/// Optional slug, passed through `normalize_opt_string` before storage.
pub slug: Option<String>,
/// Optional description, passed through `normalize_opt_string` before storage.
pub description: Option<String>,
/// Optional color, passed through `normalize_opt_string` before storage.
pub color: Option<String>,
/// Optional manager contact, passed through `normalize_opt_string` before storage.
pub manager_contact: Option<String>,
/// Arbitrary JSON stored as-is; defaults to `None` when absent from the body.
#[serde(default)]
pub metadata: Option<JsonValue>,
}
/// JSON body accepted by `create_user` and `update_user`.
#[derive(Debug, Clone, Deserialize)]
struct UserPayload {
/// Email address; must be non-empty after trimming. Stored lowercased.
pub email: String,
/// Username; must be non-empty after trimming.
pub username: String,
/// Plain-text password. `create_user` rejects the request when it is missing or
/// blank after trimming; the value is hashed before storage.
#[serde(default)]
pub password: Option<String>,
/// Active flag; the handlers treat a missing value as `true`.
#[serde(default)]
pub is_active: Option<bool>,
/// Staff flag; the handlers treat a missing value as `false`.
#[serde(default)]
pub is_staff: Option<bool>,
/// Superuser flag; the handlers treat a missing value as `false`.
#[serde(default)]
pub is_superuser: Option<bool>,
}
/// JSON body describing a role and its permissions.
///
/// NOTE(review): presumably consumed by `create_role` / `update_role`; those
/// handlers are not visible here — confirm.
#[derive(Debug, Clone, Deserialize)]
struct RolePayload {
/// Role name.
pub name: String,
/// Optional human-readable description.
pub description: Option<String>,
/// Permissions attached to the role; an absent field deserializes to an empty list.
#[serde(default)]
pub permissions: Vec<PermissionEntry>,
}
/// A single `(resource, action)` permission pair inside a [`RolePayload`].
///
/// NOTE(review): presumably the same vocabulary as the `require_permission`
/// calls in this file (e.g. `"departments"` / `"read"`) — confirm.
#[derive(Debug, Clone, Deserialize)]
struct PermissionEntry {
/// Resource the permission applies to.
pub resource: String,
/// Action allowed on the resource.
pub action: String,
}
/// JSON body carrying a list of role ids.
///
/// NOTE(review): presumably consumed by `assign_user_roles`
/// (`POST /settings/users/{id}/roles`); the handler is not visible here — confirm.
#[derive(Debug, Clone, Deserialize)]
struct AssignRolesPayload {
/// Ids of the roles to assign.
pub role_ids: Vec<i64>,
}
/// JSON body with proxy-related settings; every field is optional.
///
/// NOTE(review): presumably consumed by `update_proxy_settings`
/// (`PATCH /settings/config/proxy`), with `None` meaning "leave unchanged" —
/// the handler is not visible here, confirm.
#[derive(Debug, Clone, Deserialize)]
pub(crate) struct ProxySettingsPayload {
/// SIP realms.
pub realms: Option<Vec<String>>,
/// Registrar expiry value (presumably seconds — verify).
pub registrar_expires: Option<u32>,
/// Upper bound for the registrar expiry value (presumably seconds — verify).
pub max_registrar_expires: Option<u32>,
/// Locator webhook configuration.
pub locator_webhook: Option<LocatorWebhookConfig>,
/// RWI webhook configuration; note it reuses the `LocatorWebhookConfig` type.
pub rwi_webhook: Option<LocatorWebhookConfig>,
/// User backend configurations.
pub user_backends: Option<Vec<UserBackendConfig>>,
/// HTTP router configuration.
pub http_router: Option<HttpRouterConfig>,
}
/// JSON body describing a locator webhook endpoint to probe.
///
/// NOTE(review): presumably consumed by `test_locator_webhook`; the handler is
/// not visible here — confirm.
#[derive(Debug, Clone, Deserialize)]
pub(crate) struct TestLocatorWebhookPayload {
/// Target URL.
pub url: String,
/// Optional header name/value pairs (presumably sent with the probe request).
pub headers: Option<std::collections::HashMap<String, String>>,
}
/// JSON body describing an RWI webhook endpoint to probe.
///
/// NOTE(review): presumably consumed by `test_rwi_webhook`; the handler is not
/// visible here — confirm.
#[derive(Debug, Clone, Deserialize)]
pub(crate) struct TestRwiWebhookPayload {
/// Target URL.
pub url: String,
/// Optional header name/value pairs (presumably sent with the probe request).
pub headers: Option<std::collections::HashMap<String, String>>,
}
/// JSON body describing an HTTP router endpoint to probe.
///
/// NOTE(review): presumably consumed by `test_http_router`; the handler is not
/// visible here — confirm.
#[derive(Debug, Clone, Deserialize)]
pub(crate) struct TestHttpRouterPayload {
/// Target URL.
pub url: String,
/// Optional header name/value pairs (presumably sent with the probe request).
pub headers: Option<std::collections::HashMap<String, String>>,
}
/// JSON body wrapping a user backend configuration to probe.
///
/// NOTE(review): presumably consumed by `test_user_backend`; the handler is not
/// visible here — confirm.
#[derive(Debug, Clone, Deserialize)]
pub(crate) struct TestUserBackendPayload {
/// Backend configuration under test.
pub backend: UserBackendConfig,
}
/// Serializable projection of a user row returned by the user endpoints.
///
/// Built from `UserModel` via `From`; only the fields listed here are copied, so
/// other model columns (such as the password hash) never reach the response.
#[derive(Debug, Clone, Serialize)]
struct UserView {
/// Primary key of the user.
pub id: i64,
/// Email address as stored.
pub email: String,
/// Username as stored.
pub username: String,
/// Time of the last login, if any was recorded.
pub last_login_at: Option<DateTime<Utc>>,
/// Address recorded for the last login, if any.
pub last_login_ip: Option<String>,
/// Row creation time.
pub created_at: DateTime<Utc>,
/// Time of the last update to the row.
pub updated_at: DateTime<Utc>,
/// Active flag.
pub is_active: bool,
/// Staff flag.
pub is_staff: bool,
/// Superuser flag.
pub is_superuser: bool,
}
impl From<UserModel> for UserView {
fn from(model: UserModel) -> Self {
Self {
id: model.id,
email: model.email,
username: model.username,
last_login_at: model.last_login_at,
last_login_ip: model.last_login_ip,
created_at: model.created_at,
updated_at: model.updated_at,
is_active: model.is_active,
is_staff: model.is_staff,
is_superuser: model.is_superuser,
}
}
}
pub fn urls() -> Router<Arc<ConsoleState>> {
let router = Router::new()
.route("/settings", get(page_settings))
.route("/settings/logs/recent", get(fetch_recent_logs))
.route("/settings/logs/follow", get(follow_logs))
.route("/settings/logs/stream", get(stream_logs))
.route("/settings/config/platform", patch(update_platform_settings))
.route(
"/settings/config/platform/auto-external-ip/test",
post(test_auto_external_ip),
)
.route("/settings/config/proxy", patch(update_proxy_settings))
.route("/settings/config/storage", patch(update_storage_settings))
.route(
"/settings/config/storage/test",
post(test_storage_connection),
)
.route(
"/settings/config/proxy/locator-webhook/test",
post(test_locator_webhook),
)
.route(
"/settings/config/proxy/rwi-webhook/test",
post(test_rwi_webhook),
)
.route(
"/settings/config/proxy/http-router/test",
post(test_http_router),
)
.route(
"/settings/config/proxy/user-backend/test",
post(test_user_backend),
)
.route("/settings/config/security", patch(update_security_settings))
.route("/settings/config/rwi", patch(update_rwi_settings));
#[cfg(feature = "commerce")]
let router = router
.route("/settings/config/cluster", patch(update_cluster_settings))
.route(
"/settings/config/cluster/reload",
get(cluster_reload_sse_handler),
)
.route(
"/settings/config/cluster/reload-addons",
get(list_reload_addons_handler),
);
router
.route(
"/settings/departments",
post(query_departments).put(create_department),
)
.route(
"/settings/departments/{id}",
get(get_department)
.patch(update_department)
.delete(delete_department),
)
.route("/settings/users", post(query_users).put(create_user))
.route(
"/settings/users/{id}",
get(get_user).patch(update_user).delete(delete_user),
)
.route(
"/settings/users/{id}/roles",
get(get_user_roles).post(assign_user_roles),
)
.route("/settings/roles", get(list_roles).post(create_role))
.route(
"/settings/roles/{id}",
get(get_role).patch(update_role).delete(delete_role_handler),
)
}
/// Builds the JSON API router for the settings area.
///
/// Registers the log endpoints, the per-section config `PATCH` endpoints, the
/// connectivity test endpoints, the cluster endpoints (only with the `commerce`
/// feature) and the department / user / role CRUD endpoints. It does not
/// register the HTML `/settings` page.
pub fn api_urls() -> Router<Arc<ConsoleState>> {
    // Log inspection.
    let router = Router::new()
        .route("/settings/logs/recent", get(fetch_recent_logs))
        .route("/settings/logs/follow", get(follow_logs))
        .route("/settings/logs/stream", get(stream_logs));

    // Per-section configuration updates.
    let router = router
        .route("/settings/config/platform", patch(update_platform_settings))
        .route("/settings/config/proxy", patch(update_proxy_settings))
        .route("/settings/config/storage", patch(update_storage_settings))
        .route("/settings/config/security", patch(update_security_settings))
        .route("/settings/config/rwi", patch(update_rwi_settings));

    // Connectivity / validity probes for individual settings.
    let router = router
        .route(
            "/settings/config/platform/auto-external-ip/test",
            post(test_auto_external_ip),
        )
        .route(
            "/settings/config/storage/test",
            post(test_storage_connection),
        )
        .route(
            "/settings/config/proxy/locator-webhook/test",
            post(test_locator_webhook),
        )
        .route(
            "/settings/config/proxy/rwi-webhook/test",
            post(test_rwi_webhook),
        )
        .route(
            "/settings/config/proxy/http-router/test",
            post(test_http_router),
        )
        .route(
            "/settings/config/proxy/user-backend/test",
            post(test_user_backend),
        );

    // Cluster management is only compiled in with the `commerce` feature.
    #[cfg(feature = "commerce")]
    let router = router
        .route("/settings/config/cluster", patch(update_cluster_settings))
        .route(
            "/settings/config/cluster/reload",
            get(cluster_reload_sse_handler),
        )
        .route(
            "/settings/config/cluster/reload-addons",
            get(list_reload_addons_handler),
        );

    // Departments, users and roles.
    router
        .route(
            "/settings/departments",
            post(query_departments).put(create_department),
        )
        .route(
            "/settings/departments/{id}",
            get(get_department)
                .patch(update_department)
                .delete(delete_department),
        )
        .route("/settings/users", post(query_users).put(create_user))
        .route(
            "/settings/users/{id}",
            get(get_user).patch(update_user).delete(delete_user),
        )
        .route(
            "/settings/users/{id}/roles",
            get(get_user_roles).post(assign_user_roles),
        )
        .route("/settings/roles", get(list_roles).post(create_role))
        .route(
            "/settings/roles/{id}",
            get(get_role).patch(update_role).delete(delete_role_handler),
        )
}
/// Renders the HTML settings page (`console/settings.html`) for the signed-in user.
///
/// The settings document is exposed to the template under both `settings` and
/// `settings_data`, alongside the user's name, email, superuser flag and the
/// current-user context built by the console state.
pub async fn page_settings(
    State(state): State<Arc<ConsoleState>>,
    headers: HeaderMap,
    AuthRequired(user): AuthRequired,
) -> Response {
    let settings_payload = build_settings_payload(&state).await;
    let viewer = state.build_current_user_ctx(&user).await;

    let context = json!({
        "nav_active": "settings",
        "settings": settings_payload,
        "settings_data": settings_payload,
        "username": user.username,
        "email": user.email,
        "current_user": viewer,
        "user_is_superuser": user.is_superuser,
    });

    state.render_with_headers("console/settings.html", context, &headers)
}
/// Assembles the JSON document consumed by the settings page.
///
/// The returned object carries the keys `storage`, `storage_profiles`, `server`,
/// `recording`, `platform`, `proxy`, `config`, `acl`, `stats`, `ami_endpoint`,
/// `operations`, `console`, `rwi` and `cluster`.
///
/// When the console has an application state attached, most sections are filled
/// from the configuration file re-read from disk, falling back to the in-memory
/// configuration when no path is set or loading fails. Without an application
/// state the placeholder values initialised at the top of the function are used.
async fn build_settings_payload(state: &ConsoleState) -> JsonValue {
let mut data = serde_json::Map::new();
let now = Utc::now();
// Defaults used when no application state is available.
let mut ami_endpoint = "/ami/v1".to_string();
let mut platform = json!({});
let mut proxy = json!({});
let mut config_meta = json!({ "key_items": [] });
let mut acl = json!({
"active_rules": [],
"embedded_count": 0usize,
"file_patterns": [],
"reload_supported": false,
"metrics": JsonValue::Null,
});
let mut operations: Vec<JsonValue> = Vec::new();
let mut console_meta = JsonValue::Null;
let mut proxy_stats_value = JsonValue::Null;
if let Some(app_state) = state.app_state() {
let config_arc = app_state.config().clone();
// The AMI endpoint always comes from the in-memory config, not the reloaded file.
ami_endpoint = config_arc
.proxy
.ami_path
.clone()
.unwrap_or_else(|| "/ami/v1".to_string());
// Re-read the config file; a load failure is logged and the in-memory config is used.
// NOTE(review): `Config::load` is called directly inside this async fn — if it does
// blocking file I/O, consider moving it off the async executor.
let mut loaded_config: Option<Config> = None;
if let Some(path) = app_state.config_path.as_ref() {
match Config::load(path) {
Ok(cfg) => {
loaded_config = Some(cfg);
}
Err(err) => {
warn!(config_path = %path, ?err, "failed to reload config from disk");
}
}
}
let config = loaded_config.as_ref().unwrap_or(config_arc.as_ref());
let uptime_duration = now - app_state.uptime;
// Clamp to zero so a start time in the future never yields a negative uptime.
let uptime_seconds = uptime_duration.num_seconds().max(0);
platform = json!({
"version": crate::version::get_short_version(),
"uptime_seconds": uptime_seconds,
"uptime_pretty": human_duration(uptime_duration),
"http_addr": config.http_addr.clone(),
"log_level": config.log_level.clone(),
"log_file": config.log_file.clone(),
"config_loaded_at": app_state.config_loaded_at.to_rfc3339(),
"config_path": app_state.config_path.clone(),
"generated_at": now.to_rfc3339(),
});
let recorder_path = config.recorder_path();
// Label/value pairs summarising the most relevant configuration values.
let mut key_items: Vec<JsonValue> = Vec::new();
key_items.push(json!({ "label": "HTTP address", "value": config.http_addr.clone() }));
// An explicit external IP wins; otherwise show the auto-detection URL, using
// "http://ifconfig.me" as the displayed value when the configured URL is empty.
if let Some(ext) = config.external_ip.as_ref() {
key_items.push(json!({ "label": "External IP", "value": ext }));
} else if let Some(url) = config.auto_external_ip.as_ref() {
let display = if url.is_empty() {
"http://ifconfig.me"
} else {
url
};
key_items
.push(json!({ "label": "External IP", "value": format!("auto ({})", display) }));
}
if let (Some(start), Some(end)) = (config.rtp_start_port, config.rtp_end_port) {
key_items.push(json!({ "label": "RTP ports", "value": format!("{}-{}", start, end) }));
}
key_items.push(json!({ "label": "Recorder path", "value": recorder_path.clone() }));
if let Some(ref console_cfg) = config.console {
key_items.push(
json!({ "label": "Console base path", "value": console_cfg.base_path.clone() }),
);
}
if let Some(ref ami_cfg) = config.ami {
// With no explicit allow list, "127.0.0.1, ::1" is displayed.
let allows = ami_cfg
.allows
.as_ref()
.map(|items| items.join(", "))
.unwrap_or_else(|| "127.0.0.1, ::1".to_string());
key_items.push(json!({ "label": "AMI allow list", "value": allows }));
}
key_items.push(
json!({ "label": "Config loaded", "value": app_state.config_loaded_at.to_rfc3339() }),
);
if let Some(ref path) = app_state.config_path {
key_items.push(json!({ "label": "Config path", "value": path.clone() }));
}
if let Some(summary) = summarize_callrecord(config.callrecord.as_ref()) {
key_items.push(summary);
}
config_meta = json!({ "key_items": key_items });
// Live counters read from the running SIP endpoint and dialog layer.
let stats = app_state.sip_server().inner.endpoint.inner.get_stats();
proxy_stats_value = json!({
"transactions": {
"running": stats.running_transactions,
"finished": stats.finished_transactions,
"waiting_ack": stats.waiting_ack,
},
"dialogs": app_state.sip_server().inner.dialog_layer.len(),
});
proxy = json!({
"enabled": true,
"addr": config.proxy.addr.clone(),
"ports": build_port_list(&config.proxy),
"modules": config.proxy.modules.clone().unwrap_or_default(),
"max_concurrency": config.proxy.max_concurrency,
"registrar_expires": config.proxy.registrar_expires,
"max_registrar_expires": config.proxy.max_registrar_expires,
"callid_suffix": config.proxy.callid_suffix.clone(),
"useragent": config.proxy.useragent.clone(),
"ua_whitelist": config.proxy.ua_white_list.clone().unwrap_or_default(),
"ua_blacklist": config.proxy.ua_black_list.clone().unwrap_or_default(),
"data_sources": json!({
"routes": "toml",
"trunks": "toml",
}),
"rtp": config.rtp_config(),
"user_backends": config.proxy.user_backends.clone(),
"locator_webhook": config.proxy.locator_webhook.clone(),
"rwi_webhook": config.rwi_webhook.clone(),
"http_router": config.proxy.http_router.clone(),
"realms": config.proxy.realms.clone().unwrap_or_default(),
"dos_enabled": config.proxy.dos_enabled,
"dos_max_cps_per_ip": config.proxy.dos_max_cps_per_ip,
"dos_max_concurrent_per_ip": config.proxy.dos_max_concurrent_per_ip,
"dos_scan_probe_threshold": config.proxy.dos_scan_probe_threshold,
"dos_scan_block_duration_secs": config.proxy.dos_scan_block_duration_secs,
"uri_max_length": config.proxy.uri_max_length,
"uri_reject_malformed": config.proxy.uri_reject_malformed,
"emergency": config.proxy.emergency.clone(),
"session_cmd_channel_capacity": config.proxy.session_cmd_channel_capacity,
"session_state_channel_capacity": config.proxy.session_state_channel_capacity,
"media_cmd_channel_capacity": config.proxy.media_cmd_channel_capacity,
"media_event_channel_capacity": config.proxy.media_event_channel_capacity,
"stats": proxy_stats_value.clone(),
});
// NOTE(review): `resolve_acl_rules` loads the config file a second time.
let (active_rules, embedded_count) = resolve_acl_rules(app_state.clone()).await;
let acl_files = &config.proxy.acl_files;
acl = json!({
"active_rules": active_rules,
"embedded_count": embedded_count,
"file_patterns": acl_files,
"reload_supported": true,
"metrics": JsonValue::Null,
});
// Operation descriptors listed on the page; endpoints are built from the AMI base path.
operations.push(json!({
"id": "reload-acl",
"label": "Reload ACL rules",
"description": "Re-read ACL definitions from config files and embedded lists.",
"method": "POST",
"endpoint": format!("{}/reload/acl", ami_endpoint.trim_end_matches('/')),
}));
// The application reload operation is only offered when a config file path is known.
if app_state.config_path.is_some() {
operations.push(json!({
"id": "reload-app",
"label": "Reload application",
"description": "Validate the configuration file and restart core services.",
"method": "POST",
"endpoint": format!("{}/reload/app", ami_endpoint.trim_end_matches('/')),
}));
}
// Storage details are published both at the top level and nested under "server".
let (storage_meta, storage_profiles) = build_storage_profiles(config);
data.insert("storage".to_string(), storage_meta.clone());
data.insert(
"storage_profiles".to_string(),
JsonValue::Array(storage_profiles.clone()),
);
data.insert(
"server".to_string(),
json!({
"operations": operations.clone(),
"storage": storage_meta,
"storage_profiles": storage_profiles,
}),
);
console_meta = config
.console
.as_ref()
.map(|cfg| {
json!({
"base_path": cfg.base_path,
"allow_registration": cfg.allow_registration,
})
})
.unwrap_or(JsonValue::Null);
// A recording policy that fails to serialize is reported as null rather than as an error.
let recording_meta = config
.recording
.as_ref()
.and_then(|policy| serde_json::to_value(policy).ok())
.unwrap_or(JsonValue::Null);
data.insert("recording".to_string(), recording_meta);
} else {
// No application state: emit the same keys with placeholder values.
data.insert("storage".to_string(), json!({ "mode": "unknown" }));
data.insert(
"storage_profiles".to_string(),
JsonValue::Array(Vec::<JsonValue>::new()),
);
data.insert(
"server".to_string(),
json!({
"operations": operations.clone(),
"storage": {"mode": "unknown"},
"storage_profiles": Vec::<JsonValue>::new(),
}),
);
data.insert("recording".to_string(), JsonValue::Null);
}
let stats = json!({
"generated_at": now.to_rfc3339(),
"proxy": proxy_stats_value,
});
data.insert("platform".to_string(), platform);
data.insert("proxy".to_string(), proxy);
data.insert("config".to_string(), config_meta);
data.insert("acl".to_string(), acl);
data.insert("stats".to_string(), stats);
data.insert("ami_endpoint".to_string(), json!(ami_endpoint));
data.insert(
"operations".to_string(),
JsonValue::Array(operations.clone()),
);
data.insert("console".to_string(), console_meta);
// RWI settings are read from the in-memory config, unlike the sections above which
// prefer the file re-read from disk. NOTE(review): confirm this difference is intended.
let rwi_config = if let Some(app_state) = state.app_state() {
let config_arc = app_state.config().clone();
config_arc.rwi.clone().unwrap_or_default()
} else {
RwiConfig::default()
};
data.insert(
"rwi".to_string(),
serde_json::to_value(rwi_config).unwrap_or(JsonValue::Null),
);
{
// Cluster settings: the value behind `app_state.cluster_config` wins when the lock
// can be read and it holds a value; otherwise the in-memory `config.cluster` is
// used, and a default `ClusterConfig` is serialized when neither exists.
let cluster: Option<crate::config::ClusterConfig> =
if let Some(app_state) = state.app_state() {
let cluster_config = app_state
.cluster_config
.read()
.map(|c| c.clone())
.unwrap_or(None);
cluster_config.or_else(|| {
let config_arc = app_state.config().clone();
config_arc.cluster.clone()
})
} else {
None
};
data.insert(
"cluster".to_string(),
serde_json::to_value(cluster.or_else(|| Some(crate::config::ClusterConfig::default())))
.unwrap_or(JsonValue::Null),
);
}
JsonValue::Object(data)
}
/// Returns the ACL rule snapshot from the SIP server's data context together
/// with the number of ACL rules embedded in the configuration.
///
/// The embedded count is taken from the config file re-read from disk when a
/// config path is known and loading succeeds. If there is no path, or loading
/// fails (which is logged), the count comes from the running server's proxy
/// configuration instead.
async fn resolve_acl_rules(app_state: Arc<AppStateInner>) -> (Vec<String>, usize) {
    let data_context = app_state.sip_server().inner.data_context.clone();
    let active_rules = data_context.acl_rules_snapshot();

    // Fallback source: the proxy configuration the server is currently running with.
    let running_count = || {
        app_state
            .sip_server()
            .inner
            .proxy_config
            .acl_rules
            .as_ref()
            .map_or(0, |rules| rules.len())
    };

    let embedded_count = match app_state.config_path.as_ref() {
        None => running_count(),
        Some(path) => match Config::load(path) {
            Ok(cfg) => cfg.proxy.acl_rules.as_ref().map_or(0, |rules| rules.len()),
            Err(err) => {
                warn!(config_path = %path, ?err, "failed to reload config for acl snapshot");
                running_count()
            }
        },
    };

    (active_rules, embedded_count)
}
/// Describes where call records are stored, for display on the settings page.
///
/// Returns a pair:
/// - a JSON object with the keys `mode`, `active_profile`, `description`,
///   `recorder_path` and `recording`;
/// - a two-element list of profile objects (`id`, `label`, `description`,
///   `config`): the active call-record profile followed by the spool-paths profile.
///
/// When no call-record configuration is present, a local profile rooted at the
/// recorder path is reported.
fn build_storage_profiles(config: &crate::config::Config) -> (JsonValue, Vec<JsonValue>) {
use serde_json::Map;
// Small local builder for one profile entry.
struct Profile {
id: String,
label: &'static str,
description: String,
config: Map<String, JsonValue>,
}
impl Profile {
// Creates a profile with an empty `config` map.
fn new(id: impl Into<String>, label: &'static str, description: impl Into<String>) -> Self {
Self {
id: id.into(),
label,
description: description.into(),
config: Map::new(),
}
}
// Adds (or overwrites) one entry of the profile's `config` map.
fn insert(&mut self, key: &str, value: JsonValue) {
self.config.insert(key.to_string(), value);
}
// Converts the profile into its JSON object form.
fn into_json(self) -> JsonValue {
let mut object = Map::new();
object.insert("id".to_string(), json!(self.id));
object.insert("label".to_string(), json!(self.label));
object.insert("description".to_string(), json!(self.description));
object.insert("config".to_string(), JsonValue::Object(self.config));
JsonValue::Object(object)
}
}
let recorder_path = config.recorder_path();
// One arm per storage backend; each yields the mode string and the matching profile.
let (mode, callrecord_profile) = match config.callrecord.as_ref().map(|cfg| &cfg.storage) {
Some(CallRecordStorageConfig::Local { root }) => {
let mut profile = Profile::new(
"callrecord-local",
"Call recordings",
format!("Storing call detail records on {}", root),
);
profile.insert("type", json!("local"));
profile.insert("root", json!(root));
("local".to_string(), profile)
}
Some(CallRecordStorageConfig::S3 {
vendor,
bucket,
region,
access_key,
secret_key,
endpoint,
root,
with_media,
keep_media_copy,
}) => {
let mut profile = Profile::new(
"callrecord-s3",
"Call recordings",
format!("Uploading call detail records to S3 bucket {}", bucket),
);
// Use the vendor's serialized string form when it serializes to a string;
// otherwise fall back to its lowercased `Debug` representation.
let vendor_value = serde_json::to_value(vendor)
.ok()
.and_then(|v| v.as_str().map(|s| s.to_string()))
.unwrap_or_else(|| format!("{:?}", vendor).to_lowercase());
profile.insert("type", json!("s3"));
profile.insert("vendor", json!(vendor_value));
profile.insert("bucket", json!(bucket));
profile.insert("region", json!(region));
profile.insert("endpoint", json!(endpoint));
profile.insert("root", json!(root));
// Credentials are passed through `mask_basic` before being exposed.
profile.insert("access_key", json!(mask_basic(access_key)));
profile.insert("secret_key", json!(mask_basic(secret_key)));
if let Some(flag) = with_media {
profile.insert("with_media", json!(flag));
}
if let Some(flag) = keep_media_copy {
profile.insert("keep_media_copy", json!(flag));
}
("s3".to_string(), profile)
}
Some(CallRecordStorageConfig::Http {
url,
headers,
with_media,
keep_media_copy,
}) => {
let mut profile = Profile::new(
"callrecord-http",
"Call recordings",
"Streaming call detail records to HTTP endpoint",
);
profile.insert("type", json!("http"));
profile.insert("url", json!(url));
// NOTE(review): headers are emitted unmasked, unlike the S3 keys above; they may
// carry authorization tokens — confirm this exposure is intended.
if let Some(headers) = headers {
profile.insert("headers", json!(headers));
}
if let Some(flag) = with_media {
profile.insert("with_media", json!(flag));
}
if let Some(flag) = keep_media_copy {
profile.insert("keep_media_copy", json!(flag));
}
("http".to_string(), profile)
}
Some(CallRecordStorageConfig::Database {
database_url,
table_name,
}) => {
let mut profile = Profile::new(
"callrecord-database",
"Call recordings",
"Storing call detail records in database",
);
profile.insert("type", json!("database"));
// NOTE(review): the database URL is emitted unmasked; connection URLs often embed
// credentials — confirm this exposure is intended.
if let Some(url) = database_url {
profile.insert("database_url", json!(url));
}
profile.insert("table_name", json!(table_name));
("database".to_string(), profile)
}
// No call-record section configured: report local storage at the recorder path.
None => {
let mut profile = Profile::new(
"callrecord-local",
"Call recordings",
format!("Storing call detail records on {}", recorder_path),
);
profile.insert("type", json!("local"));
profile.insert("root", json!(&recorder_path));
("local".to_string(), profile)
}
};
let mut spool_profile = Profile::new(
"spool-paths",
"Spool directories",
"Server-side spool paths for recordings and media cache.",
);
spool_profile.insert("recorder_path", json!(&recorder_path));
// The recording policy is attached only when present and serializable.
if let Some(policy) = config.recording.as_ref()
&& let Ok(policy_value) = serde_json::to_value(policy)
{
spool_profile.insert("recording", policy_value);
}
// Copy what the summary needs before `into_json` consumes the profile below.
let active_profile_id = callrecord_profile.id.clone();
let active_description = callrecord_profile.description.clone();
let storage_mode = mode.clone();
let mut storage_meta = serde_json::Map::new();
storage_meta.insert("mode".to_string(), json!(storage_mode));
storage_meta.insert("active_profile".to_string(), json!(active_profile_id));
storage_meta.insert("description".to_string(), json!(active_description));
storage_meta.insert("recorder_path".to_string(), json!(&recorder_path));
storage_meta.insert(
"recording".to_string(),
config
.recording
.as_ref()
.and_then(|policy| serde_json::to_value(policy).ok())
.unwrap_or(JsonValue::Null),
);
let profiles = vec![callrecord_profile.into_json(), spool_profile.into_json()];
(JsonValue::Object(storage_meta), profiles)
}
/// Lists departments, ordered by name, with optional keyword filtering and paging.
///
/// Requires the `departments` / `read` permission. A non-blank `filters.q` is
/// matched as `%q%` against name, display label and slug. Responds with a JSON
/// page object, or `500` with the error text when the query fails.
async fn query_departments(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(payload): Json<forms::ListQuery<QueryDepartmentFilters>>,
) -> Response {
    if let Err(denied) = state.require_permission(&user, "departments", "read").await {
        return denied;
    }

    // Trimmed, non-empty search keyword, if one was supplied.
    let keyword = payload
        .filters
        .as_ref()
        .and_then(|filters| filters.q.as_deref())
        .map(str::trim)
        .filter(|term| !term.is_empty());

    let mut selector = DepartmentEntity::find().order_by_asc(DepartmentColumn::Name);
    if let Some(term) = keyword {
        let pattern = format!("%{}%", term);
        let matches_any = Condition::any()
            .add(DepartmentColumn::Name.like(pattern.clone()))
            .add(DepartmentColumn::DisplayLabel.like(pattern.clone()))
            .add(DepartmentColumn::Slug.like(pattern));
        selector = selector.filter(matches_any);
    }

    let paginator = selector.paginate(state.db(), payload.normalize().1);
    let page = match forms::paginate(paginator, &payload).await {
        Ok(page) => page,
        Err(err) => {
            warn!("failed to query departments: {}", err);
            let body = json!({"message": err.to_string()});
            return (StatusCode::INTERNAL_SERVER_ERROR, Json(body)).into_response();
        }
    };

    Json(json!({
        "page": page.current_page,
        "per_page": page.per_page,
        "total_items": page.total_items,
        "total_pages": page.total_pages,
        "has_prev": page.has_prev,
        "has_next": page.has_next,
        "items": page.items,
    }))
    .into_response()
}
/// Returns a single department as JSON.
///
/// Requires the `departments` / `read` permission. Responds with `404` when the
/// id is unknown and `500` with the error text when the lookup fails.
async fn get_department(
    AxumPath(id): AxumPath<i64>,
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
) -> Response {
    if let Err(denied) = state.require_permission(&user, "departments", "read").await {
        return denied;
    }

    let (status, body) = match DepartmentEntity::find_by_id(id).one(state.db()).await {
        Ok(Some(department)) => return Json(department).into_response(),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            json!({"message": "Department not found"}),
        ),
        Err(err) => {
            warn!("failed to load department {}: {}", id, err);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                json!({"message": err.to_string()}),
            )
        }
    };

    (status, Json(body)).into_response()
}
/// Creates a department from the request body.
///
/// Requires the `departments` / `write` permission. The name must be non-empty
/// after trimming (`400` otherwise). Responds with `201` and the new id on
/// success, or `500` with the error text when the insert fails.
async fn create_department(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(payload): Json<DepartmentPayload>,
) -> Response {
    if let Err(denied) = state
        .require_permission(&user, "departments", "write")
        .await
    {
        return denied;
    }

    let name = payload.name.trim();
    if name.is_empty() {
        let body = json!({"message": "Department name is required"});
        return (StatusCode::BAD_REQUEST, Json(body)).into_response();
    }

    // Both timestamps share one instant so a fresh row has created_at == updated_at.
    let now = Utc::now();
    let record = DepartmentActiveModel {
        name: Set(name.to_string()),
        created_at: Set(now),
        updated_at: Set(now),
        display_label: Set(normalize_opt_string(payload.display_label)),
        slug: Set(normalize_opt_string(payload.slug)),
        description: Set(normalize_opt_string(payload.description)),
        color: Set(normalize_opt_string(payload.color)),
        manager_contact: Set(normalize_opt_string(payload.manager_contact)),
        metadata: Set(payload.metadata),
        ..Default::default()
    };

    let created = match record.insert(state.db()).await {
        Ok(model) => model,
        Err(err) => {
            warn!("failed to create department: {}", err);
            let body = json!({"message": err.to_string()});
            return (StatusCode::INTERNAL_SERVER_ERROR, Json(body)).into_response();
        }
    };

    (
        StatusCode::CREATED,
        Json(json!({"status": "ok", "id": created.id})),
    )
        .into_response()
}
async fn update_department(
AxumPath(id): AxumPath<i64>,
State(state): State<Arc<ConsoleState>>,
AuthRequired(user): AuthRequired,
Json(payload): Json<DepartmentPayload>,
) -> Response {
if let Err(resp) = state
.require_permission(&user, "departments", "write")
.await
{
return resp;
}
let model = find_or_404!(DepartmentEntity, id, state.db(), "Department");
let mut active: DepartmentActiveModel = model.into();
let name = payload.name.trim();
if name.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"message": "Department name is required"})),
)
.into_response();
}
active.name = Set(name.to_string());
active.display_label = Set(normalize_opt_string(payload.display_label));
active.slug = Set(normalize_opt_string(payload.slug));
active.description = Set(normalize_opt_string(payload.description));
active.color = Set(normalize_opt_string(payload.color));
active.manager_contact = Set(normalize_opt_string(payload.manager_contact));
active.metadata = Set(payload.metadata);
active.updated_at = Set(Utc::now());
match active.update(state.db()).await {
Ok(_) => Json(json!({"status": "ok"})).into_response(),
Err(err) => {
warn!("failed to update department {}: {}", id, err);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"message": err.to_string()})),
)
.into_response()
}
}
}
/// Deletes a department by id.
///
/// Requires the `departments` / `write` permission. The department is looked up
/// first (a miss is handled by `find_or_404!`). Responds with `500` and the
/// error text when the delete fails.
async fn delete_department(
    AxumPath(id): AxumPath<i64>,
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
) -> Response {
    if let Err(denied) = state
        .require_permission(&user, "departments", "write")
        .await
    {
        return denied;
    }

    let existing = find_or_404!(DepartmentEntity, id, state.db(), "Department");
    let doomed: DepartmentActiveModel = existing.into();

    if let Err(err) = doomed.delete(state.db()).await {
        warn!("failed to delete department {}: {}", id, err);
        let body = json!({"message": err.to_string()});
        return (StatusCode::INTERNAL_SERVER_ERROR, Json(body)).into_response();
    }

    Json(json!({"status": "ok"})).into_response()
}
/// Lists users, ordered by username, with optional filtering and paging.
///
/// Requires the `users` / `manage` permission. A non-blank `filters.q` is
/// matched as `%q%` against email and username; `filters.active == Some(true)`
/// restricts the result to active users. Items are returned as [`UserView`]s.
/// Responds with `500` and the error text when the query fails.
async fn query_users(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(query): Json<forms::ListQuery<QueryUserFilters>>,
) -> Response {
    if let Err(denied) = state.require_permission(&user, "users", "manage").await {
        return denied;
    }

    let mut selector = UserEntity::find().order_by_asc(UserColumn::Username);
    if let Some(filters) = query.filters.as_ref() {
        let keyword = filters
            .q
            .as_deref()
            .map(str::trim)
            .filter(|term| !term.is_empty());
        if let Some(term) = keyword {
            let pattern = format!("%{}%", term);
            let matches_any = Condition::any()
                .add(UserColumn::Email.like(pattern.clone()))
                .add(UserColumn::Username.like(pattern));
            selector = selector.filter(matches_any);
        }

        // Only an explicit `true` narrows the result; `false` and absent do nothing.
        if filters.active == Some(true) {
            selector = selector.filter(UserColumn::IsActive.eq(true));
        }
    }

    let paginator = selector.paginate(state.db(), query.normalize().1);
    let page = match forms::paginate(paginator, &query).await {
        Ok(page) => page,
        Err(err) => {
            warn!("failed to query users: {}", err);
            let body = json!({"message": err.to_string()});
            return (StatusCode::INTERNAL_SERVER_ERROR, Json(body)).into_response();
        }
    };

    let view_items: Vec<UserView> = page.items.into_iter().map(UserView::from).collect();
    Json(json!({
        "page": page.current_page,
        "per_page": page.per_page,
        "total_items": page.total_items,
        "total_pages": page.total_pages,
        "has_prev": page.has_prev,
        "has_next": page.has_next,
        "items": view_items,
    }))
    .into_response()
}
/// Returns a single user as a [`UserView`].
///
/// Requires the `users` / `manage` permission. Responds with `404` when the id
/// is unknown and `500` with the error text when the lookup fails.
async fn get_user(
    AxumPath(id): AxumPath<i64>,
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
) -> Response {
    if let Err(denied) = state.require_permission(&user, "users", "manage").await {
        return denied;
    }

    let (status, body) = match UserEntity::find_by_id(id).one(state.db()).await {
        Ok(Some(found)) => return Json(UserView::from(found)).into_response(),
        Ok(None) => (StatusCode::NOT_FOUND, json!({"message": "User not found"})),
        Err(err) => {
            warn!("failed to load user {}: {}", id, err);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                json!({"message": err.to_string()}),
            )
        }
    };

    (status, Json(body)).into_response()
}
/// Creates a user account from the submitted payload.
///
/// Requires the `users`/`manage` permission. Email, username and password must all be
/// non-empty after trimming, and neither the email nor the username may already be in use;
/// any of those failures yields a 400 with a `message`. On success responds with 201 and the
/// new user's id.
async fn create_user(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(payload): Json<UserPayload>,
) -> Response {
    if let Err(resp) = state.require_permission(&user, "users", "manage").await {
        return resp;
    }
    // The email is stored lowercased, so normalise it once up front and run the uniqueness
    // check against the exact value that will be written. Checking the raw-cased input let a
    // case-variant duplicate slip past the check.
    let email = payload.email.trim().to_lowercase();
    let username = payload.username.trim();
    if email.is_empty() || username.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"message": "Email and username are required"})),
        )
            .into_response();
    }
    let password = match payload
        .password
        .as_ref()
        .map(|v| v.trim())
        .filter(|v| !v.is_empty())
    {
        Some(password) => password.to_string(),
        None => {
            return (
                StatusCode::BAD_REQUEST,
                Json(json!({"message": "Password is required"})),
            )
                .into_response();
        }
    };
    if email_exists(state.db(), &email, None).await {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"message": "Email already in use"})),
        )
            .into_response();
    }
    if username_exists(state.db(), username, None).await {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"message": "Username already in use"})),
        )
            .into_response();
    }
    let now = Utc::now();
    let hashed = match hash_password(&password) {
        Ok(hash) => hash,
        Err(err) => {
            // Log the detail server-side; the client only gets a generic message.
            warn!("failed to hash password: {}", err);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(json!({"message": "Failed to hash password"})),
            )
                .into_response();
        }
    };
    let active = UserActiveModel {
        email: Set(email),
        username: Set(username.to_string()),
        password_hash: Set(hashed),
        created_at: Set(now),
        updated_at: Set(now),
        // Flags omitted from the payload fall back to an active, non-privileged account.
        is_active: Set(payload.is_active.unwrap_or(true)),
        is_staff: Set(payload.is_staff.unwrap_or(false)),
        is_superuser: Set(payload.is_superuser.unwrap_or(false)),
        ..Default::default()
    };
    match active.insert(state.db()).await {
        Ok(model) => (
            StatusCode::CREATED,
            Json(json!({"status": "ok", "id": model.id})),
        )
            .into_response(),
        Err(err) => {
            warn!("failed to create user: {}", err);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(json!({"message": err.to_string()})),
            )
                .into_response()
        }
    }
}
/// Updates an existing user from the submitted payload.
///
/// Requires the `users`/`manage` permission. Email and username must be non-empty after
/// trimming and must not collide with another user (the user being edited is excluded from
/// the check). The password is only re-hashed when a non-blank one is supplied. Flags omitted
/// from the payload are reset to their defaults (`is_active = true`, others `false`).
async fn update_user(
    AxumPath(id): AxumPath<i64>,
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(payload): Json<UserPayload>,
) -> Response {
    if let Err(resp) = state.require_permission(&user, "users", "manage").await {
        return resp;
    }
    let model = crate::console::config_helpers::find_or_404!(UserEntity, id, state.db(), "User");
    // The email is stored lowercased, so normalise it once and run the uniqueness check
    // against the exact value that will be written. Checking the raw-cased input let a
    // case-variant duplicate of another user's email slip past the check.
    let email = payload.email.trim().to_lowercase();
    let username = payload.username.trim();
    if email.is_empty() || username.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"message": "Email and username are required"})),
        )
            .into_response();
    }
    if email_exists(state.db(), &email, Some(id)).await {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"message": "Email already in use"})),
        )
            .into_response();
    }
    if username_exists(state.db(), username, Some(id)).await {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"message": "Username already in use"})),
        )
            .into_response();
    }
    let mut active: UserActiveModel = model.into();
    active.email = Set(email);
    active.username = Set(username.to_string());
    active.is_active = Set(payload.is_active.unwrap_or(true));
    active.is_staff = Set(payload.is_staff.unwrap_or(false));
    active.is_superuser = Set(payload.is_superuser.unwrap_or(false));
    active.updated_at = Set(Utc::now());
    // A missing or blank password means "keep the current one".
    if let Some(password) = payload
        .password
        .as_ref()
        .map(|v| v.trim())
        .filter(|v| !v.is_empty())
    {
        match hash_password(password) {
            Ok(hash) => active.password_hash = Set(hash),
            Err(err) => {
                warn!("failed to hash password: {}", err);
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(json!({"message": "Failed to hash password"})),
                )
                    .into_response();
            }
        }
    }
    match active.update(state.db()).await {
        Ok(_) => Json(json!({"status": "ok"})).into_response(),
        Err(err) => {
            warn!("failed to update user {}: {}", id, err);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(json!({"message": err.to_string()})),
            )
                .into_response()
        }
    }
}
/// Deletes the user with the given id.
///
/// Requires the `users`/`manage` permission. The lookup is delegated to `find_or_404!`;
/// a failed delete is logged and reported as a 500 carrying the database error text.
async fn delete_user(
    AxumPath(id): AxumPath<i64>,
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
) -> Response {
    if let Err(denied) = state.require_permission(&user, "users", "manage").await {
        return denied;
    }
    let target = crate::console::config_helpers::find_or_404!(UserEntity, id, state.db(), "User");
    let doomed: UserActiveModel = target.into();
    let outcome = doomed.delete(state.db()).await;
    if let Err(err) = outcome {
        warn!("failed to delete user {}: {}", id, err);
        let body = Json(json!({"message": err.to_string()}));
        return (StatusCode::INTERNAL_SERVER_ERROR, body).into_response();
    }
    Json(json!({"status": "ok"})).into_response()
}
async fn email_exists(db: &sea_orm::DatabaseConnection, email: &str, exclude: Option<i64>) -> bool {
let mut selector = UserEntity::find().filter(UserColumn::Email.eq(email));
if let Some(id) = exclude {
selector = selector.filter(UserColumn::Id.ne(id));
}
selector.count(db).await.unwrap_or(0) > 0
}
async fn username_exists(
db: &sea_orm::DatabaseConnection,
username: &str,
exclude: Option<i64>,
) -> bool {
let mut selector = UserEntity::find().filter(UserColumn::Username.eq(username));
if let Some(id) = exclude {
selector = selector.filter(UserColumn::Id.ne(id));
}
selector.count(db).await.unwrap_or(0) > 0
}
/// Trims an optional string and collapses "missing" and "blank" into `None`.
///
/// Returns `Some` with the trimmed text only when something other than whitespace remains.
fn normalize_opt_string(value: Option<String>) -> Option<String> {
    let raw = value?;
    match raw.trim() {
        "" => None,
        kept => Some(kept.to_owned()),
    }
}
/// Hashes `password` with the default Argon2 parameters and a freshly generated salt.
///
/// Returns the hash rendered as a string, or the hashing error unchanged.
fn hash_password(password: &str) -> Result<String, argon2::password_hash::Error> {
    let salt = SaltString::generate(&mut OsRng);
    let hasher = Argon2::default();
    let hashed = hasher.hash_password(password.as_bytes(), &salt)?;
    Ok(hashed.to_string())
}
/// Renders a duration as a compact string such as `2d 3h 5m`.
///
/// Negative durations are clamped to zero. Days, hours and minutes are listed when non-zero;
/// seconds are only shown when the duration is shorter than one minute, and a zero duration
/// renders as `0s`.
fn human_duration(duration: Duration) -> String {
    let total = duration.num_seconds().max(0);
    let coarse_units = [
        ("d", total / 86_400),
        ("h", (total % 86_400) / 3_600),
        ("m", (total % 3_600) / 60),
    ];
    let coarse: Vec<String> = coarse_units
        .iter()
        .filter(|(_, amount)| *amount > 0)
        .map(|(suffix, amount)| format!("{}{}", amount, suffix))
        .collect();
    if !coarse.is_empty() {
        return coarse.join(" ");
    }
    // Under a minute: show the seconds, which also covers the plain "0s" case.
    format!("{}s", total % 60)
}
/// Masks a secret for display, keeping only its first and last two characters.
///
/// Values of four characters or fewer are replaced entirely by `****`. Counting is done in
/// `char`s rather than bytes.
fn mask_basic(value: &str) -> String {
    const MASK: &str = "****";
    let chars: Vec<char> = value.chars().collect();
    let count = chars.len();
    if count <= 4 {
        return MASK.to_string();
    }
    let head = chars[..2].iter().copied();
    let tail = chars[count - 2..].iter().copied();
    head.chain(MASK.chars()).chain(tail).collect()
}
/// Builds the "Call record storage" summary entry for the configured backend.
///
/// Returns `None` when no call-record configuration is present. Otherwise the entry holds a
/// `label`, a human-readable `value`, and for the S3 and database backends a `hint`.
fn summarize_callrecord(config: Option<&CallRecordConfig>) -> Option<JsonValue> {
    const LABEL: &str = "Call record storage";
    let cfg = config?;
    let summary = match &cfg.storage {
        CallRecordStorageConfig::Local { root } => json!({
            "label": LABEL,
            "value": format!("Local ({})", root),
        }),
        CallRecordStorageConfig::S3 {
            bucket,
            region,
            endpoint,
            ..
        } => json!({
            "label": LABEL,
            "value": format!("S3 bucket {} ({})", bucket, region),
            "hint": endpoint,
        }),
        CallRecordStorageConfig::Http { url, .. } => json!({
            "label": LABEL,
            "value": format!("HTTP {}", url),
        }),
        CallRecordStorageConfig::Database {
            database_url,
            table_name,
        } => json!({
            "label": LABEL,
            "value": format!("Database ({})", table_name),
            "hint": database_url.as_deref().unwrap_or("not configured"),
        }),
    };
    Some(summary)
}
/// Lists the proxy's listening ports as JSON entries for display.
///
/// UDP ports come first (the first one flagged as primary), followed by the TCP, TLS and WS
/// ports when they are configured.
fn build_port_list(proxy_cfg: &ProxyConfig) -> Vec<JsonValue> {
    let mut ports: Vec<JsonValue> = proxy_cfg
        .all_udp_ports()
        .iter()
        .enumerate()
        .map(|(idx, port)| {
            json!({
                "label": if idx == 0 { "UDP (primary)" } else { "UDP" },
                "value": port,
                "primary": idx == 0,
            })
        })
        .collect();
    // `Option` is an iterator of zero or one items, so unset ports add nothing.
    ports.extend(
        proxy_cfg
            .tcp_port
            .map(|port| json!({ "label": "TCP", "value": port })),
    );
    ports.extend(
        proxy_cfg
            .tls_port
            .map(|port| json!({ "label": "TLS", "value": port })),
    );
    ports.extend(
        proxy_cfg
            .ws_port
            .map(|port| json!({ "label": "WS", "value": port })),
    );
    ports
}
/// JSON body accepted by `update_platform_settings`.
///
/// Every field is doubly optional: the handler treats an outer `None` as "leave this setting
/// untouched" and `Some(None)` as "clear it". For the string fields a blank value is also
/// treated as "clear".
// NOTE(review): with a plain serde derive, an explicit JSON `null` normally deserializes to the
// outer `None`, so `Some(None)` may never be produced without a custom `deserialize_with`
// helper — confirm clients can actually clear a value (in particular the RTP ports) via `null`.
#[derive(Debug, Deserialize)]
pub(crate) struct PlatformSettingsPayload {
// Written to the root-level `log_level` key.
#[serde(default)]
log_level: Option<Option<String>>,
// Written to the root-level `log_file` key.
#[serde(default)]
log_file: Option<Option<String>>,
// Setting a value here blanks `auto_external_ip` in the document.
#[serde(default)]
external_ip: Option<Option<String>>,
// Setting a value here blanks `external_ip` in the document.
#[serde(default)]
auto_external_ip: Option<Option<String>>,
// Rejected by the handler when 0.
#[serde(default)]
rtp_start_port: Option<Option<u16>>,
// Rejected by the handler when 0; must not be below `rtp_start_port`.
#[serde(default)]
rtp_end_port: Option<Option<u16>>,
}
/// Request body carrying an optional `url`.
// NOTE(review): presumably the body of an "auto external IP" test endpoint whose handler is
// outside this section — confirm what a missing `url` means there.
#[derive(Debug, Deserialize)]
pub(crate) struct TestAutoExternalIpPayload {
url: Option<String>,
}
/// Request body describing S3-style storage credentials and location.
// NOTE(review): presumably consumed by a "test storage connection" handler outside this
// section — confirm how `endpoint` and `root` are interpreted when absent.
#[derive(Debug, Deserialize)]
pub(crate) struct TestStoragePayload {
pub vendor: crate::storage::S3Vendor,
pub bucket: String,
pub region: String,
pub access_key: String,
pub secret_key: String,
pub endpoint: Option<String>,
pub root: Option<String>,
}
/// JSON body accepted by `update_storage_settings`.
///
/// Every field is doubly optional: the handler treats an outer `None` as "leave untouched" and
/// `Some(None)` as "remove the setting".
// NOTE(review): with a plain serde derive, JSON `null` normally deserializes to the outer
// `None`, so `Some(None)` may be unreachable without a custom deserializer — confirm.
#[derive(Debug, Deserialize)]
pub(crate) struct StorageSettingsPayload {
// Stored as `recording.path`; the root-level `recorder_path` key is removed.
#[serde(default)]
recorder_path: Option<Option<String>>,
// Stored as the root-level `media_cache_path` key.
#[serde(default)]
media_cache_path: Option<Option<String>>,
// Stored as `recording.format`; the handler only accepts "wav" (case-insensitive) or blank.
#[serde(default)]
recorder_format: Option<Option<String>>,
// Replaces the root-level `callrecord` table; `Disabled` removes it.
#[serde(default)]
callrecord: Option<Option<CallRecordStoragePayload>>,
// Replaces the whole root-level `recording` table.
#[serde(default)]
recording_policy: Option<Option<RecordingPolicyPayload>>,
}
/// Call-record storage selection submitted with the storage settings.
///
/// Internally tagged by a `mode` field whose values are the snake_case variant names
/// (`disabled`, `local`, `s3`). `update_storage_settings` serializes the non-disabled variants
/// to TOML and stores them as the `callrecord` table.
#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "mode", rename_all = "snake_case")]
enum CallRecordStoragePayload {
// Handled by the storage handler as "remove the `callrecord` table".
Disabled,
Local {
#[serde(default)]
root: Option<String>,
},
S3 {
vendor: String,
bucket: String,
region: String,
access_key: String,
secret_key: String,
#[serde(default)]
endpoint: Option<String>,
#[serde(default)]
root: Option<String>,
#[serde(default)]
with_media: Option<bool>,
#[serde(default)]
keep_media_copy: Option<bool>,
},
}
/// Recording policy submitted with the storage settings.
///
/// `update_storage_settings` serializes this struct to TOML and stores the result as the
/// whole `recording` table, so the field names here are the keys written to the config file.
/// All fields are optional and default to absent.
// NOTE(review): the meaning of the individual fields (directions, allow/deny lists, etc.) is
// defined by the recording config type, which is outside this section.
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct RecordingPolicyPayload {
#[serde(default)]
enabled: Option<bool>,
#[serde(default)]
directions: Option<Vec<String>>,
#[serde(default)]
caller_allow: Option<Vec<String>>,
#[serde(default)]
caller_deny: Option<Vec<String>>,
#[serde(default)]
callee_allow: Option<Vec<String>>,
#[serde(default)]
callee_deny: Option<Vec<String>>,
#[serde(default)]
auto_start: Option<bool>,
#[serde(default)]
filename_pattern: Option<Option<String>>,
#[serde(default)]
samplerate: Option<Option<u32>>,
#[serde(default)]
ptime: Option<Option<u32>>,
#[serde(default)]
path: Option<Option<String>>,
#[serde(default)]
format: Option<Option<String>>,
#[serde(default)]
force_file: Option<bool>,
}
/// JSON body accepted by `update_security_settings`.
///
/// Every present field is written into the `proxy` table of the config file under the same
/// name; absent fields leave the existing value untouched.
#[derive(Debug, Deserialize)]
pub(crate) struct SecuritySettingsPayload {
// Raw text that the handler splits with `parse_lines_to_vec`; an empty result or an inner
// `None` removes `proxy.acl_rules`.
// NOTE(review): presumably one rule per line — `parse_lines_to_vec` is outside this section.
#[serde(default)]
acl_rules: Option<Option<String>>,
dos_enabled: Option<bool>,
dos_max_cps_per_ip: Option<u32>,
dos_max_concurrent_per_ip: Option<u32>,
dos_scan_probe_threshold: Option<u32>,
dos_scan_block_duration_secs: Option<u64>,
uri_max_length: Option<usize>,
uri_reject_malformed: Option<bool>,
// Outer `None`: untouched; `Some(None)`: the handler removes `proxy.emergency`.
emergency: Option<Option<EmergencySettingsPayload>>,
session_cmd_channel_capacity: Option<usize>,
session_state_channel_capacity: Option<usize>,
media_cmd_channel_capacity: Option<usize>,
media_event_channel_capacity: Option<usize>,
}
/// Emergency-call settings submitted with the security settings.
///
/// `update_security_settings` serializes this struct to TOML and stores it as
/// `proxy.emergency`; all three fields are required when the object is present.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub(crate) struct EmergencySettingsPayload {
enabled: bool,
numbers: Vec<String>,
emergency_trunk: String,
}
pub(crate) async fn update_platform_settings(
State(state): State<Arc<ConsoleState>>,
AuthRequired(user): AuthRequired,
Json(payload): Json<PlatformSettingsPayload>,
) -> Response {
if let Err(resp) = state.require_permission(&user, "system", "write").await {
return resp;
}
let config_path = match get_config_path(&state) {
Ok(path) => path,
Err(resp) => return resp,
};
let mut doc = match load_document(&config_path) {
Ok(doc) => doc,
Err(resp) => return resp,
};
let mut modified = false;
if let Some(level_opt) = payload.log_level {
if let Some(level) = normalize_opt_string(level_opt) {
doc["log_level"] = value(level);
} else {
doc.insert("log_level", Item::None);
}
modified = true;
}
if let Some(file_opt) = payload.log_file {
if let Some(path) = normalize_opt_string(file_opt) {
doc["log_file"] = value(path);
} else {
doc.insert("log_file", Item::None);
}
modified = true;
}
if let Some(ext_opt) = payload.external_ip {
if let Some(ip) = normalize_opt_string(ext_opt) {
doc["external_ip"] = value(ip);
doc.insert("auto_external_ip", Item::None);
} else {
doc.insert("external_ip", Item::None);
}
modified = true;
}
if let Some(auto_opt) = payload.auto_external_ip {
if let Some(url) = normalize_opt_string(auto_opt) {
doc["auto_external_ip"] = value(url);
doc.insert("external_ip", Item::None);
} else {
doc.insert("auto_external_ip", Item::None);
}
modified = true;
}
if let Some(start_opt) = payload.rtp_start_port {
if let Some(port) = start_opt {
if port == 0 {
return json_error(
StatusCode::UNPROCESSABLE_ENTITY,
"rtp_start_port must be greater than 0",
);
}
doc["rtp_start_port"] = value(i64::from(port));
} else {
doc.remove("rtp_start_port");
}
modified = true;
}
if let Some(end_opt) = payload.rtp_end_port {
if let Some(port) = end_opt {
if port == 0 {
return json_error(
StatusCode::UNPROCESSABLE_ENTITY,
"rtp_end_port must be greater than 0",
);
}
doc["rtp_end_port"] = value(i64::from(port));
} else {
doc.remove("rtp_end_port");
}
modified = true;
}
let doc_text = doc.to_string();
let config = match parse_config_from_str(&doc_text) {
Ok(cfg) => cfg,
Err(resp) => return resp,
};
if let (Some(start), Some(end)) = (config.rtp_start_port, config.rtp_end_port)
&& start > end
{
return json_error(
StatusCode::UNPROCESSABLE_ENTITY,
"rtp_start_port must be less than or equal to rtp_end_port",
);
}
if modified && let Err(resp) = persist_document(&config_path, doc_text) {
return resp;
}
Json(json!({
"status": "ok",
"requires_restart": true,
"message": "Platform settings saved. Restart RustPBX to apply changes.",
"platform": {
"log_level": config.log_level,
"log_file": config.log_file,
},
"rtp": {
"external_ip": config.external_ip,
"auto_external_ip": config.auto_external_ip,
"start_port": config.rtp_start_port,
"end_port": config.rtp_end_port,
}
}))
.into_response()
}
/// Converts a serializable value into a `toml_edit` item.
///
/// The value is rendered to TOML text and re-parsed as a document, whose root is returned.
/// Either step failing yields a 400 response that names `field`.
fn serialize_to_item<T: Serialize>(val: &T, field: &str) -> Result<Item, Response> {
    let rendered = match toml::to_string(val) {
        Ok(text) => text,
        Err(err) => {
            return Err(json_error(
                StatusCode::BAD_REQUEST,
                format!("Failed to serialize {}: {}", field, err),
            ));
        }
    };
    match rendered.parse::<DocumentMut>() {
        Ok(parsed) => Ok(parsed.as_item().clone()),
        Err(err) => Err(json_error(
            StatusCode::BAD_REQUEST,
            format!("Invalid {} payload: {}", field, err),
        )),
    }
}
/// Applies proxy settings from the payload to the config file.
///
/// Requires the `system`/`write` permission. Fields absent from the payload are left alone.
/// Most values are written into the `proxy` table; `rwi_webhook` lives at the document root.
/// A webhook or router with an empty `url` removes the corresponding entry. The edited
/// document is validated with `parse_config_from_str` and persisted only when something
/// changed; the success response is returned either way.
pub(crate) async fn update_proxy_settings(
State(state): State<Arc<ConsoleState>>,
AuthRequired(user): AuthRequired,
Json(payload): Json<ProxySettingsPayload>,
) -> Response {
if let Err(resp) = state.require_permission(&user, "system", "write").await {
return resp;
}
let config_path = match get_config_path(&state) {
Ok(path) => path,
Err(resp) => return resp,
};
let mut doc = match load_document(&config_path) {
Ok(doc) => doc,
Err(resp) => return resp,
};
let mut modified = false;
// `rwi_webhook` is a root-level key, but `table` below mutably borrows `doc`, so the intended
// root-level change is recorded here and applied once `table` is no longer used.
let mut rwi_webhook_value = None;
let mut remove_rwi_webhook = false;
let table = ensure_table_mut(&mut doc, "proxy");
if let Some(realms) = payload.realms {
set_string_array(table, "realms", realms);
modified = true;
}
if let Some(registrar_expires) = payload.registrar_expires {
table["registrar_expires"] = value(i64::from(registrar_expires));
modified = true;
}
if let Some(max_registrar_expires) = payload.max_registrar_expires {
table["max_registrar_expires"] = value(i64::from(max_registrar_expires));
modified = true;
}
if let Some(webhook) = payload.locator_webhook {
// An empty URL means "remove the webhook"; only counts as a change if one was configured.
if webhook.url.is_empty() {
if table.contains_key("locator_webhook") {
table.remove("locator_webhook");
modified = true;
}
} else {
match serialize_to_item(&webhook, "locator_webhook") {
Ok(item) => table["locator_webhook"] = item,
Err(resp) => return resp,
}
modified = true;
}
}
if let Some(webhook) = payload.rwi_webhook {
if webhook.url.is_empty() {
remove_rwi_webhook = true;
} else {
match serialize_to_item(&webhook, "rwi_webhook") {
Ok(item) => rwi_webhook_value = Some(item),
Err(resp) => return resp,
}
}
}
if let Some(backends) = payload.user_backends {
// The list is wrapped in a single-field struct so it serializes as a keyed TOML value; the
// `b` entry is then pulled back out of the resulting table.
#[derive(Serialize)]
struct UserBackendsToml {
b: Vec<UserBackendConfig>,
}
let wrapper = UserBackendsToml { b: backends };
let item = match serialize_to_item(&wrapper, "user_backends") {
Ok(item) => item,
Err(resp) => return resp,
};
let Some(backends_item) = item.as_table().and_then(|t| t.get("b").cloned()) else {
return json_error(StatusCode::BAD_REQUEST, "Invalid user_backends payload");
};
table["user_backends"] = backends_item;
modified = true;
}
if let Some(router) = payload.http_router {
// Same convention as the locator webhook: an empty URL removes the router entry.
if router.url.is_empty() {
if table.contains_key("http_router") {
table.remove("http_router");
modified = true;
}
} else {
match serialize_to_item(&router, "http_router") {
Ok(item) => table["http_router"] = item,
Err(resp) => return resp,
}
modified = true;
}
}
// `table` is not used past this point, so the root of `doc` can be edited again.
if let Some(value) = rwi_webhook_value {
doc["rwi_webhook"] = value;
modified = true;
}
// Unlike the `proxy` entries above, this marks the document modified even when no
// `rwi_webhook` key was present.
if remove_rwi_webhook {
doc.remove("rwi_webhook");
modified = true;
}
if modified {
let doc_text = doc.to_string();
// Validate the edited document as a full configuration before writing it.
if let Err(resp) = parse_config_from_str(&doc_text) {
return resp;
}
if let Err(resp) = persist_document(&config_path, doc_text) {
return resp;
}
}
Json(json!({
"status": "ok",
"message": "Proxy settings saved. Restart required to apply.",
}))
.into_response()
}
/// Applies storage and recording settings from the payload to the config file.
///
/// Requires the `system`/`write` permission. Fields absent from the payload are left alone.
/// The edited document is always re-parsed as a full configuration; it is persisted only when
/// something changed. The response carries the storage profiles built from the resulting
/// configuration and always reports that a restart is required.
pub(crate) async fn update_storage_settings(
State(state): State<Arc<ConsoleState>>,
AuthRequired(user): AuthRequired,
Json(payload): Json<StorageSettingsPayload>,
) -> Response {
if let Err(resp) = state.require_permission(&user, "system", "write").await {
return resp;
}
let config_path = match get_config_path(&state) {
Ok(path) => path,
Err(resp) => return resp,
};
let mut doc = match load_document(&config_path) {
Ok(doc) => doc,
Err(resp) => return resp,
};
let mut modified = false;
if let Some(path_opt) = payload.recorder_path {
// The path is stored as `recording.path`; the inner block ends the `table` borrow so the
// root-level `recorder_path` key can be removed afterwards.
{
let table = ensure_table_mut(&mut doc, "recording");
if let Some(path) = normalize_opt_string(path_opt) {
table["path"] = value(path);
} else {
table.remove("path");
}
}
doc.remove("recorder_path");
modified = true;
}
if let Some(cache_opt) = payload.media_cache_path {
if let Some(path) = normalize_opt_string(cache_opt) {
doc["media_cache_path"] = value(path);
} else {
doc.remove("media_cache_path");
}
modified = true;
}
if let Some(callrecord_opt) = payload.callrecord {
match callrecord_opt {
// Both an explicit `Disabled` mode and an inner `None` drop the `callrecord` table.
Some(CallRecordStoragePayload::Disabled) | None => {
doc.remove("callrecord");
}
Some(payload) => match serialize_to_item(&payload, "callrecord") {
Ok(item) => doc["callrecord"] = item,
Err(resp) => return resp,
},
}
modified = true;
}
// NOTE(review): this replaces the whole `recording` table, including a `path` written from
// `recorder_path` a few lines above, unless the serialized policy carries its own `path`.
// `recorder_format` is applied afterwards and therefore survives. Confirm the ordering is
// intended.
if let Some(policy_opt) = payload.recording_policy {
match policy_opt {
Some(policy_payload) => match serialize_to_item(&policy_payload, "recording_policy") {
Ok(item) => doc["recording"] = item,
Err(resp) => return resp,
},
None => {
doc.remove("recording");
}
}
modified = true;
}
if let Some(format_opt) = payload.recorder_format {
{
let table = ensure_table_mut(&mut doc, "recording");
match format_opt {
Some(format_value) => {
// Compared case-insensitively after trimming; blank clears, anything but "wav" is a 422.
let normalized = format_value.trim().to_ascii_lowercase();
if normalized.is_empty() {
table.remove("format");
} else if normalized == "wav" {
table["format"] = value(normalized);
} else {
return json_error(
StatusCode::UNPROCESSABLE_ENTITY,
"recorder_format must be 'wav'",
);
}
}
None => {
table.remove("format");
}
}
}
doc.remove("recorder_format");
modified = true;
}
// Validate the edited document as a full configuration before writing it.
let doc_text = doc.to_string();
let config = match parse_config_from_str(&doc_text) {
Ok(cfg) => cfg,
Err(resp) => return resp,
};
if modified && let Err(resp) = persist_document(&config_path, doc_text) {
return resp;
}
let (storage_meta, storage_profiles) = build_storage_profiles(&config);
Json(json!({
"status": "ok",
"requires_restart": true,
"message": "Storage settings saved. Restart RustPBX to apply changes.",
"storage": storage_meta,
"storage_profiles": storage_profiles,
}))
.into_response()
}
pub(crate) async fn update_security_settings(
State(state): State<Arc<ConsoleState>>,
AuthRequired(user): AuthRequired,
Json(payload): Json<SecuritySettingsPayload>,
) -> Response {
if let Err(resp) = state.require_permission(&user, "system", "write").await {
return resp;
}
let config_path = match get_config_path(&state) {
Ok(path) => path,
Err(resp) => return resp,
};
let mut doc = match load_document(&config_path) {
Ok(doc) => doc,
Err(resp) => return resp,
};
let mut modified = false;
if let Some(acl_opt) = payload.acl_rules {
let table = ensure_table_mut(&mut doc, "proxy");
match acl_opt {
Some(raw) => {
let rules = parse_lines_to_vec(&raw);
if rules.is_empty() {
table.remove("acl_rules");
} else {
set_string_array(table, "acl_rules", rules);
}
}
None => {
table.remove("acl_rules");
}
}
modified = true;
}
{
let table = ensure_table_mut(&mut doc, "proxy");
macro_rules! set_opt {
($field:ident, $cast:ty) => {
if let Some(val) = payload.$field {
table[stringify!($field)] = value(val as $cast);
modified = true;
}
};
($field:ident) => {
if let Some(val) = payload.$field {
table[stringify!($field)] = value(val);
modified = true;
}
};
}
set_opt!(dos_enabled);
set_opt!(dos_max_cps_per_ip, i64);
set_opt!(dos_max_concurrent_per_ip, i64);
set_opt!(dos_scan_probe_threshold, i64);
set_opt!(dos_scan_block_duration_secs, i64);
set_opt!(session_cmd_channel_capacity, i64);
set_opt!(session_state_channel_capacity, i64);
set_opt!(media_cmd_channel_capacity, i64);
set_opt!(media_event_channel_capacity, i64);
set_opt!(uri_max_length, i64);
set_opt!(uri_reject_malformed);
if let Some(emg_opt) = payload.emergency {
match emg_opt {
Some(cfg) => {
let toml_s = toml::to_string(&cfg).unwrap_or_default();
if let Ok(emg_doc) = toml_s.parse::<DocumentMut>() {
table["emergency"] = emg_doc.as_item().clone();
}
}
None => {
table.remove("emergency");
}
}
modified = true;
}
}
let doc_text = doc.to_string();
let config = match parse_config_from_str(&doc_text) {
Ok(cfg) => cfg,
Err(resp) => return resp,
};
if modified && let Err(resp) = persist_document(&config_path, doc_text) {
return resp;
}
let acl_rules = config.proxy.acl_rules.clone().unwrap_or_default();
if let Some(app_state) = state.app_state() {
let _ = app_state
.sip_server()
.inner
.data_context
.reload_acl_rules(false, Some(Arc::new(config.proxy.clone())));
}
Json(json!({
"status": "ok",
"requires_restart": false,
"message": "Security settings saved and applied.",
"security": {
"acl_rules": acl_rules,
}
}))
.into_response()
}
/// JSON body accepted by `update_rwi_settings`.
///
/// Every present field is written into the `rwi` table of the config file under the same
/// name; absent fields leave the existing value untouched.
#[derive(Debug, Deserialize)]
pub(crate) struct RwiSettingsPayload {
enabled: Option<bool>,
max_connections: Option<usize>,
max_calls_per_connection: Option<usize>,
orphan_hold_secs: Option<u32>,
originate_rate_limit: Option<usize>,
// When present, replaces the whole `rwi.tokens` entry.
tokens: Option<Vec<RwiTokenPayload>>,
// When present, replaces the whole `rwi.contexts` entry.
contexts: Option<Vec<RwiContextPayload>>,
}
/// One entry of the `tokens` list in the RWI settings payload; serialized to TOML as an
/// element of `rwi.tokens`.
// NOTE(review): presumably `scopes` lists what the token is allowed to do — the consuming
// config type is outside this section.
#[derive(Debug, Deserialize, Serialize)]
struct RwiTokenPayload {
token: String,
scopes: Vec<String>,
}
/// One entry of the `contexts` list in the RWI settings payload; serialized to TOML as an
/// element of `rwi.contexts`. Only `name` is required.
// NOTE(review): the accepted values of `no_answer_action` and the format of
// `no_answer_transfer_target` are defined by the RWI config type outside this section.
#[derive(Debug, Deserialize, Serialize)]
struct RwiContextPayload {
name: String,
no_answer_timeout_secs: Option<u32>,
no_answer_action: Option<String>,
no_answer_transfer_target: Option<String>,
}
/// Applies RWI settings from the payload to the `rwi` table of the config file.
///
/// Allowed for users holding either `system`/`write` or `ami`/`access`; everyone else gets a
/// 403. Fields absent from the payload are left alone. The edited document is always
/// re-parsed as a full configuration and persisted only when something changed. The response
/// echoes the resulting RWI configuration and always reports that a restart is required.
pub(crate) async fn update_rwi_settings(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(payload): Json<RwiSettingsPayload>,
) -> Response {
    let may_edit = state.has_permission(&user, "system", "write").await
        || state.has_permission(&user, "ami", "access").await;
    if !may_edit {
        return json_error(StatusCode::FORBIDDEN, "Permission denied");
    }
    let config_path = match get_config_path(&state) {
        Ok(path) => path,
        Err(resp) => return resp,
    };
    let mut doc = match load_document(&config_path) {
        Ok(doc) => doc,
        Err(resp) => return resp,
    };
    let mut modified = false;
    let rwi_table = ensure_table_mut(&mut doc, "rwi");

    if let Some(enabled) = payload.enabled {
        rwi_table.insert("enabled", value(enabled));
        modified = true;
    }
    // The numeric limits are all stored as TOML integers under their payload names.
    let numeric_settings = [
        ("max_connections", payload.max_connections.map(|n| n as i64)),
        (
            "max_calls_per_connection",
            payload.max_calls_per_connection.map(|n| n as i64),
        ),
        ("orphan_hold_secs", payload.orphan_hold_secs.map(|n| n as i64)),
        (
            "originate_rate_limit",
            payload.originate_rate_limit.map(|n| n as i64),
        ),
    ];
    for (key, submitted) in numeric_settings {
        if let Some(number) = submitted {
            rwi_table.insert(key, value(number));
            modified = true;
        }
    }

    if let Some(tokens) = payload.tokens {
        // Wrapped in a single-field struct so the list serializes as a keyed TOML value.
        #[derive(Serialize)]
        struct RwiTokensToml {
            tokens: Vec<RwiTokenPayload>,
        }
        let rendered = match serialize_to_item(&RwiTokensToml { tokens }, "RWI tokens") {
            Ok(item) => item,
            Err(resp) => return resp,
        };
        match rendered.as_table().and_then(|t| t.get("tokens").cloned()) {
            Some(tokens_item) => rwi_table["tokens"] = tokens_item,
            None => return json_error(StatusCode::BAD_REQUEST, "Invalid RWI tokens payload"),
        }
        modified = true;
    }
    if let Some(contexts) = payload.contexts {
        #[derive(Serialize)]
        struct RwiContextsToml {
            contexts: Vec<RwiContextPayload>,
        }
        let rendered = match serialize_to_item(&RwiContextsToml { contexts }, "RWI contexts") {
            Ok(item) => item,
            Err(resp) => return resp,
        };
        match rendered.as_table().and_then(|t| t.get("contexts").cloned()) {
            Some(contexts_item) => rwi_table["contexts"] = contexts_item,
            None => return json_error(StatusCode::BAD_REQUEST, "Invalid RWI contexts payload"),
        }
        modified = true;
    }

    // Validate the edited document as a full configuration before writing it.
    let doc_text = doc.to_string();
    let config = match parse_config_from_str(&doc_text) {
        Ok(cfg) => cfg,
        Err(resp) => return resp,
    };
    if modified {
        if let Err(resp) = persist_document(&config_path, doc_text) {
            return resp;
        }
    }
    let rwi_config = config.rwi.unwrap_or_default();
    let body = json!({
        "status": "ok",
        "requires_restart": true,
        "message": "RWI settings saved. Please restart the service for changes to take effect.",
        "rwi": rwi_config
    });
    Json(body).into_response()
}
/// JSON body accepted by `update_cluster_settings` (only built with the `commerce` feature).
#[cfg(feature = "commerce")]
#[derive(Debug, Deserialize)]
pub(crate) struct ClusterSettingsPayload {
// When present, replaces the `cluster.peers` entry of the config file; when absent the
// peer list is left untouched.
#[serde(default)]
pub peers: Option<Vec<ClusterPeerPayload>>,
}
/// One cluster peer as submitted in the cluster settings; serialized to TOML as an element of
/// `cluster.peers`.
// NOTE(review): presumably `addr` is a host/IP and the two ports are the peer's SIP and AMI
// listeners — the consuming cluster config type is outside this section.
#[cfg(feature = "commerce")]
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct ClusterPeerPayload {
pub addr: String,
pub sip_port: u16,
pub ami_port: u16,
}
/// Applies the cluster peer list from the payload to the `cluster` table of the config file.
///
/// Requires the `system`/`write` permission. The edited document is always re-parsed as a
/// full configuration and persisted only when a peer list was submitted. The resulting
/// cluster configuration is pushed into the running application state, if any, and echoed in
/// the response, which always reports that a restart is required.
#[cfg(feature = "commerce")]
pub(crate) async fn update_cluster_settings(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(payload): Json<ClusterSettingsPayload>,
) -> Response {
    if let Err(denied) = state.require_permission(&user, "system", "write").await {
        return denied;
    }
    let config_path = match get_config_path(&state) {
        Ok(path) => path,
        Err(resp) => return resp,
    };
    let mut doc = match load_document(&config_path) {
        Ok(doc) => doc,
        Err(resp) => return resp,
    };
    let mut modified = false;
    if let Some(peers) = payload.peers {
        // Wrapped in a single-field struct so the list serializes as a keyed TOML value.
        #[derive(Serialize)]
        struct ClusterToml {
            peers: Vec<ClusterPeerPayload>,
        }
        let rendered = match serialize_to_item(&ClusterToml { peers }, "cluster peers") {
            Ok(item) => item,
            Err(resp) => return resp,
        };
        let peers_item = match rendered.as_table().and_then(|t| t.get("peers").cloned()) {
            Some(item) => item,
            None => return json_error(StatusCode::BAD_REQUEST, "Invalid cluster peers payload"),
        };
        ensure_table_mut(&mut doc, "cluster").insert("peers", peers_item);
        modified = true;
    }

    // Validate the edited document as a full configuration before writing it.
    let doc_text = doc.to_string();
    let config = match parse_config_from_str(&doc_text) {
        Ok(cfg) => cfg,
        Err(resp) => return resp,
    };
    if modified {
        if let Err(resp) = persist_document(&config_path, doc_text) {
            return resp;
        }
    }
    let cluster = config.cluster;
    if let Some(app_state) = state.app_state() {
        app_state.update_cluster_config(cluster.clone());
    }
    let body = json!({
        "status": "ok",
        "requires_restart": true,
        "message": "Cluster settings saved. Please restart the service for changes to take effect.",
        "cluster": cluster,
    });
    Json(body).into_response()
}
/// Lists the addons that expose a reload hook as `{ "addons": [{ "id", "name" }, ...] }`.
///
/// The list is empty when no application state is available.
#[cfg(feature = "commerce")]
async fn list_reload_addons_handler(State(state): State<Arc<ConsoleState>>) -> Response {
    let items: Vec<serde_json::Value> = match state.app_state() {
        Some(app_state) => app_state
            .addon_registry
            .export_reload
            .list()
            .into_iter()
            .map(|(id, name)| serde_json::json!({ "id": id, "name": name }))
            .collect(),
        None => Vec::new(),
    };
    Json(json!({ "addons": items })).into_response()
}
/// Server-sent-events endpoint that reloads the selected targets on this node, then asks every
/// configured cluster peer to do the same, streaming progress events as it goes.
///
/// The query selects what to reload (`trunks`, `routes`, and a list of addon ids). Events are
/// named `progress` while work is ongoing and `complete` at the end; when no application state
/// is available a single `error` event is sent instead. The work runs in a spawned task that
/// feeds the response stream through an unbounded channel.
// NOTE(review): unlike the settings handlers in this file, this handler takes no `AuthRequired`
// extractor — confirm the route is protected elsewhere.
#[cfg(feature = "commerce")]
async fn cluster_reload_sse_handler(
State(state): State<Arc<ConsoleState>>,
Query(query): Query<crate::handler::ami::PingReloadPayload>,
) -> Response {
use axum::response::sse::Event as SseEvent;
let app_state = match state.app_state() {
Some(s) => s,
None => {
use axum::response::sse::Sse;
// Still answer with an SSE stream so the client's event handling sees the failure.
return Sse::new(futures::stream::once(async move {
Ok::<_, std::convert::Infallible>(
SseEvent::default()
.event("error")
.data(r#"{"error":"PBX not running"}"#),
)
}))
.into_response();
}
};
use axum::response::sse::{KeepAlive, Sse};
let (tx, rx) =
tokio::sync::mpsc::unbounded_channel::<Result<SseEvent, std::convert::Infallible>>();
// Rebuilt from the query so it can be moved into the task and forwarded to peers as JSON.
let payload = crate::handler::ami::PingReloadPayload {
trunks: query.trunks,
routes: query.routes,
addons: query.addons,
};
let app = app_state.clone();
crate::utils::spawn(async move {
// Sends one named SSE event whose data is the JSON value rendered as a string. Send errors
// are ignored.
macro_rules! sse_send {
($event_type:expr, $data:expr) => {
let tx = tx.clone();
let _ = tx.send(Ok(SseEvent::default()
.event($event_type)
.data($data.to_string())));
};
}
// Step 1: local reloads, each bracketed by `addon_start` / `addon_complete` events.
if payload.trunks {
sse_send!(
"progress",
serde_json::json!({"type": "addon_start", "node": "current", "addon": "trunks"})
);
let result = reload_trunks(&app, "current").await;
sse_send!(
"progress",
serde_json::json!({"type": "addon_complete", "node": "current", "addon": "trunks", "result": result})
);
}
if payload.routes {
sse_send!(
"progress",
serde_json::json!({"type": "addon_start", "node": "current", "addon": "routes"})
);
let result = reload_routes_console(&app, "current").await;
sse_send!(
"progress",
serde_json::json!({"type": "addon_complete", "node": "current", "addon": "routes", "result": result})
);
}
// Addons are invoked one at a time so each gets its own start/complete pair.
for addon_id in &payload.addons {
sse_send!(
"progress",
serde_json::json!({"type": "addon_start", "node": "current", "addon": addon_id})
);
let results = app
.addon_registry
.export_reload
.invoke_selected(&[addon_id.clone()], &app)
.await;
let json_result = match results.into_iter().next() {
Some((_, Ok(v))) => serde_json::json!({ "status": "ok", "details": v }),
Some((_, Err(e))) => serde_json::json!({ "status": "error", "message": e }),
None => serde_json::json!({ "status": "error", "message": "Handler not found" }),
};
sse_send!(
"progress",
serde_json::json!({"type": "addon_complete", "node": "current", "addon": addon_id, "result": json_result})
);
}
// Step 2: forward the same payload to every configured peer, one after another.
let peers = app
.config()
.cluster
.as_ref()
.map(|c| c.peers.clone())
.unwrap_or_default();
let ami_path = app
.config()
.proxy
.ami_path
.clone()
.unwrap_or_else(|| "/ami/v1".to_string());
let mut peer_results_summary: Vec<serde_json::Value> = Vec::new();
let mut any_error = false;
for peer in &peers {
let peer_label = format!("{}:{}", peer.addr, peer.ami_port);
sse_send!(
"progress",
serde_json::json!({"type": "node_start", "node": &peer_label})
);
// Peers are always contacted over plain HTTP on their AMI port.
let url = format!(
"http://{}:{}{}/cluster/reload_sync",
peer.addr, peer.ami_port, ami_path
);
let opts = crate::http_util::HttpFetchOptions::new()
.with_timeout(std::time::Duration::from_secs(120));
let start = std::time::Instant::now();
let req = reqwest::Client::new().post(&url).json(&payload);
match crate::http_util::execute_request(req, &opts.headers, opts.timeout).await {
Ok(resp) => {
let elapsed_ms = start.elapsed().as_millis() as u64;
// NOTE(review): the HTTP status is not inspected here; any response with a JSON body is
// reported as `ok` unless `execute_request` already rejects error statuses — confirm.
match resp.json::<serde_json::Value>().await {
Ok(peer_results) => {
sse_send!(
"progress",
serde_json::json!({"type": "node_complete", "node": &peer_label, "elapsed_ms": elapsed_ms, "result": peer_results})
);
peer_results_summary.push(serde_json::json!({"node": &peer_label, "status": "ok", "elapsed_ms": elapsed_ms}));
}
Err(e) => {
sse_send!(
"progress",
serde_json::json!({"type": "node_error", "node": &peer_label, "elapsed_ms": elapsed_ms, "error": format!("Invalid JSON: {}", e)})
);
any_error = true;
peer_results_summary.push(serde_json::json!({"node": &peer_label, "status": "error", "elapsed_ms": elapsed_ms, "error": format!("Invalid JSON: {}", e)}));
}
}
}
Err(e) => {
let elapsed_ms = start.elapsed().as_millis() as u64;
sse_send!(
"progress",
serde_json::json!({"type": "node_error", "node": &peer_label, "elapsed_ms": elapsed_ms, "error": format!("Connection failed: {}", e)})
);
any_error = true;
peer_results_summary.push(serde_json::json!({"node": &peer_label, "status": "error", "elapsed_ms": elapsed_ms, "error": format!("Connection failed: {}", e)}));
}
}
}
// Step 3: final summary. `overall_status` only reflects peer failures; local reload results
// are reported in their own `addon_complete` events.
let overall_status = if any_error { "error" } else { "ok" };
sse_send!(
"complete",
serde_json::json!({"type": "complete", "overall_status": overall_status, "peers": peer_results_summary})
);
});
// Turn the receiver into a stream; it ends once the task and all sender clones are dropped.
let sse_stream = futures::stream::unfold(rx, |mut rx| async move {
rx.recv().await.map(|event| (event, rx))
});
Sse::new(sse_stream)
.keep_alive(
KeepAlive::new()
.interval(std::time::Duration::from_secs(15))
.text("keep-alive"),
)
.into_response()
}
/// Reloads the SIP trunks on this node and reports the outcome as a JSON object.
///
/// When a config path is known and the file loads, its proxy section is passed along as an
/// override; a file that fails to load is ignored and no override is passed. On success the
/// console's pending-reload flag for trunks is cleared. `_node` is currently unused.
#[cfg(feature = "commerce")]
async fn reload_trunks(app: &crate::app::AppStateInner, _node: &str) -> serde_json::Value {
    let proxy_override = app
        .config_path
        .as_ref()
        .and_then(|path| crate::config::Config::load(path).ok())
        .map(|cfg| std::sync::Arc::new(cfg.proxy));
    match app
        .sip_server()
        .inner
        .data_context
        .reload_trunks(true, proxy_override)
        .await
    {
        Err(e) => {
            serde_json::json!({ "addon": "trunks", "status": "error", "message": e.to_string() })
        }
        Ok(metrics) => {
            if let Some(console) = &app.console {
                console.clear_pending_reload(ReloadTarget::Trunks);
            }
            serde_json::json!({ "addon": "trunks", "status": "ok", "reloaded": metrics.total })
        }
    }
}
/// Reloads the routing table on this node and reports the outcome as a JSON object.
///
/// When a config path is known and the file loads, its proxy section is passed along as an
/// override; a file that fails to load is ignored and no override is passed. On success the
/// console's pending-reload flag for routes is cleared. `_node` is currently unused.
#[cfg(feature = "commerce")]
async fn reload_routes_console(app: &crate::app::AppStateInner, _node: &str) -> serde_json::Value {
    let proxy_override = app
        .config_path
        .as_ref()
        .and_then(|path| crate::config::Config::load(path).ok())
        .map(|cfg| std::sync::Arc::new(cfg.proxy));
    match app
        .sip_server()
        .inner
        .data_context
        .reload_routes(true, proxy_override)
        .await
    {
        Err(e) => {
            serde_json::json!({ "addon": "routes", "status": "error", "message": e.to_string() })
        }
        Ok(metrics) => {
            if let Some(console) = &app.console {
                console.clear_pending_reload(ReloadTarget::Routes);
            }
            serde_json::json!({ "addon": "routes", "status": "ok", "reloaded": metrics.total })
        }
    }
}
/// Number of log lines returned when a request does not specify `limit`.
const LOG_DEFAULT_LIMIT: usize = 200;
/// Upper bound applied to a caller-supplied `limit`.
const LOG_MAX_LIMIT: usize = 5000;
/// Outcome of reading a log file forward from a previously returned byte position.
struct FollowReadResult {
    /// Lines that were read, without their trailing line terminators.
    lines: Vec<String>,
    /// Byte offset at which the next follow read should start.
    next_position: u64,
    /// `true` when the requested position was past the end of the file and the read
    /// restarted from the file's most recent lines instead.
    reset: bool,
    /// `true` when more lines were available than the limit allowed to be returned.
    truncated: bool,
}
/// Returns the most recent lines of the configured log file as JSON.
///
/// Only superusers may call this; other users receive `403 Forbidden`. If no log file is
/// configured, or the configured file does not exist, the handler still answers with a
/// `status: "ok"` document whose `available` / `exists` flags describe the situation and
/// whose `lines` array is empty. Any other read failure is reported as
/// `500 Internal Server Error`.
///
/// `query.limit` is passed through `normalize_log_limit` before reading.
async fn fetch_recent_logs(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Query(query): Query<LogRecentQuery>,
) -> Response {
    if !user.is_superuser {
        return json_error(
            StatusCode::FORBIDDEN,
            "Permission denied. Superuser required.",
        );
    }

    let path = match resolve_log_file_path(&state) {
        Some(configured) => configured,
        None => {
            let body = json!({
                "status": "ok",
                "available": false,
                "exists": false,
                "path": JsonValue::Null,
                "lines": [],
                "next_position": 0u64,
                "reset": false,
                "truncated": false,
                "message": "Log file is not configured. Set settings -> platform -> log_file first.",
            });
            return Json(body).into_response();
        }
    };

    let limit = normalize_log_limit(query.limit);
    let body = match read_recent_log_lines(&path, limit) {
        Ok((lines, next_position, truncated)) => json!({
            "status": "ok",
            "available": true,
            "exists": true,
            "path": path,
            "lines": lines,
            "next_position": next_position,
            "reset": false,
            "truncated": truncated,
            "message": JsonValue::Null,
        }),
        // A configured but not-yet-created log file is not treated as an error.
        Err(err) if err.kind() == io::ErrorKind::NotFound => json!({
            "status": "ok",
            "available": true,
            "exists": false,
            "path": path,
            "lines": [],
            "next_position": 0u64,
            "reset": false,
            "truncated": false,
            "message": "Log file does not exist yet.",
        }),
        Err(err) => {
            return json_error(
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to read log file: {err}"),
            );
        }
    };
    Json(body).into_response()
}
/// Returns log lines that follow a previously returned byte position, as JSON.
///
/// Only superusers may call this; other users receive `403 Forbidden`. `query.position`
/// defaults to `0` and `query.limit` is passed through `normalize_log_limit`.
///
/// If no log file is configured, or the configured file does not exist, the handler still
/// answers with a `status: "ok"` document and an empty `lines` array. In the missing-file
/// case `reset` is `true` whenever the caller had asked for a non-zero position. Any other
/// read failure is reported as `500 Internal Server Error`.
async fn follow_logs(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Query(query): Query<LogFollowQuery>,
) -> Response {
    if !user.is_superuser {
        return json_error(
            StatusCode::FORBIDDEN,
            "Permission denied. Superuser required.",
        );
    }

    let path = match resolve_log_file_path(&state) {
        Some(configured) => configured,
        None => {
            let body = json!({
                "status": "ok",
                "available": false,
                "exists": false,
                "path": JsonValue::Null,
                "lines": [],
                "next_position": 0u64,
                "reset": false,
                "truncated": false,
                "message": "Log file is not configured. Set settings -> platform -> log_file first.",
            });
            return Json(body).into_response();
        }
    };

    let position = query.position.unwrap_or(0);
    let limit = normalize_log_limit(query.limit);
    let body = match read_follow_log_lines(&path, position, limit) {
        Ok(result) => json!({
            "status": "ok",
            "available": true,
            "exists": true,
            "path": path,
            "lines": result.lines,
            "next_position": result.next_position,
            "reset": result.reset,
            "truncated": result.truncated,
            "message": JsonValue::Null,
        }),
        // The file vanished (or never existed): tell a caller that had progress to start over.
        Err(err) if err.kind() == io::ErrorKind::NotFound => json!({
            "status": "ok",
            "available": true,
            "exists": false,
            "path": path,
            "lines": [],
            "next_position": 0u64,
            "reset": position > 0,
            "truncated": false,
            "message": "Log file does not exist yet.",
        }),
        Err(err) => {
            return json_error(
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to follow log file: {err}"),
            );
        }
    };
    Json(body).into_response()
}
/// Streams newly appended log lines to the client as server-sent events.
///
/// Only superusers may call this; other users receive `403 Forbidden`. When no log file is
/// configured a plain JSON document is returned instead of an event stream.
///
/// Otherwise the stream sleeps for one second, reads forward from the current cursor
/// (initially `query.position`, defaulting to `0`) and emits one `logs` event carrying the
/// JSON result, then repeats. A missing file resets the cursor to `0`; any other read error
/// is emitted as a `status: "error"` event and leaves the cursor unchanged. Keep-alive
/// messages with the text `keep-alive` are configured at a 15-second interval.
async fn stream_logs(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Query(query): Query<LogStreamQuery>,
) -> Response {
    if !user.is_superuser {
        return json_error(
            StatusCode::FORBIDDEN,
            "Permission denied. Superuser required.",
        );
    }

    let log_path = match resolve_log_file_path(&state) {
        Some(configured) => configured,
        None => {
            let body = json!({
                "status": "ok",
                "available": false,
                "exists": false,
                "path": JsonValue::Null,
                "lines": [],
                "next_position": 0u64,
                "reset": false,
                "truncated": false,
                "message": "Log file is not configured. Set settings -> platform -> log_file first.",
            });
            return Json(body).into_response();
        }
    };

    let start_position = query.position.unwrap_or(0);
    let limit = normalize_log_limit(query.limit);

    // The unfold state is the byte offset to read from on the next tick.
    let stream = stream::unfold(start_position, move |cursor| {
        let path = log_path.clone();
        async move {
            time::sleep(StdDuration::from_millis(1000)).await;

            let (payload, next_cursor) = match read_follow_log_lines(&path, cursor, limit) {
                Ok(result) => {
                    let payload = json!({
                        "status": "ok",
                        "path": path,
                        "lines": result.lines,
                        "next_position": result.next_position,
                        "reset": result.reset,
                        "truncated": result.truncated,
                    });
                    (payload, result.next_position)
                }
                Err(err) if err.kind() == io::ErrorKind::NotFound => {
                    let payload = json!({
                        "status": "ok",
                        "path": path,
                        "lines": [],
                        "next_position": 0u64,
                        "reset": true,
                        "truncated": false,
                        "exists": false,
                        "message": "Log file does not exist yet.",
                    });
                    (payload, 0)
                }
                Err(err) => {
                    let payload = json!({
                        "status": "error",
                        "message": format!("Failed to follow log file: {err}"),
                    });
                    (payload, cursor)
                }
            };

            let event = axum::response::sse::Event::default()
                .event("logs")
                .data(payload.to_string());
            Some((Ok::<_, Infallible>(event), next_cursor))
        }
    });

    let keep_alive = KeepAlive::new()
        .interval(StdDuration::from_secs(15))
        .text("keep-alive");
    Sse::new(stream).keep_alive(keep_alive).into_response()
}
/// Resolves the effective line limit for a log request.
///
/// A missing limit becomes `LOG_DEFAULT_LIMIT`; a supplied one is clamped into
/// `1..=LOG_MAX_LIMIT`.
fn normalize_log_limit(limit: Option<usize>) -> usize {
    limit.map_or(LOG_DEFAULT_LIMIT, |requested| {
        requested.clamp(1, LOG_MAX_LIMIT)
    })
}
/// Returns the configured log file path, trimmed of surrounding whitespace.
///
/// Yields `None` when the console has no application state, when `log_file` is unset, or
/// when the configured value is empty after trimming.
fn resolve_log_file_path(state: &ConsoleState) -> Option<String> {
    let app = state.app_state()?;
    let config = app.config();
    let trimmed = config.log_file.as_ref()?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_string())
    }
}
/// Reads the last `limit` lines of the file at `path`.
///
/// Returns `(lines, next_position, truncated)`:
/// * `lines` – at most `limit` lines from the end of the file, oldest first, without line
///   terminators;
/// * `next_position` – the file length observed when the file was opened, i.e. the byte
///   offset a subsequent follow read should start from;
/// * `truncated` – `true` when the file held more lines than were returned.
///
/// A `limit` of `0` yields no lines (and `truncated` is `true` if the file is non-empty).
///
/// # Errors
///
/// Propagates any I/O error from opening, inspecting or reading the file, including
/// `InvalidData` when a line is not valid UTF-8.
fn read_recent_log_lines(path: &str, limit: usize) -> io::Result<(Vec<String>, u64, bool)> {
    let file = File::open(path)?;
    let next_position = file.metadata()?.len();

    // Read no further than the length snapshotted above. Without the cap, lines appended
    // while we scan would be returned here *and* again by the next follow read that starts
    // at `next_position`.
    let reader = BufReader::new(io::Read::take(file, next_position));

    let mut lines = VecDeque::new();
    let mut truncated = false;
    for line in reader.lines() {
        lines.push_back(line?);
        // Push first, then evict: this keeps the window at `limit` entries and also
        // behaves correctly for `limit == 0`.
        if lines.len() > limit {
            lines.pop_front();
            truncated = true;
        }
    }
    Ok((lines.into(), next_position, truncated))
}
/// Reads up to `limit` lines from the file at `path`, starting at byte offset `position`.
///
/// If `position` lies beyond the current end of the file, the most recent `limit` lines are
/// returned instead (via `read_recent_log_lines`) and `reset` is set so the caller can
/// discard what it had accumulated.
///
/// Otherwise lines are read forward from `position` with trailing `\n` / `\r` characters
/// removed. `next_position` is the byte offset just past the last returned line, and
/// `truncated` reports whether more data was already present beyond it.
///
/// # Errors
///
/// Propagates any I/O error from opening, seeking or reading the file, including
/// `InvalidData` when a returned line is not valid UTF-8.
fn read_follow_log_lines(path: &str, position: u64, limit: usize) -> io::Result<FollowReadResult> {
    let file = File::open(path)?;
    if position > file.metadata()?.len() {
        let (lines, next_position, truncated) = read_recent_log_lines(path, limit)?;
        return Ok(FollowReadResult {
            lines,
            next_position,
            reset: true,
            truncated,
        });
    }

    let mut reader = BufReader::new(file);
    reader.seek(SeekFrom::Start(position))?;

    let mut lines = Vec::new();
    let mut raw = String::new();
    while lines.len() < limit {
        raw.clear();
        if reader.read_line(&mut raw)? == 0 {
            break;
        }
        lines.push(raw.trim_end_matches(['\n', '\r']).to_string());
    }

    // Peek rather than read: `fill_buf` reports whether any bytes remain without consuming
    // them, so there is nothing to rewind and the unread data is never decoded.
    let truncated = lines.len() == limit && !reader.fill_buf()?.is_empty();

    // Accounts for bytes still sitting in the reader's buffer, so this is the offset just
    // past the last line consumed above.
    let next_position = reader.stream_position()?;
    Ok(FollowReadResult {
        lines,
        next_position,
        reset: false,
        truncated,
    })
}
/// Splits `raw` into lines, trims each one, and returns the non-empty results in order.
fn parse_lines_to_vec(raw: &str) -> Vec<String> {
    let mut entries = Vec::new();
    for line in raw.lines() {
        let trimmed = line.trim();
        if !trimmed.is_empty() {
            entries.push(trimmed.to_string());
        }
    }
    entries
}
/// Stores `values` under `key` in `table` as a TOML array of strings, replacing whatever
/// was there before.
fn set_string_array(table: &mut Table, key: &str, values: Vec<String>) {
    let mut array = Array::new();
    values.iter().for_each(|entry| array.push(entry.as_str()));
    table[key] = Item::Value(Value::Array(array));
}
/// Checks S3-style storage settings by writing and then deleting a small test object.
///
/// Requires the `system` / `write` permission. The storage is built from the payload; a
/// uniquely named `test-connection-<uuid>.txt` object is written and then deleted, all
/// under a 10-second timeout. A failed delete is only logged and does not fail the check.
///
/// Responds with `400 Bad Request` when the storage cannot be initialised or the write
/// fails, `408 Request Timeout` when the timeout elapses, and a `status: "ok"` document
/// otherwise.
pub(crate) async fn test_storage_connection(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(payload): Json<TestStoragePayload>,
) -> Response {
    use crate::storage::{Storage, StorageConfig};
    use uuid::Uuid;

    if let Err(resp) = state.require_permission(&user, "system", "write").await {
        return resp;
    }

    let config = StorageConfig::S3 {
        vendor: payload.vendor,
        bucket: payload.bucket,
        region: payload.region,
        access_key: payload.access_key,
        secret_key: payload.secret_key,
        endpoint: payload.endpoint,
        prefix: payload.root,
    };
    let storage = match Storage::new(&config) {
        Ok(storage) => storage,
        Err(err) => {
            return json_error(
                StatusCode::BAD_REQUEST,
                format!("Failed to initialize storage: {}", err),
            );
        }
    };

    let filename = format!("test-connection-{}.txt", Uuid::new_v4());
    let content = b"RustPBX storage connection test";
    let round_trip = async {
        storage
            .write(&filename, bytes::Bytes::from_static(content))
            .await?;
        // Cleanup is best effort: a leftover test object does not invalidate the check.
        if let Err(err) = storage.delete(&filename).await {
            warn!("Failed to delete test file {}: {}", filename, err);
        }
        Ok::<_, anyhow::Error>(())
    };

    let outcome = tokio::time::timeout(std::time::Duration::from_secs(10), round_trip).await;
    match outcome {
        Err(_) => json_error(
            StatusCode::REQUEST_TIMEOUT,
            "Storage connection timed out after 10 seconds",
        ),
        Ok(Err(err)) => json_error(
            StatusCode::BAD_REQUEST,
            format!("Failed to write test file: {}", err),
        ),
        Ok(Ok(())) => Json(json!({
            "status": "ok",
            "message": "Connection successful. Test file created and deleted.",
        }))
        .into_response(),
    }
}
/// Sends a sample event to a locator webhook URL to check that it is reachable.
///
/// Requires the `system` / `write` permission. A JSON `test` event carrying the current
/// Unix timestamp is POSTed to `payload.url` through `http_util::execute_request`, with the
/// caller-supplied headers (if any) and a 5-second timeout. A completed request is reported
/// as `status: "ok"` together with the HTTP status received; a failed one as
/// `400 Bad Request`.
pub(crate) async fn test_locator_webhook(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(payload): Json<TestLocatorWebhookPayload>,
) -> Response {
    if let Err(resp) = state.require_permission(&user, "system", "write").await {
        return resp;
    }

    let opts = crate::http_util::HttpFetchOptions::new()
        .with_timeout(std::time::Duration::from_secs(5))
        .with_headers(payload.headers.unwrap_or_default());
    let test_event = json!({
        "event": "test",
        "timestamp": Utc::now().timestamp(),
        "message": "RustPBX locator webhook test"
    });
    let request = reqwest::Client::new().post(&payload.url).json(&test_event);

    let outcome = crate::http_util::execute_request(request, &opts.headers, opts.timeout).await;
    let resp = match outcome {
        Ok(resp) => resp,
        Err(err) => {
            return json_error(
                StatusCode::BAD_REQUEST,
                format!("Webhook request failed: {}", err),
            );
        }
    };
    Json(json!({
        "status": "ok",
        "message": format!("Webhook test successful: HTTP {}", resp.status()),
    }))
    .into_response()
}
/// Sends a test event to an RWI webhook URL to check that it is reachable.
///
/// Requires the `system` / `write` permission. Delegates to
/// `rwi::webhook::send_test_event` with the payload's URL and optional headers; success is
/// reported as `status: "ok"`, failure as `400 Bad Request` with the error text.
pub(crate) async fn test_rwi_webhook(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(payload): Json<TestRwiWebhookPayload>,
) -> Response {
    if let Err(resp) = state.require_permission(&user, "system", "write").await {
        return resp;
    }

    let outcome =
        crate::rwi::webhook::send_test_event(&payload.url, payload.headers.as_ref()).await;
    if let Err(err) = outcome {
        return json_error(
            StatusCode::BAD_REQUEST,
            format!("RWI webhook request failed: {}", err),
        );
    }
    Json(json!({
        "status": "ok",
        "message": "RWI webhook test successful",
    }))
    .into_response()
}
/// Sends a sample routing request to an HTTP router URL to check that it is reachable.
///
/// Requires the `system` / `write` permission. A fixed sample `INVITE` description is
/// POSTed as JSON to `payload.url` through `http_util::execute_request`, with the
/// caller-supplied headers (if any) and a 5-second timeout. A completed request is reported
/// as `status: "ok"` together with the HTTP status received; a failed one as
/// `400 Bad Request`.
pub(crate) async fn test_http_router(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(payload): Json<TestHttpRouterPayload>,
) -> Response {
    if let Err(resp) = state.require_permission(&user, "system", "write").await {
        return resp;
    }

    let opts = crate::http_util::HttpFetchOptions::new()
        .with_timeout(std::time::Duration::from_secs(5))
        .with_headers(payload.headers.unwrap_or_default());
    let test_request = json!({
        "call_id": "test-call-id",
        "from": "sip:test@localhost",
        "to": "sip:echo@localhost",
        "method": "INVITE",
        "uri": "sip:echo@localhost",
        "direction": "internal"
    });
    let request = reqwest::Client::new()
        .post(&payload.url)
        .json(&test_request);

    let outcome = crate::http_util::execute_request(request, &opts.headers, opts.timeout).await;
    let resp = match outcome {
        Ok(resp) => resp,
        Err(err) => {
            return json_error(
                StatusCode::BAD_REQUEST,
                format!("HTTP Router request failed: {}", err),
            );
        }
    };
    Json(json!({
        "status": "ok",
        "message": format!("HTTP Router test successful: HTTP {}", resp.status()),
    }))
    .into_response()
}
/// Runs external-IP detection against the given URL and reports the detected address.
///
/// Requires the `system` / `write` permission. A missing `payload.url` is passed on as an
/// empty string. Success returns the IP as a string under `ip`; failure is reported as
/// `502 Bad Gateway` with the error text.
pub(crate) async fn test_auto_external_ip(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(payload): Json<TestAutoExternalIpPayload>,
) -> Response {
    if let Err(resp) = state.require_permission(&user, "system", "write").await {
        return resp;
    }

    let url = payload.url.unwrap_or_default();
    let ip = match crate::auto_external_ip::detect_external_ip(&url).await {
        Ok(ip) => ip,
        Err(e) => {
            return json_error(
                StatusCode::BAD_GATEWAY,
                format!("Auto external IP detection failed: {}", e),
            );
        }
    };
    Json(json!({
        "status": "ok",
        "ip": ip.to_string(),
    }))
    .into_response()
}
/// Performs a lightweight sanity check of a user-backend configuration.
///
/// Requires the `system` / `write` permission. What is checked depends on the variant:
/// * `Memory` and `Extension` – always reported as ok;
/// * `Http` – a GET is issued to the URL with a 5-second timeout; a completed request is
///   ok, a failed one is `400 Bad Request`;
/// * `Database` – only the presence of a URL is checked (no connection is attempted here);
/// * `Plain` – the file path must exist.
pub(crate) async fn test_user_backend(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(payload): Json<TestUserBackendPayload>,
) -> Response {
    if let Err(resp) = state.require_permission(&user, "system", "write").await {
        return resp;
    }

    // Every successful outcome shares the same two-field response shape.
    let ok = |message: String| -> Response {
        Json(json!({
            "status": "ok",
            "message": message
        }))
        .into_response()
    };

    match payload.backend {
        UserBackendConfig::Memory { .. } => {
            ok("Memory backend configuration is valid.".to_string())
        }
        UserBackendConfig::Http { url, .. } => {
            let opts = crate::http_util::HttpFetchOptions::new()
                .with_timeout(std::time::Duration::from_secs(5));
            let request = reqwest::Client::new().get(&url);
            match crate::http_util::execute_request(request, &opts.headers, opts.timeout).await {
                Ok(resp) => ok(format!("HTTP backend reachable: HTTP {}", resp.status())),
                Err(err) => json_error(
                    StatusCode::BAD_REQUEST,
                    format!("HTTP backend unreachable: {}", err),
                ),
            }
        }
        UserBackendConfig::Database { url, .. } => match url {
            Some(db_url) => ok(format!("Database URL configured: {}", db_url)),
            None => json_error(StatusCode::BAD_REQUEST, "Database URL is missing"),
        },
        UserBackendConfig::Plain { path } => {
            if !std::path::Path::new(&path).exists() {
                return json_error(
                    StatusCode::BAD_REQUEST,
                    format!("Plain file does not exist: {}", path),
                );
            }
            ok(format!("Plain file exists: {}", path))
        }
        UserBackendConfig::Extension { .. } => {
            ok("Extension backend uses internal database.".to_string())
        }
    }
}
/// Lists all roles ordered by name, as `{ "items": [...] }`.
///
/// Requires the `users` / `manage` permission. A database failure is logged and reported
/// as `500 Internal Server Error`.
async fn list_roles(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
) -> Response {
    if let Err(resp) = state.require_permission(&user, "users", "manage").await {
        return resp;
    }

    let query = RoleEntity::find().order_by_asc(RoleColumn::Name);
    let roles = match query.all(state.db()).await {
        Ok(roles) => roles,
        Err(err) => {
            warn!("failed to list roles: {}", err);
            return json_error(StatusCode::INTERNAL_SERVER_ERROR, err.to_string());
        }
    };
    Json(json!({ "items": roles })).into_response()
}
/// Returns a single role together with its permission rows.
///
/// Requires the `users` / `manage` permission. Responds with `404 Not Found` when no role
/// has the given id and `500 Internal Server Error` on a database failure.
async fn get_role(
    AxumPath(id): AxumPath<i64>,
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
) -> Response {
    if let Err(resp) = state.require_permission(&user, "users", "manage").await {
        return resp;
    }

    let db = state.db();
    let lookup = RoleEntity::find_by_id(id).one(db).await;
    let role = match lookup {
        Ok(Some(found)) => found,
        Ok(None) => return json_error(StatusCode::NOT_FOUND, "Role not found"),
        Err(err) => return json_error(StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
    };

    let permission_query =
        role_permission::Entity::find().filter(role_permission::Column::RoleId.eq(id));
    let perms = match permission_query.all(db).await {
        Ok(rows) => rows,
        Err(err) => return json_error(StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
    };

    Json(json!({ "role": role, "permissions": perms })).into_response()
}
/// Creates a non-system role and its permission rows.
///
/// Requires the `users` / `manage` permission. The name is trimmed and must not be empty
/// (`400 Bad Request` otherwise); a missing description is stored as an empty string.
/// Permission rows are inserted one by one after the role; a failing permission insert is
/// logged and skipped without failing the request. Responds with `201 Created` and the new
/// role id.
async fn create_role(
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(payload): Json<RolePayload>,
) -> Response {
    if let Err(resp) = state.require_permission(&user, "users", "manage").await {
        return resp;
    }

    let name = payload.name.trim();
    if name.is_empty() {
        return json_error(StatusCode::BAD_REQUEST, "Role name is required");
    }

    let timestamp = Utc::now();
    let new_role = RoleActiveModel {
        name: Set(name.to_string()),
        description: Set(Some(payload.description.unwrap_or_default())),
        is_system: Set(false),
        created_at: Set(timestamp),
        updated_at: Set(timestamp),
        ..Default::default()
    };

    let insert_result = RoleEntity::insert(new_role)
        .exec_with_returning(state.db())
        .await;
    let role_id = match insert_result {
        Ok(created) => created.id,
        Err(err) => {
            warn!("failed to create role: {}", err);
            return json_error(StatusCode::INTERNAL_SERVER_ERROR, err.to_string());
        }
    };

    for entry in payload.permissions.iter() {
        let row = role_permission::ActiveModel {
            role_id: Set(role_id),
            resource: Set(entry.resource.clone()),
            action: Set(entry.action.clone()),
            ..Default::default()
        };
        let inserted = role_permission::Entity::insert(row).exec(state.db()).await;
        if let Err(err) = inserted {
            warn!("failed to insert permission: {}", err);
        }
    }

    let body = Json(json!({ "status": "ok", "id": role_id }));
    (StatusCode::CREATED, body).into_response()
}
/// Updates a non-system role and replaces its permission rows.
///
/// Requires the `users` / `manage` permission. System roles are rejected with
/// `403 Forbidden`. The name is only changed when the trimmed payload name is non-empty,
/// and the description only when one is supplied. After the role row is updated, all
/// existing permission rows for the role are deleted and the payload's permissions are
/// inserted one by one; failures in either of those steps are logged and do not fail the
/// request.
async fn update_role(
    AxumPath(id): AxumPath<i64>,
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(payload): Json<RolePayload>,
) -> Response {
    if let Err(resp) = state.require_permission(&user, "users", "manage").await {
        return resp;
    }

    let db = state.db();
    let existing = crate::console::config_helpers::find_or_404!(RoleEntity, id, db, "Role");
    if existing.is_system {
        return json_error(StatusCode::FORBIDDEN, "Cannot modify system roles");
    }

    let mut changes: RoleActiveModel = existing.into();
    let new_name = payload.name.trim();
    if !new_name.is_empty() {
        changes.name = Set(new_name.to_string());
    }
    if let Some(description) = payload.description {
        changes.description = Set(Some(description));
    }
    changes.updated_at = Set(Utc::now());
    if let Err(err) = changes.update(db).await {
        return json_error(StatusCode::INTERNAL_SERVER_ERROR, err.to_string());
    }

    // Permissions are replaced wholesale: clear the old rows, then insert the new set.
    let cleared = role_permission::Entity::delete_many()
        .filter(role_permission::Column::RoleId.eq(id))
        .exec(db)
        .await;
    if let Err(err) = cleared {
        warn!("failed to clear permissions for role {}: {}", id, err);
    }

    for entry in payload.permissions.iter() {
        let row = role_permission::ActiveModel {
            role_id: Set(id),
            resource: Set(entry.resource.clone()),
            action: Set(entry.action.clone()),
            ..Default::default()
        };
        if let Err(err) = role_permission::Entity::insert(row).exec(db).await {
            warn!("failed to insert permission: {}", err);
        }
    }

    Json(json!({ "status": "ok" })).into_response()
}
/// Deletes a non-system role by id.
///
/// Requires the `users` / `manage` permission. System roles are rejected with
/// `403 Forbidden`; a database failure is reported as `500 Internal Server Error`.
async fn delete_role_handler(
    AxumPath(id): AxumPath<i64>,
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
) -> Response {
    if let Err(resp) = state.require_permission(&user, "users", "manage").await {
        return resp;
    }

    let db = state.db();
    let role = crate::console::config_helpers::find_or_404!(RoleEntity, id, db, "Role");
    if role.is_system {
        return json_error(StatusCode::FORBIDDEN, "Cannot delete system roles");
    }

    if let Err(err) = RoleEntity::delete_by_id(id).exec(db).await {
        return json_error(StatusCode::INTERNAL_SERVER_ERROR, err.to_string());
    }
    Json(json!({ "status": "ok" })).into_response()
}
/// Returns the roles assigned to a user, as `{ "items": [...] }`.
///
/// Requires the `users` / `manage` permission. The user's assignment rows are loaded
/// first, then the roles whose ids appear in them. A database failure in either query is
/// reported as `500 Internal Server Error`.
async fn get_user_roles(
    AxumPath(user_id): AxumPath<i64>,
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
) -> Response {
    if let Err(resp) = state.require_permission(&user, "users", "manage").await {
        return resp;
    }

    let db = state.db();
    let assignment_query =
        user_role::Entity::find().filter(user_role::Column::UserId.eq(user_id));
    let assignments = match assignment_query.all(db).await {
        Ok(rows) => rows,
        Err(err) => return json_error(StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
    };

    let role_ids = assignments
        .iter()
        .map(|assignment| assignment.role_id)
        .collect::<Vec<i64>>();
    let role_query = RoleEntity::find().filter(RoleColumn::Id.is_in(role_ids));
    let roles = match role_query.all(db).await {
        Ok(rows) => rows,
        Err(err) => return json_error(StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
    };

    Json(json!({ "items": roles })).into_response()
}
/// Replaces the set of roles assigned to a user.
///
/// Requires the `users` / `manage` permission. All existing assignments for the user are
/// deleted first (a failure there is reported as `500 Internal Server Error`), then one
/// assignment per id in `payload.role_ids` is inserted. A failing insert is logged and
/// skipped without failing the request.
async fn assign_user_roles(
    AxumPath(user_id): AxumPath<i64>,
    State(state): State<Arc<ConsoleState>>,
    AuthRequired(user): AuthRequired,
    Json(payload): Json<AssignRolesPayload>,
) -> Response {
    if let Err(resp) = state.require_permission(&user, "users", "manage").await {
        return resp;
    }

    let db = state.db();
    let cleared = user_role::Entity::delete_many()
        .filter(user_role::Column::UserId.eq(user_id))
        .exec(db)
        .await;
    if let Err(err) = cleared {
        return json_error(StatusCode::INTERNAL_SERVER_ERROR, err.to_string());
    }

    let assigned_at = Utc::now();
    for role_id in payload.role_ids.iter() {
        let row = user_role::ActiveModel {
            user_id: Set(user_id),
            role_id: Set(*role_id),
            created_at: Set(assigned_at),
            ..Default::default()
        };
        let inserted = user_role::Entity::insert(row).exec(db).await;
        if let Err(err) = inserted {
            warn!(
                "failed to assign role {} to user {}: {}",
                role_id, user_id, err
            );
        }
    }

    Json(json!({ "status": "ok" })).into_response()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::console::handlers::test_helpers::{setup_state, superuser};
    use crate::models::rbac;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Drains a response body and parses it as JSON.
    async fn read_json(response: Response) -> serde_json::Value {
        let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .expect("read body");
        serde_json::from_slice(&bytes).expect("parse json")
    }

    /// Returns the path of a temp file as an owned string.
    fn path_of(file: &NamedTempFile) -> String {
        file.path().to_string_lossy().to_string()
    }

    // Listing roles on a fresh state returns exactly the seeded system roles.
    #[tokio::test]
    async fn list_roles_returns_seeded_roles() {
        let state = setup_state().await;
        let admin = superuser();

        let response = list_roles(State(state), AuthRequired(admin)).await;
        assert_eq!(response.status(), StatusCode::OK);

        let parsed = read_json(response).await;
        let items = parsed["items"].as_array().expect("items");
        assert_eq!(items.len(), rbac::SYSTEM_ROLES.len());
    }

    // A custom role can be created and then deleted again.
    #[tokio::test]
    async fn create_and_delete_custom_role() {
        let state = setup_state().await;
        let admin = superuser();

        let permissions = vec![PermissionEntry {
            resource: "extensions".into(),
            action: "read".into(),
        }];
        let payload = RolePayload {
            name: "test_custom".into(),
            description: Some("Test role".into()),
            permissions,
        };

        let create_resp = create_role(
            State(state.clone()),
            AuthRequired(admin.clone()),
            Json(payload),
        )
        .await;
        assert_eq!(create_resp.status(), StatusCode::CREATED);

        let parsed = read_json(create_resp).await;
        let role_id = parsed["id"].as_i64().expect("role id");

        let del_resp =
            delete_role_handler(AxumPath(role_id), State(state.clone()), AuthRequired(admin))
                .await;
        assert_eq!(del_resp.status(), StatusCode::OK);
    }

    // Deleting one of the seeded roles is refused.
    #[tokio::test]
    async fn cannot_delete_system_role() {
        let state = setup_state().await;
        let admin = superuser();

        let seeded = rbac::Entity::find()
            .all(state.db())
            .await
            .expect("query roles");
        let system_role_id = seeded.first().expect("at least one role").id;

        let resp =
            delete_role_handler(AxumPath(system_role_id), State(state), AuthRequired(admin)).await;
        assert_eq!(resp.status(), StatusCode::FORBIDDEN);
    }

    // Roles assigned to a user are returned when that user's roles are fetched.
    #[tokio::test]
    async fn assign_and_fetch_user_roles() {
        let state = setup_state().await;
        let admin = superuser();

        let seeded = rbac::Entity::find()
            .all(state.db())
            .await
            .expect("query roles");
        let viewer_id = seeded
            .iter()
            .find(|role| role.name == "viewer")
            .expect("viewer")
            .id;

        let assign_resp = assign_user_roles(
            AxumPath(42i64),
            State(state.clone()),
            AuthRequired(admin.clone()),
            Json(AssignRolesPayload {
                role_ids: vec![viewer_id],
            }),
        )
        .await;
        assert_eq!(assign_resp.status(), StatusCode::OK);

        let fetch_resp = get_user_roles(AxumPath(42i64), State(state), AuthRequired(admin)).await;
        assert_eq!(fetch_resp.status(), StatusCode::OK);

        let parsed = read_json(fetch_resp).await;
        let items = parsed["items"].as_array().expect("items");
        assert_eq!(items.len(), 1);
        assert_eq!(items[0]["name"], "viewer");
    }

    // Only the last `limit` lines are returned and the result is flagged as truncated.
    #[test]
    fn read_recent_log_lines_limits_tail() {
        let mut log = NamedTempFile::new().expect("tempfile");
        writeln!(log, "line-1").expect("write line 1");
        writeln!(log, "line-2").expect("write line 2");
        writeln!(log, "line-3").expect("write line 3");

        let (lines, next_position, truncated) =
            read_recent_log_lines(&path_of(&log), 2).expect("read recent logs");

        assert_eq!(lines, vec!["line-2".to_string(), "line-3".to_string()]);
        assert!(next_position > 0);
        assert!(truncated);
    }

    // A position beyond the end of the file triggers a reset that returns the current tail.
    #[test]
    fn follow_logs_resets_on_rotation() {
        let mut log = NamedTempFile::new().expect("tempfile");
        writeln!(log, "new-1").expect("write new-1");
        writeln!(log, "new-2").expect("write new-2");

        let result = read_follow_log_lines(&path_of(&log), 10_000, 200).expect("follow logs");

        assert!(result.reset);
        assert_eq!(result.lines, vec!["new-1".to_string(), "new-2".to_string()]);
        assert!(result.next_position > 0);
    }

    // After a truncated read, continuing from `next_position` yields the remaining line.
    #[test]
    fn follow_logs_keeps_position_when_truncated() {
        let mut log = NamedTempFile::new().expect("tempfile");
        writeln!(log, "l1").expect("write l1");
        writeln!(log, "l2").expect("write l2");
        writeln!(log, "l3").expect("write l3");
        let path = path_of(&log);

        let first = read_follow_log_lines(&path, 0, 2).expect("first follow");
        assert_eq!(first.lines, vec!["l1".to_string(), "l2".to_string()]);
        assert!(first.truncated);

        let second = read_follow_log_lines(&path, first.next_position, 2).expect("second follow");
        assert_eq!(second.lines, vec!["l3".to_string()]);
        assert!(!second.reset);
    }
}