use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use terraphim_rolegraph::RoleGraph;
use crate::{
AgentDiscoveryQuery, AgentDiscoveryResult, AgentMetadata, AgentPid, AutomataConfig,
KnowledgeGraphIntegration, RegistryError, RegistryResult, SimilarityThresholds, SupervisorId,
};
/// Async interface for storing, querying and discovering agents.
///
/// The trait requires `Send + Sync`, so a registry can be shared across tasks
/// (e.g. behind an `Arc`).
#[async_trait]
pub trait AgentRegistry: Send + Sync {
    /// Adds a new agent described by `metadata`.
    async fn register_agent(&self, metadata: AgentMetadata) -> RegistryResult<()>;
    /// Removes the agent identified by `agent_id`.
    async fn unregister_agent(&self, agent_id: &AgentPid) -> RegistryResult<()>;
    /// Replaces the stored metadata of an already-registered agent.
    async fn update_agent(&self, metadata: AgentMetadata) -> RegistryResult<()>;
    /// Returns the agent's metadata, or `Ok(None)` when it is not registered.
    async fn get_agent(&self, agent_id: &AgentPid) -> RegistryResult<Option<AgentMetadata>>;
    /// Returns all registered agents.
    async fn list_agents(&self) -> RegistryResult<Vec<AgentMetadata>>;
    /// Finds registered agents that satisfy `query`.
    async fn discover_agents(
        &self,
        query: AgentDiscoveryQuery,
    ) -> RegistryResult<AgentDiscoveryResult>;
    /// Returns every agent that has the role `role_id`.
    async fn find_agents_by_role(&self, role_id: &str) -> RegistryResult<Vec<AgentMetadata>>;
    /// Returns every agent that advertises the capability `capability_id`.
    async fn find_agents_by_capability(
        &self,
        capability_id: &str,
    ) -> RegistryResult<Vec<AgentMetadata>>;
    /// Returns every agent supervised by `supervisor_id`.
    async fn find_agents_by_supervisor(
        &self,
        supervisor_id: &SupervisorId,
    ) -> RegistryResult<Vec<AgentMetadata>>;
    /// Returns a snapshot of the registry's aggregate statistics.
    async fn get_statistics(&self) -> RegistryResult<RegistryStatistics>;
}
/// In-memory [`AgentRegistry`] that keeps agent metadata in a `HashMap` and
/// delegates discovery to a [`KnowledgeGraphIntegration`].
pub struct KnowledgeGraphAgentRegistry {
    /// Registered agents keyed by pid; the `Arc` is cloned into the cleanup task.
    agents: Arc<RwLock<HashMap<AgentPid, AgentMetadata>>>,
    /// Matcher used by `discover_agents`; the `Arc` is cloned into the monitoring task.
    kg_integration: Arc<KnowledgeGraphIntegration>,
    /// Capacity limit and background-task settings.
    config: RegistryConfig,
    /// Aggregate counters; the `Arc` is cloned into both background tasks.
    statistics: Arc<RwLock<RegistryStatistics>>,
}
/// Tunables for [`KnowledgeGraphAgentRegistry`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegistryConfig {
    /// Upper bound on registered agents; `register_agent` fails once this many are stored.
    pub max_agents: usize,
    /// When true, `start_background_tasks` spawns a task that removes `Terminated` agents.
    pub auto_cleanup: bool,
    /// Seconds between cleanup sweeps (only used when `auto_cleanup` is true).
    pub cleanup_interval_secs: u64,
    /// When true, `start_background_tasks` spawns the uptime / KG-cache monitoring task.
    pub enable_monitoring: bool,
    // NOTE(review): not read anywhere in this file; presumably a TTL for cached
    // discovery results — confirm where (or whether) it is consumed.
    pub discovery_cache_ttl_secs: u64,
}
impl Default for RegistryConfig {
fn default() -> Self {
Self {
max_agents: 10000,
auto_cleanup: true,
cleanup_interval_secs: 300, enable_monitoring: true,
discovery_cache_ttl_secs: 3600, }
}
}
/// Aggregate counters maintained by [`KnowledgeGraphAgentRegistry`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegistryStatistics {
    /// Number of agents currently in the registry.
    pub total_agents: usize,
    /// Agent count per status; keys are lowercase status names such as "active" or "failed".
    pub agents_by_status: HashMap<String, usize>,
    /// Agent count keyed by each agent's primary role id.
    pub agents_by_role: HashMap<String, usize>,
    /// Number of `discover_agents` calls that completed successfully.
    pub total_discovery_queries: u64,
    /// Running mean wall-clock time, in milliseconds, of successful discovery queries.
    pub avg_discovery_time_ms: f64,
    // NOTE(review): never updated in this file, so it stays at its default of 0.0.
    pub discovery_cache_hit_rate: f64,
    /// Seconds since the monitoring task started; stays 0 unless monitoring is enabled.
    pub uptime_secs: u64,
    /// UTC time of the most recent change to these statistics.
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
impl Default for RegistryStatistics {
fn default() -> Self {
Self {
total_agents: 0,
agents_by_status: HashMap::new(),
agents_by_role: HashMap::new(),
total_discovery_queries: 0,
avg_discovery_time_ms: 0.0,
discovery_cache_hit_rate: 0.0,
uptime_secs: 0,
last_updated: chrono::Utc::now(),
}
}
}
impl KnowledgeGraphAgentRegistry {
pub fn new(
role_graph: Arc<RoleGraph>,
config: RegistryConfig,
automata_config: AutomataConfig,
similarity_thresholds: SimilarityThresholds,
) -> Self {
let kg_integration = Arc::new(KnowledgeGraphIntegration::new(
role_graph,
automata_config,
similarity_thresholds,
));
Self {
agents: Arc::new(RwLock::new(HashMap::new())),
kg_integration,
config,
statistics: Arc::new(RwLock::new(RegistryStatistics::default())),
}
}
pub async fn start_background_tasks(&self) -> RegistryResult<()> {
if self.config.auto_cleanup {
self.start_cleanup_task().await?;
}
if self.config.enable_monitoring {
self.start_monitoring_task().await?;
}
Ok(())
}
async fn start_cleanup_task(&self) -> RegistryResult<()> {
let agents = self.agents.clone();
let statistics = self.statistics.clone();
let cleanup_interval = self.config.cleanup_interval_secs;
tokio::spawn(async move {
let mut interval =
tokio::time::interval(std::time::Duration::from_secs(cleanup_interval));
loop {
interval.tick().await;
let mut agents_guard = agents.write().await;
let initial_count = agents_guard.len();
agents_guard
.retain(|_, agent| !matches!(agent.status, crate::AgentStatus::Terminated));
let cleaned_count = initial_count - agents_guard.len();
drop(agents_guard);
if cleaned_count > 0 {
log::info!("Cleaned up {} terminated agents", cleaned_count);
let mut stats = statistics.write().await;
stats.total_agents = stats.total_agents.saturating_sub(cleaned_count);
stats.last_updated = chrono::Utc::now();
}
}
});
Ok(())
}
async fn start_monitoring_task(&self) -> RegistryResult<()> {
let statistics = self.statistics.clone();
let kg_integration = self.kg_integration.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
let start_time = std::time::Instant::now();
loop {
interval.tick().await;
{
let mut stats = statistics.write().await;
stats.uptime_secs = start_time.elapsed().as_secs();
stats.last_updated = chrono::Utc::now();
}
kg_integration.cleanup_cache().await;
}
});
Ok(())
}
async fn update_statistics(&self) -> RegistryResult<()> {
let agents = self.agents.read().await;
let mut stats = self.statistics.write().await;
stats.total_agents = agents.len();
stats.agents_by_status.clear();
for agent in agents.values() {
let status_key = match &agent.status {
crate::AgentStatus::Initializing => "initializing",
crate::AgentStatus::Active => "active",
crate::AgentStatus::Busy => "busy",
crate::AgentStatus::Idle => "idle",
crate::AgentStatus::Hibernating => "hibernating",
crate::AgentStatus::Terminating => "terminating",
crate::AgentStatus::Terminated => "terminated",
crate::AgentStatus::Failed(_) => "failed",
};
*stats
.agents_by_status
.entry(status_key.to_string())
.or_insert(0) += 1;
}
stats.agents_by_role.clear();
for agent in agents.values() {
*stats
.agents_by_role
.entry(agent.primary_role.role_id.clone())
.or_insert(0) += 1;
}
stats.last_updated = chrono::Utc::now();
Ok(())
}
fn validate_agent_metadata(&self, metadata: &AgentMetadata) -> RegistryResult<()> {
metadata.validate()?;
Ok(())
}
}
#[async_trait]
impl AgentRegistry for KnowledgeGraphAgentRegistry {
    /// Validates `metadata` and inserts it as a new agent.
    ///
    /// # Errors
    ///
    /// * Any error from `AgentMetadata::validate`.
    /// * [`RegistryError::System`] when `max_agents` agents are already registered.
    /// * [`RegistryError::AgentAlreadyExists`] when the agent id is already present.
    async fn register_agent(&self, metadata: AgentMetadata) -> RegistryResult<()> {
        self.validate_agent_metadata(&metadata)?;
        let agent_id = metadata.agent_id.clone();
        {
            // The capacity check, duplicate check and insert must share one write
            // lock. Checking capacity under a separate read lock (released before
            // the write lock was taken) let concurrent registrations overshoot
            // `max_agents`.
            let mut agents = self.agents.write().await;
            if agents.len() >= self.config.max_agents {
                return Err(RegistryError::System(format!(
                    "Registry capacity exceeded (max: {})",
                    self.config.max_agents
                )));
            }
            if agents.contains_key(&agent_id) {
                return Err(RegistryError::AgentAlreadyExists(agent_id));
            }
            agents.insert(agent_id.clone(), metadata);
        }
        self.update_statistics().await?;
        log::info!("Agent {} registered successfully", agent_id);
        Ok(())
    }

