// `Arc` and `Mutex` are used unconditionally throughout this file, so this import must not be
// gated behind the `file` feature.
use std::sync::{Arc, Mutex};
3
4use serde_json::Value;
5use tokio::sync::{Mutex as AsyncMutex, broadcast, broadcast::error::RecvError, mpsc};
6
7use crate::FnvIndexMap;
8use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
9use crate::config::{AgentConfigs, AgentConfigsMap};
10use crate::context::AgentContext;
11use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
12use crate::error::AgentError;
13use crate::id::{new_id, update_ids};
14use crate::message::{self, AgentEventMessage};
15use crate::preset::{Preset, PresetInfo};
16use crate::registry;
17use crate::spec::{AgentSpec, ConnectionSpec, PresetSpec};
18use crate::value::AgentValue;
19
/// Capacity of the per-agent `mpsc` channel created in `start_agent`.
const MESSAGE_LIMIT: usize = 1024;
/// Capacity of the `broadcast` channel that delivers `ModularAgentEvent`s to observers.
const EVENT_CHANNEL_CAPACITY: usize = 256;
22
/// Central registry and message hub for agents, presets and their connections.
///
/// Every field is behind an `Arc` (or is a cloneable channel handle), so cloning a
/// `ModularAgent` yields another handle to the same shared state.
#[derive(Clone)]
pub struct ModularAgent {
    /// Agent instances keyed by agent id.
    pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,

    /// Message senders of started agents keyed by agent id; inserted by `start_agent` and
    /// removed by `stop_agent`.
    pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, mpsc::Sender<AgentMessage>>>>,

    /// Not touched in this file; presumably maps an external input name to the ids of the
    /// agents listening on it — TODO confirm against the `message` module.
    pub(crate) external_input_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,

    /// Not touched in this file; presumably the values associated with external channel
    /// names — TODO confirm against the `message` module.
    pub(crate) external_values: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,

    /// Routing table: source agent id -> list of `(target agent id, source handle, target handle)`.
    pub(crate) connections: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,

    /// Registered agent definitions, keyed by definition name.
    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,

    /// Presets keyed by preset id.
    pub(crate) presets: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Preset>>>>>,

    /// Global configs keyed by agent definition name.
    pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,

    /// Sender feeding the event message loop; `None` until `ready` is called and again after
    /// `quit`.
    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,

