# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
pipeflow is a lightweight, configuration-driven data pipeline framework for Rust.
It implements a classic ETL pattern with DAG-based execution:
```text
Source → Transform → Sink
```
Pipelines are defined in YAML and validated at load time (duplicate IDs, missing inputs, cycle detection).
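The concrete schema lives in src/config.rs and the files under examples/; as a rough sketch of the shape the validator works over (struct and field names here are assumptions for orientation, not the real API):
```rust
// Illustrative only -- the real schema is defined in src/config.rs and
// demonstrated by examples/*.yaml; these names are assumptions.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct PipelineConfig {
    sources: Vec<NodeConfig>,
    transforms: Vec<NodeConfig>,
    sinks: Vec<NodeConfig>,
}

#[derive(Debug, Deserialize)]
struct NodeConfig {
    /// Must be unique across the whole pipeline (checked at load time).
    id: String,
    /// Ids of upstream nodes; each must exist, and the resulting graph must be acyclic.
    #[serde(default)]
    inputs: Vec<String>,
}
```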
## Build & Test Commands
```bash
# Build
cargo build # Default features (http-client, file)
cargo build --all-features # All features
cargo build --no-default-features # Core only
# Test
cargo test --all-features # Full test suite
cargo test --test http_to_file_test # Single integration test
# Lint & Format
cargo fmt --all -- --check
cargo clippy --all-targets --all-features -- -D warnings
# Run pipeline
cargo run -- run examples/http_to_console.yaml
cargo run -- config validate examples/http_to_console.yaml
cargo run -- -v run examples/http_to_console.yaml # Verbose/debug output
```
## Architecture
### Core Modules (src/)
- **engine.rs** - DAG construction and async execution orchestrator.
Creates broadcast channels for fan-out, spawns source/transform/sink tasks, handles graceful shutdown.
- **config.rs** - YAML parsing with DAG validation (duplicate IDs, input references, cycle detection via DFS; see the cycle-check sketch after this list).
- **message.rs** - `Message` (metadata + JSON payload) and `SharedMessage` (Arc-wrapped for broadcast).
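The cycle check is the least obvious of those validations. A minimal, generic sketch of the technique over the `inputs` edges declared in the config (not the actual config.rs code):
```rust
use std::collections::HashMap;

/// Three-colour DFS: 0 = unvisited, 1 = on the current DFS stack, 2 = fully explored.
/// Returns true if any node can reach itself through its `inputs` edges.
fn has_cycle(inputs: &HashMap<String, Vec<String>>) -> bool {
    fn visit(node: &str, inputs: &HashMap<String, Vec<String>>, state: &mut HashMap<String, u8>) -> bool {
        match state.get(node).copied().unwrap_or(0) {
            1 => return true,  // back edge: we are still inside this node's subtree
            2 => return false, // already proven cycle-free
            _ => {}
        }
        state.insert(node.to_string(), 1);
        if let Some(upstreams) = inputs.get(node) {
            for upstream in upstreams {
                if visit(upstream, inputs, state) {
                    return true;
                }
            }
        }
        state.insert(node.to_string(), 2);
        false
    }

    let mut state = HashMap::new();
    inputs.keys().any(|node| visit(node, inputs, &mut state))
}
```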
### Node Types
**Sources** (src/source/):
- `http_client` - HTTP polling with configurable interval (feature-gated)
- `internal` - Built-in internal sources (`internal::dlq`, `internal::event`, `internal::alert`)
**Transforms** (src/transform/):
- Multi-step pipeline architecture (`TransformPipeline` in pipeline.rs)
- Steps: `filter` (conditionally drop messages), `remap` (field mapping with JSONPath-like syntax)
- Step trait in step.rs, JSONPath extraction in json_path.rs (see the extraction sketch below)
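The exact path grammar is defined in json_path.rs; as a rough illustration of the idea, assuming a simple dot-separated lookup into a `serde_json::Value` (the real syntax may differ):
```rust
use serde_json::Value;

/// Follow a dot-separated path like "payload.user.id" into a JSON value.
/// Illustrative only; see src/transform/json_path.rs for the real grammar.
fn extract<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
    path.split('.').try_fold(value, |current, key| match current {
        Value::Object(map) => map.get(key),
        Value::Array(items) => key.parse::<usize>().ok().and_then(|i| items.get(i)),
        _ => None,
    })
}
```
A remap step would then, roughly, read a value at one path and write it under another field of the outgoing message.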
**Sinks** (src/sink/):
- `console` - stdout output (pretty/compact/text format)
- `file` - JSONL/TSV/CSV file output
- `blackhole` - discard messages
- `internal` - route to internal sources
### Channel Architecture
Uses `tokio::sync::broadcast` for fan-out (one source to multiple sinks).
`SharedMessage` (`Arc<Message>`) enables efficient cloning.
A consumer that falls behind the channel capacity lags: the messages it missed are dropped for that receiver and a warning is logged.
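A minimal sketch of the fan-out pattern (simplified: a plain `Arc<String>` stands in for `SharedMessage`):
```rust
use std::sync::Arc;
use tokio::sync::broadcast;

type SharedMessage = Arc<String>; // stand-in for Arc<Message>

#[tokio::main]
async fn main() {
    // One sender, many receivers: each sink gets its own receiver via subscribe().
    let (tx, _rx) = broadcast::channel::<SharedMessage>(16);

    for sink_id in 0..2 {
        let mut rx = tx.subscribe();
        tokio::spawn(async move {
            loop {
                match rx.recv().await {
                    Ok(msg) => println!("sink {sink_id} got {msg}"),
                    // A receiver that falls behind the channel capacity skips ahead
                    // and reports how many messages it missed.
                    Err(broadcast::error::RecvError::Lagged(n)) => {
                        eprintln!("sink {sink_id} lagged, dropped {n} messages");
                    }
                    Err(broadcast::error::RecvError::Closed) => break,
                }
            }
        });
    }

    // Source side: cloning an Arc is cheap; the payload itself is shared.
    for i in 0..3 {
        let _ = tx.send(Arc::new(format!("message {i}")));
    }

    // Give the sink tasks a moment to drain before the runtime shuts down.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
```
Note that the receivers are subscribed before the first send, mirroring the sinks-before-sources spawn order described under Key Patterns.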
### Feature Flags
- `http-client` (default) - HTTP polling source
- `file` (default) - File sink (see the `cfg` gating sketch after this list)
- `test-utils` - Exposes internal APIs for integration tests
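Feature-gated modules follow the standard `cfg` pattern; an illustrative sketch (the exact module paths are assumptions):
```rust
// src/source/mod.rs (illustrative)
#[cfg(feature = "http-client")]
pub mod http_client;

// src/sink/mod.rs (illustrative)
#[cfg(feature = "file")]
pub mod file;
```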
## Key Patterns
- Nodes implement the `Source`, `Transform`, or `Sink` trait (all `async_trait`; a simplified sketch follows this list)
- Config validation runs before engine build
- Sources receive shutdown signal via broadcast channel
- Engine spawns: sinks first → transforms → sources (so receivers are ready before senders start)
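A simplified sketch of the trait shapes (method names, signatures, and the use of `anyhow` are assumptions; check src/source/, src/transform/step.rs, and src/sink/ for the real definitions):
```rust
// Hypothetical signatures for orientation only.
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::broadcast;

type SharedMessage = Arc<serde_json::Value>; // stand-in for the real Arc<Message>

#[async_trait]
trait Source: Send + Sync {
    /// Emit messages into `out` until the shutdown broadcast fires.
    async fn run(
        &mut self,
        out: broadcast::Sender<SharedMessage>,
        shutdown: broadcast::Receiver<()>,
    ) -> anyhow::Result<()>;
}

#[async_trait]
trait Transform: Send + Sync {
    /// Map a message to zero or one outputs (a filter step may drop it).
    async fn apply(&self, msg: SharedMessage) -> anyhow::Result<Option<SharedMessage>>;
}

#[async_trait]
trait Sink: Send + Sync {
    /// Consume one message delivered on the sink's input channel.
    async fn write(&mut self, msg: SharedMessage) -> anyhow::Result<()>;
}
```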
## Integration Tests
Located in `tests/`. They use wiremock for HTTP mocking and tempfile for file sink output.
Tests use `Engine::run_with_signal()` for controlled shutdown.
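A rough skeleton of such a test (the wiremock and tempfile calls are real crate APIs; the pipeflow-side wiring and the `run_with_signal()` signature are left as comments and should be copied from an existing test):
```rust
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

#[tokio::test]
async fn http_source_writes_to_file() {
    // Stand up a mock HTTP endpoint for the http_client source to poll.
    let server = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/data"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({"ok": true})))
        .mount(&server)
        .await;

    // Temporary directory for the file sink's output.
    let dir = tempfile::tempdir().unwrap();
    let out_path = dir.path().join("out.jsonl");

    // From here the exact calls depend on pipeflow's test-utils API:
    // build a config pointing the source at `server.uri()` and the sink at
    // `out_path`, construct the Engine, run it via run_with_signal(),
    // trigger the shutdown signal after a poll or two, then assert on the
    // JSONL file contents.
    let _ = (server.uri(), out_path);
}
```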