# pipeflow
[](https://crates.io/crates/pipeflow)
[](https://docs.rs/pipeflow)
[](https://opensource.org/licenses/MIT)
[English](README.md)
Rust 编写的轻量级、配置驱动的数据管道框架。
```text
Source → Transform → Sink
```
## 特性 (Features)
- **YAML 配置**: 声明式的管道定义,支持 DAG 校验(ID 重复、输入/输出缺失、环路检测)。
- **扇出 (Fan-out)**: 一个 Source 可以将数据广播给多个 Sink。
- **可配置广播容量**: 可调整 `output_buffer_size`(Source/Transform 的广播通道容量)。
- **内置节点**:
- Sources: `http_client`, `http_server`, `file`, `redis`, `sql`
- Sinks: `console`, `file`, `blackhole`, `http_client`, `redis`, `sql`, `notify`
- **CLI 工具**: `run` (运行), `config validate` (校验), `config show` (查看), `config graph` (图示)
## 特性标志 (Feature Flags)
Pipeflow 使用 Cargo features 来管理可选依赖。
- `http-client` (默认): 启用 `http_client` Source 和 Sink。
- `http-server`: 启用 `http_server` Source。
- `database`: 启用 `sql` Source 和 Sink。
- `redis`: 启用 `redis` Source 和 Sink。
- `file` (默认): 启用 `file` Source 和 Sink。
- `notify` (默认): 启用 `notify` Sink。
仅构建核心功能(不包含可选 Source/Sink):
```bash
cargo build --no-default-features
```
如果管道配置引用了被禁用特性背后的节点,`Engine::build()` 将返回配置错误,说明需要哪个特性。
## 快速开始 (Quick Start)
### 环境要求
- Rust 1.92 或更高版本 (使用 Rust 2024 edition)
### 安装
```bash
cargo add pipeflow
```
### 配置
创建一个管道配置文件 `pipeline.yaml`:
```yaml
system:
# output_buffer_size: Source/Transform 的广播通道容量 (默认: 1024)
output_buffer_size: 1024
pipeline:
sources:
- id: api_poller
type: http_client
config:
url: "https://httpbin.org/json"
interval: "10s"
# schedule: "0 0 * * *" # 每天 00:00 运行 (本地时间, 5 个字段; 秒默认为 0)
transforms:
- id: pass_through
inputs: [api_poller]
outputs: [console]
sinks:
- id: console
type: console
config:
format: pretty
```
#### 节点连线
Pipeflow 的连线方式为 `source -> transform -> sink`:
- **Transforms** 声明 `inputs`(一个或多个 Source 或 Transform)。
- **Transforms** 声明 `outputs`(一个或多个 Sink 或 Transform)。
- 变换到变换的连线可以在任一侧声明;引擎会自动推断缺失的一侧。
- Transforms 可以省略 `steps` 以充当直通节点。
- **Sources** 不声明 `inputs` 或 `outputs`。
- **Sinks** 不声明 `inputs`;它们的目标由 sink 类型/配置定义(例如文件路径)。
#### 从目录加载
所有接受 `CONFIG` 的命令也接受 **目录**。当提供目录时,pipeflow 会按 **字典序** 加载所有 `*.yaml` / `*.yml` 文件,并将它们合并为单个配置,然后再进行标准化和校验。
这对于大型管道非常有用:
```bash
# 基于目录的配置
pipeflow run ./configs/
pipeflow config validate ./configs/
pipeflow config show ./configs/ --format yaml
```
### 运行 (编程方式)
```rust
use pipeflow::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
let config = Config::from_file("pipeline.yaml")?;
let mut engine = Engine::from_config(config)?;
engine.build().await?;
engine.run().await
}
```
## 节点类型 (Node Types)
### Sources
| `http_client` | HTTP 轮询 | 已实现 |
| `http_server` | HTTP 推送/Webhook | 已实现 |
| `redis` | Redis GET 轮询 | 已实现 |
| `sql` | SQL 轮询 | 已实现 |
| `file` | 文件监听 | 已实现 |
**System Sources** (隐式, 固定 ID):
- `source::system::dlq` - 死信队列 (Dead Letter Queue)
- `source::system::event` - 系统事件
- `source::system::notify` - 通知
System Sources 始终可用,并且默认会发送到对应的 System Sinks。你可以通过在 Transform 的 `inputs` 中引用 System Source 来在管道中消费它。
### Transforms
| `remap` | 字段映射 (步骤) | 1:1 | 已实现 |
| `filter` | 条件过滤 | 1:0/1 | 已实现 |
| `window` | 时间/计数聚合 | n:1 | 已实现 |
| `compute` | 数学表达式计算 | 1:1 | 已实现 |
| `hash` | 生成哈希 ID | 1:1 | 已实现 |
### Sinks
| `blackhole` | 丢弃消息 | 已实现 |
| `console` | 打印到标准输出 | 已实现 |
| `file` | 写入文件 | 已实现 |
| `sql` | SQL 数据库插入 | 已实现 |
| `redis` | Redis 操作 | 已实现 |
| `notify` | 邮件/Telegram/Webhook | 已实现 |
| `http_client` | HTTP API 调用 | 已实现 |
**System Sinks** (隐式, 固定 ID):
- `sink::system::dlq` - 默认 DLQ 输出 (`data/system_dlq.jsonl`)
- `sink::system::event` - 默认事件输出 (`data/system_event.jsonl`)
- `sink::system::notify` - 默认通知输出 (`data/system_notify.jsonl`)
你可以通过 `PIPEFLOW_SYSTEM_SINK_DIR` 覆盖基础目录,或在 `system.sinks` 中配置路径。
```yaml
system:
sinks:
dir: ./data
dlq: ./data/custom_dlq.jsonl
event: ./data/custom_event.jsonl
notify: ./data/custom_notify.jsonl
```
### 配置参考
有关所有支持的 Source 和 Sink 的详细配置参数,请参阅 [docs/CONFIGURATION_CN.md](docs/CONFIGURATION_CN.md)。
## 广播缓冲区配置
Pipeflow 使用 `tokio::sync::broadcast` 通道连接可发送消息的节点(Source/Transform)。你可以通过 `output_buffer_size` 调整广播容量。
```yaml
system:
output_buffer_size: 1024 # Source/Transform 的广播通道容量
```
注意:
- Source 可以根据每个 Source 覆盖 `output_buffer_size`。
- 如果 Sink/Transform 落后于广播缓冲区,它可能会丢弃消息并记录 `Lagged`。
## 死信队列 (Dead Letter Queue)
`source::system::dlq` Source 已实现,可以连接到任何 Sink。链深度保护 (Chain-depth protection) 可防止通过 System Sources 路由消息时出现无限循环(最大深度: 8)。
**当前状态:**
- `source::system::dlq` Source: 已实现
- 链深度保护: 已实现
- Transform/Sink 错误时的自动 DLQ 路由: 已实现
完整设计请参阅 `docs/DESIGN_CN.md`。
## CLI 命令
```bash
# 运行管道
pipeflow run config.yaml
# 校验配置
pipeflow config validate config.yaml
# 显示管道图 (ASCII)
pipeflow config graph config.yaml
# 显示合并 + 标准化后的配置
pipeflow config show config.yaml --format yaml
```
注意:
- `pipeflow config validate` 检查 YAML 结构和管道连线(ID、引用、环路、系统路由)。它 **不会** 校验节点特定的
`config` 内容(例如必须的 `http_client.url`);这些在 `Engine::build()`(因此也在 `pipeflow run`)期间通过
serde 校验。
- 如果你使用基于目录的配置,`config show` 会显示合并 + 标准化后的结果。
## 分布式与高可用
**Pipeflow 设计为独立运行 (Stand-alone)。**
为了保持架构简单且健壮 (KISS 原则),Pipeflow 不实现复杂的分布式协调协议(如 Raft 或 Paxos)。
- **持久化**: 状态(如静默记录)存储在本地文件系统(默认为 `./data`)。我们移除了像 Redis 这样的复杂分布式后端用于静默,以支持简单性和文件系统原子性。
- **扩展**: 我们推荐 **手动分片 (Manual Sharding)**。部署多个独立实例,每个实例处理不同的配置文件子集。
- **高可用**: 使用详细的健康检查(例如 K8s 存活探针)来重启失败的实例。如果你需要跨实例共享状态(例如共享静默),请将共享卷 (NFS/EFS) 挂载到 `data_dir`。
## 文档
详细设计文档请参阅 [docs/DESIGN_CN.md](docs/DESIGN_CN.md)。
## 测试
```bash
# 单元 + 集成测试
cargo test --all-features
# Lint (clippy)
cargo clippy --all-targets --all-features -- -D warnings
# 格式检查
cargo fmt --all -- --check
```
## 许可证
MIT