use crate::WorkflowError;
use crate::context::WorkflowContext;
use crate::step::{Step, StepOutput};
pub struct ConditionalStep<S: Step> {
id: String,
inner: S,
condition: Box<dyn Fn(&WorkflowContext) -> bool + Send + Sync>,
}
impl<S: Step> ConditionalStep<S> {
pub fn new(
id: &str,
inner: S,
condition: impl Fn(&WorkflowContext) -> bool + Send + Sync + 'static,
) -> Self {
Self {
id: id.into(),
inner,
condition: Box::new(condition),
}
}
}
#[async_trait::async_trait]
impl<S: Step + 'static> Step for ConditionalStep<S> {
fn id(&self) -> &str {
&self.id
}
async fn execute(&self, ctx: &mut WorkflowContext) -> Result<StepOutput, WorkflowError> {
if (self.condition)(ctx) {
self.inner.execute(ctx).await
} else {
Ok(StepOutput::skipped())
}
}
}
#[cfg(test)]
mod tests {
use super::*;
struct MockInner {
id: String,
output: String,
}
impl MockInner {
fn new(id: &str, output: &str) -> Self {
Self {
id: id.into(),
output: output.into(),
}
}
}
#[async_trait::async_trait]
impl Step for MockInner {
fn id(&self) -> &str {
&self.id
}
async fn execute(&self, _ctx: &mut WorkflowContext) -> Result<StepOutput, WorkflowError> {
Ok(StepOutput::new(&self.output))
}
}
#[tokio::test]
async fn executes_inner_when_condition_is_true() {
let inner = MockInner::new("check", "executed");
let step = ConditionalStep::new("check", inner, |_ctx| true);
let mut ctx = WorkflowContext::new();
let output = step.execute(&mut ctx).await.unwrap();
assert_eq!(output.value(), "executed");
assert!(!output.is_skipped());
}
#[tokio::test]
async fn skips_when_condition_is_false() {
let inner = MockInner::new("check", "executed");
let step = ConditionalStep::new("check", inner, |_ctx| false);
let mut ctx = WorkflowContext::new();
let output = step.execute(&mut ctx).await.unwrap();
assert!(output.is_skipped());
assert_eq!(output.value(), "");
}
#[tokio::test]
async fn condition_receives_workflow_context() {
let inner = MockInner::new("check", "ran");
let step = ConditionalStep::new("check", inner, |ctx| {
ctx.output("gate")
.is_some_and(|o| o.value() == "open")
});
let mut ctx = WorkflowContext::new();
let output = step.execute(&mut ctx).await.unwrap();
assert!(output.is_skipped());
let mut ctx2 = WorkflowContext::new();
ctx2.set_output("gate", StepOutput::new("closed"));
let output2 = step.execute(&mut ctx2).await.unwrap();
assert!(output2.is_skipped());
let mut ctx3 = WorkflowContext::new();
ctx3.set_output("gate", StepOutput::new("open"));
let output3 = step.execute(&mut ctx3).await.unwrap();
assert_eq!(output3.value(), "ran");
}
#[test]
fn returns_correct_id() {
let inner = MockInner::new("inner_id", "val");
let step = ConditionalStep::new("my_cond", inner, |_| true);
assert_eq!(step.id(), "my_cond");
}
#[tokio::test]
async fn inner_error_propagates() {
struct FailInner;
#[async_trait::async_trait]
impl Step for FailInner {
#[allow(clippy::unnecessary_literal_bound)]
fn id(&self) -> &str {
"fail"
}
async fn execute(
&self,
_ctx: &mut WorkflowContext,
) -> Result<StepOutput, WorkflowError> {
Err(WorkflowError::StepFailed {
step_id: "fail".into(),
message: "boom".into(),
})
}
}
let step = ConditionalStep::new("cond_fail", FailInner, |_| true);
let mut ctx = WorkflowContext::new();
let result = step.execute(&mut ctx).await;
assert!(result.is_err());
}
#[tokio::test]
async fn skipped_output_has_metadata_marker() {
let output = StepOutput::skipped();
assert!(output.is_skipped());
assert_eq!(output.value(), "");
}
#[tokio::test]
async fn integrates_with_workflow_builder() {
use crate::builder::Workflow;
use crate::engine::WorkflowEngine;
struct GateStep;
#[async_trait::async_trait]
impl Step for GateStep {
#[allow(clippy::unnecessary_literal_bound)]
fn id(&self) -> &str {
"gate"
}
async fn execute(
&self,
_ctx: &mut WorkflowContext,
) -> Result<StepOutput, WorkflowError> {
Ok(StepOutput::new("open"))
}
}
let workflow = Workflow::builder()
.step(GateStep, &[])
.step(
ConditionalStep::new("guarded", MockInner::new("guarded", "success"), |ctx| {
ctx.output("gate")
.is_some_and(|o| o.value() == "open")
}),
&["gate"],
)
.build()
.unwrap();
let engine = WorkflowEngine::new();
let result = engine.run(workflow).await.unwrap();
assert_eq!(result.output("guarded").unwrap().value(), "success");
}
#[tokio::test]
async fn skipped_step_in_workflow_still_completes() {
use crate::builder::Workflow;
use crate::engine::WorkflowEngine;
struct GateStep;
#[async_trait::async_trait]
impl Step for GateStep {
#[allow(clippy::unnecessary_literal_bound)]
fn id(&self) -> &str {
"gate"
}
async fn execute(
&self,
_ctx: &mut WorkflowContext,
) -> Result<StepOutput, WorkflowError> {
Ok(StepOutput::new("closed"))
}
}
let workflow = Workflow::builder()
.step(GateStep, &[])
.step(
ConditionalStep::new("guarded", MockInner::new("guarded", "ran"), |ctx| {
ctx.output("gate")
.is_some_and(|o| o.value() == "open")
}),
&["gate"],
)
.build()
.unwrap();
let engine = WorkflowEngine::new();
let result = engine.run(workflow).await.unwrap();
assert!(result.is_completed("guarded"));
assert!(result.output("guarded").unwrap().is_skipped());
}
}