# pipeflow
[![Crates.io](https://img.shields.io/crates/v/pipeflow.svg)](https://crates.io/crates/pipeflow)
[![Documentation](https://docs.rs/pipeflow/badge.svg)](https://docs.rs/pipeflow)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
A lightweight, configuration-driven data pipeline framework for Rust.
```text
Source → Transform → Sink
```
## Features
- **YAML Configuration**: Declarative pipeline definition with DAG validation (duplicate IDs, missing inputs, cycles)
- **Fan-out**: One source can broadcast to multiple sinks
- **Configurable Channel Capacity**: Tune `output_buffer_size` (source broadcast) and `input_buffer_size` (sink fan-in)
- **Built-in Nodes**:
  - Sources: `http_client`
  - Sinks: `console`, `file`, `blackhole`
- **CLI**: `run`, `config validate`, `config show`
## Quick Start
### Installation
```bash
cargo add pipeflow
```
### Configuration
Create a pipeline configuration file `pipeline.yaml`:
```yaml
global:
  # output_buffer_size: broadcast channel capacity for sources (default: 1024)
  output_buffer_size: 1024
  # input_buffer_size: default sink input buffer size (default: 1024)
  input_buffer_size: 1024

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"

  sinks:
    - id: console
      type: console
      inputs: [api_poller]
      config:
        format: pretty
```
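Fan-out needs no special syntax: any sink that lists a source in its `inputs` receives a copy of every message that source emits. A sketch extending the config above (the `to_file` id and the `path` option name are illustrative assumptions, not documented options):

```yaml
pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"

  sinks:
    # Both sinks name the same source, so each message is broadcast to both.
    - id: console
      type: console
      inputs: [api_poller]
    - id: to_file
      type: file
      inputs: [api_poller]
      config:
        path: "api.log"   # assumed option name; check docs/DESIGN.md
```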
### Run (Programmatic)
```rust
use pipeflow::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let config = Config::from_file("pipeline.yaml")?;
    let engine = Engine::from_config(config)?;
    engine.run().await
}
```
## Node Types
### Sources
| Type | Description | Status |
|------|-------------|--------|
| `internal` | Built-in internal sources (`internal::dlq/event/alert`) | Planned |
| `http_client` | HTTP polling | Implemented |
| `http_server` | HTTP push/webhook | Planned |
| `database` | Database polling/CDC | Planned |
| `file` | File watching | Planned |
| `websocket` | WebSocket streaming | Planned |
**Internal Sources** (fixed IDs):
- `internal::dlq` - Dead Letter Queue
- `internal::event` - System events
- `internal::alert` - Alert notifications
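These internal sources are not implemented yet; based on the design above, they are expected to be consumable like any other source id in a sink's `inputs`. A hypothetical sketch of the intended wiring (the `dlq_file` id and `path` option are illustrative assumptions):

```yaml
pipeline:
  sinks:
    # Hypothetical: persist dead-lettered messages once internal sources land.
    - id: dlq_file
      type: file
      inputs: [internal::dlq]
      config:
        path: "dlq.jsonl"   # assumed option name
```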
### Transforms
| Type | Description | Cardinality | Status |
|------|-------------|-------------|--------|
| `remap` | Field mapping | 1:1 | Planned |
| `filter` | Conditional filtering | 1:0/1 | Planned |
| `aggregate` | Window-based aggregation | n:m | Planned |
### Sinks
| Type | Description | Status |
|------|-------------|--------|
| `blackhole` | Discard messages | Implemented |
| `console` | Print to stdout | Implemented |
| `file` | Write to file | Implemented |
| `database` | Database insert | Planned |
| `redis` | Redis operations | Planned |
| `notify` | Email/Telegram/Webhook | Planned |
| `http_api` | HTTP API calls | Planned |
| `internal` | Route to internal source | Planned |
## Buffer Configuration
Pipeflow uses channels to connect nodes:
```yaml
global:
  output_buffer_size: 1024   # broadcast channel capacity for sources
  input_buffer_size: 1024    # default sink input buffer size

pipeline:
  sinks:
    - id: reliable_db
      type: database
      inputs: [transform]
      input_buffer_size: 2048   # override the global setting
```
Notes:
- Sources can override `output_buffer_size` per source.
- Sinks can override `input_buffer_size` per sink.
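A per-source override follows the same pattern as the sink example above (the `fast_poller` id and URL are illustrative):

```yaml
pipeline:
  sources:
    - id: fast_poller
      type: http_client
      output_buffer_size: 4096   # override the global setting for this source
      config:
        url: "https://example.com/data"
        interval: "1s"
```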
## Dead Letter Queue
Dead Letter Queue (DLQ) routing is planned but not implemented yet.
See `docs/DESIGN.md` for the intended design.
## CLI Commands
```bash
# Run pipeline
pipeflow run config.yaml
# Validate configuration
pipeflow config validate config.yaml
# Show merged configuration
pipeflow config show config.yaml --format yaml
```
## Development Status
| Component | Status |
|-----------|--------|
| Core Engine | Implemented |
| Config Validation | Implemented |
| HTTP Client Source | Implemented |
| Internal Sources | Planned |
| Console Sink | Implemented |
| File Sink | Implemented |
| Blackhole Sink | Implemented |
| Transforms | Planned |
| Other Nodes | Planned |
| CLI | Implemented |
## Documentation
See [docs/DESIGN.md](docs/DESIGN.md) for detailed design documentation.
## Testing
```bash
# Unit + integration tests
cargo test --all-features
# Lint (clippy)
cargo clippy --all-targets --all-features -- -D warnings
# Format check
cargo fmt --all -- --check
```
## License
MIT