1use std::collections::HashMap;
2use std::marker::PhantomData;
3
4use crate::models::{
5 ErrorBehavior, ExecutionResult, StepOutcome, WorkflowDefinition, WorkflowStep,
6};
7use crate::traits::step::{StepBody, WorkflowData};
8
9use super::inline_step::InlineStep;
10use super::step_builder::StepBuilder;
11
12pub type InlineClosureBox = Box<dyn Fn() -> ExecutionResult + Send + Sync>;
14
15pub struct WorkflowBuilder<D: WorkflowData> {
31 pub steps: Vec<WorkflowStep>,
33 pub last_step: Option<usize>,
35 pub(crate) inline_closures: HashMap<usize, InlineClosureBox>,
37 pub(crate) default_error_behavior: Option<ErrorBehavior>,
39 _phantom: PhantomData<D>,
40}
41
42impl<D: WorkflowData> WorkflowBuilder<D> {
43 pub fn new() -> Self {
45 Self {
46 steps: Vec::new(),
47 last_step: None,
48 inline_closures: HashMap::new(),
49 default_error_behavior: None,
50 _phantom: PhantomData,
51 }
52 }
53
54 pub fn default_error_behavior(mut self, behavior: ErrorBehavior) -> Self {
59 self.default_error_behavior = Some(behavior);
60 self
61 }
62
63 pub fn start_with<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
79 let id = self.steps.len();
80 let step = WorkflowStep::new(id, std::any::type_name::<S>());
81 self.steps.push(step);
82 self.last_step = Some(id);
83 StepBuilder::new(self, id)
84 }
85
86 pub fn add_step(&mut self, step_type: &str) -> usize {
88 let id = self.steps.len();
89 self.steps.push(WorkflowStep::new(id, step_type));
90 id
91 }
92
93 pub fn add_step_typed<S: StepBody + Default + 'static>(
96 &mut self,
97 name: &str,
98 config: Option<serde_json::Value>,
99 ) -> usize {
100 let id = self.add_step(std::any::type_name::<S>());
101 self.steps[id].name = Some(name.to_string());
102 if let Some(cfg) = config {
103 self.steps[id].step_config = Some(cfg);
104 }
105 id
106 }
107
108 pub fn wire_outcome(
110 &mut self,
111 from_step: usize,
112 to_step: usize,
113 value: Option<serde_json::Value>,
114 ) {
115 if let Some(step) = self.steps.get_mut(from_step) {
116 step.outcomes.push(StepOutcome {
117 next_step: to_step,
118 label: None,
119 value,
120 });
121 }
122 }
123
124 pub(crate) fn add_child(&mut self, parent: usize, child: usize) {
126 if let Some(step) = self.steps.get_mut(parent) {
127 step.children.push(child);
128 }
129 }
130
131 pub fn build(self, id: impl Into<String>, version: u32) -> WorkflowDefinition {
141 let mut def = WorkflowDefinition::new(id, version);
142 def.steps = self.steps;
143 if let Some(eb) = self.default_error_behavior {
144 def.default_error_behavior = eb;
145 }
146 def
148 }
149
150 pub fn build_with_closures(
157 self,
158 id: impl Into<String>,
159 version: u32,
160 ) -> (WorkflowDefinition, HashMap<usize, InlineClosureBox>) {
161 let mut def = WorkflowDefinition::new(id, version);
162 def.steps = self.steps;
163 if let Some(eb) = self.default_error_behavior {
164 def.default_error_behavior = eb;
165 }
166 (def, self.inline_closures)
167 }
168
169 pub fn register_inline_steps(
174 self,
175 registry: &mut crate::executor::StepRegistry,
176 id: impl Into<String>,
177 version: u32,
178 ) -> WorkflowDefinition {
179 let mut def = WorkflowDefinition::new(id, version);
180 def.steps = self.steps;
181 if let Some(eb) = self.default_error_behavior {
182 def.default_error_behavior = eb;
183 }
184 for (step_id, closure) in self.inline_closures {
185 let closure = std::sync::Arc::new(closure);
186 let key = format!("{}::{step_id}", std::any::type_name::<InlineStep>());
187 if let Some(step) = def.steps.get_mut(step_id) {
189 step.step_type = key.clone();
190 }
191 let closure = closure.clone();
192 registry.register_factory(&key, move || {
193 let c = closure.clone();
194 Box::new(InlineStep::new(move || (c)()))
195 });
196 }
197 def
198 }
199}
200
201impl<D: WorkflowData> Default for WorkflowBuilder<D> {
202 fn default() -> Self {
203 Self::new()
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210 use crate::models::{ErrorBehavior, ExecutionResult};
211 use crate::traits::step::StepExecutionContext;
212 use pretty_assertions::assert_eq;
213 use serde::{Deserialize, Serialize};
214
215 #[derive(Debug, Clone, Default, Serialize, Deserialize)]
216 struct TestData {
217 counter: i32,
218 }
219
220 #[derive(Default)]
221 struct StepA;
222
223 #[async_trait::async_trait]
224 impl StepBody for StepA {
225 async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
226 Ok(ExecutionResult::next())
227 }
228 }
229
230 #[derive(Default)]
231 struct StepB;
232
233 #[async_trait::async_trait]
234 impl StepBody for StepB {
235 async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
236 Ok(ExecutionResult::next())
237 }
238 }
239
240 #[derive(Default)]
241 struct StepC;
242
243 #[async_trait::async_trait]
244 impl StepBody for StepC {
245 async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
246 Ok(ExecutionResult::next())
247 }
248 }
249
250 #[test]
251 fn build_empty_workflow() {
252 let def = WorkflowBuilder::<TestData>::new().build("empty", 1);
253 assert_eq!(def.id, "empty");
254 assert_eq!(def.version, 1);
255 assert!(def.steps.is_empty());
256 }
257
258 #[test]
259 fn start_with_adds_first_step() {
260 let def = WorkflowBuilder::<TestData>::new()
261 .start_with::<StepA>()
262 .end_workflow()
263 .build("test", 1);
264 assert_eq!(def.steps.len(), 1);
265 assert!(def.steps[0].step_type.contains("StepA"));
266 }
267
268 #[test]
269 fn then_chains_two_steps_with_outcome() {
270 let def = WorkflowBuilder::<TestData>::new()
271 .start_with::<StepA>()
272 .then::<StepB>()
273 .end_workflow()
274 .build("test", 1);
275 assert_eq!(def.steps.len(), 2);
276 assert_eq!(def.steps[0].outcomes.len(), 1);
278 assert_eq!(def.steps[0].outcomes[0].next_step, 1);
279 }
280
281 #[test]
282 fn then_chains_three_steps() {
283 let def = WorkflowBuilder::<TestData>::new()
284 .start_with::<StepA>()
285 .then::<StepB>()
286 .then::<StepC>()
287 .end_workflow()
288 .build("test", 1);
289 assert_eq!(def.steps.len(), 3);
290 assert_eq!(def.steps[0].outcomes[0].next_step, 1);
291 assert_eq!(def.steps[1].outcomes[0].next_step, 2);
292 assert!(def.steps[2].outcomes.is_empty());
293 }
294
295 #[test]
296 fn name_sets_step_name() {
297 let def = WorkflowBuilder::<TestData>::new()
298 .start_with::<StepA>()
299 .name("First Step")
300 .end_workflow()
301 .build("test", 1);
302 assert_eq!(def.steps[0].name, Some("First Step".into()));
303 }
304
305 #[test]
306 fn on_error_sets_behavior() {
307 let def = WorkflowBuilder::<TestData>::new()
308 .start_with::<StepA>()
309 .on_error(ErrorBehavior::Suspend)
310 .end_workflow()
311 .build("test", 1);
312 assert_eq!(def.steps[0].error_behavior, Some(ErrorBehavior::Suspend));
313 }
314
315 #[test]
316 fn if_do_inserts_container_with_children() {
317 let def = WorkflowBuilder::<TestData>::new()
318 .start_with::<StepA>()
319 .if_do::<StepB>(|b| {
320 let id = b.add_step(std::any::type_name::<StepC>());
321 b.last_step = Some(id);
322 })
323 .end_workflow()
324 .build("test", 1);
325
326 assert!(def.steps.len() >= 3);
329 assert!(def.steps[1].step_type.contains("IfStep"));
331 assert!(def.steps[1].children.contains(&2));
332 }
333
334 #[test]
335 fn while_do_inserts_container() {
336 let def = WorkflowBuilder::<TestData>::new()
337 .start_with::<StepA>()
338 .while_do::<StepB>(|b| {
339 b.add_step(std::any::type_name::<StepC>());
340 })
341 .end_workflow()
342 .build("test", 1);
343
344 assert!(def.steps.len() >= 3);
345 assert!(def.steps[1].step_type.contains("WhileStep"));
346 }
347
348 #[test]
349 fn for_each_inserts_container() {
350 let def = WorkflowBuilder::<TestData>::new()
351 .start_with::<StepA>()
352 .for_each::<StepB>(|b| {
353 b.add_step(std::any::type_name::<StepC>());
354 })
355 .end_workflow()
356 .build("test", 1);
357
358 assert!(def.steps.len() >= 3);
359 assert!(def.steps[1].step_type.contains("ForEachStep"));
360 }
361
362 #[test]
363 fn parallel_creates_branches() {
364 let def = WorkflowBuilder::<TestData>::new()
365 .start_with::<StepA>()
366 .parallel(|branches| {
367 branches
368 .branch(|b| {
369 b.add_step(std::any::type_name::<StepB>());
370 })
371 .branch(|b| {
372 b.add_step(std::any::type_name::<StepC>());
373 })
374 })
375 .end_workflow()
376 .build("test", 1);
377
378 assert!(def.steps.len() >= 4);
380 assert!(def.steps[1].step_type.contains("SequenceStep"));
381 assert!(def.steps[1].children.len() >= 2);
382 }
383
384 #[test]
385 fn saga_with_compensation() {
386 let def = WorkflowBuilder::<TestData>::new()
387 .start_with::<StepA>()
388 .saga(|b| {
389 b.add_step(std::any::type_name::<StepB>());
390 b.add_step(std::any::type_name::<StepC>());
391 })
392 .end_workflow()
393 .build("test", 1);
394
395 assert!(def.steps[1].step_type.contains("SagaContainerStep"));
397 assert!(def.steps[1].saga);
398 assert!(!def.steps[1].children.is_empty());
399 }
400
401 #[test]
402 fn compensate_with_sets_compensation_step() {
403 let def = WorkflowBuilder::<TestData>::new()
404 .start_with::<StepA>()
405 .compensate_with::<StepB>()
406 .end_workflow()
407 .build("test", 1);
408
409 assert_eq!(def.steps[0].compensation_step_id, Some(1));
411 assert!(def.steps[1].step_type.contains("StepB"));
412 }
413
414 #[test]
415 fn config_sets_step_config() {
416 let cfg = serde_json::json!({"namespace": "ory", "timeout": 30});
417 let def = WorkflowBuilder::<TestData>::new()
418 .start_with::<StepA>()
419 .config(cfg.clone())
420 .end_workflow()
421 .build("test", 1);
422 assert_eq!(def.steps[0].step_config, Some(cfg));
423 }
424
425 #[test]
426 fn config_chains_with_name() {
427 let cfg = serde_json::json!({"namespace": "data"});
428 let def = WorkflowBuilder::<TestData>::new()
429 .start_with::<StepA>()
430 .name("apply-data")
431 .config(cfg.clone())
432 .then::<StepB>()
433 .end_workflow()
434 .build("test", 1);
435 assert_eq!(def.steps[0].name, Some("apply-data".into()));
436 assert_eq!(def.steps[0].step_config, Some(cfg));
437 assert_eq!(def.steps[0].outcomes[0].next_step, 1);
438 }
439
440 #[test]
441 fn config_on_multiple_steps_of_same_type() {
442 let cfg_a = serde_json::json!({"namespace": "ory"});
443 let cfg_b = serde_json::json!({"namespace": "data"});
444 let def = WorkflowBuilder::<TestData>::new()
445 .start_with::<StepA>()
446 .name("apply-ory")
447 .config(cfg_a.clone())
448 .then::<StepA>()
449 .name("apply-data")
450 .config(cfg_b.clone())
451 .end_workflow()
452 .build("test", 1);
453 assert_eq!(def.steps[0].step_config, Some(cfg_a));
454 assert_eq!(def.steps[1].step_config, Some(cfg_b));
455 assert_eq!(def.steps[0].step_type, def.steps[1].step_type);
457 }
458
459 #[test]
460 fn add_step_typed_sets_name_and_config() {
461 let cfg = serde_json::json!({"namespace": "ory"});
462 let mut builder = WorkflowBuilder::<TestData>::new();
463 let id = builder.add_step_typed::<StepA>("apply-ory", Some(cfg.clone()));
464 assert_eq!(builder.steps[id].name, Some("apply-ory".into()));
465 assert_eq!(builder.steps[id].step_config, Some(cfg));
466 assert!(builder.steps[id].step_type.contains("StepA"));
467 }
468
469 #[test]
470 fn add_step_typed_without_config() {
471 let mut builder = WorkflowBuilder::<TestData>::new();
472 let id = builder.add_step_typed::<StepB>("my-step", None);
473 assert_eq!(builder.steps[id].name, Some("my-step".into()));
474 assert_eq!(builder.steps[id].step_config, None);
475 }
476
477 #[test]
478 fn wire_outcome_connects_steps() {
479 let mut builder = WorkflowBuilder::<TestData>::new();
480 let id0 = builder.add_step_typed::<StepA>("first", None);
481 let id1 = builder.add_step_typed::<StepB>("second", None);
482 builder.wire_outcome(id0, id1, None);
483 assert_eq!(builder.steps[id0].outcomes.len(), 1);
484 assert_eq!(builder.steps[id0].outcomes[0].next_step, id1);
485 }
486
487 #[test]
488 fn inline_step_via_then_fn() {
489 let def = WorkflowBuilder::<TestData>::new()
490 .start_with::<StepA>()
491 .then_fn(ExecutionResult::next)
492 .end_workflow()
493 .build("test", 1);
494
495 assert_eq!(def.steps.len(), 2);
496 assert!(def.steps[1].step_type.contains("InlineStep"));
497 assert_eq!(def.steps[0].outcomes[0].next_step, 1);
498 }
499
500 #[test]
501 fn default_error_behavior_preserved_in_build() {
502 let def = WorkflowBuilder::<TestData>::new()
503 .default_error_behavior(ErrorBehavior::Terminate)
504 .start_with::<StepA>()
505 .end_workflow()
506 .build("test", 1);
507
508 assert_eq!(def.default_error_behavior, ErrorBehavior::Terminate);
509 }
510
511 #[test]
512 fn default_error_behavior_omitted_uses_default() {
513 let def = WorkflowBuilder::<TestData>::new()
514 .start_with::<StepA>()
515 .end_workflow()
516 .build("test", 1);
517
518 assert_eq!(def.default_error_behavior, ErrorBehavior::default());
519 }
520}