Skip to main content

modular_agent_core/
modular_agent.rs

1#[cfg(feature = "file")]
2use std::sync::{Arc, Mutex};
3
4use serde_json::Value;
5use tokio::sync::{Mutex as AsyncMutex, broadcast, broadcast::error::RecvError, mpsc};
6
7use crate::FnvIndexMap;
8use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
9use crate::config::{AgentConfigs, AgentConfigsMap};
10use crate::context::AgentContext;
11use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
12use crate::error::AgentError;
13use crate::id::{new_id, update_ids};
14use crate::message::{self, AgentEventMessage};
15use crate::preset::{Preset, PresetInfo};
16use crate::registry;
17use crate::spec::{AgentSpec, ConnectionSpec, PresetSpec};
18use crate::value::AgentValue;
19
/// Capacity of the bounded per-agent `mpsc` channel created in `start_agent`.
const MESSAGE_LIMIT: usize = 1024;
/// Capacity of the observer `broadcast` channel created in `ModularAgent::new`.
const EVENT_CHANNEL_CAPACITY: usize = 256;
22
23#[derive(Clone)]
24pub struct ModularAgent {
25    // agent id -> agent
26    pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,
27
28    // agent id -> sender
29    pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, mpsc::Sender<AgentMessage>>>>,
30
31    // board name -> [board out agent id]
32    pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,
33
34    // board name -> value
35    pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,
36
37    // source agent id -> [target agent id / source handle / target handle]
38    pub(crate) connections: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,
39
40    // agent def name -> agent definition
41    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
42
43    // presets (preset id -> preset)
44    pub(crate) presets: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Preset>>>>>,
45
46    // agent def name -> config
47    pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,
48
49    // message sender
50    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
51
52    // observers
53    pub(crate) observers: broadcast::Sender<MAKEvent>,
54}
55
56impl ModularAgent {
57    pub fn new() -> Self {
58        let (tx, _rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
59        Self {
60            agents: Default::default(),
61            agent_txs: Default::default(),
62            board_out_agents: Default::default(),
63            board_value: Default::default(),
64            connections: Default::default(),
65            defs: Default::default(),
66            presets: Default::default(),
67            global_configs_map: Default::default(),
68            tx: Arc::new(Mutex::new(None)),
69            observers: tx,
70        }
71    }
72
73    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
74        self.tx
75            .lock()
76            .unwrap()
77            .clone()
78            .ok_or(AgentError::TxNotInitialized)
79    }
80
81    /// Initialize ModularAgent.
82    pub fn init() -> Result<Self, AgentError> {
83        let ma = Self::new();
84        ma.register_agents();
85        Ok(ma)
86    }
87
88    fn register_agents(&self) {
89        registry::register_inventory_agents(self);
90    }
91
92    /// Prepare ModularAgent to be ready.
93    pub async fn ready(&self) -> Result<(), AgentError> {
94        self.spawn_message_loop().await?;
95        Ok(())
96    }
97
98    /// Quit ModularAgent.
99    pub fn quit(&self) {
100        let mut tx_lock = self.tx.lock().unwrap();
101        *tx_lock = None;
102    }
103
104    // Preset management
105
106    /// Create a new preset.
107    /// Returns the id of the new preset.
108    pub fn new_preset(&self) -> Result<String, AgentError> {
109        let spec = PresetSpec::default();
110        let id = self.add_preset(spec)?;
111        Ok(id)
112    }
113
114    /// Create a new preset with the given name.
115    /// Returns the id of the new preset.
116    pub fn new_preset_with_name(&self, name: String) -> Result<String, AgentError> {
117        let spec = PresetSpec::default();
118        let id = self.add_preset_with_name(spec, name)?;
119        Ok(id)
120    }
121
122    /// Get a preset by id.
123    pub fn get_preset(&self, id: &str) -> Option<Arc<AsyncMutex<Preset>>> {
124        let presets = self.presets.lock().unwrap();
125        presets.get(id).cloned()
126    }
127
128    /// Add a new preset with the given spec, and returns the id of the new preset.
129    ///
130    /// The ids of the given spec, including agents and connections, are changed to new unique ids.
131    pub fn add_preset(&self, spec: PresetSpec) -> Result<String, AgentError> {
132        self.add_preset_raw(spec, None)
133    }
134
135    /// Add a new preset with the given name and spec, and returns the id of the new preset.
136    ///
137    /// The ids of the given spec, including agents and connections, are changed to new unique ids.
138    pub fn add_preset_with_name(
139        &self,
140        spec: PresetSpec,
141        name: String,
142    ) -> Result<String, AgentError> {
143        self.add_preset_raw(spec, Some(name))
144    }
145
146    fn add_preset_raw(&self, spec: PresetSpec, name: Option<String>) -> Result<String, AgentError> {
147        let mut preset = Preset::new(spec);
148        if let Some(name) = name {
149            preset.set_name(name);
150        }
151        let id = preset.id().to_string();
152
153        // add agents
154        for agent in &preset.spec().agents {
155            if let Err(e) = self.add_agent_internal(id.clone(), agent.clone()) {
156                log::error!("Failed to add_agent {}: {}", agent.id, e);
157            }
158        }
159
160        // add connections
161        for connection in &preset.spec().connections {
162            self.add_connection_internal(connection.clone())
163                .unwrap_or_else(|e| {
164                    log::error!("Failed to add_connection {}: {}", connection.source, e);
165                });
166        }
167
168        // add the given preset into presets
169        let mut presets = self.presets.lock().unwrap();
170        if presets.contains_key(&id) {
171            return Err(AgentError::DuplicateId(id.into()));
172        }
173        presets.insert(id.to_string(), Arc::new(AsyncMutex::new(preset)));
174
175        Ok(id)
176    }
177
178    /// Remove an preset by id.
179    pub async fn remove_preset(&self, id: &str) -> Result<(), AgentError> {
180        let preset = self
181            .get_preset(id)
182            .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
183
184        let mut preset = preset.lock().await;
185        preset.stop(self).await.unwrap_or_else(|e| {
186            log::error!("Failed to stop preset {}: {}", id, e);
187        });
188
189        // Remove all agents and connections associated with the preset
190        for agent in &preset.spec().agents {
191            self.remove_agent_internal(&agent.id)
192                .await
193                .unwrap_or_else(|e| {
194                    log::error!("Failed to remove_agent {}: {}", agent.id, e);
195                });
196        }
197        for connection in &preset.spec().connections {
198            self.remove_connection_internal(connection);
199        }
200
201        Ok(())
202    }
203
204    /// Start a preset by id.
205    pub async fn start_preset(&self, id: &str) -> Result<(), AgentError> {
206        let preset = self
207            .get_preset(id)
208            .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
209        let mut preset = preset.lock().await;
210        preset.start(self).await?;
211
212        Ok(())
213    }
214
215    /// Stop a preset by id.
216    pub async fn stop_preset(&self, id: &str) -> Result<(), AgentError> {
217        let preset = self
218            .get_preset(id)
219            .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
220        let mut preset = preset.lock().await;
221        preset.stop(self).await?;
222
223        Ok(())
224    }
225
226    /// Open a preset
227    #[cfg(feature = "file")]
228    pub async fn open_preset_from_file(
229        &self,
230        path: &str,
231        name: Option<String>,
232    ) -> Result<String, AgentError> {
233        let json_str =
234            std::fs::read_to_string(path).map_err(|e| AgentError::IoError(e.to_string()))?;
235        let spec = PresetSpec::from_json(&json_str)?;
236        let id = self.add_preset_raw(spec, name)?;
237        Ok(id)
238    }
239
240    /// Save a preset.
241    #[cfg(feature = "file")]
242    pub async fn save_preset(&self, id: &str, path: &str) -> Result<(), AgentError> {
243        let Some(preset_spec) = self.get_preset_spec(id).await else {
244            return Err(AgentError::PresetNotFound(id.to_string()));
245        };
246        let json_str = preset_spec.to_json()?;
247        std::fs::write(path, json_str).map_err(|e| AgentError::IoError(e.to_string()))?;
248        Ok(())
249    }
250
251    // PresetSpec
252
253    /// Get the current preset spec by id.
254    pub async fn get_preset_spec(&self, id: &str) -> Option<PresetSpec> {
255        let Some(preset) = self.get_preset(id) else {
256            return None;
257        };
258        let mut preset_spec = {
259            let preset = preset.lock().await;
260            preset.spec().clone()
261        };
262
263        // collect current agent specs in the preset
264        let mut agent_specs = Vec::new();
265        for agent in &preset_spec.agents {
266            if let Some(spec) = self.get_agent_spec(&agent.id).await {
267                agent_specs.push(spec);
268            }
269        }
270        preset_spec.agents = agent_specs;
271
272        // No need to change connections
273
274        Some(preset_spec)
275    }
276
277    /// Update the preset spec
278    pub async fn update_preset_spec(&self, id: &str, value: &Value) -> Result<(), AgentError> {
279        let preset = self
280            .get_preset(id)
281            .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
282        let mut preset = preset.lock().await;
283        preset.update_spec(value)?;
284        Ok(())
285    }
286
287    // PresetInfo
288
289    /// Get info of the preset by id.
290    pub async fn get_preset_info(&self, id: &str) -> Option<PresetInfo> {
291        let Some(preset) = self.get_preset(id) else {
292            return None;
293        };
294        Some(PresetInfo::from(&*preset.lock().await))
295    }
296
297    /// Get infos of all presets.
298    pub async fn get_preset_infos(&self) -> Vec<PresetInfo> {
299        let presets = {
300            let presets = self.presets.lock().unwrap();
301            presets.values().cloned().collect::<Vec<_>>()
302        };
303        let mut preset_infos = Vec::new();
304        for preset in presets {
305            let preset_guard = preset.lock().await;
306            preset_infos.push(PresetInfo::from(&*preset_guard));
307        }
308        preset_infos
309    }
310
311    // Agents
312
313    /// Register an agent definition.
314    pub fn register_agent_definiton(&self, def: AgentDefinition) {
315        let def_name = def.name.clone();
316        let def_global_configs = def.global_configs.clone();
317
318        let mut defs = self.defs.lock().unwrap();
319        defs.insert(def.name.clone(), def);
320
321        // if there is a global config, set it
322        if let Some(def_global_configs) = def_global_configs {
323            let mut new_configs = AgentConfigs::default();
324            for (key, config_entry) in def_global_configs.iter() {
325                new_configs.set(key.clone(), config_entry.value.clone());
326            }
327            self.set_global_configs(def_name, new_configs);
328        }
329    }
330
331    /// Get all agent definitions.
332    pub fn get_agent_definitions(&self) -> AgentDefinitions {
333        let defs = self.defs.lock().unwrap();
334        defs.clone()
335    }
336
337    /// Get an agent definition by name.
338    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
339        let defs = self.defs.lock().unwrap();
340        defs.get(def_name).cloned()
341    }
342
343    /// Get the config specs of an agent definition by name.
344    pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
345        let defs = self.defs.lock().unwrap();
346        let Some(def) = defs.get(def_name) else {
347            return None;
348        };
349        def.configs.clone()
350    }
351
352    /// Get the agent spec by id.
353    pub async fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
354        let agent = {
355            let agents = self.agents.lock().unwrap();
356            let Some(agent) = agents.get(agent_id) else {
357                return None;
358            };
359            agent.clone()
360        };
361        let agent = agent.lock().await;
362        Some(agent.spec().clone())
363    }
364
365    /// Update the agent spec by id.
366    pub async fn update_agent_spec(&self, agent_id: &str, value: &Value) -> Result<(), AgentError> {
367        let agent = {
368            let agents = self.agents.lock().unwrap();
369            let Some(agent) = agents.get(agent_id) else {
370                return Err(AgentError::AgentNotFound(agent_id.to_string()));
371            };
372            agent.clone()
373        };
374        let mut agent = agent.lock().await;
375        agent.update_spec(value)?;
376        Ok(())
377    }
378
379    /// Create a new agent spec from the given agent definition name.
380    pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
381        let def = self
382            .get_agent_definition(def_name)
383            .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
384        Ok(def.to_spec())
385    }
386
387    /// Add an agent to the specified preset, and returns the id of the newly added agent.
388    pub async fn add_agent(
389        &self,
390        preset_id: String,
391        mut spec: AgentSpec,
392    ) -> Result<String, AgentError> {
393        let preset = self
394            .get_preset(&preset_id)
395            .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
396
397        let id = new_id();
398        spec.id = id.clone();
399        self.add_agent_internal(preset_id, spec.clone())?;
400
401        let mut preset = preset.lock().await;
402        preset.add_agent(spec.clone());
403
404        Ok(id)
405    }
406
407    fn add_agent_internal(&self, preset_id: String, spec: AgentSpec) -> Result<(), AgentError> {
408        let mut agents = self.agents.lock().unwrap();
409        if agents.contains_key(&spec.id) {
410            return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
411        }
412        let spec_id = spec.id.clone();
413        let mut agent = agent_new(self.clone(), spec_id.clone(), spec)?;
414        agent.set_preset_id(preset_id);
415        agents.insert(spec_id, Arc::new(AsyncMutex::new(agent)));
416        Ok(())
417    }
418
419    /// Get the agent by id.
420    pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
421        let agents = self.agents.lock().unwrap();
422        agents.get(agent_id).cloned()
423    }
424
425    /// Add a connection to the specified preset.
426    pub async fn add_connection(
427        &self,
428        preset_id: &str,
429        connection: ConnectionSpec,
430    ) -> Result<(), AgentError> {
431        // check if the source and target agents exist
432        {
433            let agents = self.agents.lock().unwrap();
434            if !agents.contains_key(&connection.source) {
435                return Err(AgentError::AgentNotFound(connection.source.to_string()));
436            }
437            if !agents.contains_key(&connection.target) {
438                return Err(AgentError::AgentNotFound(connection.target.to_string()));
439            }
440        }
441
442        // check if handles are valid
443        if connection.source_handle.is_empty() {
444            return Err(AgentError::EmptySourceHandle);
445        }
446        if connection.target_handle.is_empty() {
447            return Err(AgentError::EmptyTargetHandle);
448        }
449
450        let preset = self
451            .get_preset(preset_id)
452            .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
453        let mut preset = preset.lock().await;
454        preset.add_connection(connection.clone());
455        self.add_connection_internal(connection)?;
456        Ok(())
457    }
458
459    fn add_connection_internal(&self, connection: ConnectionSpec) -> Result<(), AgentError> {
460        let mut connections = self.connections.lock().unwrap();
461        if let Some(targets) = connections.get_mut(&connection.source) {
462            if targets
463                .iter()
464                .any(|(target, source_handle, target_handle)| {
465                    *target == connection.target
466                        && *source_handle == connection.source_handle
467                        && *target_handle == connection.target_handle
468                })
469            {
470                return Err(AgentError::ConnectionAlreadyExists);
471            }
472            targets.push((
473                connection.target,
474                connection.source_handle,
475                connection.target_handle,
476            ));
477        } else {
478            connections.insert(
479                connection.source,
480                vec![(
481                    connection.target,
482                    connection.source_handle,
483                    connection.target_handle,
484                )],
485            );
486        }
487        Ok(())
488    }
489
490    /// Add agents and connections to the specified preset.
491    ///
492    /// The ids of the given agents and connections are changed to new unique ids.
493    /// The agents are not started automatically, even if the preset is running.
494    pub async fn add_agents_and_connections(
495        &self,
496        preset_id: &str,
497        agents: &Vec<AgentSpec>,
498        connections: &Vec<ConnectionSpec>,
499    ) -> Result<(Vec<AgentSpec>, Vec<ConnectionSpec>), AgentError> {
500        let (agents, connections) = update_ids(agents, connections);
501
502        let preset = self
503            .get_preset(preset_id)
504            .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
505        let mut preset = preset.lock().await;
506
507        for agent in &agents {
508            self.add_agent_internal(preset_id.to_string(), agent.clone())?;
509            preset.add_agent(agent.clone());
510        }
511
512        for connection in &connections {
513            self.add_connection_internal(connection.clone())?;
514            preset.add_connection(connection.clone());
515        }
516
517        Ok((agents, connections))
518    }
519
520    /// Remove an agent from the specified preset.
521    ///
522    /// If the agent is running, it will be stopped first.
523    pub async fn remove_agent(&self, preset_id: &str, agent_id: &str) -> Result<(), AgentError> {
524        {
525            let preset = self
526                .get_preset(preset_id)
527                .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
528            let mut preset = preset.lock().await;
529            preset.remove_agent(agent_id);
530        }
531        if let Err(e) = self.remove_agent_internal(agent_id).await {
532            return Err(e);
533        }
534        Ok(())
535    }
536
537    async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
538        self.stop_agent(agent_id).await?;
539
540        // remove from connections
541        {
542            let mut connections = self.connections.lock().unwrap();
543            let mut sources_to_remove = Vec::new();
544            for (source, targets) in connections.iter_mut() {
545                targets.retain(|(target, _, _)| target != agent_id);
546                if targets.is_empty() {
547                    sources_to_remove.push(source.clone());
548                }
549            }
550            for source in sources_to_remove {
551                connections.swap_remove(&source);
552            }
553            connections.swap_remove(agent_id);
554        }
555
556        // remove from agents
557        {
558            let mut agents = self.agents.lock().unwrap();
559            agents.swap_remove(agent_id);
560        }
561
562        Ok(())
563    }
564
565    /// Remove a connection from the specified preset.
566    pub async fn remove_connection(
567        &self,
568        preset_id: &str,
569        connection: &ConnectionSpec,
570    ) -> Result<(), AgentError> {
571        let preset = self
572            .get_preset(preset_id)
573            .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
574        let mut preset = preset.lock().await;
575        let Some(connection) = preset.remove_connection(connection) else {
576            return Err(AgentError::ConnectionNotFound(format!(
577                "{}:{}->{}:{}",
578                connection.source,
579                connection.source_handle,
580                connection.target,
581                connection.target_handle
582            )));
583        };
584        self.remove_connection_internal(&connection);
585        Ok(())
586    }
587
588    fn remove_connection_internal(&self, connection: &ConnectionSpec) {
589        let mut connections = self.connections.lock().unwrap();
590        if let Some(targets) = connections.get_mut(&connection.source) {
591            targets.retain(|(target, source_handle, target_handle)| {
592                *target != connection.target
593                    || *source_handle != connection.source_handle
594                    || *target_handle != connection.target_handle
595            });
596            if targets.is_empty() {
597                connections.swap_remove(&connection.source);
598            }
599        }
600    }
601
602    /// Start an agent by id.
603    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
604        let agent = {
605            let agents = self.agents.lock().unwrap();
606            let Some(a) = agents.get(agent_id) else {
607                return Err(AgentError::AgentNotFound(agent_id.to_string()));
608            };
609            a.clone()
610        };
611        let def_name = {
612            let agent = agent.lock().await;
613            agent.def_name().to_string()
614        };
615        let uses_native_thread = {
616            let defs = self.defs.lock().unwrap();
617            let Some(def) = defs.get(&def_name) else {
618                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
619            };
620            def.native_thread
621        };
622        let agent_status = {
623            // This will not block since the agent is not started yet.
624            let agent = agent.lock().await;
625            agent.status().clone()
626        };
627        if agent_status == AgentStatus::Init {
628            log::info!("Starting agent {}", agent_id);
629
630            let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
631
632            {
633                let mut agent_txs = self.agent_txs.lock().unwrap();
634                agent_txs.insert(agent_id.to_string(), tx.clone());
635            };
636
637            let agent_clone = agent.clone();
638            let agent_id_clone = agent_id.to_string();
639
640            let agent_loop = async move {
641                {
642                    let mut agent_guard = agent_clone.lock().await;
643                    if let Err(e) = agent_guard.start().await {
644                        log::error!("Failed to start agent {}: {}", agent_id_clone, e);
645                        return;
646                    }
647                }
648
649                while let Some(message) = rx.recv().await {
650                    match message {
651                        AgentMessage::Input { ctx, port, value } => {
652                            agent_clone
653                                .lock()
654                                .await
655                                .process(ctx, port, value)
656                                .await
657                                .unwrap_or_else(|e| {
658                                    log::error!("Process Error {}: {}", agent_id_clone, e);
659                                });
660                        }
661                        AgentMessage::Config { key, value } => {
662                            agent_clone
663                                .lock()
664                                .await
665                                .set_config(key, value)
666                                .unwrap_or_else(|e| {
667                                    log::error!("Config Error {}: {}", agent_id_clone, e);
668                                });
669                        }
670                        AgentMessage::Configs { configs } => {
671                            agent_clone
672                                .lock()
673                                .await
674                                .set_configs(configs)
675                                .unwrap_or_else(|e| {
676                                    log::error!("Configs Error {}: {}", agent_id_clone, e);
677                                });
678                        }
679                        AgentMessage::Stop => {
680                            rx.close();
681                            break;
682                        }
683                    }
684                }
685            };
686
687            if uses_native_thread {
688                std::thread::spawn(move || {
689                    let rt = tokio::runtime::Builder::new_current_thread()
690                        .enable_all()
691                        .build()
692                        .unwrap();
693                    rt.block_on(agent_loop);
694                });
695            } else {
696                tokio::spawn(agent_loop);
697            }
698        }
699        Ok(())
700    }
701
702    /// Stop an agent by id.
703    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
704        {
705            // remove the sender first to prevent new messages being sent
706            let mut agent_txs = self.agent_txs.lock().unwrap();
707            if let Some(tx) = agent_txs.swap_remove(agent_id) {
708                if let Err(e) = tx.try_send(AgentMessage::Stop) {
709                    log::warn!("Failed to send stop message to agent {}: {}", agent_id, e);
710                }
711            }
712        }
713
714        let agent = {
715            let agents = self.agents.lock().unwrap();
716            let Some(a) = agents.get(agent_id) else {
717                return Err(AgentError::AgentNotFound(agent_id.to_string()));
718            };
719            a.clone()
720        };
721        let mut agent_guard = agent.lock().await;
722        if *agent_guard.status() == AgentStatus::Start {
723            log::info!("Stopping agent {}", agent_id);
724            agent_guard.stop().await?;
725        }
726
727        Ok(())
728    }
729
730    /// Set configs for an agent by id.
731    pub async fn set_agent_configs(
732        &self,
733        agent_id: String,
734        configs: AgentConfigs,
735    ) -> Result<(), AgentError> {
736        let tx = {
737            let agent_txs = self.agent_txs.lock().unwrap();
738            agent_txs.get(&agent_id).cloned()
739        };
740
741        let Some(tx) = tx else {
742            // The agent is not running. We can set the configs directly.
743            let agent = {
744                let agents = self.agents.lock().unwrap();
745                let Some(a) = agents.get(&agent_id) else {
746                    return Err(AgentError::AgentNotFound(agent_id.to_string()));
747                };
748                a.clone()
749            };
750            agent.lock().await.set_configs(configs.clone())?;
751            return Ok(());
752        };
753        let message = AgentMessage::Configs { configs };
754        tx.send(message).await.map_err(|_| {
755            AgentError::SendMessageFailed("Failed to send config message".to_string())
756        })?;
757        Ok(())
758    }
759
760    /// Get global configs for the agent definition by name.
761    pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
762        let global_configs_map = self.global_configs_map.lock().unwrap();
763        global_configs_map.get(def_name).cloned()
764    }
765
766    /// Set global configs for the agent definition by name.
767    pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
768        let mut global_configs_map = self.global_configs_map.lock().unwrap();
769
770        let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
771            global_configs_map.insert(def_name, configs);
772            return;
773        };
774
775        for (key, value) in configs {
776            existing_configs.set(key, value);
777        }
778    }
779
780    /// Get the global configs map.
781    pub fn get_global_configs_map(&self) -> AgentConfigsMap {
782        let global_configs_map = self.global_configs_map.lock().unwrap();
783        global_configs_map.clone()
784    }
785
786    /// Set the global configs map.
787    pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
788        for (agent_name, new_configs) in new_configs_map {
789            self.set_global_configs(agent_name, new_configs);
790        }
791    }
792
793    /// Send input to an agent.
794    pub(crate) async fn agent_input(
795        &self,
796        agent_id: String,
797        ctx: AgentContext,
798        port: String,
799        value: AgentValue,
800    ) -> Result<(), AgentError> {
801        let message = if port.starts_with("config:") {
802            let config_key = port[7..].to_string();
803            AgentMessage::Config {
804                key: config_key,
805                value,
806            }
807        } else {
808            AgentMessage::Input {
809                ctx,
810                port: port.clone(),
811                value,
812            }
813        };
814
815        let tx = {
816            let agent_txs = self.agent_txs.lock().unwrap();
817            agent_txs.get(&agent_id).cloned()
818        };
819
820        let Some(tx) = tx else {
821            // The agent is not running. If it's a config message, we can set it directly.
822            let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
823                let agents = self.agents.lock().unwrap();
824                let Some(a) = agents.get(&agent_id) else {
825                    return Err(AgentError::AgentNotFound(agent_id.to_string()));
826                };
827                a.clone()
828            };
829            if let AgentMessage::Config { key, value } = message {
830                agent.lock().await.set_config(key, value)?;
831            }
832            return Ok(());
833        };
834        tx.send(message).await.map_err(|_| {
835            AgentError::SendMessageFailed("Failed to send input message".to_string())
836        })?;
837
838        self.emit_agent_input(agent_id.to_string(), port);
839
840        Ok(())
841    }
842
843    /// Send output from an agent. (Async version)
844    pub async fn send_agent_out(
845        &self,
846        agent_id: String,
847        ctx: AgentContext,
848        port: String,
849        value: AgentValue,
850    ) -> Result<(), AgentError> {
851        message::send_agent_out(self, agent_id, ctx, port, value).await
852    }
853
854    /// Send output from an agent.
855    pub fn try_send_agent_out(
856        &self,
857        agent_id: String,
858        ctx: AgentContext,
859        port: String,
860        value: AgentValue,
861    ) -> Result<(), AgentError> {
862        message::try_send_agent_out(self, agent_id, ctx, port, value)
863    }
864
865    /// Write a value to the board.
866    pub async fn write_board_value(
867        &self,
868        name: String,
869        value: AgentValue,
870    ) -> Result<(), AgentError> {
871        self.send_board_out(name, AgentContext::new(), value).await
872    }
873
874    /// Write a value to the variable board.
875    pub async fn write_var_value(
876        &self,
877        preset_id: &str,
878        name: &str,
879        value: AgentValue,
880    ) -> Result<(), AgentError> {
881        let var_name = format!("%{}/{}", preset_id, name);
882        self.send_board_out(var_name, AgentContext::new(), value)
883            .await
884    }
885
886    pub(crate) async fn send_board_out(
887        &self,
888        name: String,
889        ctx: AgentContext,
890        value: AgentValue,
891    ) -> Result<(), AgentError> {
892        message::send_board_out(self, name, ctx, value).await
893    }
894
895    async fn spawn_message_loop(&self) -> Result<(), AgentError> {
896        // TODO: settings for the channel size
897        let (tx, mut rx) = mpsc::channel(4096);
898        {
899            let mut tx_lock = self.tx.lock().unwrap();
900            *tx_lock = Some(tx);
901        }
902
903        // spawn the main loop
904        let ma = self.clone();
905        tokio::spawn(async move {
906            while let Some(message) = rx.recv().await {
907                use AgentEventMessage::*;
908
909                match message {
910                    AgentOut {
911                        agent,
912                        ctx,
913                        port,
914                        value,
915                    } => {
916                        message::agent_out(&ma, agent, ctx, port, value).await;
917                    }
918                    BoardOut { name, ctx, value } => {
919                        message::board_out(&ma, name, ctx, value).await;
920                    }
921                }
922            }
923        });
924
925        tokio::task::yield_now().await;
926
927        Ok(())
928    }
929
930    /// Subscribe to all ModularAgent events.
931    pub fn subscribe(&self) -> broadcast::Receiver<MAKEvent> {
932        self.observers.subscribe()
933    }
934
935    /// Subscribe to a specific type of `MAKEvent`.
936    ///
937    /// It takes a closure that filters and maps the events, and returns an `mpsc::UnboundedReceiver`
938    /// that will receive only the successfully mapped events.
939    pub fn subscribe_to_event<F, T>(&self, mut filter_map: F) -> mpsc::UnboundedReceiver<T>
940    where
941        F: FnMut(MAKEvent) -> Option<T> + Send + 'static,
942        T: Send + 'static,
943    {
944        let (tx, rx) = mpsc::unbounded_channel();
945        let mut event_rx = self.subscribe();
946
947        tokio::spawn(async move {
948            loop {
949                match event_rx.recv().await {
950                    Ok(event) => {
951                        if let Some(mapped_event) = filter_map(event) {
952                            if tx.send(mapped_event).is_err() {
953                                // Receiver dropped, task can exit
954                                break;
955                            }
956                        }
957                    }
958                    Err(RecvError::Lagged(n)) => {
959                        log::warn!("Event subscriber lagged by {} events", n);
960                    }
961                    Err(RecvError::Closed) => {
962                        // Sender dropped, task can exit
963                        break;
964                    }
965                }
966            }
967        });
968        rx
969    }
970
971    pub(crate) fn emit_agent_config_updated(
972        &self,
973        agent_id: String,
974        key: String,
975        value: AgentValue,
976    ) {
977        self.notify_observers(MAKEvent::AgentConfigUpdated(agent_id, key, value));
978    }
979
980    pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
981        self.notify_observers(MAKEvent::AgentError(agent_id, message));
982    }
983
984    pub(crate) fn emit_agent_input(&self, agent_id: String, port: String) {
985        self.notify_observers(MAKEvent::AgentIn(agent_id, port));
986    }
987
988    pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
989        self.notify_observers(MAKEvent::AgentSpecUpdated(agent_id));
990    }
991
992    pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
993        // // ignore variables
994        // if name.starts_with('%') {
995        //     return;
996        // }
997        self.notify_observers(MAKEvent::Board(name, value));
998    }
999
1000    fn notify_observers(&self, event: MAKEvent) {
1001        let _ = self.observers.send(event);
1002    }
1003}
1004
/// Events delivered to `ModularAgent` observers.
///
/// Each variant is built by the `emit_*` method of `ModularAgent` with the
/// corresponding name and received through `ModularAgent::subscribe` or
/// `ModularAgent::subscribe_to_event`.
#[derive(Clone, Debug)]
pub enum MAKEvent {
    /// An agent config value was updated. Fields: `(agent_id, key, value)`.
    AgentConfigUpdated(String, String, AgentValue),
    /// An error associated with an agent. Fields: `(agent_id, message)`.
    AgentError(String, String),
    /// Input on an agent port. Fields: `(agent_id, port)`.
    AgentIn(String, String),
    /// An agent's spec was updated. Fields: `(agent_id)`.
    AgentSpecUpdated(String),
    /// A value for a board. Fields: `(board name, value)`.
    Board(String, AgentValue),
}