1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
use crate::models::{ErrorBehavior, ExecutionResult};
use crate::primitives;
use crate::traits::step::{StepBody, WorkflowData};
use super::inline_step::InlineStep;
use super::workflow_builder::WorkflowBuilder;
/// Builder for configuring a single step in the workflow.
///
/// Owns the WorkflowBuilder, consuming self on each method call.
/// This avoids all lifetime/borrow issues.
pub struct StepBuilder<D: WorkflowData> {
builder: WorkflowBuilder<D>,
step_id: usize,
}
/// Builder for parallel branches.
pub struct ParallelBuilder<D: WorkflowData> {
builder: WorkflowBuilder<D>,
container_id: usize,
}
impl<D: WorkflowData> StepBuilder<D> {
pub(crate) fn new(builder: WorkflowBuilder<D>, step_id: usize) -> Self {
Self { builder, step_id }
}
/// Set the human-readable display name of the current step.
///
/// This name appears in logs, the execution trace, and the web UI.
pub fn name(mut self, name: &str) -> Self {
self.builder.steps[self.step_id].name = Some(name.to_string());
self
}
/// Set an external ID for forward references.
pub fn id(mut self, external_id: &str) -> Self {
self.builder.steps[self.step_id].external_id = Some(external_id.to_string());
self
}
/// Set the error handling behavior for this step.
///
/// When a step returns `Err` from its `run` method,
/// the executor checks this behavior to decide what to do next.
///
/// # Example
/// ```ignore
/// .then::< risky::Step>()
/// .name("Risky")
/// .on_error(ErrorBehavior::Retry {
/// interval: Duration::from_secs(5),
/// max_retries: 3,
/// })
/// ```
pub fn on_error(mut self, behavior: ErrorBehavior) -> Self {
self.builder.steps[self.step_id].error_behavior = Some(behavior);
self
}
/// Attach arbitrary JSON configuration to this step.
///
/// The step can read it at runtime via `context.step.step_config`.
pub fn config(mut self, config: serde_json::Value) -> Self {
self.builder.steps[self.step_id].step_config = Some(config);
self
}
/// Register a compensation step for saga rollback.
///
/// When this step fails inside a [`saga`](Self::saga) container, the executor
/// runs the compensation steps in reverse order to undo partial work.
///
/// # Example
/// ```ignore
/// .then::<ChargeCard>()
/// .name("Charge")
/// .compensate_with::<RefundCard>()
/// ```
pub fn compensate_with<C: StepBody + Default + 'static>(mut self) -> Self {
let comp_id = self.builder.add_step(std::any::type_name::<C>());
self.builder.steps[self.step_id].compensation_step_id = Some(comp_id);
self
}
/// Chain the next step sequentially.
///
/// Wires an outcome from the current step to the new one so that when the
/// current step returns [`ExecutionResult::next`](crate::models::ExecutionResult::next),
/// execution continues with `S`.
pub fn then<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
let next_id = self.builder.add_step(std::any::type_name::<S>());
self.builder.wire_outcome(self.step_id, next_id, None);
self.builder.last_step = Some(next_id);
StepBuilder::new(self.builder, next_id)
}
/// Chain an inline function step.
pub fn then_fn(
mut self,
f: impl Fn() -> ExecutionResult + Send + Sync + 'static,
) -> StepBuilder<D> {
let next_id = self.builder.add_step(std::any::type_name::<InlineStep>());
self.builder.wire_outcome(self.step_id, next_id, None);
self.builder.last_step = Some(next_id);
self.builder.inline_closures.insert(next_id, Box::new(f));
StepBuilder::new(self.builder, next_id)
}
/// Suspend the workflow until an external event arrives.
///
/// The workflow pauses and the executor releases the lock. When you call
/// `WorkflowHost::publish_event` (from the `wfe` crate) with
/// a matching `event_name` and `event_key`, the workflow resumes from this point.
///
/// # Example
/// ```ignore
/// .then::<RequestApproval>()
/// .name("Request Approval")
/// .wait_for("approval", "order-123")
/// .name("Wait for approval")
/// ```
pub fn wait_for(mut self, event_name: &str, event_key: &str) -> StepBuilder<D> {
let next_id = self
.builder
.add_step(std::any::type_name::<primitives::wait_for::WaitForStep>());
self.builder.wire_outcome(self.step_id, next_id, None);
self.builder.last_step = Some(next_id);
self.builder.steps[next_id].step_config = Some(serde_json::json!({
"event_name": event_name,
"event_key": event_key,
}));
StepBuilder::new(self.builder, next_id)
}
/// Pause execution for a fixed duration.
///
/// The executor persists the workflow, sleeps for the given duration, then
/// re-queues the instance for continued execution.
pub fn delay(mut self, duration: std::time::Duration) -> StepBuilder<D> {
let next_id = self
.builder
.add_step(std::any::type_name::<primitives::delay::DelayStep>());
self.builder.wire_outcome(self.step_id, next_id, None);
self.builder.last_step = Some(next_id);
self.builder.steps[next_id].step_config = Some(serde_json::json!({
"duration_millis": duration.as_millis() as u64,
}));
StepBuilder::new(self.builder, next_id)
}
/// Conditional branching.
///
/// The closure builds the child steps that run when the condition evaluates
/// to `true`. Use `.then::<ConditionStep>().if_do(|b| { ... })` where
/// `ConditionStep` returns [`ExecutionResult::branch("true")`](crate::models::ExecutionResult::branch)
/// or `branch("false")` to control which path is taken.
pub fn if_do<S: StepBody + Default + 'static>(
mut self,
build_children: impl FnOnce(&mut WorkflowBuilder<D>),
) -> StepBuilder<D> {
let if_id = self
.builder
.add_step(std::any::type_name::<primitives::if_step::IfStep>());
self.builder.wire_outcome(self.step_id, if_id, None);
// Build children
let before_count = self.builder.steps.len();
build_children(&mut self.builder);
let after_count = self.builder.steps.len();
// Register children with the If step
for child_id in before_count..after_count {
self.builder.add_child(if_id, child_id);
}
self.builder.last_step = Some(if_id);
StepBuilder::new(self.builder, if_id)
}
/// Loop while a condition holds.
///
/// The closure builds the body of the loop. A condition step (type `S`)
/// should return [`ExecutionResult::next()`](crate::models::ExecutionResult::next)
/// to continue looping or `ExecutionResult::next()` with a condition that evaluates to `false`
/// to break out.
pub fn while_do<S: StepBody + Default + 'static>(
mut self,
build_children: impl FnOnce(&mut WorkflowBuilder<D>),
) -> StepBuilder<D> {
let while_id = self
.builder
.add_step(std::any::type_name::<primitives::while_step::WhileStep>());
self.builder.wire_outcome(self.step_id, while_id, None);
let before_count = self.builder.steps.len();
build_children(&mut self.builder);
let after_count = self.builder.steps.len();
for child_id in before_count..after_count {
self.builder.add_child(while_id, child_id);
}
self.builder.last_step = Some(while_id);
StepBuilder::new(self.builder, while_id)
}
/// Iterate over a collection.
///
/// The step type `S` receives each item via
/// [`StepExecutionContext::item`](crate::traits::step::StepExecutionContext::item).
/// The collection is taken from the workflow data field configured in the
/// step's `step_config`.
pub fn for_each<S: StepBody + Default + 'static>(
mut self,
build_children: impl FnOnce(&mut WorkflowBuilder<D>),
) -> StepBuilder<D> {
let fe_id = self
.builder
.add_step(std::any::type_name::<primitives::foreach_step::ForEachStep>());
self.builder.wire_outcome(self.step_id, fe_id, None);
let before_count = self.builder.steps.len();
build_children(&mut self.builder);
let after_count = self.builder.steps.len();
for child_id in before_count..after_count {
self.builder.add_child(fe_id, child_id);
}
self.builder.last_step = Some(fe_id);
StepBuilder::new(self.builder, fe_id)
}
/// Transaction-like container with compensation on failure.
///
/// Child steps run normally. If any child fails, the executor runs
/// compensation steps (registered via [`compensate_with`](Self::compensate_with))
/// in reverse order to undo partial work.
///
/// # Example
/// ```ignore
/// .saga(|b| {
/// b.add_step_typed::<ReserveInventory>("reserve", None);
/// b.add_step_typed::<ChargeCard>("charge", None);
/// b.add_step_typed::<ShipOrder>("ship", None);
/// })
/// ```
pub fn saga(mut self, build_children: impl FnOnce(&mut WorkflowBuilder<D>)) -> StepBuilder<D> {
let saga_id = self.builder.add_step(std::any::type_name::<
primitives::saga_container::SagaContainerStep,
>());
self.builder.steps[saga_id].saga = true;
self.builder.wire_outcome(self.step_id, saga_id, None);
let before_count = self.builder.steps.len();
build_children(&mut self.builder);
let after_count = self.builder.steps.len();
for child_id in before_count..after_count {
self.builder.add_child(saga_id, child_id);
}
self.builder.last_step = Some(saga_id);
StepBuilder::new(self.builder, saga_id)
}
/// Run multiple branches concurrently.
///
/// All branches inside the [`ParallelBuilder`] execute in parallel. The
/// workflow continues to the next step only after **all** branches complete.
pub fn parallel(
mut self,
build_branches: impl FnOnce(ParallelBuilder<D>) -> ParallelBuilder<D>,
) -> StepBuilder<D> {
let seq_id = self
.builder
.add_step(std::any::type_name::<primitives::sequence::SequenceStep>());
self.builder.wire_outcome(self.step_id, seq_id, None);
let pb = ParallelBuilder {
builder: self.builder,
container_id: seq_id,
};
let pb = build_branches(pb);
let mut builder = pb.builder;
builder.last_step = Some(seq_id);
StepBuilder::new(builder, seq_id)
}
/// Finish building the current branch and return the [`WorkflowBuilder`].
///
/// Call this when you are done chaining steps. You can then call
/// [`WorkflowBuilder::build`] to compile the definition.
///
/// # Example
/// ```ignore
/// let def = WorkflowBuilder::<MyData>::new()
/// .start_with::<A>()
/// .then::<B>()
/// .end_workflow()
/// .build("my-workflow", 1);
/// ```
pub fn end_workflow(self) -> WorkflowBuilder<D> {
self.builder
}
/// Shortcut for `end_workflow().build(id, version)`.
pub fn build(self, id: impl Into<String>, version: u32) -> crate::models::WorkflowDefinition {
self.builder.build(id, version)
}
}
impl<D: WorkflowData> ParallelBuilder<D> {
/// Add a parallel branch.
pub fn branch(mut self, build_branch: impl FnOnce(&mut WorkflowBuilder<D>)) -> Self {
let before_count = self.builder.steps.len();
build_branch(&mut self.builder);
let after_count = self.builder.steps.len();
for child_id in before_count..after_count {
self.builder.add_child(self.container_id, child_id);
}
self
}
}