Features
Pipeline composition
- Typed builder - the carrier type is tracked at compile time across every stage transition. Wrong types are a build error, not a runtime panic.
- Closure adapters - `map`, `filter`, `filter_map`, `flat_map`, `inspect`, `try_map` all live on the builder. No `.next()` ceremony.
- Custom stages - implement `Stage` directly when state spreads across multiple fields or you need 1:N emission.
- Custom sources and sinks - the `Source` and `Sink` traits are two methods each (with `flush` / `close` defaults).
Backpressure, batching, windowing
- Bounded buffers - inter-stage edges are bounded; a full downstream slows the upstream.
- Count / byte / age batching - `BatchPolicy::new().max_items(N).max_bytes(M).max_age(D)`. Triggers OR together.
- Tumbling / sliding / session windows - behind a pluggable `Clock` trait. Custom clocks for deterministic tests and embedded time sources.
- `ByteSize` trait - opt-in for byte-aware batching; blanket impls for `&str`, `String`, `Vec<u8>`, `&[u8]`.
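
To make the bounded-buffer point concrete, here is a minimal standard-library sketch of backpressure over a bounded edge. It does not use this crate: `run_bounded` and `is_full_after` are hypothetical helper names, and `std::sync::mpsc::sync_channel` stands in for an inter-stage edge purely as an assumption for illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Runs a producer thread and a consumer over an edge that buffers at most
/// `capacity` items. Hypothetical helper, not part of this crate.
fn run_bounded(capacity: usize, count: u32) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(capacity);

    let producer = thread::spawn(move || {
        for n in 0..count {
            // `send` blocks while the buffer is full: this is the backpressure.
            tx.send(n).expect("receiver dropped");
        }
        // `tx` is dropped here, which ends the receiver's iteration.
    });

    // The consumer doubles each item; a slow consumer would stall the producer.
    let received: Vec<u32> = rx.iter().map(|n| n * 2).collect();
    producer.join().expect("producer panicked");
    received
}

/// Fills a buffer of `capacity` items and reports whether one more is rejected.
fn is_full_after(capacity: usize) -> bool {
    let (tx, _rx) = sync_channel::<u32>(capacity);
    for n in 0..capacity as u32 {
        tx.try_send(n).expect("buffer has room");
    }
    matches!(tx.try_send(0), Err(TrySendError::Full(_)))
}
```

Calling `run_bounded(2, 5)` never holds more than two items in flight, and `is_full_after(2)` shows the non-blocking view of the same limit: the third `try_send` is refused instead of queued.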
Error isolation
- `ErrorPolicy::FailFast` (default) - first error in a stage bubbles out of `run`.
- `ErrorPolicy::Continue` - drop the failing item, keep going. Useful for noisy enrichment stages.
- `ErrorPolicy::DeadLetter` - route the failure to a dead-letter `Sink<Item = StageFailure>` installed via `.dead_letter(sink)`. Sink install order does not matter.
- `StageId` - every error carries the `&'static str` name of the stage that produced it.
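
The three policies can be illustrated with a small standard-library sketch. Everything in it is a hypothetical stand-in: `Policy`, `Failure`, and `parse_stage` are names invented for this example, and a plain `Vec` plays the role of the dead-letter sink. The crate's own types are described in `docs/API.md` and may differ.

```rust
/// Hypothetical stand-in for an error policy; not the crate's real type.
#[derive(Clone, Copy)]
enum Policy {
    FailFast,
    Continue,
    DeadLetter,
}

/// A rejected item together with the name of the stage that rejected it.
#[derive(Debug, PartialEq)]
struct Failure {
    stage: &'static str,
    input: String,
    reason: String,
}

/// Parses each input as an integer and handles failures per `policy`.
/// Returns the parsed values, or the first failure under `FailFast`.
/// Failures are pushed to `dead_letters` only under `DeadLetter`.
fn parse_stage(
    inputs: &[&str],
    policy: Policy,
    dead_letters: &mut Vec<Failure>,
) -> Result<Vec<i64>, Failure> {
    let mut out = Vec::new();
    for raw in inputs {
        match raw.parse::<i64>() {
            Ok(n) => out.push(n),
            Err(e) => {
                let failure = Failure {
                    stage: "parse",
                    input: raw.to_string(),
                    reason: e.to_string(),
                };
                match policy {
                    // Stop at the first error and report it to the caller.
                    Policy::FailFast => return Err(failure),
                    // Drop the failing item and keep going.
                    Policy::Continue => {}
                    // Keep going, but retain the failure for later inspection.
                    Policy::DeadLetter => dead_letters.push(failure),
                }
            }
        }
    }
    Ok(out)
}
```

With the inputs `["1", "x", "3"]`, the fail-fast variant returns the failure for `"x"`, while the other two both yield `[1, 3]`; only the dead-letter variant keeps a record of what was dropped.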
Runtime and execution
- `SyncDriver` - drives the pipeline on the calling thread; no_std-compatible.
- `ThreadedDriver` - spawns a worker thread; the calling thread blocks on `join`.
- `Driver` trait - open for external executors. Implement for tokio, rayon, glommio, or custom thread farms.
- `Pipeline::run`, `run_threaded`, `run_with(driver)` - three terminal forms.
Reliability and safety
- `#![forbid(unsafe_code)]` at the crate root.
- Zero runtime dependencies. Built on `core` + `alloc` (always) plus `std` (default feature). `proptest` is a dev-only dependency.
- 84 tests pass under `--all-features` (32 unit + 42 integration + 10 doctest), plus 11 property tests and 4 fuzz harnesses.
- `cargo-semver-checks` runs in CI from `0.9.0` onward and is a hard gate from `1.0.0`.
- MSRV 1.75, locked. Bumps require a minor version increment.

Benchmark numbers under `--release` on a developer laptop: source -> null sink ~500 M items/s; source -> map -> sink ~260 M items/s; full three-stage chain ~170 M items/s. See `docs/BENCH.md`.
Installation
Add to Cargo.toml:
[dependencies]
pipe-io = "1"
For no_std builds (data primitives and SyncDriver only):
[dependencies]
pipe-io = { version = "1", default-features = false }
Feature flags
| Flag | Default | Effect |
|---|---|---|
| `std` | yes | `ThreadedDriver`, `Pipeline::run_threaded`, `ChannelSource`, `ReaderSource`, `ChannelSink`, `WriterSink`, `VecSink`, `Window` / `Clock` / `SystemClock`, `BatchPolicy::max_age`, dead-letter routing. |
The no_std build still ships the full trait surface, closure adapters, count + byte batching, the synchronous driver, and the error model. The std-only items above are the ones that need std::sync::Mutex or std::thread::spawn.
Quick start
use ;
let sink = new;
let handle = sink.handle;
from_iter
.map
.filter
.sink
.run
.expect;
assert_eq!;
Batching
use ;
let sink = new;
let handle = sink.handle;
from_iter
.batch
.map
.sink
.run
.unwrap;
assert_eq!;
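
The "triggers OR together" rule can be sketched without the crate. The block below is a plain standard-library illustration with hypothetical names (`Limits`, `batch_strings`); it covers only the count and byte triggers, with `String::len` as the byte size, and leaves out the age trigger because that needs a clock.

```rust
/// Hypothetical policy for this sketch: flush when EITHER limit is reached.
#[derive(Clone, Copy)]
struct Limits {
    max_items: usize,
    max_bytes: usize,
}

/// Groups `items` into batches, closing a batch as soon as it holds
/// `max_items` items or at least `max_bytes` bytes, whichever comes first.
fn batch_strings(items: Vec<String>, limits: Limits) -> Vec<Vec<String>> {
    let mut batches = Vec::new();
    let mut current: Vec<String> = Vec::new();
    let mut current_bytes = 0usize;

    for item in items {
        current_bytes += item.len();
        current.push(item);
        // The two triggers are OR'd: either one closes the batch.
        if current.len() >= limits.max_items || current_bytes >= limits.max_bytes {
            batches.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
    }
    // Emit the partial batch left over at end of input.
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}
```

With `max_items: 3` and `max_bytes: 8`, an 8-byte string is flushed on its own by the byte trigger, three one-byte strings are flushed together by the count trigger, and whatever remains at end of input goes out as a final short batch.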
Windowing
use Duration;
use ;
let sink = new;
from_iter
.window
.map
.sink
.run?;
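
As a rough illustration of why a pluggable clock helps with testing, here is a standard-library sketch of tumbling windows driven by a scripted clock. The `Clock` trait, `ScriptedClock`, and `tumbling` below are hypothetical names defined only for this example; they are not the crate's definitions. The sketch assumes a non-empty script, non-decreasing readings, and a non-zero window size.

```rust
use std::cell::Cell;
use std::time::Duration;

/// Hypothetical clock abstraction for this sketch: time elapsed since start.
trait Clock {
    fn now(&self) -> Duration;
}

/// Test clock that replays a fixed script of readings, one per call.
struct ScriptedClock {
    readings: Vec<Duration>,
    next: Cell<usize>,
}

impl ScriptedClock {
    /// `millis` must be non-empty.
    fn new(millis: &[u64]) -> Self {
        Self {
            readings: millis.iter().map(|&ms| Duration::from_millis(ms)).collect(),
            next: Cell::new(0),
        }
    }
}

impl Clock for ScriptedClock {
    fn now(&self) -> Duration {
        let i = self.next.get();
        self.next.set(i + 1);
        // Hold the last reading once the script is exhausted.
        self.readings[i.min(self.readings.len() - 1)]
    }
}

/// Groups items into fixed, non-overlapping windows of length `size`,
/// stamping each item with the clock reading taken when it arrives.
/// Windows that receive no items are not emitted.
fn tumbling<T>(items: Vec<T>, size: Duration, clock: &impl Clock) -> Vec<Vec<T>> {
    let mut windows: Vec<Vec<T>> = Vec::new();
    let mut current_index: Option<u128> = None;

    for item in items {
        // Window index = arrival time divided by window length.
        let index = clock.now().as_nanos() / size.as_nanos();
        if current_index != Some(index) {
            windows.push(Vec::new()); // a new window opens
            current_index = Some(index);
        }
        windows.last_mut().expect("window was just pushed").push(item);
    }
    windows
}
```

Because the readings are scripted, the grouping is fully reproducible: five items arriving at 0, 40, 99, 100, and 250 ms with a 100 ms window fall into windows of three, one, and one items.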
Dead-letter routing
use ;
let dlq = new;
let dlq_handle = dlq.handle;
from_iter
.stage_id
.on_error
.try_map
.dead_letter
.sink
.run?;
Threaded driver
from_iter
.map
.sink
.run_threaded?;
Custom driver
use ;
;
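
The idea of an executor abstraction with timing instrumentation can be sketched in plain standard-library Rust. The `Driver` trait below is a hypothetical shape chosen for this illustration, as are `InlineDriver`, `SpawnDriver`, and `TimedDriver`; the crate's actual trait signature is documented in `docs/API.md` and may look quite different.

```rust
use std::cell::Cell;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical executor abstraction for this sketch: run a job to completion.
trait Driver {
    fn drive<T, F>(&self, job: F) -> T
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static;
}

/// Runs the job on the calling thread.
struct InlineDriver;

impl Driver for InlineDriver {
    fn drive<T, F>(&self, job: F) -> T
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        job()
    }
}

/// Runs the job on a worker thread; the caller blocks on `join`.
struct SpawnDriver;

impl Driver for SpawnDriver {
    fn drive<T, F>(&self, job: F) -> T
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        thread::spawn(job).join().expect("worker thread panicked")
    }
}

/// Wraps another driver and records how long the last job took.
struct TimedDriver<D> {
    inner: D,
    elapsed: Cell<Option<Duration>>,
}

impl<D: Driver> Driver for TimedDriver<D> {
    fn drive<T, F>(&self, job: F) -> T
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        let start = Instant::now();
        let out = self.inner.drive(job);
        self.elapsed.set(Some(start.elapsed()));
        out
    }
}
```

Wrapping rather than reimplementing keeps the instrumentation independent of where the work runs: a `TimedDriver { inner: SpawnDriver, .. }` times a threaded run, and swapping in `InlineDriver` times a synchronous one.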
More patterns are in docs/GUIDE.md and the examples/ directory.
Examples
Every example lives under examples/ and runs with cargo run --example <name>:
| Example | Demonstrates |
|---|---|
| `basic` | Minimal map / filter pipeline collecting into `VecSink`. |
| `batching` | `BatchPolicy::new().max_items(N)` count-triggered batching. |
| `windowing` | Tumbling window with a deterministic scripted clock. |
| `dead_letter` | `ErrorPolicy::DeadLetter` + `.dead_letter(sink)` routing. |
| `threaded` | `Pipeline::run_threaded()` on a spawned worker thread. |
| `custom_driver` | Implementing the `Driver` trait with timing instrumentation. |
| `custom_source` | Implementing `Source` for a stateful Fibonacci producer. |
| `etl` | Multi-stage CSV ETL with `Continue`, enrichment, batching. |
Status
1.0.0 - Stable. The public API is frozen. Backwards-incompatible changes after 1.0.0 require a major version bump per Semantic Versioning. See REPS.md section 8 for the binding policy and docs/API.md for the complete public-symbol reference.
Stability rules (from 1.0.0 forward):
- Patch (`1.0.x`) - bug fixes, doc improvements, internal performance work, test additions. No new public items.
- Minor (`1.x.0`) - pure additions to the public surface, new opt-in features, new variants on enums reserved for growth, MSRV bumps.
- Major (`2.0.0`) - removals, renames, or signature changes of public symbols, or non-opt-in runtime dependency additions.
The cargo-semver-checks CI job enforces this for the public API surface from 1.0.0 onward.
Documentation
- User guide (`docs/GUIDE.md`) - 11-section walkthrough of every public surface.
- API reference (`docs/API.md`) - offline mirror of the rustdoc on docs.rs.
- Project specification (`REPS.md`) - the binding public surface and stability policy.
- Benchmarks (`docs/BENCH.md`) - methodology and measured numbers.
- Migration guide (`docs/MIGRATION.md`) - per-version upgrade notes.
- Release notes (`docs/release/`) - one note per tagged release.
- Examples (`examples/`) - 8 runnable example files.
External:
- docs.rs - https://docs.rs/pipe-io
- crates.io - https://crates.io/crates/pipe-io
Version compatibility
| Crate version | MSRV | Status |
|---|---|---|
| `1.0.x` | 1.75 | Stable (this release) |
| `0.9.x` | 1.75 | Pre-1.0 stabilization |
| `0.x` earlier | 1.75 | Feature releases |
MSRV is locked at 1.75. A bump requires a minor version increment and a CHANGELOG entry under ### Changed with rationale, per REPS.md section 6.
Contributing
The public surface is locked at 1.0.0. Issues, bug reports, and pull requests for bug fixes, documentation improvements, and internal performance work are welcome. Additions to the public API need a minor version bump and a documented motivation in .dev/ planning material.
See .dev/DIRECTIVES.md (not published) for the project's development directives: code style, build matrix, banned-word policy, and commit conventions.
License
Licensed under the Apache License, Version 2.0. See LICENSE for the full text.
Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you shall be licensed as above, without any additional terms or conditions.