1use std::collections::HashMap;
4use std::sync::Arc;
5
6use crate::WorkflowError;
7use crate::dag::Dag;
8use crate::step::Step;
9
10pub struct Workflow {
14 steps: HashMap<String, Arc<dyn Step>>,
15 dag: Dag,
16}
17
18impl Workflow {
19 pub fn builder() -> WorkflowBuilder {
21 WorkflowBuilder::new()
22 }
23
24 pub fn step_count(&self) -> usize {
26 self.steps.len()
27 }
28
29 pub fn step_ids(&self) -> Vec<&str> {
31 self.dag.topological_order().expect("DAG was validated")
33 }
34
35 pub fn step(&self, id: &str) -> Option<Arc<dyn Step>> {
37 self.steps.get(id).cloned()
38 }
39
40 pub fn parallel_groups(&self) -> Result<Vec<Vec<&str>>, WorkflowError> {
42 self.dag.parallel_groups()
43 }
44
45 pub fn dag(&self) -> &Dag {
47 &self.dag
48 }
49}
50
51pub struct WorkflowBuilder {
53 steps: Vec<(Box<dyn Step>, Vec<String>)>,
54}
55
56impl WorkflowBuilder {
57 fn new() -> Self {
58 Self { steps: Vec::new() }
59 }
60
61 #[must_use]
63 pub fn step(mut self, step: impl Step + 'static, deps: &[&str]) -> Self {
64 let dep_list = deps.iter().map(|d| (*d).into()).collect();
65 self.steps.push((Box::new(step), dep_list));
66 self
67 }
68
69 pub fn build(self) -> Result<Workflow, WorkflowError> {
71 if self.steps.is_empty() {
72 return Err(WorkflowError::EmptyWorkflow);
73 }
74
75 let mut dag = Dag::new();
76 let mut step_map: HashMap<String, Arc<dyn Step>> = HashMap::new();
77
78 for (step, deps) in self.steps {
79 let id = step.id().to_string();
80 let dep_refs: Vec<&str> = deps.iter().map(String::as_str).collect();
81 dag.add_node(&id, &dep_refs)?;
82 step_map.insert(id, Arc::from(step));
83 }
84
85 dag.topological_order()?;
87
88 Ok(Workflow {
89 steps: step_map,
90 dag,
91 })
92 }
93}
94
#[cfg(test)]
mod tests {
    use super::*;
    use crate::WorkflowError;
    use crate::context::WorkflowContext;
    use crate::step::{Step, StepOutput};

    /// Minimal [`Step`] used by the tests: it reports a fixed id and wraps a
    /// stored string in a [`StepOutput`] when executed.
    struct MockStep {
        step_id: String,
        output: String,
    }

    impl MockStep {
        /// Builds a mock with the given id and output text.
        fn new(id: &str, output: &str) -> Self {
            Self {
                step_id: id.to_owned(),
                output: output.to_owned(),
            }
        }
    }

    #[async_trait::async_trait]
    impl Step for MockStep {
        fn id(&self) -> &str {
            &self.step_id
        }

        async fn execute(&self, _ctx: &mut WorkflowContext) -> Result<StepOutput, WorkflowError> {
            Ok(StepOutput::new(&self.output))
        }
    }

    /// A single step with no dependencies builds successfully.
    #[test]
    fn builds_workflow_with_single_step() {
        let wf = Workflow::builder()
            .step(MockStep::new("a", "result"), &[])
            .build()
            .unwrap();

        assert_eq!(wf.step_count(), 1);
    }

    /// A chain of steps with satisfied dependencies builds successfully.
    #[test]
    fn builds_workflow_with_dependencies() {
        let builder = Workflow::builder()
            .step(MockStep::new("a", "A"), &[])
            .step(MockStep::new("b", "B"), &["a"])
            .step(MockStep::new("c", "C"), &["a", "b"]);
        let wf = builder.build().unwrap();

        assert_eq!(wf.step_count(), 3);
    }

    /// Building without any step is an error.
    #[test]
    fn build_rejects_empty_workflow() {
        let outcome = Workflow::builder().build();
        assert!(matches!(outcome, Err(WorkflowError::EmptyWorkflow)));
    }

    /// Two steps sharing an id are rejected, and the error names that id.
    #[test]
    fn build_rejects_duplicate_step_ids() {
        let outcome = Workflow::builder()
            .step(MockStep::new("a", "1"), &[])
            .step(MockStep::new("a", "2"), &[])
            .build();

        assert!(matches!(
            outcome,
            Err(WorkflowError::DuplicateStep { step_id }) if step_id == "a"
        ));
    }

    /// A dependency on an id that was never registered is rejected.
    #[test]
    fn build_rejects_missing_dependency() {
        let outcome = Workflow::builder()
            .step(MockStep::new("b", "B"), &["unknown"])
            .build();

        assert!(matches!(
            outcome,
            Err(WorkflowError::MissingDependency { .. })
        ));
    }

    /// `step_ids` lists a dependency before the step that depends on it.
    #[test]
    fn workflow_returns_step_ids() {
        let wf = Workflow::builder()
            .step(MockStep::new("x", "X"), &[])
            .step(MockStep::new("y", "Y"), &["x"])
            .build()
            .unwrap();

        let ordered = wf.step_ids();
        assert_eq!(ordered, ["x", "y"]);
    }

    /// Two independent steps share the first group; their dependent follows.
    #[test]
    fn workflow_returns_parallel_groups() {
        let wf = Workflow::builder()
            .step(MockStep::new("a", "A"), &[])
            .step(MockStep::new("b", "B"), &[])
            .step(MockStep::new("c", "C"), &["a", "b"])
            .build()
            .unwrap();

        let groups = wf.parallel_groups().unwrap();
        assert_eq!(groups.len(), 2);

        // Order inside a group is not asserted, so sort before comparing.
        let mut roots = groups[0].clone();
        roots.sort_unstable();
        assert_eq!(roots, ["a", "b"]);
        assert_eq!(groups[1], ["c"]);
    }

    /// Lookup by id finds registered steps and nothing else.
    #[test]
    fn workflow_gets_step_by_id() {
        let wf = Workflow::builder()
            .step(MockStep::new("a", "A"), &[])
            .build()
            .unwrap();

        assert!(wf.step("a").is_some());
        assert!(wf.step("unknown").is_none());
    }
}