#[cfg(feature = "monitoring")]
pub use mofa_monitoring::*;
pub mod agent;
pub mod builder;
pub mod config;
pub mod interrupt;
pub mod runner;
#[cfg(feature = "dora")]
pub mod dora_adapter;
pub use interrupt::*;
pub use mofa_kernel::agent::MoFAAgent;
pub use mofa_kernel::agent::AgentMetadata;
pub use mofa_kernel::core::AgentConfig;
pub use mofa_kernel::message::{AgentEvent, AgentMessage};
pub use mofa_kernel::plugin::AgentPlugin;
use mofa_plugins::AgentPlugin as PluginAgent;
use std::collections::HashMap;
use std::time::Duration;
#[cfg(feature = "dora")]
use crate::dora_adapter::{
ChannelConfig, DataflowConfig, DoraAgentNode, DoraChannel, DoraDataflow, DoraError,
DoraNodeConfig, DoraResult, MessageEnvelope,
};
#[cfg(feature = "dora")]
use std::sync::Arc;
#[cfg(feature = "dora")]
use tokio::sync::RwLock;
#[cfg(feature = "dora")]
use tracing::{debug, info};
use mofa_kernel::message::StreamType;
/// Fluent builder that collects an agent's identity, capability tags,
/// plugins and runtime settings before a runtime is created from them.
pub struct AgentBuilder {
/// Identifier copied into the generated config, metadata and node config.
agent_id: String,
/// Agent name copied into the generated config, metadata and node config.
name: String,
/// Capability tags; `build_metadata` passes them to `AgentCapabilities::builder().tags(..)`.
capabilities: Vec<String>,
/// Dependency names recorded by `with_dependency`.
dependencies: Vec<String>,
/// Plugins handed to the runtime produced by `with_agent`.
plugins: Vec<Box<dyn PluginAgent>>,
/// Free-form key/value settings written by `with_config`.
node_config: HashMap<String, String>,
/// Input names; initialised with `"task_input"`.
inputs: Vec<String>,
/// Output names; initialised with `"task_output"`.
outputs: Vec<String>,
/// Set by `with_max_concurrent_tasks` (default 10); the Dora node's event
/// buffer size is derived from it.
max_concurrent_tasks: usize,
/// Set by `with_timeout` (default 30 seconds).
default_timeout: Duration,
}
pub use crate::runner::run_agents;
impl AgentBuilder {
    /// Creates a builder for the agent `agent_id` named `name`.
    ///
    /// The builder starts with one input (`"task_input"`), one output
    /// (`"task_output"`), `max_concurrent_tasks` of 10 and a default timeout
    /// of 30 seconds.
    pub fn new(agent_id: &str, name: &str) -> Self {
        Self {
            agent_id: agent_id.to_string(),
            name: name.to_string(),
            capabilities: Vec::new(),
            dependencies: Vec::new(),
            plugins: Vec::new(),
            node_config: HashMap::new(),
            inputs: vec!["task_input".to_string()],
            outputs: vec!["task_output".to_string()],
            max_concurrent_tasks: 10,
            default_timeout: Duration::from_secs(30),
        }
    }

    /// Appends a single capability tag.
    pub fn with_capability(mut self, capability: &str) -> Self {
        self.capabilities.push(capability.to_string());
        self
    }

    /// Appends every tag in `capabilities`, preserving their order.
    pub fn with_capabilities(mut self, capabilities: Vec<&str>) -> Self {
        // `extend` reserves once instead of growing on each push.
        self.capabilities
            .extend(capabilities.into_iter().map(String::from));
        self
    }

    /// Records a dependency name.
    pub fn with_dependency(mut self, dependency: &str) -> Self {
        self.dependencies.push(dependency.to_string());
        self
    }

    /// Adds a plugin that will be owned by the runtime built from this builder.
    pub fn with_plugin(mut self, plugin: Box<dyn AgentPlugin>) -> Self {
        self.plugins.push(plugin);
        self
    }

    /// Appends an input name (the default `"task_input"` is kept).
    pub fn with_input(mut self, input: &str) -> Self {
        self.inputs.push(input.to_string());
        self
    }

    /// Appends an output name (the default `"task_output"` is kept).
    pub fn with_output(mut self, output: &str) -> Self {
        self.outputs.push(output.to_string());
        self
    }

    /// Overrides the `max_concurrent_tasks` setting.
    pub fn with_max_concurrent_tasks(mut self, max: usize) -> Self {
        self.max_concurrent_tasks = max;
        self
    }

