pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
# 配置参考

[English](CONFIGURATION.md)

本文档提供了 Pipeflow 节点的详细配置参数。

## 变量语法

Pipeflow 支持多种变量语法,在不同阶段进行处理:

| 语法类型       | 格式          | 处理阶段   | 描述                                       |
| :------------- | :------------ | :--------- | :----------------------------------------- |
| **环境变量**   | `${VAR}`      | 加载配置时 | 在任何配置字段中替换为操作系统环境变量值。 |
| **值环境变量** | `$ENV:VAR`    | 编译时     | 值表达式(变换/汇)中的环境变量。          |
| **消息元数据** | `$META.field` | 运行时     | 在值表达式中访问消息元数据字段。           |
| **内置变量**   | `$VAR`        | 运行时     | 引擎注入的动态值(例如 `$UUID`, `$NOW`)。 |
| **JSONPath**   | `$.field`     | 运行时     | 从消息负载中提取数据。                     |

### 1. 环境变量 (`${...}`)

用于任何配置字段中的凭据和配置值。

- **语法**: `${VAR_NAME}``${VAR_NAME:-default}`
- **作用域**: 任何配置值(URL、凭据、路径等)
- **处理时机**: YAML 解析之前
- **示例**: `url: "${API_URL}"`

### 2. 值环境变量 (`$ENV:...`)

用于值表达式(变换重映射的 `mappings`,汇的 `columns`)中访问环境变量。

- **语法**: `$ENV:VAR_NAME``$ENV:VAR_NAME:-default`
- **作用域**: 仅值表达式(映射/列中的 `from` 字段)
- **处理时机**: 管道编译时(启动时解析一次)
- **示例**:
  - `from: "$ENV:DEPLOYMENT_ENV"` → 使用环境变量值
  - `from: "$ENV:LOG_LEVEL:-info"` → 如果未设置则使用默认值
  - `from: "Env: {{ $ENV:APP_NAME:-unknown }}"` → 在模板中使用

### 3. 消息元数据 (`$META...`)

用于值表达式中访问消息元数据字段。

- **语法**: `$META.field_name``$META.tags.key`
- **作用域**: 仅值表达式
- **可用字段**:
  - `$META.id` → 消息 UUID (字符串)
  - `$META.timestamp` → Unix 时间戳 (毫秒, 数字)
  - `$META.source_node` → 源节点 ID (字符串)
  - `$META.correlation_id` → 关联 UUID(如果设置)(字符串或 null)
  - `$META.chain_depth` → 处理链深度 (数字)
  - `$META.tags.{key}` → 自定义标签值 (字符串或 null)
- **示例**:
  - `from: "$META.source_node"` → 获取源节点 ID
  - `from: "Source: {{ $META.source_node }}"` → 在模板中使用

### 4. 内置变量 (`$...`)

用于变换和汇中访问系统生成的值。

- **语法**: `$VAR_NAME` (无花括号)
- **示例**:
  - `$UUID`: 唯一消息 ID (UUIDv7)
  - `$NOW`: 当前时间戳 (ISO8601, 秒级精度)
  - `$DATE`: 当前日期 (`YYYY-MM-DD`)
  - `$TIMESTAMP`: Unix 时间戳 (毫秒)
  - `$SOURCE_ID`: 源标识符
  - `$MSG_ID`: 消息标识符

### 5. JSONPath (`$. ...`)

用于从消息负载中提取数据。

- **语法**: `$.path.to.field``$['path']['to']['field']`
- **示例**: `$.user.id`, `$.items[0].name`

## 变换配置 (Transform Configuration)

### Compute Step (计算步骤)

| 参数         | 默认值 | 描述                                                         |
| ------------ | ------ | ------------------------------------------------------------ |
| `expression` || 数学表达式 (支持 `+`, `-`, `*`, `/`, `( )`, 数值和 JSONPath) |
| `output`     || 写入结果的 JSONPath                                          |
| `precision`  || 可选的保留小数位数                                           |

### Hash Step (哈希步骤)

| 参数       | 默认值 | 描述     |
| ---------- | ------ | -------- |
| `mappings` || 映射列表 |

`mappings` 项字段:

