Skip to main content

mofa_runtime/
builder.rs

1//! 高级 Agent 构建器 API
2//!
3//! 提供流式 API 来构建和运行智能体
4//!
5//! 该模块支持两种运行时模式:
6//! - 当启用 `dora` feature 时,使用 dora-rs 运行时
7//! - 当未启用 `dora` feature 时,使用内置的 SimpleRuntime
8
9#[cfg(feature = "dora")]
10use crate::dora_adapter::{
11    ChannelConfig, DataflowConfig, DoraAgentNode, DoraChannel, DoraDataflow, DoraError,
12    DoraNodeConfig, DoraResult, MessageEnvelope,
13};
14use crate::interrupt::AgentInterrupt;
15#[cfg(feature = "dora")]
16use crate::message::AgentMessage;
17use crate::{AgentConfig, AgentMetadata, MoFAAgent};
18use mofa_kernel::AgentPlugin;
19use mofa_kernel::message::AgentEvent;
20use std::collections::HashMap;
21#[cfg(feature = "dora")]
22use std::sync::Arc;
23use std::time::Duration;
24#[cfg(feature = "dora")]
25use tokio::sync::RwLock;
26#[cfg(feature = "dora")]
27use tracing::{debug, info};
28
/// Agent builder exposing a fluent (chained) configuration API.
///
/// Each `with_*` setter consumes the builder and returns it, so calls can be chained
/// before the builder is turned into a runtime via `with_agent` / `build_and_start`.
pub struct AgentBuilder {
    // Agent identifier; copied into `AgentConfig::agent_id`, `AgentMetadata::id`
    // and (dora builds) `DoraNodeConfig::node_id`.
    agent_id: String,
    // Human-readable agent name; copied into the config, metadata and node config.
    name: String,
    // Capability tags; passed to `AgentCapabilities::builder().tags(..)` in `build_metadata`.
    capabilities: Vec<String>,
    // Dependencies added through `with_dependency`.
    // NOTE(review): nothing in this file reads this field back — confirm intended use.
    dependencies: Vec<String>,
    // Plugins moved into the runtime created by `with_agent`.
    plugins: Vec<Box<dyn AgentPlugin>>,
    // Free-form key/value settings; exposed as `AgentConfig::node_config` and
    // (dora builds) `DoraNodeConfig::custom_config`.
    node_config: HashMap<String, String>,
    // Input port names; starts with the single entry "task_input".
    inputs: Vec<String>,
    // Output port names; starts with the single entry "task_output".
    outputs: Vec<String>,
    // Maximum number of concurrent tasks (initially 10); the dora node's event
    // buffer size is derived from it in `build_node_config`.
    max_concurrent_tasks: usize,
    // Default timeout (initially 30 seconds).
    default_timeout: Duration,
}
42
43impl AgentBuilder {
44    /// 创建新的 AgentBuilder
45    pub fn new(agent_id: &str, name: &str) -> Self {
46        Self {
47            agent_id: agent_id.to_string(),
48            name: name.to_string(),
49            capabilities: Vec::new(),
50            dependencies: Vec::new(),
51            plugins: Vec::new(),
52            node_config: HashMap::new(),
53            inputs: vec!["task_input".to_string()],
54            outputs: vec!["task_output".to_string()],
55            max_concurrent_tasks: 10,
56            default_timeout: Duration::from_secs(30),
57        }
58    }
59
60    /// 添加能力
61    pub fn with_capability(mut self, capability: &str) -> Self {
62        self.capabilities.push(capability.to_string());
63        self
64    }
65
66    /// 添加多个能力
67    pub fn with_capabilities(mut self, capabilities: Vec<&str>) -> Self {
68        for cap in capabilities {
69            self.capabilities.push(cap.to_string());
70        }
71        self
72    }
73
74    /// 添加依赖
75    pub fn with_dependency(mut self, dependency: &str) -> Self {
76        self.dependencies.push(dependency.to_string());
77        self
78    }
79
80    /// 添加插件
81    pub fn with_plugin(mut self, plugin: Box<dyn AgentPlugin>) -> Self {
82        self.plugins.push(plugin);
83        self
84    }
85
86    /// 添加输入端口
87    pub fn with_input(mut self, input: &str) -> Self {
88        self.inputs.push(input.to_string());
89        self
90    }
91
92    /// 添加输出端口
93    pub fn with_output(mut self, output: &str) -> Self {
94        self.outputs.push(output.to_string());
95        self
96    }
97
98    /// 设置最大并发任务数
99    pub fn with_max_concurrent_tasks(mut self, max: usize) -> Self {
100        self.max_concurrent_tasks = max;
101        self
102    }
103
104    /// 设置默认超时
105    pub fn with_timeout(mut self, timeout: Duration) -> Self {
106        self.default_timeout = timeout;
107        self
108    }
109
110    /// 添加自定义配置
111    pub fn with_config(mut self, key: &str, value: &str) -> Self {
112        self.node_config.insert(key.to_string(), value.to_string());
113        self
114    }
115
116    /// 构建智能体配置
117    pub fn build_config(&self) -> AgentConfig {
118        AgentConfig {
119            agent_id: self.agent_id.clone(),
120            name: self.name.clone(),
121            node_config: self.node_config.clone(),
122        }
123    }
124
125    /// 构建元数据
126    pub fn build_metadata(&self) -> AgentMetadata {
127        use mofa_kernel::agent::AgentCapabilities;
128        use mofa_kernel::agent::AgentState;
129
130        // 将 Vec<String> 转换为 AgentCapabilities
131        let agent_capabilities = AgentCapabilities::builder()
132            .tags(self.capabilities.clone())
133            .build();
134
135        AgentMetadata {
136            id: self.agent_id.clone(),
137            name: self.name.clone(),
138            description: None,
139            version: None,
140            capabilities: agent_capabilities,
141            state: AgentState::Created,
142        }
143    }
144
145    /// 构建 DoraNodeConfig
146    #[cfg(feature = "dora")]
147    pub fn build_node_config(&self) -> DoraNodeConfig {
148        DoraNodeConfig {
149            node_id: self.agent_id.clone(),
150            name: self.name.clone(),
151            inputs: self.inputs.clone(),
152            outputs: self.outputs.clone(),
153            event_buffer_size: self.max_concurrent_tasks * 10,
154            default_timeout: self.default_timeout,
155            custom_config: self.node_config.clone(),
156        }
157    }
158
159    /// 使用提供的 MoFAAgent 实现构建运行时
160    #[cfg(feature = "dora")]
161    pub async fn with_agent<A: MoFAAgent>(self, agent: A) -> DoraResult<AgentRuntime<A>> {
162        let node_config = self.build_node_config();
163        let metadata = self.build_metadata();
164        let config = self.build_config();
165
166        let node = DoraAgentNode::new(node_config);
167        let interrupt = node.interrupt().clone();
168
169        Ok(AgentRuntime {
170            agent,
171            node: Arc::new(node),
172            metadata,
173            config,
174            interrupt,
175            plugins: self.plugins,
176        })
177    }
178
179    /// 构建并启动智能体(需要提供 MoFAAgent 实现)
180    #[cfg(feature = "dora")]
181    pub async fn build_and_start<A: MoFAAgent>(self, agent: A) -> DoraResult<AgentRuntime<A>> {
182        let runtime: AgentRuntime<A> = self.with_agent(agent).await?;
183        runtime.start().await?;
184        Ok(runtime)
185    }
186
187    /// 使用提供的 MoFAAgent 实现构建简单运行时(非 dora 模式)
188    #[cfg(not(feature = "dora"))]
189    pub async fn with_agent<A: MoFAAgent>(self, agent: A) -> anyhow::Result<SimpleAgentRuntime<A>> {
190        let metadata = self.build_metadata();
191        let config = self.build_config();
192        let interrupt = AgentInterrupt::new();
193
194        Ok(SimpleAgentRuntime {
195            agent,
196            metadata,
197            config,
198            interrupt,
199            plugins: self.plugins,
200            inputs: self.inputs,
201            outputs: self.outputs,
202            max_concurrent_tasks: self.max_concurrent_tasks,
203            default_timeout: self.default_timeout,
204        })
205    }
206
207    /// 构建并启动智能体(非 dora 模式)
208    #[cfg(not(feature = "dora"))]
209    pub async fn build_and_start<A: MoFAAgent>(
210        self,
211        agent: A,
212    ) -> anyhow::Result<SimpleAgentRuntime<A>> {
213        let mut runtime = self.with_agent(agent).await?;
214        runtime.start().await?;
215        Ok(runtime)
216    }
217}
218
/// Agent runtime backed by a dora node (only compiled with the `dora` feature).
#[cfg(feature = "dora")]
pub struct AgentRuntime<A: MoFAAgent> {
    // The user-provided agent implementation driven by this runtime.
    agent: A,
    // Shared handle to the dora node created from the builder's node config.
    node: Arc<DoraAgentNode>,
    // Metadata produced by `AgentBuilder::build_metadata`.
    metadata: AgentMetadata,
    // Configuration produced by `AgentBuilder::build_config`.
    config: AgentConfig,
    // Interrupt handle cloned from the node when the runtime is built.
    interrupt: AgentInterrupt,
    // Plugins moved out of the builder; initialised by `init_plugins`.
    plugins: Vec<Box<dyn AgentPlugin>>,
}
229
#[cfg(feature = "dora")]
impl<A: MoFAAgent> AgentRuntime<A> {
    /// Returns a shared reference to the wrapped agent.
    pub fn agent(&self) -> &A {
        &self.agent
    }

