1use crate::types::{Step, Workflow};
4use anyhow::{Context, Result};
5use chrono::Utc;
6use std::collections::HashMap;
7
8#[derive(Debug, Clone)]
10pub struct ComposedWorkflow {
11 pub workflow: Workflow,
13 pub child_ids: Vec<String>,
15}
16
17#[derive(Debug, serde::Deserialize)]
19pub struct ComposableWorkflowYaml {
20 pub id: String,
21 pub name: String,
22 #[serde(default)]
23 pub description: String,
24 #[serde(default)]
25 pub variables: HashMap<String, String>,
26 #[serde(default)]
27 pub steps: Vec<super::parser::StepYaml>,
28 #[serde(default)]
30 pub includes: Vec<String>,
31}
32
33pub fn compose_workflows(
39 base_id: &str,
40 base_name: &str,
41 workflows: &[Workflow],
42) -> Result<ComposedWorkflow> {
43 anyhow::ensure!(!workflows.is_empty(), "At least one workflow is required");
44
45 let mut merged_steps: Vec<Step> = Vec::new();
46 let mut merged_variables: HashMap<String, String> = HashMap::new();
47 let mut child_ids: Vec<String> = Vec::new();
48 let mut seen_step_names: HashMap<String, usize> = HashMap::new();
49
50 for wf in workflows {
51 child_ids.push(wf.id.clone());
52
53 for (key, value) in &wf.variables {
55 if merged_variables.contains_key(key) && merged_variables[key] != *value {
56 let prefixed_key = format!("{}.{}", wf.id, key);
58 merged_variables.insert(prefixed_key, value.clone());
59 } else {
60 merged_variables.insert(key.clone(), value.clone());
61 }
62 }
63
64 for step in &wf.steps {
66 let name = if let Some(count) = seen_step_names.get(&step.name) {
67 format!("{}.{}", step.name, count + 1)
68 } else {
69 step.name.clone()
70 };
71 *seen_step_names.entry(step.name.clone()).or_insert(0) += 1;
72
73 merged_steps.push(Step {
74 name,
75 step_type: step.step_type.clone(),
76 action: step.action.clone(),
77 on_failure: step.on_failure.clone(),
78 breakpoint: step.breakpoint,
79 breakpoint_message: step.breakpoint_message.clone(),
80 });
81 }
82 }
83
84 let now = Utc::now();
85 Ok(ComposedWorkflow {
86 workflow: Workflow {
87 id: base_id.to_string(),
88 name: base_name.to_string(),
89 description: format!(
90 "Composed workflow from: {}",
91 child_ids.join(", ")
92 ),
93 steps: merged_steps,
94 variables: merged_variables,
95 schedule: None,
96 created_at: now,
97 updated_at: now,
98 },
99 child_ids,
100 })
101}
102
103pub fn resolve_includes(yaml: &str) -> Result<Workflow> {
107 let raw: ComposableWorkflowYaml =
108 serde_yaml::from_str(yaml).context("Parsing composable workflow YAML")?;
109
110 if raw.includes.is_empty() {
111 return super::parser::parse_workflow_str(yaml);
113 }
114
115 let all_workflows = super::parser::load_all_workflows()?;
117 let workflow_map: HashMap<&str, &Workflow> = all_workflows
118 .iter()
119 .map(|w| (w.id.as_str(), w))
120 .collect();
121
122 let mut child_workflows: Vec<Workflow> = Vec::new();
124 for include_id in &raw.includes {
125 let child = workflow_map
126 .get(include_id.as_str())
127 .with_context(|| format!("Included workflow '{}' not found", include_id))?;
128 child_workflows.push((*child).clone());
129 }
130
131 let now = Utc::now();
133 let base = Workflow {
134 id: raw.id.clone(),
135 name: raw.name.clone(),
136 description: raw.description,
137 steps: raw
138 .steps
139 .into_iter()
140 .map(super::parser::step_yaml_to_step)
141 .collect(),
142 variables: raw.variables,
143 schedule: None,
144 created_at: now,
145 updated_at: now,
146 };
147
148 let mut all = vec![base];
150 all.extend(child_workflows);
151
152 let composed = compose_workflows(&raw.id, &raw.name, &all)?;
153 Ok(composed.workflow)
154}
155
156#[cfg(test)]
157mod tests {
158 use super::*;
159 use crate::types::{FailureAction, StepType};
160
161 fn make_workflow(id: &str, steps: Vec<(&str, &str)>, vars: Vec<(&str, &str)>) -> Workflow {
162 let now = Utc::now();
163 Workflow {
164 id: id.into(),
165 name: format!("Workflow {}", id),
166 description: String::new(),
167 steps: steps
168 .into_iter()
169 .map(|(name, action)| Step {
170 name: name.into(),
171 step_type: StepType::Execute,
172 action: action.into(),
173 on_failure: FailureAction::Abort,
174 breakpoint: false,
175 breakpoint_message: None,
176 })
177 .collect(),
178 variables: vars
179 .into_iter()
180 .map(|(k, v)| (k.into(), v.into()))
181 .collect(),
182 schedule: None,
183 created_at: now,
184 updated_at: now,
185 }
186 }
187
188 #[test]
189 fn test_compose_merges_steps() {
190 let wf1 = make_workflow("a", vec![("step1", "echo a")], vec![]);
191 let wf2 = make_workflow("b", vec![("step2", "echo b")], vec![]);
192
193 let composed = compose_workflows("composed", "Composed", &[wf1, wf2]).unwrap();
194 assert_eq!(composed.workflow.steps.len(), 2);
195 assert_eq!(composed.workflow.steps[0].name, "step1");
196 assert_eq!(composed.workflow.steps[1].name, "step2");
197 assert_eq!(composed.child_ids, vec!["a", "b"]);
198 }
199
200 #[test]
201 fn test_compose_deduplicates_step_names() {
202 let wf1 = make_workflow("a", vec![("deploy", "echo a")], vec![]);
203 let wf2 = make_workflow("b", vec![("deploy", "echo b")], vec![]);
204
205 let composed = compose_workflows("composed", "Composed", &[wf1, wf2]).unwrap();
206 assert_eq!(composed.workflow.steps.len(), 2);
207 assert_eq!(composed.workflow.steps[0].name, "deploy");
208 assert_eq!(composed.workflow.steps[1].name, "deploy.2");
209 }
210
211 #[test]
212 fn test_compose_merges_variables_no_conflict() {
213 let wf1 = make_workflow("a", vec![], vec![("host", "localhost")]);
214 let wf2 = make_workflow("b", vec![], vec![("port", "8080")]);
215
216 let composed = compose_workflows("composed", "Composed", &[wf1, wf2]).unwrap();
217 assert_eq!(composed.workflow.variables.get("host").unwrap(), "localhost");
218 assert_eq!(composed.workflow.variables.get("port").unwrap(), "8080");
219 }
220
221 #[test]
222 fn test_compose_handles_variable_conflicts() {
223 let wf1 = make_workflow("a", vec![], vec![("host", "localhost")]);
224 let wf2 = make_workflow("b", vec![], vec![("host", "production.example.com")]);
225
226 let composed = compose_workflows("composed", "Composed", &[wf1, wf2]).unwrap();
227 assert_eq!(composed.workflow.variables.get("host").unwrap(), "localhost");
228 assert_eq!(
229 composed.workflow.variables.get("b.host").unwrap(),
230 "production.example.com"
231 );
232 }
233
234 #[test]
235 fn test_compose_empty_fails() {
236 let result = compose_workflows("empty", "Empty", &[]);
237 assert!(result.is_err());
238 }
239}