1use anyhow::Result;
6use flowbuilder_core::{
7 ExecutionNode, ExecutionPhase, ExecutionPlan, FlowPlanner,
8 PhaseExecutionMode,
9};
10use std::collections::HashMap;
11
12pub struct EnhancedFlowOrchestrator {
14 config: OrchestratorConfig,
16}
17
18#[derive(Debug, Clone)]
20pub struct OrchestratorConfig {
21 pub enable_parallel_optimization: bool,
23 pub max_parallelism: usize,
25 pub enable_dependency_analysis: bool,
27 pub enable_condition_optimization: bool,
29}
30
31impl Default for OrchestratorConfig {
32 fn default() -> Self {
33 Self {
34 enable_parallel_optimization: true,
35 max_parallelism: 10,
36 enable_dependency_analysis: true,
37 enable_condition_optimization: true,
38 }
39 }
40}
41
42impl Default for EnhancedFlowOrchestrator {
43 fn default() -> Self {
44 Self::new()
45 }
46}
47
48impl EnhancedFlowOrchestrator {
49 pub fn new() -> Self {
51 Self {
52 config: OrchestratorConfig::default(),
53 }
54 }
55
56 pub fn with_config(config: OrchestratorConfig) -> Self {
58 Self { config }
59 }
60
61 pub fn create_execution_plan(
63 &self,
64 nodes: Vec<ExecutionNode>,
65 env_vars: HashMap<String, serde_yaml::Value>,
66 flow_vars: HashMap<String, serde_yaml::Value>,
67 workflow_name: String,
68 workflow_version: String,
69 ) -> Result<ExecutionPlan> {
70 let mut plan = ExecutionPlan::new(
71 workflow_name,
72 workflow_version,
73 env_vars,
74 flow_vars,
75 );
76
77 let dependency_graph = self.build_dependency_graph(&nodes)?;
79
80 let sorted_layers = self.topological_sort(&nodes, &dependency_graph)?;
82
83 let phases = self.create_execution_phases(sorted_layers)?;
85
86 for phase in phases {
88 plan.add_phase(phase);
89 }
90
91 if self.config.enable_parallel_optimization {
93 self.optimize_for_parallelism(&mut plan)?;
94 }
95
96 plan.validate()
98 .map_err(|e| anyhow::anyhow!("执行计划验证失败: {}", e))?;
99
100 Ok(plan)
101 }
102
103 fn build_dependency_graph(
105 &self,
106 nodes: &[ExecutionNode],
107 ) -> Result<HashMap<String, Vec<String>>> {
108 let mut graph = HashMap::new();
109
110 for node in nodes {
111 graph.insert(node.id.clone(), node.dependencies.clone());
112 }
113
114 for (node_id, deps) in &graph {
116 for dep in deps {
117 if !graph.contains_key(dep) {
118 return Err(anyhow::anyhow!(
119 "节点 {} 依赖的节点 {} 不存在",
120 node_id,
121 dep
122 ));
123 }
124 }
125 }
126
127 Ok(graph)
128 }
129
130 fn topological_sort(
132 &self,
133 nodes: &[ExecutionNode],
134 _graph: &HashMap<String, Vec<String>>,
135 ) -> Result<Vec<Vec<ExecutionNode>>> {
136 let mut layers = Vec::new();
137 let mut remaining_nodes: HashMap<String, ExecutionNode> =
138 nodes.iter().map(|n| (n.id.clone(), n.clone())).collect();
139 let mut in_degree = HashMap::new();
140
141 for node in nodes {
143 in_degree.insert(node.id.clone(), node.dependencies.len());
144 }
145
146 while !remaining_nodes.is_empty() {
148 let mut current_layer = Vec::new();
149
150 let ready_nodes: Vec<String> = in_degree
152 .iter()
153 .filter(|(_, °ree)| degree == 0)
154 .map(|(id, _)| id.clone())
155 .collect();
156
157 if ready_nodes.is_empty() {
158 return Err(anyhow::anyhow!("检测到循环依赖"));
159 }
160
161 for node_id in ready_nodes {
163 if let Some(node) = remaining_nodes.remove(&node_id) {
164 current_layer.push(node);
165 in_degree.remove(&node_id);
166 }
167 }
168
169 for node in ¤t_layer {
171 for other_node in remaining_nodes.values() {
172 if other_node.dependencies.contains(&node.id) {
173 if let Some(degree) = in_degree.get_mut(&other_node.id)
174 {
175 *degree -= 1;
176 }
177 }
178 }
179 }
180
181 layers.push(current_layer);
182 }
183
184 Ok(layers)
185 }
186
187 fn create_execution_phases(
189 &self,
190 layers: Vec<Vec<ExecutionNode>>,
191 ) -> Result<Vec<ExecutionPhase>> {
192 let mut phases = Vec::new();
193
194 for (index, layer) in layers.into_iter().enumerate() {
195 let execution_mode = if layer.len() == 1 {
196 PhaseExecutionMode::Sequential
197 } else if layer.len() <= self.config.max_parallelism {
198 PhaseExecutionMode::Parallel
199 } else {
200 PhaseExecutionMode::Parallel
202 };
203
204 let phase = ExecutionPhase {
205 id: format!("phase_{index}"),
206 name: format!("执行阶段 {}", index + 1),
207 execution_mode,
208 nodes: layer,
209 condition: None,
210 };
211
212 phases.push(phase);
213 }
214
215 Ok(phases)
216 }
217
218 fn optimize_for_parallelism(&self, plan: &mut ExecutionPlan) -> Result<()> {
220 if !self.config.enable_parallel_optimization {
221 return Ok(());
222 }
223
224 for phase in &mut plan.phases {
226 if phase.nodes.len() > self.config.max_parallelism {
227 let chunks: Vec<Vec<ExecutionNode>> = phase
229 .nodes
230 .chunks(self.config.max_parallelism)
231 .map(|chunk| chunk.to_vec())
232 .collect();
233
234 let mut sub_phases = Vec::new();
236 for (i, chunk) in chunks.into_iter().enumerate() {
237 let sub_phase = ExecutionPhase {
238 id: format!("{}_sub_{}", phase.id, i),
239 name: format!("{} - 子阶段 {}", phase.name, i + 1),
240 execution_mode: PhaseExecutionMode::Parallel,
241 nodes: chunk,
242 condition: phase.condition.clone(),
243 };
244 sub_phases.push(sub_phase);
245 }
246
247 println!(
250 "阶段 {} 被优化为 {} 个子阶段",
251 phase.name,
252 sub_phases.len()
253 );
254 }
255 }
256
257 Ok(())
258 }
259
260 pub fn analyze_complexity(
262 &self,
263 plan: &ExecutionPlan,
264 ) -> ExecutionComplexity {
265 let mut total_nodes = 0;
266 let mut max_parallel_nodes = 0;
267 let mut total_dependencies = 0;
268 let mut conditional_nodes = 0;
269
270 for phase in &plan.phases {
271 total_nodes += phase.nodes.len();
272
273 if matches!(phase.execution_mode, PhaseExecutionMode::Parallel) {
274 max_parallel_nodes = max_parallel_nodes.max(phase.nodes.len());
275 }
276
277 for node in &phase.nodes {
278 total_dependencies += node.dependencies.len();
279 if node.condition.is_some() {
280 conditional_nodes += 1;
281 }
282 }
283 }
284
285 ExecutionComplexity {
286 total_nodes,
287 total_phases: plan.phases.len(),
288 max_parallel_nodes,
289 total_dependencies,
290 conditional_nodes,
291 complexity_score: self.calculate_complexity_score(
292 total_nodes,
293 total_dependencies,
294 conditional_nodes,
295 max_parallel_nodes,
296 ),
297 }
298 }
299
300 fn calculate_complexity_score(
302 &self,
303 total_nodes: usize,
304 total_dependencies: usize,
305 conditional_nodes: usize,
306 max_parallel_nodes: usize,
307 ) -> f64 {
308 let base_score = total_nodes as f64;
309 let dependency_penalty = (total_dependencies as f64) * 0.5;
310 let condition_penalty = (conditional_nodes as f64) * 0.3;
311 let parallel_bonus = (max_parallel_nodes as f64) * 0.2;
312
313 base_score + dependency_penalty + condition_penalty - parallel_bonus
314 }
315}
316
317#[derive(Debug, Clone)]
319pub struct ExecutionComplexity {
320 pub total_nodes: usize,
322 pub total_phases: usize,
324 pub max_parallel_nodes: usize,
326 pub total_dependencies: usize,
328 pub conditional_nodes: usize,
330 pub complexity_score: f64,
332}
333
334impl FlowPlanner for EnhancedFlowOrchestrator {
335 type Input = (
336 Vec<ExecutionNode>,
337 HashMap<String, serde_yaml::Value>,
338 HashMap<String, serde_yaml::Value>,
339 String,
340 String,
341 );
342 type Output = ExecutionPlan;
343 type Error = anyhow::Error;
344
345 fn create_execution_plan(
346 &self,
347 input: Self::Input,
348 ) -> Result<Self::Output, Self::Error> {
349 let (nodes, env_vars, flow_vars, workflow_name, workflow_version) =
350 input;
351 self.create_execution_plan(
352 nodes,
353 env_vars,
354 flow_vars,
355 workflow_name,
356 workflow_version,
357 )
358 }
359
360 fn optimize_plan(
361 &self,
362 mut plan: Self::Output,
363 ) -> Result<Self::Output, Self::Error> {
364 self.optimize_for_parallelism(&mut plan)?;
365 Ok(plan)
366 }
367}
368
369#[cfg(test)]
370mod tests {
371 use super::*;
372 use flowbuilder_core::{ActionSpec, ExecutionNode};
373
374 #[test]
375 fn test_orchestrator_creation() {
376 let orchestrator = EnhancedFlowOrchestrator::new();
377 assert!(orchestrator.config.enable_parallel_optimization);
378 }
379
380 #[test]
381 fn test_dependency_graph_build() {
382 let orchestrator = EnhancedFlowOrchestrator::new();
383
384 let node1 = ExecutionNode::new(
385 "node1".to_string(),
386 "Node 1".to_string(),
387 ActionSpec {
388 action_type: "test".to_string(),
389 parameters: HashMap::new(),
390 outputs: HashMap::new(),
391 },
392 );
393
394 let node2 = ExecutionNode::new(
395 "node2".to_string(),
396 "Node 2".to_string(),
397 ActionSpec {
398 action_type: "test".to_string(),
399 parameters: HashMap::new(),
400 outputs: HashMap::new(),
401 },
402 )
403 .add_dependency("node1".to_string());
404
405 let nodes = vec![node1, node2];
406 let graph = orchestrator.build_dependency_graph(&nodes).unwrap();
407
408 assert_eq!(graph.len(), 2);
409 assert_eq!(graph.get("node1").unwrap().len(), 0);
410 assert_eq!(graph.get("node2").unwrap().len(), 1);
411 }
412}