erio_workflow/
conditional.rs1use crate::WorkflowError;
4use crate::context::WorkflowContext;
5use crate::step::{Step, StepOutput};
6
7pub struct ConditionalStep<S: Step> {
12 id: String,
13 inner: S,
14 condition: Box<dyn Fn(&WorkflowContext) -> bool + Send + Sync>,
15}
16
17impl<S: Step> ConditionalStep<S> {
18 pub fn new(
20 id: &str,
21 inner: S,
22 condition: impl Fn(&WorkflowContext) -> bool + Send + Sync + 'static,
23 ) -> Self {
24 Self {
25 id: id.into(),
26 inner,
27 condition: Box::new(condition),
28 }
29 }
30}
31
32#[async_trait::async_trait]
33impl<S: Step + 'static> Step for ConditionalStep<S> {
34 fn id(&self) -> &str {
35 &self.id
36 }
37
38 async fn execute(&self, ctx: &mut WorkflowContext) -> Result<StepOutput, WorkflowError> {
39 if (self.condition)(ctx) {
40 self.inner.execute(ctx).await
41 } else {
42 Ok(StepOutput::skipped())
43 }
44 }
45}
46
47#[cfg(test)]
48mod tests {
49 use super::*;
50
51 struct MockInner {
54 id: String,
55 output: String,
56 }
57
58 impl MockInner {
59 fn new(id: &str, output: &str) -> Self {
60 Self {
61 id: id.into(),
62 output: output.into(),
63 }
64 }
65 }
66
67 #[async_trait::async_trait]
68 impl Step for MockInner {
69 fn id(&self) -> &str {
70 &self.id
71 }
72
73 async fn execute(&self, _ctx: &mut WorkflowContext) -> Result<StepOutput, WorkflowError> {
74 Ok(StepOutput::new(&self.output))
75 }
76 }
77
78 #[tokio::test]
81 async fn executes_inner_when_condition_is_true() {
82 let inner = MockInner::new("check", "executed");
83 let step = ConditionalStep::new("check", inner, |_ctx| true);
84
85 let mut ctx = WorkflowContext::new();
86 let output = step.execute(&mut ctx).await.unwrap();
87
88 assert_eq!(output.value(), "executed");
89 assert!(!output.is_skipped());
90 }
91
92 #[tokio::test]
93 async fn skips_when_condition_is_false() {
94 let inner = MockInner::new("check", "executed");
95 let step = ConditionalStep::new("check", inner, |_ctx| false);
96
97 let mut ctx = WorkflowContext::new();
98 let output = step.execute(&mut ctx).await.unwrap();
99
100 assert!(output.is_skipped());
101 assert_eq!(output.value(), "");
102 }
103
104 #[tokio::test]
105 async fn condition_receives_workflow_context() {
106 let inner = MockInner::new("check", "ran");
107 let step = ConditionalStep::new("check", inner, |ctx| {
108 ctx.output("gate")
110 .is_some_and(|o| o.value() == "open")
111 });
112
113 let mut ctx = WorkflowContext::new();
115 let output = step.execute(&mut ctx).await.unwrap();
116 assert!(output.is_skipped());
117
118 let mut ctx2 = WorkflowContext::new();
120 ctx2.set_output("gate", StepOutput::new("closed"));
121 let output2 = step.execute(&mut ctx2).await.unwrap();
122 assert!(output2.is_skipped());
123
124 let mut ctx3 = WorkflowContext::new();
126 ctx3.set_output("gate", StepOutput::new("open"));
127 let output3 = step.execute(&mut ctx3).await.unwrap();
128 assert_eq!(output3.value(), "ran");
129 }
130
131 #[test]
132 fn returns_correct_id() {
133 let inner = MockInner::new("inner_id", "val");
134 let step = ConditionalStep::new("my_cond", inner, |_| true);
135 assert_eq!(step.id(), "my_cond");
136 }
137
138 #[tokio::test]
139 async fn inner_error_propagates() {
140 struct FailInner;
141
142 #[async_trait::async_trait]
143 impl Step for FailInner {
144 #[allow(clippy::unnecessary_literal_bound)]
145 fn id(&self) -> &str {
146 "fail"
147 }
148 async fn execute(
149 &self,
150 _ctx: &mut WorkflowContext,
151 ) -> Result<StepOutput, WorkflowError> {
152 Err(WorkflowError::StepFailed {
153 step_id: "fail".into(),
154 message: "boom".into(),
155 })
156 }
157 }
158
159 let step = ConditionalStep::new("cond_fail", FailInner, |_| true);
160 let mut ctx = WorkflowContext::new();
161 let result = step.execute(&mut ctx).await;
162
163 assert!(result.is_err());
164 }
165
166 #[tokio::test]
167 async fn skipped_output_has_metadata_marker() {
168 let output = StepOutput::skipped();
169 assert!(output.is_skipped());
170 assert_eq!(output.value(), "");
171 }
172
173 #[tokio::test]
174 async fn integrates_with_workflow_builder() {
175 use crate::builder::Workflow;
176 use crate::engine::WorkflowEngine;
177
178 struct GateStep;
179
180 #[async_trait::async_trait]
181 impl Step for GateStep {
182 #[allow(clippy::unnecessary_literal_bound)]
183 fn id(&self) -> &str {
184 "gate"
185 }
186 async fn execute(
187 &self,
188 _ctx: &mut WorkflowContext,
189 ) -> Result<StepOutput, WorkflowError> {
190 Ok(StepOutput::new("open"))
191 }
192 }
193
194 let workflow = Workflow::builder()
195 .step(GateStep, &[])
196 .step(
197 ConditionalStep::new("guarded", MockInner::new("guarded", "success"), |ctx| {
198 ctx.output("gate")
199 .is_some_and(|o| o.value() == "open")
200 }),
201 &["gate"],
202 )
203 .build()
204 .unwrap();
205
206 let engine = WorkflowEngine::new();
207 let result = engine.run(workflow).await.unwrap();
208
209 assert_eq!(result.output("guarded").unwrap().value(), "success");
210 }
211
212 #[tokio::test]
213 async fn skipped_step_in_workflow_still_completes() {
214 use crate::builder::Workflow;
215 use crate::engine::WorkflowEngine;
216
217 struct GateStep;
218
219 #[async_trait::async_trait]
220 impl Step for GateStep {
221 #[allow(clippy::unnecessary_literal_bound)]
222 fn id(&self) -> &str {
223 "gate"
224 }
225 async fn execute(
226 &self,
227 _ctx: &mut WorkflowContext,
228 ) -> Result<StepOutput, WorkflowError> {
229 Ok(StepOutput::new("closed"))
230 }
231 }
232
233 let workflow = Workflow::builder()
234 .step(GateStep, &[])
235 .step(
236 ConditionalStep::new("guarded", MockInner::new("guarded", "ran"), |ctx| {
237 ctx.output("gate")
238 .is_some_and(|o| o.value() == "open")
239 }),
240 &["gate"],
241 )
242 .build()
243 .unwrap();
244
245 let engine = WorkflowEngine::new();
246 let result = engine.run(workflow).await.unwrap();
247
248 assert!(result.is_completed("guarded"));
250 assert!(result.output("guarded").unwrap().is_skipped());
251 }
252}