wfaas
A high-performance, async Rust workflow engine for orchestrating multi-step operations with full DAG (Directed Acyclic Graph) support.
Features
- Parallel Execution - Independent steps run concurrently via DAG-based scheduling
- Flexible Dependencies - Support for depends_on (all) and depends_on_any (at least one) semantics
- Configurable Retry Logic - Fixed, exponential, or linear backoff strategies per step
- Conditional Branching - Runtime conditions to skip or execute steps
- Scheduled Execution - Delay steps or schedule them for specific times
- State Persistence - Pluggable state stores for durability and recovery
- Event-Driven Observability - Subscribe to workflow and step lifecycle events
- Graceful Shutdown - Clean shutdown with timeout and force-cancel options
Installation
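Add wfaas to your Cargo.toml. The examples below also use async-trait, serde, and tokio, and the scheduling example uses chrono:
```toml
[dependencies]
wfaas = "1.0"
async-trait = "0.1"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
chrono = "0.4"
```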
Quick Start
```rust
use wfaas::{
    WorkflowEngine, WorkflowDefinition, StepDefinition, StepExecutor, StepResult,
    WorkflowContext, WorkflowData, WorkflowId, WorkflowResult,
};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;

// Data carried through the workflow; it must be serializable for persistence.
#[derive(Clone, Serialize, Deserialize, Default)]
struct MyWorkflowData {
    input: String,
    result: Option<String>,
}

impl WorkflowData for MyWorkflowData {
    fn workflow_type() -> &'static str {
        "my_workflow"
    }
}

// A single step that writes its output back into the shared data.
struct ProcessStep;

#[async_trait]
impl StepExecutor<MyWorkflowData> for ProcessStep {
    async fn execute(&self, ctx: &mut WorkflowContext<MyWorkflowData>) -> WorkflowResult<StepResult> {
        ctx.data.result = Some(format!("Processed: {}", ctx.data.input));
        Ok(StepResult::Success)
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let engine = WorkflowEngine::new();

    // Register a one-step workflow definition under the id "process_workflow".
    let workflow = WorkflowDefinition::new("process_workflow", "Process Workflow")
        .add_step(StepDefinition::new("process", "Process Input", Arc::new(ProcessStep)));
    engine.register_workflow(workflow)?;

    // Start an instance with its initial data.
    let instance_id = engine
        .start_workflow(
            WorkflowId::new("process_workflow"),
            MyWorkflowData {
                input: "Hello, World!".into(),
                result: None,
            },
        )
        .await?;

    // Wait up to 30 seconds for the instance to finish.
    let msg = engine
        .wait_for_completion(instance_id, "example", Duration::from_secs(30))
        .await?;
    println!("{}", msg);
    Ok(())
}
```
Core Concepts
WorkflowData
Your workflow data type must implement WorkflowData:
```rust
pub trait WorkflowData: Serialize + DeserializeOwned + Send + Sync + Clone + 'static {
    fn workflow_type() -> &'static str;
}
```
The serde bounds let the engine serialize workflow state for persistence and recovery. workflow_type() returns a static name for the data type.
Handling Non-Serializable Fields
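For fields that cannot be serialized (such as database connections or application context), use #[serde(skip)]:
```rust
use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Clone, Serialize, Deserialize)]
struct MyWorkflowData {
    pub config: MyConfig,
    pub result: Option<String>,
    // Not persisted; deserializes to None.
    #[serde(skip, default)]
    pub app_context: Option<Arc<AppContext>>,
}
```
Skipped fields are not persisted. When state is deserialized (for example, during recovery), they hold their Default value (None here), so re-attach them before a step uses them.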
StepExecutor
Each step requires an executor that implements StepExecutor<D>:
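```rust
#[async_trait]
pub trait StepExecutor<D: WorkflowData>: Send + Sync {
    async fn execute(&self, context: &mut WorkflowContext<D>) -> WorkflowResult<StepResult>;
    fn is_retryable(&self, _error: &WorkflowError) -> bool { true }
    async fn on_success(&self, _context: &WorkflowContext<D>) -> WorkflowResult<()> { Ok(()) }
    async fn on_failure(&self, _context: &WorkflowContext<D>, _error: &WorkflowError) -> WorkflowResult<()> { Ok(()) }
}
```
Only execute is required. By default, is_retryable treats every error as retryable, and on_success and on_failure do nothing. Override them to classify errors or to add hooks that run after a step succeeds or fails.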
FunctionStep
For simple steps, use FunctionStep instead of implementing a struct:
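```rust
use wfaas::{FunctionStep, StepResult};

// The closure receives the workflow context and returns a boxed future.
let step = FunctionStep::new(|ctx| {
    Box::pin(async move {
        ctx.data.result = Some("done".into());
        Ok(StepResult::Success)
    })
});
```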
Step Results
```rust
pub enum StepResult {
    Success,
    Failure,
    Skip,
}
```
Building DAG Workflows
Dependencies
Steps can declare dependencies on other steps:
```rust
let workflow = WorkflowDefinition::new("dag_example", "DAG Example")
    .add_step(StepDefinition::new("fetch_a", "Fetch A", Arc::new(FetchA)))
    .add_step(StepDefinition::new("fetch_b", "Fetch B", Arc::new(FetchB)))
    .add_step(
        StepDefinition::new("combine", "Combine Results", Arc::new(Combine))
            .depends_on(&["fetch_a", "fetch_b"]),
    );
```
Execution flow:
```
fetch_a ──┐
          ├──> combine
fetch_b ──┘
```
fetch_a and fetch_b have no dependencies, so they run concurrently. combine starts only after both have completed.
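The scheduling idea behind the diagram can be sketched in a few lines of std-only Rust. Steps whose dependencies have all finished form a "wave" that can run concurrently, and each finished wave unlocks the next. Step and execution_waves are hypothetical names for illustration, not the wfaas scheduler:

```rust
// Hypothetical, std-only sketch of DAG wave scheduling; not the wfaas API.
use std::collections::HashSet;

/// A hypothetical step: an id plus the ids it depends on (all must finish).
struct Step {
    id: &'static str,
    depends_on: Vec<&'static str>,
}

/// Groups steps into waves. Steps in the same wave do not depend on each
/// other and could run concurrently. Returns None if no step can make
/// progress (a cycle or a dependency on an unknown step).
fn execution_waves(steps: &[Step]) -> Option<Vec<Vec<&'static str>>> {
    let mut done: HashSet<&'static str> = HashSet::new();
    let mut waves = Vec::new();
    while done.len() < steps.len() {
        // Every not-yet-run step whose dependencies have all completed.
        let ready: Vec<&'static str> = steps
            .iter()
            .filter(|s| !done.contains(s.id))
            .filter(|s| s.depends_on.iter().all(|d| done.contains(d)))
            .map(|s| s.id)
            .collect();
        if ready.is_empty() {
            return None; // cycle or dangling dependency
        }
        done.extend(ready.iter().copied());
        waves.push(ready);
    }
    Some(waves)
}
```

For the dag_example workflow this yields two waves: fetch_a and fetch_b together, then combine. A real engine starts each step as soon as its own dependencies are met instead of waiting for a whole wave. The wave view is just an easy way to see what can overlap.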
depends_on vs depends_on_any
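- depends_on(&["a", "b"]) - Waits for ALL listed steps to complete
- depends_on_any(&["a", "b"]) - Waits for AT LEAST ONE listed step to complete

The two can be combined. This step runs once step_a has completed and at least one of step_b or step_c has completed:
```rust
.add_step(
    StepDefinition::new("final", "Final Step", Arc::new(FinalStep))
        .depends_on(&["step_a"])
        .depends_on_any(&["step_b", "step_c"]),
)
```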
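The readiness rule for the combined example can be written as one predicate. This std-only sketch treats "complete" as membership in a set of finished step ids. Deps and is_ready are hypothetical names, not part of wfaas:

```rust
// Hypothetical, std-only sketch of depends_on / depends_on_any readiness.
use std::collections::HashSet;

/// Every id in `all` must be complete; if `any` is non-empty,
/// at least one id in it must also be complete.
struct Deps<'a> {
    all: &'a [&'a str],
    any: &'a [&'a str],
}

fn is_ready(deps: &Deps<'_>, completed: &HashSet<&str>) -> bool {
    let all_ok = deps.all.iter().all(|id| completed.contains(id));
    // An empty `any` list imposes no extra condition.
    let any_ok = deps.any.is_empty() || deps.any.iter().any(|id| completed.contains(id));
    all_ok && any_ok
}
```

With only step_b finished, "final" is not ready because step_a is still pending. Once step_a also finishes, it is ready, and step_c no longer matters.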
Retry Configuration
Configure retry behavior per step or set workflow defaults:
```rust
use std::time::Duration;
use wfaas::{RetryPolicy, BackoffStrategy};

// Per-step retry policy and timeout
.add_step(
    StepDefinition::new("unreliable", "Unreliable Step", Arc::new(UnreliableStep))
        .with_retry(RetryPolicy {
            max_attempts: 5,
            backoff: BackoffStrategy::Exponential {
                base: Duration::from_millis(100),
                max: Duration::from_secs(30),
            },
        })
        .with_timeout(Duration::from_secs(60)),
)

// Workflow-wide defaults for steps that do not set their own
let workflow = WorkflowDefinition::new("retry_example", "Retry Example")
    .with_default_retry(RetryPolicy {
        max_attempts: 3,
        backoff: BackoffStrategy::Fixed(Duration::from_secs(1)),
    })
    .with_default_timeout(Duration::from_secs(30));
```
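The sketch below shows one plausible way max_attempts and StepExecutor::is_retryable fit together: retry only while the error is retryable and attempts remain. It assumes max_attempts counts the first attempt. StepError and run_with_retry are hypothetical std-only stand-ins, not wfaas's retry loop:

```rust
// Hypothetical, std-only sketch of a retry loop; not the wfaas API.
/// `retryable` plays the role of `StepExecutor::is_retryable`.
#[derive(Debug, PartialEq)]
struct StepError {
    message: String,
    retryable: bool,
}

/// Calls `attempt` until it succeeds, returns a non-retryable error, or
/// `max_attempts` total attempts (assumed to include the first) are used.
/// Returns the final result and the number of attempts made.
fn run_with_retry<T, F>(max_attempts: u32, mut attempt: F) -> (Result<T, StepError>, u32)
where
    F: FnMut(u32) -> Result<T, StepError>,
{
    let mut n = 1;
    loop {
        match attempt(n) {
            Ok(v) => return (Ok(v), n),
            Err(e) if e.retryable && n < max_attempts => {
                // A real engine would wait for the backoff delay here.
                n += 1;
            }
            Err(e) => return (Err(e), n),
        }
    }
}
```

A non-retryable error ends the loop at once, regardless of how many attempts remain.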
Backoff Strategies
```rust
pub enum BackoffStrategy {
    Fixed(Duration),
    Exponential { base: Duration, max: Duration },
    Linear { increment: Duration, max: Duration },
}
```
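To make the three strategies concrete, here is a std-only sketch of how the delay before the n-th retry could be computed. The formulas are assumptions for illustration and are not taken from wfaas: exponential doubles from base, linear grows by increment per retry, and both are capped at max. delay_for is a hypothetical helper:

```rust
// Hypothetical, std-only sketch of backoff delays; formulas are assumed.
use std::time::Duration;

enum BackoffStrategy {
    Fixed(Duration),
    Exponential { base: Duration, max: Duration },
    Linear { increment: Duration, max: Duration },
}

/// Delay before retry number `retry` (1 = first retry).
fn delay_for(strategy: &BackoffStrategy, retry: u32) -> Duration {
    match strategy {
        BackoffStrategy::Fixed(d) => *d,
        // base * 2^(retry - 1), saturating instead of overflowing, capped at max.
        BackoffStrategy::Exponential { base, max } => {
            let factor = 2u32.saturating_pow(retry.saturating_sub(1));
            base.saturating_mul(factor).min(*max)
        }
        // increment * retry, capped at max.
        BackoffStrategy::Linear { increment, max } => increment.saturating_mul(retry).min(*max),
    }
}
```

Under these assumptions, the Exponential { base: 100ms, max: 30s } policy from the retry example waits 100ms, 200ms, 400ms, 800ms, and so on, and never waits longer than 30 seconds.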
Failure Handling
Control what happens when a step fails:
```rust
use wfaas::FailureAction;

.add_step(
    StepDefinition::new("optional", "Optional Step", Arc::new(OptionalStep))
        .with_failure_action(FailureAction::ContinueNextStep),
)
```
Options:
- FailureAction::FailWorkflow - The entire workflow fails (default)
- FailureAction::ContinueNextStep - The failed step is skipped and the workflow continues
- FailureAction::RetryIndefinitely - The step keeps retrying until manual intervention
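The three options can be read as a decision a scheduler makes when a step attempt fails. This std-only sketch assumes the retry policy is consulted first and the failure action applies once retries run out. Next and on_step_failure are hypothetical names, not wfaas internals:

```rust
// Hypothetical, std-only sketch of applying a failure action; not the wfaas API.
#[derive(Clone, Copy)]
enum FailureAction {
    FailWorkflow,
    ContinueNextStep,
    RetryIndefinitely,
}

/// What a scheduler might do next after a failed step attempt.
#[derive(Debug, PartialEq)]
enum Next {
    RetryStep,
    FailWorkflow,
    MarkFailedAndContinue,
}

fn on_step_failure(action: FailureAction, retries_left: bool) -> Next {
    if retries_left {
        return Next::RetryStep; // the retry policy is assumed to take precedence
    }
    match action {
        FailureAction::FailWorkflow => Next::FailWorkflow,
        FailureAction::ContinueNextStep => Next::MarkFailedAndContinue,
        // Ignores the attempt limit; presumably stopped by cancelling the workflow.
        FailureAction::RetryIndefinitely => Next::RetryStep,
    }
}
```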
Conditional Execution
Skip steps based on runtime conditions:
```rust
.add_step(
    StepDefinition::new("conditional", "Conditional Step", Arc::new(ConditionalStep))
        .run_if(|ctx| ctx.data.should_execute),
)
```
When a condition returns false, the step is skipped but dependents can still proceed.
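A condition is a predicate over the workflow data, evaluated before the step body runs. This std-only sketch shows that check and returns Skipped without touching the body when the predicate is false. Data, Outcome, and run_step are hypothetical. The point for dependents is that Skipped, like Ran, marks the step as finished, so dependents are not blocked:

```rust
// Hypothetical, std-only sketch of a run_if check; not the wfaas API.
/// Workflow data with the flag used in the run_if example.
struct Data {
    should_execute: bool,
}

#[derive(Debug, PartialEq)]
enum Outcome {
    Ran,
    Skipped,
}

/// Runs `body` only if `condition` is absent or returns true.
fn run_step(data: &Data, condition: Option<&dyn Fn(&Data) -> bool>, body: impl FnOnce()) -> Outcome {
    match condition {
        Some(cond) if !cond(data) => Outcome::Skipped,
        _ => {
            body();
            Outcome::Ran
        }
    }
}
```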
Scheduled and Delayed Execution
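with_delay takes a relative std::time::Duration. scheduled_at takes an absolute point in time, a chrono DateTime<Utc>:
```rust
use chrono::{Utc, Duration as ChronoDuration};
use std::time::Duration;

// Delay this step by 60 seconds
.add_step(
    StepDefinition::new("delayed", "Delayed Step", Arc::new(DelayedStep))
        .with_delay(Duration::from_secs(60)),
)
// Run this step at a specific time, one hour from now
.add_step(
    StepDefinition::new("scheduled", "Scheduled Step", Arc::new(ScheduledStep))
        .scheduled_at(Utc::now() + ChronoDuration::hours(1)),
)
```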
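The difference between a relative delay and an absolute schedule shows up when computing how long to wait. This std-only sketch uses SystemTime in place of chrono. It assumes the wait is measured from the moment the step becomes ready, and that a target time already in the past means "start now". Schedule and wait_before_start are hypothetical names:

```rust
// Hypothetical, std-only sketch of delay vs. scheduled start; not the wfaas API.
use std::time::{Duration, SystemTime};

/// Relative delay (like with_delay) or absolute time (like scheduled_at).
enum Schedule {
    Delay(Duration),
    At(SystemTime),
}

/// How long to wait after `ready_at` before the step may start.
fn wait_before_start(schedule: &Schedule, ready_at: SystemTime) -> Duration {
    match schedule {
        Schedule::Delay(d) => *d,
        // duration_since errors if `t` is before `ready_at`; treat that as "no wait".
        Schedule::At(t) => t.duration_since(ready_at).unwrap_or(Duration::ZERO),
    }
}
```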
Event Observability
Subscribe to workflow events for monitoring and logging:
```rust
use wfaas::{EventSubscriber, WorkflowEvent};
use async_trait::async_trait;
use std::sync::Arc;

struct MySubscriber;

#[async_trait]
impl EventSubscriber for MySubscriber {
    async fn on_event(&self, event: &WorkflowEvent) {
        match event {
            WorkflowEvent::WorkflowStarted { instance_id, .. } => {
                println!("Workflow {} started", instance_id);
            }
            WorkflowEvent::StepSucceeded { step_id, duration, .. } => {
                println!("Step {} completed in {:?}", step_id, duration);
            }
            WorkflowEvent::WorkflowFailed { instance_id, error, .. } => {
                eprintln!("Workflow {} failed: {}", instance_id, error);
            }
            _ => {}
        }
    }
}

engine.event_bus().subscribe(Arc::new(MySubscriber)).await;
```
Available events:
- Workflow lifecycle: WorkflowStarted, WorkflowCompleted, WorkflowFailed, WorkflowCancelled
- Step lifecycle: StepStarted, StepSucceeded, StepFailed, StepRetrying
A built-in LoggingSubscriber integrates with the tracing crate.
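Subscribers registered on the event bus each receive every published event (fan-out). The wfaas bus and subscriber trait are async. This synchronous, std-only sketch shows only the fan-out pattern, using a trimmed-down event enum. EventBus, Recorder, and the variant fields are hypothetical simplifications:

```rust
// Hypothetical, synchronous sketch of an event bus; not the wfaas API.
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// A few of the listed events, with simplified fields.
#[derive(Debug, Clone)]
enum WorkflowEvent {
    WorkflowStarted { instance_id: u64 },
    StepSucceeded { step_id: String, duration: Duration },
    WorkflowFailed { instance_id: u64, error: String },
}

trait EventSubscriber: Send + Sync {
    fn on_event(&self, event: &WorkflowEvent);
}

/// Delivers every published event to every subscriber.
#[derive(Default)]
struct EventBus {
    subscribers: Mutex<Vec<Arc<dyn EventSubscriber>>>,
}

impl EventBus {
    fn subscribe(&self, s: Arc<dyn EventSubscriber>) {
        self.subscribers.lock().unwrap().push(s);
    }

    fn publish(&self, event: &WorkflowEvent) {
        // Snapshot the list so the lock is not held while subscribers run.
        let subs = self.subscribers.lock().unwrap().clone();
        for s in subs {
            s.on_event(event);
        }
    }
}

/// Example subscriber that records one line per event.
#[derive(Default)]
struct Recorder {
    lines: Mutex<Vec<String>>,
}

impl EventSubscriber for Recorder {
    fn on_event(&self, event: &WorkflowEvent) {
        let line = match event {
            WorkflowEvent::WorkflowStarted { instance_id } => format!("started {instance_id}"),
            WorkflowEvent::StepSucceeded { step_id, duration } => format!("step {step_id} took {duration:?}"),
            WorkflowEvent::WorkflowFailed { instance_id, error } => format!("failed {instance_id}: {error}"),
        };
        self.lines.lock().unwrap().push(line);
    }
}
```

Taking a snapshot of the subscriber list before delivering means a subscriber that calls subscribe from inside on_event cannot deadlock on the bus's lock.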
State Persistence
Implement StateStore to keep workflow state in durable storage such as a database:
```rust
use async_trait::async_trait;
use std::time::Duration;
// Assumes these types are exported from the crate root, as in Quick Start.
use wfaas::{StateStore, WorkflowContext, WorkflowData, WorkflowEngine,
    WorkflowInstanceId, WorkflowResult, WorkflowState};

struct MyDatabaseStore; // would typically hold a connection pool
impl MyDatabaseStore {
    fn new() -> Self { MyDatabaseStore }
}

#[async_trait]
impl<D: WorkflowData> StateStore<D> for MyDatabaseStore {
    // Replace each todo!() with calls to your storage backend.
    async fn save(&self, state: WorkflowState<D>) -> WorkflowResult<()> { todo!() }
    async fn load(&self, instance_id: WorkflowInstanceId) -> WorkflowResult<WorkflowState<D>> { todo!() }
    async fn update<F>(&self, instance_id: WorkflowInstanceId, f: F) -> WorkflowResult<()>
    where F: FnOnce(&mut WorkflowState<D>) + Send { todo!() }
    async fn delete(&self, instance_id: WorkflowInstanceId) -> WorkflowResult<()> { todo!() }
    async fn list_active(&self) -> WorkflowResult<Vec<WorkflowState<D>>> { todo!() }
    async fn list_all(&self) -> WorkflowResult<Vec<WorkflowState<D>>> { todo!() }
    async fn is_cancelled(&self, instance_id: WorkflowInstanceId) -> WorkflowResult<bool> { todo!() }
    async fn cleanup_old_workflows(&self, ttl: Duration) -> usize { todo!() }
    async fn get_context(&self, instance_id: WorkflowInstanceId) -> WorkflowResult<WorkflowContext<D>> { todo!() }
    async fn cleanup_if_terminal(&self, instance_id: WorkflowInstanceId) -> bool { todo!() }
}

let engine = WorkflowEngine::with_store(MyDatabaseStore::new());
```
The default InMemoryStore is thread-safe and suitable for development or ephemeral workflows. Its state does not survive a process restart.
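For a sense of what a thread-safe in-memory store involves, here is a std-only sketch: a Mutex around a HashMap keyed by instance id. update applies the caller's closure while holding the lock, so a read-modify-write cannot interleave with another writer. State, MemoryStore, and the u64 ids are hypothetical simplifications. This is not wfaas's InMemoryStore:

```rust
// Hypothetical, std-only sketch of an in-memory state store; not the wfaas API.
use std::collections::HashMap;
use std::sync::Mutex;

/// Trimmed-down stand-in for a workflow state record.
#[derive(Clone, Debug, PartialEq)]
struct State {
    status: &'static str,
    data: String,
}

#[derive(Default)]
struct MemoryStore {
    states: Mutex<HashMap<u64, State>>,
}

impl MemoryStore {
    fn save(&self, id: u64, state: State) {
        self.states.lock().unwrap().insert(id, state);
    }

    fn load(&self, id: u64) -> Option<State> {
        self.states.lock().unwrap().get(&id).cloned()
    }

    /// Applies `f` under the lock; returns false if `id` is unknown.
    fn update<F: FnOnce(&mut State)>(&self, id: u64, f: F) -> bool {
        match self.states.lock().unwrap().get_mut(&id) {
            Some(state) => {
                f(state);
                true
            }
            None => false,
        }
    }

    fn delete(&self, id: u64) -> bool {
        self.states.lock().unwrap().remove(&id).is_some()
    }
}
```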
Workflow Management
Cancellation
```rust
engine.cancel_workflow(instance_id).await?;
```
Status Queries
```rust
let state = engine.get_status(instance_id).await?;
println!("Status: {:?}", state.status);
println!("Step states: {:?}", state.step_states);
```
Graceful Shutdown
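shutdown() stops the engine from accepting new workflows (start_workflow then returns WorkflowError::ShuttingDown). wait_for_shutdown waits up to the given timeout for running workflows to finish and returns false if some are still running. force_cancel_all then cancels the remaining workflows and returns how many it cancelled.
```rust
engine.shutdown();
if !engine.wait_for_shutdown(Duration::from_secs(30)).await {
    // Workflows were still running after 30 seconds: cancel them.
    let cancelled = engine.force_cancel_all().await;
    println!("Force cancelled {} workflows", cancelled);
}
```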
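The "stop accepting, then wait with a timeout" pattern can be sketched with a Mutex and a Condvar from the standard library. ShutdownTracker and its methods are hypothetical names that mirror the calls above. This is not the wfaas implementation:

```rust
// Hypothetical, std-only sketch of graceful shutdown; not the wfaas API.
use std::sync::{Condvar, Mutex};
use std::time::Duration;

#[derive(Default)]
struct ShutdownTracker {
    state: Mutex<(bool, usize)>, // (shutting_down, active workflows)
    idle: Condvar,
}

impl ShutdownTracker {
    /// Rejects new work once shutdown has begun.
    fn try_start(&self) -> Result<(), &'static str> {
        let mut s = self.state.lock().unwrap();
        if s.0 {
            return Err("shutting down");
        }
        s.1 += 1;
        Ok(())
    }

    /// Call once per successful try_start when that workflow ends.
    fn finish(&self) {
        let mut s = self.state.lock().unwrap();
        s.1 -= 1;
        if s.1 == 0 {
            self.idle.notify_all();
        }
    }

    fn shutdown(&self) {
        self.state.lock().unwrap().0 = true;
    }

    /// Returns true if every active workflow finished within `timeout`.
    fn wait_for_shutdown(&self, timeout: Duration) -> bool {
        let guard = self.state.lock().unwrap();
        let (guard, _) = self
            .idle
            .wait_timeout_while(guard, timeout, |s| s.1 > 0)
            .unwrap();
        guard.1 == 0
    }
}
```

wait_timeout_while rechecks the condition after every wakeup, so spurious wakeups cannot make the wait return early.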
Automatic Cleanup
```rust
let cleanup_handle = engine
    .start_cleanup_task(
        Some(Duration::from_secs(24 * 60 * 60)), // 24 hours
        Some(Duration::from_secs(5 * 60)),       // 5 minutes
    )
    .await;
```
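The sketch below assumes the two durations are a retention TTL for finished workflows (24 hours) and a sweep interval (5 minutes), which matches StateStore::cleanup_old_workflows(ttl). Under that assumption, each sweep could look like this: remove terminal workflows that finished longer ago than the TTL, and leave active ones alone. Record and cleanup_old are hypothetical std-only names:

```rust
// Hypothetical, std-only sketch of a TTL sweep; not the wfaas API.
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Whether a workflow is terminal and when it finished.
struct Record {
    terminal: bool,
    finished_at: Option<Instant>,
}

/// Removes terminal workflows that finished more than `ttl` before `now`
/// and returns how many were removed. Active workflows are never removed.
fn cleanup_old(entries: &mut HashMap<u64, Record>, ttl: Duration, now: Instant) -> usize {
    let before = entries.len();
    entries.retain(|_, r| match (r.terminal, r.finished_at) {
        (true, Some(t)) => now.saturating_duration_since(t) <= ttl,
        _ => true,
    });
    before - entries.len()
}
```

A background task would call something like this once per interval.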
Error Handling
```rust
use wfaas::WorkflowError;

match engine.start_workflow(workflow_id, data).await {
    Ok(instance_id) => println!("Started: {}", instance_id),
    Err(WorkflowError::DefinitionNotFound(id)) => eprintln!("Unknown workflow: {}", id),
    Err(WorkflowError::ShuttingDown) => eprintln!("Engine is shutting down"),
    Err(e) => eprintln!("Error: {}", e),
}
```
Error types:
- NotFound - Workflow instance not found
- DefinitionNotFound - Workflow definition not registered
- StepFailed - Step execution failed
- StepTimeout - Step exceeded its timeout
- Cancelled - Workflow was cancelled
- InvalidStateTransition - Invalid status change
- ContextValueNotFound - Missing context value
- TypeMismatch - Type conversion error
- ShuttingDown - Engine is shutting down
License
Apache-2.0