pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
# pipeflow 设计文档

[English](DESIGN.md)

## 概述

pipeflow 是一个 Rust 编写的轻量级、配置驱动的数据管道框架。
它遵循经典的 ETL 模式,采用 DAG (有向无环图) 执行模型。

```text
Source (源) → Transform (变换) → Sink (汇)
```

注意:这是一份设计文档。某些章节描述的是计划中的功能,尚未完全实现。关于当前行为和支持的节点类型,请参阅 `README.md`。

## 设计决策

| 决策      | 选择                        | 理由                                        |
| --------- | --------------------------- | ------------------------------------------- |
| 数据格式  | `Message { meta, payload }` | 通用元数据 + JSON 负载                      |
| Remap DSL | 类 JSONPath                 | 简单的字段映射,例如 `$.data.price`         |
| 窗口状态  | 非持久化                    | 重启会清除窗口,可接受的权衡                |
| 错误处理  | DLQ + Event + Notify        | 失败消息 → DLQ, 错误 → Event, 警报 → Notify |
| 配置      | YAML + DAG 校验             | 声明式,加载时进行环路检测                  |

## 配置加载与标准化 (已实现)

Pipeflow 通过 `Config::from_file(path)` 加载配置:

- **文件**: 解析单个 YAML 文件。
- **目录**: 按 **字典序** 加载所有 `*.yaml` / `*.yml` 文件并合并它们。

加载后,配置会被 **标准化** 然后 **校验**:

- **系统源/汇**:
  - 内置系统节点是隐式的,始终可用。
  - 系统汇输出路径可通过 `system.sinks` 配置。

校验侧重于 **管道连线语义** (唯一 ID、引用、仅源输入、未使用节点、环路检测和系统路由约束)。节点特定的 `config` 模式(例如 `http_client` 的必填字段)在 `Engine::build()` 期间进行校验。

## 管道连线模型

Pipeflow 支持灵活的 DAG 连线:

- **Sources** 在配置中不声明输入和输出。
- **Transforms** 声明一个或多个 `inputs`(源或其他变换)和一个或多个 `outputs`(汇或其他变换)。
- **Transform chaining** (变换链) 受支持:变换可以连接到其他变换以进行多阶段处理。
- **Transforms** 可以省略 `steps` 以充当直通路由。
- **Transforms** 可以通过 `inputs`/`outputs` 连接到其他变换(任一侧声明即可)。
- **Sinks** 无输入;它们的目标由 sink 类型/配置定义。

变换链示例:

```yaml
transforms:
  - id: stage1
    inputs: [source]
    outputs: [stage2] # 输出到另一个变换
  - id: stage2
    inputs: [stage1] # 从另一个变换输入
    outputs: [sink]
```

## 核心数据模型

### Message (消息)

```rust
pub struct Message {
    pub meta: MessageMeta,
    pub payload: serde_json::Value,
}

pub struct MessageMeta {
    pub id: Uuid,                          // UUIDv7 用于时间排序
    pub timestamp: i64,                    // 创建时间戳 (ms)
    pub source_node: String,               // 发起节点 ID
    pub correlation_id: Option<Uuid>,      // 用于追踪事件链
    pub chain_depth: u8,                   // 防止无限循环
    pub tags: HashMap<String, String>,     // 自定义键值对
}
```

### System Channels (系统通道) (已实现)

系统通道处理错误路由和可观测性:

| 通道   | 用途     | 数据类型  | 描述               |
| ------ | -------- | --------- | ------------------ |
| DLQ    | 失败消息 | `Message` | 处理失败的原始消息 |
| Event  | 错误详情 | `Event`   | 结构化的错误信息   |
| Notify | 用户警报 | `Notify`  | 需要用户关注的通知 |

**路由行为**:

- 变换/汇错误 → DLQ (原始消息) + Event (错误详情)
- 缓冲区溢出 → Event (错误详情) + Notify (用户警报)
- 源错误 → 仅 Event (错误详情)

### PipelineMetrics (管道指标) (已实现)

