1#[cfg(feature = "file")]
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4
5use serde_json::Value;
6use tokio::sync::{Mutex as AsyncMutex, broadcast, broadcast::error::RecvError, mpsc};
7
8use crate::FnvIndexMap;
9use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
10use crate::config::{AgentConfigs, AgentConfigsMap};
11use crate::context::AgentContext;
12use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
13use crate::error::AgentError;
14use crate::id::{new_id, update_ids};
15use crate::message::{self, AgentEventMessage};
16use crate::preset::{Preset, PresetInfo};
17use crate::registry;
18use crate::spec::{AgentSpec, ConnectionSpec, PresetSpec};
19use crate::value::AgentValue;
20
21const MESSAGE_LIMIT: usize = 1024;
22const EVENT_CHANNEL_CAPACITY: usize = 256;
23
24#[derive(Clone)]
25pub struct MAK {
26 pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,
28
29 pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, mpsc::Sender<AgentMessage>>>>,
31
32 pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,
34
35 pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,
37
38 pub(crate) connections: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,
40
41 pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
43
44 pub(crate) presets: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Preset>>>>>,
46
47 pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,
49
50 pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
52
53 pub(crate) observers: broadcast::Sender<MAKEvent>,
55}
56
57impl MAK {
58 pub fn new() -> Self {
59 let (tx, _rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
60 Self {
61 agents: Default::default(),
62 agent_txs: Default::default(),
63 board_out_agents: Default::default(),
64 board_value: Default::default(),
65 connections: Default::default(),
66 defs: Default::default(),
67 presets: Default::default(),
68 global_configs_map: Default::default(),
69 tx: Arc::new(Mutex::new(None)),
70 observers: tx,
71 }
72 }
73
74 pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
75 self.tx
76 .lock()
77 .unwrap()
78 .clone()
79 .ok_or(AgentError::TxNotInitialized)
80 }
81
82 pub fn init() -> Result<Self, AgentError> {
84 let mak = Self::new();
85 mak.register_agents();
86 Ok(mak)
87 }
88
89 fn register_agents(&self) {
90 registry::register_inventory_agents(self);
91 }
92
93 pub async fn ready(&self) -> Result<(), AgentError> {
95 self.spawn_message_loop().await?;
96 Ok(())
97 }
98
99 pub fn quit(&self) {
101 let mut tx_lock = self.tx.lock().unwrap();
102 *tx_lock = None;
103 }
104
105 pub fn new_preset(&self) -> Result<String, AgentError> {
110 let spec = PresetSpec::default();
111 let id = self.add_preset(spec)?;
112 Ok(id)
113 }
114
115 pub fn get_preset(&self, id: &str) -> Option<Arc<AsyncMutex<Preset>>> {
151 let presets = self.presets.lock().unwrap();
152 presets.get(id).cloned()
153 }
154
155 pub fn add_preset(&self, spec: PresetSpec) -> Result<String, AgentError> {
159 let preset = Preset::new(spec);
160 let id = preset.id().to_string();
161
162 for agent in &preset.spec().agents {
164 if let Err(e) = self.add_agent_internal(id.clone(), agent.clone()) {
165 log::error!("Failed to add_agent {}: {}", agent.id, e);
166 }
167 }
168
169 for connection in &preset.spec().connections {
171 self.add_connection_internal(connection.clone())
172 .unwrap_or_else(|e| {
173 log::error!("Failed to add_connection {}: {}", connection.source, e);
174 });
175 }
176
177 let mut presets = self.presets.lock().unwrap();
179 if presets.contains_key(&id) {
180 return Err(AgentError::DuplicateId(id.into()));
181 }
182 presets.insert(id.to_string(), Arc::new(AsyncMutex::new(preset)));
183
184 Ok(id)
185 }
186
187 pub async fn remove_preset(&self, id: &str) -> Result<(), AgentError> {
189 let preset = self
190 .get_preset(id)
191 .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
192
193 let mut preset = preset.lock().await;
194 preset.stop(self).await.unwrap_or_else(|e| {
195 log::error!("Failed to stop preset {}: {}", id, e);
196 });
197
198 for agent in &preset.spec().agents {
200 self.remove_agent_internal(&agent.id)
201 .await
202 .unwrap_or_else(|e| {
203 log::error!("Failed to remove_agent {}: {}", agent.id, e);
204 });
205 }
206 for connection in &preset.spec().connections {
207 self.remove_connection_internal(connection);
208 }
209
210 Ok(())
211 }
212
213 pub async fn start_preset(&self, id: &str) -> Result<(), AgentError> {
215 let preset = self
216 .get_preset(id)
217 .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
218 let mut preset = preset.lock().await;
219 preset.start(self).await?;
220
221 Ok(())
222 }
223
224 pub async fn stop_preset(&self, id: &str) -> Result<(), AgentError> {
226 let preset = self
227 .get_preset(id)
228 .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
229 let mut preset = preset.lock().await;
230 preset.stop(self).await?;
231
232 Ok(())
233 }
234
235 #[cfg(feature = "file")]
237 pub async fn open_preset_from_file(&self, path: &str) -> Result<String, AgentError> {
238 let json_str =
239 std::fs::read_to_string(path).map_err(|e| AgentError::IoError(e.to_string()))?;
240 let spec = PresetSpec::from_json(&json_str)?;
241 let id = self.add_preset(spec)?;
242 self.set_preset_file_name(&id, path).await?;
243 Ok(id)
244 }
245
246 #[cfg(feature = "file")]
248 pub async fn save_preset(&self, id: &str) -> Result<(), AgentError> {
249 let Some(preset_spec) = self.get_preset_spec(id).await else {
250 return Err(AgentError::PresetNotFound(id.to_string()));
251 };
252 let json_str = preset_spec.to_json()?;
253 let path = self
254 .get_preset_path(id)
255 .await
256 .ok_or_else(|| AgentError::EmptyFileName)?;
257 std::fs::write(&path, json_str).map_err(|e| AgentError::IoError(e.to_string()))?;
258 Ok(())
259 }
260
261 #[cfg(feature = "file")]
263 pub async fn save_preset_as(&self, id: &str, path: &str) -> Result<(), AgentError> {
264 self.set_preset_file_name(id, path).await?;
265 self.save_preset(id).await?;
266 Ok(())
267 }
268
269 #[cfg(feature = "file")]
271 pub async fn get_preset_path(&self, id: &str) -> Option<PathBuf> {
272 let Some(preset) = self.get_preset(id) else {
273 return None;
274 };
275 let preset = preset.lock().await;
276 let Some(name) = preset.name() else {
277 return None;
278 };
279 let Some(dir) = preset.dir() else {
280 return None;
281 };
282 let path = std::path::Path::new(&dir).join(name);
283 Some(path)
284 }
285
286 #[cfg(feature = "file")]
288 pub async fn set_preset_file_name(&self, id: &str, path: &str) -> Result<(), AgentError> {
289 let path = std::path::Path::new(path);
290 let name = path
291 .file_stem()
292 .and_then(|s| s.to_str())
293 .ok_or(AgentError::InvalidFileExtension)?;
294 let dir = path
295 .parent()
296 .and_then(|s| s.to_str())
297 .unwrap_or("")
298 .to_string();
299 let preset = {
300 let presets = self.presets.lock().unwrap();
301 let Some(preset) = presets.get(id) else {
302 return Err(AgentError::PresetNotFound(id.to_string()));
303 };
304 preset.clone()
305 };
306 let mut preset = preset.lock().await;
307 preset.set_name(name.to_string());
308 preset.set_dir(dir);
309 Ok(())
310 }
311
312 pub async fn get_preset_spec(&self, id: &str) -> Option<PresetSpec> {
316 let Some(preset) = self.get_preset(id) else {
317 return None;
318 };
319 let mut preset_spec = {
320 let preset = preset.lock().await;
321 preset.spec().clone()
322 };
323
324 let mut agent_specs = Vec::new();
326 for agent in &preset_spec.agents {
327 if let Some(spec) = self.get_agent_spec(&agent.id).await {
328 agent_specs.push(spec);
329 }
330 }
331 preset_spec.agents = agent_specs;
332
333 Some(preset_spec)
336 }
337
338 pub async fn update_preset_spec(&self, id: &str, value: &Value) -> Result<(), AgentError> {
340 let preset = self
341 .get_preset(id)
342 .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
343 let mut preset = preset.lock().await;
344 preset.update_spec(value)?;
345 Ok(())
346 }
347
348 pub async fn get_preset_info(&self, id: &str) -> Option<PresetInfo> {
352 let Some(preset) = self.get_preset(id) else {
353 return None;
354 };
355 Some(PresetInfo::from(&*preset.lock().await))
356 }
357
358 pub async fn get_preset_infos(&self) -> Vec<PresetInfo> {
360 let presets = {
361 let presets = self.presets.lock().unwrap();
362 presets.values().cloned().collect::<Vec<_>>()
363 };
364 let mut preset_infos = Vec::new();
365 for preset in presets {
366 let preset_guard = preset.lock().await;
367 preset_infos.push(PresetInfo::from(&*preset_guard));
368 }
369 preset_infos
370 }
371
372 pub fn register_agent_definiton(&self, def: AgentDefinition) {
376 let def_name = def.name.clone();
377 let def_global_configs = def.global_configs.clone();
378
379 let mut defs = self.defs.lock().unwrap();
380 defs.insert(def.name.clone(), def);
381
382 if let Some(def_global_configs) = def_global_configs {
384 let mut new_configs = AgentConfigs::default();
385 for (key, config_entry) in def_global_configs.iter() {
386 new_configs.set(key.clone(), config_entry.value.clone());
387 }
388 self.set_global_configs(def_name, new_configs);
389 }
390 }
391
392 pub fn get_agent_definitions(&self) -> AgentDefinitions {
394 let defs = self.defs.lock().unwrap();
395 defs.clone()
396 }
397
398 pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
400 let defs = self.defs.lock().unwrap();
401 defs.get(def_name).cloned()
402 }
403
404 pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
406 let defs = self.defs.lock().unwrap();
407 let Some(def) = defs.get(def_name) else {
408 return None;
409 };
410 def.configs.clone()
411 }
412
413 pub async fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
415 let agent = {
416 let agents = self.agents.lock().unwrap();
417 let Some(agent) = agents.get(agent_id) else {
418 return None;
419 };
420 agent.clone()
421 };
422 let agent = agent.lock().await;
423 Some(agent.spec().clone())
424 }
425
426 pub async fn update_agent_spec(&self, agent_id: &str, value: &Value) -> Result<(), AgentError> {
428 let agent = {
429 let agents = self.agents.lock().unwrap();
430 let Some(agent) = agents.get(agent_id) else {
431 return Err(AgentError::AgentNotFound(agent_id.to_string()));
432 };
433 agent.clone()
434 };
435 let mut agent = agent.lock().await;
436 agent.update_spec(value)?;
437 Ok(())
438 }
439
440 pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
442 let def = self
443 .get_agent_definition(def_name)
444 .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
445 Ok(def.to_spec())
446 }
447
448 pub async fn add_agent(
450 &self,
451 preset_id: String,
452 mut spec: AgentSpec,
453 ) -> Result<String, AgentError> {
454 let preset = self
455 .get_preset(&preset_id)
456 .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
457
458 let id = new_id();
459 spec.id = id.clone();
460 self.add_agent_internal(preset_id, spec.clone())?;
461
462 let mut preset = preset.lock().await;
463 preset.add_agent(spec.clone());
464
465 Ok(id)
466 }
467
468 fn add_agent_internal(&self, preset_id: String, spec: AgentSpec) -> Result<(), AgentError> {
469 let mut agents = self.agents.lock().unwrap();
470 if agents.contains_key(&spec.id) {
471 return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
472 }
473 let spec_id = spec.id.clone();
474 let mut agent = agent_new(self.clone(), spec_id.clone(), spec)?;
475 agent.set_preset_id(preset_id);
476 agents.insert(spec_id, Arc::new(AsyncMutex::new(agent)));
477 Ok(())
478 }
479
480 pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
482 let agents = self.agents.lock().unwrap();
483 agents.get(agent_id).cloned()
484 }
485
486 pub async fn add_connection(
488 &self,
489 preset_id: &str,
490 connection: ConnectionSpec,
491 ) -> Result<(), AgentError> {
492 {
494 let agents = self.agents.lock().unwrap();
495 if !agents.contains_key(&connection.source) {
496 return Err(AgentError::AgentNotFound(connection.source.to_string()));
497 }
498 if !agents.contains_key(&connection.target) {
499 return Err(AgentError::AgentNotFound(connection.target.to_string()));
500 }
501 }
502
503 if connection.source_handle.is_empty() {
505 return Err(AgentError::EmptySourceHandle);
506 }
507 if connection.target_handle.is_empty() {
508 return Err(AgentError::EmptyTargetHandle);
509 }
510
511 let preset = self
512 .get_preset(preset_id)
513 .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
514 let mut preset = preset.lock().await;
515 preset.add_connection(connection.clone());
516 self.add_connection_internal(connection)?;
517 Ok(())
518 }
519
520 fn add_connection_internal(&self, connection: ConnectionSpec) -> Result<(), AgentError> {
521 let mut connections = self.connections.lock().unwrap();
522 if let Some(targets) = connections.get_mut(&connection.source) {
523 if targets
524 .iter()
525 .any(|(target, source_handle, target_handle)| {
526 *target == connection.target
527 && *source_handle == connection.source_handle
528 && *target_handle == connection.target_handle
529 })
530 {
531 return Err(AgentError::ConnectionAlreadyExists);
532 }
533 targets.push((
534 connection.target,
535 connection.source_handle,
536 connection.target_handle,
537 ));
538 } else {
539 connections.insert(
540 connection.source,
541 vec![(
542 connection.target,
543 connection.source_handle,
544 connection.target_handle,
545 )],
546 );
547 }
548 Ok(())
549 }
550
551 pub async fn add_agents_and_connections(
556 &self,
557 preset_id: &str,
558 agents: &Vec<AgentSpec>,
559 connections: &Vec<ConnectionSpec>,
560 ) -> Result<(Vec<AgentSpec>, Vec<ConnectionSpec>), AgentError> {
561 let (agents, connections) = update_ids(agents, connections);
562
563 let preset = self
564 .get_preset(preset_id)
565 .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
566 let mut preset = preset.lock().await;
567
568 for agent in &agents {
569 self.add_agent_internal(preset_id.to_string(), agent.clone())?;
570 preset.add_agent(agent.clone());
571 }
572
573 for connection in &connections {
574 self.add_connection_internal(connection.clone())?;
575 preset.add_connection(connection.clone());
576 }
577
578 Ok((agents, connections))
579 }
580
581 pub async fn remove_agent(&self, preset_id: &str, agent_id: &str) -> Result<(), AgentError> {
585 {
586 let preset = self
587 .get_preset(preset_id)
588 .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
589 let mut preset = preset.lock().await;
590 preset.remove_agent(agent_id);
591 }
592 if let Err(e) = self.remove_agent_internal(agent_id).await {
593 return Err(e);
594 }
595 Ok(())
596 }
597
598 async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
599 self.stop_agent(agent_id).await?;
600
601 {
603 let mut connections = self.connections.lock().unwrap();
604 let mut sources_to_remove = Vec::new();
605 for (source, targets) in connections.iter_mut() {
606 targets.retain(|(target, _, _)| target != agent_id);
607 if targets.is_empty() {
608 sources_to_remove.push(source.clone());
609 }
610 }
611 for source in sources_to_remove {
612 connections.swap_remove(&source);
613 }
614 connections.swap_remove(agent_id);
615 }
616
617 {
619 let mut agents = self.agents.lock().unwrap();
620 agents.swap_remove(agent_id);
621 }
622
623 Ok(())
624 }
625
626 pub async fn remove_connection(
628 &self,
629 preset_id: &str,
630 connection: &ConnectionSpec,
631 ) -> Result<(), AgentError> {
632 let preset = self
633 .get_preset(preset_id)
634 .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
635 let mut preset = preset.lock().await;
636 let Some(connection) = preset.remove_connection(connection) else {
637 return Err(AgentError::ConnectionNotFound(format!(
638 "{}:{}->{}:{}",
639 connection.source,
640 connection.source_handle,
641 connection.target,
642 connection.target_handle
643 )));
644 };
645 self.remove_connection_internal(&connection);
646 Ok(())
647 }
648
649 fn remove_connection_internal(&self, connection: &ConnectionSpec) {
650 let mut connections = self.connections.lock().unwrap();
651 if let Some(targets) = connections.get_mut(&connection.source) {
652 targets.retain(|(target, source_handle, target_handle)| {
653 *target != connection.target
654 || *source_handle != connection.source_handle
655 || *target_handle != connection.target_handle
656 });
657 if targets.is_empty() {
658 connections.swap_remove(&connection.source);
659 }
660 }
661 }
662
663 pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
665 let agent = {
666 let agents = self.agents.lock().unwrap();
667 let Some(a) = agents.get(agent_id) else {
668 return Err(AgentError::AgentNotFound(agent_id.to_string()));
669 };
670 a.clone()
671 };
672 let def_name = {
673 let agent = agent.lock().await;
674 agent.def_name().to_string()
675 };
676 let uses_native_thread = {
677 let defs = self.defs.lock().unwrap();
678 let Some(def) = defs.get(&def_name) else {
679 return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
680 };
681 def.native_thread
682 };
683 let agent_status = {
684 let agent = agent.lock().await;
686 agent.status().clone()
687 };
688 if agent_status == AgentStatus::Init {
689 log::info!("Starting agent {}", agent_id);
690
691 let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
692
693 {
694 let mut agent_txs = self.agent_txs.lock().unwrap();
695 agent_txs.insert(agent_id.to_string(), tx.clone());
696 };
697
698 let agent_clone = agent.clone();
699 let agent_id_clone = agent_id.to_string();
700
701 let agent_loop = async move {
702 {
703 let mut agent_guard = agent_clone.lock().await;
704 if let Err(e) = agent_guard.start().await {
705 log::error!("Failed to start agent {}: {}", agent_id_clone, e);
706 return;
707 }
708 }
709
710 while let Some(message) = rx.recv().await {
711 match message {
712 AgentMessage::Input { ctx, port, value } => {
713 agent_clone
714 .lock()
715 .await
716 .process(ctx, port, value)
717 .await
718 .unwrap_or_else(|e| {
719 log::error!("Process Error {}: {}", agent_id_clone, e);
720 });
721 }
722 AgentMessage::Config { key, value } => {
723 agent_clone
724 .lock()
725 .await
726 .set_config(key, value)
727 .unwrap_or_else(|e| {
728 log::error!("Config Error {}: {}", agent_id_clone, e);
729 });
730 }
731 AgentMessage::Configs { configs } => {
732 agent_clone
733 .lock()
734 .await
735 .set_configs(configs)
736 .unwrap_or_else(|e| {
737 log::error!("Configs Error {}: {}", agent_id_clone, e);
738 });
739 }
740 AgentMessage::Stop => {
741 rx.close();
742 break;
743 }
744 }
745 }
746 };
747
748 if uses_native_thread {
749 std::thread::spawn(move || {
750 let rt = tokio::runtime::Builder::new_current_thread()
751 .enable_all()
752 .build()
753 .unwrap();
754 rt.block_on(agent_loop);
755 });
756 } else {
757 tokio::spawn(agent_loop);
758 }
759 }
760 Ok(())
761 }
762
763 pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
765 {
766 let mut agent_txs = self.agent_txs.lock().unwrap();
768 if let Some(tx) = agent_txs.swap_remove(agent_id) {
769 if let Err(e) = tx.try_send(AgentMessage::Stop) {
770 log::warn!("Failed to send stop message to agent {}: {}", agent_id, e);
771 }
772 }
773 }
774
775 let agent = {
776 let agents = self.agents.lock().unwrap();
777 let Some(a) = agents.get(agent_id) else {
778 return Err(AgentError::AgentNotFound(agent_id.to_string()));
779 };
780 a.clone()
781 };
782 let mut agent_guard = agent.lock().await;
783 if *agent_guard.status() == AgentStatus::Start {
784 log::info!("Stopping agent {}", agent_id);
785 agent_guard.stop().await?;
786 }
787
788 Ok(())
789 }
790
791 pub async fn set_agent_configs(
793 &self,
794 agent_id: String,
795 configs: AgentConfigs,
796 ) -> Result<(), AgentError> {
797 let tx = {
798 let agent_txs = self.agent_txs.lock().unwrap();
799 agent_txs.get(&agent_id).cloned()
800 };
801
802 let Some(tx) = tx else {
803 let agent = {
805 let agents = self.agents.lock().unwrap();
806 let Some(a) = agents.get(&agent_id) else {
807 return Err(AgentError::AgentNotFound(agent_id.to_string()));
808 };
809 a.clone()
810 };
811 agent.lock().await.set_configs(configs.clone())?;
812 return Ok(());
813 };
814 let message = AgentMessage::Configs { configs };
815 tx.send(message).await.map_err(|_| {
816 AgentError::SendMessageFailed("Failed to send config message".to_string())
817 })?;
818 Ok(())
819 }
820
821 pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
823 let global_configs_map = self.global_configs_map.lock().unwrap();
824 global_configs_map.get(def_name).cloned()
825 }
826
827 pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
829 let mut global_configs_map = self.global_configs_map.lock().unwrap();
830
831 let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
832 global_configs_map.insert(def_name, configs);
833 return;
834 };
835
836 for (key, value) in configs {
837 existing_configs.set(key, value);
838 }
839 }
840
841 pub fn get_global_configs_map(&self) -> AgentConfigsMap {
843 let global_configs_map = self.global_configs_map.lock().unwrap();
844 global_configs_map.clone()
845 }
846
847 pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
849 for (agent_name, new_configs) in new_configs_map {
850 self.set_global_configs(agent_name, new_configs);
851 }
852 }
853
854 pub(crate) async fn agent_input(
856 &self,
857 agent_id: String,
858 ctx: AgentContext,
859 port: String,
860 value: AgentValue,
861 ) -> Result<(), AgentError> {
862 let message = if port.starts_with("config:") {
863 let config_key = port[7..].to_string();
864 AgentMessage::Config {
865 key: config_key,
866 value,
867 }
868 } else {
869 AgentMessage::Input {
870 ctx,
871 port: port.clone(),
872 value,
873 }
874 };
875
876 let tx = {
877 let agent_txs = self.agent_txs.lock().unwrap();
878 agent_txs.get(&agent_id).cloned()
879 };
880
881 let Some(tx) = tx else {
882 let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
884 let agents = self.agents.lock().unwrap();
885 let Some(a) = agents.get(&agent_id) else {
886 return Err(AgentError::AgentNotFound(agent_id.to_string()));
887 };
888 a.clone()
889 };
890 if let AgentMessage::Config { key, value } = message {
891 agent.lock().await.set_config(key, value)?;
892 }
893 return Ok(());
894 };
895 tx.send(message).await.map_err(|_| {
896 AgentError::SendMessageFailed("Failed to send input message".to_string())
897 })?;
898
899 self.emit_agent_input(agent_id.to_string(), port);
900
901 Ok(())
902 }
903
904 pub async fn send_agent_out(
906 &self,
907 agent_id: String,
908 ctx: AgentContext,
909 port: String,
910 value: AgentValue,
911 ) -> Result<(), AgentError> {
912 message::send_agent_out(self, agent_id, ctx, port, value).await
913 }
914
915 pub fn try_send_agent_out(
917 &self,
918 agent_id: String,
919 ctx: AgentContext,
920 port: String,
921 value: AgentValue,
922 ) -> Result<(), AgentError> {
923 message::try_send_agent_out(self, agent_id, ctx, port, value)
924 }
925
926 pub async fn write_board_value(
928 &self,
929 name: String,
930 value: AgentValue,
931 ) -> Result<(), AgentError> {
932 self.send_board_out(name, AgentContext::new(), value).await
933 }
934
935 pub async fn write_var_value(
937 &self,
938 preset_id: &str,
939 name: &str,
940 value: AgentValue,
941 ) -> Result<(), AgentError> {
942 let var_name = format!("%{}/{}", preset_id, name);
943 self.send_board_out(var_name, AgentContext::new(), value)
944 .await
945 }
946
947 pub(crate) async fn send_board_out(
948 &self,
949 name: String,
950 ctx: AgentContext,
951 value: AgentValue,
952 ) -> Result<(), AgentError> {
953 message::send_board_out(self, name, ctx, value).await
954 }
955
956 async fn spawn_message_loop(&self) -> Result<(), AgentError> {
957 let (tx, mut rx) = mpsc::channel(4096);
959 {
960 let mut tx_lock = self.tx.lock().unwrap();
961 *tx_lock = Some(tx);
962 }
963
964 let mak = self.clone();
966 tokio::spawn(async move {
967 while let Some(message) = rx.recv().await {
968 use AgentEventMessage::*;
969
970 match message {
971 AgentOut {
972 agent,
973 ctx,
974 port,
975 value,
976 } => {
977 message::agent_out(&mak, agent, ctx, port, value).await;
978 }
979 BoardOut { name, ctx, value } => {
980 message::board_out(&mak, name, ctx, value).await;
981 }
982 }
983 }
984 });
985
986 tokio::task::yield_now().await;
987
988 Ok(())
989 }
990
991 pub fn subscribe(&self) -> broadcast::Receiver<MAKEvent> {
993 self.observers.subscribe()
994 }
995
996 pub fn subscribe_to_event<F, T>(&self, mut filter_map: F) -> mpsc::UnboundedReceiver<T>
1001 where
1002 F: FnMut(MAKEvent) -> Option<T> + Send + 'static,
1003 T: Send + 'static,
1004 {
1005 let (tx, rx) = mpsc::unbounded_channel();
1006 let mut event_rx = self.subscribe();
1007
1008 tokio::spawn(async move {
1009 loop {
1010 match event_rx.recv().await {
1011 Ok(event) => {
1012 if let Some(mapped_event) = filter_map(event) {
1013 if tx.send(mapped_event).is_err() {
1014 break;
1016 }
1017 }
1018 }
1019 Err(RecvError::Lagged(n)) => {
1020 log::warn!("Event subscriber lagged by {} events", n);
1021 }
1022 Err(RecvError::Closed) => {
1023 break;
1025 }
1026 }
1027 }
1028 });
1029 rx
1030 }
1031
1032 pub(crate) fn emit_agent_config_updated(
1033 &self,
1034 agent_id: String,
1035 key: String,
1036 value: AgentValue,
1037 ) {
1038 self.notify_observers(MAKEvent::AgentConfigUpdated(agent_id, key, value));
1039 }
1040
1041 pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
1042 self.notify_observers(MAKEvent::AgentError(agent_id, message));
1043 }
1044
1045 pub(crate) fn emit_agent_input(&self, agent_id: String, port: String) {
1046 self.notify_observers(MAKEvent::AgentIn(agent_id, port));
1047 }
1048
1049 pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
1050 self.notify_observers(MAKEvent::AgentSpecUpdated(agent_id));
1051 }
1052
1053 pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
1054 self.notify_observers(MAKEvent::Board(name, value));
1059 }
1060
1061 fn notify_observers(&self, event: MAKEvent) {
1062 let _ = self.observers.send(event);
1063 }
1064}
1065
1066#[derive(Clone, Debug)]
1067pub enum MAKEvent {
1068 AgentConfigUpdated(String, String, AgentValue), AgentError(String, String), AgentIn(String, String), AgentSpecUpdated(String), Board(String, AgentValue), }