# pipeflow

A lightweight, configuration-driven data pipeline framework for Rust.

```
Source → Transform → Sink
```
## Features

- YAML Configuration: Declarative pipeline definition with DAG validation (duplicate IDs, missing inputs, cycles)
- Fan-out: One source can broadcast to multiple sinks
- Configurable Broadcast Capacity: Tune `output_buffer_size` (source/transform broadcast)
- Built-in Nodes:
  - Sources: `http_client`, `internal`
  - Transforms: `remap`, `filter`
  - Sinks: `console`, `file`, `blackhole`, `internal`
- CLI: `run`, `config validate`, `config show`
## Feature Flags

Pipeflow uses Cargo features to keep optional dependencies behind flags.

- `http-client` (default): Enables the `http_client` source.
- `file` (default): Enables the `file` sink.
Core-only build (no optional sources/sinks):
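A minimal sketch of such a build; this assumes the two defaults listed above are the crate's only default features:

```bash
# Disable the default `http-client` and `file` features
cargo build --no-default-features
```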
If a pipeline config references a node behind a disabled feature, `Engine::build()` returns a configuration error explaining which feature is required.
## Quick Start

### Requirements

- Rust 1.92 or later (uses Rust 2024 edition)

### Installation
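Assuming the crate is published under the name `pipeflow`, a typical setup looks like:

```bash
# Use as a CLI
cargo install pipeflow

# Use as a library in an existing project
cargo add pipeflow
```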
### Configuration

Create a pipeline configuration file `pipeline.yaml`:
```yaml
global:
  # output_buffer_size: broadcast channel capacity for sources/transforms (default: 1024)
  output_buffer_size: 1024

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"

  sinks:
    - id: console
      type: console
      inputs:
        - api_poller
      config:
        format: pretty
```
### Wiring Nodes: inputs vs outputs

Pipeflow supports two equivalent ways to define edges:

- Pull-style (recommended): declare `inputs` on transforms/sinks.
- Push-style: declare `outputs` on sources/transforms and omit `inputs` on consumers. Pipeflow will normalize `outputs` into downstream `inputs` at load time.
Example (push-style):

```yaml
pipeline:
  sources:
    - id: api_poller
      type: http_client
      outputs:
        - console
      config:
        url: "https://httpbin.org/json"

  sinks:
    - id: console
      type: console
      # inputs omitted: will be filled from outputs during normalization
      config:
        format: pretty
```
### Loading from a directory

All commands that accept CONFIG also accept a directory. When a directory is provided, pipeflow loads all `*.yaml` / `*.yml` files in lexical order and merges them into a single configuration before normalization and validation.
This is useful for larger pipelines:

```bash
# Directory-based config (directory name is illustrative)
pipeflow run ./config/
```
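A split layout might look like this (file names are illustrative; merge order is lexical):

```
config/
├── 00-global.yaml    # global settings such as output_buffer_size
├── 10-sources.yaml   # pipeline.sources
└── 20-sinks.yaml     # pipeline.sinks
```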
### Run (Programmatic)

A minimal sketch of embedding pipeflow; apart from `Engine::build()`, the API names used here (`Config::from_file`, `Engine::run`) are assumptions:

```rust
use pipeflow::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Load and merge/normalize the configuration (constructor name assumed).
    let config = Config::from_file("pipeline.yaml")?;
    // Engine::build() validates wiring, node configs, and required features.
    let engine = Engine::build(config)?;
    // Run the pipeline until completion or shutdown (method name assumed).
    engine.run().await?;
    Ok(())
}
```
## Node Types

### Sources

| Type | Description | Status |
|---|---|---|
| `internal` | Built-in internal sources (`internal::dlq`/`event`/`alert`) | Implemented |
| `http_client` | HTTP polling | Implemented |
| `http_server` | HTTP push/webhook | Planned |
| `database` | Database polling/CDC | Planned |
| `file` | File watching | Planned |
| `websocket` | WebSocket streaming | Planned |
Internal Sources (fixed IDs):

- `internal::dlq` - Dead Letter Queue
- `internal::event` - System events
- `internal::alert` - Alert notifications

To route pipeline data into an internal channel, use the `internal` sink with a `target` and make sure the corresponding internal source is configured (e.g., `internal::event`), as sketched below.
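A hedged sketch of that wiring; the `target` value format and the use of the fixed ID as the source's `id` are assumptions:

```yaml
pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
    - id: internal::event       # fixed ID, assumed to be declared as the node id
      type: internal

  sinks:
    - id: to_events
      type: internal
      inputs: [api_poller]
      config:
        target: event           # assumed value format for the target channel
    - id: console
      type: console
      inputs: [internal::event]
      config:
        format: pretty
```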
Note: An `internal` sink without a `target` is treated as a convenience and is normalized to a `console` sink with `format: json`.
### Transforms

| Type | Description | I/O | Status |
|---|---|---|---|
| `remap` | Field mapping (step) | 1:1 | Implemented |
| `filter` | Conditional filtering | 1:0/1 | Implemented |
| `aggregate` | Window-based aggregation | n:m | Planned |
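Transforms sit between sources and sinks and are wired with the same `inputs`/`outputs` mechanism; the `remap` config keys below are purely illustrative, since the step schema is not documented in this README:

```yaml
transforms:
  - id: rename_fields
    type: remap
    inputs: [api_poller]
    config:
      steps:                    # illustrative keys only
        - from: url
          to: endpoint

sinks:
  - id: console
    type: console
    inputs: [rename_fields]     # consume the transform's output, not the source's
    config:
      format: pretty
```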
### Sinks

| Type | Description | Status |
|---|---|---|
| `blackhole` | Discard messages | Implemented |
| `console` | Print to stdout | Implemented |
| `file` | Write to file | Implemented |
| `database` | Database insert | Planned |
| `redis` | Redis operations | Planned |
| `notify` | Email/Telegram/Webhook | Planned |
| `http_api` | HTTP API calls | Planned |
| `internal` | Route to internal source | Implemented |
#### Console Sink Configuration

| Parameter | Default | Description |
|---|---|---|
| `format` | `pretty` | Output format: `pretty` (indented JSON), `json` (compact), `text` (payload only) |
#### File Sink Configuration

| Parameter | Default | Description |
|---|---|---|
| `path` | — | Output file path (required) |
| `format` | `jsonl` | Output format: `jsonl`, `tsv`, `csv` |
| `append` | `true` | Append to existing file; `false` to overwrite |
| `include_header` | `false` | Include header row for TSV/CSV formats |
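For example, a CSV file sink using the parameters above (the `path` and `inputs` values are illustrative):

```yaml
sinks:
  - id: csv_out
    type: file
    inputs: [api_poller]
    config:
      path: ./out/data.csv
      format: csv
      append: false           # overwrite instead of appending
      include_header: true    # emit a header row for CSV/TSV
```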
## Broadcast Buffer Configuration

Pipeflow uses `tokio::sync::broadcast` channels to connect nodes that can emit messages (sources/transforms). You can tune the broadcast capacity via `output_buffer_size`.

```yaml
global:
  output_buffer_size: 1024  # broadcast channel capacity for sources/transforms
```
Notes:

- Sources can override `output_buffer_size` per source (see the sketch below).
- If a sink/transform lags behind the broadcast buffer, it may drop messages and log `Lagged`.
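A sketch of a per-source override; whether the key sits at the node level or inside `config` is an assumption:

```yaml
sources:
  - id: api_poller
    type: http_client
    output_buffer_size: 4096  # assumed node-level placement
    config:
      url: "https://httpbin.org/json"
```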
## Dead Letter Queue

The `internal::dlq` source is implemented and can be wired to any sink. Chain-depth protection prevents infinite loops when messages are routed through internal sources (max depth: 8).
Current status:

- `internal::dlq` source: Implemented
- Chain-depth protection: Implemented
- Automatic DLQ routing on transform errors: Planned (errors are currently logged)
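A sketch of draining the DLQ into a file sink (using the fixed ID as the node `id` and the `path` value are assumptions):

```yaml
pipeline:
  sources:
    - id: internal::dlq       # fixed ID
      type: internal

  sinks:
    - id: dlq_log
      type: file
      inputs: [internal::dlq]
      config:
        path: ./dlq.jsonl     # illustrative path; jsonl is the file sink default
```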
See `docs/DESIGN.md` for the full design.
## CLI Commands

The invocations below are illustrative; CONFIG may be a file or a directory:

```bash
# Run pipeline
pipeflow run pipeline.yaml

# Validate configuration
pipeflow config validate pipeline.yaml

# Show pipeline graph (ASCII) and the merged + normalized configuration
pipeflow config show pipeline.yaml
```
Notes:

- `pipeflow config validate` checks YAML structure and pipeline wiring (IDs, references, cycles, internal routing). It does not validate node-specific `config` contents (e.g. required `http_client.url`); those are validated during `Engine::build()` (and therefore by `pipeflow run`).
- If you use directory-based configs, `config show` displays the merged + normalized result.
## Development Status
| Component | Status |
|---|---|
| Core Engine | Implemented |
| Config Validation | Implemented |
| HTTP Client Source | Implemented |
| Internal Sources | Implemented |
| Console Sink | Implemented |
| File Sink | Implemented |
| Blackhole Sink | Implemented |
| Internal Sink | Implemented |
| Transform Steps | Implemented |
| Other Nodes | Planned |
| CLI | Implemented |
## Documentation

See `docs/DESIGN.md` for detailed design documentation.
## Testing

```bash
# Unit + integration tests
cargo test

# Lint (clippy)
cargo clippy --all-targets

# Format check
cargo fmt --check
```
## License

MIT