use crate::agent::SubAgentMap;
use crate::error::Result;
use echo_core::agent::Agent;
use futures::future::BoxFuture;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::{Mutex as AsyncMutex, Notify, RwLock};
use tracing::{debug, info, warn};
use super::events::SubagentEventBus;
use super::types::{RegisteredSubagent, SubagentDefinition};
type AgentMap = Arc<RwLock<HashMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>;
pub trait AgentFactory: Send + Sync {
fn create(&self) -> BoxFuture<'static, Result<Box<dyn Agent>>>;
}
pub struct FnAgentFactory<F>
where
F: Fn() -> BoxFuture<'static, Result<Box<dyn Agent>>> + Send + Sync,
{
f: F,
}
impl<F> FnAgentFactory<F>
where
F: Fn() -> BoxFuture<'static, Result<Box<dyn Agent>>> + Send + Sync,
{
pub fn new(f: F) -> Self {
Self { f }
}
}
impl<F> AgentFactory for FnAgentFactory<F>
where
F: Fn() -> BoxFuture<'static, Result<Box<dyn Agent>>> + Send + Sync,
{
fn create(&self) -> BoxFuture<'static, Result<Box<dyn Agent>>> {
(self.f)()
}
}
pub struct SubagentRegistry {
agents: AgentMap,
definitions: Arc<RwLock<HashMap<String, SubagentDefinition>>>,
factories: Arc<RwLock<HashMap<String, Arc<dyn AgentFactory>>>>,
instantiating: Arc<RwLock<HashSet<String>>>,
instantiating_done: Arc<Notify>,
event_bus: SubagentEventBus,
}
impl SubagentRegistry {
pub fn new() -> Self {
Self {
agents: AgentMap::new(RwLock::new(HashMap::new())),
definitions: Arc::new(RwLock::new(HashMap::new())),
factories: Arc::new(RwLock::new(HashMap::new())),
instantiating: Arc::new(RwLock::new(HashSet::new())),
instantiating_done: Arc::new(Notify::new()),
event_bus: SubagentEventBus::new(),
}
}
pub fn with_event_bus(event_bus: SubagentEventBus) -> Self {
Self {
agents: AgentMap::new(RwLock::new(HashMap::new())),
definitions: Arc::new(RwLock::new(HashMap::new())),
factories: Arc::new(RwLock::new(HashMap::new())),
instantiating: Arc::new(RwLock::new(HashSet::new())),
instantiating_done: Arc::new(Notify::new()),
event_bus,
}
}
pub fn from_subagent_map(map: SubAgentMap) -> Self {
let registry = Self::new();
if let Ok(agents) = map.read() {
for (name, agent) in agents.iter() {
let def = SubagentDefinition::simple_sync(name.clone());
let agents_map = registry.agents.clone();
let definitions_map = registry.definitions.clone();
if let Ok(mut a) = agents_map.try_write() {
a.insert(name.clone(), agent.clone());
}
if let Ok(mut d) = definitions_map.try_write() {
d.insert(name.clone(), def);
}
}
}
registry
}
pub async fn register(&self, def: SubagentDefinition, agent: Box<dyn Agent>) {
let name = def.name.clone();
info!(subagent = %name, mode = %def.execution_mode, "Registering subagent");
let arc_agent = Arc::new(AsyncMutex::new(agent));
{
let mut agents = self.agents.write().await;
agents.insert(name.clone(), arc_agent);
}
{
let mut defs = self.definitions.write().await;
defs.insert(name.clone(), def);
}
self.event_bus
.emit(super::events::SubagentEvent::Registered { name: name.clone() });
}
pub fn register_sync(&self, def: SubagentDefinition, agent: Box<dyn Agent>) -> bool {
let name = def.name.clone();
let arc_agent = Arc::new(AsyncMutex::new(agent));
let ok = match self.agents.try_write() {
Ok(mut agents) => {
agents.insert(name.clone(), arc_agent);
true
}
Err(_) => {
warn!(subagent = %name, "Lock contention on agents map, registration deferred");
false
}
};
if ok {
if let Ok(mut defs) = self.definitions.try_write() {
defs.insert(name.clone(), def);
} else {
warn!(subagent = %name, "Lock contention on definitions map");
}
self.event_bus
.emit(super::events::SubagentEvent::Registered { name });
}
ok
}
pub async fn register_factory(&self, def: SubagentDefinition, factory: Arc<dyn AgentFactory>) {
let name = def.name.clone();
debug!(subagent = %name, "Registering subagent factory");
{
let mut defs = self.definitions.write().await;
defs.insert(name.clone(), def);
}
{
let mut facts = self.factories.write().await;
facts.insert(name.clone(), factory);
}
}
pub async fn remove(&self, name: &str) {
{
let mut agents = self.agents.write().await;
agents.remove(name);
}
{
let mut defs = self.definitions.write().await;
defs.remove(name);
}
{
let mut facts = self.factories.write().await;
facts.remove(name);
}
self.event_bus
.emit(super::events::SubagentEvent::Unregistered {
name: name.to_string(),
});
}
pub async fn get(&self, name: &str) -> Option<RegisteredSubagent> {
let agents = self.agents.read().await;
let defs = self.definitions.read().await;
let definition = defs.get(name).cloned()?;
let has_instance = agents.contains_key(name);
Some(RegisteredSubagent {
definition,
has_instance,
})
}
pub async fn get_agent(&self, name: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
use std::time::Duration;
{
let agents = self.agents.read().await;
if let Some(agent) = agents.get(name) {
return Some(agent.clone());
}
}
let factory_arc = {
let factories = self.factories.read().await;
factories.get(name).cloned()
};
if let Some(factory) = factory_arc {
{
let mut in_progress = self.instantiating.write().await;
if in_progress.contains(name) {
debug!(subagent = %name, "Factory instantiation already in progress, waiting");
drop(in_progress);
let timeout = Duration::from_secs(30);
let start = std::time::Instant::now();
loop {
tokio::time::sleep(Duration::from_millis(50)).await;
{
let agents = self.agents.read().await;
if let Some(agent) = agents.get(name) {
return Some(agent.clone());
}
}
{
let in_progress = self.instantiating.read().await;
if !in_progress.contains(name) {
let agents = self.agents.read().await;
return agents.get(name).cloned();
}
}
if start.elapsed() > timeout {
warn!(subagent = %name, "Timeout waiting for agent instantiation");
return None;
}
}
}
in_progress.insert(name.to_string());
}
info!(subagent = %name, "Instantiating agent from factory");
let result = factory.create().await;
{
let mut in_progress = self.instantiating.write().await;
in_progress.remove(name);
}
self.instantiating_done.notify_waiters();
match result {
Ok(agent) => {
let arc_agent = Arc::new(AsyncMutex::new(agent));
let mut agents = self.agents.write().await;
agents.insert(name.to_string(), arc_agent.clone());
drop(agents);
let mut facts = self.factories.write().await;
facts.remove(name);
return Some(arc_agent);
}
Err(e) => {
warn!(subagent = %name, error = %e, "Factory instantiation failed");
return None;
}
}
}
None
}
pub async fn contains(&self, name: &str) -> bool {
let defs = self.definitions.read().await;
defs.contains_key(name)
}
pub async fn list_available(&self) -> Vec<SubagentDefinition> {
let defs = self.definitions.read().await;
defs.values().cloned().collect()
}
pub async fn list_by_tag(&self, tag: &str) -> Vec<SubagentDefinition> {
let defs = self.definitions.read().await;
defs.values()
.filter(|d| d.tags.iter().any(|t| t == tag))
.cloned()
.collect()
}
pub async fn agent_names(&self) -> Vec<String> {
let defs = self.definitions.read().await;
defs.keys().cloned().collect()
}
pub fn event_bus(&self) -> &SubagentEventBus {
&self.event_bus
}
pub fn agents_map(&self) -> AgentMap {
self.agents.clone()
}
}
impl Default for SubagentRegistry {
fn default() -> Self {
Self::new()
}
}
impl Clone for SubagentRegistry {
fn clone(&self) -> Self {
Self {
agents: self.agents.clone(),
definitions: self.definitions.clone(),
factories: self.factories.clone(),
instantiating: self.instantiating.clone(),
instantiating_done: self.instantiating_done.clone(),
event_bus: self.event_bus.clone(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::testing::MockAgent;
#[tokio::test]
async fn test_register_and_get() {
let registry = SubagentRegistry::new();
let agent = MockAgent::new("researcher");
let def = SubagentDefinition::new("researcher", "Researches topics");
registry.register(def, Box::new(agent)).await;
assert!(registry.contains("researcher").await);
let registered = registry.get("researcher").await.unwrap();
assert_eq!(registered.definition.name, "researcher");
assert!(registered.has_instance);
}
#[tokio::test]
async fn test_remove() {
let registry = SubagentRegistry::new();
let agent = MockAgent::new("worker");
let def = SubagentDefinition::new("worker", "Worker agent");
registry.register(def, Box::new(agent)).await;
assert!(registry.contains("worker").await);
registry.remove("worker").await;
assert!(!registry.contains("worker").await);
}
#[tokio::test]
async fn test_list_available() {
let registry = SubagentRegistry::new();
let a1 = MockAgent::new("a1");
let a2 = MockAgent::new("a2");
registry
.register(SubagentDefinition::new("a1", "Agent 1"), Box::new(a1))
.await;
registry
.register(SubagentDefinition::new("a2", "Agent 2"), Box::new(a2))
.await;
let available = registry.list_available().await;
assert_eq!(available.len(), 2);
}
#[tokio::test]
async fn test_list_by_tag() {
let registry = SubagentRegistry::new();
let mut def1 = SubagentDefinition::new("researcher", "Research");
def1.tags.push("research".into());
let mut def2 = SubagentDefinition::new("writer", "Write");
def2.tags.push("writing".into());
registry
.register(def1, Box::new(MockAgent::new("researcher")))
.await;
registry
.register(def2, Box::new(MockAgent::new("writer")))
.await;
let research = registry.list_by_tag("research").await;
assert_eq!(research.len(), 1);
assert_eq!(research[0].name, "researcher");
}
#[tokio::test]
async fn test_get_agent() {
let registry = SubagentRegistry::new();
let agent = MockAgent::new("a");
registry
.register(SubagentDefinition::new("a", "A"), Box::new(agent))
.await;
let handle = registry.get_agent("a").await;
assert!(handle.is_some());
}
#[tokio::test]
async fn test_agent_names() {
let registry = SubagentRegistry::new();
assert!(registry.agent_names().await.is_empty());
registry
.register(
SubagentDefinition::new("x", "X"),
Box::new(MockAgent::new("x")),
)
.await;
let names = registry.agent_names().await;
assert_eq!(names, vec!["x"]);
}
#[tokio::test]
async fn test_factory_instantiation() {
let registry = SubagentRegistry::new();
let factory = Arc::new(FnAgentFactory::new(|| {
Box::pin(async {
Ok(
Box::new(MockAgent::new("lazy_agent").with_response("lazy result"))
as Box<dyn Agent>,
)
})
}));
let def = SubagentDefinition::new("lazy_agent", "Lazy agent");
registry.register_factory(def, factory).await;
let registered = registry.get("lazy_agent").await.unwrap();
assert_eq!(registered.definition.name, "lazy_agent");
assert!(!registered.has_instance);
let handle = registry.get_agent("lazy_agent").await;
assert!(handle.is_some());
let registered = registry.get("lazy_agent").await.unwrap();
assert!(registered.has_instance);
let agent = handle.unwrap();
let agent = agent.lock().await;
let result = agent.execute("test").await.unwrap();
assert_eq!(result, "lazy result");
}
#[tokio::test]
async fn test_from_subagent_map() {
use crate::agent::SubAgentMap;
let map: SubAgentMap = Arc::new(std::sync::RwLock::new(HashMap::new()));
{
let mut m = map.write().unwrap();
m.insert(
"migrated".to_string(),
Arc::new(AsyncMutex::new(
Box::new(MockAgent::new("migrated")) as Box<dyn Agent>
)),
);
}
let registry = SubagentRegistry::from_subagent_map(map);
assert!(registry.contains("migrated").await);
let handle = registry.get_agent("migrated").await;
assert!(handle.is_some());
}
}