Skip to main content

wfe_core/builder/
workflow_builder.rs

1use std::collections::HashMap;
2use std::marker::PhantomData;
3
4use crate::models::{ExecutionResult, StepOutcome, WorkflowDefinition, WorkflowStep};
5use crate::traits::step::{StepBody, WorkflowData};
6
7use super::inline_step::InlineStep;
8use super::step_builder::StepBuilder;
9
10/// Type alias for boxed inline step closures.
11pub type InlineClosureBox = Box<dyn Fn() -> ExecutionResult + Send + Sync>;
12
13/// Fluent builder for constructing workflow definitions.
14///
15/// Uses an owned-self pattern: each method consumes and returns the builder,
16/// avoiding lifetime issues with mutable borrows.
17///
18/// # Example
19/// ```ignore
20/// let def = WorkflowBuilder::<MyData>::new()
21///     .start_with::<StepA>()
22///     .name("Step A")
23///     .then::<StepB>()
24///     .name("Step B")
25///     .end_workflow()
26///     .build("my-workflow", 1);
27/// ```
28pub struct WorkflowBuilder<D: WorkflowData> {
29    /// Steps.
30    pub steps: Vec<WorkflowStep>,
31    /// Last step.
32    pub last_step: Option<usize>,
33    /// Inline closures keyed by step id, stored for later registration.
34    pub(crate) inline_closures: HashMap<usize, InlineClosureBox>,
35    _phantom: PhantomData<D>,
36}
37
38impl<D: WorkflowData> WorkflowBuilder<D> {
39/// New.
40    pub fn new() -> Self {
41        Self {
42            steps: Vec::new(),
43            last_step: None,
44            inline_closures: HashMap::new(),
45            _phantom: PhantomData,
46        }
47    }
48
49    /// Add the first step of the workflow.
50    ///
51    /// Returns a [`StepBuilder`] so you can configure the step's name, error
52    /// behavior, and compensation before chaining the next step.
53    ///
54    /// # Example
55    /// ```ignore
56    /// let def = WorkflowBuilder::<MyData>::new()
57    ///     .start_with::<FetchData>()
58    ///         .name("Fetch")
59    ///     .then::<Process>()
60    ///         .name("Process")
61    ///     .end_workflow()
62    ///     .build("my-workflow", 1);
63    /// ```
64    pub fn start_with<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
65        let id = self.steps.len();
66        let step = WorkflowStep::new(id, std::any::type_name::<S>());
67        self.steps.push(step);
68        self.last_step = Some(id);
69        StepBuilder::new(self, id)
70    }
71
72    /// Add a step by type name. Used by container builder closures.
73    pub fn add_step(&mut self, step_type: &str) -> usize {
74        let id = self.steps.len();
75        self.steps.push(WorkflowStep::new(id, step_type));
76        id
77    }
78
79    /// Add a typed step with an optional name and config.
80    /// Convenience for use inside `parallel` branch closures.
81    pub fn add_step_typed<S: StepBody + Default + 'static>(
82        &mut self,
83        name: &str,
84        config: Option<serde_json::Value>,
85    ) -> usize {
86        let id = self.add_step(std::any::type_name::<S>());
87        self.steps[id].name = Some(name.to_string());
88        if let Some(cfg) = config {
89            self.steps[id].step_config = Some(cfg);
90        }
91        id
92    }
93
94    /// Wire an outcome from `from_step` to `to_step`.
95    pub fn wire_outcome(
96        &mut self,
97        from_step: usize,
98        to_step: usize,
99        value: Option<serde_json::Value>,
100    ) {
101        if let Some(step) = self.steps.get_mut(from_step) {
102            step.outcomes.push(StepOutcome {
103                next_step: to_step,
104                label: None,
105                value,
106            });
107        }
108    }
109
110    /// Add a child step ID to a parent container step.
111    pub(crate) fn add_child(&mut self, parent: usize, child: usize) {
112        if let Some(step) = self.steps.get_mut(parent) {
113            step.children.push(child);
114        }
115    }
116
117    /// Compile the builder into a [`WorkflowDefinition`].
118    ///
119    /// The `id` and `version` together uniquely identify this workflow blueprint.
120    /// Definitions are registered with a [`WorkflowHost`](crate::traits::HostContext)
121    /// before instances can be started.
122    ///
123    /// **Note:** inline closures added via [`StepBuilder::then_fn`] are discarded
124    /// by this method. Use [`build_with_closures`](Self::build_with_closures) if you
125    /// need to retain them for runtime registration.
126    pub fn build(self, id: impl Into<String>, version: u32) -> WorkflowDefinition {
127        let mut def = WorkflowDefinition::new(id, version);
128        def.steps = self.steps;
129        // Note: inline closures are dropped here. Use `build_with_closures` to retain them.
130        def
131    }
132
133    /// Compile the builder into a [`WorkflowDefinition`] and return any inline closures.
134    ///
135    /// Inline closures (added with [`StepBuilder::then_fn`]) are keyed by step id
136    /// in the returned `HashMap`. You must register these closures with the
137    /// [`StepRegistry`](crate::executor::StepRegistry) at runtime so the executor
138    /// can invoke them.
139    pub fn build_with_closures(
140        self,
141        id: impl Into<String>,
142        version: u32,
143    ) -> (WorkflowDefinition, HashMap<usize, InlineClosureBox>) {
144        let mut def = WorkflowDefinition::new(id, version);
145        def.steps = self.steps;
146        (def, self.inline_closures)
147    }
148
149    /// Register all inline closures from this builder into the given step registry.
150    ///
151    /// Each inline closure is registered under a unique key derived from the
152    /// `InlineStep` type name and step id.
153    pub fn register_inline_steps(
154        self,
155        registry: &mut crate::executor::StepRegistry,
156        id: impl Into<String>,
157        version: u32,
158    ) -> WorkflowDefinition {
159        let mut def = WorkflowDefinition::new(id, version);
160        def.steps = self.steps;
161        for (step_id, closure) in self.inline_closures {
162            let closure = std::sync::Arc::new(closure);
163            let key = format!("{}::{step_id}", std::any::type_name::<InlineStep>());
164            // Update the step_type so the executor resolves correctly.
165            if let Some(step) = def.steps.get_mut(step_id) {
166                step.step_type = key.clone();
167            }
168            let closure = closure.clone();
169            registry.register_factory(&key, move || {
170                let c = closure.clone();
171                Box::new(InlineStep::new(move || (c)()))
172            });
173        }
174        def
175    }
176}
177
178impl<D: WorkflowData> Default for WorkflowBuilder<D> {
179    fn default() -> Self {
180        Self::new()
181    }
182}
183
#[cfg(test)]
mod tests {
    use std::any::type_name;