    /// Removes the agent and refreshes statistics.
    ///
    /// # Errors
    ///
    /// [`RegistryError::AgentNotFound`] when no agent has `agent_id`.
    async fn unregister_agent(&self, agent_id: &AgentPid) -> RegistryResult<()> {
        let removed = {
            let mut agents = self.agents.write().await;
            agents.remove(agent_id)
        };
        match removed {
            Some(_) => {
                self.update_statistics().await?;
                log::info!("Agent {} unregistered successfully", agent_id);
                Ok(())
            }
            None => Err(RegistryError::AgentNotFound(agent_id.clone())),
        }
    }

    /// Validates `metadata` and overwrites the stored entry for its agent id.
    ///
    /// # Errors
    ///
    /// Any validation error, or [`RegistryError::AgentNotFound`] when the agent
    /// is not registered (this method never inserts).
    async fn update_agent(&self, metadata: AgentMetadata) -> RegistryResult<()> {
        self.validate_agent_metadata(&metadata)?;
        {
            let mut agents = self.agents.write().await;
            // Single lookup: replace in place if present, otherwise report not found.
            match agents.get_mut(&metadata.agent_id) {
                Some(existing) => *existing = metadata,
                None => return Err(RegistryError::AgentNotFound(metadata.agent_id.clone())),
            }
        }
        self.update_statistics().await?;
        Ok(())
    }

