1#[cfg(feature = "monitoring")]
2pub use mofa_monitoring::*;
3
4pub mod agent;
21pub mod builder;
22pub mod config;
23pub mod interrupt;
24pub mod runner;
25
26#[cfg(feature = "dora")]
28pub mod dora_adapter;
29
30pub use interrupt::*;
45
46pub use mofa_kernel::agent::MoFAAgent;
48
49pub use mofa_kernel::agent::AgentMetadata;
50pub use mofa_kernel::core::AgentConfig;
52pub use mofa_kernel::message::{AgentEvent, AgentMessage};
53
54pub use mofa_kernel::plugin::AgentPlugin;
56
57use mofa_plugins::AgentPlugin as PluginAgent;
62
63use std::collections::HashMap;
65use std::time::Duration;
66
67#[cfg(feature = "dora")]
69use crate::dora_adapter::{
70 ChannelConfig, DataflowConfig, DoraAgentNode, DoraChannel, DoraDataflow, DoraError,
71 DoraNodeConfig, DoraResult, MessageEnvelope,
72};
73#[cfg(feature = "dora")]
74use std::sync::Arc;
75#[cfg(feature = "dora")]
76use tokio::sync::RwLock;
77#[cfg(feature = "dora")]
78use tracing::{debug, info};
79
80use mofa_kernel::message::StreamType;
82
83pub struct AgentBuilder {
85 agent_id: String,
86 name: String,
87 capabilities: Vec<String>,
88 dependencies: Vec<String>,
89 plugins: Vec<Box<dyn PluginAgent>>,
90 node_config: HashMap<String, String>,
91 inputs: Vec<String>,
92 outputs: Vec<String>,
93 max_concurrent_tasks: usize,
94 default_timeout: Duration,
95}
96pub use crate::runner::run_agents;
101
102impl AgentBuilder {
103 pub fn new(agent_id: &str, name: &str) -> Self {
105 Self {
106 agent_id: agent_id.to_string(),
107 name: name.to_string(),
108 capabilities: Vec::new(),
109 dependencies: Vec::new(),
110 plugins: Vec::new(),
111 node_config: HashMap::new(),
112 inputs: vec!["task_input".to_string()],
113 outputs: vec!["task_output".to_string()],
114 max_concurrent_tasks: 10,
115 default_timeout: Duration::from_secs(30),
116 }
117 }
118
119 pub fn with_capability(mut self, capability: &str) -> Self {
121 self.capabilities.push(capability.to_string());
122 self
123 }
124
125 pub fn with_capabilities(mut self, capabilities: Vec<&str>) -> Self {
127 for cap in capabilities {
128 self.capabilities.push(cap.to_string());
129 }
130 self
131 }
132
133 pub fn with_dependency(mut self, dependency: &str) -> Self {
135 self.dependencies.push(dependency.to_string());
136 self
137 }
138
139 pub fn with_plugin(mut self, plugin: Box<dyn AgentPlugin>) -> Self {
141 self.plugins.push(plugin);
142 self
143 }
144
145 pub fn with_input(mut self, input: &str) -> Self {
147 self.inputs.push(input.to_string());
148 self
149 }
150
151 pub fn with_output(mut self, output: &str) -> Self {
153 self.outputs.push(output.to_string());
154 self
155 }
156
157 pub fn with_max_concurrent_tasks(mut self, max: usize) -> Self {
159 self.max_concurrent_tasks = max;
160 self
161 }
162
163 pub fn with_timeout(mut self, timeout: Duration) -> Self {
165 self.default_timeout = timeout;
166 self
167 }
168
169 pub fn with_config(mut self, key: &str, value: &str) -> Self {
171 self.node_config.insert(key.to_string(), value.to_string());
172 self
173 }
174
175 pub fn build_config(&self) -> AgentConfig {
177 AgentConfig {
178 agent_id: self.agent_id.clone(),
179 name: self.name.clone(),
180 node_config: self.node_config.clone(),
181 }
182 }
183
184 pub fn build_metadata(&self) -> AgentMetadata {
186 use mofa_kernel::agent::AgentCapabilities;
187 use mofa_kernel::agent::AgentState;
188
189 let agent_capabilities = AgentCapabilities::builder()
191 .tags(self.capabilities.clone())
192 .build();
193
194 AgentMetadata {
195 id: self.agent_id.clone(),
196 name: self.name.clone(),
197 description: None,
198 version: None,
199 capabilities: agent_capabilities,
200 state: AgentState::Created,
201 }
202 }
203
204 #[cfg(feature = "dora")]
206 pub fn build_node_config(&self) -> DoraNodeConfig {
207 DoraNodeConfig {
208 node_id: self.agent_id.clone(),
209 name: self.name.clone(),
210 inputs: self.inputs.clone(),
211 outputs: self.outputs.clone(),
212 event_buffer_size: self.max_concurrent_tasks * 10,
213 default_timeout: self.default_timeout,
214 custom_config: self.node_config.clone(),
215 }
216 }
217
218 #[cfg(feature = "dora")]
220 pub async fn with_agent<A: MoFAAgent>(self, agent: A) -> DoraResult<AgentRuntime<A>> {
221 let node_config = self.build_node_config();
222 let metadata = self.build_metadata();
223 let config = self.build_config();
224
225 let node = DoraAgentNode::new(node_config);
226 let interrupt = node.interrupt().clone();
227
228 Ok(AgentRuntime {
229 agent,
230 node: Arc::new(node),
231 metadata,
232 config,
233 interrupt,
234 plugins: self.plugins,
235 })
236 }
237
238 #[cfg(feature = "dora")]
240 pub async fn build_and_start<A: MoFAAgent>(self, agent: A) -> DoraResult<AgentRuntime<A>> {
241 let runtime: AgentRuntime<A> = self.with_agent(agent).await?;
242 runtime.start().await?;
243 Ok(runtime)
244 }
245
246 #[cfg(not(feature = "dora"))]
248 pub async fn with_agent<A: MoFAAgent>(self, agent: A) -> anyhow::Result<SimpleAgentRuntime<A>> {
249 let metadata = self.build_metadata();
250 let config = self.build_config();
251 let interrupt = AgentInterrupt::new();
252
253 let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
255
256 Ok(SimpleAgentRuntime {
257 agent,
258 metadata,
259 config,
260 interrupt,
261 plugins: self.plugins,
262 inputs: self.inputs,
263 outputs: self.outputs,
264 max_concurrent_tasks: self.max_concurrent_tasks,
265 default_timeout: self.default_timeout,
266 event_tx,
267 event_rx: Some(event_rx),
268 })
269 }
270
271 #[cfg(not(feature = "dora"))]
273 pub async fn build_and_start<A: MoFAAgent>(
274 self,
275 agent: A,
276 ) -> anyhow::Result<SimpleAgentRuntime<A>> {
277 let mut runtime = self.with_agent(agent).await?;
278 runtime.start().await?;
279 Ok(runtime)
280 }
281}
282
/// Runtime that drives a [`MoFAAgent`] on top of a dora agent node.
#[cfg(feature = "dora")]
pub struct AgentRuntime<A: MoFAAgent> {
    /// The wrapped agent.
    agent: A,
    /// Shared handle to the node that supplies events and carries messages.
    node: Arc<DoraAgentNode>,
    /// Metadata produced by the builder.
    metadata: AgentMetadata,
    /// Configuration produced by the builder.
    config: AgentConfig,
    /// Interrupt handle; the builder clones it from the node.
    interrupt: AgentInterrupt,
    /// Plugins initialised by `init_plugins`.
    plugins: Vec<Box<dyn AgentPlugin>>,
}
293
#[cfg(feature = "dora")]
impl<A: MoFAAgent> AgentRuntime<A> {
    /// Returns a shared reference to the wrapped agent.
    pub fn agent(&self) -> &A {
        &self.agent
    }

