Skip to main content

mofa_runtime/
lib.rs

1#[cfg(feature = "monitoring")]
2pub use mofa_monitoring::*;
3
4// =============================================================================
5// MoFA Runtime - Agent Lifecycle and Execution Management
6// =============================================================================
7//
8// This module provides runtime infrastructure for managing agent execution.
9// It follows microkernel architecture principles by depending only on the
10// kernel layer for core abstractions.
11//
12// Main Components:
13// - AgentBuilder: Builder pattern for constructing agents
14// - SimpleRuntime: Multi-agent coordination (non-dora mode)
15// - AgentRuntime: Dora-rs integration (with `dora` feature)
16// - run_agents: Simplified agent execution helper
17//
18// =============================================================================
19
20pub mod agent;
21pub mod builder;
22pub mod config;
23pub mod interrupt;
24pub mod runner;
25
26// Dora adapter module (only compiled when dora feature is enabled)
27#[cfg(feature = "dora")]
28pub mod dora_adapter;
29
30// =============================================================================
31// Re-exports from Kernel (minimal, only what runtime needs)
32// =============================================================================
33//
34// Runtime needs these core types from kernel for its functionality:
35// - MoFAAgent: Core agent trait that runtime executes
36// - AgentConfig: Configuration structure
37// - AgentMetadata: Agent metadata
38// - AgentEvent, AgentMessage: Event and message types
39// - AgentPlugin: Plugin trait for extensibility
40//
41// These are re-exported for user convenience when working with runtime APIs.
42// =============================================================================
43
44pub use interrupt::*;
45
46// Core agent trait - runtime executes agents implementing this trait
47pub use mofa_kernel::agent::MoFAAgent;
48
49pub use mofa_kernel::agent::AgentMetadata;
50// Core types needed for runtime operations
51pub use mofa_kernel::core::AgentConfig;
52pub use mofa_kernel::message::{AgentEvent, AgentMessage};
53
54// Plugin system - runtime supports plugins
55pub use mofa_kernel::plugin::AgentPlugin;
56
57// Import from mofa-foundation
58// Import from mofa-kernel
59
60// Import from mofa-plugins
61use mofa_plugins::AgentPlugin as PluginAgent;
62
63// External dependencies
64use std::collections::HashMap;
65use std::time::Duration;
66
67// Dora feature dependencies
68#[cfg(feature = "dora")]
69use crate::dora_adapter::{
70    ChannelConfig, DataflowConfig, DoraAgentNode, DoraChannel, DoraDataflow, DoraError,
71    DoraNodeConfig, DoraResult, MessageEnvelope,
72};
73#[cfg(feature = "dora")]
74use std::sync::Arc;
75#[cfg(feature = "dora")]
76use tokio::sync::RwLock;
77#[cfg(feature = "dora")]
78use tracing::{debug, info};
79
80// Private import for internal use
81use mofa_kernel::message::StreamType;
82
83/// 智能体构建器 - 提供流式 API
84pub struct AgentBuilder {
85    agent_id: String,
86    name: String,
87    capabilities: Vec<String>,
88    dependencies: Vec<String>,
89    plugins: Vec<Box<dyn PluginAgent>>,
90    node_config: HashMap<String, String>,
91    inputs: Vec<String>,
92    outputs: Vec<String>,
93    max_concurrent_tasks: usize,
94    default_timeout: Duration,
95}
96// ------------------------------
97// 简化的 SDK API
98// ------------------------------
99
100pub use crate::runner::run_agents;
101
102impl AgentBuilder {
103    /// 创建新的 AgentBuilder
104    pub fn new(agent_id: &str, name: &str) -> Self {
105        Self {
106            agent_id: agent_id.to_string(),
107            name: name.to_string(),
108            capabilities: Vec::new(),
109            dependencies: Vec::new(),
110            plugins: Vec::new(),
111            node_config: HashMap::new(),
112            inputs: vec!["task_input".to_string()],
113            outputs: vec!["task_output".to_string()],
114            max_concurrent_tasks: 10,
115            default_timeout: Duration::from_secs(30),
116        }
117    }
118
119    /// 添加能力
120    pub fn with_capability(mut self, capability: &str) -> Self {
121        self.capabilities.push(capability.to_string());
122        self
123    }
124
125    /// 添加多个能力
126    pub fn with_capabilities(mut self, capabilities: Vec<&str>) -> Self {
127        for cap in capabilities {
128            self.capabilities.push(cap.to_string());
129        }
130        self
131    }
132
133    /// 添加依赖
134    pub fn with_dependency(mut self, dependency: &str) -> Self {
135        self.dependencies.push(dependency.to_string());
136        self
137    }
138
139    /// 添加插件
140    pub fn with_plugin(mut self, plugin: Box<dyn AgentPlugin>) -> Self {
141        self.plugins.push(plugin);
142        self
143    }
144
145    /// 添加输入端口
146    pub fn with_input(mut self, input: &str) -> Self {
147        self.inputs.push(input.to_string());
148        self
149    }
150
151    /// 添加输出端口
152    pub fn with_output(mut self, output: &str) -> Self {
153        self.outputs.push(output.to_string());
154        self
155    }
156
157    /// 设置最大并发任务数
158    pub fn with_max_concurrent_tasks(mut self, max: usize) -> Self {
159        self.max_concurrent_tasks = max;
160        self
161    }
162
163    /// 设置默认超时
164    pub fn with_timeout(mut self, timeout: Duration) -> Self {
165        self.default_timeout = timeout;
166        self
167    }
168
169    /// 添加自定义配置
170    pub fn with_config(mut self, key: &str, value: &str) -> Self {
171        self.node_config.insert(key.to_string(), value.to_string());
172        self
173    }
174
175    /// 构建智能体配置
176    pub fn build_config(&self) -> AgentConfig {
177        AgentConfig {
178            agent_id: self.agent_id.clone(),
179            name: self.name.clone(),
180            node_config: self.node_config.clone(),
181        }
182    }
183
184    /// 构建元数据
185    pub fn build_metadata(&self) -> AgentMetadata {
186        use mofa_kernel::agent::AgentCapabilities;
187        use mofa_kernel::agent::AgentState;
188
189        // 将 Vec<String> 转换为 AgentCapabilities
190        let agent_capabilities = AgentCapabilities::builder()
191            .tags(self.capabilities.clone())
192            .build();
193
194        AgentMetadata {
195            id: self.agent_id.clone(),
196            name: self.name.clone(),
197            description: None,
198            version: None,
199            capabilities: agent_capabilities,
200            state: AgentState::Created,
201        }
202    }
203
204    /// 构建 DoraNodeConfig
205    #[cfg(feature = "dora")]
206    pub fn build_node_config(&self) -> DoraNodeConfig {
207        DoraNodeConfig {
208            node_id: self.agent_id.clone(),
209            name: self.name.clone(),
210            inputs: self.inputs.clone(),
211            outputs: self.outputs.clone(),
212            event_buffer_size: self.max_concurrent_tasks * 10,
213            default_timeout: self.default_timeout,
214            custom_config: self.node_config.clone(),
215        }
216    }
217
218    /// 使用提供的 MoFAAgent 实现构建运行时
219    #[cfg(feature = "dora")]
220    pub async fn with_agent<A: MoFAAgent>(self, agent: A) -> DoraResult<AgentRuntime<A>> {
221        let node_config = self.build_node_config();
222        let metadata = self.build_metadata();
223        let config = self.build_config();
224
225        let node = DoraAgentNode::new(node_config);
226        let interrupt = node.interrupt().clone();
227
228        Ok(AgentRuntime {
229            agent,
230            node: Arc::new(node),
231            metadata,
232            config,
233            interrupt,
234            plugins: self.plugins,
235        })
236    }
237
238    /// 构建并启动智能体(需要提供 MoFAAgent 实现)
239    #[cfg(feature = "dora")]
240    pub async fn build_and_start<A: MoFAAgent>(self, agent: A) -> DoraResult<AgentRuntime<A>> {
241        let runtime: AgentRuntime<A> = self.with_agent(agent).await?;
242        runtime.start().await?;
243        Ok(runtime)
244    }
245
246    /// 使用提供的 MoFAAgent 实现构建简单运行时(非 dora 模式)
247    #[cfg(not(feature = "dora"))]
248    pub async fn with_agent<A: MoFAAgent>(self, agent: A) -> anyhow::Result<SimpleAgentRuntime<A>> {
249        let metadata = self.build_metadata();
250        let config = self.build_config();
251        let interrupt = AgentInterrupt::new();
252
253        // 创建事件通道
254        let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
255
256        Ok(SimpleAgentRuntime {
257            agent,
258            metadata,
259            config,
260            interrupt,
261            plugins: self.plugins,
262            inputs: self.inputs,
263            outputs: self.outputs,
264            max_concurrent_tasks: self.max_concurrent_tasks,
265            default_timeout: self.default_timeout,
266            event_tx,
267            event_rx: Some(event_rx),
268        })
269    }
270
271    /// 构建并启动智能体(非 dora 模式)
272    #[cfg(not(feature = "dora"))]
273    pub async fn build_and_start<A: MoFAAgent>(
274        self,
275        agent: A,
276    ) -> anyhow::Result<SimpleAgentRuntime<A>> {
277        let mut runtime = self.with_agent(agent).await?;
278        runtime.start().await?;
279        Ok(runtime)
280    }
281}
282
/// Agent runtime backed by a dora node (only built with the `dora` feature).
#[cfg(feature = "dora")]
pub struct AgentRuntime<A: MoFAAgent> {
    /// The user-supplied agent implementation driven by this runtime.
    agent: A,
    /// Shared handle to the dora node that supplies events and carries output.
    node: Arc<DoraAgentNode>,
    /// Metadata describing the agent.
    metadata: AgentMetadata,
    /// Configuration of the agent.
    config: AgentConfig,
    /// Interrupt handle, cloned from the node when the runtime is built.
    interrupt: AgentInterrupt,
    /// Plugins initialised by `init_plugins`.
    plugins: Vec<Box<dyn AgentPlugin>>,
}
293
#[cfg(feature = "dora")]
impl<A: MoFAAgent> AgentRuntime<A> {
    /// Returns a shared reference to the wrapped agent.
    pub fn agent(&self) -> &A {
        &self.agent
    }

