flowbuilder_yaml/
config_parser.rs1use crate::config::{ActionDefinition, TaskDefinition, WorkflowConfig};
6use anyhow::Result;
7use flowbuilder_core::{
8 ActionSpec, ConfigParser, ExecutionNode, NodeType, RetryConfig,
9 RetryStrategy, TimeoutConfig,
10};
11use std::collections::HashMap;
12
13pub struct YamlConfigParser {
15 config: WorkflowConfig,
16}
17
18impl YamlConfigParser {
19 pub fn new(config: WorkflowConfig) -> Self {
21 Self { config }
22 }
23
24 pub fn parse(&self) -> Result<Vec<ExecutionNode>> {
26 let mut nodes = Vec::new();
27
28 for task_wrapper in &self.config.workflow.tasks {
29 let task = &task_wrapper.task;
30
31 let node = self.create_execution_node(task)?;
33 nodes.push(node);
34 }
35
36 Ok(nodes)
37 }
38
39 fn create_execution_node(
41 &self,
42 task: &TaskDefinition,
43 ) -> Result<ExecutionNode> {
44 let action_spec = self.merge_task_actions(task)?;
46
47 let mut node =
48 ExecutionNode::new(task.id.clone(), task.name.clone(), action_spec);
49
50 node.node_type = self.determine_node_type(task);
52
53 node.dependencies = self.extract_dependencies(task)?;
55
56 node.condition = self.extract_condition(task)?;
58
59 node.priority = self.determine_priority(task)?;
61
62 if let Some(retry_config) = self.extract_retry_config(task)? {
64 node.retry_config = Some(retry_config);
65 }
66
67 if let Some(timeout_config) = self.extract_timeout_config(task)? {
69 node.timeout_config = Some(timeout_config);
70 }
71
72 Ok(node)
73 }
74
75 fn merge_task_actions(&self, task: &TaskDefinition) -> Result<ActionSpec> {
77 if task.actions.is_empty() {
78 return Err(anyhow::anyhow!("任务 {} 没有动作", task.id));
79 }
80
81 if task.actions.len() == 1 {
83 return self.convert_action_to_spec(&task.actions[0].action);
84 }
85
86 let mut parameters = HashMap::new();
88 let mut outputs = HashMap::new();
89
90 for (index, action_wrapper) in task.actions.iter().enumerate() {
91 let action = &action_wrapper.action;
92
93 let prefix = format!("action_{index}");
95
96 for (key, value) in &action.parameters {
97 parameters
98 .insert(format!("{prefix}_{key}"), value.value.clone());
99 }
100
101 for (key, value) in &action.outputs {
102 outputs.insert(format!("{prefix}_{key}"), value.clone());
103 }
104 }
105
106 Ok(ActionSpec {
107 action_type: "composite".to_string(),
108 parameters,
109 outputs,
110 })
111 }
112
113 fn convert_action_to_spec(
115 &self,
116 action: &ActionDefinition,
117 ) -> Result<ActionSpec> {
118 let mut parameters = HashMap::new();
119 for (key, param) in &action.parameters {
120 parameters.insert(key.clone(), param.value.clone());
121 }
122
123 Ok(ActionSpec {
124 action_type: format!("{:?}", action.action_type).to_lowercase(),
125 parameters,
126 outputs: action.outputs.clone(),
127 })
128 }
129
130 fn determine_node_type(&self, task: &TaskDefinition) -> NodeType {
132 for action_wrapper in &task.actions {
134 let action = &action_wrapper.action;
135 if action.flow.next_if.is_some() {
136 return NodeType::Condition;
137 }
138 if action.flow.while_util.is_some() {
139 return NodeType::Loop;
140 }
141 }
142
143 for action_wrapper in &task.actions {
145 let action = &action_wrapper.action;
146 if action.flow.next.is_some()
147 && action.flow.next.as_ref().unwrap() != "null"
148 {
149 return NodeType::Branch;
150 }
151 }
152
153 NodeType::Action
155 }
156
157 fn extract_dependencies(
159 &self,
160 task: &TaskDefinition,
161 ) -> Result<Vec<String>> {
162 let mut deps = Vec::new();
163
164 for action_wrapper in &task.actions {
166 let action = &action_wrapper.action;
167
168 if let Some(next) = &action.flow.next {
170 if next != "null" {
171 deps.push(next.clone());
174 }
175 }
176 }
177
178 deps.sort();
180 deps.dedup();
181
182 Ok(deps)
183 }
184
185 fn extract_condition(
187 &self,
188 task: &TaskDefinition,
189 ) -> Result<Option<String>> {
190 if let Some(action_wrapper) = task.actions.first() {
192 Ok(action_wrapper.action.flow.next_if.clone())
193 } else {
194 Ok(None)
195 }
196 }
197
198 fn determine_priority(&self, task: &TaskDefinition) -> Result<u32> {
200 let name_lower = task.name.to_lowercase();
202 let desc_lower = task.description.to_lowercase();
203
204 if name_lower.contains("critical") || desc_lower.contains("critical") {
205 Ok(1) } else if name_lower.contains("urgent") || desc_lower.contains("urgent")
207 {
208 Ok(2)
209 } else if name_lower.contains("high") || desc_lower.contains("high") {
210 Ok(10)
211 } else if name_lower.contains("low") || desc_lower.contains("low") {
212 Ok(200)
213 } else {
214 Ok(100) }
216 }
217
218 fn extract_retry_config(
220 &self,
221 task: &TaskDefinition,
222 ) -> Result<Option<RetryConfig>> {
223 if let Some(action_wrapper) = task.actions.first() {
225 if let Some(retry) = &action_wrapper.action.flow.retry {
226 let strategy = if retry.delay > 0 {
227 RetryStrategy::Fixed
228 } else {
229 RetryStrategy::Exponential { multiplier: 2.0 }
230 };
231
232 return Ok(Some(RetryConfig {
233 max_retries: retry.max_retries,
234 delay: retry.delay,
235 strategy,
236 }));
237 }
238 }
239
240 Ok(None)
241 }
242
243 fn extract_timeout_config(
245 &self,
246 task: &TaskDefinition,
247 ) -> Result<Option<TimeoutConfig>> {
248 if let Some(action_wrapper) = task.actions.first() {
250 if let Some(timeout) = &action_wrapper.action.flow.timeout {
251 return Ok(Some(TimeoutConfig {
252 duration: timeout.duration,
253 on_timeout: action_wrapper.action.flow.on_timeout.clone(),
254 }));
255 }
256 }
257
258 Ok(None)
259 }
260
261 pub fn get_env_vars(&self) -> HashMap<String, String> {
263 self.config.workflow.env.clone()
264 }
265
266 pub fn get_flow_vars(&self) -> HashMap<String, serde_yaml::Value> {
268 self.config.workflow.vars.clone()
269 }
270
271 pub fn get_workflow_name(&self) -> String {
273 self.config
274 .workflow
275 .vars
276 .get("name")
277 .and_then(|v| v.as_str())
278 .unwrap_or("Unknown Workflow")
279 .to_string()
280 }
281
282 pub fn get_workflow_version(&self) -> String {
284 self.config.workflow.version.clone()
285 }
286
287 pub fn validate(&self) -> Result<()> {
289 if self.config.workflow.tasks.is_empty() {
290 return Err(anyhow::anyhow!("工作流没有任务"));
291 }
292
293 for task_wrapper in &self.config.workflow.tasks {
294 let task = &task_wrapper.task;
295
296 if task.id.is_empty() {
297 return Err(anyhow::anyhow!("任务ID不能为空"));
298 }
299
300 if task.name.is_empty() {
301 return Err(anyhow::anyhow!("任务名称不能为空"));
302 }
303
304 if task.actions.is_empty() {
305 return Err(anyhow::anyhow!("任务 {} 没有动作", task.id));
306 }
307
308 for action_wrapper in &task.actions {
310 let action = &action_wrapper.action;
311
312 if action.id.is_empty() {
313 return Err(anyhow::anyhow!("动作ID不能为空"));
314 }
315
316 if action.name.is_empty() {
317 return Err(anyhow::anyhow!("动作名称不能为空"));
318 }
319 }
320 }
321
322 Ok(())
323 }
324}
325
326impl ConfigParser<WorkflowConfig> for YamlConfigParser {
327 type Output = Vec<ExecutionNode>;
328 type Error = anyhow::Error;
329
330 fn parse(
331 &self,
332 config: WorkflowConfig,
333 ) -> Result<Self::Output, Self::Error> {
334 let parser = YamlConfigParser::new(config);
335 parser.parse()
336 }
337}
338
339#[derive(Debug, Clone)]
341pub struct ParseResult {
342 pub nodes: Vec<ExecutionNode>,
344 pub env_vars: HashMap<String, String>,
346 pub flow_vars: HashMap<String, serde_yaml::Value>,
348 pub workflow_name: String,
350 pub workflow_version: String,
352}
353
354impl YamlConfigParser {
355 pub fn parse_full(&self) -> Result<ParseResult> {
357 let nodes = self.parse()?;
358
359 Ok(ParseResult {
360 nodes,
361 env_vars: self.get_env_vars(),
362 flow_vars: self.get_flow_vars(),
363 workflow_name: self.get_workflow_name(),
364 workflow_version: self.get_workflow_version(),
365 })
366 }
367}
368
369#[cfg(test)]
370mod tests {
371 use super::*;
372 use crate::loader::WorkflowLoader;
373
374 #[test]
375 fn test_yaml_parser_creation() {
376 let yaml_content = r#"
377workflow:
378 version: "1.0"
379 env:
380 TEST_ENV: "test"
381 vars:
382 name: "Test Workflow"
383 tasks:
384 - task:
385 id: "task1"
386 name: "Test Task"
387 description: "A test task"
388 actions:
389 - action:
390 id: "action1"
391 name: "Test Action"
392 description: "A test action"
393 type: "builtin"
394 flow:
395 next: null
396 outputs: {}
397 parameters: {}
398"#;
399
400 let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
401 let parser = YamlConfigParser::new(config);
402
403 assert!(parser.validate().is_ok());
404 }
405
406 #[test]
407 fn test_parse_nodes() {
408 let yaml_content = r#"
409workflow:
410 version: "1.0"
411 env:
412 TEST_ENV: "test"
413 vars:
414 name: "Test Workflow"
415 tasks:
416 - task:
417 id: "task1"
418 name: "Test Task"
419 description: "A test task"
420 actions:
421 - action:
422 id: "action1"
423 name: "Test Action"
424 description: "A test action"
425 type: "builtin"
426 flow:
427 next: null
428 outputs: {}
429 parameters: {}
430"#;
431
432 let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
433 let parser = YamlConfigParser::new(config);
434 let nodes = parser.parse().unwrap();
435
436 assert_eq!(nodes.len(), 1);
437 assert_eq!(nodes[0].id, "task1");
438 assert_eq!(nodes[0].name, "Test Task");
439 assert_eq!(nodes[0].action_spec.action_type, "builtin");
440 }
441
442 #[test]
443 fn test_parse_full_result() {
444 let yaml_content = r#"
445workflow:
446 version: "1.0"
447 env:
448 TEST_ENV: "test"
449 vars:
450 name: "Test Workflow"
451 tasks:
452 - task:
453 id: "task1"
454 name: "Test Task"
455 description: "A test task"
456 actions:
457 - action:
458 id: "action1"
459 name: "Test Action"
460 description: "A test action"
461 type: "builtin"
462 flow:
463 next: null
464 outputs: {}
465 parameters: {}
466"#;
467
468 let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
469 let parser = YamlConfigParser::new(config);
470 let result = parser.parse_full().unwrap();
471
472 assert_eq!(result.nodes.len(), 1);
473 assert_eq!(result.workflow_name, "Test Workflow");
474 assert_eq!(result.workflow_version, "1.0");
475 assert!(result.env_vars.contains_key("TEST_ENV"));
476 assert!(result.flow_vars.contains_key("name"));
477 }
478}