pub mod control;
pub mod health;
pub mod manifest;
pub mod usage;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, OnceLock};
use parking_lot::RwLock;
use serde_json::json;
use tokio::sync::Mutex;
use crate::config::Config;
use crate::errors::{ErrorCode, ModuleError};
use crate::events::emitter::{ApCoreEvent, EventEmitter};
use crate::events::subscribers::create_subscriber;
use crate::executor::Executor;
use crate::middleware::PlatformNotifyMiddleware;
use crate::module::Module;
use crate::observability::error_history::{ErrorHistory, ErrorHistoryMiddleware};
use crate::observability::metrics::MetricsCollector;
use crate::observability::usage::{UsageCollector, UsageMiddleware};
use crate::registry::registry::{ModuleDescriptor, Registry};
pub use control::UpdateConfigModule;
pub(crate) use control::{ReloadModule, ToggleFeatureModule};
pub struct ToggleState {
disabled: RwLock<HashSet<String>>,
}
impl ToggleState {
pub fn new() -> Self {
Self {
disabled: RwLock::new(HashSet::new()),
}
}
pub fn is_disabled(&self, module_id: &str) -> bool {
self.disabled.read().contains(module_id)
}
pub fn disable(&self, module_id: &str) {
self.disabled.write().insert(module_id.to_string());
}
pub fn enable(&self, module_id: &str) {
self.disabled.write().remove(module_id);
}
pub fn clear(&self) {
self.disabled.write().clear();
}
}
impl Default for ToggleState {
fn default() -> Self {
Self::new()
}
}
static GLOBAL_TOGGLE_STATE: OnceLock<ToggleState> = OnceLock::new();
fn global_toggle_state() -> &'static ToggleState {
GLOBAL_TOGGLE_STATE.get_or_init(ToggleState::new)
}
pub fn is_module_disabled(module_id: &str) -> bool {
global_toggle_state().is_disabled(module_id)
}
pub fn check_module_disabled(module_id: &str) -> Result<(), ModuleError> {
if is_module_disabled(module_id) {
return Err(ModuleError::new(
ErrorCode::ModuleDisabled,
format!("Module '{module_id}' is disabled"),
));
}
Ok(())
}
pub(crate) const SENSITIVE_SEGMENTS: &[&str] =
&["token", "secret", "key", "password", "auth", "credential"];
pub(crate) fn is_sensitive_key(key: &str) -> bool {
let lower = key.to_lowercase();
lower.split('.').any(|seg| {
SENSITIVE_SEGMENTS.iter().any(|&s| {
seg == s || seg.ends_with(&format!("_{s}")) || seg.starts_with(&format!("{s}_"))
})
})
}
pub(crate) const RESTRICTED_KEYS: &[&str] = &["sys_modules.enabled"];
pub(crate) fn require_string(
inputs: &serde_json::Value,
field: &str,
) -> Result<String, ModuleError> {
inputs
.get(field)
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(std::string::ToString::to_string)
.ok_or_else(|| {
ModuleError::new(
ErrorCode::GeneralInvalidInput,
format!("'{field}' is required and must be a non-empty string"),
)
})
}
pub(crate) fn missing_field_error(field: &str) -> ModuleError {
ModuleError::new(
ErrorCode::GeneralInvalidInput,
format!("'{field}' is required"),
)
}
pub(crate) async fn emit_event(
emitter: &Arc<Mutex<EventEmitter>>,
event_type: &str,
module_id: &str,
timestamp: &str,
data: serde_json::Value,
) {
let event = ApCoreEvent {
event_type: event_type.to_string(),
timestamp: timestamp.to_string(),
data,
module_id: Some(module_id.to_string()),
severity: "info".to_string(),
};
let em = emitter.lock().await;
if let Err(e) = em.emit(&event).await {
tracing::warn!(error = %e, event_type = %event_type, "Event emit failed");
}
}
pub struct SysModulesContext {
pub registered_modules: HashMap<String, serde_json::Value>,
pub emitter: Arc<Mutex<EventEmitter>>,
pub toggle_state: Arc<ToggleState>,
pub error_history: ErrorHistory,
pub usage_collector: UsageCollector,
}
#[allow(clippy::too_many_lines)] #[allow(clippy::needless_pass_by_value)] pub fn register_sys_modules(
registry: Arc<Registry>,
executor: &Executor,
config: &Config,
metrics_collector: Option<MetricsCollector>,
) -> Option<SysModulesContext> {
let enabled = config
.get("sys_modules.enabled")
.and_then(|v| v.as_bool())
.unwrap_or(true);
if !enabled {
return None;
}
#[allow(clippy::cast_possible_truncation)] let max_per_module = config
.get("sys_modules.error_history.max_entries_per_module")
.and_then(|v| v.as_u64())
.unwrap_or(50) as usize;
#[allow(clippy::cast_possible_truncation)] let max_total = config
.get("sys_modules.error_history.max_total_entries")
.and_then(|v| v.as_u64())
.unwrap_or(1000) as usize;
let error_history = ErrorHistory::with_limits(max_per_module, max_total);
let eh_middleware = ErrorHistoryMiddleware::new(error_history.clone());
let _ = executor.use_middleware(Box::new(eh_middleware));
let usage_collector = UsageCollector::new();
let usage_middleware = UsageMiddleware::new(usage_collector.clone());
let _ = executor.use_middleware(Box::new(usage_middleware));
let config_arc = Arc::new(Mutex::new(config.clone()));
let mut emitter = EventEmitter::new();
let events_enabled = config
.get("sys_modules.events.enabled")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if events_enabled {
if let Some(subs) = config.get("sys_modules.events.subscribers") {
if let Some(arr) = subs.as_array() {
for sub_config in arr {
match create_subscriber(sub_config) {
Ok(subscriber) => emitter.subscribe(subscriber),
Err(e) => {
tracing::warn!(error = %e, "Failed to create subscriber from config");
}
}
}
}
}
}
let emitter_arc = Arc::new(Mutex::new(emitter));
let toggle_state = Arc::new(ToggleState::new());
let mut modules: Vec<(&str, Box<dyn Module>, Vec<String>)> = vec![
(
"system.health.summary",
Box::new(health::HealthSummaryModule::new(
Arc::clone(®istry),
metrics_collector.clone(),
error_history.clone(),
Arc::clone(&config_arc),
)),
vec!["system".into(), "health".into()],
),
(
"system.health.module",
Box::new(health::HealthModule::new(
Arc::clone(®istry),
metrics_collector.clone(),
error_history.clone(),
)),
vec!["system".into(), "health".into()],
),
(
"system.manifest.module",
Box::new(manifest::ManifestModule::new(
Arc::clone(®istry),
Arc::clone(&config_arc),
)),
vec!["system".into(), "manifest".into()],
),
(
"system.manifest.full",
Box::new(manifest::ManifestFullModule::new(
Arc::clone(®istry),
Arc::clone(&config_arc),
)),
vec!["system".into(), "manifest".into()],
),
(
"system.usage.summary",
Box::new(usage::UsageSummaryModule::new(usage_collector.clone())),
vec!["system".into(), "usage".into()],
),
(
"system.usage.module",
Box::new(usage::UsageModule::new(
Arc::clone(®istry),
usage_collector.clone(),
)),
vec!["system".into(), "usage".into()],
),
];
if events_enabled {
let error_rate_threshold = config
.get("sys_modules.events.thresholds.error_rate")
.and_then(|v| v.as_f64())
.unwrap_or(0.1);
let latency_p99_threshold = config
.get("sys_modules.events.thresholds.latency_p99_ms")
.and_then(|v| v.as_f64())
.unwrap_or(5000.0);
let pn_middleware = PlatformNotifyMiddleware::new(
EventEmitter::new(),
metrics_collector.clone(),
error_rate_threshold,
latency_p99_threshold,
);
let _ = executor.use_middleware(Box::new(pn_middleware));
modules.push((
"system.control.update_config",
Box::new(UpdateConfigModule::new(
Arc::clone(&config_arc),
Arc::clone(&emitter_arc),
)),
vec!["system".into(), "control".into()],
));
modules.push((
"system.control.reload_module",
Box::new(ReloadModule::new(
Arc::clone(®istry),
Arc::clone(&emitter_arc),
)),
vec!["system".into(), "control".into()],
));
modules.push((
"system.control.toggle_feature",
Box::new(ToggleFeatureModule::new(
Arc::clone(®istry),
Arc::clone(&emitter_arc),
Arc::clone(&toggle_state),
)),
vec!["system".into(), "control".into()],
));
}
let mut registered: HashMap<String, serde_json::Value> = HashMap::new();
for (id, module, tags) in modules {
let is_control = tags.contains(&"control".to_string());
let descriptor = ModuleDescriptor {
name: id.to_string(),
annotations: crate::module::ModuleAnnotations {
requires_approval: is_control,
readonly: !is_control,
idempotent: !is_control,
..Default::default()
},
input_schema: module.input_schema(),
output_schema: module.output_schema(),
enabled: true,
tags,
dependencies: vec![],
};
let info = json!({
"name": id,
"description": module.description(),
});
match registry.register_internal(id, module, descriptor) {
Ok(()) => {
registered.insert(id.to_string(), info);
}
Err(e) => {
tracing::warn!(module_id = %id, error = %e, "Failed to register sys module");
}
}
}
if events_enabled {
registry.on(
"register",
Box::new(move |module_id: &str, _module: &dyn Module| {
tracing::info!(module_id = %module_id, "module_registered");
}),
);
registry.on(
"unregister",
Box::new(move |module_id: &str, _module: &dyn Module| {
tracing::info!(module_id = %module_id, "module_unregistered");
}),
);
}
Some(SysModulesContext {
registered_modules: registered,
emitter: emitter_arc,
toggle_state,
error_history,
usage_collector,
})
}