    /// Returns a mutable reference to the wrapped agent.
    pub fn agent_mut(&mut self) -> &mut A {
        &mut self.agent
    }

    /// Returns the dora node this runtime is attached to.
    pub fn node(&self) -> &Arc<DoraAgentNode> {
        &self.node
    }

    /// Returns the agent metadata.
    pub fn metadata(&self) -> &AgentMetadata {
        &self.metadata
    }

    /// Returns the agent configuration.
    pub fn config(&self) -> &AgentConfig {
        &self.config
    }

    /// Returns the interrupt handle.
    pub fn interrupt(&self) -> &AgentInterrupt {
        &self.interrupt
    }

    /// Initialises every registered plugin in registration order.
    ///
    /// # Errors
    ///
    /// Stops at the first plugin that fails and reports it as
    /// `DoraError::OperatorError`.
    pub async fn init_plugins(&mut self) -> DoraResult<()> {
        for plugin in &mut self.plugins {
            plugin
                .init_plugin()
                .await
                .map_err(|e| DoraError::OperatorError(e.to_string()))?;
        }
        Ok(())
    }

    /// Starts the runtime by initialising the underlying node.
    pub async fn start(&self) -> DoraResult<()> {
        self.node.init().await?;
        info!("AgentRuntime {} started", self.metadata.id);
        Ok(())
    }

