flowbuilder_yaml/
loader.rs

1use crate::config::WorkflowConfig;
2use anyhow::{Context, Result};
3use std::fs;
4use std::path::Path;
5
6/// 工作流加载器,支持从文件或字符串加载配置
7pub struct WorkflowLoader;
8
9impl WorkflowLoader {
10    /// 从 YAML 文件加载工作流配置
11    pub fn from_yaml_file<P: AsRef<Path>>(path: P) -> Result<WorkflowConfig> {
12        let content = fs::read_to_string(&path).with_context(|| {
13            format!("Failed to read YAML file: {:?}", path.as_ref())
14        })?;
15        Self::from_yaml_str(&content)
16    }
17
18    /// 从 YAML 字符串加载工作流配置
19    pub fn from_yaml_str(content: &str) -> Result<WorkflowConfig> {
20        serde_yaml::from_str(content)
21            .with_context(|| "Failed to parse YAML content")
22    }
23
24    /// 从 JSON 文件加载工作流配置
25    pub fn from_json_file<P: AsRef<Path>>(path: P) -> Result<WorkflowConfig> {
26        let content = fs::read_to_string(&path).with_context(|| {
27            format!("Failed to read JSON file: {:?}", path.as_ref())
28        })?;
29        Self::from_json_str(&content)
30    }
31
32    /// 从 JSON 字符串加载工作流配置
33    pub fn from_json_str(content: &str) -> Result<WorkflowConfig> {
34        serde_json::from_str(content)
35            .with_context(|| "Failed to parse JSON content")
36    }
37
38    /// 保存工作流配置到 YAML 文件
39    pub fn save_to_yaml<P: AsRef<Path>>(
40        config: &WorkflowConfig,
41        path: P,
42    ) -> Result<()> {
43        let yaml_content = serde_yaml::to_string(config)
44            .with_context(|| "Failed to serialize config to YAML")?;
45
46        fs::write(&path, yaml_content).with_context(|| {
47            format!("Failed to write YAML file: {:?}", path.as_ref())
48        })?;
49
50        Ok(())
51    }
52
53    /// 保存工作流配置到 JSON 文件
54    pub fn save_to_json<P: AsRef<Path>>(
55        config: &WorkflowConfig,
56        path: P,
57    ) -> Result<()> {
58        let json_content = serde_json::to_string_pretty(config)
59            .with_context(|| "Failed to serialize config to JSON")?;
60
61        fs::write(&path, json_content).with_context(|| {
62            format!("Failed to write JSON file: {:?}", path.as_ref())
63        })?;
64
65        Ok(())
66    }
67
68    /// 验证工作流配置的基本有效性
69    pub fn validate(config: &WorkflowConfig) -> Result<()> {
70        let workflow = &config.workflow;
71
72        // 检查版本格式
73        if workflow.version.is_empty() {
74            return Err(anyhow::anyhow!("Workflow version cannot be empty"));
75        }
76
77        // 检查任务定义
78        if workflow.tasks.is_empty() {
79            return Err(anyhow::anyhow!(
80                "Workflow must contain at least one task"
81            ));
82        }
83
84        // 检查任务 ID 的唯一性
85        let mut task_ids = std::collections::HashSet::new();
86        for task in &workflow.tasks {
87            if !task_ids.insert(&task.task.id) {
88                return Err(anyhow::anyhow!(
89                    "Duplicate task ID: {}",
90                    task.task.id
91                ));
92            }
93        }
94
95        // 检查动作 ID 的唯一性
96        let mut action_ids = std::collections::HashSet::new();
97        for task in &workflow.tasks {
98            for action in &task.task.actions {
99                let full_action_id =
100                    format!("{}.{}", task.task.id, action.action.id);
101                if !action_ids.insert(full_action_id.clone()) {
102                    return Err(anyhow::anyhow!(
103                        "Duplicate action ID: {}",
104                        full_action_id
105                    ));
106                }
107            }
108        }
109
110        Ok(())
111    }
112
113    /// 从配置创建带有 runtime 功能的执行器
114    pub fn create_runtime_executor(
115        config: WorkflowConfig,
116    ) -> Result<crate::executor::DynamicFlowExecutor> {
117        use crate::executor::DynamicFlowExecutor;
118        DynamicFlowExecutor::new(config)
119    }
120
121    /// 快速执行工作流文件(使用 runtime 功能)
122    pub async fn execute_workflow_file<P: AsRef<Path>>(path: P) -> Result<()> {
123        let config = Self::from_yaml_file(path)?;
124        Self::validate(&config)?;
125
126        let mut executor = Self::create_runtime_executor(config)?;
127        let context = std::sync::Arc::new(tokio::sync::Mutex::new(
128            flowbuilder_context::FlowContext::default(),
129        ));
130
131        // 使用统一的执行接口
132        executor.execute(context).await?;
133
134        Ok(())
135    }
136
137    /// 批量执行多个工作流文件(简化版本)
138    pub async fn execute_workflow_batch<P: AsRef<Path>>(
139        paths: Vec<P>,
140        max_concurrent: usize,
141    ) -> Result<Vec<Result<()>>> {
142        let semaphore =
143            std::sync::Arc::new(tokio::sync::Semaphore::new(max_concurrent));
144        let mut results = Vec::new();
145
146        for path in paths {
147            let _permit = semaphore.acquire().await.map_err(|e| {
148                anyhow::anyhow!("Failed to acquire semaphore: {}", e)
149            })?;
150            let result = Self::execute_workflow_file(path).await;
151            results.push(result);
152        }
153
154        Ok(results)
155    }
156}
157
158#[cfg(test)]
159mod tests {
160    use super::*;
161
162    #[test]
163    fn test_load_from_yaml_str() {
164        let yaml_content = r#"
165workflow:
166  version: "1.0"
167  env:
168    FLOWBUILDER_ENV: "test"
169  vars:
170    name: "Test Workflow"
171  tasks:
172    - task:
173        id: "task1"
174        name: "Test Task"
175        description: "A test task"
176        actions: []
177"#;
178
179        let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
180        assert_eq!(config.workflow.version, "1.0");
181        assert_eq!(
182            config.workflow.env.get("FLOWBUILDER_ENV"),
183            Some(&"test".to_string())
184        );
185        assert_eq!(config.workflow.tasks.len(), 1);
186        assert_eq!(config.workflow.tasks[0].task.id, "task1");
187    }
188
189    #[test]
190    fn test_validate_config() {
191        let yaml_content = r#"
192workflow:
193  version: "1.0"
194  tasks:
195    - task:
196        id: "task1"
197        name: "Test Task"
198        description: "A test task"
199        actions: []
200"#;
201
202        let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
203        assert!(WorkflowLoader::validate(&config).is_ok());
204    }
205
206    #[test]
207    fn test_validate_duplicate_task_ids() {
208        let yaml_content = r#"
209workflow:
210  version: "1.0"
211  tasks:
212    - task:
213        id: "task1"
214        name: "Test Task 1"
215        description: "A test task"
216        actions: []
217    - task:
218        id: "task1"
219        name: "Test Task 2"
220        description: "Another test task"
221        actions: []
222"#;
223
224        let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
225        assert!(WorkflowLoader::validate(&config).is_err());
226    }
227}