# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

pipeflow is a lightweight, configuration-driven data pipeline framework for Rust.
It implements a classic ETL pattern with DAG-based execution:

```text
Source → Transform → Sink
```

Pipelines are defined in YAML and validated at load time (duplicate IDs, missing inputs, cycle detection).
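
For orientation, loading boils down to deserializing the YAML into config structs before any validation runs. A minimal sketch, assuming hypothetical struct and field names (the real schema is defined in config.rs and will differ):

```rust
use serde::Deserialize;

// Hypothetical shapes for illustration only; not pipeflow's actual schema.
#[derive(Debug, Deserialize)]
struct PipelineConfig {
    sources: Vec<NodeConfig>,
    #[serde(default)]
    transforms: Vec<NodeConfig>,
    sinks: Vec<NodeConfig>,
}

#[derive(Debug, Deserialize)]
struct NodeConfig {
    id: String,
    kind: String,
    #[serde(default)]
    inputs: Vec<String>, // IDs of upstream nodes; validated to exist and form a DAG
}

fn main() -> Result<(), serde_yaml::Error> {
    let yaml = r#"
sources:
  - { id: api, kind: http_client }
transforms:
  - { id: shape, kind: remap, inputs: [api] }
sinks:
  - { id: out, kind: console, inputs: [shape] }
"#;
    let cfg: PipelineConfig = serde_yaml::from_str(yaml)?;
    println!("{cfg:?}");
    Ok(())
}
```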

## Build & Test Commands

```bash
# Build
cargo build                          # Default features (http-client, file)
cargo build --all-features           # All features
cargo build --no-default-features    # Core only

# Test
cargo test --all-features            # Full test suite
cargo test --test http_to_file_test  # Single integration test

# Lint & Format
cargo fmt --all -- --check
cargo clippy --all-targets --all-features -- -D warnings

# Run pipeline
cargo run -- run examples/http_to_console.yaml
cargo run -- config validate examples/http_to_console.yaml
cargo run -- -v run examples/http_to_console.yaml  # Verbose/debug output
```

## Architecture

### Core Modules (src/)

- **engine.rs** - DAG construction and async execution orchestrator.
  Creates broadcast channels for fan-out, spawns source/transform/sink tasks, handles graceful shutdown.
- **config.rs** - YAML parsing with DAG validation (duplicate IDs, input references, cycle detection via DFS; see the sketch after this list).
- **message.rs** - `Message` (metadata + JSON payload) and `SharedMessage` (Arc-wrapped for broadcast).
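
The cycle detection mentioned for config.rs is the standard three-color DFS over the node/input graph. A generic sketch of that idea, not pipeflow's actual implementation:

```rust
use std::collections::HashMap;

// Three-color DFS cycle detection over a node -> inputs adjacency map.
fn has_cycle<'a>(edges: &HashMap<&'a str, Vec<&'a str>>) -> bool {
    #[derive(Clone, Copy)]
    enum State { InProgress, Done }

    fn visit<'a>(
        node: &'a str,
        edges: &HashMap<&'a str, Vec<&'a str>>,
        state: &mut HashMap<&'a str, State>,
    ) -> bool {
        match state.get(node).copied() {
            Some(State::InProgress) => return true, // back edge: cycle found
            Some(State::Done) => return false,
            None => {}
        }
        state.insert(node, State::InProgress);
        for &next in edges.get(node).into_iter().flatten() {
            if visit(next, edges, state) {
                return true;
            }
        }
        state.insert(node, State::Done);
        false
    }

    let mut state = HashMap::new();
    edges.keys().any(|&n| visit(n, edges, &mut state))
}

fn main() {
    let mut edges = HashMap::new();
    edges.insert("a", vec!["b"]);
    edges.insert("b", vec!["a"]); // a <-> b is a cycle
    assert!(has_cycle(&edges));
}
```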

### Node Types

**Sources** (src/source/):

- `http_client` - HTTP polling with configurable interval (feature-gated; polling loop sketched below)
- `internal` - Built-in internal sources (`internal::dlq`, `internal::event`, `internal::alert`)
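
Conceptually, the `http_client` source is a `tokio::select!` loop racing a poll interval against the shutdown channel. A sketch under assumptions: `reqwest` as the HTTP client and a plain `String` standing in for the real message type:

```rust
use std::time::Duration;
use tokio::sync::broadcast;

// Conceptual shape of an HTTP polling source; not pipeflow's actual code.
async fn poll_http(
    url: String,
    interval: Duration,
    tx: broadcast::Sender<String>,
    mut shutdown: broadcast::Receiver<()>,
) -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    let mut ticker = tokio::time::interval(interval);
    loop {
        tokio::select! {
            _ = shutdown.recv() => break, // graceful shutdown signal
            _ = ticker.tick() => {
                let body = client.get(&url).send().await?.text().await?;
                // Ignore the error case where no receiver is subscribed.
                let _ = tx.send(body);
            }
        }
    }
    Ok(())
}
```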

**Transforms** (src/transform/):

- Multi-step pipeline architecture (`TransformPipeline` in pipeline.rs)
- Steps: `filter` (conditional), `remap` (field mapping with JSONPath-like syntax)
- Step trait in step.rs; JSONPath extraction in json_path.rs (sketched below)
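
The JSONPath-like extraction used by `remap` amounts to walking a `serde_json::Value` one path segment at a time. A minimal sketch of the idea, not the actual json_path.rs code:

```rust
use serde_json::{json, Value};

// Walk a JSON value by a dot-separated path like "user.address.city".
fn extract<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
    path.split('.').try_fold(value, |current, segment| match current {
        Value::Object(map) => map.get(segment),
        Value::Array(items) => segment.parse::<usize>().ok().and_then(|i| items.get(i)),
        _ => None,
    })
}

fn main() {
    let msg = json!({ "user": { "name": "ada", "tags": ["a", "b"] } });
    assert_eq!(extract(&msg, "user.name"), Some(&json!("ada")));
    assert_eq!(extract(&msg, "user.tags.1"), Some(&json!("b")));
}
```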

**Sinks** (src/sink/):

- `console` - stdout output (pretty/compact/text formats; see the sketch below)
- `file` - JSONL/TSV/CSV file output
- `blackhole` - discard messages
- `internal` - route to internal sources
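
For the `console` sink, the pretty/compact split maps naturally onto serde_json's two serializers. A sketch of that distinction only (the `text` format and the real sink code are not shown):

```rust
use serde_json::{json, Value};

// Pretty vs. compact output via serde_json's two serializers. Sketch only;
// the real console sink lives in src/sink/.
fn format_payload(payload: &Value, pretty: bool) -> serde_json::Result<String> {
    if pretty {
        serde_json::to_string_pretty(payload)
    } else {
        serde_json::to_string(payload)
    }
}

fn main() -> serde_json::Result<()> {
    let payload = json!({ "id": 1, "ok": true });
    println!("{}", format_payload(&payload, true)?);  // multi-line, indented
    println!("{}", format_payload(&payload, false)?); // single line
    Ok(())
}
```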

### Channel Architecture

Uses `tokio::sync::broadcast` for fan-out (one source to multiple sinks).
`SharedMessage` (`Arc<Message>`) enables efficient cloning.
Slow consumers may lag and drop messages (logged as warnings).
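
The pattern is standard tokio: every subscriber receives a clone of the sent value, so wrapping the payload in an `Arc` makes each delivery a pointer bump instead of a deep copy. A standalone sketch, using `Arc<String>` in place of `SharedMessage`:

```rust
use std::sync::Arc;
use tokio::sync::broadcast;

// Fan-out with tokio broadcast: each receiver gets a cheap Arc clone of the
// same message rather than a deep copy of the payload.
#[tokio::main]
async fn main() {
    let (tx, _) = broadcast::channel::<Arc<String>>(16);
    let mut rx_a = tx.subscribe();
    let mut rx_b = tx.subscribe();

    tx.send(Arc::new("payload".to_string())).unwrap();

    // Both consumers observe the same allocation.
    let a = rx_a.recv().await.unwrap();
    let b = rx_b.recv().await.unwrap();
    assert!(Arc::ptr_eq(&a, &b));
}
```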

### Feature Flags

- `http-client` (default) - HTTP polling source
- `file` (default) - File sink
- `test-utils` - Exposes internal APIs for integration tests

## Key Patterns

- Nodes implement `Source`, `Transform`, or `Sink` traits (all `async_trait`; rough shapes sketched after this list)
- Config validation runs before engine build
- Sources receive shutdown signal via broadcast channel
- Engine spawns: sinks first → transforms → sources (so receivers are ready before senders start)
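
For orientation, the traits are roughly of this shape; the signatures, error types, and `Message` alias below are illustrative assumptions, not the real definitions in src/:

```rust
use async_trait::async_trait;
use tokio::sync::broadcast;

// Illustrative stand-in; pipeflow's Message carries metadata plus a JSON payload.
type Message = serde_json::Value;

#[async_trait]
trait Source: Send + Sync {
    // Sources run until they observe the shutdown broadcast.
    async fn run(
        &mut self,
        out: broadcast::Sender<Message>,
        shutdown: broadcast::Receiver<()>,
    ) -> anyhow::Result<()>;
}

#[async_trait]
trait Transform: Send + Sync {
    // Returning None models a filtered-out message.
    async fn apply(&self, msg: Message) -> anyhow::Result<Option<Message>>;
}

#[async_trait]
trait Sink: Send + Sync {
    async fn write(&mut self, msg: Message) -> anyhow::Result<()>;
}

// A no-op sink in the spirit of `blackhole`.
struct Blackhole;

#[async_trait]
impl Sink for Blackhole {
    async fn write(&mut self, _msg: Message) -> anyhow::Result<()> {
        Ok(()) // discard the message
    }
}
```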

## Integration Tests

Located in `tests/`. They use wiremock for HTTP mocking and tempfile for file sink tests.
Tests use `Engine::run_with_signal()` for controlled shutdown.
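
A typical scaffold for such a test, with wiremock serving a canned response; everything past the mock setup is elided because it depends on the pipeline config under test:

```rust
use serde_json::json;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

#[tokio::test]
async fn http_source_reads_mocked_endpoint() {
    // Stand up a local mock server and register a canned JSON response.
    let server = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/data"))
        .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "ok": true })))
        .mount(&server)
        .await;

    // Point the http_client source at the mock via a test pipeline config,
    // then drive it with Engine::run_with_signal() and assert on sink output.
    let _endpoint = format!("{}/data", server.uri());
}
```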