    /// Runs the event loop until a `Shutdown` event is received.
    ///
    /// The agent is initialised with a fresh `AgentContext`, then the plugins
    /// are initialised. Every subsequent event is converted to an `AgentInput`
    /// and passed to `execute`; the agent is shut down once the loop ends.
    ///
    /// # Errors
    ///
    /// Agent initialisation, execution and shutdown failures are returned as
    /// `DoraError::Internal`; plugin failures as described on `init_plugins`.
    /// An execution error ends the loop immediately.
    pub async fn run_event_loop(&mut self) -> DoraResult<()> {
        use mofa_kernel::agent::types::AgentInput;

        // One context is created up front and reused for every `execute` call.
        let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
        self.agent
            .initialize(&context)
            .await
            .map_err(|e| DoraError::Internal(e.to_string()))?;

        self.init_plugins().await?;

        let event_loop = self.node.create_event_loop();

        loop {
            // A pending interrupt is only logged and cleared here.
            if event_loop.should_interrupt() {
                debug!("Interrupt signal received for {}", self.metadata.id);
                self.interrupt.reset();
            }

            match event_loop.next_event().await {
                Some(AgentEvent::Shutdown) => {
                    info!("Received shutdown event");
                    break;
                }
                Some(event) => {
                    // Re-check just before handling the event.
                    if self.interrupt.check() {
                        debug!("Interrupt signal received for {}", self.metadata.id);
                        self.interrupt.reset();
                    }

                    // Match on the owned event so its payload is moved into the
                    // input instead of deep-cloning every event first.
                    let input = match event {
                        AgentEvent::TaskReceived(task) => AgentInput::text(task.content),
                        AgentEvent::Custom(data, _) => AgentInput::text(data),
                        other => AgentInput::text(format!("{:?}", other)),
                    };

                    self.agent
                        .execute(input, &context)
                        .await
                        .map_err(|e| DoraError::Internal(e.to_string()))?;
                }
                None => {
                    // No event available: back off briefly before polling again.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
            }
        }

        self.agent
            .shutdown()
            .await
            .map_err(|e| DoraError::Internal(e.to_string()))?;

        Ok(())
    }

    /// Stops the runtime: triggers the interrupt handle, then stops the node.
    pub async fn stop(&self) -> DoraResult<()> {
        self.interrupt.trigger();
        self.node.stop().await?;
        info!("AgentRuntime {} stopped", self.metadata.id);
        Ok(())
    }

    /// Sends `message` on the node output named `output_id`.
    pub async fn send_output(&self, output_id: &str, message: &AgentMessage) -> DoraResult<()> {
        self.node.send_message(output_id, message).await
    }

    /// Injects `event` into the node.
    pub async fn inject_event(&self, event: AgentEvent) -> DoraResult<()> {
        self.node.inject_event(event).await
    }
}
427
428// ============================================================================
429// 非 dora 运行时实现 - SimpleAgentRuntime
430// ============================================================================
431
432/// 简单智能体运行时 - 不依赖 dora-rs 的轻量级运行时
433#[cfg(not(feature = "dora"))]
434pub struct SimpleAgentRuntime<A: MoFAAgent> {
435    agent: A,
436    metadata: AgentMetadata,
437    config: AgentConfig,
438    interrupt: AgentInterrupt,
439    plugins: Vec<Box<dyn AgentPlugin>>,
440    inputs: Vec<String>,
441    outputs: Vec<String>,
442    max_concurrent_tasks: usize,
443    default_timeout: Duration,
444    // 添加事件通道
445    event_tx: tokio::sync::mpsc::Sender<AgentEvent>,
446    event_rx: Option<tokio::sync::mpsc::Receiver<AgentEvent>>,
447}
448
449#[cfg(not(feature = "dora"))]
450impl<A: MoFAAgent> SimpleAgentRuntime<A> {
451    pub async fn inject_event(&self, event: AgentEvent) {
452        // 将事件发送到事件通道
453        let _ = self.event_tx.send(event).await;
454    }
455}
456
457#[cfg(not(feature = "dora"))]
458#[cfg(not(feature = "dora"))]
459impl<A: MoFAAgent> SimpleAgentRuntime<A> {
460    /// 获取智能体引用
461    pub fn agent(&self) -> &A {
462        &self.agent
463    }
464
465    /// 获取可变智能体引用
466    pub fn agent_mut(&mut self) -> &mut A {
467        &mut self.agent
468    }
469
470    /// 获取元数据
471    pub fn metadata(&self) -> &AgentMetadata {
472        &self.metadata
473    }
474
475    /// 获取配置
476    pub fn config(&self) -> &AgentConfig {
477        &self.config
478    }
479
480    /// 获取中断句柄
481    pub fn interrupt(&self) -> &AgentInterrupt {
482        &self.interrupt
483    }
484
485    /// 获取输入端口列表
486    pub fn inputs(&self) -> &[String] {
487        &self.inputs
488    }
489
490    /// 获取输出端口列表
491    pub fn outputs(&self) -> &[String] {
492        &self.outputs
493    }
494
495    /// 获取最大并发任务数
496    pub fn max_concurrent_tasks(&self) -> usize {
497        self.max_concurrent_tasks
498    }
499
500    /// 获取默认超时时间
501    pub fn default_timeout(&self) -> Duration {
502        self.default_timeout
503    }
504
505    /// 初始化插件
506    pub async fn init_plugins(&mut self) -> anyhow::Result<()> {
507        for plugin in &mut self.plugins {
508            plugin.init_plugin().await?;
509        }
510        Ok(())
511    }
512
513    /// 启动运行时
514    pub async fn start(&mut self) -> anyhow::Result<()> {
515        // 创建 CoreAgentContext
516        let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
517
518        // 初始化智能体 - 使用 MoFAAgent 的 initialize 方法
519        self.agent.initialize(&context).await?;
520        // 初始化插件
521        self.init_plugins().await?;
522        tracing::info!("SimpleAgentRuntime {} started", self.metadata.id);
523        Ok(())
524    }
525
526    /// 处理单个事件
527    pub async fn handle_event(&mut self, event: AgentEvent) -> anyhow::Result<()> {
528        // 检查中断 - 注意:MoFAAgent 没有 on_interrupt 方法
529        // 中断处理需要由 Agent 内部自行处理或通过 AgentMessaging 扩展
530        if self.interrupt.check() {
531            // 中断信号,可以选择停止或通知 agent
532            tracing::debug!("Interrupt signal received for {}", self.metadata.id);
533            self.interrupt.reset();
534        }
535
536        // 将事件转换为输入并使用 execute
537        use mofa_kernel::agent::types::AgentInput;
538
539        let context = mofa_kernel::agent::AgentContext::new(self.metadata.id.clone());
540
541        // 尝试将事件转换为输入
542        let input = match event {
543            AgentEvent::TaskReceived(task) => AgentInput::text(task.content),
544            AgentEvent::Shutdown => {
545                tracing::info!("Shutdown event received for {}", self.metadata.id);
546                return Ok(());
547            }
548            AgentEvent::Custom(data, _) => AgentInput::text(data),
549            _ => AgentInput::text(format!("{:?}", event)),
550        };
551
552        let _output = self.agent.execute(input, &context).await?;
553        Ok(())
554    }
555
556    /// 运行事件循环(使用内部事件接收器)
557    pub async fn run(&mut self) -> anyhow::Result<()> {
558        // 获取内部事件接收器
559        let event_rx = self
560            .event_rx
561            .take()
562            .ok_or_else(|| anyhow::anyhow!("Event receiver already taken"))?;
563
564        self.run_with_receiver(event_rx).await
565    }
566
567    /// 运行事件循环(使用事件通道)
568    pub async fn run_with_receiver(
569        &mut self,
570        mut event_rx: tokio::sync::mpsc::Receiver<AgentEvent>,
571    ) -> anyhow::Result<()> {
572        loop {
573            // 检查中断
574            if self.interrupt.check() {
575                tracing::debug!("Interrupt signal received for {}", self.metadata.id);
576                self.interrupt.reset();
577            }
578
579            // 等待事件
580            match tokio::time::timeout(Duration::from_millis(100), event_rx.recv()).await {
581                Ok(Some(AgentEvent::Shutdown)) => {
582                    tracing::info!("Received shutdown event");
583                    break;
584                }
585                Ok(Some(event)) => {
586                    // 使用 handle_event 方法(它会将事件转换为 execute 调用)
587                    self.handle_event(event).await?;
588                }
589                Ok(None) => {
590                    // 通道关闭
591                    break;
592                }
593                Err(_) => {
594                    // 超时,继续等待
595                    continue;
596                }
597            }
598        }
599
600        // 关闭智能体 - 使用 shutdown 而不是 destroy
601        self.agent.shutdown().await?;
602        Ok(())
603    }
604
605    /// 停止运行时
606    pub async fn stop(&mut self) -> anyhow::Result<()> {
607        self.interrupt.trigger();
608        self.agent.shutdown().await?;
609        tracing::info!("SimpleAgentRuntime {} stopped", self.metadata.id);
610        Ok(())
611    }
612
613    /// 触发中断
614    pub fn trigger_interrupt(&self) {
615        self.interrupt.trigger();
616    }
617}
618
619// ============================================================================
620// 简单多智能体运行时 - SimpleRuntime
621// ============================================================================
622
623/// 简单运行时 - 管理多个智能体的协同运行(非 dora 版本)
624#[cfg(not(feature = "dora"))]
625pub struct SimpleRuntime {
626    agents: std::sync::Arc<tokio::sync::RwLock<HashMap<String, SimpleAgentInfo>>>,
627    agent_roles: std::sync::Arc<tokio::sync::RwLock<HashMap<String, String>>>,
628    message_bus: std::sync::Arc<SimpleMessageBus>,
629}
630
631/// 智能体信息
632#[cfg(not(feature = "dora"))]
633pub struct SimpleAgentInfo {
634    pub metadata: AgentMetadata,
635    pub config: AgentConfig,
636    pub event_tx: tokio::sync::mpsc::Sender<AgentEvent>,
637}
638
639/// 流状态信息
640#[cfg(not(feature = "dora"))]
641#[derive(Debug, Clone)]
642pub struct StreamInfo {
643    pub stream_id: String,
644    pub stream_type: StreamType,
645    pub metadata: HashMap<String, String>,
646    pub subscribers: Vec<String>,
647    pub sequence: u64,
648    pub is_paused: bool,
649}
650
651/// 简单消息总线
652#[cfg(not(feature = "dora"))]
653pub struct SimpleMessageBus {
654    subscribers: tokio::sync::RwLock<HashMap<String, Vec<tokio::sync::mpsc::Sender<AgentEvent>>>>,
655    topic_subscribers: tokio::sync::RwLock<HashMap<String, Vec<String>>>,
656    // 流支持
657    streams: tokio::sync::RwLock<HashMap<String, StreamInfo>>,
658}
659
660#[cfg(not(feature = "dora"))]
661impl SimpleMessageBus {
662    /// 创建新的消息总线
663    pub fn new() -> Self {
664        Self {
665            subscribers: tokio::sync::RwLock::new(HashMap::new()),
666            topic_subscribers: tokio::sync::RwLock::new(HashMap::new()),
667            streams: tokio::sync::RwLock::new(HashMap::new()),
668        }
669    }
670
671    /// 注册智能体
672    pub async fn register(&self, agent_id: &str, tx: tokio::sync::mpsc::Sender<AgentEvent>) {
673        let mut subs = self.subscribers.write().await;
674        subs.entry(agent_id.to_string())
675            .or_insert_with(Vec::new)
676            .push(tx);
677    }
678
679    /// 订阅主题
680    pub async fn subscribe(&self, agent_id: &str, topic: &str) {
681        let mut topics = self.topic_subscribers.write().await;
682        topics
683            .entry(topic.to_string())
684            .or_insert_with(Vec::new)
685            .push(agent_id.to_string());
686    }
687
688    /// 发送点对点消息
689    pub async fn send_to(&self, target_id: &str, event: AgentEvent) -> anyhow::Result<()> {
690        let subs = self.subscribers.read().await;
691        if let Some(senders) = subs.get(target_id) {
692            for tx in senders {
693                let _ = tx.send(event.clone()).await;
694            }
695        }
696        Ok(())
697    }
698
699    /// 广播消息给所有智能体
700    pub async fn broadcast(&self, event: AgentEvent) -> anyhow::Result<()> {
701        let subs = self.subscribers.read().await;
702        for senders in subs.values() {
703            for tx in senders {
704                let _ = tx.send(event.clone()).await;
705            }
706        }
707        Ok(())
708    }
709
710    /// 发布到主题
711    pub async fn publish(&self, topic: &str, event: AgentEvent) -> anyhow::Result<()> {
712        let topics = self.topic_subscribers.read().await;
713        if let Some(agent_ids) = topics.get(topic) {
714            let subs = self.subscribers.read().await;
715            for agent_id in agent_ids {
716                if let Some(senders) = subs.get(agent_id) {
717                    for tx in senders {
718                        let _ = tx.send(event.clone()).await;
719                    }
720                }
721            }
722        }
723        Ok(())
724    }
725
726    // ---------------------------------
727    // 流支持方法
728    // ---------------------------------
729
730    /// 创建流
731    pub async fn create_stream(
732        &self,
733        stream_id: &str,
734        stream_type: StreamType,
735        metadata: HashMap<String, String>,
736    ) -> anyhow::Result<()> {
737        let mut streams = self.streams.write().await;
738        if streams.contains_key(stream_id) {
739            return Err(anyhow::anyhow!("Stream {} already exists", stream_id));
740        }
741
742        // 创建流信息
743        let stream_info = StreamInfo {
744            stream_id: stream_id.to_string(),
745            stream_type: stream_type.clone(),
746            metadata: metadata.clone(),
747            subscribers: Vec::new(),
748            sequence: 0,
749            is_paused: false,
750        };
751
752        streams.insert(stream_id.to_string(), stream_info.clone());
753
754        // 广播流创建事件
755        self.broadcast(AgentEvent::StreamCreated {
756            stream_id: stream_id.to_string(),
757            stream_type,
758            metadata,
759        })
760        .await
761    }
762
763    /// 关闭流
764    pub async fn close_stream(&self, stream_id: &str, reason: &str) -> anyhow::Result<()> {
765        let mut streams = self.streams.write().await;
766        if let Some(stream_info) = streams.remove(stream_id) {
767            // 广播流关闭事件
768            let event = AgentEvent::StreamClosed {
769                stream_id: stream_id.to_string(),
770                reason: reason.to_string(),
771            };
772
773            // 通知所有订阅者
774            let subs = self.subscribers.read().await;
775            for agent_id in &stream_info.subscribers {
776                if let Some(senders) = subs.get(agent_id) {
777                    for tx in senders {
778                        let _ = tx.send(event.clone()).await;
779                    }
780                }
781            }
782        }
783        Ok(())
784    }
785
786    /// 订阅流
787    pub async fn subscribe_stream(&self, agent_id: &str, stream_id: &str) -> anyhow::Result<()> {
788        let mut streams = self.streams.write().await;
789        if let Some(stream_info) = streams.get_mut(stream_id) {
790            // 检查是否已订阅
791            if !stream_info.subscribers.contains(&agent_id.to_string()) {
792                stream_info.subscribers.push(agent_id.to_string());
793
794                // 广播订阅事件
795                self.broadcast(AgentEvent::StreamSubscription {
796                    stream_id: stream_id.to_string(),
797                    subscriber_id: agent_id.to_string(),
798                })
799                .await?;
800            }
801        }
802        Ok(())
803    }
804
805    /// 取消订阅流
806    pub async fn unsubscribe_stream(&self, agent_id: &str, stream_id: &str) -> anyhow::Result<()> {
807        let mut streams = self.streams.write().await;
808        if let Some(stream_info) = streams.get_mut(stream_id) {
809            // 移除订阅者
810            if let Some(pos) = stream_info.subscribers.iter().position(|id| id == agent_id) {
811                stream_info.subscribers.remove(pos);
812
813                // 广播取消订阅事件
814                self.broadcast(AgentEvent::StreamUnsubscription {
815                    stream_id: stream_id.to_string(),
816                    subscriber_id: agent_id.to_string(),
817                })
818                .await?;
819            }
820        }
821        Ok(())
822    }
823
824    /// 发送流消息
825    pub async fn send_stream_message(
826        &self,
827        stream_id: &str,
828        message: Vec<u8>,
829    ) -> anyhow::Result<()> {
830        let mut streams = self.streams.write().await;
831        if let Some(stream_info) = streams.get_mut(stream_id) {
832            // 如果流被暂停,直接返回
833            if stream_info.is_paused {
834                return Ok(());
835            }
836
837            // 生成序列号
838            let sequence = stream_info.sequence;
839            stream_info.sequence += 1;
840
841            // 构造流消息事件
842            let event = AgentEvent::StreamMessage {
843                stream_id: stream_id.to_string(),
844                message,
845                sequence,
846            };
847
848            // 发送给所有订阅者
849            let subs = self.subscribers.read().await;
850            for agent_id in &stream_info.subscribers {
851                if let Some(senders) = subs.get(agent_id) {
852                    for tx in senders {
853                        let _ = tx.send(event.clone()).await;
854                    }
855                }
856            }
857        }
858        Ok(())
859    }
860
861    /// 暂停流
862    pub async fn pause_stream(&self, stream_id: &str) -> anyhow::Result<()> {
863        let mut streams = self.streams.write().await;
864        if let Some(stream_info) = streams.get_mut(stream_id) {
865            stream_info.is_paused = true;
866        }
867        Ok(())
868    }
869
870    /// 恢复流
871    pub async fn resume_stream(&self, stream_id: &str) -> anyhow::Result<()> {
872        let mut streams = self.streams.write().await;
873        if let Some(stream_info) = streams.get_mut(stream_id) {
874            stream_info.is_paused = false;
875        }
876        Ok(())
877    }
878
879    /// 获取流信息
880    pub async fn get_stream_info(&self, stream_id: &str) -> anyhow::Result<Option<StreamInfo>> {
881        let streams = self.streams.read().await;
882        Ok(streams.get(stream_id).cloned())
883    }
884}
885
886#[cfg(not(feature = "dora"))]
887impl Default for SimpleMessageBus {
888    fn default() -> Self {
889        Self::new()
890    }
891}
892
893#[cfg(not(feature = "dora"))]
894impl SimpleRuntime {
895    /// 创建新的简单运行时
896    pub fn new() -> Self {
897        Self {
898            agents: std::sync::Arc::new(tokio::sync::RwLock::new(HashMap::new())),
899            agent_roles: std::sync::Arc::new(tokio::sync::RwLock::new(HashMap::new())),
900            message_bus: std::sync::Arc::new(SimpleMessageBus::new()),
901        }
902    }
903
904    /// 注册智能体
905    pub async fn register_agent(
906        &self,
907        metadata: AgentMetadata,
908        config: AgentConfig,
909        role: &str,
910    ) -> anyhow::Result<tokio::sync::mpsc::Receiver<AgentEvent>> {
911        let agent_id = metadata.id.clone();
912        let (tx, rx) = tokio::sync::mpsc::channel(100);
913
914        // 注册到消息总线
915        self.message_bus.register(&agent_id, tx.clone()).await;
916
917        // 添加智能体信息
918        let mut agents = self.agents.write().await;
919        agents.insert(
920            agent_id.clone(),
921            SimpleAgentInfo {
922                metadata,
923                config,
924                event_tx: tx,
925            },
926        );
927
928        // 记录角色
929        let mut roles = self.agent_roles.write().await;
930        roles.insert(agent_id.clone(), role.to_string());
931
932        tracing::info!("Agent {} registered with role {}", agent_id, role);
933        Ok(rx)
934    }
935
936    /// 获取消息总线
937    pub fn message_bus(&self) -> &std::sync::Arc<SimpleMessageBus> {
938        &self.message_bus
939    }
940
941    /// 获取指定角色的智能体列表
942    pub async fn get_agents_by_role(&self, role: &str) -> Vec<String> {
943        let roles = self.agent_roles.read().await;
944        roles
945            .iter()
946            .filter(|(_, r)| *r == role)
947            .map(|(id, _)| id.clone())
948            .collect()
949    }
950
951    /// 发送消息给指定智能体
952    pub async fn send_to_agent(&self, target_id: &str, event: AgentEvent) -> anyhow::Result<()> {
953        self.message_bus.send_to(target_id, event).await
954    }
955
956    /// 广播消息给所有智能体
957    pub async fn broadcast(&self, event: AgentEvent) -> anyhow::Result<()> {
958        self.message_bus.broadcast(event).await
959    }
960
961    /// 发布到主题
962    pub async fn publish_to_topic(&self, topic: &str, event: AgentEvent) -> anyhow::Result<()> {
963        self.message_bus.publish(topic, event).await
964    }
965
966    /// 订阅主题
967    pub async fn subscribe_topic(&self, agent_id: &str, topic: &str) -> anyhow::Result<()> {
968        self.message_bus.subscribe(agent_id, topic).await;
969        Ok(())
970    }
971
972    // ---------------------------------
973    // 流支持方法
974    // ---------------------------------
975
976    /// 创建流
977    pub async fn create_stream(
978        &self,
979        stream_id: &str,
980        stream_type: StreamType,
981        metadata: std::collections::HashMap<String, String>,
982    ) -> anyhow::Result<()> {
983        self.message_bus
984            .create_stream(stream_id, stream_type, metadata)
985            .await
986    }
987
988    /// 关闭流
989    pub async fn close_stream(&self, stream_id: &str, reason: &str) -> anyhow::Result<()> {
990        self.message_bus.close_stream(stream_id, reason).await
991    }
992
993    /// 订阅流
994    pub async fn subscribe_stream(&self, agent_id: &str, stream_id: &str) -> anyhow::Result<()> {
995        self.message_bus.subscribe_stream(agent_id, stream_id).await
996    }
997
998    /// 取消订阅流
999    pub async fn unsubscribe_stream(&self, agent_id: &str, stream_id: &str) -> anyhow::Result<()> {
1000        self.message_bus
1001            .unsubscribe_stream(agent_id, stream_id)
1002            .await
1003    }
1004
1005    /// 发送流消息
1006    pub async fn send_stream_message(
1007        &self,
1008        stream_id: &str,
1009        message: Vec<u8>,
1010    ) -> anyhow::Result<()> {
1011        self.message_bus
1012            .send_stream_message(stream_id, message)
1013            .await
1014    }
1015
1016    /// 暂停流
1017    pub async fn pause_stream(&self, stream_id: &str) -> anyhow::Result<()> {
1018        self.message_bus.pause_stream(stream_id).await
1019    }
1020
1021    /// 恢复流
1022    pub async fn resume_stream(&self, stream_id: &str) -> anyhow::Result<()> {
1023        self.message_bus.resume_stream(stream_id).await
1024    }
1025
1026    /// 获取流信息
1027    pub async fn get_stream_info(&self, stream_id: &str) -> anyhow::Result<Option<StreamInfo>> {
1028        self.message_bus.get_stream_info(stream_id).await
1029    }
1030
1031    /// 停止所有智能体
1032    pub async fn stop_all(&self) -> anyhow::Result<()> {
1033        self.message_bus.broadcast(AgentEvent::Shutdown).await?;
1034        tracing::info!("SimpleRuntime stopped");
1035        Ok(())
1036    }
1037}
1038
1039#[cfg(not(feature = "dora"))]
1040impl Default for SimpleRuntime {
1041    fn default() -> Self {
1042        Self::new()
1043    }
1044}
1045
/// Storage type for agent nodes: maps an agent ID to its shared [`DoraAgentNode`].
#[cfg(feature = "dora")]
type AgentNodeMap = HashMap<String, Arc<DoraAgentNode>>;
1049
/// MoFA runtime: manages the coordinated execution of multiple agents.
#[cfg(feature = "dora")]
pub struct MoFARuntime {
    /// Optional dataflow. When present, registered nodes are added to it
    /// rather than to `agents`, and lifecycle calls are delegated to it.
    dataflow: Option<DoraDataflow>,
    /// Channel used for agent registration and messaging.
    channel: Arc<DoraChannel>,
    /// Agent nodes registered while no dataflow is configured, keyed by agent ID.
    agents: Arc<RwLock<AgentNodeMap>>,
    /// Role recorded for each registered agent ID.
    agent_roles: Arc<RwLock<HashMap<String, String>>>,
}
1058
#[cfg(feature = "dora")]
impl MoFARuntime {
    /// Creates a runtime without a dataflow.
    ///
    /// The channel is built from `ChannelConfig::default()`. Agents registered
    /// on such a runtime are stored in the runtime's own node map.
    pub async fn new() -> Self {
        Self::with_optional_dataflow(None)
    }