| 字段    | 默认值 | 描述                              |
| ------- | ------ | --------------------------------- |
| `algo`  || 哈希算法: `md5`, `sha1`, `sha256` |
| `from`  || 源字段 JSONPath 或模板            |
| `value` || 静态值 (作为 `from` 的替代)       |
| `to`    || 目标字段 JSONPath                 |

## 汇配置 (Sink Configuration)

### Console Sink (控制台汇)

| 参数     | 默认值   | 描述                                                           |
| -------- | -------- | -------------------------------------------------------------- |
| `format` | `pretty` | 输出格式: `pretty` (缩进 JSON), `json` (紧凑), `text` (仅负载) |

### File Sink (文件汇)

| 参数             | 默认值  | 描述                            |
| ---------------- | ------- | ------------------------------- |
| `path`           || 输出文件路径 (必须)             |
| `format`         | `jsonl` | 输出格式: `jsonl`, `tsv`, `csv` |
| `append`         | `true`  | 追加到现有文件;`false` 则覆盖  |
| `include_header` | `false` | TSV/CSV 格式是否包含标题行      |

### Redis Sink (Redis 汇)

| 参数    | 默认值 | 描述                                  |
| ------- | ------ | ------------------------------------- |
| `url`   || Redis 连接 URL                        |
| `key`   || 键映射 (`from``value`)            |
| `value` || 值映射 (`from``value`)            |
| `ttl`   || `SETEX` 的可选 TTL (例如 `30s`, `5m`) |

### Notify Sink (通知汇)

Notify 汇支持 `email`, `webhook`, 和 `telegram` 提供商。模板支持静态文本、JSONPath (如 `$.message`) 和 `{{ }}` 插值。

#### Email Provider

| 参数            | 默认值     | 描述                                                     |
| --------------- | ---------- | -------------------------------------------------------- |
| `provider`      || `email`                                                  |
| `smtp.server`   || SMTP 服务器主机名                                        |
| `smtp.port`     | provider   | SMTP 端口 (可选)                                         |
| `smtp.username` || SMTP 认证用户名 (可选)                                   |
| `smtp.password` || SMTP 认证密码 (可选)                                     |
| `smtp.security` | `starttls` | `starttls``none`                                     |
| `from`          || 发件人地址                                               |
| `to`            || 收件人列表                                               |
| `subject`       | default    | 可选主题模板                                             |
| `message`       | default    | 可选消息模板                                             |
| `min_severity`  || 发送的最低严重级别 (`info`/`warning`/`error`/`critical`) |
| `active_window` || 可选活动窗口配置对象                                     |
| `silence`       || 可选静默配置对象                                         |

#### Webhook Provider

| 参数            | 默认值  | 描述                                                     |
| --------------- | ------- | -------------------------------------------------------- |
| `provider`      || `webhook`                                                |
| `url`           || 目标 URL                                                 |
| `method`        | `POST`  | `POST`, `PUT`, `PATCH`                                   |
| `headers`       | `{}`    | 可选请求头                                               |
| `timeout`       | `30s`   | 请求超时                                                 |
| `body`          | `full`  | `payload``full`                                      |
| `message`       | default | 可选消息模板 (由 `full` 使用)                            |
| `min_severity`  || 发送的最低严重级别 (`info`/`warning`/`error`/`critical`) |
| `active_window` || 可选活动窗口配置对象                                     |
| `silence`       || 可选静默配置对象                                         |

#### Telegram Provider

| 参数                       | 默认值                     | 描述                                                     |
| -------------------------- | -------------------------- | -------------------------------------------------------- |
| `provider`                 || `telegram`                                               |
| `bot_token`                || Bot Token                                                |
| `chat_id`                  || Chat ID                                                  |
| `api_base_url`             | `https://api.telegram.org` | API 基础 URL (可选)                                      |
| `parse_mode`               || `MarkdownV2``HTML`                                   |
| `disable_web_page_preview` || 禁用链接预览                                             |
| `timeout`                  | `30s`                      | 请求超时                                                 |
| `message`                  | default                    | 可选消息模板                                             |
| `min_severity`             || 发送的最低严重级别 (`info`/`warning`/`error`/`critical`) |
| `active_window`            || 可选活动窗口配置对象                                     |
| `silence`                  || 可选静默配置对象                                         |

