Expand description
§Treadle
A persistent, resumable, human-in-the-loop workflow engine backed by a petgraph DAG.
Treadle fills the gap between single-shot DAG executors (like dagrs) and heavyweight distributed workflow engines (like Restate or Temporal). It is designed for local, single-process pipelines where:
- Work items progress through a DAG of stages over time
- Each item’s state is tracked persistently (survives restarts)
- Stages can pause for human review and resume later
- Fan-out stages (e.g., enriching from multiple sources) track each subtask independently with per-subtask retry
- The full pipeline is inspectable at any moment
§Quick Start
Define your work item and stages:
use treadle::{Stage, StageOutcome, StageContext, Result, WorkItem};
use async_trait::async_trait;
// Your work item
#[derive(Debug, Clone)]
struct Document {
id: String,
content: String,
}
impl WorkItem for Document {
fn id(&self) -> &str {
&self.id
}
}
// A processing stage
#[derive(Debug)]
struct ParseStage;
#[async_trait]
impl Stage for ParseStage {
fn name(&self) -> &str {
"parse"
}
async fn execute(&self, item: &dyn WorkItem, ctx: &mut StageContext) -> Result<StageOutcome> {
println!("Parsing document");
Ok(StageOutcome::Complete)
}
}Build and run the workflow:
use treadle::{Workflow, MemoryStateStore};
let workflow = Workflow::builder()
.stage("parse", ParseStage)
.stage("process", ProcessStage)
.dependency("process", "parse")
.build()?;
let mut store = MemoryStateStore::new();
let doc = Document { id: "doc-1".into(), content: "...".into() };
workflow.advance(&doc, &mut store).await?;
if workflow.is_complete(doc.id(), &store).await? {
println!("Done!");
}§Human-in-the-Loop
Stages can pause for human review:
#[async_trait]
impl Stage for ReviewStage {
fn name(&self) -> &str {
"review"
}
async fn execute(&self, item: &dyn WorkItem, ctx: &mut StageContext) -> Result<StageOutcome> {
// Stage will pause here, waiting for human approval
Ok(StageOutcome::NeedsReview)
}
}Later, approve or reject:
// Approve and continue
workflow.approve_review(doc.id(), "review", &mut store).await?;
workflow.advance(&doc, &mut store).await?; // Continues to next stage
// Or reject with a reason
workflow.reject_review(doc.id(), "review", "Quality issues", &mut store).await?;§Fan-Out Execution
Stages can spawn parallel subtasks:
#[async_trait]
impl Stage for EnrichStage {
fn name(&self) -> &str {
"enrich"
}
async fn execute(&self, item: &dyn WorkItem, ctx: &mut StageContext) -> Result<StageOutcome> {
if ctx.subtask_name.is_some() {
// Handle individual subtask execution
let source = ctx.subtask_name.as_ref().unwrap();
println!("Fetching data from {}", source);
Ok(StageOutcome::Complete)
} else {
// Declare subtasks on first invocation
Ok(StageOutcome::FanOut(vec![
SubTask::new("source-a".to_string()),
SubTask::new("source-b".to_string()),
SubTask::new("source-c".to_string()),
]))
}
}
}The workflow will execute all subtasks and track their progress independently.
§State Persistence
Use MemoryStateStore for testing or SqliteStateStore for production:
// In-memory (testing)
let mut store = MemoryStateStore::new();
// SQLite (production) - survives process restarts
let mut store = SqliteStateStore::open("workflow.db").await?;State is automatically saved after each stage execution. You can restart your process and resume from where you left off:
// After restart, load the workflow and continue
let workflow = Workflow::builder()
.stage("parse", ParseStage)
.stage("process", ProcessStage)
.build()?;
let mut store = SqliteStateStore::open("workflow.db").await?;
// Continue processing - skips already-completed stages
workflow.advance(&doc, &mut store).await?;§Event Observation
Subscribe to workflow events for logging, monitoring, or building UIs:
let mut events = workflow.subscribe();
tokio::spawn(async move {
while let Ok(event) = events.recv().await {
match event {
WorkflowEvent::StageStarted { item_id, stage } => {
println!("Stage {} started for {}", stage, item_id);
}
WorkflowEvent::StageCompleted { item_id, stage } => {
println!("Stage {} completed for {}", stage, item_id);
}
WorkflowEvent::ReviewRequired { item_id, stage, .. } => {
println!("Review needed for {} at stage {}", item_id, stage);
}
WorkflowEvent::WorkflowCompleted { item_id } => {
println!("Workflow completed for {}", item_id);
}
_ => {}
}
}
});§Pipeline Status
Inspect the current state of a workflow at any time:
let status = workflow.status(doc.id(), &store).await?;
println!("Progress: {:.0}%", status.progress_percent());
if status.has_pending_reviews() {
for stage in status.review_stages() {
println!("Review needed at stage: {}", stage);
}
}
// Pretty-print the entire pipeline status
println!("{}", status);§Feature Flags
sqlite(default): EnablesSqliteStateStorefor persistent storage
§Design Philosophy
The name comes from the treadle — the foot-operated lever that drives a loom, spinning wheel, or lathe. The machine has stages and mechanisms, but without the human pressing the treadle, nothing moves. This captures the core design: a pipeline engine where human judgment gates the flow.
Re-exports§
pub use error::Result;pub use error::TreadleError;pub use event::WorkflowEvent;pub use stage::ReviewData;pub use stage::Stage;pub use stage::StageContext;pub use stage::StageOutcome;pub use stage::StageState;pub use stage::StageStatus;pub use stage::SubTask;pub use state_store::MemoryStateStore;pub use state_store::StateStore;pub use status::PipelineStatus;pub use status::StageStatusEntry;pub use state_store::SqliteStateStore;pub use work_item::WorkItem;pub use workflow::Workflow;pub use workflow::WorkflowBuilder;
Modules§
- error
- Error types for Treadle workflow engine.
- event
- Workflow execution events.
- stage
- Stage types for the Treadle workflow engine.
- state_
store - State storage for the Treadle workflow engine.
- status
- Pipeline status reporting and visualization.
- work_
item - Work item trait for the Treadle workflow engine.
- workflow
- Workflow definition and DAG management.
Functions§
- version
- Treadle is under active development. See the README for the design and roadmap.