flowbuilder_yaml/
config_parser.rs

1//! # FlowBuilder YAML - 配置解析器
2//!
3//! 从YAML配置解析生成执行节点
4
5use crate::config::{ActionDefinition, TaskDefinition, WorkflowConfig};
6use anyhow::Result;
7use flowbuilder_core::{
8    ActionSpec, ConfigParser, ExecutionNode, NodeType, RetryConfig,
9    RetryStrategy, TimeoutConfig,
10};
11use std::collections::HashMap;
12
13/// YAML配置解析器
14pub struct YamlConfigParser {
15    config: WorkflowConfig,
16}
17
18impl YamlConfigParser {
19    /// 创建新的配置解析器
20    pub fn new(config: WorkflowConfig) -> Self {
21        Self { config }
22    }
23
24    /// 解析配置,生成执行节点列表
25    pub fn parse(&self) -> Result<Vec<ExecutionNode>> {
26        let mut nodes = Vec::new();
27
28        for task_wrapper in &self.config.workflow.tasks {
29            let task = &task_wrapper.task;
30
31            // 为每个任务创建执行节点
32            let node = self.create_execution_node(task)?;
33            nodes.push(node);
34        }
35
36        Ok(nodes)
37    }
38
39    /// 创建执行节点
40    fn create_execution_node(
41        &self,
42        task: &TaskDefinition,
43    ) -> Result<ExecutionNode> {
44        // 合并所有动作为一个节点
45        let action_spec = self.merge_task_actions(task)?;
46
47        let mut node =
48            ExecutionNode::new(task.id.clone(), task.name.clone(), action_spec);
49
50        // 设置节点类型
51        node.node_type = self.determine_node_type(task);
52
53        // 提取依赖关系
54        node.dependencies = self.extract_dependencies(task)?;
55
56        // 提取执行条件
57        node.condition = self.extract_condition(task)?;
58
59        // 设置优先级
60        node.priority = self.determine_priority(task)?;
61
62        // 设置重试配置
63        if let Some(retry_config) = self.extract_retry_config(task)? {
64            node.retry_config = Some(retry_config);
65        }
66
67        // 设置超时配置
68        if let Some(timeout_config) = self.extract_timeout_config(task)? {
69            node.timeout_config = Some(timeout_config);
70        }
71
72        Ok(node)
73    }
74
75    /// 合并任务中的所有动作
76    fn merge_task_actions(&self, task: &TaskDefinition) -> Result<ActionSpec> {
77        if task.actions.is_empty() {
78            return Err(anyhow::anyhow!("任务 {} 没有动作", task.id));
79        }
80
81        // 如果只有一个动作,直接使用
82        if task.actions.len() == 1 {
83            return self.convert_action_to_spec(&task.actions[0].action);
84        }
85
86        // 多个动作时,创建一个复合动作
87        let mut parameters = HashMap::new();
88        let mut outputs = HashMap::new();
89
90        for (index, action_wrapper) in task.actions.iter().enumerate() {
91            let action = &action_wrapper.action;
92
93            // 为每个动作添加前缀
94            let prefix = format!("action_{index}");
95
96            for (key, value) in &action.parameters {
97                parameters
98                    .insert(format!("{prefix}_{key}"), value.value.clone());
99            }
100
101            for (key, value) in &action.outputs {
102                outputs.insert(format!("{prefix}_{key}"), value.clone());
103            }
104        }
105
106        Ok(ActionSpec {
107            action_type: "composite".to_string(),
108            parameters,
109            outputs,
110        })
111    }
112
113    /// 将动作定义转换为动作规格
114    fn convert_action_to_spec(
115        &self,
116        action: &ActionDefinition,
117    ) -> Result<ActionSpec> {
118        let mut parameters = HashMap::new();
119        for (key, param) in &action.parameters {
120            parameters.insert(key.clone(), param.value.clone());
121        }
122
123        Ok(ActionSpec {
124            action_type: format!("{:?}", action.action_type).to_lowercase(),
125            parameters,
126            outputs: action.outputs.clone(),
127        })
128    }
129
130    /// 确定节点类型
131    fn determine_node_type(&self, task: &TaskDefinition) -> NodeType {
132        // 检查是否有条件逻辑
133        for action_wrapper in &task.actions {
134            let action = &action_wrapper.action;
135            if action.flow.next_if.is_some() {
136                return NodeType::Condition;
137            }
138            if action.flow.while_util.is_some() {
139                return NodeType::Loop;
140            }
141        }
142
143        // 检查是否有分支逻辑
144        for action_wrapper in &task.actions {
145            let action = &action_wrapper.action;
146            if action.flow.next.is_some()
147                && action.flow.next.as_ref().unwrap() != "null"
148            {
149                return NodeType::Branch;
150            }
151        }
152
153        // 默认为动作节点
154        NodeType::Action
155    }
156
157    /// 提取任务依赖关系
158    fn extract_dependencies(
159        &self,
160        task: &TaskDefinition,
161    ) -> Result<Vec<String>> {
162        let mut deps = Vec::new();
163
164        // 从动作中提取依赖
165        for action_wrapper in &task.actions {
166            let action = &action_wrapper.action;
167
168            // 如果动作有next字段,可能表示依赖关系
169            if let Some(next) = &action.flow.next {
170                if next != "null" {
171                    // 这里需要根据实际的依赖逻辑来处理
172                    // 简化处理:假设next指向的是依赖的任务
173                    deps.push(next.clone());
174                }
175            }
176        }
177
178        // 移除重复的依赖
179        deps.sort();
180        deps.dedup();
181
182        Ok(deps)
183    }
184
185    /// 提取执行条件
186    fn extract_condition(
187        &self,
188        task: &TaskDefinition,
189    ) -> Result<Option<String>> {
190        // 从第一个动作的条件中提取
191        if let Some(action_wrapper) = task.actions.first() {
192            Ok(action_wrapper.action.flow.next_if.clone())
193        } else {
194            Ok(None)
195        }
196    }
197
198    /// 确定优先级
199    fn determine_priority(&self, task: &TaskDefinition) -> Result<u32> {
200        // 根据任务名称或描述确定优先级
201        let name_lower = task.name.to_lowercase();
202        let desc_lower = task.description.to_lowercase();
203
204        if name_lower.contains("critical") || desc_lower.contains("critical") {
205            Ok(1) // 最高优先级
206        } else if name_lower.contains("urgent") || desc_lower.contains("urgent")
207        {
208            Ok(2)
209        } else if name_lower.contains("high") || desc_lower.contains("high") {
210            Ok(10)
211        } else if name_lower.contains("low") || desc_lower.contains("low") {
212            Ok(200)
213        } else {
214            Ok(100) // 默认优先级
215        }
216    }
217
218    /// 提取重试配置
219    fn extract_retry_config(
220        &self,
221        task: &TaskDefinition,
222    ) -> Result<Option<RetryConfig>> {
223        // 从第一个动作的重试配置中提取
224        if let Some(action_wrapper) = task.actions.first() {
225            if let Some(retry) = &action_wrapper.action.flow.retry {
226                let strategy = if retry.delay > 0 {
227                    RetryStrategy::Fixed
228                } else {
229                    RetryStrategy::Exponential { multiplier: 2.0 }
230                };
231
232                return Ok(Some(RetryConfig {
233                    max_retries: retry.max_retries,
234                    delay: retry.delay,
235                    strategy,
236                }));
237            }
238        }
239
240        Ok(None)
241    }
242
243    /// 提取超时配置
244    fn extract_timeout_config(
245        &self,
246        task: &TaskDefinition,
247    ) -> Result<Option<TimeoutConfig>> {
248        // 从第一个动作的超时配置中提取
249        if let Some(action_wrapper) = task.actions.first() {
250            if let Some(timeout) = &action_wrapper.action.flow.timeout {
251                return Ok(Some(TimeoutConfig {
252                    duration: timeout.duration,
253                    on_timeout: action_wrapper.action.flow.on_timeout.clone(),
254                }));
255            }
256        }
257
258        Ok(None)
259    }
260
261    /// 获取环境变量
262    pub fn get_env_vars(&self) -> HashMap<String, String> {
263        self.config.workflow.env.clone()
264    }
265
266    /// 获取流程变量
267    pub fn get_flow_vars(&self) -> HashMap<String, serde_yaml::Value> {
268        self.config.workflow.vars.clone()
269    }
270
271    /// 获取工作流名称
272    pub fn get_workflow_name(&self) -> String {
273        self.config
274            .workflow
275            .vars
276            .get("name")
277            .and_then(|v| v.as_str())
278            .unwrap_or("Unknown Workflow")
279            .to_string()
280    }
281
282    /// 获取工作流版本
283    pub fn get_workflow_version(&self) -> String {
284        self.config.workflow.version.clone()
285    }
286
287    /// 验证配置的有效性
288    pub fn validate(&self) -> Result<()> {
289        if self.config.workflow.tasks.is_empty() {
290            return Err(anyhow::anyhow!("工作流没有任务"));
291        }
292
293        for task_wrapper in &self.config.workflow.tasks {
294            let task = &task_wrapper.task;
295
296            if task.id.is_empty() {
297                return Err(anyhow::anyhow!("任务ID不能为空"));
298            }
299
300            if task.name.is_empty() {
301                return Err(anyhow::anyhow!("任务名称不能为空"));
302            }
303
304            if task.actions.is_empty() {
305                return Err(anyhow::anyhow!("任务 {} 没有动作", task.id));
306            }
307
308            // 验证每个动作
309            for action_wrapper in &task.actions {
310                let action = &action_wrapper.action;
311
312                if action.id.is_empty() {
313                    return Err(anyhow::anyhow!("动作ID不能为空"));
314                }
315
316                if action.name.is_empty() {
317                    return Err(anyhow::anyhow!("动作名称不能为空"));
318                }
319            }
320        }
321
322        Ok(())
323    }
324}
325
326impl ConfigParser<WorkflowConfig> for YamlConfigParser {
327    type Output = Vec<ExecutionNode>;
328    type Error = anyhow::Error;
329
330    fn parse(
331        &self,
332        config: WorkflowConfig,
333    ) -> Result<Self::Output, Self::Error> {
334        let parser = YamlConfigParser::new(config);
335        parser.parse()
336    }
337}
338
339/// 配置解析结果
340#[derive(Debug, Clone)]
341pub struct ParseResult {
342    /// 执行节点列表
343    pub nodes: Vec<ExecutionNode>,
344    /// 环境变量
345    pub env_vars: HashMap<String, String>,
346    /// 流程变量
347    pub flow_vars: HashMap<String, serde_yaml::Value>,
348    /// 工作流名称
349    pub workflow_name: String,
350    /// 工作流版本
351    pub workflow_version: String,
352}
353
354impl YamlConfigParser {
355    /// 解析配置并返回完整结果
356    pub fn parse_full(&self) -> Result<ParseResult> {
357        let nodes = self.parse()?;
358
359        Ok(ParseResult {
360            nodes,
361            env_vars: self.get_env_vars(),
362            flow_vars: self.get_flow_vars(),
363            workflow_name: self.get_workflow_name(),
364            workflow_version: self.get_workflow_version(),
365        })
366    }
367}
368
369#[cfg(test)]
370mod tests {
371    use super::*;
372    use crate::loader::WorkflowLoader;
373
374    #[test]
375    fn test_yaml_parser_creation() {
376        let yaml_content = r#"
377workflow:
378  version: "1.0"
379  env:
380    TEST_ENV: "test"
381  vars:
382    name: "Test Workflow"
383  tasks:
384    - task:
385        id: "task1"
386        name: "Test Task"
387        description: "A test task"
388        actions:
389          - action:
390              id: "action1"
391              name: "Test Action"
392              description: "A test action"
393              type: "builtin"
394              flow:
395                next: null
396              outputs: {}
397              parameters: {}
398"#;
399
400        let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
401        let parser = YamlConfigParser::new(config);
402
403        assert!(parser.validate().is_ok());
404    }
405
406    #[test]
407    fn test_parse_nodes() {
408        let yaml_content = r#"
409workflow:
410  version: "1.0"
411  env:
412    TEST_ENV: "test"
413  vars:
414    name: "Test Workflow"
415  tasks:
416    - task:
417        id: "task1"
418        name: "Test Task"
419        description: "A test task"
420        actions:
421          - action:
422              id: "action1"
423              name: "Test Action"
424              description: "A test action"
425              type: "builtin"
426              flow:
427                next: null
428              outputs: {}
429              parameters: {}
430"#;
431
432        let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
433        let parser = YamlConfigParser::new(config);
434        let nodes = parser.parse().unwrap();
435
436        assert_eq!(nodes.len(), 1);
437        assert_eq!(nodes[0].id, "task1");
438        assert_eq!(nodes[0].name, "Test Task");
439        assert_eq!(nodes[0].action_spec.action_type, "builtin");
440    }
441
442    #[test]
443    fn test_parse_full_result() {
444        let yaml_content = r#"
445workflow:
446  version: "1.0"
447  env:
448    TEST_ENV: "test"
449  vars:
450    name: "Test Workflow"
451  tasks:
452    - task:
453        id: "task1"
454        name: "Test Task"
455        description: "A test task"
456        actions:
457          - action:
458              id: "action1"
459              name: "Test Action"
460              description: "A test action"
461              type: "builtin"
462              flow:
463                next: null
464              outputs: {}
465              parameters: {}
466"#;
467
468        let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
469        let parser = YamlConfigParser::new(config);
470        let result = parser.parse_full().unwrap();
471
472        assert_eq!(result.nodes.len(), 1);
473        assert_eq!(result.workflow_name, "Test Workflow");
474        assert_eq!(result.workflow_version, "1.0");
475        assert!(result.env_vars.contains_key("TEST_ENV"));
476        assert!(result.flow_vars.contains_key("name"));
477    }
478}