pipeflow

A lightweight, configuration-driven data pipeline framework for Rust.

Source → Transform → Sink

Features

  • YAML Configuration: Declarative pipeline definition with DAG validation (duplicate IDs, missing inputs, cycles)
  • Fan-out: One source can broadcast to multiple sinks
  • Configurable Broadcast Capacity: Tune output_buffer_size (source/transform broadcast)
  • Built-in Nodes:
    • Sources: http_client, internal
    • Sinks: console, file, blackhole, internal
  • CLI: run, config validate, config show, config graph

Feature Flags

Pipeflow uses Cargo features to keep optional dependencies behind flags.

  • http-client (default): Enables the http_client source.
  • file (default): Enables the file sink.

Core-only build (no optional sources/sinks):

cargo build --no-default-features

If a pipeline config references a node behind a disabled feature, Engine::build() returns a configuration error explaining which feature is required.
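
For example, you can combine --no-default-features with an explicit feature list to build only what a config needs (feature names as listed above):

cargo build --no-default-features --features file
cargo add pipeflow --no-default-features --features http-client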

Quick Start

Requirements

  • Rust 1.92 or later (uses Rust 2024 edition)

Installation

cargo add pipeflow

Configuration

Create a pipeline configuration file pipeline.yaml:

global:
  # output_buffer_size: broadcast channel capacity for sources/transforms (default: 1024)
  output_buffer_size: 1024

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"

  sinks:
    - id: console
      type: console
      inputs: [api_poller]
      config:
        format: pretty
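
Then run it:

pipeflow run pipeline.yaml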

Wiring Nodes: inputs vs outputs

Pipeflow supports two equivalent ways to define edges:

  • Pull-style (recommended): declare inputs on transforms/sinks.
  • Push-style: declare outputs on sources/transforms and omit inputs on consumers. Pipeflow will normalize outputs into downstream inputs at load time.

Example (push-style):

pipeline:
  sources:
    - id: api_poller
      type: http_client
      outputs: [console]
      config:
        url: "https://httpbin.org/json"

  sinks:
    - id: console
      type: console
      # inputs omitted: will be filled from outputs during normalization
      config:
        format: pretty

Loading from a directory

All commands that accept CONFIG also accept a directory. When a directory is provided, pipeflow loads all *.yaml / *.yml files in lexical order and merges them into a single configuration before normalization and validation.

This is useful for larger pipelines:

# Directory-based config
pipeflow run ./configs/
pipeflow config validate ./configs/
pipeflow config show ./configs/ --format yaml
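
For example, a larger pipeline might keep sources and sinks in separate files. A sketch where each file contributes a disjoint part of pipeline (how overlapping keys merge is up to the loader):

# configs/01-sources.yaml
pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"

# configs/02-sinks.yaml
pipeline:
  sinks:
    - id: console
      type: console
      inputs: [api_poller]
      config:
        format: pretty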

Run (Programmatic)

use pipeflow::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Load and parse the YAML configuration.
    let config = Config::from_file("pipeline.yaml")?;
    let mut engine = Engine::from_config(config)?;
    // build() instantiates nodes and validates node-specific config.
    engine.build().await?;
    // Run the pipeline to completion.
    engine.run().await
}

Node Types

Sources

Type         Description                                             Status
internal     Built-in internal sources (internal::dlq/event/alert)  Implemented
http_client  HTTP polling                                            Implemented
http_server  HTTP push/webhook                                       Planned
database     Database polling/CDC                                    Planned
file         File watching                                           Planned
websocket    WebSocket streaming                                     Planned

Internal Sources (fixed IDs):

  • internal::dlq - Dead Letter Queue
  • internal::event - System events
  • internal::alert - Alert notifications

To route pipeline data into an internal channel, use the internal sink with a target and make sure the corresponding internal source is configured (e.g., internal::event).

Note: As a convenience, an internal sink without a target is normalized to a console sink with format: json.
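
A minimal sketch of that routing, assuming internal sources are declared by their fixed IDs with type: internal and that the sink's destination is given by a target key under config (check the crate's schema for the exact spelling):

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"

    # Fixed-ID internal source; re-emits messages routed to internal::event
    - id: internal::event
      type: internal

  sinks:
    # internal sink: forwards its inputs into the internal::event channel
    - id: route_events
      type: internal
      inputs: [api_poller]
      config:
        target: internal::event

    # Downstream consumers read the internal source like any other
    - id: console
      type: console
      inputs: [internal::event]
      config:
        format: pretty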

Transforms

Type       Description               I/O    Status
remap      Field mapping (step)      1:1    Implemented
filter     Conditional filtering     1:0/1  Implemented
aggregate  Window-based aggregation  n:m    Planned
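
A hypothetical sketch of where a transform sits in the wiring. The transforms key and the filter config shown here are illustrative only, not the crate's documented schema:

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"

  transforms:
    # filter: passes or drops each message (1:0/1)
    - id: keep_errors
      type: filter
      inputs: [api_poller]
      config:
        condition: ".status >= 400"   # illustrative field name

  sinks:
    - id: console
      type: console
      inputs: [keep_errors]
      config:
        format: pretty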

Sinks

Type       Description               Status
blackhole  Discard messages          Implemented
console    Print to stdout           Implemented
file       Write to file             Implemented
database   Database insert           Planned
redis      Redis operations          Planned
notify     Email/Telegram/Webhook    Planned
http_api   HTTP API calls            Planned
internal   Route to internal source  Implemented

Console Sink Configuration

Parameter  Default  Description
format     pretty   Output format: pretty (indented JSON), json (compact), text (payload only)
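
For example, switching to compact single-line JSON:

sinks:
  - id: console
    type: console
    inputs: [api_poller]
    config:
      format: json   # compact single-line output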

File Sink Configuration

Parameter       Default     Description
path            (required)  Output file path
format          jsonl       Output format: jsonl, tsv, csv
append          true        Append to existing file; false to overwrite
include_header  false       Include header row for TSV/CSV formats
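
For example, writing CSV with a header row and overwriting the file on each run (all parameters as documented above; the id and path are illustrative):

sinks:
  - id: csv_out
    type: file
    inputs: [api_poller]
    config:
      path: ./out/data.csv
      format: csv
      append: false          # overwrite instead of appending
      include_header: true   # emit a header row for CSV/TSV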

Broadcast Buffer Configuration

Pipeflow uses tokio::sync::broadcast channels to connect nodes that can emit messages (sources/transforms). You can tune the broadcast capacity via output_buffer_size.

global:
  output_buffer_size: 1024 # broadcast channel capacity for sources/transforms

Notes:

  • Sources can override output_buffer_size per source (see the sketch below).
  • If a sink or transform falls behind by more than the buffer capacity, it may miss the oldest messages and log a Lagged warning.
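
A sketch of the per-source override. The global default and the key name come from above; its exact placement on the source entry is an assumption:

global:
  output_buffer_size: 1024        # default for all sources/transforms

pipeline:
  sources:
    - id: api_poller
      type: http_client
      output_buffer_size: 4096    # assumed placement of the per-source override
      config:
        url: "https://httpbin.org/json"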

Dead Letter Queue

The internal::dlq source is implemented and can be wired to any sink. Chain-depth protection prevents infinite loops when messages are routed through internal sources (max depth: 8).
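
For example, the DLQ can be drained to a file sink (a sketch reusing the fixed internal::dlq ID and the file sink parameters documented above):

pipeline:
  sources:
    # Fixed-ID internal source carrying dead-lettered messages
    - id: internal::dlq
      type: internal

  sinks:
    - id: dlq_log
      type: file
      inputs: [internal::dlq]
      config:
        path: ./dlq.jsonl
        format: jsonl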

Current status:

  • internal::dlq source: Implemented
  • Chain-depth protection: Implemented
  • Automatic DLQ routing on transform errors: Planned (errors are currently logged)

See docs/DESIGN.md for the full design.

CLI Commands

# Run pipeline
pipeflow run config.yaml

# Validate configuration
pipeflow config validate config.yaml

# Show pipeline graph (ASCII)
pipeflow config graph config.yaml

# Show merged + normalized configuration
pipeflow config show config.yaml --format yaml

Notes:

  • pipeflow config validate checks YAML structure and pipeline wiring (IDs, references, cycles, internal routing). It does not validate node-specific config contents (e.g. required http_client.url); those are validated during Engine::build() (and therefore pipeflow run).
  • If you use directory-based configs, config show displays the merged + normalized result.

Development Status

Component           Status
Core Engine         Implemented
Config Validation   Implemented
HTTP Client Source  Implemented
Internal Sources    Implemented
Console Sink        Implemented
File Sink           Implemented
Blackhole Sink      Implemented
Internal Sink       Implemented
Transform Steps     Implemented
Other Nodes         Planned
CLI                 Implemented

Documentation

See docs/DESIGN.md for detailed design documentation.

Testing

# Unit + integration tests
cargo test --all-features

# Lint (clippy)
cargo clippy --all-targets --all-features -- -D warnings

# Format check
cargo fmt --all -- --check

License

MIT