# pipeflow

[![Crates.io](https://img.shields.io/crates/v/pipeflow.svg)](https://crates.io/crates/pipeflow)
[![Documentation](https://docs.rs/pipeflow/badge.svg)](https://docs.rs/pipeflow)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

[中文文档 (Chinese README)](README_CN.md)

A lightweight, configuration-driven data pipeline framework for Rust.

```text
Source → Transform → Sink
```

## Features

- **YAML Configuration**: Declarative pipeline definition with DAG validation (duplicate IDs, missing inputs/outputs, cycles)
- **Fan-out**: One source or transform can broadcast to multiple downstream nodes
- **Configurable Broadcast Capacity**: Tune `output_buffer_size` (source/transform broadcast)
- **Built-in Nodes**:
  - Sources: `http_client`, `http_server`, `file`, `redis`, `sql`
  - Sinks: `console`, `file`, `blackhole`, `http_client`, `redis`, `sql`, `notify`
- **CLI**: `run`, `config validate`, `config show`, `config graph`

## Feature Flags

Pipeflow uses Cargo features to keep optional dependencies behind flags.

- `http-client` (default): Enables the `http_client` source and sink.
- `http-server`: Enables the `http_server` source.
- `database`: Enables `sql` source and sink.
- `redis`: Enables the `redis` source and sink.
- `file` (default): Enables the `file` source and sink.
- `notify` (default): Enables the `notify` sink.

Core-only build (no optional sources/sinks):

```bash
cargo build --no-default-features
```

If a pipeline config references a node behind a disabled feature, `Engine::build()` returns a
configuration error explaining which feature is required.
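
For example, a downstream crate can opt in to just the nodes it needs via Cargo features (feature names from the list above; the version number here is illustrative):

```toml
[dependencies]
pipeflow = { version = "0.0.4", default-features = false, features = ["http-client", "file"] }
```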

## Quick Start

### Requirements

- Rust 1.92 or later (uses Rust 2024 edition)

### Installation

```bash
cargo add pipeflow
```

### Configuration

Create a pipeline configuration file `pipeline.yaml`:

```yaml
system:
  # output_buffer_size: broadcast channel capacity for sources/transforms (default: 1024)
  output_buffer_size: 1024

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"
        # schedule: "0 0 * * *" # Run daily at 00:00 (local time, 5 fields; seconds default to 0)

  transforms:
    - id: pass_through
      inputs: [api_poller]
      outputs: [console]

  sinks:
    - id: console
      type: console
      config:
        format: pretty
```

#### Wiring Nodes

Pipeflow wiring is `source -> transform -> sink`:

- Transforms declare `inputs` (one or more sources or transforms).
- Transforms declare `outputs` (one or more sinks or transforms).
- Transform-to-transform wiring can be declared on either side; the engine infers the missing side.
- Transforms may omit `steps` to act as pass-through nodes.
- Sources do not declare `inputs` or `outputs`.
- Sinks do not declare `inputs`; their target is defined by sink type/config (e.g. file path).
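
As a sketch (node IDs are illustrative), transform-to-transform wiring declared on only one side:

```yaml
pipeline:
  transforms:
    - id: stage_one
      inputs: [api_poller]
      outputs: [stage_two] # wiring declared here, so...
    - id: stage_two
      outputs: [console]   # ...stage_two may omit `inputs`; the engine infers them
```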

#### Loading from a directory

All commands that accept `CONFIG` also accept a **directory**. When a directory is provided,
pipeflow loads all `*.yaml` / `*.yml` files in **lexical order** and merges them into a single
configuration before normalization and validation.

This is useful for larger pipelines:

```bash
# Directory-based config
pipeflow run ./configs/
pipeflow config validate ./configs/
pipeflow config show ./configs/ --format yaml
```

### Run (Programmatic)

```rust
use pipeflow::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let config = Config::from_file("pipeline.yaml")?;
    let mut engine = Engine::from_config(config)?;
    engine.build().await?;
    engine.run().await
}
```

## Node Types

### Sources

| Type          | Description       | Status      |
| ------------- | ----------------- | ----------- |
| `http_client` | HTTP polling      | Implemented |
| `http_server` | HTTP push/webhook | Implemented |
| `redis`       | Redis GET polling | Implemented |
| `sql`         | SQL polling       | Implemented |
| `file`        | File watching     | Implemented |

**System Sources** (implicit, fixed IDs):

- `source::system::dlq` - Dead Letter Queue
- `source::system::event` - System events
- `source::system::notify` - Notifications

System sources are always available and emit into their corresponding system sinks by default.
You can consume a system source in your pipeline by referencing it in a transform `inputs`.
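
For example, a pass-through transform (no `steps`) that mirrors dead-lettered messages to a console sink (the sink ID is illustrative):

```yaml
pipeline:
  transforms:
    - id: dlq_mirror
      inputs: [source::system::dlq]
      outputs: [console]

  sinks:
    - id: console
      type: console
      config:
        format: pretty
```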

### Transforms

| Type      | Description            | I/O   | Status      |
| --------- | ---------------------- | ----- | ----------- |
| `remap`   | Field mapping (step)   | 1:1   | Implemented |
| `filter`  | Conditional filtering  | 1:0/1 | Implemented |
| `window`  | Time/count aggregation | n:1   | Implemented |
| `compute` | Math expression eval   | 1:1   | Implemented |
| `hash`    | Generate hash IDs      | 1:1   | Implemented |

### Sinks

| Type          | Description            | Status      |
| ------------- | ---------------------- | ----------- |
| `blackhole`   | Discard messages       | Implemented |
| `console`     | Print to stdout        | Implemented |
| `file`        | Write to file          | Implemented |
| `sql`         | SQL database insert    | Implemented |
| `redis`       | Redis operations       | Implemented |
| `notify`      | Email/Telegram/Webhook | Implemented |
| `http_client` | HTTP API calls         | Implemented |

**System Sinks** (implicit, fixed IDs):

- `sink::system::dlq` - Default DLQ output (`data/system_dlq.jsonl`)
- `sink::system::event` - Default event output (`data/system_event.jsonl`)
- `sink::system::notify` - Default notify output (`data/system_notify.jsonl`)

You can override the base directory with `PIPEFLOW_SYSTEM_SINK_DIR` or configure paths in `system.sinks`.

```yaml
system:
  sinks:
    dir: ./data
    dlq: ./data/custom_dlq.jsonl
    event: ./data/custom_event.jsonl
    notify: ./data/custom_notify.jsonl
```

### Configuration Reference

See [docs/CONFIGURATION.md](docs/CONFIGURATION.md) for detailed configuration parameters for all supported sources and sinks.

## Broadcast Buffer Configuration

Pipeflow uses `tokio::sync::broadcast` channels to connect nodes that can emit messages
(sources/transforms). You can tune the broadcast capacity via `output_buffer_size`.

```yaml
system:
  output_buffer_size: 1024 # broadcast channel capacity for sources/transforms
```

Notes:

- Sources can override `output_buffer_size` per source.
- If a sink/transform consumes more slowly than the broadcast buffer fills, the oldest messages are dropped for that consumer and a `Lagged` warning is logged.
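
A per-source override might look like the sketch below; this assumes the per-source key uses the same `output_buffer_size` name as the system-level setting:

```yaml
pipeline:
  sources:
    - id: api_poller
      type: http_client
      output_buffer_size: 4096 # overrides system.output_buffer_size for this source
      config:
        url: "https://httpbin.org/json"
        interval: "10s"
```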

## Dead Letter Queue

The `source::system::dlq` source is implemented and can be wired to any sink. Chain-depth protection
prevents infinite loops when messages are routed through system sources (max depth: 8).

**Current status:**

- `source::system::dlq` source: Implemented
- Chain-depth protection: Implemented
- Automatic DLQ routing on transform/sink errors: Implemented

See `docs/DESIGN.md` for the full design.

## CLI Commands

```bash
# Run pipeline
pipeflow run config.yaml

# Validate configuration
pipeflow config validate config.yaml

# Show pipeline graph (ASCII)
pipeflow config graph config.yaml

# Show merged + normalized configuration
pipeflow config show config.yaml --format yaml
```

Notes:

- `pipeflow config validate` checks YAML structure and pipeline wiring (IDs, references, cycles,
  system routing). It does **not** validate node-specific `config` contents (e.g. required
  `http_client.url`); those are validated during `Engine::build()` (and therefore `pipeflow run`).
- If you use directory-based configs, `config show` displays the merged + normalized result.

## Distributed & High Availability

**Pipeflow is stand-alone by design.**

To keep the architecture simple and robust (KISS principle), Pipeflow does not implement
complex distributed coordination protocols (like Raft or Paxos).

- **Persistence**: State (such as silence records) is stored on the local filesystem (`./data` by default).
  Distributed backends for silence state (e.g. Redis) were removed in favor of simplicity and filesystem atomicity.
- **Scaling**: We recommend **Manual Sharding**. Deploy multiple independent instances,
  each handling a different subset of configuration files.
- **High Availability**: Use health checks (e.g., Kubernetes liveness probes) to restart failed instances.
  If you need shared state across instances (e.g., shared silence), mount a shared volume (NFS/EFS) to the `data_dir`.

## Documentation

See [docs/DESIGN.md](docs/DESIGN.md) for detailed design documentation.

## Testing

```bash
# Unit + integration tests
cargo test --all-features

# Lint (clippy)
cargo clippy --all-targets --all-features -- -D warnings

# Format check
cargo fmt --all -- --check
```

## License

MIT