Skip to main content

wfe_core/builder/
workflow_builder.rs

1use std::collections::HashMap;
2use std::marker::PhantomData;
3
4use crate::models::{
5    ErrorBehavior, ExecutionResult, StepOutcome, WorkflowDefinition, WorkflowStep,
6};
7use crate::traits::step::{StepBody, WorkflowData};
8
9use super::inline_step::InlineStep;
10use super::step_builder::StepBuilder;
11
/// Boxed closure backing an inline step.
///
/// The closure takes no arguments, returns the step's [`ExecutionResult`],
/// and is bounded by `Send + Sync`.
pub type InlineClosureBox = Box<dyn Fn() -> ExecutionResult + Send + Sync>;
14
/// Fluent builder for constructing workflow definitions.
///
/// Uses an owned-self pattern: each method consumes and returns the builder,
/// avoiding lifetime issues with mutable borrows.
///
/// The type parameter `D` is the workflow's data type; it is only carried as
/// a marker and no value of `D` is stored in the builder.
///
/// # Example
/// ```ignore
/// let def = WorkflowBuilder::<MyData>::new()
///     .start_with::<StepA>()
///     .name("Step A")
///     .then::<StepB>()
///     .name("Step B")
///     .end_workflow()
///     .build("my-workflow", 1);
/// ```
pub struct WorkflowBuilder<D: WorkflowData> {
    /// Steps added so far. A step's id is the length of this vector at the
    /// moment the step was pushed, i.e. its index here.
    pub steps: Vec<WorkflowStep>,
    /// Id of the step the builder currently treats as "last". Set by
    /// `start_with`; `add_step` leaves it untouched, so container closures
    /// that need it must assign it themselves.
    pub last_step: Option<usize>,
    /// Inline closures keyed by step id, stored for later registration.
    pub(crate) inline_closures: HashMap<usize, InlineClosureBox>,
    /// Default error behavior for steps that don't override it.
    /// `None` means the definition keeps whatever default it was created with.
    pub(crate) default_error_behavior: Option<ErrorBehavior>,
    /// Zero-sized marker tying the builder to its data type `D`.
    _phantom: PhantomData<D>,
}
41
42impl<D: WorkflowData> WorkflowBuilder<D> {
43    /// New.
44    pub fn new() -> Self {
45        Self {
46            steps: Vec::new(),
47            last_step: None,
48            inline_closures: HashMap::new(),
49            default_error_behavior: None,
50            _phantom: PhantomData,
51        }
52    }
53
54    /// Set the default error behavior for all steps in this workflow.
55    ///
56    /// Steps that don't have their own [`on_error`](crate::builder::StepBuilder::on_error)
57    /// configuration will use this behavior when they fail.
58    pub fn default_error_behavior(mut self, behavior: ErrorBehavior) -> Self {
59        self.default_error_behavior = Some(behavior);
60        self
61    }
62
63    /// Add the first step of the workflow.
64    ///
65    /// Returns a [`StepBuilder`] so you can configure the step's name, error
66    /// behavior, and compensation before chaining the next step.
67    ///
68    /// # Example
69    /// ```ignore
70    /// let def = WorkflowBuilder::<MyData>::new()
71    ///     .start_with::<FetchData>()
72    ///         .name("Fetch")
73    ///     .then::<Process>()
74    ///         .name("Process")
75    ///     .end_workflow()
76    ///     .build("my-workflow", 1);
77    /// ```
78    pub fn start_with<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
79        let id = self.steps.len();
80        let step = WorkflowStep::new(id, std::any::type_name::<S>());
81        self.steps.push(step);
82        self.last_step = Some(id);
83        StepBuilder::new(self, id)
84    }
85
86    /// Add a step by type name. Used by container builder closures.
87    pub fn add_step(&mut self, step_type: &str) -> usize {
88        let id = self.steps.len();
89        self.steps.push(WorkflowStep::new(id, step_type));
90        id
91    }
92
93    /// Add a typed step with an optional name and config.
94    /// Convenience for use inside `parallel` branch closures.
95    pub fn add_step_typed<S: StepBody + Default + 'static>(
96        &mut self,
97        name: &str,
98        config: Option<serde_json::Value>,
99    ) -> usize {
100        let id = self.add_step(std::any::type_name::<S>());
101        self.steps[id].name = Some(name.to_string());
102        if let Some(cfg) = config {
103            self.steps[id].step_config = Some(cfg);
104        }
105        id
106    }
107
108    /// Wire an outcome from `from_step` to `to_step`.
109    pub fn wire_outcome(
110        &mut self,
111        from_step: usize,
112        to_step: usize,
113        value: Option<serde_json::Value>,
114    ) {
115        if let Some(step) = self.steps.get_mut(from_step) {
116            step.outcomes.push(StepOutcome {
117                next_step: to_step,
118                label: None,
119                value,
120            });
121        }
122    }
123
124    /// Add a child step ID to a parent container step.
125    pub(crate) fn add_child(&mut self, parent: usize, child: usize) {
126        if let Some(step) = self.steps.get_mut(parent) {
127            step.children.push(child);
128        }
129    }
130
131    /// Compile the builder into a [`WorkflowDefinition`].
132    ///
133    /// The `id` and `version` together uniquely identify this workflow blueprint.
134    /// Definitions are registered with a [`WorkflowHost`](crate::traits::HostContext)
135    /// before instances can be started.
136    ///
137    /// **Note:** inline closures added via [`StepBuilder::then_fn`] are discarded
138    /// by this method. Use [`build_with_closures`](Self::build_with_closures) if you
139    /// need to retain them for runtime registration.
140    pub fn build(self, id: impl Into<String>, version: u32) -> WorkflowDefinition {
141        let mut def = WorkflowDefinition::new(id, version);
142        def.steps = self.steps;
143        if let Some(eb) = self.default_error_behavior {
144            def.default_error_behavior = eb;
145        }
146        // Note: inline closures are dropped here. Use `build_with_closures` to retain them.
147        def
148    }
149
150    /// Compile the builder into a [`WorkflowDefinition`] and return any inline closures.
151    ///
152    /// Inline closures (added with [`StepBuilder::then_fn`]) are keyed by step id
153    /// in the returned `HashMap`. You must register these closures with the
154    /// [`StepRegistry`](crate::executor::StepRegistry) at runtime so the executor
155    /// can invoke them.
156    pub fn build_with_closures(
157        self,
158        id: impl Into<String>,
159        version: u32,
160    ) -> (WorkflowDefinition, HashMap<usize, InlineClosureBox>) {
161        let mut def = WorkflowDefinition::new(id, version);
162        def.steps = self.steps;
163        if let Some(eb) = self.default_error_behavior {
164            def.default_error_behavior = eb;
165        }
166        (def, self.inline_closures)
167    }
168
169    /// Register all inline closures from this builder into the given step registry.
170    ///
171    /// Each inline closure is registered under a unique key derived from the
172    /// `InlineStep` type name and step id.
173    pub fn register_inline_steps(
174        self,
175        registry: &mut crate::executor::StepRegistry,
176        id: impl Into<String>,
177        version: u32,
178    ) -> WorkflowDefinition {
179        let mut def = WorkflowDefinition::new(id, version);
180        def.steps = self.steps;
181        if let Some(eb) = self.default_error_behavior {
182            def.default_error_behavior = eb;
183        }
184        for (step_id, closure) in self.inline_closures {
185            let closure = std::sync::Arc::new(closure);
186            let key = format!("{}::{step_id}", std::any::type_name::<InlineStep>());
187            // Update the step_type so the executor resolves correctly.
188            if let Some(step) = def.steps.get_mut(step_id) {
189                step.step_type = key.clone();
190            }
191            let closure = closure.clone();
192            registry.register_factory(&key, move || {
193                let c = closure.clone();
194                Box::new(InlineStep::new(move || (c)()))
195            });
196        }
197        def
198    }
199}
200
201impl<D: WorkflowData> Default for WorkflowBuilder<D> {
202    fn default() -> Self {
203        Self::new()
204    }
205}
206
#[cfg(test)]
mod tests {
    use std::any::type_name;

