About
FeedMe is a high-performance, streaming data pipeline engine for Rust applications. It provides a linear, deterministic processing model with bounded resource usage, explicit error handling, and comprehensive observability. Perfect for ETL, log processing, data cleaning, and real-time ingestion pipelines.
Key Guarantees
- Streaming, bounded memory: Processes events one-by-one; memory usage doesn't grow with input size.
- Deterministic and testable: Ownership transfer prevents shared state; stages are deterministic given the same inputs and configuration.
- Fail-fast with attribution: Errors include stage, code, and message; no silent failures.
- Observable without overhead: Metrics collected automatically, exportable to Prometheus/JSON.
- Extensible with contracts: Plugin system for custom stages without runtime discovery.
Why FeedMe is Not X
FeedMe is intentionally not these things, and that's by design. Here's why:
Not Distributed (like Vector or Fluent Bit)
FeedMe runs in a single process with no networking, coordination, or cluster management. This eliminates complexity from consensus, partitioning, and network failures. If you need distributed processing, FeedMe can be a building block within a larger system, but it doesn't handle distribution itself.
Not Stateful (like traditional ETL tools)
Stages should be deterministic: given the same input and configuration, they produce the same output. The framework does not introduce concurrency or hidden state—stages are responsible for their own determinism. This makes pipelines predictable, testable, and easy to reason about—perfect for "data plumbing" where state management belongs at the edges.
Not Async-First (like many modern Rust libraries)
Processing is synchronous by default. Async is an implementation detail for I/O-bound stages, not the core API. This keeps the mental model simple: a pipeline is a sequence of transformations, not a graph of futures. If you need high-concurrency async processing, FeedMe integrates cleanly but doesn't force it.
Not a DSL (like Logstash)
No embedded languages, no required configuration files. FeedMe is code-first, with optional YAML support for declarative configuration when you need it. You get compile-time safety, IDE support, and the full power of Rust's type system. DSLs are great for non-programmers, but they hide complexity and limit expressiveness; FeedMe assumes you're writing code.
Not a Daemon (like Filebeat)
No long-running services, no background processes, no auto-restart. FeedMe is designed for batch processing and streaming pipelines that you control. It's a library you embed in your application, not a tool you deploy separately. This keeps deployment simple and resource usage predictable.
This focus enables FeedMe's core guarantees while keeping the codebase small and maintainable.
Invariants
FeedMe enforces 12 non-negotiable behavioral guarantees that are tested mechanically. These invariants ensure reliability and prevent regressions. See docs/invariants.md for the complete list.
Features
- 🚀 High Performance: Streaming processing with bounded memory usage
- 🔒 Memory Safe: Bounded resource usage prevents unbounded memory growth
- 🎯 Deterministic: Consistent output for identical inputs
- 📊 Observable: Built-in metrics and monitoring
- 🛡️ Error Resilient: Structured error handling with deadletter queues
- 🔧 Extensible: Plugin architecture for custom processing stages
- 📝 Well Documented: Comprehensive examples and API docs
Installation
Add this to your Cargo.toml:
```toml
[dependencies]
feedme = "0.1"
serde = "1.0"       # assumed companion dependency
serde_json = "1.0"  # assumed companion dependency
```
Or install via cargo:
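For example:

```bash
cargo add feedme
```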
Quick Start
```rust
use feedme::{InputSource, Pipeline}; // import paths assumed; see docs.rs/feedme
use std::path::PathBuf;
```
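A minimal sketch of how those pieces might fit together. Type names come from the API Reference below; module paths and constructor/method names (`Pipeline::new`, `add_stage`, `run`, `metrics`) are assumptions, not the verified feedme API:

```rust
// Hypothetical usage sketch; see docs.rs/feedme for the real API.
use feedme::{InputSource, Pipeline};
use feedme::stages::PIIRedaction; // module path assumed; stage name appears in the metrics sample
use std::path::PathBuf;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Stream events one-by-one from a file (stdin and directories also work).
    let input = InputSource::file(PathBuf::from("samples/messy.ndjson"));

    // Stages run in order; each returns Some(event), None (drop), or Err.
    let mut pipeline = Pipeline::new();
    pipeline.add_stage(PIIRedaction::default());

    // Fail fast on errors, with stage/code/message attribution.
    pipeline.run(input)?;

    // Metrics are collected automatically and can be exported (Prometheus/JSON).
    println!("{:?}", pipeline.metrics());
    Ok(())
}
```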
Examples
Messy Input → Clean Output
Given messy.ndjson:
{"timestamp":"2023-10-01T10:00:00Z","level":"info","message":"User logged in","email":"user@example.com"}
{"level":"info","message":"Missing timestamp"}
{invalid json}
Run:
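For example, using the bundled redaction example (assuming example 01 is the one this walkthrough uses):

```bash
cargo run --example 01_redact_validate_deadletter
```

Processed output (samples/processed.ndjson):

```json
{"timestamp":"2023-10-01T10:00:00Z","level":"info","message":"User logged in","email":"[REDACTED]"}
```

Deadletter (errors logged with context):

```json
{"error":{"stage":"Input_File","code":"PARSE_ERROR","message":"expected value at line 1 column 1"},"raw":"{invalid json}"}
```

Metrics:

```json
{"metric":"events_processed","value":1}
{"metric":"events_dropped","value":1}
{"metric":"errors","value":1}
{"metric":"stage_latencies","stage":"PIIRedaction","count":1,"sum":0.1,"min":0.1,"max":0.1}
```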
Available Examples
FeedMe includes 12 comprehensive examples covering:
- Data Cleaning: PII redaction, field validation
- Compliance: GDPR/CCPA handling
- Monitoring: Bounded metrics, observability
- ETL/Ingestion: Multi-stage pipelines
- Error Resilience: Deadletter queues
- Extensibility: Custom stages and plugins
Run any example:
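Each example is a standard Cargo example target:

```bash
cargo run --example <example_name>   # e.g. cargo run --example 04_directory_ingest
```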
See examples/ for the full list.
API Reference
- Pipeline - Core processing pipeline
- Stage - Processing stage trait
- InputSource - Data input sources
- Event - Data event structure
- Metrics - Observability metrics
Full documentation: docs.rs/feedme
Performance
FeedMe is designed for high-throughput, low-latency data processing:
- Memory: Bounded usage regardless of input size
- CPU: Efficient streaming with minimal allocations
- Observability: Metrics collection with zero runtime overhead
- Scalability: Linear processing model; scale out by running independent pipeline instances in parallel
Contributing
We welcome contributions! Please see our Contributing Guide for details.
Development Setup
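Nothing beyond the standard Cargo workflow should be needed (assumed; any project-specific tooling isn't covered here):

```bash
cargo build
cargo test   # run the test suite (the invariants in docs/invariants.md are tested mechanically)
```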
Determinism Verification
FeedMe guarantees deterministic output for identical inputs. Verify this with:
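For example, run the same example twice (example 01 is used here as an illustration; any pipeline with fixed inputs works) and hash both outputs:

```bash
cargo run --example 01_redact_validate_deadletter > run1.out
cargo run --example 01_redact_validate_deadletter > run2.out
```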
```bash
# On Unix:
sha256sum run1.out run2.out
# On Windows:
certutil -hashfile run1.out SHA256 && certutil -hashfile run2.out SHA256
```
The hashes should match, proving deterministic behavior.
Code of Conduct
This project follows a code of conduct to ensure a welcoming environment for all contributors. See CODE_OF_CONDUCT.md for details.
Sponsors
FeedMe is supported by our amazing sponsors. See SPONSORS.md for details.
License
Licensed under the MIT License. See LICENSE for details.
More Examples
- `cargo run --example 01_redact_validate_deadletter`: PII redaction + validation + deadletter
- `cargo run --example 02_filter_warn_error`: Filter logs to warn/error only
- `cargo run --example 03_field_projection`: Shrink events to essential fields
- `cargo run --example 04_directory_ingest`: Process directory of log files
- `cargo run --example 05_custom_stage`: Write and use a custom stage
- `cargo run --example 06_syslog_parsing`: Parse syslog into structured events
- `cargo run --example 07_metrics_export_demo`: Focus on metrics collection and export
- `cargo run --example 08_stdin_streaming`: Stream processing from stdin
- `cargo run --example 09_complex_pipeline`: Multi-stage pipeline with transforms
- `cargo run --example 10_plugin_usage`: Register and use custom stages via plugins
- `cargo run --example 11_config_driven_pipeline`: Load and use YAML configuration
- `cargo run --example 12_error_handling_variations`: Fail-fast vs continue with deadletter
See /examples for code.
Non-Goals
- Distributed processing
- Network I/O (except stubbed HTTP_Post)
- Persistent storage
- Query languages
- Compression or encryption
API Overview
Stage Contract
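The trait itself isn't reproduced in this README; the following is a rough sketch consistent with the execution semantics below, with names and signatures assumed rather than taken from the crate:

```rust
use serde_json::Value;

// Assumed shapes for illustration only; see docs.rs/feedme for the real definitions.
pub struct Event {
    pub data: Value,             // JSON payload
    pub metadata: Option<Value>, // optional metadata
}

pub enum PipelineError {
    Parse(String),
    Transform(String),
    Validation(String),
    Output(String),
    System(String),
}

pub trait Stage {
    /// Ok(Some(event)) passes the event on, Ok(None) drops it,
    /// Err(..) stops the pipeline with stage/code/message attribution.
    fn process(&mut self, event: Event) -> Result<Option<Event>, PipelineError>;

    /// Output stages consume events instead of passing them on.
    fn is_output(&self) -> bool { false }
}
```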
Execution Semantics
- `Some(event)`: Pass to next stage
- `None`: Drop (filtered); if `is_output()`, consumed
- `Err`: Stop pipeline, error with attribution
Core Types
- `Pipeline`: Add stages, process events, export metrics
- `Event`: JSON data + optional metadata
- `InputSource`: Stream from stdin/file/directory
- `PipelineError`: Categorized errors (Parse/Transform/Validation/Output/System)