1use std::collections::HashMap;
2use std::marker::PhantomData;
3
4use crate::models::{ExecutionResult, StepOutcome, WorkflowDefinition, WorkflowStep};
5use crate::traits::step::{StepBody, WorkflowData};
6
7use super::inline_step::InlineStep;
8use super::step_builder::StepBuilder;
9
/// Boxed, thread-safe (`Send + Sync`) zero-argument closure that produces an
/// [`ExecutionResult`].
///
/// `WorkflowBuilder` keeps one of these per inline step in its
/// `inline_closures` map, keyed by step id.
pub type InlineClosureBox = Box<dyn Fn() -> ExecutionResult + Send + Sync>;
12
13pub struct WorkflowBuilder<D: WorkflowData> {
29 pub steps: Vec<WorkflowStep>,
31 pub last_step: Option<usize>,
33 pub(crate) inline_closures: HashMap<usize, InlineClosureBox>,
35 _phantom: PhantomData<D>,
36}
37
38impl<D: WorkflowData> WorkflowBuilder<D> {
39pub fn new() -> Self {
41 Self {
42 steps: Vec::new(),
43 last_step: None,
44 inline_closures: HashMap::new(),
45 _phantom: PhantomData,
46 }
47 }
48
49 pub fn start_with<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
65 let id = self.steps.len();
66 let step = WorkflowStep::new(id, std::any::type_name::<S>());
67 self.steps.push(step);
68 self.last_step = Some(id);
69 StepBuilder::new(self, id)
70 }
71
72 pub fn add_step(&mut self, step_type: &str) -> usize {
74 let id = self.steps.len();
75 self.steps.push(WorkflowStep::new(id, step_type));
76 id
77 }
78
79 pub fn add_step_typed<S: StepBody + Default + 'static>(
82 &mut self,
83 name: &str,
84 config: Option<serde_json::Value>,
85 ) -> usize {
86 let id = self.add_step(std::any::type_name::<S>());
87 self.steps[id].name = Some(name.to_string());
88 if let Some(cfg) = config {
89 self.steps[id].step_config = Some(cfg);
90 }
91 id
92 }
93
94 pub fn wire_outcome(
96 &mut self,
97 from_step: usize,
98 to_step: usize,
99 value: Option<serde_json::Value>,
100 ) {
101 if let Some(step) = self.steps.get_mut(from_step) {
102 step.outcomes.push(StepOutcome {
103 next_step: to_step,
104 label: None,
105 value,
106 });
107 }
108 }
109
110 pub(crate) fn add_child(&mut self, parent: usize, child: usize) {
112 if let Some(step) = self.steps.get_mut(parent) {
113 step.children.push(child);
114 }
115 }
116
117 pub fn build(self, id: impl Into<String>, version: u32) -> WorkflowDefinition {
127 let mut def = WorkflowDefinition::new(id, version);
128 def.steps = self.steps;
129 def
131 }
132
133 pub fn build_with_closures(
140 self,
141 id: impl Into<String>,
142 version: u32,
143 ) -> (WorkflowDefinition, HashMap<usize, InlineClosureBox>) {
144 let mut def = WorkflowDefinition::new(id, version);
145 def.steps = self.steps;
146 (def, self.inline_closures)
147 }
148
149 pub fn register_inline_steps(
154 self,
155 registry: &mut crate::executor::StepRegistry,
156 id: impl Into<String>,
157 version: u32,
158 ) -> WorkflowDefinition {
159 let mut def = WorkflowDefinition::new(id, version);
160 def.steps = self.steps;
161 for (step_id, closure) in self.inline_closures {
162 let closure = std::sync::Arc::new(closure);
163 let key = format!("{}::{step_id}", std::any::type_name::<InlineStep>());
164 if let Some(step) = def.steps.get_mut(step_id) {
166 step.step_type = key.clone();
167 }
168 let closure = closure.clone();
169 registry.register_factory(&key, move || {
170 let c = closure.clone();
171 Box::new(InlineStep::new(move || (c)()))
172 });
173 }
174 def
175 }
176}
177
178impl<D: WorkflowData> Default for WorkflowBuilder<D> {
179 fn default() -> Self {
180 Self::new()
181 }
182}
183
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{ErrorBehavior, ExecutionResult};
    use crate::traits::step::StepExecutionContext;
    use pretty_assertions::assert_eq;
    use serde::{Deserialize, Serialize};

    /// Minimal workflow data type used to instantiate the builder.
    #[derive(Debug, Clone, Default, Serialize, Deserialize)]
    struct TestData {
        counter: i32,
    }

    /// First of three interchangeable steps that just continue the workflow.
    #[derive(Default)]
    struct StepA;

    #[async_trait::async_trait]
    impl StepBody for StepA {
        async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
            Ok(ExecutionResult::next())
        }
    }

    /// Second pass-through step.
    #[derive(Default)]
    struct StepB;

    #[async_trait::async_trait]
    impl StepBody for StepB {
        async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
            Ok(ExecutionResult::next())
        }
    }

    /// Third pass-through step.
    #[derive(Default)]
    struct StepC;

    #[async_trait::async_trait]
    impl StepBody for StepC {
        async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
            Ok(ExecutionResult::next())
        }
    }

    /// Fresh builder over `TestData`.
    fn builder() -> WorkflowBuilder<TestData> {
        WorkflowBuilder::new()
    }

    /// Type string the builder records for a step of type `S`.
    fn type_of<S>() -> &'static str {
        std::any::type_name::<S>()
    }

    #[test]
    fn build_empty_workflow() {
        let workflow = builder().build("empty", 1);

        assert_eq!(workflow.id, "empty");
        assert_eq!(workflow.version, 1);
        assert!(workflow.steps.is_empty());
    }

    #[test]
    fn start_with_adds_first_step() {
        let workflow = builder()
            .start_with::<StepA>()
            .end_workflow()
            .build("test", 1);
        let steps = &workflow.steps;

        assert_eq!(steps.len(), 1);
        assert!(steps[0].step_type.contains("StepA"));
    }

    #[test]
    fn then_chains_two_steps_with_outcome() {
        let workflow = builder()
            .start_with::<StepA>()
            .then::<StepB>()
            .end_workflow()
            .build("test", 1);
        let steps = &workflow.steps;

        assert_eq!(steps.len(), 2);
        assert_eq!(steps[0].outcomes.len(), 1);
        assert_eq!(steps[0].outcomes[0].next_step, 1);
    }

    #[test]
    fn then_chains_three_steps() {
        let workflow = builder()
            .start_with::<StepA>()
            .then::<StepB>()
            .then::<StepC>()
            .end_workflow()
            .build("test", 1);
        let steps = &workflow.steps;

        assert_eq!(steps.len(), 3);
        assert_eq!(steps[0].outcomes[0].next_step, 1);
        assert_eq!(steps[1].outcomes[0].next_step, 2);
        assert!(steps[2].outcomes.is_empty());
    }

    #[test]
    fn name_sets_step_name() {
        let workflow = builder()
            .start_with::<StepA>()
            .name("First Step")
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.steps[0].name.as_deref(), Some("First Step"));
    }

    #[test]
    fn on_error_sets_behavior() {
        let workflow = builder()
            .start_with::<StepA>()
            .on_error(ErrorBehavior::Suspend)
            .end_workflow()
            .build("test", 1);

        assert_eq!(
            workflow.steps[0].error_behavior,
            Some(ErrorBehavior::Suspend)
        );
    }

    #[test]
    fn if_do_inserts_container_with_children() {
        let workflow = builder()
            .start_with::<StepA>()
            .if_do::<StepB>(|inner| {
                let child = inner.add_step(type_of::<StepC>());
                inner.last_step = Some(child);
            })
            .end_workflow()
            .build("test", 1);
        let steps = &workflow.steps;

        assert!(steps.len() >= 3);
        let container = &steps[1];
        assert!(container.step_type.contains("IfStep"));
        assert!(container.children.contains(&2));
    }

    #[test]
    fn while_do_inserts_container() {
        let workflow = builder()
            .start_with::<StepA>()
            .while_do::<StepB>(|inner| {
                inner.add_step(type_of::<StepC>());
            })
            .end_workflow()
            .build("test", 1);
        let steps = &workflow.steps;

        assert!(steps.len() >= 3);
        assert!(steps[1].step_type.contains("WhileStep"));
    }

    #[test]
    fn for_each_inserts_container() {
        let workflow = builder()
            .start_with::<StepA>()
            .for_each::<StepB>(|inner| {
                inner.add_step(type_of::<StepC>());
            })
            .end_workflow()
            .build("test", 1);
        let steps = &workflow.steps;

        assert!(steps.len() >= 3);
        assert!(steps[1].step_type.contains("ForEachStep"));
    }

    #[test]
    fn parallel_creates_branches() {
        let workflow = builder()
            .start_with::<StepA>()
            .parallel(|branches| {
                branches
                    .branch(|first| {
                        first.add_step(type_of::<StepB>());
                    })
                    .branch(|second| {
                        second.add_step(type_of::<StepC>());
                    })
            })
            .end_workflow()
            .build("test", 1);
        let steps = &workflow.steps;

        assert!(steps.len() >= 4);
        let container = &steps[1];
        assert!(container.step_type.contains("SequenceStep"));
        assert!(container.children.len() >= 2);
    }

    #[test]
    fn saga_with_compensation() {
        let workflow = builder()
            .start_with::<StepA>()
            .saga(|inner| {
                inner.add_step(type_of::<StepB>());
                inner.add_step(type_of::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        let container = &workflow.steps[1];
        assert!(container.step_type.contains("SagaContainerStep"));
        assert!(container.saga);
        assert!(!container.children.is_empty());
    }

    #[test]
    fn compensate_with_sets_compensation_step() {
        let workflow = builder()
            .start_with::<StepA>()
            .compensate_with::<StepB>()
            .end_workflow()
            .build("test", 1);
        let steps = &workflow.steps;

        assert_eq!(steps[0].compensation_step_id, Some(1));
        assert!(steps[1].step_type.contains("StepB"));
    }

    #[test]
    fn config_sets_step_config() {
        let expected = serde_json::json!({"namespace": "ory", "timeout": 30});
        let workflow = builder()
            .start_with::<StepA>()
            .config(expected.clone())
            .end_workflow()
            .build("test", 1);

        assert_eq!(workflow.steps[0].step_config.as_ref(), Some(&expected));
    }

    #[test]
    fn config_chains_with_name() {
        let expected = serde_json::json!({"namespace": "data"});
        let workflow = builder()
            .start_with::<StepA>()
            .name("apply-data")
            .config(expected.clone())
            .then::<StepB>()
            .end_workflow()
            .build("test", 1);
        let first = &workflow.steps[0];

        assert_eq!(first.name.as_deref(), Some("apply-data"));
        assert_eq!(first.step_config.as_ref(), Some(&expected));
        assert_eq!(first.outcomes[0].next_step, 1);
    }

    #[test]
    fn config_on_multiple_steps_of_same_type() {
        let ory = serde_json::json!({"namespace": "ory"});
        let data = serde_json::json!({"namespace": "data"});
        let workflow = builder()
            .start_with::<StepA>()
            .name("apply-ory")
            .config(ory.clone())
            .then::<StepA>()
            .name("apply-data")
            .config(data.clone())
            .end_workflow()
            .build("test", 1);
        let steps = &workflow.steps;

        assert_eq!(steps[0].step_config.as_ref(), Some(&ory));
        assert_eq!(steps[1].step_config.as_ref(), Some(&data));
        // Both steps were declared with `StepA`, so they share a type string.
        assert_eq!(steps[0].step_type, steps[1].step_type);
    }

    #[test]
    fn add_step_typed_sets_name_and_config() {
        let expected = serde_json::json!({"namespace": "ory"});
        let mut wb = builder();
        let id = wb.add_step_typed::<StepA>("apply-ory", Some(expected.clone()));
        let step = &wb.steps[id];

        assert_eq!(step.name.as_deref(), Some("apply-ory"));
        assert_eq!(step.step_config.as_ref(), Some(&expected));
        assert!(step.step_type.contains("StepA"));
    }

    #[test]
    fn add_step_typed_without_config() {
        let mut wb = builder();
        let id = wb.add_step_typed::<StepB>("my-step", None);
        let step = &wb.steps[id];

        assert_eq!(step.name.as_deref(), Some("my-step"));
        assert!(step.step_config.is_none());
    }

    #[test]
    fn wire_outcome_connects_steps() {
        let mut wb = builder();
        let source = wb.add_step_typed::<StepA>("first", None);
        let target = wb.add_step_typed::<StepB>("second", None);
        wb.wire_outcome(source, target, None);

        let outcomes = &wb.steps[source].outcomes;
        assert_eq!(outcomes.len(), 1);
        assert_eq!(outcomes[0].next_step, target);
    }

    #[test]
    fn inline_step_via_then_fn() {
        let workflow = builder()
            .start_with::<StepA>()
            .then_fn(ExecutionResult::next)
            .end_workflow()
            .build("test", 1);
        let steps = &workflow.steps;

        assert_eq!(steps.len(), 2);
        assert!(steps[1].step_type.contains("InlineStep"));
        assert_eq!(steps[0].outcomes[0].next_step, 1);
    }
}