---
# =============================================================================
# HTTP Client -> SQLite UPSERT (with hash id) Example
# =============================================================================
#
# This example demonstrates:
# 1) Polling an HTTP API at regular intervals
# 2) Extracting fields via `remap`
# 3) Generating a deterministic `id` via `hash` (sha256)
# 4) Writing rows to SQLite using `sql` sink with UPSERT-by-id
#    - `created_at` is INSERT-only (will not be updated on conflict)
#    - other fields (e.g. title/author) are updated on conflict
#
# Prerequisites:
#   - Create the database and table first:
#       mkdir -p data
#       sqlite3 data/pipeflow_events.db "CREATE TABLE IF NOT EXISTS events (
#         id TEXT PRIMARY KEY,
#         created_at TEXT NOT NULL,
#         source TEXT NOT NULL,
#         title TEXT,
#         author TEXT
#       );"
#
# Usage:
#   cargo run --all-features -- run examples/http_to_sqlite.yaml
#
# Verify:
#   sqlite3 data/pipeflow_events.db "SELECT id, created_at, source, title, author FROM events;"
#
# Notes:
#   - Table/column identifiers must match: [A-Za-z_][A-Za-z0-9_]*
#   - The upsert key must be backed by PRIMARY KEY or UNIQUE constraint.
#
# =============================================================================
pipeline:
  # Source: poll the HTTP endpoint; `interval` is set to ten seconds.
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"

  # Transform: pick the fields to store, then derive the upsert key from them.
  transforms:
    - id: process_events
      # Read from the only source and fan out to both sinks defined below.
      inputs: [api_poller]
      outputs: [sqlite_out, console_out]
      steps:
        - type: remap
          config:
            # NOTE(review): presumably drops every field that is not listed
            # under `mappings` - confirm against the remap step docs.
            keep_unmapped: false
            mappings:
              - from: "$.slideshow.title"
                to: "$.title"
              - from: "$.slideshow.author"
                to: "$.author"
        - type: hash
          config:
            mappings:
              # Deterministic id for UPSERT key (lower-case hex string).
              # Built from title and author, so the same pair always yields
              # the same id.
              - algo: sha256
                from: "{{ $.title }}|{{ $.author }}"
                to: "$.id"

  sinks:
    - id: sqlite_out
      type: sql
      config:
        driver: sqlite
        connection: "data/pipeflow_events.db"
        table: events
        # Enable UPSERT and set conflict key.
        # The table must have PRIMARY KEY or UNIQUE constraint on this column.
        upsert:
          conflict_columns: [id]
        columns:
          - name: id
            from: "$.id"
          # INSERT-only: initial creation time (will not change on conflict).
          - name: created_at
            value: "$NOW"
            insert_only: true
          # NOTE(review): `$SOURCE_ID` is presumably filled in by the runtime
          # with the originating source id - confirm.
          - name: source
            value: "$SOURCE_ID"
          - name: title
            from: "$.title"
          - name: author
            from: "$.author"

    # Also output to console for debugging
    - id: console_out
      type: console
      config:
        format: json