use crate::config::{
ActionDefinition, ActionType, FlowControl, WorkflowConfig,
};
use crate::config_parser::YamlConfigParser;
use crate::expression::ExpressionEvaluator;
use anyhow::{Context, Result};
use flowbuilder_context::SharedContext;
use flowbuilder_core::{ExecutionPlan, Executor, ExecutorStatus};
use flowbuilder_runtime::{
EnhancedFlowOrchestrator, EnhancedTaskExecutor, ExecutionComplexity,
ExecutionResult, ExecutorConfig,
};
use std::sync::Arc;
use std::time::Duration;
pub struct DynamicFlowExecutor {
config: WorkflowConfig,
parser: YamlConfigParser,
orchestrator: EnhancedFlowOrchestrator,
executor: EnhancedTaskExecutor,
evaluator: ExpressionEvaluator,
}
impl DynamicFlowExecutor {
pub fn new(config: WorkflowConfig) -> Result<Self> {
let parser = YamlConfigParser::new(config.clone());
parser.validate().context("配置验证失败")?;
let orchestrator = EnhancedFlowOrchestrator::new();
let executor = EnhancedTaskExecutor::new();
let mut evaluator = ExpressionEvaluator::new();
evaluator.set_env_vars(config.workflow.env.clone());
evaluator.set_flow_vars(config.workflow.vars.clone());
Ok(Self {
config,
parser,
orchestrator,
executor,
evaluator,
})
}
pub fn with_executor_config(
config: WorkflowConfig,
executor_config: ExecutorConfig,
) -> Result<Self> {
let parser = YamlConfigParser::new(config.clone());
parser.validate().context("配置验证失败")?;
let orchestrator = EnhancedFlowOrchestrator::new();
let executor = EnhancedTaskExecutor::with_config(executor_config);
let mut evaluator = ExpressionEvaluator::new();
evaluator.set_env_vars(config.workflow.env.clone());
evaluator.set_flow_vars(config.workflow.vars.clone());
Ok(Self {
config,
parser,
orchestrator,
executor,
evaluator,
})
}
pub async fn execute(
&mut self,
context: SharedContext,
) -> Result<ExecutionResult> {
println!("开始执行工作流,使用新的分层架构");
let parse_result = self.parser.parse_full().context("配置解析失败")?;
println!("配置解析完成:");
println!(" 工作流名称: {}", parse_result.workflow_name);
println!(" 工作流版本: {}", parse_result.workflow_version);
println!(" 节点数量: {}", parse_result.nodes.len());
let execution_plan = self
.orchestrator
.create_execution_plan(
parse_result.nodes,
parse_result.env_vars,
parse_result.flow_vars,
parse_result.workflow_name,
parse_result.workflow_version,
)
.context("执行计划创建失败")?;
println!("执行计划生成完成:");
println!(" 总阶段数: {}", execution_plan.phases.len());
println!(" 总节点数: {}", execution_plan.metadata.total_nodes);
println!(" 预计耗时: {:?}", execution_plan.estimated_duration());
let complexity = self.orchestrator.analyze_complexity(&execution_plan);
println!("执行复杂度分析:");
println!(" 复杂度分数: {:.2}", complexity.complexity_score);
println!(" 最大并行度: {}", complexity.max_parallel_nodes);
println!(" 条件节点数: {}", complexity.conditional_nodes);
let result = self
.executor
.execute_plan(execution_plan, context)
.await
.context("任务执行失败")?;
println!("工作流执行完成:");
println!(
" 执行结果: {}",
if result.success { "成功" } else { "失败" }
);
println!(" 总耗时: {:?}", result.total_duration);
println!(" 阶段数: {}", result.phase_results.len());
let stats = self.executor.get_stats();
println!("执行统计:");
println!(" 总任务数: {}", stats.total_tasks);
println!(" 成功任务数: {}", stats.successful_tasks);
println!(" 失败任务数: {}", stats.failed_tasks);
println!(" 平均执行时间: {:?}", stats.average_execution_time);
Ok(result)
}
pub fn get_execution_plan_preview(&self) -> Result<ExecutionPlan> {
let parse_result = self.parser.parse_full().context("配置解析失败")?;
self.orchestrator.create_execution_plan(
parse_result.nodes,
parse_result.env_vars,
parse_result.flow_vars,
parse_result.workflow_name,
parse_result.workflow_version,
)
}
pub fn analyze_workflow_complexity(&self) -> Result<ExecutionComplexity> {
let execution_plan = self.get_execution_plan_preview()?;
Ok(self.orchestrator.analyze_complexity(&execution_plan))
}
pub fn validate_workflow(&self) -> Result<()> {
self.parser.validate()?;
let execution_plan = self.get_execution_plan_preview()?;
execution_plan
.validate()
.map_err(|e| anyhow::anyhow!("执行计划验证失败: {}", e))?;
Ok(())
}
pub fn get_workflow_info(&self) -> WorkflowInfo {
WorkflowInfo {
name: self.parser.get_workflow_name(),
version: self.parser.get_workflow_version(),
task_count: self.config.workflow.tasks.len(),
env_var_count: self.config.workflow.env.len(),
flow_var_count: self.config.workflow.vars.len(),
}
}
pub fn evaluator(&self) -> &ExpressionEvaluator {
&self.evaluator
}
pub fn config(&self) -> &WorkflowConfig {
&self.config
}
pub fn executor_status(&self) -> ExecutorStatus {
self.executor.status()
}
pub async fn stop(&mut self) -> Result<()> {
self.executor.stop().await
}
}
#[derive(Debug, Clone)]
pub struct WorkflowInfo {
pub name: String,
pub version: String,
pub task_count: usize,
pub env_var_count: usize,
pub flow_var_count: usize,
}
impl Executor for DynamicFlowExecutor {
type Input = SharedContext;
type Output = ExecutionResult;
type Error = anyhow::Error;
async fn execute(
&mut self,
input: Self::Input,
) -> Result<Self::Output, Self::Error> {
self.execute(input).await
}
fn status(&self) -> ExecutorStatus {
self.executor_status()
}
async fn stop(&mut self) -> Result<(), Self::Error> {
self.stop().await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::loader::WorkflowLoader;
use flowbuilder_context::FlowContext;
#[tokio::test]
async fn test_new_architecture_execution() {
let yaml_content = r#"
workflow:
version: "1.0"
env:
FLOWBUILDER_ENV: "test"
vars:
name: "New Architecture Test"
description: "Testing new layered architecture"
tasks:
- task:
id: "task1"
name: "Test Task 1"
description: "First test task"
actions:
- action:
id: "action1"
name: "Test Action 1"
description: "First test action"
type: "builtin"
flow:
next: null
outputs:
result: "success"
parameters: {}
- task:
id: "task2"
name: "Test Task 2"
description: "Second test task"
actions:
- action:
id: "action2"
name: "Test Action 2"
description: "Second test action"
type: "cmd"
flow:
next: null
outputs:
status: "completed"
parameters: {}
"#;
let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
let mut executor = DynamicFlowExecutor::new(config).unwrap();
let context = Arc::new(tokio::sync::Mutex::new(FlowContext::default()));
let result = executor.execute(context).await;
assert!(result.is_ok());
let execution_result = result.unwrap();
assert!(execution_result.success);
assert_eq!(execution_result.phase_results.len(), 1); }
#[tokio::test]
async fn test_execution_plan_preview() {
let yaml_content = r#"
workflow:
version: "1.0"
env:
TEST_ENV: "preview"
vars:
name: "Preview Test"
tasks:
- task:
id: "preview_task"
name: "Preview Task"
description: "Task for preview testing"
actions:
- action:
id: "preview_action"
name: "Preview Action"
description: "Action for preview"
type: "builtin"
flow:
next: null
outputs: {}
parameters: {}
"#;
let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
let executor = DynamicFlowExecutor::new(config).unwrap();
let plan = executor.get_execution_plan_preview().unwrap();
assert_eq!(plan.phases.len(), 1);
assert_eq!(plan.metadata.total_nodes, 1);
assert_eq!(plan.metadata.workflow_name, "Preview Test");
}
#[tokio::test]
async fn test_workflow_complexity_analysis() {
let yaml_content = r#"
workflow:
version: "1.0"
env:
TEST_ENV: "complexity"
vars:
name: "Complexity Test"
tasks:
- task:
id: "complex_task"
name: "Complex Task"
description: "A complex task for analysis"
actions:
- action:
id: "complex_action"
name: "Complex Action"
description: "A complex action"
type: "builtin"
flow:
next_if: "true"
next: null
outputs: {}
parameters: {}
"#;
let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
let executor = DynamicFlowExecutor::new(config).unwrap();
let complexity = executor.analyze_workflow_complexity().unwrap();
assert_eq!(complexity.total_nodes, 1);
assert_eq!(complexity.conditional_nodes, 1);
assert!(complexity.complexity_score > 0.0);
}
#[test]
fn test_workflow_info() {
let yaml_content = r#"
workflow:
version: "2.0"
env:
ENV1: "value1"
ENV2: "value2"
vars:
name: "Info Test Workflow"
var1: "value1"
tasks:
- task:
id: "info_task"
name: "Info Task"
description: "Task for info testing"
actions:
- action:
id: "info_action"
name: "Info Action"
description: "Action for info"
type: "builtin"
flow:
next: null
outputs: {}
parameters: {}
"#;
let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
let executor = DynamicFlowExecutor::new(config).unwrap();
let info = executor.get_workflow_info();
assert_eq!(info.name, "Info Test Workflow");
assert_eq!(info.version, "2.0");
assert_eq!(info.task_count, 1);
assert_eq!(info.env_var_count, 2);
assert_eq!(info.flow_var_count, 2);
}
}