```rust
pub struct PipelineMetrics {
    pub messages_sent: AtomicU64,      // 源发送的总消息数
    pub messages_received: AtomicU64,  // 汇接收的总消息数
    pub messages_dropped: AtomicU64,   // 丢弃的总消息数 (缓冲区溢出)
    pub errors: AtomicU64,             // 总处理错误数
    pub dlq_sent: AtomicU64,           // 发送到 DLQ 的总消息数
}
```

通过 `Engine::metrics()` 获取当前值的快照。

## 节点类型

### Sources (源)

| 类型          | 描述              | 关键配置                     | 状态   |
| ------------- | ----------------- | ---------------------------- | ------ |
| `http_client` | HTTP 轮询         | url, interval, headers, auth | 已实现 |
| `http_server` | HTTP 推送/webhook | bind, path, auth             | 已实现 |
| `sql`         | SQL 轮询          | connection, query            | 已实现 |
| `file`        | 文件监听          | path, mode                   | 已实现 |
| `redis`       | Redis 键轮询      | url, key, interval           | 已实现 |
| `websocket`   | WebSocket 流      | url, subscribe_msg           | 计划中 |

#### System Sources (系统源)

系统源具有映射到特定系统通道的固定 ID:

| 源 ID                    | 数据结构  | 描述                      |
| ------------------------ | --------- | ------------------------- |
| `source::system::dlq`    | `Message` | 接收处理失败的原始消息    |
| `source::system::event`  | `Event`   | 接收结构化的错误/事件信息 |
| `source::system::notify` | `Notify`  | 接收面向用户的通知/警报   |

系统汇是隐式的,默认写入 JSONL 输出:

- `sink::system::dlq` -> `data/system_dlq.jsonl`
- `sink::system::event` -> `data/system_event.jsonl`
- `sink::system::notify` -> `data/system_notify.jsonl`

默认情况下,每个系统源都会发送到其对应的系统汇。你可以通过将系统源连接到变换中,将其路由到自定义汇。

**Event 结构**:

```rust
pub struct Event {
    pub name: String,                        // 事件名称/类型
    pub payload: serde_json::Value,          // 事件数据
    pub labels: HashMap<String, String>,     // 键值标签
    pub timestamp: i64,                      // 时间戳 (ms)
}
```

**Notify 结构**:

```rust
pub struct Notify {
    pub name: String,
    pub severity: NotifySeverity,             // Info | Warning | Error | Critical
    pub message: String,
    pub labels: HashMap<String, String>,
    pub timestamp: i64,
}

pub enum NotifySeverity {
    Info,
    Warning,
    Error,
    Critical,
}
```

系统源是隐式的,不需要在配置中声明。

### Transforms (变换)

变换使用多步骤管道架构。每个变换包含零个或多个按顺序执行的步骤。

| 步骤类型  | 描述           | I/O 比率 | 状态   |
| --------- | -------------- | -------- | ------ |
| `filter`  | 条件过滤       | 1:0/1    | 已实现 |
| `remap`   | 字段映射       | 1:1      | 已实现 |
| `window`  | 基于窗口的聚合 | n:1      | 已实现 |
| `compute` | 数学表达式计算 | 1:1      | 已实现 |
| `hash`    | 确定性 ID 生成 | 1:1      | 已实现 |

**配置示例**:

```yaml
transforms:
  - id: process_data
    inputs: [source]
    outputs: [sink]
    steps:
      # 步骤 1: 重映射字段
      - type: remap
        config:
          mappings:
            # 从源路径提取
            - from: "$.data.user_id"
              to: "$.user_id"
            # 静态值赋值
            - value: "processed"
              to: "$.status"
            # 带插值的模板字符串
            - from: "User: {{ $.data.name }} (ID: {{ $.data.user_id }})"
              to: "$.description"
          keep_unmapped: false

      # 步骤 2: 过滤 (单个条件)
      - type: filter
        config:
          field: "$.status"
          operator: eq
          value: "active"

      # 步骤 2 替代方案: 过滤 (多个条件)
      - type: filter
        config:
          mode: and # 或 "or"
          conditions:
            - field: "$.status"
              operator: eq
              value: "active"
            - field: "$.score"
              operator: ge
              value: 50
```

