use async_trait::async_trait;
use parking_lot::RwLock;
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc, OnceLock,
};
use crate::errors::ModuleError;
use crate::module::{Module, ModuleAnnotations, ValidationResult};
/// Metadata stored by the registry for a module.
///
/// A descriptor is supplied on registration and can also be produced by a
/// [`Discoverer`]. `enabled`, `tags` and `dependencies` may be omitted from
/// serialized input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModuleDescriptor {
    /// Module name. `Registry::register_module` sets this to the registration ID.
    pub name: String,
    /// Annotations attached to the module.
    pub annotations: ModuleAnnotations,
    /// JSON value stored under `"input"` in the registry's schema cache.
    pub input_schema: serde_json::Value,
    /// JSON value stored under `"output"` in the registry's schema cache.
    pub output_schema: serde_json::Value,
    /// Whether the module is enabled.
    ///
    /// Defaults to `true` when the field is missing from serialized input, in
    /// line with `Registry::register_module`. A plain `#[serde(default)]`
    /// would yield `false` and silently disable such modules.
    #[serde(default = "module_descriptor_enabled_default")]
    pub enabled: bool,
    /// Tags used by `Registry::list` for filtering; empty when omitted.
    #[serde(default)]
    pub tags: Vec<String>,
    /// Declared dependencies on other modules; empty when omitted.
    #[serde(default)]
    pub dependencies: Vec<DependencyInfo>,
}