    /// Returns a clone of the agent's metadata, if registered.
    async fn get_agent(&self, agent_id: &AgentPid) -> RegistryResult<Option<AgentMetadata>> {
        let agents = self.agents.read().await;
        Ok(agents.get(agent_id).cloned())
    }

    /// Returns clones of all registered agents, in `HashMap` iteration order.
    async fn list_agents(&self) -> RegistryResult<Vec<AgentMetadata>> {
        let agents = self.agents.read().await;
        Ok(agents.values().cloned().collect())
    }

    /// Runs `query` through the knowledge-graph integration against a snapshot
    /// of all agents. Successful queries update the discovery statistics.
    async fn discover_agents(
        &self,
        query: AgentDiscoveryQuery,
    ) -> RegistryResult<AgentDiscoveryResult> {
        let start_time = std::time::Instant::now();
        let available_agents = self.list_agents().await?;
        let result = self
            .kg_integration
            .discover_agents(query, &available_agents)
            .await?;
        // Fractional milliseconds: `as_millis()` truncated every sub-millisecond
        // query to 0, which made the average meaningless for fast lookups.
        let query_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
        {
            let mut stats = self.statistics.write().await;
            stats.total_discovery_queries += 1;
            // Incremental running mean: avg += (x - avg) / n. For n == 1 this is x.
            let query_count = stats.total_discovery_queries as f64;
            stats.avg_discovery_time_ms +=
                (query_time_ms - stats.avg_discovery_time_ms) / query_count;
            stats.last_updated = chrono::Utc::now();
        }
        Ok(result)
    }

