1use std::collections::HashMap;
2use std::marker::PhantomData;
3
4use crate::models::{ExecutionResult, StepOutcome, WorkflowDefinition, WorkflowStep};
5use crate::traits::step::{StepBody, WorkflowData};
6
7use super::inline_step::InlineStep;
8use super::step_builder::StepBuilder;
9
10pub type InlineClosureBox = Box<dyn Fn() -> ExecutionResult + Send + Sync>;
12
13pub struct WorkflowBuilder<D: WorkflowData> {
29 pub steps: Vec<WorkflowStep>,
31 pub last_step: Option<usize>,
33 pub(crate) inline_closures: HashMap<usize, InlineClosureBox>,
35 _phantom: PhantomData<D>,
36}
37
38impl<D: WorkflowData> WorkflowBuilder<D> {
39pub fn new() -> Self {
41 Self {
42 steps: Vec::new(),
43 last_step: None,
44 inline_closures: HashMap::new(),
45 _phantom: PhantomData,
46 }
47 }
48
49 pub fn start_with<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
51 let id = self.steps.len();
52 let step = WorkflowStep::new(id, std::any::type_name::<S>());
53 self.steps.push(step);
54 self.last_step = Some(id);
55 StepBuilder::new(self, id)
56 }
57
58 pub fn add_step(&mut self, step_type: &str) -> usize {
60 let id = self.steps.len();
61 self.steps.push(WorkflowStep::new(id, step_type));
62 id
63 }
64
65 pub fn add_step_typed<S: StepBody + Default + 'static>(
68 &mut self,
69 name: &str,
70 config: Option<serde_json::Value>,
71 ) -> usize {
72 let id = self.add_step(std::any::type_name::<S>());
73 self.steps[id].name = Some(name.to_string());
74 if let Some(cfg) = config {
75 self.steps[id].step_config = Some(cfg);
76 }
77 id
78 }
79
80 pub fn wire_outcome(
82 &mut self,
83 from_step: usize,
84 to_step: usize,
85 value: Option<serde_json::Value>,
86 ) {
87 if let Some(step) = self.steps.get_mut(from_step) {
88 step.outcomes.push(StepOutcome {
89 next_step: to_step,
90 label: None,
91 value,
92 });
93 }
94 }
95
96 pub(crate) fn add_child(&mut self, parent: usize, child: usize) {
98 if let Some(step) = self.steps.get_mut(parent) {
99 step.children.push(child);
100 }
101 }
102
103 pub fn build(self, id: impl Into<String>, version: u32) -> WorkflowDefinition {
105 let mut def = WorkflowDefinition::new(id, version);
106 def.steps = self.steps;
107 def
109 }
110
111 pub fn build_with_closures(
114 self,
115 id: impl Into<String>,
116 version: u32,
117 ) -> (WorkflowDefinition, HashMap<usize, InlineClosureBox>) {
118 let mut def = WorkflowDefinition::new(id, version);
119 def.steps = self.steps;
120 (def, self.inline_closures)
121 }
122
123 pub fn register_inline_steps(
128 self,
129 registry: &mut crate::executor::StepRegistry,
130 id: impl Into<String>,
131 version: u32,
132 ) -> WorkflowDefinition {
133 let mut def = WorkflowDefinition::new(id, version);
134 def.steps = self.steps;
135 for (step_id, closure) in self.inline_closures {
136 let closure = std::sync::Arc::new(closure);
137 let key = format!("{}::{step_id}", std::any::type_name::<InlineStep>());
138 if let Some(step) = def.steps.get_mut(step_id) {
140 step.step_type = key.clone();
141 }
142 let closure = closure.clone();
143 registry.register_factory(&key, move || {
144 let c = closure.clone();
145 Box::new(InlineStep::new(move || (c)()))
146 });
147 }
148 def
149 }
150}
151
152impl<D: WorkflowData> Default for WorkflowBuilder<D> {
153 fn default() -> Self {
154 Self::new()
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{ErrorBehavior, ExecutionResult};
    use crate::traits::step::StepExecutionContext;
    use pretty_assertions::assert_eq;
    use serde::{Deserialize, Serialize};

    /// Workflow data type used to instantiate the builder in these tests.
    #[derive(Debug, Clone, Default, Serialize, Deserialize)]
    struct TestData {
        counter: i32,
    }

    /// First of three interchangeable steps that simply continue the workflow.
    #[derive(Default)]
    struct StepA;

    #[async_trait::async_trait]
    impl StepBody for StepA {
        async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
            Ok(ExecutionResult::next())
        }
    }

    /// Second no-op step; only its type name differs from `StepA`.
    #[derive(Default)]
    struct StepB;

    #[async_trait::async_trait]
    impl StepBody for StepB {
        async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
            Ok(ExecutionResult::next())
        }
    }

    /// Third no-op step; only its type name differs from `StepA` and `StepB`.
    #[derive(Default)]
    struct StepC;

    #[async_trait::async_trait]
    impl StepBody for StepC {
        async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
            Ok(ExecutionResult::next())
        }
    }

    /// Every test starts from the same empty builder.
    fn fresh() -> WorkflowBuilder<TestData> {
        WorkflowBuilder::new()
    }

    /// Shorthand for the type name that is passed to `add_step`.
    fn type_of<S>() -> &'static str {
        std::any::type_name::<S>()
    }

    #[test]
    fn build_empty_workflow() {
        let definition = fresh().build("empty", 1);

        assert_eq!(definition.id, "empty");
        assert_eq!(definition.version, 1);
        assert_eq!(definition.steps.len(), 0);
    }

    #[test]
    fn start_with_adds_first_step() {
        let definition = fresh().start_with::<StepA>().end_workflow().build("test", 1);

        assert_eq!(definition.steps.len(), 1);
        let first = &definition.steps[0];
        assert!(first.step_type.contains("StepA"));
    }

    #[test]
    fn then_chains_two_steps_with_outcome() {
        let definition = fresh()
            .start_with::<StepA>()
            .then::<StepB>()
            .end_workflow()
            .build("test", 1);

        assert_eq!(definition.steps.len(), 2);
        let first_outcomes = &definition.steps[0].outcomes;
        assert_eq!(first_outcomes.len(), 1);
        assert_eq!(first_outcomes[0].next_step, 1);
    }

    #[test]
    fn then_chains_three_steps() {
        let definition = fresh()
            .start_with::<StepA>()
            .then::<StepB>()
            .then::<StepC>()
            .end_workflow()
            .build("test", 1);

        assert_eq!(definition.steps.len(), 3);
        // Each step points at its successor; the final step has nowhere to go.
        for (from, to) in [(0, 1), (1, 2)] {
            assert_eq!(definition.steps[from].outcomes[0].next_step, to);
        }
        assert!(definition.steps[2].outcomes.is_empty());
    }

    #[test]
    fn name_sets_step_name() {
        let definition = fresh()
            .start_with::<StepA>()
            .name("First Step")
            .end_workflow()
            .build("test", 1);

        assert_eq!(definition.steps[0].name.as_deref(), Some("First Step"));
    }

    #[test]
    fn on_error_sets_behavior() {
        let definition = fresh()
            .start_with::<StepA>()
            .on_error(ErrorBehavior::Suspend)
            .end_workflow()
            .build("test", 1);

        let first = &definition.steps[0];
        assert_eq!(first.error_behavior, Some(ErrorBehavior::Suspend));
    }

    #[test]
    fn if_do_inserts_container_with_children() {
        let definition = fresh()
            .start_with::<StepA>()
            .if_do::<StepB>(|inner| {
                let child = inner.add_step(type_of::<StepC>());
                inner.last_step = Some(child);
            })
            .end_workflow()
            .build("test", 1);

        assert!(definition.steps.len() >= 3);
        let container = &definition.steps[1];
        assert!(container.step_type.contains("IfStep"));
        assert!(container.children.iter().any(|&child| child == 2));
    }

    #[test]
    fn while_do_inserts_container() {
        let definition = fresh()
            .start_with::<StepA>()
            .while_do::<StepB>(|inner| {
                inner.add_step(type_of::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        assert!(definition.steps.len() >= 3);
        let container = &definition.steps[1];
        assert!(container.step_type.contains("WhileStep"));
    }

    #[test]
    fn for_each_inserts_container() {
        let definition = fresh()
            .start_with::<StepA>()
            .for_each::<StepB>(|inner| {
                inner.add_step(type_of::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        assert!(definition.steps.len() >= 3);
        let container = &definition.steps[1];
        assert!(container.step_type.contains("ForEachStep"));
    }

    #[test]
    fn parallel_creates_branches() {
        let definition = fresh()
            .start_with::<StepA>()
            .parallel(|branches| {
                branches
                    .branch(|inner| {
                        inner.add_step(type_of::<StepB>());
                    })
                    .branch(|inner| {
                        inner.add_step(type_of::<StepC>());
                    })
            })
            .end_workflow()
            .build("test", 1);

        assert!(definition.steps.len() >= 4);
        let container = &definition.steps[1];
        assert!(container.step_type.contains("SequenceStep"));
        assert!(container.children.len() >= 2);
    }

    #[test]
    fn saga_with_compensation() {
        let definition = fresh()
            .start_with::<StepA>()
            .saga(|inner| {
                inner.add_step(type_of::<StepB>());
                inner.add_step(type_of::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        let container = &definition.steps[1];
        assert!(container.step_type.contains("SagaContainerStep"));
        assert!(container.saga);
        assert!(!container.children.is_empty());
    }

    #[test]
    fn compensate_with_sets_compensation_step() {
        let definition = fresh()
            .start_with::<StepA>()
            .compensate_with::<StepB>()
            .end_workflow()
            .build("test", 1);

        assert_eq!(definition.steps[0].compensation_step_id, Some(1));
        assert!(definition.steps[1].step_type.contains("StepB"));
    }

    #[test]
    fn config_sets_step_config() {
        let expected = serde_json::json!({"namespace": "ory", "timeout": 30});

        let definition = fresh()
            .start_with::<StepA>()
            .config(expected.clone())
            .end_workflow()
            .build("test", 1);

        assert_eq!(definition.steps[0].step_config, Some(expected));
    }

    #[test]
    fn config_chains_with_name() {
        let expected = serde_json::json!({"namespace": "data"});

        let definition = fresh()
            .start_with::<StepA>()
            .name("apply-data")
            .config(expected.clone())
            .then::<StepB>()
            .end_workflow()
            .build("test", 1);

        let first = &definition.steps[0];
        assert_eq!(first.name.as_deref(), Some("apply-data"));
        assert_eq!(first.step_config, Some(expected));
        assert_eq!(first.outcomes[0].next_step, 1);
    }

    #[test]
    fn config_on_multiple_steps_of_same_type() {
        let ory = serde_json::json!({"namespace": "ory"});
        let data = serde_json::json!({"namespace": "data"});

        let definition = fresh()
            .start_with::<StepA>()
            .name("apply-ory")
            .config(ory.clone())
            .then::<StepA>()
            .name("apply-data")
            .config(data.clone())
            .end_workflow()
            .build("test", 1);

        let first = &definition.steps[0];
        let second = &definition.steps[1];
        // Same step type twice, yet each instance keeps its own configuration.
        assert_eq!(first.step_config, Some(ory));
        assert_eq!(second.step_config, Some(data));
        assert_eq!(first.step_type, second.step_type);
    }

    #[test]
    fn add_step_typed_sets_name_and_config() {
        let expected = serde_json::json!({"namespace": "ory"});
        let mut builder = fresh();

        let id = builder.add_step_typed::<StepA>("apply-ory", Some(expected.clone()));

        let step = &builder.steps[id];
        assert_eq!(step.name.as_deref(), Some("apply-ory"));
        assert_eq!(step.step_config, Some(expected));
        assert!(step.step_type.contains("StepA"));
    }

    #[test]
    fn add_step_typed_without_config() {
        let mut builder = fresh();

        let id = builder.add_step_typed::<StepB>("my-step", None);

        let step = &builder.steps[id];
        assert_eq!(step.name.as_deref(), Some("my-step"));
        assert!(step.step_config.is_none());
    }

    #[test]
    fn wire_outcome_connects_steps() {
        let mut builder = fresh();
        let source = builder.add_step_typed::<StepA>("first", None);
        let target = builder.add_step_typed::<StepB>("second", None);

        builder.wire_outcome(source, target, None);

        let outcomes = &builder.steps[source].outcomes;
        assert_eq!(outcomes.len(), 1);
        assert_eq!(outcomes[0].next_step, target);
    }

    #[test]
    fn inline_step_via_then_fn() {
        let definition = fresh()
            .start_with::<StepA>()
            .then_fn(ExecutionResult::next)
            .end_workflow()
            .build("test", 1);

        assert_eq!(definition.steps.len(), 2);
        assert!(definition.steps[1].step_type.contains("InlineStep"));
        assert_eq!(definition.steps[0].outcomes[0].next_step, 1);
    }
}