    /// Overrides the default timeout.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.default_timeout = timeout;
        self
    }

    /// Sets (or replaces) one key/value pair of the node configuration.
    pub fn with_config(mut self, key: &str, value: &str) -> Self {
        self.node_config.insert(key.to_string(), value.to_string());
        self
    }

    /// Builds an [`AgentConfig`] from the id, name and key/value settings.
    pub fn build_config(&self) -> AgentConfig {
        AgentConfig {
            agent_id: self.agent_id.clone(),
            name: self.name.clone(),
            node_config: self.node_config.clone(),
        }
    }

    /// Builds [`AgentMetadata`] whose capabilities carry the collected tags.
    ///
    /// `description` and `version` are left as `None` and the state is
    /// `AgentState::Created`.
    pub fn build_metadata(&self) -> AgentMetadata {
        use mofa_kernel::agent::AgentCapabilities;
        use mofa_kernel::agent::AgentState;

        let agent_capabilities = AgentCapabilities::builder()
            .tags(self.capabilities.clone())
            .build();
        AgentMetadata {
            id: self.agent_id.clone(),
            name: self.name.clone(),
            description: None,
            version: None,
            capabilities: agent_capabilities,
            state: AgentState::Created,
        }
    }

    /// Builds the Dora node configuration for this agent.
    ///
    /// The event buffer holds ten slots per concurrent task; the product
    /// saturates at `usize::MAX` instead of overflowing.
    #[cfg(feature = "dora")]
    pub fn build_node_config(&self) -> DoraNodeConfig {
        DoraNodeConfig {
            node_id: self.agent_id.clone(),
            name: self.name.clone(),
            inputs: self.inputs.clone(),
            outputs: self.outputs.clone(),
            event_buffer_size: self.max_concurrent_tasks.saturating_mul(10),
            default_timeout: self.default_timeout,
            custom_config: self.node_config.clone(),
        }
    }

    /// Consumes the builder and wraps `agent` in an [`AgentRuntime`] backed by
    /// a freshly created `DoraAgentNode`. The node is not initialised here.
    #[cfg(feature = "dora")]
    pub async fn with_agent<A: MoFAAgent>(self, agent: A) -> DoraResult<AgentRuntime<A>> {
        let node_config = self.build_node_config();
        let metadata = self.build_metadata();
        let config = self.build_config();
        let node = DoraAgentNode::new(node_config);
        // The runtime shares the node's interrupt handle.
        let interrupt = node.interrupt().clone();
        Ok(AgentRuntime {
            agent,
            node: Arc::new(node),
            metadata,
            config,
            interrupt,
            plugins: self.plugins,
        })
    }

    /// Builds the runtime via [`Self::with_agent`] and starts it.
    ///
    /// # Errors
    /// Propagates any error from building or from `AgentRuntime::start`.
    #[cfg(feature = "dora")]
    pub async fn build_and_start<A: MoFAAgent>(self, agent: A) -> DoraResult<AgentRuntime<A>> {
        let runtime: AgentRuntime<A> = self.with_agent(agent).await?;
        runtime.start().await?;
        Ok(runtime)
    }

    /// Consumes the builder and wraps `agent` in a [`SimpleAgentRuntime`]
    /// with a new interrupt handle and a bounded event channel (capacity 100).
    #[cfg(not(feature = "dora"))]
    pub async fn with_agent<A: MoFAAgent>(self, agent: A) -> anyhow::Result<SimpleAgentRuntime<A>> {
        let metadata = self.build_metadata();
        let config = self.build_config();
        let interrupt = AgentInterrupt::new();
        let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
        Ok(SimpleAgentRuntime {
            agent,
            metadata,
            config,
            interrupt,
            plugins: self.plugins,
            inputs: self.inputs,
            outputs: self.outputs,
            max_concurrent_tasks: self.max_concurrent_tasks,
            default_timeout: self.default_timeout,
            event_tx,
            event_rx: Some(event_rx),
        })
    }

    /// Builds the runtime via [`Self::with_agent`] and starts it.
    ///
    /// # Errors
    /// Propagates any error from building or from `SimpleAgentRuntime::start`.
    #[cfg(not(feature = "dora"))]
    pub async fn build_and_start<A: MoFAAgent>(
        self,
        agent: A,
    ) -> anyhow::Result<SimpleAgentRuntime<A>> {
        let mut runtime = self.with_agent(agent).await?;
        runtime.start().await?;
        Ok(runtime)
    }
}
/// Runtime that pairs a user agent with a Dora node (only with the `dora` feature).
#[cfg(feature = "dora")]
pub struct AgentRuntime<A: MoFAAgent> {
/// The wrapped agent implementation.
agent: A,
/// Shared Dora node used for events and message output.
node: Arc<DoraAgentNode>,
/// Metadata produced by `AgentBuilder::build_metadata`.
metadata: AgentMetadata,
/// Configuration produced by `AgentBuilder::build_config`.
config: AgentConfig,
/// Interrupt handle cloned from the node when the runtime is built.
interrupt: AgentInterrupt,
/// Plugins initialised by `init_plugins`.
plugins: Vec<Box<dyn AgentPlugin>>,
}
#[cfg(feature = "dora")]
impl<A: MoFAAgent> AgentRuntime<A> {
    /// Returns the wrapped agent.
    pub fn agent(&self) -> &A {
        &self.agent
    }

    /// Returns the wrapped agent mutably.
    pub fn agent_mut(&mut self) -> &mut A {
        &mut self.agent
    }

    /// Returns the shared Dora node.
    pub fn node(&self) -> &Arc<DoraAgentNode> {
        &self.node
    }

    /// Returns the agent metadata.
    pub fn metadata(&self) -> &AgentMetadata {
        &self.metadata
    }

    /// Returns the agent configuration.
    pub fn config(&self) -> &AgentConfig {
        &self.config
    }

