use async_trait::async_trait;
use glob::Pattern;
use serde_json::json;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::config::Config;
use crate::context::Context;
use crate::errors::{ErrorCode, ModuleError};
use crate::events::emitter::EventEmitter;
use crate::module::Module;
use crate::observability::redaction::DEFAULT_REPLACEMENT;
use crate::registry::dependencies::resolve_dependencies;
use crate::registry::registry::Registry;
use crate::registry::types::DepInfo;
use super::audit::{build_audit_entry, record_audit, AuditAction, AuditChange, AuditStore};
use super::overrides::{persist_one, write_override, OverridesStore};
use super::{
augment_with_context_identity, emit_event, is_sensitive_key, missing_field_error,
require_string, ToggleState, RESTRICTED_KEYS,
};
/// Control module that overwrites one runtime configuration value.
///
/// Besides mutating the shared configuration, its `execute` implementation can
/// persist the new value as an override, emits an `apcore.config.updated`
/// event and records an audit entry.
pub struct UpdateConfigModule {
/// Shared configuration that `execute` reads the previous value from and writes to.
config: Arc<Mutex<Config>>,
/// Emitter handed to `emit_event` when the update is announced.
emitter: Arc<Mutex<EventEmitter>>,
/// File target passed to `write_override`; only used when `overrides_store` is `None`.
overrides_path: Option<PathBuf>,
/// Persistence backend passed to `persist_one`; takes precedence over `overrides_path`.
overrides_store: Option<Arc<dyn OverridesStore>>,
/// Optional sink handed to `record_audit`.
audit_store: Option<Arc<dyn AuditStore>>,
}
impl UpdateConfigModule {
    /// Creates a module bound to the shared `config` and `emitter`.
    ///
    /// The overrides path, overrides store and audit store all start out as
    /// `None`; attach them with the `with_*` builder methods.
    pub fn new(config: Arc<Mutex<Config>>, emitter: Arc<Mutex<EventEmitter>>) -> Self {
        Self {
            overrides_path: None,
            overrides_store: None,
            audit_store: None,
            config,
            emitter,
        }
    }

    /// Returns the module with its overrides file path replaced by `overrides_path`.
    #[must_use]
    pub fn with_overrides_path(self, overrides_path: Option<PathBuf>) -> Self {
        Self {
            overrides_path,
            ..self
        }
    }

    /// Returns the module with its overrides store replaced by `overrides_store`.
    #[must_use]
    pub fn with_overrides_store(
        self,
        overrides_store: Option<Arc<dyn OverridesStore>>,
    ) -> Self {
        Self {
            overrides_store,
            ..self
        }
    }

