1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
# 配置参考
[English](CONFIGURATION.md)
本文档提供了 Pipeflow 节点的详细配置参数。
## 变量语法
Pipeflow 支持多种变量语法,在不同阶段进行处理:
| 语法类型 | 格式 | 处理阶段 | 描述 |
| :------------- | :------------ | :--------- | :----------------------------------------- |
| **环境变量** | `${VAR}` | 加载配置时 | 在任何配置字段中替换为操作系统环境变量值。 |
| **值环境变量** | `$ENV:VAR` | 编译时 | 值表达式(变换/汇)中的环境变量。 |
| **消息元数据** | `$META.field` | 运行时 | 在值表达式中访问消息元数据字段。 |
| **内置变量** | `$VAR` | 运行时 | 引擎注入的动态值(例如 `$UUID`, `$NOW`)。 |
| **JSONPath** | `$.field` | 运行时 | 从消息负载中提取数据。 |
### 1. 环境变量 (`${...}`)
用于任何配置字段中的凭据和配置值。
- **语法**: `${VAR_NAME}` 或 `${VAR_NAME:-default}`
- **作用域**: 任何配置值(URL、凭据、路径等)
- **处理时机**: YAML 解析之前
- **示例**: `url: "${API_URL}"`
### 2. 值环境变量 (`$ENV:...`)
用于值表达式(变换重映射的 `mappings`,汇的 `columns`)中访问环境变量。
- **语法**: `$ENV:VAR_NAME` 或 `$ENV:VAR_NAME:-default`
- **作用域**: 仅值表达式(映射/列中的 `from` 字段)
- **处理时机**: 管道编译时(启动时解析一次)
- **示例**:
- `from: "$ENV:DEPLOYMENT_ENV"` → 使用环境变量值
- `from: "$ENV:LOG_LEVEL:-info"` → 如果未设置则使用默认值
- `from: "Env: {{ $ENV:APP_NAME:-unknown }}"` → 在模板中使用
### 3. 消息元数据 (`$META...`)
用于值表达式中访问消息元数据字段。
- **语法**: `$META.field_name` 或 `$META.tags.key`
- **作用域**: 仅值表达式
- **可用字段**:
- `$META.id` → 消息 UUID (字符串)
- `$META.timestamp` → Unix 时间戳 (毫秒, 数字)
- `$META.source_node` → 源节点 ID (字符串)
- `$META.correlation_id` → 关联 UUID(如果设置)(字符串或 null)
- `$META.chain_depth` → 处理链深度 (数字)
- `$META.tags.{key}` → 自定义标签值 (字符串或 null)
- **示例**:
- `from: "$META.source_node"` → 获取源节点 ID
- `from: "Source: {{ $META.source_node }}"` → 在模板中使用
### 4. 内置变量 (`$...`)
用于变换和汇中访问系统生成的值。
- **语法**: `$VAR_NAME` (无花括号)
- **示例**:
- `$UUID`: 唯一消息 ID (UUIDv7)
- `$NOW`: 当前时间戳 (ISO8601, 秒级精度)
- `$DATE`: 当前日期 (`YYYY-MM-DD`)
- `$TIMESTAMP`: Unix 时间戳 (毫秒)
- `$SOURCE_ID`: 源标识符
- `$MSG_ID`: 消息标识符
### 5. JSONPath (`$. ...`)
用于从消息负载中提取数据。
- **语法**: `$.path.to.field` 或 `$['path']['to']['field']`
- **示例**: `$.user.id`, `$.items[0].name`
## 变换配置 (Transform Configuration)
### Compute Step (计算步骤)
| 参数 | 默认值 | 描述 |
| ------------ | ------ | ------------------------------------------------------------ |
| `expression` | — | 数学表达式 (支持 `+`, `-`, `*`, `/`, `( )`, 数值和 JSONPath) |
| `output` | — | 写入结果的 JSONPath |
| `precision` | — | 可选的保留小数位数 |
### Hash Step (哈希步骤)
| 参数 | 默认值 | 描述 |
| ---------- | ------ | -------- |
| `mappings` | — | 映射列表 |
`mappings` 项字段:
| 字段 | 默认值 | 描述 |
| ------- | ------ | --------------------------------- |
| `algo` | — | 哈希算法: `md5`, `sha1`, `sha256` |
| `from` | — | 源字段 JSONPath 或模板 |
| `value` | — | 静态值 (作为 `from` 的替代) |
| `to` | — | 目标字段 JSONPath |
## 汇配置 (Sink Configuration)
### Console Sink (控制台汇)
| 参数 | 默认值 | 描述 |
| -------- | -------- | -------------------------------------------------------------- |
| `format` | `pretty` | 输出格式: `pretty` (缩进 JSON), `json` (紧凑), `text` (仅负载) |
### File Sink (文件汇)
| 参数 | 默认值 | 描述 |
| ---------------- | ------- | ------------------------------- |
| `path` | — | 输出文件路径 (必须) |
| `format` | `jsonl` | 输出格式: `jsonl`, `tsv`, `csv` |
| `append` | `true` | 追加到现有文件;`false` 则覆盖 |
| `include_header` | `false` | TSV/CSV 格式是否包含标题行 |
### Redis Sink (Redis 汇)
| 参数 | 默认值 | 描述 |
| ------- | ------ | ------------------------------------- |
| `url` | — | Redis 连接 URL |
| `key` | — | 键映射 (`from` 或 `value`) |
| `value` | — | 值映射 (`from` 或 `value`) |
| `ttl` | — | `SETEX` 的可选 TTL (例如 `30s`, `5m`) |
### Notify Sink (通知汇)
Notify 汇支持 `email`, `webhook`, 和 `telegram` 提供商。模板支持静态文本、JSONPath (如 `$.message`) 和 `{{ }}` 插值。
#### Email Provider
| 参数 | 默认值 | 描述 |
| --------------- | ---------- | -------------------------------------------------------- |
| `provider` | — | `email` |
| `smtp.server` | — | SMTP 服务器主机名 |
| `smtp.port` | provider | SMTP 端口 (可选) |
| `smtp.username` | — | SMTP 认证用户名 (可选) |
| `smtp.password` | — | SMTP 认证密码 (可选) |
| `smtp.security` | `starttls` | `starttls` 或 `none` |
| `from` | — | 发件人地址 |
| `to` | — | 收件人列表 |
| `subject` | default | 可选主题模板 |
| `message` | default | 可选消息模板 |
| `min_severity` | — | 发送的最低严重级别 (`info`/`warning`/`error`/`critical`) |
| `active_window` | — | 可选活动窗口配置对象 |
| `silence` | — | 可选静默配置对象 |
#### Webhook Provider
| 参数 | 默认值 | 描述 |
| --------------- | ------- | -------------------------------------------------------- |
| `provider` | — | `webhook` |
| `url` | — | 目标 URL |
| `method` | `POST` | `POST`, `PUT`, `PATCH` |
| `headers` | `{}` | 可选请求头 |
| `timeout` | `30s` | 请求超时 |
| `body` | `full` | `payload` 或 `full` |
| `message` | default | 可选消息模板 (由 `full` 使用) |
| `min_severity` | — | 发送的最低严重级别 (`info`/`warning`/`error`/`critical`) |
| `active_window` | — | 可选活动窗口配置对象 |
| `silence` | — | 可选静默配置对象 |
#### Telegram Provider
| 参数 | 默认值 | 描述 |
| -------------------------- | -------------------------- | -------------------------------------------------------- |
| `provider` | — | `telegram` |
| `bot_token` | — | Bot Token |
| `chat_id` | — | Chat ID |
| `api_base_url` | `https://api.telegram.org` | API 基础 URL (可选) |
| `parse_mode` | — | `MarkdownV2` 或 `HTML` |
| `disable_web_page_preview` | — | 禁用链接预览 |
| `timeout` | `30s` | 请求超时 |
| `message` | default | 可选消息模板 |
| `min_severity` | — | 发送的最低严重级别 (`info`/`warning`/`error`/`critical`) |
| `active_window` | — | 可选活动窗口配置对象 |
| `silence` | — | 可选静默配置对象 |
`silence` 对象字段:
| 字段 | 默认值 | 描述 |
| --------- | -------- | ------------------------------------------------ |
| `window` | — | 静默窗口 (例如 `2h`); 如果系统默认值提供了则可选 |
| `backend` | `memory` | `memory` 或 `redis` (如果设置则回退到系统默认) |
| `key` | — | 可选静默键模板 |
| `redis` | — | 当后端为 `redis` 时的 Redis 配置 |
`silence.redis` 字段:
| 字段 | 默认值 | 描述 |
| ------------ | -------------------------- | -------------- |
| `url` | — | Redis 连接 URL |
| `key_prefix` | `pipeflow:notify:silence:` | Redis 键前缀 |
`active_window` 对象字段:
| 字段 | 默认值 | 描述 |
| ----------------- | ------ | ------------------------------------------------------------ |
| `start` | — | 窗口开始时间 (`HH:MM`) |
| `end` | — | 窗口结束时间 (`HH:MM`) |
| `timezone` | local | IANA 时区 (例如 `Asia/Shanghai`) |
| `days` | all | 活动天数 (`mon`..`sun`) |
| `bypass_severity` | — | 绕过窗口的严重级别阈值 (`info`/`warning`/`error`/`critical`) |
当设置了 `active_window` 时,窗口外的通知将延迟到下一个窗口开始时发送。如果设置了 `bypass_severity` 且通知严重级别达到或超过该值,即使在窗口外也会立即发送警报。
`start` 和 `end` 不能相等。
## 系统配置 (System Configuration)
系统级通知默认值可以在 `system.notify.silence` 和 `system.notify.active_window` 下配置。设置后,notify sinks 可以覆盖特定字段或省略这些块以继承系统默认值。
```yaml
system:
notify:
silence:
window: 2h
backend: redis
key: "{{ $.alert_key }}"
redis:
url: "redis://127.0.0.1:6379/0"
key_prefix: "pipeflow:notify:silence:"
active_window:
start: "08:00"
end: "22:00"
timezone: "Asia/Shanghai"
days: ["mon", "tue", "wed", "thu", "fri"]
bypass_severity: error
```
Sink 级覆盖示例:
```yaml
pipeline:
sinks:
- id: notify_webhook
type: notify
config:
provider: webhook
url: "https://example.com/webhook"
silence:
key: "{{ $.name }}|{{ $.labels.host }}"
```
**示例**:
```yaml
pipeline:
transforms:
- id: notify_passthrough
inputs: [source::system::notify]
outputs: [notify_webhook]
steps: []
sinks:
- id: notify_webhook
type: notify
config:
provider: webhook
url: "https://example.com/webhook"
body: full
message: "Alert: {{ $.message }}"
```
### HTTP Client Sink (HTTP 客户端汇)
| 参数 | 默认值 | 描述 |
| --------- | ------ | -------------------------------- |
| `url` | — | 目标 URL |
| `method` | `POST` | `POST`, `PUT`, `PATCH` |
| `headers` | `{}` | 可选请求头 |
| `fields` | — | 请求体的可选字段映射 |
| `timeout` | `30s` | 请求超时 |
| `auth` | — | 可选认证配置 (见 HTTP Auth 部分) |
### SQL Sink (SQL 汇)
| 参数 | 默认值 | 描述 |
| ------------ | -------- | ------------------------------------- |
| `driver` | `sqlite` | `sqlite` 或 `postgres` |
| `connection` | — | 连接字符串或 SQLite 路径 |
| `table` | — | 目标表名 |
| `columns` | — | 列映射 (`from` 或 `value`) |
| `upsert` | — | 可选 UPSERT 配置 (`conflict_columns`) |
`columns` 支持:
| 字段 | 默认值 | 描述 |
| ------------- | ------- | --------------------------------------------------------------- |
| `name` | — | 列名 |
| `from` | — | 类似 JSONPath 的源 |
| `value` | — | 静态值 (`$NOW`, `$UUID`, `$TIMESTAMP`, `$SOURCE_ID`, `$MSG_ID`) |
| `insert_only` | `false` | 仅插入列 (UPSERT 更新时排除) |
| `type` | — | 可选 SQL 类型提示 (主要用于 Postgres) |
### Blackhole Sink (黑洞汇)
| 参数 | 默认值 | 描述 |
| ---- | ------ | ------------ |
| — | — | 丢弃所有消息 |
## 源配置 (Source Configuration)
### File Source (文件源)
| 参数 | 默认值 | 描述 |
| ---------- | ------ | -------------------------------------------- |
| `path` | — | 读取文件的路径 |
| `mode` | `tail` | `tail` (追加/跟随) 或 `oneshot` (一次性读取) |
| `interval` | `1s` | `tail` 模式的轮询间隔 |
### Redis Source (Redis 源)
| 参数 | 默认值 | 描述 |
| ------------ | ------ | ---------------------------------------------------------------------- |
| `url` | — | Redis 连接 URL |
| `key` | — | 获取的键 |
| `mode` | `poll` | `poll` 或 `oneshot` |
| `interval` | `5s` | 轮询间隔 (仅用于 `poll` 模式) |
| `schedule` | — | Cron 表达式 (5 或 6 个字段; 5 字段默认秒为 `0`); 设置时忽略 `interval` |
| `parse_json` | `true` | 尽可能解析 JSON 值 |
### HTTP Client Source (HTTP 客户端源)
| 参数 | 默认值 | 描述 |
| ------------------ | ------ | ----------------------------------------------------------------------------------------------------------------- |
| `url` | — | 请求的目标 URL |
| `interval` | `60s` | 未设置 `schedule` 时的轮询间隔 |
| `schedule` | — | Cron 表达式 (5 或 6 个字段; 5 字段默认秒为 `0`, 例如 `0 0 * * *` 表示每天 00:00, 本地时间); 设置时忽略 `interval` |
| `method` | `GET` | HTTP 方法 (`GET`, `POST`, `PUT`, `DELETE`) |
| `headers` | `{}` | 可选请求头 |
| `error_body_limit` | `2048` | 包含在错误中的最大错误响应体长度 |
| `auth` | — | 可选认证配置 (见 HTTP Auth 部分) |
| `expect_status` | — | 可选的接受状态码列表 |
| `expect_body` | — | 可选的响应体验证规则 |
`expect_body` 支持:
| 字段 | 默认值 | 描述 |
| -------------- | ------ | ------------------------------ |
| `path` | — | JSONPath 提取值 |
| `eq` | — | 期望值 (需要 `path`) |
| `ne` | — | 不等于值 (需要 `path`) |
| `contains` | — | 响应体必须包含此关键字 |
| `not_contains` | — | 响应体必须 **不** 包含此关键字 |
### HTTP Server Source (HTTP 服务端源)
| 参数 | 默认值 | 描述 |
| ------ | ------ | -------------------------------- |
| `bind` | — | 绑定地址 |
| `path` | `/` | 请求路径 |
| `auth` | — | 可选认证配置 (见 HTTP Auth 部分) |
### SQL Source (SQL 源)
| 参数 | 默认值 | 描述 |
| ------------ | -------- | --------------------------------------------------------------------------------- |
| `driver` | `sqlite` | `sqlite` 或 `postgres` |
| `connection` | — | 连接字符串或 SQLite 路径 |
| `query` | — | 执行的查询 |
| `interval` | `60s` | 未设置 `schedule` 时的轮询间隔 |
| `schedule` | — | Cron 表达式 (5 或 6 个字段; 5 字段默认秒为 `0`); 设置时忽略 `interval` (本地时间) |
## HTTP Auth (HTTP 认证)
`http_client` 源/汇和 `http_server` 源共享 `auth` 块。
| 类型 | 参数 | 描述 |
| --------- | ---------------------- | --------------- |
| `basic` | `username`, `password` | HTTP Basic 认证 |
| `bearer` | `token` | Bearer Token 头 |
| `api_key` | `header`, `key` | 自定义头 键/值 |
| `header` | `name`, `value` | 通用头 键/值 |