# pipeflow
A lightweight, configuration-driven data pipeline framework for Rust.
Source → Transform → Sink
## Features
- YAML Configuration: Declarative pipeline definition with DAG validation (duplicate IDs, missing inputs/outputs, cycles)
- Fan-out: One source can broadcast to multiple sinks
- Configurable Broadcast Capacity: Tune `output_buffer_size` (source/transform broadcast)
- Built-in Nodes:
  - Sources: `http_client`, `http_server`, `file`, `redis`, `sql`
  - Transforms: `remap`, `filter`, `window`, `compute`, `hash`
  - Sinks: `console`, `file`, `blackhole`, `http_client`, `redis`, `sql`, `notify`
- CLI: `run`, `config validate`, `config show`, `config graph`
## Feature Flags
Pipeflow uses Cargo features to keep optional dependencies behind flags.
- `http-client` (default): Enables the `http_client` source and sink.
- `http-server`: Enables the `http_server` source.
- `database`: Enables the `sql` source and sink.
- `redis`: Enables the `redis` source and sink.
- `file` (default): Enables the `file` source and sink.
- `notify` (default): Enables the `notify` sink.
Core-only build (no optional sources/sinks):
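A sketch using standard Cargo flags (the second command re-enables an illustrative subset of features):

```bash
# Build only the core, with no optional sources/sinks
cargo build --no-default-features

# Opt back in to specific features
cargo build --no-default-features --features http-client,file
```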
If a pipeline config references a node behind a disabled feature, `Engine::build()` returns a configuration error explaining which feature is required.
## Quick Start
### Requirements
- Rust 1.92 or later (uses Rust 2024 edition)
### Installation
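Assuming the crate and binary are published as `pipeflow` (if not, build from a checkout of this repository):

```bash
# CLI (assumption: published to crates.io)
cargo install pipeflow

# Or from a local checkout
cargo build --release
```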
### Configuration
Create a pipeline configuration file `pipeline.yaml`:
```yaml
system:
  # output_buffer_size: broadcast channel capacity for sources/transforms (default: 1024)
  output_buffer_size: 1024

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"
        # schedule: "0 0 * * *" # Run daily at 00:00 (local time, 5 fields; seconds default to 0)

  transforms:
    - id: pass_through
      inputs:
        - api_poller
      outputs:
        - console

  sinks:
    - id: console
      type: console
      config:
        format: pretty
```
### Wiring Nodes
Pipeflow wiring is source -> transform -> sink:
- Transforms declare `inputs` (one or more sources or transforms).
- Transforms declare `outputs` (one or more sinks or transforms).
- Transform-to-transform wiring can be declared on either side; the engine infers the missing side (see the sketch after this list).
- Transforms may omit `steps` to act as pass-through nodes.
- Sources do not declare `inputs` or `outputs`.
- Sinks do not declare `inputs`; their target is defined by sink type/config (e.g. file path).
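For example, a two-stage chain can be declared from one side only; here `stage_a` names the link and the engine infers `stage_b`'s input (node IDs are illustrative):

```yaml
transforms:
  - id: stage_a
    inputs:
      - api_poller
    outputs:
      - stage_b        # declared here only; stage_b's inputs are inferred
  - id: stage_b
    outputs:
      - console
```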
### Loading from a directory
All commands that accept `CONFIG` also accept a directory. When a directory is provided,
pipeflow loads all `*.yaml` / `*.yml` files in lexical order and merges them into a single
configuration before normalization and validation.
This is useful for larger pipelines:
```bash
# Directory-based config
pipeflow run ./configs/   # path is an example; any directory of *.yaml / *.yml files works
```
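A hypothetical layout (file names are illustrative); files merge in the lexical order shown:

```
configs/
├── 00-system.yaml      # system settings (buffer sizes, system sinks)
├── 10-sources.yaml     # sources
├── 20-transforms.yaml  # transforms
└── 30-sinks.yaml       # sinks
```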
### Run (Programmatic)
A minimal runner sketch; only `Engine::build()` is named elsewhere in this README, so the other identifiers are illustrative and may differ from the actual API:

```rust
use pipeflow::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Illustrative names; only Engine::build() is documented elsewhere in this README.
    let config = Config::from_path("pipeline.yaml")?;
    let engine = Engine::build(config)?;
    engine.run().await
}
```
## Node Types
### Sources
| Type | Description | Status |
|---|---|---|
| `http_client` | HTTP polling | Implemented |
| `http_server` | HTTP push/webhook | Implemented |
| `redis` | Redis GET polling | Implemented |
| `sql` | SQL polling | Implemented |
| `file` | File watching | Implemented |
System Sources (implicit, fixed IDs):
- `source::system::dlq` - Dead Letter Queue
- `source::system::event` - System events
- `source::system::notify` - Notifications
System sources are always available and emit into their corresponding system sinks by default.
You can consume a system source in your pipeline by referencing it in a transform's `inputs`, as sketched below.
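For example, system events could be mirrored to the console (the transform and sink IDs are illustrative):

```yaml
pipeline:
  transforms:
    - id: events_passthrough
      inputs:
        - source::system::event   # implicit system source, always available
      outputs:
        - console
  sinks:
    - id: console
      type: console
```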
### Transforms
| Type | Description | I/O | Status |
|---|---|---|---|
| `remap` | Field mapping (step) | 1:1 | Implemented |
| `filter` | Conditional filtering | 1:0/1 | Implemented |
| `window` | Time/count aggregation | n:1 | Implemented |
| `compute` | Math expression eval | 1:1 | Implemented |
| `hash` | Generate hash IDs | 1:1 | Implemented |
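A transform's optional `steps` field (see Wiring Nodes) selects one or more of the step types above. The sketch below shows the shape only; the step-level keys are assumptions, and the actual parameters are in docs/CONFIGURATION.md:

```yaml
transforms:
  - id: enrich
    inputs:
      - api_poller
    outputs:
      - console
    steps:
      # Step types come from the table above; their config keys here are illustrative.
      - type: filter
        config:
          condition: "status == 200"
      - type: remap
        config:
          fields:
            payload: body
```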
### Sinks
| Type | Description | Status |
|---|---|---|
| `blackhole` | Discard messages | Implemented |
| `console` | Print to stdout | Implemented |
| `file` | Write to file | Implemented |
| `sql` | SQL database insert | Implemented |
| `redis` | Redis operations | Implemented |
| `notify` | Email/Telegram/Webhook | Implemented |
| `http_client` | HTTP API calls | Implemented |
System Sinks (implicit, fixed IDs):
- `sink::system::dlq` - Default DLQ output (`data/system_dlq.jsonl`)
- `sink::system::event` - Default event output (`data/system_event.jsonl`)
- `sink::system::notify` - Default notify output (`data/system_notify.jsonl`)
You can override the base directory with `PIPEFLOW_SYSTEM_SINK_DIR` or configure paths in `system.sinks`:
```yaml
system:
  sinks:
    dir: ./data
    dlq: ./data/custom_dlq.jsonl
    event: ./data/custom_event.jsonl
    notify: ./data/custom_notify.jsonl
```
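Equivalently, the base directory can be set via the environment variable when starting the pipeline (the config path is illustrative):

```bash
PIPEFLOW_SYSTEM_SINK_DIR=./custom_data pipeflow run pipeline.yaml
```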
## Configuration Reference
See docs/CONFIGURATION.md for detailed configuration parameters for all supported sources and sinks.
## Broadcast Buffer Configuration
Pipeflow uses `tokio::sync::broadcast` channels to connect nodes that can emit messages
(sources/transforms). You can tune the broadcast capacity via `output_buffer_size`.
```yaml
system:
  output_buffer_size: 1024 # broadcast channel capacity for sources/transforms
```
Notes:
- Sources can override `output_buffer_size` per source (see the sketch below).
- If a sink/transform lags behind the broadcast buffer, it may drop messages and log `Lagged`.
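A per-source override might look like this for the Quick Start source; where exactly the field sits is an assumption, so treat it as a sketch and check docs/CONFIGURATION.md:

```yaml
pipeline:
  sources:
    - id: api_poller
      type: http_client
      output_buffer_size: 4096   # per-source override (placement is an assumption)
      config:
        url: "https://httpbin.org/json"
        interval: "10s"
```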
## Dead Letter Queue
The `source::system::dlq` source is implemented and can be wired to any sink. Chain-depth protection
prevents infinite loops when messages are routed through system sources (max depth: 8).
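For example, failed messages could be persisted to a dedicated file via a pass-through transform (the IDs, the `path` key, and the file path are illustrative; see docs/CONFIGURATION.md for the actual file sink parameters):

```yaml
pipeline:
  transforms:
    - id: dlq_passthrough
      inputs:
        - source::system::dlq
      outputs:
        - dlq_file
  sinks:
    - id: dlq_file
      type: file
      config:
        path: ./data/failed_messages.jsonl
```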
Current status:
- `source::system::dlq` source: Implemented
- Chain-depth protection: Implemented
- Automatic DLQ routing on transform/sink errors: Implemented
See docs/DESIGN.md for the full design.
## CLI Commands
```bash
# CONFIG below may be a single file or a directory of YAML files

# Run pipeline
pipeflow run pipeline.yaml

# Validate configuration
pipeflow config validate pipeline.yaml

# Show pipeline graph (ASCII)
pipeflow config graph pipeline.yaml

# Show merged + normalized configuration
pipeflow config show pipeline.yaml
```
Notes:
- `pipeflow config validate` checks YAML structure and pipeline wiring (IDs, references, cycles, system routing). It does not validate node-specific `config` contents (e.g. required `http_client.url`); those are validated during `Engine::build()` (and therefore `pipeflow run`).
- If you use directory-based configs, `config show` displays the merged + normalized result.
## Distributed & High Availability
Pipeflow is stand-alone by design.
To keep the architecture simple and robust (KISS principle), Pipeflow does not implement complex distributed coordination protocols (like Raft or Paxos).
- Persistence: State (like silence records) is stored on the local filesystem (`./data` by default). We have removed complex distributed backends like Redis for silence in favor of simplicity and filesystem atomicity.
- Scaling: We recommend Manual Sharding. Deploy multiple independent instances, each handling a different subset of configuration files (see the sketch after this list).
- High Availability: Use detailed health checks (e.g., K8s liveness probes) to restart failed instances.
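A minimal sharding sketch, assuming directory-based configs (paths are illustrative; in practice each instance would run as its own service or container):

```bash
# Instance A owns one subset of configs, instance B another
pipeflow run ./configs/shard-a/
pipeflow run ./configs/shard-b/
```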
If you need shared state across instances (e.g., shared silence), mount a shared volume (NFS/EFS) to the `data_dir`.
## Documentation
See docs/DESIGN.md for detailed design documentation.
## Testing
```bash
# Unit + integration tests
cargo test

# Lint (clippy)
cargo clippy --all-targets

# Format check
cargo fmt --check
```
## License
MIT