    /// Returns the module with its audit store replaced by `audit_store`.
    #[must_use]
    pub fn with_audit_store(self, audit_store: Option<Arc<dyn AuditStore>>) -> Self {
        Self {
            audit_store,
            ..self
        }
    }
}
#[async_trait]
impl Module for UpdateConfigModule {
    /// Returns the one-line summary of what this module does.
    fn description(&self) -> &'static str {
        "Update a runtime configuration value by dot-path key"
    }

    /// JSON Schema for the inputs: `key`, `value` and `reason` are all required;
    /// `value` is left unconstrained.
    fn input_schema(&self) -> serde_json::Value {
        json!({
            "type": "object",
            "required": ["key", "value", "reason"],
            "properties": {
                "key": {"type": "string"},
                "value": {},
                "reason": {"type": "string"}
            }
        })
    }

    /// JSON Schema for the result object returned by `execute`.
    fn output_schema(&self) -> serde_json::Value {
        json!({
            "type": "object",
            "properties": {
                "success": {"type": "boolean"},
                "key": {"type": "string"},
                "old_value": {},
                "new_value": {}
            }
        })
    }

    /// Replaces the configuration value stored under `key` with `value`.
    ///
    /// After the in-memory update the new value is persisted through the
    /// overrides store (or, failing that, the overrides path), an
    /// `apcore.config.updated` event is emitted, the change is logged and an
    /// audit entry is recorded. For keys that `is_sensitive_key` flags, the old
    /// and new values are replaced by `DEFAULT_REPLACEMENT` in the event, the
    /// audit entry and the returned object, and are masked in the log line.
    ///
    /// # Errors
    ///
    /// * Propagates the error from `require_string` / `missing_field_error`
    ///   when `key`, `reason` or `value` cannot be read from `inputs`.
    /// * Returns `ErrorCode::ConfigKeyRestricted` when `key` is listed in
    ///   `RESTRICTED_KEYS`.
    ///
    /// A failure to persist through the overrides store is only logged as a
    /// warning; it does not fail the call.
    async fn execute(
        &self,
        inputs: serde_json::Value,
        ctx: &Context<serde_json::Value>,
    ) -> Result<serde_json::Value, ModuleError> {
        let key = require_string(&inputs, "key")?;
        let reason = require_string(&inputs, "reason")?;
        let value = inputs
            .get("value")
            .cloned()
            .ok_or_else(|| missing_field_error("value"))?;

        if RESTRICTED_KEYS.contains(&key.as_str()) {
            return Err(ModuleError::new(
                ErrorCode::ConfigKeyRestricted,
                format!("Configuration key '{key}' cannot be changed at runtime"),
            )
            .with_details([("key".to_string(), json!(key))].into_iter().collect()));
        }

        // Read the previous value and install the new one under a single lock
        // acquisition, so a concurrent update cannot slip in between the read
        // and the write and make the reported `old_value` wrong.
        let old_value = {
            let mut cfg = self.config.lock().await;
            let previous = cfg.get(&key);
            cfg.set(&key, value.clone());
            previous
        };

        // Persistence is best-effort: the store wins over the plain path.
        if let Some(store) = self.overrides_store.as_ref() {
            if let Err(e) = persist_one(store.as_ref(), &key, &value).await {
                tracing::warn!(error = %e, key = %key, "OverridesStore persist failed");
            }
        } else if let Some(path) = self.overrides_path.as_deref() {
            write_override(path, &key, &value);
        }

        // Everything that leaves this function (event, audit, response) uses
        // the redacted pair so sensitive values are never exposed.
        let sensitive = is_sensitive_key(&key);
        let redacted_old: serde_json::Value = if sensitive {
            json!(DEFAULT_REPLACEMENT)
        } else {
            old_value.clone().unwrap_or(serde_json::Value::Null)
        };
        let redacted_new: serde_json::Value = if sensitive {
            json!(DEFAULT_REPLACEMENT)
        } else {
            value.clone()
        };

        let timestamp = chrono::Utc::now().to_rfc3339();
        let event_data = augment_with_context_identity(
            json!({
                "key": key,
                "old_value": redacted_old,
                "new_value": redacted_new,
                "reason": reason,
            }),
            ctx,
        );
        emit_event(
            &self.emitter,
            "apcore.config.updated",
            "system.control.update_config",
            &timestamp,
            event_data,
        )
        .await;

        if sensitive {
            tracing::info!(key = %key, reason = %reason, "Config updated: old_value=*** new_value=***");
        } else {
            tracing::info!(
                key = %key,
                old_value = ?old_value,
                new_value = ?value,
                reason = %reason,
                "Config updated"
            );
        }

        let entry = build_audit_entry(
            AuditAction::UpdateConfig,
            "system.control.update_config",
            ctx,
            AuditChange {
                before: redacted_old.clone(),
                after: redacted_new.clone(),
            },
        );
        record_audit(self.audit_store.as_ref(), entry).await;

        Ok(json!({
            "success": true,
            "key": key,
            "old_value": redacted_old,
            "new_value": redacted_new,
        }))
    }
}
/// Control module that reloads one module by id, a set of modules selected by
/// a glob `path_filter`, and/or the configuration from disk.
pub struct ReloadModule {
/// Registry the targeted modules are looked up in and unregistered from.
registry: Arc<Registry>,
/// Emitter handed to `emit_event` for reload notifications.
emitter: Arc<Mutex<EventEmitter>>,
/// Optional sink handed to `record_audit`.
audit_store: Option<Arc<dyn AuditStore>>,
/// Optional configuration handle; `reload_from_disk` is called on it when the
/// `reload_config` input is set. Without it that request only logs a warning.
config: Option<Arc<Mutex<Config>>>,
}
impl ReloadModule {
    /// Creates a reload module bound to `registry` and `emitter`.
    ///
    /// No audit store or configuration handle is attached; use
    /// `with_audit_store` / `with_config` to add them.
    pub fn new(registry: Arc<Registry>, emitter: Arc<Mutex<EventEmitter>>) -> Self {
        Self {
            registry,
            emitter,
            audit_store: None,
            config: None,
        }
    }

