modular_agent_kit/
mak.rs

1use std::sync::{Arc, Mutex};
2
3use serde_json::Value;
4use tokio::sync::{Mutex as AsyncMutex, broadcast, broadcast::error::RecvError, mpsc};
5
6use crate::FnvIndexMap;
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::config::{AgentConfigs, AgentConfigsMap};
9use crate::context::AgentContext;
10use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
11use crate::error::AgentError;
12use crate::id::{new_id, update_ids};
13use crate::message::{self, AgentEventMessage};
14use crate::registry;
15use crate::spec::{AgentSpec, PresetSpec, ConnectionSpec};
16use crate::preset::{Preset, PresetInfo, Presets};
17use crate::value::AgentValue;
18
/// Capacity of the per-agent `mpsc` channel created in `MAK::start_agent`.
const MESSAGE_LIMIT: usize = 1024;
/// Capacity of the `broadcast` channel created in `MAK::new` and stored in `MAK::observers`.
const EVENT_CHANNEL_CAPACITY: usize = 256;
21
/// Central registry and message hub for agents, presets and their connections.
///
/// Every field except `observers` is wrapped in an `Arc`, so a clone of `MAK`
/// refers to the same underlying maps as the value it was cloned from.
#[derive(Clone)]
pub struct MAK {
    /// agent id -> agent
    pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,

    /// agent id -> sender of that agent's message channel.
    /// An entry is inserted by `start_agent` and removed by `stop_agent`.
    pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, mpsc::Sender<AgentMessage>>>>,

    /// board name -> [board out agent id]
    pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,

    /// board name -> value
    pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,

    /// source agent id -> [(target agent id, source handle, target handle)]
    pub(crate) connections: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,

    /// agent def name -> agent definition
    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,

    /// presets (preset id -> preset)
    pub(crate) presets: Arc<Mutex<Presets>>,

    /// agent def name -> config
    pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,

    /// Sender of the main event channel; `None` until `spawn_message_loop`
    /// has run and again after `quit`.
    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,