    /// Returns a mutable reference to the wrapped agent.
    pub fn agent_mut(&mut self) -> &mut A {
        &mut self.agent
    }

    /// Returns the underlying dora node.
    pub fn node(&self) -> &Arc<DoraAgentNode> {
        &self.node
    }

    /// Returns the agent metadata.
    pub fn metadata(&self) -> &AgentMetadata {
        &self.metadata
    }

    /// Returns the agent configuration.
    pub fn config(&self) -> &AgentConfig {
        &self.config
    }

    /// Returns the interrupt handle.
    pub fn interrupt(&self) -> &AgentInterrupt {
        &self.interrupt
    }

    /// Initialises every plugin in registration order.
    ///
    /// # Errors
    /// Stops at the first failing plugin and reports it as `DoraError::OperatorError`.
    pub async fn init_plugins(&mut self) -> DoraResult<()> {
        for plugin in &mut self.plugins {
            plugin
                .init_plugin()
                .await
                .map_err(|e| DoraError::OperatorError(e.to_string()))?;
        }
        Ok(())
    }

    /// Starts the runtime by initialising the underlying node.
    pub async fn start(&self) -> DoraResult<()> {
        self.node.init().await?;
        info!("AgentRuntime {} started", self.metadata.id);
        Ok(())
    }

