wfe_core/builder/
workflow_builder.rs1use std::collections::HashMap;
2use std::marker::PhantomData;
3
4use crate::models::{
5 ExecutionResult, StepOutcome, WorkflowDefinition, WorkflowStep,
6};
7use crate::traits::step::{StepBody, WorkflowData};
8
9use super::inline_step::InlineStep;
10use super::step_builder::StepBuilder;
11
12pub type InlineClosureBox = Box<dyn Fn() -> ExecutionResult + Send + Sync>;
14
15pub struct WorkflowBuilder<D: WorkflowData> {
31 pub(crate) steps: Vec<WorkflowStep>,
32 pub(crate) last_step: Option<usize>,
33 pub(crate) inline_closures: HashMap<usize, InlineClosureBox>,
35 _phantom: PhantomData<D>,
36}
37
38impl<D: WorkflowData> WorkflowBuilder<D> {
39 pub fn new() -> Self {
40 Self {
41 steps: Vec::new(),
42 last_step: None,
43 inline_closures: HashMap::new(),
44 _phantom: PhantomData,
45 }
46 }
47
48 pub fn start_with<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
50 let id = self.steps.len();
51 let step = WorkflowStep::new(id, std::any::type_name::<S>());
52 self.steps.push(step);
53 self.last_step = Some(id);
54 StepBuilder::new(self, id)
55 }
56
57 pub fn add_step(&mut self, step_type: &str) -> usize {
59 let id = self.steps.len();
60 self.steps.push(WorkflowStep::new(id, step_type));
61 id
62 }
63
64 pub(crate) fn wire_outcome(&mut self, from_step: usize, to_step: usize, value: Option<serde_json::Value>) {
66 if let Some(step) = self.steps.get_mut(from_step) {
67 step.outcomes.push(StepOutcome {
68 next_step: to_step,
69 label: None,
70 value,
71 });
72 }
73 }
74
75 pub(crate) fn add_child(&mut self, parent: usize, child: usize) {
77 if let Some(step) = self.steps.get_mut(parent) {
78 step.children.push(child);
79 }
80 }
81
82 pub fn build(self, id: impl Into<String>, version: u32) -> WorkflowDefinition {
84 let mut def = WorkflowDefinition::new(id, version);
85 def.steps = self.steps;
86 def
88 }
89
90 pub fn build_with_closures(
93 self,
94 id: impl Into<String>,
95 version: u32,
96 ) -> (WorkflowDefinition, HashMap<usize, InlineClosureBox>) {
97 let mut def = WorkflowDefinition::new(id, version);
98 def.steps = self.steps;
99 (def, self.inline_closures)
100 }
101
102 pub fn register_inline_steps(
107 self,
108 registry: &mut crate::executor::StepRegistry,
109 id: impl Into<String>,
110 version: u32,
111 ) -> WorkflowDefinition {
112 let mut def = WorkflowDefinition::new(id, version);
113 def.steps = self.steps;
114 for (step_id, closure) in self.inline_closures {
115 let closure = std::sync::Arc::new(closure);
116 let key = format!("{}::{step_id}", std::any::type_name::<InlineStep>());
117 if let Some(step) = def.steps.get_mut(step_id) {
119 step.step_type = key.clone();
120 }
121 let closure = closure.clone();
122 registry.register_factory(&key, move || {
123 let c = closure.clone();
124 Box::new(InlineStep::new(move || (c)()))
125 });
126 }
127 def
128 }
129}
130
131impl<D: WorkflowData> Default for WorkflowBuilder<D> {
132 fn default() -> Self {
133 Self::new()
134 }
135}
136
137#[cfg(test)]
138mod tests {
139 use super::*;
140 use crate::models::{ErrorBehavior, ExecutionResult};
141 use crate::traits::step::StepExecutionContext;
142 use pretty_assertions::assert_eq;
143 use serde::{Deserialize, Serialize};
144
145 #[derive(Debug, Clone, Default, Serialize, Deserialize)]
146 struct TestData {
147 counter: i32,
148 }
149
150 #[derive(Default)]
151 struct StepA;
152
153 #[async_trait::async_trait]
154 impl StepBody for StepA {
155 async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
156 Ok(ExecutionResult::next())
157 }
158 }
159
160 #[derive(Default)]
161 struct StepB;
162
163 #[async_trait::async_trait]
164 impl StepBody for StepB {
165 async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
166 Ok(ExecutionResult::next())
167 }
168 }
169
170 #[derive(Default)]
171 struct StepC;
172
173 #[async_trait::async_trait]
174 impl StepBody for StepC {
175 async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
176 Ok(ExecutionResult::next())
177 }
178 }
179
180 #[test]
181 fn build_empty_workflow() {
182 let def = WorkflowBuilder::<TestData>::new().build("empty", 1);
183 assert_eq!(def.id, "empty");
184 assert_eq!(def.version, 1);
185 assert!(def.steps.is_empty());
186 }
187
188 #[test]
189 fn start_with_adds_first_step() {
190 let def = WorkflowBuilder::<TestData>::new()
191 .start_with::<StepA>()
192 .end_workflow()
193 .build("test", 1);
194 assert_eq!(def.steps.len(), 1);
195 assert!(def.steps[0].step_type.contains("StepA"));
196 }
197
198 #[test]
199 fn then_chains_two_steps_with_outcome() {
200 let def = WorkflowBuilder::<TestData>::new()
201 .start_with::<StepA>()
202 .then::<StepB>()
203 .end_workflow()
204 .build("test", 1);
205 assert_eq!(def.steps.len(), 2);
206 assert_eq!(def.steps[0].outcomes.len(), 1);
208 assert_eq!(def.steps[0].outcomes[0].next_step, 1);
209 }
210
211 #[test]
212 fn then_chains_three_steps() {
213 let def = WorkflowBuilder::<TestData>::new()
214 .start_with::<StepA>()
215 .then::<StepB>()
216 .then::<StepC>()
217 .end_workflow()
218 .build("test", 1);
219 assert_eq!(def.steps.len(), 3);
220 assert_eq!(def.steps[0].outcomes[0].next_step, 1);
221 assert_eq!(def.steps[1].outcomes[0].next_step, 2);
222 assert!(def.steps[2].outcomes.is_empty());
223 }
224
225 #[test]
226 fn name_sets_step_name() {
227 let def = WorkflowBuilder::<TestData>::new()
228 .start_with::<StepA>()
229 .name("First Step")
230 .end_workflow()
231 .build("test", 1);
232 assert_eq!(def.steps[0].name, Some("First Step".into()));
233 }
234
235 #[test]
236 fn on_error_sets_behavior() {
237 let def = WorkflowBuilder::<TestData>::new()
238 .start_with::<StepA>()
239 .on_error(ErrorBehavior::Suspend)
240 .end_workflow()
241 .build("test", 1);
242 assert_eq!(def.steps[0].error_behavior, Some(ErrorBehavior::Suspend));
243 }
244
245 #[test]
246 fn if_do_inserts_container_with_children() {
247 let def = WorkflowBuilder::<TestData>::new()
248 .start_with::<StepA>()
249 .if_do::<StepB>(|b| {
250 let id = b.add_step(std::any::type_name::<StepC>());
251 b.last_step = Some(id);
252 })
253 .end_workflow()
254 .build("test", 1);
255
256 assert!(def.steps.len() >= 3);
259 assert!(def.steps[1].step_type.contains("IfStep"));
261 assert!(def.steps[1].children.contains(&2));
262 }
263
264 #[test]
265 fn while_do_inserts_container() {
266 let def = WorkflowBuilder::<TestData>::new()
267 .start_with::<StepA>()
268 .while_do::<StepB>(|b| {
269 b.add_step(std::any::type_name::<StepC>());
270 })
271 .end_workflow()
272 .build("test", 1);
273
274 assert!(def.steps.len() >= 3);
275 assert!(def.steps[1].step_type.contains("WhileStep"));
276 }
277
278 #[test]
279 fn for_each_inserts_container() {
280 let def = WorkflowBuilder::<TestData>::new()
281 .start_with::<StepA>()
282 .for_each::<StepB>(|b| {
283 b.add_step(std::any::type_name::<StepC>());
284 })
285 .end_workflow()
286 .build("test", 1);
287
288 assert!(def.steps.len() >= 3);
289 assert!(def.steps[1].step_type.contains("ForEachStep"));
290 }
291
292 #[test]
293 fn parallel_creates_branches() {
294 let def = WorkflowBuilder::<TestData>::new()
295 .start_with::<StepA>()
296 .parallel(|branches| {
297 branches
298 .branch(|b| {
299 b.add_step(std::any::type_name::<StepB>());
300 })
301 .branch(|b| {
302 b.add_step(std::any::type_name::<StepC>());
303 })
304 })
305 .end_workflow()
306 .build("test", 1);
307
308 assert!(def.steps.len() >= 4);
310 assert!(def.steps[1].step_type.contains("SequenceStep"));
311 assert!(def.steps[1].children.len() >= 2);
312 }
313
314 #[test]
315 fn saga_with_compensation() {
316 let def = WorkflowBuilder::<TestData>::new()
317 .start_with::<StepA>()
318 .saga(|b| {
319 b.add_step(std::any::type_name::<StepB>());
320 b.add_step(std::any::type_name::<StepC>());
321 })
322 .end_workflow()
323 .build("test", 1);
324
325 assert!(def.steps[1].step_type.contains("SagaContainerStep"));
327 assert!(def.steps[1].saga);
328 assert!(!def.steps[1].children.is_empty());
329 }
330
331 #[test]
332 fn compensate_with_sets_compensation_step() {
333 let def = WorkflowBuilder::<TestData>::new()
334 .start_with::<StepA>()
335 .compensate_with::<StepB>()
336 .end_workflow()
337 .build("test", 1);
338
339 assert_eq!(def.steps[0].compensation_step_id, Some(1));
341 assert!(def.steps[1].step_type.contains("StepB"));
342 }
343
344 #[test]
345 fn inline_step_via_then_fn() {
346 let def = WorkflowBuilder::<TestData>::new()
347 .start_with::<StepA>()
348 .then_fn(ExecutionResult::next)
349 .end_workflow()
350 .build("test", 1);
351
352 assert_eq!(def.steps.len(), 2);
353 assert!(def.steps[1].step_type.contains("InlineStep"));
354 assert_eq!(def.steps[0].outcomes[0].next_step, 1);
355 }
356}