//! # Treadle
//!
//! A persistent, resumable, human-in-the-loop workflow engine backed by a
//! [petgraph](https://docs.rs/petgraph) DAG.
//!
//! Treadle fills the gap between single-shot DAG executors (like
//! [dagrs](https://crates.io/crates/dagrs)) and heavyweight distributed
//! workflow engines (like [Restate](https://restate.dev) or
//! [Temporal](https://temporal.io)). It is designed for **local,
//! single-process pipelines** where:
//!
//! - Work items progress through a DAG of stages over time
//! - Each item's state is tracked persistently (survives restarts)
//! - Stages can pause for human review and resume later
//! - Fan-out stages (e.g., enriching from multiple sources) track each
//!   subtask independently with per-subtask retry
//! - The full pipeline is inspectable at any moment
//!
//! ## Quick Start
//!
//! Define your work item and stages:
//!
//! ```rust,ignore
//! use treadle::{Stage, StageOutcome, StageContext, Result, WorkItem};
//! use async_trait::async_trait;
//!
//! // Your work item
//! #[derive(Debug, Clone)]
//! struct Document {
//!     id: String,
//!     content: String,
//! }
//!
//! impl WorkItem for Document {
//!     fn id(&self) -> &str {
//!         &self.id
//!     }
//! }
//!
//! // A processing stage
//! #[derive(Debug)]
//! struct ParseStage;
//!
//! #[async_trait]
//! impl Stage for ParseStage {
//!     fn name(&self) -> &str {
//!         "parse"
//!     }
//!
//!     async fn execute(&self, item: &dyn WorkItem, ctx: &mut StageContext) -> Result<StageOutcome> {
//!         println!("Parsing document");
//!         Ok(StageOutcome::Complete)
//!     }
//! }
//! ```
//!
//! Build and run the workflow:
//!
//! ```rust,ignore
//! use treadle::{Workflow, MemoryStateStore};
//!
//! # async fn example() -> treadle::Result<()> {
//! let workflow = Workflow::builder()
//!     .stage("parse", ParseStage)
//!     .stage("process", ProcessStage)
//!     .dependency("process", "parse")
//!     .build()?;
//!
//! let mut store = MemoryStateStore::new();
//! let doc = Document { id: "doc-1".into(), content: "...".into() };
//!
//! workflow.advance(&doc, &mut store).await?;
//!
//! if workflow.is_complete(doc.id(), &store).await? {
//!     println!("Done!");
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Human-in-the-Loop
//!
//! Stages can pause for human review:
//!
//! ```rust,ignore
//! #[async_trait]
//! impl Stage for ReviewStage {
//!     fn name(&self) -> &str {
//!         "review"
//!     }
//!
//!     async fn execute(&self, item: &dyn WorkItem, ctx: &mut StageContext) -> Result<StageOutcome> {
//!         // Stage will pause here, waiting for human approval
//!         Ok(StageOutcome::NeedsReview)
//!     }
//! }
//! ```
//!
//! Later, approve or reject:
//!
//! ```rust,ignore
//! // Approve and continue
//! workflow.approve_review(doc.id(), "review", &mut store).await?;
//! workflow.advance(&doc, &mut store).await?; // Continues to next stage
//!
//! // Or reject with a reason
//! workflow.reject_review(doc.id(), "review", "Quality issues", &mut store).await?;
//! ```
//!
//! ## Fan-Out Execution
//!
//! Stages can spawn parallel subtasks:
//!
//! ```rust,ignore
//! #[async_trait]
//! impl Stage for EnrichStage {
//!     fn name(&self) -> &str {
//!         "enrich"
//!     }
//!
//!     async fn execute(&self, item: &dyn WorkItem, ctx: &mut StageContext) -> Result<StageOutcome> {
//!         if let Some(source) = ctx.subtask_name.as_ref() {
//!             // Handle individual subtask execution
//!             println!("Fetching data from {}", source);
//!             Ok(StageOutcome::Complete)
//!         } else {
//!             // Declare subtasks on first invocation
//!             Ok(StageOutcome::FanOut(vec![
//!                 SubTask::new("source-a".to_string()),
//!                 SubTask::new("source-b".to_string()),
//!                 SubTask::new("source-c".to_string()),
//!             ]))
//!         }
//!     }
//! }
//! ```
//!
//! The workflow will execute all subtasks and track their progress independently.
//!
//! ## State Persistence
//!
//! Use [`MemoryStateStore`] for testing or [`SqliteStateStore`] for production:
//!
//! ```rust,ignore
//! // In-memory (testing)
//! let mut store = MemoryStateStore::new();
//!
//! // SQLite (production) - survives process restarts
//! # #[cfg(feature = "sqlite")]
//! let mut store = SqliteStateStore::open("workflow.db").await?;
//! ```
//!
//! State is automatically saved after each stage execution. You can restart
//! your process and resume from where you left off:
//!
//! ```rust,ignore
//! // After restart, rebuild the same workflow and continue
//! let workflow = Workflow::builder()
//!     .stage("parse", ParseStage)
//!     .stage("process", ProcessStage)
//!     .dependency("process", "parse")
//!     .build()?;
//!
//! # #[cfg(feature = "sqlite")]
//! let mut store = SqliteStateStore::open("workflow.db").await?;
//!
//! // Continue processing - skips already-completed stages
//! workflow.advance(&doc, &mut store).await?;
//! ```
//!
//! ## Event Observation
//!
//! Subscribe to workflow events for logging, monitoring, or building UIs:
//!
//! ```rust,ignore
//! let mut events = workflow.subscribe();
//!
//! tokio::spawn(async move {
//!     while let Ok(event) = events.recv().await {
//!         match event {
//!             WorkflowEvent::StageStarted { item_id, stage } => {
//!                 println!("Stage {} started for {}", stage, item_id);
//!             }
//!             WorkflowEvent::StageCompleted { item_id, stage } => {
//!                 println!("Stage {} completed for {}", stage, item_id);
//!             }
//!             WorkflowEvent::ReviewRequired { item_id, stage, .. } => {
//!                 println!("Review needed for {} at stage {}", item_id, stage);
//!             }
//!             WorkflowEvent::WorkflowCompleted { item_id } => {
//!                 println!("Workflow completed for {}", item_id);
//!             }
//!             _ => {}
//!         }
//!     }
//! });
//! ```
//!
//! ## Pipeline Status
//!
//! Inspect the current state of a workflow at any time:
//!
//! ```rust,ignore
//! let status = workflow.status(doc.id(), &store).await?;
//!
//! println!("Progress: {:.0}%", status.progress_percent());
//!
//! if status.has_pending_reviews() {
//!     for stage in status.review_stages() {
//!         println!("Review needed at stage: {}", stage);
//!     }
//! }
//!
//! // Pretty-print the entire pipeline status
//! println!("{}", status);
//! ```
//!
//! ## Feature Flags
//!
//! - `sqlite` (default): Enables [`SqliteStateStore`] for persistent storage
//!
//! ## Design Philosophy
//!
//! The name comes from the **treadle** — the foot-operated lever that drives
//! a loom, spinning wheel, or lathe. The machine has stages and mechanisms,
//! but without the human pressing the treadle, nothing moves. This captures
//! the core design: a pipeline engine where human judgment gates the flow.
pub use ;
pub use WorkflowEvent;
pub use ;
pub use ;
pub use ;
pub use SqliteStateStore;
pub use WorkItem;
pub use ;
/// Treadle is under active development. See the README for the design
/// and roadmap.