flowbuilder_runtime/
enhanced_orchestrator.rs

1//! # FlowBuilder Runtime - 增强的流程编排器
2//!
3//! 基于执行计划的流程编排器,负责生成和优化执行计划
4
5use anyhow::Result;
6use flowbuilder_core::{
7    ExecutionNode, ExecutionPhase, ExecutionPlan, FlowPlanner,
8    PhaseExecutionMode,
9};
10use std::collections::HashMap;
11
12/// 增强的流程编排器
13pub struct EnhancedFlowOrchestrator {
14    /// 优化配置
15    config: OrchestratorConfig,
16}
17
18/// 编排器配置
19#[derive(Debug, Clone)]
20pub struct OrchestratorConfig {
21    /// 是否启用并行优化
22    pub enable_parallel_optimization: bool,
23    /// 最大并行度
24    pub max_parallelism: usize,
25    /// 是否启用依赖分析
26    pub enable_dependency_analysis: bool,
27    /// 是否启用条件优化
28    pub enable_condition_optimization: bool,
29}
30
31impl Default for OrchestratorConfig {
32    fn default() -> Self {
33        Self {
34            enable_parallel_optimization: true,
35            max_parallelism: 10,
36            enable_dependency_analysis: true,
37            enable_condition_optimization: true,
38        }
39    }
40}
41
42impl Default for EnhancedFlowOrchestrator {
43    fn default() -> Self {
44        Self::new()
45    }
46}
47
48impl EnhancedFlowOrchestrator {
49    /// 创建新的编排器
50    pub fn new() -> Self {
51        Self {
52            config: OrchestratorConfig::default(),
53        }
54    }
55
56    /// 使用配置创建编排器
57    pub fn with_config(config: OrchestratorConfig) -> Self {
58        Self { config }
59    }
60
61    /// 从节点列表创建执行计划
62    pub fn create_execution_plan(
63        &self,
64        nodes: Vec<ExecutionNode>,
65        env_vars: HashMap<String, serde_yaml::Value>,
66        flow_vars: HashMap<String, serde_yaml::Value>,
67        workflow_name: String,
68        workflow_version: String,
69    ) -> Result<ExecutionPlan> {
70        let mut plan = ExecutionPlan::new(
71            workflow_name,
72            workflow_version,
73            env_vars,
74            flow_vars,
75        );
76
77        // 1. 构建依赖图
78        let dependency_graph = self.build_dependency_graph(&nodes)?;
79
80        // 2. 执行拓扑排序
81        let sorted_layers = self.topological_sort(&nodes, &dependency_graph)?;
82
83        // 3. 生成执行阶段
84        let phases = self.create_execution_phases(sorted_layers)?;
85
86        // 4. 添加阶段到计划
87        for phase in phases {
88            plan.add_phase(phase);
89        }
90
91        // 5. 优化计划
92        if self.config.enable_parallel_optimization {
93            self.optimize_for_parallelism(&mut plan)?;
94        }
95
96        // 6. 验证计划
97        plan.validate()
98            .map_err(|e| anyhow::anyhow!("执行计划验证失败: {}", e))?;
99
100        Ok(plan)
101    }
102
103    /// 构建依赖图
104    fn build_dependency_graph(
105        &self,
106        nodes: &[ExecutionNode],
107    ) -> Result<HashMap<String, Vec<String>>> {
108        let mut graph = HashMap::new();
109
110        for node in nodes {
111            graph.insert(node.id.clone(), node.dependencies.clone());
112        }
113
114        // 验证依赖的有效性
115        for (node_id, deps) in &graph {
116            for dep in deps {
117                if !graph.contains_key(dep) {
118                    return Err(anyhow::anyhow!(
119                        "节点 {} 依赖的节点 {} 不存在",
120                        node_id,
121                        dep
122                    ));
123                }
124            }
125        }
126
127        Ok(graph)
128    }
129
130    /// 拓扑排序,生成执行层次
131    fn topological_sort(
132        &self,
133        nodes: &[ExecutionNode],
134        _graph: &HashMap<String, Vec<String>>,
135    ) -> Result<Vec<Vec<ExecutionNode>>> {
136        let mut layers = Vec::new();
137        let mut remaining_nodes: HashMap<String, ExecutionNode> =
138            nodes.iter().map(|n| (n.id.clone(), n.clone())).collect();
139        let mut in_degree = HashMap::new();
140
141        // 计算入度
142        for node in nodes {
143            in_degree.insert(node.id.clone(), node.dependencies.len());
144        }
145
146        // 分层处理
147        while !remaining_nodes.is_empty() {
148            let mut current_layer = Vec::new();
149
150            // 找出当前层可以执行的节点(入度为0)
151            let ready_nodes: Vec<String> = in_degree
152                .iter()
153                .filter(|(_, &degree)| degree == 0)
154                .map(|(id, _)| id.clone())
155                .collect();
156
157            if ready_nodes.is_empty() {
158                return Err(anyhow::anyhow!("检测到循环依赖"));
159            }
160
161            // 添加到当前层
162            for node_id in ready_nodes {
163                if let Some(node) = remaining_nodes.remove(&node_id) {
164                    current_layer.push(node);
165                    in_degree.remove(&node_id);
166                }
167            }
168
169            // 更新剩余节点的入度
170            for node in &current_layer {
171                for other_node in remaining_nodes.values() {
172                    if other_node.dependencies.contains(&node.id) {
173                        if let Some(degree) = in_degree.get_mut(&other_node.id)
174                        {
175                            *degree -= 1;
176                        }
177                    }
178                }
179            }
180
181            layers.push(current_layer);
182        }
183
184        Ok(layers)
185    }
186
187    /// 创建执行阶段
188    fn create_execution_phases(
189        &self,
190        layers: Vec<Vec<ExecutionNode>>,
191    ) -> Result<Vec<ExecutionPhase>> {
192        let mut phases = Vec::new();
193
194        for (index, layer) in layers.into_iter().enumerate() {
195            let execution_mode = if layer.len() == 1 {
196                PhaseExecutionMode::Sequential
197            } else if layer.len() <= self.config.max_parallelism {
198                PhaseExecutionMode::Parallel
199            } else {
200                // 如果节点数超过最大并行度,分批处理
201                PhaseExecutionMode::Parallel
202            };
203
204            let phase = ExecutionPhase {
205                id: format!("phase_{index}"),
206                name: format!("执行阶段 {}", index + 1),
207                execution_mode,
208                nodes: layer,
209                condition: None,
210            };
211
212            phases.push(phase);
213        }
214
215        Ok(phases)
216    }
217
218    /// 优化并行执行
219    fn optimize_for_parallelism(&self, plan: &mut ExecutionPlan) -> Result<()> {
220        if !self.config.enable_parallel_optimization {
221            return Ok(());
222        }
223
224        // 分析每个阶段的并行可能性
225        for phase in &mut plan.phases {
226            if phase.nodes.len() > self.config.max_parallelism {
227                // 如果节点数量超过最大并行度,需要分批
228                let chunks: Vec<Vec<ExecutionNode>> = phase
229                    .nodes
230                    .chunks(self.config.max_parallelism)
231                    .map(|chunk| chunk.to_vec())
232                    .collect();
233
234                // 创建子阶段
235                let mut sub_phases = Vec::new();
236                for (i, chunk) in chunks.into_iter().enumerate() {
237                    let sub_phase = ExecutionPhase {
238                        id: format!("{}_sub_{}", phase.id, i),
239                        name: format!("{} - 子阶段 {}", phase.name, i + 1),
240                        execution_mode: PhaseExecutionMode::Parallel,
241                        nodes: chunk,
242                        condition: phase.condition.clone(),
243                    };
244                    sub_phases.push(sub_phase);
245                }
246
247                // 注意:这里需要重新构建计划结构来支持子阶段
248                // 为了简化,这里只是记录优化信息
249                println!(
250                    "阶段 {} 被优化为 {} 个子阶段",
251                    phase.name,
252                    sub_phases.len()
253                );
254            }
255        }
256
257        Ok(())
258    }
259
260    /// 分析执行计划的复杂度
261    pub fn analyze_complexity(
262        &self,
263        plan: &ExecutionPlan,
264    ) -> ExecutionComplexity {
265        let mut total_nodes = 0;
266        let mut max_parallel_nodes = 0;
267        let mut total_dependencies = 0;
268        let mut conditional_nodes = 0;
269
270        for phase in &plan.phases {
271            total_nodes += phase.nodes.len();
272
273            if matches!(phase.execution_mode, PhaseExecutionMode::Parallel) {
274                max_parallel_nodes = max_parallel_nodes.max(phase.nodes.len());
275            }
276
277            for node in &phase.nodes {
278                total_dependencies += node.dependencies.len();
279                if node.condition.is_some() {
280                    conditional_nodes += 1;
281                }
282            }
283        }
284
285        ExecutionComplexity {
286            total_nodes,
287            total_phases: plan.phases.len(),
288            max_parallel_nodes,
289            total_dependencies,
290            conditional_nodes,
291            complexity_score: self.calculate_complexity_score(
292                total_nodes,
293                total_dependencies,
294                conditional_nodes,
295                max_parallel_nodes,
296            ),
297        }
298    }
299
300    /// 计算复杂度分数
301    fn calculate_complexity_score(
302        &self,
303        total_nodes: usize,
304        total_dependencies: usize,
305        conditional_nodes: usize,
306        max_parallel_nodes: usize,
307    ) -> f64 {
308        let base_score = total_nodes as f64;
309        let dependency_penalty = (total_dependencies as f64) * 0.5;
310        let condition_penalty = (conditional_nodes as f64) * 0.3;
311        let parallel_bonus = (max_parallel_nodes as f64) * 0.2;
312
313        base_score + dependency_penalty + condition_penalty - parallel_bonus
314    }
315}
316
317/// 执行复杂度分析结果
318#[derive(Debug, Clone)]
319pub struct ExecutionComplexity {
320    /// 总节点数
321    pub total_nodes: usize,
322    /// 总阶段数
323    pub total_phases: usize,
324    /// 最大并行节点数
325    pub max_parallel_nodes: usize,
326    /// 总依赖关系数
327    pub total_dependencies: usize,
328    /// 条件节点数
329    pub conditional_nodes: usize,
330    /// 复杂度分数
331    pub complexity_score: f64,
332}
333
334impl FlowPlanner for EnhancedFlowOrchestrator {
335    type Input = (
336        Vec<ExecutionNode>,
337        HashMap<String, serde_yaml::Value>,
338        HashMap<String, serde_yaml::Value>,
339        String,
340        String,
341    );
342    type Output = ExecutionPlan;
343    type Error = anyhow::Error;
344
345    fn create_execution_plan(
346        &self,
347        input: Self::Input,
348    ) -> Result<Self::Output, Self::Error> {
349        let (nodes, env_vars, flow_vars, workflow_name, workflow_version) =
350            input;
351        self.create_execution_plan(
352            nodes,
353            env_vars,
354            flow_vars,
355            workflow_name,
356            workflow_version,
357        )
358    }
359
360    fn optimize_plan(
361        &self,
362        mut plan: Self::Output,
363    ) -> Result<Self::Output, Self::Error> {
364        self.optimize_for_parallelism(&mut plan)?;
365        Ok(plan)
366    }
367}
368
369#[cfg(test)]
370mod tests {
371    use super::*;
372    use flowbuilder_core::{ActionSpec, ExecutionNode};
373
374    #[test]
375    fn test_orchestrator_creation() {
376        let orchestrator = EnhancedFlowOrchestrator::new();
377        assert!(orchestrator.config.enable_parallel_optimization);
378    }
379
380    #[test]
381    fn test_dependency_graph_build() {
382        let orchestrator = EnhancedFlowOrchestrator::new();
383
384        let node1 = ExecutionNode::new(
385            "node1".to_string(),
386            "Node 1".to_string(),
387            ActionSpec {
388                action_type: "test".to_string(),
389                parameters: HashMap::new(),
390                outputs: HashMap::new(),
391            },
392        );
393
394        let node2 = ExecutionNode::new(
395            "node2".to_string(),
396            "Node 2".to_string(),
397            ActionSpec {
398                action_type: "test".to_string(),
399                parameters: HashMap::new(),
400                outputs: HashMap::new(),
401            },
402        )
403        .add_dependency("node1".to_string());
404
405        let nodes = vec![node1, node2];
406        let graph = orchestrator.build_dependency_graph(&nodes).unwrap();
407
408        assert_eq!(graph.len(), 2);
409        assert_eq!(graph.get("node1").unwrap().len(), 0);
410        assert_eq!(graph.get("node2").unwrap().len(), 1);
411    }
412}