pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
# Configuration Reference

[中文](CONFIGURATION_CN.md)

This document provides detailed configuration parameters for pipeflow nodes.

## Variable Syntax

Pipeflow supports multiple variable syntaxes, processed at different stages:

| Syntax Type               | Format        | Processed At | Description                                                       |
| :------------------------ | :------------ | :----------- | :---------------------------------------------------------------- |
| **Environment Variables** | `${VAR}`      | Config Load  | Replaced with OS environment variable values in any config field. |
| **Value Env Variables**   | `$ENV:VAR`    | Compile Time | Environment variables in value expressions (transforms/sinks).    |
| **Message Metadata**      | `$META.field` | Runtime      | Access message metadata fields in value expressions.              |
| **Built-in Variables**    | `$VAR`        | Runtime      | Dynamic values injected by the engine (e.g. `$UUID`, `$NOW`).     |
| **JSONPath**              | `$.field`     | Runtime      | Extracts data from the message payload.                           |

### 1. Environment Variables (`${...}`)

Used for credentials and configuration values in any config field.

- **Syntax**: `${VAR_NAME}` or `${VAR_NAME:-default}`
- **Scope**: Any configuration value (URLs, credentials, paths, etc.)
- **Processed**: Before YAML parsing
- **Example**: `url: "${API_URL}"`
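
As a rough sketch of how the two forms might look side by side, the fragment below reuses the webhook notify sink documented later in this reference. The variable names `WEBHOOK_URL` and `WEBHOOK_TIMEOUT` are illustrative assumptions, not names that pipeflow defines.

```yaml
pipeline:
  sinks:
    - id: notify_webhook
      type: notify
      config:
        provider: webhook
        # Replaced with the value of WEBHOOK_URL before the YAML is parsed
        # (hypothetical variable name).
        url: "${WEBHOOK_URL}"
        # Uses the default 30s when WEBHOOK_TIMEOUT is not set
        # (hypothetical variable name).
        timeout: "${WEBHOOK_TIMEOUT:-30s}"
```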

### 2. Value Environment Variables (`$ENV:...`)

Used in value expressions (transform remap mappings, sink column values) to access environment variables.

- **Syntax**: `$ENV:VAR_NAME` or `$ENV:VAR_NAME:-default`
- **Scope**: Value expressions only (`from` field in mappings/columns)
- **Processed**: When pipeline is compiled (value is resolved once at startup)
- **Examples**:
  - `from: "$ENV:DEPLOYMENT_ENV"` → Use env var value
  - `from: "$ENV:LOG_LEVEL:-info"` → Use default if not set
  - `from: "Env: {{ $ENV:APP_NAME:-unknown }}"` → In templates

### 3. Message Metadata (`$META...`)

Used in value expressions to access message metadata fields.

- **Syntax**: `$META.field_name` or `$META.tags.key`
- **Scope**: Value expressions only
- **Available fields**:
  - `$META.id` → Message UUID (string)
  - `$META.timestamp` → Unix timestamp in milliseconds (number)
  - `$META.source_node` → Source node ID (string)
  - `$META.correlation_id` → Correlation UUID if set (string or null)
  - `$META.chain_depth` → Processing chain depth (number)
  - `$META.tags.{key}` → Custom tag value (string or null)
- **Examples**:
  - `from: "$META.source_node"` → Get source node ID
  - `from: "Source: {{ $META.source_node }}"` → In templates

### 4. Built-in Variables (`$...`)

Used in transforms and sinks to access system-generated values.

- **Syntax**: `$VAR_NAME` (no braces)
- **Examples**:
  - `$UUID`: Unique message ID (UUIDv7)
  - `$NOW`: Current timestamp (ISO8601, second precision)
  - `$DATE`: Current date (`YYYY-MM-DD`)
  - `$TIMESTAMP`: Unix timestamp (milliseconds)
  - `$SOURCE_ID`: Source identifier
  - `$MSG_ID`: Message identifier

### 5. JSONPath (`$. ...`)

Used to extract data from the message payload.

- **Syntax**: `$.path.to.field` or `$['path']['to']['field']`
- **Examples**: `$.user.id`, `$.items[0].name`
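
The value-expression syntaxes above can be mixed within one set of mappings. The following is a minimal sketch using the `columns` list of the SQL sink described later in this reference. The sink type name `sql`, the table, the column names, and the payload fields are assumptions chosen for illustration.

```yaml
pipeline:
  sinks:
    - id: events_db
      type: sql                          # assumed type name for the SQL sink
      config:
        driver: sqlite
        connection: "events.db"          # hypothetical SQLite path
        table: events
        columns:
          - name: user_id
            from: "$.user.id"            # JSONPath into the message payload
          - name: source
            from: "$META.source_node"    # message metadata
          - name: env
            from: "$ENV:DEPLOYMENT_ENV"  # resolved once at startup
          - name: created_at
            value: "$NOW"                # built-in variable
```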

## Transform Configuration

### Compute Step

| Parameter    | Default | Description                                                                |
| ------------ | ------- | -------------------------------------------------------------------------- |
| `expression` |         | Math expression (supports `+`, `-`, `*`, `/`, `( )`, values, and JSONPath) |
| `output`     |         | JSONPath to write result to                                                |
| `precision`  |         | Optional decimal places to round to                                        |
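
A compute step might be configured roughly as follows. Only `expression`, `output`, and `precision` come from the table above. The `type: compute` key used to select the step, the placement of parameters directly on the step, and the payload fields `price` and `quantity` are assumptions.

```yaml
steps:
  - type: compute                        # assumed step selector
    expression: "$.price * $.quantity"   # hypothetical payload fields
    output: "$.total"                    # where the result is written
    precision: 2                         # round to 2 decimal places
```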

### Hash Step

| Parameter  | Default | Description      |
| ---------- | ------- | ---------------- |
| `mappings` |         | List of mappings |

`mappings` item fields:

| Field   | Default | Description                             |
| ------- | ------- | --------------------------------------- |
| `algo`  |         | Hash algorithm: `md5`, `sha1`, `sha256` |
| `from`  |         | Source field JSONPath or template       |
| `value` |         | Static value (alternative to `from`)    |
| `to`    |         | Target field JSONPath                   |
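
The sketch below shows one way a hash step with two mappings could look. The `type: hash` step selector and all payload field names are assumptions; the mapping fields themselves are the ones listed above.

```yaml
steps:
  - type: hash                     # assumed step selector
    mappings:
      # Hash a single payload field (hypothetical field names).
      - algo: sha256
        from: "$.user.email"
        to: "$.user.email_hash"
      # Hash a template that combines two fields.
      - algo: md5
        from: "{{ $.user.id }}:{{ $.order.id }}"
        to: "$.dedup_key"
```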

## Sink Configuration

### Console Sink

| Parameter | Default  | Description                                                                      |
| --------- | -------- | -------------------------------------------------------------------------------- |
| `format`  | `pretty` | Output format: `pretty` (indented JSON), `json` (compact), `text` (payload only) |

### File Sink

| Parameter        | Default | Description                                   |
| ---------------- | ------- | --------------------------------------------- |
| `path`           |         | Output file path (required)                   |
| `format`         | `jsonl` | Output format: `jsonl`, `tsv`, `csv`          |
| `append`         | `true`  | Append to existing file; `false` to overwrite |
| `include_header` | `false` | Include header row for TSV/CSV formats        |
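
For illustration, a file sink that overwrites a CSV file and writes a header row could be sketched like this. The type name `file`, the sink ID, and the path are assumptions.

```yaml
pipeline:
  sinks:
    - id: archive_csv
      type: file                   # assumed type name for the file sink
      config:
        path: "./out/events.csv"   # hypothetical output path
        format: csv
        append: false              # overwrite instead of appending
        include_header: true       # header row applies to TSV/CSV only
```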

### Redis Sink

| Parameter | Default | Description                                 |
| --------- | ------- | ------------------------------------------- |
| `url`     |         | Redis connection URL                        |
| `key`     |         | Key mapping (`from` or `value`)             |
| `value`   |         | Value mapping (`from` or `value`)           |
| `ttl`     |         | Optional TTL for `SETEX` (e.g. `30s`, `5m`) |
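
A Redis sink that writes one key per message with a five-minute expiry might look like the sketch below. The type name `redis`, the key layout, and the payload fields are assumptions. `key` and `value` are each written as a mapping with a `from` field, following the table above.

```yaml
pipeline:
  sinks:
    - id: cache_profile
      type: redis                       # assumed type name for the Redis sink
      config:
        url: "redis://127.0.0.1:6379/0"
        key:
          from: "user:{{ $.user.id }}"  # template; hypothetical payload field
        value:
          from: "$.profile"             # hypothetical payload field
        ttl: 5m                         # written with an expiry of 5 minutes
```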

### Notify Sink

The notify sink supports `email`, `webhook`, and `telegram` providers. Templates support
static text, JSONPath (e.g. `$.message`), and `{{ }}` interpolation.

#### Email Provider

| Parameter       | Default    | Description                                                    |
| --------------- | ---------- | -------------------------------------------------------------- |
| `provider`      |            | `email`                                                        |
| `smtp.server`   |            | SMTP server hostname                                           |
| `smtp.port`     | provider   | SMTP port (optional)                                           |
| `smtp.username` |            | SMTP auth username (optional)                                  |
| `smtp.password` |            | SMTP auth password (optional)                                  |
| `smtp.security` | `starttls` | `starttls` or `none`                                           |
| `from`          |            | From address                                                   |
| `to`            |            | Recipient list                                                 |
| `subject`       | default    | Optional subject template                                      |
| `message`       | default    | Optional message template                                      |
| `min_severity`  |            | Minimum severity to send (`info`/`warning`/`error`/`critical`) |
| `active_window` |            | Optional active window config object                           |
| `silence`       |            | Optional silence config object                                 |
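
An email notify sink could be sketched as follows. Writing the dotted `smtp.*` parameters as a nested `smtp` block is an assumption, made by analogy with the nested `silence.redis` block shown under System Configuration. The host, addresses, environment variable names, and payload fields are placeholders.

```yaml
pipeline:
  sinks:
    - id: notify_email
      type: notify
      config:
        provider: email
        smtp:                            # assumed nesting of the smtp.* parameters
          server: "smtp.example.com"     # placeholder host
          port: 587
          username: "${SMTP_USER}"       # hypothetical variable names
          password: "${SMTP_PASSWORD}"
          security: starttls
        from: "alerts@example.com"
        to: ["oncall@example.com"]
        subject: "Alert: {{ $.name }}"   # hypothetical payload fields
        message: "{{ $.message }}"
        min_severity: warning
```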

#### Webhook Provider

| Parameter       | Default | Description                                                    |
| --------------- | ------- | -------------------------------------------------------------- |
| `provider`      |         | `webhook`                                                      |
| `url`           |         | Target URL                                                     |
| `method`        | `POST`  | `POST`, `PUT`, `PATCH`                                         |
| `headers`       | `{}`    | Optional request headers                                       |
| `timeout`       | `30s`   | Request timeout                                                |
| `body`          | `full`  | `payload` or `full`                                            |
| `message`       | default | Optional message template (used by `full`)                     |
| `min_severity`  |         | Minimum severity to send (`info`/`warning`/`error`/`critical`) |
| `active_window` |         | Optional active window config object                           |
| `silence`       |         | Optional silence config object                                 |

#### Telegram Provider

| Parameter                  | Default                    | Description                                                    |
| -------------------------- | -------------------------- | -------------------------------------------------------------- |
| `provider`                 |                            | `telegram`                                                     |
| `bot_token`                |                            | Bot token                                                      |
| `chat_id`                  |                            | Chat ID                                                        |
| `api_base_url`             | `https://api.telegram.org` | API base URL (optional)                                        |
| `parse_mode`               |                            | `MarkdownV2` or `HTML`                                         |
| `disable_web_page_preview` |                            | Disable link previews                                          |
| `timeout`                  | `30s`                      | Request timeout                                                |
| `message`                  | default                    | Optional message template                                      |
| `min_severity`             |                            | Minimum severity to send (`info`/`warning`/`error`/`critical`) |
| `active_window`            |                            | Optional active window config object                           |
| `silence`                  |                            | Optional silence config object                                 |
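
A Telegram notify sink might be configured along these lines. The environment variable names and payload fields are hypothetical, and the message body is only an illustration of an HTML-mode template.

```yaml
pipeline:
  sinks:
    - id: notify_telegram
      type: notify
      config:
        provider: telegram
        bot_token: "${TELEGRAM_BOT_TOKEN}"   # hypothetical variable names
        chat_id: "${TELEGRAM_CHAT_ID}"
        parse_mode: HTML
        disable_web_page_preview: true
        message: "<b>{{ $.name }}</b>: {{ $.message }}"   # hypothetical fields
        min_severity: error
```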

`silence` object fields:

| Field     | Default  | Description                                                        |
| --------- | -------- | ------------------------------------------------------------------ |
| `window`  |          | Silence window (e.g. `2h`); optional if system defaults provide it |
| `backend` | `memory` | `memory` or `redis` (falls back to system default if set)          |
| `key`     |          | Optional silence key template                                      |
| `redis`   |          | Redis config when backend is `redis`                               |

`silence.redis` fields:

| Field        | Default                    | Description          |
| ------------ | -------------------------- | -------------------- |
| `url`        |                            | Redis connection URL |
| `key_prefix` | `pipeflow:notify:silence:` | Redis key prefix     |

`active_window` object fields:

| Field             | Default | Description                                                               |
| ----------------- | ------- | ------------------------------------------------------------------------- |
| `start`           |         | Window start time (`HH:MM`)                                               |
| `end`             |         | Window end time (`HH:MM`)                                                 |
| `timezone`        | local   | IANA timezone (e.g. `Asia/Shanghai`)                                      |
| `days`            | all     | Active days (`mon`..`sun`)                                                |
| `bypass_severity` |         | Severity threshold to bypass window (`info`/`warning`/`error`/`critical`) |

When `active_window` is set, notifications outside the window are delayed until the next
window start. If `bypass_severity` is set and the notification severity meets or exceeds
it, the alert is sent immediately even outside the window.
`start` and `end` must not be equal.

## System Configuration

System-level notify defaults can be configured under `system.notify.silence` and
`system.notify.active_window`. When set, notify sinks can override specific fields or
omit these blocks to inherit the system defaults.

```yaml
system:
  notify:
    silence:
      window: 2h
      backend: redis
      key: "{{ $.alert_key }}"
      redis:
        url: "redis://127.0.0.1:6379/0"
        key_prefix: "pipeflow:notify:silence:"
    active_window:
      start: "08:00"
      end: "22:00"
      timezone: "Asia/Shanghai"
      days: ["mon", "tue", "wed", "thu", "fri"]
      bypass_severity: error
```

Per-sink override example:

```yaml
pipeline:
  sinks:
    - id: notify_webhook
      type: notify
      config:
        provider: webhook
        url: "https://example.com/webhook"
        silence:
          key: "{{ $.name }}|{{ $.labels.host }}"
```

**Example** (routing messages from the `source::system::notify` input to a webhook notify sink):

```yaml
pipeline:
  transforms:
    - id: notify_passthrough
      inputs: [source::system::notify]
      outputs: [notify_webhook]
      steps: []

  sinks:
    - id: notify_webhook
      type: notify
      config:
        provider: webhook
        url: "https://example.com/webhook"
        body: full
        message: "Alert: {{ $.message }}"
```

### HTTP Client Sink

| Parameter | Default | Description                                  |
| --------- | ------- | -------------------------------------------- |
| `url`     |         | Target URL                                   |
| `method`  | `POST`  | `POST`, `PUT`, `PATCH`                       |
| `headers` | `{}`    | Optional request headers                     |
| `fields`  |         | Optional field mappings for request body     |
| `timeout` | `30s`   | Request timeout                              |
| `auth`    |         | Optional auth config (see HTTP Auth section) |

### SQL Sink

| Parameter    | Default  | Description                                 |
| ------------ | -------- | ------------------------------------------- |
| `driver`     | `sqlite` | `sqlite` or `postgres`                      |
| `connection` |          | Connection string or SQLite path            |
| `table`      |          | Target table name                           |
| `columns`    |          | Column mappings (`from` or `value`)         |
| `upsert`     |          | Optional UPSERT config (`conflict_columns`) |

`columns` supports:

| Field         | Default | Description                                                           |
| ------------- | ------- | --------------------------------------------------------------------- |
| `name`        |         | Column name                                                           |
| `from`        |         | JSONPath-like source                                                  |
| `value`       |         | Static value (`$NOW`, `$UUID`, `$TIMESTAMP`, `$SOURCE_ID`, `$MSG_ID`) |
| `insert_only` | `false` | Insert-only column (excluded from UPSERT updates)                     |
| `type`        |         | Optional SQL type hint (mainly for Postgres)                          |
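
To show how `upsert` and `insert_only` might interact, the sketch below keeps `created_at` from the first insert while refreshing `updated_at` on every conflict. The type name `sql`, the table and column names, the payload fields, and the `DATABASE_URL` variable are assumptions.

```yaml
pipeline:
  sinks:
    - id: users_db
      type: sql                        # assumed type name for the SQL sink
      config:
        driver: postgres
        connection: "${DATABASE_URL}"  # hypothetical variable name
        table: users
        columns:
          - name: id
            from: "$.id"
          - name: name
            from: "$.name"
          - name: created_at
            value: "$NOW"
            insert_only: true          # excluded from UPSERT updates
          - name: updated_at
            value: "$NOW"
        upsert:
          conflict_columns: [id]       # conflict target for the UPSERT
```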

### Blackhole Sink

This sink has no parameters; it discards all messages.

## Source Configuration

### File Source

| Parameter  | Default | Description                              |
| ---------- | ------- | ---------------------------------------- |
| `path`     |         | Path to the file to read                 |
| `mode`     | `tail`  | `tail` (follow) or `oneshot` (read once) |
| `interval` | `1s`    | Poll interval for `tail` mode            |

### Redis Source

| Parameter    | Default | Description                                                                                            |
| ------------ | ------- | ------------------------------------------------------------------------------------------------------ |
| `url`        |         | Redis connection URL                                                                                   |
| `key`        |         | Key to fetch                                                                                           |
| `mode`       | `poll`  | `poll` or `oneshot`                                                                                    |
| `interval`   | `5s`    | Poll interval (only for `poll` mode)                                                                   |
| `schedule`   |         | Cron expression (5 or 6 fields; 5-field schedules assume `0` seconds); when set, `interval` is ignored |
| `parse_json` | `true`  | Parse JSON values when possible                                                                        |
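
A Redis source driven by a cron schedule rather than an interval could look roughly like this. The top-level `sources` key is assumed by analogy with `sinks` and `transforms`, and the type name `redis` and the key name are also assumptions.

```yaml
pipeline:
  sources:                          # assumed key, by analogy with `sinks`
    - id: redis_config
      type: redis                   # assumed type name for the Redis source
      config:
        url: "redis://127.0.0.1:6379/0"
        key: "app:config"           # hypothetical key
        mode: poll
        schedule: "*/5 * * * *"     # every 5 minutes; `interval` is ignored
        parse_json: true
```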

### HTTP Client Source

| Parameter          | Default | Description                                                                                                                                           |
| ------------------ | ------- | ----------------------------------------------------------------------------------------------------------------------------------------------------- |
| `url`              |         | Target URL to request                                                                                                                                 |
| `interval`         | `60s`   | Poll interval when `schedule` is not set                                                                                                              |
| `schedule`         |         | Cron expression (5 or 6 fields; 5-field schedules assume `0` seconds, e.g., `0 0 * * *` for daily 00:00, local time); when set, `interval` is ignored |
| `method`           | `GET`   | HTTP method (`GET`, `POST`, `PUT`, `DELETE`)                                                                                                          |
| `headers`          | `{}`    | Optional request headers                                                                                                                              |
| `error_body_limit` | `2048`  | Max error response body length included in errors                                                                                                     |
| `auth`             |         | Optional auth config (see HTTP Auth section)                                                                                                          |
| `expect_status`    |         | Optional list of accepted status codes                                                                                                                |
| `expect_body`      |         | Optional body validation rules                                                                                                                        |

`expect_body` supports:

| Field          | Default | Description                                 |
| -------------- | ------- | ------------------------------------------- |
| `path`         |         | JSONPath to extract value                   |
| `eq`           |         | Expected value (requires `path`)            |
| `ne`           |         | Not-equal value (requires `path`)           |
| `contains`     |         | Response body must contain this keyword     |
| `not_contains` |         | Response body must NOT contain this keyword |
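
As an illustration, a health-check style source could combine `expect_status` with `expect_body` as sketched below. Writing `expect_body` as a list of rule objects is an assumption, since the table only describes the fields. The `sources` key, URL, and payload fields are also placeholders.

```yaml
pipeline:
  sources:                              # assumed key, by analogy with `sinks`
    - id: health_check
      type: http_client
      config:
        url: "https://example.com/health"
        method: GET
        interval: 30s
        expect_status: [200]
        expect_body:                    # list shape is an assumption
          - path: "$.status"            # hypothetical response field
            eq: "ok"
          - not_contains: "maintenance"
```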

### HTTP Server Source

| Parameter | Default | Description                                  |
| --------- | ------- | -------------------------------------------- |
| `bind`    |         | Bind address                                 |
| `path`    | `/`     | Request path                                 |
| `auth`    |         | Optional auth config (see HTTP Auth section) |

### SQL Source

| Parameter    | Default  | Description                                                                                                         |
| ------------ | -------- | ------------------------------------------------------------------------------------------------------------------- |
| `driver`     | `sqlite` | `sqlite` or `postgres`                                                                                              |
| `connection` |          | Connection string or SQLite path                                                                                    |
| `query`      |          | Query to execute                                                                                                    |
| `interval`   | `60s`    | Poll interval when `schedule` is not set                                                                            |
| `schedule`   |          | Local-time cron expression (5 or 6 fields; 5-field schedules assume `0` seconds); when set, `interval` is ignored   |
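
A SQL source that runs once a day might be sketched as follows. The `sources` key, the type name `sql`, the query, and the `DATABASE_URL` variable are assumptions.

```yaml
pipeline:
  sources:                              # assumed key, by analogy with `sinks`
    - id: pending_orders
      type: sql                         # assumed type name for the SQL source
      config:
        driver: postgres
        connection: "${DATABASE_URL}"   # hypothetical variable name
        query: "SELECT id, total FROM orders WHERE status = 'pending'"
        schedule: "0 0 * * *"           # daily at 00:00 local time
```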

## HTTP Auth

The `auth` block is shared by `http_client` sources/sinks and the `http_server` source.

| Type      | Parameters             | Description              |
| --------- | ---------------------- | ------------------------ |
| `basic`   | `username`, `password` | HTTP Basic auth          |
| `bearer`  | `token`                | Bearer token header      |
| `api_key` | `header`, `key`        | Custom header key/value  |
| `header`  | `name`, `value`        | Generic header key/value |
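
The sketch below shows how an `auth` block might be attached to an `http_server` source and an `http_client` sink. Selecting the auth type with a `type` key is an assumption, as is the top-level `sources` key. The IDs, addresses, header name, and environment variable names are placeholders.

```yaml
pipeline:
  sources:                            # assumed key, by analogy with `sinks`
    - id: webhook_in
      type: http_server
      config:
        bind: "0.0.0.0:8080"
        path: "/events"
        auth:
          type: bearer                # assumed discriminator key
          token: "${INGEST_TOKEN}"    # hypothetical variable name
  sinks:
    - id: forward_api
      type: http_client
      config:
        url: "https://example.com/api/events"
        method: POST
        auth:
          type: api_key               # assumed discriminator key
          header: "X-API-Key"         # placeholder header name
          key: "${API_KEY}"           # hypothetical variable name
```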