    /// Returns the interrupt handle.
    pub fn interrupt(&self) -> &AgentInterrupt {
        &self.interrupt
    }

    /// Initialises every plugin in registration order.
    ///
    /// # Errors
    /// Stops at the first failing plugin and reports it as
    /// `DoraError::OperatorError`.
    pub async fn init_plugins(&mut self) -> DoraResult<()> {
        for plugin in &mut self.plugins {
            plugin
                .init_plugin()
                .await
                .map_err(|e| DoraError::OperatorError(e.to_string()))?;
        }
        Ok(())
    }

    /// Initialises the underlying node.
    pub async fn start(&self) -> DoraResult<()> {
        self.node.init().await?;
        info!("AgentRuntime {} started", self.metadata.id);
        Ok(())
    }

    /// Initialises the agent and plugins, then processes events until a
    /// `Shutdown` event arrives, finally shutting the agent down.
    ///
    /// `TaskReceived` and `Custom` events are forwarded as text input; any
    /// other event is forwarded as its `Debug` rendering.
    ///
    /// # Errors
    /// Agent failures (initialize / execute / shutdown) are returned as
    /// `DoraError::Internal`; an `execute` error ends the loop immediately.
    pub async fn run_event_loop(&mut self) -> DoraResult<()> {
        let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
        self.agent
            .initialize(&context)
            .await
            .map_err(|e| DoraError::Internal(e.to_string()))?;
        self.init_plugins().await?;

        let event_loop = self.node.create_event_loop();
        loop {
            // An interrupt is only logged and cleared; it does not end the loop.
            if event_loop.should_interrupt() {
                debug!("Interrupt signal received for {}", self.metadata.id);
                self.interrupt.reset();
            }
            match event_loop.next_event().await {
                Some(AgentEvent::Shutdown) => {
                    info!("Received shutdown event");
                    break;
                }
                Some(event) => {
                    if self.interrupt.check() {
                        debug!("Interrupt signal received for {}", self.metadata.id);
                        self.interrupt.reset();
                    }
                    use mofa_kernel::agent::types::AgentInput;
                    // Match by value: the binding arm keeps the whole event
                    // for the fallback, so no per-event clone is needed.
                    let input = match event {
                        AgentEvent::TaskReceived(task) => AgentInput::text(task.content),
                        AgentEvent::Custom(data, _) => AgentInput::text(data),
                        other => AgentInput::text(format!("{:?}", other)),
                    };
                    self.agent
                        .execute(input, &context)
                        .await
                        .map_err(|e| DoraError::Internal(e.to_string()))?;
                }
                None => {
                    // No event available: back off briefly before polling again.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
            }
        }

        self.agent
            .shutdown()
            .await
            .map_err(|e| DoraError::Internal(e.to_string()))?;
        Ok(())
    }

    /// Triggers the interrupt handle and stops the node.
    pub async fn stop(&self) -> DoraResult<()> {
        self.interrupt.trigger();
        self.node.stop().await?;
        info!("AgentRuntime {} stopped", self.metadata.id);
        Ok(())
    }

    /// Sends `message` on the node output named `output_id`.
    pub async fn send_output(&self, output_id: &str, message: &AgentMessage) -> DoraResult<()> {
        self.node.send_message(output_id, message).await
    }

    /// Injects `event` into the node.
    pub async fn inject_event(&self, event: AgentEvent) -> DoraResult<()> {
        self.node.inject_event(event).await
    }
}
/// In-process agent runtime used when the `dora` feature is disabled.
#[cfg(not(feature = "dora"))]
pub struct SimpleAgentRuntime<A: MoFAAgent> {
/// The wrapped agent implementation.
agent: A,
/// Metadata produced by `AgentBuilder::build_metadata`.
metadata: AgentMetadata,
/// Configuration produced by `AgentBuilder::build_config`.
config: AgentConfig,
/// Interrupt handle created together with the runtime.
interrupt: AgentInterrupt,
/// Plugins initialised by `init_plugins`.
plugins: Vec<Box<dyn AgentPlugin>>,
/// Input names copied from the builder.
inputs: Vec<String>,
/// Output names copied from the builder.
outputs: Vec<String>,
/// `max_concurrent_tasks` value copied from the builder.
max_concurrent_tasks: usize,
/// Default timeout copied from the builder.
default_timeout: Duration,
/// Sending half of the runtime's own event channel (used by `inject_event`).
event_tx: tokio::sync::mpsc::Sender<AgentEvent>,
/// Receiving half of the event channel; `run` takes it, leaving `None`.
event_rx: Option<tokio::sync::mpsc::Receiver<AgentEvent>>,
}
#[cfg(not(feature = "dora"))]
impl<A: MoFAAgent> SimpleAgentRuntime<A> {
pub async fn inject_event(&self, event: AgentEvent) {
let _ = self.event_tx.send(event).await;
}
}
#[cfg(not(feature = "dora"))]
impl<A: MoFAAgent> SimpleAgentRuntime<A> {
    /// Returns the wrapped agent.
    pub fn agent(&self) -> &A {
        &self.agent
    }