    /// Runs the event loop until a `Shutdown` event is received.
    ///
    /// The agent is initialised with a fresh `AgentContext`, plugins are initialised,
    /// and every non-shutdown event is converted into a text `AgentInput` and passed to
    /// the agent's `execute`. When no event is available the loop sleeps for 10 ms.
    /// A pending interrupt is only logged and reset; it does not end the loop.
    /// After the loop the agent's `shutdown` is called.
    ///
    /// # Errors
    /// Agent initialisation, execution and shutdown failures are returned as
    /// `DoraError::Internal`; plugin failures as reported by `init_plugins`.
    /// An execution failure returns immediately, without calling `shutdown`.
    pub async fn run_event_loop(&mut self) -> DoraResult<()> {
        use mofa_kernel::agent::types::AgentInput;

        // Create the agent context and initialise the agent.
        let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
        self.agent
            .initialize(&context)
            .await
            .map_err(|e| DoraError::Internal(e.to_string()))?;

        // Initialise plugins.
        self.init_plugins().await?;

        let event_loop = self.node.create_event_loop();

        loop {
            // Check for an interrupt before waiting for the next event.
            if event_loop.should_interrupt() {
                debug!("Interrupt signal received");
                self.interrupt.reset();
            }

            // Fetch the next event.
            match event_loop.next_event().await {
                Some(AgentEvent::Shutdown) => {
                    info!("Received shutdown event");
                    break;
                }
                Some(event) => {
                    // Check for an interrupt again right before handling the event.
                    if self.interrupt.check() {
                        debug!("Interrupt signal received");
                        self.interrupt.reset();
                    }

                    // Convert the event into an agent input; events without a textual
                    // payload fall back to their `Debug` representation.
                    let input = match event {
                        AgentEvent::TaskReceived(task) => AgentInput::text(task.content),
                        AgentEvent::Custom(data, _) => AgentInput::text(data),
                        _ => AgentInput::text(format!("{:?}", event)),
                    };

                    self.agent
                        .execute(input, &context)
                        .await
                        .map_err(|e| DoraError::Internal(e.to_string()))?;
                }
                None => {
                    // No event available; back off briefly before polling again.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
            }
        }

        // Shut the agent down once the loop has ended.
        self.agent
            .shutdown()
            .await
            .map_err(|e| DoraError::Internal(e.to_string()))?;

        Ok(())
    }

    /// Stops the runtime: triggers the interrupt, then stops the underlying node.
    pub async fn stop(&self) -> DoraResult<()> {
        self.interrupt.trigger();
        self.node.stop().await?;
        info!("AgentRuntime {} stopped", self.metadata.id);
        Ok(())
    }

    /// Sends `message` on the output identified by `output_id`.
    pub async fn send_output(&self, output_id: &str, message: &AgentMessage) -> DoraResult<()> {
        self.node.send_message(output_id, message).await
    }

    /// Injects an event into the underlying node.
    pub async fn inject_event(&self, event: AgentEvent) -> DoraResult<()> {
        self.node.inject_event(event).await
    }
}
363
364// ============================================================================
365// 非 dora 运行时实现 - SimpleAgentRuntime
366// ============================================================================
367
/// Simple agent runtime: a lightweight runtime that does not depend on dora-rs
/// (only compiled when the `dora` feature is disabled).
#[cfg(not(feature = "dora"))]
pub struct SimpleAgentRuntime<A: MoFAAgent> {
    // The user-provided agent implementation driven by this runtime.
    agent: A,
    // Metadata produced by `AgentBuilder::build_metadata`.
    metadata: AgentMetadata,
    // Configuration produced by `AgentBuilder::build_config`.
    config: AgentConfig,
    // Interrupt handle created when the runtime is built.
    interrupt: AgentInterrupt,
    // Plugins moved out of the builder; initialised by `init_plugins`.
    plugins: Vec<Box<dyn AgentPlugin>>,
    // Input port names taken from the builder.
    inputs: Vec<String>,
    // Output port names taken from the builder.
    outputs: Vec<String>,
    // Maximum number of concurrent tasks taken from the builder.
    // NOTE(review): only exposed through a getter in this file; not enforced here.
    max_concurrent_tasks: usize,
    // Default timeout taken from the builder.
    // NOTE(review): only exposed through a getter in this file; not enforced here.
    default_timeout: Duration,
}
381
382#[cfg(not(feature = "dora"))]
383impl<A: MoFAAgent> SimpleAgentRuntime<A> {
384    /// 获取智能体引用
385    pub fn agent(&self) -> &A {
386        &self.agent
387    }
388
389    /// 获取可变智能体引用
390    pub fn agent_mut(&mut self) -> &mut A {
391        &mut self.agent
392    }
393
394    /// 获取元数据
395    pub fn metadata(&self) -> &AgentMetadata {
396        &self.metadata
397    }
398
399    /// 获取配置
400    pub fn config(&self) -> &AgentConfig {
401        &self.config
402    }
403
404    /// 获取中断句柄
405    pub fn interrupt(&self) -> &AgentInterrupt {
406        &self.interrupt
407    }
408
409    /// 获取输入端口列表
410    pub fn inputs(&self) -> &[String] {
411        &self.inputs
412    }
413
414    /// 获取输出端口列表
415    pub fn outputs(&self) -> &[String] {
416        &self.outputs
417    }
418
419    /// 获取最大并发任务数
420    pub fn max_concurrent_tasks(&self) -> usize {
421        self.max_concurrent_tasks
422    }
423
424    /// 获取默认超时时间
425    pub fn default_timeout(&self) -> Duration {
426        self.default_timeout
427    }
428
429    /// 初始化插件
430    pub async fn init_plugins(&mut self) -> anyhow::Result<()> {
431        for plugin in &mut self.plugins {
432            plugin.init_plugin().await?;
433        }
434        Ok(())
435    }
436
437    /// 启动运行时
438    pub async fn start(&mut self) -> anyhow::Result<()> {
439        // 创建 CoreAgentContext 并初始化智能体
440        let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
441        self.agent.initialize(&context).await?;
442        // 初始化插件
443        self.init_plugins().await?;
444        tracing::info!("SimpleAgentRuntime {} started", self.metadata.id);
445        Ok(())
446    }
447
448    /// 处理单个事件
449    pub async fn handle_event(&mut self, event: AgentEvent) -> anyhow::Result<()> {
450        // 检查中断
451        if self.interrupt.check() {
452            tracing::debug!("Interrupt signal received");
453            self.interrupt.reset();
454        }
455
456        // 将事件转换为输入并执行
457        use mofa_kernel::agent::types::AgentInput;
458
459        let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
460        let input = match event {
461            AgentEvent::TaskReceived(task) => AgentInput::text(task.content),
462            AgentEvent::Shutdown => {
463                tracing::info!("Shutdown event received");
464                return Ok(());
465            }
466            AgentEvent::Custom(data, _) => AgentInput::text(data),
467            _ => AgentInput::text(format!("{:?}", event)),
468        };
469
470        let _output = self.agent.execute(input, &context).await?;
471        Ok(())
472    }
473
474    /// 运行事件循环(使用事件通道)
475    pub async fn run_with_receiver(
476        &mut self,
477        mut event_rx: tokio::sync::mpsc::Receiver<AgentEvent>,
478    ) -> anyhow::Result<()> {
479        loop {
480            // 检查中断
481            if self.interrupt.check() {
482                // 中断处理
483                tracing::debug!("Interrupt signal received");
484                self.interrupt.reset();
485            }
486
487            // 等待事件
488            match tokio::time::timeout(Duration::from_millis(100), event_rx.recv()).await {
489                Ok(Some(AgentEvent::Shutdown)) => {
490                    tracing::info!("Received shutdown event");
491                    break;
492                }
493                Ok(Some(event)) => {
494                    // 将事件转换为输入并执行
495                    use mofa_kernel::agent::types::AgentInput;
496                    let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
497                    let input = match event {
498                        AgentEvent::TaskReceived(task) => AgentInput::text(task.content),
499                        AgentEvent::Custom(data, _) => AgentInput::text(data),
500                        _ => AgentInput::text(format!("{:?}", event)),
501                    };
502
503                    self.agent.execute(input, &context).await?;
504                }
505                Ok(None) => {
506                    // 通道关闭
507                    break;
508                }
509                Err(_) => {
510                    // 超时,继续等待
511                    continue;
512                }
513            }
514        }
515
516        // 销毁智能体
517        self.agent.shutdown().await?;
518        Ok(())
519    }
520
521    /// 停止运行时
522    pub async fn stop(&mut self) -> anyhow::Result<()> {
523        self.interrupt.trigger();
524        self.agent.shutdown().await?;
525        tracing::info!("SimpleAgentRuntime {} stopped", self.metadata.id);
526        Ok(())
527    }
528
529    /// 触发中断
530    pub fn trigger_interrupt(&self) {
531        self.interrupt.trigger();
532    }
533}
534
535// ============================================================================
536// 简单多智能体运行时 - SimpleRuntime
537// ============================================================================
538
/// Simple runtime: coordinates multiple agents (non-dora version).
#[cfg(not(feature = "dora"))]
pub struct SimpleRuntime {
    // Registered agents, keyed by agent id.
    agents: std::sync::Arc<tokio::sync::RwLock<HashMap<String, SimpleAgentInfo>>>,
    // Role assigned to each agent id at registration time.
    agent_roles: std::sync::Arc<tokio::sync::RwLock<HashMap<String, String>>>,
    // Bus used for point-to-point, broadcast and topic delivery of events.
    message_bus: std::sync::Arc<SimpleMessageBus>,
}
546
/// Information stored for each agent registered with `SimpleRuntime`.
#[cfg(not(feature = "dora"))]
pub struct SimpleAgentInfo {
    /// Metadata supplied at registration.
    pub metadata: AgentMetadata,
    /// Configuration supplied at registration.
    pub config: AgentConfig,
    /// Sending half of the agent's event channel; the receiving half is returned
    /// to the caller of `SimpleRuntime::register_agent`.
    pub event_tx: tokio::sync::mpsc::Sender<AgentEvent>,
}
554
/// Simple in-process message bus.
#[cfg(not(feature = "dora"))]
pub struct SimpleMessageBus {
    // Event senders registered for each agent id.
    subscribers: tokio::sync::RwLock<HashMap<String, Vec<tokio::sync::mpsc::Sender<AgentEvent>>>>,
    // Agent ids subscribed to each topic.
    topic_subscribers: tokio::sync::RwLock<HashMap<String, Vec<String>>>,
}
561
562#[cfg(not(feature = "dora"))]
563impl SimpleMessageBus {
564    /// 创建新的消息总线
565    pub fn new() -> Self {
566        Self {
567            subscribers: tokio::sync::RwLock::new(HashMap::new()),
568            topic_subscribers: tokio::sync::RwLock::new(HashMap::new()),
569        }
570    }
571
572    /// 注册智能体
573    pub async fn register(&self, agent_id: &str, tx: tokio::sync::mpsc::Sender<AgentEvent>) {
574        let mut subs = self.subscribers.write().await;
575        subs.entry(agent_id.to_string())
576            .or_insert_with(Vec::new)
577            .push(tx);
578    }
579
580    /// 订阅主题
581    pub async fn subscribe(&self, agent_id: &str, topic: &str) {
582        let mut topics = self.topic_subscribers.write().await;
583        topics
584            .entry(topic.to_string())
585            .or_insert_with(Vec::new)
586            .push(agent_id.to_string());
587    }
588
589    /// 发送点对点消息
590    pub async fn send_to(&self, target_id: &str, event: AgentEvent) -> anyhow::Result<()> {
591        let subs = self.subscribers.read().await;
592        if let Some(senders) = subs.get(target_id) {
593            for tx in senders {
594                let _ = tx.send(event.clone()).await;
595            }
596        }
597        Ok(())
598    }
599
600    /// 广播消息给所有智能体
601    pub async fn broadcast(&self, event: AgentEvent) -> anyhow::Result<()> {
602        let subs = self.subscribers.read().await;
603        for senders in subs.values() {
604            for tx in senders {
605                let _ = tx.send(event.clone()).await;
606            }
607        }
608        Ok(())
609    }
610
611    /// 发布到主题
612    pub async fn publish(&self, topic: &str, event: AgentEvent) -> anyhow::Result<()> {
613        let topics = self.topic_subscribers.read().await;
614        if let Some(agent_ids) = topics.get(topic) {
615            let subs = self.subscribers.read().await;
616            for agent_id in agent_ids {
617                if let Some(senders) = subs.get(agent_id) {
618                    for tx in senders {
619                        let _ = tx.send(event.clone()).await;
620                    }
621                }
622            }
623        }
624        Ok(())
625    }
626}
627
628#[cfg(not(feature = "dora"))]
629impl Default for SimpleMessageBus {
630    fn default() -> Self {
631        Self::new()
632    }
633}
634
635#[cfg(not(feature = "dora"))]
636impl SimpleRuntime {
637    /// 创建新的简单运行时
638    pub fn new() -> Self {
639        Self {
640            agents: std::sync::Arc::new(tokio::sync::RwLock::new(HashMap::new())),
641            agent_roles: std::sync::Arc::new(tokio::sync::RwLock::new(HashMap::new())),
642            message_bus: std::sync::Arc::new(SimpleMessageBus::new()),
643        }
644    }
645
646    /// 注册智能体
647    pub async fn register_agent(
648        &self,
649        metadata: AgentMetadata,
650        config: AgentConfig,
651        role: &str,
652    ) -> anyhow::Result<tokio::sync::mpsc::Receiver<AgentEvent>> {
653        let agent_id = metadata.id.clone();
654        let (tx, rx) = tokio::sync::mpsc::channel(100);
655
656        // 注册到消息总线
657        self.message_bus.register(&agent_id, tx.clone()).await;
658
659        // 添加智能体信息
660        let mut agents = self.agents.write().await;
661        agents.insert(
662            agent_id.clone(),
663            SimpleAgentInfo {
664                metadata,
665                config,
666                event_tx: tx,
667            },
668        );
669
670        // 记录角色
671        let mut roles = self.agent_roles.write().await;
672        roles.insert(agent_id.clone(), role.to_string());
673
674        tracing::info!("Agent {} registered with role {}", agent_id, role);
675        Ok(rx)
676    }
677
678    /// 获取消息总线
679    pub fn message_bus(&self) -> &std::sync::Arc<SimpleMessageBus> {
680        &self.message_bus
681    }
682
683    /// 获取指定角色的智能体列表
684    pub async fn get_agents_by_role(&self, role: &str) -> Vec<String> {
685        let roles = self.agent_roles.read().await;
686        roles
687            .iter()
688            .filter(|(_, r)| *r == role)
689            .map(|(id, _)| id.clone())
690            .collect()
691    }
692
693    /// 发送消息给指定智能体
694    pub async fn send_to_agent(&self, target_id: &str, event: AgentEvent) -> anyhow::Result<()> {
695        self.message_bus.send_to(target_id, event).await
696    }
697
698    /// 广播消息给所有智能体
699    pub async fn broadcast(&self, event: AgentEvent) -> anyhow::Result<()> {
700        self.message_bus.broadcast(event).await
701    }
702
703    /// 发布到主题
704    pub async fn publish_to_topic(&self, topic: &str, event: AgentEvent) -> anyhow::Result<()> {
705        self.message_bus.publish(topic, event).await
706    }
707
708    /// 订阅主题
709    pub async fn subscribe_topic(&self, agent_id: &str, topic: &str) -> anyhow::Result<()> {
710        self.message_bus.subscribe(agent_id, topic).await;
711        Ok(())
712    }
713
714    /// 停止所有智能体
715    pub async fn stop_all(&self) -> anyhow::Result<()> {
716        self.message_bus.broadcast(AgentEvent::Shutdown).await?;
717        tracing::info!("SimpleRuntime stopped");
718        Ok(())
719    }
720}
721
722#[cfg(not(feature = "dora"))]
723impl Default for SimpleRuntime {
724    fn default() -> Self {
725        Self::new()
726    }
727}
728
/// Storage type for agent nodes: maps an agent id to its shared `DoraAgentNode`.
#[cfg(feature = "dora")]
type AgentNodeMap = HashMap<String, Arc<DoraAgentNode>>;
732
/// MoFA runtime: coordinates multiple agents (dora version).
#[cfg(feature = "dora")]
pub struct MoFARuntime {
    // Optional dataflow; when present, registered nodes are added to it and
    // lifecycle calls (build/start/stop/pause/resume) are delegated to it.
    dataflow: Option<DoraDataflow>,
    // Channel used for point-to-point, broadcast and topic messaging.
    channel: Arc<DoraChannel>,
    // Nodes registered while no dataflow is configured, keyed by agent id.
    agents: Arc<RwLock<AgentNodeMap>>,
    // Role assigned to each agent id at registration time.
    agent_roles: Arc<RwLock<HashMap<String, String>>>,
}
741
#[cfg(feature = "dora")]
impl MoFARuntime {
    /// Shared constructor: a runtime with a default-configured channel, no agents,
    /// no roles and the given optional dataflow.
    fn from_parts(dataflow: Option<DoraDataflow>) -> Self {
        Self {
            dataflow,
            channel: Arc::new(DoraChannel::new(ChannelConfig::default())),
            agents: Arc::new(RwLock::new(HashMap::new())),
            agent_roles: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Creates a runtime without a dataflow; agents are managed individually.
    pub async fn new() -> Self {
        Self::from_parts(None)
    }

    /// Creates a runtime whose agents are managed by a dataflow built from `dataflow_config`.
    pub async fn with_dataflow(dataflow_config: DataflowConfig) -> Self {
        Self::from_parts(Some(DoraDataflow::new(dataflow_config)))
    }

    /// Registers an agent node under `role`.
    ///
    /// The agent is registered on the channel first. The node is then added to the
    /// dataflow when one is configured, otherwise it is stored in the runtime's own
    /// node map. Finally the role is recorded.
    ///
    /// # Errors
    /// Propagates failures from the channel registration or from the dataflow.
    pub async fn register_agent(&self, node: DoraAgentNode, role: &str) -> DoraResult<()> {
        let agent_id = node.config().node_id.clone();

        // Register on the channel.
        self.channel.register_agent(&agent_id).await?;

        // Hand the node to the dataflow if there is one, otherwise keep it locally.
        if let Some(ref dataflow) = self.dataflow {
            dataflow.add_node(node).await?;
        } else {
            let mut agents = self.agents.write().await;
            agents.insert(agent_id.clone(), Arc::new(node));
        }

        // Record the role.
        let mut roles = self.agent_roles.write().await;
        roles.insert(agent_id.clone(), role.to_string());

        info!("Agent {} registered with role {}", agent_id, role);
        Ok(())
    }

    /// Connects an output of one agent to an input of another.
    ///
    /// This is a no-op returning `Ok` when no dataflow is configured.
    pub async fn connect_agents(
        &self,
        source_id: &str,
        source_output: &str,
        target_id: &str,
        target_input: &str,
    ) -> DoraResult<()> {
        if let Some(ref dataflow) = self.dataflow {
            dataflow
                .connect(source_id, source_output, target_id, target_input)
                .await?;
        }
        Ok(())
    }

    /// Returns the shared channel.
    pub fn channel(&self) -> &Arc<DoraChannel> {
        &self.channel
    }

    /// Returns the ids of all agents registered with the given role.
    pub async fn get_agents_by_role(&self, role: &str) -> Vec<String> {
        let roles = self.agent_roles.read().await;
        roles
            .iter()
            .filter(|(_, r)| *r == role)
            .map(|(id, _)| id.clone())
            .collect()
    }

    /// Sends `message` from `sender_id` to `receiver_id` as a point-to-point envelope.
    pub async fn send_to_agent(
        &self,
        sender_id: &str,
        receiver_id: &str,
        message: &AgentMessage,
    ) -> DoraResult<()> {
        let envelope = MessageEnvelope::from_agent_message(sender_id, message)?.to(receiver_id);
        self.channel.send_p2p(envelope).await
    }

    /// Broadcasts `message` from `sender_id` over the channel.
    pub async fn broadcast(&self, sender_id: &str, message: &AgentMessage) -> DoraResult<()> {
        let envelope = MessageEnvelope::from_agent_message(sender_id, message)?;
        self.channel.broadcast(envelope).await
    }

    /// Publishes `message` from `sender_id` to `topic`.
    pub async fn publish_to_topic(
        &self,
        sender_id: &str,
        topic: &str,
        message: &AgentMessage,
    ) -> DoraResult<()> {
        let envelope = MessageEnvelope::from_agent_message(sender_id, message)?.with_topic(topic);
        self.channel.publish(envelope).await
    }

    /// Subscribes `agent_id` to `topic` on the channel.
    pub async fn subscribe_topic(&self, agent_id: &str, topic: &str) -> DoraResult<()> {
        self.channel.subscribe(agent_id, topic).await
    }

    /// Builds and starts the runtime.
    ///
    /// With a dataflow, the dataflow is built and started. Without one, every
    /// individually registered node is initialised, stopping at the first failure.
    pub async fn build_and_start(&self) -> DoraResult<()> {
        if let Some(ref dataflow) = self.dataflow {
            dataflow.build().await?;
            dataflow.start().await?;
        } else {
            // Initialise all individually registered agents.
            let agents = self.agents.read().await;
            for (id, node) in agents.iter() {
                node.init().await?;
                debug!("Agent {} initialized", id);
            }
        }
        info!("MoFARuntime started");
        Ok(())
    }

    /// Stops the runtime.
    ///
    /// With a dataflow, the dataflow is stopped. Without one, every individually
    /// registered node is asked to stop even if an earlier node fails, so a single
    /// failure does not leave the remaining nodes running.
    ///
    /// # Errors
    /// Returns the dataflow's error, or the first node failure encountered.
    pub async fn stop(&self) -> DoraResult<()> {
        if let Some(ref dataflow) = self.dataflow {
            dataflow.stop().await?;
        } else {
            let agents = self.agents.read().await;
            let mut first_failure: Option<DoraError> = None;
            for (id, node) in agents.iter() {
                // `?` inside the block applies the same error conversion the
                // fail-fast version of this loop relied on.
                let attempt: DoraResult<()> = async {
                    node.stop().await?;
                    Ok(())
                }
                .await;
                if let Err(err) = attempt {
                    tracing::warn!("Agent {} failed to stop", id);
                    if first_failure.is_none() {
                        first_failure = Some(err);
                    }
                }
            }
            if let Some(err) = first_failure {
                return Err(err);
            }
        }
        info!("MoFARuntime stopped");
        Ok(())
    }

    /// Pauses the runtime; a no-op returning `Ok` when no dataflow is configured.
    pub async fn pause(&self) -> DoraResult<()> {
        if let Some(ref dataflow) = self.dataflow {
            dataflow.pause().await?;
        }
        Ok(())
    }

    /// Resumes the runtime; a no-op returning `Ok` when no dataflow is configured.
    pub async fn resume(&self) -> DoraResult<()> {
        if let Some(ref dataflow) = self.dataflow {
            dataflow.resume().await?;
        }
        Ok(())
    }
}