    /// Returns an exclusive reference to the wrapped agent.
    pub fn agent_mut(&mut self) -> &mut A {
        &mut self.agent
    }

    /// Returns the shared handle to the underlying node.
    pub fn node(&self) -> &Arc<DoraAgentNode> {
        &self.node
    }

    /// Returns the agent metadata.
    pub fn metadata(&self) -> &AgentMetadata {
        &self.metadata
    }

    /// Returns the agent configuration.
    pub fn config(&self) -> &AgentConfig {
        &self.config
    }

    /// Returns the interrupt handle.
    pub fn interrupt(&self) -> &AgentInterrupt {
        &self.interrupt
    }

    /// Initialises every plugin in registration order.
    ///
    /// # Errors
    ///
    /// Stops at the first failing plugin and reports it as
    /// `DoraError::OperatorError` carrying the plugin error's text.
    pub async fn init_plugins(&mut self) -> DoraResult<()> {
        for plugin in &mut self.plugins {
            plugin
                .init_plugin()
                .await
                .map_err(|e| DoraError::OperatorError(e.to_string()))?;
        }
        Ok(())
    }

    /// Initialises the underlying node and logs that the runtime started.
    ///
    /// # Errors
    ///
    /// Propagates the node's initialisation error.
    pub async fn start(&self) -> DoraResult<()> {
        self.node.init().await?;
        info!("AgentRuntime {} started", self.metadata.id);
        Ok(())
    }

