pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
# =============================================================================
# SQL Source -> Console Example
# =============================================================================
#
# This example demonstrates:
#   1) Polling a SQLite database at regular intervals
#   2) Emitting each row as a JSON message
#   3) Outputting to console with pretty formatting
#
# Prerequisites:
#   - Create the database and table first:
#     mkdir -p data
#     sqlite3 data/demo.db "CREATE TABLE IF NOT EXISTS events (
#       id INTEGER PRIMARY KEY AUTOINCREMENT,
#       name TEXT NOT NULL,
#       value REAL,
#       created_at TEXT DEFAULT (datetime('now'))
#     );"
#     sqlite3 data/demo.db "INSERT INTO events (name, value) VALUES
#       ('temperature', 23.5),
#       ('humidity', 65.2),
#       ('pressure', 1013.25);"
#
# Usage:
#   cargo run --all-features -- run examples/sql_to_console.yaml
#
# Expected Output:
#   Each row from the events table is printed as JSON on every poll. The query
#   has no filter, so the same rows are printed again each interval:
#   {"id": 1, "name": "temperature", "value": 23.5, "created_at": "2025-01-13 02:30:00"}
#
# Notes:
# - The source polls immediately on startup, then once per `interval`
#   (a duration string such as "10s", "1m" or "1h")
# - Use `schedule` instead of `interval` for cron-based scheduling
# - For PostgreSQL, change `driver` to `postgres` and set `connection` to a
#   URL of the form "postgres://user:pass@host:port/dbname"
#
# =============================================================================

pipeline:
  sources:
    # Polls the SQL database and emits one message per result row.
    - id: db_poller
      type: sql
      config:
        # Database driver: sqlite | postgres
        driver: sqlite
        # Connection string
        # - SQLite: file path or ":memory:"
        # - Postgres: "postgres://user:pass@host:port/dbname"
        connection: "data/demo.db"
        # Query executed on every poll. Each selected column becomes a field
        # of the row's JSON message.
        # NOTE: there is no WHERE filter, so every poll re-emits ALL rows in
        # `events`; expect the same rows to be printed again each interval.
        query: "SELECT id, name, value, created_at FROM events ORDER BY id"
        # Polling interval as a duration string (e.g., "10s", "1m", "1h")
        interval: "10s"
        # Alternative: use a cron schedule instead of `interval`.
        # This example has 6 fields; the first one is seconds.
        # schedule: "*/30 * * * * *"  # Every 30 seconds

  transforms:
    # No steps: messages from db_poller are presumably forwarded unchanged
    # to console_out (confirm against the transform docs).
    - id: passthrough
      inputs: [db_poller]
      outputs: [console_out]
      steps: []

  sinks:
    # Pretty-print each row to console
    - id: console_out
      type: console
      config:
        format: pretty  # Options: pretty | json | text