Skip to main content

wfe_core/builder/
workflow_builder.rs

1use std::collections::HashMap;
2use std::marker::PhantomData;
3
4use crate::models::{ExecutionResult, StepOutcome, WorkflowDefinition, WorkflowStep};
5use crate::traits::step::{StepBody, WorkflowData};
6
7use super::inline_step::InlineStep;
8use super::step_builder::StepBuilder;
9
/// Type alias for boxed inline step closures.
///
/// An inline step body is a zero-argument closure that produces an
/// [`ExecutionResult`]. The `Send + Sync` bounds are part of the trait object,
/// so only closures that are safe to share between threads can be stored.
pub type InlineClosureBox = Box<dyn Fn() -> ExecutionResult + Send + Sync>;
12
/// Fluent builder for constructing workflow definitions.
///
/// The fluent entry points (`start_with`, `build`, `build_with_closures`,
/// `register_inline_steps`) take the builder by value, which avoids lifetime
/// issues with long-lived mutable borrows. The lower-level helpers
/// (`add_step`, `add_step_typed`, `wire_outcome`) borrow it mutably instead so
/// they can be called from container-builder closures.
///
/// # Example
/// ```ignore
/// let def = WorkflowBuilder::<MyData>::new()
///     .start_with::<StepA>()
///     .name("Step A")
///     .then::<StepB>()
///     .name("Step B")
///     .end_workflow()
///     .build("my-workflow", 1);
/// ```
pub struct WorkflowBuilder<D: WorkflowData> {
    /// All steps added so far, in insertion order. A step's id is its index
    /// in this vector (ids are assigned from `steps.len()` at insertion time).
    pub steps: Vec<WorkflowStep>,
    /// Id of the step the builder currently treats as "last". `start_with`
    /// sets it; `add_step` does not, so code that adds steps manually must
    /// update it itself if it wants the new step to become the tail.
    pub last_step: Option<usize>,
    /// Inline closures keyed by step id, stored for later registration.
    /// They are handed out by `build_with_closures`, consumed by
    /// `register_inline_steps`, and dropped by plain `build`.
    pub(crate) inline_closures: HashMap<usize, InlineClosureBox>,
    /// Ties the builder to its workflow data type `D` without storing a value.
    _phantom: PhantomData<D>,
}
37
38impl<D: WorkflowData> WorkflowBuilder<D> {
39/// New.
40    pub fn new() -> Self {
41        Self {
42            steps: Vec::new(),
43            last_step: None,
44            inline_closures: HashMap::new(),
45            _phantom: PhantomData,
46        }
47    }
48
49    /// Add the first step of the workflow.
50    pub fn start_with<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
51        let id = self.steps.len();
52        let step = WorkflowStep::new(id, std::any::type_name::<S>());
53        self.steps.push(step);
54        self.last_step = Some(id);
55        StepBuilder::new(self, id)
56    }
57
58    /// Add a step by type name. Used by container builder closures.
59    pub fn add_step(&mut self, step_type: &str) -> usize {
60        let id = self.steps.len();
61        self.steps.push(WorkflowStep::new(id, step_type));
62        id
63    }
64
65    /// Add a typed step with an optional name and config.
66    /// Convenience for use inside `parallel` branch closures.
67    pub fn add_step_typed<S: StepBody + Default + 'static>(
68        &mut self,
69        name: &str,
70        config: Option<serde_json::Value>,
71    ) -> usize {
72        let id = self.add_step(std::any::type_name::<S>());
73        self.steps[id].name = Some(name.to_string());
74        if let Some(cfg) = config {
75            self.steps[id].step_config = Some(cfg);
76        }
77        id
78    }
79
80    /// Wire an outcome from `from_step` to `to_step`.
81    pub fn wire_outcome(
82        &mut self,
83        from_step: usize,
84        to_step: usize,
85        value: Option<serde_json::Value>,
86    ) {
87        if let Some(step) = self.steps.get_mut(from_step) {
88            step.outcomes.push(StepOutcome {
89                next_step: to_step,
90                label: None,
91                value,
92            });
93        }
94    }
95
96    /// Add a child step ID to a parent container step.
97    pub(crate) fn add_child(&mut self, parent: usize, child: usize) {
98        if let Some(step) = self.steps.get_mut(parent) {
99            step.children.push(child);
100        }
101    }
102
103    /// Compile the builder into a WorkflowDefinition.
104    pub fn build(self, id: impl Into<String>, version: u32) -> WorkflowDefinition {
105        let mut def = WorkflowDefinition::new(id, version);
106        def.steps = self.steps;
107        // Note: inline closures are dropped here. Use `build_with_closures` to retain them.
108        def
109    }
110
111    /// Compile the builder into a WorkflowDefinition and return any inline closures
112    /// keyed by step id.
113    pub fn build_with_closures(
114        self,
115        id: impl Into<String>,
116        version: u32,
117    ) -> (WorkflowDefinition, HashMap<usize, InlineClosureBox>) {
118        let mut def = WorkflowDefinition::new(id, version);
119        def.steps = self.steps;
120        (def, self.inline_closures)
121    }
122
123    /// Register all inline closures from this builder into the given step registry.
124    ///
125    /// Each inline closure is registered under a unique key derived from the
126    /// `InlineStep` type name and step id.
127    pub fn register_inline_steps(
128        self,
129        registry: &mut crate::executor::StepRegistry,
130        id: impl Into<String>,
131        version: u32,
132    ) -> WorkflowDefinition {
133        let mut def = WorkflowDefinition::new(id, version);
134        def.steps = self.steps;
135        for (step_id, closure) in self.inline_closures {
136            let closure = std::sync::Arc::new(closure);
137            let key = format!("{}::{step_id}", std::any::type_name::<InlineStep>());
138            // Update the step_type so the executor resolves correctly.
139            if let Some(step) = def.steps.get_mut(step_id) {
140                step.step_type = key.clone();
141            }
142            let closure = closure.clone();
143            registry.register_factory(&key, move || {
144                let c = closure.clone();
145                Box::new(InlineStep::new(move || (c)()))
146            });
147        }
148        def
149    }
150}
151
152impl<D: WorkflowData> Default for WorkflowBuilder<D> {
153    fn default() -> Self {
154        Self::new()
155    }
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{ErrorBehavior, ExecutionResult};
    use crate::traits::step::StepExecutionContext;
    use pretty_assertions::assert_eq;
    use serde::{Deserialize, Serialize};

    /// Workflow data type used to instantiate the generic builder.
    #[derive(Debug, Clone, Default, Serialize, Deserialize)]
    struct TestData {
        counter: i32,
    }

    /// First marker step; the tests only look at its type name.
    #[derive(Default)]
    struct StepA;

    #[async_trait::async_trait]
    impl StepBody for StepA {
        async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
            Ok(ExecutionResult::next())
        }
    }

    /// Second marker step.
    #[derive(Default)]
    struct StepB;

    #[async_trait::async_trait]
    impl StepBody for StepB {
        async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
            Ok(ExecutionResult::next())
        }
    }

    /// Third marker step.
    #[derive(Default)]
    struct StepC;

    #[async_trait::async_trait]
    impl StepBody for StepC {
        async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
            Ok(ExecutionResult::next())
        }
    }

    /// Fresh, empty builder over `TestData`.
    fn new_builder() -> WorkflowBuilder<TestData> {
        WorkflowBuilder::<TestData>::new()
    }

    #[test]
    fn build_empty_workflow() {
        let workflow = new_builder().build("empty", 1);

        assert_eq!(workflow.id, "empty");
        assert_eq!(workflow.version, 1);
        assert!(workflow.steps.is_empty());
    }

    #[test]
    fn start_with_adds_first_step() {
        let workflow = new_builder()
            .start_with::<StepA>()
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.steps.len(), 1);
        assert!(workflow.steps[0].step_type.contains("StepA"));
    }

    #[test]
    fn then_chains_two_steps_with_outcome() {
        let workflow = new_builder()
            .start_with::<StepA>()
            .then::<StepB>()
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.steps.len(), 2);
        // The first step must carry exactly one outcome, leading to step 1.
        let first_outcomes = &workflow.steps[0].outcomes;
        assert_eq!(first_outcomes.len(), 1);
        assert_eq!(first_outcomes[0].next_step, 1);
    }

    #[test]
    fn then_chains_three_steps() {
        let workflow = new_builder()
            .start_with::<StepA>()
            .then::<StepB>()
            .then::<StepC>()
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.steps.len(), 3);
        assert_eq!(workflow.steps[0].outcomes[0].next_step, 1);
        assert_eq!(workflow.steps[1].outcomes[0].next_step, 2);
        // The tail of the chain leads nowhere.
        assert!(workflow.steps[2].outcomes.is_empty());
    }

    #[test]
    fn name_sets_step_name() {
        let workflow = new_builder()
            .start_with::<StepA>()
            .name("First Step")
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.steps[0].name.as_deref(), Some("First Step"));
    }

    #[test]
    fn on_error_sets_behavior() {
        let workflow = new_builder()
            .start_with::<StepA>()
            .on_error(ErrorBehavior::Suspend)
            .end_workflow()
            .build("test", 1);

        assert_eq!(
            workflow.steps[0].error_behavior,
            Some(ErrorBehavior::Suspend)
        );
    }

    #[test]
    fn if_do_inserts_container_with_children() {
        let workflow = new_builder()
            .start_with::<StepA>()
            .if_do::<StepB>(|inner| {
                let child = inner.add_step(std::any::type_name::<StepC>());
                inner.last_step = Some(child);
            })
            .end_workflow()
            .build("test", 1);

        // Expected layout: 0 = StepA, 1 = IfStep container, 2 = StepC (child).
        assert!(workflow.steps.len() >= 3);
        let container = &workflow.steps[1];
        assert!(container.step_type.contains("IfStep"));
        assert!(container.children.contains(&2));
    }

    #[test]
    fn while_do_inserts_container() {
        let workflow = new_builder()
            .start_with::<StepA>()
            .while_do::<StepB>(|inner| {
                inner.add_step(std::any::type_name::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        assert!(workflow.steps.len() >= 3);
        assert!(workflow.steps[1].step_type.contains("WhileStep"));
    }

    #[test]
    fn for_each_inserts_container() {
        let workflow = new_builder()
            .start_with::<StepA>()
            .for_each::<StepB>(|inner| {
                inner.add_step(std::any::type_name::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        assert!(workflow.steps.len() >= 3);
        assert!(workflow.steps[1].step_type.contains("ForEachStep"));
    }

    #[test]
    fn parallel_creates_branches() {
        let workflow = new_builder()
            .start_with::<StepA>()
            .parallel(|branches| {
                branches
                    .branch(|left| {
                        left.add_step(std::any::type_name::<StepB>());
                    })
                    .branch(|right| {
                        right.add_step(std::any::type_name::<StepC>());
                    })
            })
            .end_workflow()
            .build("test", 1);

        // Expected layout: 0 = StepA, 1 = Sequence (parallel container),
        // 2 = StepB, 3 = StepC.
        assert!(workflow.steps.len() >= 4);
        let container = &workflow.steps[1];
        assert!(container.step_type.contains("SequenceStep"));
        assert!(container.children.len() >= 2);
    }

    #[test]
    fn saga_with_compensation() {
        let workflow = new_builder()
            .start_with::<StepA>()
            .saga(|inner| {
                inner.add_step(std::any::type_name::<StepB>());
                inner.add_step(std::any::type_name::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        // The saga container sits right after StepA and owns children.
        let container = &workflow.steps[1];
        assert!(container.step_type.contains("SagaContainerStep"));
        assert!(container.saga);
        assert!(!container.children.is_empty());
    }

    #[test]
    fn compensate_with_sets_compensation_step() {
        let workflow = new_builder()
            .start_with::<StepA>()
            .compensate_with::<StepB>()
            .end_workflow()
            .build("test", 1);

        // StepA (id 0) is compensated by StepB (id 1).
        assert_eq!(workflow.steps[0].compensation_step_id, Some(1));
        assert!(workflow.steps[1].step_type.contains("StepB"));
    }

    #[test]
    fn config_sets_step_config() {
        let cfg = serde_json::json!({"namespace": "ory", "timeout": 30});

        let workflow = new_builder()
            .start_with::<StepA>()
            .config(cfg.clone())
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.steps[0].step_config.as_ref(), Some(&cfg));
    }

    #[test]
    fn config_chains_with_name() {
        let cfg = serde_json::json!({"namespace": "data"});

        let workflow = new_builder()
            .start_with::<StepA>()
            .name("apply-data")
            .config(cfg.clone())
            .then::<StepB>()
            .end_workflow()
            .build("test", 1);

        let first = &workflow.steps[0];
        assert_eq!(first.name.as_deref(), Some("apply-data"));
        assert_eq!(first.step_config.as_ref(), Some(&cfg));
        assert_eq!(first.outcomes[0].next_step, 1);
    }

    #[test]
    fn config_on_multiple_steps_of_same_type() {
        let cfg_a = serde_json::json!({"namespace": "ory"});
        let cfg_b = serde_json::json!({"namespace": "data"});

        let workflow = new_builder()
            .start_with::<StepA>()
            .name("apply-ory")
            .config(cfg_a.clone())
            .then::<StepA>()
            .name("apply-data")
            .config(cfg_b.clone())
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.steps[0].step_config.as_ref(), Some(&cfg_a));
        assert_eq!(workflow.steps[1].step_config.as_ref(), Some(&cfg_b));
        // Same step type twice, distinguished only by config.
        assert_eq!(workflow.steps[0].step_type, workflow.steps[1].step_type);
    }

    #[test]
    fn add_step_typed_sets_name_and_config() {
        let cfg = serde_json::json!({"namespace": "ory"});
        let mut wb = new_builder();

        let id = wb.add_step_typed::<StepA>("apply-ory", Some(cfg.clone()));

        let step = &wb.steps[id];
        assert_eq!(step.name.as_deref(), Some("apply-ory"));
        assert_eq!(step.step_config.as_ref(), Some(&cfg));
        assert!(step.step_type.contains("StepA"));
    }

    #[test]
    fn add_step_typed_without_config() {
        let mut wb = new_builder();

        let id = wb.add_step_typed::<StepB>("my-step", None);

        let step = &wb.steps[id];
        assert_eq!(step.name.as_deref(), Some("my-step"));
        assert_eq!(step.step_config, None);
    }

    #[test]
    fn wire_outcome_connects_steps() {
        let mut wb = new_builder();
        let source = wb.add_step_typed::<StepA>("first", None);
        let target = wb.add_step_typed::<StepB>("second", None);

        wb.wire_outcome(source, target, None);

        let outcomes = &wb.steps[source].outcomes;
        assert_eq!(outcomes.len(), 1);
        assert_eq!(outcomes[0].next_step, target);
    }

    #[test]
    fn inline_step_via_then_fn() {
        let workflow = new_builder()
            .start_with::<StepA>()
            .then_fn(ExecutionResult::next)
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.steps.len(), 2);
        assert!(workflow.steps[1].step_type.contains("InlineStep"));
        assert_eq!(workflow.steps[0].outcomes[0].next_step, 1);
    }
}