    /// Returns the wrapped agent mutably.
    pub fn agent_mut(&mut self) -> &mut A {
        &mut self.agent
    }

    /// Returns the agent metadata.
    pub fn metadata(&self) -> &AgentMetadata {
        &self.metadata
    }

    /// Returns the agent configuration.
    pub fn config(&self) -> &AgentConfig {
        &self.config
    }

    /// Returns the interrupt handle.
    pub fn interrupt(&self) -> &AgentInterrupt {
        &self.interrupt
    }

    /// Returns the configured input names.
    pub fn inputs(&self) -> &[String] {
        &self.inputs
    }

    /// Returns the configured output names.
    pub fn outputs(&self) -> &[String] {
        &self.outputs
    }

    /// Returns the configured `max_concurrent_tasks` value.
    pub fn max_concurrent_tasks(&self) -> usize {
        self.max_concurrent_tasks
    }

    /// Returns the configured default timeout.
    pub fn default_timeout(&self) -> Duration {
        self.default_timeout
    }

    /// Initialises every plugin in registration order, stopping at the first error.
    pub async fn init_plugins(&mut self) -> anyhow::Result<()> {
        for plugin in &mut self.plugins {
            plugin.init_plugin().await?;
        }
        Ok(())
    }

    /// Initialises the agent with a fresh context, then the plugins.
    pub async fn start(&mut self) -> anyhow::Result<()> {
        let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
        self.agent.initialize(&context).await?;
        self.init_plugins().await?;
        tracing::info!("SimpleAgentRuntime {} started", self.metadata.id);
        Ok(())
    }

    /// Converts `event` into agent input and executes the agent once.
    ///
    /// `TaskReceived` and `Custom` events are forwarded as text, `Shutdown`
    /// is logged and ignored, and any other event is forwarded as its `Debug`
    /// rendering. A new `AgentContext` is created for every call.
    ///
    /// # Errors
    /// Propagates the error returned by the agent's `execute`.
    pub async fn handle_event(&mut self, event: AgentEvent) -> anyhow::Result<()> {
        // A pending interrupt is only logged and cleared here.
        if self.interrupt.check() {
            tracing::debug!("Interrupt signal received for {}", self.metadata.id);
            self.interrupt.reset();
        }

        use mofa_kernel::agent::types::AgentInput;
        let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
        let input = match event {
            AgentEvent::TaskReceived(task) => AgentInput::text(task.content),
            AgentEvent::Shutdown => {
                tracing::info!("Shutdown event received for {}", self.metadata.id);
                return Ok(());
            }
            AgentEvent::Custom(data, _) => AgentInput::text(data),
            other => AgentInput::text(format!("{:?}", other)),
        };
        let _output = self.agent.execute(input, &context).await?;
        Ok(())
    }

    /// Runs the event loop on the runtime's own receiver.
    ///
    /// # Errors
    /// Fails with "Event receiver already taken" when called a second time,
    /// and otherwise propagates errors from [`Self::run_with_receiver`].
    pub async fn run(&mut self) -> anyhow::Result<()> {
        let event_rx = self
            .event_rx
            .take()
            .ok_or_else(|| anyhow::anyhow!("Event receiver already taken"))?;
        self.run_with_receiver(event_rx).await
    }

    /// Processes events from `event_rx` until a `Shutdown` event arrives or
    /// the channel closes, then shuts the agent down.
    ///
    /// The receiver is polled with a 100 ms timeout so that the interrupt
    /// flag is re-checked regularly even when no events arrive.
    ///
    /// # Errors
    /// Propagates the first error from event handling or agent shutdown.
    pub async fn run_with_receiver(
        &mut self,
        mut event_rx: tokio::sync::mpsc::Receiver<AgentEvent>,
    ) -> anyhow::Result<()> {
        loop {
            // An interrupt is only logged and cleared; it does not end the loop.
            if self.interrupt.check() {
                tracing::debug!("Interrupt signal received for {}", self.metadata.id);
                self.interrupt.reset();
            }
            match tokio::time::timeout(Duration::from_millis(100), event_rx.recv()).await {
                Ok(Some(AgentEvent::Shutdown)) => {
                    tracing::info!("Received shutdown event");
                    break;
                }
                Ok(Some(event)) => {
                    self.handle_event(event).await?;
                }
                // Channel closed: nothing more can arrive.
                Ok(None) => {
                    break;
                }
                // Timeout elapsed: loop around to re-check the interrupt.
                Err(_) => {
                    continue;
                }
            }
        }
        self.agent.shutdown().await?;
        Ok(())
    }

    /// Triggers the interrupt handle and shuts the agent down.
    pub async fn stop(&mut self) -> anyhow::Result<()> {
        self.interrupt.trigger();
        self.agent.shutdown().await?;
        tracing::info!("SimpleAgentRuntime {} stopped", self.metadata.id);
        Ok(())
    }

