pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

pipeflow is a lightweight, configuration-driven data pipeline framework for Rust.
It implements a classic ETL pattern with DAG-based execution:

```text
Source → Transform → Sink
```

Pipelines are defined in YAML and validated at load time (duplicate IDs, missing inputs, cycle detection).
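
The three load-time checks can be sketched with the standard library alone. This is a minimal illustration, not pipeflow's actual `config.rs`: the `NodeSpec` type, the `validate` function, and the error strings are all hypothetical names chosen for the example.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified node description: an ID plus the IDs it reads from.
#[derive(Debug, Clone)]
pub struct NodeSpec {
    pub id: String,
    pub inputs: Vec<String>,
}

impl NodeSpec {
    pub fn new(id: &str, inputs: &[&str]) -> Self {
        Self {
            id: id.to_string(),
            inputs: inputs.iter().map(|s| s.to_string()).collect(),
        }
    }
}

/// DFS colouring: a node met again while still `InProgress` closes a cycle.
#[derive(Clone, Copy)]
enum Mark {
    InProgress,
    Done,
}

/// Runs the three checks in order: duplicate IDs, missing inputs, cycles.
pub fn validate(nodes: &[NodeSpec]) -> Result<(), String> {
    // 1. Duplicate IDs.
    let mut by_id: HashMap<&str, &NodeSpec> = HashMap::new();
    for node in nodes {
        if by_id.insert(node.id.as_str(), node).is_some() {
            return Err(format!("duplicate node id: {}", node.id));
        }
    }
    // 2. Every input must name a declared node.
    for node in nodes {
        for input in &node.inputs {
            if !by_id.contains_key(input.as_str()) {
                return Err(format!(
                    "node {} references missing input {}",
                    node.id, input
                ));
            }
        }
    }
    // 3. Cycle detection via depth-first search over the input edges.
    let mut marks: HashMap<&str, Mark> = HashMap::new();
    for node in nodes {
        visit(node.id.as_str(), &by_id, &mut marks)?;
    }
    Ok(())
}

fn visit<'a>(
    id: &'a str,
    by_id: &HashMap<&'a str, &'a NodeSpec>,
    marks: &mut HashMap<&'a str, Mark>,
) -> Result<(), String> {
    match marks.get(id) {
        Some(Mark::Done) => return Ok(()),
        Some(Mark::InProgress) => return Err(format!("cycle detected at node {}", id)),
        None => {}
    }
    marks.insert(id, Mark::InProgress);
    let node: &'a NodeSpec = by_id[id];
    for input in &node.inputs {
        visit(input.as_str(), by_id, marks)?;
    }
    marks.insert(id, Mark::Done);
    Ok(())
}
```

Running the checks in this order means the DFS can index `by_id` without handling unknown IDs, since missing inputs were already rejected.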

## Build & Test Commands

```bash
# Build
cargo build                          # Default features (http-client, file)
cargo build --all-features           # All features
cargo build --no-default-features    # Core only

# Test
cargo test --all-features            # Full test suite
cargo test --test http_to_file_test  # Single integration test

# Lint & Format
cargo fmt --all -- --check
cargo clippy --all-targets --all-features -- -D warnings

# Run pipeline
cargo run -- run examples/http_to_console.yaml
cargo run -- config validate examples/http_to_console.yaml
cargo run -- -v run examples/http_to_console.yaml  # Verbose/debug output
```

## Architecture

### Core Modules (src/)

- **engine.rs** - DAG construction and async execution orchestrator.
  Creates broadcast channels for fan-out, spawns source/transform/sink tasks, handles graceful shutdown.
- **config.rs** - YAML parsing with DAG validation (duplicate IDs, input references, cycle detection via DFS).
- **message.rs** - `Message` (metadata + JSON payload) and `SharedMessage` (Arc-wrapped for broadcast).

### Node Types

**Sources** (src/source/):

- `http_client` - HTTP polling with configurable interval (feature-gated)
- System sources (implicit): `source::system::dlq`, `source::system::event`, `source::system::notify`

**Transforms** (src/transform/):

- Multi-step pipeline architecture (`TransformPipeline` in pipeline.rs)
- Steps: `filter` (conditional), `remap` (field mapping with JSONPath-like syntax)
- Step trait in step.rs, JSONPath extraction in json_path.rs
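
As a rough illustration of how `filter` and `remap` steps might chain, here is a std-only sketch. The `Step` trait signature, the flat `Record` payload, and the `Filter`, `Remap`, and `Pipeline` types are assumptions made for this example; the real `TransformPipeline` operates on JSON payloads and may differ considerably.

```rust
use std::collections::BTreeMap;

/// Stand-in for a message payload: a flat record of string fields.
pub type Record = BTreeMap<String, String>;

/// Hypothetical step interface: return `None` to drop the record.
pub trait Step {
    fn apply(&self, record: Record) -> Option<Record>;
}

/// Keeps only records whose `field` equals `expected`.
pub struct Filter {
    pub field: String,
    pub expected: String,
}

impl Step for Filter {
    fn apply(&self, record: Record) -> Option<Record> {
        if record.get(&self.field) == Some(&self.expected) {
            Some(record)
        } else {
            None
        }
    }
}

/// Builds a new record from `(target, source)` field pairs.
/// Source fields that are absent are skipped.
pub struct Remap {
    pub mappings: Vec<(String, String)>,
}

impl Step for Remap {
    fn apply(&self, record: Record) -> Option<Record> {
        let mut out = Record::new();
        for (target, source) in &self.mappings {
            if let Some(value) = record.get(source) {
                out.insert(target.clone(), value.clone());
            }
        }
        Some(out)
    }
}

/// Runs steps in order and stops as soon as one drops the record.
pub struct Pipeline {
    pub steps: Vec<Box<dyn Step>>,
}

impl Pipeline {
    pub fn run(&self, record: Record) -> Option<Record> {
        self.steps
            .iter()
            .try_fold(record, |current, step| step.apply(current))
    }
}
```

Using `try_fold` over `Option` gives short-circuiting for free: once a filter returns `None`, later steps never run.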

**Sinks** (src/sink/):

- `console` - stdout output (pretty/compact/text format)
- `file` - JSONL/TSV/CSV file output
- `sql` - SQLite/PostgreSQL database output (feature-gated: `database`)
- `blackhole` - discard messages
- System sinks (implicit): `sink::system::dlq`, `sink::system::event`, `sink::system::notify`

### Channel Architecture

The engine uses `tokio::sync::broadcast` for fan-out (one source to multiple sinks).
`SharedMessage` (`Arc<Message>`) makes each clone a cheap reference-count increment rather than a payload copy.
A slow consumer may lag behind and miss messages; this is logged as a warning.
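
The sketch below shows why the `Arc` wrapper keeps fan-out cheap. It swaps in `std::sync::mpsc` channels for `tokio::sync::broadcast` so it compiles without dependencies, and it does not model lagging receivers. The `FanOut` helper and the `Message` fields are hypothetical; only the `SharedMessage = Arc<Message>` alias comes from the description above.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;

/// Simplified message; `payload` is a string stand-in for the JSON payload.
#[derive(Debug)]
pub struct Message {
    pub source_id: String,
    pub payload: String,
}

pub type SharedMessage = Arc<Message>;

/// Hypothetical fan-out helper holding one sender per subscriber.
#[derive(Default)]
pub struct FanOut {
    subscribers: Vec<Sender<SharedMessage>>,
}

impl FanOut {
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers a new subscriber and returns its receiving end.
    pub fn subscribe(&mut self) -> Receiver<SharedMessage> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Wraps the message once, then hands each subscriber a clone of the
    /// `Arc` (a pointer copy, not a payload copy).
    /// Returns how many subscribers accepted the message.
    pub fn send(&self, message: Message) -> usize {
        let shared: SharedMessage = Arc::new(message);
        self.subscribers
            .iter()
            .filter(|tx| tx.send(Arc::clone(&shared)).is_ok())
            .count()
    }
}
```

Every receiver ends up holding a pointer to the same allocation, so the per-subscriber cost is independent of payload size.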

### Feature Flags

- `http-client` (default) - HTTP polling source
- `file` (default) - File sink
- `database` - SQL sink (SQLite/PostgreSQL)
- `test-utils` - Exposes internal APIs for integration tests

## Key Patterns

- Nodes implement the `Source`, `Transform`, or `Sink` trait (all declared with `async_trait`)
- Config validation runs before engine build
- Sources receive shutdown signal via broadcast channel
- Engine spawns: sinks first → transforms → sources (ensures receivers ready before senders)
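
The spawn ordering in the last point can be expressed as a stable sort on node kind. This is only a sketch of the ordering rule, not the engine's code: `NodeKind` and `spawn_order` are hypothetical names, and real spawning would start async tasks rather than return a list.

```rust
/// Declaration order doubles as spawn order: the derived `Ord` ranks
/// `Sink` before `Transform` before `Source`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum NodeKind {
    Sink,
    Transform,
    Source,
}

/// Returns node IDs in the order their tasks should be started.
pub fn spawn_order(nodes: &[(String, NodeKind)]) -> Vec<String> {
    let mut ordered: Vec<&(String, NodeKind)> = nodes.iter().collect();
    // `sort_by_key` is stable, so config order is kept within each kind.
    ordered.sort_by_key(|node| node.1);
    ordered.into_iter().map(|node| node.0.clone()).collect()
}
```

Starting receivers first matters with broadcast-style channels, where a receiver only sees messages sent after it has subscribed.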

## Integration Tests

Integration tests live in `tests/`. They use wiremock for HTTP mocking and tempfile for file sink tests.
Tests use `Engine::run_with_signal()` for controlled shutdown.

## Code Style Conventions

### Imports

- Use top-level `use` statements for std types (e.g., `use std::sync::Arc;`)
- Avoid inline `std::sync::Arc::new()` - prefer `Arc::new()` with top-level import
- Group imports: std → external crates → local crate (enforced by `.rustfmt.toml`)
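
A short sketch of these import rules follows. The external and local groups are shown as comments so the snippet compiles without dependencies, and `shared_registry` is a hypothetical helper that exists only to give the imports something to do.

```rust
// Group 1: std
use std::collections::HashMap;
use std::sync::Arc;

// Group 2: external crates (commented out so this sketch has no dependencies)
// use serde::Deserialize;

// Group 3: local crate
// use crate::error::Error;

/// Hypothetical helper mapping each ID to its position in `ids`.
pub fn shared_registry(ids: &[&str]) -> Arc<HashMap<String, usize>> {
    let registry: HashMap<String, usize> = ids
        .iter()
        .enumerate()
        .map(|(index, id)| (id.to_string(), index))
        .collect();
    // Preferred: `Arc::new(...)` via the top-level import,
    // rather than an inline `std::sync::Arc::new(...)`.
    Arc::new(registry)
}
```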

### Component Structure

All Sources, Sinks, and Transforms follow this pattern:

```rust
// 1. Config struct with serde attributes
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct XxxConfig { ... }

// 2. Validation method
impl XxxConfig {
    fn validate_and_normalize(&mut self) -> Result<()> { ... }
}

// 3. Implementation struct
pub struct Xxx { ... }

// 4. Constructor with validation
impl Xxx {
    pub fn new(id: impl Into<String>, config: XxxConfig) -> Result<Self> { ... }
}

// 5. Trait implementation
#[async_trait]
impl Source/Sink/Transform for Xxx { ... }
```
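
To make the template concrete, here is a dependency-free sketch of steps 1 to 4 for a console-style sink. It assumes a single `format` field holding one of the three formats listed earlier, and it uses `String` errors in place of the crate's error type. The serde derives and the async trait implementation are left out, and all names and the normalization rules (trim, lowercase) are illustrative guesses.

```rust
/// 1. Config struct (a real one would also derive Serialize/Deserialize).
#[derive(Debug, Clone)]
pub struct ConsoleConfig {
    pub format: String,
}

/// 2. Validation method: normalizes in place, rejects unknown values.
impl ConsoleConfig {
    fn validate_and_normalize(&mut self) -> Result<(), String> {
        let normalized = self.format.trim().to_lowercase();
        if !matches!(normalized.as_str(), "pretty" | "compact" | "text") {
            return Err(format!("unsupported console format: {:?}", self.format));
        }
        self.format = normalized;
        Ok(())
    }
}

/// 3. Implementation struct.
#[derive(Debug)]
pub struct Console {
    id: String,
    config: ConsoleConfig,
}

/// 4. Constructor that validates before building the component.
impl Console {
    pub fn new(id: impl Into<String>, mut config: ConsoleConfig) -> Result<Self, String> {
        config.validate_and_normalize()?;
        Ok(Self {
            id: id.into(),
            config,
        })
    }

    pub fn id(&self) -> &str {
        &self.id
    }

    pub fn format(&self) -> &str {
        &self.config.format
    }
}
```

Because the constructor is the only way to build the component, an instance that exists is known to hold a validated, normalized config.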

### Error Handling

Use factory methods from `crate::error::Error`:

- `Error::config("message")` - configuration errors
- `Error::source("message")` - source execution errors
- `Error::sink("message")` - sink execution errors
- `Error::transform("message")` - transform processing errors
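
One plausible shape for such factory methods is sketched below. The variant layout, the `Display` wording, and the `parse_interval` helper are assumptions for illustration; the actual `crate::error::Error` may carry more context or be built with a derive crate.

```rust
use std::fmt;

/// Hypothetical error type with one variant per factory method.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Error {
    Config(String),
    Source(String),
    Sink(String),
    Transform(String),
}

impl Error {
    pub fn config(message: impl Into<String>) -> Self {
        Error::Config(message.into())
    }

    pub fn source(message: impl Into<String>) -> Self {
        Error::Source(message.into())
    }

    pub fn sink(message: impl Into<String>) -> Self {
        Error::Sink(message.into())
    }

    pub fn transform(message: impl Into<String>) -> Self {
        Error::Transform(message.into())
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::Config(m) => write!(f, "config error: {}", m),
            Error::Source(m) => write!(f, "source error: {}", m),
            Error::Sink(m) => write!(f, "sink error: {}", m),
            Error::Transform(m) => write!(f, "transform error: {}", m),
        }
    }
}

impl std::error::Error for Error {}

/// Hypothetical call site: map a lower-level failure into a config error.
pub fn parse_interval(raw: &str) -> Result<u64, Error> {
    raw.parse::<u64>()
        .map_err(|e| Error::config(format!("invalid interval {:?}: {}", raw, e)))
}
```

Accepting `impl Into<String>` lets call sites pass either a string literal or a `format!` result without an explicit conversion.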

### Logging

Use structured tracing fields:

- String fields: `field = %value` (Display trait)
- Complex types: `field = ?value` (Debug trait)
- Example: `tracing::info!(source_id = %self.id, mode = ?self.mode, "Started")`

### Testing

- Unit tests: `#[cfg(test)] mod tests { ... }` at file bottom
- Use `unwrap()` in unit tests, `expect("descriptive message")` in integration tests
- Integration tests in `tests/` directory use `common.rs` helpers