# Deltaflow

The embeddable workflow engine.

Type-safe, Elixir-inspired pipelines that run in your process. No infrastructure required.
## Why Deltaflow?

- **Type-safe composition** - the compiler enforces that each step's output matches the next step's input
- **Elixir-inspired** - declarative pipelines via method chaining, not scattered callbacks
- **Observable by default** - every run and step is recorded for debugging
- **Embeddable** - a library, not a service; runs in your process
## Quick Start

```rust
use deltaflow::Pipeline;

// Define steps implementing the Step trait
// (step, policy, and recorder names below are illustrative)
struct ParseStep;
struct ProcessStep;
struct FormatStep;

// Build a type-safe pipeline
let pipeline = Pipeline::new()
    .start_with(ParseStep)   // String -> ParsedData
    .then(ProcessStep)       // ParsedData -> ProcessedData
    .then(FormatStep)        // ProcessedData -> Output
    .with_retry(retry_policy)
    .with_recorder(recorder)
    .build();

// Run it
let result = pipeline.run(input).await?;
```
## Installation

Add to your Cargo.toml:

```toml
[dependencies]
deltaflow = "0.1"
```

For SQLite-backed recording and the task queue:

```toml
[dependencies]
deltaflow = { version = "0.1", features = ["sqlite"] }
```
## Core Concepts

### Step

The fundamental building block. Each step transforms a typed input to a typed output.
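The crate's exact trait definition isn't reproduced here, so as a mental model, here is a simplified synchronous sketch of the idea (the trait name comes from this README; the associated types and signature are assumptions, and the real trait is presumably async):

```rust
// Illustrative only: a simplified, synchronous Step-like trait.
// Deltaflow's actual trait is async and its signature may differ.
trait Step {
    type Input;
    type Output;
    fn run(&self, input: Self::Input) -> Self::Output;
}

// A step that parses a String into an i32
struct ParseStep;

impl Step for ParseStep {
    type Input = String;
    type Output = i32;
    fn run(&self, input: String) -> i32 {
        input.trim().parse().unwrap_or(0)
    }
}

fn main() {
    let step = ParseStep;
    assert_eq!(step.run("42".to_string()), 42);
    println!("parsed: {}", step.run(" 7 ".to_string()));
}
```

Because `Input` and `Output` are part of the trait, the pipeline builder can check at compile time that consecutive steps line up.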
### Pipeline

Compose steps with method chaining. The compiler ensures each step's output type matches the next step's input:

```rust
// Step, policy, and recorder names are illustrative
let pipeline = Pipeline::new()
    .start_with(ValidateStep)   // Order -> ValidatedOrder
    .then(ChargeStep)           // ValidatedOrder -> PaidOrder
    .then(FulfillStep)          // PaidOrder -> CompletedOrder
    .with_retry(retry_policy)
    .with_recorder(recorder)
    .build();
```
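The compile-time guarantee can be sketched with plain generics: each `then` only accepts a function whose argument type equals the current output type. This is a simplified model, not Deltaflow's implementation:

```rust
// Simplified model of type-safe chaining: `then` only accepts a
// function whose input type equals the previous output type.
struct Pipe<I, O> {
    f: Box<dyn Fn(I) -> O>,
}

impl<I: 'static, O: 'static> Pipe<I, O> {
    fn start(f: impl Fn(I) -> O + 'static) -> Self {
        Pipe { f: Box::new(f) }
    }
    fn then<N: 'static>(self, g: impl Fn(O) -> N + 'static) -> Pipe<I, N> {
        let f = self.f;
        Pipe { f: Box::new(move |i| g(f(i))) }
    }
    fn run(&self, input: I) -> O {
        (self.f)(input)
    }
}

fn main() {
    let p = Pipe::start(|s: String| s.len())    // String -> usize
        .then(|n| n * 2)                        // usize  -> usize
        .then(|n| format!("len*2 = {}", n));    // usize  -> String
    assert_eq!(p.run("abcd".to_string()), "len*2 = 8");
    // p.run(42) would not compile: the input must be a String
    println!("{}", p.run("abcd".to_string()));
}
```

A mismatched `then` (say, a function taking a `String` after a step producing `usize`) is rejected by the compiler, which is the property the builder API relies on.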
### Runner

Background task processing, available with the `sqlite` feature. Register pipelines, submit work, and process tasks concurrently:

```rust
// Pipeline names and arguments are illustrative
let runner = Runner::new(storage)
    .pipeline("parse", parse_pipeline)
    .pipeline("orders", order_pipeline)
    .max_concurrent(8)
    .build();

// Submit work
runner.submit("parse", input).await?;

// Process tasks
runner.run().await;
```
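As a mental model of the submit/run split, here is an in-memory toy with a channel as the queue (the real Runner persists tasks in SQLite and runs them concurrently; names here are made up for illustration):

```rust
use std::sync::mpsc;
use std::thread;

// Toy model of the Runner's submit/run split: submitters enqueue
// task payloads, a run loop drains and processes them.
fn process_all(tasks: Vec<String>) -> usize {
    let (tx, rx) = mpsc::channel::<String>();

    // The "run" side: drain the queue until all senders are dropped
    let runner = thread::spawn(move || {
        let mut processed = 0;
        for task in rx {
            println!("processing {}", task);
            processed += 1;
        }
        processed
    });

    // The "submit" side
    for t in tasks {
        tx.send(t).unwrap();
    }
    drop(tx); // close the queue so the run loop exits

    runner.join().unwrap()
}

fn main() {
    assert_eq!(process_all(vec!["task-1".into(), "task-2".into()]), 2);
}
```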
### Periodic Scheduler

For time-based task enqueueing, use `PeriodicScheduler`:

```rust
use deltaflow::PeriodicScheduler;
use std::time::Duration;

// Job names and intervals are illustrative
let scheduler = PeriodicScheduler::new()
    .job("sync", Duration::from_secs(60))
    .run_on_start()
    .job("cleanup", Duration::from_secs(3600))
    .run_on_start()
    .build();

// Run alongside your pipeline runner (e.g. with tokio)
tokio::select! {
    _ = runner.run() => {},
    _ = scheduler.run() => {},
}
```
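Conceptually, each scheduler job is a timer loop that enqueues a task whenever its interval elapses, with `run_on_start` firing an immediate first tick. A std-only sketch of that loop (function and job names are invented for illustration):

```rust
use std::thread;
use std::time::{Duration, Instant};

// Toy model of one periodic job: fire immediately (run_on_start),
// then re-fire every `interval`, recording a task name each time.
fn run_job(interval: Duration, ticks: usize) -> Vec<&'static str> {
    let mut enqueued = Vec::new();
    let mut next_fire = Instant::now(); // run_on_start: due immediately

    while enqueued.len() < ticks {
        let now = Instant::now();
        if now >= next_fire {
            enqueued.push("sync_job"); // stand-in for submitting to the runner
            next_fire += interval;
        } else {
            thread::sleep(next_fire - now);
        }
    }
    enqueued
}

fn main() {
    let jobs = run_job(Duration::from_millis(10), 3);
    assert_eq!(jobs.len(), 3);
    println!("enqueued {} tasks", jobs.len());
}
```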
### Per-Pipeline Concurrency

For pipelines that need rate limiting (e.g., external API calls):

```rust
// Pipeline names and arguments are illustrative
let runner = Runner::new(storage)
    .pipeline("parse", parse_pipeline)
    .pipeline_with_concurrency("api_calls", api_pipeline, 1) // only 1 concurrent task
    .build();
```
Pipelines with custom concurrency use their own semaphore instead of the global max_concurrent limit.
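A rough picture of what "their own semaphore" means: a pipeline with a concurrency of 1 serializes its own tasks no matter how many workers are available. The toy std-only semaphore below illustrates the mechanism (it is not Deltaflow's internals):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Toy counting semaphore; a pipeline configured with concurrency 1
// would hold one of these with a single permit, independent of the
// runner's global max_concurrent limit.
struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore { permits: Mutex::new(n), cv: Condvar::new() }
    }
    fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            p = self.cv.wait(p).unwrap();
        }
        *p -= 1;
    }
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

// Run `tasks` jobs through a semaphore with `permits` permits and
// report the highest number that ever ran at the same time.
fn max_overlap(permits: usize, tasks: usize) -> usize {
    let sem = Arc::new(Semaphore::new(permits));
    let active = Arc::new(Mutex::new((0usize, 0usize))); // (current, max seen)

    let handles: Vec<_> = (0..tasks)
        .map(|_| {
            let (sem, active) = (sem.clone(), active.clone());
            thread::spawn(move || {
                sem.acquire();
                {
                    let mut a = active.lock().unwrap();
                    a.0 += 1;
                    a.1 = a.1.max(a.0);
                }
                thread::sleep(Duration::from_millis(5)); // "the task"
                active.lock().unwrap().0 -= 1;
                sem.release();
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    let max = active.lock().unwrap().1;
    max
}

fn main() {
    assert_eq!(max_overlap(1, 4), 1); // concurrency 1: tasks never overlap
    println!("max concurrent: {}", max_overlap(1, 4));
}
```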
## Status

Deltaflow is experimental (0.1.x). The API will evolve based on feedback.
What works:
- Pipeline composition with type-safe step chaining
- Retry policies (exponential backoff, fixed delay)
- SQLite recording for observability
- Task runner with concurrent execution
- Follow-up task spawning
What's coming:
- Per-step retry policies
- Task priorities
- More storage backends
Not planned (by design):
- Distributed execution (single-process by design)
- DAG dependencies (pipelines are linear)
Feedback welcome: GitHub Issues
## License
MIT