# Pipeflow Project Features and Implementation Details
[中文](PROJECT_IMPL_CN.md)
## 1. Overview
Pipeflow is a lightweight, configuration-driven data pipeline framework for Rust. It allows users to define data
processing flows via YAML configuration files, performing common ETL (Extract, Transform, Load) tasks without
writing code.
**Core Philosophy:**
- **Configuration-Driven**: Complete pipeline definition (Sources, Transforms, Sinks) managed via YAML.
- **High Performance**: Built on Rust and the Tokio asynchronous runtime, utilizing `broadcast` channels for
efficient message distribution.
- **Modular**: Provides standardized abstractions for Source, Transform, and Sink, making it easy to extend.
- **Observability**: Built-in system-level event streams (Events), audit logs (Audit), and dead letter queues (DLQ).
## 2. Core Architecture
The project adopts the classic **DAG (Directed Acyclic Graph)** architecture model:
```text
Source (Data Source) → Transform (Data Transformation) → Sink (Data Output)
```
### 2.1 Engine
`src/engine.rs` is the core scheduler, responsible for:
1. **Loading Configuration**: Parsing YAML, normalizing, and validating (checking for cycles, isolated nodes, etc.).
2. **Building the Graph**: Instantiating nodes (Source/Transform/Sink) based on configuration.
3. **Establishing Connections**: Connecting nodes using `tokio::sync::broadcast` channels, which provide seamless support for "fan-out": one node's output can be sent to multiple downstream nodes simultaneously.
4. **Lifecycle Management**: Starting all asynchronous tasks and managing graceful shutdowns.
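The real engine wires nodes together with `tokio::sync::broadcast`; as a minimal, std-only sketch of the fan-out idea, the hypothetical helper below gives each subscriber its own `mpsc` channel and sends an `Arc`-wrapped message to all of them (the function name and shape are illustrative, not the project's API):

```rust
use std::sync::mpsc;
use std::sync::Arc;

// Hypothetical stand-in for the engine's per-node wiring: the project uses
// tokio::sync::broadcast; here, one std::sync::mpsc channel per subscriber
// demonstrates the same fan-out shape.
fn fan_out<T: Send>(subscribers: usize) -> (Vec<mpsc::Sender<Arc<T>>>, Vec<mpsc::Receiver<Arc<T>>>) {
    (0..subscribers).map(|_| mpsc::channel()).unzip()
}

fn main() {
    let (senders, receivers) = fan_out::<String>(3);
    let msg = Arc::new(String::from("payload"));
    // One cheap Arc clone per subscriber; all receivers share one allocation.
    for tx in &senders {
        tx.send(Arc::clone(&msg)).unwrap();
    }
    for rx in &receivers {
        assert_eq!(*rx.recv().unwrap(), "payload");
    }
    println!("delivered to all 3 subscribers");
}
```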
### 2.2 Message Passing
- **Message Structure**: The `Message` struct encapsulates the data payload (`payload`) and metadata (`meta`).
- **Sharing Mechanism**: Using `Arc<Message>` allows for efficient data sharing among multiple downstream nodes,
avoiding unnecessary memory copies.
- **Backpressure Control**: `broadcast` channels have a bounded capacity (configurable via `output_buffer_size`). When a downstream consumer falls too far behind, the oldest messages are dropped and a `Lagged` warning is logged, preventing unbounded memory growth.
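A std-only sketch of the sharing mechanism (the struct below is a simplified stand-in, not the project's actual `Message` definition): wrapping the message in `Arc` means each downstream clone bumps a reference count rather than copying the payload bytes:

```rust
use std::sync::Arc;

// Simplified stand-in for the project's Message struct;
// field types here are illustrative.
struct Message {
    payload: Vec<u8>,
    meta: String,
}

fn main() {
    let msg = Arc::new(Message {
        payload: vec![0u8; 1024 * 1024], // 1 MiB payload
        meta: String::from("source=http_client"),
    });

    // Cloning the Arc for three downstream nodes copies a pointer,
    // not the megabyte of payload data.
    let downstream: Vec<Arc<Message>> = (0..3).map(|_| Arc::clone(&msg)).collect();

    assert_eq!(Arc::strong_count(&msg), 4); // original + 3 clones
    // Every clone points at the exact same allocation.
    assert!(downstream.iter().all(|m| std::ptr::eq(&**m, &*msg)));
    println!("shared without copying: {}", msg.meta);
}
```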
## 3. Modules
### 3.1 Sources
Located in `src/source/`, responsible for producing data:
- **`http_client`**: Periodically polls HTTP APIs to fetch data.
- **`http_server`**: Starts an HTTP service to receive Webhook pushes.
- **`redis`**: Fetches data from Redis.
- **`sql`**: Executes SQL queries to pull data.
- **`file`**: Watches for file changes or reads file content.
- **System Sources**: `source::system::dlq` (Dead Letter Queue), `source::system::event` (System Events),
`source::system::notify` (Notifications).
### 3.2 Transforms
Located in `src/transform/`, responsible for processing data:
- **`remap`**: Field remapping/renaming.
- **`filter`**: Filters messages based on conditions (e.g., regex matching).
- **`window`**: Time window or count window aggregation.
- **`compute`**: Computational logic processing.
- **`hash`**: Generating deterministic identifiers.
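The actual trait signatures live in `src/transform/` and are async; as a hedged sketch of the general shape, a transform can be modeled as a function from one message to zero or one messages, where returning `None` drops the message (as `filter` does). The trait and types below are illustrative simplifications, not the project's real API:

```rust
// Hypothetical simplification of the Transform abstraction.
trait Transform {
    fn apply(&self, payload: String) -> Option<String>;
}

// Regex-free stand-in for the `filter` transform: keep messages
// containing a substring.
struct ContainsFilter { needle: String }

impl Transform for ContainsFilter {
    fn apply(&self, payload: String) -> Option<String> {
        payload.contains(&self.needle).then_some(payload)
    }
}

// Stand-in for `remap`: rewrite one field name.
struct PrefixRemap { from: String, to: String }

impl Transform for PrefixRemap {
    fn apply(&self, payload: String) -> Option<String> {
        Some(payload.replacen(&self.from, &self.to, 1))
    }
}

fn main() {
    // Chaining transforms: try_fold short-circuits when a filter drops the message.
    let chain: Vec<Box<dyn Transform>> = vec![
        Box::new(ContainsFilter { needle: "level=error".into() }),
        Box::new(PrefixRemap { from: "level=".into(), to: "severity=".into() }),
    ];
    let out = chain
        .iter()
        .try_fold("level=error msg=boom".to_string(), |acc, t| t.apply(acc));
    assert_eq!(out.as_deref(), Some("severity=error msg=boom"));
    println!("{:?}", out);
}
```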
### 3.3 Sinks
Located in `src/sink/`, responsible for outputting data:
- **`console`**: Prints to standard output (for debugging).
- **`file`**: Writes to local files.
- **`http_client`**: Sends HTTP requests to external services.
- **`redis`**: Writes Redis commands.
- **`sql`**: Executes SQL inserts/updates.
- **`notify`**: Sends notifications (supports Telegram, Email, Webhook, etc.).
- **`blackhole`**: Discards data (for testing).
## 4. Key Mechanisms
### 4.1 System Channels
Pipeflow has a built-in set of "shadow pipelines" for handling system-level messages, established during
`Engine` initialization:
- **DLQ (Dead Letter Queue)**: Captures messages that failed processing.
- **Event**: Records critical events during system runtime (e.g., connection lost, reconnecting).
- **Audit**: Records processing time and status (Success/Failure) of messages at each node.
In implementation, the `Engine` creates a global `SystemChannels` structure containing three `mpsc` senders.
Every Source/Transform/Sink holds these senders at runtime and asynchronously sends metadata when errors occur
or processing completes.
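In outline, the structure might look like the sketch below. It uses `std::sync::mpsc` so the example is self-contained (the project uses tokio's `mpsc`), and the message variants and field names are assumptions for illustration, not the real types in `src/engine.rs`:

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Illustrative message shapes only; the real payload types are richer.
#[derive(Clone, Debug)]
enum SystemMsg {
    Dlq { node: String, reason: String },
    Event { detail: String },
    Audit { node: String, ok: bool, micros: u64 },
}

// Each node holds a clone of these three senders.
#[derive(Clone)]
struct SystemChannels {
    dlq: Sender<SystemMsg>,
    event: Sender<SystemMsg>,
    audit: Sender<SystemMsg>,
}

fn make_system_channels() -> (SystemChannels, Receiver<SystemMsg>, Receiver<SystemMsg>, Receiver<SystemMsg>) {
    let (dlq_tx, dlq_rx) = channel();
    let (event_tx, event_rx) = channel();
    let (audit_tx, audit_rx) = channel();
    (SystemChannels { dlq: dlq_tx, event: event_tx, audit: audit_tx }, dlq_rx, event_rx, audit_rx)
}

fn main() {
    let (sys, dlq_rx, _event_rx, audit_rx) = make_system_channels();
    // A node reporting a failure and its audit record:
    sys.dlq.send(SystemMsg::Dlq { node: "sql".into(), reason: "timeout".into() }).unwrap();
    sys.audit.send(SystemMsg::Audit { node: "sql".into(), ok: false, micros: 1500 }).unwrap();
    assert!(matches!(dlq_rx.recv().unwrap(), SystemMsg::Dlq { .. }));
    assert!(matches!(audit_rx.recv().unwrap(), SystemMsg::Audit { ok: false, .. }));
    println!("system channels wired");
}
```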
### 4.2 Dead Letter Queue & Loop Protection
To prevent dead letter messages themselves from triggering errors and causing infinite loops, the system
implements **Chain Depth Protection**:
- Message metadata contains a `chain_depth` counter.
- When a new DLQ message is generated due to an error, the counter is incremented by 1.
- If the counter exceeds a threshold (e.g., 8), the message is forcibly dropped to prevent a system avalanche.
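The guard itself is a small check. A sketch of the mechanism described above (the threshold constant, struct, and function names are assumptions; the real field lives in the message metadata):

```rust
// Assumed threshold matching the example above; the real value may differ.
const MAX_CHAIN_DEPTH: u32 = 8;

#[derive(Clone, Debug)]
struct Meta { chain_depth: u32 }

/// Metadata for a new DLQ message derived from `meta`, or None if the
/// chain is already too deep and the message must be dropped.
fn next_dlq_meta(meta: &Meta) -> Option<Meta> {
    let depth = meta.chain_depth + 1;
    (depth <= MAX_CHAIN_DEPTH).then(|| Meta { chain_depth: depth })
}

fn main() {
    let mut meta = Meta { chain_depth: 0 };
    let mut hops = 0;
    // Even if every DLQ message itself keeps failing, the loop terminates.
    while let Some(next) = next_dlq_meta(&meta) {
        meta = next;
        hops += 1;
    }
    assert_eq!(hops, MAX_CHAIN_DEPTH);
    println!("dropped after {} hops", hops);
}
```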
### 4.3 Fan-out & Broadcast
The engine creates a `broadcast::Sender` for each Source and Transform via `Engine::create_node_channels`.
- **Transforms** subscribe to upstream broadcast channels.
- **Sinks** also subscribe to upstream broadcast channels.
- Sinks use `futures::stream::select_all` to merge multiple upstream input streams into a single stream for
sequential processing.
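The project performs this merge with `futures::stream::select_all` over broadcast subscriptions; a std-only sketch of the same shape (one forwarding thread per upstream, draining into a single channel) conveys the idea, with the `merge` helper being illustrative rather than the project's code:

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

// Stand-in for futures::stream::select_all: forward every upstream receiver
// into one merged channel that the sink drains sequentially.
fn merge(upstreams: Vec<Receiver<String>>) -> Receiver<String> {
    let (merged_tx, merged_rx) = channel();
    for rx in upstreams {
        let tx = merged_tx.clone();
        thread::spawn(move || {
            // Iteration ends when the upstream sender is dropped.
            for msg in rx {
                if tx.send(msg).is_err() {
                    break; // merged receiver gone; stop forwarding
                }
            }
        });
    }
    // The original merged_tx drops here; the stream closes once all
    // forwarding threads (and their clones) finish.
    merged_rx
}

fn main() {
    let (a_tx, a_rx) = channel();
    let (b_tx, b_rx) = channel();
    let merged = merge(vec![a_rx, b_rx]);
    a_tx.send("from-a".to_string()).unwrap();
    b_tx.send("from-b".to_string()).unwrap();
    drop(a_tx);
    drop(b_tx);
    let mut got: Vec<String> = merged.iter().collect();
    got.sort();
    assert_eq!(got, vec!["from-a", "from-b"]);
    println!("merged {} upstream messages", got.len());
}
```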
## 5. Summary
Pipeflow builds a safe, high-performance data processing framework on Rust's strong type system and Tokio's
concurrency capabilities. Its core design principle is the **separation of configuration and logic**: users focus
on business logic (the YAML configuration) while concurrency control, error handling, and resource management
are delegated to the underlying framework.