    /// Initialises the agent and its plugins, then processes node events until a
    /// `Shutdown` event arrives, and finally shuts the agent down.
    ///
    /// `TaskReceived` and `Custom` events are forwarded to the agent as text
    /// input; any other event is forwarded as its `Debug` representation.
    ///
    /// # Errors
    ///
    /// Agent failures (initialise, execute, shutdown) are reported as
    /// `DoraError::Internal`. A failing `execute` ends the loop immediately,
    /// without calling the agent's `shutdown`.
    pub async fn run_event_loop(&mut self) -> DoraResult<()> {
        use mofa_kernel::agent::types::AgentInput;

        let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
        self.agent
            .initialize(&context)
            .await
            .map_err(|e| DoraError::Internal(e.to_string()))?;

        self.init_plugins().await?;

        let event_loop = self.node.create_event_loop();

        loop {
            // An interrupt is only logged and cleared here; it does not end the loop.
            if event_loop.should_interrupt() {
                debug!("Interrupt signal received for {}", self.metadata.id);
                self.interrupt.reset();
            }

            match event_loop.next_event().await {
                Some(AgentEvent::Shutdown) => {
                    info!("Received shutdown event");
                    break;
                }
                Some(event) => {
                    if self.interrupt.check() {
                        debug!("Interrupt signal received for {}", self.metadata.id);
                        self.interrupt.reset();
                    }

                    // The event is consumed here, so no clone is needed; the
                    // fallback arm binds it to produce the Debug text.
                    let input = match event {
                        AgentEvent::TaskReceived(task) => AgentInput::text(task.content),
                        AgentEvent::Custom(data, _) => AgentInput::text(data),
                        other => AgentInput::text(format!("{:?}", other)),
                    };

                    self.agent
                        .execute(input, &context)
                        .await
                        .map_err(|e| DoraError::Internal(e.to_string()))?;
                }
                None => {
                    // No event available: back off briefly before polling again.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
            }
        }

        self.agent
            .shutdown()
            .await
            .map_err(|e| DoraError::Internal(e.to_string()))?;

        Ok(())
    }

    /// Triggers the interrupt, stops the underlying node and logs the stop.
    ///
    /// # Errors
    ///
    /// Propagates the node's stop error.
    pub async fn stop(&self) -> DoraResult<()> {
        self.interrupt.trigger();
        self.node.stop().await?;
        info!("AgentRuntime {} stopped", self.metadata.id);
        Ok(())
    }

    /// Sends `message` on the node output named `output_id`.
    pub async fn send_output(&self, output_id: &str, message: &AgentMessage) -> DoraResult<()> {
        self.node.send_message(output_id, message).await
    }

    /// Injects `event` into the underlying node.
    pub async fn inject_event(&self, event: AgentEvent) -> DoraResult<()> {
        self.node.inject_event(event).await
    }
}
427
428#[cfg(not(feature = "dora"))]
434pub struct SimpleAgentRuntime<A: MoFAAgent> {
435 agent: A,
436 metadata: AgentMetadata,
437 config: AgentConfig,
438 interrupt: AgentInterrupt,
439 plugins: Vec<Box<dyn AgentPlugin>>,
440 inputs: Vec<String>,
441 outputs: Vec<String>,
442 max_concurrent_tasks: usize,
443 default_timeout: Duration,
444 event_tx: tokio::sync::mpsc::Sender<AgentEvent>,
446 event_rx: Option<tokio::sync::mpsc::Receiver<AgentEvent>>,
447}
448
449#[cfg(not(feature = "dora"))]
450impl<A: MoFAAgent> SimpleAgentRuntime<A> {
451 pub async fn inject_event(&self, event: AgentEvent) {
452 let _ = self.event_tx.send(event).await;
454 }
455}
456
457#[cfg(not(feature = "dora"))]
458#[cfg(not(feature = "dora"))]
459impl<A: MoFAAgent> SimpleAgentRuntime<A> {
460 pub fn agent(&self) -> &A {
462 &self.agent
463 }
464
465 pub fn agent_mut(&mut self) -> &mut A {
467 &mut self.agent
468 }
469
470 pub fn metadata(&self) -> &AgentMetadata {
472 &self.metadata
473 }
474
475 pub fn config(&self) -> &AgentConfig {
477 &self.config
478 }
479
480 pub fn interrupt(&self) -> &AgentInterrupt {
482 &self.interrupt
483 }
484
485 pub fn inputs(&self) -> &[String] {
487 &self.inputs
488 }
489
490 pub fn outputs(&self) -> &[String] {
492 &self.outputs
493 }
494
495 pub fn max_concurrent_tasks(&self) -> usize {
497 self.max_concurrent_tasks
498 }
499
500 pub fn default_timeout(&self) -> Duration {
502 self.default_timeout
503 }
504
505 pub async fn init_plugins(&mut self) -> anyhow::Result<()> {
507 for plugin in &mut self.plugins {
508 plugin.init_plugin().await?;
509 }
510 Ok(())
511 }
512
513 pub async fn start(&mut self) -> anyhow::Result<()> {
515 let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
517
518 self.agent.initialize(&context).await?;
520 self.init_plugins().await?;
522 tracing::info!("SimpleAgentRuntime {} started", self.metadata.id);
523 Ok(())
524 }
525
526 pub async fn handle_event(&mut self, event: AgentEvent) -> anyhow::Result<()> {
528 if self.interrupt.check() {
531 tracing::debug!("Interrupt signal received for {}", self.metadata.id);
533 self.interrupt.reset();
534 }
535
536 use mofa_kernel::agent::types::AgentInput;
538
539 let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
540
541 let input = match event {
543 AgentEvent::TaskReceived(task) => AgentInput::text(task.content),
544 AgentEvent::Shutdown => {
545 tracing::info!("Shutdown event received for {}", self.metadata.id);
546 return Ok(());
547 }
548 AgentEvent::Custom(data, _) => AgentInput::text(data),
549 _ => AgentInput::text(format!("{:?}", event)),
550 };
551
552 let _output = self.agent.execute(input, &context).await?;
553 Ok(())
554 }
555
556 pub async fn run(&mut self) -> anyhow::Result<()> {
558 let event_rx = self
560 .event_rx
561 .take()
562 .ok_or_else(|| anyhow::anyhow!("Event receiver already taken"))?;
563
564 self.run_with_receiver(event_rx).await
565 }
566
567 pub async fn run_with_receiver(
569 &mut self,
570 mut event_rx: tokio::sync::mpsc::Receiver<AgentEvent>,
571 ) -> anyhow::Result<()> {
572 loop {
573 if self.interrupt.check() {
575 tracing::debug!("Interrupt signal received for {}", self.metadata.id);
576 self.interrupt.reset();
577 }
578
579 match tokio::time::timeout(Duration::from_millis(100), event_rx.recv()).await {
581 Ok(Some(AgentEvent::Shutdown)) => {
582 tracing::info!("Received shutdown event");
583 break;
584 }
585 Ok(Some(event)) => {
586 self.handle_event(event).await?;
588 }
589 Ok(None) => {
590 break;
592 }
593 Err(_) => {
594 continue;
596 }
597 }
598 }
599
600 self.agent.shutdown().await?;
602 Ok(())
603 }
604
605 pub async fn stop(&mut self) -> anyhow::Result<()> {
607 self.interrupt.trigger();
608 self.agent.shutdown().await?;
609 tracing::info!("SimpleAgentRuntime {} stopped", self.metadata.id);
610 Ok(())
611 }
612
613 pub fn trigger_interrupt(&self) {
615 self.interrupt.trigger();
616 }
617}
618
619#[cfg(not(feature = "dora"))]
625pub struct SimpleRuntime {
626 agents: std::sync::Arc<tokio::sync::RwLock<HashMap<String, SimpleAgentInfo>>>,
627 agent_roles: std::sync::Arc<tokio::sync::RwLock<HashMap<String, String>>>,
628 message_bus: std::sync::Arc<SimpleMessageBus>,
629}
630
631#[cfg(not(feature = "dora"))]
633pub struct SimpleAgentInfo {
634 pub metadata: AgentMetadata,
635 pub config: AgentConfig,
636 pub event_tx: tokio::sync::mpsc::Sender<AgentEvent>,
637}
638
639#[cfg(not(feature = "dora"))]
641#[derive(Debug, Clone)]
642pub struct StreamInfo {
643 pub stream_id: String,
644 pub stream_type: StreamType,
645 pub metadata: HashMap<String, String>,
646 pub subscribers: Vec<String>,
647 pub sequence: u64,
648 pub is_paused: bool,
649}
650
651#[cfg(not(feature = "dora"))]
653pub struct SimpleMessageBus {
654 subscribers: tokio::sync::RwLock<HashMap<String, Vec<tokio::sync::mpsc::Sender<AgentEvent>>>>,
655 topic_subscribers: tokio::sync::RwLock<HashMap<String, Vec<String>>>,
656 streams: tokio::sync::RwLock<HashMap<String, StreamInfo>>,
658}
659
660#[cfg(not(feature = "dora"))]
661impl SimpleMessageBus {
662 pub fn new() -> Self {
664 Self {
665 subscribers: tokio::sync::RwLock::new(HashMap::new()),
666 topic_subscribers: tokio::sync::RwLock::new(HashMap::new()),
667 streams: tokio::sync::RwLock::new(HashMap::new()),
668 }
669 }
670
671 pub async fn register(&self, agent_id: &str, tx: tokio::sync::mpsc::Sender<AgentEvent>) {
673 let mut subs = self.subscribers.write().await;
674 subs.entry(agent_id.to_string())
675 .or_insert_with(Vec::new)
676 .push(tx);
677 }
678
679 pub async fn subscribe(&self, agent_id: &str, topic: &str) {
681 let mut topics = self.topic_subscribers.write().await;
682 topics
683 .entry(topic.to_string())
684 .or_insert_with(Vec::new)
685 .push(agent_id.to_string());
686 }
687
688 pub async fn send_to(&self, target_id: &str, event: AgentEvent) -> anyhow::Result<()> {
690 let subs = self.subscribers.read().await;
691 if let Some(senders) = subs.get(target_id) {
692 for tx in senders {
693 let _ = tx.send(event.clone()).await;
694 }
695 }
696 Ok(())
697 }
698
699 pub async fn broadcast(&self, event: AgentEvent) -> anyhow::Result<()> {
701 let subs = self.subscribers.read().await;
702 for senders in subs.values() {
703 for tx in senders {
704 let _ = tx.send(event.clone()).await;
705 }
706 }
707 Ok(())
708 }
709
710 pub async fn publish(&self, topic: &str, event: AgentEvent) -> anyhow::Result<()> {
712 let topics = self.topic_subscribers.read().await;
713 if let Some(agent_ids) = topics.get(topic) {
714 let subs = self.subscribers.read().await;
715 for agent_id in agent_ids {
716 if let Some(senders) = subs.get(agent_id) {
717 for tx in senders {
718 let _ = tx.send(event.clone()).await;
719 }
720 }
721 }
722 }
723 Ok(())
724 }
725
726 pub async fn create_stream(
732 &self,
733 stream_id: &str,
734 stream_type: StreamType,
735 metadata: HashMap<String, String>,
736 ) -> anyhow::Result<()> {
737 let mut streams = self.streams.write().await;
738 if streams.contains_key(stream_id) {
739 return Err(anyhow::anyhow!("Stream {} already exists", stream_id));
740 }
741
742 let stream_info = StreamInfo {
744 stream_id: stream_id.to_string(),
745 stream_type: stream_type.clone(),
746 metadata: metadata.clone(),
747 subscribers: Vec::new(),
748 sequence: 0,
749 is_paused: false,
750 };
751
752 streams.insert(stream_id.to_string(), stream_info.clone());
753
754 self.broadcast(AgentEvent::StreamCreated {
756 stream_id: stream_id.to_string(),
757 stream_type,
758 metadata,
759 })
760 .await
761 }
762
763 pub async fn close_stream(&self, stream_id: &str, reason: &str) -> anyhow::Result<()> {
765 let mut streams = self.streams.write().await;
766 if let Some(stream_info) = streams.remove(stream_id) {
767 let event = AgentEvent::StreamClosed {
769 stream_id: stream_id.to_string(),
770 reason: reason.to_string(),
771 };
772
773 let subs = self.subscribers.read().await;
775 for agent_id in &stream_info.subscribers {
776 if let Some(senders) = subs.get(agent_id) {
777 for tx in senders {
778 let _ = tx.send(event.clone()).await;
779 }
780 }
781 }
782 }
783 Ok(())
784 }
785
786 pub async fn subscribe_stream(&self, agent_id: &str, stream_id: &str) -> anyhow::Result<()> {
788 let mut streams = self.streams.write().await;
789 if let Some(stream_info) = streams.get_mut(stream_id) {
790 if !stream_info.subscribers.contains(&agent_id.to_string()) {
792 stream_info.subscribers.push(agent_id.to_string());
793
794 self.broadcast(AgentEvent::StreamSubscription {
796 stream_id: stream_id.to_string(),
797 subscriber_id: agent_id.to_string(),
798 })
799 .await?;
800 }
801 }
802 Ok(())
803 }
804
805 pub async fn unsubscribe_stream(&self, agent_id: &str, stream_id: &str) -> anyhow::Result<()> {
807 let mut streams = self.streams.write().await;
808 if let Some(stream_info) = streams.get_mut(stream_id) {
809 if let Some(pos) = stream_info.subscribers.iter().position(|id| id == agent_id) {
811 stream_info.subscribers.remove(pos);
812
813 self.broadcast(AgentEvent::StreamUnsubscription {
815 stream_id: stream_id.to_string(),
816 subscriber_id: agent_id.to_string(),
817 })
818 .await?;
819 }
820 }
821 Ok(())
822 }
823
824 pub async fn send_stream_message(
826 &self,
827 stream_id: &str,
828 message: Vec<u8>,
829 ) -> anyhow::Result<()> {
830 let mut streams = self.streams.write().await;
831 if let Some(stream_info) = streams.get_mut(stream_id) {
832 if stream_info.is_paused {
834 return Ok(());
835 }
836
837 let sequence = stream_info.sequence;
839 stream_info.sequence += 1;
840
841 let event = AgentEvent::StreamMessage {
843 stream_id: stream_id.to_string(),
844 message,
845 sequence,
846 };
847
848 let subs = self.subscribers.read().await;
850 for agent_id in &stream_info.subscribers {
851 if let Some(senders) = subs.get(agent_id) {
852 for tx in senders {
853 let _ = tx.send(event.clone()).await;
854 }
855 }
856 }
857 }
858 Ok(())
859 }
860
861 pub async fn pause_stream(&self, stream_id: &str) -> anyhow::Result<()> {
863 let mut streams = self.streams.write().await;
864 if let Some(stream_info) = streams.get_mut(stream_id) {
865 stream_info.is_paused = true;
866 }
867 Ok(())
868 }
869
870 pub async fn resume_stream(&self, stream_id: &str) -> anyhow::Result<()> {
872 let mut streams = self.streams.write().await;
873 if let Some(stream_info) = streams.get_mut(stream_id) {
874 stream_info.is_paused = false;
875 }
876 Ok(())
877 }
878
879 pub async fn get_stream_info(&self, stream_id: &str) -> anyhow::Result<Option<StreamInfo>> {
881 let streams = self.streams.read().await;
882 Ok(streams.get(stream_id).cloned())
883 }
884}
885
886#[cfg(not(feature = "dora"))]
887impl Default for SimpleMessageBus {
888 fn default() -> Self {
889 Self::new()
890 }
891}
892
893#[cfg(not(feature = "dora"))]
894impl SimpleRuntime {
895 pub fn new() -> Self {
897 Self {
898 agents: std::sync::Arc::new(tokio::sync::RwLock::new(HashMap::new())),
899 agent_roles: std::sync::Arc::new(tokio::sync::RwLock::new(HashMap::new())),
900 message_bus: std::sync::Arc::new(SimpleMessageBus::new()),
901 }
902 }
903
904 pub async fn register_agent(
906 &self,
907 metadata: AgentMetadata,
908 config: AgentConfig,
909 role: &str,
910 ) -> anyhow::Result<tokio::sync::mpsc::Receiver<AgentEvent>> {
911 let agent_id = metadata.id.clone();
912 let (tx, rx) = tokio::sync::mpsc::channel(100);
913
914 self.message_bus.register(&agent_id, tx.clone()).await;
916
917 let mut agents = self.agents.write().await;
919 agents.insert(
920 agent_id.clone(),
921 SimpleAgentInfo {
922 metadata,
923 config,
924 event_tx: tx,
925 },
926 );
927
928 let mut roles = self.agent_roles.write().await;
930 roles.insert(agent_id.clone(), role.to_string());
931
932 tracing::info!("Agent {} registered with role {}", agent_id, role);
933 Ok(rx)
934 }
935
936 pub fn message_bus(&self) -> &std::sync::Arc<SimpleMessageBus> {
938 &self.message_bus
939 }
940
941 pub async fn get_agents_by_role(&self, role: &str) -> Vec<String> {
943 let roles = self.agent_roles.read().await;
944 roles
945 .iter()
946 .filter(|(_, r)| *r == role)
947 .map(|(id, _)| id.clone())
948 .collect()
949 }
950
951 pub async fn send_to_agent(&self, target_id: &str, event: AgentEvent) -> anyhow::Result<()> {
953 self.message_bus.send_to(target_id, event).await
954 }
955
956 pub async fn broadcast(&self, event: AgentEvent) -> anyhow::Result<()> {
958 self.message_bus.broadcast(event).await
959 }
960
961 pub async fn publish_to_topic(&self, topic: &str, event: AgentEvent) -> anyhow::Result<()> {
963 self.message_bus.publish(topic, event).await
964 }
965
966 pub async fn subscribe_topic(&self, agent_id: &str, topic: &str) -> anyhow::Result<()> {
968 self.message_bus.subscribe(agent_id, topic).await;
969 Ok(())
970 }
971
972 pub async fn create_stream(
978 &self,
979 stream_id: &str,
980 stream_type: StreamType,
981 metadata: std::collections::HashMap<String, String>,
982 ) -> anyhow::Result<()> {
983 self.message_bus
984 .create_stream(stream_id, stream_type, metadata)
985 .await
986 }
987
988 pub async fn close_stream(&self, stream_id: &str, reason: &str) -> anyhow::Result<()> {
990 self.message_bus.close_stream(stream_id, reason).await
991 }
992
993 pub async fn subscribe_stream(&self, agent_id: &str, stream_id: &str) -> anyhow::Result<()> {
995 self.message_bus.subscribe_stream(agent_id, stream_id).await
996 }
997
998 pub async fn unsubscribe_stream(&self, agent_id: &str, stream_id: &str) -> anyhow::Result<()> {
1000 self.message_bus
1001 .unsubscribe_stream(agent_id, stream_id)
1002 .await
1003 }
1004
1005 pub async fn send_stream_message(
1007 &self,
1008 stream_id: &str,
1009 message: Vec<u8>,
1010 ) -> anyhow::Result<()> {
1011 self.message_bus
1012 .send_stream_message(stream_id, message)
1013 .await
1014 }
1015
1016 pub async fn pause_stream(&self, stream_id: &str) -> anyhow::Result<()> {
1018 self.message_bus.pause_stream(stream_id).await
1019 }
1020
1021 pub async fn resume_stream(&self, stream_id: &str) -> anyhow::Result<()> {
1023 self.message_bus.resume_stream(stream_id).await
1024 }
1025
1026 pub async fn get_stream_info(&self, stream_id: &str) -> anyhow::Result<Option<StreamInfo>> {
1028 self.message_bus.get_stream_info(stream_id).await
1029 }
1030
1031 pub async fn stop_all(&self) -> anyhow::Result<()> {
1033 self.message_bus.broadcast(AgentEvent::Shutdown).await?;
1034 tracing::info!("SimpleRuntime stopped");
1035 Ok(())
1036 }
1037}
1038
1039#[cfg(not(feature = "dora"))]
1040impl Default for SimpleRuntime {
1041 fn default() -> Self {
1042 Self::new()
1043 }
1044}
1045
/// Agent nodes keyed by agent id.
#[cfg(feature = "dora")]
type AgentNodeMap = HashMap<String, Arc<DoraAgentNode>>;

/// Dora-backed runtime that manages agent nodes, with or without a dataflow.
#[cfg(feature = "dora")]
pub struct MoFARuntime {
    /// Dataflow that owns the nodes, when the runtime was created with one.
    dataflow: Option<DoraDataflow>,
    /// Channel used for point-to-point, broadcast and topic messaging.
    channel: Arc<DoraChannel>,
    /// Nodes managed directly by the runtime when there is no dataflow.
    agents: Arc<RwLock<AgentNodeMap>>,
    /// Role of each registered agent, keyed by agent id.
    agent_roles: Arc<RwLock<HashMap<String, String>>>,
}
1058
#[cfg(feature = "dora")]
impl MoFARuntime {
    /// Assembles a runtime around an optional dataflow, with a default-configured
    /// channel and empty agent and role tables.
    fn assemble(dataflow: Option<DoraDataflow>) -> Self {
        let channel_config = ChannelConfig::default();
        Self {
            dataflow,
            channel: Arc::new(DoraChannel::new(channel_config)),
            agents: Arc::new(RwLock::new(HashMap::new())),
            agent_roles: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Creates a runtime without a dataflow; nodes are managed by the runtime itself.
    pub async fn new() -> Self {
        Self::assemble(None)
    }

    /// Creates a runtime whose nodes are managed by a dataflow built from `dataflow_config`.
    pub async fn with_dataflow(dataflow_config: DataflowConfig) -> Self {
        Self::assemble(Some(DoraDataflow::new(dataflow_config)))
    }

    /// Registers `node` under `role`.
    ///
    /// The node's id is registered with the channel first; the node is then
    /// added to the dataflow when one exists, or stored in the runtime's own
    /// node table otherwise.
    ///
    /// # Errors
    ///
    /// Propagates errors from the channel registration or from the dataflow.
    pub async fn register_agent(&self, node: DoraAgentNode, role: &str) -> DoraResult<()> {
        let agent_id = node.config().node_id.clone();

        self.channel.register_agent(&agent_id).await?;

        match self.dataflow.as_ref() {
            Some(dataflow) => {
                dataflow.add_node(node).await?;
            }
            None => {
                let mut node_table = self.agents.write().await;
                node_table.insert(agent_id.clone(), Arc::new(node));
            }
        }

        let mut role_table = self.agent_roles.write().await;
        role_table.insert(agent_id.clone(), role.to_string());

        info!("Agent {} registered with role {}", agent_id, role);
        Ok(())
    }

    /// Connects `source_output` of `source_id` to `target_input` of `target_id`
    /// in the dataflow; does nothing when the runtime has no dataflow.
    pub async fn connect_agents(
        &self,
        source_id: &str,
        source_output: &str,
        target_id: &str,
        target_input: &str,
    ) -> DoraResult<()> {
        if let Some(dataflow) = &self.dataflow {
            dataflow
                .connect(source_id, source_output, target_id, target_input)
                .await?;
        }
        Ok(())
    }

    /// Returns the shared messaging channel.
    pub fn channel(&self) -> &Arc<DoraChannel> {
        &self.channel
    }

    /// Returns the ids of all agents registered with exactly `role`.
    pub async fn get_agents_by_role(&self, role: &str) -> Vec<String> {
        let role_table = self.agent_roles.read().await;
        role_table
            .iter()
            .filter_map(|(id, assigned)| (assigned.as_str() == role).then(|| id.clone()))
            .collect()
    }

    /// Sends `message` from `sender_id` to `receiver_id` as a point-to-point envelope.
    pub async fn send_to_agent(
        &self,
        sender_id: &str,
        receiver_id: &str,
        message: &AgentMessage,
    ) -> DoraResult<()> {
        let envelope = MessageEnvelope::from_agent_message(sender_id, message)?;
        let addressed = envelope.to(receiver_id);
        self.channel.send_p2p(addressed).await
    }

    /// Broadcasts `message` from `sender_id` over the channel.
    pub async fn broadcast(&self, sender_id: &str, message: &AgentMessage) -> DoraResult<()> {
        let envelope = MessageEnvelope::from_agent_message(sender_id, message)?;
        self.channel.broadcast(envelope).await
    }

    /// Publishes `message` from `sender_id` to `topic`.
    pub async fn publish_to_topic(
        &self,
        sender_id: &str,
        topic: &str,
        message: &AgentMessage,
    ) -> DoraResult<()> {
        let envelope = MessageEnvelope::from_agent_message(sender_id, message)?;
        let tagged = envelope.with_topic(topic);
        self.channel.publish(tagged).await
    }

    /// Subscribes `agent_id` to `topic` on the channel.
    pub async fn subscribe_topic(&self, agent_id: &str, topic: &str) -> DoraResult<()> {
        self.channel.subscribe(agent_id, topic).await
    }

    /// Builds and starts the dataflow, or initialises every directly managed
    /// node when there is no dataflow.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered; later nodes are not initialised.
    pub async fn build_and_start(&self) -> DoraResult<()> {
        match &self.dataflow {
            Some(dataflow) => {
                dataflow.build().await?;
                dataflow.start().await?;
            }
            None => {
                let node_table = self.agents.read().await;
                for (id, node) in node_table.iter() {
                    node.init().await?;
                    debug!("Agent {} initialized", id);
                }
            }
        }
        info!("MoFARuntime started");
        Ok(())
    }

    /// Stops the dataflow, or every directly managed node when there is no dataflow.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered; later nodes are not stopped.
    pub async fn stop(&self) -> DoraResult<()> {
        match &self.dataflow {
            Some(dataflow) => {
                dataflow.stop().await?;
            }
            None => {
                let node_table = self.agents.read().await;
                for node in node_table.values() {
                    node.stop().await?;
                }
            }
        }
        info!("MoFARuntime stopped");
        Ok(())
    }

    /// Pauses the dataflow; does nothing when the runtime has no dataflow.
    pub async fn pause(&self) -> DoraResult<()> {
        match &self.dataflow {
            Some(dataflow) => dataflow.pause().await?,
            None => {}
        }
        Ok(())
    }

    /// Resumes the dataflow; does nothing when the runtime has no dataflow.
    pub async fn resume(&self) -> DoraResult<()> {
        match &self.dataflow {
            Some(dataflow) => dataflow.resume().await?,
            None => {}
        }
        Ok(())
    }
}