1use crate::config::WorkflowConfig;
6use crate::config_parser::YamlConfigParser;
7use crate::expression::ExpressionEvaluator;
8use anyhow::{Context, Result};
9use flowbuilder_context::SharedContext;
10use flowbuilder_core::{ExecutionPlan, Executor, ExecutorStatus};
11use flowbuilder_runtime::{
12 EnhancedFlowOrchestrator, EnhancedTaskExecutor, ExecutionComplexity,
13 ExecutionResult, ExecutionStats, ExecutorConfig,
14};
15
16#[cfg(test)]
17use std::sync::Arc;
18
19pub struct DynamicFlowExecutor {
21 config: WorkflowConfig,
23 parser: YamlConfigParser,
25 orchestrator: EnhancedFlowOrchestrator,
27 executor: EnhancedTaskExecutor,
29 evaluator: ExpressionEvaluator,
31}
32
33impl DynamicFlowExecutor {
34 pub fn new(config: WorkflowConfig) -> Result<Self> {
36 let parser = YamlConfigParser::new(config.clone());
38
39 parser.validate().context("配置验证失败")?;
41
42 let orchestrator = EnhancedFlowOrchestrator::new();
44
45 let executor = EnhancedTaskExecutor::new();
47
48 let mut evaluator = ExpressionEvaluator::new();
50 evaluator.set_env_vars(config.workflow.env.clone());
51 evaluator.set_flow_vars(config.workflow.vars.clone());
52
53 Ok(Self {
54 config,
55 parser,
56 orchestrator,
57 executor,
58 evaluator,
59 })
60 }
61
62 pub fn with_executor_config(
64 config: WorkflowConfig,
65 executor_config: ExecutorConfig,
66 ) -> Result<Self> {
67 let parser = YamlConfigParser::new(config.clone());
68 parser.validate().context("配置验证失败")?;
69
70 let orchestrator = EnhancedFlowOrchestrator::new();
71 let executor = EnhancedTaskExecutor::with_config(executor_config);
72
73 let mut evaluator = ExpressionEvaluator::new();
74 evaluator.set_env_vars(config.workflow.env.clone());
75 evaluator.set_flow_vars(config.workflow.vars.clone());
76
77 Ok(Self {
78 config,
79 parser,
80 orchestrator,
81 executor,
82 evaluator,
83 })
84 }
85
86 pub async fn execute(
88 &mut self,
89 context: SharedContext,
90 ) -> Result<ExecutionResult> {
91 println!("开始执行工作流,使用新的分层架构");
92
93 let parse_result = self.parser.parse_full().context("配置解析失败")?;
95
96 println!("配置解析完成:");
97 println!(" 工作流名称: {}", parse_result.workflow_name);
98 println!(" 工作流版本: {}", parse_result.workflow_version);
99 println!(" 节点数量: {}", parse_result.nodes.len());
100
101 let env_vars = parse_result
103 .env_vars
104 .into_iter()
105 .map(|(k, v)| (k, serde_yaml::Value::String(v)))
106 .collect();
107
108 let execution_plan = self
109 .orchestrator
110 .create_execution_plan(
111 parse_result.nodes,
112 env_vars,
113 parse_result.flow_vars,
114 parse_result.workflow_name,
115 parse_result.workflow_version,
116 )
117 .context("执行计划创建失败")?;
118
119 println!("执行计划生成完成:");
120 println!(" 总阶段数: {}", execution_plan.phases.len());
121 println!(" 总节点数: {}", execution_plan.metadata.total_nodes);
122 println!(" 预计耗时: {:?}", execution_plan.estimated_duration());
123
124 let complexity = self.orchestrator.analyze_complexity(&execution_plan);
126 println!("执行复杂度分析:");
127 println!(" 复杂度分数: {:.2}", complexity.complexity_score);
128 println!(" 最大并行度: {}", complexity.max_parallel_nodes);
129 println!(" 条件节点数: {}", complexity.conditional_nodes);
130
131 let result = self
133 .executor
134 .execute_plan(execution_plan, context)
135 .await
136 .context("任务执行失败")?;
137
138 println!("工作流执行完成:");
139 println!(
140 " 执行结果: {}",
141 if result.success { "成功" } else { "失败" }
142 );
143 println!(" 总耗时: {:?}", result.total_duration);
144 println!(" 阶段数: {}", result.phase_results.len());
145
146 let stats = self.executor.get_stats();
148 println!("执行统计:");
149 println!(" 总任务数: {}", stats.total_tasks);
150 println!(" 成功任务数: {}", stats.successful_tasks);
151 println!(" 失败任务数: {}", stats.failed_tasks);
152 println!(" 平均执行时间: {:?}", stats.average_execution_time);
153
154 Ok(result)
155 }
156
157 pub fn get_execution_plan_preview(&self) -> Result<ExecutionPlan> {
159 let parse_result = self.parser.parse_full().context("配置解析失败")?;
160
161 let env_vars = parse_result
162 .env_vars
163 .into_iter()
164 .map(|(k, v)| (k, serde_yaml::Value::String(v)))
165 .collect();
166
167 self.orchestrator.create_execution_plan(
168 parse_result.nodes,
169 env_vars,
170 parse_result.flow_vars,
171 parse_result.workflow_name,
172 parse_result.workflow_version,
173 )
174 }
175
176 pub fn analyze_workflow_complexity(&self) -> Result<ExecutionComplexity> {
178 let execution_plan = self.get_execution_plan_preview()?;
179 Ok(self.orchestrator.analyze_complexity(&execution_plan))
180 }
181
182 pub fn validate_workflow(&self) -> Result<()> {
184 self.parser.validate()?;
186
187 let execution_plan = self.get_execution_plan_preview()?;
189 execution_plan
190 .validate()
191 .map_err(|e| anyhow::anyhow!("执行计划验证失败: {}", e))?;
192
193 Ok(())
194 }
195
196 pub fn get_stats(&self) -> &ExecutionStats {
198 self.executor.get_stats()
199 }
200
201 pub fn get_workflow_info(&self) -> WorkflowInfo {
203 WorkflowInfo {
204 name: self.parser.get_workflow_name(),
205 version: self.parser.get_workflow_version(),
206 task_count: self.config.workflow.tasks.len(),
207 env_var_count: self.config.workflow.env.len(),
208 flow_var_count: self.config.workflow.vars.len(),
209 }
210 }
211
212 pub fn evaluator(&self) -> &ExpressionEvaluator {
214 &self.evaluator
215 }
216
217 pub fn config(&self) -> &WorkflowConfig {
219 &self.config
220 }
221
222 pub fn executor_status(&self) -> ExecutorStatus {
224 self.executor.status()
225 }
226
227 pub async fn stop(&mut self) -> Result<()> {
229 self.executor.stop().await
230 }
231}
232
233#[derive(Debug, Clone)]
235pub struct WorkflowInfo {
236 pub name: String,
238 pub version: String,
240 pub task_count: usize,
242 pub env_var_count: usize,
244 pub flow_var_count: usize,
246}
247
248impl Executor for DynamicFlowExecutor {
249 type Input = SharedContext;
250 type Output = ExecutionResult;
251 type Error = anyhow::Error;
252
253 async fn execute(
254 &mut self,
255 input: Self::Input,
256 ) -> Result<Self::Output, Self::Error> {
257 self.execute(input).await
258 }
259
260 fn status(&self) -> ExecutorStatus {
261 self.executor_status()
262 }
263
264 async fn stop(&mut self) -> Result<(), Self::Error> {
265 self.stop().await
266 }
267}
268
269#[cfg(test)]
270mod tests {
271 use super::*;
272 use crate::loader::WorkflowLoader;
273 use flowbuilder_context::FlowContext;
274
275 #[tokio::test]
276 async fn test_new_architecture_execution() {
277 let yaml_content = r#"
278workflow:
279 version: "1.0"
280 env:
281 FLOWBUILDER_ENV: "test"
282 vars:
283 name: "New Architecture Test"
284 description: "Testing new layered architecture"
285 tasks:
286 - task:
287 id: "task1"
288 name: "Test Task 1"
289 description: "First test task"
290 actions:
291 - action:
292 id: "action1"
293 name: "Test Action 1"
294 description: "First test action"
295 type: "builtin"
296 flow:
297 next: null
298 outputs:
299 result: "success"
300 parameters: {}
301 - task:
302 id: "task2"
303 name: "Test Task 2"
304 description: "Second test task"
305 actions:
306 - action:
307 id: "action2"
308 name: "Test Action 2"
309 description: "Second test action"
310 type: "cmd"
311 flow:
312 next: null
313 outputs:
314 status: "completed"
315 parameters: {}
316"#;
317
318 let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
319 let mut executor = DynamicFlowExecutor::new(config).unwrap();
320
321 let context = Arc::new(tokio::sync::Mutex::new(FlowContext::default()));
322 let result = executor.execute(context).await;
323
324 assert!(result.is_ok());
325 let execution_result = result.unwrap();
326 assert!(execution_result.success);
327 assert_eq!(execution_result.phase_results.len(), 1); }
329
330 #[tokio::test]
331 async fn test_execution_plan_preview() {
332 let yaml_content = r#"
333workflow:
334 version: "1.0"
335 env:
336 TEST_ENV: "preview"
337 vars:
338 name: "Preview Test"
339 tasks:
340 - task:
341 id: "preview_task"
342 name: "Preview Task"
343 description: "Task for preview testing"
344 actions:
345 - action:
346 id: "preview_action"
347 name: "Preview Action"
348 description: "Action for preview"
349 type: "builtin"
350 flow:
351 next: null
352 outputs: {}
353 parameters: {}
354"#;
355
356 let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
357 let executor = DynamicFlowExecutor::new(config).unwrap();
358
359 let plan = executor.get_execution_plan_preview().unwrap();
360 assert_eq!(plan.phases.len(), 1);
361 assert_eq!(plan.metadata.total_nodes, 1);
362 assert_eq!(plan.metadata.workflow_name, "Preview Test");
363 }
364
365 #[tokio::test]
366 async fn test_workflow_complexity_analysis() {
367 let yaml_content = r#"
368workflow:
369 version: "1.0"
370 env:
371 TEST_ENV: "complexity"
372 vars:
373 name: "Complexity Test"
374 tasks:
375 - task:
376 id: "complex_task"
377 name: "Complex Task"
378 description: "A complex task for analysis"
379 actions:
380 - action:
381 id: "complex_action"
382 name: "Complex Action"
383 description: "A complex action"
384 type: "builtin"
385 flow:
386 next_if: "true"
387 next: null
388 outputs: {}
389 parameters: {}
390"#;
391
392 let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
393 let executor = DynamicFlowExecutor::new(config).unwrap();
394
395 let complexity = executor.analyze_workflow_complexity().unwrap();
396 assert_eq!(complexity.total_nodes, 1);
397 assert_eq!(complexity.conditional_nodes, 1);
398 assert!(complexity.complexity_score > 0.0);
399 }
400
401 #[test]
402 fn test_workflow_info() {
403 let yaml_content = r#"
404workflow:
405 version: "2.0"
406 env:
407 ENV1: "value1"
408 ENV2: "value2"
409 vars:
410 name: "Info Test Workflow"
411 var1: "value1"
412 tasks:
413 - task:
414 id: "info_task"
415 name: "Info Task"
416 description: "Task for info testing"
417 actions:
418 - action:
419 id: "info_action"
420 name: "Info Action"
421 description: "Action for info"
422 type: "builtin"
423 flow:
424 next: null
425 outputs: {}
426 parameters: {}
427"#;
428
429 let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
430 let executor = DynamicFlowExecutor::new(config).unwrap();
431
432 let info = executor.get_workflow_info();
433 assert_eq!(info.name, "Info Test Workflow");
434 assert_eq!(info.version, "2.0");
435 assert_eq!(info.task_count, 1);
436 assert_eq!(info.env_var_count, 2);
437 assert_eq!(info.flow_var_count, 2);
438 }
439}