Skip to main content

erio_workflow/
conditional.rs

1//! Conditional step that executes based on a runtime predicate.
2
3use crate::WorkflowError;
4use crate::context::WorkflowContext;
5use crate::step::{Step, StepOutput};
6
7/// A step that only executes its inner step when a condition is met.
8///
9/// If the condition returns `false`, the step produces a skipped output
10/// and downstream steps still see it as completed.
11pub struct ConditionalStep<S: Step> {
12    id: String,
13    inner: S,
14    condition: Box<dyn Fn(&WorkflowContext) -> bool + Send + Sync>,
15}
16
17impl<S: Step> ConditionalStep<S> {
18    /// Creates a conditional step wrapping `inner` with the given predicate.
19    pub fn new(
20        id: &str,
21        inner: S,
22        condition: impl Fn(&WorkflowContext) -> bool + Send + Sync + 'static,
23    ) -> Self {
24        Self {
25            id: id.into(),
26            inner,
27            condition: Box::new(condition),
28        }
29    }
30}
31
32#[async_trait::async_trait]
33impl<S: Step + 'static> Step for ConditionalStep<S> {
34    fn id(&self) -> &str {
35        &self.id
36    }
37
38    async fn execute(&self, ctx: &mut WorkflowContext) -> Result<StepOutput, WorkflowError> {
39        if (self.condition)(ctx) {
40            self.inner.execute(ctx).await
41        } else {
42            Ok(StepOutput::skipped())
43        }
44    }
45}
46
47#[cfg(test)]
48mod tests {
49    use super::*;
50
51    // === Mock inner step ===
52
53    struct MockInner {
54        id: String,
55        output: String,
56    }
57
58    impl MockInner {
59        fn new(id: &str, output: &str) -> Self {
60            Self {
61                id: id.into(),
62                output: output.into(),
63            }
64        }
65    }
66
67    #[async_trait::async_trait]
68    impl Step for MockInner {
69        fn id(&self) -> &str {
70            &self.id
71        }
72
73        async fn execute(&self, _ctx: &mut WorkflowContext) -> Result<StepOutput, WorkflowError> {
74            Ok(StepOutput::new(&self.output))
75        }
76    }
77
78    // === ConditionalStep Tests ===
79
80    #[tokio::test]
81    async fn executes_inner_when_condition_is_true() {
82        let inner = MockInner::new("check", "executed");
83        let step = ConditionalStep::new("check", inner, |_ctx| true);
84
85        let mut ctx = WorkflowContext::new();
86        let output = step.execute(&mut ctx).await.unwrap();
87
88        assert_eq!(output.value(), "executed");
89        assert!(!output.is_skipped());
90    }
91
92    #[tokio::test]
93    async fn skips_when_condition_is_false() {
94        let inner = MockInner::new("check", "executed");
95        let step = ConditionalStep::new("check", inner, |_ctx| false);
96
97        let mut ctx = WorkflowContext::new();
98        let output = step.execute(&mut ctx).await.unwrap();
99
100        assert!(output.is_skipped());
101        assert_eq!(output.value(), "");
102    }
103
104    #[tokio::test]
105    async fn condition_receives_workflow_context() {
106        let inner = MockInner::new("check", "ran");
107        let step = ConditionalStep::new("check", inner, |ctx| {
108            // Only run if step "gate" produced "open"
109            ctx.output("gate")
110                .is_some_and(|o| o.value() == "open")
111        });
112
113        // Without gate output → skip
114        let mut ctx = WorkflowContext::new();
115        let output = step.execute(&mut ctx).await.unwrap();
116        assert!(output.is_skipped());
117
118        // With gate output "closed" → skip
119        let mut ctx2 = WorkflowContext::new();
120        ctx2.set_output("gate", StepOutput::new("closed"));
121        let output2 = step.execute(&mut ctx2).await.unwrap();
122        assert!(output2.is_skipped());
123
124        // With gate output "open" → execute
125        let mut ctx3 = WorkflowContext::new();
126        ctx3.set_output("gate", StepOutput::new("open"));
127        let output3 = step.execute(&mut ctx3).await.unwrap();
128        assert_eq!(output3.value(), "ran");
129    }
130
131    #[test]
132    fn returns_correct_id() {
133        let inner = MockInner::new("inner_id", "val");
134        let step = ConditionalStep::new("my_cond", inner, |_| true);
135        assert_eq!(step.id(), "my_cond");
136    }
137
138    #[tokio::test]
139    async fn inner_error_propagates() {
140        struct FailInner;
141
142        #[async_trait::async_trait]
143        impl Step for FailInner {
144            #[allow(clippy::unnecessary_literal_bound)]
145            fn id(&self) -> &str {
146                "fail"
147            }
148            async fn execute(
149                &self,
150                _ctx: &mut WorkflowContext,
151            ) -> Result<StepOutput, WorkflowError> {
152                Err(WorkflowError::StepFailed {
153                    step_id: "fail".into(),
154                    message: "boom".into(),
155                })
156            }
157        }
158
159        let step = ConditionalStep::new("cond_fail", FailInner, |_| true);
160        let mut ctx = WorkflowContext::new();
161        let result = step.execute(&mut ctx).await;
162
163        assert!(result.is_err());
164    }
165
166    #[tokio::test]
167    async fn skipped_output_has_metadata_marker() {
168        let output = StepOutput::skipped();
169        assert!(output.is_skipped());
170        assert_eq!(output.value(), "");
171    }
172
173    #[tokio::test]
174    async fn integrates_with_workflow_builder() {
175        use crate::builder::Workflow;
176        use crate::engine::WorkflowEngine;
177
178        struct GateStep;
179
180        #[async_trait::async_trait]
181        impl Step for GateStep {
182            #[allow(clippy::unnecessary_literal_bound)]
183            fn id(&self) -> &str {
184                "gate"
185            }
186            async fn execute(
187                &self,
188                _ctx: &mut WorkflowContext,
189            ) -> Result<StepOutput, WorkflowError> {
190                Ok(StepOutput::new("open"))
191            }
192        }
193
194        let workflow = Workflow::builder()
195            .step(GateStep, &[])
196            .step(
197                ConditionalStep::new("guarded", MockInner::new("guarded", "success"), |ctx| {
198                    ctx.output("gate")
199                        .is_some_and(|o| o.value() == "open")
200                }),
201                &["gate"],
202            )
203            .build()
204            .unwrap();
205
206        let engine = WorkflowEngine::new();
207        let result = engine.run(workflow).await.unwrap();
208
209        assert_eq!(result.output("guarded").unwrap().value(), "success");
210    }
211
212    #[tokio::test]
213    async fn skipped_step_in_workflow_still_completes() {
214        use crate::builder::Workflow;
215        use crate::engine::WorkflowEngine;
216
217        struct GateStep;
218
219        #[async_trait::async_trait]
220        impl Step for GateStep {
221            #[allow(clippy::unnecessary_literal_bound)]
222            fn id(&self) -> &str {
223                "gate"
224            }
225            async fn execute(
226                &self,
227                _ctx: &mut WorkflowContext,
228            ) -> Result<StepOutput, WorkflowError> {
229                Ok(StepOutput::new("closed"))
230            }
231        }
232
233        let workflow = Workflow::builder()
234            .step(GateStep, &[])
235            .step(
236                ConditionalStep::new("guarded", MockInner::new("guarded", "ran"), |ctx| {
237                    ctx.output("gate")
238                        .is_some_and(|o| o.value() == "open")
239                }),
240                &["gate"],
241            )
242            .build()
243            .unwrap();
244
245        let engine = WorkflowEngine::new();
246        let result = engine.run(workflow).await.unwrap();
247
248        // Step is "completed" but output is skipped
249        assert!(result.is_completed("guarded"));
250        assert!(result.output("guarded").unwrap().is_skipped());
251    }
252}