wfe-core 1.0.0

Core traits, models, builder, and executor for the WFE workflow engine
Documentation
use std::collections::HashMap;
use std::marker::PhantomData;

use crate::models::{
    ExecutionResult, StepOutcome, WorkflowDefinition, WorkflowStep,
};
use crate::traits::step::{StepBody, WorkflowData};

use super::inline_step::InlineStep;
use super::step_builder::StepBuilder;

/// Type alias for boxed inline step closures.
pub type InlineClosureBox = Box<dyn Fn() -> ExecutionResult + Send + Sync>;

/// Fluent builder for constructing workflow definitions.
///
/// Uses an owned-self pattern: each method consumes and returns the builder,
/// avoiding lifetime issues with mutable borrows.
///
/// # Example
/// ```ignore
/// let def = WorkflowBuilder::<MyData>::new()
///     .start_with::<StepA>()
///     .name("Step A")
///     .then::<StepB>()
///     .name("Step B")
///     .end_workflow()
///     .build("my-workflow", 1);
/// ```
pub struct WorkflowBuilder<D: WorkflowData> {
    pub(crate) steps: Vec<WorkflowStep>,
    pub(crate) last_step: Option<usize>,
    /// Inline closures keyed by step id, stored for later registration.
    pub(crate) inline_closures: HashMap<usize, InlineClosureBox>,
    _phantom: PhantomData<D>,
}

impl<D: WorkflowData> WorkflowBuilder<D> {
    pub fn new() -> Self {
        Self {
            steps: Vec::new(),
            last_step: None,
            inline_closures: HashMap::new(),
            _phantom: PhantomData,
        }
    }

    /// Add the first step of the workflow.
    pub fn start_with<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
        let id = self.steps.len();
        let step = WorkflowStep::new(id, std::any::type_name::<S>());
        self.steps.push(step);
        self.last_step = Some(id);
        StepBuilder::new(self, id)
    }

    /// Add a step by type name. Used by container builder closures.
    pub fn add_step(&mut self, step_type: &str) -> usize {
        let id = self.steps.len();
        self.steps.push(WorkflowStep::new(id, step_type));
        id
    }

    /// Wire an outcome from `from_step` to `to_step`.
    pub(crate) fn wire_outcome(&mut self, from_step: usize, to_step: usize, value: Option<serde_json::Value>) {
        if let Some(step) = self.steps.get_mut(from_step) {
            step.outcomes.push(StepOutcome {
                next_step: to_step,
                label: None,
                value,
            });
        }
    }

    /// Add a child step ID to a parent container step.
    pub(crate) fn add_child(&mut self, parent: usize, child: usize) {
        if let Some(step) = self.steps.get_mut(parent) {
            step.children.push(child);
        }
    }

    /// Compile the builder into a WorkflowDefinition.
    pub fn build(self, id: impl Into<String>, version: u32) -> WorkflowDefinition {
        let mut def = WorkflowDefinition::new(id, version);
        def.steps = self.steps;
        // Note: inline closures are dropped here. Use `build_with_closures` to retain them.
        def
    }

    /// Compile the builder into a WorkflowDefinition and return any inline closures
    /// keyed by step id.
    pub fn build_with_closures(
        self,
        id: impl Into<String>,
        version: u32,
    ) -> (WorkflowDefinition, HashMap<usize, InlineClosureBox>) {
        let mut def = WorkflowDefinition::new(id, version);
        def.steps = self.steps;
        (def, self.inline_closures)
    }

    /// Register all inline closures from this builder into the given step registry.
    ///
    /// Each inline closure is registered under a unique key derived from the
    /// `InlineStep` type name and step id.
    pub fn register_inline_steps(
        self,
        registry: &mut crate::executor::StepRegistry,
        id: impl Into<String>,
        version: u32,
    ) -> WorkflowDefinition {
        let mut def = WorkflowDefinition::new(id, version);
        def.steps = self.steps;
        for (step_id, closure) in self.inline_closures {
            let closure = std::sync::Arc::new(closure);
            let key = format!("{}::{step_id}", std::any::type_name::<InlineStep>());
            // Update the step_type so the executor resolves correctly.
            if let Some(step) = def.steps.get_mut(step_id) {
                step.step_type = key.clone();
            }
            let closure = closure.clone();
            registry.register_factory(&key, move || {
                let c = closure.clone();
                Box::new(InlineStep::new(move || (c)()))
            });
        }
        def
    }
}

impl<D: WorkflowData> Default for WorkflowBuilder<D> {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{ErrorBehavior, ExecutionResult};
    use crate::traits::step::StepExecutionContext;
    use pretty_assertions::assert_eq;
    use serde::{Deserialize, Serialize};

    #[derive(Debug, Clone, Default, Serialize, Deserialize)]
    struct TestData {
        counter: i32,
    }

    #[derive(Default)]
    struct StepA;

