1#[cfg(feature = "dora")]
10use crate::dora_adapter::{
11 ChannelConfig, DataflowConfig, DoraAgentNode, DoraChannel, DoraDataflow, DoraError,
12 DoraNodeConfig, DoraResult, MessageEnvelope,
13};
14use crate::interrupt::AgentInterrupt;
15#[cfg(feature = "dora")]
16use crate::message::AgentMessage;
17use crate::{AgentConfig, AgentMetadata, MoFAAgent};
18use mofa_kernel::AgentPlugin;
19use mofa_kernel::message::AgentEvent;
20use std::collections::HashMap;
21#[cfg(feature = "dora")]
22use std::sync::Arc;
23use std::time::Duration;
24#[cfg(feature = "dora")]
25use tokio::sync::RwLock;
26#[cfg(feature = "dora")]
27use tracing::{debug, info};
28
29pub struct AgentBuilder {
31 agent_id: String,
32 name: String,
33 capabilities: Vec<String>,
34 dependencies: Vec<String>,
35 plugins: Vec<Box<dyn AgentPlugin>>,
36 node_config: HashMap<String, String>,
37 inputs: Vec<String>,
38 outputs: Vec<String>,
39 max_concurrent_tasks: usize,
40 default_timeout: Duration,
41}
42
43impl AgentBuilder {
44 pub fn new(agent_id: &str, name: &str) -> Self {
46 Self {
47 agent_id: agent_id.to_string(),
48 name: name.to_string(),
49 capabilities: Vec::new(),
50 dependencies: Vec::new(),
51 plugins: Vec::new(),
52 node_config: HashMap::new(),
53 inputs: vec!["task_input".to_string()],
54 outputs: vec!["task_output".to_string()],
55 max_concurrent_tasks: 10,
56 default_timeout: Duration::from_secs(30),
57 }
58 }
59
60 pub fn with_capability(mut self, capability: &str) -> Self {
62 self.capabilities.push(capability.to_string());
63 self
64 }
65
66 pub fn with_capabilities(mut self, capabilities: Vec<&str>) -> Self {
68 for cap in capabilities {
69 self.capabilities.push(cap.to_string());
70 }
71 self
72 }
73
74 pub fn with_dependency(mut self, dependency: &str) -> Self {
76 self.dependencies.push(dependency.to_string());
77 self
78 }
79
80 pub fn with_plugin(mut self, plugin: Box<dyn AgentPlugin>) -> Self {
82 self.plugins.push(plugin);
83 self
84 }
85
86 pub fn with_input(mut self, input: &str) -> Self {
88 self.inputs.push(input.to_string());
89 self
90 }
91
92 pub fn with_output(mut self, output: &str) -> Self {
94 self.outputs.push(output.to_string());
95 self
96 }
97
98 pub fn with_max_concurrent_tasks(mut self, max: usize) -> Self {
100 self.max_concurrent_tasks = max;
101 self
102 }
103
104 pub fn with_timeout(mut self, timeout: Duration) -> Self {
106 self.default_timeout = timeout;
107 self
108 }
109
110 pub fn with_config(mut self, key: &str, value: &str) -> Self {
112 self.node_config.insert(key.to_string(), value.to_string());
113 self
114 }
115
116 pub fn build_config(&self) -> AgentConfig {
118 AgentConfig {
119 agent_id: self.agent_id.clone(),
120 name: self.name.clone(),
121 node_config: self.node_config.clone(),
122 }
123 }
124
125 pub fn build_metadata(&self) -> AgentMetadata {
127 use mofa_kernel::agent::AgentCapabilities;
128 use mofa_kernel::agent::AgentState;
129
130 let agent_capabilities = AgentCapabilities::builder()
132 .tags(self.capabilities.clone())
133 .build();
134
135 AgentMetadata {
136 id: self.agent_id.clone(),
137 name: self.name.clone(),
138 description: None,
139 version: None,
140 capabilities: agent_capabilities,
141 state: AgentState::Created,
142 }
143 }
144
145 #[cfg(feature = "dora")]
147 pub fn build_node_config(&self) -> DoraNodeConfig {
148 DoraNodeConfig {
149 node_id: self.agent_id.clone(),
150 name: self.name.clone(),
151 inputs: self.inputs.clone(),
152 outputs: self.outputs.clone(),
153 event_buffer_size: self.max_concurrent_tasks * 10,
154 default_timeout: self.default_timeout,
155 custom_config: self.node_config.clone(),
156 }
157 }
158
159 #[cfg(feature = "dora")]
161 pub async fn with_agent<A: MoFAAgent>(self, agent: A) -> DoraResult<AgentRuntime<A>> {
162 let node_config = self.build_node_config();
163 let metadata = self.build_metadata();
164 let config = self.build_config();
165
166 let node = DoraAgentNode::new(node_config);
167 let interrupt = node.interrupt().clone();
168
169 Ok(AgentRuntime {
170 agent,
171 node: Arc::new(node),
172 metadata,
173 config,
174 interrupt,
175 plugins: self.plugins,
176 })
177 }
178
179 #[cfg(feature = "dora")]
181 pub async fn build_and_start<A: MoFAAgent>(self, agent: A) -> DoraResult<AgentRuntime<A>> {
182 let runtime: AgentRuntime<A> = self.with_agent(agent).await?;
183 runtime.start().await?;
184 Ok(runtime)
185 }
186
187 #[cfg(not(feature = "dora"))]
189 pub async fn with_agent<A: MoFAAgent>(self, agent: A) -> anyhow::Result<SimpleAgentRuntime<A>> {
190 let metadata = self.build_metadata();
191 let config = self.build_config();
192 let interrupt = AgentInterrupt::new();
193
194 Ok(SimpleAgentRuntime {
195 agent,
196 metadata,
197 config,
198 interrupt,
199 plugins: self.plugins,
200 inputs: self.inputs,
201 outputs: self.outputs,
202 max_concurrent_tasks: self.max_concurrent_tasks,
203 default_timeout: self.default_timeout,
204 })
205 }
206
207 #[cfg(not(feature = "dora"))]
209 pub async fn build_and_start<A: MoFAAgent>(
210 self,
211 agent: A,
212 ) -> anyhow::Result<SimpleAgentRuntime<A>> {
213 let mut runtime = self.with_agent(agent).await?;
214 runtime.start().await?;
215 Ok(runtime)
216 }
217}
218
/// Runtime that drives a single agent on top of a Dora agent node.
#[cfg(feature = "dora")]
pub struct AgentRuntime<A: MoFAAgent> {
    /// The agent being driven.
    agent: A,
    /// Shared handle to the Dora node the agent runs on.
    node: Arc<DoraAgentNode>,
    /// Metadata describing the agent.
    metadata: AgentMetadata,
    /// Configuration of the agent.
    config: AgentConfig,
    /// Interrupt handle used to signal the agent.
    interrupt: AgentInterrupt,
    /// Plugins owned by this runtime.
    plugins: Vec<Box<dyn AgentPlugin>>,
}
229
#[cfg(feature = "dora")]
impl<A: MoFAAgent> AgentRuntime<A> {
    /// Returns a shared reference to the wrapped agent.
    pub fn agent(&self) -> &A {
        &self.agent
    }