    /// Returns the module with its audit store replaced by `audit_store`.
    #[must_use]
    pub fn with_audit_store(mut self, audit_store: Option<Arc<dyn AuditStore>>) -> Self {
        self.audit_store = audit_store;
        self
    }

    /// Returns the module with its configuration handle replaced by `config`.
    #[must_use]
    pub fn with_config(mut self, config: Option<Arc<Mutex<Config>>>) -> Self {
        self.config = config;
        self
    }

    /// Orders the `matched` module ids with `resolve_dependencies`.
    ///
    /// Only dependencies that are themselves part of `matched` are passed to
    /// the resolver; modules without a definition in the registry are treated
    /// as having no dependencies. If resolution fails, a warning is logged and
    /// the ids are returned in alphabetical order instead.
    fn topo_sort_modules(&self, matched: &[String]) -> Vec<String> {
        let matched_set: std::collections::HashSet<String> = matched.iter().cloned().collect();

        let entries: Vec<(String, Vec<DepInfo>)> = matched
            .iter()
            .map(|mid| {
                let deps: Vec<DepInfo> = self
                    .registry
                    .get_definition(mid)
                    .ok()
                    .flatten()
                    .map(|d| {
                        d.dependencies
                            .into_iter()
                            .filter(|dep| matched_set.contains(&dep.module_id))
                            .map(|dep| DepInfo {
                                module_id: dep.module_id,
                                // An empty constraint string means "no constraint".
                                version: if dep.version_constraint.is_empty() {
                                    None
                                } else {
                                    Some(dep.version_constraint)
                                },
                                optional: dep.optional,
                            })
                            .collect()
                    })
                    .unwrap_or_default();
                (mid.clone(), deps)
            })
            .collect();

        match resolve_dependencies(&entries, Some(&matched_set), None) {
            Ok(order) => order,
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "Topological sort failed for path_filter reload; falling back to alphabetical"
                );
                let mut sorted = matched.to_vec();
                sorted.sort();
                sorted
            }
        }
    }

    /// Reloads the single module identified by `module_id`.
    ///
    /// Sequence: call `on_suspend` on the current instance (a panic there is
    /// caught and logged), unregister it with `safe_unregister`, run
    /// `discover_internal` (errors are logged and ignored), and, if suspend
    /// produced state and the module is present again, hand that state to
    /// `on_resume` (a panic is caught and logged). An `apcore.module.reloaded`
    /// event is then emitted and an audit entry recorded.
    ///
    /// The reported `new_version` falls back to the previous version when no
    /// definition is found after the reload.
    ///
    /// # Errors
    ///
    /// Returns `ErrorCode::ModuleNotFound` when the registry does not contain
    /// `module_id`, and propagates any error from `safe_unregister`.
    // NOTE(review): the lints below are presumably silenced so the reload
    // sequence can stay in one linear function with explicit `match` and
    // `map(..).unwrap_or_else(..)` forms; confirm before removing.
    #[allow(
        clippy::too_many_lines,
        clippy::single_match_else,
        clippy::map_unwrap_or
    )]
    async fn execute_single(
        &self,
        module_id: String,
        reason: &str,
        ctx: &Context<serde_json::Value>,
    ) -> Result<serde_json::Value, ModuleError> {
        let start = std::time::Instant::now();

        if !self.registry.has(&module_id) {
            return Err(ModuleError::new(
                ErrorCode::ModuleNotFound,
                format!("Module '{module_id}' not found"),
            ));
        }

        let previous_version = self
            .registry
            .get_definition(&module_id)
            .ok()
            .flatten()
            .map(|d| d.version)
            .unwrap_or_else(|| "unknown".to_string());

        // Give the running instance a chance to hand over its state. A panic in
        // user code must not abort the reload, so it is caught and logged.
        let suspended_state = match self.registry.get(&module_id) {
            Ok(Some(module)) => {
                let module_for_panic = Arc::clone(&module);
                match std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
                    module_for_panic.on_suspend()
                })) {
                    Ok(state) => state,
                    Err(_) => {
                        tracing::warn!(
                            module_id = %module_id,
                            "Module on_suspend panicked; continuing reload"
                        );
                        None
                    }
                }
            }
            _ => None,
        };

        self.registry.safe_unregister(&module_id, 5000).await?;

        // Rediscovery is best-effort: a failure is logged, not returned.
        match self.registry.discover_internal().await {
            Ok(count) => tracing::debug!(
                module_id = %module_id,
                count,
                "Reload: discover_internal repopulated registry"
            ),
            Err(e) => tracing::warn!(
                module_id = %module_id,
                error = %e.message,
                "Reload: discover_internal returned error (best-effort, continuing)"
            ),
        }

        let new_version = self
            .registry
            .get_definition(&module_id)
            .ok()
            .flatten()
            .map(|d| d.version)
            .unwrap_or_else(|| previous_version.clone());

        // Restore the suspended state into whatever instance is registered now.
        if let Some(state) = suspended_state {
            if let Ok(Some(module)) = self.registry.get(&module_id) {
                let module_for_panic = Arc::clone(&module);
                if std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
                    module_for_panic.on_resume(state);
                }))
                .is_err()
                {
                    tracing::warn!(
                        module_id = %module_id,
                        "Module on_resume panicked; reload still considered successful"
                    );
                }
            }
        }

        let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
        let timestamp = chrono::Utc::now().to_rfc3339();
        let event_data = augment_with_context_identity(
            json!({
                "module_id": module_id,
                "previous_version": previous_version,
                "new_version": new_version,
                "reason": reason,
            }),
            ctx,
        );
        emit_event(
            &self.emitter,
            "apcore.module.reloaded",
            &module_id,
            &timestamp,
            event_data,
        )
        .await;

        tracing::info!(
            module_id = %module_id,
            previous_version = %previous_version,
            new_version = %new_version,
            reason = %reason,
            "Module reloaded"
        );

        let entry = build_audit_entry(
            AuditAction::ReloadModule,
            &module_id,
            ctx,
            AuditChange {
                before: json!(previous_version),
                after: json!(new_version),
            },
        );
        record_audit(self.audit_store.as_ref(), entry).await;

        Ok(json!({
            "success": true,
            "module_id": module_id,
            "previous_version": previous_version,
            "new_version": new_version,
            "reload_duration_ms": elapsed_ms,
        }))
    }

    /// Processes every registered module whose id matches the glob `path_filter`.
    ///
    /// Matching ids are ordered by `topo_sort_modules` and each one still
    /// present in the registry is passed to `safe_unregister`. For every
    /// successful unregister an `apcore.module.reloaded` event is emitted (both
    /// versions are reported as `"unknown"`); failures are logged and skipped.
    /// A single audit entry covering the whole batch is recorded at the end.
    ///
    /// # Errors
    ///
    /// Returns `ErrorCode::GeneralInvalidInput` when `path_filter` is not a
    /// valid glob pattern. Per-module unregister failures do not fail the call.
    async fn execute_bulk(
        &self,
        path_filter: String,
        reason: &str,
        ctx: &Context<serde_json::Value>,
    ) -> Result<serde_json::Value, ModuleError> {
        let pattern = Pattern::new(&path_filter).map_err(|e| {
            ModuleError::new(
                ErrorCode::GeneralInvalidInput,
                format!("'path_filter' is not a valid glob pattern: {e}"),
            )
        })?;

        let mut matched: Vec<String> = self
            .registry
            .module_ids()
            .into_iter()
            .filter(|id| pattern.matches(id))
            .collect();
        // Sort first so the resolver sees its input in a deterministic order.
        matched.sort();
        let order = self.topo_sort_modules(&matched);

        let start = std::time::Instant::now();
        let mut reloaded: Vec<String> = Vec::new();
        for mid in order {
            // The module may have disappeared since the id list was taken.
            if !self.registry.has(&mid) {
                continue;
            }
            match self.registry.safe_unregister(&mid, 5000).await {
                Ok(_) => {
                    let timestamp = chrono::Utc::now().to_rfc3339();
                    let event_data = augment_with_context_identity(
                        json!({
                            "previous_version": "unknown",
                            "new_version": "unknown",
                            "reason": reason,
                        }),
                        ctx,
                    );
                    emit_event(
                        &self.emitter,
                        "apcore.module.reloaded",
                        &mid,
                        &timestamp,
                        event_data,
                    )
                    .await;
                    reloaded.push(mid);
                }
                Err(e) => {
                    tracing::error!(error = %e, module_id = %mid, "Bulk reload: failed to unregister");
                }
            }
        }
        let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;

        tracing::info!(
            count = reloaded.len(),
            path_filter = %path_filter,
            reason = %reason,
            "Bulk module reload"
        );

        let entry = build_audit_entry(
            AuditAction::ReloadModule,
            &path_filter,
            ctx,
            AuditChange {
                before: serde_json::Value::Null,
                // `json!` serializes through a reference, so no clone is needed.
                after: json!(reloaded),
            },
        );
        record_audit(self.audit_store.as_ref(), entry).await;

        Ok(json!({
            "success": true,
            "module_id": serde_json::Value::Null,
            "reloaded_modules": reloaded,
            "reload_duration_ms": elapsed_ms,
        }))
    }
}
#[async_trait]
impl Module for ReloadModule {
    /// Returns the one-line summary of what this module does.
    fn description(&self) -> &'static str {
        "Hot-reload a module by safe unregister (re-registration must be done explicitly in Rust)"
    }

    /// JSON Schema for the inputs. Only `reason` is required by the schema;
    /// `execute` additionally enforces that a target is given.
    fn input_schema(&self) -> serde_json::Value {
        json!({
            "type": "object",
            "required": ["reason"],
            "properties": {
                "module_id": {"type": "string"},
                "path_filter": {"type": "string"},
                "reload_dependents": {"type": "boolean", "default": false},
                "reload_config": {"type": "boolean", "default": false},
                "reason": {"type": "string"}
            }
        })
    }

    /// JSON Schema for the result object returned by `execute`.
    fn output_schema(&self) -> serde_json::Value {
        json!({
            "type": "object",
            "properties": {
                "success": {"type": "boolean"},
                "module_id": {"type": ["string", "null"]},
                "previous_version": {"type": "string"},
                "new_version": {"type": "string"},
                "reload_duration_ms": {"type": "number"},
                "reloaded_modules": {"type": "array", "items": {"type": "string"}}
            }
        })
    }

    /// Dispatches a reload request.
    ///
    /// * If `reload_config` is `true`, the bound configuration (if any) is
    ///   reloaded from disk first; success emits `apcore.config.reloaded`,
    ///   failure or a missing configuration handle only logs a warning.
    /// * If `path_filter` is set, the bulk path runs.
    /// * Otherwise, if `module_id` is set, the single-module path runs.
    /// * With neither target but `reload_config` set, a config-only result is
    ///   returned.
    ///
    /// Every result carries a `config_reloaded` boolean. Empty strings and
    /// non-string values for `module_id` / `path_filter` count as "not given".
    ///
    /// # Errors
    ///
    /// * Propagates the `require_string` error when `reason` cannot be read.
    /// * `ErrorCode::ModuleReloadConflict` when both `module_id` and
    ///   `path_filter` are given.
    /// * `ErrorCode::GeneralInvalidInput` when no target and no
    ///   `reload_config` flag is given.
    /// * Any error returned by the single or bulk reload path.
    async fn execute(
        &self,
        inputs: serde_json::Value,
        ctx: &Context<serde_json::Value>,
    ) -> Result<serde_json::Value, ModuleError> {
        /// Reads `field` as a non-empty string; `null`, non-string and empty
        /// values all yield `None` (`as_str` already rejects `null`).
        fn non_empty_str<'a>(inputs: &'a serde_json::Value, field: &str) -> Option<&'a str> {
            inputs
                .get(field)
                .and_then(serde_json::Value::as_str)
                .filter(|s| !s.is_empty())
        }

        let reason = require_string(&inputs, "reason")?;
        let module_id_input = non_empty_str(&inputs, "module_id");
        let path_filter_input = non_empty_str(&inputs, "path_filter");

        if module_id_input.is_some() && path_filter_input.is_some() {
            return Err(ModuleError::new(
                ErrorCode::ModuleReloadConflict,
                "'module_id' and 'path_filter' are mutually exclusive",
            ));
        }

        let reload_config_flag = inputs
            .get("reload_config")
            .and_then(serde_json::Value::as_bool)
            .unwrap_or(false);

        let mut config_reloaded = false;
        if reload_config_flag {
            if let Some(cfg_handle) = self.config.as_ref() {
                // Hold the config lock only for the reload itself; it must not
                // stay locked while awaiting the emitter below.
                let reload_outcome = {
                    let mut cfg = cfg_handle.lock().await;
                    cfg.reload_from_disk()
                };
                match reload_outcome {
                    Ok(()) => {
                        config_reloaded = true;
                        let timestamp = chrono::Utc::now().to_rfc3339();
                        emit_event(
                            &self.emitter,
                            "apcore.config.reloaded",
                            "system.control.reload_module",
                            &timestamp,
                            json!({"reason": reason}),
                        )
                        .await;
                        tracing::info!(reason = %reason, "Config reloaded from disk");
                    }
                    Err(e) => {
                        tracing::warn!(
                            error = %e.message,
                            "reload_config: Config::reload_from_disk failed (continuing)"
                        );
                    }
                }
            } else {
                tracing::warn!(
                    "reload_config: requested but no Config bound to ReloadModule \
                     (use ReloadModule::with_config to enable)"
                );
            }
        }

        if let Some(filter) = path_filter_input {
            let mut result = self.execute_bulk(filter.to_string(), &reason, ctx).await?;
            if let Some(obj) = result.as_object_mut() {
                obj.insert("config_reloaded".to_string(), json!(config_reloaded));
            }
            return Ok(result);
        }

        // Config-only request: nothing to reload in the registry.
        if module_id_input.is_none() && reload_config_flag {
            return Ok(json!({
                "success": true,
                "module_id": serde_json::Value::Null,
                "config_reloaded": config_reloaded,
            }));
        }

        let module_id = module_id_input.ok_or_else(|| {
            ModuleError::new(
                ErrorCode::GeneralInvalidInput,
                "'module_id', 'path_filter', or 'reload_config' is required",
            )
        })?;

        let mut result = self
            .execute_single(module_id.to_string(), &reason, ctx)
            .await?;
        if let Some(obj) = result.as_object_mut() {
            obj.insert("config_reloaded".to_string(), json!(config_reloaded));
        }
        Ok(result)
    }
}
/// Control module that enables or disables a registered module without
/// unloading it.
pub struct ToggleFeatureModule {
/// Registry on which `has`, `enable` and `disable` are called for the target module.
registry: Arc<Registry>,
/// Emitter handed to `emit_event` when a toggle is announced.
emitter: Arc<Mutex<EventEmitter>>,
/// Shared toggle bookkeeping, queried with `is_disabled` and updated with `enable` / `disable`.
toggle_state: Arc<ToggleState>,
/// File target passed to `write_override`; only used when `overrides_store` is `None`.
overrides_path: Option<PathBuf>,
/// Persistence backend passed to `persist_one`; takes precedence over `overrides_path`.
overrides_store: Option<Arc<dyn OverridesStore>>,
/// Optional sink handed to `record_audit`.
audit_store: Option<Arc<dyn AuditStore>>,
}
impl ToggleFeatureModule {
    /// Creates a toggle module bound to `registry`, `emitter` and `toggle_state`.
    ///
    /// The overrides path, overrides store and audit store all start out as
    /// `None`; attach them with the `with_*` builder methods.
    pub fn new(
        registry: Arc<Registry>,
        emitter: Arc<Mutex<EventEmitter>>,
        toggle_state: Arc<ToggleState>,
    ) -> Self {
        Self {
            overrides_path: None,
            overrides_store: None,
            audit_store: None,
            registry,
            emitter,
            toggle_state,
        }
    }

