
pipeflow


A lightweight, configuration-driven data pipeline framework for Rust.

Source → Transform → Sink

Features

  • YAML Configuration: Declarative pipeline definition with DAG validation (duplicate IDs, missing inputs, cycles; see the rejected example after this list)
  • Fan-out: One source can broadcast to multiple sinks
  • Configurable Channel Capacity: Tune output_buffer_size (source broadcast) and input_buffer_size (sink fan-in)
  • Built-in Nodes:
    • Sources: http_client
    • Sinks: console, file, blackhole
  • CLI: run, config validate, config show
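
For a sense of what validation catches, here is a config that config validate should reject, since the sink references an input id that nothing defines (a sketch; the exact error output is up to the implementation):

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"

  sinks:
    - id: console
      type: console
      inputs: [no_such_source] # unknown id: rejected during DAG validation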

Quick Start

Installation

cargo add pipeflow
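
The CLI commands shown later assume the pipeflow binary is available; if the crate ships one (as the CLI section suggests), the usual route is:

cargo install pipeflow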

Configuration

Create a pipeline configuration file pipeline.yaml:

global:
  # output_buffer_size: broadcast channel capacity for sources (default: 1024)
  output_buffer_size: 1024
  # input_buffer_size: default sink input buffer size (default: 1024)
  input_buffer_size: 1024

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"

  sinks:
    - id: console
      type: console
      inputs: [api_poller]
      config:
        format: pretty
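
Fan-out needs no special syntax: listing the same source id in several sinks' inputs broadcasts each message to all of them. A minimal sketch; the file sink's path key is an assumption, not confirmed above:

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"

  sinks:
    - id: console
      type: console
      inputs: [api_poller]
    - id: audit_log
      type: file
      inputs: [api_poller] # same input: api_poller broadcasts to both sinks
      config:
        path: "audit.log" # hypothetical config key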

Run (Programmatic)

use pipeflow::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let config = Config::from_file("pipeline.yaml")?;
    let engine = Engine::from_config(config)?;
    engine.run().await
}
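
The same pipeline runs without any Rust code via the CLI (see CLI Commands below):

pipeflow run pipeline.yaml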

Node Types

Sources

Type         Description                                             Status
internal     Built-in internal sources (internal::dlq/event/alert)  Planned
http_client  HTTP polling                                            Implemented
http_server  HTTP push/webhook                                       Planned
database     Database polling/CDC                                    Planned
file         File watching                                           Planned
websocket    WebSocket streaming                                     Planned

Internal Sources (fixed IDs):

  • internal::dlq - Dead Letter Queue
  • internal::event - System events
  • internal::alert - Alert notifications
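
Once implemented, these should be consumable like any other source by naming their fixed id in a sink's inputs. A sketch of the intended wiring, not runnable today (the file sink's path key is again an assumption):

pipeline:
  sinks:
    - id: dlq_log
      type: file
      inputs: [internal::dlq] # drain dead-lettered messages to a file
      config:
        path: "dlq.log" # hypothetical config key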

Transforms

Type       Description               I/O    Status
remap      Field mapping             1:1    Planned
filter     Conditional filtering     1:0/1  Planned
aggregate  Window-based aggregation  n:m    Planned
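
None of these exist yet. Going by the Source → Transform → Sink model and the later sink example that reads inputs: [transform], a transform is presumably a node with its own id and inputs; a speculative sketch, in which the transforms key, the condition field, and its expression syntax are all assumptions:

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"

  transforms: # assumed key; transforms are not implemented yet
    - id: drop_errors
      type: filter
      inputs: [api_poller]
      config:
        condition: ".status == 200" # hypothetical expression syntax

  sinks:
    - id: console
      type: console
      inputs: [drop_errors]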

Sinks

Type       Description               Status
blackhole  Discard messages          Implemented
console    Print to stdout           Implemented
file       Write to file             Implemented
database   Database insert           Planned
redis      Redis operations          Planned
notify     Email/Telegram/Webhook    Planned
http_api   HTTP API calls            Planned
internal   Route to internal source  Planned

Buffer Configuration

Pipeflow uses channels to connect nodes:

global:
  output_buffer_size: 1024 # broadcast channel capacity for sources
  input_buffer_size: 1024 # default sink input buffer size

pipeline:
  sinks:
    - id: reliable_db
      type: database
      inputs: [transform]
      input_buffer_size: 2048 # overrides the global input_buffer_size

Notes:

  • Sources can override output_buffer_size per source (sketched below).
  • Sinks can override input_buffer_size per sink.
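
A per-source override might look like this (field placement assumed to mirror the per-sink override above):

pipeline:
  sources:
    - id: api_poller
      type: http_client
      output_buffer_size: 4096 # overrides the global broadcast capacity
      config:
        url: "https://httpbin.org/json"
        interval: "10s"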

Dead Letter Queue

Dead Letter Queue (DLQ) routing is planned but not implemented yet. See docs/DESIGN.md for the intended design.

CLI Commands

# Run pipeline
pipeflow run config.yaml

# Validate configuration
pipeflow config validate config.yaml

# Show merged configuration
pipeflow config show config.yaml --format yaml

Development Status

Component           Status
Core Engine         Implemented
Config Validation   Implemented
HTTP Client Source  Implemented
Internal Sources    Planned
Console Sink        Implemented
File Sink           Implemented
Blackhole Sink      Implemented
Transforms          Planned
Other Nodes         Planned
CLI                 Implemented

Documentation

See docs/DESIGN.md for detailed design documentation.

Testing

# Unit + integration tests
cargo test --all-features

# Lint (clippy)
cargo clippy --all-targets --all-features -- -D warnings

# Format check
cargo fmt --all -- --check

License

MIT