    /// Triggers the interrupt handle without stopping the agent.
    pub fn trigger_interrupt(&self) {
        self.interrupt.trigger();
    }
}
/// In-process multi-agent runtime used when the `dora` feature is disabled.
#[cfg(not(feature = "dora"))]
pub struct SimpleRuntime {
/// Registered agents keyed by agent id.
agents: std::sync::Arc<tokio::sync::RwLock<HashMap<String, SimpleAgentInfo>>>,
/// Role name recorded for each agent id at registration.
agent_roles: std::sync::Arc<tokio::sync::RwLock<HashMap<String, String>>>,
/// Bus used for every send / broadcast / publish / stream operation.
message_bus: std::sync::Arc<SimpleMessageBus>,
}
/// Record stored by `SimpleRuntime` for each registered agent.
#[cfg(not(feature = "dora"))]
pub struct SimpleAgentInfo {
/// Metadata supplied at registration.
pub metadata: AgentMetadata,
/// Configuration supplied at registration.
pub config: AgentConfig,
/// Sender for the agent's event channel; a clone of it is registered with the message bus.
pub event_tx: tokio::sync::mpsc::Sender<AgentEvent>,
}
/// Bookkeeping kept by `SimpleMessageBus` for one stream.
#[cfg(not(feature = "dora"))]
#[derive(Debug, Clone)]
pub struct StreamInfo {
/// Identifier the stream was created with.
pub stream_id: String,
/// Stream type supplied at creation.
pub stream_type: StreamType,
/// Metadata supplied at creation.
pub metadata: HashMap<String, String>,
/// Agent ids subscribed to the stream (no duplicates are added).
pub subscribers: Vec<String>,
/// Sequence number the next stream message will carry; incremented per message sent.
pub sequence: u64,
/// While `true`, `send_stream_message` returns without delivering anything.
pub is_paused: bool,
}
/// In-process message bus that routes `AgentEvent`s over tokio mpsc channels.
#[cfg(not(feature = "dora"))]
pub struct SimpleMessageBus {
/// Event senders registered per agent id.
subscribers: tokio::sync::RwLock<HashMap<String, Vec<tokio::sync::mpsc::Sender<AgentEvent>>>>,
/// Agent ids subscribed to each topic.
topic_subscribers: tokio::sync::RwLock<HashMap<String, Vec<String>>>,
/// State of every open stream, keyed by stream id.
streams: tokio::sync::RwLock<HashMap<String, StreamInfo>>,
}
#[cfg(not(feature = "dora"))]
impl SimpleMessageBus {
    /// Creates an empty bus with no agents, topics or streams.
    pub fn new() -> Self {
        Self {
            subscribers: tokio::sync::RwLock::new(HashMap::new()),
            topic_subscribers: tokio::sync::RwLock::new(HashMap::new()),
            streams: tokio::sync::RwLock::new(HashMap::new()),
        }
    }

    /// Registers `tx` as an additional event sender for `agent_id`.
    pub async fn register(&self, agent_id: &str, tx: tokio::sync::mpsc::Sender<AgentEvent>) {
        let mut subs = self.subscribers.write().await;
        subs.entry(agent_id.to_string()).or_default().push(tx);
    }

    /// Subscribes `agent_id` to `topic`.
    ///
    /// Repeated subscriptions are recorded repeatedly, so `publish` delivers
    /// once per recorded subscription.
    pub async fn subscribe(&self, agent_id: &str, topic: &str) {
        let mut topics = self.topic_subscribers.write().await;
        topics
            .entry(topic.to_string())
            .or_default()
            .push(agent_id.to_string());
    }

    /// Sends `event` to every sender registered for `target_id`.
    ///
    /// Unknown targets and failed sends are ignored; this always returns `Ok`.
    pub async fn send_to(&self, target_id: &str, event: AgentEvent) -> anyhow::Result<()> {
        let subs = self.subscribers.read().await;
        if let Some(senders) = subs.get(target_id) {
            for tx in senders {
                let _ = tx.send(event.clone()).await;
            }
        }
        Ok(())
    }

    /// Sends `event` to every registered sender of every agent.
    ///
    /// Failed sends are ignored; this always returns `Ok`.
    pub async fn broadcast(&self, event: AgentEvent) -> anyhow::Result<()> {
        let subs = self.subscribers.read().await;
        for senders in subs.values() {
            for tx in senders {
                let _ = tx.send(event.clone()).await;
            }
        }
        Ok(())
    }

    /// Sends `event` to every agent subscribed to `topic`.
    ///
    /// Unknown topics and failed sends are ignored; this always returns `Ok`.
    pub async fn publish(&self, topic: &str, event: AgentEvent) -> anyhow::Result<()> {
        let topics = self.topic_subscribers.read().await;
        if let Some(agent_ids) = topics.get(topic) {
            self.deliver_to(agent_ids, &event).await;
        }
        Ok(())
    }

    /// Creates a stream and broadcasts `AgentEvent::StreamCreated`.
    ///
    /// # Errors
    /// Fails when a stream with the same id already exists.
    pub async fn create_stream(
        &self,
        stream_id: &str,
        stream_type: StreamType,
        metadata: HashMap<String, String>,
    ) -> anyhow::Result<()> {
        let mut streams = self.streams.write().await;
        if streams.contains_key(stream_id) {
            return Err(anyhow::anyhow!("Stream {} already exists", stream_id));
        }
        // The stream record is moved into the map; only the two fields that
        // the event also needs are cloned.
        streams.insert(
            stream_id.to_string(),
            StreamInfo {
                stream_id: stream_id.to_string(),
                stream_type: stream_type.clone(),
                metadata: metadata.clone(),
                subscribers: Vec::new(),
                sequence: 0,
                is_paused: false,
            },
        );
        self.broadcast(AgentEvent::StreamCreated {
            stream_id: stream_id.to_string(),
            stream_type,
            metadata,
        })
        .await
    }

