pipeflow
A lightweight, configuration-driven data pipeline framework for Rust.
Source → Transform → Sink
Features
- YAML Configuration: Declarative pipeline definition with DAG validation (duplicate IDs, missing inputs, cycles)
- Fan-out: One source can broadcast to multiple sinks
- Configurable Channel Capacity: Tune `output_buffer_size` (source broadcast) and `input_buffer_size` (sink fan-in)
- Built-in Nodes:
  - Sources: `http_client`
  - Sinks: `console`, `file`, `blackhole`
- CLI: `run`, `config validate`, `config show`
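The three DAG checks named in the feature list (duplicate IDs, missing inputs, cycles) can be sketched as follows. This is a minimal illustration, not pipeflow's actual validator: the `Node` and `ValidationError` types and the `validate` function are hypothetical names, and Kahn's algorithm is one assumed way to detect cycles.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical, simplified node: an ID plus the IDs of the nodes it reads from.
#[derive(Debug, Clone)]
pub struct Node {
    pub id: String,
    pub inputs: Vec<String>,
}

/// Hypothetical error type covering the three checks.
#[derive(Debug, PartialEq, Eq)]
pub enum ValidationError {
    DuplicateId(String),
    MissingInput { node: String, input: String },
    Cycle,
}

/// Runs the three checks in order and returns the first problem found.
pub fn validate(nodes: &[Node]) -> Result<(), ValidationError> {
    // 1. Duplicate IDs: every node ID must be unique.
    let mut ids = HashSet::new();
    for n in nodes {
        if !ids.insert(n.id.as_str()) {
            return Err(ValidationError::DuplicateId(n.id.clone()));
        }
    }

    // 2. Missing inputs: every referenced input must be a known node ID.
    for n in nodes {
        for input in &n.inputs {
            if !ids.contains(input.as_str()) {
                return Err(ValidationError::MissingInput {
                    node: n.id.clone(),
                    input: input.clone(),
                });
            }
        }
    }

    // 3. Cycles (Kahn's algorithm): repeatedly process nodes whose inputs
    //    have all been processed; anything left over is part of a cycle.
    let mut pending: HashMap<&str, usize> =
        nodes.iter().map(|n| (n.id.as_str(), n.inputs.len())).collect();
    let mut ready: Vec<&str> = pending
        .iter()
        .filter(|(_, count)| **count == 0)
        .map(|(id, _)| *id)
        .collect();
    let mut processed = 0;
    while let Some(done) = ready.pop() {
        processed += 1;
        for n in nodes {
            let uses = n.inputs.iter().filter(|i| i.as_str() == done).count();
            if uses > 0 {
                let count = pending.get_mut(n.id.as_str()).unwrap();
                *count -= uses;
                if *count == 0 {
                    ready.push(n.id.as_str());
                }
            }
        }
    }
    if processed == nodes.len() {
        Ok(())
    } else {
        Err(ValidationError::Cycle)
    }
}
```

A source is a node with no inputs, so a pipeline of one source feeding one sink passes all three checks, while two nodes that list each other as inputs fail with `Cycle`.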
Quick Start
Installation
Configuration
Create a pipeline configuration file pipeline.yaml:
global:
  # output_buffer_size: broadcast channel capacity for sources (default: 1024)
  output_buffer_size: 1024
  # input_buffer_size: default sink input buffer size (default: 1024)
  input_buffer_size: 1024

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"

  sinks:
    - id: console
      type: console
      inputs: [api_poller]
      config:
        format: pretty
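The `interval: "10s"` value above is a human-readable duration string. How pipeflow parses it is not shown in this README; the sketch below is one plausible approach using only the standard library. The function name `parse_interval` and the accepted suffixes (`ms`, `s`, `m`, `h`) are assumptions for illustration.

```rust
use std::time::Duration;

/// Parses strings such as "500ms", "10s", "5m" or "1h" into a `Duration`.
/// The set of accepted suffixes is an assumption made for this sketch.
pub fn parse_interval(text: &str) -> Option<Duration> {
    let text = text.trim();
    // Split at the first non-digit character: "10s" becomes ("10", "s").
    // A string with no unit at all (e.g. "10") is rejected here.
    let split = text.find(|c: char| !c.is_ascii_digit())?;
    let (number, unit) = text.split_at(split);
    let value: u64 = number.parse().ok()?;
    match unit {
        "ms" => Some(Duration::from_millis(value)),
        "s" => Some(Duration::from_secs(value)),
        "m" => value.checked_mul(60).map(Duration::from_secs),
        "h" => value.checked_mul(3600).map(Duration::from_secs),
        _ => None,
    }
}
```

Returning `None` for a missing or unknown unit keeps a typo such as `interval: "10"` from being silently read as seconds or milliseconds.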
Run (Programmatic)
use *;
async
Node Types
Sources
| Type | Description | Status |
|---|---|---|
| `internal` | Built-in internal sources (`internal::dlq`/`event`/`alert`) | Planned |
| `http_client` | HTTP polling | Implemented |
| `http_server` | HTTP push/webhook | Planned |
| `database` | Database polling/CDC | Planned |
| `file` | File watching | Planned |
| `websocket` | WebSocket streaming | Planned |
Internal Sources (fixed IDs):
- `internal::dlq` - Dead Letter Queue
- `internal::event` - System events
- `internal::alert` - Alert notifications
Transforms
| Type | Description | I/O | Status |
|---|---|---|---|
| `remap` | Field mapping | 1:1 | Planned |
| `filter` | Conditional filtering | 1:0/1 | Planned |
| `aggregate` | Window-based aggregation | n:m | Planned |
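The I/O column describes how many messages each transform emits per input. Since transforms are still planned, the sketch below only illustrates those three cardinalities. The `Message` type, the `Transform` trait, and the `Remap`, `Filter`, and `CountWindow` structs are all hypothetical and not part of pipeflow's API.

```rust
use std::collections::BTreeMap;

/// Hypothetical message type: a flat map of field names to string values.
pub type Message = BTreeMap<String, String>;

/// Hypothetical transform interface: each input may yield zero or more outputs.
pub trait Transform {
    fn apply(&mut self, msg: Message) -> Vec<Message>;
    /// Emits anything still buffered when the input ends.
    fn flush(&mut self) -> Vec<Message> {
        Vec::new()
    }
}

/// 1:1, renames one field and always emits exactly one message.
pub struct Remap {
    pub from: String,
    pub to: String,
}

impl Transform for Remap {
    fn apply(&mut self, mut msg: Message) -> Vec<Message> {
        if let Some(value) = msg.remove(&self.from) {
            msg.insert(self.to.clone(), value);
        }
        vec![msg]
    }
}

/// 1:0/1, emits the message only if `field` equals `equals`.
pub struct Filter {
    pub field: String,
    pub equals: String,
}

impl Transform for Filter {
    fn apply(&mut self, msg: Message) -> Vec<Message> {
        if msg.get(&self.field) == Some(&self.equals) {
            vec![msg]
        } else {
            Vec::new()
        }
    }
}

/// n:m, emits one `count` summary for every `size` inputs (a count-based window).
pub struct CountWindow {
    pub size: usize,
    pub seen: usize,
}

impl CountWindow {
    fn summary(count: usize) -> Message {
        let mut out = Message::new();
        out.insert("count".to_string(), count.to_string());
        out
    }
}

impl Transform for CountWindow {
    fn apply(&mut self, _msg: Message) -> Vec<Message> {
        self.seen += 1;
        if self.seen == self.size {
            self.seen = 0;
            vec![Self::summary(self.size)]
        } else {
            Vec::new()
        }
    }

    fn flush(&mut self) -> Vec<Message> {
        if self.seen == 0 {
            return Vec::new();
        }
        let count = std::mem::take(&mut self.seen);
        vec![Self::summary(count)]
    }
}
```

Returning a `Vec` from `apply` lets one signature cover all three rows of the table: `Remap` always returns one element, `Filter` returns zero or one, and `CountWindow` returns nothing until a window closes.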
Sinks
| Type | Description | Status |
|---|---|---|
| `blackhole` | Discard messages | Implemented |
| `console` | Print to stdout | Implemented |
| `file` | Write to file | Implemented |
| `database` | Database insert | Planned |
| `redis` | Redis operations | Planned |
| `notify` | Email/Telegram/Webhook | Planned |
| `http_api` | HTTP API calls | Planned |
| `internal` | Route to internal source | Planned |
Buffer Configuration
Pipeflow uses channels to connect nodes:
global:
  output_buffer_size: 1024  # broadcast channel capacity for sources
  input_buffer_size: 1024   # default sink input buffer size

pipeline:
  sinks:
    - id: reliable_db
      type: database
      inputs:
      input_buffer_size: 2048  # Override global settings
Notes:
- Sources can override `output_buffer_size` per source.
- Sinks can override `input_buffer_size` per sink.
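The per-sink override and its effect on channel capacity can be sketched as follows. This is an illustration only: the `Global` and `SinkConfig` structs and the functions are hypothetical, and the standard library's bounded `sync_channel` stands in for whatever channel type pipeflow uses internally.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical mirror of the `global` section (sink side only).
pub struct Global {
    pub input_buffer_size: usize,
}

/// Hypothetical mirror of one sink entry; `None` means "use the global value".
pub struct SinkConfig {
    pub id: String,
    pub input_buffer_size: Option<usize>,
}

/// The per-sink value wins; otherwise fall back to the global default.
pub fn effective_input_buffer(global: &Global, sink: &SinkConfig) -> usize {
    sink.input_buffer_size.unwrap_or(global.input_buffer_size)
}

/// Creates one bounded channel per sink, sized by its effective buffer.
pub fn connect(
    global: &Global,
    sinks: &[SinkConfig],
) -> (Vec<SyncSender<String>>, Vec<Receiver<String>>) {
    sinks
        .iter()
        .map(|sink| sync_channel(effective_input_buffer(global, sink)))
        .unzip()
}

/// Fan-out: offers a copy of `msg` to every sink without blocking and
/// returns how many sinks had room to accept it.
pub fn broadcast(senders: &[SyncSender<String>], msg: &str) -> usize {
    senders
        .iter()
        .filter(|tx| tx.try_send(msg.to_string()).is_ok())
        .count()
}
```

With the configuration above, `reliable_db` would resolve to 2048 and any sink without an override to 1024. A full buffer makes `try_send` fail in this sketch; whether pipeflow drops, blocks, or reports in that situation is not stated here.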
Dead Letter Queue
Dead Letter Queue (DLQ) routing is planned but not implemented yet.
See docs/DESIGN.md for the intended design.
CLI Commands
# Run pipeline
# Validate configuration
# Show merged configuration
Development Status
| Component | Status |
|---|---|
| Core Engine | Implemented |
| Config Validation | Implemented |
| HTTP Client Source | Implemented |
| Internal Sources | Planned |
| Console Sink | Implemented |
| Blackhole Sink | Implemented |
| Transforms | Planned |
| Other Nodes | Planned |
| CLI | Implemented |
Documentation
See docs/DESIGN.md for detailed design documentation.
Testing
# Unit + integration tests
# Lint (clippy)
# Format check
License
MIT