<p align="center">
  <img src="assets/feedme-logo.png" width="400" height="200" alt="FeedMe Logo">
</p>

<h1 align="center">FeedMe</h1>

<p align="center">
  <strong>FeedMe is a deterministic, linear, streaming ingest pipeline with mechanical guarantees around memory, ordering, and failure.</strong>
</p>

<p align="center">
  <a href="https://crates.io/crates/feedme">
    <img src="https://img.shields.io/crates/v/feedme.svg" alt="Crates.io">
  </a>
  <a href="https://docs.rs/feedme">
    <img src="https://docs.rs/feedme/badge.svg" alt="Docs.rs">
  </a>
  <a href="https://github.com/Michael-A-Kuykendall/feedme/actions/workflows/test-and-coverage.yml">
    <img src="https://img.shields.io/github/actions/workflow/status/Michael-A-Kuykendall/feedme/test-and-coverage.yml?branch=master&label=CI" alt="CI">
  </a>
  <a href="https://github.com/Michael-A-Kuykendall/feedme/blob/master/LICENSE">
    <img src="https://img.shields.io/github/license/Michael-A-Kuykendall/feedme" alt="License">
  </a>
</p>


---

## About


FeedMe is a **high-performance, streaming data pipeline engine** for Rust applications. It provides a linear, deterministic processing model with bounded resource usage, explicit error handling, and comprehensive observability.

Perfect for **ETL**, **log processing**, **data cleaning**, and **real-time ingestion pipelines**.

## Key Guarantees


- **Streaming, bounded memory** — processes one event at a time; memory usage stays flat
- **Deterministic processing** — same input + same config → same output
- **Structured errors** — stage, code, and message for every failure
- **Observability** — metrics exportable (Prometheus or JSON) without affecting execution
- **Extensible** — add custom stages via a defined plugin contract

## Mechanical Guarantees


FeedMe provides these mechanical guarantees:

- Events are processed strictly in input order
- Memory usage is bounded and input-size independent
- Stages cannot observe shared or mutated state
- Validation failures cannot be silently ignored
- Metrics collection cannot influence execution
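
The guarantees above describe a simple model: read one event at a time, transform it with a pure function, and route failures somewhere explicit. A toy illustration of that model (this is not feedme code, just a self-contained sketch of the streaming shape using only the standard library):

```rust
use std::io::{BufRead, Write};

// Sketch of the streaming model: events are read one line at a time
// (bounded memory), processed strictly in input order, and a "failed"
// line cannot be silently skipped — it must go to an explicit sink.
fn run(input: impl BufRead, mut out: impl Write, mut deadletter: impl Write) -> std::io::Result<()> {
    for line in input.lines() {
        let line = line?; // one event at a time, strict input order
        if line.trim().is_empty() {
            writeln!(deadletter, "{line}")?; // explicit failure path
        } else {
            writeln!(out, "{}", line.to_uppercase())?; // pure, deterministic transform
        }
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    let input = b"hello\n\nworld\n" as &[u8];
    let (mut out, mut dead) = (Vec::new(), Vec::new());
    run(input, &mut out, &mut dead)?;
    assert_eq!(out, b"HELLO\nWORLD\n"); // same input → same output
    assert_eq!(dead, b"\n");            // the bad line is captured, not lost
    Ok(())
}
```

Because the transform is a pure function of the current event, identical runs over identical input are byte-identical, which is what makes the determinism guarantee mechanically checkable.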

## What FeedMe Is Not


FeedMe is intentionally **not** these things, and that's by design:

### Not Distributed (like Vector or Fluent Bit)

FeedMe runs in a single process with no networking or cluster management.

### Not Stateful (like traditional ETL tools)

Stages are deterministic: same input + config → same output. No hidden state or concurrency.

### Not Async-First (like many modern Rust libraries)

Processing is synchronous by default. Async is an implementation detail for I/O stages.

### Not a DSL (like Logstash)

No embedded languages or required config files. Code-first with optional YAML support.

### Not a Daemon (like filebeat)

No long-running services or auto-restart. FeedMe is a library you embed in your application.

**This focus enables FeedMe's core guarantees while keeping the codebase small and maintainable.**

## Installation


Add this to your `Cargo.toml`:

```toml
[dependencies]
feedme = "0.1"
serde_json = "1.0"
regex = "1.0"
```

Or install via cargo:

```bash
cargo add feedme serde_json regex
```

## Quick Start


Here's a compact pipeline that:

* ingests logs
* redacts PII
* validates schema
* filters noise
* guarantees determinism
* and fails safely

```rust
use feedme::{
    Pipeline, FieldSelect, RequiredFields, StdoutOutput, Deadletter,
    PIIRedaction, Filter, InputSource, Stage
};
use std::path::PathBuf;

fn main() -> anyhow::Result<()> {
    // Create pipeline: select fields → redact PII → require fields → filter → output
    let mut pipeline = Pipeline::new();
    pipeline.add_stage(Box::new(FieldSelect::new(vec![
        "timestamp".into(), "level".into(), "message".into(), "email".into()
    ])));
    pipeline.add_stage(Box::new(PIIRedaction::new(vec!["email".into()])));
    pipeline.add_stage(Box::new(RequiredFields::new(vec!["level".into()])));
    pipeline.add_stage(Box::new(Filter::new(Box::new(|event| {
        event.get("level").and_then(|v| v.as_str()) != Some("debug")
    }))));
    pipeline.add_stage(Box::new(StdoutOutput::new()));

    // Deadletter for errors
    let mut deadletter = Deadletter::new(PathBuf::from("errors.ndjson"));

    // Process input file
    let mut input = InputSource::File(PathBuf::from("input.ndjson"));
    input.process_input(&mut pipeline, &mut Some(&mut deadletter))?;

    // Export final metrics
    println!("Pipeline complete. Metrics:");
    for metric in pipeline.export_json_logs() {
        println!("{}", serde_json::to_string(&metric)?);
    }

    Ok(())
}
```

**Input** (`input.ndjson`):
```
{"timestamp":"2023-10-01T10:00:00Z","level":"info","message":"User logged in","email":"user@example.com"}
{"level":"debug","message":"Debug info"}
{"message":"Missing level"}
```

**Output** (stdout):
```
{"timestamp":"2023-10-01T10:00:00Z","level":"info","message":"User logged in","email":"[REDACTED]"}
```

**Deadletter** (`errors.ndjson`):
```
{"error":{"stage":"RequiredFields","code":"MISSING_FIELD","message":"Required field 'level' is missing"},"raw":"{\"message\":\"Missing level\"}"}
```

**Metrics** (JSON logs):
```
{"metric":"events_processed","value":2}
{"metric":"events_dropped","value":1}
{"metric":"errors","value":1}
{"metric":"stage_latencies","stage":"FieldSelect","count":3,"sum":0.05,"min":0.01,"max":0.02}
...
```
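
These JSON-lines metrics are easy to post-process. A minimal sketch using only the standard library (a real consumer would parse with serde_json; the `metric`/`value` field names are taken from the sample output above, and the string handling here is deliberately naive):

```rust
// Sketch: pulling the integer "value" field out of each metric line.
// This relies on the exact key layout shown in the sample output; it is
// an illustration, not a robust JSON parser.
fn metric_value(line: &str) -> Option<(String, u64)> {
    let name = line.split("\"metric\":\"").nth(1)?.split('"').next()?.to_string();
    let value = line
        .split("\"value\":")
        .nth(1)?
        .trim_end_matches('}')
        .split(|c: char| !c.is_ascii_digit())
        .next()?
        .parse()
        .ok()?;
    Some((name, value))
}

fn main() {
    let lines = [
        r#"{"metric":"events_processed","value":2}"#,
        r#"{"metric":"events_dropped","value":1}"#,
    ];
    for l in lines {
        if let Some((name, v)) = metric_value(l) {
            println!("{name} = {v}");
        }
    }
}
```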

## Determinism Verification


> **Determinism is a core guarantee** — identical runs produce identical outputs.

FeedMe guarantees deterministic output for identical inputs. Verify this with:

```bash
cargo run --example 09_complex_pipeline > run1.out
cargo run --example 09_complex_pipeline > run2.out

# Unix:
sha256sum run1.out run2.out

# Windows:
certutil -hashfile run1.out SHA256
certutil -hashfile run2.out SHA256
```

The hashes should match, proving deterministic behavior.

## Examples


### Messy Input → Clean Output


Given `messy.ndjson`:
```
{"timestamp":"2023-10-01T10:00:00Z","level":"info","message":"User logged in","email":"user@example.com"}
{"level":"info","message":"Missing timestamp"}
{invalid json}
```

Run:
```bash
cargo run --example 01_redact_validate_deadletter
```

**Processed output** (`samples/processed.ndjson`):
```
{"timestamp":"2023-10-01T10:00:00Z","level":"info","message":"User logged in","email":"[REDACTED]"}
```

**Deadletter** (errors logged with context):
```
{"error":{"stage":"Input_File","code":"PARSE_ERROR","message":"expected value at line 1 column 1"},"raw":"{invalid json}"}
```

**Metrics:**
```
{"metric":"events_processed","value":1}
{"metric":"events_dropped","value":1}
{"metric":"errors","value":1}
{"metric":"stage_latencies","stage":"PIIRedaction","count":1,"sum":0.1,"min":0.1,"max":0.1}
```

### Available Examples


FeedMe includes 12 comprehensive examples covering:

- **Data Cleaning**: PII redaction, field validation
- **Compliance**: GDPR/CCPA handling
- **Monitoring**: Bounded metrics, observability
- **ETL/Ingestion**: Multi-stage pipelines
- **Error Resilience**: Deadletter queues
- **Extensibility**: Custom stages and plugins

Run any example:
```bash
cargo run --example <number>_<description>
```

See [examples/](examples/) for the full list.

## 🏗️ API Reference


- [Pipeline](https://docs.rs/feedme/latest/feedme/struct.Pipeline.html) - Core processing pipeline
- [Stage](https://docs.rs/feedme/latest/feedme/trait.Stage.html) - Processing stage trait
- [InputSource](https://docs.rs/feedme/latest/feedme/enum.InputSource.html) - Data input sources
- [Event](https://docs.rs/feedme/latest/feedme/struct.Event.html) - Data event structure
- [Metrics](https://docs.rs/feedme/latest/feedme/struct.Metrics.html) - Observability metrics

Full documentation: [docs.rs/feedme](https://docs.rs/feedme)

## Performance


FeedMe is designed for high-throughput, low-latency data processing:

- **Bounded memory usage** — no unbounded buffering regardless of input size
- **Efficient streaming** — minimal allocations with ownership transfer
- **Zero-overhead observability** — metrics collection doesn't affect execution
- **Horizontal scalability** — the linear, share-nothing model lets you run independent pipeline instances per core or process

## 🛡️ Invariants


FeedMe enforces **mechanical behavioral guarantees** that are tested via runtime assertions and contract tests. These invariants ensure reliability and prevent regressions. Key guarantees include:

- **Metrics purity**: Exporting metrics doesn't affect pipeline state
- **Drop counting rules**: Events are only counted as dropped under specific conditions  
- **Latency recording**: Successful stage execution always records timing
- **Directory determinism**: Same directory input produces identical output

See [src/ppt_invariant_contracts.rs](src/ppt_invariant_contracts.rs) for the complete contract test suite.

## 🚫 Non-Goals


- Distributed processing
- Network I/O (except stubbed HTTP_Post)
- Persistent storage
- Query languages
- Compression or encryption

## 📋 API Overview


### Stage Contract

```rust
pub trait Stage {
    fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError>;
    fn name(&self) -> &str;
    fn is_output(&self) -> bool { false }  // true if the stage consumes events (output/terminal stage)
}
```

### Execution Semantics

- `Ok(Some(event))`: pass the event to the next stage
- `Ok(None)`: event dropped (filtered); for an output stage, the event was consumed
- `Err(_)`: stop the pipeline with a structured error attributing the failing stage
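
The contract and semantics above can be sketched end to end. Note that `Event` and `PipelineError` below are minimal stand-in types so the example compiles on its own; they only mirror the shape of feedme's real types:

```rust
// Stand-in types — placeholders for feedme's Event and PipelineError.
#[derive(Debug, Clone, PartialEq)]
struct Event(String);

#[allow(dead_code)]
#[derive(Debug)]
struct PipelineError {
    stage: String,
    code: String,
    message: String,
}

trait Stage {
    fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError>;
    fn name(&self) -> &str;
    fn is_output(&self) -> bool { false }
}

/// Example custom stage: drops events whose payload mentions "debug".
struct DropDebug;

impl Stage for DropDebug {
    fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
        if event.0.contains("debug") {
            Ok(None)        // filtered: counted as a drop, not an error
        } else {
            Ok(Some(event)) // passed unchanged to the next stage
        }
    }
    fn name(&self) -> &str { "DropDebug" }
}

fn main() {
    let mut stage = DropDebug;
    assert_eq!(
        stage.execute(Event("level=info".into())).unwrap(),
        Some(Event("level=info".into()))
    );
    assert_eq!(stage.execute(Event("level=debug".into())).unwrap(), None);
    println!("{} behaves as expected", stage.name());
}
```

Because `execute` takes the event by value and returns a fresh one, a stage cannot observe shared or mutated state, which is the property behind the determinism guarantee.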

### Core Types

- `Pipeline`: Add stages, process events, export metrics
- `Event`: JSON data + optional metadata
- `InputSource`: Stream from stdin/file/directory
- `PipelineError`: Categorized errors (Parse/Transform/Validation/Output/System)

## 🤝 Contributing


We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.

### Development Setup


```bash
git clone https://github.com/Michael-A-Kuykendall/feedme.git
cd feedme
cargo build
cargo test
```

### Code of Conduct


This project follows a code of conduct to ensure a welcoming environment for all contributors. See [CODE_OF_CONDUCT.md](CODE_OF_CONDUCT.md) for details.

## Sponsors


FeedMe is supported by our amazing sponsors. See [SPONSORS.md](SPONSORS.md) for details.

## License


Licensed under the MIT License. See [LICENSE](LICENSE) for details.

---

<p align="center">
  Made with ❤️ by <a href="https://github.com/Michael-A-Kuykendall">Michael Kuykendall</a>
</p>