    /// Returns the module with its overrides file path replaced by `overrides_path`.
    #[must_use]
    pub fn with_overrides_path(self, overrides_path: Option<PathBuf>) -> Self {
        Self {
            overrides_path,
            ..self
        }
    }

    /// Returns the module with its overrides store replaced by `overrides_store`.
    #[must_use]
    pub fn with_overrides_store(
        self,
        overrides_store: Option<Arc<dyn OverridesStore>>,
    ) -> Self {
        Self {
            overrides_store,
            ..self
        }
    }

    /// Returns the module with its audit store replaced by `audit_store`.
    #[must_use]
    pub fn with_audit_store(self, audit_store: Option<Arc<dyn AuditStore>>) -> Self {
        Self {
            audit_store,
            ..self
        }
    }
}
#[async_trait]
impl Module for ToggleFeatureModule {
    /// Returns the one-line summary of what this module does.
    fn description(&self) -> &'static str {
        "Disable or enable a module without unloading it"
    }

    /// JSON Schema for the inputs: `module_id`, `enabled` and `reason` are all
    /// required.
    fn input_schema(&self) -> serde_json::Value {
        json!({
            "type": "object",
            "required": ["module_id", "enabled", "reason"],
            "properties": {
                "module_id": {"type": "string"},
                "enabled": {"type": "boolean"},
                "reason": {"type": "string"}
            }
        })
    }

    /// JSON Schema for the result object returned by `execute`.
    fn output_schema(&self) -> serde_json::Value {
        json!({
            "type": "object",
            "properties": {
                "success": {"type": "boolean"},
                "module_id": {"type": "string"},
                "enabled": {"type": "boolean"}
            }
        })
    }

    /// Enables or disables the module named by `module_id`.
    ///
    /// The registry is updated first and the shared toggle state second. The
    /// choice is then persisted under the key `toggle.<module_id>` through the
    /// overrides store (or, failing that, the overrides path), an
    /// `apcore.module.toggled` event is emitted, the change is logged and an
    /// audit entry recording the enabled state before and after is written.
    ///
    /// # Errors
    ///
    /// * Propagates the `require_string` error when `module_id` or `reason`
    ///   cannot be read from `inputs`.
    /// * `ErrorCode::GeneralInvalidInput` when `enabled` is missing or not a
    ///   boolean.
    /// * `ErrorCode::ModuleNotFound` when the registry has no such module.
    /// * Any error returned by the registry's `enable` / `disable`.
    ///
    /// A failure to persist through the overrides store is only logged as a
    /// warning; it does not fail the call.
    async fn execute(
        &self,
        inputs: serde_json::Value,
        ctx: &Context<serde_json::Value>,
    ) -> Result<serde_json::Value, ModuleError> {
        let module_id = require_string(&inputs, "module_id")?;
        let reason = require_string(&inputs, "reason")?;
        let enabled = inputs
            .get("enabled")
            .and_then(serde_json::Value::as_bool)
            .ok_or_else(|| {
                ModuleError::new(
                    ErrorCode::GeneralInvalidInput,
                    "'enabled' is required and must be a boolean",
                )
            })?;

        if !self.registry.has(&module_id) {
            return Err(ModuleError::new(
                ErrorCode::ModuleNotFound,
                format!("Module '{module_id}' not found"),
            ));
        }

        // Capture the state before mutating it so the audit entry can show the
        // transition.
        let before_enabled = !self.toggle_state.is_disabled(&module_id);

        // Registry first: if it rejects the change, the toggle state is left
        // untouched.
        if enabled {
            self.registry.enable(&module_id)?;
            self.toggle_state.enable(&module_id);
        } else {
            self.registry.disable(&module_id)?;
            self.toggle_state.disable(&module_id);
        }

        // Persistence is best-effort: the store wins over the plain path.
        let toggle_key = format!("toggle.{module_id}");
        let toggle_value = serde_json::Value::Bool(enabled);
        if let Some(store) = self.overrides_store.as_ref() {
            if let Err(e) = persist_one(store.as_ref(), &toggle_key, &toggle_value).await {
                tracing::warn!(error = %e, key = %toggle_key, "OverridesStore persist failed");
            }
        } else if let Some(path) = self.overrides_path.as_deref() {
            write_override(path, &toggle_key, &toggle_value);
        }

        let timestamp = chrono::Utc::now().to_rfc3339();
        let event_data = augment_with_context_identity(
            json!({
                "module_id": module_id,
                "enabled": enabled,
                "reason": reason,
            }),
            ctx,
        );
        emit_event(
            &self.emitter,
            "apcore.module.toggled",
            &module_id,
            &timestamp,
            event_data,
        )
        .await;

        tracing::info!(
            module_id = %module_id,
            enabled = %enabled,
            reason = %reason,
            "Module toggled"
        );

        let entry = build_audit_entry(
            AuditAction::ToggleFeature,
            &module_id,
            ctx,
            AuditChange {
                before: json!(before_enabled),
                after: json!(enabled),
            },
        );
        record_audit(self.audit_store.as_ref(), entry).await;

        Ok(json!({
            "success": true,
            "module_id": module_id,
            "enabled": enabled,
        }))
    }
}