Skip to main content

mofa_runtime/
runner.rs

1//! 统一 Agent 运行器
2//!
3//! 提供统一的 Agent 执行接口,可以运行任何实现了 `MoFAAgent` 的 Agent。
4//!
5//! # 架构
6//!
7//! ```text
8//! ┌─────────────────────────────────────────────────────────────────────┐
9//! │                      AgentRunner<T: MoFAAgent>                      │
10//! │  ┌─────────────────────────────────────────────────────────────┐   │
11//! │  │  状态管理                                                     │   │
12//! │  │  • RunnerState: Initializing, Running, Paused, Stopping      │   │
13//! │  └─────────────────────────────────────────────────────────────┘   │
14//! │  ┌─────────────────────────────────────────────────────────────┐   │
15//! │  │  执行模式                                                     │   │
16//! │  │  • Single: 单次执行                                           │   │
17//! │  │  • EventLoop: 事件循环(支持 AgentMessaging)                 │   │
18//! │  │  • Stream: 流式执行                                           │   │
19//! │  └─────────────────────────────────────────────────────────────┘   │
20//! └─────────────────────────────────────────────────────────────────────┘
21//! ```
22//!
23//! # 示例
24//!
25//! ## 基本使用
26//!
27//! ```rust,ignore
28//! use mofa_runtime::runner::AgentRunner;
29//! use mofa_runtime::agent::MoFAAgent;
30//!
31//! #[tokio::main]
32//! async fn main() -> AgentResult<()> {
33//!     let agent = MyAgent::new();
34//!     let mut runner = AgentRunner::new(agent).await?;
35//!
36//!     // 执行任务
37//!     let input = AgentInput::text("Hello, Agent!");
38//!     let output = runner.execute(input).await?;
39//!
40//!     // 关闭
41//!     runner.shutdown().await?;
42//!     Ok(())
43//! }
44//! ```
45//!
46//! ## 事件循环模式
47//!
48//! ```rust,ignore
49//! use mofa_runtime::runner::AgentRunner;
50//! use mofa_runtime::agent::{MoFAAgent, AgentMessaging};
51//!
52//! struct MyEventAgent { }
53//!
54//! #[async_trait]
55//! impl MoFAAgent for MyEventAgent { /* ... */ }
56//!
57//! #[async_trait]
58//! impl AgentMessaging for MyEventAgent { /* ... */ }
59//!
60//! #[tokio::main]
61//! async fn main() -> AgentResult<()> {
62//!     let agent = MyEventAgent::new();
63//!     let mut runner = AgentRunner::new(agent).await?;
64//!
65//!     // 运行事件循环
66//!     runner.run_event_loop().await?;
67//!
68//!     Ok(())
69//! }
70//! ```
71
72use crate::agent::capabilities::AgentCapabilities;
73use crate::agent::context::{AgentContext, AgentEvent};
74use crate::agent::core::{AgentLifecycle, AgentMessage, AgentMessaging, MoFAAgent};
75use crate::agent::error::{AgentError, AgentResult};
76use crate::agent::types::{AgentInput, AgentOutput, AgentState, InterruptResult};
77use std::sync::Arc;
78use tokio::sync::RwLock;
79
/// Lifecycle state of an `AgentRunner`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RunnerState {
    /// The runner has been constructed.
    Created,
    /// Initialization is in progress.
    Initializing,
    /// The runner is running.
    Running,
    /// The runner is paused.
    Paused,
    /// The runner is in the process of stopping.
    Stopping,
    /// The runner has stopped.
    Stopped,
    /// The runner is in an error state.
    Error,
}
98
/// Execution statistics collected by an `AgentRunner`.
#[derive(Clone, Debug, Default)]
pub struct RunnerStats {
    /// Total number of executions, successful or not.
    pub total_executions: u64,
    /// Number of executions that returned `Ok`.
    pub successful_executions: u64,
    /// Number of executions that returned `Err`.
    pub failed_executions: u64,
    /// Mean execution time in milliseconds.
    pub avg_execution_time_ms: f64,
    /// Duration of the most recent execution in milliseconds, if any.
    pub last_execution_time_ms: Option<u64>,
}
113
114/// 统一 Agent 运行器
115///
116/// 可以运行任何实现了 `MoFAAgent` 的 Agent。
117pub struct AgentRunner<T: MoFAAgent> {
118    /// Agent 实例
119    agent: T,
120    /// 执行上下文
121    context: AgentContext,
122    /// 运行器状态
123    state: Arc<RwLock<RunnerState>>,
124    /// 统计信息
125    stats: Arc<RwLock<RunnerStats>>,
126}
127
128impl<T: MoFAAgent> AgentRunner<T> {
129    /// 创建新的运行器
130    ///
131    /// 此方法会初始化 Agent。
132    pub async fn new(mut agent: T) -> AgentResult<Self> {
133        let context = AgentContext::new(agent.id().to_string());
134
135        // 初始化 Agent
136        agent
137            .initialize(&context)
138            .await
139            .map_err(|e| AgentError::InitializationFailed(e.to_string()))?;
140
141        Ok(Self {
142            agent,
143            context,
144            state: Arc::new(RwLock::new(RunnerState::Created)),
145            stats: Arc::new(RwLock::new(RunnerStats::default())),
146        })
147    }
148
149    /// 使用自定义上下文创建运行器
150    pub async fn with_context(mut agent: T, context: AgentContext) -> AgentResult<Self> {
151        agent
152            .initialize(&context)
153            .await
154            .map_err(|e| AgentError::InitializationFailed(e.to_string()))?;
155
156        Ok(Self {
157            agent,
158            context,
159            state: Arc::new(RwLock::new(RunnerState::Created)),
160            stats: Arc::new(RwLock::new(RunnerStats::default())),
161        })
162    }
163
164    /// 获取 Agent 引用
165    pub fn agent(&self) -> &T {
166        &self.agent
167    }
168
169    /// 获取 Agent 可变引用
170    pub fn agent_mut(&mut self) -> &mut T {
171        &mut self.agent
172    }
173
174    /// 获取执行上下文
175    pub fn context(&self) -> &AgentContext {
176        &self.context
177    }
178
179    /// 获取运行器状态
180    pub async fn state(&self) -> RunnerState {
181        *self.state.read().await
182    }
183
184    /// 获取统计信息
185    pub async fn stats(&self) -> RunnerStats {
186        self.stats.read().await.clone()
187    }
188
189    /// 检查是否正在运行
190    pub async fn is_running(&self) -> bool {
191        matches!(
192            *self.state.read().await,
193            RunnerState::Running | RunnerState::Paused
194        )
195    }
196
197    /// 执行单个任务
198    ///
199    /// # 参数
200    ///
201    /// - `input`: 输入数据
202    ///
203    /// # 返回
204    ///
205    /// 返回 Agent 的输出。
206    pub async fn execute(&mut self, input: AgentInput) -> AgentResult<AgentOutput> {
207        // 检查状态
208        let current_state = self.state().await;
209        if !matches!(
210            current_state,
211            RunnerState::Running | RunnerState::Created | RunnerState::Stopped
212        ) {
213            return Err(AgentError::ValidationFailed(format!(
214                "Cannot execute in state: {:?}",
215                current_state
216            )));
217        }
218
219        // 更新状态为运行中
220        *self.state.write().await = RunnerState::Running;
221
222        let start = std::time::Instant::now();
223
224        // 执行 Agent
225        let result = self.agent.execute(input, &self.context).await;
226
227        let duration = start.elapsed().as_millis() as u64;
228
229        // 更新统计信息
230        let mut stats = self.stats.write().await;
231        stats.total_executions += 1;
232        stats.last_execution_time_ms = Some(duration);
233
234        match &result {
235            Ok(_) => {
236                stats.successful_executions += 1;
237            }
238            Err(_) => {
239                stats.failed_executions += 1;
240            }
241        }
242
243        // 更新平均执行时间
244        let n = stats.total_executions as f64;
245        stats.avg_execution_time_ms =
246            (stats.avg_execution_time_ms * (n - 1.0) + duration as f64) / n;
247
248        result
249    }
250
251    /// 批量执行多个任务
252    ///
253    /// # 参数
254    ///
255    /// - `inputs`: 输入数据列表
256    ///
257    /// # 返回
258    ///
259    /// 返回输出列表,如果某个任务失败,返回对应错误。
260    pub async fn execute_batch(
261        &mut self,
262        inputs: Vec<AgentInput>,
263    ) -> Vec<AgentResult<AgentOutput>> {
264        let mut results = Vec::with_capacity(inputs.len());
265        for input in inputs {
266            results.push(self.execute(input).await);
267        }
268        results
269    }
270
271    /// 暂停运行器
272    ///
273    /// 仅支持实现了 `AgentLifecycle` 的 Agent。
274    pub async fn pause(&mut self) -> AgentResult<()>
275    where
276        T: AgentLifecycle,
277    {
278        *self.state.write().await = RunnerState::Stopping;
279
280        self.agent
281            .pause()
282            .await
283            .map_err(|e| AgentError::Other(format!("Pause failed: {}", e)))?;
284
285        *self.state.write().await = RunnerState::Paused;
286        Ok(())
287    }
288
289    /// 恢复运行器
290    ///
291    /// 仅支持实现了 `AgentLifecycle` 的 Agent。
292    pub async fn resume(&mut self) -> AgentResult<()>
293    where
294        T: AgentLifecycle,
295    {
296        *self.state.write().await = RunnerState::Running;
297
298        self.agent
299            .resume()
300            .await
301            .map_err(|e| AgentError::Other(format!("Resume failed: {}", e)))?;
302
303        Ok(())
304    }
305
306    /// 关闭运行器
307    ///
308    /// 优雅关闭,释放资源。
309    pub async fn shutdown(mut self) -> AgentResult<()> {
310        *self.state.write().await = RunnerState::Stopping;
311
312        self.agent
313            .shutdown()
314            .await
315            .map_err(|e| AgentError::ShutdownFailed(e.to_string()))?;
316
317        *self.state.write().await = RunnerState::Stopped;
318        Ok(())
319    }
320
321    /// 中断当前执行
322    pub async fn interrupt(&mut self) -> AgentResult<InterruptResult> {
323        self.agent
324            .interrupt()
325            .await
326            .map_err(|e| AgentError::Other(format!("Interrupt failed: {}", e)))
327    }
328
329    /// 消耗运行器,返回内部 Agent
330    pub fn into_inner(self) -> T {
331        self.agent
332    }
333
334    /// 获取 Agent ID
335    pub fn id(&self) -> &str {
336        self.agent.id()
337    }
338
339    /// 获取 Agent 名称
340    pub fn name(&self) -> &str {
341        self.agent.name()
342    }
343
344    /// 获取 Agent 能力
345    pub fn capabilities(&self) -> &AgentCapabilities {
346        self.agent.capabilities()
347    }
348
349    /// 获取 Agent 状态
350    pub fn agent_state(&self) -> AgentState {
351        self.agent.state()
352    }
353}
354
355/// 为支持消息处理的 Agent 提供的扩展方法
356impl<T: MoFAAgent + AgentMessaging> AgentRunner<T> {
357    /// 处理单个事件
358    pub async fn handle_event(&mut self, event: AgentEvent) -> AgentResult<()> {
359        self.agent.handle_event(event).await
360    }
361
362    /// 发送消息给 Agent
363    pub async fn send_message(&mut self, msg: AgentMessage) -> AgentResult<AgentMessage> {
364        self.agent.handle_message(msg).await
365    }
366}
367
368// ============================================================================
369// 构建器模式
370// ============================================================================
371
372/// AgentRunner 构建器
373pub struct AgentRunnerBuilder<T: MoFAAgent> {
374    agent: Option<T>,
375    context: Option<AgentContext>,
376}
377
378impl<T: MoFAAgent> AgentRunnerBuilder<T> {
379    /// 创建新的构建器
380    pub fn new() -> Self {
381        Self {
382            agent: None,
383            context: None,
384        }
385    }
386
387    /// 设置 Agent
388    pub fn with_agent(mut self, agent: T) -> Self {
389        self.agent = Some(agent);
390        self
391    }
392
393    /// 设置上下文
394    pub fn with_context(mut self, context: AgentContext) -> Self {
395        self.context = Some(context);
396        self
397    }
398
399    /// 构建运行器
400    pub async fn build(self) -> AgentResult<AgentRunner<T>> {
401        let agent = self
402            .agent
403            .ok_or_else(|| AgentError::ValidationFailed("Agent not set".to_string()))?;
404
405        if let Some(context) = self.context {
406            AgentRunner::with_context(agent, context).await
407        } else {
408            AgentRunner::new(agent).await
409        }
410    }
411}
412
413impl<T: MoFAAgent> Default for AgentRunnerBuilder<T> {
414    fn default() -> Self {
415        Self::new()
416    }
417}
418
419// ============================================================================
420// 便捷函数
421// ============================================================================
422
423/// 创建并运行 Agent(多次执行)
424pub async fn run_agents<T: MoFAAgent>(
425    agent: T,
426    inputs: Vec<AgentInput>,
427) -> AgentResult<Vec<AgentOutput>> {
428    let mut runner = AgentRunner::new(agent).await?;
429    let results = runner.execute_batch(inputs).await;
430    runner.shutdown().await?;
431    results.into_iter().collect()
432}
433
434// ============================================================================
435// 测试
436// ============================================================================
437
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent::capabilities::AgentCapabilitiesBuilder;

    /// Minimal echo agent used to exercise the runner.
    struct TestAgent {
        id: String,
        name: String,
        state: AgentState,
    }

    impl TestAgent {
        /// Builds a test agent in the `Created` state.
        fn new(id: &str, name: &str) -> Self {
            let (id, name) = (id.to_string(), name.to_string());
            Self {
                id,
                name,
                state: AgentState::Created,
            }
        }
    }

    #[async_trait::async_trait]
    impl MoFAAgent for TestAgent {
        fn id(&self) -> &str {
            &self.id
        }

        fn name(&self) -> &str {
            &self.name
        }

        fn capabilities(&self) -> &AgentCapabilities {
            // Built once and shared by every test agent.
            static CAPS: std::sync::OnceLock<AgentCapabilities> = std::sync::OnceLock::new();
            CAPS.get_or_init(|| AgentCapabilitiesBuilder::new().build())
        }

        async fn initialize(&mut self, _ctx: &AgentContext) -> AgentResult<()> {
            self.state = AgentState::Ready;
            Ok(())
        }

        async fn execute(
            &mut self,
            input: AgentInput,
            _ctx: &AgentContext,
        ) -> AgentResult<AgentOutput> {
            self.state = AgentState::Executing;
            // Echo the input text back to the caller.
            let reply = format!("Echo: {}", input.to_text());
            Ok(AgentOutput::text(reply))
        }

        async fn shutdown(&mut self) -> AgentResult<()> {
            self.state = AgentState::Shutdown;
            Ok(())
        }

        fn state(&self) -> AgentState {
            self.state.clone()
        }
    }

    #[tokio::test]
    async fn test_agent_runner_new() {
        let runner = AgentRunner::new(TestAgent::new("test-001", "Test Agent"))
            .await
            .unwrap();

        assert_eq!(runner.id(), "test-001");
        assert_eq!(runner.name(), "Test Agent");
        // The runner reports `Created` once construction (including
        // `initialize`) has finished.
        assert_eq!(runner.state().await, RunnerState::Created);
    }

    #[tokio::test]
    async fn test_agent_runner_execute() {
        let mut runner = AgentRunner::new(TestAgent::new("test-002", "Test Agent"))
            .await
            .unwrap();

        let output = runner.execute(AgentInput::text("Hello")).await.unwrap();
        assert_eq!(output.to_text(), "Echo: Hello");

        let snapshot = runner.stats().await;
        assert_eq!(snapshot.total_executions, 1);
        assert_eq!(snapshot.successful_executions, 1);
    }

    #[tokio::test]
    async fn test_run_agents_function() {
        let agent = TestAgent::new("test-003", "Test Agent");
        let outputs = run_agents(agent, vec![AgentInput::text("Test")])
            .await
            .unwrap();

        assert_eq!(outputs[0].to_text(), "Echo: Test");
    }
}