    /// Broadcast sender used to publish `ModularAgentEvent`s to subscribers.
    pub(crate) observers: broadcast::Sender<ModularAgentEvent>,
}
94
95impl ModularAgent {
96 pub fn new() -> Self {
101 let (tx, _rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
102 Self {
103 agents: Default::default(),
104 agent_txs: Default::default(),
105 external_input_agents: Default::default(),
106 external_values: Default::default(),
107 connections: Default::default(),
108 defs: Default::default(),
109 presets: Default::default(),
110 global_configs_map: Default::default(),
111 tx: Arc::new(Mutex::new(None)),
112 observers: tx,
113 }
114 }
115
116 pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
117 self.tx
118 .lock()
119 .unwrap()
120 .clone()
121 .ok_or(AgentError::TxNotInitialized)
122 }
123
124 pub fn init() -> Result<Self, AgentError> {
137 let ma = Self::new();
138 ma.register_agents();
139 Ok(ma)
140 }
141
142 fn register_agents(&self) {
143 registry::register_inventory_agents(self);
144 }
145
146 pub async fn ready(&self) -> Result<(), AgentError> {
163 self.spawn_message_loop().await?;
164 Ok(())
165 }
166
167 pub fn quit(&self) {
184 let mut tx_lock = self.tx.lock().unwrap();
185 *tx_lock = None;
186 }
187
188 pub fn new_preset(&self) -> Result<String, AgentError> {
195 let spec = PresetSpec::default();
196 let id = self.add_preset(spec)?;
197 Ok(id)
198 }
199
200 pub fn new_preset_with_name(&self, name: String) -> Result<String, AgentError> {
204 let spec = PresetSpec::default();
205 let id = self.add_preset_with_name(spec, name)?;
206 Ok(id)
207 }
208
209 pub fn get_preset(&self, id: &str) -> Option<Arc<AsyncMutex<Preset>>> {
213 let presets = self.presets.lock().unwrap();
214 presets.get(id).cloned()
215 }
216
217 pub fn add_preset(&self, spec: PresetSpec) -> Result<String, AgentError> {
222 self.add_preset_raw(spec, None)
223 }
224
225 pub fn add_preset_with_name(
229 &self,
230 spec: PresetSpec,
231 name: String,
232 ) -> Result<String, AgentError> {
233 self.add_preset_raw(spec, Some(name))
234 }
235
236 fn add_preset_raw(&self, spec: PresetSpec, name: Option<String>) -> Result<String, AgentError> {
237 let mut preset = Preset::new(spec);
238 if let Some(name) = name {
239 preset.set_name(name);
240 }
241 let id = preset.id().to_string();
242
243 for agent in &preset.spec().agents {
245 if let Err(e) = self.add_agent_internal(id.clone(), agent.clone()) {
246 log::error!("Failed to add_agent {}: {}", agent.id, e);
247 }
248 }
249
250 for connection in &preset.spec().connections {
252 self.add_connection_internal(connection.clone())
253 .unwrap_or_else(|e| {
254 log::error!("Failed to add_connection {}: {}", connection.source, e);
255 });
256 }
257
258 let mut presets = self.presets.lock().unwrap();
260 if presets.contains_key(&id) {
261 return Err(AgentError::DuplicateId(id.into()));
262 }
263 presets.insert(id.to_string(), Arc::new(AsyncMutex::new(preset)));
264
265 Ok(id)
266 }
267
268 pub async fn rename_preset(&self, id: &str, new_name: String) -> Result<(), AgentError> {
270 let preset = self
271 .get_preset(id)
272 .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
273 let mut preset = preset.lock().await;
274 preset.set_name(new_name);
275 Ok(())
276 }
277
278 pub async fn remove_preset(&self, id: &str) -> Result<(), AgentError> {
282 let preset = self
283 .get_preset(id)
284 .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
285
286 let mut preset = preset.lock().await;
287 preset.stop(self).await.unwrap_or_else(|e| {
288 log::error!("Failed to stop preset {}: {}", id, e);
289 });
290
291 for agent in &preset.spec().agents {
293 self.remove_agent_internal(&agent.id)
294 .await
295 .unwrap_or_else(|e| {
296 log::error!("Failed to remove_agent {}: {}", agent.id, e);
297 });
298 }
299 for connection in &preset.spec().connections {
300 self.remove_connection_internal(connection);
301 }
302
303 drop(preset);
305
306 {
308 let mut presets = self.presets.lock().unwrap();
309 presets.swap_remove(id);
310 }
311
312 Ok(())
313 }
314
315 pub async fn start_preset(&self, id: &str) -> Result<(), AgentError> {
320 let preset = self
321 .get_preset(id)
322 .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
323 let mut preset = preset.lock().await;
324 preset.start(self).await?;
325
326 Ok(())
327 }
328
329 pub async fn stop_preset(&self, id: &str) -> Result<(), AgentError> {
334 let preset = self
335 .get_preset(id)
336 .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
337 let mut preset = preset.lock().await;
338 preset.stop(self).await?;
339
340 Ok(())
341 }
342
343 #[cfg(feature = "file")]
353 pub async fn open_preset_from_file(
354 &self,
355 path: &str,
356 name: Option<String>,
357 ) -> Result<String, AgentError> {
358 let json_str =
359 std::fs::read_to_string(path).map_err(|e| AgentError::IoError(e.to_string()))?;
360 let spec = PresetSpec::from_json(&json_str)?;
361 let id = self.add_preset_raw(spec, name)?;
362 Ok(id)
363 }
364
365 #[cfg(feature = "file")]
370 pub async fn save_preset(&self, id: &str, path: &str) -> Result<(), AgentError> {
371 let Some(preset_spec) = self.get_preset_spec(id).await else {
372 return Err(AgentError::PresetNotFound(id.to_string()));
373 };
374 let json_str = preset_spec.to_json()?;
375 std::fs::write(path, json_str).map_err(|e| AgentError::IoError(e.to_string()))?;
376 Ok(())
377 }
378
379 pub async fn get_preset_spec(&self, id: &str) -> Option<PresetSpec> {
383 let Some(preset) = self.get_preset(id) else {
384 return None;
385 };
386 let mut preset_spec = {
387 let preset = preset.lock().await;
388 preset.spec().clone()
389 };
390
391 let mut agent_specs = Vec::new();
393 for agent in &preset_spec.agents {
394 if let Some(spec) = self.get_agent_spec(&agent.id).await {
395 agent_specs.push(spec);
396 }
397 }
398 preset_spec.agents = agent_specs;
399
400 Some(preset_spec)
403 }
404
405 pub async fn update_preset_spec(&self, id: &str, value: &Value) -> Result<(), AgentError> {
407 let preset = self
408 .get_preset(id)
409 .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
410 let mut preset = preset.lock().await;
411 preset.update_spec(value)?;
412 Ok(())
413 }
414
415 pub async fn get_preset_info(&self, id: &str) -> Option<PresetInfo> {
419 let Some(preset) = self.get_preset(id) else {
420 return None;
421 };
422 Some(PresetInfo::from(&*preset.lock().await))
423 }
424
425 pub async fn get_preset_infos(&self) -> Vec<PresetInfo> {
427 let presets = {
428 let presets = self.presets.lock().unwrap();
429 presets.values().cloned().collect::<Vec<_>>()
430 };
431 let mut preset_infos = Vec::new();
432 for preset in presets {
433 let preset_guard = preset.lock().await;
434 preset_infos.push(PresetInfo::from(&*preset_guard));
435 }
436 preset_infos
437 }
438
439 pub fn register_agent_definiton(&self, def: AgentDefinition) {
448 let def_name = def.name.clone();
449 let def_global_configs = def.global_configs.clone();
450
451 let mut defs = self.defs.lock().unwrap();
452 defs.insert(def.name.clone(), def);
453
454 if let Some(def_global_configs) = def_global_configs {
456 let mut new_configs = AgentConfigs::default();
457 for (key, config_entry) in def_global_configs.iter() {
458 new_configs.set(key.clone(), config_entry.value.clone());
459 }
460 self.set_global_configs(def_name, new_configs);
461 }
462 }
463
464 pub fn get_agent_definitions(&self) -> AgentDefinitions {
468 let defs = self.defs.lock().unwrap();
469 defs.clone()
470 }
471
472 pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
476 let defs = self.defs.lock().unwrap();
477 defs.get(def_name).cloned()
478 }
479
480 pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
482 let defs = self.defs.lock().unwrap();
483 let Some(def) = defs.get(def_name) else {
484 return None;
485 };
486 def.configs.clone()
487 }
488
489 pub async fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
491 let agent = {
492 let agents = self.agents.lock().unwrap();
493 let Some(agent) = agents.get(agent_id) else {
494 return None;
495 };
496 agent.clone()
497 };
498 let agent = agent.lock().await;
499 Some(agent.spec().clone())
500 }
501
502 pub async fn update_agent_spec(&self, agent_id: &str, value: &Value) -> Result<(), AgentError> {
504 let agent = {
505 let agents = self.agents.lock().unwrap();
506 let Some(agent) = agents.get(agent_id) else {
507 return Err(AgentError::AgentNotFound(agent_id.to_string()));
508 };
509 agent.clone()
510 };
511 let mut agent = agent.lock().await;
512 agent.update_spec(value)?;
513 Ok(())
514 }
515
516 pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
518 let def = self
519 .get_agent_definition(def_name)
520 .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
521 Ok(def.to_spec())
522 }
523
524 pub async fn add_agent(
530 &self,
531 preset_id: String,
532 mut spec: AgentSpec,
533 ) -> Result<String, AgentError> {
534 let preset = self
535 .get_preset(&preset_id)
536 .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
537
538 let id = new_id();
539 spec.id = id.clone();
540 self.add_agent_internal(preset_id, spec.clone())?;
541
542 let mut preset = preset.lock().await;
543 preset.add_agent(spec.clone());
544
545 Ok(id)
546 }
547
548 fn add_agent_internal(&self, preset_id: String, spec: AgentSpec) -> Result<(), AgentError> {
549 let mut agents = self.agents.lock().unwrap();
550 if agents.contains_key(&spec.id) {
551 return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
552 }
553 let spec_id = spec.id.clone();
554 let mut agent = agent_new(self.clone(), spec_id.clone(), spec)?;
555 agent.set_preset_id(preset_id);
556 agents.insert(spec_id, Arc::new(AsyncMutex::new(agent)));
557 Ok(())
558 }
559
560 pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
562 let agents = self.agents.lock().unwrap();
563 agents.get(agent_id).cloned()
564 }
565
566 pub async fn add_connection(
571 &self,
572 preset_id: &str,
573 connection: ConnectionSpec,
574 ) -> Result<(), AgentError> {
575 {
577 let agents = self.agents.lock().unwrap();
578 if !agents.contains_key(&connection.source) {
579 return Err(AgentError::AgentNotFound(connection.source.to_string()));
580 }
581 if !agents.contains_key(&connection.target) {
582 return Err(AgentError::AgentNotFound(connection.target.to_string()));
583 }
584 }
585
586 if connection.source_handle.is_empty() {
588 return Err(AgentError::EmptySourceHandle);
589 }
590 if connection.target_handle.is_empty() {
591 return Err(AgentError::EmptyTargetHandle);
592 }
593
594 let preset = self
595 .get_preset(preset_id)
596 .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
597 let mut preset = preset.lock().await;
598 preset.add_connection(connection.clone());
599 self.add_connection_internal(connection)?;
600 Ok(())
601 }
602
603 fn add_connection_internal(&self, connection: ConnectionSpec) -> Result<(), AgentError> {
604 let mut connections = self.connections.lock().unwrap();
605 if let Some(targets) = connections.get_mut(&connection.source) {
606 if targets
607 .iter()
608 .any(|(target, source_handle, target_handle)| {
609 *target == connection.target
610 && *source_handle == connection.source_handle
611 && *target_handle == connection.target_handle
612 })
613 {
614 return Err(AgentError::ConnectionAlreadyExists);
615 }
616 targets.push((
617 connection.target,
618 connection.source_handle,
619 connection.target_handle,
620 ));
621 } else {
622 connections.insert(
623 connection.source,
624 vec![(
625 connection.target,
626 connection.source_handle,
627 connection.target_handle,
628 )],
629 );
630 }
631 Ok(())
632 }
633
634 pub async fn add_agents_and_connections(
639 &self,
640 preset_id: &str,
641 agents: &Vec<AgentSpec>,
642 connections: &Vec<ConnectionSpec>,
643 ) -> Result<(Vec<AgentSpec>, Vec<ConnectionSpec>), AgentError> {
644 let (agents, connections) = update_ids(agents, connections);
645
646 let preset = self
647 .get_preset(preset_id)
648 .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
649 let mut preset = preset.lock().await;
650
651 for agent in &agents {
652 self.add_agent_internal(preset_id.to_string(), agent.clone())?;
653 preset.add_agent(agent.clone());
654 }
655
656 for connection in &connections {
657 self.add_connection_internal(connection.clone())?;
658 preset.add_connection(connection.clone());
659 }
660
661 Ok((agents, connections))
662 }
663
664 pub async fn remove_agent(&self, preset_id: &str, agent_id: &str) -> Result<(), AgentError> {
668 {
669 let preset = self
670 .get_preset(preset_id)
671 .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
672 let mut preset = preset.lock().await;
673 preset.remove_agent(agent_id);
674 }
675 if let Err(e) = self.remove_agent_internal(agent_id).await {
676 return Err(e);
677 }
678 Ok(())
679 }
680
681 async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
682 self.stop_agent(agent_id).await?;
683
684 {
686 let mut connections = self.connections.lock().unwrap();
687 let mut sources_to_remove = Vec::new();
688 for (source, targets) in connections.iter_mut() {
689 targets.retain(|(target, _, _)| target != agent_id);
690 if targets.is_empty() {
691 sources_to_remove.push(source.clone());
692 }
693 }
694 for source in sources_to_remove {
695 connections.swap_remove(&source);
696 }
697 connections.swap_remove(agent_id);
698 }
699
700 {
702 let mut agents = self.agents.lock().unwrap();
703 agents.swap_remove(agent_id);
704 }
705
706 Ok(())
707 }
708
709 pub async fn remove_connection(
711 &self,
712 preset_id: &str,
713 connection: &ConnectionSpec,
714 ) -> Result<(), AgentError> {
715 let preset = self
716 .get_preset(preset_id)
717 .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
718 let mut preset = preset.lock().await;
719 let Some(connection) = preset.remove_connection(connection) else {
720 return Err(AgentError::ConnectionNotFound(format!(
721 "{}:{}->{}:{}",
722 connection.source,
723 connection.source_handle,
724 connection.target,
725 connection.target_handle
726 )));
727 };
728 self.remove_connection_internal(&connection);
729 Ok(())
730 }
731
732 fn remove_connection_internal(&self, connection: &ConnectionSpec) {
733 let mut connections = self.connections.lock().unwrap();
734 if let Some(targets) = connections.get_mut(&connection.source) {
735 targets.retain(|(target, source_handle, target_handle)| {
736 *target != connection.target
737 || *source_handle != connection.source_handle
738 || *target_handle != connection.target_handle
739 });
740 if targets.is_empty() {
741 connections.swap_remove(&connection.source);
742 }
743 }
744 }
745
746 pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
755 let agent = {
756 let agents = self.agents.lock().unwrap();
757 let Some(a) = agents.get(agent_id) else {
758 return Err(AgentError::AgentNotFound(agent_id.to_string()));
759 };
760 a.clone()
761 };
762 let def_name = {
763 let agent = agent.lock().await;
764 agent.def_name().to_string()
765 };
766 let uses_native_thread = {
767 let defs = self.defs.lock().unwrap();
768 let Some(def) = defs.get(&def_name) else {
769 return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
770 };
771 def.native_thread
772 };
773 let agent_status = {
774 let agent = agent.lock().await;
776 agent.status().clone()
777 };
778 if agent_status == AgentStatus::Init {
779 log::info!("Starting agent {}", agent_id);
780
781 let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
782
783 {
784 let mut agent_txs = self.agent_txs.lock().unwrap();
785 agent_txs.insert(agent_id.to_string(), tx.clone());
786 };
787
788 let agent_clone = agent.clone();
789 let agent_id_clone = agent_id.to_string();
790
791 let agent_loop = async move {
792 {
793 let mut agent_guard = agent_clone.lock().await;
794 if let Err(e) = agent_guard.start().await {
795 log::error!("Failed to start agent {}: {}", agent_id_clone, e);
796 return;
797 }
798 }
799
800 while let Some(message) = rx.recv().await {
801 match message {
802 AgentMessage::Input { ctx, port, value } => {
803 agent_clone
804 .lock()
805 .await
806 .process(ctx, port, value)
807 .await
808 .unwrap_or_else(|e| {
809 log::error!("Process Error {}: {}", agent_id_clone, e);
810 });
811 }
812 AgentMessage::Config { key, value } => {
813 agent_clone
814 .lock()
815 .await
816 .set_config(key, value)
817 .unwrap_or_else(|e| {
818 log::error!("Config Error {}: {}", agent_id_clone, e);
819 });
820 }
821 AgentMessage::Configs { configs } => {
822 agent_clone
823 .lock()
824 .await
825 .set_configs(configs)
826 .unwrap_or_else(|e| {
827 log::error!("Configs Error {}: {}", agent_id_clone, e);
828 });
829 }
830 AgentMessage::Stop => {
831 rx.close();
832 break;
833 }
834 }
835 }
836 };
837
838 if uses_native_thread {
839 std::thread::spawn(move || {
840 let rt = tokio::runtime::Builder::new_current_thread()
841 .enable_all()
842 .build()
843 .unwrap();
844 rt.block_on(agent_loop);
845 });
846 } else {
847 tokio::spawn(agent_loop);
848 }
849 }
850 Ok(())
851 }
852
853 pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
858 {
859 let mut agent_txs = self.agent_txs.lock().unwrap();
861 if let Some(tx) = agent_txs.swap_remove(agent_id) {
862 if let Err(e) = tx.try_send(AgentMessage::Stop) {
863 log::warn!("Failed to send stop message to agent {}: {}", agent_id, e);
864 }
865 }
866 }
867
868 let agent = {
869 let agents = self.agents.lock().unwrap();
870 let Some(a) = agents.get(agent_id) else {
871 return Err(AgentError::AgentNotFound(agent_id.to_string()));
872 };
873 a.clone()
874 };
875 let mut agent_guard = agent.lock().await;
876 if *agent_guard.status() == AgentStatus::Start {
877 log::info!("Stopping agent {}", agent_id);
878 agent_guard.stop().await?;
879 }
880
881 Ok(())
882 }
883
884 pub async fn set_agent_configs(
886 &self,
887 agent_id: String,
888 configs: AgentConfigs,
889 ) -> Result<(), AgentError> {
890 let tx = {
891 let agent_txs = self.agent_txs.lock().unwrap();
892 agent_txs.get(&agent_id).cloned()
893 };
894
895 let Some(tx) = tx else {
896 let agent = {
898 let agents = self.agents.lock().unwrap();
899 let Some(a) = agents.get(&agent_id) else {
900 return Err(AgentError::AgentNotFound(agent_id.to_string()));
901 };
902 a.clone()
903 };
904 agent.lock().await.set_configs(configs.clone())?;
905 return Ok(());
906 };
907 let message = AgentMessage::Configs { configs };
908 tx.send(message).await.map_err(|_| {
909 AgentError::SendMessageFailed("Failed to send config message".to_string())
910 })?;
911 Ok(())
912 }
913
914 pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
916 let global_configs_map = self.global_configs_map.lock().unwrap();
917 global_configs_map.get(def_name).cloned()
918 }
919
920 pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
922 let mut global_configs_map = self.global_configs_map.lock().unwrap();
923
924 let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
925 global_configs_map.insert(def_name, configs);
926 return;
927 };
928
929 for (key, value) in configs {
930 existing_configs.set(key, value);
931 }
932 }
933
934 pub fn get_global_configs_map(&self) -> AgentConfigsMap {
936 let global_configs_map = self.global_configs_map.lock().unwrap();
937 global_configs_map.clone()
938 }
939
940 pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
942 for (agent_name, new_configs) in new_configs_map {
943 self.set_global_configs(agent_name, new_configs);
944 }
945 }
946
947 pub(crate) async fn agent_input(
949 &self,
950 agent_id: String,
951 ctx: AgentContext,
952 port: String,
953 value: AgentValue,
954 ) -> Result<(), AgentError> {
955 let message = if port.starts_with("config:") {
956 let config_key = port[7..].to_string();
957 AgentMessage::Config {
958 key: config_key,
959 value,
960 }
961 } else {
962 AgentMessage::Input {
963 ctx,
964 port: port.clone(),
965 value,
966 }
967 };
968
969 let tx = {
970 let agent_txs = self.agent_txs.lock().unwrap();
971 agent_txs.get(&agent_id).cloned()
972 };
973
974 let Some(tx) = tx else {
975 let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
977 let agents = self.agents.lock().unwrap();
978 let Some(a) = agents.get(&agent_id) else {
979 return Err(AgentError::AgentNotFound(agent_id.to_string()));
980 };
981 a.clone()
982 };
983 if let AgentMessage::Config { key, value } = message {
984 agent.lock().await.set_config(key, value)?;
985 }
986 return Ok(());
987 };
988 tx.send(message).await.map_err(|_| {
989 AgentError::SendMessageFailed("Failed to send input message".to_string())
990 })?;
991
992 self.emit_agent_input(agent_id.to_string(), port);
993
994 Ok(())
995 }
996
997 pub async fn send_agent_out(
999 &self,
1000 agent_id: String,
1001 ctx: AgentContext,
1002 port: String,
1003 value: AgentValue,
1004 ) -> Result<(), AgentError> {
1005 message::send_agent_out(self, agent_id, ctx, port, value).await
1006 }
1007
1008 pub fn try_send_agent_out(
1010 &self,
1011 agent_id: String,
1012 ctx: AgentContext,
1013 port: String,
1014 value: AgentValue,
1015 ) -> Result<(), AgentError> {
1016 message::try_send_agent_out(self, agent_id, ctx, port, value)
1017 }
1018
1019 pub async fn write_external_input(
1044 &self,
1045 name: String,
1046 value: AgentValue,
1047 ) -> Result<(), AgentError> {
1048 self.send_external_output(name, AgentContext::new(), value).await
1049 }
1050
1051 pub async fn write_local_input(
1053 &self,
1054 preset_id: &str,
1055 name: &str,
1056 value: AgentValue,
1057 ) -> Result<(), AgentError> {
1058 let channel_name = format!("%{}/{}", preset_id, name);
1059 self.send_external_output(channel_name, AgentContext::new(), value)
1060 .await
1061 }
1062
1063 pub(crate) async fn send_external_output(
1064 &self,
1065 name: String,
1066 ctx: AgentContext,
1067 value: AgentValue,
1068 ) -> Result<(), AgentError> {
1069 message::send_external_output(self, name, ctx, value).await
1070 }
1071
1072 async fn spawn_message_loop(&self) -> Result<(), AgentError> {
1073 let (tx, mut rx) = mpsc::channel(4096);
1075 {
1076 let mut tx_lock = self.tx.lock().unwrap();
1077 *tx_lock = Some(tx);
1078 }
1079
1080 let ma = self.clone();
1082 tokio::spawn(async move {
1083 while let Some(message) = rx.recv().await {
1084 use AgentEventMessage::*;
1085
1086 match message {
1087 AgentOut {
1088 agent,
1089 ctx,
1090 port,
1091 value,
1092 } => {
1093 message::agent_out(&ma, agent, ctx, port, value).await;
1094 }
1095 ExternalOutput { name, ctx, value } => {
1096 message::external_input(&ma, name, ctx, value).await;
1097 }
1098 }
1099 }
1100 });
1101
1102 tokio::task::yield_now().await;
1103
1104 Ok(())
1105 }
1106
1107 pub fn subscribe(&self) -> broadcast::Receiver<ModularAgentEvent> {
1114 self.observers.subscribe()
1115 }
1116
1117 pub fn subscribe_to_event<F, T>(&self, mut filter_map: F) -> mpsc::UnboundedReceiver<T>
1159 where
1160 F: FnMut(ModularAgentEvent) -> Option<T> + Send + 'static,
1161 T: Send + 'static,
1162 {
1163 let (tx, rx) = mpsc::unbounded_channel();
1164 let mut event_rx = self.subscribe();
1165
1166 tokio::spawn(async move {
1167 loop {
1168 match event_rx.recv().await {
1169 Ok(event) => {
1170 if let Some(mapped_event) = filter_map(event) {
1171 if tx.send(mapped_event).is_err() {
1172 break;
1174 }
1175 }
1176 }
1177 Err(RecvError::Lagged(n)) => {
1178 log::warn!("Event subscriber lagged by {} events", n);
1179 }
1180 Err(RecvError::Closed) => {
1181 break;
1183 }
1184 }
1185 }
1186 });
1187 rx
1188 }
1189
1190 pub(crate) fn emit_agent_config_updated(
1191 &self,
1192 agent_id: String,
1193 key: String,
1194 value: AgentValue,
1195 ) {
1196 self.notify_observers(ModularAgentEvent::AgentConfigUpdated(agent_id, key, value));
1197 }
1198
1199 pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
1200 self.notify_observers(ModularAgentEvent::AgentError(agent_id, message));
1201 }
1202
1203 pub(crate) fn emit_agent_input(&self, agent_id: String, port: String) {
1204 self.notify_observers(ModularAgentEvent::AgentIn(agent_id, port));
1205 }
1206
1207 pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
1208 self.notify_observers(ModularAgentEvent::AgentSpecUpdated(agent_id));
1209 }
1210
1211 pub(crate) fn emit_external_output(&self, name: String, value: AgentValue) {
1212 self.notify_observers(ModularAgentEvent::ExternalOutput(name, value));
1217 }
1218
1219 fn notify_observers(&self, event: ModularAgentEvent) {
1220 let _ = self.observers.send(event);
1221 }
1222}
1223
/// Events published to observers obtained through `ModularAgent::subscribe`.
#[derive(Clone, Debug)]
pub enum ModularAgentEvent {
    /// A config value of an agent was updated: `(agent id, config key, new value)`.
    AgentConfigUpdated(String, String, AgentValue),

    /// An agent reported an error: `(agent id, error message)`.
    AgentError(String, String),

    /// A message was delivered to an agent: `(agent id, port)`.
    AgentIn(String, String),

    /// The spec of an agent was updated: `(agent id)`.
    AgentSpecUpdated(String),

    /// A value was emitted on an external output: `(name, value)`.
    ExternalOutput(String, AgentValue),
}