flowbuilder_yaml/
loader.rs1use crate::config::WorkflowConfig;
2use anyhow::{Context, Result};
3use std::fs;
4use std::path::Path;
5
/// Stateless namespace for loading, saving, validating and executing
/// workflow definitions.
///
/// The type has no fields and every operation is an associated function, so
/// it is never necessary to construct a value; the derives exist so the type
/// can still be embedded in `Debug`/`Clone`/`Default` containers.
#[derive(Debug, Clone, Copy, Default)]
pub struct WorkflowLoader;
8
9impl WorkflowLoader {
10 pub fn from_yaml_file<P: AsRef<Path>>(path: P) -> Result<WorkflowConfig> {
12 let content = fs::read_to_string(&path).with_context(|| {
13 format!("Failed to read YAML file: {:?}", path.as_ref())
14 })?;
15 Self::from_yaml_str(&content)
16 }
17
18 pub fn from_yaml_str(content: &str) -> Result<WorkflowConfig> {
20 serde_yaml::from_str(content)
21 .with_context(|| "Failed to parse YAML content")
22 }
23
24 pub fn from_json_file<P: AsRef<Path>>(path: P) -> Result<WorkflowConfig> {
26 let content = fs::read_to_string(&path).with_context(|| {
27 format!("Failed to read JSON file: {:?}", path.as_ref())
28 })?;
29 Self::from_json_str(&content)
30 }
31
32 pub fn from_json_str(content: &str) -> Result<WorkflowConfig> {
34 serde_json::from_str(content)
35 .with_context(|| "Failed to parse JSON content")
36 }
37
38 pub fn save_to_yaml<P: AsRef<Path>>(
40 config: &WorkflowConfig,
41 path: P,
42 ) -> Result<()> {
43 let yaml_content = serde_yaml::to_string(config)
44 .with_context(|| "Failed to serialize config to YAML")?;
45
46 fs::write(&path, yaml_content).with_context(|| {
47 format!("Failed to write YAML file: {:?}", path.as_ref())
48 })?;
49
50 Ok(())
51 }
52
53 pub fn save_to_json<P: AsRef<Path>>(
55 config: &WorkflowConfig,
56 path: P,
57 ) -> Result<()> {
58 let json_content = serde_json::to_string_pretty(config)
59 .with_context(|| "Failed to serialize config to JSON")?;
60
61 fs::write(&path, json_content).with_context(|| {
62 format!("Failed to write JSON file: {:?}", path.as_ref())
63 })?;
64
65 Ok(())
66 }
67
68 pub fn validate(config: &WorkflowConfig) -> Result<()> {
70 let workflow = &config.workflow;
71
72 if workflow.version.is_empty() {
74 return Err(anyhow::anyhow!("Workflow version cannot be empty"));
75 }
76
77 if workflow.tasks.is_empty() {
79 return Err(anyhow::anyhow!(
80 "Workflow must contain at least one task"
81 ));
82 }
83
84 let mut task_ids = std::collections::HashSet::new();
86 for task in &workflow.tasks {
87 if !task_ids.insert(&task.task.id) {
88 return Err(anyhow::anyhow!(
89 "Duplicate task ID: {}",
90 task.task.id
91 ));
92 }
93 }
94
95 let mut action_ids = std::collections::HashSet::new();
97 for task in &workflow.tasks {
98 for action in &task.task.actions {
99 let full_action_id =
100 format!("{}.{}", task.task.id, action.action.id);
101 if !action_ids.insert(full_action_id.clone()) {
102 return Err(anyhow::anyhow!(
103 "Duplicate action ID: {}",
104 full_action_id
105 ));
106 }
107 }
108 }
109
110 Ok(())
111 }
112
113 pub fn create_runtime_executor(
115 config: WorkflowConfig,
116 ) -> Result<crate::executor::DynamicFlowExecutor> {
117 use crate::executor::DynamicFlowExecutor;
118 DynamicFlowExecutor::new(config)
119 }
120
121 pub async fn execute_workflow_file<P: AsRef<Path>>(path: P) -> Result<()> {
123 let config = Self::from_yaml_file(path)?;
124 Self::validate(&config)?;
125
126 let mut executor = Self::create_runtime_executor(config)?;
127 let context = std::sync::Arc::new(tokio::sync::Mutex::new(
128 flowbuilder_context::FlowContext::default(),
129 ));
130
131 executor.execute(context).await?;
133
134 Ok(())
135 }
136
137 pub async fn execute_workflow_batch<P: AsRef<Path>>(
139 paths: Vec<P>,
140 max_concurrent: usize,
141 ) -> Result<Vec<Result<()>>> {
142 let semaphore =
143 std::sync::Arc::new(tokio::sync::Semaphore::new(max_concurrent));
144 let mut results = Vec::new();
145
146 for path in paths {
147 let _permit = semaphore.acquire().await.map_err(|e| {
148 anyhow::anyhow!("Failed to acquire semaphore: {}", e)
149 })?;
150 let result = Self::execute_workflow_file(path).await;
151 results.push(result);
152 }
153
154 Ok(results)
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    /// A complete document parses and exposes its version, env and tasks.
    #[test]
    fn test_load_from_yaml_str() {
        let yaml_content = r#"
workflow:
  version: "1.0"
  env:
    FLOWBUILDER_ENV: "test"
  vars:
    name: "Test Workflow"
  tasks:
    - task:
        id: "task1"
        name: "Test Task"
        description: "A test task"
        actions: []
"#;

        let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
        assert_eq!(config.workflow.version, "1.0");
        assert_eq!(
            config.workflow.env.get("FLOWBUILDER_ENV"),
            Some(&"test".to_string())
        );
        assert_eq!(config.workflow.tasks.len(), 1);
        assert_eq!(config.workflow.tasks[0].task.id, "task1");
    }

    /// A minimal single-task workflow passes validation.
    #[test]
    fn test_validate_config() {
        let yaml_content = r#"
workflow:
  version: "1.0"
  tasks:
    - task:
        id: "task1"
        name: "Test Task"
        description: "A test task"
        actions: []
"#;

        let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
        assert!(WorkflowLoader::validate(&config).is_ok());
    }

    /// Two tasks sharing an ID are rejected, and the error names that ID.
    #[test]
    fn test_validate_duplicate_task_ids() {
        let yaml_content = r#"
workflow:
  version: "1.0"
  tasks:
    - task:
        id: "task1"
        name: "Test Task 1"
        description: "A test task"
        actions: []
    - task:
        id: "task1"
        name: "Test Task 2"
        description: "Another test task"
        actions: []
"#;

        let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
        // Pin the message so the test cannot pass on an unrelated failure.
        let err = WorkflowLoader::validate(&config).unwrap_err();
        assert_eq!(err.to_string(), "Duplicate task ID: task1");
    }

    /// An empty version string is rejected before any task check runs.
    #[test]
    fn test_validate_empty_version() {
        let yaml_content = r#"
workflow:
  version: ""
  tasks:
    - task:
        id: "task1"
        name: "Test Task"
        description: "A test task"
        actions: []
"#;

        let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
        let err = WorkflowLoader::validate(&config).unwrap_err();
        assert_eq!(err.to_string(), "Workflow version cannot be empty");
    }

    /// A workflow without tasks is rejected.
    #[test]
    fn test_validate_empty_tasks() {
        let yaml_content = r#"
workflow:
  version: "1.0"
  tasks: []
"#;

        let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
        let err = WorkflowLoader::validate(&config).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Workflow must contain at least one task"
        );
    }
}