flowbuilder_yaml/
executor.rs

1//! # FlowBuilder YAML - 统一的流程执行器
2//!
3//! 实现新的分层架构:配置解析器 → 流程编排器 → 任务执行器
4
5use crate::config::WorkflowConfig;
6use crate::config_parser::YamlConfigParser;
7use crate::expression::ExpressionEvaluator;
8use anyhow::{Context, Result};
9use flowbuilder_context::SharedContext;
10use flowbuilder_core::{ExecutionPlan, Executor, ExecutorStatus};
11use flowbuilder_runtime::{
12    EnhancedFlowOrchestrator, EnhancedTaskExecutor, ExecutionComplexity,
13    ExecutionResult, ExecutionStats, ExecutorConfig,
14};
15
16#[cfg(test)]
17use std::sync::Arc;
18
19/// 统一的动态流程执行器
20pub struct DynamicFlowExecutor {
21    /// 原始配置
22    config: WorkflowConfig,
23    /// 配置解析器
24    parser: YamlConfigParser,
25    /// 流程编排器
26    orchestrator: EnhancedFlowOrchestrator,
27    /// 任务执行器
28    executor: EnhancedTaskExecutor,
29    /// 表达式评估器
30    evaluator: ExpressionEvaluator,
31}
32
33impl DynamicFlowExecutor {
34    /// 创建新的动态流程执行器
35    pub fn new(config: WorkflowConfig) -> Result<Self> {
36        // 创建配置解析器
37        let parser = YamlConfigParser::new(config.clone());
38
39        // 验证配置
40        parser.validate().context("配置验证失败")?;
41
42        // 创建流程编排器
43        let orchestrator = EnhancedFlowOrchestrator::new();
44
45        // 创建任务执行器
46        let executor = EnhancedTaskExecutor::new();
47
48        // 创建表达式评估器
49        let mut evaluator = ExpressionEvaluator::new();
50        evaluator.set_env_vars(config.workflow.env.clone());
51        evaluator.set_flow_vars(config.workflow.vars.clone());
52
53        Ok(Self {
54            config,
55            parser,
56            orchestrator,
57            executor,
58            evaluator,
59        })
60    }
61
62    /// 使用自定义执行器配置创建
63    pub fn with_executor_config(
64        config: WorkflowConfig,
65        executor_config: ExecutorConfig,
66    ) -> Result<Self> {
67        let parser = YamlConfigParser::new(config.clone());
68        parser.validate().context("配置验证失败")?;
69
70        let orchestrator = EnhancedFlowOrchestrator::new();
71        let executor = EnhancedTaskExecutor::with_config(executor_config);
72
73        let mut evaluator = ExpressionEvaluator::new();
74        evaluator.set_env_vars(config.workflow.env.clone());
75        evaluator.set_flow_vars(config.workflow.vars.clone());
76
77        Ok(Self {
78            config,
79            parser,
80            orchestrator,
81            executor,
82            evaluator,
83        })
84    }
85
86    /// 执行工作流 - 新的分层架构实现
87    pub async fn execute(
88        &mut self,
89        context: SharedContext,
90    ) -> Result<ExecutionResult> {
91        println!("开始执行工作流,使用新的分层架构");
92
93        // 第1步:解析配置,生成执行节点
94        let parse_result = self.parser.parse_full().context("配置解析失败")?;
95
96        println!("配置解析完成:");
97        println!("  工作流名称: {}", parse_result.workflow_name);
98        println!("  工作流版本: {}", parse_result.workflow_version);
99        println!("  节点数量: {}", parse_result.nodes.len());
100
101        // 第2步:流程编排,生成执行计划
102        let env_vars = parse_result
103            .env_vars
104            .into_iter()
105            .map(|(k, v)| (k, serde_yaml::Value::String(v)))
106            .collect();
107
108        let execution_plan = self
109            .orchestrator
110            .create_execution_plan(
111                parse_result.nodes,
112                env_vars,
113                parse_result.flow_vars,
114                parse_result.workflow_name,
115                parse_result.workflow_version,
116            )
117            .context("执行计划创建失败")?;
118
119        println!("执行计划生成完成:");
120        println!("  总阶段数: {}", execution_plan.phases.len());
121        println!("  总节点数: {}", execution_plan.metadata.total_nodes);
122        println!("  预计耗时: {:?}", execution_plan.estimated_duration());
123
124        // 第3步:分析执行复杂度
125        let complexity = self.orchestrator.analyze_complexity(&execution_plan);
126        println!("执行复杂度分析:");
127        println!("  复杂度分数: {:.2}", complexity.complexity_score);
128        println!("  最大并行度: {}", complexity.max_parallel_nodes);
129        println!("  条件节点数: {}", complexity.conditional_nodes);
130
131        // 第4步:执行任务
132        let result = self
133            .executor
134            .execute_plan(execution_plan, context)
135            .await
136            .context("任务执行失败")?;
137
138        println!("工作流执行完成:");
139        println!(
140            "  执行结果: {}",
141            if result.success { "成功" } else { "失败" }
142        );
143        println!("  总耗时: {:?}", result.total_duration);
144        println!("  阶段数: {}", result.phase_results.len());
145
146        // 打印执行统计
147        let stats = self.executor.get_stats();
148        println!("执行统计:");
149        println!("  总任务数: {}", stats.total_tasks);
150        println!("  成功任务数: {}", stats.successful_tasks);
151        println!("  失败任务数: {}", stats.failed_tasks);
152        println!("  平均执行时间: {:?}", stats.average_execution_time);
153
154        Ok(result)
155    }
156
157    /// 获取执行计划预览(不执行)
158    pub fn get_execution_plan_preview(&self) -> Result<ExecutionPlan> {
159        let parse_result = self.parser.parse_full().context("配置解析失败")?;
160
161        let env_vars = parse_result
162            .env_vars
163            .into_iter()
164            .map(|(k, v)| (k, serde_yaml::Value::String(v)))
165            .collect();
166
167        self.orchestrator.create_execution_plan(
168            parse_result.nodes,
169            env_vars,
170            parse_result.flow_vars,
171            parse_result.workflow_name,
172            parse_result.workflow_version,
173        )
174    }
175
176    /// 分析工作流复杂度
177    pub fn analyze_workflow_complexity(&self) -> Result<ExecutionComplexity> {
178        let execution_plan = self.get_execution_plan_preview()?;
179        Ok(self.orchestrator.analyze_complexity(&execution_plan))
180    }
181
182    /// 验证工作流配置
183    pub fn validate_workflow(&self) -> Result<()> {
184        // 验证配置
185        self.parser.validate()?;
186
187        // 验证执行计划
188        let execution_plan = self.get_execution_plan_preview()?;
189        execution_plan
190            .validate()
191            .map_err(|e| anyhow::anyhow!("执行计划验证失败: {}", e))?;
192
193        Ok(())
194    }
195
196    /// 获取执行统计信息
197    pub fn get_stats(&self) -> &ExecutionStats {
198        self.executor.get_stats()
199    }
200
201    /// 获取工作流信息
202    pub fn get_workflow_info(&self) -> WorkflowInfo {
203        WorkflowInfo {
204            name: self.parser.get_workflow_name(),
205            version: self.parser.get_workflow_version(),
206            task_count: self.config.workflow.tasks.len(),
207            env_var_count: self.config.workflow.env.len(),
208            flow_var_count: self.config.workflow.vars.len(),
209        }
210    }
211
212    /// 获取表达式评估器
213    pub fn evaluator(&self) -> &ExpressionEvaluator {
214        &self.evaluator
215    }
216
217    /// 获取工作流配置
218    pub fn config(&self) -> &WorkflowConfig {
219        &self.config
220    }
221
222    /// 获取执行器状态
223    pub fn executor_status(&self) -> ExecutorStatus {
224        self.executor.status()
225    }
226
227    /// 停止执行器
228    pub async fn stop(&mut self) -> Result<()> {
229        self.executor.stop().await
230    }
231}
232
/// Summary information about a workflow.
///
/// A plain data record; it derives `PartialEq`/`Eq` so values can be compared
/// directly (for example with `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkflowInfo {
    /// Workflow name.
    pub name: String,
    /// Workflow version.
    pub version: String,
    /// Number of tasks in the workflow.
    pub task_count: usize,
    /// Number of environment variables defined by the workflow.
    pub env_var_count: usize,
    /// Number of flow variables defined by the workflow.
    pub flow_var_count: usize,
}
247
248impl Executor for DynamicFlowExecutor {
249    type Input = SharedContext;
250    type Output = ExecutionResult;
251    type Error = anyhow::Error;
252
253    async fn execute(
254        &mut self,
255        input: Self::Input,
256    ) -> Result<Self::Output, Self::Error> {
257        self.execute(input).await
258    }
259
260    fn status(&self) -> ExecutorStatus {
261        self.executor_status()
262    }
263
264    async fn stop(&mut self) -> Result<(), Self::Error> {
265        self.stop().await
266    }
267}
268
269#[cfg(test)]
270mod tests {
271    use super::*;
272    use crate::loader::WorkflowLoader;
273    use flowbuilder_context::FlowContext;
274
275    #[tokio::test]
276    async fn test_new_architecture_execution() {
277        let yaml_content = r#"
278workflow:
279  version: "1.0"
280  env:
281    FLOWBUILDER_ENV: "test"
282  vars:
283    name: "New Architecture Test"
284    description: "Testing new layered architecture"
285  tasks:
286    - task:
287        id: "task1"
288        name: "Test Task 1"
289        description: "First test task"
290        actions:
291          - action:
292              id: "action1"
293              name: "Test Action 1"
294              description: "First test action"
295              type: "builtin"
296              flow:
297                next: null
298              outputs:
299                result: "success"
300              parameters: {}
301    - task:
302        id: "task2"
303        name: "Test Task 2"
304        description: "Second test task"
305        actions:
306          - action:
307              id: "action2"
308              name: "Test Action 2"
309              description: "Second test action"
310              type: "cmd"
311              flow:
312                next: null
313              outputs:
314                status: "completed"
315              parameters: {}
316"#;
317
318        let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
319        let mut executor = DynamicFlowExecutor::new(config).unwrap();
320
321        let context = Arc::new(tokio::sync::Mutex::new(FlowContext::default()));
322        let result = executor.execute(context).await;
323
324        assert!(result.is_ok());
325        let execution_result = result.unwrap();
326        assert!(execution_result.success);
327        assert_eq!(execution_result.phase_results.len(), 1); // 两个任务应该在同一阶段
328    }
329
330    #[tokio::test]
331    async fn test_execution_plan_preview() {
332        let yaml_content = r#"
333workflow:
334  version: "1.0"
335  env:
336    TEST_ENV: "preview"
337  vars:
338    name: "Preview Test"
339  tasks:
340    - task:
341        id: "preview_task"
342        name: "Preview Task"
343        description: "Task for preview testing"
344        actions:
345          - action:
346              id: "preview_action"
347              name: "Preview Action"
348              description: "Action for preview"
349              type: "builtin"
350              flow:
351                next: null
352              outputs: {}
353              parameters: {}
354"#;
355
356        let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
357        let executor = DynamicFlowExecutor::new(config).unwrap();
358
359        let plan = executor.get_execution_plan_preview().unwrap();
360        assert_eq!(plan.phases.len(), 1);
361        assert_eq!(plan.metadata.total_nodes, 1);
362        assert_eq!(plan.metadata.workflow_name, "Preview Test");
363    }
364
365    #[tokio::test]
366    async fn test_workflow_complexity_analysis() {
367        let yaml_content = r#"
368workflow:
369  version: "1.0"
370  env:
371    TEST_ENV: "complexity"
372  vars:
373    name: "Complexity Test"
374  tasks:
375    - task:
376        id: "complex_task"
377        name: "Complex Task"
378        description: "A complex task for analysis"
379        actions:
380          - action:
381              id: "complex_action"
382              name: "Complex Action"
383              description: "A complex action"
384              type: "builtin"
385              flow:
386                next_if: "true"
387                next: null
388              outputs: {}
389              parameters: {}
390"#;
391
392        let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
393        let executor = DynamicFlowExecutor::new(config).unwrap();
394
395        let complexity = executor.analyze_workflow_complexity().unwrap();
396        assert_eq!(complexity.total_nodes, 1);
397        assert_eq!(complexity.conditional_nodes, 1);
398        assert!(complexity.complexity_score > 0.0);
399    }
400
401    #[test]
402    fn test_workflow_info() {
403        let yaml_content = r#"
404workflow:
405  version: "2.0"
406  env:
407    ENV1: "value1"
408    ENV2: "value2"
409  vars:
410    name: "Info Test Workflow"
411    var1: "value1"
412  tasks:
413    - task:
414        id: "info_task"
415        name: "Info Task"
416        description: "Task for info testing"
417        actions:
418          - action:
419              id: "info_action"
420              name: "Info Action"
421              description: "Action for info"
422              type: "builtin"
423              flow:
424                next: null
425              outputs: {}
426              parameters: {}
427"#;
428
429        let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
430        let executor = DynamicFlowExecutor::new(config).unwrap();
431
432        let info = executor.get_workflow_info();
433        assert_eq!(info.name, "Info Test Workflow");
434        assert_eq!(info.version, "2.0");
435        assert_eq!(info.task_count, 1);
436        assert_eq!(info.env_var_count, 2);
437        assert_eq!(info.flow_var_count, 2);
438    }
439}