wfe_core/builder/
step_builder.rs1use crate::models::{ErrorBehavior, ExecutionResult};
2use crate::primitives;
3use crate::traits::step::{StepBody, WorkflowData};
4
5use super::inline_step::InlineStep;
6use super::workflow_builder::WorkflowBuilder;
7
8pub struct StepBuilder<D: WorkflowData> {
13 builder: WorkflowBuilder<D>,
14 step_id: usize,
15}
16
17pub struct ParallelBuilder<D: WorkflowData> {
19 builder: WorkflowBuilder<D>,
20 container_id: usize,
21}
22
23impl<D: WorkflowData> StepBuilder<D> {
24 pub(crate) fn new(builder: WorkflowBuilder<D>, step_id: usize) -> Self {
25 Self { builder, step_id }
26 }
27
28 pub fn name(mut self, name: &str) -> Self {
32 self.builder.steps[self.step_id].name = Some(name.to_string());
33 self
34 }
35
36 pub fn id(mut self, external_id: &str) -> Self {
38 self.builder.steps[self.step_id].external_id = Some(external_id.to_string());
39 self
40 }
41
42 pub fn on_error(mut self, behavior: ErrorBehavior) -> Self {
57 self.builder.steps[self.step_id].error_behavior = Some(behavior);
58 self
59 }
60
61 pub fn config(mut self, config: serde_json::Value) -> Self {
65 self.builder.steps[self.step_id].step_config = Some(config);
66 self
67 }
68
69 pub fn compensate_with<C: StepBody + Default + 'static>(mut self) -> Self {
81 let comp_id = self.builder.add_step(std::any::type_name::<C>());
82 self.builder.steps[self.step_id].compensation_step_id = Some(comp_id);
83 self
84 }
85
86 pub fn then<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
92 let next_id = self.builder.add_step(std::any::type_name::<S>());
93 self.builder.wire_outcome(self.step_id, next_id, None);
94 self.builder.last_step = Some(next_id);
95 StepBuilder::new(self.builder, next_id)
96 }
97
98 pub fn then_fn(
100 mut self,
101 f: impl Fn() -> ExecutionResult + Send + Sync + 'static,
102 ) -> StepBuilder<D> {
103 let next_id = self.builder.add_step(std::any::type_name::<InlineStep>());
104 self.builder.wire_outcome(self.step_id, next_id, None);
105 self.builder.last_step = Some(next_id);
106 self.builder.inline_closures.insert(next_id, Box::new(f));
107 StepBuilder::new(self.builder, next_id)
108 }
109
110 pub fn wait_for(mut self, event_name: &str, event_key: &str) -> StepBuilder<D> {
124 let next_id = self
125 .builder
126 .add_step(std::any::type_name::<primitives::wait_for::WaitForStep>());
127 self.builder.wire_outcome(self.step_id, next_id, None);
128 self.builder.last_step = Some(next_id);
129 self.builder.steps[next_id].step_config = Some(serde_json::json!({
130 "event_name": event_name,
131 "event_key": event_key,
132 }));
133 StepBuilder::new(self.builder, next_id)
134 }
135
136 pub fn delay(mut self, duration: std::time::Duration) -> StepBuilder<D> {
141 let next_id = self
142 .builder
143 .add_step(std::any::type_name::<primitives::delay::DelayStep>());
144 self.builder.wire_outcome(self.step_id, next_id, None);
145 self.builder.last_step = Some(next_id);
146 self.builder.steps[next_id].step_config = Some(serde_json::json!({
147 "duration_millis": duration.as_millis() as u64,
148 }));
149 StepBuilder::new(self.builder, next_id)
150 }
151
152 pub fn if_do<S: StepBody + Default + 'static>(
159 mut self,
160 build_children: impl FnOnce(&mut WorkflowBuilder<D>),
161 ) -> StepBuilder<D> {
162 let if_id = self
163 .builder
164 .add_step(std::any::type_name::<primitives::if_step::IfStep>());
165 self.builder.wire_outcome(self.step_id, if_id, None);
166
167 let before_count = self.builder.steps.len();
169 build_children(&mut self.builder);
170 let after_count = self.builder.steps.len();
171
172 for child_id in before_count..after_count {
174 self.builder.add_child(if_id, child_id);
175 }
176
177 self.builder.last_step = Some(if_id);
178 StepBuilder::new(self.builder, if_id)
179 }
180
181 pub fn while_do<S: StepBody + Default + 'static>(
188 mut self,
189 build_children: impl FnOnce(&mut WorkflowBuilder<D>),
190 ) -> StepBuilder<D> {
191 let while_id = self
192 .builder
193 .add_step(std::any::type_name::<primitives::while_step::WhileStep>());
194 self.builder.wire_outcome(self.step_id, while_id, None);
195
196 let before_count = self.builder.steps.len();
197 build_children(&mut self.builder);
198 let after_count = self.builder.steps.len();
199
200 for child_id in before_count..after_count {
201 self.builder.add_child(while_id, child_id);
202 }
203
204 self.builder.last_step = Some(while_id);
205 StepBuilder::new(self.builder, while_id)
206 }
207
208 pub fn for_each<S: StepBody + Default + 'static>(
215 mut self,
216 build_children: impl FnOnce(&mut WorkflowBuilder<D>),
217 ) -> StepBuilder<D> {
218 let fe_id = self
219 .builder
220 .add_step(std::any::type_name::<primitives::foreach_step::ForEachStep>());
221 self.builder.wire_outcome(self.step_id, fe_id, None);
222
223 let before_count = self.builder.steps.len();
224 build_children(&mut self.builder);
225 let after_count = self.builder.steps.len();
226
227 for child_id in before_count..after_count {
228 self.builder.add_child(fe_id, child_id);
229 }
230
231 self.builder.last_step = Some(fe_id);
232 StepBuilder::new(self.builder, fe_id)
233 }
234
235 pub fn saga(mut self, build_children: impl FnOnce(&mut WorkflowBuilder<D>)) -> StepBuilder<D> {
250 let saga_id = self.builder.add_step(std::any::type_name::<
251 primitives::saga_container::SagaContainerStep,
252 >());
253 self.builder.steps[saga_id].saga = true;
254 self.builder.wire_outcome(self.step_id, saga_id, None);
255
256 let before_count = self.builder.steps.len();
257 build_children(&mut self.builder);
258 let after_count = self.builder.steps.len();
259
260 for child_id in before_count..after_count {
261 self.builder.add_child(saga_id, child_id);
262 }
263
264 self.builder.last_step = Some(saga_id);
265 StepBuilder::new(self.builder, saga_id)
266 }
267
268 pub fn parallel(
273 mut self,
274 build_branches: impl FnOnce(ParallelBuilder<D>) -> ParallelBuilder<D>,
275 ) -> StepBuilder<D> {
276 let seq_id = self
277 .builder
278 .add_step(std::any::type_name::<primitives::sequence::SequenceStep>());
279 self.builder.wire_outcome(self.step_id, seq_id, None);
280
281 let pb = ParallelBuilder {
282 builder: self.builder,
283 container_id: seq_id,
284 };
285 let pb = build_branches(pb);
286 let mut builder = pb.builder;
287 builder.last_step = Some(seq_id);
288 StepBuilder::new(builder, seq_id)
289 }
290
291 pub fn end_workflow(self) -> WorkflowBuilder<D> {
305 self.builder
306 }
307
308 pub fn build(self, id: impl Into<String>, version: u32) -> crate::models::WorkflowDefinition {
310 self.builder.build(id, version)
311 }
312}
313
314impl<D: WorkflowData> ParallelBuilder<D> {
315 pub fn branch(mut self, build_branch: impl FnOnce(&mut WorkflowBuilder<D>)) -> Self {
317 let before_count = self.builder.steps.len();
318 build_branch(&mut self.builder);
319 let after_count = self.builder.steps.len();
320
321 for child_id in before_count..after_count {
322 self.builder.add_child(self.container_id, child_id);
323 }
324 self
325 }
326}