1#[cfg(feature = "file")]
2use std::sync::{Arc, Mutex};
3
4use serde_json::Value;
5use tokio::sync::{Mutex as AsyncMutex, broadcast, broadcast::error::RecvError, mpsc};
6
7use crate::FnvIndexMap;
8use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
9use crate::config::{AgentConfigs, AgentConfigsMap};
10use crate::context::AgentContext;
11use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
12use crate::error::AgentError;
13use crate::id::{new_id, update_ids};
14use crate::message::{self, AgentEventMessage};
15use crate::preset::{Preset, PresetInfo};
16use crate::registry;
17use crate::spec::{AgentSpec, ConnectionSpec, PresetSpec};
18use crate::value::AgentValue;
19
20const MESSAGE_LIMIT: usize = 1024;
21const EVENT_CHANNEL_CAPACITY: usize = 256;
22
23#[derive(Clone)]
24pub struct ModularAgent {
25 pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,
27
28 pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, mpsc::Sender<AgentMessage>>>>,
30
31 pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,
33
34 pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,
36
37 pub(crate) connections: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,
39
40 pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
42
43 pub(crate) presets: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Preset>>>>>,
45
46 pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,
48
49 pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
51
52 pub(crate) observers: broadcast::Sender<MAKEvent>,
54}
55
56impl ModularAgent {
57 pub fn new() -> Self {
58 let (tx, _rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
59 Self {
60 agents: Default::default(),
61 agent_txs: Default::default(),
62 board_out_agents: Default::default(),
63 board_value: Default::default(),
64 connections: Default::default(),
65 defs: Default::default(),
66 presets: Default::default(),
67 global_configs_map: Default::default(),
68 tx: Arc::new(Mutex::new(None)),
69 observers: tx,
70 }
71 }
72
73 pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
74 self.tx
75 .lock()
76 .unwrap()
77 .clone()
78 .ok_or(AgentError::TxNotInitialized)
79 }
80
81 pub fn init() -> Result<Self, AgentError> {
83 let ma = Self::new();
84 ma.register_agents();
85 Ok(ma)
86 }
87
88 fn register_agents(&self) {
89 registry::register_inventory_agents(self);
90 }
91
92 pub async fn ready(&self) -> Result<(), AgentError> {
94 self.spawn_message_loop().await?;
95 Ok(())
96 }
97
98 pub fn quit(&self) {
100 let mut tx_lock = self.tx.lock().unwrap();
101 *tx_lock = None;
102 }
103
104 pub fn new_preset(&self) -> Result<String, AgentError> {
109 let spec = PresetSpec::default();
110 let id = self.add_preset(spec)?;
111 Ok(id)
112 }
113
114 pub fn new_preset_with_name(&self, name: String) -> Result<String, AgentError> {
117 let spec = PresetSpec::default();
118 let id = self.add_preset_with_name(spec, name)?;
119 Ok(id)
120 }
121
122 pub fn get_preset(&self, id: &str) -> Option<Arc<AsyncMutex<Preset>>> {
124 let presets = self.presets.lock().unwrap();
125 presets.get(id).cloned()
126 }
127
128 pub fn add_preset(&self, spec: PresetSpec) -> Result<String, AgentError> {
132 self.add_preset_raw(spec, None)
133 }
134
135 pub fn add_preset_with_name(
139 &self,
140 spec: PresetSpec,
141 name: String,
142 ) -> Result<String, AgentError> {
143 self.add_preset_raw(spec, Some(name))
144 }
145
146 fn add_preset_raw(&self, spec: PresetSpec, name: Option<String>) -> Result<String, AgentError> {
147 let mut preset = Preset::new(spec);
148 if let Some(name) = name {
149 preset.set_name(name);
150 }
151 let id = preset.id().to_string();
152
153 for agent in &preset.spec().agents {
155 if let Err(e) = self.add_agent_internal(id.clone(), agent.clone()) {
156 log::error!("Failed to add_agent {}: {}", agent.id, e);
157 }
158 }
159
160 for connection in &preset.spec().connections {
162 self.add_connection_internal(connection.clone())
163 .unwrap_or_else(|e| {
164 log::error!("Failed to add_connection {}: {}", connection.source, e);
165 });
166 }
167
168 let mut presets = self.presets.lock().unwrap();
170 if presets.contains_key(&id) {
171 return Err(AgentError::DuplicateId(id.into()));
172 }
173 presets.insert(id.to_string(), Arc::new(AsyncMutex::new(preset)));
174
175 Ok(id)
176 }
177
178 pub async fn remove_preset(&self, id: &str) -> Result<(), AgentError> {
180 let preset = self
181 .get_preset(id)
182 .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
183
184 let mut preset = preset.lock().await;
185 preset.stop(self).await.unwrap_or_else(|e| {
186 log::error!("Failed to stop preset {}: {}", id, e);
187 });
188
189 for agent in &preset.spec().agents {
191 self.remove_agent_internal(&agent.id)
192 .await
193 .unwrap_or_else(|e| {
194 log::error!("Failed to remove_agent {}: {}", agent.id, e);
195 });
196 }
197 for connection in &preset.spec().connections {
198 self.remove_connection_internal(connection);
199 }
200
201 Ok(())
202 }
203
204 pub async fn start_preset(&self, id: &str) -> Result<(), AgentError> {
206 let preset = self
207 .get_preset(id)
208 .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
209 let mut preset = preset.lock().await;
210 preset.start(self).await?;
211
212 Ok(())
213 }
214
215 pub async fn stop_preset(&self, id: &str) -> Result<(), AgentError> {
217 let preset = self
218 .get_preset(id)
219 .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
220 let mut preset = preset.lock().await;
221 preset.stop(self).await?;
222
223 Ok(())
224 }
225
226 #[cfg(feature = "file")]
228 pub async fn open_preset_from_file(
229 &self,
230 path: &str,
231 name: Option<String>,
232 ) -> Result<String, AgentError> {
233 let json_str =
234 std::fs::read_to_string(path).map_err(|e| AgentError::IoError(e.to_string()))?;
235 let spec = PresetSpec::from_json(&json_str)?;
236 let id = self.add_preset_raw(spec, name)?;
237 Ok(id)
238 }
239
240 #[cfg(feature = "file")]
242 pub async fn save_preset(&self, id: &str, path: &str) -> Result<(), AgentError> {
243 let Some(preset_spec) = self.get_preset_spec(id).await else {
244 return Err(AgentError::PresetNotFound(id.to_string()));
245 };
246 let json_str = preset_spec.to_json()?;
247 std::fs::write(path, json_str).map_err(|e| AgentError::IoError(e.to_string()))?;
248 Ok(())
249 }
250
251 pub async fn get_preset_spec(&self, id: &str) -> Option<PresetSpec> {
255 let Some(preset) = self.get_preset(id) else {
256 return None;
257 };
258 let mut preset_spec = {
259 let preset = preset.lock().await;
260 preset.spec().clone()
261 };
262
263 let mut agent_specs = Vec::new();
265 for agent in &preset_spec.agents {
266 if let Some(spec) = self.get_agent_spec(&agent.id).await {
267 agent_specs.push(spec);
268 }
269 }
270 preset_spec.agents = agent_specs;
271
272 Some(preset_spec)
275 }
276
277 pub async fn update_preset_spec(&self, id: &str, value: &Value) -> Result<(), AgentError> {
279 let preset = self
280 .get_preset(id)
281 .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
282 let mut preset = preset.lock().await;
283 preset.update_spec(value)?;
284 Ok(())
285 }
286
287 pub async fn get_preset_info(&self, id: &str) -> Option<PresetInfo> {
291 let Some(preset) = self.get_preset(id) else {
292 return None;
293 };
294 Some(PresetInfo::from(&*preset.lock().await))
295 }
296
297 pub async fn get_preset_infos(&self) -> Vec<PresetInfo> {
299 let presets = {
300 let presets = self.presets.lock().unwrap();
301 presets.values().cloned().collect::<Vec<_>>()
302 };
303 let mut preset_infos = Vec::new();
304 for preset in presets {
305 let preset_guard = preset.lock().await;
306 preset_infos.push(PresetInfo::from(&*preset_guard));
307 }
308 preset_infos
309 }
310
311 pub fn register_agent_definiton(&self, def: AgentDefinition) {
315 let def_name = def.name.clone();
316 let def_global_configs = def.global_configs.clone();
317
318 let mut defs = self.defs.lock().unwrap();
319 defs.insert(def.name.clone(), def);
320
321 if let Some(def_global_configs) = def_global_configs {
323 let mut new_configs = AgentConfigs::default();
324 for (key, config_entry) in def_global_configs.iter() {
325 new_configs.set(key.clone(), config_entry.value.clone());
326 }
327 self.set_global_configs(def_name, new_configs);
328 }
329 }
330
331 pub fn get_agent_definitions(&self) -> AgentDefinitions {
333 let defs = self.defs.lock().unwrap();
334 defs.clone()
335 }
336
337 pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
339 let defs = self.defs.lock().unwrap();
340 defs.get(def_name).cloned()
341 }
342
343 pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
345 let defs = self.defs.lock().unwrap();
346 let Some(def) = defs.get(def_name) else {
347 return None;
348 };
349 def.configs.clone()
350 }
351
352 pub async fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
354 let agent = {
355 let agents = self.agents.lock().unwrap();
356 let Some(agent) = agents.get(agent_id) else {
357 return None;
358 };
359 agent.clone()
360 };
361 let agent = agent.lock().await;
362 Some(agent.spec().clone())
363 }
364
365 pub async fn update_agent_spec(&self, agent_id: &str, value: &Value) -> Result<(), AgentError> {
367 let agent = {
368 let agents = self.agents.lock().unwrap();
369 let Some(agent) = agents.get(agent_id) else {
370 return Err(AgentError::AgentNotFound(agent_id.to_string()));
371 };
372 agent.clone()
373 };
374 let mut agent = agent.lock().await;
375 agent.update_spec(value)?;
376 Ok(())
377 }
378
379 pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
381 let def = self
382 .get_agent_definition(def_name)
383 .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
384 Ok(def.to_spec())
385 }
386
387 pub async fn add_agent(
389 &self,
390 preset_id: String,
391 mut spec: AgentSpec,
392 ) -> Result<String, AgentError> {
393 let preset = self
394 .get_preset(&preset_id)
395 .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
396
397 let id = new_id();
398 spec.id = id.clone();
399 self.add_agent_internal(preset_id, spec.clone())?;
400
401 let mut preset = preset.lock().await;
402 preset.add_agent(spec.clone());
403
404 Ok(id)
405 }
406
407 fn add_agent_internal(&self, preset_id: String, spec: AgentSpec) -> Result<(), AgentError> {
408 let mut agents = self.agents.lock().unwrap();
409 if agents.contains_key(&spec.id) {
410 return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
411 }
412 let spec_id = spec.id.clone();
413 let mut agent = agent_new(self.clone(), spec_id.clone(), spec)?;
414 agent.set_preset_id(preset_id);
415 agents.insert(spec_id, Arc::new(AsyncMutex::new(agent)));
416 Ok(())
417 }
418
419 pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
421 let agents = self.agents.lock().unwrap();
422 agents.get(agent_id).cloned()
423 }
424
425 pub async fn add_connection(
427 &self,
428 preset_id: &str,
429 connection: ConnectionSpec,
430 ) -> Result<(), AgentError> {
431 {
433 let agents = self.agents.lock().unwrap();
434 if !agents.contains_key(&connection.source) {
435 return Err(AgentError::AgentNotFound(connection.source.to_string()));
436 }
437 if !agents.contains_key(&connection.target) {
438 return Err(AgentError::AgentNotFound(connection.target.to_string()));
439 }
440 }
441
442 if connection.source_handle.is_empty() {
444 return Err(AgentError::EmptySourceHandle);
445 }
446 if connection.target_handle.is_empty() {
447 return Err(AgentError::EmptyTargetHandle);
448 }
449
450 let preset = self
451 .get_preset(preset_id)
452 .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
453 let mut preset = preset.lock().await;
454 preset.add_connection(connection.clone());
455 self.add_connection_internal(connection)?;
456 Ok(())
457 }
458
459 fn add_connection_internal(&self, connection: ConnectionSpec) -> Result<(), AgentError> {
460 let mut connections = self.connections.lock().unwrap();
461 if let Some(targets) = connections.get_mut(&connection.source) {
462 if targets
463 .iter()
464 .any(|(target, source_handle, target_handle)| {
465 *target == connection.target
466 && *source_handle == connection.source_handle
467 && *target_handle == connection.target_handle
468 })
469 {
470 return Err(AgentError::ConnectionAlreadyExists);
471 }
472 targets.push((
473 connection.target,
474 connection.source_handle,
475 connection.target_handle,
476 ));
477 } else {
478 connections.insert(
479 connection.source,
480 vec![(
481 connection.target,
482 connection.source_handle,
483 connection.target_handle,
484 )],
485 );
486 }
487 Ok(())
488 }
489
490 pub async fn add_agents_and_connections(
495 &self,
496 preset_id: &str,
497 agents: &Vec<AgentSpec>,
498 connections: &Vec<ConnectionSpec>,
499 ) -> Result<(Vec<AgentSpec>, Vec<ConnectionSpec>), AgentError> {
500 let (agents, connections) = update_ids(agents, connections);
501
502 let preset = self
503 .get_preset(preset_id)
504 .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
505 let mut preset = preset.lock().await;
506
507 for agent in &agents {
508 self.add_agent_internal(preset_id.to_string(), agent.clone())?;
509 preset.add_agent(agent.clone());
510 }
511
512 for connection in &connections {
513 self.add_connection_internal(connection.clone())?;
514 preset.add_connection(connection.clone());
515 }
516
517 Ok((agents, connections))
518 }
519
520 pub async fn remove_agent(&self, preset_id: &str, agent_id: &str) -> Result<(), AgentError> {
524 {
525 let preset = self
526 .get_preset(preset_id)
527 .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
528 let mut preset = preset.lock().await;
529 preset.remove_agent(agent_id);
530 }
531 if let Err(e) = self.remove_agent_internal(agent_id).await {
532 return Err(e);
533 }
534 Ok(())
535 }
536
537 async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
538 self.stop_agent(agent_id).await?;
539
540 {
542 let mut connections = self.connections.lock().unwrap();
543 let mut sources_to_remove = Vec::new();
544 for (source, targets) in connections.iter_mut() {
545 targets.retain(|(target, _, _)| target != agent_id);
546 if targets.is_empty() {
547 sources_to_remove.push(source.clone());
548 }
549 }
550 for source in sources_to_remove {
551 connections.swap_remove(&source);
552 }
553 connections.swap_remove(agent_id);
554 }
555
556 {
558 let mut agents = self.agents.lock().unwrap();
559 agents.swap_remove(agent_id);
560 }
561
562 Ok(())
563 }
564
565 pub async fn remove_connection(
567 &self,
568 preset_id: &str,
569 connection: &ConnectionSpec,
570 ) -> Result<(), AgentError> {
571 let preset = self
572 .get_preset(preset_id)
573 .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
574 let mut preset = preset.lock().await;
575 let Some(connection) = preset.remove_connection(connection) else {
576 return Err(AgentError::ConnectionNotFound(format!(
577 "{}:{}->{}:{}",
578 connection.source,
579 connection.source_handle,
580 connection.target,
581 connection.target_handle
582 )));
583 };
584 self.remove_connection_internal(&connection);
585 Ok(())
586 }
587
588 fn remove_connection_internal(&self, connection: &ConnectionSpec) {
589 let mut connections = self.connections.lock().unwrap();
590 if let Some(targets) = connections.get_mut(&connection.source) {
591 targets.retain(|(target, source_handle, target_handle)| {
592 *target != connection.target
593 || *source_handle != connection.source_handle
594 || *target_handle != connection.target_handle
595 });
596 if targets.is_empty() {
597 connections.swap_remove(&connection.source);
598 }
599 }
600 }
601
602 pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
604 let agent = {
605 let agents = self.agents.lock().unwrap();
606 let Some(a) = agents.get(agent_id) else {
607 return Err(AgentError::AgentNotFound(agent_id.to_string()));
608 };
609 a.clone()
610 };
611 let def_name = {
612 let agent = agent.lock().await;
613 agent.def_name().to_string()
614 };
615 let uses_native_thread = {
616 let defs = self.defs.lock().unwrap();
617 let Some(def) = defs.get(&def_name) else {
618 return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
619 };
620 def.native_thread
621 };
622 let agent_status = {
623 let agent = agent.lock().await;
625 agent.status().clone()
626 };
627 if agent_status == AgentStatus::Init {
628 log::info!("Starting agent {}", agent_id);
629
630 let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
631
632 {
633 let mut agent_txs = self.agent_txs.lock().unwrap();
634 agent_txs.insert(agent_id.to_string(), tx.clone());
635 };
636
637 let agent_clone = agent.clone();
638 let agent_id_clone = agent_id.to_string();
639
640 let agent_loop = async move {
641 {
642 let mut agent_guard = agent_clone.lock().await;
643 if let Err(e) = agent_guard.start().await {
644 log::error!("Failed to start agent {}: {}", agent_id_clone, e);
645 return;
646 }
647 }
648
649 while let Some(message) = rx.recv().await {
650 match message {
651 AgentMessage::Input { ctx, port, value } => {
652 agent_clone
653 .lock()
654 .await
655 .process(ctx, port, value)
656 .await
657 .unwrap_or_else(|e| {
658 log::error!("Process Error {}: {}", agent_id_clone, e);
659 });
660 }
661 AgentMessage::Config { key, value } => {
662 agent_clone
663 .lock()
664 .await
665 .set_config(key, value)
666 .unwrap_or_else(|e| {
667 log::error!("Config Error {}: {}", agent_id_clone, e);
668 });
669 }
670 AgentMessage::Configs { configs } => {
671 agent_clone
672 .lock()
673 .await
674 .set_configs(configs)
675 .unwrap_or_else(|e| {
676 log::error!("Configs Error {}: {}", agent_id_clone, e);
677 });
678 }
679 AgentMessage::Stop => {
680 rx.close();
681 break;
682 }
683 }
684 }
685 };
686
687 if uses_native_thread {
688 std::thread::spawn(move || {
689 let rt = tokio::runtime::Builder::new_current_thread()
690 .enable_all()
691 .build()
692 .unwrap();
693 rt.block_on(agent_loop);
694 });
695 } else {
696 tokio::spawn(agent_loop);
697 }
698 }
699 Ok(())
700 }
701
702 pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
704 {
705 let mut agent_txs = self.agent_txs.lock().unwrap();
707 if let Some(tx) = agent_txs.swap_remove(agent_id) {
708 if let Err(e) = tx.try_send(AgentMessage::Stop) {
709 log::warn!("Failed to send stop message to agent {}: {}", agent_id, e);
710 }
711 }
712 }
713
714 let agent = {
715 let agents = self.agents.lock().unwrap();
716 let Some(a) = agents.get(agent_id) else {
717 return Err(AgentError::AgentNotFound(agent_id.to_string()));
718 };
719 a.clone()
720 };
721 let mut agent_guard = agent.lock().await;
722 if *agent_guard.status() == AgentStatus::Start {
723 log::info!("Stopping agent {}", agent_id);
724 agent_guard.stop().await?;
725 }
726
727 Ok(())
728 }
729
730 pub async fn set_agent_configs(
732 &self,
733 agent_id: String,
734 configs: AgentConfigs,
735 ) -> Result<(), AgentError> {
736 let tx = {
737 let agent_txs = self.agent_txs.lock().unwrap();
738 agent_txs.get(&agent_id).cloned()
739 };
740
741 let Some(tx) = tx else {
742 let agent = {
744 let agents = self.agents.lock().unwrap();
745 let Some(a) = agents.get(&agent_id) else {
746 return Err(AgentError::AgentNotFound(agent_id.to_string()));
747 };
748 a.clone()
749 };
750 agent.lock().await.set_configs(configs.clone())?;
751 return Ok(());
752 };
753 let message = AgentMessage::Configs { configs };
754 tx.send(message).await.map_err(|_| {
755 AgentError::SendMessageFailed("Failed to send config message".to_string())
756 })?;
757 Ok(())
758 }
759
760 pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
762 let global_configs_map = self.global_configs_map.lock().unwrap();
763 global_configs_map.get(def_name).cloned()
764 }
765
766 pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
768 let mut global_configs_map = self.global_configs_map.lock().unwrap();
769
770 let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
771 global_configs_map.insert(def_name, configs);
772 return;
773 };
774
775 for (key, value) in configs {
776 existing_configs.set(key, value);
777 }
778 }
779
780 pub fn get_global_configs_map(&self) -> AgentConfigsMap {
782 let global_configs_map = self.global_configs_map.lock().unwrap();
783 global_configs_map.clone()
784 }
785
786 pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
788 for (agent_name, new_configs) in new_configs_map {
789 self.set_global_configs(agent_name, new_configs);
790 }
791 }
792
793 pub(crate) async fn agent_input(
795 &self,
796 agent_id: String,
797 ctx: AgentContext,
798 port: String,
799 value: AgentValue,
800 ) -> Result<(), AgentError> {
801 let message = if port.starts_with("config:") {
802 let config_key = port[7..].to_string();
803 AgentMessage::Config {
804 key: config_key,
805 value,
806 }
807 } else {
808 AgentMessage::Input {
809 ctx,
810 port: port.clone(),
811 value,
812 }
813 };
814
815 let tx = {
816 let agent_txs = self.agent_txs.lock().unwrap();
817 agent_txs.get(&agent_id).cloned()
818 };
819
820 let Some(tx) = tx else {
821 let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
823 let agents = self.agents.lock().unwrap();
824 let Some(a) = agents.get(&agent_id) else {
825 return Err(AgentError::AgentNotFound(agent_id.to_string()));
826 };
827 a.clone()
828 };
829 if let AgentMessage::Config { key, value } = message {
830 agent.lock().await.set_config(key, value)?;
831 }
832 return Ok(());
833 };
834 tx.send(message).await.map_err(|_| {
835 AgentError::SendMessageFailed("Failed to send input message".to_string())
836 })?;
837
838 self.emit_agent_input(agent_id.to_string(), port);
839
840 Ok(())
841 }
842
843 pub async fn send_agent_out(
845 &self,
846 agent_id: String,
847 ctx: AgentContext,
848 port: String,
849 value: AgentValue,
850 ) -> Result<(), AgentError> {
851 message::send_agent_out(self, agent_id, ctx, port, value).await
852 }
853
854 pub fn try_send_agent_out(
856 &self,
857 agent_id: String,
858 ctx: AgentContext,
859 port: String,
860 value: AgentValue,
861 ) -> Result<(), AgentError> {
862 message::try_send_agent_out(self, agent_id, ctx, port, value)
863 }
864
865 pub async fn write_board_value(
867 &self,
868 name: String,
869 value: AgentValue,
870 ) -> Result<(), AgentError> {
871 self.send_board_out(name, AgentContext::new(), value).await
872 }
873
874 pub async fn write_var_value(
876 &self,
877 preset_id: &str,
878 name: &str,
879 value: AgentValue,
880 ) -> Result<(), AgentError> {
881 let var_name = format!("%{}/{}", preset_id, name);
882 self.send_board_out(var_name, AgentContext::new(), value)
883 .await
884 }
885
886 pub(crate) async fn send_board_out(
887 &self,
888 name: String,
889 ctx: AgentContext,
890 value: AgentValue,
891 ) -> Result<(), AgentError> {
892 message::send_board_out(self, name, ctx, value).await
893 }
894
895 async fn spawn_message_loop(&self) -> Result<(), AgentError> {
896 let (tx, mut rx) = mpsc::channel(4096);
898 {
899 let mut tx_lock = self.tx.lock().unwrap();
900 *tx_lock = Some(tx);
901 }
902
903 let ma = self.clone();
905 tokio::spawn(async move {
906 while let Some(message) = rx.recv().await {
907 use AgentEventMessage::*;
908
909 match message {
910 AgentOut {
911 agent,
912 ctx,
913 port,
914 value,
915 } => {
916 message::agent_out(&ma, agent, ctx, port, value).await;
917 }
918 BoardOut { name, ctx, value } => {
919 message::board_out(&ma, name, ctx, value).await;
920 }
921 }
922 }
923 });
924
925 tokio::task::yield_now().await;
926
927 Ok(())
928 }
929
930 pub fn subscribe(&self) -> broadcast::Receiver<MAKEvent> {
932 self.observers.subscribe()
933 }
934
935 pub fn subscribe_to_event<F, T>(&self, mut filter_map: F) -> mpsc::UnboundedReceiver<T>
940 where
941 F: FnMut(MAKEvent) -> Option<T> + Send + 'static,
942 T: Send + 'static,
943 {
944 let (tx, rx) = mpsc::unbounded_channel();
945 let mut event_rx = self.subscribe();
946
947 tokio::spawn(async move {
948 loop {
949 match event_rx.recv().await {
950 Ok(event) => {
951 if let Some(mapped_event) = filter_map(event) {
952 if tx.send(mapped_event).is_err() {
953 break;
955 }
956 }
957 }
958 Err(RecvError::Lagged(n)) => {
959 log::warn!("Event subscriber lagged by {} events", n);
960 }
961 Err(RecvError::Closed) => {
962 break;
964 }
965 }
966 }
967 });
968 rx
969 }
970
971 pub(crate) fn emit_agent_config_updated(
972 &self,
973 agent_id: String,
974 key: String,
975 value: AgentValue,
976 ) {
977 self.notify_observers(MAKEvent::AgentConfigUpdated(agent_id, key, value));
978 }
979
980 pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
981 self.notify_observers(MAKEvent::AgentError(agent_id, message));
982 }
983
984 pub(crate) fn emit_agent_input(&self, agent_id: String, port: String) {
985 self.notify_observers(MAKEvent::AgentIn(agent_id, port));
986 }
987
988 pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
989 self.notify_observers(MAKEvent::AgentSpecUpdated(agent_id));
990 }
991
992 pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
993 self.notify_observers(MAKEvent::Board(name, value));
998 }
999
1000 fn notify_observers(&self, event: MAKEvent) {
1001 let _ = self.observers.send(event);
1002 }
1003}
1004
1005#[derive(Clone, Debug)]
1006pub enum MAKEvent {
1007 AgentConfigUpdated(String, String, AgentValue), AgentError(String, String), AgentIn(String, String), AgentSpecUpdated(String), Board(String, AgentValue), }