    /// Returns every agent for which `AgentMetadata::has_role(role_id)` is true.
    async fn find_agents_by_role(&self, role_id: &str) -> RegistryResult<Vec<AgentMetadata>> {
        let agents = self.agents.read().await;
        let matching_agents: Vec<AgentMetadata> = agents
            .values()
            .filter(|agent| agent.has_role(role_id))
            .cloned()
            .collect();
        Ok(matching_agents)
    }

    /// Returns every agent for which `AgentMetadata::has_capability(capability_id)` is true.
    async fn find_agents_by_capability(
        &self,
        capability_id: &str,
    ) -> RegistryResult<Vec<AgentMetadata>> {
        let agents = self.agents.read().await;
        let matching_agents: Vec<AgentMetadata> = agents
            .values()
            .filter(|agent| agent.has_capability(capability_id))
            .cloned()
            .collect();
        Ok(matching_agents)
    }

    /// Returns every agent whose `supervisor_id` equals `supervisor_id`.
    async fn find_agents_by_supervisor(
        &self,
        supervisor_id: &SupervisorId,
    ) -> RegistryResult<Vec<AgentMetadata>> {
        let agents = self.agents.read().await;
        let matching_agents: Vec<AgentMetadata> = agents
            .values()
            .filter(|agent| agent.supervisor_id == *supervisor_id)
            .cloned()
            .collect();
        Ok(matching_agents)
    }

