use async_trait::async_trait;
use treadle::{
MemoryStateStore, Result, Stage, StageContext, StageOutcome, WorkItem, Workflow,
WorkflowEvent,
};
/// A text document that is pushed through the example workflow.
#[derive(Debug, Clone)]
struct Document {
    /// Identifier of the document.
    id: String,
    /// Text body of the document.
    content: String,
}

impl Document {
    /// Creates a `Document`, copying both borrowed strings into owned fields.
    fn new(id: &str, content: &str) -> Self {
        let id = String::from(id);
        let content = String::from(content);
        Self { id, content }
    }
}
impl WorkItem for Document {
    /// Returns the document's identifier as a borrowed string slice.
    fn id(&self) -> &str {
        self.id.as_str()
    }
}
/// First stage of the example pipeline, registered under the name `"scan"`.
#[derive(Debug)]
struct ScanStage;

#[async_trait]
impl Stage for ScanStage {
    /// Name this stage reports: `"scan"`.
    fn name(&self) -> &str {
        "scan"
    }

    /// Prints a scanning message for the item, waits 100 ms, prints a
    /// follow-up line, and returns [`StageOutcome::Complete`].
    async fn execute(
        &self,
        item: &dyn WorkItem,
        _ctx: &mut StageContext,
    ) -> Result<StageOutcome> {
        let doc_id = item.id().to_owned();
        println!(" π Scanning document '{}'", doc_id);

        let pause = std::time::Duration::from_millis(100);
        tokio::time::sleep(pause).await;

        println!(" β Document structure identified");
        Ok(StageOutcome::Complete)
    }
}
/// Pipeline stage registered under the name `"extract"`.
#[derive(Debug)]
struct ExtractStage;

#[async_trait]
impl Stage for ExtractStage {
    /// Name this stage reports: `"extract"`.
    fn name(&self) -> &str {
        "extract"
    }

    /// Prints an extraction message for the item, waits 150 ms, prints a
    /// fixed summary line, and returns [`StageOutcome::Complete`].
    async fn execute(
        &self,
        item: &dyn WorkItem,
        _ctx: &mut StageContext,
    ) -> Result<StageOutcome> {
        let doc_id = item.id().to_owned();
        println!(" π Extracting entities from '{}'", doc_id);

        let pause = std::time::Duration::from_millis(150);
        tokio::time::sleep(pause).await;

        println!(" β Found: 2 people, 1 organization");
        Ok(StageOutcome::Complete)
    }
}
/// Pipeline stage registered under the name `"review"`.
#[derive(Debug)]
struct ReviewStage;

#[async_trait]
impl Stage for ReviewStage {
    /// Name this stage reports: `"review"`.
    fn name(&self) -> &str {
        "review"
    }

    /// Prints a review request for the item together with a fixed list of
    /// entities, then returns [`StageOutcome::NeedsReview`].
    async fn execute(
        &self,
        item: &dyn WorkItem,
        _ctx: &mut StageContext,
    ) -> Result<StageOutcome> {
        let doc_id = item.id().to_owned();
        println!(" π Review requested for '{}'", doc_id);
        println!(" Entities to review:");

        // The entity list is hard-coded; it is not derived from the item.
        for entity_line in [" - Person: John Doe", " - Org: Acme Inc"] {
            println!("{}", entity_line);
        }

        Ok(StageOutcome::NeedsReview)
    }
}
/// Pipeline stage registered under the name `"export"`.
#[derive(Debug)]
struct ExportStage;

#[async_trait]
impl Stage for ExportStage {
    /// Name this stage reports: `"export"`.
    fn name(&self) -> &str {
        "export"
    }