    /// Broadcast sender used to publish `MAKEvent`s to observers.
    pub(crate) observers: broadcast::Sender<MAKEvent>,
}
54
55impl MAK {
56    pub fn new() -> Self {
57        let (tx, _rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
58        Self {
59            agents: Default::default(),
60            agent_txs: Default::default(),
61            board_out_agents: Default::default(),
62            board_value: Default::default(),
63            connections: Default::default(),
64            defs: Default::default(),
65            presets: Default::default(),
66            global_configs_map: Default::default(),
67            tx: Arc::new(Mutex::new(None)),
68            observers: tx,
69        }
70    }
71
72    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
73        self.tx
74            .lock()
75            .unwrap()
76            .clone()
77            .ok_or(AgentError::TxNotInitialized)
78    }
79
80    /// Initialize MAK.
81    pub fn init() -> Result<Self, AgentError> {
82        let mak = Self::new();
83        mak.register_agents();
84        Ok(mak)
85    }
86
    /// Delegate to `registry::register_inventory_agents`, passing this instance.
    fn register_agents(&self) {
        registry::register_inventory_agents(self);
    }
90
91    /// Prepare MAK to be ready.
92    pub async fn ready(&self) -> Result<(), AgentError> {
93        self.spawn_message_loop().await?;
94        Ok(())
95    }
96
97    /// Quit MAK.
98    pub fn quit(&self) {
99        let mut tx_lock = self.tx.lock().unwrap();
100        *tx_lock = None;
101    }
102
103    /// Register an agent definition.
104    pub fn register_agent_definiton(&self, def: AgentDefinition) {
105        let def_name = def.name.clone();
106        let def_global_configs = def.global_configs.clone();
107
108        let mut defs = self.defs.lock().unwrap();
109        defs.insert(def.name.clone(), def);
110
111        // if there is a global config, set it
112        if let Some(def_global_configs) = def_global_configs {
113            let mut new_configs = AgentConfigs::default();
114            for (key, config_entry) in def_global_configs.iter() {
115                new_configs.set(key.clone(), config_entry.value.clone());
116            }
117            self.set_global_configs(def_name, new_configs);
118        }
119    }
120
121    /// Get all agent definitions.
122    pub fn get_agent_definitions(&self) -> AgentDefinitions {
123        let defs = self.defs.lock().unwrap();
124        defs.clone()
125    }
126
127    /// Get an agent definition by name.
128    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
129        let defs = self.defs.lock().unwrap();
130        defs.get(def_name).cloned()
131    }
132
133    /// Get the config specs of an agent definition by name.
134    pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
135        let defs = self.defs.lock().unwrap();
136        let Some(def) = defs.get(def_name) else {
137            return None;
138        };
139        def.configs.clone()
140    }
141
142    /// Get the agent spec by id.
143    pub async fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
144        let agent = {
145            let agents = self.agents.lock().unwrap();
146            let Some(agent) = agents.get(agent_id) else {
147                return None;
148            };
149            agent.clone()
150        };
151        let agent = agent.lock().await;
152        Some(agent.spec().clone())
153    }
154
155    /// Update the agent spec by id.
156    pub async fn update_agent_spec(&self, agent_id: &str, value: &Value) -> Result<(), AgentError> {
157        let agent = {
158            let agents = self.agents.lock().unwrap();
159            let Some(agent) = agents.get(agent_id) else {
160                return Err(AgentError::AgentNotFound(agent_id.to_string()));
161            };
162            agent.clone()
163        };
164        let mut agent = agent.lock().await;
165        agent.update_spec(value)?;
166        Ok(())
167    }
168
169    // presets
170
171    /// Get info of the preset by id.
172    pub fn get_preset_info(&self, id: &str) -> Option<PresetInfo> {
173        let presets = self.presets.lock().unwrap();
174        presets.get(id).map(|preset| preset.into())
175    }
176
177    /// Get infos of all presets.
178    pub fn get_preset_infos(&self) -> Vec<PresetInfo> {
179        let presets = self.presets.lock().unwrap();
180        presets.values().map(|p| p.into()).collect()
181    }
182
183    /// Get the preset spec by id.
184    pub async fn get_preset_spec(&self, id: &str) -> Option<PresetSpec> {
185        let preset_spec = {
186            let presets = self.presets.lock().unwrap();
187            presets.get(id).map(|preset| preset.spec().clone())
188        };
189        let Some(mut preset_spec) = preset_spec else {
190            return None;
191        };
192
193        // collect agent specs in the preset
194        let mut agent_specs = Vec::new();
195        for agent in &preset_spec.agents {
196            if let Some(spec) = self.get_agent_spec(&agent.id).await {
197                agent_specs.push(spec);
198            }
199        }
200        preset_spec.agents = agent_specs;
201
202        // No need to change connections
203
204        Some(preset_spec)
205    }
206
207    /// Update the preset spec
208    pub fn update_preset_spec(&self, id: &str, value: &Value) -> Result<(), AgentError> {
209        let mut presets = self.presets.lock().unwrap();
210        let Some(preset) = presets.get_mut(id) else {
211            return Err(AgentError::PresetNotFound(id.to_string()));
212        };
213        preset.update_spec(value)?;
214        Ok(())
215    }
216
217    /// Create a new preset with the given name.
218    /// If the name already exists, a unique name will be generated by appending a number suffix.
219    /// Returns the id of the new preset.
220    pub fn new_preset(&self, name: &str) -> Result<String, AgentError> {
221        if !is_valid_preseet_name(name) {
222            return Err(AgentError::InvalidPresetName(name.into()));
223        }
224        let new_name = self.unique_preset_name(name);
225        let spec = PresetSpec::default();
226        let id = self.add_preset(new_name, spec)?;
227        Ok(id)
228    }
229
230    /// Rename an existing preset.
231    pub fn rename_preset(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
232        if !is_valid_preseet_name(new_name) {
233            return Err(AgentError::InvalidPresetName(new_name.into()));
234        }
235
236        // check if the new name is already used
237        let new_name = self.unique_preset_name(new_name);
238
239        let mut presets = self.presets.lock().unwrap();
240
241        // remove the original preset
242        let Some(mut preset) = presets.swap_remove(id) else {
243            return Err(AgentError::RenamePresetFailed(id.into()));
244        };
245
246        // insert renamed preset
247        preset.set_name(new_name.clone());
248        presets.insert(preset.id().to_string(), preset);
249        Ok(new_name)
250    }
251
252    /// Generate a unique preset name by appending a number suffix if needed.
253    pub fn unique_preset_name(&self, name: &str) -> String {
254        let mut new_name = name.trim().to_string();
255        let mut i = 2;
256        let presets = self.presets.lock().unwrap();
257        while presets.values().any(|preset| preset.name() == new_name) {
258            new_name = format!("{}{}", name, i);
259            i += 1;
260        }
261        new_name
262    }
263
264    /// Add a new preset with the given name and spec, and returns the id of the new preset.
265    ///
266    /// The ids of the given spec, including agents and connections, are changed to new unique ids.
267    pub fn add_preset(
268        &self,
269        name: String,
270        spec: PresetSpec,
271    ) -> Result<String, AgentError> {
272        let preset = Preset::new(name, spec);
273        let id = preset.id().to_string();
274
275        // add agents
276        for agent in &preset.spec().agents {
277            if let Err(e) = self.add_agent_internal(id.clone(), agent.clone()) {
278                log::error!("Failed to add_agent {}: {}", agent.id, e);
279            }
280        }
281
282        // add connections
283        for connection in &preset.spec().connections {
284            self.add_connection_internal(connection.clone())
285                .unwrap_or_else(|e| {
286                    log::error!("Failed to add_connection {}: {}", connection.source, e);
287                });
288        }
289
290        // add the given preset into presets
291        let mut presets = self.presets.lock().unwrap();
292        if presets.contains_key(&id) {
293            return Err(AgentError::DuplicateId(id.into()));
294        }
295        presets.insert(id.to_string(), preset);
296
297        Ok(id)
298    }
299
300    /// Remove an preset by id.
301    pub async fn remove_preset(&self, id: &str) -> Result<(), AgentError> {
302        let mut preset = {
303            let mut presets = self.presets.lock().unwrap();
304            let Some(preset) = presets.swap_remove(id) else {
305                return Err(AgentError::PresetNotFound(id.to_string()));
306            };
307            preset
308        };
309
310        preset.stop(self).await.unwrap_or_else(|e| {
311            log::error!("Failed to stop preset {}: {}", id, e);
312        });
313
314        // Remove all agents and connections associated with the preset
315        for agent in &preset.spec().agents {
316            self.remove_agent_internal(&agent.id)
317                .await
318                .unwrap_or_else(|e| {
319                    log::error!("Failed to remove_agent {}: {}", agent.id, e);
320                });
321        }
322        for connection in &preset.spec().connections {
323            self.remove_connection_internal(connection);
324        }
325
326        Ok(())
327    }
328
329    /// Start an preset by id.
330    pub async fn start_preset(&self, id: &str) -> Result<(), AgentError> {
331        let mut preset = {
332            let mut presets = self.presets.lock().unwrap();
333            let Some(preset) = presets.swap_remove(id) else {
334                return Err(AgentError::PresetNotFound(id.to_string()));
335            };
336            preset
337        };
338
339        preset.start(self).await?;
340
341        let mut presets = self.presets.lock().unwrap();
342        presets.insert(id.to_string(), preset);
343        Ok(())
344    }
345
346    /// Stop an preset by id.
347    pub async fn stop_preset(&self, id: &str) -> Result<(), AgentError> {
348        let mut preset = {
349            let mut presets = self.presets.lock().unwrap();
350            let Some(preset) = presets.swap_remove(id) else {
351                return Err(AgentError::PresetNotFound(id.to_string()));
352            };
353            preset
354        };
355
356        preset.stop(self).await?;
357
358        let mut presets = self.presets.lock().unwrap();
359        presets.insert(id.to_string(), preset);
360        Ok(())
361    }
362
363    // Agents
364
365    /// Create a new agent spec from the given agent definition name.
366    pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
367        let def = self
368            .get_agent_definition(def_name)
369            .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
370        Ok(def.to_spec())
371    }
372
373    /// Add an agent to the specified preset, and returns the id of the newly added agent.
374    pub fn add_agent(&self, preset_id: String, mut spec: AgentSpec) -> Result<String, AgentError> {
375        let mut presets = self.presets.lock().unwrap();
376        let Some(preset) = presets.get_mut(&preset_id) else {
377            return Err(AgentError::PresetNotFound(preset_id.to_string()));
378        };
379        let id = new_id();
380        spec.id = id.clone();
381        self.add_agent_internal(preset_id, spec.clone())?;
382        preset.add_agent(spec.clone());
383        Ok(id)
384    }
385
386    fn add_agent_internal(&self, preset_id: String, spec: AgentSpec) -> Result<(), AgentError> {
387        let mut agents = self.agents.lock().unwrap();
388        if agents.contains_key(&spec.id) {
389            return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
390        }
391        let spec_id = spec.id.clone();
392        let mut agent = agent_new(self.clone(), spec_id.clone(), spec)?;
393        agent.set_preset_id(preset_id);
394        agents.insert(spec_id, Arc::new(AsyncMutex::new(agent)));
395        Ok(())
396    }
397
398    /// Get the agent by id.
399    pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
400        let agents = self.agents.lock().unwrap();
401        agents.get(agent_id).cloned()
402    }
403
404    /// Add a connection to the specified preset.
405    pub fn add_connection(&self, preset_id: &str, connection: ConnectionSpec) -> Result<(), AgentError> {
406        // check if the source and target agents exist
407        {
408            let agents = self.agents.lock().unwrap();
409            if !agents.contains_key(&connection.source) {
410                return Err(AgentError::AgentNotFound(connection.source.to_string()));
411            }
412            if !agents.contains_key(&connection.target) {
413                return Err(AgentError::AgentNotFound(connection.target.to_string()));
414            }
415        }
416
417        // check if handles are valid
418        if connection.source_handle.is_empty() {
419            return Err(AgentError::EmptySourceHandle);
420        }
421        if connection.target_handle.is_empty() {
422            return Err(AgentError::EmptyTargetHandle);
423        }
424
425        let mut presets = self.presets.lock().unwrap();
426        let Some(preset) = presets.get_mut(preset_id) else {
427            return Err(AgentError::PresetNotFound(preset_id.to_string()));
428        };
429        preset.add_connection(connection.clone());
430        self.add_connection_internal(connection)?;
431        Ok(())
432    }
433
434    fn add_connection_internal(&self, connection: ConnectionSpec) -> Result<(), AgentError> {
435        let mut connections = self.connections.lock().unwrap();
436        if let Some(targets) = connections.get_mut(&connection.source) {
437            if targets
438                .iter()
439                .any(|(target, source_handle, target_handle)| {
440                    *target == connection.target
441                        && *source_handle == connection.source_handle
442                        && *target_handle == connection.target_handle
443                })
444            {
445                return Err(AgentError::ConnectionAlreadyExists);
446            }
447            targets.push((connection.target, connection.source_handle, connection.target_handle));
448        } else {
449            connections.insert(
450                connection.source,
451                vec![(connection.target, connection.source_handle, connection.target_handle)],
452            );
453        }
454        Ok(())
455    }
456
457    /// Add agents and connections to the specified preset.
458    ///
459    /// The ids of the given agents and connections are changed to new unique ids.
460    /// The agents are not started automatically, even if the preset is running.
461    pub fn add_agents_and_connections(
462        &self,
463        preset_id: &str,
464        agents: &Vec<AgentSpec>,
465        connections: &Vec<ConnectionSpec>,
466    ) -> Result<(Vec<AgentSpec>, Vec<ConnectionSpec>), AgentError> {
467        let (agents, connections) = update_ids(agents, connections);
468
469        let mut presets = self.presets.lock().unwrap();
470        let Some(preset) = presets.get_mut(preset_id) else {
471            return Err(AgentError::PresetNotFound(preset_id.to_string()));
472        };
473
474        for agent in &agents {
475            self.add_agent_internal(preset_id.to_string(), agent.clone())?;
476            preset.add_agent(agent.clone());
477        }
478
479        for connection in &connections {
480            self.add_connection_internal(connection.clone())?;
481            preset.add_connection(connection.clone());
482        }
483
484        Ok((agents, connections))
485    }
486
487    /// Remove an agent from the specified preset.
488    ///
489    /// If the agent is running, it will be stopped first.
490    pub async fn remove_agent(&self, preset_id: &str, agent_id: &str) -> Result<(), AgentError> {
491        {
492            let mut presets = self.presets.lock().unwrap();
493            let Some(preset) = presets.get_mut(preset_id) else {
494                return Err(AgentError::PresetNotFound(preset_id.to_string()));
495            };
496            preset.remove_agent(agent_id);
497        }
498        if let Err(e) = self.remove_agent_internal(agent_id).await {
499            return Err(e);
500        }
501        Ok(())
502    }
503
504    async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
505        self.stop_agent(agent_id).await?;
506
507        // remove from connections
508        {
509            let mut connections = self.connections.lock().unwrap();
510            let mut sources_to_remove = Vec::new();
511            for (source, targets) in connections.iter_mut() {
512                targets.retain(|(target, _, _)| target != agent_id);
513                if targets.is_empty() {
514                    sources_to_remove.push(source.clone());
515                }
516            }
517            for source in sources_to_remove {
518                connections.swap_remove(&source);
519            }
520            connections.swap_remove(agent_id);
521        }
522
523        // remove from agents
524        {
525            let mut agents = self.agents.lock().unwrap();
526            agents.swap_remove(agent_id);
527        }
528
529        Ok(())
530    }
531
532    /// Remove a connection from the specified preset.
533    pub fn remove_connection(&self, preset_id: &str, connection: &ConnectionSpec) -> Result<(), AgentError> {
534        let mut preset = {
535            let mut presets = self.presets.lock().unwrap();
536            let Some(preset) = presets.swap_remove(preset_id) else {
537                return Err(AgentError::PresetNotFound(preset_id.to_string()));
538            };
539            preset
540        };
541
542        let Some(connection) = preset.remove_connection(connection) else {
543            let mut presets = self.presets.lock().unwrap();
544            presets.insert(preset_id.to_string(), preset);
545            return Err(AgentError::ConnectionNotFound(format!(
546                "{}:{}->{}:{}",
547                connection.source, connection.source_handle, connection.target, connection.target_handle
548            )));
549        };
550        let mut presets = self.presets.lock().unwrap();
551        presets.insert(preset_id.to_string(), preset);
552
553        self.remove_connection_internal(&connection);
554        Ok(())
555    }
556
557    fn remove_connection_internal(&self, connection: &ConnectionSpec) {
558        let mut connections = self.connections.lock().unwrap();
559        if let Some(targets) = connections.get_mut(&connection.source) {
560            targets.retain(|(target, source_handle, target_handle)| {
561                *target != connection.target
562                    || *source_handle != connection.source_handle
563                    || *target_handle != connection.target_handle
564            });
565            if targets.is_empty() {
566                connections.swap_remove(&connection.source);
567            }
568        }
569    }
570
571    /// Start an agent by id.
572    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
573        let agent = {
574            let agents = self.agents.lock().unwrap();
575            let Some(a) = agents.get(agent_id) else {
576                return Err(AgentError::AgentNotFound(agent_id.to_string()));
577            };
578            a.clone()
579        };
580        let def_name = {
581            let agent = agent.lock().await;
582            agent.def_name().to_string()
583        };
584        let uses_native_thread = {
585            let defs = self.defs.lock().unwrap();
586            let Some(def) = defs.get(&def_name) else {
587                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
588            };
589            def.native_thread
590        };
591        let agent_status = {
592            // This will not block since the agent is not started yet.
593            let agent = agent.lock().await;
594            agent.status().clone()
595        };
596        if agent_status == AgentStatus::Init {
597            log::info!("Starting agent {}", agent_id);
598
599            let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
600
601            {
602                let mut agent_txs = self.agent_txs.lock().unwrap();
603                agent_txs.insert(agent_id.to_string(), tx.clone());
604            };
605
606            let agent_clone = agent.clone();
607            let agent_id_clone = agent_id.to_string();
608
609            let agent_loop = async move {
610                {
611                    let mut agent_guard = agent_clone.lock().await;
612                    if let Err(e) = agent_guard.start().await {
613                        log::error!("Failed to start agent {}: {}", agent_id_clone, e);
614                        return;
615                    }
616                }
617
618                while let Some(message) = rx.recv().await {
619                    match message {
620                        AgentMessage::Input { ctx, port, value } => {
621                            agent_clone
622                                .lock()
623                                .await
624                                .process(ctx, port, value)
625                                .await
626                                .unwrap_or_else(|e| {
627                                    log::error!("Process Error {}: {}", agent_id_clone, e);
628                                });
629                        }
630                        AgentMessage::Config { key, value } => {
631                            agent_clone
632                                .lock()
633                                .await
634                                .set_config(key, value)
635                                .unwrap_or_else(|e| {
636                                    log::error!("Config Error {}: {}", agent_id_clone, e);
637                                });
638                        }
639                        AgentMessage::Configs { configs } => {
640                            agent_clone
641                                .lock()
642                                .await
643                                .set_configs(configs)
644                                .unwrap_or_else(|e| {
645                                    log::error!("Configs Error {}: {}", agent_id_clone, e);
646                                });
647                        }
648                        AgentMessage::Stop => {
649                            rx.close();
650                            break;
651                        }
652                    }
653                }
654            };
655
656            if uses_native_thread {
657                std::thread::spawn(move || {
658                    let rt = tokio::runtime::Builder::new_current_thread()
659                        .enable_all()
660                        .build()
661                        .unwrap();
662                    rt.block_on(agent_loop);
663                });
664            } else {
665                tokio::spawn(agent_loop);
666            }
667        }
668        Ok(())
669    }
670
671    /// Stop an agent by id.
672    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
673        {
674            // remove the sender first to prevent new messages being sent
675            let mut agent_txs = self.agent_txs.lock().unwrap();
676            if let Some(tx) = agent_txs.swap_remove(agent_id) {
677                if let Err(e) = tx.try_send(AgentMessage::Stop) {
678                    log::warn!("Failed to send stop message to agent {}: {}", agent_id, e);
679                }
680            }
681        }
682
683        let agent = {
684            let agents = self.agents.lock().unwrap();
685            let Some(a) = agents.get(agent_id) else {
686                return Err(AgentError::AgentNotFound(agent_id.to_string()));
687            };
688            a.clone()
689        };
690        let mut agent_guard = agent.lock().await;
691        if *agent_guard.status() == AgentStatus::Start {
692            log::info!("Stopping agent {}", agent_id);
693            agent_guard.stop().await?;
694        }
695
696        Ok(())
697    }
698
699    /// Set configs for an agent by id.
700    pub async fn set_agent_configs(
701        &self,
702        agent_id: String,
703        configs: AgentConfigs,
704    ) -> Result<(), AgentError> {
705        let tx = {
706            let agent_txs = self.agent_txs.lock().unwrap();
707            agent_txs.get(&agent_id).cloned()
708        };
709
710        let Some(tx) = tx else {
711            // The agent is not running. We can set the configs directly.
712            let agent = {
713                let agents = self.agents.lock().unwrap();
714                let Some(a) = agents.get(&agent_id) else {
715                    return Err(AgentError::AgentNotFound(agent_id.to_string()));
716                };
717                a.clone()
718            };
719            agent.lock().await.set_configs(configs.clone())?;
720            return Ok(());
721        };
722        let message = AgentMessage::Configs { configs };
723        tx.send(message).await.map_err(|_| {
724            AgentError::SendMessageFailed("Failed to send config message".to_string())
725        })?;
726        Ok(())
727    }
728
729    /// Get global configs for the agent definition by name.
730    pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
731        let global_configs_map = self.global_configs_map.lock().unwrap();
732        global_configs_map.get(def_name).cloned()
733    }
734
735    /// Set global configs for the agent definition by name.
736    pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
737        let mut global_configs_map = self.global_configs_map.lock().unwrap();
738
739        let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
740            global_configs_map.insert(def_name, configs);
741            return;
742        };
743
744        for (key, value) in configs {
745            existing_configs.set(key, value);
746        }
747    }
748
749    /// Get the global configs map.
750    pub fn get_global_configs_map(&self) -> AgentConfigsMap {
751        let global_configs_map = self.global_configs_map.lock().unwrap();
752        global_configs_map.clone()
753    }
754
755    /// Set the global configs map.
756    pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
757        for (agent_name, new_configs) in new_configs_map {
758            self.set_global_configs(agent_name, new_configs);
759        }
760    }
761
762    /// Send input to an agent.
763    pub(crate) async fn agent_input(
764        &self,
765        agent_id: String,
766        ctx: AgentContext,
767        port: String,
768        value: AgentValue,
769    ) -> Result<(), AgentError> {
770        let message = if port.starts_with("config:") {
771            let config_key = port[7..].to_string();
772            AgentMessage::Config {
773                key: config_key,
774                value,
775            }
776        } else {
777            AgentMessage::Input {
778                ctx,
779                port: port.clone(),
780                value,
781            }
782        };
783
784        let tx = {
785            let agent_txs = self.agent_txs.lock().unwrap();
786            agent_txs.get(&agent_id).cloned()
787        };
788
789        let Some(tx) = tx else {
790            // The agent is not running. If it's a config message, we can set it directly.
791            let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
792                let agents = self.agents.lock().unwrap();
793                let Some(a) = agents.get(&agent_id) else {
794                    return Err(AgentError::AgentNotFound(agent_id.to_string()));
795                };
796                a.clone()
797            };
798            if let AgentMessage::Config { key, value } = message {
799                agent.lock().await.set_config(key, value)?;
800            }
801            return Ok(());
802        };
803        tx.send(message).await.map_err(|_| {
804            AgentError::SendMessageFailed("Failed to send input message".to_string())
805        })?;
806
807        self.emit_agent_input(agent_id.to_string(), port);
808
809        Ok(())
810    }
811
812    /// Send output from an agent. (Async version)
813    pub async fn send_agent_out(
814        &self,
815        agent_id: String,
816        ctx: AgentContext,
817        port: String,
818        value: AgentValue,
819    ) -> Result<(), AgentError> {
820        message::send_agent_out(self, agent_id, ctx, port, value).await
821    }
822
    /// Send output from an agent (synchronous counterpart of `send_agent_out`).
    ///
    /// Delegates to `message::try_send_agent_out` and returns its result.
    pub fn try_send_agent_out(
        &self,
        agent_id: String,
        ctx: AgentContext,
        port: String,
        value: AgentValue,
    ) -> Result<(), AgentError> {
        message::try_send_agent_out(self, agent_id, ctx, port, value)
    }