    use pretty_assertions::assert_eq;
    use serde::{Deserialize, Serialize};

    use super::*;
    use crate::models::{ErrorBehavior, ExecutionResult};
    use crate::traits::step::StepExecutionContext;

    /// Workflow payload used by every test in this module.
    #[derive(Debug, Clone, Default, Serialize, Deserialize)]
    struct TestData {
        counter: i32,
    }

    /// Declares unit-struct steps whose `run` simply returns
    /// `ExecutionResult::next()`.
    macro_rules! passthrough_steps {
        ($($step:ident),+ $(,)?) => {
            $(
                #[derive(Default)]
                struct $step;

                #[async_trait::async_trait]
                impl StepBody for $step {
                    async fn run(
                        &mut self,
                        _ctx: &StepExecutionContext<'_>,
                    ) -> crate::Result<ExecutionResult> {
                        Ok(ExecutionResult::next())
                    }
                }
            )+
        };
    }

    passthrough_steps!(StepA, StepB, StepC);

    /// Fresh builder over [`TestData`].
    fn builder() -> WorkflowBuilder<TestData> {
        WorkflowBuilder::new()
    }

    #[test]
    fn build_empty_workflow() {
        let wf = builder().build("empty", 1);

        assert_eq!(wf.id, "empty");
        assert_eq!(wf.version, 1);
        assert!(wf.steps.is_empty());
    }

