pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
# =============================================================================
# HTTP Client -> SQLite UPSERT (with hash id) Example
# =============================================================================
#
# This example demonstrates:
#   1) Polling an HTTP API at regular intervals
#   2) Extracting fields via `remap`
#   3) Generating a deterministic `id` via `hash` (sha256)
#   4) Writing rows to SQLite via the `sql` sink, upserting on `id`
#      - `created_at` is INSERT-only (never updated on conflict)
#      - all other non-key columns (source, title, author) are updated on conflict
#
# Prerequisites:
#   - Create the database and table first:
#     mkdir -p data
#     sqlite3 data/pipeflow_events.db "CREATE TABLE IF NOT EXISTS events (
#       id TEXT PRIMARY KEY,
#       created_at TEXT NOT NULL,
#       source TEXT NOT NULL,
#       title TEXT,
#       author TEXT
#     );"
#
# Usage:
#   cargo run --all-features -- run examples/http_to_sqlite.yaml
#
# Verify:
#   sqlite3 data/pipeflow_events.db "SELECT id, created_at, source, title, author FROM events;"
#
# Notes:
# - Table/column identifiers must match: [A-Za-z_][A-Za-z0-9_]*
# - The upsert key must be backed by a PRIMARY KEY or UNIQUE constraint.
#
# =============================================================================

pipeline:
  sources:
    # Fetch https://httpbin.org/json once per 10s interval.
    - id: api_poller
      type: http_client
      config:
        url: 'https://httpbin.org/json'
        interval: '10s'

  transforms:
    # Reduce each polled response to {title, author}, then attach a stable id.
    - id: process_events
      inputs:
        - api_poller
      outputs:
        - sqlite_out
        - console_out
      steps:
        # Step 1: copy the two slideshow fields to the top level.
        # keep_unmapped: false -> fields not listed here are not carried over.
        - type: remap
          config:
            keep_unmapped: false
            mappings:
              - from: '$.slideshow.title'
                to: '$.title'
              - from: '$.slideshow.author'
                to: '$.author'
        # Step 2: sha256 of "title|author" becomes the UPSERT key
        # (lower-case hex string), so the same payload always maps to one row.
        - type: hash
          config:
            mappings:
              - algo: sha256
                from: '{{ $.title }}|{{ $.author }}'
                to: '$.id'

  sinks:
    # Persist each event into the SQLite `events` table, upserting on `id`.
    - id: sqlite_out
      type: sql
      config:
        driver: sqlite
        connection: 'data/pipeflow_events.db'
        table: events

        # Columns that identify an existing row for UPSERT. SQLite requires
        # them to be covered by a PRIMARY KEY or UNIQUE constraint.
        upsert:
          conflict_columns:
            - id

        columns:
          # UPSERT key produced by the hash step.
          - name: id
            from: '$.id'

          # Set once on INSERT; left untouched when a conflicting row is updated.
          - name: created_at
            value: '$NOW'
            insert_only: true

          # Placeholder resolved by pipeflow; presumably the originating
          # source id (api_poller) -- confirm. Overwritten on conflict.
          - name: source
            value: '$SOURCE_ID'

          # Payload fields copied from the event; overwritten on conflict.
          - name: title
            from: '$.title'
          - name: author
            from: '$.author'

    # Debug aid: print every event to the console as JSON.
    - id: console_out
      type: console
      config:
        format: json