Skip to main content

modular_agent_kit/
mak.rs

1#[cfg(feature = "file")]
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4
5use serde_json::Value;
6use tokio::sync::{Mutex as AsyncMutex, broadcast, broadcast::error::RecvError, mpsc};
7
8use crate::FnvIndexMap;
9use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
10use crate::config::{AgentConfigs, AgentConfigsMap};
11use crate::context::AgentContext;
12use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
13use crate::error::AgentError;
14use crate::id::{new_id, update_ids};
15use crate::message::{self, AgentEventMessage};
16use crate::preset::{Preset, PresetInfo};
17use crate::registry;
18use crate::spec::{AgentSpec, ConnectionSpec, PresetSpec};
19use crate::value::AgentValue;
20
21const MESSAGE_LIMIT: usize = 1024;
22const EVENT_CHANNEL_CAPACITY: usize = 256;
23
/// Shared state of the agent kit.
///
/// Every field is either wrapped in an `Arc` or is a channel sender, so the
/// derived `Clone` yields another handle onto the same underlying state.
#[derive(Clone)]
pub struct MAK {
    /// Agents, keyed by agent id.
    pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,

    /// Message senders, keyed by agent id.
    pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, mpsc::Sender<AgentMessage>>>>,

    /// Board name -> ids of the board-out agents attached to that board.
    pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,

    /// Board name -> value stored for that board.
    pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,

    /// Source agent id -> list of `(target agent id, source handle, target handle)`.
    pub(crate) connections: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,

    /// Agent definitions, keyed by definition name.
    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,

    /// Presets, keyed by preset id.
    pub(crate) presets: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Preset>>>>>,

    /// Global configs, keyed by agent definition name.
    pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,

    /// Event-message sender; `None` on construction and again after `quit`.
    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,