    #[test]
    fn start_with_adds_first_step() {
        let wf = builder()
            .start_with::<StepA>()
            .end_workflow()
            .build("test", 1);

        assert_eq!(wf.steps.len(), 1);
        assert!(wf.steps[0].step_type.contains("StepA"));
    }

    #[test]
    fn then_chains_two_steps_with_outcome() {
        let wf = builder()
            .start_with::<StepA>()
            .then::<StepB>()
            .end_workflow()
            .build("test", 1);

        assert_eq!(wf.steps.len(), 2);
        // The first step must carry exactly one outcome, leading to step 1.
        let first = &wf.steps[0];
        assert_eq!(first.outcomes.len(), 1);
        assert_eq!(first.outcomes[0].next_step, 1);
    }

    #[test]
    fn then_chains_three_steps() {
        let wf = builder()
            .start_with::<StepA>()
            .then::<StepB>()
            .then::<StepC>()
            .end_workflow()
            .build("test", 1);

        assert_eq!(wf.steps.len(), 3);
        assert_eq!(wf.steps[0].outcomes[0].next_step, 1);
        assert_eq!(wf.steps[1].outcomes[0].next_step, 2);
        assert!(wf.steps[2].outcomes.is_empty());
    }

    #[test]
    fn name_sets_step_name() {
        let wf = builder()
            .start_with::<StepA>()
            .name("First Step")
            .end_workflow()
            .build("test", 1);

        assert_eq!(wf.steps[0].name.as_deref(), Some("First Step"));
    }

    #[test]
    fn on_error_sets_behavior() {
        let wf = builder()
            .start_with::<StepA>()
            .on_error(ErrorBehavior::Suspend)
            .end_workflow()
            .build("test", 1);

        assert_eq!(wf.steps[0].error_behavior, Some(ErrorBehavior::Suspend));
    }

    #[test]
    fn if_do_inserts_container_with_children() {
        let wf = builder()
            .start_with::<StepA>()
            .if_do::<StepB>(|inner| {
                let child = inner.add_step(type_name::<StepC>());
                inner.last_step = Some(child);
            })
            .end_workflow()
            .build("test", 1);

        // Layout: 0 = StepA, 1 = IfStep container, 2 = StepC (child of the container).
        assert!(wf.steps.len() >= 3);
        let container = &wf.steps[1];
        assert!(container.step_type.contains("IfStep"));
        assert!(container.children.contains(&2));
    }

