pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
# Pipeflow 项目功能与实现详解

[English](PROJECT_IMPL.md)

## 1. 项目概述 (Overview)

Pipeflow 是一个轻量级、配置驱动的 Rust 数据管道(Data Pipeline)框架。它允许用户通过 YAML 配置文件定义数据处理流程,无需编写代码即可实现常见的 ETL(Extract, Transform, Load)任务。

**核心理念:**

- **配置驱动**: 完整的管道定义(源、变换、汇)均通过 YAML 管理。
- **高性能**: 基于 Rust 语言和 Tokio 异步运行时,利用 `broadcast` 通道实现高效的消息分发。
- **模块化**: 提供标准化的 Source、Transform、Sink 抽象,易于扩展。
- **可观测性**: 内置系统级事件流(Events)、审计日志(Audit)和死信队列(DLQ)。

## 2. 核心架构 (Core Architecture)

项目采用了经典的 **DAG (有向无环图)** 架构模型:

```text
Source (数据源) → Transform (数据变换) → Sink (数据输出)
```

### 2.1 引擎 (Engine)

`src/engine.rs` 是核心调度器,负责:

1. **加载配置**: 解析 YAML,进行规范化和校验(检查环路、孤立节点等)。
2. **构建图谱**: 根据配置实例化各个节点(Source/Transform/Sink)。
3. **建立连接**: 使用 `tokio::sync::broadcast` 通道连接各节点,支持“扇出”(Fan-out)模式,即一个节点的输出可以同时发送给多个下游节点。
4. **生命周期管理**: 启动所有异步任务,并负责优雅停机(Graceful Shutdown)。

### 2.2 消息流转 (Message Passing)

- **消息结构**: `Message` 结构体封装了数据负载(`payload`)和元数据(`meta`)。
- **共享机制**: 使用 `Arc<Message>` 能够在多个下游节点间高效共享数据,避免不必要的内存拷贝。
- **背压控制**: `broadcast` 通道具有固定容量(可配置 `output_buffer_size`),当下游消费过慢时会丢弃旧消息并记录 `Lagged` 警告,防止内存无限增长。

## 3. 功能模块 (Modules)

### 3.1 数据源 (Sources)

位于 `src/source/`,负责生产数据:

- **`http_client`**: 定时轮询 HTTP API 获取数据。
- **`http_server`**: 启动 HTTP 服务接收 Webhook 推送。
- **`redis`**: 从 Redis 获取数据。
- **`sql`**: 执行 SQL 查询拉取数据。
- **`file`**: 监听文件变动或读取文件内容。
- **系统源**: `source::system::dlq` (死信), `source::system::event` (系统事件), `source::system::notify` (通知)。

### 3.2 数据变换 (Transforms)

位于 `src/transform/`,负责处理数据:

- **`remap`**: 字段重映射/重命名。
- **`filter`**: 基于条件(如正则匹配 `regex`)过滤消息。
- **`window`**: 时间窗口或计数窗口聚合。
- **`compute`**: 计算逻辑处理。
- **`hash`**: 生成确定性标识符。

### 3.3 数据汇 (Sinks)

位于 `src/sink/`,负责输出数据:

- **`console`**: 打印到标准输出(调试用)。
- **`file`**: 写入本地文件。
- **`http_client`**: 发送 HTTP 请求到外部服务。
- **`redis`**: 写入 Redis 命令。
- **`sql`**: 执行 SQL 插入/更新。
- **`notify`**: 发送通知(支持 Telegram, Email, Webhook 等)。
- **`blackhole`**: 丢弃数据(测试用)。

## 4. 关键机制实现 (Key Mechanisms)

### 4.1 系统通道 (System Channels)

Pipeflow 内置了一套“影子管道”用于处理系统级消息,在 `Engine` 初始化时建立:

- **DLQ (Dead Letter Queue)**: 捕获处理失败的消息。
- **Event (事件)**: 记录系统运行时的关键事件(如连接断开、重连)。
- **Audit (审计)**: 记录消息在每个节点的处理耗时和状态(Success/Failure)。

实现上,`Engine` 会创建一个全局的 `SystemChannels` 结构,包含三个 `mpsc` 发送端。每个 Source/Transform/Sink 在运行时都会持有这些发送端,在发生错误或处理完成时异步发送元数据。

### 4.2 死信队列与循环防护

为了防止死信消息本身再次触发错误导致无限循环,系统实现了 **链深度保护 (Chain Depth Protection)**:

- 消息元数据中包含 `chain_depth` 计数器。
- 当由于错误产生新的 DLQ 消息时,计数器加 1。
- 如果计数器超过阈值(如 8),消息将被强制丢弃,防止系统雪崩。

### 4.3 扇出与广播 (Fan-out & Broadcast)

引擎通过 `Engine::create_node_channels` 为每个 Source 和 Transform 创建一个 `broadcast::Sender`。

- **Transforms** 订阅上游节点的广播通道。
- **Sinks** 同样订阅上游节点的广播通道。
- Sink 使用 `futures::stream::select_all` 将多个上游输入流合并为一个流进行顺序处理。

## 5. 总结

Pipeflow 通过 Rust 的强类型系统和 Tokio 的高并发能力,构建了一个既安全又高效的数据处理框架。其设计核心在于**配置与逻辑分离**,让用户专注于业务逻辑(YAML 配置),而将并发控制、错误处理和资源管理交给框架底层实现。