`silence` 对象字段:

| 字段      | 默认值   | 描述                                             |
| --------- | -------- | ------------------------------------------------ |
| `window`  || 静默窗口 (例如 `2h`); 如果系统默认值提供了则可选 |
| `backend` | `memory` | `memory``redis` (如果设置则回退到系统默认)   |
| `key`     || 可选静默键模板                                   |
| `redis`   || 当后端为 `redis` 时的 Redis 配置                 |

`silence.redis` 字段:

| 字段         | 默认值                     | 描述           |
| ------------ | -------------------------- | -------------- |
| `url`        || Redis 连接 URL |
| `key_prefix` | `pipeflow:notify:silence:` | Redis 键前缀   |

`active_window` 对象字段:

| 字段              | 默认值 | 描述                                                         |
| ----------------- | ------ | ------------------------------------------------------------ |
| `start`           || 窗口开始时间 (`HH:MM`)                                       |
| `end`             || 窗口结束时间 (`HH:MM`)                                       |
| `timezone`        | local  | IANA 时区 (例如 `Asia/Shanghai`)                             |
| `days`            | all    | 活动天数 (`mon`..`sun`)                                      |
| `bypass_severity` || 绕过窗口的严重级别阈值 (`info`/`warning`/`error`/`critical`) |

当设置了 `active_window` 时,窗口外的通知将延迟到下一个窗口开始时发送。如果设置了 `bypass_severity` 且通知严重级别达到或超过该值,即使在窗口外也会立即发送警报。
`start` 和 `end` 不能相等。

## 系统配置 (System Configuration)

系统级通知默认值可以在 `system.notify.silence` 和 `system.notify.active_window` 下配置。设置后,notify sinks 可以覆盖特定字段或省略这些块以继承系统默认值。

```yaml
system:
  notify:
    silence:
      window: 2h
      backend: redis
      key: "{{ $.alert_key }}"
      redis:
        url: "redis://127.0.0.1:6379/0"
        key_prefix: "pipeflow:notify:silence:"
    active_window:
      start: "08:00"
      end: "22:00"
      timezone: "Asia/Shanghai"
      days: ["mon", "tue", "wed", "thu", "fri"]
      bypass_severity: error
```

Sink 级覆盖示例:

```yaml
pipeline:
  sinks:
    - id: notify_webhook
      type: notify
      config:
        provider: webhook
        url: "https://example.com/webhook"
        silence:
          key: "{{ $.name }}|{{ $.labels.host }}"
```

**示例**:

```yaml
pipeline:
  transforms:
    - id: notify_passthrough
      inputs: [source::system::notify]
      outputs: [notify_webhook]
      steps: []

  sinks:
    - id: notify_webhook
      type: notify
      config:
        provider: webhook
        url: "https://example.com/webhook"
        body: full
        message: "Alert: {{ $.message }}"
```

### HTTP Client Sink (HTTP 客户端汇)

| 参数      | 默认值 | 描述                             |
| --------- | ------ | -------------------------------- |
| `url`     || 目标 URL                         |
| `method`  | `POST` | `POST`, `PUT`, `PATCH`           |
| `headers` | `{}`   | 可选请求头                       |
| `fields`  || 请求体的可选字段映射             |
| `timeout` | `30s`  | 请求超时                         |
| `auth`    || 可选认证配置 (见 HTTP Auth 部分) |

### SQL Sink (SQL 汇)

| 参数         | 默认值   | 描述                                  |
| ------------ | -------- | ------------------------------------- |
| `driver`     | `sqlite` | `sqlite``postgres`                |
| `connection` || 连接字符串或 SQLite 路径              |
| `table`      || 目标表名                              |
| `columns`    || 列映射 (`from``value`)            |
| `upsert`     || 可选 UPSERT 配置 (`conflict_columns`) |

`columns` 支持:

| 字段          | 默认值  | 描述                                                            |
| ------------- | ------- | --------------------------------------------------------------- |
| `name`        || 列名                                                            |
| `from`        || 类似 JSONPath 的源                                              |
| `value`       || 静态值 (`$NOW`, `$UUID`, `$TIMESTAMP`, `$SOURCE_ID`, `$MSG_ID`) |
| `insert_only` | `false` | 仅插入列 (UPSERT 更新时排除)                                    |
| `type`        || 可选 SQL 类型提示 (主要用于 Postgres)                           |

