Crate wfe_core

wfe-core — Core traits, models, builder, executor, and primitives for the WFE persistent workflow engine.

§What is WFE?

WFE (Workflow Engine) is a trait-based, pluggable workflow engine for Rust. It is designed for long-running, persistent workflows that survive process restarts. You define workflows as code using a fluent builder API, and the executor drives them to completion with support for parallel branches, conditional logic, loops, saga compensation, and event-driven pausing.

§Core concepts

Concept               Description
StepBody              The trait you implement to define a unit of work.
WorkflowData          The data type that flows between steps. Must be serializable.
WorkflowBuilder       Fluent API for composing workflow definitions.
StepBuilder           Per-step configuration (name, error handling, compensation).
WorkflowExecutor      Drives execution: acquires locks, runs steps, persists state.
StepExecutionContext  Runtime context passed to each step (data, pointers, tokens).
ExecutionResult       What a step returns to control flow (next, branch, sleep, etc.).
WorkflowDefinition    The compiled, serializable blueprint of a workflow.
WorkflowInstance      A running (or persisted) execution of a definition.
ExecutionPointer      Tracks the position of a single branch of execution.

§Hello workflow

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use wfe_core::builder::WorkflowBuilder;
use wfe_core::models::ExecutionResult;
use wfe_core::traits::step::{StepBody, StepExecutionContext};

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
struct OrderData {
    order_id: String,
    amount: f64,
}

#[derive(Default)]
struct ValidateOrder;

#[async_trait]
impl StepBody for ValidateOrder {
    async fn run(&mut self, ctx: &StepExecutionContext<'_>) -> wfe_core::Result<ExecutionResult> {
        let data: OrderData = ctx.workflow.data()?;
        if data.amount <= 0.0 {
            return Err(wfe_core::WfeError::Execution("amount must be positive".into()));
        }
        Ok(ExecutionResult::next())
    }
}

#[derive(Default)]
struct ProcessPayment;

#[async_trait]
impl StepBody for ProcessPayment {
    async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> wfe_core::Result<ExecutionResult> {
        println!("Processing payment...");
        Ok(ExecutionResult::next())
    }
}

let definition = WorkflowBuilder::<OrderData>::new()
    .start_with::<ValidateOrder>()
        .name("Validate")
    .then::<ProcessPayment>()
        .name("Payment")
    .end_workflow()
    .build("order-pipeline", 1);

§Builder patterns

The builder supports linear chains, containers, and control flow:

  • .then::<S>() — sequential next step
  • .parallel(|b| { b.add_step_typed::<A>("a", None); b.add_step_typed::<B>("b", None); }) — run branches in parallel
  • .if_do(|b| { b.add_step_typed::<Then>("then", None); }) — conditional branch
  • .while_do(|b| { b.add_step_typed::<LoopBody>("loop", None); }) — loop while condition holds
  • .for_each("items", |b| { b.add_step_typed::<ProcessItem>("item", None); }) — iterate over a collection
  • .saga(|b| { b.add_step_typed::<Do>("do", None); }).compensate_with::<Undo>() — saga with compensation
  • .wait_for("event_name", "event_key") — suspend until an external event arrives
  • .delay(Duration::from_secs(30)) — pause for a fixed duration
  • .then_fn(|| { println!("inline"); ExecutionResult::next() }) — inline closure step
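
To make the chaining style concrete, here is a toy model of how a fluent builder can turn a chain of calls into a linked list of steps. It is a sketch using only the standard library, not wfe_core's builder: `ToyBuilder`, `ToyStep`, and their fields are hypothetical names, and the real WorkflowBuilder is generic over step types rather than taking string names.

```rust
/// Hypothetical, simplified step record (not wfe_core's WorkflowDefinition).
#[derive(Debug, PartialEq)]
struct ToyStep {
    id: usize,
    name: String,
    /// Index of the step that runs next, if any.
    next: Option<usize>,
}

/// Hypothetical builder that accumulates steps in call order.
#[derive(Default)]
struct ToyBuilder {
    steps: Vec<ToyStep>,
}

impl ToyBuilder {
    fn new() -> Self {
        Self::default()
    }

    /// Appends a step and links the previously added step to it.
    fn then(mut self, default_name: &str) -> Self {
        let id = self.steps.len();
        if let Some(prev) = self.steps.last_mut() {
            prev.next = Some(id);
        }
        self.steps.push(ToyStep {
            id,
            name: default_name.to_string(),
            next: None,
        });
        self
    }

    /// Renames the most recently added step.
    fn name(mut self, name: &str) -> Self {
        if let Some(last) = self.steps.last_mut() {
            last.name = name.to_string();
        }
        self
    }

    /// Consumes the builder and returns the accumulated steps.
    fn build(self) -> Vec<ToyStep> {
        self.steps
    }
}

fn main() {
    let steps = ToyBuilder::new()
        .then("ValidateOrder")
        .name("Validate")
        .then("ProcessPayment")
        .name("Payment")
        .build();

    for step in &steps {
        println!("{} {:?} -> {:?}", step.id, step.name, step.next);
    }
}
```

Each method takes `self` by value and returns it, which is what allows the calls to be chained without intermediate variables.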

§Execution model

  1. You build a WorkflowDefinition using the builder API.
  2. You register the definition and all step types with a WorkflowHost (from the wfe crate).
  3. The host creates a WorkflowInstance, persists it, and queues it for execution.
  4. The WorkflowExecutor picks up the instance, acquires a distributed lock, and runs each active ExecutionPointer.
  5. After each step, the executor processes the ExecutionResult — branching, suspending, or completing pointers — and persists the new state.
  6. The lock is released, and the instance is re-queued if there is more work to do.

This means workflows are durable: if the process crashes after the third step of a workflow has completed and been persisted, the next executor invocation resumes from the fourth step, because the pointer state was saved.
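
The resume-after-crash behaviour can be illustrated with a small, self-contained model. This is a sketch using only the standard library, not wfe_core's executor: `ToyInstance`, `ToyStore`, and `run_until` are hypothetical names, a `HashMap` stands in for the persistence provider, and locking and queueing are left out. It assumes the pointer position is saved after every step.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a persisted workflow instance.
/// `pointer` is the index of the next step to run.
#[derive(Debug, Clone, PartialEq)]
struct ToyInstance {
    id: String,
    pointer: usize,
    log: Vec<String>,
}

/// Hypothetical stand-in for a persistence provider.
#[derive(Default)]
struct ToyStore {
    saved: HashMap<String, ToyInstance>,
}

impl ToyStore {
    fn persist(&mut self, instance: &ToyInstance) {
        self.saved.insert(instance.id.clone(), instance.clone());
    }

    fn load(&self, id: &str) -> Option<ToyInstance> {
        self.saved.get(id).cloned()
    }
}

/// Runs steps starting at the persisted pointer, saving state after each one.
/// `crash_after` simulates a process crash once that many steps have run in
/// this invocation. Returns true if the workflow ran to completion.
fn run_until(store: &mut ToyStore, id: &str, steps: &[&str], crash_after: Option<usize>) -> bool {
    let mut instance = match store.load(id) {
        Some(existing) => existing,
        None => ToyInstance { id: id.to_string(), pointer: 0, log: Vec::new() },
    };
    let mut ran = 0;
    while instance.pointer < steps.len() {
        if crash_after == Some(ran) {
            // In-memory state is lost here; only what is in `store` survives.
            return false;
        }
        instance.log.push(steps[instance.pointer].to_string());
        instance.pointer += 1;
        store.persist(&instance); // save the new pointer position
        ran += 1;
    }
    true
}

fn main() {
    let steps = ["Validate", "Payment", "Ship", "Notify"];
    let mut store = ToyStore::default();

    // First invocation "crashes" after three steps have been persisted.
    let done = run_until(&mut store, "order-1", &steps, Some(3));
    println!("first run completed: {done}");

    // Second invocation resumes from the persisted pointer (the fourth step).
    let done = run_until(&mut store, "order-1", &steps, None);
    let instance = store.load("order-1").unwrap();
    println!("second run completed: {done}, log: {:?}", instance.log);
}
```

Because the second invocation starts from the stored pointer rather than from zero, no step appears twice in the log.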

§Primitives

Built-in control-flow steps live in primitives:

Primitive          Purpose
IfStep             Conditional branching with then/else children
WhileStep          Loop while a condition evaluates to true
ForEachStep        Iterate over a JSON array in workflow data
SequenceStep       Parallel branch container (all children run concurrently)
DecideStep         Multi-way branch (switch/case style)
DelayStep          Pause execution for a duration
ScheduleStep       Resume at a specific wall-clock time
WaitForStep        Suspend until an external event is published
SagaContainerStep  Transaction-like container with compensation on failure
RecurStep          Recurring/periodic execution
PollEndpointStep   Poll an HTTP endpoint until a condition is met
SubWorkflowStep    Start a child workflow and wait for it to complete
EndStep            Explicit workflow termination

§Error handling

Steps return wfe_core::Result<ExecutionResult>. When a step returns an error, the executor checks the step’s ErrorBehavior:

  • Retry { interval, max_retries } — retry with backoff
  • Suspend — pause the workflow for manual intervention
  • CompensateThenRetry — run the compensation step, then retry
  • CompensateThenSuspend — run compensation, then suspend
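
The decision these behaviors imply can be sketched as a plain `match`. The `ErrorBehavior` enum below is a local mirror of the variants listed above, not the type exported by wfe_core, whose fields may differ. `Action`, `on_failure`, and `attempts_so_far` are hypothetical names. The sketch also assumes a fixed retry interval and assumes that exhausted retries end in a terminal `Exhausted` action; neither is specified here.

```rust
use std::time::Duration;

/// Local mirror of the variants listed above (an assumption, not the real type).
#[derive(Debug, Clone, PartialEq)]
enum ErrorBehavior {
    Retry { interval: Duration, max_retries: u32 },
    Suspend,
    CompensateThenRetry,
    CompensateThenSuspend,
}

/// Hypothetical description of what an executor might do next.
#[derive(Debug, PartialEq)]
enum Action {
    RetryAfter(Duration),
    Suspend,
    Compensate { then_retry: bool },
    /// Assumed outcome once `max_retries` is used up.
    Exhausted,
}

/// Maps a failed step's configured behavior to the next action.
/// `attempts_so_far` counts retries that have already been made.
fn on_failure(behavior: &ErrorBehavior, attempts_so_far: u32) -> Action {
    match behavior {
        ErrorBehavior::Retry { interval, max_retries } => {
            if attempts_so_far < *max_retries {
                Action::RetryAfter(*interval)
            } else {
                Action::Exhausted
            }
        }
        ErrorBehavior::Suspend => Action::Suspend,
        ErrorBehavior::CompensateThenRetry => Action::Compensate { then_retry: true },
        ErrorBehavior::CompensateThenSuspend => Action::Compensate { then_retry: false },
    }
}

fn main() {
    let retry = ErrorBehavior::Retry {
        interval: Duration::from_secs(5),
        max_retries: 2,
    };
    for attempt in 0..3 {
        println!("attempt {attempt}: {:?}", on_failure(&retry, attempt));
    }
    println!("{:?}", on_failure(&ErrorBehavior::CompensateThenSuspend, 0));
}
```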

§Feature flags

Feature       Description
test-support  In-memory persistence, lock, and queue providers for unit testing
otel          OpenTelemetry tracing integration

§Testing

cargo test -p wfe-core

No external dependencies required.

§Re-exports

pub use artifact_volume::ArtifactVolume;
pub use artifact_volume::ArtifactVolumePackage;
pub use error::Result;
pub use error::WfeError;
pub use local_artifact_store::LocalArtifactStore;
pub use local_artifact_store::extract_artifact_to_dir;

§Modules

artifact_volume
Artifact volume abstraction for distributed workflow execution.
builder
Fluent builder API for composing workflow definitions.
error
Error types and the Result alias used throughout WFE.
executor
The workflow executor and supporting infrastructure.
local_artifact_store
Local filesystem-based artifact store using the OCI Image Layout.
models
Core data models for workflows, instances, execution pointers, events, and execution results.
primitives
Built-in control-flow primitives for workflow definitions (if, while, foreach, saga, etc.).
traits
Core traits that define the plugin architecture.