    /// Removes the stream and notifies its subscribers with
    /// `AgentEvent::StreamClosed`. Closing an unknown stream is a no-op.
    pub async fn close_stream(&self, stream_id: &str, reason: &str) -> anyhow::Result<()> {
        let mut streams = self.streams.write().await;
        if let Some(stream_info) = streams.remove(stream_id) {
            let event = AgentEvent::StreamClosed {
                stream_id: stream_id.to_string(),
                reason: reason.to_string(),
            };
            self.deliver_to(&stream_info.subscribers, &event).await;
        }
        Ok(())
    }

    /// Adds `agent_id` to the stream's subscribers and broadcasts
    /// `AgentEvent::StreamSubscription`.
    ///
    /// Nothing happens when the stream is unknown or the agent is already
    /// subscribed.
    pub async fn subscribe_stream(&self, agent_id: &str, stream_id: &str) -> anyhow::Result<()> {
        let mut streams = self.streams.write().await;
        if let Some(stream_info) = streams.get_mut(stream_id) {
            // Compare by reference; no temporary `String` is needed.
            let already_subscribed = stream_info.subscribers.iter().any(|id| id == agent_id);
            if !already_subscribed {
                stream_info.subscribers.push(agent_id.to_string());
                self.broadcast(AgentEvent::StreamSubscription {
                    stream_id: stream_id.to_string(),
                    subscriber_id: agent_id.to_string(),
                })
                .await?;
            }
        }
        Ok(())
    }

    /// Removes `agent_id` from the stream's subscribers and broadcasts
    /// `AgentEvent::StreamUnsubscription`.
    ///
    /// Nothing happens when the stream is unknown or the agent is not
    /// subscribed.
    pub async fn unsubscribe_stream(&self, agent_id: &str, stream_id: &str) -> anyhow::Result<()> {
        let mut streams = self.streams.write().await;
        if let Some(stream_info) = streams.get_mut(stream_id) {
            if let Some(pos) = stream_info.subscribers.iter().position(|id| id == agent_id) {
                stream_info.subscribers.remove(pos);
                self.broadcast(AgentEvent::StreamUnsubscription {
                    stream_id: stream_id.to_string(),
                    subscriber_id: agent_id.to_string(),
                })
                .await?;
            }
        }
        Ok(())
    }

    /// Delivers `message` to the stream's subscribers as
    /// `AgentEvent::StreamMessage`, stamped with the next sequence number.
    ///
    /// Nothing is sent (and the sequence does not advance) when the stream is
    /// unknown or paused. The stream lock is held for the whole delivery.
    pub async fn send_stream_message(
        &self,
        stream_id: &str,
        message: Vec<u8>,
    ) -> anyhow::Result<()> {
        let mut streams = self.streams.write().await;
        if let Some(stream_info) = streams.get_mut(stream_id) {
            if stream_info.is_paused {
                return Ok(());
            }
            let sequence = stream_info.sequence;
            stream_info.sequence += 1;
            let event = AgentEvent::StreamMessage {
                stream_id: stream_id.to_string(),
                message,
                sequence,
            };
            self.deliver_to(&stream_info.subscribers, &event).await;
        }
        Ok(())
    }

    /// Marks the stream as paused. Unknown streams are ignored.
    pub async fn pause_stream(&self, stream_id: &str) -> anyhow::Result<()> {
        let mut streams = self.streams.write().await;
        if let Some(stream_info) = streams.get_mut(stream_id) {
            stream_info.is_paused = true;
        }
        Ok(())
    }

    /// Clears the stream's paused flag. Unknown streams are ignored.
    pub async fn resume_stream(&self, stream_id: &str) -> anyhow::Result<()> {
        let mut streams = self.streams.write().await;
        if let Some(stream_info) = streams.get_mut(stream_id) {
            stream_info.is_paused = false;
        }
        Ok(())
    }

    /// Returns a snapshot of the stream's state, or `None` if it does not exist.
    pub async fn get_stream_info(&self, stream_id: &str) -> anyhow::Result<Option<StreamInfo>> {
        let streams = self.streams.read().await;
        Ok(streams.get(stream_id).cloned())
    }

