---
# =============================================================================
# SQL Source -> Console Example
# =============================================================================
#
# This example demonstrates:
# 1) Polling a SQLite database at regular intervals
# 2) Emitting each row as a JSON message
# 3) Outputting to console with pretty formatting
#
# Prerequisites:
# - Create the database and table first:
#     mkdir -p data
#     sqlite3 data/demo.db "CREATE TABLE IF NOT EXISTS events (
#       id INTEGER PRIMARY KEY AUTOINCREMENT,
#       name TEXT NOT NULL,
#       value REAL,
#       created_at TEXT DEFAULT (datetime('now'))
#     );"
#     sqlite3 data/demo.db "INSERT INTO events (name, value) VALUES
#       ('temperature', 23.5),
#       ('humidity', 65.2),
#       ('pressure', 1013.25);"
#
# Usage:
# cargo run --all-features -- run examples/sql_to_console.yaml
#
# Expected Output:
# Each row from the events table will be printed as JSON:
# {"id": 1, "name": "temperature", "value": 23.5, "created_at": "2025-01-13 02:30:00"}
#
# Notes:
# - The source polls immediately on startup, then once per `interval` (a duration such as "10s")
# - Use `schedule` instead of `interval` for cron-based scheduling
# - For PostgreSQL, change `driver` to `postgres` and use a connection string
#
# =============================================================================
# Pipeline definition: one SQL polling source, one pass-through transform,
# and one console sink.
pipeline:
  sources:
    # Runs `query` against the configured database on every poll.
    - id: db_poller
      type: sql
      config:
        # Database driver: sqlite | postgres
        driver: sqlite
        # Connection string
        # - SQLite: file path or ":memory:"
        # - Postgres: "postgres://user:pass@host:port/dbname"
        connection: "data/demo.db"
        # Query to execute each interval (returns all columns as JSON)
        query: "SELECT id, name, value, created_at FROM events ORDER BY id"
        # Polling interval (e.g., "10s", "1m", "1h")
        interval: "10s"
        # Alternative: use cron schedule instead of interval
        # schedule: "*/30 * * * * *"  # Every 30 seconds

  transforms:
    # Forwards messages unchanged (no steps).
    # NOTE(review): wiring assumes `inputs` / `outputs` take lists of
    # source / sink ids and `steps` takes a list - TODO confirm against the
    # transform schema. Explicit values are used because a bare `key:` is
    # parsed as null.
    - id: passthrough
      inputs: [db_poller]
      outputs: [console_out]
      steps: []

  sinks:
    # Pretty-print each row to console
    - id: console_out
      type: console
      config:
        format: pretty  # Options: pretty | json | text