Pipe-it
Pipe-it is a lightweight, type-safe library for building linear and concurrent processing pipelines in Rust. It draws inspiration from the API style of the Bevy framework, using Res and ResMut to access shared resources, but with a simpler implementation.
Key Features
- Linear Composition: Chain async functions seamlessly to build processing workflows.
- Dependency Injection: Access shared resources (
Res,ResMut) or inputs (Input) directly in your function signatures. - Type Safety: Compile-time checks ensure pipeline stages connect correctly.
- State Management: Built-in
Sharedcontainer for managing global or scoped application state.
Getting Started
Cargo dependency
pipe-it = "0.2"
1. Basic Linear Pipeline
You can chain handlers (functions) together using .pipe() and .connect().
use ;
// Step 1: Takes an integer, returns an integer
async
// Step 2: Takes an integer, returns a string
async
async
Map Context:use invoke to context pass to pipline and generate new context.
use crate::;
// test context invoke api
async
2. Using Shared Resources & Mutation
You can inject shared state into any handler using Res (read-only) and ResMut (mutable).
use ;
// This handler updates the shared log AND processes the input
async
async
3. Combinators
Pipe-it provides powerful combinators to handle control flow and concurrency.
Map (Collection Processing)
Process Vec<I> inputs by applying a pipeline to each element. Supports concurrency options.
use ;
// Define an item processor
async
async
Alt (Branching & Fallback)
Use alt((A, B, ...)) to compose multiple pipelines that return PResult. The framework attempts execution sequentially until one succeeds (returns Ok).
use ;
async
async
async
Sink (Aggregation)
Use sink((A, B, ...)) to run multiple handlers on the same input and collect their results into a tuple. Supports .concurrent().
use ;
async
async
async
Cond (Conditional)
Use cond(predicate, pipeline) to execute a pipeline only if the value satisfies a condition.
use ;
async
async
async
4. Using Tags for Composition
You can decouple handler definition from usage by assigning string tags to functions using the #[node] macro, and referencing them with the tag!() macro.
use ;
// 1. Define a handler with a unique tag
async
async
5. Reusable Pipelines (Middle Pipeline)
You can define functions that return a constructed Pipeline. This is useful for encapsulating complex logic or creating reusable middleware.
use ;
async
async
// Encapsulate a chain of handlers into a single function
async
6. Integrations (Tower & Cache)
Pipe-it provides optional features to integrate with the ecosystem.
Tower Integration
Enable the tower feature in Cargo.toml. You can convert any Pipeline into a tower::Service.
use PipeService;
// Convert pipeline to a Tower Service
let service = new;
// You can also inject shared resources to be reused across requests
let service = with_shared;
Caching Results
Enable the cache feature (enabled by default). Use .cache(capacity) to cache the results of pure pipelines based on input hash.
// Cache the result of heavy_computation for the last 100 inputs
let cached_pipe = heavy_computation.pipe.cache;
Best Practices
Safety Reminder
Using try_unwrap can be dangerous in certain environments, such as tuple branches. Because each branch needs to attempt to access the data, if one branch takes ownership (occupies the data), subsequent branches will fail to access it.
Performance Benchmarks
We compared the performance of a 5-step pipeline processing a Vec<i32> of 10,000 items.
| Implementation | Time (per iter) | Overhead vs Raw | Note |
|---|---|---|---|
| Raw Rust (Move) | 1.02 µs | - | Baseline (Zero Copy) |
| Pipe-it (try_unwrap) | 1.09 µs | +7% | Recommended |
| Pipe-it (Clone) | 5.22 µs | +411% | Default usage |
Conclusion: By using
try_unwrap()to acquire ownership, the framework overhead is negligible (~0.07 µs), achieving near-native performance (5x faster than cloning).
Recommendation: Acquire ownership via try_unwrap in safe (linear) workflows to avoid cloning costs.
API Overview
Core Concepts
Handler: Any async function that takes arguments implementingFromContext.Context<I>: Holds the current inputIandSharedresources.Input<I>: A wrapper to extract the current input from the context.Res<T>/ResMut<T>: Wrappers to extract shared resources from the context.
Builders & Combinators
sink((...)): Aggregates multiple handlers. Use.concurrent()for parallel execution.alt((...)): Tries multiple handlers in sequence until one succeeds.map(pipeline): Maps a pipeline over a collection. Use.concurrent()or.sequential().cond(pred, next): Conditional execution.
Extension Methods (HandlerExt)
.pipe(): Converts a handler into aPipeline..connect(next_handler): Chains two pipelines linearly.