pipeflow 0.0.4
# Pipeflow Project Features and Implementation Details

[中文](PROJECT_IMPL_CN.md)

## 1. Overview

Pipeflow is a lightweight, configuration-driven data pipeline framework for Rust. It allows users to define data
processing flows via YAML configuration files, performing common ETL (Extract, Transform, Load) tasks without
writing code.

**Core Philosophy:**

- **Configuration-Driven**: Complete pipeline definition (Sources, Transforms, Sinks) managed via YAML.
- **High Performance**: Built on Rust and the Tokio asynchronous runtime, utilizing `broadcast` channels for
  efficient message distribution.
- **Modular**: Provides standardized abstractions for Source, Transform, and Sink, making it easy to extend.
- **Observability**: Built-in system-level event streams (Events), audit logs (Audit), and dead letter queues (DLQ).

## 2. Core Architecture

The project adopts the classic **DAG (Directed Acyclic Graph)** architecture model:

```text
Source (Data Source) → Transform (Data Transformation) → Sink (Data Output)
```

### 2.1 Engine

`src/engine.rs` is the core scheduler, responsible for:

1. **Loading Configuration**: Parsing YAML, normalizing, and validating the graph (checking for cycles, isolated nodes, etc.).
2. **Building the Graph**: Instantiating nodes (Source/Transform/Sink) based on configuration.
3. **Establishing Connections**: Connecting nodes using `tokio::sync::broadcast` channels, with seamless support for
   "fan-out", where one node's output is sent to multiple downstream nodes simultaneously.
4. **Lifecycle Management**: Starting all asynchronous tasks and managing graceful shutdowns.
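
The validation in step 1 must reject cyclic graphs before the engine runs. A minimal sketch of such a check using Kahn's algorithm (illustrative only, built on std types; this is not Pipeflow's actual implementation):

```rust
use std::collections::HashMap;

// Returns true if the node graph (given as directed edges) contains a cycle.
// Kahn's algorithm: repeatedly remove nodes with no incoming edges; if any
// nodes remain unprocessed, they must lie on a cycle.
fn has_cycle(edges: &[(&str, &str)]) -> bool {
    let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
    let mut indegree: HashMap<&str, usize> = HashMap::new();
    for &(from, to) in edges {
        adj.entry(from).or_default().push(to);
        indegree.entry(to).and_modify(|d| *d += 1).or_insert(1);
        indegree.entry(from).or_insert(0);
    }
    // Start from nodes with no incoming edges (e.g., Sources).
    let mut queue: Vec<&str> = indegree
        .iter()
        .filter(|(_, &d)| d == 0)
        .map(|(&n, _)| n)
        .collect();
    let mut visited: usize = 0;
    while let Some(node) = queue.pop() {
        visited += 1;
        for &next in adj.get(node).into_iter().flatten() {
            let d = indegree.get_mut(next).unwrap();
            *d -= 1;
            if *d == 0 {
                queue.push(next);
            }
        }
    }
    visited != indegree.len()
}

fn main() {
    // source -> transform -> sink is acyclic...
    assert!(!has_cycle(&[("source", "transform"), ("transform", "sink")]));
    // ...while a back-edge makes the graph invalid.
    assert!(has_cycle(&[("a", "b"), ("b", "a")]));
    println!("ok");
}
```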

### 2.2 Message Passing

- **Message Structure**: The `Message` struct encapsulates the data payload (`payload`) and metadata (`meta`).
- **Sharing Mechanism**: Using `Arc<Message>` allows for efficient data sharing among multiple downstream nodes,
  avoiding unnecessary memory copies.
- **Overflow Handling**: `broadcast` channels have a fixed capacity (configurable via `output_buffer_size`).
  When a downstream consumer falls too far behind, the oldest messages are dropped and a `Lagged` warning is
  logged, preventing unbounded memory growth.
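
The sharing mechanism can be illustrated with std types alone (a simplified `Message` and `std::sync::mpsc` channels stand in for Pipeflow's actual types and its tokio `broadcast` channels):

```rust
use std::collections::HashMap;
use std::sync::{mpsc, Arc};

// Simplified stand-in for Pipeflow's Message: a payload plus metadata.
struct Message {
    payload: String,
    meta: HashMap<String, String>,
}

fn main() {
    let msg = Arc::new(Message {
        payload: "{\"temp\": 21.5}".to_string(),
        meta: HashMap::new(),
    });

    // Fan-out: each downstream node receives a cheap pointer clone,
    // not a copy of the payload.
    let (tx_a, rx_a) = mpsc::channel::<Arc<Message>>();
    let (tx_b, rx_b) = mpsc::channel::<Arc<Message>>();
    tx_a.send(Arc::clone(&msg)).unwrap();
    tx_b.send(Arc::clone(&msg)).unwrap();

    let a = rx_a.recv().unwrap();
    let b = rx_b.recv().unwrap();
    // Both receivers observe the very same allocation.
    assert!(Arc::ptr_eq(&a, &b));
    assert_eq!(a.payload, b.payload);
    println!("shared payload: {}", a.payload);
}
```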

## 3. Modules

### 3.1 Sources

Located in `src/source/`, responsible for producing data:

- **`http_client`**: Periodically polls HTTP APIs to fetch data.
- **`http_server`**: Starts an HTTP service to receive Webhook pushes.
- **`redis`**: Fetches data from Redis.
- **`sql`**: Executes SQL queries to pull data.
- **`file`**: Watches for file changes or reads file content.
- **System Sources**: `source::system::dlq` (Dead Letter Queue), `source::system::event` (System Events),
  `source::system::notify` (Notifications).

### 3.2 Transforms

Located in `src/transform/`, responsible for processing data:

- **`remap`**: Remaps or renames fields.
- **`filter`**: Filters messages based on conditions (e.g., regex matching).
- **`window`**: Aggregates messages over time or count windows.
- **`compute`**: Performs computations on message data.
- **`hash`**: Generates deterministic identifiers.

### 3.3 Sinks

Located in `src/sink/`, responsible for outputting data:

- **`console`**: Prints to standard output (for debugging).
- **`file`**: Writes to local files.
- **`http_client`**: Sends HTTP requests to external services.
- **`redis`**: Writes Redis commands.
- **`sql`**: Executes SQL inserts/updates.
- **`notify`**: Sends notifications (supports Telegram, Email, Webhook, etc.).
- **`blackhole`**: Discards data (for testing).
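
Putting the modules together, a pipeline definition might look like the following. The overall shape (Sources, Transforms, Sinks keyed by name) follows the architecture described above, but the field names and condition syntax here are illustrative assumptions; consult the project's configuration schema for the exact format:

```yaml
sources:
  metrics_api:
    type: http_client        # poll an HTTP API periodically
    url: https://example.com/metrics
    interval_secs: 30

transforms:
  errors_only:
    type: filter             # keep only messages matching a condition
    inputs: [metrics_api]
    condition: 'status =~ "5\d\d"'

sinks:
  debug_out:
    type: console            # print to stdout for debugging
    inputs: [errors_only]
```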

## 4. Key Mechanisms

### 4.1 System Channels

Pipeflow has a built-in set of "shadow pipelines" for handling system-level messages, established during
`Engine` initialization:

- **DLQ (Dead Letter Queue)**: Captures messages that failed processing.
- **Event**: Records critical events during system runtime (e.g., connection lost, reconnecting).
- **Audit**: Records processing time and status (Success/Failure) of messages at each node.

In implementation, the `Engine` creates a global `SystemChannels` structure containing three `mpsc` senders.
Every Source/Transform/Sink holds these senders at runtime and asynchronously sends metadata when errors occur
or processing completes.
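
A rough shape of that structure is sketched below. The field names and payload type are assumptions, and std `mpsc` channels stand in for the async tokio `mpsc` senders the engine actually distributes:

```rust
use std::sync::mpsc::{self, Sender};

// Illustrative stand-in for the engine's SystemChannels: one sender per
// system stream, cloned into every Source/Transform/Sink at startup.
#[derive(Clone)]
struct SystemChannels {
    dlq: Sender<String>,
    event: Sender<String>,
    audit: Sender<String>,
}

fn main() {
    let (dlq_tx, dlq_rx) = mpsc::channel();
    let (event_tx, event_rx) = mpsc::channel();
    let (audit_tx, audit_rx) = mpsc::channel();
    let channels = SystemChannels { dlq: dlq_tx, event: event_tx, audit: audit_tx };

    // A node holds its own clone and reports as it processes messages:
    let node = channels.clone();
    node.dlq.send("payload that failed parsing".into()).unwrap();
    node.audit.send("node=filter status=failure took_ms=3".into()).unwrap();
    node.event.send("source reconnecting".into()).unwrap();

    assert_eq!(dlq_rx.recv().unwrap(), "payload that failed parsing");
    assert!(audit_rx.recv().unwrap().contains("status=failure"));
    assert!(event_rx.recv().unwrap().contains("reconnecting"));
}
```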

### 4.2 Dead Letter Queue & Loop Protection

To prevent dead letter messages themselves from triggering errors and causing infinite loops, the system
implements **Chain Depth Protection**:

- Message metadata contains a `chain_depth` counter.
- When a new DLQ message is generated due to an error, the counter is incremented by 1.
- If the counter exceeds a threshold (e.g., 8), the message is forcibly dropped to prevent a system avalanche.
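
The protection logic amounts to a small guard, sketched here with an illustrative threshold and function name:

```rust
const MAX_CHAIN_DEPTH: u8 = 8; // threshold is illustrative

// Decide whether an error message may be re-emitted to the DLQ.
// Returns the incremented depth, or None if the message must be dropped.
fn next_chain_depth(current: u8) -> Option<u8> {
    let next = current + 1;
    if next > MAX_CHAIN_DEPTH {
        None // drop: the DLQ chain is looping
    } else {
        Some(next)
    }
}

fn main() {
    assert_eq!(next_chain_depth(0), Some(1));
    assert_eq!(next_chain_depth(7), Some(8)); // still within the threshold
    assert_eq!(next_chain_depth(8), None);    // exceeds it: forcibly dropped
    println!("ok");
}
```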

### 4.3 Fan-out & Broadcast

The engine creates a `broadcast::Sender` for each Source and Transform via `Engine::create_node_channels`.

- **Transforms** subscribe to upstream broadcast channels.
- **Sinks** also subscribe to upstream broadcast channels.
- Sinks use `futures::stream::select_all` to merge multiple upstream input streams into a single stream for
  sequential processing.
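
A std-only analogue of that merge step (Pipeflow itself uses `futures::stream::select_all` on async streams, which interleaves messages as they arrive, whereas `chain` below drains the first input before the second):

```rust
use std::sync::mpsc;

fn main() {
    // Two upstream channels feeding one sink.
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();
    tx1.send("from transform A").unwrap();
    tx2.send("from transform B").unwrap();
    // Dropping the senders closes the channels so iteration terminates.
    drop(tx1);
    drop(tx2);

    // Merge both inputs into a single iterator and process sequentially.
    let merged: Vec<&str> = rx1.into_iter().chain(rx2.into_iter()).collect();
    assert_eq!(merged, vec!["from transform A", "from transform B"]);
    println!("processed {} messages", merged.len());
}
```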

## 5. Summary

Pipeflow builds a safe, high-performance data processing framework by leveraging Rust's strong type system
and Tokio's concurrency capabilities. Its core design principle is the **separation of configuration and logic**:
users focus on business logic (the YAML configuration), while concurrency control, error handling, and resource
management are delegated to the underlying framework.