    use super::*;
    use crate::models::{ErrorBehavior, ExecutionResult};
    use crate::traits::step::StepExecutionContext;
    use pretty_assertions::assert_eq;
    use serde::{Deserialize, Serialize};

    /// Workflow data type used to parameterize the builder under test.
    #[derive(Debug, Clone, Default, Serialize, Deserialize)]
    struct TestData {
        counter: i32,
    }

    /// Declares a unit step type whose `run` just returns `ExecutionResult::next()`.
    macro_rules! passthrough_step {
        ($name:ident) => {
            #[derive(Default)]
            struct $name;

            #[async_trait::async_trait]
            impl StepBody for $name {
                async fn run(
                    &mut self,
                    _ctx: &StepExecutionContext<'_>,
                ) -> crate::Result<ExecutionResult> {
                    Ok(ExecutionResult::next())
                }
            }
        };
    }

    passthrough_step!(StepA);
    passthrough_step!(StepB);
    passthrough_step!(StepC);

    /// Fresh, empty builder over [`TestData`]; every test starts from here.
    fn builder() -> WorkflowBuilder<TestData> {
        WorkflowBuilder::new()
    }

    #[test]
    fn build_empty_workflow() {
        let workflow = builder().build("empty", 1);

        assert_eq!(workflow.id, "empty");
        assert_eq!(workflow.version, 1);
        assert!(workflow.steps.is_empty());
    }