    /// Prints an export message for the item, waits 50 ms, prints a
    /// confirmation line, and returns [`StageOutcome::Complete`].
    ///
    /// Only messages are printed here; no file is written by this method.
    async fn execute(
        &self,
        item: &dyn WorkItem,
        _ctx: &mut StageContext,
    ) -> Result<StageOutcome> {
        let doc_id = item.id().to_owned();
        println!(" πΎ Exporting document '{}'", doc_id);

        let pause = std::time::Duration::from_millis(50);
        tokio::time::sleep(pause).await;

        println!(" β Exported to output.json");
        Ok(StageOutcome::Complete)
    }
}
/// Runs the example end to end.
///
/// Builds a workflow with the stages `scan`, `extract`, `review` and
/// `export`, spawns a task that prints selected workflow events, advances a
/// single document, approves the `review` stage when the status reports
/// pending reviews, advances again, and prints the final status.
///
/// NOTE(review): several glyphs in the printed literals look mis-encoded
/// (possibly UTF-8 symbols read through a single-byte code page), and one
/// literal contains an embedded line break right after its first glyph —
/// confirm the file's encoding before relying on the exact output.
#[tokio::main]
async fn main() -> Result<()> {
    // Separator line printed beneath each section heading.
    let rule = "βββββββββββββββββββββββββββββββββββββββββ";
    // Pause taken after each advance before the status is printed
    // (presumably to let the event task print first — TODO confirm).
    let settle = std::time::Duration::from_millis(50);

    println!("βββββββββββββββββββββββββββββββββββββββββββββ");
    println!("β Treadle Basic Pipeline Example β");
    println!("βββββββββββββββββββββββββββββββββββββββββββββ\n");

    println!("π Building workflow...");
    let pipeline = Workflow::builder()
        .stage("scan", ScanStage)
        .stage("extract", ExtractStage)
        .stage("review", ReviewStage)
        .stage("export", ExportStage)
        .dependency("extract", "scan")
        .dependency("review", "extract")
        .dependency("export", "review")
        .build()?;
    println!(" Stages: {:?}\n", pipeline.stages());

    // Background task: print a line for each event kind of interest until
    // `recv` stops returning `Ok`. The task handle is not awaited.
    let mut events = pipeline.subscribe();
    tokio::spawn(async move {
        while let Ok(event) = events.recv().await {
            match event {
                WorkflowEvent::StageStarted { stage, .. } => {
                    println!(" [Event] Stage '{}' started", stage)
                }
                WorkflowEvent::StageCompleted { stage, .. } => {
                    println!(" [Event] Stage '{}' completed", stage)
                }
                WorkflowEvent::ReviewRequired { stage, .. } => {
                    println!(" [Event] Review required at stage '{}'", stage)
                }
                WorkflowEvent::WorkflowCompleted { item_id } => {
                    println!(" [Event] Workflow completed for '{}'", item_id)
                }
                _ => {}
            }
        }
    });

    let mut state = MemoryStateStore::new();
    let document = Document::new(
        "doc-001",
        "John Doe works at Acme Inc. He manages the engineering team.",
    );
    println!("π Processing document: {}", document.id);
    println!(" Content: \"{}\"\n", document.content);

    // First pass over the workflow.
    println!("βΆοΈ First Advance (automatic stages)");
    println!("{}", rule);
    pipeline.advance(&document, &mut state).await?;
    tokio::time::sleep(settle).await;

    println!("\nπ Status after first advance:");
    println!("{}", rule);
    let snapshot = pipeline.status(document.id(), &state).await?;
    println!("{}", snapshot);

    // Stand-in for a human decision: wait 500 ms, then approve `review`.
    if snapshot.has_pending_reviews() {
        println!("βΈοΈ Workflow paused - awaiting human review\n");
        println!("π€ Human reviewer checking entities...");
        let thinking_time = std::time::Duration::from_millis(500);
        tokio::time::sleep(thinking_time).await;
        println!(" Decision: APPROVED β\n");
        println!("β\nApproving review stage");
        println!("{}", rule);
        pipeline
            .approve_review(document.id(), "review", &mut state)
            .await?;
    }

    // Second pass; runs whether or not a review was pending above.
    println!("\nβΆοΈ Second Advance (remaining stages)");
    println!("{}", rule);
    pipeline.advance(&document, &mut state).await?;
    tokio::time::sleep(settle).await;

    println!("\nπ Final Status:");
    println!("{}", rule);
    let final_snapshot = pipeline.status(document.id(), &state).await?;
    println!("{}", final_snapshot);

    let finished = pipeline.is_complete(document.id(), &state).await?;
    if finished {
        println!("\nπ Success! Document processing complete!\n");
    }

    Ok(())
}