    /// Broadcast sender used to publish `MAKEvent`s to observers.
    pub(crate) observers: broadcast::Sender<MAKEvent>,
}
56
57impl MAK {
58    pub fn new() -> Self {
59        let (tx, _rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
60        Self {
61            agents: Default::default(),
62            agent_txs: Default::default(),
63            board_out_agents: Default::default(),
64            board_value: Default::default(),
65            connections: Default::default(),
66            defs: Default::default(),
67            presets: Default::default(),
68            global_configs_map: Default::default(),
69            tx: Arc::new(Mutex::new(None)),
70            observers: tx,
71        }
72    }
73
74    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
75        self.tx
76            .lock()
77            .unwrap()
78            .clone()
79            .ok_or(AgentError::TxNotInitialized)
80    }
81
82    /// Initialize MAK.
83    pub fn init() -> Result<Self, AgentError> {
84        let mak = Self::new();
85        mak.register_agents();
86        Ok(mak)
87    }
88
    /// Passes this instance to `registry::register_inventory_agents`.
    fn register_agents(&self) {
        registry::register_inventory_agents(self);
    }
92
    /// Prepare MAK to be ready.
    ///
    /// Awaits `spawn_message_loop` and propagates any error it reports.
    pub async fn ready(&self) -> Result<(), AgentError> {
        self.spawn_message_loop().await?;
        Ok(())
    }
98
99    /// Quit MAK.
100    pub fn quit(&self) {
101        let mut tx_lock = self.tx.lock().unwrap();
102        *tx_lock = None;
103    }
104
105    // Preset management
106
107    /// Create a new preset.
108    /// Returns the id of the new preset.
109    pub fn new_preset(&self) -> Result<String, AgentError> {
110        let spec = PresetSpec::default();
111        let id = self.add_preset(spec)?;
112        Ok(id)
113    }
114
115    // /// Rename an existing preset.
116    // pub fn rename_preset(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
117    //     if !is_valid_preseet_name(new_name) {
118    //         return Err(AgentError::InvalidPresetName(new_name.into()));
119    //     }
120
121    //     // check if the new name is already used
122    //     let new_name = self.unique_preset_name(new_name);
123
124    //     let mut presets = self.presets.lock().unwrap();
125
126    //     // remove the original preset
127    //     let Some(mut preset) = presets.swap_remove(id) else {
128    //         return Err(AgentError::RenamePresetFailed(id.into()));
129    //     };
130
131    //     // insert renamed preset
132    //     preset.set_name(new_name.clone());
133    //     presets.insert(preset.id().to_string(), preset);
134    //     Ok(new_name)
135    // }
136
137    // /// Generate a unique preset name by appending a number suffix if needed.
138    // pub fn unique_preset_name(&self, name: &str) -> String {
139    //     let mut new_name = name.trim().to_string();
140    //     let mut i = 2;
141    //     let presets = self.presets.lock().unwrap();
142    //     while presets.values().any(|preset| preset.name() == new_name) {
143    //         new_name = format!("{}{}", name, i);
144    //         i += 1;
145    //     }
146    //     new_name
147    // }
148
149    /// Get a preset by id.
150    pub fn get_preset(&self, id: &str) -> Option<Arc<AsyncMutex<Preset>>> {
151        let presets = self.presets.lock().unwrap();
152        presets.get(id).cloned()
153    }
154
155    /// Add a new preset with the given name and spec, and returns the id of the new preset.
156    ///
157    /// The ids of the given spec, including agents and connections, are changed to new unique ids.
158    pub fn add_preset(&self, spec: PresetSpec) -> Result<String, AgentError> {
159        let preset = Preset::new(spec);
160        let id = preset.id().to_string();
161
162        // add agents
163        for agent in &preset.spec().agents {
164            if let Err(e) = self.add_agent_internal(id.clone(), agent.clone()) {
165                log::error!("Failed to add_agent {}: {}", agent.id, e);
166            }
167        }
168
169        // add connections
170        for connection in &preset.spec().connections {
171            self.add_connection_internal(connection.clone())
172                .unwrap_or_else(|e| {
173                    log::error!("Failed to add_connection {}: {}", connection.source, e);
174                });
175        }
176
177        // add the given preset into presets
178        let mut presets = self.presets.lock().unwrap();
179        if presets.contains_key(&id) {
180            return Err(AgentError::DuplicateId(id.into()));
181        }
182        presets.insert(id.to_string(), Arc::new(AsyncMutex::new(preset)));
183
184        Ok(id)
185    }
186
187    /// Remove an preset by id.
188    pub async fn remove_preset(&self, id: &str) -> Result<(), AgentError> {
189        let preset = self
190            .get_preset(id)
191            .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
192
193        let mut preset = preset.lock().await;
194        preset.stop(self).await.unwrap_or_else(|e| {
195            log::error!("Failed to stop preset {}: {}", id, e);
196        });
197
198        // Remove all agents and connections associated with the preset
199        for agent in &preset.spec().agents {
200            self.remove_agent_internal(&agent.id)
201                .await
202                .unwrap_or_else(|e| {
203                    log::error!("Failed to remove_agent {}: {}", agent.id, e);
204                });
205        }
206        for connection in &preset.spec().connections {
207            self.remove_connection_internal(connection);
208        }
209
210        Ok(())
211    }
212
213    /// Start an preset by id.
214    pub async fn start_preset(&self, id: &str) -> Result<(), AgentError> {
215        let preset = self
216            .get_preset(id)
217            .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
218        let mut preset = preset.lock().await;
219        preset.start(self).await?;
220
221        Ok(())
222    }
223
224    /// Stop an preset by id.
225    pub async fn stop_preset(&self, id: &str) -> Result<(), AgentError> {
226        let preset = self
227            .get_preset(id)
228            .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
229        let mut preset = preset.lock().await;
230        preset.stop(self).await?;
231
232        Ok(())
233    }
234
235    /// Open a preset from a file.
236    #[cfg(feature = "file")]
237    pub async fn open_preset_from_file(&self, path: &str) -> Result<String, AgentError> {
238        let json_str =
239            std::fs::read_to_string(path).map_err(|e| AgentError::IoError(e.to_string()))?;
240        let spec = PresetSpec::from_json(&json_str)?;
241        let id = self.add_preset(spec)?;
242        self.set_preset_file_name(&id, path).await?;
243        Ok(id)
244    }
245
246    /// Save a preset.
247    #[cfg(feature = "file")]
248    pub async fn save_preset(&self, id: &str) -> Result<(), AgentError> {
249        let Some(preset_spec) = self.get_preset_spec(id).await else {
250            return Err(AgentError::PresetNotFound(id.to_string()));
251        };
252        let json_str = preset_spec.to_json()?;
253        let path = self
254            .get_preset_path(id)
255            .await
256            .ok_or_else(|| AgentError::EmptyFileName)?;
257        std::fs::write(&path, json_str).map_err(|e| AgentError::IoError(e.to_string()))?;
258        Ok(())
259    }
260
261    /// Save a preset to the given file name.
262    #[cfg(feature = "file")]
263    pub async fn save_preset_as(&self, id: &str, path: &str) -> Result<(), AgentError> {
264        self.set_preset_file_name(id, path).await?;
265        self.save_preset(id).await?;
266        Ok(())
267    }
268
269    /// Get the file name of a preset.
270    #[cfg(feature = "file")]
271    pub async fn get_preset_path(&self, id: &str) -> Option<PathBuf> {
272        let Some(preset) = self.get_preset(id) else {
273            return None;
274        };
275        let preset = preset.lock().await;
276        let Some(name) = preset.name() else {
277            return None;
278        };
279        let Some(dir) = preset.dir() else {
280            return None;
281        };
282        let path = std::path::Path::new(&dir).join(name);
283        Some(path)
284    }
285
286    /// Set the file name of a preset.
287    #[cfg(feature = "file")]
288    pub async fn set_preset_file_name(&self, id: &str, path: &str) -> Result<(), AgentError> {
289        let path = std::path::Path::new(path);
290        let name = path
291            .file_stem()
292            .and_then(|s| s.to_str())
293            .ok_or(AgentError::InvalidFileExtension)?;
294        let dir = path
295            .parent()
296            .and_then(|s| s.to_str())
297            .unwrap_or("")
298            .to_string();
299        let preset = {
300            let presets = self.presets.lock().unwrap();
301            let Some(preset) = presets.get(id) else {
302                return Err(AgentError::PresetNotFound(id.to_string()));
303            };
304            preset.clone()
305        };
306        let mut preset = preset.lock().await;
307        preset.set_name(name.to_string());
308        preset.set_dir(dir);
309        Ok(())
310    }
311
312    // PresetSpec
313
314    /// Get the current preset spec by id.
315    pub async fn get_preset_spec(&self, id: &str) -> Option<PresetSpec> {
316        let Some(preset) = self.get_preset(id) else {
317            return None;
318        };
319        let mut preset_spec = {
320            let preset = preset.lock().await;
321            preset.spec().clone()
322        };
323
324        // collect current agent specs in the preset
325        let mut agent_specs = Vec::new();
326        for agent in &preset_spec.agents {
327            if let Some(spec) = self.get_agent_spec(&agent.id).await {
328                agent_specs.push(spec);
329            }
330        }
331        preset_spec.agents = agent_specs;
332
333        // No need to change connections
334
335        Some(preset_spec)
336    }
337
338    /// Update the preset spec
339    pub async fn update_preset_spec(&self, id: &str, value: &Value) -> Result<(), AgentError> {
340        let preset = self
341            .get_preset(id)
342            .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
343        let mut preset = preset.lock().await;
344        preset.update_spec(value)?;
345        Ok(())
346    }
347
348    // PresetInfo
349
350    /// Get info of the preset by id.
351    pub async fn get_preset_info(&self, id: &str) -> Option<PresetInfo> {
352        let Some(preset) = self.get_preset(id) else {
353            return None;
354        };
355        Some(PresetInfo::from(&*preset.lock().await))
356    }
357
358    /// Get infos of all presets.
359    pub async fn get_preset_infos(&self) -> Vec<PresetInfo> {
360        let presets = {
361            let presets = self.presets.lock().unwrap();
362            presets.values().cloned().collect::<Vec<_>>()
363        };
364        let mut preset_infos = Vec::new();
365        for preset in presets {
366            let preset_guard = preset.lock().await;
367            preset_infos.push(PresetInfo::from(&*preset_guard));
368        }
369        preset_infos
370    }
371
372    // Agents
373
374    /// Register an agent definition.
375    pub fn register_agent_definiton(&self, def: AgentDefinition) {
376        let def_name = def.name.clone();
377        let def_global_configs = def.global_configs.clone();
378
379        let mut defs = self.defs.lock().unwrap();
380        defs.insert(def.name.clone(), def);
381
382        // if there is a global config, set it
383        if let Some(def_global_configs) = def_global_configs {
384            let mut new_configs = AgentConfigs::default();
385            for (key, config_entry) in def_global_configs.iter() {
386                new_configs.set(key.clone(), config_entry.value.clone());
387            }
388            self.set_global_configs(def_name, new_configs);
389        }
390    }
391
392    /// Get all agent definitions.
393    pub fn get_agent_definitions(&self) -> AgentDefinitions {
394        let defs = self.defs.lock().unwrap();
395        defs.clone()
396    }
397
398    /// Get an agent definition by name.
399    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
400        let defs = self.defs.lock().unwrap();
401        defs.get(def_name).cloned()
402    }
403
404    /// Get the config specs of an agent definition by name.
405    pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
406        let defs = self.defs.lock().unwrap();
407        let Some(def) = defs.get(def_name) else {
408            return None;
409        };
410        def.configs.clone()
411    }
412
413    /// Get the agent spec by id.
414    pub async fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
415        let agent = {
416            let agents = self.agents.lock().unwrap();
417            let Some(agent) = agents.get(agent_id) else {
418                return None;
419            };
420            agent.clone()
421        };
422        let agent = agent.lock().await;
423        Some(agent.spec().clone())
424    }
425
426    /// Update the agent spec by id.
427    pub async fn update_agent_spec(&self, agent_id: &str, value: &Value) -> Result<(), AgentError> {
428        let agent = {
429            let agents = self.agents.lock().unwrap();
430            let Some(agent) = agents.get(agent_id) else {
431                return Err(AgentError::AgentNotFound(agent_id.to_string()));
432            };
433            agent.clone()
434        };
435        let mut agent = agent.lock().await;
436        agent.update_spec(value)?;
437        Ok(())
438    }
439
440    /// Create a new agent spec from the given agent definition name.
441    pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
442        let def = self
443            .get_agent_definition(def_name)
444            .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
445        Ok(def.to_spec())
446    }
447
448    /// Add an agent to the specified preset, and returns the id of the newly added agent.
449    pub async fn add_agent(
450        &self,
451        preset_id: String,
452        mut spec: AgentSpec,
453    ) -> Result<String, AgentError> {
454        let preset = self
455            .get_preset(&preset_id)
456            .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
457
458        let id = new_id();
459        spec.id = id.clone();
460        self.add_agent_internal(preset_id, spec.clone())?;
461
462        let mut preset = preset.lock().await;
463        preset.add_agent(spec.clone());
464
465        Ok(id)
466    }
467
468    fn add_agent_internal(&self, preset_id: String, spec: AgentSpec) -> Result<(), AgentError> {
469        let mut agents = self.agents.lock().unwrap();
470        if agents.contains_key(&spec.id) {
471            return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
472        }
473        let spec_id = spec.id.clone();
474        let mut agent = agent_new(self.clone(), spec_id.clone(), spec)?;
475        agent.set_preset_id(preset_id);
476        agents.insert(spec_id, Arc::new(AsyncMutex::new(agent)));
477        Ok(())
478    }
479
480    /// Get the agent by id.
481    pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
482        let agents = self.agents.lock().unwrap();
483        agents.get(agent_id).cloned()
484    }
485
486    /// Add a connection to the specified preset.
487    pub async fn add_connection(
488        &self,
489        preset_id: &str,
490        connection: ConnectionSpec,
491    ) -> Result<(), AgentError> {
492        // check if the source and target agents exist
493        {
494            let agents = self.agents.lock().unwrap();
495            if !agents.contains_key(&connection.source) {
496                return Err(AgentError::AgentNotFound(connection.source.to_string()));
497            }
498            if !agents.contains_key(&connection.target) {
499                return Err(AgentError::AgentNotFound(connection.target.to_string()));
500            }
501        }
502
503        // check if handles are valid
504        if connection.source_handle.is_empty() {
505            return Err(AgentError::EmptySourceHandle);
506        }
507        if connection.target_handle.is_empty() {
508            return Err(AgentError::EmptyTargetHandle);
509        }
510
511        let preset = self
512            .get_preset(preset_id)
513            .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
514        let mut preset = preset.lock().await;
515        preset.add_connection(connection.clone());
516        self.add_connection_internal(connection)?;
517        Ok(())
518    }
519
520    fn add_connection_internal(&self, connection: ConnectionSpec) -> Result<(), AgentError> {
521        let mut connections = self.connections.lock().unwrap();
522        if let Some(targets) = connections.get_mut(&connection.source) {
523            if targets
524                .iter()
525                .any(|(target, source_handle, target_handle)| {
526                    *target == connection.target
527                        && *source_handle == connection.source_handle
528                        && *target_handle == connection.target_handle
529                })
530            {
531                return Err(AgentError::ConnectionAlreadyExists);
532            }
533            targets.push((
534                connection.target,
535                connection.source_handle,
536                connection.target_handle,
537            ));
538        } else {
539            connections.insert(
540                connection.source,
541                vec![(
542                    connection.target,
543                    connection.source_handle,
544                    connection.target_handle,
545                )],
546            );
547        }
548        Ok(())
549    }
550
551    /// Add agents and connections to the specified preset.
552    ///
553    /// The ids of the given agents and connections are changed to new unique ids.
554    /// The agents are not started automatically, even if the preset is running.
555    pub async fn add_agents_and_connections(
556        &self,
557        preset_id: &str,
558        agents: &Vec<AgentSpec>,
559        connections: &Vec<ConnectionSpec>,
560    ) -> Result<(Vec<AgentSpec>, Vec<ConnectionSpec>), AgentError> {
561        let (agents, connections) = update_ids(agents, connections);
562
563        let preset = self
564            .get_preset(preset_id)
565            .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
566        let mut preset = preset.lock().await;
567
568        for agent in &agents {
569            self.add_agent_internal(preset_id.to_string(), agent.clone())?;
570            preset.add_agent(agent.clone());
571        }
572
573        for connection in &connections {
574            self.add_connection_internal(connection.clone())?;
575            preset.add_connection(connection.clone());
576        }
577
578        Ok((agents, connections))
579    }
580
581    /// Remove an agent from the specified preset.
582    ///
583    /// If the agent is running, it will be stopped first.
584    pub async fn remove_agent(&self, preset_id: &str, agent_id: &str) -> Result<(), AgentError> {
585        {
586            let preset = self
587                .get_preset(preset_id)
588                .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
589            let mut preset = preset.lock().await;
590            preset.remove_agent(agent_id);
591        }
592        if let Err(e) = self.remove_agent_internal(agent_id).await {
593            return Err(e);
594        }
595        Ok(())
596    }
597
598    async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
599        self.stop_agent(agent_id).await?;
600
601        // remove from connections
602        {
603            let mut connections = self.connections.lock().unwrap();
604            let mut sources_to_remove = Vec::new();
605            for (source, targets) in connections.iter_mut() {
606                targets.retain(|(target, _, _)| target != agent_id);
607                if targets.is_empty() {
608                    sources_to_remove.push(source.clone());
609                }
610            }
611            for source in sources_to_remove {
612                connections.swap_remove(&source);
613            }
614            connections.swap_remove(agent_id);
615        }
616
617        // remove from agents
618        {
619            let mut agents = self.agents.lock().unwrap();
620            agents.swap_remove(agent_id);
621        }
622
623        Ok(())
624    }
625
626    /// Remove a connection from the specified preset.
627    pub async fn remove_connection(
628        &self,
629        preset_id: &str,
630        connection: &ConnectionSpec,
631    ) -> Result<(), AgentError> {
632        let preset = self
633            .get_preset(preset_id)
634            .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
635        let mut preset = preset.lock().await;
636        let Some(connection) = preset.remove_connection(connection) else {
637            return Err(AgentError::ConnectionNotFound(format!(
638                "{}:{}->{}:{}",
639                connection.source,
640                connection.source_handle,
641                connection.target,
642                connection.target_handle
643            )));
644        };
645        self.remove_connection_internal(&connection);
646        Ok(())
647    }
648
649    fn remove_connection_internal(&self, connection: &ConnectionSpec) {
650        let mut connections = self.connections.lock().unwrap();
651        if let Some(targets) = connections.get_mut(&connection.source) {
652            targets.retain(|(target, source_handle, target_handle)| {
653                *target != connection.target
654                    || *source_handle != connection.source_handle
655                    || *target_handle != connection.target_handle
656            });
657            if targets.is_empty() {
658                connections.swap_remove(&connection.source);
659            }
660        }
661    }
662
663    /// Start an agent by id.
664    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
665        let agent = {
666            let agents = self.agents.lock().unwrap();
667            let Some(a) = agents.get(agent_id) else {
668                return Err(AgentError::AgentNotFound(agent_id.to_string()));
669            };
670            a.clone()
671        };
672        let def_name = {
673            let agent = agent.lock().await;
674            agent.def_name().to_string()
675        };
676        let uses_native_thread = {
677            let defs = self.defs.lock().unwrap();
678            let Some(def) = defs.get(&def_name) else {
679                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
680            };
681            def.native_thread
682        };
683        let agent_status = {
684            // This will not block since the agent is not started yet.
685            let agent = agent.lock().await;
686            agent.status().clone()
687        };
688        if agent_status == AgentStatus::Init {
689            log::info!("Starting agent {}", agent_id);
690
691            let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
692
693            {
694                let mut agent_txs = self.agent_txs.lock().unwrap();
695                agent_txs.insert(agent_id.to_string(), tx.clone());
696            };
697
698            let agent_clone = agent.clone();
699            let agent_id_clone = agent_id.to_string();
700
701            let agent_loop = async move {
702                {
703                    let mut agent_guard = agent_clone.lock().await;
704                    if let Err(e) = agent_guard.start().await {
705                        log::error!("Failed to start agent {}: {}", agent_id_clone, e);
706                        return;
707                    }
708                }
709
710                while let Some(message) = rx.recv().await {
711                    match message {
712                        AgentMessage::Input { ctx, port, value } => {
713                            agent_clone
714                                .lock()
715                                .await
716                                .process(ctx, port, value)
717                                .await
718                                .unwrap_or_else(|e| {
719                                    log::error!("Process Error {}: {}", agent_id_clone, e);
720                                });
721                        }
722                        AgentMessage::Config { key, value } => {
723                            agent_clone
724                                .lock()
725                                .await
726                                .set_config(key, value)
727                                .unwrap_or_else(|e| {
728                                    log::error!("Config Error {}: {}", agent_id_clone, e);
729                                });
730                        }
731                        AgentMessage::Configs { configs } => {
732                            agent_clone
733                                .lock()
734                                .await
735                                .set_configs(configs)
736                                .unwrap_or_else(|e| {
737                                    log::error!("Configs Error {}: {}", agent_id_clone, e);
738                                });
739                        }
740                        AgentMessage::Stop => {
741                            rx.close();
742                            break;
743                        }
744                    }
745                }
746            };
747
748            if uses_native_thread {
749                std::thread::spawn(move || {
750                    let rt = tokio::runtime::Builder::new_current_thread()
751                        .enable_all()
752                        .build()
753                        .unwrap();
754                    rt.block_on(agent_loop);
755                });
756            } else {
757                tokio::spawn(agent_loop);
758            }
759        }
760        Ok(())
761    }
762
763    /// Stop an agent by id.
764    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
765        {
766            // remove the sender first to prevent new messages being sent
767            let mut agent_txs = self.agent_txs.lock().unwrap();
768            if let Some(tx) = agent_txs.swap_remove(agent_id) {
769                if let Err(e) = tx.try_send(AgentMessage::Stop) {
770                    log::warn!("Failed to send stop message to agent {}: {}", agent_id, e);
771                }
772            }
773        }
774
775        let agent = {
776            let agents = self.agents.lock().unwrap();
777            let Some(a) = agents.get(agent_id) else {
778                return Err(AgentError::AgentNotFound(agent_id.to_string()));
779            };
780            a.clone()
781        };
782        let mut agent_guard = agent.lock().await;
783        if *agent_guard.status() == AgentStatus::Start {
784            log::info!("Stopping agent {}", agent_id);
785            agent_guard.stop().await?;
786        }
787
788        Ok(())
789    }
790
791    /// Set configs for an agent by id.
792    pub async fn set_agent_configs(
793        &self,
794        agent_id: String,
795        configs: AgentConfigs,
796    ) -> Result<(), AgentError> {
797        let tx = {
798            let agent_txs = self.agent_txs.lock().unwrap();
799            agent_txs.get(&agent_id).cloned()
800        };
801
802        let Some(tx) = tx else {
803            // The agent is not running. We can set the configs directly.
804            let agent = {
805                let agents = self.agents.lock().unwrap();
806                let Some(a) = agents.get(&agent_id) else {
807                    return Err(AgentError::AgentNotFound(agent_id.to_string()));
808                };
809                a.clone()
810            };
811            agent.lock().await.set_configs(configs.clone())?;
812            return Ok(());
813        };
814        let message = AgentMessage::Configs { configs };
815        tx.send(message).await.map_err(|_| {
816            AgentError::SendMessageFailed("Failed to send config message".to_string())
817        })?;
818        Ok(())
819    }
820
821    /// Get global configs for the agent definition by name.
822    pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
823        let global_configs_map = self.global_configs_map.lock().unwrap();
824        global_configs_map.get(def_name).cloned()
825    }
826
827    /// Set global configs for the agent definition by name.
828    pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
829        let mut global_configs_map = self.global_configs_map.lock().unwrap();
830
831        let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
832            global_configs_map.insert(def_name, configs);
833            return;
834        };
835
836        for (key, value) in configs {
837            existing_configs.set(key, value);
838        }
839    }
840
841    /// Get the global configs map.
842    pub fn get_global_configs_map(&self) -> AgentConfigsMap {
843        let global_configs_map = self.global_configs_map.lock().unwrap();
844        global_configs_map.clone()
845    }
846
847    /// Set the global configs map.
848    pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
849        for (agent_name, new_configs) in new_configs_map {
850            self.set_global_configs(agent_name, new_configs);
851        }
852    }
853
854    /// Send input to an agent.
855    pub(crate) async fn agent_input(
856        &self,
857        agent_id: String,
858        ctx: AgentContext,
859        port: String,
860        value: AgentValue,
861    ) -> Result<(), AgentError> {
862        let message = if port.starts_with("config:") {
863            let config_key = port[7..].to_string();
864            AgentMessage::Config {
865                key: config_key,
866                value,
867            }
868        } else {
869            AgentMessage::Input {
870                ctx,
871                port: port.clone(),
872                value,
873            }
874        };
875
876        let tx = {
877            let agent_txs = self.agent_txs.lock().unwrap();
878            agent_txs.get(&agent_id).cloned()
879        };
880
881        let Some(tx) = tx else {
882            // The agent is not running. If it's a config message, we can set it directly.
883            let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
884                let agents = self.agents.lock().unwrap();
885                let Some(a) = agents.get(&agent_id) else {
886                    return Err(AgentError::AgentNotFound(agent_id.to_string()));
887                };
888                a.clone()
889            };
890            if let AgentMessage::Config { key, value } = message {
891                agent.lock().await.set_config(key, value)?;
892            }
893            return Ok(());
894        };
895        tx.send(message).await.map_err(|_| {
896            AgentError::SendMessageFailed("Failed to send input message".to_string())
897        })?;
898
899        self.emit_agent_input(agent_id.to_string(), port);
900
901        Ok(())
902    }
903
904    /// Send output from an agent. (Async version)
905    pub async fn send_agent_out(
906        &self,
907        agent_id: String,
908        ctx: AgentContext,
909        port: String,
910        value: AgentValue,
911    ) -> Result<(), AgentError> {
912        message::send_agent_out(self, agent_id, ctx, port, value).await
913    }
914
915    /// Send output from an agent.
916    pub fn try_send_agent_out(
917        &self,
918        agent_id: String,
919        ctx: AgentContext,
920        port: String,
921        value: AgentValue,
922    ) -> Result<(), AgentError> {
923        message::try_send_agent_out(self, agent_id, ctx, port, value)
924    }
925
926    /// Write a value to the board.
927    pub async fn write_board_value(
928        &self,
929        name: String,
930        value: AgentValue,
931    ) -> Result<(), AgentError> {
932        self.send_board_out(name, AgentContext::new(), value).await
933    }
934
935    /// Write a value to the variable board.
936    pub async fn write_var_value(
937        &self,
938        preset_id: &str,
939        name: &str,
940        value: AgentValue,
941    ) -> Result<(), AgentError> {
942        let var_name = format!("%{}/{}", preset_id, name);
943        self.send_board_out(var_name, AgentContext::new(), value)
944            .await
945    }
946
947    pub(crate) async fn send_board_out(
948        &self,
949        name: String,
950        ctx: AgentContext,
951        value: AgentValue,
952    ) -> Result<(), AgentError> {
953        message::send_board_out(self, name, ctx, value).await
954    }
955
956    async fn spawn_message_loop(&self) -> Result<(), AgentError> {
957        // TODO: settings for the channel size
958        let (tx, mut rx) = mpsc::channel(4096);
959        {
960            let mut tx_lock = self.tx.lock().unwrap();
961            *tx_lock = Some(tx);
962        }
963
964        // spawn the main loop
965        let mak = self.clone();
966        tokio::spawn(async move {
967            while let Some(message) = rx.recv().await {
968                use AgentEventMessage::*;
969
970                match message {
971                    AgentOut {
972                        agent,
973                        ctx,
974                        port,
975                        value,
976                    } => {
977                        message::agent_out(&mak, agent, ctx, port, value).await;
978                    }
979                    BoardOut { name, ctx, value } => {
980                        message::board_out(&mak, name, ctx, value).await;
981                    }
982                }
983            }
984        });
985
986        tokio::task::yield_now().await;
987
988        Ok(())
989    }
990
991    /// Subscribe to all MAK events.
992    pub fn subscribe(&self) -> broadcast::Receiver<MAKEvent> {
993        self.observers.subscribe()
994    }
995
996    /// Subscribe to a specific type of `MAKEvent`.
997    ///
998    /// It takes a closure that filters and maps the events, and returns an `mpsc::UnboundedReceiver`
999    /// that will receive only the successfully mapped events.
1000    pub fn subscribe_to_event<F, T>(&self, mut filter_map: F) -> mpsc::UnboundedReceiver<T>
1001    where
1002        F: FnMut(MAKEvent) -> Option<T> + Send + 'static,
1003        T: Send + 'static,
1004    {
1005        let (tx, rx) = mpsc::unbounded_channel();
1006        let mut event_rx = self.subscribe();
1007
1008        tokio::spawn(async move {
1009            loop {
1010                match event_rx.recv().await {
1011                    Ok(event) => {
1012                        if let Some(mapped_event) = filter_map(event) {
1013                            if tx.send(mapped_event).is_err() {
1014                                // Receiver dropped, task can exit
1015                                break;
1016                            }
1017                        }
1018                    }
1019                    Err(RecvError::Lagged(n)) => {
1020                        log::warn!("Event subscriber lagged by {} events", n);
1021                    }
1022                    Err(RecvError::Closed) => {
1023                        // Sender dropped, task can exit
1024                        break;
1025                    }
1026                }
1027            }
1028        });
1029        rx
1030    }
1031
1032    pub(crate) fn emit_agent_config_updated(
1033        &self,
1034        agent_id: String,
1035        key: String,
1036        value: AgentValue,
1037    ) {
1038        self.notify_observers(MAKEvent::AgentConfigUpdated(agent_id, key, value));
1039    }
1040
1041    pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
1042        self.notify_observers(MAKEvent::AgentError(agent_id, message));
1043    }
1044
1045    pub(crate) fn emit_agent_input(&self, agent_id: String, port: String) {
1046        self.notify_observers(MAKEvent::AgentIn(agent_id, port));
1047    }
1048
1049    pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
1050        self.notify_observers(MAKEvent::AgentSpecUpdated(agent_id));
1051    }
1052
1053    pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
1054        // // ignore variables
1055        // if name.starts_with('%') {
1056        //     return;
1057        // }
1058        self.notify_observers(MAKEvent::Board(name, value));
1059    }
1060
1061    fn notify_observers(&self, event: MAKEvent) {
1062        let _ = self.observers.send(event);
1063    }
1064}
1065
1066#[derive(Clone, Debug)]
1067pub enum MAKEvent {
1068    AgentConfigUpdated(String, String, AgentValue), // (agent_id, key, value)
1069    AgentError(String, String),                     // (agent_id, message)
1070    AgentIn(String, String),                        // (agent_id, port)
1071    AgentSpecUpdated(String),                       // (agent_id)
1072    Board(String, AgentValue),                      // (board name, value)
1073}