use super::factory::{
ActorFactory, DynASBActorFactory, JavaScriptActorFactory, PythonActorFactory,
ScriptActorFactory,
};
use super::types::*;
use crate::actor::Actor;
use crate::websocket_rpc::{DeploymentStatus, DynASBClient, DynASBConfig, DynASBFunction};
use anyhow::Result;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tracing::{debug, info};
/// Kind of component recorded in the `ComponentRegistry` index.
///
/// The registry uses this tag to decide which internal table a lookup by
/// name should be served from.
#[derive(Debug, Clone)]
pub enum ComponentType {
/// Actor registered via `register_native_actor`; stored as a shared `Arc<dyn Actor>`.
Native,
/// WASM component. `ComponentRegistry::get_actor` currently returns an
/// error for this kind ("not yet implemented").
Wasm,
/// Script actor registered via `register_script_actor`, tagged with its runtime.
Script(ScriptRuntime),
/// Script actor registered via `register_dynasb_actor`.
DynASB,
}
/// Name-keyed registry of native actors and script actor factories.
///
/// `component_index` is the source of truth for which kind a name belongs
/// to; `get_actor` consults it first and then reads the matching table.
pub struct ComponentRegistry {
// Component name -> shared native actor instance.
native_actors: HashMap<String, Arc<dyn Actor>>,
// Component name -> factory; holds both plain script and dynASB factories.
script_actors: HashMap<String, Arc<dyn ActorFactory>>,
// Component name -> kind of the most recent registration under that name.
component_index: HashMap<String, ComponentType>,
// Shared dynASB client; `None` until `set_dynasb_config` is called.
dynasb_client: Option<Arc<tokio::sync::Mutex<DynASBClient>>>,
}
impl Default for ComponentRegistry {
fn default() -> Self {
Self::new()
}
}
impl ComponentRegistry {
/// Creates an empty registry with no dynASB client configured.
pub fn new() -> Self {
    let native_actors = HashMap::new();
    let script_actors = HashMap::new();
    let component_index = HashMap::new();
    Self {
        native_actors,
        script_actors,
        component_index,
        dynasb_client: None,
    }
}
/// Builds a new `DynASBClient` from `config` and stores it as the
/// registry's shared client, replacing any previously stored one.
///
/// Factories created by earlier `register_dynasb_actor` calls were
/// constructed with a clone of the handle that was current at that time;
/// this method does not touch them.
pub fn set_dynasb_config(&mut self, config: DynASBConfig) {
    let client = DynASBClient::new(config);
    let shared = Arc::new(tokio::sync::Mutex::new(client));
    self.dynasb_client = Some(shared);
}
/// Registers `name` as a dynASB-backed script actor.
///
/// A `DynASBActorFactory` is built from `metadata` and a clone of the
/// shared client handle, then stored under `name`. Any earlier
/// registration under the same name is replaced, regardless of its kind.
///
/// # Errors
///
/// Returns an error if no client has been configured yet through
/// `set_dynasb_config`.
pub fn register_dynasb_actor(
    &mut self,
    name: &str,
    metadata: DiscoveredScriptActor,
) -> Result<()> {
    let client = self.dynasb_client.clone().ok_or_else(|| {
        anyhow::anyhow!("dynASB not configured. Call set_dynasb_config() first.")
    })?;
    let factory: Arc<dyn ActorFactory> = Arc::new(DynASBActorFactory::new(metadata, client));
    // If this name used to be a native actor, drop that instance: the index
    // below will point at the factory table, so the old entry would only be
    // kept alive without ever being reachable again.
    self.native_actors.remove(name);
    self.script_actors.insert(name.to_string(), factory);
    self.component_index
        .insert(name.to_string(), ComponentType::DynASB);
    info!("Registered dynASB script actor: {}", name);
    Ok(())
}
/// Registers `name` as a script actor for the runtime named in `metadata`.
///
/// A runtime-specific factory (`PythonActorFactory` or
/// `JavaScriptActorFactory`) is built from `metadata` and stored under
/// `name`. Any earlier registration under the same name is replaced,
/// regardless of its kind.
///
/// # Errors
///
/// Propagates the error from the factory constructor; in that case the
/// registry is left unchanged.
pub fn register_script_actor(
    &mut self,
    name: &str,
    metadata: DiscoveredScriptActor,
) -> Result<()> {
    let runtime = metadata.runtime;
    let factory: Arc<dyn ActorFactory> = match runtime {
        ScriptRuntime::Python => Arc::new(PythonActorFactory::new(metadata)?),
        ScriptRuntime::JavaScript => Arc::new(JavaScriptActorFactory::new(metadata)?),
    };
    // Only mutate the tables once the factory exists. A native actor that
    // previously owned this name is dropped so it does not linger as an
    // unreachable entry after the index is repointed.
    self.native_actors.remove(name);
    self.script_actors.insert(name.to_string(), factory);
    self.component_index
        .insert(name.to_string(), ComponentType::Script(runtime));
    info!("Registered script actor: {} ({:?})", name, runtime);
    Ok(())
}
/// Registers `actor` as the native actor for `name`.
///
/// Any earlier registration under the same name is replaced, regardless
/// of its kind. This method always returns `Ok(())`.
pub fn register_native_actor(&mut self, name: &str, actor: Arc<dyn Actor>) -> Result<()> {
    // A script/dynASB factory left behind under this name would still be
    // visible through `get_script_metadata`, which reads the factory table
    // directly, so remove it when the name becomes native.
    self.script_actors.remove(name);
    self.native_actors.insert(name.to_string(), actor);
    self.component_index
        .insert(name.to_string(), ComponentType::Native);
    info!("Registered native actor: {}", name);
    Ok(())
}
/// Resolves `name` to an actor.
///
/// Native actors are returned as a clone of the stored `Arc`. Script and
/// dynASB actors are produced by awaiting the registered factory's
/// `create_instance` on every call.
///
/// # Errors
///
/// Fails when the name is unknown, when the index and the backing table
/// disagree, when the component is a WASM component, or when the factory
/// itself fails.
pub async fn get_actor(&self, name: &str) -> Result<Arc<dyn Actor>> {
    let kind = match self.component_index.get(name) {
        Some(kind) => kind,
        None => return Err(anyhow::anyhow!("Component not found: {}", name)),
    };

    match kind {
        ComponentType::Wasm => Err(anyhow::anyhow!("WASM actor support not yet implemented")),
        ComponentType::Native => match self.native_actors.get(name) {
            Some(shared) => Ok(Arc::clone(shared)),
            None => Err(anyhow::anyhow!("Native actor not found: {}", name)),
        },
        ComponentType::Script(_) | ComponentType::DynASB => {
            let factory = match self.script_actors.get(name) {
                Some(factory) => factory,
                None => {
                    return Err(anyhow::anyhow!("Script actor factory not found: {}", name));
                }
            };
            let instance = factory.create_instance().await?;
            Ok(Arc::from(instance))
        }
    }
}
/// Returns `true` when some component is registered under `name`.
pub fn has_component(&self, name: &str) -> bool {
    self.component_index.get(name).is_some()
}
/// Looks up the kind recorded for `name`, or `None` if it is not registered.
pub fn get_component_type(&self, name: &str) -> Option<&ComponentType> {
    let index = &self.component_index;
    index.get(name)
}
/// Returns a `ComponentInfo` (name plus kind) for every registered
/// component, in the index map's iteration order.
pub fn list_components(&self) -> Vec<ComponentInfo> {
    let mut listing = Vec::with_capacity(self.component_index.len());
    for (name, kind) in &self.component_index {
        listing.push(ComponentInfo {
            name: name.clone(),
            component_type: kind.clone(),
        });
    }
    listing
}
/// Returns the metadata held by the factory stored under `name`, or
/// `None` when the factory table has no entry for it.
///
/// The lookup goes straight to the factory table; the component index is
/// not consulted.
pub fn get_script_metadata(&self, name: &str) -> Option<&DiscoveredScriptActor> {
    let factory = self.script_actors.get(name)?;
    Some(factory.get_metadata())
}
/// Number of names currently present in the component index.
pub fn total_count(&self) -> usize {
    let index = &self.component_index;
    index.len()
}
/// Returns a copy of the deployed-function record whose `name` field
/// equals `name`.
///
/// Yields `None` when `name` is not indexed as a dynASB component, when no
/// client is configured, or when the client lists no function with that
/// name. The client mutex is held while the deployed functions are scanned.
pub async fn get_deployment_status(&self, name: &str) -> Option<DynASBFunction> {
    match self.component_index.get(name) {
        Some(ComponentType::DynASB) => {}
        _ => return None,
    }
    let client = match &self.dynasb_client {
        Some(client) => client,
        None => return None,
    };
    let guard = client.lock().await;
    for function in guard.deployed_functions().values() {
        if function.name == name {
            return Some(function.clone());
        }
    }
    None
}
/// Runs the client's health check for the deployed function named `name`.
///
/// Yields `None` when `name` is not indexed as a dynASB component, when no
/// client is configured, when no deployed function carries that name, or
/// when the health check itself returns an error (the error value is
/// discarded). The client mutex stays locked for the duration of the
/// health-check await.
pub async fn check_deployment_health(&self, name: &str) -> Option<DeploymentStatus> {
    match self.component_index.get(name) {
        Some(ComponentType::DynASB) => {}
        _ => return None,
    }
    let client = match &self.dynasb_client {
        Some(client) => client,
        None => return None,
    };
    let mut guard = client.lock().await;
    // Copy the id out first so the shared borrow of the client ends before
    // the health check borrows it again.
    let function_id = guard.deployed_functions().values().find_map(|function| {
        if function.name == name {
            Some(function.function_id.clone())
        } else {
            None
        }
    })?;
    match guard.health_check(&function_id).await {
        Ok(status) => Some(status),
        Err(_) => None,
    }
}
/// Returns the client's deployment metadata for the deployed function
/// named `name`.
///
/// Yields `None` when `name` is not indexed as a dynASB component, when no
/// client is configured, or when no deployed function carries that name.
pub async fn get_deployment_metadata(
    &self,
    name: &str,
) -> Option<HashMap<String, serde_json::Value>> {
    match self.component_index.get(name) {
        Some(ComponentType::DynASB) => {}
        _ => return None,
    }
    let client = match &self.dynasb_client {
        Some(client) => client,
        None => return None,
    };
    let guard = client.lock().await;
    let function_id = guard.deployed_functions().values().find_map(|function| {
        if function.name == name {
            Some(function.function_id.clone())
        } else {
            None
        }
    })?;
    let metadata = guard.deployment_metadata(&function_id);
    Some(metadata)
}
/// Tallies registered components per kind.
///
/// Keys are `"native"`, `"wasm"`, `"python"`, `"javascript"` and
/// `"dynasb"`; a key is present only if at least one component of that
/// kind is registered.
pub fn count_by_type(&self) -> HashMap<String, usize> {
    self.component_index
        .values()
        .fold(HashMap::new(), |mut tally, kind| {
            let label = match kind {
                ComponentType::Native => "native",
                ComponentType::Wasm => "wasm",
                ComponentType::Script(ScriptRuntime::Python) => "python",
                ComponentType::Script(ScriptRuntime::JavaScript) => "javascript",
                ComponentType::DynASB => "dynasb",
            };
            *tally.entry(label.to_string()).or_insert(0) += 1;
            tally
        })
}
}
/// Summary of one registered component, as produced by
/// `ComponentRegistry::list_components`.
#[derive(Debug, Clone)]
pub struct ComponentInfo {
/// Name the component was registered under.
pub name: String,
/// Kind recorded for the component in the registry index.
pub component_type: ComponentType,
}
/// Entry stored by `ActorRegistry` for one registered script actor.
///
/// Cloning an entry shares `factory` and `instantiation_count` with the
/// original, since both are `Arc`s.
#[derive(Clone)]
pub struct RegisteredActor {
/// Discovery record the actor was registered from.
pub discovery_info: DiscoveredScriptActor,
/// Factory used to create instances of the actor.
pub factory: Arc<dyn ActorFactory>,
/// UTC timestamp taken when the entry was created in `ActorRegistry::register`.
pub registration_time: chrono::DateTime<chrono::Utc>,
/// Counter bumped by `ActorRegistry::create_instance` before it calls the
/// factory, so attempts that subsequently fail are included.
pub instantiation_count: Arc<AtomicUsize>,
}
/// Registry of script actors keyed by component name, with lookup indexes
/// by component and by namespace. Each table sits behind its own `RwLock`.
pub struct ActorRegistry {
// Component name -> registered entry (latest registration wins).
registered_actors: Arc<RwLock<HashMap<String, RegisteredActor>>>,
// Component name -> namespaces it has been registered under.
component_index: Arc<RwLock<HashMap<String, Vec<String>>>>,
// Namespace -> component names registered in it.
namespace_index: Arc<RwLock<HashMap<String, Vec<String>>>>,
}
impl Default for ActorRegistry {
fn default() -> Self {
Self::new()
}
}
impl ActorRegistry {
/// Creates a registry whose actor table and both indexes are empty.
pub fn new() -> Self {
    let registered_actors = Arc::new(RwLock::new(HashMap::new()));
    let component_index = Arc::new(RwLock::new(HashMap::new()));
    let namespace_index = Arc::new(RwLock::new(HashMap::new()));
    Self {
        registered_actors,
        component_index,
        namespace_index,
    }
}
/// Registers a discovered script actor under its component name.
///
/// A `ScriptActorFactory` is built from a copy of `actor`, and a fresh
/// `RegisteredActor` (zeroed instantiation counter, current UTC time)
/// replaces any entry already stored under the same component name. The
/// component and namespace indexes are then updated; a pair that is
/// already indexed is not added a second time, so re-registering a
/// component does not produce duplicate names in `list_by_namespace`.
///
/// # Errors
///
/// Propagates the error from `ScriptActorFactory::new`; in that case no
/// table is modified.
pub fn register(&mut self, actor: DiscoveredScriptActor) -> Result<()> {
    let component_name = actor.component.clone();
    let namespace = actor.workspace_metadata.namespace.clone();
    let factory = Arc::new(ScriptActorFactory::new(actor.clone())?);
    let registered = RegisteredActor {
        discovery_info: actor,
        factory,
        registration_time: chrono::Utc::now(),
        instantiation_count: Arc::new(AtomicUsize::new(0)),
    };
    self.registered_actors
        .write()
        .insert(component_name.clone(), registered);
    {
        let mut by_component = self.component_index.write();
        let namespaces = by_component.entry(component_name.clone()).or_default();
        // Skip the push on re-registration so the index stays duplicate-free.
        if !namespaces.contains(&namespace) {
            namespaces.push(namespace.clone());
        }
    }
    {
        let mut by_namespace = self.namespace_index.write();
        let components = by_namespace.entry(namespace).or_default();
        if !components.contains(&component_name) {
            components.push(component_name.clone());
        }
    }
    debug!("Registered actor: {}", component_name);
    Ok(())
}
/// Returns a clone of the entry registered under `name`, if any.
///
/// The clone shares its factory and instantiation counter with the stored
/// entry.
pub fn get(&self, name: &str) -> Option<RegisteredActor> {
    let actors = self.registered_actors.read();
    actors.get(name).cloned()
}
/// Returns the component names indexed under `namespace`, or an empty
/// vector when the namespace is unknown.
pub fn list_by_namespace(&self, namespace: &str) -> Vec<String> {
    let index = self.namespace_index.read();
    match index.get(namespace) {
        Some(names) => names.clone(),
        None => Vec::new(),
    }
}
/// Creates a new instance of the actor registered under `name`.
///
/// The entry's shared instantiation counter is incremented before the
/// factory is awaited, so it also counts attempts whose factory call
/// fails.
///
/// # Errors
///
/// Fails when `name` is not registered or when the factory returns an
/// error.
pub async fn create_instance(&self, name: &str) -> Result<Arc<dyn Actor>> {
    let entry = match self.get(name) {
        Some(entry) => entry,
        None => return Err(anyhow::anyhow!("Actor not registered: {}", name)),
    };
    entry.instantiation_count.fetch_add(1, Ordering::Relaxed);
    let instance = entry.factory.create_instance().await?;
    Ok(Arc::from(instance))
}
}