    /// Sends a clone of `event` to every sender registered for each id in
    /// `agent_ids`, ignoring unknown ids and failed sends.
    async fn deliver_to(&self, agent_ids: &[String], event: &AgentEvent) {
        let subs = self.subscribers.read().await;
        for agent_id in agent_ids {
            if let Some(senders) = subs.get(agent_id) {
                for tx in senders {
                    let _ = tx.send(event.clone()).await;
                }
            }
        }
    }
}
#[cfg(not(feature = "dora"))]
impl Default for SimpleMessageBus {
fn default() -> Self {
Self::new()
}
}
#[cfg(not(feature = "dora"))]
impl SimpleRuntime {
pub fn new() -> Self {
Self {
agents: std::sync::Arc::new(tokio::sync::RwLock::new(HashMap::new())),
agent_roles: std::sync::Arc::new(tokio::sync::RwLock::new(HashMap::new())),
message_bus: std::sync::Arc::new(SimpleMessageBus::new()),
}
}
pub async fn register_agent(
&self,
metadata: AgentMetadata,
config: AgentConfig,
role: &str,
) -> anyhow::Result<tokio::sync::mpsc::Receiver<AgentEvent>> {
let agent_id = metadata.id.clone();
let (tx, rx) = tokio::sync::mpsc::channel(100);
self.message_bus.register(&agent_id, tx.clone()).await;
let mut agents = self.agents.write().await;
agents.insert(
agent_id.clone(),
SimpleAgentInfo {
metadata,
config,
event_tx: tx,
},
);
let mut roles = self.agent_roles.write().await;
roles.insert(agent_id.clone(), role.to_string());
tracing::info!("Agent {} registered with role {}", agent_id, role);
Ok(rx)
}
pub fn message_bus(&self) -> &std::sync::Arc<SimpleMessageBus> {
&self.message_bus
}
pub async fn get_agents_by_role(&self, role: &str) -> Vec<String> {
let roles = self.agent_roles.read().await;
roles
.iter()
.filter(|(_, r)| *r == role)
.map(|(id, _)| id.clone())
.collect()
}
pub async fn send_to_agent(&self, target_id: &str, event: AgentEvent) -> anyhow::Result<()> {
self.message_bus.send_to(target_id, event).await
}
pub async fn broadcast(&self, event: AgentEvent) -> anyhow::Result<()> {
self.message_bus.broadcast(event).await
}
pub async fn publish_to_topic(&self, topic: &str, event: AgentEvent) -> anyhow::Result<()> {
self.message_bus.publish(topic, event).await
}
pub async fn subscribe_topic(&self, agent_id: &str, topic: &str) -> anyhow::Result<()> {
self.message_bus.subscribe(agent_id, topic).await;
Ok(())
}
pub async fn create_stream(
&self,
stream_id: &str,
stream_type: StreamType,
metadata: std::collections::HashMap<String, String>,
) -> anyhow::Result<()> {
self.message_bus
.create_stream(stream_id, stream_type, metadata)
.await
}
pub async fn close_stream(&self, stream_id: &str, reason: &str) -> anyhow::Result<()> {
self.message_bus.close_stream(stream_id, reason).await
}
pub async fn subscribe_stream(&self, agent_id: &str, stream_id: &str) -> anyhow::Result<()> {
self.message_bus.subscribe_stream(agent_id, stream_id).await
}
pub async fn unsubscribe_stream(&self, agent_id: &str, stream_id: &str) -> anyhow::Result<()> {
self.message_bus
.unsubscribe_stream(agent_id, stream_id)
.await
}
pub async fn send_stream_message(
&self,
stream_id: &str,
message: Vec<u8>,
) -> anyhow::Result<()> {
self.message_bus
.send_stream_message(stream_id, message)
.await
}
pub async fn pause_stream(&self, stream_id: &str) -> anyhow::Result<()> {
self.message_bus.pause_stream(stream_id).await
}
pub async fn resume_stream(&self, stream_id: &str) -> anyhow::Result<()> {
self.message_bus.resume_stream(stream_id).await
}
pub async fn get_stream_info(&self, stream_id: &str) -> anyhow::Result<Option<StreamInfo>> {
self.message_bus.get_stream_info(stream_id).await
}
pub async fn stop_all(&self) -> anyhow::Result<()> {
self.message_bus.broadcast(AgentEvent::Shutdown).await?;
tracing::info!("SimpleRuntime stopped");
Ok(())
}
}
#[cfg(not(feature = "dora"))]
impl Default for SimpleRuntime {
fn default() -> Self {
Self::new()
}
}
/// Map from agent id to its shared Dora node, as stored by `MoFARuntime`.
#[cfg(feature = "dora")]
type AgentNodeMap = HashMap<String, Arc<DoraAgentNode>>;
/// Multi-agent runtime backed by Dora (only with the `dora` feature).
#[cfg(feature = "dora")]
pub struct MoFARuntime {
/// Optional dataflow; when present, nodes are added to it instead of `agents`.
dataflow: Option<DoraDataflow>,
/// Channel used for point-to-point, broadcast and topic messaging.
channel: Arc<DoraChannel>,
/// Nodes registered while no dataflow is configured, keyed by agent id.
agents: Arc<RwLock<AgentNodeMap>>,
/// Role name recorded for each agent id at registration.
agent_roles: Arc<RwLock<HashMap<String, String>>>,
}
#[cfg(feature = "dora")]
impl MoFARuntime {
    /// Creates a runtime without a dataflow, using the default channel configuration.
    pub async fn new() -> Self {
        Self::assemble(None)
    }

    /// Creates a runtime that manages its nodes through a dataflow built from
    /// `dataflow_config`, using the default channel configuration.
    pub async fn with_dataflow(dataflow_config: DataflowConfig) -> Self {
        Self::assemble(Some(DoraDataflow::new(dataflow_config)))
    }

