pipe-it 0.1.0

A lightweight, type-safe library for building linear and concurrent processing pipelines in Rust.
Documentation

Pipeline Core

中文文档

Pipeline Core is a lightweight, type-safe library for building linear and concurrent processing pipelines in Rust. It utilizes dependency injection pattern similar to Bevy engines SystemParam to manage state and inputs cleanly.

Key Features

  • Linear Composition: Chain async functions seamlessly to build processing workflows.
  • Dependency Injection: Access shared resources (Res, ResMut) or inputs (Input) directly in your function signatures.
  • Type Safety: Compile-time checks ensure pipeline stages connect correctly.
  • State Management: Built-in Shared container for managing global or scoped application state.

Getting Started

1. Basic Linear Pipeline

You can chain handlers (functions) together using .pipe() and .connect().

use pipeline_core::{Context, Input, ext::HandlerExt};

// Step 1: Takes an integer, returns an integer
async fn double(num: Input<i32>) -> i32 {
    *num * 2
}

// Step 2: Takes an integer, returns a string
async fn to_string(num: Input<i32>) -> String {
    format!("Value is: {}", *num)
}

#[tokio::main]
async fn main() {
    // Define the pipeline: double -> to_string
    // The output of 'double' (i32) becomes the input context for 'to_string'
    let pipeline = double.pipe().connect(to_string);

    // execute
    let ctx = Context::empty(10); // Initial input: 10
    let result = pipeline.apply(ctx).await;

    println!("{}", result); // Output: "Value is: 20"
}

2. Using Shared Resources & Mutation

You can inject shared state into any handler using Res (read-only) and ResMut (mutable).

use pipeline_core::{Context, Shared, Input, ResMut, ext::HandlerExt};

#[derive(Default)]
struct AccessLog {
    count: usize,
}

// This handler updates the shared log AND processes the input
async fn log_and_square(input: Input<i32>, mut log: ResMut<AccessLog>) -> i32 {
    log.count += 1;
    println!("Log count: {}", log.count);
    *input * *input
}

#[tokio::main]
async fn main() {
    // 1. Setup shared state
    let shared = Shared::new()
        .insert(AccessLog::default());

    // 2. Create context with input and shared state
    let ctx = Context::new(5, shared);

    // 3. Run the pipeline
    let result = log_and_square.pipe().apply(ctx).await;

    assert_eq!(result, 25);
}

API Overview

Core Concepts

  • Handler: Any async function that takes arguments implementing FromContext.
  • Context<I>: Holds the current input I and Shared resources.
  • Input<I>: A wrapper to extract the current input from the context.
  • Res<T> / ResMut<T>: Wrappers to extract shared resources from the context.

Extension Methods (HandlerExt)

  • .pipe(): Converts a handler into a Pipeline.
  • .connect(next_handler): Chains two pipelines linearly.
  • .map(inner_pipeline): (If available) Processes Vec<I> inputs concurrently.