    /// Returns a mutable reference to the wrapped agent.
    pub fn agent_mut(&mut self) -> &mut A {
        &mut self.agent
    }

    /// Returns the shared handle to the underlying Dora node.
    pub fn node(&self) -> &Arc<DoraAgentNode> {
        &self.node
    }

    /// Returns the agent metadata.
    pub fn metadata(&self) -> &AgentMetadata {
        &self.metadata
    }

    /// Returns the agent configuration.
    pub fn config(&self) -> &AgentConfig {
        &self.config
    }

    /// Returns the interrupt handle of this runtime.
    pub fn interrupt(&self) -> &AgentInterrupt {
        &self.interrupt
    }

    /// Initialises every plugin in registration order.
    ///
    /// # Errors
    ///
    /// Stops at the first plugin that fails and reports its message as
    /// `DoraError::OperatorError`.
    pub async fn init_plugins(&mut self) -> DoraResult<()> {
        for plugin in &mut self.plugins {
            plugin
                .init_plugin()
                .await
                .map_err(|e| DoraError::OperatorError(e.to_string()))?;
        }
        Ok(())
    }

    /// Initialises the underlying node and logs that the runtime has started.
    pub async fn start(&self) -> DoraResult<()> {
        self.node.init().await?;
        info!("AgentRuntime {} started", self.metadata.id);
        Ok(())
    }

