pub mod audit;
pub mod control;
pub mod health;
pub mod manifest;
pub mod overrides;
pub mod usage;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::{Arc, OnceLock};
use parking_lot::RwLock;
use serde_json::json;
use thiserror::Error;
use tokio::sync::Mutex;
use crate::config::Config;
use crate::errors::{ErrorCode, ModuleError};
use crate::events::emitter::{ApCoreEvent, EventEmitter};
use crate::events::subscribers::create_subscriber;
use crate::executor::Executor;
use crate::middleware::PlatformNotifyMiddleware;
use crate::module::Module;
use crate::observability::error_history::{ErrorHistory, ErrorHistoryMiddleware};
use crate::observability::metrics::MetricsCollector;
use crate::observability::usage::{UsageCollector, UsageMiddleware};
use crate::registry::registry::{is_ephemeral_module_id, ModuleDescriptor, Registry};
pub use audit::{AuditAction, AuditChange, AuditEntry, AuditStore, InMemoryAuditStore};
pub use control::{ReloadModule, ToggleFeatureModule, UpdateConfigModule};
pub use health::{HealthModule, HealthSummaryModule};
pub use manifest::{ManifestFullModule, ManifestModule};
pub use usage::{UsageModule, UsageSummaryModule};
pub struct ToggleState {
disabled: RwLock<HashSet<String>>,
}
impl ToggleState {
#[must_use]
pub fn new() -> Self {
Self {
disabled: RwLock::new(HashSet::new()),
}
}
pub fn is_disabled(&self, module_id: &str) -> bool {
self.disabled.read().contains(module_id)
}
pub fn disable(&self, module_id: &str) {
self.disabled.write().insert(module_id.to_string());
}
pub fn enable(&self, module_id: &str) {
self.disabled.write().remove(module_id);
}
pub fn clear(&self) {
self.disabled.write().clear();
}
}
impl Default for ToggleState {
fn default() -> Self {
Self::new()
}
}
static GLOBAL_TOGGLE_STATE: OnceLock<ToggleState> = OnceLock::new();
fn global_toggle_state() -> &'static ToggleState {
GLOBAL_TOGGLE_STATE.get_or_init(ToggleState::new)
}
#[must_use]
pub fn is_module_disabled(module_id: &str) -> bool {
global_toggle_state().is_disabled(module_id)
}
pub fn check_module_disabled(module_id: &str) -> Result<(), ModuleError> {
if is_module_disabled(module_id) {
return Err(ModuleError::new(
ErrorCode::ModuleDisabled,
format!("Module '{module_id}' is disabled"),
));
}
Ok(())
}
pub(crate) const SENSITIVE_SEGMENTS: &[&str] =
&["token", "secret", "key", "password", "auth", "credential"];
pub(crate) fn is_sensitive_key(key: &str) -> bool {
let lower = key.to_lowercase();
lower.split('.').any(|seg| {
SENSITIVE_SEGMENTS.iter().any(|&s| {
seg == s || seg.ends_with(&format!("_{s}")) || seg.starts_with(&format!("{s}_"))
})
})
}
pub(crate) const RESTRICTED_KEYS: &[&str] = &["sys_modules.enabled"];
pub(crate) fn require_string(
inputs: &serde_json::Value,
field: &str,
) -> Result<String, ModuleError> {
inputs
.get(field)
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(std::string::ToString::to_string)
.ok_or_else(|| {
ModuleError::new(
ErrorCode::GeneralInvalidInput,
format!("'{field}' is required and must be a non-empty string"),
)
})
}
pub(crate) fn missing_field_error(field: &str) -> ModuleError {
ModuleError::new(
ErrorCode::GeneralInvalidInput,
format!("'{field}' is required"),
)
}
pub(crate) async fn emit_event(
emitter: &Arc<Mutex<EventEmitter>>,
event_type: &str,
module_id: &str,
timestamp: &str,
data: serde_json::Value,
) {
let event = ApCoreEvent {
event_type: event_type.to_string(),
timestamp: timestamp.to_string(),
data,
module_id: Some(module_id.to_string()),
severity: "info".to_string(),
};
let em = emitter.lock().await;
em.emit(&event).await;
}
pub(crate) const DEFAULT_EXTERNAL_CALLER: &str = "@external";
const IDENTITY_SENSITIVE_SUBSTRINGS: &[&str] = &[
"token",
"secret",
"password",
"passwd",
"key",
"auth",
"credential",
"cookie",
"session",
"bearer",
];
const IDENTITY_REDACTED_TOKEN: &str = "<redacted>";
fn redact_identity_attr(name: &str, value: &serde_json::Value) -> serde_json::Value {
let lower = name.to_lowercase();
if IDENTITY_SENSITIVE_SUBSTRINGS
.iter()
.any(|sub| lower.contains(sub))
{
return serde_json::Value::String(IDENTITY_REDACTED_TOKEN.to_string());
}
value.clone()
}
pub(crate) fn augment_with_context_identity(
mut data: serde_json::Value,
ctx: &crate::context::Context<serde_json::Value>,
) -> serde_json::Value {
if let Some(obj) = data.as_object_mut() {
let caller_id = ctx
.caller_id
.as_ref()
.filter(|s| !s.is_empty())
.cloned()
.unwrap_or_else(|| DEFAULT_EXTERNAL_CALLER.to_string());
obj.insert("caller_id".to_string(), serde_json::json!(caller_id));
if let Some(identity) = ctx.identity.as_ref() {
obj.insert("actor_id".to_string(), serde_json::json!(identity.id()));
obj.insert(
"actor_type".to_string(),
serde_json::json!(identity.identity_type()),
);
let mut snapshot = serde_json::Map::new();
snapshot.insert("id".to_string(), serde_json::json!(identity.id()));
snapshot.insert(
"type".to_string(),
serde_json::json!(identity.identity_type()),
);
let roles = identity.roles();
if !roles.is_empty() {
snapshot.insert("roles".to_string(), serde_json::json!(roles));
}
for (key, value) in identity.attrs() {
if matches!(key.as_str(), "id" | "type" | "roles") {
continue;
}
snapshot.insert(key.clone(), redact_identity_attr(key, value));
}
obj.insert("identity".to_string(), serde_json::Value::Object(snapshot));
}
}
data
}
#[derive(Debug, Error)]
pub enum SysModuleError {
#[error("system module '{module_id}' failed to register: {source}")]
RegistrationFailed {
module_id: String,
#[source]
source: ModuleError,
},
}
impl SysModuleError {
#[must_use]
pub fn module_id(&self) -> &str {
match self {
Self::RegistrationFailed { module_id, .. } => module_id,
}
}
#[must_use]
pub fn error_code(&self) -> ErrorCode {
ErrorCode::SysModuleRegistrationFailed
}
}
pub struct SysModulesContext {
pub registered_modules: HashMap<String, serde_json::Value>,
pub emitter: Arc<Mutex<EventEmitter>>,
pub toggle_state: Arc<ToggleState>,
pub error_history: ErrorHistory,
pub usage_collector: UsageCollector,
pub audit_store: Option<Arc<dyn AuditStore>>,
}
#[derive(Default, Clone)]
pub struct SysModulesOptions {
pub overrides_path: Option<PathBuf>,
pub overrides_store: Option<Arc<dyn overrides::OverridesStore>>,
pub audit_store: Option<Arc<dyn AuditStore>>,
pub fail_on_error: bool,
}
pub fn register_sys_modules(
registry: Arc<Registry>,
executor: &Executor,
config: &Config,
metrics_collector: Option<MetricsCollector>,
) -> Result<SysModulesContext, SysModuleError> {
register_sys_modules_with_options(
registry,
executor,
config,
metrics_collector,
SysModulesOptions::default(),
)
}
#[allow(clippy::too_many_lines)] #[allow(clippy::needless_pass_by_value)] pub fn register_sys_modules_with_options(
registry: Arc<Registry>,
executor: &Executor,
config: &Config,
metrics_collector: Option<MetricsCollector>,
options: SysModulesOptions,
) -> Result<SysModulesContext, SysModuleError> {
let SysModulesOptions {
overrides_path,
overrides_store,
audit_store,
fail_on_error,
} = options;
let toggle_state = Arc::new(ToggleState::new());
let enabled = config
.get("sys_modules.enabled")
.and_then(|v| v.as_bool())
.unwrap_or(true);
if !enabled {
return Ok(SysModulesContext {
registered_modules: HashMap::new(),
emitter: Arc::new(Mutex::new(EventEmitter::new())),
toggle_state,
error_history: ErrorHistory::with_limits(50, 1000),
usage_collector: UsageCollector::new(),
audit_store,
});
}
let mut effective_config = config.clone();
if let Some(path) = overrides_path.as_deref() {
overrides::load_overrides(path, &mut effective_config, Some(&toggle_state));
}
#[allow(clippy::cast_possible_truncation)] let max_per_module = effective_config
.get("sys_modules.error_history.max_entries_per_module")
.and_then(|v| v.as_u64())
.unwrap_or(50) as usize;
#[allow(clippy::cast_possible_truncation)] let max_total = effective_config
.get("sys_modules.error_history.max_total_entries")
.and_then(|v| v.as_u64())
.unwrap_or(1000) as usize;
let error_history = ErrorHistory::with_limits(max_per_module, max_total);
let eh_middleware = ErrorHistoryMiddleware::new(error_history.clone());
if let Err(e) = executor.use_middleware(Box::new(eh_middleware)) {
tracing::error!(error = %e, middleware = "ErrorHistoryMiddleware", "sys middleware registration failed");
}
let usage_collector = UsageCollector::new();
let usage_middleware = UsageMiddleware::new(usage_collector.clone());
if let Err(e) = executor.use_middleware(Box::new(usage_middleware)) {
tracing::error!(error = %e, middleware = "UsageMiddleware", "sys middleware registration failed");
}
let config_arc = Arc::new(Mutex::new(effective_config.clone()));
let mut emitter = EventEmitter::new();
let events_enabled = effective_config
.get("sys_modules.events.enabled")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if !events_enabled
&& (overrides_path.is_some() || overrides_store.is_some() || audit_store.is_some())
{
tracing::warn!(
overrides_path_set = overrides_path.is_some(),
overrides_store_set = overrides_store.is_some(),
audit_store_set = audit_store.is_some(),
"SysModulesOptions overrides/audit options set but \
sys_modules.events.enabled=false — control modules are not \
registered, so these options have no effect. Enable events to \
activate runtime overrides and audit trails."
);
}
if events_enabled {
if let Some(subs) = effective_config.get("sys_modules.events.subscribers") {
if let Some(arr) = subs.as_array() {
for sub_config in arr {
match create_subscriber(sub_config) {
Ok(subscriber) => emitter.subscribe(subscriber),
Err(e) => {
tracing::warn!(error = %e, "Failed to create subscriber from config");
}
}
}
}
}
}
let emitter_arc = Arc::new(Mutex::new(emitter));
let mut modules: Vec<(&str, Box<dyn Module>, Vec<String>)> = vec![
(
"system.health.summary",
Box::new(health::HealthSummaryModule::new(
Arc::clone(®istry),
metrics_collector.clone(),
error_history.clone(),
Arc::clone(&config_arc),
)),
vec!["system".into(), "health".into()],
),
(
"system.health.module",
Box::new(health::HealthModule::new(
Arc::clone(®istry),
metrics_collector.clone(),
error_history.clone(),
)),
vec!["system".into(), "health".into()],
),
(
"system.manifest.module",
Box::new(manifest::ManifestModule::new(
Arc::clone(®istry),
Arc::clone(&config_arc),
)),
vec!["system".into(), "manifest".into()],
),
(
"system.manifest.full",
Box::new(manifest::ManifestFullModule::new(
Arc::clone(®istry),
Arc::clone(&config_arc),
)),
vec!["system".into(), "manifest".into()],
),
(
"system.usage.summary",
Box::new(usage::UsageSummaryModule::new(usage_collector.clone())),
vec!["system".into(), "usage".into()],
),
(
"system.usage.module",
Box::new(usage::UsageModule::new(
Arc::clone(®istry),
usage_collector.clone(),
)),
vec!["system".into(), "usage".into()],
),
];
if events_enabled {
let error_rate_threshold = effective_config
.get("sys_modules.events.thresholds.error_rate")
.and_then(|v| v.as_f64())
.unwrap_or(0.1);
let latency_p99_threshold = effective_config
.get("sys_modules.events.thresholds.latency_p99_ms")
.and_then(|v| v.as_f64())
.unwrap_or(5000.0);
let pn_middleware = PlatformNotifyMiddleware::new(
EventEmitter::new(),
metrics_collector.clone(),
error_rate_threshold,
latency_p99_threshold,
);
if let Err(e) = executor.use_middleware(Box::new(pn_middleware)) {
tracing::error!(error = %e, middleware = "PlatformNotifyMiddleware", "sys middleware registration failed");
}
modules.push((
"system.control.update_config",
Box::new(
UpdateConfigModule::new(Arc::clone(&config_arc), Arc::clone(&emitter_arc))
.with_overrides_path(overrides_path.clone())
.with_overrides_store(overrides_store.clone())
.with_audit_store(audit_store.clone()),
),
vec!["system".into(), "control".into()],
));
modules.push((
"system.control.reload_module",
Box::new(
ReloadModule::new(Arc::clone(®istry), Arc::clone(&emitter_arc))
.with_audit_store(audit_store.clone()),
),
vec!["system".into(), "control".into()],
));
modules.push((
"system.control.toggle_feature",
Box::new(
ToggleFeatureModule::new(
Arc::clone(®istry),
Arc::clone(&emitter_arc),
Arc::clone(&toggle_state),
)
.with_overrides_path(overrides_path.clone())
.with_overrides_store(overrides_store.clone())
.with_audit_store(audit_store.clone()),
),
vec!["system".into(), "control".into()],
));
}
let mut registered: HashMap<String, serde_json::Value> = HashMap::new();
for (id, module, tags) in modules {
let is_control = tags.contains(&"control".to_string());
let descriptor = ModuleDescriptor {
module_id: id.to_string(),
name: None,
description: module.description().to_string(),
documentation: None,
input_schema: module.input_schema(),
output_schema: module.output_schema(),
version: "1.0.0".to_string(),
tags,
annotations: Some(crate::module::ModuleAnnotations {
requires_approval: is_control,
readonly: !is_control,
idempotent: !is_control,
..Default::default()
}),
examples: vec![],
metadata: std::collections::HashMap::new(),
display: None,
sunset_date: None,
dependencies: vec![],
enabled: true,
};
let info = json!({
"name": id,
"description": module.description(),
});
match registry.register_internal(id, module, descriptor) {
Ok(()) => {
registered.insert(id.to_string(), info);
}
Err(e) => {
if fail_on_error {
return Err(SysModuleError::RegistrationFailed {
module_id: id.to_string(),
source: e,
});
}
tracing::error!(module_id = %id, error = %e, "System module failed to register; continuing");
}
}
}
if events_enabled {
let emitter_for_register = Arc::clone(&emitter_arc);
registry.on(
"register",
Box::new(move |module_id: &str, _module: &dyn Module| {
tracing::info!(module_id = %module_id, "module_registered");
let emitter = Arc::clone(&emitter_for_register);
let module_id_owned = module_id.to_string();
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
if is_ephemeral_module_id(&module_id_owned) {
let payload = json!({
"namespace_class": "ephemeral",
"caller_id": "@external",
});
let canonical = ApCoreEvent::with_module(
"apcore.registry.module_registered",
payload,
&module_id_owned,
"info",
);
let em = emitter.lock().await;
em.emit(&canonical).await;
return;
}
let canonical = ApCoreEvent::with_module(
"apcore.registry.module_registered",
json!({}),
&module_id_owned,
"info",
);
let legacy = ApCoreEvent::with_module(
"module_registered",
json!({
"deprecated": true,
"canonical_event": "apcore.registry.module_registered",
}),
&module_id_owned,
"info",
);
let em = emitter.lock().await;
em.emit(&canonical).await;
em.emit(&legacy).await;
});
}
}),
);
let emitter_for_unregister = Arc::clone(&emitter_arc);
registry.on(
"unregister",
Box::new(move |module_id: &str, _module: &dyn Module| {
tracing::info!(module_id = %module_id, "module_unregistered");
let emitter = Arc::clone(&emitter_for_unregister);
let module_id_owned = module_id.to_string();
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
if is_ephemeral_module_id(&module_id_owned) {
let payload = json!({
"namespace_class": "ephemeral",
"caller_id": "@external",
});
let canonical = ApCoreEvent::with_module(
"apcore.registry.module_unregistered",
payload,
&module_id_owned,
"info",
);
let em = emitter.lock().await;
em.emit(&canonical).await;
return;
}
let canonical = ApCoreEvent::with_module(
"apcore.registry.module_unregistered",
json!({}),
&module_id_owned,
"info",
);
let legacy = ApCoreEvent::with_module(
"module_unregistered",
json!({
"deprecated": true,
"canonical_event": "apcore.registry.module_unregistered",
}),
&module_id_owned,
"info",
);
let em = emitter.lock().await;
em.emit(&canonical).await;
em.emit(&legacy).await;
});
}
}),
);
}
Ok(SysModulesContext {
registered_modules: registered,
emitter: emitter_arc,
toggle_state,
error_history,
usage_collector,
audit_store,
})
}