# pipeflow 设计文档
[English](DESIGN.md)
## 概述
pipeflow 是一个 Rust 编写的轻量级、配置驱动的数据管道框架。
它遵循经典的 ETL 模式,采用 DAG (有向无环图) 执行模型。
```text
Source (源) → Transform (变换) → Sink (汇)
```
注意:这是一份设计文档。某些章节描述的是计划中的功能,尚未完全实现。关于当前行为和支持的节点类型,请参阅 `README.md`。
## 设计决策
| 数据格式 | `Message { meta, payload }` | 通用元数据 + JSON 负载 |
| Remap DSL | 类 JSONPath | 简单的字段映射,例如 `$.data.price` |
| 窗口状态 | 非持久化 | 重启会清除窗口,可接受的权衡 |
| 错误处理 | DLQ + Event + Notify | 失败消息 → DLQ, 错误 → Event, 警报 → Notify |
| 配置 | YAML + DAG 校验 | 声明式,加载时进行环路检测 |
## 配置加载与标准化 (已实现)
Pipeflow 通过 `Config::from_file(path)` 加载配置:
- **文件**: 解析单个 YAML 文件。
- **目录**: 按 **字典序** 加载所有 `*.yaml` / `*.yml` 文件并合并它们。
加载后,配置会被 **标准化** 然后 **校验**:
- **系统源/汇**:
- 内置系统节点是隐式的,始终可用。
- 系统汇输出路径可通过 `system.sinks` 配置。
校验侧重于 **管道连线语义** (唯一 ID、引用、仅源输入、未使用节点、环路检测和系统路由约束)。节点特定的 `config` 模式(例如 `http_client` 的必填字段)在 `Engine::build()` 期间进行校验。
## 管道连线模型
Pipeflow 支持灵活的 DAG 连线:
- **Sources** 在配置中不声明输入和输出。
- **Transforms** 声明一个或多个 `inputs`(源或其他变换)和一个或多个 `outputs`(汇或其他变换)。
- **Transform chaining** (变换链) 受支持:变换可以连接到其他变换以进行多阶段处理。
- **Transforms** 可以省略 `steps` 以充当直通路由。
- **Transforms** 可以通过 `inputs`/`outputs` 连接到其他变换(任一侧声明即可)。
- **Sinks** 无输入;它们的目标由 sink 类型/配置定义。
变换链示例:
```yaml
transforms:
- id: stage1
inputs: [source]
outputs: [stage2] # 输出到另一个变换
- id: stage2
inputs: [stage1] # 从另一个变换输入
outputs: [sink]
```
## 核心数据模型
### Message (消息)
```rust
pub struct Message {
pub meta: MessageMeta,
pub payload: serde_json::Value,
}
pub struct MessageMeta {
pub id: Uuid, // UUIDv7 用于时间排序
pub timestamp: i64, // 创建时间戳 (ms)
pub source_node: String, // 发起节点 ID
pub correlation_id: Option<Uuid>, // 用于追踪事件链
pub chain_depth: u8, // 防止无限循环
pub tags: HashMap<String, String>, // 自定义键值对
}
```
### System Channels (系统通道) (已实现)
系统通道处理错误路由和可观测性:
| DLQ | 失败消息 | `Message` | 处理失败的原始消息 |
| Event | 错误详情 | `Event` | 结构化的错误信息 |
| Notify | 用户警报 | `Notify` | 需要用户关注的通知 |
**路由行为**:
- 变换/汇错误 → DLQ (原始消息) + Event (错误详情)
- 缓冲区溢出 → Event (错误详情) + Notify (用户警报)
- 源错误 → 仅 Event (错误详情)
### PipelineMetrics (管道指标) (已实现)
```rust
pub struct PipelineMetrics {
pub messages_sent: AtomicU64, // 源发送的总消息数
pub messages_received: AtomicU64, // 汇接收的总消息数
pub messages_dropped: AtomicU64, // 丢弃的总消息数 (缓冲区溢出)
pub errors: AtomicU64, // 总处理错误数
pub dlq_sent: AtomicU64, // 发送到 DLQ 的总消息数
}
```
通过 `Engine::metrics()` 获取当前值的快照。
## 节点类型
### Sources (源)
| `http_client` | HTTP 轮询 | url, interval, headers, auth | 已实现 |
| `http_server` | HTTP 推送/webhook | bind, path, auth | 已实现 |
| `sql` | SQL 轮询 | connection, query | 已实现 |
| `file` | 文件监听 | path, mode | 已实现 |
| `redis` | Redis 键轮询 | url, key, interval | 已实现 |
| `websocket` | WebSocket 流 | url, subscribe_msg | 计划中 |
#### System Sources (系统源)
系统源具有映射到特定系统通道的固定 ID:
| `source::system::dlq` | `Message` | 接收处理失败的原始消息 |
| `source::system::event` | `Event` | 接收结构化的错误/事件信息 |
| `source::system::notify` | `Notify` | 接收面向用户的通知/警报 |
系统汇是隐式的,默认写入 JSONL 输出:
- `sink::system::dlq` -> `data/system_dlq.jsonl`
- `sink::system::event` -> `data/system_event.jsonl`
- `sink::system::notify` -> `data/system_notify.jsonl`
默认情况下,每个系统源都会发送到其对应的系统汇。你可以通过将系统源连接到变换中,将其路由到自定义汇。
**Event 结构**:
```rust
pub struct Event {
pub name: String, // 事件名称/类型
pub payload: serde_json::Value, // 事件数据
pub labels: HashMap<String, String>, // 键值标签
pub timestamp: i64, // 时间戳 (ms)
}
```
**Notify 结构**:
```rust
pub struct Notify {
pub name: String,
pub severity: NotifySeverity, // Info | Warning | Error | Critical
pub message: String,
pub labels: HashMap<String, String>,
pub timestamp: i64,
}
pub enum NotifySeverity {
Info,
Warning,
Error,
Critical,
}
```
系统源是隐式的,不需要在配置中声明。
### Transforms (变换)
变换使用多步骤管道架构。每个变换包含零个或多个按顺序执行的步骤。
| `filter` | 条件过滤 | 1:0/1 | 已实现 |
| `remap` | 字段映射 | 1:1 | 已实现 |
| `window` | 基于窗口的聚合 | n:1 | 已实现 |
| `compute` | 数学表达式计算 | 1:1 | 已实现 |
| `hash` | 确定性 ID 生成 | 1:1 | 已实现 |
**配置示例**:
```yaml
transforms:
- id: process_data
inputs: [source]
outputs: [sink]
steps:
# 步骤 1: 重映射字段
- type: remap
config:
mappings:
# 从源路径提取
- from: "$.data.user_id"
to: "$.user_id"
# 静态值赋值
- value: "processed"
to: "$.status"
# 带插值的模板字符串
- from: "User: {{ $.data.name }} (ID: {{ $.data.user_id }})"
to: "$.description"
keep_unmapped: false
# 步骤 2: 过滤 (单个条件)
- type: filter
config:
field: "$.status"
operator: eq
value: "active"
# 步骤 2 替代方案: 过滤 (多个条件)
- type: filter
config:
mode: and # 或 "or"
conditions:
- field: "$.status"
operator: eq
value: "active"
- field: "$.score"
operator: ge
value: 50
```
**过滤器操作符**:
| 操作符 | 描述 |
| ---------- | ---------------- |
| `eq` | 等于 |
| `ne` | 不等于 |
| `gt` | 大于 |
| `ge` | 大于或等于 |
| `lt` | 小于 |
| `le` | 小于或等于 |
| `abs_gt` | 绝对值大于 |
| `abs_ge` | 绝对值大于或等于 |
| `abs_lt` | 绝对值小于 |
| `abs_le` | 绝对值小于或等于 |
| `contains` | 字符串包含 |
| `matches` | 正则匹配 |
**窗口步骤 (已实现)**:
```yaml
- type: window
config:
duration: "30s" # 时间触发 (可选)
size: 10 # 计数触发 (可选, 至少需要一个)
operation: merge # merge | select_one (默认: merge)
strategy: first # 用于 select_one: first | last (默认: first)
max_messages: 10000 # 缓冲区容量 (默认: 10000)
on_overflow: drop_oldest # drop_oldest | error (默认: drop_oldest)
```
### Sinks (汇)
| `blackhole` | 丢弃 (默认 DLQ) | 已实现 |
| `console` | 打印到标准输出 | 已实现 |
| `file` | 写入文件 | 已实现 |
| `sql` | SQL 数据库插入 | 已实现 |
| `redis` | Redis SET/SETEX | 已实现 |
| `notify` | 邮件/Telegram/Webhook | 已实现 |
| `http_client` | HTTP API 调用 | 已实现 |
## 死信队列 (DLQ) (已实现)
处理错误会自动路由到系统通道:
```text
┌─────────┐ ┌───────────┐ ┌──────┐
│ Source │────▶│ Transform │────▶│ Sink │
└────┬────┘ └─────┬─────┘ └───┬──┘
│ │ error │ error
│ ├───────────────┤
│ │ │
│ ▼ ▼
│ ┌─────────────────┐ ┌─────────────────┐
│ │ source::system::dlq │ │ source::system::event │
│ │ (original msg) │ │ (error details)│
│ └─────────────────┘ └─────────────────┘
│
└─────▶ source::system::event (error details only)
```
**路由行为**:
- 变换/汇错误 → DLQ (原始消息) + Event (错误详情)
- 缓冲区溢出 (Lagged) → Event (错误详情) + Notify (用户警报)
`source::system::dlq` 源可以连接到任何汇进行自定义处理:
```yaml
pipeline:
transforms:
- id: dlq_to_file
inputs: [source::system::dlq]
outputs: [dlq_file]
sinks:
- id: dlq_file
type: file
config:
path: "./dead_letters.jsonl"
```
### 循环预防
源自 `source::system::dlq` 的消息在失败时不会重新路由到 DLQ,以防止无限循环。消息元数据中的 `chain_depth` 字段提供了额外的保护(最大深度: 8)。
## 核心 Traits
```rust
use async_trait::async_trait;
use tokio::sync::broadcast;
use crate::message::{Message, SharedMessage};
// Source: 生产消息 (通过广播扇出)
#[async_trait]
pub trait Source: Send + Sync {
fn id(&self) -> &str;
async fn run(
&self,
sender: broadcast::Sender<SharedMessage>,
shutdown: broadcast::Receiver<()>,
) -> Result<()>;
}
// Transform: 处理消息 (1:n)
#[async_trait]
pub trait Transform: Send + Sync {
fn id(&self) -> &str;
async fn process(&self, msg: Message) -> Result<Vec<Message>>;
}
// Sink: 消费消息
#[async_trait]
pub trait Sink: Send + Sync {
fn id(&self) -> &str;
async fn process(&self, msg: SharedMessage) -> Result<()>;
}
```