Skip to main content

mur_core/workflow/
compose.rs

//! Workflow composition — merge sub-workflows into a single composed workflow.
2
3use crate::types::{Step, Workflow};
4use anyhow::{Context, Result};
5use chrono::Utc;
6use std::collections::HashMap;
7
8/// A composed workflow that references child workflow IDs.
9#[derive(Debug, Clone)]
10pub struct ComposedWorkflow {
11    /// The resulting merged workflow.
12    pub workflow: Workflow,
13    /// IDs of child workflows that were composed.
14    pub child_ids: Vec<String>,
15}
16
17/// Raw YAML structure supporting `includes` directive.
18#[derive(Debug, serde::Deserialize)]
19pub struct ComposableWorkflowYaml {
20    pub id: String,
21    pub name: String,
22    #[serde(default)]
23    pub description: String,
24    #[serde(default)]
25    pub variables: HashMap<String, String>,
26    #[serde(default)]
27    pub steps: Vec<super::parser::StepYaml>,
28    /// List of sub-workflow IDs to include.
29    #[serde(default)]
30    pub includes: Vec<String>,
31}
32
33/// Compose multiple workflows into a single workflow by merging steps and variables.
34///
35/// Steps are concatenated in order; variables from later workflows override earlier ones
36/// unless there's a naming conflict, in which case the child workflow's variable is prefixed
37/// with its workflow ID.
38pub fn compose_workflows(
39    base_id: &str,
40    base_name: &str,
41    workflows: &[Workflow],
42) -> Result<ComposedWorkflow> {
43    anyhow::ensure!(!workflows.is_empty(), "At least one workflow is required");
44
45    let mut merged_steps: Vec<Step> = Vec::new();
46    let mut merged_variables: HashMap<String, String> = HashMap::new();
47    let mut child_ids: Vec<String> = Vec::new();
48    let mut seen_step_names: HashMap<String, usize> = HashMap::new();
49
50    for wf in workflows {
51        child_ids.push(wf.id.clone());
52
53        // Merge variables, prefixing on conflict
54        for (key, value) in &wf.variables {
55            if merged_variables.contains_key(key) && merged_variables[key] != *value {
56                // Naming conflict: prefix with workflow ID
57                let prefixed_key = format!("{}.{}", wf.id, key);
58                merged_variables.insert(prefixed_key, value.clone());
59            } else {
60                merged_variables.insert(key.clone(), value.clone());
61            }
62        }
63
64        // Merge steps, deduplicating names
65        for step in &wf.steps {
66            let name = if let Some(count) = seen_step_names.get(&step.name) {
67                format!("{}.{}", step.name, count + 1)
68            } else {
69                step.name.clone()
70            };
71            *seen_step_names.entry(step.name.clone()).or_insert(0) += 1;
72
73            merged_steps.push(Step {
74                name,
75                step_type: step.step_type.clone(),
76                action: step.action.clone(),
77                on_failure: step.on_failure.clone(),
78                breakpoint: step.breakpoint,
79                breakpoint_message: step.breakpoint_message.clone(),
80            });
81        }
82    }
83
84    let now = Utc::now();
85    Ok(ComposedWorkflow {
86        workflow: Workflow {
87            id: base_id.to_string(),
88            name: base_name.to_string(),
89            description: format!(
90                "Composed workflow from: {}",
91                child_ids.join(", ")
92            ),
93            steps: merged_steps,
94            variables: merged_variables,
95            schedule: None,
96            created_at: now,
97            updated_at: now,
98        },
99        child_ids,
100    })
101}
102
103/// Resolve `includes` directives in a composable workflow YAML string.
104///
105/// Loads referenced sub-workflows from the workflows directory and merges them.
106pub fn resolve_includes(yaml: &str) -> Result<Workflow> {
107    let raw: ComposableWorkflowYaml =
108        serde_yaml::from_str(yaml).context("Parsing composable workflow YAML")?;
109
110    if raw.includes.is_empty() {
111        // No includes — parse as a regular workflow
112        return super::parser::parse_workflow_str(yaml);
113    }
114
115    // Load all available workflows to resolve includes
116    let all_workflows = super::parser::load_all_workflows()?;
117    let workflow_map: HashMap<&str, &Workflow> = all_workflows
118        .iter()
119        .map(|w| (w.id.as_str(), w))
120        .collect();
121
122    // Collect included workflows in order
123    let mut child_workflows: Vec<Workflow> = Vec::new();
124    for include_id in &raw.includes {
125        let child = workflow_map
126            .get(include_id.as_str())
127            .with_context(|| format!("Included workflow '{}' not found", include_id))?;
128        child_workflows.push((*child).clone());
129    }
130
131    // Build the base workflow from inline steps (if any)
132    let now = Utc::now();
133    let base = Workflow {
134        id: raw.id.clone(),
135        name: raw.name.clone(),
136        description: raw.description,
137        steps: raw
138            .steps
139            .into_iter()
140            .map(super::parser::step_yaml_to_step)
141            .collect(),
142        variables: raw.variables,
143        schedule: None,
144        created_at: now,
145        updated_at: now,
146    };
147
148    // Insert base as the first workflow, then append children
149    let mut all = vec![base];
150    all.extend(child_workflows);
151
152    let composed = compose_workflows(&raw.id, &raw.name, &all)?;
153    Ok(composed.workflow)
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{FailureAction, StepType};

    /// Build a minimal workflow fixture from `(step name, action)` and
    /// `(variable, value)` pairs. Every step is an `Execute` step that aborts on
    /// failure and has no breakpoint.
    fn make_workflow(id: &str, steps: Vec<(&str, &str)>, vars: Vec<(&str, &str)>) -> Workflow {
        let stamp = Utc::now();

        let mut built_steps = Vec::with_capacity(steps.len());
        for (step_name, action) in steps {
            built_steps.push(Step {
                name: step_name.into(),
                step_type: StepType::Execute,
                action: action.into(),
                on_failure: FailureAction::Abort,
                breakpoint: false,
                breakpoint_message: None,
            });
        }

        let mut variables = HashMap::new();
        for (key, value) in vars {
            variables.insert(key.into(), value.into());
        }

        Workflow {
            id: id.into(),
            name: format!("Workflow {}", id),
            description: String::new(),
            steps: built_steps,
            variables,
            schedule: None,
            created_at: stamp,
            updated_at: stamp,
        }
    }

    /// Names of the workflow's steps, in order.
    fn step_names(workflow: &Workflow) -> Vec<&str> {
        workflow.steps.iter().map(|s| s.name.as_str()).collect()
    }

    #[test]
    fn test_compose_merges_steps() {
        let inputs = [
            make_workflow("a", vec![("step1", "echo a")], vec![]),
            make_workflow("b", vec![("step2", "echo b")], vec![]),
        ];

        let composed = compose_workflows("composed", "Composed", &inputs).unwrap();

        // Steps keep their source order, and both source IDs are recorded.
        assert_eq!(step_names(&composed.workflow), vec!["step1", "step2"]);
        assert_eq!(composed.child_ids, vec!["a", "b"]);
    }

    #[test]
    fn test_compose_deduplicates_step_names() {
        let inputs = [
            make_workflow("a", vec![("deploy", "echo a")], vec![]),
            make_workflow("b", vec![("deploy", "echo b")], vec![]),
        ];

        let composed = compose_workflows("composed", "Composed", &inputs).unwrap();

        // The second `deploy` is renamed with a numeric suffix.
        assert_eq!(step_names(&composed.workflow), vec!["deploy", "deploy.2"]);
    }

    #[test]
    fn test_compose_merges_variables_no_conflict() {
        let inputs = [
            make_workflow("a", vec![], vec![("host", "localhost")]),
            make_workflow("b", vec![], vec![("port", "8080")]),
        ];

        let composed = compose_workflows("composed", "Composed", &inputs).unwrap();
        let vars = &composed.workflow.variables;

        assert_eq!(vars.get("host").map(String::as_str), Some("localhost"));
        assert_eq!(vars.get("port").map(String::as_str), Some("8080"));
    }

    #[test]
    fn test_compose_handles_variable_conflicts() {
        let inputs = [
            make_workflow("a", vec![], vec![("host", "localhost")]),
            make_workflow("b", vec![], vec![("host", "production.example.com")]),
        ];

        let composed = compose_workflows("composed", "Composed", &inputs).unwrap();
        let vars = &composed.workflow.variables;

        // First definition keeps the bare key; the conflicting one is prefixed.
        assert_eq!(vars.get("host").map(String::as_str), Some("localhost"));
        assert_eq!(
            vars.get("b.host").map(String::as_str),
            Some("production.example.com")
        );
    }

    #[test]
    fn test_compose_empty_fails() {
        assert!(compose_workflows("empty", "Empty", &[]).is_err());
    }
}
239}