# Pipe-it
[](https://github.com/Jw-23/pipe-it/actions/workflows/rust.yml)
[](https://crates.io/crates/pipe-it)
[中文文档](./README_zh.md)
**Pipe-it** is a lightweight, type-safe library for building linear and concurrent processing pipelines in Rust. It draws inspiration from the API style of the Bevy framework, using Res and ResMut to access shared resources, but with a simpler implementation.
## Key Features
- **Linear Composition**: Chain async functions seamlessly to build processing workflows.
- **Dependency Injection**: Access shared resources (`Res`, `ResMut`) or inputs (`Input`) directly in your function signatures.
- **Type Safety**: Compile-time checks ensure pipeline stages connect correctly.
- **State Management**: Built-in `Shared` container for managing global or scoped application state.
## Getting Started
Cargo dependency
```cargo
pipe-it = "0.2"
```
### 1. Basic Linear Pipeline
You can chain handlers (functions) together using `.pipe()` and `.connect()`.
```rust
use pipe_it::{Context, Input, ext::HandlerExt};
// Step 1: Takes an integer, returns an integer
async fn double(num: Input<i32>) -> i32 {
*num * 2
}
// Step 2: Takes an integer, returns a string
async fn to_string(num: Input<i32>) -> String {
format!("Value is: {}", *num)
}
#[tokio::main]
async fn main() {
// Define the pipeline: double -> to_string
// The output of 'double' (i32) becomes the input context for 'to_string'
let pipeline = double.pipe().connect(to_string);
// execute
let ctx = Context::empty(10); // Initial input: 10
let result = pipeline.apply(ctx).await;
println!("{}", result); // Output: "Value is: 20"
}
```
**Map Context**:use `invoke` to context pass to pipline and generate new context.
```rust
use crate::{Context, Input, Res, ResMut, Shared};
#[derive(Debug, Clone)]
struct Counter {
c: i32,
}
#[tokio::test]
// test context invoke api
async fn test_chain_invoke() {
let ctx = Context::new(3, Shared::new().insert(Counter { c: 1 }));
ctx.invoke(async |x: Input<i32>, mut counter: ResMut<Counter>| {
counter.c += 1;
*x + 1
})
.await
.invoke(async |x: Input<i32>, counter: Res<Counter>| *x + counter.c)
.await
.invoke(async |x:Input<i32>|assert_eq!(*x,6))
.await;
}
```
### 2. Using Shared Resources & Mutation
You can inject shared state into any handler using `Res` (read-only) and `ResMut` (mutable).
```rust
use pipe_it::{Context, Shared, Input, ResMut, ext::HandlerExt};
#[derive(Default)]
struct AccessLog {
count: usize,
}
// This handler updates the shared log AND processes the input
async fn log_and_square(input: Input<i32>, mut log: ResMut<AccessLog>) -> i32 {
log.count += 1;
println!("Log count: {}", log.count);
*input * *input
}
#[tokio::main]
async fn main() {
// 1. Setup shared state
let shared = Shared::new()
.insert(AccessLog::default());
// 2. Create context with input and shared state
let ctx = Context::new(5, shared);
// 3. Run the pipeline
let result = log_and_square.pipe().apply(ctx).await;
assert_eq!(result, 25);
}
```
### 3. Combinators
Pipe-it provides powerful combinators to handle control flow and concurrency.
#### Map (Collection Processing)
Process `Vec<I>` inputs by applying a pipeline to each element. Supports concurrency options.
```rust
use pipe_it::{Context, concurrency::map, ext::HandlerExt};
// Define an item processor
async fn process_item(n: Input<i32>) -> String { n.to_string() }
#[tokio::main]
async fn main() {
let inputs = vec![1, 2, 3];
// Create a map pipeline (concurrent by default)
let pipe = map(process_item.pipe());
// Or explicit: map(process_item.pipe()).sequential()
let result = pipe.apply(Context::empty(inputs)).await;
assert_eq!(result, vec!["1", "2", "3"]);
}
```
#### Alt (Branching & Fallback)
Use `alt((A, B, ...))` to compose multiple pipelines that return `PResult`. The framework attempts execution sequentially until one succeeds (returns `Ok`).
```rust
use pipe_it::{Context, Input, PResult, PipelineError, alt, ext::HandlerExt};
async fn parse_int(s: Input<String>) -> PResult<i32> {
s.parse().map_err(|_| PipelineError::Failure { msg: "Not an int".into(), expected: "int".into() })
}
async fn default_zero(_: Input<String>) -> PResult<i32> { Ok(0) }
#[tokio::main]
async fn main() {
// Try parse_int first, if it fails, run default_zero
let pipeline = alt((parse_int.pipe(), default_zero.pipe()));
let ctx = Context::empty("not number".to_string());
assert_eq!(pipeline.apply(ctx).await.unwrap(), 0);
}
```
#### Sink (Aggregation)
Use `sink((A, B, ...))` to run multiple handlers on the same input and collect their results into a tuple. Supports `.concurrent()`.
```rust
use pipe_it::{Context, Input, sink::sink, ext::HandlerExt};
async fn task1(input: Input<i32>) -> i32 { *input + 1 }
async fn task2(input: Input<i32>) -> String { format!("val: {}", *input) }
#[tokio::main]
async fn main() {
// Run task1 and task2 concurrently
let pipeline = sink((task1, task2)).concurrent();
let result = pipeline.call(Context::empty(10)).await;
assert_eq!(result, (11, "val: 10".to_string()));
}
```
#### Cond (Conditional)
Use `cond(predicate, pipeline)` to execute a pipeline only if the value satisfies a condition.
```rust
use pipe_it::{Context, Input, cond, ext::HandlerExt};
async fn is_positive(n: Input<i32>) -> bool { *n > 0 }
async fn double(n: Input<i32>) -> i32 { *n * 2 }
#[tokio::main]
async fn main() {
let pipe = cond(is_positive, double);
let res = pipe.apply(Context::empty(10)).await;
assert_eq!(res.unwrap(), 20);
let res_fail = pipe.apply(Context::empty(-5)).await;
assert!(res_fail.is_err());
}
```
### 4. Using Tags for Composition
You can decouple handler definition from usage by assigning string tags to functions using the `#[node]` macro, and referencing them with the `tag!()` macro.
```rust
use pipe_it::{Context, Input, ext::HandlerExt, node, tag};
// 1. Define a handler with a unique tag
#[node("greet")]
async fn greet(name: Input<String>) -> String {
format!("Hello, {}!", *name)
}
#[tokio::main]
async fn main() {
// 2. Create a pipeline using the tag instead of the function name
let pipeline = tag!("greet").pipe();
let ctx = Context::empty("World".to_string());
let result = pipeline.apply(ctx).await;
println!("{}", result); // Output: "Hello, World!"
}
```
### 5. Reusable Pipelines (Middle Pipeline)
You can define functions that return a constructed `Pipeline`. This is useful for encapsulating complex logic or creating reusable middleware.
```rust
use pipe_it::{Context, Input, Pipeline, ext::HandlerExt};
async fn step_a(n: Input<i32>) -> i32 { *n + 1 }
async fn step_b(n: Input<i32>) -> String { n.to_string() }
// Encapsulate a chain of handlers into a single function
fn common_logic() -> impl Pipeline<i32, String> {
step_a.pipe().connect(step_b)
}
#[tokio::main]
async fn main() {
let pipe = common_logic();
let result = pipe.apply(Context::empty(10)).await;
println!("{}", result); // "11"
}
```
### 6. Integrations (Tower & Cache)
**Pipe-it** provides optional features to integrate with the ecosystem.
#### Tower Integration
Enable the `tower` feature in `Cargo.toml`. You can convert any `Pipeline` into a `tower::Service`.
```rust
use pipe_it::service::PipeService;
// Convert pipeline to a Tower Service
let service = PipeService::new(my_pipeline);
// You can also inject shared resources to be reused across requests
let service = PipeService::with_shared(my_pipeline, shared_resources);
```
#### Caching Results
Enable the `cache` feature (enabled by default). Use `.cache(capacity)` to cache the results of pure pipelines based on input hash.
```rust
// Cache the result of heavy_computation for the last 100 inputs
let cached_pipe = heavy_computation.pipe().cache(100);
```
## Best Practices
### Safety Reminder
Using `try_unwrap` can be dangerous in certain environments, such as **tuple branches**. Because each branch needs to attempt to access the data, if one branch takes ownership (occupies the data), subsequent branches will fail to access it.
### Performance Benchmarks
We compared the performance of a 5-step pipeline processing a `Vec<i32>` of 10,000 items.
| **Raw Rust (Move)** | 1.02 µs | - | Baseline (Zero Copy) |
| **Pipe-it (try_unwrap)** | **1.09 µs** | **+7%** | **Recommended** |
| Pipe-it (Clone) | 5.22 µs | +411% | Default usage |
> **Conclusion**: By using `try_unwrap()` to acquire ownership, the framework overhead is negligible (~0.07 µs), achieving near-native performance (5x faster than cloning).
**Recommendation**: Acquire ownership via `try_unwrap` in **safe** (linear) workflows to avoid cloning costs.
## API Overview
### Core Concepts
- **`Handler`**: Any async function that takes arguments implementing `FromContext`.
- **`Context<I>`**: Holds the current input `I` and `Shared` resources.
- **`Input<I>`**: A wrapper to extract the current input from the context.
- **`Res<T>` / `ResMut<T>`**: Wrappers to extract shared resources from the context.
### Builders & Combinators
- `sink((...))`: Aggregates multiple handlers. Use `.concurrent()` for parallel execution.
- `alt((...))`: Tries multiple handlers in sequence until one succeeds.
- `map(pipeline)`: Maps a pipeline over a collection. Use `.concurrent()` or `.sequential()`.
- `cond(pred, next)`: Conditional execution.
### Extension Methods (`HandlerExt`)
- `.pipe()`: Converts a handler into a `Pipeline`.
- `.connect(next_handler)`: Chains two pipelines linearly.