# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
pipeflow is a lightweight, configuration-driven data pipeline framework for Rust.
It implements a classic ETL pattern with DAG-based execution:
```text
Source → Transform → Sink
```
Pipelines are defined in YAML and validated at load time (duplicate IDs, missing inputs, cycle detection).
## Build & Test Commands
```bash
# Build
cargo build # Default features (http-client, file)
cargo build --all-features # All features
cargo build --no-default-features # Core only
# Test
cargo test --all-features # Full test suite
cargo test --test http_to_file_test # Single integration test
# Lint & Format
cargo fmt --all -- --check
cargo clippy --all-targets --all-features -- -D warnings
# Run pipeline
cargo run -- run examples/http_to_console.yaml
cargo run -- config validate examples/http_to_console.yaml
cargo run -- -v run examples/http_to_console.yaml # Verbose/debug output
```
## Architecture
### Core Modules (src/)
- **engine.rs** - DAG construction and async execution orchestrator.
Creates broadcast channels for fan-out, spawns source/transform/sink tasks, handles graceful shutdown.
- **config.rs** - YAML parsing with DAG validation (duplicate IDs, input references, cycle detection via DFS).
- **message.rs** - `Message` (metadata + JSON payload) and `SharedMessage` (Arc-wrapped for broadcast).
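The three checks listed for `config.rs` can be sketched with the standard library alone. This is a minimal illustration, not pipeflow's actual code: `NodeSpec`, `node`, and `validate_dag` are hypothetical names, and errors are plain strings rather than `crate::error::Error`.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a parsed node: its ID plus the IDs it reads from.
#[derive(Debug, Clone)]
pub struct NodeSpec {
    pub id: String,
    pub inputs: Vec<String>,
}

/// Convenience constructor for the demo (illustrative only).
pub fn node(id: &str, inputs: &[&str]) -> NodeSpec {
    NodeSpec {
        id: id.to_string(),
        inputs: inputs.iter().map(|s| s.to_string()).collect(),
    }
}

/// DFS state; a node missing from the map has not been visited yet.
#[derive(Clone, Copy)]
enum Mark {
    InProgress,
    Done,
}

pub fn validate_dag(nodes: &[NodeSpec]) -> Result<(), String> {
    // 1. Duplicate IDs: inserting an ID twice is an error.
    let mut by_id: HashMap<&str, &NodeSpec> = HashMap::new();
    for n in nodes {
        if by_id.insert(n.id.as_str(), n).is_some() {
            return Err(format!("duplicate node id: {}", n.id));
        }
    }
    // 2. Missing inputs: every referenced ID must exist.
    for n in nodes {
        for input in &n.inputs {
            if !by_id.contains_key(input.as_str()) {
                return Err(format!("node {} references unknown input {}", n.id, input));
            }
        }
    }
    // 3. Cycles: depth-first search from every node.
    let mut marks: HashMap<&str, Mark> = HashMap::new();
    for n in nodes {
        visit(n.id.as_str(), &by_id, &mut marks)?;
    }
    Ok(())
}

fn visit<'a>(
    id: &'a str,
    by_id: &HashMap<&'a str, &'a NodeSpec>,
    marks: &mut HashMap<&'a str, Mark>,
) -> Result<(), String> {
    match marks.get(id) {
        Some(Mark::Done) => return Ok(()),
        // Reaching a node that is still on the DFS stack means a back edge.
        Some(Mark::InProgress) => return Err(format!("cycle detected at node {}", id)),
        None => {}
    }
    marks.insert(id, Mark::InProgress);
    let spec: &'a NodeSpec = by_id[id];
    for input in &spec.inputs {
        visit(input.as_str(), by_id, marks)?;
    }
    marks.insert(id, Mark::Done);
    Ok(())
}

fn main() {
    let pipeline = vec![node("src", &[]), node("xform", &["src"]), node("out", &["xform"])];
    println!("linear pipeline: {:?}", validate_dag(&pipeline));

    let cyclic = vec![node("a", &["b"]), node("b", &["a"])];
    println!("cyclic pipeline: {:?}", validate_dag(&cyclic));
}
```

Edges here run from a node to its inputs; cycle detection gives the same answer in either direction.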
### Node Types
**Sources** (src/source/):
- `http_client` - HTTP polling with configurable interval (feature-gated: `http-client`)
- System sources (implicit): `source::system::dlq`, `source::system::event`, `source::system::notify`
**Transforms** (src/transform/):
- Multi-step pipeline architecture (`TransformPipeline` in pipeline.rs)
- Steps: `filter` (conditional), `remap` (field mapping with JSONPath-like syntax)
- Step trait in step.rs, JSONPath extraction in json_path.rs
**Sinks** (src/sink/):
- `console` - stdout output (pretty/compact/text format)
- `file` - JSONL/TSV/CSV file output
- `sql` - SQLite/PostgreSQL database output (feature-gated: `database`)
- `blackhole` - discard messages
- System sinks (implicit): `sink::system::dlq`, `sink::system::event`, `sink::system::notify`
### Channel Architecture
Uses `tokio::sync::broadcast` for fan-out (one source to multiple sinks).
`SharedMessage` (`Arc<Message>`) enables efficient cloning.
A slow consumer that falls behind the channel capacity lags and loses the oldest unread messages (logged as a warning).
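Why `Arc` makes fan-out cheap can be shown without tokio. The sketch below is a stand-in built on assumptions: it uses one `std::sync::mpsc` channel per consumer in place of `tokio::sync::broadcast`, a simplified `Message` with a string payload instead of JSON, and a hypothetical `fan_out` helper. It does not model lagging.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;

/// Simplified, hypothetical message; the real `Message` carries metadata plus a JSON payload.
#[derive(Debug)]
pub struct Message {
    pub source_id: String,
    pub payload: String,
}

/// Arc-wrapped so that every consumer shares a single allocation.
pub type SharedMessage = Arc<Message>;

/// Sends the same message to every consumer and returns how many accepted it.
/// Only the `Arc` pointer is cloned; the payload itself is never copied.
pub fn fan_out(msg: SharedMessage, consumers: &[Sender<SharedMessage>]) -> usize {
    consumers
        .iter()
        .filter(|tx| tx.send(Arc::clone(&msg)).is_ok())
        .count()
}

fn main() {
    let (tx_a, rx_a) = mpsc::channel();
    let (tx_b, rx_b) = mpsc::channel();

    let msg: SharedMessage = Arc::new(Message {
        source_id: "src".to_string(),
        payload: "{\"n\":1}".to_string(),
    });
    let delivered = fan_out(Arc::clone(&msg), &[tx_a, tx_b]);

    let a = rx_a.recv().expect("consumer a receives the message");
    let b = rx_b.recv().expect("consumer b receives the message");

    // Both consumers point at the same underlying `Message`.
    println!("delivered to {delivered} consumers, same allocation: {}", Arc::ptr_eq(&a, &b));
}
```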
### Feature Flags
- `http-client` (default) - HTTP polling source
- `file` (default) - File sink
- `database` - SQL sink (SQLite/PostgreSQL)
- `test-utils` - Exposes internal APIs for integration tests
## Key Patterns
- Nodes implement the `Source`, `Transform`, or `Sink` trait (all defined with `#[async_trait]`)
- Config validation runs before engine build
- Sources receive the shutdown signal via a broadcast channel
- Engine spawns: sinks first → transforms → sources (ensures receivers ready before senders)
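The reason for this spawn order can be illustrated with a toy bus. `Bus` is hypothetical and built on `std::sync::mpsc`, not tokio; it only mirrors the property that matters here: a message reaches the receivers that exist at send time, so anything published before a sink subscribes is never seen by that sink.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical minimal bus (std only), standing in for a broadcast channel.
#[derive(Default)]
pub struct Bus {
    subscribers: Vec<Sender<String>>,
}

impl Bus {
    /// Registers a new receiver; it only sees messages published from now on.
    pub fn subscribe(&mut self) -> Receiver<String> {
        let (tx, rx) = mpsc::channel();
        self.subscribers.push(tx);
        rx
    }

    /// Delivers to the current subscribers and returns how many received the message.
    pub fn publish(&self, msg: &str) -> usize {
        self.subscribers
            .iter()
            .filter(|tx| tx.send(msg.to_string()).is_ok())
            .count()
    }
}

fn main() {
    let mut bus = Bus::default();

    // Wrong order: the source emits before any sink exists, so the message is lost.
    let early = bus.publish("early");

    // Right order: the sink subscribes first, then the source emits.
    let sink_rx = bus.subscribe();
    let late = bus.publish("late");

    let seen: Vec<String> = sink_rx.try_iter().collect();
    println!("early reached {early} sinks, late reached {late} sinks, sink saw {seen:?}");
}
```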
## Integration Tests
Integration tests live in `tests/` and use `wiremock` for HTTP mocking and `tempfile` for file sink tests.
Tests use `Engine::run_with_signal()` for controlled shutdown.
## Code Style Conventions
### Imports
- Use top-level `use` statements for std types (e.g., `use std::sync::Arc;`)
- Avoid inline `std::sync::Arc::new()` - prefer `Arc::new()` with top-level import
- Group imports: std → external crates → local crate (enforced by `.rustfmt.toml`)
### Component Structure
All Sources, Sinks, and Transforms follow this pattern:
```rust
// 1. Config struct with serde attributes
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct XxxConfig { ... }

// 2. Validation method
impl XxxConfig {
    fn validate_and_normalize(&mut self) -> Result<()> { ... }
}

// 3. Implementation struct
pub struct Xxx { ... }

// 4. Constructor with validation
impl Xxx {
    pub fn new(id: impl Into<String>, config: XxxConfig) -> Result<Self> { ... }
}

// 5. Trait implementation
#[async_trait]
impl Source/Sink/Transform for Xxx { ... }
```
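As a concrete instance of the five steps, here is a std-only sketch. `MemorySink`, its config fields, and the synchronous `Sink` trait are invented for illustration; a real component would derive `Serialize`/`Deserialize`, use `#[async_trait]`, and return the crate's `Result` instead of `Result<_, String>`.

```rust
/// Simplified synchronous stand-in for the crate's async `Sink` trait (assumption).
pub trait Sink {
    fn id(&self) -> &str;
    fn write(&mut self, line: &str) -> Result<(), String>;
}

// 1. Config struct (the real one would also derive Serialize/Deserialize)
#[derive(Debug, Clone)]
pub struct MemorySinkConfig {
    pub prefix: String,
    pub max_lines: usize,
}

// 2. Validation method: normalizes fields in place, then rejects invalid values
impl MemorySinkConfig {
    fn validate_and_normalize(&mut self) -> Result<(), String> {
        self.prefix = self.prefix.trim().to_string();
        if self.max_lines == 0 {
            return Err("max_lines must be greater than 0".to_string());
        }
        Ok(())
    }
}

// 3. Implementation struct
pub struct MemorySink {
    id: String,
    config: MemorySinkConfig,
    lines: Vec<String>,
}

// 4. Constructor with validation: an invalid config never produces a sink
impl MemorySink {
    pub fn new(id: impl Into<String>, mut config: MemorySinkConfig) -> Result<Self, String> {
        config.validate_and_normalize()?;
        Ok(Self { id: id.into(), config, lines: Vec::new() })
    }

    pub fn lines(&self) -> &[String] {
        &self.lines
    }
}

// 5. Trait implementation
impl Sink for MemorySink {
    fn id(&self) -> &str {
        &self.id
    }

    fn write(&mut self, line: &str) -> Result<(), String> {
        if self.lines.len() >= self.config.max_lines {
            return Err(format!("sink {} is full", self.id));
        }
        self.lines.push(format!("{}{}", self.config.prefix, line));
        Ok(())
    }
}

fn main() {
    let config = MemorySinkConfig { prefix: " > ".to_string(), max_lines: 2 };
    let mut sink = MemorySink::new("mem", config).expect("config is valid");
    sink.write("hello").expect("first write fits");
    println!("{} holds {:?}", sink.id(), sink.lines());
}
```

Keeping validation in the constructor means every later method can assume a normalized config.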
### Error Handling
Use factory methods from `crate::error::Error`:
- `Error::config("message")` - configuration errors
- `Error::source("message")` - source execution errors
- `Error::sink("message")` - sink execution errors
- `Error::transform("message")` - transform processing errors
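One plausible shape for these factory methods is sketched below. The variant names, the `Display` wording, and the `parse_interval_secs` helper are assumptions for illustration; the real definitions live in the `crate::error` module.

```rust
use std::fmt;

/// Hypothetical shape for `crate::error::Error`; the real enum may differ.
#[derive(Debug, PartialEq)]
pub enum Error {
    Config(String),
    Source(String),
    Sink(String),
    Transform(String),
}

impl Error {
    // Factory methods accept anything convertible to String (&str, String, ...).
    pub fn config(msg: impl Into<String>) -> Self {
        Error::Config(msg.into())
    }
    pub fn source(msg: impl Into<String>) -> Self {
        Error::Source(msg.into())
    }
    pub fn sink(msg: impl Into<String>) -> Self {
        Error::Sink(msg.into())
    }
    pub fn transform(msg: impl Into<String>) -> Self {
        Error::Transform(msg.into())
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::Config(m) => write!(f, "config error: {m}"),
            Error::Source(m) => write!(f, "source error: {m}"),
            Error::Sink(m) => write!(f, "sink error: {m}"),
            Error::Transform(m) => write!(f, "transform error: {m}"),
        }
    }
}

impl std::error::Error for Error {}

/// Hypothetical helper showing a call site: map a low-level error into `Error::config`.
pub fn parse_interval_secs(raw: &str) -> Result<u64, Error> {
    raw.trim()
        .parse::<u64>()
        .map_err(|e| Error::config(format!("invalid interval {raw:?}: {e}")))
}

fn main() {
    match parse_interval_secs("abc") {
        Ok(secs) => println!("interval = {secs}s"),
        Err(err) => println!("{err}"),
    }
}
```

Call sites stay short (`Error::config(...)`) and never construct variants directly, so the enum's internals can change without touching them.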
### Logging
Use structured tracing fields:
- Display-formatted values such as IDs and strings: `field = %value` (`Display` trait)
- Complex types: `field = ?value` (`Debug` trait)
- Example: `tracing::info!(source_id = %self.id, mode = ?self.mode, "Started")`
### Testing
- Unit tests: `#[cfg(test)] mod tests { ... }` at file bottom
- Use `unwrap()` in unit tests, `expect("descriptive message")` in integration tests
- Integration tests in `tests/` directory use `common.rs` helpers