use crate::config::{ActionDefinition, TaskDefinition, WorkflowConfig};
use anyhow::Result;
use flowbuilder_core::{
ActionSpec, ConfigParser, ExecutionNode, NodeType, RetryConfig,
RetryStrategy, TimeoutConfig,
};
use std::collections::HashMap;
/// Turns a loaded [`WorkflowConfig`] into a list of `ExecutionNode`s and
/// exposes the workflow-level metadata (env, vars, name, version) it carries.
pub struct YamlConfigParser {
/// The workflow configuration this parser reads from; set once in `new`.
config: WorkflowConfig,
}
impl YamlConfigParser {
pub fn new(config: WorkflowConfig) -> Self {
Self { config }
}
/// Converts every task of the stored workflow into an `ExecutionNode`,
/// keeping the order in which the tasks are declared.
///
/// # Errors
/// Stops at, and returns, the first error raised while building a node.
pub fn parse(&self) -> Result<Vec<ExecutionNode>> {
    self.config
        .workflow
        .tasks
        .iter()
        .map(|wrapper| self.create_execution_node(&wrapper.task))
        .collect()
}
/// Builds the `ExecutionNode` for a single task.
///
/// The node is created from the task's id, name and merged action spec, then
/// its type, dependencies, condition and priority are filled in. Retry and
/// timeout settings are only assigned when the task actually declares them.
///
/// # Errors
/// Propagates any error from the helper methods (for example a task that has
/// no actions).
fn create_execution_node(&self, task: &TaskDefinition) -> Result<ExecutionNode> {
    let spec = self.merge_task_actions(task)?;
    let mut node = ExecutionNode::new(task.id.clone(), task.name.clone(), spec);

    node.node_type = self.determine_node_type(task);
    node.dependencies = self.extract_dependencies(task)?;
    node.condition = self.extract_condition(task)?;
    node.priority = self.determine_priority(task)?;

    // Leave whatever `ExecutionNode::new` put in these fields untouched
    // unless the task supplies an explicit value.
    let retry = self.extract_retry_config(task)?;
    if retry.is_some() {
        node.retry_config = retry;
    }
    let timeout = self.extract_timeout_config(task)?;
    if timeout.is_some() {
        node.timeout_config = timeout;
    }

    Ok(node)
}
/// Collapses all actions of `task` into one `ActionSpec`.
///
/// * No actions: error.
/// * Exactly one action: that action converted as-is.
/// * Several actions: a spec of type `"composite"` whose parameter and output
///   keys are namespaced as `action_{index}_{key}`, where `index` is the
///   zero-based position of the action in the task. Only parameters and
///   outputs are carried over; the individual action types are not.
///
/// # Errors
/// Returns an error when the task has no actions.
fn merge_task_actions(&self, task: &TaskDefinition) -> Result<ActionSpec> {
    match task.actions.len() {
        0 => return Err(anyhow::anyhow!("任务 {} 没有动作", task.id)),
        1 => return self.convert_action_to_spec(&task.actions[0].action),
        _ => {}
    }

    let mut merged_params = HashMap::new();
    let mut merged_outputs = HashMap::new();
    for (position, wrapper) in task.actions.iter().enumerate() {
        let action = &wrapper.action;
        for (name, param) in &action.parameters {
            merged_params.insert(format!("action_{position}_{name}"), param.value.clone());
        }
        for (name, output) in &action.outputs {
            merged_outputs.insert(format!("action_{position}_{name}"), output.clone());
        }
    }

    Ok(ActionSpec {
        action_type: "composite".to_string(),
        parameters: merged_params,
        outputs: merged_outputs,
    })
}
/// Converts a single action definition into an `ActionSpec`.
///
/// Each parameter is reduced to its `value`; outputs are copied unchanged.
/// The action type string is the lower-cased `Debug` rendering of
/// `action.action_type`.
fn convert_action_to_spec(&self, action: &ActionDefinition) -> Result<ActionSpec> {
    let parameters = action
        .parameters
        .iter()
        .map(|(name, param)| (name.clone(), param.value.clone()))
        .collect();
    let action_type = format!("{:?}", action.action_type).to_lowercase();

    Ok(ActionSpec {
        action_type,
        parameters,
        outputs: action.outputs.clone(),
    })
}
/// Classifies a task by the flow settings of its actions.
///
/// Actions are scanned in order; the first one that declares `next_if`
/// yields `NodeType::Condition`, and the first one that declares
/// `while_util` yields `NodeType::Loop` (`next_if` wins when a single action
/// has both). If no action has either, the task is a `NodeType::Branch` when
/// any action names a `next` target other than the literal string `"null"`,
/// and a plain `NodeType::Action` otherwise.
fn determine_node_type(&self, task: &TaskDefinition) -> NodeType {
    // A single pass keeps the per-action precedence: an earlier action's
    // `while_util` beats a later action's `next_if`.
    let control_flow = task.actions.iter().find_map(|wrapper| {
        let flow = &wrapper.action.flow;
        if flow.next_if.is_some() {
            Some(NodeType::Condition)
        } else if flow.while_util.is_some() {
            Some(NodeType::Loop)
        } else {
            None
        }
    });
    if let Some(node_type) = control_flow {
        return node_type;
    }

    // Pattern-match instead of `is_some()` followed by `unwrap()`.
    let has_successor = task
        .actions
        .iter()
        .any(|wrapper| matches!(&wrapper.action.flow.next, Some(next) if next != "null"));

    if has_successor {
        NodeType::Branch
    } else {
        NodeType::Action
    }
}
/// Collects the `next` targets named by the task's actions.
///
/// Targets equal to the literal string `"null"` are ignored. The result is
/// sorted and de-duplicated.
///
/// NOTE(review): the values come from `flow.next` but are stored as the
/// node's `dependencies` — confirm this direction is what the executor
/// expects.
fn extract_dependencies(&self, task: &TaskDefinition) -> Result<Vec<String>> {
    let mut targets: Vec<String> = task
        .actions
        .iter()
        .filter_map(|wrapper| wrapper.action.flow.next.as_ref())
        .filter(|target| *target != "null")
        .cloned()
        .collect();

    targets.sort();
    targets.dedup();
    Ok(targets)
}
/// Returns the `next_if` expression of the task's first action, if any.
///
/// Only the first action is consulted; a task without actions yields `None`.
fn extract_condition(&self, task: &TaskDefinition) -> Result<Option<String>> {
    let condition = task
        .actions
        .first()
        .and_then(|wrapper| wrapper.action.flow.next_if.clone());
    Ok(condition)
}
/// Derives a numeric priority from keywords in the task's name and
/// description (case-insensitive).
///
/// | keyword    | priority |
/// |------------|----------|
/// | `critical` | 1        |
/// | `urgent`   | 2        |
/// | `high`     | 10       |
/// | `low`      | 200      |
/// | (none)     | 100      |
///
/// Keywords are checked in the order listed and the first match wins.
/// A keyword only matches as a whole word (words are delimited by any
/// non-alphanumeric character), so `"high-priority"` and `"low_prio"` match
/// while `"workflow"` or `"highlight"` do not.
///
/// NOTE(review): smaller numbers presumably mean "more important" — confirm
/// against the scheduler that consumes `ExecutionNode::priority`.
fn determine_priority(&self, task: &TaskDefinition) -> Result<u32> {
    fn is_separator(c: char) -> bool {
        !c.is_alphanumeric()
    }

    let name = task.name.to_lowercase();
    let description = task.description.to_lowercase();

    // Whole-word matching: a plain substring test would classify any task
    // mentioning "workflow" or "follow" as low priority.
    let mentions = |keyword: &str| {
        name.split(is_separator)
            .chain(description.split(is_separator))
            .any(|word| word == keyword)
    };

    let priority = if mentions("critical") {
        1
    } else if mentions("urgent") {
        2
    } else if mentions("high") {
        10
    } else if mentions("low") {
        200
    } else {
        100
    };
    Ok(priority)
}
/// Builds the retry configuration from the first action's `flow.retry`.
///
/// Returns `None` when the task has no actions or the first action declares
/// no retry settings. A positive `delay` selects `RetryStrategy::Fixed`;
/// otherwise `RetryStrategy::Exponential` with a multiplier of `2.0` is used.
fn extract_retry_config(&self, task: &TaskDefinition) -> Result<Option<RetryConfig>> {
    let retry_config = task
        .actions
        .first()
        .and_then(|wrapper| wrapper.action.flow.retry.as_ref())
        .map(|retry| {
            let strategy = if retry.delay > 0 {
                RetryStrategy::Fixed
            } else {
                RetryStrategy::Exponential { multiplier: 2.0 }
            };
            RetryConfig {
                max_retries: retry.max_retries,
                delay: retry.delay,
                strategy,
            }
        });
    Ok(retry_config)
}
/// Builds the timeout configuration from the first action's flow settings.
///
/// Returns `None` when the task has no actions or the first action declares
/// no `timeout`. When present, the timeout's `duration` is paired with a copy
/// of the same action's `on_timeout` value.
fn extract_timeout_config(&self, task: &TaskDefinition) -> Result<Option<TimeoutConfig>> {
    let flow = match task.actions.first() {
        Some(wrapper) => &wrapper.action.flow,
        None => return Ok(None),
    };

    Ok(flow.timeout.as_ref().map(|timeout| TimeoutConfig {
        duration: timeout.duration,
        on_timeout: flow.on_timeout.clone(),
    }))
}
/// Returns a copy of the workflow's `env` map.
pub fn get_env_vars(&self) -> HashMap<String, String> {
    let workflow = &self.config.workflow;
    workflow.env.clone()
}
/// Returns a copy of the workflow's `vars` map.
pub fn get_flow_vars(&self) -> HashMap<String, serde_yaml::Value> {
    let workflow = &self.config.workflow;
    workflow.vars.clone()
}
/// Returns the workflow's display name.
///
/// The name is read from the `name` entry of the workflow `vars`; when that
/// entry is absent or is not a string, `"Unknown Workflow"` is returned.
pub fn get_workflow_name(&self) -> String {
    let declared = self
        .config
        .workflow
        .vars
        .get("name")
        .and_then(|value| value.as_str());

    match declared {
        Some(name) => name.to_string(),
        None => "Unknown Workflow".to_string(),
    }
}
/// Returns a copy of the workflow's `version` string.
pub fn get_workflow_version(&self) -> String {
    let workflow = &self.config.workflow;
    workflow.version.clone()
}
/// Checks the structural minimum of the stored workflow.
///
/// # Errors
/// Fails on the first violation found, in this order:
/// * the workflow has no tasks;
/// * a task has an empty id or an empty name;
/// * a task has no actions;
/// * an action has an empty id or an empty name.
pub fn validate(&self) -> Result<()> {
    let tasks = &self.config.workflow.tasks;
    if tasks.is_empty() {
        anyhow::bail!("工作流没有任务");
    }

    for task in tasks.iter().map(|wrapper| &wrapper.task) {
        if task.id.is_empty() {
            anyhow::bail!("任务ID不能为空");
        }
        if task.name.is_empty() {
            anyhow::bail!("任务名称不能为空");
        }
        if task.actions.is_empty() {
            anyhow::bail!("任务 {} 没有动作", task.id);
        }

        for action in task.actions.iter().map(|wrapper| &wrapper.action) {
            if action.id.is_empty() {
                anyhow::bail!("动作ID不能为空");
            }
            if action.name.is_empty() {
                anyhow::bail!("动作名称不能为空");
            }
        }
    }

    Ok(())
}
}
impl ConfigParser<WorkflowConfig> for YamlConfigParser {
type Output = Vec<ExecutionNode>;
type Error = anyhow::Error;
fn parse(
&self,
config: WorkflowConfig,
) -> Result<Self::Output, Self::Error> {
let parser = YamlConfigParser::new(config);
parser.parse()
}
}
/// Everything `YamlConfigParser::parse_full` extracts from a workflow:
/// the execution nodes plus the workflow-level metadata.
#[derive(Debug, Clone)]
pub struct ParseResult {
/// One node per task, in the order the tasks are declared.
pub nodes: Vec<ExecutionNode>,
/// Copy of the workflow's `env` map.
pub env_vars: HashMap<String, String>,
/// Copy of the workflow's `vars` map.
pub flow_vars: HashMap<String, serde_yaml::Value>,
/// Value of the `name` var, or `"Unknown Workflow"` when it is missing or not a string.
pub workflow_name: String,
/// Copy of the workflow's `version` string.
pub workflow_version: String,
}
impl YamlConfigParser {
    /// Parses the workflow into nodes and bundles them with the workflow's
    /// env vars, flow vars, name and version.
    ///
    /// # Errors
    /// Returns the error from `parse` if node construction fails; the
    /// metadata is only gathered after parsing succeeded.
    pub fn parse_full(&self) -> Result<ParseResult> {
        self.parse().map(|nodes| ParseResult {
            nodes,
            env_vars: self.get_env_vars(),
            flow_vars: self.get_flow_vars(),
            workflow_name: self.get_workflow_name(),
            workflow_version: self.get_workflow_version(),
        })
    }
}
// Unit tests for `YamlConfigParser`. Each test loads the same minimal
// single-task, single-action workflow through `WorkflowLoader::from_yaml_str`.
#[cfg(test)]
mod tests {
use super::*;
use crate::loader::WorkflowLoader;
// A minimal, well-formed workflow must pass `validate`.
#[test]
fn test_yaml_parser_creation() {
let yaml_content = r#"
workflow:
version: "1.0"
env:
TEST_ENV: "test"
vars:
name: "Test Workflow"
tasks:
- task:
id: "task1"
name: "Test Task"
description: "A test task"
actions:
- action:
id: "action1"
name: "Test Action"
description: "A test action"
type: "builtin"
flow:
next: null
outputs: {}
parameters: {}
"#;
let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
let parser = YamlConfigParser::new(config);
assert!(parser.validate().is_ok());
}
// `parse` yields one node per task, carrying the task's id and name and the
// lower-cased action type of its single action.
#[test]
fn test_parse_nodes() {
let yaml_content = r#"
workflow:
version: "1.0"
env:
TEST_ENV: "test"
vars:
name: "Test Workflow"
tasks:
- task:
id: "task1"
name: "Test Task"
description: "A test task"
actions:
- action:
id: "action1"
name: "Test Action"
description: "A test action"
type: "builtin"
flow:
next: null
outputs: {}
parameters: {}
"#;
let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
let parser = YamlConfigParser::new(config);
let nodes = parser.parse().unwrap();
assert_eq!(nodes.len(), 1);
assert_eq!(nodes[0].id, "task1");
assert_eq!(nodes[0].name, "Test Task");
assert_eq!(nodes[0].action_spec.action_type, "builtin");
}
// `parse_full` returns the nodes together with the workflow name, version,
// env vars and flow vars.
#[test]
fn test_parse_full_result() {
let yaml_content = r#"
workflow:
version: "1.0"
env:
TEST_ENV: "test"
vars:
name: "Test Workflow"
tasks:
- task:
id: "task1"
name: "Test Task"
description: "A test task"
actions:
- action:
id: "action1"
name: "Test Action"
description: "A test action"
type: "builtin"
flow:
next: null
outputs: {}
parameters: {}
"#;
let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
let parser = YamlConfigParser::new(config);
let result = parser.parse_full().unwrap();
assert_eq!(result.nodes.len(), 1);
assert_eq!(result.workflow_name, "Test Workflow");
assert_eq!(result.workflow_version, "1.0");
assert!(result.env_vars.contains_key("TEST_ENV"));
assert!(result.flow_vars.contains_key("name"));
}
}