wfe_core/builder/
step_builder.rs1use crate::models::{ErrorBehavior, ExecutionResult};
2use crate::primitives;
3use crate::traits::step::{StepBody, WorkflowData};
4
5use super::inline_step::InlineStep;
6use super::workflow_builder::WorkflowBuilder;
7
8pub struct StepBuilder<D: WorkflowData> {
13 builder: WorkflowBuilder<D>,
14 step_id: usize,
15}
16
17pub struct ParallelBuilder<D: WorkflowData> {
19 builder: WorkflowBuilder<D>,
20 container_id: usize,
21}
22
23impl<D: WorkflowData> StepBuilder<D> {
24 pub(crate) fn new(builder: WorkflowBuilder<D>, step_id: usize) -> Self {
25 Self { builder, step_id }
26 }
27
28 pub fn name(mut self, name: &str) -> Self {
30 self.builder.steps[self.step_id].name = Some(name.to_string());
31 self
32 }
33
34 pub fn id(mut self, external_id: &str) -> Self {
36 self.builder.steps[self.step_id].external_id = Some(external_id.to_string());
37 self
38 }
39
40 pub fn on_error(mut self, behavior: ErrorBehavior) -> Self {
42 self.builder.steps[self.step_id].error_behavior = Some(behavior);
43 self
44 }
45
46 pub fn config(mut self, config: serde_json::Value) -> Self {
50 self.builder.steps[self.step_id].step_config = Some(config);
51 self
52 }
53
54 pub fn compensate_with<C: StepBody + Default + 'static>(mut self) -> Self {
56 let comp_id = self.builder.add_step(std::any::type_name::<C>());
57 self.builder.steps[self.step_id].compensation_step_id = Some(comp_id);
58 self
59 }
60
61 pub fn then<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
63 let next_id = self.builder.add_step(std::any::type_name::<S>());
64 self.builder.wire_outcome(self.step_id, next_id, None);
65 self.builder.last_step = Some(next_id);
66 StepBuilder::new(self.builder, next_id)
67 }
68
69 pub fn then_fn(
71 mut self,
72 f: impl Fn() -> ExecutionResult + Send + Sync + 'static,
73 ) -> StepBuilder<D> {
74 let next_id = self.builder.add_step(std::any::type_name::<InlineStep>());
75 self.builder.wire_outcome(self.step_id, next_id, None);
76 self.builder.last_step = Some(next_id);
77 self.builder.inline_closures.insert(next_id, Box::new(f));
78 StepBuilder::new(self.builder, next_id)
79 }
80
81 pub fn wait_for(mut self, event_name: &str, event_key: &str) -> StepBuilder<D> {
83 let next_id = self
84 .builder
85 .add_step(std::any::type_name::<primitives::wait_for::WaitForStep>());
86 self.builder.wire_outcome(self.step_id, next_id, None);
87 self.builder.last_step = Some(next_id);
88 self.builder.steps[next_id].step_config = Some(serde_json::json!({
89 "event_name": event_name,
90 "event_key": event_key,
91 }));
92 StepBuilder::new(self.builder, next_id)
93 }
94
95 pub fn delay(mut self, duration: std::time::Duration) -> StepBuilder<D> {
97 let next_id = self
98 .builder
99 .add_step(std::any::type_name::<primitives::delay::DelayStep>());
100 self.builder.wire_outcome(self.step_id, next_id, None);
101 self.builder.last_step = Some(next_id);
102 self.builder.steps[next_id].step_config = Some(serde_json::json!({
103 "duration_millis": duration.as_millis() as u64,
104 }));
105 StepBuilder::new(self.builder, next_id)
106 }
107
108 pub fn if_do<S: StepBody + Default + 'static>(
111 mut self,
112 build_children: impl FnOnce(&mut WorkflowBuilder<D>),
113 ) -> StepBuilder<D> {
114 let if_id = self
115 .builder
116 .add_step(std::any::type_name::<primitives::if_step::IfStep>());
117 self.builder.wire_outcome(self.step_id, if_id, None);
118
119 let before_count = self.builder.steps.len();
121 build_children(&mut self.builder);
122 let after_count = self.builder.steps.len();
123
124 for child_id in before_count..after_count {
126 self.builder.add_child(if_id, child_id);
127 }
128
129 self.builder.last_step = Some(if_id);
130 StepBuilder::new(self.builder, if_id)
131 }
132
133 pub fn while_do<S: StepBody + Default + 'static>(
135 mut self,
136 build_children: impl FnOnce(&mut WorkflowBuilder<D>),
137 ) -> StepBuilder<D> {
138 let while_id = self
139 .builder
140 .add_step(std::any::type_name::<primitives::while_step::WhileStep>());
141 self.builder.wire_outcome(self.step_id, while_id, None);
142
143 let before_count = self.builder.steps.len();
144 build_children(&mut self.builder);
145 let after_count = self.builder.steps.len();
146
147 for child_id in before_count..after_count {
148 self.builder.add_child(while_id, child_id);
149 }
150
151 self.builder.last_step = Some(while_id);
152 StepBuilder::new(self.builder, while_id)
153 }
154
155 pub fn for_each<S: StepBody + Default + 'static>(
157 mut self,
158 build_children: impl FnOnce(&mut WorkflowBuilder<D>),
159 ) -> StepBuilder<D> {
160 let fe_id = self
161 .builder
162 .add_step(std::any::type_name::<primitives::foreach_step::ForEachStep>());
163 self.builder.wire_outcome(self.step_id, fe_id, None);
164
165 let before_count = self.builder.steps.len();
166 build_children(&mut self.builder);
167 let after_count = self.builder.steps.len();
168
169 for child_id in before_count..after_count {
170 self.builder.add_child(fe_id, child_id);
171 }
172
173 self.builder.last_step = Some(fe_id);
174 StepBuilder::new(self.builder, fe_id)
175 }
176
177 pub fn saga(mut self, build_children: impl FnOnce(&mut WorkflowBuilder<D>)) -> StepBuilder<D> {
179 let saga_id = self.builder.add_step(std::any::type_name::<
180 primitives::saga_container::SagaContainerStep,
181 >());
182 self.builder.steps[saga_id].saga = true;
183 self.builder.wire_outcome(self.step_id, saga_id, None);
184
185 let before_count = self.builder.steps.len();
186 build_children(&mut self.builder);
187 let after_count = self.builder.steps.len();
188
189 for child_id in before_count..after_count {
190 self.builder.add_child(saga_id, child_id);
191 }
192
193 self.builder.last_step = Some(saga_id);
194 StepBuilder::new(self.builder, saga_id)
195 }
196
197 pub fn parallel(
199 mut self,
200 build_branches: impl FnOnce(ParallelBuilder<D>) -> ParallelBuilder<D>,
201 ) -> StepBuilder<D> {
202 let seq_id = self
203 .builder
204 .add_step(std::any::type_name::<primitives::sequence::SequenceStep>());
205 self.builder.wire_outcome(self.step_id, seq_id, None);
206
207 let pb = ParallelBuilder {
208 builder: self.builder,
209 container_id: seq_id,
210 };
211 let pb = build_branches(pb);
212 let mut builder = pb.builder;
213 builder.last_step = Some(seq_id);
214 StepBuilder::new(builder, seq_id)
215 }
216
217 pub fn end_workflow(self) -> WorkflowBuilder<D> {
219 self.builder
220 }
221
222 pub fn build(self, id: impl Into<String>, version: u32) -> crate::models::WorkflowDefinition {
224 self.builder.build(id, version)
225 }
226}
227
228impl<D: WorkflowData> ParallelBuilder<D> {
229 pub fn branch(mut self, build_branch: impl FnOnce(&mut WorkflowBuilder<D>)) -> Self {
231 let before_count = self.builder.steps.len();
232 build_branch(&mut self.builder);
233 let after_count = self.builder.steps.len();
234
235 for child_id in before_count..after_count {
236 self.builder.add_child(self.container_id, child_id);
237 }
238 self
239 }
240}