# pipeflow

[![Crates.io](https://img.shields.io/crates/v/pipeflow.svg)](https://crates.io/crates/pipeflow)
[![Documentation](https://docs.rs/pipeflow/badge.svg)](https://docs.rs/pipeflow)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

A lightweight, configuration-driven data pipeline framework for Rust.

```text
Source → Transform → Sink
```

## Features

- **YAML Configuration**: Declarative pipeline definition with DAG validation (duplicate IDs, missing inputs, cycles)
- **Fan-out**: One source can broadcast to multiple sinks
- **Configurable Channel Capacity**: Tune `output_buffer_size` (source broadcast) and `input_buffer_size` (sink fan-in)
- **Built-in Nodes**:
  - Sources: `http_client`
  - Sinks: `console`, `file`, `blackhole`
- **CLI**: `run`, `config validate`, `config show`
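
The DAG validation described above can be illustrated with a minimal sketch. This is not pipeflow's actual validator (its internals are not shown here); it is a self-contained illustration of the three checks the feature list names, run over a node/inputs graph like the one a YAML config describes:

```rust
use std::collections::{HashMap, HashSet};

/// A node in the pipeline graph: an ID plus the IDs it reads from.
struct Node {
    id: &'static str,
    inputs: Vec<&'static str>,
}

/// Check the three properties listed above: unique IDs,
/// inputs that refer to existing nodes, and no cycles.
fn validate(nodes: &[Node]) -> Result<(), String> {
    // 1. Duplicate IDs
    let mut ids = HashSet::new();
    for n in nodes {
        if !ids.insert(n.id) {
            return Err(format!("duplicate id: {}", n.id));
        }
    }
    // 2. Missing inputs
    for n in nodes {
        for i in &n.inputs {
            if !ids.contains(i) {
                return Err(format!("missing input: {} -> {}", n.id, i));
            }
        }
    }
    // 3. Cycles, via a Kahn-style topological sort: if not every
    //    node can be scheduled, some nodes form a cycle.
    let mut indegree: HashMap<&str, usize> =
        nodes.iter().map(|n| (n.id, n.inputs.len())).collect();
    let mut downstream: HashMap<&str, Vec<&str>> = HashMap::new();
    for n in nodes {
        for i in &n.inputs {
            downstream.entry(i).or_default().push(n.id);
        }
    }
    let mut ready: Vec<&str> = indegree
        .iter()
        .filter(|(_, d)| **d == 0)
        .map(|(id, _)| *id)
        .collect();
    let mut visited = 0;
    while let Some(id) = ready.pop() {
        visited += 1;
        for &d in downstream.get(id).into_iter().flatten() {
            let e = indegree.get_mut(d).unwrap();
            *e -= 1;
            if *e == 0 {
                ready.push(d);
            }
        }
    }
    if visited != nodes.len() {
        return Err("cycle detected".to_string());
    }
    Ok(())
}

fn main() {
    let ok = [
        Node { id: "api_poller", inputs: vec![] },
        Node { id: "console", inputs: vec!["api_poller"] },
    ];
    assert!(validate(&ok).is_ok());

    let cyclic = [
        Node { id: "a", inputs: vec!["b"] },
        Node { id: "b", inputs: vec!["a"] },
    ];
    assert_eq!(validate(&cyclic), Err("cycle detected".to_string()));
    println!("validation checks pass");
}
```

A config that fails any of these checks is rejected before the engine starts, so wiring errors surface at load time rather than at runtime.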

## Quick Start

### Installation

```bash
cargo add pipeflow
```

### Configuration

Create a pipeline configuration file `pipeline.yaml`:

```yaml
global:
  # output_buffer_size: broadcast channel capacity for sources (default: 1024)
  output_buffer_size: 1024
  # input_buffer_size: default sink input buffer size (default: 1024)
  input_buffer_size: 1024

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"

  sinks:
    - id: console
      type: console
      inputs: [api_poller]
      config:
        format: pretty
```
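
The quick start wires one source to one sink. Fan-out needs no extra configuration: any additional sink that lists the same source in its `inputs` receives a copy of every message. For example, adding a `blackhole` sink alongside the console sink:

```yaml
pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"

  sinks:
    - id: console
      type: console
      inputs: [api_poller]
      config:
        format: pretty

    # Second subscriber to the same source: both sinks
    # receive every message api_poller emits.
    - id: drain
      type: blackhole
      inputs: [api_poller]
```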

### Run (Programmatic)

```rust
use pipeflow::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let config = Config::from_file("pipeline.yaml")?;
    let engine = Engine::from_config(config)?;
    engine.run().await
}
```

## Node Types

### Sources

| Type          | Description                                             | Status      |
| ------------- | ------------------------------------------------------- | ----------- |
| `internal`    | Built-in internal sources (`internal::dlq/event/alert`) | Planned     |
| `http_client` | HTTP polling                                            | Implemented |
| `http_server` | HTTP push/webhook                                       | Planned     |
| `database`    | Database polling/CDC                                    | Planned     |
| `file`        | File watching                                           | Planned     |
| `websocket`   | WebSocket streaming                                     | Planned     |

**Internal Sources** (fixed IDs):

- `internal::dlq` - Dead Letter Queue
- `internal::event` - System events
- `internal::alert` - Alert notifications

### Transforms

| Type        | Description              | I/O   | Status  |
| ----------- | ------------------------ | ----- | ------- |
| `remap`     | Field mapping            | 1:1   | Planned |
| `filter`    | Conditional filtering    | 1:0/1 | Planned |
| `aggregate` | Window-based aggregation | n:m   | Planned |

### Sinks

| Type        | Description              | Status      |
| ----------- | ------------------------ | ----------- |
| `blackhole` | Discard messages         | Implemented |
| `console`   | Print to stdout          | Implemented |
| `file`      | Write to file            | Implemented |
| `database`  | Database insert          | Planned     |
| `redis`     | Redis operations         | Planned     |
| `notify`    | Email/Telegram/Webhook   | Planned     |
| `http_api`  | HTTP API calls           | Planned     |
| `internal`  | Route to internal source | Planned     |

## Buffer Configuration

Pipeflow connects nodes with bounded channels. Capacities are set globally and can be overridden per node:

```yaml
global:
  output_buffer_size: 1024 # broadcast channel capacity for sources
  input_buffer_size: 1024 # default sink input buffer size

pipeline:
  sinks:
    - id: reliable_db
      type: database
      inputs: [transform]
      input_buffer_size: 2048 # overrides the global input_buffer_size
```

Notes:

- Sources can override `output_buffer_size` per source.
- Sinks can override `input_buffer_size` per sink.
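
Why these capacities matter can be shown with a minimal sketch. Pipeflow's actual channel implementation is not shown here; as an analogy, the standard library's bounded `sync_channel` demonstrates what a full buffer does to a fast producer (the capacity of 2 is illustrative, not a pipeflow default):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // A bounded channel with capacity 2, standing in for a sink's
    // input buffer (pipeflow defaults both buffer sizes to 1024).
    let (tx, rx) = sync_channel::<u32>(2);

    let producer = thread::spawn(move || {
        for msg in 0..5 {
            // Once 2 messages sit unread in the buffer, this send
            // blocks until the consumer drains one: backpressure.
            tx.send(msg).unwrap();
        }
        // Dropping tx closes the channel, ending the consumer's loop.
    });

    let received: Vec<u32> = rx.iter().collect();
    producer.join().unwrap();
    assert_eq!(received, vec![0, 1, 2, 3, 4]);
    println!("received {} messages", received.len());
}
```

Larger buffers absorb bursts at the cost of memory; smaller buffers propagate pressure to producers sooner. How pipeflow itself reacts to a full buffer (blocking vs. dropping for slow broadcast subscribers) is up to its channel implementation; the sketch only shows why the capacity knob exists.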

## Dead Letter Queue

Dead Letter Queue (DLQ) routing is planned but not implemented yet.
See `docs/DESIGN.md` for the intended design.

## CLI Commands

```bash
# Run pipeline
pipeflow run config.yaml

# Validate configuration
pipeflow config validate config.yaml

# Show merged configuration
pipeflow config show config.yaml --format yaml
```

## Development Status

| Component          | Status      |
| ------------------ | ----------- |
| Core Engine        | Implemented |
| Config Validation  | Implemented |
| HTTP Client Source | Implemented |
| Internal Sources   | Planned     |
| Console Sink       | Implemented |
| File Sink          | Implemented |
| Blackhole Sink     | Implemented |
| Transforms         | Planned     |
| Other Nodes        | Planned     |
| CLI                | Implemented |

## Documentation

See [docs/DESIGN.md](docs/DESIGN.md) for detailed design documentation.

## Testing

```bash
# Unit + integration tests
cargo test --all-features

# Lint (clippy)
cargo clippy --all-targets --all-features -- -D warnings

# Format check
cargo fmt --all -- --check
```

## License

MIT