Skip to main content

erio_workflow/
builder.rs

1//! Fluent builder for constructing workflows.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use crate::WorkflowError;
7use crate::dag::Dag;
8use crate::step::Step;
9
10/// A validated workflow ready for execution.
11///
12/// Contains steps arranged in a DAG with dependency relationships.
13pub struct Workflow {
14    steps: HashMap<String, Arc<dyn Step>>,
15    dag: Dag,
16}
17
18impl Workflow {
19    /// Creates a new builder for constructing a workflow.
20    pub fn builder() -> WorkflowBuilder {
21        WorkflowBuilder::new()
22    }
23
24    /// Returns the number of steps.
25    pub fn step_count(&self) -> usize {
26        self.steps.len()
27    }
28
29    /// Returns the IDs of all steps in topological order.
30    pub fn step_ids(&self) -> Vec<&str> {
31        // Unwrap is safe: DAG was validated at build time
32        self.dag.topological_order().expect("DAG was validated")
33    }
34
35    /// Returns a step by its ID.
36    pub fn step(&self, id: &str) -> Option<Arc<dyn Step>> {
37        self.steps.get(id).cloned()
38    }
39
40    /// Returns groups of steps that can run in parallel.
41    pub fn parallel_groups(&self) -> Result<Vec<Vec<&str>>, WorkflowError> {
42        self.dag.parallel_groups()
43    }
44
45    /// Returns a reference to the internal DAG.
46    pub fn dag(&self) -> &Dag {
47        &self.dag
48    }
49}
50
51/// Builder for constructing a `Workflow` with validated dependencies.
52pub struct WorkflowBuilder {
53    steps: Vec<(Box<dyn Step>, Vec<String>)>,
54}
55
56impl WorkflowBuilder {
57    fn new() -> Self {
58        Self { steps: Vec::new() }
59    }
60
61    /// Adds a step with its dependency IDs.
62    #[must_use]
63    pub fn step(mut self, step: impl Step + 'static, deps: &[&str]) -> Self {
64        let dep_list = deps.iter().map(|d| (*d).into()).collect();
65        self.steps.push((Box::new(step), dep_list));
66        self
67    }
68
69    /// Validates the DAG and builds the workflow.
70    pub fn build(self) -> Result<Workflow, WorkflowError> {
71        if self.steps.is_empty() {
72            return Err(WorkflowError::EmptyWorkflow);
73        }
74
75        let mut dag = Dag::new();
76        let mut step_map: HashMap<String, Arc<dyn Step>> = HashMap::new();
77
78        for (step, deps) in self.steps {
79            let id = step.id().to_string();
80            let dep_refs: Vec<&str> = deps.iter().map(String::as_str).collect();
81            dag.add_node(&id, &dep_refs)?;
82            step_map.insert(id, Arc::from(step));
83        }
84
85        // Validate DAG is acyclic
86        dag.topological_order()?;
87
88        Ok(Workflow {
89            steps: step_map,
90            dag,
91        })
92    }
93}
94
#[cfg(test)]
mod tests {
    use super::*;
    use crate::WorkflowError;
    use crate::context::WorkflowContext;
    use crate::step::{Step, StepOutput};

    // --- Test double ---

    /// Minimal step whose ID and output text are fixed at construction.
    struct MockStep {
        step_id: String,
        output: String,
    }

    impl MockStep {
        /// Builds a mock step with the given ID and canned output text.
        fn new(id: &str, output: &str) -> Self {
            let step_id = id.to_owned();
            let output = output.to_owned();
            Self { step_id, output }
        }
    }

    #[async_trait::async_trait]
    impl Step for MockStep {
        fn id(&self) -> &str {
            self.step_id.as_str()
        }

        async fn execute(&self, _ctx: &mut WorkflowContext) -> Result<StepOutput, WorkflowError> {
            let produced = StepOutput::new(&self.output);
            Ok(produced)
        }
    }

    // --- Building ---

    /// A lone step with no dependencies yields a one-step workflow.
    #[test]
    fn builds_workflow_with_single_step() {
        let builder = Workflow::builder().step(MockStep::new("a", "result"), &[]);
        let built = builder.build().unwrap();

        assert_eq!(built.step_count(), 1);
    }

    /// Steps may depend on steps that were added before them.
    #[test]
    fn builds_workflow_with_dependencies() {
        let builder = Workflow::builder()
            .step(MockStep::new("a", "A"), &[])
            .step(MockStep::new("b", "B"), &["a"])
            .step(MockStep::new("c", "C"), &["a", "b"]);
        let built = builder.build().unwrap();

        assert_eq!(built.step_count(), 3);
    }

    /// Building without any step is an error.
    #[test]
    fn build_rejects_empty_workflow() {
        let outcome = Workflow::builder().build();

        assert!(matches!(outcome, Err(WorkflowError::EmptyWorkflow)));
    }

    /// Two steps sharing an ID are rejected, and the error names that ID.
    #[test]
    fn build_rejects_duplicate_step_ids() {
        let builder = Workflow::builder()
            .step(MockStep::new("a", "1"), &[])
            .step(MockStep::new("a", "2"), &[]);
        let outcome = builder.build();

        assert!(matches!(
            outcome,
            Err(WorkflowError::DuplicateStep { step_id }) if step_id == "a"
        ));
    }

    /// A dependency on an ID that was never added is rejected.
    #[test]
    fn build_rejects_missing_dependency() {
        let builder = Workflow::builder().step(MockStep::new("b", "B"), &["unknown"]);
        let outcome = builder.build();

        assert!(matches!(
            outcome,
            Err(WorkflowError::MissingDependency { .. })
        ));
    }

    // --- Accessors ---

    /// Step IDs come back with dependencies ahead of their dependents.
    #[test]
    fn workflow_returns_step_ids() {
        let built = Workflow::builder()
            .step(MockStep::new("x", "X"), &[])
            .step(MockStep::new("y", "Y"), &["x"])
            .build()
            .unwrap();

        assert_eq!(built.step_ids(), vec!["x", "y"]);
    }

    /// Independent steps share a group; their common dependent follows alone.
    #[test]
    fn workflow_returns_parallel_groups() {
        let built = Workflow::builder()
            .step(MockStep::new("a", "A"), &[])
            .step(MockStep::new("b", "B"), &[])
            .step(MockStep::new("c", "C"), &["a", "b"])
            .build()
            .unwrap();

        let groups = built.parallel_groups().unwrap();
        assert_eq!(groups.len(), 2);

        // Order inside a group is not asserted, so sort before comparing.
        let mut roots = groups[0].to_vec();
        roots.sort_unstable();
        assert_eq!(roots, vec!["a", "b"]);
        assert_eq!(groups[1], vec!["c"]);
    }

    /// Lookup succeeds for a registered ID and fails for an unknown one.
    #[test]
    fn workflow_gets_step_by_id() {
        let built = Workflow::builder()
            .step(MockStep::new("a", "A"), &[])
            .build()
            .unwrap();

        assert!(built.step("a").is_some());
        assert!(built.step("unknown").is_none());
    }
}