Skip to main content

modular_agent_core/
modular_agent.rs

1#[cfg(feature = "file")]
2use std::sync::{Arc, Mutex};
3
4use serde_json::Value;
5use tokio::sync::{Mutex as AsyncMutex, broadcast, broadcast::error::RecvError, mpsc};
6
7use crate::FnvIndexMap;
8use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
9use crate::config::{AgentConfigs, AgentConfigsMap};
10use crate::context::AgentContext;
11use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
12use crate::error::AgentError;
13use crate::id::{new_id, update_ids};
14use crate::message::{self, AgentEventMessage};
15use crate::preset::{Preset, PresetInfo};
16use crate::registry;
17use crate::spec::{AgentSpec, ConnectionSpec, PresetSpec};
18use crate::value::AgentValue;
19
20const MESSAGE_LIMIT: usize = 1024;
21const EVENT_CHANNEL_CAPACITY: usize = 256;
22
23/// The central orchestrator for the modular agent system.
24///
25/// `ModularAgent` manages agent lifecycle, connections, and message routing.
26/// It maintains agent instances, connection maps, and handles [`ModularAgentEvent`]s.
27///
28/// # Lifecycle
29///
30/// 1. [`init()`](Self::init) - Create instance and register agent definitions
31/// 2. [`ready()`](Self::ready) - Start the internal message loop
32/// 3. Load presets with [`open_preset_from_file()`](Self::open_preset_from_file) or [`add_preset()`](Self::add_preset)
33/// 4. [`start_preset()`](Self::start_preset) - Start agents in a preset
34/// 5. Interact via [`write_external_input()`](Self::write_external_input) and [`subscribe()`](Self::subscribe)
35/// 6. [`stop_preset()`](Self::stop_preset) - Stop agents
36/// 7. [`quit()`](Self::quit) - Shut down
37///
38/// # Example
39///
40/// ```rust,no_run
41/// use modular_agent_core::{ModularAgent, AgentValue, ModularAgentEvent};
42///
43/// #[tokio::main]
44/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
45///     // Initialize and start
46///     let ma = ModularAgent::init()?;
47///     ma.ready().await?;
48///
49///     // Load a preset
50///     let preset_id = ma.open_preset_from_file("my_preset.json", None).await?;
51///     ma.start_preset(&preset_id).await?;
52///
53///     // Send external input
54///     ma.write_external_input("input".to_string(), AgentValue::string("hello")).await?;
55///
56///     // Cleanup
57///     ma.stop_preset(&preset_id).await?;
58///     ma.quit();
59///     Ok(())
60/// }
61/// ```
62#[derive(Clone)]
63pub struct ModularAgent {
64    // agent id -> agent
65    pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,
66
67    // agent id -> sender
68    pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, mpsc::Sender<AgentMessage>>>>,
69
70    // channel name -> [external input agent id]
71    pub(crate) external_input_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,
72
73    // channel name -> value
74    pub(crate) external_values: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,
75
76    // source agent id -> [target agent id / source handle / target handle]
77    pub(crate) connections: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,
78
79    // agent def name -> agent definition
80    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
81
82    // presets (preset id -> preset)
83    pub(crate) presets: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Preset>>>>>,
84
85    // agent def name -> config
86    pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,
87
88    // message sender
89    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
90
91    // observers
92    pub(crate) observers: broadcast::Sender<ModularAgentEvent>,
93}
94
95impl ModularAgent {
96    /// Create a new `ModularAgent` instance without registering agents.
97    ///
98    /// For most use cases, prefer [`init()`](Self::init) which also registers
99    /// all agent definitions from the inventory.
100    pub fn new() -> Self {
101        let (tx, _rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
102        Self {
103            agents: Default::default(),
104            agent_txs: Default::default(),
105            external_input_agents: Default::default(),
106            external_values: Default::default(),
107            connections: Default::default(),
108            defs: Default::default(),
109            presets: Default::default(),
110            global_configs_map: Default::default(),
111            tx: Arc::new(Mutex::new(None)),
112            observers: tx,
113        }
114    }
115
116    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
117        self.tx
118            .lock()
119            .unwrap()
120            .clone()
121            .ok_or(AgentError::TxNotInitialized)
122    }
123
124    /// Initialize a new `ModularAgent` instance.
125    ///
126    /// This creates a new `ModularAgent` and registers all available agent definitions
127    /// from the inventory. Call [`ready`](Self::ready) after this to start the message loop.
128    ///
129    /// # Example
130    ///
131    /// ```rust,no_run
132    /// use modular_agent_core::ModularAgent;
133    ///
134    /// let ma = ModularAgent::init().unwrap();
135    /// ```
136    pub fn init() -> Result<Self, AgentError> {
137        let ma = Self::new();
138        ma.register_agents();
139        Ok(ma)
140    }
141
142    fn register_agents(&self) {
143        registry::register_inventory_agents(self);
144    }
145
146    /// Start the internal message loop.
147    ///
148    /// This must be called after [`init`](Self::init) before loading presets or sending messages.
149    /// The message loop handles routing between agents and external output events.
150    ///
151    /// # Example
152    ///
153    /// ```rust,no_run
154    /// use modular_agent_core::ModularAgent;
155    ///
156    /// #[tokio::main]
157    /// async fn main() {
158    ///     let ma = ModularAgent::init().unwrap();
159    ///     ma.ready().await.unwrap(); // Start the message loop
160    /// }
161    /// ```
162    pub async fn ready(&self) -> Result<(), AgentError> {
163        self.spawn_message_loop().await?;
164        Ok(())
165    }
166
167    /// Shut down the `ModularAgent`.
168    ///
169    /// This stops the internal message loop. Call [`stop_preset`](Self::stop_preset)
170    /// for each running preset before calling this method for graceful shutdown.
171    ///
172    /// # Example
173    ///
174    /// ```rust,no_run
175    /// # use modular_agent_core::ModularAgent;
176    /// # async fn example(ma: ModularAgent, preset_id: &str) {
177    /// // Stop all presets first
178    /// ma.stop_preset(preset_id).await.unwrap();
179    /// // Then quit
180    /// ma.quit();
181    /// # }
182    /// ```
183    pub fn quit(&self) {
184        let mut tx_lock = self.tx.lock().unwrap();
185        *tx_lock = None;
186    }
187
188    // Preset management
189
190    /// Create a new empty preset.
191    ///
192    /// Returns the id of the new preset. The preset is created with default settings
193    /// and contains no agents or connections initially.
194    pub fn new_preset(&self) -> Result<String, AgentError> {
195        let spec = PresetSpec::default();
196        let id = self.add_preset(spec)?;
197        Ok(id)
198    }
199
200    /// Create a new empty preset with the given name.
201    ///
202    /// Returns the id of the new preset.
203    pub fn new_preset_with_name(&self, name: String) -> Result<String, AgentError> {
204        let spec = PresetSpec::default();
205        let id = self.add_preset_with_name(spec, name)?;
206        Ok(id)
207    }
208
209    /// Get a preset by id.
210    ///
211    /// Returns `None` if no preset exists with the given id.
212    pub fn get_preset(&self, id: &str) -> Option<Arc<AsyncMutex<Preset>>> {
213        let presets = self.presets.lock().unwrap();
214        presets.get(id).cloned()
215    }
216
217    /// Add a new preset with the given spec, and returns the id of the new preset.
218    ///
219    /// The ids of the given spec, including agents and connections, are changed to new unique ids.
220    /// This allows the same spec to be added multiple times without id conflicts.
221    pub fn add_preset(&self, spec: PresetSpec) -> Result<String, AgentError> {
222        self.add_preset_raw(spec, None)
223    }
224
225    /// Add a new preset with the given name and spec, and returns the id of the new preset.
226    ///
227    /// The ids of the given spec, including agents and connections, are changed to new unique ids.
228    pub fn add_preset_with_name(
229        &self,
230        spec: PresetSpec,
231        name: String,
232    ) -> Result<String, AgentError> {
233        self.add_preset_raw(spec, Some(name))
234    }
235
236    fn add_preset_raw(&self, spec: PresetSpec, name: Option<String>) -> Result<String, AgentError> {
237        let mut preset = Preset::new(spec);
238        if let Some(name) = name {
239            preset.set_name(name);
240        }
241        let id = preset.id().to_string();
242
243        // add agents
244        for agent in &preset.spec().agents {
245            if let Err(e) = self.add_agent_internal(id.clone(), agent.clone()) {
246                log::error!("Failed to add_agent {}: {}", agent.id, e);
247            }
248        }
249
250        // add connections
251        for connection in &preset.spec().connections {
252            self.add_connection_internal(connection.clone())
253                .unwrap_or_else(|e| {
254                    log::error!("Failed to add_connection {}: {}", connection.source, e);
255                });
256        }
257
258        // add the given preset into presets
259        let mut presets = self.presets.lock().unwrap();
260        if presets.contains_key(&id) {
261            return Err(AgentError::DuplicateId(id.into()));
262        }
263        presets.insert(id.to_string(), Arc::new(AsyncMutex::new(preset)));
264
265        Ok(id)
266    }
267
268    /// Rename a preset by id.
269    pub async fn rename_preset(&self, id: &str, new_name: String) -> Result<(), AgentError> {
270        let preset = self
271            .get_preset(id)
272            .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
273        let mut preset = preset.lock().await;
274        preset.set_name(new_name);
275        Ok(())
276    }
277
278    /// Remove a preset by id.
279    ///
280    /// Stops the preset if running, then removes all associated agents and connections.
281    pub async fn remove_preset(&self, id: &str) -> Result<(), AgentError> {
282        let preset = self
283            .get_preset(id)
284            .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
285
286        let mut preset = preset.lock().await;
287        preset.stop(self).await.unwrap_or_else(|e| {
288            log::error!("Failed to stop preset {}: {}", id, e);
289        });
290
291        // Remove all agents and connections associated with the preset
292        for agent in &preset.spec().agents {
293            self.remove_agent_internal(&agent.id)
294                .await
295                .unwrap_or_else(|e| {
296                    log::error!("Failed to remove_agent {}: {}", agent.id, e);
297                });
298        }
299        for connection in &preset.spec().connections {
300            self.remove_connection_internal(connection);
301        }
302
303        // Drop the preset lock before modifying the presets map
304        drop(preset);
305
306        // Remove the preset entry from the map
307        {
308            let mut presets = self.presets.lock().unwrap();
309            presets.swap_remove(id);
310        }
311
312        Ok(())
313    }
314
315    /// Start a preset by id.
316    ///
317    /// This starts all agents in the preset, enabling message flow between them.
318    /// Each agent's [`start()`](crate::AsAgent::start) method is called.
319    pub async fn start_preset(&self, id: &str) -> Result<(), AgentError> {
320        let preset = self
321            .get_preset(id)
322            .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
323        let mut preset = preset.lock().await;
324        preset.start(self).await?;
325
326        Ok(())
327    }
328
329    /// Stop a preset by id.
330    ///
331    /// This stops all agents in the preset, terminating message processing.
332    /// Each agent's [`stop()`](crate::AsAgent::stop) method is called.
333    pub async fn stop_preset(&self, id: &str) -> Result<(), AgentError> {
334        let preset = self
335            .get_preset(id)
336            .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
337        let mut preset = preset.lock().await;
338        preset.stop(self).await?;
339
340        Ok(())
341    }
342
343    /// Open a preset from a JSON file.
344    ///
345    /// Reads the file, parses the JSON as a [`PresetSpec`], and adds it to the system.
346    /// Optionally provide a custom name for the preset.
347    ///
348    /// # Arguments
349    ///
350    /// * `path` - Path to the JSON preset file
351    /// * `name` - Optional custom name for the preset
352    #[cfg(feature = "file")]
353    pub async fn open_preset_from_file(
354        &self,
355        path: &str,
356        name: Option<String>,
357    ) -> Result<String, AgentError> {
358        let json_str =
359            std::fs::read_to_string(path).map_err(|e| AgentError::IoError(e.to_string()))?;
360        let spec = PresetSpec::from_json(&json_str)?;
361        let id = self.add_preset_raw(spec, name)?;
362        Ok(id)
363    }
364
365    /// Save a preset to a JSON file.
366    ///
367    /// Serializes the current preset state (including agent configs) to JSON
368    /// and writes it to the specified path.
369    #[cfg(feature = "file")]
370    pub async fn save_preset(&self, id: &str, path: &str) -> Result<(), AgentError> {
371        let Some(preset_spec) = self.get_preset_spec(id).await else {
372            return Err(AgentError::PresetNotFound(id.to_string()));
373        };
374        let json_str = preset_spec.to_json()?;
375        std::fs::write(path, json_str).map_err(|e| AgentError::IoError(e.to_string()))?;
376        Ok(())
377    }
378
379    // PresetSpec
380
381    /// Get the current preset spec by id.
382    pub async fn get_preset_spec(&self, id: &str) -> Option<PresetSpec> {
383        let Some(preset) = self.get_preset(id) else {
384            return None;
385        };
386        let mut preset_spec = {
387            let preset = preset.lock().await;
388            preset.spec().clone()
389        };
390
391        // collect current agent specs in the preset
392        let mut agent_specs = Vec::new();
393        for agent in &preset_spec.agents {
394            if let Some(spec) = self.get_agent_spec(&agent.id).await {
395                agent_specs.push(spec);
396            }
397        }
398        preset_spec.agents = agent_specs;
399
400        // No need to change connections
401
402        Some(preset_spec)
403    }
404
405    /// Update the preset spec
406    pub async fn update_preset_spec(&self, id: &str, value: &Value) -> Result<(), AgentError> {
407        let preset = self
408            .get_preset(id)
409            .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
410        let mut preset = preset.lock().await;
411        preset.update_spec(value)?;
412        Ok(())
413    }
414
415    // PresetInfo
416
417    /// Get info of the preset by id.
418    pub async fn get_preset_info(&self, id: &str) -> Option<PresetInfo> {
419        let Some(preset) = self.get_preset(id) else {
420            return None;
421        };
422        Some(PresetInfo::from(&*preset.lock().await))
423    }
424
425    /// Get infos of all presets.
426    pub async fn get_preset_infos(&self) -> Vec<PresetInfo> {
427        let presets = {
428            let presets = self.presets.lock().unwrap();
429            presets.values().cloned().collect::<Vec<_>>()
430        };
431        let mut preset_infos = Vec::new();
432        for preset in presets {
433            let preset_guard = preset.lock().await;
434            preset_infos.push(PresetInfo::from(&*preset_guard));
435        }
436        preset_infos
437    }
438
439    // Agents
440
441    /// Register an agent definition.
442    ///
443    /// This makes the agent type available for use in presets. The definition
444    /// includes metadata (title, category), input/output ports, and config specs.
445    ///
446    /// Note: Agents using `#[modular_agent]` macro are registered automatically via inventory.
447    pub fn register_agent_definiton(&self, def: AgentDefinition) {
448        let def_name = def.name.clone();
449        let def_global_configs = def.global_configs.clone();
450
451        let mut defs = self.defs.lock().unwrap();
452        defs.insert(def.name.clone(), def);
453
454        // if there is a global config, set it
455        if let Some(def_global_configs) = def_global_configs {
456            let mut new_configs = AgentConfigs::default();
457            for (key, config_entry) in def_global_configs.iter() {
458                new_configs.set(key.clone(), config_entry.value.clone());
459            }
460            self.set_global_configs(def_name, new_configs);
461        }
462    }
463
464    /// Get all registered agent definitions.
465    ///
466    /// Returns a map of definition name to [`AgentDefinition`].
467    pub fn get_agent_definitions(&self) -> AgentDefinitions {
468        let defs = self.defs.lock().unwrap();
469        defs.clone()
470    }
471
472    /// Get an agent definition by name.
473    ///
474    /// The name is typically in the format `module::path::StructName`.
475    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
476        let defs = self.defs.lock().unwrap();
477        defs.get(def_name).cloned()
478    }
479
480    /// Get the config specs of an agent definition by name.
481    pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
482        let defs = self.defs.lock().unwrap();
483        let Some(def) = defs.get(def_name) else {
484            return None;
485        };
486        def.configs.clone()
487    }
488
489    /// Get the agent spec by id.
490    pub async fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
491        let agent = {
492            let agents = self.agents.lock().unwrap();
493            let Some(agent) = agents.get(agent_id) else {
494                return None;
495            };
496            agent.clone()
497        };
498        let agent = agent.lock().await;
499        Some(agent.spec().clone())
500    }
501
502    /// Update the agent spec by id.
503    pub async fn update_agent_spec(&self, agent_id: &str, value: &Value) -> Result<(), AgentError> {
504        let agent = {
505            let agents = self.agents.lock().unwrap();
506            let Some(agent) = agents.get(agent_id) else {
507                return Err(AgentError::AgentNotFound(agent_id.to_string()));
508            };
509            agent.clone()
510        };
511        let mut agent = agent.lock().await;
512        agent.update_spec(value)?;
513        Ok(())
514    }
515
516    /// Create a new agent spec from the given agent definition name.
517    pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
518        let def = self
519            .get_agent_definition(def_name)
520            .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
521        Ok(def.to_spec())
522    }
523
524    /// Add an agent to the specified preset.
525    ///
526    /// Creates a new agent instance from the given spec and adds it to the preset.
527    /// Returns the id of the newly created agent. The agent is not started automatically;
528    /// call [`start_preset`](Self::start_preset) or [`start_agent`](Self::start_agent) to start it.
529    pub async fn add_agent(
530        &self,
531        preset_id: String,
532        mut spec: AgentSpec,
533    ) -> Result<String, AgentError> {
534        let preset = self
535            .get_preset(&preset_id)
536            .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
537
538        let id = new_id();
539        spec.id = id.clone();
540        self.add_agent_internal(preset_id, spec.clone())?;
541
542        let mut preset = preset.lock().await;
543        preset.add_agent(spec.clone());
544
545        Ok(id)
546    }
547
548    fn add_agent_internal(&self, preset_id: String, spec: AgentSpec) -> Result<(), AgentError> {
549        let mut agents = self.agents.lock().unwrap();
550        if agents.contains_key(&spec.id) {
551            return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
552        }
553        let spec_id = spec.id.clone();
554        let mut agent = agent_new(self.clone(), spec_id.clone(), spec)?;
555        agent.set_preset_id(preset_id);
556        agents.insert(spec_id, Arc::new(AsyncMutex::new(agent)));
557        Ok(())
558    }
559
560    /// Get the agent by id.
561    pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
562        let agents = self.agents.lock().unwrap();
563        agents.get(agent_id).cloned()
564    }
565
566    /// Add a connection between two agents in the specified preset.
567    ///
568    /// When the source agent outputs a value on the source handle (port),
569    /// it will be delivered to the target agent's target handle (port).
570    pub async fn add_connection(
571        &self,
572        preset_id: &str,
573        connection: ConnectionSpec,
574    ) -> Result<(), AgentError> {
575        // check if the source and target agents exist
576        {
577            let agents = self.agents.lock().unwrap();
578            if !agents.contains_key(&connection.source) {
579                return Err(AgentError::AgentNotFound(connection.source.to_string()));
580            }
581            if !agents.contains_key(&connection.target) {
582                return Err(AgentError::AgentNotFound(connection.target.to_string()));
583            }
584        }
585
586        // check if handles are valid
587        if connection.source_handle.is_empty() {
588            return Err(AgentError::EmptySourceHandle);
589        }
590        if connection.target_handle.is_empty() {
591            return Err(AgentError::EmptyTargetHandle);
592        }
593
594        let preset = self
595            .get_preset(preset_id)
596            .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
597        let mut preset = preset.lock().await;
598        preset.add_connection(connection.clone());
599        self.add_connection_internal(connection)?;
600        Ok(())
601    }
602
603    fn add_connection_internal(&self, connection: ConnectionSpec) -> Result<(), AgentError> {
604        let mut connections = self.connections.lock().unwrap();
605        if let Some(targets) = connections.get_mut(&connection.source) {
606            if targets
607                .iter()
608                .any(|(target, source_handle, target_handle)| {
609                    *target == connection.target
610                        && *source_handle == connection.source_handle
611                        && *target_handle == connection.target_handle
612                })
613            {
614                return Err(AgentError::ConnectionAlreadyExists);
615            }
616            targets.push((
617                connection.target,
618                connection.source_handle,
619                connection.target_handle,
620            ));
621        } else {
622            connections.insert(
623                connection.source,
624                vec![(
625                    connection.target,
626                    connection.source_handle,
627                    connection.target_handle,
628                )],
629            );
630        }
631        Ok(())
632    }
633
634    /// Add agents and connections to the specified preset.
635    ///
636    /// The ids of the given agents and connections are changed to new unique ids.
637    /// The agents are not started automatically, even if the preset is running.
638    pub async fn add_agents_and_connections(
639        &self,
640        preset_id: &str,
641        agents: &Vec<AgentSpec>,
642        connections: &Vec<ConnectionSpec>,
643    ) -> Result<(Vec<AgentSpec>, Vec<ConnectionSpec>), AgentError> {
644        let (agents, connections) = update_ids(agents, connections);
645
646        let preset = self
647            .get_preset(preset_id)
648            .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
649        let mut preset = preset.lock().await;
650
651        for agent in &agents {
652            self.add_agent_internal(preset_id.to_string(), agent.clone())?;
653            preset.add_agent(agent.clone());
654        }
655
656        for connection in &connections {
657            self.add_connection_internal(connection.clone())?;
658            preset.add_connection(connection.clone());
659        }
660
661        Ok((agents, connections))
662    }
663
664    /// Remove an agent from the specified preset.
665    ///
666    /// If the agent is running, it will be stopped first.
667    pub async fn remove_agent(&self, preset_id: &str, agent_id: &str) -> Result<(), AgentError> {
668        {
669            let preset = self
670                .get_preset(preset_id)
671                .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
672            let mut preset = preset.lock().await;
673            preset.remove_agent(agent_id);
674        }
675        if let Err(e) = self.remove_agent_internal(agent_id).await {
676            return Err(e);
677        }
678        Ok(())
679    }
680
681    async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
682        self.stop_agent(agent_id).await?;
683
684        // remove from connections
685        {
686            let mut connections = self.connections.lock().unwrap();
687            let mut sources_to_remove = Vec::new();
688            for (source, targets) in connections.iter_mut() {
689                targets.retain(|(target, _, _)| target != agent_id);
690                if targets.is_empty() {
691                    sources_to_remove.push(source.clone());
692                }
693            }
694            for source in sources_to_remove {
695                connections.swap_remove(&source);
696            }
697            connections.swap_remove(agent_id);
698        }
699
700        // remove from agents
701        {
702            let mut agents = self.agents.lock().unwrap();
703            agents.swap_remove(agent_id);
704        }
705
706        Ok(())
707    }
708
709    /// Remove a connection from the specified preset.
710    pub async fn remove_connection(
711        &self,
712        preset_id: &str,
713        connection: &ConnectionSpec,
714    ) -> Result<(), AgentError> {
715        let preset = self
716            .get_preset(preset_id)
717            .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
718        let mut preset = preset.lock().await;
719        let Some(connection) = preset.remove_connection(connection) else {
720            return Err(AgentError::ConnectionNotFound(format!(
721                "{}:{}->{}:{}",
722                connection.source,
723                connection.source_handle,
724                connection.target,
725                connection.target_handle
726            )));
727        };
728        self.remove_connection_internal(&connection);
729        Ok(())
730    }
731
732    fn remove_connection_internal(&self, connection: &ConnectionSpec) {
733        let mut connections = self.connections.lock().unwrap();
734        if let Some(targets) = connections.get_mut(&connection.source) {
735            targets.retain(|(target, source_handle, target_handle)| {
736                *target != connection.target
737                    || *source_handle != connection.source_handle
738                    || *target_handle != connection.target_handle
739            });
740            if targets.is_empty() {
741                connections.swap_remove(&connection.source);
742            }
743        }
744    }
745
746    /// Start an agent by id.
747    ///
748    /// Creates a message channel for the agent and spawns its event loop.
749    /// The agent's [`start()`](crate::AsAgent::start) method is called, then
750    /// the agent begins processing incoming messages.
751    ///
752    /// If the agent's definition has `native_thread = true`, the agent runs
753    /// on a dedicated OS thread instead of the tokio runtime.
754    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
755        let agent = {
756            let agents = self.agents.lock().unwrap();
757            let Some(a) = agents.get(agent_id) else {
758                return Err(AgentError::AgentNotFound(agent_id.to_string()));
759            };
760            a.clone()
761        };
762        let def_name = {
763            let agent = agent.lock().await;
764            agent.def_name().to_string()
765        };
766        let uses_native_thread = {
767            let defs = self.defs.lock().unwrap();
768            let Some(def) = defs.get(&def_name) else {
769                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
770            };
771            def.native_thread
772        };
773        let agent_status = {
774            // This will not block since the agent is not started yet.
775            let agent = agent.lock().await;
776            agent.status().clone()
777        };
778        if agent_status == AgentStatus::Init {
779            log::info!("Starting agent {}", agent_id);
780
781            let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
782
783            {
784                let mut agent_txs = self.agent_txs.lock().unwrap();
785                agent_txs.insert(agent_id.to_string(), tx.clone());
786            };
787
788            let agent_clone = agent.clone();
789            let agent_id_clone = agent_id.to_string();
790
791            let agent_loop = async move {
792                {
793                    let mut agent_guard = agent_clone.lock().await;
794                    if let Err(e) = agent_guard.start().await {
795                        log::error!("Failed to start agent {}: {}", agent_id_clone, e);
796                        return;
797                    }
798                }
799
800                while let Some(message) = rx.recv().await {
801                    match message {
802                        AgentMessage::Input { ctx, port, value } => {
803                            agent_clone
804                                .lock()
805                                .await
806                                .process(ctx, port, value)
807                                .await
808                                .unwrap_or_else(|e| {
809                                    log::error!("Process Error {}: {}", agent_id_clone, e);
810                                });
811                        }
812                        AgentMessage::Config { key, value } => {
813                            agent_clone
814                                .lock()
815                                .await
816                                .set_config(key, value)
817                                .unwrap_or_else(|e| {
818                                    log::error!("Config Error {}: {}", agent_id_clone, e);
819                                });
820                        }
821                        AgentMessage::Configs { configs } => {
822                            agent_clone
823                                .lock()
824                                .await
825                                .set_configs(configs)
826                                .unwrap_or_else(|e| {
827                                    log::error!("Configs Error {}: {}", agent_id_clone, e);
828                                });
829                        }
830                        AgentMessage::Stop => {
831                            rx.close();
832                            break;
833                        }
834                    }
835                }
836            };
837
838            if uses_native_thread {
839                std::thread::spawn(move || {
840                    let rt = tokio::runtime::Builder::new_current_thread()
841                        .enable_all()
842                        .build()
843                        .unwrap();
844                    rt.block_on(agent_loop);
845                });
846            } else {
847                tokio::spawn(agent_loop);
848            }
849        }
850        Ok(())
851    }
852
853    /// Stop an agent by id.
854    ///
855    /// Sends a stop message to the agent, closes its message channel,
856    /// and calls the agent's [`stop()`](crate::AsAgent::stop) method.
857    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
858        {
859            // remove the sender first to prevent new messages being sent
860            let mut agent_txs = self.agent_txs.lock().unwrap();
861            if let Some(tx) = agent_txs.swap_remove(agent_id) {
862                if let Err(e) = tx.try_send(AgentMessage::Stop) {
863                    log::warn!("Failed to send stop message to agent {}: {}", agent_id, e);
864                }
865            }
866        }
867
868        let agent = {
869            let agents = self.agents.lock().unwrap();
870            let Some(a) = agents.get(agent_id) else {
871                return Err(AgentError::AgentNotFound(agent_id.to_string()));
872            };
873            a.clone()
874        };
875        let mut agent_guard = agent.lock().await;
876        if *agent_guard.status() == AgentStatus::Start {
877            log::info!("Stopping agent {}", agent_id);
878            agent_guard.stop().await?;
879        }
880
881        Ok(())
882    }
883
884    /// Set configs for an agent by id.
885    pub async fn set_agent_configs(
886        &self,
887        agent_id: String,
888        configs: AgentConfigs,
889    ) -> Result<(), AgentError> {
890        let tx = {
891            let agent_txs = self.agent_txs.lock().unwrap();
892            agent_txs.get(&agent_id).cloned()
893        };
894
895        let Some(tx) = tx else {
896            // The agent is not running. We can set the configs directly.
897            let agent = {
898                let agents = self.agents.lock().unwrap();
899                let Some(a) = agents.get(&agent_id) else {
900                    return Err(AgentError::AgentNotFound(agent_id.to_string()));
901                };
902                a.clone()
903            };
904            agent.lock().await.set_configs(configs.clone())?;
905            return Ok(());
906        };
907        let message = AgentMessage::Configs { configs };
908        tx.send(message).await.map_err(|_| {
909            AgentError::SendMessageFailed("Failed to send config message".to_string())
910        })?;
911        Ok(())
912    }
913
914    /// Get global configs for the agent definition by name.
915    pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
916        let global_configs_map = self.global_configs_map.lock().unwrap();
917        global_configs_map.get(def_name).cloned()
918    }
919
920    /// Set global configs for the agent definition by name.
921    pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
922        let mut global_configs_map = self.global_configs_map.lock().unwrap();
923
924        let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
925            global_configs_map.insert(def_name, configs);
926            return;
927        };
928
929        for (key, value) in configs {
930            existing_configs.set(key, value);
931        }
932    }
933
934    /// Get the global configs map.
935    pub fn get_global_configs_map(&self) -> AgentConfigsMap {
936        let global_configs_map = self.global_configs_map.lock().unwrap();
937        global_configs_map.clone()
938    }
939
940    /// Set the global configs map.
941    pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
942        for (agent_name, new_configs) in new_configs_map {
943            self.set_global_configs(agent_name, new_configs);
944        }
945    }
946
947    /// Send input to an agent.
948    pub(crate) async fn agent_input(
949        &self,
950        agent_id: String,
951        ctx: AgentContext,
952        port: String,
953        value: AgentValue,
954    ) -> Result<(), AgentError> {
955        let message = if port.starts_with("config:") {
956            let config_key = port[7..].to_string();
957            AgentMessage::Config {
958                key: config_key,
959                value,
960            }
961        } else {
962            AgentMessage::Input {
963                ctx,
964                port: port.clone(),
965                value,
966            }
967        };
968
969        let tx = {
970            let agent_txs = self.agent_txs.lock().unwrap();
971            agent_txs.get(&agent_id).cloned()
972        };
973
974        let Some(tx) = tx else {
975            // The agent is not running. If it's a config message, we can set it directly.
976            let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
977                let agents = self.agents.lock().unwrap();
978                let Some(a) = agents.get(&agent_id) else {
979                    return Err(AgentError::AgentNotFound(agent_id.to_string()));
980                };
981                a.clone()
982            };
983            if let AgentMessage::Config { key, value } = message {
984                agent.lock().await.set_config(key, value)?;
985            }
986            return Ok(());
987        };
988        tx.send(message).await.map_err(|_| {
989            AgentError::SendMessageFailed("Failed to send input message".to_string())
990        })?;
991
992        self.emit_agent_input(agent_id.to_string(), port);
993
994        Ok(())
995    }
996
997    /// Send output from an agent. (Async version)
998    pub async fn send_agent_out(
999        &self,
1000        agent_id: String,
1001        ctx: AgentContext,
1002        port: String,
1003        value: AgentValue,
1004    ) -> Result<(), AgentError> {
1005        message::send_agent_out(self, agent_id, ctx, port, value).await
1006    }
1007
1008    /// Send output from an agent.
1009    pub fn try_send_agent_out(
1010        &self,
1011        agent_id: String,
1012        ctx: AgentContext,
1013        port: String,
1014        value: AgentValue,
1015    ) -> Result<(), AgentError> {
1016        message::try_send_agent_out(self, agent_id, ctx, port, value)
1017    }
1018
1019    /// Write a value to a named channel.
1020    ///
1021    /// This is the primary method for sending external input into the agent network.
1022    /// The value will be delivered to all [`ExternalInputAgent`](crate::external_agent::ExternalInputAgent)
1023    /// instances listening to the specified channel name, which will then forward it to
1024    /// their connected agents.
1025    ///
1026    /// # Arguments
1027    ///
1028    /// * `name` - The channel name to write to. Must match the `name` config of an `ExternalInputAgent`.
1029    /// * `value` - The value to send.
1030    ///
1031    /// # Example
1032    ///
1033    /// ```rust,no_run
1034    /// # use modular_agent_core::{ModularAgent, AgentValue};
1035    /// # async fn example(ma: ModularAgent) {
1036    /// // Send a string to the "input" channel
1037    /// ma.write_external_input("input".to_string(), AgentValue::string("hello")).await.unwrap();
1038    ///
1039    /// // Send an integer
1040    /// ma.write_external_input("numbers".to_string(), AgentValue::integer(42)).await.unwrap();
1041    /// # }
1042    /// ```
1043    pub async fn write_external_input(
1044        &self,
1045        name: String,
1046        value: AgentValue,
1047    ) -> Result<(), AgentError> {
1048        self.send_external_output(name, AgentContext::new(), value).await
1049    }
1050
1051    /// Write a value to the local variable channel.
1052    pub async fn write_local_input(
1053        &self,
1054        preset_id: &str,
1055        name: &str,
1056        value: AgentValue,
1057    ) -> Result<(), AgentError> {
1058        let channel_name = format!("%{}/{}", preset_id, name);
1059        self.send_external_output(channel_name, AgentContext::new(), value)
1060            .await
1061    }
1062
1063    pub(crate) async fn send_external_output(
1064        &self,
1065        name: String,
1066        ctx: AgentContext,
1067        value: AgentValue,
1068    ) -> Result<(), AgentError> {
1069        message::send_external_output(self, name, ctx, value).await
1070    }
1071
1072    async fn spawn_message_loop(&self) -> Result<(), AgentError> {
1073        // TODO: settings for the channel size
1074        let (tx, mut rx) = mpsc::channel(4096);
1075        {
1076            let mut tx_lock = self.tx.lock().unwrap();
1077            *tx_lock = Some(tx);
1078        }
1079
1080        // spawn the main loop
1081        let ma = self.clone();
1082        tokio::spawn(async move {
1083            while let Some(message) = rx.recv().await {
1084                use AgentEventMessage::*;
1085
1086                match message {
1087                    AgentOut {
1088                        agent,
1089                        ctx,
1090                        port,
1091                        value,
1092                    } => {
1093                        message::agent_out(&ma, agent, ctx, port, value).await;
1094                    }
1095                    ExternalOutput { name, ctx, value } => {
1096                        message::external_input(&ma, name, ctx, value).await;
1097                    }
1098                }
1099            }
1100        });
1101
1102        tokio::task::yield_now().await;
1103
1104        Ok(())
1105    }
1106
1107    /// Subscribe to all `ModularAgent` events.
1108    ///
1109    /// Returns a broadcast receiver that receives all [`ModularAgentEvent`]s.
1110    /// For filtered subscriptions, use [`subscribe_to_event`](Self::subscribe_to_event).
1111    ///
1112    /// **Note**: Subscribe before starting presets to avoid missing events.
1113    pub fn subscribe(&self) -> broadcast::Receiver<ModularAgentEvent> {
1114        self.observers.subscribe()
1115    }
1116
1117    /// Subscribe to filtered [`ModularAgentEvent`]s.
1118    ///
1119    /// This method creates a filtered subscription to events. The provided closure
1120    /// filters and maps events, and only successfully mapped events are forwarded
1121    /// to the returned receiver.
1122    ///
1123    /// **Important**: Subscribe to events BEFORE starting presets to avoid missing
1124    /// events due to race conditions.
1125    ///
1126    /// # Arguments
1127    ///
1128    /// * `filter_map` - A closure that receives each event and returns `Some(T)` for
1129    ///   events you want to receive, or `None` to skip them.
1130    ///
1131    /// # Returns
1132    ///
1133    /// An unbounded receiver that will receive the filtered and mapped events.
1134    ///
1135    /// # Example
1136    ///
1137    /// ```rust,no_run
1138    /// use modular_agent_core::{ModularAgent, ModularAgentEvent, AgentValue};
1139    ///
1140    /// # async fn example(ma: &ModularAgent) {
1141    /// // Subscribe to a specific channel's output
1142    /// let output_channel = "output".to_string();
1143    /// let mut output_rx = ma.subscribe_to_event(move |event| {
1144    ///     if let ModularAgentEvent::ExternalOutput(name, value) = event {
1145    ///         if name == output_channel {
1146    ///             return Some(value);
1147    ///         }
1148    ///     }
1149    ///     None
1150    /// });
1151    ///
1152    /// // Now start the preset and receive events
1153    /// while let Some(value) = output_rx.recv().await {
1154    ///     println!("Received: {:?}", value);
1155    /// }
1156    /// # }
1157    /// ```
1158    pub fn subscribe_to_event<F, T>(&self, mut filter_map: F) -> mpsc::UnboundedReceiver<T>
1159    where
1160        F: FnMut(ModularAgentEvent) -> Option<T> + Send + 'static,
1161        T: Send + 'static,
1162    {
1163        let (tx, rx) = mpsc::unbounded_channel();
1164        let mut event_rx = self.subscribe();
1165
1166        tokio::spawn(async move {
1167            loop {
1168                match event_rx.recv().await {
1169                    Ok(event) => {
1170                        if let Some(mapped_event) = filter_map(event) {
1171                            if tx.send(mapped_event).is_err() {
1172                                // Receiver dropped, task can exit
1173                                break;
1174                            }
1175                        }
1176                    }
1177                    Err(RecvError::Lagged(n)) => {
1178                        log::warn!("Event subscriber lagged by {} events", n);
1179                    }
1180                    Err(RecvError::Closed) => {
1181                        // Sender dropped, task can exit
1182                        break;
1183                    }
1184                }
1185            }
1186        });
1187        rx
1188    }
1189
1190    pub(crate) fn emit_agent_config_updated(
1191        &self,
1192        agent_id: String,
1193        key: String,
1194        value: AgentValue,
1195    ) {
1196        self.notify_observers(ModularAgentEvent::AgentConfigUpdated(agent_id, key, value));
1197    }
1198
1199    pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
1200        self.notify_observers(ModularAgentEvent::AgentError(agent_id, message));
1201    }
1202
1203    pub(crate) fn emit_agent_input(&self, agent_id: String, port: String) {
1204        self.notify_observers(ModularAgentEvent::AgentIn(agent_id, port));
1205    }
1206
1207    pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
1208        self.notify_observers(ModularAgentEvent::AgentSpecUpdated(agent_id));
1209    }
1210
1211    pub(crate) fn emit_external_output(&self, name: String, value: AgentValue) {
1212        // // ignore local variables
1213        // if name.starts_with('%') {
1214        //     return;
1215        // }
1216        self.notify_observers(ModularAgentEvent::ExternalOutput(name, value));
1217    }
1218
1219    fn notify_observers(&self, event: ModularAgentEvent) {
1220        let _ = self.observers.send(event);
1221    }
1222}
1223
1224/// Events emitted by [`ModularAgent`] during operation.
1225///
1226/// Subscribe to these events using [`ModularAgent::subscribe`] or
1227/// [`ModularAgent::subscribe_to_event`].
1228///
1229/// # Example
1230///
1231/// ```rust,no_run
1232/// use modular_agent_core::{ModularAgent, ModularAgentEvent};
1233///
1234/// # fn example(ma: &ModularAgent) {
1235/// // Subscribe to all external output events
1236/// let mut rx = ma.subscribe_to_event(|event| {
1237///     if let ModularAgentEvent::ExternalOutput(name, value) = event {
1238///         Some((name, value))
1239///     } else {
1240///         None
1241///     }
1242/// });
1243/// # }
1244/// ```
1245#[derive(Clone, Debug)]
1246pub enum ModularAgentEvent {
1247    /// An agent's configuration was updated.
1248    ///
1249    /// Fields: `(agent_id, config_key, new_value)`
1250    AgentConfigUpdated(String, String, AgentValue),
1251
1252    /// An agent encountered an error.
1253    ///
1254    /// Fields: `(agent_id, error_message)`
1255    AgentError(String, String),
1256
1257    /// An agent received input on a port.
1258    ///
1259    /// Fields: `(agent_id, port_name)`
1260    AgentIn(String, String),
1261
1262    /// An agent's spec was updated.
1263    ///
1264    /// Fields: `(agent_id)`
1265    AgentSpecUpdated(String),
1266
1267    /// A value was written to an external output channel.
1268    ///
1269    /// This event is emitted when:
1270    /// - [`ModularAgent::write_external_input`] is called and flows through the network
1271    /// - An [`ExternalOutputAgent`](crate::external_agent::ExternalOutputAgent) receives a value
1272    ///
1273    /// Fields: `(channel_name, value)`
1274    ExternalOutput(String, AgentValue),
1275}