    #[test]
    fn start_with_adds_first_step() {
        let workflow = builder()
            .start_with::<StepA>()
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.steps.len(), 1);
        assert!(workflow.steps[0].step_type.contains("StepA"));
    }

    #[test]
    fn then_chains_two_steps_with_outcome() {
        let workflow = builder()
            .start_with::<StepA>()
            .then::<StepB>()
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.steps.len(), 2);
        // The first step must carry exactly one outcome, leading to step 1.
        let first = &workflow.steps[0];
        assert_eq!(first.outcomes.len(), 1);
        assert_eq!(first.outcomes[0].next_step, 1);
    }

    #[test]
    fn then_chains_three_steps() {
        let workflow = builder()
            .start_with::<StepA>()
            .then::<StepB>()
            .then::<StepC>()
            .end_workflow()
            .build("test", 1);

        let steps = &workflow.steps;
        assert_eq!(steps.len(), 3);
        assert_eq!(steps[0].outcomes[0].next_step, 1);
        assert_eq!(steps[1].outcomes[0].next_step, 2);
        assert!(steps[2].outcomes.is_empty());
    }

    #[test]
    fn name_sets_step_name() {
        let workflow = builder()
            .start_with::<StepA>()
            .name("First Step")
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.steps[0].name, Some(String::from("First Step")));
    }

    #[test]
    fn on_error_sets_behavior() {
        let workflow = builder()
            .start_with::<StepA>()
            .on_error(ErrorBehavior::Suspend)
            .end_workflow()
            .build("test", 1);

        assert_eq!(
            workflow.steps[0].error_behavior,
            Some(ErrorBehavior::Suspend)
        );
    }

    #[test]
    fn if_do_inserts_container_with_children() {
        let workflow = builder()
            .start_with::<StepA>()
            .if_do::<StepB>(|inner| {
                let child_id = inner.add_step(type_name::<StepC>());
                inner.last_step = Some(child_id);
            })
            .end_workflow()
            .build("test", 1);

        // Expected layout: 0 = StepA, 1 = IfStep container, 2 = StepC (child).
        assert!(workflow.steps.len() >= 3);
        let container = &workflow.steps[1];
        assert!(container.step_type.contains("IfStep"));
        // The container must list StepC (id 2) among its children.
        assert!(container.children.contains(&2));
    }

    #[test]
    fn while_do_inserts_container() {
        let workflow = builder()
            .start_with::<StepA>()
            .while_do::<StepB>(|inner| {
                inner.add_step(type_name::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        assert!(workflow.steps.len() >= 3);
        assert!(workflow.steps[1].step_type.contains("WhileStep"));
    }

    #[test]
    fn for_each_inserts_container() {
        let workflow = builder()
            .start_with::<StepA>()
            .for_each::<StepB>(|inner| {
                inner.add_step(type_name::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        assert!(workflow.steps.len() >= 3);
        assert!(workflow.steps[1].step_type.contains("ForEachStep"));
    }

    #[test]
    fn parallel_creates_branches() {
        let workflow = builder()
            .start_with::<StepA>()
            .parallel(|branches| {
                branches
                    .branch(|inner| {
                        inner.add_step(type_name::<StepB>());
                    })
                    .branch(|inner| {
                        inner.add_step(type_name::<StepC>());
                    })
            })
            .end_workflow()
            .build("test", 1);

        // Expected layout: 0 = StepA, 1 = Sequence (parallel container),
        // followed by the branch steps StepB and StepC.
        assert!(workflow.steps.len() >= 4);
        let container = &workflow.steps[1];
        assert!(container.step_type.contains("SequenceStep"));
        assert!(container.children.len() >= 2);
    }

    #[test]
    fn saga_with_compensation() {
        let workflow = builder()
            .start_with::<StepA>()
            .saga(|inner| {
                inner.add_step(type_name::<StepB>());
                inner.add_step(type_name::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        // The saga container sits right after StepA and owns children.
        let container = &workflow.steps[1];
        assert!(container.step_type.contains("SagaContainerStep"));
        assert!(container.saga);
        assert!(!container.children.is_empty());
    }

    #[test]
    fn compensate_with_sets_compensation_step() {
        let workflow = builder()
            .start_with::<StepA>()
            .compensate_with::<StepB>()
            .end_workflow()
            .build("test", 1);

        // StepA (id 0) must point at StepB (id 1) as its compensation step.
        assert_eq!(workflow.steps[0].compensation_step_id, Some(1));
        assert!(workflow.steps[1].step_type.contains("StepB"));
    }

    #[test]
    fn config_sets_step_config() {
        let expected = serde_json::json!({"namespace": "ory", "timeout": 30});
        let workflow = builder()
            .start_with::<StepA>()
            .config(expected.clone())
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.steps[0].step_config, Some(expected));
    }

    #[test]
    fn config_chains_with_name() {
        let expected = serde_json::json!({"namespace": "data"});
        let workflow = builder()
            .start_with::<StepA>()
            .name("apply-data")
            .config(expected.clone())
            .then::<StepB>()
            .end_workflow()
            .build("test", 1);

        let first = &workflow.steps[0];
        assert_eq!(first.name, Some(String::from("apply-data")));
        assert_eq!(first.step_config, Some(expected));
        assert_eq!(first.outcomes[0].next_step, 1);
    }

    #[test]
    fn config_on_multiple_steps_of_same_type() {
        let ory_cfg = serde_json::json!({"namespace": "ory"});
        let data_cfg = serde_json::json!({"namespace": "data"});
        let workflow = builder()
            .start_with::<StepA>()
            .name("apply-ory")
            .config(ory_cfg.clone())
            .then::<StepA>()
            .name("apply-data")
            .config(data_cfg.clone())
            .end_workflow()
            .build("test", 1);

        let steps = &workflow.steps;
        assert_eq!(steps[0].step_config, Some(ory_cfg));
        assert_eq!(steps[1].step_config, Some(data_cfg));
        // Both steps were created from StepA, so their type names agree.
        assert_eq!(steps[0].step_type, steps[1].step_type);
    }

    #[test]
    fn add_step_typed_sets_name_and_config() {
        let expected = serde_json::json!({"namespace": "ory"});
        let mut wf = builder();
        let id = wf.add_step_typed::<StepA>("apply-ory", Some(expected.clone()));

        let step = &wf.steps[id];
        assert_eq!(step.name, Some(String::from("apply-ory")));
        assert_eq!(step.step_config, Some(expected));
        assert!(step.step_type.contains("StepA"));
    }

    #[test]
    fn add_step_typed_without_config() {
        let mut wf = builder();
        let id = wf.add_step_typed::<StepB>("my-step", None);

        let step = &wf.steps[id];
        assert_eq!(step.name, Some(String::from("my-step")));
        assert_eq!(step.step_config, None);
    }

    #[test]
    fn wire_outcome_connects_steps() {
        let mut wf = builder();
        let source = wf.add_step_typed::<StepA>("first", None);
        let target = wf.add_step_typed::<StepB>("second", None);
        wf.wire_outcome(source, target, None);

        let outcomes = &wf.steps[source].outcomes;
        assert_eq!(outcomes.len(), 1);
        assert_eq!(outcomes[0].next_step, target);
    }

    #[test]
    fn inline_step_via_then_fn() {
        let workflow = builder()
            .start_with::<StepA>()
            .then_fn(ExecutionResult::next)
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.steps.len(), 2);
        assert!(workflow.steps[1].step_type.contains("InlineStep"));
        assert_eq!(workflow.steps[0].outcomes[0].next_step, 1);
    }

    #[test]
    fn default_error_behavior_preserved_in_build() {
        let workflow = builder()
            .default_error_behavior(ErrorBehavior::Terminate)
            .start_with::<StepA>()
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.default_error_behavior, ErrorBehavior::Terminate);
    }

    #[test]
    fn default_error_behavior_omitted_uses_default() {
        let workflow = builder()
            .start_with::<StepA>()
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.default_error_behavior, ErrorBehavior::default());
    }
}