    /// Creates a runtime backed by a [`DoraDataflow`] built from `dataflow_config`.
    ///
    /// The channel is built from `ChannelConfig::default()`.
    pub async fn with_dataflow(dataflow_config: DataflowConfig) -> Self {
        Self::with_optional_dataflow(Some(DoraDataflow::new(dataflow_config)))
    }

    /// Shared constructor: default channel, empty node map and empty role table.
    fn with_optional_dataflow(dataflow: Option<DoraDataflow>) -> Self {
        Self {
            dataflow,
            channel: Arc::new(DoraChannel::new(ChannelConfig::default())),
            agents: Arc::new(RwLock::new(HashMap::new())),
            agent_roles: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Registers an agent node under the given `role`.
    ///
    /// The agent ID is taken from `node.config().node_id`. The agent is first
    /// registered with the channel. The node is then added to the dataflow when
    /// one is configured, or stored in the runtime's own node map otherwise.
    /// Finally the role is recorded.
    ///
    /// # Errors
    /// Returns the error from channel registration or from adding the node to
    /// the dataflow. In the latter case the channel registration has already
    /// happened and is not undone here.
    pub async fn register_agent(&self, node: DoraAgentNode, role: &str) -> DoraResult<()> {
        let agent_id = node.config().node_id.clone();

        // Register with the channel before handing the node over.
        self.channel.register_agent(&agent_id).await?;

        // Add to the dataflow if there is one; otherwise keep the node locally.
        if let Some(ref dataflow) = self.dataflow {
            dataflow.add_node(node).await?;
        } else {
            self.agents
                .write()
                .await
                .insert(agent_id.clone(), Arc::new(node));
        }

        // Record the role; the write guard is released at the end of the statement.
        self.agent_roles
            .write()
            .await
            .insert(agent_id.clone(), role.to_string());

        info!("Agent {} registered with role {}", agent_id, role);
        Ok(())
    }

    /// Connects `source_output` of `source_id` to `target_input` of `target_id`.
    ///
    /// The connection is made on the dataflow. When no dataflow is configured
    /// this does nothing and returns `Ok(())`.
    ///
    /// # Errors
    /// Returns the error reported by the dataflow's `connect`.
    pub async fn connect_agents(
        &self,
        source_id: &str,
        source_output: &str,
        target_id: &str,
        target_input: &str,
    ) -> DoraResult<()> {
        if let Some(ref dataflow) = self.dataflow {
            dataflow
                .connect(source_id, source_output, target_id, target_input)
                .await?;
        }
        Ok(())
    }

    /// Returns a reference to the runtime's shared channel handle.
    pub fn channel(&self) -> &Arc<DoraChannel> {
        &self.channel
    }

    /// Collects the IDs of every agent whose recorded role equals `role`.
    ///
    /// The returned IDs follow the role map's iteration order.
    pub async fn get_agents_by_role(&self, role: &str) -> Vec<String> {
        let roles = self.agent_roles.read().await;
        roles
            .iter()
            .filter(|(_, r)| *r == role)
            .map(|(id, _)| id.clone())
            .collect()
    }

    /// Sends `message` from `sender_id` to `receiver_id` as a point-to-point envelope.
    ///
    /// # Errors
    /// Returns the error from building the envelope or from the channel's `send_p2p`.
    pub async fn send_to_agent(
        &self,
        sender_id: &str,
        receiver_id: &str,
        message: &AgentMessage,
    ) -> DoraResult<()> {
        let envelope = MessageEnvelope::from_agent_message(sender_id, message)?.to(receiver_id);
        self.channel.send_p2p(envelope).await
    }

    /// Broadcasts `message` from `sender_id` over the channel.
    ///
    /// # Errors
    /// Returns the error from building the envelope or from the channel's `broadcast`.
    pub async fn broadcast(&self, sender_id: &str, message: &AgentMessage) -> DoraResult<()> {
        let envelope = MessageEnvelope::from_agent_message(sender_id, message)?;
        self.channel.broadcast(envelope).await
    }

    /// Publishes `message` from `sender_id` on `topic`.
    ///
    /// # Errors
    /// Returns the error from building the envelope or from the channel's `publish`.
    pub async fn publish_to_topic(
        &self,
        sender_id: &str,
        topic: &str,
        message: &AgentMessage,
    ) -> DoraResult<()> {
        let envelope = MessageEnvelope::from_agent_message(sender_id, message)?.with_topic(topic);
        self.channel.publish(envelope).await
    }

    /// Subscribes the agent `agent_id` to `topic` on the channel.
    ///
    /// # Errors
    /// Returns the error reported by the channel's `subscribe`.
    pub async fn subscribe_topic(&self, agent_id: &str, topic: &str) -> DoraResult<()> {
        self.channel.subscribe(agent_id, topic).await
    }

    /// Builds and starts the runtime.
    ///
    /// With a dataflow, the dataflow is built and then started. Without one,
    /// every directly registered node is initialised in turn.
    ///
    /// # Errors
    /// Fails fast: the first build, start or init error is returned and any
    /// remaining nodes are left uninitialised.
    pub async fn build_and_start(&self) -> DoraResult<()> {
        if let Some(ref dataflow) = self.dataflow {
            dataflow.build().await?;
            dataflow.start().await?;
        } else {
            // Initialise every agent that was registered without a dataflow.
            let agents = self.agents.read().await;
            for (id, node) in agents.iter() {
                node.init().await?;
                debug!("Agent {} initialized", id);
            }
        }
        info!("MoFARuntime started");
        Ok(())
    }

    /// Stops the runtime.
    ///
    /// With a dataflow, the dataflow is stopped. Without one, a stop is
    /// attempted on every directly registered node, even if an earlier node
    /// fails, so that one failing agent does not leave the rest running.
    ///
    /// # Errors
    /// Returns the dataflow's stop error, or the first node stop error after
    /// all nodes have been tried. Each node failure is logged as a warning.
    pub async fn stop(&self) -> DoraResult<()> {
        if let Some(ref dataflow) = self.dataflow {
            dataflow.stop().await?;
        } else {
            let agents = self.agents.read().await;
            let mut first_error = None;
            for (id, node) in agents.iter() {
                if let Err(err) = node.stop().await {
                    tracing::warn!("Agent {} failed to stop: {}", id, err);
                    // Keep going; only the first failure is reported to the caller.
                    if first_error.is_none() {
                        first_error = Some(err);
                    }
                }
            }
            if let Some(err) = first_error {
                // Same conversion the `?` operator would have applied.
                return Err(err.into());
            }
        }
        info!("MoFARuntime stopped");
        Ok(())
    }

    /// Pauses the dataflow.
    ///
    /// When no dataflow is configured this does nothing and returns `Ok(())`.
    ///
    /// # Errors
    /// Returns the error reported by the dataflow's `pause`.
    pub async fn pause(&self) -> DoraResult<()> {
        if let Some(ref dataflow) = self.dataflow {
            dataflow.pause().await?;
        }
        Ok(())
    }

    /// Resumes the dataflow.
    ///
    /// When no dataflow is configured this does nothing and returns `Ok(())`.
    ///
    /// # Errors
    /// Returns the error reported by the dataflow's `resume`.
    pub async fn resume(&self) -> DoraResult<()> {
        if let Some(ref dataflow) = self.dataflow {
            dataflow.resume().await?;
        }
        Ok(())
    }
}