Module pipeline


This module defines a flexible pipeline API for defining a sequence of operations that may or may not use AI components (e.g., semantic search, LLM prompting).

The pipeline API was inspired by general orchestration pipelines such as Airflow, Dagster and Prefect, but is implemented with idiomatic Rust patterns and provides some AI-specific ops out of the box alongside general combinators.

Pipelines are made up of one or more operations, or “ops”, each of which must implement the Op trait. The Op trait requires the implementation of only one method: call, which takes an input and returns an output. The trait provides a wide range of combinators for chaining operations together.
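To illustrate the "one required method, many provided combinators" design, here is a minimal, dependency-free sketch. It is a simplified, synchronous model, not rig's actual trait: rig's `call` is async (the examples on this page `.await` it) and the real trait offers many more combinators. The names `MapOp` and `AddPair` are hypothetical.

```rust
// Hypothetical, synchronous model of an `Op` trait. Not rig's real trait,
// whose `call` is async.
trait Op {
    type Input;
    type Output;

    // The only method an implementor has to write.
    fn call(&self, input: Self::Input) -> Self::Output;

    // Provided combinator: run `self`, then apply `f` to its output.
    fn map<F, T>(self, f: F) -> MapOp<Self, F>
    where
        Self: Sized,
        F: Fn(Self::Output) -> T,
    {
        MapOp { prev: self, f }
    }
}

// Op produced by the `map` combinator (hypothetical name).
struct MapOp<P, F> {
    prev: P,
    f: F,
}

impl<P, F, T> Op for MapOp<P, F>
where
    P: Op,
    F: Fn(P::Output) -> T,
{
    type Input = P::Input;
    type Output = T;

    fn call(&self, input: Self::Input) -> Self::Output {
        // Run the previous op, then transform its output.
        (self.f)(self.prev.call(input))
    }
}

// A user-defined op: only `call` is implemented by hand.
struct AddPair;

impl Op for AddPair {
    type Input = (i32, i32);
    type Output = i32;

    fn call(&self, input: Self::Input) -> Self::Output {
        let (x, y) = input;
        x + y
    }
}

fn main() {
    // `map` comes for free from the trait's provided method.
    let op = AddPair.map(|z| format!("Result: {z}!"));
    assert_eq!(op.call((1, 2)), "Result: 3!");
    println!("{}", op.call((1, 2)));
}
```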

One can think of a pipeline as a DAG (Directed Acyclic Graph) where each node is an operation and the edges represent the data flow between operations. When the pipeline is invoked on some input, the input is passed to the root node of the DAG (i.e., the first op defined in the pipeline). The output of each op is then passed to the next op in the pipeline until the output reaches the leaf node (i.e., the last op defined in the pipeline). The output of the leaf node is then returned as the result of the pipeline.
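The root-to-leaf data flow described above can be modeled with a generic two-stage op that feeds the first op's output into the second. This is a hypothetical synchronous sketch: `chain` and `map` here are simplified re-creations named after the rig combinators mentioned on this page, `Sequential` is an illustrative name, and rig's own ops are async.

```rust
use std::marker::PhantomData;

// Hypothetical, synchronous model of an op (rig's real `call` is async).
trait Op {
    type Input;
    type Output;

    fn call(&self, input: Self::Input) -> Self::Output;

    // Compose `self` with any op whose input type matches our output type.
    fn chain<N>(self, next: N) -> Sequential<Self, N>
    where
        Self: Sized,
        N: Op<Input = Self::Output>,
    {
        Sequential { first: self, second: next }
    }
}

// Two ops run back to back: one edge of the DAG (illustrative name).
struct Sequential<A, B> {
    first: A,
    second: B,
}

impl<A, B> Op for Sequential<A, B>
where
    A: Op,
    B: Op<Input = A::Output>,
{
    type Input = A::Input;
    type Output = B::Output;

    fn call(&self, input: Self::Input) -> Self::Output {
        // Run the upstream node, then pass its output along the edge.
        let intermediate = self.first.call(input);
        self.second.call(intermediate)
    }
}

// Wraps a plain closure as an op. `PhantomData` records the input type.
struct Map<F, I> {
    f: F,
    _input: PhantomData<fn(I)>,
}

fn map<F, I, O>(f: F) -> Map<F, I>
where
    F: Fn(I) -> O,
{
    Map { f, _input: PhantomData }
}

impl<F, I, O> Op for Map<F, I>
where
    F: Fn(I) -> O,
{
    type Input = I;
    type Output = O;

    fn call(&self, input: Self::Input) -> Self::Output {
        (self.f)(input)
    }
}

fn main() {
    let pipeline = map(|(x, y): (i32, i32)| x + y) // root node
        .chain(map(|z: i32| z * 2)) // middle node
        .chain(map(|z: i32| format!("Result: {z}!"))); // leaf node
    let result = pipeline.call((1, 2));
    assert_eq!(result, "Result: 6!");
    println!("{result}");
}
```

Nesting `Sequential` values this way encodes the chain in the type system, so a mismatch between one op's output type and the next op's input type is a compile error rather than a runtime failure.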

Basic Example

For example, the pipeline below takes a tuple of two integers, adds them together and then formats the result as a string using the map combinator method, which applies a simple function op to the output of the previous op:

use rig::pipeline::{self, Op};

let pipeline = pipeline::new()
    // op1: add two numbers
    .map(|(x, y)| x + y)
    // op2: format result
    .map(|z| format!("Result: {z}!"));

let result = pipeline.call((1, 2)).await;
assert_eq!(result, "Result: 3!");

This pipeline can be visualized as the following DAG:

         ┌─────────┐   ┌─────────┐         
Input───►│   op1   ├──►│   op2   ├──►Output
         └─────────┘   └─────────┘         

Parallel Operations

The pipeline API also provides a parallel! macro for running operations in parallel. The macro takes a list of ops and turns them into a single op that duplicates the input and runs each op concurrently. The results of each op are then collected and returned as a tuple.

For example, the pipeline below runs two operations concurrently:

use rig::{pipeline::{self, Op, map}, parallel};

let pipeline = pipeline::new()
    .chain(parallel!(
        // op1: add 1 to input
        map(|x| x + 1),
        // op2: subtract 1 from input
        map(|x| x - 1),
    ))
    // op3: format results
    .map(|(a, b)| format!("Results: {a}, {b}"));

let result = pipeline.call(1).await;
assert_eq!(result, "Results: 2, 0");

Notes:

  • The chain method is similar to the map method but it allows for chaining arbitrary operations, as long as they implement the Op trait.
  • map is a function that initializes a standalone Map op without an existing pipeline/op.

The pipeline above can be visualized as the following DAG:

          Input            
            │              
     ┌──────┴──────┐       
     ▼             ▼       
┌─────────┐   ┌─────────┐  
│   op1   │   │   op2   │  
└────┬────┘   └────┬────┘  
     └──────┬──────┘       
            ▼              
       ┌─────────┐         
       │   op3   │         
       └────┬────┘         
            │              
            ▼              
         Output           
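The fan-out/fan-in behaviour of parallel! (duplicate the input, run each branch, collect the results as a tuple) can be approximated with a plain function. This is a hypothetical two-branch sketch, not the macro's implementation: `parallel2` is an illustrative name, and it uses OS threads via `std::thread::scope` (stable since Rust 1.63) only to stay dependency-free, whereas rig's macro combines async ops into a single op.

```rust
use std::thread;

// Hypothetical model of a two-branch parallel op: clone the input for each
// branch, run both branches on scoped threads, and return a tuple.
fn parallel2<I, A, B, FA, FB>(input: I, op1: FA, op2: FB) -> (A, B)
where
    I: Clone + Send,
    A: Send,
    B: Send,
    FA: Fn(I) -> A + Sync,
    FB: Fn(I) -> B + Sync,
{
    // Fan-out: each branch gets its own copy of the input.
    let input2 = input.clone();
    thread::scope(|s| {
        let h1 = s.spawn(|| op1(input));
        let h2 = s.spawn(|| op2(input2));
        // Fan-in: wait for both branches (a panic in either is propagated).
        (h1.join().unwrap(), h2.join().unwrap())
    })
}

fn main() {
    // Mirrors the example above: op1 adds 1, op2 subtracts 1, op3 formats.
    let (a, b) = parallel2(1, |x: i32| x + 1, |x: i32| x - 1);
    let result = format!("Results: {a}, {b}");
    assert_eq!(result, "Results: 2, 0");
    println!("{result}");
}
```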

Re-exports

pub use op::Op;
pub use op::map;
pub use op::passthrough;
pub use op::then;
pub use try_op::TryOp;

Modules

agent_ops
conditional
op
parallel
try_op

Structs

PipelineBuilder

Enums

ChainError

Functions

new
with_error