    #[test]
    fn while_do_inserts_container() {
        let wf = builder()
            .start_with::<StepA>()
            .while_do::<StepB>(|inner| {
                inner.add_step(type_name::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        assert!(wf.steps.len() >= 3);
        assert!(wf.steps[1].step_type.contains("WhileStep"));
    }

    #[test]
    fn for_each_inserts_container() {
        let wf = builder()
            .start_with::<StepA>()
            .for_each::<StepB>(|inner| {
                inner.add_step(type_name::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        assert!(wf.steps.len() >= 3);
        assert!(wf.steps[1].step_type.contains("ForEachStep"));
    }

    #[test]
    fn parallel_creates_branches() {
        let wf = builder()
            .start_with::<StepA>()
            .parallel(|branches| {
                branches
                    .branch(|left| {
                        left.add_step(type_name::<StepB>());
                    })
                    .branch(|right| {
                        right.add_step(type_name::<StepC>());
                    })
            })
            .end_workflow()
            .build("test", 1);

        // Layout: 0 = StepA, 1 = Sequence (parallel container), 2 = StepB, 3 = StepC.
        assert!(wf.steps.len() >= 4);
        let container = &wf.steps[1];
        assert!(container.step_type.contains("SequenceStep"));
        assert!(container.children.len() >= 2);
    }

    #[test]
    fn saga_with_compensation() {
        let wf = builder()
            .start_with::<StepA>()
            .saga(|inner| {
                inner.add_step(type_name::<StepB>());
                inner.add_step(type_name::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        // The saga container must exist, be flagged as a saga, and own children.
        let container = &wf.steps[1];
        assert!(container.step_type.contains("SagaContainerStep"));
        assert!(container.saga);
        assert!(!container.children.is_empty());
    }

    #[test]
    fn compensate_with_sets_compensation_step() {
        let wf = builder()
            .start_with::<StepA>()
            .compensate_with::<StepB>()
            .end_workflow()
            .build("test", 1);

        // StepA (step 0) compensates through StepB (step 1).
        assert_eq!(wf.steps[0].compensation_step_id, Some(1));
        assert!(wf.steps[1].step_type.contains("StepB"));
    }

    #[test]
    fn config_sets_step_config() {
        let expected = serde_json::json!({"namespace": "ory", "timeout": 30});
        let wf = builder()
            .start_with::<StepA>()
            .config(expected.clone())
            .end_workflow()
            .build("test", 1);

        assert_eq!(wf.steps[0].step_config.as_ref(), Some(&expected));
    }

    #[test]
    fn config_chains_with_name() {
        let expected = serde_json::json!({"namespace": "data"});
        let wf = builder()
            .start_with::<StepA>()
            .name("apply-data")
            .config(expected.clone())
            .then::<StepB>()
            .end_workflow()
            .build("test", 1);

        let first = &wf.steps[0];
        assert_eq!(first.name.as_deref(), Some("apply-data"));
        assert_eq!(first.step_config.as_ref(), Some(&expected));
        assert_eq!(first.outcomes[0].next_step, 1);
    }

    #[test]
    fn config_on_multiple_steps_of_same_type() {
        let ory = serde_json::json!({"namespace": "ory"});
        let data = serde_json::json!({"namespace": "data"});
        let wf = builder()
            .start_with::<StepA>()
            .name("apply-ory")
            .config(ory.clone())
            .then::<StepA>()
            .name("apply-data")
            .config(data.clone())
            .end_workflow()
            .build("test", 1);

        assert_eq!(wf.steps[0].step_config.as_ref(), Some(&ory));
        assert_eq!(wf.steps[1].step_config.as_ref(), Some(&data));
        // Both steps were created from StepA, so their type strings match.
        assert_eq!(wf.steps[0].step_type, wf.steps[1].step_type);
    }

    #[test]
    fn add_step_typed_sets_name_and_config() {
        let expected = serde_json::json!({"namespace": "ory"});
        let mut wb = builder();

        let id = wb.add_step_typed::<StepA>("apply-ory", Some(expected.clone()));

        let step = &wb.steps[id];
        assert_eq!(step.name.as_deref(), Some("apply-ory"));
        assert_eq!(step.step_config.as_ref(), Some(&expected));
        assert!(step.step_type.contains("StepA"));
    }

    #[test]
    fn add_step_typed_without_config() {
        let mut wb = builder();

        let id = wb.add_step_typed::<StepB>("my-step", None);

        let step = &wb.steps[id];
        assert_eq!(step.name.as_deref(), Some("my-step"));
        assert!(step.step_config.is_none());
    }

    #[test]
    fn wire_outcome_connects_steps() {
        let mut wb = builder();
        let source = wb.add_step_typed::<StepA>("first", None);
        let target = wb.add_step_typed::<StepB>("second", None);

        wb.wire_outcome(source, target, None);

        let outcomes = &wb.steps[source].outcomes;
        assert_eq!(outcomes.len(), 1);
        assert_eq!(outcomes[0].next_step, target);
    }

    #[test]
    fn inline_step_via_then_fn() {
        let wf = builder()
            .start_with::<StepA>()
            .then_fn(ExecutionResult::next)
            .end_workflow()
            .build("test", 1);

        assert_eq!(wf.steps.len(), 2);
        assert!(wf.steps[1].step_type.contains("InlineStep"));
        assert_eq!(wf.steps[0].outcomes[0].next_step, 1);
    }
}