pipe-io 1.0.0

Typed source-transform-sink pipelines with backpressure, batching, windowing, and per-stage error isolation. A lightweight runtime-agnostic stream processor for in-process workloads. The missing middle ground between raw iterators and full distributed stream processing.

Features

Pipeline composition

  • Typed builder - the carrier type is tracked at compile time across every stage transition. Wrong types are a build error, not a runtime panic.
  • Closure adapters - map, filter, filter_map, flat_map, inspect, try_map all live on the builder. No .next() ceremony.
  • Custom stages - implement Stage directly when state spreads across multiple fields or you need 1:N emission.
  • Custom sources and sinks - the Source and Sink traits are two methods each (with flush / close defaults).
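The typed-builder idea in the first bullet can be sketched without pipe-io. In this sketch each adapter returns a builder over a new iterator type, so the compiler knows the carrier type after every stage. `Builder`, `from_source`, and `collect_vec` are hypothetical names chosen for illustration. This is not pipe-io's `Pipeline`, and the real builder may track types differently.

```rust
use std::iter::{Filter, Map};

/// Hypothetical typed builder (not pipe-io's `Pipeline`). The generic
/// parameter `I` records the iterator, and so the carrier type, after each stage.
struct Builder<I> {
    inner: I,
}

impl<I: Iterator> Builder<I> {
    /// Wraps any iterable as the pipeline source.
    fn from_source<T>(src: T) -> Self
    where
        T: IntoIterator<IntoIter = I>,
    {
        Builder { inner: src.into_iter() }
    }

    /// Returns a builder over a new type: the carrier becomes `U`.
    fn map<U, F>(self, f: F) -> Builder<Map<I, F>>
    where
        F: FnMut(I::Item) -> U,
    {
        Builder { inner: self.inner.map(f) }
    }

    /// Keeps the carrier type; only items passing `p` continue.
    fn filter<P>(self, p: P) -> Builder<Filter<I, P>>
    where
        P: FnMut(&I::Item) -> bool,
    {
        Builder { inner: self.inner.filter(p) }
    }

    /// Terminal step: drains the chain into a Vec of the final carrier type.
    fn collect_vec(self) -> Vec<I::Item> {
        self.inner.collect()
    }
}

/// Mirrors the quick-start chain: i32 in, i64 out.
fn typed_demo() -> Vec<i64> {
    Builder::from_source(1..=5)
        .map(|n: i32| i64::from(n) * 10)
        .filter(|n: &i64| *n > 20)
        .collect_vec()
}
```

Appending `.filter(|s: &String| s.is_empty())` after the `map` in `typed_demo` fails to compile, because the carrier at that point is `i64`. The mismatch is caught at build time rather than at run time.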

Backpressure, batching, windowing

  • Bounded buffers - every inter-stage edge is bounded; a full downstream buffer slows the upstream stage.
  • Count / byte / age batching - BatchPolicy::new().max_items(N).max_bytes(M).max_age(D). Triggers are OR-ed: a batch is emitted as soon as any configured limit fires.
  • Tumbling / sliding / session windows - driven by a pluggable Clock trait. Custom clocks support deterministic tests and embedded time sources.
  • ByteSize trait - opt-in for byte-aware batching; built-in impls for &str, String, Vec<u8>, &[u8].
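The bounded-buffer behaviour can be illustrated with the standard library. This sketch uses `std::sync::mpsc::sync_channel` as a stand-in for an inter-stage edge. It assumes nothing about how pipe-io implements its buffers. With no consumer, a capacity-`cap` edge accepts exactly `cap` items before reporting "full". With a consumer on another thread, a blocking `send` simply waits for space.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Fills a bounded edge of capacity `cap` with no consumer and reports how
/// many items were accepted before the edge reported "full".
fn accepted_before_full(cap: usize) -> usize {
    // `_rx` keeps the receiver alive, so sends see Full, not Disconnected.
    let (tx, _rx) = sync_channel::<u32>(cap);
    let mut accepted = 0;
    for i in 0.. {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is alive"),
        }
    }
    accepted
}

/// Two-stage pipeline over a bounded edge. `send` blocks while the buffer
/// holds `cap` items, so the producer is throttled by the consumer.
fn run_bounded(cap: usize, n: u32) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(cap);
    let producer = thread::spawn(move || {
        for i in 0..n {
            tx.send(i * 2).expect("consumer alive");
        }
        // When this closure returns, `tx` is dropped, which ends `rx.iter()`.
    });
    let out: Vec<u32> = rx.iter().collect();
    producer.join().expect("producer panicked");
    out
}
```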

Error isolation

  • ErrorPolicy::FailFast (default) - first error in a stage bubbles out of run.
  • ErrorPolicy::Continue - drop the failing item, keep going. Useful for noisy enrichment stages.
  • ErrorPolicy::DeadLetter - route the failure to a dead-letter Sink<Item = StageFailure> installed via .dead_letter(sink). The dead-letter sink and the main sink can be installed in either order.
  • StageId - every error carries the &'static str name of the stage that produced it.
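The three policies differ only in what happens to a failed item. The model below makes that concrete. `Policy`, `Failure`, and `apply` are hypothetical names, and the loop is an illustration of the described semantics, not pipe-io's code. FailFast returns the first failure, Continue drops it, and DeadLetter collects it, tagged with the stage name.

```rust
/// Hypothetical model of the three error policies; not pipe-io's types.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Policy {
    FailFast,
    Continue,
    DeadLetter,
}

/// A failed item tagged with the name of the stage that rejected it.
#[derive(Debug, PartialEq)]
struct Failure {
    stage: &'static str,
    message: String,
}

/// Applies a fallible stage to every input under `policy`. Returns the
/// successful outputs plus any dead-lettered failures, or the first
/// failure when the policy is FailFast.
fn apply<T, U>(
    stage: &'static str,
    policy: Policy,
    input: Vec<T>,
    f: impl Fn(T) -> Result<U, String>,
) -> Result<(Vec<U>, Vec<Failure>), Failure> {
    let mut ok = Vec::new();
    let mut dead = Vec::new();
    for item in input {
        match f(item) {
            Ok(v) => ok.push(v),
            Err(message) => {
                let failure = Failure { stage, message };
                match policy {
                    Policy::FailFast => return Err(failure),
                    Policy::Continue => {} // drop the item and keep going
                    Policy::DeadLetter => dead.push(failure),
                }
            }
        }
    }
    Ok((ok, dead))
}
```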

Runtime and execution

  • SyncDriver - drives the pipeline on the calling thread; no_std-compatible.
  • ThreadedDriver - spawns a worker thread; the calling thread blocks on join.
  • Driver trait - open for external executors. Implement for tokio, rayon, glommio, or custom thread farms.
  • Pipeline::run, run_threaded, run_with(driver) - three terminal forms.
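A driver decides where the work runs, not what it computes. The sketch below shows that split with a hypothetical `Driver` trait over a closure. The README's own trait (see the Custom driver example) takes a `Pipeline<S>` instead, so `Inline`, `Spawned`, and this trait signature are illustrative assumptions. `Inline` mirrors the SyncDriver description and runs on the calling thread. `Spawned` mirrors the ThreadedDriver description: it spawns a worker and blocks on `join`.

```rust
use std::thread;

/// Hypothetical driver abstraction; not pipe-io's `Driver` trait.
trait Driver {
    fn run<F, R>(self, job: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static;
}

/// Runs the job inline on the calling thread.
struct Inline;

impl Driver for Inline {
    fn run<F, R>(self, job: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        job()
    }
}

/// Spawns one worker thread; the caller blocks on `join`.
struct Spawned;

impl Driver for Spawned {
    fn run<F, R>(self, job: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        thread::spawn(job).join().expect("worker panicked")
    }
}

/// A stand-in pipeline job: the sum of squares of 1..=n.
fn job(n: u64) -> impl FnOnce() -> u64 + Send + 'static {
    move || (1..=n).map(|x| x * x).sum::<u64>()
}
```

The `Send + 'static` bounds are what allow the same job to cross to a worker thread. That mirrors the bounds in the custom driver example.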

Reliability and safety

  • #![forbid(unsafe_code)] at the crate root.
  • Zero runtime dependencies. Built on core + alloc (always) plus std (default feature). proptest is a dev-only dependency.
  • 84 tests pass under --all-features (32 unit + 42 integration + 10 doctest), plus 11 property tests and 4 fuzz harnesses.
  • cargo-semver-checks runs in CI from 0.9.0 onward and is a hard gate from 1.0.0.
  • MSRV 1.75, locked. Bumps require a minor version increment.

Benchmark numbers under --release on a developer laptop: source -> null sink ~500 M items/s; source -> map -> sink ~260 M items/s; full three-stage chain ~170 M items/s. See docs/BENCH.md.


Installation

Add to Cargo.toml:

[dependencies]
pipe-io = "1"

For no_std builds (data primitives and SyncDriver only):

[dependencies]
pipe-io = { version = "1", default-features = false }

Feature flags

Flag | Default | Effect
---- | ------- | ------
std  | yes     | Enables ThreadedDriver, Pipeline::run_threaded, ChannelSource, ReaderSource, ChannelSink, WriterSink, VecSink, Window / Clock / SystemClock, BatchPolicy::max_age, and dead-letter routing.

The no_std build still ships the full trait surface, closure adapters, count + byte batching, the synchronous driver, and the error model. The std-only items above are the ones that need std::sync::Mutex or std::thread::spawn.


Quick start

use pipe_io::{Pipeline, sink::VecSink};

let sink = VecSink::<i64>::new();
let handle = sink.handle();

Pipeline::from_iter(1..=5)
    .map(|n: i32| i64::from(n) * 10)
    .filter(|n: &i64| *n > 20)
    .sink(sink)
    .run()
    .expect("pipeline run");

assert_eq!(handle.take(), vec![30, 40, 50]);
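For comparison, here are the same two stages written as a plain standard-library iterator chain. It shows exactly what the quick-start pipeline computes. This version has no bounded buffers, error policies, or drivers; those are what the crate adds around this shape, per the feature list.

```rust
/// The quick-start computation written with plain std iterators.
fn quick_start_plain() -> Vec<i64> {
    (1..=5)
        .map(|n: i32| i64::from(n) * 10) // 10, 20, 30, 40, 50
        .filter(|n: &i64| *n > 20) // keep values above 20
        .collect()
}
```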

Batching

use pipe_io::{Pipeline, Batch, BatchPolicy, sink::VecSink};

let sink = VecSink::<Vec<u32>>::new();
let handle = sink.handle();

Pipeline::from_iter(1u32..=11)
    .batch(BatchPolicy::new().max_items(4))
    .map(|b: Batch<u32>| b.into_inner())
    .sink(sink)
    .run()
    .unwrap();

assert_eq!(handle.take().len(), 3);
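Eleven items with max_items(4) give batches of 4, 4, and 3, so three batches assuming the trailing partial batch is flushed at end of input. The sketch below shows that grouping, plus how a count trigger and a byte trigger OR together. `ByteLen` and `batch` are hypothetical stand-ins for the README's ByteSize trait and BatchPolicy. The age trigger is omitted. Closing a batch when a limit is *reached* (rather than exceeded) is this sketch's choice; pipe-io's exact boundary rule is not stated here.

```rust
/// Hypothetical byte-size measure, standing in for pipe-io's ByteSize.
trait ByteLen {
    fn byte_len(&self) -> usize;
}

impl ByteLen for &str {
    fn byte_len(&self) -> usize {
        self.len()
    }
}

/// Groups items into batches. A batch closes as soon as EITHER limit is
/// reached (triggers are OR-ed); a trailing partial batch is flushed at the end.
fn batch<T: ByteLen>(items: Vec<T>, max_items: usize, max_bytes: usize) -> Vec<Vec<T>> {
    let mut out = Vec::new();
    let mut current = Vec::new();
    let mut bytes = 0;
    for item in items {
        bytes += item.byte_len();
        current.push(item);
        if current.len() >= max_items || bytes >= max_bytes {
            out.push(std::mem::take(&mut current));
            bytes = 0;
        }
    }
    if !current.is_empty() {
        out.push(current); // flush the partial batch at end of input
    }
    out
}
```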

Windowing

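use core::time::Duration;
use pipe_io::{Pipeline, Window, WindowPolicy, sink::VecSink};

// metric_stream is a placeholder for any iterator of u64 samples.
let sink = VecSink::<u64>::new();
let handle = sink.handle(); // read the per-window sums after run()

Pipeline::from_iter(metric_stream)
    .window(WindowPolicy::Tumbling { size: Duration::from_secs(60) })
    .map(|w: Window<u64>| w.into_inner().iter().sum::<u64>()) // one sum per window
    .sink(sink)
    .run()?;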
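A tumbling window assigns each item to window index floor(t / size), where t is the clock reading. This sketch pairs that rule with a scripted clock of the kind the windowing example uses for deterministic tests. `Clock`, `Scripted`, and `tumbling_sums` are hypothetical; pipe-io's Clock trait is not shown in this README. The sketch assumes non-decreasing timestamps and does not emit empty windows.

```rust
use std::time::Duration;

/// Hypothetical clock abstraction; not pipe-io's `Clock` trait.
trait Clock {
    fn now(&mut self) -> Duration;
}

/// Deterministic clock for tests: replays a fixed list of timestamps.
struct Scripted {
    ticks: Vec<Duration>,
    next: usize,
}

impl Clock for Scripted {
    fn now(&mut self) -> Duration {
        let t = self.ticks[self.next];
        self.next += 1;
        t
    }
}

/// Stamps each item with the clock and sums items per tumbling window.
/// A window closes when an item from a later window arrives. `size` must be non-zero.
fn tumbling_sums(items: &[u64], clock: &mut impl Clock, size: Duration) -> Vec<u64> {
    let mut sums = Vec::new();
    let mut open: Option<u128> = None; // index of the currently open window
    let mut sum = 0u64;
    for &v in items {
        let idx = clock.now().as_nanos() / size.as_nanos();
        if open.is_some() && open != Some(idx) {
            sums.push(sum); // a later window started: close the open one
            sum = 0;
        }
        open = Some(idx);
        sum += v;
    }
    if open.is_some() {
        sums.push(sum); // flush the last window at end of stream
    }
    sums
}
```

With 60-second windows and readings at 0 s, 10 s, 59 s, 60 s, and 125 s, the first three items share window 0. The fourth opens window 1, and the fifth lands in window 2.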

Dead-letter routing

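use pipe_io::{ErrorPolicy, Pipeline, StageFailure, sink::VecSink};

// records, parse_row, and main_sink are placeholders for the input
// iterator, a fallible parse function, and the main sink.
let dlq = VecSink::<StageFailure>::new();
let dlq_handle = dlq.handle();

Pipeline::from_iter(records)
    .stage_id("parse")                 // stage name carried by its failures
    .on_error(ErrorPolicy::DeadLetter) // route failures instead of aborting
    .try_map(parse_row)
    .dead_letter(dlq)
    .sink(main_sink)
    .run()?;

// Rows rejected by parse_row are now available through dlq_handle.take().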

Threaded driver

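use pipe_io::Pipeline;

// Runs on a spawned worker thread; records, transform, writer are placeholders.
Pipeline::from_iter(records)
    .map(transform)
    .sink(writer)
    .run_threaded()?;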

Custom driver

use pipe_io::driver::{Driver, RunStats, SyncDriver};

struct MyDriver;

impl Driver for MyDriver {
    fn run<S>(self, pipeline: pipe_io::Pipeline<S>) -> pipe_io::Result<RunStats>
    where
        S: pipe_io::Source + Send + 'static,
        S::Item: Send + 'static,
        S::Error: Send + 'static,
    {
        SyncDriver::new().run(pipeline)
    }
}

More patterns are in docs/GUIDE.md and the examples/ directory.


Examples

Every example lives under examples/ and runs with cargo run --example <name>:

Example | Demonstrates
------- | ------------
basic | Minimal map / filter pipeline collecting into VecSink.
batching | BatchPolicy::new().max_items(N) count-triggered batching.
windowing | Tumbling window with a deterministic scripted clock.
dead_letter | ErrorPolicy::DeadLetter + .dead_letter(sink) routing.
threaded | Pipeline::run_threaded() on a spawned worker thread.
custom_driver | Implementing the Driver trait with timing instrumentation.
custom_source | Implementing Source for a stateful Fibonacci producer.
etl | Multi-stage CSV ETL with ErrorPolicy::Continue, enrichment, and batching.
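The custom_source example is described as a stateful Fibonacci producer. The same state machine can be written as a plain `Iterator`, shown below. The quick start passes a range to Pipeline::from_iter, so an iterator like this may be usable as a source without implementing Source. That is an assumption, not something this README documents. `Fibonacci` here is a hypothetical stand-in, not the code in examples/.

```rust
/// A stateful Fibonacci producer written as a plain `Iterator`.
/// Hypothetical stand-in; pipe-io's `Source` methods are not shown here.
struct Fibonacci {
    curr: u64,
    next: u64,
}

impl Fibonacci {
    fn new() -> Self {
        Fibonacci { curr: 0, next: 1 }
    }
}

impl Iterator for Fibonacci {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        let out = self.curr;
        // Returns None (ends the stream) once advancing the state would overflow u64.
        let after = self.curr.checked_add(self.next)?;
        self.curr = self.next;
        self.next = after;
        Some(out)
    }
}
```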

Status

1.0.0 - Stable. The public API is frozen. Backwards-incompatible changes after 1.0.0 require a major version bump per Semantic Versioning. See REPS.md section 8 for the binding policy and docs/API.md for the complete public-symbol reference.

Stability rules (from 1.0.0 forward):

  • Patch (1.0.x) - bug fixes, doc improvements, internal performance work, test additions. No new public items.
  • Minor (1.x.0) - pure additions to the public surface, new opt-in features, new variants on enums reserved for growth, MSRV bumps.
  • Major (2.0.0) - removals, renames, or signature changes of public symbols, or new runtime dependencies that are not opt-in.

The cargo-semver-checks CI job enforces this for the public API surface from 1.0.0 onward.



Version compatibility

Crate version | MSRV | Status
------------- | ---- | ------
1.0.x | 1.75 | Stable (this release)
0.9.x | 1.75 | Pre-1.0 stabilization
earlier 0.x | 1.75 | Feature releases

MSRV is locked at 1.75. A bump requires a minor version increment and a CHANGELOG entry under ### Changed with rationale, per REPS.md section 6.


Contributing

The public surface is locked at 1.0.0. Issues, bug reports, and pull requests for bug fixes, documentation improvements, and internal performance work are welcome. Additions to the public API need a minor version bump and a documented motivation in .dev/ planning material.

See .dev/DIRECTIVES.md (not published) for the project's development directives: code style, build matrix, banned-word policy, and commit conventions.


License

Licensed under the Apache License, Version 2.0. See LICENSE for the full text.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you shall be licensed as above, without any additional terms or conditions.