    /// Shared constructor: default channel, empty agent and role tables.
    fn assemble(dataflow: Option<DoraDataflow>) -> Self {
        let channel = DoraChannel::new(ChannelConfig::default());
        Self {
            dataflow,
            channel: Arc::new(channel),
            agents: Arc::new(RwLock::new(HashMap::new())),
            agent_roles: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Registers `node` under `role`.
    ///
    /// The node's id is registered with the channel first. The node is then
    /// added to the dataflow when one is configured, otherwise stored in the
    /// runtime's own agent table.
    ///
    /// # Errors
    /// Propagates failures from channel registration or from the dataflow.
    pub async fn register_agent(&self, node: DoraAgentNode, role: &str) -> DoraResult<()> {
        let id = node.config().node_id.clone();
        self.channel.register_agent(&id).await?;

        match &self.dataflow {
            Some(flow) => {
                flow.add_node(node).await?;
            }
            None => {
                let mut table = self.agents.write().await;
                table.insert(id.clone(), Arc::new(node));
            }
        }

        let mut role_table = self.agent_roles.write().await;
        role_table.insert(id.clone(), role.to_string());
        info!("Agent {} registered with role {}", id, role);
        Ok(())
    }

    /// Connects `source_id`'s output to `target_id`'s input in the dataflow.
    ///
    /// Does nothing (and returns `Ok`) when no dataflow is configured.
    pub async fn connect_agents(
        &self,
        source_id: &str,
        source_output: &str,
        target_id: &str,
        target_input: &str,
    ) -> DoraResult<()> {
        match &self.dataflow {
            Some(flow) => {
                flow.connect(source_id, source_output, target_id, target_input)
                    .await?;
                Ok(())
            }
            None => Ok(()),
        }
    }

    /// Returns the shared channel.
    pub fn channel(&self) -> &Arc<DoraChannel> {
        &self.channel
    }

    /// Returns the ids of all agents registered with exactly `role`
    /// (in unspecified order).
    pub async fn get_agents_by_role(&self, role: &str) -> Vec<String> {
        let role_table = self.agent_roles.read().await;
        let mut matching = Vec::new();
        for (id, assigned) in role_table.iter() {
            if assigned.as_str() == role {
                matching.push(id.clone());
            }
        }
        matching
    }

    /// Sends `message` from `sender_id` to `receiver_id` point-to-point.
    pub async fn send_to_agent(
        &self,
        sender_id: &str,
        receiver_id: &str,
        message: &AgentMessage,
    ) -> DoraResult<()> {
        let base = MessageEnvelope::from_agent_message(sender_id, message)?;
        let addressed = base.to(receiver_id);
        self.channel.send_p2p(addressed).await
    }

    /// Broadcasts `message` from `sender_id` over the channel.
    pub async fn broadcast(&self, sender_id: &str, message: &AgentMessage) -> DoraResult<()> {
        let outgoing = MessageEnvelope::from_agent_message(sender_id, message)?;
        self.channel.broadcast(outgoing).await
    }

    /// Publishes `message` from `sender_id` on `topic`.
    pub async fn publish_to_topic(
        &self,
        sender_id: &str,
        topic: &str,
        message: &AgentMessage,
    ) -> DoraResult<()> {
        let base = MessageEnvelope::from_agent_message(sender_id, message)?;
        let tagged = base.with_topic(topic);
        self.channel.publish(tagged).await
    }

    /// Subscribes `agent_id` to `topic` on the channel.
    pub async fn subscribe_topic(&self, agent_id: &str, topic: &str) -> DoraResult<()> {
        self.channel.subscribe(agent_id, topic).await
    }

    /// Builds and starts the dataflow, or initialises every directly
    /// registered node when no dataflow is configured.
    pub async fn build_and_start(&self) -> DoraResult<()> {
        match &self.dataflow {
            Some(flow) => {
                flow.build().await?;
                flow.start().await?;
            }
            None => {
                let table = self.agents.read().await;
                for (id, node) in table.iter() {
                    node.init().await?;
                    debug!("Agent {} initialized", id);
                }
            }
        }
        info!("MoFARuntime started");
        Ok(())
    }

    /// Stops the dataflow, or every directly registered node when no
    /// dataflow is configured. Stops at the first node that fails.
    pub async fn stop(&self) -> DoraResult<()> {
        match &self.dataflow {
            Some(flow) => {
                flow.stop().await?;
            }
            None => {
                let table = self.agents.read().await;
                for node in table.values() {
                    node.stop().await?;
                }
            }
        }
        info!("MoFARuntime stopped");
        Ok(())
    }

    /// Pauses the dataflow; a no-op when none is configured.
    pub async fn pause(&self) -> DoraResult<()> {
        match &self.dataflow {
            Some(flow) => {
                flow.pause().await?;
                Ok(())
            }
            None => Ok(()),
        }
    }

    /// Resumes the dataflow; a no-op when none is configured.
    pub async fn resume(&self) -> DoraResult<()> {
        match &self.dataflow {
            Some(flow) => {
                flow.resume().await?;
                Ok(())
            }
            None => Ok(()),
        }
    }
}