    #[async_trait::async_trait]
    impl StepBody for StepA {
        async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
            Ok(ExecutionResult::next())
        }
    }

    #[derive(Default)]
    struct StepB;

    #[async_trait::async_trait]
    impl StepBody for StepB {
        async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
            Ok(ExecutionResult::next())
        }
    }

    #[derive(Default)]
    struct StepC;

    #[async_trait::async_trait]
    impl StepBody for StepC {
        async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
            Ok(ExecutionResult::next())
        }
    }

    #[test]
    fn build_empty_workflow() {
        let def = WorkflowBuilder::<TestData>::new().build("empty", 1);
        assert_eq!(def.id, "empty");
        assert_eq!(def.version, 1);
        assert!(def.steps.is_empty());
    }

    #[test]
    fn start_with_adds_first_step() {
        let def = WorkflowBuilder::<TestData>::new()
            .start_with::<StepA>()
            .end_workflow()
            .build("test", 1);
        assert_eq!(def.steps.len(), 1);
        assert!(def.steps[0].step_type.contains("StepA"));
    }

    #[test]
    fn then_chains_two_steps_with_outcome() {
        let def = WorkflowBuilder::<TestData>::new()
            .start_with::<StepA>()
            .then::<StepB>()
            .end_workflow()
            .build("test", 1);
        assert_eq!(def.steps.len(), 2);
        // Step 0 should have outcome pointing to step 1
        assert_eq!(def.steps[0].outcomes.len(), 1);
        assert_eq!(def.steps[0].outcomes[0].next_step, 1);
    }

    #[test]
    fn then_chains_three_steps() {
        let def = WorkflowBuilder::<TestData>::new()
            .start_with::<StepA>()
            .then::<StepB>()
            .then::<StepC>()
            .end_workflow()
            .build("test", 1);
        assert_eq!(def.steps.len(), 3);
        assert_eq!(def.steps[0].outcomes[0].next_step, 1);
        assert_eq!(def.steps[1].outcomes[0].next_step, 2);
        assert!(def.steps[2].outcomes.is_empty());
    }

    #[test]
    fn name_sets_step_name() {
        let def = WorkflowBuilder::<TestData>::new()
            .start_with::<StepA>()
            .name("First Step")
            .end_workflow()
            .build("test", 1);
        assert_eq!(def.steps[0].name, Some("First Step".into()));
    }

    #[test]
    fn on_error_sets_behavior() {
        let def = WorkflowBuilder::<TestData>::new()
            .start_with::<StepA>()
            .on_error(ErrorBehavior::Suspend)
            .end_workflow()
            .build("test", 1);
        assert_eq!(def.steps[0].error_behavior, Some(ErrorBehavior::Suspend));
    }

    #[test]
    fn if_do_inserts_container_with_children() {
        let def = WorkflowBuilder::<TestData>::new()
            .start_with::<StepA>()
            .if_do::<StepB>(|b| {
                let id = b.add_step(std::any::type_name::<StepC>());
                b.last_step = Some(id);
            })
            .end_workflow()
            .build("test", 1);

        // Steps: 0=StepA, 1=IfStep, 2=StepC (child)
        // StepA -> IfStep -> (after if)
        assert!(def.steps.len() >= 3);
        // The If step should have StepC as a child
        assert!(def.steps[1].step_type.contains("IfStep"));
        assert!(def.steps[1].children.contains(&2));
    }

    #[test]
    fn while_do_inserts_container() {
        let def = WorkflowBuilder::<TestData>::new()
            .start_with::<StepA>()
            .while_do::<StepB>(|b| {
                b.add_step(std::any::type_name::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        assert!(def.steps.len() >= 3);
        assert!(def.steps[1].step_type.contains("WhileStep"));
    }

    #[test]
    fn for_each_inserts_container() {
        let def = WorkflowBuilder::<TestData>::new()
            .start_with::<StepA>()
            .for_each::<StepB>(|b| {
                b.add_step(std::any::type_name::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        assert!(def.steps.len() >= 3);
        assert!(def.steps[1].step_type.contains("ForEachStep"));
    }

    #[test]
    fn parallel_creates_branches() {
        let def = WorkflowBuilder::<TestData>::new()
            .start_with::<StepA>()
            .parallel(|branches| {
                branches
                    .branch(|b| {
                        b.add_step(std::any::type_name::<StepB>());
                    })
                    .branch(|b| {
                        b.add_step(std::any::type_name::<StepC>());
                    })
            })
            .end_workflow()
            .build("test", 1);

        // Steps: 0=StepA, 1=Sequence(parallel container), 2=StepB, 3=StepC
        assert!(def.steps.len() >= 4);
        assert!(def.steps[1].step_type.contains("SequenceStep"));
        assert!(def.steps[1].children.len() >= 2);
    }

    #[test]
    fn saga_with_compensation() {
        let def = WorkflowBuilder::<TestData>::new()
            .start_with::<StepA>()
            .saga(|b| {
                b.add_step(std::any::type_name::<StepB>());
                b.add_step(std::any::type_name::<StepC>());
            })
            .end_workflow()
            .build("test", 1);

        // Saga container should exist and have children
        assert!(def.steps[1].step_type.contains("SagaContainerStep"));
        assert!(def.steps[1].saga);
        assert!(!def.steps[1].children.is_empty());
    }

    #[test]
    fn compensate_with_sets_compensation_step() {
        let def = WorkflowBuilder::<TestData>::new()
            .start_with::<StepA>()
            .compensate_with::<StepB>()
            .end_workflow()
            .build("test", 1);

        // Step 0 (StepA) should have compensation pointing to step 1 (StepB)
        assert_eq!(def.steps[0].compensation_step_id, Some(1));
        assert!(def.steps[1].step_type.contains("StepB"));
    }

    #[test]
    fn inline_step_via_then_fn() {
        let def = WorkflowBuilder::<TestData>::new()
            .start_with::<StepA>()
            .then_fn(ExecutionResult::next)
            .end_workflow()
            .build("test", 1);

        assert_eq!(def.steps.len(), 2);
        assert!(def.steps[1].step_type.contains("InlineStep"));
        assert_eq!(def.steps[0].outcomes[0].next_step, 1);
    }
}