pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
# pipeflow

[![Crates.io](https://img.shields.io/crates/v/pipeflow.svg)](https://crates.io/crates/pipeflow)
[![Documentation](https://docs.rs/pipeflow/badge.svg)](https://docs.rs/pipeflow)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

[English](README.md)

Rust 编写的轻量级、配置驱动的数据管道框架。

```text
Source → Transform → Sink
```

## 特性 (Features)

- **YAML 配置**: 声明式的管道定义,支持 DAG 校验(ID 重复、输入/输出缺失、环路检测)。
- **扇出 (Fan-out)**: 一个 Source 可以将数据广播给多个 Sink。
- **可配置广播容量**: 可调整 `output_buffer_size`(Source/Transform 的广播通道容量)。
- **内置节点**:
  - Sources: `http_client`, `http_server`, `file`, `redis`, `sql`
  - Sinks: `console`, `file`, `blackhole`, `http_client`, `redis`, `sql`, `notify`
- **CLI 工具**: `run` (运行), `config validate` (校验), `config show` (查看), `config graph` (图示)

## 特性标志 (Feature Flags)

Pipeflow 使用 Cargo features 来管理可选依赖。

- `http-client` (默认): 启用 `http_client` Source 和 Sink。
- `http-server`: 启用 `http_server` Source。
- `database`: 启用 `sql` Source 和 Sink。
- `redis`: 启用 `redis` Source 和 Sink。
- `file` (默认): 启用 `file` Source 和 Sink。
- `notify` (默认): 启用 `notify` Sink。

仅构建核心功能(不包含可选 Source/Sink):

```bash
cargo build --no-default-features
```

如果管道配置引用了被禁用特性背后的节点,`Engine::build()` 将返回配置错误,说明需要哪个特性。

## 快速开始 (Quick Start)

### 环境要求

- Rust 1.92 或更高版本 (使用 Rust 2024 edition)

### 安装

```bash
cargo add pipeflow
```

### 配置

创建一个管道配置文件 `pipeline.yaml`:

```yaml
system:
  # output_buffer_size: Source/Transform 的广播通道容量 (默认: 1024)
  output_buffer_size: 1024

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"
        # schedule: "0 0 * * *" # 每天 00:00 运行 (本地时间, 5 个字段; 秒默认为 0)

  transforms:
    - id: pass_through
      inputs: [api_poller]
      outputs: [console]

  sinks:
    - id: console
      type: console
      config:
        format: pretty
```

#### 节点连线

Pipeflow 的连线方式为 `source -> transform -> sink`:

- **Transforms** 声明 `inputs`(一个或多个 Source 或 Transform)。
- **Transforms** 声明 `outputs`(一个或多个 Sink 或 Transform)。
- 变换到变换的连线可以在任一侧声明;引擎会自动推断缺失的一侧。
- Transforms 可以省略 `steps` 以充当直通节点。
- **Sources** 不声明 `inputs``outputs`- **Sinks** 不声明 `inputs`;它们的目标由 sink 类型/配置定义(例如文件路径)。

#### 从目录加载

所有接受 `CONFIG` 的命令也接受 **目录**。当提供目录时,pipeflow 会按 **字典序** 加载所有 `*.yaml` / `*.yml` 文件,并将它们合并为单个配置,然后再进行标准化和校验。

这对于大型管道非常有用:

```bash
# 基于目录的配置
pipeflow run ./configs/
pipeflow config validate ./configs/
pipeflow config show ./configs/ --format yaml
```

### 运行 (编程方式)

```rust
use pipeflow::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let config = Config::from_file("pipeline.yaml")?;
    let mut engine = Engine::from_config(config)?;
    engine.build().await?;
    engine.run().await
}
```

## 节点类型 (Node Types)

### Sources

| 类型          | 描述              | 状态   |
| ------------- | ----------------- | ------ |
| `http_client` | HTTP 轮询         | 已实现 |
| `http_server` | HTTP 推送/Webhook | 已实现 |
| `redis`       | Redis GET 轮询    | 已实现 |
| `sql`         | SQL 轮询          | 已实现 |
| `file`        | 文件监听          | 已实现 |

**System Sources** (隐式, 固定 ID):

- `source::system::dlq` - 死信队列 (Dead Letter Queue)
- `source::system::event` - 系统事件
- `source::system::notify` - 通知

System Sources 始终可用,并且默认会发送到对应的 System Sinks。你可以通过在 Transform 的 `inputs` 中引用 System Source 来在管道中消费它。

### Transforms

| 类型      | 描述            | I/O   | 状态   |
| --------- | --------------- | ----- | ------ |
| `remap`   | 字段映射 (步骤) | 1:1   | 已实现 |
| `filter`  | 条件过滤        | 1:0/1 | 已实现 |
| `window`  | 时间/计数聚合   | n:1   | 已实现 |
| `compute` | 数学表达式计算  | 1:1   | 已实现 |
| `hash`    | 生成哈希 ID     | 1:1   | 已实现 |

### Sinks

| 类型          | 描述                  | 状态   |
| ------------- | --------------------- | ------ |
| `blackhole`   | 丢弃消息              | 已实现 |
| `console`     | 打印到标准输出        | 已实现 |
| `file`        | 写入文件              | 已实现 |
| `sql`         | SQL 数据库插入        | 已实现 |
| `redis`       | Redis 操作            | 已实现 |
| `notify`      | 邮件/Telegram/Webhook | 已实现 |
| `http_client` | HTTP API 调用         | 已实现 |

**System Sinks** (隐式, 固定 ID):

- `sink::system::dlq` - 默认 DLQ 输出 (`data/system_dlq.jsonl`)
- `sink::system::event` - 默认事件输出 (`data/system_event.jsonl`)
- `sink::system::notify` - 默认通知输出 (`data/system_notify.jsonl`)

你可以通过 `PIPEFLOW_SYSTEM_SINK_DIR` 覆盖基础目录,或在 `system.sinks` 中配置路径。

```yaml
system:
  sinks:
    dir: ./data
    dlq: ./data/custom_dlq.jsonl
    event: ./data/custom_event.jsonl
    notify: ./data/custom_notify.jsonl
```

### 配置参考

有关所有支持的 Source 和 Sink 的详细配置参数,请参阅 [docs/CONFIGURATION_CN.md](docs/CONFIGURATION_CN.md)。

## 广播缓冲区配置

Pipeflow 使用 `tokio::sync::broadcast` 通道连接可发送消息的节点(Source/Transform)。你可以通过 `output_buffer_size` 调整广播容量。

```yaml
system:
  output_buffer_size: 1024 # Source/Transform 的广播通道容量
```

注意:

- Source 可以根据每个 Source 覆盖 `output_buffer_size`- 如果 Sink/Transform 落后于广播缓冲区,它可能会丢弃消息并记录 `Lagged`
## 死信队列 (Dead Letter Queue)

`source::system::dlq` Source 已实现,可以连接到任何 Sink。链深度保护 (Chain-depth protection) 可防止通过 System Sources 路由消息时出现无限循环(最大深度: 8)。

**当前状态:**

- `source::system::dlq` Source: 已实现
- 链深度保护: 已实现
- Transform/Sink 错误时的自动 DLQ 路由: 已实现

完整设计请参阅 `docs/DESIGN_CN.md`。

## CLI 命令

```bash
# 运行管道
pipeflow run config.yaml

# 校验配置
pipeflow config validate config.yaml

# 显示管道图 (ASCII)
pipeflow config graph config.yaml

# 显示合并 + 标准化后的配置
pipeflow config show config.yaml --format yaml
```

注意:

- `pipeflow config validate` 检查 YAML 结构和管道连线(ID、引用、环路、系统路由)。它 **不会** 校验节点特定的
  `config` 内容(例如必须的 `http_client.url`);这些在 `Engine::build()`(因此也在 `pipeflow run`)期间通过
  serde 校验。
- 如果你使用基于目录的配置,`config show` 会显示合并 + 标准化后的结果。

## 分布式与高可用

**Pipeflow 设计为独立运行 (Stand-alone)。**

为了保持架构简单且健壮 (KISS 原则),Pipeflow 不实现复杂的分布式协调协议(如 Raft 或 Paxos)。

- **持久化**: 状态(如静默记录)存储在本地文件系统(默认为 `./data`)。我们移除了像 Redis 这样的复杂分布式后端用于静默,以支持简单性和文件系统原子性。
- **扩展**: 我们推荐 **手动分片 (Manual Sharding)**。部署多个独立实例,每个实例处理不同的配置文件子集。
- **高可用**: 使用详细的健康检查(例如 K8s 存活探针)来重启失败的实例。如果你需要跨实例共享状态(例如共享静默),请将共享卷 (NFS/EFS) 挂载到 `data_dir`
## 文档

详细设计文档请参阅 [docs/DESIGN_CN.md](docs/DESIGN_CN.md)。

## 测试

```bash
# 单元 + 集成测试
cargo test --all-features

# Lint (clippy)
cargo clippy --all-targets --all-features -- -D warnings

# 格式检查
cargo fmt --all -- --check
```

## 许可证

MIT