**过滤器操作符**:

| 操作符     | 描述             |
| ---------- | ---------------- |
| `eq`       | 等于             |
| `ne`       | 不等于           |
| `gt`       | 大于             |
| `ge`       | 大于或等于       |
| `lt`       | 小于             |
| `le`       | 小于或等于       |
| `abs_gt`   | 绝对值大于       |
| `abs_ge`   | 绝对值大于或等于 |
| `abs_lt`   | 绝对值小于       |
| `abs_le`   | 绝对值小于或等于 |
| `contains` | 字符串包含       |
| `matches`  | 正则匹配         |

**窗口步骤 (已实现)**:

```yaml
- type: window
  config:
    duration: "30s" # 时间触发 (可选)
    size: 10 # 计数触发 (可选, 至少需要一个)
    operation: merge # merge | select_one (默认: merge)
    strategy: first # 用于 select_one: first | last (默认: first)
    max_messages: 10000 # 缓冲区容量 (默认: 10000)
    on_overflow: drop_oldest # drop_oldest | error (默认: drop_oldest)
```

### Sinks (汇)

| 类型          | 描述                  | 状态   |
| ------------- | --------------------- | ------ |
| `blackhole`   | 丢弃 (默认 DLQ)       | 已实现 |
| `console`     | 打印到标准输出        | 已实现 |
| `file`        | 写入文件              | 已实现 |
| `sql`         | SQL 数据库插入        | 已实现 |
| `redis`       | Redis SET/SETEX       | 已实现 |
| `notify`      | 邮件/Telegram/Webhook | 已实现 |
| `http_client` | HTTP API 调用         | 已实现 |

## 死信队列 (DLQ) (已实现)

处理错误会自动路由到系统通道:

```text
┌─────────┐     ┌───────────┐     ┌──────┐
│ Source  │────▶│ Transform │────▶│ Sink │
└────┬────┘     └─────┬─────┘     └───┬──┘
     │                │ error         │ error
     │                ├───────────────┤
     │                │               │
     │                ▼               ▼
     │      ┌─────────────────┐ ┌─────────────────┐
     │      │ source::system::dlq │ │ source::system::event │
     │      │  (original msg) │ │  (error details)│
     │      └─────────────────┘ └─────────────────┘
     └─────▶ source::system::event (error details only)
```

**路由行为**:

- 变换/汇错误 → DLQ (原始消息) + Event (错误详情)
- 缓冲区溢出 (Lagged) → Event (错误详情) + Notify (用户警报)

`source::system::dlq` 源可以连接到任何汇进行自定义处理:

```yaml
pipeline:
  transforms:
    - id: dlq_to_file
      inputs: [source::system::dlq]
      outputs: [dlq_file]

  sinks:
    - id: dlq_file
      type: file
      config:
        path: "./dead_letters.jsonl"
```

### 循环预防

源自 `source::system::dlq` 的消息在失败时不会重新路由到 DLQ,以防止无限循环。消息元数据中的 `chain_depth` 字段提供了额外的保护(最大深度: 8)。

## 核心 Traits

```rust
use async_trait::async_trait;
use tokio::sync::broadcast;

use crate::message::{Message, SharedMessage};

// Source: 生产消息 (通过广播扇出)
#[async_trait]
pub trait Source: Send + Sync {
    fn id(&self) -> &str;
    async fn run(
        &self,
        sender: broadcast::Sender<SharedMessage>,
        shutdown: broadcast::Receiver<()>,
    ) -> Result<()>;
}

// Transform: 处理消息 (1:n)
#[async_trait]
pub trait Transform: Send + Sync {
    fn id(&self) -> &str;
    async fn process(&self, msg: Message) -> Result<Vec<Message>>;
}

// Sink: 消费消息
#[async_trait]
pub trait Sink: Send + Sync {
    fn id(&self) -> &str;
    async fn process(&self, msg: SharedMessage) -> Result<()>;
}
```