### Blackhole Sink (黑洞汇)

| 参数 | 默认值 | 描述         |
| ---- | ------ | ------------ |
||| 丢弃所有消息 |

## 源配置 (Source Configuration)

### File Source (文件源)

| 参数       | 默认值 | 描述                                         |
| ---------- | ------ | -------------------------------------------- |
| `path`     || 读取文件的路径                               |
| `mode`     | `tail` | `tail` (追加/跟随) 或 `oneshot` (一次性读取) |
| `interval` | `1s`   | `tail` 模式的轮询间隔                        |

### Redis Source (Redis 源)

| 参数         | 默认值 | 描述                                                                   |
| ------------ | ------ | ---------------------------------------------------------------------- |
| `url`        || Redis 连接 URL                                                         |
| `key`        || 获取的键                                                               |
| `mode`       | `poll` | `poll``oneshot`                                                    |
| `interval`   | `5s`   | 轮询间隔 (仅用于 `poll` 模式)                                          |
| `schedule`   || Cron 表达式 (5 或 6 个字段; 5 字段默认秒为 `0`); 设置时忽略 `interval` |
| `parse_json` | `true` | 尽可能解析 JSON 值                                                     |

### HTTP Client Source (HTTP 客户端源)

| 参数               | 默认值 | 描述                                                                                                              |
| ------------------ | ------ | ----------------------------------------------------------------------------------------------------------------- |
| `url`              || 请求的目标 URL                                                                                                    |
| `interval`         | `60s`  | 未设置 `schedule` 时的轮询间隔                                                                                    |
| `schedule`         || Cron 表达式 (5 或 6 个字段; 5 字段默认秒为 `0`, 例如 `0 0 * * *` 表示每天 00:00, 本地时间); 设置时忽略 `interval` |
| `method`           | `GET`  | HTTP 方法 (`GET`, `POST`, `PUT`, `DELETE`)                                                                        |
| `headers`          | `{}`   | 可选请求头                                                                                                        |
| `error_body_limit` | `2048` | 包含在错误中的最大错误响应体长度                                                                                  |
| `auth`             || 可选认证配置 (见 HTTP Auth 部分)                                                                                  |
| `expect_status`    || 可选的接受状态码列表                                                                                              |
| `expect_body`      || 可选的响应体验证规则                                                                                              |

`expect_body` 支持:

| 字段           | 默认值 | 描述                           |
| -------------- | ------ | ------------------------------ |
| `path`         || JSONPath 提取值                |
| `eq`           || 期望值 (需要 `path`)           |
| `ne`           || 不等于值 (需要 `path`)         |
| `contains`     || 响应体必须包含此关键字         |
| `not_contains` || 响应体必须 **** 包含此关键字 |

### HTTP Server Source (HTTP 服务端源)

| 参数   | 默认值 | 描述                             |
| ------ | ------ | -------------------------------- |
| `bind` || 绑定地址                         |
| `path` | `/`    | 请求路径                         |
| `auth` || 可选认证配置 (见 HTTP Auth 部分) |

### SQL Source (SQL 源)

| 参数         | 默认值   | 描述                                                                              |
| ------------ | -------- | --------------------------------------------------------------------------------- |
| `driver`     | `sqlite` | `sqlite``postgres`                                                            |
| `connection` || 连接字符串或 SQLite 路径                                                          |
| `query`      || 执行的查询                                                                        |
| `interval`   | `60s`    | 未设置 `schedule` 时的轮询间隔                                                    |
| `schedule`   || Cron 表达式 (5 或 6 个字段; 5 字段默认秒为 `0`); 设置时忽略 `interval` (本地时间) |

## HTTP Auth (HTTP 认证)

`http_client` 源/汇和 `http_server` 源共享 `auth` 块。

| 类型      | 参数                   | 描述            |
| --------- | ---------------------- | --------------- |
| `basic`   | `username`, `password` | HTTP Basic 认证 |
| `bearer`  | `token`                | Bearer Token 头 |
| `api_key` | `header`, `key`        | 自定义头 键/值  |
| `header`  | `name`, `value`        | 通用头 键/值    |