    /// Initialises the agent and its plugins, then processes node events until
    /// a shutdown event arrives, and finally shuts the agent down.
    ///
    /// Task and custom events are forwarded to the agent as text input; any
    /// other event is forwarded as its `Debug` representation. When no event
    /// is available the loop sleeps for 10 ms before polling again.
    ///
    /// # Errors
    ///
    /// Agent failures are reported as `DoraError::Internal`. An error from
    /// initialisation or from executing an event returns immediately, in which
    /// case the agent's `shutdown` is not called.
    pub async fn run_event_loop(&mut self) -> DoraResult<()> {
        use mofa_kernel::agent::types::AgentInput;

        let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
        self.agent
            .initialize(&context)
            .await
            .map_err(|e| DoraError::Internal(e.to_string()))?;

        self.init_plugins().await?;

        let event_loop = self.node.create_event_loop();

        loop {
            // An interrupt is only acknowledged here: it is logged and cleared.
            if event_loop.should_interrupt() {
                debug!("Interrupt signal received");
                self.interrupt.reset();
            }

            match event_loop.next_event().await {
                Some(AgentEvent::Shutdown) => {
                    info!("Received shutdown event");
                    break;
                }
                Some(event) => {
                    if self.interrupt.check() {
                        debug!("Interrupt signal received");
                        self.interrupt.reset();
                    }

                    let input = match event {
                        AgentEvent::TaskReceived(task) => AgentInput::text(task.content),
                        AgentEvent::Custom(data, _) => AgentInput::text(data),
                        other => AgentInput::text(format!("{:?}", other)),
                    };

                    self.agent
                        .execute(input, &context)
                        .await
                        .map_err(|e| DoraError::Internal(e.to_string()))?;
                }
                None => {
                    // Nothing pending; back off briefly instead of spinning.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
            }
        }

        self.agent
            .shutdown()
            .await
            .map_err(|e| DoraError::Internal(e.to_string()))?;

        Ok(())
    }

    /// Triggers the interrupt, stops the underlying node and logs the stop.
    pub async fn stop(&self) -> DoraResult<()> {
        self.interrupt.trigger();
        self.node.stop().await?;
        info!("AgentRuntime {} stopped", self.metadata.id);
        Ok(())
    }

    /// Sends `message` on the node output named `output_id`.
    pub async fn send_output(&self, output_id: &str, message: &AgentMessage) -> DoraResult<()> {
        self.node.send_message(output_id, message).await
    }

    /// Injects `event` into the underlying node.
    pub async fn inject_event(&self, event: AgentEvent) -> DoraResult<()> {
        self.node.inject_event(event).await
    }
}
363
364#[cfg(not(feature = "dora"))]
370pub struct SimpleAgentRuntime<A: MoFAAgent> {
371 agent: A,
372 metadata: AgentMetadata,
373 config: AgentConfig,
374 interrupt: AgentInterrupt,
375 plugins: Vec<Box<dyn AgentPlugin>>,
376 inputs: Vec<String>,
377 outputs: Vec<String>,
378 max_concurrent_tasks: usize,
379 default_timeout: Duration,
380}
381
382#[cfg(not(feature = "dora"))]
383impl<A: MoFAAgent> SimpleAgentRuntime<A> {
384 pub fn agent(&self) -> &A {
386 &self.agent
387 }
388
389 pub fn agent_mut(&mut self) -> &mut A {
391 &mut self.agent
392 }
393
394 pub fn metadata(&self) -> &AgentMetadata {
396 &self.metadata
397 }
398
399 pub fn config(&self) -> &AgentConfig {
401 &self.config
402 }
403
404 pub fn interrupt(&self) -> &AgentInterrupt {
406 &self.interrupt
407 }
408
409 pub fn inputs(&self) -> &[String] {
411 &self.inputs
412 }
413
414 pub fn outputs(&self) -> &[String] {
416 &self.outputs
417 }
418
419 pub fn max_concurrent_tasks(&self) -> usize {
421 self.max_concurrent_tasks
422 }
423
424 pub fn default_timeout(&self) -> Duration {
426 self.default_timeout
427 }
428
429 pub async fn init_plugins(&mut self) -> anyhow::Result<()> {
431 for plugin in &mut self.plugins {
432 plugin.init_plugin().await?;
433 }
434 Ok(())
435 }
436
437 pub async fn start(&mut self) -> anyhow::Result<()> {
439 let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
441 self.agent.initialize(&context).await?;
442 self.init_plugins().await?;
444 tracing::info!("SimpleAgentRuntime {} started", self.metadata.id);
445 Ok(())
446 }
447
448 pub async fn handle_event(&mut self, event: AgentEvent) -> anyhow::Result<()> {
450 if self.interrupt.check() {
452 tracing::debug!("Interrupt signal received");
453 self.interrupt.reset();
454 }
455
456 use mofa_kernel::agent::types::AgentInput;
458
459 let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
460 let input = match event {
461 AgentEvent::TaskReceived(task) => AgentInput::text(task.content),
462 AgentEvent::Shutdown => {
463 tracing::info!("Shutdown event received");
464 return Ok(());
465 }
466 AgentEvent::Custom(data, _) => AgentInput::text(data),
467 _ => AgentInput::text(format!("{:?}", event)),
468 };
469
470 let _output = self.agent.execute(input, &context).await?;
471 Ok(())
472 }
473
474 pub async fn run_with_receiver(
476 &mut self,
477 mut event_rx: tokio::sync::mpsc::Receiver<AgentEvent>,
478 ) -> anyhow::Result<()> {
479 loop {
480 if self.interrupt.check() {
482 tracing::debug!("Interrupt signal received");
484 self.interrupt.reset();
485 }
486
487 match tokio::time::timeout(Duration::from_millis(100), event_rx.recv()).await {
489 Ok(Some(AgentEvent::Shutdown)) => {
490 tracing::info!("Received shutdown event");
491 break;
492 }
493 Ok(Some(event)) => {
494 use mofa_kernel::agent::types::AgentInput;
496 let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
497 let input = match event {
498 AgentEvent::TaskReceived(task) => AgentInput::text(task.content),
499 AgentEvent::Custom(data, _) => AgentInput::text(data),
500 _ => AgentInput::text(format!("{:?}", event)),
501 };
502
503 self.agent.execute(input, &context).await?;
504 }
505 Ok(None) => {
506 break;
508 }
509 Err(_) => {
510 continue;
512 }
513 }
514 }
515
516 self.agent.shutdown().await?;
518 Ok(())
519 }
520
521 pub async fn stop(&mut self) -> anyhow::Result<()> {
523 self.interrupt.trigger();
524 self.agent.shutdown().await?;
525 tracing::info!("SimpleAgentRuntime {} stopped", self.metadata.id);
526 Ok(())
527 }
528
529 pub fn trigger_interrupt(&self) {
531 self.interrupt.trigger();
532 }
533}
534
535#[cfg(not(feature = "dora"))]
541pub struct SimpleRuntime {
542 agents: std::sync::Arc<tokio::sync::RwLock<HashMap<String, SimpleAgentInfo>>>,
543 agent_roles: std::sync::Arc<tokio::sync::RwLock<HashMap<String, String>>>,
544 message_bus: std::sync::Arc<SimpleMessageBus>,
545}
546
547#[cfg(not(feature = "dora"))]
549pub struct SimpleAgentInfo {
550 pub metadata: AgentMetadata,
551 pub config: AgentConfig,
552 pub event_tx: tokio::sync::mpsc::Sender<AgentEvent>,
553}
554
555#[cfg(not(feature = "dora"))]
557pub struct SimpleMessageBus {
558 subscribers: tokio::sync::RwLock<HashMap<String, Vec<tokio::sync::mpsc::Sender<AgentEvent>>>>,
559 topic_subscribers: tokio::sync::RwLock<HashMap<String, Vec<String>>>,
560}
561
562#[cfg(not(feature = "dora"))]
563impl SimpleMessageBus {
564 pub fn new() -> Self {
566 Self {
567 subscribers: tokio::sync::RwLock::new(HashMap::new()),
568 topic_subscribers: tokio::sync::RwLock::new(HashMap::new()),
569 }
570 }
571
572 pub async fn register(&self, agent_id: &str, tx: tokio::sync::mpsc::Sender<AgentEvent>) {
574 let mut subs = self.subscribers.write().await;
575 subs.entry(agent_id.to_string())
576 .or_insert_with(Vec::new)
577 .push(tx);
578 }
579
580 pub async fn subscribe(&self, agent_id: &str, topic: &str) {
582 let mut topics = self.topic_subscribers.write().await;
583 topics
584 .entry(topic.to_string())
585 .or_insert_with(Vec::new)
586 .push(agent_id.to_string());
587 }
588
589 pub async fn send_to(&self, target_id: &str, event: AgentEvent) -> anyhow::Result<()> {
591 let subs = self.subscribers.read().await;
592 if let Some(senders) = subs.get(target_id) {
593 for tx in senders {
594 let _ = tx.send(event.clone()).await;
595 }
596 }
597 Ok(())
598 }
599
600 pub async fn broadcast(&self, event: AgentEvent) -> anyhow::Result<()> {
602 let subs = self.subscribers.read().await;
603 for senders in subs.values() {
604 for tx in senders {
605 let _ = tx.send(event.clone()).await;
606 }
607 }
608 Ok(())
609 }
610
611 pub async fn publish(&self, topic: &str, event: AgentEvent) -> anyhow::Result<()> {
613 let topics = self.topic_subscribers.read().await;
614 if let Some(agent_ids) = topics.get(topic) {
615 let subs = self.subscribers.read().await;
616 for agent_id in agent_ids {
617 if let Some(senders) = subs.get(agent_id) {
618 for tx in senders {
619 let _ = tx.send(event.clone()).await;
620 }
621 }
622 }
623 }
624 Ok(())
625 }
626}
627
628#[cfg(not(feature = "dora"))]
629impl Default for SimpleMessageBus {
630 fn default() -> Self {
631 Self::new()
632 }
633}
634
635#[cfg(not(feature = "dora"))]
636impl SimpleRuntime {
637 pub fn new() -> Self {
639 Self {
640 agents: std::sync::Arc::new(tokio::sync::RwLock::new(HashMap::new())),
641 agent_roles: std::sync::Arc::new(tokio::sync::RwLock::new(HashMap::new())),
642 message_bus: std::sync::Arc::new(SimpleMessageBus::new()),
643 }
644 }
645
646 pub async fn register_agent(
648 &self,
649 metadata: AgentMetadata,
650 config: AgentConfig,
651 role: &str,
652 ) -> anyhow::Result<tokio::sync::mpsc::Receiver<AgentEvent>> {
653 let agent_id = metadata.id.clone();
654 let (tx, rx) = tokio::sync::mpsc::channel(100);
655
656 self.message_bus.register(&agent_id, tx.clone()).await;
658
659 let mut agents = self.agents.write().await;
661 agents.insert(
662 agent_id.clone(),
663 SimpleAgentInfo {
664 metadata,
665 config,
666 event_tx: tx,
667 },
668 );
669
670 let mut roles = self.agent_roles.write().await;
672 roles.insert(agent_id.clone(), role.to_string());
673
674 tracing::info!("Agent {} registered with role {}", agent_id, role);
675 Ok(rx)
676 }
677
678 pub fn message_bus(&self) -> &std::sync::Arc<SimpleMessageBus> {
680 &self.message_bus
681 }
682
683 pub async fn get_agents_by_role(&self, role: &str) -> Vec<String> {
685 let roles = self.agent_roles.read().await;
686 roles
687 .iter()
688 .filter(|(_, r)| *r == role)
689 .map(|(id, _)| id.clone())
690 .collect()
691 }
692
693 pub async fn send_to_agent(&self, target_id: &str, event: AgentEvent) -> anyhow::Result<()> {
695 self.message_bus.send_to(target_id, event).await
696 }
697
698 pub async fn broadcast(&self, event: AgentEvent) -> anyhow::Result<()> {
700 self.message_bus.broadcast(event).await
701 }
702
703 pub async fn publish_to_topic(&self, topic: &str, event: AgentEvent) -> anyhow::Result<()> {
705 self.message_bus.publish(topic, event).await
706 }
707
708 pub async fn subscribe_topic(&self, agent_id: &str, topic: &str) -> anyhow::Result<()> {
710 self.message_bus.subscribe(agent_id, topic).await;
711 Ok(())
712 }
713
714 pub async fn stop_all(&self) -> anyhow::Result<()> {
716 self.message_bus.broadcast(AgentEvent::Shutdown).await?;
717 tracing::info!("SimpleRuntime stopped");
718 Ok(())
719 }
720}
721
722#[cfg(not(feature = "dora"))]
723impl Default for SimpleRuntime {
724 fn default() -> Self {
725 Self::new()
726 }
727}
728
/// Shared Dora agent nodes, keyed by node id.
#[cfg(feature = "dora")]
type AgentNodeMap = HashMap<String, Arc<DoraAgentNode>>;
732
/// Dora-backed runtime that manages several agent nodes and the channel they
/// communicate over.
#[cfg(feature = "dora")]
pub struct MoFARuntime {
    /// Optional dataflow; when present, registered nodes are added to it
    /// instead of the local `agents` map.
    dataflow: Option<DoraDataflow>,
    /// Channel used for point-to-point, broadcast and topic messages.
    channel: Arc<DoraChannel>,
    /// Nodes managed directly by the runtime when no dataflow is configured.
    agents: Arc<RwLock<AgentNodeMap>>,
    /// Role assigned to each agent, keyed by agent id.
    agent_roles: Arc<RwLock<HashMap<String, String>>>,
}
741
#[cfg(feature = "dora")]
impl MoFARuntime {
    /// Creates a runtime without a dataflow, using the default channel
    /// configuration.
    pub async fn new() -> Self {
        let channel_config = ChannelConfig::default();
        Self {
            dataflow: None,
            channel: Arc::new(DoraChannel::new(channel_config)),
            agents: Arc::new(RwLock::new(HashMap::new())),
            agent_roles: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Creates a runtime whose nodes are managed by a dataflow built from
    /// `dataflow_config`, using the default channel configuration.
    pub async fn with_dataflow(dataflow_config: DataflowConfig) -> Self {
        let dataflow = DoraDataflow::new(dataflow_config);
        let channel_config = ChannelConfig::default();
        Self {
            dataflow: Some(dataflow),
            channel: Arc::new(DoraChannel::new(channel_config)),
            agents: Arc::new(RwLock::new(HashMap::new())),
            agent_roles: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Registers `node` under `role`.
    ///
    /// The node id is registered with the channel first. The node is then added
    /// to the dataflow when one is configured, otherwise stored in the
    /// runtime's own node map. Finally the role is recorded.
    pub async fn register_agent(&self, node: DoraAgentNode, role: &str) -> DoraResult<()> {
        let agent_id = node.config().node_id.clone();

        self.channel.register_agent(&agent_id).await?;

        if let Some(dataflow) = &self.dataflow {
            dataflow.add_node(node).await?;
        } else {
            let mut agents = self.agents.write().await;
            agents.insert(agent_id.clone(), Arc::new(node));
        }

        let mut roles = self.agent_roles.write().await;
        roles.insert(agent_id.clone(), role.to_string());

        info!("Agent {} registered with role {}", agent_id, role);
        Ok(())
    }

    /// Connects `source_output` of `source_id` to `target_input` of `target_id`
    /// in the dataflow.
    ///
    /// Without a configured dataflow this does nothing and returns `Ok(())`.
    pub async fn connect_agents(
        &self,
        source_id: &str,
        source_output: &str,
        target_id: &str,
        target_input: &str,
    ) -> DoraResult<()> {
        if let Some(dataflow) = &self.dataflow {
            dataflow
                .connect(source_id, source_output, target_id, target_input)
                .await?;
        }
        Ok(())
    }

    /// Returns the shared channel.
    pub fn channel(&self) -> &Arc<DoraChannel> {
        &self.channel
    }

    /// Returns the ids of all agents registered with exactly `role`, in no
    /// particular order.
    pub async fn get_agents_by_role(&self, role: &str) -> Vec<String> {
        let roles = self.agent_roles.read().await;
        roles
            .iter()
            .filter(|(_, assigned)| *assigned == role)
            .map(|(id, _)| id.clone())
            .collect()
    }

    /// Sends `message` from `sender_id` to `receiver_id` as a point-to-point
    /// envelope.
    pub async fn send_to_agent(
        &self,
        sender_id: &str,
        receiver_id: &str,
        message: &AgentMessage,
    ) -> DoraResult<()> {
        let envelope = MessageEnvelope::from_agent_message(sender_id, message)?.to(receiver_id);
        self.channel.send_p2p(envelope).await
    }

    /// Broadcasts `message` from `sender_id` over the channel.
    pub async fn broadcast(&self, sender_id: &str, message: &AgentMessage) -> DoraResult<()> {
        let envelope = MessageEnvelope::from_agent_message(sender_id, message)?;
        self.channel.broadcast(envelope).await
    }

    /// Publishes `message` from `sender_id` to `topic`.
    pub async fn publish_to_topic(
        &self,
        sender_id: &str,
        topic: &str,
        message: &AgentMessage,
    ) -> DoraResult<()> {
        let envelope = MessageEnvelope::from_agent_message(sender_id, message)?.with_topic(topic);
        self.channel.publish(envelope).await
    }

    /// Subscribes `agent_id` to `topic` on the channel.
    pub async fn subscribe_topic(&self, agent_id: &str, topic: &str) -> DoraResult<()> {
        self.channel.subscribe(agent_id, topic).await
    }

    /// Builds and starts the dataflow, or initialises every directly managed
    /// node when no dataflow is configured.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered; nodes not yet reached are left
    /// uninitialised.
    pub async fn build_and_start(&self) -> DoraResult<()> {
        if let Some(dataflow) = &self.dataflow {
            dataflow.build().await?;
            dataflow.start().await?;
        } else {
            let agents = self.agents.read().await;
            for (id, node) in agents.iter() {
                node.init().await?;
                debug!("Agent {} initialized", id);
            }
        }
        info!("MoFARuntime started");
        Ok(())
    }

    /// Stops the dataflow, or every directly managed node when no dataflow is
    /// configured.
    ///
    /// # Errors
    ///
    /// When stopping individual nodes, a failure does not prevent the remaining
    /// nodes from being stopped; the first error encountered is returned after
    /// all nodes have been attempted.
    pub async fn stop(&self) -> DoraResult<()> {
        if let Some(dataflow) = &self.dataflow {
            dataflow.stop().await?;
        } else {
            let agents = self.agents.read().await;
            let mut first_failure = None;
            for (id, node) in agents.iter() {
                if let Err(err) = node.stop().await {
                    tracing::warn!("Failed to stop agent {}", id);
                    // Keep going so one broken node cannot leave the others running.
                    if first_failure.is_none() {
                        first_failure = Some(err);
                    }
                }
            }
            first_failure.map_or(Ok(()), Err)?;
        }
        info!("MoFARuntime stopped");
        Ok(())
    }

    /// Pauses the dataflow; does nothing when no dataflow is configured.
    pub async fn pause(&self) -> DoraResult<()> {
        if let Some(dataflow) = &self.dataflow {
            dataflow.pause().await?;
        }
        Ok(())
    }

    /// Resumes the dataflow; does nothing when no dataflow is configured.
    pub async fn resume(&self) -> DoraResult<()> {
        if let Some(dataflow) = &self.dataflow {
            dataflow.resume().await?;
        }
        Ok(())
    }
}