# =============================================================================
# HTTP Client with Multi-Step Transform to File Example
# =============================================================================
#
# This example demonstrates a complete transform pipeline that:
#   1. Polls an HTTP API at regular intervals (http_client source)
#   2. Filters messages based on a condition (filter step)
#   3. Remaps fields to a new structure (remap step)
#   4. Writes transformed messages to a file (file sink)
#
# Usage:
#   pipeflow run examples/http_transform_to_file.yaml
#
# =============================================================================
pipeline:
  # ---------------------------------------------------------------------------
  # Sources
  # ---------------------------------------------------------------------------
  sources:
    - id: api_poller
      type: http_client
      config:
        # NOTE: the collection endpoint /posts returns an array; this example
        # expects an object payload, so it polls the single-post endpoint.
        url: "https://jsonplaceholder.typicode.com/posts/1"
        # Polling period, given as a quoted duration string.
        interval: "30s"

  # ---------------------------------------------------------------------------
  # Transforms (Multi-Step Pipeline)
  # ---------------------------------------------------------------------------
  transforms:
    - id: process_posts
      # Wiring: read from the source declared above and write to the sink
      # declared below, referenced by their ids. Listed explicitly because a
      # bare `inputs:` / `outputs:` key parses as null, not as an empty list.
      inputs:
        - api_poller
      outputs:
        - file_out
      # Steps are listed in the order filter -> remap.
      steps:
        # ---------------------------------------------------------------------
        # Step 1: Filter - Only pass posts from userId == 1
        # ---------------------------------------------------------------------
        # Single condition format:
        #   field: JSONPath-like accessor ($.field, $.nested.field, $.array[0])
        #   operator: eq | ne | gt | ge | lt | le | contains | matches
        #   value: Expected value (string, number, or boolean)
        #
        # Multi-condition format:
        #   mode: and | or
        #   conditions:
        #     - { field: "$.field1", operator: eq, value: "val1" }
        #     - { field: "$.field2", operator: gt, value: 10 }
        - type: filter
          config:
            field: "$.userId"
            operator: eq
            # Unquoted on purpose: parsed as the integer 1, not the string "1".
            value: 1

        # ---------------------------------------------------------------------
        # Step 2: Remap - Transform field structure
        # ---------------------------------------------------------------------
        # mappings: List of field mappings (from -> to)
        # keep_unmapped: If true, keep fields not listed in mappings
        #                (default: false)
        - type: remap
          config:
            mappings:
              - from: "$.id"
                to: "$.post_id"
              - from: "$.title"
                to: "$.post_title"
              - from: "$.userId"
                to: "$.author_id"
            # Fields not listed in mappings are not kept.
            keep_unmapped: false

  # ---------------------------------------------------------------------------
  # Sinks
  # ---------------------------------------------------------------------------
  sinks:
    - id: file_out
      type: file
      config:
        path: "./data/transformed_posts.jsonl"
        # NOTE(review): presumably `append: false` means the file is
        # overwritten rather than appended to - confirm against the file sink
        # documentation.
        append: false
        format: jsonl