Skip to main content

wfe_core/builder/
step_builder.rs

1use crate::models::{ErrorBehavior, ExecutionResult};
2use crate::primitives;
3use crate::traits::step::{StepBody, WorkflowData};
4
5use super::inline_step::InlineStep;
6use super::workflow_builder::WorkflowBuilder;
7
8/// Builder for configuring a single step in the workflow.
9///
10/// Owns the WorkflowBuilder, consuming self on each method call.
11/// This avoids all lifetime/borrow issues.
12pub struct StepBuilder<D: WorkflowData> {
13    builder: WorkflowBuilder<D>,
14    step_id: usize,
15}
16
17/// Builder for parallel branches.
18pub struct ParallelBuilder<D: WorkflowData> {
19    builder: WorkflowBuilder<D>,
20    container_id: usize,
21}
22
23impl<D: WorkflowData> StepBuilder<D> {
24    pub(crate) fn new(builder: WorkflowBuilder<D>, step_id: usize) -> Self {
25        Self { builder, step_id }
26    }
27
28    /// Set the human-readable display name of the current step.
29    ///
30    /// This name appears in logs, the execution trace, and the web UI.
31    pub fn name(mut self, name: &str) -> Self {
32        self.builder.steps[self.step_id].name = Some(name.to_string());
33        self
34    }
35
36    /// Set an external ID for forward references.
37    pub fn id(mut self, external_id: &str) -> Self {
38        self.builder.steps[self.step_id].external_id = Some(external_id.to_string());
39        self
40    }
41
42    /// Set the error handling behavior for this step.
43    ///
44    /// When a step returns `Err` from its `run` method,
45    /// the executor checks this behavior to decide what to do next.
46    ///
47    /// # Example
48    /// ```ignore
49    /// .then::< risky::Step>()
50    ///     .name("Risky")
51    ///     .on_error(ErrorBehavior::Retry {
52    ///         interval: Duration::from_secs(5),
53    ///         max_retries: 3,
54    ///     })
55    /// ```
56    pub fn on_error(mut self, behavior: ErrorBehavior) -> Self {
57        self.builder.steps[self.step_id].error_behavior = Some(behavior);
58        self
59    }
60
61    /// Attach arbitrary JSON configuration to this step.
62    ///
63    /// The step can read it at runtime via `context.step.step_config`.
64    pub fn config(mut self, config: serde_json::Value) -> Self {
65        self.builder.steps[self.step_id].step_config = Some(config);
66        self
67    }
68
69    /// Register a compensation step for saga rollback.
70    ///
71    /// When this step fails inside a [`saga`](Self::saga) container, the executor
72    /// runs the compensation steps in reverse order to undo partial work.
73    ///
74    /// # Example
75    /// ```ignore
76    /// .then::<ChargeCard>()
77    ///     .name("Charge")
78    ///     .compensate_with::<RefundCard>()
79    /// ```
80    pub fn compensate_with<C: StepBody + Default + 'static>(mut self) -> Self {
81        let comp_id = self.builder.add_step(std::any::type_name::<C>());
82        self.builder.steps[self.step_id].compensation_step_id = Some(comp_id);
83        self
84    }
85
86    /// Chain the next step sequentially.
87    ///
88    /// Wires an outcome from the current step to the new one so that when the
89    /// current step returns [`ExecutionResult::next`](crate::models::ExecutionResult::next),
90    /// execution continues with `S`.
91    pub fn then<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
92        let next_id = self.builder.add_step(std::any::type_name::<S>());
93        self.builder.wire_outcome(self.step_id, next_id, None);
94        self.builder.last_step = Some(next_id);
95        StepBuilder::new(self.builder, next_id)
96    }
97
98    /// Chain an inline function step.
99    pub fn then_fn(
100        mut self,
101        f: impl Fn() -> ExecutionResult + Send + Sync + 'static,
102    ) -> StepBuilder<D> {
103        let next_id = self.builder.add_step(std::any::type_name::<InlineStep>());
104        self.builder.wire_outcome(self.step_id, next_id, None);
105        self.builder.last_step = Some(next_id);
106        self.builder.inline_closures.insert(next_id, Box::new(f));
107        StepBuilder::new(self.builder, next_id)
108    }
109
110    /// Suspend the workflow until an external event arrives.
111    ///
112    /// The workflow pauses and the executor releases the lock. When you call
113    /// `WorkflowHost::publish_event` (from the `wfe` crate) with
114    /// a matching `event_name` and `event_key`, the workflow resumes from this point.
115    ///
116    /// # Example
117    /// ```ignore
118    /// .then::<RequestApproval>()
119    ///     .name("Request Approval")
120    /// .wait_for("approval", "order-123")
121    ///     .name("Wait for approval")
122    /// ```
123    pub fn wait_for(mut self, event_name: &str, event_key: &str) -> StepBuilder<D> {
124        let next_id = self
125            .builder
126            .add_step(std::any::type_name::<primitives::wait_for::WaitForStep>());
127        self.builder.wire_outcome(self.step_id, next_id, None);
128        self.builder.last_step = Some(next_id);
129        self.builder.steps[next_id].step_config = Some(serde_json::json!({
130            "event_name": event_name,
131            "event_key": event_key,
132        }));
133        StepBuilder::new(self.builder, next_id)
134    }
135
136    /// Pause execution for a fixed duration.
137    ///
138    /// The executor persists the workflow, sleeps for the given duration, then
139    /// re-queues the instance for continued execution.
140    pub fn delay(mut self, duration: std::time::Duration) -> StepBuilder<D> {
141        let next_id = self
142            .builder
143            .add_step(std::any::type_name::<primitives::delay::DelayStep>());
144        self.builder.wire_outcome(self.step_id, next_id, None);
145        self.builder.last_step = Some(next_id);
146        self.builder.steps[next_id].step_config = Some(serde_json::json!({
147            "duration_millis": duration.as_millis() as u64,
148        }));
149        StepBuilder::new(self.builder, next_id)
150    }
151
152    /// Conditional branching.
153    ///
154    /// The closure builds the child steps that run when the condition evaluates
155    /// to `true`. Use `.then::<ConditionStep>().if_do(|b| { ... })` where
156    /// `ConditionStep` returns [`ExecutionResult::branch("true")`](crate::models::ExecutionResult::branch)
157    /// or `branch("false")` to control which path is taken.
158    pub fn if_do<S: StepBody + Default + 'static>(
159        mut self,
160        build_children: impl FnOnce(&mut WorkflowBuilder<D>),
161    ) -> StepBuilder<D> {
162        let if_id = self
163            .builder
164            .add_step(std::any::type_name::<primitives::if_step::IfStep>());
165        self.builder.wire_outcome(self.step_id, if_id, None);
166
167        // Build children
168        let before_count = self.builder.steps.len();
169        build_children(&mut self.builder);
170        let after_count = self.builder.steps.len();
171
172        // Register children with the If step
173        for child_id in before_count..after_count {
174            self.builder.add_child(if_id, child_id);
175        }
176
177        self.builder.last_step = Some(if_id);
178        StepBuilder::new(self.builder, if_id)
179    }
180
181    /// Loop while a condition holds.
182    ///
183    /// The closure builds the body of the loop. A condition step (type `S`)
184    /// should return [`ExecutionResult::next()`](crate::models::ExecutionResult::next)
185    /// to continue looping or `ExecutionResult::next()` with a condition that evaluates to `false`
186    /// to break out.
187    pub fn while_do<S: StepBody + Default + 'static>(
188        mut self,
189        build_children: impl FnOnce(&mut WorkflowBuilder<D>),
190    ) -> StepBuilder<D> {
191        let while_id = self
192            .builder
193            .add_step(std::any::type_name::<primitives::while_step::WhileStep>());
194        self.builder.wire_outcome(self.step_id, while_id, None);
195
196        let before_count = self.builder.steps.len();
197        build_children(&mut self.builder);
198        let after_count = self.builder.steps.len();
199
200        for child_id in before_count..after_count {
201            self.builder.add_child(while_id, child_id);
202        }
203
204        self.builder.last_step = Some(while_id);
205        StepBuilder::new(self.builder, while_id)
206    }
207
208    /// Iterate over a collection.
209    ///
210    /// The step type `S` receives each item via
211    /// [`StepExecutionContext::item`](crate::traits::step::StepExecutionContext::item).
212    /// The collection is taken from the workflow data field configured in the
213    /// step's `step_config`.
214    pub fn for_each<S: StepBody + Default + 'static>(
215        mut self,
216        build_children: impl FnOnce(&mut WorkflowBuilder<D>),
217    ) -> StepBuilder<D> {
218        let fe_id = self
219            .builder
220            .add_step(std::any::type_name::<primitives::foreach_step::ForEachStep>());
221        self.builder.wire_outcome(self.step_id, fe_id, None);
222
223        let before_count = self.builder.steps.len();
224        build_children(&mut self.builder);
225        let after_count = self.builder.steps.len();
226
227        for child_id in before_count..after_count {
228            self.builder.add_child(fe_id, child_id);
229        }
230
231        self.builder.last_step = Some(fe_id);
232        StepBuilder::new(self.builder, fe_id)
233    }
234
235    /// Transaction-like container with compensation on failure.
236    ///
237    /// Child steps run normally. If any child fails, the executor runs
238    /// compensation steps (registered via [`compensate_with`](Self::compensate_with))
239    /// in reverse order to undo partial work.
240    ///
241    /// # Example
242    /// ```ignore
243    /// .saga(|b| {
244    ///     b.add_step_typed::<ReserveInventory>("reserve", None);
245    ///     b.add_step_typed::<ChargeCard>("charge", None);
246    ///     b.add_step_typed::<ShipOrder>("ship", None);
247    /// })
248    /// ```
249    pub fn saga(mut self, build_children: impl FnOnce(&mut WorkflowBuilder<D>)) -> StepBuilder<D> {
250        let saga_id = self.builder.add_step(std::any::type_name::<
251            primitives::saga_container::SagaContainerStep,
252        >());
253        self.builder.steps[saga_id].saga = true;
254        self.builder.wire_outcome(self.step_id, saga_id, None);
255
256        let before_count = self.builder.steps.len();
257        build_children(&mut self.builder);
258        let after_count = self.builder.steps.len();
259
260        for child_id in before_count..after_count {
261            self.builder.add_child(saga_id, child_id);
262        }
263
264        self.builder.last_step = Some(saga_id);
265        StepBuilder::new(self.builder, saga_id)
266    }
267
268    /// Run multiple branches concurrently.
269    ///
270    /// All branches inside the [`ParallelBuilder`] execute in parallel. The
271    /// workflow continues to the next step only after **all** branches complete.
272    pub fn parallel(
273        mut self,
274        build_branches: impl FnOnce(ParallelBuilder<D>) -> ParallelBuilder<D>,
275    ) -> StepBuilder<D> {
276        let seq_id = self
277            .builder
278            .add_step(std::any::type_name::<primitives::sequence::SequenceStep>());
279        self.builder.wire_outcome(self.step_id, seq_id, None);
280
281        let pb = ParallelBuilder {
282            builder: self.builder,
283            container_id: seq_id,
284        };
285        let pb = build_branches(pb);
286        let mut builder = pb.builder;
287        builder.last_step = Some(seq_id);
288        StepBuilder::new(builder, seq_id)
289    }
290
291    /// Finish building the current branch and return the [`WorkflowBuilder`].
292    ///
293    /// Call this when you are done chaining steps. You can then call
294    /// [`WorkflowBuilder::build`] to compile the definition.
295    ///
296    /// # Example
297    /// ```ignore
298    /// let def = WorkflowBuilder::<MyData>::new()
299    ///     .start_with::<A>()
300    ///     .then::<B>()
301    ///     .end_workflow()
302    ///     .build("my-workflow", 1);
303    /// ```
304    pub fn end_workflow(self) -> WorkflowBuilder<D> {
305        self.builder
306    }
307
308    /// Shortcut for `end_workflow().build(id, version)`.
309    pub fn build(self, id: impl Into<String>, version: u32) -> crate::models::WorkflowDefinition {
310        self.builder.build(id, version)
311    }
312}
313
314impl<D: WorkflowData> ParallelBuilder<D> {
315    /// Add a parallel branch.
316    pub fn branch(mut self, build_branch: impl FnOnce(&mut WorkflowBuilder<D>)) -> Self {
317        let before_count = self.builder.steps.len();
318        build_branch(&mut self.builder);
319        let after_count = self.builder.steps.len();
320
321        for child_id in before_count..after_count {
322            self.builder.add_child(self.container_id, child_id);
323        }
324        self
325    }
326}