use std::collections::HashMap;
use axum::Json;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::info;
use vti_common::auth::AdminAuth;
use vti_common::error::AppError;
use crate::community::{CommunityProfile, CommunityProfileUpdate, load_profile, store_profile};
use crate::config_store::{
ConfigStore, EffectiveConfig, compute_effective_config, lookup, validate_value,
};
use crate::server::AppState;
#[allow(unused_imports)]
use crate::supervisor::SupervisorKind;
use vti_common::audit::{
AuditEvent, AuditWriter, CommunityProfileUpdatedData, ConfigChange, ConfigChangedData,
ConfigReloadedData, ConfigSource, RestartRequestedData,
};
/// JSON body accepted by `patch_config`.
///
/// The body is a flat JSON object: because of `#[serde(flatten)]`, every
/// top-level field is collected into `overrides` as a key/value pair.
#[derive(Debug, Deserialize)]
pub struct PatchRequest {
/// Requested overrides, keyed by config key; values are raw JSON.
#[serde(flatten)]
pub overrides: HashMap<String, Value>,
}
/// Per-key outcome of a `patch_config` call (serialized with camelCase names).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PatchResponse {
/// Keys that were persisted and whose definition does not require a restart.
pub applied: Vec<String>,
/// Keys that were persisted but whose definition is flagged `requires_restart`.
pub pending_restart: Vec<String>,
/// Keys that were not persisted, each with the reason for the rejection.
pub rejected: Vec<RejectedKey>,
}
/// A config key that could not be applied, with a human-readable reason.
#[derive(Debug, Serialize)]
pub struct RejectedKey {
/// The config key exactly as submitted by the caller.
pub key: String,
/// Why the key was rejected (unknown key, validation failure, or persistence failure).
pub reason: String,
}
/// Returns the effective configuration.
///
/// The result is computed by `compute_effective_config` from the live
/// in-memory config and the persisted override store.
///
/// # Errors
/// Propagates any error raised while computing the effective config.
pub async fn get_config(
    _admin: AdminAuth,
    State(state): State<AppState>,
) -> Result<Json<EffectiveConfig>, AppError> {
    // The read guard stays alive until the effective config has been computed.
    let live = state.config.read().await;
    let overrides = ConfigStore::new(state.config_ks.clone());
    Ok(Json(compute_effective_config(&live, &overrides).await?))
}
/// Persists a batch of config overrides, one key at a time.
///
/// Each key is looked up in the registry, validated and then stored. A failure
/// on one key never aborts the batch: the key is reported under `rejected` and
/// processing continues with the next one. Successfully stored keys are
/// reported under `pendingRestart` when their definition requires a restart,
/// otherwise under `applied`.
///
/// The handler always answers `200 OK`; callers must inspect the body to see
/// which keys were accepted.
pub async fn patch_config(
    _admin: AdminAuth,
    State(state): State<AppState>,
    Json(req): Json<PatchRequest>,
) -> Result<(StatusCode, Json<PatchResponse>), AppError> {
    let store = ConfigStore::new(state.config_ks.clone());
    let mut applied = Vec::new();
    let mut pending_restart = Vec::new();
    let mut rejected = Vec::new();

    for (key, value) in req.overrides {
        // `Ok(needs_restart)` when the key was stored, `Err(reason)` otherwise.
        let outcome: Result<bool, String> = match lookup(&key) {
            None => Err("unknown config key (not in registry)".into()),
            Some(def) => match validate_value(def, &value) {
                Err(e) => Err(format!("validation failed: {e}")),
                Ok(_) => match store.put(&key, &value).await {
                    Err(e) => Err(format!("persistence failed: {e}")),
                    Ok(_) => {
                        info!(
                            key = %key,
                            requires_restart = def.requires_restart,
                            sensitive = def.sensitive,
                            "admin config PATCH applied"
                        );
                        Ok(def.requires_restart)
                    }
                },
            },
        };

        match outcome {
            Ok(true) => pending_restart.push(key),
            Ok(false) => applied.push(key),
            Err(reason) => rejected.push(RejectedKey { key, reason }),
        }
    }

    let body = PatchResponse {
        applied,
        pending_restart,
        rejected,
    };
    Ok((StatusCode::OK, Json(body)))
}
/// Drain timeout, in seconds, recorded in the restart audit event and echoed
/// back in the restart response.
const DEFAULT_DRAIN_TIMEOUT_SECS: u64 = 30;
/// Response body of `reload_config` (serialized with camelCase names).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReloadResponse {
/// Keys whose live value was actually changed by the reload.
pub keys_reloaded: Vec<String>,
}
/// Re-applies persisted overrides to the live config without restarting.
///
/// The effective config is recomputed under a read lock. Then, under a write
/// lock, every registry key that does not require a restart is compared with
/// its live value and updated through `apply_to_live` when it differs. Only
/// keys that `apply_to_live` reports as applied are listed in the response.
///
/// A `ConfigReloaded` audit event is written on every call, even when no key
/// changed.
///
/// # Errors
/// Fails before touching anything when no audit writer is configured, and
/// propagates errors from computing the effective config or writing the audit
/// event.
pub async fn reload_config(
    _admin: AdminAuth,
    State(state): State<AppState>,
) -> Result<Json<ReloadResponse>, AppError> {
    let audit_writer = require_audit_writer(&state)?;
    let store = ConfigStore::new(state.config_ks.clone());

    // Scope the read guard so it is released before the write lock is taken.
    let effective = {
        let snapshot = state.config.read().await;
        compute_effective_config(&snapshot, &store).await?
    };

    let keys_reloaded = {
        let mut live = state.config.write().await;
        let mut changed = Vec::new();
        for def in crate::config_store::REGISTRY {
            if def.requires_restart {
                continue;
            }
            // A key absent from the effective config is treated as JSON null.
            let wanted = match effective.fields.iter().find(|f| f.key == def.key) {
                Some(field) => field.value.clone(),
                None => Value::Null,
            };
            if wanted == lookup_live(&live, def.key) {
                continue;
            }
            if apply_to_live(&mut live, def.key, &wanted) {
                changed.push(def.key.to_string());
            }
        }
        changed
    };

    let event = AuditEvent::ConfigReloaded(ConfigReloadedData {
        keys_reloaded: keys_reloaded.clone(),
    });
    audit_writer.write("did:key:vtc-admin", None, event).await?;

    info!(?keys_reloaded, "config reloaded");
    Ok(Json(ReloadResponse { keys_reloaded }))
}
/// Response body of `restart_config` (serialized with camelCase names).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RestartResponse {
/// The supervisor kind taken from the application state.
pub supervisor: SupervisorKind,
/// Drain timeout, in seconds, reported to the caller.
pub drain_timeout_seconds: u64,
}
pub async fn restart_config(
_admin: AdminAuth,
State(state): State<AppState>,
) -> Result<Json<RestartResponse>, AppError> {
let audit_writer = require_audit_writer(&state)?;
let supervisor = state.supervisor.ok_or_else(|| AppError::ServiceError {
status: StatusCode::PRECONDITION_FAILED,
message: "SupervisorRequired: refusing to restart without a process supervisor \
(set VTC_SUPERVISED=1 or run under systemd / kubernetes)"
.into(),
})?;
audit_writer
.write(
"did:key:vtc-admin",
None,
AuditEvent::RestartRequested(RestartRequestedData {
drain_timeout_seconds: DEFAULT_DRAIN_TIMEOUT_SECS,
}),
)
.await?;
info!(?supervisor, "restart requested");
let _ = state.shutdown_tx.send(true);
Ok(Json(RestartResponse {
supervisor,
drain_timeout_seconds: DEFAULT_DRAIN_TIMEOUT_SECS,
}))
}
/// Borrows the audit writer from `state`.
///
/// # Errors
/// Returns a `503 Service Unavailable` service error when no audit writer is
/// configured.
fn require_audit_writer(state: &AppState) -> Result<&AuditWriter, AppError> {
    match state.audit_writer.as_ref() {
        Some(writer) => Ok(writer),
        None => Err(AppError::ServiceError {
            status: StatusCode::SERVICE_UNAVAILABLE,
            message: "audit writer not configured".into(),
        }),
    }
}
/// Reads the current in-memory value of a config key as JSON.
///
/// Only `server.host`, `server.port` and `log.level` are known here; any other
/// key yields `Value::Null`.
fn lookup_live(cfg: &crate::config::AppConfig, key: &str) -> Value {
    if key == "server.host" {
        Value::String(cfg.server.host.clone())
    } else if key == "server.port" {
        Value::Number(cfg.server.port.into())
    } else if key == "log.level" {
        Value::String(cfg.log.level.clone())
    } else {
        Value::Null
    }
}
/// Writes `value` into the live config for `key`.
///
/// Only `log.level` with a JSON string value is handled. Returns `true` when
/// the live config was updated and `false` for every other key or value type.
fn apply_to_live(cfg: &mut crate::config::AppConfig, key: &str, value: &Value) -> bool {
    match (key, value.as_str()) {
        ("log.level", Some(level)) => {
            cfg.log.level = level.to_string();
            true
        }
        _ => false,
    }
}
/// Schema version stamped on every export; imports carrying any other version
/// are rejected.
pub const EXPORT_SCHEMA_VERSION: u32 = 1;
/// Export bundle produced by `export_config` and consumed by `import_config`
/// (serialized with camelCase names).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfigExport {
/// Version of the export format; compared against `EXPORT_SCHEMA_VERSION` on import.
pub schema_version: u32,
/// UTC timestamp taken when the export was produced.
pub exported_at: DateTime<Utc>,
/// The stored community profile, if any.
pub community_profile: Option<CommunityProfile>,
/// Snapshot of the persisted config overrides, keyed by config key.
pub config_overrides: HashMap<String, Value>,
}
/// Exports the community profile and the persisted config overrides.
///
/// The bundle is stamped with `EXPORT_SCHEMA_VERSION` and the current UTC time.
///
/// # Errors
/// Propagates failures from loading the profile or snapshotting the override
/// store.
pub async fn export_config(
    _admin: AdminAuth,
    State(state): State<AppState>,
) -> Result<Json<ConfigExport>, AppError> {
    let profile = load_profile(&state.community_ks).await?;
    let overrides = ConfigStore::new(state.config_ks.clone())
        .snapshot()
        .await?;

    let bundle = ConfigExport {
        schema_version: EXPORT_SCHEMA_VERSION,
        exported_at: Utc::now(),
        community_profile: profile,
        config_overrides: overrides,
    };
    Ok(Json(bundle))
}
/// Query-string parameters of `import_config`.
#[derive(Debug, Deserialize)]
pub struct ImportQuery {
/// When `false` (the default if omitted) the import only reports diffs;
/// when `true` the changes are persisted.
#[serde(default)]
pub confirm: bool,
}
/// A single differing field reported by `import_config`
/// (serialized with camelCase names).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FieldDiff {
/// Profile field name or config key.
pub key: String,
/// Currently stored value; `None` when nothing is stored for this key.
pub old_value: Option<Value>,
/// Value carried by the import.
pub new_value: Option<Value>,
}
/// Response body of `import_config` (serialized with camelCase names).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ImportResponse {
/// Whether the import was actually applied (`confirm` query flag).
pub confirmed: bool,
/// Profile fields whose incoming value differs from the stored one.
pub community_profile_diff: Vec<FieldDiff>,
/// Valid override keys whose incoming value differs from the stored one.
pub config_overrides_diff: Vec<FieldDiff>,
/// Profile fields that were changed; empty on a dry run.
pub community_profile_applied: Vec<String>,
/// Override keys that were persisted; empty on a dry run.
pub config_overrides_applied: Vec<String>,
/// Override keys that were skipped, each with the reason.
pub rejected: Vec<RejectedKey>,
}
/// Imports a configuration bundle previously produced by `export_config`.
///
/// The handler always computes a diff between the bundle and the stored
/// state. Without `confirm=true` it stops there (dry run) and returns the
/// diffs plus any rejected override keys. With `confirm=true` it then:
///
/// 1. applies the community profile (if the bundle carries one),
/// 2. persists every differing, valid override key,
/// 3. writes a `ConfigChanged` audit event when at least one override was
///    stored and a `CommunityProfileUpdated` event when at least one profile
///    field changed.
///
/// Override keys that are unknown, fail validation, or fail to persist are
/// reported under `rejected` and never abort the import.
///
/// # Errors
/// * `AppError::Validation` when `schemaVersion` differs from
///   `EXPORT_SCHEMA_VERSION`;
/// * `AppError::Conflict` when both the stored and the incoming profile exist
///   and their `communityDid` differ;
/// * the error from `require_audit_writer` when confirming without an audit
///   writer (checked before anything is persisted);
/// * any error propagated from the profile store, the override snapshot, or
///   the audit writer.
pub async fn import_config(
    _admin: AdminAuth,
    State(state): State<AppState>,
    Query(query): Query<ImportQuery>,
    Json(req): Json<ConfigExport>,
) -> Result<(StatusCode, Json<ImportResponse>), AppError> {
    if req.schema_version != EXPORT_SCHEMA_VERSION {
        return Err(AppError::Validation(format!(
            "unsupported export schemaVersion: got {}, expected {EXPORT_SCHEMA_VERSION}",
            req.schema_version
        )));
    }

    let current_profile = load_profile(&state.community_ks).await?;
    // An import must never re-home an existing community onto another DID.
    if let (Some(current), Some(incoming)) = (&current_profile, &req.community_profile)
        && current.community_did != incoming.community_did
    {
        return Err(AppError::Conflict(format!(
            "communityDid mismatch: current is {}, import carries {}",
            current.community_did, incoming.community_did
        )));
    }

    let store = ConfigStore::new(state.config_ks.clone());
    let current_overrides = store.snapshot().await?;

    let mut profile_diff: Vec<FieldDiff> = Vec::new();
    let mut overrides_diff: Vec<FieldDiff> = Vec::new();
    let mut rejected: Vec<RejectedKey> = Vec::new();

    // Phase 1: compute diffs. Nothing is persisted here.
    if let Some(incoming) = &req.community_profile {
        for (key, old, new) in profile_field_pairs(current_profile.as_ref(), incoming) {
            if old != new {
                profile_diff.push(FieldDiff {
                    key: key.into(),
                    old_value: old,
                    new_value: new,
                });
            }
        }
    }
    for (key, new_value) in &req.config_overrides {
        let Some(def) = lookup(key) else {
            rejected.push(RejectedKey {
                key: key.clone(),
                reason: "unknown config key (not in registry)".into(),
            });
            continue;
        };
        if let Err(e) = validate_value(def, new_value) {
            rejected.push(RejectedKey {
                key: key.clone(),
                reason: format!("validation failed: {e}"),
            });
            continue;
        }
        let old = current_overrides.get(key).cloned();
        if old.as_ref() != Some(new_value) {
            overrides_diff.push(FieldDiff {
                key: key.clone(),
                old_value: old,
                new_value: Some(new_value.clone()),
            });
        }
    }

    // Dry run: report what would change and stop.
    if !query.confirm {
        return Ok((
            StatusCode::OK,
            Json(ImportResponse {
                confirmed: false,
                community_profile_diff: profile_diff,
                config_overrides_diff: overrides_diff,
                community_profile_applied: Vec::new(),
                config_overrides_applied: Vec::new(),
                rejected,
            }),
        ));
    }

    // Phase 2: apply. The audit writer is required before the first write.
    let audit_writer = require_audit_writer(&state)?;
    // `req` is not used past this point, so the profile is moved out, not cloned.
    let community_profile_applied = if let Some(incoming) = req.community_profile {
        apply_profile_import(&state, incoming, current_profile.as_ref()).await?
    } else {
        Vec::new()
    };

    let mut config_overrides_applied: Vec<String> = Vec::new();
    let mut audit_changes: Vec<ConfigChange> = Vec::new();
    let mut requires_restart = false;
    for FieldDiff {
        key,
        old_value,
        new_value,
    } in &overrides_diff
    {
        let Some(def) = lookup(key) else { continue };
        let new_value = match new_value {
            Some(v) => v.clone(),
            None => continue,
        };
        if let Err(e) = store.put(key, &new_value).await {
            rejected.push(RejectedKey {
                key: key.clone(),
                reason: format!("persistence failed: {e}"),
            });
            continue;
        }
        config_overrides_applied.push(key.clone());
        let mut change = ConfigChange {
            key: key.clone(),
            old_value: old_value.clone(),
            new_value,
            // A previously stored override came from the DB; otherwise the
            // key was still at its default.
            source_before: if old_value.is_some() {
                ConfigSource::Db
            } else {
                ConfigSource::Default
            },
        };
        // Redact keys that the registry flags as sensitive.
        change.redact_if(|k| matches!(lookup(k), Some(d) if d.sensitive));
        audit_changes.push(change);
        if def.requires_restart {
            requires_restart = true;
        }
    }

    if !audit_changes.is_empty() {
        audit_writer
            .write(
                "did:key:vtc-admin",
                None,
                AuditEvent::ConfigChanged(ConfigChangedData {
                    changes: audit_changes,
                    requires_restart,
                }),
            )
            .await?;
    }
    if !community_profile_applied.is_empty() {
        audit_writer
            .write(
                "did:key:vtc-admin",
                None,
                AuditEvent::CommunityProfileUpdated(CommunityProfileUpdatedData {
                    fields_changed: community_profile_applied.clone(),
                }),
            )
            .await?;
    }

    info!(
        profile_changed = community_profile_applied.len(),
        overrides_applied = config_overrides_applied.len(),
        rejected = rejected.len(),
        "config imported"
    );
    Ok((
        StatusCode::OK,
        Json(ImportResponse {
            confirmed: true,
            community_profile_diff: profile_diff,
            config_overrides_diff: overrides_diff,
            community_profile_applied,
            config_overrides_applied,
            rejected,
        }),
    ))
}
/// Persists an imported community profile and reports which fields changed.
///
/// * With no stored profile, `incoming` is stored as-is. The returned list
///   names the fields that carry a non-empty / non-null value (for `language`,
///   a value other than `"en"`).
/// * With a stored profile, every field of `incoming` is applied as a
///   `CommunityProfileUpdate` onto a copy of the stored profile; the copy is
///   written back only when the update reports at least one changed field.
///
/// # Errors
/// Propagates failures from applying the update or storing the profile.
async fn apply_profile_import(
    state: &AppState,
    incoming: CommunityProfile,
    current: Option<&CommunityProfile>,
) -> Result<Vec<String>, AppError> {
    match current {
        None => {
            // Field name paired with "does the incoming profile set it?".
            let populated = [
                ("name", !incoming.name.is_empty()),
                ("description", !incoming.description.is_empty()),
                ("logoUrl", incoming.logo_url.is_some()),
                ("publicUrl", incoming.public_url.is_some()),
                ("contactEmail", incoming.contact_email.is_some()),
                ("language", incoming.language != "en"),
                ("extensions", !incoming.extensions.is_null()),
            ];
            let changed: Vec<String> = populated
                .iter()
                .filter(|(_, is_set)| *is_set)
                .map(|(field, _)| field.to_string())
                .collect();
            store_profile(&state.community_ks, &incoming).await?;
            Ok(changed)
        }
        Some(existing) => {
            let patch = CommunityProfileUpdate {
                name: Some(incoming.name),
                description: Some(incoming.description),
                logo_url: Some(incoming.logo_url),
                public_url: Some(incoming.public_url),
                contact_email: Some(incoming.contact_email),
                language: Some(incoming.language),
                extensions: Some(incoming.extensions),
            };
            let mut merged = existing.clone();
            let changed = patch.apply(&mut merged)?;
            if changed.is_empty() {
                // Nothing differs: skip the write.
                return Ok(changed);
            }
            store_profile(&state.community_ks, &merged).await?;
            Ok(changed)
        }
    }
}
/// Lists every profile field as `(name, stored value, incoming value)`.
///
/// The stored value is `None` when there is no current profile. Optional
/// string fields that are unset are represented as `Some(Value::Null)` rather
/// than `None`, so "unset" and "no stored profile" stay distinguishable.
fn profile_field_pairs(
    current: Option<&CommunityProfile>,
    incoming: &CommunityProfile,
) -> Vec<(&'static str, Option<Value>, Option<Value>)> {
    /// JSON string from a borrowed string.
    fn text(v: &str) -> Value {
        Value::String(v.to_string())
    }

    /// JSON string for `Some`, JSON null for `None`.
    fn optional_text(v: &Option<String>) -> Value {
        v.as_deref().map_or(Value::Null, text)
    }

    /// Builds one row by reading the same field from both profiles.
    fn pair(
        key: &'static str,
        current: Option<&CommunityProfile>,
        incoming: &CommunityProfile,
        read: impl Fn(&CommunityProfile) -> Value,
    ) -> (&'static str, Option<Value>, Option<Value>) {
        (key, current.map(&read), Some(read(incoming)))
    }

    vec![
        pair("communityDid", current, incoming, |p| text(&p.community_did)),
        pair("name", current, incoming, |p| text(&p.name)),
        pair("description", current, incoming, |p| text(&p.description)),
        pair("logoUrl", current, incoming, |p| optional_text(&p.logo_url)),
        pair("publicUrl", current, incoming, |p| optional_text(&p.public_url)),
        pair("contactEmail", current, incoming, |p| {
            optional_text(&p.contact_email)
        }),
        pair("language", current, incoming, |p| text(&p.language)),
        pair("extensions", current, incoming, |p| p.extensions.clone()),
    ]
}
#[cfg(test)]
mod tests {
// No unit tests are defined in this module yet.
}