# pipeflow Design Document
[中文](DESIGN_CN.md)
## Overview
pipeflow is a lightweight, configuration-driven data pipeline framework for Rust.
It follows the classic ETL pattern with a DAG (Directed Acyclic Graph) execution model.
```text
Source → Transform → Sink
```
Note: This is a design document. Some sections describe planned functionality that
is not implemented yet. For the current behavior and supported node types, see
`README.md`.
## Design Decisions
| Concern | Decision | Rationale |
| --- | --- | --- |
| Data Format | `Message { meta, payload }` | Generic metadata + JSON payload |
| Remap DSL | JSONPath-like | Simple field mapping, e.g., `$.data.price` |
| Window State | Non-persistent | Restart clears windows, acceptable trade-off |
| Error Handling | DLQ + Event + Notify | Failed messages → DLQ, errors → Event, alerts → Notify |
| Config | YAML + DAG validation | Declarative, cycle detection at load time |
## Configuration Loading and Normalization (Implemented)
Pipeflow loads configuration via `Config::from_file(path)`:
- **File**: parse a single YAML file.
- **Directory**: load all `*.yaml` / `*.yml` files in **lexical order** and merge them.
After loading, the configuration is **normalized** and then **validated**:
- **System sources/sinks**:
- Built-in system nodes are implicit and always available.
- System sink output paths can be configured via `system.sinks`.
Validation focuses on **pipeline wiring semantics** (unique IDs, references, source-only inputs,
unused nodes, cycle detection, and system routing constraints). Node-specific `config` schemas
(e.g. required fields for `http_client`) are validated during `Engine::build()`.
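As an illustrative (hypothetical) example of a wiring error that load-time validation rejects, two transforms referencing each other form a cycle; the node IDs below are made up:

```yaml
# Hypothetical invalid config: stage_a and stage_b form a cycle,
# so DAG cycle detection fails the load before Engine::build().
transforms:
  - id: stage_a
    inputs: [stage_b]
    outputs: [stage_b]
  - id: stage_b
    inputs: [stage_a]
    outputs: [stage_a]
```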
## Pipeline Wiring Model
Pipeflow supports flexible DAG wiring:
- **Sources** declare neither `inputs` nor `outputs` in config; downstream transforms reference them in their `inputs`.
- **Transforms** declare one or more `inputs` (sources or other transforms) and one or more `outputs` (sinks or other transforms).
- **Transform chaining** is supported: a transform can feed other transforms for multi-stage processing, and declaring the link on either side (the upstream's `outputs` or the downstream's `inputs`) is enough.
- **Transforms** may omit `steps` to act as pass-through routers.
- **Sinks** declare no `inputs` in config; they receive messages from transforms that list them in `outputs`. A sink's write target is defined by its type and `config`.
Example transform chain:
```yaml
transforms:
  - id: stage1
    inputs: [source]
    outputs: [stage2] # Output to another transform
  - id: stage2
    inputs: [stage1] # Input from another transform
    outputs: [sink]
```
## Core Data Model
### Message
```rust
pub struct Message {
    pub meta: MessageMeta,
    pub payload: serde_json::Value,
}

pub struct MessageMeta {
    pub id: Uuid,                      // UUIDv7 for time-ordering
    pub timestamp: i64,                // Created timestamp (ms)
    pub source_node: String,           // Originating node ID
    pub correlation_id: Option<Uuid>,  // For tracing event chains
    pub chain_depth: u8,               // Prevents infinite loops
    pub tags: HashMap<String, String>, // Custom key-value pairs
}
```
### System Channels (Implemented)
System channels handle error routing and observability:
| Channel | Purpose | Payload | Description |
| --- | --- | --- | --- |
| DLQ | Failed messages | `Message` | Original message that failed processing |
| Event | Error details | `Event` | Structured error information |
| Notify | User alerts | `Notify` | Notifications requiring user attention |
**Routing behavior**:
- Transform/Sink errors → DLQ (original message) + Event (error details)
- Buffer overflow → Event (error details) + Notify (user alert)
- Source errors → Event (error details) only
### PipelineMetrics (Implemented)
```rust
pub struct PipelineMetrics {
    pub messages_sent: AtomicU64,     // Total messages sent by sources
    pub messages_received: AtomicU64, // Total messages received by sinks
    pub messages_dropped: AtomicU64,  // Total messages dropped (buffer overflow)
    pub errors: AtomicU64,            // Total processing errors
    pub dlq_sent: AtomicU64,          // Total messages sent to DLQ
}
```
Access via `Engine::metrics()` to get a snapshot of current values.
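The snapshot pattern can be sketched with std atomics: hot paths bump counters with `fetch_add`, and a reader copies them with relaxed loads. This is a minimal sketch; the `MetricsSnapshot` type and `snapshot()` method names are illustrative assumptions, not necessarily pipeflow's exact API.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Lock-free counters bumped by the engine's hot paths.
#[derive(Default)]
pub struct PipelineMetrics {
    pub messages_sent: AtomicU64,
    pub messages_received: AtomicU64,
    pub messages_dropped: AtomicU64,
    pub errors: AtomicU64,
    pub dlq_sent: AtomicU64,
}

// Plain-value copy of the counters at one point in time.
#[derive(Debug, Clone, Copy)]
pub struct MetricsSnapshot {
    pub messages_sent: u64,
    pub messages_received: u64,
    pub messages_dropped: u64,
    pub errors: u64,
    pub dlq_sent: u64,
}

impl PipelineMetrics {
    pub fn snapshot(&self) -> MetricsSnapshot {
        // Relaxed is sufficient: each counter is independent, and only
        // approximate consistency across fields is required.
        MetricsSnapshot {
            messages_sent: self.messages_sent.load(Ordering::Relaxed),
            messages_received: self.messages_received.load(Ordering::Relaxed),
            messages_dropped: self.messages_dropped.load(Ordering::Relaxed),
            errors: self.errors.load(Ordering::Relaxed),
            dlq_sent: self.dlq_sent.load(Ordering::Relaxed),
        }
    }
}
```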
## Node Types
### Sources
| Type | Description | Key Config | Status |
| --- | --- | --- | --- |
| `http_client` | HTTP polling | url, interval, headers, auth | Implemented |
| `http_server` | HTTP push/webhook | bind, path, auth | Implemented |
| `sql` | SQL polling | connection, query | Implemented |
| `file` | File watching | path, mode | Implemented |
| `redis` | Redis key polling | url, key, interval | Implemented |
| `websocket` | WebSocket streaming | url, subscribe_msg | Planned |
#### System Sources
System sources have fixed IDs that map to specific system channels:
| Source ID | Payload | Description |
| --- | --- | --- |
| `source::system::dlq` | `Message` | Receives original messages that failed |
| `source::system::event` | `Event` | Receives structured error/event info |
| `source::system::notify` | `Notify` | Receives user-facing notifications/alerts |
#### System Sinks
System sinks are implicit and write JSONL outputs by default:
- `sink::system::dlq` -> `data/system_dlq.jsonl`
- `sink::system::event` -> `data/system_event.jsonl`
- `sink::system::notify` -> `data/system_notify.jsonl`
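The default paths can be overridden via `system.sinks`. A hypothetical sketch follows; the exact sub-keys under `system.sinks` are assumptions, not confirmed by this document:

```yaml
# Hypothetical sketch: overriding the default JSONL output paths.
# The per-sink key names (dlq/event/notify, path) are assumed.
system:
  sinks:
    dlq:
      path: "data/custom_dlq.jsonl"
    event:
      path: "data/custom_event.jsonl"
```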
By default, each system source emits to its corresponding system sink. You can route a system
source to user-defined sinks by wiring it into a transform.
**Event Structure**:
```rust
pub struct Event {
    pub name: String,                    // Event name/type
    pub payload: serde_json::Value,      // Event data
    pub labels: HashMap<String, String>, // Key-value tags
    pub timestamp: i64,                  // Timestamp (ms)
}
```
**Notify Structure**:
```rust
pub struct Notify {
    pub name: String,
    pub severity: NotifySeverity, // Info | Warning | Error | Critical
    pub message: String,
    pub labels: HashMap<String, String>,
    pub timestamp: i64,
}

pub enum NotifySeverity {
    Info,
    Warning,
    Error,
    Critical,
}
```
System sources are implicit and do not need to be declared in configuration.
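One practical use of the severity enum is threshold filtering in a notify sink. The sketch below derives an ordering on the enum and adds a hypothetical `passes_threshold` helper; both the derives and the helper are illustrative assumptions, not pipeflow's confirmed API.

```rust
use std::collections::HashMap;

// Sketch: deriving an ordering on severity (Info < Warning < Error <
// Critical) so a sink can drop anything below a configured threshold.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum NotifySeverity {
    Info,
    Warning,
    Error,
    Critical,
}

pub struct Notify {
    pub name: String,
    pub severity: NotifySeverity,
    pub message: String,
    pub labels: HashMap<String, String>,
    pub timestamp: i64,
}

// Hypothetical helper: should this notification reach the user?
pub fn passes_threshold(n: &Notify, min: NotifySeverity) -> bool {
    n.severity >= min
}
```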
### Transforms
Transforms use a multi-step pipeline architecture. Each transform contains zero or more
steps that execute sequentially.
| Step Type | Description | Cardinality | Status |
| --- | --- | --- | --- |
| `filter` | Conditional filtering | 1:0/1 | Implemented |
| `remap` | Field mapping | 1:1 | Implemented |
| `window` | Window-based aggregation | n:1 | Implemented |
| `compute` | Math expression eval | 1:1 | Implemented |
| `hash` | Generate deterministic ID | 1:1 | Implemented |
**Configuration Example**:
```yaml
transforms:
  - id: process_data
    inputs: [source]
    outputs: [sink]
    steps:
      # Step 1: Remap fields
      - type: remap
        config:
          mappings:
            # Extract from source path
            - from: "$.data.user_id"
              to: "$.user_id"
            # Static value assignment
            - value: "processed"
              to: "$.status"
            # Template string with interpolation
            - from: "User: {{ $.data.name }} (ID: {{ $.data.user_id }})"
              to: "$.description"
          keep_unmapped: false
      # Step 2: Filter (single condition)
      - type: filter
        config:
          field: "$.status"
          operator: eq
          value: "active"
      # Step 2 alternative: Filter (multiple conditions)
      - type: filter
        config:
          mode: and # or "or"
          conditions:
            - field: "$.status"
              operator: eq
              value: "active"
            - field: "$.score"
              operator: ge
              value: 50
```
**Filter Operators**:
| Operator | Description |
| ---------- | ------------------------------- |
| `eq` | Equal |
| `ne` | Not equal |
| `gt` | Greater than |
| `ge` | Greater than or equal |
| `lt` | Less than |
| `le` | Less than or equal |
| `abs_gt` | Absolute value greater than |
| `abs_ge` | Absolute value greater or equal |
| `abs_lt` | Absolute value less than |
| `abs_le` | Absolute value less or equal |
| `contains` | String contains |
| `matches` | Regex match |
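The numeric operators above can be sketched as a single evaluation function. This is a minimal illustration; the function name is made up, and a real filter step would also handle string operators (`contains`, `matches`) and non-numeric payloads:

```rust
// Sketch of numeric operator evaluation for the filter step. The
// absolute-value variants compare |actual| against the threshold.
// Returns None for operators that need string handling.
pub fn eval_numeric_op(op: &str, actual: f64, expected: f64) -> Option<bool> {
    let abs = actual.abs();
    Some(match op {
        "eq" => actual == expected,
        "ne" => actual != expected,
        "gt" => actual > expected,
        "ge" => actual >= expected,
        "lt" => actual < expected,
        "le" => actual <= expected,
        "abs_gt" => abs > expected,
        "abs_ge" => abs >= expected,
        "abs_lt" => abs < expected,
        "abs_le" => abs <= expected,
        _ => return None, // contains/matches operate on strings
    })
}
```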
**Window Step (Implemented)**:
```yaml
- type: window
  config:
    duration: "30s"           # Time trigger (optional)
    size: 10                  # Count trigger (optional, at least one required)
    operation: merge          # merge | select_one (default: merge)
    strategy: first           # For select_one: first | last (default: first)
    max_messages: 10000       # Buffer capacity (default: 10000)
    on_overflow: drop_oldest  # drop_oldest | error (default: drop_oldest)
```
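The flush decision implied by the two triggers can be sketched as follows: the window flushes when either the count trigger (`size`) or the time trigger (`duration`) fires, whichever comes first. The type and function names are illustrative assumptions:

```rust
use std::time::Duration;

// Sketch of the window flush decision. At least one trigger must be
// configured (mirroring the "at least one required" rule above).
pub struct WindowTriggers {
    pub size: Option<usize>,
    pub duration: Option<Duration>,
}

pub fn should_flush(t: &WindowTriggers, buffered: usize, elapsed: Duration) -> bool {
    // A missing trigger never fires; a present one fires at its threshold.
    let count_hit = t.size.map_or(false, |n| buffered >= n);
    let time_hit = t.duration.map_or(false, |d| elapsed >= d);
    count_hit || time_hit
}
```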
### Sinks
| Type | Description | Status |
| --- | --- | --- |
| `blackhole` | Discard (default DLQ) | Implemented |
| `console` | Print to stdout | Implemented |
| `file` | Write to file | Implemented |
| `sql` | SQL database insert | Implemented |
| `redis` | Redis SET/SETEX | Implemented |
| `notify` | Email/Telegram/Webhook | Implemented |
| `http_client` | HTTP API calls | Implemented |
## Dead Letter Queue (DLQ) (Implemented)
Processing errors are automatically routed to system channels:
```text
┌────────┐      ┌───────────┐      ┌──────┐
│ Source │─────▶│ Transform │─────▶│ Sink │
└───┬────┘      └─────┬─────┘      └──┬───┘
    │                 │ error         │ error
    │                 ├───────────────┤
    │                 ▼               ▼
    │   ┌─────────────────────┐ ┌───────────────────────┐
    │   │ source::system::dlq │ │ source::system::event │
    │   │   (original msg)    │ │    (error details)    │
    │   └─────────────────────┘ └───────────────────────┘
    │
    └──▶ source::system::event (error details only)
```
**Routing behavior**:
- Transform/Sink errors → DLQ (original message) + Event (error details)
- Buffer overflow (Lagged) → Event (error details) + Notify (user alert)
The `source::system::dlq` source can be connected to any sink for custom handling:
```yaml
pipeline:
  transforms:
    - id: dlq_to_file
      inputs: [source::system::dlq]
      outputs: [dlq_file]
  sinks:
    - id: dlq_file
      type: file
      config:
        path: "./dead_letters.jsonl"
```
### Cycle Prevention
Messages originating from `source::system::dlq` are not re-routed to DLQ on failure
to prevent infinite loops. The `chain_depth` field in message metadata provides
additional protection (max depth: 8).
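The two guards above can be sketched as a single routing predicate. A minimal sketch under stated assumptions: the constant and function names are made up, and the real engine may structure this check differently:

```rust
// Sketch of the loop guards: messages that originated from the DLQ
// source are never re-routed to DLQ, and chain_depth caps total hops.
const MAX_CHAIN_DEPTH: u8 = 8;

pub fn may_route_to_dlq(source_node: &str, chain_depth: u8) -> bool {
    source_node != "source::system::dlq" && chain_depth < MAX_CHAIN_DEPTH
}
```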
## Core Traits
```rust
use async_trait::async_trait;
use tokio::sync::broadcast;

use crate::message::{Message, SharedMessage};

// Source: Produces messages (fan-out via broadcast)
#[async_trait]
pub trait Source: Send + Sync {
    fn id(&self) -> &str;
    async fn run(
        &self,
        sender: broadcast::Sender<SharedMessage>,
        shutdown: broadcast::Receiver<()>,
    ) -> Result<()>;
}

// Transform: Processes messages (1:n)
#[async_trait]
pub trait Transform: Send + Sync {
    fn id(&self) -> &str;
    async fn process(&self, msg: Message) -> Result<Vec<Message>>;
}

// Sink: Consumes messages
#[async_trait]
pub trait Sink: Send + Sync {
    fn id(&self) -> &str;
    async fn process(&self, msg: SharedMessage) -> Result<()>;
}
```