---
# =============================================================================
# Window Aggregation Example
# =============================================================================
#
# This example demonstrates the window transform step in a pipeline that:
# 1. Polls an HTTP API at regular intervals (http_client source)
# 2. Buffers messages and emits aggregated output (window step)
# 3. Writes aggregated messages to console (console sink)
#
# Window triggers (whichever fires first causes the window to emit):
# - Time trigger: emit after `duration` (e.g., "30s")
# - Count trigger: emit after `size` messages
#
# Window operations:
# - merge: Combine all payload objects (objects merged, non-objects as array)
# - select_one: Select one message (strategy: first | last)
#
# Usage:
# pipeflow run examples/window_aggregation.yaml
#
# =============================================================================
pipeline:
  # ---------------------------------------------------------------------------
  # Sources
  # ---------------------------------------------------------------------------
  sources:
    # Polls a single post from the JSONPlaceholder API every 5 seconds.
    - id: api_poller
      type: http_client
      config:
        url: "https://jsonplaceholder.typicode.com/posts/1"
        interval: "5s"

  # ---------------------------------------------------------------------------
  # Transforms
  # ---------------------------------------------------------------------------
  transforms:
    - id: aggregate_posts
      # Wiring by component id (both ids are declared in this file).
      # These must be explicit lists: a bare `inputs:` / `outputs:` parses as
      # null and leaves the transform connected to nothing.
      inputs: [api_poller]
      outputs: [console_out]
      steps:
        # ---------------------------------------------------------------------
        # Step 1: Remap - Extract relevant fields
        # ---------------------------------------------------------------------
        # Maps `$.id` -> `$.post_id` and `$.title` -> `$.title`;
        # keep_unmapped is disabled.
        - type: remap
          config:
            mappings:
              - from: "$.id"
                to: "$.post_id"
              - from: "$.title"
                to: "$.title"
            keep_unmapped: false

        # ---------------------------------------------------------------------
        # Step 2: Window - Aggregate 3 messages
        # ---------------------------------------------------------------------
        # Triggers:
        #   duration: Time-based trigger (e.g., "30s", "1m", "500ms")
        #   size:     Count-based trigger (emit after N messages)
        #
        # Operations:
        #   merge:      Merge all payloads (objects merged, non-objects as array)
        #   select_one: Select one message (strategy: first | last)
        #
        # Buffer limits:
        #   max_messages: Maximum buffer capacity (default: 10000)
        #   on_overflow:  drop_oldest | error (default: drop_oldest)
        #
        # With the 5s poll interval above, size: 3 emits roughly every 15s.
        - type: window
          config:
            size: 3
            operation: merge
            max_messages: 100

  # ---------------------------------------------------------------------------
  # Sinks
  # ---------------------------------------------------------------------------
  sinks:
    - id: console_out
      type: console
      config:
        format: pretty