/// Serde default for [`ModuleDescriptor::enabled`]: modules are enabled unless stated otherwise.
fn module_descriptor_enabled_default() -> bool {
    true
}
/// A dependency declared by a module descriptor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DependencyInfo {
/// ID of the module that is depended upon.
pub module_id: String,
/// Version constraint string.
// NOTE(review): the constraint syntax is not interpreted anywhere in this file — confirm the expected format.
pub version_constraint: String,
/// Whether the dependency is optional; `false` when omitted from serialized input.
#[serde(default)]
pub optional: bool,
}
/// One result produced by a [`Discoverer`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscoveredModule {
/// Name under which the registry stores the descriptor.
pub name: String,
/// Where the module was found.
// NOTE(review): this file never reads `source`; presumably a path or URI — confirm with discoverer implementations.
pub source: String,
/// Descriptor recorded in the registry for this module.
pub descriptor: ModuleDescriptor,
}
/// Pluggable source of module descriptors.
///
/// The registry calls [`Discoverer::discover`] from `Registry::discover` and
/// `Registry::discover_internal`, in both cases with an empty `roots` slice.
#[async_trait]
pub trait Discoverer: Send + Sync {
/// Returns the modules found for `roots`, or a [`ModuleError`] on failure.
async fn discover(&self, roots: &[String]) -> Result<Vec<DiscoveredModule>, ModuleError>;
}
/// Pluggable validation hook consulted by `Registry::register`.
///
/// When the returned [`ValidationResult`] has `valid == false`, registration
/// fails and the result's `errors` are joined into the error message.
pub trait ModuleValidator: Send + Sync {
/// Validates `module`, optionally taking its `descriptor` into account.
fn validate(
&self,
module: &dyn Module,
descriptor: Option<&ModuleDescriptor>,
) -> ValidationResult;
}
/// Signature of a registry event callback; it receives the module name and the module itself.
pub type ModuleCallbackFn = dyn Fn(&str, &dyn Module) + Send + Sync;
// Event name -> callbacks registered for it, each paired with the handle ID returned by `Registry::on`.
type CallbackMap = HashMap<String, Vec<(u64, Arc<ModuleCallbackFn>)>>;
/// Dot-separated ID segments that are rejected by `validate_module_id`
/// unless reserved words are explicitly allowed (as `Registry::register_internal` does).
pub const RESERVED_WORDS: &[&str] = &[
"system", "internal", "core", "apcore", "plugin", "schema", "acl",
];
/// Maximum accepted module ID length, compared against `str::len` (bytes).
pub const MAX_MODULE_ID_LENGTH: usize = 192;
/// Names of the events for which callbacks can be registered via `Registry::on`.
pub mod registry_events {
/// Fired after a module has been inserted into the registry.
pub const REGISTER: &str = "register";
/// Fired by `Registry::unregister` before the module is removed.
pub const UNREGISTER: &str = "unregister";
}
/// Unit type exposing the event names of [`registry_events`] as associated constants.
pub struct RegistryEvents;
impl RegistryEvents {
/// Same value as [`registry_events::REGISTER`].
pub const REGISTER: &'static str = registry_events::REGISTER;
/// Same value as [`registry_events::UNREGISTER`].
pub const UNREGISTER: &'static str = registry_events::UNREGISTER;
}
/// Constant instance of [`RegistryEvents`].
pub const REGISTRY_EVENTS: RegistryEvents = RegistryEvents;
/// Regular expression a module ID must match: one or more dot-separated segments,
/// each starting with a lowercase letter followed by lowercase letters, digits or underscores.
pub const MODULE_ID_PATTERN: &str = r"^[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*)*$";
/// Returns the compiled form of [`MODULE_ID_PATTERN`].
///
/// The regex is compiled on first use and then shared for the lifetime of the process.
///
/// # Panics
///
/// Panics on first use if [`MODULE_ID_PATTERN`] fails to compile.
pub fn module_id_pattern() -> &'static Regex {
    static COMPILED: OnceLock<Regex> = OnceLock::new();
    COMPILED.get_or_init(|| {
        let compiled = Regex::new(MODULE_ID_PATTERN);
        compiled.unwrap()
    })
}
/// Checks that `name` is an acceptable module ID.
///
/// The checks run in this order and the first failure is reported:
/// 1. `name` is non-empty;
/// 2. `name` matches [`MODULE_ID_PATTERN`];
/// 3. `name.len()` does not exceed [`MAX_MODULE_ID_LENGTH`];
/// 4. unless `allow_reserved` is set, no dot-separated segment is in [`RESERVED_WORDS`].
///
/// # Errors
///
/// Every failure is a [`ModuleError`] with code `GeneralInvalidInput`.
fn validate_module_id(name: &str, allow_reserved: bool) -> Result<(), ModuleError> {
    // All rejections share the same error code; only the message differs.
    let reject = |message: String| -> Result<(), ModuleError> {
        Err(ModuleError::new(
            crate::errors::ErrorCode::GeneralInvalidInput,
            message,
        ))
    };

    if name.is_empty() {
        return reject("module_id must be a non-empty string".to_string());
    }

    if !module_id_pattern().is_match(name) {
        return reject(format!(
            "Invalid module ID: '{name}'. Must match pattern: ^[a-z][a-z0-9_]*(\\.[a-z][a-z0-9_]*)*$ (lowercase, digits, underscores, dots only; no hyphens)"
        ));
    }

    let length = name.len();
    if length > MAX_MODULE_ID_LENGTH {
        return reject(format!(
            "Module ID exceeds maximum length of {}: {}",
            MAX_MODULE_ID_LENGTH, length
        ));
    }

    if allow_reserved {
        return Ok(());
    }

    // Report the first reserved segment, scanning left to right.
    match name
        .split('.')
        .find(|segment| RESERVED_WORDS.contains(segment))
    {
        Some(segment) => reject(format!("Module ID contains reserved word: '{segment}'")),
        None => Ok(()),
    }
}
// Mutable registry state that `Registry` keeps behind a single `RwLock`.
struct RegistryCore {
// Registered module instances, keyed by module ID.
modules: HashMap<String, Arc<dyn Module>>,
// Descriptors keyed by module name; written on registration and on discovery.
descriptors: HashMap<String, ModuleDescriptor>,
// Outstanding `acquire_ref` counts; an entry is removed once it drops to zero.
ref_counts: HashMap<String, usize>,
// IDs marked by `safe_unregister`; `acquire`/`acquire_ref` refuse these.
draining: HashSet<String>,
// Lowercased ID -> original ID; written on registration/discovery, removed on unregister.
lowercase_map: HashMap<String, String>,
// Per-module JSON object of the form {"input": ..., "output": ...}.
schema_cache: HashMap<String, serde_json::Value>,
}
impl RegistryCore {
    // Builds a core with every map and set empty.
    fn new() -> Self {
        Self {
            modules: HashMap::default(),
            descriptors: HashMap::default(),
            ref_counts: HashMap::default(),
            draining: HashSet::default(),
            lowercase_map: HashMap::default(),
            schema_cache: HashMap::default(),
        }
    }
}
/// Registry of modules with descriptors, event callbacks and ref-counted draining.
///
/// All state uses interior mutability, so every method takes `&self`.
pub struct Registry {
// Modules, descriptors, ref counts, draining set and caches.
core: RwLock<RegistryCore>,
// Event name -> registered callbacks.
callbacks: RwLock<CallbackMap>,
// Source of the handle IDs returned by `on`.
callback_counter: AtomicU64,
// Per-module notifier that `release_ref` signals when the ref count reaches zero during a drain.
drain_events: RwLock<HashMap<String, Arc<tokio::sync::Notify>>>,
// Optional discoverer used by `discover_internal` / `reload`.
discoverer: RwLock<Option<Box<dyn Discoverer>>>,
// Optional validator consulted by `register`.
validator: RwLock<Option<Box<dyn ModuleValidator>>>,
}
/// Debug output lists keys rather than values for the maps whose values are
/// trait objects or closures, and prints placeholders for the discoverer and validator.
impl std::fmt::Debug for Registry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let state = self.core.read();

        // Gather everything first so the builder calls below stay flat.
        let module_names: Vec<&String> = state.modules.keys().collect();
        let drain_keys: Vec<String> = self.drain_events.read().keys().cloned().collect();
        let callback_keys: Vec<String> = self.callbacks.read().keys().cloned().collect();
        let schema_keys: Vec<&String> = state.schema_cache.keys().collect();
        let discoverer_tag = self.discoverer.read().as_ref().map(|_| "<Discoverer>");
        let validator_tag = self.validator.read().as_ref().map(|_| "<Validator>");

        let mut out = f.debug_struct("Registry");
        out.field("modules", &module_names);
        out.field("descriptors", &state.descriptors);
        out.field("ref_counts", &state.ref_counts);
        out.field("draining", &state.draining);
        out.field("drain_events_keys", &drain_keys);
        out.field("callbacks_keys", &callback_keys);
        out.field("lowercase_map", &state.lowercase_map);
        out.field("schema_cache_keys", &schema_keys);
        out.field("discoverer", &discoverer_tag);
        out.field("validator", &validator_tag);
        out.finish_non_exhaustive()
    }
}
impl Registry {
pub fn new() -> Self {
Self {
core: RwLock::new(RegistryCore::new()),
callbacks: RwLock::new(HashMap::new()),
callback_counter: AtomicU64::new(1),
drain_events: RwLock::new(HashMap::new()),
discoverer: RwLock::new(None),
validator: RwLock::new(None),
}
}
// Clones the callbacks registered for `event` so they can be invoked after
// the callbacks lock is released. Returns an empty list for unknown events.
fn snapshot_callbacks(&self, event: &str) -> Vec<Arc<ModuleCallbackFn>> {
    let table = self.callbacks.read();
    match table.get(event) {
        Some(listeners) => listeners.iter().map(|(_, cb)| Arc::clone(cb)).collect(),
        None => Vec::new(),
    }
}
pub fn register(
&self,
name: &str,
module: Box<dyn Module>,
descriptor: ModuleDescriptor,
) -> Result<(), ModuleError> {
validate_module_id(name, false)?;
if let Some(validator) = self.validator.read().as_ref() {
let result = validator.validate(module.as_ref(), Some(&descriptor));
if !result.valid {
return Err(ModuleError::new(
crate::errors::ErrorCode::ModuleLoadError,
format!(
"Module '{}' failed validation: {}",
name,
result.errors.join(", ")
),
));
}
}
module.on_load();
let module_arc: Arc<dyn Module> = module.into();
let module_clone = Arc::clone(&module_arc);
{
let mut core = self.core.write();
if core.modules.contains_key(name) {
return Err(ModuleError::new(
crate::errors::ErrorCode::GeneralInvalidInput,
format!("Module ID '{name}' is already registered"),
));
}
let schema = serde_json::json!({
"input": descriptor.input_schema,
"output": descriptor.output_schema,
});
core.schema_cache.insert(name.to_string(), schema);
core.lowercase_map
.insert(name.to_lowercase(), name.to_string());
core.modules.insert(name.to_string(), module_arc);
core.descriptors.insert(name.to_string(), descriptor);
}
for cb in self.snapshot_callbacks("register") {
cb(name, module_clone.as_ref());
}
Ok(())
}
/// Registers `module` under `name` with a descriptor derived from the module itself.
///
/// The descriptor takes its schemas from `module.input_schema()` and
/// `module.output_schema()`, uses default annotations, is enabled, and has no
/// tags or dependencies.
///
/// # Errors
///
/// Returns whatever [`Registry::register`] returns.
pub fn register_module(&self, name: &str, module: Box<dyn Module>) -> Result<(), ModuleError> {
    let input_schema = module.input_schema();
    let output_schema = module.output_schema();
    let derived = ModuleDescriptor {
        name: name.to_string(),
        annotations: ModuleAnnotations::default(),
        input_schema,
        output_schema,
        enabled: true,
        tags: Vec::new(),
        dependencies: Vec::new(),
    };
    self.register(name, module, derived)
}
pub fn unregister(&self, name: &str) -> Result<(), ModuleError> {
let removed: Arc<dyn Module> = {
let core = self.core.read();
match core.modules.get(name) {
Some(m) => Arc::clone(m),
None => {
return Err(ModuleError::new(
crate::errors::ErrorCode::ModuleNotFound,
format!("Module '{name}' not found"),
));
}
}
};
for cb in self.snapshot_callbacks("unregister") {
cb(name, removed.as_ref());
}
removed.on_unload();
{
let mut core = self.core.write();
core.modules.remove(name);
core.descriptors.remove(name);
core.lowercase_map.remove(&name.to_lowercase());
core.schema_cache.remove(name);
core.ref_counts.remove(name);
core.draining.remove(name);
}
self.drain_events.write().remove(name);
Ok(())
}
/// Returns a shared handle to the module registered under `name`, if any.
///
/// Unlike [`Registry::acquire`], this neither checks the draining state nor touches ref counts.
pub fn get(&self, name: &str) -> Option<Arc<dyn Module>> {
    let state = self.core.read();
    state.modules.get(name).map(Arc::clone)
}
/// Returns a copy of the descriptor stored under `name`, if any.
pub fn get_definition(&self, name: &str) -> Option<ModuleDescriptor> {
    let state = self.core.read();
    state.descriptors.get(name).cloned()
}
/// Lists the IDs of registered modules, optionally filtered.
///
/// * `prefix` — keep only IDs that start with this string.
/// * `tags` — keep only modules whose descriptor carries *every* listed tag;
///   a module without a descriptor never matches a tag filter.
///
/// The result follows the iteration order of the underlying map and is therefore not sorted.
pub fn list(&self, tags: Option<&[&str]>, prefix: Option<&str>) -> Vec<String> {
    let state = self.core.read();

    let prefix_ok = |id: &str| match prefix {
        Some(wanted) => id.starts_with(wanted),
        None => true,
    };
    let tags_ok = |id: &str| match tags {
        None => true,
        Some(required) => state.descriptors.get(id).is_some_and(|desc| {
            required
                .iter()
                .all(|wanted| desc.tags.iter().any(|have| have == wanted))
        }),
    };

    state
        .modules
        .keys()
        .filter(|id| prefix_ok(id.as_str()) && tags_ok(id.as_str()))
        .cloned()
        .collect()
}
/// Returns `true` if a module is registered under `name`.
pub fn has(&self, name: &str) -> bool {
    let state = self.core.read();
    state.modules.contains_key(name)
}
#[allow(clippy::similar_names)] pub async fn discover(&self, discoverer: &dyn Discoverer) -> Result<usize, ModuleError> {
let discovered = discoverer.discover(&[]).await?;
let count = discovered.len();
{
let mut core = self.core.write();
for dm in discovered {
core.descriptors.insert(dm.name.clone(), dm.descriptor);
core.lowercase_map
.insert(dm.name.to_lowercase(), dm.name.clone());
}
}
Ok(count)
}
pub fn register_internal(
&self,
name: &str,
module: Box<dyn Module>,
descriptor: ModuleDescriptor,
) -> Result<(), ModuleError> {
validate_module_id(name, true)?;
module.on_load();
let module_arc: Arc<dyn Module> = module.into();
let module_clone = Arc::clone(&module_arc);
{
let mut core = self.core.write();
if core.modules.contains_key(name) {
return Err(ModuleError::new(
crate::errors::ErrorCode::GeneralInvalidInput,
format!("Module ID '{name}' is already registered"),
));
}
let schema = serde_json::json!({
"input": descriptor.input_schema,
"output": descriptor.output_schema,
});
core.schema_cache.insert(name.to_string(), schema);
core.lowercase_map
.insert(name.to_lowercase(), name.to_string());
core.modules.insert(name.to_string(), module_arc);
core.descriptors.insert(name.to_string(), descriptor);
}
for cb in self.snapshot_callbacks("register") {
cb(name, module_clone.as_ref());
}
Ok(())
}
pub fn for_each_module(&self, mut f: impl FnMut(&str, &dyn Module)) {
let core = self.core.read();
for (name, module) in &core.modules {
f(name.as_str(), module.as_ref());
}
}
/// Returns the description of the module registered under `name`.
///
/// Yields the literal text `"Module not found"` when no such module exists.
pub fn describe(&self, name: &str) -> String {
    self.core.read().modules.get(name).map_or_else(
        || "Module not found".to_string(),
        |module| module.description().to_string(),
    )
}
/// Unregisters `name` once every reference handed out by
/// [`Registry::acquire_ref`] has been released.
///
/// The module is marked as draining immediately, so new `acquire` /
/// `acquire_ref` calls are refused. If references are outstanding, this waits
/// up to `timeout_ms` milliseconds for [`Registry::release_ref`] to signal
/// that the count reached zero.
///
/// Returns `Ok(true)` if the module was unregistered. Returns `Ok(false)` if
/// the wait timed out; the draining mark is then cleared and the module stays
/// registered.
///
/// # Errors
///
/// Returns `ModuleNotFound` if `name` is not registered, and propagates any
/// error from [`Registry::unregister`].
pub async fn safe_unregister(&self, name: &str, timeout_ms: u64) -> Result<bool, ModuleError> {
    // Mark as draining and decide whether to wait, all under one write lock so
    // that no reference can be acquired in between.
    let pending: Option<Arc<tokio::sync::Notify>> = {
        let mut core = self.core.write();
        if !core.modules.contains_key(name) {
            return Err(ModuleError::new(
                crate::errors::ErrorCode::ModuleNotFound,
                format!("Module '{name}' not found"),
            ));
        }
        core.draining.insert(name.to_string());
        let current_refs = core.ref_counts.get(name).copied().unwrap_or(0);
        if current_refs == 0 {
            None
        } else {
            // Publish the notifier while still holding the core lock, so a
            // concurrent `release_ref` cannot miss it.
            let notify = Arc::new(tokio::sync::Notify::new());
            self.drain_events
                .write()
                .insert(name.to_string(), Arc::clone(&notify));
            Some(notify)
        }
    };

    let Some(notify) = pending else {
        // Nothing is using the module; remove it right away.
        self.unregister(name)?;
        return Ok(true);
    };

    let drained = tokio::time::timeout(
        std::time::Duration::from_millis(timeout_ms),
        notify.notified(),
    )
    .await
    .is_ok();

    if drained {
        self.unregister(name)?;
        Ok(true)
    } else {
        // Timed out: give up draining so the module can be used again.
        self.core.write().draining.remove(name);
        self.drain_events.write().remove(name);
        Ok(false)
    }
}
/// Returns the module registered under `name` and increments its reference count.
///
/// Each successful call should be paired with a [`Registry::release_ref`].
///
/// # Errors
///
/// Returns `ModuleNotFound` if the module is draining or is not registered.
/// The draining check comes first.
pub fn acquire_ref(&self, name: &str) -> Result<Arc<dyn Module>, ModuleError> {
    let unavailable =
        |message: String| ModuleError::new(crate::errors::ErrorCode::ModuleNotFound, message);

    let mut state = self.core.write();
    if state.draining.contains(name) {
        return Err(unavailable(format!("Module '{name}' is draining")));
    }
    let Some(handle) = state.modules.get(name).map(Arc::clone) else {
        return Err(unavailable(format!("Module '{name}' not found")));
    };
    *state.ref_counts.entry(name.to_string()).or_default() += 1;
    Ok(handle)
}
/// Releases one reference previously obtained via [`Registry::acquire_ref`].
///
/// When the count for `name` reaches zero its entry is removed and any
/// pending drain notifier for `name` is signalled. Calling this for a name
/// with no recorded references is a no-op.
pub fn release_ref(&self, name: &str) {
    let reached_zero = {
        let mut state = self.core.write();
        match state.ref_counts.get_mut(name) {
            None => false,
            Some(remaining) => {
                // Never underflow, even if a zero count were ever stored.
                *remaining = remaining.saturating_sub(1);
                let empty = *remaining == 0;
                if empty {
                    state.ref_counts.remove(name);
                }
                empty
            }
        }
    };

    if !reached_zero {
        return;
    }
    // The core lock is already released; only the notifier map is read here.
    let waiters = self.drain_events.read();
    if let Some(signal) = waiters.get(name) {
        signal.notify_one();
    }
}
/// Returns the module registered under `name` unless it is draining.
///
/// Unlike [`Registry::acquire_ref`], the reference count is not changed.
///
/// # Errors
///
/// Returns `ModuleNotFound` if the module is draining or is not registered.
/// The draining check comes first.
pub fn acquire(&self, name: &str) -> Result<Arc<dyn Module>, ModuleError> {
    let unavailable =
        |message: String| ModuleError::new(crate::errors::ErrorCode::ModuleNotFound, message);

    let state = self.core.read();
    if state.draining.contains(name) {
        return Err(unavailable(format!("Module '{name}' is draining")));
    }
    match state.modules.get(name) {
        Some(module) => Ok(Arc::clone(module)),
        None => Err(unavailable(format!("Module '{name}' not found"))),
    }
}
/// Returns `true` while `name` is marked as draining by [`Registry::safe_unregister`].
pub fn is_draining(&self, name: &str) -> bool {
    let state = self.core.read();
    state.draining.contains(name)
}
/// Registers `callback` for `event` and returns a handle ID for [`Registry::off`].
///
/// Handle IDs come from a single counter shared by all events. This file
/// fires the events `"register"` and `"unregister"`; callbacks for other
/// event names are stored but never invoked here.
pub fn on(&self, event: &str, callback: Box<ModuleCallbackFn>) -> u64 {
    let handle = self.callback_counter.fetch_add(1, Ordering::Relaxed);
    let shared: Arc<ModuleCallbackFn> = Arc::from(callback);
    let mut table = self.callbacks.write();
    table
        .entry(event.to_string())
        .or_default()
        .push((handle, shared));
    handle
}
/// Removes the callback identified by `handle_id`, whatever event it was registered for.
///
/// Returns `true` if a callback was removed and `false` if the handle is unknown.
pub fn off(&self, handle_id: u64) -> bool {
    let mut table = self.callbacks.write();
    // `any` stops at the first event list that contained the handle.
    table.values_mut().any(|listeners| {
        match listeners.iter().position(|(id, _)| *id == handle_id) {
            Some(index) => {
                listeners.remove(index);
                true
            }
            None => false,
        }
    })
}
/// Re-runs discovery with the configured discoverer.
///
/// Equivalent to [`Registry::discover_internal`]; returns the number of modules reported.
///
/// # Errors
///
/// Propagates the error from [`Registry::discover_internal`].
pub async fn reload(&self) -> Result<usize, ModuleError> {
    let refreshed = self.discover_internal().await?;
    Ok(refreshed)
}
/// Placeholder for change watching: currently does nothing and always returns `Ok(())`.
// The function is `async` although it awaits nothing, hence the lint allowance.
#[allow(clippy::unused_async)] pub async fn watch(&self) -> Result<(), ModuleError> {
Ok(())
}
/// Counterpart of [`Registry::watch`]; currently a no-op.
pub fn unwatch(&self) {
}
pub async fn discover_internal(&self) -> Result<usize, ModuleError> {
let discoverer_opt = self.discoverer.write().take();
let Some(active_discoverer) = discoverer_opt else {
return Err(ModuleError::new(
crate::errors::ErrorCode::ModuleLoadError,
"No discoverer configured".to_string(),
));
};
let discover_result = active_discoverer.discover(&[] as &[String]).await;
{
let mut slot = self.discoverer.write();
if slot.is_none() {
*slot = Some(active_discoverer);
}
}
let discovered = discover_result?;
let count = discovered.len();
{
let mut core = self.core.write();
for dm in discovered {
core.descriptors.insert(dm.name.clone(), dm.descriptor);
core.lowercase_map
.insert(dm.name.to_lowercase(), dm.name.clone());
}
}
Ok(count)
}
/// Installs `discoverer` for [`Registry::discover_internal`] / [`Registry::reload`], replacing any previous one.
pub fn set_discoverer(&self, discoverer: Box<dyn Discoverer>) {
    let mut slot = self.discoverer.write();
    *slot = Some(discoverer);
}
/// Installs `validator` to be consulted by [`Registry::register`], replacing any previous one.
pub fn set_validator(&self, validator: Box<dyn ModuleValidator>) {
    let mut slot = self.validator.write();
    *slot = Some(validator);
}
/// Returns the number of registered modules.
pub fn count(&self) -> usize {
    let state = self.core.read();
    state.modules.len()
}
/// Returns the IDs of all registered modules in ascending order.
pub fn module_ids(&self) -> Vec<String> {
    let state = self.core.read();
    let mut sorted = Vec::with_capacity(state.modules.len());
    sorted.extend(state.modules.keys().cloned());
    drop(state);
    // Map keys are unique, so an unstable sort yields the same order as a stable one.
    sorted.sort_unstable();
    sorted
}
/// Returns a snapshot of all `(ID, module)` pairs, in unspecified order.
pub fn entries(&self) -> Vec<(String, Arc<dyn Module>)> {
    let state = self.core.read();
    let mut pairs = Vec::with_capacity(state.modules.len());
    for (id, module) in &state.modules {
        pairs.push((id.clone(), Arc::clone(module)));
    }
    pairs
}
/// Returns a copy of the cached `{"input": ..., "output": ...}` schema object for `name`, if any.
pub fn export_schema(&self, name: &str) -> Option<serde_json::Value> {
    let state = self.core.read();
    state.schema_cache.get(name).cloned()
}
/// Marks the descriptor stored under `name` as disabled.
///
/// # Errors
///
/// Returns `ModuleNotFound` if no descriptor exists for `name`.
pub fn disable(&self, name: &str) -> Result<(), ModuleError> {
    match self.core.write().descriptors.get_mut(name) {
        Some(entry) => {
            entry.enabled = false;
            Ok(())
        }
        None => Err(ModuleError::new(
            crate::errors::ErrorCode::ModuleNotFound,
            format!("Module '{name}' not found"),
        )),
    }
}
/// Marks the descriptor stored under `name` as enabled.
///
/// # Errors
///
/// Returns `ModuleNotFound` if no descriptor exists for `name`.
pub fn enable(&self, name: &str) -> Result<(), ModuleError> {
    match self.core.write().descriptors.get_mut(name) {
        Some(entry) => {
            entry.enabled = true;
            Ok(())
        }
        None => Err(ModuleError::new(
            crate::errors::ErrorCode::ModuleNotFound,
            format!("Module '{name}' not found"),
        )),
    }
}
/// Returns the `enabled` flag of the descriptor stored under `name`, or `None` if there is none.
pub fn is_enabled(&self, name: &str) -> Option<bool> {
    let state = self.core.read();
    state.descriptors.get(name).map(|entry| entry.enabled)
}
}
/// `Registry::default()` is equivalent to [`Registry::new`].
impl Default for Registry {
fn default() -> Self {
Self::new()
}
}