Skip to main content

wfe_core/builder/
step_builder.rs

1use crate::models::{ErrorBehavior, ExecutionResult};
2use crate::primitives;
3use crate::traits::step::{StepBody, WorkflowData};
4
5use super::inline_step::InlineStep;
6use super::workflow_builder::WorkflowBuilder;
7
8/// Builder for configuring a single step in the workflow.
9///
10/// Owns the WorkflowBuilder, consuming self on each method call.
11/// This avoids all lifetime/borrow issues.
12pub struct StepBuilder<D: WorkflowData> {
13    builder: WorkflowBuilder<D>,
14    step_id: usize,
15}
16
17/// Builder for parallel branches.
18pub struct ParallelBuilder<D: WorkflowData> {
19    builder: WorkflowBuilder<D>,
20    container_id: usize,
21}
22
23impl<D: WorkflowData> StepBuilder<D> {
24    pub(crate) fn new(builder: WorkflowBuilder<D>, step_id: usize) -> Self {
25        Self { builder, step_id }
26    }
27
28    /// Set the display name of the current step.
29    pub fn name(mut self, name: &str) -> Self {
30        self.builder.steps[self.step_id].name = Some(name.to_string());
31        self
32    }
33
34    /// Set an external ID for forward references.
35    pub fn id(mut self, external_id: &str) -> Self {
36        self.builder.steps[self.step_id].external_id = Some(external_id.to_string());
37        self
38    }
39
40    /// Set the error handling behavior for this step.
41    pub fn on_error(mut self, behavior: ErrorBehavior) -> Self {
42        self.builder.steps[self.step_id].error_behavior = Some(behavior);
43        self
44    }
45
46    /// Attach arbitrary JSON configuration to this step.
47    ///
48    /// The step can read it at runtime via `context.step.step_config`.
49    pub fn config(mut self, config: serde_json::Value) -> Self {
50        self.builder.steps[self.step_id].step_config = Some(config);
51        self
52    }
53
54    /// Add a compensation step for saga rollback.
55    pub fn compensate_with<C: StepBody + Default + 'static>(mut self) -> Self {
56        let comp_id = self.builder.add_step(std::any::type_name::<C>());
57        self.builder.steps[self.step_id].compensation_step_id = Some(comp_id);
58        self
59    }
60
61    /// Chain the next step. Wires an outcome from the current step to the new one.
62    pub fn then<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
63        let next_id = self.builder.add_step(std::any::type_name::<S>());
64        self.builder.wire_outcome(self.step_id, next_id, None);
65        self.builder.last_step = Some(next_id);
66        StepBuilder::new(self.builder, next_id)
67    }
68
69    /// Chain an inline function step.
70    pub fn then_fn(
71        mut self,
72        f: impl Fn() -> ExecutionResult + Send + Sync + 'static,
73    ) -> StepBuilder<D> {
74        let next_id = self.builder.add_step(std::any::type_name::<InlineStep>());
75        self.builder.wire_outcome(self.step_id, next_id, None);
76        self.builder.last_step = Some(next_id);
77        self.builder.inline_closures.insert(next_id, Box::new(f));
78        StepBuilder::new(self.builder, next_id)
79    }
80
81    /// Insert a WaitFor step.
82    pub fn wait_for(mut self, event_name: &str, event_key: &str) -> StepBuilder<D> {
83        let next_id = self
84            .builder
85            .add_step(std::any::type_name::<primitives::wait_for::WaitForStep>());
86        self.builder.wire_outcome(self.step_id, next_id, None);
87        self.builder.last_step = Some(next_id);
88        self.builder.steps[next_id].step_config = Some(serde_json::json!({
89            "event_name": event_name,
90            "event_key": event_key,
91        }));
92        StepBuilder::new(self.builder, next_id)
93    }
94
95    /// Insert a Delay step.
96    pub fn delay(mut self, duration: std::time::Duration) -> StepBuilder<D> {
97        let next_id = self
98            .builder
99            .add_step(std::any::type_name::<primitives::delay::DelayStep>());
100        self.builder.wire_outcome(self.step_id, next_id, None);
101        self.builder.last_step = Some(next_id);
102        self.builder.steps[next_id].step_config = Some(serde_json::json!({
103            "duration_millis": duration.as_millis() as u64,
104        }));
105        StepBuilder::new(self.builder, next_id)
106    }
107
108    /// Insert an If container step with child steps built by the closure.
109    /// The type parameter S is the step type used for the If condition evaluation.
110    pub fn if_do<S: StepBody + Default + 'static>(
111        mut self,
112        build_children: impl FnOnce(&mut WorkflowBuilder<D>),
113    ) -> StepBuilder<D> {
114        let if_id = self
115            .builder
116            .add_step(std::any::type_name::<primitives::if_step::IfStep>());
117        self.builder.wire_outcome(self.step_id, if_id, None);
118
119        // Build children
120        let before_count = self.builder.steps.len();
121        build_children(&mut self.builder);
122        let after_count = self.builder.steps.len();
123
124        // Register children with the If step
125        for child_id in before_count..after_count {
126            self.builder.add_child(if_id, child_id);
127        }
128
129        self.builder.last_step = Some(if_id);
130        StepBuilder::new(self.builder, if_id)
131    }
132
133    /// Insert a While container step.
134    pub fn while_do<S: StepBody + Default + 'static>(
135        mut self,
136        build_children: impl FnOnce(&mut WorkflowBuilder<D>),
137    ) -> StepBuilder<D> {
138        let while_id = self
139            .builder
140            .add_step(std::any::type_name::<primitives::while_step::WhileStep>());
141        self.builder.wire_outcome(self.step_id, while_id, None);
142
143        let before_count = self.builder.steps.len();
144        build_children(&mut self.builder);
145        let after_count = self.builder.steps.len();
146
147        for child_id in before_count..after_count {
148            self.builder.add_child(while_id, child_id);
149        }
150
151        self.builder.last_step = Some(while_id);
152        StepBuilder::new(self.builder, while_id)
153    }
154
155    /// Insert a ForEach container step.
156    pub fn for_each<S: StepBody + Default + 'static>(
157        mut self,
158        build_children: impl FnOnce(&mut WorkflowBuilder<D>),
159    ) -> StepBuilder<D> {
160        let fe_id = self
161            .builder
162            .add_step(std::any::type_name::<primitives::foreach_step::ForEachStep>());
163        self.builder.wire_outcome(self.step_id, fe_id, None);
164
165        let before_count = self.builder.steps.len();
166        build_children(&mut self.builder);
167        let after_count = self.builder.steps.len();
168
169        for child_id in before_count..after_count {
170            self.builder.add_child(fe_id, child_id);
171        }
172
173        self.builder.last_step = Some(fe_id);
174        StepBuilder::new(self.builder, fe_id)
175    }
176
177    /// Insert a Saga container step with child steps.
178    pub fn saga(mut self, build_children: impl FnOnce(&mut WorkflowBuilder<D>)) -> StepBuilder<D> {
179        let saga_id = self.builder.add_step(std::any::type_name::<
180            primitives::saga_container::SagaContainerStep,
181        >());
182        self.builder.steps[saga_id].saga = true;
183        self.builder.wire_outcome(self.step_id, saga_id, None);
184
185        let before_count = self.builder.steps.len();
186        build_children(&mut self.builder);
187        let after_count = self.builder.steps.len();
188
189        for child_id in before_count..after_count {
190            self.builder.add_child(saga_id, child_id);
191        }
192
193        self.builder.last_step = Some(saga_id);
194        StepBuilder::new(self.builder, saga_id)
195    }
196
197    /// Start a parallel block.
198    pub fn parallel(
199        mut self,
200        build_branches: impl FnOnce(ParallelBuilder<D>) -> ParallelBuilder<D>,
201    ) -> StepBuilder<D> {
202        let seq_id = self
203            .builder
204            .add_step(std::any::type_name::<primitives::sequence::SequenceStep>());
205        self.builder.wire_outcome(self.step_id, seq_id, None);
206
207        let pb = ParallelBuilder {
208            builder: self.builder,
209            container_id: seq_id,
210        };
211        let pb = build_branches(pb);
212        let mut builder = pb.builder;
213        builder.last_step = Some(seq_id);
214        StepBuilder::new(builder, seq_id)
215    }
216
217    /// Mark this step as the terminal step (no further outcomes).
218    pub fn end_workflow(self) -> WorkflowBuilder<D> {
219        self.builder
220    }
221
222    /// Access the compiled definition directly (shortcut for end_workflow().build()).
223    pub fn build(self, id: impl Into<String>, version: u32) -> crate::models::WorkflowDefinition {
224        self.builder.build(id, version)
225    }
226}
227
228impl<D: WorkflowData> ParallelBuilder<D> {
229    /// Add a parallel branch.
230    pub fn branch(mut self, build_branch: impl FnOnce(&mut WorkflowBuilder<D>)) -> Self {
231        let before_count = self.builder.steps.len();
232        build_branch(&mut self.builder);
233        let after_count = self.builder.steps.len();
234
235        for child_id in before_count..after_count {
236            self.builder.add_child(self.container_id, child_id);
237        }
238        self
239    }
240}