Skip to main content

echo_agent/agent/subagent/
registry.rs

1//! Subagent registry — discovery, registration, and lifecycle management
2//!
3//! Wraps the existing `SubAgentMap` with declarative definitions, factory support,
4//! and lifecycle events. Backward compatible — `register_agent()` still works.
5
6use crate::agent::SubAgentMap;
7use crate::error::Result;
8use echo_core::agent::Agent;
9use futures::future::BoxFuture;
10use std::collections::{HashMap, HashSet};
11use std::sync::Arc;
12use tokio::sync::{Mutex as AsyncMutex, Notify, RwLock};
13use tracing::{debug, info, warn};
14
15use super::events::SubagentEventBus;
16use super::types::{RegisteredSubagent, SubagentDefinition};
17
18type AgentMap = Arc<RwLock<HashMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>;
19
20// ── Agent Factory ─────────────────────────────────────────────────────────────
21
22/// Factory trait for lazy agent instantiation.
23///
24/// Used when you want to register an agent definition but defer
25/// the actual agent construction until it's first dispatched.
26pub trait AgentFactory: Send + Sync {
27    /// Create an agent instance asynchronously.
28    ///
29    /// # Returns
30    /// A boxed future that resolves to a `Result<Box<dyn Agent>>`.
31    fn create(&self) -> BoxFuture<'static, Result<Box<dyn Agent>>>;
32}
33
34/// Type-erased closure wrapper for `AgentFactory`.
35pub struct FnAgentFactory<F>
36where
37    F: Fn() -> BoxFuture<'static, Result<Box<dyn Agent>>> + Send + Sync,
38{
39    f: F,
40}
41
42impl<F> FnAgentFactory<F>
43where
44    F: Fn() -> BoxFuture<'static, Result<Box<dyn Agent>>> + Send + Sync,
45{
46    /// Create a new function-based agent factory.
47    ///
48    /// # Parameters
49    /// * `f` - Async closure that creates an agent when invoked.
50    pub fn new(f: F) -> Self {
51        Self { f }
52    }
53}
54
55impl<F> AgentFactory for FnAgentFactory<F>
56where
57    F: Fn() -> BoxFuture<'static, Result<Box<dyn Agent>>> + Send + Sync,
58{
59    fn create(&self) -> BoxFuture<'static, Result<Box<dyn Agent>>> {
60        (self.f)()
61    }
62}
63
64// ── Subagent Registry ─────────────────────────────────────────────────────────
65
66/// Registry for subagent definitions and instances.
67///
68/// Wraps the existing `SubAgentMap` and adds:
69/// - Definition-based lookup
70/// - Factory support for lazy instantiation
71/// - Lifecycle events
72pub struct SubagentRegistry {
73    /// Agent instances (compatible with existing SubAgentMap).
74    agents: AgentMap,
75    /// Definitions for each registered agent.
76    definitions: Arc<RwLock<HashMap<String, SubagentDefinition>>>,
77    /// Factory functions for lazy instantiation.
78    factories: Arc<RwLock<HashMap<String, Arc<dyn AgentFactory>>>>,
79    /// Names currently being instantiated (prevents double-creation races).
80    instantiating: Arc<RwLock<HashSet<String>>>,
81    /// Notifier for waiters on factory instantiation completion.
82    instantiating_done: Arc<Notify>,
83    /// Event bus for lifecycle events.
84    event_bus: SubagentEventBus,
85}
86
87impl SubagentRegistry {
88    /// Create an empty registry.
89    pub fn new() -> Self {
90        Self {
91            agents: AgentMap::new(RwLock::new(HashMap::new())),
92            definitions: Arc::new(RwLock::new(HashMap::new())),
93            factories: Arc::new(RwLock::new(HashMap::new())),
94            instantiating: Arc::new(RwLock::new(HashSet::new())),
95            instantiating_done: Arc::new(Notify::new()),
96            event_bus: SubagentEventBus::new(),
97        }
98    }
99
100    /// Create with a specific event bus.
101    pub fn with_event_bus(event_bus: SubagentEventBus) -> Self {
102        Self {
103            agents: AgentMap::new(RwLock::new(HashMap::new())),
104            definitions: Arc::new(RwLock::new(HashMap::new())),
105            factories: Arc::new(RwLock::new(HashMap::new())),
106            instantiating: Arc::new(RwLock::new(HashSet::new())),
107            instantiating_done: Arc::new(Notify::new()),
108            event_bus,
109        }
110    }
111
112    /// Migrate from an existing `SubAgentMap` (backward compatible).
113    ///
114    /// Each agent gets a default Sync-mode `BuiltIn` definition.
115    pub fn from_subagent_map(map: SubAgentMap) -> Self {
116        let registry = Self::new();
117        if let Ok(agents) = map.read() {
118            for (name, agent) in agents.iter() {
119                let def = SubagentDefinition::simple_sync(name.clone());
120                // We can't do async writes here, so we use blocking inserts
121                // into the Arc<RwLock> maps. Since we just created the registry,
122                // there are no other references yet.
123                let agents_map = registry.agents.clone();
124                let definitions_map = registry.definitions.clone();
125                if let Ok(mut a) = agents_map.try_write() {
126                    a.insert(name.clone(), agent.clone());
127                }
128                if let Ok(mut d) = definitions_map.try_write() {
129                    d.insert(name.clone(), def);
130                }
131            }
132        }
133        registry
134    }
135
136    // ── Registration ──────────────────────────────────────────────────────
137
138    /// Register a pre-built agent with its definition.
139    pub async fn register(&self, def: SubagentDefinition, agent: Box<dyn Agent>) {
140        let name = def.name.clone();
141        info!(subagent = %name, mode = %def.execution_mode, "Registering subagent");
142
143        let arc_agent = Arc::new(AsyncMutex::new(agent));
144        {
145            let mut agents = self.agents.write().await;
146            agents.insert(name.clone(), arc_agent);
147        }
148        {
149            let mut defs = self.definitions.write().await;
150            defs.insert(name.clone(), def);
151        }
152
153        self.event_bus
154            .emit(super::events::SubagentEvent::Registered { name: name.clone() });
155    }
156
157    /// Sync registration — uses `try_write` to avoid `block_on` deadlock.
158    ///
159    /// Use this from synchronous contexts (e.g., builder pattern, `main()`).
160    /// Falls back to logging a warning if locks are contended.
161    pub fn register_sync(&self, def: SubagentDefinition, agent: Box<dyn Agent>) -> bool {
162        let name = def.name.clone();
163        let arc_agent = Arc::new(AsyncMutex::new(agent));
164
165        let ok = match self.agents.try_write() {
166            Ok(mut agents) => {
167                agents.insert(name.clone(), arc_agent);
168                true
169            }
170            Err(_) => {
171                warn!(subagent = %name, "Lock contention on agents map, registration deferred");
172                false
173            }
174        };
175
176        if ok {
177            if let Ok(mut defs) = self.definitions.try_write() {
178                defs.insert(name.clone(), def);
179            } else {
180                warn!(subagent = %name, "Lock contention on definitions map");
181            }
182            self.event_bus
183                .emit(super::events::SubagentEvent::Registered { name });
184        }
185
186        ok
187    }
188
189    /// Register a definition with a factory for lazy instantiation.
190    pub async fn register_factory(&self, def: SubagentDefinition, factory: Arc<dyn AgentFactory>) {
191        let name = def.name.clone();
192        debug!(subagent = %name, "Registering subagent factory");
193
194        {
195            let mut defs = self.definitions.write().await;
196            defs.insert(name.clone(), def);
197        }
198        {
199            let mut facts = self.factories.write().await;
200            facts.insert(name.clone(), factory);
201        }
202    }
203
204    /// Remove a subagent by name.
205    pub async fn remove(&self, name: &str) {
206        {
207            let mut agents = self.agents.write().await;
208            agents.remove(name);
209        }
210        {
211            let mut defs = self.definitions.write().await;
212            defs.remove(name);
213        }
214        {
215            let mut facts = self.factories.write().await;
216            facts.remove(name);
217        }
218
219        self.event_bus
220            .emit(super::events::SubagentEvent::Unregistered {
221                name: name.to_string(),
222            });
223    }
224
225    // ── Lookup ────────────────────────────────────────────────────────────
226
227    /// Look up a registered subagent. Returns None if not found.
228    pub async fn get(&self, name: &str) -> Option<RegisteredSubagent> {
229        let agents = self.agents.read().await;
230        let defs = self.definitions.read().await;
231
232        let definition = defs.get(name).cloned()?;
233        let has_instance = agents.contains_key(name);
234
235        Some(RegisteredSubagent {
236            definition,
237            has_instance,
238        })
239    }
240
241    /// Get the agent instance for immediate execution.
242    ///
243    /// If the agent was registered via factory and not yet instantiated,
244    /// this will create it on demand.
245    ///
246    /// Uses a loop with timeout to handle concurrent instantiation attempts
247    /// rather than relying on a single `notified()` call.
248    pub async fn get_agent(&self, name: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
249        use std::time::Duration;
250
251        // Check if already instantiated
252        {
253            let agents = self.agents.read().await;
254            if let Some(agent) = agents.get(name) {
255                return Some(agent.clone());
256            }
257        }
258
259        // Try factory
260        let factory_arc = {
261            let factories = self.factories.read().await;
262            factories.get(name).cloned()
263        };
264
265        if let Some(factory) = factory_arc {
266            // Prevent concurrent double-instantiation
267            {
268                let mut in_progress = self.instantiating.write().await;
269                if in_progress.contains(name) {
270                    debug!(subagent = %name, "Factory instantiation already in progress, waiting");
271                    drop(in_progress);
272                    // Loop-check with timeout instead of single notified()
273                    let timeout = Duration::from_secs(30);
274                    let start = std::time::Instant::now();
275                    loop {
276                        tokio::time::sleep(Duration::from_millis(50)).await;
277                        // Check if agent has been created
278                        {
279                            let agents = self.agents.read().await;
280                            if let Some(agent) = agents.get(name) {
281                                return Some(agent.clone());
282                            }
283                        }
284                        // Check if instantiation failed (removed from instantiating but not in agents)
285                        {
286                            let in_progress = self.instantiating.read().await;
287                            if !in_progress.contains(name) {
288                                // Instantiation finished (success or failure), re-check agents
289                                let agents = self.agents.read().await;
290                                return agents.get(name).cloned();
291                            }
292                        }
293                        if start.elapsed() > timeout {
294                            warn!(subagent = %name, "Timeout waiting for agent instantiation");
295                            return None;
296                        }
297                    }
298                }
299                in_progress.insert(name.to_string());
300            }
301
302            info!(subagent = %name, "Instantiating agent from factory");
303            let result = factory.create().await;
304
305            // Clean up instantiating guard and notify waiters
306            {
307                let mut in_progress = self.instantiating.write().await;
308                in_progress.remove(name);
309            }
310            self.instantiating_done.notify_waiters();
311
312            match result {
313                Ok(agent) => {
314                    let arc_agent = Arc::new(AsyncMutex::new(agent));
315                    let mut agents = self.agents.write().await;
316                    agents.insert(name.to_string(), arc_agent.clone());
317                    // Remove factory after successful instantiation
318                    drop(agents);
319                    let mut facts = self.factories.write().await;
320                    facts.remove(name);
321                    return Some(arc_agent);
322                }
323                Err(e) => {
324                    warn!(subagent = %name, error = %e, "Factory instantiation failed");
325                    return None;
326                }
327            }
328        }
329
330        None
331    }
332
333    /// Check if a subagent is registered.
334    pub async fn contains(&self, name: &str) -> bool {
335        let defs = self.definitions.read().await;
336        defs.contains_key(name)
337    }
338
339    /// List all available subagent definitions.
340    pub async fn list_available(&self) -> Vec<SubagentDefinition> {
341        let defs = self.definitions.read().await;
342        defs.values().cloned().collect()
343    }
344
345    /// List subagent definitions matching a tag.
346    pub async fn list_by_tag(&self, tag: &str) -> Vec<SubagentDefinition> {
347        let defs = self.definitions.read().await;
348        defs.values()
349            .filter(|d| d.tags.iter().any(|t| t == tag))
350            .cloned()
351            .collect()
352    }
353
354    /// Get agent names for tool description (convenience).
355    pub async fn agent_names(&self) -> Vec<String> {
356        let defs = self.definitions.read().await;
357        defs.keys().cloned().collect()
358    }
359
360    /// Get the event bus reference.
361    pub fn event_bus(&self) -> &SubagentEventBus {
362        &self.event_bus
363    }
364
365    /// Get the underlying agents map (for backward compat).
366    pub fn agents_map(&self) -> AgentMap {
367        self.agents.clone()
368    }
369}
370
371impl Default for SubagentRegistry {
372    fn default() -> Self {
373        Self::new()
374    }
375}
376
377impl Clone for SubagentRegistry {
378    fn clone(&self) -> Self {
379        Self {
380            agents: self.agents.clone(),
381            definitions: self.definitions.clone(),
382            factories: self.factories.clone(),
383            instantiating: self.instantiating.clone(),
384            instantiating_done: self.instantiating_done.clone(),
385            event_bus: self.event_bus.clone(),
386        }
387    }
388}
389
390#[cfg(test)]
391mod tests {
392    use super::*;
393    use crate::testing::MockAgent;
394
395    #[tokio::test]
396    async fn test_register_and_get() {
397        let registry = SubagentRegistry::new();
398        let agent = MockAgent::new("researcher");
399        let def = SubagentDefinition::new("researcher", "Researches topics");
400
401        registry.register(def, Box::new(agent)).await;
402
403        assert!(registry.contains("researcher").await);
404        let registered = registry.get("researcher").await.unwrap();
405        assert_eq!(registered.definition.name, "researcher");
406        assert!(registered.has_instance);
407    }
408
409    #[tokio::test]
410    async fn test_remove() {
411        let registry = SubagentRegistry::new();
412        let agent = MockAgent::new("worker");
413        let def = SubagentDefinition::new("worker", "Worker agent");
414
415        registry.register(def, Box::new(agent)).await;
416        assert!(registry.contains("worker").await);
417
418        registry.remove("worker").await;
419        assert!(!registry.contains("worker").await);
420    }
421
422    #[tokio::test]
423    async fn test_list_available() {
424        let registry = SubagentRegistry::new();
425
426        let a1 = MockAgent::new("a1");
427        let a2 = MockAgent::new("a2");
428
429        registry
430            .register(SubagentDefinition::new("a1", "Agent 1"), Box::new(a1))
431            .await;
432        registry
433            .register(SubagentDefinition::new("a2", "Agent 2"), Box::new(a2))
434            .await;
435
436        let available = registry.list_available().await;
437        assert_eq!(available.len(), 2);
438    }
439
440    #[tokio::test]
441    async fn test_list_by_tag() {
442        let registry = SubagentRegistry::new();
443
444        let mut def1 = SubagentDefinition::new("researcher", "Research");
445        def1.tags.push("research".into());
446        let mut def2 = SubagentDefinition::new("writer", "Write");
447        def2.tags.push("writing".into());
448
449        registry
450            .register(def1, Box::new(MockAgent::new("researcher")))
451            .await;
452        registry
453            .register(def2, Box::new(MockAgent::new("writer")))
454            .await;
455
456        let research = registry.list_by_tag("research").await;
457        assert_eq!(research.len(), 1);
458        assert_eq!(research[0].name, "researcher");
459    }
460
461    #[tokio::test]
462    async fn test_get_agent() {
463        let registry = SubagentRegistry::new();
464        let agent = MockAgent::new("a");
465        registry
466            .register(SubagentDefinition::new("a", "A"), Box::new(agent))
467            .await;
468
469        let handle = registry.get_agent("a").await;
470        assert!(handle.is_some());
471    }
472
473    #[tokio::test]
474    async fn test_agent_names() {
475        let registry = SubagentRegistry::new();
476        assert!(registry.agent_names().await.is_empty());
477
478        registry
479            .register(
480                SubagentDefinition::new("x", "X"),
481                Box::new(MockAgent::new("x")),
482            )
483            .await;
484
485        let names = registry.agent_names().await;
486        assert_eq!(names, vec!["x"]);
487    }
488
489    #[tokio::test]
490    async fn test_factory_instantiation() {
491        let registry = SubagentRegistry::new();
492
493        let factory = Arc::new(FnAgentFactory::new(|| {
494            Box::pin(async {
495                Ok(
496                    Box::new(MockAgent::new("lazy_agent").with_response("lazy result"))
497                        as Box<dyn Agent>,
498                )
499            })
500        }));
501
502        let def = SubagentDefinition::new("lazy_agent", "Lazy agent");
503        registry.register_factory(def, factory).await;
504
505        // Should be registered but not yet instantiated
506        let registered = registry.get("lazy_agent").await.unwrap();
507        assert_eq!(registered.definition.name, "lazy_agent");
508        assert!(!registered.has_instance);
509
510        // get_agent should trigger factory instantiation
511        let handle = registry.get_agent("lazy_agent").await;
512        assert!(handle.is_some());
513
514        // Now it should show as having an instance
515        let registered = registry.get("lazy_agent").await.unwrap();
516        assert!(registered.has_instance);
517
518        // Verify the agent actually works
519        let agent = handle.unwrap();
520        let agent = agent.lock().await;
521        let result = agent.execute("test").await.unwrap();
522        assert_eq!(result, "lazy result");
523    }
524
525    #[tokio::test]
526    async fn test_from_subagent_map() {
527        use crate::agent::SubAgentMap;
528
529        let map: SubAgentMap = Arc::new(std::sync::RwLock::new(HashMap::new()));
530        {
531            let mut m = map.write().unwrap();
532            m.insert(
533                "migrated".to_string(),
534                Arc::new(AsyncMutex::new(
535                    Box::new(MockAgent::new("migrated")) as Box<dyn Agent>
536                )),
537            );
538        }
539
540        let registry = SubagentRegistry::from_subagent_map(map);
541        assert!(registry.contains("migrated").await);
542
543        let handle = registry.get_agent("migrated").await;
544        assert!(handle.is_some());
545    }
546}