Skip to main content

wfe_core/builder/
workflow_builder.rs

1use std::collections::HashMap;
2use std::marker::PhantomData;
3
4use crate::models::{
5    ExecutionResult, StepOutcome, WorkflowDefinition, WorkflowStep,
6};
7use crate::traits::step::{StepBody, WorkflowData};
8
9use super::inline_step::InlineStep;
10use super::step_builder::StepBuilder;
11
12/// Type alias for boxed inline step closures.
13pub type InlineClosureBox = Box<dyn Fn() -> ExecutionResult + Send + Sync>;
14
15/// Fluent builder for constructing workflow definitions.
16///
17/// Uses an owned-self pattern: each method consumes and returns the builder,
18/// avoiding lifetime issues with mutable borrows.
19///
20/// # Example
21/// ```ignore
22/// let def = WorkflowBuilder::<MyData>::new()
23///     .start_with::<StepA>()
24///     .name("Step A")
25///     .then::<StepB>()
26///     .name("Step B")
27///     .end_workflow()
28///     .build("my-workflow", 1);
29/// ```
30pub struct WorkflowBuilder<D: WorkflowData> {
31    pub(crate) steps: Vec<WorkflowStep>,
32    pub(crate) last_step: Option<usize>,
33    /// Inline closures keyed by step id, stored for later registration.
34    pub(crate) inline_closures: HashMap<usize, InlineClosureBox>,
35    _phantom: PhantomData<D>,
36}
37
38impl<D: WorkflowData> WorkflowBuilder<D> {
39    pub fn new() -> Self {
40        Self {
41            steps: Vec::new(),
42            last_step: None,
43            inline_closures: HashMap::new(),
44            _phantom: PhantomData,
45        }
46    }
47
48    /// Add the first step of the workflow.
49    pub fn start_with<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
50        let id = self.steps.len();
51        let step = WorkflowStep::new(id, std::any::type_name::<S>());
52        self.steps.push(step);
53        self.last_step = Some(id);
54        StepBuilder::new(self, id)
55    }
56
57    /// Add a step by type name. Used by container builder closures.
58    pub fn add_step(&mut self, step_type: &str) -> usize {
59        let id = self.steps.len();
60        self.steps.push(WorkflowStep::new(id, step_type));
61        id
62    }
63
64    /// Wire an outcome from `from_step` to `to_step`.
65    pub(crate) fn wire_outcome(&mut self, from_step: usize, to_step: usize, value: Option<serde_json::Value>) {
66        if let Some(step) = self.steps.get_mut(from_step) {
67            step.outcomes.push(StepOutcome {
68                next_step: to_step,
69                label: None,
70                value,
71            });
72        }
73    }
74
75    /// Add a child step ID to a parent container step.
76    pub(crate) fn add_child(&mut self, parent: usize, child: usize) {
77        if let Some(step) = self.steps.get_mut(parent) {
78            step.children.push(child);
79        }
80    }
81
82    /// Compile the builder into a WorkflowDefinition.
83    pub fn build(self, id: impl Into<String>, version: u32) -> WorkflowDefinition {
84        let mut def = WorkflowDefinition::new(id, version);
85        def.steps = self.steps;
86        // Note: inline closures are dropped here. Use `build_with_closures` to retain them.
87        def
88    }
89
90    /// Compile the builder into a WorkflowDefinition and return any inline closures
91    /// keyed by step id.
92    pub fn build_with_closures(
93        self,
94        id: impl Into<String>,
95        version: u32,
96    ) -> (WorkflowDefinition, HashMap<usize, InlineClosureBox>) {
97        let mut def = WorkflowDefinition::new(id, version);
98        def.steps = self.steps;
99        (def, self.inline_closures)
100    }
101
102    /// Register all inline closures from this builder into the given step registry.
103    ///
104    /// Each inline closure is registered under a unique key derived from the
105    /// `InlineStep` type name and step id.
106    pub fn register_inline_steps(
107        self,
108        registry: &mut crate::executor::StepRegistry,
109        id: impl Into<String>,
110        version: u32,
111    ) -> WorkflowDefinition {
112        let mut def = WorkflowDefinition::new(id, version);
113        def.steps = self.steps;
114        for (step_id, closure) in self.inline_closures {
115            let closure = std::sync::Arc::new(closure);
116            let key = format!("{}::{step_id}", std::any::type_name::<InlineStep>());
117            // Update the step_type so the executor resolves correctly.
118            if let Some(step) = def.steps.get_mut(step_id) {
119                step.step_type = key.clone();
120            }
121            let closure = closure.clone();
122            registry.register_factory(&key, move || {
123                let c = closure.clone();
124                Box::new(InlineStep::new(move || (c)()))
125            });
126        }
127        def
128    }
129}
130
131impl<D: WorkflowData> Default for WorkflowBuilder<D> {
132    fn default() -> Self {
133        Self::new()
134    }
135}
136
137#[cfg(test)]
138mod tests {
139    use super::*;
140    use crate::models::{ErrorBehavior, ExecutionResult};
141    use crate::traits::step::StepExecutionContext;
142    use pretty_assertions::assert_eq;
143    use serde::{Deserialize, Serialize};
144
145    #[derive(Debug, Clone, Default, Serialize, Deserialize)]
146    struct TestData {
147        counter: i32,
148    }
149
150    #[derive(Default)]
151    struct StepA;
152
153    #[async_trait::async_trait]
154    impl StepBody for StepA {
155        async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
156            Ok(ExecutionResult::next())
157        }
158    }
159
160    #[derive(Default)]
161    struct StepB;
162
163    #[async_trait::async_trait]
164    impl StepBody for StepB {
165        async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
166            Ok(ExecutionResult::next())
167        }
168    }
169
170    #[derive(Default)]
171    struct StepC;
172
173    #[async_trait::async_trait]
174    impl StepBody for StepC {
175        async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
176            Ok(ExecutionResult::next())
177        }
178    }
179
180    #[test]
181    fn build_empty_workflow() {
182        let def = WorkflowBuilder::<TestData>::new().build("empty", 1);
183        assert_eq!(def.id, "empty");
184        assert_eq!(def.version, 1);
185        assert!(def.steps.is_empty());
186    }
187
188    #[test]
189    fn start_with_adds_first_step() {
190        let def = WorkflowBuilder::<TestData>::new()
191            .start_with::<StepA>()
192            .end_workflow()
193            .build("test", 1);
194        assert_eq!(def.steps.len(), 1);
195        assert!(def.steps[0].step_type.contains("StepA"));
196    }
197
198    #[test]
199    fn then_chains_two_steps_with_outcome() {
200        let def = WorkflowBuilder::<TestData>::new()
201            .start_with::<StepA>()
202            .then::<StepB>()
203            .end_workflow()
204            .build("test", 1);
205        assert_eq!(def.steps.len(), 2);
206        // Step 0 should have outcome pointing to step 1
207        assert_eq!(def.steps[0].outcomes.len(), 1);
208        assert_eq!(def.steps[0].outcomes[0].next_step, 1);
209    }
210
211    #[test]
212    fn then_chains_three_steps() {
213        let def = WorkflowBuilder::<TestData>::new()
214            .start_with::<StepA>()
215            .then::<StepB>()
216            .then::<StepC>()
217            .end_workflow()
218            .build("test", 1);
219        assert_eq!(def.steps.len(), 3);
220        assert_eq!(def.steps[0].outcomes[0].next_step, 1);
221        assert_eq!(def.steps[1].outcomes[0].next_step, 2);
222        assert!(def.steps[2].outcomes.is_empty());
223    }
224
225    #[test]
226    fn name_sets_step_name() {
227        let def = WorkflowBuilder::<TestData>::new()
228            .start_with::<StepA>()
229            .name("First Step")
230            .end_workflow()
231            .build("test", 1);
232        assert_eq!(def.steps[0].name, Some("First Step".into()));
233    }
234
235    #[test]
236    fn on_error_sets_behavior() {
237        let def = WorkflowBuilder::<TestData>::new()
238            .start_with::<StepA>()
239            .on_error(ErrorBehavior::Suspend)
240            .end_workflow()
241            .build("test", 1);
242        assert_eq!(def.steps[0].error_behavior, Some(ErrorBehavior::Suspend));
243    }
244
245    #[test]
246    fn if_do_inserts_container_with_children() {
247        let def = WorkflowBuilder::<TestData>::new()
248            .start_with::<StepA>()
249            .if_do::<StepB>(|b| {
250                let id = b.add_step(std::any::type_name::<StepC>());
251                b.last_step = Some(id);
252            })
253            .end_workflow()
254            .build("test", 1);
255
256        // Steps: 0=StepA, 1=IfStep, 2=StepC (child)
257        // StepA -> IfStep -> (after if)
258        assert!(def.steps.len() >= 3);
259        // The If step should have StepC as a child
260        assert!(def.steps[1].step_type.contains("IfStep"));
261        assert!(def.steps[1].children.contains(&2));
262    }
263
264    #[test]
265    fn while_do_inserts_container() {
266        let def = WorkflowBuilder::<TestData>::new()
267            .start_with::<StepA>()
268            .while_do::<StepB>(|b| {
269                b.add_step(std::any::type_name::<StepC>());
270            })
271            .end_workflow()
272            .build("test", 1);
273
274        assert!(def.steps.len() >= 3);
275        assert!(def.steps[1].step_type.contains("WhileStep"));
276    }
277
278    #[test]
279    fn for_each_inserts_container() {
280        let def = WorkflowBuilder::<TestData>::new()
281            .start_with::<StepA>()
282            .for_each::<StepB>(|b| {
283                b.add_step(std::any::type_name::<StepC>());
284            })
285            .end_workflow()
286            .build("test", 1);
287
288        assert!(def.steps.len() >= 3);
289        assert!(def.steps[1].step_type.contains("ForEachStep"));
290    }
291
292    #[test]
293    fn parallel_creates_branches() {
294        let def = WorkflowBuilder::<TestData>::new()
295            .start_with::<StepA>()
296            .parallel(|branches| {
297                branches
298                    .branch(|b| {
299                        b.add_step(std::any::type_name::<StepB>());
300                    })
301                    .branch(|b| {
302                        b.add_step(std::any::type_name::<StepC>());
303                    })
304            })
305            .end_workflow()
306            .build("test", 1);
307
308        // Steps: 0=StepA, 1=Sequence(parallel container), 2=StepB, 3=StepC
309        assert!(def.steps.len() >= 4);
310        assert!(def.steps[1].step_type.contains("SequenceStep"));
311        assert!(def.steps[1].children.len() >= 2);
312    }
313
314    #[test]
315    fn saga_with_compensation() {
316        let def = WorkflowBuilder::<TestData>::new()
317            .start_with::<StepA>()
318            .saga(|b| {
319                b.add_step(std::any::type_name::<StepB>());
320                b.add_step(std::any::type_name::<StepC>());
321            })
322            .end_workflow()
323            .build("test", 1);
324
325        // Saga container should exist and have children
326        assert!(def.steps[1].step_type.contains("SagaContainerStep"));
327        assert!(def.steps[1].saga);
328        assert!(!def.steps[1].children.is_empty());
329    }
330
331    #[test]
332    fn compensate_with_sets_compensation_step() {
333        let def = WorkflowBuilder::<TestData>::new()
334            .start_with::<StepA>()
335            .compensate_with::<StepB>()
336            .end_workflow()
337            .build("test", 1);
338
339        // Step 0 (StepA) should have compensation pointing to step 1 (StepB)
340        assert_eq!(def.steps[0].compensation_step_id, Some(1));
341        assert!(def.steps[1].step_type.contains("StepB"));
342    }
343
344    #[test]
345    fn inline_step_via_then_fn() {
346        let def = WorkflowBuilder::<TestData>::new()
347            .start_with::<StepA>()
348            .then_fn(ExecutionResult::next)
349            .end_workflow()
350            .build("test", 1);
351
352        assert_eq!(def.steps.len(), 2);
353        assert!(def.steps[1].step_type.contains("InlineStep"));
354        assert_eq!(def.steps[0].outcomes[0].next_step, 1);
355    }
356}