    /// Refreshes the agent-derived counters, then returns a snapshot.
    async fn get_statistics(&self) -> RegistryResult<RegistryStatistics> {
        self.update_statistics().await?;
        let stats = self.statistics.read().await;
        Ok(stats.clone())
    }
}
/// Builder for [`KnowledgeGraphAgentRegistry`]. A role graph must be supplied;
/// every other setting starts from its `Default`.
pub struct RegistryBuilder {
    /// Required: `build` returns `RegistryError::System` while this is `None`.
    role_graph: Option<Arc<RoleGraph>>,
    /// Registry limits and background-task settings.
    config: RegistryConfig,
    /// Forwarded to `KnowledgeGraphIntegration::new`.
    automata_config: AutomataConfig,
    /// Forwarded to `KnowledgeGraphIntegration::new`.
    similarity_thresholds: SimilarityThresholds,
}
impl RegistryBuilder {
pub fn new() -> Self {
Self {
role_graph: None,
config: RegistryConfig::default(),
automata_config: AutomataConfig::default(),
similarity_thresholds: SimilarityThresholds::default(),
}
}
pub fn with_role_graph(mut self, role_graph: Arc<RoleGraph>) -> Self {
self.role_graph = Some(role_graph);
self
}
pub fn with_config(mut self, config: RegistryConfig) -> Self {
self.config = config;
self
}
pub fn with_automata_config(mut self, automata_config: AutomataConfig) -> Self {
self.automata_config = automata_config;
self
}
pub fn with_similarity_thresholds(
mut self,
similarity_thresholds: SimilarityThresholds,
) -> Self {
self.similarity_thresholds = similarity_thresholds;
self
}
pub fn build(self) -> RegistryResult<KnowledgeGraphAgentRegistry> {
let role_graph = self
.role_graph
.ok_or_else(|| RegistryError::System("Role graph is required".to_string()))?;
Ok(KnowledgeGraphAgentRegistry::new(
role_graph,
self.config,
self.automata_config,
self.similarity_thresholds,
))
}
}
impl Default for RegistryBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{AgentCapability, AgentRole, CapabilityMetrics};
    use terraphim_types::{RoleName, Thesaurus};

    /// Builds the minimal role graph every test in this module starts from.
    async fn build_role_graph() -> Arc<RoleGraph> {
        let role_name = RoleName::new("test_role");
        let thesaurus = Thesaurus::new("test_thesaurus".to_string());
        let graph = RoleGraph::new(role_name, thesaurus).await.unwrap();
        Arc::new(graph)
    }

    /// Builds a registry from default builder settings around the shared role graph.
    async fn build_default_registry() -> KnowledgeGraphAgentRegistry {
        RegistryBuilder::new()
            .with_role_graph(build_role_graph().await)
            .build()
            .unwrap()
    }

    #[tokio::test]
    async fn test_registry_creation() {
        let registry = KnowledgeGraphAgentRegistry::new(
            build_role_graph().await,
            RegistryConfig::default(),
            AutomataConfig::default(),
            SimilarityThresholds::default(),
        );

        let stats = registry.get_statistics().await.unwrap();
        assert_eq!(stats.total_agents, 0);
    }

    #[tokio::test]
    async fn test_agent_registration() {
        let registry = build_default_registry().await;

        let agent_id = AgentPid::new();
        let role = AgentRole::new(
            "test_role".to_string(),
            "Test Role".to_string(),
            "A test role".to_string(),
        );
        let metadata = AgentMetadata::new(agent_id.clone(), SupervisorId::new(), role);
        registry.register_agent(metadata).await.unwrap();

        let retrieved = registry
            .get_agent(&agent_id)
            .await
            .unwrap()
            .expect("registered agent should be retrievable");
        assert_eq!(retrieved.agent_id, agent_id);

        let stats = registry.get_statistics().await.unwrap();
        assert_eq!(stats.total_agents, 1);
    }

    #[tokio::test]
    async fn test_agent_discovery() {
        let registry = build_default_registry().await;

        let mut role = AgentRole::new(
            "planner".to_string(),
            "Planning Agent".to_string(),
            "Responsible for task planning".to_string(),
        );
        role.knowledge_domains
            .push("project_management".to_string());

        let mut metadata = AgentMetadata::new(AgentPid::new(), SupervisorId::new(), role);
        metadata
            .add_capability(AgentCapability {
                capability_id: "task_planning".to_string(),
                name: "Task Planning".to_string(),
                description: "Plan and organize tasks".to_string(),
                category: "planning".to_string(),
                required_domains: vec!["project_management".to_string()],
                input_types: vec!["requirements".to_string()],
                output_types: vec!["plan".to_string()],
                performance_metrics: CapabilityMetrics::default(),
                dependencies: Vec::new(),
            })
            .unwrap();
        registry.register_agent(metadata).await.unwrap();

        let query = AgentDiscoveryQuery {
            required_roles: vec!["planner".to_string()],
            required_capabilities: vec!["task_planning".to_string()],
            required_domains: vec!["project_management".to_string()],
            task_description: Some("Plan a software project".to_string()),
            min_success_rate: None,
            max_resource_usage: None,
            preferred_tags: Vec::new(),
        };
        let result = registry.discover_agents(query).await.unwrap();

        assert!(!result.matches.is_empty());
        assert!(result.matches[0].match_score > 0.0);
    }

    #[tokio::test]
    async fn test_registry_builder() {
        let config = RegistryConfig {
            max_agents: 100,
            auto_cleanup: false,
            cleanup_interval_secs: 60,
            enable_monitoring: false,
            discovery_cache_ttl_secs: 1800,
        };

        let registry = RegistryBuilder::new()
            .with_role_graph(build_role_graph().await)
            .with_config(config)
            .build()
            .unwrap();

        assert_eq!(registry.config.max_agents, 100);
        assert!(!registry.config.auto_cleanup);
    }
}