echo_agent/agent/subagent/
registry.rs1use crate::agent::SubAgentMap;
7use crate::error::Result;
8use echo_core::agent::Agent;
9use futures::future::BoxFuture;
10use std::collections::{HashMap, HashSet};
11use std::sync::Arc;
12use tokio::sync::{Mutex as AsyncMutex, Notify, RwLock};
13use tracing::{debug, info, warn};
14
15use super::events::SubagentEventBus;
16use super::types::{RegisteredSubagent, SubagentDefinition};
17
18type AgentMap = Arc<RwLock<HashMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>;
19
20pub trait AgentFactory: Send + Sync {
27 fn create(&self) -> BoxFuture<'static, Result<Box<dyn Agent>>>;
32}
33
34pub struct FnAgentFactory<F>
36where
37 F: Fn() -> BoxFuture<'static, Result<Box<dyn Agent>>> + Send + Sync,
38{
39 f: F,
40}
41
42impl<F> FnAgentFactory<F>
43where
44 F: Fn() -> BoxFuture<'static, Result<Box<dyn Agent>>> + Send + Sync,
45{
46 pub fn new(f: F) -> Self {
51 Self { f }
52 }
53}
54
55impl<F> AgentFactory for FnAgentFactory<F>
56where
57 F: Fn() -> BoxFuture<'static, Result<Box<dyn Agent>>> + Send + Sync,
58{
59 fn create(&self) -> BoxFuture<'static, Result<Box<dyn Agent>>> {
60 (self.f)()
61 }
62}
63
64pub struct SubagentRegistry {
73 agents: AgentMap,
75 definitions: Arc<RwLock<HashMap<String, SubagentDefinition>>>,
77 factories: Arc<RwLock<HashMap<String, Arc<dyn AgentFactory>>>>,
79 instantiating: Arc<RwLock<HashSet<String>>>,
81 instantiating_done: Arc<Notify>,
83 event_bus: SubagentEventBus,
85}
86
87impl SubagentRegistry {
88 pub fn new() -> Self {
90 Self {
91 agents: AgentMap::new(RwLock::new(HashMap::new())),
92 definitions: Arc::new(RwLock::new(HashMap::new())),
93 factories: Arc::new(RwLock::new(HashMap::new())),
94 instantiating: Arc::new(RwLock::new(HashSet::new())),
95 instantiating_done: Arc::new(Notify::new()),
96 event_bus: SubagentEventBus::new(),
97 }
98 }
99
100 pub fn with_event_bus(event_bus: SubagentEventBus) -> Self {
102 Self {
103 agents: AgentMap::new(RwLock::new(HashMap::new())),
104 definitions: Arc::new(RwLock::new(HashMap::new())),
105 factories: Arc::new(RwLock::new(HashMap::new())),
106 instantiating: Arc::new(RwLock::new(HashSet::new())),
107 instantiating_done: Arc::new(Notify::new()),
108 event_bus,
109 }
110 }
111
112 pub fn from_subagent_map(map: SubAgentMap) -> Self {
116 let registry = Self::new();
117 if let Ok(agents) = map.read() {
118 for (name, agent) in agents.iter() {
119 let def = SubagentDefinition::simple_sync(name.clone());
120 let agents_map = registry.agents.clone();
124 let definitions_map = registry.definitions.clone();
125 if let Ok(mut a) = agents_map.try_write() {
126 a.insert(name.clone(), agent.clone());
127 }
128 if let Ok(mut d) = definitions_map.try_write() {
129 d.insert(name.clone(), def);
130 }
131 }
132 }
133 registry
134 }
135
136 pub async fn register(&self, def: SubagentDefinition, agent: Box<dyn Agent>) {
140 let name = def.name.clone();
141 info!(subagent = %name, mode = %def.execution_mode, "Registering subagent");
142
143 let arc_agent = Arc::new(AsyncMutex::new(agent));
144 {
145 let mut agents = self.agents.write().await;
146 agents.insert(name.clone(), arc_agent);
147 }
148 {
149 let mut defs = self.definitions.write().await;
150 defs.insert(name.clone(), def);
151 }
152
153 self.event_bus
154 .emit(super::events::SubagentEvent::Registered { name: name.clone() });
155 }
156
157 pub fn register_sync(&self, def: SubagentDefinition, agent: Box<dyn Agent>) -> bool {
162 let name = def.name.clone();
163 let arc_agent = Arc::new(AsyncMutex::new(agent));
164
165 let ok = match self.agents.try_write() {
166 Ok(mut agents) => {
167 agents.insert(name.clone(), arc_agent);
168 true
169 }
170 Err(_) => {
171 warn!(subagent = %name, "Lock contention on agents map, registration deferred");
172 false
173 }
174 };
175
176 if ok {
177 if let Ok(mut defs) = self.definitions.try_write() {
178 defs.insert(name.clone(), def);
179 } else {
180 warn!(subagent = %name, "Lock contention on definitions map");
181 }
182 self.event_bus
183 .emit(super::events::SubagentEvent::Registered { name });
184 }
185
186 ok
187 }
188
189 pub async fn register_factory(&self, def: SubagentDefinition, factory: Arc<dyn AgentFactory>) {
191 let name = def.name.clone();
192 debug!(subagent = %name, "Registering subagent factory");
193
194 {
195 let mut defs = self.definitions.write().await;
196 defs.insert(name.clone(), def);
197 }
198 {
199 let mut facts = self.factories.write().await;
200 facts.insert(name.clone(), factory);
201 }
202 }
203
204 pub async fn remove(&self, name: &str) {
206 {
207 let mut agents = self.agents.write().await;
208 agents.remove(name);
209 }
210 {
211 let mut defs = self.definitions.write().await;
212 defs.remove(name);
213 }
214 {
215 let mut facts = self.factories.write().await;
216 facts.remove(name);
217 }
218
219 self.event_bus
220 .emit(super::events::SubagentEvent::Unregistered {
221 name: name.to_string(),
222 });
223 }
224
225 pub async fn get(&self, name: &str) -> Option<RegisteredSubagent> {
229 let agents = self.agents.read().await;
230 let defs = self.definitions.read().await;
231
232 let definition = defs.get(name).cloned()?;
233 let has_instance = agents.contains_key(name);
234
235 Some(RegisteredSubagent {
236 definition,
237 has_instance,
238 })
239 }
240
241 pub async fn get_agent(&self, name: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
249 use std::time::Duration;
250
251 {
253 let agents = self.agents.read().await;
254 if let Some(agent) = agents.get(name) {
255 return Some(agent.clone());
256 }
257 }
258
259 let factory_arc = {
261 let factories = self.factories.read().await;
262 factories.get(name).cloned()
263 };
264
265 if let Some(factory) = factory_arc {
266 {
268 let mut in_progress = self.instantiating.write().await;
269 if in_progress.contains(name) {
270 debug!(subagent = %name, "Factory instantiation already in progress, waiting");
271 drop(in_progress);
272 let timeout = Duration::from_secs(30);
274 let start = std::time::Instant::now();
275 loop {
276 tokio::time::sleep(Duration::from_millis(50)).await;
277 {
279 let agents = self.agents.read().await;
280 if let Some(agent) = agents.get(name) {
281 return Some(agent.clone());
282 }
283 }
284 {
286 let in_progress = self.instantiating.read().await;
287 if !in_progress.contains(name) {
288 let agents = self.agents.read().await;
290 return agents.get(name).cloned();
291 }
292 }
293 if start.elapsed() > timeout {
294 warn!(subagent = %name, "Timeout waiting for agent instantiation");
295 return None;
296 }
297 }
298 }
299 in_progress.insert(name.to_string());
300 }
301
302 info!(subagent = %name, "Instantiating agent from factory");
303 let result = factory.create().await;
304
305 {
307 let mut in_progress = self.instantiating.write().await;
308 in_progress.remove(name);
309 }
310 self.instantiating_done.notify_waiters();
311
312 match result {
313 Ok(agent) => {
314 let arc_agent = Arc::new(AsyncMutex::new(agent));
315 let mut agents = self.agents.write().await;
316 agents.insert(name.to_string(), arc_agent.clone());
317 drop(agents);
319 let mut facts = self.factories.write().await;
320 facts.remove(name);
321 return Some(arc_agent);
322 }
323 Err(e) => {
324 warn!(subagent = %name, error = %e, "Factory instantiation failed");
325 return None;
326 }
327 }
328 }
329
330 None
331 }
332
333 pub async fn contains(&self, name: &str) -> bool {
335 let defs = self.definitions.read().await;
336 defs.contains_key(name)
337 }
338
339 pub async fn list_available(&self) -> Vec<SubagentDefinition> {
341 let defs = self.definitions.read().await;
342 defs.values().cloned().collect()
343 }
344
345 pub async fn list_by_tag(&self, tag: &str) -> Vec<SubagentDefinition> {
347 let defs = self.definitions.read().await;
348 defs.values()
349 .filter(|d| d.tags.iter().any(|t| t == tag))
350 .cloned()
351 .collect()
352 }
353
354 pub async fn agent_names(&self) -> Vec<String> {
356 let defs = self.definitions.read().await;
357 defs.keys().cloned().collect()
358 }
359
360 pub fn event_bus(&self) -> &SubagentEventBus {
362 &self.event_bus
363 }
364
365 pub fn agents_map(&self) -> AgentMap {
367 self.agents.clone()
368 }
369}
370
371impl Default for SubagentRegistry {
372 fn default() -> Self {
373 Self::new()
374 }
375}
376
377impl Clone for SubagentRegistry {
378 fn clone(&self) -> Self {
379 Self {
380 agents: self.agents.clone(),
381 definitions: self.definitions.clone(),
382 factories: self.factories.clone(),
383 instantiating: self.instantiating.clone(),
384 instantiating_done: self.instantiating_done.clone(),
385 event_bus: self.event_bus.clone(),
386 }
387 }
388}
389
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::MockAgent;

    /// An eagerly registered agent is visible by name and reported as instantiated.
    #[tokio::test]
    async fn test_register_and_get() {
        let registry = SubagentRegistry::new();
        registry
            .register(
                SubagentDefinition::new("researcher", "Researches topics"),
                Box::new(MockAgent::new("researcher")),
            )
            .await;

        assert!(registry.contains("researcher").await);

        let entry = registry.get("researcher").await.unwrap();
        assert_eq!(entry.definition.name, "researcher");
        assert!(entry.has_instance);
    }

    /// Removing a name makes it disappear from the registry.
    #[tokio::test]
    async fn test_remove() {
        let registry = SubagentRegistry::new();
        registry
            .register(
                SubagentDefinition::new("worker", "Worker agent"),
                Box::new(MockAgent::new("worker")),
            )
            .await;
        assert!(registry.contains("worker").await);

        registry.remove("worker").await;

        assert!(!registry.contains("worker").await);
    }

    /// Every registered definition shows up in the full listing.
    #[tokio::test]
    async fn test_list_available() {
        let registry = SubagentRegistry::new();

        let first = MockAgent::new("a1");
        let second = MockAgent::new("a2");
        registry
            .register(SubagentDefinition::new("a1", "Agent 1"), Box::new(first))
            .await;
        registry
            .register(SubagentDefinition::new("a2", "Agent 2"), Box::new(second))
            .await;

        let listed = registry.list_available().await;
        assert_eq!(listed.len(), 2);
    }

    /// Tag filtering returns only the definitions carrying the requested tag.
    #[tokio::test]
    async fn test_list_by_tag() {
        let registry = SubagentRegistry::new();

        let mut research_def = SubagentDefinition::new("researcher", "Research");
        research_def.tags.push("research".into());
        let mut writing_def = SubagentDefinition::new("writer", "Write");
        writing_def.tags.push("writing".into());

        registry
            .register(research_def, Box::new(MockAgent::new("researcher")))
            .await;
        registry
            .register(writing_def, Box::new(MockAgent::new("writer")))
            .await;

        let matches = registry.list_by_tag("research").await;
        assert_eq!(matches.len(), 1);
        assert_eq!(matches[0].name, "researcher");
    }

    /// A registered agent can be fetched as a shared handle.
    #[tokio::test]
    async fn test_get_agent() {
        let registry = SubagentRegistry::new();
        registry
            .register(
                SubagentDefinition::new("a", "A"),
                Box::new(MockAgent::new("a")),
            )
            .await;

        let fetched = registry.get_agent("a").await;
        assert!(fetched.is_some());
    }

    /// The name listing starts empty and reflects registrations.
    #[tokio::test]
    async fn test_agent_names() {
        let registry = SubagentRegistry::new();
        assert!(registry.agent_names().await.is_empty());

        registry
            .register(
                SubagentDefinition::new("x", "X"),
                Box::new(MockAgent::new("x")),
            )
            .await;

        let names = registry.agent_names().await;
        assert_eq!(names, vec!["x"]);
    }

    /// A factory-backed agent has no instance until it is first requested.
    #[tokio::test]
    async fn test_factory_instantiation() {
        let registry = SubagentRegistry::new();

        let lazy_factory = Arc::new(FnAgentFactory::new(|| {
            Box::pin(async {
                Ok(
                    Box::new(MockAgent::new("lazy_agent").with_response("lazy result"))
                        as Box<dyn Agent>,
                )
            })
        }));
        registry
            .register_factory(
                SubagentDefinition::new("lazy_agent", "Lazy agent"),
                lazy_factory,
            )
            .await;

        // Registered, but nothing has been built yet.
        let before = registry.get("lazy_agent").await.unwrap();
        assert_eq!(before.definition.name, "lazy_agent");
        assert!(!before.has_instance);

        // The first request runs the factory.
        let handle = registry.get_agent("lazy_agent").await;
        assert!(handle.is_some());

        let after = registry.get("lazy_agent").await.unwrap();
        assert!(after.has_instance);

        // The instance is the one the factory produced.
        let shared = handle.unwrap();
        let locked = shared.lock().await;
        let output = locked.execute("test").await.unwrap();
        assert_eq!(output, "lazy result");
    }

    /// Entries of a legacy `SubAgentMap` are carried over into the registry.
    #[tokio::test]
    async fn test_from_subagent_map() {
        use crate::agent::SubAgentMap;

        let legacy: SubAgentMap = Arc::new(std::sync::RwLock::new(HashMap::new()));
        {
            let mut entries = legacy.write().unwrap();
            entries.insert(
                "migrated".to_string(),
                Arc::new(AsyncMutex::new(
                    Box::new(MockAgent::new("migrated")) as Box<dyn Agent>
                )),
            );
        }

        let registry = SubagentRegistry::from_subagent_map(legacy);
        assert!(registry.contains("migrated").await);

        let fetched = registry.get_agent("migrated").await;
        assert!(fetched.is_some());
    }
}