833
834    /// Write a value to the board.
835    pub async fn write_board_value(
836        &self,
837        name: String,
838        value: AgentValue,
839    ) -> Result<(), AgentError> {
840        self.send_board_out(name, AgentContext::new(), value).await
841    }
842
843    /// Write a value to the variable board.
844    pub async fn write_var_value(
845        &self,
846        preset_id: &str,
847        name: &str,
848        value: AgentValue,
849    ) -> Result<(), AgentError> {
850        let var_name = format!("%{}/{}", preset_id, name);
851        self.send_board_out(var_name, AgentContext::new(), value)
852            .await
853    }
854
855    pub(crate) async fn send_board_out(
856        &self,
857        name: String,
858        ctx: AgentContext,
859        value: AgentValue,
860    ) -> Result<(), AgentError> {
861        message::send_board_out(self, name, ctx, value).await
862    }
863
864    async fn spawn_message_loop(&self) -> Result<(), AgentError> {
865        // TODO: settings for the channel size
866        let (tx, mut rx) = mpsc::channel(4096);
867        {
868            let mut tx_lock = self.tx.lock().unwrap();
869            *tx_lock = Some(tx);
870        }
871
872        // spawn the main loop
873        let mak = self.clone();
874        tokio::spawn(async move {
875            while let Some(message) = rx.recv().await {
876                use AgentEventMessage::*;
877
878                match message {
879                    AgentOut {
880                        agent,
881                        ctx,
882                        port,
883                        value,
884                    } => {
885                        message::agent_out(&mak, agent, ctx, port, value).await;
886                    }
887                    BoardOut { name, ctx, value } => {
888                        message::board_out(&mak, name, ctx, value).await;
889                    }
890                }
891            }
892        });
893
894        tokio::task::yield_now().await;
895
896        Ok(())
897    }
898
899    /// Subscribe to all MAK events.
900    pub fn subscribe(&self) -> broadcast::Receiver<MAKEvent> {
901        self.observers.subscribe()
902    }
903
904    /// Subscribe to a specific type of `MAKEvent`.
905    ///
906    /// It takes a closure that filters and maps the events, and returns an `mpsc::UnboundedReceiver`
907    /// that will receive only the successfully mapped events.
908    pub fn subscribe_to_event<F, T>(&self, mut filter_map: F) -> mpsc::UnboundedReceiver<T>
909    where
910        F: FnMut(MAKEvent) -> Option<T> + Send + 'static,
911        T: Send + 'static,
912    {
913        let (tx, rx) = mpsc::unbounded_channel();
914        let mut event_rx = self.subscribe();
915
916        tokio::spawn(async move {
917            loop {
918                match event_rx.recv().await {
919                    Ok(event) => {
920                        if let Some(mapped_event) = filter_map(event) {
921                            if tx.send(mapped_event).is_err() {
922                                // Receiver dropped, task can exit
923                                break;
924                            }
925                        }
926                    }
927                    Err(RecvError::Lagged(n)) => {
928                        log::warn!("Event subscriber lagged by {} events", n);
929                    }
930                    Err(RecvError::Closed) => {
931                        // Sender dropped, task can exit
932                        break;
933                    }
934                }
935            }
936        });
937        rx
938    }
939
940    pub(crate) fn emit_agent_config_updated(
941        &self,
942        agent_id: String,
943        key: String,
944        value: AgentValue,
945    ) {
946        self.notify_observers(MAKEvent::AgentConfigUpdated(agent_id, key, value));
947    }
948
949    pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
950        self.notify_observers(MAKEvent::AgentError(agent_id, message));
951    }
952
953    pub(crate) fn emit_agent_input(&self, agent_id: String, port: String) {
954        self.notify_observers(MAKEvent::AgentIn(agent_id, port));
955    }
956
957    pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
958        self.notify_observers(MAKEvent::AgentSpecUpdated(agent_id));
959    }
960
961    pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
962        // // ignore variables
963        // if name.starts_with('%') {
964        //     return;
965        // }
966        self.notify_observers(MAKEvent::Board(name, value));
967    }
968
969    fn notify_observers(&self, event: MAKEvent) {
970        let _ = self.observers.send(event);
971    }
972}
973
/// Returns `true` when `new_name` is acceptable as a preset name.
///
/// A name may be a single segment (`"preset"`) or a `/`-separated path
/// (`"dir/preset"`). It is rejected when:
///
/// * it is empty or consists only of whitespace,
/// * any `/`-separated segment is empty, i.e. the name has a leading, trailing,
///   or doubled slash,
/// * any segment is exactly `.` or `..` — this also applies to a name without
///   any slash, so the bare names `"."` and `".."` are rejected,
/// * it contains one of the characters `\ : * ? " < > |`.
fn is_valid_preseet_name(new_name: &str) -> bool {
    const INVALID_CHARS: [char; 8] = ['\\', ':', '*', '?', '"', '<', '>', '|'];

    if new_name.trim().is_empty() {
        return false;
    }

    // Splitting on '/' covers every slash rule at once: a leading, trailing, or
    // doubled slash shows up as an empty segment. A name without a slash is a
    // single segment, so the `.` / `..` rule applies to it as well.
    let has_bad_segment = new_name
        .split('/')
        .any(|segment| segment.is_empty() || segment == "." || segment == "..");
    if has_bad_segment {
        return false;
    }

    !new_name.chars().any(|c| INVALID_CHARS.contains(&c))
}
1005
1006#[derive(Clone, Debug)]
1007pub enum MAKEvent {
1008    AgentConfigUpdated(String, String, AgentValue), // (agent_id, key, value)
1009    AgentError(String, String),                     // (agent_id, message)
1010    AgentIn(String, String),                        // (agent_id, port)
1011    AgentSpecUpdated(String),                       // (agent_id)
1012    Board(String, AgentValue),                      // (board name, value)
1013}