pipeflow 0.0.4

# pipeflow Design Document

[中文 (Chinese)](DESIGN_CN.md)

## Overview

pipeflow is a lightweight, configuration-driven data pipeline framework for Rust.
It follows the classic ETL pattern with a DAG (Directed Acyclic Graph) execution model.

```text
Source → Transform → Sink
```

Note: This is a design document. Some sections describe planned functionality that
is not implemented yet. For the current behavior and supported node types, see
`README.md`.

## Design Decisions

| Decision       | Choice                      | Rationale                                              |
| -------------- | --------------------------- | ------------------------------------------------------ |
| Data Format    | `Message { meta, payload }` | Generic metadata + JSON payload                        |
| Remap DSL      | JSONPath-like               | Simple field mapping, e.g., `$.data.price`             |
| Window State   | Non-persistent              | Restart clears windows, acceptable trade-off           |
| Error Handling | DLQ + Event + Notify        | Failed messages → DLQ, errors → Event, alerts → Notify |
| Config         | YAML + DAG validation       | Declarative, cycle detection at load time              |

## Configuration Loading and Normalization (Implemented)

Pipeflow loads configuration via `Config::from_file(path)`:

- **File**: parse a single YAML file.
- **Directory**: load all `*.yaml` / `*.yml` files in **lexical order** and merge them.
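
The directory case can be illustrated with a std-only sketch (not the crate's actual loader): gather `*.yaml` / `*.yml` files and sort them, so the merge order is deterministic.

```rust
use std::{fs, io, path::{Path, PathBuf}};

/// Gather `*.yaml` / `*.yml` files from a config directory in lexical
/// order (illustrative sketch; `Config::from_file` does its own walking).
fn yaml_files_in_lexical_order(dir: &Path) -> io::Result<Vec<PathBuf>> {
    let mut files: Vec<PathBuf> = fs::read_dir(dir)?
        .filter_map(|entry| entry.ok().map(|e| e.path()))
        .filter(|path| {
            matches!(
                path.extension().and_then(|ext| ext.to_str()),
                Some("yaml") | Some("yml")
            )
        })
        .collect();
    files.sort(); // lexical order decides merge precedence
    Ok(files)
}
```

Because later files are merged over earlier ones, prefix conventions such as `10-sources.yml`, `20-sinks.yaml` give predictable results.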

After loading, the configuration is **normalized** and then **validated**:

- **System sources/sinks**:
  - Built-in system nodes are implicit and always available.
  - System sink output paths can be configured via `system.sinks`.

Validation focuses on **pipeline wiring semantics** (unique IDs, references, source-only inputs,
unused nodes, cycle detection, and system routing constraints). Node-specific `config` schemas
(e.g. required fields for `http_client`) are validated during `Engine::build()`.
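
Load-time cycle detection can be done with Kahn's algorithm over the wiring edges; a minimal std-only sketch (the crate's validator may differ in detail, and also enforces the other constraints listed above):

```rust
use std::collections::HashMap;

/// Kahn's algorithm over the wiring edges: true if no cycle exists.
/// (Sketch only; node IDs are plain strings here.)
fn is_acyclic(edges: &[(&str, &str)]) -> bool {
    let mut indegree: HashMap<&str, usize> = HashMap::new();
    let mut outgoing: HashMap<&str, Vec<&str>> = HashMap::new();
    for &(from, to) in edges {
        indegree.entry(from).or_insert(0);
        *indegree.entry(to).or_insert(0) += 1;
        outgoing.entry(from).or_default().push(to);
    }
    // Start from nodes with no incoming edges (the sources).
    let mut ready: Vec<&str> = indegree
        .iter()
        .filter(|&(_, &d)| d == 0)
        .map(|(&node, _)| node)
        .collect();
    let mut visited = 0;
    while let Some(node) = ready.pop() {
        visited += 1;
        for &next in outgoing.get(node).into_iter().flatten() {
            let d = indegree.get_mut(next).unwrap();
            *d -= 1;
            if *d == 0 {
                ready.push(next);
            }
        }
    }
    visited == indegree.len() // unvisited nodes remain ⇒ cycle
}
```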

## Pipeline Wiring Model

Pipeflow supports flexible DAG wiring:

- **Sources** declare no `inputs` or `outputs` in config; downstream transforms reference them in their `inputs`.
- **Transforms** declare one or more `inputs` (sources or other transforms) and one or more `outputs` (sinks or other transforms).
- **Transform chaining** is supported for multi-stage processing; a transform-to-transform link needs to be declared on only one side (the upstream's `outputs` or the downstream's `inputs`).
- **Transforms** may omit `steps` to act as pass-through routers.
- **Sinks** declare no `inputs` in config; they receive messages from transforms that list them in `outputs`, and their external destination is defined by the sink type/config.

Example transform chain:

```yaml
transforms:
  - id: stage1
    inputs: [source]
    outputs: [stage2] # Output to another transform
  - id: stage2
    inputs: [stage1] # Input from another transform
    outputs: [sink]
```

## Core Data Model

### Message

```rust
pub struct Message {
    pub meta: MessageMeta,
    pub payload: serde_json::Value,
}

pub struct MessageMeta {
    pub id: Uuid,                          // UUIDv7 for time-ordering
    pub timestamp: i64,                    // Created timestamp (ms)
    pub source_node: String,               // Originating node ID
    pub correlation_id: Option<Uuid>,      // For tracing event chains
    pub chain_depth: u8,                   // Prevents infinite loops
    pub tags: HashMap<String, String>,     // Custom key-value pairs
}
```

### System Channels (Implemented)

System channels handle error routing and observability:

| Channel | Purpose         | Data Type | Description                             |
| ------- | --------------- | --------- | --------------------------------------- |
| DLQ     | Failed messages | `Message` | Original message that failed processing |
| Event   | Error details   | `Event`   | Structured error information            |
| Notify  | User alerts     | `Notify`  | Notifications requiring user attention  |

**Routing behavior**:

- Transform/Sink errors → DLQ (original message) + Event (error details)
- Buffer overflow → Event (error details) + Notify (user alert)
- Source errors → Event (error details) only

### PipelineMetrics (Implemented)

```rust
pub struct PipelineMetrics {
    pub messages_sent: AtomicU64,      // Total messages sent by sources
    pub messages_received: AtomicU64,  // Total messages received by sinks
    pub messages_dropped: AtomicU64,   // Total messages dropped (buffer overflow)
    pub errors: AtomicU64,             // Total processing errors
    pub dlq_sent: AtomicU64,           // Total messages sent to DLQ
}
```

Access via `Engine::metrics()` to get a snapshot of current values.
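
The snapshot idea can be sketched std-only like this (field subset and the `MetricsSnapshot` type name are illustrative assumptions; `Ordering::Relaxed` suffices because the counters are independent):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
pub struct PipelineMetrics {
    pub messages_sent: AtomicU64,
    pub messages_received: AtomicU64,
    pub errors: AtomicU64,
}

/// Plain-integer copy of the counters at one point in time.
#[derive(Debug, PartialEq)]
pub struct MetricsSnapshot {
    pub messages_sent: u64,
    pub messages_received: u64,
    pub errors: u64,
}

impl PipelineMetrics {
    /// Read each counter once; the snapshot is not atomic across fields,
    /// which is an acceptable trade-off for monitoring.
    pub fn snapshot(&self) -> MetricsSnapshot {
        MetricsSnapshot {
            messages_sent: self.messages_sent.load(Ordering::Relaxed),
            messages_received: self.messages_received.load(Ordering::Relaxed),
            errors: self.errors.load(Ordering::Relaxed),
        }
    }
}
```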

## Node Types

### Sources

| Type          | Description         | Key Config                   | Status      |
| ------------- | ------------------- | ---------------------------- | ----------- |
| `http_client` | HTTP polling        | url, interval, headers, auth | Implemented |
| `http_server` | HTTP push/webhook   | bind, path, auth             | Implemented |
| `sql`         | SQL polling         | connection, query            | Implemented |
| `file`        | File watching       | path, mode                   | Implemented |
| `redis`       | Redis key polling   | url, key, interval           | Implemented |
| `websocket`   | WebSocket streaming | url, subscribe_msg           | Planned     |

#### System Sources

System sources have fixed IDs that map to specific system channels:

| Source ID                | Data Structure | Description                               |
| ------------------------ | -------------- | ----------------------------------------- |
| `source::system::dlq`    | `Message`      | Receives original messages that failed    |
| `source::system::event`  | `Event`        | Receives structured error/event info      |
| `source::system::notify` | `Notify`       | Receives user-facing notifications/alerts |

System sinks are implicit and write JSONL outputs by default:

- `sink::system::dlq` -> `data/system_dlq.jsonl`
- `sink::system::event` -> `data/system_event.jsonl`
- `sink::system::notify` -> `data/system_notify.jsonl`

By default, each system source emits to its corresponding system sink. You can route a system
source to user-defined sinks by wiring it into a transform.

**Event Structure**:

```rust
pub struct Event {
    pub name: String,                        // Event name/type
    pub payload: serde_json::Value,          // Event data
    pub labels: HashMap<String, String>,     // Key-value tags
    pub timestamp: i64,                      // Timestamp (ms)
}
```

**Notify Structure**:

```rust
pub struct Notify {
    pub name: String,
    pub severity: NotifySeverity,             // Info | Warning | Error | Critical
    pub message: String,
    pub labels: HashMap<String, String>,
    pub timestamp: i64,
}

pub enum NotifySeverity {
    Info,
    Warning,
    Error,
    Critical,
}
```

System sources are implicit and do not need to be declared in configuration.

### Transforms

Transforms use a multi-step pipeline architecture. Each transform contains zero or more
steps that execute sequentially.

| Step Type | Description               | I/O Ratio | Status      |
| --------- | ------------------------- | --------- | ----------- |
| `filter`  | Conditional filtering     | 1:0/1     | Implemented |
| `remap`   | Field mapping             | 1:1       | Implemented |
| `window`  | Window-based aggregation  | n:1       | Implemented |
| `compute` | Math expression eval      | 1:1       | Implemented |
| `hash`    | Generate deterministic ID | 1:1       | Implemented |

**Configuration Example**:

```yaml
transforms:
  - id: process_data
    inputs: [source]
    outputs: [sink]
    steps:
      # Step 1: Remap fields
      - type: remap
        config:
          mappings:
            # Extract from source path
            - from: "$.data.user_id"
              to: "$.user_id"
            # Static value assignment
            - value: "processed"
              to: "$.status"
            # Template string with interpolation
            - from: "User: {{ $.data.name }} (ID: {{ $.data.user_id }})"
              to: "$.description"
          keep_unmapped: false

      # Step 2: Filter (single condition)
      - type: filter
        config:
          field: "$.status"
          operator: eq
          value: "active"

      # Step 2 alternative: Filter (multiple conditions)
      - type: filter
        config:
          mode: and # or "or"
          conditions:
            - field: "$.status"
              operator: eq
              value: "active"
            - field: "$.score"
              operator: ge
              value: 50
```
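
The `$.data.user_id`-style paths above are a JSONPath-like subset. A toy std-only resolver over a hypothetical `Value` enum shows the idea (the real implementation operates on `serde_json::Value` and supports more than dotted object access):

```rust
use std::collections::HashMap;

/// Toy value type standing in for `serde_json::Value`.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Num(f64),
    Str(String),
    Obj(HashMap<String, Value>),
}

/// Resolve a `$.a.b`-style path by walking nested objects.
fn resolve<'a>(root: &'a Value, path: &str) -> Option<&'a Value> {
    let mut cur = root;
    for seg in path.strip_prefix("$.")?.split('.') {
        match cur {
            Value::Obj(map) => cur = map.get(seg)?,
            _ => return None, // path descends into a non-object
        }
    }
    Some(cur)
}
```

A `remap` step would resolve each `from` path like this, then write the result at the `to` path of the output message.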

**Filter Operators**:

| Operator   | Description                     |
| ---------- | ------------------------------- |
| `eq`       | Equal                           |
| `ne`       | Not equal                       |
| `gt`       | Greater than                    |
| `ge`       | Greater than or equal           |
| `lt`       | Less than                       |
| `le`       | Less than or equal              |
| `abs_gt`   | Absolute value greater than     |
| `abs_ge`   | Absolute value greater or equal |
| `abs_lt`   | Absolute value less than        |
| `abs_le`   | Absolute value less or equal    |
| `contains` | String contains                 |
| `matches`  | Regex match                     |
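
For the numeric operators, evaluation reduces to a comparison against the extracted field. A hedged std-only sketch (the string operators `contains`/`matches` are omitted here, and the real step compares JSON values rather than raw `f64`s):

```rust
/// Evaluate a numeric filter operator; returns None for operators that
/// do not apply to numbers (illustrative sketch, not the crate's code).
fn eval_numeric_op(op: &str, field: f64, value: f64) -> Option<bool> {
    Some(match op {
        "eq" => field == value,
        "ne" => field != value,
        "gt" => field > value,
        "ge" => field >= value,
        "lt" => field < value,
        "le" => field <= value,
        "abs_gt" => field.abs() > value,
        "abs_ge" => field.abs() >= value,
        "abs_lt" => field.abs() < value,
        "abs_le" => field.abs() <= value,
        _ => return None, // `contains` / `matches` operate on strings
    })
}
```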

**Window Step (Implemented)**:

```yaml
- type: window
  config:
    duration: "30s" # Time trigger (optional)
    size: 10 # Count trigger (optional; at least one of duration/size is required)
    operation: merge # merge | select_one (default: merge)
    strategy: first # For select_one: first | last (default: first)
    max_messages: 10000 # Buffer capacity (default: 10000)
    on_overflow: drop_oldest # drop_oldest | error (default: drop_oldest)
```
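
The count trigger plus `drop_oldest` overflow policy can be sketched with a `VecDeque` (illustrative only; the real step also handles the time trigger and the `merge`/`select_one` operations):

```rust
use std::collections::VecDeque;

/// Count-triggered window buffer with `drop_oldest` overflow handling.
struct CountWindow {
    buf: VecDeque<u64>,  // stand-in for buffered messages
    size: usize,         // count trigger
    max_messages: usize, // buffer capacity
}

impl CountWindow {
    /// Buffer a message; returns the whole batch when the window fires.
    fn push(&mut self, msg: u64) -> Option<Vec<u64>> {
        if self.buf.len() == self.max_messages {
            self.buf.pop_front(); // on_overflow: drop_oldest
        }
        self.buf.push_back(msg);
        if self.buf.len() >= self.size {
            // Window fires: drain the buffer as one batch.
            Some(self.buf.drain(..).collect())
        } else {
            None
        }
    }
}
```

Because window state lives only in this in-memory buffer, a restart clears it, which matches the non-persistent trade-off in the design-decisions table.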

### Sinks

| Type          | Description            | Status      |
| ------------- | ---------------------- | ----------- |
| `blackhole`   | Discard (default DLQ)  | Implemented |
| `console`     | Print to stdout        | Implemented |
| `file`        | Write to file          | Implemented |
| `sql`         | SQL Database insert    | Implemented |
| `redis`       | Redis SET/SETEX        | Implemented |
| `notify`      | Email/Telegram/Webhook | Implemented |
| `http_client` | HTTP API calls         | Implemented |

## Dead Letter Queue (DLQ) (Implemented)

Processing errors are automatically routed to system channels:

```text
┌─────────┐     ┌───────────┐     ┌──────┐
│ Source  │────▶│ Transform │────▶│ Sink │
└────┬────┘     └─────┬─────┘     └───┬──┘
     │                │ error         │ error
     │                ├───────────────┤
     │                │               │
     │                ▼               ▼
     │   ┌─────────────────────┐ ┌───────────────────────┐
     │   │ source::system::dlq │ │ source::system::event │
     │   │  (original message) │ │    (error details)    │
     │   └─────────────────────┘ └───────────────────────┘
     └─────▶ source::system::event (error details only)
```


The `source::system::dlq` source can be connected to any sink for custom handling:

```yaml
pipeline:
  transforms:
    - id: dlq_to_file
      inputs: [source::system::dlq]
      outputs: [dlq_file]

  sinks:
    - id: dlq_file
      type: file
      config:
        path: "./dead_letters.jsonl"
```

### Cycle Prevention

Messages originating from `source::system::dlq` are not re-routed to DLQ on failure
to prevent infinite loops. The `chain_depth` field in message metadata provides
additional protection (max depth: 8).
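
The two guards combine into one routing check; a sketch with a trimmed-down metadata struct (the function and constant names are assumptions, not the crate's API):

```rust
const MAX_CHAIN_DEPTH: u8 = 8;

/// Trimmed-down stand-in for `MessageMeta`.
struct MetaLite {
    source_node: String,
    chain_depth: u8,
}

/// A failed message is re-routed to the DLQ only if it did not itself
/// originate from the DLQ source and its chain depth is within bounds.
fn should_route_to_dlq(meta: &MetaLite) -> bool {
    meta.source_node != "source::system::dlq" && meta.chain_depth < MAX_CHAIN_DEPTH
}
```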

## Core Traits

```rust
use async_trait::async_trait;
use tokio::sync::broadcast;

use crate::message::{Message, SharedMessage};

// Source: Produces messages (fan-out via broadcast)
#[async_trait]
pub trait Source: Send + Sync {
    fn id(&self) -> &str;
    async fn run(
        &self,
        sender: broadcast::Sender<SharedMessage>,
        shutdown: broadcast::Receiver<()>,
    ) -> Result<()>;
}

// Transform: Processes messages (1:n)
#[async_trait]
pub trait Transform: Send + Sync {
    fn id(&self) -> &str;
    async fn process(&self, msg: Message) -> Result<Vec<Message>>;
}

// Sink: Consumes messages
#[async_trait]
pub trait Sink: Send + Sync {
    fn id(&self) -> &str;
    async fn process(&self, msg: SharedMessage) -> Result<()>;
}
```